1use std::time::SystemTime;
10
11use mail_builder::encoders::base64::base64_encode;
12
13use super::{DkimSigner, Done, Signature, canonicalize::BodyHasher, sign::SignableMessage};
14
15use crate::{
16 Error,
17 common::{
18 crypto::{HashContext, HashImpl, SigningKey},
19 headers::HeaderIterator,
20 },
21};
22
23pub struct DkimSigningStream<'a, T: SigningKey> {
48 template: Signature,
49 key: &'a T,
50 state: SigningState<<<T as SigningKey>::Hasher as HashImpl>::Context>,
51}
52
53enum SigningState<H> {
54 ReadingHeaders { buffer: Vec<u8> },
56 HashingBody {
58 parsed_headers: Vec<(Vec<u8>, Vec<u8>)>,
59 body_hasher: BodyHasher<H>,
60 },
61 Done,
63}
64
65impl<T: SigningKey> DkimSigner<T, Done> {
66 pub fn sign_streaming(&self) -> DkimSigningStream<'_, T> {
85 DkimSigningStream {
86 template: self.template.clone(),
87 key: &self.key,
88 state: SigningState::ReadingHeaders {
89 buffer: Vec::with_capacity(8192),
90 },
91 }
92 }
93}
94
95impl<T: SigningKey> DkimSigningStream<'_, T> {
96 pub fn write(&mut self, chunk: &[u8]) {
104 match &mut self.state {
105 SigningState::ReadingHeaders { buffer } => {
106 buffer.extend_from_slice(chunk);
107
108 if let Some(boundary_pos) = find_header_boundary(buffer) {
110 let header_section = &buffer[..boundary_pos - 4];
112 let parsed_headers = parse_headers(header_section);
113
114 let body_hasher = BodyHasher::new(
116 <T::Hasher as HashImpl>::hasher(),
117 self.template.cb,
118 if self.template.l > 0 { u64::MAX } else { 0 },
119 );
120
121 let remaining_body = buffer[boundary_pos..].to_vec();
123
124 self.state = SigningState::HashingBody {
126 parsed_headers,
127 body_hasher,
128 };
129
130 if !remaining_body.is_empty()
132 && let SigningState::HashingBody { body_hasher, .. } = &mut self.state
133 {
134 body_hasher.write(&remaining_body);
135 }
136 }
137 }
138 SigningState::HashingBody { body_hasher, .. } => {
139 body_hasher.write(chunk);
140 }
141 SigningState::Done => {
142 }
144 }
145 }
146
147 pub fn finish(mut self) -> crate::Result<Signature>
159 where
160 <<T as SigningKey>::Hasher as HashImpl>::Context: HashContext,
161 {
162 let now = SystemTime::now()
163 .duration_since(SystemTime::UNIX_EPOCH)
164 .map(|d| d.as_secs())
165 .unwrap_or(0);
166
167 match std::mem::replace(&mut self.state, SigningState::Done) {
168 SigningState::ReadingHeaders { buffer } => {
169 let (header_section, body_section) =
172 if let Some(boundary_pos) = find_header_boundary(&buffer) {
173 (&buffer[..boundary_pos - 4], &buffer[boundary_pos..])
174 } else {
175 (buffer.as_slice(), &[][..])
177 };
178
179 let parsed_headers = parse_headers(header_section);
180
181 let mut body_hasher = BodyHasher::new(
183 <T::Hasher as HashImpl>::hasher(),
184 self.template.cb,
185 if self.template.l > 0 { u64::MAX } else { 0 },
186 );
187 body_hasher.write(body_section);
188 let (hasher, body_len) = body_hasher.finish();
189 let body_hash = hasher.complete();
190
191 self.finish_with_parsed_data(parsed_headers, body_hash, body_len, now)
192 }
193 SigningState::HashingBody {
194 parsed_headers,
195 body_hasher,
196 } => {
197 let (hasher, body_len) = body_hasher.finish();
198 let body_hash = hasher.complete();
199 self.finish_with_parsed_data(parsed_headers, body_hash, body_len, now)
200 }
201 SigningState::Done => Err(Error::NoHeadersFound),
202 }
203 }
204
205 fn finish_with_parsed_data(
206 &self,
207 parsed_headers: Vec<(Vec<u8>, Vec<u8>)>,
208 body_hash: crate::common::crypto::HashOutput,
209 body_len: u64,
210 now: u64,
211 ) -> crate::Result<Signature> {
212 let mut headers = Vec::with_capacity(self.template.h.len());
214 let mut found_headers = vec![false; self.template.h.len()];
215 let mut signed_headers = Vec::with_capacity(self.template.h.len());
216
217 for (name, value) in &parsed_headers {
218 if let Some(pos) = self
219 .template
220 .h
221 .iter()
222 .position(|header| name.eq_ignore_ascii_case(header.as_bytes()))
223 {
224 headers.push((name.as_slice(), value.as_slice()));
225 found_headers[pos] = true;
226 signed_headers.push(std::str::from_utf8(name).unwrap_or_default().to_string());
227 }
228 }
229
230 if signed_headers.is_empty() {
231 return Err(Error::NoHeadersFound);
232 }
233
234 signed_headers.reverse();
236 for (header, found) in self.template.h.iter().zip(found_headers) {
237 if !found {
238 signed_headers.push(header.to_string());
239 }
240 }
241
242 let canonical_headers = self.template.ch.canonical_headers(headers);
244
245 let mut signature = self.template.clone();
247 signature.bh = base64_encode(body_hash.as_ref())?;
248 signature.t = now;
249 signature.x = if signature.x > 0 {
250 now + signature.x
251 } else {
252 0
253 };
254 signature.h = signed_headers;
255 if signature.l > 0 {
256 signature.l = body_len;
257 }
258
259 let b = self.key.sign(SignableMessage {
261 headers: canonical_headers,
262 signature: &signature,
263 })?;
264
265 signature.b = base64_encode(&b)?;
267
268 Ok(signature)
269 }
270}
271
/// Locates the first `\r\n\r\n` sequence in `data`.
///
/// Returns the index of the byte immediately after the sequence, or `None`
/// when `data` does not contain it.
fn find_header_boundary(data: &[u8]) -> Option<usize> {
    const BOUNDARY: &[u8] = b"\r\n\r\n";

    let start = data
        .windows(BOUNDARY.len())
        .position(|window| window == BOUNDARY)?;
    Some(start + BOUNDARY.len())
}
278
279fn parse_headers(header_section: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
282 let mut with_separator = header_section.to_vec();
284 with_separator.extend_from_slice(b"\r\n");
285
286 HeaderIterator::new(&with_separator)
287 .map(|(name, value)| (name.to_vec(), value.to_vec()))
288 .collect()
289}
290
#[cfg(test)]
#[allow(unused)]
mod test {
    use crate::{
        common::crypto::{RsaKey, Sha256},
        dkim::{Canonicalization, DkimSigner},
    };

    use rustls_pki_types::{PrivateKeyDer, PrivatePkcs1KeyDer, pem::PemObject};

    const RSA_PRIVATE_KEY: &str = include_str!("../../resources/rsa-private.pem");

    /// Loads the RSA test key shared by every test in this module.
    fn rsa_key() -> RsaKey<Sha256> {
        let der = PrivatePkcs1KeyDer::from_pem_slice(RSA_PRIVATE_KEY.as_bytes()).unwrap();
        RsaKey::<Sha256>::from_key_der(PrivateKeyDer::Pkcs1(der)).unwrap()
    }

    /// Signing a message in a single `write` must give the same result as the
    /// non-streaming signer.
    // NOTE(review): the streaming path stamps the signature with the wall
    // clock; if `sign` does the same, comparing `b` could fail when the two
    // calls straddle a second boundary - confirm.
    #[test]
    fn streaming_sign_matches_regular_sign() {
        let message = concat!(
            "From: bill@example.com\r\n",
            "To: jdoe@example.com\r\n",
            "Subject: TPS Report\r\n",
            "\r\n",
            "I'm going to need those TPS reports ASAP. ",
            "So, if you could do that, that'd be great.\r\n"
        );

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "To", "Subject"]);

        let expected = signer.sign(message.as_bytes()).unwrap();

        let mut stream = signer.sign_streaming();
        stream.write(message.as_bytes());
        let actual = stream.finish().unwrap();

        assert_eq!(expected.bh, actual.bh, "Body hashes should match");
        assert_eq!(expected.h, actual.h, "Signed headers should match");
        assert_eq!(expected.b, actual.b, "Signatures should match");
    }

    /// Headers and body delivered as two separate writes.
    #[test]
    fn streaming_sign_multiple_chunks() {
        let header = "From: bill@example.com\r\nTo: jdoe@example.com\r\nSubject: Test\r\n\r\n";
        let body = "Hello World! This is the body.\r\n";

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "To", "Subject"]);

        let full_message = [header, body].concat();
        let expected = signer.sign(full_message.as_bytes()).unwrap();

        let mut stream = signer.sign_streaming();
        for part in [header, body] {
            stream.write(part.as_bytes());
        }
        let actual = stream.finish().unwrap();

        assert_eq!(expected.bh, actual.bh, "Body hashes should match");
    }

    /// The whole message delivered in fixed-size chunks of several sizes.
    #[test]
    fn streaming_sign_chunked_body() {
        let message = concat!(
            "From: test@example.com\r\n",
            "Subject: Chunked Test\r\n",
            "\r\n",
            "Line 1\r\n",
            "Line 2\r\n",
            "Line 3\r\n",
        );

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        let expected = signer.sign(message.as_bytes()).unwrap();

        for &chunk_size in &[1, 2, 5, 10, 20] {
            let mut stream = signer.sign_streaming();
            message
                .as_bytes()
                .chunks(chunk_size)
                .for_each(|chunk| stream.write(chunk));
            let actual = stream.finish().unwrap();

            assert_eq!(
                expected.bh, actual.bh,
                "Body hash mismatch at chunk_size={}",
                chunk_size
            );
        }
    }

    /// The blank line that ends the headers arrives in its own write.
    #[test]
    fn streaming_sign_split_header_boundary() {
        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        let message = "From: test@example.com\r\nSubject: Test\r\n\r\nBody";
        let expected = signer.sign(message.as_bytes()).unwrap();

        let parts: [&[u8]; 4] = [
            b"From: test@example.com\r\n",
            b"Subject: Test\r\n",
            b"\r\n",
            b"Body",
        ];
        let mut stream = signer.sign_streaming();
        for part in parts {
            stream.write(part);
        }
        let actual = stream.finish().unwrap();

        assert_eq!(expected.bh, actual.bh);
    }

    /// A message that ends right after the header section.
    #[test]
    fn streaming_sign_empty_body() {
        let message = "From: test@example.com\r\nSubject: Empty\r\n\r\n";

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        let expected = signer.sign(message.as_bytes()).unwrap();

        let mut stream = signer.sign_streaming();
        stream.write(message.as_bytes());
        let actual = stream.finish().unwrap();

        assert_eq!(expected.bh, actual.bh);
    }

    /// Simple canonicalization for both headers and body.
    // NOTE(review): the same possible wall-clock race on `b` as in
    // `streaming_sign_matches_regular_sign` applies here - confirm.
    #[test]
    fn streaming_sign_simple_canonicalization() {
        let message = concat!(
            "From: test@example.com\r\n",
            "Subject: Simple Canon Test\r\n",
            "\r\n",
            "Body with spaces\r\n",
        );

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"])
            .header_canonicalization(Canonicalization::Simple)
            .body_canonicalization(Canonicalization::Simple);

        let expected = signer.sign(message.as_bytes()).unwrap();

        let mut stream = signer.sign_streaming();
        stream.write(message.as_bytes());
        let actual = stream.finish().unwrap();

        assert_eq!(expected.bh, actual.bh);
        assert_eq!(expected.b, actual.b);
    }

    /// A header value folded over two lines.
    #[test]
    fn streaming_sign_folded_headers() {
        let message = concat!(
            "From: test@example.com\r\n",
            "Subject: This is a very long subject line that\r\n",
            " continues on the next line\r\n",
            "\r\n",
            "Body\r\n",
        );

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        let expected = signer.sign(message.as_bytes()).unwrap();

        let mut stream = signer.sign_streaming();
        stream.write(message.as_bytes());
        let actual = stream.finish().unwrap();

        assert_eq!(expected.bh, actual.bh);
    }

    /// None of the requested headers occur in the message.
    #[test]
    fn streaming_sign_no_matching_headers_error() {
        let message = "X-Custom: value\r\n\r\nBody\r\n";

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        let mut stream = signer.sign_streaming();
        stream.write(message.as_bytes());
        let outcome = stream.finish();

        assert!(matches!(outcome, Err(crate::Error::NoHeadersFound)));
    }
}