1use crate::mailet::{Mailet, MailetAction, MailetConfig, MailetError, MailetErrorPolicy};
4use crate::matcher::Matcher;
5use rusmes_proto::{Mail, MailState};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::time::timeout;
9
/// Bookkeeping for a single mailet invocation that runs under the retry
/// error policy.
struct RetryState {
    /// Number of failed attempts observed so far.
    attempts: u32,
    /// Maximum number of retries allowed after the initial attempt.
    max: u32,
    /// Pause inserted between one failed attempt and the next try.
    backoff: Duration,
}
16
17async fn invoke_with_timeout(
20 mailet: &dyn Mailet,
21 mail: &mut Mail,
22 timeout_ms: Option<u64>,
23) -> Result<MailetAction, MailetError> {
24 match timeout_ms {
25 None => mailet
26 .service(mail)
27 .await
28 .map_err(MailetError::ServiceError),
29 Some(ms) => {
30 let duration = Duration::from_millis(ms);
31 match timeout(duration, mailet.service(mail)).await {
32 Ok(Ok(action)) => Ok(action),
33 Ok(Err(e)) => Err(MailetError::ServiceError(e)),
34 Err(_elapsed) => Err(MailetError::Timeout(duration)),
35 }
36 }
37 }
38}
39
40async fn invoke_with_policy(
45 mailet: &dyn Mailet,
46 mail: &mut Mail,
47 config: &MailetConfig,
48) -> anyhow::Result<Option<MailetAction>> {
49 match &config.error_policy {
50 MailetErrorPolicy::Skip => {
51 match invoke_with_timeout(mailet, mail, config.timeout_ms).await {
52 Ok(action) => Ok(Some(action)),
53 Err(e) => {
54 tracing::warn!(
55 "Mailet {} errored (policy=Skip), skipping: {}",
56 mailet.name(),
57 e
58 );
59 Ok(None)
60 }
61 }
62 }
63 MailetErrorPolicy::Abort => {
64 let action = invoke_with_timeout(mailet, mail, config.timeout_ms)
65 .await
66 .map_err(|e| anyhow::anyhow!("Mailet {} aborted pipeline: {}", mailet.name(), e))?;
67 Ok(Some(action))
68 }
69 MailetErrorPolicy::Retry { max, backoff } => {
70 let mut state = RetryState {
71 attempts: 0,
72 max: *max,
73 backoff: *backoff,
74 };
75 loop {
76 match invoke_with_timeout(mailet, mail, config.timeout_ms).await {
77 Ok(action) => return Ok(Some(action)),
78 Err(e) => {
79 state.attempts += 1;
80 if state.attempts > state.max {
81 return Err(anyhow::anyhow!(
82 "Mailet {} failed after {} retries, aborting: {}",
83 mailet.name(),
84 state.max,
85 e
86 ));
87 }
88 tracing::warn!(
89 "Mailet {} error (attempt {}/{}), retrying in {:?}: {}",
90 mailet.name(),
91 state.attempts,
92 state.max,
93 state.backoff,
94 e
95 );
96 tokio::time::sleep(state.backoff).await;
97 }
98 }
99 }
100 }
101 }
102}
103
104pub struct ProcessingStep {
106 pub matcher: Arc<dyn Matcher>,
107 pub mailet: Arc<dyn Mailet>,
108 pub config: MailetConfig,
110}
111
112impl ProcessingStep {
113 pub fn new(matcher: Arc<dyn Matcher>, mailet: Arc<dyn Mailet>) -> Self {
115 let name = mailet.name().to_string();
116 Self {
117 matcher,
118 mailet,
119 config: MailetConfig::new(name),
120 }
121 }
122
123 pub fn new_with_config(
125 matcher: Arc<dyn Matcher>,
126 mailet: Arc<dyn Mailet>,
127 config: MailetConfig,
128 ) -> Self {
129 Self {
130 matcher,
131 mailet,
132 config,
133 }
134 }
135}
136
137pub struct Processor {
139 name: String,
140 state: MailState,
141 steps: Vec<ProcessingStep>,
142 thread_pool_size: usize,
143}
144
145impl Processor {
146 pub fn new(name: impl Into<String>, state: MailState) -> Self {
148 Self {
149 name: name.into(),
150 state,
151 steps: Vec::new(),
152 thread_pool_size: 4,
153 }
154 }
155
156 pub fn add_step(&mut self, step: ProcessingStep) {
158 self.steps.push(step);
159 }
160
161 pub fn set_thread_pool_size(&mut self, size: usize) {
163 self.thread_pool_size = size;
164 }
165
166 pub fn name(&self) -> &str {
168 &self.name
169 }
170
171 pub fn state(&self) -> &MailState {
173 &self.state
174 }
175
176 pub async fn process(&self, mut mail: Mail) -> anyhow::Result<Mail> {
178 tracing::debug!(
179 "Processing mail {} in processor {} (state: {})",
180 mail.id(),
181 self.name,
182 self.state
183 );
184
185 for step in &self.steps {
186 let matched = step.matcher.match_mail(&mail).await?;
188
189 if matched.is_empty() {
190 tracing::trace!(
191 "Matcher {} matched no recipients, skipping mailet {}",
192 step.matcher.name(),
193 step.mailet.name()
194 );
195 continue; }
197
198 if matched.len() < mail.recipients().len() {
200 tracing::trace!(
201 "Matcher {} partially matched {}/{} recipients",
202 step.matcher.name(),
203 matched.len(),
204 mail.recipients().len()
205 );
206
207 let (mut matched_mail, unmatched_mail) = mail.split(matched);
208
209 match invoke_with_policy(step.mailet.as_ref(), &mut matched_mail, &step.config)
211 .await?
212 {
213 Some(action) => {
214 tracing::debug!(
215 "Mailet {} returned action: {:?}",
216 step.mailet.name(),
217 action
218 );
219
220 if matched_mail.state != self.state {
222 tracing::debug!(
223 "Matched mail state changed from {:?} to {:?}",
224 self.state,
225 matched_mail.state
226 );
227 }
228 }
229 None => {
230 tracing::debug!("Mailet {} skipped (Skip policy)", step.mailet.name());
232 }
233 }
234
235 mail = unmatched_mail;
237 } else {
238 tracing::trace!(
240 "Matcher {} matched all {} recipients",
241 step.matcher.name(),
242 mail.recipients().len()
243 );
244
245 match invoke_with_policy(step.mailet.as_ref(), &mut mail, &step.config).await? {
246 Some(action) => {
247 tracing::debug!(
248 "Mailet {} returned action: {:?}",
249 step.mailet.name(),
250 action
251 );
252 }
253 None => {
254 tracing::debug!("Mailet {} skipped (Skip policy)", step.mailet.name());
256 continue;
257 }
258 }
259 }
260
261 if mail.state != self.state {
263 tracing::debug!(
264 "Mail state changed from {:?} to {:?}, exiting processor",
265 self.state,
266 mail.state
267 );
268 return Ok(mail); }
270 }
271
272 tracing::debug!(
273 "Mail {} completed processing in processor {}",
274 mail.id(),
275 self.name
276 );
277 Ok(mail)
278 }
279}
280
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mailet::{MailetAction, MailetConfig, MailetError, MailetErrorPolicy};
    use crate::matcher::AllMatcher;
    use async_trait::async_trait;
    use bytes::Bytes;
    use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    // ---- Test doubles -------------------------------------------------

    /// Succeeds and records its own name under the `processed_by` attribute.
    struct TestMailet {
        name: String,
    }

    #[async_trait]
    impl Mailet for TestMailet {
        async fn init(&mut self, _config: MailetConfig) -> anyhow::Result<()> {
            Ok(())
        }

        async fn service(&self, mail: &mut Mail) -> anyhow::Result<MailetAction> {
            mail.set_attribute("processed_by", self.name.clone());
            Ok(MailetAction::Continue)
        }

        fn name(&self) -> &str {
            &self.name
        }
    }

    /// Sleeps for `sleep_ms` milliseconds before succeeding.
    struct SlowMailet {
        sleep_ms: u64,
    }

    #[async_trait]
    impl Mailet for SlowMailet {
        async fn init(&mut self, _config: MailetConfig) -> anyhow::Result<()> {
            Ok(())
        }

        async fn service(&self, _mail: &mut Mail) -> anyhow::Result<MailetAction> {
            tokio::time::sleep(Duration::from_millis(self.sleep_ms)).await;
            Ok(MailetAction::Continue)
        }

        fn name(&self) -> &str {
            "SlowMailet"
        }
    }

    /// Always fails, counting how many times it was invoked.
    struct FailingMailet {
        name: String,
        call_count: Arc<AtomicU32>,
    }

    #[async_trait]
    impl Mailet for FailingMailet {
        async fn init(&mut self, _config: MailetConfig) -> anyhow::Result<()> {
            Ok(())
        }

        async fn service(&self, _mail: &mut Mail) -> anyhow::Result<MailetAction> {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Err(anyhow::anyhow!("intentional test failure"))
        }

        fn name(&self) -> &str {
            &self.name
        }
    }

    /// Succeeds and sets the attribute named by `marker` to `true`.
    struct MarkerMailet {
        name: String,
        marker: String,
    }

    #[async_trait]
    impl Mailet for MarkerMailet {
        async fn init(&mut self, _config: MailetConfig) -> anyhow::Result<()> {
            Ok(())
        }

        async fn service(&self, mail: &mut Mail) -> anyhow::Result<MailetAction> {
            mail.set_attribute(self.marker.clone(), true);
            Ok(MailetAction::Continue)
        }

        fn name(&self) -> &str {
            &self.name
        }
    }

    // ---- Fixtures -----------------------------------------------------

    /// A minimal single-recipient mail with a tiny body.
    fn make_test_mail() -> Mail {
        let body = MessageBody::Small(Bytes::from("Test"));
        let message = MimeMessage::new(HeaderMap::new(), body);
        Mail::new(
            None,
            vec!["user@example.com".parse().unwrap()],
            message,
            None,
            None,
        )
    }

    /// A `FailingMailet` named "failing" that reports its calls to `calls`.
    fn failing_mailet(calls: &Arc<AtomicU32>) -> Arc<FailingMailet> {
        Arc::new(FailingMailet {
            name: "failing".to_string(),
            call_count: Arc::clone(calls),
        })
    }

    /// A match-everything step that runs `mailet` under `policy`.
    fn failing_step(mailet: Arc<FailingMailet>, policy: MailetErrorPolicy) -> ProcessingStep {
        let config = MailetConfig::new("failing").with_error_policy(policy);
        ProcessingStep::new_with_config(Arc::new(AllMatcher), mailet, config)
    }

    // ---- Tests --------------------------------------------------------

    #[tokio::test]
    async fn test_processor_chain() {
        let mut processor = Processor::new("test", MailState::Root);

        for mailet_name in ["mailet1", "mailet2"] {
            let mailet = Arc::new(TestMailet {
                name: mailet_name.to_string(),
            });
            processor.add_step(ProcessingStep::new(Arc::new(AllMatcher), mailet));
        }

        let result = processor.process(make_test_mail()).await.unwrap();
        assert!(result.get_attribute("processed_by").is_some());
    }

    #[tokio::test]
    async fn mailet_execution_timeout() {
        // The mailet sleeps four times longer than the allowed limit.
        let timeout_ms = 50u64;
        let sleep_ms = 200u64;

        let slow = SlowMailet { sleep_ms };
        let mut mail = make_test_mail();
        let result = invoke_with_timeout(&slow, &mut mail, Some(timeout_ms)).await;

        assert!(
            matches!(result, Err(MailetError::Timeout(_))),
            "Expected Timeout error, got: {:?}",
            result
        );
    }

    #[tokio::test]
    async fn mailet_error_policy_skip() {
        let calls = Arc::new(AtomicU32::new(0));
        let marker = Arc::new(MarkerMailet {
            name: "marker".to_string(),
            marker: "marker_ran".to_string(),
        });

        let mut processor = Processor::new("test", MailState::Root);
        processor.add_step(failing_step(failing_mailet(&calls), MailetErrorPolicy::Skip));
        processor.add_step(ProcessingStep::new(Arc::new(AllMatcher), marker));

        let result = processor.process(make_test_mail()).await.unwrap();
        assert!(
            result.get_attribute("marker_ran").is_some(),
            "Next mailet should have run after Skip"
        );
    }

    #[tokio::test]
    async fn mailet_error_policy_abort() {
        let calls = Arc::new(AtomicU32::new(0));

        let mut processor = Processor::new("test", MailState::Root);
        processor.add_step(failing_step(failing_mailet(&calls), MailetErrorPolicy::Abort));

        let result = processor.process(make_test_mail()).await;
        assert!(result.is_err(), "Abort policy should propagate error");
    }

    #[tokio::test]
    async fn mailet_error_policy_retry_then_abort() {
        let call_count = Arc::new(AtomicU32::new(0));
        let failing = failing_mailet(&call_count);

        let retry_policy = MailetErrorPolicy::Retry {
            max: 2,
            backoff: Duration::from_millis(1),
        };
        let retry_config = MailetConfig::new("failing").with_error_policy(retry_policy);

        let result =
            invoke_with_policy(failing.as_ref(), &mut make_test_mail(), &retry_config).await;

        assert!(result.is_err(), "Should error after exhausting retries");
        assert_eq!(
            call_count.load(Ordering::SeqCst),
            3,
            "Should have been called 3 times (1 initial + 2 retries)"
        );
    }
}