syncable_cli/agent/tools/
background.rs

1//! Background Process Manager
2//!
3//! Manages long-running background processes like `kubectl port-forward`.
4//! Processes run asynchronously and can be started, stopped, and listed.
5//!
6//! # Example
7//!
8//! ```rust,ignore
9//! use syncable_cli::agent::tools::background::BackgroundProcessManager;
10//!
11//! let manager = BackgroundProcessManager::new();
12//!
13//! // Start a port-forward in the background
14//! let port = manager.start_port_forward(
15//!     "prometheus",
16//!     "svc/prometheus-server",
17//!     "monitoring",
18//!     9090
19//! ).await?;
20//!
21//! println!("Port-forward running on localhost:{}", port);
22//!
23//! // Later, stop it
24//! manager.stop("prometheus").await?;
25//! ```
26
27use std::collections::HashMap;
28use std::path::Path;
29use std::sync::Arc;
30use std::time::Instant;
31use tokio::io::{AsyncBufReadExt, BufReader};
32use tokio::process::{Child, Command};
33use tokio::sync::Mutex;
34
35/// Error type for background process operations.
36#[derive(Debug, thiserror::Error)]
37pub enum BackgroundProcessError {
38    #[error("Failed to spawn process: {0}")]
39    SpawnFailed(String),
40
41    #[error("Process not found: {0}")]
42    NotFound(String),
43
44    #[error("Failed to parse port from output: {0}")]
45    PortParseFailed(String),
46
47    #[error("IO error: {0}")]
48    IoError(#[from] std::io::Error),
49
50    #[error("Process exited unexpectedly: {0}")]
51    ProcessExited(String),
52}
53
/// Snapshot describing one background process tracked by the manager.
///
/// This is a plain value: it is detached from the process itself, so
/// `is_running` reflects the state at the moment the snapshot was taken.
#[derive(Debug, Clone)]
pub struct ProcessInfo {
    /// Identifier the process was registered under.
    pub id: String,
    /// Command line that was executed.
    pub command: String,
    /// Moment the process was registered with the manager.
    pub started_at: Instant,
    /// Local port bound by the process (`Some` only for port-forwards).
    pub local_port: Option<u16>,
    /// `true` if the process had not exited when the snapshot was taken.
    pub is_running: bool,
}
68
69/// Internal state for a background process.
70struct BackgroundProcess {
71    id: String,
72    command: String,
73    started_at: Instant,
74    local_port: Option<u16>,
75    child: Child,
76}
77
78/// Manages background processes like port-forwards.
79///
80/// Thread-safe and designed to be shared across the agent session.
81pub struct BackgroundProcessManager {
82    processes: Arc<Mutex<HashMap<String, BackgroundProcess>>>,
83}
84
85impl Default for BackgroundProcessManager {
86    fn default() -> Self {
87        Self::new()
88    }
89}
90
91impl BackgroundProcessManager {
92    /// Create a new background process manager.
93    pub fn new() -> Self {
94        Self {
95            processes: Arc::new(Mutex::new(HashMap::new())),
96        }
97    }
98
99    /// Start a kubectl port-forward in the background.
100    ///
101    /// Returns the local port that was allocated.
102    ///
103    /// # Arguments
104    ///
105    /// * `id` - Unique identifier for this port-forward
106    /// * `resource` - Kubernetes resource (e.g., "svc/prometheus-server" or "pod/prometheus-0")
107    /// * `namespace` - Kubernetes namespace
108    /// * `target_port` - The port on the remote resource
109    ///
110    /// # Example
111    ///
112    /// ```rust,ignore
113    /// let port = manager.start_port_forward(
114    ///     "prometheus",
115    ///     "svc/prometheus-server",
116    ///     "monitoring",
117    ///     9090
118    /// ).await?;
119    /// ```
120    pub async fn start_port_forward(
121        &self,
122        id: &str,
123        resource: &str,
124        namespace: &str,
125        target_port: u16,
126    ) -> Result<u16, BackgroundProcessError> {
127        // Check if already running
128        {
129            let processes = self.processes.lock().await;
130            if processes.contains_key(id)
131                && let Some(proc) = processes.get(id)
132                && let Some(port) = proc.local_port
133            {
134                return Ok(port);
135            }
136        }
137
138        // Build the port-forward command
139        // Using :0 to let kubectl pick a random available port
140        let port_spec = format!(":{}", target_port);
141        let command = format!(
142            "kubectl port-forward {} {} -n {}",
143            resource, port_spec, namespace
144        );
145
146        // Spawn kubectl directly (not through sh) to avoid process hierarchy issues
147        let mut child = Command::new("kubectl")
148            .arg("port-forward")
149            .arg(resource)
150            .arg(&port_spec)
151            .arg("-n")
152            .arg(namespace)
153            .stdout(std::process::Stdio::piped())
154            .stderr(std::process::Stdio::piped())
155            .spawn()
156            .map_err(|e| BackgroundProcessError::SpawnFailed(e.to_string()))?;
157
158        // Take stderr for error capturing
159        let stderr = child.stderr.take();
160
161        // Read stdout to get the port
162        // kubectl outputs: "Forwarding from 127.0.0.1:XXXXX -> 9090" to stdout
163        let local_port = if let Some(stdout) = child.stdout.take() {
164            let mut reader = BufReader::new(stdout).lines();
165            let mut port = None;
166
167            // Read lines with timeout
168            let timeout = tokio::time::Duration::from_secs(10);
169            let deadline = tokio::time::Instant::now() + timeout;
170
171            while tokio::time::Instant::now() < deadline {
172                match tokio::time::timeout(
173                    deadline - tokio::time::Instant::now(),
174                    reader.next_line(),
175                )
176                .await
177                {
178                    Ok(Ok(Some(line))) => {
179                        // Parse port from "Forwarding from 127.0.0.1:XXXXX -> 9090"
180                        if line.contains("Forwarding from")
181                            && let Some(port_str) = line
182                                .split(':')
183                                .nth(1)
184                                .and_then(|s| s.split_whitespace().next())
185                        {
186                            port = port_str.parse().ok();
187                            // Keep draining stdout in background to prevent SIGPIPE
188                            tokio::spawn(async move {
189                                while let Ok(Some(_)) = reader.next_line().await {}
190                            });
191                            break;
192                        }
193                    }
194                    Ok(Ok(None)) => break, // EOF
195                    Ok(Err(e)) => {
196                        return Err(BackgroundProcessError::IoError(e));
197                    }
198                    Err(_) => {
199                        // Timeout - process may still be starting
200                        break;
201                    }
202                }
203            }
204
205            port
206        } else {
207            None
208        };
209
210        // If we couldn't get the port, try to capture stderr for better error messages
211        let local_port = match local_port {
212            Some(p) => p,
213            None => {
214                // Try to read stderr for error messages
215                let error_msg = if let Some(stderr) = stderr {
216                    let mut reader = BufReader::new(stderr).lines();
217                    let mut errors = Vec::new();
218                    while let Ok(Ok(Some(line))) = tokio::time::timeout(
219                        tokio::time::Duration::from_millis(100),
220                        reader.next_line(),
221                    )
222                    .await
223                    {
224                        if !line.is_empty() {
225                            errors.push(line);
226                        }
227                    }
228                    if errors.is_empty() {
229                        "Could not determine local port (no output from kubectl)".to_string()
230                    } else {
231                        errors.join("; ")
232                    }
233                } else {
234                    "Could not determine local port".to_string()
235                };
236                return Err(BackgroundProcessError::PortParseFailed(error_msg));
237            }
238        };
239
240        // Store the process
241        let mut processes = self.processes.lock().await;
242        processes.insert(
243            id.to_string(),
244            BackgroundProcess {
245                id: id.to_string(),
246                command,
247                started_at: Instant::now(),
248                local_port: Some(local_port),
249                child,
250            },
251        );
252
253        Ok(local_port)
254    }
255
256    /// Start a generic background command.
257    ///
258    /// # Arguments
259    ///
260    /// * `id` - Unique identifier for this process
261    /// * `command` - The command to execute
262    /// * `working_dir` - Working directory for the command
263    pub async fn start(
264        &self,
265        id: &str,
266        command: &str,
267        working_dir: &Path,
268    ) -> Result<(), BackgroundProcessError> {
269        // Check if already running
270        {
271            let processes = self.processes.lock().await;
272            if processes.contains_key(id) {
273                return Ok(()); // Already running
274            }
275        }
276
277        let child = Command::new("sh")
278            .arg("-c")
279            .arg(command)
280            .current_dir(working_dir)
281            .stdout(std::process::Stdio::piped())
282            .stderr(std::process::Stdio::piped())
283            .spawn()
284            .map_err(|e| BackgroundProcessError::SpawnFailed(e.to_string()))?;
285
286        let mut processes = self.processes.lock().await;
287        processes.insert(
288            id.to_string(),
289            BackgroundProcess {
290                id: id.to_string(),
291                command: command.to_string(),
292                started_at: Instant::now(),
293                local_port: None,
294                child,
295            },
296        );
297
298        Ok(())
299    }
300
301    /// Stop a background process by ID.
302    pub async fn stop(&self, id: &str) -> Result<(), BackgroundProcessError> {
303        let mut processes = self.processes.lock().await;
304        if let Some(mut proc) = processes.remove(id) {
305            // Try graceful shutdown first
306            let _ = proc.child.kill().await;
307        }
308        Ok(())
309    }
310
311    /// Check if a process is running.
312    pub async fn is_running(&self, id: &str) -> bool {
313        let mut processes = self.processes.lock().await;
314        if let Some(proc) = processes.get_mut(id) {
315            // Check if process is still alive
316            match proc.child.try_wait() {
317                Ok(None) => true, // Still running
318                Ok(Some(_)) => {
319                    // Process exited, clean up
320                    processes.remove(id);
321                    false
322                }
323                Err(_) => false,
324            }
325        } else {
326            false
327        }
328    }
329
330    /// Get information about a specific process.
331    pub async fn get(&self, id: &str) -> Option<ProcessInfo> {
332        let mut processes = self.processes.lock().await;
333        if let Some(proc) = processes.get_mut(id) {
334            let is_running = proc
335                .child
336                .try_wait()
337                .ok()
338                .map(|s| s.is_none())
339                .unwrap_or(false);
340            Some(ProcessInfo {
341                id: proc.id.clone(),
342                command: proc.command.clone(),
343                started_at: proc.started_at,
344                local_port: proc.local_port,
345                is_running,
346            })
347        } else {
348            None
349        }
350    }
351
352    /// List all background processes.
353    pub async fn list(&self) -> Vec<ProcessInfo> {
354        let mut processes = self.processes.lock().await;
355        let mut infos = Vec::new();
356        let mut to_remove = Vec::new();
357
358        for (id, proc) in processes.iter_mut() {
359            let is_running = proc
360                .child
361                .try_wait()
362                .ok()
363                .map(|s| s.is_none())
364                .unwrap_or(false);
365            if !is_running {
366                to_remove.push(id.clone());
367            }
368            infos.push(ProcessInfo {
369                id: proc.id.clone(),
370                command: proc.command.clone(),
371                started_at: proc.started_at,
372                local_port: proc.local_port,
373                is_running,
374            });
375        }
376
377        // Clean up exited processes
378        for id in to_remove {
379            processes.remove(&id);
380        }
381
382        infos
383    }
384
385    /// Stop all background processes.
386    pub async fn stop_all(&self) {
387        let mut processes = self.processes.lock().await;
388        for (_, mut proc) in processes.drain() {
389            let _ = proc.child.kill().await;
390        }
391    }
392}
393
394impl Drop for BackgroundProcessManager {
395    fn drop(&mut self) {
396        // Note: We can't await here, so we use blocking
397        // In practice, the manager should be stopped explicitly
398    }
399}
400
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed manager tracks no processes.
    #[test]
    fn test_new_manager() {
        let manager = BackgroundProcessManager::new();
        let tracked = manager.processes.try_lock().unwrap();
        assert!(tracked.is_empty());
    }

    /// Listing an empty manager yields an empty list.
    #[tokio::test]
    async fn test_list_empty() {
        let manager = BackgroundProcessManager::new();
        let listed = manager.list().await;
        assert!(listed.is_empty());
    }

    /// An id that was never registered is reported as not running.
    #[tokio::test]
    async fn test_is_running_not_found() {
        let manager = BackgroundProcessManager::new();
        let running = manager.is_running("nonexistent").await;
        assert!(!running);
    }
}