laser-dac 0.12.1

Unified laser DAC abstraction supporting multiple protocols
Documentation
//! Core IDN receiver UDP server.

use std::io;
use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Instant;

use super::behavior::{ReceivedChunk, ServerBehavior};
use super::config::ServerConfig;
use super::constants::*;
use super::packet_builder::*;
use super::parser::FrameParser;

/// Max UDP payload for IDN.
const RECV_BUFFER_SIZE: usize = 65_535;

/// An IDN receiver server with pluggable behavior.
///
/// The server tracks exactly one logical client at a time, identified by
/// source IP address. Additional peers attempting to connect concurrently are
/// rejected via [`ServerBehavior::is_occupied`] semantics.
pub struct IdnServer<B: ServerBehavior> {
    socket: UdpSocket,
    config: ServerConfig,
    behavior: B,
    running: Arc<AtomicBool>,
    local_addr: SocketAddr,
    // Client tracking (by IP address, not port — multiple sockets from the same
    // host are the same logical client)
    last_client_ip: Option<IpAddr>,
    last_client_addr: Option<SocketAddr>,
    last_activity: Option<Instant>,
    /// Client that was force-disconnected (ignore packets from them temporarily)
    disconnected_client: Option<(IpAddr, Instant)>,
    parser: FrameParser,
}

impl<B: ServerBehavior> IdnServer<B> {
    /// Create a new IDN receiver server with the given configuration and behavior.
    pub fn new(config: ServerConfig, behavior: B) -> io::Result<Self> {
        let socket = UdpSocket::bind(config.bind_address)?;
        socket.set_read_timeout(Some(config.read_timeout))?;
        let local_addr = socket.local_addr()?;

        log::info!("IDN receiver listening on {}", local_addr);

        Ok(Self {
            socket,
            config,
            behavior,
            running: Arc::new(AtomicBool::new(true)),
            local_addr,
            last_client_ip: None,
            last_client_addr: None,
            last_activity: None,
            disconnected_client: None,
            parser: FrameParser::new(),
        })
    }

    /// Get the server's local address.
    pub fn addr(&self) -> SocketAddr {
        self.local_addr
    }

    /// Get a handle to control the running flag.
    pub fn running_handle(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.running)
    }

    /// Start the server in a background thread and return a handle.
    pub fn spawn(self) -> ServerHandle {
        let addr = self.addr();
        let running = self.running_handle();

        let handle = thread::spawn(move || {
            self.run();
        });

        ServerHandle {
            addr,
            running,
            handle: Some(handle),
        }
    }

    /// Run the server loop (blocking).
    pub fn run(mut self) {
        let mut buf = [0u8; RECV_BUFFER_SIZE];

        while self.running.load(Ordering::SeqCst) {
            // Check for force disconnect
            if self.behavior.should_force_disconnect() {
                if let Some(client_ip) = self.last_client_ip.take() {
                    log::info!(
                        "Force disconnecting client: {}",
                        self.last_client_addr
                            .take()
                            .map_or(client_ip.to_string(), |a| a.to_string())
                    );
                    self.disconnected_client = Some((client_ip, Instant::now()));
                    self.last_activity = None;
                    self.parser.reset();
                    self.behavior.on_client_disconnected();
                }
            }

            // Clear disconnected client after the configured window
            if let Some((_, disconnect_time)) = self.disconnected_client {
                if disconnect_time.elapsed() > self.config.force_disconnect_window {
                    self.disconnected_client = None;
                }
            }

            // Check for link timeout
            if let Some(last_time) = self.last_activity {
                if last_time.elapsed() > self.config.link_timeout && self.last_client_ip.is_some() {
                    log::info!("Client timed out");
                    self.last_client_ip = None;
                    self.last_client_addr = None;
                    self.last_activity = None;
                    self.parser.reset();
                    self.behavior.on_client_disconnected();
                }
            }

            let (len, src) = match self.socket.recv_from(&mut buf) {
                Ok(result) => result,
                Err(e)
                    if matches!(
                        e.kind(),
                        io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
                    ) =>
                {
                    continue
                }
                Err(e) => {
                    log::error!("Socket error: {}", e);
                    break;
                }
            };

            if len < 4 {
                continue;
            }

            // Apply simulated latency
            let latency = self.behavior.get_simulated_latency();
            if !latency.is_zero() {
                thread::sleep(latency);
            }

            // Ignore packets from force-disconnected client (match by IP)
            if let Some((disconnected_ip, _)) = self.disconnected_client {
                if src.ip() == disconnected_ip {
                    continue;
                }
            }

            self.behavior.on_packet_received(&buf[..len]);

            let command = buf[0];
            let flags = buf[1];
            let sequence = u16::from_be_bytes([buf[2], buf[3]]);

            log::debug!(
                "Received packet: cmd=0x{:02X} flags=0x{:02X} seq={} len={} from {}",
                command,
                flags,
                sequence,
                len,
                src
            );

            if !self.behavior.should_respond(command) {
                log::debug!("Ignoring command 0x{:02X} (should_respond=false)", command);
                continue;
            }

            match command {
                IDNCMD_SCAN_REQUEST => {
                    log::debug!("Received SCAN_REQUEST from {}", src);
                    let response = build_scan_response(
                        flags,
                        sequence,
                        &self.config.unit_id,
                        &self.config.hostname,
                        self.config.protocol_version,
                        self.behavior.get_status_byte(),
                    );
                    let _ = self.socket.send_to(&response, src);
                }
                IDNCMD_SERVICEMAP_REQUEST => {
                    log::debug!("Received SERVICEMAP_REQUEST from {}", src);
                    let response = build_servicemap_response(
                        flags,
                        sequence,
                        &self.config.services,
                        &self.config.relays,
                    );
                    let _ = self.socket.send_to(&response, src);
                }
                IDNCMD_PING_REQUEST => {
                    log::trace!("Received PING_REQUEST from {}", src);
                    let payload = &buf[4..len];
                    let response = build_ping_response(flags, sequence, payload);
                    let _ = self.socket.send_to(&response, src);
                }
                IDNCMD_RT_CNLMSG | IDNCMD_RT_CNLMSG_ACKREQ => {
                    if self.behavior.is_excluded() {
                        if command == IDNCMD_RT_CNLMSG_ACKREQ {
                            let response =
                                build_ack_response(flags, sequence, IDNVAL_RTACK_ERR_EXCLUDED);
                            let _ = self.socket.send_to(&response, src);
                        }
                        continue;
                    }

                    if self.behavior.is_occupied()
                        && self.last_client_ip.is_some()
                        && self.last_client_ip != Some(src.ip())
                    {
                        if command == IDNCMD_RT_CNLMSG_ACKREQ {
                            let response =
                                build_ack_response(flags, sequence, IDNVAL_RTACK_ERR_OCCUPIED);
                            let _ = self.socket.send_to(&response, src);
                        }
                        continue;
                    }

                    // Track client connection by IP — different ports from the
                    // same host are the same logical client.
                    if self.last_client_ip != Some(src.ip()) {
                        log::info!("Client connected: {}", src);
                        self.last_client_ip = Some(src.ip());
                        self.last_client_addr = Some(src);
                        self.behavior.on_client_connected(src);
                    }
                    self.last_activity = Some(Instant::now());

                    // Hand off raw bytes first (for tools that want low-level
                    // access), then parse and emit points for normal consumers.
                    self.behavior.on_frame_received(&buf[..len]);
                    if let Some(chunk) = self.parser.parse_frame_data(src, &buf[..len]) {
                        self.behavior
                            .on_chunk_received(ReceivedChunk::new(src, &chunk));
                    }

                    if command == IDNCMD_RT_CNLMSG_ACKREQ {
                        let response = build_ack_response(
                            flags,
                            sequence,
                            self.behavior.get_ack_result_code(),
                        );
                        let _ = self.socket.send_to(&response, src);
                    }
                }
                IDNCMD_RT_CNLMSG_CLOSE => {
                    log::info!("Received RT_CNLMSG_CLOSE from {}", src);
                    if self.last_client_ip == Some(src.ip()) {
                        self.last_client_ip = None;
                        self.last_client_addr = None;
                        self.last_activity = None;
                        self.parser.reset();
                        self.behavior.on_client_disconnected();
                    }
                }
                IDNCMD_RT_CNLMSG_CLOSE_ACKREQ => {
                    log::info!("Received RT_CNLMSG_CLOSE_ACKREQ from {}", src);
                    if self.last_client_ip == Some(src.ip()) {
                        self.last_client_ip = None;
                        self.last_client_addr = None;
                        self.last_activity = None;
                        self.parser.reset();
                        self.behavior.on_client_disconnected();
                    }
                    let response =
                        build_ack_response(flags, sequence, self.behavior.get_ack_result_code());
                    let _ = self.socket.send_to(&response, src);
                }
                IDNCMD_UNIT_PARAMS_REQUEST => {
                    log::debug!("Received UNIT_PARAMS_REQUEST from {}", src);
                    let service_id = if len > 4 { buf[4] } else { 0 };
                    let param_id = if len > 7 {
                        u16::from_be_bytes([buf[6], buf[7]])
                    } else {
                        0
                    };
                    let response = build_parameter_response(
                        flags,
                        sequence,
                        IDNCMD_UNIT_PARAMS_RESPONSE,
                        0,
                        service_id,
                        param_id,
                        0x12345678,
                    );
                    let _ = self.socket.send_to(&response, src);
                }
                IDNCMD_SERVICE_PARAMS_REQUEST => {
                    log::debug!("Received SERVICE_PARAMS_REQUEST from {}", src);
                    let service_id = if len > 4 { buf[4] } else { 0 };
                    let param_id = if len > 7 {
                        u16::from_be_bytes([buf[6], buf[7]])
                    } else {
                        0
                    };
                    let response = build_parameter_response(
                        flags,
                        sequence,
                        IDNCMD_SERVICE_PARAMS_RESPONSE,
                        0,
                        service_id,
                        param_id,
                        0x12345678,
                    );
                    let _ = self.socket.send_to(&response, src);
                }
                _ => {
                    log::trace!("Unknown command: 0x{:02X} from {}", command, src);
                }
            }
        }

        log::info!("IDN receiver stopped");
    }
}

/// Handle for controlling a spawned server.
pub struct ServerHandle {
    /// The server's local address.
    pub addr: SocketAddr,
    running: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl ServerHandle {
    /// Stop the server.
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }
}

impl Drop for ServerHandle {
    fn drop(&mut self) {
        self.stop();
        if let Some(h) = self.handle.take() {
            let _ = h.join();
        }
    }
}