// datasynth_core/streaming/channel.rs
1//! Channel utilities for streaming generation.
2//!
3//! Provides bounded channels with backpressure support for
4//! producer-consumer streaming patterns.
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9use std::time::{Duration, Instant};
10
11use crate::error::{SynthError, SynthResult};
12use crate::traits::{BackpressureStrategy, StreamEvent};
13
/// Statistics for a streaming channel.
///
/// Produced by `stats()` as a point-in-time snapshot; the underlying
/// counters are read individually, so fields may be mutually slightly
/// out of sync under concurrent traffic.
#[derive(Debug, Clone, Default)]
pub struct ChannelStats {
    /// Total items sent through the channel.
    pub items_sent: u64,
    /// Total items received from the channel.
    pub items_received: u64,
    /// Items dropped due to backpressure.
    pub items_dropped: u64,
    /// Current buffer size.
    pub buffer_size: usize,
    /// Maximum buffer size reached (high-water mark).
    pub max_buffer_size: usize,
    /// Times sender blocked waiting for space.
    pub send_blocks: u64,
    /// Times receiver blocked waiting for items.
    pub receive_blocks: u64,
}
32
/// A bounded channel with configurable backpressure handling.
///
/// Cloning a `BoundedChannel` yields another handle to the same shared
/// buffer and statistics (the state lives behind an `Arc`), so the
/// channel supports multiple producers and consumers.
pub struct BoundedChannel<T> {
    /// Shared internal state (buffer, condvars, stats).
    inner: Arc<ChannelInner<T>>,
    /// Channel capacity (soft limit; `Buffer` strategy may exceed it).
    capacity: usize,
    /// Backpressure strategy applied when the buffer is full.
    strategy: BackpressureStrategy,
}
42
/// Shared state behind the `Arc` in [`BoundedChannel`].
struct ChannelInner<T> {
    /// The buffer of items, guarded by a mutex.
    buffer: Mutex<VecDeque<T>>,
    /// Condition variable for waiting senders (signaled on item removal).
    not_full: Condvar,
    /// Condition variable for waiting receivers (signaled on item insertion).
    not_empty: Condvar,
    /// Whether the channel is closed.
    closed: AtomicBool,
    /// Total items successfully pushed into the buffer.
    items_sent: AtomicU64,
    /// Total items popped by receivers.
    items_received: AtomicU64,
    /// Items discarded by the drop strategies.
    items_dropped: AtomicU64,
    /// Number of waits performed by blocked senders.
    send_blocks: AtomicU64,
    /// Number of waits performed by blocked receivers.
    receive_blocks: AtomicU64,
    /// High-water mark of the buffer length.
    max_buffer_size: AtomicU64,
}
60
61impl<T> BoundedChannel<T> {
62    /// Creates a new bounded channel with the given capacity and backpressure strategy.
63    pub fn new(capacity: usize, strategy: BackpressureStrategy) -> Self {
64        Self {
65            inner: Arc::new(ChannelInner {
66                buffer: Mutex::new(VecDeque::with_capacity(capacity)),
67                not_full: Condvar::new(),
68                not_empty: Condvar::new(),
69                closed: AtomicBool::new(false),
70                items_sent: AtomicU64::new(0),
71                items_received: AtomicU64::new(0),
72                items_dropped: AtomicU64::new(0),
73                send_blocks: AtomicU64::new(0),
74                receive_blocks: AtomicU64::new(0),
75                max_buffer_size: AtomicU64::new(0),
76            }),
77            capacity,
78            strategy,
79        }
80    }
81
82    /// Sends an item through the channel.
83    ///
84    /// Returns `Ok(true)` if the item was sent, `Ok(false)` if it was dropped,
85    /// or `Err` if the channel is closed.
86    pub fn send(&self, item: T) -> SynthResult<bool> {
87        if self.inner.closed.load(Ordering::SeqCst) {
88            return Err(SynthError::ChannelClosed);
89        }
90
91        let mut buffer = self
92            .inner
93            .buffer
94            .lock()
95            .unwrap_or_else(|poisoned| poisoned.into_inner());
96
97        // Check if buffer is full
98        if buffer.len() >= self.capacity {
99            match self.strategy {
100                BackpressureStrategy::Block => {
101                    self.inner.send_blocks.fetch_add(1, Ordering::Relaxed);
102                    // Wait until space is available
103                    buffer = self
104                        .inner
105                        .not_full
106                        .wait_while(buffer, |b| {
107                            b.len() >= self.capacity && !self.inner.closed.load(Ordering::SeqCst)
108                        })
109                        .unwrap_or_else(|poisoned| poisoned.into_inner());
110
111                    if self.inner.closed.load(Ordering::SeqCst) {
112                        return Err(SynthError::ChannelClosed);
113                    }
114                }
115                BackpressureStrategy::DropOldest => {
116                    // Drop oldest item to make room
117                    buffer.pop_front();
118                    self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
119                }
120                BackpressureStrategy::DropNewest => {
121                    // Don't add the new item
122                    self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
123                    return Ok(false);
124                }
125                BackpressureStrategy::Buffer { max_overflow } => {
126                    // Allow overflow up to max_overflow
127                    if buffer.len() >= self.capacity + max_overflow {
128                        self.inner.send_blocks.fetch_add(1, Ordering::Relaxed);
129                        buffer = self
130                            .inner
131                            .not_full
132                            .wait_while(buffer, |b| {
133                                b.len() >= self.capacity + max_overflow
134                                    && !self.inner.closed.load(Ordering::SeqCst)
135                            })
136                            .unwrap_or_else(|poisoned| poisoned.into_inner());
137
138                        if self.inner.closed.load(Ordering::SeqCst) {
139                            return Err(SynthError::ChannelClosed);
140                        }
141                    }
142                }
143            }
144        }
145
146        buffer.push_back(item);
147        let current_size = buffer.len() as u64;
148        self.inner.items_sent.fetch_add(1, Ordering::Relaxed);
149
150        // Update max buffer size
151        let mut max_size = self.inner.max_buffer_size.load(Ordering::Relaxed);
152        while current_size > max_size {
153            match self.inner.max_buffer_size.compare_exchange_weak(
154                max_size,
155                current_size,
156                Ordering::SeqCst,
157                Ordering::Relaxed,
158            ) {
159                Ok(_) => break,
160                Err(x) => max_size = x,
161            }
162        }
163
164        drop(buffer);
165        self.inner.not_empty.notify_one();
166
167        Ok(true)
168    }
169
170    /// Sends an item with a timeout.
171    pub fn send_timeout(&self, item: T, timeout: Duration) -> SynthResult<bool> {
172        if self.inner.closed.load(Ordering::SeqCst) {
173            return Err(SynthError::ChannelClosed);
174        }
175
176        let deadline = Instant::now() + timeout;
177        let mut buffer = self
178            .inner
179            .buffer
180            .lock()
181            .unwrap_or_else(|poisoned| poisoned.into_inner());
182
183        // Check if buffer is full
184        while buffer.len() >= self.capacity {
185            if self.inner.closed.load(Ordering::SeqCst) {
186                return Err(SynthError::ChannelClosed);
187            }
188
189            let remaining = deadline.saturating_duration_since(Instant::now());
190            if remaining.is_zero() {
191                // Timeout - apply strategy
192                match self.strategy {
193                    BackpressureStrategy::DropNewest => {
194                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
195                        return Ok(false);
196                    }
197                    BackpressureStrategy::DropOldest => {
198                        buffer.pop_front();
199                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
200                        break;
201                    }
202                    _ => {
203                        return Err(SynthError::GenerationError("send timeout".to_string()));
204                    }
205                }
206            }
207
208            let (new_buffer, wait_result) = self
209                .inner
210                .not_full
211                .wait_timeout(buffer, remaining)
212                .unwrap_or_else(|poisoned| poisoned.into_inner());
213            buffer = new_buffer;
214
215            if wait_result.timed_out() && buffer.len() >= self.capacity {
216                match self.strategy {
217                    BackpressureStrategy::DropNewest => {
218                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
219                        return Ok(false);
220                    }
221                    BackpressureStrategy::DropOldest => {
222                        buffer.pop_front();
223                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
224                        break;
225                    }
226                    _ => {
227                        return Err(SynthError::GenerationError("send timeout".to_string()));
228                    }
229                }
230            }
231        }
232
233        buffer.push_back(item);
234        self.inner.items_sent.fetch_add(1, Ordering::Relaxed);
235        drop(buffer);
236        self.inner.not_empty.notify_one();
237
238        Ok(true)
239    }
240
241    /// Receives an item from the channel.
242    ///
243    /// Returns `None` if the channel is closed and empty.
244    pub fn recv(&self) -> Option<T> {
245        let mut buffer = self
246            .inner
247            .buffer
248            .lock()
249            .unwrap_or_else(|poisoned| poisoned.into_inner());
250
251        while buffer.is_empty() {
252            if self.inner.closed.load(Ordering::SeqCst) {
253                return None;
254            }
255            self.inner.receive_blocks.fetch_add(1, Ordering::Relaxed);
256            buffer = self
257                .inner
258                .not_empty
259                .wait(buffer)
260                .unwrap_or_else(|poisoned| poisoned.into_inner());
261        }
262
263        let item = buffer.pop_front();
264        if item.is_some() {
265            self.inner.items_received.fetch_add(1, Ordering::Relaxed);
266        }
267        drop(buffer);
268        self.inner.not_full.notify_one();
269
270        item
271    }
272
273    /// Receives an item with a timeout.
274    pub fn recv_timeout(&self, timeout: Duration) -> Option<T> {
275        let deadline = Instant::now() + timeout;
276        let mut buffer = self
277            .inner
278            .buffer
279            .lock()
280            .unwrap_or_else(|poisoned| poisoned.into_inner());
281
282        while buffer.is_empty() {
283            if self.inner.closed.load(Ordering::SeqCst) {
284                return None;
285            }
286
287            let remaining = deadline.saturating_duration_since(Instant::now());
288            if remaining.is_zero() {
289                return None;
290            }
291
292            let (new_buffer, wait_result) = self
293                .inner
294                .not_empty
295                .wait_timeout(buffer, remaining)
296                .unwrap_or_else(|poisoned| poisoned.into_inner());
297            buffer = new_buffer;
298
299            if wait_result.timed_out() && buffer.is_empty() {
300                return None;
301            }
302        }
303
304        let item = buffer.pop_front();
305        if item.is_some() {
306            self.inner.items_received.fetch_add(1, Ordering::Relaxed);
307        }
308        drop(buffer);
309        self.inner.not_full.notify_one();
310
311        item
312    }
313
314    /// Tries to receive an item without blocking.
315    pub fn try_recv(&self) -> Option<T> {
316        let mut buffer = self
317            .inner
318            .buffer
319            .lock()
320            .unwrap_or_else(|poisoned| poisoned.into_inner());
321        let item = buffer.pop_front();
322        if item.is_some() {
323            self.inner.items_received.fetch_add(1, Ordering::Relaxed);
324            drop(buffer);
325            self.inner.not_full.notify_one();
326        }
327        item
328    }
329
330    /// Closes the channel.
331    pub fn close(&self) {
332        self.inner.closed.store(true, Ordering::SeqCst);
333        self.inner.not_full.notify_all();
334        self.inner.not_empty.notify_all();
335    }
336
337    /// Returns whether the channel is closed.
338    pub fn is_closed(&self) -> bool {
339        self.inner.closed.load(Ordering::SeqCst)
340    }
341
342    /// Returns the current number of items in the buffer.
343    pub fn len(&self) -> usize {
344        self.inner
345            .buffer
346            .lock()
347            .unwrap_or_else(|poisoned| poisoned.into_inner())
348            .len()
349    }
350
351    /// Returns whether the buffer is empty.
352    pub fn is_empty(&self) -> bool {
353        self.len() == 0
354    }
355
356    /// Returns the channel capacity.
357    pub fn capacity(&self) -> usize {
358        self.capacity
359    }
360
361    /// Returns channel statistics.
362    pub fn stats(&self) -> ChannelStats {
363        ChannelStats {
364            items_sent: self.inner.items_sent.load(Ordering::Relaxed),
365            items_received: self.inner.items_received.load(Ordering::Relaxed),
366            items_dropped: self.inner.items_dropped.load(Ordering::Relaxed),
367            buffer_size: self.len(),
368            max_buffer_size: self.inner.max_buffer_size.load(Ordering::Relaxed) as usize,
369            send_blocks: self.inner.send_blocks.load(Ordering::Relaxed),
370            receive_blocks: self.inner.receive_blocks.load(Ordering::Relaxed),
371        }
372    }
373}
374
375impl<T> Clone for BoundedChannel<T> {
376    fn clone(&self) -> Self {
377        Self {
378            inner: Arc::clone(&self.inner),
379            capacity: self.capacity,
380            strategy: self.strategy,
381        }
382    }
383}
384
385/// Creates a stream event channel pair.
386pub fn stream_channel<T>(
387    capacity: usize,
388    strategy: BackpressureStrategy,
389) -> (StreamSender<T>, StreamReceiver<T>) {
390    let channel = BoundedChannel::new(capacity, strategy);
391    (
392        StreamSender {
393            channel: channel.clone(),
394        },
395        StreamReceiver { channel },
396    )
397}
398
/// Sender side of a stream event channel.
///
/// Created by [`stream_channel`]; cloneable for multiple producers.
pub struct StreamSender<T> {
    /// Shared bounded channel carrying the stream events.
    channel: BoundedChannel<StreamEvent<T>>,
}
403
404impl<T> StreamSender<T> {
405    /// Sends a stream event.
406    pub fn send(&self, event: StreamEvent<T>) -> SynthResult<bool> {
407        self.channel.send(event)
408    }
409
410    /// Sends a data item.
411    pub fn send_data(&self, item: T) -> SynthResult<bool> {
412        self.channel.send(StreamEvent::Data(item))
413    }
414
415    /// Closes the sender.
416    pub fn close(&self) {
417        self.channel.close();
418    }
419
420    /// Returns channel statistics.
421    pub fn stats(&self) -> ChannelStats {
422        self.channel.stats()
423    }
424}
425
426impl<T> Clone for StreamSender<T> {
427    fn clone(&self) -> Self {
428        Self {
429            channel: self.channel.clone(),
430        }
431    }
432}
433
/// Receiver side of a stream event channel.
///
/// Created by [`stream_channel`]; also usable as an [`Iterator`] that
/// yields events until the channel is closed and drained.
pub struct StreamReceiver<T> {
    /// Shared bounded channel carrying the stream events.
    channel: BoundedChannel<StreamEvent<T>>,
}
438
439impl<T> StreamReceiver<T> {
440    /// Receives the next stream event.
441    pub fn recv(&self) -> Option<StreamEvent<T>> {
442        self.channel.recv()
443    }
444
445    /// Receives with timeout.
446    pub fn recv_timeout(&self, timeout: Duration) -> Option<StreamEvent<T>> {
447        self.channel.recv_timeout(timeout)
448    }
449
450    /// Tries to receive without blocking.
451    pub fn try_recv(&self) -> Option<StreamEvent<T>> {
452        self.channel.try_recv()
453    }
454
455    /// Returns whether the channel is closed.
456    pub fn is_closed(&self) -> bool {
457        self.channel.is_closed()
458    }
459
460    /// Returns channel statistics.
461    pub fn stats(&self) -> ChannelStats {
462        self.channel.stats()
463    }
464}
465
466impl<T> Iterator for StreamReceiver<T> {
467    type Item = StreamEvent<T>;
468
469    fn next(&mut self) -> Option<Self::Item> {
470        self.recv()
471    }
472}
473
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use std::thread;

    // FIFO ordering through a channel with spare capacity.
    #[test]
    fn test_bounded_channel_basic() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);

        channel.send(1).unwrap();
        channel.send(2).unwrap();
        channel.send(3).unwrap();

        assert_eq!(channel.recv(), Some(1));
        assert_eq!(channel.recv(), Some(2));
        assert_eq!(channel.recv(), Some(3));
    }

    // DropOldest evicts the head when the buffer is full.
    #[test]
    fn test_bounded_channel_drop_oldest() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(2, BackpressureStrategy::DropOldest);

        channel.send(1).unwrap();
        channel.send(2).unwrap();
        channel.send(3).unwrap(); // Should drop 1

        let stats = channel.stats();
        assert_eq!(stats.items_dropped, 1);
        assert_eq!(channel.recv(), Some(2));
        assert_eq!(channel.recv(), Some(3));
    }

    // DropNewest rejects the incoming item when the buffer is full.
    #[test]
    fn test_bounded_channel_drop_newest() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(2, BackpressureStrategy::DropNewest);

        channel.send(1).unwrap();
        channel.send(2).unwrap();
        let sent = channel.send(3).unwrap(); // Should be dropped

        assert!(!sent);
        let stats = channel.stats();
        assert_eq!(stats.items_dropped, 1);
    }

    // A closed channel drains buffered items, then returns None; sends error.
    #[test]
    fn test_bounded_channel_close() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);

        channel.send(1).unwrap();
        channel.close();

        assert_eq!(channel.recv(), Some(1));
        assert_eq!(channel.recv(), None);
        assert!(channel.send(2).is_err());
    }

    // Producer thread with blocking backpressure; consumer sees all items
    // in order once the sender closes the channel.
    #[test]
    fn test_bounded_channel_threaded() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
        let sender = channel.clone();

        let handle = thread::spawn(move || {
            for i in 0..100 {
                sender.send(i).unwrap();
            }
            sender.close();
        });

        let mut received = Vec::new();
        while let Some(item) = channel.recv() {
            received.push(item);
        }

        handle.join().unwrap();

        assert_eq!(received, (0..100).collect::<Vec<_>>());
    }

    // The StreamSender/StreamReceiver pair wraps items in StreamEvent::Data,
    // and the receiver's Iterator impl terminates after close().
    #[test]
    fn test_stream_channel() {
        let (sender, receiver) = stream_channel::<i32>(10, BackpressureStrategy::Block);

        sender.send_data(1).unwrap();
        sender.send_data(2).unwrap();
        sender.close();

        let events: Vec<_> = receiver.collect();
        assert_eq!(events.len(), 2);

        assert!(matches!(events[0], StreamEvent::Data(1)));
        assert!(matches!(events[1], StreamEvent::Data(2)));
    }

    // Counters reflect sends/receives; buffer_size is the live length.
    #[test]
    fn test_channel_stats() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);

        channel.send(1).unwrap();
        channel.send(2).unwrap();
        channel.recv();

        let stats = channel.stats();
        assert_eq!(stats.items_sent, 2);
        assert_eq!(stats.items_received, 1);
        assert_eq!(stats.buffer_size, 1);
    }

    // The channel must keep working after a panicking thread poisons the
    // buffer mutex (all lock sites use PoisonError::into_inner).
    #[test]
    fn test_channel_recovers_from_poisoned_mutex() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
        let poisoner = channel.clone();

        // Spawn a thread that acquires the lock and then panics, poisoning the mutex
        let handle = thread::spawn(move || {
            let _guard = poisoner
                .inner
                .buffer
                .lock()
                .unwrap_or_else(|p| p.into_inner());
            panic!("intentional panic to poison mutex");
        });

        // Wait for the panicking thread to finish
        let _ = handle.join();

        // The mutex is now poisoned — verify that send/recv still work
        assert!(channel.send(42).is_ok());
        assert_eq!(channel.recv(), Some(42));

        // Also verify try_recv and stats work
        assert_eq!(channel.try_recv(), None);
        let stats = channel.stats();
        assert_eq!(stats.items_sent, 1);
        assert_eq!(stats.items_received, 1);
    }
}