Skip to main content

systemprompt_agent/services/agent_orchestration/
process.rs

1use anyhow::{Context, Result};
2use std::fs;
3use std::path::Path;
4use std::process::Command;
5use systemprompt_models::{AppPaths, CliPaths, Config, ProfileBootstrap, SecretsBootstrap};
6
7use crate::services::agent_orchestration::{OrchestrationError, OrchestrationResult};
8
/// Size threshold, in bytes (10 MiB), above which an agent log is rotated.
const MAX_LOG_SIZE: u64 = 10 * 1024 * 1024;

/// Renames `log_path` aside (extension replaced by `log.old`) once it grows
/// beyond [`MAX_LOG_SIZE`].
///
/// A log whose metadata cannot be read (e.g. it does not exist yet) is left
/// alone and treated as success. Only a failed rename is reported as an error;
/// per `fs::rename`, a previous backup at the target path is replaced.
fn rotate_log_if_needed(log_path: &Path) -> Result<()> {
    let too_big = fs::metadata(log_path)
        .map(|meta| meta.len() > MAX_LOG_SIZE)
        .unwrap_or(false);

    if too_big {
        let rotated = log_path.with_extension("log.old");
        fs::rename(log_path, &rotated)?;
    }

    Ok(())
}
20
21pub async fn spawn_detached(agent_name: &str, port: u16) -> OrchestrationResult<u32> {
22    let paths = AppPaths::get()
23        .map_err(|e| OrchestrationError::ProcessSpawnFailed(format!("Failed to get paths: {e}")))?;
24
25    let binary_path = paths.build().resolve_binary("systemprompt").map_err(|e| {
26        OrchestrationError::ProcessSpawnFailed(format!("Failed to find systemprompt binary: {e}"))
27    })?;
28
29    let config = Config::get().map_err(|e| {
30        OrchestrationError::ProcessSpawnFailed(format!("Failed to get config: {e}"))
31    })?;
32
33    let log_dir = paths.system().logs();
34    if let Err(e) = fs::create_dir_all(&log_dir) {
35        tracing::error!(
36            error = %e,
37            path = %log_dir.display(),
38            "Failed to create agent log directory - agent may fail to start"
39        );
40    }
41
42    let log_file_path = log_dir.join(format!("agent-{}.log", agent_name));
43    if let Err(e) = rotate_log_if_needed(&log_file_path) {
44        tracing::warn!(
45            error = %e,
46            path = %log_file_path.display(),
47            "Failed to rotate agent log file"
48        );
49    }
50
51    let log_file = fs::OpenOptions::new()
52        .create(true)
53        .append(true)
54        .open(&log_file_path)
55        .map_err(|e| {
56            OrchestrationError::ProcessSpawnFailed(format!(
57                "Failed to create log file {}: {}",
58                log_file_path.display(),
59                e
60            ))
61        })?;
62
63    let secrets = SecretsBootstrap::get().map_err(|e| {
64        OrchestrationError::ProcessSpawnFailed(format!("Failed to get secrets: {e}"))
65    })?;
66
67    let profile_path = ProfileBootstrap::get_path().map_err(|e| {
68        OrchestrationError::ProcessSpawnFailed(format!("Failed to get profile path: {e}"))
69    })?;
70
71    let mut command = Command::new(&binary_path);
72    for arg in CliPaths::agent_run_args() {
73        command.arg(arg);
74    }
75    command
76        .arg("--agent-name")
77        .arg(agent_name)
78        .arg("--port")
79        .arg(port.to_string())
80        .envs(std::env::vars())
81        .env("SYSTEMPROMPT_PROFILE", profile_path)
82        .env("JWT_SECRET", &secrets.jwt_secret)
83        .env("DATABASE_URL", &secrets.database_url)
84        .env("AGENT_NAME", agent_name)
85        .env("AGENT_PORT", port.to_string())
86        .env("DATABASE_TYPE", &config.database_type)
87        .stdout(std::process::Stdio::null())
88        .stderr(std::process::Stdio::from(log_file))
89        .stdin(std::process::Stdio::null());
90
91    if let Some(ref key) = secrets.gemini {
92        command.env("GEMINI_API_KEY", key);
93    }
94    if let Some(ref key) = secrets.anthropic {
95        command.env("ANTHROPIC_API_KEY", key);
96    }
97    if let Some(ref key) = secrets.openai {
98        command.env("OPENAI_API_KEY", key);
99    }
100    if let Some(ref key) = secrets.github {
101        command.env("GITHUB_TOKEN", key);
102    }
103
104    if !secrets.custom.is_empty() {
105        let custom_keys: Vec<&str> = secrets.custom.keys().map(String::as_str).collect();
106        command.env("SYSTEMPROMPT_CUSTOM_SECRETS", custom_keys.join(","));
107        for (key, value) in &secrets.custom {
108            command.env(key, value);
109        }
110    }
111
112    let child = command.spawn().map_err(|e| {
113        OrchestrationError::ProcessSpawnFailed(format!("Failed to spawn {agent_name}: {e}"))
114    })?;
115
116    let pid = child.id();
117
118    std::mem::forget(child);
119
120    if !verify_process_started(pid) {
121        return Err(OrchestrationError::ProcessSpawnFailed(format!(
122            "Agent {} (PID {}) died immediately after spawn",
123            agent_name, pid
124        )));
125    }
126
127    tracing::debug!(pid = %pid, agent_name = %agent_name, "Detached process spawned");
128    Ok(pid)
129}
130
131fn verify_process_started(pid: u32) -> bool {
132    use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus};
133    use nix::unistd::Pid;
134
135    match waitpid(Pid::from_raw(pid as i32), Some(WaitPidFlag::WNOHANG)) {
136        Ok(WaitStatus::StillAlive) => true,
137        Ok(_) => false,
138        Err(_) => process_exists(pid),
139    }
140}
141
/// Reports whether a live (non-zombie) process with the given `pid` exists.
///
/// Linux-specific: this inspects procfs.
///
/// A process that has exited but has not been reaped by its parent keeps its
/// `/proc/<pid>` entry. Checking only for that directory would report such a
/// zombie as still running. The state field of `/proc/<pid>/stat` is
/// therefore examined:
/// - zombie (`Z`) and dead (`X`/`x`) entries count as gone;
/// - if `stat` cannot be read, existence of `/proc/<pid>` decides;
/// - if the state field cannot be parsed, the process counts as present.
pub fn process_exists(pid: u32) -> bool {
    let proc_dir = Path::new("/proc").join(pid.to_string());

    match fs::read_to_string(proc_dir.join("stat")) {
        Ok(stat) => {
            // Layout is `pid (comm) state ...`. `comm` may itself contain ')',
            // so the state is the first character after the *last* ')'.
            let state = stat
                .rfind(')')
                .and_then(|end| stat[end + 1..].trim_start().chars().next());
            !matches!(state, Some('Z' | 'X' | 'x'))
        }
        Err(_) => proc_dir.exists(),
    }
}
145
146pub fn terminate_process(pid: u32) -> Result<()> {
147    use nix::sys::signal::{self, Signal};
148    use nix::unistd::Pid;
149
150    signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM)
151        .with_context(|| format!("Failed to send SIGTERM to PID {pid}"))?;
152
153    Ok(())
154}
155
156pub fn force_kill_process(pid: u32) -> Result<()> {
157    use nix::sys::signal::{self, Signal};
158    use nix::unistd::Pid;
159
160    signal::kill(Pid::from_raw(pid as i32), Signal::SIGKILL)
161        .with_context(|| format!("Failed to send SIGKILL to PID {pid}"))?;
162
163    Ok(())
164}
165
166pub async fn terminate_gracefully(pid: u32, timeout_secs: u64) -> Result<()> {
167    terminate_process(pid)?;
168
169    let check_interval = tokio::time::Duration::from_millis(100);
170    let max_checks = (timeout_secs * 1000) / 100;
171
172    for _ in 0..max_checks {
173        if !process_exists(pid) {
174            return Ok(());
175        }
176        tokio::time::sleep(check_interval).await;
177    }
178
179    force_kill_process(pid)?;
180
181    for _ in 0..50 {
182        if !process_exists(pid) {
183            return Ok(());
184        }
185        tokio::time::sleep(check_interval).await;
186    }
187
188    Err(anyhow::anyhow!(
189        "Failed to kill process {} even with SIGKILL",
190        pid
191    ))
192}
193
194pub fn kill_process(pid: u32) -> bool {
195    terminate_process(pid).is_ok()
196}
197
/// Reports whether TCP `port` on the IPv4 loopback address is unavailable.
///
/// It attempts a throw-away bind on `127.0.0.1:<port>`. Any bind failure is
/// reported as "in use"; a successful bind is dropped right away.
pub fn is_port_in_use(port: u16) -> bool {
    let loopback_addr = format!("127.0.0.1:{port}");
    match std::net::TcpListener::bind(loopback_addr) {
        Ok(_) => false,
        Err(_) => true,
    }
}
202
203pub async fn spawn_detached_process(agent_name: &str, port: u16) -> OrchestrationResult<u32> {
204    spawn_detached(agent_name, port).await
205}
206
207pub fn validate_agent_binary() -> Result<()> {
208    let paths = AppPaths::get().map_err(|e| anyhow::anyhow!("{}", e))?;
209    let binary_path = paths.build().resolve_binary("systemprompt")?;
210
211    let metadata = fs::metadata(&binary_path)
212        .with_context(|| format!("Failed to get metadata for: {}", binary_path.display()))?;
213
214    if !metadata.is_file() {
215        return Err(anyhow::anyhow!(
216            "Agent binary is not a file: {}",
217            binary_path.display()
218        ));
219    }
220
221    #[cfg(unix)]
222    {
223        use std::os::unix::fs::PermissionsExt;
224        let permissions = metadata.permissions();
225        if permissions.mode() & 0o111 == 0 {
226            return Err(anyhow::anyhow!(
227                "Agent binary is not executable: {}",
228                binary_path.display()
229            ));
230        }
231    }
232
233    Ok(())
234}