Skip to main content

systemprompt_api/services/server/
runner.rs

1use anyhow::Result;
2use std::sync::Arc;
3use systemprompt_runtime::AppContext;
4use systemprompt_traits::{Phase, StartupEvent, StartupEventExt, StartupEventSender};
5
6use super::lifecycle::{
7    initialize_scheduler, reconcile_agents, reconcile_system_services, start_event_bridge,
8};
9
10pub async fn run_server(ctx: AppContext, events: Option<StartupEventSender>) -> Result<()> {
11    let start_time = std::time::Instant::now();
12
13    let mcp_orchestrator = create_mcp_orchestrator(&ctx)?;
14
15    start_event_bridge(&ctx);
16    reconcile_system_services(&ctx, &mcp_orchestrator, events.as_ref()).await?;
17
18    if let Some(ref tx) = events {
19        tx.phase_started(Phase::Agents);
20    }
21    match reconcile_agents(&ctx, events.as_ref()).await {
22        Ok(started_count) => {
23            if let Some(ref tx) = events {
24                if tx
25                    .unbounded_send(StartupEvent::AgentReconciliationComplete {
26                        running: started_count,
27                        total: started_count,
28                    })
29                    .is_err()
30                {
31                    tracing::debug!("Startup event receiver dropped");
32                }
33                tx.phase_completed(Phase::Agents);
34            }
35        },
36        Err(e) => {
37            if let Some(ref tx) = events {
38                tx.phase_failed(Phase::Agents, e.to_string());
39                if tx
40                    .unbounded_send(StartupEvent::Error {
41                        message: format!("Agent reconciliation failed: {e}"),
42                        fatal: true,
43                    })
44                    .is_err()
45                {
46                    tracing::debug!("Startup event receiver dropped");
47                }
48            }
49            return Err(e);
50        },
51    }
52
53    if let Some(ref tx) = events {
54        tx.phase_started(Phase::Scheduler);
55    }
56    match initialize_scheduler(&ctx, events.as_ref()).await {
57        Ok(()) => {
58            if let Some(ref tx) = events {
59                tx.phase_completed(Phase::Scheduler);
60            }
61        },
62        Err(e) => {
63            if let Some(ref tx) = events {
64                tx.phase_failed(Phase::Scheduler, e.to_string());
65            }
66        },
67    }
68
69    if let Some(ref tx) = events {
70        tx.phase_started(Phase::ApiServer);
71    }
72    let api_server = crate::services::server::setup_api_server(&ctx, events.clone())?;
73    let addr = ctx.server_address();
74
75    if let Some(ref tx) = events {
76        tx.phase_completed(Phase::ApiServer);
77    }
78
79    if let Some(ref tx) = events {
80        tx.startup_complete(start_time.elapsed(), format!("http://{}", addr), vec![]);
81    }
82
83    systemprompt_logging::set_startup_mode(false);
84
85    api_server.serve(&addr).await
86}
87
88fn create_mcp_orchestrator(
89    ctx: &AppContext,
90) -> Result<Arc<systemprompt_mcp::services::McpOrchestrator>> {
91    use systemprompt_mcp::services::McpOrchestrator;
92    let manager = McpOrchestrator::new(
93        Arc::clone(ctx.db_pool()),
94        Arc::clone(ctx.app_paths_arc()),
95        ctx.mcp_registry().clone(),
96    )?;
97    Ok(Arc::new(manager))
98}