1use crate::clock::local_clock;
10use crate::config::CONFIG;
11use crate::stream_info::StreamInfo;
12use socket2::{Domain, Protocol, Socket, Type};
13use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::Arc;
16use tokio::net::UdpSocket;
17
18pub struct UdpServer;
19
20impl UdpServer {
21 pub fn start_unicast(info: StreamInfo, shutdown: Arc<AtomicBool>) -> (u16, u16) {
24 let v4_port = {
26 let socket = crate::RUNTIME.block_on(async {
27 UdpSocket::bind("0.0.0.0:0")
28 .await
29 .expect("Failed to bind UDPv4 service socket")
30 });
31 let port = socket.local_addr().unwrap().port();
32 let shortinfo = info.to_shortinfo_message();
33 let info_clone = info.clone();
34 let shutdown = shutdown.clone();
35
36 crate::RUNTIME.spawn(async move {
37 run_unicast_loop(socket, &info_clone, &shortinfo, &shutdown).await;
38 });
39 port
40 };
41
42 let v6_port = if CONFIG.allow_ipv6 {
44 match crate::RUNTIME.block_on(async { UdpSocket::bind("[::]:0").await }) {
45 Ok(socket) => {
46 let port = socket.local_addr().unwrap().port();
47 let shortinfo = info.to_shortinfo_message();
48 let info_clone = info.clone();
49 let shutdown = shutdown.clone();
50
51 crate::RUNTIME.spawn(async move {
52 run_unicast_loop(socket, &info_clone, &shortinfo, &shutdown).await;
53 });
54 port
55 }
56 Err(_) => 0,
57 }
58 } else {
59 0
60 };
61
62 (v4_port, v6_port)
63 }
64
65 pub fn start_multicast(info: StreamInfo, shutdown: Arc<AtomicBool>) {
68 let shortinfo = info.to_shortinfo_message();
69
70 for &addr in &CONFIG.multicast_addresses {
71 if addr.is_ipv6() && !CONFIG.allow_ipv6 {
73 continue;
74 }
75
76 let shortinfo = shortinfo.clone();
77 let info = info.clone();
78 let shutdown = shutdown.clone();
79
80 crate::RUNTIME.spawn(async move {
81 let socket = match create_multicast_listener(addr, CONFIG.multicast_port).await {
82 Ok(s) => s,
83 Err(_) => return,
84 };
85
86 let mut buf = vec![0u8; 65536];
87 loop {
88 if shutdown.load(Ordering::Relaxed) { break; }
89 tokio::select! {
90 result = socket.recv_from(&mut buf) => {
91 if let Ok((len, peer_addr)) = result {
92 let msg = std::str::from_utf8(&buf[..len]).unwrap_or("");
93 let mut lines = msg.lines();
94 let method = lines.next().unwrap_or("").trim();
95
96 if method == "LSL:shortinfo" {
97 let query = lines.next().unwrap_or("").trim().to_string();
98 let params_line = lines.next().unwrap_or("").trim().to_string();
99 let parts: Vec<&str> = params_line.split_whitespace().collect();
100 let return_port: u16 = parts.first().and_then(|s| s.parse().ok()).unwrap_or(0);
101 let query_id = parts.get(1).unwrap_or(&"").to_string();
102
103 if info.matches_query(&query) {
104 let reply = format!("{}\r\n{}", query_id, shortinfo);
105 let return_addr = SocketAddr::new(peer_addr.ip(), return_port);
106 let _ = socket.send_to(reply.as_bytes(), return_addr).await;
107 }
108 }
109 }
110 }
111 _ = tokio::time::sleep(std::time::Duration::from_millis(200)) => {
112 if shutdown.load(Ordering::Relaxed) { break; }
113 }
114 }
115 }
116 });
117 }
118 }
119}
120
121async fn run_unicast_loop(
124 socket: UdpSocket,
125 info: &StreamInfo,
126 shortinfo: &str,
127 shutdown: &Arc<AtomicBool>,
128) {
129 let mut buf = vec![0u8; 65536];
130 loop {
131 if shutdown.load(Ordering::Relaxed) {
132 break;
133 }
134 tokio::select! {
135 result = socket.recv_from(&mut buf) => {
136 if let Ok((len, addr)) = result {
137 let msg = std::str::from_utf8(&buf[..len]).unwrap_or("");
138 let mut lines = msg.lines();
139 let method = lines.next().unwrap_or("").trim();
140
141 if method == "LSL:shortinfo" {
142 let query = lines.next().unwrap_or("").trim().to_string();
143 let params_line = lines.next().unwrap_or("").trim().to_string();
144 let parts: Vec<&str> = params_line.split_whitespace().collect();
145 let return_port: u16 = parts.first().and_then(|s| s.parse().ok()).unwrap_or(0);
146 let query_id = parts.get(1).unwrap_or(&"").to_string();
147
148 if info.matches_query(&query) {
149 let reply = format!("{}\r\n{}", query_id, shortinfo);
150 let return_addr = SocketAddr::new(addr.ip(), return_port);
151 let _ = socket.send_to(reply.as_bytes(), return_addr).await;
152 }
153 } else if method == "LSL:timedata" {
154 let t1 = local_clock();
155 let params = lines.next().unwrap_or("").trim().to_string();
156 let parts: Vec<&str> = params.split_whitespace().collect();
157 let wave_id = parts.first().unwrap_or(&"0");
158 let t0 = parts.get(1).unwrap_or(&"0");
159 let t2 = local_clock();
160 let reply = format!(" {} {} {} {}", wave_id, t0, t1, t2);
161 let _ = socket.send_to(reply.as_bytes(), addr).await;
162 }
163 }
164 }
165 _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
166 if shutdown.load(Ordering::Relaxed) { break; }
167 }
168 }
169 }
170}
171
172async fn create_multicast_listener(addr: IpAddr, port: u16) -> std::io::Result<UdpSocket> {
175 match addr {
176 IpAddr::V4(v4) => create_multicast_listener_v4(v4, port).await,
177 IpAddr::V6(v6) => create_multicast_listener_v6(v6, port).await,
178 }
179}
180
181async fn create_multicast_listener_v4(addr: Ipv4Addr, port: u16) -> std::io::Result<UdpSocket> {
182 let socket2 = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
183 socket2.set_reuse_address(true)?;
184 #[cfg(unix)]
185 socket2.set_reuse_port(true)?;
186
187 let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);
188 socket2.bind(&bind_addr.into())?;
189
190 if addr.is_multicast() {
191 let _ = socket2.join_multicast_v4(&addr, &Ipv4Addr::UNSPECIFIED);
192 let _ = socket2.set_multicast_ttl_v4(CONFIG.multicast_ttl);
193 }
194
195 socket2.set_nonblocking(true)?;
196 let std_socket: std::net::UdpSocket = socket2.into();
197 UdpSocket::from_std(std_socket)
198}
199
200async fn create_multicast_listener_v6(addr: Ipv6Addr, port: u16) -> std::io::Result<UdpSocket> {
201 let socket2 = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?;
202 socket2.set_reuse_address(true)?;
203 #[cfg(unix)]
204 socket2.set_reuse_port(true)?;
205 let _ = socket2.set_only_v6(true);
207
208 let bind_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port);
209 socket2.bind(&bind_addr.into())?;
210
211 if is_ipv6_multicast(&addr) {
212 let _ = socket2.join_multicast_v6(&addr, 0);
214 let _ = socket2.set_multicast_hops_v6(CONFIG.multicast_ttl);
215 }
216
217 socket2.set_nonblocking(true)?;
218 let std_socket: std::net::UdpSocket = socket2.into();
219 UdpSocket::from_std(std_socket)
220}
221
/// Returns `true` when `addr` lies in the IPv6 multicast range `ff00::/8`.
///
/// Delegates to the standard library's `Ipv6Addr::is_multicast`, which performs
/// the same check on the leading byte.
fn is_ipv6_multicast(addr: &Ipv6Addr) -> bool {
    addr.is_multicast()
}