indymilter 0.3.0

Asynchronous milter library
Documentation
//! Comparison test with libmilter milter.
//!
//! The tests here run the same commands against both a libmilter milter and an
//! indymilter milter. The behaviour (callbacks called) must be the same.

#![cfg(feature = "libmilter-tests")]

mod common;

pub use crate::common::*;
use byte_strings::c_str;
use indymilter::{
    message::{
        command::{Command, ConnInfoPayload, HeloPayload, MacroPayload, OptNegPayload},
        reply::Reply,
        PROTOCOL_VERSION,
    },
    Actions, CallbackFuture, Callbacks, MacroStage, ProtoOpts, SocketInfo, Status,
};
use nix::{
    sys::signal::{self, Signal},
    unistd::Pid,
};
use std::{
    future::Future,
    io,
    net::Ipv4Addr,
    path::{Path, PathBuf},
    process::Stdio,
    sync::{Arc, Mutex},
    time::Duration,
};
use tokio::{
    io::{AsyncBufReadExt, BufReader},
    join,
    process::{self, Child, ChildStdout},
    time,
};

/// A milter callback observed during a test run.
///
/// Both harnesses in this file produce a sequence of these values: the
/// libmilter harness by parsing the lines the milter program prints on stdout,
/// the indymilter harness by pushing a value from inside each callback. The
/// test then compares the two sequences for equality.
#[derive(PartialEq, Eq, Debug)]
enum Stage {
    /// The negotiate callback (`NEGOTIATE` on stdout).
    Negotiate,
    /// The connect callback (`CONNECT` on stdout).
    Connect,
    /// The helo callback (`HELO` on stdout).
    Helo,
    /// The mail callback (`MAIL` on stdout).
    Mail,
    /// The rcpt callback (`RCPT` on stdout).
    Rcpt,
    /// The data callback (`DATA` on stdout).
    Data,
    /// The header callback (`HEADER` on stdout).
    Header,
    /// The eoh callback (`EOH` on stdout).
    Eoh,
    /// The body callback (`BODY` on stdout).
    Body,
    /// The eom callback (`EOM` on stdout).
    Eom,
    /// The abort callback (`ABORT` on stdout).
    Abort,
    /// The close callback (`CLOSE` on stdout).
    Close,
    /// The unknown-command callback (`UNKNOWN` on stdout).
    Unknown,
}

/// Replays `test_script_1` against a libmilter milter and against an
/// indymilter milter, and requires both to have invoked the same sequence of
/// callbacks.
#[tokio::test]
async fn empty_macros_and_abort() {
    init_tracing_subscriber();

    // Reference behaviour: the libmilter-based `all_stages` program, which is
    // told to use port 3000.
    let libmilter_stages = run_libmilter_test("all_stages", 3000, test_script_1)
        .await
        .unwrap();

    // Behaviour under test: the same script against an in-process milter.
    let indymilter_stages = run_indymilter_test(test_script_1).await.unwrap();

    assert_eq!(libmilter_stages, indymilter_stages);
}

/// Replays an interaction as seen in the wild with Postfix 3.4.13.
///
/// Its oddities are the empty (invalid) macro payload, and the unexpected
/// Abort command during the HELO stage.
///
/// Write and read failures are returned as errors; an unexpected reply from
/// the milter fails an assertion instead.
async fn test_script_1(mut client: Client) -> io::Result<()> {
    // The same HELO command is sent twice, so build it on demand.
    let helo = || {
        Command::Helo(HeloPayload {
            hostname: c_str!("mail.example.com").into(),
        })
    };

    // Negotiation, offering every action and every protocol option.
    let negotiate = Command::OptNeg(OptNegPayload {
        version: PROTOCOL_VERSION,
        actions: Actions::all(),
        opts: ProtoOpts::all(),
    });
    client.write_command(negotiate).await?;

    let negotiated = client.read_reply().await?;
    assert!(matches!(negotiated, Reply::OptNeg { .. }));

    // Connection information.
    let conn_info = Command::ConnInfo(ConnInfoPayload {
        hostname: c_str!("mail.example.com").into(),
        socket_info: SocketInfo::Inet((Ipv4Addr::new(1, 2, 3, 4), 1234).into()),
    });
    client.write_command(conn_info).await?;

    let connected = client.read_reply().await?;
    assert_eq!(connected, Reply::Continue);

    // Before the first HELO callback an empty payload of HELO macros is sent.
    let empty_macros = Command::DefMacros(MacroPayload {
        stage: MacroStage::Helo,
        macros: vec![],
    });
    client.write_command(empty_macros).await?;

    client.write_command(helo()).await?;

    let first_helo = client.read_reply().await?;
    assert_eq!(first_helo, Reply::Continue);

    // After the first HELO, an abort command is sent, and ignored.
    client.write_command(Command::Abort).await?;

    // Now a usable, well-formed payload of HELO macros is sent.
    let tls_macros = Command::DefMacros(MacroPayload {
        stage: MacroStage::Helo,
        macros: vec![c_str!("{tls_version}").into(), c_str!("TLSv1.3").into()],
    });
    client.write_command(tls_macros).await?;

    client.write_command(helo()).await?;

    let second_helo = client.read_reply().await?;
    assert_eq!(second_helo, Reply::Continue);

    // End the session and drop the connection.
    client.write_command(Command::Quit).await?;

    client.disconnect().await?;

    Ok(())
}

/// Runs `script` against the libmilter-based milter `program` and returns the
/// callbacks that the program reported.
///
/// The executable is located with `exec_path` and started with the single
/// argument `inet:{port}@127.0.0.1`. Its stdout is read in a background task
/// that records every recognised callback name. After the script has finished,
/// the milter is sent SIGTERM and is expected to exit successfully.
///
/// # Panics
///
/// Panics if the milter cannot be spawned, the client cannot connect, the
/// script returns an error, reading stdout fails, or the milter does not exit
/// with a success status. The function never returns `Err`.
async fn run_libmilter_test<F>(
    program: &str,
    port: u16,
    script: impl Fn(Client) -> F,
) -> io::Result<Vec<Stage>>
where
    F: Future<Output = io::Result<()>>,
{
    let program = exec_path(program);
    let socket = format!("inet:{port}@127.0.0.1");

    let mut milter = process::Command::new(program)
        .arg(socket)
        .stdout(Stdio::piped())
        // Any panic below (failed connect, failed script, failed assertion)
        // drops the `Child` without terminating it. Without this setting the
        // milter would keep running and keep holding the port, breaking every
        // following test run.
        .kill_on_drop(true)
        .spawn()
        .unwrap();

    let stdout = milter.stdout.take().unwrap();

    let stages = Arc::new(Mutex::new(Vec::new()));

    // Spawn a task that keeps reading stdout.
    let stdout_task = tokio::spawn(process_stdout(stdout, stages.clone()));

    // Give the milter a little time to start.
    time::sleep(Duration::from_millis(100)).await;

    let client = Client::connect((Ipv4Addr::LOCALHOST, port)).await.unwrap();

    script(client).await.unwrap();

    // Send SIGTERM to the milter, the documented way of shutting it down.
    terminate_process(&milter).unwrap();

    // Wait for the process to exit and for the stdout task to reach
    // end-of-file, which also releases the task's handle on `stages`.
    let (status, join_handle) = join!(milter.wait(), stdout_task);

    let status = status.unwrap();
    join_handle.unwrap().unwrap();

    assert!(status.success());

    // The stdout task has finished, so this is the only remaining handle.
    let stages = Arc::try_unwrap(stages).unwrap().into_inner().unwrap();
    eprintln!("stages: {stages:?}");
    Ok(stages)
}

/// Returns the path of `file_name` inside the `tests` directory.
///
/// The result is relative (`tests/<file_name>`), unless `file_name` is itself
/// absolute, in which case it replaces the `tests` prefix.
fn exec_path(file_name: impl AsRef<Path>) -> PathBuf {
    let mut path = PathBuf::from("tests");
    path.push(file_name);
    path
}

async fn process_stdout(stdout: ChildStdout, stages: Arc<Mutex<Vec<Stage>>>) -> io::Result<()> {
    let reader = BufReader::new(stdout);

    let mut lines = reader.lines();

    while let Some(line) = lines.next_line().await? {
        {
            let mut stages = stages.lock().unwrap();

            match line.as_str() {
                "NEGOTIATE" => stages.push(Stage::Negotiate),
                "CONNECT" => stages.push(Stage::Connect),
                "HELO" => stages.push(Stage::Helo),
                "MAIL" => stages.push(Stage::Mail),
                "RCPT" => stages.push(Stage::Rcpt),
                "DATA" => stages.push(Stage::Data),
                "HEADER" => stages.push(Stage::Header),
                "EOH" => stages.push(Stage::Eoh),
                "BODY" => stages.push(Stage::Body),
                "EOM" => stages.push(Stage::Eom),
                "ABORT" => stages.push(Stage::Abort),
                "CLOSE" => stages.push(Stage::Close),
                "UNKNOWN" => stages.push(Stage::Unknown),
                _ => {}
            }
        }

        println!("{line}");
    }

    Ok(())
}

fn terminate_process(proc: &Child) -> io::Result<()> {
    let pid = Pid::from_raw(proc.id().unwrap().try_into().unwrap());

    signal::kill(pid, Signal::SIGTERM)?;

    Ok(())
}

async fn run_indymilter_test<F>(script: impl Fn(Client) -> F) -> io::Result<Vec<Stage>>
where
    F: Future<Output = io::Result<()>>,
{
    let stages = Arc::new(Mutex::new(Vec::new()));

    let callbacks = {
        let stages_negotiate = stages.clone();
        let stages_connect = stages.clone();
        let stages_helo = stages.clone();
        let stages_mail = stages.clone();
        let stages_rcpt = stages.clone();
        let stages_data = stages.clone();
        let stages_header = stages.clone();
        let stages_eoh = stages.clone();
        let stages_body = stages.clone();
        let stages_eom = stages.clone();
        let stages_abort = stages.clone();
        let stages_close = stages.clone();
        let stages_unknown = stages.clone();

        Callbacks::<()>::new()
            .on_negotiate(move |_, _, _| {
                let stages = stages_negotiate.clone();

                Box::pin(async move {
                    stages.lock().unwrap().push(Stage::Negotiate);
                    Status::AllOpts
                })
            })
            .on_connect(move |_, _, _| record_stage(&stages_connect, Stage::Connect))
            .on_helo(move |_, _| record_stage(&stages_helo, Stage::Helo))
            .on_mail(move |_, _| record_stage(&stages_mail, Stage::Mail))
            .on_rcpt(move |_, _| record_stage(&stages_rcpt, Stage::Rcpt))
            .on_data(move |_| record_stage(&stages_data, Stage::Data))
            .on_header(move |_, _, _| record_stage(&stages_header, Stage::Header))
            .on_eoh(move |_| record_stage(&stages_eoh, Stage::Eoh))
            .on_body(move |_, _| record_stage(&stages_body, Stage::Body))
            .on_eom(move |_| record_stage(&stages_eom, Stage::Eom))
            .on_abort(move |_| record_stage(&stages_abort, Stage::Abort))
            .on_close(move |_| record_stage(&stages_close, Stage::Close))
            .on_unknown(move |_, _| record_stage(&stages_unknown, Stage::Unknown))
    };

    let milter = Milter::spawn(LOCALHOST, callbacks, default_config())
        .await
        .unwrap();

    let client = Client::connect(milter.addr()).await.unwrap();

    script(client).await.unwrap();

    milter.shutdown().await.unwrap();

    let stages = Arc::try_unwrap(stages).unwrap().into_inner().unwrap();
    eprintln!("stages: {stages:?}");
    Ok(stages)
}

/// Builds a callback future that appends `stage` to the shared list `stages`
/// and then resolves to `Status::Continue`.
///
/// The returned future holds its own handle to the list, so it does not borrow
/// from the caller.
fn record_stage(stages: &Arc<Mutex<Vec<Stage>>>, stage: Stage) -> CallbackFuture<'static> {
    let log = Arc::clone(stages);

    Box::pin(async move {
        log.lock().unwrap().push(stage);
        Status::Continue
    })
}