indymilter 0.3.0

Asynchronous milter library
Documentation
#[cfg(feature = "miltertest-tests")]
mod miltertest;

#[cfg(feature = "miltertest-tests")]
pub use self::miltertest::*;

use indymilter::{
    message::{self, command::Command, reply::Reply},
    Callbacks, Config,
};
use std::{
    io,
    net::{Ipv4Addr, SocketAddr},
    time::Duration,
};
use tokio::{
    io::{AsyncWriteExt, BufStream},
    net::{TcpListener, TcpStream, ToSocketAddrs},
    sync::oneshot,
    task::{self, JoinHandle},
};

pub fn init_tracing_subscriber() {
    let _ = tracing_subscriber::fmt::try_init();
}

pub const LOCALHOST: (Ipv4Addr, u16) = (Ipv4Addr::LOCALHOST, 0);

pub fn default_config() -> Config {
    // Override very long default connection timeout for tests.
    Config {
        connection_timeout: Duration::from_secs(30),
        ..Default::default()
    }
}

pub struct Milter {
    milter_handle: JoinHandle<io::Result<()>>,
    shutdown: oneshot::Sender<()>,
    addr: SocketAddr,
}

impl Milter {
    pub async fn spawn<T: Send + 'static>(
        addr: impl ToSocketAddrs,
        callbacks: Callbacks<T>,
        config: Config,
    ) -> io::Result<Self> {
        let listener = TcpListener::bind(addr).await?;

        let addr = listener.local_addr()?;

        let (shutdown, shutdown_rx) = oneshot::channel();

        let milter = tokio::spawn(indymilter::run(listener, callbacks, config, shutdown_rx));

        Ok(Self {
            milter_handle: milter,
            shutdown,
            addr,
        })
    }

    pub fn addr(&self) -> SocketAddr {
        self.addr
    }

    pub async fn shutdown(self) -> io::Result<()> {
        // The milter task was spawned with `tokio::spawn`. Sometimes the test
        // function can proceed at each await point, giving the milter task no
        // chance to run to completion before the shutdown signal is received.
        // So we first yield a few times, hoping to give the milter time to
        // proceed a bit further (eg, terminate an open session).
        //
        // Note: This is not intended to impact test correctness, rather to
        // match the behaviour of a real milter (eg in terms of logging).
        yield_a_few_times().await;

        let _ = self.shutdown.send(());

        self.milter_handle.await?
    }
}

pub async fn yield_a_few_times() {
    for _ in 0..10 {
        task::yield_now().await;
    }
}

pub struct Client {
    stream: BufStream<TcpStream>,
}

impl Client {
    pub async fn connect(addr: impl ToSocketAddrs) -> io::Result<Self> {
        let stream = TcpStream::connect(addr).await?;

        Ok(Self {
            stream: BufStream::new(stream),
        })
    }

    pub async fn write_command(&mut self, cmd: Command) -> io::Result<()> {
        let msg = cmd.into_message();

        message::write(&mut self.stream, msg).await?;

        Ok(())
    }

    pub async fn write_bytes(&mut self, bytes: &[u8]) -> io::Result<()> {
        self.stream.write_all(bytes).await?;
        self.stream.flush().await?;

        Ok(())
    }

    pub async fn read_reply(&mut self) -> io::Result<Reply> {
        let msg = message::read(&mut self.stream).await?;

        // Assume that indymilter always responds with a well-formed reply.
        let reply = Reply::parse_reply(msg).unwrap();

        Ok(reply)
    }

    // Note: consumes and therefore drops this client and connection.
    pub async fn disconnect(mut self) -> io::Result<()> {
        self.stream.shutdown().await
    }
}