// turbomcp_transport/child_process.rs

//! Child process transport for TurboMCP.
//!
//! Provides a transport for communicating with an MCP server running as a
//! child process over its STDIO streams: newline-delimited messages are
//! written to the child's stdin and read from its stdout, while the child's
//! stderr is forwarded to the `debug` log. Process management uses Tokio's
//! async process API.
//!
//! # Interior mutability
//!
//! Each piece of shared state is guarded according to how long its lock must
//! be held:
//!
//! - **`std::sync::Mutex`** for the transport state and outbound queue: locks
//!   are short-lived and never held across an `.await`.
//! - **`AtomicMetrics`** for counters, which are updated without locking.
//! - **`tokio::sync::Mutex`** for the child process handle and I/O channels,
//!   whose guards may be held across `.await` points.

15use std::collections::VecDeque;
16use std::process::Stdio;
17use std::sync::atomic::Ordering;
18use std::sync::{Arc, Mutex as StdMutex};
19use std::time::Duration;
20
21use async_trait::async_trait;
22use bytes::Bytes;
23use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
24use tokio::process::{Child, Command};
25use tokio::sync::{Mutex as TokioMutex, mpsc};
26use tokio::time::timeout;
27use tracing::{debug, error, info, trace, warn};
28
29use crate::core::{
30    AtomicMetrics, Transport, TransportCapabilities, TransportError, TransportEvent,
31    TransportEventEmitter, TransportMessage, TransportMetrics, TransportResult, TransportState,
32    TransportType,
33};
34use turbomcp_protocol::MessageId;
35
/// Settings describing how a child process transport launches its process and
/// exchanges messages with it.
///
/// Typically built with struct-update syntax on top of the defaults, e.g.
/// `ChildProcessConfig { command: "my-server".into(), ..Default::default() }`.
#[derive(Debug, Clone)]
pub struct ChildProcessConfig {
    /// Program to launch. Must be non-empty before connecting.
    pub command: String,

    /// Command-line arguments passed to `command`, in order.
    pub args: Vec<String>,

    /// Directory to start the process in; `None` keeps the parent's.
    pub working_directory: Option<String>,

    /// Extra `(name, value)` environment variables applied on top of the
    /// inherited environment.
    pub environment: Option<Vec<(String, String)>>,

    /// Upper bound on the post-spawn readiness check performed by `connect`.
    pub startup_timeout: Duration,

    /// Upper bound on how long shutdown waits for the process to exit.
    pub shutdown_timeout: Duration,

    /// Largest message, in bytes: bigger outgoing messages are rejected and
    /// bigger incoming lines are dropped.
    pub max_message_size: usize,

    /// Buffer capacity, in bytes, intended for the STDIO streams.
    pub buffer_size: usize,

    /// Whether the child process should be killed when it is dropped.
    pub kill_on_drop: bool,
}
66
67impl Default for ChildProcessConfig {
68    fn default() -> Self {
69        Self {
70            command: String::new(),
71            args: Vec::new(),
72            working_directory: None,
73            environment: None,
74            startup_timeout: Duration::from_secs(30),
75            shutdown_timeout: Duration::from_secs(10),
76            max_message_size: 10 * 1024 * 1024, // 10MB
77            buffer_size: 8192,
78            kill_on_drop: true,
79        }
80    }
81}
82
83/// Child process transport implementation
84///
85/// # Interior Mutability Architecture
86///
87/// Following research-backed 2025 Rust async best practices:
88///
89/// - `state`: std::sync::Mutex (short-lived locks, never held across .await)
90/// - `outbound_queue`: std::sync::Mutex (infrequent access, short-lived locks)
91/// - `metrics`: AtomicMetrics (lock-free counters, 10-100x faster than Mutex)
92/// - `child`/I/O: tokio::sync::Mutex (held across .await, necessary for async operations)
93#[derive(Debug)]
94pub struct ChildProcessTransport {
95    /// Process configuration (immutable after construction)
96    config: ChildProcessConfig,
97
98    /// Child process handle (tokio::sync::Mutex - crosses await boundaries)
99    child: Arc<TokioMutex<Option<Child>>>,
100
101    /// Transport state (std::sync::Mutex - never crosses await)
102    state: Arc<StdMutex<TransportState>>,
103
104    /// Transport capabilities (immutable after construction)
105    capabilities: TransportCapabilities,
106
107    /// Lock-free atomic metrics (10-100x faster than Mutex)
108    metrics: Arc<AtomicMetrics>,
109
110    /// Event emitter
111    event_emitter: TransportEventEmitter,
112
113    /// Outbound message queue (std::sync::Mutex - short-lived locks)
114    #[allow(dead_code)] // Reserved for future buffering implementation
115    outbound_queue: Arc<StdMutex<VecDeque<TransportMessage>>>,
116
117    /// STDIO communication channels (tokio::sync::Mutex - crosses await boundaries)
118    stdin_sender: Arc<TokioMutex<Option<mpsc::Sender<String>>>>,
119    stdout_receiver: Arc<TokioMutex<Option<mpsc::Receiver<String>>>>,
120
121    /// Background task handles (tokio::sync::Mutex - crosses await boundaries)
122    _stdin_task: Arc<TokioMutex<Option<tokio::task::JoinHandle<()>>>>,
123    _stdout_task: Arc<TokioMutex<Option<tokio::task::JoinHandle<()>>>>,
124}
125
126impl ChildProcessTransport {
127    /// Create a new child process transport
128    pub fn new(config: ChildProcessConfig) -> Self {
129        let capabilities = TransportCapabilities {
130            max_message_size: Some(config.max_message_size),
131            supports_streaming: false,
132            supports_compression: false,
133            supports_bidirectional: true,
134            supports_multiplexing: false,
135            compression_algorithms: Vec::new(),
136            custom: std::collections::HashMap::new(),
137        };
138
139        Self {
140            config,
141            child: Arc::new(TokioMutex::new(None)),
142            state: Arc::new(StdMutex::new(TransportState::Disconnected)),
143            capabilities,
144            metrics: Arc::new(AtomicMetrics::default()),
145            event_emitter: TransportEventEmitter::new().0,
146            outbound_queue: Arc::new(StdMutex::new(VecDeque::new())),
147            stdin_sender: Arc::new(TokioMutex::new(None)),
148            stdout_receiver: Arc::new(TokioMutex::new(None)),
149            _stdin_task: Arc::new(TokioMutex::new(None)),
150            _stdout_task: Arc::new(TokioMutex::new(None)),
151        }
152    }
153
154    /// Start the child process and set up communication channels
155    async fn start_process(&self) -> TransportResult<()> {
156        if self.config.command.is_empty() {
157            return Err(TransportError::ConfigurationError(
158                "Command cannot be empty".to_string(),
159            ));
160        }
161
162        info!(
163            "Starting child process: {} {:?}",
164            self.config.command, self.config.args
165        );
166
167        // Create the command
168        let mut cmd = Command::new(&self.config.command);
169        cmd.args(&self.config.args)
170            .stdin(Stdio::piped())
171            .stdout(Stdio::piped())
172            .stderr(Stdio::piped())
173            .kill_on_drop(self.config.kill_on_drop);
174
175        // Set working directory if specified
176        if let Some(ref wd) = self.config.working_directory {
177            cmd.current_dir(wd);
178        }
179
180        // Set environment variables if specified
181        if let Some(ref env) = self.config.environment {
182            for (key, value) in env {
183                cmd.env(key, value);
184            }
185        }
186
187        // Spawn the process
188        let mut child = cmd.spawn().map_err(|e| {
189            error!("Failed to spawn child process: {}", e);
190            TransportError::ConnectionFailed(format!("Failed to spawn process: {e}"))
191        })?;
192
193        // Get STDIO handles
194        let stdin = child.stdin.take().ok_or_else(|| {
195            TransportError::ConnectionFailed("Failed to get stdin handle".to_string())
196        })?;
197
198        let stdout = child.stdout.take().ok_or_else(|| {
199            TransportError::ConnectionFailed("Failed to get stdout handle".to_string())
200        })?;
201
202        let stderr = child.stderr.take().ok_or_else(|| {
203            TransportError::ConnectionFailed("Failed to get stderr handle".to_string())
204        })?;
205
206        // Create communication channels
207        let (stdin_tx, stdin_rx) = mpsc::channel::<String>(100);
208        let (stdout_tx, stdout_rx) = mpsc::channel::<String>(100);
209
210        // Start STDIN writer task
211        let stdin_task = {
212            let mut writer = BufWriter::new(stdin);
213            tokio::spawn(async move {
214                let mut stdin_rx = stdin_rx;
215                while let Some(message) = stdin_rx.recv().await {
216                    if let Err(e) = writer.write_all(message.as_bytes()).await {
217                        error!("Failed to write to process stdin: {}", e);
218                        break;
219                    }
220                    if let Err(e) = writer.write_all(b"\n").await {
221                        error!("Failed to write newline to process stdin: {}", e);
222                        break;
223                    }
224                    if let Err(e) = writer.flush().await {
225                        error!("Failed to flush process stdin: {}", e);
226                        break;
227                    }
228                    trace!("Sent message to child process: {}", message);
229                }
230                debug!("STDIN writer task completed");
231            })
232        };
233
234        // Start STDOUT reader task
235        let stdout_task = {
236            let reader = BufReader::new(stdout);
237            let max_size = self.config.max_message_size;
238            tokio::spawn(async move {
239                let mut lines = reader.lines();
240                while let Ok(Some(line)) = lines.next_line().await {
241                    if line.len() > max_size {
242                        warn!(
243                            "Received oversized message from child process: {} bytes",
244                            line.len()
245                        );
246                        continue;
247                    }
248                    trace!("Received message from child process: {}", line);
249                    if stdout_tx.send(line).await.is_err() {
250                        debug!("STDOUT receiver dropped, stopping reader task");
251                        break;
252                    }
253                }
254                debug!("STDOUT reader task completed");
255            })
256        };
257
258        // Start STDERR reader task for logging
259        let _stderr_task = {
260            let reader = BufReader::new(stderr);
261            tokio::spawn(async move {
262                let mut lines = reader.lines();
263                while let Ok(Some(line)) = lines.next_line().await {
264                    debug!("Child process stderr: {}", line);
265                }
266                debug!("STDERR reader task completed");
267            })
268        };
269
270        // Store handles
271        *self.child.lock().await = Some(child);
272        *self.stdin_sender.lock().await = Some(stdin_tx);
273        *self.stdout_receiver.lock().await = Some(stdout_rx);
274        *self._stdin_task.lock().await = Some(stdin_task);
275        *self._stdout_task.lock().await = Some(stdout_task);
276
277        // Update state
278        *self.state.lock().expect("state mutex poisoned") = TransportState::Connected;
279
280        // Wait for process to be ready with timeout
281        match timeout(self.config.startup_timeout, self.wait_for_ready()).await {
282            Ok(Ok(_)) => {
283                info!("Child process started successfully");
284                self.event_emitter.emit(TransportEvent::Connected {
285                    transport_type: TransportType::ChildProcess,
286                    endpoint: format!("{}:{:?}", self.config.command, self.config.args),
287                });
288                Ok(())
289            }
290            Ok(Err(e)) => {
291                error!("Child process startup failed: {}", e);
292                self.stop_process().await?;
293                Err(e)
294            }
295            Err(_) => {
296                error!("Child process startup timed out");
297                self.stop_process().await?;
298                Err(TransportError::Timeout)
299            }
300        }
301    }
302
303    /// Wait for the process to be ready by checking if it's still running
304    async fn wait_for_ready(&self) -> TransportResult<()> {
305        let mut child_guard = self.child.lock().await;
306        if let Some(ref mut child) = child_guard.as_mut() {
307            // Check if process is still running
308            match child.try_wait() {
309                Ok(Some(status)) => {
310                    error!("Child process exited early with status: {}", status);
311                    return Err(TransportError::ConnectionFailed(format!(
312                        "Process exited early: {status}"
313                    )));
314                }
315                Ok(None) => {
316                    // Process is still running, good
317                    return Ok(());
318                }
319                Err(e) => {
320                    error!("Failed to check child process status: {}", e);
321                    return Err(TransportError::ConnectionFailed(format!(
322                        "Failed to check process status: {e}"
323                    )));
324                }
325            }
326        }
327
328        Err(TransportError::ConnectionFailed(
329            "No child process".to_string(),
330        ))
331    }
332
333    /// Stop the child process gracefully
334    async fn stop_process(&self) -> TransportResult<()> {
335        info!("Stopping child process");
336
337        // Drop communication channels first
338        *self.stdin_sender.lock().await = None;
339        *self.stdout_receiver.lock().await = None;
340
341        if let Some(mut child) = self.child.lock().await.take() {
342            // Try graceful shutdown first
343            if let Err(e) = child.start_kill() {
344                warn!("Failed to send kill signal to child process: {}", e);
345            }
346
347            // Wait for process to exit with timeout
348            match timeout(self.config.shutdown_timeout, child.wait()).await {
349                Ok(Ok(status)) => {
350                    info!("Child process exited with status: {}", status);
351                }
352                Ok(Err(e)) => {
353                    error!("Failed to wait for child process exit: {}", e);
354                }
355                Err(_) => {
356                    warn!("Child process shutdown timed out, forcing kill");
357                    if let Err(e) = child.kill().await {
358                        error!("Failed to force kill child process: {}", e);
359                    }
360                }
361            }
362        }
363
364        // Update state
365        *self.state.lock().expect("state mutex poisoned") = TransportState::Disconnected;
366        self.event_emitter.emit(TransportEvent::Disconnected {
367            transport_type: TransportType::ChildProcess,
368            endpoint: format!("{}:{:?}", self.config.command, self.config.args),
369            reason: Some("Process stopped".to_string()),
370        });
371
372        Ok(())
373    }
374
375    /// Check if the child process is still running
376    pub async fn is_process_alive(&self) -> bool {
377        let mut child_guard = self.child.lock().await;
378        if let Some(ref mut child) = child_guard.as_mut() {
379            match child.try_wait() {
380                Ok(Some(_)) => false, // Process has exited
381                Ok(None) => true,     // Process is still running
382                Err(_) => false,      // Error checking status
383            }
384        } else {
385            false
386        }
387    }
388}
389
390#[async_trait]
391impl Transport for ChildProcessTransport {
392    async fn connect(&self) -> TransportResult<()> {
393        match *self.state.lock().expect("state mutex poisoned") {
394            TransportState::Connected => return Ok(()),
395            TransportState::Connecting => {
396                return Err(TransportError::Internal("Already connecting".to_string()));
397            }
398            _ => {}
399        }
400
401        *self.state.lock().expect("state mutex poisoned") = TransportState::Connecting;
402        self.start_process().await
403    }
404
405    async fn disconnect(&self) -> TransportResult<()> {
406        self.stop_process().await
407    }
408
409    async fn send(&self, message: TransportMessage) -> TransportResult<()> {
410        let state = self.state.lock().expect("state mutex poisoned").clone();
411        if state != TransportState::Connected {
412            return Err(TransportError::Internal(format!(
413                "Cannot send in state: {state:?}"
414            )));
415        }
416
417        if message.payload.len() > self.config.max_message_size {
418            return Err(TransportError::Internal(format!(
419                "Message too large: {} bytes (max: {})",
420                message.payload.len(),
421                self.config.max_message_size
422            )));
423        }
424
425        // Convert message payload to string
426        let payload_str = String::from_utf8(message.payload.to_vec()).map_err(|e| {
427            TransportError::SerializationFailed(format!("Invalid UTF-8 in message payload: {e}"))
428        })?;
429
430        // Send through stdin channel
431        let stdin_sender = self.stdin_sender.lock().await;
432        if let Some(sender) = stdin_sender.as_ref() {
433            sender.send(payload_str).await.map_err(|_| {
434                error!("Failed to send message: stdin channel closed");
435                TransportError::ConnectionLost("STDIN channel closed".to_string())
436            })?;
437
438            // Update metrics (lock-free atomic operations)
439            self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);
440            self.metrics
441                .bytes_sent
442                .fetch_add(message.payload.len() as u64, Ordering::Relaxed);
443
444            trace!("Sent message via child process transport");
445            Ok(())
446        } else {
447            Err(TransportError::ConnectionLost(
448                "No stdin channel available".to_string(),
449            ))
450        }
451    }
452
453    async fn receive(&self) -> TransportResult<Option<TransportMessage>> {
454        let state = self.state.lock().expect("state mutex poisoned").clone();
455        if state != TransportState::Connected {
456            return Ok(None);
457        }
458
459        // Check if process is still alive
460        if !self.is_process_alive().await {
461            warn!("Child process died, disconnecting transport");
462            self.stop_process().await?;
463            return Ok(None);
464        }
465
466        // Properly block and wait for messages from stdout channel
467        let mut stdout_receiver = self.stdout_receiver.lock().await;
468        if let Some(ref mut receiver) = stdout_receiver.as_mut() {
469            match receiver.recv().await {
470                Some(line) => {
471                    let payload = Bytes::from(line);
472                    let message = TransportMessage::new(
473                        MessageId::String(uuid::Uuid::new_v4().to_string()),
474                        payload,
475                    );
476
477                    // Update metrics (lock-free atomic operations)
478                    self.metrics
479                        .messages_received
480                        .fetch_add(1, Ordering::Relaxed);
481                    self.metrics
482                        .bytes_received
483                        .fetch_add(message.payload.len() as u64, Ordering::Relaxed);
484
485                    trace!("Received message via child process transport");
486                    Ok(Some(message))
487                }
488                None => {
489                    debug!("STDOUT channel disconnected");
490                    Ok(None)
491                }
492            }
493        } else {
494            Ok(None)
495        }
496    }
497
498    async fn state(&self) -> TransportState {
499        self.state.lock().expect("state mutex poisoned").clone()
500    }
501
502    fn transport_type(&self) -> TransportType {
503        TransportType::ChildProcess
504    }
505
506    fn capabilities(&self) -> &TransportCapabilities {
507        &self.capabilities
508    }
509
510    async fn metrics(&self) -> TransportMetrics {
511        // AtomicMetrics: lock-free snapshot with Ordering::Relaxed
512        self.metrics.snapshot()
513    }
514}
515
516impl Drop for ChildProcessTransport {
517    fn drop(&mut self) {
518        if self.config.kill_on_drop {
519            // Best-effort cleanup: try to lock and kill the child process
520            // Use try_lock since Drop is synchronous
521            if let Ok(mut child_guard) = self.child.try_lock()
522                && let Some(ref mut child) = child_guard.as_mut()
523            {
524                let _ = child.start_kill();
525            }
526        }
527    }
528}
529
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[tokio::test]
    async fn test_child_process_config_default() {
        let config = ChildProcessConfig::default();
        assert!(config.command.is_empty());
        assert!(config.args.is_empty());
        assert!(config.working_directory.is_none());
        assert!(config.environment.is_none());
        assert_eq!(config.startup_timeout, Duration::from_secs(30));
        assert_eq!(config.shutdown_timeout, Duration::from_secs(10));
        assert_eq!(config.max_message_size, 10 * 1024 * 1024);
        assert_eq!(config.buffer_size, 8192);
        assert!(config.kill_on_drop);
    }

    #[tokio::test]
    async fn test_child_process_transport_creation() {
        let config = ChildProcessConfig {
            command: "echo".to_string(),
            args: vec!["hello".to_string()],
            ..Default::default()
        };

        let transport = ChildProcessTransport::new(config);
        assert_eq!(transport.state().await, TransportState::Disconnected);
        assert_eq!(transport.transport_type(), TransportType::ChildProcess);
        assert_eq!(
            transport.capabilities().max_message_size,
            Some(10 * 1024 * 1024)
        );
    }

    #[tokio::test]
    async fn test_empty_command_error() {
        let transport = ChildProcessTransport::new(ChildProcessConfig::default());

        match transport.connect().await {
            Err(TransportError::ConfigurationError(msg)) => {
                assert!(msg.contains("Command cannot be empty"));
            }
            Err(e) => panic!("Expected ConfigurationError, got: {e}"),
            Ok(()) => panic!("Expected ConfigurationError, got Ok"),
        }
    }

    /// Round-trips one line through `cat`.
    ///
    /// Skipped (not failed) when `cat` cannot be spawned, e.g. on Windows or in
    /// sandboxed CI. Once it is running, the echo must come back.
    #[tokio::test]
    async fn test_echo_command() {
        let config = ChildProcessConfig {
            command: "cat".to_string(),
            startup_timeout: Duration::from_secs(5),
            shutdown_timeout: Duration::from_secs(5),
            ..Default::default()
        };
        let transport = ChildProcessTransport::new(config);

        if let Err(e) = transport.connect().await {
            eprintln!("skipping test_echo_command: could not start `cat`: {e}");
            return;
        }
        assert_eq!(transport.state().await, TransportState::Connected);

        let test_message = TransportMessage::new(
            MessageId::String("test".to_string()),
            Bytes::from("Hello, World!"),
        );
        if let Err(e) = transport.send(test_message).await {
            panic!("send to `cat` failed: {e}");
        }

        // `receive` blocks until a line arrives; bound it so a broken child
        // cannot hang the test suite.
        let received = tokio::time::timeout(Duration::from_secs(5), transport.receive())
            .await
            .expect("timed out waiting for `cat` to echo the message");
        let echoed = match received {
            Ok(Some(message)) => message,
            Ok(None) => panic!("transport closed before the echo arrived"),
            Err(e) => panic!("receive failed: {e}"),
        };
        assert_eq!(&echoed.payload[..], b"Hello, World!");

        if let Err(e) = transport.disconnect().await {
            panic!("disconnect failed: {e}");
        }
        assert_eq!(transport.state().await, TransportState::Disconnected);
    }
}