daaki_smtp/connection/sending.rs
1//! SMTP sending: MAIL FROM / RCPT TO / DATA sequence.
2//!
3//! RFC 5321 Section 3.3 (SMTP mail transaction), RFC 1854 (pipelining).
4
5#[allow(clippy::wildcard_imports)]
6use super::*;
7
8impl SmtpConnection {
9 // -----------------------------------------------------------------------
10 // Sending — SMTP (RFC 5321)
11 // -----------------------------------------------------------------------
12
13 /// Send a message to one or more recipients.
14 ///
15 /// Performs the full MAIL FROM / RCPT TO / DATA sequence (RFC 5321
16 /// Section 3.3). When the server advertises PIPELINING (RFC 1854),
17 /// commands are batched for better throughput.
18 ///
19 /// Returns a [`SendResult`](crate::types::SendResult) containing any rejected recipients
20 /// (RFC 5321 Section 3.3). When some RCPT TO commands are rejected
21 /// but at least one succeeds, the message is delivered to the accepted
22 /// recipients and the rejected ones are listed in
23 /// [`SendResult::rejected_recipients`](crate::types::SendResult::rejected_recipients).
24 ///
25 /// BCC recipients should be included in `recipients` but must NOT appear
26 /// in the message headers — that is the caller's responsibility.
27 ///
28 /// Addresses are pre-validated by their type constructors
29 /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
30 /// Section 4.1.2.
31 ///
32 /// `timeout` applies to the overall send operation. Per RFC 5321 Section
33 /// 4.5.3.2, the recommended DATA timeout is 600 seconds.
34 pub async fn send(
35 &self,
36 from: &ReversePath,
37 recipients: &[ForwardPath],
38 message: &[u8],
39 timeout: Duration,
40 ) -> Result<crate::types::SendResult, Error> {
41 self.send_with_params(from, recipients, message, None, timeout)
42 .await
43 }
44
45 /// Send a message with extended MAIL FROM parameters.
46 ///
47 /// Like [`send`](Self::send) but accepts optional [`MailFromParams`](crate::types::MailFromParams)
48 /// to include ESMTP parameters such as `BODY=` (RFC 1652 Section 3,
49 /// RFC 3030 Section 2) and `SMTPUTF8` (RFC 6531 Section 3.4).
50 ///
51 /// Returns a [`SendResult`](crate::types::SendResult) containing any rejected recipients
52 /// (RFC 5321 Section 3.3). When some RCPT TO commands are rejected
53 /// but at least one succeeds, the message is delivered to the accepted
54 /// recipients and the rejected ones are listed in
55 /// [`SendResult::rejected_recipients`](crate::types::SendResult::rejected_recipients).
56 ///
57 /// Addresses are pre-validated by their type constructors
58 /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
59 /// Section 4.1.2.
60 ///
61 /// The SIZE parameter is always included automatically when the
62 /// server advertises the SIZE extension (RFC 1870 Section 3).
63 #[allow(clippy::significant_drop_tightening)]
64 pub async fn send_with_params(
65 &self,
66 from: &ReversePath,
67 recipients: &[ForwardPath],
68 message: &[u8],
69 params: Option<&crate::types::MailFromParams>,
70 timeout: Duration,
71 ) -> Result<crate::types::SendResult, Error> {
72 // RFC 2033 Section 4.2: LMTP returns one response per accepted
73 // recipient after DATA, not a single aggregate response like SMTP.
74 // send/send_with_params reads only one DATA response; using it on
75 // an LMTP connection would leave per-recipient responses in the
76 // buffer, corrupting subsequent operations. LMTP connections must
77 // use send_lmtp() or send_lmtp_bdat() instead.
78 if self.protocol == Protocol::Lmtp {
79 return Err(Error::Protocol(
80 "send/send_with_params does not support LMTP per-recipient \
81 responses (RFC 2033 Section 4.2); LMTP connections must use \
82 send_lmtp or send_lmtp_bdat"
83 .into(),
84 ));
85 }
86 let mut inner = self.inner.lock().await;
87 // RFC 5321 Section 3.8: after a 421 response the server will close
88 // the transmission channel. Fail immediately instead of writing to
89 // a doomed connection.
90 if inner.server_shutting_down {
91 return Err(Error::Protocol(
92 "connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
93 ));
94 }
95 let effective_mail_params = Some(Self::effective_mail_from_params(
96 &inner.capabilities,
97 from,
98 recipients,
99 message,
100 params,
101 )?);
102 Self::validate_send_addresses(from, recipients)?;
103 let message_size = Self::validate_data_prerequisites(
104 &inner.capabilities,
105 message,
106 effective_mail_params.as_ref(),
107 inner.stream.is_tls(),
108 )?;
109 tokio::time::timeout(timeout, async {
110 if inner.capabilities.supports_pipelining() {
111 Self::send_pipelined(
112 &mut inner,
113 from,
114 recipients,
115 message,
116 message_size,
117 effective_mail_params.as_ref(),
118 None,
119 )
120 .await
121 } else {
122 Self::send_sequential(
123 &mut inner,
124 from,
125 recipients,
126 message,
127 message_size,
128 effective_mail_params.as_ref(),
129 None,
130 )
131 .await
132 }
133 })
134 .await
135 .map_err(|_| Error::Timeout)?
136 }
137
138 /// Send a message with both MAIL FROM and per-recipient RCPT TO parameters.
139 ///
140 /// Like [`send_with_params`](Self::send_with_params) but also accepts
141 /// per-recipient [`RcptToParams`](crate::types::RcptToParams) to include DSN parameters (NOTIFY,
142 /// ORCPT) on each RCPT TO command (RFC 3461 Sections 4.1–4.2).
143 ///
144 /// Returns a [`SendResult`](crate::types::SendResult) containing any rejected recipients
145 /// (RFC 5321 Section 3.3).
146 ///
147 /// `rcpt_params` must have the same length as `recipients` — each
148 /// entry is paired by index. Returns `Error::Protocol` if the
149 /// lengths do not match.
150 ///
151 /// Addresses are pre-validated by their type constructors
152 /// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
153 /// Section 4.1.2.
154 ///
155 /// The SIZE parameter is always included automatically when the
156 /// server advertises the SIZE extension (RFC 1870 Section 3).
157 #[allow(clippy::significant_drop_tightening)]
158 pub async fn send_with_all_params(
159 &self,
160 from: &ReversePath,
161 recipients: &[ForwardPath],
162 message: &[u8],
163 mail_params: Option<&crate::types::MailFromParams>,
164 rcpt_params: &[crate::types::RcptToParams],
165 timeout: Duration,
166 ) -> Result<crate::types::SendResult, Error> {
167 // RFC 3461 Sections 4.1–4.2: each recipient must have a
168 // corresponding RcptToParams entry.
169 if recipients.len() != rcpt_params.len() {
170 return Err(Error::Protocol(format!(
171 "rcpt_params length ({}) must match recipients length ({}) \
172 (RFC 3461 Sections 4.1–4.2)",
173 rcpt_params.len(),
174 recipients.len(),
175 )));
176 }
177 // RFC 2033 Section 4.2: LMTP returns per-recipient responses;
178 // use send_lmtp_with_all_params instead.
179 if self.protocol == Protocol::Lmtp {
180 return Err(Error::Protocol(
181 "send_with_all_params does not support LMTP per-recipient \
182 responses (RFC 2033 Section 4.2); LMTP connections must use \
183 send_lmtp_with_all_params"
184 .into(),
185 ));
186 }
187 let mut inner = self.inner.lock().await;
188 // RFC 5321 Section 3.8: after a 421 response the server will close
189 // the transmission channel. Fail immediately.
190 if inner.server_shutting_down {
191 return Err(Error::Protocol(
192 "connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
193 ));
194 }
195 let effective_mail_params = Some(Self::effective_mail_from_params(
196 &inner.capabilities,
197 from,
198 recipients,
199 message,
200 mail_params,
201 )?);
202 Self::validate_send_addresses(from, recipients)?;
203 Self::validate_rcpt_params(&inner.capabilities, rcpt_params)?;
204 let message_size = Self::validate_data_prerequisites(
205 &inner.capabilities,
206 message,
207 effective_mail_params.as_ref(),
208 inner.stream.is_tls(),
209 )?;
210 tokio::time::timeout(timeout, async {
211 if inner.capabilities.supports_pipelining() {
212 Self::send_pipelined(
213 &mut inner,
214 from,
215 recipients,
216 message,
217 message_size,
218 effective_mail_params.as_ref(),
219 Some(rcpt_params),
220 )
221 .await
222 } else {
223 Self::send_sequential(
224 &mut inner,
225 from,
226 recipients,
227 message,
228 message_size,
229 effective_mail_params.as_ref(),
230 Some(rcpt_params),
231 )
232 .await
233 }
234 })
235 .await
236 .map_err(|_| Error::Timeout)?
237 }
238
239 /// Sequential send — issues each command and waits for its response
240 /// before sending the next (RFC 5321 Section 3.3).
241 ///
242 /// Returns a [`SendResult`] containing any rejected recipients so callers
243 /// can see which addresses were refused (RFC 5321 Section 3.3).
244 ///
245 /// When `rcpt_params` is `Some`, per-recipient DSN parameters are
246 /// included on each RCPT TO command (RFC 3461 Sections 4.1–4.2).
247 async fn send_sequential(
248 inner: &mut SmtpInner,
249 from: &ReversePath,
250 recipients: &[ForwardPath],
251 message: &[u8],
252 message_size: usize,
253 params: Option<&crate::types::MailFromParams>,
254 rcpt_params: Option<&[crate::types::RcptToParams]>,
255 ) -> Result<crate::types::SendResult, Error> {
256 Self::send_mail_from(inner, from, message, message_size, params).await?;
257 let (_accepted, rejected) =
258 Self::send_rcpt_to_batch(inner, recipients, rcpt_params).await?;
259
260 Self::send_data_body(inner, message).await?;
261 // SMTP: single response after the terminator (RFC 5321 Section 3.3).
262 let resp = inner.read_response().await?;
263 if !resp.is_success() {
264 return Err(Self::response_to_error(resp));
265 }
266
267 Ok(crate::types::SendResult {
268 rejected_recipients: rejected,
269 })
270 }
271
272 /// Pipelined send — batches MAIL FROM, all RCPT TOs, and DATA into a
273 /// single write, then reads responses in order (RFC 1854 Section 3).
274 ///
275 /// Returns a [`SendResult`] containing any rejected recipients so callers
276 /// can see which addresses were refused (RFC 5321 Section 3.3).
277 ///
278 /// When `rcpt_params` is `Some`, per-recipient DSN parameters are
279 /// included on each RCPT TO command (RFC 3461 Sections 4.1–4.2).
280 #[allow(clippy::too_many_lines)]
281 async fn send_pipelined(
282 inner: &mut SmtpInner,
283 from: &ReversePath,
284 recipients: &[ForwardPath],
285 message: &[u8],
286 message_size: usize,
287 params: Option<&crate::types::MailFromParams>,
288 rcpt_params: Option<&[crate::types::RcptToParams]>,
289 ) -> Result<crate::types::SendResult, Error> {
290 // Build the pipeline: MAIL FROM + RCPT TO(s) + DATA.
291 let mut buf = BytesMut::new();
292 let is_8bit = Self::message_contains_8bit(message);
293 Self::encode_mail_from_cmd(
294 &inner.capabilities,
295 &mut buf,
296 from,
297 message_size,
298 params,
299 is_8bit,
300 )?;
301 // RFC 5321 Section 4.5.3.1.4 / RFC 1870 Section 4 / RFC 6152
302 // Section 7 / RFC 6531 Section 3.4: MAIL FROM has an extended
303 // command line limit to accommodate ESMTP parameters.
304 Self::validate_mail_from_line_length(buf.len())?;
305 for (i, fp) in recipients.iter().enumerate() {
306 let start = buf.len();
307 if let Some(rp) = rcpt_params {
308 // RFC 3461 Sections 4.1–4.2: encode with DSN parameters.
309 encode::encode_rcpt_to_full(&mut buf, fp, &rp[i])?;
310 } else {
311 encode::encode_rcpt_to(&mut buf, fp)?;
312 }
313 // RFC 5321 Section 4.5.3.1.4 / RFC 3461 Section 5: validate
314 // each RCPT TO line, using the extended 1012-octet limit when
315 // DSN parameters are present.
316 let has_dsn = rcpt_params.is_some_and(|rp| !rp[i].is_empty());
317 Self::validate_rcpt_to_line_length(buf.len() - start, has_dsn)?;
318 }
319 encode::encode_data(&mut buf);
320
321 // Send all commands at once (RFC 1854 Section 3).
322 inner.write_all(&buf).await?;
323
324 // Read responses: 1 MAIL FROM + N RCPT TOs + 1 DATA.
325 // MAIL FROM response (RFC 5321 Section 4.1.1.2).
326 let mail_resp = inner.read_response().await?;
327 if !mail_resp.is_success() {
328 tracing::debug!(code = mail_resp.code, "pipelined MAIL FROM rejected");
329 // Drain remaining responses: N RCPT TOs + 1 DATA.
330 // RFC 1854 Section 3 / RFC 5321 Section 3.3: the server may
331 // have already processed the pipelined DATA command and sent
332 // a 354 intermediate reply. If so, it is waiting for message
333 // data. We must send the dot terminator to exit the DATA
334 // state, otherwise the session is left in an inconsistent
335 // state where the server expects data and the client expects
336 // command responses.
337 // Track the DATA response specifically — it's the last of
338 // the N+1 responses (N RCPT TOs + 1 DATA). We identify it by
339 // index rather than using the last successfully read response,
340 // because a mid-drain read failure would leave `last_resp`
341 // pointing at an RCPT TO response, not the DATA response
342 // (RFC 1854 Section 3).
343 let drain_count = recipients.len() + 1;
344 let mut data_resp: Option<SmtpResponse> = None;
345 for i in 0..drain_count {
346 match inner.read_response().await {
347 Ok(resp) => {
348 // The DATA response is the last one
349 // (index = drain_count - 1).
350 if i == drain_count - 1 {
351 data_resp = Some(resp);
352 }
353 }
354 Err(_) => break,
355 }
356 }
357 // RFC 5321 Section 3.3: if the DATA command received 354
358 // ("Start mail input"), we must send the dot terminator
359 // to exit DATA state.
360 // RFC 5321 Section 4.1.1.4: 354 is the only valid
361 // intermediate response to DATA.
362 if let Some(ref data_resp) = data_resp {
363 if data_resp.code == 354 {
364 buf.clear();
365 encode::encode_data_end(&mut buf, b"");
366 let _ = inner.write_all(&buf).await;
367 // Read and discard the response to the empty DATA.
368 let _ = inner.read_response().await;
369 }
370 }
371 return Err(Self::response_to_error(mail_resp));
372 }
373
374 // RCPT TO responses (RFC 5321 Section 4.1.1.3).
375 let mut accepted = 0usize;
376 let mut rejected_recipients = Vec::new();
377 for fp in recipients {
378 let resp = inner.read_response().await?;
379 if resp.is_success() {
380 accepted += 1;
381 } else {
382 tracing::debug!(
383 recipient = fp.as_str(),
384 code = resp.code,
385 "pipelined RCPT TO rejected"
386 );
387 rejected_recipients.push(crate::types::RejectedRecipient {
388 recipient: fp.clone(),
389 response: resp,
390 });
391 }
392 }
393
394 // DATA response (RFC 5321 Section 4.1.1.4).
395 let data_resp = inner.read_response().await?;
396
397 if accepted == 0 {
398 // All RCPT TOs failed — send terminator to exit DATA state
399 // per RFC 1854 Section 3, then return error.
400 // RFC 5321 Section 4.1.1.4: 354 is the only valid
401 // intermediate response to DATA.
402 if data_resp.code == 354 {
403 buf.clear();
404 // No message data was sent — pass empty slice so the leading
405 // CRLF is included (RFC 5321 Section 4.1.1.4).
406 encode::encode_data_end(&mut buf, b"");
407 // Best-effort write — the server may have already
408 // closed the connection after rejecting all recipients
409 // (RFC 1854 Section 3). Propagating the I/O error here
410 // would shadow the AllRecipientsFailed error below.
411 let _ = inner.write_all(&buf).await;
412 // Read and discard the response to the empty DATA.
413 let _ = inner.read_response().await;
414 } else {
415 // RFC 5321 Section 3.3: DATA was rejected but the mail
416 // transaction (MAIL FROM) is still open. RSET to clean up.
417 inner.rset_best_effort().await;
418 }
419 return Err(Error::AllRecipientsFailed {
420 count: recipients.len(),
421 // Extract just the responses for the error variant.
422 responses: rejected_recipients
423 .into_iter()
424 .map(|r| r.response)
425 .collect(),
426 });
427 }
428
429 // RFC 5321 Section 4.1.1.4: 354 is the only valid intermediate
430 // response to DATA. Any other code means DATA was rejected.
431 if data_resp.code != 354 {
432 // RFC 5321 Section 3.3: DATA was rejected but the mail
433 // transaction (MAIL FROM) is still open. RSET to clean up.
434 inner.rset_best_effort().await;
435 return Err(Self::response_to_error(data_resp));
436 }
437
438 // Send dot-stuffed message body + terminator in a single write
439 // (RFC 5321 Section 4.5.2 / Section 4.1.1.4).
440 let body = encode::dot_stuff_and_terminate(message);
441 inner.write_all(&body).await?;
442 let resp = inner.read_response().await?;
443 if !resp.is_success() {
444 return Err(Self::response_to_error(resp));
445 }
446
447 Ok(crate::types::SendResult {
448 rejected_recipients,
449 })
450 }
451}