syncable_cli/agent/tools/
background.rs1use std::collections::HashMap;
28use std::path::Path;
29use std::sync::Arc;
30use std::time::Instant;
31use tokio::io::{AsyncBufReadExt, BufReader};
32use tokio::process::{Child, Command};
33use tokio::sync::Mutex;
34
35#[derive(Debug, thiserror::Error)]
37pub enum BackgroundProcessError {
38 #[error("Failed to spawn process: {0}")]
39 SpawnFailed(String),
40
41 #[error("Process not found: {0}")]
42 NotFound(String),
43
44 #[error("Failed to parse port from output: {0}")]
45 PortParseFailed(String),
46
47 #[error("IO error: {0}")]
48 IoError(#[from] std::io::Error),
49
50 #[error("Process exited unexpectedly: {0}")]
51 ProcessExited(String),
52}
53
/// Point-in-time snapshot describing one managed background process.
#[derive(Clone, Debug)]
pub struct ProcessInfo {
    /// Identifier the process was registered under.
    pub id: String,
    /// Command line recorded when the process was started.
    pub command: String,
    /// Moment the manager recorded the process as started.
    pub started_at: Instant,
    /// Local port, when one is known (recorded for port-forward processes).
    pub local_port: Option<u16>,
    /// Whether the process was still running when the snapshot was taken.
    pub is_running: bool,
}
68
69struct BackgroundProcess {
71 id: String,
72 command: String,
73 started_at: Instant,
74 local_port: Option<u16>,
75 child: Child,
76}
77
78pub struct BackgroundProcessManager {
82 processes: Arc<Mutex<HashMap<String, BackgroundProcess>>>,
83}
84
85impl Default for BackgroundProcessManager {
86 fn default() -> Self {
87 Self::new()
88 }
89}
90
91impl BackgroundProcessManager {
92 pub fn new() -> Self {
94 Self {
95 processes: Arc::new(Mutex::new(HashMap::new())),
96 }
97 }
98
99 pub async fn start_port_forward(
121 &self,
122 id: &str,
123 resource: &str,
124 namespace: &str,
125 target_port: u16,
126 ) -> Result<u16, BackgroundProcessError> {
127 {
129 let processes = self.processes.lock().await;
130 if processes.contains_key(id)
131 && let Some(proc) = processes.get(id)
132 && let Some(port) = proc.local_port
133 {
134 return Ok(port);
135 }
136 }
137
138 let port_spec = format!(":{}", target_port);
141 let command = format!(
142 "kubectl port-forward {} {} -n {}",
143 resource, port_spec, namespace
144 );
145
146 let mut child = Command::new("kubectl")
148 .arg("port-forward")
149 .arg(resource)
150 .arg(&port_spec)
151 .arg("-n")
152 .arg(namespace)
153 .stdout(std::process::Stdio::piped())
154 .stderr(std::process::Stdio::piped())
155 .spawn()
156 .map_err(|e| BackgroundProcessError::SpawnFailed(e.to_string()))?;
157
158 let stderr = child.stderr.take();
160
161 let local_port = if let Some(stdout) = child.stdout.take() {
164 let mut reader = BufReader::new(stdout).lines();
165 let mut port = None;
166
167 let timeout = tokio::time::Duration::from_secs(10);
169 let deadline = tokio::time::Instant::now() + timeout;
170
171 while tokio::time::Instant::now() < deadline {
172 match tokio::time::timeout(
173 deadline - tokio::time::Instant::now(),
174 reader.next_line(),
175 )
176 .await
177 {
178 Ok(Ok(Some(line))) => {
179 if line.contains("Forwarding from")
181 && let Some(port_str) = line
182 .split(':')
183 .nth(1)
184 .and_then(|s| s.split_whitespace().next())
185 {
186 port = port_str.parse().ok();
187 tokio::spawn(async move {
189 while let Ok(Some(_)) = reader.next_line().await {}
190 });
191 break;
192 }
193 }
194 Ok(Ok(None)) => break, Ok(Err(e)) => {
196 return Err(BackgroundProcessError::IoError(e));
197 }
198 Err(_) => {
199 break;
201 }
202 }
203 }
204
205 port
206 } else {
207 None
208 };
209
210 let local_port = match local_port {
212 Some(p) => p,
213 None => {
214 let error_msg = if let Some(stderr) = stderr {
216 let mut reader = BufReader::new(stderr).lines();
217 let mut errors = Vec::new();
218 while let Ok(Ok(Some(line))) = tokio::time::timeout(
219 tokio::time::Duration::from_millis(100),
220 reader.next_line(),
221 )
222 .await
223 {
224 if !line.is_empty() {
225 errors.push(line);
226 }
227 }
228 if errors.is_empty() {
229 "Could not determine local port (no output from kubectl)".to_string()
230 } else {
231 errors.join("; ")
232 }
233 } else {
234 "Could not determine local port".to_string()
235 };
236 return Err(BackgroundProcessError::PortParseFailed(error_msg));
237 }
238 };
239
240 let mut processes = self.processes.lock().await;
242 processes.insert(
243 id.to_string(),
244 BackgroundProcess {
245 id: id.to_string(),
246 command,
247 started_at: Instant::now(),
248 local_port: Some(local_port),
249 child,
250 },
251 );
252
253 Ok(local_port)
254 }
255
256 pub async fn start(
264 &self,
265 id: &str,
266 command: &str,
267 working_dir: &Path,
268 ) -> Result<(), BackgroundProcessError> {
269 {
271 let processes = self.processes.lock().await;
272 if processes.contains_key(id) {
273 return Ok(()); }
275 }
276
277 let child = Command::new("sh")
278 .arg("-c")
279 .arg(command)
280 .current_dir(working_dir)
281 .stdout(std::process::Stdio::piped())
282 .stderr(std::process::Stdio::piped())
283 .spawn()
284 .map_err(|e| BackgroundProcessError::SpawnFailed(e.to_string()))?;
285
286 let mut processes = self.processes.lock().await;
287 processes.insert(
288 id.to_string(),
289 BackgroundProcess {
290 id: id.to_string(),
291 command: command.to_string(),
292 started_at: Instant::now(),
293 local_port: None,
294 child,
295 },
296 );
297
298 Ok(())
299 }
300
301 pub async fn stop(&self, id: &str) -> Result<(), BackgroundProcessError> {
303 let mut processes = self.processes.lock().await;
304 if let Some(mut proc) = processes.remove(id) {
305 let _ = proc.child.kill().await;
307 }
308 Ok(())
309 }
310
311 pub async fn is_running(&self, id: &str) -> bool {
313 let mut processes = self.processes.lock().await;
314 if let Some(proc) = processes.get_mut(id) {
315 match proc.child.try_wait() {
317 Ok(None) => true, Ok(Some(_)) => {
319 processes.remove(id);
321 false
322 }
323 Err(_) => false,
324 }
325 } else {
326 false
327 }
328 }
329
330 pub async fn get(&self, id: &str) -> Option<ProcessInfo> {
332 let mut processes = self.processes.lock().await;
333 if let Some(proc) = processes.get_mut(id) {
334 let is_running = proc
335 .child
336 .try_wait()
337 .ok()
338 .map(|s| s.is_none())
339 .unwrap_or(false);
340 Some(ProcessInfo {
341 id: proc.id.clone(),
342 command: proc.command.clone(),
343 started_at: proc.started_at,
344 local_port: proc.local_port,
345 is_running,
346 })
347 } else {
348 None
349 }
350 }
351
352 pub async fn list(&self) -> Vec<ProcessInfo> {
354 let mut processes = self.processes.lock().await;
355 let mut infos = Vec::new();
356 let mut to_remove = Vec::new();
357
358 for (id, proc) in processes.iter_mut() {
359 let is_running = proc
360 .child
361 .try_wait()
362 .ok()
363 .map(|s| s.is_none())
364 .unwrap_or(false);
365 if !is_running {
366 to_remove.push(id.clone());
367 }
368 infos.push(ProcessInfo {
369 id: proc.id.clone(),
370 command: proc.command.clone(),
371 started_at: proc.started_at,
372 local_port: proc.local_port,
373 is_running,
374 });
375 }
376
377 for id in to_remove {
379 processes.remove(&id);
380 }
381
382 infos
383 }
384
385 pub async fn stop_all(&self) {
387 let mut processes = self.processes.lock().await;
388 for (_, mut proc) in processes.drain() {
389 let _ = proc.child.kill().await;
390 }
391 }
392}
393
394impl Drop for BackgroundProcessManager {
395 fn drop(&mut self) {
396 }
399}
400
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed manager tracks no processes.
    #[test]
    fn test_new_manager() {
        let fresh = BackgroundProcessManager::new();
        let registry = fresh.processes.try_lock().unwrap();
        assert!(registry.is_empty());
    }

    /// Listing an empty manager yields no snapshots.
    #[tokio::test]
    async fn test_list_empty() {
        let fresh = BackgroundProcessManager::new();
        let snapshots = fresh.list().await;
        assert!(snapshots.is_empty());
    }

    /// An unknown id is reported as not running.
    #[tokio::test]
    async fn test_is_running_not_found() {
        let fresh = BackgroundProcessManager::new();
        let running = fresh.is_running("nonexistent").await;
        assert!(!running);
    }
}