Skip to main content

daaki_smtp/connection/
bdat.rs

//! BDAT/CHUNKING sending (RFC 3030 Section 3).
//!
//! Provides `send_bdat` and `send_bdat_with_all_params` for SMTP
//! connections using the CHUNKING extension.
5
6#[allow(clippy::wildcard_imports)]
7use super::*;
8
9impl SmtpConnection {
10    // -----------------------------------------------------------------------
11    // Sending — BDAT/CHUNKING (RFC 3030)
12    // -----------------------------------------------------------------------
13
14    /// Send a message using BDAT chunking (RFC 3030 Section 3).
15    ///
16    /// Uses the BDAT command instead of DATA, which avoids dot-stuffing
17    /// and allows transmission of binary content. The server must
18    /// advertise the CHUNKING extension (RFC 3030).
19    ///
20    /// Performs the full MAIL FROM / RCPT TO / BDAT sequence.
21    /// The message is sent as a single BDAT chunk with the LAST flag.
22    ///
23    /// Accepts optional [`MailFromParams`](crate::types::MailFromParams) to include ESMTP parameters
24    /// such as `BODY=BINARYMIME` (RFC 3030 Section 2) and `SMTPUTF8`
25    /// (RFC 6531 Section 3.4). The SIZE parameter is always included
26    /// automatically when the server advertises the SIZE extension
27    /// (RFC 1870 Section 3).
28    ///
29    /// Addresses are pre-validated by their type constructors
30    /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
31    /// Section 4.1.2.
32    ///
33    /// Returns `Error::Protocol` if the server does not support CHUNKING.
34    #[allow(clippy::significant_drop_tightening)]
35    pub async fn send_bdat(
36        &self,
37        from: &ReversePath,
38        recipients: &[ForwardPath],
39        message: &[u8],
40        params: Option<&crate::types::MailFromParams>,
41        timeout: Duration,
42    ) -> Result<crate::types::SendResult, Error> {
43        // RFC 3030 Section 3 + RFC 2033 Section 4.2: In LMTP, the server
44        // returns one response per accepted recipient after BDAT LAST.
45        // send_bdat reads a single response and cannot handle per-recipient
46        // results. Use send_lmtp_bdat for LMTP BDAT connections.
47        if self.protocol == Protocol::Lmtp {
48            return Err(Error::Protocol(
49                "send_bdat does not support LMTP per-recipient responses \
50                 (RFC 3030 Section 3 / RFC 2033 Section 4.2); \
51                 LMTP connections must use send_lmtp or send_lmtp_bdat"
52                    .into(),
53            ));
54        }
55        let mut inner = self.inner.lock().await;
56        // RFC 5321 Section 3.8: after a 421 response the server will close
57        // the transmission channel. Fail immediately.
58        if inner.server_shutting_down {
59            return Err(Error::Protocol(
60                "connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
61            ));
62        }
63        let effective_mail_params = Some(Self::effective_mail_from_params(
64            &inner.capabilities,
65            from,
66            recipients,
67            message,
68            params,
69        )?);
70        Self::validate_send_addresses(from, recipients)?;
71        Self::validate_bdat_prerequisites(
72            &inner.capabilities,
73            message,
74            effective_mail_params.as_ref(),
75            inner.stream.is_tls(),
76        )?;
77        tokio::time::timeout(timeout, async {
78            Self::send_bdat_inner(
79                &mut inner,
80                from,
81                recipients,
82                message,
83                effective_mail_params.as_ref(),
84                None,
85            )
86            .await
87        })
88        .await
89        .map_err(|_| Error::Timeout)?
90    }
91
92    /// Send a message using BDAT chunking with both MAIL FROM and
93    /// per-recipient RCPT TO parameters (RFC 3030 Section 3).
94    ///
95    /// Like [`send_bdat`](Self::send_bdat) but also accepts per-recipient
96    /// [`RcptToParams`](crate::types::RcptToParams) to include DSN parameters (NOTIFY, ORCPT) on each
97    /// RCPT TO command (RFC 3461 Sections 4.1–4.2).
98    ///
99    /// `rcpt_params` must have the same length as `recipients` — each
100    /// entry is paired by index. Returns `Error::Protocol` if the
101    /// lengths do not match.
102    ///
103    /// Addresses are pre-validated by their type constructors
104    /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
105    /// Section 4.1.2.
106    ///
107    /// The server must advertise CHUNKING (RFC 3030). Returns
108    /// `Error::Protocol` if the server does not support it.
109    #[allow(clippy::significant_drop_tightening)]
110    pub async fn send_bdat_with_all_params(
111        &self,
112        from: &ReversePath,
113        recipients: &[ForwardPath],
114        message: &[u8],
115        mail_params: Option<&crate::types::MailFromParams>,
116        rcpt_params: &[crate::types::RcptToParams],
117        timeout: Duration,
118    ) -> Result<crate::types::SendResult, Error> {
119        // RFC 3461 Sections 4.1–4.2: each recipient must have a
120        // corresponding RcptToParams entry.
121        if recipients.len() != rcpt_params.len() {
122            return Err(Error::Protocol(format!(
123                "rcpt_params length ({}) must match recipients length ({}) \
124                 (RFC 3461 Sections 4.1–4.2)",
125                rcpt_params.len(),
126                recipients.len(),
127            )));
128        }
129        // RFC 2033 Section 4.2: LMTP returns per-recipient responses;
130        // use send_lmtp_bdat_with_all_params instead.
131        if self.protocol == Protocol::Lmtp {
132            return Err(Error::Protocol(
133                "send_bdat_with_all_params does not support LMTP per-recipient \
134                 responses (RFC 3030 Section 3 / RFC 2033 Section 4.2); \
135                 LMTP connections must use send_lmtp_bdat_with_all_params"
136                    .into(),
137            ));
138        }
139        let mut inner = self.inner.lock().await;
140        // RFC 5321 Section 3.8: after a 421 response the server will close
141        // the transmission channel. Fail immediately.
142        if inner.server_shutting_down {
143            return Err(Error::Protocol(
144                "connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
145            ));
146        }
147        let effective_mail_params = Some(Self::effective_mail_from_params(
148            &inner.capabilities,
149            from,
150            recipients,
151            message,
152            mail_params,
153        )?);
154        Self::validate_send_addresses(from, recipients)?;
155        Self::validate_rcpt_params(&inner.capabilities, rcpt_params)?;
156        Self::validate_bdat_prerequisites(
157            &inner.capabilities,
158            message,
159            effective_mail_params.as_ref(),
160            inner.stream.is_tls(),
161        )?;
162        tokio::time::timeout(timeout, async {
163            Self::send_bdat_inner(
164                &mut inner,
165                from,
166                recipients,
167                message,
168                effective_mail_params.as_ref(),
169                Some(rcpt_params),
170            )
171            .await
172        })
173        .await
174        .map_err(|_| Error::Timeout)?
175    }
176
177    /// Send the MAIL FROM -> RCPT TO -> BDAT LAST sequence shared by
178    /// SMTP and LMTP BDAT paths (RFC 3030 Section 3).
179    ///
180    /// When `rcpt_params` is `Some`, per-recipient DSN parameters are
181    /// included on each RCPT TO command (RFC 3461 Sections 4.1–4.2).
182    ///
183    /// Returns the list of accepted recipients and rejected recipients so the
184    /// caller can handle protocol-specific response logic (SMTP: single
185    /// response; LMTP: one response per accepted recipient per RFC 2033
186    /// Section 4.2) and surface partial rejections (RFC 5321 Section 3.3).
187    pub(super) async fn send_bdat_envelope(
188        inner: &mut SmtpInner,
189        from: &ReversePath,
190        recipients: &[ForwardPath],
191        message: &[u8],
192        params: Option<&crate::types::MailFromParams>,
193        rcpt_params: Option<&[crate::types::RcptToParams]>,
194    ) -> Result<(Vec<ForwardPath>, Vec<crate::types::RejectedRecipient>), Error> {
195        Self::send_mail_from(inner, from, message, message.len(), params).await?;
196        let (accepted, rejected) = Self::send_rcpt_to_batch(inner, recipients, rcpt_params).await?;
197
198        // BDAT <size> LAST — send the entire message as a single chunk
199        // (RFC 3030 Section 3). No dot-stuffing required.
200        let mut buf = BytesMut::new();
201        encode::encode_bdat(&mut buf, message.len(), true);
202        inner.write_all(&buf).await?;
203        inner.write_all(message).await?;
204
205        Ok((accepted, rejected))
206    }
207
208    /// Inner BDAT send logic (RFC 3030 Section 3).
209    ///
210    /// When the server supports PIPELINING (RFC 1854), batches MAIL FROM,
211    /// all RCPT TOs, and BDAT LAST + message data into a single write for
212    /// better throughput — matching the DATA path's pipelining behavior.
213    /// Falls back to sequential command/response for non-pipelining servers.
214    ///
215    /// Returns a [`SendResult`] containing any rejected recipients
216    /// (RFC 5321 Section 3.3).
217    async fn send_bdat_inner(
218        inner: &mut SmtpInner,
219        from: &ReversePath,
220        recipients: &[ForwardPath],
221        message: &[u8],
222        params: Option<&crate::types::MailFromParams>,
223        rcpt_params: Option<&[crate::types::RcptToParams]>,
224    ) -> Result<crate::types::SendResult, Error> {
225        if inner.capabilities.supports_pipelining() {
226            Self::send_bdat_pipelined(inner, from, recipients, message, params, rcpt_params).await
227        } else {
228            Self::send_bdat_sequential(inner, from, recipients, message, params, rcpt_params).await
229        }
230    }
231
232    /// Sequential BDAT send — issues each command and waits for its
233    /// response before sending the next (RFC 3030 Section 3).
234    ///
235    /// Returns a [`SendResult`] containing any rejected recipients
236    /// (RFC 5321 Section 3.3).
237    async fn send_bdat_sequential(
238        inner: &mut SmtpInner,
239        from: &ReversePath,
240        recipients: &[ForwardPath],
241        message: &[u8],
242        params: Option<&crate::types::MailFromParams>,
243        rcpt_params: Option<&[crate::types::RcptToParams]>,
244    ) -> Result<crate::types::SendResult, Error> {
245        let (_accepted, rejected) =
246            Self::send_bdat_envelope(inner, from, recipients, message, params, rcpt_params).await?;
247
248        // SMTP: single response after BDAT LAST (RFC 3030 Section 3).
249        let resp = inner.read_response().await?;
250        if !resp.is_success() {
251            // RFC 3030 Section 3: "The resulting state from a failed BDAT
252            // command is indeterminate.  A RSET command MUST be issued to
253            // clear the transaction before additional commands may be sent."
254            inner.rset_best_effort().await;
255            return Err(Self::response_to_error(resp));
256        }
257
258        Ok(crate::types::SendResult {
259            rejected_recipients: rejected,
260        })
261    }
262
263    /// Pipelined BDAT send — batches MAIL FROM, all RCPT TOs, BDAT LAST,
264    /// and message data into a single write, then reads responses in order
265    /// (RFC 1854 Section 3, RFC 3030 Section 4.2).
266    ///
267    /// Unlike the DATA pipelining path, BDAT does not have an intermediate
268    /// 354 response — the server replies once after receiving the chunk data.
269    /// Per RFC 3030 Section 3, "the resulting state from a failed BDAT
270    /// command is indeterminate" and a RSET must be issued.
271    ///
272    /// When `rcpt_params` is `Some`, per-recipient DSN parameters are
273    /// included on each RCPT TO command (RFC 3461 Sections 4.1–4.2).
274    ///
275    /// Returns a [`SendResult`] containing any rejected recipients
276    /// (RFC 5321 Section 3.3).
277    #[allow(clippy::too_many_lines)]
278    async fn send_bdat_pipelined(
279        inner: &mut SmtpInner,
280        from: &ReversePath,
281        recipients: &[ForwardPath],
282        message: &[u8],
283        params: Option<&crate::types::MailFromParams>,
284        rcpt_params: Option<&[crate::types::RcptToParams]>,
285    ) -> Result<crate::types::SendResult, Error> {
286        // Build the pipeline: MAIL FROM + RCPT TO(s) + BDAT <size> LAST + message data.
287        let mut buf = BytesMut::new();
288        let is_8bit = Self::message_contains_8bit(message);
289        Self::encode_mail_from_cmd(
290            &inner.capabilities,
291            &mut buf,
292            from,
293            message.len(),
294            params,
295            is_8bit,
296        )?;
297        // RFC 5321 Section 4.5.3.1.4 / RFC 1870 Section 4 / RFC 6152
298        // Section 7 / RFC 6531 Section 3.4: MAIL FROM has an extended
299        // command line limit to accommodate ESMTP parameters.
300        Self::validate_mail_from_line_length(buf.len())?;
301
302        for (i, fp) in recipients.iter().enumerate() {
303            let start = buf.len();
304            if let Some(rp) = rcpt_params {
305                // RFC 3461 Sections 4.1–4.2: encode with DSN parameters.
306                encode::encode_rcpt_to_full(&mut buf, fp, &rp[i])?;
307            } else {
308                encode::encode_rcpt_to(&mut buf, fp)?;
309            }
310            // RFC 5321 Section 4.5.3.1.4 / RFC 3461 Section 5: validate
311            // each RCPT TO line, using the extended 1012-octet limit when
312            // DSN parameters are present.
313            let has_dsn = rcpt_params.is_some_and(|rp| !rp[i].is_empty());
314            Self::validate_rcpt_to_line_length(buf.len() - start, has_dsn)?;
315        }
316
317        // BDAT <size> LAST — appended to the pipeline buffer so the entire
318        // envelope + chunk header + message body is sent in one write
319        // (RFC 3030 Section 3, RFC 1854 Section 3).
320        encode::encode_bdat(&mut buf, message.len(), true);
321
322        // Append the raw message data — BDAT requires no dot-stuffing
323        // (RFC 3030 Section 3).
324        buf.extend_from_slice(message);
325
326        // Send all commands + message data at once (RFC 1854 Section 3).
327        inner.write_all(&buf).await?;
328
329        // Read responses: 1 MAIL FROM + N RCPT TOs + 1 BDAT LAST.
330
331        // MAIL FROM response (RFC 5321 Section 4.1.1.2).
332        let mail_resp = inner.read_response().await?;
333        if !mail_resp.is_success() {
334            tracing::debug!(code = mail_resp.code, "pipelined BDAT MAIL FROM rejected");
335            // Drain remaining responses: N RCPT TOs + 1 BDAT LAST.
336            // RFC 1854 Section 3: the server processes pipelined commands
337            // in order; we must read all remaining responses.
338            // RFC 3030 Section 3: after a failed BDAT, session state is
339            // indeterminate — RSET is required.
340            let drain_count = recipients.len() + 1;
341            for _ in 0..drain_count {
342                if inner.read_response().await.is_err() {
343                    break;
344                }
345            }
346            // RFC 3030 Section 3: "The resulting state from a failed BDAT
347            // command is indeterminate. A RSET command MUST be issued."
348            inner.rset_best_effort().await;
349            return Err(Self::response_to_error(mail_resp));
350        }
351
352        // RCPT TO responses (RFC 5321 Section 4.1.1.3).
353        let mut rejected_recipients = Vec::new();
354        let mut accepted_count = 0usize;
355        for fp in recipients {
356            let resp = inner.read_response().await?;
357            if resp.is_success() {
358                accepted_count += 1;
359            } else {
360                tracing::debug!(
361                    recipient = fp.as_str(),
362                    code = resp.code,
363                    "pipelined BDAT RCPT TO rejected"
364                );
365                rejected_recipients.push(crate::types::RejectedRecipient {
366                    recipient: fp.clone(),
367                    response: resp,
368                });
369            }
370        }
371
372        // BDAT LAST response (RFC 3030 Section 3).
373        let bdat_resp = inner.read_response().await?;
374
375        if accepted_count == 0 {
376            // All RCPT TOs failed. The server already received the BDAT data
377            // but should discard it since no recipients were accepted.
378            // RFC 3030 Section 3: indeterminate state — issue RSET.
379            inner.rset_best_effort().await;
380            return Err(Error::AllRecipientsFailed {
381                count: recipients.len(),
382                responses: rejected_recipients
383                    .into_iter()
384                    .map(|r| r.response)
385                    .collect(),
386            });
387        }
388
389        if !bdat_resp.is_success() {
390            // RFC 3030 Section 3: "The resulting state from a failed BDAT
391            // command is indeterminate.  A RSET command MUST be issued to
392            // clear the transaction before additional commands may be sent."
393            inner.rset_best_effort().await;
394            return Err(Self::response_to_error(bdat_resp));
395        }
396
397        Ok(crate::types::SendResult {
398            rejected_recipients,
399        })
400    }
401}