//! Bounded channel primitives for streaming synthesis
//! (`datasynth_core/streaming/channel.rs`).

use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};

use crate::error::{SynthError, SynthResult};
use crate::traits::{BackpressureStrategy, StreamEvent};
14#[derive(Debug, Clone, Default)]
16pub struct ChannelStats {
17 pub items_sent: u64,
19 pub items_received: u64,
21 pub items_dropped: u64,
23 pub buffer_size: usize,
25 pub max_buffer_size: usize,
27 pub send_blocks: u64,
29 pub receive_blocks: u64,
31}
32
33pub struct BoundedChannel<T> {
35 inner: Arc<ChannelInner<T>>,
37 capacity: usize,
39 strategy: BackpressureStrategy,
41}
42
43struct ChannelInner<T> {
44 buffer: Mutex<VecDeque<T>>,
46 not_full: Condvar,
48 not_empty: Condvar,
50 closed: AtomicBool,
52 items_sent: AtomicU64,
54 items_received: AtomicU64,
55 items_dropped: AtomicU64,
56 send_blocks: AtomicU64,
57 receive_blocks: AtomicU64,
58 max_buffer_size: AtomicU64,
59}
60
61impl<T> BoundedChannel<T> {
62 pub fn new(capacity: usize, strategy: BackpressureStrategy) -> Self {
64 Self {
65 inner: Arc::new(ChannelInner {
66 buffer: Mutex::new(VecDeque::with_capacity(capacity)),
67 not_full: Condvar::new(),
68 not_empty: Condvar::new(),
69 closed: AtomicBool::new(false),
70 items_sent: AtomicU64::new(0),
71 items_received: AtomicU64::new(0),
72 items_dropped: AtomicU64::new(0),
73 send_blocks: AtomicU64::new(0),
74 receive_blocks: AtomicU64::new(0),
75 max_buffer_size: AtomicU64::new(0),
76 }),
77 capacity,
78 strategy,
79 }
80 }
81
82 pub fn send(&self, item: T) -> SynthResult<bool> {
87 if self.inner.closed.load(Ordering::SeqCst) {
88 return Err(SynthError::ChannelClosed);
89 }
90
91 let mut buffer = self.inner.buffer.lock().expect("mutex poisoned");
92
93 if buffer.len() >= self.capacity {
95 match self.strategy {
96 BackpressureStrategy::Block => {
97 self.inner.send_blocks.fetch_add(1, Ordering::Relaxed);
98 buffer = self
100 .inner
101 .not_full
102 .wait_while(buffer, |b| {
103 b.len() >= self.capacity && !self.inner.closed.load(Ordering::SeqCst)
104 })
105 .expect("condvar wait");
106
107 if self.inner.closed.load(Ordering::SeqCst) {
108 return Err(SynthError::ChannelClosed);
109 }
110 }
111 BackpressureStrategy::DropOldest => {
112 buffer.pop_front();
114 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
115 }
116 BackpressureStrategy::DropNewest => {
117 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
119 return Ok(false);
120 }
121 BackpressureStrategy::Buffer { max_overflow } => {
122 if buffer.len() >= self.capacity + max_overflow {
124 self.inner.send_blocks.fetch_add(1, Ordering::Relaxed);
125 buffer = self
126 .inner
127 .not_full
128 .wait_while(buffer, |b| {
129 b.len() >= self.capacity + max_overflow
130 && !self.inner.closed.load(Ordering::SeqCst)
131 })
132 .expect("condvar wait");
133
134 if self.inner.closed.load(Ordering::SeqCst) {
135 return Err(SynthError::ChannelClosed);
136 }
137 }
138 }
139 }
140 }
141
142 buffer.push_back(item);
143 let current_size = buffer.len() as u64;
144 self.inner.items_sent.fetch_add(1, Ordering::Relaxed);
145
146 let mut max_size = self.inner.max_buffer_size.load(Ordering::Relaxed);
148 while current_size > max_size {
149 match self.inner.max_buffer_size.compare_exchange_weak(
150 max_size,
151 current_size,
152 Ordering::SeqCst,
153 Ordering::Relaxed,
154 ) {
155 Ok(_) => break,
156 Err(x) => max_size = x,
157 }
158 }
159
160 drop(buffer);
161 self.inner.not_empty.notify_one();
162
163 Ok(true)
164 }
165
166 pub fn send_timeout(&self, item: T, timeout: Duration) -> SynthResult<bool> {
168 if self.inner.closed.load(Ordering::SeqCst) {
169 return Err(SynthError::ChannelClosed);
170 }
171
172 let deadline = Instant::now() + timeout;
173 let mut buffer = self.inner.buffer.lock().expect("mutex poisoned");
174
175 while buffer.len() >= self.capacity {
177 if self.inner.closed.load(Ordering::SeqCst) {
178 return Err(SynthError::ChannelClosed);
179 }
180
181 let remaining = deadline.saturating_duration_since(Instant::now());
182 if remaining.is_zero() {
183 match self.strategy {
185 BackpressureStrategy::DropNewest => {
186 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
187 return Ok(false);
188 }
189 BackpressureStrategy::DropOldest => {
190 buffer.pop_front();
191 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
192 break;
193 }
194 _ => {
195 return Err(SynthError::GenerationError("send timeout".to_string()));
196 }
197 }
198 }
199
200 let (new_buffer, wait_result) = self
201 .inner
202 .not_full
203 .wait_timeout(buffer, remaining)
204 .expect("condvar wait");
205 buffer = new_buffer;
206
207 if wait_result.timed_out() && buffer.len() >= self.capacity {
208 match self.strategy {
209 BackpressureStrategy::DropNewest => {
210 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
211 return Ok(false);
212 }
213 BackpressureStrategy::DropOldest => {
214 buffer.pop_front();
215 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
216 break;
217 }
218 _ => {
219 return Err(SynthError::GenerationError("send timeout".to_string()));
220 }
221 }
222 }
223 }
224
225 buffer.push_back(item);
226 self.inner.items_sent.fetch_add(1, Ordering::Relaxed);
227 drop(buffer);
228 self.inner.not_empty.notify_one();
229
230 Ok(true)
231 }
232
233 pub fn recv(&self) -> Option<T> {
237 let mut buffer = self.inner.buffer.lock().expect("mutex poisoned");
238
239 while buffer.is_empty() {
240 if self.inner.closed.load(Ordering::SeqCst) {
241 return None;
242 }
243 self.inner.receive_blocks.fetch_add(1, Ordering::Relaxed);
244 buffer = self.inner.not_empty.wait(buffer).expect("condvar wait");
245 }
246
247 let item = buffer.pop_front();
248 if item.is_some() {
249 self.inner.items_received.fetch_add(1, Ordering::Relaxed);
250 }
251 drop(buffer);
252 self.inner.not_full.notify_one();
253
254 item
255 }
256
257 pub fn recv_timeout(&self, timeout: Duration) -> Option<T> {
259 let deadline = Instant::now() + timeout;
260 let mut buffer = self.inner.buffer.lock().expect("mutex poisoned");
261
262 while buffer.is_empty() {
263 if self.inner.closed.load(Ordering::SeqCst) {
264 return None;
265 }
266
267 let remaining = deadline.saturating_duration_since(Instant::now());
268 if remaining.is_zero() {
269 return None;
270 }
271
272 let (new_buffer, wait_result) = self
273 .inner
274 .not_empty
275 .wait_timeout(buffer, remaining)
276 .expect("condvar wait");
277 buffer = new_buffer;
278
279 if wait_result.timed_out() && buffer.is_empty() {
280 return None;
281 }
282 }
283
284 let item = buffer.pop_front();
285 if item.is_some() {
286 self.inner.items_received.fetch_add(1, Ordering::Relaxed);
287 }
288 drop(buffer);
289 self.inner.not_full.notify_one();
290
291 item
292 }
293
294 pub fn try_recv(&self) -> Option<T> {
296 let mut buffer = self.inner.buffer.lock().expect("mutex poisoned");
297 let item = buffer.pop_front();
298 if item.is_some() {
299 self.inner.items_received.fetch_add(1, Ordering::Relaxed);
300 drop(buffer);
301 self.inner.not_full.notify_one();
302 }
303 item
304 }
305
306 pub fn close(&self) {
308 self.inner.closed.store(true, Ordering::SeqCst);
309 self.inner.not_full.notify_all();
310 self.inner.not_empty.notify_all();
311 }
312
313 pub fn is_closed(&self) -> bool {
315 self.inner.closed.load(Ordering::SeqCst)
316 }
317
318 pub fn len(&self) -> usize {
320 self.inner.buffer.lock().expect("mutex poisoned").len()
321 }
322
323 pub fn is_empty(&self) -> bool {
325 self.len() == 0
326 }
327
328 pub fn capacity(&self) -> usize {
330 self.capacity
331 }
332
333 pub fn stats(&self) -> ChannelStats {
335 ChannelStats {
336 items_sent: self.inner.items_sent.load(Ordering::Relaxed),
337 items_received: self.inner.items_received.load(Ordering::Relaxed),
338 items_dropped: self.inner.items_dropped.load(Ordering::Relaxed),
339 buffer_size: self.len(),
340 max_buffer_size: self.inner.max_buffer_size.load(Ordering::Relaxed) as usize,
341 send_blocks: self.inner.send_blocks.load(Ordering::Relaxed),
342 receive_blocks: self.inner.receive_blocks.load(Ordering::Relaxed),
343 }
344 }
345}
346
347impl<T> Clone for BoundedChannel<T> {
348 fn clone(&self) -> Self {
349 Self {
350 inner: Arc::clone(&self.inner),
351 capacity: self.capacity,
352 strategy: self.strategy,
353 }
354 }
355}
356
357pub fn stream_channel<T>(
359 capacity: usize,
360 strategy: BackpressureStrategy,
361) -> (StreamSender<T>, StreamReceiver<T>) {
362 let channel = BoundedChannel::new(capacity, strategy);
363 (
364 StreamSender {
365 channel: channel.clone(),
366 },
367 StreamReceiver { channel },
368 )
369}
370
371pub struct StreamSender<T> {
373 channel: BoundedChannel<StreamEvent<T>>,
374}
375
376impl<T> StreamSender<T> {
377 pub fn send(&self, event: StreamEvent<T>) -> SynthResult<bool> {
379 self.channel.send(event)
380 }
381
382 pub fn send_data(&self, item: T) -> SynthResult<bool> {
384 self.channel.send(StreamEvent::Data(item))
385 }
386
387 pub fn close(&self) {
389 self.channel.close();
390 }
391
392 pub fn stats(&self) -> ChannelStats {
394 self.channel.stats()
395 }
396}
397
398impl<T> Clone for StreamSender<T> {
399 fn clone(&self) -> Self {
400 Self {
401 channel: self.channel.clone(),
402 }
403 }
404}
405
406pub struct StreamReceiver<T> {
408 channel: BoundedChannel<StreamEvent<T>>,
409}
410
411impl<T> StreamReceiver<T> {
412 pub fn recv(&self) -> Option<StreamEvent<T>> {
414 self.channel.recv()
415 }
416
417 pub fn recv_timeout(&self, timeout: Duration) -> Option<StreamEvent<T>> {
419 self.channel.recv_timeout(timeout)
420 }
421
422 pub fn try_recv(&self) -> Option<StreamEvent<T>> {
424 self.channel.try_recv()
425 }
426
427 pub fn is_closed(&self) -> bool {
429 self.channel.is_closed()
430 }
431
432 pub fn stats(&self) -> ChannelStats {
434 self.channel.stats()
435 }
436}
437
438impl<T> Iterator for StreamReceiver<T> {
439 type Item = StreamEvent<T>;
440
441 fn next(&mut self) -> Option<Self::Item> {
442 self.recv()
443 }
444}
445
446#[cfg(test)]
447#[allow(clippy::unwrap_used)]
448mod tests {
449 use super::*;
450 use std::thread;
451
452 #[test]
453 fn test_bounded_channel_basic() {
454 let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
455
456 channel.send(1).unwrap();
457 channel.send(2).unwrap();
458 channel.send(3).unwrap();
459
460 assert_eq!(channel.recv(), Some(1));
461 assert_eq!(channel.recv(), Some(2));
462 assert_eq!(channel.recv(), Some(3));
463 }
464
465 #[test]
466 fn test_bounded_channel_drop_oldest() {
467 let channel: BoundedChannel<i32> = BoundedChannel::new(2, BackpressureStrategy::DropOldest);
468
469 channel.send(1).unwrap();
470 channel.send(2).unwrap();
471 channel.send(3).unwrap(); let stats = channel.stats();
474 assert_eq!(stats.items_dropped, 1);
475 assert_eq!(channel.recv(), Some(2));
476 assert_eq!(channel.recv(), Some(3));
477 }
478
479 #[test]
480 fn test_bounded_channel_drop_newest() {
481 let channel: BoundedChannel<i32> = BoundedChannel::new(2, BackpressureStrategy::DropNewest);
482
483 channel.send(1).unwrap();
484 channel.send(2).unwrap();
485 let sent = channel.send(3).unwrap(); assert!(!sent);
488 let stats = channel.stats();
489 assert_eq!(stats.items_dropped, 1);
490 }
491
492 #[test]
493 fn test_bounded_channel_close() {
494 let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
495
496 channel.send(1).unwrap();
497 channel.close();
498
499 assert_eq!(channel.recv(), Some(1));
500 assert_eq!(channel.recv(), None);
501 assert!(channel.send(2).is_err());
502 }
503
504 #[test]
505 fn test_bounded_channel_threaded() {
506 let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
507 let sender = channel.clone();
508
509 let handle = thread::spawn(move || {
510 for i in 0..100 {
511 sender.send(i).unwrap();
512 }
513 sender.close();
514 });
515
516 let mut received = Vec::new();
517 while let Some(item) = channel.recv() {
518 received.push(item);
519 }
520
521 handle.join().unwrap();
522
523 assert_eq!(received, (0..100).collect::<Vec<_>>());
524 }
525
526 #[test]
527 fn test_stream_channel() {
528 let (sender, receiver) = stream_channel::<i32>(10, BackpressureStrategy::Block);
529
530 sender.send_data(1).unwrap();
531 sender.send_data(2).unwrap();
532 sender.close();
533
534 let events: Vec<_> = receiver.collect();
535 assert_eq!(events.len(), 2);
536
537 assert!(matches!(events[0], StreamEvent::Data(1)));
538 assert!(matches!(events[1], StreamEvent::Data(2)));
539 }
540
541 #[test]
542 fn test_channel_stats() {
543 let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
544
545 channel.send(1).unwrap();
546 channel.send(2).unwrap();
547 channel.recv();
548
549 let stats = channel.stats();
550 assert_eq!(stats.items_sent, 2);
551 assert_eq!(stats.items_received, 1);
552 assert_eq!(stats.buffer_size, 1);
553 }
554}