Skip to main content

roboticus_server/
lib.rs

1//! # roboticus-server
2//!
3//! Top-level binary crate that assembles all Roboticus workspace crates into a
4//! single runtime. The [`bootstrap()`] function initializes the database,
5//! wallet, LLM pipeline, agent loop, channel adapters, and background daemons,
6//! then returns an axum `Router` ready to serve.
7//!
8//! ## Key Types
9//!
10//! - [`AppState`] -- Shared application state passed to all route handlers
11//! - [`PersonalityState`] -- Loaded personality files (OS, firmware, identity)
12//! - [`EventBus`] -- Tokio broadcast channel for WebSocket event push
13//!
14//! ## Modules
15//!
16//! - `api` -- REST API mount point, `build_router()`, route modules
17//! - `auth` -- API key authentication middleware layer
18//! - `rate_limit` -- Global + per-IP rate limiting (sliding window)
19//! - `dashboard` -- Embedded SPA serving (compile-time or filesystem)
20//! - `ws` -- WebSocket upgrade and event broadcasting
21//! - `cli/` -- CLI command handlers (serve, status, sessions, memory, wallet, etc.)
22//! - `daemon` -- Daemon install, status, uninstall
23//! - `migrate/` -- Migration engine, skill import/export
24//! - `plugins` -- Plugin registry initialization and loading
25//!
26//! ## Bootstrap Sequence
27//!
28//! 1. Parse CLI → load config → init DB → load wallet → generate HMAC secret
29//! 2. Init LLM client + router + embedding → load semantic cache
30//! 3. Init agent loop + tool registry + memory retriever
//! 4. Register channel adapters (Telegram, WhatsApp, Discord, Signal, Email)
32//! 5. Spawn background daemons (heartbeat, cron, cache flush, ANN rebuild)
33//! 6. Build axum router with auth + CORS + rate limiting
34
35// ── Re-exports from roboticus-api ──────────────────────────────
36pub use roboticus_api as api_crate;
37pub use roboticus_api::abuse;
38pub use roboticus_api::api;
39pub use roboticus_api::auth;
40pub use roboticus_api::config_runtime;
41pub use roboticus_api::cron_runtime;
42pub use roboticus_api::dashboard;
43pub use roboticus_api::rate_limit;
44pub use roboticus_api::ws;
45pub use roboticus_api::ws_ticket;
46pub use roboticus_api::{
47    AppState, EventBus, PersonalityState, TicketStore, build_dashboard_html, build_mcp_router,
48    build_public_router, build_router, dashboard_handler, ws_route,
49};
50
51// ── Re-exports from roboticus-cli ─────────────────────────────
52pub use roboticus_cli::cli;
53pub use roboticus_cli::migrate;
54
55// ── Server-local modules ─────────────────────────────────────
56pub mod config_maintenance;
57pub mod daemon;
58pub mod plugins;
59pub use roboticus_cli::state_hygiene;
60
61use std::sync::Arc;
62use std::sync::OnceLock;
63use std::sync::atomic::{AtomicBool, Ordering};
64use std::time::Duration;
65
66use tokio::sync::RwLock;
67use tower_http::cors::CorsLayer;
68
69use auth::ApiKeyLayer;
70use roboticus_agent::policy::{
71    AuthorityRule, CommandSafetyRule, FinancialRule, PathProtectionRule, PolicyEngine,
72    RateLimitRule, ValidationRule,
73};
74use roboticus_agent::subagents::SubagentRegistry;
75use roboticus_browser::Browser;
76use roboticus_channels::ChannelAdapter;
77use roboticus_channels::a2a::A2aProtocol;
78use roboticus_channels::router::ChannelRouter;
79use roboticus_channels::telegram::TelegramAdapter;
80use roboticus_channels::whatsapp::WhatsAppAdapter;
81use roboticus_core::RoboticusConfig;
82use roboticus_db::Database;
83use roboticus_llm::LlmService;
84use roboticus_llm::OAuthManager;
85use roboticus_wallet::{WalletPaymentHandler, WalletService};
86
87use roboticus_agent::approvals::ApprovalManager;
88use roboticus_agent::obsidian::ObsidianVault;
89use roboticus_agent::obsidian_tools::{ObsidianReadTool, ObsidianSearchTool, ObsidianWriteTool};
90use roboticus_agent::tools::{
91    BashTool, EchoTool, EditFileTool, GlobFilesTool, ListDirectoryTool, ReadFileTool,
92    ScriptRunnerTool, SearchFilesTool, ToolRegistry, WriteFileTool,
93};
94use roboticus_channels::discord::DiscordAdapter;
95use roboticus_channels::email::EmailAdapter;
96use roboticus_channels::signal::SignalAdapter;
97use roboticus_channels::voice::{VoiceConfig, VoicePipeline};
98
99use rate_limit::GlobalRateLimitLayer;
100
/// Gate for the stderr log layer: `init_logging` installs a filter that reads this
/// flag, and `enable_stderr_logging` sets it. Starts out `false`.
static STDERR_ENABLED: AtomicBool = AtomicBool::new(false);
/// Holds the `WorkerGuard` returned by `tracing_appender::non_blocking` in
/// `init_logging`, so the guard is not dropped when that function returns.
static LOG_GUARD: OnceLock<tracing_appender::non_blocking::WorkerGuard> = OnceLock::new();
103
/// Reports whether `role` is one of the sub-agent roles treated as taskable.
///
/// The comparison ignores ASCII case and requires an exact match against
/// `subagent`, `specialist`, or `observer` (no trimming is performed).
fn is_taskable_subagent_role(role: &str) -> bool {
    const TASKABLE_ROLES: [&str; 3] = ["subagent", "specialist", "observer"];

    TASKABLE_ROLES
        .iter()
        .any(|candidate| role.eq_ignore_ascii_case(candidate))
}
109
/// Turns on stderr log output by setting the `STDERR_ENABLED` flag.
///
/// The stderr layer built in `init_logging` is filtered on this flag, so log
/// events only reach stderr once this has been called.
pub fn enable_stderr_logging() {
    // Release store pairs with the Acquire load in the stderr filter.
    STDERR_ENABLED.store(true, Ordering::Release);
}
113
114fn init_logging(config: &RoboticusConfig) {
115    use tracing_appender::rolling::{RollingFileAppender, Rotation};
116    use tracing_subscriber::EnvFilter;
117    use tracing_subscriber::Layer;
118    use tracing_subscriber::filter::filter_fn;
119    use tracing_subscriber::fmt;
120    use tracing_subscriber::layer::SubscriberExt;
121    use tracing_subscriber::util::SubscriberInitExt;
122
123    let level = config.agent.log_level.as_str();
124    // Promote hyper errors to warn so IncompleteMessage and connection
125    // resets are visible in production logs (they default to DEBUG).
126    let base_filter = format!("{level},hyper=warn,h2=warn,rustls=warn");
127    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&base_filter));
128
129    let stderr_gate = filter_fn(|_| STDERR_ENABLED.load(Ordering::Acquire));
130
131    let log_dir = &config.server.log_dir;
132    let stderr_layer = fmt::layer()
133        .with_writer(std::io::stderr)
134        .with_filter(stderr_gate);
135
136    let file_appender = std::fs::create_dir_all(log_dir).ok().and_then(|_| {
137        RollingFileAppender::builder()
138            .rotation(Rotation::DAILY)
139            .filename_prefix("roboticus")
140            .filename_suffix("log")
141            .build(log_dir)
142            .map_err(|e| {
143                eprintln!(
144                    "warning: failed to initialize file logging in {}: {e}",
145                    log_dir.display()
146                );
147                e
148            })
149            .ok()
150    });
151
152    if let Some(file_appender) = file_appender {
153        let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
154        let _ = LOG_GUARD.set(guard);
155
156        let file_layer = fmt::layer()
157            .with_writer(non_blocking)
158            .with_ansi(false)
159            .json();
160
161        let _ = tracing_subscriber::registry()
162            .with(filter)
163            .with(stderr_layer)
164            .with(file_layer)
165            .try_init();
166    } else {
167        let _ = tracing_subscriber::registry()
168            .with(filter)
169            .with(stderr_layer)
170            .try_init();
171    }
172
173    cleanup_old_logs(log_dir, config.server.log_max_days);
174}
175
/// Best-effort removal of stale log files from `log_dir`.
///
/// Every directory entry whose extension is exactly `log` and whose modification time
/// is earlier than `now - max_days` days is deleted. Entries with other extensions,
/// and entries whose metadata or modification time cannot be read, are left alone.
///
/// With `max_days == 0` the cutoff is "now", so any `.log` file with an earlier
/// modification time is eligible for removal.
///
/// Nothing is reported to the caller: an unreadable directory, an unrepresentable
/// cutoff time, or a failed deletion simply results in fewer (or no) files removed.
fn cleanup_old_logs(log_dir: &std::path::Path, max_days: u32) {
    let retention = std::time::Duration::from_secs(u64::from(max_days) * 86_400);
    // `SystemTime - Duration` panics when the result is not representable on the
    // platform; with no representable cutoff nothing can be older than it.
    let Some(cutoff) = std::time::SystemTime::now().checked_sub(retention) else {
        return;
    };

    let Ok(entries) = std::fs::read_dir(log_dir) else {
        return;
    };

    for entry in entries.flatten() {
        let path = entry.path();
        if path.extension().and_then(|ext| ext.to_str()) != Some("log") {
            continue;
        }

        let expired = entry
            .metadata()
            .and_then(|meta| meta.modified())
            .is_ok_and(|modified| modified < cutoff);
        if expired {
            // best-effort: log rotation cleanup failure is non-critical
            let _ = std::fs::remove_file(&path);
        }
    }
}
199
200/// Resolve a channel token: keystore reference first, then env var fallback.
201fn resolve_token(
202    token_ref: &Option<String>,
203    token_env: &str,
204    keystore: &roboticus_core::Keystore,
205) -> String {
206    if let Some(r) = token_ref
207        && let Some(name) = r.strip_prefix("keystore:")
208    {
209        if let Some(val) = keystore.get(name) {
210            return val;
211        }
212        tracing::warn!(key = %name, "keystore reference not found, falling back to env var");
213    }
214    if !token_env.is_empty() {
215        return match std::env::var(token_env) {
216            Ok(val) if !val.is_empty() => val,
217            Ok(_) => {
218                tracing::warn!(env_var = %token_env, "API key env var is set but empty");
219                String::new()
220            }
221            Err(_) => {
222                tracing::warn!(env_var = %token_env, "API key env var is not set");
223                String::new()
224            }
225        };
226    }
227    String::new()
228}
229
/// Builds the application state and router from config. Used by the binary and by tests.
///
/// Convenience wrapper around [`bootstrap_with_config_path`] that passes `None` as the
/// config path.
///
/// # Errors
///
/// Returns whatever error [`bootstrap_with_config_path`] returns.
pub async fn bootstrap(
    config: RoboticusConfig,
) -> Result<axum::Router, Box<dyn std::error::Error>> {
    bootstrap_with_config_path(config, None).await
}
236
237pub async fn bootstrap_with_config_path(
238    config: RoboticusConfig,
239    config_path: Option<std::path::PathBuf>,
240) -> Result<axum::Router, Box<dyn std::error::Error>> {
241    init_logging(&config);
242
243    let personality_state = api::PersonalityState::from_workspace(&config.agent.workspace);
244
245    if !personality_state.os_text.is_empty() {
246        tracing::info!(
247            personality = %personality_state.identity.name,
248            generated_by = %personality_state.identity.generated_by,
249            "Loaded personality files from workspace"
250        );
251    } else {
252        tracing::info!("No personality files found in workspace, using defaults");
253    }
254
255    let db_path = config.database.path.to_string_lossy().to_string();
256    let db = Database::new(&db_path)?;
257    match crate::state_hygiene::run_state_hygiene(&config.database.path) {
258        Ok(report) if report.changed => {
259            tracing::info!(
260                changed_rows = report.changed_rows,
261                subagent_rows_normalized = report.subagent_rows_normalized,
262                cron_payload_rows_repaired = report.cron_payload_rows_repaired,
263                cron_jobs_disabled_invalid_expr = report.cron_jobs_disabled_invalid_expr,
264                "applied startup mechanic checks"
265            );
266        }
267        Ok(_) => {}
268        Err(e) => tracing::warn!(error = %e, "startup mechanic checks failed"),
269    }
270    match roboticus_db::sessions::backfill_nicknames(&db) {
271        Ok(0) => {}
272        Ok(n) => tracing::info!(count = n, "Backfilled session nicknames"),
273        Err(e) => tracing::warn!(error = %e, "Failed to backfill session nicknames"),
274    }
275    let mut llm = LlmService::new(&config)?;
276    // Seed quality tracker with recent observations so metascore routing
277    // has a warm start instead of assuming 0.8 for every model.
278    match roboticus_db::metrics::recent_quality_scores(&db, 200) {
279        Ok(scores) if !scores.is_empty() => llm.quality.seed_from_history(&scores),
280        Ok(_) => {}
281        Err(e) => tracing::warn!(error = %e, "failed to seed quality tracker from history"),
282    }
283    let wallet = WalletService::new(&config).await?;
284
285    // Wire x402 payment handler: when the LLM client hits an HTTP 402, it
286    // signs an EIP-3009 authorization via the agent wallet and retries.
287    // The treasury policy enforces per-payment caps from configuration.
288    let wallet_arc = Arc::new(wallet.wallet.clone());
289    let treasury_policy = roboticus_wallet::treasury::TreasuryPolicy::new(&config.treasury);
290    let x402_handler =
291        Arc::new(WalletPaymentHandler::new(wallet_arc).with_treasury_policy(treasury_policy));
292    llm.set_payment_handler(x402_handler);
293
294    let a2a = A2aProtocol::new(config.a2a.clone());
295    let plugin_env = {
296        let mut env = std::collections::HashMap::new();
297        env.insert("ROBOTICUS_DELEGATION_DEPTH".into(), "0".into());
298        env.insert(
299            "ROBOTICUS_WORKSPACE".into(),
300            config.agent.workspace.display().to_string(),
301        );
302        if let Some(ref ws) = config.skills.workspace_dir {
303            env.insert("ROBOTICUS_SKILLS_DIR".into(), ws.display().to_string());
304        }
305        env
306    };
307    let plugin_registry = plugins::init_plugin_registry(&config.plugins, plugin_env).await;
308    let mut policy_engine = PolicyEngine::new();
309    policy_engine.add_rule(Box::new(AuthorityRule));
310    policy_engine.add_rule(Box::new(CommandSafetyRule));
311    policy_engine.add_rule(Box::new(FinancialRule::new(
312        config.treasury.per_payment_cap,
313    )));
314    policy_engine.add_rule(Box::new(PathProtectionRule::from_config(
315        &config.security.filesystem,
316    )));
317    policy_engine.add_rule(Box::new(RateLimitRule::default()));
318    policy_engine.add_rule(Box::new(ValidationRule));
319    let policy_engine = Arc::new(policy_engine);
320    let browser = Arc::new(Browser::new(config.browser.clone()));
321    let registry = Arc::new(SubagentRegistry::new(4, vec![]));
322
323    if let Ok(sub_agents) = roboticus_db::agents::list_enabled_sub_agents(&db) {
324        for sa in &sub_agents {
325            if !is_taskable_subagent_role(&sa.role) {
326                continue;
327            }
328            let resolved_model = match sa.model.trim().to_ascii_lowercase().as_str() {
329                "auto" | "orchestrator" => llm.router.select_model().to_string(),
330                _ => sa.model.clone(),
331            };
332            let fixed_skills = sa
333                .skills_json
334                .as_deref()
335                .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok())
336                .unwrap_or_default();
337            let agent_config = roboticus_agent::subagents::AgentInstanceConfig {
338                id: sa.name.clone(),
339                name: sa.display_name.clone().unwrap_or_else(|| sa.name.clone()),
340                model: resolved_model,
341                skills: fixed_skills,
342                allowed_subagents: vec![],
343                max_concurrent: 4,
344            };
345            if let Err(e) = registry.register(agent_config).await {
346                tracing::warn!(agent = %sa.name, err = %e, "failed to register sub-agent");
347            } else if let Err(e) = registry.start_agent(&sa.name).await {
348                tracing::warn!(agent = %sa.name, err = %e, "failed to auto-start sub-agent");
349            }
350        }
351        if !sub_agents.is_empty() {
352            tracing::info!(
353                count = sub_agents.len(),
354                "registered sub-agents from database"
355            );
356        }
357    }
358
359    let event_bus = EventBus::new(256);
360
361    let keystore =
362        roboticus_core::keystore::Keystore::new(roboticus_core::keystore::Keystore::default_path());
363    if let Err(e) = keystore.unlock_machine() {
364        tracing::warn!("keystore auto-unlock failed: {e}");
365    }
366    let keystore = Arc::new(keystore);
367
368    let channel_router = Arc::new(ChannelRouter::with_store(db.clone()).await);
369    let telegram: Option<Arc<TelegramAdapter>> =
370        if let Some(ref tg_config) = config.channels.telegram {
371            if tg_config.enabled {
372                let token = resolve_token(&tg_config.token_ref, &tg_config.token_env, &keystore);
373                if !token.is_empty() {
374                    let adapter = Arc::new(TelegramAdapter::with_config(
375                        token,
376                        tg_config.poll_timeout_seconds,
377                        tg_config.allowed_chat_ids.clone(),
378                        tg_config.webhook_secret.clone(),
379                        config.security.deny_on_empty_allowlist,
380                    ));
381                    channel_router
382                        .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
383                        .await;
384                    tracing::info!("Telegram adapter registered");
385                    if tg_config.webhook_secret.is_none() {
386                        tracing::warn!(
387                            "Telegram webhook_secret not set; webhook endpoint will reject with 503"
388                        );
389                    }
390                    Some(adapter)
391                } else {
392                    tracing::warn!(
393                        token_env = %tg_config.token_env,
394                        "Telegram enabled but token is empty"
395                    );
396                    None
397                }
398            } else {
399                None
400            }
401        } else {
402            None
403        };
404
405    let whatsapp: Option<Arc<WhatsAppAdapter>> =
406        if let Some(ref wa_config) = config.channels.whatsapp {
407            if wa_config.enabled {
408                let token = resolve_token(&wa_config.token_ref, &wa_config.token_env, &keystore);
409                if !token.is_empty() && !wa_config.phone_number_id.is_empty() {
410                    let adapter = Arc::new(WhatsAppAdapter::with_config(
411                        token,
412                        wa_config.phone_number_id.clone(),
413                        wa_config.verify_token.clone(),
414                        wa_config.allowed_numbers.clone(),
415                        wa_config.app_secret.clone(),
416                        config.security.deny_on_empty_allowlist,
417                    )?);
418                    channel_router
419                        .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
420                        .await;
421                    tracing::info!("WhatsApp adapter registered");
422                    if wa_config.app_secret.is_none() {
423                        tracing::warn!(
424                            "WhatsApp app_secret not set; webhook endpoint will reject with 503"
425                        );
426                    }
427                    Some(adapter)
428                } else {
429                    tracing::warn!("WhatsApp enabled but token env or phone_number_id is empty");
430                    None
431                }
432            } else {
433                None
434            }
435        } else {
436            None
437        };
438
439    let discord: Option<Arc<DiscordAdapter>> = if let Some(ref dc_config) = config.channels.discord
440    {
441        if dc_config.enabled {
442            let token = resolve_token(&dc_config.token_ref, &dc_config.token_env, &keystore);
443            if !token.is_empty() {
444                let adapter = Arc::new(DiscordAdapter::with_config(
445                    token,
446                    dc_config.allowed_guild_ids.clone(),
447                    config.security.deny_on_empty_allowlist,
448                ));
449                channel_router
450                    .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
451                    .await;
452                // Start WebSocket gateway for real-time event reception
453                if let Err(e) = adapter.connect_gateway().await {
454                    tracing::error!(error = %e, "Failed to connect Discord gateway");
455                }
456                tracing::info!("Discord adapter registered with gateway");
457                Some(adapter)
458            } else {
459                tracing::warn!(
460                    token_env = %dc_config.token_env,
461                    "Discord enabled but token env var is empty"
462                );
463                None
464            }
465        } else {
466            None
467        }
468    } else {
469        None
470    };
471
472    let signal: Option<Arc<SignalAdapter>> = if let Some(ref sig_config) = config.channels.signal {
473        if sig_config.enabled {
474            if !sig_config.phone_number.is_empty() {
475                let adapter = Arc::new(SignalAdapter::with_config(
476                    sig_config.phone_number.clone(),
477                    sig_config.daemon_url.clone(),
478                    sig_config.allowed_numbers.clone(),
479                    config.security.deny_on_empty_allowlist,
480                ));
481                channel_router
482                    .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
483                    .await;
484                tracing::info!("Signal adapter registered");
485                Some(adapter)
486            } else {
487                tracing::warn!("Signal enabled but phone_number is empty");
488                None
489            }
490        } else {
491            None
492        }
493    } else {
494        None
495    };
496
497    let email: Option<Arc<EmailAdapter>> = if config.channels.email.enabled {
498        let email_cfg = &config.channels.email;
499        let password = if email_cfg.password_env.is_empty() {
500            String::new()
501        } else {
502            match std::env::var(&email_cfg.password_env) {
503                Ok(val) => val,
504                Err(_) => {
505                    tracing::warn!(env_var = %email_cfg.password_env, "email password env var is not set");
506                    String::new()
507                }
508            }
509        };
510        if email_cfg.smtp_host.is_empty()
511            || email_cfg.username.is_empty()
512            || password.is_empty()
513            || email_cfg.from_address.is_empty()
514        {
515            tracing::warn!("Email enabled but SMTP credentials are incomplete");
516            None
517        } else {
518            match EmailAdapter::new(
519                email_cfg.from_address.clone(),
520                email_cfg.smtp_host.clone(),
521                email_cfg.smtp_port,
522                email_cfg.imap_host.clone(),
523                email_cfg.imap_port,
524                email_cfg.username.clone(),
525                password,
526            ) {
527                Ok(email_adapter) => {
528                    // Resolve OAuth2 token from env if configured
529                    let oauth2_token =
530                        if email_cfg.use_oauth2 && !email_cfg.oauth2_token_env.is_empty() {
531                            std::env::var(&email_cfg.oauth2_token_env).ok()
532                        } else {
533                            None
534                        };
535                    let adapter = Arc::new(
536                        email_adapter
537                            .with_allowed_senders(email_cfg.allowed_senders.clone())
538                            .with_deny_on_empty(config.security.deny_on_empty_allowlist)
539                            .with_poll_interval(std::time::Duration::from_secs(
540                                email_cfg.poll_interval_seconds,
541                            ))
542                            .with_oauth2_token(oauth2_token)
543                            .with_imap_idle_enabled(email_cfg.imap_idle_enabled),
544                    );
545                    channel_router
546                        .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
547                        .await;
548                    tracing::info!("Email adapter registered");
549
550                    // Start background IMAP listener if IMAP is configured
551                    if !email_cfg.imap_host.is_empty()
552                        && let Err(e) = adapter.start_imap_listener().await
553                    {
554                        tracing::error!(error = %e, "Failed to start email IMAP listener");
555                    }
556
557                    Some(adapter)
558                }
559                Err(e) => {
560                    tracing::error!(error = %e, "Failed to create email adapter");
561                    None
562                }
563            }
564        }
565    } else {
566        None
567    };
568
569    let voice: Option<Arc<RwLock<VoicePipeline>>> = if config.channels.voice.enabled {
570        let mut voice_config = VoiceConfig::default();
571        if let Some(stt) = &config.channels.voice.stt_model {
572            voice_config.stt_model = stt.clone();
573        }
574        if let Some(tts) = &config.channels.voice.tts_model {
575            voice_config.tts_model = tts.clone();
576        }
577        if let Some(v) = &config.channels.voice.tts_voice {
578            voice_config.tts_voice = v.clone();
579        }
580        if let Ok(key) = std::env::var("OPENAI_API_KEY")
581            && !key.is_empty()
582        {
583            voice_config.api_key = Some(key);
584        }
585        tracing::info!("Voice pipeline initialized");
586        Some(Arc::new(RwLock::new(VoicePipeline::new(voice_config))))
587    } else {
588        None
589    };
590
591    let hmac_secret = {
592        use rand::RngCore;
593        let mut buf = vec![0u8; 32];
594        rand::rngs::OsRng.fill_bytes(&mut buf);
595        buf
596    };
597
598    let retriever = Arc::new(roboticus_agent::retrieval::MemoryRetriever::new(
599        config.memory.clone(),
600    ));
601
602    let mut tool_registry = ToolRegistry::new();
603    tool_registry.register(Box::new(EchoTool));
604    tool_registry.register(Box::new(BashTool));
605    tool_registry.register(Box::new(ScriptRunnerTool::new(
606        config.skills.clone(),
607        config.security.filesystem.clone(),
608    )));
609    tool_registry.register(Box::new(ReadFileTool));
610    tool_registry.register(Box::new(WriteFileTool));
611    tool_registry.register(Box::new(EditFileTool));
612    tool_registry.register(Box::new(ListDirectoryTool));
613    tool_registry.register(Box::new(GlobFilesTool));
614    tool_registry.register(Box::new(SearchFilesTool));
615
616    // Cron tool — manages scheduled jobs through the tool registry
617    use roboticus_agent::tools::CronTool;
618    tool_registry.register(Box::new(CronTool));
619
620    // Introspection tools — read-only runtime probes for agent self-awareness
621    use roboticus_agent::tools::{
622        GetChannelHealthTool, GetMemoryStatsTool, GetRuntimeContextTool, GetSubagentStatusTool,
623    };
624    tool_registry.register(Box::new(GetRuntimeContextTool));
625    tool_registry.register(Box::new(GetMemoryStatsTool));
626    tool_registry.register(Box::new(GetChannelHealthTool));
627    tool_registry.register(Box::new(GetSubagentStatusTool));
628
629    // Data tools — agent-managed tables via hippocampus
630    use roboticus_agent::tools::{AlterTableTool, CreateTableTool, DropTableTool};
631    tool_registry.register(Box::new(CreateTableTool));
632    tool_registry.register(Box::new(AlterTableTool));
633    tool_registry.register(Box::new(DropTableTool));
634
635    // Bridge active plugin tools into the same runtime registry used by ReAct.
636    plugins::register_plugin_tools(&mut tool_registry, Arc::clone(&plugin_registry)).await;
637
638    // Obsidian vault integration
639    let obsidian_vault: Option<Arc<RwLock<ObsidianVault>>> = if config.obsidian.enabled {
640        match ObsidianVault::from_config(&config.obsidian) {
641            Ok(vault) => {
642                let vault = Arc::new(RwLock::new(vault));
643                tool_registry.register(Box::new(ObsidianReadTool::new(Arc::clone(&vault))));
644                tool_registry.register(Box::new(ObsidianWriteTool::new(Arc::clone(&vault))));
645                tool_registry.register(Box::new(ObsidianSearchTool::new(Arc::clone(&vault))));
646                tracing::info!("Obsidian vault integration enabled");
647                Some(vault)
648            }
649            Err(e) => {
650                tracing::warn!(error = %e, "Failed to initialize Obsidian vault");
651                None
652            }
653        }
654    } else {
655        None
656    };
657
658    // Start vault file watcher if configured
659    if let Some(ref vault) = obsidian_vault
660        && config.obsidian.watch_for_changes
661    {
662        match roboticus_agent::obsidian::watcher::VaultWatcher::start(Arc::clone(vault)).await {
663            Ok(_watcher) => {
664                // watcher lives in the spawned task — drop is fine here
665                tracing::info!("Obsidian vault file watcher started");
666            }
667            Err(e) => {
668                tracing::warn!(error = %e, "Failed to start Obsidian vault watcher");
669            }
670        }
671    }
672
673    let tool_registry = Arc::new(tool_registry);
674
675    let capabilities = Arc::new(roboticus_agent::capability::CapabilityRegistry::new());
676    if let Err(e) = capabilities
677        .sync_from_tool_registry(Arc::clone(&tool_registry))
678        .await
679    {
680        tracing::warn!(error = %e, "initial capability sync from tool registry reported errors");
681    }
682
683    let approvals = Arc::new(ApprovalManager::new(config.approvals.clone()));
684
685    let oauth = Arc::new(OAuthManager::new()?);
686
687    let discovery_registry = Arc::new(RwLock::new(
688        roboticus_agent::discovery::DiscoveryRegistry::new(),
689    ));
690    let device_manager = Arc::new(RwLock::new(roboticus_agent::device::DeviceManager::new(
691        roboticus_agent::device::DeviceIdentity::generate(&config.agent.id),
692        config.devices.max_paired_devices,
693    )));
694
695    let mut mcp_clients = roboticus_agent::mcp::McpClientManager::new();
696    for c in &config.mcp.clients {
697        let mut conn =
698            roboticus_agent::mcp::McpClientConnection::new(c.name.clone(), c.url.clone());
699        if let Err(e) = conn.discover() {
700            tracing::warn!(mcp_name = %c.name, error = %e, "MCP client discovery failed at startup");
701        }
702        mcp_clients.add_connection(conn);
703    }
704    let mcp_clients = Arc::new(RwLock::new(mcp_clients));
705
706    // v2 MCP client connections (via rmcp)
707    let live_mcp = Arc::new(roboticus_agent::mcp::manager::McpConnectionManager::new());
708    live_mcp
709        .connect_all(&config.mcp.servers, &capabilities)
710        .await;
711    let live_mcp_tool_count = live_mcp.connected_count().await;
712    if live_mcp_tool_count > 0 {
713        tracing::info!(
714            servers = live_mcp_tool_count,
715            "MCP client connections established"
716        );
717    }
718
719    let mut mcp_server_registry = roboticus_agent::mcp::McpServerRegistry::new();
720    let exported = roboticus_agent::mcp::export_tools_as_mcp(
721        &tool_registry
722            .list()
723            .iter()
724            .map(|t| {
725                (
726                    t.name().to_string(),
727                    t.description().to_string(),
728                    t.parameters_schema(),
729                )
730            })
731            .collect::<Vec<_>>(),
732    );
733    for tool in exported {
734        mcp_server_registry.register_tool(tool);
735    }
736    mcp_server_registry.register_resource(roboticus_agent::mcp::McpResource {
737        uri: "roboticus://sessions/active".to_string(),
738        name: "Active Sessions".to_string(),
739        description: "Active sessions in the local runtime".to_string(),
740        mime_type: "application/json".to_string(),
741    });
742    mcp_server_registry.register_resource(roboticus_agent::mcp::McpResource {
743        uri: "roboticus://metrics/capacity".to_string(),
744        name: "Provider Capacity Stats".to_string(),
745        description: "Current provider utilization and headroom".to_string(),
746        mime_type: "application/json".to_string(),
747    });
748    let mcp_server_registry = Arc::new(RwLock::new(mcp_server_registry));
749
750    let ann_index = roboticus_db::ann::AnnIndex::new(config.memory.ann_index);
751    if config.memory.ann_index {
752        match ann_index.build_from_db(&db) {
753            Ok(count) => {
754                if ann_index.is_built() {
755                    tracing::info!(count, "ANN index built from database");
756                } else {
757                    tracing::info!(
758                        count,
759                        min = ann_index.min_entries_for_index,
760                        "ANN index below threshold, brute-force search will be used"
761                    );
762                }
763            }
764            Err(e) => {
765                tracing::warn!(error = %e, "Failed to build ANN index, falling back to brute-force");
766            }
767        }
768    }
769
770    // Initialize media service for multimodal attachment handling
771    let media_service = if config.multimodal.enabled {
772        match roboticus_channels::media::MediaService::new(&config.multimodal) {
773            Ok(svc) => {
774                tracing::info!(
775                    media_dir = ?config.multimodal.media_dir,
776                    "Media service initialized"
777                );
778                Some(Arc::new(svc))
779            }
780            Err(e) => {
781                tracing::error!(error = %e, "Failed to initialize media service");
782                None
783            }
784        }
785    } else {
786        None
787    };
788
789    let resolved_config_path =
790        config_path.unwrap_or_else(crate::config_runtime::resolve_default_config_path);
791
792    // Build rate limiter once — AppState and the middleware layer share the same
793    // Arc<Mutex<…>> so admin observability sees live counters.
794    let rate_limiter = GlobalRateLimitLayer::new(
795        u64::from(config.server.rate_limit_requests),
796        Duration::from_secs(config.server.rate_limit_window_secs),
797    )
798    .with_per_ip_capacity(u64::from(config.server.per_ip_rate_limit_requests))
799    .with_per_actor_capacity(u64::from(config.server.per_actor_rate_limit_requests))
800    .with_trusted_proxy_cidrs(&config.server.trusted_proxy_cidrs);
801
802    // Semantic classifier: backed by the same embedding client as the rest of
803    // the pipeline. Centroid vectors are computed lazily on first classify call
804    // and cached for the lifetime of the process.
805    let semantic_classifier = Arc::new(
806        roboticus_llm::semantic_classifier::SemanticClassifier::new(llm.embedding.clone()),
807    );
808
809    let state = AppState {
810        db,
811        config: Arc::new(RwLock::new(config.clone())),
812        llm: Arc::new(RwLock::new(llm)),
813        wallet: Arc::new(wallet),
814        a2a: Arc::new(RwLock::new(a2a)),
815        personality: Arc::new(RwLock::new(personality_state)),
816        hmac_secret: Arc::new(hmac_secret),
817        interviews: Arc::new(RwLock::new(std::collections::HashMap::new())),
818        plugins: plugin_registry,
819        policy_engine,
820        browser,
821        registry,
822        event_bus: event_bus.clone(),
823        channel_router,
824        telegram,
825        whatsapp,
826        retriever,
827        ann_index,
828        tools: tool_registry,
829        capabilities,
830        approvals,
831        discord,
832        signal,
833        email,
834        voice,
835        media_service,
836        discovery: discovery_registry,
837        devices: device_manager,
838        mcp_clients,
839        mcp_server: mcp_server_registry,
840        live_mcp,
841        oauth,
842        keystore,
843        obsidian: obsidian_vault,
844        started_at: std::time::Instant::now(),
845        config_path: Arc::new(resolved_config_path.clone()),
846        config_apply_status: crate::config_runtime::status_for_path(&resolved_config_path),
847        pending_specialist_proposals: Arc::new(RwLock::new(std::collections::HashMap::new())),
848        ws_tickets: ws_ticket::TicketStore::new(),
849        rate_limiter: rate_limiter.clone(),
850        semantic_classifier,
851    };
852    state.resync_capabilities_from_tools().await;
853
854    // Periodic ANN index rebuild (every 10 minutes)
855    if config.memory.ann_index {
856        let ann_db = state.db.clone();
857        let ann_idx = state.ann_index.clone();
858        tokio::spawn(async move {
859            let mut interval = tokio::time::interval(std::time::Duration::from_secs(600));
860            interval.tick().await;
861            loop {
862                interval.tick().await;
863                match ann_idx.rebuild(&ann_db) {
864                    Ok(count) => {
865                        tracing::debug!(count, "ANN index rebuilt");
866                    }
867                    Err(e) => {
868                        tracing::warn!(error = %e, "ANN index rebuild failed");
869                    }
870                }
871            }
872        });
873        tracing::info!("ANN index rebuild daemon spawned (10min interval)");
874    }
875
876    // Load persisted semantic cache
877    {
878        let loaded = roboticus_db::cache::load_cache_entries(&state.db)
879            .inspect_err(|e| tracing::warn!(error = %e, "failed to load semantic cache entries"))
880            .unwrap_or_default();
881        if !loaded.is_empty() {
882            let imported: Vec<(String, roboticus_llm::ExportedCacheEntry)> = loaded
883                .into_iter()
884                .map(|(id, pe)| {
885                    let ttl = pe
886                        .expires_at
887                        .and_then(|e| {
888                            chrono::NaiveDateTime::parse_from_str(&e, "%Y-%m-%dT%H:%M:%S")
889                                .ok()
890                                .or_else(|| {
891                                    chrono::NaiveDateTime::parse_from_str(&e, "%Y-%m-%d %H:%M:%S")
892                                        .ok()
893                                })
894                        })
895                        .map(|exp| {
896                            let now = chrono::Utc::now().naive_utc();
897                            if exp > now {
898                                (exp - now).num_seconds().max(0) as u64
899                            } else {
900                                0
901                            }
902                        })
903                        .unwrap_or(3600);
904
905                    (
906                        id,
907                        roboticus_llm::ExportedCacheEntry {
908                            content: pe.response,
909                            model: pe.model,
910                            tokens_saved: pe.tokens_saved,
911                            hits: pe.hit_count,
912                            involved_tools: false,
913                            embedding: pe.embedding,
914                            ttl_remaining_secs: ttl,
915                        },
916                    )
917                })
918                .collect();
919            let count = imported.len();
920            let llm = state.llm.read().await;
921            llm.cache
922                .lock()
923                .unwrap_or_else(|e| e.into_inner())
924                .import_entries(imported);
925            tracing::info!(count, "Loaded semantic cache from database");
926        }
927    }
928
929    // Periodic cache flush (every 5 minutes)
930    {
931        let flush_db = state.db.clone();
932        let flush_llm = Arc::clone(&state.llm);
933        tokio::spawn(async move {
934            let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
935            interval.tick().await; // skip first immediate tick
936            loop {
937                interval.tick().await;
938                let entries = {
939                    let llm = flush_llm.read().await;
940                    llm.cache
941                        .lock()
942                        .unwrap_or_else(|e| e.into_inner())
943                        .export_entries()
944                };
945                for (hash, entry) in &entries {
946                    let expires = chrono::Utc::now()
947                        + chrono::Duration::seconds(entry.ttl_remaining_secs as i64);
948                    let pe = roboticus_db::cache::PersistedCacheEntry {
949                        prompt_hash: hash.clone(),
950                        response: entry.content.clone(),
951                        model: entry.model.clone(),
952                        tokens_saved: entry.tokens_saved,
953                        hit_count: entry.hits,
954                        embedding: entry.embedding.clone(),
955                        created_at: chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
956                        expires_at: Some(expires.format("%Y-%m-%dT%H:%M:%S").to_string()),
957                    };
958                    roboticus_db::cache::save_cache_entry(&flush_db, hash, &pe)
959                        .inspect_err(
960                            |e| tracing::warn!(error = %e, hash, "failed to persist cache entry"),
961                        )
962                        .ok();
963                }
964                roboticus_db::cache::evict_expired_cache(&flush_db)
965                    .inspect_err(|e| tracing::warn!(error = %e, "failed to evict expired cache"))
966                    .ok();
967                tracing::debug!(count = entries.len(), "Flushed semantic cache to database");
968            }
969        });
970        tracing::info!("Cache flush daemon spawned (5min interval)");
971    }
972
973    // Start heartbeat daemon
974    {
975        let hb_wallet = Arc::clone(&state.wallet);
976        let hb_db = state.db.clone();
977        let hb_session_cfg = config.session.clone();
978        let hb_digest_cfg = config.digest.clone();
979        let hb_agent_id = config.agent.id.clone();
980        let daemon = roboticus_schedule::HeartbeatDaemon::new(60_000);
981        tokio::spawn(async move {
982            roboticus_schedule::run_heartbeat(
983                daemon,
984                hb_wallet,
985                hb_db,
986                hb_session_cfg,
987                hb_digest_cfg,
988                hb_agent_id,
989            )
990            .await;
991        });
992        tracing::info!("Heartbeat daemon spawned (60s interval)");
993    }
994
995    // Delivery retry queue drain (every 30 seconds)
996    {
997        let drain_router = Arc::clone(&state.channel_router);
998        tokio::spawn(async move {
999            let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
1000            interval.tick().await;
1001            loop {
1002                interval.tick().await;
1003                drain_router.drain_retry_queue().await;
1004            }
1005        });
1006        tracing::info!("Delivery retry queue drain daemon spawned (30s interval)");
1007    }
1008
1009    // Start cron worker
1010    {
1011        let instance_id = config.agent.id.clone();
1012        let cron_state = state.clone();
1013        tokio::spawn(async move {
1014            crate::cron_runtime::run_cron_worker(cron_state, instance_id).await;
1015        });
1016        tracing::info!("Cron worker spawned");
1017    }
1018
1019    // Periodic mechanic-check sweep (default 6h, per-instance).
1020    {
1021        let db_path = config.database.path.clone();
1022        let interval_secs = std::env::var("ROBOTICUS_MECHANIC_CHECK_INTERVAL_SECS")
1023            .ok()
1024            .or_else(|| std::env::var("ROBOTICUS_STATE_HYGIENE_INTERVAL_SECS").ok())
1025            .and_then(|v| v.parse::<u64>().ok())
1026            .filter(|v| *v >= 300)
1027            .unwrap_or(21_600);
1028        tokio::spawn(async move {
1029            let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
1030            interval.tick().await;
1031            loop {
1032                interval.tick().await;
1033                match crate::state_hygiene::run_state_hygiene(&db_path) {
1034                    Ok(report) if report.changed => {
1035                        tracing::info!(
1036                            changed_rows = report.changed_rows,
1037                            subagent_rows_normalized = report.subagent_rows_normalized,
1038                            cron_payload_rows_repaired = report.cron_payload_rows_repaired,
1039                            cron_jobs_disabled_invalid_expr =
1040                                report.cron_jobs_disabled_invalid_expr,
1041                            "periodic mechanic checks applied"
1042                        );
1043                    }
1044                    Ok(_) => tracing::debug!("periodic mechanic checks: no changes"),
1045                    Err(e) => tracing::warn!(error = %e, "periodic mechanic checks failed"),
1046                }
1047            }
1048        });
1049        tracing::info!(interval_secs, "Mechanic checks daemon spawned");
1050    }
1051
1052    {
1053        let startup_announce_channels = config.channels.startup_announcement_channels();
1054
1055        let display_host = match config.server.bind.as_str() {
1056            "127.0.0.1" | "0.0.0.0" | "::1" | "::" => "localhost",
1057            other => other,
1058        };
1059        let channel_list: Vec<&str> = {
1060            let mut ch = vec!["web"];
1061            if config.channels.telegram.is_some() {
1062                ch.push("telegram");
1063            }
1064            if config.channels.whatsapp.is_some() {
1065                ch.push("whatsapp");
1066            }
1067            if config.channels.discord.is_some() {
1068                ch.push("discord");
1069            }
1070            if config.channels.signal.is_some() {
1071                ch.push("signal");
1072            }
1073            ch
1074        };
1075        let announce_text = format!(
1076            "🤖 *Roboticus Online*\n\n\
1077             🧠 *{}* (`{}`)\n\
1078             ⚡ `{}`\n\
1079             🔀 routing: {}\n\
1080             🌐 `{}:{}`\n\
1081             📡 {}\n\n\
1082             🕐 {}",
1083            config.agent.name,
1084            config.agent.id,
1085            config.models.primary,
1086            config.models.routing.mode,
1087            display_host,
1088            config.server.port,
1089            channel_list.join(" · "),
1090            chrono::Local::now().format("%Y-%m-%d %H:%M %Z"),
1091        );
1092
1093        if startup_announce_channels.iter().any(|c| c == "telegram") {
1094            if let (Some(adapter), Some(tg_cfg)) =
1095                (state.telegram.clone(), config.channels.telegram.as_ref())
1096            {
1097                let announce_targets = tg_cfg.allowed_chat_ids.clone();
1098                if announce_targets.is_empty() {
1099                    tracing::warn!(
1100                        "Telegram startup announcement skipped: channels.telegram.allowed_chat_ids is empty"
1101                    );
1102                } else {
1103                    let text = announce_text.clone();
1104                    tokio::spawn(async move {
1105                        for chat_id in announce_targets {
1106                            let chat = chat_id.to_string();
1107                            match adapter
1108                                .send(roboticus_channels::OutboundMessage {
1109                                    content: text.clone(),
1110                                    recipient_id: chat.clone(),
1111                                    metadata: None,
1112                                })
1113                                .await
1114                            {
1115                                Ok(()) => {
1116                                    tracing::info!(chat_id = %chat, "telegram startup announcement sent")
1117                                }
1118                                Err(e) => {
1119                                    tracing::warn!(chat_id = %chat, error = %e, "telegram startup announcement failed")
1120                                }
1121                            }
1122                        }
1123                    });
1124                }
1125            } else {
1126                tracing::warn!(
1127                    "Telegram startup announcement requested but telegram channel is not enabled/configured"
1128                );
1129            }
1130        }
1131
1132        if startup_announce_channels.iter().any(|c| c == "whatsapp") {
1133            if let (Some(adapter), Some(wa_cfg)) =
1134                (state.whatsapp.clone(), config.channels.whatsapp.as_ref())
1135            {
1136                let targets = wa_cfg.allowed_numbers.clone();
1137                if targets.is_empty() {
1138                    tracing::warn!(
1139                        "WhatsApp startup announcement skipped: channels.whatsapp.allowed_numbers is empty"
1140                    );
1141                } else {
1142                    let text = announce_text.clone();
1143                    tokio::spawn(async move {
1144                        for number in targets {
1145                            match adapter
1146                                .send(roboticus_channels::OutboundMessage {
1147                                    content: text.clone(),
1148                                    recipient_id: number.clone(),
1149                                    metadata: None,
1150                                })
1151                                .await
1152                            {
1153                                Ok(()) => {
1154                                    tracing::info!(recipient = %number, "whatsapp startup announcement sent")
1155                                }
1156                                Err(e) => {
1157                                    tracing::warn!(recipient = %number, error = %e, "whatsapp startup announcement failed")
1158                                }
1159                            }
1160                        }
1161                    });
1162                }
1163            } else {
1164                tracing::warn!(
1165                    "WhatsApp startup announcement requested but whatsapp channel is not enabled/configured"
1166                );
1167            }
1168        }
1169
1170        if startup_announce_channels.iter().any(|c| c == "signal") {
1171            if let (Some(adapter), Some(sig_cfg)) =
1172                (state.signal.clone(), config.channels.signal.as_ref())
1173            {
1174                let targets = sig_cfg.allowed_numbers.clone();
1175                if targets.is_empty() {
1176                    tracing::warn!(
1177                        "Signal startup announcement skipped: channels.signal.allowed_numbers is empty"
1178                    );
1179                } else {
1180                    let text = announce_text.clone();
1181                    tokio::spawn(async move {
1182                        for number in targets {
1183                            match adapter
1184                                .send(roboticus_channels::OutboundMessage {
1185                                    content: text.clone(),
1186                                    recipient_id: number.clone(),
1187                                    metadata: None,
1188                                })
1189                                .await
1190                            {
1191                                Ok(()) => {
1192                                    tracing::info!(recipient = %number, "signal startup announcement sent")
1193                                }
1194                                Err(e) => {
1195                                    tracing::warn!(recipient = %number, error = %e, "signal startup announcement failed")
1196                                }
1197                            }
1198                        }
1199                    });
1200                }
1201            } else {
1202                tracing::warn!(
1203                    "Signal startup announcement requested but signal channel is not enabled/configured"
1204                );
1205            }
1206        }
1207
1208        for ch in &startup_announce_channels {
1209            if ch != "telegram" && ch != "whatsapp" && ch != "signal" {
1210                tracing::warn!(
1211                    channel = %ch,
1212                    "startup announcement requested for channel without recipient mapping support"
1213                );
1214            }
1215        }
1216    }
1217
1218    if state.telegram.is_some() {
1219        let use_polling = config
1220            .channels
1221            .telegram
1222            .as_ref()
1223            .map(|c| !c.webhook_mode)
1224            .unwrap_or(true);
1225        if use_polling {
1226            let poll_state = state.clone();
1227            tokio::spawn(async move {
1228                api::telegram_poll_loop(poll_state).await;
1229            });
1230        }
1231    }
1232    if state.discord.is_some() {
1233        let poll_state = state.clone();
1234        tokio::spawn(async move {
1235            api::discord_poll_loop(poll_state).await;
1236        });
1237    }
1238    if state.signal.is_some() {
1239        let poll_state = state.clone();
1240        tokio::spawn(async move {
1241            api::signal_poll_loop(poll_state).await;
1242        });
1243    }
1244    if state.email.is_some() {
1245        let poll_state = state.clone();
1246        tokio::spawn(async move {
1247            api::email_poll_loop(poll_state).await;
1248        });
1249    }
1250
1251    let auth_layer = ApiKeyLayer::new(config.server.api_key.clone());
1252    let local_origin = format!("http://{}:{}", config.server.bind, config.server.port);
1253    let origin_header = local_origin
1254        .parse::<axum::http::HeaderValue>()
1255        .unwrap_or_else(|e| {
1256            tracing::warn!(
1257                origin = %local_origin,
1258                error = %e,
1259                "CORS origin failed to parse, falling back to 127.0.0.1 loopback"
1260            );
1261            axum::http::HeaderValue::from_static("http://127.0.0.1:3000")
1262        });
1263    let cors = CorsLayer::new()
1264        .allow_origin(origin_header)
1265        .allow_methods([
1266            axum::http::Method::GET,
1267            axum::http::Method::POST,
1268            axum::http::Method::PUT,
1269            axum::http::Method::DELETE,
1270        ])
1271        .allow_headers([
1272            axum::http::header::CONTENT_TYPE,
1273            axum::http::header::AUTHORIZATION,
1274            axum::http::HeaderName::from_static("x-api-key"),
1275        ]);
1276    let authed_routes = build_router(state.clone()).layer(auth_layer);
1277
1278    // WebSocket route handles its own auth (header OR ticket) so it
1279    // lives outside the API-key middleware layer.
1280    let ws_routes = axum::Router::new().route(
1281        "/ws",
1282        ws_route(
1283            event_bus.clone(),
1284            state.ws_tickets.clone(),
1285            config.server.api_key.clone(),
1286        ),
1287    );
1288
1289    // MCP protocol endpoint uses bearer token auth (same API key).
1290    // Lives outside the main API-key middleware because the MCP transport
1291    // service (StreamableHttpService) needs its own route handling.
1292    let mcp_routes = build_mcp_router(&state, config.server.api_key.clone());
1293
1294    let public_routes = build_public_router(state);
1295
1296    let app = authed_routes
1297        .merge(ws_routes)
1298        .merge(mcp_routes)
1299        .merge(public_routes)
1300        .layer(cors)
1301        .layer(rate_limiter);
1302    Ok(app)
1303}
1304
1305#[cfg(test)]
1306pub mod test_support;
1307
#[cfg(test)]
mod tests {
    //! Bootstrap smoke tests plus unit tests for the helpers brought into
    //! scope through `super::*` (`cleanup_old_logs`,
    //! `is_taskable_subagent_role`, `resolve_token`).

    use super::*;
    use crate::test_support::EnvGuard;

    /// Minimal configuration that points the database at `:memory:`.
    const BOOTSTRAP_CONFIG: &str = r#"
[agent]
name = "Roboticus"
id = "roboticus-test"

[server]
port = 18789
bind = "127.0.0.1"

[database]
path = ":memory:"

[models]
primary = "ollama/qwen3:8b"
"#;

    /// `bootstrap` returns `Ok` for the minimal in-memory configuration.
    #[tokio::test]
    async fn bootstrap_with_memory_db_succeeds() {
        let config = RoboticusConfig::from_str(BOOTSTRAP_CONFIG).expect("parse config");
        let result = bootstrap(config).await;
        assert!(
            result.is_ok(),
            "bootstrap with :memory: should succeed: {:?}",
            result.err()
        );
    }

    /// `bootstrap` returns `Ok` even when every channel is enabled but its
    /// credential env var / identifier is absent, and the Obsidian vault path
    /// points at a directory that was never created.
    #[tokio::test]
    async fn bootstrap_handles_enabled_channels_with_missing_credentials() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let db_path = dir.path().join("state.db");
        // Paths are embedded as TOML *literal* strings ('...') so that
        // backslashes in Windows temp paths are not read as escape sequences.
        let cfg = format!(
            r#"
[agent]
name = "T"
id = "t"

[server]
bind = "127.0.0.1"
port = 18789

[database]
path = '{db_path}'

[models]
primary = "ollama/qwen3:8b"

[channels.telegram]
enabled = true
token_env = "TEST_TELEGRAM_MISSING"

[channels.whatsapp]
enabled = true
token_env = "TEST_WHATSAPP_MISSING"
phone_number_id = ""

[channels.discord]
enabled = true
token_env = "TEST_DISCORD_MISSING"

[channels.signal]
enabled = true
phone_number = ""

[channels.email]
enabled = true
password_env = "TEST_EMAIL_PASSWORD_MISSING"

[channels.voice]
enabled = true

[obsidian]
enabled = true
vault_path = '{vault_path}'
watch_for_changes = true
"#,
            db_path = db_path.display(),
            vault_path = dir.path().join("missing-vault").display(),
        );
        let config = RoboticusConfig::from_str(&cfg).expect("parse config");
        let result = bootstrap(config).await;
        assert!(
            result.is_ok(),
            "bootstrap should tolerate enabled channels with missing credentials and invalid obsidian path: {:?}",
            result.err()
        );
    }

    /// Seeds the database with one `subagent`-role row and one `model-proxy`
    /// row, then checks that `bootstrap` succeeds against that database.
    ///
    /// Only success is asserted here; the state built by `bootstrap` is not
    /// inspected.
    #[tokio::test]
    async fn bootstrap_loads_taskable_subagents_from_database() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let db_path = dir.path().join("state.db");
        let db = roboticus_db::Database::new(db_path.to_str().expect("utf-8 temp path"))
            .expect("open seed database");

        // Row with a taskable role.
        roboticus_db::agents::upsert_sub_agent(
            &db,
            &roboticus_db::agents::SubAgentRow {
                id: "sa-1".into(),
                name: "finance_specialist".into(),
                display_name: Some("Finance Specialist".into()),
                model: "auto".into(),
                fallback_models_json: Some("[]".into()),
                role: "subagent".into(),
                description: Some("Handles finance tasks".into()),
                skills_json: Some(r#"["budgeting"]"#.into()),
                enabled: true,
                session_count: 0,
                last_used_at: None,
            },
        )
        .expect("seed taskable sub-agent");
        // Row with a role that `taskable_subagent_roles_are_strict` shows is
        // rejected by `is_taskable_subagent_role`.
        roboticus_db::agents::upsert_sub_agent(
            &db,
            &roboticus_db::agents::SubAgentRow {
                id: "sa-2".into(),
                name: "observer".into(),
                display_name: Some("Observer".into()),
                model: "ollama/qwen3:8b".into(),
                fallback_models_json: Some("[]".into()),
                role: "model-proxy".into(),
                description: Some("Not taskable".into()),
                skills_json: None,
                enabled: true,
                session_count: 0,
                last_used_at: None,
            },
        )
        .expect("seed non-taskable sub-agent");

        // Literal TOML string keeps Windows path separators intact.
        let cfg = format!(
            r#"
[agent]
name = "T"
id = "t"

[server]
bind = "127.0.0.1"
port = 18789

[database]
path = '{db_path}'

[models]
primary = "ollama/qwen3:8b"
"#,
            db_path = db_path.display(),
        );
        let config = RoboticusConfig::from_str(&cfg).expect("parse config");
        let result = bootstrap(config).await;
        assert!(
            result.is_ok(),
            "bootstrap should register enabled taskable subagents from db: {:?}",
            result.err()
        );
    }

    /// `cleanup_old_logs` must not panic when the directory does not exist.
    #[test]
    fn cleanup_old_logs_no_panic_on_missing_dir() {
        // Derive the path from a fresh temp dir so it is portable and
        // guaranteed to be absent, instead of relying on a fixed `/tmp` name.
        let parent = tempfile::tempdir().expect("create temp dir");
        let missing = parent
            .path()
            .join("roboticus-test-nonexistent-dir-cleanup");
        assert!(!missing.exists(), "fixture path must not exist");
        cleanup_old_logs(missing.as_path(), 30);
    }

    /// A freshly written `.log` file survives a 30-day retention window.
    #[test]
    fn cleanup_old_logs_keeps_recent_logs() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let recent = dir.path().join("recent.log");
        std::fs::write(&recent, "fresh log").expect("write log fixture");

        cleanup_old_logs(dir.path(), 30);
        assert!(recent.exists(), "recent logs should remain");
    }

    /// A `.txt` file is left alone even with a zero-day window (the window
    /// under which the pruning test below expects a `.log` file to be removed).
    #[test]
    fn cleanup_old_logs_ignores_non_log_files() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let txt = dir.path().join("data.txt");
        std::fs::write(&txt, "keep me").expect("write non-log fixture");

        cleanup_old_logs(dir.path(), 0);
        assert!(txt.exists(), "non-log files should not be deleted");
    }

    /// `cleanup_old_logs` must not panic on an existing but empty directory.
    #[test]
    fn cleanup_old_logs_empty_dir() {
        let dir = tempfile::tempdir().expect("create temp dir");
        cleanup_old_logs(dir.path(), 1);
    }

    /// Role matching accepts `subagent` / `specialist` (including a
    /// mixed-case spelling) and rejects `model-proxy`.
    #[test]
    fn taskable_subagent_roles_are_strict() {
        assert!(is_taskable_subagent_role("subagent"));
        assert!(is_taskable_subagent_role("specialist"));
        assert!(is_taskable_subagent_role("SubAgent"));
        assert!(!is_taskable_subagent_role("model-proxy"));
    }

    /// Resolution order: a `keystore:` reference that resolves wins; an
    /// unresolved reference falls back to the env var; no reference and an
    /// empty env-var name yields an empty token.
    #[test]
    fn resolve_token_prefers_keystore_reference_then_env_then_empty() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let keystore_path = dir.path().join("keystore.enc");
        let keystore = roboticus_core::keystore::Keystore::new(keystore_path);
        keystore.unlock("pw").expect("unlock keystore");
        keystore
            .set("telegram_bot_token", "from_keystore")
            .expect("store keystore secret");

        // Guard is held for the rest of the test so the env var stays set.
        let _env = EnvGuard::set("TEST_TELEGRAM_TOKEN", "from_env");
        let token = resolve_token(
            &Some("keystore:telegram_bot_token".to_string()),
            "TEST_TELEGRAM_TOKEN",
            &keystore,
        );
        assert_eq!(token, "from_keystore");

        let fallback = resolve_token(
            &Some("keystore:missing".to_string()),
            "TEST_TELEGRAM_TOKEN",
            &keystore,
        );
        assert_eq!(fallback, "from_env");

        let empty = resolve_token(&None, "", &keystore);
        assert!(empty.is_empty());
    }

    /// With no keystore reference, the token comes from the named env var.
    #[test]
    fn resolve_token_uses_env_when_no_keystore_ref_is_provided() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let keystore_path = dir.path().join("keystore.enc");
        let keystore = roboticus_core::keystore::Keystore::new(keystore_path);
        keystore.unlock("pw").expect("unlock keystore");

        let _env = EnvGuard::set("TEST_SIGNAL_TOKEN", "from_env_only");
        let token = resolve_token(&None, "TEST_SIGNAL_TOKEN", &keystore);
        assert_eq!(token, "from_env_only");
    }

    /// An env var that is set but empty produces an empty token.
    #[test]
    fn resolve_token_returns_empty_for_empty_env_var() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let keystore_path = dir.path().join("keystore.enc");
        let keystore = roboticus_core::keystore::Keystore::new(keystore_path);
        keystore.unlock("pw").expect("unlock keystore");

        let _env = EnvGuard::set("TEST_EMPTY_TOKEN", "");
        let token = resolve_token(&None, "TEST_EMPTY_TOKEN", &keystore);
        assert!(token.is_empty());
    }

    /// A `.log` file is removed when the retention window is zero days.
    #[test]
    fn cleanup_old_logs_can_prune_when_window_is_zero_days() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let old_log = dir.path().join("old.log");
        std::fs::write(&old_log, "stale").expect("write log fixture");
        // Let a little wall-clock time pass after the write before pruning.
        // NOTE(review): presumably `cleanup_old_logs` compares the file's
        // modification time against a "now - window" cutoff — confirm.
        std::thread::sleep(std::time::Duration::from_millis(10));
        cleanup_old_logs(dir.path(), 0);
        assert!(!old_log.exists());
    }
}