1use std::collections::VecDeque;
15use std::sync::Mutex;
16use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
17
18use codec::frame::VideoFrame;
19use tokio::sync::Notify;
20
21pub struct SegmentChunk {
22 pub segment_idx: usize,
23 pub frames: Vec<VideoFrame>,
24 pub is_final: bool,
25}
26
27pub struct SegmentChunkQueue {
28 inner: Mutex<VecDeque<SegmentChunk>>,
29 capacity: usize,
30 push_notify: Notify,
31 pop_notify: Notify,
32 closed: AtomicBool,
33 pushed_segments: AtomicUsize,
34 popped_segments: AtomicUsize,
35}
36
37impl SegmentChunkQueue {
38 pub fn new(capacity: usize) -> Self {
39 assert!(capacity > 0, "SegmentChunkQueue capacity must be > 0");
40 Self {
41 inner: Mutex::new(VecDeque::with_capacity(capacity)),
42 capacity,
43 push_notify: Notify::new(),
44 pop_notify: Notify::new(),
45 closed: AtomicBool::new(false),
46 pushed_segments: AtomicUsize::new(0),
47 popped_segments: AtomicUsize::new(0),
48 }
49 }
50
51 pub fn capacity(&self) -> usize {
52 self.capacity
53 }
54
55 pub fn is_closed(&self) -> bool {
56 self.closed.load(Ordering::Acquire)
57 }
58
59 pub fn pushed_segments(&self) -> usize {
60 self.pushed_segments.load(Ordering::Relaxed)
61 }
62
63 pub fn popped_segments(&self) -> usize {
64 self.popped_segments.load(Ordering::Relaxed)
65 }
66
67 pub async fn push(&self, chunk: SegmentChunk) -> bool {
70 let mut chunk_slot = Some(chunk);
71 loop {
72 let waker = self.pop_notify.notified();
73 tokio::pin!(waker);
74 waker.as_mut().enable();
75 if self.closed.load(Ordering::Acquire) {
76 return false;
77 }
78 {
79 let mut q = self.inner.lock().unwrap();
80 if q.len() < self.capacity {
81 q.push_back(chunk_slot.take().unwrap());
82 self.pushed_segments.fetch_add(1, Ordering::Relaxed);
83 drop(q);
84 self.push_notify.notify_one();
85 return true;
86 }
87 }
88 waker.await;
89 }
90 }
91
92 pub async fn pop(&self) -> Option<SegmentChunk> {
95 loop {
96 let waker = self.push_notify.notified();
97 tokio::pin!(waker);
98 waker.as_mut().enable();
99 {
100 let mut q = self.inner.lock().unwrap();
101 if let Some(chunk) = q.pop_front() {
102 self.popped_segments.fetch_add(1, Ordering::Relaxed);
103 drop(q);
104 self.pop_notify.notify_waiters();
105 return Some(chunk);
106 }
107 if self.closed.load(Ordering::Acquire) {
108 return None;
109 }
110 }
111 waker.await;
112 }
113 }
114
115 pub fn push_front(&self, chunk: SegmentChunk) -> bool {
127 if self.closed.load(Ordering::Acquire) {
128 }
132 let mut q = self.inner.lock().unwrap();
133 q.push_front(chunk);
134 self.popped_segments.fetch_sub(1, Ordering::Relaxed);
138 drop(q);
139 self.push_notify.notify_one();
140 true
141 }
142
143 pub fn close(&self) {
146 self.closed.store(true, Ordering::Release);
147 self.push_notify.notify_waiters();
148 self.pop_notify.notify_waiters();
149 }
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use codec::frame::{ColorSpace, PixelFormat};
    use std::sync::Arc;

    /// Length of the leading 16x16 section of a test frame buffer.
    const PLANE_A_LEN: usize = 16 * 16;
    /// Length of each of the two trailing 8x8 sections.
    const PLANE_B_LEN: usize = 8 * 8;

    /// Builds a small frame whose buffer is 256 bytes of `idx as u8` followed
    /// by 128 bytes of `128` (presumably one luma and two chroma planes of a
    /// 16x16 4:2:0 image), tagged with `idx`.
    fn dummy_frame(idx: u64) -> VideoFrame {
        let total = PLANE_A_LEN + 2 * PLANE_B_LEN;
        let mut data = Vec::with_capacity(total);
        data.resize(PLANE_A_LEN, idx as u8);
        data.resize(total, 128u8);
        VideoFrame::new(
            Bytes::from(data),
            16,
            16,
            PixelFormat::Yuv420p,
            ColorSpace::Bt709,
            idx,
        )
    }

    /// Builds a non-final chunk for segment `idx` holding `frame_count`
    /// dummy frames numbered from zero.
    fn chunk(idx: usize, frame_count: usize) -> SegmentChunk {
        let mut frames = Vec::with_capacity(frame_count);
        for n in 0..frame_count {
            frames.push(dummy_frame(n as u64));
        }
        SegmentChunk {
            segment_idx: idx,
            frames,
            is_final: false,
        }
    }

    #[tokio::test]
    async fn push_then_pop_preserves_order() {
        // (segment index, frame count) in the order pushed.
        let expected: [(usize, usize); 2] = [(0, 2), (1, 3)];

        let queue = Arc::new(SegmentChunkQueue::new(4));
        for &(idx, frame_count) in expected.iter() {
            assert!(queue.push(chunk(idx, frame_count)).await);
        }
        queue.close();

        for &(idx, frame_count) in expected.iter() {
            let popped = queue.pop().await.unwrap();
            assert_eq!(popped.segment_idx, idx);
            assert_eq!(popped.frames.len(), frame_count);
        }
        assert!(queue.pop().await.is_none());
    }

    #[tokio::test]
    async fn pop_blocks_until_pushed() {
        let queue = Arc::new(SegmentChunkQueue::new(4));
        let consumer = {
            let queue = Arc::clone(&queue);
            tokio::spawn(async move { queue.pop().await })
        };
        // Give the consumer a chance to start waiting on the empty queue.
        tokio::task::yield_now().await;

        assert!(queue.push(chunk(7, 1)).await);

        let received = consumer.await.unwrap().unwrap();
        assert_eq!(received.segment_idx, 7);
    }

    #[tokio::test]
    async fn push_blocks_when_full_resumes_after_pop() {
        let queue = Arc::new(SegmentChunkQueue::new(2));
        for idx in 0..2 {
            assert!(queue.push(chunk(idx, 1)).await);
        }

        // The queue is full, so this producer has to wait for a pop.
        let producer = {
            let queue = Arc::clone(&queue);
            tokio::spawn(async move { queue.push(chunk(2, 1)).await })
        };
        tokio::task::yield_now().await;

        let first = queue.pop().await.unwrap();
        assert_eq!(first.segment_idx, 0);
        assert!(producer.await.unwrap());

        queue.close();
        for idx in 1..3 {
            let next = queue.pop().await.unwrap();
            assert_eq!(next.segment_idx, idx);
        }
        assert!(queue.pop().await.is_none());
    }

    #[tokio::test]
    async fn close_wakes_pending_pop() {
        let queue = Arc::new(SegmentChunkQueue::new(2));
        let consumer = {
            let queue = Arc::clone(&queue);
            tokio::spawn(async move { queue.pop().await })
        };
        tokio::task::yield_now().await;

        queue.close();

        let outcome = consumer.await.unwrap();
        assert!(outcome.is_none());
    }

    #[tokio::test]
    async fn multiple_consumers_partition_chunks() {
        let queue = Arc::new(SegmentChunkQueue::new(8));
        for idx in 0..6 {
            assert!(queue.push(chunk(idx, 1)).await);
        }
        queue.close();

        // Three consumers drain the closed queue concurrently.
        let consumers: Vec<_> = (0..3)
            .map(|_| {
                let queue = Arc::clone(&queue);
                tokio::spawn(async move {
                    let mut seen = Vec::new();
                    while let Some(item) = queue.pop().await {
                        seen.push(item.segment_idx);
                    }
                    seen
                })
            })
            .collect();

        let mut collected: Vec<usize> = Vec::new();
        for consumer in consumers {
            collected.extend(consumer.await.unwrap());
        }
        collected.sort_unstable();

        // Every chunk was delivered exactly once across all consumers.
        let expected: Vec<usize> = (0..6).collect();
        assert_eq!(collected, expected);
    }

    #[tokio::test]
    async fn closed_rejects_push() {
        let queue = Arc::new(SegmentChunkQueue::new(2));
        queue.close();
        let accepted = queue.push(chunk(0, 1)).await;
        assert!(!accepted);
    }

    #[tokio::test]
    async fn final_chunk_flag_is_preserved() {
        let queue = Arc::new(SegmentChunkQueue::new(4));
        let last = SegmentChunk {
            is_final: true,
            ..chunk(9, 2)
        };
        assert!(queue.push(last).await);
        queue.close();

        let received = queue.pop().await.unwrap();
        assert!(received.is_final);
    }
}