Skip to main content

rusmes_core/
mailet.rs

1//! Mailet trait and types
2
3use async_trait::async_trait;
4use rusmes_proto::{Mail, MailState};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::time::Duration;
8use thiserror::Error;
9
10/// Errors that can occur during mailet processing
11#[derive(Debug, Error)]
12pub enum MailetError {
13    /// Mailet exceeded its configured execution timeout
14    #[error("Mailet execution timed out after {0:?}")]
15    Timeout(Duration),
16    /// Mailet returned an error
17    #[error("Mailet error: {0}")]
18    ServiceError(#[from] anyhow::Error),
19}
20
21/// Policy controlling what happens when a mailet errors or times out
22#[derive(Debug, Clone, Default, Serialize, Deserialize)]
23pub enum MailetErrorPolicy {
24    /// Skip this mailet and continue the pipeline to the next step
25    Skip,
26    /// Abort the pipeline and propagate the error upstream (4xx/5xx response)
27    #[default]
28    Abort,
29    /// Re-enqueue up to `max` times with `backoff` delay between retries,
30    /// then Abort if still failing
31    Retry {
32        /// Maximum number of retry attempts
33        max: u32,
34        /// Delay between retry attempts
35        #[serde(with = "duration_serde")]
36        backoff: Duration,
37    },
38}
39
40/// Serde helpers for Duration
41mod duration_serde {
42    use serde::{Deserialize, Deserializer, Serialize, Serializer};
43    use std::time::Duration;
44
45    pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
46    where
47        S: Serializer,
48    {
49        duration.as_millis().serialize(serializer)
50    }
51
52    pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
53    where
54        D: Deserializer<'de>,
55    {
56        let millis = u128::deserialize(deserializer)?;
57        Ok(Duration::from_millis(millis as u64))
58    }
59}
60
61/// Actions a mailet can take after processing a mail
62#[derive(Debug, Clone, PartialEq)]
63pub enum MailetAction {
64    /// Continue to next mailet in the chain
65    Continue,
66    /// Change mail state and move to different processor
67    ChangeState(MailState),
68    /// Drop the mail (set state to Ghost)
69    Drop,
70    /// Defer processing (requeue with delay)
71    Defer(Duration),
72}
73
74/// Mailet configuration
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct MailetConfig {
77    /// Mailet name
78    pub name: String,
79    /// Configuration parameters
80    pub params: HashMap<String, String>,
81    /// Optional execution timeout in milliseconds. None means no timeout.
82    pub timeout_ms: Option<u64>,
83    /// Error handling policy when the mailet returns an error or times out
84    #[serde(default)]
85    pub error_policy: MailetErrorPolicy,
86}
87
88impl MailetConfig {
89    /// Create a new mailet config
90    pub fn new(name: impl Into<String>) -> Self {
91        Self {
92            name: name.into(),
93            params: HashMap::new(),
94            timeout_ms: None,
95            error_policy: MailetErrorPolicy::default(),
96        }
97    }
98
99    /// Add a parameter
100    pub fn with_param(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
101        self.params.insert(key.into(), value.into());
102        self
103    }
104
105    /// Set execution timeout in milliseconds
106    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
107        self.timeout_ms = Some(timeout_ms);
108        self
109    }
110
111    /// Set the error policy
112    pub fn with_error_policy(mut self, policy: MailetErrorPolicy) -> Self {
113        self.error_policy = policy;
114        self
115    }
116
117    /// Get a parameter value
118    pub fn get_param(&self, key: &str) -> Option<&str> {
119        self.params.get(key).map(|s| s.as_str())
120    }
121
122    /// Get a required parameter
123    pub fn require_param(&self, key: &str) -> anyhow::Result<&str> {
124        self.get_param(key).ok_or_else(|| {
125            anyhow::anyhow!("Required parameter '{}' not found in mailet config", key)
126        })
127    }
128}
129
130/// Core mailet trait - message processing unit
131#[async_trait]
132pub trait Mailet: Send + Sync {
133    /// Initialize mailet with configuration
134    async fn init(&mut self, config: MailetConfig) -> anyhow::Result<()>;
135
136    /// Process a mail message
137    async fn service(&self, mail: &mut Mail) -> anyhow::Result<MailetAction>;
138
139    /// Cleanup on shutdown
140    async fn destroy(&mut self) -> anyhow::Result<()> {
141        Ok(())
142    }
143
144    /// Mailet name for logging/metrics
145    fn name(&self) -> &str;
146}
147
#[cfg(test)]
mod tests {
    use super::*;

    /// Parameters added through the builder can be read back by key, and
    /// unknown keys yield `None`.
    #[test]
    fn test_mailet_config() {
        let config = MailetConfig::new("TestMailet")
            .with_param("key1", "value1")
            .with_param("key2", "value2");

        assert_eq!(config.name, "TestMailet");
        assert_eq!(config.get_param("key1"), Some("value1"));
        assert_eq!(config.get_param("key2"), Some("value2"));
        assert_eq!(config.get_param("nonexistent"), None);
    }

    /// The timeout builder stores the value it was given.
    #[test]
    fn test_mailet_config_timeout() {
        let config = MailetConfig::new("TimedMailet").with_timeout_ms(500);
        assert_eq!(config.timeout_ms, Some(500));
    }

    /// The error-policy builder stores the policy it was given.
    #[test]
    fn test_mailet_config_error_policy() {
        let skip_config =
            MailetConfig::new("SkipMailet").with_error_policy(MailetErrorPolicy::Skip);
        // `matches!` only yields a bool; it must be wrapped in `assert!` or the
        // check is silently discarded and the test can never fail.
        assert!(matches!(
            skip_config.error_policy,
            MailetErrorPolicy::Skip
        ));

        let retry_config =
            MailetConfig::new("RetryMailet").with_error_policy(MailetErrorPolicy::Retry {
                max: 3,
                backoff: Duration::from_millis(100),
            });
        assert!(matches!(
            retry_config.error_policy,
            MailetErrorPolicy::Retry { max: 3, backoff } if backoff == Duration::from_millis(100)
        ));
    }

    /// Unit variants of `MailetAction` compare by variant.
    #[test]
    fn test_mailet_action_equality() {
        assert_eq!(MailetAction::Continue, MailetAction::Continue);
        assert_eq!(MailetAction::Drop, MailetAction::Drop);
        assert_ne!(MailetAction::Continue, MailetAction::Drop);
    }
}