#![allow(dead_code)]
use std::path::PathBuf;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::time::Duration;
use postcrate_core::{
BindHost, CoreConfig, CreateEphemeralInput, CreateMailboxInput, EphemeralHandle, LogSink,
MailboxKind, Service,
};
use tempfile::TempDir;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
pub struct TestService {
pub service: Service,
pub _data_dir: TempDir,
pub http_url: String,
}
impl TestService {
pub async fn boot() -> Self {
Self::boot_with(|_| {}).await
}
pub async fn boot_with<F: FnOnce(&mut CoreConfig)>(tweak: F) -> Self {
let data_dir = TempDir::new().expect("create temp dir");
let mut cfg = CoreConfig::for_data_dir(data_dir.path()).expect("config");
cfg.http_port = 0;
cfg.bind_host = BindHost::Loopback;
cfg.ephemeral_port_range = next_port_range();
cfg.default_smtp_port = 0;
tweak(&mut cfg);
let service = Service::build(cfg, Arc::new(LogSink)).await.expect("build");
service.start_all().await.expect("start_all");
let addr = wait_for(Duration::from_secs(5), || service.http_addr())
.await
.expect("http addr");
let http_url = format!("http://{addr}");
Self {
service,
_data_dir: data_dir,
http_url,
}
}
pub async fn create_ephemeral(&self, ttl_seconds: u64) -> EphemeralHandle {
self.service
.create_ephemeral(CreateEphemeralInput {
project_id: "test".into(),
name: None,
ttl_seconds,
})
.await
.expect("ephemeral")
}
pub async fn create_primary(&self, port: u16) -> postcrate_core::Mailbox {
self.service
.create_mailbox(CreateMailboxInput {
project_id: "test".into(),
name: format!("primary-{port}"),
kind: MailboxKind::Primary,
port: Some(port),
ttl_seconds: None,
implicit_tls: false,
})
.await
.expect("create primary")
}
}
fn next_port_range() -> (u16, u16) {
static OFFSET: AtomicU16 = AtomicU16::new(0);
const SLICE: u16 = 50;
let pid_bucket = (std::process::id() % 35) as u16 * 1000;
let test_offset = OFFSET.fetch_add(SLICE, Ordering::Relaxed) % 1000;
let base = 30_000u16
.checked_add(pid_bucket)
.and_then(|x| x.checked_add(test_offset))
.unwrap_or(30_000);
(base, base + SLICE - 1)
}
pub async fn wait_for<T, F: Fn() -> Option<T>>(deadline: Duration, f: F) -> Option<T> {
let start = std::time::Instant::now();
while start.elapsed() < deadline {
if let Some(v) = f() {
return Some(v);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
None
}
pub struct SmtpClient {
pub reader: BufReader<tokio::io::ReadHalf<TcpStream>>,
pub writer: tokio::io::WriteHalf<TcpStream>,
}
impl SmtpClient {
pub async fn connect(host: &str, port: u16) -> std::io::Result<Self> {
let stream = TcpStream::connect((host, port)).await?;
let (r, w) = tokio::io::split(stream);
Ok(Self {
reader: BufReader::new(r),
writer: w,
})
}
pub async fn read_line(&mut self) -> std::io::Result<String> {
let mut line = String::new();
let n = self.reader.read_line(&mut line).await?;
if n == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"server closed",
));
}
while line.ends_with('\n') || line.ends_with('\r') {
line.pop();
}
Ok(line)
}
pub async fn read_reply(&mut self) -> std::io::Result<Vec<String>> {
let mut lines = Vec::new();
loop {
let line = self.read_line().await?;
let is_final = line.len() < 4 || line.as_bytes()[3] == b' ';
lines.push(line);
if is_final {
return Ok(lines);
}
}
}
pub async fn send(&mut self, line: &str) -> std::io::Result<()> {
self.writer.write_all(line.as_bytes()).await?;
if !line.ends_with("\r\n") {
self.writer.write_all(b"\r\n").await?;
}
self.writer.flush().await?;
Ok(())
}
pub async fn send_data(&mut self, raw_message: &[u8]) -> std::io::Result<Vec<String>> {
self.send("DATA").await?;
let reply = self.read_reply().await?;
assert!(reply[0].starts_with("354"), "expected 354 got {:?}", reply);
let stuffed = dot_stuff(raw_message);
self.writer.write_all(&stuffed).await?;
if !stuffed.ends_with(b"\r\n") {
self.writer.write_all(b"\r\n").await?;
}
self.writer.write_all(b".\r\n").await?;
self.writer.flush().await?;
self.read_reply().await
}
pub async fn quit(mut self) -> std::io::Result<()> {
self.send("QUIT").await?;
let _ = self.read_reply().await;
Ok(())
}
}
pub fn dot_stuff(body: &[u8]) -> Vec<u8> {
let mut out = Vec::with_capacity(body.len());
let mut at_line_start = true;
for &b in body {
if at_line_start && b == b'.' {
out.push(b'.');
}
out.push(b);
at_line_start = b == b'\n';
}
out
}
pub async fn quick_send(
host: &str,
port: u16,
from: &str,
to: &str,
subject: &str,
body: &str,
) -> std::io::Result<()> {
let mut c = SmtpClient::connect(host, port).await?;
let _ = c.read_reply().await?; c.send("EHLO test").await?;
let _ = c.read_reply().await?;
c.send(&format!("MAIL FROM:<{from}>")).await?;
let _ = c.read_reply().await?;
c.send(&format!("RCPT TO:<{to}>")).await?;
let _ = c.read_reply().await?;
let raw = format!(
"From: {from}\r\nTo: {to}\r\nSubject: {subject}\r\nDate: Mon, 1 Jan 2024 00:00:00 +0000\r\n\r\n{body}"
);
let _ = c.send_data(raw.as_bytes()).await?;
c.quit().await
}
pub fn fixtures_dir() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests")
.join("fixtures")
}
pub fn read_fixture(rel: &str) -> Vec<u8> {
let path = fixtures_dir().join(rel);
std::fs::read(&path).unwrap_or_else(|e| panic!("read {}: {}", path.display(), e))
}
pub fn read_fixture_str(rel: &str) -> String {
let bytes = read_fixture(rel);
String::from_utf8(bytes).expect("utf-8 fixture")
}
pub async fn run_transcript(host: &str, port: u16, transcript: &str) {
let mut c = SmtpClient::connect(host, port).await.expect("connect");
for (lineno, raw) in transcript.lines().enumerate() {
let lineno = lineno + 1;
let line = raw.trim_end();
if line.is_empty() || line.starts_with('#') {
continue;
}
if let Some(rest) = line.strip_prefix("C: ") {
c.send(rest)
.await
.unwrap_or_else(|e| panic!("line {lineno}: send failed: {e}"));
} else if let Some(rest) = line.strip_prefix("S: ") {
let got = c
.read_line()
.await
.unwrap_or_else(|e| panic!("line {lineno}: read failed: {e}"));
assert!(
got.starts_with(rest),
"line {lineno}: expected reply starting with {rest:?}, got {got:?}"
);
} else if let Some(rest) = line.strip_prefix("C-DATA:") {
let payload = rest.strip_prefix(' ').unwrap_or(rest);
c.writer
.write_all(payload.as_bytes())
.await
.unwrap_or_else(|e| panic!("line {lineno}: write data: {e}"));
c.writer
.write_all(b"\r\n")
.await
.unwrap_or_else(|e| panic!("line {lineno}: write crlf: {e}"));
} else if line == "C-DATA-END" {
c.writer
.write_all(b".\r\n")
.await
.unwrap_or_else(|e| panic!("line {lineno}: write terminator: {e}"));
c.writer.flush().await.expect("flush");
} else {
panic!("line {lineno}: unrecognized transcript directive: {line:?}");
}
}
let _ = tokio::time::timeout(Duration::from_millis(100), async {
let mut buf = [0u8; 1024];
loop {
match c.reader.read(&mut buf).await {
Ok(0) | Err(_) => return,
Ok(_) => {}
}
}
})
.await;
}