// mcp_sdk_rs/process/manager.rs

1use tokio::{
2    process::{Child, Command},
3    sync::mpsc,
4    task::JoinHandle,
5};
6
7use super::io::{handle_stderr, handle_stdin, handle_stdout};
8
/// Capacity of the bounded `mpsc` channel created in `start_process`, whose
/// receiving end is handed to the child's stdin handler task.
const MESSAGE_BUFFER_SIZE: usize = 100;
10
11#[derive(Debug, thiserror::Error)]
12pub enum ProcessError {
13    #[error("IO error: {0}")]
14    Io(#[from] std::io::Error),
15    #[error("Process error: {0}")]
16    Other(String),
17}
18
19pub struct ProcessManager {
20    child: Option<Child>,
21    stdin: Option<JoinHandle<()>>,
22    stdout: Option<JoinHandle<()>>,
23    stderr: Option<JoinHandle<()>>,
24}
25
26impl Drop for ProcessManager {
27    fn drop(&mut self) {
28        if let Some(mut child) = self.child.take() {
29            // Try to kill the child process when dropping
30            log::debug!("Starting kill for {}", child.id().unwrap_or(0));
31            let _ = child.start_kill();
32        }
33    }
34}
35
36impl ProcessManager {
37    /// Create a new ProcessManager
38    pub fn new() -> Self {
39        Self {
40            child: None,
41            stdin: None,
42            stdout: None,
43            stderr: None,
44        }
45    }
46
47    /// Start a new process and return a sender for communicating with it
48    pub async fn start_process(
49        &mut self,
50        command: Command,
51        output_tx: mpsc::Sender<String>,
52    ) -> Result<mpsc::Sender<String>, ProcessError> {
53        // Clean up any existing process first
54        self.shutdown().await;
55
56        let child = self.spawn_process(command)?;
57        let (process_tx, process_rx) = mpsc::channel::<String>(MESSAGE_BUFFER_SIZE);
58
59        self.setup_io_handlers(child, process_rx, output_tx)?;
60
61        Ok(process_tx)
62    }
63
64    /// Spawn a child process with proper stdio configuration
65    fn spawn_process(&mut self, mut command: Command) -> Result<Child, ProcessError> {
66        log::debug!("spawning process: {:?}", command);
67
68        let child = command
69            .stdin(std::process::Stdio::piped())
70            .stdout(std::process::Stdio::piped())
71            .stderr(std::process::Stdio::piped())
72            .kill_on_drop(true)
73            .spawn()?;
74
75        Ok(child)
76    }
77
78    /// Set up IO handlers for the child process
79    fn setup_io_handlers(
80        &mut self,
81        mut child: Child,
82        process_rx: mpsc::Receiver<String>,
83        output_tx: mpsc::Sender<String>,
84    ) -> Result<(), ProcessError> {
85        let stdin = child
86            .stdin
87            .take()
88            .ok_or_else(|| ProcessError::Other("failed to get child stdin".to_string()))?;
89        let stdout = child
90            .stdout
91            .take()
92            .ok_or_else(|| ProcessError::Other("failed to get child stdout".to_string()))?;
93        let stderr = child
94            .stderr
95            .take()
96            .ok_or_else(|| ProcessError::Other("failed to get child stderr".to_string()))?;
97
98        self.child = Some(child);
99
100        self.stdin = Some(tokio::spawn(handle_stdin(stdin, process_rx)));
101        self.stdout = Some(tokio::spawn(handle_stdout(stdout, output_tx)));
102        self.stderr = Some(tokio::spawn(handle_stderr(stderr)));
103
104        Ok(())
105    }
106
107    /// Shutdown the child process gracefully
108    pub async fn shutdown(&mut self) {
109        if let Some(mut child) = self.child.take() {
110            log::debug!("stopping child process...");
111            if let Err(e) = child.kill().await {
112                log::error!("failed to stop child process: {}", e);
113            }
114            if let Err(e) = child.wait().await {
115                log::error!("error waiting for child process to exit: {}", e);
116            }
117            log::debug!("child process stopped");
118            self.abort_io_handlers();
119        }
120    }
121
122    pub fn abort_io_handlers(&mut self) {
123        log::debug!("stopping io handler tasks...");
124        if let (Some(input), Some(output), Some(err)) =
125            (&mut self.stdin, &mut self.stdout, &mut self.stderr)
126        {
127            input.abort();
128            output.abort();
129            err.abort();
130            log::debug!("io handler tasks stopped");
131        } else {
132            log::debug!("missing io handler; unable to abort")
133        }
134    }
135}