1use async_trait::async_trait;
4use std::fmt::Debug;
5use std::sync::Arc;
6
7use crate::codec::{BincodeCodec, Codec};
8use crate::error::TransportResult;
9use crate::message::Message;
10use crate::transport::{FrameTransport, TransportStats};
11
12#[async_trait]
14pub trait MessageChannel<C: Codec = BincodeCodec>: Send + Sync + Debug {
15 async fn send(&self, message: &Message<C>) -> TransportResult<()>;
17
18 async fn recv(&self) -> TransportResult<Message<C>>;
20
21 fn is_connected(&self) -> bool;
23
24 fn is_healthy(&self) -> bool {
26 self.is_connected()
27 }
28
29 async fn close(&self) -> TransportResult<()>;
31
32 fn stats(&self) -> Option<TransportStats> {
34 None
35 }
36}
37
38#[derive(Debug)]
40pub struct MessageChannelAdapter<F: FrameTransport, C: Codec = BincodeCodec> {
41 inner: F,
42 _codec: std::marker::PhantomData<C>,
43}
44
45impl<F: FrameTransport> MessageChannelAdapter<F, BincodeCodec> {
46 pub fn new(transport: F) -> Self {
48 Self {
49 inner: transport,
50 _codec: std::marker::PhantomData,
51 }
52 }
53}
54
55impl<F: FrameTransport, C: Codec> MessageChannelAdapter<F, C> {
56 pub fn with_codec(transport: F) -> Self {
58 Self {
59 inner: transport,
60 _codec: std::marker::PhantomData,
61 }
62 }
63
64 pub fn inner(&self) -> &F {
66 &self.inner
67 }
68
69 pub fn into_inner(self) -> F {
71 self.inner
72 }
73}
74
75#[async_trait]
76impl<F: FrameTransport, C: Codec + Default> MessageChannel<C> for MessageChannelAdapter<F, C> {
77 async fn send(&self, message: &Message<C>) -> TransportResult<()> {
78 let bytes = message.encode()?;
79 self.inner.send_frame(&bytes).await
80 }
81
82 async fn recv(&self) -> TransportResult<Message<C>> {
83 let bytes = self.inner.recv_frame().await?;
84 Message::decode(bytes)
85 }
86
87 fn is_connected(&self) -> bool {
88 self.inner.is_connected()
89 }
90
91 fn is_healthy(&self) -> bool {
92 self.inner.is_healthy()
93 }
94
95 async fn close(&self) -> TransportResult<()> {
96 self.inner.close().await
97 }
98
99 fn stats(&self) -> Option<TransportStats> {
100 self.inner.stats()
101 }
102}
103
104#[async_trait]
105impl<T: MessageChannel<C> + ?Sized, C: Codec + Default> MessageChannel<C> for Arc<T> {
106 async fn send(&self, message: &Message<C>) -> TransportResult<()> {
107 (**self).send(message).await
108 }
109
110 async fn recv(&self) -> TransportResult<Message<C>> {
111 (**self).recv().await
112 }
113
114 fn is_connected(&self) -> bool {
115 (**self).is_connected()
116 }
117
118 fn is_healthy(&self) -> bool {
119 (**self).is_healthy()
120 }
121
122 async fn close(&self) -> TransportResult<()> {
123 (**self).close().await
124 }
125
126 fn stats(&self) -> Option<TransportStats> {
127 (**self).stats()
128 }
129}
130
131#[async_trait]
132impl<T: MessageChannel<C> + ?Sized, C: Codec + Default> MessageChannel<C> for Box<T> {
133 async fn send(&self, message: &Message<C>) -> TransportResult<()> {
134 (**self).send(message).await
135 }
136
137 async fn recv(&self) -> TransportResult<Message<C>> {
138 (**self).recv().await
139 }
140
141 fn is_connected(&self) -> bool {
142 (**self).is_connected()
143 }
144
145 fn is_healthy(&self) -> bool {
146 (**self).is_healthy()
147 }
148
149 async fn close(&self) -> TransportResult<()> {
150 (**self).close().await
151 }
152
153 fn stats(&self) -> Option<TransportStats> {
154 (**self).stats()
155 }
156}
157
/// Deprecated alias for the `dyn MessageChannel<C>` trait object.
///
/// Unlike [`MessageChannel`] itself, this alias declares no default for `C`,
/// so the codec must always be spelled out.
#[deprecated(since = "0.2.0", note = "Use MessageChannel instead")]
pub type MessageTransport<C> = dyn MessageChannel<C>;
161
/// Deprecated alias for [`MessageChannelAdapter`].
///
/// Unlike [`MessageChannelAdapter`], this alias declares no default for `C`,
/// so both type parameters must always be spelled out.
#[deprecated(since = "0.2.0", note = "Use MessageChannelAdapter instead")]
pub type MessageTransportAdapter<F, C> = MessageChannelAdapter<F, C>;
164
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::types::MessageType;
    use crate::transport::channel::{ChannelConfig, ChannelFrameTransport};
    use serde::{Deserialize, Serialize};

    /// Payload carried by the call messages in these tests.
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    struct TestRequest {
        value: i32,
    }

    /// Payload carried by the reply message in these tests.
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    struct TestResponse {
        result: String,
    }

    /// A call sent on one adapter arrives intact on its peer, and the reply
    /// makes the same trip in the opposite direction.
    #[tokio::test]
    async fn test_message_channel_call_reply() {
        let (left, right) =
            ChannelFrameTransport::create_pair("test", ChannelConfig::default()).unwrap();

        let caller = MessageChannelAdapter::new(left);
        let callee = MessageChannelAdapter::new(right);

        // Caller -> callee: the call.
        let request = TestRequest { value: 42 };
        let outgoing_call: Message = Message::call("test_method", request.clone()).unwrap();
        let call_id = outgoing_call.id;

        caller.send(&outgoing_call).await.unwrap();

        let incoming_call = callee.recv().await.unwrap();
        assert_eq!(incoming_call.id, call_id);
        assert_eq!(incoming_call.msg_type, MessageType::Call);
        assert_eq!(incoming_call.method, "test_method");

        let decoded_request: TestRequest = incoming_call.deserialize_payload().unwrap();
        assert_eq!(decoded_request, request);

        // Callee -> caller: the reply, correlated by the call's id.
        let response = TestResponse {
            result: String::from("ok"),
        };
        let outgoing_reply: Message = Message::reply(call_id, response.clone()).unwrap();
        callee.send(&outgoing_reply).await.unwrap();

        let incoming_reply = caller.recv().await.unwrap();
        assert_eq!(incoming_reply.id, call_id);
        assert_eq!(incoming_reply.msg_type, MessageType::Reply);

        let decoded_response: TestResponse = incoming_reply.deserialize_payload().unwrap();
        assert_eq!(decoded_response, response);
    }

    /// A message whose metadata requests LZ4 compression keeps that setting
    /// and its payload across the channel.
    #[tokio::test]
    async fn test_message_channel_with_compression() {
        use crate::message::types::CompressionType;

        let (left, right) =
            ChannelFrameTransport::create_pair("compress", ChannelConfig::default()).unwrap();

        let sender = MessageChannelAdapter::new(left);
        let receiver = MessageChannelAdapter::new(right);

        let payload = TestRequest { value: 12345 };
        let mut outgoing: Message = Message::call("compressed_method", payload.clone()).unwrap();
        outgoing.metadata.compression = CompressionType::Lz4;

        sender.send(&outgoing).await.unwrap();

        let incoming = receiver.recv().await.unwrap();
        assert_eq!(incoming.metadata.compression, CompressionType::Lz4);

        let decoded: TestRequest = incoming.deserialize_payload().unwrap();
        assert_eq!(decoded, payload);
    }

    /// The adapter's status accessors report through the wrapped transport.
    #[tokio::test]
    async fn test_message_channel_delegation() {
        // `_peer` is a named binding (not `_`), so the other end of the pair
        // stays alive until the end of the test.
        let (local, _peer) =
            ChannelFrameTransport::create_pair("delegate", ChannelConfig::default()).unwrap();

        let channel = MessageChannelAdapter::new(local);

        assert!(channel.is_connected());
        assert!(channel.is_healthy());
        assert!(channel.stats().is_some());
    }
}