marlin/
axum.rs

1use std::net::SocketAddr;
2use std::time::Duration;
3
4use axum::{
5    extract::connect_info::Connected,
6    serve::{IncomingStream, Listener},
7};
8use tokio::net::{TcpListener, TcpStream};
9use tokio::time::sleep;
10
11use crate::scallop::{
12    ScallopAuthStore, ScallopAuther, ScallopError, ScallopStream,
13    new_server_async_Noise_IX_25519_ChaChaPoly_BLAKE2b,
14};
15
16#[derive(Debug, thiserror::Error)]
17pub enum AxumError {
18    #[error("failed to accept conns")]
19    AcceptError(#[from] tokio::io::Error),
20    #[error("failed to scallop")]
21    ScallopError(#[from] ScallopError),
22    #[error("timeout")]
23    TimeoutError,
24}
25
26pub struct ScallopListener<AuthStore, Auther> {
27    pub listener: TcpListener,
28    pub secret: [u8; 32],
29    pub auth_store: Option<AuthStore>,
30    pub auther: Option<Auther>,
31}
32
33impl<AuthStore, Auther> ScallopListener<AuthStore, Auther>
34where
35    AuthStore: ScallopAuthStore + Clone + Send + 'static,
36    AuthStore::State: Send + Unpin,
37    Auther: ScallopAuther + Clone + 'static,
38{
39    async fn accept_impl(
40        &mut self,
41    ) -> Result<(<Self as Listener>::Io, <Self as Listener>::Addr), AxumError> {
42        let (stream, addr) = self.listener.accept().await?;
43        // add a timeout so the acceptor is not stuck waiting on scallop
44        tokio::select! {
45            stream = new_server_async_Noise_IX_25519_ChaChaPoly_BLAKE2b(
46                stream,
47                &self.secret,
48                self.auth_store.clone(),
49                self.auther.clone(),
50            ) => {
51                Ok((stream?, addr))
52            }
53            _ = sleep(Duration::from_secs(5)) => {
54                Err(AxumError::TimeoutError)
55            }
56        }
57    }
58}
59
60impl<AuthStore, Auther> Listener for ScallopListener<AuthStore, Auther>
61where
62    AuthStore: ScallopAuthStore + Clone + Send + 'static,
63    AuthStore::State: Send + Unpin,
64    Auther: ScallopAuther + Clone + 'static,
65{
66    type Io = ScallopStream<TcpStream, <AuthStore as ScallopAuthStore>::State>;
67    type Addr = SocketAddr;
68
69    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
70        loop {
71            let res = self.accept_impl().await;
72            match res {
73                Ok(res) => return res,
74                Err(_) => continue, // nothing, maybe log?
75            }
76        }
77    }
78
79    fn local_addr(&self) -> tokio::io::Result<Self::Addr> {
80        self.listener.local_addr()
81    }
82}
83
84#[derive(Clone)]
85pub struct ScallopState<State>(pub Option<State>);
86
87impl<AuthStore, Auther> Connected<IncomingStream<'_, ScallopListener<AuthStore, Auther>>>
88    for ScallopState<AuthStore::State>
89where
90    AuthStore: ScallopAuthStore + Clone + Send + 'static,
91    AuthStore::State: Clone + Send + Sync + Unpin,
92    Auther: ScallopAuther + Clone + 'static,
93{
94    fn connect_info(stream: IncomingStream<'_, ScallopListener<AuthStore, Auther>>) -> Self {
95        // is it possible to avoid a clone here?
96        ScallopState(stream.io().state.clone())
97    }
98}