mail_laser/smtp/
mod.rs

1//! Handles the primary SMTP server logic, including connection handling,
2//! optional STARTTLS negotiation, command processing via `smtp_protocol`,
3//! email parsing via `email_parser`, and initiating webhook forwarding.
4
5mod email_parser;
6mod smtp_protocol;
7
8use std::sync::Arc;
9use anyhow::{Result, Context};
10use log::{info, error, trace, warn};
11use tokio::net::{TcpListener, TcpStream};
12use tokio::io::{AsyncRead, AsyncWrite}; // Required for generic TlsStream handling
13use crate::config::Config;
14use crate::webhook::{WebhookClient, EmailPayload};
15use smtp_protocol::{SmtpProtocol, SmtpCommandResult, SmtpState};
16use email_parser::EmailParser;
17
18// TLS related imports
19use rustls::ServerConfig as RustlsServerConfig;
20use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
21use tokio_rustls::TlsAcceptor;
22use rcgen::{generate_simple_self_signed, CertifiedKey};
23
24/// Represents the main SMTP server instance.
25///
26/// Holds the application configuration and a shared `WebhookClient` instance
27/// used by connection handlers to forward processed emails.
28pub struct Server {
29    config: Config,
30    webhook_client: Arc<WebhookClient>, // Arc allows safe sharing across async tasks.
31}
32
33impl Server {
34    /// Creates a new SMTP `Server` instance.
35    ///
36    /// Initializes the shared `WebhookClient`.
37    ///
38    /// # Arguments
39    ///
40    /// * `config` - The application configuration.
41    pub fn new(config: Config) -> Self {
42        // Initialize the webhook client; this might panic if certs can't load.
43        let webhook_client = Arc::new(WebhookClient::new(config.clone()));
44        Server {
45            config,
46            webhook_client,
47        }
48    }
49
50    /// Runs the main SMTP server loop.
51    ///
52    /// Binds to the configured SMTP address and port, then enters an infinite loop
53    /// accepting incoming TCP connections. Each connection is handled in a separate
54    /// Tokio task via `handle_connection`.
55    ///
56    /// # Errors
57    ///
58    /// Returns an `Err` if the server fails to bind to the specified address and port.
59    /// Errors during connection acceptance or handling are logged but do not terminate the server loop.
60    pub async fn run(&self) -> Result<()> {
61        let addr = format!("{}:{}", self.config.smtp_bind_address, self.config.smtp_port);
62        // Attempt to bind the TCP listener to the configured address.
63        let listener = TcpListener::bind(&addr).await
64            .with_context(|| format!("Failed to bind SMTP server to {}", addr))?;
65        info!("SMTP server listening on {}", addr);
66
67        // Main server loop: continuously accept incoming connections.
68        loop {
69            match listener.accept().await {
70                Ok((stream, remote_addr)) => {
71                    info!("New connection from: {}", remote_addr);
72                    // Clone Arcs for the new task. Cloning Arc is cheap.
73                    let webhook_client = Arc::clone(&self.webhook_client);
74                    // Clone the Vec of target emails for the new task.
75                    let target_emails = self.config.target_emails.clone();
76                    // Spawn a dedicated asynchronous task for each connection.
77                    tokio::spawn(async move {
78                        if let Err(e) = handle_connection(stream, webhook_client, target_emails).await {
79                            // Log errors from individual connection handlers.
80                            // Using {:#?} includes the error source/context from anyhow.
81                            error!("Error handling SMTP connection from {}: {:#?}", remote_addr, e);
82                        }
83                    });
84                }
85                Err(e) => {
86                    // Log errors encountered during connection acceptance but continue loop.
87                    error!("Error accepting connection: {:?}", e);
88                }
89            }
90        }
91        // This loop is infinite, so Ok(()) is never reached in normal operation.
92    }
93}
94
95/// Generates a self-signed TLS certificate and private key using `rcgen`.
96///
97/// Used for establishing TLS sessions via STARTTLS when no certificate is configured.
98/// Note: Self-signed certificates are generally not trusted by clients without manual configuration.
99///
100/// # Returns
101///
102/// A `Result` containing a tuple of the certificate and private key in DER format.
103///
104/// # Errors
105///
106/// Returns an `Err` if certificate generation fails.
107fn generate_self_signed_cert() -> Result<(CertificateDer<'static>, PrivateKeyDer<'static>)> {
108    // Define Subject Alternative Names (SANs) - typically includes domains/IPs the cert is valid for.
109    let subject_alt_names = vec!["localhost".to_string()]; // Example SAN
110
111    // Generate the certificate and key pair.
112    let certified_key: CertifiedKey = generate_simple_self_signed(subject_alt_names)
113        .context("Failed to generate self-signed certificate using rcgen")?;
114
115    // Extract the certificate and key in DER format required by rustls.
116    let cert_der = certified_key.cert.der().to_vec(); // Clone bytes needed for owned CertificateDer.
117    let key_der = certified_key.key_pair.serialize_der(); // Key in PKCS#8 format.
118
119    Ok((
120        CertificateDer::from(cert_der),
121        PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(key_der)) // Assume PKCS#8 format from rcgen.
122    ))
123}
124
125
126/// Handles the initial phase of a new client connection, including potential STARTTLS negotiation.
127///
128/// Sends the initial greeting and processes commands like EHLO/HELO and STARTTLS.
129/// If STARTTLS is successfully negotiated, it passes the upgraded TLS stream to `handle_secure_session`.
130///
131/// # Arguments
132///
133/// * `stream` - The raw TCP stream from the accepted connection.
134/// * `webhook_client` - Shared `WebhookClient`.
135/// * `target_emails` - The configured list of target email addresses.
136///
137/// # Errors
138///
139/// Returns `Err` if initial greeting fails, reading/processing initial commands fails,
140/// or if the STARTTLS handshake fails.
141async fn handle_connection(
142    mut stream: TcpStream, // Mutable ownership needed for potential TLS upgrade.
143    webhook_client: Arc<WebhookClient>,
144    target_emails: Vec<String>, // Changed from single String to Vec<String>
145) -> Result<()> {
146    // Scope for the initial, non-TLS protocol handler.
147    // We temporarily split the stream to use BufReader/BufWriter.
148    // The split borrows `stream` mutably. When the scope ends, the borrow ends,
149    // and we regain full ownership of `stream` for potential TLS upgrade.
150    {
151        let (read_half, write_half) = tokio::io::split(&mut stream);
152        let reader = tokio::io::BufReader::new(read_half);
153        let writer = tokio::io::BufWriter::new(write_half);
154        let mut initial_protocol = SmtpProtocol::new(reader, writer);
155
156        // Send the initial 220 greeting.
157        initial_protocol.send_greeting().await?;
158
159        // Process initial commands (EHLO/HELO, STARTTLS, QUIT).
160        loop {
161            trace!("SMTP(Initial): Waiting for command...");
162            let line = initial_protocol.read_line().await?;
163            trace!("SMTP(Initial): Received line (len {}): {:?}", line.len(), line);
164
165            // Handle EOF during initial phase.
166            if line.is_empty() {
167                 info!("Connection closed by client (EOF) during initial phase.");
168                 return Ok(()); // Clean exit.
169            }
170
171            // Process the command using the state machine.
172            let result = initial_protocol.process_command(&line).await?;
173
174            match result {
175                SmtpCommandResult::StartTls => {
176                    // Client requested TLS upgrade.
177                    info!("Client initiated STARTTLS. Proceeding with handshake.");
178                    // Drop the initial protocol handler's borrow of the stream
179                    // before passing ownership of the stream to handle_starttls.
180                    drop(initial_protocol);
181                    // Perform TLS handshake and handle the rest of the session.
182                    return handle_starttls(stream, webhook_client, target_emails).await;
183                }
184                SmtpCommandResult::Quit => {
185                    // Client quit before TLS or sending mail.
186                    info!("Client quit during initial phase.");
187                    return Ok(()); // Clean exit.
188                }
189                SmtpCommandResult::Continue => {
190                    // Typically follows EHLO/HELO. State should be Greeted.
191                    // Continue waiting for the next command (e.g., STARTTLS or MAIL FROM).
192                    if initial_protocol.get_state() == SmtpState::Greeted {
193                        continue;
194                    } else {
195                        // This state indicates an unexpected sequence, likely an error response was sent.
196                        warn!("Unexpected state {:?} after Continue result in initial phase.", initial_protocol.get_state());
197                        continue; // Continue waiting for client reaction.
198                    }
199                }
200                 _ => {
201                    // Any other command result (MAIL FROM, RCPT TO, etc.) is invalid here.
202                    // The protocol handler should have sent a 5xx error.
203                    trace!("Received unexpected command result {:?} in initial phase.", result);
204                    // Continue loop, wait for client reaction (e.g., QUIT).
205                    continue;
206                 }
207            }
208        }
209        // `initial_protocol` (and its borrow of `stream`) goes out of scope here.
210    }
211
212    // The loop above will only exit via `return` statements within its arms
213    // (e.g., on STARTTLS, QUIT, or EOF/error). Therefore, code here is unreachable.
214    // The connection closes when the function returns or the task ends.
215}
216
217
218/// Performs the TLS handshake using a self-signed certificate.
219///
220/// If the handshake is successful, passes the encrypted stream to `handle_secure_session`.
221///
222/// # Arguments
223///
224/// * `stream` - The raw TCP stream after the `220 Go ahead` response to STARTTLS.
225/// * `webhook_client` - Shared `WebhookClient`.
226/// * `target_emails` - The configured list of target email addresses.
227///
228/// # Errors
229///
230/// Returns `Err` if certificate generation fails, TLS config creation fails, or the handshake fails.
231async fn handle_starttls(
232    stream: TcpStream, // Takes ownership of the raw TCP stream.
233    webhook_client: Arc<WebhookClient>,
234    target_emails: Vec<String>, // Changed from single String to Vec<String>
235) -> Result<()> {
236    // Generate ephemeral self-signed cert for the TLS session.
237    let (cert, key) = generate_self_signed_cert()
238        .context("Failed to generate self-signed certificate for STARTTLS")?;
239
240    // Configure the rustls server-side TLS parameters.
241    let tls_config = RustlsServerConfig::builder()
242        .with_no_client_auth() // We don't require client certificates.
243        .with_single_cert(vec![cert], key) // Provide the generated cert and key.
244        .map_err(|e| anyhow::anyhow!("Failed to create rustls config: {}", e))?;
245
246    // Create a TLS acceptor based on the configuration.
247    let acceptor = TlsAcceptor::from(Arc::new(tls_config));
248
249    // Perform the TLS handshake over the existing TCP stream.
250    match acceptor.accept(stream).await {
251        Ok(tls_stream) => {
252            // Handshake successful, proceed with the secure session.
253            info!("STARTTLS handshake successful.");
254            // Pass the list of target emails to the secure session handler.
255            handle_secure_session(tls_stream, webhook_client, target_emails).await
256        }
257        Err(e) => {
258            // Handshake failed. Log the error and return it.
259            error!("STARTTLS handshake failed: {:?}", e);
260            Err(anyhow::Error::new(e).context("STARTTLS handshake failed"))
261        }
262    }
263}
264
265/// Handles the SMTP command sequence over an established secure (TLS) connection.
266///
267/// This function is similar to the main loop in `handle_connection` but operates
268/// on the encrypted `tls_stream`. It processes MAIL FROM, RCPT TO, DATA, etc.
269///
270/// # Arguments
271///
272/// * `tls_stream` - The encrypted TLS stream after a successful handshake.
273/// * `webhook_client` - Shared `WebhookClient`.
274/// * `target_emails` - The configured list of target email addresses.
275///
276/// # Type Parameters
277///
278/// * `T` - A type that implements `AsyncRead`, `AsyncWrite`, `Unpin`, `Send`, and `'static`,
279///   representing the TLS stream type (e.g., `tokio_rustls::server::TlsStream<TcpStream>`).
280///
281/// # Errors
282///
283/// Returns `Err` if reading/writing to the TLS stream fails or if command processing fails.
284async fn handle_secure_session<T>(
285    tls_stream: T, // Generic over the actual TlsStream type.
286    webhook_client: Arc<WebhookClient>,
287    target_emails: Vec<String>, // Changed from single String to Vec<String>
288) -> Result<()>
289where
290    T: AsyncRead + AsyncWrite + Unpin + Send + 'static, // Traits required by tokio::io::split and SmtpProtocol.
291{
292    // Split the TLS stream for buffered I/O.
293    let (read_half, write_half) = tokio::io::split(tls_stream);
294    let reader = tokio::io::BufReader::new(read_half);
295    let writer = tokio::io::BufWriter::new(write_half);
296    // Create a new protocol handler for the secure stream.
297    // Important: The state starts as Initial, expecting EHLO/HELO again after STARTTLS.
298    let mut protocol = SmtpProtocol::new(reader, writer);
299
300    // Variables to store state during the SMTP transaction within the secure session.
301    let mut sender = String::new();
302    let mut accepted_recipient = String::new(); // Store the specific recipient that was accepted
303    let mut email_data = String::new();
304    let mut collecting_data = false;
305
306    // Main loop for processing commands over the secure connection.
307    loop {
308        trace!("SMTP(TLS/{:?}): Waiting for command...", protocol.get_state());
309        let line = protocol.read_line().await?;
310        trace!("SMTP(TLS/{:?}): Received line (len {}): {:?}", protocol.get_state(), line.len(), line);
311
312        // Handle EOF during secure session.
313        if protocol.get_state() != SmtpState::Data && line.is_empty() {
314             info!("Connection closed by client (EOF) during secure session.");
315             break;
316        }
317
318        // Process the command using the state machine.
319        let result = protocol.process_command(&line).await?;
320
321        match result {
322            SmtpCommandResult::Quit => break,
323            SmtpCommandResult::MailFrom(email) => {
324                sender = email;
325            },
326            SmtpCommandResult::RcptTo(email) => {
327                let received_email = email; // Rename for clarity
328                // Validate recipient against the list of target emails (case-insensitive).
329                let received_email_lower = received_email.to_lowercase();
330                if target_emails.iter().any(|target| target.to_lowercase() == received_email_lower) {
331                    // Store the *actual* accepted recipient address (preserving case)
332                    accepted_recipient = received_email;
333                    protocol.write_line("250 OK").await?;
334                } else {
335                    // Reject if not in the list.
336                    protocol.write_line("550 No such user here").await?;
337                    // Clear any previously accepted recipient if a new, invalid one is provided.
338                    accepted_recipient.clear();
339                }
340            },
341            SmtpCommandResult::DataStart => {
342                collecting_data = true;
343                email_data.clear();
344            },
345            SmtpCommandResult::DataLine(line_content) => {
346                if collecting_data {
347                    email_data.push_str(&line_content);
348                    email_data.push_str("\r\n");
349                }
350            },
351            SmtpCommandResult::DataEnd => {
352                collecting_data = false;
353                // Parse the collected email data.
354                // Parse returns (subject, text_body, html_body) now
355                // Pass email_data as bytes to the new parser signature
356                let (subject, text_body, html_body) = EmailParser::parse(email_data.as_bytes())?;
357                // Remove duplicate parse call from previous diff attempt
358                info!("Received email (TLS) from {} to {} (Subject: '{}')", sender, accepted_recipient, subject);
359
360                // Prepare and forward the payload.
361                // Prepare and forward the payload.
362                let email_payload = EmailPayload {
363                    sender: sender.clone(),
364                    recipient: accepted_recipient.clone(),
365                    subject, // Use the parsed subject
366                    body: text_body, // Use the parsed text_body
367                    html_body, // Use the parsed html_body
368                };
369                if let Err(e) = webhook_client.forward_email(email_payload).await {
370                    error!("Failed to forward email (TLS) from {}: {:#}", sender, e);
371                    // Log only, do not fail the SMTP session.
372                }
373
374                // Reset state for the next email in the session.
375                sender.clear();
376                accepted_recipient.clear();
377                email_data.clear();
378                // Protocol state is reset to Greeted internally after DataEnd.
379            },
380            SmtpCommandResult::Continue => {
381                // Usually follows EHLO/HELO after STARTTLS.
382            }
383            SmtpCommandResult::StartTls => {
384                // STARTTLS is invalid within an already established TLS session.
385                warn!("Received STARTTLS command within secure session. Sending error.");
386                protocol.write_line("503 STARTTLS already active").await?;
387            }
388        }
389    }
390    Ok(())
391}