1pub use roboticus_api as api_crate;
37pub use roboticus_api::abuse;
38pub use roboticus_api::api;
39pub use roboticus_api::auth;
40pub use roboticus_api::config_runtime;
41pub use roboticus_api::cron_runtime;
42pub use roboticus_api::dashboard;
43pub use roboticus_api::rate_limit;
44pub use roboticus_api::ws;
45pub use roboticus_api::ws_ticket;
46pub use roboticus_api::{
47 AppState, EventBus, PersonalityState, TicketStore, build_dashboard_html, build_mcp_router,
48 build_public_router, build_router, dashboard_handler, ws_route,
49};
50
51pub use roboticus_cli::cli;
53pub use roboticus_cli::migrate;
54
55pub mod config_maintenance;
57pub mod daemon;
58pub mod plugins;
59pub use roboticus_cli::state_hygiene;
60
61use std::sync::Arc;
62use std::sync::OnceLock;
63use std::sync::atomic::{AtomicBool, Ordering};
64use std::time::Duration;
65
66use tokio::sync::RwLock;
67use tower_http::cors::CorsLayer;
68
69use auth::ApiKeyLayer;
70use roboticus_agent::policy::{
71 AuthorityRule, CommandSafetyRule, ConfigProtectionRule, FinancialRule, PathProtectionRule,
72 PolicyEngine, RateLimitRule, ValidationRule,
73};
74use roboticus_agent::subagents::SubagentRegistry;
75use roboticus_browser::Browser;
76use roboticus_channels::ChannelAdapter;
77use roboticus_channels::a2a::A2aProtocol;
78use roboticus_channels::router::ChannelRouter;
79use roboticus_channels::telegram::TelegramAdapter;
80use roboticus_channels::whatsapp::WhatsAppAdapter;
81use roboticus_core::RoboticusConfig;
82use roboticus_db::Database;
83use roboticus_llm::LlmService;
84use roboticus_llm::OAuthManager;
85use roboticus_wallet::{WalletPaymentHandler, WalletService};
86
87use roboticus_agent::approvals::ApprovalManager;
88use roboticus_agent::obsidian::ObsidianVault;
89use roboticus_agent::obsidian_tools::{ObsidianReadTool, ObsidianSearchTool, ObsidianWriteTool};
90use roboticus_agent::tools::{
91 BashTool, EchoTool, EditFileTool, GlobFilesTool, ListDirectoryTool, ReadFileTool,
92 ScriptRunnerTool, SearchFilesTool, ToolRegistry, WriteFileTool,
93};
94use roboticus_channels::discord::DiscordAdapter;
95use roboticus_channels::email::EmailAdapter;
96use roboticus_channels::matrix::MatrixAdapter;
97use roboticus_channels::signal::SignalAdapter;
98use roboticus_channels::voice::{VoiceConfig, VoicePipeline};
99
100use rate_limit::GlobalRateLimitLayer;
101
102static STDERR_ENABLED: AtomicBool = AtomicBool::new(false);
103static LOG_GUARD: OnceLock<tracing_appender::non_blocking::WorkerGuard> = OnceLock::new();
104
/// Returns `true` when `role` is one of the sub-agent roles that may be registered for
/// tasking: `subagent`, `specialist` or `observer`.
///
/// The comparison ignores ASCII case; surrounding whitespace is not trimmed.
fn is_taskable_subagent_role(role: &str) -> bool {
    const TASKABLE_ROLES: [&str; 3] = ["subagent", "specialist", "observer"];

    TASKABLE_ROLES
        .iter()
        .any(|candidate| role.eq_ignore_ascii_case(candidate))
}
110
111pub fn enable_stderr_logging() {
112 STDERR_ENABLED.store(true, Ordering::Release);
113}
114
115fn init_logging(config: &RoboticusConfig) {
116 use tracing_appender::rolling::{RollingFileAppender, Rotation};
117 use tracing_subscriber::EnvFilter;
118 use tracing_subscriber::Layer;
119 use tracing_subscriber::filter::filter_fn;
120 use tracing_subscriber::fmt;
121 use tracing_subscriber::layer::SubscriberExt;
122 use tracing_subscriber::util::SubscriberInitExt;
123
124 let level = config.agent.log_level.as_str();
125 let base_filter = format!("{level},hyper=warn,h2=warn,rustls=warn");
128 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&base_filter));
129
130 let stderr_gate = filter_fn(|_| STDERR_ENABLED.load(Ordering::Acquire));
131
132 let log_dir = &config.server.log_dir;
133 let stderr_layer = fmt::layer()
134 .with_writer(std::io::stderr)
135 .with_filter(stderr_gate);
136
137 let file_appender = std::fs::create_dir_all(log_dir).ok().and_then(|_| {
138 RollingFileAppender::builder()
139 .rotation(Rotation::DAILY)
140 .filename_prefix("roboticus")
141 .filename_suffix("log")
142 .build(log_dir)
143 .map_err(|e| {
144 eprintln!(
145 "warning: failed to initialize file logging in {}: {e}",
146 log_dir.display()
147 );
148 e
149 })
150 .ok()
151 });
152
153 if let Some(file_appender) = file_appender {
154 let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
155 let _ = LOG_GUARD.set(guard);
156
157 let file_layer = fmt::layer()
158 .with_writer(non_blocking)
159 .with_ansi(false)
160 .json();
161
162 let _ = tracing_subscriber::registry()
163 .with(filter)
164 .with(stderr_layer)
165 .with(file_layer)
166 .try_init();
167 } else {
168 let _ = tracing_subscriber::registry()
169 .with(filter)
170 .with(stderr_layer)
171 .try_init();
172 }
173
174 cleanup_old_logs(log_dir, config.server.log_max_days);
175}
176
/// Deletes files in `log_dir` that have the `log` extension and whose modification time is
/// more than `max_days` days in the past.
///
/// The whole operation is best effort: an unreadable directory, unreadable entries or
/// metadata, and failed deletions are all skipped silently. Files with any other extension
/// are never touched.
///
/// If the cutoff instant (`now - max_days`) cannot be represented by the platform's
/// `SystemTime`, no file can be older than it, so the function returns without deleting
/// anything instead of panicking.
fn cleanup_old_logs(log_dir: &std::path::Path, max_days: u32) {
    const SECS_PER_DAY: u64 = 86_400;

    // u32::MAX * 86_400 fits comfortably in a u64, so this multiplication cannot overflow.
    let retention = std::time::Duration::from_secs(u64::from(max_days) * SECS_PER_DAY);

    // `SystemTime - Duration` panics on an unrepresentable result; `checked_sub` does not.
    let cutoff = match std::time::SystemTime::now().checked_sub(retention) {
        Some(cutoff) => cutoff,
        None => return,
    };

    let entries = match std::fs::read_dir(log_dir) {
        Ok(entries) => entries,
        Err(_) => return,
    };

    for entry in entries.flatten() {
        let path = entry.path();
        let is_log_file = path.extension().and_then(|ext| ext.to_str()) == Some("log");
        if !is_log_file {
            continue;
        }

        // Any failure to read the timestamp means "keep the file".
        let is_stale = entry
            .metadata()
            .and_then(|meta| meta.modified())
            .map(|modified| modified < cutoff)
            .unwrap_or(false);

        if is_stale {
            let _ = std::fs::remove_file(&path);
        }
    }
}
200
201fn resolve_token(
203 token_ref: &Option<String>,
204 token_env: &str,
205 keystore: &roboticus_core::Keystore,
206) -> String {
207 if let Some(r) = token_ref
208 && let Some(name) = r.strip_prefix("keystore:")
209 {
210 if let Some(val) = keystore.get(name) {
211 return val;
212 }
213 tracing::warn!(key = %name, "keystore reference not found, falling back to env var");
214 }
215 if !token_env.is_empty() {
216 return match std::env::var(token_env) {
217 Ok(val) if !val.is_empty() => val,
218 Ok(_) => {
219 tracing::warn!(env_var = %token_env, "API key env var is set but empty");
220 String::new()
221 }
222 Err(_) => {
223 tracing::warn!(env_var = %token_env, "API key env var is not set");
224 String::new()
225 }
226 };
227 }
228 String::new()
229}
230
231pub async fn bootstrap(
233 config: RoboticusConfig,
234) -> Result<axum::Router, Box<dyn std::error::Error>> {
235 bootstrap_with_config_path(config, None).await
236}
237
238struct BootPhase {
241 start: std::time::Instant,
242 dot_handle: tokio::task::JoinHandle<()>,
243}
244
245impl BootPhase {
246 fn start(label: &str) -> Self {
247 use std::io::Write;
248 eprint!(" {label} ");
249 let _ = std::io::stderr().flush();
250 let dot_handle = tokio::spawn(async {
251 loop {
252 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
253 eprint!(".");
254 let _ = std::io::stderr().flush();
255 }
256 });
257 Self {
258 start: std::time::Instant::now(),
259 dot_handle,
260 }
261 }
262
263 fn done(self) {
264 self.dot_handle.abort();
265 let elapsed = self.start.elapsed();
266 if elapsed.as_millis() >= 500 {
267 eprintln!("({:.1}s)", elapsed.as_secs_f64());
268 } else {
269 eprintln!("ok");
270 }
271 }
272}
273
274pub async fn bootstrap_with_config_path(
275 config: RoboticusConfig,
276 config_path: Option<std::path::PathBuf>,
277) -> Result<axum::Router, Box<dyn std::error::Error>> {
278 init_logging(&config);
279
280 let personality_state = api::PersonalityState::from_workspace(&config.agent.workspace);
281
282 if !personality_state.os_text.is_empty() {
283 tracing::info!(
284 personality = %personality_state.identity.name,
285 generated_by = %personality_state.identity.generated_by,
286 "Loaded personality files from workspace"
287 );
288 } else {
289 tracing::info!("No personality files found in workspace, using defaults");
290 }
291
292 let db_path = config.database.path.to_string_lossy().to_string();
293 let bp = BootPhase::start("database");
294 let db = Database::new(&db_path)?;
295 bp.done();
296 match crate::state_hygiene::run_state_hygiene(&config.database.path) {
297 Ok(report) if report.changed => {
298 tracing::info!(
299 changed_rows = report.changed_rows,
300 subagent_rows_normalized = report.subagent_rows_normalized,
301 cron_payload_rows_repaired = report.cron_payload_rows_repaired,
302 cron_jobs_disabled_invalid_expr = report.cron_jobs_disabled_invalid_expr,
303 "applied startup mechanic checks"
304 );
305 }
306 Ok(_) => {}
307 Err(e) => tracing::warn!(error = %e, "startup mechanic checks failed"),
308 }
309 match roboticus_db::sessions::backfill_nicknames(&db) {
310 Ok(0) => {}
311 Ok(n) => tracing::info!(count = n, "Backfilled session nicknames"),
312 Err(e) => tracing::warn!(error = %e, "Failed to backfill session nicknames"),
313 }
314 let bp = BootPhase::start("LLM service");
315 let mut llm = LlmService::new(&config)?;
316 match roboticus_db::metrics::recent_quality_scores(&db, 200) {
319 Ok(scores) if !scores.is_empty() => llm.quality.seed_from_history(&scores),
320 Ok(_) => {}
321 Err(e) => tracing::warn!(error = %e, "failed to seed quality tracker from history"),
322 }
323 llm.quality
326 .seed_from_baselines(roboticus_llm::exercise::COMMON_MODEL_BASELINES);
327
328 {
330 let mut candidates = vec![config.models.primary.clone()];
331 candidates.extend(config.models.fallbacks.iter().cloned());
332 for model in &candidates {
333 if llm.quality.observation_count(model) == 0 {
334 tracing::info!(
335 model = model.as_str(),
336 "model has no performance data — run 'roboticus models exercise {}' to baseline it",
337 model
338 );
339 }
340 }
341 }
342 bp.done();
343 let bp = BootPhase::start("wallet");
344 let wallet = match tokio::time::timeout(
345 std::time::Duration::from_secs(30),
346 WalletService::new(&config),
347 )
348 .await
349 {
350 Ok(result) => result?,
351 Err(_) => {
352 bp.done();
353 return Err("wallet init timed out (30s) — check RPC endpoint connectivity".into());
354 }
355 };
356 bp.done();
357
358 let wallet_arc = Arc::new(wallet.wallet.clone());
362 let treasury_policy = roboticus_wallet::treasury::TreasuryPolicy::new(&config.treasury);
363 let x402_handler =
364 Arc::new(WalletPaymentHandler::new(wallet_arc).with_treasury_policy(treasury_policy));
365 llm.set_payment_handler(x402_handler);
366
367 let bp = BootPhase::start("protocols + plugins");
368 let a2a = A2aProtocol::new(config.a2a.clone());
369 let plugin_env = {
370 let mut env = std::collections::HashMap::new();
371 env.insert("ROBOTICUS_DELEGATION_DEPTH".into(), "0".into());
372 env.insert(
373 "ROBOTICUS_WORKSPACE".into(),
374 config.agent.workspace.display().to_string(),
375 );
376 if let Some(ref ws) = config.skills.workspace_dir {
377 env.insert("ROBOTICUS_SKILLS_DIR".into(), ws.display().to_string());
378 }
379 env
380 };
381 let plugin_registry = plugins::init_plugin_registry(&config.plugins, plugin_env).await;
382 let mut policy_engine = PolicyEngine::new();
383 policy_engine.add_rule(Box::new(AuthorityRule));
384 policy_engine.add_rule(Box::new(CommandSafetyRule));
385 policy_engine.add_rule(Box::new(FinancialRule::new(
386 config.treasury.per_payment_cap,
387 )));
388 policy_engine.add_rule(Box::new(PathProtectionRule::from_config(
389 &config.security.filesystem,
390 )));
391 policy_engine.add_rule(Box::new(RateLimitRule::default()));
392 policy_engine.add_rule(Box::new(ValidationRule));
393 policy_engine.add_rule(Box::new(ConfigProtectionRule::default()));
394 let policy_engine = Arc::new(policy_engine);
395 let browser = Arc::new(Browser::new(config.browser.clone()));
396 let registry = Arc::new(SubagentRegistry::new(4, vec![]));
397
398 if let Ok(sub_agents) = roboticus_db::agents::list_enabled_sub_agents(&db) {
399 for sa in &sub_agents {
400 if !is_taskable_subagent_role(&sa.role) {
401 continue;
402 }
403 let resolved_model = match sa.model.trim().to_ascii_lowercase().as_str() {
404 "auto" | "orchestrator" => llm.router.select_model().to_string(),
405 _ => sa.model.clone(),
406 };
407 let fixed_skills = sa
408 .skills_json
409 .as_deref()
410 .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok())
411 .unwrap_or_default();
412 let agent_config = roboticus_agent::subagents::AgentInstanceConfig {
413 id: sa.name.clone(),
414 name: sa.display_name.clone().unwrap_or_else(|| sa.name.clone()),
415 model: resolved_model,
416 skills: fixed_skills,
417 allowed_subagents: vec![],
418 max_concurrent: 4,
419 };
420 if let Err(e) = registry.register(agent_config).await {
421 tracing::warn!(agent = %sa.name, err = %e, "failed to register sub-agent");
422 } else if let Err(e) = registry.start_agent(&sa.name).await {
423 tracing::warn!(agent = %sa.name, err = %e, "failed to auto-start sub-agent");
424 }
425 }
426 if !sub_agents.is_empty() {
427 tracing::info!(
428 count = sub_agents.len(),
429 "registered sub-agents from database"
430 );
431 }
432 }
433
434 let event_bus = EventBus::new(256);
435
436 let keystore =
437 roboticus_core::keystore::Keystore::new(roboticus_core::keystore::Keystore::default_path());
438 if let Err(e) = keystore.unlock_machine() {
439 tracing::warn!("keystore auto-unlock failed: {e}");
440 }
441 let keystore = Arc::new(keystore);
442
443 bp.done();
444 let bp = BootPhase::start("channels");
445 let channel_router = Arc::new(ChannelRouter::with_store(db.clone()).await);
446 let telegram: Option<Arc<TelegramAdapter>> =
447 if let Some(ref tg_config) = config.channels.telegram {
448 if tg_config.enabled {
449 let token = resolve_token(&tg_config.token_ref, &tg_config.token_env, &keystore);
450 if !token.is_empty() {
451 let adapter = Arc::new(TelegramAdapter::with_config(
452 token,
453 tg_config.poll_timeout_seconds,
454 tg_config.allowed_chat_ids.clone(),
455 tg_config.webhook_secret.clone(),
456 config.security.deny_on_empty_allowlist,
457 ));
458 channel_router
459 .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
460 .await;
461 tracing::info!("Telegram adapter registered");
462 if tg_config.webhook_secret.is_none() {
463 tracing::warn!(
464 "Telegram webhook_secret not set; webhook endpoint will reject with 503"
465 );
466 }
467 Some(adapter)
468 } else {
469 tracing::warn!(
470 token_env = %tg_config.token_env,
471 "Telegram enabled but token is empty"
472 );
473 None
474 }
475 } else {
476 None
477 }
478 } else {
479 None
480 };
481
482 let whatsapp: Option<Arc<WhatsAppAdapter>> =
483 if let Some(ref wa_config) = config.channels.whatsapp {
484 if wa_config.enabled {
485 let token = resolve_token(&wa_config.token_ref, &wa_config.token_env, &keystore);
486 if !token.is_empty() && !wa_config.phone_number_id.is_empty() {
487 let adapter = Arc::new(WhatsAppAdapter::with_config(
488 token,
489 wa_config.phone_number_id.clone(),
490 wa_config.verify_token.clone(),
491 wa_config.allowed_numbers.clone(),
492 wa_config.app_secret.clone(),
493 config.security.deny_on_empty_allowlist,
494 )?);
495 channel_router
496 .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
497 .await;
498 tracing::info!("WhatsApp adapter registered");
499 if wa_config.app_secret.is_none() {
500 tracing::warn!(
501 "WhatsApp app_secret not set; webhook endpoint will reject with 503"
502 );
503 }
504 Some(adapter)
505 } else {
506 tracing::warn!("WhatsApp enabled but token env or phone_number_id is empty");
507 None
508 }
509 } else {
510 None
511 }
512 } else {
513 None
514 };
515
516 let discord: Option<Arc<DiscordAdapter>> = if let Some(ref dc_config) = config.channels.discord
517 {
518 if dc_config.enabled {
519 let token = resolve_token(&dc_config.token_ref, &dc_config.token_env, &keystore);
520 if !token.is_empty() {
521 let adapter = Arc::new(DiscordAdapter::with_config(
522 token,
523 dc_config.allowed_guild_ids.clone(),
524 config.security.deny_on_empty_allowlist,
525 ));
526 channel_router
527 .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
528 .await;
529 if let Err(e) = adapter.connect_gateway().await {
531 tracing::error!(error = %e, "Failed to connect Discord gateway");
532 }
533 tracing::info!("Discord adapter registered with gateway");
534 Some(adapter)
535 } else {
536 tracing::warn!(
537 token_env = %dc_config.token_env,
538 "Discord enabled but token env var is empty"
539 );
540 None
541 }
542 } else {
543 None
544 }
545 } else {
546 None
547 };
548
549 let signal: Option<Arc<SignalAdapter>> = if let Some(ref sig_config) = config.channels.signal {
550 if sig_config.enabled {
551 if !sig_config.phone_number.is_empty() {
552 let adapter = Arc::new(SignalAdapter::with_config(
553 sig_config.phone_number.clone(),
554 sig_config.daemon_url.clone(),
555 sig_config.allowed_numbers.clone(),
556 config.security.deny_on_empty_allowlist,
557 ));
558 channel_router
559 .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
560 .await;
561 tracing::info!("Signal adapter registered");
562 Some(adapter)
563 } else {
564 tracing::warn!("Signal enabled but phone_number is empty");
565 None
566 }
567 } else {
568 None
569 }
570 } else {
571 None
572 };
573
574 let email: Option<Arc<EmailAdapter>> = if config.channels.email.enabled {
575 let email_cfg = &config.channels.email;
576 let password = if email_cfg.password_env.is_empty() {
577 String::new()
578 } else {
579 match std::env::var(&email_cfg.password_env) {
580 Ok(val) => val,
581 Err(_) => {
582 tracing::warn!(env_var = %email_cfg.password_env, "email password env var is not set");
583 String::new()
584 }
585 }
586 };
587 if email_cfg.smtp_host.is_empty()
588 || email_cfg.username.is_empty()
589 || password.is_empty()
590 || email_cfg.from_address.is_empty()
591 {
592 tracing::warn!("Email enabled but SMTP credentials are incomplete");
593 None
594 } else {
595 match EmailAdapter::new(
596 email_cfg.from_address.clone(),
597 email_cfg.smtp_host.clone(),
598 email_cfg.smtp_port,
599 email_cfg.imap_host.clone(),
600 email_cfg.imap_port,
601 email_cfg.username.clone(),
602 password,
603 ) {
604 Ok(email_adapter) => {
605 let oauth2_token =
607 if email_cfg.use_oauth2 && !email_cfg.oauth2_token_env.is_empty() {
608 std::env::var(&email_cfg.oauth2_token_env).ok()
609 } else {
610 None
611 };
612 let adapter = Arc::new(
613 email_adapter
614 .with_allowed_senders(email_cfg.allowed_senders.clone())
615 .with_deny_on_empty(config.security.deny_on_empty_allowlist)
616 .with_poll_interval(std::time::Duration::from_secs(
617 email_cfg.poll_interval_seconds,
618 ))
619 .with_oauth2_token(oauth2_token)
620 .with_imap_idle_enabled(email_cfg.imap_idle_enabled),
621 );
622 channel_router
623 .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
624 .await;
625 tracing::info!("Email adapter registered");
626
627 if !email_cfg.imap_host.is_empty()
629 && let Err(e) = adapter.start_imap_listener().await
630 {
631 tracing::error!(error = %e, "Failed to start email IMAP listener");
632 }
633
634 Some(adapter)
635 }
636 Err(e) => {
637 tracing::error!(error = %e, "Failed to create email adapter");
638 None
639 }
640 }
641 }
642 } else {
643 None
644 };
645
646 let voice: Option<Arc<RwLock<VoicePipeline>>> = if config.channels.voice.enabled {
647 let mut voice_config = VoiceConfig::default();
648 if let Some(stt) = &config.channels.voice.stt_model {
649 voice_config.stt_model = stt.clone();
650 }
651 if let Some(tts) = &config.channels.voice.tts_model {
652 voice_config.tts_model = tts.clone();
653 }
654 if let Some(v) = &config.channels.voice.tts_voice {
655 voice_config.tts_voice = v.clone();
656 }
657 if let Ok(key) = std::env::var("OPENAI_API_KEY")
658 && !key.is_empty()
659 {
660 voice_config.api_key = Some(key);
661 }
662 tracing::info!("Voice pipeline initialized");
663 Some(Arc::new(RwLock::new(VoicePipeline::new(voice_config))))
664 } else {
665 None
666 };
667
668 let _matrix: Option<Arc<MatrixAdapter>> = if let Some(ref mx_config) = config.channels.matrix {
669 if mx_config.enabled {
670 let token = resolve_token(&None, &mx_config.access_token_env, &keystore);
671 if !token.is_empty() && !mx_config.homeserver_url.is_empty() {
672 let mut adapter = MatrixAdapter::new(
673 mx_config.homeserver_url.clone(),
674 token,
675 mx_config.allowed_rooms.clone(),
676 mx_config.auto_join,
677 mx_config.sync_timeout_seconds,
678 );
679 if mx_config.encryption_enabled {
680 let store_path = mx_config.device_store_path.clone().unwrap_or_else(|| {
681 roboticus_core::home_dir()
682 .join(".roboticus")
683 .join("matrix_crypto")
684 });
685 match roboticus_channels::matrix_crypto::MatrixCrypto::new(
686 &store_path,
687 &mx_config.device_display_name,
688 ) {
689 Ok(crypto) => {
690 adapter = adapter.with_crypto(Arc::new(crypto));
691 tracing::info!("Matrix E2EE enabled");
692 }
693 Err(e) => {
694 tracing::error!(error = %e, "Failed to init Matrix crypto; running unencrypted");
695 }
696 }
697 }
698 let adapter = Arc::new(adapter);
699 channel_router
700 .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
701 .await;
702 tracing::info!("Matrix adapter registered");
703 Some(adapter)
704 } else {
705 tracing::warn!("Matrix enabled but homeserver_url or access token is empty");
706 None
707 }
708 } else {
709 None
710 }
711 } else {
712 None
713 };
714
715 let hmac_secret = {
716 use rand::RngCore;
717 let mut buf = vec![0u8; 32];
718 rand::rngs::OsRng.fill_bytes(&mut buf);
719 buf
720 };
721
722 let retriever = Arc::new(roboticus_agent::retrieval::MemoryRetriever::new(
723 config.memory.clone(),
724 ));
725
726 let mut tool_registry = ToolRegistry::new();
727 tool_registry.register(Box::new(EchoTool));
728 tool_registry.register(Box::new(BashTool));
729 tool_registry.register(Box::new(ScriptRunnerTool::new(
730 config.skills.clone(),
731 config.security.filesystem.clone(),
732 )));
733 tool_registry.register(Box::new(ReadFileTool));
734 tool_registry.register(Box::new(WriteFileTool));
735 tool_registry.register(Box::new(EditFileTool));
736 tool_registry.register(Box::new(ListDirectoryTool));
737 tool_registry.register(Box::new(GlobFilesTool));
738 tool_registry.register(Box::new(SearchFilesTool));
739
740 use roboticus_agent::tools::CronTool;
742 tool_registry.register(Box::new(CronTool));
743
744 use roboticus_agent::tools::{
746 GetChannelHealthTool, GetMemoryStatsTool, GetRuntimeContextTool, GetSubagentStatusTool,
747 RecallMemoryTool,
748 };
749 tool_registry.register(Box::new(GetRuntimeContextTool));
750 tool_registry.register(Box::new(GetMemoryStatsTool));
751 tool_registry.register(Box::new(GetChannelHealthTool));
752 tool_registry.register(Box::new(GetSubagentStatusTool));
753 tool_registry.register(Box::new(RecallMemoryTool));
754
755 use roboticus_agent::tools::{AlterTableTool, CreateTableTool, DropTableTool};
757 tool_registry.register(Box::new(CreateTableTool));
758 tool_registry.register(Box::new(AlterTableTool));
759 tool_registry.register(Box::new(DropTableTool));
760
761 bp.done();
763 let bp = BootPhase::start("plugin tools");
764 plugins::register_plugin_tools(&mut tool_registry, Arc::clone(&plugin_registry)).await;
765 bp.done();
766
767 let obsidian_vault: Option<Arc<RwLock<ObsidianVault>>> = if config.obsidian.enabled {
769 match ObsidianVault::from_config(&config.obsidian) {
770 Ok(vault) => {
771 let vault = Arc::new(RwLock::new(vault));
772 tool_registry.register(Box::new(ObsidianReadTool::new(Arc::clone(&vault))));
773 tool_registry.register(Box::new(ObsidianWriteTool::new(Arc::clone(&vault))));
774 tool_registry.register(Box::new(ObsidianSearchTool::new(Arc::clone(&vault))));
775 tracing::info!("Obsidian vault integration enabled");
776 Some(vault)
777 }
778 Err(e) => {
779 tracing::warn!(error = %e, "Failed to initialize Obsidian vault");
780 None
781 }
782 }
783 } else {
784 None
785 };
786
787 if let Some(ref vault) = obsidian_vault
789 && config.obsidian.watch_for_changes
790 {
791 match roboticus_agent::obsidian::watcher::VaultWatcher::start(Arc::clone(vault)).await {
792 Ok(_watcher) => {
793 tracing::info!("Obsidian vault file watcher started");
795 }
796 Err(e) => {
797 tracing::warn!(error = %e, "Failed to start Obsidian vault watcher");
798 }
799 }
800 }
801
802 let tool_registry = Arc::new(tool_registry);
803
804 let capabilities = Arc::new(roboticus_agent::capability::CapabilityRegistry::new());
805 if let Err(e) = capabilities
806 .sync_from_tool_registry(Arc::clone(&tool_registry))
807 .await
808 {
809 tracing::warn!(error = %e, "initial capability sync from tool registry reported errors");
810 }
811
812 let approvals = Arc::new(ApprovalManager::new(config.approvals.clone()));
813
814 let oauth = Arc::new(OAuthManager::new()?);
815
816 let discovery_registry = Arc::new(RwLock::new(
817 roboticus_agent::discovery::DiscoveryRegistry::new(),
818 ));
819 let device_manager = Arc::new(RwLock::new(roboticus_agent::device::DeviceManager::new(
820 roboticus_agent::device::DeviceIdentity::generate(&config.agent.id),
821 config.devices.max_paired_devices,
822 )));
823
824 let mut mcp_clients = roboticus_agent::mcp::McpClientManager::new();
825 for c in &config.mcp.clients {
826 let mut conn =
827 roboticus_agent::mcp::McpClientConnection::new(c.name.clone(), c.url.clone());
828 if let Err(e) = conn.discover() {
829 tracing::warn!(mcp_name = %c.name, error = %e, "MCP client discovery failed at startup");
830 }
831 mcp_clients.add_connection(conn);
832 }
833 let mcp_clients = Arc::new(RwLock::new(mcp_clients));
834
835 let live_mcp = Arc::new(roboticus_agent::mcp::manager::McpConnectionManager::new());
836 let bp = BootPhase::start("MCP connections");
837 match tokio::time::timeout(
838 std::time::Duration::from_secs(30),
839 live_mcp.connect_all(&config.mcp.servers, &capabilities),
840 )
841 .await
842 {
843 Ok(()) => {}
844 Err(_) => {
845 tracing::warn!("MCP connect_all timed out after 30s; some servers may be unavailable");
846 }
847 }
848 bp.done();
849 let live_mcp_tool_count = live_mcp.connected_count().await;
850 if live_mcp_tool_count > 0 {
851 tracing::info!(
852 servers = live_mcp_tool_count,
853 "MCP client connections established"
854 );
855 }
856
857 let mut mcp_server_registry = roboticus_agent::mcp::McpServerRegistry::new();
858 let exported = roboticus_agent::mcp::export_tools_as_mcp(
859 &tool_registry
860 .list()
861 .iter()
862 .map(|t| {
863 (
864 t.name().to_string(),
865 t.description().to_string(),
866 t.parameters_schema(),
867 )
868 })
869 .collect::<Vec<_>>(),
870 );
871 for tool in exported {
872 mcp_server_registry.register_tool(tool);
873 }
874 mcp_server_registry.register_resource(roboticus_agent::mcp::McpResource {
875 uri: "roboticus://sessions/active".to_string(),
876 name: "Active Sessions".to_string(),
877 description: "Active sessions in the local runtime".to_string(),
878 mime_type: "application/json".to_string(),
879 });
880 mcp_server_registry.register_resource(roboticus_agent::mcp::McpResource {
881 uri: "roboticus://metrics/capacity".to_string(),
882 name: "Provider Capacity Stats".to_string(),
883 description: "Current provider utilization and headroom".to_string(),
884 mime_type: "application/json".to_string(),
885 });
886 let mcp_server_registry = Arc::new(RwLock::new(mcp_server_registry));
887
888 let ann_index = roboticus_db::ann::AnnIndex::new(config.memory.ann_index);
889 if config.memory.ann_index {
890 match ann_index.build_from_db(&db) {
891 Ok(count) => {
892 if ann_index.is_built() {
893 tracing::info!(count, "ANN index built from database");
894 } else {
895 tracing::info!(
896 count,
897 min = ann_index.min_entries_for_index,
898 "ANN index below threshold, brute-force search will be used"
899 );
900 }
901 }
902 Err(e) => {
903 tracing::warn!(error = %e, "Failed to build ANN index, falling back to brute-force");
904 }
905 }
906 }
907
908 let media_service = if config.multimodal.enabled {
910 match roboticus_channels::media::MediaService::new(&config.multimodal) {
911 Ok(svc) => {
912 tracing::info!(
913 media_dir = ?config.multimodal.media_dir,
914 "Media service initialized"
915 );
916 Some(Arc::new(svc))
917 }
918 Err(e) => {
919 tracing::error!(error = %e, "Failed to initialize media service");
920 None
921 }
922 }
923 } else {
924 None
925 };
926
927 let resolved_config_path =
928 config_path.unwrap_or_else(crate::config_runtime::resolve_default_config_path);
929
930 let rate_limiter = GlobalRateLimitLayer::new(
933 u64::from(config.server.rate_limit_requests),
934 Duration::from_secs(config.server.rate_limit_window_secs),
935 )
936 .with_per_ip_capacity(u64::from(config.server.per_ip_rate_limit_requests))
937 .with_per_actor_capacity(u64::from(config.server.per_actor_rate_limit_requests))
938 .with_trusted_proxy_cidrs(&config.server.trusted_proxy_cidrs);
939
940 let semantic_classifier = Arc::new(
944 roboticus_llm::semantic_classifier::SemanticClassifier::new(llm.embedding.clone()),
945 );
946
947 let state = AppState {
948 db,
949 config: Arc::new(RwLock::new(config.clone())),
950 llm: Arc::new(RwLock::new(llm)),
951 wallet: Arc::new(wallet),
952 a2a: Arc::new(RwLock::new(a2a)),
953 personality: Arc::new(RwLock::new(personality_state)),
954 hmac_secret: Arc::new(hmac_secret),
955 interviews: Arc::new(RwLock::new(std::collections::HashMap::new())),
956 plugins: plugin_registry,
957 policy_engine,
958 browser,
959 registry,
960 event_bus: event_bus.clone(),
961 channel_router,
962 telegram,
963 whatsapp,
964 retriever,
965 ann_index,
966 tools: tool_registry,
967 capabilities,
968 approvals,
969 discord,
970 signal,
971 email,
972 voice,
973 media_service,
974 discovery: discovery_registry,
975 devices: device_manager,
976 mcp_clients,
977 mcp_server: mcp_server_registry,
978 live_mcp,
979 oauth,
980 keystore,
981 obsidian: obsidian_vault,
982 started_at: std::time::Instant::now(),
983 config_path: Arc::new(resolved_config_path.clone()),
984 config_apply_status: crate::config_runtime::status_for_path(&resolved_config_path),
985 pending_specialist_proposals: Arc::new(RwLock::new(std::collections::HashMap::new())),
986 ws_tickets: ws_ticket::TicketStore::new(),
987 rate_limiter: rate_limiter.clone(),
988 semantic_classifier,
989 };
990 let bp = BootPhase::start("capability sync");
991 state.resync_capabilities_from_tools().await;
992 bp.done();
993
994 if config.memory.ann_index {
996 let ann_db = state.db.clone();
997 let ann_idx = state.ann_index.clone();
998 tokio::spawn(async move {
999 let mut interval = tokio::time::interval(std::time::Duration::from_secs(600));
1000 interval.tick().await;
1001 loop {
1002 interval.tick().await;
1003 match ann_idx.rebuild(&ann_db) {
1004 Ok(count) => {
1005 tracing::debug!(count, "ANN index rebuilt");
1006 }
1007 Err(e) => {
1008 tracing::warn!(error = %e, "ANN index rebuild failed");
1009 }
1010 }
1011 }
1012 });
1013 tracing::info!("ANN index rebuild daemon spawned (10min interval)");
1014 }
1015
1016 {
1018 let loaded = roboticus_db::cache::load_cache_entries(&state.db)
1019 .inspect_err(|e| tracing::warn!(error = %e, "failed to load semantic cache entries"))
1020 .unwrap_or_default();
1021 if !loaded.is_empty() {
1022 let imported: Vec<(String, roboticus_llm::ExportedCacheEntry)> = loaded
1023 .into_iter()
1024 .map(|(id, pe)| {
1025 let ttl = pe
1026 .expires_at
1027 .and_then(|e| {
1028 chrono::NaiveDateTime::parse_from_str(&e, "%Y-%m-%dT%H:%M:%S")
1029 .ok()
1030 .or_else(|| {
1031 chrono::NaiveDateTime::parse_from_str(&e, "%Y-%m-%d %H:%M:%S")
1032 .ok()
1033 })
1034 })
1035 .map(|exp| {
1036 let now = chrono::Utc::now().naive_utc();
1037 if exp > now {
1038 (exp - now).num_seconds().max(0) as u64
1039 } else {
1040 0
1041 }
1042 })
1043 .unwrap_or(3600);
1044
1045 (
1046 id,
1047 roboticus_llm::ExportedCacheEntry {
1048 content: pe.response,
1049 model: pe.model,
1050 tokens_saved: pe.tokens_saved,
1051 hits: pe.hit_count,
1052 involved_tools: false,
1053 embedding: pe.embedding,
1054 ttl_remaining_secs: ttl,
1055 },
1056 )
1057 })
1058 .collect();
1059 let count = imported.len();
1060 let llm = state.llm.read().await;
1061 llm.cache
1062 .lock()
1063 .unwrap_or_else(|e| e.into_inner())
1064 .import_entries(imported);
1065 tracing::info!(count, "Loaded semantic cache from database");
1066 }
1067 }
1068
1069 {
1071 let flush_db = state.db.clone();
1072 let flush_llm = Arc::clone(&state.llm);
1073 tokio::spawn(async move {
1074 let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
1075 interval.tick().await; loop {
1077 interval.tick().await;
1078 let entries = {
1079 let llm = flush_llm.read().await;
1080 llm.cache
1081 .lock()
1082 .unwrap_or_else(|e| e.into_inner())
1083 .export_entries()
1084 };
1085 for (hash, entry) in &entries {
1086 let expires = chrono::Utc::now()
1087 + chrono::Duration::seconds(entry.ttl_remaining_secs as i64);
1088 let pe = roboticus_db::cache::PersistedCacheEntry {
1089 prompt_hash: hash.clone(),
1090 response: entry.content.clone(),
1091 model: entry.model.clone(),
1092 tokens_saved: entry.tokens_saved,
1093 hit_count: entry.hits,
1094 embedding: entry.embedding.clone(),
1095 created_at: chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
1096 expires_at: Some(expires.format("%Y-%m-%dT%H:%M:%S").to_string()),
1097 };
1098 roboticus_db::cache::save_cache_entry(&flush_db, hash, &pe)
1099 .inspect_err(
1100 |e| tracing::warn!(error = %e, hash, "failed to persist cache entry"),
1101 )
1102 .ok();
1103 }
1104 roboticus_db::cache::evict_expired_cache(&flush_db)
1105 .inspect_err(|e| tracing::warn!(error = %e, "failed to evict expired cache"))
1106 .ok();
1107 tracing::debug!(count = entries.len(), "Flushed semantic cache to database");
1108 }
1109 });
1110 tracing::info!("Cache flush daemon spawned (5min interval)");
1111 }
1112
1113 let (heartbeat_shutdown_tx, heartbeat_treasury_state) = {
1116 let hb_wallet = Arc::clone(&state.wallet);
1117 let hb_db = state.db.clone();
1118 let hb_session_cfg = config.session.clone();
1119 let hb_digest_cfg = config.digest.clone();
1120 let hb_agent_id = config.agent.id.clone();
1121 let hb_vault = state.obsidian.clone();
1122 let hb_config = config.heartbeat.clone();
1123 roboticus_schedule::spawn_domain_loops(
1124 hb_wallet,
1125 hb_db,
1126 hb_session_cfg,
1127 hb_digest_cfg,
1128 hb_agent_id,
1129 hb_vault,
1130 hb_config,
1131 )
1132 .await
1133 };
1134 tracing::info!(
1135 treasury_s = config.heartbeat.treasury_interval_seconds,
1136 yield_s = config.heartbeat.yield_interval_seconds,
1137 memory_s = config.heartbeat.memory_interval_seconds,
1138 maintenance_s = config.heartbeat.maintenance_interval_seconds,
1139 session_s = config.heartbeat.session_interval_seconds,
1140 discovery_s = config.heartbeat.discovery_interval_seconds,
1141 "domain loops spawned"
1142 );
1143 std::mem::forget(heartbeat_shutdown_tx);
1149 std::mem::forget(heartbeat_treasury_state);
1150
1151 {
1153 let drain_router = Arc::clone(&state.channel_router);
1154 tokio::spawn(async move {
1155 let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
1156 interval.tick().await;
1157 loop {
1158 interval.tick().await;
1159 drain_router.drain_retry_queue().await;
1160 }
1161 });
1162 tracing::info!("Delivery retry queue drain daemon spawned (30s interval)");
1163 }
1164
1165 {
1167 let instance_id = config.agent.id.clone();
1168 let cron_state = state.clone();
1169 tokio::spawn(async move {
1170 crate::cron_runtime::run_cron_worker(cron_state, instance_id).await;
1171 });
1172 tracing::info!("Cron worker spawned");
1173 }
1174
1175 {
1177 let db_path = config.database.path.clone();
1178 let interval_secs = std::env::var("ROBOTICUS_MECHANIC_CHECK_INTERVAL_SECS")
1179 .ok()
1180 .or_else(|| std::env::var("ROBOTICUS_STATE_HYGIENE_INTERVAL_SECS").ok())
1181 .and_then(|v| v.parse::<u64>().ok())
1182 .filter(|v| *v >= 300)
1183 .unwrap_or(21_600);
1184 tokio::spawn(async move {
1185 let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
1186 interval.tick().await;
1187 loop {
1188 interval.tick().await;
1189 match crate::state_hygiene::run_state_hygiene(&db_path) {
1190 Ok(report) if report.changed => {
1191 tracing::info!(
1192 changed_rows = report.changed_rows,
1193 subagent_rows_normalized = report.subagent_rows_normalized,
1194 cron_payload_rows_repaired = report.cron_payload_rows_repaired,
1195 cron_jobs_disabled_invalid_expr =
1196 report.cron_jobs_disabled_invalid_expr,
1197 "periodic mechanic checks applied"
1198 );
1199 }
1200 Ok(_) => tracing::debug!("periodic mechanic checks: no changes"),
1201 Err(e) => tracing::warn!(error = %e, "periodic mechanic checks failed"),
1202 }
1203 }
1204 });
1205 tracing::info!(interval_secs, "Mechanic checks daemon spawned");
1206 }
1207
1208 {
1209 let startup_announce_channels = config.channels.startup_announcement_channels();
1210
1211 let display_host = match config.server.bind.as_str() {
1212 "127.0.0.1" | "0.0.0.0" | "::1" | "::" => "localhost",
1213 other => other,
1214 };
1215 let channel_list: Vec<&str> = {
1216 let mut ch = vec!["web"];
1217 if config.channels.telegram.is_some() {
1218 ch.push("telegram");
1219 }
1220 if config.channels.whatsapp.is_some() {
1221 ch.push("whatsapp");
1222 }
1223 if config.channels.discord.is_some() {
1224 ch.push("discord");
1225 }
1226 if config.channels.signal.is_some() {
1227 ch.push("signal");
1228 }
1229 ch
1230 };
1231 let announce_text = format!(
1232 "🤖 *Roboticus Online*\n\n\
1233 🧠 *{}* (`{}`)\n\
1234 ⚡ `{}`\n\
1235 🔀 routing: {}\n\
1236 🌐 `{}:{}`\n\
1237 📡 {}\n\n\
1238 🕐 {}",
1239 config.agent.name,
1240 config.agent.id,
1241 config.models.primary,
1242 config.models.routing.mode,
1243 display_host,
1244 config.server.port,
1245 channel_list.join(" · "),
1246 chrono::Local::now().format("%Y-%m-%d %H:%M %Z"),
1247 );
1248
1249 if startup_announce_channels.iter().any(|c| c == "telegram") {
1250 if let (Some(adapter), Some(tg_cfg)) =
1251 (state.telegram.clone(), config.channels.telegram.as_ref())
1252 {
1253 let announce_targets = tg_cfg.allowed_chat_ids.clone();
1254 if announce_targets.is_empty() {
1255 tracing::warn!(
1256 "Telegram startup announcement skipped: channels.telegram.allowed_chat_ids is empty"
1257 );
1258 } else {
1259 let text = announce_text.clone();
1260 tokio::spawn(async move {
1261 for chat_id in announce_targets {
1262 let chat = chat_id.to_string();
1263 match adapter
1264 .send(roboticus_channels::OutboundMessage {
1265 content: text.clone(),
1266 recipient_id: chat.clone(),
1267 metadata: None,
1268 })
1269 .await
1270 {
1271 Ok(()) => {
1272 tracing::info!(chat_id = %chat, "telegram startup announcement sent")
1273 }
1274 Err(e) => {
1275 tracing::warn!(chat_id = %chat, error = %e, "telegram startup announcement failed")
1276 }
1277 }
1278 }
1279 });
1280 }
1281 } else {
1282 tracing::warn!(
1283 "Telegram startup announcement requested but telegram channel is not enabled/configured"
1284 );
1285 }
1286 }
1287
1288 if startup_announce_channels.iter().any(|c| c == "whatsapp") {
1289 if let (Some(adapter), Some(wa_cfg)) =
1290 (state.whatsapp.clone(), config.channels.whatsapp.as_ref())
1291 {
1292 let targets = wa_cfg.allowed_numbers.clone();
1293 if targets.is_empty() {
1294 tracing::warn!(
1295 "WhatsApp startup announcement skipped: channels.whatsapp.allowed_numbers is empty"
1296 );
1297 } else {
1298 let text = announce_text.clone();
1299 tokio::spawn(async move {
1300 for number in targets {
1301 match adapter
1302 .send(roboticus_channels::OutboundMessage {
1303 content: text.clone(),
1304 recipient_id: number.clone(),
1305 metadata: None,
1306 })
1307 .await
1308 {
1309 Ok(()) => {
1310 tracing::info!(recipient = %number, "whatsapp startup announcement sent")
1311 }
1312 Err(e) => {
1313 tracing::warn!(recipient = %number, error = %e, "whatsapp startup announcement failed")
1314 }
1315 }
1316 }
1317 });
1318 }
1319 } else {
1320 tracing::warn!(
1321 "WhatsApp startup announcement requested but whatsapp channel is not enabled/configured"
1322 );
1323 }
1324 }
1325
1326 if startup_announce_channels.iter().any(|c| c == "signal") {
1327 if let (Some(adapter), Some(sig_cfg)) =
1328 (state.signal.clone(), config.channels.signal.as_ref())
1329 {
1330 let targets = sig_cfg.allowed_numbers.clone();
1331 if targets.is_empty() {
1332 tracing::warn!(
1333 "Signal startup announcement skipped: channels.signal.allowed_numbers is empty"
1334 );
1335 } else {
1336 let text = announce_text.clone();
1337 tokio::spawn(async move {
1338 for number in targets {
1339 match adapter
1340 .send(roboticus_channels::OutboundMessage {
1341 content: text.clone(),
1342 recipient_id: number.clone(),
1343 metadata: None,
1344 })
1345 .await
1346 {
1347 Ok(()) => {
1348 tracing::info!(recipient = %number, "signal startup announcement sent")
1349 }
1350 Err(e) => {
1351 tracing::warn!(recipient = %number, error = %e, "signal startup announcement failed")
1352 }
1353 }
1354 }
1355 });
1356 }
1357 } else {
1358 tracing::warn!(
1359 "Signal startup announcement requested but signal channel is not enabled/configured"
1360 );
1361 }
1362 }
1363
1364 for ch in &startup_announce_channels {
1365 if ch != "telegram" && ch != "whatsapp" && ch != "signal" {
1366 tracing::warn!(
1367 channel = %ch,
1368 "startup announcement requested for channel without recipient mapping support"
1369 );
1370 }
1371 }
1372 }
1373
1374 if state.telegram.is_some() {
1375 let use_polling = config
1376 .channels
1377 .telegram
1378 .as_ref()
1379 .map(|c| !c.webhook_mode)
1380 .unwrap_or(true);
1381 if use_polling {
1382 let poll_state = state.clone();
1383 tokio::spawn(async move {
1384 api::telegram_poll_loop(poll_state).await;
1385 });
1386 }
1387 }
1388 if state.discord.is_some() {
1389 let poll_state = state.clone();
1390 tokio::spawn(async move {
1391 api::discord_poll_loop(poll_state).await;
1392 });
1393 }
1394 if state.signal.is_some() {
1395 let poll_state = state.clone();
1396 tokio::spawn(async move {
1397 api::signal_poll_loop(poll_state).await;
1398 });
1399 }
1400 if state.email.is_some() {
1401 let poll_state = state.clone();
1402 tokio::spawn(async move {
1403 api::email_poll_loop(poll_state).await;
1404 });
1405 }
1406
1407 let auth_layer = ApiKeyLayer::new(config.server.api_key.clone());
1408 let local_origin = format!("http://{}:{}", config.server.bind, config.server.port);
1409 let origin_header = local_origin
1410 .parse::<axum::http::HeaderValue>()
1411 .unwrap_or_else(|e| {
1412 tracing::warn!(
1413 origin = %local_origin,
1414 error = %e,
1415 "CORS origin failed to parse, falling back to 127.0.0.1 loopback"
1416 );
1417 axum::http::HeaderValue::from_static("http://127.0.0.1:3000")
1418 });
1419 let cors = CorsLayer::new()
1420 .allow_origin(origin_header)
1421 .allow_methods([
1422 axum::http::Method::GET,
1423 axum::http::Method::POST,
1424 axum::http::Method::PUT,
1425 axum::http::Method::DELETE,
1426 ])
1427 .allow_headers([
1428 axum::http::header::CONTENT_TYPE,
1429 axum::http::header::AUTHORIZATION,
1430 axum::http::HeaderName::from_static("x-api-key"),
1431 ]);
1432 let authed_routes = build_router(state.clone()).layer(auth_layer);
1433
1434 let ws_routes = axum::Router::new().route(
1437 "/ws",
1438 ws_route(
1439 event_bus.clone(),
1440 state.ws_tickets.clone(),
1441 config.server.api_key.clone(),
1442 ),
1443 );
1444
1445 let mcp_routes = build_mcp_router(&state, config.server.api_key.clone());
1449
1450 let public_routes = build_public_router(state);
1451
1452 let app = authed_routes
1453 .merge(ws_routes)
1454 .merge(mcp_routes)
1455 .merge(public_routes)
1456 .layer(cors)
1457 .layer(rate_limiter);
1458 Ok(app)
1459}
1460
/// Shared helpers for unit tests (for example the `EnvGuard` imported by the
/// `tests` module); only compiled in test builds.
#[cfg(test)]
pub mod test_support;
1463
#[cfg(test)]
mod tests {
    //! Unit tests for server bootstrap, log cleanup, sub-agent role checks and
    //! channel token resolution.

    use super::*;
    use crate::test_support::EnvGuard;

    /// Minimal configuration used by the in-memory bootstrap test.
    const BOOTSTRAP_CONFIG: &str = r#"
[agent]
name = "Roboticus"
id = "roboticus-test"

[server]
port = 18789
bind = "127.0.0.1"

[database]
path = ":memory:"

[models]
primary = "ollama/qwen3:8b"
"#;

    /// Creates a keystore file inside a fresh temporary directory and unlocks it
    /// with a fixed test password.
    ///
    /// The `TempDir` is returned alongside the keystore so the backing directory
    /// stays alive for as long as the caller holds on to it.
    fn unlocked_keystore() -> (tempfile::TempDir, roboticus_core::keystore::Keystore) {
        let dir = tempfile::tempdir().unwrap();
        let keystore = roboticus_core::keystore::Keystore::new(dir.path().join("keystore.enc"));
        keystore.unlock("pw").unwrap();
        (dir, keystore)
    }

    /// Bootstrapping with the minimal config and a `:memory:` database succeeds.
    #[tokio::test]
    async fn bootstrap_with_memory_db_succeeds() {
        let config = RoboticusConfig::from_str(BOOTSTRAP_CONFIG).expect("parse config");
        let result = bootstrap(config).await;
        assert!(
            result.is_ok(),
            "bootstrap with :memory: should succeed: {:?}",
            result.err()
        );
    }

    /// Every channel is enabled but its credentials are absent or empty, and the
    /// Obsidian vault path points at a directory that was never created;
    /// bootstrap is still expected to return `Ok`.
    #[tokio::test]
    async fn bootstrap_handles_enabled_channels_with_missing_credentials() {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("state.db");
        let cfg = format!(
            r#"
[agent]
name = "T"
id = "t"

[server]
bind = "127.0.0.1"
port = 18789

[database]
path = "{db_path}"

[models]
primary = "ollama/qwen3:8b"

[channels.telegram]
enabled = true
token_env = "TEST_TELEGRAM_MISSING"

[channels.whatsapp]
enabled = true
token_env = "TEST_WHATSAPP_MISSING"
phone_number_id = ""

[channels.discord]
enabled = true
token_env = "TEST_DISCORD_MISSING"

[channels.signal]
enabled = true
phone_number = ""

[channels.email]
enabled = true
password_env = "TEST_EMAIL_PASSWORD_MISSING"

[channels.voice]
enabled = true

[obsidian]
enabled = true
vault_path = "{vault_path}"
watch_for_changes = true
"#,
            db_path = db_path.display(),
            vault_path = dir.path().join("missing-vault").display(),
        );
        let config = RoboticusConfig::from_str(&cfg).expect("parse config");
        let result = bootstrap(config).await;
        // The channels here are enabled (not disabled); what is missing is their
        // credentials, so the failure message says exactly that.
        assert!(
            result.is_ok(),
            "bootstrap should tolerate enabled channels with missing credentials and invalid obsidian path: {:?}",
            result.err()
        );
    }

    /// Seeds two sub-agent rows (one with role `subagent`, one with role
    /// `model-proxy`) into an on-disk database, then bootstraps against that
    /// same database file. Only the success of bootstrap is asserted.
    #[tokio::test]
    async fn bootstrap_loads_taskable_subagents_from_database() {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("state.db");
        let db = roboticus_db::Database::new(db_path.to_str().unwrap()).unwrap();

        roboticus_db::agents::upsert_sub_agent(
            &db,
            &roboticus_db::agents::SubAgentRow {
                id: "sa-1".into(),
                name: "finance_specialist".into(),
                display_name: Some("Finance Specialist".into()),
                model: "auto".into(),
                fallback_models_json: Some("[]".into()),
                role: "subagent".into(),
                description: Some("Handles finance tasks".into()),
                skills_json: Some(r#"["budgeting"]"#.into()),
                enabled: true,
                session_count: 0,
                last_used_at: None,
            },
        )
        .unwrap();
        roboticus_db::agents::upsert_sub_agent(
            &db,
            &roboticus_db::agents::SubAgentRow {
                id: "sa-2".into(),
                name: "observer".into(),
                display_name: Some("Observer".into()),
                model: "ollama/qwen3:8b".into(),
                fallback_models_json: Some("[]".into()),
                role: "model-proxy".into(),
                description: Some("Not taskable".into()),
                skills_json: None,
                enabled: true,
                session_count: 0,
                last_used_at: None,
            },
        )
        .unwrap();

        let cfg = format!(
            r#"
[agent]
name = "T"
id = "t"

[server]
bind = "127.0.0.1"
port = 18789

[database]
path = "{db_path}"

[models]
primary = "ollama/qwen3:8b"
"#,
            db_path = db_path.display(),
        );
        let config = RoboticusConfig::from_str(&cfg).expect("parse config");
        let result = bootstrap(config).await;
        assert!(
            result.is_ok(),
            "bootstrap should register enabled taskable subagents from db: {:?}",
            result.err()
        );
    }

    /// Calling the cleanup on a directory that does not exist must not panic.
    #[test]
    fn cleanup_old_logs_no_panic_on_missing_dir() {
        // A child of a fresh temp dir that is never created: guaranteed to be
        // absent and portable, unlike a hard-coded `/tmp/...` path.
        let dir = tempfile::tempdir().unwrap();
        let missing = dir.path().join("roboticus-test-nonexistent-dir-cleanup");
        assert!(!missing.exists(), "precondition: directory must not exist");
        cleanup_old_logs(&missing, 30);
    }

    /// A freshly written `.log` file survives a 30-day retention window.
    #[test]
    fn cleanup_old_logs_keeps_recent_logs() {
        let dir = tempfile::tempdir().unwrap();
        let recent = dir.path().join("recent.log");
        std::fs::write(&recent, "fresh log").unwrap();

        cleanup_old_logs(dir.path(), 30);
        assert!(recent.exists(), "recent logs should remain");
    }

    /// Files without a `.log` extension are left alone even with a zero-day window.
    #[test]
    fn cleanup_old_logs_ignores_non_log_files() {
        let dir = tempfile::tempdir().unwrap();
        let txt = dir.path().join("data.txt");
        std::fs::write(&txt, "keep me").unwrap();

        cleanup_old_logs(dir.path(), 0);
        assert!(txt.exists(), "non-log files should not be deleted");
    }

    /// Running the cleanup on an empty directory must not panic.
    #[test]
    fn cleanup_old_logs_empty_dir() {
        let dir = tempfile::tempdir().unwrap();
        cleanup_old_logs(dir.path(), 1);
    }

    /// `subagent` and `specialist` (in any letter case tested here) are taskable;
    /// `model-proxy` is not.
    #[test]
    fn taskable_subagent_roles_are_strict() {
        assert!(is_taskable_subagent_role("subagent"));
        assert!(is_taskable_subagent_role("specialist"));
        assert!(is_taskable_subagent_role("SubAgent"));
        assert!(!is_taskable_subagent_role("model-proxy"));
    }

    /// Resolution order: a `keystore:` reference that exists wins, a missing
    /// reference falls back to the environment variable, and with neither the
    /// result is empty.
    #[test]
    fn resolve_token_prefers_keystore_reference_then_env_then_empty() {
        let (_dir, keystore) = unlocked_keystore();
        keystore.set("telegram_bot_token", "from_keystore").unwrap();

        let _env = EnvGuard::set("TEST_TELEGRAM_TOKEN", "from_env");
        let token = resolve_token(
            &Some("keystore:telegram_bot_token".to_string()),
            "TEST_TELEGRAM_TOKEN",
            &keystore,
        );
        assert_eq!(token, "from_keystore");

        let fallback = resolve_token(
            &Some("keystore:missing".to_string()),
            "TEST_TELEGRAM_TOKEN",
            &keystore,
        );
        assert_eq!(fallback, "from_env");

        let empty = resolve_token(&None, "", &keystore);
        assert!(empty.is_empty());
    }

    /// Without a keystore reference the environment variable value is returned.
    #[test]
    fn resolve_token_uses_env_when_no_keystore_ref_is_provided() {
        let (_dir, keystore) = unlocked_keystore();

        let _env = EnvGuard::set("TEST_SIGNAL_TOKEN", "from_env_only");
        let token = resolve_token(&None, "TEST_SIGNAL_TOKEN", &keystore);
        assert_eq!(token, "from_env_only");
    }

    /// An environment variable that is set but empty resolves to an empty token.
    #[test]
    fn resolve_token_returns_empty_for_empty_env_var() {
        let (_dir, keystore) = unlocked_keystore();

        let _env = EnvGuard::set("TEST_EMPTY_TOKEN", "");
        let token = resolve_token(&None, "TEST_EMPTY_TOKEN", &keystore);
        assert!(token.is_empty());
    }

    /// With a zero-day window an existing `.log` file is removed.
    #[test]
    fn cleanup_old_logs_can_prune_when_window_is_zero_days() {
        let dir = tempfile::tempdir().unwrap();
        let old_log = dir.path().join("old.log");
        std::fs::write(&old_log, "stale").unwrap();
        // Let a little wall-clock time pass so the file is strictly older than "now".
        std::thread::sleep(std::time::Duration::from_millis(10));
        cleanup_old_logs(dir.path(), 0);
        assert!(
            !old_log.exists(),
            "zero-day retention window should prune existing .log files"
        );
    }
}