Skip to main content

roboticus_server/
lib.rs

1//! # roboticus-server
2//!
3//! Top-level binary crate that assembles all Roboticus workspace crates into a
4//! single runtime. The [`bootstrap()`] function initializes the database,
5//! wallet, LLM pipeline, agent loop, channel adapters, and background daemons,
6//! then returns an axum `Router` ready to serve.
7//!
8//! ## Key Types
9//!
10//! - [`AppState`] -- Shared application state passed to all route handlers
11//! - [`PersonalityState`] -- Loaded personality files (OS, firmware, identity)
12//! - [`EventBus`] -- Tokio broadcast channel for WebSocket event push
13//!
14//! ## Modules
15//!
16//! - `api` -- REST API mount point, `build_router()`, route modules
17//! - `auth` -- API key authentication middleware layer
18//! - `rate_limit` -- Global + per-IP rate limiting (sliding window)
19//! - `dashboard` -- Embedded SPA serving (compile-time or filesystem)
20//! - `ws` -- WebSocket upgrade and event broadcasting
21//! - `cli/` -- CLI command handlers (serve, status, sessions, memory, wallet, etc.)
22//! - `daemon` -- Daemon install, status, uninstall
23//! - `migrate/` -- Migration engine, skill import/export
24//! - `plugins` -- Plugin registry initialization and loading
25//!
26//! ## Bootstrap Sequence
27//!
28//! 1. Parse CLI → load config → init DB → load wallet → generate HMAC secret
29//! 2. Init LLM client + router + embedding → load semantic cache
30//! 3. Init agent loop + tool registry + memory retriever
//! 4. Register channel adapters (Telegram, WhatsApp, Discord, Signal, Email)
32//! 5. Spawn background daemons (heartbeat, cron, cache flush, ANN rebuild)
33//! 6. Build axum router with auth + CORS + rate limiting
34
35// ── Re-exports from roboticus-api ──────────────────────────────
36pub use roboticus_api as api_crate;
37pub use roboticus_api::abuse;
38pub use roboticus_api::api;
39pub use roboticus_api::auth;
40pub use roboticus_api::config_runtime;
41pub use roboticus_api::cron_runtime;
42pub use roboticus_api::dashboard;
43pub use roboticus_api::rate_limit;
44pub use roboticus_api::ws;
45pub use roboticus_api::ws_ticket;
46pub use roboticus_api::{
47    AppState, EventBus, PersonalityState, TicketStore, build_dashboard_html, build_mcp_router,
48    build_public_router, build_router, dashboard_handler, ws_route,
49};
50
51// ── Re-exports from roboticus-cli ─────────────────────────────
52pub use roboticus_cli::cli;
53pub use roboticus_cli::migrate;
54
55// ── Server-local modules ─────────────────────────────────────
56pub mod config_maintenance;
57pub mod daemon;
58pub mod plugins;
59pub use roboticus_cli::state_hygiene;
60
61use std::sync::Arc;
62use std::sync::OnceLock;
63use std::sync::atomic::{AtomicBool, Ordering};
64use std::time::Duration;
65
66use tokio::sync::RwLock;
67use tower_http::cors::CorsLayer;
68
69use auth::ApiKeyLayer;
70use roboticus_agent::policy::{
71    AuthorityRule, CommandSafetyRule, FinancialRule, PathProtectionRule, PolicyEngine,
72    RateLimitRule, ValidationRule,
73};
74use roboticus_agent::subagents::SubagentRegistry;
75use roboticus_browser::Browser;
76use roboticus_channels::ChannelAdapter;
77use roboticus_channels::a2a::A2aProtocol;
78use roboticus_channels::router::ChannelRouter;
79use roboticus_channels::telegram::TelegramAdapter;
80use roboticus_channels::whatsapp::WhatsAppAdapter;
81use roboticus_core::RoboticusConfig;
82use roboticus_db::Database;
83use roboticus_llm::LlmService;
84use roboticus_llm::OAuthManager;
85use roboticus_wallet::{WalletPaymentHandler, WalletService};
86
87use roboticus_agent::approvals::ApprovalManager;
88use roboticus_agent::obsidian::ObsidianVault;
89use roboticus_agent::obsidian_tools::{ObsidianReadTool, ObsidianSearchTool, ObsidianWriteTool};
90use roboticus_agent::tools::{
91    BashTool, EchoTool, EditFileTool, GlobFilesTool, ListDirectoryTool, ReadFileTool,
92    ScriptRunnerTool, SearchFilesTool, ToolRegistry, WriteFileTool,
93};
94use roboticus_channels::discord::DiscordAdapter;
95use roboticus_channels::email::EmailAdapter;
96use roboticus_channels::signal::SignalAdapter;
97use roboticus_channels::voice::{VoiceConfig, VoicePipeline};
98
99use rate_limit::GlobalRateLimitLayer;
100
/// Runtime gate for the stderr log layer: set by `enable_stderr_logging` and
/// read by the filter that `init_logging` attaches to that layer. Starts closed.
static STDERR_ENABLED: AtomicBool = AtomicBool::new(false);
/// Holds the `WorkerGuard` returned by `tracing_appender::non_blocking` so the
/// guard is not dropped when `init_logging` returns.
static LOG_GUARD: OnceLock<tracing_appender::non_blocking::WorkerGuard> = OnceLock::new();
103
/// Reports whether `role` names a sub-agent kind that may be registered as a
/// taskable agent.
///
/// The comparison ignores ASCII case and accepts exactly `subagent`,
/// `specialist`, and `observer`; surrounding whitespace is not trimmed.
fn is_taskable_subagent_role(role: &str) -> bool {
    const TASKABLE_ROLES: [&str; 3] = ["subagent", "specialist", "observer"];

    TASKABLE_ROLES
        .iter()
        .any(|candidate| role.eq_ignore_ascii_case(candidate))
}
109
110pub fn enable_stderr_logging() {
111    STDERR_ENABLED.store(true, Ordering::Release);
112}
113
114fn init_logging(config: &RoboticusConfig) {
115    use tracing_appender::rolling::{RollingFileAppender, Rotation};
116    use tracing_subscriber::EnvFilter;
117    use tracing_subscriber::Layer;
118    use tracing_subscriber::filter::filter_fn;
119    use tracing_subscriber::fmt;
120    use tracing_subscriber::layer::SubscriberExt;
121    use tracing_subscriber::util::SubscriberInitExt;
122
123    let level = config.agent.log_level.as_str();
124    // Promote hyper errors to warn so IncompleteMessage and connection
125    // resets are visible in production logs (they default to DEBUG).
126    let base_filter = format!("{level},hyper=warn,h2=warn,rustls=warn");
127    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&base_filter));
128
129    let stderr_gate = filter_fn(|_| STDERR_ENABLED.load(Ordering::Acquire));
130
131    let log_dir = &config.server.log_dir;
132    let stderr_layer = fmt::layer()
133        .with_writer(std::io::stderr)
134        .with_filter(stderr_gate);
135
136    let file_appender = std::fs::create_dir_all(log_dir).ok().and_then(|_| {
137        RollingFileAppender::builder()
138            .rotation(Rotation::DAILY)
139            .filename_prefix("roboticus")
140            .filename_suffix("log")
141            .build(log_dir)
142            .map_err(|e| {
143                eprintln!(
144                    "warning: failed to initialize file logging in {}: {e}",
145                    log_dir.display()
146                );
147                e
148            })
149            .ok()
150    });
151
152    if let Some(file_appender) = file_appender {
153        let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
154        let _ = LOG_GUARD.set(guard);
155
156        let file_layer = fmt::layer()
157            .with_writer(non_blocking)
158            .with_ansi(false)
159            .json();
160
161        let _ = tracing_subscriber::registry()
162            .with(filter)
163            .with(stderr_layer)
164            .with(file_layer)
165            .try_init();
166    } else {
167        let _ = tracing_subscriber::registry()
168            .with(filter)
169            .with(stderr_layer)
170            .try_init();
171    }
172
173    cleanup_old_logs(log_dir, config.server.log_max_days);
174}
175
/// Deletes `.log` files in `log_dir` whose modification time is more than
/// `max_days` days in the past.
///
/// Only direct children of `log_dir` with the exact extension `log` are
/// considered. The whole operation is best-effort: an unreadable directory,
/// unreadable entries or metadata, and failed removals are all skipped
/// silently. A `max_days` of 0 puts the cutoff at the current time, so every
/// `.log` file modified before this call is removed.
fn cleanup_old_logs(log_dir: &std::path::Path, max_days: u32) {
    const SECS_PER_DAY: u64 = 86_400;

    let retention = std::time::Duration::from_secs(u64::from(max_days) * SECS_PER_DAY);
    // `SystemTime - Duration` panics when the result is not representable on
    // the platform. With no representable cutoff, no file can be older than
    // it, so there is nothing to delete.
    let Some(cutoff) = std::time::SystemTime::now().checked_sub(retention) else {
        return;
    };

    let Ok(entries) = std::fs::read_dir(log_dir) else {
        return;
    };

    for entry in entries.flatten() {
        let path = entry.path();
        if path.extension().and_then(|ext| ext.to_str()) != Some("log") {
            continue;
        }

        let is_stale = entry
            .metadata()
            .and_then(|meta| meta.modified())
            .is_ok_and(|modified| modified < cutoff);
        if is_stale {
            // best-effort: log rotation cleanup failure is non-critical
            let _ = std::fs::remove_file(&path);
        }
    }
}
199
200/// Resolve a channel token: keystore reference first, then env var fallback.
201fn resolve_token(
202    token_ref: &Option<String>,
203    token_env: &str,
204    keystore: &roboticus_core::Keystore,
205) -> String {
206    if let Some(r) = token_ref
207        && let Some(name) = r.strip_prefix("keystore:")
208    {
209        if let Some(val) = keystore.get(name) {
210            return val;
211        }
212        tracing::warn!(key = %name, "keystore reference not found, falling back to env var");
213    }
214    if !token_env.is_empty() {
215        return match std::env::var(token_env) {
216            Ok(val) if !val.is_empty() => val,
217            Ok(_) => {
218                tracing::warn!(env_var = %token_env, "API key env var is set but empty");
219                String::new()
220            }
221            Err(_) => {
222                tracing::warn!(env_var = %token_env, "API key env var is not set");
223                String::new()
224            }
225        };
226    }
227    String::new()
228}
229
230/// Builds the application state and router from config. Used by the binary and by tests.
231pub async fn bootstrap(
232    config: RoboticusConfig,
233) -> Result<axum::Router, Box<dyn std::error::Error>> {
234    bootstrap_with_config_path(config, None).await
235}
236
237/// Lightweight bootstrap phase tracker. Prints dots while running,
238/// then appends `ok` or `(Xs)` when done.
239struct BootPhase {
240    start: std::time::Instant,
241    dot_handle: tokio::task::JoinHandle<()>,
242}
243
244impl BootPhase {
245    fn start(label: &str) -> Self {
246        use std::io::Write;
247        eprint!("  {label} ");
248        let _ = std::io::stderr().flush();
249        let dot_handle = tokio::spawn(async {
250            loop {
251                tokio::time::sleep(std::time::Duration::from_millis(500)).await;
252                eprint!(".");
253                let _ = std::io::stderr().flush();
254            }
255        });
256        Self {
257            start: std::time::Instant::now(),
258            dot_handle,
259        }
260    }
261
262    fn done(self) {
263        self.dot_handle.abort();
264        let elapsed = self.start.elapsed();
265        if elapsed.as_millis() >= 500 {
266            eprintln!("({:.1}s)", elapsed.as_secs_f64());
267        } else {
268            eprintln!("ok");
269        }
270    }
271}
272
273pub async fn bootstrap_with_config_path(
274    config: RoboticusConfig,
275    config_path: Option<std::path::PathBuf>,
276) -> Result<axum::Router, Box<dyn std::error::Error>> {
277    init_logging(&config);
278
279    let personality_state = api::PersonalityState::from_workspace(&config.agent.workspace);
280
281    if !personality_state.os_text.is_empty() {
282        tracing::info!(
283            personality = %personality_state.identity.name,
284            generated_by = %personality_state.identity.generated_by,
285            "Loaded personality files from workspace"
286        );
287    } else {
288        tracing::info!("No personality files found in workspace, using defaults");
289    }
290
291    let db_path = config.database.path.to_string_lossy().to_string();
292    let bp = BootPhase::start("database");
293    let db = Database::new(&db_path)?;
294    bp.done();
295    match crate::state_hygiene::run_state_hygiene(&config.database.path) {
296        Ok(report) if report.changed => {
297            tracing::info!(
298                changed_rows = report.changed_rows,
299                subagent_rows_normalized = report.subagent_rows_normalized,
300                cron_payload_rows_repaired = report.cron_payload_rows_repaired,
301                cron_jobs_disabled_invalid_expr = report.cron_jobs_disabled_invalid_expr,
302                "applied startup mechanic checks"
303            );
304        }
305        Ok(_) => {}
306        Err(e) => tracing::warn!(error = %e, "startup mechanic checks failed"),
307    }
308    match roboticus_db::sessions::backfill_nicknames(&db) {
309        Ok(0) => {}
310        Ok(n) => tracing::info!(count = n, "Backfilled session nicknames"),
311        Err(e) => tracing::warn!(error = %e, "Failed to backfill session nicknames"),
312    }
313    let bp = BootPhase::start("LLM service");
314    let mut llm = LlmService::new(&config)?;
315    // Seed quality tracker with recent observations so metascore routing
316    // has a warm start instead of assuming 0.8 for every model.
317    match roboticus_db::metrics::recent_quality_scores(&db, 200) {
318        Ok(scores) if !scores.is_empty() => llm.quality.seed_from_history(&scores),
319        Ok(_) => {}
320        Err(e) => tracing::warn!(error = %e, "failed to seed quality tracker from history"),
321    }
322    bp.done();
323    let bp = BootPhase::start("wallet");
324    let wallet = match tokio::time::timeout(
325        std::time::Duration::from_secs(30),
326        WalletService::new(&config),
327    )
328    .await
329    {
330        Ok(result) => result?,
331        Err(_) => {
332            bp.done();
333            return Err("wallet init timed out (30s) — check RPC endpoint connectivity".into());
334        }
335    };
336    bp.done();
337
338    // Wire x402 payment handler: when the LLM client hits an HTTP 402, it
339    // signs an EIP-3009 authorization via the agent wallet and retries.
340    // The treasury policy enforces per-payment caps from configuration.
341    let wallet_arc = Arc::new(wallet.wallet.clone());
342    let treasury_policy = roboticus_wallet::treasury::TreasuryPolicy::new(&config.treasury);
343    let x402_handler =
344        Arc::new(WalletPaymentHandler::new(wallet_arc).with_treasury_policy(treasury_policy));
345    llm.set_payment_handler(x402_handler);
346
347    let bp = BootPhase::start("protocols + plugins");
348    let a2a = A2aProtocol::new(config.a2a.clone());
349    let plugin_env = {
350        let mut env = std::collections::HashMap::new();
351        env.insert("ROBOTICUS_DELEGATION_DEPTH".into(), "0".into());
352        env.insert(
353            "ROBOTICUS_WORKSPACE".into(),
354            config.agent.workspace.display().to_string(),
355        );
356        if let Some(ref ws) = config.skills.workspace_dir {
357            env.insert("ROBOTICUS_SKILLS_DIR".into(), ws.display().to_string());
358        }
359        env
360    };
361    let plugin_registry = plugins::init_plugin_registry(&config.plugins, plugin_env).await;
362    let mut policy_engine = PolicyEngine::new();
363    policy_engine.add_rule(Box::new(AuthorityRule));
364    policy_engine.add_rule(Box::new(CommandSafetyRule));
365    policy_engine.add_rule(Box::new(FinancialRule::new(
366        config.treasury.per_payment_cap,
367    )));
368    policy_engine.add_rule(Box::new(PathProtectionRule::from_config(
369        &config.security.filesystem,
370    )));
371    policy_engine.add_rule(Box::new(RateLimitRule::default()));
372    policy_engine.add_rule(Box::new(ValidationRule));
373    let policy_engine = Arc::new(policy_engine);
374    let browser = Arc::new(Browser::new(config.browser.clone()));
375    let registry = Arc::new(SubagentRegistry::new(4, vec![]));
376
377    if let Ok(sub_agents) = roboticus_db::agents::list_enabled_sub_agents(&db) {
378        for sa in &sub_agents {
379            if !is_taskable_subagent_role(&sa.role) {
380                continue;
381            }
382            let resolved_model = match sa.model.trim().to_ascii_lowercase().as_str() {
383                "auto" | "orchestrator" => llm.router.select_model().to_string(),
384                _ => sa.model.clone(),
385            };
386            let fixed_skills = sa
387                .skills_json
388                .as_deref()
389                .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok())
390                .unwrap_or_default();
391            let agent_config = roboticus_agent::subagents::AgentInstanceConfig {
392                id: sa.name.clone(),
393                name: sa.display_name.clone().unwrap_or_else(|| sa.name.clone()),
394                model: resolved_model,
395                skills: fixed_skills,
396                allowed_subagents: vec![],
397                max_concurrent: 4,
398            };
399            if let Err(e) = registry.register(agent_config).await {
400                tracing::warn!(agent = %sa.name, err = %e, "failed to register sub-agent");
401            } else if let Err(e) = registry.start_agent(&sa.name).await {
402                tracing::warn!(agent = %sa.name, err = %e, "failed to auto-start sub-agent");
403            }
404        }
405        if !sub_agents.is_empty() {
406            tracing::info!(
407                count = sub_agents.len(),
408                "registered sub-agents from database"
409            );
410        }
411    }
412
413    let event_bus = EventBus::new(256);
414
415    let keystore =
416        roboticus_core::keystore::Keystore::new(roboticus_core::keystore::Keystore::default_path());
417    if let Err(e) = keystore.unlock_machine() {
418        tracing::warn!("keystore auto-unlock failed: {e}");
419    }
420    let keystore = Arc::new(keystore);
421
422    bp.done();
423    let bp = BootPhase::start("channels");
424    let channel_router = Arc::new(ChannelRouter::with_store(db.clone()).await);
425    let telegram: Option<Arc<TelegramAdapter>> =
426        if let Some(ref tg_config) = config.channels.telegram {
427            if tg_config.enabled {
428                let token = resolve_token(&tg_config.token_ref, &tg_config.token_env, &keystore);
429                if !token.is_empty() {
430                    let adapter = Arc::new(TelegramAdapter::with_config(
431                        token,
432                        tg_config.poll_timeout_seconds,
433                        tg_config.allowed_chat_ids.clone(),
434                        tg_config.webhook_secret.clone(),
435                        config.security.deny_on_empty_allowlist,
436                    ));
437                    channel_router
438                        .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
439                        .await;
440                    tracing::info!("Telegram adapter registered");
441                    if tg_config.webhook_secret.is_none() {
442                        tracing::warn!(
443                            "Telegram webhook_secret not set; webhook endpoint will reject with 503"
444                        );
445                    }
446                    Some(adapter)
447                } else {
448                    tracing::warn!(
449                        token_env = %tg_config.token_env,
450                        "Telegram enabled but token is empty"
451                    );
452                    None
453                }
454            } else {
455                None
456            }
457        } else {
458            None
459        };
460
461    let whatsapp: Option<Arc<WhatsAppAdapter>> =
462        if let Some(ref wa_config) = config.channels.whatsapp {
463            if wa_config.enabled {
464                let token = resolve_token(&wa_config.token_ref, &wa_config.token_env, &keystore);
465                if !token.is_empty() && !wa_config.phone_number_id.is_empty() {
466                    let adapter = Arc::new(WhatsAppAdapter::with_config(
467                        token,
468                        wa_config.phone_number_id.clone(),
469                        wa_config.verify_token.clone(),
470                        wa_config.allowed_numbers.clone(),
471                        wa_config.app_secret.clone(),
472                        config.security.deny_on_empty_allowlist,
473                    )?);
474                    channel_router
475                        .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
476                        .await;
477                    tracing::info!("WhatsApp adapter registered");
478                    if wa_config.app_secret.is_none() {
479                        tracing::warn!(
480                            "WhatsApp app_secret not set; webhook endpoint will reject with 503"
481                        );
482                    }
483                    Some(adapter)
484                } else {
485                    tracing::warn!("WhatsApp enabled but token env or phone_number_id is empty");
486                    None
487                }
488            } else {
489                None
490            }
491        } else {
492            None
493        };
494
495    let discord: Option<Arc<DiscordAdapter>> = if let Some(ref dc_config) = config.channels.discord
496    {
497        if dc_config.enabled {
498            let token = resolve_token(&dc_config.token_ref, &dc_config.token_env, &keystore);
499            if !token.is_empty() {
500                let adapter = Arc::new(DiscordAdapter::with_config(
501                    token,
502                    dc_config.allowed_guild_ids.clone(),
503                    config.security.deny_on_empty_allowlist,
504                ));
505                channel_router
506                    .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
507                    .await;
508                // Start WebSocket gateway for real-time event reception
509                if let Err(e) = adapter.connect_gateway().await {
510                    tracing::error!(error = %e, "Failed to connect Discord gateway");
511                }
512                tracing::info!("Discord adapter registered with gateway");
513                Some(adapter)
514            } else {
515                tracing::warn!(
516                    token_env = %dc_config.token_env,
517                    "Discord enabled but token env var is empty"
518                );
519                None
520            }
521        } else {
522            None
523        }
524    } else {
525        None
526    };
527
528    let signal: Option<Arc<SignalAdapter>> = if let Some(ref sig_config) = config.channels.signal {
529        if sig_config.enabled {
530            if !sig_config.phone_number.is_empty() {
531                let adapter = Arc::new(SignalAdapter::with_config(
532                    sig_config.phone_number.clone(),
533                    sig_config.daemon_url.clone(),
534                    sig_config.allowed_numbers.clone(),
535                    config.security.deny_on_empty_allowlist,
536                ));
537                channel_router
538                    .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
539                    .await;
540                tracing::info!("Signal adapter registered");
541                Some(adapter)
542            } else {
543                tracing::warn!("Signal enabled but phone_number is empty");
544                None
545            }
546        } else {
547            None
548        }
549    } else {
550        None
551    };
552
553    let email: Option<Arc<EmailAdapter>> = if config.channels.email.enabled {
554        let email_cfg = &config.channels.email;
555        let password = if email_cfg.password_env.is_empty() {
556            String::new()
557        } else {
558            match std::env::var(&email_cfg.password_env) {
559                Ok(val) => val,
560                Err(_) => {
561                    tracing::warn!(env_var = %email_cfg.password_env, "email password env var is not set");
562                    String::new()
563                }
564            }
565        };
566        if email_cfg.smtp_host.is_empty()
567            || email_cfg.username.is_empty()
568            || password.is_empty()
569            || email_cfg.from_address.is_empty()
570        {
571            tracing::warn!("Email enabled but SMTP credentials are incomplete");
572            None
573        } else {
574            match EmailAdapter::new(
575                email_cfg.from_address.clone(),
576                email_cfg.smtp_host.clone(),
577                email_cfg.smtp_port,
578                email_cfg.imap_host.clone(),
579                email_cfg.imap_port,
580                email_cfg.username.clone(),
581                password,
582            ) {
583                Ok(email_adapter) => {
584                    // Resolve OAuth2 token from env if configured
585                    let oauth2_token =
586                        if email_cfg.use_oauth2 && !email_cfg.oauth2_token_env.is_empty() {
587                            std::env::var(&email_cfg.oauth2_token_env).ok()
588                        } else {
589                            None
590                        };
591                    let adapter = Arc::new(
592                        email_adapter
593                            .with_allowed_senders(email_cfg.allowed_senders.clone())
594                            .with_deny_on_empty(config.security.deny_on_empty_allowlist)
595                            .with_poll_interval(std::time::Duration::from_secs(
596                                email_cfg.poll_interval_seconds,
597                            ))
598                            .with_oauth2_token(oauth2_token)
599                            .with_imap_idle_enabled(email_cfg.imap_idle_enabled),
600                    );
601                    channel_router
602                        .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
603                        .await;
604                    tracing::info!("Email adapter registered");
605
606                    // Start background IMAP listener if IMAP is configured
607                    if !email_cfg.imap_host.is_empty()
608                        && let Err(e) = adapter.start_imap_listener().await
609                    {
610                        tracing::error!(error = %e, "Failed to start email IMAP listener");
611                    }
612
613                    Some(adapter)
614                }
615                Err(e) => {
616                    tracing::error!(error = %e, "Failed to create email adapter");
617                    None
618                }
619            }
620        }
621    } else {
622        None
623    };
624
625    let voice: Option<Arc<RwLock<VoicePipeline>>> = if config.channels.voice.enabled {
626        let mut voice_config = VoiceConfig::default();
627        if let Some(stt) = &config.channels.voice.stt_model {
628            voice_config.stt_model = stt.clone();
629        }
630        if let Some(tts) = &config.channels.voice.tts_model {
631            voice_config.tts_model = tts.clone();
632        }
633        if let Some(v) = &config.channels.voice.tts_voice {
634            voice_config.tts_voice = v.clone();
635        }
636        if let Ok(key) = std::env::var("OPENAI_API_KEY")
637            && !key.is_empty()
638        {
639            voice_config.api_key = Some(key);
640        }
641        tracing::info!("Voice pipeline initialized");
642        Some(Arc::new(RwLock::new(VoicePipeline::new(voice_config))))
643    } else {
644        None
645    };
646
647    let hmac_secret = {
648        use rand::RngCore;
649        let mut buf = vec![0u8; 32];
650        rand::rngs::OsRng.fill_bytes(&mut buf);
651        buf
652    };
653
654    let retriever = Arc::new(roboticus_agent::retrieval::MemoryRetriever::new(
655        config.memory.clone(),
656    ));
657
658    let mut tool_registry = ToolRegistry::new();
659    tool_registry.register(Box::new(EchoTool));
660    tool_registry.register(Box::new(BashTool));
661    tool_registry.register(Box::new(ScriptRunnerTool::new(
662        config.skills.clone(),
663        config.security.filesystem.clone(),
664    )));
665    tool_registry.register(Box::new(ReadFileTool));
666    tool_registry.register(Box::new(WriteFileTool));
667    tool_registry.register(Box::new(EditFileTool));
668    tool_registry.register(Box::new(ListDirectoryTool));
669    tool_registry.register(Box::new(GlobFilesTool));
670    tool_registry.register(Box::new(SearchFilesTool));
671
672    // Cron tool — manages scheduled jobs through the tool registry
673    use roboticus_agent::tools::CronTool;
674    tool_registry.register(Box::new(CronTool));
675
676    // Introspection tools — read-only runtime probes for agent self-awareness
677    use roboticus_agent::tools::{
678        GetChannelHealthTool, GetMemoryStatsTool, GetRuntimeContextTool, GetSubagentStatusTool,
679        RecallMemoryTool,
680    };
681    tool_registry.register(Box::new(GetRuntimeContextTool));
682    tool_registry.register(Box::new(GetMemoryStatsTool));
683    tool_registry.register(Box::new(GetChannelHealthTool));
684    tool_registry.register(Box::new(GetSubagentStatusTool));
685    tool_registry.register(Box::new(RecallMemoryTool));
686
687    // Data tools — agent-managed tables via hippocampus
688    use roboticus_agent::tools::{AlterTableTool, CreateTableTool, DropTableTool};
689    tool_registry.register(Box::new(CreateTableTool));
690    tool_registry.register(Box::new(AlterTableTool));
691    tool_registry.register(Box::new(DropTableTool));
692
693    // Bridge active plugin tools into the same runtime registry used by ReAct.
694    bp.done();
695    let bp = BootPhase::start("plugin tools");
696    plugins::register_plugin_tools(&mut tool_registry, Arc::clone(&plugin_registry)).await;
697    bp.done();
698
699    // Obsidian vault integration
700    let obsidian_vault: Option<Arc<RwLock<ObsidianVault>>> = if config.obsidian.enabled {
701        match ObsidianVault::from_config(&config.obsidian) {
702            Ok(vault) => {
703                let vault = Arc::new(RwLock::new(vault));
704                tool_registry.register(Box::new(ObsidianReadTool::new(Arc::clone(&vault))));
705                tool_registry.register(Box::new(ObsidianWriteTool::new(Arc::clone(&vault))));
706                tool_registry.register(Box::new(ObsidianSearchTool::new(Arc::clone(&vault))));
707                tracing::info!("Obsidian vault integration enabled");
708                Some(vault)
709            }
710            Err(e) => {
711                tracing::warn!(error = %e, "Failed to initialize Obsidian vault");
712                None
713            }
714        }
715    } else {
716        None
717    };
718
719    // Start vault file watcher if configured
720    if let Some(ref vault) = obsidian_vault
721        && config.obsidian.watch_for_changes
722    {
723        match roboticus_agent::obsidian::watcher::VaultWatcher::start(Arc::clone(vault)).await {
724            Ok(_watcher) => {
725                // watcher lives in the spawned task — drop is fine here
726                tracing::info!("Obsidian vault file watcher started");
727            }
728            Err(e) => {
729                tracing::warn!(error = %e, "Failed to start Obsidian vault watcher");
730            }
731        }
732    }
733
734    let tool_registry = Arc::new(tool_registry);
735
736    let capabilities = Arc::new(roboticus_agent::capability::CapabilityRegistry::new());
737    if let Err(e) = capabilities
738        .sync_from_tool_registry(Arc::clone(&tool_registry))
739        .await
740    {
741        tracing::warn!(error = %e, "initial capability sync from tool registry reported errors");
742    }
743
744    let approvals = Arc::new(ApprovalManager::new(config.approvals.clone()));
745
746    let oauth = Arc::new(OAuthManager::new()?);
747
748    let discovery_registry = Arc::new(RwLock::new(
749        roboticus_agent::discovery::DiscoveryRegistry::new(),
750    ));
751    let device_manager = Arc::new(RwLock::new(roboticus_agent::device::DeviceManager::new(
752        roboticus_agent::device::DeviceIdentity::generate(&config.agent.id),
753        config.devices.max_paired_devices,
754    )));
755
756    let mut mcp_clients = roboticus_agent::mcp::McpClientManager::new();
757    for c in &config.mcp.clients {
758        let mut conn =
759            roboticus_agent::mcp::McpClientConnection::new(c.name.clone(), c.url.clone());
760        if let Err(e) = conn.discover() {
761            tracing::warn!(mcp_name = %c.name, error = %e, "MCP client discovery failed at startup");
762        }
763        mcp_clients.add_connection(conn);
764    }
765    let mcp_clients = Arc::new(RwLock::new(mcp_clients));
766
767    let live_mcp = Arc::new(roboticus_agent::mcp::manager::McpConnectionManager::new());
768    let bp = BootPhase::start("MCP connections");
769    match tokio::time::timeout(
770        std::time::Duration::from_secs(30),
771        live_mcp.connect_all(&config.mcp.servers, &capabilities),
772    )
773    .await
774    {
775        Ok(()) => {}
776        Err(_) => {
777            tracing::warn!("MCP connect_all timed out after 30s; some servers may be unavailable");
778        }
779    }
780    bp.done();
781    let live_mcp_tool_count = live_mcp.connected_count().await;
782    if live_mcp_tool_count > 0 {
783        tracing::info!(
784            servers = live_mcp_tool_count,
785            "MCP client connections established"
786        );
787    }
788
789    let mut mcp_server_registry = roboticus_agent::mcp::McpServerRegistry::new();
790    let exported = roboticus_agent::mcp::export_tools_as_mcp(
791        &tool_registry
792            .list()
793            .iter()
794            .map(|t| {
795                (
796                    t.name().to_string(),
797                    t.description().to_string(),
798                    t.parameters_schema(),
799                )
800            })
801            .collect::<Vec<_>>(),
802    );
803    for tool in exported {
804        mcp_server_registry.register_tool(tool);
805    }
806    mcp_server_registry.register_resource(roboticus_agent::mcp::McpResource {
807        uri: "roboticus://sessions/active".to_string(),
808        name: "Active Sessions".to_string(),
809        description: "Active sessions in the local runtime".to_string(),
810        mime_type: "application/json".to_string(),
811    });
812    mcp_server_registry.register_resource(roboticus_agent::mcp::McpResource {
813        uri: "roboticus://metrics/capacity".to_string(),
814        name: "Provider Capacity Stats".to_string(),
815        description: "Current provider utilization and headroom".to_string(),
816        mime_type: "application/json".to_string(),
817    });
818    let mcp_server_registry = Arc::new(RwLock::new(mcp_server_registry));
819
820    let ann_index = roboticus_db::ann::AnnIndex::new(config.memory.ann_index);
821    if config.memory.ann_index {
822        match ann_index.build_from_db(&db) {
823            Ok(count) => {
824                if ann_index.is_built() {
825                    tracing::info!(count, "ANN index built from database");
826                } else {
827                    tracing::info!(
828                        count,
829                        min = ann_index.min_entries_for_index,
830                        "ANN index below threshold, brute-force search will be used"
831                    );
832                }
833            }
834            Err(e) => {
835                tracing::warn!(error = %e, "Failed to build ANN index, falling back to brute-force");
836            }
837        }
838    }
839
840    // Initialize media service for multimodal attachment handling
841    let media_service = if config.multimodal.enabled {
842        match roboticus_channels::media::MediaService::new(&config.multimodal) {
843            Ok(svc) => {
844                tracing::info!(
845                    media_dir = ?config.multimodal.media_dir,
846                    "Media service initialized"
847                );
848                Some(Arc::new(svc))
849            }
850            Err(e) => {
851                tracing::error!(error = %e, "Failed to initialize media service");
852                None
853            }
854        }
855    } else {
856        None
857    };
858
859    let resolved_config_path =
860        config_path.unwrap_or_else(crate::config_runtime::resolve_default_config_path);
861
862    // Build rate limiter once — AppState and the middleware layer share the same
863    // Arc<Mutex<…>> so admin observability sees live counters.
864    let rate_limiter = GlobalRateLimitLayer::new(
865        u64::from(config.server.rate_limit_requests),
866        Duration::from_secs(config.server.rate_limit_window_secs),
867    )
868    .with_per_ip_capacity(u64::from(config.server.per_ip_rate_limit_requests))
869    .with_per_actor_capacity(u64::from(config.server.per_actor_rate_limit_requests))
870    .with_trusted_proxy_cidrs(&config.server.trusted_proxy_cidrs);
871
872    // Semantic classifier: backed by the same embedding client as the rest of
873    // the pipeline. Centroid vectors are computed lazily on first classify call
874    // and cached for the lifetime of the process.
875    let semantic_classifier = Arc::new(
876        roboticus_llm::semantic_classifier::SemanticClassifier::new(llm.embedding.clone()),
877    );
878
879    let state = AppState {
880        db,
881        config: Arc::new(RwLock::new(config.clone())),
882        llm: Arc::new(RwLock::new(llm)),
883        wallet: Arc::new(wallet),
884        a2a: Arc::new(RwLock::new(a2a)),
885        personality: Arc::new(RwLock::new(personality_state)),
886        hmac_secret: Arc::new(hmac_secret),
887        interviews: Arc::new(RwLock::new(std::collections::HashMap::new())),
888        plugins: plugin_registry,
889        policy_engine,
890        browser,
891        registry,
892        event_bus: event_bus.clone(),
893        channel_router,
894        telegram,
895        whatsapp,
896        retriever,
897        ann_index,
898        tools: tool_registry,
899        capabilities,
900        approvals,
901        discord,
902        signal,
903        email,
904        voice,
905        media_service,
906        discovery: discovery_registry,
907        devices: device_manager,
908        mcp_clients,
909        mcp_server: mcp_server_registry,
910        live_mcp,
911        oauth,
912        keystore,
913        obsidian: obsidian_vault,
914        started_at: std::time::Instant::now(),
915        config_path: Arc::new(resolved_config_path.clone()),
916        config_apply_status: crate::config_runtime::status_for_path(&resolved_config_path),
917        pending_specialist_proposals: Arc::new(RwLock::new(std::collections::HashMap::new())),
918        ws_tickets: ws_ticket::TicketStore::new(),
919        rate_limiter: rate_limiter.clone(),
920        semantic_classifier,
921    };
922    let bp = BootPhase::start("capability sync");
923    state.resync_capabilities_from_tools().await;
924    bp.done();
925
926    // Periodic ANN index rebuild (every 10 minutes)
927    if config.memory.ann_index {
928        let ann_db = state.db.clone();
929        let ann_idx = state.ann_index.clone();
930        tokio::spawn(async move {
931            let mut interval = tokio::time::interval(std::time::Duration::from_secs(600));
932            interval.tick().await;
933            loop {
934                interval.tick().await;
935                match ann_idx.rebuild(&ann_db) {
936                    Ok(count) => {
937                        tracing::debug!(count, "ANN index rebuilt");
938                    }
939                    Err(e) => {
940                        tracing::warn!(error = %e, "ANN index rebuild failed");
941                    }
942                }
943            }
944        });
945        tracing::info!("ANN index rebuild daemon spawned (10min interval)");
946    }
947
948    // Load persisted semantic cache
949    {
950        let loaded = roboticus_db::cache::load_cache_entries(&state.db)
951            .inspect_err(|e| tracing::warn!(error = %e, "failed to load semantic cache entries"))
952            .unwrap_or_default();
953        if !loaded.is_empty() {
954            let imported: Vec<(String, roboticus_llm::ExportedCacheEntry)> = loaded
955                .into_iter()
956                .map(|(id, pe)| {
957                    let ttl = pe
958                        .expires_at
959                        .and_then(|e| {
960                            chrono::NaiveDateTime::parse_from_str(&e, "%Y-%m-%dT%H:%M:%S")
961                                .ok()
962                                .or_else(|| {
963                                    chrono::NaiveDateTime::parse_from_str(&e, "%Y-%m-%d %H:%M:%S")
964                                        .ok()
965                                })
966                        })
967                        .map(|exp| {
968                            let now = chrono::Utc::now().naive_utc();
969                            if exp > now {
970                                (exp - now).num_seconds().max(0) as u64
971                            } else {
972                                0
973                            }
974                        })
975                        .unwrap_or(3600);
976
977                    (
978                        id,
979                        roboticus_llm::ExportedCacheEntry {
980                            content: pe.response,
981                            model: pe.model,
982                            tokens_saved: pe.tokens_saved,
983                            hits: pe.hit_count,
984                            involved_tools: false,
985                            embedding: pe.embedding,
986                            ttl_remaining_secs: ttl,
987                        },
988                    )
989                })
990                .collect();
991            let count = imported.len();
992            let llm = state.llm.read().await;
993            llm.cache
994                .lock()
995                .unwrap_or_else(|e| e.into_inner())
996                .import_entries(imported);
997            tracing::info!(count, "Loaded semantic cache from database");
998        }
999    }
1000
1001    // Periodic cache flush (every 5 minutes)
1002    {
1003        let flush_db = state.db.clone();
1004        let flush_llm = Arc::clone(&state.llm);
1005        tokio::spawn(async move {
1006            let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
1007            interval.tick().await; // skip first immediate tick
1008            loop {
1009                interval.tick().await;
1010                let entries = {
1011                    let llm = flush_llm.read().await;
1012                    llm.cache
1013                        .lock()
1014                        .unwrap_or_else(|e| e.into_inner())
1015                        .export_entries()
1016                };
1017                for (hash, entry) in &entries {
1018                    let expires = chrono::Utc::now()
1019                        + chrono::Duration::seconds(entry.ttl_remaining_secs as i64);
1020                    let pe = roboticus_db::cache::PersistedCacheEntry {
1021                        prompt_hash: hash.clone(),
1022                        response: entry.content.clone(),
1023                        model: entry.model.clone(),
1024                        tokens_saved: entry.tokens_saved,
1025                        hit_count: entry.hits,
1026                        embedding: entry.embedding.clone(),
1027                        created_at: chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
1028                        expires_at: Some(expires.format("%Y-%m-%dT%H:%M:%S").to_string()),
1029                    };
1030                    roboticus_db::cache::save_cache_entry(&flush_db, hash, &pe)
1031                        .inspect_err(
1032                            |e| tracing::warn!(error = %e, hash, "failed to persist cache entry"),
1033                        )
1034                        .ok();
1035                }
1036                roboticus_db::cache::evict_expired_cache(&flush_db)
1037                    .inspect_err(|e| tracing::warn!(error = %e, "failed to evict expired cache"))
1038                    .ok();
1039                tracing::debug!(count = entries.len(), "Flushed semantic cache to database");
1040            }
1041        });
1042        tracing::info!("Cache flush daemon spawned (5min interval)");
1043    }
1044
1045    // Start heartbeat domain loops — shutdown_tx MUST live for the server's
1046    // lifetime; dropping it closes the watch channel and all loops exit.
1047    let (heartbeat_shutdown_tx, heartbeat_treasury_state) = {
1048        let hb_wallet = Arc::clone(&state.wallet);
1049        let hb_db = state.db.clone();
1050        let hb_session_cfg = config.session.clone();
1051        let hb_digest_cfg = config.digest.clone();
1052        let hb_agent_id = config.agent.id.clone();
1053        let hb_vault = state.obsidian.clone();
1054        let hb_config = config.heartbeat.clone();
1055        roboticus_schedule::spawn_domain_loops(
1056            hb_wallet,
1057            hb_db,
1058            hb_session_cfg,
1059            hb_digest_cfg,
1060            hb_agent_id,
1061            hb_vault,
1062            hb_config,
1063        )
1064        .await
1065    };
1066    tracing::info!(
1067        treasury_s = config.heartbeat.treasury_interval_seconds,
1068        yield_s = config.heartbeat.yield_interval_seconds,
1069        memory_s = config.heartbeat.memory_interval_seconds,
1070        maintenance_s = config.heartbeat.maintenance_interval_seconds,
1071        session_s = config.heartbeat.session_interval_seconds,
1072        discovery_s = config.heartbeat.discovery_interval_seconds,
1073        "domain loops spawned"
1074    );
1075    // CRITICAL: shutdown_tx and treasury_state MUST outlive the server.
1076    // Dropping shutdown_tx closes the watch channel, causing all domain
1077    // loops to exit immediately. Leaking keeps them alive for the process
1078    // lifetime. treasury_state will be wired into AppState in v0.12.0
1079    // for wallet API route access; until then, leak it too.
1080    std::mem::forget(heartbeat_shutdown_tx);
1081    std::mem::forget(heartbeat_treasury_state);
1082
1083    // Delivery retry queue drain (every 30 seconds)
1084    {
1085        let drain_router = Arc::clone(&state.channel_router);
1086        tokio::spawn(async move {
1087            let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
1088            interval.tick().await;
1089            loop {
1090                interval.tick().await;
1091                drain_router.drain_retry_queue().await;
1092            }
1093        });
1094        tracing::info!("Delivery retry queue drain daemon spawned (30s interval)");
1095    }
1096
1097    // Start cron worker
1098    {
1099        let instance_id = config.agent.id.clone();
1100        let cron_state = state.clone();
1101        tokio::spawn(async move {
1102            crate::cron_runtime::run_cron_worker(cron_state, instance_id).await;
1103        });
1104        tracing::info!("Cron worker spawned");
1105    }
1106
1107    // Periodic mechanic-check sweep (default 6h, per-instance).
1108    {
1109        let db_path = config.database.path.clone();
1110        let interval_secs = std::env::var("ROBOTICUS_MECHANIC_CHECK_INTERVAL_SECS")
1111            .ok()
1112            .or_else(|| std::env::var("ROBOTICUS_STATE_HYGIENE_INTERVAL_SECS").ok())
1113            .and_then(|v| v.parse::<u64>().ok())
1114            .filter(|v| *v >= 300)
1115            .unwrap_or(21_600);
1116        tokio::spawn(async move {
1117            let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
1118            interval.tick().await;
1119            loop {
1120                interval.tick().await;
1121                match crate::state_hygiene::run_state_hygiene(&db_path) {
1122                    Ok(report) if report.changed => {
1123                        tracing::info!(
1124                            changed_rows = report.changed_rows,
1125                            subagent_rows_normalized = report.subagent_rows_normalized,
1126                            cron_payload_rows_repaired = report.cron_payload_rows_repaired,
1127                            cron_jobs_disabled_invalid_expr =
1128                                report.cron_jobs_disabled_invalid_expr,
1129                            "periodic mechanic checks applied"
1130                        );
1131                    }
1132                    Ok(_) => tracing::debug!("periodic mechanic checks: no changes"),
1133                    Err(e) => tracing::warn!(error = %e, "periodic mechanic checks failed"),
1134                }
1135            }
1136        });
1137        tracing::info!(interval_secs, "Mechanic checks daemon spawned");
1138    }
1139
1140    {
1141        let startup_announce_channels = config.channels.startup_announcement_channels();
1142
1143        let display_host = match config.server.bind.as_str() {
1144            "127.0.0.1" | "0.0.0.0" | "::1" | "::" => "localhost",
1145            other => other,
1146        };
1147        let channel_list: Vec<&str> = {
1148            let mut ch = vec!["web"];
1149            if config.channels.telegram.is_some() {
1150                ch.push("telegram");
1151            }
1152            if config.channels.whatsapp.is_some() {
1153                ch.push("whatsapp");
1154            }
1155            if config.channels.discord.is_some() {
1156                ch.push("discord");
1157            }
1158            if config.channels.signal.is_some() {
1159                ch.push("signal");
1160            }
1161            ch
1162        };
1163        let announce_text = format!(
1164            "🤖 *Roboticus Online*\n\n\
1165             🧠 *{}* (`{}`)\n\
1166             ⚡ `{}`\n\
1167             🔀 routing: {}\n\
1168             🌐 `{}:{}`\n\
1169             📡 {}\n\n\
1170             🕐 {}",
1171            config.agent.name,
1172            config.agent.id,
1173            config.models.primary,
1174            config.models.routing.mode,
1175            display_host,
1176            config.server.port,
1177            channel_list.join(" · "),
1178            chrono::Local::now().format("%Y-%m-%d %H:%M %Z"),
1179        );
1180
1181        if startup_announce_channels.iter().any(|c| c == "telegram") {
1182            if let (Some(adapter), Some(tg_cfg)) =
1183                (state.telegram.clone(), config.channels.telegram.as_ref())
1184            {
1185                let announce_targets = tg_cfg.allowed_chat_ids.clone();
1186                if announce_targets.is_empty() {
1187                    tracing::warn!(
1188                        "Telegram startup announcement skipped: channels.telegram.allowed_chat_ids is empty"
1189                    );
1190                } else {
1191                    let text = announce_text.clone();
1192                    tokio::spawn(async move {
1193                        for chat_id in announce_targets {
1194                            let chat = chat_id.to_string();
1195                            match adapter
1196                                .send(roboticus_channels::OutboundMessage {
1197                                    content: text.clone(),
1198                                    recipient_id: chat.clone(),
1199                                    metadata: None,
1200                                })
1201                                .await
1202                            {
1203                                Ok(()) => {
1204                                    tracing::info!(chat_id = %chat, "telegram startup announcement sent")
1205                                }
1206                                Err(e) => {
1207                                    tracing::warn!(chat_id = %chat, error = %e, "telegram startup announcement failed")
1208                                }
1209                            }
1210                        }
1211                    });
1212                }
1213            } else {
1214                tracing::warn!(
1215                    "Telegram startup announcement requested but telegram channel is not enabled/configured"
1216                );
1217            }
1218        }
1219
1220        if startup_announce_channels.iter().any(|c| c == "whatsapp") {
1221            if let (Some(adapter), Some(wa_cfg)) =
1222                (state.whatsapp.clone(), config.channels.whatsapp.as_ref())
1223            {
1224                let targets = wa_cfg.allowed_numbers.clone();
1225                if targets.is_empty() {
1226                    tracing::warn!(
1227                        "WhatsApp startup announcement skipped: channels.whatsapp.allowed_numbers is empty"
1228                    );
1229                } else {
1230                    let text = announce_text.clone();
1231                    tokio::spawn(async move {
1232                        for number in targets {
1233                            match adapter
1234                                .send(roboticus_channels::OutboundMessage {
1235                                    content: text.clone(),
1236                                    recipient_id: number.clone(),
1237                                    metadata: None,
1238                                })
1239                                .await
1240                            {
1241                                Ok(()) => {
1242                                    tracing::info!(recipient = %number, "whatsapp startup announcement sent")
1243                                }
1244                                Err(e) => {
1245                                    tracing::warn!(recipient = %number, error = %e, "whatsapp startup announcement failed")
1246                                }
1247                            }
1248                        }
1249                    });
1250                }
1251            } else {
1252                tracing::warn!(
1253                    "WhatsApp startup announcement requested but whatsapp channel is not enabled/configured"
1254                );
1255            }
1256        }
1257
1258        if startup_announce_channels.iter().any(|c| c == "signal") {
1259            if let (Some(adapter), Some(sig_cfg)) =
1260                (state.signal.clone(), config.channels.signal.as_ref())
1261            {
1262                let targets = sig_cfg.allowed_numbers.clone();
1263                if targets.is_empty() {
1264                    tracing::warn!(
1265                        "Signal startup announcement skipped: channels.signal.allowed_numbers is empty"
1266                    );
1267                } else {
1268                    let text = announce_text.clone();
1269                    tokio::spawn(async move {
1270                        for number in targets {
1271                            match adapter
1272                                .send(roboticus_channels::OutboundMessage {
1273                                    content: text.clone(),
1274                                    recipient_id: number.clone(),
1275                                    metadata: None,
1276                                })
1277                                .await
1278                            {
1279                                Ok(()) => {
1280                                    tracing::info!(recipient = %number, "signal startup announcement sent")
1281                                }
1282                                Err(e) => {
1283                                    tracing::warn!(recipient = %number, error = %e, "signal startup announcement failed")
1284                                }
1285                            }
1286                        }
1287                    });
1288                }
1289            } else {
1290                tracing::warn!(
1291                    "Signal startup announcement requested but signal channel is not enabled/configured"
1292                );
1293            }
1294        }
1295
1296        for ch in &startup_announce_channels {
1297            if ch != "telegram" && ch != "whatsapp" && ch != "signal" {
1298                tracing::warn!(
1299                    channel = %ch,
1300                    "startup announcement requested for channel without recipient mapping support"
1301                );
1302            }
1303        }
1304    }
1305
1306    if state.telegram.is_some() {
1307        let use_polling = config
1308            .channels
1309            .telegram
1310            .as_ref()
1311            .map(|c| !c.webhook_mode)
1312            .unwrap_or(true);
1313        if use_polling {
1314            let poll_state = state.clone();
1315            tokio::spawn(async move {
1316                api::telegram_poll_loop(poll_state).await;
1317            });
1318        }
1319    }
1320    if state.discord.is_some() {
1321        let poll_state = state.clone();
1322        tokio::spawn(async move {
1323            api::discord_poll_loop(poll_state).await;
1324        });
1325    }
1326    if state.signal.is_some() {
1327        let poll_state = state.clone();
1328        tokio::spawn(async move {
1329            api::signal_poll_loop(poll_state).await;
1330        });
1331    }
1332    if state.email.is_some() {
1333        let poll_state = state.clone();
1334        tokio::spawn(async move {
1335            api::email_poll_loop(poll_state).await;
1336        });
1337    }
1338
1339    let auth_layer = ApiKeyLayer::new(config.server.api_key.clone());
1340    let local_origin = format!("http://{}:{}", config.server.bind, config.server.port);
1341    let origin_header = local_origin
1342        .parse::<axum::http::HeaderValue>()
1343        .unwrap_or_else(|e| {
1344            tracing::warn!(
1345                origin = %local_origin,
1346                error = %e,
1347                "CORS origin failed to parse, falling back to 127.0.0.1 loopback"
1348            );
1349            axum::http::HeaderValue::from_static("http://127.0.0.1:3000")
1350        });
1351    let cors = CorsLayer::new()
1352        .allow_origin(origin_header)
1353        .allow_methods([
1354            axum::http::Method::GET,
1355            axum::http::Method::POST,
1356            axum::http::Method::PUT,
1357            axum::http::Method::DELETE,
1358        ])
1359        .allow_headers([
1360            axum::http::header::CONTENT_TYPE,
1361            axum::http::header::AUTHORIZATION,
1362            axum::http::HeaderName::from_static("x-api-key"),
1363        ]);
1364    let authed_routes = build_router(state.clone()).layer(auth_layer);
1365
1366    // WebSocket route handles its own auth (header OR ticket) so it
1367    // lives outside the API-key middleware layer.
1368    let ws_routes = axum::Router::new().route(
1369        "/ws",
1370        ws_route(
1371            event_bus.clone(),
1372            state.ws_tickets.clone(),
1373            config.server.api_key.clone(),
1374        ),
1375    );
1376
1377    // MCP protocol endpoint uses bearer token auth (same API key).
1378    // Lives outside the main API-key middleware because the MCP transport
1379    // service (StreamableHttpService) needs its own route handling.
1380    let mcp_routes = build_mcp_router(&state, config.server.api_key.clone());
1381
1382    let public_routes = build_public_router(state);
1383
1384    let app = authed_routes
1385        .merge(ws_routes)
1386        .merge(mcp_routes)
1387        .merge(public_routes)
1388        .layer(cors)
1389        .layer(rate_limiter);
1390    Ok(app)
1391}
1392
1393#[cfg(test)]
1394pub mod test_support;
1395
1396#[cfg(test)]
1397mod tests {
1398    use super::*;
1399    use crate::test_support::EnvGuard;
1400
    /// Minimal configuration consumed by `bootstrap_with_memory_db_succeeds`:
    /// agent name/id, a `127.0.0.1` server bind with a fixed port, a `:memory:`
    /// database path, and a primary model identifier.
    const BOOTSTRAP_CONFIG: &str = r#"
[agent]
name = "Roboticus"
id = "roboticus-test"

[server]
port = 18789
bind = "127.0.0.1"

[database]
path = ":memory:"

[models]
primary = "ollama/qwen3:8b"
"#;
1416
1417    #[tokio::test]
1418    async fn bootstrap_with_memory_db_succeeds() {
1419        let config = RoboticusConfig::from_str(BOOTSTRAP_CONFIG).expect("parse config");
1420        let result = bootstrap(config).await;
1421        assert!(
1422            result.is_ok(),
1423            "bootstrap with :memory: should succeed: {:?}",
1424            result.err()
1425        );
1426    }
1427
1428    #[tokio::test]
1429    async fn bootstrap_handles_enabled_channels_with_missing_credentials() {
1430        let dir = tempfile::tempdir().unwrap();
1431        let db_path = dir.path().join("state.db");
1432        let cfg = format!(
1433            r#"
1434[agent]
1435name = "T"
1436id = "t"
1437
1438[server]
1439bind = "127.0.0.1"
1440port = 18789
1441
1442[database]
1443path = "{db_path}"
1444
1445[models]
1446primary = "ollama/qwen3:8b"
1447
1448[channels.telegram]
1449enabled = true
1450token_env = "TEST_TELEGRAM_MISSING"
1451
1452[channels.whatsapp]
1453enabled = true
1454token_env = "TEST_WHATSAPP_MISSING"
1455phone_number_id = ""
1456
1457[channels.discord]
1458enabled = true
1459token_env = "TEST_DISCORD_MISSING"
1460
1461[channels.signal]
1462enabled = true
1463phone_number = ""
1464
1465[channels.email]
1466enabled = true
1467password_env = "TEST_EMAIL_PASSWORD_MISSING"
1468
1469[channels.voice]
1470enabled = true
1471
1472[obsidian]
1473enabled = true
1474vault_path = "{vault_path}"
1475watch_for_changes = true
1476"#,
1477            db_path = db_path.display(),
1478            vault_path = dir.path().join("missing-vault").display(),
1479        );
1480        let config = RoboticusConfig::from_str(&cfg).expect("parse config");
1481        let result = bootstrap(config).await;
1482        assert!(
1483            result.is_ok(),
1484            "bootstrap should tolerate disabled channel credentials and invalid obsidian path: {:?}",
1485            result.err()
1486        );
1487    }
1488
1489    #[tokio::test]
1490    async fn bootstrap_loads_taskable_subagents_from_database() {
1491        let dir = tempfile::tempdir().unwrap();
1492        let db_path = dir.path().join("state.db");
1493        let db = roboticus_db::Database::new(db_path.to_str().unwrap()).unwrap();
1494
1495        roboticus_db::agents::upsert_sub_agent(
1496            &db,
1497            &roboticus_db::agents::SubAgentRow {
1498                id: "sa-1".into(),
1499                name: "finance_specialist".into(),
1500                display_name: Some("Finance Specialist".into()),
1501                model: "auto".into(),
1502                fallback_models_json: Some("[]".into()),
1503                role: "subagent".into(),
1504                description: Some("Handles finance tasks".into()),
1505                skills_json: Some(r#"["budgeting"]"#.into()),
1506                enabled: true,
1507                session_count: 0,
1508                last_used_at: None,
1509            },
1510        )
1511        .unwrap();
1512        roboticus_db::agents::upsert_sub_agent(
1513            &db,
1514            &roboticus_db::agents::SubAgentRow {
1515                id: "sa-2".into(),
1516                name: "observer".into(),
1517                display_name: Some("Observer".into()),
1518                model: "ollama/qwen3:8b".into(),
1519                fallback_models_json: Some("[]".into()),
1520                role: "model-proxy".into(),
1521                description: Some("Not taskable".into()),
1522                skills_json: None,
1523                enabled: true,
1524                session_count: 0,
1525                last_used_at: None,
1526            },
1527        )
1528        .unwrap();
1529
1530        let cfg = format!(
1531            r#"
1532[agent]
1533name = "T"
1534id = "t"
1535
1536[server]
1537bind = "127.0.0.1"
1538port = 18789
1539
1540[database]
1541path = "{db_path}"
1542
1543[models]
1544primary = "ollama/qwen3:8b"
1545"#,
1546            db_path = db_path.display(),
1547        );
1548        let config = RoboticusConfig::from_str(&cfg).expect("parse config");
1549        let result = bootstrap(config).await;
1550        assert!(
1551            result.is_ok(),
1552            "bootstrap should register enabled taskable subagents from db: {:?}",
1553            result.err()
1554        );
1555    }
1556
1557    #[test]
1558    fn cleanup_old_logs_no_panic_on_missing_dir() {
1559        let dir = std::path::Path::new("/tmp/roboticus-test-nonexistent-dir-cleanup");
1560        cleanup_old_logs(dir, 30);
1561    }
1562
1563    #[test]
1564    fn cleanup_old_logs_keeps_recent_logs() {
1565        let dir = tempfile::tempdir().unwrap();
1566        let recent = dir.path().join("recent.log");
1567        std::fs::write(&recent, "fresh log").unwrap();
1568
1569        cleanup_old_logs(dir.path(), 30);
1570        assert!(recent.exists(), "recent logs should remain");
1571    }
1572
1573    #[test]
1574    fn cleanup_old_logs_ignores_non_log_files() {
1575        let dir = tempfile::tempdir().unwrap();
1576        let txt = dir.path().join("data.txt");
1577        std::fs::write(&txt, "keep me").unwrap();
1578
1579        cleanup_old_logs(dir.path(), 0);
1580        assert!(txt.exists(), "non-log files should not be deleted");
1581    }
1582
1583    #[test]
1584    fn cleanup_old_logs_empty_dir() {
1585        let dir = tempfile::tempdir().unwrap();
1586        cleanup_old_logs(dir.path(), 1);
1587    }
1588
1589    #[test]
1590    fn taskable_subagent_roles_are_strict() {
1591        assert!(is_taskable_subagent_role("subagent"));
1592        assert!(is_taskable_subagent_role("specialist"));
1593        assert!(is_taskable_subagent_role("SubAgent"));
1594        assert!(!is_taskable_subagent_role("model-proxy"));
1595    }
1596
1597    #[test]
1598    fn resolve_token_prefers_keystore_reference_then_env_then_empty() {
1599        let dir = tempfile::tempdir().unwrap();
1600        let keystore_path = dir.path().join("keystore.enc");
1601        let keystore = roboticus_core::keystore::Keystore::new(keystore_path);
1602        keystore.unlock("pw").unwrap();
1603        keystore.set("telegram_bot_token", "from_keystore").unwrap();
1604
1605        let _env = EnvGuard::set("TEST_TELEGRAM_TOKEN", "from_env");
1606        let token = resolve_token(
1607            &Some("keystore:telegram_bot_token".to_string()),
1608            "TEST_TELEGRAM_TOKEN",
1609            &keystore,
1610        );
1611        assert_eq!(token, "from_keystore");
1612
1613        let fallback = resolve_token(
1614            &Some("keystore:missing".to_string()),
1615            "TEST_TELEGRAM_TOKEN",
1616            &keystore,
1617        );
1618        assert_eq!(fallback, "from_env");
1619
1620        let empty = resolve_token(&None, "", &keystore);
1621        assert!(empty.is_empty());
1622    }
1623
1624    #[test]
1625    fn resolve_token_uses_env_when_no_keystore_ref_is_provided() {
1626        let dir = tempfile::tempdir().unwrap();
1627        let keystore_path = dir.path().join("keystore.enc");
1628        let keystore = roboticus_core::keystore::Keystore::new(keystore_path);
1629        keystore.unlock("pw").unwrap();
1630
1631        let _env = EnvGuard::set("TEST_SIGNAL_TOKEN", "from_env_only");
1632        let token = resolve_token(&None, "TEST_SIGNAL_TOKEN", &keystore);
1633        assert_eq!(token, "from_env_only");
1634    }
1635
1636    #[test]
1637    fn resolve_token_returns_empty_for_empty_env_var() {
1638        let dir = tempfile::tempdir().unwrap();
1639        let keystore_path = dir.path().join("keystore.enc");
1640        let keystore = roboticus_core::keystore::Keystore::new(keystore_path);
1641        keystore.unlock("pw").unwrap();
1642
1643        let _env = EnvGuard::set("TEST_EMPTY_TOKEN", "");
1644        let token = resolve_token(&None, "TEST_EMPTY_TOKEN", &keystore);
1645        assert!(token.is_empty());
1646    }
1647
1648    #[test]
1649    fn cleanup_old_logs_can_prune_when_window_is_zero_days() {
1650        let dir = tempfile::tempdir().unwrap();
1651        let old_log = dir.path().join("old.log");
1652        std::fs::write(&old_log, "stale").unwrap();
1653        std::thread::sleep(std::time::Duration::from_millis(10));
1654        cleanup_old_logs(dir.path(), 0);
1655        assert!(!old_log.exists());
1656    }
1657}