xsio 0.1.2

High-performance Cross-platform Low-level Socket I/O for Rust.
Documentation
use std::{net::SocketAddr, sync::{atomic::{AtomicBool, AtomicU64, Ordering}, mpsc::Sender, Arc}, time::Duration};

use crate::*;

#[macro_export]
macro_rules! dprintln {
    ($self:expr, $fmt:literal $(, $args:expr)* $(,)?) => {
        if $self.debug_mode {
            println!($fmt $(, $args)*);
        }
    };
}

pub struct XUdpServerWorker {
    pub(crate) number: usize,
    pub(crate) name: String,
    pub(crate) address: SocketAddr,
    pub(crate) running: Arc<AtomicBool>,
    pub(crate) ipv4_handler: Option<Box<dyn FnMut(&SocketAddrSrcV4, &[u8]) + Send>>,
    pub(crate) ipv6_handler: Option<Box<dyn FnMut(&SocketAddrSrcV6, &[u8]) + Send>>,
    pub(crate) drain_capacity: usize,
    pub(crate) frame_count: usize,
    pub(crate) frame_size: usize,
    pub(crate) msg_len: usize,
    pub(crate) kernel_socket_capacity: usize,
    pub(crate) debug_mode: bool,
    pub(crate) kernel_mode: bool,
    pub(crate) processed_counter: Arc<AtomicU64>,
    pub(crate) ready_tx: Sender<()>,
}

impl XUdpServerWorker {
    pub fn new(number: usize, name: String, address: SocketAddr, ready_tx: Sender<()>) -> Self {
        Self {
            number,
            name,
            address,
            running: Arc::new(AtomicBool::new(false)),
            ipv4_handler: None,
            ipv6_handler: None,
            drain_capacity: 1,
            frame_count: 1024,
            frame_size: 2048,
            msg_len: 2048,
            kernel_socket_capacity: 1024 * 1024,
            debug_mode: false,
            kernel_mode: false,
            processed_counter: Arc::new(AtomicU64::new(0)),
            ready_tx,
        }
    }

    pub fn set_socket_capacity(&mut self, size: usize) {
        if size > 0 {
            self.kernel_socket_capacity = size;
        } else {
            panic!("Receive buffer size must be greater than zero");
        }
    }

    pub fn set_max_frames(&mut self, max: usize) -> &mut Self {
        if max > 0 {
            self.frame_count = max;
        } else {
            panic!("Max frame count must be greater than zero");
        }
        self
    }

    pub fn set_frame_size(&mut self, size: usize) -> &mut Self {
        if size > 0 {
            self.frame_size = size;
        } else {
            panic!("Frame size must be greater than zero");
        }
        self
    }

    pub fn set_drain_capacity(&mut self, max: usize) -> &mut Self {
        self.drain_capacity = max;
        self
    }
    
    pub fn set_msg_len(&mut self, max: usize) -> &mut Self {
        self.msg_len = max;
        self
    }

    pub fn on_ipv4<F>(&mut self, h: F)
    where
        F: FnMut(&SocketAddrSrcV4, &[u8]) + Send + 'static,
    {
        if !self.address.is_ipv4() {
            panic!("Cannot set IPv4 handler on non-IPv4 address");
        }

        self.ipv4_handler = Some(Box::new(h));
    }

    pub fn on_ipv6<F>(&mut self, h: F)
    where
        F: FnMut(&SocketAddrSrcV6, &[u8]) + Send + 'static
    {
        if !self.address.is_ipv6() {
            panic!("Cannot set IPv6 handler on non-IPv6 address");
        }
        self.ipv6_handler = Some(Box::new(h));
    }

    pub(crate) fn make_ready(&mut self) {
        self.ready_tx.send(()).expect("Failed to send ready signal");
        while !self.running.load(Ordering::Acquire) {
            std::thread::yield_now();
        }
    }
    
    pub(crate) fn run(&mut self) -> std::io::Result<()> {
        if self.address.is_ipv4() {
            let mut handler = self.ipv4_handler.take().expect("No IPv4 handler set for XUdpServerWorker");
            if self.kernel_mode {
                self.begin_raw_queue_poll_loop(&mut handler)
            } else if self.drain_capacity > 1 {
                self.begin_ipv4_popmany_loop(&mut handler)
            } else {
                self.begin_ipv4_pop_loop(&mut handler)
            }
        } else if self.address.is_ipv6() {
            let mut handler = self.ipv6_handler.take().expect("No IPv6 handler set for XUdpServerWorker");
            if self.drain_capacity > 1 {
                self.begin_ipv6_popmany_loop(&mut handler)
            } else {
                self.begin_ipv6_pop_loop(&mut handler)
            }
        } else {
            Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "Unsupported address type",
            ))
        }
    }
}