Skip to main content

mail_laser/smtp/
mod.rs

1//! Handles the primary SMTP server logic, including connection handling,
2//! optional STARTTLS negotiation, command processing via `smtp_protocol`,
3//! email parsing via `email_parser`, and initiating webhook forwarding.
4
5mod email_parser;
6mod smtp_protocol;
7
8use std::sync::Arc;
9use anyhow::{Result, Context};
10use log::{info, error, trace, warn};
11use tokio::net::{TcpListener, TcpStream};
12use tokio::io::{AsyncRead, AsyncWrite}; // Required for generic TlsStream handling
13use crate::config::Config;
14use crate::webhook::{WebhookClient, EmailPayload};
15use smtp_protocol::{SmtpProtocol, SmtpCommandResult, SmtpState};
16use email_parser::EmailParser;
17
18// TLS related imports
19use rustls::ServerConfig as RustlsServerConfig;
20use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
21use tokio_rustls::TlsAcceptor;
22use rcgen::generate_simple_self_signed;
23
24/// Represents the main SMTP server instance.
25///
26/// Holds the application configuration and a shared `WebhookClient` instance
27/// used by connection handlers to forward processed emails.
28pub struct Server {
29    config: Config,
30    webhook_client: Arc<WebhookClient>, // Arc allows safe sharing across async tasks.
31}
32
33impl Server {
34    /// Creates a new SMTP `Server` instance.
35    ///
36    /// Initializes the shared `WebhookClient`.
37    ///
38    /// # Arguments
39    ///
40    /// * `config` - The application configuration.
41    pub fn new(config: Config) -> Self {
42        // Initialize the webhook client; this might panic if certs can't load.
43        let webhook_client = Arc::new(WebhookClient::new(config.clone()));
44        Server {
45            config,
46            webhook_client,
47        }
48    }
49
50    /// Runs the main SMTP server loop.
51    ///
52    /// Binds to the configured SMTP address and port, then enters an infinite loop
53    /// accepting incoming TCP connections. Each connection is handled in a separate
54    /// Tokio task via `handle_connection`.
55    ///
56    /// # Errors
57    ///
58    /// Returns an `Err` if the server fails to bind to the specified address and port.
59    /// Errors during connection acceptance or handling are logged but do not terminate the server loop.
60    pub async fn run(&self) -> Result<()> {
61        let addr = format!("{}:{}", self.config.smtp_bind_address, self.config.smtp_port);
62        // Attempt to bind the TCP listener to the configured address.
63        let listener = TcpListener::bind(&addr).await
64            .with_context(|| format!("Failed to bind SMTP server to {}", addr))?;
65        info!("SMTP server listening on {}", addr);
66
67        // Main server loop: continuously accept incoming connections.
68        loop {
69            match listener.accept().await {
70                Ok((stream, remote_addr)) => {
71                    info!("New connection from: {}", remote_addr);
72                    // Clone Arcs for the new task. Cloning Arc is cheap.
73                    let webhook_client = Arc::clone(&self.webhook_client);
74                    // Clone the Vec of target emails for the new task.
75                    let target_emails = self.config.target_emails.clone();
76                    let header_prefixes = self.config.header_prefixes.clone();
77                    // Spawn a dedicated asynchronous task for each connection.
78                    tokio::spawn(async move {
79                        if let Err(e) = handle_connection(stream, webhook_client, target_emails, header_prefixes).await {
80                            // Log errors from individual connection handlers.
81                            // Using {:#?} includes the error source/context from anyhow.
82                            error!("Error handling SMTP connection from {}: {:#?}", remote_addr, e);
83                        }
84                    });
85                }
86                Err(e) => {
87                    // Log errors encountered during connection acceptance but continue loop.
88                    error!("Error accepting connection: {:?}", e);
89                }
90            }
91        }
92        // This loop is infinite, so Ok(()) is never reached in normal operation.
93    }
94}
95
96/// Generates a self-signed TLS certificate and private key using `rcgen`.
97///
98/// Used for establishing TLS sessions via STARTTLS when no certificate is configured.
99/// Note: Self-signed certificates are generally not trusted by clients without manual configuration.
100///
101/// # Returns
102///
103/// A `Result` containing a tuple of the certificate and private key in DER format.
104///
105/// # Errors
106///
107/// Returns an `Err` if certificate generation fails.
108fn generate_self_signed_cert() -> Result<(CertificateDer<'static>, PrivateKeyDer<'static>)> {
109    // Define Subject Alternative Names (SANs) - typically includes domains/IPs the cert is valid for.
110    let subject_alt_names = vec!["localhost".to_string()]; // Example SAN
111
112    // Generate the certificate and key pair.
113    let certified_key = generate_simple_self_signed(subject_alt_names)
114        .context("Failed to generate self-signed certificate using rcgen")?;
115
116    // Extract the certificate and key in DER format required by rustls.
117    let cert_der = certified_key.cert.der().to_vec(); // Clone bytes needed for owned CertificateDer.
118    let key_der = certified_key.signing_key.serialize_der(); // Key in PKCS#8 format.
119
120    Ok((
121        CertificateDer::from(cert_der),
122        PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(key_der)) // Assume PKCS#8 format from rcgen.
123    ))
124}
125
126
127/// Handles the initial phase of a new client connection, including potential STARTTLS negotiation.
128///
129/// Sends the initial greeting and processes commands like EHLO/HELO and STARTTLS.
130/// If STARTTLS is successfully negotiated, it passes the upgraded TLS stream to `handle_secure_session`.
131///
132/// # Arguments
133///
134/// * `stream` - The raw TCP stream from the accepted connection.
135/// * `webhook_client` - Shared `WebhookClient`.
136/// * `target_emails` - The configured list of target email addresses.
137///
138/// # Errors
139///
140/// Returns `Err` if initial greeting fails, reading/processing initial commands fails,
141/// or if the STARTTLS handshake fails.
142async fn handle_connection(
143    mut stream: TcpStream, // Mutable ownership needed for potential TLS upgrade.
144    webhook_client: Arc<WebhookClient>,
145    target_emails: Vec<String>,
146    header_prefixes: Vec<String>,
147) -> Result<()> {
148    // Variables to store state during the SMTP transaction.
149    // These are needed here because this function handles the full non-TLS flow.
150    let mut sender = String::new();
151    let mut accepted_recipient = String::new();
152    let mut email_data = String::new();
153    let mut collecting_data = false;
154
155    // Scope for the initial, non-TLS protocol handler.
156    // We temporarily split the stream to use BufReader/BufWriter.
157    // The split borrows `stream` mutably. When the scope ends, the borrow ends,
158    // and we regain full ownership of `stream` for potential TLS upgrade.
159    // Scope for the protocol handler using the stream's reader/writer halves.
160    // We need to ensure this scope ends cleanly or the stream is handled correctly on STARTTLS.
161    // Let's restructure slightly to avoid dropping the protocol handler too early if no STARTTLS.
162    let protocol_result = async {
163        let (read_half, write_half) = tokio::io::split(&mut stream);
164        let reader = tokio::io::BufReader::new(read_half);
165        let writer = tokio::io::BufWriter::new(write_half);
166        let mut initial_protocol = SmtpProtocol::new(reader, writer);
167
168        // Send the initial 220 greeting.
169        initial_protocol.send_greeting().await?;
170
171        // Process commands for the entire session (unless STARTTLS happens).
172        loop {
173            trace!("SMTP({:?}): Waiting for command...", initial_protocol.get_state());
174            let line = initial_protocol.read_line().await?;
175            trace!("SMTP({:?}): Received line (len {}): {:?}", initial_protocol.get_state(), line.len(), line);
176
177            // Handle EOF, except during DATA phase.
178            if initial_protocol.get_state() != SmtpState::Data && line.is_empty() {
179                 info!("Connection closed by client (EOF). State: {:?}", initial_protocol.get_state());
180                 return Ok(()); // Clean exit for this async block
181            }
182
183            // Process the command using the state machine.
184            let result = initial_protocol.process_command(&line).await?;
185
186            match result {
187                SmtpCommandResult::StartTls => {
188                    // Client requested TLS upgrade.
189                    info!("Client initiated STARTTLS. Proceeding with handshake.");
190                    // Signal that TLS should be handled outside this block.
191                    return Err(anyhow::anyhow!("STARTTLS")); // Use error to signal STARTTLS needed
192                }
193                SmtpCommandResult::Quit => {
194                    info!("Client quit.");
195                    return Ok(()); // Clean exit for this async block
196                }
197                SmtpCommandResult::MailFrom(email) => {
198                    sender = email;
199                    // State is updated internally by process_command
200                },
201                SmtpCommandResult::RcptTo(email) => {
202                    let received_email = email; // Rename for clarity
203                    // Validate recipient against the list of target emails (case-insensitive).
204                    let received_email_lower = received_email.to_lowercase();
205                    if target_emails.iter().any(|target| target.to_lowercase() == received_email_lower) {
206                        // Store the *actual* accepted recipient address (preserving case)
207                        accepted_recipient = received_email;
208                        initial_protocol.write_line("250 OK").await?;
209                        // State is updated internally by process_command
210                    } else {
211                        // Reject if not in the list.
212                        initial_protocol.write_line("550 No such user here").await?;
213                        // Clear any previously accepted recipient if a new, invalid one is provided.
214                        accepted_recipient.clear();
215                        // State remains MailFrom or RcptTo depending on previous state
216                    }
217                },
218                SmtpCommandResult::DataStart => {
219                    // Check if we have a sender and at least one valid recipient before accepting DATA
220                    if sender.is_empty() || accepted_recipient.is_empty() {
221                         warn!("DATA command received without valid MAIL FROM or RCPT TO. Rejecting.");
222                         initial_protocol.write_line("503 Bad sequence of commands (MAIL FROM and RCPT TO required first)").await?;
223                         // Reset state? Protocol handler might need adjustment or we reset here.
224                         // For now, just continue; the protocol handler should keep state correct.
225                    } else {
226                        // Proceed with DATA
227                        collecting_data = true;
228                        email_data.clear();
229                        // State is updated internally by process_command (to Data)
230                    }
231                },
232                SmtpCommandResult::DataLine(line_content) => {
233                    if collecting_data {
234                        // Append line, handling potential dot-stuffing if necessary (basic append here)
235                        email_data.push_str(&line_content);
236                        email_data.push_str("\r\n"); // Re-add CRLF lost by read_line
237                    } else {
238                        // Should not happen if state machine is correct, but log if it does.
239                        warn!("Received DataLine result when not in Data state.");
240                    }
241                },
242                SmtpCommandResult::DataEnd => {
243                    collecting_data = false; // Stop collecting data
244                    if sender.is_empty() || accepted_recipient.is_empty() {
245                        warn!("DataEnd received but sender or recipient was missing. Message likely not processed.");
246                        // State is reset to Greeted internally by protocol handler
247                    } else {
248                        // Parse the collected email data.
249                        match EmailParser::parse(email_data.as_bytes(), &header_prefixes) {
250                            Ok((subject, from_name, text_body, html_body, matched_headers)) => {
251                                info!("Received email from {} to {} (Subject: '{}')", sender, accepted_recipient, subject);
252                                // Prepare and forward the payload.
253                                let headers = if matched_headers.is_empty() { None } else { Some(matched_headers) };
254                                let email_payload = EmailPayload {
255                                    sender: sender.clone(),
256                                    sender_name: from_name,
257                                    recipient: accepted_recipient.clone(),
258                                    subject,
259                                    body: text_body,
260                                    html_body,
261                                    headers,
262                                };
263                                // Spawn forwarding in a separate task to avoid blocking the SMTP loop?
264                                // For now, await directly. Consider spawning if webhook is slow.
265                                if let Err(e) = webhook_client.forward_email(email_payload).await {
266                                    error!("Failed to forward email from {}: {:#}", sender, e);
267                                    // Log only, do not fail the SMTP session.
268                                }
269                            },
270                            Err(e) => {
271                                error!("Failed to parse email data from {}: {:#}", sender, e);
272                                // Consider sending a 4xx/5xx SMTP error? Difficult after 250 OK for DATA end.
273                            }
274                        }
275                    }
276                    // Reset transaction state variables for the next potential email in the session.
277                    sender.clear();
278                    accepted_recipient.clear();
279                    email_data.clear();
280                    // State is reset to Greeted internally by protocol handler after DataEnd.
281                },
282                SmtpCommandResult::Continue => {
283                    // Usually follows EHLO/HELO or error responses. Just continue the loop.
284                }
285                // STARTTLS is handled above by returning Err
286            }
287        }
288    }.await; // End of async block
289
290    // Check the result of the async block
291    match protocol_result {
292        Ok(()) => Ok(()), // Session ended normally (QUIT or EOF)
293        Err(e) if e.to_string() == "STARTTLS" => {
294            // Signal to handle STARTTLS was received
295            handle_starttls(stream, webhook_client, target_emails, header_prefixes).await
296        }
297        Err(e) => Err(e), // Propagate other errors
298        // `initial_protocol` (and its borrow of `stream`) goes out of scope here.
299    }
300}
301
302
303/// Performs the TLS handshake using a self-signed certificate.
304///
305/// If the handshake is successful, passes the encrypted stream to `handle_secure_session`.
306///
307/// # Arguments
308///
309/// * `stream` - The raw TCP stream after the `220 Go ahead` response to STARTTLS.
310/// * `webhook_client` - Shared `WebhookClient`.
311/// * `target_emails` - The configured list of target email addresses.
312///
313/// # Errors
314///
315/// Returns `Err` if certificate generation fails, TLS config creation fails, or the handshake fails.
316async fn handle_starttls(
317    stream: TcpStream, // Takes ownership of the raw TCP stream.
318    webhook_client: Arc<WebhookClient>,
319    target_emails: Vec<String>,
320    header_prefixes: Vec<String>,
321) -> Result<()> {
322    // Generate ephemeral self-signed cert for the TLS session.
323    let (cert, key) = generate_self_signed_cert()
324        .context("Failed to generate self-signed certificate for STARTTLS")?;
325
326    // Configure the rustls server-side TLS parameters.
327    let tls_config = RustlsServerConfig::builder()
328        .with_no_client_auth() // We don't require client certificates.
329        .with_single_cert(vec![cert], key) // Provide the generated cert and key.
330        .map_err(|e| anyhow::anyhow!("Failed to create rustls config: {}", e))?;
331
332    // Create a TLS acceptor based on the configuration.
333    let acceptor = TlsAcceptor::from(Arc::new(tls_config));
334
335    // Perform the TLS handshake over the existing TCP stream.
336    match acceptor.accept(stream).await {
337        Ok(tls_stream) => {
338            // Handshake successful, proceed with the secure session.
339            info!("STARTTLS handshake successful.");
340            // Pass the list of target emails and header prefixes to the secure session handler.
341            handle_secure_session(tls_stream, webhook_client, target_emails, header_prefixes).await
342        }
343        Err(e) => {
344            // Handshake failed. Log the error and return it.
345            error!("STARTTLS handshake failed: {:?}", e);
346            Err(anyhow::Error::new(e).context("STARTTLS handshake failed"))
347        }
348    }
349}
350
351/// Handles the SMTP command sequence over an established secure (TLS) connection.
352///
353/// This function is similar to the main loop in `handle_connection` but operates
354/// on the encrypted `tls_stream`. It processes MAIL FROM, RCPT TO, DATA, etc.
355///
356/// # Arguments
357///
358/// * `tls_stream` - The encrypted TLS stream after a successful handshake.
359/// * `webhook_client` - Shared `WebhookClient`.
360/// * `target_emails` - The configured list of target email addresses.
361///
362/// # Type Parameters
363///
364/// * `T` - A type that implements `AsyncRead`, `AsyncWrite`, `Unpin`, `Send`, and `'static`,
365///   representing the TLS stream type (e.g., `tokio_rustls::server::TlsStream<TcpStream>`).
366///
367/// # Errors
368///
369/// Returns `Err` if reading/writing to the TLS stream fails or if command processing fails.
370async fn handle_secure_session<T>(
371    tls_stream: T, // Generic over the actual TlsStream type.
372    webhook_client: Arc<WebhookClient>,
373    target_emails: Vec<String>,
374    header_prefixes: Vec<String>,
375) -> Result<()>
376where
377    T: AsyncRead + AsyncWrite + Unpin + Send + 'static, // Traits required by tokio::io::split and SmtpProtocol.
378{
379    // Split the TLS stream for buffered I/O.
380    let (read_half, write_half) = tokio::io::split(tls_stream);
381    let reader = tokio::io::BufReader::new(read_half);
382    let writer = tokio::io::BufWriter::new(write_half);
383    // Create a new protocol handler for the secure stream.
384    // Important: The state starts as Initial, expecting EHLO/HELO again after STARTTLS.
385    let mut protocol = SmtpProtocol::new(reader, writer);
386
387    // Variables to store state during the SMTP transaction within the secure session.
388    let mut sender = String::new();
389    let mut accepted_recipient = String::new(); // Store the specific recipient that was accepted
390    let mut email_data = String::new();
391    let mut collecting_data = false;
392
393    // Main loop for processing commands over the secure connection.
394    loop {
395        trace!("SMTP(TLS/{:?}): Waiting for command...", protocol.get_state());
396        let line = protocol.read_line().await?;
397        trace!("SMTP(TLS/{:?}): Received line (len {}): {:?}", protocol.get_state(), line.len(), line);
398
399        // Handle EOF during secure session.
400        if protocol.get_state() != SmtpState::Data && line.is_empty() {
401             info!("Connection closed by client (EOF) during secure session.");
402             break;
403        }
404
405        // Process the command using the state machine.
406        let result = protocol.process_command(&line).await?;
407
408        match result {
409            SmtpCommandResult::Quit => break,
410            SmtpCommandResult::MailFrom(email) => {
411                sender = email;
412            },
413            SmtpCommandResult::RcptTo(email) => {
414                let received_email = email; // Rename for clarity
415                // Validate recipient against the list of target emails (case-insensitive).
416                let received_email_lower = received_email.to_lowercase();
417                if target_emails.iter().any(|target| target.to_lowercase() == received_email_lower) {
418                    // Store the *actual* accepted recipient address (preserving case)
419                    accepted_recipient = received_email;
420                    protocol.write_line("250 OK").await?;
421                } else {
422                    // Reject if not in the list.
423                    protocol.write_line("550 No such user here").await?;
424                    // Clear any previously accepted recipient if a new, invalid one is provided.
425                    accepted_recipient.clear();
426                }
427            },
428            SmtpCommandResult::DataStart => {
429                collecting_data = true;
430                email_data.clear();
431            },
432            SmtpCommandResult::DataLine(line_content) => {
433                if collecting_data {
434                    email_data.push_str(&line_content);
435                    email_data.push_str("\r\n");
436                }
437            },
438            SmtpCommandResult::DataEnd => {
439                collecting_data = false;
440                // Parse the collected email data with header prefix matching.
441                let (subject, from_name, text_body, html_body, matched_headers) = EmailParser::parse(email_data.as_bytes(), &header_prefixes)?;
442                info!("Received email (TLS) from {} to {} (Subject: '{}')", sender, accepted_recipient, subject);
443
444                // Prepare and forward the payload.
445                let headers = if matched_headers.is_empty() { None } else { Some(matched_headers) };
446                let email_payload = EmailPayload {
447                    sender: sender.clone(),
448                    sender_name: from_name,
449                    recipient: accepted_recipient.clone(),
450                    subject,
451                    body: text_body,
452                    html_body,
453                    headers,
454                };
455                if let Err(e) = webhook_client.forward_email(email_payload).await {
456                    error!("Failed to forward email (TLS) from {}: {:#}", sender, e);
457                    // Log only, do not fail the SMTP session.
458                }
459
460                // Reset state for the next email in the session.
461                sender.clear();
462                accepted_recipient.clear();
463                email_data.clear();
464                // Protocol state is reset to Greeted internally after DataEnd.
465            },
466            SmtpCommandResult::Continue => {
467                // Usually follows EHLO/HELO after STARTTLS.
468            }
469            SmtpCommandResult::StartTls => {
470                // STARTTLS is invalid within an already established TLS session.
471                warn!("Received STARTTLS command within secure session. Sending error.");
472                protocol.write_line("503 STARTTLS already active").await?;
473            }
474        }
475    }
476    Ok(())
477}