mod common;
pub use crate::common::*;
use byte_strings::c_str;
use bytes::Bytes;
use indymilter::{
message::{
command::{Command, EnvAddrPayload, OptNegPayload},
reply::Reply,
PROTOCOL_VERSION,
},
ActionError, Actions, Callbacks, Config, ContextActions, ProtoOpts, SetErrorReply, Status,
};
use std::ffi::CString;
/// Checks that an error reply set inside the `mail` callback is delivered to
/// the client as a single `Reply::ReplyCode` containing both message lines.
#[tokio::test]
async fn custom_error_reply() {
    init_tracing_subscriber();

    // The `mail` callback installs a two-line error reply and then rejects.
    let callbacks = Callbacks::<()>::new().on_mail(|cx, _| {
        Box::pin(async move {
            let message = ["No!", "Go away."];
            cx.reply
                .set_error_reply("550", Some("5.5.0"), message)
                .unwrap();

            Status::Reject
        })
    });

    let milter = Milter::spawn(LOCALHOST, callbacks, default_config())
        .await
        .unwrap();
    let mut client = Client::connect(milter.addr()).await.unwrap();

    // Option negotiation: the client offers every action and protocol option.
    let negotiate = Command::OptNeg(OptNegPayload {
        version: PROTOCOL_VERSION,
        actions: Actions::all(),
        opts: ProtoOpts::all(),
    });
    client.write_command(negotiate).await.unwrap();

    let negotiated = client.read_reply().await.unwrap();
    assert!(matches!(negotiated, Reply::OptNeg { .. }));

    // Sending the envelope sender triggers the `mail` callback.
    let mail_from = Command::Mail(EnvAddrPayload {
        args: vec![c_str!("me@example.com").into()],
    });
    client.write_command(mail_from).await.unwrap();

    let rejection = client.read_reply().await.unwrap();
    let expected = Reply::ReplyCode {
        reply: c_str!("550-5.5.0 No!\r\n550 5.5.0 Go away.").into(),
    };
    assert_eq!(rejection, expected);

    // Orderly teardown of the session and the milter.
    client.write_command(Command::Quit).await.unwrap();
    client.disconnect().await.unwrap();
    milter.shutdown().await.unwrap();
}
/// Checks that `add_header` and `change_header` calls made in the `eom`
/// callback are sent to the client as `Reply::AddHeader` and
/// `Reply::ChangeHeader`, in that order, followed by `Reply::Continue`.
#[tokio::test]
async fn modify_header_actions() {
    init_tracing_subscriber();

    let callbacks = Callbacks::<()>::new().on_eom(|cx| {
        Box::pin(async {
            // The header value is built at runtime rather than passed as a literal.
            let header_value = format!("<{}{}>", "value", 42);

            cx.actions
                .add_header("name", header_value.as_str())
                .await
                .unwrap();
            cx.actions
                .change_header("other", 1, Some("some value"))
                .await
                .unwrap();

            Status::Continue
        })
    });

    // Only the two header-modifying actions are enabled on the milter side.
    let config = Config {
        actions: Actions::ADD_HEADER | Actions::CHANGE_HEADER,
        ..default_config()
    };

    let milter = Milter::spawn(LOCALHOST, callbacks, config).await.unwrap();
    let mut client = Client::connect(milter.addr()).await.unwrap();

    // The client offers everything; the negotiated actions must match the config.
    let negotiate = Command::OptNeg(OptNegPayload {
        version: PROTOCOL_VERSION,
        actions: Actions::all(),
        opts: ProtoOpts::all(),
    });
    client.write_command(negotiate).await.unwrap();

    let negotiated = client.read_reply().await.unwrap();
    assert!(matches!(
        negotiated,
        Reply::OptNeg { actions, .. } if actions == Actions::ADD_HEADER | Actions::CHANGE_HEADER
    ));

    // End-of-message triggers the `eom` callback.
    let end_of_message = Command::BodyEnd(Bytes::new());
    client.write_command(end_of_message).await.unwrap();

    // First reply: the added header.
    assert_eq!(
        client.read_reply().await.unwrap(),
        Reply::AddHeader {
            name: c_str!("name").into(),
            value: c_str!("<value42>").into(),
        }
    );

    // Second reply: the changed header.
    assert_eq!(
        client.read_reply().await.unwrap(),
        Reply::ChangeHeader {
            name: c_str!("other").into(),
            index: 1,
            value: c_str!("some value").into(),
        }
    );

    // Final reply: the status returned by the callback.
    assert_eq!(client.read_reply().await.unwrap(), Reply::Continue);

    client.write_command(Command::Quit).await.unwrap();
    client.disconnect().await.unwrap();
    milter.shutdown().await.unwrap();
}
/// Checks that a large replacement body passed to `replace_body` in the `eom`
/// callback arrives at the client split over more than one
/// `Reply::ReplaceBody` chunk, and that the reassembled chunks reproduce the
/// body exactly (same length, same content).
#[tokio::test]
async fn replace_body_chunks() {
    init_tracing_subscriber();

    // Size of the replacement body; the test asserts below that a body of
    // this size is delivered in more than one chunk.
    let new_body_size = 2_000_000;

    let callbacks = Callbacks::<()>::new().on_eom(move |cx| {
        Box::pin(async move {
            let body = vec![b'x'; new_body_size];
            cx.actions.replace_body(&body).await.unwrap();

            Status::Continue
        })
    });

    let config = Config {
        actions: Actions::REPLACE_BODY,
        ..default_config()
    };

    let milter = Milter::spawn(LOCALHOST, callbacks, config).await.unwrap();
    let mut client = Client::connect(milter.addr()).await.unwrap();

    client
        .write_command(Command::OptNeg(OptNegPayload {
            version: PROTOCOL_VERSION,
            actions: Actions::all(),
            opts: ProtoOpts::all(),
        }))
        .await
        .unwrap();

    let reply = client.read_reply().await.unwrap();
    assert!(matches!(
        reply,
        Reply::OptNeg { actions, .. } if actions == Actions::REPLACE_BODY
    ));

    // Send the original body, which the callback will replace at end-of-message.
    client
        .write_command(Command::BodyChunk(Bytes::from_static(
            b"Hello, this is the former body (to be replaced).",
        )))
        .await
        .unwrap();

    let reply = client.read_reply().await.unwrap();
    assert_eq!(reply, Reply::Continue);

    client
        .write_command(Command::BodyEnd(Bytes::new()))
        .await
        .unwrap();

    // Reassemble the replacement body from the chunk replies until the final
    // `Continue` arrives. The buffer is preallocated since the size is known.
    let mut new_body: Vec<u8> = Vec::with_capacity(new_body_size);
    let mut chunk_count = 0;

    loop {
        match client.read_reply().await.unwrap() {
            Reply::ReplaceBody { chunk } => {
                new_body.extend_from_slice(&chunk);
                chunk_count += 1;
            }
            Reply::Continue => break,
            // Report the offending reply instead of panicking without context.
            other => panic!(
                "unexpected reply while reading replacement body: {:?}",
                other
            ),
        }
    }

    assert_eq!(new_body.len(), new_body_size);
    // Length alone would not catch corrupted or reordered chunk content.
    assert!(
        new_body.iter().all(|&byte| byte == b'x'),
        "replacement body content differs from what the callback sent"
    );
    assert!(
        chunk_count > 1,
        "expected the replacement body to be split into multiple chunks, got {}",
        chunk_count
    );

    client.write_command(Command::Quit).await.unwrap();
    client.disconnect().await.unwrap();
    milter.shutdown().await.unwrap();
}
/// Checks the errors returned by context actions in the `eom` callback: an
/// empty recipient yields `ActionError::InvalidParam`, and
/// `add_recipient_ext` yields `ActionError::NotAvailable` when only
/// `Actions::ADD_RCPT` is configured. The session still ends with
/// `Reply::Continue`.
#[tokio::test]
async fn context_actions_error() {
    init_tracing_subscriber();

    let callbacks = Callbacks::<()>::new().on_eom(|cx| {
        Box::pin(async {
            // An empty recipient is rejected as an invalid parameter.
            let empty_rcpt = cx.actions.add_recipient("").await;
            assert!(matches!(empty_rcpt, Err(ActionError::InvalidParam)));

            // The extended variant is reported as not available under this config.
            let ext_rcpt = cx
                .actions
                .add_recipient_ext("user@example.com", None::<CString>)
                .await;
            assert!(matches!(ext_rcpt, Err(ActionError::NotAvailable)));

            Status::Continue
        })
    });

    // Only the plain add-recipient action is enabled.
    let config = Config {
        actions: Actions::ADD_RCPT,
        ..default_config()
    };

    let milter = Milter::spawn(LOCALHOST, callbacks, config).await.unwrap();
    let mut client = Client::connect(milter.addr()).await.unwrap();

    let negotiate = Command::OptNeg(OptNegPayload {
        version: PROTOCOL_VERSION,
        actions: Actions::all(),
        opts: ProtoOpts::all(),
    });
    client.write_command(negotiate).await.unwrap();

    let negotiated = client.read_reply().await.unwrap();
    assert!(matches!(
        negotiated,
        Reply::OptNeg { actions, .. } if actions == Actions::ADD_RCPT
    ));

    // End-of-message triggers the `eom` callback and its assertions.
    let end_of_message = Command::BodyEnd(Bytes::new());
    client.write_command(end_of_message).await.unwrap();

    let outcome = client.read_reply().await.unwrap();
    assert_eq!(outcome, Reply::Continue);

    client.write_command(Command::Quit).await.unwrap();
    client.disconnect().await.unwrap();
    milter.shutdown().await.unwrap();
}