datasynth_core/streaming/
channel.rs1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9use std::time::{Duration, Instant};
10
11use crate::error::{SynthError, SynthResult};
12use crate::traits::{BackpressureStrategy, StreamEvent};
13
/// Point-in-time snapshot of a channel's activity counters.
///
/// All fields are plain integers, so snapshots can be copied around and
/// compared with `==` / `assert_eq!`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ChannelStats {
    /// Number of items that were placed into the buffer.
    pub items_sent: u64,
    /// Number of items that were taken out of the buffer by a receiver.
    pub items_received: u64,
    /// Number of items discarded by a dropping backpressure strategy.
    pub items_dropped: u64,
    /// Number of items sitting in the buffer when the snapshot was taken.
    pub buffer_size: usize,
    /// Highest buffer length recorded so far (high-water mark).
    pub max_buffer_size: usize,
    /// Number of times a sender had to wait for room in the buffer.
    pub send_blocks: u64,
    /// Number of times a receiver had to wait for an item to arrive.
    pub receive_blocks: u64,
}
32
33pub struct BoundedChannel<T> {
35 inner: Arc<ChannelInner<T>>,
37 capacity: usize,
39 strategy: BackpressureStrategy,
41}
42
/// State shared by every handle of one channel.
struct ChannelInner<T> {
    /// Queued items, oldest at the front.
    buffer: Mutex<VecDeque<T>>,
    /// Signalled after an item is removed, i.e. when room may be available.
    not_full: Condvar,
    /// Signalled after an item is added, i.e. when data may be available.
    not_empty: Condvar,
    /// Set once the channel has been closed.
    closed: AtomicBool,
    /// Count of items pushed into `buffer`.
    items_sent: AtomicU64,
    /// Count of items popped from `buffer` by receivers.
    items_received: AtomicU64,
    /// Count of items discarded by a dropping strategy.
    items_dropped: AtomicU64,
    /// Count of sends that had to wait for room.
    send_blocks: AtomicU64,
    /// Count of waits performed by receivers.
    receive_blocks: AtomicU64,
    /// Largest `buffer` length recorded so far.
    max_buffer_size: AtomicU64,
}
60
61impl<T> BoundedChannel<T> {
62 pub fn new(capacity: usize, strategy: BackpressureStrategy) -> Self {
64 Self {
65 inner: Arc::new(ChannelInner {
66 buffer: Mutex::new(VecDeque::with_capacity(capacity)),
67 not_full: Condvar::new(),
68 not_empty: Condvar::new(),
69 closed: AtomicBool::new(false),
70 items_sent: AtomicU64::new(0),
71 items_received: AtomicU64::new(0),
72 items_dropped: AtomicU64::new(0),
73 send_blocks: AtomicU64::new(0),
74 receive_blocks: AtomicU64::new(0),
75 max_buffer_size: AtomicU64::new(0),
76 }),
77 capacity,
78 strategy,
79 }
80 }
81
82 pub fn send(&self, item: T) -> SynthResult<bool> {
87 if self.inner.closed.load(Ordering::SeqCst) {
88 return Err(SynthError::ChannelClosed);
89 }
90
91 let mut buffer = self
92 .inner
93 .buffer
94 .lock()
95 .expect("BoundedChannel mutex poisoned: a thread panicked while holding the lock");
96
97 if buffer.len() >= self.capacity {
99 match self.strategy {
100 BackpressureStrategy::Block => {
101 self.inner.send_blocks.fetch_add(1, Ordering::Relaxed);
102 buffer = self
104 .inner
105 .not_full
106 .wait_while(buffer, |b| {
107 b.len() >= self.capacity && !self.inner.closed.load(Ordering::SeqCst)
108 })
109 .expect("BoundedChannel condvar wait: mutex was poisoned");
110
111 if self.inner.closed.load(Ordering::SeqCst) {
112 return Err(SynthError::ChannelClosed);
113 }
114 }
115 BackpressureStrategy::DropOldest => {
116 buffer.pop_front();
118 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
119 }
120 BackpressureStrategy::DropNewest => {
121 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
123 return Ok(false);
124 }
125 BackpressureStrategy::Buffer { max_overflow } => {
126 if buffer.len() >= self.capacity + max_overflow {
128 self.inner.send_blocks.fetch_add(1, Ordering::Relaxed);
129 buffer = self
130 .inner
131 .not_full
132 .wait_while(buffer, |b| {
133 b.len() >= self.capacity + max_overflow
134 && !self.inner.closed.load(Ordering::SeqCst)
135 })
136 .expect("BoundedChannel condvar wait: mutex was poisoned");
137
138 if self.inner.closed.load(Ordering::SeqCst) {
139 return Err(SynthError::ChannelClosed);
140 }
141 }
142 }
143 }
144 }
145
146 buffer.push_back(item);
147 let current_size = buffer.len() as u64;
148 self.inner.items_sent.fetch_add(1, Ordering::Relaxed);
149
150 let mut max_size = self.inner.max_buffer_size.load(Ordering::Relaxed);
152 while current_size > max_size {
153 match self.inner.max_buffer_size.compare_exchange_weak(
154 max_size,
155 current_size,
156 Ordering::SeqCst,
157 Ordering::Relaxed,
158 ) {
159 Ok(_) => break,
160 Err(x) => max_size = x,
161 }
162 }
163
164 drop(buffer);
165 self.inner.not_empty.notify_one();
166
167 Ok(true)
168 }
169
170 pub fn send_timeout(&self, item: T, timeout: Duration) -> SynthResult<bool> {
172 if self.inner.closed.load(Ordering::SeqCst) {
173 return Err(SynthError::ChannelClosed);
174 }
175
176 let deadline = Instant::now() + timeout;
177 let mut buffer = self
178 .inner
179 .buffer
180 .lock()
181 .expect("BoundedChannel mutex poisoned: a thread panicked while holding the lock");
182
183 while buffer.len() >= self.capacity {
185 if self.inner.closed.load(Ordering::SeqCst) {
186 return Err(SynthError::ChannelClosed);
187 }
188
189 let remaining = deadline.saturating_duration_since(Instant::now());
190 if remaining.is_zero() {
191 match self.strategy {
193 BackpressureStrategy::DropNewest => {
194 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
195 return Ok(false);
196 }
197 BackpressureStrategy::DropOldest => {
198 buffer.pop_front();
199 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
200 break;
201 }
202 _ => {
203 return Err(SynthError::GenerationError("send timeout".to_string()));
204 }
205 }
206 }
207
208 let (new_buffer, wait_result) = self
209 .inner
210 .not_full
211 .wait_timeout(buffer, remaining)
212 .expect("BoundedChannel condvar wait: mutex was poisoned");
213 buffer = new_buffer;
214
215 if wait_result.timed_out() && buffer.len() >= self.capacity {
216 match self.strategy {
217 BackpressureStrategy::DropNewest => {
218 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
219 return Ok(false);
220 }
221 BackpressureStrategy::DropOldest => {
222 buffer.pop_front();
223 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
224 break;
225 }
226 _ => {
227 return Err(SynthError::GenerationError("send timeout".to_string()));
228 }
229 }
230 }
231 }
232
233 buffer.push_back(item);
234 self.inner.items_sent.fetch_add(1, Ordering::Relaxed);
235 drop(buffer);
236 self.inner.not_empty.notify_one();
237
238 Ok(true)
239 }
240
241 pub fn recv(&self) -> Option<T> {
245 let mut buffer = self
246 .inner
247 .buffer
248 .lock()
249 .expect("BoundedChannel mutex poisoned: a thread panicked while holding the lock");
250
251 while buffer.is_empty() {
252 if self.inner.closed.load(Ordering::SeqCst) {
253 return None;
254 }
255 self.inner.receive_blocks.fetch_add(1, Ordering::Relaxed);
256 buffer = self
257 .inner
258 .not_empty
259 .wait(buffer)
260 .expect("BoundedChannel condvar wait: mutex was poisoned");
261 }
262
263 let item = buffer.pop_front();
264 if item.is_some() {
265 self.inner.items_received.fetch_add(1, Ordering::Relaxed);
266 }
267 drop(buffer);
268 self.inner.not_full.notify_one();
269
270 item
271 }
272
273 pub fn recv_timeout(&self, timeout: Duration) -> Option<T> {
275 let deadline = Instant::now() + timeout;
276 let mut buffer = self
277 .inner
278 .buffer
279 .lock()
280 .expect("BoundedChannel mutex poisoned: a thread panicked while holding the lock");
281
282 while buffer.is_empty() {
283 if self.inner.closed.load(Ordering::SeqCst) {
284 return None;
285 }
286
287 let remaining = deadline.saturating_duration_since(Instant::now());
288 if remaining.is_zero() {
289 return None;
290 }
291
292 let (new_buffer, wait_result) = self
293 .inner
294 .not_empty
295 .wait_timeout(buffer, remaining)
296 .expect("BoundedChannel condvar wait: mutex was poisoned");
297 buffer = new_buffer;
298
299 if wait_result.timed_out() && buffer.is_empty() {
300 return None;
301 }
302 }
303
304 let item = buffer.pop_front();
305 if item.is_some() {
306 self.inner.items_received.fetch_add(1, Ordering::Relaxed);
307 }
308 drop(buffer);
309 self.inner.not_full.notify_one();
310
311 item
312 }
313
314 pub fn try_recv(&self) -> Option<T> {
316 let mut buffer = self
317 .inner
318 .buffer
319 .lock()
320 .expect("BoundedChannel mutex poisoned: a thread panicked while holding the lock");
321 let item = buffer.pop_front();
322 if item.is_some() {
323 self.inner.items_received.fetch_add(1, Ordering::Relaxed);
324 drop(buffer);
325 self.inner.not_full.notify_one();
326 }
327 item
328 }
329
330 pub fn close(&self) {
332 self.inner.closed.store(true, Ordering::SeqCst);
333 self.inner.not_full.notify_all();
334 self.inner.not_empty.notify_all();
335 }
336
337 pub fn is_closed(&self) -> bool {
339 self.inner.closed.load(Ordering::SeqCst)
340 }
341
342 pub fn len(&self) -> usize {
344 self.inner
345 .buffer
346 .lock()
347 .expect("BoundedChannel mutex poisoned: a thread panicked while holding the lock")
348 .len()
349 }
350
351 pub fn is_empty(&self) -> bool {
353 self.len() == 0
354 }
355
356 pub fn capacity(&self) -> usize {
358 self.capacity
359 }
360
361 pub fn stats(&self) -> ChannelStats {
363 ChannelStats {
364 items_sent: self.inner.items_sent.load(Ordering::Relaxed),
365 items_received: self.inner.items_received.load(Ordering::Relaxed),
366 items_dropped: self.inner.items_dropped.load(Ordering::Relaxed),
367 buffer_size: self.len(),
368 max_buffer_size: self.inner.max_buffer_size.load(Ordering::Relaxed) as usize,
369 send_blocks: self.inner.send_blocks.load(Ordering::Relaxed),
370 receive_blocks: self.inner.receive_blocks.load(Ordering::Relaxed),
371 }
372 }
373}
374
375impl<T> Clone for BoundedChannel<T> {
376 fn clone(&self) -> Self {
377 Self {
378 inner: Arc::clone(&self.inner),
379 capacity: self.capacity,
380 strategy: self.strategy,
381 }
382 }
383}
384
385pub fn stream_channel<T>(
387 capacity: usize,
388 strategy: BackpressureStrategy,
389) -> (StreamSender<T>, StreamReceiver<T>) {
390 let channel = BoundedChannel::new(capacity, strategy);
391 (
392 StreamSender {
393 channel: channel.clone(),
394 },
395 StreamReceiver { channel },
396 )
397}
398
399pub struct StreamSender<T> {
401 channel: BoundedChannel<StreamEvent<T>>,
402}
403
404impl<T> StreamSender<T> {
405 pub fn send(&self, event: StreamEvent<T>) -> SynthResult<bool> {
407 self.channel.send(event)
408 }
409
410 pub fn send_data(&self, item: T) -> SynthResult<bool> {
412 self.channel.send(StreamEvent::Data(item))
413 }
414
415 pub fn close(&self) {
417 self.channel.close();
418 }
419
420 pub fn stats(&self) -> ChannelStats {
422 self.channel.stats()
423 }
424}
425
426impl<T> Clone for StreamSender<T> {
427 fn clone(&self) -> Self {
428 Self {
429 channel: self.channel.clone(),
430 }
431 }
432}
433
434pub struct StreamReceiver<T> {
436 channel: BoundedChannel<StreamEvent<T>>,
437}
438
439impl<T> StreamReceiver<T> {
440 pub fn recv(&self) -> Option<StreamEvent<T>> {
442 self.channel.recv()
443 }
444
445 pub fn recv_timeout(&self, timeout: Duration) -> Option<StreamEvent<T>> {
447 self.channel.recv_timeout(timeout)
448 }
449
450 pub fn try_recv(&self) -> Option<StreamEvent<T>> {
452 self.channel.try_recv()
453 }
454
455 pub fn is_closed(&self) -> bool {
457 self.channel.is_closed()
458 }
459
460 pub fn stats(&self) -> ChannelStats {
462 self.channel.stats()
463 }
464}
465
466impl<T> Iterator for StreamReceiver<T> {
467 type Item = StreamEvent<T>;
468
469 fn next(&mut self) -> Option<Self::Item> {
470 self.recv()
471 }
472}
473
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use std::thread;

    /// Items are received in the order they were sent.
    #[test]
    fn test_bounded_channel_basic() {
        let queue: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);

        for value in 1..=3 {
            queue.send(value).unwrap();
        }

        for expected in 1..=3 {
            assert_eq!(queue.recv(), Some(expected));
        }
    }

    /// A full `DropOldest` channel evicts the front item to admit a new one.
    #[test]
    fn test_bounded_channel_drop_oldest() {
        let queue: BoundedChannel<i32> = BoundedChannel::new(2, BackpressureStrategy::DropOldest);

        for value in 1..=3 {
            queue.send(value).unwrap();
        }

        assert_eq!(queue.stats().items_dropped, 1);
        assert_eq!(queue.recv(), Some(2));
        assert_eq!(queue.recv(), Some(3));
    }

    /// A full `DropNewest` channel rejects the incoming item.
    #[test]
    fn test_bounded_channel_drop_newest() {
        let queue: BoundedChannel<i32> = BoundedChannel::new(2, BackpressureStrategy::DropNewest);

        queue.send(1).unwrap();
        queue.send(2).unwrap();
        let accepted = queue.send(3).unwrap();

        assert!(!accepted);
        assert_eq!(queue.stats().items_dropped, 1);
    }

    /// After closing, queued items drain, then `recv` yields `None` and
    /// `send` fails.
    #[test]
    fn test_bounded_channel_close() {
        let queue: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);

        queue.send(1).unwrap();
        queue.close();

        assert_eq!(queue.recv(), Some(1));
        assert_eq!(queue.recv(), None);
        assert!(queue.send(2).is_err());
    }

    /// A producer thread and a consumer exchange 100 items in order.
    #[test]
    fn test_bounded_channel_threaded() {
        let consumer: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
        let producer = consumer.clone();

        let worker = thread::spawn(move || {
            (0..100).for_each(|n| {
                producer.send(n).unwrap();
            });
            producer.close();
        });

        let collected: Vec<i32> = std::iter::from_fn(|| consumer.recv()).collect();

        worker.join().unwrap();

        let expected: Vec<i32> = (0..100).collect();
        assert_eq!(collected, expected);
    }

    /// The sender/receiver pair delivers data events and the receiver's
    /// iterator stops after close.
    #[test]
    fn test_stream_channel() {
        let (tx, rx) = stream_channel::<i32>(10, BackpressureStrategy::Block);

        tx.send_data(1).unwrap();
        tx.send_data(2).unwrap();
        tx.close();

        let mut events = Vec::new();
        for event in rx {
            events.push(event);
        }
        assert_eq!(events.len(), 2);

        assert!(matches!(events[0], StreamEvent::Data(1)));
        assert!(matches!(events[1], StreamEvent::Data(2)));
    }

    /// Counters reflect two sends followed by one receive.
    #[test]
    fn test_channel_stats() {
        let queue: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);

        queue.send(1).unwrap();
        queue.send(2).unwrap();
        queue.recv();

        let snapshot = queue.stats();
        assert_eq!(snapshot.items_sent, 2);
        assert_eq!(snapshot.items_received, 1);
        assert_eq!(snapshot.buffer_size, 1);
    }
}