datasynth_core/streaming/
channel.rs1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9use std::time::{Duration, Instant};
10
11use crate::error::{SynthError, SynthResult};
12use crate::traits::{BackpressureStrategy, StreamEvent};
13
/// Point-in-time snapshot of a channel's counters.
///
/// Implements `PartialEq`/`Eq` so two snapshots can be compared directly,
/// for example with `assert_eq!` in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ChannelStats {
    /// Total number of items accepted into the buffer.
    pub items_sent: u64,
    /// Total number of items handed out to receivers.
    pub items_received: u64,
    /// Total number of items discarded because of backpressure.
    pub items_dropped: u64,
    /// Number of items sitting in the buffer when the snapshot was taken.
    pub buffer_size: usize,
    /// Largest buffer length recorded so far.
    pub max_buffer_size: usize,
    /// Number of times a sender had to wait for free space.
    pub send_blocks: u64,
    /// Number of times a receiver had to wait for an item.
    pub receive_blocks: u64,
}
32
33pub struct BoundedChannel<T> {
35 inner: Arc<ChannelInner<T>>,
37 capacity: usize,
39 strategy: BackpressureStrategy,
41}
42
/// State shared between all handles of one channel.
struct ChannelInner<T> {
    /// Queued items, oldest at the front.
    buffer: Mutex<VecDeque<T>>,
    /// Signalled when an item has been removed from `buffer`.
    not_full: Condvar,
    /// Signalled when an item has been added to `buffer`.
    not_empty: Condvar,
    /// Set once the channel has been closed.
    closed: AtomicBool,
    /// Count of items pushed into `buffer`.
    items_sent: AtomicU64,
    /// Count of items popped by receivers.
    items_received: AtomicU64,
    /// Count of items discarded by a drop strategy.
    items_dropped: AtomicU64,
    /// Count of sender waits.
    send_blocks: AtomicU64,
    /// Count of receiver waits.
    receive_blocks: AtomicU64,
    /// Largest buffer length recorded so far.
    max_buffer_size: AtomicU64,
}
60
61impl<T> BoundedChannel<T> {
62 pub fn new(capacity: usize, strategy: BackpressureStrategy) -> Self {
64 Self {
65 inner: Arc::new(ChannelInner {
66 buffer: Mutex::new(VecDeque::with_capacity(capacity)),
67 not_full: Condvar::new(),
68 not_empty: Condvar::new(),
69 closed: AtomicBool::new(false),
70 items_sent: AtomicU64::new(0),
71 items_received: AtomicU64::new(0),
72 items_dropped: AtomicU64::new(0),
73 send_blocks: AtomicU64::new(0),
74 receive_blocks: AtomicU64::new(0),
75 max_buffer_size: AtomicU64::new(0),
76 }),
77 capacity,
78 strategy,
79 }
80 }
81
82 pub fn send(&self, item: T) -> SynthResult<bool> {
87 if self.inner.closed.load(Ordering::SeqCst) {
88 return Err(SynthError::ChannelClosed);
89 }
90
91 let mut buffer = self.inner.buffer.lock().unwrap();
92
93 if buffer.len() >= self.capacity {
95 match self.strategy {
96 BackpressureStrategy::Block => {
97 self.inner.send_blocks.fetch_add(1, Ordering::Relaxed);
98 buffer = self
100 .inner
101 .not_full
102 .wait_while(buffer, |b| {
103 b.len() >= self.capacity && !self.inner.closed.load(Ordering::SeqCst)
104 })
105 .unwrap();
106
107 if self.inner.closed.load(Ordering::SeqCst) {
108 return Err(SynthError::ChannelClosed);
109 }
110 }
111 BackpressureStrategy::DropOldest => {
112 buffer.pop_front();
114 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
115 }
116 BackpressureStrategy::DropNewest => {
117 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
119 return Ok(false);
120 }
121 BackpressureStrategy::Buffer { max_overflow } => {
122 if buffer.len() >= self.capacity + max_overflow {
124 self.inner.send_blocks.fetch_add(1, Ordering::Relaxed);
125 buffer = self
126 .inner
127 .not_full
128 .wait_while(buffer, |b| {
129 b.len() >= self.capacity + max_overflow
130 && !self.inner.closed.load(Ordering::SeqCst)
131 })
132 .unwrap();
133
134 if self.inner.closed.load(Ordering::SeqCst) {
135 return Err(SynthError::ChannelClosed);
136 }
137 }
138 }
139 }
140 }
141
142 buffer.push_back(item);
143 let current_size = buffer.len() as u64;
144 self.inner.items_sent.fetch_add(1, Ordering::Relaxed);
145
146 let mut max_size = self.inner.max_buffer_size.load(Ordering::Relaxed);
148 while current_size > max_size {
149 match self.inner.max_buffer_size.compare_exchange_weak(
150 max_size,
151 current_size,
152 Ordering::SeqCst,
153 Ordering::Relaxed,
154 ) {
155 Ok(_) => break,
156 Err(x) => max_size = x,
157 }
158 }
159
160 drop(buffer);
161 self.inner.not_empty.notify_one();
162
163 Ok(true)
164 }
165
166 pub fn send_timeout(&self, item: T, timeout: Duration) -> SynthResult<bool> {
168 if self.inner.closed.load(Ordering::SeqCst) {
169 return Err(SynthError::ChannelClosed);
170 }
171
172 let deadline = Instant::now() + timeout;
173 let mut buffer = self.inner.buffer.lock().unwrap();
174
175 while buffer.len() >= self.capacity {
177 if self.inner.closed.load(Ordering::SeqCst) {
178 return Err(SynthError::ChannelClosed);
179 }
180
181 let remaining = deadline.saturating_duration_since(Instant::now());
182 if remaining.is_zero() {
183 match self.strategy {
185 BackpressureStrategy::DropNewest => {
186 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
187 return Ok(false);
188 }
189 BackpressureStrategy::DropOldest => {
190 buffer.pop_front();
191 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
192 break;
193 }
194 _ => {
195 return Err(SynthError::GenerationError("send timeout".to_string()));
196 }
197 }
198 }
199
200 let (new_buffer, wait_result) =
201 self.inner.not_full.wait_timeout(buffer, remaining).unwrap();
202 buffer = new_buffer;
203
204 if wait_result.timed_out() && buffer.len() >= self.capacity {
205 match self.strategy {
206 BackpressureStrategy::DropNewest => {
207 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
208 return Ok(false);
209 }
210 BackpressureStrategy::DropOldest => {
211 buffer.pop_front();
212 self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
213 break;
214 }
215 _ => {
216 return Err(SynthError::GenerationError("send timeout".to_string()));
217 }
218 }
219 }
220 }
221
222 buffer.push_back(item);
223 self.inner.items_sent.fetch_add(1, Ordering::Relaxed);
224 drop(buffer);
225 self.inner.not_empty.notify_one();
226
227 Ok(true)
228 }
229
230 pub fn recv(&self) -> Option<T> {
234 let mut buffer = self.inner.buffer.lock().unwrap();
235
236 while buffer.is_empty() {
237 if self.inner.closed.load(Ordering::SeqCst) {
238 return None;
239 }
240 self.inner.receive_blocks.fetch_add(1, Ordering::Relaxed);
241 buffer = self.inner.not_empty.wait(buffer).unwrap();
242 }
243
244 let item = buffer.pop_front();
245 if item.is_some() {
246 self.inner.items_received.fetch_add(1, Ordering::Relaxed);
247 }
248 drop(buffer);
249 self.inner.not_full.notify_one();
250
251 item
252 }
253
254 pub fn recv_timeout(&self, timeout: Duration) -> Option<T> {
256 let deadline = Instant::now() + timeout;
257 let mut buffer = self.inner.buffer.lock().unwrap();
258
259 while buffer.is_empty() {
260 if self.inner.closed.load(Ordering::SeqCst) {
261 return None;
262 }
263
264 let remaining = deadline.saturating_duration_since(Instant::now());
265 if remaining.is_zero() {
266 return None;
267 }
268
269 let (new_buffer, wait_result) = self
270 .inner
271 .not_empty
272 .wait_timeout(buffer, remaining)
273 .unwrap();
274 buffer = new_buffer;
275
276 if wait_result.timed_out() && buffer.is_empty() {
277 return None;
278 }
279 }
280
281 let item = buffer.pop_front();
282 if item.is_some() {
283 self.inner.items_received.fetch_add(1, Ordering::Relaxed);
284 }
285 drop(buffer);
286 self.inner.not_full.notify_one();
287
288 item
289 }
290
291 pub fn try_recv(&self) -> Option<T> {
293 let mut buffer = self.inner.buffer.lock().unwrap();
294 let item = buffer.pop_front();
295 if item.is_some() {
296 self.inner.items_received.fetch_add(1, Ordering::Relaxed);
297 drop(buffer);
298 self.inner.not_full.notify_one();
299 }
300 item
301 }
302
303 pub fn close(&self) {
305 self.inner.closed.store(true, Ordering::SeqCst);
306 self.inner.not_full.notify_all();
307 self.inner.not_empty.notify_all();
308 }
309
310 pub fn is_closed(&self) -> bool {
312 self.inner.closed.load(Ordering::SeqCst)
313 }
314
315 pub fn len(&self) -> usize {
317 self.inner.buffer.lock().unwrap().len()
318 }
319
320 pub fn is_empty(&self) -> bool {
322 self.len() == 0
323 }
324
325 pub fn capacity(&self) -> usize {
327 self.capacity
328 }
329
330 pub fn stats(&self) -> ChannelStats {
332 ChannelStats {
333 items_sent: self.inner.items_sent.load(Ordering::Relaxed),
334 items_received: self.inner.items_received.load(Ordering::Relaxed),
335 items_dropped: self.inner.items_dropped.load(Ordering::Relaxed),
336 buffer_size: self.len(),
337 max_buffer_size: self.inner.max_buffer_size.load(Ordering::Relaxed) as usize,
338 send_blocks: self.inner.send_blocks.load(Ordering::Relaxed),
339 receive_blocks: self.inner.receive_blocks.load(Ordering::Relaxed),
340 }
341 }
342}
343
344impl<T> Clone for BoundedChannel<T> {
345 fn clone(&self) -> Self {
346 Self {
347 inner: Arc::clone(&self.inner),
348 capacity: self.capacity,
349 strategy: self.strategy,
350 }
351 }
352}
353
354pub fn stream_channel<T>(
356 capacity: usize,
357 strategy: BackpressureStrategy,
358) -> (StreamSender<T>, StreamReceiver<T>) {
359 let channel = BoundedChannel::new(capacity, strategy);
360 (
361 StreamSender {
362 channel: channel.clone(),
363 },
364 StreamReceiver { channel },
365 )
366}
367
368pub struct StreamSender<T> {
370 channel: BoundedChannel<StreamEvent<T>>,
371}
372
373impl<T> StreamSender<T> {
374 pub fn send(&self, event: StreamEvent<T>) -> SynthResult<bool> {
376 self.channel.send(event)
377 }
378
379 pub fn send_data(&self, item: T) -> SynthResult<bool> {
381 self.channel.send(StreamEvent::Data(item))
382 }
383
384 pub fn close(&self) {
386 self.channel.close();
387 }
388
389 pub fn stats(&self) -> ChannelStats {
391 self.channel.stats()
392 }
393}
394
395impl<T> Clone for StreamSender<T> {
396 fn clone(&self) -> Self {
397 Self {
398 channel: self.channel.clone(),
399 }
400 }
401}
402
403pub struct StreamReceiver<T> {
405 channel: BoundedChannel<StreamEvent<T>>,
406}
407
408impl<T> StreamReceiver<T> {
409 pub fn recv(&self) -> Option<StreamEvent<T>> {
411 self.channel.recv()
412 }
413
414 pub fn recv_timeout(&self, timeout: Duration) -> Option<StreamEvent<T>> {
416 self.channel.recv_timeout(timeout)
417 }
418
419 pub fn try_recv(&self) -> Option<StreamEvent<T>> {
421 self.channel.try_recv()
422 }
423
424 pub fn is_closed(&self) -> bool {
426 self.channel.is_closed()
427 }
428
429 pub fn stats(&self) -> ChannelStats {
431 self.channel.stats()
432 }
433}
434
435impl<T> Iterator for StreamReceiver<T> {
436 type Item = StreamEvent<T>;
437
438 fn next(&mut self) -> Option<Self::Item> {
439 self.recv()
440 }
441}
442
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Items come out in the order they were sent.
    #[test]
    fn test_bounded_channel_basic() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);

        channel.send(1).unwrap();
        channel.send(2).unwrap();
        channel.send(3).unwrap();

        assert_eq!(channel.recv(), Some(1));
        assert_eq!(channel.recv(), Some(2));
        assert_eq!(channel.recv(), Some(3));
    }

    /// A full `DropOldest` channel evicts the front item to admit the new one.
    #[test]
    fn test_bounded_channel_drop_oldest() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(2, BackpressureStrategy::DropOldest);

        channel.send(1).unwrap();
        channel.send(2).unwrap();
        // The buffer is full here, so this send evicts `1`.
        channel.send(3).unwrap();

        let stats = channel.stats();
        assert_eq!(stats.items_dropped, 1);
        assert_eq!(channel.recv(), Some(2));
        assert_eq!(channel.recv(), Some(3));
    }

    /// A full `DropNewest` channel rejects the incoming item.
    #[test]
    fn test_bounded_channel_drop_newest() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(2, BackpressureStrategy::DropNewest);

        channel.send(1).unwrap();
        channel.send(2).unwrap();
        // The buffer is full here, so `3` is discarded and `false` is returned.
        let sent = channel.send(3).unwrap();

        assert!(!sent);
        let stats = channel.stats();
        assert_eq!(stats.items_dropped, 1);
    }

    /// Closing keeps queued items readable but rejects new sends.
    #[test]
    fn test_bounded_channel_close() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);

        channel.send(1).unwrap();
        channel.close();

        assert_eq!(channel.recv(), Some(1));
        assert_eq!(channel.recv(), None);
        assert!(channel.send(2).is_err());
    }

    /// A producer thread and a consumer exchange 100 items in order.
    #[test]
    fn test_bounded_channel_threaded() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
        let sender = channel.clone();

        let handle = thread::spawn(move || {
            for i in 0..100 {
                sender.send(i).unwrap();
            }
            sender.close();
        });

        let mut received = Vec::new();
        while let Some(item) = channel.recv() {
            received.push(item);
        }

        handle.join().unwrap();

        assert_eq!(received, (0..100).collect::<Vec<_>>());
    }

    /// The sender/receiver pair carries `StreamEvent::Data` values end to end.
    #[test]
    fn test_stream_channel() {
        let (sender, receiver) = stream_channel::<i32>(10, BackpressureStrategy::Block);

        sender.send_data(1).unwrap();
        sender.send_data(2).unwrap();
        sender.close();

        let events: Vec<_> = receiver.collect();
        assert_eq!(events.len(), 2);

        assert!(matches!(events[0], StreamEvent::Data(1)));
        assert!(matches!(events[1], StreamEvent::Data(2)));
    }

    /// Counters reflect sends, receives and the current buffer length.
    #[test]
    fn test_channel_stats() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);

        channel.send(1).unwrap();
        channel.send(2).unwrap();
        channel.recv();

        let stats = channel.stats();
        assert_eq!(stats.items_sent, 2);
        assert_eq!(stats.items_received, 1);
        assert_eq!(stats.buffer_size, 1);
    }

    /// `try_recv` never waits: it yields queued items and `None` when empty.
    #[test]
    fn test_try_recv() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(4, BackpressureStrategy::Block);

        assert_eq!(channel.try_recv(), None);
        channel.send(7).unwrap();
        assert_eq!(channel.try_recv(), Some(7));
        assert_eq!(channel.try_recv(), None);
    }

    /// `recv_timeout` gives up on an empty channel and delivers queued items.
    #[test]
    fn test_recv_timeout() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(4, BackpressureStrategy::Block);
        let short = std::time::Duration::from_millis(10);

        assert_eq!(channel.recv_timeout(short), None);
        channel.send(5).unwrap();
        assert_eq!(channel.recv_timeout(short), Some(5));
    }

    /// `send_timeout` fails on a full `Block` channel and leaves it untouched.
    #[test]
    fn test_send_timeout_on_full_block_channel() {
        let channel: BoundedChannel<i32> = BoundedChannel::new(1, BackpressureStrategy::Block);
        let short = std::time::Duration::from_millis(10);

        assert!(channel.send_timeout(1, short).unwrap());
        assert!(matches!(
            channel.send_timeout(2, short),
            Err(SynthError::GenerationError(_))
        ));
        assert_eq!(channel.len(), 1);
        assert_eq!(channel.recv(), Some(1));
    }

    /// The `Buffer` strategy lets `send` exceed `capacity` by `max_overflow`.
    #[test]
    fn test_buffer_strategy_allows_overflow() {
        let channel: BoundedChannel<i32> =
            BoundedChannel::new(1, BackpressureStrategy::Buffer { max_overflow: 2 });

        for value in 1..=3 {
            assert!(channel.send(value).unwrap());
        }

        assert_eq!(channel.len(), 3);
        assert_eq!(channel.stats().items_dropped, 0);
        assert_eq!(channel.recv(), Some(1));
    }
}