systemprompt_api/services/server/
runner.rs1use anyhow::Result;
2use std::sync::Arc;
3use systemprompt_runtime::AppContext;
4use systemprompt_traits::{Phase, StartupEvent, StartupEventExt, StartupEventSender};
5
6use super::lifecycle::{
7 initialize_scheduler, reconcile_agents, reconcile_system_services, start_event_bridge,
8};
9
10pub async fn run_server(ctx: AppContext, events: Option<StartupEventSender>) -> Result<()> {
11 let start_time = std::time::Instant::now();
12
13 let mcp_orchestrator = create_mcp_orchestrator(&ctx)?;
14
15 start_event_bridge(&ctx);
16 reconcile_system_services(&ctx, &mcp_orchestrator, events.as_ref()).await?;
17
18 if let Some(ref tx) = events {
19 tx.phase_started(Phase::Agents);
20 }
21 match reconcile_agents(&ctx, events.as_ref()).await {
22 Ok(started_count) => {
23 if let Some(ref tx) = events {
24 if tx
25 .unbounded_send(StartupEvent::AgentReconciliationComplete {
26 running: started_count,
27 total: started_count,
28 })
29 .is_err()
30 {
31 tracing::debug!("Startup event receiver dropped");
32 }
33 tx.phase_completed(Phase::Agents);
34 }
35 },
36 Err(e) => {
37 if let Some(ref tx) = events {
38 tx.phase_failed(Phase::Agents, e.to_string());
39 if tx
40 .unbounded_send(StartupEvent::Error {
41 message: format!("Agent reconciliation failed: {e}"),
42 fatal: true,
43 })
44 .is_err()
45 {
46 tracing::debug!("Startup event receiver dropped");
47 }
48 }
49 return Err(e);
50 },
51 }
52
53 if let Some(ref tx) = events {
54 tx.phase_started(Phase::Scheduler);
55 }
56 let scheduler_handle = match initialize_scheduler(&ctx, events.as_ref()).await {
57 Ok(handle) => {
58 if let Some(ref tx) = events {
59 tx.phase_completed(Phase::Scheduler);
60 }
61 handle
62 },
63 Err(e) => {
64 if let Some(ref tx) = events {
65 tx.phase_failed(Phase::Scheduler, e.to_string());
66 }
67 None
68 },
69 };
70
71 if let Some(ref tx) = events {
72 tx.phase_started(Phase::ApiServer);
73 }
74 let api_server = crate::services::server::setup_api_server(&ctx, events.clone())?;
75 let addr = ctx.server_address();
76
77 if let Some(ref tx) = events {
78 tx.phase_completed(Phase::ApiServer);
79 }
80
81 if let Some(ref tx) = events {
82 tx.startup_complete(start_time.elapsed(), format!("http://{}", addr), vec![]);
83 }
84
85 systemprompt_logging::set_startup_mode(false);
86
87 let serve_result = api_server
88 .serve(&addr, super::shutdown::shutdown_signal())
89 .await;
90
91 super::shutdown::drain(&ctx, scheduler_handle).await;
92
93 serve_result
94}
95
96fn create_mcp_orchestrator(
97 ctx: &AppContext,
98) -> Result<Arc<systemprompt_mcp::services::McpOrchestrator>> {
99 use systemprompt_mcp::services::McpOrchestrator;
100 let manager = McpOrchestrator::new(
101 Arc::clone(ctx.db_pool()),
102 Arc::clone(ctx.app_paths_arc()),
103 ctx.mcp_registry().clone(),
104 )?;
105 Ok(Arc::new(manager))
106}