systemprompt_cli/commands/infrastructure/services/
serve.rs1use crate::cli_settings::CliConfig;
2use crate::interactive::confirm_optional;
3use anyhow::{Context, Result};
4use std::sync::Arc;
5use systemprompt_database::install_extension_schemas;
6use systemprompt_extension::ExtensionRegistry;
7use systemprompt_logging::CliService;
8use systemprompt_models::ProfileBootstrap;
9use systemprompt_runtime::{AppContext, ServiceCategory, validate_system};
10use systemprompt_scheduler::ProcessCleanup;
11use systemprompt_traits::{ModuleInfo, Phase, StartupEvent, StartupEventExt, StartupEventSender};
12
13const DEFAULT_API_PORT: u16 = 8080;
14
15fn get_api_port() -> u16 {
16 ProfileBootstrap::get().map_or(DEFAULT_API_PORT, |p| p.server.port)
17}
18
19pub async fn execute_with_events(
20 foreground: bool,
21 kill_port_process: bool,
22 config: &CliConfig,
23 events: Option<&StartupEventSender>,
24) -> Result<String> {
25 let port = get_api_port();
26
27 if events.is_none() {
28 CliService::startup_banner(Some("Starting services..."));
29 }
30
31 if let Some(pid) = check_port_available(port) {
32 if let Some(tx) = events {
33 tx.unbounded_send(StartupEvent::PortConflict { port, pid })
34 .ok();
35 }
36 handle_port_conflict(port, pid, kill_port_process, config, events).await?;
37 if let Some(tx) = events {
38 tx.unbounded_send(StartupEvent::PortConflictResolved { port })
39 .ok();
40 }
41 } else if let Some(tx) = events {
42 tx.port_available(port);
43 } else {
44 CliService::phase_success(&format!("Port {} available", port), None);
45 }
46
47 register_modules(events);
48
49 let ctx = Arc::new(
50 AppContext::builder()
51 .with_startup_warnings(true)
52 .build()
53 .await
54 .context("Failed to initialize application context")?,
55 );
56
57 run_migrations(&ctx, events).await?;
58
59 if let Some(tx) = events {
60 tx.phase_started(Phase::Database);
61 tx.unbounded_send(StartupEvent::DatabaseValidated).ok();
62 tx.phase_completed(Phase::Database);
63 } else {
64 CliService::phase("Validation");
65 CliService::phase_info("Running system validation...", None);
66 }
67
68 validate_system(&ctx)
69 .await
70 .context("System validation failed")?;
71
72 if events.is_none() {
73 CliService::phase_success("System validation complete", None);
74 }
75
76 if events.is_none() {
77 CliService::phase("Server");
78 if !foreground {
79 CliService::phase_warning("Daemon mode not supported", Some("running in foreground"));
80 }
81 } else if let Some(tx) = events {
82 tx.phase_started(Phase::ApiServer);
83 if !foreground {
84 tx.warning("Daemon mode not supported, running in foreground");
85 }
86 }
87
88 if foreground {
89 systemprompt_api::services::server::run_server(Arc::unwrap_or_clone(ctx), events.cloned())
90 .await?;
91 }
92
93 Ok(format!("http://127.0.0.1:{}", port))
94}
95
96pub async fn execute(foreground: bool, kill_port_process: bool, config: &CliConfig) -> Result<()> {
97 execute_with_events(foreground, kill_port_process, config, None)
98 .await
99 .map(|_| ())
100}
101
102fn check_port_available(port: u16) -> Option<u32> {
103 ProcessCleanup::check_port(port)
104}
105
106fn kill_process(pid: u32) {
107 ProcessCleanup::kill_process(pid);
108}
109
110async fn handle_port_conflict(
111 port: u16,
112 pid: u32,
113 kill_port_process: bool,
114 config: &CliConfig,
115 events: Option<&StartupEventSender>,
116) -> Result<()> {
117 if events.is_none() {
118 CliService::warning(&format!("Port {} is already in use by PID {}", port, pid));
119 }
120
121 let should_kill = kill_port_process
122 || confirm_optional(&format!("Kill process {} and restart?", pid), false, config)?;
123
124 if should_kill {
125 if events.is_none() {
126 CliService::info(&format!("Killing process {}...", pid));
127 }
128 kill_process(pid);
129 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
130
131 if check_port_available(port).is_some() {
132 return Err(anyhow::anyhow!(
133 "Failed to free port {} after killing PID {}",
134 port,
135 pid
136 ));
137 }
138 if events.is_none() {
139 CliService::success(&format!("Port {} is now available", port));
140 }
141 return Ok(());
142 }
143
144 if config.is_interactive() {
145 return Err(anyhow::anyhow!(
146 "Port {} is occupied by PID {}. Aborted by user.",
147 port,
148 pid
149 ));
150 }
151
152 if events.is_none() {
153 CliService::error(&format!("Port {} is already in use by PID {}", port, pid));
154 CliService::info("Use --kill-port-process to terminate the process, or:");
155 CliService::info(" - just api-rebuild (rebuild and restart)");
156 CliService::info(" - just api-nuke (nuclear option - kill everything)");
157 CliService::info(&format!(
158 " - kill {} (manually kill the process)",
159 pid
160 ));
161 }
162 Err(anyhow::anyhow!(
163 "Port {} is occupied by PID {}. Use --kill-port-process to terminate.",
164 port,
165 pid
166 ))
167}
168
169fn register_modules(events: Option<&StartupEventSender>) {
170 let api_registrations: Vec<_> =
171 inventory::iter::<systemprompt_runtime::ModuleApiRegistration>().collect();
172
173 if let Some(tx) = events {
174 let modules: Vec<_> = api_registrations
175 .iter()
176 .map(|r| ModuleInfo {
177 name: r.module_name.to_string(),
178 category: format!("{:?}", r.category),
179 })
180 .collect();
181 tx.modules_loaded(modules.len(), modules);
182 } else {
183 CliService::phase_info(
184 &format!("Loading {} route modules", api_registrations.len()),
185 None,
186 );
187
188 for registration in &api_registrations {
189 let category_name = match registration.category {
190 ServiceCategory::Core => "Core",
191 ServiceCategory::Agent => "Agent",
192 ServiceCategory::Mcp => "Mcp",
193 ServiceCategory::Meta => "Meta",
194 };
195 CliService::phase_success(
196 registration.module_name,
197 Some(&format!("{} routes", category_name)),
198 );
199 }
200 }
201}
202
203async fn run_migrations(ctx: &AppContext, events: Option<&StartupEventSender>) -> Result<()> {
204 let registry = ExtensionRegistry::discover();
205
206 install_extension_schemas(®istry, ctx.db_pool().write_provider())
207 .await
208 .context("Failed to install extension schemas")?;
209
210 if events.is_none() {
211 CliService::phase_success("Database schemas installed", None);
212 }
213
214 Ok(())
215}