1mod email_parser;
2mod smtp_protocol;
3
4use crate::config::Config;
5use crate::webhook::{EmailPayload, ForwardEmail};
6use acton_reactive::prelude::*;
7use anyhow::{Context, Result};
8use email_parser::EmailParser;
9use log::{error, info, trace, warn};
10use smtp_protocol::{SmtpCommandResult, SmtpProtocol, SmtpState};
11use tokio::io::{AsyncRead, AsyncWrite};
12use tokio::net::{TcpListener, TcpStream};
13use tokio_util::sync::CancellationToken;
14
15use rcgen::generate_simple_self_signed;
16use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
17use rustls::ServerConfig as RustlsServerConfig;
18use std::sync::Arc;
19use tokio_rustls::TlsAcceptor;
20
21#[acton_actor]
24pub struct SmtpListenerState;
25
26impl SmtpListenerState {
27 pub async fn create(
28 runtime: &mut ActorRuntime,
29 config: &Config,
30 webhook_handle: ActorHandle,
31 ) -> anyhow::Result<ActorHandle> {
32 let actor_config = ActorConfig::new(Ern::with_root("smtp-listener")?, None, None)?
33 .with_restart_policy(RestartPolicy::Permanent);
34
35 let mut builder = runtime.new_actor_with_config::<Self>(actor_config);
36
37 let cancel = CancellationToken::new();
38 let cancel_for_loop = cancel.clone();
39 let cancel_for_stop = cancel.clone();
40
41 let smtp_config = config.clone();
42 let wh = webhook_handle.clone();
43
44 builder.after_start(move |_actor| {
45 let config = smtp_config.clone();
46 let webhook_handle = wh.clone();
47 let cancel = cancel_for_loop.clone();
48
49 tokio::spawn(async move {
50 let addr = format!("{}:{}", config.smtp_bind_address, config.smtp_port);
51 let listener = match TcpListener::bind(&addr).await {
52 Ok(l) => {
53 tracing::info!("SMTP server listening on {}", addr);
54 l
55 }
56 Err(e) => {
57 tracing::error!("Failed to bind SMTP: {}", e);
58 return;
59 }
60 };
61
62 loop {
63 tokio::select! {
64 result = listener.accept() => {
65 match result {
66 Ok((stream, remote_addr)) => {
67 tracing::info!("New connection from: {}", remote_addr);
68 let wh = webhook_handle.clone();
69 let targets = config.target_emails.clone();
70 let prefixes = config.header_prefixes.clone();
71 tokio::spawn(async move {
72 if let Err(e) = handle_connection(stream, wh, targets, prefixes).await {
73 tracing::error!("Error handling SMTP connection from {}: {:#?}", remote_addr, e);
74 }
75 });
76 }
77 Err(e) => tracing::error!("Error accepting connection: {:?}", e),
78 }
79 }
80 _ = cancel.cancelled() => {
81 tracing::info!("SMTP listener shutting down gracefully");
82 break;
83 }
84 }
85 }
86 });
87
88 Reply::ready()
89 });
90
91 builder.before_stop(move |_| {
92 cancel_for_stop.cancel();
93 Reply::ready()
94 });
95
96 Ok(builder.start().await)
97 }
98}
99
100fn generate_self_signed_cert() -> Result<(CertificateDer<'static>, PrivateKeyDer<'static>)> {
103 let subject_alt_names = vec!["localhost".to_string()];
104
105 let certified_key = generate_simple_self_signed(subject_alt_names)
106 .context("Failed to generate self-signed certificate using rcgen")?;
107
108 let cert_der = certified_key.cert.der().to_vec();
109 let key_der = certified_key.signing_key.serialize_der();
110
111 Ok((
112 CertificateDer::from(cert_der),
113 PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(key_der)),
114 ))
115}
116
117async fn handle_connection(
120 mut stream: TcpStream,
121 webhook_handle: ActorHandle,
122 target_emails: Vec<String>,
123 header_prefixes: Vec<String>,
124) -> Result<()> {
125 let mut sender = String::new();
126 let mut accepted_recipient = String::new();
127 let mut email_data = String::new();
128 let mut collecting_data = false;
129
130 let protocol_result = async {
131 let (read_half, write_half) = tokio::io::split(&mut stream);
132 let reader = tokio::io::BufReader::new(read_half);
133 let writer = tokio::io::BufWriter::new(write_half);
134 let mut initial_protocol = SmtpProtocol::new(reader, writer);
135
136 initial_protocol.send_greeting().await?;
137
138 loop {
139 trace!("SMTP({:?}): Waiting for command...", initial_protocol.get_state());
140 let line = initial_protocol.read_line().await?;
141 trace!("SMTP({:?}): Received line (len {}): {:?}", initial_protocol.get_state(), line.len(), line);
142
143 if initial_protocol.get_state() != SmtpState::Data && line.is_empty() {
144 info!("Connection closed by client (EOF). State: {:?}", initial_protocol.get_state());
145 return Ok(());
146 }
147
148 let result = initial_protocol.process_command(&line).await?;
149
150 match result {
151 SmtpCommandResult::StartTls => {
152 info!("Client initiated STARTTLS. Proceeding with handshake.");
153 return Err(anyhow::anyhow!("STARTTLS"));
154 }
155 SmtpCommandResult::Quit => {
156 info!("Client quit.");
157 return Ok(());
158 }
159 SmtpCommandResult::MailFrom(email) => {
160 sender = email;
161 }
162 SmtpCommandResult::RcptTo(email) => {
163 let received_email = email;
164 let received_email_lower = received_email.to_lowercase();
165 if target_emails.iter().any(|target| target.to_lowercase() == received_email_lower) {
166 accepted_recipient = received_email;
167 initial_protocol.write_line("250 OK").await?;
168 } else {
169 initial_protocol.write_line("550 No such user here").await?;
170 accepted_recipient.clear();
171 }
172 }
173 SmtpCommandResult::DataStart => {
174 if sender.is_empty() || accepted_recipient.is_empty() {
175 warn!("DATA command received without valid MAIL FROM or RCPT TO. Rejecting.");
176 initial_protocol.write_line("503 Bad sequence of commands (MAIL FROM and RCPT TO required first)").await?;
177 } else {
178 collecting_data = true;
179 email_data.clear();
180 }
181 }
182 SmtpCommandResult::DataLine(line_content) => {
183 if collecting_data {
184 email_data.push_str(&line_content);
185 email_data.push_str("\r\n");
186 } else {
187 warn!("Received DataLine result when not in Data state.");
188 }
189 }
190 SmtpCommandResult::DataEnd => {
191 collecting_data = false;
192 if sender.is_empty() || accepted_recipient.is_empty() {
193 warn!("DataEnd received but sender or recipient was missing. Message likely not processed.");
194 } else {
195 match EmailParser::parse(email_data.as_bytes(), &header_prefixes) {
196 Ok((subject, from_name, text_body, html_body, matched_headers)) => {
197 info!("Received email from {} to {} (Subject: '{}')", sender, accepted_recipient, subject);
198 let headers = if matched_headers.is_empty() { None } else { Some(matched_headers) };
199 let email_payload = EmailPayload {
200 sender: sender.clone(),
201 sender_name: from_name,
202 recipient: accepted_recipient.clone(),
203 subject,
204 body: text_body,
205 html_body,
206 headers,
207 };
208 webhook_handle.send(ForwardEmail { payload: email_payload }).await;
209 }
210 Err(e) => {
211 error!("Failed to parse email data from {}: {:#}", sender, e);
212 }
213 }
214 }
215 sender.clear();
216 accepted_recipient.clear();
217 email_data.clear();
218 }
219 SmtpCommandResult::Continue => {}
220 }
221 }
222 }
223 .await;
224
225 match protocol_result {
226 Ok(()) => Ok(()),
227 Err(e) if e.to_string() == "STARTTLS" => {
228 handle_starttls(stream, webhook_handle, target_emails, header_prefixes).await
229 }
230 Err(e) => Err(e),
231 }
232}
233
234async fn handle_starttls(
235 stream: TcpStream,
236 webhook_handle: ActorHandle,
237 target_emails: Vec<String>,
238 header_prefixes: Vec<String>,
239) -> Result<()> {
240 let (cert, key) = generate_self_signed_cert()
241 .context("Failed to generate self-signed certificate for STARTTLS")?;
242
243 let tls_config = RustlsServerConfig::builder()
244 .with_no_client_auth()
245 .with_single_cert(vec![cert], key)
246 .map_err(|e| anyhow::anyhow!("Failed to create rustls config: {}", e))?;
247
248 let acceptor = TlsAcceptor::from(Arc::new(tls_config));
249
250 match acceptor.accept(stream).await {
251 Ok(tls_stream) => {
252 info!("STARTTLS handshake successful.");
253 handle_secure_session(tls_stream, webhook_handle, target_emails, header_prefixes).await
254 }
255 Err(e) => {
256 error!("STARTTLS handshake failed: {:?}", e);
257 Err(anyhow::Error::new(e).context("STARTTLS handshake failed"))
258 }
259 }
260}
261
262async fn handle_secure_session<T>(
263 tls_stream: T,
264 webhook_handle: ActorHandle,
265 target_emails: Vec<String>,
266 header_prefixes: Vec<String>,
267) -> Result<()>
268where
269 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
270{
271 let (read_half, write_half) = tokio::io::split(tls_stream);
272 let reader = tokio::io::BufReader::new(read_half);
273 let writer = tokio::io::BufWriter::new(write_half);
274 let mut protocol = SmtpProtocol::new(reader, writer);
275
276 let mut sender = String::new();
277 let mut accepted_recipient = String::new();
278 let mut email_data = String::new();
279 let mut collecting_data = false;
280
281 loop {
282 trace!(
283 "SMTP(TLS/{:?}): Waiting for command...",
284 protocol.get_state()
285 );
286 let line = protocol.read_line().await?;
287 trace!(
288 "SMTP(TLS/{:?}): Received line (len {}): {:?}",
289 protocol.get_state(),
290 line.len(),
291 line
292 );
293
294 if protocol.get_state() != SmtpState::Data && line.is_empty() {
295 info!("Connection closed by client (EOF) during secure session.");
296 break;
297 }
298
299 let result = protocol.process_command(&line).await?;
300
301 match result {
302 SmtpCommandResult::Quit => break,
303 SmtpCommandResult::MailFrom(email) => {
304 sender = email;
305 }
306 SmtpCommandResult::RcptTo(email) => {
307 let received_email = email;
308 let received_email_lower = received_email.to_lowercase();
309 if target_emails
310 .iter()
311 .any(|target| target.to_lowercase() == received_email_lower)
312 {
313 accepted_recipient = received_email;
314 protocol.write_line("250 OK").await?;
315 } else {
316 protocol.write_line("550 No such user here").await?;
317 accepted_recipient.clear();
318 }
319 }
320 SmtpCommandResult::DataStart => {
321 collecting_data = true;
322 email_data.clear();
323 }
324 SmtpCommandResult::DataLine(line_content) => {
325 if collecting_data {
326 email_data.push_str(&line_content);
327 email_data.push_str("\r\n");
328 }
329 }
330 SmtpCommandResult::DataEnd => {
331 collecting_data = false;
332 let (subject, from_name, text_body, html_body, matched_headers) =
333 EmailParser::parse(email_data.as_bytes(), &header_prefixes)?;
334 info!(
335 "Received email (TLS) from {} to {} (Subject: '{}')",
336 sender, accepted_recipient, subject
337 );
338
339 let headers = if matched_headers.is_empty() {
340 None
341 } else {
342 Some(matched_headers)
343 };
344 let email_payload = EmailPayload {
345 sender: sender.clone(),
346 sender_name: from_name,
347 recipient: accepted_recipient.clone(),
348 subject,
349 body: text_body,
350 html_body,
351 headers,
352 };
353 webhook_handle
354 .send(ForwardEmail {
355 payload: email_payload,
356 })
357 .await;
358
359 sender.clear();
360 accepted_recipient.clear();
361 email_data.clear();
362 }
363 SmtpCommandResult::Continue => {}
364 SmtpCommandResult::StartTls => {
365 warn!("Received STARTTLS command within secure session. Sending error.");
366 protocol.write_line("503 STARTTLS already active").await?;
367 }
368 }
369 }
370 Ok(())
371}