1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
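//! BDAT/CHUNKING sending (RFC 3030 Section 3).
//!
//! Provides `send_bdat` and `send_bdat_with_all_params` for SMTP
//! connections using the CHUNKING extension.
// NOTE(review): RFC 3030 appears to define CHUNKING/BDAT in Section 2 and
// BINARYMIME in Section 3 (Section 4.2 being the pipelining example). The
// "Section 3" (BDAT) and "Section 2" (BINARYMIME) citations used throughout
// this file, including in error messages, look swapped — verify against the
// RFC text before changing them.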
#[allow(clippy::wildcard_imports)]
use super::*;
impl SmtpConnection {
// -----------------------------------------------------------------------
// Sending — BDAT/CHUNKING (RFC 3030)
// -----------------------------------------------------------------------
/// Send a message using BDAT chunking (RFC 3030 Section 3).
///
/// Uses the BDAT command instead of DATA, which avoids dot-stuffing
/// and allows transmission of binary content. The server must
/// advertise the CHUNKING extension (RFC 3030).
///
/// Performs the full MAIL FROM / RCPT TO / BDAT sequence.
/// The message is sent as a single BDAT chunk with the LAST flag.
///
/// Accepts optional [`MailFromParams`](crate::types::MailFromParams) to include ESMTP parameters
/// such as `BODY=BINARYMIME` (RFC 3030 Section 2) and `SMTPUTF8`
/// (RFC 6531 Section 3.4). The SIZE parameter is always included
/// automatically when the server advertises the SIZE extension
/// (RFC 1870 Section 3).
///
/// Addresses are pre-validated by their type constructors
/// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
/// Section 4.1.2.
///
/// Returns `Error::Protocol` if the server does not support CHUNKING.
#[allow(clippy::significant_drop_tightening)]
pub async fn send_bdat(
&self,
from: &ReversePath,
recipients: &[ForwardPath],
message: &[u8],
params: Option<&crate::types::MailFromParams>,
timeout: Duration,
) -> Result<crate::types::SendResult, Error> {
// RFC 3030 Section 3 + RFC 2033 Section 4.2: In LMTP, the server
// returns one response per accepted recipient after BDAT LAST.
// send_bdat reads a single response and cannot handle per-recipient
// results. Use send_lmtp_bdat for LMTP BDAT connections.
if self.protocol == Protocol::Lmtp {
return Err(Error::Protocol(
"send_bdat does not support LMTP per-recipient responses \
(RFC 3030 Section 3 / RFC 2033 Section 4.2); \
LMTP connections must use send_lmtp or send_lmtp_bdat"
.into(),
));
}
let mut inner = self.inner.lock().await;
// RFC 5321 Section 3.8: after a 421 response the server will close
// the transmission channel. Fail immediately.
if inner.server_shutting_down {
return Err(Error::Protocol(
"connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
));
}
let effective_mail_params = Some(Self::effective_mail_from_params(
&inner.capabilities,
from,
recipients,
message,
params,
)?);
Self::validate_send_addresses(from, recipients)?;
Self::validate_bdat_prerequisites(
&inner.capabilities,
message,
effective_mail_params.as_ref(),
inner.stream.is_tls(),
)?;
tokio::time::timeout(timeout, async {
Self::send_bdat_inner(
&mut inner,
from,
recipients,
message,
effective_mail_params.as_ref(),
None,
)
.await
})
.await
.map_err(|_| Error::Timeout)?
}
/// Send a message using BDAT chunking with both MAIL FROM and
/// per-recipient RCPT TO parameters (RFC 3030 Section 3).
///
/// Like [`send_bdat`](Self::send_bdat) but also accepts per-recipient
/// [`RcptToParams`](crate::types::RcptToParams) to include DSN parameters (NOTIFY, ORCPT) on each
/// RCPT TO command (RFC 3461 Sections 4.1–4.2).
///
/// `rcpt_params` must have the same length as `recipients` — each
/// entry is paired by index. Returns `Error::Protocol` if the
/// lengths do not match.
///
/// Addresses are pre-validated by their type constructors
/// ([`ReversePath::new`], [`ForwardPath::new`]) per RFC 5321
/// Section 4.1.2.
///
/// The server must advertise CHUNKING (RFC 3030). Returns
/// `Error::Protocol` if the server does not support it.
#[allow(clippy::significant_drop_tightening)]
pub async fn send_bdat_with_all_params(
&self,
from: &ReversePath,
recipients: &[ForwardPath],
message: &[u8],
mail_params: Option<&crate::types::MailFromParams>,
rcpt_params: &[crate::types::RcptToParams],
timeout: Duration,
) -> Result<crate::types::SendResult, Error> {
// RFC 3461 Sections 4.1–4.2: each recipient must have a
// corresponding RcptToParams entry.
if recipients.len() != rcpt_params.len() {
return Err(Error::Protocol(format!(
"rcpt_params length ({}) must match recipients length ({}) \
(RFC 3461 Sections 4.1–4.2)",
rcpt_params.len(),
recipients.len(),
)));
}
// RFC 2033 Section 4.2: LMTP returns per-recipient responses;
// use send_lmtp_bdat_with_all_params instead.
if self.protocol == Protocol::Lmtp {
return Err(Error::Protocol(
"send_bdat_with_all_params does not support LMTP per-recipient \
responses (RFC 3030 Section 3 / RFC 2033 Section 4.2); \
LMTP connections must use send_lmtp_bdat_with_all_params"
.into(),
));
}
let mut inner = self.inner.lock().await;
// RFC 5321 Section 3.8: after a 421 response the server will close
// the transmission channel. Fail immediately.
if inner.server_shutting_down {
return Err(Error::Protocol(
"connection is shutting down after 421 (RFC 5321 Section 3.8)".into(),
));
}
let effective_mail_params = Some(Self::effective_mail_from_params(
&inner.capabilities,
from,
recipients,
message,
mail_params,
)?);
Self::validate_send_addresses(from, recipients)?;
Self::validate_rcpt_params(&inner.capabilities, rcpt_params)?;
Self::validate_bdat_prerequisites(
&inner.capabilities,
message,
effective_mail_params.as_ref(),
inner.stream.is_tls(),
)?;
tokio::time::timeout(timeout, async {
Self::send_bdat_inner(
&mut inner,
from,
recipients,
message,
effective_mail_params.as_ref(),
Some(rcpt_params),
)
.await
})
.await
.map_err(|_| Error::Timeout)?
}
/// Write the MAIL FROM -> RCPT TO -> BDAT LAST sequence used by both the
/// SMTP and LMTP BDAT paths (RFC 3030).
///
/// MAIL FROM and the RCPT TO batch are delegated to `send_mail_from` and
/// `send_rcpt_to_batch`. When `rcpt_params` is `Some`, it is forwarded to
/// the batch helper so per-recipient DSN parameters are included
/// (RFC 3461 Sections 4.1–4.2). The BDAT header and the raw message are
/// then written; no dot-stuffing is applied.
///
/// This function does **not** read the reply to BDAT LAST. It returns the
/// accepted and rejected recipients so the caller can apply the
/// protocol-specific reply handling (SMTP: one reply; LMTP: one reply per
/// accepted recipient, RFC 2033 Section 4.2) and report partial
/// rejections (RFC 5321 Section 3.3).
///
/// # Errors
///
/// Propagates any error from the MAIL FROM step, the RCPT TO batch, or
/// either write.
pub(super) async fn send_bdat_envelope(
    inner: &mut SmtpInner,
    from: &ReversePath,
    recipients: &[ForwardPath],
    message: &[u8],
    params: Option<&crate::types::MailFromParams>,
    rcpt_params: Option<&[crate::types::RcptToParams]>,
) -> Result<(Vec<ForwardPath>, Vec<crate::types::RejectedRecipient>), Error> {
    let chunk_len = message.len();

    Self::send_mail_from(inner, from, message, chunk_len, params).await?;
    let (taken, refused) = Self::send_rcpt_to_batch(inner, recipients, rcpt_params).await?;

    // The whole message travels as one chunk flagged LAST: first the
    // `BDAT <size> LAST` header, then the untouched payload bytes.
    let mut chunk_header = BytesMut::new();
    encode::encode_bdat(&mut chunk_header, chunk_len, true);
    inner.write_all(&chunk_header).await?;
    inner.write_all(message).await?;

    Ok((taken, refused))
}
/// Choose the BDAT sending strategy for this connection (RFC 3030).
///
/// If the server's capabilities report PIPELINING (RFC 1854), the
/// pipelined path is used, which batches MAIL FROM, every RCPT TO and
/// BDAT LAST plus the message into one write. Otherwise the sequential
/// command/response path is used.
///
/// Returns the chosen path's [`SendResult`], which lists any rejected
/// recipients (RFC 5321 Section 3.3), or its error unchanged.
async fn send_bdat_inner(
    inner: &mut SmtpInner,
    from: &ReversePath,
    recipients: &[ForwardPath],
    message: &[u8],
    params: Option<&crate::types::MailFromParams>,
    rcpt_params: Option<&[crate::types::RcptToParams]>,
) -> Result<crate::types::SendResult, Error> {
    let can_pipeline = inner.capabilities.supports_pipelining();
    if !can_pipeline {
        return Self::send_bdat_sequential(inner, from, recipients, message, params, rcpt_params)
            .await;
    }
    Self::send_bdat_pipelined(inner, from, recipients, message, params, rcpt_params).await
}
/// Non-pipelined BDAT send (RFC 3030).
///
/// Delegates the MAIL FROM / RCPT TO / BDAT LAST writes to
/// `send_bdat_envelope`, then reads the single SMTP reply to BDAT LAST.
///
/// Returns a [`SendResult`] listing any rejected recipients
/// (RFC 5321 Section 3.3).
///
/// # Errors
///
/// Propagates envelope and read errors. When the BDAT reply is not a
/// success, a best-effort RSET is issued and the reply is converted with
/// `response_to_error`.
async fn send_bdat_sequential(
    inner: &mut SmtpInner,
    from: &ReversePath,
    recipients: &[ForwardPath],
    message: &[u8],
    params: Option<&crate::types::MailFromParams>,
    rcpt_params: Option<&[crate::types::RcptToParams]>,
) -> Result<crate::types::SendResult, Error> {
    let (_taken, refused) =
        Self::send_bdat_envelope(inner, from, recipients, message, params, rcpt_params).await?;

    // SMTP sends exactly one reply after BDAT LAST.
    let bdat_reply = inner.read_response().await?;
    if bdat_reply.is_success() {
        return Ok(crate::types::SendResult {
            rejected_recipients: refused,
        });
    }

    // RFC 3030: the state after a failed BDAT is indeterminate, and a
    // RSET must be issued before any further command is sent.
    inner.rset_best_effort().await;
    Err(Self::response_to_error(bdat_reply))
}
/// Pipelined BDAT send — batches MAIL FROM, all RCPT TOs, BDAT LAST,
/// and message data into a single write, then reads responses in order
/// (RFC 1854 Section 3, RFC 3030 Section 4.2).
///
/// Unlike the DATA pipelining path, BDAT does not have an intermediate
/// 354 response — the server replies once after receiving the chunk data.
/// Per RFC 3030 Section 3, "the resulting state from a failed BDAT
/// command is indeterminate" and a RSET must be issued.
///
/// When `rcpt_params` is `Some`, per-recipient DSN parameters are
/// included on each RCPT TO command (RFC 3461 Sections 4.1–4.2).
///
/// Returns a [`SendResult`] containing any rejected recipients
/// (RFC 5321 Section 3.3).
///
/// # Errors
///
/// * Encoding errors and MAIL FROM / RCPT TO line-length validation
///   errors are returned before anything is written to the connection.
/// * Errors from `write_all` and from `read_response` (outside the drain
///   loop) are propagated as-is; no RSET is attempted on those paths.
/// * If MAIL FROM is not accepted, up to `recipients.len() + 1` further
///   replies are drained, a best-effort RSET is issued and the MAIL FROM
///   reply is converted with `response_to_error`.
/// * If no recipient is accepted, `Error::AllRecipientsFailed` is returned
///   with every rejection reply. This is checked before the BDAT reply
///   status, so it takes precedence even when the BDAT reply also failed.
/// * If the BDAT LAST reply is not a success, a best-effort RSET is issued
///   and that reply is converted with `response_to_error`.
///
/// # Panics
///
/// Indexes `rcpt_params` by recipient position, so it panics when
/// `rcpt_params` is `Some` and shorter than `recipients`.
/// `send_bdat_with_all_params` rejects mismatched lengths before calling
/// into this path.
// NOTE(review): RFC 3030 appears to define BDAT/CHUNKING in Section 2, not
// Section 3 as cited here — verify the section numbers against the RFC.
#[allow(clippy::too_many_lines)]
async fn send_bdat_pipelined(
inner: &mut SmtpInner,
from: &ReversePath,
recipients: &[ForwardPath],
message: &[u8],
params: Option<&crate::types::MailFromParams>,
rcpt_params: Option<&[crate::types::RcptToParams]>,
) -> Result<crate::types::SendResult, Error> {
// Build the pipeline: MAIL FROM + RCPT TO(s) + BDAT <size> LAST + message data.
// Nothing is written to the connection until the whole buffer has been
// assembled and validated.
let mut buf = BytesMut::new();
let is_8bit = Self::message_contains_8bit(message);
Self::encode_mail_from_cmd(
&inner.capabilities,
&mut buf,
from,
message.len(),
params,
is_8bit,
)?;
// RFC 5321 Section 4.5.3.1.4 / RFC 1870 Section 4 / RFC 6152
// Section 7 / RFC 6531 Section 3.4: MAIL FROM has an extended
// command line limit to accommodate ESMTP parameters.
// At this point `buf` holds only the MAIL FROM command, so `buf.len()`
// is that command's line length.
Self::validate_mail_from_line_length(buf.len())?;
for (i, fp) in recipients.iter().enumerate() {
// Remember where this RCPT TO command starts so that only its own
// length (not the whole buffer) is validated below.
let start = buf.len();
if let Some(rp) = rcpt_params {
// RFC 3461 Sections 4.1–4.2: encode with DSN parameters.
encode::encode_rcpt_to_full(&mut buf, fp, &rp[i])?;
} else {
encode::encode_rcpt_to(&mut buf, fp)?;
}
// RFC 5321 Section 4.5.3.1.4 / RFC 3461 Section 5: validate
// each RCPT TO line, using the extended 1012-octet limit when
// DSN parameters are present.
let has_dsn = rcpt_params.is_some_and(|rp| !rp[i].is_empty());
Self::validate_rcpt_to_line_length(buf.len() - start, has_dsn)?;
}
// BDAT <size> LAST — appended to the pipeline buffer so the entire
// envelope + chunk header + message body is sent in one write
// (RFC 3030 Section 3, RFC 1854 Section 3).
encode::encode_bdat(&mut buf, message.len(), true);
// Append the raw message data — BDAT requires no dot-stuffing
// (RFC 3030 Section 3). This copies the full message into `buf`.
buf.extend_from_slice(message);
// Send all commands + message data at once (RFC 1854 Section 3).
inner.write_all(&buf).await?;
// Read responses: 1 MAIL FROM + N RCPT TOs + 1 BDAT LAST.
// Every pipelined command gets a reply, so each one is read even when an
// earlier command failed; this keeps the reply stream in step.
// MAIL FROM response (RFC 5321 Section 4.1.1.2).
let mail_resp = inner.read_response().await?;
if !mail_resp.is_success() {
tracing::debug!(code = mail_resp.code, "pipelined BDAT MAIL FROM rejected");
// Drain remaining responses: N RCPT TOs + 1 BDAT LAST.
// RFC 1854 Section 3: the server processes pipelined commands
// in order; we must read all remaining responses.
// RFC 3030 Section 3: after a failed BDAT, session state is
// indeterminate — RSET is required.
let drain_count = recipients.len() + 1;
for _ in 0..drain_count {
// A read failure while draining is not reported: the MAIL FROM
// rejection below is the error returned to the caller.
if inner.read_response().await.is_err() {
break;
}
}
// RFC 3030 Section 3: "The resulting state from a failed BDAT
// command is indeterminate. A RSET command MUST be issued."
inner.rset_best_effort().await;
return Err(Self::response_to_error(mail_resp));
}
// RCPT TO responses (RFC 5321 Section 4.1.1.3).
// Replies arrive in the order the commands were written, so the k-th
// reply belongs to the k-th recipient.
let mut rejected_recipients = Vec::new();
let mut accepted_count = 0usize;
for fp in recipients {
let resp = inner.read_response().await?;
if resp.is_success() {
accepted_count += 1;
} else {
tracing::debug!(
recipient = fp.as_str(),
code = resp.code,
"pipelined BDAT RCPT TO rejected"
);
rejected_recipients.push(crate::types::RejectedRecipient {
recipient: fp.clone(),
response: resp,
});
}
}
// BDAT LAST response (RFC 3030 Section 3).
// Read unconditionally, before looking at `accepted_count`, because the
// BDAT command and its data have already been sent.
let bdat_resp = inner.read_response().await?;
if accepted_count == 0 {
// All RCPT TOs failed. The server already received the BDAT data
// but should discard it since no recipients were accepted.
// RFC 3030 Section 3: indeterminate state — issue RSET.
inner.rset_best_effort().await;
return Err(Error::AllRecipientsFailed {
count: recipients.len(),
responses: rejected_recipients
.into_iter()
.map(|r| r.response)
.collect(),
});
}
if !bdat_resp.is_success() {
// RFC 3030 Section 3: "The resulting state from a failed BDAT
// command is indeterminate. A RSET command MUST be issued to
// clear the transaction before additional commands may be sent."
inner.rset_best_effort().await;
return Err(Self::response_to_error(bdat_resp));
}
// At least one recipient was accepted and the chunk was accepted; partial
// rejections are reported through `rejected_recipients`.
Ok(crate::types::SendResult {
rejected_recipients,
})
}
}