1use std::collections::VecDeque;
8
/// A single packet handled by the data-flow layer.
///
/// Bundles a node id, a port number, a millisecond timestamp, the raw payload
/// bytes and a flag marking the packet as an end-of-stream marker.
#[derive(Debug, Clone)]
pub struct DataPacket {
    /// Node identifier stored with the packet.
    pub node_id: u64,
    /// Port number stored with the packet.
    pub port: u32,
    /// Timestamp in milliseconds; the reference clock is not defined here.
    pub timestamp_ms: u64,
    /// Raw payload bytes (empty for packets built by [`DataPacket::eos`]).
    pub data: Vec<u8>,
    /// `true` when the packet marks the end of a stream.
    pub is_eos: bool,
}

impl DataPacket {
    /// Creates a regular packet carrying `data`; the end-of-stream flag is
    /// cleared.
    pub fn new(node_id: u64, port: u32, timestamp_ms: u64, data: Vec<u8>) -> Self {
        let is_eos = false;
        Self {
            node_id,
            port,
            timestamp_ms,
            data,
            is_eos,
        }
    }

    /// Creates an end-of-stream marker: same addressing and timestamp fields
    /// as a regular packet, but with an empty payload and `is_eos` set.
    pub fn eos(node_id: u64, port: u32, timestamp_ms: u64) -> Self {
        // Start from an empty regular packet and flip only the flag.
        Self {
            is_eos: true,
            ..Self::new(node_id, port, timestamp_ms, Vec::new())
        }
    }

    /// Returns the payload length in bytes (header fields are not counted).
    pub fn size_bytes(&self) -> usize {
        let payload = &self.data;
        payload.len()
    }

    /// Returns `true` if this packet is an end-of-stream marker.
    pub fn is_end_of_stream(&self) -> bool {
        let Self { is_eos, .. } = self;
        *is_eos
    }
}
59
60#[derive(Debug)]
64pub struct PortBuffer {
65 pub node_id: u64,
67 pub port: u32,
69 pub buffer: VecDeque<DataPacket>,
71 pub max_size: usize,
73}
74
75impl PortBuffer {
76 pub fn new(node_id: u64, port: u32, max_size: usize) -> Self {
78 Self {
79 node_id,
80 port,
81 buffer: VecDeque::new(),
82 max_size,
83 }
84 }
85
86 pub fn push(&mut self, packet: DataPacket) -> bool {
88 if self.buffer.len() >= self.max_size {
89 return false;
90 }
91 self.buffer.push_back(packet);
92 true
93 }
94
95 pub fn pop(&mut self) -> Option<DataPacket> {
97 self.buffer.pop_front()
98 }
99
100 pub fn is_full(&self) -> bool {
102 self.buffer.len() >= self.max_size
103 }
104
105 pub fn len(&self) -> usize {
107 self.buffer.len()
108 }
109
110 pub fn is_empty(&self) -> bool {
112 self.buffer.is_empty()
113 }
114}
115
/// Strategy selector for handling a full buffer.
///
/// Within this file the policy is only interpreted by
/// [`BackpressurePolicy::should_drop`]; how `Block` and `Resize` are enforced
/// is not visible here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressurePolicy {
    /// The only variant for which `should_drop` returns `true`.
    Drop,
    /// `should_drop` returns `false`.
    Block,
    /// `should_drop` returns `false`.
    Resize,
}

impl BackpressurePolicy {
    /// Returns `true` only for [`BackpressurePolicy::Drop`].
    pub fn should_drop(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::Drop => true,
            Self::Block | Self::Resize => false,
        }
    }
}
135
136#[derive(Debug)]
140pub struct DataFlowController {
141 pub buffers: Vec<PortBuffer>,
143 pub policy: BackpressurePolicy,
145}
146
147impl DataFlowController {
148 pub fn new(policy: BackpressurePolicy) -> Self {
150 Self {
151 buffers: Vec::new(),
152 policy,
153 }
154 }
155
156 pub fn add_buffer(&mut self, buffer: PortBuffer) {
158 self.buffers.push(buffer);
159 }
160
161 pub fn get_buffer(&mut self, node_id: u64, port: u32) -> Option<&mut PortBuffer> {
163 self.buffers
164 .iter_mut()
165 .find(|b| b.node_id == node_id && b.port == port)
166 }
167
168 pub fn total_packets_in_flight(&self) -> usize {
170 self.buffers.iter().map(|b| b.len()).sum()
171 }
172
173 pub fn clear_buffer(&mut self, node_id: u64, port: u32) -> bool {
176 if let Some(buf) = self.get_buffer(node_id, port) {
177 buf.buffer.clear();
178 true
179 } else {
180 false
181 }
182 }
183}
184
#[cfg(test)]
mod tests {
    use super::*;

    // ---- DataPacket -----------------------------------------------------

    /// `size_bytes` reports the payload length only.
    #[test]
    fn packet_size_bytes_returns_payload_length() {
        let payload = vec![0u8; 64];
        let packet = DataPacket::new(1, 0, 1000, payload);
        assert_eq!(packet.size_bytes(), 64);
    }

    /// Packets built with `new` are not end-of-stream markers.
    #[test]
    fn packet_is_not_eos_by_default() {
        let packet = DataPacket::new(1, 0, 0, vec![]);
        assert!(!packet.is_end_of_stream());
    }

    /// Packets built with `eos` are flagged and carry no payload.
    #[test]
    fn eos_packet_is_end_of_stream() {
        let marker = DataPacket::eos(2, 1, 5000);
        assert!(marker.is_end_of_stream());
        assert_eq!(marker.size_bytes(), 0);
    }

    /// The timestamp passed to `new` is stored unchanged.
    #[test]
    fn packet_stores_timestamp() {
        let packet = DataPacket::new(3, 0, 42_000, vec![1, 2, 3]);
        assert_eq!(packet.timestamp_ms, 42_000);
    }

    // ---- PortBuffer -----------------------------------------------------

    /// A pushed packet comes back out of `pop` with its payload intact.
    #[test]
    fn buffer_push_pop_roundtrip() {
        let mut queue = PortBuffer::new(1, 0, 4);
        let accepted = queue.push(DataPacket::new(1, 0, 0, vec![9]));
        assert!(accepted);
        let popped = queue.pop().expect("pop should succeed");
        assert_eq!(popped.data, vec![9]);
    }

    /// Once `max_size` packets are queued, further pushes are refused.
    #[test]
    fn buffer_push_rejects_when_full() {
        let mut queue = PortBuffer::new(1, 0, 2);
        for ts in 0..2u64 {
            assert!(queue.push(DataPacket::new(1, 0, ts, vec![])));
        }
        assert!(!queue.push(DataPacket::new(1, 0, 2, vec![])));
    }

    /// `is_full` turns true exactly at capacity.
    #[test]
    fn buffer_is_full_true_at_capacity() {
        let mut queue = PortBuffer::new(1, 0, 1);
        queue.push(DataPacket::new(1, 0, 0, vec![]));
        assert!(queue.is_full());
    }

    /// `len` tracks the number of queued packets.
    #[test]
    fn buffer_len_reflects_queue_depth() {
        let mut queue = PortBuffer::new(2, 0, 10);
        (0..5).for_each(|ts| {
            queue.push(DataPacket::new(2, 0, ts, vec![]));
        });
        assert_eq!(queue.len(), 5);
    }

    /// Popping from an empty buffer yields `None`.
    #[test]
    fn buffer_pop_empty_returns_none() {
        let mut queue = PortBuffer::new(1, 0, 4);
        let popped = queue.pop();
        assert!(popped.is_none());
    }

    // ---- BackpressurePolicy ---------------------------------------------

    #[test]
    fn drop_policy_should_drop_is_true() {
        let policy = BackpressurePolicy::Drop;
        assert!(policy.should_drop());
    }

    #[test]
    fn block_policy_should_drop_is_false() {
        let policy = BackpressurePolicy::Block;
        assert!(!policy.should_drop());
    }

    #[test]
    fn resize_policy_should_drop_is_false() {
        let policy = BackpressurePolicy::Resize;
        assert!(!policy.should_drop());
    }

    // ---- DataFlowController ---------------------------------------------

    /// The in-flight count is the sum over every registered buffer.
    #[test]
    fn controller_total_packets_sums_all_buffers() {
        let mut controller = DataFlowController::new(BackpressurePolicy::Drop);

        let mut first = PortBuffer::new(1, 0, 8);
        first.push(DataPacket::new(1, 0, 0, vec![]));
        first.push(DataPacket::new(1, 0, 1, vec![]));

        let mut second = PortBuffer::new(2, 0, 8);
        second.push(DataPacket::new(2, 0, 0, vec![]));

        controller.add_buffer(first);
        controller.add_buffer(second);
        assert_eq!(controller.total_packets_in_flight(), 3);
    }

    /// Lookup matches on both node id and port.
    #[test]
    fn controller_get_buffer_returns_correct_buffer() {
        let mut controller = DataFlowController::new(BackpressurePolicy::Block);
        controller.add_buffer(PortBuffer::new(5, 2, 4));
        let hit = controller.get_buffer(5, 2).is_some();
        assert!(hit);
        let miss = controller.get_buffer(5, 3).is_none();
        assert!(miss);
    }

    /// Clearing a known buffer reports success and drops its packets.
    #[test]
    fn controller_clear_buffer_empties_queue() {
        let mut controller = DataFlowController::new(BackpressurePolicy::Drop);
        let mut queue = PortBuffer::new(1, 0, 8);
        queue.push(DataPacket::new(1, 0, 0, vec![1, 2, 3]));
        controller.add_buffer(queue);

        let cleared = controller.clear_buffer(1, 0);
        assert!(cleared);
        assert_eq!(controller.total_packets_in_flight(), 0);
    }

    /// Clearing an unknown buffer reports failure.
    #[test]
    fn controller_clear_buffer_false_for_missing() {
        let mut controller = DataFlowController::new(BackpressurePolicy::Drop);
        let cleared = controller.clear_buffer(99, 0);
        assert!(!cleared);
    }
}