retty 0.29.0

Retty — an asynchronous Rust networking framework that makes it easy to build protocols, application clients/servers.
Documentation
use std::collections::VecDeque;
use std::{cell::RefCell, error::Error, io::ErrorKind, marker::PhantomData, rc::Rc, time::Instant};

use crate::channel::{
    handler::Handler,
    handler_internal::{ContextInternal, HandlerInternal},
    Context,
};

const RESERVED_RETTY_PIPELINE_HANDLE_NAME: &str = "ReservedRettyPipelineHandlerName";

pub(crate) struct PipelineInternal<R, W> {
    names: Vec<String>,
    handlers: Vec<Rc<RefCell<dyn HandlerInternal>>>,
    contexts: Vec<Rc<RefCell<dyn ContextInternal>>>,

    transmits: Rc<RefCell<VecDeque<W>>>,
    phantom: PhantomData<R>,
}

impl<R: 'static, W: 'static> PipelineInternal<R, W> {
    pub(crate) fn new() -> Self {
        let transmits = Rc::new(RefCell::new(VecDeque::new()));
        let last_handler = LastHandler::new(transmits.clone());
        let (name, handler, context) = last_handler.generate();
        Self {
            names: vec![name],
            handlers: vec![handler],
            contexts: vec![context],

            transmits,
            phantom: PhantomData,
        }
    }

    pub(crate) fn add_back(&mut self, handler: impl Handler + 'static) {
        let (name, handler, context) = handler.generate();
        if name == RESERVED_RETTY_PIPELINE_HANDLE_NAME {
            panic!("handle name {} is reserved", name);
        }

        let len = self.names.len();

        self.names.insert(len - 1, name);
        self.handlers.insert(len - 1, handler);
        self.contexts.insert(len - 1, context);
    }

    pub(crate) fn add_front(&mut self, handler: impl Handler + 'static) {
        let (name, handler, context) = handler.generate();
        if name == RESERVED_RETTY_PIPELINE_HANDLE_NAME {
            panic!("handle name {} is reserved", name);
        }

        self.names.insert(0, name);
        self.handlers.insert(0, handler);
        self.contexts.insert(0, context);
    }

    pub(crate) fn remove_back(&mut self) -> Result<(), std::io::Error> {
        let len = self.names.len();
        if len == 1 {
            Err(std::io::Error::new(
                ErrorKind::NotFound,
                "No handlers in pipeline",
            ))
        } else {
            self.names.remove(len - 2);
            self.handlers.remove(len - 2);
            self.contexts.remove(len - 2);

            Ok(())
        }
    }

    pub(crate) fn remove_front(&mut self) -> Result<(), std::io::Error> {
        let len = self.names.len();
        if len == 1 {
            Err(std::io::Error::new(
                ErrorKind::NotFound,
                "No handlers in pipeline",
            ))
        } else {
            self.names.remove(0);
            self.handlers.remove(0);
            self.contexts.remove(0);

            Ok(())
        }
    }

    pub(crate) fn remove(&mut self, handler_name: &str) -> Result<(), std::io::Error> {
        if handler_name == RESERVED_RETTY_PIPELINE_HANDLE_NAME {
            return Err(std::io::Error::new(
                ErrorKind::PermissionDenied,
                format!("handle name {} is reserved", handler_name),
            ));
        }

        let mut to_be_removed = vec![];
        for (index, name) in self.names.iter().enumerate() {
            if name == handler_name {
                to_be_removed.push(index);
            }
        }

        if !to_be_removed.is_empty() {
            for index in to_be_removed.into_iter().rev() {
                self.names.remove(index);
                self.handlers.remove(index);
                self.contexts.remove(index);
            }

            Ok(())
        } else {
            Err(std::io::Error::new(
                ErrorKind::NotFound,
                format!("No such handler \"{}\" in pipeline", handler_name),
            ))
        }
    }

    pub(crate) fn len(&self) -> usize {
        self.names.len() - 1
    }

    pub(crate) fn finalize(&self) {
        let mut enumerate = self.contexts.iter().enumerate();
        let ctx_pipe_len = self.contexts.len();
        for _ in 0..ctx_pipe_len {
            let (j, ctx) = enumerate.next().unwrap();
            let mut curr = ctx.borrow_mut();

            let (next_context, next_handler) = (self.contexts.get(j + 1), self.handlers.get(j + 1));
            match (next_context, next_handler) {
                (Some(next_ctx), Some(next_hdr)) => {
                    curr.set_next_context(Some(next_ctx.clone()));
                    curr.set_next_handler(Some(next_hdr.clone()));
                }
                _ => {
                    curr.set_next_context(None);
                    curr.set_next_handler(None);
                }
            }
        }
    }

    pub(crate) fn write(&self, msg: W) {
        let mut transmits = self.transmits.borrow_mut();
        transmits.push_back(msg);
    }

    pub(crate) fn transport_active(&self) {
        let (mut handler, context) = (
            self.handlers.first().unwrap().borrow_mut(),
            self.contexts.first().unwrap().borrow(),
        );
        handler.transport_active_internal(&*context);
    }

    pub(crate) fn transport_inactive(&self) {
        let (mut handler, context) = (
            self.handlers.first().unwrap().borrow_mut(),
            self.contexts.first().unwrap().borrow(),
        );
        handler.transport_inactive_internal(&*context);
    }

    pub(crate) fn handle_read(&self, msg: R) {
        let (mut handler, context) = (
            self.handlers.first().unwrap().borrow_mut(),
            self.contexts.first().unwrap().borrow(),
        );
        handler.handle_read_internal(&*context, Box::new(msg));
    }

    pub(crate) fn poll_write(&self) -> Option<R> {
        let (mut handler, context) = (
            self.handlers.first().unwrap().borrow_mut(),
            self.contexts.first().unwrap().borrow(),
        );
        if let Some(msg) = handler.poll_write_internal(&*context) {
            if let Ok(msg) = msg.downcast::<R>() {
                Some(*msg)
            } else {
                panic!("msg can't downcast::<R> in {} handler", context.name());
            }
        } else {
            None
        }
    }

    pub(crate) fn handle_close(&self) {
        let (mut handler, context) = (
            self.handlers.first().unwrap().borrow_mut(),
            self.contexts.first().unwrap().borrow(),
        );
        handler.handle_close_internal(&*context);
    }

    pub(crate) fn handle_timeout(&self, now: Instant) {
        let (mut handler, context) = (
            self.handlers.first().unwrap().borrow_mut(),
            self.contexts.first().unwrap().borrow(),
        );
        handler.handle_timeout_internal(&*context, now);
    }

    pub(crate) fn poll_timeout(&self, eto: &mut Instant) {
        let (mut handler, context) = (
            self.handlers.first().unwrap().borrow_mut(),
            self.contexts.first().unwrap().borrow(),
        );
        handler.poll_timeout_internal(&*context, eto);
    }

    pub(crate) fn handle_read_eof(&self) {
        let (mut handler, context) = (
            self.handlers.first().unwrap().borrow_mut(),
            self.contexts.first().unwrap().borrow(),
        );
        handler.handle_read_eof_internal(&*context);
    }

    pub(crate) fn handle_exception(&self, err: Box<dyn Error>) {
        let (mut handler, context) = (
            self.handlers.first().unwrap().borrow_mut(),
            self.contexts.first().unwrap().borrow(),
        );
        handler.handle_exception_internal(&*context, err);
    }
}

pub(crate) struct LastHandler<W> {
    transmits: Rc<RefCell<VecDeque<W>>>,
}

impl<W> LastHandler<W> {
    pub(crate) fn new(transmits: Rc<RefCell<VecDeque<W>>>) -> Self {
        Self { transmits }
    }
}

impl<W: 'static> Handler for LastHandler<W> {
    type Rin = W;
    type Rout = Self::Rin;
    type Win = Self::Rin;
    type Wout = Self::Rin;

    fn name(&self) -> &str {
        RESERVED_RETTY_PIPELINE_HANDLE_NAME
    }

    fn handle_read(
        &mut self,
        ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
        msg: Self::Rin,
    ) {
        ctx.fire_read(msg);
    }

    fn poll_write(
        &mut self,
        _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
    ) -> Option<Self::Wout> {
        let mut transmits = self.transmits.borrow_mut();
        transmits.pop_front()
    }
}