mail_laser/smtp/mod.rs
1//! Handles the primary SMTP server logic, including connection handling,
2//! optional STARTTLS negotiation, command processing via `smtp_protocol`,
3//! email parsing via `email_parser`, and initiating webhook forwarding.
4
5mod email_parser;
6mod smtp_protocol;
7
8use std::sync::Arc;
9use anyhow::{Result, Context};
10use log::{info, error, trace, warn};
11use tokio::net::{TcpListener, TcpStream};
12use tokio::io::{AsyncRead, AsyncWrite}; // Required for generic TlsStream handling
13use crate::config::Config;
14use crate::webhook::{WebhookClient, EmailPayload};
15use smtp_protocol::{SmtpProtocol, SmtpCommandResult, SmtpState};
16use email_parser::EmailParser;
17
18// TLS related imports
19use rustls::ServerConfig as RustlsServerConfig;
20use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
21use tokio_rustls::TlsAcceptor;
22use rcgen::generate_simple_self_signed;
23
24/// Represents the main SMTP server instance.
25///
26/// Holds the application configuration and a shared `WebhookClient` instance
27/// used by connection handlers to forward processed emails.
28pub struct Server {
29 config: Config,
30 webhook_client: Arc<WebhookClient>, // Arc allows safe sharing across async tasks.
31}
32
33impl Server {
34 /// Creates a new SMTP `Server` instance.
35 ///
36 /// Initializes the shared `WebhookClient`.
37 ///
38 /// # Arguments
39 ///
40 /// * `config` - The application configuration.
41 pub fn new(config: Config) -> Self {
42 // Initialize the webhook client; this might panic if certs can't load.
43 let webhook_client = Arc::new(WebhookClient::new(config.clone()));
44 Server {
45 config,
46 webhook_client,
47 }
48 }
49
50 /// Runs the main SMTP server loop.
51 ///
52 /// Binds to the configured SMTP address and port, then enters an infinite loop
53 /// accepting incoming TCP connections. Each connection is handled in a separate
54 /// Tokio task via `handle_connection`.
55 ///
56 /// # Errors
57 ///
58 /// Returns an `Err` if the server fails to bind to the specified address and port.
59 /// Errors during connection acceptance or handling are logged but do not terminate the server loop.
60 pub async fn run(&self) -> Result<()> {
61 let addr = format!("{}:{}", self.config.smtp_bind_address, self.config.smtp_port);
62 // Attempt to bind the TCP listener to the configured address.
63 let listener = TcpListener::bind(&addr).await
64 .with_context(|| format!("Failed to bind SMTP server to {}", addr))?;
65 info!("SMTP server listening on {}", addr);
66
67 // Main server loop: continuously accept incoming connections.
68 loop {
69 match listener.accept().await {
70 Ok((stream, remote_addr)) => {
71 info!("New connection from: {}", remote_addr);
72 // Clone Arcs for the new task. Cloning Arc is cheap.
73 let webhook_client = Arc::clone(&self.webhook_client);
74 // Clone the Vec of target emails for the new task.
75 let target_emails = self.config.target_emails.clone();
76 let header_prefixes = self.config.header_prefixes.clone();
77 // Spawn a dedicated asynchronous task for each connection.
78 tokio::spawn(async move {
79 if let Err(e) = handle_connection(stream, webhook_client, target_emails, header_prefixes).await {
80 // Log errors from individual connection handlers.
81 // Using {:#?} includes the error source/context from anyhow.
82 error!("Error handling SMTP connection from {}: {:#?}", remote_addr, e);
83 }
84 });
85 }
86 Err(e) => {
87 // Log errors encountered during connection acceptance but continue loop.
88 error!("Error accepting connection: {:?}", e);
89 }
90 }
91 }
92 // This loop is infinite, so Ok(()) is never reached in normal operation.
93 }
94}
95
96/// Generates a self-signed TLS certificate and private key using `rcgen`.
97///
98/// Used for establishing TLS sessions via STARTTLS when no certificate is configured.
99/// Note: Self-signed certificates are generally not trusted by clients without manual configuration.
100///
101/// # Returns
102///
103/// A `Result` containing a tuple of the certificate and private key in DER format.
104///
105/// # Errors
106///
107/// Returns an `Err` if certificate generation fails.
108fn generate_self_signed_cert() -> Result<(CertificateDer<'static>, PrivateKeyDer<'static>)> {
109 // Define Subject Alternative Names (SANs) - typically includes domains/IPs the cert is valid for.
110 let subject_alt_names = vec!["localhost".to_string()]; // Example SAN
111
112 // Generate the certificate and key pair.
113 let certified_key = generate_simple_self_signed(subject_alt_names)
114 .context("Failed to generate self-signed certificate using rcgen")?;
115
116 // Extract the certificate and key in DER format required by rustls.
117 let cert_der = certified_key.cert.der().to_vec(); // Clone bytes needed for owned CertificateDer.
118 let key_der = certified_key.signing_key.serialize_der(); // Key in PKCS#8 format.
119
120 Ok((
121 CertificateDer::from(cert_der),
122 PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(key_der)) // Assume PKCS#8 format from rcgen.
123 ))
124}
125
126
127/// Handles the initial phase of a new client connection, including potential STARTTLS negotiation.
128///
129/// Sends the initial greeting and processes commands like EHLO/HELO and STARTTLS.
130/// If STARTTLS is successfully negotiated, it passes the upgraded TLS stream to `handle_secure_session`.
131///
132/// # Arguments
133///
134/// * `stream` - The raw TCP stream from the accepted connection.
135/// * `webhook_client` - Shared `WebhookClient`.
136/// * `target_emails` - The configured list of target email addresses.
137///
138/// # Errors
139///
140/// Returns `Err` if initial greeting fails, reading/processing initial commands fails,
141/// or if the STARTTLS handshake fails.
142async fn handle_connection(
143 mut stream: TcpStream, // Mutable ownership needed for potential TLS upgrade.
144 webhook_client: Arc<WebhookClient>,
145 target_emails: Vec<String>,
146 header_prefixes: Vec<String>,
147) -> Result<()> {
148 // Variables to store state during the SMTP transaction.
149 // These are needed here because this function handles the full non-TLS flow.
150 let mut sender = String::new();
151 let mut accepted_recipient = String::new();
152 let mut email_data = String::new();
153 let mut collecting_data = false;
154
155 // Scope for the initial, non-TLS protocol handler.
156 // We temporarily split the stream to use BufReader/BufWriter.
157 // The split borrows `stream` mutably. When the scope ends, the borrow ends,
158 // and we regain full ownership of `stream` for potential TLS upgrade.
159 // Scope for the protocol handler using the stream's reader/writer halves.
160 // We need to ensure this scope ends cleanly or the stream is handled correctly on STARTTLS.
161 // Let's restructure slightly to avoid dropping the protocol handler too early if no STARTTLS.
162 let protocol_result = async {
163 let (read_half, write_half) = tokio::io::split(&mut stream);
164 let reader = tokio::io::BufReader::new(read_half);
165 let writer = tokio::io::BufWriter::new(write_half);
166 let mut initial_protocol = SmtpProtocol::new(reader, writer);
167
168 // Send the initial 220 greeting.
169 initial_protocol.send_greeting().await?;
170
171 // Process commands for the entire session (unless STARTTLS happens).
172 loop {
173 trace!("SMTP({:?}): Waiting for command...", initial_protocol.get_state());
174 let line = initial_protocol.read_line().await?;
175 trace!("SMTP({:?}): Received line (len {}): {:?}", initial_protocol.get_state(), line.len(), line);
176
177 // Handle EOF, except during DATA phase.
178 if initial_protocol.get_state() != SmtpState::Data && line.is_empty() {
179 info!("Connection closed by client (EOF). State: {:?}", initial_protocol.get_state());
180 return Ok(()); // Clean exit for this async block
181 }
182
183 // Process the command using the state machine.
184 let result = initial_protocol.process_command(&line).await?;
185
186 match result {
187 SmtpCommandResult::StartTls => {
188 // Client requested TLS upgrade.
189 info!("Client initiated STARTTLS. Proceeding with handshake.");
190 // Signal that TLS should be handled outside this block.
191 return Err(anyhow::anyhow!("STARTTLS")); // Use error to signal STARTTLS needed
192 }
193 SmtpCommandResult::Quit => {
194 info!("Client quit.");
195 return Ok(()); // Clean exit for this async block
196 }
197 SmtpCommandResult::MailFrom(email) => {
198 sender = email;
199 // State is updated internally by process_command
200 },
201 SmtpCommandResult::RcptTo(email) => {
202 let received_email = email; // Rename for clarity
203 // Validate recipient against the list of target emails (case-insensitive).
204 let received_email_lower = received_email.to_lowercase();
205 if target_emails.iter().any(|target| target.to_lowercase() == received_email_lower) {
206 // Store the *actual* accepted recipient address (preserving case)
207 accepted_recipient = received_email;
208 initial_protocol.write_line("250 OK").await?;
209 // State is updated internally by process_command
210 } else {
211 // Reject if not in the list.
212 initial_protocol.write_line("550 No such user here").await?;
213 // Clear any previously accepted recipient if a new, invalid one is provided.
214 accepted_recipient.clear();
215 // State remains MailFrom or RcptTo depending on previous state
216 }
217 },
218 SmtpCommandResult::DataStart => {
219 // Check if we have a sender and at least one valid recipient before accepting DATA
220 if sender.is_empty() || accepted_recipient.is_empty() {
221 warn!("DATA command received without valid MAIL FROM or RCPT TO. Rejecting.");
222 initial_protocol.write_line("503 Bad sequence of commands (MAIL FROM and RCPT TO required first)").await?;
223 // Reset state? Protocol handler might need adjustment or we reset here.
224 // For now, just continue; the protocol handler should keep state correct.
225 } else {
226 // Proceed with DATA
227 collecting_data = true;
228 email_data.clear();
229 // State is updated internally by process_command (to Data)
230 }
231 },
232 SmtpCommandResult::DataLine(line_content) => {
233 if collecting_data {
234 // Append line, handling potential dot-stuffing if necessary (basic append here)
235 email_data.push_str(&line_content);
236 email_data.push_str("\r\n"); // Re-add CRLF lost by read_line
237 } else {
238 // Should not happen if state machine is correct, but log if it does.
239 warn!("Received DataLine result when not in Data state.");
240 }
241 },
242 SmtpCommandResult::DataEnd => {
243 collecting_data = false; // Stop collecting data
244 if sender.is_empty() || accepted_recipient.is_empty() {
245 warn!("DataEnd received but sender or recipient was missing. Message likely not processed.");
246 // State is reset to Greeted internally by protocol handler
247 } else {
248 // Parse the collected email data.
249 match EmailParser::parse(email_data.as_bytes(), &header_prefixes) {
250 Ok((subject, from_name, text_body, html_body, matched_headers)) => {
251 info!("Received email from {} to {} (Subject: '{}')", sender, accepted_recipient, subject);
252 // Prepare and forward the payload.
253 let headers = if matched_headers.is_empty() { None } else { Some(matched_headers) };
254 let email_payload = EmailPayload {
255 sender: sender.clone(),
256 sender_name: from_name,
257 recipient: accepted_recipient.clone(),
258 subject,
259 body: text_body,
260 html_body,
261 headers,
262 };
263 // Spawn forwarding in a separate task to avoid blocking the SMTP loop?
264 // For now, await directly. Consider spawning if webhook is slow.
265 if let Err(e) = webhook_client.forward_email(email_payload).await {
266 error!("Failed to forward email from {}: {:#}", sender, e);
267 // Log only, do not fail the SMTP session.
268 }
269 },
270 Err(e) => {
271 error!("Failed to parse email data from {}: {:#}", sender, e);
272 // Consider sending a 4xx/5xx SMTP error? Difficult after 250 OK for DATA end.
273 }
274 }
275 }
276 // Reset transaction state variables for the next potential email in the session.
277 sender.clear();
278 accepted_recipient.clear();
279 email_data.clear();
280 // State is reset to Greeted internally by protocol handler after DataEnd.
281 },
282 SmtpCommandResult::Continue => {
283 // Usually follows EHLO/HELO or error responses. Just continue the loop.
284 }
285 // STARTTLS is handled above by returning Err
286 }
287 }
288 }.await; // End of async block
289
290 // Check the result of the async block
291 match protocol_result {
292 Ok(()) => Ok(()), // Session ended normally (QUIT or EOF)
293 Err(e) if e.to_string() == "STARTTLS" => {
294 // Signal to handle STARTTLS was received
295 handle_starttls(stream, webhook_client, target_emails, header_prefixes).await
296 }
297 Err(e) => Err(e), // Propagate other errors
298 // `initial_protocol` (and its borrow of `stream`) goes out of scope here.
299 }
300}
301
302
303/// Performs the TLS handshake using a self-signed certificate.
304///
305/// If the handshake is successful, passes the encrypted stream to `handle_secure_session`.
306///
307/// # Arguments
308///
309/// * `stream` - The raw TCP stream after the `220 Go ahead` response to STARTTLS.
310/// * `webhook_client` - Shared `WebhookClient`.
311/// * `target_emails` - The configured list of target email addresses.
312///
313/// # Errors
314///
315/// Returns `Err` if certificate generation fails, TLS config creation fails, or the handshake fails.
316async fn handle_starttls(
317 stream: TcpStream, // Takes ownership of the raw TCP stream.
318 webhook_client: Arc<WebhookClient>,
319 target_emails: Vec<String>,
320 header_prefixes: Vec<String>,
321) -> Result<()> {
322 // Generate ephemeral self-signed cert for the TLS session.
323 let (cert, key) = generate_self_signed_cert()
324 .context("Failed to generate self-signed certificate for STARTTLS")?;
325
326 // Configure the rustls server-side TLS parameters.
327 let tls_config = RustlsServerConfig::builder()
328 .with_no_client_auth() // We don't require client certificates.
329 .with_single_cert(vec![cert], key) // Provide the generated cert and key.
330 .map_err(|e| anyhow::anyhow!("Failed to create rustls config: {}", e))?;
331
332 // Create a TLS acceptor based on the configuration.
333 let acceptor = TlsAcceptor::from(Arc::new(tls_config));
334
335 // Perform the TLS handshake over the existing TCP stream.
336 match acceptor.accept(stream).await {
337 Ok(tls_stream) => {
338 // Handshake successful, proceed with the secure session.
339 info!("STARTTLS handshake successful.");
340 // Pass the list of target emails and header prefixes to the secure session handler.
341 handle_secure_session(tls_stream, webhook_client, target_emails, header_prefixes).await
342 }
343 Err(e) => {
344 // Handshake failed. Log the error and return it.
345 error!("STARTTLS handshake failed: {:?}", e);
346 Err(anyhow::Error::new(e).context("STARTTLS handshake failed"))
347 }
348 }
349}
350
351/// Handles the SMTP command sequence over an established secure (TLS) connection.
352///
353/// This function is similar to the main loop in `handle_connection` but operates
354/// on the encrypted `tls_stream`. It processes MAIL FROM, RCPT TO, DATA, etc.
355///
356/// # Arguments
357///
358/// * `tls_stream` - The encrypted TLS stream after a successful handshake.
359/// * `webhook_client` - Shared `WebhookClient`.
360/// * `target_emails` - The configured list of target email addresses.
361///
362/// # Type Parameters
363///
364/// * `T` - A type that implements `AsyncRead`, `AsyncWrite`, `Unpin`, `Send`, and `'static`,
365/// representing the TLS stream type (e.g., `tokio_rustls::server::TlsStream<TcpStream>`).
366///
367/// # Errors
368///
369/// Returns `Err` if reading/writing to the TLS stream fails or if command processing fails.
370async fn handle_secure_session<T>(
371 tls_stream: T, // Generic over the actual TlsStream type.
372 webhook_client: Arc<WebhookClient>,
373 target_emails: Vec<String>,
374 header_prefixes: Vec<String>,
375) -> Result<()>
376where
377 T: AsyncRead + AsyncWrite + Unpin + Send + 'static, // Traits required by tokio::io::split and SmtpProtocol.
378{
379 // Split the TLS stream for buffered I/O.
380 let (read_half, write_half) = tokio::io::split(tls_stream);
381 let reader = tokio::io::BufReader::new(read_half);
382 let writer = tokio::io::BufWriter::new(write_half);
383 // Create a new protocol handler for the secure stream.
384 // Important: The state starts as Initial, expecting EHLO/HELO again after STARTTLS.
385 let mut protocol = SmtpProtocol::new(reader, writer);
386
387 // Variables to store state during the SMTP transaction within the secure session.
388 let mut sender = String::new();
389 let mut accepted_recipient = String::new(); // Store the specific recipient that was accepted
390 let mut email_data = String::new();
391 let mut collecting_data = false;
392
393 // Main loop for processing commands over the secure connection.
394 loop {
395 trace!("SMTP(TLS/{:?}): Waiting for command...", protocol.get_state());
396 let line = protocol.read_line().await?;
397 trace!("SMTP(TLS/{:?}): Received line (len {}): {:?}", protocol.get_state(), line.len(), line);
398
399 // Handle EOF during secure session.
400 if protocol.get_state() != SmtpState::Data && line.is_empty() {
401 info!("Connection closed by client (EOF) during secure session.");
402 break;
403 }
404
405 // Process the command using the state machine.
406 let result = protocol.process_command(&line).await?;
407
408 match result {
409 SmtpCommandResult::Quit => break,
410 SmtpCommandResult::MailFrom(email) => {
411 sender = email;
412 },
413 SmtpCommandResult::RcptTo(email) => {
414 let received_email = email; // Rename for clarity
415 // Validate recipient against the list of target emails (case-insensitive).
416 let received_email_lower = received_email.to_lowercase();
417 if target_emails.iter().any(|target| target.to_lowercase() == received_email_lower) {
418 // Store the *actual* accepted recipient address (preserving case)
419 accepted_recipient = received_email;
420 protocol.write_line("250 OK").await?;
421 } else {
422 // Reject if not in the list.
423 protocol.write_line("550 No such user here").await?;
424 // Clear any previously accepted recipient if a new, invalid one is provided.
425 accepted_recipient.clear();
426 }
427 },
428 SmtpCommandResult::DataStart => {
429 collecting_data = true;
430 email_data.clear();
431 },
432 SmtpCommandResult::DataLine(line_content) => {
433 if collecting_data {
434 email_data.push_str(&line_content);
435 email_data.push_str("\r\n");
436 }
437 },
438 SmtpCommandResult::DataEnd => {
439 collecting_data = false;
440 // Parse the collected email data with header prefix matching.
441 let (subject, from_name, text_body, html_body, matched_headers) = EmailParser::parse(email_data.as_bytes(), &header_prefixes)?;
442 info!("Received email (TLS) from {} to {} (Subject: '{}')", sender, accepted_recipient, subject);
443
444 // Prepare and forward the payload.
445 let headers = if matched_headers.is_empty() { None } else { Some(matched_headers) };
446 let email_payload = EmailPayload {
447 sender: sender.clone(),
448 sender_name: from_name,
449 recipient: accepted_recipient.clone(),
450 subject,
451 body: text_body,
452 html_body,
453 headers,
454 };
455 if let Err(e) = webhook_client.forward_email(email_payload).await {
456 error!("Failed to forward email (TLS) from {}: {:#}", sender, e);
457 // Log only, do not fail the SMTP session.
458 }
459
460 // Reset state for the next email in the session.
461 sender.clear();
462 accepted_recipient.clear();
463 email_data.clear();
464 // Protocol state is reset to Greeted internally after DataEnd.
465 },
466 SmtpCommandResult::Continue => {
467 // Usually follows EHLO/HELO after STARTTLS.
468 }
469 SmtpCommandResult::StartTls => {
470 // STARTTLS is invalid within an already established TLS session.
471 warn!("Received STARTTLS command within secure session. Sending error.");
472 protocol.write_line("503 STARTTLS already active").await?;
473 }
474 }
475 }
476 Ok(())
477}