Skip to main content

datasynth_core/streaming/
channel.rs

1//! Channel utilities for streaming generation.
2//!
3//! Provides bounded channels with backpressure support for
4//! producer-consumer streaming patterns.
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9use std::time::{Duration, Instant};
10
11use crate::error::{SynthError, SynthResult};
12use crate::traits::{BackpressureStrategy, StreamEvent};
13
14/// Statistics for a streaming channel.
15#[derive(Debug, Clone, Default)]
16pub struct ChannelStats {
17    /// Total items sent through the channel.
18    pub items_sent: u64,
19    /// Total items received from the channel.
20    pub items_received: u64,
21    /// Items dropped due to backpressure.
22    pub items_dropped: u64,
23    /// Current buffer size.
24    pub buffer_size: usize,
25    /// Maximum buffer size reached.
26    pub max_buffer_size: usize,
27    /// Times sender blocked waiting for space.
28    pub send_blocks: u64,
29    /// Times receiver blocked waiting for items.
30    pub receive_blocks: u64,
31}
32
33/// A bounded channel with configurable backpressure handling.
34pub struct BoundedChannel<T> {
35    /// Internal state protected by mutex.
36    inner: Arc<ChannelInner<T>>,
37    /// Channel capacity.
38    capacity: usize,
39    /// Backpressure strategy.
40    strategy: BackpressureStrategy,
41}
42
43struct ChannelInner<T> {
44    /// The buffer of items.
45    buffer: Mutex<VecDeque<T>>,
46    /// Condition variable for waiting senders.
47    not_full: Condvar,
48    /// Condition variable for waiting receivers.
49    not_empty: Condvar,
50    /// Whether the channel is closed.
51    closed: AtomicBool,
52    /// Statistics.
53    items_sent: AtomicU64,
54    items_received: AtomicU64,
55    items_dropped: AtomicU64,
56    send_blocks: AtomicU64,
57    receive_blocks: AtomicU64,
58    max_buffer_size: AtomicU64,
59}
60
61impl<T> BoundedChannel<T> {
62    /// Creates a new bounded channel with the given capacity and backpressure strategy.
63    pub fn new(capacity: usize, strategy: BackpressureStrategy) -> Self {
64        Self {
65            inner: Arc::new(ChannelInner {
66                buffer: Mutex::new(VecDeque::with_capacity(capacity)),
67                not_full: Condvar::new(),
68                not_empty: Condvar::new(),
69                closed: AtomicBool::new(false),
70                items_sent: AtomicU64::new(0),
71                items_received: AtomicU64::new(0),
72                items_dropped: AtomicU64::new(0),
73                send_blocks: AtomicU64::new(0),
74                receive_blocks: AtomicU64::new(0),
75                max_buffer_size: AtomicU64::new(0),
76            }),
77            capacity,
78            strategy,
79        }
80    }
81
82    /// Sends an item through the channel.
83    ///
84    /// Returns `Ok(true)` if the item was sent, `Ok(false)` if it was dropped,
85    /// or `Err` if the channel is closed.
86    pub fn send(&self, item: T) -> SynthResult<bool> {
87        if self.inner.closed.load(Ordering::SeqCst) {
88            return Err(SynthError::ChannelClosed);
89        }
90
91        let mut buffer = self.inner.buffer.lock().unwrap();
92
93        // Check if buffer is full
94        if buffer.len() >= self.capacity {
95            match self.strategy {
96                BackpressureStrategy::Block => {
97                    self.inner.send_blocks.fetch_add(1, Ordering::Relaxed);
98                    // Wait until space is available
99                    buffer = self
100                        .inner
101                        .not_full
102                        .wait_while(buffer, |b| {
103                            b.len() >= self.capacity && !self.inner.closed.load(Ordering::SeqCst)
104                        })
105                        .unwrap();
106
107                    if self.inner.closed.load(Ordering::SeqCst) {
108                        return Err(SynthError::ChannelClosed);
109                    }
110                }
111                BackpressureStrategy::DropOldest => {
112                    // Drop oldest item to make room
113                    buffer.pop_front();
114                    self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
115                }
116                BackpressureStrategy::DropNewest => {
117                    // Don't add the new item
118                    self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
119                    return Ok(false);
120                }
121                BackpressureStrategy::Buffer { max_overflow } => {
122                    // Allow overflow up to max_overflow
123                    if buffer.len() >= self.capacity + max_overflow {
124                        self.inner.send_blocks.fetch_add(1, Ordering::Relaxed);
125                        buffer = self
126                            .inner
127                            .not_full
128                            .wait_while(buffer, |b| {
129                                b.len() >= self.capacity + max_overflow
130                                    && !self.inner.closed.load(Ordering::SeqCst)
131                            })
132                            .unwrap();
133
134                        if self.inner.closed.load(Ordering::SeqCst) {
135                            return Err(SynthError::ChannelClosed);
136                        }
137                    }
138                }
139            }
140        }
141
142        buffer.push_back(item);
143        let current_size = buffer.len() as u64;
144        self.inner.items_sent.fetch_add(1, Ordering::Relaxed);
145
146        // Update max buffer size
147        let mut max_size = self.inner.max_buffer_size.load(Ordering::Relaxed);
148        while current_size > max_size {
149            match self.inner.max_buffer_size.compare_exchange_weak(
150                max_size,
151                current_size,
152                Ordering::SeqCst,
153                Ordering::Relaxed,
154            ) {
155                Ok(_) => break,
156                Err(x) => max_size = x,
157            }
158        }
159
160        drop(buffer);
161        self.inner.not_empty.notify_one();
162
163        Ok(true)
164    }
165
166    /// Sends an item with a timeout.
167    pub fn send_timeout(&self, item: T, timeout: Duration) -> SynthResult<bool> {
168        if self.inner.closed.load(Ordering::SeqCst) {
169            return Err(SynthError::ChannelClosed);
170        }
171
172        let deadline = Instant::now() + timeout;
173        let mut buffer = self.inner.buffer.lock().unwrap();
174
175        // Check if buffer is full
176        while buffer.len() >= self.capacity {
177            if self.inner.closed.load(Ordering::SeqCst) {
178                return Err(SynthError::ChannelClosed);
179            }
180
181            let remaining = deadline.saturating_duration_since(Instant::now());
182            if remaining.is_zero() {
183                // Timeout - apply strategy
184                match self.strategy {
185                    BackpressureStrategy::DropNewest => {
186                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
187                        return Ok(false);
188                    }
189                    BackpressureStrategy::DropOldest => {
190                        buffer.pop_front();
191                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
192                        break;
193                    }
194                    _ => {
195                        return Err(SynthError::GenerationError("send timeout".to_string()));
196                    }
197                }
198            }
199
200            let (new_buffer, wait_result) =
201                self.inner.not_full.wait_timeout(buffer, remaining).unwrap();
202            buffer = new_buffer;
203
204            if wait_result.timed_out() && buffer.len() >= self.capacity {
205                match self.strategy {
206                    BackpressureStrategy::DropNewest => {
207                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
208                        return Ok(false);
209                    }
210                    BackpressureStrategy::DropOldest => {
211                        buffer.pop_front();
212                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
213                        break;
214                    }
215                    _ => {
216                        return Err(SynthError::GenerationError("send timeout".to_string()));
217                    }
218                }
219            }
220        }
221
222        buffer.push_back(item);
223        self.inner.items_sent.fetch_add(1, Ordering::Relaxed);
224        drop(buffer);
225        self.inner.not_empty.notify_one();
226
227        Ok(true)
228    }
229
230    /// Receives an item from the channel.
231    ///
232    /// Returns `None` if the channel is closed and empty.
233    pub fn recv(&self) -> Option<T> {
234        let mut buffer = self.inner.buffer.lock().unwrap();
235
236        while buffer.is_empty() {
237            if self.inner.closed.load(Ordering::SeqCst) {
238                return None;
239            }
240            self.inner.receive_blocks.fetch_add(1, Ordering::Relaxed);
241            buffer = self.inner.not_empty.wait(buffer).unwrap();
242        }
243
244        let item = buffer.pop_front();
245        if item.is_some() {
246            self.inner.items_received.fetch_add(1, Ordering::Relaxed);
247        }
248        drop(buffer);
249        self.inner.not_full.notify_one();
250
251        item
252    }
253
254    /// Receives an item with a timeout.
255    pub fn recv_timeout(&self, timeout: Duration) -> Option<T> {
256        let deadline = Instant::now() + timeout;
257        let mut buffer = self.inner.buffer.lock().unwrap();
258
259        while buffer.is_empty() {
260            if self.inner.closed.load(Ordering::SeqCst) {
261                return None;
262            }
263
264            let remaining = deadline.saturating_duration_since(Instant::now());
265            if remaining.is_zero() {
266                return None;
267            }
268
269            let (new_buffer, wait_result) = self
270                .inner
271                .not_empty
272                .wait_timeout(buffer, remaining)
273                .unwrap();
274            buffer = new_buffer;
275
276            if wait_result.timed_out() && buffer.is_empty() {
277                return None;
278            }
279        }
280
281        let item = buffer.pop_front();
282        if item.is_some() {
283            self.inner.items_received.fetch_add(1, Ordering::Relaxed);
284        }
285        drop(buffer);
286        self.inner.not_full.notify_one();
287
288        item
289    }
290
291    /// Tries to receive an item without blocking.
292    pub fn try_recv(&self) -> Option<T> {
293        let mut buffer = self.inner.buffer.lock().unwrap();
294        let item = buffer.pop_front();
295        if item.is_some() {
296            self.inner.items_received.fetch_add(1, Ordering::Relaxed);
297            drop(buffer);
298            self.inner.not_full.notify_one();
299        }
300        item
301    }
302
303    /// Closes the channel.
304    pub fn close(&self) {
305        self.inner.closed.store(true, Ordering::SeqCst);
306        self.inner.not_full.notify_all();
307        self.inner.not_empty.notify_all();
308    }
309
310    /// Returns whether the channel is closed.
311    pub fn is_closed(&self) -> bool {
312        self.inner.closed.load(Ordering::SeqCst)
313    }
314
315    /// Returns the current number of items in the buffer.
316    pub fn len(&self) -> usize {
317        self.inner.buffer.lock().unwrap().len()
318    }
319
320    /// Returns whether the buffer is empty.
321    pub fn is_empty(&self) -> bool {
322        self.len() == 0
323    }
324
325    /// Returns the channel capacity.
326    pub fn capacity(&self) -> usize {
327        self.capacity
328    }
329
330    /// Returns channel statistics.
331    pub fn stats(&self) -> ChannelStats {
332        ChannelStats {
333            items_sent: self.inner.items_sent.load(Ordering::Relaxed),
334            items_received: self.inner.items_received.load(Ordering::Relaxed),
335            items_dropped: self.inner.items_dropped.load(Ordering::Relaxed),
336            buffer_size: self.len(),
337            max_buffer_size: self.inner.max_buffer_size.load(Ordering::Relaxed) as usize,
338            send_blocks: self.inner.send_blocks.load(Ordering::Relaxed),
339            receive_blocks: self.inner.receive_blocks.load(Ordering::Relaxed),
340        }
341    }
342}
343
344impl<T> Clone for BoundedChannel<T> {
345    fn clone(&self) -> Self {
346        Self {
347            inner: Arc::clone(&self.inner),
348            capacity: self.capacity,
349            strategy: self.strategy,
350        }
351    }
352}
353
354/// Creates a stream event channel pair.
355pub fn stream_channel<T>(
356    capacity: usize,
357    strategy: BackpressureStrategy,
358) -> (StreamSender<T>, StreamReceiver<T>) {
359    let channel = BoundedChannel::new(capacity, strategy);
360    (
361        StreamSender {
362            channel: channel.clone(),
363        },
364        StreamReceiver { channel },
365    )
366}
367
368/// Sender side of a stream event channel.
369pub struct StreamSender<T> {
370    channel: BoundedChannel<StreamEvent<T>>,
371}
372
373impl<T> StreamSender<T> {
374    /// Sends a stream event.
375    pub fn send(&self, event: StreamEvent<T>) -> SynthResult<bool> {
376        self.channel.send(event)
377    }
378
379    /// Sends a data item.
380    pub fn send_data(&self, item: T) -> SynthResult<bool> {
381        self.channel.send(StreamEvent::Data(item))
382    }
383
384    /// Closes the sender.
385    pub fn close(&self) {
386        self.channel.close();
387    }
388
389    /// Returns channel statistics.
390    pub fn stats(&self) -> ChannelStats {
391        self.channel.stats()
392    }
393}
394
395impl<T> Clone for StreamSender<T> {
396    fn clone(&self) -> Self {
397        Self {
398            channel: self.channel.clone(),
399        }
400    }
401}
402
403/// Receiver side of a stream event channel.
404pub struct StreamReceiver<T> {
405    channel: BoundedChannel<StreamEvent<T>>,
406}
407
408impl<T> StreamReceiver<T> {
409    /// Receives the next stream event.
410    pub fn recv(&self) -> Option<StreamEvent<T>> {
411        self.channel.recv()
412    }
413
414    /// Receives with timeout.
415    pub fn recv_timeout(&self, timeout: Duration) -> Option<StreamEvent<T>> {
416        self.channel.recv_timeout(timeout)
417    }
418
419    /// Tries to receive without blocking.
420    pub fn try_recv(&self) -> Option<StreamEvent<T>> {
421        self.channel.try_recv()
422    }
423
424    /// Returns whether the channel is closed.
425    pub fn is_closed(&self) -> bool {
426        self.channel.is_closed()
427    }
428
429    /// Returns channel statistics.
430    pub fn stats(&self) -> ChannelStats {
431        self.channel.stats()
432    }
433}
434
435impl<T> Iterator for StreamReceiver<T> {
436    type Item = StreamEvent<T>;
437
438    fn next(&mut self) -> Option<Self::Item> {
439        self.recv()
440    }
441}
442
443#[cfg(test)]
444mod tests {
445    use super::*;
446    use std::thread;
447
448    #[test]
449    fn test_bounded_channel_basic() {
450        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
451
452        channel.send(1).unwrap();
453        channel.send(2).unwrap();
454        channel.send(3).unwrap();
455
456        assert_eq!(channel.recv(), Some(1));
457        assert_eq!(channel.recv(), Some(2));
458        assert_eq!(channel.recv(), Some(3));
459    }
460
461    #[test]
462    fn test_bounded_channel_drop_oldest() {
463        let channel: BoundedChannel<i32> = BoundedChannel::new(2, BackpressureStrategy::DropOldest);
464
465        channel.send(1).unwrap();
466        channel.send(2).unwrap();
467        channel.send(3).unwrap(); // Should drop 1
468
469        let stats = channel.stats();
470        assert_eq!(stats.items_dropped, 1);
471        assert_eq!(channel.recv(), Some(2));
472        assert_eq!(channel.recv(), Some(3));
473    }
474
475    #[test]
476    fn test_bounded_channel_drop_newest() {
477        let channel: BoundedChannel<i32> = BoundedChannel::new(2, BackpressureStrategy::DropNewest);
478
479        channel.send(1).unwrap();
480        channel.send(2).unwrap();
481        let sent = channel.send(3).unwrap(); // Should be dropped
482
483        assert!(!sent);
484        let stats = channel.stats();
485        assert_eq!(stats.items_dropped, 1);
486    }
487
488    #[test]
489    fn test_bounded_channel_close() {
490        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
491
492        channel.send(1).unwrap();
493        channel.close();
494
495        assert_eq!(channel.recv(), Some(1));
496        assert_eq!(channel.recv(), None);
497        assert!(channel.send(2).is_err());
498    }
499
500    #[test]
501    fn test_bounded_channel_threaded() {
502        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
503        let sender = channel.clone();
504
505        let handle = thread::spawn(move || {
506            for i in 0..100 {
507                sender.send(i).unwrap();
508            }
509            sender.close();
510        });
511
512        let mut received = Vec::new();
513        while let Some(item) = channel.recv() {
514            received.push(item);
515        }
516
517        handle.join().unwrap();
518
519        assert_eq!(received, (0..100).collect::<Vec<_>>());
520    }
521
522    #[test]
523    fn test_stream_channel() {
524        let (sender, receiver) = stream_channel::<i32>(10, BackpressureStrategy::Block);
525
526        sender.send_data(1).unwrap();
527        sender.send_data(2).unwrap();
528        sender.close();
529
530        let events: Vec<_> = receiver.collect();
531        assert_eq!(events.len(), 2);
532
533        assert!(matches!(events[0], StreamEvent::Data(1)));
534        assert!(matches!(events[1], StreamEvent::Data(2)));
535    }
536
537    #[test]
538    fn test_channel_stats() {
539        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
540
541        channel.send(1).unwrap();
542        channel.send(2).unwrap();
543        channel.recv();
544
545        let stats = channel.stats();
546        assert_eq!(stats.items_sent, 2);
547        assert_eq!(stats.items_received, 1);
548        assert_eq!(stats.buffer_size, 1);
549    }
550}