mail_auth/dkim/
streaming.rs

1/*
2 * SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
3 *
4 * SPDX-License-Identifier: Apache-2.0 OR MIT
5 */
6
7//! Streaming DKIM signing API for reduced memory usage with large emails.
8
9use std::time::SystemTime;
10
11use mail_builder::encoders::base64::base64_encode;
12
13use super::{DkimSigner, Done, Signature, canonicalize::BodyHasher, sign::SignableMessage};
14
15use crate::{
16    Error,
17    common::{
18        crypto::{HashContext, HashImpl, SigningKey},
19        headers::HeaderIterator,
20    },
21};
22
/// A streaming DKIM signer that allows signing messages in chunks.
///
/// This is useful when you want to avoid loading the entire message into
/// memory before signing. Headers are buffered internally until the
/// header/body boundary (`\r\n\r\n`) is detected, then body content is
/// streamed through the hasher.
///
/// A stream is created by [`DkimSigner::sign_streaming`], borrows the
/// signer's key for its whole lifetime, and is consumed by
/// [`DkimSigningStream::finish`].
///
/// # Example
///
/// ```ignore
/// let signer = DkimSigner::from_key(key)
///     .domain("example.com")
///     .selector("default")
///     .headers(["From", "To", "Subject"]);
///
/// let mut stream = signer.sign_streaming();
/// stream.write(b"From: sender@example.com\r\n");
/// stream.write(b"To: recipient@example.com\r\n");
/// stream.write(b"Subject: Test\r\n");
/// stream.write(b"\r\n");
/// stream.write(b"Body content here...");
///
/// let signature = stream.finish()?;
/// ```
pub struct DkimSigningStream<'a, T: SigningKey> {
    /// Copy of the signer's signature template. When finishing, a further
    /// clone of it receives the computed `bh`, `t`, `x`, `h`, `l` and `b` values.
    template: Signature,
    /// Signing key borrowed from the originating `DkimSigner`.
    key: &'a T,
    /// Progress through the message: header buffering, then body hashing.
    state: SigningState<<<T as SigningKey>::Hasher as HashImpl>::Context>,
}
52
/// Internal state machine advanced by [`DkimSigningStream::write`] and
/// taken apart by [`DkimSigningStream::finish`].
enum SigningState<H> {
    /// Accumulating raw bytes until the `\r\n\r\n` header/body boundary is found.
    ReadingHeaders { buffer: Vec<u8> },
    /// Headers parsed, now hashing body.
    HashingBody {
        /// Every parsed header as an owned `(name, value)` byte pair.
        parsed_headers: Vec<(Vec<u8>, Vec<u8>)>,
        /// Incremental body hasher, configured with the template's body
        /// canonicalization.
        body_hasher: BodyHasher<H>,
    },
    /// Placeholder left in the stream once `finish` has moved the real state
    /// out with `std::mem::replace`.
    Done,
}
64
65impl<T: SigningKey> DkimSigner<T, Done> {
66    /// Creates a streaming DKIM signer.
67    ///
68    /// Feed raw message data via [`DkimSigningStream::write`], then call
69    /// [`DkimSigningStream::finish`] to get the signature.
70    ///
71    /// Headers are buffered internally until the header/body boundary (`\r\n\r\n`)
72    /// is detected. After that, body content is streamed through the hasher
73    /// without additional buffering.
74    ///
75    /// # Example
76    ///
77    /// ```ignore
78    /// let mut stream = signer.sign_streaming();
79    /// for chunk in message_chunks {
80    ///     stream.write(chunk);
81    /// }
82    /// let signature = stream.finish()?;
83    /// ```
84    pub fn sign_streaming(&self) -> DkimSigningStream<'_, T> {
85        DkimSigningStream {
86            template: self.template.clone(),
87            key: &self.key,
88            state: SigningState::ReadingHeaders {
89                buffer: Vec::with_capacity(8192),
90            },
91        }
92    }
93}
94
95impl<T: SigningKey> DkimSigningStream<'_, T> {
96    /// Feed a chunk of raw message data to the signer.
97    ///
98    /// Data should be provided in order, starting with headers. The header/body
99    /// boundary (`\r\n\r\n`) is automatically detected.
100    ///
101    /// While reading headers, all data is buffered. Once the header/body boundary
102    /// is detected, subsequent body data is streamed directly to the hasher.
103    pub fn write(&mut self, chunk: &[u8]) {
104        match &mut self.state {
105            SigningState::ReadingHeaders { buffer } => {
106                buffer.extend_from_slice(chunk);
107
108                // Check for header/body boundary
109                if let Some(boundary_pos) = find_header_boundary(buffer) {
110                    // Parse headers from buffer[..boundary_pos - 4] (exclude the \r\n\r\n)
111                    let header_section = &buffer[..boundary_pos - 4];
112                    let parsed_headers = parse_headers(header_section);
113
114                    // Create body hasher
115                    let body_hasher = BodyHasher::new(
116                        <T::Hasher as HashImpl>::hasher(),
117                        self.template.cb,
118                        if self.template.l > 0 { u64::MAX } else { 0 },
119                    );
120
121                    // Get any body data that was in the buffer after the boundary
122                    let remaining_body = buffer[boundary_pos..].to_vec();
123
124                    // Transition state
125                    self.state = SigningState::HashingBody {
126                        parsed_headers,
127                        body_hasher,
128                    };
129
130                    // Hash any body data that was in the buffer
131                    if !remaining_body.is_empty()
132                        && let SigningState::HashingBody { body_hasher, .. } = &mut self.state
133                    {
134                        body_hasher.write(&remaining_body);
135                    }
136                }
137            }
138            SigningState::HashingBody { body_hasher, .. } => {
139                body_hasher.write(chunk);
140            }
141            SigningState::Done => {
142                // Ignore writes after finish
143            }
144        }
145    }
146
147    /// Finalize the signature.
148    ///
149    /// Consumes the stream and returns the DKIM signature. The current system
150    /// time is used for the `t=` timestamp.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if:
155    /// - No headers matching the signer's header list were found
156    /// - The cryptographic signing operation fails
157    /// - `finish()` was already called
158    pub fn finish(mut self) -> crate::Result<Signature>
159    where
160        <<T as SigningKey>::Hasher as HashImpl>::Context: HashContext,
161    {
162        let now = SystemTime::now()
163            .duration_since(SystemTime::UNIX_EPOCH)
164            .map(|d| d.as_secs())
165            .unwrap_or(0);
166
167        match std::mem::replace(&mut self.state, SigningState::Done) {
168            SigningState::ReadingHeaders { buffer } => {
169                // Never saw body boundary - check if we have any headers at all
170                // This handles the edge case of a message with no body
171                let (header_section, body_section) =
172                    if let Some(boundary_pos) = find_header_boundary(&buffer) {
173                        (&buffer[..boundary_pos - 4], &buffer[boundary_pos..])
174                    } else {
175                        // No boundary found - treat entire buffer as headers with empty body
176                        (buffer.as_slice(), &[][..])
177                    };
178
179                let parsed_headers = parse_headers(header_section);
180
181                // Hash the body (may be empty)
182                let mut body_hasher = BodyHasher::new(
183                    <T::Hasher as HashImpl>::hasher(),
184                    self.template.cb,
185                    if self.template.l > 0 { u64::MAX } else { 0 },
186                );
187                body_hasher.write(body_section);
188                let (hasher, body_len) = body_hasher.finish();
189                let body_hash = hasher.complete();
190
191                self.finish_with_parsed_data(parsed_headers, body_hash, body_len, now)
192            }
193            SigningState::HashingBody {
194                parsed_headers,
195                body_hasher,
196            } => {
197                let (hasher, body_len) = body_hasher.finish();
198                let body_hash = hasher.complete();
199                self.finish_with_parsed_data(parsed_headers, body_hash, body_len, now)
200            }
201            SigningState::Done => Err(Error::NoHeadersFound),
202        }
203    }
204
205    fn finish_with_parsed_data(
206        &self,
207        parsed_headers: Vec<(Vec<u8>, Vec<u8>)>,
208        body_hash: crate::common::crypto::HashOutput,
209        body_len: u64,
210        now: u64,
211    ) -> crate::Result<Signature> {
212        // Filter headers to only those in template.h and build signed_headers list
213        let mut headers = Vec::with_capacity(self.template.h.len());
214        let mut found_headers = vec![false; self.template.h.len()];
215        let mut signed_headers = Vec::with_capacity(self.template.h.len());
216
217        for (name, value) in &parsed_headers {
218            if let Some(pos) = self
219                .template
220                .h
221                .iter()
222                .position(|header| name.eq_ignore_ascii_case(header.as_bytes()))
223            {
224                headers.push((name.as_slice(), value.as_slice()));
225                found_headers[pos] = true;
226                signed_headers.push(std::str::from_utf8(name).unwrap_or_default().to_string());
227            }
228        }
229
230        if signed_headers.is_empty() {
231            return Err(Error::NoHeadersFound);
232        }
233
234        // Add any missing headers (in reverse order as per DKIM spec)
235        signed_headers.reverse();
236        for (header, found) in self.template.h.iter().zip(found_headers) {
237            if !found {
238                signed_headers.push(header.to_string());
239            }
240        }
241
242        // Build canonical headers
243        let canonical_headers = self.template.ch.canonical_headers(headers);
244
245        // Create Signature
246        let mut signature = self.template.clone();
247        signature.bh = base64_encode(body_hash.as_ref())?;
248        signature.t = now;
249        signature.x = if signature.x > 0 {
250            now + signature.x
251        } else {
252            0
253        };
254        signature.h = signed_headers;
255        if signature.l > 0 {
256            signature.l = body_len;
257        }
258
259        // Sign
260        let b = self.key.sign(SignableMessage {
261            headers: canonical_headers,
262            signature: &signature,
263        })?;
264
265        // Encode
266        signature.b = base64_encode(&b)?;
267
268        Ok(signature)
269    }
270}
271
/// Locates the end of the header section.
///
/// Returns the index just past the first `\r\n\r\n` in `data`, which is where
/// the body begins. Returns `None` when no CRLF blank line is present. Bare-LF
/// blank lines (`\n\n`) are not recognised.
fn find_header_boundary(data: &[u8]) -> Option<usize> {
    const BLANK_LINE: &[u8] = b"\r\n\r\n";
    // Try every possible end index in increasing order; the first one whose
    // prefix ends in the blank line is the earliest match.
    (BLANK_LINE.len()..=data.len()).find(|&end| data[..end].ends_with(BLANK_LINE))
}
278
279/// Parse raw header bytes into (name, value) pairs
280/// Uses the same HeaderIterator as the regular sign() method to ensure consistency
281fn parse_headers(header_section: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
282    // Add a fake body separator so HeaderIterator works correctly
283    let mut with_separator = header_section.to_vec();
284    with_separator.extend_from_slice(b"\r\n");
285
286    HeaderIterator::new(&with_separator)
287        .map(|(name, value)| (name.to_vec(), value.to_vec()))
288        .collect()
289}
290
#[cfg(test)]
#[allow(unused)]
mod test {
    use crate::{
        common::crypto::{RsaKey, Sha256},
        dkim::{Canonicalization, DkimSigner},
    };
    #[cfg(all(feature = "ring", not(feature = "rust-crypto")))]
    use rustls_pki_types::{PrivateKeyDer, PrivatePkcs1KeyDer, pem::PemObject};

    const RSA_PRIVATE_KEY: &str = include_str!("../../resources/rsa-private.pem");

    /// Loads the shared test RSA key with whichever crypto backend is enabled.
    fn rsa_key() -> RsaKey<Sha256> {
        #[cfg(feature = "rust-crypto")]
        {
            RsaKey::<Sha256>::from_pkcs1_pem(RSA_PRIVATE_KEY).unwrap()
        }
        #[cfg(all(feature = "ring", not(feature = "rust-crypto")))]
        {
            RsaKey::<Sha256>::from_key_der(PrivateKeyDer::Pkcs1(
                PrivatePkcs1KeyDer::from_pem_slice(RSA_PRIVATE_KEY.as_bytes()).unwrap(),
            ))
            .unwrap()
        }
    }

    #[test]
    fn streaming_sign_matches_regular_sign() {
        // Test that sign_streaming() produces same body hash as sign()
        let message = concat!(
            "From: bill@example.com\r\n",
            "To: jdoe@example.com\r\n",
            "Subject: TPS Report\r\n",
            "\r\n",
            "I'm going to need those TPS reports ASAP. ",
            "So, if you could do that, that'd be great.\r\n"
        );

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "To", "Subject"]);

        // Regular sign
        let sig1 = signer.sign(message.as_bytes()).unwrap();

        // Streaming sign - single chunk
        let mut stream = signer.sign_streaming();
        stream.write(message.as_bytes());
        let sig2 = stream.finish().unwrap();

        // Body hashes should match
        assert_eq!(sig1.bh, sig2.bh, "Body hashes should match");
        // Signed headers should match
        assert_eq!(sig1.h, sig2.h, "Signed headers should match");
        // `finish()` stamps `t=` from the wall clock (and `sign()` presumably
        // does too), so `b=` is only comparable when both ran in the same second.
        if sig1.t == sig2.t {
            assert_eq!(sig1.b, sig2.b, "Signatures should match");
        }
    }

    #[test]
    fn streaming_sign_multiple_chunks() {
        let header = "From: bill@example.com\r\nTo: jdoe@example.com\r\nSubject: Test\r\n\r\n";
        let body = "Hello World! This is the body.\r\n";

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "To", "Subject"]);

        // Reference: single chunk
        let full_message = format!("{}{}", header, body);
        let reference_sig = signer.sign(full_message.as_bytes()).unwrap();

        // Streaming: multiple chunks
        let mut stream = signer.sign_streaming();
        stream.write(header.as_bytes());
        stream.write(body.as_bytes());
        let streamed_sig = stream.finish().unwrap();

        assert_eq!(
            reference_sig.bh, streamed_sig.bh,
            "Body hashes should match"
        );
    }

    #[test]
    fn streaming_sign_chunked_body() {
        let message = concat!(
            "From: test@example.com\r\n",
            "Subject: Chunked Test\r\n",
            "\r\n",
            "Line 1\r\n",
            "Line 2\r\n",
            "Line 3\r\n",
        );

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        // Reference
        let reference_sig = signer.sign(message.as_bytes()).unwrap();

        // Chunked at various sizes
        for chunk_size in [1, 2, 5, 10, 20] {
            let mut stream = signer.sign_streaming();
            for chunk in message.as_bytes().chunks(chunk_size) {
                stream.write(chunk);
            }
            let streamed_sig = stream.finish().unwrap();

            assert_eq!(
                reference_sig.bh, streamed_sig.bh,
                "Body hash mismatch at chunk_size={}",
                chunk_size
            );
        }
    }

    #[test]
    fn streaming_sign_split_header_boundary() {
        // Test where \r\n\r\n is split across chunks
        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        // Reference
        let message = "From: test@example.com\r\nSubject: Test\r\n\r\nBody";
        let reference_sig = signer.sign(message.as_bytes()).unwrap();

        // Split right at the boundary
        let mut stream = signer.sign_streaming();
        stream.write(b"From: test@example.com\r\n");
        stream.write(b"Subject: Test\r\n");
        stream.write(b"\r\n"); // The second \r\n completing the boundary
        stream.write(b"Body");
        let streamed_sig = stream.finish().unwrap();

        assert_eq!(reference_sig.bh, streamed_sig.bh);
    }

    #[test]
    fn streaming_sign_empty_body() {
        let message = "From: test@example.com\r\nSubject: Empty\r\n\r\n";

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        let reference_sig = signer.sign(message.as_bytes()).unwrap();

        let mut stream = signer.sign_streaming();
        stream.write(message.as_bytes());
        let streamed_sig = stream.finish().unwrap();

        assert_eq!(reference_sig.bh, streamed_sig.bh);
    }

    #[test]
    fn streaming_sign_headers_only_message() {
        // No blank line at all: the stream never leaves the header-buffering
        // state and `finish()` has to parse the whole buffer as headers.
        let message = "From: test@example.com\r\nSubject: No Body\r\n";

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        let reference_sig = signer.sign(message.as_bytes()).unwrap();

        for chunk_size in [1, 3, message.len()] {
            let mut stream = signer.sign_streaming();
            for chunk in message.as_bytes().chunks(chunk_size) {
                stream.write(chunk);
            }
            let streamed_sig = stream.finish().unwrap();

            assert_eq!(
                reference_sig.bh, streamed_sig.bh,
                "Body hash mismatch at chunk_size={}",
                chunk_size
            );
            assert_eq!(reference_sig.h, streamed_sig.h);
        }
    }

    #[test]
    fn streaming_sign_long_headers_byte_by_byte() {
        // Many header lines fed one byte at a time exercise the incremental
        // boundary detection across thousands of writes.
        let mut message = String::new();
        for i in 0..200 {
            message.push_str(&format!("X-Filler-{i}: some filler value\r\n"));
        }
        message.push_str("From: test@example.com\r\nSubject: Long\r\n\r\nBody line\r\n");

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        let reference_sig = signer.sign(message.as_bytes()).unwrap();

        let mut stream = signer.sign_streaming();
        for byte in message.as_bytes() {
            stream.write(std::slice::from_ref(byte));
        }
        let streamed_sig = stream.finish().unwrap();

        assert_eq!(reference_sig.bh, streamed_sig.bh);
        assert_eq!(reference_sig.h, streamed_sig.h);
    }

    #[test]
    fn streaming_sign_simple_canonicalization() {
        let message = concat!(
            "From: test@example.com\r\n",
            "Subject: Simple Canon Test\r\n",
            "\r\n",
            "Body with   spaces\r\n",
        );

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"])
            .header_canonicalization(Canonicalization::Simple)
            .body_canonicalization(Canonicalization::Simple);

        let reference_sig = signer.sign(message.as_bytes()).unwrap();

        let mut stream = signer.sign_streaming();
        stream.write(message.as_bytes());
        let streamed_sig = stream.finish().unwrap();

        assert_eq!(reference_sig.bh, streamed_sig.bh);
        // `t=` is taken from the wall clock; compare `b=` only when both runs
        // got the same timestamp.
        if reference_sig.t == streamed_sig.t {
            assert_eq!(reference_sig.b, streamed_sig.b);
        }
    }

    #[test]
    fn streaming_sign_folded_headers() {
        // Test with folded (multi-line) headers
        let message = concat!(
            "From: test@example.com\r\n",
            "Subject: This is a very long subject line that\r\n",
            " continues on the next line\r\n",
            "\r\n",
            "Body\r\n",
        );

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        let reference_sig = signer.sign(message.as_bytes()).unwrap();

        let mut stream = signer.sign_streaming();
        stream.write(message.as_bytes());
        let streamed_sig = stream.finish().unwrap();

        assert_eq!(reference_sig.bh, streamed_sig.bh);
    }

    #[test]
    fn streaming_sign_no_matching_headers_error() {
        let message = "X-Custom: value\r\n\r\nBody\r\n";

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]); // These headers don't exist in message

        let mut stream = signer.sign_streaming();
        stream.write(message.as_bytes());
        let result = stream.finish();

        assert!(matches!(result, Err(crate::Error::NoHeadersFound)));
    }
}