xrpc/channel/
message.rs

1//! Message-level channel for RPC communication (Layer 2).
2
3use async_trait::async_trait;
4use std::fmt::Debug;
5use std::sync::Arc;
6
7use crate::codec::{BincodeCodec, Codec};
8use crate::error::TransportResult;
9use crate::message::Message;
10use crate::transport::{FrameTransport, TransportStats};
11
12/// Message-level channel for RPC communication (Layer 2).
13#[async_trait]
14pub trait MessageChannel<C: Codec = BincodeCodec>: Send + Sync + Debug {
15    /// Send an RPC message.
16    async fn send(&self, message: &Message<C>) -> TransportResult<()>;
17
18    /// Receive an RPC message.
19    async fn recv(&self) -> TransportResult<Message<C>>;
20
21    /// Check if the channel is connected.
22    fn is_connected(&self) -> bool;
23
24    /// Check if the channel is healthy.
25    fn is_healthy(&self) -> bool {
26        self.is_connected()
27    }
28
29    /// Close the channel.
30    async fn close(&self) -> TransportResult<()>;
31
32    /// Get transport statistics.
33    fn stats(&self) -> Option<TransportStats> {
34        None
35    }
36}
37
38/// Adapter that wraps a FrameTransport to provide MessageChannel functionality.
39#[derive(Debug)]
40pub struct MessageChannelAdapter<F: FrameTransport, C: Codec = BincodeCodec> {
41    inner: F,
42    _codec: std::marker::PhantomData<C>,
43}
44
45impl<F: FrameTransport> MessageChannelAdapter<F, BincodeCodec> {
46    /// Create a new adapter with default BincodeCodec.
47    pub fn new(transport: F) -> Self {
48        Self {
49            inner: transport,
50            _codec: std::marker::PhantomData,
51        }
52    }
53}
54
55impl<F: FrameTransport, C: Codec> MessageChannelAdapter<F, C> {
56    /// Create a new adapter with a specific codec.
57    pub fn with_codec(transport: F) -> Self {
58        Self {
59            inner: transport,
60            _codec: std::marker::PhantomData,
61        }
62    }
63
64    /// Get a reference to the inner transport.
65    pub fn inner(&self) -> &F {
66        &self.inner
67    }
68
69    /// Consume the adapter and return the inner transport.
70    pub fn into_inner(self) -> F {
71        self.inner
72    }
73}
74
75#[async_trait]
76impl<F: FrameTransport, C: Codec + Default> MessageChannel<C> for MessageChannelAdapter<F, C> {
77    async fn send(&self, message: &Message<C>) -> TransportResult<()> {
78        let bytes = message.encode()?;
79        self.inner.send_frame(&bytes).await
80    }
81
82    async fn recv(&self) -> TransportResult<Message<C>> {
83        let bytes = self.inner.recv_frame().await?;
84        Message::decode(bytes)
85    }
86
87    fn is_connected(&self) -> bool {
88        self.inner.is_connected()
89    }
90
91    fn is_healthy(&self) -> bool {
92        self.inner.is_healthy()
93    }
94
95    async fn close(&self) -> TransportResult<()> {
96        self.inner.close().await
97    }
98
99    fn stats(&self) -> Option<TransportStats> {
100        self.inner.stats()
101    }
102}
103
104#[async_trait]
105impl<T: MessageChannel<C> + ?Sized, C: Codec + Default> MessageChannel<C> for Arc<T> {
106    async fn send(&self, message: &Message<C>) -> TransportResult<()> {
107        (**self).send(message).await
108    }
109
110    async fn recv(&self) -> TransportResult<Message<C>> {
111        (**self).recv().await
112    }
113
114    fn is_connected(&self) -> bool {
115        (**self).is_connected()
116    }
117
118    fn is_healthy(&self) -> bool {
119        (**self).is_healthy()
120    }
121
122    async fn close(&self) -> TransportResult<()> {
123        (**self).close().await
124    }
125
126    fn stats(&self) -> Option<TransportStats> {
127        (**self).stats()
128    }
129}
130
131#[async_trait]
132impl<T: MessageChannel<C> + ?Sized, C: Codec + Default> MessageChannel<C> for Box<T> {
133    async fn send(&self, message: &Message<C>) -> TransportResult<()> {
134        (**self).send(message).await
135    }
136
137    async fn recv(&self) -> TransportResult<Message<C>> {
138        (**self).recv().await
139    }
140
141    fn is_connected(&self) -> bool {
142        (**self).is_connected()
143    }
144
145    fn is_healthy(&self) -> bool {
146        (**self).is_healthy()
147    }
148
149    async fn close(&self) -> TransportResult<()> {
150        (**self).close().await
151    }
152
153    fn stats(&self) -> Option<TransportStats> {
154        (**self).stats()
155    }
156}
157
158// Deprecated aliases for backward compatibility
159#[deprecated(since = "0.2.0", note = "Use MessageChannel instead")]
160pub type MessageTransport<C> = dyn MessageChannel<C>;
161
162#[deprecated(since = "0.2.0", note = "Use MessageChannelAdapter instead")]
163pub type MessageTransportAdapter<F, C> = MessageChannelAdapter<F, C>;
164
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::types::MessageType;
    use crate::transport::channel::{ChannelConfig, ChannelFrameTransport};
    use serde::{Deserialize, Serialize};

    /// Request payload used to check that call payloads survive the round trip.
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    struct TestRequest {
        value: i32,
    }

    /// Response payload used to check that reply payloads survive the round trip.
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    struct TestResponse {
        result: String,
    }

    /// A call sent from one end is received intact on the other end, and the
    /// reply carrying the same id makes it back to the caller.
    #[tokio::test]
    async fn test_message_channel_call_reply() {
        let (caller_transport, callee_transport) =
            ChannelFrameTransport::create_pair("test", ChannelConfig::default()).unwrap();
        let caller = MessageChannelAdapter::new(caller_transport);
        let callee = MessageChannelAdapter::new(callee_transport);

        // Call leg: caller -> callee.
        let request = TestRequest { value: 42 };
        let outgoing_call: Message = Message::call("test_method", request.clone()).unwrap();
        let call_id = outgoing_call.id;
        caller.send(&outgoing_call).await.unwrap();

        let incoming_call = callee.recv().await.unwrap();
        assert_eq!(incoming_call.id, call_id);
        assert_eq!(incoming_call.msg_type, MessageType::Call);
        assert_eq!(incoming_call.method, "test_method");

        let decoded_request: TestRequest = incoming_call.deserialize_payload().unwrap();
        assert_eq!(decoded_request, request);

        // Reply leg: callee -> caller, reusing the id of the call.
        let response = TestResponse {
            result: String::from("ok"),
        };
        let outgoing_reply: Message = Message::reply(call_id, response.clone()).unwrap();
        callee.send(&outgoing_reply).await.unwrap();

        let incoming_reply = caller.recv().await.unwrap();
        assert_eq!(incoming_reply.id, call_id);
        assert_eq!(incoming_reply.msg_type, MessageType::Reply);

        let decoded_response: TestResponse = incoming_reply.deserialize_payload().unwrap();
        assert_eq!(decoded_response, response);
    }

    /// A message flagged with LZ4 compression keeps that flag across the
    /// channel and its payload still deserializes to the original value.
    #[tokio::test]
    async fn test_message_channel_with_compression() {
        use crate::message::types::CompressionType;

        let (sender_transport, receiver_transport) =
            ChannelFrameTransport::create_pair("compress", ChannelConfig::default()).unwrap();
        let sender = MessageChannelAdapter::new(sender_transport);
        let receiver = MessageChannelAdapter::new(receiver_transport);

        let payload = TestRequest { value: 12345 };
        let mut outgoing: Message = Message::call("compressed_method", payload.clone()).unwrap();
        outgoing.metadata.compression = CompressionType::Lz4;

        sender.send(&outgoing).await.unwrap();

        let incoming = receiver.recv().await.unwrap();
        assert_eq!(incoming.metadata.compression, CompressionType::Lz4);

        let decoded: TestRequest = incoming.deserialize_payload().unwrap();
        assert_eq!(decoded, payload);
    }

    /// The adapter forwards state queries to the transport it wraps.
    #[tokio::test]
    async fn test_message_channel_delegation() {
        // The second transport stays bound for the whole test so the pair is
        // not torn down before the assertions run.
        let (transport, _peer) =
            ChannelFrameTransport::create_pair("delegate", ChannelConfig::default()).unwrap();

        let channel = MessageChannelAdapter::new(transport);

        assert!(channel.is_connected());
        assert!(channel.is_healthy());
        assert!(channel.stats().is_some());
    }
}