ntex-server 3.10.2

Server for ntex framework
Documentation
use std::{fmt, marker::PhantomData, sync::Arc};

use ntex_io::Io;
use ntex_service::{Service, ServiceCtx, ServiceFactory, boxed, cfg::SharedCfg};
use ntex_util::future::BoxFuture;

use super::{Config, Token, socket::Stream};

pub(super) type BoxServerService = boxed::BoxServiceFactory<SharedCfg, Io, (), (), ()>;
pub(super) type FactoryServiceType = Box<dyn FactoryService>;

#[derive(Debug)]
pub(crate) struct NetService {
    pub(crate) name: Arc<str>,
    pub(crate) tokens: Vec<(Token, SharedCfg)>,
    pub(crate) config: SharedCfg,
    pub(crate) factory: BoxServerService,
}

pub(crate) trait FactoryService: Send {
    #[allow(clippy::unnecessary_literal_bound)]
    fn name(&self, _: Token) -> &str {
        ""
    }

    fn clone_factory(&self) -> FactoryServiceType;

    fn set_config(&mut self, _: Token, _: SharedCfg) {}

    fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>>;
}

struct Factory {
    name: Arc<str>,
    tokens: Vec<(Token, SharedCfg)>,
    wrapper: Box<dyn FactoryWrapper + Send>,
}

pub(crate) fn create_boxed_factory<S>(name: String, factory: S) -> BoxServerService
where
    S: ServiceFactory<Io, SharedCfg> + 'static,
{
    boxed::factory(ServerServiceFactory {
        name: Arc::from(name),
        factory,
    })
}

pub(crate) fn create_factory_service<F, R>(
    name: String,
    tokens: Vec<(Token, SharedCfg)>,
    factory: F,
) -> FactoryServiceType
where
    F: AsyncFn(Config) -> R + Send + Clone + 'static,
    R: ServiceFactory<Io, SharedCfg> + 'static,
{
    let name: Arc<str> = Arc::from(name);

    Box::from(Factory {
        tokens,
        name: name.clone(),
        wrapper: Box::new(FactoryWrapperImpl(async move |cfg| {
            boxed::factory(ServerServiceFactory {
                name: name.clone(),
                factory: (factory)(cfg).await,
            })
        })),
    })
}

struct FactoryWrapperImpl<F>(F);

trait FactoryWrapper: Send {
    fn clone(&self) -> Box<dyn FactoryWrapper>;
    fn run(&self, cfg: Config) -> BoxFuture<'static, BoxServerService>;
}

impl<F> FactoryWrapper for FactoryWrapperImpl<F>
where
    F: AsyncFn(Config) -> BoxServerService + Send + Clone + 'static,
{
    fn clone(&self) -> Box<dyn FactoryWrapper> {
        Box::new(Self(self.0.clone()))
    }

    fn run(&self, cfg: Config) -> BoxFuture<'static, BoxServerService> {
        let f = self.0.clone();
        Box::pin(async move { f(cfg).await })
    }
}

impl FactoryService for Factory {
    fn name(&self, _: Token) -> &str {
        &self.name
    }

    fn clone_factory(&self) -> FactoryServiceType {
        Box::new(Factory {
            name: self.name.clone(),
            tokens: self.tokens.clone(),
            wrapper: self.wrapper.clone(),
        })
    }

    fn set_config(&mut self, token: Token, cfg: SharedCfg) {
        for item in &mut self.tokens {
            if item.0 == token {
                item.1 = cfg.clone();
            }
        }
    }

    fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
        let cfg = Config::default();
        let name = self.name.clone();
        let mut tokens = self.tokens.clone();
        for item in &tokens {
            cfg.config(item.1.clone());
        }
        let factory_fut = self.wrapper.run(cfg.clone());

        Box::pin(async move {
            let factory = factory_fut.await;
            let config = cfg.get_config();
            for item in &mut tokens {
                item.1 = config.clone();
            }

            Ok(vec![NetService {
                name,
                tokens,
                factory,
                config: config.clone(),
            }])
        })
    }
}

struct ServerServiceFactory<S> {
    name: Arc<str>,
    factory: S,
}

impl<S> ServiceFactory<Io, SharedCfg> for ServerServiceFactory<S>
where
    S: ServiceFactory<Io, SharedCfg>,
{
    type Response = ();
    type Error = ();
    type Service = ServerService<S::Service>;
    type InitError = ();

    async fn create(&self, cfg: SharedCfg) -> Result<Self::Service, Self::InitError> {
        self.factory
            .create(cfg)
            .await
            .map(|inner| ServerService { inner })
            .map_err(|_| log::error!("Cannot construct {:?} service", self.name))
    }
}

struct ServerService<S> {
    inner: S,
}

impl<S> Service<Io> for ServerService<S>
where
    S: Service<Io>,
{
    type Response = ();
    type Error = ();

    async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
        ctx.ready(&self.inner).await.map_err(|_| ())
    }

    async fn call(&self, req: Io, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
        ctx.call(&self.inner, req).await.map(|_| ()).map_err(|_| ())
    }

    ntex_service::forward_shutdown!(inner);
}

// SAFETY: Send cannot be provided authomatically because of E and R params
// but R always get executed in one thread and never leave it
unsafe impl Send for Factory {}

pub(crate) trait OnWorkerStart {
    fn clone_fn(&self) -> Box<dyn OnWorkerStart + Send>;

    fn run(&self) -> BoxFuture<'static, Result<(), ()>>;
}

pub(super) struct OnWorkerStartWrapper<F, E> {
    pub(super) f: F,
    pub(super) _t: PhantomData<E>,
}

unsafe impl<F, E> Send for OnWorkerStartWrapper<F, E> where F: Send {}

impl<F, E> OnWorkerStartWrapper<F, E>
where
    F: AsyncFn() -> Result<(), E> + Send + Clone + 'static,
    E: fmt::Display + 'static,
{
    pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
        Box::new(Self { f, _t: PhantomData })
    }
}

impl<F, E> OnWorkerStart for OnWorkerStartWrapper<F, E>
where
    F: AsyncFn() -> Result<(), E> + Send + Clone + 'static,
    E: fmt::Display + 'static,
{
    fn clone_fn(&self) -> Box<dyn OnWorkerStart + Send> {
        Box::new(Self {
            f: self.f.clone(),
            _t: PhantomData,
        })
    }

    fn run(&self) -> BoxFuture<'static, Result<(), ()>> {
        let f = self.f.clone();
        Box::pin(async move {
            (f)().await.map_err(|e| {
                log::error!("On worker start callback failed: {e}");
            })
        })
    }
}

pub(crate) trait OnAccept {
    fn clone_fn(&self) -> Box<dyn OnAccept + Send>;

    fn run(&self, name: Arc<str>, stream: Stream)
    -> BoxFuture<'static, Result<Stream, ()>>;
}

pub(super) struct OnAcceptWrapper<F, E> {
    pub(super) f: F,
    pub(super) _t: PhantomData<E>,
}

unsafe impl<F, E> Send for OnAcceptWrapper<F, E> where F: Send {}

impl<F, E> OnAcceptWrapper<F, E>
where
    F: AsyncFn(Arc<str>, Stream) -> Result<Stream, E> + Send + Clone + 'static,
    E: fmt::Display + 'static,
{
    pub(super) fn create(f: F) -> Box<dyn OnAccept + Send> {
        Box::new(Self { f, _t: PhantomData })
    }
}

impl<F, E> OnAccept for OnAcceptWrapper<F, E>
where
    F: AsyncFn(Arc<str>, Stream) -> Result<Stream, E> + Send + Clone + 'static,
    E: fmt::Display + 'static,
{
    fn clone_fn(&self) -> Box<dyn OnAccept + Send> {
        Box::new(Self {
            f: self.f.clone(),
            _t: PhantomData,
        })
    }

    fn run(
        &self,
        name: Arc<str>,
        stream: Stream,
    ) -> BoxFuture<'static, Result<Stream, ()>> {
        let f = self.f.clone();
        Box::pin(async move {
            (f)(name, stream).await.map_err(|e| {
                log::error!("On accept callback failed: {e}");
            })
        })
    }
}