1pub use roboticus_api as api_crate;
37pub use roboticus_api::abuse;
38pub use roboticus_api::api;
39pub use roboticus_api::auth;
40pub use roboticus_api::config_runtime;
41pub use roboticus_api::cron_runtime;
42pub use roboticus_api::dashboard;
43pub use roboticus_api::rate_limit;
44pub use roboticus_api::ws;
45pub use roboticus_api::ws_ticket;
46pub use roboticus_api::{
47 AppState, EventBus, PersonalityState, TicketStore, build_dashboard_html, build_mcp_router,
48 build_public_router, build_router, dashboard_handler, ws_route,
49};
50
51pub use roboticus_cli::cli;
53pub use roboticus_cli::migrate;
54
55pub mod config_maintenance;
57pub mod daemon;
58pub mod plugins;
59pub use roboticus_cli::state_hygiene;
60
61use std::sync::Arc;
62use std::sync::OnceLock;
63use std::sync::atomic::{AtomicBool, Ordering};
64use std::time::Duration;
65
66use tokio::sync::RwLock;
67use tower_http::cors::CorsLayer;
68
69use auth::ApiKeyLayer;
70use roboticus_agent::policy::{
71 AuthorityRule, CommandSafetyRule, FinancialRule, PathProtectionRule, PolicyEngine,
72 RateLimitRule, ValidationRule,
73};
74use roboticus_agent::subagents::SubagentRegistry;
75use roboticus_browser::Browser;
76use roboticus_channels::ChannelAdapter;
77use roboticus_channels::a2a::A2aProtocol;
78use roboticus_channels::router::ChannelRouter;
79use roboticus_channels::telegram::TelegramAdapter;
80use roboticus_channels::whatsapp::WhatsAppAdapter;
81use roboticus_core::RoboticusConfig;
82use roboticus_db::Database;
83use roboticus_llm::LlmService;
84use roboticus_llm::OAuthManager;
85use roboticus_wallet::{WalletPaymentHandler, WalletService};
86
87use roboticus_agent::approvals::ApprovalManager;
88use roboticus_agent::obsidian::ObsidianVault;
89use roboticus_agent::obsidian_tools::{ObsidianReadTool, ObsidianSearchTool, ObsidianWriteTool};
90use roboticus_agent::tools::{
91 BashTool, EchoTool, EditFileTool, GlobFilesTool, ListDirectoryTool, ReadFileTool,
92 ScriptRunnerTool, SearchFilesTool, ToolRegistry, WriteFileTool,
93};
94use roboticus_channels::discord::DiscordAdapter;
95use roboticus_channels::email::EmailAdapter;
96use roboticus_channels::signal::SignalAdapter;
97use roboticus_channels::voice::{VoiceConfig, VoicePipeline};
98
99use rate_limit::GlobalRateLimitLayer;
100
101static STDERR_ENABLED: AtomicBool = AtomicBool::new(false);
102static LOG_GUARD: OnceLock<tracing_appender::non_blocking::WorkerGuard> = OnceLock::new();
103
104fn is_taskable_subagent_role(role: &str) -> bool {
105 role.eq_ignore_ascii_case("subagent") || role.eq_ignore_ascii_case("specialist")
106}
107
108pub fn enable_stderr_logging() {
109 STDERR_ENABLED.store(true, Ordering::Release);
110}
111
112fn init_logging(config: &RoboticusConfig) {
113 use tracing_subscriber::EnvFilter;
114 use tracing_subscriber::Layer;
115 use tracing_subscriber::filter::filter_fn;
116 use tracing_subscriber::fmt;
117 use tracing_subscriber::layer::SubscriberExt;
118 use tracing_subscriber::util::SubscriberInitExt;
119
120 let level = config.agent.log_level.as_str();
121 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level));
122
123 let stderr_gate = filter_fn(|_| STDERR_ENABLED.load(Ordering::Acquire));
124
125 let log_dir = &config.server.log_dir;
126 if std::fs::create_dir_all(log_dir).is_ok() {
127 let file_appender = tracing_appender::rolling::daily(log_dir, "roboticus.log");
128 let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
129 let _ = LOG_GUARD.set(guard);
130
131 let file_layer = fmt::layer()
132 .with_writer(non_blocking)
133 .with_ansi(false)
134 .json();
135
136 let stderr_layer = fmt::layer()
137 .with_writer(std::io::stderr)
138 .with_filter(stderr_gate);
139
140 let _ = tracing_subscriber::registry()
141 .with(filter)
142 .with(stderr_layer)
143 .with(file_layer)
144 .try_init();
145 } else {
146 let stderr_layer = fmt::layer()
147 .with_writer(std::io::stderr)
148 .with_filter(stderr_gate);
149 let _ = tracing_subscriber::registry()
150 .with(filter)
151 .with(stderr_layer)
152 .try_init();
153 }
154
155 cleanup_old_logs(log_dir, config.server.log_max_days);
156}
157
158fn cleanup_old_logs(log_dir: &std::path::Path, max_days: u32) {
159 let cutoff =
160 std::time::SystemTime::now() - std::time::Duration::from_secs(u64::from(max_days) * 86400);
161
162 let entries = match std::fs::read_dir(log_dir) {
163 Ok(e) => e,
164 Err(_) => return,
165 };
166
167 for entry in entries.flatten() {
168 let path = entry.path();
169 if path.extension().and_then(|e| e.to_str()) != Some("log") {
170 continue;
171 }
172 if let Ok(meta) = entry.metadata()
173 && let Ok(modified) = meta.modified()
174 && modified < cutoff
175 {
176 let _ = std::fs::remove_file(&path);
178 }
179 }
180}
181
182fn resolve_token(
184 token_ref: &Option<String>,
185 token_env: &str,
186 keystore: &roboticus_core::Keystore,
187) -> String {
188 if let Some(r) = token_ref
189 && let Some(name) = r.strip_prefix("keystore:")
190 {
191 if let Some(val) = keystore.get(name) {
192 return val;
193 }
194 tracing::warn!(key = %name, "keystore reference not found, falling back to env var");
195 }
196 if !token_env.is_empty() {
197 return match std::env::var(token_env) {
198 Ok(val) if !val.is_empty() => val,
199 Ok(_) => {
200 tracing::warn!(env_var = %token_env, "API key env var is set but empty");
201 String::new()
202 }
203 Err(_) => {
204 tracing::warn!(env_var = %token_env, "API key env var is not set");
205 String::new()
206 }
207 };
208 }
209 String::new()
210}
211
212pub async fn bootstrap(config: RoboticusConfig) -> Result<axum::Router, Box<dyn std::error::Error>> {
214 bootstrap_with_config_path(config, None).await
215}
216
217pub async fn bootstrap_with_config_path(
218 config: RoboticusConfig,
219 config_path: Option<std::path::PathBuf>,
220) -> Result<axum::Router, Box<dyn std::error::Error>> {
221 init_logging(&config);
222
223 let personality_state = api::PersonalityState::from_workspace(&config.agent.workspace);
224
225 if !personality_state.os_text.is_empty() {
226 tracing::info!(
227 personality = %personality_state.identity.name,
228 generated_by = %personality_state.identity.generated_by,
229 "Loaded personality files from workspace"
230 );
231 } else {
232 tracing::info!("No personality files found in workspace, using defaults");
233 }
234
235 let db_path = config.database.path.to_string_lossy().to_string();
236 let db = Database::new(&db_path)?;
237 match crate::state_hygiene::run_state_hygiene(&config.database.path) {
238 Ok(report) if report.changed => {
239 tracing::info!(
240 changed_rows = report.changed_rows,
241 subagent_rows_normalized = report.subagent_rows_normalized,
242 cron_payload_rows_repaired = report.cron_payload_rows_repaired,
243 cron_jobs_disabled_invalid_expr = report.cron_jobs_disabled_invalid_expr,
244 "applied startup mechanic checks"
245 );
246 }
247 Ok(_) => {}
248 Err(e) => tracing::warn!(error = %e, "startup mechanic checks failed"),
249 }
250 match roboticus_db::sessions::backfill_nicknames(&db) {
251 Ok(0) => {}
252 Ok(n) => tracing::info!(count = n, "Backfilled session nicknames"),
253 Err(e) => tracing::warn!(error = %e, "Failed to backfill session nicknames"),
254 }
255 let mut llm = LlmService::new(&config)?;
256 match roboticus_db::metrics::recent_quality_scores(&db, 200) {
259 Ok(scores) if !scores.is_empty() => llm.quality.seed_from_history(&scores),
260 Ok(_) => {}
261 Err(e) => tracing::warn!(error = %e, "failed to seed quality tracker from history"),
262 }
263 let wallet = WalletService::new(&config).await?;
264
265 let wallet_arc = Arc::new(wallet.wallet.clone());
269 let treasury_policy = roboticus_wallet::treasury::TreasuryPolicy::new(&config.treasury);
270 let x402_handler =
271 Arc::new(WalletPaymentHandler::new(wallet_arc).with_treasury_policy(treasury_policy));
272 llm.set_payment_handler(x402_handler);
273
274 let a2a = A2aProtocol::new(config.a2a.clone());
275 let plugin_registry = plugins::init_plugin_registry(&config.plugins).await;
276 let mut policy_engine = PolicyEngine::new();
277 policy_engine.add_rule(Box::new(AuthorityRule));
278 policy_engine.add_rule(Box::new(CommandSafetyRule));
279 policy_engine.add_rule(Box::new(FinancialRule::new(
280 config.treasury.per_payment_cap,
281 )));
282 policy_engine.add_rule(Box::new(PathProtectionRule::from_config(
283 &config.security.filesystem,
284 )));
285 policy_engine.add_rule(Box::new(RateLimitRule::default()));
286 policy_engine.add_rule(Box::new(ValidationRule));
287 let policy_engine = Arc::new(policy_engine);
288 let browser = Arc::new(Browser::new(config.browser.clone()));
289 let registry = Arc::new(SubagentRegistry::new(4, vec![]));
290
291 if let Ok(sub_agents) = roboticus_db::agents::list_enabled_sub_agents(&db) {
292 for sa in &sub_agents {
293 if !is_taskable_subagent_role(&sa.role) {
294 continue;
295 }
296 let resolved_model = match sa.model.trim().to_ascii_lowercase().as_str() {
297 "auto" | "orchestrator" => llm.router.select_model().to_string(),
298 _ => sa.model.clone(),
299 };
300 let fixed_skills = sa
301 .skills_json
302 .as_deref()
303 .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok())
304 .unwrap_or_default();
305 let agent_config = roboticus_agent::subagents::AgentInstanceConfig {
306 id: sa.name.clone(),
307 name: sa.display_name.clone().unwrap_or_else(|| sa.name.clone()),
308 model: resolved_model,
309 skills: fixed_skills,
310 allowed_subagents: vec![],
311 max_concurrent: 4,
312 };
313 if let Err(e) = registry.register(agent_config).await {
314 tracing::warn!(agent = %sa.name, err = %e, "failed to register sub-agent");
315 } else if let Err(e) = registry.start_agent(&sa.name).await {
316 tracing::warn!(agent = %sa.name, err = %e, "failed to auto-start sub-agent");
317 }
318 }
319 if !sub_agents.is_empty() {
320 tracing::info!(
321 count = sub_agents.len(),
322 "registered sub-agents from database"
323 );
324 }
325 }
326
327 let event_bus = EventBus::new(256);
328
329 let keystore =
330 roboticus_core::keystore::Keystore::new(roboticus_core::keystore::Keystore::default_path());
331 if let Err(e) = keystore.unlock_machine() {
332 tracing::warn!("keystore auto-unlock failed: {e}");
333 }
334 let keystore = Arc::new(keystore);
335
336 let channel_router = Arc::new(ChannelRouter::with_store(db.clone()).await);
337 let telegram: Option<Arc<TelegramAdapter>> =
338 if let Some(ref tg_config) = config.channels.telegram {
339 if tg_config.enabled {
340 let token = resolve_token(&tg_config.token_ref, &tg_config.token_env, &keystore);
341 if !token.is_empty() {
342 let adapter = Arc::new(TelegramAdapter::with_config(
343 token,
344 tg_config.poll_timeout_seconds,
345 tg_config.allowed_chat_ids.clone(),
346 tg_config.webhook_secret.clone(),
347 config.security.deny_on_empty_allowlist,
348 ));
349 channel_router
350 .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
351 .await;
352 tracing::info!("Telegram adapter registered");
353 if tg_config.webhook_secret.is_none() {
354 tracing::warn!(
355 "Telegram webhook_secret not set; webhook endpoint will reject with 503"
356 );
357 }
358 Some(adapter)
359 } else {
360 tracing::warn!(
361 token_env = %tg_config.token_env,
362 "Telegram enabled but token is empty"
363 );
364 None
365 }
366 } else {
367 None
368 }
369 } else {
370 None
371 };
372
373 let whatsapp: Option<Arc<WhatsAppAdapter>> =
374 if let Some(ref wa_config) = config.channels.whatsapp {
375 if wa_config.enabled {
376 let token = resolve_token(&wa_config.token_ref, &wa_config.token_env, &keystore);
377 if !token.is_empty() && !wa_config.phone_number_id.is_empty() {
378 let adapter = Arc::new(WhatsAppAdapter::with_config(
379 token,
380 wa_config.phone_number_id.clone(),
381 wa_config.verify_token.clone(),
382 wa_config.allowed_numbers.clone(),
383 wa_config.app_secret.clone(),
384 config.security.deny_on_empty_allowlist,
385 )?);
386 channel_router
387 .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
388 .await;
389 tracing::info!("WhatsApp adapter registered");
390 if wa_config.app_secret.is_none() {
391 tracing::warn!(
392 "WhatsApp app_secret not set; webhook endpoint will reject with 503"
393 );
394 }
395 Some(adapter)
396 } else {
397 tracing::warn!("WhatsApp enabled but token env or phone_number_id is empty");
398 None
399 }
400 } else {
401 None
402 }
403 } else {
404 None
405 };
406
407 let discord: Option<Arc<DiscordAdapter>> = if let Some(ref dc_config) = config.channels.discord
408 {
409 if dc_config.enabled {
410 let token = resolve_token(&dc_config.token_ref, &dc_config.token_env, &keystore);
411 if !token.is_empty() {
412 let adapter = Arc::new(DiscordAdapter::with_config(
413 token,
414 dc_config.allowed_guild_ids.clone(),
415 config.security.deny_on_empty_allowlist,
416 ));
417 channel_router
418 .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
419 .await;
420 if let Err(e) = adapter.connect_gateway().await {
422 tracing::error!(error = %e, "Failed to connect Discord gateway");
423 }
424 tracing::info!("Discord adapter registered with gateway");
425 Some(adapter)
426 } else {
427 tracing::warn!(
428 token_env = %dc_config.token_env,
429 "Discord enabled but token env var is empty"
430 );
431 None
432 }
433 } else {
434 None
435 }
436 } else {
437 None
438 };
439
440 let signal: Option<Arc<SignalAdapter>> = if let Some(ref sig_config) = config.channels.signal {
441 if sig_config.enabled {
442 if !sig_config.phone_number.is_empty() {
443 let adapter = Arc::new(SignalAdapter::with_config(
444 sig_config.phone_number.clone(),
445 sig_config.daemon_url.clone(),
446 sig_config.allowed_numbers.clone(),
447 config.security.deny_on_empty_allowlist,
448 ));
449 channel_router
450 .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
451 .await;
452 tracing::info!("Signal adapter registered");
453 Some(adapter)
454 } else {
455 tracing::warn!("Signal enabled but phone_number is empty");
456 None
457 }
458 } else {
459 None
460 }
461 } else {
462 None
463 };
464
465 let email: Option<Arc<EmailAdapter>> = if config.channels.email.enabled {
466 let email_cfg = &config.channels.email;
467 let password = if email_cfg.password_env.is_empty() {
468 String::new()
469 } else {
470 match std::env::var(&email_cfg.password_env) {
471 Ok(val) => val,
472 Err(_) => {
473 tracing::warn!(env_var = %email_cfg.password_env, "email password env var is not set");
474 String::new()
475 }
476 }
477 };
478 if email_cfg.smtp_host.is_empty()
479 || email_cfg.username.is_empty()
480 || password.is_empty()
481 || email_cfg.from_address.is_empty()
482 {
483 tracing::warn!("Email enabled but SMTP credentials are incomplete");
484 None
485 } else {
486 match EmailAdapter::new(
487 email_cfg.from_address.clone(),
488 email_cfg.smtp_host.clone(),
489 email_cfg.smtp_port,
490 email_cfg.imap_host.clone(),
491 email_cfg.imap_port,
492 email_cfg.username.clone(),
493 password,
494 ) {
495 Ok(email_adapter) => {
496 let oauth2_token =
498 if email_cfg.use_oauth2 && !email_cfg.oauth2_token_env.is_empty() {
499 std::env::var(&email_cfg.oauth2_token_env).ok()
500 } else {
501 None
502 };
503 let adapter = Arc::new(
504 email_adapter
505 .with_allowed_senders(email_cfg.allowed_senders.clone())
506 .with_deny_on_empty(config.security.deny_on_empty_allowlist)
507 .with_poll_interval(std::time::Duration::from_secs(
508 email_cfg.poll_interval_seconds,
509 ))
510 .with_oauth2_token(oauth2_token)
511 .with_imap_idle_enabled(email_cfg.imap_idle_enabled),
512 );
513 channel_router
514 .register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
515 .await;
516 tracing::info!("Email adapter registered");
517
518 if !email_cfg.imap_host.is_empty()
520 && let Err(e) = adapter.start_imap_listener().await
521 {
522 tracing::error!(error = %e, "Failed to start email IMAP listener");
523 }
524
525 Some(adapter)
526 }
527 Err(e) => {
528 tracing::error!(error = %e, "Failed to create email adapter");
529 None
530 }
531 }
532 }
533 } else {
534 None
535 };
536
537 let voice: Option<Arc<RwLock<VoicePipeline>>> = if config.channels.voice.enabled {
538 let mut voice_config = VoiceConfig::default();
539 if let Some(stt) = &config.channels.voice.stt_model {
540 voice_config.stt_model = stt.clone();
541 }
542 if let Some(tts) = &config.channels.voice.tts_model {
543 voice_config.tts_model = tts.clone();
544 }
545 if let Some(v) = &config.channels.voice.tts_voice {
546 voice_config.tts_voice = v.clone();
547 }
548 if let Ok(key) = std::env::var("OPENAI_API_KEY")
549 && !key.is_empty()
550 {
551 voice_config.api_key = Some(key);
552 }
553 tracing::info!("Voice pipeline initialized");
554 Some(Arc::new(RwLock::new(VoicePipeline::new(voice_config))))
555 } else {
556 None
557 };
558
559 let hmac_secret = {
560 use rand::RngCore;
561 let mut buf = vec![0u8; 32];
562 rand::rngs::OsRng.fill_bytes(&mut buf);
563 buf
564 };
565
566 let retriever = Arc::new(roboticus_agent::retrieval::MemoryRetriever::new(
567 config.memory.clone(),
568 ));
569
570 let mut tool_registry = ToolRegistry::new();
571 tool_registry.register(Box::new(EchoTool));
572 tool_registry.register(Box::new(BashTool));
573 tool_registry.register(Box::new(ScriptRunnerTool::new(
574 config.skills.clone(),
575 config.security.filesystem.clone(),
576 )));
577 tool_registry.register(Box::new(ReadFileTool));
578 tool_registry.register(Box::new(WriteFileTool));
579 tool_registry.register(Box::new(EditFileTool));
580 tool_registry.register(Box::new(ListDirectoryTool));
581 tool_registry.register(Box::new(GlobFilesTool));
582 tool_registry.register(Box::new(SearchFilesTool));
583
584 use roboticus_agent::tools::{
586 GetChannelHealthTool, GetMemoryStatsTool, GetRuntimeContextTool, GetSubagentStatusTool,
587 };
588 tool_registry.register(Box::new(GetRuntimeContextTool));
589 tool_registry.register(Box::new(GetMemoryStatsTool));
590 tool_registry.register(Box::new(GetChannelHealthTool));
591 tool_registry.register(Box::new(GetSubagentStatusTool));
592
593 use roboticus_agent::tools::{AlterTableTool, CreateTableTool, DropTableTool};
595 tool_registry.register(Box::new(CreateTableTool));
596 tool_registry.register(Box::new(AlterTableTool));
597 tool_registry.register(Box::new(DropTableTool));
598
599 let obsidian_vault: Option<Arc<RwLock<ObsidianVault>>> = if config.obsidian.enabled {
601 match ObsidianVault::from_config(&config.obsidian) {
602 Ok(vault) => {
603 let vault = Arc::new(RwLock::new(vault));
604 tool_registry.register(Box::new(ObsidianReadTool::new(Arc::clone(&vault))));
605 tool_registry.register(Box::new(ObsidianWriteTool::new(Arc::clone(&vault))));
606 tool_registry.register(Box::new(ObsidianSearchTool::new(Arc::clone(&vault))));
607 tracing::info!("Obsidian vault integration enabled");
608 Some(vault)
609 }
610 Err(e) => {
611 tracing::warn!(error = %e, "Failed to initialize Obsidian vault");
612 None
613 }
614 }
615 } else {
616 None
617 };
618
619 if let Some(ref vault) = obsidian_vault
621 && config.obsidian.watch_for_changes
622 {
623 match roboticus_agent::obsidian::watcher::VaultWatcher::start(Arc::clone(vault)) {
624 Ok(_watcher) => {
625 tracing::info!("Obsidian vault file watcher started");
627 }
628 Err(e) => {
629 tracing::warn!(error = %e, "Failed to start Obsidian vault watcher");
630 }
631 }
632 }
633
634 let tool_registry = Arc::new(tool_registry);
635
636 let approvals = Arc::new(ApprovalManager::new(config.approvals.clone()));
637
638 let oauth = Arc::new(OAuthManager::new()?);
639
640 let discovery_registry = Arc::new(RwLock::new(
641 roboticus_agent::discovery::DiscoveryRegistry::new(),
642 ));
643 let device_manager = Arc::new(RwLock::new(roboticus_agent::device::DeviceManager::new(
644 roboticus_agent::device::DeviceIdentity::generate(&config.agent.id),
645 config.devices.max_paired_devices,
646 )));
647
648 let mut mcp_clients = roboticus_agent::mcp::McpClientManager::new();
649 for c in &config.mcp.clients {
650 let mut conn = roboticus_agent::mcp::McpClientConnection::new(c.name.clone(), c.url.clone());
651 if let Err(e) = conn.discover() {
652 tracing::warn!(mcp_name = %c.name, error = %e, "MCP client discovery failed at startup");
653 }
654 mcp_clients.add_connection(conn);
655 }
656 let mcp_clients = Arc::new(RwLock::new(mcp_clients));
657
658 let mut mcp_server_registry = roboticus_agent::mcp::McpServerRegistry::new();
659 let exported = roboticus_agent::mcp::export_tools_as_mcp(
660 &tool_registry
661 .list()
662 .iter()
663 .map(|t| {
664 (
665 t.name().to_string(),
666 t.description().to_string(),
667 t.parameters_schema(),
668 )
669 })
670 .collect::<Vec<_>>(),
671 );
672 for tool in exported {
673 mcp_server_registry.register_tool(tool);
674 }
675 mcp_server_registry.register_resource(roboticus_agent::mcp::McpResource {
676 uri: "roboticus://sessions/active".to_string(),
677 name: "Active Sessions".to_string(),
678 description: "Active sessions in the local runtime".to_string(),
679 mime_type: "application/json".to_string(),
680 });
681 mcp_server_registry.register_resource(roboticus_agent::mcp::McpResource {
682 uri: "roboticus://metrics/capacity".to_string(),
683 name: "Provider Capacity Stats".to_string(),
684 description: "Current provider utilization and headroom".to_string(),
685 mime_type: "application/json".to_string(),
686 });
687 let mcp_server_registry = Arc::new(RwLock::new(mcp_server_registry));
688
689 let ann_index = roboticus_db::ann::AnnIndex::new(config.memory.ann_index);
690 if config.memory.ann_index {
691 match ann_index.build_from_db(&db) {
692 Ok(count) => {
693 if ann_index.is_built() {
694 tracing::info!(count, "ANN index built from database");
695 } else {
696 tracing::info!(
697 count,
698 min = ann_index.min_entries_for_index,
699 "ANN index below threshold, brute-force search will be used"
700 );
701 }
702 }
703 Err(e) => {
704 tracing::warn!(error = %e, "Failed to build ANN index, falling back to brute-force");
705 }
706 }
707 }
708
709 let media_service = if config.multimodal.enabled {
711 match roboticus_channels::media::MediaService::new(&config.multimodal) {
712 Ok(svc) => {
713 tracing::info!(
714 media_dir = ?config.multimodal.media_dir,
715 "Media service initialized"
716 );
717 Some(Arc::new(svc))
718 }
719 Err(e) => {
720 tracing::error!(error = %e, "Failed to initialize media service");
721 None
722 }
723 }
724 } else {
725 None
726 };
727
728 let resolved_config_path =
729 config_path.unwrap_or_else(crate::config_runtime::resolve_default_config_path);
730
731 let rate_limiter = GlobalRateLimitLayer::new(
734 u64::from(config.server.rate_limit_requests),
735 Duration::from_secs(config.server.rate_limit_window_secs),
736 )
737 .with_per_ip_capacity(u64::from(config.server.per_ip_rate_limit_requests))
738 .with_per_actor_capacity(u64::from(config.server.per_actor_rate_limit_requests))
739 .with_trusted_proxy_cidrs(&config.server.trusted_proxy_cidrs);
740
741 let state = AppState {
742 db,
743 config: Arc::new(RwLock::new(config.clone())),
744 llm: Arc::new(RwLock::new(llm)),
745 wallet: Arc::new(wallet),
746 a2a: Arc::new(RwLock::new(a2a)),
747 personality: Arc::new(RwLock::new(personality_state)),
748 hmac_secret: Arc::new(hmac_secret),
749 interviews: Arc::new(RwLock::new(std::collections::HashMap::new())),
750 plugins: plugin_registry,
751 policy_engine,
752 browser,
753 registry,
754 event_bus: event_bus.clone(),
755 channel_router,
756 telegram,
757 whatsapp,
758 retriever,
759 ann_index,
760 tools: tool_registry,
761 approvals,
762 discord,
763 signal,
764 email,
765 voice,
766 media_service,
767 discovery: discovery_registry,
768 devices: device_manager,
769 mcp_clients,
770 mcp_server: mcp_server_registry,
771 oauth,
772 keystore,
773 obsidian: obsidian_vault,
774 started_at: std::time::Instant::now(),
775 config_path: Arc::new(resolved_config_path.clone()),
776 config_apply_status: crate::config_runtime::status_for_path(&resolved_config_path),
777 pending_specialist_proposals: Arc::new(RwLock::new(std::collections::HashMap::new())),
778 ws_tickets: ws_ticket::TicketStore::new(),
779 rate_limiter: rate_limiter.clone(),
780 };
781
782 if config.memory.ann_index {
784 let ann_db = state.db.clone();
785 let ann_idx = state.ann_index.clone();
786 tokio::spawn(async move {
787 let mut interval = tokio::time::interval(std::time::Duration::from_secs(600));
788 interval.tick().await;
789 loop {
790 interval.tick().await;
791 match ann_idx.rebuild(&ann_db) {
792 Ok(count) => {
793 tracing::debug!(count, "ANN index rebuilt");
794 }
795 Err(e) => {
796 tracing::warn!(error = %e, "ANN index rebuild failed");
797 }
798 }
799 }
800 });
801 tracing::info!("ANN index rebuild daemon spawned (10min interval)");
802 }
803
804 {
806 let loaded = roboticus_db::cache::load_cache_entries(&state.db)
807 .inspect_err(|e| tracing::warn!(error = %e, "failed to load semantic cache entries"))
808 .unwrap_or_default();
809 if !loaded.is_empty() {
810 let imported: Vec<(String, roboticus_llm::ExportedCacheEntry)> = loaded
811 .into_iter()
812 .map(|(id, pe)| {
813 let ttl = pe
814 .expires_at
815 .and_then(|e| {
816 chrono::NaiveDateTime::parse_from_str(&e, "%Y-%m-%dT%H:%M:%S")
817 .ok()
818 .or_else(|| {
819 chrono::NaiveDateTime::parse_from_str(&e, "%Y-%m-%d %H:%M:%S")
820 .ok()
821 })
822 })
823 .map(|exp| {
824 let now = chrono::Utc::now().naive_utc();
825 if exp > now {
826 (exp - now).num_seconds().max(0) as u64
827 } else {
828 0
829 }
830 })
831 .unwrap_or(3600);
832
833 (
834 id,
835 roboticus_llm::ExportedCacheEntry {
836 content: pe.response,
837 model: pe.model,
838 tokens_saved: pe.tokens_saved,
839 hits: pe.hit_count,
840 involved_tools: false,
841 embedding: pe.embedding,
842 ttl_remaining_secs: ttl,
843 },
844 )
845 })
846 .collect();
847 let count = imported.len();
848 let mut llm = state.llm.write().await;
849 llm.cache.import_entries(imported);
850 tracing::info!(count, "Loaded semantic cache from database");
851 }
852 }
853
854 {
856 let flush_db = state.db.clone();
857 let flush_llm = Arc::clone(&state.llm);
858 tokio::spawn(async move {
859 let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
860 interval.tick().await; loop {
862 interval.tick().await;
863 let entries = {
864 let llm = flush_llm.read().await;
865 llm.cache.export_entries()
866 };
867 for (hash, entry) in &entries {
868 let expires = chrono::Utc::now()
869 + chrono::Duration::seconds(entry.ttl_remaining_secs as i64);
870 let pe = roboticus_db::cache::PersistedCacheEntry {
871 prompt_hash: hash.clone(),
872 response: entry.content.clone(),
873 model: entry.model.clone(),
874 tokens_saved: entry.tokens_saved,
875 hit_count: entry.hits,
876 embedding: entry.embedding.clone(),
877 created_at: chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
878 expires_at: Some(expires.format("%Y-%m-%dT%H:%M:%S").to_string()),
879 };
880 roboticus_db::cache::save_cache_entry(&flush_db, hash, &pe)
881 .inspect_err(
882 |e| tracing::warn!(error = %e, hash, "failed to persist cache entry"),
883 )
884 .ok();
885 }
886 roboticus_db::cache::evict_expired_cache(&flush_db)
887 .inspect_err(|e| tracing::warn!(error = %e, "failed to evict expired cache"))
888 .ok();
889 tracing::debug!(count = entries.len(), "Flushed semantic cache to database");
890 }
891 });
892 tracing::info!("Cache flush daemon spawned (5min interval)");
893 }
894
895 {
897 let hb_wallet = Arc::clone(&state.wallet);
898 let hb_db = state.db.clone();
899 let hb_session_cfg = config.session.clone();
900 let hb_digest_cfg = config.digest.clone();
901 let hb_agent_id = config.agent.id.clone();
902 let daemon = roboticus_schedule::HeartbeatDaemon::new(60_000);
903 tokio::spawn(async move {
904 roboticus_schedule::run_heartbeat(
905 daemon,
906 hb_wallet,
907 hb_db,
908 hb_session_cfg,
909 hb_digest_cfg,
910 hb_agent_id,
911 )
912 .await;
913 });
914 tracing::info!("Heartbeat daemon spawned (60s interval)");
915 }
916
917 {
919 let drain_router = Arc::clone(&state.channel_router);
920 tokio::spawn(async move {
921 let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
922 interval.tick().await;
923 loop {
924 interval.tick().await;
925 drain_router.drain_retry_queue().await;
926 }
927 });
928 tracing::info!("Delivery retry queue drain daemon spawned (30s interval)");
929 }
930
931 {
933 let instance_id = config.agent.id.clone();
934 let cron_state = state.clone();
935 tokio::spawn(async move {
936 crate::cron_runtime::run_cron_worker(cron_state, instance_id).await;
937 });
938 tracing::info!("Cron worker spawned");
939 }
940
941 {
943 let db_path = config.database.path.clone();
944 let interval_secs = std::env::var("ROBOTICUS_MECHANIC_CHECK_INTERVAL_SECS")
945 .ok()
946 .or_else(|| std::env::var("ROBOTICUS_STATE_HYGIENE_INTERVAL_SECS").ok())
947 .and_then(|v| v.parse::<u64>().ok())
948 .filter(|v| *v >= 300)
949 .unwrap_or(21_600);
950 tokio::spawn(async move {
951 let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
952 interval.tick().await;
953 loop {
954 interval.tick().await;
955 match crate::state_hygiene::run_state_hygiene(&db_path) {
956 Ok(report) if report.changed => {
957 tracing::info!(
958 changed_rows = report.changed_rows,
959 subagent_rows_normalized = report.subagent_rows_normalized,
960 cron_payload_rows_repaired = report.cron_payload_rows_repaired,
961 cron_jobs_disabled_invalid_expr =
962 report.cron_jobs_disabled_invalid_expr,
963 "periodic mechanic checks applied"
964 );
965 }
966 Ok(_) => tracing::debug!("periodic mechanic checks: no changes"),
967 Err(e) => tracing::warn!(error = %e, "periodic mechanic checks failed"),
968 }
969 }
970 });
971 tracing::info!(interval_secs, "Mechanic checks daemon spawned");
972 }
973
974 {
975 let startup_announce_channels = config.channels.startup_announcement_channels();
976
977 let announce_text = format!(
978 "Roboticus online\nagent: {} ({})\nmodel.primary: {}\nrouting: {}\nserver: {}:{}\nskills_dir: {}\nstarted_at: {}",
979 config.agent.name,
980 config.agent.id,
981 config.models.primary,
982 config.models.routing.mode,
983 config.server.bind,
984 config.server.port,
985 config.skills.skills_dir.display(),
986 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
987 );
988
989 if startup_announce_channels.iter().any(|c| c == "telegram") {
990 if let (Some(adapter), Some(tg_cfg)) =
991 (state.telegram.clone(), config.channels.telegram.as_ref())
992 {
993 let announce_targets = tg_cfg.allowed_chat_ids.clone();
994 if announce_targets.is_empty() {
995 tracing::warn!(
996 "Telegram startup announcement skipped: channels.telegram.allowed_chat_ids is empty"
997 );
998 } else {
999 let text = announce_text.clone();
1000 tokio::spawn(async move {
1001 for chat_id in announce_targets {
1002 let chat = chat_id.to_string();
1003 match adapter
1004 .send(roboticus_channels::OutboundMessage {
1005 content: text.clone(),
1006 recipient_id: chat.clone(),
1007 metadata: None,
1008 })
1009 .await
1010 {
1011 Ok(()) => {
1012 tracing::info!(chat_id = %chat, "telegram startup announcement sent")
1013 }
1014 Err(e) => {
1015 tracing::warn!(chat_id = %chat, error = %e, "telegram startup announcement failed")
1016 }
1017 }
1018 }
1019 });
1020 }
1021 } else {
1022 tracing::warn!(
1023 "Telegram startup announcement requested but telegram channel is not enabled/configured"
1024 );
1025 }
1026 }
1027
1028 if startup_announce_channels.iter().any(|c| c == "whatsapp") {
1029 if let (Some(adapter), Some(wa_cfg)) =
1030 (state.whatsapp.clone(), config.channels.whatsapp.as_ref())
1031 {
1032 let targets = wa_cfg.allowed_numbers.clone();
1033 if targets.is_empty() {
1034 tracing::warn!(
1035 "WhatsApp startup announcement skipped: channels.whatsapp.allowed_numbers is empty"
1036 );
1037 } else {
1038 let text = announce_text.clone();
1039 tokio::spawn(async move {
1040 for number in targets {
1041 match adapter
1042 .send(roboticus_channels::OutboundMessage {
1043 content: text.clone(),
1044 recipient_id: number.clone(),
1045 metadata: None,
1046 })
1047 .await
1048 {
1049 Ok(()) => {
1050 tracing::info!(recipient = %number, "whatsapp startup announcement sent")
1051 }
1052 Err(e) => {
1053 tracing::warn!(recipient = %number, error = %e, "whatsapp startup announcement failed")
1054 }
1055 }
1056 }
1057 });
1058 }
1059 } else {
1060 tracing::warn!(
1061 "WhatsApp startup announcement requested but whatsapp channel is not enabled/configured"
1062 );
1063 }
1064 }
1065
1066 if startup_announce_channels.iter().any(|c| c == "signal") {
1067 if let (Some(adapter), Some(sig_cfg)) =
1068 (state.signal.clone(), config.channels.signal.as_ref())
1069 {
1070 let targets = sig_cfg.allowed_numbers.clone();
1071 if targets.is_empty() {
1072 tracing::warn!(
1073 "Signal startup announcement skipped: channels.signal.allowed_numbers is empty"
1074 );
1075 } else {
1076 let text = announce_text.clone();
1077 tokio::spawn(async move {
1078 for number in targets {
1079 match adapter
1080 .send(roboticus_channels::OutboundMessage {
1081 content: text.clone(),
1082 recipient_id: number.clone(),
1083 metadata: None,
1084 })
1085 .await
1086 {
1087 Ok(()) => {
1088 tracing::info!(recipient = %number, "signal startup announcement sent")
1089 }
1090 Err(e) => {
1091 tracing::warn!(recipient = %number, error = %e, "signal startup announcement failed")
1092 }
1093 }
1094 }
1095 });
1096 }
1097 } else {
1098 tracing::warn!(
1099 "Signal startup announcement requested but signal channel is not enabled/configured"
1100 );
1101 }
1102 }
1103
1104 for ch in &startup_announce_channels {
1105 if ch != "telegram" && ch != "whatsapp" && ch != "signal" {
1106 tracing::warn!(
1107 channel = %ch,
1108 "startup announcement requested for channel without recipient mapping support"
1109 );
1110 }
1111 }
1112 }
1113
1114 if state.telegram.is_some() {
1115 let use_polling = config
1116 .channels
1117 .telegram
1118 .as_ref()
1119 .map(|c| !c.webhook_mode)
1120 .unwrap_or(true);
1121 if use_polling {
1122 let poll_state = state.clone();
1123 tokio::spawn(async move {
1124 api::telegram_poll_loop(poll_state).await;
1125 });
1126 }
1127 }
1128 if state.discord.is_some() {
1129 let poll_state = state.clone();
1130 tokio::spawn(async move {
1131 api::discord_poll_loop(poll_state).await;
1132 });
1133 }
1134 if state.signal.is_some() {
1135 let poll_state = state.clone();
1136 tokio::spawn(async move {
1137 api::signal_poll_loop(poll_state).await;
1138 });
1139 }
1140 if state.email.is_some() {
1141 let poll_state = state.clone();
1142 tokio::spawn(async move {
1143 api::email_poll_loop(poll_state).await;
1144 });
1145 }
1146
1147 let auth_layer = ApiKeyLayer::new(config.server.api_key.clone());
1148 let local_origin = format!("http://{}:{}", config.server.bind, config.server.port);
1149 let origin_header = local_origin
1150 .parse::<axum::http::HeaderValue>()
1151 .unwrap_or_else(|e| {
1152 tracing::warn!(
1153 origin = %local_origin,
1154 error = %e,
1155 "CORS origin failed to parse, falling back to 127.0.0.1 loopback"
1156 );
1157 axum::http::HeaderValue::from_static("http://127.0.0.1:3000")
1158 });
1159 let cors = CorsLayer::new()
1160 .allow_origin(origin_header)
1161 .allow_methods([
1162 axum::http::Method::GET,
1163 axum::http::Method::POST,
1164 axum::http::Method::PUT,
1165 axum::http::Method::DELETE,
1166 ])
1167 .allow_headers([
1168 axum::http::header::CONTENT_TYPE,
1169 axum::http::header::AUTHORIZATION,
1170 axum::http::HeaderName::from_static("x-api-key"),
1171 ]);
1172 let authed_routes = build_router(state.clone()).layer(auth_layer);
1173
1174 let ws_routes = axum::Router::new().route(
1177 "/ws",
1178 ws_route(
1179 event_bus.clone(),
1180 state.ws_tickets.clone(),
1181 config.server.api_key.clone(),
1182 ),
1183 );
1184
1185 let mcp_routes = build_mcp_router(&state, config.server.api_key.clone());
1189
1190 let public_routes = build_public_router(state);
1191
1192 let app = authed_routes
1193 .merge(ws_routes)
1194 .merge(mcp_routes)
1195 .merge(public_routes)
1196 .layer(cors)
1197 .layer(rate_limiter);
1198 Ok(app)
1199}
1200
1201#[cfg(test)]
1202pub mod test_support;
1203
1204#[cfg(test)]
1205mod tests {
1206 use super::*;
1207 use crate::test_support::EnvGuard;
1208
1209 const BOOTSTRAP_CONFIG: &str = r#"
1210[agent]
1211name = "Roboticus"
1212id = "roboticus-test"
1213
1214[server]
1215port = 18789
1216bind = "127.0.0.1"
1217
1218[database]
1219path = ":memory:"
1220
1221[models]
1222primary = "ollama/qwen3:8b"
1223"#;
1224
1225 #[tokio::test]
1226 async fn bootstrap_with_memory_db_succeeds() {
1227 let config = RoboticusConfig::from_str(BOOTSTRAP_CONFIG).expect("parse config");
1228 let result = bootstrap(config).await;
1229 assert!(
1230 result.is_ok(),
1231 "bootstrap with :memory: should succeed: {:?}",
1232 result.err()
1233 );
1234 }
1235
1236 #[test]
1237 fn cleanup_old_logs_no_panic_on_missing_dir() {
1238 let dir = std::path::Path::new("/tmp/roboticus-test-nonexistent-dir-cleanup");
1239 cleanup_old_logs(dir, 30);
1240 }
1241
1242 #[test]
1243 fn cleanup_old_logs_keeps_recent_logs() {
1244 let dir = tempfile::tempdir().unwrap();
1245 let recent = dir.path().join("recent.log");
1246 std::fs::write(&recent, "fresh log").unwrap();
1247
1248 cleanup_old_logs(dir.path(), 30);
1249 assert!(recent.exists(), "recent logs should remain");
1250 }
1251
1252 #[test]
1253 fn cleanup_old_logs_ignores_non_log_files() {
1254 let dir = tempfile::tempdir().unwrap();
1255 let txt = dir.path().join("data.txt");
1256 std::fs::write(&txt, "keep me").unwrap();
1257
1258 cleanup_old_logs(dir.path(), 0);
1259 assert!(txt.exists(), "non-log files should not be deleted");
1260 }
1261
1262 #[test]
1263 fn cleanup_old_logs_empty_dir() {
1264 let dir = tempfile::tempdir().unwrap();
1265 cleanup_old_logs(dir.path(), 1);
1266 }
1267
1268 #[test]
1269 fn taskable_subagent_roles_are_strict() {
1270 assert!(is_taskable_subagent_role("subagent"));
1271 assert!(is_taskable_subagent_role("specialist"));
1272 assert!(is_taskable_subagent_role("SubAgent"));
1273 assert!(!is_taskable_subagent_role("model-proxy"));
1274 }
1275
1276 #[test]
1277 fn resolve_token_prefers_keystore_reference_then_env_then_empty() {
1278 let dir = tempfile::tempdir().unwrap();
1279 let keystore_path = dir.path().join("keystore.enc");
1280 let keystore = roboticus_core::keystore::Keystore::new(keystore_path);
1281 keystore.unlock("pw").unwrap();
1282 keystore.set("telegram_bot_token", "from_keystore").unwrap();
1283
1284 let _env = EnvGuard::set("TEST_TELEGRAM_TOKEN", "from_env");
1285 let token = resolve_token(
1286 &Some("keystore:telegram_bot_token".to_string()),
1287 "TEST_TELEGRAM_TOKEN",
1288 &keystore,
1289 );
1290 assert_eq!(token, "from_keystore");
1291
1292 let fallback = resolve_token(
1293 &Some("keystore:missing".to_string()),
1294 "TEST_TELEGRAM_TOKEN",
1295 &keystore,
1296 );
1297 assert_eq!(fallback, "from_env");
1298
1299 let empty = resolve_token(&None, "", &keystore);
1300 assert!(empty.is_empty());
1301 }
1302
1303 #[test]
1304 fn cleanup_old_logs_can_prune_when_window_is_zero_days() {
1305 let dir = tempfile::tempdir().unwrap();
1306 let old_log = dir.path().join("old.log");
1307 std::fs::write(&old_log, "stale").unwrap();
1308 std::thread::sleep(std::time::Duration::from_millis(10));
1309 cleanup_old_logs(dir.path(), 0);
1310 assert!(!old_log.exists());
1311 }
1312}