actix-server 2.6.0

General purpose TCP server built for the Actix ecosystem
Documentation
use std::{
    marker::PhantomData,
    net::SocketAddr,
    task::{Context, Poll},
};

use actix_service::{Service, ServiceFactory as BaseServiceFactory};
use actix_utils::future::{ready, Ready};
use futures_core::future::LocalBoxFuture;
use tracing::error;

use crate::{
    socket::{FromStream, MioStream},
    worker::WorkerCounterGuard,
};

#[doc(hidden)]
pub trait ServerServiceFactory<Stream: FromStream>: Send + Clone + 'static {
    type Factory: BaseServiceFactory<Stream, Config = ()>;

    fn create(&self) -> Self::Factory;
}

pub(crate) trait InternalServiceFactory: Send {
    fn name(&self, token: usize) -> &str;

    fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;

    fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>;
}

pub(crate) type BoxedServerService = Box<
    dyn Service<
        (WorkerCounterGuard, MioStream),
        Response = (),
        Error = (),
        Future = Ready<Result<(), ()>>,
    >,
>;

pub(crate) struct StreamService<S, I> {
    service: S,
    _phantom: PhantomData<I>,
}

impl<S, I> StreamService<S, I> {
    pub(crate) fn new(service: S) -> Self {
        StreamService {
            service,
            _phantom: PhantomData,
        }
    }
}

impl<S, I> Service<(WorkerCounterGuard, MioStream)> for StreamService<S, I>
where
    S: Service<I>,
    S::Future: 'static,
    S::Error: 'static,
    I: FromStream,
{
    type Response = ();
    type Error = ();
    type Future = Ready<Result<(), ()>>;

    fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(ctx).map_err(|_| ())
    }

    fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> Self::Future {
        ready(match FromStream::from_mio(req) {
            Ok(stream) => {
                let f = self.service.call(stream);
                actix_rt::spawn(async move {
                    let _ = f.await;
                    drop(guard);
                });
                Ok(())
            }
            Err(err) => {
                error!("can not convert to an async TCP stream: {err}");
                Err(())
            }
        })
    }
}

pub(crate) struct StreamNewService<F: ServerServiceFactory<Io>, Io: FromStream> {
    name: String,
    inner: F,
    token: usize,
    addr: SocketAddr,
    _t: PhantomData<Io>,
}

impl<F, Io> StreamNewService<F, Io>
where
    F: ServerServiceFactory<Io>,
    Io: FromStream + Send + 'static,
{
    pub(crate) fn create(
        name: String,
        token: usize,
        inner: F,
        addr: SocketAddr,
    ) -> Box<dyn InternalServiceFactory> {
        Box::new(Self {
            name,
            token,
            inner,
            addr,
            _t: PhantomData,
        })
    }
}

impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
where
    F: ServerServiceFactory<Io>,
    Io: FromStream + Send + 'static,
{
    fn name(&self, _: usize) -> &str {
        &self.name
    }

    fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
        Box::new(Self {
            name: self.name.clone(),
            inner: self.inner.clone(),
            token: self.token,
            addr: self.addr,
            _t: PhantomData,
        })
    }

    fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> {
        let token = self.token;
        let fut = self.inner.create().new_service(());
        Box::pin(async move {
            match fut.await {
                Ok(inner) => {
                    let service = Box::new(StreamService::new(inner)) as _;
                    Ok((token, service))
                }
                Err(_) => Err(()),
            }
        })
    }
}

impl<F, T, I> ServerServiceFactory<I> for F
where
    F: Fn() -> T + Send + Clone + 'static,
    T: BaseServiceFactory<I, Config = ()>,
    I: FromStream,
{
    type Factory = T;

    fn create(&self) -> T {
        (self)()
    }
}