Skip to main content

systemprompt_cli/commands/infrastructure/services/
serve.rs

1use crate::cli_settings::CliConfig;
2use crate::interactive::confirm_optional;
3use anyhow::{Context, Result};
4use std::sync::Arc;
5use systemprompt_config::ProfileBootstrap;
6use systemprompt_logging::CliService;
7use systemprompt_runtime::{AppContext, ServiceCategory, validate_system};
8use systemprompt_scheduler::ProcessCleanup;
9use systemprompt_traits::{ModuleInfo, Phase, StartupEvent, StartupEventExt, StartupEventSender};
10
/// Port used for the API server when `ProfileBootstrap::get()` does not
/// yield a profile to read `server.port` from.
const DEFAULT_API_PORT: u16 = 8080;
12
13fn get_api_port() -> u16 {
14    ProfileBootstrap::get().map_or(DEFAULT_API_PORT, |p| p.server.port)
15}
16
17pub async fn execute_with_events(
18    foreground: bool,
19    kill_port_process: bool,
20    config: &CliConfig,
21    events: Option<&StartupEventSender>,
22) -> Result<String> {
23    let port = get_api_port();
24
25    if events.is_none() {
26        CliService::startup_banner(Some("Starting services..."));
27    }
28
29    if let Some(pid) = check_port_available(port) {
30        if let Some(tx) = events {
31            if let Err(e) = tx.unbounded_send(StartupEvent::PortConflict { port, pid }) {
32                tracing::debug!(error = %e, "startup event channel closed: PortConflict");
33            }
34        }
35        handle_port_conflict(port, pid, kill_port_process, config, events).await?;
36        if let Some(tx) = events {
37            if let Err(e) = tx.unbounded_send(StartupEvent::PortConflictResolved { port }) {
38                tracing::debug!(error = %e, "startup event channel closed: PortConflictResolved");
39            }
40        }
41    } else if let Some(tx) = events {
42        tx.port_available(port);
43    } else {
44        CliService::phase_success(&format!("Port {} available", port), None);
45    }
46
47    register_modules(events);
48
49    let ctx = Arc::new(
50        AppContext::builder()
51            .with_startup_warnings(true)
52            .with_migrations(true)
53            .build()
54            .await
55            .context("Failed to initialize application context")?,
56    );
57
58    if events.is_none() {
59        CliService::phase_success("Database schemas installed", None);
60    }
61
62    if let Some(tx) = events {
63        tx.phase_started(Phase::Database);
64        if let Err(e) = tx.unbounded_send(StartupEvent::DatabaseValidated) {
65            tracing::debug!(error = %e, "startup event channel closed: DatabaseValidated");
66        }
67        tx.phase_completed(Phase::Database);
68    } else {
69        CliService::phase("Validation");
70        CliService::phase_info("Running system validation...", None);
71    }
72
73    validate_system(&ctx)
74        .await
75        .context("System validation failed")?;
76
77    if events.is_none() {
78        CliService::phase_success("System validation complete", None);
79    }
80
81    if events.is_none() {
82        CliService::phase("Server");
83        if !foreground {
84            CliService::phase_warning("Daemon mode not supported", Some("running in foreground"));
85        }
86    } else if let Some(tx) = events {
87        tx.phase_started(Phase::ApiServer);
88        if !foreground {
89            tx.warning("Daemon mode not supported, running in foreground");
90        }
91    }
92
93    if foreground {
94        systemprompt_api::services::server::run_server(Arc::unwrap_or_clone(ctx), events.cloned())
95            .await?;
96    }
97
98    Ok(format!("http://127.0.0.1:{}", port))
99}
100
101pub async fn execute(foreground: bool, kill_port_process: bool, config: &CliConfig) -> Result<()> {
102    execute_with_events(foreground, kill_port_process, config, None)
103        .await
104        .map(|_| ())
105}
106
/// Forwards to `ProcessCleanup::check_port` for `port`.
///
/// Despite the name, callers in this file treat `Some(pid)` as "the port is
/// occupied by process `pid`" and `None` as "the port is free".
fn check_port_available(port: u16) -> Option<u32> {
    ProcessCleanup::check_port(port)
}
110
/// Forwards to `ProcessCleanup::kill_process` for `pid`.
///
/// Nothing is returned, so the caller cannot tell from this call whether the
/// process actually went away; `handle_port_conflict` re-checks the port
/// afterwards instead.
fn kill_process(pid: u32) {
    ProcessCleanup::kill_process(pid);
}
114
115async fn handle_port_conflict(
116    port: u16,
117    pid: u32,
118    kill_port_process: bool,
119    config: &CliConfig,
120    events: Option<&StartupEventSender>,
121) -> Result<()> {
122    if events.is_none() {
123        CliService::warning(&format!("Port {} is already in use by PID {}", port, pid));
124    }
125
126    let should_kill = kill_port_process
127        || confirm_optional(&format!("Kill process {} and restart?", pid), false, config)?;
128
129    if should_kill {
130        if events.is_none() {
131            CliService::info(&format!("Killing process {}...", pid));
132        }
133        kill_process(pid);
134        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
135
136        if check_port_available(port).is_some() {
137            return Err(anyhow::anyhow!(
138                "Failed to free port {} after killing PID {}",
139                port,
140                pid
141            ));
142        }
143        if events.is_none() {
144            CliService::success(&format!("Port {} is now available", port));
145        }
146        return Ok(());
147    }
148
149    if config.is_interactive() {
150        return Err(anyhow::anyhow!(
151            "Port {} is occupied by PID {}. Aborted by user.",
152            port,
153            pid
154        ));
155    }
156
157    if events.is_none() {
158        CliService::error(&format!("Port {} is already in use by PID {}", port, pid));
159        CliService::info("Use --kill-port-process to terminate the process, or:");
160        CliService::info("   - just api-rebuild    (rebuild and restart)");
161        CliService::info("   - just api-nuke       (nuclear option - kill everything)");
162        CliService::info(&format!(
163            "   - kill {}             (manually kill the process)",
164            pid
165        ));
166    }
167    Err(anyhow::anyhow!(
168        "Port {} is occupied by PID {}. Use --kill-port-process to terminate.",
169        port,
170        pid
171    ))
172}
173
174fn register_modules(events: Option<&StartupEventSender>) {
175    let api_registrations: Vec<_> =
176        inventory::iter::<systemprompt_runtime::ModuleApiRegistration>().collect();
177
178    if let Some(tx) = events {
179        let modules: Vec<_> = api_registrations
180            .iter()
181            .map(|r| ModuleInfo {
182                name: r.module_name.to_string(),
183                category: format!("{:?}", r.category),
184            })
185            .collect();
186        tx.modules_loaded(modules.len(), modules);
187    } else {
188        CliService::phase_info(
189            &format!("Loading {} route modules", api_registrations.len()),
190            None,
191        );
192
193        for registration in &api_registrations {
194            let category_name = match registration.category {
195                ServiceCategory::Core => "Core",
196                ServiceCategory::Agent => "Agent",
197                ServiceCategory::Mcp => "Mcp",
198                ServiceCategory::Meta => "Meta",
199            };
200            CliService::phase_success(
201                registration.module_name,
202                Some(&format!("{} routes", category_name)),
203            );
204        }
205    }
206}