Skip to main content

rusmes_core/
router.rs

1//! Mail processor router
2
3use crate::processor::Processor;
4use rusmes_metrics::MetricsCollector;
5use rusmes_proto::{Mail, MailState};
6use std::collections::HashMap;
7use std::sync::Arc;
8
9/// Routes mail between processors based on state
10pub struct MailProcessorRouter {
11    processors: HashMap<MailState, Arc<Processor>>,
12    error_processor: Option<Arc<Processor>>,
13    metrics: Arc<MetricsCollector>,
14}
15
16impl MailProcessorRouter {
17    /// Create a new mail processor router
18    pub fn new(metrics: Arc<MetricsCollector>) -> Self {
19        Self {
20            processors: HashMap::new(),
21            error_processor: None,
22            metrics,
23        }
24    }
25
26    /// Register a processor for a state
27    pub fn register_processor(&mut self, state: MailState, processor: Arc<Processor>) {
28        self.processors.insert(state, processor);
29    }
30
31    /// Set the error processor
32    pub fn set_error_processor(&mut self, processor: Arc<Processor>) {
33        self.error_processor = Some(processor);
34    }
35
36    /// Route and process a mail message
37    pub async fn route(&self, mut mail: Mail) -> anyhow::Result<()> {
38        let mut processing_depth = 0;
39        const MAX_DEPTH: usize = 100; // Prevent infinite loops
40
41        loop {
42            if processing_depth > MAX_DEPTH {
43                tracing::error!(
44                    "Mail processing exceeded max depth for mail {}: {}",
45                    mail.id(),
46                    processing_depth
47                );
48                mail.state = MailState::Error;
49            }
50
51            // Get processor for current state
52            let processor = self.processors.get(&mail.state).ok_or_else(|| {
53                anyhow::anyhow!("No processor registered for state: {:?}", mail.state)
54            })?;
55
56            tracing::debug!(
57                "Routing mail {} to processor {} (state: {:?})",
58                mail.id(),
59                processor.name(),
60                mail.state
61            );
62
63            // Process mail
64            let original_state = mail.state.clone();
65            mail = processor.process(mail).await?;
66
67            // Check for completion
68            if mail.state == MailState::Ghost {
69                tracing::info!("Mail {} completed processing (Ghost state)", mail.id());
70                self.metrics.record_mail_completed(&mail);
71                return Ok(()); // Mail consumed
72            }
73
74            // Check if state changed
75            if mail.state == original_state {
76                tracing::debug!(
77                    "Mail {} processing complete in state {:?} (no state change)",
78                    mail.id(),
79                    mail.state
80                );
81
82                // No state change means processing complete for this state
83                if mail.state == MailState::Error {
84                    return Err(anyhow::anyhow!(
85                        "Mail {} processing failed in Error state",
86                        mail.id()
87                    ));
88                }
89                return Ok(());
90            }
91
92            tracing::debug!(
93                "Mail {} state changed from {:?} to {:?}",
94                mail.id(),
95                original_state,
96                mail.state
97            );
98
99            processing_depth += 1;
100        }
101    }
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mailet::{Mailet, MailetAction, MailetConfig};
    use crate::matcher::AllMatcher;
    use crate::processor::ProcessingStep;
    use async_trait::async_trait;
    use bytes::Bytes;
    use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};

    /// Test mailet that answers every mail with `MailetAction::Drop`.
    struct DropMailet;

    #[async_trait]
    impl Mailet for DropMailet {
        async fn init(&mut self, _config: MailetConfig) -> anyhow::Result<()> {
            Ok(())
        }

        async fn service(&self, _mail: &mut Mail) -> anyhow::Result<MailetAction> {
            Ok(MailetAction::Drop)
        }

        fn name(&self) -> &str {
            "DropMailet"
        }
    }

    /// Test mailet that always requests a transition to `target_state`.
    struct StateChangeMailet {
        target_state: MailState,
    }

    #[async_trait]
    impl Mailet for StateChangeMailet {
        async fn init(&mut self, _config: MailetConfig) -> anyhow::Result<()> {
            Ok(())
        }

        async fn service(&self, _mail: &mut Mail) -> anyhow::Result<MailetAction> {
            let next = self.target_state.clone();
            Ok(MailetAction::ChangeState(next))
        }

        fn name(&self) -> &str {
            "StateChangeMailet"
        }
    }

    /// Routes one mail through a Root processor that redirects it to
    /// Transport, where it is dropped; routing is expected to succeed.
    #[tokio::test]
    async fn test_router_state_change() {
        // Mail under test: one recipient, tiny in-memory body.
        let body = MessageBody::Small(Bytes::from("Test"));
        let message = MimeMessage::new(HeaderMap::new(), body);
        let mail = Mail::new(
            None,
            vec!["user@example.com".parse().unwrap()],
            message,
            None,
            None,
        );

        // Root stage: hand every mail over to the Transport state.
        let to_transport = StateChangeMailet {
            target_state: MailState::Transport,
        };
        let mut root = Processor::new("root", MailState::Root);
        root.add_step(ProcessingStep::new(
            Arc::new(AllMatcher),
            Arc::new(to_transport),
        ));

        // Transport stage: drop every mail.
        let mut transport = Processor::new("transport", MailState::Transport);
        transport.add_step(ProcessingStep::new(
            Arc::new(AllMatcher),
            Arc::new(DropMailet),
        ));

        // Wire both stages into a router and run the mail through it.
        let mut router = MailProcessorRouter::new(Arc::new(MetricsCollector::new()));
        router.register_processor(MailState::Root, Arc::new(root));
        router.register_processor(MailState::Transport, Arc::new(transport));

        router.route(mail).await.unwrap();
    }
}