daaki_smtp/connection/mod.rs
1//! SMTP/LMTP client connection.
2//!
3//! `SmtpConnection` handles the lifecycle of a single SMTP (RFC 5321) or
4//! LMTP (RFC 2033) session: connect, EHLO/LHLO, optional STARTTLS,
5//! authenticate, and send messages. Supports pipelining (RFC 1854) when
6//! the server advertises it.
7//!
8//! All public methods take `&self`. Mutable state is protected by an
9//! internal `tokio::sync::Mutex` (see [`SmtpInner`]), allowing the
10//! connection to be shared across async tasks via `Arc<SmtpConnection>`.
11//! The mutex guard is intentionally held for the duration of each SMTP
12//! operation because SMTP is a serial protocol (RFC 5321 Section 3.1).
13use std::sync::Arc;
14use std::time::{Duration, SystemTime, UNIX_EPOCH};
15
16use bytes::BytesMut;
17use tokio::io::{AsyncReadExt, AsyncWriteExt};
18use tokio::net::TcpStream;
19use tokio_rustls::client::TlsStream;
20use tokio_rustls::TlsConnector;
21
22use crate::codec::{decode, encode};
23use crate::deliver_by::validate_deliver_by_value;
24use crate::error::Error;
25use crate::future_release::{
26 parse_rfc3339_to_utc_key, validate_hold_for_seconds, validate_hold_until_datetime,
27};
28use crate::types::{
29 DomainOrLiteral, EnhancedStatusCode, ForwardPath, Protocol, RecipientResult, ReversePath,
30 ServerCapabilities, SmtpResponse,
31};
32
33mod auth;
34mod bdat;
35mod helpers;
36mod lifecycle;
37mod lmtp;
38mod sending;
39mod session;
40
41#[cfg(test)]
42#[allow(
43 clippy::unwrap_used,
44 clippy::expect_used,
45 clippy::similar_names,
46 clippy::wildcard_in_or_patterns,
47 clippy::items_after_statements,
48 clippy::manual_let_else,
49 clippy::match_wild_err_arm
50)]
51#[path = "tests.rs"]
52mod tests;
53
54pub use daaki_message::TlsMode;
55
56/// Abstraction over plain TCP and TLS streams.
57enum SmtpStream {
58 /// Unencrypted TCP connection.
59 Plain(TcpStream),
60 /// TLS-encrypted connection (boxed to avoid large enum variant).
61 Tls(Box<TlsStream<TcpStream>>),
62}
63
64impl SmtpStream {
65 /// Write all bytes to the stream.
66 async fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
67 match self {
68 Self::Plain(s) => s.write_all(buf).await,
69 Self::Tls(s) => s.write_all(buf).await,
70 }
71 }
72
73 /// Flush the stream.
74 async fn flush(&mut self) -> std::io::Result<()> {
75 match self {
76 Self::Plain(s) => s.flush().await,
77 Self::Tls(s) => s.flush().await,
78 }
79 }
80
81 /// Read bytes into buffer, returning count read.
82 async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
83 match self {
84 Self::Plain(s) => s.read(buf).await,
85 Self::Tls(s) => s.read(buf).await,
86 }
87 }
88
89 /// Returns `true` if the connection is TLS-encrypted.
90 ///
91 /// RFC 8689 Section 3: REQUIRETLS requires the session to be
92 /// TLS-protected before the parameter may be used.
93 fn is_tls(&self) -> bool {
94 matches!(self, Self::Tls(_))
95 }
96}
97
98/// Mutable state of an SMTP/LMTP connection, protected by a
99/// `tokio::sync::Mutex` inside [`SmtpConnection`].
100///
101/// SMTP is a serial protocol (RFC 5321 Section 3.1): commands and
102/// responses are strictly ordered, so concurrent access must be
103/// serialised. Wrapping the mutable state in an async mutex allows
104/// callers to share an `SmtpConnection` across tasks (e.g. via
105/// `Arc<SmtpConnection>`) without an external lock.
106struct SmtpInner {
107 /// The underlying TCP or TLS stream.
108 stream: SmtpStream,
109 /// Read buffer for incoming data.
110 read_buf: BytesMut,
111 /// Server-advertised capabilities from the last EHLO/LHLO.
112 capabilities: ServerCapabilities,
113 /// The client domain sent in EHLO/LHLO (RFC 5321 Section 4.1.1.1).
114 ///
115 /// Defaults to the address-literal `"[127.0.0.1]"`. RFC 5321 Section
116 /// 4.1.4 says clients that cannot determine their own FQDN SHOULD
117 /// substitute an address-literal. Pre-validated by [`DomainOrLiteral`]
118 /// on construction.
119 ehlo_domain: DomainOrLiteral,
120 /// Whether a successful AUTH exchange has completed in this session.
121 ///
122 /// RFC 4954 Section 3: "After an AUTH command has been successfully
123 /// completed, no more AUTH commands may be issued in the same session."
124 /// This flag is set to `true` after a 235 response and persists for the
125 /// lifetime of the connection (RSET/EHLO do not reset it — the session
126 /// continues until QUIT or connection closure).
127 authenticated: bool,
128 /// Whether the server has indicated it is shutting down via a 421 response.
129 ///
130 /// RFC 5321 Section 3.8: "If the SMTP code is 421, the server MUST close
131 /// the transmission channel after sending the response." Once set, the
132 /// client must not attempt further commands on this connection.
133 server_shutting_down: bool,
134 /// Whether the initial EHLO was rejected and the connection fell back to
135 /// HELO (RFC 5321 Section 4.1.4). When `true`, subsequent `ehlo_on_inner`
136 /// calls (e.g. via `rehlo`) must send HELO instead of EHLO, because the
137 /// server has already indicated it does not support ESMTP.
138 helo_mode: bool,
139}
140
141impl SmtpInner {
142 /// Write bytes to the stream and flush.
143 async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
144 self.stream.write_all(buf).await?;
145 self.stream.flush().await?;
146 Ok(())
147 }
148
149 /// Read a complete SMTP response from the stream.
150 ///
151 /// Accumulates lines until a final line (code followed by SP) is seen,
152 /// per RFC 5321 Section 4.2.
153 async fn read_response(&mut self) -> Result<SmtpResponse, Error> {
154 loop {
155 // Try to parse a complete response from the buffer.
156 if let Some((resp, consumed)) = SmtpConnection::try_parse_response(&self.read_buf)? {
157 let _ = self.read_buf.split_to(consumed);
158 // RFC 5321 Section 3.8: a 421 reply means the server will
159 // close the transmission channel. Mark the connection as
160 // shutting down so subsequent operations fail immediately.
161 if resp.code == 421 {
162 self.server_shutting_down = true;
163 }
164 return Ok(resp);
165 }
166
167 // RFC 5321 Section 4.5.3.1.5: enforce a response buffer limit
168 // to prevent a malicious server from exhausting client memory.
169 if self.read_buf.len() > SmtpConnection::MAX_RESPONSE_BUFFER {
170 return Err(Error::Protocol(format!(
171 "response exceeds maximum buffer size ({} bytes) \
172 (RFC 5321 Section 4.5.3.1.5)",
173 SmtpConnection::MAX_RESPONSE_BUFFER
174 )));
175 }
176
177 // Need more data.
178 let mut tmp = [0u8; 4096];
179 let n = self.stream.read(&mut tmp).await?;
180 if n == 0 {
181 return Err(Error::Closed);
182 }
183 self.read_buf.extend_from_slice(&tmp[..n]);
184 }
185 }
186
187 /// Best-effort RSET to abort the current mail transaction.
188 ///
189 /// RFC 5321 Section 4.1.1.5: RSET aborts the current mail transaction,
190 /// clearing the reverse-path, forward-path, and mail data buffers.
191 /// Errors are intentionally ignored — the caller has a more meaningful
192 /// error to return.
193 async fn rset_best_effort(&mut self) {
194 let mut buf = BytesMut::new();
195 encode::encode_rset(&mut buf);
196 if self.write_all(&buf).await.is_ok() {
197 let _ = self.read_response().await;
198 }
199 }
200
201 /// Best-effort QUIT to close the session gracefully.
202 ///
203 /// RFC 5321 Section 4.1.1.10: "The sender MUST NOT intentionally close
204 /// the transmission channel until it sends a QUIT command." This helper
205 /// is used in error paths during connection setup where the 220 greeting
206 /// has already established a session but a subsequent step (EHLO,
207 /// STARTTLS, etc.) has failed. Errors are intentionally ignored — the
208 /// caller has a more meaningful error to return.
209 async fn quit_best_effort(&mut self) {
210 let mut buf = BytesMut::new();
211 encode::encode_quit(&mut buf);
212 if self.write_all(&buf).await.is_ok() {
213 let _ = self.read_response().await;
214 }
215 }
216}
217
218/// Return the default EHLO identity for clients that have no better local name.
219///
220/// RFC 5321 Section 4.1.4 says clients that cannot determine a more
221/// specific host identity SHOULD substitute an address-literal.
222pub(super) fn default_ehlo_domain() -> Result<DomainOrLiteral, Error> {
223 DomainOrLiteral::new("[127.0.0.1]")
224 .map_err(|err| Error::Protocol(format!("invalid default EHLO domain: {err}")))
225}
226
227/// An SMTP/LMTP client connection.
228///
229/// Manages a single TCP (or TLS) connection to an SMTP or LMTP server.
230/// The caller provides a timeout for each operation — there are no hardcoded
231/// defaults.
232///
233/// All methods take `&self` rather than `&mut self`. Mutable state (the
234/// TCP stream, read buffer, and capabilities) is protected by an internal
235/// `tokio::sync::Mutex`, so the connection can be shared across async
236/// tasks via `Arc<SmtpConnection>` without an external lock. SMTP is a
237/// serial protocol (RFC 5321 Section 3.1), so the mutex serialises
238/// operations correctly.
239///
240/// Message construction is NOT this library's responsibility. The caller
241/// provides fully-formed RFC 5322 message bytes.
242pub struct SmtpConnection {
243 /// Mutable connection state, protected by an async mutex.
244 inner: tokio::sync::Mutex<SmtpInner>,
245 /// Protocol mode — SMTP or LMTP.
246 protocol: Protocol,
247}
248
249// Compile-time proof that SmtpConnection is Send + Sync — required for
250// sharing via `Arc<SmtpConnection>` across async tasks.
251const _: fn() = || {
252 fn assert_send_sync<T: Send + Sync>() {}
253 assert_send_sync::<SmtpConnection>();
254};
255
256impl std::fmt::Debug for SmtpConnection {
257 /// Prints connection metadata useful for logging and diagnostics,
258 /// without exposing internal stream state or buffers.
259 ///
260 /// Uses `try_lock()` to inspect the inner state without blocking.
261 /// If the mutex is currently held by another task, the transport
262 /// and capabilities fields show `"<locked>"` as a fallback.
263 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264 let mut s = f.debug_struct("SmtpConnection");
265 s.field("protocol", &self.protocol);
266
267 if let Ok(inner) = self.inner.try_lock() {
268 s.field("ehlo_domain", &inner.ehlo_domain);
269 let transport = match &inner.stream {
270 SmtpStream::Plain(_) => "plain",
271 SmtpStream::Tls(_) => "tls",
272 };
273 s.field("transport", &transport);
274 s.field("capabilities", &inner.capabilities);
275 } else {
276 s.field("ehlo_domain", &"<locked>");
277 s.field("transport", &"<locked>");
278 s.field("capabilities", &"<locked>");
279 }
280
281 s.finish_non_exhaustive()
282 }
283}