Skip to main content

lsl_core/
resolver.rs

1//! Stream resolver: discovers streams on the network via UDP.
2//!
3//! Sends LSL:shortinfo queries over IPv4 and IPv6 multicast/broadcast/unicast,
4//! collects responses, and returns discovered StreamInfo objects.
5
6use crate::config::CONFIG;
7use crate::stream_info::StreamInfo;
8use std::collections::HashMap;
9use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
10use std::time::Duration;
11use tokio::net::UdpSocket;
12
13/// Resolve streams matching a query string.
14pub fn resolve_all(wait_time: f64) -> Vec<StreamInfo> {
15    resolve_query("", 0, wait_time)
16}
17
18/// Resolve streams by property name and value.
19pub fn resolve_by_property(prop: &str, value: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
20    let query = if value.is_empty() {
21        String::new()
22    } else {
23        format!("{}='{}'", prop, value)
24    };
25    resolve_query(&query, minimum, timeout)
26}
27
28/// Resolve streams by predicate.
29pub fn resolve_by_predicate(pred: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
30    resolve_query(pred, minimum, timeout)
31}
32
33/// Core resolve function.
34pub fn resolve_query(query: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
35    // Spawn on the RUNTIME and wait via channel (avoids block_on deadlock
36    // when called from a thread that's already inside the RUNTIME).
37    let query = query.to_string();
38    let (tx, rx) = std::sync::mpsc::channel();
39    crate::RUNTIME.spawn(async move {
40        let result = resolve_query_async(&query, minimum, timeout).await;
41        let _ = tx.send(result);
42    });
43    let deadline = std::time::Duration::from_secs_f64(timeout + 2.0);
44    rx.recv_timeout(deadline).unwrap_or_default()
45}
46
47async fn resolve_query_async(query: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
48    let wait = Duration::from_secs_f64(timeout.max(0.01));
49
50    // Create IPv4 receiver socket
51    let v4_recv = UdpSocket::bind("0.0.0.0:0").await.ok();
52    let v4_port = v4_recv
53        .as_ref()
54        .map(|s| s.local_addr().unwrap().port())
55        .unwrap_or(0);
56
57    // Create IPv6 receiver socket
58    let v6_recv = if CONFIG.allow_ipv6 {
59        UdpSocket::bind("[::]:0").await.ok()
60    } else {
61        None
62    };
63    let v6_port = v6_recv
64        .as_ref()
65        .map(|s| s.local_addr().unwrap().port())
66        .unwrap_or(0);
67
68    // Build query message. For IPv6 targets we use v6_port, for v4 we use v4_port.
69    let query_id = format!("{}", fxhash::hash32(query));
70
71    // Build target list: (address, message)
72    let mut targets: Vec<(SocketAddr, String)> = Vec::new();
73
74    for &addr in &CONFIG.multicast_addresses {
75        let ret_port = if addr.is_ipv6() { v6_port } else { v4_port };
76        if ret_port == 0 {
77            continue;
78        }
79        let msg = format!(
80            "LSL:shortinfo\r\n{}\r\n{} {}\r\n",
81            query, ret_port, query_id
82        );
83        targets.push((SocketAddr::new(addr, CONFIG.multicast_port), msg));
84    }
85
86    // Unicast to base ports (IPv4)
87    {
88        let msg = format!("LSL:shortinfo\r\n{}\r\n{} {}\r\n", query, v4_port, query_id);
89        for port in CONFIG.base_port..CONFIG.base_port + CONFIG.port_range {
90            targets.push((
91                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
92                msg.clone(),
93            ));
94        }
95    }
96
97    // Unicast to base ports (IPv6 loopback)
98    if CONFIG.allow_ipv6 && v6_port != 0 {
99        let msg = format!("LSL:shortinfo\r\n{}\r\n{} {}\r\n", query, v6_port, query_id);
100        for port in CONFIG.base_port..CONFIG.base_port + CONFIG.port_range {
101            targets.push((
102                SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port),
103                msg.clone(),
104            ));
105        }
106    }
107
108    // Send queries with retry waves
109    let end = tokio::time::Instant::now() + wait;
110    let mut results: HashMap<String, StreamInfo> = HashMap::new();
111    let mut wave_interval = Duration::from_millis(500);
112    let mut next_wave = tokio::time::Instant::now();
113
114    let mut buf4 = vec![0u8; 65536];
115    let mut buf6 = vec![0u8; 65536];
116
117    // Prepare send sockets
118    let v4_send = UdpSocket::bind("0.0.0.0:0").await.ok();
119    if let Some(ref s) = v4_send {
120        let _ = s.set_broadcast(true);
121    }
122
123    let v6_send = if CONFIG.allow_ipv6 {
124        UdpSocket::bind("[::]:0").await.ok()
125    } else {
126        None
127    };
128
129    loop {
130        let now = tokio::time::Instant::now();
131        if now >= end {
132            break;
133        }
134        if minimum > 0 && results.len() >= minimum as usize {
135            break;
136        }
137
138        // Send a wave of queries
139        if now >= next_wave {
140            for (target, msg) in &targets {
141                let sock = if target.is_ipv4() { &v4_send } else { &v6_send };
142                if let Some(s) = sock {
143                    let _ = s.send_to(msg.as_bytes(), target).await;
144                }
145            }
146            next_wave = now + wave_interval;
147            wave_interval = (wave_interval * 2).min(Duration::from_secs(3));
148        }
149
150        // Receive replies from both sockets
151        let remaining = end.saturating_duration_since(tokio::time::Instant::now());
152        let recv_timeout = remaining.min(Duration::from_millis(100));
153
154        tokio::select! {
155            // IPv4 replies
156            result = async {
157                match &v4_recv {
158                    Some(s) => s.recv_from(&mut buf4).await,
159                    None => std::future::pending().await,
160                }
161            } => {
162                if let Ok((len, addr)) = result {
163                    if let Some(info) = parse_reply(&buf4[..len], &query_id) {
164                        let uid = info.uid();
165                        if !uid.is_empty() {
166                            // Populate source address for cross-machine networking
167                            if info.v4address().is_empty() {
168                                info.set_v4address(&addr.ip().to_string());
169                            }
170                            results.entry(uid).or_insert(info);
171                        }
172                    }
173                }
174            }
175            // IPv6 replies
176            result = async {
177                match &v6_recv {
178                    Some(s) => s.recv_from(&mut buf6).await,
179                    None => std::future::pending().await,
180                }
181            } => {
182                if let Ok((len, addr)) = result {
183                    if let Some(info) = parse_reply(&buf6[..len], &query_id) {
184                        let uid = info.uid();
185                        if !uid.is_empty() {
186                            if info.v6address().is_empty() {
187                                info.set_v6address(&addr.ip().to_string());
188                            }
189                            results.entry(uid).or_insert(info);
190                        }
191                    }
192                }
193            }
194            _ = tokio::time::sleep(recv_timeout) => {}
195        }
196    }
197
198    results.into_values().collect()
199}
200
201fn parse_reply(data: &[u8], expected_id: &str) -> Option<StreamInfo> {
202    if let Some(newline_pos) = data.iter().position(|&b| b == b'\n') {
203        let returned_id = std::str::from_utf8(&data[..newline_pos])
204            .unwrap_or("")
205            .trim();
206        if returned_id == expected_id {
207            let xml = std::str::from_utf8(&data[newline_pos + 1..]).unwrap_or("");
208            return StreamInfo::from_shortinfo_message(xml);
209        }
210    }
211    None
212}
213
214/// Continuous resolver that keeps discovering streams in the background.
215pub struct ContinuousResolver {
216    results: std::sync::Arc<parking_lot::Mutex<Vec<StreamInfo>>>,
217    shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
218}
219
220impl ContinuousResolver {
221    pub fn new(query: &str, _forget_after: f64) -> Self {
222        let results = std::sync::Arc::new(parking_lot::Mutex::new(Vec::new()));
223        let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
224
225        let results_clone = results.clone();
226        let shutdown_clone = shutdown.clone();
227        let query = query.to_string();
228
229        std::thread::spawn(move || {
230            while !shutdown_clone.load(std::sync::atomic::Ordering::Relaxed) {
231                let found = resolve_query(&query, 0, 1.0);
232                *results_clone.lock() = found;
233                std::thread::sleep(Duration::from_secs_f64(0.5));
234            }
235        });
236
237        ContinuousResolver { results, shutdown }
238    }
239
240    pub fn results(&self) -> Vec<StreamInfo> {
241        self.results.lock().clone()
242    }
243}
244
245impl Drop for ContinuousResolver {
246    fn drop(&mut self) {
247        self.shutdown
248            .store(true, std::sync::atomic::Ordering::Relaxed);
249    }
250}