Skip to main content

lsl_core/
tcp_server.rs

1//! TCP data server for a stream outlet.
2//!
3//! Implements the liblsl TCP protocol:
4//! - LSL:shortinfo - returns shortinfo XML if query matches
5//! - LSL:fullinfo  - returns full info XML
6//! - LSL:streamfeed/110 - negotiates and streams samples
7
8use crate::config::CONFIG;
9use crate::sample::Sample;
10use crate::send_buffer::SendBuffer;
11use crate::stream_info::StreamInfo;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::Arc;
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15use tokio::net::{TcpListener, TcpStream};
16
17pub struct TcpServer {
18    info: StreamInfo,
19    send_buffer: Arc<SendBuffer>,
20    shutdown: Arc<AtomicBool>,
21}
22
23/// Ports returned by `TcpServer::start`.
24pub struct TcpPorts {
25    pub v4_port: u16,
26    pub v6_port: u16,
27    pub shutdown: Arc<AtomicBool>,
28}
29
30impl TcpServer {
31    /// Start the TCP server on both IPv4 and IPv6.
32    /// Returns ports and a shared shutdown handle.
33    pub fn start(info: StreamInfo, send_buffer: Arc<SendBuffer>, chunk_size: i32) -> TcpPorts {
34        let shutdown = Arc::new(AtomicBool::new(false));
35
36        let server = Arc::new(TcpServer {
37            info: info.clone(),
38            send_buffer,
39            shutdown: shutdown.clone(),
40        });
41
42        // --- IPv4 listener ---
43        let v4_port = {
44            let listener = crate::RUNTIME
45                .block_on(async { TcpListener::bind("0.0.0.0:0").await })
46                .expect("Failed to bind TCPv4 server");
47            let port = listener.local_addr().unwrap().port();
48            let srv = server.clone();
49            crate::RUNTIME.spawn(async move {
50                Self::accept_loop(listener, srv, chunk_size).await;
51            });
52            port
53        };
54
55        // --- IPv6 listener ---
56        let v6_port = if crate::config::CONFIG.allow_ipv6 {
57            match crate::RUNTIME.block_on(async { TcpListener::bind("[::]:0").await }) {
58                Ok(listener) => {
59                    let port = listener.local_addr().unwrap().port();
60                    let srv = server.clone();
61                    crate::RUNTIME.spawn(async move {
62                        Self::accept_loop(listener, srv, chunk_size).await;
63                    });
64                    port
65                }
66                Err(_) => 0,
67            }
68        } else {
69            0
70        };
71
72        TcpPorts {
73            v4_port,
74            v6_port,
75            shutdown,
76        }
77    }
78
79    async fn accept_loop(listener: TcpListener, server: Arc<TcpServer>, chunk_size: i32) {
80        loop {
81            tokio::select! {
82                result = listener.accept() => {
83                    match result {
84                        Ok((stream, _addr)) => {
85                            let srv = server.clone();
86                            let cs = chunk_size;
87                            tokio::spawn(async move {
88                                let _ = srv.handle_connection(stream, cs).await;
89                            });
90                        }
91                        Err(_) => {
92                            if server.shutdown.load(Ordering::Relaxed) { break; }
93                        }
94                    }
95                }
96                _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
97                    if server.shutdown.load(Ordering::Relaxed) { break; }
98                }
99            }
100        }
101    }
102
103    async fn handle_connection(&self, stream: TcpStream, chunk_size: i32) -> std::io::Result<()> {
104        stream.set_nodelay(true)?;
105        let mut reader = BufReader::new(stream);
106
107        // Read the command line
108        let mut command = String::new();
109        reader.read_line(&mut command).await?;
110        let command = command.trim().to_string();
111
112        if command == "LSL:shortinfo" {
113            self.handle_shortinfo(&mut reader).await
114        } else if command == "LSL:fullinfo" {
115            self.handle_fullinfo(&mut reader).await
116        } else if command.starts_with("LSL:streamfeed") {
117            self.handle_streamfeed(&mut reader, &command, chunk_size)
118                .await
119        } else {
120            Ok(())
121        }
122    }
123
124    async fn handle_shortinfo(&self, reader: &mut BufReader<TcpStream>) -> std::io::Result<()> {
125        let mut query = String::new();
126        reader.read_line(&mut query).await?;
127        let query = query.trim().to_string();
128
129        if self.info.matches_query(&query) {
130            let msg = self.info.to_shortinfo_message();
131            reader.get_mut().write_all(msg.as_bytes()).await?;
132        }
133        Ok(())
134    }
135
136    async fn handle_fullinfo(&self, reader: &mut BufReader<TcpStream>) -> std::io::Result<()> {
137        let msg = self.info.to_fullinfo_message();
138        reader.get_mut().write_all(msg.as_bytes()).await?;
139        Ok(())
140    }
141
142    async fn handle_streamfeed(
143        &self,
144        reader: &mut BufReader<TcpStream>,
145        command: &str,
146        chunk_size: i32,
147    ) -> std::io::Result<()> {
148        let mut max_buffered = 360;
149        let mut max_chunklen = 0;
150        let mut _request_uid = String::new();
151        let mut data_protocol_version = 110;
152
153        if command.starts_with("LSL:streamfeed/") {
154            // Parse version and optional UID from command line
155            let parts: Vec<&str> = command.split_whitespace().collect();
156            if let Some(ver_str) = parts
157                .first()
158                .and_then(|s| s.strip_prefix("LSL:streamfeed/"))
159            {
160                data_protocol_version = ver_str.parse().unwrap_or(110);
161            }
162            if parts.len() > 1 {
163                _request_uid = parts[1].to_string();
164            }
165
166            // Read feed parameters (key: value headers until empty line)
167            loop {
168                let mut line = String::new();
169                reader.read_line(&mut line).await?;
170                let trimmed = line.trim().to_string();
171                if trimmed.is_empty() {
172                    break;
173                }
174                let line = trimmed;
175                if let Some(colon) = line.find(':') {
176                    let key = line[..colon].trim().to_lowercase();
177                    let val = line[colon + 1..].trim().to_string();
178                    match key.as_str() {
179                        "max-buffer-length" => {
180                            max_buffered = val.parse().unwrap_or(360);
181                        }
182                        "max-chunk-length" => {
183                            max_chunklen = val.parse().unwrap_or(0);
184                        }
185                        _ => {}
186                    }
187                }
188            }
189
190            // Send response
191            let response = format!(
192                "LSL/{} 200 OK\r\nUID: {}\r\nByte-Order: 1234\r\nSuppress-Subnormals: 0\r\nData-Protocol-Version: {}\r\n\r\n",
193                CONFIG.use_protocol_version,
194                self.info.uid(),
195                data_protocol_version
196            );
197            reader.get_mut().write_all(response.as_bytes()).await?;
198            reader.get_mut().flush().await?;
199        } else {
200            // Protocol 1.00 fallback - read two integers
201            let mut params = String::new();
202            reader.read_line(&mut params).await?;
203            let parts: Vec<&str> = params.split_whitespace().collect();
204            if parts.len() >= 2 {
205                max_buffered = parts[0].parse().unwrap_or(360);
206                max_chunklen = parts[1].parse().unwrap_or(0);
207            }
208        }
209
210        // Send test pattern samples using the negotiated protocol version
211        let fmt = self.info.channel_format();
212        let nch = self.info.channel_count();
213        for test_offset in [4, 2] {
214            let mut test_sample = Sample::new(fmt, nch, 0.0);
215            test_sample.assign_test_pattern(test_offset);
216            let mut buf = Vec::new();
217            if data_protocol_version >= 110 {
218                test_sample.serialize_110(&mut buf);
219            } else {
220                test_sample.serialize_100(&mut buf);
221            }
222            reader.get_mut().write_all(&buf).await?;
223        }
224        reader.get_mut().flush().await?;
225
226        if max_buffered <= 0 {
227            return Ok(());
228        }
229
230        // Subscribe to the send buffer
231        let consumer = self.send_buffer.new_consumer(max_buffered as usize);
232
233        let effective_chunk = if max_chunklen > 0 {
234            max_chunklen
235        } else if chunk_size > 0 {
236            chunk_size
237        } else {
238            i32::MAX
239        };
240
241        // Stream samples
242        let mut chunk_count = 0;
243        let mut chunk_buf = Vec::with_capacity(4096);
244
245        loop {
246            if self.shutdown.load(Ordering::Relaxed) {
247                break;
248            }
249
250            match consumer.recv_timeout(std::time::Duration::from_millis(100)) {
251                Ok(Some(sample)) => {
252                    if data_protocol_version >= 110 {
253                        sample.serialize_110(&mut chunk_buf);
254                    } else {
255                        sample.serialize_100(&mut chunk_buf);
256                    }
257                    chunk_count += 1;
258
259                    if sample.pushthrough || chunk_count >= effective_chunk {
260                        if reader.get_mut().write_all(&chunk_buf).await.is_err() {
261                            break;
262                        }
263                        chunk_buf.clear();
264                        chunk_count = 0;
265                    }
266                }
267                Ok(None) => break, // sentinel
268                Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
269                    // Flush any pending data
270                    if !chunk_buf.is_empty() {
271                        if reader.get_mut().write_all(&chunk_buf).await.is_err() {
272                            break;
273                        }
274                        chunk_buf.clear();
275                        chunk_count = 0;
276                    }
277                }
278                Err(crossbeam_channel::RecvTimeoutError::Disconnected) => break,
279            }
280        }
281
282        Ok(())
283    }
284}