Skip to main content

roboticus_server/
lib.rs

1//! # roboticus-server
2//!
3//! Top-level binary crate that assembles all Roboticus workspace crates into a
4//! single runtime. The [`bootstrap()`] function initializes the database,
5//! wallet, LLM pipeline, agent loop, channel adapters, and background daemons,
6//! then returns an axum `Router` ready to serve.
7//!
8//! ## Key Types
9//!
10//! - [`AppState`] -- Shared application state passed to all route handlers
11//! - [`PersonalityState`] -- Loaded personality files (OS, firmware, identity)
12//! - [`EventBus`] -- Tokio broadcast channel for WebSocket event push
13//!
14//! ## Modules
15//!
16//! - `api` -- REST API mount point, `build_router()`, route modules
17//! - `auth` -- API key authentication middleware layer
18//! - `rate_limit` -- Global + per-IP rate limiting (sliding window)
19//! - `dashboard` -- Embedded SPA serving (compile-time or filesystem)
20//! - `ws` -- WebSocket upgrade and event broadcasting
21//! - `cli/` -- CLI command handlers (serve, status, sessions, memory, wallet, etc.)
22//! - `daemon` -- Daemon install, status, uninstall
23//! - `migrate/` -- Migration engine, skill import/export
24//! - `plugins` -- Plugin registry initialization and loading
25//!
26//! ## Bootstrap Sequence
27//!
28//! 1. Parse CLI → load config → init DB → load wallet → generate HMAC secret
29//! 2. Init LLM client + router + embedding → load semantic cache
30//! 3. Init agent loop + tool registry + memory retriever
31//! 4. Register channel adapters (Telegram, WhatsApp, Discord, Signal)
32//! 5. Spawn background daemons (heartbeat, cron, cache flush, ANN rebuild)
33//! 6. Build axum router with auth + CORS + rate limiting
34
35// ── Re-exports from roboticus-api ──────────────────────────────
36pub use roboticus_api as api_crate;
37pub use roboticus_api::abuse;
38pub use roboticus_api::api;
39pub use roboticus_api::auth;
40pub use roboticus_api::config_runtime;
41pub use roboticus_api::cron_runtime;
42pub use roboticus_api::dashboard;
43pub use roboticus_api::rate_limit;
44pub use roboticus_api::ws;
45pub use roboticus_api::ws_ticket;
46pub use roboticus_api::{
47    AppState, EventBus, PersonalityState, TicketStore, build_dashboard_html, build_mcp_router,
48    build_public_router, build_router, dashboard_handler, ws_route,
49};
50
51// ── Re-exports from roboticus-cli ─────────────────────────────
52pub use roboticus_cli::cli;
53pub use roboticus_cli::migrate;
54
55// ── Server-local modules ─────────────────────────────────────
56pub mod config_maintenance;
57pub mod daemon;
58pub mod plugins;
59pub use roboticus_cli::state_hygiene;
60
61use std::sync::Arc;
62use std::sync::OnceLock;
63use std::sync::atomic::{AtomicBool, Ordering};
64use std::time::Duration;
65
66use tokio::sync::RwLock;
67use tower_http::cors::CorsLayer;
68
69use auth::ApiKeyLayer;
70use roboticus_agent::policy::{
71    AuthorityRule, CommandSafetyRule, ConfigProtectionRule, FinancialRule, PathProtectionRule,
72    PolicyEngine, RateLimitRule, ValidationRule,
73};
74use roboticus_agent::subagents::SubagentRegistry;
75use roboticus_browser::Browser;
76use roboticus_channels::ChannelAdapter;
77use roboticus_channels::a2a::A2aProtocol;
78use roboticus_channels::router::ChannelRouter;
79use roboticus_channels::telegram::TelegramAdapter;
80use roboticus_channels::whatsapp::WhatsAppAdapter;
81use roboticus_core::RoboticusConfig;
82use roboticus_db::Database;
83use roboticus_llm::LlmService;
84use roboticus_llm::OAuthManager;
85use roboticus_wallet::{WalletPaymentHandler, WalletService};
86
87use roboticus_agent::approvals::ApprovalManager;
88use roboticus_agent::obsidian::ObsidianVault;
89use roboticus_agent::obsidian_tools::{ObsidianReadTool, ObsidianSearchTool, ObsidianWriteTool};
90use roboticus_agent::tools::{
91    BashTool, EchoTool, EditFileTool, GlobFilesTool, ListDirectoryTool, ReadFileTool,
92    ScriptRunnerTool, SearchFilesTool, ToolRegistry, WriteFileTool,
93};
94use roboticus_channels::discord::DiscordAdapter;
95use roboticus_channels::email::EmailAdapter;
96use roboticus_channels::matrix::MatrixAdapter;
97use roboticus_channels::signal::SignalAdapter;
98use roboticus_channels::voice::{VoiceConfig, VoicePipeline};
99
100use rate_limit::GlobalRateLimitLayer;
101
102static STDERR_ENABLED: AtomicBool = AtomicBool::new(false);
103static LOG_GUARD: OnceLock<tracing_appender::non_blocking::WorkerGuard> = OnceLock::new();
104
105fn is_taskable_subagent_role(role: &str) -> bool {
106    role.eq_ignore_ascii_case("subagent")
107        || role.eq_ignore_ascii_case("specialist")
108        || role.eq_ignore_ascii_case("observer")
109}
110
111pub fn enable_stderr_logging() {
112    STDERR_ENABLED.store(true, Ordering::Release);
113}
114
115fn init_logging(config: &RoboticusConfig) {
116    use tracing_appender::rolling::{RollingFileAppender, Rotation};
117    use tracing_subscriber::EnvFilter;
118    use tracing_subscriber::Layer;
119    use tracing_subscriber::filter::filter_fn;
120    use tracing_subscriber::fmt;
121    use tracing_subscriber::layer::SubscriberExt;
122    use tracing_subscriber::util::SubscriberInitExt;
123
124    let level = config.agent.log_level.as_str();
125    // Promote hyper errors to warn so IncompleteMessage and connection
126    // resets are visible in production logs (they default to DEBUG).
127    let base_filter = format!("{level},hyper=warn,h2=warn,rustls=warn");
128    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&base_filter));
129
130    let stderr_gate = filter_fn(|_| STDERR_ENABLED.load(Ordering::Acquire));
131
132    let log_dir = &config.server.log_dir;
133    let stderr_layer = fmt::layer()
134        .with_writer(std::io::stderr)
135        .with_filter(stderr_gate);
136
137    let file_appender = std::fs::create_dir_all(log_dir).ok().and_then(|_| {
138        RollingFileAppender::builder()
139            .rotation(Rotation::DAILY)
140            .filename_prefix("roboticus")
141            .filename_suffix("log")
142            .build(log_dir)
143            .map_err(|e| {
144                eprintln!(
145                    "warning: failed to initialize file logging in {}: {e}",
146                    log_dir.display()
147                );
148                e
149            })
150            .ok()
151    });
152
153    if let Some(file_appender) = file_appender {
154        let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
155        let _ = LOG_GUARD.set(guard);
156
157        let file_layer = fmt::layer()
158            .with_writer(non_blocking)
159            .with_ansi(false)
160            .json();
161
162        let _ = tracing_subscriber::registry()
163            .with(filter)
164            .with(stderr_layer)
165            .with(file_layer)
166            .try_init();
167    } else {
168        let _ = tracing_subscriber::registry()
169            .with(filter)
170            .with(stderr_layer)
171            .try_init();
172    }
173
174    cleanup_old_logs(log_dir, config.server.log_max_days);
175}
176
177fn cleanup_old_logs(log_dir: &std::path::Path, max_days: u32) {
178    let cutoff =
179        std::time::SystemTime::now() - std::time::Duration::from_secs(u64::from(max_days) * 86400);
180
181    let entries = match std::fs::read_dir(log_dir) {
182        Ok(e) => e,
183        Err(_) => return,
184    };
185
186    for entry in entries.flatten() {
187        let path = entry.path();
188        if path.extension().and_then(|e| e.to_str()) != Some("log") {
189            continue;
190        }
191        if let Ok(meta) = entry.metadata()
192            && let Ok(modified) = meta.modified()
193            && modified < cutoff
194        {
195            // best-effort: log rotation cleanup failure is non-critical
196            let _ = std::fs::remove_file(&path);
197        }
198    }
199}
200
201/// Resolve a channel token: keystore reference first, then env var fallback.
202fn resolve_token(
203    token_ref: &Option<String>,
204    token_env: &str,
205    keystore: &roboticus_core::Keystore,
206) -> String {
207    if let Some(r) = token_ref
208        && let Some(name) = r.strip_prefix("keystore:")
209    {
210        if let Some(val) = keystore.get(name) {
211            return val;
212        }
213        tracing::warn!(key = %name, "keystore reference not found, falling back to env var");
214    }
215    if !token_env.is_empty() {
216        return match std::env::var(token_env) {
217            Ok(val) if !val.is_empty() => val,
218            Ok(_) => {
219                tracing::warn!(env_var = %token_env, "API key env var is set but empty");
220                String::new()
221            }
222            Err(_) => {
223                tracing::warn!(env_var = %token_env, "API key env var is not set");
224                String::new()
225            }
226        };
227    }
228    String::new()
229}
230
231/// Builds the application state and router from config. Used by the binary and by tests.
232pub async fn bootstrap(
233    config: RoboticusConfig,
234) -> Result<axum::Router, Box<dyn std::error::Error>> {
235    bootstrap_with_config_path(config, None).await
236}
237
238/// Lightweight bootstrap phase tracker. Prints dots while running,
239/// then appends `ok` or `(Xs)` when done.
240struct BootPhase {
241    start: std::time::Instant,
242    dot_handle: tokio::task::JoinHandle<()>,
243}
244
245impl BootPhase {
246    fn start(label: &str) -> Self {
247        use std::io::Write;
248        eprint!("  {label} ");
249        let _ = std::io::stderr().flush();
250        let dot_handle = tokio::spawn(async {
251            loop {
252                tokio::time::sleep(std::time::Duration::from_millis(500)).await;
253                eprint!(".");
254                let _ = std::io::stderr().flush();
255            }
256        });
257        Self {
258            start: std::time::Instant::now(),
259            dot_handle,
260        }
261    }
262
263    fn done(self) {
264        self.dot_handle.abort();
265        let elapsed = self.start.elapsed();
266        if elapsed.as_millis() >= 500 {
267            eprintln!("({:.1}s)", elapsed.as_secs_f64());
268        } else {
269            eprintln!("ok");
270        }
271    }
272}
273
274pub async fn bootstrap_with_config_path(
275    config: RoboticusConfig,
276    config_path: Option<std::path::PathBuf>,
277) -> Result<axum::Router, Box<dyn std::error::Error>> {
278    init_logging(&config);
279
280    let personality_state = api::PersonalityState::from_workspace(&config.agent.workspace);
281
282    if !personality_state.os_text.is_empty() {
283        tracing::info!(
284            personality = %personality_state.identity.name,
285            generated_by = %personality_state.identity.generated_by,
286            "Loaded personality files from workspace"
287        );
288    } else {
289        tracing::info!("No personality files found in workspace, using defaults");
290    }
291
292    let db_path = config.database.path.to_string_lossy().to_string();
293    let bp = BootPhase::start("database");
294    let db = Database::new(&db_path)?;
295    bp.done();
296    match crate::state_hygiene::run_state_hygiene(&config.database.path) {
297        Ok(report) if report.changed => {
298            tracing::info!(
299                changed_rows = report.changed_rows,
300                subagent_rows_normalized = report.subagent_rows_normalized,
301                cron_payload_rows_repaired = report.cron_payload_rows_repaired,
302                cron_jobs_disabled_invalid_expr = report.cron_jobs_disabled_invalid_expr,
303                "applied startup mechanic checks"
304            );
305        }
306        Ok(_) => {}
307        Err(e) => tracing::warn!(error = %e, "startup mechanic checks failed"),
308    }
309    match roboticus_db::sessions::backfill_nicknames(&db) {
310        Ok(0) => {}
311        Ok(n) => tracing::info!(count = n, "Backfilled session nicknames"),
312        Err(e) => tracing::warn!(error = %e, "Failed to backfill session nicknames"),
313    }
314    let bp = BootPhase::start("LLM service");
315    let mut llm = LlmService::new(&config)?;
316    // Seed quality tracker with recent observations so metascore routing
317    // has a warm start instead of assuming 0.8 for every model.
318    match roboticus_db::metrics::recent_quality_scores(&db, 200) {
319        Ok(scores) if !scores.is_empty() => llm.quality.seed_from_history(&scores),
320        Ok(_) => {}
321        Err(e) => tracing::warn!(error = %e, "failed to seed quality tracker from history"),
322    }
323    // Seed pre-computed baselines for common models (only applies to models
324    // with zero existing observations — never overwrites real data).
325    llm.quality
326        .seed_from_baselines(roboticus_llm::exercise::COMMON_MODEL_BASELINES);
327
328    // Detect zero-observation models and log onboarding prompts.
329    {
330        let mut candidates = vec![config.models.primary.clone()];
331        candidates.extend(config.models.fallbacks.iter().cloned());
332        for model in &candidates {
333            if llm.quality.observation_count(model) == 0 {
334                tracing::info!(
335                    model = model.as_str(),
336                    "model has no performance data — run 'roboticus models exercise {}' to baseline it",
337                    model
338                );
339            }
340        }
341    }
342    bp.done();
343    let bp = BootPhase::start("wallet");
344    let wallet = match tokio::time::timeout(
345        std::time::Duration::from_secs(30),
346        WalletService::new(&config),
347    )
348    .await
349    {
350        Ok(result) => result?,
351        Err(_) => {
352            bp.done();
353            return Err("wallet init timed out (30s) — check RPC endpoint connectivity".into());
354        }
355    };
356    bp.done();
357
358    // Wire x402 payment handler: when the LLM client hits an HTTP 402, it
359    // signs an EIP-3009 authorization via the agent wallet and retries.
360    // The treasury policy enforces per-payment caps from configuration.
361    let wallet_arc = Arc::new(wallet.wallet.clone());
362    let treasury_policy = roboticus_wallet::treasury::TreasuryPolicy::new(&config.treasury);
363    let x402_handler =
364        Arc::new(WalletPaymentHandler::new(wallet_arc).with_treasury_policy(treasury_policy));
365    llm.set_payment_handler(x402_handler);
366
367    let bp = BootPhase::start("protocols + plugins");
368    let a2a = A2aProtocol::new(config.a2a.clone());
369    let plugin_env = {
370        let mut env = std::collections::HashMap::new();
371        env.insert("ROBOTICUS_DELEGATION_DEPTH".into(), "0".into());
372        env.insert(
373            "ROBOTICUS_WORKSPACE".into(),
374            config.agent.workspace.display().to_string(),
375        );
376        if let Some(ref ws) = config.skills.workspace_dir {
377            env.insert("ROBOTICUS_SKILLS_DIR".into(), ws.display().to_string());
378        }
379        env
380    };
381    let plugin_registry = plugins::init_plugin_registry(&config.plugins, plugin_env).await;
382    let mut policy_engine = PolicyEngine::new();
383    policy_engine.add_rule(Box::new(AuthorityRule));
384    policy_engine.add_rule(Box::new(CommandSafetyRule));
385    policy_engine.add_rule(Box::new(FinancialRule::new(
386        config.treasury.per_payment_cap,
387    )));
388    policy_engine.add_rule(Box::new(PathProtectionRule::from_config(
389        &config.security.filesystem,
390    )));
391    policy_engine.add_rule(Box::new(RateLimitRule::default()));
392    policy_engine.add_rule(Box::new(ValidationRule));
393    policy_engine.add_rule(Box::new(ConfigProtectionRule::default()));
394    let policy_engine = Arc::new(policy_engine);
395    let browser = Arc::new(Browser::new(config.browser.clone()));
396    let registry = Arc::new(SubagentRegistry::new(4, vec![]));
397
398    if let Ok(sub_agents) = roboticus_db::agents::list_enabled_sub_agents(&db) {
399        for sa in &sub_agents {
400            if !is_taskable_subagent_role(&sa.role) {
401                continue;
402            }
403            let resolved_model = match sa.model.trim().to_ascii_lowercase().as_str() {
404                "auto" | "orchestrator" => llm.router.select_model().to_string(),
405                _ => sa.model.clone(),
406            };
407            let fixed_skills = sa
408                .skills_json
409                .as_deref()
410                .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok())
411                .unwrap_or_default();
412            let agent_config = roboticus_agent::subagents::AgentInstanceConfig {
413                id: sa.name.clone(),
414                name: sa.display_name.clone().unwrap_or_else(|| sa.name.clone()),
415                model: resolved_model,
416                skills: fixed_skills,
417                allowed_subagents: vec![],
418                max_concurrent: 4,
419            };
420            if let Err(e) = registry.register(agent_config).await {
421                tracing::warn!(agent = %sa.name, err = %e, "failed to register sub-agent");
422            } else if let Err(e) = registry.start_agent(&sa.name).await {
423                tracing::warn!(agent = %sa.name, err = %e, "failed to auto-start sub-agent");
424            }
425        }
426        if !sub_agents.is_empty() {
427            tracing::info!(
428                count = sub_agents.len(),
429                "registered sub-agents from database"
430            );
431        }
432    }
433
434    let event_bus = EventBus::new(256);
435
436    let keystore =
437        roboticus_core::keystore::Keystore::new(roboticus_core::keystore::Keystore::default_path());
438    if let Err(e) = keystore.unlock_machine() {
439        tracing::warn!("keystore auto-unlock failed: {e}");
440    }
441    let keystore = Arc::new(keystore);
442
443    bp.done();
444    let bp = BootPhase::start("channels");
445    let channel_router = Arc::new(ChannelRouter::with_store(db.clone()).await);
446    let telegram: Option<Arc<TelegramAdapter>> =
447        if let Some(ref tg_config) = config.channels.telegram {
448            if tg_config.enabled {
449                let token = resolve_token(&tg_config.token_ref, &tg_config.token_env, &keystore);
450                if !token.is_empty() {
451                    let adapter = Arc::new(TelegramAdapter::with_config(
452                        token,
453                        tg_config.poll_timeout_seconds,
454                        tg_config.allowed_chat_ids.clone(),
455                        tg_config.webhook_secret.clone(),
456                        config.security.deny_on_empty_allowlist,
457                    ));
458                    channel_router
459                        .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
460                        .await;
461                    tracing::info!("Telegram adapter registered");
462                    if tg_config.webhook_secret.is_none() {
463                        tracing::warn!(
464                            "Telegram webhook_secret not set; webhook endpoint will reject with 503"
465                        );
466                    }
467                    Some(adapter)
468                } else {
469                    tracing::warn!(
470                        token_env = %tg_config.token_env,
471                        "Telegram enabled but token is empty"
472                    );
473                    None
474                }
475            } else {
476                None
477            }
478        } else {
479            None
480        };
481
482    let whatsapp: Option<Arc<WhatsAppAdapter>> =
483        if let Some(ref wa_config) = config.channels.whatsapp {
484            if wa_config.enabled {
485                let token = resolve_token(&wa_config.token_ref, &wa_config.token_env, &keystore);
486                if !token.is_empty() && !wa_config.phone_number_id.is_empty() {
487                    let adapter = Arc::new(WhatsAppAdapter::with_config(
488                        token,
489                        wa_config.phone_number_id.clone(),
490                        wa_config.verify_token.clone(),
491                        wa_config.allowed_numbers.clone(),
492                        wa_config.app_secret.clone(),
493                        config.security.deny_on_empty_allowlist,
494                    )?);
495                    channel_router
496                        .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
497                        .await;
498                    tracing::info!("WhatsApp adapter registered");
499                    if wa_config.app_secret.is_none() {
500                        tracing::warn!(
501                            "WhatsApp app_secret not set; webhook endpoint will reject with 503"
502                        );
503                    }
504                    Some(adapter)
505                } else {
506                    tracing::warn!("WhatsApp enabled but token env or phone_number_id is empty");
507                    None
508                }
509            } else {
510                None
511            }
512        } else {
513            None
514        };
515
516    let discord: Option<Arc<DiscordAdapter>> = if let Some(ref dc_config) = config.channels.discord
517    {
518        if dc_config.enabled {
519            let token = resolve_token(&dc_config.token_ref, &dc_config.token_env, &keystore);
520            if !token.is_empty() {
521                let adapter = Arc::new(DiscordAdapter::with_config(
522                    token,
523                    dc_config.allowed_guild_ids.clone(),
524                    config.security.deny_on_empty_allowlist,
525                ));
526                channel_router
527                    .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
528                    .await;
529                // Start WebSocket gateway for real-time event reception
530                if let Err(e) = adapter.connect_gateway().await {
531                    tracing::error!(error = %e, "Failed to connect Discord gateway");
532                }
533                tracing::info!("Discord adapter registered with gateway");
534                Some(adapter)
535            } else {
536                tracing::warn!(
537                    token_env = %dc_config.token_env,
538                    "Discord enabled but token env var is empty"
539                );
540                None
541            }
542        } else {
543            None
544        }
545    } else {
546        None
547    };
548
549    let signal: Option<Arc<SignalAdapter>> = if let Some(ref sig_config) = config.channels.signal {
550        if sig_config.enabled {
551            if !sig_config.phone_number.is_empty() {
552                let adapter = Arc::new(SignalAdapter::with_config(
553                    sig_config.phone_number.clone(),
554                    sig_config.daemon_url.clone(),
555                    sig_config.allowed_numbers.clone(),
556                    config.security.deny_on_empty_allowlist,
557                ));
558                channel_router
559                    .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
560                    .await;
561                tracing::info!("Signal adapter registered");
562                Some(adapter)
563            } else {
564                tracing::warn!("Signal enabled but phone_number is empty");
565                None
566            }
567        } else {
568            None
569        }
570    } else {
571        None
572    };
573
574    let email: Option<Arc<EmailAdapter>> = if config.channels.email.enabled {
575        let email_cfg = &config.channels.email;
576        let password = if email_cfg.password_env.is_empty() {
577            String::new()
578        } else {
579            match std::env::var(&email_cfg.password_env) {
580                Ok(val) => val,
581                Err(_) => {
582                    tracing::warn!(env_var = %email_cfg.password_env, "email password env var is not set");
583                    String::new()
584                }
585            }
586        };
587        if email_cfg.smtp_host.is_empty()
588            || email_cfg.username.is_empty()
589            || password.is_empty()
590            || email_cfg.from_address.is_empty()
591        {
592            tracing::warn!("Email enabled but SMTP credentials are incomplete");
593            None
594        } else {
595            match EmailAdapter::new(
596                email_cfg.from_address.clone(),
597                email_cfg.smtp_host.clone(),
598                email_cfg.smtp_port,
599                email_cfg.imap_host.clone(),
600                email_cfg.imap_port,
601                email_cfg.username.clone(),
602                password,
603            ) {
604                Ok(email_adapter) => {
605                    // Resolve OAuth2 token from env if configured
606                    let oauth2_token =
607                        if email_cfg.use_oauth2 && !email_cfg.oauth2_token_env.is_empty() {
608                            std::env::var(&email_cfg.oauth2_token_env).ok()
609                        } else {
610                            None
611                        };
612                    let adapter = Arc::new(
613                        email_adapter
614                            .with_allowed_senders(email_cfg.allowed_senders.clone())
615                            .with_deny_on_empty(config.security.deny_on_empty_allowlist)
616                            .with_poll_interval(std::time::Duration::from_secs(
617                                email_cfg.poll_interval_seconds,
618                            ))
619                            .with_oauth2_token(oauth2_token)
620                            .with_imap_idle_enabled(email_cfg.imap_idle_enabled),
621                    );
622                    channel_router
623                        .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
624                        .await;
625                    tracing::info!("Email adapter registered");
626
627                    // Start background IMAP listener if IMAP is configured
628                    if !email_cfg.imap_host.is_empty()
629                        && let Err(e) = adapter.start_imap_listener().await
630                    {
631                        tracing::error!(error = %e, "Failed to start email IMAP listener");
632                    }
633
634                    Some(adapter)
635                }
636                Err(e) => {
637                    tracing::error!(error = %e, "Failed to create email adapter");
638                    None
639                }
640            }
641        }
642    } else {
643        None
644    };
645
646    let voice: Option<Arc<RwLock<VoicePipeline>>> = if config.channels.voice.enabled {
647        let mut voice_config = VoiceConfig::default();
648        if let Some(stt) = &config.channels.voice.stt_model {
649            voice_config.stt_model = stt.clone();
650        }
651        if let Some(tts) = &config.channels.voice.tts_model {
652            voice_config.tts_model = tts.clone();
653        }
654        if let Some(v) = &config.channels.voice.tts_voice {
655            voice_config.tts_voice = v.clone();
656        }
657        if let Ok(key) = std::env::var("OPENAI_API_KEY")
658            && !key.is_empty()
659        {
660            voice_config.api_key = Some(key);
661        }
662        tracing::info!("Voice pipeline initialized");
663        Some(Arc::new(RwLock::new(VoicePipeline::new(voice_config))))
664    } else {
665        None
666    };
667
668    let _matrix: Option<Arc<MatrixAdapter>> = if let Some(ref mx_config) = config.channels.matrix {
669        if mx_config.enabled {
670            let token = resolve_token(&None, &mx_config.access_token_env, &keystore);
671            if !token.is_empty() && !mx_config.homeserver_url.is_empty() {
672                let mut adapter = MatrixAdapter::new(
673                    mx_config.homeserver_url.clone(),
674                    token,
675                    mx_config.allowed_rooms.clone(),
676                    mx_config.auto_join,
677                    mx_config.sync_timeout_seconds,
678                );
679                if mx_config.encryption_enabled {
680                    let store_path = mx_config.device_store_path.clone().unwrap_or_else(|| {
681                        roboticus_core::home_dir()
682                            .join(".roboticus")
683                            .join("matrix_crypto")
684                    });
685                    match roboticus_channels::matrix_crypto::MatrixCrypto::new(
686                        &store_path,
687                        &mx_config.device_display_name,
688                    ) {
689                        Ok(crypto) => {
690                            adapter = adapter.with_crypto(Arc::new(crypto));
691                            tracing::info!("Matrix E2EE enabled");
692                        }
693                        Err(e) => {
694                            tracing::error!(error = %e, "Failed to init Matrix crypto; running unencrypted");
695                        }
696                    }
697                }
698                let adapter = Arc::new(adapter);
699                channel_router
700                    .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
701                    .await;
702                tracing::info!("Matrix adapter registered");
703                Some(adapter)
704            } else {
705                tracing::warn!("Matrix enabled but homeserver_url or access token is empty");
706                None
707            }
708        } else {
709            None
710        }
711    } else {
712        None
713    };
714
715    let hmac_secret = {
716        use rand::RngCore;
717        let mut buf = vec![0u8; 32];
718        rand::rngs::OsRng.fill_bytes(&mut buf);
719        buf
720    };
721
722    let retriever = Arc::new(roboticus_agent::retrieval::MemoryRetriever::new(
723        config.memory.clone(),
724    ));
725
726    let mut tool_registry = ToolRegistry::new();
727    tool_registry.register(Box::new(EchoTool));
728    tool_registry.register(Box::new(BashTool));
729    tool_registry.register(Box::new(ScriptRunnerTool::new(
730        config.skills.clone(),
731        config.security.filesystem.clone(),
732    )));
733    tool_registry.register(Box::new(ReadFileTool));
734    tool_registry.register(Box::new(WriteFileTool));
735    tool_registry.register(Box::new(EditFileTool));
736    tool_registry.register(Box::new(ListDirectoryTool));
737    tool_registry.register(Box::new(GlobFilesTool));
738    tool_registry.register(Box::new(SearchFilesTool));
739
740    // Cron tool — manages scheduled jobs through the tool registry
741    use roboticus_agent::tools::CronTool;
742    tool_registry.register(Box::new(CronTool));
743
744    // Introspection tools — read-only runtime probes for agent self-awareness
745    use roboticus_agent::tools::{
746        GetChannelHealthTool, GetMemoryStatsTool, GetRuntimeContextTool, GetSubagentStatusTool,
747        RecallMemoryTool,
748    };
749    tool_registry.register(Box::new(GetRuntimeContextTool));
750    tool_registry.register(Box::new(GetMemoryStatsTool));
751    tool_registry.register(Box::new(GetChannelHealthTool));
752    tool_registry.register(Box::new(GetSubagentStatusTool));
753    tool_registry.register(Box::new(RecallMemoryTool));
754
755    // Data tools — agent-managed tables via hippocampus
756    use roboticus_agent::tools::{AlterTableTool, CreateTableTool, DropTableTool};
757    tool_registry.register(Box::new(CreateTableTool));
758    tool_registry.register(Box::new(AlterTableTool));
759    tool_registry.register(Box::new(DropTableTool));
760
761    // Bridge active plugin tools into the same runtime registry used by ReAct.
762    bp.done();
763    let bp = BootPhase::start("plugin tools");
764    plugins::register_plugin_tools(&mut tool_registry, Arc::clone(&plugin_registry)).await;
765    bp.done();
766
767    // Obsidian vault integration
768    let obsidian_vault: Option<Arc<RwLock<ObsidianVault>>> = if config.obsidian.enabled {
769        match ObsidianVault::from_config(&config.obsidian) {
770            Ok(vault) => {
771                let vault = Arc::new(RwLock::new(vault));
772                tool_registry.register(Box::new(ObsidianReadTool::new(Arc::clone(&vault))));
773                tool_registry.register(Box::new(ObsidianWriteTool::new(Arc::clone(&vault))));
774                tool_registry.register(Box::new(ObsidianSearchTool::new(Arc::clone(&vault))));
775                tracing::info!("Obsidian vault integration enabled");
776                Some(vault)
777            }
778            Err(e) => {
779                tracing::warn!(error = %e, "Failed to initialize Obsidian vault");
780                None
781            }
782        }
783    } else {
784        None
785    };
786
787    // Start vault file watcher if configured
788    if let Some(ref vault) = obsidian_vault
789        && config.obsidian.watch_for_changes
790    {
791        match roboticus_agent::obsidian::watcher::VaultWatcher::start(Arc::clone(vault)).await {
792            Ok(_watcher) => {
793                // watcher lives in the spawned task — drop is fine here
794                tracing::info!("Obsidian vault file watcher started");
795            }
796            Err(e) => {
797                tracing::warn!(error = %e, "Failed to start Obsidian vault watcher");
798            }
799        }
800    }
801
802    let tool_registry = Arc::new(tool_registry);
803
804    let capabilities = Arc::new(roboticus_agent::capability::CapabilityRegistry::new());
805    if let Err(e) = capabilities
806        .sync_from_tool_registry(Arc::clone(&tool_registry))
807        .await
808    {
809        tracing::warn!(error = %e, "initial capability sync from tool registry reported errors");
810    }
811
812    let approvals = Arc::new(ApprovalManager::new(config.approvals.clone()));
813
814    let oauth = Arc::new(OAuthManager::new()?);
815
816    let discovery_registry = Arc::new(RwLock::new(
817        roboticus_agent::discovery::DiscoveryRegistry::new(),
818    ));
819    let device_manager = Arc::new(RwLock::new(roboticus_agent::device::DeviceManager::new(
820        roboticus_agent::device::DeviceIdentity::generate(&config.agent.id),
821        config.devices.max_paired_devices,
822    )));
823
824    let mut mcp_clients = roboticus_agent::mcp::McpClientManager::new();
825    for c in &config.mcp.clients {
826        let mut conn =
827            roboticus_agent::mcp::McpClientConnection::new(c.name.clone(), c.url.clone());
828        if let Err(e) = conn.discover() {
829            tracing::warn!(mcp_name = %c.name, error = %e, "MCP client discovery failed at startup");
830        }
831        mcp_clients.add_connection(conn);
832    }
833    let mcp_clients = Arc::new(RwLock::new(mcp_clients));
834
835    let live_mcp = Arc::new(roboticus_agent::mcp::manager::McpConnectionManager::new());
836    let bp = BootPhase::start("MCP connections");
837    match tokio::time::timeout(
838        std::time::Duration::from_secs(30),
839        live_mcp.connect_all(&config.mcp.servers, &capabilities),
840    )
841    .await
842    {
843        Ok(()) => {}
844        Err(_) => {
845            tracing::warn!("MCP connect_all timed out after 30s; some servers may be unavailable");
846        }
847    }
848    bp.done();
849    let live_mcp_tool_count = live_mcp.connected_count().await;
850    if live_mcp_tool_count > 0 {
851        tracing::info!(
852            servers = live_mcp_tool_count,
853            "MCP client connections established"
854        );
855    }
856
857    let mut mcp_server_registry = roboticus_agent::mcp::McpServerRegistry::new();
858    let exported = roboticus_agent::mcp::export_tools_as_mcp(
859        &tool_registry
860            .list()
861            .iter()
862            .map(|t| {
863                (
864                    t.name().to_string(),
865                    t.description().to_string(),
866                    t.parameters_schema(),
867                )
868            })
869            .collect::<Vec<_>>(),
870    );
871    for tool in exported {
872        mcp_server_registry.register_tool(tool);
873    }
874    mcp_server_registry.register_resource(roboticus_agent::mcp::McpResource {
875        uri: "roboticus://sessions/active".to_string(),
876        name: "Active Sessions".to_string(),
877        description: "Active sessions in the local runtime".to_string(),
878        mime_type: "application/json".to_string(),
879    });
880    mcp_server_registry.register_resource(roboticus_agent::mcp::McpResource {
881        uri: "roboticus://metrics/capacity".to_string(),
882        name: "Provider Capacity Stats".to_string(),
883        description: "Current provider utilization and headroom".to_string(),
884        mime_type: "application/json".to_string(),
885    });
886    let mcp_server_registry = Arc::new(RwLock::new(mcp_server_registry));
887
888    let ann_index = roboticus_db::ann::AnnIndex::new(config.memory.ann_index);
889    if config.memory.ann_index {
890        match ann_index.build_from_db(&db) {
891            Ok(count) => {
892                if ann_index.is_built() {
893                    tracing::info!(count, "ANN index built from database");
894                } else {
895                    tracing::info!(
896                        count,
897                        min = ann_index.min_entries_for_index,
898                        "ANN index below threshold, brute-force search will be used"
899                    );
900                }
901            }
902            Err(e) => {
903                tracing::warn!(error = %e, "Failed to build ANN index, falling back to brute-force");
904            }
905        }
906    }
907
908    // Initialize media service for multimodal attachment handling
909    let media_service = if config.multimodal.enabled {
910        match roboticus_channels::media::MediaService::new(&config.multimodal) {
911            Ok(svc) => {
912                tracing::info!(
913                    media_dir = ?config.multimodal.media_dir,
914                    "Media service initialized"
915                );
916                Some(Arc::new(svc))
917            }
918            Err(e) => {
919                tracing::error!(error = %e, "Failed to initialize media service");
920                None
921            }
922        }
923    } else {
924        None
925    };
926
927    let resolved_config_path =
928        config_path.unwrap_or_else(crate::config_runtime::resolve_default_config_path);
929
930    // Build rate limiter once — AppState and the middleware layer share the same
931    // Arc<Mutex<…>> so admin observability sees live counters.
932    let rate_limiter = GlobalRateLimitLayer::new(
933        u64::from(config.server.rate_limit_requests),
934        Duration::from_secs(config.server.rate_limit_window_secs),
935    )
936    .with_per_ip_capacity(u64::from(config.server.per_ip_rate_limit_requests))
937    .with_per_actor_capacity(u64::from(config.server.per_actor_rate_limit_requests))
938    .with_trusted_proxy_cidrs(&config.server.trusted_proxy_cidrs);
939
940    // Semantic classifier: backed by the same embedding client as the rest of
941    // the pipeline. Centroid vectors are computed lazily on first classify call
942    // and cached for the lifetime of the process.
943    let semantic_classifier = Arc::new(
944        roboticus_llm::semantic_classifier::SemanticClassifier::new(llm.embedding.clone()),
945    );
946
947    let state = AppState {
948        db,
949        config: Arc::new(RwLock::new(config.clone())),
950        llm: Arc::new(RwLock::new(llm)),
951        wallet: Arc::new(wallet),
952        a2a: Arc::new(RwLock::new(a2a)),
953        personality: Arc::new(RwLock::new(personality_state)),
954        hmac_secret: Arc::new(hmac_secret),
955        interviews: Arc::new(RwLock::new(std::collections::HashMap::new())),
956        plugins: plugin_registry,
957        policy_engine,
958        browser,
959        registry,
960        event_bus: event_bus.clone(),
961        channel_router,
962        telegram,
963        whatsapp,
964        retriever,
965        ann_index,
966        tools: tool_registry,
967        capabilities,
968        approvals,
969        discord,
970        signal,
971        email,
972        voice,
973        media_service,
974        discovery: discovery_registry,
975        devices: device_manager,
976        mcp_clients,
977        mcp_server: mcp_server_registry,
978        live_mcp,
979        oauth,
980        keystore,
981        obsidian: obsidian_vault,
982        started_at: std::time::Instant::now(),
983        config_path: Arc::new(resolved_config_path.clone()),
984        config_apply_status: crate::config_runtime::status_for_path(&resolved_config_path),
985        pending_specialist_proposals: Arc::new(RwLock::new(std::collections::HashMap::new())),
986        ws_tickets: ws_ticket::TicketStore::new(),
987        rate_limiter: rate_limiter.clone(),
988        semantic_classifier,
989    };
990    let bp = BootPhase::start("capability sync");
991    state.resync_capabilities_from_tools().await;
992    bp.done();
993
994    // Periodic ANN index rebuild (every 10 minutes)
995    if config.memory.ann_index {
996        let ann_db = state.db.clone();
997        let ann_idx = state.ann_index.clone();
998        tokio::spawn(async move {
999            let mut interval = tokio::time::interval(std::time::Duration::from_secs(600));
1000            interval.tick().await;
1001            loop {
1002                interval.tick().await;
1003                match ann_idx.rebuild(&ann_db) {
1004                    Ok(count) => {
1005                        tracing::debug!(count, "ANN index rebuilt");
1006                    }
1007                    Err(e) => {
1008                        tracing::warn!(error = %e, "ANN index rebuild failed");
1009                    }
1010                }
1011            }
1012        });
1013        tracing::info!("ANN index rebuild daemon spawned (10min interval)");
1014    }
1015
1016    // Load persisted semantic cache
1017    {
1018        let loaded = roboticus_db::cache::load_cache_entries(&state.db)
1019            .inspect_err(|e| tracing::warn!(error = %e, "failed to load semantic cache entries"))
1020            .unwrap_or_default();
1021        if !loaded.is_empty() {
1022            let imported: Vec<(String, roboticus_llm::ExportedCacheEntry)> = loaded
1023                .into_iter()
1024                .map(|(id, pe)| {
1025                    let ttl = pe
1026                        .expires_at
1027                        .and_then(|e| {
1028                            chrono::NaiveDateTime::parse_from_str(&e, "%Y-%m-%dT%H:%M:%S")
1029                                .ok()
1030                                .or_else(|| {
1031                                    chrono::NaiveDateTime::parse_from_str(&e, "%Y-%m-%d %H:%M:%S")
1032                                        .ok()
1033                                })
1034                        })
1035                        .map(|exp| {
1036                            let now = chrono::Utc::now().naive_utc();
1037                            if exp > now {
1038                                (exp - now).num_seconds().max(0) as u64
1039                            } else {
1040                                0
1041                            }
1042                        })
1043                        .unwrap_or(3600);
1044
1045                    (
1046                        id,
1047                        roboticus_llm::ExportedCacheEntry {
1048                            content: pe.response,
1049                            model: pe.model,
1050                            tokens_saved: pe.tokens_saved,
1051                            hits: pe.hit_count,
1052                            involved_tools: false,
1053                            embedding: pe.embedding,
1054                            ttl_remaining_secs: ttl,
1055                        },
1056                    )
1057                })
1058                .collect();
1059            let count = imported.len();
1060            let llm = state.llm.read().await;
1061            llm.cache
1062                .lock()
1063                .unwrap_or_else(|e| e.into_inner())
1064                .import_entries(imported);
1065            tracing::info!(count, "Loaded semantic cache from database");
1066        }
1067    }
1068
1069    // Periodic cache flush (every 5 minutes)
1070    {
1071        let flush_db = state.db.clone();
1072        let flush_llm = Arc::clone(&state.llm);
1073        tokio::spawn(async move {
1074            let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
1075            interval.tick().await; // skip first immediate tick
1076            loop {
1077                interval.tick().await;
1078                let entries = {
1079                    let llm = flush_llm.read().await;
1080                    llm.cache
1081                        .lock()
1082                        .unwrap_or_else(|e| e.into_inner())
1083                        .export_entries()
1084                };
1085                for (hash, entry) in &entries {
1086                    let expires = chrono::Utc::now()
1087                        + chrono::Duration::seconds(entry.ttl_remaining_secs as i64);
1088                    let pe = roboticus_db::cache::PersistedCacheEntry {
1089                        prompt_hash: hash.clone(),
1090                        response: entry.content.clone(),
1091                        model: entry.model.clone(),
1092                        tokens_saved: entry.tokens_saved,
1093                        hit_count: entry.hits,
1094                        embedding: entry.embedding.clone(),
1095                        created_at: chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
1096                        expires_at: Some(expires.format("%Y-%m-%dT%H:%M:%S").to_string()),
1097                    };
1098                    roboticus_db::cache::save_cache_entry(&flush_db, hash, &pe)
1099                        .inspect_err(
1100                            |e| tracing::warn!(error = %e, hash, "failed to persist cache entry"),
1101                        )
1102                        .ok();
1103                }
1104                roboticus_db::cache::evict_expired_cache(&flush_db)
1105                    .inspect_err(|e| tracing::warn!(error = %e, "failed to evict expired cache"))
1106                    .ok();
1107                tracing::debug!(count = entries.len(), "Flushed semantic cache to database");
1108            }
1109        });
1110        tracing::info!("Cache flush daemon spawned (5min interval)");
1111    }
1112
1113    // Start heartbeat domain loops — shutdown_tx MUST live for the server's
1114    // lifetime; dropping it closes the watch channel and all loops exit.
1115    let (heartbeat_shutdown_tx, heartbeat_treasury_state) = {
1116        let hb_wallet = Arc::clone(&state.wallet);
1117        let hb_db = state.db.clone();
1118        let hb_session_cfg = config.session.clone();
1119        let hb_digest_cfg = config.digest.clone();
1120        let hb_agent_id = config.agent.id.clone();
1121        let hb_vault = state.obsidian.clone();
1122        let hb_config = config.heartbeat.clone();
1123        roboticus_schedule::spawn_domain_loops(
1124            hb_wallet,
1125            hb_db,
1126            hb_session_cfg,
1127            hb_digest_cfg,
1128            hb_agent_id,
1129            hb_vault,
1130            hb_config,
1131        )
1132        .await
1133    };
1134    tracing::info!(
1135        treasury_s = config.heartbeat.treasury_interval_seconds,
1136        yield_s = config.heartbeat.yield_interval_seconds,
1137        memory_s = config.heartbeat.memory_interval_seconds,
1138        maintenance_s = config.heartbeat.maintenance_interval_seconds,
1139        session_s = config.heartbeat.session_interval_seconds,
1140        discovery_s = config.heartbeat.discovery_interval_seconds,
1141        "domain loops spawned"
1142    );
1143    // CRITICAL: shutdown_tx and treasury_state MUST outlive the server.
1144    // Dropping shutdown_tx closes the watch channel, causing all domain
1145    // loops to exit immediately. Leaking keeps them alive for the process
1146    // lifetime. treasury_state will be wired into AppState in v0.12.0
1147    // for wallet API route access; until then, leak it too.
1148    std::mem::forget(heartbeat_shutdown_tx);
1149    std::mem::forget(heartbeat_treasury_state);
1150
1151    // Delivery retry queue drain (every 30 seconds)
1152    {
1153        let drain_router = Arc::clone(&state.channel_router);
1154        tokio::spawn(async move {
1155            let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
1156            interval.tick().await;
1157            loop {
1158                interval.tick().await;
1159                drain_router.drain_retry_queue().await;
1160            }
1161        });
1162        tracing::info!("Delivery retry queue drain daemon spawned (30s interval)");
1163    }
1164
1165    // Start cron worker
1166    {
1167        let instance_id = config.agent.id.clone();
1168        let cron_state = state.clone();
1169        tokio::spawn(async move {
1170            crate::cron_runtime::run_cron_worker(cron_state, instance_id).await;
1171        });
1172        tracing::info!("Cron worker spawned");
1173    }
1174
1175    // Periodic mechanic-check sweep (default 6h, per-instance).
1176    {
1177        let db_path = config.database.path.clone();
1178        let interval_secs = std::env::var("ROBOTICUS_MECHANIC_CHECK_INTERVAL_SECS")
1179            .ok()
1180            .or_else(|| std::env::var("ROBOTICUS_STATE_HYGIENE_INTERVAL_SECS").ok())
1181            .and_then(|v| v.parse::<u64>().ok())
1182            .filter(|v| *v >= 300)
1183            .unwrap_or(21_600);
1184        tokio::spawn(async move {
1185            let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
1186            interval.tick().await;
1187            loop {
1188                interval.tick().await;
1189                match crate::state_hygiene::run_state_hygiene(&db_path) {
1190                    Ok(report) if report.changed => {
1191                        tracing::info!(
1192                            changed_rows = report.changed_rows,
1193                            subagent_rows_normalized = report.subagent_rows_normalized,
1194                            cron_payload_rows_repaired = report.cron_payload_rows_repaired,
1195                            cron_jobs_disabled_invalid_expr =
1196                                report.cron_jobs_disabled_invalid_expr,
1197                            "periodic mechanic checks applied"
1198                        );
1199                    }
1200                    Ok(_) => tracing::debug!("periodic mechanic checks: no changes"),
1201                    Err(e) => tracing::warn!(error = %e, "periodic mechanic checks failed"),
1202                }
1203            }
1204        });
1205        tracing::info!(interval_secs, "Mechanic checks daemon spawned");
1206    }
1207
1208    {
1209        let startup_announce_channels = config.channels.startup_announcement_channels();
1210
1211        let display_host = match config.server.bind.as_str() {
1212            "127.0.0.1" | "0.0.0.0" | "::1" | "::" => "localhost",
1213            other => other,
1214        };
1215        let channel_list: Vec<&str> = {
1216            let mut ch = vec!["web"];
1217            if config.channels.telegram.is_some() {
1218                ch.push("telegram");
1219            }
1220            if config.channels.whatsapp.is_some() {
1221                ch.push("whatsapp");
1222            }
1223            if config.channels.discord.is_some() {
1224                ch.push("discord");
1225            }
1226            if config.channels.signal.is_some() {
1227                ch.push("signal");
1228            }
1229            ch
1230        };
1231        let announce_text = format!(
1232            "🤖 *Roboticus Online*\n\n\
1233             🧠 *{}* (`{}`)\n\
1234             ⚡ `{}`\n\
1235             🔀 routing: {}\n\
1236             🌐 `{}:{}`\n\
1237             📡 {}\n\n\
1238             🕐 {}",
1239            config.agent.name,
1240            config.agent.id,
1241            config.models.primary,
1242            config.models.routing.mode,
1243            display_host,
1244            config.server.port,
1245            channel_list.join(" · "),
1246            chrono::Local::now().format("%Y-%m-%d %H:%M %Z"),
1247        );
1248
1249        if startup_announce_channels.iter().any(|c| c == "telegram") {
1250            if let (Some(adapter), Some(tg_cfg)) =
1251                (state.telegram.clone(), config.channels.telegram.as_ref())
1252            {
1253                let announce_targets = tg_cfg.allowed_chat_ids.clone();
1254                if announce_targets.is_empty() {
1255                    tracing::warn!(
1256                        "Telegram startup announcement skipped: channels.telegram.allowed_chat_ids is empty"
1257                    );
1258                } else {
1259                    let text = announce_text.clone();
1260                    tokio::spawn(async move {
1261                        for chat_id in announce_targets {
1262                            let chat = chat_id.to_string();
1263                            match adapter
1264                                .send(roboticus_channels::OutboundMessage {
1265                                    content: text.clone(),
1266                                    recipient_id: chat.clone(),
1267                                    metadata: None,
1268                                })
1269                                .await
1270                            {
1271                                Ok(()) => {
1272                                    tracing::info!(chat_id = %chat, "telegram startup announcement sent")
1273                                }
1274                                Err(e) => {
1275                                    tracing::warn!(chat_id = %chat, error = %e, "telegram startup announcement failed")
1276                                }
1277                            }
1278                        }
1279                    });
1280                }
1281            } else {
1282                tracing::warn!(
1283                    "Telegram startup announcement requested but telegram channel is not enabled/configured"
1284                );
1285            }
1286        }
1287
1288        if startup_announce_channels.iter().any(|c| c == "whatsapp") {
1289            if let (Some(adapter), Some(wa_cfg)) =
1290                (state.whatsapp.clone(), config.channels.whatsapp.as_ref())
1291            {
1292                let targets = wa_cfg.allowed_numbers.clone();
1293                if targets.is_empty() {
1294                    tracing::warn!(
1295                        "WhatsApp startup announcement skipped: channels.whatsapp.allowed_numbers is empty"
1296                    );
1297                } else {
1298                    let text = announce_text.clone();
1299                    tokio::spawn(async move {
1300                        for number in targets {
1301                            match adapter
1302                                .send(roboticus_channels::OutboundMessage {
1303                                    content: text.clone(),
1304                                    recipient_id: number.clone(),
1305                                    metadata: None,
1306                                })
1307                                .await
1308                            {
1309                                Ok(()) => {
1310                                    tracing::info!(recipient = %number, "whatsapp startup announcement sent")
1311                                }
1312                                Err(e) => {
1313                                    tracing::warn!(recipient = %number, error = %e, "whatsapp startup announcement failed")
1314                                }
1315                            }
1316                        }
1317                    });
1318                }
1319            } else {
1320                tracing::warn!(
1321                    "WhatsApp startup announcement requested but whatsapp channel is not enabled/configured"
1322                );
1323            }
1324        }
1325
1326        if startup_announce_channels.iter().any(|c| c == "signal") {
1327            if let (Some(adapter), Some(sig_cfg)) =
1328                (state.signal.clone(), config.channels.signal.as_ref())
1329            {
1330                let targets = sig_cfg.allowed_numbers.clone();
1331                if targets.is_empty() {
1332                    tracing::warn!(
1333                        "Signal startup announcement skipped: channels.signal.allowed_numbers is empty"
1334                    );
1335                } else {
1336                    let text = announce_text.clone();
1337                    tokio::spawn(async move {
1338                        for number in targets {
1339                            match adapter
1340                                .send(roboticus_channels::OutboundMessage {
1341                                    content: text.clone(),
1342                                    recipient_id: number.clone(),
1343                                    metadata: None,
1344                                })
1345                                .await
1346                            {
1347                                Ok(()) => {
1348                                    tracing::info!(recipient = %number, "signal startup announcement sent")
1349                                }
1350                                Err(e) => {
1351                                    tracing::warn!(recipient = %number, error = %e, "signal startup announcement failed")
1352                                }
1353                            }
1354                        }
1355                    });
1356                }
1357            } else {
1358                tracing::warn!(
1359                    "Signal startup announcement requested but signal channel is not enabled/configured"
1360                );
1361            }
1362        }
1363
1364        for ch in &startup_announce_channels {
1365            if ch != "telegram" && ch != "whatsapp" && ch != "signal" {
1366                tracing::warn!(
1367                    channel = %ch,
1368                    "startup announcement requested for channel without recipient mapping support"
1369                );
1370            }
1371        }
1372    }
1373
1374    if state.telegram.is_some() {
1375        let use_polling = config
1376            .channels
1377            .telegram
1378            .as_ref()
1379            .map(|c| !c.webhook_mode)
1380            .unwrap_or(true);
1381        if use_polling {
1382            let poll_state = state.clone();
1383            tokio::spawn(async move {
1384                api::telegram_poll_loop(poll_state).await;
1385            });
1386        }
1387    }
1388    if state.discord.is_some() {
1389        let poll_state = state.clone();
1390        tokio::spawn(async move {
1391            api::discord_poll_loop(poll_state).await;
1392        });
1393    }
1394    if state.signal.is_some() {
1395        let poll_state = state.clone();
1396        tokio::spawn(async move {
1397            api::signal_poll_loop(poll_state).await;
1398        });
1399    }
1400    if state.email.is_some() {
1401        let poll_state = state.clone();
1402        tokio::spawn(async move {
1403            api::email_poll_loop(poll_state).await;
1404        });
1405    }
1406
1407    let auth_layer = ApiKeyLayer::new(config.server.api_key.clone());
1408    let local_origin = format!("http://{}:{}", config.server.bind, config.server.port);
1409    let origin_header = local_origin
1410        .parse::<axum::http::HeaderValue>()
1411        .unwrap_or_else(|e| {
1412            tracing::warn!(
1413                origin = %local_origin,
1414                error = %e,
1415                "CORS origin failed to parse, falling back to 127.0.0.1 loopback"
1416            );
1417            axum::http::HeaderValue::from_static("http://127.0.0.1:3000")
1418        });
1419    let cors = CorsLayer::new()
1420        .allow_origin(origin_header)
1421        .allow_methods([
1422            axum::http::Method::GET,
1423            axum::http::Method::POST,
1424            axum::http::Method::PUT,
1425            axum::http::Method::DELETE,
1426        ])
1427        .allow_headers([
1428            axum::http::header::CONTENT_TYPE,
1429            axum::http::header::AUTHORIZATION,
1430            axum::http::HeaderName::from_static("x-api-key"),
1431        ]);
1432    let authed_routes = build_router(state.clone()).layer(auth_layer);
1433
1434    // WebSocket route handles its own auth (header OR ticket) so it
1435    // lives outside the API-key middleware layer.
1436    let ws_routes = axum::Router::new().route(
1437        "/ws",
1438        ws_route(
1439            event_bus.clone(),
1440            state.ws_tickets.clone(),
1441            config.server.api_key.clone(),
1442        ),
1443    );
1444
1445    // MCP protocol endpoint uses bearer token auth (same API key).
1446    // Lives outside the main API-key middleware because the MCP transport
1447    // service (StreamableHttpService) needs its own route handling.
1448    let mcp_routes = build_mcp_router(&state, config.server.api_key.clone());
1449
1450    let public_routes = build_public_router(state);
1451
1452    let app = authed_routes
1453        .merge(ws_routes)
1454        .merge(mcp_routes)
1455        .merge(public_routes)
1456        .layer(cors)
1457        .layer(rate_limiter);
1458    Ok(app)
1459}
1460
1461#[cfg(test)]
1462pub mod test_support;
1463
1464#[cfg(test)]
1465mod tests {
1466    use super::*;
1467    use crate::test_support::EnvGuard;
1468
1469    const BOOTSTRAP_CONFIG: &str = r#"
1470[agent]
1471name = "Roboticus"
1472id = "roboticus-test"
1473
1474[server]
1475port = 18789
1476bind = "127.0.0.1"
1477
1478[database]
1479path = ":memory:"
1480
1481[models]
1482primary = "ollama/qwen3:8b"
1483"#;
1484
1485    #[tokio::test]
1486    async fn bootstrap_with_memory_db_succeeds() {
1487        let config = RoboticusConfig::from_str(BOOTSTRAP_CONFIG).expect("parse config");
1488        let result = bootstrap(config).await;
1489        assert!(
1490            result.is_ok(),
1491            "bootstrap with :memory: should succeed: {:?}",
1492            result.err()
1493        );
1494    }
1495
1496    #[tokio::test]
1497    async fn bootstrap_handles_enabled_channels_with_missing_credentials() {
1498        let dir = tempfile::tempdir().unwrap();
1499        let db_path = dir.path().join("state.db");
1500        let cfg = format!(
1501            r#"
1502[agent]
1503name = "T"
1504id = "t"
1505
1506[server]
1507bind = "127.0.0.1"
1508port = 18789
1509
1510[database]
1511path = "{db_path}"
1512
1513[models]
1514primary = "ollama/qwen3:8b"
1515
1516[channels.telegram]
1517enabled = true
1518token_env = "TEST_TELEGRAM_MISSING"
1519
1520[channels.whatsapp]
1521enabled = true
1522token_env = "TEST_WHATSAPP_MISSING"
1523phone_number_id = ""
1524
1525[channels.discord]
1526enabled = true
1527token_env = "TEST_DISCORD_MISSING"
1528
1529[channels.signal]
1530enabled = true
1531phone_number = ""
1532
1533[channels.email]
1534enabled = true
1535password_env = "TEST_EMAIL_PASSWORD_MISSING"
1536
1537[channels.voice]
1538enabled = true
1539
1540[obsidian]
1541enabled = true
1542vault_path = "{vault_path}"
1543watch_for_changes = true
1544"#,
1545            db_path = db_path.display(),
1546            vault_path = dir.path().join("missing-vault").display(),
1547        );
1548        let config = RoboticusConfig::from_str(&cfg).expect("parse config");
1549        let result = bootstrap(config).await;
1550        assert!(
1551            result.is_ok(),
1552            "bootstrap should tolerate disabled channel credentials and invalid obsidian path: {:?}",
1553            result.err()
1554        );
1555    }
1556
1557    #[tokio::test]
1558    async fn bootstrap_loads_taskable_subagents_from_database() {
1559        let dir = tempfile::tempdir().unwrap();
1560        let db_path = dir.path().join("state.db");
1561        let db = roboticus_db::Database::new(db_path.to_str().unwrap()).unwrap();
1562
1563        roboticus_db::agents::upsert_sub_agent(
1564            &db,
1565            &roboticus_db::agents::SubAgentRow {
1566                id: "sa-1".into(),
1567                name: "finance_specialist".into(),
1568                display_name: Some("Finance Specialist".into()),
1569                model: "auto".into(),
1570                fallback_models_json: Some("[]".into()),
1571                role: "subagent".into(),
1572                description: Some("Handles finance tasks".into()),
1573                skills_json: Some(r#"["budgeting"]"#.into()),
1574                enabled: true,
1575                session_count: 0,
1576                last_used_at: None,
1577            },
1578        )
1579        .unwrap();
1580        roboticus_db::agents::upsert_sub_agent(
1581            &db,
1582            &roboticus_db::agents::SubAgentRow {
1583                id: "sa-2".into(),
1584                name: "observer".into(),
1585                display_name: Some("Observer".into()),
1586                model: "ollama/qwen3:8b".into(),
1587                fallback_models_json: Some("[]".into()),
1588                role: "model-proxy".into(),
1589                description: Some("Not taskable".into()),
1590                skills_json: None,
1591                enabled: true,
1592                session_count: 0,
1593                last_used_at: None,
1594            },
1595        )
1596        .unwrap();
1597
1598        let cfg = format!(
1599            r#"
1600[agent]
1601name = "T"
1602id = "t"
1603
1604[server]
1605bind = "127.0.0.1"
1606port = 18789
1607
1608[database]
1609path = "{db_path}"
1610
1611[models]
1612primary = "ollama/qwen3:8b"
1613"#,
1614            db_path = db_path.display(),
1615        );
1616        let config = RoboticusConfig::from_str(&cfg).expect("parse config");
1617        let result = bootstrap(config).await;
1618        assert!(
1619            result.is_ok(),
1620            "bootstrap should register enabled taskable subagents from db: {:?}",
1621            result.err()
1622        );
1623    }
1624
1625    #[test]
1626    fn cleanup_old_logs_no_panic_on_missing_dir() {
1627        let dir = std::path::Path::new("/tmp/roboticus-test-nonexistent-dir-cleanup");
1628        cleanup_old_logs(dir, 30);
1629    }
1630
1631    #[test]
1632    fn cleanup_old_logs_keeps_recent_logs() {
1633        let dir = tempfile::tempdir().unwrap();
1634        let recent = dir.path().join("recent.log");
1635        std::fs::write(&recent, "fresh log").unwrap();
1636
1637        cleanup_old_logs(dir.path(), 30);
1638        assert!(recent.exists(), "recent logs should remain");
1639    }
1640
1641    #[test]
1642    fn cleanup_old_logs_ignores_non_log_files() {
1643        let dir = tempfile::tempdir().unwrap();
1644        let txt = dir.path().join("data.txt");
1645        std::fs::write(&txt, "keep me").unwrap();
1646
1647        cleanup_old_logs(dir.path(), 0);
1648        assert!(txt.exists(), "non-log files should not be deleted");
1649    }
1650
1651    #[test]
1652    fn cleanup_old_logs_empty_dir() {
1653        let dir = tempfile::tempdir().unwrap();
1654        cleanup_old_logs(dir.path(), 1);
1655    }
1656
1657    #[test]
1658    fn taskable_subagent_roles_are_strict() {
1659        assert!(is_taskable_subagent_role("subagent"));
1660        assert!(is_taskable_subagent_role("specialist"));
1661        assert!(is_taskable_subagent_role("SubAgent"));
1662        assert!(!is_taskable_subagent_role("model-proxy"));
1663    }
1664
1665    #[test]
1666    fn resolve_token_prefers_keystore_reference_then_env_then_empty() {
1667        let dir = tempfile::tempdir().unwrap();
1668        let keystore_path = dir.path().join("keystore.enc");
1669        let keystore = roboticus_core::keystore::Keystore::new(keystore_path);
1670        keystore.unlock("pw").unwrap();
1671        keystore.set("telegram_bot_token", "from_keystore").unwrap();
1672
1673        let _env = EnvGuard::set("TEST_TELEGRAM_TOKEN", "from_env");
1674        let token = resolve_token(
1675            &Some("keystore:telegram_bot_token".to_string()),
1676            "TEST_TELEGRAM_TOKEN",
1677            &keystore,
1678        );
1679        assert_eq!(token, "from_keystore");
1680
1681        let fallback = resolve_token(
1682            &Some("keystore:missing".to_string()),
1683            "TEST_TELEGRAM_TOKEN",
1684            &keystore,
1685        );
1686        assert_eq!(fallback, "from_env");
1687
1688        let empty = resolve_token(&None, "", &keystore);
1689        assert!(empty.is_empty());
1690    }
1691
1692    #[test]
1693    fn resolve_token_uses_env_when_no_keystore_ref_is_provided() {
1694        let dir = tempfile::tempdir().unwrap();
1695        let keystore_path = dir.path().join("keystore.enc");
1696        let keystore = roboticus_core::keystore::Keystore::new(keystore_path);
1697        keystore.unlock("pw").unwrap();
1698
1699        let _env = EnvGuard::set("TEST_SIGNAL_TOKEN", "from_env_only");
1700        let token = resolve_token(&None, "TEST_SIGNAL_TOKEN", &keystore);
1701        assert_eq!(token, "from_env_only");
1702    }
1703
1704    #[test]
1705    fn resolve_token_returns_empty_for_empty_env_var() {
1706        let dir = tempfile::tempdir().unwrap();
1707        let keystore_path = dir.path().join("keystore.enc");
1708        let keystore = roboticus_core::keystore::Keystore::new(keystore_path);
1709        keystore.unlock("pw").unwrap();
1710
1711        let _env = EnvGuard::set("TEST_EMPTY_TOKEN", "");
1712        let token = resolve_token(&None, "TEST_EMPTY_TOKEN", &keystore);
1713        assert!(token.is_empty());
1714    }
1715
1716    #[test]
1717    fn cleanup_old_logs_can_prune_when_window_is_zero_days() {
1718        let dir = tempfile::tempdir().unwrap();
1719        let old_log = dir.path().join("old.log");
1720        std::fs::write(&old_log, "stale").unwrap();
1721        std::thread::sleep(std::time::Duration::from_millis(10));
1722        cleanup_old_logs(dir.path(), 0);
1723        assert!(!old_log.exists());
1724    }
1725}