Skip to main content

roboticus_server/
lib.rs

1//! # roboticus-server
2//!
3//! Top-level binary crate that assembles all Roboticus workspace crates into a
4//! single runtime. The [`bootstrap()`] function initializes the database,
5//! wallet, LLM pipeline, agent loop, channel adapters, and background daemons,
6//! then returns an axum `Router` ready to serve.
7//!
8//! ## Key Types
9//!
10//! - [`AppState`] -- Shared application state passed to all route handlers
11//! - [`PersonalityState`] -- Loaded personality files (OS, firmware, identity)
12//! - [`EventBus`] -- Tokio broadcast channel for WebSocket event push
13//!
14//! ## Modules
15//!
16//! - `api` -- REST API mount point, `build_router()`, route modules
17//! - `auth` -- API key authentication middleware layer
18//! - `rate_limit` -- Global + per-IP rate limiting (sliding window)
19//! - `dashboard` -- Embedded SPA serving (compile-time or filesystem)
20//! - `ws` -- WebSocket upgrade and event broadcasting
21//! - `cli/` -- CLI command handlers (serve, status, sessions, memory, wallet, etc.)
22//! - `daemon` -- Daemon install, status, uninstall
23//! - `migrate/` -- Migration engine, skill import/export
24//! - `plugins` -- Plugin registry initialization and loading
25//!
26//! ## Bootstrap Sequence
27//!
28//! 1. Parse CLI → load config → init DB → load wallet → generate HMAC secret
29//! 2. Init LLM client + router + embedding → load semantic cache
30//! 3. Init agent loop + tool registry + memory retriever
31//! 4. Register channel adapters (Telegram, WhatsApp, Discord, Signal, Email)
32//! 5. Spawn background daemons (heartbeat, cron, cache flush, ANN rebuild)
33//! 6. Build axum router with auth + CORS + rate limiting
34
35// ── Re-exports from roboticus-api ──────────────────────────────
36pub use roboticus_api as api_crate;
37pub use roboticus_api::abuse;
38pub use roboticus_api::api;
39pub use roboticus_api::auth;
40pub use roboticus_api::config_runtime;
41pub use roboticus_api::cron_runtime;
42pub use roboticus_api::dashboard;
43pub use roboticus_api::rate_limit;
44pub use roboticus_api::ws;
45pub use roboticus_api::ws_ticket;
46pub use roboticus_api::{
47    AppState, EventBus, PersonalityState, TicketStore, build_dashboard_html, build_mcp_router,
48    build_public_router, build_router, dashboard_handler, ws_route,
49};
50
51// ── Re-exports from roboticus-cli ─────────────────────────────
52pub use roboticus_cli::cli;
53pub use roboticus_cli::migrate;
54
55// ── Server-local modules ─────────────────────────────────────
56pub mod config_maintenance;
57pub mod daemon;
58pub mod plugins;
59pub use roboticus_cli::state_hygiene;
60
61use std::sync::Arc;
62use std::sync::OnceLock;
63use std::sync::atomic::{AtomicBool, Ordering};
64use std::time::Duration;
65
66use tokio::sync::RwLock;
67use tower_http::cors::CorsLayer;
68
69use auth::ApiKeyLayer;
70use roboticus_agent::policy::{
71    AuthorityRule, CommandSafetyRule, FinancialRule, PathProtectionRule, PolicyEngine,
72    RateLimitRule, ValidationRule,
73};
74use roboticus_agent::subagents::SubagentRegistry;
75use roboticus_browser::Browser;
76use roboticus_channels::ChannelAdapter;
77use roboticus_channels::a2a::A2aProtocol;
78use roboticus_channels::router::ChannelRouter;
79use roboticus_channels::telegram::TelegramAdapter;
80use roboticus_channels::whatsapp::WhatsAppAdapter;
81use roboticus_core::RoboticusConfig;
82use roboticus_db::Database;
83use roboticus_llm::LlmService;
84use roboticus_llm::OAuthManager;
85use roboticus_wallet::{WalletPaymentHandler, WalletService};
86
87use roboticus_agent::approvals::ApprovalManager;
88use roboticus_agent::obsidian::ObsidianVault;
89use roboticus_agent::obsidian_tools::{ObsidianReadTool, ObsidianSearchTool, ObsidianWriteTool};
90use roboticus_agent::tools::{
91    BashTool, EchoTool, EditFileTool, GlobFilesTool, ListDirectoryTool, ReadFileTool,
92    ScriptRunnerTool, SearchFilesTool, ToolRegistry, WriteFileTool,
93};
94use roboticus_channels::discord::DiscordAdapter;
95use roboticus_channels::email::EmailAdapter;
96use roboticus_channels::signal::SignalAdapter;
97use roboticus_channels::voice::{VoiceConfig, VoicePipeline};
98
99use rate_limit::GlobalRateLimitLayer;
100
101static STDERR_ENABLED: AtomicBool = AtomicBool::new(false);
102static LOG_GUARD: OnceLock<tracing_appender::non_blocking::WorkerGuard> = OnceLock::new();
103
/// Reports whether `role` names a sub-agent kind that can be handed tasks.
///
/// The comparison is ASCII case-insensitive and exact: only `"subagent"` and
/// `"specialist"` qualify, and surrounding whitespace is not trimmed.
fn is_taskable_subagent_role(role: &str) -> bool {
    const TASKABLE_ROLES: [&str; 2] = ["subagent", "specialist"];
    TASKABLE_ROLES
        .iter()
        .any(|candidate| role.eq_ignore_ascii_case(candidate))
}
107
108pub fn enable_stderr_logging() {
109    STDERR_ENABLED.store(true, Ordering::Release);
110}
111
112fn init_logging(config: &RoboticusConfig) {
113    use tracing_appender::rolling::{RollingFileAppender, Rotation};
114    use tracing_subscriber::EnvFilter;
115    use tracing_subscriber::Layer;
116    use tracing_subscriber::filter::filter_fn;
117    use tracing_subscriber::fmt;
118    use tracing_subscriber::layer::SubscriberExt;
119    use tracing_subscriber::util::SubscriberInitExt;
120
121    let level = config.agent.log_level.as_str();
122    // Promote hyper errors to warn so IncompleteMessage and connection
123    // resets are visible in production logs (they default to DEBUG).
124    let base_filter = format!("{level},hyper=warn,h2=warn,rustls=warn");
125    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&base_filter));
126
127    let stderr_gate = filter_fn(|_| STDERR_ENABLED.load(Ordering::Acquire));
128
129    let log_dir = &config.server.log_dir;
130    let stderr_layer = fmt::layer()
131        .with_writer(std::io::stderr)
132        .with_filter(stderr_gate);
133
134    let file_appender = std::fs::create_dir_all(log_dir).ok().and_then(|_| {
135        RollingFileAppender::builder()
136            .rotation(Rotation::DAILY)
137            .filename_prefix("roboticus")
138            .filename_suffix("log")
139            .build(log_dir)
140            .map_err(|e| {
141                eprintln!(
142                    "warning: failed to initialize file logging in {}: {e}",
143                    log_dir.display()
144                );
145                e
146            })
147            .ok()
148    });
149
150    if let Some(file_appender) = file_appender {
151        let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
152        let _ = LOG_GUARD.set(guard);
153
154        let file_layer = fmt::layer()
155            .with_writer(non_blocking)
156            .with_ansi(false)
157            .json();
158
159        let _ = tracing_subscriber::registry()
160            .with(filter)
161            .with(stderr_layer)
162            .with(file_layer)
163            .try_init();
164    } else {
165        let _ = tracing_subscriber::registry()
166            .with(filter)
167            .with(stderr_layer)
168            .try_init();
169    }
170
171    cleanup_old_logs(log_dir, config.server.log_max_days);
172}
173
/// Deletes `*.log` files directly inside `log_dir` that were last modified
/// more than `max_days` days ago.
///
/// Only entries whose extension is exactly `log` are considered;
/// subdirectories are not traversed. With `max_days == 0` every `.log` file
/// modified before the current instant qualifies.
///
/// Cleanup is best-effort and never fails the caller: an unreadable
/// directory, unreadable metadata, or a failed removal is silently skipped.
/// If the cutoff instant cannot be represented by `SystemTime`, nothing is
/// deleted.
fn cleanup_old_logs(log_dir: &std::path::Path, max_days: u32) {
    const SECS_PER_DAY: u64 = 86_400;

    let retention = std::time::Duration::from_secs(u64::from(max_days) * SECS_PER_DAY);
    // `SystemTime - Duration` panics when the result is not representable;
    // `checked_sub` turns that case into "no file is old enough".
    let cutoff = match std::time::SystemTime::now().checked_sub(retention) {
        Some(cutoff) => cutoff,
        None => return,
    };

    let entries = match std::fs::read_dir(log_dir) {
        Ok(entries) => entries,
        Err(_) => return,
    };

    for entry in entries.flatten() {
        let path = entry.path();
        let is_log_file = path.extension().and_then(|ext| ext.to_str()) == Some("log");
        if !is_log_file {
            continue;
        }

        let is_expired = entry
            .metadata()
            .and_then(|meta| meta.modified())
            .map(|modified| modified < cutoff)
            .unwrap_or(false);
        if is_expired {
            // best-effort: log rotation cleanup failure is non-critical
            let _ = std::fs::remove_file(&path);
        }
    }
}
197
198/// Resolve a channel token: keystore reference first, then env var fallback.
199fn resolve_token(
200    token_ref: &Option<String>,
201    token_env: &str,
202    keystore: &roboticus_core::Keystore,
203) -> String {
204    if let Some(r) = token_ref
205        && let Some(name) = r.strip_prefix("keystore:")
206    {
207        if let Some(val) = keystore.get(name) {
208            return val;
209        }
210        tracing::warn!(key = %name, "keystore reference not found, falling back to env var");
211    }
212    if !token_env.is_empty() {
213        return match std::env::var(token_env) {
214            Ok(val) if !val.is_empty() => val,
215            Ok(_) => {
216                tracing::warn!(env_var = %token_env, "API key env var is set but empty");
217                String::new()
218            }
219            Err(_) => {
220                tracing::warn!(env_var = %token_env, "API key env var is not set");
221                String::new()
222            }
223        };
224    }
225    String::new()
226}
227
228/// Builds the application state and router from config. Used by the binary and by tests.
229pub async fn bootstrap(
230    config: RoboticusConfig,
231) -> Result<axum::Router, Box<dyn std::error::Error>> {
232    bootstrap_with_config_path(config, None).await
233}
234
235pub async fn bootstrap_with_config_path(
236    config: RoboticusConfig,
237    config_path: Option<std::path::PathBuf>,
238) -> Result<axum::Router, Box<dyn std::error::Error>> {
239    init_logging(&config);
240
241    let personality_state = api::PersonalityState::from_workspace(&config.agent.workspace);
242
243    if !personality_state.os_text.is_empty() {
244        tracing::info!(
245            personality = %personality_state.identity.name,
246            generated_by = %personality_state.identity.generated_by,
247            "Loaded personality files from workspace"
248        );
249    } else {
250        tracing::info!("No personality files found in workspace, using defaults");
251    }
252
253    let db_path = config.database.path.to_string_lossy().to_string();
254    let db = Database::new(&db_path)?;
255    match crate::state_hygiene::run_state_hygiene(&config.database.path) {
256        Ok(report) if report.changed => {
257            tracing::info!(
258                changed_rows = report.changed_rows,
259                subagent_rows_normalized = report.subagent_rows_normalized,
260                cron_payload_rows_repaired = report.cron_payload_rows_repaired,
261                cron_jobs_disabled_invalid_expr = report.cron_jobs_disabled_invalid_expr,
262                "applied startup mechanic checks"
263            );
264        }
265        Ok(_) => {}
266        Err(e) => tracing::warn!(error = %e, "startup mechanic checks failed"),
267    }
268    match roboticus_db::sessions::backfill_nicknames(&db) {
269        Ok(0) => {}
270        Ok(n) => tracing::info!(count = n, "Backfilled session nicknames"),
271        Err(e) => tracing::warn!(error = %e, "Failed to backfill session nicknames"),
272    }
273    let mut llm = LlmService::new(&config)?;
274    // Seed quality tracker with recent observations so metascore routing
275    // has a warm start instead of assuming 0.8 for every model.
276    match roboticus_db::metrics::recent_quality_scores(&db, 200) {
277        Ok(scores) if !scores.is_empty() => llm.quality.seed_from_history(&scores),
278        Ok(_) => {}
279        Err(e) => tracing::warn!(error = %e, "failed to seed quality tracker from history"),
280    }
281    let wallet = WalletService::new(&config).await?;
282
283    // Wire x402 payment handler: when the LLM client hits an HTTP 402, it
284    // signs an EIP-3009 authorization via the agent wallet and retries.
285    // The treasury policy enforces per-payment caps from configuration.
286    let wallet_arc = Arc::new(wallet.wallet.clone());
287    let treasury_policy = roboticus_wallet::treasury::TreasuryPolicy::new(&config.treasury);
288    let x402_handler =
289        Arc::new(WalletPaymentHandler::new(wallet_arc).with_treasury_policy(treasury_policy));
290    llm.set_payment_handler(x402_handler);
291
292    let a2a = A2aProtocol::new(config.a2a.clone());
293    let plugin_env = {
294        let mut env = std::collections::HashMap::new();
295        env.insert("ROBOTICUS_DELEGATION_DEPTH".into(), "0".into());
296        env.insert(
297            "ROBOTICUS_WORKSPACE".into(),
298            config.agent.workspace.display().to_string(),
299        );
300        if let Some(ref ws) = config.skills.workspace_dir {
301            env.insert("ROBOTICUS_SKILLS_DIR".into(), ws.display().to_string());
302        }
303        env
304    };
305    let plugin_registry = plugins::init_plugin_registry(&config.plugins, plugin_env).await;
306    let mut policy_engine = PolicyEngine::new();
307    policy_engine.add_rule(Box::new(AuthorityRule));
308    policy_engine.add_rule(Box::new(CommandSafetyRule));
309    policy_engine.add_rule(Box::new(FinancialRule::new(
310        config.treasury.per_payment_cap,
311    )));
312    policy_engine.add_rule(Box::new(PathProtectionRule::from_config(
313        &config.security.filesystem,
314    )));
315    policy_engine.add_rule(Box::new(RateLimitRule::default()));
316    policy_engine.add_rule(Box::new(ValidationRule));
317    let policy_engine = Arc::new(policy_engine);
318    let browser = Arc::new(Browser::new(config.browser.clone()));
319    let registry = Arc::new(SubagentRegistry::new(4, vec![]));
320
321    if let Ok(sub_agents) = roboticus_db::agents::list_enabled_sub_agents(&db) {
322        for sa in &sub_agents {
323            if !is_taskable_subagent_role(&sa.role) {
324                continue;
325            }
326            let resolved_model = match sa.model.trim().to_ascii_lowercase().as_str() {
327                "auto" | "orchestrator" => llm.router.select_model().to_string(),
328                _ => sa.model.clone(),
329            };
330            let fixed_skills = sa
331                .skills_json
332                .as_deref()
333                .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok())
334                .unwrap_or_default();
335            let agent_config = roboticus_agent::subagents::AgentInstanceConfig {
336                id: sa.name.clone(),
337                name: sa.display_name.clone().unwrap_or_else(|| sa.name.clone()),
338                model: resolved_model,
339                skills: fixed_skills,
340                allowed_subagents: vec![],
341                max_concurrent: 4,
342            };
343            if let Err(e) = registry.register(agent_config).await {
344                tracing::warn!(agent = %sa.name, err = %e, "failed to register sub-agent");
345            } else if let Err(e) = registry.start_agent(&sa.name).await {
346                tracing::warn!(agent = %sa.name, err = %e, "failed to auto-start sub-agent");
347            }
348        }
349        if !sub_agents.is_empty() {
350            tracing::info!(
351                count = sub_agents.len(),
352                "registered sub-agents from database"
353            );
354        }
355    }
356
357    let event_bus = EventBus::new(256);
358
359    let keystore =
360        roboticus_core::keystore::Keystore::new(roboticus_core::keystore::Keystore::default_path());
361    if let Err(e) = keystore.unlock_machine() {
362        tracing::warn!("keystore auto-unlock failed: {e}");
363    }
364    let keystore = Arc::new(keystore);
365
366    let channel_router = Arc::new(ChannelRouter::with_store(db.clone()).await);
367    let telegram: Option<Arc<TelegramAdapter>> =
368        if let Some(ref tg_config) = config.channels.telegram {
369            if tg_config.enabled {
370                let token = resolve_token(&tg_config.token_ref, &tg_config.token_env, &keystore);
371                if !token.is_empty() {
372                    let adapter = Arc::new(TelegramAdapter::with_config(
373                        token,
374                        tg_config.poll_timeout_seconds,
375                        tg_config.allowed_chat_ids.clone(),
376                        tg_config.webhook_secret.clone(),
377                        config.security.deny_on_empty_allowlist,
378                    ));
379                    channel_router
380                        .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
381                        .await;
382                    tracing::info!("Telegram adapter registered");
383                    if tg_config.webhook_secret.is_none() {
384                        tracing::warn!(
385                            "Telegram webhook_secret not set; webhook endpoint will reject with 503"
386                        );
387                    }
388                    Some(adapter)
389                } else {
390                    tracing::warn!(
391                        token_env = %tg_config.token_env,
392                        "Telegram enabled but token is empty"
393                    );
394                    None
395                }
396            } else {
397                None
398            }
399        } else {
400            None
401        };
402
403    let whatsapp: Option<Arc<WhatsAppAdapter>> =
404        if let Some(ref wa_config) = config.channels.whatsapp {
405            if wa_config.enabled {
406                let token = resolve_token(&wa_config.token_ref, &wa_config.token_env, &keystore);
407                if !token.is_empty() && !wa_config.phone_number_id.is_empty() {
408                    let adapter = Arc::new(WhatsAppAdapter::with_config(
409                        token,
410                        wa_config.phone_number_id.clone(),
411                        wa_config.verify_token.clone(),
412                        wa_config.allowed_numbers.clone(),
413                        wa_config.app_secret.clone(),
414                        config.security.deny_on_empty_allowlist,
415                    )?);
416                    channel_router
417                        .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
418                        .await;
419                    tracing::info!("WhatsApp adapter registered");
420                    if wa_config.app_secret.is_none() {
421                        tracing::warn!(
422                            "WhatsApp app_secret not set; webhook endpoint will reject with 503"
423                        );
424                    }
425                    Some(adapter)
426                } else {
427                    tracing::warn!("WhatsApp enabled but token env or phone_number_id is empty");
428                    None
429                }
430            } else {
431                None
432            }
433        } else {
434            None
435        };
436
437    let discord: Option<Arc<DiscordAdapter>> = if let Some(ref dc_config) = config.channels.discord
438    {
439        if dc_config.enabled {
440            let token = resolve_token(&dc_config.token_ref, &dc_config.token_env, &keystore);
441            if !token.is_empty() {
442                let adapter = Arc::new(DiscordAdapter::with_config(
443                    token,
444                    dc_config.allowed_guild_ids.clone(),
445                    config.security.deny_on_empty_allowlist,
446                ));
447                channel_router
448                    .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
449                    .await;
450                // Start WebSocket gateway for real-time event reception
451                if let Err(e) = adapter.connect_gateway().await {
452                    tracing::error!(error = %e, "Failed to connect Discord gateway");
453                }
454                tracing::info!("Discord adapter registered with gateway");
455                Some(adapter)
456            } else {
457                tracing::warn!(
458                    token_env = %dc_config.token_env,
459                    "Discord enabled but token env var is empty"
460                );
461                None
462            }
463        } else {
464            None
465        }
466    } else {
467        None
468    };
469
470    let signal: Option<Arc<SignalAdapter>> = if let Some(ref sig_config) = config.channels.signal {
471        if sig_config.enabled {
472            if !sig_config.phone_number.is_empty() {
473                let adapter = Arc::new(SignalAdapter::with_config(
474                    sig_config.phone_number.clone(),
475                    sig_config.daemon_url.clone(),
476                    sig_config.allowed_numbers.clone(),
477                    config.security.deny_on_empty_allowlist,
478                ));
479                channel_router
480                    .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
481                    .await;
482                tracing::info!("Signal adapter registered");
483                Some(adapter)
484            } else {
485                tracing::warn!("Signal enabled but phone_number is empty");
486                None
487            }
488        } else {
489            None
490        }
491    } else {
492        None
493    };
494
495    let email: Option<Arc<EmailAdapter>> = if config.channels.email.enabled {
496        let email_cfg = &config.channels.email;
497        let password = if email_cfg.password_env.is_empty() {
498            String::new()
499        } else {
500            match std::env::var(&email_cfg.password_env) {
501                Ok(val) => val,
502                Err(_) => {
503                    tracing::warn!(env_var = %email_cfg.password_env, "email password env var is not set");
504                    String::new()
505                }
506            }
507        };
508        if email_cfg.smtp_host.is_empty()
509            || email_cfg.username.is_empty()
510            || password.is_empty()
511            || email_cfg.from_address.is_empty()
512        {
513            tracing::warn!("Email enabled but SMTP credentials are incomplete");
514            None
515        } else {
516            match EmailAdapter::new(
517                email_cfg.from_address.clone(),
518                email_cfg.smtp_host.clone(),
519                email_cfg.smtp_port,
520                email_cfg.imap_host.clone(),
521                email_cfg.imap_port,
522                email_cfg.username.clone(),
523                password,
524            ) {
525                Ok(email_adapter) => {
526                    // Resolve OAuth2 token from env if configured
527                    let oauth2_token =
528                        if email_cfg.use_oauth2 && !email_cfg.oauth2_token_env.is_empty() {
529                            std::env::var(&email_cfg.oauth2_token_env).ok()
530                        } else {
531                            None
532                        };
533                    let adapter = Arc::new(
534                        email_adapter
535                            .with_allowed_senders(email_cfg.allowed_senders.clone())
536                            .with_deny_on_empty(config.security.deny_on_empty_allowlist)
537                            .with_poll_interval(std::time::Duration::from_secs(
538                                email_cfg.poll_interval_seconds,
539                            ))
540                            .with_oauth2_token(oauth2_token)
541                            .with_imap_idle_enabled(email_cfg.imap_idle_enabled),
542                    );
543                    channel_router
544                        .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
545                        .await;
546                    tracing::info!("Email adapter registered");
547
548                    // Start background IMAP listener if IMAP is configured
549                    if !email_cfg.imap_host.is_empty()
550                        && let Err(e) = adapter.start_imap_listener().await
551                    {
552                        tracing::error!(error = %e, "Failed to start email IMAP listener");
553                    }
554
555                    Some(adapter)
556                }
557                Err(e) => {
558                    tracing::error!(error = %e, "Failed to create email adapter");
559                    None
560                }
561            }
562        }
563    } else {
564        None
565    };
566
567    let voice: Option<Arc<RwLock<VoicePipeline>>> = if config.channels.voice.enabled {
568        let mut voice_config = VoiceConfig::default();
569        if let Some(stt) = &config.channels.voice.stt_model {
570            voice_config.stt_model = stt.clone();
571        }
572        if let Some(tts) = &config.channels.voice.tts_model {
573            voice_config.tts_model = tts.clone();
574        }
575        if let Some(v) = &config.channels.voice.tts_voice {
576            voice_config.tts_voice = v.clone();
577        }
578        if let Ok(key) = std::env::var("OPENAI_API_KEY")
579            && !key.is_empty()
580        {
581            voice_config.api_key = Some(key);
582        }
583        tracing::info!("Voice pipeline initialized");
584        Some(Arc::new(RwLock::new(VoicePipeline::new(voice_config))))
585    } else {
586        None
587    };
588
589    let hmac_secret = {
590        use rand::RngCore;
591        let mut buf = vec![0u8; 32];
592        rand::rngs::OsRng.fill_bytes(&mut buf);
593        buf
594    };
595
596    let retriever = Arc::new(roboticus_agent::retrieval::MemoryRetriever::new(
597        config.memory.clone(),
598    ));
599
600    let mut tool_registry = ToolRegistry::new();
601    tool_registry.register(Box::new(EchoTool));
602    tool_registry.register(Box::new(BashTool));
603    tool_registry.register(Box::new(ScriptRunnerTool::new(
604        config.skills.clone(),
605        config.security.filesystem.clone(),
606    )));
607    tool_registry.register(Box::new(ReadFileTool));
608    tool_registry.register(Box::new(WriteFileTool));
609    tool_registry.register(Box::new(EditFileTool));
610    tool_registry.register(Box::new(ListDirectoryTool));
611    tool_registry.register(Box::new(GlobFilesTool));
612    tool_registry.register(Box::new(SearchFilesTool));
613
614    // Introspection tools — read-only runtime probes for agent self-awareness
615    use roboticus_agent::tools::{
616        GetChannelHealthTool, GetMemoryStatsTool, GetRuntimeContextTool, GetSubagentStatusTool,
617    };
618    tool_registry.register(Box::new(GetRuntimeContextTool));
619    tool_registry.register(Box::new(GetMemoryStatsTool));
620    tool_registry.register(Box::new(GetChannelHealthTool));
621    tool_registry.register(Box::new(GetSubagentStatusTool));
622
623    // Data tools — agent-managed tables via hippocampus
624    use roboticus_agent::tools::{AlterTableTool, CreateTableTool, DropTableTool};
625    tool_registry.register(Box::new(CreateTableTool));
626    tool_registry.register(Box::new(AlterTableTool));
627    tool_registry.register(Box::new(DropTableTool));
628
629    // Bridge active plugin tools into the same runtime registry used by ReAct.
630    plugins::register_plugin_tools(&mut tool_registry, Arc::clone(&plugin_registry)).await;
631
632    // Obsidian vault integration
633    let obsidian_vault: Option<Arc<RwLock<ObsidianVault>>> = if config.obsidian.enabled {
634        match ObsidianVault::from_config(&config.obsidian) {
635            Ok(vault) => {
636                let vault = Arc::new(RwLock::new(vault));
637                tool_registry.register(Box::new(ObsidianReadTool::new(Arc::clone(&vault))));
638                tool_registry.register(Box::new(ObsidianWriteTool::new(Arc::clone(&vault))));
639                tool_registry.register(Box::new(ObsidianSearchTool::new(Arc::clone(&vault))));
640                tracing::info!("Obsidian vault integration enabled");
641                Some(vault)
642            }
643            Err(e) => {
644                tracing::warn!(error = %e, "Failed to initialize Obsidian vault");
645                None
646            }
647        }
648    } else {
649        None
650    };
651
652    // Start vault file watcher if configured
653    if let Some(ref vault) = obsidian_vault
654        && config.obsidian.watch_for_changes
655    {
656        match roboticus_agent::obsidian::watcher::VaultWatcher::start(Arc::clone(vault)).await {
657            Ok(_watcher) => {
658                // watcher lives in the spawned task — drop is fine here
659                tracing::info!("Obsidian vault file watcher started");
660            }
661            Err(e) => {
662                tracing::warn!(error = %e, "Failed to start Obsidian vault watcher");
663            }
664        }
665    }
666
667    let tool_registry = Arc::new(tool_registry);
668
669    let capabilities = Arc::new(roboticus_agent::capability::CapabilityRegistry::new());
670    if let Err(e) = capabilities
671        .sync_from_tool_registry(Arc::clone(&tool_registry))
672        .await
673    {
674        tracing::warn!(error = %e, "initial capability sync from tool registry reported errors");
675    }
676
677    let approvals = Arc::new(ApprovalManager::new(config.approvals.clone()));
678
679    let oauth = Arc::new(OAuthManager::new()?);
680
681    let discovery_registry = Arc::new(RwLock::new(
682        roboticus_agent::discovery::DiscoveryRegistry::new(),
683    ));
684    let device_manager = Arc::new(RwLock::new(roboticus_agent::device::DeviceManager::new(
685        roboticus_agent::device::DeviceIdentity::generate(&config.agent.id),
686        config.devices.max_paired_devices,
687    )));
688
689    let mut mcp_clients = roboticus_agent::mcp::McpClientManager::new();
690    for c in &config.mcp.clients {
691        let mut conn =
692            roboticus_agent::mcp::McpClientConnection::new(c.name.clone(), c.url.clone());
693        if let Err(e) = conn.discover() {
694            tracing::warn!(mcp_name = %c.name, error = %e, "MCP client discovery failed at startup");
695        }
696        mcp_clients.add_connection(conn);
697    }
698    let mcp_clients = Arc::new(RwLock::new(mcp_clients));
699
700    // v2 MCP client connections (via rmcp)
701    let live_mcp = Arc::new(roboticus_agent::mcp::manager::McpConnectionManager::new());
702    live_mcp
703        .connect_all(&config.mcp.servers, &capabilities)
704        .await;
705    let live_mcp_tool_count = live_mcp.connected_count().await;
706    if live_mcp_tool_count > 0 {
707        tracing::info!(
708            servers = live_mcp_tool_count,
709            "MCP client connections established"
710        );
711    }
712
713    let mut mcp_server_registry = roboticus_agent::mcp::McpServerRegistry::new();
714    let exported = roboticus_agent::mcp::export_tools_as_mcp(
715        &tool_registry
716            .list()
717            .iter()
718            .map(|t| {
719                (
720                    t.name().to_string(),
721                    t.description().to_string(),
722                    t.parameters_schema(),
723                )
724            })
725            .collect::<Vec<_>>(),
726    );
727    for tool in exported {
728        mcp_server_registry.register_tool(tool);
729    }
730    mcp_server_registry.register_resource(roboticus_agent::mcp::McpResource {
731        uri: "roboticus://sessions/active".to_string(),
732        name: "Active Sessions".to_string(),
733        description: "Active sessions in the local runtime".to_string(),
734        mime_type: "application/json".to_string(),
735    });
736    mcp_server_registry.register_resource(roboticus_agent::mcp::McpResource {
737        uri: "roboticus://metrics/capacity".to_string(),
738        name: "Provider Capacity Stats".to_string(),
739        description: "Current provider utilization and headroom".to_string(),
740        mime_type: "application/json".to_string(),
741    });
742    let mcp_server_registry = Arc::new(RwLock::new(mcp_server_registry));
743
744    let ann_index = roboticus_db::ann::AnnIndex::new(config.memory.ann_index);
745    if config.memory.ann_index {
746        match ann_index.build_from_db(&db) {
747            Ok(count) => {
748                if ann_index.is_built() {
749                    tracing::info!(count, "ANN index built from database");
750                } else {
751                    tracing::info!(
752                        count,
753                        min = ann_index.min_entries_for_index,
754                        "ANN index below threshold, brute-force search will be used"
755                    );
756                }
757            }
758            Err(e) => {
759                tracing::warn!(error = %e, "Failed to build ANN index, falling back to brute-force");
760            }
761        }
762    }
763
764    // Initialize media service for multimodal attachment handling
765    let media_service = if config.multimodal.enabled {
766        match roboticus_channels::media::MediaService::new(&config.multimodal) {
767            Ok(svc) => {
768                tracing::info!(
769                    media_dir = ?config.multimodal.media_dir,
770                    "Media service initialized"
771                );
772                Some(Arc::new(svc))
773            }
774            Err(e) => {
775                tracing::error!(error = %e, "Failed to initialize media service");
776                None
777            }
778        }
779    } else {
780        None
781    };
782
783    let resolved_config_path =
784        config_path.unwrap_or_else(crate::config_runtime::resolve_default_config_path);
785
786    // Build rate limiter once — AppState and the middleware layer share the same
787    // Arc<Mutex<…>> so admin observability sees live counters.
788    let rate_limiter = GlobalRateLimitLayer::new(
789        u64::from(config.server.rate_limit_requests),
790        Duration::from_secs(config.server.rate_limit_window_secs),
791    )
792    .with_per_ip_capacity(u64::from(config.server.per_ip_rate_limit_requests))
793    .with_per_actor_capacity(u64::from(config.server.per_actor_rate_limit_requests))
794    .with_trusted_proxy_cidrs(&config.server.trusted_proxy_cidrs);
795
796    let state = AppState {
797        db,
798        config: Arc::new(RwLock::new(config.clone())),
799        llm: Arc::new(RwLock::new(llm)),
800        wallet: Arc::new(wallet),
801        a2a: Arc::new(RwLock::new(a2a)),
802        personality: Arc::new(RwLock::new(personality_state)),
803        hmac_secret: Arc::new(hmac_secret),
804        interviews: Arc::new(RwLock::new(std::collections::HashMap::new())),
805        plugins: plugin_registry,
806        policy_engine,
807        browser,
808        registry,
809        event_bus: event_bus.clone(),
810        channel_router,
811        telegram,
812        whatsapp,
813        retriever,
814        ann_index,
815        tools: tool_registry,
816        capabilities,
817        approvals,
818        discord,
819        signal,
820        email,
821        voice,
822        media_service,
823        discovery: discovery_registry,
824        devices: device_manager,
825        mcp_clients,
826        mcp_server: mcp_server_registry,
827        live_mcp,
828        oauth,
829        keystore,
830        obsidian: obsidian_vault,
831        started_at: std::time::Instant::now(),
832        config_path: Arc::new(resolved_config_path.clone()),
833        config_apply_status: crate::config_runtime::status_for_path(&resolved_config_path),
834        pending_specialist_proposals: Arc::new(RwLock::new(std::collections::HashMap::new())),
835        ws_tickets: ws_ticket::TicketStore::new(),
836        rate_limiter: rate_limiter.clone(),
837    };
838    state.resync_capabilities_from_tools().await;
839
840    // Periodic ANN index rebuild (every 10 minutes)
841    if config.memory.ann_index {
842        let ann_db = state.db.clone();
843        let ann_idx = state.ann_index.clone();
844        tokio::spawn(async move {
845            let mut interval = tokio::time::interval(std::time::Duration::from_secs(600));
846            interval.tick().await;
847            loop {
848                interval.tick().await;
849                match ann_idx.rebuild(&ann_db) {
850                    Ok(count) => {
851                        tracing::debug!(count, "ANN index rebuilt");
852                    }
853                    Err(e) => {
854                        tracing::warn!(error = %e, "ANN index rebuild failed");
855                    }
856                }
857            }
858        });
859        tracing::info!("ANN index rebuild daemon spawned (10min interval)");
860    }
861
862    // Load persisted semantic cache
863    {
864        let loaded = roboticus_db::cache::load_cache_entries(&state.db)
865            .inspect_err(|e| tracing::warn!(error = %e, "failed to load semantic cache entries"))
866            .unwrap_or_default();
867        if !loaded.is_empty() {
868            let imported: Vec<(String, roboticus_llm::ExportedCacheEntry)> = loaded
869                .into_iter()
870                .map(|(id, pe)| {
871                    let ttl = pe
872                        .expires_at
873                        .and_then(|e| {
874                            chrono::NaiveDateTime::parse_from_str(&e, "%Y-%m-%dT%H:%M:%S")
875                                .ok()
876                                .or_else(|| {
877                                    chrono::NaiveDateTime::parse_from_str(&e, "%Y-%m-%d %H:%M:%S")
878                                        .ok()
879                                })
880                        })
881                        .map(|exp| {
882                            let now = chrono::Utc::now().naive_utc();
883                            if exp > now {
884                                (exp - now).num_seconds().max(0) as u64
885                            } else {
886                                0
887                            }
888                        })
889                        .unwrap_or(3600);
890
891                    (
892                        id,
893                        roboticus_llm::ExportedCacheEntry {
894                            content: pe.response,
895                            model: pe.model,
896                            tokens_saved: pe.tokens_saved,
897                            hits: pe.hit_count,
898                            involved_tools: false,
899                            embedding: pe.embedding,
900                            ttl_remaining_secs: ttl,
901                        },
902                    )
903                })
904                .collect();
905            let count = imported.len();
906            let llm = state.llm.read().await;
907            llm.cache
908                .lock()
909                .unwrap_or_else(|e| e.into_inner())
910                .import_entries(imported);
911            tracing::info!(count, "Loaded semantic cache from database");
912        }
913    }
914
915    // Periodic cache flush (every 5 minutes)
916    {
917        let flush_db = state.db.clone();
918        let flush_llm = Arc::clone(&state.llm);
919        tokio::spawn(async move {
920            let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
921            interval.tick().await; // skip first immediate tick
922            loop {
923                interval.tick().await;
924                let entries = {
925                    let llm = flush_llm.read().await;
926                    llm.cache
927                        .lock()
928                        .unwrap_or_else(|e| e.into_inner())
929                        .export_entries()
930                };
931                for (hash, entry) in &entries {
932                    let expires = chrono::Utc::now()
933                        + chrono::Duration::seconds(entry.ttl_remaining_secs as i64);
934                    let pe = roboticus_db::cache::PersistedCacheEntry {
935                        prompt_hash: hash.clone(),
936                        response: entry.content.clone(),
937                        model: entry.model.clone(),
938                        tokens_saved: entry.tokens_saved,
939                        hit_count: entry.hits,
940                        embedding: entry.embedding.clone(),
941                        created_at: chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
942                        expires_at: Some(expires.format("%Y-%m-%dT%H:%M:%S").to_string()),
943                    };
944                    roboticus_db::cache::save_cache_entry(&flush_db, hash, &pe)
945                        .inspect_err(
946                            |e| tracing::warn!(error = %e, hash, "failed to persist cache entry"),
947                        )
948                        .ok();
949                }
950                roboticus_db::cache::evict_expired_cache(&flush_db)
951                    .inspect_err(|e| tracing::warn!(error = %e, "failed to evict expired cache"))
952                    .ok();
953                tracing::debug!(count = entries.len(), "Flushed semantic cache to database");
954            }
955        });
956        tracing::info!("Cache flush daemon spawned (5min interval)");
957    }
958
959    // Start heartbeat daemon
960    {
961        let hb_wallet = Arc::clone(&state.wallet);
962        let hb_db = state.db.clone();
963        let hb_session_cfg = config.session.clone();
964        let hb_digest_cfg = config.digest.clone();
965        let hb_agent_id = config.agent.id.clone();
966        let daemon = roboticus_schedule::HeartbeatDaemon::new(60_000);
967        tokio::spawn(async move {
968            roboticus_schedule::run_heartbeat(
969                daemon,
970                hb_wallet,
971                hb_db,
972                hb_session_cfg,
973                hb_digest_cfg,
974                hb_agent_id,
975            )
976            .await;
977        });
978        tracing::info!("Heartbeat daemon spawned (60s interval)");
979    }
980
981    // Delivery retry queue drain (every 30 seconds)
982    {
983        let drain_router = Arc::clone(&state.channel_router);
984        tokio::spawn(async move {
985            let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
986            interval.tick().await;
987            loop {
988                interval.tick().await;
989                drain_router.drain_retry_queue().await;
990            }
991        });
992        tracing::info!("Delivery retry queue drain daemon spawned (30s interval)");
993    }
994
995    // Start cron worker
996    {
997        let instance_id = config.agent.id.clone();
998        let cron_state = state.clone();
999        tokio::spawn(async move {
1000            crate::cron_runtime::run_cron_worker(cron_state, instance_id).await;
1001        });
1002        tracing::info!("Cron worker spawned");
1003    }
1004
1005    // Periodic mechanic-check sweep (default 6h, per-instance).
1006    {
1007        let db_path = config.database.path.clone();
1008        let interval_secs = std::env::var("ROBOTICUS_MECHANIC_CHECK_INTERVAL_SECS")
1009            .ok()
1010            .or_else(|| std::env::var("ROBOTICUS_STATE_HYGIENE_INTERVAL_SECS").ok())
1011            .and_then(|v| v.parse::<u64>().ok())
1012            .filter(|v| *v >= 300)
1013            .unwrap_or(21_600);
1014        tokio::spawn(async move {
1015            let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
1016            interval.tick().await;
1017            loop {
1018                interval.tick().await;
1019                match crate::state_hygiene::run_state_hygiene(&db_path) {
1020                    Ok(report) if report.changed => {
1021                        tracing::info!(
1022                            changed_rows = report.changed_rows,
1023                            subagent_rows_normalized = report.subagent_rows_normalized,
1024                            cron_payload_rows_repaired = report.cron_payload_rows_repaired,
1025                            cron_jobs_disabled_invalid_expr =
1026                                report.cron_jobs_disabled_invalid_expr,
1027                            "periodic mechanic checks applied"
1028                        );
1029                    }
1030                    Ok(_) => tracing::debug!("periodic mechanic checks: no changes"),
1031                    Err(e) => tracing::warn!(error = %e, "periodic mechanic checks failed"),
1032                }
1033            }
1034        });
1035        tracing::info!(interval_secs, "Mechanic checks daemon spawned");
1036    }
1037
1038    {
1039        let startup_announce_channels = config.channels.startup_announcement_channels();
1040
1041        let display_host = match config.server.bind.as_str() {
1042            "127.0.0.1" | "0.0.0.0" | "::1" | "::" => "localhost",
1043            other => other,
1044        };
1045        let channel_list: Vec<&str> = {
1046            let mut ch = vec!["web"];
1047            if config.channels.telegram.is_some() {
1048                ch.push("telegram");
1049            }
1050            if config.channels.whatsapp.is_some() {
1051                ch.push("whatsapp");
1052            }
1053            if config.channels.discord.is_some() {
1054                ch.push("discord");
1055            }
1056            if config.channels.signal.is_some() {
1057                ch.push("signal");
1058            }
1059            ch
1060        };
1061        let announce_text = format!(
1062            "🤖 *Roboticus Online*\n\n\
1063             ┌─────────────────────────\n\
1064             │ 🧠  *{}* (`{}`)\n\
1065             │ ⚡  `{}`\n\
1066             │ 🔀  routing: {}\n\
1067             │ 🌐  `{}:{}`\n\
1068             │ 📡  {}\n\
1069             │ 🛠️  {}\n\
1070             └─────────────────────────\n\n\
1071             🕐 {}",
1072            config.agent.name,
1073            config.agent.id,
1074            config.models.primary,
1075            config.models.routing.mode,
1076            display_host,
1077            config.server.port,
1078            channel_list.join(" · "),
1079            config.skills.skills_dir.display(),
1080            chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
1081        );
1082
1083        if startup_announce_channels.iter().any(|c| c == "telegram") {
1084            if let (Some(adapter), Some(tg_cfg)) =
1085                (state.telegram.clone(), config.channels.telegram.as_ref())
1086            {
1087                let announce_targets = tg_cfg.allowed_chat_ids.clone();
1088                if announce_targets.is_empty() {
1089                    tracing::warn!(
1090                        "Telegram startup announcement skipped: channels.telegram.allowed_chat_ids is empty"
1091                    );
1092                } else {
1093                    let text = announce_text.clone();
1094                    tokio::spawn(async move {
1095                        for chat_id in announce_targets {
1096                            let chat = chat_id.to_string();
1097                            match adapter
1098                                .send(roboticus_channels::OutboundMessage {
1099                                    content: text.clone(),
1100                                    recipient_id: chat.clone(),
1101                                    metadata: None,
1102                                })
1103                                .await
1104                            {
1105                                Ok(()) => {
1106                                    tracing::info!(chat_id = %chat, "telegram startup announcement sent")
1107                                }
1108                                Err(e) => {
1109                                    tracing::warn!(chat_id = %chat, error = %e, "telegram startup announcement failed")
1110                                }
1111                            }
1112                        }
1113                    });
1114                }
1115            } else {
1116                tracing::warn!(
1117                    "Telegram startup announcement requested but telegram channel is not enabled/configured"
1118                );
1119            }
1120        }
1121
1122        if startup_announce_channels.iter().any(|c| c == "whatsapp") {
1123            if let (Some(adapter), Some(wa_cfg)) =
1124                (state.whatsapp.clone(), config.channels.whatsapp.as_ref())
1125            {
1126                let targets = wa_cfg.allowed_numbers.clone();
1127                if targets.is_empty() {
1128                    tracing::warn!(
1129                        "WhatsApp startup announcement skipped: channels.whatsapp.allowed_numbers is empty"
1130                    );
1131                } else {
1132                    let text = announce_text.clone();
1133                    tokio::spawn(async move {
1134                        for number in targets {
1135                            match adapter
1136                                .send(roboticus_channels::OutboundMessage {
1137                                    content: text.clone(),
1138                                    recipient_id: number.clone(),
1139                                    metadata: None,
1140                                })
1141                                .await
1142                            {
1143                                Ok(()) => {
1144                                    tracing::info!(recipient = %number, "whatsapp startup announcement sent")
1145                                }
1146                                Err(e) => {
1147                                    tracing::warn!(recipient = %number, error = %e, "whatsapp startup announcement failed")
1148                                }
1149                            }
1150                        }
1151                    });
1152                }
1153            } else {
1154                tracing::warn!(
1155                    "WhatsApp startup announcement requested but whatsapp channel is not enabled/configured"
1156                );
1157            }
1158        }
1159
1160        if startup_announce_channels.iter().any(|c| c == "signal") {
1161            if let (Some(adapter), Some(sig_cfg)) =
1162                (state.signal.clone(), config.channels.signal.as_ref())
1163            {
1164                let targets = sig_cfg.allowed_numbers.clone();
1165                if targets.is_empty() {
1166                    tracing::warn!(
1167                        "Signal startup announcement skipped: channels.signal.allowed_numbers is empty"
1168                    );
1169                } else {
1170                    let text = announce_text.clone();
1171                    tokio::spawn(async move {
1172                        for number in targets {
1173                            match adapter
1174                                .send(roboticus_channels::OutboundMessage {
1175                                    content: text.clone(),
1176                                    recipient_id: number.clone(),
1177                                    metadata: None,
1178                                })
1179                                .await
1180                            {
1181                                Ok(()) => {
1182                                    tracing::info!(recipient = %number, "signal startup announcement sent")
1183                                }
1184                                Err(e) => {
1185                                    tracing::warn!(recipient = %number, error = %e, "signal startup announcement failed")
1186                                }
1187                            }
1188                        }
1189                    });
1190                }
1191            } else {
1192                tracing::warn!(
1193                    "Signal startup announcement requested but signal channel is not enabled/configured"
1194                );
1195            }
1196        }
1197
1198        for ch in &startup_announce_channels {
1199            if ch != "telegram" && ch != "whatsapp" && ch != "signal" {
1200                tracing::warn!(
1201                    channel = %ch,
1202                    "startup announcement requested for channel without recipient mapping support"
1203                );
1204            }
1205        }
1206    }
1207
1208    if state.telegram.is_some() {
1209        let use_polling = config
1210            .channels
1211            .telegram
1212            .as_ref()
1213            .map(|c| !c.webhook_mode)
1214            .unwrap_or(true);
1215        if use_polling {
1216            let poll_state = state.clone();
1217            tokio::spawn(async move {
1218                api::telegram_poll_loop(poll_state).await;
1219            });
1220        }
1221    }
1222    if state.discord.is_some() {
1223        let poll_state = state.clone();
1224        tokio::spawn(async move {
1225            api::discord_poll_loop(poll_state).await;
1226        });
1227    }
1228    if state.signal.is_some() {
1229        let poll_state = state.clone();
1230        tokio::spawn(async move {
1231            api::signal_poll_loop(poll_state).await;
1232        });
1233    }
1234    if state.email.is_some() {
1235        let poll_state = state.clone();
1236        tokio::spawn(async move {
1237            api::email_poll_loop(poll_state).await;
1238        });
1239    }
1240
1241    let auth_layer = ApiKeyLayer::new(config.server.api_key.clone());
1242    let local_origin = format!("http://{}:{}", config.server.bind, config.server.port);
1243    let origin_header = local_origin
1244        .parse::<axum::http::HeaderValue>()
1245        .unwrap_or_else(|e| {
1246            tracing::warn!(
1247                origin = %local_origin,
1248                error = %e,
1249                "CORS origin failed to parse, falling back to 127.0.0.1 loopback"
1250            );
1251            axum::http::HeaderValue::from_static("http://127.0.0.1:3000")
1252        });
1253    let cors = CorsLayer::new()
1254        .allow_origin(origin_header)
1255        .allow_methods([
1256            axum::http::Method::GET,
1257            axum::http::Method::POST,
1258            axum::http::Method::PUT,
1259            axum::http::Method::DELETE,
1260        ])
1261        .allow_headers([
1262            axum::http::header::CONTENT_TYPE,
1263            axum::http::header::AUTHORIZATION,
1264            axum::http::HeaderName::from_static("x-api-key"),
1265        ]);
1266    let authed_routes = build_router(state.clone()).layer(auth_layer);
1267
1268    // WebSocket route handles its own auth (header OR ticket) so it
1269    // lives outside the API-key middleware layer.
1270    let ws_routes = axum::Router::new().route(
1271        "/ws",
1272        ws_route(
1273            event_bus.clone(),
1274            state.ws_tickets.clone(),
1275            config.server.api_key.clone(),
1276        ),
1277    );
1278
1279    // MCP protocol endpoint uses bearer token auth (same API key).
1280    // Lives outside the main API-key middleware because the MCP transport
1281    // service (StreamableHttpService) needs its own route handling.
1282    let mcp_routes = build_mcp_router(&state, config.server.api_key.clone());
1283
1284    let public_routes = build_public_router(state);
1285
1286    let app = authed_routes
1287        .merge(ws_routes)
1288        .merge(mcp_routes)
1289        .merge(public_routes)
1290        .layer(cors)
1291        .layer(rate_limiter);
1292    Ok(app)
1293}
1294
1295#[cfg(test)]
1296pub mod test_support;
1297
#[cfg(test)]
mod tests {
    // Unit tests for the bootstrap sequence and the small helpers defined in
    // this file (`cleanup_old_logs`, `is_taskable_subagent_role`,
    // `resolve_token`).

    use super::*;
    use crate::test_support::EnvGuard;

    /// Smallest configuration exercised by the in-memory bootstrap test.
    const BOOTSTRAP_CONFIG: &str = r#"
[agent]
name = "Roboticus"
id = "roboticus-test"

[server]
port = 18789
bind = "127.0.0.1"

[database]
path = ":memory:"

[models]
primary = "ollama/qwen3:8b"
"#;

    /// Escapes `path` so it can be embedded inside a double-quoted TOML
    /// string.
    ///
    /// Backslashes (Windows path separators) and double quotes would otherwise
    /// be read as escape sequences / string terminators and make the generated
    /// config unparsable. Paths without those characters are returned as-is.
    fn toml_escape_path(path: &std::path::Path) -> String {
        path.display()
            .to_string()
            .replace('\\', "\\\\")
            .replace('"', "\\\"")
    }

    /// Renders the minimal file-backed configuration shared by the bootstrap
    /// tests, pointing `[database].path` at `db_path`.
    fn minimal_config_toml(db_path: &std::path::Path) -> String {
        format!(
            r#"
[agent]
name = "T"
id = "t"

[server]
bind = "127.0.0.1"
port = 18789

[database]
path = "{db_path}"

[models]
primary = "ollama/qwen3:8b"
"#,
            db_path = toml_escape_path(db_path),
        )
    }

    /// Creates a keystore backed by `keystore.enc` inside `dir` and unlocks it
    /// with a fixed test passphrase.
    fn unlocked_keystore(dir: &std::path::Path) -> roboticus_core::keystore::Keystore {
        let keystore = roboticus_core::keystore::Keystore::new(dir.join("keystore.enc"));
        keystore.unlock("pw").expect("unlock test keystore");
        keystore
    }

    /// Bootstrapping against an in-memory database must succeed.
    #[tokio::test]
    async fn bootstrap_with_memory_db_succeeds() {
        let config = RoboticusConfig::from_str(BOOTSTRAP_CONFIG).expect("parse config");
        let outcome = bootstrap(config).await;
        assert!(
            outcome.is_ok(),
            "bootstrap with :memory: should succeed: {:?}",
            outcome.err()
        );
    }

    /// Every optional channel is enabled but its credentials are absent, and
    /// the obsidian vault path does not exist; bootstrap must still return
    /// `Ok`.
    #[tokio::test]
    async fn bootstrap_handles_enabled_channels_with_missing_credentials() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let db_path = dir.path().join("state.db");
        // The vault directory is intentionally never created.
        let vault_path = dir.path().join("missing-vault");
        let cfg = format!(
            r#"{base}
[channels.telegram]
enabled = true
token_env = "TEST_TELEGRAM_MISSING"

[channels.whatsapp]
enabled = true
token_env = "TEST_WHATSAPP_MISSING"
phone_number_id = ""

[channels.discord]
enabled = true
token_env = "TEST_DISCORD_MISSING"

[channels.signal]
enabled = true
phone_number = ""

[channels.email]
enabled = true
password_env = "TEST_EMAIL_PASSWORD_MISSING"

[channels.voice]
enabled = true

[obsidian]
enabled = true
vault_path = "{vault_path}"
watch_for_changes = true
"#,
            base = minimal_config_toml(&db_path),
            vault_path = toml_escape_path(&vault_path),
        );
        let config = RoboticusConfig::from_str(&cfg).expect("parse config");
        let outcome = bootstrap(config).await;
        assert!(
            outcome.is_ok(),
            "bootstrap should tolerate enabled channels with missing credentials and an invalid obsidian path: {:?}",
            outcome.err()
        );
    }

    /// Seeds the database with one taskable (`subagent`) and one non-taskable
    /// (`model-proxy`) row, then checks that bootstrap succeeds against it.
    ///
    /// NOTE(review): only the success of `bootstrap` is asserted here; the
    /// resulting registry contents are not inspected.
    #[tokio::test]
    async fn bootstrap_loads_taskable_subagents_from_database() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let db_path = dir.path().join("state.db");
        let db = roboticus_db::Database::new(
            db_path.to_str().expect("temp db path should be valid UTF-8"),
        )
        .expect("open seed database");

        let seed_rows = [
            roboticus_db::agents::SubAgentRow {
                id: "sa-1".into(),
                name: "finance_specialist".into(),
                display_name: Some("Finance Specialist".into()),
                model: "auto".into(),
                fallback_models_json: Some("[]".into()),
                role: "subagent".into(),
                description: Some("Handles finance tasks".into()),
                skills_json: Some(r#"["budgeting"]"#.into()),
                enabled: true,
                session_count: 0,
                last_used_at: None,
            },
            roboticus_db::agents::SubAgentRow {
                id: "sa-2".into(),
                name: "observer".into(),
                display_name: Some("Observer".into()),
                model: "ollama/qwen3:8b".into(),
                fallback_models_json: Some("[]".into()),
                role: "model-proxy".into(),
                description: Some("Not taskable".into()),
                skills_json: None,
                enabled: true,
                session_count: 0,
                last_used_at: None,
            },
        ];
        for row in &seed_rows {
            roboticus_db::agents::upsert_sub_agent(&db, row).expect("seed sub-agent row");
        }

        let cfg = minimal_config_toml(&db_path);
        let config = RoboticusConfig::from_str(&cfg).expect("parse config");
        let outcome = bootstrap(config).await;
        assert!(
            outcome.is_ok(),
            "bootstrap should register enabled taskable subagents from db: {:?}",
            outcome.err()
        );
    }

    /// Pointing the log cleanup at a directory that does not exist must not
    /// panic.
    #[test]
    fn cleanup_old_logs_no_panic_on_missing_dir() {
        // A never-created child of a fresh temp dir is guaranteed to be absent
        // on every platform, unlike a fixed path under /tmp.
        let parent = tempfile::tempdir().expect("create temp dir");
        let missing = parent.path().join("roboticus-test-nonexistent-dir-cleanup");
        assert!(!missing.exists(), "precondition: directory must not exist");
        cleanup_old_logs(missing.as_path(), 30);
    }

    /// A freshly written `.log` file survives a 30-day retention window.
    #[test]
    fn cleanup_old_logs_keeps_recent_logs() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let recent = dir.path().join("recent.log");
        std::fs::write(&recent, "fresh log").expect("write log file");

        cleanup_old_logs(dir.path(), 30);
        assert!(recent.exists(), "recent logs should remain");
    }

    /// Files without the `.log` extension are left alone even with a zero-day
    /// window.
    #[test]
    fn cleanup_old_logs_ignores_non_log_files() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let txt = dir.path().join("data.txt");
        std::fs::write(&txt, "keep me").expect("write data file");

        cleanup_old_logs(dir.path(), 0);
        assert!(txt.exists(), "non-log files should not be deleted");
    }

    /// Running the cleanup on an empty directory must not panic.
    #[test]
    fn cleanup_old_logs_empty_dir() {
        let dir = tempfile::tempdir().expect("create temp dir");
        cleanup_old_logs(dir.path(), 1);
    }

    /// Only the `subagent` and `specialist` roles are taskable; matching is
    /// case-insensitive, and `model-proxy` is rejected.
    #[test]
    fn taskable_subagent_roles_are_strict() {
        for role in ["subagent", "specialist", "SubAgent"] {
            assert!(
                is_taskable_subagent_role(role),
                "{role:?} should be taskable"
            );
        }
        assert!(!is_taskable_subagent_role("model-proxy"));
    }

    /// Resolution order: a resolvable `keystore:` reference wins, an
    /// unresolvable one falls back to the environment variable, and with
    /// neither available the token is empty.
    #[test]
    fn resolve_token_prefers_keystore_reference_then_env_then_empty() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let keystore = unlocked_keystore(dir.path());
        keystore
            .set("telegram_bot_token", "from_keystore")
            .expect("store token in keystore");

        let _env = EnvGuard::set("TEST_TELEGRAM_TOKEN", "from_env");

        let from_keystore = resolve_token(
            &Some("keystore:telegram_bot_token".to_string()),
            "TEST_TELEGRAM_TOKEN",
            &keystore,
        );
        assert_eq!(from_keystore, "from_keystore");

        let from_env = resolve_token(
            &Some("keystore:missing".to_string()),
            "TEST_TELEGRAM_TOKEN",
            &keystore,
        );
        assert_eq!(from_env, "from_env");

        let empty = resolve_token(&None, "", &keystore);
        assert!(empty.is_empty());
    }

    /// Without a keystore reference the token is read from the named
    /// environment variable.
    #[test]
    fn resolve_token_uses_env_when_no_keystore_ref_is_provided() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let keystore = unlocked_keystore(dir.path());

        let _env = EnvGuard::set("TEST_SIGNAL_TOKEN", "from_env_only");
        let token = resolve_token(&None, "TEST_SIGNAL_TOKEN", &keystore);
        assert_eq!(token, "from_env_only");
    }

    /// An environment variable that is set but empty resolves to an empty
    /// token.
    #[test]
    fn resolve_token_returns_empty_for_empty_env_var() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let keystore = unlocked_keystore(dir.path());

        let _env = EnvGuard::set("TEST_EMPTY_TOKEN", "");
        let token = resolve_token(&None, "TEST_EMPTY_TOKEN", &keystore);
        assert!(token.is_empty());
    }

    /// With a zero-day window, an existing `.log` file is removed.
    #[test]
    fn cleanup_old_logs_can_prune_when_window_is_zero_days() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let old_log = dir.path().join("old.log");
        std::fs::write(&old_log, "stale").expect("write log file");
        // Let a little wall-clock time pass so the file is strictly older than
        // "now" when the cleanup runs (presumably it compares modification
        // times against the cutoff — confirm against `cleanup_old_logs`).
        std::thread::sleep(std::time::Duration::from_millis(10));
        cleanup_old_logs(dir.path(), 0);
        assert!(
            !old_log.exists(),
            "log file should be pruned with a zero-day window"
        );
    }
}