Skip to main content

systemprompt_api/services/server/
runner.rs

1use anyhow::Result;
2use std::sync::Arc;
3use systemprompt_runtime::AppContext;
4use systemprompt_traits::{Phase, StartupEvent, StartupEventExt, StartupEventSender};
5
6use super::lifecycle::{initialize_scheduler, reconcile_agents, reconcile_system_services};
7
8pub async fn run_server(ctx: AppContext, events: Option<StartupEventSender>) -> Result<()> {
9    let start_time = std::time::Instant::now();
10
11    let mcp_orchestrator = create_mcp_orchestrator(&ctx)?;
12    reconcile_system_services(&ctx, &mcp_orchestrator, events.as_ref()).await?;
13
14    if let Some(ref tx) = events {
15        tx.phase_started(Phase::Agents);
16    }
17    match reconcile_agents(&ctx, events.as_ref()).await {
18        Ok(started_count) => {
19            if let Some(ref tx) = events {
20                if tx
21                    .unbounded_send(StartupEvent::AgentReconciliationComplete {
22                        running: started_count,
23                        total: started_count,
24                    })
25                    .is_err()
26                {
27                    tracing::debug!("Startup event receiver dropped");
28                }
29                tx.phase_completed(Phase::Agents);
30            }
31        },
32        Err(e) => {
33            if let Some(ref tx) = events {
34                tx.phase_failed(Phase::Agents, e.to_string());
35                if tx
36                    .unbounded_send(StartupEvent::Error {
37                        message: format!("Agent reconciliation failed: {e}"),
38                        fatal: true,
39                    })
40                    .is_err()
41                {
42                    tracing::debug!("Startup event receiver dropped");
43                }
44            }
45            return Err(e);
46        },
47    }
48
49    if let Some(ref tx) = events {
50        tx.phase_started(Phase::Scheduler);
51    }
52    match initialize_scheduler(&ctx, events.as_ref()).await {
53        Ok(()) => {
54            if let Some(ref tx) = events {
55                tx.phase_completed(Phase::Scheduler);
56            }
57        },
58        Err(e) => {
59            if let Some(ref tx) = events {
60                tx.phase_failed(Phase::Scheduler, e.to_string());
61            }
62        },
63    }
64
65    if let Some(ref tx) = events {
66        tx.phase_started(Phase::ApiServer);
67    }
68    let api_server = crate::services::server::setup_api_server(&ctx, events.clone())?;
69    let addr = ctx.server_address();
70
71    if let Some(ref tx) = events {
72        tx.phase_completed(Phase::ApiServer);
73    }
74
75    if let Some(ref tx) = events {
76        tx.startup_complete(start_time.elapsed(), format!("http://{}", addr), vec![]);
77    }
78
79    systemprompt_logging::set_startup_mode(false);
80
81    api_server.serve(&addr).await
82}
83
84fn create_mcp_orchestrator(
85    ctx: &AppContext,
86) -> Result<Arc<systemprompt_mcp::services::McpManager>> {
87    use systemprompt_mcp::services::McpManager;
88    let manager = McpManager::new(ctx.db_pool().clone())?;
89    Ok(Arc::new(manager))
90}