#![cfg(feature = "libmilter-tests")]
mod common;
pub use crate::common::*;
use byte_strings::c_str;
use indymilter::{
message::{
command::{Command, ConnInfoPayload, HeloPayload, MacroPayload, OptNegPayload},
reply::Reply,
PROTOCOL_VERSION,
},
Actions, CallbackFuture, Callbacks, MacroStage, ProtoOpts, SocketInfo, Status,
};
use nix::{
sys::signal::{self, Signal},
unistd::Pid,
};
use std::{
future::Future,
io,
net::Ipv4Addr,
path::{Path, PathBuf},
process::Stdio,
sync::{Arc, Mutex},
time::Duration,
};
use tokio::{
io::{AsyncBufReadExt, BufReader},
join,
process::{self, Child, ChildStdout},
time,
};
/// One milter callback stage, as observed by a test runner.
///
/// Both runners in this file collect a `Vec<Stage>` in the order in which the
/// stages were reported, and the test compares the two sequences for equality.
/// Each variant corresponds to one upper-case token parsed from the libmilter
/// program's standard output and to one `on_*` callback registered on the
/// indymilter side.
#[derive(Debug, PartialEq, Eq)]
enum Stage {
    /// Stdout token `NEGOTIATE`; callback `on_negotiate`.
    Negotiate,
    /// Stdout token `CONNECT`; callback `on_connect`.
    Connect,
    /// Stdout token `HELO`; callback `on_helo`.
    Helo,
    /// Stdout token `MAIL`; callback `on_mail`.
    Mail,
    /// Stdout token `RCPT`; callback `on_rcpt`.
    Rcpt,
    /// Stdout token `DATA`; callback `on_data`.
    Data,
    /// Stdout token `HEADER`; callback `on_header`.
    Header,
    /// Stdout token `EOH`; callback `on_eoh`.
    Eoh,
    /// Stdout token `BODY`; callback `on_body`.
    Body,
    /// Stdout token `EOM`; callback `on_eom`.
    Eom,
    /// Stdout token `ABORT`; callback `on_abort`.
    Abort,
    /// Stdout token `CLOSE`; callback `on_close`.
    Close,
    /// Stdout token `UNKNOWN`; callback `on_unknown`.
    Unknown,
}
/// Runs `test_script_1` against the libmilter-based `all_stages` program and
/// against an in-process indymilter instance, then checks that both reported
/// the same sequence of stages.
#[tokio::test]
async fn empty_macros_and_abort() {
    init_tracing_subscriber();

    // First run: the external libmilter program, listening on port 3000.
    let libmilter_run = run_libmilter_test("all_stages", 3000, test_script_1).await;
    let libmilter_stages = libmilter_run.unwrap();

    // Second run: the same script against indymilter.
    let indymilter_run = run_indymilter_test(test_script_1).await;
    let indymilter_stages = indymilter_run.unwrap();

    assert_eq!(libmilter_stages, indymilter_stages);
}
/// Client-side script used by `empty_macros_and_abort`.
///
/// The sequence of commands is:
///
/// 1. option negotiation offering all actions and all protocol options;
/// 2. connection info for `mail.example.com` at `1.2.3.4:1234`;
/// 3. HELO macros with an *empty* macro list, followed by HELO;
/// 4. ABORT (no reply is read for it);
/// 5. HELO macros carrying `{tls_version}` = `TLSv1.3`, followed by HELO;
/// 6. QUIT, then disconnect.
///
/// A reply is read and asserted after negotiation, connection info and each
/// HELO. I/O errors are returned to the caller; unexpected replies panic.
async fn test_script_1(mut client: Client) -> io::Result<()> {
    // Both HELO rounds send the very same command, so build it in one place.
    let helo = || {
        Command::Helo(HeloPayload {
            hostname: c_str!("mail.example.com").into(),
        })
    };

    // Negotiation.
    let negotiation = Command::OptNeg(OptNegPayload {
        version: PROTOCOL_VERSION,
        actions: Actions::all(),
        opts: ProtoOpts::all(),
    });
    client.write_command(negotiation).await?;
    let reply = client.read_reply().await?;
    assert!(matches!(reply, Reply::OptNeg { .. }));

    // Connection info.
    let conn_info = Command::ConnInfo(ConnInfoPayload {
        hostname: c_str!("mail.example.com").into(),
        socket_info: SocketInfo::Inet((Ipv4Addr::new(1, 2, 3, 4), 1234).into()),
    });
    client.write_command(conn_info).await?;
    assert_eq!(client.read_reply().await?, Reply::Continue);

    // First HELO round: the macro definition is present but has no entries.
    let no_macros = Command::DefMacros(MacroPayload {
        stage: MacroStage::Helo,
        macros: Vec::new(),
    });
    client.write_command(no_macros).await?;
    client.write_command(helo()).await?;
    assert_eq!(client.read_reply().await?, Reply::Continue);

    // Abort; the script moves straight on to the next command afterwards.
    client.write_command(Command::Abort).await?;

    // Second HELO round: this time with one name/value macro pair.
    let tls_macros = Command::DefMacros(MacroPayload {
        stage: MacroStage::Helo,
        macros: vec![c_str!("{tls_version}").into(), c_str!("TLSv1.3").into()],
    });
    client.write_command(tls_macros).await?;
    client.write_command(helo()).await?;
    assert_eq!(client.read_reply().await?, Reply::Continue);

    // Orderly end of the session.
    client.write_command(Command::Quit).await?;
    client.disconnect().await?;

    Ok(())
}
/// Runs `script` against an external libmilter test program and returns the
/// stages that the program reported on its standard output.
///
/// `program` is resolved through `exec_path` and started with a single
/// argument of the form `inet:{port}@127.0.0.1`. Its standard output is piped
/// and parsed concurrently by `process_stdout`. After the script has finished,
/// the program is sent `SIGTERM` and is expected to exit successfully.
///
/// The child is spawned with `kill_on_drop(true)`: should any of the unwraps
/// or assertions below panic, the child is killed when its handle is dropped
/// instead of being left running with the test port still bound.
///
/// # Panics
///
/// Panics if the program cannot be spawned, the client cannot connect, the
/// script fails, the program cannot be signalled or does not exit
/// successfully, or reading its output fails.
async fn run_libmilter_test<F>(
    program: &str,
    port: u16,
    script: impl Fn(Client) -> F,
) -> io::Result<Vec<Stage>>
where
    F: Future<Output = io::Result<()>>,
{
    let program = exec_path(program);
    let socket = format!("inet:{port}@127.0.0.1");

    let mut milter = process::Command::new(program)
        .arg(socket)
        .stdout(Stdio::piped())
        // Without this, a panic further down would orphan the child process.
        .kill_on_drop(true)
        .spawn()
        .unwrap();

    let stdout = milter.stdout.take().unwrap();

    let stages = Arc::new(Mutex::new(Vec::new()));

    // Collect stages in the background while the script talks to the milter.
    let stdout_task = tokio::spawn(process_stdout(stdout, stages.clone()));

    // Give the freshly spawned program a moment to start listening.
    time::sleep(Duration::from_millis(100)).await;

    let client = Client::connect((Ipv4Addr::LOCALHOST, port)).await.unwrap();

    script(client).await.unwrap();

    terminate_process(&milter).unwrap();

    // Wait for both the process exit and the end of its output; the stdout
    // task holds the second handle to `stages` until it finishes.
    let (status, join_handle) = join!(milter.wait(), stdout_task);
    let status = status.unwrap();
    join_handle.unwrap().unwrap();

    assert!(status.success());

    let stages = Arc::try_unwrap(stages).unwrap().into_inner().unwrap();

    eprintln!("stages: {stages:?}");

    Ok(stages)
}
/// Returns the path of `file_name` inside the relative `tests` directory.
///
/// The result is built by pushing `file_name` onto `tests`, so it is relative
/// to the current working directory of the test process.
fn exec_path(file_name: impl AsRef<Path>) -> PathBuf {
    let mut path = PathBuf::from("tests");
    path.push(file_name);
    path
}
/// Reads the child's standard output line by line until it ends.
///
/// Every line that is exactly one of the known upper-case stage tokens is
/// appended to `stages` as the matching `Stage`; all other lines are ignored
/// for recording purposes. Each line, recognised or not, is echoed to this
/// process's standard output.
///
/// Returns an error if reading a line fails.
async fn process_stdout(stdout: ChildStdout, stages: Arc<Mutex<Vec<Stage>>>) -> io::Result<()> {
    let mut lines = BufReader::new(stdout).lines();

    loop {
        let line = match lines.next_line().await? {
            Some(line) => line,
            None => return Ok(()),
        };

        let stage = match line.as_str() {
            "NEGOTIATE" => Some(Stage::Negotiate),
            "CONNECT" => Some(Stage::Connect),
            "HELO" => Some(Stage::Helo),
            "MAIL" => Some(Stage::Mail),
            "RCPT" => Some(Stage::Rcpt),
            "DATA" => Some(Stage::Data),
            "HEADER" => Some(Stage::Header),
            "EOH" => Some(Stage::Eoh),
            "BODY" => Some(Stage::Body),
            "EOM" => Some(Stage::Eom),
            "ABORT" => Some(Stage::Abort),
            "CLOSE" => Some(Stage::Close),
            "UNKNOWN" => Some(Stage::Unknown),
            _ => None,
        };

        // Extending with an `Option` pushes at most one element. The guard is
        // a temporary, so the lock is released before the line is echoed.
        stages.lock().unwrap().extend(stage);

        println!("{line}");
    }
}
fn terminate_process(proc: &Child) -> io::Result<()> {
let pid = Pid::from_raw(proc.id().unwrap().try_into().unwrap());
signal::kill(pid, Signal::SIGTERM)?;
Ok(())
}
/// Runs `script` against an in-process indymilter instance and returns the
/// stages whose callbacks were invoked, in invocation order.
///
/// A callback is registered for every stage. The negotiate callback records
/// its stage and answers `Status::AllOpts`; all others go through
/// `record_stage`, which records the stage and answers `Status::Continue`.
///
/// # Panics
///
/// Panics if the milter cannot be spawned, the client cannot connect, the
/// script fails, or the milter does not shut down cleanly.
async fn run_indymilter_test<F>(script: impl Fn(Client) -> F) -> io::Result<Vec<Stage>>
where
    F: Future<Output = io::Result<()>>,
{
    let stages = Arc::new(Mutex::new(Vec::new()));

    let callbacks = {
        // Each callback closure is `move`, so each needs its own handle to the
        // shared stage log.
        let negotiate_log = Arc::clone(&stages);
        let connect_log = Arc::clone(&stages);
        let helo_log = Arc::clone(&stages);
        let mail_log = Arc::clone(&stages);
        let rcpt_log = Arc::clone(&stages);
        let data_log = Arc::clone(&stages);
        let header_log = Arc::clone(&stages);
        let eoh_log = Arc::clone(&stages);
        let body_log = Arc::clone(&stages);
        let eom_log = Arc::clone(&stages);
        let abort_log = Arc::clone(&stages);
        let close_log = Arc::clone(&stages);
        let unknown_log = Arc::clone(&stages);

        Callbacks::<()>::new()
            // Negotiation is the one stage that does not answer `Continue`.
            .on_negotiate(move |_, _, _| {
                let log = Arc::clone(&negotiate_log);

                Box::pin(async move {
                    log.lock().unwrap().push(Stage::Negotiate);

                    Status::AllOpts
                })
            })
            .on_connect(move |_, _, _| record_stage(&connect_log, Stage::Connect))
            .on_helo(move |_, _| record_stage(&helo_log, Stage::Helo))
            .on_mail(move |_, _| record_stage(&mail_log, Stage::Mail))
            .on_rcpt(move |_, _| record_stage(&rcpt_log, Stage::Rcpt))
            .on_data(move |_| record_stage(&data_log, Stage::Data))
            .on_header(move |_, _, _| record_stage(&header_log, Stage::Header))
            .on_eoh(move |_| record_stage(&eoh_log, Stage::Eoh))
            .on_body(move |_, _| record_stage(&body_log, Stage::Body))
            .on_eom(move |_| record_stage(&eom_log, Stage::Eom))
            .on_abort(move |_| record_stage(&abort_log, Stage::Abort))
            .on_close(move |_| record_stage(&close_log, Stage::Close))
            .on_unknown(move |_, _| record_stage(&unknown_log, Stage::Unknown))
    };

    let spawned = Milter::spawn(LOCALHOST, callbacks, default_config()).await;
    let milter = spawned.unwrap();

    let client = Client::connect(milter.addr()).await.unwrap();

    script(client).await.unwrap();

    milter.shutdown().await.unwrap();

    // Unwrapping the `Arc` only succeeds when this is the last handle left.
    let stages = Arc::try_unwrap(stages).unwrap().into_inner().unwrap();

    eprintln!("stages: {stages:?}");

    Ok(stages)
}
/// Builds a callback future that appends `stage` to the shared stage log and
/// then resolves to `Status::Continue`.
///
/// The future owns its own handle to the log, so it does not borrow from
/// `stages`. Nothing is recorded until the future is polled.
fn record_stage(stages: &Arc<Mutex<Vec<Stage>>>, stage: Stage) -> CallbackFuture<'static> {
    let log = Arc::clone(stages);

    let record = async move {
        log.lock().unwrap().push(stage);

        Status::Continue
    };

    Box::pin(record)
}