1use crate::config::CONFIG;
7use crate::stream_info::StreamInfo;
8use std::collections::HashMap;
9use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
10use std::time::Duration;
11use tokio::net::UdpSocket;
12
13pub fn resolve_all(wait_time: f64) -> Vec<StreamInfo> {
15 resolve_query("", 0, wait_time)
16}
17
18pub fn resolve_by_property(prop: &str, value: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
20 let query = if value.is_empty() {
21 String::new()
22 } else {
23 format!("{}='{}'", prop, value)
24 };
25 resolve_query(&query, minimum, timeout)
26}
27
28pub fn resolve_by_predicate(pred: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
30 resolve_query(pred, minimum, timeout)
31}
32
33pub fn resolve_query(query: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
35 let query = query.to_string();
38 let (tx, rx) = std::sync::mpsc::channel();
39 crate::RUNTIME.spawn(async move {
40 let result = resolve_query_async(&query, minimum, timeout).await;
41 let _ = tx.send(result);
42 });
43 let deadline = std::time::Duration::from_secs_f64(timeout + 2.0);
44 rx.recv_timeout(deadline).unwrap_or_default()
45}
46
47async fn resolve_query_async(query: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
48 let wait = Duration::from_secs_f64(timeout.max(0.01));
49
50 let v4_recv = UdpSocket::bind("0.0.0.0:0").await.ok();
52 let v4_port = v4_recv
53 .as_ref()
54 .map(|s| s.local_addr().unwrap().port())
55 .unwrap_or(0);
56
57 let v6_recv = if CONFIG.allow_ipv6 {
59 UdpSocket::bind("[::]:0").await.ok()
60 } else {
61 None
62 };
63 let v6_port = v6_recv
64 .as_ref()
65 .map(|s| s.local_addr().unwrap().port())
66 .unwrap_or(0);
67
68 let query_id = format!("{}", fxhash::hash32(query));
70
71 let mut targets: Vec<(SocketAddr, String)> = Vec::new();
73
74 for &addr in &CONFIG.multicast_addresses {
75 let ret_port = if addr.is_ipv6() { v6_port } else { v4_port };
76 if ret_port == 0 {
77 continue;
78 }
79 let msg = format!(
80 "LSL:shortinfo\r\n{}\r\n{} {}\r\n",
81 query, ret_port, query_id
82 );
83 targets.push((SocketAddr::new(addr, CONFIG.multicast_port), msg));
84 }
85
86 {
88 let msg = format!("LSL:shortinfo\r\n{}\r\n{} {}\r\n", query, v4_port, query_id);
89 for port in CONFIG.base_port..CONFIG.base_port + CONFIG.port_range {
90 targets.push((
91 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
92 msg.clone(),
93 ));
94 }
95 }
96
97 if CONFIG.allow_ipv6 && v6_port != 0 {
99 let msg = format!("LSL:shortinfo\r\n{}\r\n{} {}\r\n", query, v6_port, query_id);
100 for port in CONFIG.base_port..CONFIG.base_port + CONFIG.port_range {
101 targets.push((
102 SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port),
103 msg.clone(),
104 ));
105 }
106 }
107
108 let end = tokio::time::Instant::now() + wait;
110 let mut results: HashMap<String, StreamInfo> = HashMap::new();
111 let mut wave_interval = Duration::from_millis(500);
112 let mut next_wave = tokio::time::Instant::now();
113
114 let mut buf4 = vec![0u8; 65536];
115 let mut buf6 = vec![0u8; 65536];
116
117 let v4_send = UdpSocket::bind("0.0.0.0:0").await.ok();
119 if let Some(ref s) = v4_send {
120 let _ = s.set_broadcast(true);
121 }
122
123 let v6_send = if CONFIG.allow_ipv6 {
124 UdpSocket::bind("[::]:0").await.ok()
125 } else {
126 None
127 };
128
129 loop {
130 let now = tokio::time::Instant::now();
131 if now >= end {
132 break;
133 }
134 if minimum > 0 && results.len() >= minimum as usize {
135 break;
136 }
137
138 if now >= next_wave {
140 for (target, msg) in &targets {
141 let sock = if target.is_ipv4() { &v4_send } else { &v6_send };
142 if let Some(s) = sock {
143 let _ = s.send_to(msg.as_bytes(), target).await;
144 }
145 }
146 next_wave = now + wave_interval;
147 wave_interval = (wave_interval * 2).min(Duration::from_secs(3));
148 }
149
150 let remaining = end.saturating_duration_since(tokio::time::Instant::now());
152 let recv_timeout = remaining.min(Duration::from_millis(100));
153
154 tokio::select! {
155 result = async {
157 match &v4_recv {
158 Some(s) => s.recv_from(&mut buf4).await,
159 None => std::future::pending().await,
160 }
161 } => {
162 if let Ok((len, addr)) = result {
163 if let Some(info) = parse_reply(&buf4[..len], &query_id) {
164 let uid = info.uid();
165 if !uid.is_empty() {
166 if info.v4address().is_empty() {
168 info.set_v4address(&addr.ip().to_string());
169 }
170 results.entry(uid).or_insert(info);
171 }
172 }
173 }
174 }
175 result = async {
177 match &v6_recv {
178 Some(s) => s.recv_from(&mut buf6).await,
179 None => std::future::pending().await,
180 }
181 } => {
182 if let Ok((len, addr)) = result {
183 if let Some(info) = parse_reply(&buf6[..len], &query_id) {
184 let uid = info.uid();
185 if !uid.is_empty() {
186 if info.v6address().is_empty() {
187 info.set_v6address(&addr.ip().to_string());
188 }
189 results.entry(uid).or_insert(info);
190 }
191 }
192 }
193 }
194 _ = tokio::time::sleep(recv_timeout) => {}
195 }
196 }
197
198 results.into_values().collect()
199}
200
201fn parse_reply(data: &[u8], expected_id: &str) -> Option<StreamInfo> {
202 if let Some(newline_pos) = data.iter().position(|&b| b == b'\n') {
203 let returned_id = std::str::from_utf8(&data[..newline_pos])
204 .unwrap_or("")
205 .trim();
206 if returned_id == expected_id {
207 let xml = std::str::from_utf8(&data[newline_pos + 1..]).unwrap_or("");
208 return StreamInfo::from_shortinfo_message(xml);
209 }
210 }
211 None
212}
213
214pub struct ContinuousResolver {
216 results: std::sync::Arc<parking_lot::Mutex<Vec<StreamInfo>>>,
217 shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
218}
219
220impl ContinuousResolver {
221 pub fn new(query: &str, _forget_after: f64) -> Self {
222 let results = std::sync::Arc::new(parking_lot::Mutex::new(Vec::new()));
223 let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
224
225 let results_clone = results.clone();
226 let shutdown_clone = shutdown.clone();
227 let query = query.to_string();
228
229 std::thread::spawn(move || {
230 while !shutdown_clone.load(std::sync::atomic::Ordering::Relaxed) {
231 let found = resolve_query(&query, 0, 1.0);
232 *results_clone.lock() = found;
233 std::thread::sleep(Duration::from_secs_f64(0.5));
234 }
235 });
236
237 ContinuousResolver { results, shutdown }
238 }
239
240 pub fn results(&self) -> Vec<StreamInfo> {
241 self.results.lock().clone()
242 }
243}
244
245impl Drop for ContinuousResolver {
246 fn drop(&mut self) {
247 self.shutdown
248 .store(true, std::sync::atomic::Ordering::Relaxed);
249 }
250}