syncable_cli/agent/tools/
background.rs1use std::collections::HashMap;
28use std::path::Path;
29use std::sync::Arc;
30use std::time::Instant;
31use tokio::io::{AsyncBufReadExt, BufReader};
32use tokio::process::{Child, Command};
33use tokio::sync::Mutex;
34
35#[derive(Debug, thiserror::Error)]
37pub enum BackgroundProcessError {
38 #[error("Failed to spawn process: {0}")]
39 SpawnFailed(String),
40
41 #[error("Process not found: {0}")]
42 NotFound(String),
43
44 #[error("Failed to parse port from output: {0}")]
45 PortParseFailed(String),
46
47 #[error("IO error: {0}")]
48 IoError(#[from] std::io::Error),
49
50 #[error("Process exited unexpectedly: {0}")]
51 ProcessExited(String),
52}
53
/// Point-in-time description of a tracked background process.
#[derive(Debug, Clone)]
pub struct ProcessInfo {
    /// Identifier the process was registered under.
    pub id: String,
    /// Human-readable command line that was launched.
    pub command: String,
    /// Moment the process was registered with the manager.
    pub started_at: Instant,
    /// Local port of a port-forward; `None` for plain commands.
    pub local_port: Option<u16>,
    /// Whether the process had not yet exited when this snapshot was taken.
    pub is_running: bool,
}
68
69struct BackgroundProcess {
71 id: String,
72 command: String,
73 started_at: Instant,
74 local_port: Option<u16>,
75 child: Child,
76}
77
78pub struct BackgroundProcessManager {
82 processes: Arc<Mutex<HashMap<String, BackgroundProcess>>>,
83}
84
85impl Default for BackgroundProcessManager {
86 fn default() -> Self {
87 Self::new()
88 }
89}
90
91impl BackgroundProcessManager {
92 pub fn new() -> Self {
94 Self {
95 processes: Arc::new(Mutex::new(HashMap::new())),
96 }
97 }
98
99 pub async fn start_port_forward(
121 &self,
122 id: &str,
123 resource: &str,
124 namespace: &str,
125 target_port: u16,
126 ) -> Result<u16, BackgroundProcessError> {
127 {
129 let processes = self.processes.lock().await;
130 if processes.contains_key(id) {
131 if let Some(proc) = processes.get(id) {
132 if let Some(port) = proc.local_port {
133 return Ok(port);
134 }
135 }
136 }
137 }
138
139 let port_spec = format!(":{}", target_port);
142 let command = format!(
143 "kubectl port-forward {} {} -n {}",
144 resource, port_spec, namespace
145 );
146
147 let mut child = Command::new("kubectl")
149 .arg("port-forward")
150 .arg(resource)
151 .arg(&port_spec)
152 .arg("-n")
153 .arg(namespace)
154 .stdout(std::process::Stdio::piped())
155 .stderr(std::process::Stdio::piped())
156 .spawn()
157 .map_err(|e| BackgroundProcessError::SpawnFailed(e.to_string()))?;
158
159 let stderr = child.stderr.take();
161
162 let local_port = if let Some(stdout) = child.stdout.take() {
165 let mut reader = BufReader::new(stdout).lines();
166 let mut port = None;
167
168 let timeout = tokio::time::Duration::from_secs(10);
170 let deadline = tokio::time::Instant::now() + timeout;
171
172 while tokio::time::Instant::now() < deadline {
173 match tokio::time::timeout(
174 deadline - tokio::time::Instant::now(),
175 reader.next_line(),
176 )
177 .await
178 {
179 Ok(Ok(Some(line))) => {
180 if line.contains("Forwarding from") {
182 if let Some(port_str) = line
183 .split(':')
184 .nth(1)
185 .and_then(|s| s.split_whitespace().next())
186 {
187 port = port_str.parse().ok();
188 tokio::spawn(async move {
190 while let Ok(Some(_)) = reader.next_line().await {}
191 });
192 break;
193 }
194 }
195 }
196 Ok(Ok(None)) => break, Ok(Err(e)) => {
198 return Err(BackgroundProcessError::IoError(e));
199 }
200 Err(_) => {
201 break;
203 }
204 }
205 }
206
207 port
208 } else {
209 None
210 };
211
212 let local_port = match local_port {
214 Some(p) => p,
215 None => {
216 let error_msg = if let Some(stderr) = stderr {
218 let mut reader = BufReader::new(stderr).lines();
219 let mut errors = Vec::new();
220 while let Ok(Ok(Some(line))) = tokio::time::timeout(
221 tokio::time::Duration::from_millis(100),
222 reader.next_line(),
223 )
224 .await
225 {
226 if !line.is_empty() {
227 errors.push(line);
228 }
229 }
230 if errors.is_empty() {
231 "Could not determine local port (no output from kubectl)".to_string()
232 } else {
233 errors.join("; ")
234 }
235 } else {
236 "Could not determine local port".to_string()
237 };
238 return Err(BackgroundProcessError::PortParseFailed(error_msg));
239 }
240 };
241
242 let mut processes = self.processes.lock().await;
244 processes.insert(
245 id.to_string(),
246 BackgroundProcess {
247 id: id.to_string(),
248 command,
249 started_at: Instant::now(),
250 local_port: Some(local_port),
251 child,
252 },
253 );
254
255 Ok(local_port)
256 }
257
258 pub async fn start(
266 &self,
267 id: &str,
268 command: &str,
269 working_dir: &Path,
270 ) -> Result<(), BackgroundProcessError> {
271 {
273 let processes = self.processes.lock().await;
274 if processes.contains_key(id) {
275 return Ok(()); }
277 }
278
279 let child = Command::new("sh")
280 .arg("-c")
281 .arg(command)
282 .current_dir(working_dir)
283 .stdout(std::process::Stdio::piped())
284 .stderr(std::process::Stdio::piped())
285 .spawn()
286 .map_err(|e| BackgroundProcessError::SpawnFailed(e.to_string()))?;
287
288 let mut processes = self.processes.lock().await;
289 processes.insert(
290 id.to_string(),
291 BackgroundProcess {
292 id: id.to_string(),
293 command: command.to_string(),
294 started_at: Instant::now(),
295 local_port: None,
296 child,
297 },
298 );
299
300 Ok(())
301 }
302
303 pub async fn stop(&self, id: &str) -> Result<(), BackgroundProcessError> {
305 let mut processes = self.processes.lock().await;
306 if let Some(mut proc) = processes.remove(id) {
307 let _ = proc.child.kill().await;
309 }
310 Ok(())
311 }
312
313 pub async fn is_running(&self, id: &str) -> bool {
315 let mut processes = self.processes.lock().await;
316 if let Some(proc) = processes.get_mut(id) {
317 match proc.child.try_wait() {
319 Ok(None) => true, Ok(Some(_)) => {
321 processes.remove(id);
323 false
324 }
325 Err(_) => false,
326 }
327 } else {
328 false
329 }
330 }
331
332 pub async fn get(&self, id: &str) -> Option<ProcessInfo> {
334 let mut processes = self.processes.lock().await;
335 if let Some(proc) = processes.get_mut(id) {
336 let is_running = proc
337 .child
338 .try_wait()
339 .ok()
340 .map(|s| s.is_none())
341 .unwrap_or(false);
342 Some(ProcessInfo {
343 id: proc.id.clone(),
344 command: proc.command.clone(),
345 started_at: proc.started_at,
346 local_port: proc.local_port,
347 is_running,
348 })
349 } else {
350 None
351 }
352 }
353
354 pub async fn list(&self) -> Vec<ProcessInfo> {
356 let mut processes = self.processes.lock().await;
357 let mut infos = Vec::new();
358 let mut to_remove = Vec::new();
359
360 for (id, proc) in processes.iter_mut() {
361 let is_running = proc
362 .child
363 .try_wait()
364 .ok()
365 .map(|s| s.is_none())
366 .unwrap_or(false);
367 if !is_running {
368 to_remove.push(id.clone());
369 }
370 infos.push(ProcessInfo {
371 id: proc.id.clone(),
372 command: proc.command.clone(),
373 started_at: proc.started_at,
374 local_port: proc.local_port,
375 is_running,
376 });
377 }
378
379 for id in to_remove {
381 processes.remove(&id);
382 }
383
384 infos
385 }
386
387 pub async fn stop_all(&self) {
389 let mut processes = self.processes.lock().await;
390 for (_, mut proc) in processes.drain() {
391 let _ = proc.child.kill().await;
392 }
393 }
394}
395
396impl Drop for BackgroundProcessManager {
397 fn drop(&mut self) {
398 }
401}
402
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed manager tracks nothing.
    #[test]
    fn test_new_manager() {
        let mgr = BackgroundProcessManager::new();
        let tracked = mgr.processes.try_lock().unwrap();
        assert!(tracked.is_empty());
    }

    /// Listing an empty manager yields no entries.
    #[tokio::test]
    async fn test_list_empty() {
        let mgr = BackgroundProcessManager::new();
        assert!(mgr.list().await.is_empty());
    }

    /// An id that was never registered is reported as not running.
    #[tokio::test]
    async fn test_is_running_not_found() {
        let mgr = BackgroundProcessManager::new();
        let running = mgr.is_running("nonexistent").await;
        assert!(!running);
    }
}