Skip to main content

systemprompt_api/services/server/
runner.rs

1use anyhow::Result;
2use std::sync::Arc;
3use systemprompt_runtime::AppContext;
4use systemprompt_traits::{Phase, StartupEvent, StartupEventExt, StartupEventSender};
5
6use super::lifecycle::{
7    initialize_scheduler, reconcile_agents, reconcile_system_services, start_event_bridge,
8};
9
10pub async fn run_server(ctx: AppContext, events: Option<StartupEventSender>) -> Result<()> {
11    let start_time = std::time::Instant::now();
12
13    let mcp_orchestrator = create_mcp_orchestrator(&ctx)?;
14
15    start_event_bridge(&ctx);
16    reconcile_system_services(&ctx, &mcp_orchestrator, events.as_ref()).await?;
17
18    if let Some(ref tx) = events {
19        tx.phase_started(Phase::Agents);
20    }
21    match reconcile_agents(&ctx, events.as_ref()).await {
22        Ok(started_count) => {
23            if let Some(ref tx) = events {
24                if tx
25                    .unbounded_send(StartupEvent::AgentReconciliationComplete {
26                        running: started_count,
27                        total: started_count,
28                    })
29                    .is_err()
30                {
31                    tracing::debug!("Startup event receiver dropped");
32                }
33                tx.phase_completed(Phase::Agents);
34            }
35        },
36        Err(e) => {
37            if let Some(ref tx) = events {
38                tx.phase_failed(Phase::Agents, e.to_string());
39                if tx
40                    .unbounded_send(StartupEvent::Error {
41                        message: format!("Agent reconciliation failed: {e}"),
42                        fatal: true,
43                    })
44                    .is_err()
45                {
46                    tracing::debug!("Startup event receiver dropped");
47                }
48            }
49            return Err(e);
50        },
51    }
52
53    if let Some(ref tx) = events {
54        tx.phase_started(Phase::Scheduler);
55    }
56    let scheduler_handle = match initialize_scheduler(&ctx, events.as_ref()).await {
57        Ok(handle) => {
58            if let Some(ref tx) = events {
59                tx.phase_completed(Phase::Scheduler);
60            }
61            handle
62        },
63        Err(e) => {
64            if let Some(ref tx) = events {
65                tx.phase_failed(Phase::Scheduler, e.to_string());
66            }
67            None
68        },
69    };
70
71    if let Some(ref tx) = events {
72        tx.phase_started(Phase::ApiServer);
73    }
74    let api_server = crate::services::server::setup_api_server(&ctx, events.clone())?;
75    let addr = ctx.server_address();
76
77    if let Some(ref tx) = events {
78        tx.phase_completed(Phase::ApiServer);
79    }
80
81    if let Some(ref tx) = events {
82        tx.startup_complete(start_time.elapsed(), format!("http://{}", addr), vec![]);
83    }
84
85    systemprompt_logging::set_startup_mode(false);
86
87    let serve_result = api_server
88        .serve(&addr, super::shutdown::shutdown_signal())
89        .await;
90
91    super::shutdown::drain(&ctx, scheduler_handle).await;
92
93    serve_result
94}
95
96fn create_mcp_orchestrator(
97    ctx: &AppContext,
98) -> Result<Arc<systemprompt_mcp::services::McpOrchestrator>> {
99    use systemprompt_mcp::services::McpOrchestrator;
100    let manager = McpOrchestrator::new(
101        Arc::clone(ctx.db_pool()),
102        Arc::clone(ctx.app_paths_arc()),
103        ctx.mcp_registry().clone(),
104    )?;
105    Ok(Arc::new(manager))
106}