use crate::config::CONFIG;
use crate::sample::Sample;
use crate::send_buffer::SendBuffer;
use crate::stream_info::StreamInfo;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
16
17pub struct TcpServer {
18 info: StreamInfo,
19 send_buffer: Arc<SendBuffer>,
20 shutdown: Arc<AtomicBool>,
21}
22
/// Handle returned by `TcpServer::start`: the bound ports plus the shutdown flag.
///
/// Cloning the handle clones the `Arc`, so every clone controls the same flag.
#[derive(Debug, Clone)]
pub struct TcpPorts {
    /// Port of the IPv4 listener.
    pub v4_port: u16,
    /// Port of the IPv6 listener, or `0` when no IPv6 listener is running
    /// (IPv6 disabled in the configuration, or the listener could not be set up).
    pub v6_port: u16,
    /// Store `true` here to ask the accept loops and active stream feeds to stop.
    pub shutdown: Arc<AtomicBool>,
}
29
30impl TcpServer {
31 pub fn start(info: StreamInfo, send_buffer: Arc<SendBuffer>, chunk_size: i32) -> TcpPorts {
34 let shutdown = Arc::new(AtomicBool::new(false));
35
36 let server = Arc::new(TcpServer {
37 info: info.clone(),
38 send_buffer,
39 shutdown: shutdown.clone(),
40 });
41
42 let v4_port = {
44 let listener = crate::RUNTIME
45 .block_on(async { TcpListener::bind("0.0.0.0:0").await })
46 .expect("Failed to bind TCPv4 server");
47 let port = listener.local_addr().unwrap().port();
48 let srv = server.clone();
49 crate::RUNTIME.spawn(async move {
50 Self::accept_loop(listener, srv, chunk_size).await;
51 });
52 port
53 };
54
55 let v6_port = if crate::config::CONFIG.allow_ipv6 {
57 match crate::RUNTIME.block_on(async { TcpListener::bind("[::]:0").await }) {
58 Ok(listener) => {
59 let port = listener.local_addr().unwrap().port();
60 let srv = server.clone();
61 crate::RUNTIME.spawn(async move {
62 Self::accept_loop(listener, srv, chunk_size).await;
63 });
64 port
65 }
66 Err(_) => 0,
67 }
68 } else {
69 0
70 };
71
72 TcpPorts {
73 v4_port,
74 v6_port,
75 shutdown,
76 }
77 }
78
79 async fn accept_loop(listener: TcpListener, server: Arc<TcpServer>, chunk_size: i32) {
80 loop {
81 tokio::select! {
82 result = listener.accept() => {
83 match result {
84 Ok((stream, _addr)) => {
85 let srv = server.clone();
86 let cs = chunk_size;
87 tokio::spawn(async move {
88 let _ = srv.handle_connection(stream, cs).await;
89 });
90 }
91 Err(_) => {
92 if server.shutdown.load(Ordering::Relaxed) { break; }
93 }
94 }
95 }
96 _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
97 if server.shutdown.load(Ordering::Relaxed) { break; }
98 }
99 }
100 }
101 }
102
103 async fn handle_connection(&self, stream: TcpStream, chunk_size: i32) -> std::io::Result<()> {
104 stream.set_nodelay(true)?;
105 let mut reader = BufReader::new(stream);
106
107 let mut command = String::new();
109 reader.read_line(&mut command).await?;
110 let command = command.trim().to_string();
111
112 if command == "LSL:shortinfo" {
113 self.handle_shortinfo(&mut reader).await
114 } else if command == "LSL:fullinfo" {
115 self.handle_fullinfo(&mut reader).await
116 } else if command.starts_with("LSL:streamfeed") {
117 self.handle_streamfeed(&mut reader, &command, chunk_size)
118 .await
119 } else {
120 Ok(())
121 }
122 }
123
124 async fn handle_shortinfo(&self, reader: &mut BufReader<TcpStream>) -> std::io::Result<()> {
125 let mut query = String::new();
126 reader.read_line(&mut query).await?;
127 let query = query.trim().to_string();
128
129 if self.info.matches_query(&query) {
130 let msg = self.info.to_shortinfo_message();
131 reader.get_mut().write_all(msg.as_bytes()).await?;
132 }
133 Ok(())
134 }
135
136 async fn handle_fullinfo(&self, reader: &mut BufReader<TcpStream>) -> std::io::Result<()> {
137 let msg = self.info.to_fullinfo_message();
138 reader.get_mut().write_all(msg.as_bytes()).await?;
139 Ok(())
140 }
141
142 async fn handle_streamfeed(
143 &self,
144 reader: &mut BufReader<TcpStream>,
145 command: &str,
146 chunk_size: i32,
147 ) -> std::io::Result<()> {
148 let mut max_buffered = 360;
149 let mut max_chunklen = 0;
150 let mut _request_uid = String::new();
151 let mut data_protocol_version = 110;
152
153 if command.starts_with("LSL:streamfeed/") {
154 let parts: Vec<&str> = command.split_whitespace().collect();
156 if let Some(ver_str) = parts
157 .first()
158 .and_then(|s| s.strip_prefix("LSL:streamfeed/"))
159 {
160 data_protocol_version = ver_str.parse().unwrap_or(110);
161 }
162 if parts.len() > 1 {
163 _request_uid = parts[1].to_string();
164 }
165
166 loop {
168 let mut line = String::new();
169 reader.read_line(&mut line).await?;
170 let trimmed = line.trim().to_string();
171 if trimmed.is_empty() {
172 break;
173 }
174 let line = trimmed;
175 if let Some(colon) = line.find(':') {
176 let key = line[..colon].trim().to_lowercase();
177 let val = line[colon + 1..].trim().to_string();
178 match key.as_str() {
179 "max-buffer-length" => {
180 max_buffered = val.parse().unwrap_or(360);
181 }
182 "max-chunk-length" => {
183 max_chunklen = val.parse().unwrap_or(0);
184 }
185 _ => {}
186 }
187 }
188 }
189
190 let response = format!(
192 "LSL/{} 200 OK\r\nUID: {}\r\nByte-Order: 1234\r\nSuppress-Subnormals: 0\r\nData-Protocol-Version: {}\r\n\r\n",
193 CONFIG.use_protocol_version,
194 self.info.uid(),
195 data_protocol_version
196 );
197 reader.get_mut().write_all(response.as_bytes()).await?;
198 reader.get_mut().flush().await?;
199 } else {
200 let mut params = String::new();
202 reader.read_line(&mut params).await?;
203 let parts: Vec<&str> = params.split_whitespace().collect();
204 if parts.len() >= 2 {
205 max_buffered = parts[0].parse().unwrap_or(360);
206 max_chunklen = parts[1].parse().unwrap_or(0);
207 }
208 }
209
210 let fmt = self.info.channel_format();
212 let nch = self.info.channel_count();
213 for test_offset in [4, 2] {
214 let mut test_sample = Sample::new(fmt, nch, 0.0);
215 test_sample.assign_test_pattern(test_offset);
216 let mut buf = Vec::new();
217 if data_protocol_version >= 110 {
218 test_sample.serialize_110(&mut buf);
219 } else {
220 test_sample.serialize_100(&mut buf);
221 }
222 reader.get_mut().write_all(&buf).await?;
223 }
224 reader.get_mut().flush().await?;
225
226 if max_buffered <= 0 {
227 return Ok(());
228 }
229
230 let consumer = self.send_buffer.new_consumer(max_buffered as usize);
232
233 let effective_chunk = if max_chunklen > 0 {
234 max_chunklen
235 } else if chunk_size > 0 {
236 chunk_size
237 } else {
238 i32::MAX
239 };
240
241 let mut chunk_count = 0;
243 let mut chunk_buf = Vec::with_capacity(4096);
244
245 loop {
246 if self.shutdown.load(Ordering::Relaxed) {
247 break;
248 }
249
250 match consumer.recv_timeout(std::time::Duration::from_millis(100)) {
251 Ok(Some(sample)) => {
252 if data_protocol_version >= 110 {
253 sample.serialize_110(&mut chunk_buf);
254 } else {
255 sample.serialize_100(&mut chunk_buf);
256 }
257 chunk_count += 1;
258
259 if sample.pushthrough || chunk_count >= effective_chunk {
260 if reader.get_mut().write_all(&chunk_buf).await.is_err() {
261 break;
262 }
263 chunk_buf.clear();
264 chunk_count = 0;
265 }
266 }
267 Ok(None) => break, Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
269 if !chunk_buf.is_empty() {
271 if reader.get_mut().write_all(&chunk_buf).await.is_err() {
272 break;
273 }
274 chunk_buf.clear();
275 chunk_count = 0;
276 }
277 }
278 Err(crossbeam_channel::RecvTimeoutError::Disconnected) => break,
279 }
280 }
281
282 Ok(())
283 }
284}