1use crate::sample::Sample;
4use crossbeam_channel::{unbounded, Receiver, Sender};
5use parking_lot::Mutex;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::Arc;
8
9pub struct SendBuffer {
12 consumers: Mutex<Vec<ConsumerEntry>>,
13 has_consumers: AtomicBool,
14}
15
16struct ConsumerEntry {
17 sender: Sender<Option<Sample>>,
18 max_buffered: usize,
19}
20
21impl SendBuffer {
22 pub fn new() -> Arc<Self> {
23 Arc::new(SendBuffer {
24 consumers: Mutex::new(Vec::new()),
25 has_consumers: AtomicBool::new(false),
26 })
27 }
28
29 pub fn push_sample(&self, sample: Sample) {
31 let mut consumers = self.consumers.lock();
32 consumers.retain(|c| {
33 if c.sender.len() > c.max_buffered {
35 let _ = c.sender.try_send(None); }
37 c.sender.send(Some(sample.clone())).is_ok()
38 });
39 self.has_consumers
40 .store(!consumers.is_empty(), Ordering::Relaxed);
41 }
42
43 pub fn push_sentinel(&self) {
45 let consumers = self.consumers.lock();
46 for c in consumers.iter() {
47 let _ = c.sender.send(None);
48 }
49 }
50
51 pub fn new_consumer(&self, max_buffered: usize) -> Receiver<Option<Sample>> {
53 let (tx, rx) = unbounded();
54 let mut consumers = self.consumers.lock();
55 consumers.push(ConsumerEntry {
56 sender: tx,
57 max_buffered,
58 });
59 self.has_consumers.store(true, Ordering::Relaxed);
60 rx
61 }
62
63 pub fn have_consumers(&self) -> bool {
65 let mut consumers = self.consumers.lock();
67 consumers.retain(|c| !c.sender.is_empty() || c.sender.is_empty());
68 let has = !consumers.is_empty();
69 self.has_consumers.store(has, Ordering::Relaxed);
70 has
71 }
72
73 pub fn wait_for_consumers(&self, timeout: f64) -> bool {
75 let deadline =
76 std::time::Instant::now() + std::time::Duration::from_secs_f64(timeout.max(0.0));
77 loop {
78 if !self.consumers.lock().is_empty() {
79 return true;
80 }
81 if std::time::Instant::now() >= deadline {
82 return false;
83 }
84 std::thread::sleep(std::time::Duration::from_millis(5));
85 }
86 }
87}