Skip to main content

systemprompt_agent/services/agent_orchestration/process.rs

1use anyhow::{Context, Result};
2use std::fs::{self, File};
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use systemprompt_models::{
6    AppPaths, CliPaths, Config, ProfileBootstrap, Secrets, SecretsBootstrap,
7};
8
9use crate::services::agent_orchestration::{OrchestrationError, OrchestrationResult};
10
/// Size threshold (10 MiB) above which an agent log file is rotated.
const MAX_LOG_SIZE: u64 = 10 * 1024 * 1024;

/// Moves `log_path` aside once it has grown beyond [`MAX_LOG_SIZE`] bytes.
///
/// The backup is the same path with its extension replaced by `log.old`
/// (`agent-x.log` -> `agent-x.log.old`). `fs::rename` replaces an existing backup,
/// so only one previous generation is kept. A file whose metadata cannot be read
/// (for example because it does not exist yet) is left alone and is not an error.
///
/// # Errors
/// Returns the I/O error if the rename itself fails.
fn rotate_log_if_needed(log_path: &Path) -> Result<()> {
    let needs_rotation = fs::metadata(log_path)
        .map(|meta| meta.len() > MAX_LOG_SIZE)
        .unwrap_or(false);

    if !needs_rotation {
        return Ok(());
    }

    fs::rename(log_path, log_path.with_extension("log.old"))?;
    Ok(())
}
22
23fn prepare_agent_log_file(agent_name: &str, log_dir: &Path) -> OrchestrationResult<File> {
24    if let Err(e) = fs::create_dir_all(log_dir) {
25        tracing::error!(
26            error = %e,
27            path = %log_dir.display(),
28            "Failed to create agent log directory - agent may fail to start"
29        );
30    }
31
32    let log_file_path = log_dir.join(format!("agent-{}.log", agent_name));
33    if let Err(e) = rotate_log_if_needed(&log_file_path) {
34        tracing::warn!(
35            error = %e,
36            path = %log_file_path.display(),
37            "Failed to rotate agent log file"
38        );
39    }
40
41    fs::OpenOptions::new()
42        .create(true)
43        .append(true)
44        .open(&log_file_path)
45        .map_err(|e| {
46            OrchestrationError::ProcessSpawnFailed(format!(
47                "Failed to create log file {}: {}",
48                log_file_path.display(),
49                e
50            ))
51        })
52}
53
54fn configure_secrets_env(command: &mut Command, secrets: &Secrets) {
55    if let Some(ref key) = secrets.gemini {
56        command.env("GEMINI_API_KEY", key);
57    }
58    if let Some(ref key) = secrets.anthropic {
59        command.env("ANTHROPIC_API_KEY", key);
60    }
61    if let Some(ref key) = secrets.openai {
62        command.env("OPENAI_API_KEY", key);
63    }
64    if let Some(ref key) = secrets.github {
65        command.env("GITHUB_TOKEN", key);
66    }
67
68    if !secrets.custom.is_empty() {
69        let uppercase_keys = secrets.custom_env_var_names();
70        command.env("SYSTEMPROMPT_CUSTOM_SECRETS", uppercase_keys.join(","));
71        for (env_name, value) in secrets.custom_env_vars() {
72            command.env(env_name, value);
73        }
74    }
75}
76
77struct BuildAgentCommandParams<'a> {
78    binary_path: &'a PathBuf,
79    agent_name: &'a str,
80    port: u16,
81    profile_path: &'a str,
82    secrets: &'a Secrets,
83    config: &'a Config,
84    log_file: File,
85}
86
87fn build_agent_command(params: BuildAgentCommandParams<'_>) -> Command {
88    let BuildAgentCommandParams {
89        binary_path,
90        agent_name,
91        port,
92        profile_path,
93        secrets,
94        config,
95        log_file,
96    } = params;
97    let mut command = Command::new(binary_path);
98    for arg in CliPaths::agent_run_args() {
99        command.arg(arg);
100    }
101    command
102        .arg("--agent-name")
103        .arg(agent_name)
104        .arg("--port")
105        .arg(port.to_string())
106        .env_clear();
107    if let Ok(path) = std::env::var("PATH") {
108        command.env("PATH", path);
109    }
110    if let Ok(home) = std::env::var("HOME") {
111        command.env("HOME", home);
112    }
113    command
114        .env("SYSTEMPROMPT_PROFILE", profile_path)
115        .env("SYSTEMPROMPT_SUBPROCESS", "1")
116        .env("JWT_SECRET", &secrets.jwt_secret)
117        .env("DATABASE_URL", &secrets.database_url)
118        .env("AGENT_NAME", agent_name)
119        .env("AGENT_PORT", port.to_string())
120        .env("DATABASE_TYPE", &config.database_type)
121        .stdout(std::process::Stdio::null())
122        .stderr(std::process::Stdio::from(log_file))
123        .stdin(std::process::Stdio::null());
124
125    if let Ok(fly_app) = std::env::var("FLY_APP_NAME") {
126        command.env("FLY_APP_NAME", fly_app);
127    }
128
129    configure_secrets_env(&mut command, secrets);
130    command
131}
132
133pub fn spawn_detached(agent_name: &str, port: u16) -> OrchestrationResult<u32> {
134    let paths = AppPaths::get()
135        .map_err(|e| OrchestrationError::ProcessSpawnFailed(format!("Failed to get paths: {e}")))?;
136
137    let binary_path = paths.build().resolve_binary("systemprompt").map_err(|e| {
138        OrchestrationError::ProcessSpawnFailed(format!("Failed to find systemprompt binary: {e}"))
139    })?;
140
141    let config = Config::get().map_err(|e| {
142        OrchestrationError::ProcessSpawnFailed(format!("Failed to get config: {e}"))
143    })?;
144
145    let secrets = SecretsBootstrap::get().map_err(|e| {
146        OrchestrationError::ProcessSpawnFailed(format!("Failed to get secrets: {e}"))
147    })?;
148
149    let profile_path = ProfileBootstrap::get_path().map_err(|e| {
150        OrchestrationError::ProcessSpawnFailed(format!("Failed to get profile path: {e}"))
151    })?;
152
153    let log_file = prepare_agent_log_file(agent_name, &paths.system().logs())?;
154
155    let mut command = build_agent_command(BuildAgentCommandParams {
156        binary_path: &binary_path,
157        agent_name,
158        port,
159        profile_path,
160        secrets,
161        config,
162        log_file,
163    });
164
165    let child = command.spawn().map_err(|e| {
166        OrchestrationError::ProcessSpawnFailed(format!("Failed to spawn {agent_name}: {e}"))
167    })?;
168
169    let pid = child.id();
170    std::mem::forget(child);
171
172    if !verify_process_started(pid) {
173        return Err(OrchestrationError::ProcessSpawnFailed(format!(
174            "Agent {} (PID {}) died immediately after spawn",
175            agent_name, pid
176        )));
177    }
178
179    tracing::debug!(pid = %pid, agent_name = %agent_name, "Detached process spawned");
180    Ok(pid)
181}
182
183#[cfg(unix)]
184fn verify_process_started(pid: u32) -> bool {
185    use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};
186    use nix::unistd::Pid;
187
188    match waitpid(Pid::from_raw(pid as i32), Some(WaitPidFlag::WNOHANG)) {
189        Ok(WaitStatus::StillAlive) => true,
190        Ok(_) => false,
191        Err(_) => process_exists(pid),
192    }
193}
194
195#[cfg(windows)]
196fn verify_process_started(pid: u32) -> bool {
197    process_exists(pid)
198}
199
200#[cfg(unix)]
201pub fn process_exists(pid: u32) -> bool {
202    use nix::sys::signal;
203    use nix::unistd::Pid;
204    signal::kill(Pid::from_raw(pid as i32), None).is_ok()
205}
206
207#[cfg(windows)]
208pub fn process_exists(pid: u32) -> bool {
209    Command::new("tasklist")
210        .args(["/FI", &format!("PID eq {}", pid), "/NH"])
211        .output()
212        .map(|o| {
213            let stdout = String::from_utf8_lossy(&o.stdout);
214            !stdout.contains("INFO: No tasks") && !stdout.trim().is_empty()
215        })
216        .unwrap_or(false)
217}
218
219#[cfg(unix)]
220pub fn terminate_process(pid: u32) -> Result<()> {
221    use nix::sys::signal::{self, Signal};
222    use nix::unistd::Pid;
223
224    signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM)
225        .with_context(|| format!("Failed to send SIGTERM to PID {pid}"))?;
226
227    Ok(())
228}
229
230#[cfg(windows)]
231pub fn terminate_process(pid: u32) -> Result<()> {
232    let output = Command::new("taskkill")
233        .args(["/PID", &pid.to_string()])
234        .output()
235        .with_context(|| format!("Failed to run taskkill for PID {pid}"))?;
236
237    if !output.status.success() {
238        anyhow::bail!("taskkill failed for PID {pid}");
239    }
240    Ok(())
241}
242
243#[cfg(unix)]
244pub fn force_kill_process(pid: u32) -> Result<()> {
245    use nix::sys::signal::{self, Signal};
246    use nix::unistd::Pid;
247
248    signal::kill(Pid::from_raw(pid as i32), Signal::SIGKILL)
249        .with_context(|| format!("Failed to send SIGKILL to PID {pid}"))?;
250
251    Ok(())
252}
253
254#[cfg(windows)]
255pub fn force_kill_process(pid: u32) -> Result<()> {
256    let output = Command::new("taskkill")
257        .args(["/PID", &pid.to_string(), "/F"])
258        .output()
259        .with_context(|| format!("Failed to force-kill PID {pid}"))?;
260
261    if !output.status.success() {
262        anyhow::bail!("taskkill /F failed for PID {pid}");
263    }
264    Ok(())
265}
266
267pub async fn terminate_gracefully(pid: u32, timeout_secs: u64) -> Result<()> {
268    if !process_exists(pid) {
269        return Ok(());
270    }
271
272    terminate_process(pid)?;
273
274    let check_interval = tokio::time::Duration::from_millis(100);
275    let max_checks = (timeout_secs * 1000) / 100;
276
277    for _ in 0..max_checks {
278        if !process_exists(pid) {
279            return Ok(());
280        }
281        tokio::time::sleep(check_interval).await;
282    }
283
284    force_kill_process(pid)?;
285
286    for _ in 0..50 {
287        if !process_exists(pid) {
288            return Ok(());
289        }
290        tokio::time::sleep(check_interval).await;
291    }
292
293    Err(anyhow::anyhow!(
294        "Failed to kill process {} even with SIGKILL",
295        pid
296    ))
297}
298
299pub fn kill_process(pid: u32) -> bool {
300    terminate_process(pid).is_ok()
301}
302
/// Returns `true` if a TCP listener cannot be bound on `127.0.0.1:<port>`.
///
/// Any bind failure counts as "in use", not only `AddrInUse`. The probe listener is
/// closed immediately when it goes out of scope.
pub fn is_port_in_use(port: u16) -> bool {
    use std::net::{Ipv4Addr, TcpListener};

    let probe = TcpListener::bind((Ipv4Addr::LOCALHOST, port));
    probe.is_err()
}
307
308pub fn spawn_detached_process(agent_name: &str, port: u16) -> OrchestrationResult<u32> {
309    spawn_detached(agent_name, port)
310}
311
312pub fn validate_agent_binary() -> Result<()> {
313    let paths = AppPaths::get().map_err(|e| anyhow::anyhow!("{}", e))?;
314    let binary_path = paths.build().resolve_binary("systemprompt")?;
315
316    let metadata = fs::metadata(&binary_path)
317        .with_context(|| format!("Failed to get metadata for: {}", binary_path.display()))?;
318
319    if !metadata.is_file() {
320        return Err(anyhow::anyhow!(
321            "Agent binary is not a file: {}",
322            binary_path.display()
323        ));
324    }
325
326    #[cfg(unix)]
327    {
328        use std::os::unix::fs::PermissionsExt;
329        let permissions = metadata.permissions();
330        if permissions.mode() & 0o111 == 0 {
331            return Err(anyhow::anyhow!(
332                "Agent binary is not executable: {}",
333                binary_path.display()
334            ));
335        }
336    }
337
338    Ok(())
339}