syncable_cli/agent/tools/
background.rs

1//! Background Process Manager
2//!
3//! Manages long-running background processes like `kubectl port-forward`.
4//! Processes run asynchronously and can be started, stopped, and listed.
5//!
6//! # Example
7//!
8//! ```rust,ignore
9//! use syncable_cli::agent::tools::background::BackgroundProcessManager;
10//!
11//! let manager = BackgroundProcessManager::new();
12//!
13//! // Start a port-forward in the background
14//! let port = manager.start_port_forward(
15//!     "prometheus",
16//!     "svc/prometheus-server",
17//!     "monitoring",
18//!     9090
19//! ).await?;
20//!
21//! println!("Port-forward running on localhost:{}", port);
22//!
23//! // Later, stop it
24//! manager.stop("prometheus").await?;
25//! ```
26
27use std::collections::HashMap;
28use std::path::Path;
29use std::sync::Arc;
30use std::time::Instant;
31use tokio::io::{AsyncBufReadExt, BufReader};
32use tokio::process::{Child, Command};
33use tokio::sync::Mutex;
34
/// Error type for background process operations.
///
/// Every variant except `IoError` carries a human-readable detail string
/// that is interpolated into the error message.
#[derive(Debug, thiserror::Error)]
pub enum BackgroundProcessError {
    /// The child process could not be spawned; the payload is the
    /// stringified spawn error.
    #[error("Failed to spawn process: {0}")]
    SpawnFailed(String),

    /// No process is registered under the given id.
    // NOTE(review): not constructed anywhere in this file — presumably kept
    // for callers or future use; confirm before removing.
    #[error("Process not found: {0}")]
    NotFound(String),

    /// The local port of a port-forward could not be determined; the payload
    /// is either the captured stderr lines or a fallback message.
    #[error("Failed to parse port from output: {0}")]
    PortParseFailed(String),

    /// An underlying I/O operation failed. Converts automatically from
    /// `std::io::Error` via `?`.
    #[error("IO error: {0}")]
    IoError(#[from] std::io::Error),

    /// The process terminated when it was expected to keep running.
    // NOTE(review): not constructed anywhere in this file — confirm usage.
    #[error("Process exited unexpectedly: {0}")]
    ProcessExited(String),
}
53
/// Information about a running background process.
///
/// This is a point-in-time snapshot handed out by the manager's query
/// methods; it does not update after it has been created.
#[derive(Debug, Clone)]
pub struct ProcessInfo {
    /// Unique identifier for this process
    pub id: String,
    /// The command that was executed
    pub command: String,
    /// When the process was started
    pub started_at: Instant,
    /// Local port (for port-forwards); `None` for generic commands
    pub local_port: Option<u16>,
    /// Whether the process is still running (as observed when the snapshot was taken)
    pub is_running: bool,
}
68
/// Internal state for a background process.
///
/// Mirrors the public `ProcessInfo` fields and additionally owns the child
/// process handle used to poll and kill the process.
struct BackgroundProcess {
    /// Identifier the process is registered under in the manager's map
    id: String,
    /// Human-readable command line, kept for display purposes
    command: String,
    /// Time at which the entry was registered
    started_at: Instant,
    /// Local port for port-forwards; `None` for generic commands
    local_port: Option<u16>,
    /// Handle to the spawned process
    child: Child,
}
77
/// Manages background processes like port-forwards.
///
/// Thread-safe and designed to be shared across the agent session.
pub struct BackgroundProcessManager {
    /// Tracked processes keyed by their id, guarded by an async mutex so the
    /// `&self` methods can mutate the map.
    processes: Arc<Mutex<HashMap<String, BackgroundProcess>>>,
}
84
85impl Default for BackgroundProcessManager {
86    fn default() -> Self {
87        Self::new()
88    }
89}
90
91impl BackgroundProcessManager {
92    /// Create a new background process manager.
93    pub fn new() -> Self {
94        Self {
95            processes: Arc::new(Mutex::new(HashMap::new())),
96        }
97    }
98
99    /// Start a kubectl port-forward in the background.
100    ///
101    /// Returns the local port that was allocated.
102    ///
103    /// # Arguments
104    ///
105    /// * `id` - Unique identifier for this port-forward
106    /// * `resource` - Kubernetes resource (e.g., "svc/prometheus-server" or "pod/prometheus-0")
107    /// * `namespace` - Kubernetes namespace
108    /// * `target_port` - The port on the remote resource
109    ///
110    /// # Example
111    ///
112    /// ```rust,ignore
113    /// let port = manager.start_port_forward(
114    ///     "prometheus",
115    ///     "svc/prometheus-server",
116    ///     "monitoring",
117    ///     9090
118    /// ).await?;
119    /// ```
120    pub async fn start_port_forward(
121        &self,
122        id: &str,
123        resource: &str,
124        namespace: &str,
125        target_port: u16,
126    ) -> Result<u16, BackgroundProcessError> {
127        // Check if already running
128        {
129            let processes = self.processes.lock().await;
130            if processes.contains_key(id) {
131                if let Some(proc) = processes.get(id) {
132                    if let Some(port) = proc.local_port {
133                        return Ok(port);
134                    }
135                }
136            }
137        }
138
139        // Build the port-forward command
140        // Using :0 to let kubectl pick a random available port
141        let port_spec = format!(":{}", target_port);
142        let command = format!(
143            "kubectl port-forward {} {} -n {}",
144            resource, port_spec, namespace
145        );
146
147        // Spawn kubectl directly (not through sh) to avoid process hierarchy issues
148        let mut child = Command::new("kubectl")
149            .arg("port-forward")
150            .arg(resource)
151            .arg(&port_spec)
152            .arg("-n")
153            .arg(namespace)
154            .stdout(std::process::Stdio::piped())
155            .stderr(std::process::Stdio::piped())
156            .spawn()
157            .map_err(|e| BackgroundProcessError::SpawnFailed(e.to_string()))?;
158
159        // Take stderr for error capturing
160        let stderr = child.stderr.take();
161
162        // Read stdout to get the port
163        // kubectl outputs: "Forwarding from 127.0.0.1:XXXXX -> 9090" to stdout
164        let local_port = if let Some(stdout) = child.stdout.take() {
165            let mut reader = BufReader::new(stdout).lines();
166            let mut port = None;
167
168            // Read lines with timeout
169            let timeout = tokio::time::Duration::from_secs(10);
170            let deadline = tokio::time::Instant::now() + timeout;
171
172            while tokio::time::Instant::now() < deadline {
173                match tokio::time::timeout(
174                    deadline - tokio::time::Instant::now(),
175                    reader.next_line(),
176                )
177                .await
178                {
179                    Ok(Ok(Some(line))) => {
180                        // Parse port from "Forwarding from 127.0.0.1:XXXXX -> 9090"
181                        if line.contains("Forwarding from") {
182                            if let Some(port_str) = line
183                                .split(':')
184                                .nth(1)
185                                .and_then(|s| s.split_whitespace().next())
186                            {
187                                port = port_str.parse().ok();
188                                // Keep draining stdout in background to prevent SIGPIPE
189                                tokio::spawn(async move {
190                                    while let Ok(Some(_)) = reader.next_line().await {}
191                                });
192                                break;
193                            }
194                        }
195                    }
196                    Ok(Ok(None)) => break, // EOF
197                    Ok(Err(e)) => {
198                        return Err(BackgroundProcessError::IoError(e));
199                    }
200                    Err(_) => {
201                        // Timeout - process may still be starting
202                        break;
203                    }
204                }
205            }
206
207            port
208        } else {
209            None
210        };
211
212        // If we couldn't get the port, try to capture stderr for better error messages
213        let local_port = match local_port {
214            Some(p) => p,
215            None => {
216                // Try to read stderr for error messages
217                let error_msg = if let Some(stderr) = stderr {
218                    let mut reader = BufReader::new(stderr).lines();
219                    let mut errors = Vec::new();
220                    while let Ok(Ok(Some(line))) = tokio::time::timeout(
221                        tokio::time::Duration::from_millis(100),
222                        reader.next_line(),
223                    )
224                    .await
225                    {
226                        if !line.is_empty() {
227                            errors.push(line);
228                        }
229                    }
230                    if errors.is_empty() {
231                        "Could not determine local port (no output from kubectl)".to_string()
232                    } else {
233                        errors.join("; ")
234                    }
235                } else {
236                    "Could not determine local port".to_string()
237                };
238                return Err(BackgroundProcessError::PortParseFailed(error_msg));
239            }
240        };
241
242        // Store the process
243        let mut processes = self.processes.lock().await;
244        processes.insert(
245            id.to_string(),
246            BackgroundProcess {
247                id: id.to_string(),
248                command,
249                started_at: Instant::now(),
250                local_port: Some(local_port),
251                child,
252            },
253        );
254
255        Ok(local_port)
256    }
257
258    /// Start a generic background command.
259    ///
260    /// # Arguments
261    ///
262    /// * `id` - Unique identifier for this process
263    /// * `command` - The command to execute
264    /// * `working_dir` - Working directory for the command
265    pub async fn start(
266        &self,
267        id: &str,
268        command: &str,
269        working_dir: &Path,
270    ) -> Result<(), BackgroundProcessError> {
271        // Check if already running
272        {
273            let processes = self.processes.lock().await;
274            if processes.contains_key(id) {
275                return Ok(()); // Already running
276            }
277        }
278
279        let child = Command::new("sh")
280            .arg("-c")
281            .arg(command)
282            .current_dir(working_dir)
283            .stdout(std::process::Stdio::piped())
284            .stderr(std::process::Stdio::piped())
285            .spawn()
286            .map_err(|e| BackgroundProcessError::SpawnFailed(e.to_string()))?;
287
288        let mut processes = self.processes.lock().await;
289        processes.insert(
290            id.to_string(),
291            BackgroundProcess {
292                id: id.to_string(),
293                command: command.to_string(),
294                started_at: Instant::now(),
295                local_port: None,
296                child,
297            },
298        );
299
300        Ok(())
301    }
302
303    /// Stop a background process by ID.
304    pub async fn stop(&self, id: &str) -> Result<(), BackgroundProcessError> {
305        let mut processes = self.processes.lock().await;
306        if let Some(mut proc) = processes.remove(id) {
307            // Try graceful shutdown first
308            let _ = proc.child.kill().await;
309        }
310        Ok(())
311    }
312
313    /// Check if a process is running.
314    pub async fn is_running(&self, id: &str) -> bool {
315        let mut processes = self.processes.lock().await;
316        if let Some(proc) = processes.get_mut(id) {
317            // Check if process is still alive
318            match proc.child.try_wait() {
319                Ok(None) => true, // Still running
320                Ok(Some(_)) => {
321                    // Process exited, clean up
322                    processes.remove(id);
323                    false
324                }
325                Err(_) => false,
326            }
327        } else {
328            false
329        }
330    }
331
332    /// Get information about a specific process.
333    pub async fn get(&self, id: &str) -> Option<ProcessInfo> {
334        let mut processes = self.processes.lock().await;
335        if let Some(proc) = processes.get_mut(id) {
336            let is_running = proc
337                .child
338                .try_wait()
339                .ok()
340                .map(|s| s.is_none())
341                .unwrap_or(false);
342            Some(ProcessInfo {
343                id: proc.id.clone(),
344                command: proc.command.clone(),
345                started_at: proc.started_at,
346                local_port: proc.local_port,
347                is_running,
348            })
349        } else {
350            None
351        }
352    }
353
354    /// List all background processes.
355    pub async fn list(&self) -> Vec<ProcessInfo> {
356        let mut processes = self.processes.lock().await;
357        let mut infos = Vec::new();
358        let mut to_remove = Vec::new();
359
360        for (id, proc) in processes.iter_mut() {
361            let is_running = proc
362                .child
363                .try_wait()
364                .ok()
365                .map(|s| s.is_none())
366                .unwrap_or(false);
367            if !is_running {
368                to_remove.push(id.clone());
369            }
370            infos.push(ProcessInfo {
371                id: proc.id.clone(),
372                command: proc.command.clone(),
373                started_at: proc.started_at,
374                local_port: proc.local_port,
375                is_running,
376            });
377        }
378
379        // Clean up exited processes
380        for id in to_remove {
381            processes.remove(&id);
382        }
383
384        infos
385    }
386
387    /// Stop all background processes.
388    pub async fn stop_all(&self) {
389        let mut processes = self.processes.lock().await;
390        for (_, mut proc) in processes.drain() {
391            let _ = proc.child.kill().await;
392        }
393    }
394}
395
396impl Drop for BackgroundProcessManager {
397    fn drop(&mut self) {
398        // Note: We can't await here, so we use blocking
399        // In practice, the manager should be stopped explicitly
400    }
401}
402
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed manager tracks no processes.
    #[test]
    fn test_new_manager() {
        let manager = BackgroundProcessManager::new();
        let tracked = manager.processes.try_lock().unwrap();
        assert!(tracked.is_empty());
    }

    /// Listing an empty manager yields an empty list.
    #[tokio::test]
    async fn test_list_empty() {
        let manager = BackgroundProcessManager::new();
        assert!(manager.list().await.is_empty());
    }

    /// An id that was never registered is reported as not running.
    #[tokio::test]
    async fn test_is_running_not_found() {
        let manager = BackgroundProcessManager::new();
        let running = manager.is_running("nonexistent").await;
        assert!(!running);
    }
}