#![doc = include_str!("../README.md")]
#![deny(missing_docs)]
#![deny(rustdoc::broken_intra_doc_links)]
use std::collections::HashMap;
use std::io;
use std::sync::Arc;
use std::time::Duration;
use mailrs_maildir::{Maildir, MessageId};
use tokio::sync::{Semaphore, mpsc, oneshot};
/// Default upper bound on the number of requests gathered into one batch;
/// used by [`DeliveryExecutor::spawn`].
pub const DEFAULT_MAX_BATCH: usize = 64;
/// Default time the executor keeps collecting further requests after the
/// first request of a batch has arrived; used by [`DeliveryExecutor::spawn`].
pub const DEFAULT_MAX_WAIT: Duration = Duration::from_millis(10);
/// Default number of batch flushes allowed to run at the same time; used by
/// [`DeliveryExecutor::spawn`] and [`DeliveryExecutor::with_config`].
pub const DEFAULT_MAX_CONCURRENT_FLUSHES: usize = 2;
/// Handle for submitting maildir deliveries to a background batching task.
///
/// The handle only wraps the sending half of the request channel, so clones
/// all feed the same background task.
#[derive(Clone)]
pub struct DeliveryExecutor {
/// Sending half of the request queue drained by the background task.
sender: mpsc::Sender<Request>,
}
/// One queued delivery: the target maildir, the message, and the reply channel.
struct Request {
/// Maildir location; passed to `Maildir::create_cached` when the batch is flushed.
path: String,
/// Raw message bytes; handed to `deliver_batch` as a byte slice.
body: Arc<Vec<u8>>,
/// Carries the per-message outcome back to the caller awaiting in `deliver`.
reply: oneshot::Sender<io::Result<MessageId>>,
}
impl DeliveryExecutor {
    /// Starts an executor with [`DEFAULT_MAX_BATCH`] and [`DEFAULT_MAX_WAIT`].
    ///
    /// # Panics
    ///
    /// Panics when called outside a Tokio runtime, because the background
    /// task is started with `tokio::spawn`.
    pub fn spawn() -> Self {
        Self::with_config(DEFAULT_MAX_BATCH, DEFAULT_MAX_WAIT)
    }

    /// Starts an executor with a custom batch size and collection window,
    /// using [`DEFAULT_MAX_CONCURRENT_FLUSHES`] for flush concurrency.
    ///
    /// See [`Self::with_full_config`] for the meaning of the parameters.
    ///
    /// # Panics
    ///
    /// Panics when called outside a Tokio runtime.
    pub fn with_config(max_batch: usize, max_wait: Duration) -> Self {
        Self::with_full_config(max_batch, max_wait, DEFAULT_MAX_CONCURRENT_FLUSHES)
    }

    /// Starts an executor with every tunable specified.
    ///
    /// * `max_batch` - upper bound on the number of requests gathered into
    ///   one batch. The request queue holds sixteen times this many entries.
    ///   A value of `0` is accepted: the queue is then sized for a single
    ///   request and each batch holds one request.
    /// * `max_wait` - how long the background task keeps collecting further
    ///   requests after the first request of a batch has arrived.
    /// * `max_concurrent_flushes` - number of batch flushes allowed to run at
    ///   the same time; `0` is treated as `1`.
    ///
    /// # Panics
    ///
    /// Panics when called outside a Tokio runtime, because the background
    /// task is started with `tokio::spawn`.
    pub fn with_full_config(
        max_batch: usize,
        max_wait: Duration,
        max_concurrent_flushes: usize,
    ) -> Self {
        // `mpsc::channel` panics on a capacity of zero or one above
        // `Semaphore::MAX_PERMITS`, and a plain `max_batch * 16` can overflow,
        // so saturate the product and clamp it into the supported range.
        let queue_capacity = max_batch
            .saturating_mul(16)
            .clamp(1, Semaphore::MAX_PERMITS);
        // `Semaphore::new` has the same upper limit on its permit count.
        let flush_permits = max_concurrent_flushes.clamp(1, Semaphore::MAX_PERMITS);

        let (tx, rx) = mpsc::channel(queue_capacity);
        let flush_semaphore = Arc::new(Semaphore::new(flush_permits));
        tokio::spawn(run_executor(rx, max_batch, max_wait, flush_semaphore));
        Self { sender: tx }
    }

    /// Queues `body` for delivery into the maildir at `path` and waits for
    /// the outcome of the batch it ends up in.
    ///
    /// On success the identifier assigned to the stored message is returned.
    ///
    /// # Errors
    ///
    /// Returns an error when the request queue is closed (the background task
    /// has stopped), when the reply channel is dropped before an answer is
    /// sent, or when the batch delivery itself fails.
    pub async fn deliver(&self, path: String, body: Arc<Vec<u8>>) -> io::Result<MessageId> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = Request {
            path,
            body,
            reply: reply_tx,
        };

        self.sender
            .send(request)
            .await
            .map_err(|_| io::Error::other("delivery executor channel closed"))?;

        // A dropped reply sender means the batch was abandoned before this
        // request was answered.
        reply_rx
            .await
            .unwrap_or_else(|_| Err(io::Error::other("delivery executor dropped reply")))
    }
}
async fn run_executor(
mut rx: mpsc::Receiver<Request>,
max_batch: usize,
max_wait: Duration,
flush_semaphore: Arc<Semaphore>,
) {
loop {
let Some(first) = rx.recv().await else {
return;
};
let mut batch: Vec<Request> = Vec::with_capacity(max_batch);
batch.push(first);
let deadline = tokio::time::Instant::now() + max_wait;
while batch.len() < max_batch {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
break;
}
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Some(req)) => batch.push(req),
Ok(None) => break, Err(_) => break, }
}
let Ok(permit) = flush_semaphore.clone().acquire_owned().await else {
tokio::task::spawn_blocking(move || flush_batch(batch))
.await
.ok();
continue;
};
tokio::task::spawn_blocking(move || {
flush_batch(batch);
drop(permit);
});
}
}
/// Delivers one batch of requests and answers every request's reply channel.
///
/// Requests are grouped by maildir path and each group is written with a
/// single `deliver_batch` call. On success each request receives the
/// identifier at its own position in the returned sequence; on failure every
/// request in the group receives an error carrying the failure text. A send
/// that fails because the caller stopped waiting is ignored.
fn flush_batch(reqs: Vec<Request>) {
    let mut by_path: HashMap<String, Vec<Request>> = HashMap::new();
    for mut req in reqs {
        // The request's own path is not read again after grouping, so move it
        // into the map key instead of cloning it for every request.
        let key = std::mem::take(&mut req.path);
        by_path.entry(key).or_default().push(req);
    }

    for (path, group) in by_path {
        let bodies: Vec<&[u8]> = group.iter().map(|r| r.body.as_slice()).collect();
        let result = match Maildir::create_cached(&path) {
            Ok(md) => md.deliver_batch(&bodies),
            Err(e) => Err(e),
        };
        match result {
            Ok(ids) => {
                let mut ids = ids.into_iter();
                for req in group {
                    // Report a short result explicitly rather than silently
                    // dropping the remaining reply senders.
                    let outcome = ids.next().ok_or_else(|| {
                        io::Error::other(
                            "batch delivery: backend returned fewer message ids than messages",
                        )
                    });
                    let _ = req.reply.send(outcome);
                }
            }
            Err(e) => {
                // The error is turned into text once so that an independent
                // error value can be built for every waiting caller.
                let msg = e.to_string();
                for req in group {
                    let _ = req
                        .reply
                        .send(Err(io::Error::other(format!("batch delivery: {msg}"))));
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use std::path::Path;
    use std::time::Instant;

    /// Creates a scratch directory that is removed when the guard is dropped.
    fn tmpdir() -> tempfile::TempDir {
        tempfile::tempdir().unwrap()
    }

    /// Renders `<root>/<name>` as the owned string that `deliver` expects.
    fn maildir_path(root: &Path, name: &str) -> String {
        root.join(name).to_string_lossy().into_owned()
    }

    /// Counts the entries in the `new` subdirectory of `maildir`.
    fn count_new(maildir: &Path) -> usize {
        std::fs::read_dir(maildir.join("new")).unwrap().count()
    }

    /// A lone delivery yields a non-empty id and a file under `new/`.
    #[tokio::test]
    async fn delivers_a_single_message() {
        let scratch = tmpdir();
        let mailbox = maildir_path(scratch.path(), "user");
        let executor = DeliveryExecutor::with_config(8, Duration::from_millis(5));

        let message = Arc::new(b"From: a\r\n\r\nhi".to_vec());
        let id = executor.deliver(mailbox.clone(), message).await.unwrap();

        assert!(!id.0.is_empty());
        let stored = Path::new(&mailbox).join("new").join(&id.0);
        assert!(stored.exists());
    }

    /// Sixteen concurrent deliveries to one maildir all land with unique ids.
    #[tokio::test]
    async fn batches_concurrent_deliveries_to_same_path() {
        let scratch = tmpdir();
        let mailbox = maildir_path(scratch.path(), "user");
        let executor = DeliveryExecutor::with_config(64, Duration::from_millis(50));

        let tasks: Vec<_> = (0..16)
            .map(|i| {
                let executor = executor.clone();
                let mailbox = mailbox.clone();
                let body = Arc::new(format!("body {i}\r\n").into_bytes());
                tokio::spawn(async move { executor.deliver(mailbox, body).await })
            })
            .collect();

        let mut ids = Vec::new();
        for task in tasks {
            ids.push(task.await.unwrap().unwrap());
        }

        let mut seen = HashSet::new();
        for id in &ids {
            assert!(seen.insert(id.0.clone()), "dup id: {}", id.0);
        }

        let count = count_new(Path::new(&mailbox));
        assert_eq!(count, 16);
    }

    /// Deliveries to eight different maildirs each store exactly one message.
    #[tokio::test]
    async fn distinct_paths_get_separate_batches() {
        let scratch = tmpdir();
        let executor = DeliveryExecutor::with_config(64, Duration::from_millis(50));

        let tasks: Vec<_> = (0..8)
            .map(|i| {
                let executor = executor.clone();
                let mailbox = maildir_path(scratch.path(), &format!("user{i}"));
                let body = Arc::new(b"body\r\n".to_vec());
                tokio::spawn(async move { executor.deliver(mailbox, body).await })
            })
            .collect();

        for task in tasks {
            assert!(task.await.unwrap().is_ok());
        }

        for i in 0..8 {
            let count = count_new(&scratch.path().join(format!("user{i}")));
            assert_eq!(count, 1, "user{i} should have one message");
        }
    }

    /// A single request is flushed once the wait window ends, not held forever.
    #[tokio::test]
    async fn single_message_does_not_hang() {
        let scratch = tmpdir();
        let mailbox = maildir_path(scratch.path(), "user");
        let executor = DeliveryExecutor::with_config(64, Duration::from_millis(50));

        let started = Instant::now();
        let _ = executor
            .deliver(mailbox, Arc::new(b"hi".to_vec()))
            .await
            .unwrap();
        let elapsed = started.elapsed();

        assert!(
            elapsed < Duration::from_secs(2),
            "should not hang, took {elapsed:?}"
        );
    }
}