ntex-net 3.9.1

Network utils for ntex framework
Documentation
use std::{cell::RefCell, io, os::fd::AsRawFd, rc::Rc};

use ntex_io::Io;
use ntex_io_uring::{opcode, types::Fd};
use ntex_rt::Arbiter;
use ntex_service::cfg::SharedCfg;
use slab::Slab;
use socket2::{Domain, SockAddr, Socket};

use super::{Driver, DriverApi, Handler, TcpStream, UnixStream, stream::StreamOps};
use crate::channel::{self, Receiver, Sender};

/// Handle used to start io-uring connect operations.
///
/// Wraps an `Rc<ConnectOpsInner>`, so cloning the handle only clones the
/// reference-counted pointer to the shared state.
#[derive(Clone)]
pub(crate) struct ConnectOps(Rc<ConnectOpsInner>);

/// Driver-side handler for connect operations.
///
/// Implements `Handler` and shares the same `ConnectOpsInner` as the
/// `ConnectOps` handle created alongside it in `ConnectOps::get`.
struct ConnectOpsHandler {
    /// State shared with the `ConnectOps` handle.
    inner: Rc<ConnectOpsInner>,
}

/// Table of in-flight connect operations.
///
/// Each entry holds the boxed target address, the socket being connected, the
/// sender used to report the outcome, and the configuration passed to
/// `ConnectOps::connect`. The slab key of an entry is what `connect` submits
/// as the operation's user data, and what the handler uses to look it up.
type Operations = RefCell<Slab<(Box<SockAddr>, Socket, Sender<Io>, SharedCfg)>>;

/// State shared between the `ConnectOps` handle and `ConnectOpsHandler`.
struct ConnectOpsInner {
    /// Driver api handed to the registration callback; used to submit entries.
    api: DriverApi,
    /// Stream operations handle, cloned into every stream built on success.
    streams: StreamOps,
    /// In-flight connect operations, keyed by slab index.
    ops: Operations,
}

impl ConnectOps {
    /// Returns the `ConnectOps` handle obtained through `Arbiter::get_value`.
    ///
    /// The initializer registers a `ConnectOpsHandler` with `driver` and keeps
    /// a second reference to the same shared state for the returned handle.
    ///
    /// # Panics
    ///
    /// Panics if the driver reports that `opcode::Connect` is not supported.
    pub(crate) fn get(driver: &Driver) -> Self {
        let streams = StreamOps::get(driver);

        Arbiter::get_value(move || {
            // Filled in by the registration callback below.
            let mut shared = None;
            driver.register(|api| {
                assert!(
                    api.is_supported(opcode::Connect::CODE),
                    "opcode::Connect is required for io-uring support"
                );

                let state = Rc::new(ConnectOpsInner {
                    api,
                    streams,
                    ops: RefCell::new(Slab::new()),
                });
                shared = Some(Rc::clone(&state));
                Box::new(ConnectOpsHandler { inner: state })
            });
            ConnectOps(shared.unwrap())
        })
    }

    /// Starts connecting `sock` to `addr` and returns the receiving half of
    /// the channel on which the handler reports the outcome.
    ///
    /// The socket, address, sender and `cfg` are parked in the operations
    /// slab; the slab key is submitted as the operation's user data.
    pub(crate) fn connect(
        &self,
        sock: Socket,
        addr: SockAddr,
        cfg: SharedCfg,
    ) -> Receiver<Io> {
        let (tx, rx) = channel::create();

        let mut pending = self.0.ops.borrow_mut();

        // The address is boxed so the pointer given to the connect entry stays
        // stable after the box is moved into the slab: neon submits ops at the
        // end of the rt turn, not right away.
        let addr = Box::new(addr);
        let addr_ptr = addr.as_ref().as_ptr().cast();
        let addr_len = addr.len();

        // Grab the raw descriptor before the socket is moved into the slab.
        let raw_fd = sock.as_raw_fd();
        let token = pending.insert((addr, sock, tx, cfg));

        let entry = opcode::Connect::new(Fd(raw_fd), addr_ptr, addr_len).build();
        self.0.api.submit(token as u32, entry);
        rx
    }
}

impl Handler for ConnectOpsHandler {
    /// Discards the stored state of a canceled connect operation.
    fn canceled(&mut self, user_data: usize) {
        log::trace!("connect-op is canceled {user_data:?}");
        let mut pending = self.inner.ops.borrow_mut();
        pending.remove(user_data);
    }

    /// Takes the finished operation out of the slab and reports its outcome
    /// through the stored sender.
    ///
    /// On failure the error is forwarded and the socket is passed to
    /// `close_socket`. On success the socket is wrapped as a `UnixStream` when
    /// its domain is `Domain::UNIX`, otherwise as a `TcpStream`, and sent as an
    /// `Io`. Send errors are ignored in both cases.
    fn completed(&mut self, user_data: usize, _: u32, result: io::Result<usize>) {
        let (peer, sock, reply, cfg) = self.inner.ops.borrow_mut().remove(user_data);
        log::trace!(
            "{}: connect-op is completed {user_data:?} result: {result:?}, addr: {:?}",
            cfg.tag(),
            peer.as_socket()
        );

        // Failure path first: forward the error and dispose of the socket.
        if let Err(err) = result {
            let _ = reply.send(Err(err));
            crate::helpers::close_socket(sock);
            return;
        }

        // A failed domain lookup is treated the same as a non-unix socket.
        let is_unix = matches!(sock.domain(), Ok(domain) if domain == Domain::UNIX);
        let streams = self.inner.streams.clone();
        let io = if is_unix {
            Io::new(UnixStream(sock, streams), cfg)
        } else {
            Io::new(TcpStream(sock, streams), cfg)
        };
        let _ = reply.send(Ok(io));
    }

    /// Nothing to do per tick for connect operations.
    fn tick(&mut self) {}

    /// Drops every operation still stored in the slab.
    fn cleanup(&mut self) {
        let mut pending = self.inner.ops.borrow_mut();
        pending.clear();
    }
}