mcp_protocol_sdk/transport/
traits.rs

1//! Transport layer traits and abstractions
2//!
3//! This module defines the core transport traits that enable MCP communication
4//! over different protocols like STDIO, HTTP, and WebSocket.
5
6use crate::core::error::McpResult;
7use crate::protocol::types::{JsonRpcNotification, JsonRpcRequest, JsonRpcResponse};
8use async_trait::async_trait;
9
10/// Transport trait for MCP clients
11///
12/// This trait defines the interface for sending requests and receiving responses
13/// in a client-side MCP connection.
14#[async_trait]
15pub trait Transport: Send + Sync {
16    /// Send a JSON-RPC request and wait for a response
17    ///
18    /// # Arguments
19    /// * `request` - The JSON-RPC request to send
20    ///
21    /// # Returns
22    /// Result containing the JSON-RPC response or an error
23    async fn send_request(&mut self, request: JsonRpcRequest) -> McpResult<JsonRpcResponse>;
24
25    /// Send a JSON-RPC notification (no response expected)
26    ///
27    /// # Arguments
28    /// * `notification` - The JSON-RPC notification to send
29    ///
30    /// # Returns
31    /// Result indicating success or an error
32    async fn send_notification(&mut self, notification: JsonRpcNotification) -> McpResult<()>;
33
34    /// Receive a notification from the server (non-blocking)
35    ///
36    /// # Returns
37    /// Result containing an optional notification or an error
38    async fn receive_notification(&mut self) -> McpResult<Option<JsonRpcNotification>>;
39
40    /// Close the transport connection
41    ///
42    /// # Returns
43    /// Result indicating success or an error
44    async fn close(&mut self) -> McpResult<()>;
45
46    /// Check if the transport is connected
47    ///
48    /// # Returns
49    /// True if the transport is connected and ready for communication
50    fn is_connected(&self) -> bool {
51        true // Default implementation - assume connected
52    }
53
54    /// Get connection information for debugging
55    ///
56    /// # Returns
57    /// String describing the connection
58    fn connection_info(&self) -> String {
59        "Unknown transport".to_string()
60    }
61}
62
63/// Server request handler function type
64pub type ServerRequestHandler = std::sync::Arc<
65    dyn Fn(
66            JsonRpcRequest,
67        ) -> std::pin::Pin<
68            Box<dyn std::future::Future<Output = McpResult<JsonRpcResponse>> + Send + 'static>,
69        > + Send
70        + Sync,
71>;
72
73/// Transport trait for MCP servers
74///
75/// This trait defines the interface for handling incoming requests and
76/// sending responses in a server-side MCP connection.
77#[async_trait]
78pub trait ServerTransport: Send + Sync {
79    /// Start the server transport and begin listening for connections
80    ///
81    /// # Returns
82    /// Result indicating success or an error
83    async fn start(&mut self) -> McpResult<()>;
84
85    /// Set the request handler that will process incoming requests
86    ///
87    /// # Arguments
88    /// * `handler` - The request handler function
89    fn set_request_handler(&mut self, handler: ServerRequestHandler);
90
91    /// Send a JSON-RPC notification to the client
92    ///
93    /// # Arguments
94    /// * `notification` - The JSON-RPC notification to send
95    ///
96    /// # Returns
97    /// Result indicating success or an error
98    async fn send_notification(&mut self, notification: JsonRpcNotification) -> McpResult<()>;
99
100    /// Stop the server transport
101    ///
102    /// # Returns
103    /// Result indicating success or an error
104    async fn stop(&mut self) -> McpResult<()>;
105
106    /// Check if the server is running
107    ///
108    /// # Returns
109    /// True if the server is running and accepting connections
110    fn is_running(&self) -> bool {
111        true // Default implementation - assume running
112    }
113
114    /// Get server information for debugging
115    ///
116    /// # Returns
117    /// String describing the server state
118    fn server_info(&self) -> String {
119        "Unknown server transport".to_string()
120    }
121}
122
/// Configuration options for a transport.
///
/// All timeouts and intervals are expressed in milliseconds, and the message
/// size limit in bytes. See the [`Default`] implementation for stock values.
#[derive(Clone, Debug)]
pub struct TransportConfig {
    /// Timeout for establishing a connection, in milliseconds
    pub connect_timeout_ms: Option<u64>,
    /// Timeout for reads, in milliseconds
    pub read_timeout_ms: Option<u64>,
    /// Timeout for writes, in milliseconds
    pub write_timeout_ms: Option<u64>,
    /// Upper bound on the size of a single message, in bytes
    pub max_message_size: Option<usize>,
    /// Interval between keep-alives, in milliseconds
    pub keep_alive_ms: Option<u64>,
    /// Whether compression is enabled
    pub compression: bool,
    /// Custom headers for HTTP-based transports
    pub headers: std::collections::HashMap<String, String>,
}

impl Default for TransportConfig {
    /// Builds the stock configuration: 30 s connect/write timeouts, a 60 s
    /// read timeout, a 16 MB message limit, a 30 s keep-alive interval,
    /// compression disabled, and no custom headers.
    fn default() -> Self {
        // Naming the literals keeps their units visible at the use site.
        const THIRTY_SECONDS_MS: u64 = 30_000;
        const SIXTY_SECONDS_MS: u64 = 60_000;
        const SIXTEEN_MB_BYTES: usize = 16 * 1024 * 1024;

        TransportConfig {
            connect_timeout_ms: Some(THIRTY_SECONDS_MS),
            read_timeout_ms: Some(SIXTY_SECONDS_MS),
            write_timeout_ms: Some(THIRTY_SECONDS_MS),
            max_message_size: Some(SIXTEEN_MB_BYTES),
            keep_alive_ms: Some(THIRTY_SECONDS_MS),
            compression: false,
            headers: Default::default(),
        }
    }
}
155
/// Connection state for transports.
///
/// Besides `Debug`, `Clone` and `PartialEq`, the type implements `Eq` and
/// `Hash`, so states can be used in `Eq`-bounded generics and as keys of
/// hashed collections. Two `Error` states are equal exactly when their
/// messages are equal.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ConnectionState {
    /// Transport is disconnected
    Disconnected,
    /// Transport is connecting
    Connecting,
    /// Transport is connected and ready
    Connected,
    /// Transport is reconnecting after an error
    Reconnecting,
    /// Transport is closing
    Closing,
    /// Transport has encountered an error; the payload is the error message
    Error(String),
}
172
/// Counters describing a transport's activity, intended for monitoring.
///
/// The derived [`Default`] starts every counter at zero.
#[derive(Clone, Debug, Default)]
pub struct TransportStats {
    /// How many requests have been sent
    pub requests_sent: u64,
    /// How many responses have been received
    pub responses_received: u64,
    /// How many notifications have been sent
    pub notifications_sent: u64,
    /// How many notifications have been received
    pub notifications_received: u64,
    /// How many connection errors have occurred
    pub connection_errors: u64,
    /// How many protocol errors have occurred
    pub protocol_errors: u64,
    /// Total number of bytes sent
    pub bytes_sent: u64,
    /// Total number of bytes received
    pub bytes_received: u64,
    /// How long the connection has been up, in milliseconds
    pub uptime_ms: u64,
}
195
196/// Trait for transports that support statistics
197pub trait TransportStats_: Send + Sync {
198    /// Get current transport statistics
199    fn stats(&self) -> TransportStats;
200
201    /// Reset transport statistics
202    fn reset_stats(&mut self);
203}
204
205/// Trait for transports that support reconnection
206#[async_trait]
207pub trait ReconnectableTransport: Transport {
208    /// Attempt to reconnect the transport
209    ///
210    /// # Returns
211    /// Result indicating success or an error
212    async fn reconnect(&mut self) -> McpResult<()>;
213
214    /// Set the reconnection configuration
215    ///
216    /// # Arguments
217    /// * `config` - Reconnection configuration
218    fn set_reconnect_config(&mut self, config: ReconnectConfig);
219
220    /// Get the current connection state
221    fn connection_state(&self) -> ConnectionState;
222}
223
/// Settings controlling automatic reconnection.
///
/// Delays are expressed in milliseconds. See the [`Default`] implementation
/// for stock values.
#[derive(Clone, Debug)]
pub struct ReconnectConfig {
    /// Whether automatic reconnection is turned on
    pub enabled: bool,
    /// Maximum number of reconnection attempts
    pub max_attempts: Option<u32>,
    /// Delay before the first reconnection attempt, in milliseconds
    pub initial_delay_ms: u64,
    /// Upper bound on the delay between reconnection attempts, in milliseconds
    pub max_delay_ms: u64,
    /// Multiplier used for exponential backoff
    pub backoff_multiplier: f64,
    /// Jitter factor for randomizing delays (0.0 to 1.0)
    pub jitter_factor: f64,
}

impl Default for ReconnectConfig {
    /// Builds the stock policy: enabled, at most 5 attempts, a 1 s initial
    /// delay, a 30 s maximum delay, a backoff multiplier of 2.0 and a jitter
    /// factor of 0.1.
    fn default() -> Self {
        // Naming the literals keeps their units visible at the use site.
        const ONE_SECOND_MS: u64 = 1000;
        const THIRTY_SECONDS_MS: u64 = 30_000;

        ReconnectConfig {
            enabled: true,
            max_attempts: Some(5),
            initial_delay_ms: ONE_SECOND_MS,
            max_delay_ms: THIRTY_SECONDS_MS,
            backoff_multiplier: 2.0,
            jitter_factor: 0.1,
        }
    }
}
253
254/// Trait for transports that support message filtering
255pub trait FilterableTransport: Send + Sync {
256    /// Set a message filter function
257    ///
258    /// # Arguments
259    /// * `filter` - Function that returns true if message should be processed
260    fn set_message_filter(&mut self, filter: Box<dyn Fn(&JsonRpcRequest) -> bool + Send + Sync>);
261
262    /// Clear the message filter
263    fn clear_message_filter(&mut self);
264}
265
/// Events a transport can report, for monitoring and debugging.
#[derive(Clone, Debug)]
pub enum TransportEvent {
    /// A connection was established
    Connected,
    /// The connection was lost
    Disconnected,
    /// A message was sent
    MessageSent {
        /// Type of the message
        message_type: String,
        /// Size of the message in bytes
        size: usize,
    },
    /// A message was received
    MessageReceived {
        /// Type of the message
        message_type: String,
        /// Size of the message in bytes
        size: usize,
    },
    /// An error occurred
    Error {
        /// Text of the error
        message: String,
    },
}
293
294/// Trait for transports that support event listeners
295pub trait EventEmittingTransport: Send + Sync {
296    /// Add an event listener
297    ///
298    /// # Arguments
299    /// * `listener` - Event listener function
300    fn add_event_listener(&mut self, listener: Box<dyn Fn(TransportEvent) + Send + Sync>);
301
302    /// Remove all event listeners
303    fn clear_event_listeners(&mut self);
304}
305
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins every field of the default `TransportConfig`, so a changed
    /// default cannot slip through unnoticed.
    #[test]
    fn test_transport_config_default() {
        let config = TransportConfig::default();
        assert_eq!(config.connect_timeout_ms, Some(30_000));
        assert_eq!(config.read_timeout_ms, Some(60_000));
        assert_eq!(config.write_timeout_ms, Some(30_000));
        assert_eq!(config.max_message_size, Some(16 * 1024 * 1024));
        assert_eq!(config.keep_alive_ms, Some(30_000));
        assert!(!config.compression);
        assert!(config.headers.is_empty());
    }

    /// Pins every field of the default `ReconnectConfig`.
    #[test]
    fn test_reconnect_config_default() {
        let config = ReconnectConfig::default();
        assert!(config.enabled);
        assert_eq!(config.max_attempts, Some(5));
        assert_eq!(config.initial_delay_ms, 1000);
        assert_eq!(config.max_delay_ms, 30_000);
        // Exact float comparison is intentional: the defaults are literals.
        assert_eq!(config.backoff_multiplier, 2.0);
        assert_eq!(config.jitter_factor, 0.1);
    }

    /// Checks equality across all unit variants and the payload-carrying
    /// `Error` variant.
    #[test]
    fn test_connection_state_equality() {
        assert_eq!(ConnectionState::Connected, ConnectionState::Connected);
        assert_eq!(ConnectionState::Disconnected, ConnectionState::Disconnected);
        assert_ne!(ConnectionState::Connected, ConnectionState::Disconnected);

        // The remaining unit variants equal themselves and differ from each other.
        assert_eq!(ConnectionState::Connecting, ConnectionState::Connecting);
        assert_eq!(ConnectionState::Reconnecting, ConnectionState::Reconnecting);
        assert_eq!(ConnectionState::Closing, ConnectionState::Closing);
        assert_ne!(ConnectionState::Connecting, ConnectionState::Reconnecting);
        assert_ne!(ConnectionState::Closing, ConnectionState::Disconnected);

        let error1 = ConnectionState::Error("test".to_string());
        let error2 = ConnectionState::Error("test".to_string());
        let error3 = ConnectionState::Error("other".to_string());
        assert_eq!(error1, error2);
        assert_ne!(error1, error3);
        // An error state never equals a unit variant.
        assert_ne!(error1, ConnectionState::Disconnected);
    }

    /// Checks that every counter starts at zero. The exhaustive destructuring
    /// makes this test fail to compile if a field is added without being
    /// covered here.
    #[test]
    fn test_transport_stats_default() {
        let TransportStats {
            requests_sent,
            responses_received,
            notifications_sent,
            notifications_received,
            connection_errors,
            protocol_errors,
            bytes_sent,
            bytes_received,
            uptime_ms,
        } = TransportStats::default();

        assert_eq!(requests_sent, 0);
        assert_eq!(responses_received, 0);
        assert_eq!(notifications_sent, 0);
        assert_eq!(notifications_received, 0);
        assert_eq!(connection_errors, 0);
        assert_eq!(protocol_errors, 0);
        assert_eq!(bytes_sent, 0);
        assert_eq!(bytes_received, 0);
        assert_eq!(uptime_ms, 0);
    }
}