daaki_smtp/connection/bdat.rs
1//! BDAT/CHUNKING sending (RFC 3030 Section 3).
2//!
3//! Provides `send_bdat` and `send_bdat_with_all_params` for SMTP
4//! connections using the CHUNKING extension.
5
6#[allow(clippy::wildcard_imports)]
7use super::*;
8
9impl SmtpConnection {
10 // -----------------------------------------------------------------------
11 // Sending — BDAT/CHUNKING (RFC 3030)
12 // -----------------------------------------------------------------------
13
14 /// Send a message using BDAT chunking (RFC 3030 Section 3).
15 ///
16 /// Uses the BDAT command instead of DATA, which avoids dot-stuffing
17 /// and allows transmission of binary content. The server must
18 /// advertise the CHUNKING extension (RFC 3030).
19 ///
20 /// Performs the full MAIL FROM / RCPT TO / BDAT sequence.
21 /// The message is sent as a single BDAT chunk with the LAST flag.
22 ///
23 /// Accepts optional [`MailFromParams`](crate::types::MailFromParams) to include ESMTP parameters
24 /// such as `BODY=BINARYMIME` (RFC 3030 Section 2) and `SMTPUTF8`
25 /// (RFC 6531 Section 3.4). The SIZE parameter is always included
26 /// automatically when the server advertises the SIZE extension
27 /// (RFC 1870 Section 3).
28 ///
29 /// Addresses are pre-validated by their type constructors
30 /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
31 /// Section 4.1.2.
32 ///
33 /// Returns `Error::Protocol` if the server does not support CHUNKING.
34 #[allow(clippy::significant_drop_tightening)]
35 pub async fn send_bdat(
36 &self,
37 from: &ReversePath,
38 recipients: &[ForwardPath],
39 message: &[u8],
40 params: Option<&crate::types::MailFromParams>,
41 timeout: Duration,
42 ) -> Result<crate::types::SendResult, Error> {
43 // RFC 3030 Section 3 + RFC 2033 Section 4.2: In LMTP, the server
44 // returns one response per accepted recipient after BDAT LAST.
45 // send_bdat reads a single response and cannot handle per-recipient
46 // results. Use send_lmtp_bdat for LMTP BDAT connections.
47 if self.protocol == Protocol::Lmtp {
48 return Err(Error::Protocol(
49 "send_bdat does not support LMTP per-recipient responses \
50 (RFC 3030 Section 3 / RFC 2033 Section 4.2); \
51 LMTP connections must use send_lmtp or send_lmtp_bdat"
52 .into(),
53 ));
54 }
55 let mut inner = self.inner.lock().await;
56 // RFC 5321 Section 3.8: after a 421 response the server will close
57 // the transmission channel. Fail immediately.
58 if inner.server_shutting_down {
59 return Err(Error::Protocol(
60 "connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
61 ));
62 }
63 let effective_mail_params = Some(Self::effective_mail_from_params(
64 &inner.capabilities,
65 from,
66 recipients,
67 message,
68 params,
69 )?);
70 Self::validate_send_addresses(from, recipients)?;
71 Self::validate_bdat_prerequisites(
72 &inner.capabilities,
73 message,
74 effective_mail_params.as_ref(),
75 inner.stream.is_tls(),
76 )?;
77 tokio::time::timeout(timeout, async {
78 Self::send_bdat_inner(
79 &mut inner,
80 from,
81 recipients,
82 message,
83 effective_mail_params.as_ref(),
84 None,
85 )
86 .await
87 })
88 .await
89 .map_err(|_| Error::Timeout)?
90 }
91
92 /// Send a message using BDAT chunking with both MAIL FROM and
93 /// per-recipient RCPT TO parameters (RFC 3030 Section 3).
94 ///
95 /// Like [`send_bdat`](Self::send_bdat) but also accepts per-recipient
96 /// [`RcptToParams`](crate::types::RcptToParams) to include DSN parameters (NOTIFY, ORCPT) on each
97 /// RCPT TO command (RFC 3461 Sections 4.1–4.2).
98 ///
99 /// `rcpt_params` must have the same length as `recipients` — each
100 /// entry is paired by index. Returns `Error::Protocol` if the
101 /// lengths do not match.
102 ///
103 /// Addresses are pre-validated by their type constructors
104 /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
105 /// Section 4.1.2.
106 ///
107 /// The server must advertise CHUNKING (RFC 3030). Returns
108 /// `Error::Protocol` if the server does not support it.
109 #[allow(clippy::significant_drop_tightening)]
110 pub async fn send_bdat_with_all_params(
111 &self,
112 from: &ReversePath,
113 recipients: &[ForwardPath],
114 message: &[u8],
115 mail_params: Option<&crate::types::MailFromParams>,
116 rcpt_params: &[crate::types::RcptToParams],
117 timeout: Duration,
118 ) -> Result<crate::types::SendResult, Error> {
119 // RFC 3461 Sections 4.1–4.2: each recipient must have a
120 // corresponding RcptToParams entry.
121 if recipients.len() != rcpt_params.len() {
122 return Err(Error::Protocol(format!(
123 "rcpt_params length ({}) must match recipients length ({}) \
124 (RFC 3461 Sections 4.1–4.2)",
125 rcpt_params.len(),
126 recipients.len(),
127 )));
128 }
129 // RFC 2033 Section 4.2: LMTP returns per-recipient responses;
130 // use send_lmtp_bdat_with_all_params instead.
131 if self.protocol == Protocol::Lmtp {
132 return Err(Error::Protocol(
133 "send_bdat_with_all_params does not support LMTP per-recipient \
134 responses (RFC 3030 Section 3 / RFC 2033 Section 4.2); \
135 LMTP connections must use send_lmtp_bdat_with_all_params"
136 .into(),
137 ));
138 }
139 let mut inner = self.inner.lock().await;
140 // RFC 5321 Section 3.8: after a 421 response the server will close
141 // the transmission channel. Fail immediately.
142 if inner.server_shutting_down {
143 return Err(Error::Protocol(
144 "connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
145 ));
146 }
147 let effective_mail_params = Some(Self::effective_mail_from_params(
148 &inner.capabilities,
149 from,
150 recipients,
151 message,
152 mail_params,
153 )?);
154 Self::validate_send_addresses(from, recipients)?;
155 Self::validate_rcpt_params(&inner.capabilities, rcpt_params)?;
156 Self::validate_bdat_prerequisites(
157 &inner.capabilities,
158 message,
159 effective_mail_params.as_ref(),
160 inner.stream.is_tls(),
161 )?;
162 tokio::time::timeout(timeout, async {
163 Self::send_bdat_inner(
164 &mut inner,
165 from,
166 recipients,
167 message,
168 effective_mail_params.as_ref(),
169 Some(rcpt_params),
170 )
171 .await
172 })
173 .await
174 .map_err(|_| Error::Timeout)?
175 }
176
177 /// Send the MAIL FROM -> RCPT TO -> BDAT LAST sequence shared by
178 /// SMTP and LMTP BDAT paths (RFC 3030 Section 3).
179 ///
180 /// When `rcpt_params` is `Some`, per-recipient DSN parameters are
181 /// included on each RCPT TO command (RFC 3461 Sections 4.1–4.2).
182 ///
183 /// Returns the list of accepted recipients and rejected recipients so the
184 /// caller can handle protocol-specific response logic (SMTP: single
185 /// response; LMTP: one response per accepted recipient per RFC 2033
186 /// Section 4.2) and surface partial rejections (RFC 5321 Section 3.3).
187 pub(super) async fn send_bdat_envelope(
188 inner: &mut SmtpInner,
189 from: &ReversePath,
190 recipients: &[ForwardPath],
191 message: &[u8],
192 params: Option<&crate::types::MailFromParams>,
193 rcpt_params: Option<&[crate::types::RcptToParams]>,
194 ) -> Result<(Vec<ForwardPath>, Vec<crate::types::RejectedRecipient>), Error> {
195 Self::send_mail_from(inner, from, message, message.len(), params).await?;
196 let (accepted, rejected) = Self::send_rcpt_to_batch(inner, recipients, rcpt_params).await?;
197
198 // BDAT <size> LAST — send the entire message as a single chunk
199 // (RFC 3030 Section 3). No dot-stuffing required.
200 let mut buf = BytesMut::new();
201 encode::encode_bdat(&mut buf, message.len(), true);
202 inner.write_all(&buf).await?;
203 inner.write_all(message).await?;
204
205 Ok((accepted, rejected))
206 }
207
208 /// Inner BDAT send logic (RFC 3030 Section 3).
209 ///
210 /// When the server supports PIPELINING (RFC 1854), batches MAIL FROM,
211 /// all RCPT TOs, and BDAT LAST + message data into a single write for
212 /// better throughput — matching the DATA path's pipelining behavior.
213 /// Falls back to sequential command/response for non-pipelining servers.
214 ///
215 /// Returns a [`SendResult`] containing any rejected recipients
216 /// (RFC 5321 Section 3.3).
217 async fn send_bdat_inner(
218 inner: &mut SmtpInner,
219 from: &ReversePath,
220 recipients: &[ForwardPath],
221 message: &[u8],
222 params: Option<&crate::types::MailFromParams>,
223 rcpt_params: Option<&[crate::types::RcptToParams]>,
224 ) -> Result<crate::types::SendResult, Error> {
225 if inner.capabilities.supports_pipelining() {
226 Self::send_bdat_pipelined(inner, from, recipients, message, params, rcpt_params).await
227 } else {
228 Self::send_bdat_sequential(inner, from, recipients, message, params, rcpt_params).await
229 }
230 }
231
232 /// Sequential BDAT send — issues each command and waits for its
233 /// response before sending the next (RFC 3030 Section 3).
234 ///
235 /// Returns a [`SendResult`] containing any rejected recipients
236 /// (RFC 5321 Section 3.3).
237 async fn send_bdat_sequential(
238 inner: &mut SmtpInner,
239 from: &ReversePath,
240 recipients: &[ForwardPath],
241 message: &[u8],
242 params: Option<&crate::types::MailFromParams>,
243 rcpt_params: Option<&[crate::types::RcptToParams]>,
244 ) -> Result<crate::types::SendResult, Error> {
245 let (_accepted, rejected) =
246 Self::send_bdat_envelope(inner, from, recipients, message, params, rcpt_params).await?;
247
248 // SMTP: single response after BDAT LAST (RFC 3030 Section 3).
249 let resp = inner.read_response().await?;
250 if !resp.is_success() {
251 // RFC 3030 Section 3: "The resulting state from a failed BDAT
252 // command is indeterminate. A RSET command MUST be issued to
253 // clear the transaction before additional commands may be sent."
254 inner.rset_best_effort().await;
255 return Err(Self::response_to_error(resp));
256 }
257
258 Ok(crate::types::SendResult {
259 rejected_recipients: rejected,
260 })
261 }
262
263 /// Pipelined BDAT send — batches MAIL FROM, all RCPT TOs, BDAT LAST,
264 /// and message data into a single write, then reads responses in order
265 /// (RFC 1854 Section 3, RFC 3030 Section 4.2).
266 ///
267 /// Unlike the DATA pipelining path, BDAT does not have an intermediate
268 /// 354 response — the server replies once after receiving the chunk data.
269 /// Per RFC 3030 Section 3, "the resulting state from a failed BDAT
270 /// command is indeterminate" and a RSET must be issued.
271 ///
272 /// When `rcpt_params` is `Some`, per-recipient DSN parameters are
273 /// included on each RCPT TO command (RFC 3461 Sections 4.1–4.2).
274 ///
275 /// Returns a [`SendResult`] containing any rejected recipients
276 /// (RFC 5321 Section 3.3).
277 #[allow(clippy::too_many_lines)]
278 async fn send_bdat_pipelined(
279 inner: &mut SmtpInner,
280 from: &ReversePath,
281 recipients: &[ForwardPath],
282 message: &[u8],
283 params: Option<&crate::types::MailFromParams>,
284 rcpt_params: Option<&[crate::types::RcptToParams]>,
285 ) -> Result<crate::types::SendResult, Error> {
286 // Build the pipeline: MAIL FROM + RCPT TO(s) + BDAT <size> LAST + message data.
287 let mut buf = BytesMut::new();
288 let is_8bit = Self::message_contains_8bit(message);
289 Self::encode_mail_from_cmd(
290 &inner.capabilities,
291 &mut buf,
292 from,
293 message.len(),
294 params,
295 is_8bit,
296 )?;
297 // RFC 5321 Section 4.5.3.1.4 / RFC 1870 Section 4 / RFC 6152
298 // Section 7 / RFC 6531 Section 3.4: MAIL FROM has an extended
299 // command line limit to accommodate ESMTP parameters.
300 Self::validate_mail_from_line_length(buf.len())?;
301
302 for (i, fp) in recipients.iter().enumerate() {
303 let start = buf.len();
304 if let Some(rp) = rcpt_params {
305 // RFC 3461 Sections 4.1–4.2: encode with DSN parameters.
306 encode::encode_rcpt_to_full(&mut buf, fp, &rp[i])?;
307 } else {
308 encode::encode_rcpt_to(&mut buf, fp)?;
309 }
310 // RFC 5321 Section 4.5.3.1.4 / RFC 3461 Section 5: validate
311 // each RCPT TO line, using the extended 1012-octet limit when
312 // DSN parameters are present.
313 let has_dsn = rcpt_params.is_some_and(|rp| !rp[i].is_empty());
314 Self::validate_rcpt_to_line_length(buf.len() - start, has_dsn)?;
315 }
316
317 // BDAT <size> LAST — appended to the pipeline buffer so the entire
318 // envelope + chunk header + message body is sent in one write
319 // (RFC 3030 Section 3, RFC 1854 Section 3).
320 encode::encode_bdat(&mut buf, message.len(), true);
321
322 // Append the raw message data — BDAT requires no dot-stuffing
323 // (RFC 3030 Section 3).
324 buf.extend_from_slice(message);
325
326 // Send all commands + message data at once (RFC 1854 Section 3).
327 inner.write_all(&buf).await?;
328
329 // Read responses: 1 MAIL FROM + N RCPT TOs + 1 BDAT LAST.
330
331 // MAIL FROM response (RFC 5321 Section 4.1.1.2).
332 let mail_resp = inner.read_response().await?;
333 if !mail_resp.is_success() {
334 tracing::debug!(code = mail_resp.code, "pipelined BDAT MAIL FROM rejected");
335 // Drain remaining responses: N RCPT TOs + 1 BDAT LAST.
336 // RFC 1854 Section 3: the server processes pipelined commands
337 // in order; we must read all remaining responses.
338 // RFC 3030 Section 3: after a failed BDAT, session state is
339 // indeterminate — RSET is required.
340 let drain_count = recipients.len() + 1;
341 for _ in 0..drain_count {
342 if inner.read_response().await.is_err() {
343 break;
344 }
345 }
346 // RFC 3030 Section 3: "The resulting state from a failed BDAT
347 // command is indeterminate. A RSET command MUST be issued."
348 inner.rset_best_effort().await;
349 return Err(Self::response_to_error(mail_resp));
350 }
351
352 // RCPT TO responses (RFC 5321 Section 4.1.1.3).
353 let mut rejected_recipients = Vec::new();
354 let mut accepted_count = 0usize;
355 for fp in recipients {
356 let resp = inner.read_response().await?;
357 if resp.is_success() {
358 accepted_count += 1;
359 } else {
360 tracing::debug!(
361 recipient = fp.as_str(),
362 code = resp.code,
363 "pipelined BDAT RCPT TO rejected"
364 );
365 rejected_recipients.push(crate::types::RejectedRecipient {
366 recipient: fp.clone(),
367 response: resp,
368 });
369 }
370 }
371
372 // BDAT LAST response (RFC 3030 Section 3).
373 let bdat_resp = inner.read_response().await?;
374
375 if accepted_count == 0 {
376 // All RCPT TOs failed. The server already received the BDAT data
377 // but should discard it since no recipients were accepted.
378 // RFC 3030 Section 3: indeterminate state — issue RSET.
379 inner.rset_best_effort().await;
380 return Err(Error::AllRecipientsFailed {
381 count: recipients.len(),
382 responses: rejected_recipients
383 .into_iter()
384 .map(|r| r.response)
385 .collect(),
386 });
387 }
388
389 if !bdat_resp.is_success() {
390 // RFC 3030 Section 3: "The resulting state from a failed BDAT
391 // command is indeterminate. A RSET command MUST be issued to
392 // clear the transaction before additional commands may be sent."
393 inner.rset_best_effort().await;
394 return Err(Self::response_to_error(bdat_resp));
395 }
396
397 Ok(crate::types::SendResult {
398 rejected_recipients,
399 })
400 }
401}