// daaki-smtp 0.2.0
//
// An async SMTP client library
// Documentation
//! SMTP/LMTP client connection.
//!
//! `SmtpConnection` handles the lifecycle of a single SMTP (RFC 5321) or
//! LMTP (RFC 2033) session: connect, EHLO/LHLO, optional STARTTLS,
//! authenticate, and send messages. Supports pipelining (RFC 2920, which
//! obsoletes RFC 1854) when the server advertises it.
//!
//! All public methods take `&self`. Mutable state is protected by an
//! internal `tokio::sync::Mutex` (see [`SmtpInner`]), allowing the
//! connection to be shared across async tasks via `Arc<SmtpConnection>`.
//! The mutex guard is intentionally held for the duration of each SMTP
//! operation because SMTP is a serial protocol (RFC 5321 Section 3.1).
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use bytes::BytesMut;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_rustls::client::TlsStream;
use tokio_rustls::TlsConnector;

use crate::codec::{decode, encode};
use crate::deliver_by::validate_deliver_by_value;
use crate::error::Error;
use crate::future_release::{
    parse_rfc3339_to_utc_key, validate_hold_for_seconds, validate_hold_until_datetime,
};
use crate::types::{
    DomainOrLiteral, EnhancedStatusCode, ForwardPath, Protocol, RecipientResult, ReversePath,
    ServerCapabilities, SmtpResponse,
};

mod auth;
mod bdat;
mod helpers;
mod lifecycle;
mod lmtp;
mod sending;
mod session;

#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::similar_names,
    clippy::wildcard_in_or_patterns,
    clippy::items_after_statements,
    clippy::manual_let_else,
    clippy::match_wild_err_arm
)]
#[path = "tests.rs"]
mod tests;

pub use daaki_message::TlsMode;

/// Abstraction over plain TCP and TLS streams.
///
/// Lets the connection state hold a single stream value whether or not the
/// session is encrypted. The inherent methods on this type (`write_all`,
/// `flush`, `read`, `is_tls`) dispatch on the active variant.
enum SmtpStream {
    /// Unencrypted TCP connection.
    Plain(TcpStream),
    /// TLS-encrypted connection layered over a `TcpStream` (boxed to avoid
    /// a large enum variant).
    Tls(Box<TlsStream<TcpStream>>),
}

impl SmtpStream {
    /// Reports whether this stream is the TLS variant.
    ///
    /// RFC 8689 Section 3: the REQUIRETLS parameter may only be used once
    /// the session is protected by TLS, so callers need a way to ask.
    fn is_tls(&self) -> bool {
        match self {
            Self::Tls(_) => true,
            Self::Plain(_) => false,
        }
    }

    /// Reads available bytes into `buf` from the active transport and
    /// returns how many were read.
    async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        match self {
            Self::Plain(tcp) => tcp.read(buf).await,
            Self::Tls(tls) => tls.read(buf).await,
        }
    }

    /// Writes every byte of `buf` to the active transport.
    ///
    /// Flushing is a separate step; see `flush`.
    async fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        match self {
            Self::Plain(tcp) => tcp.write_all(buf).await,
            Self::Tls(tls) => tls.write_all(buf).await,
        }
    }

    /// Flushes the active transport.
    async fn flush(&mut self) -> std::io::Result<()> {
        match self {
            Self::Plain(tcp) => tcp.flush().await,
            Self::Tls(tls) => tls.flush().await,
        }
    }
}

/// Mutable state of an SMTP/LMTP connection, protected by a
/// `tokio::sync::Mutex` inside [`SmtpConnection`].
///
/// SMTP is a serial protocol (RFC 5321 Section 3.1): commands and
/// responses are strictly ordered, so concurrent access must be
/// serialised. Wrapping the mutable state in an async mutex allows
/// callers to share an `SmtpConnection` across tasks (e.g. via
/// `Arc<SmtpConnection>`) without an external lock.
struct SmtpInner {
    /// The underlying TCP or TLS stream.
    stream: SmtpStream,
    /// Read buffer for incoming data.
    ///
    /// `read_response` appends bytes read from `stream` here and removes
    /// the bytes of each response once it has been parsed.
    read_buf: BytesMut,
    /// Server-advertised capabilities from the last EHLO/LHLO.
    capabilities: ServerCapabilities,
    /// The client domain sent in EHLO/LHLO (RFC 5321 Section 4.1.1.1).
    ///
    /// Defaults to the address-literal `"[127.0.0.1]"`. RFC 5321 Section
    /// 4.1.4 says clients that cannot determine their own FQDN SHOULD
    /// substitute an address-literal. Pre-validated by [`DomainOrLiteral`]
    /// on construction.
    ehlo_domain: DomainOrLiteral,
    /// Whether a successful AUTH exchange has completed in this session.
    ///
    /// RFC 4954 Section 3: "After an AUTH command has been successfully
    /// completed, no more AUTH commands may be issued in the same session."
    /// This flag is set to `true` after a 235 response and persists for the
    /// lifetime of the connection (RSET/EHLO do not reset it — the session
    /// continues until QUIT or connection closure).
    authenticated: bool,
    /// Whether the server has indicated it is shutting down via a 421 response.
    ///
    /// RFC 5321 Section 3.8: "If the SMTP code is 421, the server MUST close
    /// the transmission channel after sending the response." Once set, the
    /// client must not attempt further commands on this connection.
    ///
    /// Set by `read_response` whenever a parsed reply carries code 421.
    server_shutting_down: bool,
    /// Whether the initial EHLO was rejected and the connection fell back to
    /// HELO (RFC 5321 Section 4.1.4).  When `true`, subsequent `ehlo_on_inner`
    /// calls (e.g. via `rehlo`) must send HELO instead of EHLO, because the
    /// server has already indicated it does not support ESMTP.
    helo_mode: bool,
}

impl SmtpInner {
    /// Write bytes to the stream and flush.
    async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
        self.stream.write_all(buf).await?;
        self.stream.flush().await?;
        Ok(())
    }

    /// Read a complete SMTP response from the stream.
    ///
    /// Accumulates lines until a final line (code followed by SP) is seen,
    /// per RFC 5321 Section 4.2.
    async fn read_response(&mut self) -> Result<SmtpResponse, Error> {
        loop {
            // Try to parse a complete response from the buffer.
            if let Some((resp, consumed)) = SmtpConnection::try_parse_response(&self.read_buf)? {
                let _ = self.read_buf.split_to(consumed);
                // RFC 5321 Section 3.8: a 421 reply means the server will
                // close the transmission channel. Mark the connection as
                // shutting down so subsequent operations fail immediately.
                if resp.code == 421 {
                    self.server_shutting_down = true;
                }
                return Ok(resp);
            }

            // RFC 5321 Section 4.5.3.1.5: enforce a response buffer limit
            // to prevent a malicious server from exhausting client memory.
            if self.read_buf.len() > SmtpConnection::MAX_RESPONSE_BUFFER {
                return Err(Error::Protocol(format!(
                    "response exceeds maximum buffer size ({} bytes) \
                     (RFC 5321 Section 4.5.3.1.5)",
                    SmtpConnection::MAX_RESPONSE_BUFFER
                )));
            }

            // Need more data.
            let mut tmp = [0u8; 4096];
            let n = self.stream.read(&mut tmp).await?;
            if n == 0 {
                return Err(Error::Closed);
            }
            self.read_buf.extend_from_slice(&tmp[..n]);
        }
    }

    /// Best-effort RSET to abort the current mail transaction.
    ///
    /// RFC 5321 Section 4.1.1.5: RSET aborts the current mail transaction,
    /// clearing the reverse-path, forward-path, and mail data buffers.
    /// Errors are intentionally ignored — the caller has a more meaningful
    /// error to return.
    async fn rset_best_effort(&mut self) {
        let mut buf = BytesMut::new();
        encode::encode_rset(&mut buf);
        if self.write_all(&buf).await.is_ok() {
            let _ = self.read_response().await;
        }
    }

    /// Best-effort QUIT to close the session gracefully.
    ///
    /// RFC 5321 Section 4.1.1.10: "The sender MUST NOT intentionally close
    /// the transmission channel until it sends a QUIT command." This helper
    /// is used in error paths during connection setup where the 220 greeting
    /// has already established a session but a subsequent step (EHLO,
    /// STARTTLS, etc.) has failed. Errors are intentionally ignored — the
    /// caller has a more meaningful error to return.
    async fn quit_best_effort(&mut self) {
        let mut buf = BytesMut::new();
        encode::encode_quit(&mut buf);
        if self.write_all(&buf).await.is_ok() {
            let _ = self.read_response().await;
        }
    }
}

/// Return the default EHLO identity for clients that have no better local name.
///
/// RFC 5321 Section 4.1.4 says clients that cannot determine a more
/// specific host identity SHOULD substitute an address-literal.
pub(super) fn default_ehlo_domain() -> Result<DomainOrLiteral, Error> {
    DomainOrLiteral::new("[127.0.0.1]")
        .map_err(|err| Error::Protocol(format!("invalid default EHLO domain: {err}")))
}

/// An SMTP/LMTP client connection.
///
/// Manages a single TCP (or TLS) connection to an SMTP or LMTP server.
/// The caller provides a timeout for each operation — there are no hardcoded
/// defaults.
///
/// All methods take `&self` rather than `&mut self`. Mutable state (the
/// stream, read buffer, capabilities, and session flags held in
/// [`SmtpInner`]) is protected by an internal `tokio::sync::Mutex`, so the
/// connection can be shared across async tasks via `Arc<SmtpConnection>`
/// without an external lock. SMTP is a serial protocol (RFC 5321
/// Section 3.1), so the mutex serialises operations correctly.
///
/// Message construction is NOT this library's responsibility. The caller
/// provides fully-formed RFC 5322 message bytes.
pub struct SmtpConnection {
    /// Mutable connection state, protected by an async mutex.
    inner: tokio::sync::Mutex<SmtpInner>,
    /// Protocol mode — SMTP or LMTP. Stored outside the mutex.
    protocol: Protocol,
}

// Static guarantee that `SmtpConnection` is `Send + Sync`, which sharing it
// via `Arc<SmtpConnection>` across async tasks depends on. The closure only
// has to type-check; if either bound stops holding, the build fails here.
const _: fn() = || {
    fn require_send_sync<T>()
    where
        T: Send + Sync,
    {
    }
    require_send_sync::<SmtpConnection>();
};

impl std::fmt::Debug for SmtpConnection {
    /// Formats connection metadata for logs and diagnostics without
    /// printing the stream itself or the read buffer.
    ///
    /// The inner state is inspected through `try_lock()`, so formatting
    /// never waits on the mutex. When another task holds the lock, the
    /// `ehlo_domain`, `transport`, and `capabilities` fields are all
    /// rendered as `"<locked>"`; `protocol` is always available.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Placeholder shown for fields that live behind the mutex.
        const LOCKED: &str = "<locked>";

        let mut out = f.debug_struct("SmtpConnection");
        out.field("protocol", &self.protocol);

        match self.inner.try_lock() {
            Ok(state) => {
                let transport = match state.stream {
                    SmtpStream::Plain(_) => "plain",
                    SmtpStream::Tls(_) => "tls",
                };
                out.field("ehlo_domain", &state.ehlo_domain)
                    .field("transport", &transport)
                    .field("capabilities", &state.capabilities);
            }
            Err(_) => {
                out.field("ehlo_domain", &LOCKED)
                    .field("transport", &LOCKED)
                    .field("capabilities", &LOCKED);
            }
        }

        out.finish_non_exhaustive()
    }
}