1use crate::clock::local_clock;
4use crate::sample::Sample;
5use crate::send_buffer::SendBuffer;
6use crate::stream_info::StreamInfo;
7use crate::tcp_server::TcpServer;
8use crate::types::*;
9use crate::udp_server::UdpServer;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12
13pub struct StreamOutlet {
15 info: StreamInfo,
16 send_buffer: Arc<SendBuffer>,
17 shutdown: Arc<AtomicBool>,
18 #[allow(dead_code)]
19 chunk_size: i32,
20}
21
22impl StreamOutlet {
23 pub fn new(info: &StreamInfo, chunk_size: i32, _max_buffered: i32) -> Self {
25 let send_buffer = SendBuffer::new();
26
27 info.reset_uid();
29 info.set_created_at(local_clock());
30 info.set_session_id(&crate::config::CONFIG.session_id);
31 info.set_hostname(
32 &hostname::get()
33 .map(|s| s.to_string_lossy().into_owned())
34 .unwrap_or_default(),
35 );
36
37 let tcp = TcpServer::start(info.clone(), send_buffer.clone(), chunk_size);
39 info.set_v4data_port(tcp.v4_port);
40 info.set_v6data_port(tcp.v6_port);
41
42 let (v4_svc_port, v6_svc_port) =
44 UdpServer::start_unicast(info.clone(), tcp.shutdown.clone());
45 info.set_v4service_port(v4_svc_port);
46 info.set_v6service_port(v6_svc_port);
47
48 UdpServer::start_multicast(info.clone(), tcp.shutdown.clone());
50 let tcp_shutdown = tcp.shutdown;
51
52 StreamOutlet {
53 info: info.clone(),
54 send_buffer,
55 shutdown: tcp_shutdown,
56 chunk_size,
57 }
58 }
59
60 pub fn info(&self) -> &StreamInfo {
62 &self.info
63 }
64
65 pub fn push_sample_f(&self, data: &[f32], timestamp: f64, pushthrough: bool) {
67 let ts = if timestamp == 0.0 {
68 local_clock()
69 } else {
70 timestamp
71 };
72 let mut sample = Sample::new(self.info.channel_format(), self.info.channel_count(), ts);
73 sample.assign_f32(data);
74 sample.pushthrough = pushthrough;
75 self.send_buffer.push_sample(sample);
76 }
77
78 pub fn push_sample_d(&self, data: &[f64], timestamp: f64, pushthrough: bool) {
80 let ts = if timestamp == 0.0 {
81 local_clock()
82 } else {
83 timestamp
84 };
85 let mut sample = Sample::new(self.info.channel_format(), self.info.channel_count(), ts);
86 sample.assign_f64(data);
87 sample.pushthrough = pushthrough;
88 self.send_buffer.push_sample(sample);
89 }
90
91 pub fn push_sample_i32(&self, data: &[i32], timestamp: f64, pushthrough: bool) {
92 let ts = if timestamp == 0.0 {
93 local_clock()
94 } else {
95 timestamp
96 };
97 let mut sample = Sample::new(self.info.channel_format(), self.info.channel_count(), ts);
98 sample.assign_i32(data);
99 sample.pushthrough = pushthrough;
100 self.send_buffer.push_sample(sample);
101 }
102
103 pub fn push_sample_i16(&self, data: &[i16], timestamp: f64, pushthrough: bool) {
104 let ts = if timestamp == 0.0 {
105 local_clock()
106 } else {
107 timestamp
108 };
109 let mut sample = Sample::new(self.info.channel_format(), self.info.channel_count(), ts);
110 sample.assign_i16(data);
111 sample.pushthrough = pushthrough;
112 self.send_buffer.push_sample(sample);
113 }
114
115 pub fn push_sample_i64(&self, data: &[i64], timestamp: f64, pushthrough: bool) {
116 let ts = if timestamp == 0.0 {
117 local_clock()
118 } else {
119 timestamp
120 };
121 let mut sample = Sample::new(self.info.channel_format(), self.info.channel_count(), ts);
122 sample.assign_i64(data);
123 sample.pushthrough = pushthrough;
124 self.send_buffer.push_sample(sample);
125 }
126
127 pub fn push_sample_str(&self, data: &[String], timestamp: f64, pushthrough: bool) {
128 let ts = if timestamp == 0.0 {
129 local_clock()
130 } else {
131 timestamp
132 };
133 let mut sample = Sample::new(self.info.channel_format(), self.info.channel_count(), ts);
134 sample.assign_strings(data);
135 sample.pushthrough = pushthrough;
136 self.send_buffer.push_sample(sample);
137 }
138
139 pub fn push_sample_raw(&self, data: &[u8], timestamp: f64, pushthrough: bool) {
140 let ts = if timestamp == 0.0 {
141 local_clock()
142 } else {
143 timestamp
144 };
145 let mut sample = Sample::new(self.info.channel_format(), self.info.channel_count(), ts);
146 sample.assign_raw(data);
147 sample.pushthrough = pushthrough;
148 self.send_buffer.push_sample(sample);
149 }
150
151 pub fn push_chunk_f(&self, data: &[f32], timestamp: f64, pushthrough: bool) {
153 let nch = self.info.channel_count() as usize;
154 if nch == 0 {
155 return;
156 }
157 let n_samples = data.len() / nch;
158 let srate = self.info.nominal_srate();
159 let mut ts = if timestamp == 0.0 {
160 local_clock()
161 } else {
162 timestamp
163 };
164 if srate != IRREGULAR_RATE && n_samples > 1 {
165 ts -= (n_samples - 1) as f64 / srate;
166 }
167 for i in 0..n_samples {
168 let chunk = &data[i * nch..(i + 1) * nch];
169 let is_last = i == n_samples - 1;
170 let sample_ts = if i == 0 { ts } else { DEDUCED_TIMESTAMP };
171 self.push_sample_f(chunk, sample_ts, pushthrough && is_last);
172 if srate != IRREGULAR_RATE && i == 0 {
173 }
175 }
176 }
177
178 pub fn have_consumers(&self) -> bool {
180 self.send_buffer.have_consumers()
181 }
182
183 pub fn wait_for_consumers(&self, timeout: f64) -> bool {
185 self.send_buffer.wait_for_consumers(timeout)
186 }
187}
188
189impl Drop for StreamOutlet {
190 fn drop(&mut self) {
191 self.shutdown.store(true, Ordering::Relaxed);
192 self.send_buffer.push_sentinel();
193 }
194}