Skip to main content

rivet/
frame_queue.rs

1//! Per-rung segment chunk queue connecting the decode pump to N
2//! encoder workers.
3//!
4//! v2 multi-GPU model (2026-05-11): the pump groups decoded source
5//! frames into fixed-size chunks (one chunk = one CMAF segment's
6//! worth of frames = `keyframe_interval` frames) and pushes them
7//! into this queue with a monotonic segment index. Encoder workers
8//! pop chunks and emit segments. The segment index travels with the
9//! frames so each worker knows which output file to write.
10//!
11//! Single-producer, multi-consumer. Bounded capacity for memory
12//! safety: pump blocks when full, workers block when empty.
13
14use std::collections::VecDeque;
15use std::sync::Mutex;
16use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
17
18use codec::frame::VideoFrame;
19use tokio::sync::Notify;
20
21pub struct SegmentChunk {
22    pub segment_idx: usize,
23    pub frames: Vec<VideoFrame>,
24    pub is_final: bool,
25}
26
27pub struct SegmentChunkQueue {
28    inner: Mutex<VecDeque<SegmentChunk>>,
29    capacity: usize,
30    push_notify: Notify,
31    pop_notify: Notify,
32    closed: AtomicBool,
33    pushed_segments: AtomicUsize,
34    popped_segments: AtomicUsize,
35}
36
37impl SegmentChunkQueue {
38    pub fn new(capacity: usize) -> Self {
39        assert!(capacity > 0, "SegmentChunkQueue capacity must be > 0");
40        Self {
41            inner: Mutex::new(VecDeque::with_capacity(capacity)),
42            capacity,
43            push_notify: Notify::new(),
44            pop_notify: Notify::new(),
45            closed: AtomicBool::new(false),
46            pushed_segments: AtomicUsize::new(0),
47            popped_segments: AtomicUsize::new(0),
48        }
49    }
50
51    pub fn capacity(&self) -> usize {
52        self.capacity
53    }
54
55    pub fn is_closed(&self) -> bool {
56        self.closed.load(Ordering::Acquire)
57    }
58
59    pub fn pushed_segments(&self) -> usize {
60        self.pushed_segments.load(Ordering::Relaxed)
61    }
62
63    pub fn popped_segments(&self) -> usize {
64        self.popped_segments.load(Ordering::Relaxed)
65    }
66
67    /// Push one segment chunk. Awaits capacity. Returns `false` if
68    /// closed before the chunk could be enqueued.
69    pub async fn push(&self, chunk: SegmentChunk) -> bool {
70        let mut chunk_slot = Some(chunk);
71        loop {
72            let waker = self.pop_notify.notified();
73            tokio::pin!(waker);
74            waker.as_mut().enable();
75            if self.closed.load(Ordering::Acquire) {
76                return false;
77            }
78            {
79                let mut q = self.inner.lock().unwrap();
80                if q.len() < self.capacity {
81                    q.push_back(chunk_slot.take().unwrap());
82                    self.pushed_segments.fetch_add(1, Ordering::Relaxed);
83                    drop(q);
84                    self.push_notify.notify_one();
85                    return true;
86                }
87            }
88            waker.await;
89        }
90    }
91
92    /// Pop one chunk. Blocks until one is available. Returns `None`
93    /// only once the queue is closed and drained.
94    pub async fn pop(&self) -> Option<SegmentChunk> {
95        loop {
96            let waker = self.push_notify.notified();
97            tokio::pin!(waker);
98            waker.as_mut().enable();
99            {
100                let mut q = self.inner.lock().unwrap();
101                if let Some(chunk) = q.pop_front() {
102                    self.popped_segments.fetch_add(1, Ordering::Relaxed);
103                    drop(q);
104                    self.pop_notify.notify_waiters();
105                    return Some(chunk);
106                }
107                if self.closed.load(Ordering::Acquire) {
108                    return None;
109                }
110            }
111            waker.await;
112        }
113    }
114
115    /// Put a chunk back at the FRONT of the queue. Bypasses capacity
116    /// (a requeued chunk briefly exceeds capacity by 1; the queue
117    /// drains back under capacity at the next pop). Used by encoder
118    /// workers that pop a chunk, observe a cross-vendor codec
119    /// invariant mismatch, and need to hand the chunk off to another
120    /// worker. Decrements `popped_segments` so the dispatcher's
121    /// `pushed > popped` predicate still reflects work-remaining.
122    ///
123    /// Returns `false` if the queue is closed AND no other consumer
124    /// can pick the chunk up — in which case the caller should treat
125    /// this chunk as orphaned and the run will fail coverage.
126    pub fn push_front(&self, chunk: SegmentChunk) -> bool {
127        if self.closed.load(Ordering::Acquire) {
128            // Closed: still re-queue if there are popped-but-not-yet-
129            // -finished consumers that might come back; the queue
130            // drains FIFO, so the chunk lands at head.
131        }
132        let mut q = self.inner.lock().unwrap();
133        q.push_front(chunk);
134        // The caller had popped this chunk before, so `popped_segments`
135        // was incremented; undo that increment so observers see the
136        // queue's true pending count.
137        self.popped_segments.fetch_sub(1, Ordering::Relaxed);
138        drop(q);
139        self.push_notify.notify_one();
140        true
141    }
142
143    /// Mark the queue closed. Pending and future pushes return false;
144    /// pending pops wake and return remaining chunks then None.
145    pub fn close(&self) {
146        self.closed.store(true, Ordering::Release);
147        self.push_notify.notify_waiters();
148        self.pop_notify.notify_waiters();
149    }
150}
151
152#[cfg(test)]
153mod tests {
154    use super::*;
155    use bytes::Bytes;
156    use codec::frame::{ColorSpace, PixelFormat};
157    use std::sync::Arc;
158
159    fn dummy_frame(idx: u64) -> VideoFrame {
160        let mut data = vec![idx as u8; 16 * 16];
161        data.extend(vec![128u8; 8 * 8]);
162        data.extend(vec![128u8; 8 * 8]);
163        VideoFrame::new(
164            Bytes::from(data),
165            16,
166            16,
167            PixelFormat::Yuv420p,
168            ColorSpace::Bt709,
169            idx,
170        )
171    }
172
173    fn chunk(idx: usize, frame_count: usize) -> SegmentChunk {
174        SegmentChunk {
175            segment_idx: idx,
176            frames: (0..frame_count).map(|i| dummy_frame(i as u64)).collect(),
177            is_final: false,
178        }
179    }
180
181    #[tokio::test]
182    async fn push_then_pop_preserves_order() {
183        let q = Arc::new(SegmentChunkQueue::new(4));
184        assert!(q.push(chunk(0, 2)).await);
185        assert!(q.push(chunk(1, 3)).await);
186        q.close();
187        let a = q.pop().await.unwrap();
188        assert_eq!(a.segment_idx, 0);
189        assert_eq!(a.frames.len(), 2);
190        let b = q.pop().await.unwrap();
191        assert_eq!(b.segment_idx, 1);
192        assert_eq!(b.frames.len(), 3);
193        assert!(q.pop().await.is_none());
194    }
195
196    #[tokio::test]
197    async fn pop_blocks_until_pushed() {
198        let q = Arc::new(SegmentChunkQueue::new(4));
199        let q2 = q.clone();
200        let pop_task = tokio::spawn(async move { q2.pop().await });
201        tokio::task::yield_now().await;
202        assert!(q.push(chunk(7, 1)).await);
203        let got = pop_task.await.unwrap().unwrap();
204        assert_eq!(got.segment_idx, 7);
205    }
206
207    #[tokio::test]
208    async fn push_blocks_when_full_resumes_after_pop() {
209        let q = Arc::new(SegmentChunkQueue::new(2));
210        assert!(q.push(chunk(0, 1)).await);
211        assert!(q.push(chunk(1, 1)).await);
212        let q2 = q.clone();
213        let push_task = tokio::spawn(async move { q2.push(chunk(2, 1)).await });
214        tokio::task::yield_now().await;
215        let drained = q.pop().await.unwrap();
216        assert_eq!(drained.segment_idx, 0);
217        assert!(push_task.await.unwrap());
218        q.close();
219        let r1 = q.pop().await.unwrap();
220        assert_eq!(r1.segment_idx, 1);
221        let r2 = q.pop().await.unwrap();
222        assert_eq!(r2.segment_idx, 2);
223        assert!(q.pop().await.is_none());
224    }
225
226    #[tokio::test]
227    async fn close_wakes_pending_pop() {
228        let q = Arc::new(SegmentChunkQueue::new(2));
229        let q2 = q.clone();
230        let pop_task = tokio::spawn(async move { q2.pop().await });
231        tokio::task::yield_now().await;
232        q.close();
233        assert!(pop_task.await.unwrap().is_none());
234    }
235
236    #[tokio::test]
237    async fn multiple_consumers_partition_chunks() {
238        let q = Arc::new(SegmentChunkQueue::new(8));
239        for i in 0..6 {
240            assert!(q.push(chunk(i, 1)).await);
241        }
242        q.close();
243        let mut handles = Vec::new();
244        for _ in 0..3 {
245            let q2 = q.clone();
246            handles.push(tokio::spawn(async move {
247                let mut got = Vec::new();
248                while let Some(c) = q2.pop().await {
249                    got.push(c.segment_idx);
250                }
251                got
252            }));
253        }
254        let mut all: Vec<usize> = Vec::new();
255        for h in handles {
256            all.extend(h.await.unwrap());
257        }
258        all.sort();
259        assert_eq!(all, vec![0, 1, 2, 3, 4, 5]);
260    }
261
262    #[tokio::test]
263    async fn closed_rejects_push() {
264        let q = Arc::new(SegmentChunkQueue::new(2));
265        q.close();
266        assert!(!q.push(chunk(0, 1)).await);
267    }
268
269    #[tokio::test]
270    async fn final_chunk_flag_is_preserved() {
271        let q = Arc::new(SegmentChunkQueue::new(4));
272        let mut last = chunk(9, 2);
273        last.is_final = true;
274        assert!(q.push(last).await);
275        q.close();
276        let got = q.pop().await.unwrap();
277        assert!(got.is_final);
278    }
279}