// daaki-smtp 0.2.0 — an async SMTP client library (crate documentation).

//! BDAT/CHUNKING sending (RFC 3030 Section 3).
//!
//! Provides `send_bdat` and `send_bdat_with_all_params` for SMTP
//! connections using the CHUNKING extension.

#[allow(clippy::wildcard_imports)]
use super::*;

impl SmtpConnection {
    // -----------------------------------------------------------------------
    // Sending — BDAT/CHUNKING (RFC 3030)
    // -----------------------------------------------------------------------

    /// Send a message using BDAT chunking (RFC 3030 Section 3).
    ///
    /// Uses the BDAT command instead of DATA, which avoids dot-stuffing
    /// and allows transmission of binary content. The server must
    /// advertise the CHUNKING extension (RFC 3030).
    ///
    /// Performs the full MAIL FROM / RCPT TO / BDAT sequence.
    /// The message is sent as a single BDAT chunk with the LAST flag.
    ///
    /// Accepts optional [`MailFromParams`](crate::types::MailFromParams) to include ESMTP parameters
    /// such as `BODY=BINARYMIME` (RFC 3030 Section 2) and `SMTPUTF8`
    /// (RFC 6531 Section 3.4). The SIZE parameter is always included
    /// automatically when the server advertises the SIZE extension
    /// (RFC 1870 Section 3).
    ///
    /// Addresses are pre-validated by their type constructors
    /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
    /// Section 4.1.2.
    ///
    /// Returns `Error::Protocol` if the server does not support CHUNKING.
    #[allow(clippy::significant_drop_tightening)]
    pub async fn send_bdat(
        &self,
        from: &ReversePath,
        recipients: &[ForwardPath],
        message: &[u8],
        params: Option<&crate::types::MailFromParams>,
        timeout: Duration,
    ) -> Result<crate::types::SendResult, Error> {
        // RFC 3030 Section 3 + RFC 2033 Section 4.2: In LMTP, the server
        // returns one response per accepted recipient after BDAT LAST.
        // send_bdat reads a single response and cannot handle per-recipient
        // results. Use send_lmtp_bdat for LMTP BDAT connections.
        if self.protocol == Protocol::Lmtp {
            return Err(Error::Protocol(
                "send_bdat does not support LMTP per-recipient responses \
                 (RFC 3030 Section 3 / RFC 2033 Section 4.2); \
                 LMTP connections must use send_lmtp or send_lmtp_bdat"
                    .into(),
            ));
        }
        let mut inner = self.inner.lock().await;
        // RFC 5321 Section 3.8: after a 421 response the server will close
        // the transmission channel. Fail immediately.
        if inner.server_shutting_down {
            return Err(Error::Protocol(
                "connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
            ));
        }
        let effective_mail_params = Some(Self::effective_mail_from_params(
            &inner.capabilities,
            from,
            recipients,
            message,
            params,
        )?);
        Self::validate_send_addresses(from, recipients)?;
        Self::validate_bdat_prerequisites(
            &inner.capabilities,
            message,
            effective_mail_params.as_ref(),
            inner.stream.is_tls(),
        )?;
        tokio::time::timeout(timeout, async {
            Self::send_bdat_inner(
                &mut inner,
                from,
                recipients,
                message,
                effective_mail_params.as_ref(),
                None,
            )
            .await
        })
        .await
        .map_err(|_| Error::Timeout)?
    }

    /// Send a message using BDAT chunking with both MAIL FROM and
    /// per-recipient RCPT TO parameters (RFC 3030 Section 3).
    ///
    /// Like [`send_bdat`](Self::send_bdat) but also accepts per-recipient
    /// [`RcptToParams`](crate::types::RcptToParams) to include DSN parameters (NOTIFY, ORCPT) on each
    /// RCPT TO command (RFC 3461 Sections 4.1–4.2).
    ///
    /// `rcpt_params` must have the same length as `recipients` — each
    /// entry is paired by index. Returns `Error::Protocol` if the
    /// lengths do not match.
    ///
    /// Addresses are pre-validated by their type constructors
    /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
    /// Section 4.1.2.
    ///
    /// The server must advertise CHUNKING (RFC 3030). Returns
    /// `Error::Protocol` if the server does not support it.
    #[allow(clippy::significant_drop_tightening)]
    pub async fn send_bdat_with_all_params(
        &self,
        from: &ReversePath,
        recipients: &[ForwardPath],
        message: &[u8],
        mail_params: Option<&crate::types::MailFromParams>,
        rcpt_params: &[crate::types::RcptToParams],
        timeout: Duration,
    ) -> Result<crate::types::SendResult, Error> {
        // RFC 3461 Sections 4.1–4.2: each recipient must have a
        // corresponding RcptToParams entry.
        if recipients.len() != rcpt_params.len() {
            return Err(Error::Protocol(format!(
                "rcpt_params length ({}) must match recipients length ({}) \
                 (RFC 3461 Sections 4.1–4.2)",
                rcpt_params.len(),
                recipients.len(),
            )));
        }
        // RFC 2033 Section 4.2: LMTP returns per-recipient responses;
        // use send_lmtp_bdat_with_all_params instead.
        if self.protocol == Protocol::Lmtp {
            return Err(Error::Protocol(
                "send_bdat_with_all_params does not support LMTP per-recipient \
                 responses (RFC 3030 Section 3 / RFC 2033 Section 4.2); \
                 LMTP connections must use send_lmtp_bdat_with_all_params"
                    .into(),
            ));
        }
        let mut inner = self.inner.lock().await;
        // RFC 5321 Section 3.8: after a 421 response the server will close
        // the transmission channel. Fail immediately.
        if inner.server_shutting_down {
            return Err(Error::Protocol(
                "connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
            ));
        }
        let effective_mail_params = Some(Self::effective_mail_from_params(
            &inner.capabilities,
            from,
            recipients,
            message,
            mail_params,
        )?);
        Self::validate_send_addresses(from, recipients)?;
        Self::validate_rcpt_params(&inner.capabilities, rcpt_params)?;
        Self::validate_bdat_prerequisites(
            &inner.capabilities,
            message,
            effective_mail_params.as_ref(),
            inner.stream.is_tls(),
        )?;
        tokio::time::timeout(timeout, async {
            Self::send_bdat_inner(
                &mut inner,
                from,
                recipients,
                message,
                effective_mail_params.as_ref(),
                Some(rcpt_params),
            )
            .await
        })
        .await
        .map_err(|_| Error::Timeout)?
    }

    /// Drive the MAIL FROM -> RCPT TO -> BDAT LAST sequence that the SMTP
    /// and LMTP BDAT paths have in common (RFC 3030 Section 3).
    ///
    /// With `rcpt_params` set to `Some`, each RCPT TO command carries its
    /// per-recipient DSN parameters (RFC 3461 Sections 4.1–4.2).
    ///
    /// This function stops right after the chunk has been written: it does
    /// **not** read the reply to BDAT LAST. The caller does that, because the
    /// reply shape is protocol-specific (SMTP: a single response; LMTP: one
    /// response per accepted recipient, RFC 2033 Section 4.2).
    ///
    /// Returns `(accepted, rejected)` recipients so the caller can also
    /// surface partial rejections (RFC 5321 Section 3.3).
    pub(super) async fn send_bdat_envelope(
        inner: &mut SmtpInner,
        from: &ReversePath,
        recipients: &[ForwardPath],
        message: &[u8],
        params: Option<&crate::types::MailFromParams>,
        rcpt_params: Option<&[crate::types::RcptToParams]>,
    ) -> Result<(Vec<ForwardPath>, Vec<crate::types::RejectedRecipient>), Error> {
        let octets = message.len();

        // Envelope first: sender, then every recipient.
        Self::send_mail_from(inner, from, message, octets, params).await?;
        let (delivered_to, refused) =
            Self::send_rcpt_to_batch(inner, recipients, rcpt_params).await?;

        // `BDAT <size> LAST` followed by the raw payload: the entire message
        // goes out as one chunk, with no dot-stuffing (RFC 3030 Section 3).
        let mut chunk_header = BytesMut::new();
        encode::encode_bdat(&mut chunk_header, octets, true);
        inner.write_all(&chunk_header).await?;
        inner.write_all(message).await?;

        Ok((delivered_to, refused))
    }

    /// Pick the BDAT send strategy for this connection
    /// (RFC 3030 Section 3).
    ///
    /// If the server advertises PIPELINING (RFC 1854), MAIL FROM, every
    /// RCPT TO and BDAT LAST plus the message data are batched into a single
    /// write for better throughput, mirroring the DATA path's pipelining
    /// behavior. Otherwise each command is sent and answered one at a time.
    ///
    /// Returns a [`SendResult`] containing any rejected recipients
    /// (RFC 5321 Section 3.3).
    async fn send_bdat_inner(
        inner: &mut SmtpInner,
        from: &ReversePath,
        recipients: &[ForwardPath],
        message: &[u8],
        params: Option<&crate::types::MailFromParams>,
        rcpt_params: Option<&[crate::types::RcptToParams]>,
    ) -> Result<crate::types::SendResult, Error> {
        // No PIPELINING advertised: strict command/response lockstep.
        if !inner.capabilities.supports_pipelining() {
            return Self::send_bdat_sequential(
                inner,
                from,
                recipients,
                message,
                params,
                rcpt_params,
            )
            .await;
        }

        Self::send_bdat_pipelined(inner, from, recipients, message, params, rcpt_params).await
    }

    /// Non-pipelined BDAT send: every command waits for its reply before
    /// the next one is issued (RFC 3030 Section 3).
    ///
    /// The envelope and the chunk are written by
    /// [`send_bdat_envelope`](Self::send_bdat_envelope); this function then
    /// reads the single SMTP reply to BDAT LAST.
    ///
    /// Returns a [`SendResult`] containing any rejected recipients
    /// (RFC 5321 Section 3.3). A non-success reply to BDAT LAST is turned
    /// into an error after a best-effort RSET.
    async fn send_bdat_sequential(
        inner: &mut SmtpInner,
        from: &ReversePath,
        recipients: &[ForwardPath],
        message: &[u8],
        params: Option<&crate::types::MailFromParams>,
        rcpt_params: Option<&[crate::types::RcptToParams]>,
    ) -> Result<crate::types::SendResult, Error> {
        let (_delivered_to, refused) =
            Self::send_bdat_envelope(inner, from, recipients, message, params, rcpt_params).await?;

        // SMTP answers BDAT LAST exactly once (RFC 3030 Section 3).
        let final_reply = inner.read_response().await?;
        if final_reply.is_success() {
            return Ok(crate::types::SendResult {
                rejected_recipients: refused,
            });
        }

        // RFC 3030 Section 3: "The resulting state from a failed BDAT
        // command is indeterminate.  A RSET command MUST be issued to
        // clear the transaction before additional commands may be sent."
        inner.rset_best_effort().await;
        Err(Self::response_to_error(final_reply))
    }

    /// Pipelined BDAT send — batches MAIL FROM, all RCPT TOs, BDAT LAST,
    /// and message data into a single write, then reads responses in order
    /// (RFC 1854 Section 3, RFC 3030 Section 4.2).
    ///
    /// Unlike the DATA pipelining path, BDAT does not have an intermediate
    /// 354 response — the server replies once after receiving the chunk data.
    /// Per RFC 3030 Section 3, "the resulting state from a failed BDAT
    /// command is indeterminate" and a RSET must be issued.
    ///
    /// When `rcpt_params` is `Some`, per-recipient DSN parameters are
    /// included on each RCPT TO command (RFC 3461 Sections 4.1–4.2).
    ///
    /// Returns a [`SendResult`] containing any rejected recipients
    /// (RFC 5321 Section 3.3).
    #[allow(clippy::too_many_lines)]
    async fn send_bdat_pipelined(
        inner: &mut SmtpInner,
        from: &ReversePath,
        recipients: &[ForwardPath],
        message: &[u8],
        params: Option<&crate::types::MailFromParams>,
        rcpt_params: Option<&[crate::types::RcptToParams]>,
    ) -> Result<crate::types::SendResult, Error> {
        // Build the pipeline: MAIL FROM + RCPT TO(s) + BDAT <size> LAST + message data.
        let mut buf = BytesMut::new();
        let is_8bit = Self::message_contains_8bit(message);
        Self::encode_mail_from_cmd(
            &inner.capabilities,
            &mut buf,
            from,
            message.len(),
            params,
            is_8bit,
        )?;
        // RFC 5321 Section 4.5.3.1.4 / RFC 1870 Section 4 / RFC 6152
        // Section 7 / RFC 6531 Section 3.4: MAIL FROM has an extended
        // command line limit to accommodate ESMTP parameters.
        Self::validate_mail_from_line_length(buf.len())?;

        for (i, fp) in recipients.iter().enumerate() {
            let start = buf.len();
            if let Some(rp) = rcpt_params {
                // RFC 3461 Sections 4.1–4.2: encode with DSN parameters.
                encode::encode_rcpt_to_full(&mut buf, fp, &rp[i])?;
            } else {
                encode::encode_rcpt_to(&mut buf, fp)?;
            }
            // RFC 5321 Section 4.5.3.1.4 / RFC 3461 Section 5: validate
            // each RCPT TO line, using the extended 1012-octet limit when
            // DSN parameters are present.
            let has_dsn = rcpt_params.is_some_and(|rp| !rp[i].is_empty());
            Self::validate_rcpt_to_line_length(buf.len() - start, has_dsn)?;
        }

        // BDAT <size> LAST — appended to the pipeline buffer so the entire
        // envelope + chunk header + message body is sent in one write
        // (RFC 3030 Section 3, RFC 1854 Section 3).
        encode::encode_bdat(&mut buf, message.len(), true);

        // Append the raw message data — BDAT requires no dot-stuffing
        // (RFC 3030 Section 3).
        buf.extend_from_slice(message);

        // Send all commands + message data at once (RFC 1854 Section 3).
        inner.write_all(&buf).await?;

        // Read responses: 1 MAIL FROM + N RCPT TOs + 1 BDAT LAST.

        // MAIL FROM response (RFC 5321 Section 4.1.1.2).
        let mail_resp = inner.read_response().await?;
        if !mail_resp.is_success() {
            tracing::debug!(code = mail_resp.code, "pipelined BDAT MAIL FROM rejected");
            // Drain remaining responses: N RCPT TOs + 1 BDAT LAST.
            // RFC 1854 Section 3: the server processes pipelined commands
            // in order; we must read all remaining responses.
            // RFC 3030 Section 3: after a failed BDAT, session state is
            // indeterminate — RSET is required.
            let drain_count = recipients.len() + 1;
            for _ in 0..drain_count {
                if inner.read_response().await.is_err() {
                    break;
                }
            }
            // RFC 3030 Section 3: "The resulting state from a failed BDAT
            // command is indeterminate. A RSET command MUST be issued."
            inner.rset_best_effort().await;
            return Err(Self::response_to_error(mail_resp));
        }

        // RCPT TO responses (RFC 5321 Section 4.1.1.3).
        let mut rejected_recipients = Vec::new();
        let mut accepted_count = 0usize;
        for fp in recipients {
            let resp = inner.read_response().await?;
            if resp.is_success() {
                accepted_count += 1;
            } else {
                tracing::debug!(
                    recipient = fp.as_str(),
                    code = resp.code,
                    "pipelined BDAT RCPT TO rejected"
                );
                rejected_recipients.push(crate::types::RejectedRecipient {
                    recipient: fp.clone(),
                    response: resp,
                });
            }
        }

        // BDAT LAST response (RFC 3030 Section 3).
        let bdat_resp = inner.read_response().await?;

        if accepted_count == 0 {
            // All RCPT TOs failed. The server already received the BDAT data
            // but should discard it since no recipients were accepted.
            // RFC 3030 Section 3: indeterminate state — issue RSET.
            inner.rset_best_effort().await;
            return Err(Error::AllRecipientsFailed {
                count: recipients.len(),
                responses: rejected_recipients
                    .into_iter()
                    .map(|r| r.response)
                    .collect(),
            });
        }

        if !bdat_resp.is_success() {
            // RFC 3030 Section 3: "The resulting state from a failed BDAT
            // command is indeterminate.  A RSET command MUST be issued to
            // clear the transaction before additional commands may be sent."
            inner.rset_best_effort().await;
            return Err(Self::response_to_error(bdat_resp));
        }

        Ok(crate::types::SendResult {
            rejected_recipients,
        })
    }
}