// axum-test 20.0.0
//
// Easy E2E testing for Axum
// Documentation
use crate::util::SafeSend;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::mpsc::Sender;
use std::sync::mpsc::sync_channel;
use std::thread::spawn;
use tokio::runtime::Builder;
use tokio::task::LocalSet;

/// Builder that holds an initialisation closure until [`SafeSendBuilder::on_send`]
/// turns it into a [`SafeSend`].
///
/// No trait bounds are placed on `F` here; they live on the `impl` block that
/// needs them.
#[derive(Debug)]
pub(crate) struct SafeSendBuilder<F> {
    /// Closure that produces the future which builds the service value.
    /// `on_send` calls it once, on the worker thread it spawns.
    init: F,
}

impl<F, Fut, S> SafeSendBuilder<F>
where
    F: FnOnce() -> Fut + Send + 'static,
    Fut: Future<Output = S> + 'static,
    S: 'static,
{
    /// Creates a builder that stores `init` for later use by [`Self::on_send`].
    ///
    /// `init` itself must be `Send` (it is moved onto the worker thread), but
    /// neither the future it returns nor the service `S` it produces has to be.
    pub(crate) fn new(init: F) -> Self {
        Self { init }
    }

    /// Spawns a dedicated worker thread that owns the service and answers
    /// requests with `handler`, returning the [`SafeSend`] handle for it.
    ///
    /// The worker thread:
    /// 1. builds a current-thread Tokio runtime and a [`LocalSet`],
    /// 2. runs the stored `init` closure's future to completion to create the
    ///    service,
    /// 3. then repeatedly receives `(input, reply sender)` pairs, runs
    ///    `handler(&service, input)` to completion, and sends the output back
    ///    on the reply sender.
    ///
    /// The loop, and with it the thread, ends once receiving a request fails,
    /// which happens when every request sender has been dropped.
    ///
    /// # Panics
    ///
    /// The worker thread (not the caller) panics if the Tokio runtime cannot
    /// be built.
    pub(crate) fn on_send<G, In, Out>(self, handler: G) -> SafeSend<In, Out>
    where
        G: for<'s> Fn(&'s S, In) -> Pin<Box<dyn Future<Output = Out> + 's>> + Send + 'static,
        In: Send + 'static,
        Out: Send + 'static,
    {
        // A bound of 0 makes this a rendezvous channel: a request is only
        // handed over once the worker is actually waiting in `recv`.
        let (task_tx, task_rx) = sync_channel::<(In, Sender<Out>)>(0);

        let thread = spawn(move || {
            let rt = Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("Failed to build tokio runtime for SafeSend");

            let local = LocalSet::new();

            // The service is created on this thread and never leaves it,
            // which is why `S` and the futures involved need not be `Send`.
            let service = rt.block_on(local.run_until(async { (self.init)().await }));

            while let Ok((input, response_tx)) = task_rx.recv() {
                let output = rt.block_on(local.run_until(handler(&service, input)));

                // `send` only fails when the paired receiver has been dropped,
                // i.e. the requester stopped waiting for this reply. That must
                // not panic the worker: doing so would tear down the service
                // and break every later request, so the output is discarded.
                let _ = response_tx.send(output);
            }
        });

        SafeSend::new(task_tx, Arc::new(thread))
    }
}