mcp_sdk_rs/process/
manager.rs1use tokio::{
2 process::{Child, Command},
3 sync::mpsc,
4 task::JoinHandle,
5};
6
7use super::io::{handle_stderr, handle_stdin, handle_stdout};
8
/// Capacity of the bounded `mpsc` channel created in `start_process` for
/// messages headed to the child's stdin handler.
const MESSAGE_BUFFER_SIZE: usize = 100;
10
11#[derive(Debug, thiserror::Error)]
12pub enum ProcessError {
13 #[error("IO error: {0}")]
14 Io(#[from] std::io::Error),
15 #[error("Process error: {0}")]
16 Other(String),
17}
18
19pub struct ProcessManager {
20 child: Option<Child>,
21 stdin: Option<JoinHandle<()>>,
22 stdout: Option<JoinHandle<()>>,
23 stderr: Option<JoinHandle<()>>,
24}
25
26impl Drop for ProcessManager {
27 fn drop(&mut self) {
28 if let Some(mut child) = self.child.take() {
29 log::debug!("Starting kill for {}", child.id().unwrap_or(0));
31 let _ = child.start_kill();
32 }
33 }
34}
35
36impl ProcessManager {
37 pub fn new() -> Self {
39 Self {
40 child: None,
41 stdin: None,
42 stdout: None,
43 stderr: None,
44 }
45 }
46
47 pub async fn start_process(
49 &mut self,
50 command: Command,
51 output_tx: mpsc::Sender<String>,
52 ) -> Result<mpsc::Sender<String>, ProcessError> {
53 self.shutdown().await;
55
56 let child = self.spawn_process(command)?;
57 let (process_tx, process_rx) = mpsc::channel::<String>(MESSAGE_BUFFER_SIZE);
58
59 self.setup_io_handlers(child, process_rx, output_tx)?;
60
61 Ok(process_tx)
62 }
63
64 fn spawn_process(&mut self, mut command: Command) -> Result<Child, ProcessError> {
66 log::debug!("spawning process: {:?}", command);
67
68 let child = command
69 .stdin(std::process::Stdio::piped())
70 .stdout(std::process::Stdio::piped())
71 .stderr(std::process::Stdio::piped())
72 .kill_on_drop(true)
73 .spawn()?;
74
75 Ok(child)
76 }
77
78 fn setup_io_handlers(
80 &mut self,
81 mut child: Child,
82 process_rx: mpsc::Receiver<String>,
83 output_tx: mpsc::Sender<String>,
84 ) -> Result<(), ProcessError> {
85 let stdin = child
86 .stdin
87 .take()
88 .ok_or_else(|| ProcessError::Other("failed to get child stdin".to_string()))?;
89 let stdout = child
90 .stdout
91 .take()
92 .ok_or_else(|| ProcessError::Other("failed to get child stdout".to_string()))?;
93 let stderr = child
94 .stderr
95 .take()
96 .ok_or_else(|| ProcessError::Other("failed to get child stderr".to_string()))?;
97
98 self.child = Some(child);
99
100 self.stdin = Some(tokio::spawn(handle_stdin(stdin, process_rx)));
101 self.stdout = Some(tokio::spawn(handle_stdout(stdout, output_tx)));
102 self.stderr = Some(tokio::spawn(handle_stderr(stderr)));
103
104 Ok(())
105 }
106
107 pub async fn shutdown(&mut self) {
109 if let Some(mut child) = self.child.take() {
110 log::debug!("stopping child process...");
111 if let Err(e) = child.kill().await {
112 log::error!("failed to stop child process: {}", e);
113 }
114 if let Err(e) = child.wait().await {
115 log::error!("error waiting for child process to exit: {}", e);
116 }
117 log::debug!("child process stopped");
118 self.abort_io_handlers();
119 }
120 }
121
122 pub fn abort_io_handlers(&mut self) {
123 log::debug!("stopping io handler tasks...");
124 if let (Some(input), Some(output), Some(err)) =
125 (&mut self.stdin, &mut self.stdout, &mut self.stderr)
126 {
127 input.abort();
128 output.abort();
129 err.abort();
130 log::debug!("io handler tasks stopped");
131 } else {
132 log::debug!("missing io handler; unable to abort")
133 }
134 }
135}