Skip to main content

rusmes_core/
processor.rs

1//! Processor chain for mailet execution
2
3use crate::mailet::{Mailet, MailetAction, MailetConfig, MailetError, MailetErrorPolicy};
4use crate::matcher::Matcher;
5use rusmes_proto::{Mail, MailState};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::time::timeout;
9
/// Bookkeeping for a single mailet step that runs under the `Retry` error policy.
struct RetryState {
    /// How many failed invocations have been counted so far.
    attempts: u32,
    /// How many retries are allowed after the initial invocation.
    max: u32,
    /// How long to sleep before each retry.
    backoff: Duration,
}
16
17/// Invoke a mailet's `service` method, applying an optional per-mailet timeout.
18/// Returns `Ok(action)` on success, or an error wrapping `MailetError`.
19async fn invoke_with_timeout(
20    mailet: &dyn Mailet,
21    mail: &mut Mail,
22    timeout_ms: Option<u64>,
23) -> Result<MailetAction, MailetError> {
24    match timeout_ms {
25        None => mailet
26            .service(mail)
27            .await
28            .map_err(MailetError::ServiceError),
29        Some(ms) => {
30            let duration = Duration::from_millis(ms);
31            match timeout(duration, mailet.service(mail)).await {
32                Ok(Ok(action)) => Ok(action),
33                Ok(Err(e)) => Err(MailetError::ServiceError(e)),
34                Err(_elapsed) => Err(MailetError::Timeout(duration)),
35            }
36        }
37    }
38}
39
40/// Invoke with timeout, retrying according to the error policy.
41///
42/// Returns `Ok(Some(action))` on success, `Ok(None)` if the policy says to skip,
43/// or `Err(MailetError)` if the policy says to abort.
44async fn invoke_with_policy(
45    mailet: &dyn Mailet,
46    mail: &mut Mail,
47    config: &MailetConfig,
48) -> anyhow::Result<Option<MailetAction>> {
49    match &config.error_policy {
50        MailetErrorPolicy::Skip => {
51            match invoke_with_timeout(mailet, mail, config.timeout_ms).await {
52                Ok(action) => Ok(Some(action)),
53                Err(e) => {
54                    tracing::warn!(
55                        "Mailet {} errored (policy=Skip), skipping: {}",
56                        mailet.name(),
57                        e
58                    );
59                    Ok(None)
60                }
61            }
62        }
63        MailetErrorPolicy::Abort => {
64            let action = invoke_with_timeout(mailet, mail, config.timeout_ms)
65                .await
66                .map_err(|e| anyhow::anyhow!("Mailet {} aborted pipeline: {}", mailet.name(), e))?;
67            Ok(Some(action))
68        }
69        MailetErrorPolicy::Retry { max, backoff } => {
70            let mut state = RetryState {
71                attempts: 0,
72                max: *max,
73                backoff: *backoff,
74            };
75            loop {
76                match invoke_with_timeout(mailet, mail, config.timeout_ms).await {
77                    Ok(action) => return Ok(Some(action)),
78                    Err(e) => {
79                        state.attempts += 1;
80                        if state.attempts > state.max {
81                            return Err(anyhow::anyhow!(
82                                "Mailet {} failed after {} retries, aborting: {}",
83                                mailet.name(),
84                                state.max,
85                                e
86                            ));
87                        }
88                        tracing::warn!(
89                            "Mailet {} error (attempt {}/{}), retrying in {:?}: {}",
90                            mailet.name(),
91                            state.attempts,
92                            state.max,
93                            state.backoff,
94                            e
95                        );
96                        tokio::time::sleep(state.backoff).await;
97                    }
98                }
99            }
100        }
101    }
102}
103
104/// Matcher-Mailet pair with optional per-step configuration
105pub struct ProcessingStep {
106    pub matcher: Arc<dyn Matcher>,
107    pub mailet: Arc<dyn Mailet>,
108    /// Per-step mailet configuration (timeout, error policy)
109    pub config: MailetConfig,
110}
111
112impl ProcessingStep {
113    /// Create a new processing step with a default config derived from the mailet name
114    pub fn new(matcher: Arc<dyn Matcher>, mailet: Arc<dyn Mailet>) -> Self {
115        let name = mailet.name().to_string();
116        Self {
117            matcher,
118            mailet,
119            config: MailetConfig::new(name),
120        }
121    }
122
123    /// Create a new processing step with an explicit configuration
124    pub fn new_with_config(
125        matcher: Arc<dyn Matcher>,
126        mailet: Arc<dyn Mailet>,
127        config: MailetConfig,
128    ) -> Self {
129        Self {
130            matcher,
131            mailet,
132            config,
133        }
134    }
135}
136
137/// Named processor chain for a specific state
138pub struct Processor {
139    name: String,
140    state: MailState,
141    steps: Vec<ProcessingStep>,
142    thread_pool_size: usize,
143}
144
145impl Processor {
146    /// Create a new processor
147    pub fn new(name: impl Into<String>, state: MailState) -> Self {
148        Self {
149            name: name.into(),
150            state,
151            steps: Vec::new(),
152            thread_pool_size: 4,
153        }
154    }
155
156    /// Add a processing step
157    pub fn add_step(&mut self, step: ProcessingStep) {
158        self.steps.push(step);
159    }
160
161    /// Set thread pool size
162    pub fn set_thread_pool_size(&mut self, size: usize) {
163        self.thread_pool_size = size;
164    }
165
166    /// Get processor name
167    pub fn name(&self) -> &str {
168        &self.name
169    }
170
171    /// Get processor state
172    pub fn state(&self) -> &MailState {
173        &self.state
174    }
175
176    /// Process a mail through the chain
177    pub async fn process(&self, mut mail: Mail) -> anyhow::Result<Mail> {
178        tracing::debug!(
179            "Processing mail {} in processor {} (state: {})",
180            mail.id(),
181            self.name,
182            self.state
183        );
184
185        for step in &self.steps {
186            // Get matching recipients
187            let matched = step.matcher.match_mail(&mail).await?;
188
189            if matched.is_empty() {
190                tracing::trace!(
191                    "Matcher {} matched no recipients, skipping mailet {}",
192                    step.matcher.name(),
193                    step.mailet.name()
194                );
195                continue; // No match, skip this mailet
196            }
197
198            // Fork mail if partial match
199            if matched.len() < mail.recipients().len() {
200                tracing::trace!(
201                    "Matcher {} partially matched {}/{} recipients",
202                    step.matcher.name(),
203                    matched.len(),
204                    mail.recipients().len()
205                );
206
207                let (mut matched_mail, unmatched_mail) = mail.split(matched);
208
209                // Process matched portion with timeout + error policy
210                match invoke_with_policy(step.mailet.as_ref(), &mut matched_mail, &step.config)
211                    .await?
212                {
213                    Some(action) => {
214                        tracing::debug!(
215                            "Mailet {} returned action: {:?}",
216                            step.mailet.name(),
217                            action
218                        );
219
220                        // Handle state changes from matched portion
221                        if matched_mail.state != self.state {
222                            tracing::debug!(
223                                "Matched mail state changed from {:?} to {:?}",
224                                self.state,
225                                matched_mail.state
226                            );
227                        }
228                    }
229                    None => {
230                        // Skip policy: continue with merged mail
231                        tracing::debug!("Mailet {} skipped (Skip policy)", step.mailet.name());
232                    }
233                }
234
235                // Continue with unmatched
236                mail = unmatched_mail;
237            } else {
238                // All recipients match
239                tracing::trace!(
240                    "Matcher {} matched all {} recipients",
241                    step.matcher.name(),
242                    mail.recipients().len()
243                );
244
245                match invoke_with_policy(step.mailet.as_ref(), &mut mail, &step.config).await? {
246                    Some(action) => {
247                        tracing::debug!(
248                            "Mailet {} returned action: {:?}",
249                            step.mailet.name(),
250                            action
251                        );
252                    }
253                    None => {
254                        // Skip policy: do nothing, continue to next step
255                        tracing::debug!("Mailet {} skipped (Skip policy)", step.mailet.name());
256                        continue;
257                    }
258                }
259            }
260
261            // Check if state changed
262            if mail.state != self.state {
263                tracing::debug!(
264                    "Mail state changed from {:?} to {:?}, exiting processor",
265                    self.state,
266                    mail.state
267                );
268                return Ok(mail); // Forward to different processor
269            }
270        }
271
272        tracing::debug!(
273            "Mail {} completed processing in processor {}",
274            mail.id(),
275            self.name
276        );
277        Ok(mail)
278    }
279}
280
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mailet::{MailetAction, MailetConfig, MailetError, MailetErrorPolicy};
    use crate::matcher::AllMatcher;
    use async_trait::async_trait;
    use bytes::Bytes;
    use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    // ---------------------------------------------------------------------
    // Test doubles
    // ---------------------------------------------------------------------

    /// Mailet that stamps its own name into the `processed_by` attribute.
    struct TestMailet {
        name: String,
    }

    #[async_trait]
    impl Mailet for TestMailet {
        async fn init(&mut self, _config: MailetConfig) -> anyhow::Result<()> {
            Ok(())
        }

        async fn service(&self, mail: &mut Mail) -> anyhow::Result<MailetAction> {
            mail.set_attribute("processed_by", self.name.clone());
            Ok(MailetAction::Continue)
        }

        fn name(&self) -> &str {
            self.name.as_str()
        }
    }

    /// Mailet that sleeps for `sleep_ms` milliseconds and then succeeds.
    struct SlowMailet {
        sleep_ms: u64,
    }

    #[async_trait]
    impl Mailet for SlowMailet {
        async fn init(&mut self, _config: MailetConfig) -> anyhow::Result<()> {
            Ok(())
        }

        async fn service(&self, _mail: &mut Mail) -> anyhow::Result<MailetAction> {
            let nap = Duration::from_millis(self.sleep_ms);
            tokio::time::sleep(nap).await;
            Ok(MailetAction::Continue)
        }

        fn name(&self) -> &str {
            "SlowMailet"
        }
    }

    /// Mailet that fails on every call and counts how often it was called.
    struct FailingMailet {
        name: String,
        call_count: Arc<AtomicU32>,
    }

    #[async_trait]
    impl Mailet for FailingMailet {
        async fn init(&mut self, _config: MailetConfig) -> anyhow::Result<()> {
            Ok(())
        }

        async fn service(&self, _mail: &mut Mail) -> anyhow::Result<MailetAction> {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Err(anyhow::anyhow!("intentional test failure"))
        }

        fn name(&self) -> &str {
            self.name.as_str()
        }
    }

    /// Mailet that succeeds and leaves a boolean attribute behind as evidence.
    struct MarkerMailet {
        name: String,
        marker: String,
    }

    #[async_trait]
    impl Mailet for MarkerMailet {
        async fn init(&mut self, _config: MailetConfig) -> anyhow::Result<()> {
            Ok(())
        }

        async fn service(&self, mail: &mut Mail) -> anyhow::Result<MailetAction> {
            mail.set_attribute(self.marker.clone(), true);
            Ok(MailetAction::Continue)
        }

        fn name(&self) -> &str {
            self.name.as_str()
        }
    }

    // ---------------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------------

    /// Single-recipient mail with a tiny body.
    fn make_test_mail() -> Mail {
        let body = MessageBody::Small(Bytes::from("Test"));
        let message = MimeMessage::new(HeaderMap::new(), body);
        Mail::new(
            None,
            vec!["user@example.com".parse().unwrap()],
            message,
            None,
            None,
        )
    }

    /// A `FailingMailet` named "failing" that reports its calls through `call_count`.
    fn failing_mailet(call_count: Arc<AtomicU32>) -> Arc<FailingMailet> {
        Arc::new(FailingMailet {
            name: "failing".to_string(),
            call_count,
        })
    }

    // ---------------------------------------------------------------------
    // Tests
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_processor_chain() {
        let mut processor = Processor::new("test", MailState::Root);

        for mailet_name in ["mailet1", "mailet2"] {
            let mailet = Arc::new(TestMailet {
                name: mailet_name.to_string(),
            });
            processor.add_step(ProcessingStep::new(Arc::new(AllMatcher), mailet));
        }

        let processed = processor.process(make_test_mail()).await.unwrap();
        assert!(processed.get_attribute("processed_by").is_some());
    }

    #[tokio::test]
    async fn mailet_execution_timeout() {
        // The mailet sleeps for 200 ms against a 50 ms limit, so the call must time out.
        let slow = SlowMailet { sleep_ms: 200u64 };
        let mut mail = make_test_mail();

        let result = invoke_with_timeout(&slow, &mut mail, Some(50u64)).await;

        assert!(
            matches!(result, Err(MailetError::Timeout(_))),
            "Expected Timeout error, got: {:?}",
            result
        );
    }

    #[tokio::test]
    async fn mailet_error_policy_skip() {
        // With the Skip policy a failing mailet must not stop the following step.
        let failing = failing_mailet(Arc::new(AtomicU32::new(0)));
        let marker = Arc::new(MarkerMailet {
            name: "marker".to_string(),
            marker: "marker_ran".to_string(),
        });
        let skip_config = MailetConfig::new("failing").with_error_policy(MailetErrorPolicy::Skip);

        let mut processor = Processor::new("test", MailState::Root);
        processor.add_step(ProcessingStep::new_with_config(
            Arc::new(AllMatcher),
            failing,
            skip_config,
        ));
        processor.add_step(ProcessingStep::new(Arc::new(AllMatcher), marker));

        let processed = processor.process(make_test_mail()).await.unwrap();
        assert!(
            processed.get_attribute("marker_ran").is_some(),
            "Next mailet should have run after Skip"
        );
    }

    #[tokio::test]
    async fn mailet_error_policy_abort() {
        // With the Abort policy the mailet's failure must surface from `process`.
        let failing = failing_mailet(Arc::new(AtomicU32::new(0)));
        let abort_config = MailetConfig::new("failing").with_error_policy(MailetErrorPolicy::Abort);

        let mut processor = Processor::new("test", MailState::Root);
        processor.add_step(ProcessingStep::new_with_config(
            Arc::new(AllMatcher),
            failing,
            abort_config,
        ));

        let outcome = processor.process(make_test_mail()).await;
        assert!(outcome.is_err(), "Abort policy should propagate error");
    }

    #[tokio::test]
    async fn mailet_error_policy_retry_then_abort() {
        // Retry with max = 2: one initial call plus two retries, then an error.
        let call_count = Arc::new(AtomicU32::new(0));
        let failing = failing_mailet(Arc::clone(&call_count));

        let policy = MailetErrorPolicy::Retry {
            max: 2,
            backoff: Duration::from_millis(1), // keep the test fast
        };
        let retry_config = MailetConfig::new("failing").with_error_policy(policy);

        let mut mail = make_test_mail();
        let result = invoke_with_policy(failing.as_ref(), &mut mail, &retry_config).await;

        assert!(result.is_err(), "Should error after exhausting retries");
        assert_eq!(
            call_count.load(Ordering::SeqCst),
            3,
            "Should have been called 3 times (1 initial + 2 retries)"
        );
    }
}
514}