1use crate::mailet::Mailet;
4use crate::matcher::Matcher;
5use rusmes_proto::{Mail, MailState};
6use std::sync::Arc;
7
8pub struct ProcessingStep {
10 pub matcher: Arc<dyn Matcher>,
11 pub mailet: Arc<dyn Mailet>,
12}
13
14impl ProcessingStep {
15 pub fn new(matcher: Arc<dyn Matcher>, mailet: Arc<dyn Mailet>) -> Self {
17 Self { matcher, mailet }
18 }
19}
20
21pub struct Processor {
23 name: String,
24 state: MailState,
25 steps: Vec<ProcessingStep>,
26 thread_pool_size: usize,
27}
28
29impl Processor {
30 pub fn new(name: impl Into<String>, state: MailState) -> Self {
32 Self {
33 name: name.into(),
34 state,
35 steps: Vec::new(),
36 thread_pool_size: 4,
37 }
38 }
39
40 pub fn add_step(&mut self, step: ProcessingStep) {
42 self.steps.push(step);
43 }
44
45 pub fn set_thread_pool_size(&mut self, size: usize) {
47 self.thread_pool_size = size;
48 }
49
50 pub fn name(&self) -> &str {
52 &self.name
53 }
54
55 pub fn state(&self) -> &MailState {
57 &self.state
58 }
59
60 pub async fn process(&self, mut mail: Mail) -> anyhow::Result<Mail> {
62 tracing::debug!(
63 "Processing mail {} in processor {} (state: {})",
64 mail.id(),
65 self.name,
66 self.state
67 );
68
69 for step in &self.steps {
70 let matched = step.matcher.match_mail(&mail).await?;
72
73 if matched.is_empty() {
74 tracing::trace!(
75 "Matcher {} matched no recipients, skipping mailet {}",
76 step.matcher.name(),
77 step.mailet.name()
78 );
79 continue; }
81
82 if matched.len() < mail.recipients().len() {
84 tracing::trace!(
85 "Matcher {} partially matched {}/{} recipients",
86 step.matcher.name(),
87 matched.len(),
88 mail.recipients().len()
89 );
90
91 let (mut matched_mail, unmatched_mail) = mail.split(matched);
92
93 let action = step.mailet.service(&mut matched_mail).await?;
95 tracing::debug!(
96 "Mailet {} returned action: {:?}",
97 step.mailet.name(),
98 action
99 );
100
101 mail = unmatched_mail;
103
104 if matched_mail.state != self.state {
106 tracing::debug!(
108 "Matched mail state changed from {:?} to {:?}",
109 self.state,
110 matched_mail.state
111 );
112 }
113 } else {
114 tracing::trace!(
116 "Matcher {} matched all {} recipients",
117 step.matcher.name(),
118 mail.recipients().len()
119 );
120
121 let action = step.mailet.service(&mut mail).await?;
122 tracing::debug!(
123 "Mailet {} returned action: {:?}",
124 step.mailet.name(),
125 action
126 );
127 }
128
129 if mail.state != self.state {
131 tracing::debug!(
132 "Mail state changed from {:?} to {:?}, exiting processor",
133 self.state,
134 mail.state
135 );
136 return Ok(mail); }
138 }
139
140 tracing::debug!(
141 "Mail {} completed processing in processor {}",
142 mail.id(),
143 self.name
144 );
145 Ok(mail)
146 }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mailet::{MailetAction, MailetConfig};
    use crate::matcher::AllMatcher;
    use async_trait::async_trait;
    use bytes::Bytes;
    use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};

    /// Minimal mailet that stamps every serviced mail with its own name.
    struct TestMailet {
        name: String,
    }

    #[async_trait]
    impl Mailet for TestMailet {
        async fn init(&mut self, _config: MailetConfig) -> anyhow::Result<()> {
            Ok(())
        }

        async fn service(&self, mail: &mut Mail) -> anyhow::Result<MailetAction> {
            let stamp = self.name.clone();
            mail.set_attribute("processed_by", stamp);
            Ok(MailetAction::Continue)
        }

        fn name(&self) -> &str {
            self.name.as_str()
        }
    }

    /// A mail sent through two match-all steps must come back carrying the
    /// `processed_by` attribute.
    #[tokio::test]
    async fn test_processor_chain() {
        let mut chain = Processor::new("test", MailState::Root);

        for label in ["mailet1", "mailet2"] {
            let mailet = Arc::new(TestMailet {
                name: label.to_string(),
            });
            chain.add_step(ProcessingStep::new(Arc::new(AllMatcher), mailet));
        }

        let body = MessageBody::Small(Bytes::from("Test"));
        let message = MimeMessage::new(HeaderMap::new(), body);
        let mail = Mail::new(
            None,
            vec!["user@example.com".parse().unwrap()],
            message,
            None,
            None,
        );

        let processed = chain.process(mail).await.unwrap();
        assert!(processed.get_attribute("processed_by").is_some());
    }
}