xrpc/transport/
mod.rs

1use async_trait::async_trait;
2use bytes::Bytes;
3use std::fmt::Debug;
4
5use crate::error::TransportResult;
6
7pub mod arc;
8pub mod channel;
9pub mod shared_memory;
10pub mod tcp;
11pub mod utils;
12
13#[cfg(unix)]
14pub mod unix;
15
16pub use utils::spawn_weak_loop;
17
18/// Low-level byte transport with framing support (Layer 1).
19#[async_trait]
20pub trait FrameTransport: Send + Sync + Debug {
21    /// Send a frame of bytes.
22    async fn send_frame(&self, data: &[u8]) -> TransportResult<()>;
23
24    /// Receive a frame of bytes.
25    async fn recv_frame(&self) -> TransportResult<Bytes>;
26
27    /// Check if the transport is connected.
28    fn is_connected(&self) -> bool;
29
30    /// Check if the transport is healthy.
31    fn is_healthy(&self) -> bool {
32        true
33    }
34
35    /// Close the transport connection.
36    async fn close(&self) -> TransportResult<()>;
37
38    /// Get transport statistics.
39    fn stats(&self) -> Option<TransportStats> {
40        None
41    }
42
43    /// Get transport name/identifier.
44    fn name(&self) -> &str {
45        "unknown"
46    }
47}
48
/// Statistics collected by transport implementations.
///
/// `Default` (and [`TransportStats::new`]) yields all-zero counters and no latency figure.
#[derive(Debug, Clone, Default)]
pub struct TransportStats {
    /// Count of messages sent.
    pub messages_sent: u64,
    /// Count of messages received.
    pub messages_received: u64,
    /// Count of bytes sent.
    pub bytes_sent: u64,
    /// Count of bytes received.
    pub bytes_received: u64,
    /// Count of failed sends.
    pub send_errors: u64,
    /// Count of failed receives.
    pub recv_errors: u64,
    /// Average latency in microseconds, or `None` when no figure is available.
    pub avg_latency_us: Option<u64>,
}

impl TransportStats {
    /// Creates an empty statistics record (identical to `Default::default()`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Folds `other` into `self`.
    ///
    /// Counters are added with saturation, so a merge can never panic (debug builds)
    /// or silently wrap around (release builds) when a sum exceeds `u64::MAX`.
    ///
    /// Latency handling:
    /// * both sides have a value: the result is the plain midpoint of the two averages
    ///   (rounded down); it is *not* weighted by message counts;
    /// * only one side has a value: that value is kept;
    /// * neither side has a value: the result stays `None`.
    pub fn merge(&mut self, other: &TransportStats) {
        self.messages_sent = self.messages_sent.saturating_add(other.messages_sent);
        self.messages_received = self
            .messages_received
            .saturating_add(other.messages_received);
        self.bytes_sent = self.bytes_sent.saturating_add(other.bytes_sent);
        self.bytes_received = self.bytes_received.saturating_add(other.bytes_received);
        self.send_errors = self.send_errors.saturating_add(other.send_errors);
        self.recv_errors = self.recv_errors.saturating_add(other.recv_errors);

        self.avg_latency_us = match (self.avg_latency_us, other.avg_latency_us) {
            (Some(ours), Some(theirs)) => Some(Self::latency_midpoint(ours, theirs)),
            // At most one side is `Some` here, so `or` keeps whichever value exists.
            (ours, theirs) => ours.or(theirs),
        };
    }

    /// Returns `floor((a + b) / 2)` without forming the possibly-overflowing sum `a + b`.
    fn latency_midpoint(a: u64, b: u64) -> u64 {
        // min + half the distance equals the floored mean and always fits in a u64.
        a.min(b) + a.abs_diff(b) / 2
    }
}

/// Multi-line, human-readable report.
///
/// Every line, including the last one, ends with a newline. The latency line is
/// only written when `avg_latency_us` is `Some`.
impl std::fmt::Display for TransportStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Transport Statistics:")?;
        writeln!(f, "  Messages sent:     {}", self.messages_sent)?;
        writeln!(f, "  Messages received: {}", self.messages_received)?;
        writeln!(f, "  Bytes sent:        {}", self.bytes_sent)?;
        writeln!(f, "  Bytes received:    {}", self.bytes_received)?;
        writeln!(f, "  Send errors:       {}", self.send_errors)?;
        writeln!(f, "  Receive errors:    {}", self.recv_errors)?;
        if let Some(latency) = self.avg_latency_us {
            writeln!(f, "  Avg latency:       {}μs", latency)?;
        }
        Ok(())
    }
}
99
100#[async_trait]
101impl<T: FrameTransport + ?Sized> FrameTransport for std::sync::Arc<T> {
102    async fn send_frame(&self, data: &[u8]) -> TransportResult<()> {
103        (**self).send_frame(data).await
104    }
105
106    async fn recv_frame(&self) -> TransportResult<Bytes> {
107        (**self).recv_frame().await
108    }
109
110    fn is_connected(&self) -> bool {
111        (**self).is_connected()
112    }
113
114    fn is_healthy(&self) -> bool {
115        (**self).is_healthy()
116    }
117
118    async fn close(&self) -> TransportResult<()> {
119        (**self).close().await
120    }
121
122    fn stats(&self) -> Option<TransportStats> {
123        (**self).stats()
124    }
125
126    fn name(&self) -> &str {
127        (**self).name()
128    }
129}
130
131#[async_trait]
132impl<T: FrameTransport + ?Sized> FrameTransport for Box<T> {
133    async fn send_frame(&self, data: &[u8]) -> TransportResult<()> {
134        (**self).send_frame(data).await
135    }
136
137    async fn recv_frame(&self) -> TransportResult<Bytes> {
138        (**self).recv_frame().await
139    }
140
141    fn is_connected(&self) -> bool {
142        (**self).is_connected()
143    }
144
145    fn is_healthy(&self) -> bool {
146        (**self).is_healthy()
147    }
148
149    async fn close(&self) -> TransportResult<()> {
150        (**self).close().await
151    }
152
153    fn stats(&self) -> Option<TransportStats> {
154        (**self).stats()
155    }
156
157    fn name(&self) -> &str {
158        (**self).name()
159    }
160}
161
162// Deprecated alias for backward compatibility
163#[deprecated(since = "0.2.0", note = "Use FrameTransport instead")]
164pub trait Transport: FrameTransport {}
165
166#[allow(deprecated)]
167impl<T: FrameTransport> Transport for T {}