Skip to main content

daaki_smtp/connection/
sending.rs

1//! SMTP sending: MAIL FROM / RCPT TO / DATA sequence.
2//!
3//! RFC 5321 Section 3.3 (SMTP mail transaction), RFC 1854 (pipelining).
4
5#[allow(clippy::wildcard_imports)]
6use super::*;
7
8impl SmtpConnection {
9    // -----------------------------------------------------------------------
10    // Sending — SMTP (RFC 5321)
11    // -----------------------------------------------------------------------
12
13    /// Send a message to one or more recipients.
14    ///
15    /// Performs the full MAIL FROM / RCPT TO / DATA sequence (RFC 5321
16    /// Section 3.3). When the server advertises PIPELINING (RFC 1854),
17    /// commands are batched for better throughput.
18    ///
19    /// Returns a [`SendResult`](crate::types::SendResult) containing any rejected recipients
20    /// (RFC 5321 Section 3.3). When some RCPT TO commands are rejected
21    /// but at least one succeeds, the message is delivered to the accepted
22    /// recipients and the rejected ones are listed in
23    /// [`SendResult::rejected_recipients`](crate::types::SendResult::rejected_recipients).
24    ///
25    /// BCC recipients should be included in `recipients` but must NOT appear
26    /// in the message headers — that is the caller's responsibility.
27    ///
28    /// Addresses are pre-validated by their type constructors
29    /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
30    /// Section 4.1.2.
31    ///
32    /// `timeout` applies to the overall send operation. Per RFC 5321 Section
33    /// 4.5.3.2, the recommended DATA timeout is 600 seconds.
34    pub async fn send(
35        &self,
36        from: &ReversePath,
37        recipients: &[ForwardPath],
38        message: &[u8],
39        timeout: Duration,
40    ) -> Result<crate::types::SendResult, Error> {
41        self.send_with_params(from, recipients, message, None, timeout)
42            .await
43    }
44
45    /// Send a message with extended MAIL FROM parameters.
46    ///
47    /// Like [`send`](Self::send) but accepts optional [`MailFromParams`](crate::types::MailFromParams)
48    /// to include ESMTP parameters such as `BODY=` (RFC 1652 Section 3,
49    /// RFC 3030 Section 2) and `SMTPUTF8` (RFC 6531 Section 3.4).
50    ///
51    /// Returns a [`SendResult`](crate::types::SendResult) containing any rejected recipients
52    /// (RFC 5321 Section 3.3). When some RCPT TO commands are rejected
53    /// but at least one succeeds, the message is delivered to the accepted
54    /// recipients and the rejected ones are listed in
55    /// [`SendResult::rejected_recipients`](crate::types::SendResult::rejected_recipients).
56    ///
57    /// Addresses are pre-validated by their type constructors
58    /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
59    /// Section 4.1.2.
60    ///
61    /// The SIZE parameter is always included automatically when the
62    /// server advertises the SIZE extension (RFC 1870 Section 3).
63    #[allow(clippy::significant_drop_tightening)]
64    pub async fn send_with_params(
65        &self,
66        from: &ReversePath,
67        recipients: &[ForwardPath],
68        message: &[u8],
69        params: Option<&crate::types::MailFromParams>,
70        timeout: Duration,
71    ) -> Result<crate::types::SendResult, Error> {
72        // RFC 2033 Section 4.2: LMTP returns one response per accepted
73        // recipient after DATA, not a single aggregate response like SMTP.
74        // send/send_with_params reads only one DATA response; using it on
75        // an LMTP connection would leave per-recipient responses in the
76        // buffer, corrupting subsequent operations. LMTP connections must
77        // use send_lmtp() or send_lmtp_bdat() instead.
78        if self.protocol == Protocol::Lmtp {
79            return Err(Error::Protocol(
80                "send/send_with_params does not support LMTP per-recipient \
81                 responses (RFC 2033 Section 4.2); LMTP connections must use \
82                 send_lmtp or send_lmtp_bdat"
83                    .into(),
84            ));
85        }
86        let mut inner = self.inner.lock().await;
87        // RFC 5321 Section 3.8: after a 421 response the server will close
88        // the transmission channel. Fail immediately instead of writing to
89        // a doomed connection.
90        if inner.server_shutting_down {
91            return Err(Error::Protocol(
92                "connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
93            ));
94        }
95        let effective_mail_params = Some(Self::effective_mail_from_params(
96            &inner.capabilities,
97            from,
98            recipients,
99            message,
100            params,
101        )?);
102        Self::validate_send_addresses(from, recipients)?;
103        let message_size = Self::validate_data_prerequisites(
104            &inner.capabilities,
105            message,
106            effective_mail_params.as_ref(),
107            inner.stream.is_tls(),
108        )?;
109        tokio::time::timeout(timeout, async {
110            if inner.capabilities.supports_pipelining() {
111                Self::send_pipelined(
112                    &mut inner,
113                    from,
114                    recipients,
115                    message,
116                    message_size,
117                    effective_mail_params.as_ref(),
118                    None,
119                )
120                .await
121            } else {
122                Self::send_sequential(
123                    &mut inner,
124                    from,
125                    recipients,
126                    message,
127                    message_size,
128                    effective_mail_params.as_ref(),
129                    None,
130                )
131                .await
132            }
133        })
134        .await
135        .map_err(|_| Error::Timeout)?
136    }
137
138    /// Send a message with both MAIL FROM and per-recipient RCPT TO parameters.
139    ///
140    /// Like [`send_with_params`](Self::send_with_params) but also accepts
141    /// per-recipient [`RcptToParams`](crate::types::RcptToParams) to include DSN parameters (NOTIFY,
142    /// ORCPT) on each RCPT TO command (RFC 3461 Sections 4.1–4.2).
143    ///
144    /// Returns a [`SendResult`](crate::types::SendResult) containing any rejected recipients
145    /// (RFC 5321 Section 3.3).
146    ///
147    /// `rcpt_params` must have the same length as `recipients` — each
148    /// entry is paired by index. Returns `Error::Protocol` if the
149    /// lengths do not match.
150    ///
151    /// Addresses are pre-validated by their type constructors
152    /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
153    /// Section 4.1.2.
154    ///
155    /// The SIZE parameter is always included automatically when the
156    /// server advertises the SIZE extension (RFC 1870 Section 3).
157    #[allow(clippy::significant_drop_tightening)]
158    pub async fn send_with_all_params(
159        &self,
160        from: &ReversePath,
161        recipients: &[ForwardPath],
162        message: &[u8],
163        mail_params: Option<&crate::types::MailFromParams>,
164        rcpt_params: &[crate::types::RcptToParams],
165        timeout: Duration,
166    ) -> Result<crate::types::SendResult, Error> {
167        // RFC 3461 Sections 4.1–4.2: each recipient must have a
168        // corresponding RcptToParams entry.
169        if recipients.len() != rcpt_params.len() {
170            return Err(Error::Protocol(format!(
171                "rcpt_params length ({}) must match recipients length ({}) \
172                 (RFC 3461 Sections 4.1–4.2)",
173                rcpt_params.len(),
174                recipients.len(),
175            )));
176        }
177        // RFC 2033 Section 4.2: LMTP returns per-recipient responses;
178        // use send_lmtp_with_all_params instead.
179        if self.protocol == Protocol::Lmtp {
180            return Err(Error::Protocol(
181                "send_with_all_params does not support LMTP per-recipient \
182                 responses (RFC 2033 Section 4.2); LMTP connections must use \
183                 send_lmtp_with_all_params"
184                    .into(),
185            ));
186        }
187        let mut inner = self.inner.lock().await;
188        // RFC 5321 Section 3.8: after a 421 response the server will close
189        // the transmission channel. Fail immediately.
190        if inner.server_shutting_down {
191            return Err(Error::Protocol(
192                "connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
193            ));
194        }
195        let effective_mail_params = Some(Self::effective_mail_from_params(
196            &inner.capabilities,
197            from,
198            recipients,
199            message,
200            mail_params,
201        )?);
202        Self::validate_send_addresses(from, recipients)?;
203        Self::validate_rcpt_params(&inner.capabilities, rcpt_params)?;
204        let message_size = Self::validate_data_prerequisites(
205            &inner.capabilities,
206            message,
207            effective_mail_params.as_ref(),
208            inner.stream.is_tls(),
209        )?;
210        tokio::time::timeout(timeout, async {
211            if inner.capabilities.supports_pipelining() {
212                Self::send_pipelined(
213                    &mut inner,
214                    from,
215                    recipients,
216                    message,
217                    message_size,
218                    effective_mail_params.as_ref(),
219                    Some(rcpt_params),
220                )
221                .await
222            } else {
223                Self::send_sequential(
224                    &mut inner,
225                    from,
226                    recipients,
227                    message,
228                    message_size,
229                    effective_mail_params.as_ref(),
230                    Some(rcpt_params),
231                )
232                .await
233            }
234        })
235        .await
236        .map_err(|_| Error::Timeout)?
237    }
238
239    /// Sequential send — issues each command and waits for its response
240    /// before sending the next (RFC 5321 Section 3.3).
241    ///
242    /// Returns a [`SendResult`] containing any rejected recipients so callers
243    /// can see which addresses were refused (RFC 5321 Section 3.3).
244    ///
245    /// When `rcpt_params` is `Some`, per-recipient DSN parameters are
246    /// included on each RCPT TO command (RFC 3461 Sections 4.1–4.2).
247    async fn send_sequential(
248        inner: &mut SmtpInner,
249        from: &ReversePath,
250        recipients: &[ForwardPath],
251        message: &[u8],
252        message_size: usize,
253        params: Option<&crate::types::MailFromParams>,
254        rcpt_params: Option<&[crate::types::RcptToParams]>,
255    ) -> Result<crate::types::SendResult, Error> {
256        Self::send_mail_from(inner, from, message, message_size, params).await?;
257        let (_accepted, rejected) =
258            Self::send_rcpt_to_batch(inner, recipients, rcpt_params).await?;
259
260        Self::send_data_body(inner, message).await?;
261        // SMTP: single response after the terminator (RFC 5321 Section 3.3).
262        let resp = inner.read_response().await?;
263        if !resp.is_success() {
264            return Err(Self::response_to_error(resp));
265        }
266
267        Ok(crate::types::SendResult {
268            rejected_recipients: rejected,
269        })
270    }
271
272    /// Pipelined send — batches MAIL FROM, all RCPT TOs, and DATA into a
273    /// single write, then reads responses in order (RFC 1854 Section 3).
274    ///
275    /// Returns a [`SendResult`] containing any rejected recipients so callers
276    /// can see which addresses were refused (RFC 5321 Section 3.3).
277    ///
278    /// When `rcpt_params` is `Some`, per-recipient DSN parameters are
279    /// included on each RCPT TO command (RFC 3461 Sections 4.1–4.2).
280    #[allow(clippy::too_many_lines)]
281    async fn send_pipelined(
282        inner: &mut SmtpInner,
283        from: &ReversePath,
284        recipients: &[ForwardPath],
285        message: &[u8],
286        message_size: usize,
287        params: Option<&crate::types::MailFromParams>,
288        rcpt_params: Option<&[crate::types::RcptToParams]>,
289    ) -> Result<crate::types::SendResult, Error> {
290        // Build the pipeline: MAIL FROM + RCPT TO(s) + DATA.
291        let mut buf = BytesMut::new();
292        let is_8bit = Self::message_contains_8bit(message);
293        Self::encode_mail_from_cmd(
294            &inner.capabilities,
295            &mut buf,
296            from,
297            message_size,
298            params,
299            is_8bit,
300        )?;
301        // RFC 5321 Section 4.5.3.1.4 / RFC 1870 Section 4 / RFC 6152
302        // Section 7 / RFC 6531 Section 3.4: MAIL FROM has an extended
303        // command line limit to accommodate ESMTP parameters.
304        Self::validate_mail_from_line_length(buf.len())?;
305        for (i, fp) in recipients.iter().enumerate() {
306            let start = buf.len();
307            if let Some(rp) = rcpt_params {
308                // RFC 3461 Sections 4.1–4.2: encode with DSN parameters.
309                encode::encode_rcpt_to_full(&mut buf, fp, &rp[i])?;
310            } else {
311                encode::encode_rcpt_to(&mut buf, fp)?;
312            }
313            // RFC 5321 Section 4.5.3.1.4 / RFC 3461 Section 5: validate
314            // each RCPT TO line, using the extended 1012-octet limit when
315            // DSN parameters are present.
316            let has_dsn = rcpt_params.is_some_and(|rp| !rp[i].is_empty());
317            Self::validate_rcpt_to_line_length(buf.len() - start, has_dsn)?;
318        }
319        encode::encode_data(&mut buf);
320
321        // Send all commands at once (RFC 1854 Section 3).
322        inner.write_all(&buf).await?;
323
324        // Read responses: 1 MAIL FROM + N RCPT TOs + 1 DATA.
325        // MAIL FROM response (RFC 5321 Section 4.1.1.2).
326        let mail_resp = inner.read_response().await?;
327        if !mail_resp.is_success() {
328            tracing::debug!(code = mail_resp.code, "pipelined MAIL FROM rejected");
329            // Drain remaining responses: N RCPT TOs + 1 DATA.
330            // RFC 1854 Section 3 / RFC 5321 Section 3.3: the server may
331            // have already processed the pipelined DATA command and sent
332            // a 354 intermediate reply. If so, it is waiting for message
333            // data. We must send the dot terminator to exit the DATA
334            // state, otherwise the session is left in an inconsistent
335            // state where the server expects data and the client expects
336            // command responses.
337            // Track the DATA response specifically — it's the last of
338            // the N+1 responses (N RCPT TOs + 1 DATA). We identify it by
339            // index rather than using the last successfully read response,
340            // because a mid-drain read failure would leave `last_resp`
341            // pointing at an RCPT TO response, not the DATA response
342            // (RFC 1854 Section 3).
343            let drain_count = recipients.len() + 1;
344            let mut data_resp: Option<SmtpResponse> = None;
345            for i in 0..drain_count {
346                match inner.read_response().await {
347                    Ok(resp) => {
348                        // The DATA response is the last one
349                        // (index = drain_count - 1).
350                        if i == drain_count - 1 {
351                            data_resp = Some(resp);
352                        }
353                    }
354                    Err(_) => break,
355                }
356            }
357            // RFC 5321 Section 3.3: if the DATA command received 354
358            // ("Start mail input"), we must send the dot terminator
359            // to exit DATA state.
360            // RFC 5321 Section 4.1.1.4: 354 is the only valid
361            // intermediate response to DATA.
362            if let Some(ref data_resp) = data_resp {
363                if data_resp.code == 354 {
364                    buf.clear();
365                    encode::encode_data_end(&mut buf, b"");
366                    let _ = inner.write_all(&buf).await;
367                    // Read and discard the response to the empty DATA.
368                    let _ = inner.read_response().await;
369                }
370            }
371            return Err(Self::response_to_error(mail_resp));
372        }
373
374        // RCPT TO responses (RFC 5321 Section 4.1.1.3).
375        let mut accepted = 0usize;
376        let mut rejected_recipients = Vec::new();
377        for fp in recipients {
378            let resp = inner.read_response().await?;
379            if resp.is_success() {
380                accepted += 1;
381            } else {
382                tracing::debug!(
383                    recipient = fp.as_str(),
384                    code = resp.code,
385                    "pipelined RCPT TO rejected"
386                );
387                rejected_recipients.push(crate::types::RejectedRecipient {
388                    recipient: fp.clone(),
389                    response: resp,
390                });
391            }
392        }
393
394        // DATA response (RFC 5321 Section 4.1.1.4).
395        let data_resp = inner.read_response().await?;
396
397        if accepted == 0 {
398            // All RCPT TOs failed — send terminator to exit DATA state
399            // per RFC 1854 Section 3, then return error.
400            // RFC 5321 Section 4.1.1.4: 354 is the only valid
401            // intermediate response to DATA.
402            if data_resp.code == 354 {
403                buf.clear();
404                // No message data was sent — pass empty slice so the leading
405                // CRLF is included (RFC 5321 Section 4.1.1.4).
406                encode::encode_data_end(&mut buf, b"");
407                // Best-effort write — the server may have already
408                // closed the connection after rejecting all recipients
409                // (RFC 1854 Section 3). Propagating the I/O error here
410                // would shadow the AllRecipientsFailed error below.
411                let _ = inner.write_all(&buf).await;
412                // Read and discard the response to the empty DATA.
413                let _ = inner.read_response().await;
414            } else {
415                // RFC 5321 Section 3.3: DATA was rejected but the mail
416                // transaction (MAIL FROM) is still open. RSET to clean up.
417                inner.rset_best_effort().await;
418            }
419            return Err(Error::AllRecipientsFailed {
420                count: recipients.len(),
421                // Extract just the responses for the error variant.
422                responses: rejected_recipients
423                    .into_iter()
424                    .map(|r| r.response)
425                    .collect(),
426            });
427        }
428
429        // RFC 5321 Section 4.1.1.4: 354 is the only valid intermediate
430        // response to DATA. Any other code means DATA was rejected.
431        if data_resp.code != 354 {
432            // RFC 5321 Section 3.3: DATA was rejected but the mail
433            // transaction (MAIL FROM) is still open. RSET to clean up.
434            inner.rset_best_effort().await;
435            return Err(Self::response_to_error(data_resp));
436        }
437
438        // Send dot-stuffed message body + terminator in a single write
439        // (RFC 5321 Section 4.5.2 / Section 4.1.1.4).
440        let body = encode::dot_stuff_and_terminate(message);
441        inner.write_all(&body).await?;
442        let resp = inner.read_response().await?;
443        if !resp.is_success() {
444            return Err(Self::response_to_error(resp));
445        }
446
447        Ok(crate::types::SendResult {
448            rejected_recipients,
449        })
450    }
451}