Skip to main content

oximedia_graph/
data_flow.rs

1//! Graph data flow management: buffered port queues with backpressure.
2//!
3//! This module handles the movement of [`DataPacket`]s between graph nodes
4//! through bounded [`PortBuffer`]s.  A [`DataFlowController`] coordinates
5//! all buffers and enforces a configurable backpressure [`BackpressurePolicy`].
6
7use std::collections::VecDeque;
8
9// ── DataPacket ────────────────────────────────────────────────────────────────
10
11/// A unit of data travelling between two ports in the processing graph.
12#[derive(Debug, Clone)]
13pub struct DataPacket {
14    /// ID of the node that produced this packet.
15    pub node_id: u64,
16    /// Output port index on the producing node.
17    pub port: u32,
18    /// Presentation timestamp in milliseconds.
19    pub timestamp_ms: u64,
20    /// Raw payload bytes.
21    pub data: Vec<u8>,
22    /// If `true` this is the last packet in the stream.
23    pub is_eos: bool,
24}
25
26impl DataPacket {
27    /// Creates a normal (non-EOS) data packet.
28    pub fn new(node_id: u64, port: u32, timestamp_ms: u64, data: Vec<u8>) -> Self {
29        Self {
30            node_id,
31            port,
32            timestamp_ms,
33            data,
34            is_eos: false,
35        }
36    }
37
38    /// Creates an end-of-stream sentinel packet with an empty payload.
39    pub fn eos(node_id: u64, port: u32, timestamp_ms: u64) -> Self {
40        Self {
41            node_id,
42            port,
43            timestamp_ms,
44            data: Vec::new(),
45            is_eos: true,
46        }
47    }
48
49    /// Returns the size of the payload in bytes.
50    pub fn size_bytes(&self) -> usize {
51        self.data.len()
52    }
53
54    /// Returns `true` if this is an end-of-stream packet.
55    pub fn is_end_of_stream(&self) -> bool {
56        self.is_eos
57    }
58}
59
60// ── PortBuffer ────────────────────────────────────────────────────────────────
61
62/// A bounded FIFO queue of [`DataPacket`]s associated with one (node, port) pair.
63#[derive(Debug)]
64pub struct PortBuffer {
65    /// Node ID that owns this buffer.
66    pub node_id: u64,
67    /// Port index this buffer serves.
68    pub port: u32,
69    /// Queued packets, oldest at the front.
70    pub buffer: VecDeque<DataPacket>,
71    /// Maximum number of packets this buffer may hold.
72    pub max_size: usize,
73}
74
75impl PortBuffer {
76    /// Creates a new, empty port buffer with the given capacity.
77    pub fn new(node_id: u64, port: u32, max_size: usize) -> Self {
78        Self {
79            node_id,
80            port,
81            buffer: VecDeque::new(),
82            max_size,
83        }
84    }
85
86    /// Enqueues `packet` if there is room.  Returns `true` on success.
87    pub fn push(&mut self, packet: DataPacket) -> bool {
88        if self.buffer.len() >= self.max_size {
89            return false;
90        }
91        self.buffer.push_back(packet);
92        true
93    }
94
95    /// Removes and returns the oldest packet, or `None` if empty.
96    pub fn pop(&mut self) -> Option<DataPacket> {
97        self.buffer.pop_front()
98    }
99
100    /// Returns `true` if the buffer has reached its maximum capacity.
101    pub fn is_full(&self) -> bool {
102        self.buffer.len() >= self.max_size
103    }
104
105    /// Returns the current number of packets in the buffer.
106    pub fn len(&self) -> usize {
107        self.buffer.len()
108    }
109
110    /// Returns `true` if the buffer contains no packets.
111    pub fn is_empty(&self) -> bool {
112        self.buffer.is_empty()
113    }
114}
115
116// ── BackpressurePolicy ────────────────────────────────────────────────────────
117
118/// Strategy to apply when a [`PortBuffer`] is full.
119#[derive(Debug, Clone, Copy, PartialEq, Eq)]
120pub enum BackpressurePolicy {
121    /// Silently discard the incoming packet.
122    Drop,
123    /// Signal the producer to block (caller must retry).
124    Block,
125    /// Resize the buffer to accommodate the packet.
126    Resize,
127}
128
129impl BackpressurePolicy {
130    /// Returns `true` if the policy instructs the caller to drop the packet.
131    pub fn should_drop(&self) -> bool {
132        matches!(self, Self::Drop)
133    }
134}
135
136// ── DataFlowController ────────────────────────────────────────────────────────
137
138/// Coordinates all port buffers in the graph and enforces backpressure.
139#[derive(Debug)]
140pub struct DataFlowController {
141    /// All port buffers managed by this controller.
142    pub buffers: Vec<PortBuffer>,
143    /// Backpressure strategy applied to full buffers.
144    pub policy: BackpressurePolicy,
145}
146
147impl DataFlowController {
148    /// Creates a new controller with no buffers and the given policy.
149    pub fn new(policy: BackpressurePolicy) -> Self {
150        Self {
151            buffers: Vec::new(),
152            policy,
153        }
154    }
155
156    /// Adds a new buffer to this controller.
157    pub fn add_buffer(&mut self, buffer: PortBuffer) {
158        self.buffers.push(buffer);
159    }
160
161    /// Returns a mutable reference to the buffer for `(node_id, port)`.
162    pub fn get_buffer(&mut self, node_id: u64, port: u32) -> Option<&mut PortBuffer> {
163        self.buffers
164            .iter_mut()
165            .find(|b| b.node_id == node_id && b.port == port)
166    }
167
168    /// Returns the total number of packets currently held across all buffers.
169    pub fn total_packets_in_flight(&self) -> usize {
170        self.buffers.iter().map(|b| b.len()).sum()
171    }
172
173    /// Drains the buffer for `(node_id, port)`.  Returns `true` if the buffer
174    /// existed (it may have already been empty).
175    pub fn clear_buffer(&mut self, node_id: u64, port: u32) -> bool {
176        if let Some(buf) = self.get_buffer(node_id, port) {
177            buf.buffer.clear();
178            true
179        } else {
180            false
181        }
182    }
183}
184
185// ─────────────────────────────────────────────────────────────────────────────
186#[cfg(test)]
187mod tests {
188    use super::*;
189
190    // ── DataPacket ────────────────────────────────────────────────────────────
191
192    #[test]
193    fn packet_size_bytes_returns_payload_length() {
194        let p = DataPacket::new(1, 0, 1000, vec![0u8; 64]);
195        assert_eq!(p.size_bytes(), 64);
196    }
197
198    #[test]
199    fn packet_is_not_eos_by_default() {
200        let p = DataPacket::new(1, 0, 0, vec![]);
201        assert!(!p.is_end_of_stream());
202    }
203
204    #[test]
205    fn eos_packet_is_end_of_stream() {
206        let p = DataPacket::eos(2, 1, 5000);
207        assert!(p.is_end_of_stream());
208        assert_eq!(p.size_bytes(), 0);
209    }
210
211    #[test]
212    fn packet_stores_timestamp() {
213        let p = DataPacket::new(3, 0, 42_000, vec![1, 2, 3]);
214        assert_eq!(p.timestamp_ms, 42_000);
215    }
216
217    // ── PortBuffer ────────────────────────────────────────────────────────────
218
219    #[test]
220    fn buffer_push_pop_roundtrip() {
221        let mut buf = PortBuffer::new(1, 0, 4);
222        let p = DataPacket::new(1, 0, 0, vec![9]);
223        assert!(buf.push(p));
224        let out = buf.pop().expect("pop should succeed");
225        assert_eq!(out.data, vec![9]);
226    }
227
228    #[test]
229    fn buffer_push_rejects_when_full() {
230        let mut buf = PortBuffer::new(1, 0, 2);
231        assert!(buf.push(DataPacket::new(1, 0, 0, vec![])));
232        assert!(buf.push(DataPacket::new(1, 0, 1, vec![])));
233        assert!(!buf.push(DataPacket::new(1, 0, 2, vec![]))); // full
234    }
235
236    #[test]
237    fn buffer_is_full_true_at_capacity() {
238        let mut buf = PortBuffer::new(1, 0, 1);
239        buf.push(DataPacket::new(1, 0, 0, vec![]));
240        assert!(buf.is_full());
241    }
242
243    #[test]
244    fn buffer_len_reflects_queue_depth() {
245        let mut buf = PortBuffer::new(2, 0, 10);
246        for i in 0..5 {
247            buf.push(DataPacket::new(2, 0, i, vec![]));
248        }
249        assert_eq!(buf.len(), 5);
250    }
251
252    #[test]
253    fn buffer_pop_empty_returns_none() {
254        let mut buf = PortBuffer::new(1, 0, 4);
255        assert!(buf.pop().is_none());
256    }
257
258    // ── BackpressurePolicy ────────────────────────────────────────────────────
259
260    #[test]
261    fn drop_policy_should_drop_is_true() {
262        assert!(BackpressurePolicy::Drop.should_drop());
263    }
264
265    #[test]
266    fn block_policy_should_drop_is_false() {
267        assert!(!BackpressurePolicy::Block.should_drop());
268    }
269
270    #[test]
271    fn resize_policy_should_drop_is_false() {
272        assert!(!BackpressurePolicy::Resize.should_drop());
273    }
274
275    // ── DataFlowController ────────────────────────────────────────────────────
276
277    #[test]
278    fn controller_total_packets_sums_all_buffers() {
279        let mut ctrl = DataFlowController::new(BackpressurePolicy::Drop);
280        let mut b1 = PortBuffer::new(1, 0, 8);
281        let mut b2 = PortBuffer::new(2, 0, 8);
282        b1.push(DataPacket::new(1, 0, 0, vec![]));
283        b1.push(DataPacket::new(1, 0, 1, vec![]));
284        b2.push(DataPacket::new(2, 0, 0, vec![]));
285        ctrl.add_buffer(b1);
286        ctrl.add_buffer(b2);
287        assert_eq!(ctrl.total_packets_in_flight(), 3);
288    }
289
290    #[test]
291    fn controller_get_buffer_returns_correct_buffer() {
292        let mut ctrl = DataFlowController::new(BackpressurePolicy::Block);
293        ctrl.add_buffer(PortBuffer::new(5, 2, 4));
294        assert!(ctrl.get_buffer(5, 2).is_some());
295        assert!(ctrl.get_buffer(5, 3).is_none());
296    }
297
298    #[test]
299    fn controller_clear_buffer_empties_queue() {
300        let mut ctrl = DataFlowController::new(BackpressurePolicy::Drop);
301        let mut buf = PortBuffer::new(1, 0, 8);
302        buf.push(DataPacket::new(1, 0, 0, vec![1, 2, 3]));
303        ctrl.add_buffer(buf);
304        assert!(ctrl.clear_buffer(1, 0));
305        assert_eq!(ctrl.total_packets_in_flight(), 0);
306    }
307
308    #[test]
309    fn controller_clear_buffer_false_for_missing() {
310        let mut ctrl = DataFlowController::new(BackpressurePolicy::Drop);
311        assert!(!ctrl.clear_buffer(99, 0));
312    }
313}