Skip to main content

noxu_rep/net/
data_channel.rs

1//! Typed protocol message channel wrapper.
2//!
3//! Wraps a raw `Channel` with protocol message serialization, providing
4//! send/receive of typed `ProtocolMessage` values and message counting
5//! statistics. This corresponds to the pattern in where protocol
6//! messages are serialized/deserialized over the underlying DataChannel.
7
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::Duration;
11
12use super::channel::Channel;
13use crate::error::Result;
14use crate::protocol::ProtocolMessage;
15
16/// A channel that sends and receives typed protocol messages.
17///
18/// Wraps a raw `Channel` and provides automatic serialization via
19/// `ProtocolMessage::encode` and `ProtocolMessage::decode`. Tracks
20/// message counts for monitoring.
21pub struct DataChannel {
22    /// The underlying raw channel.
23    inner: Arc<dyn Channel>,
24    /// Name identifying the peer node on this channel.
25    node_name: String,
26    /// Count of messages sent through this channel.
27    messages_sent: AtomicU64,
28    /// Count of messages received through this channel.
29    messages_received: AtomicU64,
30}
31
32impl DataChannel {
33    /// Create a new `DataChannel` wrapping the given raw channel.
34    ///
35    /// # Arguments
36    /// * `channel` - The underlying channel for byte transport.
37    /// * `node_name` - The name of the peer node on this channel.
38    pub fn new(channel: Arc<dyn Channel>, node_name: String) -> Self {
39        Self {
40            inner: channel,
41            node_name,
42            messages_sent: AtomicU64::new(0),
43            messages_received: AtomicU64::new(0),
44        }
45    }
46
47    /// Send a protocol message through the channel.
48    ///
49    /// The message is encoded to bytes via `ProtocolMessage::encode` and
50    /// sent through the underlying channel.
51    pub fn send_message(&self, msg: &ProtocolMessage) -> Result<()> {
52        let data = msg.encode();
53        self.inner.send(&data)?;
54        self.messages_sent.fetch_add(1, Ordering::Relaxed);
55        Ok(())
56    }
57
58    /// Receive a protocol message with a timeout.
59    ///
60    /// Blocks until a message is available or the timeout expires.
61    /// Returns `Ok(None)` on timeout.
62    pub fn receive_message(
63        &self,
64        timeout: Duration,
65    ) -> Result<Option<ProtocolMessage>> {
66        match self.inner.receive(timeout)? {
67            Some(data) => {
68                let msg = ProtocolMessage::decode(&data)?;
69                self.messages_received.fetch_add(1, Ordering::Relaxed);
70                Ok(Some(msg))
71            }
72            None => Ok(None),
73        }
74    }
75
76    /// Get the peer node name.
77    pub fn get_node_name(&self) -> &str {
78        &self.node_name
79    }
80
81    /// Get the total number of messages sent.
82    pub fn messages_sent(&self) -> u64 {
83        self.messages_sent.load(Ordering::Relaxed)
84    }
85
86    /// Get the total number of messages received.
87    pub fn messages_received(&self) -> u64 {
88        self.messages_received.load(Ordering::Relaxed)
89    }
90
91    /// Close the underlying channel.
92    pub fn close(&self) -> Result<()> {
93        self.inner.close()
94    }
95
96    /// Check if the underlying channel is open.
97    pub fn is_open(&self) -> bool {
98        self.inner.is_open()
99    }
100}
101
102#[cfg(test)]
103mod tests {
104    use super::*;
105    use crate::net::channel::LocalChannelPair;
106
107    #[test]
108    fn test_send_receive_message() {
109        let pair = LocalChannelPair::new();
110        let dc_a = DataChannel::new(Arc::new(pair.channel_a), "node_a".into());
111        let dc_b = DataChannel::new(Arc::new(pair.channel_b), "node_b".into());
112
113        let msg =
114            ProtocolMessage::Heartbeat { master_vlsn: 0, timestamp_ms: 12345 };
115        dc_a.send_message(&msg).unwrap();
116
117        let received = dc_b.receive_message(Duration::from_secs(1)).unwrap();
118        assert!(received.is_some());
119        match received.unwrap() {
120            ProtocolMessage::Heartbeat { timestamp_ms, .. } => {
121                assert_eq!(timestamp_ms, 12345)
122            }
123            other => panic!("unexpected message: {:?}", other),
124        }
125    }
126
127    #[test]
128    fn test_bidirectional_messages() {
129        let pair = LocalChannelPair::new();
130        let dc_a = DataChannel::new(Arc::new(pair.channel_a), "node_a".into());
131        let dc_b = DataChannel::new(Arc::new(pair.channel_b), "node_b".into());
132
133        let msg_a =
134            ProtocolMessage::Heartbeat { master_vlsn: 0, timestamp_ms: 100 };
135        let msg_b =
136            ProtocolMessage::Heartbeat { master_vlsn: 0, timestamp_ms: 200 };
137
138        dc_a.send_message(&msg_a).unwrap();
139        dc_b.send_message(&msg_b).unwrap();
140
141        let recv_b =
142            dc_b.receive_message(Duration::from_secs(1)).unwrap().unwrap();
143        let recv_a =
144            dc_a.receive_message(Duration::from_secs(1)).unwrap().unwrap();
145
146        match recv_b {
147            ProtocolMessage::Heartbeat { timestamp_ms, .. } => {
148                assert_eq!(timestamp_ms, 100)
149            }
150            other => panic!("unexpected: {:?}", other),
151        }
152        match recv_a {
153            ProtocolMessage::Heartbeat { timestamp_ms, .. } => {
154                assert_eq!(timestamp_ms, 200)
155            }
156            other => panic!("unexpected: {:?}", other),
157        }
158    }
159
160    #[test]
161    fn test_message_counting() {
162        let pair = LocalChannelPair::new();
163        let dc_a = DataChannel::new(Arc::new(pair.channel_a), "node_a".into());
164        let dc_b = DataChannel::new(Arc::new(pair.channel_b), "node_b".into());
165
166        assert_eq!(dc_a.messages_sent(), 0);
167        assert_eq!(dc_a.messages_received(), 0);
168
169        for i in 0..5 {
170            dc_a.send_message(&ProtocolMessage::Heartbeat {
171                master_vlsn: 0,
172                timestamp_ms: i,
173            })
174            .unwrap();
175        }
176        assert_eq!(dc_a.messages_sent(), 5);
177
178        for _ in 0..5 {
179            dc_b.receive_message(Duration::from_secs(1)).unwrap();
180        }
181        assert_eq!(dc_b.messages_received(), 5);
182    }
183
184    #[test]
185    fn test_receive_timeout() {
186        let pair = LocalChannelPair::new();
187        let dc_b = DataChannel::new(Arc::new(pair.channel_b), "node_b".into());
188
189        let result = dc_b.receive_message(Duration::from_millis(50)).unwrap();
190        assert!(result.is_none());
191        assert_eq!(dc_b.messages_received(), 0);
192    }
193
194    #[test]
195    fn test_node_name() {
196        let pair = LocalChannelPair::new();
197        let dc = DataChannel::new(Arc::new(pair.channel_a), "my_node".into());
198        assert_eq!(dc.get_node_name(), "my_node");
199    }
200
201    #[test]
202    fn test_close_and_is_open() {
203        let pair = LocalChannelPair::new();
204        let dc = DataChannel::new(Arc::new(pair.channel_a), "node".into());
205
206        assert!(dc.is_open());
207        dc.close().unwrap();
208        assert!(!dc.is_open());
209    }
210
211    #[test]
212    fn test_send_after_close_fails() {
213        let pair = LocalChannelPair::new();
214        let dc = DataChannel::new(Arc::new(pair.channel_a), "node".into());
215        dc.close().unwrap();
216
217        let result = dc.send_message(&ProtocolMessage::Heartbeat {
218            master_vlsn: 0,
219            timestamp_ms: 0,
220        });
221        assert!(result.is_err());
222    }
223}