turbomcp_transport/
core.rs

1//! Core transport traits, types, and errors.
2//!
3//! This module defines the fundamental abstractions for sending and receiving MCP messages
4//! over different communication protocols. The central piece is the [`Transport`] trait,
5//! which provides a generic interface for all transport implementations.
6
7use std::collections::HashMap;
8use std::fmt;
9use std::time::Duration;
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use futures::{Sink, Stream};
14use serde::{Deserialize, Serialize};
15use thiserror::Error;
16use tokio::sync::mpsc;
17use turbomcp_protocol::MessageId;
18
/// A specialized `Result` type for transport operations.
///
/// Shorthand for `std::result::Result<T, TransportError>`, so fallible transport
/// APIs can be written as `TransportResult<T>`.
pub type TransportResult<T> = std::result::Result<T, TransportError>;
21
22/// Represents errors that can occur during transport operations.
23#[derive(Error, Debug, Clone)]
24pub enum TransportError {
25    /// Failed to establish a connection.
26    #[error("Connection failed: {0}")]
27    ConnectionFailed(String),
28
29    /// An established connection was lost.
30    #[error("Connection lost: {0}")]
31    ConnectionLost(String),
32
33    /// Failed to send a message.
34    #[error("Send failed: {0}")]
35    SendFailed(String),
36
37    /// Failed to receive a message.
38    #[error("Receive failed: {0}")]
39    ReceiveFailed(String),
40
41    /// Failed to serialize or deserialize a message.
42    #[error("Serialization failed: {0}")]
43    SerializationFailed(String),
44
45    /// A protocol-level error occurred.
46    #[error("Protocol error: {0}")]
47    ProtocolError(String),
48
49    /// The operation did not complete within the specified timeout.
50    #[error("Operation timed out")]
51    Timeout,
52
53    /// Connection establishment timed out.
54    ///
55    /// This error occurs when the TCP/TLS handshake takes too long to complete.
56    /// Consider checking network connectivity or increasing the connect timeout.
57    #[error(
58        "Connection timed out after {timeout:?} for operation: {operation}. \
59         If this is expected, increase the timeout with \
60         `TimeoutConfig {{ connect: Duration::from_secs({}) }}`",
61        timeout.as_secs() * 2
62    )]
63    ConnectionTimeout {
64        /// The operation that timed out
65        operation: String,
66        /// The timeout duration that was exceeded
67        timeout: Duration,
68    },
69
70    /// Single request timed out.
71    ///
72    /// This error occurs when a single request-response cycle takes too long.
73    /// For slow operations like LLM sampling, consider using `TimeoutConfig::patient()`.
74    #[error(
75        "Request timed out after {timeout:?} for operation: {operation}. \
76         If this is expected, increase the timeout with \
77         `TimeoutConfig {{ request: Some(Duration::from_secs({})) }}` \
78         or use `TimeoutConfig::patient()` for slow operations",
79        timeout.as_secs() * 2
80    )]
81    RequestTimeout {
82        /// The operation that timed out
83        operation: String,
84        /// The timeout duration that was exceeded
85        timeout: Duration,
86    },
87
88    /// Total operation timed out (including retries).
89    ///
90    /// This error occurs when the entire operation, including all retries,
91    /// takes too long. This timeout is broader than request timeout.
92    #[error(
93        "Total operation timed out after {timeout:?} for operation: {operation}. \
94         This includes retries. If this is expected, increase the timeout with \
95         `TimeoutConfig {{ total: Some(Duration::from_secs({})) }}`",
96        timeout.as_secs() * 2
97    )]
98    TotalTimeout {
99        /// The operation that timed out
100        operation: String,
101        /// The timeout duration that was exceeded
102        timeout: Duration,
103    },
104
105    /// Read operation timed out (streaming).
106    ///
107    /// This error occurs when reading a chunk from a streaming response takes too long.
108    /// For slow streaming operations, consider using `TimeoutConfig::patient()`.
109    #[error(
110        "Read timed out after {timeout:?} while streaming response for operation: {operation}. \
111         If this is expected, increase the timeout with \
112         `TimeoutConfig {{ read: Some(Duration::from_secs({})) }}`",
113        timeout.as_secs() * 2
114    )]
115    ReadTimeout {
116        /// The operation that timed out
117        operation: String,
118        /// The timeout duration that was exceeded
119        timeout: Duration,
120    },
121
122    /// The transport was configured with invalid parameters.
123    #[error("Configuration error: {0}")]
124    ConfigurationError(String),
125
126    /// Authentication with the remote endpoint failed.
127    #[error("Authentication failed: {0}")]
128    AuthenticationFailed(String),
129
130    /// The request was rejected due to rate limiting.
131    #[error("Rate limit exceeded")]
132    RateLimitExceeded,
133
134    /// The requested transport is not available.
135    #[error("Transport not available: {0}")]
136    NotAvailable(String),
137
138    /// An underlying I/O error occurred.
139    #[error("IO error: {0}")]
140    Io(String),
141
142    /// An unexpected internal error occurred.
143    #[error("Internal error: {0}")]
144    Internal(String),
145
146    /// Request size exceeds the configured maximum limit.
147    ///
148    /// This error protects against memory exhaustion attacks by rejecting
149    /// requests that are too large. If you need to send larger requests,
150    /// increase the limit with `LimitsConfig::max_request_size`.
151    #[error(
152        "Request size ({size} bytes) exceeds maximum allowed ({max} bytes). \
153         If this is expected, increase the limit with \
154         `LimitsConfig {{ max_request_size: Some({}) }}` or use `LimitsConfig::unlimited()` \
155         if running behind an API gateway.",
156        size
157    )]
158    RequestTooLarge {
159        /// The actual size of the request in bytes
160        size: usize,
161        /// The maximum allowed size in bytes
162        max: usize,
163    },
164
165    /// Response size exceeds the configured maximum limit.
166    ///
167    /// This error protects against memory exhaustion attacks by rejecting
168    /// responses that are too large. If you need to receive larger responses,
169    /// increase the limit with `LimitsConfig::max_response_size`.
170    #[error(
171        "Response size ({size} bytes) exceeds maximum allowed ({max} bytes). \
172         If this is expected, increase the limit with \
173         `LimitsConfig {{ max_response_size: Some({}) }}` or use `LimitsConfig::unlimited()` \
174         if running behind an API gateway.",
175        size
176    )]
177    ResponseTooLarge {
178        /// The actual size of the response in bytes
179        size: usize,
180        /// The maximum allowed size in bytes
181        max: usize,
182    },
183}
184
/// Enumerates the types of transports supported by the system.
///
/// The `Grpc` and `Quic` variants exist only when the `grpc` / `quic` cargo features
/// are enabled.
///
/// NOTE(review): serde's `rename_all = "lowercase"` presumably serializes
/// `ChildProcess` as `childprocess`, while the `Display` impl in this file writes
/// `child_process` — confirm that this difference is intended.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TransportType {
    /// Standard Input/Output, for command-line servers.
    Stdio,
    /// HTTP, including Server-Sent Events (SSE).
    Http,
    /// WebSocket for full-duplex communication.
    WebSocket,
    /// TCP sockets for network communication.
    Tcp,
    /// Unix domain sockets for local inter-process communication.
    Unix,
    /// A transport that manages a child process.
    ChildProcess,
    /// gRPC for high-performance RPC (requires the `grpc` feature).
    #[cfg(feature = "grpc")]
    Grpc,
    /// QUIC for a modern, multiplexed transport (requires the `quic` feature).
    #[cfg(feature = "quic")]
    Quic,
}
208
/// Represents the current state of a transport connection.
///
/// Derives `PartialEq`/`Eq`, so states can be compared directly; two `Failed` states
/// are equal only when their `reason` strings match.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransportState {
    /// The transport is not connected.
    Disconnected,
    /// The transport is in the process of connecting.
    Connecting,
    /// The transport is connected and ready to send/receive messages.
    Connected,
    /// The transport is in the process of disconnecting.
    Disconnecting,
    /// The transport has encountered an unrecoverable error.
    Failed {
        /// A human-readable description of the failure reason.
        reason: String,
    },
}
226
/// Describes the capabilities of a transport implementation.
///
/// A `Default` implementation is provided in this file; it advertises
/// bidirectional support and leaves compression, streaming, and multiplexing off.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransportCapabilities {
    /// The maximum message size in bytes that the transport can handle.
    /// `None` presumably means no limit is advertised — TODO confirm.
    pub max_message_size: Option<usize>,

    /// Whether the transport supports message compression.
    pub supports_compression: bool,

    /// Whether the transport supports streaming data.
    pub supports_streaming: bool,

    /// Whether the transport supports full-duplex bidirectional communication.
    pub supports_bidirectional: bool,

    /// Whether the transport can handle multiple concurrent requests over a single connection.
    pub supports_multiplexing: bool,

    /// A list of supported compression algorithms, identified by name.
    pub compression_algorithms: Vec<String>,

    /// A map for any other custom capabilities, keyed by name with arbitrary JSON values.
    pub custom: HashMap<String, serde_json::Value>,
}
251
/// Configuration for a transport instance.
///
/// The `limits`, `timeouts`, and `tls` fields are marked `#[serde(default)]`, so they
/// may be omitted from serialized input and fall back to their `Default` values.
///
/// NOTE(review): both `connect_timeout` and `timeouts` describe connection timeouts;
/// which one takes precedence is not visible in this file — confirm against the
/// transport implementations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportConfig {
    /// The type of the transport.
    pub transport_type: TransportType,

    /// The maximum time to wait for a connection to be established.
    pub connect_timeout: Duration,

    /// The maximum time to wait for a read operation to complete.
    pub read_timeout: Option<Duration>,

    /// The maximum time to wait for a write operation to complete.
    pub write_timeout: Option<Duration>,

    /// The interval for sending keep-alive messages to maintain the connection.
    pub keep_alive: Option<Duration>,

    /// The maximum number of concurrent connections allowed.
    pub max_connections: Option<usize>,

    /// Whether to enable message compression.
    pub compression: bool,

    /// The preferred compression algorithm to use.
    pub compression_algorithm: Option<String>,

    /// Size limits for requests and responses (v2.2.0+).
    ///
    /// By default, enforces 10MB response limit and 1MB request limit
    /// to prevent memory exhaustion attacks.
    #[serde(default)]
    pub limits: crate::config::LimitsConfig,

    /// Timeout configuration for operations (v2.2.0+).
    ///
    /// By default, enforces balanced timeouts (30s connect, 60s request, 120s total)
    /// to prevent hanging requests and resource exhaustion.
    #[serde(default)]
    pub timeouts: crate::config::TimeoutConfig,

    /// TLS/HTTPS configuration (v2.2.0+).
    ///
    /// By default, uses TLS 1.2 for backward compatibility in v2.2.0.
    /// Use `TlsConfig::modern()` for TLS 1.3 (recommended).
    ///
    /// This configuration applies to HTTP and WebSocket transports.
    #[serde(default)]
    pub tls: crate::config::TlsConfig,

    /// A map for any other custom configuration, keyed by name with arbitrary JSON values.
    pub custom: HashMap<String, serde_json::Value>,
}
305
/// A wrapper for a message being sent or received over a transport.
///
/// Bundles the message identifier, the raw payload bytes, and transport-level
/// metadata into a single value.
#[derive(Debug, Clone)]
pub struct TransportMessage {
    /// The unique identifier of the message.
    pub id: MessageId,

    /// The binary payload of the message.
    pub payload: Bytes,

    /// Metadata associated with the message (encoding, content type, headers, ...).
    pub metadata: TransportMessageMetadata,
}
318
/// Metadata associated with a `TransportMessage`.
///
/// Every field is optional or a collection, and the derived `Default` leaves all of
/// them unset/empty.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TransportMessageMetadata {
    /// The encoding of the message payload (e.g., "gzip").
    pub encoding: Option<String>,

    /// The MIME type of the message payload (e.g., "application/json").
    pub content_type: Option<String>,

    /// An ID used to correlate requests and responses.
    pub correlation_id: Option<String>,

    /// A map of custom headers (header name to header value).
    pub headers: HashMap<String, String>,

    /// The priority of the message (higher numbers indicate higher priority).
    pub priority: Option<u8>,

    /// The time-to-live for the message, in milliseconds.
    pub ttl: Option<u64>,

    /// A marker indicating that this is a heartbeat message.
    /// `None` presumably is treated like `Some(false)` — TODO confirm with consumers.
    pub is_heartbeat: Option<bool>,
}
343
/// A serializable snapshot of a transport's performance metrics.
///
/// This struct provides a consistent view of metrics for external monitoring.
/// For internal, high-performance updates, `AtomicMetrics` is preferred; its
/// `snapshot` method produces values of this type.
///
/// # Custom Transport Metrics
/// Transport implementations can store custom metrics in the `metadata` field.
/// ```
/// # use turbomcp_transport::core::TransportMetrics;
/// # use serde_json::json;
/// let mut metrics = TransportMetrics::default();
/// metrics.metadata.insert("active_correlations".to_string(), json!(42));
/// ```
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TransportMetrics {
    /// Total number of bytes sent.
    pub bytes_sent: u64,

    /// Total number of bytes received.
    pub bytes_received: u64,

    /// Total number of messages sent.
    pub messages_sent: u64,

    /// Total number of messages received.
    pub messages_received: u64,

    /// Total number of connection attempts.
    pub connections: u64,

    /// Total number of failed connection attempts.
    pub failed_connections: u64,

    /// The average latency of operations, in milliseconds.
    pub average_latency_ms: f64,

    /// The current number of active connections.
    pub active_connections: u64,

    /// The compression ratio (uncompressed size / compressed size), if applicable.
    pub compression_ratio: Option<f64>,

    /// A map for custom, transport-specific metrics.
    ///
    /// Omitted from serialized output when empty and defaulted to an empty map when
    /// missing from the input.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub metadata: HashMap<String, serde_json::Value>,
}
390
/// A lock-free, atomic structure for high-performance metrics updates.
///
/// This struct uses `AtomicU64` for all counters, which is significantly faster
/// than using mutexes for simple counter updates.
///
/// The counter fields are public so callers can update them directly; the latency
/// and compression fields are private and are changed only through
/// `update_latency_us`, `record_compression`, and `reset`.
///
/// # Performance
/// - Lock-free increments and decrements.
/// - No contention on updates.
/// - Uses `Ordering::Relaxed` for maximum performance where strict ordering is not required.
#[derive(Debug)]
pub struct AtomicMetrics {
    /// Total bytes sent (atomic counter).
    pub bytes_sent: std::sync::atomic::AtomicU64,

    /// Total bytes received (atomic counter).
    pub bytes_received: std::sync::atomic::AtomicU64,

    /// Total messages sent (atomic counter).
    pub messages_sent: std::sync::atomic::AtomicU64,

    /// Total messages received (atomic counter).
    pub messages_received: std::sync::atomic::AtomicU64,

    /// Total connection attempts (atomic counter).
    pub connections: std::sync::atomic::AtomicU64,

    /// Failed connection attempts (atomic counter).
    pub failed_connections: std::sync::atomic::AtomicU64,

    /// Current active connections (atomic counter).
    pub active_connections: std::sync::atomic::AtomicU64,

    /// The average latency, stored as an exponential moving average in microseconds.
    /// A value of zero is treated as "no sample recorded yet".
    avg_latency_us: std::sync::atomic::AtomicU64,

    /// Total bytes before compression.
    uncompressed_bytes: std::sync::atomic::AtomicU64,

    /// Total bytes after compression.
    compressed_bytes: std::sync::atomic::AtomicU64,
}
432
433impl Default for AtomicMetrics {
434    fn default() -> Self {
435        use std::sync::atomic::AtomicU64;
436        Self {
437            bytes_sent: AtomicU64::new(0),
438            bytes_received: AtomicU64::new(0),
439            messages_sent: AtomicU64::new(0),
440            messages_received: AtomicU64::new(0),
441            connections: AtomicU64::new(0),
442            failed_connections: AtomicU64::new(0),
443            active_connections: AtomicU64::new(0),
444            avg_latency_us: AtomicU64::new(0),
445            uncompressed_bytes: AtomicU64::new(0),
446            compressed_bytes: AtomicU64::new(0),
447        }
448    }
449}
450
451impl AtomicMetrics {
452    /// Creates a new `AtomicMetrics` instance with all counters initialized to zero.
453    pub fn new() -> Self {
454        Self::default()
455    }
456
457    /// Updates the average latency using an exponential moving average (EMA).
458    ///
459    /// This method uses an EMA with alpha = 0.1 for smooth latency tracking.
460    ///
461    /// # Arguments
462    /// * `latency_us` - The new latency measurement in microseconds.
463    pub fn update_latency_us(&self, latency_us: u64) {
464        use std::sync::atomic::Ordering;
465
466        let current = self.avg_latency_us.load(Ordering::Relaxed);
467        let new_avg = if current == 0 {
468            latency_us
469        } else {
470            // EMA with alpha = 0.1: new_avg = old_avg * 0.9 + new_value * 0.1
471            (current * 9 + latency_us) / 10
472        };
473        self.avg_latency_us.store(new_avg, Ordering::Relaxed);
474    }
475
476    /// Records compression statistics to track the compression ratio.
477    ///
478    /// # Arguments
479    /// * `uncompressed_size` - The size of the data before compression.
480    /// * `compressed_size` - The size of the data after compression.
481    pub fn record_compression(&self, uncompressed_size: u64, compressed_size: u64) {
482        use std::sync::atomic::Ordering;
483
484        self.uncompressed_bytes
485            .fetch_add(uncompressed_size, Ordering::Relaxed);
486        self.compressed_bytes
487            .fetch_add(compressed_size, Ordering::Relaxed);
488    }
489
490    /// Creates a serializable `TransportMetrics` snapshot from the current atomic values.
491    ///
492    /// This method uses `Ordering::Relaxed` for maximum performance.
493    pub fn snapshot(&self) -> TransportMetrics {
494        use std::sync::atomic::Ordering;
495
496        let avg_latency_us = self.avg_latency_us.load(Ordering::Relaxed);
497        let uncompressed = self.uncompressed_bytes.load(Ordering::Relaxed);
498        let compressed = self.compressed_bytes.load(Ordering::Relaxed);
499
500        let compression_ratio = if compressed > 0 && uncompressed > 0 {
501            Some(uncompressed as f64 / compressed as f64)
502        } else {
503            None
504        };
505
506        TransportMetrics {
507            bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
508            bytes_received: self.bytes_received.load(Ordering::Relaxed),
509            messages_sent: self.messages_sent.load(Ordering::Relaxed),
510            messages_received: self.messages_received.load(Ordering::Relaxed),
511            connections: self.connections.load(Ordering::Relaxed),
512            failed_connections: self.failed_connections.load(Ordering::Relaxed),
513            active_connections: self.active_connections.load(Ordering::Relaxed),
514            average_latency_ms: (avg_latency_us as f64) / 1000.0, // Convert μs to ms
515            compression_ratio,
516            metadata: HashMap::new(), // Empty metadata for base atomic metrics
517        }
518    }
519
520    /// Resets all atomic metric counters to zero.
521    pub fn reset(&self) {
522        use std::sync::atomic::Ordering;
523
524        self.bytes_sent.store(0, Ordering::Relaxed);
525        self.bytes_received.store(0, Ordering::Relaxed);
526        self.messages_sent.store(0, Ordering::Relaxed);
527        self.messages_received.store(0, Ordering::Relaxed);
528        self.connections.store(0, Ordering::Relaxed);
529        self.failed_connections.store(0, Ordering::Relaxed);
530        self.active_connections.store(0, Ordering::Relaxed);
531        self.avg_latency_us.store(0, Ordering::Relaxed);
532        self.uncompressed_bytes.store(0, Ordering::Relaxed);
533        self.compressed_bytes.store(0, Ordering::Relaxed);
534    }
535}
536
/// Represents events that occur within a transport's lifecycle.
///
/// Values of this type are what `TransportEventEmitter` sends over its channel.
#[derive(Debug, Clone)]
pub enum TransportEvent {
    /// A new connection has been established.
    Connected {
        /// The type of the transport that connected.
        transport_type: TransportType,
        /// The endpoint of the connection.
        endpoint: String,
    },

    /// A connection has been lost.
    Disconnected {
        /// The type of the transport that disconnected.
        transport_type: TransportType,
        /// The endpoint of the connection.
        endpoint: String,
        /// An optional reason for the disconnection.
        reason: Option<String>,
    },

    /// A message has been successfully sent.
    MessageSent {
        /// The ID of the sent message.
        message_id: MessageId,
        /// The size of the sent message in bytes.
        size: usize,
    },

    /// A message has been successfully received.
    MessageReceived {
        /// The ID of the received message.
        message_id: MessageId,
        /// The size of the received message in bytes.
        size: usize,
    },

    /// An error has occurred in the transport.
    Error {
        /// The error that occurred.
        error: TransportError,
        /// Optional additional context about the error.
        context: Option<String>,
    },

    /// The transport's metrics have been updated.
    MetricsUpdated {
        /// The updated metrics snapshot.
        metrics: TransportMetrics,
    },
}
588
589/// The core trait for all transport implementations.
590///
591/// This trait defines the essential, asynchronous operations for a message-based
592/// communication channel, such as connecting, disconnecting, sending, and receiving.
593#[async_trait]
594pub trait Transport: Send + Sync + std::fmt::Debug {
595    /// Returns the type of this transport.
596    fn transport_type(&self) -> TransportType;
597
598    /// Returns the capabilities of this transport.
599    fn capabilities(&self) -> &TransportCapabilities;
600
601    /// Returns the current state of the transport.
602    async fn state(&self) -> TransportState;
603
604    /// Establishes a connection to the remote endpoint.
605    async fn connect(&self) -> TransportResult<()>;
606
607    /// Closes the connection to the remote endpoint.
608    async fn disconnect(&self) -> TransportResult<()>;
609
610    /// Sends a single message over the transport.
611    async fn send(&self, message: TransportMessage) -> TransportResult<()>;
612
613    /// Receives a single message from the transport in a non-blocking way.
614    async fn receive(&self) -> TransportResult<Option<TransportMessage>>;
615
616    /// Returns a snapshot of the transport's current performance metrics.
617    async fn metrics(&self) -> TransportMetrics;
618
619    /// Returns `true` if the transport is currently in the `Connected` state.
620    async fn is_connected(&self) -> bool {
621        matches!(self.state().await, TransportState::Connected)
622    }
623
624    /// Returns the endpoint address or identifier for this transport, if applicable.
625    fn endpoint(&self) -> Option<String> {
626        None
627    }
628
629    /// Applies a new configuration to the transport.
630    async fn configure(&self, config: TransportConfig) -> TransportResult<()> {
631        // Default implementation does nothing. Transports can override this.
632        let _ = config;
633        Ok(())
634    }
635}
636
/// A trait for transports that support full-duplex, bidirectional communication.
///
/// This extends the base `Transport` trait with the ability to send a request and
/// await a correlated response.
#[async_trait]
pub trait BidirectionalTransport: Transport {
    /// Sends a request message and waits for a corresponding response.
    ///
    /// `timeout` is an optional upper bound on the wait; how `None` is handled is up
    /// to the implementation (not visible in this file).
    async fn send_request(
        &self,
        message: TransportMessage,
        timeout: Option<Duration>,
    ) -> TransportResult<TransportMessage>;

    /// Starts tracking a request-response correlation identified by `correlation_id`.
    async fn start_correlation(&self, correlation_id: String) -> TransportResult<()>;

    /// Stops tracking the request-response correlation identified by `correlation_id`.
    async fn stop_correlation(&self, correlation_id: &str) -> TransportResult<()>;
}
656
/// A trait for transports that support streaming data.
///
/// NOTE(review): `SendStream` is bound by `Stream` (a source that yields messages)
/// and `ReceiveStream` by `Sink` (a destination that accepts messages), which reads
/// as the reverse of their names — confirm the intended direction with implementors.
#[async_trait]
pub trait StreamingTransport: Transport {
    /// The type of the stream used for sending messages.
    ///
    /// Must be a `Stream` yielding `TransportResult<TransportMessage>` items.
    type SendStream: Stream<Item = TransportResult<TransportMessage>> + Send + Unpin;

    /// The type of the sink used for receiving messages.
    ///
    /// Must be a `Sink` accepting `TransportMessage` values and failing with
    /// `TransportError`.
    type ReceiveStream: Sink<TransportMessage, Error = TransportError> + Send + Unpin;

    /// Returns a stream for sending messages.
    async fn send_stream(&self) -> TransportResult<Self::SendStream>;

    /// Returns a sink for receiving messages.
    async fn receive_stream(&self) -> TransportResult<Self::ReceiveStream>;
}
672
/// A factory for creating instances of a specific transport type.
///
/// Factories return transports as boxed `dyn Transport` trait objects.
pub trait TransportFactory: Send + Sync + std::fmt::Debug {
    /// Returns the type of transport this factory creates.
    fn transport_type(&self) -> TransportType;

    /// Creates a new transport instance with the given configuration.
    fn create(&self, config: TransportConfig) -> TransportResult<Box<dyn Transport>>;

    /// Returns `true` if this transport is available on the current system.
    ///
    /// The default implementation always returns `true`.
    fn is_available(&self) -> bool {
        true
    }
}
686
/// An emitter for delivering `TransportEvent`s to a listener.
///
/// Wraps the sending half of a Tokio `mpsc` channel. The emitter is `Clone`, so many
/// producers can share it, but an `mpsc` channel has exactly one receiver — events
/// are delivered to that single consumer rather than broadcast to several.
#[derive(Debug, Clone)]
pub struct TransportEventEmitter {
    /// Sending half of the event channel.
    sender: mpsc::Sender<TransportEvent>,
}
692
693impl TransportEventEmitter {
694    /// Creates a new event emitter and a corresponding receiver.
695    #[must_use]
696    pub fn new() -> (Self, mpsc::Receiver<TransportEvent>) {
697        let (sender, receiver) = mpsc::channel(500); // Bounded channel for backpressure
698        (Self { sender }, receiver)
699    }
700
701    /// Emits an event, dropping it if the channel is full to avoid blocking.
702    pub fn emit(&self, event: TransportEvent) {
703        // Use try_send for non-blocking event emission.
704        if self.sender.try_send(event).is_err() {
705            // Ignore the error if the channel is full or closed.
706        }
707    }
708
709    /// Emits a `Connected` event.
710    pub fn emit_connected(&self, transport_type: TransportType, endpoint: String) {
711        self.emit(TransportEvent::Connected {
712            transport_type,
713            endpoint,
714        });
715    }
716
717    /// Emits a `Disconnected` event.
718    pub fn emit_disconnected(
719        &self,
720        transport_type: TransportType,
721        endpoint: String,
722        reason: Option<String>,
723    ) {
724        self.emit(TransportEvent::Disconnected {
725            transport_type,
726            endpoint,
727            reason,
728        });
729    }
730
731    /// Emits a `MessageSent` event.
732    pub fn emit_message_sent(&self, message_id: MessageId, size: usize) {
733        self.emit(TransportEvent::MessageSent { message_id, size });
734    }
735
736    /// Emits a `MessageReceived` event.
737    pub fn emit_message_received(&self, message_id: MessageId, size: usize) {
738        self.emit(TransportEvent::MessageReceived { message_id, size });
739    }
740
741    /// Emits an `Error` event.
742    pub fn emit_error(&self, error: TransportError, context: Option<String>) {
743        self.emit(TransportEvent::Error { error, context });
744    }
745
746    /// Emits a `MetricsUpdated` event.
747    pub fn emit_metrics_updated(&self, metrics: TransportMetrics) {
748        self.emit(TransportEvent::MetricsUpdated { metrics });
749    }
750}
751
752impl Default for TransportEventEmitter {
753    fn default() -> Self {
754        Self::new().0
755    }
756}
757
758// Implementations for common types
759
760impl Default for TransportCapabilities {
761    fn default() -> Self {
762        Self {
763            max_message_size: Some(turbomcp_protocol::MAX_MESSAGE_SIZE),
764            supports_compression: false,
765            supports_streaming: false,
766            supports_bidirectional: true,
767            supports_multiplexing: false,
768            compression_algorithms: Vec::new(),
769            custom: HashMap::new(),
770        }
771    }
772}
773
774impl Default for TransportConfig {
775    fn default() -> Self {
776        Self {
777            transport_type: TransportType::Stdio,
778            connect_timeout: Duration::from_secs(30),
779            read_timeout: None,
780            write_timeout: None,
781            keep_alive: None,
782            max_connections: None,
783            compression: false,
784            compression_algorithm: None,
785            limits: crate::config::LimitsConfig::default(),
786            timeouts: crate::config::TimeoutConfig::default(),
787            tls: crate::config::TlsConfig::default(),
788            custom: HashMap::new(),
789        }
790    }
791}
792
793impl TransportMessage {
794    /// Creates a new `TransportMessage` with a given ID and payload.
795    ///
796    /// # Example
797    /// ```
798    /// # use turbomcp_transport::core::TransportMessage;
799    /// # use turbomcp_protocol::MessageId;
800    /// # use bytes::Bytes;
801    /// let msg = TransportMessage::new(MessageId::from(1), Bytes::from("hello"));
802    /// ```
803    pub fn new(id: MessageId, payload: Bytes) -> Self {
804        Self {
805            id,
806            payload,
807            metadata: TransportMessageMetadata::default(),
808        }
809    }
810
811    /// Creates a new `TransportMessage` with the given ID, payload, and metadata.
812    pub const fn with_metadata(
813        id: MessageId,
814        payload: Bytes,
815        metadata: TransportMessageMetadata,
816    ) -> Self {
817        Self {
818            id,
819            payload,
820            metadata,
821        }
822    }
823
824    /// Returns the size of the message payload in bytes.
825    pub const fn size(&self) -> usize {
826        self.payload.len()
827    }
828
829    /// Returns `true` if the message is compressed.
830    pub const fn is_compressed(&self) -> bool {
831        self.metadata.encoding.is_some()
832    }
833
834    /// Returns the content type of the message, if specified.
835    pub fn content_type(&self) -> Option<&str> {
836        self.metadata.content_type.as_deref()
837    }
838
839    /// Returns the correlation ID of the message, if specified.
840    pub fn correlation_id(&self) -> Option<&str> {
841        self.metadata.correlation_id.as_deref()
842    }
843}
844
845impl TransportMessageMetadata {
846    /// Creates a new `TransportMessageMetadata` with a specified content type.
847    pub fn with_content_type(content_type: impl Into<String>) -> Self {
848        Self {
849            content_type: Some(content_type.into()),
850            ..Default::default()
851        }
852    }
853
854    /// Creates a new `TransportMessageMetadata` with a specified correlation ID.
855    pub fn with_correlation_id(correlation_id: impl Into<String>) -> Self {
856        Self {
857            correlation_id: Some(correlation_id.into()),
858            ..Default::default()
859        }
860    }
861
862    /// Adds a header to the metadata using a builder pattern.
863    ///
864    /// # Example
865    /// ```
866    /// # use turbomcp_transport::core::TransportMessageMetadata;
867    /// let metadata = TransportMessageMetadata::default()
868    ///     .with_header("X-Request-ID", "123");
869    /// ```
870    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
871        self.headers.insert(key.into(), value.into());
872        self
873    }
874
875    /// Sets the priority of the message.
876    #[must_use]
877    pub const fn with_priority(mut self, priority: u8) -> Self {
878        self.priority = Some(priority);
879        self
880    }
881
882    /// Sets the time-to-live for the message.
883    #[must_use]
884    pub const fn with_ttl(mut self, ttl: Duration) -> Self {
885        self.ttl = Some(ttl.as_millis() as u64);
886        self
887    }
888
889    /// Marks the message as a heartbeat.
890    #[must_use]
891    pub const fn heartbeat(mut self) -> Self {
892        self.is_heartbeat = Some(true);
893        self
894    }
895}
896
897impl fmt::Display for TransportType {
898    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
899        match self {
900            Self::Stdio => write!(f, "stdio"),
901            Self::Http => write!(f, "http"),
902            Self::WebSocket => write!(f, "websocket"),
903            Self::Tcp => write!(f, "tcp"),
904            Self::Unix => write!(f, "unix"),
905            Self::ChildProcess => write!(f, "child_process"),
906            #[cfg(feature = "grpc")]
907            Self::Grpc => write!(f, "grpc"),
908            #[cfg(feature = "quic")]
909            Self::Quic => write!(f, "quic"),
910        }
911    }
912}
913
914impl fmt::Display for TransportState {
915    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
916        match self {
917            Self::Disconnected => write!(f, "disconnected"),
918            Self::Connecting => write!(f, "connecting"),
919            Self::Connected => write!(f, "connected"),
920            Self::Disconnecting => write!(f, "disconnecting"),
921            Self::Failed { reason } => write!(f, "failed: {reason}"),
922        }
923    }
924}
925
926impl From<std::io::Error> for TransportError {
927    fn from(err: std::io::Error) -> Self {
928        Self::Io(err.to_string())
929    }
930}
931
932impl From<serde_json::Error> for TransportError {
933    fn from(err: serde_json::Error) -> Self {
934        Self::SerializationFailed(err.to_string())
935    }
936}
937
938/// Validates that a request message size does not exceed the configured limit.
939///
940/// # Arguments
941///
942/// * `size` - The size of the request payload in bytes
943/// * `limits` - The limits configuration to check against
944///
945/// # Returns
946///
947/// `Ok(())` if the size is within limits or no limit is set, otherwise `Err(TransportError::RequestTooLarge)`
948///
949/// # Example
950///
951/// ```
952/// use turbomcp_transport::core::validate_request_size;
953/// use turbomcp_transport::config::LimitsConfig;
954///
955/// let limits = LimitsConfig::default();
956/// assert!(validate_request_size(1000, &limits).is_ok());
957/// assert!(validate_request_size(10 * 1024 * 1024, &limits).is_err());
958/// ```
959pub fn validate_request_size(
960    size: usize,
961    limits: &crate::config::LimitsConfig,
962) -> TransportResult<()> {
963    if let Some(max_size) = limits.max_request_size
964        && size > max_size
965    {
966        return Err(TransportError::RequestTooLarge {
967            size,
968            max: max_size,
969        });
970    }
971    Ok(())
972}
973
974/// Validates that a response message size does not exceed the configured limit.
975///
976/// # Arguments
977///
978/// * `size` - The size of the response payload in bytes
979/// * `limits` - The limits configuration to check against
980///
981/// # Returns
982///
983/// `Ok(())` if the size is within limits or no limit is set, otherwise `Err(TransportError::ResponseTooLarge)`
984///
985/// # Example
986///
987/// ```
988/// use turbomcp_transport::core::validate_response_size;
989/// use turbomcp_transport::config::LimitsConfig;
990///
991/// let limits = LimitsConfig::default();
992/// assert!(validate_response_size(1000, &limits).is_ok());
993/// assert!(validate_response_size(50 * 1024 * 1024, &limits).is_err());
994/// ```
995pub fn validate_response_size(
996    size: usize,
997    limits: &crate::config::LimitsConfig,
998) -> TransportResult<()> {
999    if let Some(max_size) = limits.max_response_size
1000        && size > max_size
1001    {
1002        return Err(TransportError::ResponseTooLarge {
1003            size,
1004            max: max_size,
1005        });
1006    }
1007    Ok(())
1008}
1009
#[cfg(test)]
mod tests {
    //! Unit tests for the core transport types: defaults, message construction,
    //! metadata builders, `Display` output, and event emission.

    use super::*;

    /// Default capabilities advertise the protocol-wide message size cap and
    /// bidirectional support.
    #[test]
    fn test_transport_capabilities_default() {
        let caps = TransportCapabilities::default();
        assert_eq!(
            caps.max_message_size,
            Some(turbomcp_protocol::MAX_MESSAGE_SIZE)
        );
        assert!(caps.supports_bidirectional);
    }

    /// Default configuration selects stdio with a 30 second connect timeout.
    #[test]
    fn test_transport_config_default() {
        let config = TransportConfig::default();
        assert_eq!(config.transport_type, TransportType::Stdio);
        assert_eq!(config.connect_timeout, Duration::from_secs(30));
    }

    /// `TransportMessage::new` stores the ID and payload unchanged and reports the
    /// payload length as its size.
    #[test]
    fn test_transport_message_creation() {
        let id = MessageId::from("test");
        let payload = Bytes::from("test payload");
        let msg = TransportMessage::new(id.clone(), payload.clone());

        assert_eq!(msg.id, id);
        assert_eq!(msg.payload, payload);
        assert_eq!(msg.size(), 12);
    }

    /// Builder methods on the metadata record headers, priority, and a TTL expressed
    /// in milliseconds.
    #[test]
    fn test_transport_message_metadata() {
        let metadata = TransportMessageMetadata::default()
            .with_header("custom", "value")
            .with_priority(5)
            .with_ttl(Duration::from_secs(30));

        assert_eq!(metadata.headers.get("custom"), Some(&"value".to_string()));
        assert_eq!(metadata.priority, Some(5));
        assert_eq!(metadata.ttl, Some(30000));
    }

    /// Every always-available transport type renders as its lowercase name.
    #[test]
    fn test_transport_types_display() {
        assert_eq!(TransportType::Stdio.to_string(), "stdio");
        assert_eq!(TransportType::Http.to_string(), "http");
        assert_eq!(TransportType::WebSocket.to_string(), "websocket");
        assert_eq!(TransportType::Tcp.to_string(), "tcp");
        assert_eq!(TransportType::Unix.to_string(), "unix");
        assert_eq!(TransportType::ChildProcess.to_string(), "child_process");
    }

    /// Every transport state renders as its lowercase name, with the failure reason
    /// appended for `Failed`.
    #[test]
    fn test_transport_state_display() {
        assert_eq!(TransportState::Disconnected.to_string(), "disconnected");
        assert_eq!(TransportState::Connecting.to_string(), "connecting");
        assert_eq!(TransportState::Connected.to_string(), "connected");
        assert_eq!(TransportState::Disconnecting.to_string(), "disconnecting");
        assert_eq!(
            TransportState::Failed {
                reason: "timeout".to_string()
            }
            .to_string(),
            "failed: timeout"
        );
    }

    /// Emitting a connected event delivers a `Connected` event carrying the same
    /// transport type and endpoint to the receiver.
    #[tokio::test]
    async fn test_transport_event_emitter() {
        let (emitter, mut receiver) = TransportEventEmitter::new();

        emitter.emit_connected(TransportType::Stdio, "stdio://".to_string());

        let event = receiver
            .recv()
            .await
            .expect("channel closed before the Connected event arrived");
        match event {
            TransportEvent::Connected {
                transport_type,
                endpoint,
            } => {
                assert_eq!(transport_type, TransportType::Stdio);
                assert_eq!(endpoint, "stdio://");
            }
            // Any other variant is a test failure; report what actually arrived.
            other => panic!("Expected Connected event, got: {other:?}"),
        }
    }
}