#[cfg(feature = "miltertest-tests")]
mod miltertest;
#[cfg(feature = "miltertest-tests")]
pub use self::miltertest::*;
use indymilter::{
message::{self, command::Command, reply::Reply},
Callbacks, Config,
};
use std::{
io,
net::{Ipv4Addr, SocketAddr},
time::Duration,
};
use tokio::{
io::{AsyncWriteExt, BufStream},
net::{TcpListener, TcpStream, ToSocketAddrs},
sync::oneshot,
task::{self, JoinHandle},
};
/// Attempts to install the default `tracing_subscriber::fmt` subscriber.
///
/// The result of `try_init` is discarded, so calling this when installation
/// fails is a silent no-op rather than a panic.
// NOTE(review): presumably the failure being tolerated is "a global subscriber
// is already set" (several tests calling this in one process) — confirm.
pub fn init_tracing_subscriber() {
    tracing_subscriber::fmt::try_init().ok();
}
/// Loopback address paired with port `0`, suitable as a bind address for tests.
///
/// `Milter::spawn` reads the real bound address back from the listener, so
/// callers do not need to know the port in advance.
pub const LOCALHOST: (Ipv4Addr, u16) = {
    // Port 0 conventionally asks the OS to choose a free port at bind time.
    const ANY_PORT: u16 = 0;
    (Ipv4Addr::LOCALHOST, ANY_PORT)
};
pub fn default_config() -> Config {
Config {
connection_timeout: Duration::from_secs(30),
..Default::default()
}
}
/// Handle to a milter server running on its own Tokio task.
///
/// Created by `Milter::spawn`; consumed by `Milter::shutdown`.
pub struct Milter {
/// Join handle of the task that runs `indymilter::run`.
milter_handle: JoinHandle<io::Result<()>>,
/// Sending half of the oneshot channel whose receiver was given to
/// `indymilter::run` as its shutdown signal.
shutdown: oneshot::Sender<()>,
/// Local address the listener was actually bound to.
addr: SocketAddr,
}
impl Milter {
    /// Binds a TCP listener on `addr` and starts `indymilter::run` on a new
    /// Tokio task.
    ///
    /// The listener's local address is captured before the listener is handed
    /// to the server and can be retrieved with [`Milter::addr`].
    ///
    /// # Errors
    ///
    /// Returns an error if binding the listener or querying its local address
    /// fails.
    pub async fn spawn<T: Send + 'static>(
        addr: impl ToSocketAddrs,
        callbacks: Callbacks<T>,
        config: Config,
    ) -> io::Result<Self> {
        let listener = TcpListener::bind(addr).await?;
        let bound_addr = listener.local_addr()?;

        // The receiving half goes to the server; the sending half is kept so
        // `shutdown` can signal it later.
        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        let server = indymilter::run(listener, callbacks, config, shutdown_rx);

        Ok(Self {
            milter_handle: tokio::spawn(server),
            shutdown: shutdown_tx,
            addr: bound_addr,
        })
    }

    /// Returns the local address the milter's listener is bound to.
    pub fn addr(&self) -> SocketAddr {
        self.addr
    }

    /// Signals the milter to shut down and waits for its task to finish.
    ///
    /// # Errors
    ///
    /// Returns the `io::Result` produced by the milter task. A failure to join
    /// the task is converted into an `io::Error` by `?`.
    pub async fn shutdown(self) -> io::Result<()> {
        let Self {
            milter_handle,
            shutdown,
            ..
        } = self;

        // NOTE(review): presumably gives in-flight tasks a chance to make
        // progress before the shutdown signal is sent — confirm.
        yield_a_few_times().await;

        // The send result is intentionally ignored.
        let _ = shutdown.send(());

        milter_handle.await?
    }
}
/// Yields control back to the Tokio scheduler ten times in a row.
pub async fn yield_a_few_times() {
    const YIELD_COUNT: usize = 10;

    let mut remaining = YIELD_COUNT;
    while remaining > 0 {
        task::yield_now().await;
        remaining -= 1;
    }
}
/// Minimal milter client that talks to a server over a buffered TCP stream.
pub struct Client {
/// Buffered wrapper around the TCP connection to the milter.
stream: BufStream<TcpStream>,
}
impl Client {
    /// Opens a TCP connection to `addr` and wraps it in a buffered stream.
    ///
    /// # Errors
    ///
    /// Returns an error if the TCP connection cannot be established.
    pub async fn connect(addr: impl ToSocketAddrs) -> io::Result<Self> {
        let stream = TcpStream::connect(addr).await?;
        Ok(Self {
            stream: BufStream::new(stream),
        })
    }

    /// Converts `cmd` into a milter message and writes it to the stream.
    ///
    /// # Errors
    ///
    /// Returns an error if `message::write` fails.
    pub async fn write_command(&mut self, cmd: Command) -> io::Result<()> {
        let msg = cmd.into_message();
        message::write(&mut self.stream, msg).await?;
        Ok(())
    }

    /// Writes raw `bytes` to the stream and flushes the buffer.
    ///
    /// No milter framing is added, so this can be used to send arbitrary
    /// (including malformed) input.
    ///
    /// # Errors
    ///
    /// Returns an error if writing or flushing fails.
    pub async fn write_bytes(&mut self, bytes: &[u8]) -> io::Result<()> {
        self.stream.write_all(bytes).await?;
        self.stream.flush().await?;
        Ok(())
    }

    /// Reads one message from the stream and parses it as a [`Reply`].
    ///
    /// # Errors
    ///
    /// Returns an error if reading the message fails. A message that cannot be
    /// parsed as a reply is reported as [`io::ErrorKind::InvalidData`] instead
    /// of panicking, so callers decide how to handle unexpected peer output.
    pub async fn read_reply(&mut self) -> io::Result<Reply> {
        let msg = message::read(&mut self.stream).await?;
        Reply::parse_reply(msg).map_err(|err| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                format!("failed to parse milter reply: {:?}", err),
            )
        })
    }

    /// Shuts down the stream, consuming the client.
    ///
    /// # Errors
    ///
    /// Returns an error if shutting down the stream fails.
    pub async fn disconnect(mut self) -> io::Result<()> {
        self.stream.shutdown().await
    }
}