pub use roboticus_api as api_crate;
pub use roboticus_api::abuse;
pub use roboticus_api::api;
pub use roboticus_api::auth;
pub use roboticus_api::config_runtime;
pub use roboticus_api::cron_runtime;
pub use roboticus_api::dashboard;
pub use roboticus_api::rate_limit;
pub use roboticus_api::ws;
pub use roboticus_api::ws_ticket;
pub use roboticus_api::{
AppState, EventBus, PersonalityState, TicketStore, build_dashboard_html, build_mcp_router,
build_public_router, build_router, dashboard_handler, ws_route,
};
pub use roboticus_cli::cli;
pub use roboticus_cli::migrate;
pub mod config_maintenance;
pub mod daemon;
pub mod plugins;
pub use roboticus_cli::state_hygiene;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::RwLock;
use tower_http::cors::CorsLayer;
use auth::ApiKeyLayer;
use roboticus_agent::policy::{
AuthorityRule, CommandSafetyRule, ConfigProtectionRule, FinancialRule, PathProtectionRule,
PolicyEngine, RateLimitRule, ValidationRule,
};
use roboticus_agent::subagents::SubagentRegistry;
use roboticus_browser::Browser;
use roboticus_channels::ChannelAdapter;
use roboticus_channels::a2a::A2aProtocol;
use roboticus_channels::router::ChannelRouter;
use roboticus_channels::telegram::TelegramAdapter;
use roboticus_channels::whatsapp::WhatsAppAdapter;
use roboticus_core::RoboticusConfig;
use roboticus_db::Database;
use roboticus_llm::LlmService;
use roboticus_llm::OAuthManager;
use roboticus_wallet::{WalletPaymentHandler, WalletService};
use roboticus_agent::approvals::ApprovalManager;
use roboticus_agent::obsidian::ObsidianVault;
use roboticus_agent::obsidian_tools::{ObsidianReadTool, ObsidianSearchTool, ObsidianWriteTool};
use roboticus_agent::tools::{
BashTool, EchoTool, EditFileTool, GlobFilesTool, ListDirectoryTool, ReadFileTool,
ScriptRunnerTool, SearchFilesTool, ToolRegistry, WriteFileTool,
};
use roboticus_channels::discord::DiscordAdapter;
use roboticus_channels::email::EmailAdapter;
use roboticus_channels::matrix::MatrixAdapter;
use roboticus_channels::signal::SignalAdapter;
use roboticus_channels::voice::{VoiceConfig, VoicePipeline};
use rate_limit::GlobalRateLimitLayer;
// Process-wide switch for the stderr log layer. Starts out `false`;
// `enable_stderr_logging` flips it to `true`, and the `filter_fn` gate built in
// `init_logging` loads it to decide whether the stderr layer is active.
static STDERR_ENABLED: AtomicBool = AtomicBool::new(false);
// Holds the `WorkerGuard` returned by `tracing_appender::non_blocking` in
// `init_logging`. Parking it in a static keeps the guard alive after
// `init_logging` returns instead of dropping it at the end of that function.
static LOG_GUARD: OnceLock<tracing_appender::non_blocking::WorkerGuard> = OnceLock::new();
/// Returns `true` when `role` names a sub-agent kind that may be registered
/// and handed tasks.
///
/// The comparison is ASCII case-insensitive and requires an exact match
/// (no trimming) against `subagent`, `specialist` or `observer`.
fn is_taskable_subagent_role(role: &str) -> bool {
    const TASKABLE_ROLES: [&str; 3] = ["subagent", "specialist", "observer"];
    TASKABLE_ROLES
        .iter()
        .any(|candidate| role.eq_ignore_ascii_case(candidate))
}
/// Switches on the stderr log layer.
///
/// Stores `true` into `STDERR_ENABLED` with `Release` ordering; the gate
/// installed by `init_logging` reads the flag with `Acquire`. Nothing in this
/// file ever resets the flag to `false`.
pub fn enable_stderr_logging() {
    let enabled = true;
    STDERR_ENABLED.store(enabled, Ordering::Release);
}
/// Installs the global `tracing` subscriber for the process.
///
/// Layers, in order:
/// * an `EnvFilter` taken from the default environment variable when it
///   parses, otherwise `"<config.agent.log_level>,hyper=warn,h2=warn,rustls=warn"`;
/// * a stderr layer gated on `STDERR_ENABLED` (see `enable_stderr_logging`);
/// * when the log directory is usable, a JSON, non-ANSI, daily-rotated file
///   layer writing `roboticus.*.log` files through a non-blocking writer.
///
/// All failures are best-effort: a problem with the log directory or the
/// appender only prints a warning to stderr and falls back to stderr-only
/// logging, and `try_init` errors (a subscriber is already installed) are
/// ignored. Finally, log files older than `config.server.log_max_days` are
/// pruned via `cleanup_old_logs`.
fn init_logging(config: &RoboticusConfig) {
    use tracing_appender::rolling::{RollingFileAppender, Rotation};
    use tracing_subscriber::EnvFilter;
    use tracing_subscriber::Layer;
    use tracing_subscriber::filter::filter_fn;
    use tracing_subscriber::fmt;
    use tracing_subscriber::layer::SubscriberExt;
    use tracing_subscriber::util::SubscriberInitExt;

    let level = config.agent.log_level.as_str();
    let base_filter = format!("{level},hyper=warn,h2=warn,rustls=warn");
    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&base_filter));

    // The gate is evaluated at runtime, so stderr output can be enabled after
    // the subscriber has been installed.
    let stderr_gate = filter_fn(|_| STDERR_ENABLED.load(Ordering::Acquire));
    let log_dir = &config.server.log_dir;
    let stderr_layer = fmt::layer()
        .with_writer(std::io::stderr)
        .with_filter(stderr_gate);

    // Both failure modes are reported on stderr directly: the tracing
    // subscriber does not exist yet, so `tracing::warn!` would go nowhere.
    let file_appender = match std::fs::create_dir_all(log_dir) {
        Ok(()) => match RollingFileAppender::builder()
            .rotation(Rotation::DAILY)
            .filename_prefix("roboticus")
            .filename_suffix("log")
            .build(log_dir)
        {
            Ok(appender) => Some(appender),
            Err(e) => {
                eprintln!(
                    "warning: failed to initialize file logging in {}: {e}",
                    log_dir.display()
                );
                None
            }
        },
        Err(e) => {
            // Previously swallowed silently, which made missing log files
            // impossible to diagnose.
            eprintln!(
                "warning: failed to create log directory {}: {e}",
                log_dir.display()
            );
            None
        }
    };

    if let Some(file_appender) = file_appender {
        let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
        // Keep the worker guard alive for the rest of the process.
        let _ = LOG_GUARD.set(guard);
        let file_layer = fmt::layer()
            .with_writer(non_blocking)
            .with_ansi(false)
            .json();
        let _ = tracing_subscriber::registry()
            .with(filter)
            .with(stderr_layer)
            .with(file_layer)
            .try_init();
    } else {
        let _ = tracing_subscriber::registry()
            .with(filter)
            .with(stderr_layer)
            .try_init();
    }

    cleanup_old_logs(log_dir, config.server.log_max_days);
}
/// Deletes `*.log` entries in `log_dir` whose modification time is more than
/// `max_days` days in the past.
///
/// Best-effort: an unreadable directory, unreadable entries, missing
/// metadata/mtime and failed deletions are all skipped silently. Entries whose
/// extension is not exactly `log` are never touched.
///
/// If the retention window reaches further back than the platform's
/// `SystemTime` can represent, nothing can be older than the cutoff, so the
/// function returns without deleting anything (a plain `now - duration` would
/// panic in that case).
fn cleanup_old_logs(log_dir: &std::path::Path, max_days: u32) {
    const SECS_PER_DAY: u64 = 86_400;

    // u32::MAX * 86_400 fits comfortably in u64, so this cannot overflow.
    let retention = std::time::Duration::from_secs(u64::from(max_days) * SECS_PER_DAY);
    let cutoff = match std::time::SystemTime::now().checked_sub(retention) {
        Some(cutoff) => cutoff,
        None => return,
    };

    let entries = match std::fs::read_dir(log_dir) {
        Ok(entries) => entries,
        Err(_) => return,
    };

    for entry in entries.flatten() {
        let path = entry.path();
        if path.extension().and_then(|ext| ext.to_str()) != Some("log") {
            continue;
        }

        // Treat any metadata error as "not stale" so we never delete blindly.
        let is_stale = entry
            .metadata()
            .and_then(|meta| meta.modified())
            .map(|modified| modified < cutoff)
            .unwrap_or(false);

        if is_stale {
            let _ = std::fs::remove_file(&path);
        }
    }
}
fn resolve_token(
token_ref: &Option<String>,
token_env: &str,
keystore: &roboticus_core::Keystore,
) -> String {
if let Some(r) = token_ref
&& let Some(name) = r.strip_prefix("keystore:")
{
if let Some(val) = keystore.get(name) {
return val;
}
tracing::warn!(key = %name, "keystore reference not found, falling back to env var");
}
if !token_env.is_empty() {
return match std::env::var(token_env) {
Ok(val) if !val.is_empty() => val,
Ok(_) => {
tracing::warn!(env_var = %token_env, "API key env var is set but empty");
String::new()
}
Err(_) => {
tracing::warn!(env_var = %token_env, "API key env var is not set");
String::new()
}
};
}
String::new()
}
/// Boots the server from `config` without an explicit config-file path.
///
/// Thin convenience wrapper: forwards to `bootstrap_with_config_path` with
/// `None` as the path and returns whatever it returns.
pub async fn bootstrap(
    config: RoboticusConfig,
) -> Result<axum::Router, Box<dyn std::error::Error>> {
    let no_explicit_path: Option<std::path::PathBuf> = None;
    bootstrap_with_config_path(config, no_explicit_path).await
}
/// Stderr progress indicator for a single startup phase.
///
/// `start` prints the phase label and spawns a background task that appends a
/// `.` every 500 ms; `done` stops the task and prints either `ok` or the
/// elapsed time. If the value is dropped without `done` being called (for
/// example when the caller returns early with `?`), the `Drop` impl still
/// stops the background task so it cannot keep writing dots indefinitely.
struct BootPhase {
    /// When the phase began; used by `done` to report the elapsed time.
    start: std::time::Instant,
    /// Handle of the dot-printing task; aborted by `done` and on drop.
    dot_handle: tokio::task::JoinHandle<()>,
}

impl BootPhase {
    /// Prints `label` to stderr and starts the dot ticker.
    ///
    /// Must be called from within a Tokio runtime because it uses
    /// `tokio::spawn`.
    fn start(label: &str) -> Self {
        use std::io::Write;

        eprint!(" {label} ");
        // stderr flush failures are not actionable here; ignore them.
        let _ = std::io::stderr().flush();

        let dot_handle = tokio::spawn(async {
            loop {
                tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                eprint!(".");
                let _ = std::io::stderr().flush();
            }
        });

        Self {
            start: std::time::Instant::now(),
            dot_handle,
        }
    }

    /// Stops the ticker and terminates the progress line.
    ///
    /// Prints `(<secs>s)` with one decimal when the phase took at least
    /// 500 ms, otherwise `ok`.
    fn done(self) {
        // Abort before printing so no stray dot lands after the summary.
        self.dot_handle.abort();
        let elapsed = self.start.elapsed();
        if elapsed.as_millis() >= 500 {
            eprintln!("({:.1}s)", elapsed.as_secs_f64());
        } else {
            eprintln!("ok");
        }
    }
}

impl Drop for BootPhase {
    /// Ensures the ticker task never outlives the phase: dropping a
    /// `JoinHandle` only detaches the task, so it has to be aborted
    /// explicitly. Aborting an already-aborted task is a no-op.
    fn drop(&mut self) {
        self.dot_handle.abort();
    }
}
pub async fn bootstrap_with_config_path(
config: RoboticusConfig,
config_path: Option<std::path::PathBuf>,
) -> Result<axum::Router, Box<dyn std::error::Error>> {
init_logging(&config);
let personality_state = api::PersonalityState::from_workspace(&config.agent.workspace);
if !personality_state.os_text.is_empty() {
tracing::info!(
personality = %personality_state.identity.name,
generated_by = %personality_state.identity.generated_by,
"Loaded personality files from workspace"
);
} else {
tracing::info!("No personality files found in workspace, using defaults");
}
let db_path = config.database.path.to_string_lossy().to_string();
let bp = BootPhase::start("database");
let db = Database::new(&db_path)?;
bp.done();
match crate::state_hygiene::run_state_hygiene(&config.database.path) {
Ok(report) if report.changed => {
tracing::info!(
changed_rows = report.changed_rows,
subagent_rows_normalized = report.subagent_rows_normalized,
cron_payload_rows_repaired = report.cron_payload_rows_repaired,
cron_jobs_disabled_invalid_expr = report.cron_jobs_disabled_invalid_expr,
"applied startup mechanic checks"
);
}
Ok(_) => {}
Err(e) => tracing::warn!(error = %e, "startup mechanic checks failed"),
}
match roboticus_db::sessions::backfill_nicknames(&db) {
Ok(0) => {}
Ok(n) => tracing::info!(count = n, "Backfilled session nicknames"),
Err(e) => tracing::warn!(error = %e, "Failed to backfill session nicknames"),
}
let bp = BootPhase::start("LLM service");
let mut llm = LlmService::new(&config)?;
match roboticus_db::metrics::recent_quality_scores(&db, 200) {
Ok(scores) if !scores.is_empty() => llm.quality.seed_from_history(&scores),
Ok(_) => {}
Err(e) => tracing::warn!(error = %e, "failed to seed quality tracker from history"),
}
llm.quality
.seed_from_baselines(roboticus_llm::exercise::COMMON_MODEL_BASELINES);
{
let mut candidates = vec![config.models.primary.clone()];
candidates.extend(config.models.fallbacks.iter().cloned());
for model in &candidates {
if llm.quality.observation_count(model) == 0 {
tracing::info!(
model = model.as_str(),
"model has no performance data — run 'roboticus models exercise {}' to baseline it",
model
);
}
}
}
bp.done();
let bp = BootPhase::start("wallet");
let wallet = match tokio::time::timeout(
std::time::Duration::from_secs(30),
WalletService::new(&config),
)
.await
{
Ok(result) => result?,
Err(_) => {
bp.done();
return Err("wallet init timed out (30s) — check RPC endpoint connectivity".into());
}
};
bp.done();
let wallet_arc = Arc::new(wallet.wallet.clone());
let treasury_policy = roboticus_wallet::treasury::TreasuryPolicy::new(&config.treasury);
let x402_handler =
Arc::new(WalletPaymentHandler::new(wallet_arc).with_treasury_policy(treasury_policy));
llm.set_payment_handler(x402_handler);
let bp = BootPhase::start("protocols + plugins");
let a2a = A2aProtocol::new(config.a2a.clone());
let plugin_env = {
let mut env = std::collections::HashMap::new();
env.insert("ROBOTICUS_DELEGATION_DEPTH".into(), "0".into());
env.insert(
"ROBOTICUS_WORKSPACE".into(),
config.agent.workspace.display().to_string(),
);
if let Some(ref ws) = config.skills.workspace_dir {
env.insert("ROBOTICUS_SKILLS_DIR".into(), ws.display().to_string());
}
env
};
let plugin_registry = plugins::init_plugin_registry(&config.plugins, plugin_env).await;
let mut policy_engine = PolicyEngine::new();
policy_engine.add_rule(Box::new(AuthorityRule));
policy_engine.add_rule(Box::new(CommandSafetyRule));
policy_engine.add_rule(Box::new(FinancialRule::new(
config.treasury.per_payment_cap,
)));
policy_engine.add_rule(Box::new(PathProtectionRule::from_config(
&config.security.filesystem,
)));
policy_engine.add_rule(Box::new(RateLimitRule::default()));
policy_engine.add_rule(Box::new(ValidationRule));
policy_engine.add_rule(Box::new(ConfigProtectionRule::default()));
let policy_engine = Arc::new(policy_engine);
let browser = Arc::new(Browser::new(config.browser.clone()));
let registry = Arc::new(SubagentRegistry::new(4, vec![]));
if let Ok(sub_agents) = roboticus_db::agents::list_enabled_sub_agents(&db) {
for sa in &sub_agents {
if !is_taskable_subagent_role(&sa.role) {
continue;
}
let resolved_model = match sa.model.trim().to_ascii_lowercase().as_str() {
"auto" | "orchestrator" => llm.router.select_model().to_string(),
_ => sa.model.clone(),
};
let fixed_skills = sa
.skills_json
.as_deref()
.and_then(|s| serde_json::from_str::<Vec<String>>(s).ok())
.unwrap_or_default();
let agent_config = roboticus_agent::subagents::AgentInstanceConfig {
id: sa.name.clone(),
name: sa.display_name.clone().unwrap_or_else(|| sa.name.clone()),
model: resolved_model,
skills: fixed_skills,
allowed_subagents: vec![],
max_concurrent: 4,
};
if let Err(e) = registry.register(agent_config).await {
tracing::warn!(agent = %sa.name, err = %e, "failed to register sub-agent");
} else if let Err(e) = registry.start_agent(&sa.name).await {
tracing::warn!(agent = %sa.name, err = %e, "failed to auto-start sub-agent");
}
}
if !sub_agents.is_empty() {
tracing::info!(
count = sub_agents.len(),
"registered sub-agents from database"
);
}
}
let event_bus = EventBus::new(256);
let keystore =
roboticus_core::keystore::Keystore::new(roboticus_core::keystore::Keystore::default_path());
if let Err(e) = keystore.unlock_machine() {
tracing::warn!("keystore auto-unlock failed: {e}");
}
let keystore = Arc::new(keystore);
bp.done();
let bp = BootPhase::start("channels");
let channel_router = Arc::new(ChannelRouter::with_store(db.clone()).await);
let telegram: Option<Arc<TelegramAdapter>> =
if let Some(ref tg_config) = config.channels.telegram {
if tg_config.enabled {
let token = resolve_token(&tg_config.token_ref, &tg_config.token_env, &keystore);
if !token.is_empty() {
let adapter = Arc::new(TelegramAdapter::with_config(
token,
tg_config.poll_timeout_seconds,
tg_config.allowed_chat_ids.clone(),
tg_config.webhook_secret.clone(),
config.security.deny_on_empty_allowlist,
));
channel_router
.register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
.await;
tracing::info!("Telegram adapter registered");
if tg_config.webhook_secret.is_none() {
tracing::warn!(
"Telegram webhook_secret not set; webhook endpoint will reject with 503"
);
}
Some(adapter)
} else {
tracing::warn!(
token_env = %tg_config.token_env,
"Telegram enabled but token is empty"
);
None
}
} else {
None
}
} else {
None
};
let whatsapp: Option<Arc<WhatsAppAdapter>> =
if let Some(ref wa_config) = config.channels.whatsapp {
if wa_config.enabled {
let token = resolve_token(&wa_config.token_ref, &wa_config.token_env, &keystore);
if !token.is_empty() && !wa_config.phone_number_id.is_empty() {
let adapter = Arc::new(WhatsAppAdapter::with_config(
token,
wa_config.phone_number_id.clone(),
wa_config.verify_token.clone(),
wa_config.allowed_numbers.clone(),
wa_config.app_secret.clone(),
config.security.deny_on_empty_allowlist,
)?);
channel_router
.register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
.await;
tracing::info!("WhatsApp adapter registered");
if wa_config.app_secret.is_none() {
tracing::warn!(
"WhatsApp app_secret not set; webhook endpoint will reject with 503"
);
}
Some(adapter)
} else {
tracing::warn!("WhatsApp enabled but token env or phone_number_id is empty");
None
}
} else {
None
}
} else {
None
};
let discord: Option<Arc<DiscordAdapter>> = if let Some(ref dc_config) = config.channels.discord
{
if dc_config.enabled {
let token = resolve_token(&dc_config.token_ref, &dc_config.token_env, &keystore);
if !token.is_empty() {
let adapter = Arc::new(DiscordAdapter::with_config(
token,
dc_config.allowed_guild_ids.clone(),
config.security.deny_on_empty_allowlist,
));
channel_router
.register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
.await;
if let Err(e) = adapter.connect_gateway().await {
tracing::error!(error = %e, "Failed to connect Discord gateway");
}
tracing::info!("Discord adapter registered with gateway");
Some(adapter)
} else {
tracing::warn!(
token_env = %dc_config.token_env,
"Discord enabled but token env var is empty"
);
None
}
} else {
None
}
} else {
None
};
let signal: Option<Arc<SignalAdapter>> = if let Some(ref sig_config) = config.channels.signal {
if sig_config.enabled {
if !sig_config.phone_number.is_empty() {
let adapter = Arc::new(SignalAdapter::with_config(
sig_config.phone_number.clone(),
sig_config.daemon_url.clone(),
sig_config.allowed_numbers.clone(),
config.security.deny_on_empty_allowlist,
));
channel_router
.register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
.await;
tracing::info!("Signal adapter registered");
Some(adapter)
} else {
tracing::warn!("Signal enabled but phone_number is empty");
None
}
} else {
None
}
} else {
None
};
let email: Option<Arc<EmailAdapter>> = if config.channels.email.enabled {
let email_cfg = &config.channels.email;
let password = if email_cfg.password_env.is_empty() {
String::new()
} else {
match std::env::var(&email_cfg.password_env) {
Ok(val) => val,
Err(_) => {
tracing::warn!(env_var = %email_cfg.password_env, "email password env var is not set");
String::new()
}
}
};
if email_cfg.smtp_host.is_empty()
|| email_cfg.username.is_empty()
|| password.is_empty()
|| email_cfg.from_address.is_empty()
{
tracing::warn!("Email enabled but SMTP credentials are incomplete");
None
} else {
match EmailAdapter::new(
email_cfg.from_address.clone(),
email_cfg.smtp_host.clone(),
email_cfg.smtp_port,
email_cfg.imap_host.clone(),
email_cfg.imap_port,
email_cfg.username.clone(),
password,
) {
Ok(email_adapter) => {
let oauth2_token =
if email_cfg.use_oauth2 && !email_cfg.oauth2_token_env.is_empty() {
std::env::var(&email_cfg.oauth2_token_env).ok()
} else {
None
};
let adapter = Arc::new(
email_adapter
.with_allowed_senders(email_cfg.allowed_senders.clone())
.with_deny_on_empty(config.security.deny_on_empty_allowlist)
.with_poll_interval(std::time::Duration::from_secs(
email_cfg.poll_interval_seconds,
))
.with_oauth2_token(oauth2_token)
.with_imap_idle_enabled(email_cfg.imap_idle_enabled),
);
channel_router
.register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
.await;
tracing::info!("Email adapter registered");
if !email_cfg.imap_host.is_empty()
&& let Err(e) = adapter.start_imap_listener().await
{
tracing::error!(error = %e, "Failed to start email IMAP listener");
}
Some(adapter)
}
Err(e) => {
tracing::error!(error = %e, "Failed to create email adapter");
None
}
}
}
} else {
None
};
let voice: Option<Arc<RwLock<VoicePipeline>>> = if config.channels.voice.enabled {
let mut voice_config = VoiceConfig::default();
if let Some(stt) = &config.channels.voice.stt_model {
voice_config.stt_model = stt.clone();
}
if let Some(tts) = &config.channels.voice.tts_model {
voice_config.tts_model = tts.clone();
}
if let Some(v) = &config.channels.voice.tts_voice {
voice_config.tts_voice = v.clone();
}
if let Ok(key) = std::env::var("OPENAI_API_KEY")
&& !key.is_empty()
{
voice_config.api_key = Some(key);
}
tracing::info!("Voice pipeline initialized");
Some(Arc::new(RwLock::new(VoicePipeline::new(voice_config))))
} else {
None
};
let _matrix: Option<Arc<MatrixAdapter>> = if let Some(ref mx_config) = config.channels.matrix {
if mx_config.enabled {
let token = resolve_token(&None, &mx_config.access_token_env, &keystore);
if !token.is_empty() && !mx_config.homeserver_url.is_empty() {
let mut adapter = MatrixAdapter::new(
mx_config.homeserver_url.clone(),
token,
mx_config.allowed_rooms.clone(),
mx_config.auto_join,
mx_config.sync_timeout_seconds,
);
if mx_config.encryption_enabled {
let store_path = mx_config.device_store_path.clone().unwrap_or_else(|| {
roboticus_core::home_dir()
.join(".roboticus")
.join("matrix_crypto")
});
match roboticus_channels::matrix_crypto::MatrixCrypto::new(
&store_path,
&mx_config.device_display_name,
) {
Ok(crypto) => {
adapter = adapter.with_crypto(Arc::new(crypto));
tracing::info!("Matrix E2EE enabled");
}
Err(e) => {
tracing::error!(error = %e, "Failed to init Matrix crypto; running unencrypted");
}
}
}
let adapter = Arc::new(adapter);
channel_router
.register(Arc::clone(&adapter) as Arc<dyn ChannelAdapter>)
.await;
tracing::info!("Matrix adapter registered");
Some(adapter)
} else {
tracing::warn!("Matrix enabled but homeserver_url or access token is empty");
None
}
} else {
None
}
} else {
None
};
let hmac_secret = {
use rand::RngCore;
let mut buf = vec![0u8; 32];
rand::rngs::OsRng.fill_bytes(&mut buf);
buf
};
let retriever = Arc::new(roboticus_agent::retrieval::MemoryRetriever::new(
config.memory.clone(),
));
let mut tool_registry = ToolRegistry::new();
tool_registry.register(Box::new(EchoTool));
tool_registry.register(Box::new(BashTool));
tool_registry.register(Box::new(ScriptRunnerTool::new(
config.skills.clone(),
config.security.filesystem.clone(),
)));
tool_registry.register(Box::new(ReadFileTool));
tool_registry.register(Box::new(WriteFileTool));
tool_registry.register(Box::new(EditFileTool));
tool_registry.register(Box::new(ListDirectoryTool));
tool_registry.register(Box::new(GlobFilesTool));
tool_registry.register(Box::new(SearchFilesTool));
use roboticus_agent::tools::CronTool;
tool_registry.register(Box::new(CronTool));
use roboticus_agent::tools::{
GetChannelHealthTool, GetMemoryStatsTool, GetRuntimeContextTool, GetSubagentStatusTool,
RecallMemoryTool,
};
tool_registry.register(Box::new(GetRuntimeContextTool));
tool_registry.register(Box::new(GetMemoryStatsTool));
tool_registry.register(Box::new(GetChannelHealthTool));
tool_registry.register(Box::new(GetSubagentStatusTool));
tool_registry.register(Box::new(RecallMemoryTool));
use roboticus_agent::tools::{AlterTableTool, CreateTableTool, DropTableTool};
tool_registry.register(Box::new(CreateTableTool));
tool_registry.register(Box::new(AlterTableTool));
tool_registry.register(Box::new(DropTableTool));
bp.done();
let bp = BootPhase::start("plugin tools");
plugins::register_plugin_tools(&mut tool_registry, Arc::clone(&plugin_registry)).await;
bp.done();
let obsidian_vault: Option<Arc<RwLock<ObsidianVault>>> = if config.obsidian.enabled {
match ObsidianVault::from_config(&config.obsidian) {
Ok(vault) => {
let vault = Arc::new(RwLock::new(vault));
tool_registry.register(Box::new(ObsidianReadTool::new(Arc::clone(&vault))));
tool_registry.register(Box::new(ObsidianWriteTool::new(Arc::clone(&vault))));
tool_registry.register(Box::new(ObsidianSearchTool::new(Arc::clone(&vault))));
tracing::info!("Obsidian vault integration enabled");
Some(vault)
}
Err(e) => {
tracing::warn!(error = %e, "Failed to initialize Obsidian vault");
None
}
}
} else {
None
};
if let Some(ref vault) = obsidian_vault
&& config.obsidian.watch_for_changes
{
match roboticus_agent::obsidian::watcher::VaultWatcher::start(Arc::clone(vault)).await {
Ok(_watcher) => {
tracing::info!("Obsidian vault file watcher started");
}
Err(e) => {
tracing::warn!(error = %e, "Failed to start Obsidian vault watcher");
}
}
}
let tool_registry = Arc::new(tool_registry);
let capabilities = Arc::new(roboticus_agent::capability::CapabilityRegistry::new());
if let Err(e) = capabilities
.sync_from_tool_registry(Arc::clone(&tool_registry))
.await
{
tracing::warn!(error = %e, "initial capability sync from tool registry reported errors");
}
let approvals = Arc::new(ApprovalManager::new(config.approvals.clone()));
let oauth = Arc::new(OAuthManager::new()?);
let discovery_registry = Arc::new(RwLock::new(
roboticus_agent::discovery::DiscoveryRegistry::new(),
));
let device_manager = Arc::new(RwLock::new(roboticus_agent::device::DeviceManager::new(
roboticus_agent::device::DeviceIdentity::generate(&config.agent.id),
config.devices.max_paired_devices,
)));
let mut mcp_clients = roboticus_agent::mcp::McpClientManager::new();
for c in &config.mcp.clients {
let mut conn =
roboticus_agent::mcp::McpClientConnection::new(c.name.clone(), c.url.clone());
if let Err(e) = conn.discover() {
tracing::warn!(mcp_name = %c.name, error = %e, "MCP client discovery failed at startup");
}
mcp_clients.add_connection(conn);
}
let mcp_clients = Arc::new(RwLock::new(mcp_clients));
let live_mcp = Arc::new(roboticus_agent::mcp::manager::McpConnectionManager::new());
let bp = BootPhase::start("MCP connections");
match tokio::time::timeout(
std::time::Duration::from_secs(30),
live_mcp.connect_all(&config.mcp.servers, &capabilities),
)
.await
{
Ok(()) => {}
Err(_) => {
tracing::warn!("MCP connect_all timed out after 30s; some servers may be unavailable");
}
}
bp.done();
let live_mcp_tool_count = live_mcp.connected_count().await;
if live_mcp_tool_count > 0 {
tracing::info!(
servers = live_mcp_tool_count,
"MCP client connections established"
);
}
let mut mcp_server_registry = roboticus_agent::mcp::McpServerRegistry::new();
let exported = roboticus_agent::mcp::export_tools_as_mcp(
&tool_registry
.list()
.iter()
.map(|t| {
(
t.name().to_string(),
t.description().to_string(),
t.parameters_schema(),
)
})
.collect::<Vec<_>>(),
);
for tool in exported {
mcp_server_registry.register_tool(tool);
}
mcp_server_registry.register_resource(roboticus_agent::mcp::McpResource {
uri: "roboticus://sessions/active".to_string(),
name: "Active Sessions".to_string(),
description: "Active sessions in the local runtime".to_string(),
mime_type: "application/json".to_string(),
});
mcp_server_registry.register_resource(roboticus_agent::mcp::McpResource {
uri: "roboticus://metrics/capacity".to_string(),
name: "Provider Capacity Stats".to_string(),
description: "Current provider utilization and headroom".to_string(),
mime_type: "application/json".to_string(),
});
let mcp_server_registry = Arc::new(RwLock::new(mcp_server_registry));
let ann_index = roboticus_db::ann::AnnIndex::new(config.memory.ann_index);
if config.memory.ann_index {
match ann_index.build_from_db(&db) {
Ok(count) => {
if ann_index.is_built() {
tracing::info!(count, "ANN index built from database");
} else {
tracing::info!(
count,
min = ann_index.min_entries_for_index,
"ANN index below threshold, brute-force search will be used"
);
}
}
Err(e) => {
tracing::warn!(error = %e, "Failed to build ANN index, falling back to brute-force");
}
}
}
let media_service = if config.multimodal.enabled {
match roboticus_channels::media::MediaService::new(&config.multimodal) {
Ok(svc) => {
tracing::info!(
media_dir = ?config.multimodal.media_dir,
"Media service initialized"
);
Some(Arc::new(svc))
}
Err(e) => {
tracing::error!(error = %e, "Failed to initialize media service");
None
}
}
} else {
None
};
let resolved_config_path =
config_path.unwrap_or_else(crate::config_runtime::resolve_default_config_path);
let rate_limiter = GlobalRateLimitLayer::new(
u64::from(config.server.rate_limit_requests),
Duration::from_secs(config.server.rate_limit_window_secs),
)
.with_per_ip_capacity(u64::from(config.server.per_ip_rate_limit_requests))
.with_per_actor_capacity(u64::from(config.server.per_actor_rate_limit_requests))
.with_trusted_proxy_cidrs(&config.server.trusted_proxy_cidrs);
let semantic_classifier = Arc::new(
roboticus_llm::semantic_classifier::SemanticClassifier::new(llm.embedding.clone()),
);
let state = AppState {
db,
config: Arc::new(RwLock::new(config.clone())),
llm: Arc::new(RwLock::new(llm)),
wallet: Arc::new(wallet),
a2a: Arc::new(RwLock::new(a2a)),
personality: Arc::new(RwLock::new(personality_state)),
hmac_secret: Arc::new(hmac_secret),
interviews: Arc::new(RwLock::new(std::collections::HashMap::new())),
plugins: plugin_registry,
policy_engine,
browser,
registry,
event_bus: event_bus.clone(),
channel_router,
telegram,
whatsapp,
retriever,
ann_index,
tools: tool_registry,
capabilities,
approvals,
discord,
signal,
email,
voice,
media_service,
discovery: discovery_registry,
devices: device_manager,
mcp_clients,
mcp_server: mcp_server_registry,
live_mcp,
oauth,
keystore,
obsidian: obsidian_vault,
started_at: std::time::Instant::now(),
config_path: Arc::new(resolved_config_path.clone()),
config_apply_status: crate::config_runtime::status_for_path(&resolved_config_path),
pending_specialist_proposals: Arc::new(RwLock::new(std::collections::HashMap::new())),
ws_tickets: ws_ticket::TicketStore::new(),
rate_limiter: rate_limiter.clone(),
semantic_classifier,
};
let bp = BootPhase::start("capability sync");
state.resync_capabilities_from_tools().await;
bp.done();
if config.memory.ann_index {
let ann_db = state.db.clone();
let ann_idx = state.ann_index.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(600));
interval.tick().await;
loop {
interval.tick().await;
match ann_idx.rebuild(&ann_db) {
Ok(count) => {
tracing::debug!(count, "ANN index rebuilt");
}
Err(e) => {
tracing::warn!(error = %e, "ANN index rebuild failed");
}
}
}
});
tracing::info!("ANN index rebuild daemon spawned (10min interval)");
}
{
let loaded = roboticus_db::cache::load_cache_entries(&state.db)
.inspect_err(|e| tracing::warn!(error = %e, "failed to load semantic cache entries"))
.unwrap_or_default();
if !loaded.is_empty() {
let imported: Vec<(String, roboticus_llm::ExportedCacheEntry)> = loaded
.into_iter()
.map(|(id, pe)| {
let ttl = pe
.expires_at
.and_then(|e| {
chrono::NaiveDateTime::parse_from_str(&e, "%Y-%m-%dT%H:%M:%S")
.ok()
.or_else(|| {
chrono::NaiveDateTime::parse_from_str(&e, "%Y-%m-%d %H:%M:%S")
.ok()
})
})
.map(|exp| {
let now = chrono::Utc::now().naive_utc();
if exp > now {
(exp - now).num_seconds().max(0) as u64
} else {
0
}
})
.unwrap_or(3600);
(
id,
roboticus_llm::ExportedCacheEntry {
content: pe.response,
model: pe.model,
tokens_saved: pe.tokens_saved,
hits: pe.hit_count,
involved_tools: false,
embedding: pe.embedding,
ttl_remaining_secs: ttl,
},
)
})
.collect();
let count = imported.len();
let llm = state.llm.read().await;
llm.cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.import_entries(imported);
tracing::info!(count, "Loaded semantic cache from database");
}
}
{
let flush_db = state.db.clone();
let flush_llm = Arc::clone(&state.llm);
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
interval.tick().await; loop {
interval.tick().await;
let entries = {
let llm = flush_llm.read().await;
llm.cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.export_entries()
};
for (hash, entry) in &entries {
let expires = chrono::Utc::now()
+ chrono::Duration::seconds(entry.ttl_remaining_secs as i64);
let pe = roboticus_db::cache::PersistedCacheEntry {
prompt_hash: hash.clone(),
response: entry.content.clone(),
model: entry.model.clone(),
tokens_saved: entry.tokens_saved,
hit_count: entry.hits,
embedding: entry.embedding.clone(),
created_at: chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
expires_at: Some(expires.format("%Y-%m-%dT%H:%M:%S").to_string()),
};
roboticus_db::cache::save_cache_entry(&flush_db, hash, &pe)
.inspect_err(
|e| tracing::warn!(error = %e, hash, "failed to persist cache entry"),
)
.ok();
}
roboticus_db::cache::evict_expired_cache(&flush_db)
.inspect_err(|e| tracing::warn!(error = %e, "failed to evict expired cache"))
.ok();
tracing::debug!(count = entries.len(), "Flushed semantic cache to database");
}
});
tracing::info!("Cache flush daemon spawned (5min interval)");
}
let (heartbeat_shutdown_tx, heartbeat_treasury_state) = {
let hb_wallet = Arc::clone(&state.wallet);
let hb_db = state.db.clone();
let hb_session_cfg = config.session.clone();
let hb_digest_cfg = config.digest.clone();
let hb_agent_id = config.agent.id.clone();
let hb_vault = state.obsidian.clone();
let hb_config = config.heartbeat.clone();
roboticus_schedule::spawn_domain_loops(
hb_wallet,
hb_db,
hb_session_cfg,
hb_digest_cfg,
hb_agent_id,
hb_vault,
hb_config,
)
.await
};
tracing::info!(
treasury_s = config.heartbeat.treasury_interval_seconds,
yield_s = config.heartbeat.yield_interval_seconds,
memory_s = config.heartbeat.memory_interval_seconds,
maintenance_s = config.heartbeat.maintenance_interval_seconds,
session_s = config.heartbeat.session_interval_seconds,
discovery_s = config.heartbeat.discovery_interval_seconds,
"domain loops spawned"
);
std::mem::forget(heartbeat_shutdown_tx);
std::mem::forget(heartbeat_treasury_state);
{
let drain_router = Arc::clone(&state.channel_router);
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
interval.tick().await;
loop {
interval.tick().await;
drain_router.drain_retry_queue().await;
}
});
tracing::info!("Delivery retry queue drain daemon spawned (30s interval)");
}
{
let instance_id = config.agent.id.clone();
let cron_state = state.clone();
tokio::spawn(async move {
crate::cron_runtime::run_cron_worker(cron_state, instance_id).await;
});
tracing::info!("Cron worker spawned");
}
{
let db_path = config.database.path.clone();
let interval_secs = std::env::var("ROBOTICUS_MECHANIC_CHECK_INTERVAL_SECS")
.ok()
.or_else(|| std::env::var("ROBOTICUS_STATE_HYGIENE_INTERVAL_SECS").ok())
.and_then(|v| v.parse::<u64>().ok())
.filter(|v| *v >= 300)
.unwrap_or(21_600);
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
interval.tick().await;
loop {
interval.tick().await;
match crate::state_hygiene::run_state_hygiene(&db_path) {
Ok(report) if report.changed => {
tracing::info!(
changed_rows = report.changed_rows,
subagent_rows_normalized = report.subagent_rows_normalized,
cron_payload_rows_repaired = report.cron_payload_rows_repaired,
cron_jobs_disabled_invalid_expr =
report.cron_jobs_disabled_invalid_expr,
"periodic mechanic checks applied"
);
}
Ok(_) => tracing::debug!("periodic mechanic checks: no changes"),
Err(e) => tracing::warn!(error = %e, "periodic mechanic checks failed"),
}
}
});
tracing::info!(interval_secs, "Mechanic checks daemon spawned");
}
{
let startup_announce_channels = config.channels.startup_announcement_channels();
let display_host = match config.server.bind.as_str() {
"127.0.0.1" | "0.0.0.0" | "::1" | "::" => "localhost",
other => other,
};
let channel_list: Vec<&str> = {
let mut ch = vec!["web"];
if config.channels.telegram.is_some() {
ch.push("telegram");
}
if config.channels.whatsapp.is_some() {
ch.push("whatsapp");
}
if config.channels.discord.is_some() {
ch.push("discord");
}
if config.channels.signal.is_some() {
ch.push("signal");
}
ch
};
let announce_text = format!(
"🤖 *Roboticus Online*\n\n\
🧠 *{}* (`{}`)\n\
⚡ `{}`\n\
🔀 routing: {}\n\
🌐 `{}:{}`\n\
📡 {}\n\n\
🕐 {}",
config.agent.name,
config.agent.id,
config.models.primary,
config.models.routing.mode,
display_host,
config.server.port,
channel_list.join(" · "),
chrono::Local::now().format("%Y-%m-%d %H:%M %Z"),
);
if startup_announce_channels.iter().any(|c| c == "telegram") {
if let (Some(adapter), Some(tg_cfg)) =
(state.telegram.clone(), config.channels.telegram.as_ref())
{
let announce_targets = tg_cfg.allowed_chat_ids.clone();
if announce_targets.is_empty() {
tracing::warn!(
"Telegram startup announcement skipped: channels.telegram.allowed_chat_ids is empty"
);
} else {
let text = announce_text.clone();
tokio::spawn(async move {
for chat_id in announce_targets {
let chat = chat_id.to_string();
match adapter
.send(roboticus_channels::OutboundMessage {
content: text.clone(),
recipient_id: chat.clone(),
metadata: None,
})
.await
{
Ok(()) => {
tracing::info!(chat_id = %chat, "telegram startup announcement sent")
}
Err(e) => {
tracing::warn!(chat_id = %chat, error = %e, "telegram startup announcement failed")
}
}
}
});
}
} else {
tracing::warn!(
"Telegram startup announcement requested but telegram channel is not enabled/configured"
);
}
}
if startup_announce_channels.iter().any(|c| c == "whatsapp") {
if let (Some(adapter), Some(wa_cfg)) =
(state.whatsapp.clone(), config.channels.whatsapp.as_ref())
{
let targets = wa_cfg.allowed_numbers.clone();
if targets.is_empty() {
tracing::warn!(
"WhatsApp startup announcement skipped: channels.whatsapp.allowed_numbers is empty"
);
} else {
let text = announce_text.clone();
tokio::spawn(async move {
for number in targets {
match adapter
.send(roboticus_channels::OutboundMessage {
content: text.clone(),
recipient_id: number.clone(),
metadata: None,
})
.await
{
Ok(()) => {
tracing::info!(recipient = %number, "whatsapp startup announcement sent")
}
Err(e) => {
tracing::warn!(recipient = %number, error = %e, "whatsapp startup announcement failed")
}
}
}
});
}
} else {
tracing::warn!(
"WhatsApp startup announcement requested but whatsapp channel is not enabled/configured"
);
}
}
if startup_announce_channels.iter().any(|c| c == "signal") {
if let (Some(adapter), Some(sig_cfg)) =
(state.signal.clone(), config.channels.signal.as_ref())
{
let targets = sig_cfg.allowed_numbers.clone();
if targets.is_empty() {
tracing::warn!(
"Signal startup announcement skipped: channels.signal.allowed_numbers is empty"
);
} else {
let text = announce_text.clone();
tokio::spawn(async move {
for number in targets {
match adapter
.send(roboticus_channels::OutboundMessage {
content: text.clone(),
recipient_id: number.clone(),
metadata: None,
})
.await
{
Ok(()) => {
tracing::info!(recipient = %number, "signal startup announcement sent")
}
Err(e) => {
tracing::warn!(recipient = %number, error = %e, "signal startup announcement failed")
}
}
}
});
}
} else {
tracing::warn!(
"Signal startup announcement requested but signal channel is not enabled/configured"
);
}
}
for ch in &startup_announce_channels {
if ch != "telegram" && ch != "whatsapp" && ch != "signal" {
tracing::warn!(
channel = %ch,
"startup announcement requested for channel without recipient mapping support"
);
}
}
}
if state.telegram.is_some() {
let use_polling = config
.channels
.telegram
.as_ref()
.map(|c| !c.webhook_mode)
.unwrap_or(true);
if use_polling {
let poll_state = state.clone();
tokio::spawn(async move {
api::telegram_poll_loop(poll_state).await;
});
}
}
if state.discord.is_some() {
let poll_state = state.clone();
tokio::spawn(async move {
api::discord_poll_loop(poll_state).await;
});
}
if state.signal.is_some() {
let poll_state = state.clone();
tokio::spawn(async move {
api::signal_poll_loop(poll_state).await;
});
}
if state.email.is_some() {
let poll_state = state.clone();
tokio::spawn(async move {
api::email_poll_loop(poll_state).await;
});
}
let auth_layer = ApiKeyLayer::new(config.server.api_key.clone());
let local_origin = format!("http://{}:{}", config.server.bind, config.server.port);
let origin_header = local_origin
.parse::<axum::http::HeaderValue>()
.unwrap_or_else(|e| {
tracing::warn!(
origin = %local_origin,
error = %e,
"CORS origin failed to parse, falling back to 127.0.0.1 loopback"
);
axum::http::HeaderValue::from_static("http://127.0.0.1:3000")
});
let cors = CorsLayer::new()
.allow_origin(origin_header)
.allow_methods([
axum::http::Method::GET,
axum::http::Method::POST,
axum::http::Method::PUT,
axum::http::Method::DELETE,
])
.allow_headers([
axum::http::header::CONTENT_TYPE,
axum::http::header::AUTHORIZATION,
axum::http::HeaderName::from_static("x-api-key"),
]);
let authed_routes = build_router(state.clone()).layer(auth_layer);
let ws_routes = axum::Router::new().route(
"/ws",
ws_route(
event_bus.clone(),
state.ws_tickets.clone(),
config.server.api_key.clone(),
),
);
let mcp_routes = build_mcp_router(&state, config.server.api_key.clone());
let public_routes = build_public_router(state);
let app = authed_routes
.merge(ws_routes)
.merge(mcp_routes)
.merge(public_routes)
.layer(cors)
.layer(rate_limiter);
Ok(app)
}
// Test-only support module: compiled only under `cfg(test)`.
// The unit tests in this file import `EnvGuard` from it.
#[cfg(test)]
pub mod test_support;
#[cfg(test)]
mod tests {
use super::*;
use crate::test_support::EnvGuard;
/// Minimal configuration text fed to `RoboticusConfig::from_str` by the
/// in-memory bootstrap test: an agent name/id, a loopback server binding,
/// a `:memory:` database path, and a primary model identifier.
const BOOTSTRAP_CONFIG: &str = r#"
[agent]
name = "Roboticus"
id = "roboticus-test"
[server]
port = 18789
bind = "127.0.0.1"
[database]
path = ":memory:"
[models]
primary = "ollama/qwen3:8b"
"#;
/// Smoke test: `bootstrap` must return `Ok` for the minimal
/// `BOOTSTRAP_CONFIG`, whose database path is `:memory:`.
#[tokio::test]
async fn bootstrap_with_memory_db_succeeds() {
    let parsed = RoboticusConfig::from_str(BOOTSTRAP_CONFIG).expect("parse config");
    let outcome = bootstrap(parsed).await;
    let succeeded = outcome.is_ok();
    assert!(
        succeeded,
        "bootstrap with :memory: should succeed: {:?}",
        outcome.err()
    );
}
/// Verifies that `bootstrap` still returns `Ok` when every optional channel
/// section is present with `enabled = true` but its credentials are absent,
/// and the Obsidian vault path points at a directory that was never created.
///
/// The `token_env` / `password_env` entries name environment variables that
/// this test does not set, and the WhatsApp `phone_number_id` and Signal
/// `phone_number` values are empty strings.
#[tokio::test]
async fn bootstrap_handles_enabled_channels_with_missing_credentials() {
    let dir = tempfile::tempdir().unwrap();
    let db_path = dir.path().join("state.db");
    // `missing-vault` is only joined onto the temp dir, never created on disk.
    let vault_path = dir.path().join("missing-vault");
    let cfg = format!(
        r#"
[agent]
name = "T"
id = "t"
[server]
bind = "127.0.0.1"
port = 18789
[database]
path = "{db_path}"
[models]
primary = "ollama/qwen3:8b"
[channels.telegram]
enabled = true
token_env = "TEST_TELEGRAM_MISSING"
[channels.whatsapp]
enabled = true
token_env = "TEST_WHATSAPP_MISSING"
phone_number_id = ""
[channels.discord]
enabled = true
token_env = "TEST_DISCORD_MISSING"
[channels.signal]
enabled = true
phone_number = ""
[channels.email]
enabled = true
password_env = "TEST_EMAIL_PASSWORD_MISSING"
[channels.voice]
enabled = true
[obsidian]
enabled = true
vault_path = "{vault_path}"
watch_for_changes = true
"#,
        db_path = db_path.display(),
        vault_path = vault_path.display(),
    );
    let config = RoboticusConfig::from_str(&cfg).expect("parse config");
    let result = bootstrap(config).await;
    // The channels here are enabled; it is their credentials that are missing,
    // so the failure message describes that scenario.
    assert!(
        result.is_ok(),
        "bootstrap should tolerate enabled channels with missing credentials and invalid obsidian path: {:?}",
        result.err()
    );
}
/// Seeds an on-disk database with two enabled sub-agent rows (one with role
/// `subagent`, one with role `model-proxy`) and checks that `bootstrap`
/// returns `Ok` when pointed at that database.
#[tokio::test]
async fn bootstrap_loads_taskable_subagents_from_database() {
    let workdir = tempfile::tempdir().unwrap();
    let db_file = workdir.path().join("state.db");
    let seeded_db = roboticus_db::Database::new(db_file.to_str().unwrap()).unwrap();

    // Row whose role is `subagent`.
    let finance_row = roboticus_db::agents::SubAgentRow {
        id: "sa-1".into(),
        name: "finance_specialist".into(),
        display_name: Some("Finance Specialist".into()),
        model: "auto".into(),
        fallback_models_json: Some("[]".into()),
        role: "subagent".into(),
        description: Some("Handles finance tasks".into()),
        skills_json: Some(r#"["budgeting"]"#.into()),
        enabled: true,
        session_count: 0,
        last_used_at: None,
    };
    roboticus_db::agents::upsert_sub_agent(&seeded_db, &finance_row).unwrap();

    // Row whose role is `model-proxy`.
    let observer_row = roboticus_db::agents::SubAgentRow {
        id: "sa-2".into(),
        name: "observer".into(),
        display_name: Some("Observer".into()),
        model: "ollama/qwen3:8b".into(),
        fallback_models_json: Some("[]".into()),
        role: "model-proxy".into(),
        description: Some("Not taskable".into()),
        skills_json: None,
        enabled: true,
        session_count: 0,
        last_used_at: None,
    };
    roboticus_db::agents::upsert_sub_agent(&seeded_db, &observer_row).unwrap();

    let config_text = format!(
        r#"
[agent]
name = "T"
id = "t"
[server]
bind = "127.0.0.1"
port = 18789
[database]
path = "{db_path}"
[models]
primary = "ollama/qwen3:8b"
"#,
        db_path = db_file.display(),
    );
    let parsed = RoboticusConfig::from_str(&config_text).expect("parse config");
    let outcome = bootstrap(parsed).await;
    let succeeded = outcome.is_ok();
    assert!(
        succeeded,
        "bootstrap should register enabled taskable subagents from db: {:?}",
        outcome.err()
    );
}
/// `cleanup_old_logs` must return without panicking when the directory it is
/// given does not exist.
///
/// The missing path is a never-created child of a fresh temporary directory
/// rather than a fixed `/tmp/...` location, so the test does not depend on a
/// Unix-style filesystem layout or on that fixed path happening to be absent.
#[test]
fn cleanup_old_logs_no_panic_on_missing_dir() {
    let parent = tempfile::tempdir().unwrap();
    let missing = parent.path().join("roboticus-test-nonexistent-dir-cleanup");
    cleanup_old_logs(missing.as_path(), 30);
}
/// A `.log` file written just before the call must still exist after
/// `cleanup_old_logs` runs with a 30-day window.
#[test]
fn cleanup_old_logs_keeps_recent_logs() {
    let log_dir = tempfile::tempdir().unwrap();
    let fresh_log = log_dir.path().join("recent.log");
    std::fs::write(&fresh_log, "fresh log").unwrap();

    cleanup_old_logs(log_dir.path(), 30);

    let still_present = fresh_log.exists();
    assert!(still_present, "recent logs should remain");
}
/// A file without the `.log` extension must be left in place even when
/// `cleanup_old_logs` is called with a window of 0.
#[test]
fn cleanup_old_logs_ignores_non_log_files() {
    let scratch = tempfile::tempdir().unwrap();
    let unrelated_file = scratch.path().join("data.txt");
    std::fs::write(&unrelated_file, "keep me").unwrap();

    cleanup_old_logs(scratch.path(), 0);

    let still_present = unrelated_file.exists();
    assert!(still_present, "non-log files should not be deleted");
}
/// Calling `cleanup_old_logs` on an existing directory with no entries must
/// complete without panicking.
#[test]
fn cleanup_old_logs_empty_dir() {
    let empty_dir = tempfile::tempdir().unwrap();
    let target = empty_dir.path();
    cleanup_old_logs(target, 1);
}
/// `is_taskable_subagent_role` accepts `subagent`, `specialist` and the
/// mixed-case spelling `SubAgent`, and rejects `model-proxy`.
#[test]
fn taskable_subagent_roles_are_strict() {
    // Checked in the same order as listed; the first failure aborts the test.
    for role in ["subagent", "specialist", "SubAgent"] {
        assert!(is_taskable_subagent_role(role));
    }
    assert!(!is_taskable_subagent_role("model-proxy"));
}
/// Exercises three `resolve_token` outcomes in sequence:
/// 1. a `keystore:` reference to a stored key yields the keystore value, even
///    though the named environment variable is also set;
/// 2. a `keystore:` reference to an absent key yields the environment value;
/// 3. no reference plus an empty variable name yields an empty string.
#[test]
fn resolve_token_prefers_keystore_reference_then_env_then_empty() {
    let scratch = tempfile::tempdir().unwrap();
    let store = roboticus_core::keystore::Keystore::new(scratch.path().join("keystore.enc"));
    store.unlock("pw").unwrap();
    store.set("telegram_bot_token", "from_keystore").unwrap();
    // Bound to a named variable so the guard lives until the end of the test.
    let _env_guard = EnvGuard::set("TEST_TELEGRAM_TOKEN", "from_env");

    let stored_ref = Some("keystore:telegram_bot_token".to_string());
    let from_store = resolve_token(&stored_ref, "TEST_TELEGRAM_TOKEN", &store);
    assert_eq!(from_store, "from_keystore");

    let absent_ref = Some("keystore:missing".to_string());
    let from_environment = resolve_token(&absent_ref, "TEST_TELEGRAM_TOKEN", &store);
    assert_eq!(from_environment, "from_env");

    let nothing = resolve_token(&None, "", &store);
    assert!(nothing.is_empty());
}
/// With no keystore reference supplied, `resolve_token` returns the value of
/// the named environment variable.
#[test]
fn resolve_token_uses_env_when_no_keystore_ref_is_provided() {
    let scratch = tempfile::tempdir().unwrap();
    let store = roboticus_core::keystore::Keystore::new(scratch.path().join("keystore.enc"));
    store.unlock("pw").unwrap();
    // Bound to a named variable so the guard lives until the end of the test.
    let _env_guard = EnvGuard::set("TEST_SIGNAL_TOKEN", "from_env_only");

    let resolved = resolve_token(&None, "TEST_SIGNAL_TOKEN", &store);
    assert_eq!(resolved, "from_env_only");
}
/// When the named environment variable is set to an empty string and no
/// keystore reference is given, `resolve_token` returns an empty string.
#[test]
fn resolve_token_returns_empty_for_empty_env_var() {
    let scratch = tempfile::tempdir().unwrap();
    let store = roboticus_core::keystore::Keystore::new(scratch.path().join("keystore.enc"));
    store.unlock("pw").unwrap();
    // Bound to a named variable so the guard lives until the end of the test.
    let _env_guard = EnvGuard::set("TEST_EMPTY_TOKEN", "");

    let resolved = resolve_token(&None, "TEST_EMPTY_TOKEN", &store);
    assert!(resolved.is_empty());
}
/// With a zero-day window, a `.log` file written shortly before the call is
/// expected to be gone after `cleanup_old_logs` returns.
#[test]
fn cleanup_old_logs_can_prune_when_window_is_zero_days() {
    let log_dir = tempfile::tempdir().unwrap();
    let stale_log = log_dir.path().join("old.log");
    std::fs::write(&stale_log, "stale").unwrap();
    // Brief pause between writing the file and running the cleanup.
    let pause = std::time::Duration::from_millis(10);
    std::thread::sleep(pause);

    cleanup_old_logs(log_dir.path(), 0);

    let still_present = stale_log.exists();
    assert!(!still_present);
}
}