1use crate::processor::Processor;
4use rusmes_metrics::MetricsCollector;
5use rusmes_proto::{Mail, MailState};
6use std::collections::HashMap;
7use std::sync::Arc;
8
/// Dispatches a mail to the processor registered for its current state and keeps
/// re-dispatching it while processors move it to a different state.
pub struct MailProcessorRouter {
    /// Processor to run for each mail state; filled by `register_processor`.
    processors: HashMap<MailState, Arc<Processor>>,
    /// Processor stored by `set_error_processor`.
    /// NOTE(review): nothing visible in this file reads this field — confirm
    /// whether `route` is meant to consult it.
    error_processor: Option<Arc<Processor>>,
    /// Collector notified by `route` when a mail reaches the `Ghost` state.
    metrics: Arc<MetricsCollector>,
}
15
16impl MailProcessorRouter {
17 pub fn new(metrics: Arc<MetricsCollector>) -> Self {
19 Self {
20 processors: HashMap::new(),
21 error_processor: None,
22 metrics,
23 }
24 }
25
26 pub fn register_processor(&mut self, state: MailState, processor: Arc<Processor>) {
28 self.processors.insert(state, processor);
29 }
30
31 pub fn set_error_processor(&mut self, processor: Arc<Processor>) {
33 self.error_processor = Some(processor);
34 }
35
36 pub async fn route(&self, mut mail: Mail) -> anyhow::Result<()> {
38 let mut processing_depth = 0;
39 const MAX_DEPTH: usize = 100; loop {
42 if processing_depth > MAX_DEPTH {
43 tracing::error!(
44 "Mail processing exceeded max depth for mail {}: {}",
45 mail.id(),
46 processing_depth
47 );
48 mail.state = MailState::Error;
49 }
50
51 let processor = self.processors.get(&mail.state).ok_or_else(|| {
53 anyhow::anyhow!("No processor registered for state: {:?}", mail.state)
54 })?;
55
56 tracing::debug!(
57 "Routing mail {} to processor {} (state: {:?})",
58 mail.id(),
59 processor.name(),
60 mail.state
61 );
62
63 let original_state = mail.state.clone();
65 mail = processor.process(mail).await?;
66
67 if mail.state == MailState::Ghost {
69 tracing::info!("Mail {} completed processing (Ghost state)", mail.id());
70 self.metrics.record_mail_completed(&mail);
71 return Ok(()); }
73
74 if mail.state == original_state {
76 tracing::debug!(
77 "Mail {} processing complete in state {:?} (no state change)",
78 mail.id(),
79 mail.state
80 );
81
82 if mail.state == MailState::Error {
84 return Err(anyhow::anyhow!(
85 "Mail {} processing failed in Error state",
86 mail.id()
87 ));
88 }
89 return Ok(());
90 }
91
92 tracing::debug!(
93 "Mail {} state changed from {:?} to {:?}",
94 mail.id(),
95 original_state,
96 mail.state
97 );
98
99 processing_depth += 1;
100 }
101 }
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mailet::{Mailet, MailetAction, MailetConfig};
    use crate::matcher::AllMatcher;
    use crate::processor::ProcessingStep;
    use async_trait::async_trait;
    use bytes::Bytes;
    use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};

    /// Mailet that always asks for a transition to a fixed state.
    struct StateChangeMailet {
        target_state: MailState,
    }

    #[async_trait]
    impl Mailet for StateChangeMailet {
        async fn init(&mut self, _config: MailetConfig) -> anyhow::Result<()> {
            Ok(())
        }

        async fn service(&self, _mail: &mut Mail) -> anyhow::Result<MailetAction> {
            let next_state = self.target_state.clone();
            Ok(MailetAction::ChangeState(next_state))
        }

        fn name(&self) -> &str {
            "StateChangeMailet"
        }
    }

    /// Mailet that always answers with `MailetAction::Drop`.
    struct DropMailet;

    #[async_trait]
    impl Mailet for DropMailet {
        async fn init(&mut self, _config: MailetConfig) -> anyhow::Result<()> {
            Ok(())
        }

        async fn service(&self, _mail: &mut Mail) -> anyhow::Result<MailetAction> {
            Ok(MailetAction::Drop)
        }

        fn name(&self) -> &str {
            "DropMailet"
        }
    }

    /// Builds the single-recipient mail with a tiny body used as routing input.
    fn sample_mail() -> Mail {
        let body = MessageBody::Small(Bytes::from("Test"));
        let message = MimeMessage::new(HeaderMap::new(), body);
        let recipients = vec!["user@example.com".parse().unwrap()];
        Mail::new(None, recipients, message, None, None)
    }

    /// Root hands the mail over to Transport, whose only step is `DropMailet`;
    /// routing is expected to finish without an error.
    #[tokio::test]
    async fn test_router_state_change() {
        // Root processor: a single step that requests the Transport state.
        let root = {
            let mut processor = Processor::new("root", MailState::Root);
            let to_transport = StateChangeMailet {
                target_state: MailState::Transport,
            };
            processor.add_step(ProcessingStep::new(
                Arc::new(AllMatcher),
                Arc::new(to_transport),
            ));
            Arc::new(processor)
        };

        // Transport processor: a single step running `DropMailet`.
        let transport = {
            let mut processor = Processor::new("transport", MailState::Transport);
            processor.add_step(ProcessingStep::new(
                Arc::new(AllMatcher),
                Arc::new(DropMailet),
            ));
            Arc::new(processor)
        };

        let mut router = MailProcessorRouter::new(Arc::new(MetricsCollector::new()));
        router.register_processor(MailState::Root, root);
        router.register_processor(MailState::Transport, transport);

        let outcome = router.route(sample_mail()).await;
        outcome.unwrap();
    }
}