1use std::collections::VecDeque;
16use std::process::Stdio;
17use std::sync::atomic::Ordering;
18use std::sync::{Arc, Mutex as StdMutex};
19use std::time::Duration;
20
21use async_trait::async_trait;
22use bytes::Bytes;
23use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
24use tokio::process::{Child, Command};
25use tokio::sync::{Mutex as TokioMutex, mpsc};
26use tokio::time::timeout;
27use tracing::{debug, error, info, trace, warn};
28
29use crate::core::{
30 AtomicMetrics, Transport, TransportCapabilities, TransportError, TransportEvent,
31 TransportEventEmitter, TransportMessage, TransportMetrics, TransportResult, TransportState,
32 TransportType,
33};
34use turbomcp_protocol::MessageId;
35
/// Settings used to spawn and supervise a child process that exchanges
/// newline-delimited messages with this transport over its stdin/stdout.
#[derive(Clone, Debug)]
pub struct ChildProcessConfig {
    /// Program to execute. An empty string is rejected when connecting.
    pub command: String,

    /// Arguments passed to `command`, in order.
    pub args: Vec<String>,

    /// Working directory for the child; `None` leaves the spawn default in place.
    pub working_directory: Option<String>,

    /// Additional `(name, value)` environment variables set on the child.
    pub environment: Option<Vec<(String, String)>>,

    /// Upper bound on the post-spawn readiness check.
    pub startup_timeout: Duration,

    /// How long to wait for the child to exit after it is signalled to stop,
    /// before falling back to a forced kill.
    pub shutdown_timeout: Duration,

    /// Largest message, in bytes, accepted in either direction: larger outbound
    /// payloads are rejected and larger inbound lines are dropped.
    pub max_message_size: usize,

    /// Buffer size, in bytes, intended for the stdio pipe readers and writers.
    pub buffer_size: usize,

    /// Whether the child should be killed when the transport is dropped.
    pub kill_on_drop: bool,
}
66
67impl Default for ChildProcessConfig {
68 fn default() -> Self {
69 Self {
70 command: String::new(),
71 args: Vec::new(),
72 working_directory: None,
73 environment: None,
74 startup_timeout: Duration::from_secs(30),
75 shutdown_timeout: Duration::from_secs(10),
76 max_message_size: 10 * 1024 * 1024, buffer_size: 8192,
78 kill_on_drop: true,
79 }
80 }
81}
82
83#[derive(Debug)]
94pub struct ChildProcessTransport {
95 config: ChildProcessConfig,
97
98 child: Arc<TokioMutex<Option<Child>>>,
100
101 state: Arc<StdMutex<TransportState>>,
103
104 capabilities: TransportCapabilities,
106
107 metrics: Arc<AtomicMetrics>,
109
110 event_emitter: TransportEventEmitter,
112
113 #[allow(dead_code)] outbound_queue: Arc<StdMutex<VecDeque<TransportMessage>>>,
116
117 stdin_sender: Arc<TokioMutex<Option<mpsc::Sender<String>>>>,
119 stdout_receiver: Arc<TokioMutex<Option<mpsc::Receiver<String>>>>,
120
121 _stdin_task: Arc<TokioMutex<Option<tokio::task::JoinHandle<()>>>>,
123 _stdout_task: Arc<TokioMutex<Option<tokio::task::JoinHandle<()>>>>,
124}
125
126impl ChildProcessTransport {
127 pub fn new(config: ChildProcessConfig) -> Self {
129 let capabilities = TransportCapabilities {
130 max_message_size: Some(config.max_message_size),
131 supports_streaming: false,
132 supports_compression: false,
133 supports_bidirectional: true,
134 supports_multiplexing: false,
135 compression_algorithms: Vec::new(),
136 custom: std::collections::HashMap::new(),
137 };
138
139 Self {
140 config,
141 child: Arc::new(TokioMutex::new(None)),
142 state: Arc::new(StdMutex::new(TransportState::Disconnected)),
143 capabilities,
144 metrics: Arc::new(AtomicMetrics::default()),
145 event_emitter: TransportEventEmitter::new().0,
146 outbound_queue: Arc::new(StdMutex::new(VecDeque::new())),
147 stdin_sender: Arc::new(TokioMutex::new(None)),
148 stdout_receiver: Arc::new(TokioMutex::new(None)),
149 _stdin_task: Arc::new(TokioMutex::new(None)),
150 _stdout_task: Arc::new(TokioMutex::new(None)),
151 }
152 }
153
154 async fn start_process(&self) -> TransportResult<()> {
156 if self.config.command.is_empty() {
157 return Err(TransportError::ConfigurationError(
158 "Command cannot be empty".to_string(),
159 ));
160 }
161
162 info!(
163 "Starting child process: {} {:?}",
164 self.config.command, self.config.args
165 );
166
167 let mut cmd = Command::new(&self.config.command);
169 cmd.args(&self.config.args)
170 .stdin(Stdio::piped())
171 .stdout(Stdio::piped())
172 .stderr(Stdio::piped())
173 .kill_on_drop(self.config.kill_on_drop);
174
175 if let Some(ref wd) = self.config.working_directory {
177 cmd.current_dir(wd);
178 }
179
180 if let Some(ref env) = self.config.environment {
182 for (key, value) in env {
183 cmd.env(key, value);
184 }
185 }
186
187 let mut child = cmd.spawn().map_err(|e| {
189 error!("Failed to spawn child process: {}", e);
190 TransportError::ConnectionFailed(format!("Failed to spawn process: {e}"))
191 })?;
192
193 let stdin = child.stdin.take().ok_or_else(|| {
195 TransportError::ConnectionFailed("Failed to get stdin handle".to_string())
196 })?;
197
198 let stdout = child.stdout.take().ok_or_else(|| {
199 TransportError::ConnectionFailed("Failed to get stdout handle".to_string())
200 })?;
201
202 let stderr = child.stderr.take().ok_or_else(|| {
203 TransportError::ConnectionFailed("Failed to get stderr handle".to_string())
204 })?;
205
206 let (stdin_tx, stdin_rx) = mpsc::channel::<String>(100);
208 let (stdout_tx, stdout_rx) = mpsc::channel::<String>(100);
209
210 let stdin_task = {
212 let mut writer = BufWriter::new(stdin);
213 tokio::spawn(async move {
214 let mut stdin_rx = stdin_rx;
215 while let Some(message) = stdin_rx.recv().await {
216 if let Err(e) = writer.write_all(message.as_bytes()).await {
217 error!("Failed to write to process stdin: {}", e);
218 break;
219 }
220 if let Err(e) = writer.write_all(b"\n").await {
221 error!("Failed to write newline to process stdin: {}", e);
222 break;
223 }
224 if let Err(e) = writer.flush().await {
225 error!("Failed to flush process stdin: {}", e);
226 break;
227 }
228 trace!("Sent message to child process: {}", message);
229 }
230 debug!("STDIN writer task completed");
231 })
232 };
233
234 let stdout_task = {
236 let reader = BufReader::new(stdout);
237 let max_size = self.config.max_message_size;
238 tokio::spawn(async move {
239 let mut lines = reader.lines();
240 while let Ok(Some(line)) = lines.next_line().await {
241 if line.len() > max_size {
242 warn!(
243 "Received oversized message from child process: {} bytes",
244 line.len()
245 );
246 continue;
247 }
248 trace!("Received message from child process: {}", line);
249 if stdout_tx.send(line).await.is_err() {
250 debug!("STDOUT receiver dropped, stopping reader task");
251 break;
252 }
253 }
254 debug!("STDOUT reader task completed");
255 })
256 };
257
258 let _stderr_task = {
260 let reader = BufReader::new(stderr);
261 tokio::spawn(async move {
262 let mut lines = reader.lines();
263 while let Ok(Some(line)) = lines.next_line().await {
264 debug!("Child process stderr: {}", line);
265 }
266 debug!("STDERR reader task completed");
267 })
268 };
269
270 *self.child.lock().await = Some(child);
272 *self.stdin_sender.lock().await = Some(stdin_tx);
273 *self.stdout_receiver.lock().await = Some(stdout_rx);
274 *self._stdin_task.lock().await = Some(stdin_task);
275 *self._stdout_task.lock().await = Some(stdout_task);
276
277 *self.state.lock().expect("state mutex poisoned") = TransportState::Connected;
279
280 match timeout(self.config.startup_timeout, self.wait_for_ready()).await {
282 Ok(Ok(_)) => {
283 info!("Child process started successfully");
284 self.event_emitter.emit(TransportEvent::Connected {
285 transport_type: TransportType::ChildProcess,
286 endpoint: format!("{}:{:?}", self.config.command, self.config.args),
287 });
288 Ok(())
289 }
290 Ok(Err(e)) => {
291 error!("Child process startup failed: {}", e);
292 self.stop_process().await?;
293 Err(e)
294 }
295 Err(_) => {
296 error!("Child process startup timed out");
297 self.stop_process().await?;
298 Err(TransportError::Timeout)
299 }
300 }
301 }
302
303 async fn wait_for_ready(&self) -> TransportResult<()> {
305 let mut child_guard = self.child.lock().await;
306 if let Some(ref mut child) = child_guard.as_mut() {
307 match child.try_wait() {
309 Ok(Some(status)) => {
310 error!("Child process exited early with status: {}", status);
311 return Err(TransportError::ConnectionFailed(format!(
312 "Process exited early: {status}"
313 )));
314 }
315 Ok(None) => {
316 return Ok(());
318 }
319 Err(e) => {
320 error!("Failed to check child process status: {}", e);
321 return Err(TransportError::ConnectionFailed(format!(
322 "Failed to check process status: {e}"
323 )));
324 }
325 }
326 }
327
328 Err(TransportError::ConnectionFailed(
329 "No child process".to_string(),
330 ))
331 }
332
333 async fn stop_process(&self) -> TransportResult<()> {
335 info!("Stopping child process");
336
337 *self.stdin_sender.lock().await = None;
339 *self.stdout_receiver.lock().await = None;
340
341 if let Some(mut child) = self.child.lock().await.take() {
342 if let Err(e) = child.start_kill() {
344 warn!("Failed to send kill signal to child process: {}", e);
345 }
346
347 match timeout(self.config.shutdown_timeout, child.wait()).await {
349 Ok(Ok(status)) => {
350 info!("Child process exited with status: {}", status);
351 }
352 Ok(Err(e)) => {
353 error!("Failed to wait for child process exit: {}", e);
354 }
355 Err(_) => {
356 warn!("Child process shutdown timed out, forcing kill");
357 if let Err(e) = child.kill().await {
358 error!("Failed to force kill child process: {}", e);
359 }
360 }
361 }
362 }
363
364 *self.state.lock().expect("state mutex poisoned") = TransportState::Disconnected;
366 self.event_emitter.emit(TransportEvent::Disconnected {
367 transport_type: TransportType::ChildProcess,
368 endpoint: format!("{}:{:?}", self.config.command, self.config.args),
369 reason: Some("Process stopped".to_string()),
370 });
371
372 Ok(())
373 }
374
375 pub async fn is_process_alive(&self) -> bool {
377 let mut child_guard = self.child.lock().await;
378 if let Some(ref mut child) = child_guard.as_mut() {
379 match child.try_wait() {
380 Ok(Some(_)) => false, Ok(None) => true, Err(_) => false, }
384 } else {
385 false
386 }
387 }
388}
389
390#[async_trait]
391impl Transport for ChildProcessTransport {
392 async fn connect(&self) -> TransportResult<()> {
393 match *self.state.lock().expect("state mutex poisoned") {
394 TransportState::Connected => return Ok(()),
395 TransportState::Connecting => {
396 return Err(TransportError::Internal("Already connecting".to_string()));
397 }
398 _ => {}
399 }
400
401 *self.state.lock().expect("state mutex poisoned") = TransportState::Connecting;
402 self.start_process().await
403 }
404
405 async fn disconnect(&self) -> TransportResult<()> {
406 self.stop_process().await
407 }
408
409 async fn send(&self, message: TransportMessage) -> TransportResult<()> {
410 let state = self.state.lock().expect("state mutex poisoned").clone();
411 if state != TransportState::Connected {
412 return Err(TransportError::Internal(format!(
413 "Cannot send in state: {state:?}"
414 )));
415 }
416
417 if message.payload.len() > self.config.max_message_size {
418 return Err(TransportError::Internal(format!(
419 "Message too large: {} bytes (max: {})",
420 message.payload.len(),
421 self.config.max_message_size
422 )));
423 }
424
425 let payload_str = String::from_utf8(message.payload.to_vec()).map_err(|e| {
427 TransportError::SerializationFailed(format!("Invalid UTF-8 in message payload: {e}"))
428 })?;
429
430 let stdin_sender = self.stdin_sender.lock().await;
432 if let Some(sender) = stdin_sender.as_ref() {
433 sender.send(payload_str).await.map_err(|_| {
434 error!("Failed to send message: stdin channel closed");
435 TransportError::ConnectionLost("STDIN channel closed".to_string())
436 })?;
437
438 self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);
440 self.metrics
441 .bytes_sent
442 .fetch_add(message.payload.len() as u64, Ordering::Relaxed);
443
444 trace!("Sent message via child process transport");
445 Ok(())
446 } else {
447 Err(TransportError::ConnectionLost(
448 "No stdin channel available".to_string(),
449 ))
450 }
451 }
452
453 async fn receive(&self) -> TransportResult<Option<TransportMessage>> {
454 let state = self.state.lock().expect("state mutex poisoned").clone();
455 if state != TransportState::Connected {
456 return Ok(None);
457 }
458
459 if !self.is_process_alive().await {
461 warn!("Child process died, disconnecting transport");
462 self.stop_process().await?;
463 return Ok(None);
464 }
465
466 let mut stdout_receiver = self.stdout_receiver.lock().await;
468 if let Some(ref mut receiver) = stdout_receiver.as_mut() {
469 match receiver.recv().await {
470 Some(line) => {
471 let payload = Bytes::from(line);
472 let message = TransportMessage::new(
473 MessageId::String(uuid::Uuid::new_v4().to_string()),
474 payload,
475 );
476
477 self.metrics
479 .messages_received
480 .fetch_add(1, Ordering::Relaxed);
481 self.metrics
482 .bytes_received
483 .fetch_add(message.payload.len() as u64, Ordering::Relaxed);
484
485 trace!("Received message via child process transport");
486 Ok(Some(message))
487 }
488 None => {
489 debug!("STDOUT channel disconnected");
490 Ok(None)
491 }
492 }
493 } else {
494 Ok(None)
495 }
496 }
497
498 async fn state(&self) -> TransportState {
499 self.state.lock().expect("state mutex poisoned").clone()
500 }
501
502 fn transport_type(&self) -> TransportType {
503 TransportType::ChildProcess
504 }
505
506 fn capabilities(&self) -> &TransportCapabilities {
507 &self.capabilities
508 }
509
510 async fn metrics(&self) -> TransportMetrics {
511 self.metrics.snapshot()
513 }
514}
515
516impl Drop for ChildProcessTransport {
517 fn drop(&mut self) {
518 if self.config.kill_on_drop {
519 if let Ok(mut child_guard) = self.child.try_lock()
522 && let Some(ref mut child) = child_guard.as_mut()
523 {
524 let _ = child.start_kill();
525 }
526 }
527 }
528}
529
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::time::timeout;

    #[tokio::test]
    async fn test_child_process_config_default() {
        let config = ChildProcessConfig::default();
        assert!(config.command.is_empty());
        assert!(config.args.is_empty());
        assert_eq!(config.startup_timeout, Duration::from_secs(30));
        assert_eq!(config.shutdown_timeout, Duration::from_secs(10));
        assert_eq!(config.max_message_size, 10 * 1024 * 1024);
        assert_eq!(config.buffer_size, 8192);
        assert!(config.kill_on_drop);
    }

    #[tokio::test]
    async fn test_child_process_transport_creation() {
        let config = ChildProcessConfig {
            command: "echo".to_string(),
            args: vec!["hello".to_string()],
            ..Default::default()
        };

        let transport = ChildProcessTransport::new(config);
        assert_eq!(transport.state().await, TransportState::Disconnected);
        assert_eq!(transport.transport_type(), TransportType::ChildProcess);
    }

    #[tokio::test]
    async fn test_empty_command_error() {
        let transport = ChildProcessTransport::new(ChildProcessConfig::default());

        let result = transport.connect().await;
        let Err(TransportError::ConfigurationError(msg)) = result else {
            panic!("Expected ConfigurationError");
        };
        assert!(msg.contains("Command cannot be empty"));
    }

    /// Round-trips one line through `cat`, which echoes its stdin to stdout.
    #[tokio::test]
    async fn test_echo_command() {
        let config = ChildProcessConfig {
            command: "cat".to_string(),
            args: vec![],
            startup_timeout: Duration::from_secs(5),
            ..Default::default()
        };
        let transport = ChildProcessTransport::new(config);

        // `cat` may not exist on every platform; only exercise the round trip if it started.
        if transport.connect().await.is_err() {
            return;
        }
        assert_eq!(transport.state().await, TransportState::Connected);

        let test_message = TransportMessage::new(
            MessageId::String("test".to_string()),
            Bytes::from("Hello, World!"),
        );
        assert!(
            transport.send(test_message).await.is_ok(),
            "send to cat should succeed"
        );

        // `receive` blocks until a line arrives; bound it so a regression cannot hang the suite.
        let Ok(Ok(Some(response))) = timeout(Duration::from_secs(5), transport.receive()).await
        else {
            panic!("expected cat to echo the message back");
        };
        assert_eq!(response.payload, Bytes::from("Hello, World!"));

        assert!(transport.disconnect().await.is_ok());
        assert_eq!(transport.state().await, TransportState::Disconnected);
    }
}