systemprompt_cli/commands/infrastructure/services/
serve.rs1use crate::cli_settings::CliConfig;
2use crate::interactive::confirm_optional;
3use anyhow::{Context, Result};
4use std::sync::Arc;
5use systemprompt_config::ProfileBootstrap;
6use systemprompt_logging::CliService;
7use systemprompt_runtime::{AppContext, ServiceCategory, validate_system};
8use systemprompt_scheduler::ProcessCleanup;
9use systemprompt_traits::{ModuleInfo, Phase, StartupEvent, StartupEventExt, StartupEventSender};
10
11const DEFAULT_API_PORT: u16 = 8080;
12
13fn get_api_port() -> u16 {
14 ProfileBootstrap::get().map_or(DEFAULT_API_PORT, |p| p.server.port)
15}
16
17pub async fn execute_with_events(
18 foreground: bool,
19 kill_port_process: bool,
20 config: &CliConfig,
21 events: Option<&StartupEventSender>,
22) -> Result<String> {
23 let port = get_api_port();
24
25 if events.is_none() {
26 CliService::startup_banner(Some("Starting services..."));
27 }
28
29 if let Some(pid) = check_port_available(port) {
30 if let Some(tx) = events {
31 if let Err(e) = tx.unbounded_send(StartupEvent::PortConflict { port, pid }) {
32 tracing::debug!(error = %e, "startup event channel closed: PortConflict");
33 }
34 }
35 handle_port_conflict(port, pid, kill_port_process, config, events).await?;
36 if let Some(tx) = events {
37 if let Err(e) = tx.unbounded_send(StartupEvent::PortConflictResolved { port }) {
38 tracing::debug!(error = %e, "startup event channel closed: PortConflictResolved");
39 }
40 }
41 } else if let Some(tx) = events {
42 tx.port_available(port);
43 } else {
44 CliService::phase_success(&format!("Port {} available", port), None);
45 }
46
47 register_modules(events);
48
49 let ctx = Arc::new(
50 AppContext::builder()
51 .with_startup_warnings(true)
52 .with_migrations(true)
53 .build()
54 .await
55 .context("Failed to initialize application context")?,
56 );
57
58 if events.is_none() {
59 CliService::phase_success("Database schemas installed", None);
60 }
61
62 if let Some(tx) = events {
63 tx.phase_started(Phase::Database);
64 if let Err(e) = tx.unbounded_send(StartupEvent::DatabaseValidated) {
65 tracing::debug!(error = %e, "startup event channel closed: DatabaseValidated");
66 }
67 tx.phase_completed(Phase::Database);
68 } else {
69 CliService::phase("Validation");
70 CliService::phase_info("Running system validation...", None);
71 }
72
73 validate_system(&ctx)
74 .await
75 .context("System validation failed")?;
76
77 if events.is_none() {
78 CliService::phase_success("System validation complete", None);
79 }
80
81 if events.is_none() {
82 CliService::phase("Server");
83 if !foreground {
84 CliService::phase_warning("Daemon mode not supported", Some("running in foreground"));
85 }
86 } else if let Some(tx) = events {
87 tx.phase_started(Phase::ApiServer);
88 if !foreground {
89 tx.warning("Daemon mode not supported, running in foreground");
90 }
91 }
92
93 if foreground {
94 systemprompt_api::services::server::run_server(Arc::unwrap_or_clone(ctx), events.cloned())
95 .await?;
96 }
97
98 Ok(format!("http://127.0.0.1:{}", port))
99}
100
101pub async fn execute(foreground: bool, kill_port_process: bool, config: &CliConfig) -> Result<()> {
102 execute_with_events(foreground, kill_port_process, config, None)
103 .await
104 .map(|_| ())
105}
106
107fn check_port_available(port: u16) -> Option<u32> {
108 ProcessCleanup::check_port(port)
109}
110
111fn kill_process(pid: u32) {
112 ProcessCleanup::kill_process(pid);
113}
114
115async fn handle_port_conflict(
116 port: u16,
117 pid: u32,
118 kill_port_process: bool,
119 config: &CliConfig,
120 events: Option<&StartupEventSender>,
121) -> Result<()> {
122 if events.is_none() {
123 CliService::warning(&format!("Port {} is already in use by PID {}", port, pid));
124 }
125
126 let should_kill = kill_port_process
127 || confirm_optional(&format!("Kill process {} and restart?", pid), false, config)?;
128
129 if should_kill {
130 if events.is_none() {
131 CliService::info(&format!("Killing process {}...", pid));
132 }
133 kill_process(pid);
134 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
135
136 if check_port_available(port).is_some() {
137 return Err(anyhow::anyhow!(
138 "Failed to free port {} after killing PID {}",
139 port,
140 pid
141 ));
142 }
143 if events.is_none() {
144 CliService::success(&format!("Port {} is now available", port));
145 }
146 return Ok(());
147 }
148
149 if config.is_interactive() {
150 return Err(anyhow::anyhow!(
151 "Port {} is occupied by PID {}. Aborted by user.",
152 port,
153 pid
154 ));
155 }
156
157 if events.is_none() {
158 CliService::error(&format!("Port {} is already in use by PID {}", port, pid));
159 CliService::info("Use --kill-port-process to terminate the process, or:");
160 CliService::info(" - just api-rebuild (rebuild and restart)");
161 CliService::info(" - just api-nuke (nuclear option - kill everything)");
162 CliService::info(&format!(
163 " - kill {} (manually kill the process)",
164 pid
165 ));
166 }
167 Err(anyhow::anyhow!(
168 "Port {} is occupied by PID {}. Use --kill-port-process to terminate.",
169 port,
170 pid
171 ))
172}
173
174fn register_modules(events: Option<&StartupEventSender>) {
175 let api_registrations: Vec<_> =
176 inventory::iter::<systemprompt_runtime::ModuleApiRegistration>().collect();
177
178 if let Some(tx) = events {
179 let modules: Vec<_> = api_registrations
180 .iter()
181 .map(|r| ModuleInfo {
182 name: r.module_name.to_string(),
183 category: format!("{:?}", r.category),
184 })
185 .collect();
186 tx.modules_loaded(modules.len(), modules);
187 } else {
188 CliService::phase_info(
189 &format!("Loading {} route modules", api_registrations.len()),
190 None,
191 );
192
193 for registration in &api_registrations {
194 let category_name = match registration.category {
195 ServiceCategory::Core => "Core",
196 ServiceCategory::Agent => "Agent",
197 ServiceCategory::Mcp => "Mcp",
198 ServiceCategory::Meta => "Meta",
199 };
200 CliService::phase_success(
201 registration.module_name,
202 Some(&format!("{} routes", category_name)),
203 );
204 }
205 }
206}