use async_trait::async_trait;
use rusmes_proto::{Mail, MailState};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum MailetError {
#[error("Mailet execution timed out after {0:?}")]
Timeout(Duration),
#[error("Mailet error: {0}")]
ServiceError(#[from] anyhow::Error),
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub enum MailetErrorPolicy {
Skip,
#[default]
Abort,
Retry {
max: u32,
#[serde(with = "duration_serde")]
backoff: Duration,
},
}
mod duration_serde {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::time::Duration;
pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
duration.as_millis().serialize(serializer)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
let millis = u128::deserialize(deserializer)?;
Ok(Duration::from_millis(millis as u64))
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum MailetAction {
Continue,
ChangeState(MailState),
Drop,
Defer(Duration),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MailetConfig {
pub name: String,
pub params: HashMap<String, String>,
pub timeout_ms: Option<u64>,
#[serde(default)]
pub error_policy: MailetErrorPolicy,
}
impl MailetConfig {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
params: HashMap::new(),
timeout_ms: None,
error_policy: MailetErrorPolicy::default(),
}
}
pub fn with_param(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.params.insert(key.into(), value.into());
self
}
pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
self.timeout_ms = Some(timeout_ms);
self
}
pub fn with_error_policy(mut self, policy: MailetErrorPolicy) -> Self {
self.error_policy = policy;
self
}
pub fn get_param(&self, key: &str) -> Option<&str> {
self.params.get(key).map(|s| s.as_str())
}
pub fn require_param(&self, key: &str) -> anyhow::Result<&str> {
self.get_param(key).ok_or_else(|| {
anyhow::anyhow!("Required parameter '{}' not found in mailet config", key)
})
}
}
#[async_trait]
pub trait Mailet: Send + Sync {
async fn init(&mut self, config: MailetConfig) -> anyhow::Result<()>;
async fn service(&self, mail: &mut Mail) -> anyhow::Result<MailetAction>;
async fn destroy(&mut self) -> anyhow::Result<()> {
Ok(())
}
fn name(&self) -> &str;
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_mailet_config() {
let config = MailetConfig::new("TestMailet")
.with_param("key1", "value1")
.with_param("key2", "value2");
assert_eq!(config.name, "TestMailet");
assert_eq!(config.get_param("key1"), Some("value1"));
assert_eq!(config.get_param("key2"), Some("value2"));
assert_eq!(config.get_param("nonexistent"), None);
}
#[test]
fn test_mailet_config_timeout() {
let config = MailetConfig::new("TimedMailet").with_timeout_ms(500);
assert_eq!(config.timeout_ms, Some(500));
}
#[test]
fn test_mailet_config_error_policy() {
let skip_config =
MailetConfig::new("SkipMailet").with_error_policy(MailetErrorPolicy::Skip);
matches!(skip_config.error_policy, MailetErrorPolicy::Skip);
let retry_config =
MailetConfig::new("RetryMailet").with_error_policy(MailetErrorPolicy::Retry {
max: 3,
backoff: Duration::from_millis(100),
});
matches!(
retry_config.error_policy,
MailetErrorPolicy::Retry { max: 3, .. }
);
}
#[test]
fn test_mailet_action_equality() {
assert_eq!(MailetAction::Continue, MailetAction::Continue);
assert_eq!(MailetAction::Drop, MailetAction::Drop);
assert_ne!(MailetAction::Continue, MailetAction::Drop);
}
}