lsl-core 0.0.1

Lab Streaming Layer — pure Rust core library
Documentation
//! Stream resolver: discovers streams on the network via UDP.
//!
//! Sends LSL:shortinfo queries over IPv4 and IPv6 multicast/broadcast/unicast,
//! collects responses, and returns discovered StreamInfo objects.

use crate::config::CONFIG;
use crate::stream_info::StreamInfo;
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::time::Duration;
use tokio::net::UdpSocket;

/// Resolve streams matching a query string.
pub fn resolve_all(wait_time: f64) -> Vec<StreamInfo> {
    resolve_query("", 0, wait_time)
}

/// Resolve streams by property name and value.
pub fn resolve_by_property(prop: &str, value: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
    let query = if value.is_empty() {
        String::new()
    } else {
        format!("{}='{}'", prop, value)
    };
    resolve_query(&query, minimum, timeout)
}

/// Resolve streams by predicate.
pub fn resolve_by_predicate(pred: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
    resolve_query(pred, minimum, timeout)
}

/// Core resolve function.
pub fn resolve_query(query: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
    // Spawn on the RUNTIME and wait via channel (avoids block_on deadlock
    // when called from a thread that's already inside the RUNTIME).
    let query = query.to_string();
    let (tx, rx) = std::sync::mpsc::channel();
    crate::RUNTIME.spawn(async move {
        let result = resolve_query_async(&query, minimum, timeout).await;
        let _ = tx.send(result);
    });
    let deadline = std::time::Duration::from_secs_f64(timeout + 2.0);
    rx.recv_timeout(deadline).unwrap_or_default()
}

async fn resolve_query_async(query: &str, minimum: i32, timeout: f64) -> Vec<StreamInfo> {
    let wait = Duration::from_secs_f64(timeout.max(0.01));

    // Create IPv4 receiver socket
    let v4_recv = UdpSocket::bind("0.0.0.0:0").await.ok();
    let v4_port = v4_recv
        .as_ref()
        .map(|s| s.local_addr().unwrap().port())
        .unwrap_or(0);

    // Create IPv6 receiver socket
    let v6_recv = if CONFIG.allow_ipv6 {
        UdpSocket::bind("[::]:0").await.ok()
    } else {
        None
    };
    let v6_port = v6_recv
        .as_ref()
        .map(|s| s.local_addr().unwrap().port())
        .unwrap_or(0);

    // Build query message. For IPv6 targets we use v6_port, for v4 we use v4_port.
    let query_id = format!("{}", fxhash::hash32(query));

    // Build target list: (address, message)
    let mut targets: Vec<(SocketAddr, String)> = Vec::new();

    for &addr in &CONFIG.multicast_addresses {
        let ret_port = if addr.is_ipv6() { v6_port } else { v4_port };
        if ret_port == 0 {
            continue;
        }
        let msg = format!(
            "LSL:shortinfo\r\n{}\r\n{} {}\r\n",
            query, ret_port, query_id
        );
        targets.push((SocketAddr::new(addr, CONFIG.multicast_port), msg));
    }

    // Unicast to base ports (IPv4)
    {
        let msg = format!("LSL:shortinfo\r\n{}\r\n{} {}\r\n", query, v4_port, query_id);
        for port in CONFIG.base_port..CONFIG.base_port + CONFIG.port_range {
            targets.push((
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                msg.clone(),
            ));
        }
    }

    // Unicast to base ports (IPv6 loopback)
    if CONFIG.allow_ipv6 && v6_port != 0 {
        let msg = format!("LSL:shortinfo\r\n{}\r\n{} {}\r\n", query, v6_port, query_id);
        for port in CONFIG.base_port..CONFIG.base_port + CONFIG.port_range {
            targets.push((
                SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port),
                msg.clone(),
            ));
        }
    }

    // Send queries with retry waves
    let end = tokio::time::Instant::now() + wait;
    let mut results: HashMap<String, StreamInfo> = HashMap::new();
    let mut wave_interval = Duration::from_millis(500);
    let mut next_wave = tokio::time::Instant::now();

    let mut buf4 = vec![0u8; 65536];
    let mut buf6 = vec![0u8; 65536];

    // Prepare send sockets
    let v4_send = UdpSocket::bind("0.0.0.0:0").await.ok();
    if let Some(ref s) = v4_send {
        let _ = s.set_broadcast(true);
    }

    let v6_send = if CONFIG.allow_ipv6 {
        UdpSocket::bind("[::]:0").await.ok()
    } else {
        None
    };

    loop {
        let now = tokio::time::Instant::now();
        if now >= end {
            break;
        }
        if minimum > 0 && results.len() >= minimum as usize {
            break;
        }

        // Send a wave of queries
        if now >= next_wave {
            for (target, msg) in &targets {
                let sock = if target.is_ipv4() { &v4_send } else { &v6_send };
                if let Some(s) = sock {
                    let _ = s.send_to(msg.as_bytes(), target).await;
                }
            }
            next_wave = now + wave_interval;
            wave_interval = (wave_interval * 2).min(Duration::from_secs(3));
        }

        // Receive replies from both sockets
        let remaining = end.saturating_duration_since(tokio::time::Instant::now());
        let recv_timeout = remaining.min(Duration::from_millis(100));

        tokio::select! {
            // IPv4 replies
            result = async {
                match &v4_recv {
                    Some(s) => s.recv_from(&mut buf4).await,
                    None => std::future::pending().await,
                }
            } => {
                if let Ok((len, addr)) = result {
                    if let Some(info) = parse_reply(&buf4[..len], &query_id) {
                        let uid = info.uid();
                        if !uid.is_empty() {
                            // Populate source address for cross-machine networking
                            if info.v4address().is_empty() {
                                info.set_v4address(&addr.ip().to_string());
                            }
                            results.entry(uid).or_insert(info);
                        }
                    }
                }
            }
            // IPv6 replies
            result = async {
                match &v6_recv {
                    Some(s) => s.recv_from(&mut buf6).await,
                    None => std::future::pending().await,
                }
            } => {
                if let Ok((len, addr)) = result {
                    if let Some(info) = parse_reply(&buf6[..len], &query_id) {
                        let uid = info.uid();
                        if !uid.is_empty() {
                            if info.v6address().is_empty() {
                                info.set_v6address(&addr.ip().to_string());
                            }
                            results.entry(uid).or_insert(info);
                        }
                    }
                }
            }
            _ = tokio::time::sleep(recv_timeout) => {}
        }
    }

    results.into_values().collect()
}

fn parse_reply(data: &[u8], expected_id: &str) -> Option<StreamInfo> {
    if let Some(newline_pos) = data.iter().position(|&b| b == b'\n') {
        let returned_id = std::str::from_utf8(&data[..newline_pos])
            .unwrap_or("")
            .trim();
        if returned_id == expected_id {
            let xml = std::str::from_utf8(&data[newline_pos + 1..]).unwrap_or("");
            return StreamInfo::from_shortinfo_message(xml);
        }
    }
    None
}

/// Continuous resolver that keeps discovering streams in the background.
pub struct ContinuousResolver {
    results: std::sync::Arc<parking_lot::Mutex<Vec<StreamInfo>>>,
    shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
}

impl ContinuousResolver {
    pub fn new(query: &str, _forget_after: f64) -> Self {
        let results = std::sync::Arc::new(parking_lot::Mutex::new(Vec::new()));
        let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));

        let results_clone = results.clone();
        let shutdown_clone = shutdown.clone();
        let query = query.to_string();

        std::thread::spawn(move || {
            while !shutdown_clone.load(std::sync::atomic::Ordering::Relaxed) {
                let found = resolve_query(&query, 0, 1.0);
                *results_clone.lock() = found;
                std::thread::sleep(Duration::from_secs_f64(0.5));
            }
        });

        ContinuousResolver { results, shutdown }
    }

    pub fn results(&self) -> Vec<StreamInfo> {
        self.results.lock().clone()
    }
}

impl Drop for ContinuousResolver {
    fn drop(&mut self) {
        self.shutdown
            .store(true, std::sync::atomic::Ordering::Relaxed);
    }
}