Skip to main content

mockforge_smtp/
server.rs

1//! SMTP server implementation
2
3use crate::{SmtpConfig, SmtpSpecRegistry};
4use mockforge_core::protocol_abstraction::{
5    MessagePattern, MiddlewareChain, Protocol, ProtocolRequest, SpecRegistry,
6};
7use mockforge_core::Result;
8use std::collections::HashMap;
9use std::net::SocketAddr;
10use std::pin::Pin;
11use std::sync::Arc;
12use std::task::{Context, Poll};
13use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadBuf};
14use tokio::net::{TcpListener, TcpStream};
15use tokio_rustls::{rustls, server::TlsStream, TlsAcceptor};
16use tracing::{debug, error, info, warn};
17
18/// Stream wrapper that can be either plaintext TCP or TLS-upgraded.
19/// The session handler starts each connection as `Plain` and swaps to
20/// `Tls` mid-stream when the client sends STARTTLS (RFC 3207). The
21/// enum-plus-manual-AsyncRead/Write-impl pattern is what lets us put
22/// the upgraded stream back into the existing `BufReader` without
23/// rewriting the whole session loop.
24pub enum SmtpStream {
25    /// Plaintext TCP before STARTTLS (or when STARTTLS is disabled).
26    Plain(TcpStream),
27    /// TLS-encrypted after STARTTLS completes. Boxed to keep the
28    /// enum size manageable — `TlsStream` is large.
29    Tls(Box<TlsStream<TcpStream>>),
30}
31
32impl AsyncRead for SmtpStream {
33    fn poll_read(
34        self: Pin<&mut Self>,
35        cx: &mut Context<'_>,
36        buf: &mut ReadBuf<'_>,
37    ) -> Poll<std::io::Result<()>> {
38        match self.get_mut() {
39            SmtpStream::Plain(s) => Pin::new(s).poll_read(cx, buf),
40            SmtpStream::Tls(s) => Pin::new(s.as_mut()).poll_read(cx, buf),
41        }
42    }
43}
44
45impl AsyncWrite for SmtpStream {
46    fn poll_write(
47        self: Pin<&mut Self>,
48        cx: &mut Context<'_>,
49        buf: &[u8],
50    ) -> Poll<std::io::Result<usize>> {
51        match self.get_mut() {
52            SmtpStream::Plain(s) => Pin::new(s).poll_write(cx, buf),
53            SmtpStream::Tls(s) => Pin::new(s.as_mut()).poll_write(cx, buf),
54        }
55    }
56
57    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
58        match self.get_mut() {
59            SmtpStream::Plain(s) => Pin::new(s).poll_flush(cx),
60            SmtpStream::Tls(s) => Pin::new(s.as_mut()).poll_flush(cx),
61        }
62    }
63
64    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
65        match self.get_mut() {
66            SmtpStream::Plain(s) => Pin::new(s).poll_shutdown(cx),
67            SmtpStream::Tls(s) => Pin::new(s.as_mut()).poll_shutdown(cx),
68        }
69    }
70}
71
72/// SMTP server
73pub struct SmtpServer {
74    config: SmtpConfig,
75    spec_registry: Arc<SmtpSpecRegistry>,
76    middleware_chain: Arc<MiddlewareChain>,
77    #[allow(dead_code)]
78    tls_acceptor: Option<TlsAcceptor>,
79}
80
81impl SmtpServer {
82    /// Create a new SMTP server
83    pub fn new(config: SmtpConfig, spec_registry: Arc<SmtpSpecRegistry>) -> Result<Self> {
84        let middleware_chain = Arc::new(MiddlewareChain::new());
85
86        let tls_acceptor = if config.enable_starttls {
87            Some(Self::load_tls_acceptor(&config)?)
88        } else {
89            None
90        };
91
92        Ok(Self {
93            config,
94            spec_registry,
95            middleware_chain,
96            tls_acceptor,
97        })
98    }
99
100    /// Load TLS acceptor from certificate and key files
101    fn load_tls_acceptor(config: &SmtpConfig) -> Result<TlsAcceptor> {
102        use rustls_pemfile::{certs, pkcs8_private_keys};
103        use std::fs::File;
104        use std::io::BufReader;
105
106        let cert_path = config.tls_cert_path.as_ref().ok_or_else(|| {
107            mockforge_core::Error::internal("TLS certificate path not configured")
108        })?;
109        let key_path = config.tls_key_path.as_ref().ok_or_else(|| {
110            mockforge_core::Error::internal("TLS private key path not configured")
111        })?;
112
113        // Load certificate
114        let cert_file = File::open(cert_path)?;
115        let mut cert_reader = BufReader::new(cert_file);
116        let certs: Vec<Vec<u8>> = certs(&mut cert_reader)?;
117        // Use rustls types from tokio-rustls for compatibility
118        let certs: Vec<rustls::Certificate> = certs.into_iter().map(rustls::Certificate).collect();
119
120        // Load private key
121        let key_file = File::open(key_path)?;
122        let mut key_reader = BufReader::new(key_file);
123        let mut keys: Vec<Vec<u8>> = pkcs8_private_keys(&mut key_reader)?;
124
125        if keys.is_empty() {
126            return Err(mockforge_core::Error::internal("No private keys found"));
127        }
128
129        // Use rustls from tokio-rustls which has compatible API
130        let mut server_config = rustls::ServerConfig::builder()
131            .with_safe_defaults()
132            .with_no_client_auth()
133            .with_single_cert(certs, rustls::PrivateKey(keys.remove(0)))
134            .map_err(|e| mockforge_core::Error::internal(format!("TLS config error: {}", e)))?;
135
136        server_config.alpn_protocols = vec![b"smtp".to_vec()];
137
138        Ok(TlsAcceptor::from(Arc::new(server_config)))
139    }
140
141    /// Create a new SMTP server with custom middleware
142    pub fn with_middleware(
143        config: SmtpConfig,
144        spec_registry: Arc<SmtpSpecRegistry>,
145        middleware_chain: Arc<MiddlewareChain>,
146    ) -> Result<Self> {
147        let tls_acceptor = if config.enable_starttls {
148            Some(Self::load_tls_acceptor(&config)?)
149        } else {
150            None
151        };
152
153        Ok(Self {
154            config,
155            spec_registry,
156            middleware_chain,
157            tls_acceptor,
158        })
159    }
160
161    /// Start the SMTP server
162    pub async fn start(&self) -> Result<()> {
163        let addr = format!("{}:{}", self.config.host, self.config.port);
164        let listener = TcpListener::bind(&addr).await?;
165
166        info!("SMTP server listening on {}", addr);
167
168        loop {
169            match listener.accept().await {
170                Ok((stream, peer_addr)) => {
171                    debug!("New SMTP connection from {}", peer_addr);
172
173                    let registry = self.spec_registry.clone();
174                    let middleware = self.middleware_chain.clone();
175                    let hostname = self.config.hostname.clone();
176                    let tls_acceptor = self.tls_acceptor.clone();
177
178                    tokio::spawn(async move {
179                        if let Err(e) = handle_smtp_session(
180                            SmtpStream::Plain(stream),
181                            peer_addr,
182                            registry,
183                            middleware,
184                            hostname,
185                            tls_acceptor,
186                        )
187                        .await
188                        {
189                            error!("SMTP session error from {}: {}", peer_addr, e);
190                        }
191                    });
192                }
193                Err(e) => {
194                    error!("Failed to accept SMTP connection: {}", e);
195                }
196            }
197        }
198    }
199}
200
201/// Handle a single SMTP session. Accepts an `SmtpStream` (plaintext
202/// or TLS) so a STARTTLS mid-session upgrade can swap the underlying
203/// transport without tearing down the connection.
204async fn handle_smtp_session(
205    stream: SmtpStream,
206    peer_addr: SocketAddr,
207    registry: Arc<SmtpSpecRegistry>,
208    middleware: Arc<MiddlewareChain>,
209    hostname: String,
210    tls_acceptor: Option<TlsAcceptor>,
211) -> Result<()> {
212    // Keep read + write on the same `SmtpStream` so STARTTLS can
213    // upgrade both halves atomically. Writes go through
214    // `reader.get_mut()` to avoid an extra split.
215    let mut reader = BufReader::new(stream);
216
217    // Send greeting
218    let greeting = format!("220 {} ESMTP MockForge SMTP Server\r\n", hostname);
219    reader.get_mut().write_all(greeting.as_bytes()).await?;
220
221    let mut session_state = SessionState::new();
222    // Byte-level accumulator so 8BITMIME bodies (Latin-1, UTF-8 with
223    // explicit content-transfer-encoding, etc.) round-trip verbatim.
224    // `read_line` would fail on non-UTF-8 inputs.
225    let mut line: Vec<u8> = Vec::new();
226
227    while reader.read_until(b'\n', &mut line).await? > 0 {
228        // DATA mode bypasses command parsing entirely. Without this, the
229        // first word of every body line would be matched against the SMTP
230        // verb table — any body beginning with "Hello ..." / "Data ..." /
231        // "Quit ..." / etc. was being re-interpreted as a command and
232        // dropped on the floor. Inside DATA mode, only the bare "." ends
233        // the message; everything else (headers, blank separator, body
234        // lines, verbatim) accumulates *as bytes*.
235        if session_state.in_data_mode {
236            // Strip trailing \r\n / \n. If the line is exactly "." +
237            // newline, that terminates the DATA section per RFC 5321.
238            let trimmed = strip_line_terminator(&line);
239            if trimmed == b"." {
240                session_state.in_data_mode = false;
241                let response =
242                    process_email(&session_state, &registry, &middleware, peer_addr).await?;
243                reader.get_mut().write_all(response.as_bytes()).await?;
244                session_state.reset();
245            } else {
246                session_state.data.extend_from_slice(trimmed);
247                session_state.data.push(b'\n');
248            }
249            line.clear();
250            continue;
251        }
252
253        // Outside DATA mode, SMTP verbs are ASCII-only per spec. Decode
254        // lossily so a malformed UTF-8 byte doesn't crash the session,
255        // then parse the verb table as before.
256        let as_str = String::from_utf8_lossy(&line);
257        let command = as_str.trim();
258        debug!("SMTP command from {}: {}", peer_addr, command);
259
260        // AUTH continuation: if the previous command opened an AUTH
261        // dialog, this line is base64 credential material, not an
262        // SMTP verb. Handle it here before the verb table would send
263        // back "502 Command not implemented".
264        if let Some(stage) = session_state.pending_auth.clone() {
265            handle_auth_continuation(stage, command, &mut session_state, reader.get_mut()).await?;
266            line.clear();
267            continue;
268        }
269
270        // Skip blank lines outside DATA mode — otherwise idle keep-alive
271        // newlines would reach the verb parser.
272        if command.is_empty() {
273            line.clear();
274            continue;
275        }
276
277        // STARTTLS is handled at this outer level (not in the verb
278        // table) because we need to swap out the BufReader's owned
279        // stream. Only fires when (a) client asked for STARTTLS,
280        // (b) we're still plaintext, and (c) a TLS acceptor is
281        // configured. Per RFC 3207, the upgraded session starts
282        // completely fresh — the client MUST re-EHLO.
283        if command.eq_ignore_ascii_case("STARTTLS") {
284            if !matches!(reader.get_ref(), SmtpStream::Plain(_)) {
285                // Already TLS — refuse per spec.
286                reader.get_mut().write_all(b"503 Command not allowed\r\n").await?;
287            } else if let Some(acceptor) = tls_acceptor.clone() {
288                reader.get_mut().write_all(b"220 Ready to start TLS\r\n").await?;
289                reader.get_mut().flush().await?;
290
291                let inner = reader.into_inner();
292                let tcp = match inner {
293                    SmtpStream::Plain(t) => t,
294                    SmtpStream::Tls(_) => unreachable!("checked is Plain above"),
295                };
296                let tls_stream = acceptor.accept(tcp).await.map_err(|e| {
297                    mockforge_core::Error::internal(format!("TLS accept failed: {e}"))
298                })?;
299                reader = BufReader::new(SmtpStream::Tls(Box::new(tls_stream)));
300                session_state = SessionState::new();
301                line.clear();
302                continue;
303            } else {
304                // STARTTLS requested but no cert configured.
305                reader
306                    .get_mut()
307                    .write_all(b"454 TLS not available due to temporary reason\r\n")
308                    .await?;
309            }
310            line.clear();
311            continue;
312        }
313
314        // Parse and handle SMTP command
315        match handle_smtp_command(
316            command,
317            &mut session_state,
318            reader.get_mut(),
319            &hostname,
320            &registry,
321            &middleware,
322            peer_addr,
323        )
324        .await
325        {
326            Ok(should_continue) => {
327                if !should_continue {
328                    debug!("SMTP session ended for {}", peer_addr);
329                    break;
330                }
331            }
332            Err(e) => {
333                error!("Error handling SMTP command: {}", e);
334                let error_response = "500 Internal server error\r\n";
335                reader.get_mut().write_all(error_response.as_bytes()).await?;
336            }
337        }
338
339        line.clear();
340    }
341
342    Ok(())
343}
344
345/// Decode `AUTH PLAIN`'s base64 credential blob. The SASL PLAIN
346/// payload is `\0authzid\0authcid\0passwd` (authzid may be empty).
347/// Returns the authcid (username) on success; `None` if the base64
348/// is malformed or the structure is wrong. The mock accepts any
349/// credentials — we only parse enough to pull out the username for
350/// mailbox observability.
351fn decode_plain_auth(b64: &str) -> Option<String> {
352    use base64::Engine as _;
353    let decoded = base64::engine::general_purpose::STANDARD.decode(b64.trim()).ok()?;
354    // Split on NULs. Valid forms produce 3 segments; we just need the
355    // middle one (authcid).
356    let mut parts = decoded.split(|b| *b == 0);
357    let _authzid = parts.next()?;
358    let authcid = parts.next()?;
359    let _passwd = parts.next()?;
360    Some(String::from_utf8_lossy(authcid).into_owned())
361}
362
363/// Handle the line *following* an AUTH open. Advances or completes
364/// the dialog depending on the stage we left off in.
365async fn handle_auth_continuation<W: AsyncWriteExt + Unpin>(
366    stage: AuthStage,
367    line: &str,
368    state: &mut SessionState,
369    writer: &mut W,
370) -> Result<()> {
371    use base64::Engine as _;
372    match stage {
373        AuthStage::AwaitingPlainCredentials => {
374            state.pending_auth = None;
375            match decode_plain_auth(line) {
376                Some(user) => {
377                    state.authenticated_user = Some(user);
378                    writer.write_all(b"235 2.7.0 Authentication successful\r\n").await?;
379                }
380                None => {
381                    writer.write_all(b"535 5.7.8 Authentication credentials invalid\r\n").await?;
382                }
383            }
384        }
385        AuthStage::AwaitingLoginUsername => {
386            let decoded = base64::engine::general_purpose::STANDARD
387                .decode(line.trim())
388                .ok()
389                .and_then(|b| String::from_utf8(b).ok());
390            match decoded {
391                Some(u) => {
392                    state.authenticated_user = Some(u);
393                    state.pending_auth = Some(AuthStage::AwaitingLoginPassword);
394                    // "Password:" base64 = "UGFzc3dvcmQ6".
395                    writer.write_all(b"334 UGFzc3dvcmQ6\r\n").await?;
396                }
397                None => {
398                    state.pending_auth = None;
399                    state.authenticated_user = None;
400                    writer.write_all(b"535 5.7.8 Authentication credentials invalid\r\n").await?;
401                }
402            }
403        }
404        AuthStage::AwaitingLoginPassword => {
405            state.pending_auth = None;
406            // We don't actually verify the password — accept anything
407            // that decodes as base64. On decode failure reject with
408            // 535 so clients that intentionally pass junk (negative
409            // tests) still get a sane response.
410            if base64::engine::general_purpose::STANDARD.decode(line.trim()).is_ok() {
411                writer.write_all(b"235 2.7.0 Authentication successful\r\n").await?;
412            } else {
413                state.authenticated_user = None;
414                writer.write_all(b"535 5.7.8 Authentication credentials invalid\r\n").await?;
415            }
416        }
417    }
418    Ok(())
419}
420
421/// Strip trailing `\r\n` or `\n` from an SMTP line read via
422/// `read_until(b'\n', ...)`. Keeps the rest of the bytes intact so
423/// a non-UTF-8 body round-trips verbatim.
424fn strip_line_terminator(line: &[u8]) -> &[u8] {
425    let mut end = line.len();
426    if end > 0 && line[end - 1] == b'\n' {
427        end -= 1;
428    }
429    if end > 0 && line[end - 1] == b'\r' {
430        end -= 1;
431    }
432    &line[..end]
433}
434
435/// Handle a single SMTP command
436async fn handle_smtp_command<W: AsyncWriteExt + Unpin>(
437    command: &str,
438    state: &mut SessionState,
439    writer: &mut W,
440    hostname: &str,
441    registry: &Arc<SmtpSpecRegistry>,
442    middleware: &Arc<MiddlewareChain>,
443    peer_addr: SocketAddr,
444) -> Result<bool> {
445    let parts: Vec<&str> = command.splitn(2, ' ').collect();
446    let cmd = parts[0].to_uppercase();
447
448    match cmd.as_str() {
449        "HELLO" | "EHLO" => {
450            let domain = parts.get(1).unwrap_or(&hostname);
451            let response = if cmd == "EHLO" {
452                // Advertise AUTH PLAIN LOGIN so clients that gate on the
453                // capability line (lettre with Credentials, Python's
454                // smtplib.login, etc.) actually try AUTH instead of
455                // skipping to MAIL FROM.
456                format!(
457                    "250-{} Hello {}\r\n\
458                     250-SIZE 10485760\r\n\
459                     250-8BITMIME\r\n\
460                     250-STARTTLS\r\n\
461                     250-AUTH PLAIN LOGIN\r\n\
462                     250 HELP\r\n",
463                    hostname, domain
464                )
465            } else {
466                format!("250 {} Hello {}\r\n", hostname, domain)
467            };
468            writer.write_all(response.as_bytes()).await?;
469            Ok(true)
470        }
471
472        "MAIL" => {
473            if let Some(from_part) = parts.get(1) {
474                // Parse MAIL FROM:<address>
475                let from = extract_email_address(from_part);
476                state.mail_from = Some(from);
477                writer.write_all(b"250 OK\r\n").await?;
478            } else {
479                writer.write_all(b"501 Syntax error in parameters\r\n").await?;
480            }
481            Ok(true)
482        }
483
484        "RCPT" => {
485            if let Some(to_part) = parts.get(1) {
486                // Parse RCPT TO:<address>
487                let to = extract_email_address(to_part);
488                state.rcpt_to.push(to);
489                writer.write_all(b"250 OK\r\n").await?;
490            } else {
491                writer.write_all(b"501 Syntax error in parameters\r\n").await?;
492            }
493            Ok(true)
494        }
495
496        "DATA" => {
497            writer.write_all(b"354 Start mail input; end with <CRLF>.<CRLF>\r\n").await?;
498            state.in_data_mode = true;
499            Ok(true)
500        }
501
502        "RSET" => {
503            state.reset();
504            writer.write_all(b"250 OK\r\n").await?;
505            Ok(true)
506        }
507
508        "NOOP" => {
509            writer.write_all(b"250 OK\r\n").await?;
510            Ok(true)
511        }
512
513        "QUIT" => {
514            writer.write_all(b"221 Bye\r\n").await?;
515            Ok(false) // End session
516        }
517
518        "STARTTLS" => {
519            // Mock STARTTLS implementation - accept but don't actually upgrade
520            writer.write_all(b"220 Ready to start TLS\r\n").await?;
521            Ok(true)
522        }
523
524        "AUTH" => {
525            // AUTH dispatch. Any credentials are accepted — this is a
526            // mock, not an authenticator — but we do go through the full
527            // handshake so clients that gate on the 334/235 responses
528            // work as expected.
529            //
530            // `parts` uses `splitn(2, ' ')`, so for `AUTH PLAIN <b64>`
531            // we get parts = ["AUTH", "PLAIN <b64>"]. Split the rest
532            // into its own mechanism + optional initial-response piece.
533            let rest = parts.get(1).copied().unwrap_or("");
534            let mut auth_args = rest.splitn(2, ' ');
535            let mechanism = auth_args.next().map(|s| s.to_ascii_uppercase()).unwrap_or_default();
536            let initial_response = auth_args.next().map(str::trim).filter(|s| !s.is_empty());
537            match mechanism.as_str() {
538                "PLAIN" => {
539                    // Two forms:
540                    //   "AUTH PLAIN <base64>"          — single-shot
541                    //   "AUTH PLAIN\r\n" then client → <base64>  — two-shot
542                    if let Some(b64) = initial_response {
543                        match decode_plain_auth(b64) {
544                            Some(user) => {
545                                state.authenticated_user = Some(user);
546                                writer
547                                    .write_all(b"235 2.7.0 Authentication successful\r\n")
548                                    .await?;
549                            }
550                            None => {
551                                writer
552                                    .write_all(b"535 5.7.8 Authentication credentials invalid\r\n")
553                                    .await?;
554                            }
555                        }
556                    } else {
557                        state.pending_auth = Some(AuthStage::AwaitingPlainCredentials);
558                        // 334 with empty challenge = "send me the SASL initial response".
559                        writer.write_all(b"334 \r\n").await?;
560                    }
561                    Ok(true)
562                }
563                "LOGIN" => {
564                    state.pending_auth = Some(AuthStage::AwaitingLoginUsername);
565                    // "Username:" base64-encoded = "VXNlcm5hbWU6".
566                    writer.write_all(b"334 VXNlcm5hbWU6\r\n").await?;
567                    Ok(true)
568                }
569                _ => {
570                    writer
571                        .write_all(b"504 5.5.4 Authentication mechanism not supported\r\n")
572                        .await?;
573                    Ok(true)
574                }
575            }
576        }
577
578        "HELP" => {
579            let help_text = "214-Commands supported:\r\n\
580                            214-  HELLO EHLO MAIL RCPT DATA\r\n\
581                            214-  RSET NOOP QUIT HELP STARTTLS\r\n\
582                            214 End of HELP info\r\n";
583            writer.write_all(help_text.as_bytes()).await?;
584            Ok(true)
585        }
586
587        _ => {
588            // DATA-mode lines are short-circuited before `handle_smtp_command`
589            // is called (see `handle_smtp_session`), so we only land here for
590            // genuinely unknown verbs.
591            if state.in_data_mode {
592                // Defensive fallback in case the short-circuit is ever
593                // bypassed — keep the original accumulator behavior so the
594                // session doesn't derail.
595                if command == "." {
596                    state.in_data_mode = false;
597                    let response = process_email(state, registry, middleware, peer_addr).await?;
598                    writer.write_all(response.as_bytes()).await?;
599                    state.reset();
600                } else {
601                    // Command path: the caller already trimmed the line
602                    // into a `&str`. Non-ASCII bodies are accumulated
603                    // via the byte-level DATA-mode branch in
604                    // `handle_smtp_session`; this fallback just
605                    // preserves the UTF-8 subset.
606                    state.data.extend_from_slice(command.as_bytes());
607                    state.data.push(b'\n');
608                }
609                Ok(true)
610            } else {
611                warn!("Unknown SMTP command: {}", command);
612                writer.write_all(b"502 Command not implemented\r\n").await?;
613                Ok(true)
614            }
615        }
616    }
617}
618
619/// Process received email and generate response
620async fn process_email(
621    state: &SessionState,
622    registry: &Arc<SmtpSpecRegistry>,
623    middleware: &Arc<MiddlewareChain>,
624    peer_addr: SocketAddr,
625) -> Result<String> {
626    let from = state
627        .mail_from
628        .as_ref()
629        .ok_or_else(|| mockforge_core::Error::internal("Missing MAIL FROM"))?;
630    let to = state.rcpt_to.join(", ");
631
632    // Extract subject from data
633    let subject = extract_subject(&state.data);
634
635    // Capture the delivered message in the in-memory mailbox before we
636    // touch fixture-driven response generation. The spec-registry's
637    // storage logic used to live inside `generate_mock_response` gated on
638    // `fixture.storage.save_to_mailbox`, which meant that if no fixture
639    // matched (the common case: users running the mock SMTP to inspect
640    // outgoing mail from their application) the message would silently
641    // disappear AND the server would return a 500 to the client. Capture
642    // is the primary contract of a mock SMTP; fixture matching should only
643    // affect the reply.
644    let captured = crate::fixtures::StoredEmail {
645        id: uuid::Uuid::new_v4().to_string(),
646        from: from.clone(),
647        to: state.rcpt_to.clone(),
648        subject: subject.clone(),
649        // `body` is `String`; decode lossy so callers that only want a
650        // preview of a UTF-8 message aren't forced to round-trip
651        // through `raw`. Byte-exact consumers use `raw`.
652        body: String::from_utf8_lossy(&state.data).into_owned(),
653        headers: HashMap::from([
654            ("from".to_string(), from.clone()),
655            ("to".to_string(), to.clone()),
656            ("subject".to_string(), subject.clone()),
657        ]),
658        received_at: chrono::Utc::now(),
659        raw: Some(state.data.clone()),
660    };
661    if let Err(e) = registry.store_email(captured) {
662        warn!("Failed to store email in mailbox: {}", e);
663    }
664
665    // Create protocol request
666    let mut request = ProtocolRequest {
667        protocol: Protocol::Smtp,
668        pattern: MessagePattern::OneWay,
669        operation: "SEND".to_string(),
670        path: from.clone(),
671        topic: None,
672        routing_key: None,
673        partition: None,
674        qos: None,
675        metadata: HashMap::from([
676            ("from".to_string(), from.clone()),
677            ("to".to_string(), to.clone()),
678            ("subject".to_string(), subject.clone()),
679        ]),
680        body: Some(state.data.clone()),
681        client_ip: Some(peer_addr.ip().to_string()),
682    };
683
684    // Process through middleware (may short-circuit, e.g., auth rejection)
685    if let Some(short_circuit_response) = middleware.process_request(&mut request).await? {
686        return Ok(String::from_utf8_lossy(&short_circuit_response.body).to_string());
687    }
688
689    // Ask the spec registry for a fixture-driven reply. When no fixture
690    // matches, fall back to the standard 250 OK so clients accept the
691    // message (the message has already been captured above).
692    let response = match registry.generate_mock_response(&request) {
693        Ok(mut resp) => {
694            middleware.process_response(&request, &mut resp).await?;
695            String::from_utf8_lossy(&resp.body).to_string()
696        }
697        Err(_) => "250 OK\r\n".to_string(),
698    };
699
700    Ok(response)
701}
702
703/// Extract email address from SMTP command parameter
704fn extract_email_address(param: &str) -> String {
705    // Handle formats like "FROM:<user@example.com>" or "TO:<user@example.com>"
706    if let Some(start) = param.find('<') {
707        if let Some(end) = param.find('>') {
708            return param[start + 1..end].to_string();
709        }
710    }
711
712    // If no angle brackets, just trim and return
713    param.trim().to_string()
714}
715
716/// Extract subject from email data. Takes bytes so non-UTF-8 bodies
717/// still succeed; the header zone (above the blank line) is ASCII per
718/// RFC 5322, so lossy decoding for the search is safe.
719fn extract_subject(data: &[u8]) -> String {
720    let header_text = String::from_utf8_lossy(data);
721    for line in header_text.lines() {
722        // Headers end at the first blank line; stop searching there so we
723        // don't accidentally match a "Subject:" that appears in the body.
724        if line.is_empty() {
725            break;
726        }
727        if line.to_lowercase().starts_with("subject:") {
728            return line[8..].trim().to_string();
729        }
730    }
731    String::new()
732}
733
734/// AUTH dialog state. SMTP's AUTH LOGIN / AUTH PLAIN both need
735/// multi-round-trip exchanges where the *next* line from the client
736/// is not an SMTP verb — it's base64-encoded credential material.
737/// The main read loop consults `SessionState.pending_auth` before
738/// dispatching to the verb table so that continuation data isn't
739/// misrouted into `502 Command not implemented`.
740#[derive(Debug, Clone, PartialEq, Eq)]
741#[allow(clippy::enum_variant_names)] // all stages share the `Awaiting` prefix by design.
742enum AuthStage {
743    /// Client sent `AUTH LOGIN`. Next line should be the base64
744    /// username; we respond with `334 <base64("Password:")>`.
745    AwaitingLoginUsername,
746    /// Client sent the AUTH LOGIN username. Next line should be the
747    /// base64 password; we accept it and send 235.
748    AwaitingLoginPassword,
749    /// Client sent `AUTH PLAIN` on its own. Next line should be a
750    /// single base64 blob decoding to `\0user\0pass`.
751    AwaitingPlainCredentials,
752}
753
754/// Session state for SMTP connection.
755///
756/// `data` is `Vec<u8>` (not `String`) so the DATA body survives
757/// byte-for-byte even when the sender negotiated 8BITMIME and
758/// included non-ASCII / non-UTF-8 content. The mailbox API still
759/// exposes a `String` body via lossy decoding; `raw` holds the
760/// byte-accurate version for consumers that need it.
761struct SessionState {
762    mail_from: Option<String>,
763    rcpt_to: Vec<String>,
764    data: Vec<u8>,
765    in_data_mode: bool,
766    /// Mid-AUTH dialog state, `None` when not currently negotiating.
767    pending_auth: Option<AuthStage>,
768    /// Best-effort capture of the authenticated username so callers
769    /// inspecting the mailbox can filter by who sent what. Populated
770    /// on successful AUTH PLAIN / AUTH LOGIN; cleared by RSET/reset().
771    authenticated_user: Option<String>,
772}
773
774impl SessionState {
775    fn new() -> Self {
776        Self {
777            mail_from: None,
778            rcpt_to: Vec::new(),
779            data: Vec::new(),
780            in_data_mode: false,
781            pending_auth: None,
782            authenticated_user: None,
783        }
784    }
785
786    fn reset(&mut self) {
787        self.mail_from = None;
788        self.rcpt_to.clear();
789        self.data.clear();
790        self.in_data_mode = false;
791        self.pending_auth = None;
792        // `authenticated_user` intentionally survives RSET — RFC 4954:
793        // the authenticated state is tied to the connection, not the
794        // mail transaction.
795    }
796}
797
798#[cfg(test)]
799mod tests {
800    use super::*;
801
802    #[test]
803    fn test_extract_email_address() {
804        assert_eq!(extract_email_address("FROM:<user@example.com>"), "user@example.com");
805        assert_eq!(extract_email_address("TO:<admin@test.com>"), "admin@test.com");
806        assert_eq!(extract_email_address("user@example.com"), "user@example.com");
807    }
808
809    #[test]
810    fn test_extract_email_address_whitespace() {
811        assert_eq!(extract_email_address("  user@example.com  "), "user@example.com");
812    }
813
814    #[test]
815    fn test_extract_email_address_no_brackets() {
816        assert_eq!(extract_email_address("plain@email.com"), "plain@email.com");
817    }
818
819    #[test]
820    fn test_extract_email_address_mail_from_format() {
821        assert_eq!(extract_email_address("FROM:<sender@domain.com>"), "sender@domain.com");
822    }
823
824    #[test]
825    fn test_extract_subject() {
826        let data =
827            "From: sender@example.com\nSubject: Test Email\nTo: recipient@example.com\n\nBody text";
828        assert_eq!(extract_subject(data.as_bytes()), "Test Email");
829    }
830
831    #[test]
832    fn test_extract_subject_not_found() {
833        let data = "From: sender@example.com\nTo: recipient@example.com\n\nBody text";
834        assert_eq!(extract_subject(data.as_bytes()), "");
835    }
836
837    #[test]
838    fn test_extract_subject_lowercase() {
839        let data = "subject: lowercase subject\nFrom: sender@example.com";
840        assert_eq!(extract_subject(data.as_bytes()), "lowercase subject");
841    }
842
843    #[test]
844    fn test_extract_subject_mixed_case() {
845        let data = "SUBJECT: UPPERCASE SUBJECT\nFrom: sender@example.com";
846        assert_eq!(extract_subject(data.as_bytes()), "UPPERCASE SUBJECT");
847    }
848
849    #[test]
850    fn test_session_state() {
851        let mut state = SessionState::new();
852        assert!(state.mail_from.is_none());
853        assert_eq!(state.rcpt_to.len(), 0);
854
855        state.mail_from = Some("sender@example.com".to_string());
856        state.rcpt_to.push("recipient@example.com".to_string());
857
858        state.reset();
859        assert!(state.mail_from.is_none());
860        assert_eq!(state.rcpt_to.len(), 0);
861    }
862
863    #[test]
864    fn test_session_state_new() {
865        let state = SessionState::new();
866        assert!(state.mail_from.is_none());
867        assert!(state.rcpt_to.is_empty());
868        assert!(state.data.is_empty());
869        assert!(!state.in_data_mode);
870    }
871
872    #[test]
873    fn test_session_state_reset() {
874        let mut state = SessionState::new();
875        state.mail_from = Some("test@example.com".to_string());
876        state.rcpt_to.push("recipient1@example.com".to_string());
877        state.rcpt_to.push("recipient2@example.com".to_string());
878        state.data = b"Email body content".to_vec();
879        state.in_data_mode = true;
880
881        state.reset();
882
883        assert!(state.mail_from.is_none());
884        assert!(state.rcpt_to.is_empty());
885        assert!(state.data.is_empty());
886        assert!(!state.in_data_mode);
887    }
888
889    #[test]
890    fn test_session_state_multiple_recipients() {
891        let mut state = SessionState::new();
892        state.rcpt_to.push("a@example.com".to_string());
893        state.rcpt_to.push("b@example.com".to_string());
894        state.rcpt_to.push("c@example.com".to_string());
895        assert_eq!(state.rcpt_to.len(), 3);
896    }
897
898    #[test]
899    fn test_session_state_data_accumulation() {
900        let mut state = SessionState::new();
901        state.data.extend_from_slice(b"Line 1\n");
902        state.data.extend_from_slice(b"Line 2\n");
903        state.data.extend_from_slice(b"Line 3\n");
904        assert_eq!(state.data, b"Line 1\nLine 2\nLine 3\n");
905    }
906
907    #[test]
908    fn test_strip_line_terminator() {
909        assert_eq!(strip_line_terminator(b"hello\r\n"), b"hello");
910        assert_eq!(strip_line_terminator(b"hello\n"), b"hello");
911        assert_eq!(strip_line_terminator(b"hello"), b"hello");
912        assert_eq!(strip_line_terminator(b""), b"");
913        // Non-UTF-8 bytes survive intact through the strip.
914        assert_eq!(strip_line_terminator(b"\xff\xfe\r\n"), b"\xff\xfe");
915    }
916
917    #[test]
918    fn test_extract_subject_from_bytes_with_non_utf8_body() {
919        let mut data = Vec::new();
920        data.extend_from_slice(b"From: a@example.test\r\n");
921        data.extend_from_slice(b"Subject: 8BITMIME body below\r\n");
922        data.extend_from_slice(b"\r\n");
923        data.extend_from_slice(&[0xff, 0xfe, 0xfd]); // garbage bytes
924        assert_eq!(extract_subject(&data), "8BITMIME body below");
925    }
926
927    #[tokio::test]
928    async fn test_smtp_server_new() {
929        let config = SmtpConfig::default();
930        let registry = Arc::new(SmtpSpecRegistry::new());
931        let server = SmtpServer::new(config, registry);
932        assert!(server.is_ok());
933    }
934
935    #[tokio::test]
936    async fn test_smtp_server_with_middleware() {
937        let config = SmtpConfig::default();
938        let registry = Arc::new(SmtpSpecRegistry::new());
939        let middleware = Arc::new(MiddlewareChain::new());
940        let server = SmtpServer::with_middleware(config, registry, middleware);
941        assert!(server.is_ok());
942    }
943}