Skip to main content

lsl_core/
send_buffer.rs

1//! Send buffer: single-producer, multiple-consumer broadcast buffer for samples.
2
3use crate::sample::Sample;
4use crossbeam_channel::{unbounded, Receiver, Sender};
5use parking_lot::Mutex;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::Arc;
8
9/// A broadcast-style send buffer. The outlet pushes samples in, and each consumer
10/// (TCP client session) gets its own queue.
11pub struct SendBuffer {
12    consumers: Mutex<Vec<ConsumerEntry>>,
13    has_consumers: AtomicBool,
14}
15
16struct ConsumerEntry {
17    sender: Sender<Option<Sample>>,
18    max_buffered: usize,
19}
20
21impl SendBuffer {
22    pub fn new() -> Arc<Self> {
23        Arc::new(SendBuffer {
24            consumers: Mutex::new(Vec::new()),
25            has_consumers: AtomicBool::new(false),
26        })
27    }
28
29    /// Push a sample to all consumers
30    pub fn push_sample(&self, sample: Sample) {
31        let mut consumers = self.consumers.lock();
32        consumers.retain(|c| {
33            // Drop oldest if over capacity
34            if c.sender.len() > c.max_buffered {
35                let _ = c.sender.try_send(None); // won't help, but we can't recv here
36            }
37            c.sender.send(Some(sample.clone())).is_ok()
38        });
39        self.has_consumers
40            .store(!consumers.is_empty(), Ordering::Relaxed);
41    }
42
43    /// Wake up consumers (e.g., during shutdown)
44    pub fn push_sentinel(&self) {
45        let consumers = self.consumers.lock();
46        for c in consumers.iter() {
47            let _ = c.sender.send(None);
48        }
49    }
50
51    /// Register a new consumer and return its receiver
52    pub fn new_consumer(&self, max_buffered: usize) -> Receiver<Option<Sample>> {
53        let (tx, rx) = unbounded();
54        let mut consumers = self.consumers.lock();
55        consumers.push(ConsumerEntry {
56            sender: tx,
57            max_buffered,
58        });
59        self.has_consumers.store(true, Ordering::Relaxed);
60        rx
61    }
62
63    /// Check if there are active consumers
64    pub fn have_consumers(&self) -> bool {
65        // clean up dead senders while checking
66        let mut consumers = self.consumers.lock();
67        consumers.retain(|c| !c.sender.is_empty() || c.sender.is_empty());
68        let has = !consumers.is_empty();
69        self.has_consumers.store(has, Ordering::Relaxed);
70        has
71    }
72
73    /// Wait until at least one consumer is registered
74    pub fn wait_for_consumers(&self, timeout: f64) -> bool {
75        let deadline =
76            std::time::Instant::now() + std::time::Duration::from_secs_f64(timeout.max(0.0));
77        loop {
78            if !self.consumers.lock().is_empty() {
79                return true;
80            }
81            if std::time::Instant::now() >= deadline {
82                return false;
83            }
84            std::thread::sleep(std::time::Duration::from_millis(5));
85        }
86    }
87}