// File: mail_laser/smtp/mod.rs

mod email_parser;
mod smtp_protocol;

use std::sync::Arc;
use std::time::Duration;

use acton_reactive::prelude::*;
use anyhow::{Context, Result};
use log::{error, info, trace, warn};
use rcgen::generate_simple_self_signed;
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
use rustls::ServerConfig as RustlsServerConfig;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio_rustls::TlsAcceptor;
use tokio_util::sync::CancellationToken;

use crate::config::Config;
use crate::webhook::{EmailPayload, ForwardEmail};
use email_parser::EmailParser;
use smtp_protocol::{SmtpCommandResult, SmtpProtocol, SmtpState};

// --- SmtpListenerActor ---

23#[acton_actor]
24pub struct SmtpListenerState;
25
26impl SmtpListenerState {
27    pub async fn create(
28        runtime: &mut ActorRuntime,
29        config: &Config,
30        webhook_handle: ActorHandle,
31    ) -> anyhow::Result<ActorHandle> {
32        let actor_config = ActorConfig::new(Ern::with_root("smtp-listener")?, None, None)?
33            .with_restart_policy(RestartPolicy::Permanent);
34
35        let mut builder = runtime.new_actor_with_config::<Self>(actor_config);
36
37        let cancel = CancellationToken::new();
38        let cancel_for_loop = cancel.clone();
39        let cancel_for_stop = cancel.clone();
40
41        let smtp_config = config.clone();
42        let wh = webhook_handle.clone();
43
44        builder.after_start(move |_actor| {
45            let config = smtp_config.clone();
46            let webhook_handle = wh.clone();
47            let cancel = cancel_for_loop.clone();
48
49            tokio::spawn(async move {
50                let addr = format!("{}:{}", config.smtp_bind_address, config.smtp_port);
51                let listener = match TcpListener::bind(&addr).await {
52                    Ok(l) => {
53                        tracing::info!("SMTP server listening on {}", addr);
54                        l
55                    }
56                    Err(e) => {
57                        tracing::error!("Failed to bind SMTP: {}", e);
58                        return;
59                    }
60                };
61
62                loop {
63                    tokio::select! {
64                        result = listener.accept() => {
65                            match result {
66                                Ok((stream, remote_addr)) => {
67                                    tracing::info!("New connection from: {}", remote_addr);
68                                    let wh = webhook_handle.clone();
69                                    let targets = config.target_emails.clone();
70                                    let prefixes = config.header_prefixes.clone();
71                                    tokio::spawn(async move {
72                                        if let Err(e) = handle_connection(stream, wh, targets, prefixes).await {
73                                            tracing::error!("Error handling SMTP connection from {}: {:#?}", remote_addr, e);
74                                        }
75                                    });
76                                }
77                                Err(e) => tracing::error!("Error accepting connection: {:?}", e),
78                            }
79                        }
80                        _ = cancel.cancelled() => {
81                            tracing::info!("SMTP listener shutting down gracefully");
82                            break;
83                        }
84                    }
85                }
86            });
87
88            Reply::ready()
89        });
90
91        builder.before_stop(move |_| {
92            cancel_for_stop.cancel();
93            Reply::ready()
94        });
95
96        Ok(builder.start().await)
97    }
98}

// --- Certificate generation ---

102fn generate_self_signed_cert() -> Result<(CertificateDer<'static>, PrivateKeyDer<'static>)> {
103    let subject_alt_names = vec!["localhost".to_string()];
104
105    let certified_key = generate_simple_self_signed(subject_alt_names)
106        .context("Failed to generate self-signed certificate using rcgen")?;
107
108    let cert_der = certified_key.cert.der().to_vec();
109    let key_der = certified_key.signing_key.serialize_der();
110
111    Ok((
112        CertificateDer::from(cert_der),
113        PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(key_der)),
114    ))
115}

// --- Connection handlers ---

119async fn handle_connection(
120    mut stream: TcpStream,
121    webhook_handle: ActorHandle,
122    target_emails: Vec<String>,
123    header_prefixes: Vec<String>,
124) -> Result<()> {
125    let mut sender = String::new();
126    let mut accepted_recipient = String::new();
127    let mut email_data = String::new();
128    let mut collecting_data = false;
129
130    let protocol_result = async {
131        let (read_half, write_half) = tokio::io::split(&mut stream);
132        let reader = tokio::io::BufReader::new(read_half);
133        let writer = tokio::io::BufWriter::new(write_half);
134        let mut initial_protocol = SmtpProtocol::new(reader, writer);
135
136        initial_protocol.send_greeting().await?;
137
138        loop {
139            trace!("SMTP({:?}): Waiting for command...", initial_protocol.get_state());
140            let line = initial_protocol.read_line().await?;
141            trace!("SMTP({:?}): Received line (len {}): {:?}", initial_protocol.get_state(), line.len(), line);
142
143            if initial_protocol.get_state() != SmtpState::Data && line.is_empty() {
144                info!("Connection closed by client (EOF). State: {:?}", initial_protocol.get_state());
145                return Ok(());
146            }
147
148            let result = initial_protocol.process_command(&line).await?;
149
150            match result {
151                SmtpCommandResult::StartTls => {
152                    info!("Client initiated STARTTLS. Proceeding with handshake.");
153                    return Err(anyhow::anyhow!("STARTTLS"));
154                }
155                SmtpCommandResult::Quit => {
156                    info!("Client quit.");
157                    return Ok(());
158                }
159                SmtpCommandResult::MailFrom(email) => {
160                    sender = email;
161                }
162                SmtpCommandResult::RcptTo(email) => {
163                    let received_email = email;
164                    let received_email_lower = received_email.to_lowercase();
165                    if target_emails.iter().any(|target| target.to_lowercase() == received_email_lower) {
166                        accepted_recipient = received_email;
167                        initial_protocol.write_line("250 OK").await?;
168                    } else {
169                        initial_protocol.write_line("550 No such user here").await?;
170                        accepted_recipient.clear();
171                    }
172                }
173                SmtpCommandResult::DataStart => {
174                    if sender.is_empty() || accepted_recipient.is_empty() {
175                        warn!("DATA command received without valid MAIL FROM or RCPT TO. Rejecting.");
176                        initial_protocol.write_line("503 Bad sequence of commands (MAIL FROM and RCPT TO required first)").await?;
177                    } else {
178                        collecting_data = true;
179                        email_data.clear();
180                    }
181                }
182                SmtpCommandResult::DataLine(line_content) => {
183                    if collecting_data {
184                        email_data.push_str(&line_content);
185                        email_data.push_str("\r\n");
186                    } else {
187                        warn!("Received DataLine result when not in Data state.");
188                    }
189                }
190                SmtpCommandResult::DataEnd => {
191                    collecting_data = false;
192                    if sender.is_empty() || accepted_recipient.is_empty() {
193                        warn!("DataEnd received but sender or recipient was missing. Message likely not processed.");
194                    } else {
195                        match EmailParser::parse(email_data.as_bytes(), &header_prefixes) {
196                            Ok((subject, from_name, text_body, html_body, matched_headers)) => {
197                                info!("Received email from {} to {} (Subject: '{}')", sender, accepted_recipient, subject);
198                                let headers = if matched_headers.is_empty() { None } else { Some(matched_headers) };
199                                let email_payload = EmailPayload {
200                                    sender: sender.clone(),
201                                    sender_name: from_name,
202                                    recipient: accepted_recipient.clone(),
203                                    subject,
204                                    body: text_body,
205                                    html_body,
206                                    headers,
207                                };
208                                webhook_handle.send(ForwardEmail { payload: email_payload }).await;
209                            }
210                            Err(e) => {
211                                error!("Failed to parse email data from {}: {:#}", sender, e);
212                            }
213                        }
214                    }
215                    sender.clear();
216                    accepted_recipient.clear();
217                    email_data.clear();
218                }
219                SmtpCommandResult::Continue => {}
220            }
221        }
222    }
223    .await;
224
225    match protocol_result {
226        Ok(()) => Ok(()),
227        Err(e) if e.to_string() == "STARTTLS" => {
228            handle_starttls(stream, webhook_handle, target_emails, header_prefixes).await
229        }
230        Err(e) => Err(e),
231    }
232}
233
234async fn handle_starttls(
235    stream: TcpStream,
236    webhook_handle: ActorHandle,
237    target_emails: Vec<String>,
238    header_prefixes: Vec<String>,
239) -> Result<()> {
240    let (cert, key) = generate_self_signed_cert()
241        .context("Failed to generate self-signed certificate for STARTTLS")?;
242
243    let tls_config = RustlsServerConfig::builder()
244        .with_no_client_auth()
245        .with_single_cert(vec![cert], key)
246        .map_err(|e| anyhow::anyhow!("Failed to create rustls config: {}", e))?;
247
248    let acceptor = TlsAcceptor::from(Arc::new(tls_config));
249
250    match acceptor.accept(stream).await {
251        Ok(tls_stream) => {
252            info!("STARTTLS handshake successful.");
253            handle_secure_session(tls_stream, webhook_handle, target_emails, header_prefixes).await
254        }
255        Err(e) => {
256            error!("STARTTLS handshake failed: {:?}", e);
257            Err(anyhow::Error::new(e).context("STARTTLS handshake failed"))
258        }
259    }
260}
261
262async fn handle_secure_session<T>(
263    tls_stream: T,
264    webhook_handle: ActorHandle,
265    target_emails: Vec<String>,
266    header_prefixes: Vec<String>,
267) -> Result<()>
268where
269    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
270{
271    let (read_half, write_half) = tokio::io::split(tls_stream);
272    let reader = tokio::io::BufReader::new(read_half);
273    let writer = tokio::io::BufWriter::new(write_half);
274    let mut protocol = SmtpProtocol::new(reader, writer);
275
276    let mut sender = String::new();
277    let mut accepted_recipient = String::new();
278    let mut email_data = String::new();
279    let mut collecting_data = false;
280
281    loop {
282        trace!(
283            "SMTP(TLS/{:?}): Waiting for command...",
284            protocol.get_state()
285        );
286        let line = protocol.read_line().await?;
287        trace!(
288            "SMTP(TLS/{:?}): Received line (len {}): {:?}",
289            protocol.get_state(),
290            line.len(),
291            line
292        );
293
294        if protocol.get_state() != SmtpState::Data && line.is_empty() {
295            info!("Connection closed by client (EOF) during secure session.");
296            break;
297        }
298
299        let result = protocol.process_command(&line).await?;
300
301        match result {
302            SmtpCommandResult::Quit => break,
303            SmtpCommandResult::MailFrom(email) => {
304                sender = email;
305            }
306            SmtpCommandResult::RcptTo(email) => {
307                let received_email = email;
308                let received_email_lower = received_email.to_lowercase();
309                if target_emails
310                    .iter()
311                    .any(|target| target.to_lowercase() == received_email_lower)
312                {
313                    accepted_recipient = received_email;
314                    protocol.write_line("250 OK").await?;
315                } else {
316                    protocol.write_line("550 No such user here").await?;
317                    accepted_recipient.clear();
318                }
319            }
320            SmtpCommandResult::DataStart => {
321                collecting_data = true;
322                email_data.clear();
323            }
324            SmtpCommandResult::DataLine(line_content) => {
325                if collecting_data {
326                    email_data.push_str(&line_content);
327                    email_data.push_str("\r\n");
328                }
329            }
330            SmtpCommandResult::DataEnd => {
331                collecting_data = false;
332                let (subject, from_name, text_body, html_body, matched_headers) =
333                    EmailParser::parse(email_data.as_bytes(), &header_prefixes)?;
334                info!(
335                    "Received email (TLS) from {} to {} (Subject: '{}')",
336                    sender, accepted_recipient, subject
337                );
338
339                let headers = if matched_headers.is_empty() {
340                    None
341                } else {
342                    Some(matched_headers)
343                };
344                let email_payload = EmailPayload {
345                    sender: sender.clone(),
346                    sender_name: from_name,
347                    recipient: accepted_recipient.clone(),
348                    subject,
349                    body: text_body,
350                    html_body,
351                    headers,
352                };
353                webhook_handle
354                    .send(ForwardEmail {
355                        payload: email_payload,
356                    })
357                    .await;
358
359                sender.clear();
360                accepted_recipient.clear();
361                email_data.clear();
362            }
363            SmtpCommandResult::Continue => {}
364            SmtpCommandResult::StartTls => {
365                warn!("Received STARTTLS command within secure session. Sending error.");
366                protocol.write_line("503 STARTTLS already active").await?;
367            }
368        }
369    }
370    Ok(())
371}