systemprompt-agent 0.1.22

Core Agent protocol module for systemprompt.io
Documentation
use systemprompt_database::DbPool;

use crate::services::agent_orchestration::database::AgentDatabaseService;
use crate::services::agent_orchestration::{OrchestrationResult, process};

#[derive(Debug)]
pub struct AgentReconciler {
    db_service: AgentDatabaseService,
}

impl AgentReconciler {
    pub fn new(db_pool: &DbPool) -> OrchestrationResult<Self> {
        use crate::repository::agent_service::AgentServiceRepository;

        let agent_service_repo = AgentServiceRepository::new(db_pool)?;
        let db_service = AgentDatabaseService::new(agent_service_repo)?;

        Ok(Self { db_service })
    }

    pub async fn reconcile_running_services(&self) -> OrchestrationResult<u32> {
        tracing::debug!("Reconciling running services with actual processes");

        let all_agents = self.db_service.list_all_agents().await?;
        let mut reconciled = 0;

        for (agent_id, status) in all_agents {
            match status {
                crate::services::agent_orchestration::AgentStatus::Running { pid, .. } => {
                    if !process::process_exists(pid) {
                        tracing::warn!(
                            agent_id = %agent_id,
                            pid = %pid,
                            "Agent marked as running but process not found - marking as failed"
                        );
                        self.db_service
                            .mark_failed(&agent_id, "Process died unexpectedly")
                            .await?;
                        reconciled += 1;
                    }
                },
                crate::services::agent_orchestration::AgentStatus::Failed { .. } => {},
            }
        }

        if reconciled > 0 {
            tracing::info!(reconciled = %reconciled, "Reconciled services");
        } else {
            tracing::debug!("All services are correctly synchronized");
        }

        Ok(reconciled)
    }

    pub fn reconcile_starting_services() -> u32 {
        tracing::debug!("Checking for services stuck in 'starting' state");

        0
    }

    pub async fn perform_consistency_check(&self) -> OrchestrationResult<ConsistencyReport> {
        tracing::debug!("Performing database consistency check");

        let mut report = ConsistencyReport::new();
        let all_agents = self.db_service.list_all_agents().await?;

        for (agent_id, status) in all_agents {
            match status {
                crate::services::agent_orchestration::AgentStatus::Running { pid, .. } => {
                    if process::process_exists(pid) {
                        report.consistent_running.push(agent_id);
                    } else {
                        report.inconsistent_running.push((agent_id, pid));
                    }
                },
                crate::services::agent_orchestration::AgentStatus::Failed { .. } => {
                    report.failed.push(agent_id);
                },
            }
        }

        self.find_orphaned_processes(&mut report).await?;

        report.log_summary();
        Ok(report)
    }

    async fn find_orphaned_processes(
        &self,
        report: &mut ConsistencyReport,
    ) -> OrchestrationResult<()> {
        let running_pids = self.db_service.list_running_agents().await?;

        for agent_id in running_pids {
            let status = self.db_service.get_status(&agent_id).await?;
            if let crate::services::agent_orchestration::AgentStatus::Running { pid, .. } = status {
                if !process::process_exists(pid) {
                    report.orphaned_processes.push((agent_id, pid));
                }
            }
        }

        Ok(())
    }

    pub async fn fix_inconsistencies(
        &self,
        report: &ConsistencyReport,
    ) -> OrchestrationResult<u32> {
        let mut fixed = 0;

        for (agent_id, pid) in &report.inconsistent_running {
            tracing::warn!(agent_id = %agent_id, pid = %pid, "Fixing inconsistent agent");
            self.db_service
                .mark_failed(agent_id, &format!("Process {} died", pid))
                .await?;
            fixed += 1;
        }

        for (agent_id, pid) in &report.orphaned_processes {
            tracing::warn!(agent_id = %agent_id, pid = %pid, "Cleaning up orphaned process for agent");
            self.db_service
                .mark_failed(agent_id, &format!("Orphaned process {pid}"))
                .await?;
            fixed += 1;
        }

        if fixed > 0 {
            tracing::info!(fixed = %fixed, "Fixed inconsistencies");
        }

        Ok(fixed)
    }
}

#[derive(Debug)]
pub struct ConsistencyReport {
    pub consistent_running: Vec<String>,
    pub inconsistent_running: Vec<(String, u32)>,
    pub failed: Vec<String>,
    pub orphaned_processes: Vec<(String, u32)>,
}

impl Default for ConsistencyReport {
    fn default() -> Self {
        Self::new()
    }
}

impl ConsistencyReport {
    pub const fn new() -> Self {
        Self {
            consistent_running: Vec::new(),
            inconsistent_running: Vec::new(),
            failed: Vec::new(),
            orphaned_processes: Vec::new(),
        }
    }

    pub fn has_inconsistencies(&self) -> bool {
        !self.inconsistent_running.is_empty() || !self.orphaned_processes.is_empty()
    }

    pub fn total_agents(&self) -> usize {
        self.consistent_running.len() + self.inconsistent_running.len() + self.failed.len()
    }

    pub fn log_summary(&self) {
        tracing::debug!(
            consistent_running = %self.consistent_running.len(),
            inconsistent_running = %self.inconsistent_running.len(),
            failed = %self.failed.len(),
            orphaned_processes = %self.orphaned_processes.len(),
            "Consistency check results"
        );

        if self.has_inconsistencies() {
            tracing::warn!("Inconsistencies detected - run fix_inconsistencies() to repair");
        } else {
            tracing::debug!("All services are consistent");
        }
    }
}