indymilter 0.3.0

Asynchronous milter library
Documentation
mod common;

pub use crate::common::*;
use byte_strings::c_str;
use bytes::Bytes;
use indymilter::{
    message::{
        command::{Command, EnvAddrPayload, OptNegPayload},
        reply::Reply,
        PROTOCOL_VERSION,
    },
    ActionError, Actions, Callbacks, Config, ContextActions, ProtoOpts, SetErrorReply, Status,
};
use std::ffi::CString;

#[tokio::test]
async fn custom_error_reply() {
    init_tracing_subscriber();

    let callbacks = Callbacks::<()>::new()
        .on_mail(|cx, _| {
            Box::pin(async move {
                cx.reply.set_error_reply("550", Some("5.5.0"), ["No!", "Go away."]).unwrap();

                Status::Reject
            })
        });

    let milter = Milter::spawn(LOCALHOST, callbacks, default_config())
        .await
        .unwrap();

    let mut client = Client::connect(milter.addr()).await.unwrap();

    client
        .write_command(Command::OptNeg(OptNegPayload {
            version: PROTOCOL_VERSION,
            actions: Actions::all(),
            opts: ProtoOpts::all(),
        }))
        .await
        .unwrap();

    let reply = client.read_reply().await.unwrap();
    assert!(matches!(reply, Reply::OptNeg { .. }));

    client
        .write_command(Command::Mail(EnvAddrPayload {
            args: vec![c_str!("me@example.com").into()],
        }))
        .await
        .unwrap();

    let reply = client.read_reply().await.unwrap();
    assert_eq!(
        reply,
        Reply::ReplyCode {
            reply: c_str!("550-5.5.0 No!\r\n550 5.5.0 Go away.").into()
        }
    );

    client.write_command(Command::Quit).await.unwrap();

    client.disconnect().await.unwrap();

    milter.shutdown().await.unwrap();
}

#[tokio::test]
async fn modify_header_actions() {
    init_tracing_subscriber();

    let callbacks = Callbacks::<()>::new()
        .on_eom(|cx| {
            Box::pin(async {
                // Also try passing a non-static string to `add_header` (to test
                // `IntoCString + Send` parameter type).
                let value = format!("<{}{}>", "value", 42);

                cx.actions.add_header("name", value.as_str()).await.unwrap();
                cx.actions
                    .change_header("other", 1, Some("some value"))
                    .await
                    .unwrap();

                Status::Continue
            })
        });

    let config = Config {
        actions: Actions::ADD_HEADER | Actions::CHANGE_HEADER,
        ..default_config()
    };

    let milter = Milter::spawn(LOCALHOST, callbacks, config).await.unwrap();

    let mut client = Client::connect(milter.addr()).await.unwrap();

    client
        .write_command(Command::OptNeg(OptNegPayload {
            version: PROTOCOL_VERSION,
            actions: Actions::all(),
            opts: ProtoOpts::all(),
        }))
        .await
        .unwrap();

    let reply = client.read_reply().await.unwrap();
    assert!(matches!(
        reply,
        Reply::OptNeg { actions, .. } if actions == Actions::ADD_HEADER | Actions::CHANGE_HEADER
    ));

    client
        .write_command(Command::BodyEnd(Bytes::new()))
        .await
        .unwrap();

    let reply = client.read_reply().await.unwrap();
    assert_eq!(
        reply,
        Reply::AddHeader {
            name: c_str!("name").into(),
            value: c_str!("<value42>").into(),
        }
    );

    let reply = client.read_reply().await.unwrap();
    assert_eq!(
        reply,
        Reply::ChangeHeader {
            name: c_str!("other").into(),
            index: 1,
            value: c_str!("some value").into(),
        }
    );

    let reply = client.read_reply().await.unwrap();
    assert_eq!(reply, Reply::Continue);

    client.write_command(Command::Quit).await.unwrap();

    client.disconnect().await.unwrap();

    milter.shutdown().await.unwrap();
}

#[tokio::test]
async fn replace_body_chunks() {
    init_tracing_subscriber();

    let new_body_size = 2_000_000;

    let callbacks = Callbacks::<()>::new()
        .on_eom(move |cx| {
            Box::pin(async move {
                // New body with large size, greater than chunk size (an
                // implementation detail).
                let body = vec![b'x'; new_body_size];

                cx.actions.replace_body(&body).await.unwrap();

                Status::Continue
            })
        });

    let config = Config {
        actions: Actions::REPLACE_BODY,
        ..default_config()
    };

    let milter = Milter::spawn(LOCALHOST, callbacks, config).await.unwrap();

    let mut client = Client::connect(milter.addr()).await.unwrap();

    client
        .write_command(Command::OptNeg(OptNegPayload {
            version: PROTOCOL_VERSION,
            actions: Actions::all(),
            opts: ProtoOpts::all(),
        }))
        .await
        .unwrap();

    let reply = client.read_reply().await.unwrap();
    assert!(matches!(
        reply,
        Reply::OptNeg { actions, .. } if actions == Actions::REPLACE_BODY
    ));

    client
        .write_command(Command::BodyChunk(Bytes::from_static(
            b"Hello, this is the former body (to be replaced).",
        )))
        .await
        .unwrap();

    let reply = client.read_reply().await.unwrap();
    assert_eq!(reply, Reply::Continue);

    client
        .write_command(Command::BodyEnd(Bytes::new()))
        .await
        .unwrap();

    let mut new_body = vec![];
    let mut chunk_count = 0;

    loop {
        let reply = client.read_reply().await.unwrap();

        match reply {
            Reply::ReplaceBody { chunk } => {
                new_body.extend(chunk);
                chunk_count += 1;
            }
            Reply::Continue => break,
            _ => panic!(),
        }
    }

    assert_eq!(new_body.len(), new_body_size);
    assert!(chunk_count > 1);

    client.write_command(Command::Quit).await.unwrap();

    client.disconnect().await.unwrap();

    milter.shutdown().await.unwrap();
}

#[tokio::test]
async fn context_actions_error() {
    init_tracing_subscriber();

    let callbacks = Callbacks::<()>::new()
        .on_eom(|cx| {
            Box::pin(async {
                let result = cx.actions.add_recipient("").await;
                assert!(matches!(result, Err(ActionError::InvalidParam)));

                let result = cx
                    .actions
                    .add_recipient_ext("user@example.com", None::<CString>)
                    .await;
                assert!(matches!(result, Err(ActionError::NotAvailable)));

                Status::Continue
            })
        });

    let config = Config {
        actions: Actions::ADD_RCPT,
        ..default_config()
    };

    let milter = Milter::spawn(LOCALHOST, callbacks, config).await.unwrap();

    let mut client = Client::connect(milter.addr()).await.unwrap();

    client
        .write_command(Command::OptNeg(OptNegPayload {
            version: PROTOCOL_VERSION,
            actions: Actions::all(),
            opts: ProtoOpts::all(),
        }))
        .await
        .unwrap();

    let reply = client.read_reply().await.unwrap();
    assert!(matches!(
        reply,
        Reply::OptNeg { actions, .. } if actions == Actions::ADD_RCPT
    ));

    client
        .write_command(Command::BodyEnd(Bytes::new()))
        .await
        .unwrap();

    let reply = client.read_reply().await.unwrap();
    assert_eq!(reply, Reply::Continue);

    client.write_command(Command::Quit).await.unwrap();

    client.disconnect().await.unwrap();

    milter.shutdown().await.unwrap();
}