use std::sync::Arc;
use anyhow::Result;
use std::time::Duration;
use systemprompt_traits::StartupEventSender;
use tokio::task::JoinHandle;
use super::AgentOrchestrator;
use crate::services::agent_orchestration::OrchestrationResult;
use crate::services::agent_orchestration::monitor::AgentMonitor;
impl AgentOrchestrator {
    /// Runs the orchestrator in the foreground until a shutdown signal arrives.
    ///
    /// Starts the background health monitor, then loops: every 60 seconds it
    /// calls `cleanup_crashed_agents`, and it leaves the loop as soon as the
    /// Ctrl-C future resolves. [`Self::shutdown`] is called before returning.
    ///
    /// # Errors
    ///
    /// The body shown here always returns `Ok(())`; cleanup failures are
    /// logged rather than propagated.
    pub async fn run_daemon(&mut self) -> OrchestrationResult<()> {
        const CLEANUP_INTERVAL: Duration = Duration::from_secs(60);

        tracing::info!("Starting Agent Orchestrator daemon");
        self.start_health_monitoring();

        loop {
            // Both futures are recreated on every iteration, so the cleanup
            // timer restarts only after the previous cleanup pass has finished.
            tokio::select! {
                signal = tokio::signal::ctrl_c() => {
                    match signal {
                        Ok(()) => tracing::info!("Received shutdown signal"),
                        // Do not report a listener failure as a user-requested
                        // shutdown. Still leave the loop: retrying a listener
                        // that cannot be installed would likely spin.
                        Err(e) => {
                            tracing::error!(error = %e, "Failed to listen for shutdown signal");
                        },
                    }
                    break;
                }
                () = tokio::time::sleep(CLEANUP_INTERVAL) => {
                    if let Err(e) = self.cleanup_crashed_agents().await {
                        tracing::error!(error = %e, "Cleanup error");
                    }
                }
            }
        }

        self.shutdown();
        Ok(())
    }

    /// Reconciles recorded service state at startup.
    ///
    /// Runs, in order: `reconcile_running_services`, the associated function
    /// `AgentReconciler::reconcile_starting_services`, and a consistency check
    /// whose findings are passed to `fix_inconsistencies` when the report
    /// has any. The `_events` argument is currently unused.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `reconcile_running_services`,
    /// `perform_consistency_check`, or `fix_inconsistencies`.
    pub(super) async fn startup_reconciliation(
        &self,
        _events: Option<&StartupEventSender>,
    ) -> OrchestrationResult<()> {
        tracing::debug!("Performing startup reconciliation");

        let reconciled = self.reconciler.reconcile_running_services().await?;
        let started_fixed = crate::services::agent_orchestration::reconciler::AgentReconciler::reconcile_starting_services();

        let report = self.reconciler.perform_consistency_check().await?;
        if report.has_inconsistencies() {
            let fixed = self.reconciler.fix_inconsistencies(&report).await?;
            tracing::info!(fixed = %fixed, "Fixed inconsistencies");
        }

        // NOTE(review): the consistency-fix count above is not part of this
        // total, so "no issues found" can follow a "Fixed inconsistencies"
        // line. Confirm whether that is intended.
        let total_fixed = reconciled + started_fixed;
        if total_fixed > 0 {
            tracing::info!(fixed = %total_fixed, "Startup reconciliation complete");
        } else {
            tracing::debug!("Startup reconciliation complete - no issues found");
        }

        Ok(())
    }

    /// Spawns the background health-monitoring task and stores its handle.
    ///
    /// The task builds an `AgentMonitor` from the shared database pool and
    /// then, once per 60-second tick, calls `monitor_all_agents` followed by
    /// `cleanup_unresponsive_agents`, logging any error from either. If the
    /// monitor cannot be constructed the task logs the failure and ends.
    ///
    /// Any monitoring task started by an earlier call is aborted first, so at
    /// most one monitor loop is alive per orchestrator.
    pub(super) fn start_health_monitoring(&mut self) {
        const HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(60);

        // Dropping a `JoinHandle` detaches its task instead of cancelling it,
        // so an existing monitor must be aborted explicitly before its handle
        // is replaced; otherwise it would keep running beyond `shutdown`.
        if let Some(previous) = self.monitoring_handle.take() {
            previous.abort();
            tracing::debug!("Stopped previous health monitoring task");
        }

        let pool = Arc::clone(self.agent_state.db_pool());

        let handle: JoinHandle<Result<()>> = tokio::spawn(async move {
            let monitor = match AgentMonitor::new(&pool) {
                Ok(m) => m,
                Err(e) => {
                    tracing::error!(error = %e, "Failed to initialize health monitor");
                    return Ok(());
                },
            };

            let mut interval = tokio::time::interval(HEALTH_CHECK_INTERVAL);
            // The first tick of a tokio interval completes immediately;
            // consume it so the first health check runs one full period
            // after startup.
            interval.tick().await;

            loop {
                interval.tick().await;

                match monitor.monitor_all_agents().await {
                    Ok(report) => {
                        // Stay quiet when there are no agents to report on.
                        if report.total_agents() > 0 {
                            tracing::debug!(
                                healthy = %report.healthy.len(),
                                total = %report.total_agents(),
                                percentage = %format!("{:.1}", report.healthy_percentage()),
                                "Health check complete"
                            );
                        }
                    },
                    Err(e) => {
                        tracing::error!(error = %e, "Health monitoring error");
                    },
                }

                // Cleanup is attempted on every tick, even if monitoring failed.
                if let Err(e) = monitor.cleanup_unresponsive_agents().await {
                    tracing::error!(error = %e, "Cleanup error");
                }
            }
        });

        self.monitoring_handle = Some(handle);
    }

    /// Stops the orchestrator's background work.
    ///
    /// Takes the stored monitoring handle, if any, and aborts its task.
    /// Calling this again afterwards finds no handle and only logs.
    pub fn shutdown(&mut self) {
        tracing::info!("Shutting down Agent Orchestrator");

        if let Some(handle) = self.monitoring_handle.take() {
            handle.abort();
            tracing::debug!("Stopped health monitoring");
        }

        tracing::info!("Agent Orchestrator shutdown complete");
    }
}