xsio 0.1.2

High-performance Cross-platform Low-level Socket I/O for Rust.
Documentation
use std::{sync::{atomic::Ordering, mpsc::Sender}, time::Duration};

use crate::*;

impl XUdpServerWorker {
    pub(crate) fn begin_ipv4_popmany_loop(&mut self, handler: &mut Box<dyn FnMut(&SocketAddrSrcV4, &[u8]) + Send + 'static>) -> std::io::Result<()> {
        let mut bucket = Ipv4Bucket::new(self.drain_capacity, self.msg_len);
        let socket = Socket::new(AfInet, SockDgram, IpProtoUdp).unwrap();
        socket.set_socket_option(SoRecvBufSize, self.kernel_socket_capacity)?;
        socket.set_socket_option(SoRecvTimeout, Duration::from_millis(500))?;
        socket.set_socket_option(SoReuseAddr, true)?;
        socket.bind(&self.address)?;
        if self.debug_mode {
            println!("[XUdpServerWorker] Starting IPv4 drain worker on {}", self.address);
        }
        let bucket_ref = &mut bucket;
        self.make_ready();
        while self.running.load(Ordering::Relaxed) {
            match socket.vecrecv(bucket_ref, 0) {
                Ok(count) => {
                    for i in 0..count {
                        let (addr, buf) = unsafe { bucket_ref.unsafe_peek(i) };
                        handler(addr, buf);
                    }
                }
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => {
                    continue;
                }
                Err(e) => {
                    dprintln!(self, "Error receiving data: {e}");
                    break;
                }
            }
        }
        Ok(())
    }

    pub(crate) fn begin_ipv6_popmany_loop(&mut self, handler: &mut Box<dyn FnMut(&SocketAddrSrcV6, &[u8]) + Send + 'static>) -> std::io::Result<()> {
        let mut bucket = Ipv6Bucket::new(self.drain_capacity, self.msg_len);
        let socket = Socket::new(AfInet6, SockDgram, IpProtoUdp).unwrap();
        socket.set_socket_option(SoRecvBufSize, self.kernel_socket_capacity)?;
        socket.set_socket_option(SoRecvTimeout, Duration::from_millis(500))?;
        socket.set_socket_option(SoReuseAddr, true)?;
        socket.bind(&self.address)?;
        let bucket_ref = &mut bucket;
        self.make_ready();
        while self.running.load(Ordering::Relaxed) {
            match socket.vecrecv(bucket_ref, 0) {
                Ok(count) => {
                    for i in 0..count {
                        let (addr, buf) = unsafe { bucket_ref.unsafe_peek(i) };
                        handler(addr, buf);
                    }
                }
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => {
                    continue;
                }
                Err(e) => {
                    dprintln!(self, "Error receiving data: {e}");
                    break;
                }
            }
        }
        Ok(())
    }
}