Skip to main content

datasynth_core/streaming/
channel.rs

1//! Channel utilities for streaming generation.
2//!
3//! Provides bounded channels with backpressure support for
4//! producer-consumer streaming patterns.
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9use std::time::{Duration, Instant};
10
11use crate::error::{SynthError, SynthResult};
12use crate::traits::{BackpressureStrategy, StreamEvent};
13
14/// Statistics for a streaming channel.
15#[derive(Debug, Clone, Default)]
16pub struct ChannelStats {
17    /// Total items sent through the channel.
18    pub items_sent: u64,
19    /// Total items received from the channel.
20    pub items_received: u64,
21    /// Items dropped due to backpressure.
22    pub items_dropped: u64,
23    /// Current buffer size.
24    pub buffer_size: usize,
25    /// Maximum buffer size reached.
26    pub max_buffer_size: usize,
27    /// Times sender blocked waiting for space.
28    pub send_blocks: u64,
29    /// Times receiver blocked waiting for items.
30    pub receive_blocks: u64,
31}
32
33/// A bounded channel with configurable backpressure handling.
34pub struct BoundedChannel<T> {
35    /// Internal state protected by mutex.
36    inner: Arc<ChannelInner<T>>,
37    /// Channel capacity.
38    capacity: usize,
39    /// Backpressure strategy.
40    strategy: BackpressureStrategy,
41}
42
43struct ChannelInner<T> {
44    /// The buffer of items.
45    buffer: Mutex<VecDeque<T>>,
46    /// Condition variable for waiting senders.
47    not_full: Condvar,
48    /// Condition variable for waiting receivers.
49    not_empty: Condvar,
50    /// Whether the channel is closed.
51    closed: AtomicBool,
52    /// Statistics.
53    items_sent: AtomicU64,
54    items_received: AtomicU64,
55    items_dropped: AtomicU64,
56    send_blocks: AtomicU64,
57    receive_blocks: AtomicU64,
58    max_buffer_size: AtomicU64,
59}
60
61impl<T> BoundedChannel<T> {
62    /// Creates a new bounded channel with the given capacity and backpressure strategy.
63    pub fn new(capacity: usize, strategy: BackpressureStrategy) -> Self {
64        Self {
65            inner: Arc::new(ChannelInner {
66                buffer: Mutex::new(VecDeque::with_capacity(capacity)),
67                not_full: Condvar::new(),
68                not_empty: Condvar::new(),
69                closed: AtomicBool::new(false),
70                items_sent: AtomicU64::new(0),
71                items_received: AtomicU64::new(0),
72                items_dropped: AtomicU64::new(0),
73                send_blocks: AtomicU64::new(0),
74                receive_blocks: AtomicU64::new(0),
75                max_buffer_size: AtomicU64::new(0),
76            }),
77            capacity,
78            strategy,
79        }
80    }
81
82    /// Sends an item through the channel.
83    ///
84    /// Returns `Ok(true)` if the item was sent, `Ok(false)` if it was dropped,
85    /// or `Err` if the channel is closed.
86    pub fn send(&self, item: T) -> SynthResult<bool> {
87        if self.inner.closed.load(Ordering::SeqCst) {
88            return Err(SynthError::ChannelClosed);
89        }
90
91        let mut buffer = self.inner.buffer.lock().expect("mutex poisoned");
92
93        // Check if buffer is full
94        if buffer.len() >= self.capacity {
95            match self.strategy {
96                BackpressureStrategy::Block => {
97                    self.inner.send_blocks.fetch_add(1, Ordering::Relaxed);
98                    // Wait until space is available
99                    buffer = self
100                        .inner
101                        .not_full
102                        .wait_while(buffer, |b| {
103                            b.len() >= self.capacity && !self.inner.closed.load(Ordering::SeqCst)
104                        })
105                        .expect("condvar wait");
106
107                    if self.inner.closed.load(Ordering::SeqCst) {
108                        return Err(SynthError::ChannelClosed);
109                    }
110                }
111                BackpressureStrategy::DropOldest => {
112                    // Drop oldest item to make room
113                    buffer.pop_front();
114                    self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
115                }
116                BackpressureStrategy::DropNewest => {
117                    // Don't add the new item
118                    self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
119                    return Ok(false);
120                }
121                BackpressureStrategy::Buffer { max_overflow } => {
122                    // Allow overflow up to max_overflow
123                    if buffer.len() >= self.capacity + max_overflow {
124                        self.inner.send_blocks.fetch_add(1, Ordering::Relaxed);
125                        buffer = self
126                            .inner
127                            .not_full
128                            .wait_while(buffer, |b| {
129                                b.len() >= self.capacity + max_overflow
130                                    && !self.inner.closed.load(Ordering::SeqCst)
131                            })
132                            .expect("condvar wait");
133
134                        if self.inner.closed.load(Ordering::SeqCst) {
135                            return Err(SynthError::ChannelClosed);
136                        }
137                    }
138                }
139            }
140        }
141
142        buffer.push_back(item);
143        let current_size = buffer.len() as u64;
144        self.inner.items_sent.fetch_add(1, Ordering::Relaxed);
145
146        // Update max buffer size
147        let mut max_size = self.inner.max_buffer_size.load(Ordering::Relaxed);
148        while current_size > max_size {
149            match self.inner.max_buffer_size.compare_exchange_weak(
150                max_size,
151                current_size,
152                Ordering::SeqCst,
153                Ordering::Relaxed,
154            ) {
155                Ok(_) => break,
156                Err(x) => max_size = x,
157            }
158        }
159
160        drop(buffer);
161        self.inner.not_empty.notify_one();
162
163        Ok(true)
164    }
165
166    /// Sends an item with a timeout.
167    pub fn send_timeout(&self, item: T, timeout: Duration) -> SynthResult<bool> {
168        if self.inner.closed.load(Ordering::SeqCst) {
169            return Err(SynthError::ChannelClosed);
170        }
171
172        let deadline = Instant::now() + timeout;
173        let mut buffer = self.inner.buffer.lock().expect("mutex poisoned");
174
175        // Check if buffer is full
176        while buffer.len() >= self.capacity {
177            if self.inner.closed.load(Ordering::SeqCst) {
178                return Err(SynthError::ChannelClosed);
179            }
180
181            let remaining = deadline.saturating_duration_since(Instant::now());
182            if remaining.is_zero() {
183                // Timeout - apply strategy
184                match self.strategy {
185                    BackpressureStrategy::DropNewest => {
186                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
187                        return Ok(false);
188                    }
189                    BackpressureStrategy::DropOldest => {
190                        buffer.pop_front();
191                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
192                        break;
193                    }
194                    _ => {
195                        return Err(SynthError::GenerationError("send timeout".to_string()));
196                    }
197                }
198            }
199
200            let (new_buffer, wait_result) = self
201                .inner
202                .not_full
203                .wait_timeout(buffer, remaining)
204                .expect("condvar wait");
205            buffer = new_buffer;
206
207            if wait_result.timed_out() && buffer.len() >= self.capacity {
208                match self.strategy {
209                    BackpressureStrategy::DropNewest => {
210                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
211                        return Ok(false);
212                    }
213                    BackpressureStrategy::DropOldest => {
214                        buffer.pop_front();
215                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
216                        break;
217                    }
218                    _ => {
219                        return Err(SynthError::GenerationError("send timeout".to_string()));
220                    }
221                }
222            }
223        }
224
225        buffer.push_back(item);
226        self.inner.items_sent.fetch_add(1, Ordering::Relaxed);
227        drop(buffer);
228        self.inner.not_empty.notify_one();
229
230        Ok(true)
231    }
232
233    /// Receives an item from the channel.
234    ///
235    /// Returns `None` if the channel is closed and empty.
236    pub fn recv(&self) -> Option<T> {
237        let mut buffer = self.inner.buffer.lock().expect("mutex poisoned");
238
239        while buffer.is_empty() {
240            if self.inner.closed.load(Ordering::SeqCst) {
241                return None;
242            }
243            self.inner.receive_blocks.fetch_add(1, Ordering::Relaxed);
244            buffer = self.inner.not_empty.wait(buffer).expect("condvar wait");
245        }
246
247        let item = buffer.pop_front();
248        if item.is_some() {
249            self.inner.items_received.fetch_add(1, Ordering::Relaxed);
250        }
251        drop(buffer);
252        self.inner.not_full.notify_one();
253
254        item
255    }
256
257    /// Receives an item with a timeout.
258    pub fn recv_timeout(&self, timeout: Duration) -> Option<T> {
259        let deadline = Instant::now() + timeout;
260        let mut buffer = self.inner.buffer.lock().expect("mutex poisoned");
261
262        while buffer.is_empty() {
263            if self.inner.closed.load(Ordering::SeqCst) {
264                return None;
265            }
266
267            let remaining = deadline.saturating_duration_since(Instant::now());
268            if remaining.is_zero() {
269                return None;
270            }
271
272            let (new_buffer, wait_result) = self
273                .inner
274                .not_empty
275                .wait_timeout(buffer, remaining)
276                .expect("condvar wait");
277            buffer = new_buffer;
278
279            if wait_result.timed_out() && buffer.is_empty() {
280                return None;
281            }
282        }
283
284        let item = buffer.pop_front();
285        if item.is_some() {
286            self.inner.items_received.fetch_add(1, Ordering::Relaxed);
287        }
288        drop(buffer);
289        self.inner.not_full.notify_one();
290
291        item
292    }
293
294    /// Tries to receive an item without blocking.
295    pub fn try_recv(&self) -> Option<T> {
296        let mut buffer = self.inner.buffer.lock().expect("mutex poisoned");
297        let item = buffer.pop_front();
298        if item.is_some() {
299            self.inner.items_received.fetch_add(1, Ordering::Relaxed);
300            drop(buffer);
301            self.inner.not_full.notify_one();
302        }
303        item
304    }
305
306    /// Closes the channel.
307    pub fn close(&self) {
308        self.inner.closed.store(true, Ordering::SeqCst);
309        self.inner.not_full.notify_all();
310        self.inner.not_empty.notify_all();
311    }
312
313    /// Returns whether the channel is closed.
314    pub fn is_closed(&self) -> bool {
315        self.inner.closed.load(Ordering::SeqCst)
316    }
317
318    /// Returns the current number of items in the buffer.
319    pub fn len(&self) -> usize {
320        self.inner.buffer.lock().expect("mutex poisoned").len()
321    }
322
323    /// Returns whether the buffer is empty.
324    pub fn is_empty(&self) -> bool {
325        self.len() == 0
326    }
327
328    /// Returns the channel capacity.
329    pub fn capacity(&self) -> usize {
330        self.capacity
331    }
332
333    /// Returns channel statistics.
334    pub fn stats(&self) -> ChannelStats {
335        ChannelStats {
336            items_sent: self.inner.items_sent.load(Ordering::Relaxed),
337            items_received: self.inner.items_received.load(Ordering::Relaxed),
338            items_dropped: self.inner.items_dropped.load(Ordering::Relaxed),
339            buffer_size: self.len(),
340            max_buffer_size: self.inner.max_buffer_size.load(Ordering::Relaxed) as usize,
341            send_blocks: self.inner.send_blocks.load(Ordering::Relaxed),
342            receive_blocks: self.inner.receive_blocks.load(Ordering::Relaxed),
343        }
344    }
345}
346
347impl<T> Clone for BoundedChannel<T> {
348    fn clone(&self) -> Self {
349        Self {
350            inner: Arc::clone(&self.inner),
351            capacity: self.capacity,
352            strategy: self.strategy,
353        }
354    }
355}
356
357/// Creates a stream event channel pair.
358pub fn stream_channel<T>(
359    capacity: usize,
360    strategy: BackpressureStrategy,
361) -> (StreamSender<T>, StreamReceiver<T>) {
362    let channel = BoundedChannel::new(capacity, strategy);
363    (
364        StreamSender {
365            channel: channel.clone(),
366        },
367        StreamReceiver { channel },
368    )
369}
370
371/// Sender side of a stream event channel.
372pub struct StreamSender<T> {
373    channel: BoundedChannel<StreamEvent<T>>,
374}
375
376impl<T> StreamSender<T> {
377    /// Sends a stream event.
378    pub fn send(&self, event: StreamEvent<T>) -> SynthResult<bool> {
379        self.channel.send(event)
380    }
381
382    /// Sends a data item.
383    pub fn send_data(&self, item: T) -> SynthResult<bool> {
384        self.channel.send(StreamEvent::Data(item))
385    }
386
387    /// Closes the sender.
388    pub fn close(&self) {
389        self.channel.close();
390    }
391
392    /// Returns channel statistics.
393    pub fn stats(&self) -> ChannelStats {
394        self.channel.stats()
395    }
396}
397
398impl<T> Clone for StreamSender<T> {
399    fn clone(&self) -> Self {
400        Self {
401            channel: self.channel.clone(),
402        }
403    }
404}
405
406/// Receiver side of a stream event channel.
407pub struct StreamReceiver<T> {
408    channel: BoundedChannel<StreamEvent<T>>,
409}
410
411impl<T> StreamReceiver<T> {
412    /// Receives the next stream event.
413    pub fn recv(&self) -> Option<StreamEvent<T>> {
414        self.channel.recv()
415    }
416
417    /// Receives with timeout.
418    pub fn recv_timeout(&self, timeout: Duration) -> Option<StreamEvent<T>> {
419        self.channel.recv_timeout(timeout)
420    }
421
422    /// Tries to receive without blocking.
423    pub fn try_recv(&self) -> Option<StreamEvent<T>> {
424        self.channel.try_recv()
425    }
426
427    /// Returns whether the channel is closed.
428    pub fn is_closed(&self) -> bool {
429        self.channel.is_closed()
430    }
431
432    /// Returns channel statistics.
433    pub fn stats(&self) -> ChannelStats {
434        self.channel.stats()
435    }
436}
437
438impl<T> Iterator for StreamReceiver<T> {
439    type Item = StreamEvent<T>;
440
441    fn next(&mut self) -> Option<Self::Item> {
442        self.recv()
443    }
444}
445
446#[cfg(test)]
447#[allow(clippy::unwrap_used)]
448mod tests {
449    use super::*;
450    use std::thread;
451
452    #[test]
453    fn test_bounded_channel_basic() {
454        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
455
456        channel.send(1).unwrap();
457        channel.send(2).unwrap();
458        channel.send(3).unwrap();
459
460        assert_eq!(channel.recv(), Some(1));
461        assert_eq!(channel.recv(), Some(2));
462        assert_eq!(channel.recv(), Some(3));
463    }
464
465    #[test]
466    fn test_bounded_channel_drop_oldest() {
467        let channel: BoundedChannel<i32> = BoundedChannel::new(2, BackpressureStrategy::DropOldest);
468
469        channel.send(1).unwrap();
470        channel.send(2).unwrap();
471        channel.send(3).unwrap(); // Should drop 1
472
473        let stats = channel.stats();
474        assert_eq!(stats.items_dropped, 1);
475        assert_eq!(channel.recv(), Some(2));
476        assert_eq!(channel.recv(), Some(3));
477    }
478
479    #[test]
480    fn test_bounded_channel_drop_newest() {
481        let channel: BoundedChannel<i32> = BoundedChannel::new(2, BackpressureStrategy::DropNewest);
482
483        channel.send(1).unwrap();
484        channel.send(2).unwrap();
485        let sent = channel.send(3).unwrap(); // Should be dropped
486
487        assert!(!sent);
488        let stats = channel.stats();
489        assert_eq!(stats.items_dropped, 1);
490    }
491
492    #[test]
493    fn test_bounded_channel_close() {
494        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
495
496        channel.send(1).unwrap();
497        channel.close();
498
499        assert_eq!(channel.recv(), Some(1));
500        assert_eq!(channel.recv(), None);
501        assert!(channel.send(2).is_err());
502    }
503
504    #[test]
505    fn test_bounded_channel_threaded() {
506        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
507        let sender = channel.clone();
508
509        let handle = thread::spawn(move || {
510            for i in 0..100 {
511                sender.send(i).unwrap();
512            }
513            sender.close();
514        });
515
516        let mut received = Vec::new();
517        while let Some(item) = channel.recv() {
518            received.push(item);
519        }
520
521        handle.join().unwrap();
522
523        assert_eq!(received, (0..100).collect::<Vec<_>>());
524    }
525
526    #[test]
527    fn test_stream_channel() {
528        let (sender, receiver) = stream_channel::<i32>(10, BackpressureStrategy::Block);
529
530        sender.send_data(1).unwrap();
531        sender.send_data(2).unwrap();
532        sender.close();
533
534        let events: Vec<_> = receiver.collect();
535        assert_eq!(events.len(), 2);
536
537        assert!(matches!(events[0], StreamEvent::Data(1)));
538        assert!(matches!(events[1], StreamEvent::Data(2)));
539    }
540
541    #[test]
542    fn test_channel_stats() {
543        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
544
545        channel.send(1).unwrap();
546        channel.send(2).unwrap();
547        channel.recv();
548
549        let stats = channel.stats();
550        assert_eq!(stats.items_sent, 2);
551        assert_eq!(stats.items_received, 1);
552        assert_eq!(stats.buffer_size, 1);
553    }
554}