Skip to main content

lsl_core/
outlet.rs

1//! StreamOutlet: publishes data on the lab network.
2
3use crate::clock::local_clock;
4use crate::sample::Sample;
5use crate::send_buffer::SendBuffer;
6use crate::stream_info::StreamInfo;
7use crate::tcp_server::TcpServer;
8use crate::types::*;
9use crate::udp_server::UdpServer;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12
13/// A stream outlet. Creates a discoverable stream on the network.
14pub struct StreamOutlet {
15    info: StreamInfo,
16    send_buffer: Arc<SendBuffer>,
17    shutdown: Arc<AtomicBool>,
18    #[allow(dead_code)]
19    chunk_size: i32,
20}
21
22impl StreamOutlet {
23    /// Create a new stream outlet.
24    pub fn new(info: &StreamInfo, chunk_size: i32, _max_buffered: i32) -> Self {
25        let send_buffer = SendBuffer::new();
26
27        // Set up the stream identity
28        info.reset_uid();
29        info.set_created_at(local_clock());
30        info.set_session_id(&crate::config::CONFIG.session_id);
31        info.set_hostname(
32            &hostname::get()
33                .map(|s| s.to_string_lossy().into_owned())
34                .unwrap_or_default(),
35        );
36
37        // Start TCP data server (IPv4 + IPv6)
38        let tcp = TcpServer::start(info.clone(), send_buffer.clone(), chunk_size);
39        info.set_v4data_port(tcp.v4_port);
40        info.set_v6data_port(tcp.v6_port);
41
42        // Start UDP service (time sync, IPv4 + IPv6)
43        let (v4_svc_port, v6_svc_port) =
44            UdpServer::start_unicast(info.clone(), tcp.shutdown.clone());
45        info.set_v4service_port(v4_svc_port);
46        info.set_v6service_port(v6_svc_port);
47
48        // Start multicast responders (IPv4 + IPv6 groups)
49        UdpServer::start_multicast(info.clone(), tcp.shutdown.clone());
50        let tcp_shutdown = tcp.shutdown;
51
52        StreamOutlet {
53            info: info.clone(),
54            send_buffer,
55            shutdown: tcp_shutdown,
56            chunk_size,
57        }
58    }
59
60    /// Get the stream info
61    pub fn info(&self) -> &StreamInfo {
62        &self.info
63    }
64
65    /// Push a single float sample
66    pub fn push_sample_f(&self, data: &[f32], timestamp: f64, pushthrough: bool) {
67        let ts = if timestamp == 0.0 {
68            local_clock()
69        } else {
70            timestamp
71        };
72        let mut sample = Sample::new(self.info.channel_format(), self.info.channel_count(), ts);
73        sample.assign_f32(data);
74        sample.pushthrough = pushthrough;
75        self.send_buffer.push_sample(sample);
76    }
77
78    /// Push a single double sample
79    pub fn push_sample_d(&self, data: &[f64], timestamp: f64, pushthrough: bool) {
80        let ts = if timestamp == 0.0 {
81            local_clock()
82        } else {
83            timestamp
84        };
85        let mut sample = Sample::new(self.info.channel_format(), self.info.channel_count(), ts);
86        sample.assign_f64(data);
87        sample.pushthrough = pushthrough;
88        self.send_buffer.push_sample(sample);
89    }
90
91    pub fn push_sample_i32(&self, data: &[i32], timestamp: f64, pushthrough: bool) {
92        let ts = if timestamp == 0.0 {
93            local_clock()
94        } else {
95            timestamp
96        };
97        let mut sample = Sample::new(self.info.channel_format(), self.info.channel_count(), ts);
98        sample.assign_i32(data);
99        sample.pushthrough = pushthrough;
100        self.send_buffer.push_sample(sample);
101    }
102
103    pub fn push_sample_i16(&self, data: &[i16], timestamp: f64, pushthrough: bool) {
104        let ts = if timestamp == 0.0 {
105            local_clock()
106        } else {
107            timestamp
108        };
109        let mut sample = Sample::new(self.info.channel_format(), self.info.channel_count(), ts);
110        sample.assign_i16(data);
111        sample.pushthrough = pushthrough;
112        self.send_buffer.push_sample(sample);
113    }
114
115    pub fn push_sample_i64(&self, data: &[i64], timestamp: f64, pushthrough: bool) {
116        let ts = if timestamp == 0.0 {
117            local_clock()
118        } else {
119            timestamp
120        };
121        let mut sample = Sample::new(self.info.channel_format(), self.info.channel_count(), ts);
122        sample.assign_i64(data);
123        sample.pushthrough = pushthrough;
124        self.send_buffer.push_sample(sample);
125    }
126
127    pub fn push_sample_str(&self, data: &[String], timestamp: f64, pushthrough: bool) {
128        let ts = if timestamp == 0.0 {
129            local_clock()
130        } else {
131            timestamp
132        };
133        let mut sample = Sample::new(self.info.channel_format(), self.info.channel_count(), ts);
134        sample.assign_strings(data);
135        sample.pushthrough = pushthrough;
136        self.send_buffer.push_sample(sample);
137    }
138
139    pub fn push_sample_raw(&self, data: &[u8], timestamp: f64, pushthrough: bool) {
140        let ts = if timestamp == 0.0 {
141            local_clock()
142        } else {
143            timestamp
144        };
145        let mut sample = Sample::new(self.info.channel_format(), self.info.channel_count(), ts);
146        sample.assign_raw(data);
147        sample.pushthrough = pushthrough;
148        self.send_buffer.push_sample(sample);
149    }
150
151    /// Push a chunk of multiplexed float data
152    pub fn push_chunk_f(&self, data: &[f32], timestamp: f64, pushthrough: bool) {
153        let nch = self.info.channel_count() as usize;
154        if nch == 0 {
155            return;
156        }
157        let n_samples = data.len() / nch;
158        let srate = self.info.nominal_srate();
159        let mut ts = if timestamp == 0.0 {
160            local_clock()
161        } else {
162            timestamp
163        };
164        if srate != IRREGULAR_RATE && n_samples > 1 {
165            ts -= (n_samples - 1) as f64 / srate;
166        }
167        for i in 0..n_samples {
168            let chunk = &data[i * nch..(i + 1) * nch];
169            let is_last = i == n_samples - 1;
170            let sample_ts = if i == 0 { ts } else { DEDUCED_TIMESTAMP };
171            self.push_sample_f(chunk, sample_ts, pushthrough && is_last);
172            if srate != IRREGULAR_RATE && i == 0 {
173                // subsequent samples use deduced timestamp
174            }
175        }
176    }
177
178    /// Check if there are consumers
179    pub fn have_consumers(&self) -> bool {
180        self.send_buffer.have_consumers()
181    }
182
183    /// Wait for consumers
184    pub fn wait_for_consumers(&self, timeout: f64) -> bool {
185        self.send_buffer.wait_for_consumers(timeout)
186    }
187}
188
189impl Drop for StreamOutlet {
190    fn drop(&mut self) {
191        self.shutdown.store(true, Ordering::Relaxed);
192        self.send_buffer.push_sentinel();
193    }
194}