daaki_smtp/connection/lmtp.rs
1//! LMTP sending and LMTP BDAT (RFC 2033 Section 4.2 + RFC 3030 Section 3).
2//!
3//! Per-recipient response handling for both DATA and BDAT paths.
4
5#[allow(clippy::wildcard_imports)]
6use super::*;
7
8impl SmtpConnection {
9 // -----------------------------------------------------------------------
10 // Sending — LMTP (RFC 2033)
11 // -----------------------------------------------------------------------
12
13 /// Send a message via LMTP and return per-recipient results.
14 ///
15 /// LMTP (RFC 2033 Section 4.2) differs from SMTP in that the server
16 /// returns one response per accepted recipient after the DATA terminator,
17 /// rather than a single aggregate response.
18 ///
19 /// Accepts optional [`MailFromParams`](crate::types::MailFromParams) to include ESMTP parameters
20 /// such as `BODY=8BITMIME` (RFC 1652 Section 3) and `SMTPUTF8`
21 /// (RFC 6531 Section 3.4). The SIZE parameter is always included
22 /// automatically when the server advertises the SIZE extension
23 /// (RFC 1870 Section 3).
24 ///
25 /// Addresses are pre-validated by their type constructors
26 /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
27 /// Section 4.1.2.
28 ///
29 /// Returns `Error::Protocol` if this connection is not LMTP.
30 #[allow(clippy::significant_drop_tightening)]
31 pub async fn send_lmtp(
32 &self,
33 from: &ReversePath,
34 recipients: &[ForwardPath],
35 message: &[u8],
36 params: Option<&crate::types::MailFromParams>,
37 timeout: Duration,
38 ) -> Result<crate::types::LmtpSendResult, Error> {
39 if self.protocol != Protocol::Lmtp {
40 return Err(Error::Protocol(
41 "send_lmtp requires an LMTP connection (RFC 2033)".into(),
42 ));
43 }
44 let mut inner = self.inner.lock().await;
45 // RFC 5321 Section 3.8: after a 421 response the server will close
46 // the transmission channel. Fail immediately.
47 if inner.server_shutting_down {
48 return Err(Error::Protocol(
49 "connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
50 ));
51 }
52 let effective_mail_params = Some(Self::effective_mail_from_params(
53 &inner.capabilities,
54 from,
55 recipients,
56 message,
57 params,
58 )?);
59 Self::validate_send_addresses(from, recipients)?;
60 let message_size = Self::validate_data_prerequisites(
61 &inner.capabilities,
62 message,
63 effective_mail_params.as_ref(),
64 inner.stream.is_tls(),
65 )?;
66 tokio::time::timeout(timeout, async {
67 Self::send_lmtp_inner(
68 &mut inner,
69 from,
70 recipients,
71 message,
72 message_size,
73 effective_mail_params.as_ref(),
74 None,
75 )
76 .await
77 })
78 .await
79 .map_err(|_| Error::Timeout)?
80 }
81
82 /// Send a message via LMTP with both MAIL FROM and per-recipient
83 /// RCPT TO parameters, returning per-recipient results.
84 ///
85 /// Like [`send_lmtp`](Self::send_lmtp) but also accepts per-recipient
86 /// [`RcptToParams`](crate::types::RcptToParams) to include DSN parameters (NOTIFY, ORCPT) on each
87 /// RCPT TO command (RFC 3461 Sections 4.1–4.2).
88 ///
89 /// `rcpt_params` must have the same length as `recipients` — each
90 /// entry is paired by index. Returns `Error::Protocol` if the
91 /// lengths do not match.
92 ///
93 /// Addresses are pre-validated by their type constructors
94 /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
95 /// Section 4.1.2.
96 ///
97 /// Returns `Error::Protocol` if this connection is not LMTP.
98 #[allow(clippy::significant_drop_tightening)]
99 pub async fn send_lmtp_with_all_params(
100 &self,
101 from: &ReversePath,
102 recipients: &[ForwardPath],
103 message: &[u8],
104 mail_params: Option<&crate::types::MailFromParams>,
105 rcpt_params: &[crate::types::RcptToParams],
106 timeout: Duration,
107 ) -> Result<crate::types::LmtpSendResult, Error> {
108 // RFC 3461 Sections 4.1–4.2: each recipient must have a
109 // corresponding RcptToParams entry.
110 if recipients.len() != rcpt_params.len() {
111 return Err(Error::Protocol(format!(
112 "rcpt_params length ({}) must match recipients length ({}) \
113 (RFC 3461 Sections 4.1–4.2)",
114 rcpt_params.len(),
115 recipients.len(),
116 )));
117 }
118 if self.protocol != Protocol::Lmtp {
119 return Err(Error::Protocol(
120 "send_lmtp_with_all_params requires an LMTP connection (RFC 2033)".into(),
121 ));
122 }
123 let mut inner = self.inner.lock().await;
124 // RFC 5321 Section 3.8: after a 421 response the server will close
125 // the transmission channel. Fail immediately.
126 if inner.server_shutting_down {
127 return Err(Error::Protocol(
128 "connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
129 ));
130 }
131 let effective_mail_params = Some(Self::effective_mail_from_params(
132 &inner.capabilities,
133 from,
134 recipients,
135 message,
136 mail_params,
137 )?);
138 Self::validate_send_addresses(from, recipients)?;
139 Self::validate_rcpt_params(&inner.capabilities, rcpt_params)?;
140 let message_size = Self::validate_data_prerequisites(
141 &inner.capabilities,
142 message,
143 effective_mail_params.as_ref(),
144 inner.stream.is_tls(),
145 )?;
146 tokio::time::timeout(timeout, async {
147 Self::send_lmtp_inner(
148 &mut inner,
149 from,
150 recipients,
151 message,
152 message_size,
153 effective_mail_params.as_ref(),
154 Some(rcpt_params),
155 )
156 .await
157 })
158 .await
159 .map_err(|_| Error::Timeout)?
160 }
161
162 /// Inner LMTP send logic (RFC 2033 Section 4.2).
163 ///
164 /// When `rcpt_params` is `Some`, per-recipient DSN parameters are
165 /// included on each RCPT TO command (RFC 3461 Sections 4.1–4.2).
166 async fn send_lmtp_inner(
167 inner: &mut SmtpInner,
168 from: &ReversePath,
169 recipients: &[ForwardPath],
170 message: &[u8],
171 message_size: usize,
172 params: Option<&crate::types::MailFromParams>,
173 rcpt_params: Option<&[crate::types::RcptToParams]>,
174 ) -> Result<crate::types::LmtpSendResult, Error> {
175 Self::send_mail_from(inner, from, message, message_size, params).await?;
176 // RFC 2033 Section 4.2 / RFC 5321 Section 3.3: capture both accepted
177 // and rejected recipients so callers have full visibility.
178 let (accepted_recipients, rejected) =
179 Self::send_rcpt_to_batch(inner, recipients, rcpt_params).await?;
180
181 Self::send_data_body(inner, message).await?;
182 // LMTP: one response per accepted recipient (RFC 2033 Section 4.2).
183 let results = Self::collect_lmtp_results(inner, accepted_recipients).await?;
184 Ok(crate::types::LmtpSendResult {
185 results,
186 rejected_recipients: rejected,
187 })
188 }
189
190 // -----------------------------------------------------------------------
191 // Sending — LMTP BDAT (RFC 3030 §3 + RFC 2033 §4.2)
192 // -----------------------------------------------------------------------
193
194 /// Send a message via LMTP using BDAT chunking and return per-recipient results.
195 ///
196 /// Combines LMTP per-recipient response handling (RFC 2033 Section 4.2)
197 /// with BDAT chunking (RFC 3030 Section 3). Unlike DATA, BDAT does not
198 /// require dot-stuffing, making it suitable for binary content.
199 ///
200 /// Addresses are pre-validated by their type constructors
201 /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
202 /// Section 4.1.2.
203 ///
204 /// The server must advertise CHUNKING (RFC 3030). Returns
205 /// `Error::Protocol` if this is not an LMTP connection or if the server
206 /// does not support CHUNKING.
207 #[allow(clippy::significant_drop_tightening)]
208 pub async fn send_lmtp_bdat(
209 &self,
210 from: &ReversePath,
211 recipients: &[ForwardPath],
212 message: &[u8],
213 params: Option<&crate::types::MailFromParams>,
214 timeout: Duration,
215 ) -> Result<crate::types::LmtpSendResult, Error> {
216 // RFC 2033: LMTP connection required.
217 if self.protocol != Protocol::Lmtp {
218 return Err(Error::Protocol(
219 "send_lmtp_bdat requires an LMTP connection (RFC 2033)".into(),
220 ));
221 }
222 let mut inner = self.inner.lock().await;
223 // RFC 5321 Section 3.8: after a 421 response the server will close
224 // the transmission channel. Fail immediately.
225 if inner.server_shutting_down {
226 return Err(Error::Protocol(
227 "connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
228 ));
229 }
230 let effective_mail_params = Some(Self::effective_mail_from_params(
231 &inner.capabilities,
232 from,
233 recipients,
234 message,
235 params,
236 )?);
237 Self::validate_send_addresses(from, recipients)?;
238 Self::validate_bdat_prerequisites(
239 &inner.capabilities,
240 message,
241 effective_mail_params.as_ref(),
242 inner.stream.is_tls(),
243 )?;
244 tokio::time::timeout(timeout, async {
245 Self::send_lmtp_bdat_inner(
246 &mut inner,
247 from,
248 recipients,
249 message,
250 effective_mail_params.as_ref(),
251 None,
252 )
253 .await
254 })
255 .await
256 .map_err(|_| Error::Timeout)?
257 }
258
259 /// Send a message via LMTP using BDAT chunking with both MAIL FROM
260 /// and per-recipient RCPT TO parameters, returning per-recipient results.
261 ///
262 /// Like [`send_lmtp_bdat`](Self::send_lmtp_bdat) but also accepts
263 /// per-recipient [`RcptToParams`](crate::types::RcptToParams) to include DSN parameters (NOTIFY,
264 /// ORCPT) on each RCPT TO command (RFC 3461 Sections 4.1–4.2).
265 ///
266 /// `rcpt_params` must have the same length as `recipients` — each
267 /// entry is paired by index. Returns `Error::Protocol` if the
268 /// lengths do not match.
269 ///
270 /// Addresses are pre-validated by their type constructors
271 /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
272 /// Section 4.1.2.
273 ///
274 /// The server must advertise CHUNKING (RFC 3030). Returns
275 /// `Error::Protocol` if this is not an LMTP connection or if the
276 /// server does not support CHUNKING.
277 #[allow(clippy::significant_drop_tightening)]
278 pub async fn send_lmtp_bdat_with_all_params(
279 &self,
280 from: &ReversePath,
281 recipients: &[ForwardPath],
282 message: &[u8],
283 mail_params: Option<&crate::types::MailFromParams>,
284 rcpt_params: &[crate::types::RcptToParams],
285 timeout: Duration,
286 ) -> Result<crate::types::LmtpSendResult, Error> {
287 // RFC 3461 Sections 4.1–4.2: each recipient must have a
288 // corresponding RcptToParams entry.
289 if recipients.len() != rcpt_params.len() {
290 return Err(Error::Protocol(format!(
291 "rcpt_params length ({}) must match recipients length ({}) \
292 (RFC 3461 Sections 4.1–4.2)",
293 rcpt_params.len(),
294 recipients.len(),
295 )));
296 }
297 // RFC 2033: LMTP connection required.
298 if self.protocol != Protocol::Lmtp {
299 return Err(Error::Protocol(
300 "send_lmtp_bdat_with_all_params requires an LMTP connection (RFC 2033)".into(),
301 ));
302 }
303 let mut inner = self.inner.lock().await;
304 // RFC 5321 Section 3.8: after a 421 response the server will close
305 // the transmission channel. Fail immediately.
306 if inner.server_shutting_down {
307 return Err(Error::Protocol(
308 "connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
309 ));
310 }
311 let effective_mail_params = Some(Self::effective_mail_from_params(
312 &inner.capabilities,
313 from,
314 recipients,
315 message,
316 mail_params,
317 )?);
318 Self::validate_send_addresses(from, recipients)?;
319 Self::validate_rcpt_params(&inner.capabilities, rcpt_params)?;
320 Self::validate_bdat_prerequisites(
321 &inner.capabilities,
322 message,
323 effective_mail_params.as_ref(),
324 inner.stream.is_tls(),
325 )?;
326 tokio::time::timeout(timeout, async {
327 Self::send_lmtp_bdat_inner(
328 &mut inner,
329 from,
330 recipients,
331 message,
332 effective_mail_params.as_ref(),
333 Some(rcpt_params),
334 )
335 .await
336 })
337 .await
338 .map_err(|_| Error::Timeout)?
339 }
340
341 /// Inner LMTP BDAT send logic (RFC 3030 Section 3 + RFC 2033 Section 4.2).
342 ///
343 /// When `rcpt_params` is `Some`, per-recipient DSN parameters are
344 /// included on each RCPT TO command (RFC 3461 Sections 4.1–4.2).
345 async fn send_lmtp_bdat_inner(
346 inner: &mut SmtpInner,
347 from: &ReversePath,
348 recipients: &[ForwardPath],
349 message: &[u8],
350 params: Option<&crate::types::MailFromParams>,
351 rcpt_params: Option<&[crate::types::RcptToParams]>,
352 ) -> Result<crate::types::LmtpSendResult, Error> {
353 // RFC 2033 Section 4.2 / RFC 5321 Section 3.3: capture both accepted
354 // and rejected recipients so callers have full visibility.
355 let (accepted, rejected) =
356 Self::send_bdat_envelope(inner, from, recipients, message, params, rcpt_params).await?;
357
358 // LMTP: one response per accepted recipient after BDAT LAST
359 // (RFC 2033 Section 4.2).
360 let results = Self::collect_lmtp_results(inner, accepted).await?;
361 Ok(crate::types::LmtpSendResult {
362 results,
363 rejected_recipients: rejected,
364 })
365 }
366
367 /// Send the DATA command, dot-stuff the message body, and write the
368 /// terminator (RFC 5321 Sections 4.1.1.4 / 4.5.2).
369 ///
370 /// On return the message body has been sent, but the final response(s)
371 /// have NOT been read — the caller must read them (SMTP: one response;
372 /// LMTP: one per accepted recipient per RFC 2033 Section 4.2).
373 pub(super) async fn send_data_body(inner: &mut SmtpInner, message: &[u8]) -> Result<(), Error> {
374 // DATA (RFC 5321 Section 4.1.1.4).
375 let mut buf = BytesMut::new();
376 encode::encode_data(&mut buf);
377 inner.write_all(&buf).await?;
378 let resp = inner.read_response().await?;
379 // RFC 5321 Section 4.1.1.4: 354 is the only valid intermediate
380 // response to DATA. Any other code means DATA was rejected.
381 if resp.code != 354 {
382 // RFC 5321 Section 3.3: DATA was rejected but the mail
383 // transaction (MAIL FROM) is still open. RSET to clean up.
384 inner.rset_best_effort().await;
385 return Err(Self::response_to_error(resp));
386 }
387
388 // Send dot-stuffed message body + terminator in a single write
389 // (RFC 5321 Section 4.5.2 / Section 4.1.1.4).
390 let body = encode::dot_stuff_and_terminate(message);
391 inner.write_all(&body).await?;
392 Ok(())
393 }
394
395 /// Collect per-recipient LMTP responses (RFC 2033 Section 4.2).
396 ///
397 /// After DATA or BDAT LAST, LMTP returns one response per accepted
398 /// recipient. This helper reads them all and pairs each with its
399 /// recipient address.
400 pub(super) async fn collect_lmtp_results(
401 inner: &mut SmtpInner,
402 accepted_recipients: Vec<ForwardPath>,
403 ) -> Result<Vec<RecipientResult>, Error> {
404 let mut results = Vec::with_capacity(accepted_recipients.len());
405 for fp in accepted_recipients {
406 let resp = inner.read_response().await?;
407 results.push(RecipientResult {
408 recipient: fp,
409 response: resp,
410 });
411 }
412 Ok(results)
413 }
414}