Skip to main content

rusmes_core/
processor.rs

//! Processor chain: runs a mail through an ordered sequence of matcher/mailet steps.
2
3use crate::mailet::Mailet;
4use crate::matcher::Matcher;
5use rusmes_proto::{Mail, MailState};
6use std::sync::Arc;
7
8/// Matcher-Mailet pair
9pub struct ProcessingStep {
10    pub matcher: Arc<dyn Matcher>,
11    pub mailet: Arc<dyn Mailet>,
12}
13
14impl ProcessingStep {
15    /// Create a new processing step
16    pub fn new(matcher: Arc<dyn Matcher>, mailet: Arc<dyn Mailet>) -> Self {
17        Self { matcher, mailet }
18    }
19}
20
21/// Named processor chain for a specific state
22pub struct Processor {
23    name: String,
24    state: MailState,
25    steps: Vec<ProcessingStep>,
26    thread_pool_size: usize,
27}
28
29impl Processor {
30    /// Create a new processor
31    pub fn new(name: impl Into<String>, state: MailState) -> Self {
32        Self {
33            name: name.into(),
34            state,
35            steps: Vec::new(),
36            thread_pool_size: 4,
37        }
38    }
39
40    /// Add a processing step
41    pub fn add_step(&mut self, step: ProcessingStep) {
42        self.steps.push(step);
43    }
44
45    /// Set thread pool size
46    pub fn set_thread_pool_size(&mut self, size: usize) {
47        self.thread_pool_size = size;
48    }
49
50    /// Get processor name
51    pub fn name(&self) -> &str {
52        &self.name
53    }
54
55    /// Get processor state
56    pub fn state(&self) -> &MailState {
57        &self.state
58    }
59
60    /// Process a mail through the chain
61    pub async fn process(&self, mut mail: Mail) -> anyhow::Result<Mail> {
62        tracing::debug!(
63            "Processing mail {} in processor {} (state: {})",
64            mail.id(),
65            self.name,
66            self.state
67        );
68
69        for step in &self.steps {
70            // Get matching recipients
71            let matched = step.matcher.match_mail(&mail).await?;
72
73            if matched.is_empty() {
74                tracing::trace!(
75                    "Matcher {} matched no recipients, skipping mailet {}",
76                    step.matcher.name(),
77                    step.mailet.name()
78                );
79                continue; // No match, skip this mailet
80            }
81
82            // Fork mail if partial match
83            if matched.len() < mail.recipients().len() {
84                tracing::trace!(
85                    "Matcher {} partially matched {}/{} recipients",
86                    step.matcher.name(),
87                    matched.len(),
88                    mail.recipients().len()
89                );
90
91                let (mut matched_mail, unmatched_mail) = mail.split(matched);
92
93                // Process matched portion
94                let action = step.mailet.service(&mut matched_mail).await?;
95                tracing::debug!(
96                    "Mailet {} returned action: {:?}",
97                    step.mailet.name(),
98                    action
99                );
100
101                // Continue with unmatched
102                mail = unmatched_mail;
103
104                // Handle state changes from matched portion
105                if matched_mail.state != self.state {
106                    // Would need to re-route matched_mail here in a real implementation
107                    tracing::debug!(
108                        "Matched mail state changed from {:?} to {:?}",
109                        self.state,
110                        matched_mail.state
111                    );
112                }
113            } else {
114                // All recipients match
115                tracing::trace!(
116                    "Matcher {} matched all {} recipients",
117                    step.matcher.name(),
118                    mail.recipients().len()
119                );
120
121                let action = step.mailet.service(&mut mail).await?;
122                tracing::debug!(
123                    "Mailet {} returned action: {:?}",
124                    step.mailet.name(),
125                    action
126                );
127            }
128
129            // Check if state changed
130            if mail.state != self.state {
131                tracing::debug!(
132                    "Mail state changed from {:?} to {:?}, exiting processor",
133                    self.state,
134                    mail.state
135                );
136                return Ok(mail); // Forward to different processor
137            }
138        }
139
140        tracing::debug!(
141            "Mail {} completed processing in processor {}",
142            mail.id(),
143            self.name
144        );
145        Ok(mail)
146    }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mailet::{MailetAction, MailetConfig};
    use crate::matcher::AllMatcher;
    use async_trait::async_trait;
    use bytes::Bytes;
    use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};

    /// Mailet stub that stamps every mail it services with its own name.
    struct TestMailet {
        name: String,
    }

    #[async_trait]
    impl Mailet for TestMailet {
        async fn init(&mut self, _config: MailetConfig) -> anyhow::Result<()> {
            Ok(())
        }

        async fn service(&self, mail: &mut Mail) -> anyhow::Result<MailetAction> {
            let stamp = self.name.clone();
            mail.set_attribute("processed_by", stamp);
            Ok(MailetAction::Continue)
        }

        fn name(&self) -> &str {
            self.name.as_str()
        }
    }

    /// Builds a mail with a single recipient and a small in-memory body.
    fn sample_mail() -> Mail {
        let body = MessageBody::Small(Bytes::from("Test"));
        let message = MimeMessage::new(HeaderMap::new(), body);
        Mail::new(
            None,
            vec!["user@example.com".parse().unwrap()],
            message,
            None,
            None,
        )
    }

    /// Two match-all steps must leave the `processed_by` attribute on the mail.
    #[tokio::test]
    async fn test_processor_chain() {
        let mut chain = Processor::new("test", MailState::Root);

        for label in ["mailet1", "mailet2"] {
            let mailet = Arc::new(TestMailet {
                name: label.to_string(),
            });
            chain.add_step(ProcessingStep::new(Arc::new(AllMatcher), mailet));
        }

        let processed = chain.process(sample_mail()).await.unwrap();
        assert!(processed.get_attribute("processed_by").is_some());
    }
}