Skip to main content

systemprompt_cli/commands/infrastructure/services/
serve.rs

1use crate::cli_settings::CliConfig;
2use crate::interactive::confirm_optional;
3use anyhow::{Context, Result};
4use std::sync::Arc;
5use systemprompt_database::install_extension_schemas;
6use systemprompt_extension::ExtensionRegistry;
7use systemprompt_logging::CliService;
8use systemprompt_models::ProfileBootstrap;
9use systemprompt_runtime::{AppContext, ServiceCategory, validate_system};
10use systemprompt_scheduler::ProcessCleanup;
11use systemprompt_traits::{ModuleInfo, Phase, StartupEvent, StartupEventExt, StartupEventSender};
12
13const DEFAULT_API_PORT: u16 = 8080;
14
15fn get_api_port() -> u16 {
16    ProfileBootstrap::get().map_or(DEFAULT_API_PORT, |p| p.server.port)
17}
18
19pub async fn execute_with_events(
20    foreground: bool,
21    kill_port_process: bool,
22    config: &CliConfig,
23    events: Option<&StartupEventSender>,
24) -> Result<String> {
25    let port = get_api_port();
26
27    if events.is_none() {
28        CliService::startup_banner(Some("Starting services..."));
29    }
30
31    if let Some(pid) = check_port_available(port) {
32        if let Some(tx) = events {
33            tx.unbounded_send(StartupEvent::PortConflict { port, pid })
34                .ok();
35        }
36        handle_port_conflict(port, pid, kill_port_process, config, events).await?;
37        if let Some(tx) = events {
38            tx.unbounded_send(StartupEvent::PortConflictResolved { port })
39                .ok();
40        }
41    } else if let Some(tx) = events {
42        tx.port_available(port);
43    } else {
44        CliService::phase_success(&format!("Port {} available", port), None);
45    }
46
47    register_modules(events);
48
49    let ctx = Arc::new(
50        AppContext::builder()
51            .with_startup_warnings(true)
52            .build()
53            .await
54            .context("Failed to initialize application context")?,
55    );
56
57    run_migrations(&ctx, events).await?;
58
59    if let Some(tx) = events {
60        tx.phase_started(Phase::Database);
61        tx.unbounded_send(StartupEvent::DatabaseValidated).ok();
62        tx.phase_completed(Phase::Database);
63    } else {
64        CliService::phase("Validation");
65        CliService::phase_info("Running system validation...", None);
66    }
67
68    validate_system(&ctx)
69        .await
70        .context("System validation failed")?;
71
72    if events.is_none() {
73        CliService::phase_success("System validation complete", None);
74    }
75
76    if events.is_none() {
77        CliService::phase("Server");
78        if !foreground {
79            CliService::phase_warning("Daemon mode not supported", Some("running in foreground"));
80        }
81    } else if let Some(tx) = events {
82        tx.phase_started(Phase::ApiServer);
83        if !foreground {
84            tx.warning("Daemon mode not supported, running in foreground");
85        }
86    }
87
88    if foreground {
89        systemprompt_api::services::server::run_server(Arc::unwrap_or_clone(ctx), events.cloned())
90            .await?;
91    }
92
93    Ok(format!("http://127.0.0.1:{}", port))
94}
95
96pub async fn execute(foreground: bool, kill_port_process: bool, config: &CliConfig) -> Result<()> {
97    execute_with_events(foreground, kill_port_process, config, None)
98        .await
99        .map(|_| ())
100}
101
102fn check_port_available(port: u16) -> Option<u32> {
103    ProcessCleanup::check_port(port)
104}
105
106fn kill_process(pid: u32) {
107    ProcessCleanup::kill_process(pid);
108}
109
110async fn handle_port_conflict(
111    port: u16,
112    pid: u32,
113    kill_port_process: bool,
114    config: &CliConfig,
115    events: Option<&StartupEventSender>,
116) -> Result<()> {
117    if events.is_none() {
118        CliService::warning(&format!("Port {} is already in use by PID {}", port, pid));
119    }
120
121    let should_kill = kill_port_process
122        || confirm_optional(&format!("Kill process {} and restart?", pid), false, config)?;
123
124    if should_kill {
125        if events.is_none() {
126            CliService::info(&format!("Killing process {}...", pid));
127        }
128        kill_process(pid);
129        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
130
131        if check_port_available(port).is_some() {
132            return Err(anyhow::anyhow!(
133                "Failed to free port {} after killing PID {}",
134                port,
135                pid
136            ));
137        }
138        if events.is_none() {
139            CliService::success(&format!("Port {} is now available", port));
140        }
141        return Ok(());
142    }
143
144    if config.is_interactive() {
145        return Err(anyhow::anyhow!(
146            "Port {} is occupied by PID {}. Aborted by user.",
147            port,
148            pid
149        ));
150    }
151
152    if events.is_none() {
153        CliService::error(&format!("Port {} is already in use by PID {}", port, pid));
154        CliService::info("Use --kill-port-process to terminate the process, or:");
155        CliService::info("   - just api-rebuild    (rebuild and restart)");
156        CliService::info("   - just api-nuke       (nuclear option - kill everything)");
157        CliService::info(&format!(
158            "   - kill {}             (manually kill the process)",
159            pid
160        ));
161    }
162    Err(anyhow::anyhow!(
163        "Port {} is occupied by PID {}. Use --kill-port-process to terminate.",
164        port,
165        pid
166    ))
167}
168
169fn register_modules(events: Option<&StartupEventSender>) {
170    let api_registrations: Vec<_> =
171        inventory::iter::<systemprompt_runtime::ModuleApiRegistration>().collect();
172
173    if let Some(tx) = events {
174        let modules: Vec<_> = api_registrations
175            .iter()
176            .map(|r| ModuleInfo {
177                name: r.module_name.to_string(),
178                category: format!("{:?}", r.category),
179            })
180            .collect();
181        tx.modules_loaded(modules.len(), modules);
182    } else {
183        CliService::phase_info(
184            &format!("Loading {} route modules", api_registrations.len()),
185            None,
186        );
187
188        for registration in &api_registrations {
189            let category_name = match registration.category {
190                ServiceCategory::Core => "Core",
191                ServiceCategory::Agent => "Agent",
192                ServiceCategory::Mcp => "Mcp",
193                ServiceCategory::Meta => "Meta",
194            };
195            CliService::phase_success(
196                registration.module_name,
197                Some(&format!("{} routes", category_name)),
198            );
199        }
200    }
201}
202
203async fn run_migrations(ctx: &AppContext, events: Option<&StartupEventSender>) -> Result<()> {
204    let registry = ExtensionRegistry::discover();
205
206    install_extension_schemas(&registry, ctx.db_pool().write_provider())
207        .await
208        .context("Failed to install extension schemas")?;
209
210    if events.is_none() {
211        CliService::phase_success("Database schemas installed", None);
212    }
213
214    Ok(())
215}