Skip to main content

systemprompt_cli/commands/infrastructure/services/
serve.rs

1use crate::cli_settings::CliConfig;
2use crate::interactive::confirm_optional;
3use anyhow::{Context, Result};
4use std::sync::Arc;
5use systemprompt_database::install_extension_schemas;
6use systemprompt_extension::ExtensionRegistry;
7use systemprompt_logging::CliService;
8use systemprompt_models::ProfileBootstrap;
9use systemprompt_runtime::{validate_system, AppContext, ServiceCategory};
10use systemprompt_scheduler::ProcessCleanup;
11use systemprompt_traits::{ModuleInfo, Phase, StartupEvent, StartupEventExt, StartupEventSender};
12
/// Fallback API port, used by `get_api_port` when `ProfileBootstrap::get()` does not
/// yield a profile to read the configured port from.
const DEFAULT_API_PORT: u16 = 8080;
14
15fn get_api_port() -> u16 {
16    ProfileBootstrap::get()
17        .map(|p| p.server.port)
18        .unwrap_or(DEFAULT_API_PORT)
19}
20
21pub async fn execute_with_events(
22    foreground: bool,
23    kill_port_process: bool,
24    config: &CliConfig,
25    events: Option<&StartupEventSender>,
26) -> Result<String> {
27    let port = get_api_port();
28
29    if events.is_none() {
30        CliService::startup_banner(Some("Starting services..."));
31    }
32
33    if let Some(pid) = check_port_available(port) {
34        if let Some(tx) = events {
35            tx.unbounded_send(StartupEvent::PortConflict { port, pid })
36                .ok();
37        }
38        handle_port_conflict(port, pid, kill_port_process, config, events).await?;
39        if let Some(tx) = events {
40            tx.unbounded_send(StartupEvent::PortConflictResolved { port })
41                .ok();
42        }
43    } else if let Some(tx) = events {
44        tx.port_available(port);
45    } else {
46        CliService::phase_success(&format!("Port {} available", port), None);
47    }
48
49    register_modules(events);
50
51    let ctx = Arc::new(
52        AppContext::builder()
53            .with_startup_warnings(true)
54            .build()
55            .await
56            .context("Failed to initialize application context")?,
57    );
58
59    run_migrations(&ctx, events).await?;
60
61    systemprompt_logging::init_logging(Arc::clone(ctx.db_pool()));
62
63    if let Some(tx) = events {
64        tx.phase_started(Phase::Database);
65        tx.unbounded_send(StartupEvent::DatabaseValidated).ok();
66        tx.phase_completed(Phase::Database);
67    } else {
68        CliService::phase("Validation");
69        CliService::phase_info("Running system validation...", None);
70    }
71
72    validate_system(&ctx)
73        .await
74        .context("System validation failed")?;
75
76    if events.is_none() {
77        CliService::phase_success("System validation complete", None);
78    }
79
80    if events.is_none() {
81        CliService::phase("Server");
82        if !foreground {
83            CliService::phase_warning("Daemon mode not supported", Some("running in foreground"));
84        }
85    } else if let Some(tx) = events {
86        tx.phase_started(Phase::ApiServer);
87        if !foreground {
88            tx.warning("Daemon mode not supported, running in foreground");
89        }
90    }
91
92    if foreground {
93        systemprompt_api::services::server::run_server(Arc::unwrap_or_clone(ctx), events.cloned())
94            .await?;
95    }
96
97    Ok(format!("http://127.0.0.1:{}", port))
98}
99
100pub async fn execute(foreground: bool, kill_port_process: bool, config: &CliConfig) -> Result<()> {
101    execute_with_events(foreground, kill_port_process, config, None)
102        .await
103        .map(|_| ())
104}
105
106fn check_port_available(port: u16) -> Option<u32> {
107    ProcessCleanup::check_port(port)
108}
109
110fn kill_process(pid: u32) {
111    ProcessCleanup::kill_process(pid);
112}
113
114async fn handle_port_conflict(
115    port: u16,
116    pid: u32,
117    kill_port_process: bool,
118    config: &CliConfig,
119    events: Option<&StartupEventSender>,
120) -> Result<()> {
121    if events.is_none() {
122        CliService::warning(&format!("Port {} is already in use by PID {}", port, pid));
123    }
124
125    let should_kill = kill_port_process
126        || confirm_optional(&format!("Kill process {} and restart?", pid), false, config)?;
127
128    if should_kill {
129        if events.is_none() {
130            CliService::info(&format!("Killing process {}...", pid));
131        }
132        kill_process(pid);
133        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
134
135        if check_port_available(port).is_some() {
136            return Err(anyhow::anyhow!(
137                "Failed to free port {} after killing PID {}",
138                port,
139                pid
140            ));
141        }
142        if events.is_none() {
143            CliService::success(&format!("Port {} is now available", port));
144        }
145        return Ok(());
146    }
147
148    if config.is_interactive() {
149        return Err(anyhow::anyhow!(
150            "Port {} is occupied by PID {}. Aborted by user.",
151            port,
152            pid
153        ));
154    }
155
156    if events.is_none() {
157        CliService::error(&format!("Port {} is already in use by PID {}", port, pid));
158        CliService::info("Use --kill-port-process to terminate the process, or:");
159        CliService::info("   - just api-rebuild    (rebuild and restart)");
160        CliService::info("   - just api-nuke       (nuclear option - kill everything)");
161        CliService::info(&format!(
162            "   - kill {}             (manually kill the process)",
163            pid
164        ));
165    }
166    Err(anyhow::anyhow!(
167        "Port {} is occupied by PID {}. Use --kill-port-process to terminate.",
168        port,
169        pid
170    ))
171}
172
173fn register_modules(events: Option<&StartupEventSender>) {
174    let api_registrations: Vec<_> =
175        inventory::iter::<systemprompt_runtime::ModuleApiRegistration>().collect();
176
177    if let Some(tx) = events {
178        let modules: Vec<_> = api_registrations
179            .iter()
180            .map(|r| ModuleInfo {
181                name: r.module_name.to_string(),
182                category: format!("{:?}", r.category),
183            })
184            .collect();
185        tx.modules_loaded(modules.len(), modules);
186    } else {
187        CliService::phase_info(
188            &format!("Loading {} route modules", api_registrations.len()),
189            None,
190        );
191
192        for registration in &api_registrations {
193            let category_name = match registration.category {
194                ServiceCategory::Core => "Core",
195                ServiceCategory::Agent => "Agent",
196                ServiceCategory::Mcp => "Mcp",
197                ServiceCategory::Meta => "Meta",
198            };
199            CliService::phase_success(
200                registration.module_name,
201                Some(&format!("{} routes", category_name)),
202            );
203        }
204    }
205}
206
207async fn run_migrations(ctx: &AppContext, events: Option<&StartupEventSender>) -> Result<()> {
208    let registry = ExtensionRegistry::discover();
209
210    install_extension_schemas(&registry, ctx.db_pool().write_provider())
211        .await
212        .context("Failed to install extension schemas")?;
213
214    if events.is_none() {
215        CliService::phase_success("Database schemas installed", None);
216    }
217
218    Ok(())
219}