Skip to main content

systemprompt_agent/services/agent_orchestration/
process.rs

1use anyhow::{Context, Result};
2use std::fs::{self, File};
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use systemprompt_models::{
6    AppPaths, CliPaths, Config, ProfileBootstrap, Secrets, SecretsBootstrap,
7};
8
9use crate::services::agent_orchestration::{OrchestrationError, OrchestrationResult};
10
11const MAX_LOG_SIZE: u64 = 10 * 1024 * 1024;
12
13fn rotate_log_if_needed(log_path: &Path) -> Result<()> {
14    if let Ok(metadata) = fs::metadata(log_path) {
15        if metadata.len() > MAX_LOG_SIZE {
16            let backup_path = log_path.with_extension("log.old");
17            fs::rename(log_path, &backup_path)?;
18        }
19    }
20    Ok(())
21}
22
23fn prepare_agent_log_file(agent_name: &str, log_dir: &Path) -> OrchestrationResult<File> {
24    if let Err(e) = fs::create_dir_all(log_dir) {
25        tracing::error!(
26            error = %e,
27            path = %log_dir.display(),
28            "Failed to create agent log directory - agent may fail to start"
29        );
30    }
31
32    let log_file_path = log_dir.join(format!("agent-{}.log", agent_name));
33    if let Err(e) = rotate_log_if_needed(&log_file_path) {
34        tracing::warn!(
35            error = %e,
36            path = %log_file_path.display(),
37            "Failed to rotate agent log file"
38        );
39    }
40
41    fs::OpenOptions::new()
42        .create(true)
43        .append(true)
44        .open(&log_file_path)
45        .map_err(|e| {
46            OrchestrationError::ProcessSpawnFailed(format!(
47                "Failed to create log file {}: {}",
48                log_file_path.display(),
49                e
50            ))
51        })
52}
53
54fn configure_secrets_env(command: &mut Command, secrets: &Secrets) {
55    if let Some(ref key) = secrets.gemini {
56        command.env("GEMINI_API_KEY", key);
57    }
58    if let Some(ref key) = secrets.anthropic {
59        command.env("ANTHROPIC_API_KEY", key);
60    }
61    if let Some(ref key) = secrets.openai {
62        command.env("OPENAI_API_KEY", key);
63    }
64    if let Some(ref key) = secrets.github {
65        command.env("GITHUB_TOKEN", key);
66    }
67
68    if !secrets.custom.is_empty() {
69        let uppercase_keys = secrets.custom_env_var_names();
70        command.env("SYSTEMPROMPT_CUSTOM_SECRETS", uppercase_keys.join(","));
71        for (env_name, value) in secrets.custom_env_vars() {
72            command.env(env_name, value);
73        }
74    }
75}
76
77struct BuildAgentCommandParams<'a> {
78    binary_path: &'a PathBuf,
79    agent_name: &'a str,
80    port: u16,
81    profile_path: &'a str,
82    secrets: &'a Secrets,
83    config: &'a Config,
84    log_file: File,
85}
86
87fn build_agent_command(params: BuildAgentCommandParams<'_>) -> Command {
88    let BuildAgentCommandParams {
89        binary_path,
90        agent_name,
91        port,
92        profile_path,
93        secrets,
94        config,
95        log_file,
96    } = params;
97    let mut command = Command::new(binary_path);
98    for arg in CliPaths::agent_run_args() {
99        command.arg(arg);
100    }
101    command
102        .arg("--agent-name")
103        .arg(agent_name)
104        .arg("--port")
105        .arg(port.to_string())
106        .env_clear()
107        .env(
108            "PATH",
109            std::env::var("PATH").unwrap_or_default(),
110        )
111        .env(
112            "HOME",
113            std::env::var("HOME").unwrap_or_default(),
114        )
115        .env("SYSTEMPROMPT_PROFILE", profile_path)
116        .env("SYSTEMPROMPT_SUBPROCESS", "1")
117        .env("JWT_SECRET", &secrets.jwt_secret)
118        .env("DATABASE_URL", &secrets.database_url)
119        .env("AGENT_NAME", agent_name)
120        .env("AGENT_PORT", port.to_string())
121        .env("DATABASE_TYPE", &config.database_type)
122        .stdout(std::process::Stdio::null())
123        .stderr(std::process::Stdio::from(log_file))
124        .stdin(std::process::Stdio::null());
125
126    if let Ok(fly_app) = std::env::var("FLY_APP_NAME") {
127        command.env("FLY_APP_NAME", fly_app);
128    }
129
130    configure_secrets_env(&mut command, secrets);
131    command
132}
133
134pub fn spawn_detached(agent_name: &str, port: u16) -> OrchestrationResult<u32> {
135    let paths = AppPaths::get()
136        .map_err(|e| OrchestrationError::ProcessSpawnFailed(format!("Failed to get paths: {e}")))?;
137
138    let binary_path = paths.build().resolve_binary("systemprompt").map_err(|e| {
139        OrchestrationError::ProcessSpawnFailed(format!("Failed to find systemprompt binary: {e}"))
140    })?;
141
142    let config = Config::get().map_err(|e| {
143        OrchestrationError::ProcessSpawnFailed(format!("Failed to get config: {e}"))
144    })?;
145
146    let secrets = SecretsBootstrap::get().map_err(|e| {
147        OrchestrationError::ProcessSpawnFailed(format!("Failed to get secrets: {e}"))
148    })?;
149
150    let profile_path = ProfileBootstrap::get_path().map_err(|e| {
151        OrchestrationError::ProcessSpawnFailed(format!("Failed to get profile path: {e}"))
152    })?;
153
154    let log_file = prepare_agent_log_file(agent_name, &paths.system().logs())?;
155
156    let mut command = build_agent_command(BuildAgentCommandParams {
157        binary_path: &binary_path,
158        agent_name,
159        port,
160        profile_path,
161        secrets,
162        config,
163        log_file,
164    });
165
166    let child = command.spawn().map_err(|e| {
167        OrchestrationError::ProcessSpawnFailed(format!("Failed to spawn {agent_name}: {e}"))
168    })?;
169
170    let pid = child.id();
171    std::mem::forget(child);
172
173    if !verify_process_started(pid) {
174        return Err(OrchestrationError::ProcessSpawnFailed(format!(
175            "Agent {} (PID {}) died immediately after spawn",
176            agent_name, pid
177        )));
178    }
179
180    tracing::debug!(pid = %pid, agent_name = %agent_name, "Detached process spawned");
181    Ok(pid)
182}
183
184#[cfg(unix)]
185fn verify_process_started(pid: u32) -> bool {
186    use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};
187    use nix::unistd::Pid;
188
189    match waitpid(Pid::from_raw(pid as i32), Some(WaitPidFlag::WNOHANG)) {
190        Ok(WaitStatus::StillAlive) => true,
191        Ok(_) => false,
192        Err(_) => process_exists(pid),
193    }
194}
195
/// Reports whether the process `pid` is still running (Windows).
///
/// There is no separate post-spawn probe on this platform; the check is
/// delegated to [`process_exists`].
#[cfg(windows)]
fn verify_process_started(pid: u32) -> bool {
    process_exists(pid)
}
200
201#[cfg(unix)]
202pub fn process_exists(pid: u32) -> bool {
203    use nix::sys::signal;
204    use nix::unistd::Pid;
205    signal::kill(Pid::from_raw(pid as i32), None).is_ok()
206}
207
/// Reports whether a process with the given PID exists (Windows).
///
/// Runs `tasklist` filtered on the PID with the header suppressed. The process
/// is considered present when the output is non-blank and does not contain the
/// `INFO: No tasks` notice. Failure to run `tasklist` counts as "not present".
#[cfg(windows)]
pub fn process_exists(pid: u32) -> bool {
    let pid_filter = format!("PID eq {pid}");
    let listing = Command::new("tasklist")
        .args(["/FI", pid_filter.as_str(), "/NH"])
        .output();

    match listing {
        Ok(output) => {
            let text = String::from_utf8_lossy(&output.stdout);
            !(text.contains("INFO: No tasks") || text.trim().is_empty())
        }
        Err(_) => false,
    }
}
219
220#[cfg(unix)]
221pub fn terminate_process(pid: u32) -> Result<()> {
222    use nix::sys::signal::{self, Signal};
223    use nix::unistd::Pid;
224
225    signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM)
226        .with_context(|| format!("Failed to send SIGTERM to PID {pid}"))?;
227
228    Ok(())
229}
230
/// Asks the process `pid` to shut down via `taskkill /PID <pid>` (Windows).
///
/// # Errors
///
/// Fails when `taskkill` cannot be run or exits with a non-success status.
#[cfg(windows)]
pub fn terminate_process(pid: u32) -> Result<()> {
    let pid_arg = pid.to_string();
    let exit_status = Command::new("taskkill")
        .args(["/PID", pid_arg.as_str()])
        .output()
        .with_context(|| format!("Failed to run taskkill for PID {pid}"))?
        .status;

    anyhow::ensure!(exit_status.success(), "taskkill failed for PID {pid}");
    Ok(())
}
243
244#[cfg(unix)]
245pub fn force_kill_process(pid: u32) -> Result<()> {
246    use nix::sys::signal::{self, Signal};
247    use nix::unistd::Pid;
248
249    signal::kill(Pid::from_raw(pid as i32), Signal::SIGKILL)
250        .with_context(|| format!("Failed to send SIGKILL to PID {pid}"))?;
251
252    Ok(())
253}
254
/// Forcibly stops the process `pid` via `taskkill /PID <pid> /F` (Windows).
///
/// # Errors
///
/// Fails when `taskkill` cannot be run or exits with a non-success status.
#[cfg(windows)]
pub fn force_kill_process(pid: u32) -> Result<()> {
    let pid_arg = pid.to_string();
    let exit_status = Command::new("taskkill")
        .args(["/PID", pid_arg.as_str(), "/F"])
        .output()
        .with_context(|| format!("Failed to force-kill PID {pid}"))?
        .status;

    anyhow::ensure!(exit_status.success(), "taskkill /F failed for PID {pid}");
    Ok(())
}
267
268pub async fn terminate_gracefully(pid: u32, timeout_secs: u64) -> Result<()> {
269    if !process_exists(pid) {
270        return Ok(());
271    }
272
273    terminate_process(pid)?;
274
275    let check_interval = tokio::time::Duration::from_millis(100);
276    let max_checks = (timeout_secs * 1000) / 100;
277
278    for _ in 0..max_checks {
279        if !process_exists(pid) {
280            return Ok(());
281        }
282        tokio::time::sleep(check_interval).await;
283    }
284
285    force_kill_process(pid)?;
286
287    for _ in 0..50 {
288        if !process_exists(pid) {
289            return Ok(());
290        }
291        tokio::time::sleep(check_interval).await;
292    }
293
294    Err(anyhow::anyhow!(
295        "Failed to kill process {} even with SIGKILL",
296        pid
297    ))
298}
299
300pub fn kill_process(pid: u32) -> bool {
301    terminate_process(pid).is_ok()
302}
303
/// Reports whether `port` appears to be taken on the IPv4 loopback interface.
///
/// The check tries to bind a listener to `127.0.0.1:<port>` and releases it
/// again immediately. Any bind failure — not only "address in use" — is
/// reported as `true`.
pub fn is_port_in_use(port: u16) -> bool {
    use std::net::{Ipv4Addr, TcpListener};

    match TcpListener::bind((Ipv4Addr::LOCALHOST, port)) {
        // The probe listener is dropped right here, freeing the port again.
        Ok(_probe) => false,
        Err(_) => true,
    }
}
308
309pub fn spawn_detached_process(agent_name: &str, port: u16) -> OrchestrationResult<u32> {
310    spawn_detached(agent_name, port)
311}
312
313pub fn validate_agent_binary() -> Result<()> {
314    let paths = AppPaths::get().map_err(|e| anyhow::anyhow!("{}", e))?;
315    let binary_path = paths.build().resolve_binary("systemprompt")?;
316
317    let metadata = fs::metadata(&binary_path)
318        .with_context(|| format!("Failed to get metadata for: {}", binary_path.display()))?;
319
320    if !metadata.is_file() {
321        return Err(anyhow::anyhow!(
322            "Agent binary is not a file: {}",
323            binary_path.display()
324        ));
325    }
326
327    #[cfg(unix)]
328    {
329        use std::os::unix::fs::PermissionsExt;
330        let permissions = metadata.permissions();
331        if permissions.mode() & 0o111 == 0 {
332            return Err(anyhow::anyhow!(
333                "Agent binary is not executable: {}",
334                binary_path.display()
335            ));
336        }
337    }
338
339    Ok(())
340}