use crate::config::CONFIG;
use crate::stream_info::StreamInfo;
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::time::Duration;
use tokio::net::UdpSocket;
/// Resolves streams using an empty query string, blocking for up to `wait_time`
/// seconds (plus the grace period applied by [`resolve_query`]).
///
/// The minimum count is passed as `0`, so the search is never cut short by the
/// number of streams found and always runs for the full wait.
pub fn resolve_all(wait_time: f64) -> Vec<StreamInfo> {
    // Empty query: no filter text is sent to the responders.
    let unfiltered = "";
    // Zero disables the "stop once N streams answered" shortcut.
    let no_minimum = 0;
    resolve_query(unfiltered, no_minimum, wait_time)
}
/// Resolves streams whose property `prop` equals `value`.
///
/// The query sent is `prop='value'`. When `value` is empty, no filter is built
/// and an empty query is issued instead.
///
/// * `minimum` - stop early once this many distinct streams have answered
///   (values `<= 0` disable the shortcut).
/// * `timeout` - maximum search time in seconds.
pub fn resolve_by_property(prop: &str, value: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
    let query = match value {
        "" => String::new(),
        // NOTE(review): `wanted` is interpolated verbatim; a value containing a
        // single quote yields a malformed query string.
        wanted => format!("{}='{}'", prop, wanted),
    };
    resolve_query(query.as_str(), minimum, timeout)
}
/// Resolves streams using a caller-supplied predicate.
///
/// `pred` is forwarded unchanged as the query string; `minimum` and `timeout`
/// have the same meaning as in [`resolve_query`].
pub fn resolve_by_predicate(pred: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
    // The predicate already is the query; no rewriting is performed.
    let query = pred;
    resolve_query(query, minimum, timeout)
}
pub fn resolve_query(query: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
let query = query.to_string();
let (tx, rx) = std::sync::mpsc::channel();
crate::RUNTIME.spawn(async move {
let result = resolve_query_async(&query, minimum, timeout).await;
let _ = tx.send(result);
});
let deadline = std::time::Duration::from_secs_f64(timeout + 2.0);
rx.recv_timeout(deadline).unwrap_or_default()
}
async fn resolve_query_async(query: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
let wait = Duration::from_secs_f64(timeout.max(0.01));
let v4_recv = UdpSocket::bind("0.0.0.0:0").await.ok();
let v4_port = v4_recv
.as_ref()
.map(|s| s.local_addr().unwrap().port())
.unwrap_or(0);
let v6_recv = if CONFIG.allow_ipv6 {
UdpSocket::bind("[::]:0").await.ok()
} else {
None
};
let v6_port = v6_recv
.as_ref()
.map(|s| s.local_addr().unwrap().port())
.unwrap_or(0);
let query_id = format!("{}", fxhash::hash32(query));
let mut targets: Vec<(SocketAddr, String)> = Vec::new();
for &addr in &CONFIG.multicast_addresses {
let ret_port = if addr.is_ipv6() { v6_port } else { v4_port };
if ret_port == 0 {
continue;
}
let msg = format!(
"LSL:shortinfo\r\n{}\r\n{} {}\r\n",
query, ret_port, query_id
);
targets.push((SocketAddr::new(addr, CONFIG.multicast_port), msg));
}
{
let msg = format!("LSL:shortinfo\r\n{}\r\n{} {}\r\n", query, v4_port, query_id);
for port in CONFIG.base_port..CONFIG.base_port + CONFIG.port_range {
targets.push((
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
msg.clone(),
));
}
}
if CONFIG.allow_ipv6 && v6_port != 0 {
let msg = format!("LSL:shortinfo\r\n{}\r\n{} {}\r\n", query, v6_port, query_id);
for port in CONFIG.base_port..CONFIG.base_port + CONFIG.port_range {
targets.push((
SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port),
msg.clone(),
));
}
}
let end = tokio::time::Instant::now() + wait;
let mut results: HashMap<String, StreamInfo> = HashMap::new();
let mut wave_interval = Duration::from_millis(500);
let mut next_wave = tokio::time::Instant::now();
let mut buf4 = vec![0u8; 65536];
let mut buf6 = vec![0u8; 65536];
let v4_send = UdpSocket::bind("0.0.0.0:0").await.ok();
if let Some(ref s) = v4_send {
let _ = s.set_broadcast(true);
}
let v6_send = if CONFIG.allow_ipv6 {
UdpSocket::bind("[::]:0").await.ok()
} else {
None
};
loop {
let now = tokio::time::Instant::now();
if now >= end {
break;
}
if minimum > 0 && results.len() >= minimum as usize {
break;
}
if now >= next_wave {
for (target, msg) in &targets {
let sock = if target.is_ipv4() { &v4_send } else { &v6_send };
if let Some(s) = sock {
let _ = s.send_to(msg.as_bytes(), target).await;
}
}
next_wave = now + wave_interval;
wave_interval = (wave_interval * 2).min(Duration::from_secs(3));
}
let remaining = end.saturating_duration_since(tokio::time::Instant::now());
let recv_timeout = remaining.min(Duration::from_millis(100));
tokio::select! {
result = async {
match &v4_recv {
Some(s) => s.recv_from(&mut buf4).await,
None => std::future::pending().await,
}
} => {
if let Ok((len, addr)) = result {
if let Some(info) = parse_reply(&buf4[..len], &query_id) {
let uid = info.uid();
if !uid.is_empty() {
if info.v4address().is_empty() {
info.set_v4address(&addr.ip().to_string());
}
results.entry(uid).or_insert(info);
}
}
}
}
result = async {
match &v6_recv {
Some(s) => s.recv_from(&mut buf6).await,
None => std::future::pending().await,
}
} => {
if let Ok((len, addr)) = result {
if let Some(info) = parse_reply(&buf6[..len], &query_id) {
let uid = info.uid();
if !uid.is_empty() {
if info.v6address().is_empty() {
info.set_v6address(&addr.ip().to_string());
}
results.entry(uid).or_insert(info);
}
}
}
}
_ = tokio::time::sleep(recv_timeout) => {}
}
}
results.into_values().collect()
}
/// Decodes one reply datagram.
///
/// The datagram is expected to hold the query id on its first line, followed by
/// the payload handed to `StreamInfo::from_shortinfo_message`.
///
/// Returns `None` when the datagram contains no newline, when the (trimmed)
/// first line differs from `expected_id`, or when the payload is rejected by
/// `StreamInfo::from_shortinfo_message`. Byte sequences that are not valid
/// UTF-8 are treated as empty strings.
fn parse_reply(data: &[u8], expected_id: &str) -> Option<StreamInfo> {
    let split = data.iter().position(|&byte| byte == b'\n')?;
    let (header, rest) = data.split_at(split);

    let reply_id = std::str::from_utf8(header).unwrap_or("").trim();
    if reply_id != expected_id {
        return None;
    }

    // `rest` begins with the newline itself; the payload follows it.
    let payload = std::str::from_utf8(&rest[1..]).unwrap_or("");
    StreamInfo::from_shortinfo_message(payload)
}
pub struct ContinuousResolver {
results: std::sync::Arc<parking_lot::Mutex<Vec<StreamInfo>>>,
shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
impl ContinuousResolver {
pub fn new(query: &str, _forget_after: f64) -> Self {
let results = std::sync::Arc::new(parking_lot::Mutex::new(Vec::new()));
let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let results_clone = results.clone();
let shutdown_clone = shutdown.clone();
let query = query.to_string();
std::thread::spawn(move || {
while !shutdown_clone.load(std::sync::atomic::Ordering::Relaxed) {
let found = resolve_query(&query, 0, 1.0);
*results_clone.lock() = found;
std::thread::sleep(Duration::from_secs_f64(0.5));
}
});
ContinuousResolver { results, shutdown }
}
pub fn results(&self) -> Vec<StreamInfo> {
self.results.lock().clone()
}
}
impl Drop for ContinuousResolver {
fn drop(&mut self) {
self.shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
}
}