systemprompt_agent/services/agent_orchestration/
process.rs1use anyhow::{Context, Result};
2use std::fs::{self, File};
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use systemprompt_models::{
6 AppPaths, CliPaths, Config, ProfileBootstrap, Secrets, SecretsBootstrap,
7};
8
9use crate::services::agent_orchestration::{OrchestrationError, OrchestrationResult};
10
11const MAX_LOG_SIZE: u64 = 10 * 1024 * 1024;
12
13fn rotate_log_if_needed(log_path: &Path) -> Result<()> {
14 if let Ok(metadata) = fs::metadata(log_path) {
15 if metadata.len() > MAX_LOG_SIZE {
16 let backup_path = log_path.with_extension("log.old");
17 fs::rename(log_path, &backup_path)?;
18 }
19 }
20 Ok(())
21}
22
23fn prepare_agent_log_file(agent_name: &str, log_dir: &Path) -> OrchestrationResult<File> {
24 if let Err(e) = fs::create_dir_all(log_dir) {
25 tracing::error!(
26 error = %e,
27 path = %log_dir.display(),
28 "Failed to create agent log directory - agent may fail to start"
29 );
30 }
31
32 let log_file_path = log_dir.join(format!("agent-{}.log", agent_name));
33 if let Err(e) = rotate_log_if_needed(&log_file_path) {
34 tracing::warn!(
35 error = %e,
36 path = %log_file_path.display(),
37 "Failed to rotate agent log file"
38 );
39 }
40
41 fs::OpenOptions::new()
42 .create(true)
43 .append(true)
44 .open(&log_file_path)
45 .map_err(|e| {
46 OrchestrationError::ProcessSpawnFailed(format!(
47 "Failed to create log file {}: {}",
48 log_file_path.display(),
49 e
50 ))
51 })
52}
53
54fn configure_secrets_env(command: &mut Command, secrets: &Secrets) {
55 if let Some(ref key) = secrets.gemini {
56 command.env("GEMINI_API_KEY", key);
57 }
58 if let Some(ref key) = secrets.anthropic {
59 command.env("ANTHROPIC_API_KEY", key);
60 }
61 if let Some(ref key) = secrets.openai {
62 command.env("OPENAI_API_KEY", key);
63 }
64 if let Some(ref key) = secrets.github {
65 command.env("GITHUB_TOKEN", key);
66 }
67
68 if !secrets.custom.is_empty() {
69 let uppercase_keys = secrets.custom_env_var_names();
70 command.env("SYSTEMPROMPT_CUSTOM_SECRETS", uppercase_keys.join(","));
71 for (env_name, value) in secrets.custom_env_vars() {
72 command.env(env_name, value);
73 }
74 }
75}
76
77struct BuildAgentCommandParams<'a> {
78 binary_path: &'a PathBuf,
79 agent_name: &'a str,
80 port: u16,
81 profile_path: &'a str,
82 secrets: &'a Secrets,
83 config: &'a Config,
84 log_file: File,
85}
86
87fn build_agent_command(params: BuildAgentCommandParams<'_>) -> Command {
88 let BuildAgentCommandParams {
89 binary_path,
90 agent_name,
91 port,
92 profile_path,
93 secrets,
94 config,
95 log_file,
96 } = params;
97 let mut command = Command::new(binary_path);
98 for arg in CliPaths::agent_run_args() {
99 command.arg(arg);
100 }
101 command
102 .arg("--agent-name")
103 .arg(agent_name)
104 .arg("--port")
105 .arg(port.to_string())
106 .env_clear()
107 .env(
108 "PATH",
109 std::env::var("PATH").unwrap_or_default(),
110 )
111 .env(
112 "HOME",
113 std::env::var("HOME").unwrap_or_default(),
114 )
115 .env("SYSTEMPROMPT_PROFILE", profile_path)
116 .env("SYSTEMPROMPT_SUBPROCESS", "1")
117 .env("JWT_SECRET", &secrets.jwt_secret)
118 .env("DATABASE_URL", &secrets.database_url)
119 .env("AGENT_NAME", agent_name)
120 .env("AGENT_PORT", port.to_string())
121 .env("DATABASE_TYPE", &config.database_type)
122 .stdout(std::process::Stdio::null())
123 .stderr(std::process::Stdio::from(log_file))
124 .stdin(std::process::Stdio::null());
125
126 if let Ok(fly_app) = std::env::var("FLY_APP_NAME") {
127 command.env("FLY_APP_NAME", fly_app);
128 }
129
130 configure_secrets_env(&mut command, secrets);
131 command
132}
133
134pub fn spawn_detached(agent_name: &str, port: u16) -> OrchestrationResult<u32> {
135 let paths = AppPaths::get()
136 .map_err(|e| OrchestrationError::ProcessSpawnFailed(format!("Failed to get paths: {e}")))?;
137
138 let binary_path = paths.build().resolve_binary("systemprompt").map_err(|e| {
139 OrchestrationError::ProcessSpawnFailed(format!("Failed to find systemprompt binary: {e}"))
140 })?;
141
142 let config = Config::get().map_err(|e| {
143 OrchestrationError::ProcessSpawnFailed(format!("Failed to get config: {e}"))
144 })?;
145
146 let secrets = SecretsBootstrap::get().map_err(|e| {
147 OrchestrationError::ProcessSpawnFailed(format!("Failed to get secrets: {e}"))
148 })?;
149
150 let profile_path = ProfileBootstrap::get_path().map_err(|e| {
151 OrchestrationError::ProcessSpawnFailed(format!("Failed to get profile path: {e}"))
152 })?;
153
154 let log_file = prepare_agent_log_file(agent_name, &paths.system().logs())?;
155
156 let mut command = build_agent_command(BuildAgentCommandParams {
157 binary_path: &binary_path,
158 agent_name,
159 port,
160 profile_path,
161 secrets,
162 config,
163 log_file,
164 });
165
166 let child = command.spawn().map_err(|e| {
167 OrchestrationError::ProcessSpawnFailed(format!("Failed to spawn {agent_name}: {e}"))
168 })?;
169
170 let pid = child.id();
171 std::mem::forget(child);
172
173 if !verify_process_started(pid) {
174 return Err(OrchestrationError::ProcessSpawnFailed(format!(
175 "Agent {} (PID {}) died immediately after spawn",
176 agent_name, pid
177 )));
178 }
179
180 tracing::debug!(pid = %pid, agent_name = %agent_name, "Detached process spawned");
181 Ok(pid)
182}
183
184#[cfg(unix)]
185fn verify_process_started(pid: u32) -> bool {
186 use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};
187 use nix::unistd::Pid;
188
189 match waitpid(Pid::from_raw(pid as i32), Some(WaitPidFlag::WNOHANG)) {
190 Ok(WaitStatus::StillAlive) => true,
191 Ok(_) => false,
192 Err(_) => process_exists(pid),
193 }
194}
195
196#[cfg(windows)]
197fn verify_process_started(pid: u32) -> bool {
198 process_exists(pid)
199}
200
201#[cfg(unix)]
202pub fn process_exists(pid: u32) -> bool {
203 use nix::sys::signal;
204 use nix::unistd::Pid;
205 signal::kill(Pid::from_raw(pid as i32), None).is_ok()
206}
207
208#[cfg(windows)]
209pub fn process_exists(pid: u32) -> bool {
210 Command::new("tasklist")
211 .args(["/FI", &format!("PID eq {}", pid), "/NH"])
212 .output()
213 .map(|o| {
214 let stdout = String::from_utf8_lossy(&o.stdout);
215 !stdout.contains("INFO: No tasks") && !stdout.trim().is_empty()
216 })
217 .unwrap_or(false)
218}
219
220#[cfg(unix)]
221pub fn terminate_process(pid: u32) -> Result<()> {
222 use nix::sys::signal::{self, Signal};
223 use nix::unistd::Pid;
224
225 signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM)
226 .with_context(|| format!("Failed to send SIGTERM to PID {pid}"))?;
227
228 Ok(())
229}
230
231#[cfg(windows)]
232pub fn terminate_process(pid: u32) -> Result<()> {
233 let output = Command::new("taskkill")
234 .args(["/PID", &pid.to_string()])
235 .output()
236 .with_context(|| format!("Failed to run taskkill for PID {pid}"))?;
237
238 if !output.status.success() {
239 anyhow::bail!("taskkill failed for PID {pid}");
240 }
241 Ok(())
242}
243
244#[cfg(unix)]
245pub fn force_kill_process(pid: u32) -> Result<()> {
246 use nix::sys::signal::{self, Signal};
247 use nix::unistd::Pid;
248
249 signal::kill(Pid::from_raw(pid as i32), Signal::SIGKILL)
250 .with_context(|| format!("Failed to send SIGKILL to PID {pid}"))?;
251
252 Ok(())
253}
254
255#[cfg(windows)]
256pub fn force_kill_process(pid: u32) -> Result<()> {
257 let output = Command::new("taskkill")
258 .args(["/PID", &pid.to_string(), "/F"])
259 .output()
260 .with_context(|| format!("Failed to force-kill PID {pid}"))?;
261
262 if !output.status.success() {
263 anyhow::bail!("taskkill /F failed for PID {pid}");
264 }
265 Ok(())
266}
267
268pub async fn terminate_gracefully(pid: u32, timeout_secs: u64) -> Result<()> {
269 if !process_exists(pid) {
270 return Ok(());
271 }
272
273 terminate_process(pid)?;
274
275 let check_interval = tokio::time::Duration::from_millis(100);
276 let max_checks = (timeout_secs * 1000) / 100;
277
278 for _ in 0..max_checks {
279 if !process_exists(pid) {
280 return Ok(());
281 }
282 tokio::time::sleep(check_interval).await;
283 }
284
285 force_kill_process(pid)?;
286
287 for _ in 0..50 {
288 if !process_exists(pid) {
289 return Ok(());
290 }
291 tokio::time::sleep(check_interval).await;
292 }
293
294 Err(anyhow::anyhow!(
295 "Failed to kill process {} even with SIGKILL",
296 pid
297 ))
298}
299
300pub fn kill_process(pid: u32) -> bool {
301 terminate_process(pid).is_ok()
302}
303
/// Reports whether `port` cannot currently be bound on `127.0.0.1`.
///
/// The check binds a throwaway listener on the IPv4 loopback address. Any bind error,
/// not only "address in use", is reported as `true`. The probe listener is released
/// when the function returns.
pub fn is_port_in_use(port: u16) -> bool {
    use std::net::TcpListener;

    let probe = TcpListener::bind(("127.0.0.1", port));
    probe.is_err()
}
308
309pub fn spawn_detached_process(agent_name: &str, port: u16) -> OrchestrationResult<u32> {
310 spawn_detached(agent_name, port)
311}
312
313pub fn validate_agent_binary() -> Result<()> {
314 let paths = AppPaths::get().map_err(|e| anyhow::anyhow!("{}", e))?;
315 let binary_path = paths.build().resolve_binary("systemprompt")?;
316
317 let metadata = fs::metadata(&binary_path)
318 .with_context(|| format!("Failed to get metadata for: {}", binary_path.display()))?;
319
320 if !metadata.is_file() {
321 return Err(anyhow::anyhow!(
322 "Agent binary is not a file: {}",
323 binary_path.display()
324 ));
325 }
326
327 #[cfg(unix)]
328 {
329 use std::os::unix::fs::PermissionsExt;
330 let permissions = metadata.permissions();
331 if permissions.mode() & 0o111 == 0 {
332 return Err(anyhow::anyhow!(
333 "Agent binary is not executable: {}",
334 binary_path.display()
335 ));
336 }
337 }
338
339 Ok(())
340}