Skip to main content

lsl_core/
udp_server.rs

1//! UDP service responder for a stream outlet.
2//!
3//! Handles:
4//! - LSL:shortinfo queries (discovery)
5//! - LSL:timedata queries (time synchronization)
6//!
7//! Supports both IPv4 and IPv6.
8
9use crate::clock::local_clock;
10use crate::config::CONFIG;
11use crate::stream_info::StreamInfo;
12use socket2::{Domain, Protocol, Socket, Type};
13use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::Arc;
16use tokio::net::UdpSocket;
17
18pub struct UdpServer;
19
20impl UdpServer {
21    /// Start the unicast UDP service (time sync + shortinfo on a dedicated port).
22    /// Binds on both IPv4 and IPv6. Returns (v4_port, v6_port).
23    pub fn start_unicast(info: StreamInfo, shutdown: Arc<AtomicBool>) -> (u16, u16) {
24        // --- IPv4 unicast ---
25        let v4_port = {
26            let socket = crate::RUNTIME.block_on(async {
27                UdpSocket::bind("0.0.0.0:0")
28                    .await
29                    .expect("Failed to bind UDPv4 service socket")
30            });
31            let port = socket.local_addr().unwrap().port();
32            let shortinfo = info.to_shortinfo_message();
33            let info_clone = info.clone();
34            let shutdown = shutdown.clone();
35
36            crate::RUNTIME.spawn(async move {
37                run_unicast_loop(socket, &info_clone, &shortinfo, &shutdown).await;
38            });
39            port
40        };
41
42        // --- IPv6 unicast ---
43        let v6_port = if CONFIG.allow_ipv6 {
44            match crate::RUNTIME.block_on(async { UdpSocket::bind("[::]:0").await }) {
45                Ok(socket) => {
46                    let port = socket.local_addr().unwrap().port();
47                    let shortinfo = info.to_shortinfo_message();
48                    let info_clone = info.clone();
49                    let shutdown = shutdown.clone();
50
51                    crate::RUNTIME.spawn(async move {
52                        run_unicast_loop(socket, &info_clone, &shortinfo, &shutdown).await;
53                    });
54                    port
55                }
56                Err(_) => 0,
57            }
58        } else {
59            0
60        };
61
62        (v4_port, v6_port)
63    }
64
65    /// Start multicast/broadcast responders on the multicast port.
66    /// Creates listeners for both IPv4 and IPv6 multicast groups.
67    pub fn start_multicast(info: StreamInfo, shutdown: Arc<AtomicBool>) {
68        let shortinfo = info.to_shortinfo_message();
69
70        for &addr in &CONFIG.multicast_addresses {
71            // Skip IPv6 addresses if disabled
72            if addr.is_ipv6() && !CONFIG.allow_ipv6 {
73                continue;
74            }
75
76            let shortinfo = shortinfo.clone();
77            let info = info.clone();
78            let shutdown = shutdown.clone();
79
80            crate::RUNTIME.spawn(async move {
81                let socket = match create_multicast_listener(addr, CONFIG.multicast_port).await {
82                    Ok(s) => s,
83                    Err(_) => return,
84                };
85
86                let mut buf = vec![0u8; 65536];
87                loop {
88                    if shutdown.load(Ordering::Relaxed) { break; }
89                    tokio::select! {
90                        result = socket.recv_from(&mut buf) => {
91                            if let Ok((len, peer_addr)) = result {
92                                let msg = std::str::from_utf8(&buf[..len]).unwrap_or("");
93                                let mut lines = msg.lines();
94                                let method = lines.next().unwrap_or("").trim();
95
96                                if method == "LSL:shortinfo" {
97                                    let query = lines.next().unwrap_or("").trim().to_string();
98                                    let params_line = lines.next().unwrap_or("").trim().to_string();
99                                    let parts: Vec<&str> = params_line.split_whitespace().collect();
100                                    let return_port: u16 = parts.first().and_then(|s| s.parse().ok()).unwrap_or(0);
101                                    let query_id = parts.get(1).unwrap_or(&"").to_string();
102
103                                    if info.matches_query(&query) {
104                                        let reply = format!("{}\r\n{}", query_id, shortinfo);
105                                        let return_addr = SocketAddr::new(peer_addr.ip(), return_port);
106                                        let _ = socket.send_to(reply.as_bytes(), return_addr).await;
107                                    }
108                                }
109                            }
110                        }
111                        _ = tokio::time::sleep(std::time::Duration::from_millis(200)) => {
112                            if shutdown.load(Ordering::Relaxed) { break; }
113                        }
114                    }
115                }
116            });
117        }
118    }
119}
120
121// ── Shared unicast handler loop ──────────────────────────────────────
122
123async fn run_unicast_loop(
124    socket: UdpSocket,
125    info: &StreamInfo,
126    shortinfo: &str,
127    shutdown: &Arc<AtomicBool>,
128) {
129    let mut buf = vec![0u8; 65536];
130    loop {
131        if shutdown.load(Ordering::Relaxed) {
132            break;
133        }
134        tokio::select! {
135            result = socket.recv_from(&mut buf) => {
136                if let Ok((len, addr)) = result {
137                    let msg = std::str::from_utf8(&buf[..len]).unwrap_or("");
138                    let mut lines = msg.lines();
139                    let method = lines.next().unwrap_or("").trim();
140
141                    if method == "LSL:shortinfo" {
142                        let query = lines.next().unwrap_or("").trim().to_string();
143                        let params_line = lines.next().unwrap_or("").trim().to_string();
144                        let parts: Vec<&str> = params_line.split_whitespace().collect();
145                        let return_port: u16 = parts.first().and_then(|s| s.parse().ok()).unwrap_or(0);
146                        let query_id = parts.get(1).unwrap_or(&"").to_string();
147
148                        if info.matches_query(&query) {
149                            let reply = format!("{}\r\n{}", query_id, shortinfo);
150                            let return_addr = SocketAddr::new(addr.ip(), return_port);
151                            let _ = socket.send_to(reply.as_bytes(), return_addr).await;
152                        }
153                    } else if method == "LSL:timedata" {
154                        let t1 = local_clock();
155                        let params = lines.next().unwrap_or("").trim().to_string();
156                        let parts: Vec<&str> = params.split_whitespace().collect();
157                        let wave_id = parts.first().unwrap_or(&"0");
158                        let t0 = parts.get(1).unwrap_or(&"0");
159                        let t2 = local_clock();
160                        let reply = format!(" {} {} {} {}", wave_id, t0, t1, t2);
161                        let _ = socket.send_to(reply.as_bytes(), addr).await;
162                    }
163                }
164            }
165            _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
166                if shutdown.load(Ordering::Relaxed) { break; }
167            }
168        }
169    }
170}
171
172// ── Multicast socket helpers ─────────────────────────────────────────
173
174async fn create_multicast_listener(addr: IpAddr, port: u16) -> std::io::Result<UdpSocket> {
175    match addr {
176        IpAddr::V4(v4) => create_multicast_listener_v4(v4, port).await,
177        IpAddr::V6(v6) => create_multicast_listener_v6(v6, port).await,
178    }
179}
180
181async fn create_multicast_listener_v4(addr: Ipv4Addr, port: u16) -> std::io::Result<UdpSocket> {
182    let socket2 = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
183    socket2.set_reuse_address(true)?;
184    #[cfg(unix)]
185    socket2.set_reuse_port(true)?;
186
187    let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);
188    socket2.bind(&bind_addr.into())?;
189
190    if addr.is_multicast() {
191        let _ = socket2.join_multicast_v4(&addr, &Ipv4Addr::UNSPECIFIED);
192        let _ = socket2.set_multicast_ttl_v4(CONFIG.multicast_ttl);
193    }
194
195    socket2.set_nonblocking(true)?;
196    let std_socket: std::net::UdpSocket = socket2.into();
197    UdpSocket::from_std(std_socket)
198}
199
200async fn create_multicast_listener_v6(addr: Ipv6Addr, port: u16) -> std::io::Result<UdpSocket> {
201    let socket2 = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?;
202    socket2.set_reuse_address(true)?;
203    #[cfg(unix)]
204    socket2.set_reuse_port(true)?;
205    // Don't use dual-stack on the multicast listener — we have separate v4 listeners.
206    let _ = socket2.set_only_v6(true);
207
208    let bind_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port);
209    socket2.bind(&bind_addr.into())?;
210
211    if is_ipv6_multicast(&addr) {
212        // interface 0 = all interfaces
213        let _ = socket2.join_multicast_v6(&addr, 0);
214        let _ = socket2.set_multicast_hops_v6(CONFIG.multicast_ttl);
215    }
216
217    socket2.set_nonblocking(true)?;
218    let std_socket: std::net::UdpSocket = socket2.into();
219    UdpSocket::from_std(std_socket)
220}
221
/// Returns `true` when `addr` is an IPv6 multicast address, i.e. lies in
/// `ff00::/8` (the top eight bits of the first segment are all ones).
///
/// Delegates to the standard library's `Ipv6Addr::is_multicast`, which
/// performs the same prefix check as the previous hand-written mask.
fn is_ipv6_multicast(addr: &Ipv6Addr) -> bool {
    addr.is_multicast()
}