systemprompt_api/services/server/
runner.rs1use anyhow::Result;
2use std::sync::Arc;
3use systemprompt_runtime::AppContext;
4use systemprompt_traits::{Phase, StartupEvent, StartupEventExt, StartupEventSender};
5
6use super::lifecycle::{
7 initialize_scheduler, reconcile_agents, reconcile_system_services, start_event_bridge,
8};
9
10pub async fn run_server(ctx: AppContext, events: Option<StartupEventSender>) -> Result<()> {
11 let start_time = std::time::Instant::now();
12
13 let mcp_orchestrator = create_mcp_orchestrator(&ctx)?;
14
15 start_event_bridge(&ctx);
16 reconcile_system_services(&ctx, &mcp_orchestrator, events.as_ref()).await?;
17
18 if let Some(ref tx) = events {
19 tx.phase_started(Phase::Agents);
20 }
21 match reconcile_agents(&ctx, events.as_ref()).await {
22 Ok(started_count) => {
23 if let Some(ref tx) = events {
24 if tx
25 .unbounded_send(StartupEvent::AgentReconciliationComplete {
26 running: started_count,
27 total: started_count,
28 })
29 .is_err()
30 {
31 tracing::debug!("Startup event receiver dropped");
32 }
33 tx.phase_completed(Phase::Agents);
34 }
35 },
36 Err(e) => {
37 if let Some(ref tx) = events {
38 tx.phase_failed(Phase::Agents, e.to_string());
39 if tx
40 .unbounded_send(StartupEvent::Error {
41 message: format!("Agent reconciliation failed: {e}"),
42 fatal: true,
43 })
44 .is_err()
45 {
46 tracing::debug!("Startup event receiver dropped");
47 }
48 }
49 return Err(e);
50 },
51 }
52
53 if let Some(ref tx) = events {
54 tx.phase_started(Phase::Scheduler);
55 }
56 match initialize_scheduler(&ctx, events.as_ref()).await {
57 Ok(()) => {
58 if let Some(ref tx) = events {
59 tx.phase_completed(Phase::Scheduler);
60 }
61 },
62 Err(e) => {
63 if let Some(ref tx) = events {
64 tx.phase_failed(Phase::Scheduler, e.to_string());
65 }
66 },
67 }
68
69 if let Some(ref tx) = events {
70 tx.phase_started(Phase::ApiServer);
71 }
72 let api_server = crate::services::server::setup_api_server(&ctx, events.clone())?;
73 let addr = ctx.server_address();
74
75 if let Some(ref tx) = events {
76 tx.phase_completed(Phase::ApiServer);
77 }
78
79 if let Some(ref tx) = events {
80 tx.startup_complete(start_time.elapsed(), format!("http://{}", addr), vec![]);
81 }
82
83 systemprompt_logging::set_startup_mode(false);
84
85 api_server.serve(&addr).await
86}
87
88fn create_mcp_orchestrator(
89 ctx: &AppContext,
90) -> Result<Arc<systemprompt_mcp::services::McpOrchestrator>> {
91 use systemprompt_mcp::services::McpOrchestrator;
92 let manager = McpOrchestrator::new(
93 Arc::clone(ctx.db_pool()),
94 Arc::clone(ctx.app_paths_arc()),
95 ctx.mcp_registry().clone(),
96 )?;
97 Ok(Arc::new(manager))
98}