systemprompt_api/services/server/
runner.rs1use anyhow::Result;
2use std::sync::Arc;
3use systemprompt_runtime::AppContext;
4use systemprompt_traits::{Phase, StartupEvent, StartupEventExt, StartupEventSender};
5
6use super::lifecycle::{initialize_scheduler, reconcile_agents, reconcile_system_services};
7
8pub async fn run_server(ctx: AppContext, events: Option<StartupEventSender>) -> Result<()> {
9 let start_time = std::time::Instant::now();
10
11 let mcp_orchestrator = create_mcp_orchestrator(&ctx)?;
12 reconcile_system_services(&ctx, &mcp_orchestrator, events.as_ref()).await?;
13
14 if let Some(ref tx) = events {
15 tx.phase_started(Phase::Agents);
16 }
17 match reconcile_agents(&ctx, events.as_ref()).await {
18 Ok(started_count) => {
19 if let Some(ref tx) = events {
20 if tx
21 .unbounded_send(StartupEvent::AgentReconciliationComplete {
22 running: started_count,
23 total: started_count,
24 })
25 .is_err()
26 {
27 tracing::debug!("Startup event receiver dropped");
28 }
29 tx.phase_completed(Phase::Agents);
30 }
31 },
32 Err(e) => {
33 if let Some(ref tx) = events {
34 tx.phase_failed(Phase::Agents, e.to_string());
35 if tx
36 .unbounded_send(StartupEvent::Error {
37 message: format!("Agent reconciliation failed: {e}"),
38 fatal: true,
39 })
40 .is_err()
41 {
42 tracing::debug!("Startup event receiver dropped");
43 }
44 }
45 return Err(e);
46 },
47 }
48
49 if let Some(ref tx) = events {
50 tx.phase_started(Phase::Scheduler);
51 }
52 match initialize_scheduler(&ctx, events.as_ref()).await {
53 Ok(()) => {
54 if let Some(ref tx) = events {
55 tx.phase_completed(Phase::Scheduler);
56 }
57 },
58 Err(e) => {
59 if let Some(ref tx) = events {
60 tx.phase_failed(Phase::Scheduler, e.to_string());
61 }
62 },
63 }
64
65 if let Some(ref tx) = events {
66 tx.phase_started(Phase::ApiServer);
67 }
68 let api_server = crate::services::server::setup_api_server(&ctx, events.clone())?;
69 let addr = ctx.server_address();
70
71 if let Some(ref tx) = events {
72 tx.phase_completed(Phase::ApiServer);
73 }
74
75 if let Some(ref tx) = events {
76 tx.startup_complete(start_time.elapsed(), format!("http://{}", addr), vec![]);
77 }
78
79 systemprompt_logging::set_startup_mode(false);
80
81 api_server.serve(&addr).await
82}
83
84fn create_mcp_orchestrator(
85 ctx: &AppContext,
86) -> Result<Arc<systemprompt_mcp::services::McpManager>> {
87 use systemprompt_mcp::services::McpManager;
88 let manager = McpManager::new(ctx.db_pool().clone())?;
89 Ok(Arc::new(manager))
90}