systemprompt-api 0.1.18

HTTP API server and gateway for systemprompt.io OS
Documentation
use anyhow::Result;
use std::sync::Arc;
use systemprompt_mcp::services::registry::RegistryManager;
use systemprompt_runtime::AppContext;
use systemprompt_traits::{Phase, StartupEvent, StartupEventExt, StartupEventSender};

/// Reconciles MCP services against the enabled-server registry at startup.
///
/// In order, this:
/// 1. announces the start of [`Phase::McpServers`],
/// 2. removes stale service entries (best effort),
/// 3. loads the enabled servers from [`RegistryManager`],
/// 4. asks `mcp_orchestrator` to reconcile, and
/// 5. validates the outcome via `handle_reconcile_success`.
///
/// Progress is reported through `events` when a sender is supplied.
///
/// # Errors
///
/// Returns an error when the enabled-server list cannot be loaded, when the
/// orchestrator's reconcile call fails, or when validation of the reconcile
/// result fails. A failure while removing stale entries does not abort; it is
/// only surfaced as a warning event when a sender is present.
pub async fn reconcile_system_services(
    ctx: &AppContext,
    mcp_orchestrator: &Arc<systemprompt_mcp::services::McpManager>,
    events: Option<&StartupEventSender>,
) -> Result<()> {
    if let Some(sender) = events {
        sender.phase_started(Phase::McpServers);
    }

    // Stale-entry cleanup never aborts startup; its outcome is only reported
    // when somebody is listening.
    match (cleanup_stale_service_entries(ctx, events).await, events) {
        (Ok(removed), Some(sender)) if removed > 0 => {
            let summary = StartupEvent::McpServiceCleanup {
                name: format!("{removed} services"),
                reason: "Stale entries removed".to_string(),
            };
            if sender.unbounded_send(summary).is_err() {
                tracing::debug!("Startup event receiver dropped");
            }
        },
        (Err(err), Some(sender)) => {
            sender.warning(format!("Could not clean stale entries: {err}"));
        },
        _ => {},
    }

    let required_servers = RegistryManager::get_enabled_servers()?;
    let required_count = required_servers.len();

    let running_count = match mcp_orchestrator.reconcile().await {
        Ok(count) => count,
        Err(err) => {
            if let Some(sender) = events {
                sender.phase_failed(Phase::McpServers, err.to_string());
            }
            return Err(anyhow::anyhow!(
                "FATAL: MCP reconciliation failed: {}\n\nCannot start API without MCP servers.",
                err
            ));
        },
    };

    handle_reconcile_success(
        running_count,
        required_count,
        &required_servers,
        mcp_orchestrator,
        ctx,
        events,
    )
    .await?;

    if let Some(sender) = events {
        sender.phase_completed(Phase::McpServers);
    }
    Ok(())
}

/// Validates the result of a reconcile call that itself returned `Ok`.
///
/// * If fewer servers are running than required, delegates to
///   `handle_missing_servers` and returns its result.
/// * Otherwise, when at least one server is running, checks the database
///   registration of every required server.
/// * Finally emits [`StartupEvent::McpReconciliationComplete`] with both
///   counts when an event sender is present.
///
/// # Errors
///
/// Propagates the error from `handle_missing_servers` or
/// `verify_database_registration`.
async fn handle_reconcile_success(
    running_count: usize,
    required_count: usize,
    required_servers: &[systemprompt_mcp::McpServerConfig],
    mcp_orchestrator: &Arc<systemprompt_mcp::services::McpManager>,
    ctx: &AppContext,
    events: Option<&StartupEventSender>,
) -> Result<()> {
    let some_servers_missing = running_count < required_count;
    if some_servers_missing {
        return handle_missing_servers(required_servers, mcp_orchestrator, events).await;
    }

    // Nothing to verify when no server is running at all.
    if running_count != 0 {
        verify_database_registration(required_servers, ctx, events).await?;
    }

    let Some(sender) = events else {
        return Ok(());
    };

    let summary = StartupEvent::McpReconciliationComplete {
        running: running_count,
        required: required_count,
    };
    if sender.unbounded_send(summary).is_err() {
        tracing::debug!("Startup event receiver dropped");
    }

    Ok(())
}

async fn handle_missing_servers(
    required_servers: &[systemprompt_mcp::McpServerConfig],
    mcp_orchestrator: &Arc<systemprompt_mcp::services::McpManager>,
    events: Option<&StartupEventSender>,
) -> Result<()> {
    let running_servers = mcp_orchestrator.get_running_servers().await?;
    let running_names: std::collections::HashSet<String> =
        running_servers.iter().map(|s| s.name.clone()).collect();

    let missing: Vec<String> = required_servers
        .iter()
        .map(|s| s.name.clone())
        .filter(|name| !running_names.contains(name))
        .collect();

    if let Some(tx) = events {
        if tx
            .unbounded_send(StartupEvent::Error {
                message: format!(
                    "Server status mismatch: {} servers failed to start: {}",
                    missing.len(),
                    missing.join(", ")
                ),
                fatal: true,
            })
            .is_err()
        {
            tracing::debug!("Startup event receiver dropped");
        }
    }

    Err(anyhow::anyhow!(
        "FATAL: {} required MCP server(s) failed to start: {}\n\nsystemprompt.io OS cannot \
         operate without MCP servers.\nAgents need tools to function.\n\nBuild missing binaries \
         with:\n  cargo build --bin {}\n\nOr build all MCP servers:\n  just mcp build",
        missing.len(),
        missing.join(", "),
        missing.join(" --bin ")
    ))
}

async fn verify_database_registration(
    required_servers: &[systemprompt_mcp::McpServerConfig],
    ctx: &AppContext,
    events: Option<&StartupEventSender>,
) -> Result<()> {
    use systemprompt_database::ServiceRepository;
    let service_repo = ServiceRepository::new(ctx.db_pool())?;

    let mut verification_failed = Vec::new();

    for server in required_servers {
        match service_repo.get_service_by_name(&server.name).await {
            Ok(Some(service)) if service.status == "running" => {
                if let Some(tx) = events {
                    if tx
                        .unbounded_send(StartupEvent::McpServerReady {
                            name: server.name.clone(),
                            port: service.port as u16,
                            startup_time: std::time::Duration::ZERO,
                            tools: 0,
                        })
                        .is_err()
                    {
                        tracing::debug!("Startup event receiver dropped");
                    }
                }
            },
            Ok(Some(service)) => {
                verification_failed.push(format!("{} (status: {})", server.name, service.status));
            },
            Ok(None) => {
                verification_failed.push(format!("{} (not in database)", server.name));
            },
            Err(e) => {
                verification_failed.push(format!("{} (db error: {})", server.name, e));
            },
        }
    }

    if !verification_failed.is_empty() {
        if let Some(tx) = events {
            if tx
                .unbounded_send(StartupEvent::Error {
                    message: format!(
                        "Database verification failed for {} service(s): {}",
                        verification_failed.len(),
                        verification_failed.join(", ")
                    ),
                    fatal: true,
                })
                .is_err()
            {
                tracing::debug!("Startup event receiver dropped");
            }
        }
        return Err(anyhow::anyhow!(
            "FATAL: MCP services running but not properly registered in database\n\nThis \
             indicates a race condition or database synchronization issue.\nFailed services: {}",
            verification_failed.join(", ")
        ));
    }

    Ok(())
}

async fn cleanup_stale_service_entries(
    ctx: &AppContext,
    events: Option<&StartupEventSender>,
) -> Result<u64> {
    use systemprompt_database::ServiceRepository;
    use systemprompt_scheduler::ProcessCleanup;

    let repo = ServiceRepository::new(ctx.db_pool())?;
    let mut deleted_count = 0u64;

    let mcp_services = repo.get_mcp_services().await?;
    for service in mcp_services {
        let should_delete = match service.status.as_str() {
            "running" => service
                .pid
                .is_none_or(|pid| !ProcessCleanup::process_exists(pid as u32)),
            "error" | "stopped" => true,
            _ => false,
        };

        if should_delete && repo.delete_service(&service.name).await.is_ok() {
            deleted_count += 1;
            if let Some(tx) = events {
                if tx
                    .unbounded_send(StartupEvent::McpServiceCleanup {
                        name: service.name.clone(),
                        reason: format!(
                            "Stale entry (status: {}, pid: {:?})",
                            service.status, service.pid
                        ),
                    })
                    .is_err()
                {
                    tracing::debug!("Startup event receiver dropped");
                }
            }
        }
    }

    let agent_service_names = repo.get_all_agent_service_names().await?;
    for service_name in agent_service_names {
        if let Ok(Some(service)) = repo.get_service_by_name(&service_name).await {
            let should_delete = match service.status.as_str() {
                "running" => service
                    .pid
                    .is_none_or(|pid| !ProcessCleanup::process_exists(pid as u32)),
                "error" | "stopped" => true,
                _ => false,
            };

            if should_delete && repo.delete_service(&service_name).await.is_ok() {
                deleted_count += 1;
                if let Some(tx) = events {
                    if tx
                        .unbounded_send(StartupEvent::AgentCleanup {
                            name: service_name.clone(),
                            reason: format!(
                                "Stale entry (status: {}, pid: {:?})",
                                service.status, service.pid
                            ),
                        })
                        .is_err()
                    {
                        tracing::debug!("Startup event receiver dropped");
                    }
                }
            }
        }
    }

    Ok(deleted_count)
}