1use async_trait::async_trait;
2use bytes::Bytes;
3use std::fmt::Debug;
4
5use crate::error::TransportResult;
6
7pub mod arc;
8pub mod channel;
9pub mod shared_memory;
10pub mod tcp;
11pub mod utils;
12
13#[cfg(unix)]
14pub mod unix;
15
16pub use utils::spawn_weak_loop;
17
18#[async_trait]
20pub trait FrameTransport: Send + Sync + Debug {
21 async fn send_frame(&self, data: &[u8]) -> TransportResult<()>;
23
24 async fn recv_frame(&self) -> TransportResult<Bytes>;
26
27 fn is_connected(&self) -> bool;
29
30 fn is_healthy(&self) -> bool {
32 true
33 }
34
35 async fn close(&self) -> TransportResult<()>;
37
38 fn stats(&self) -> Option<TransportStats> {
40 None
41 }
42
43 fn name(&self) -> &str {
45 "unknown"
46 }
47}
48
49#[derive(Debug, Clone, Default)]
51pub struct TransportStats {
52 pub messages_sent: u64,
53 pub messages_received: u64,
54 pub bytes_sent: u64,
55 pub bytes_received: u64,
56 pub send_errors: u64,
57 pub recv_errors: u64,
58 pub avg_latency_us: Option<u64>,
59}
60
61impl TransportStats {
62 pub fn new() -> Self {
63 Self::default()
64 }
65
66 pub fn merge(&mut self, other: &TransportStats) {
67 self.messages_sent += other.messages_sent;
68 self.messages_received += other.messages_received;
69 self.bytes_sent += other.bytes_sent;
70 self.bytes_received += other.bytes_received;
71 self.send_errors += other.send_errors;
72 self.recv_errors += other.recv_errors;
73
74 if let (Some(our_latency), Some(other_latency)) =
75 (self.avg_latency_us, other.avg_latency_us)
76 {
77 self.avg_latency_us = Some((our_latency + other_latency) / 2);
78 } else if other.avg_latency_us.is_some() {
79 self.avg_latency_us = other.avg_latency_us;
80 }
81 }
82}
83
84impl std::fmt::Display for TransportStats {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 writeln!(f, "Transport Statistics:")?;
87 writeln!(f, " Messages sent: {}", self.messages_sent)?;
88 writeln!(f, " Messages received: {}", self.messages_received)?;
89 writeln!(f, " Bytes sent: {}", self.bytes_sent)?;
90 writeln!(f, " Bytes received: {}", self.bytes_received)?;
91 writeln!(f, " Send errors: {}", self.send_errors)?;
92 writeln!(f, " Receive errors: {}", self.recv_errors)?;
93 if let Some(latency) = self.avg_latency_us {
94 writeln!(f, " Avg latency: {}μs", latency)?;
95 }
96 Ok(())
97 }
98}
99
100#[async_trait]
101impl<T: FrameTransport + ?Sized> FrameTransport for std::sync::Arc<T> {
102 async fn send_frame(&self, data: &[u8]) -> TransportResult<()> {
103 (**self).send_frame(data).await
104 }
105
106 async fn recv_frame(&self) -> TransportResult<Bytes> {
107 (**self).recv_frame().await
108 }
109
110 fn is_connected(&self) -> bool {
111 (**self).is_connected()
112 }
113
114 fn is_healthy(&self) -> bool {
115 (**self).is_healthy()
116 }
117
118 async fn close(&self) -> TransportResult<()> {
119 (**self).close().await
120 }
121
122 fn stats(&self) -> Option<TransportStats> {
123 (**self).stats()
124 }
125
126 fn name(&self) -> &str {
127 (**self).name()
128 }
129}
130
131#[async_trait]
132impl<T: FrameTransport + ?Sized> FrameTransport for Box<T> {
133 async fn send_frame(&self, data: &[u8]) -> TransportResult<()> {
134 (**self).send_frame(data).await
135 }
136
137 async fn recv_frame(&self) -> TransportResult<Bytes> {
138 (**self).recv_frame().await
139 }
140
141 fn is_connected(&self) -> bool {
142 (**self).is_connected()
143 }
144
145 fn is_healthy(&self) -> bool {
146 (**self).is_healthy()
147 }
148
149 async fn close(&self) -> TransportResult<()> {
150 (**self).close().await
151 }
152
153 fn stats(&self) -> Option<TransportStats> {
154 (**self).stats()
155 }
156
157 fn name(&self) -> &str {
158 (**self).name()
159 }
160}
161
162#[deprecated(since = "0.2.0", note = "Use FrameTransport instead")]
164pub trait Transport: FrameTransport {}
165
166#[allow(deprecated)]
167impl<T: FrameTransport> Transport for T {}