noxu_rep/net/
data_channel.rs1use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::Duration;
11
12use super::channel::Channel;
13use crate::error::Result;
14use crate::protocol::ProtocolMessage;
15
/// A message-level wrapper around a shared [`Channel`].
///
/// Messages are exchanged as [`ProtocolMessage`] values, and the number of
/// messages sent and received through this wrapper is tracked in atomic
/// counters.
pub struct DataChannel {
    /// Underlying transport; every send, receive, close and open-state query
    /// is delegated to it.
    inner: Arc<dyn Channel>,
    /// Node name supplied at construction.
    // NOTE(review): whether this names the local node or the remote peer is
    // decided by the caller; confirm against call sites.
    node_name: String,
    /// Count of messages successfully handed to the underlying channel.
    messages_sent: AtomicU64,
    /// Count of messages successfully received and decoded.
    messages_received: AtomicU64,
}
31
32impl DataChannel {
33 pub fn new(channel: Arc<dyn Channel>, node_name: String) -> Self {
39 Self {
40 inner: channel,
41 node_name,
42 messages_sent: AtomicU64::new(0),
43 messages_received: AtomicU64::new(0),
44 }
45 }
46
47 pub fn send_message(&self, msg: &ProtocolMessage) -> Result<()> {
52 let data = msg.encode();
53 self.inner.send(&data)?;
54 self.messages_sent.fetch_add(1, Ordering::Relaxed);
55 Ok(())
56 }
57
58 pub fn receive_message(
63 &self,
64 timeout: Duration,
65 ) -> Result<Option<ProtocolMessage>> {
66 match self.inner.receive(timeout)? {
67 Some(data) => {
68 let msg = ProtocolMessage::decode(&data)?;
69 self.messages_received.fetch_add(1, Ordering::Relaxed);
70 Ok(Some(msg))
71 }
72 None => Ok(None),
73 }
74 }
75
76 pub fn get_node_name(&self) -> &str {
78 &self.node_name
79 }
80
81 pub fn messages_sent(&self) -> u64 {
83 self.messages_sent.load(Ordering::Relaxed)
84 }
85
86 pub fn messages_received(&self) -> u64 {
88 self.messages_received.load(Ordering::Relaxed)
89 }
90
91 pub fn close(&self) -> Result<()> {
93 self.inner.close()
94 }
95
96 pub fn is_open(&self) -> bool {
98 self.inner.is_open()
99 }
100}
101
#[cfg(test)]
mod tests {
    use super::*;
    use crate::net::channel::LocalChannelPair;

    /// Builds two data channels, named "node_a" and "node_b", attached to
    /// the two ends of a fresh `LocalChannelPair`.
    fn linked_channels() -> (DataChannel, DataChannel) {
        let pair = LocalChannelPair::new();
        let a = DataChannel::new(Arc::new(pair.channel_a), "node_a".into());
        let b = DataChannel::new(Arc::new(pair.channel_b), "node_b".into());
        (a, b)
    }

    // A heartbeat sent from one end arrives intact at the other end.
    #[test]
    fn test_send_receive_message() {
        let (sender, receiver) = linked_channels();

        let heartbeat =
            ProtocolMessage::Heartbeat { master_vlsn: 0, timestamp_ms: 12345 };
        sender.send_message(&heartbeat).unwrap();

        let received =
            receiver.receive_message(Duration::from_secs(1)).unwrap();
        assert!(received.is_some());
        let delivered = received.unwrap();
        match delivered {
            ProtocolMessage::Heartbeat { timestamp_ms, .. } => {
                assert_eq!(timestamp_ms, 12345)
            }
            other => panic!("unexpected message: {:?}", other),
        }
    }

    // Each end can send to the other and receives the peer's message.
    #[test]
    fn test_bidirectional_messages() {
        let (end_a, end_b) = linked_channels();

        let from_a =
            ProtocolMessage::Heartbeat { master_vlsn: 0, timestamp_ms: 100 };
        let from_b =
            ProtocolMessage::Heartbeat { master_vlsn: 0, timestamp_ms: 200 };

        end_a.send_message(&from_a).unwrap();
        end_b.send_message(&from_b).unwrap();

        let seen_by_b =
            end_b.receive_message(Duration::from_secs(1)).unwrap().unwrap();
        let seen_by_a =
            end_a.receive_message(Duration::from_secs(1)).unwrap().unwrap();

        match seen_by_b {
            ProtocolMessage::Heartbeat { timestamp_ms, .. } => {
                assert_eq!(timestamp_ms, 100)
            }
            other => panic!("unexpected: {:?}", other),
        }
        match seen_by_a {
            ProtocolMessage::Heartbeat { timestamp_ms, .. } => {
                assert_eq!(timestamp_ms, 200)
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    // Counters start at zero and track successful sends and receives.
    #[test]
    fn test_message_counting() {
        let (sender, receiver) = linked_channels();

        assert_eq!(sender.messages_sent(), 0);
        assert_eq!(sender.messages_received(), 0);

        for tick in 0..5 {
            let heartbeat = ProtocolMessage::Heartbeat {
                master_vlsn: 0,
                timestamp_ms: tick,
            };
            sender.send_message(&heartbeat).unwrap();
        }
        assert_eq!(sender.messages_sent(), 5);

        for _ in 0..5 {
            receiver.receive_message(Duration::from_secs(1)).unwrap();
        }
        assert_eq!(receiver.messages_received(), 5);
    }

    // Receiving on an idle channel yields `None` and leaves the counter at 0.
    #[test]
    fn test_receive_timeout() {
        // Built directly from the pair so the unused `channel_a` end stays
        // alive inside `pair` for the duration of the test.
        let pair = LocalChannelPair::new();
        let idle = DataChannel::new(Arc::new(pair.channel_b), "node_b".into());

        let outcome = idle.receive_message(Duration::from_millis(50)).unwrap();
        assert!(outcome.is_none());
        assert_eq!(idle.messages_received(), 0);
    }

    // The name passed to the constructor is returned unchanged.
    #[test]
    fn test_node_name() {
        let pair = LocalChannelPair::new();
        let named =
            DataChannel::new(Arc::new(pair.channel_a), "my_node".into());
        assert_eq!(named.get_node_name(), "my_node");
    }

    // A channel reports open until `close` is called on it.
    #[test]
    fn test_close_and_is_open() {
        let pair = LocalChannelPair::new();
        let channel = DataChannel::new(Arc::new(pair.channel_a), "node".into());

        assert!(channel.is_open());
        channel.close().unwrap();
        assert!(!channel.is_open());
    }

    // Sending through a closed channel reports an error.
    #[test]
    fn test_send_after_close_fails() {
        let pair = LocalChannelPair::new();
        let channel = DataChannel::new(Arc::new(pair.channel_a), "node".into());
        channel.close().unwrap();

        let heartbeat =
            ProtocolMessage::Heartbeat { master_vlsn: 0, timestamp_ms: 0 };
        let outcome = channel.send_message(&heartbeat);
        assert!(outcome.is_err());
    }
}