Skip to main content

daaki_smtp/connection/
mod.rs

1//! SMTP/LMTP client connection.
2//!
3//! `SmtpConnection` handles the lifecycle of a single SMTP (RFC 5321) or
4//! LMTP (RFC 2033) session: connect, EHLO/LHLO, optional STARTTLS,
//! authenticate, and send messages. Supports pipelining (RFC 2920) when
//! the server advertises it.
7//!
8//! All public methods take `&self`. Mutable state is protected by an
9//! internal `tokio::sync::Mutex` (see [`SmtpInner`]), allowing the
10//! connection to be shared across async tasks via `Arc<SmtpConnection>`.
11//! The mutex guard is intentionally held for the duration of each SMTP
12//! operation because SMTP is a serial protocol (RFC 5321 Section 3.1).
13use std::sync::Arc;
14use std::time::{Duration, SystemTime, UNIX_EPOCH};
15
16use bytes::BytesMut;
17use tokio::io::{AsyncReadExt, AsyncWriteExt};
18use tokio::net::TcpStream;
19use tokio_rustls::client::TlsStream;
20use tokio_rustls::TlsConnector;
21
22use crate::codec::{decode, encode};
23use crate::deliver_by::validate_deliver_by_value;
24use crate::error::Error;
25use crate::future_release::{
26    parse_rfc3339_to_utc_key, validate_hold_for_seconds, validate_hold_until_datetime,
27};
28use crate::types::{
29    DomainOrLiteral, EnhancedStatusCode, ForwardPath, Protocol, RecipientResult, ReversePath,
30    ServerCapabilities, SmtpResponse,
31};
32
33mod auth;
34mod bdat;
35mod helpers;
36mod lifecycle;
37mod lmtp;
38mod sending;
39mod session;
40
41#[cfg(test)]
42#[allow(
43    clippy::unwrap_used,
44    clippy::expect_used,
45    clippy::similar_names,
46    clippy::wildcard_in_or_patterns,
47    clippy::items_after_statements,
48    clippy::manual_let_else,
49    clippy::match_wild_err_arm
50)]
51#[path = "tests.rs"]
52mod tests;
53
54pub use daaki_message::TlsMode;
55
56/// Abstraction over plain TCP and TLS streams.
57enum SmtpStream {
58    /// Unencrypted TCP connection.
59    Plain(TcpStream),
60    /// TLS-encrypted connection (boxed to avoid large enum variant).
61    Tls(Box<TlsStream<TcpStream>>),
62}
63
64impl SmtpStream {
65    /// Write all bytes to the stream.
66    async fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
67        match self {
68            Self::Plain(s) => s.write_all(buf).await,
69            Self::Tls(s) => s.write_all(buf).await,
70        }
71    }
72
73    /// Flush the stream.
74    async fn flush(&mut self) -> std::io::Result<()> {
75        match self {
76            Self::Plain(s) => s.flush().await,
77            Self::Tls(s) => s.flush().await,
78        }
79    }
80
81    /// Read bytes into buffer, returning count read.
82    async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
83        match self {
84            Self::Plain(s) => s.read(buf).await,
85            Self::Tls(s) => s.read(buf).await,
86        }
87    }
88
89    /// Returns `true` if the connection is TLS-encrypted.
90    ///
91    /// RFC 8689 Section 3: REQUIRETLS requires the session to be
92    /// TLS-protected before the parameter may be used.
93    fn is_tls(&self) -> bool {
94        matches!(self, Self::Tls(_))
95    }
96}
97
98/// Mutable state of an SMTP/LMTP connection, protected by a
99/// `tokio::sync::Mutex` inside [`SmtpConnection`].
100///
101/// SMTP is a serial protocol (RFC 5321 Section 3.1): commands and
102/// responses are strictly ordered, so concurrent access must be
103/// serialised. Wrapping the mutable state in an async mutex allows
104/// callers to share an `SmtpConnection` across tasks (e.g. via
105/// `Arc<SmtpConnection>`) without an external lock.
106struct SmtpInner {
107    /// The underlying TCP or TLS stream.
108    stream: SmtpStream,
109    /// Read buffer for incoming data.
110    read_buf: BytesMut,
111    /// Server-advertised capabilities from the last EHLO/LHLO.
112    capabilities: ServerCapabilities,
113    /// The client domain sent in EHLO/LHLO (RFC 5321 Section 4.1.1.1).
114    ///
115    /// Defaults to the address-literal `"[127.0.0.1]"`. RFC 5321 Section
116    /// 4.1.4 says clients that cannot determine their own FQDN SHOULD
117    /// substitute an address-literal. Pre-validated by [`DomainOrLiteral`]
118    /// on construction.
119    ehlo_domain: DomainOrLiteral,
120    /// Whether a successful AUTH exchange has completed in this session.
121    ///
122    /// RFC 4954 Section 3: "After an AUTH command has been successfully
123    /// completed, no more AUTH commands may be issued in the same session."
124    /// This flag is set to `true` after a 235 response and persists for the
125    /// lifetime of the connection (RSET/EHLO do not reset it — the session
126    /// continues until QUIT or connection closure).
127    authenticated: bool,
128    /// Whether the server has indicated it is shutting down via a 421 response.
129    ///
130    /// RFC 5321 Section 3.8: "If the SMTP code is 421, the server MUST close
131    /// the transmission channel after sending the response." Once set, the
132    /// client must not attempt further commands on this connection.
133    server_shutting_down: bool,
134    /// Whether the initial EHLO was rejected and the connection fell back to
135    /// HELO (RFC 5321 Section 4.1.4).  When `true`, subsequent `ehlo_on_inner`
136    /// calls (e.g. via `rehlo`) must send HELO instead of EHLO, because the
137    /// server has already indicated it does not support ESMTP.
138    helo_mode: bool,
139}
140
141impl SmtpInner {
142    /// Write bytes to the stream and flush.
143    async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
144        self.stream.write_all(buf).await?;
145        self.stream.flush().await?;
146        Ok(())
147    }
148
149    /// Read a complete SMTP response from the stream.
150    ///
151    /// Accumulates lines until a final line (code followed by SP) is seen,
152    /// per RFC 5321 Section 4.2.
153    async fn read_response(&mut self) -> Result<SmtpResponse, Error> {
154        loop {
155            // Try to parse a complete response from the buffer.
156            if let Some((resp, consumed)) = SmtpConnection::try_parse_response(&self.read_buf)? {
157                let _ = self.read_buf.split_to(consumed);
158                // RFC 5321 Section 3.8: a 421 reply means the server will
159                // close the transmission channel. Mark the connection as
160                // shutting down so subsequent operations fail immediately.
161                if resp.code == 421 {
162                    self.server_shutting_down = true;
163                }
164                return Ok(resp);
165            }
166
167            // RFC 5321 Section 4.5.3.1.5: enforce a response buffer limit
168            // to prevent a malicious server from exhausting client memory.
169            if self.read_buf.len() > SmtpConnection::MAX_RESPONSE_BUFFER {
170                return Err(Error::Protocol(format!(
171                    "response exceeds maximum buffer size ({} bytes) \
172                     (RFC 5321 Section 4.5.3.1.5)",
173                    SmtpConnection::MAX_RESPONSE_BUFFER
174                )));
175            }
176
177            // Need more data.
178            let mut tmp = [0u8; 4096];
179            let n = self.stream.read(&mut tmp).await?;
180            if n == 0 {
181                return Err(Error::Closed);
182            }
183            self.read_buf.extend_from_slice(&tmp[..n]);
184        }
185    }
186
187    /// Best-effort RSET to abort the current mail transaction.
188    ///
189    /// RFC 5321 Section 4.1.1.5: RSET aborts the current mail transaction,
190    /// clearing the reverse-path, forward-path, and mail data buffers.
191    /// Errors are intentionally ignored — the caller has a more meaningful
192    /// error to return.
193    async fn rset_best_effort(&mut self) {
194        let mut buf = BytesMut::new();
195        encode::encode_rset(&mut buf);
196        if self.write_all(&buf).await.is_ok() {
197            let _ = self.read_response().await;
198        }
199    }
200
201    /// Best-effort QUIT to close the session gracefully.
202    ///
203    /// RFC 5321 Section 4.1.1.10: "The sender MUST NOT intentionally close
204    /// the transmission channel until it sends a QUIT command." This helper
205    /// is used in error paths during connection setup where the 220 greeting
206    /// has already established a session but a subsequent step (EHLO,
207    /// STARTTLS, etc.) has failed. Errors are intentionally ignored — the
208    /// caller has a more meaningful error to return.
209    async fn quit_best_effort(&mut self) {
210        let mut buf = BytesMut::new();
211        encode::encode_quit(&mut buf);
212        if self.write_all(&buf).await.is_ok() {
213            let _ = self.read_response().await;
214        }
215    }
216}
217
218/// Return the default EHLO identity for clients that have no better local name.
219///
220/// RFC 5321 Section 4.1.4 says clients that cannot determine a more
221/// specific host identity SHOULD substitute an address-literal.
222pub(super) fn default_ehlo_domain() -> Result<DomainOrLiteral, Error> {
223    DomainOrLiteral::new("[127.0.0.1]")
224        .map_err(|err| Error::Protocol(format!("invalid default EHLO domain: {err}")))
225}
226
227/// An SMTP/LMTP client connection.
228///
229/// Manages a single TCP (or TLS) connection to an SMTP or LMTP server.
230/// The caller provides a timeout for each operation — there are no hardcoded
231/// defaults.
232///
233/// All methods take `&self` rather than `&mut self`. Mutable state (the
234/// TCP stream, read buffer, and capabilities) is protected by an internal
235/// `tokio::sync::Mutex`, so the connection can be shared across async
236/// tasks via `Arc<SmtpConnection>` without an external lock. SMTP is a
237/// serial protocol (RFC 5321 Section 3.1), so the mutex serialises
238/// operations correctly.
239///
240/// Message construction is NOT this library's responsibility. The caller
241/// provides fully-formed RFC 5322 message bytes.
242pub struct SmtpConnection {
243    /// Mutable connection state, protected by an async mutex.
244    inner: tokio::sync::Mutex<SmtpInner>,
245    /// Protocol mode — SMTP or LMTP.
246    protocol: Protocol,
247}
248
249// Compile-time proof that SmtpConnection is Send + Sync — required for
250// sharing via `Arc<SmtpConnection>` across async tasks.
251const _: fn() = || {
252    fn assert_send_sync<T: Send + Sync>() {}
253    assert_send_sync::<SmtpConnection>();
254};
255
256impl std::fmt::Debug for SmtpConnection {
257    /// Prints connection metadata useful for logging and diagnostics,
258    /// without exposing internal stream state or buffers.
259    ///
260    /// Uses `try_lock()` to inspect the inner state without blocking.
261    /// If the mutex is currently held by another task, the transport
262    /// and capabilities fields show `"<locked>"` as a fallback.
263    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264        let mut s = f.debug_struct("SmtpConnection");
265        s.field("protocol", &self.protocol);
266
267        if let Ok(inner) = self.inner.try_lock() {
268            s.field("ehlo_domain", &inner.ehlo_domain);
269            let transport = match &inner.stream {
270                SmtpStream::Plain(_) => "plain",
271                SmtpStream::Tls(_) => "tls",
272            };
273            s.field("transport", &transport);
274            s.field("capabilities", &inner.capabilities);
275        } else {
276            s.field("ehlo_domain", &"<locked>");
277            s.field("transport", &"<locked>");
278            s.field("capabilities", &"<locked>");
279        }
280
281        s.finish_non_exhaustive()
282    }
283}