socket_server 0.12.0

socket write event polling registration avoiding syscall
Documentation
use std::{
    io::{Read, Write},
    mem::{transmute_copy, MaybeUninit},
    net::SocketAddr,
};

use fast_collections::Slab;
use qcell::{LCell, LCellOwner};

use super::socket::{Registry, ServerSocketListener, Socket, SocketState};

pub(crate) trait Poll<T> {
    fn open(&mut self, stream: &mut T, token: usize) -> Result<(), ()>;
    fn close(&mut self, stream: &mut T);
}

pub(crate) struct Selector<'id, 'registry, T, P, Stream>
where
    T: ServerSocketListener<'id>,
    [(); T::READ_BUFFFER_LEN]:,
    [(); T::WRITE_BUFFER_LEN]:,
    [(); T::MAX_CONNECTIONS]:,
{
    pub poll: P,
    pub server: LCell<'id, T>,
    pub sockets: Slab<Socket<'id, 'registry, T>, { T::MAX_CONNECTIONS }>,
    pub streams: [MaybeUninit<Stream>; T::MAX_CONNECTIONS],
}

impl<'id, 'registry, T, P, Stream> Selector<'id, 'registry, T, P, Stream>
where
    T: ServerSocketListener<'id, Connection: Default>,
    [(); T::READ_BUFFFER_LEN]:,
    [(); T::WRITE_BUFFER_LEN]:,
    [(); T::MAX_CONNECTIONS]:,
{
    pub fn new(server: T, owner: &mut LCellOwner<'id>, poll: P) -> Self {
        let streams = MaybeUninit::<[MaybeUninit<Stream>; T::MAX_CONNECTIONS]>::uninit();
        let streams = unsafe { transmute_copy(&streams.assume_init()) };
        Self {
            server: owner.cell(server),
            sockets: Slab::new(),
            streams,
            poll,
        }
    }
}

impl<'id, 'registry, T, P: Poll<Stream>, Stream: Read + Write>
    Selector<'id, 'registry, T, P, Stream>
where
    T: ServerSocketListener<'id, Connection: Default>,
    [(); T::READ_BUFFFER_LEN]:,
    [(); T::WRITE_BUFFER_LEN]:,
    [(); T::MAX_CONNECTIONS]:,
{
    pub fn read(&mut self, owner: &mut LCellOwner<'id>, token: usize) {
        let socket = unsafe { self.sockets.get_unchecked_mut(token) };
        let stream = unsafe { self.streams.get_unchecked_mut(token).assume_init_mut() };
        match socket.read_buf.rw(owner).push_from_read(stream) {
            Ok(read_len) => {
                if read_len == 0 {
                    socket.register_close_event(owner)
                } else {
                    T::read(owner, &self.server, socket)
                }
            }
            Err(_io_err) => socket.register_close_event(owner),
        }
    }

    pub fn flush_registry(
        &mut self,
        owner: &mut LCellOwner<'id>,
        registry: &'registry LCell<'id, Registry<'id, T>>,
    ) {
        let registry_vec_len = registry.ro(owner).len();
        for ind in 0..registry_vec_len {
            let id = *unsafe { registry.ro(&owner).get_unchecked(ind) };
            let socket = unsafe { self.sockets.get_unchecked_mut(id) };
            let stream = unsafe { self.streams.get_unchecked_mut(id).assume_init_mut() };
            match socket.state {
                SocketState::Idle => continue,
                SocketState::WriteRequest => {
                    socket.state = SocketState::Idle;
                    T::flush(owner, &self.server, socket);
                    match socket.write_buf.rw(owner).push_to_write(stream) {
                        Ok(write_len) => {
                            if write_len == 0 {
                                self.close(owner, id)
                            }
                        }
                        Err(_) => self.close(owner, id),
                    };
                }
                SocketState::CloseRequest => self.close(owner, id),
            }
        }
        registry.rw(owner).clear();
    }

    pub fn accept(
        &mut self,
        owner: &mut LCellOwner<'id>,
        accepted_stream: Stream,
        addr: SocketAddr,
        registry: &'registry LCell<'id, Registry<'id, T>>,
    ) -> Result<(), ()> {
        let id = self
            .sockets
            .add_with_index(|ind| Socket::new(registry, *ind))?;
        let stream = unsafe { self.streams.get_unchecked_mut(id) };
        let socket = unsafe { self.sockets.get_unchecked_mut(id) };
        *stream = MaybeUninit::new(accepted_stream);
        match self
            .poll
            .open(unsafe { stream.assume_init_mut() }, socket.token)
        {
            Ok(()) => T::accept(owner, &self.server, socket, addr),
            Err(_err) => socket.register_close_event(owner),
        }
        Ok(())
    }

    pub fn close(&mut self, owner: &mut LCellOwner<'id>, id: usize) {
        let socket = unsafe { self.sockets.get_unchecked_mut(id) };
        let stream = unsafe { self.streams.get_unchecked_mut(id).assume_init_mut() };
        T::close(owner, &self.server, socket);
        self.poll.close(stream);
        let token = socket.token;
        unsafe { self.sockets.remove_unchecked(token) };
    }
}