systemprompt_cli/commands/infrastructure/services/
serve.rs1use crate::cli_settings::CliConfig;
2use crate::interactive::confirm_optional;
3use anyhow::{Context, Result};
4use std::sync::Arc;
5use systemprompt_database::install_extension_schemas;
6use systemprompt_extension::ExtensionRegistry;
7use systemprompt_logging::CliService;
8use systemprompt_models::ProfileBootstrap;
9use systemprompt_runtime::{validate_system, AppContext, ServiceCategory};
10use systemprompt_scheduler::ProcessCleanup;
11use systemprompt_traits::{ModuleInfo, Phase, StartupEvent, StartupEventExt, StartupEventSender};
12
13const DEFAULT_API_PORT: u16 = 8080;
14
15fn get_api_port() -> u16 {
16 ProfileBootstrap::get()
17 .map(|p| p.server.port)
18 .unwrap_or(DEFAULT_API_PORT)
19}
20
21pub async fn execute_with_events(
22 foreground: bool,
23 kill_port_process: bool,
24 config: &CliConfig,
25 events: Option<&StartupEventSender>,
26) -> Result<String> {
27 let port = get_api_port();
28
29 if events.is_none() {
30 CliService::startup_banner(Some("Starting services..."));
31 }
32
33 if let Some(pid) = check_port_available(port) {
34 if let Some(tx) = events {
35 tx.unbounded_send(StartupEvent::PortConflict { port, pid })
36 .ok();
37 }
38 handle_port_conflict(port, pid, kill_port_process, config, events).await?;
39 if let Some(tx) = events {
40 tx.unbounded_send(StartupEvent::PortConflictResolved { port })
41 .ok();
42 }
43 } else if let Some(tx) = events {
44 tx.port_available(port);
45 } else {
46 CliService::phase_success(&format!("Port {} available", port), None);
47 }
48
49 register_modules(events);
50
51 let ctx = Arc::new(
52 AppContext::builder()
53 .with_startup_warnings(true)
54 .build()
55 .await
56 .context("Failed to initialize application context")?,
57 );
58
59 run_migrations(&ctx, events).await?;
60
61 systemprompt_logging::init_logging(Arc::clone(ctx.db_pool()));
62
63 if let Some(tx) = events {
64 tx.phase_started(Phase::Database);
65 tx.unbounded_send(StartupEvent::DatabaseValidated).ok();
66 tx.phase_completed(Phase::Database);
67 } else {
68 CliService::phase("Validation");
69 CliService::phase_info("Running system validation...", None);
70 }
71
72 validate_system(&ctx)
73 .await
74 .context("System validation failed")?;
75
76 if events.is_none() {
77 CliService::phase_success("System validation complete", None);
78 }
79
80 if events.is_none() {
81 CliService::phase("Server");
82 if !foreground {
83 CliService::phase_warning("Daemon mode not supported", Some("running in foreground"));
84 }
85 } else if let Some(tx) = events {
86 tx.phase_started(Phase::ApiServer);
87 if !foreground {
88 tx.warning("Daemon mode not supported, running in foreground");
89 }
90 }
91
92 if foreground {
93 systemprompt_api::services::server::run_server(Arc::unwrap_or_clone(ctx), events.cloned())
94 .await?;
95 }
96
97 Ok(format!("http://127.0.0.1:{}", port))
98}
99
100pub async fn execute(foreground: bool, kill_port_process: bool, config: &CliConfig) -> Result<()> {
101 execute_with_events(foreground, kill_port_process, config, None)
102 .await
103 .map(|_| ())
104}
105
106fn check_port_available(port: u16) -> Option<u32> {
107 ProcessCleanup::check_port(port)
108}
109
110fn kill_process(pid: u32) {
111 ProcessCleanup::kill_process(pid);
112}
113
114async fn handle_port_conflict(
115 port: u16,
116 pid: u32,
117 kill_port_process: bool,
118 config: &CliConfig,
119 events: Option<&StartupEventSender>,
120) -> Result<()> {
121 if events.is_none() {
122 CliService::warning(&format!("Port {} is already in use by PID {}", port, pid));
123 }
124
125 let should_kill = kill_port_process
126 || confirm_optional(&format!("Kill process {} and restart?", pid), false, config)?;
127
128 if should_kill {
129 if events.is_none() {
130 CliService::info(&format!("Killing process {}...", pid));
131 }
132 kill_process(pid);
133 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
134
135 if check_port_available(port).is_some() {
136 return Err(anyhow::anyhow!(
137 "Failed to free port {} after killing PID {}",
138 port,
139 pid
140 ));
141 }
142 if events.is_none() {
143 CliService::success(&format!("Port {} is now available", port));
144 }
145 return Ok(());
146 }
147
148 if config.is_interactive() {
149 return Err(anyhow::anyhow!(
150 "Port {} is occupied by PID {}. Aborted by user.",
151 port,
152 pid
153 ));
154 }
155
156 if events.is_none() {
157 CliService::error(&format!("Port {} is already in use by PID {}", port, pid));
158 CliService::info("Use --kill-port-process to terminate the process, or:");
159 CliService::info(" - just api-rebuild (rebuild and restart)");
160 CliService::info(" - just api-nuke (nuclear option - kill everything)");
161 CliService::info(&format!(
162 " - kill {} (manually kill the process)",
163 pid
164 ));
165 }
166 Err(anyhow::anyhow!(
167 "Port {} is occupied by PID {}. Use --kill-port-process to terminate.",
168 port,
169 pid
170 ))
171}
172
173fn register_modules(events: Option<&StartupEventSender>) {
174 let api_registrations: Vec<_> =
175 inventory::iter::<systemprompt_runtime::ModuleApiRegistration>().collect();
176
177 if let Some(tx) = events {
178 let modules: Vec<_> = api_registrations
179 .iter()
180 .map(|r| ModuleInfo {
181 name: r.module_name.to_string(),
182 category: format!("{:?}", r.category),
183 })
184 .collect();
185 tx.modules_loaded(modules.len(), modules);
186 } else {
187 CliService::phase_info(
188 &format!("Loading {} route modules", api_registrations.len()),
189 None,
190 );
191
192 for registration in &api_registrations {
193 let category_name = match registration.category {
194 ServiceCategory::Core => "Core",
195 ServiceCategory::Agent => "Agent",
196 ServiceCategory::Mcp => "Mcp",
197 ServiceCategory::Meta => "Meta",
198 };
199 CliService::phase_success(
200 registration.module_name,
201 Some(&format!("{} routes", category_name)),
202 );
203 }
204 }
205}
206
207async fn run_migrations(ctx: &AppContext, events: Option<&StartupEventSender>) -> Result<()> {
208 let registry = ExtensionRegistry::discover();
209
210 install_extension_schemas(®istry, ctx.db_pool().write_provider())
211 .await
212 .context("Failed to install extension schemas")?;
213
214 if events.is_none() {
215 CliService::phase_success("Database schemas installed", None);
216 }
217
218 Ok(())
219}