Skip to main content

mail_auth/dkim/
streaming.rs

1/*
2 * SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
3 *
4 * SPDX-License-Identifier: Apache-2.0 OR MIT
5 */
6
7//! Streaming DKIM signing API for reduced memory usage with large emails.
8
9use std::time::SystemTime;
10
11use mail_builder::encoders::base64::base64_encode;
12
13use super::{DkimSigner, Done, Signature, canonicalize::BodyHasher, sign::SignableMessage};
14
15use crate::{
16    Error,
17    common::{
18        crypto::{HashContext, HashImpl, SigningKey},
19        headers::HeaderIterator,
20    },
21};
22
23/// A streaming DKIM signer that allows signing messages in chunks.
24///
25/// This is useful when you want to avoid loading the entire message into
26/// memory before signing. Headers are buffered internally until the
27/// header/body boundary is detected, then body content is streamed through
28/// the hasher.
29///
30/// # Example
31///
32/// ```ignore
33/// let signer = DkimSigner::from_key(key)
34///     .domain("example.com")
35///     .selector("default")
36///     .headers(["From", "To", "Subject"]);
37///
38/// let mut stream = signer.sign_streaming();
39/// stream.write(b"From: sender@example.com\r\n");
40/// stream.write(b"To: recipient@example.com\r\n");
41/// stream.write(b"Subject: Test\r\n");
42/// stream.write(b"\r\n");
43/// stream.write(b"Body content here...");
44///
45/// let signature = stream.finish()?;
46/// ```
47pub struct DkimSigningStream<'a, T: SigningKey> {
48    template: Signature,
49    key: &'a T,
50    state: SigningState<<<T as SigningKey>::Hasher as HashImpl>::Context>,
51}
52
53enum SigningState<H> {
54    /// Accumulating headers until \r\n\r\n is found
55    ReadingHeaders { buffer: Vec<u8> },
56    /// Headers parsed, now hashing body
57    HashingBody {
58        parsed_headers: Vec<(Vec<u8>, Vec<u8>)>,
59        body_hasher: BodyHasher<H>,
60    },
61    /// Finished or consumed
62    Done,
63}
64
65impl<T: SigningKey> DkimSigner<T, Done> {
66    /// Creates a streaming DKIM signer.
67    ///
68    /// Feed raw message data via [`DkimSigningStream::write`], then call
69    /// [`DkimSigningStream::finish`] to get the signature.
70    ///
71    /// Headers are buffered internally until the header/body boundary (`\r\n\r\n`)
72    /// is detected. After that, body content is streamed through the hasher
73    /// without additional buffering.
74    ///
75    /// # Example
76    ///
77    /// ```ignore
78    /// let mut stream = signer.sign_streaming();
79    /// for chunk in message_chunks {
80    ///     stream.write(chunk);
81    /// }
82    /// let signature = stream.finish()?;
83    /// ```
84    pub fn sign_streaming(&self) -> DkimSigningStream<'_, T> {
85        DkimSigningStream {
86            template: self.template.clone(),
87            key: &self.key,
88            state: SigningState::ReadingHeaders {
89                buffer: Vec::with_capacity(8192),
90            },
91        }
92    }
93}
94
95impl<T: SigningKey> DkimSigningStream<'_, T> {
96    /// Feed a chunk of raw message data to the signer.
97    ///
98    /// Data should be provided in order, starting with headers. The header/body
99    /// boundary (`\r\n\r\n`) is automatically detected.
100    ///
101    /// While reading headers, all data is buffered. Once the header/body boundary
102    /// is detected, subsequent body data is streamed directly to the hasher.
103    pub fn write(&mut self, chunk: &[u8]) {
104        match &mut self.state {
105            SigningState::ReadingHeaders { buffer } => {
106                buffer.extend_from_slice(chunk);
107
108                // Check for header/body boundary
109                if let Some(boundary_pos) = find_header_boundary(buffer) {
110                    // Parse headers from buffer[..boundary_pos - 4] (exclude the \r\n\r\n)
111                    let header_section = &buffer[..boundary_pos - 4];
112                    let parsed_headers = parse_headers(header_section);
113
114                    // Create body hasher
115                    let body_hasher = BodyHasher::new(
116                        <T::Hasher as HashImpl>::hasher(),
117                        self.template.cb,
118                        if self.template.l > 0 { u64::MAX } else { 0 },
119                    );
120
121                    // Get any body data that was in the buffer after the boundary
122                    let remaining_body = buffer[boundary_pos..].to_vec();
123
124                    // Transition state
125                    self.state = SigningState::HashingBody {
126                        parsed_headers,
127                        body_hasher,
128                    };
129
130                    // Hash any body data that was in the buffer
131                    if !remaining_body.is_empty()
132                        && let SigningState::HashingBody { body_hasher, .. } = &mut self.state
133                    {
134                        body_hasher.write(&remaining_body);
135                    }
136                }
137            }
138            SigningState::HashingBody { body_hasher, .. } => {
139                body_hasher.write(chunk);
140            }
141            SigningState::Done => {
142                // Ignore writes after finish
143            }
144        }
145    }
146
147    /// Finalize the signature.
148    ///
149    /// Consumes the stream and returns the DKIM signature. The current system
150    /// time is used for the `t=` timestamp.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if:
155    /// - No headers matching the signer's header list were found
156    /// - The cryptographic signing operation fails
157    /// - `finish()` was already called
158    pub fn finish(mut self) -> crate::Result<Signature>
159    where
160        <<T as SigningKey>::Hasher as HashImpl>::Context: HashContext,
161    {
162        let now = SystemTime::now()
163            .duration_since(SystemTime::UNIX_EPOCH)
164            .map(|d| d.as_secs())
165            .unwrap_or(0);
166
167        match std::mem::replace(&mut self.state, SigningState::Done) {
168            SigningState::ReadingHeaders { buffer } => {
169                // Never saw body boundary - check if we have any headers at all
170                // This handles the edge case of a message with no body
171                let (header_section, body_section) =
172                    if let Some(boundary_pos) = find_header_boundary(&buffer) {
173                        (&buffer[..boundary_pos - 4], &buffer[boundary_pos..])
174                    } else {
175                        // No boundary found - treat entire buffer as headers with empty body
176                        (buffer.as_slice(), &[][..])
177                    };
178
179                let parsed_headers = parse_headers(header_section);
180
181                // Hash the body (may be empty)
182                let mut body_hasher = BodyHasher::new(
183                    <T::Hasher as HashImpl>::hasher(),
184                    self.template.cb,
185                    if self.template.l > 0 { u64::MAX } else { 0 },
186                );
187                body_hasher.write(body_section);
188                let (hasher, body_len) = body_hasher.finish();
189                let body_hash = hasher.complete();
190
191                self.finish_with_parsed_data(parsed_headers, body_hash, body_len, now)
192            }
193            SigningState::HashingBody {
194                parsed_headers,
195                body_hasher,
196            } => {
197                let (hasher, body_len) = body_hasher.finish();
198                let body_hash = hasher.complete();
199                self.finish_with_parsed_data(parsed_headers, body_hash, body_len, now)
200            }
201            SigningState::Done => Err(Error::NoHeadersFound),
202        }
203    }
204
205    fn finish_with_parsed_data(
206        &self,
207        parsed_headers: Vec<(Vec<u8>, Vec<u8>)>,
208        body_hash: crate::common::crypto::HashOutput,
209        body_len: u64,
210        now: u64,
211    ) -> crate::Result<Signature> {
212        // Filter headers to only those in template.h and build signed_headers list
213        let mut headers = Vec::with_capacity(self.template.h.len());
214        let mut found_headers = vec![false; self.template.h.len()];
215        let mut signed_headers = Vec::with_capacity(self.template.h.len());
216
217        for (name, value) in &parsed_headers {
218            if let Some(pos) = self
219                .template
220                .h
221                .iter()
222                .position(|header| name.eq_ignore_ascii_case(header.as_bytes()))
223            {
224                headers.push((name.as_slice(), value.as_slice()));
225                found_headers[pos] = true;
226                signed_headers.push(std::str::from_utf8(name).unwrap_or_default().to_string());
227            }
228        }
229
230        if signed_headers.is_empty() {
231            return Err(Error::NoHeadersFound);
232        }
233
234        // Add any missing headers (in reverse order as per DKIM spec)
235        signed_headers.reverse();
236        for (header, found) in self.template.h.iter().zip(found_headers) {
237            if !found {
238                signed_headers.push(header.to_string());
239            }
240        }
241
242        // Build canonical headers
243        let canonical_headers = self.template.ch.canonical_headers(headers);
244
245        // Create Signature
246        let mut signature = self.template.clone();
247        signature.bh = base64_encode(body_hash.as_ref())?;
248        signature.t = now;
249        signature.x = if signature.x > 0 {
250            now + signature.x
251        } else {
252            0
253        };
254        signature.h = signed_headers;
255        if signature.l > 0 {
256            signature.l = body_len;
257        }
258
259        // Sign
260        let b = self.key.sign(SignableMessage {
261            headers: canonical_headers,
262            signature: &signature,
263        })?;
264
265        // Encode
266        signature.b = base64_encode(&b)?;
267
268        Ok(signature)
269    }
270}
271
/// Locates the first header/body separator (`\r\n\r\n`) in `data`.
///
/// Returns the index of the first byte *after* the separator (the start of the
/// body), or `None` when no separator is present.
fn find_header_boundary(data: &[u8]) -> Option<usize> {
    const SEPARATOR: &[u8] = b"\r\n\r\n";

    let start = data
        .windows(SEPARATOR.len())
        .position(|window| window == SEPARATOR)?;
    Some(start + SEPARATOR.len())
}
278
279/// Parse raw header bytes into (name, value) pairs
280/// Uses the same HeaderIterator as the regular sign() method to ensure consistency
281fn parse_headers(header_section: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
282    // Add a fake body separator so HeaderIterator works correctly
283    let mut with_separator = header_section.to_vec();
284    with_separator.extend_from_slice(b"\r\n");
285
286    HeaderIterator::new(&with_separator)
287        .map(|(name, value)| (name.to_vec(), value.to_vec()))
288        .collect()
289}
290
#[cfg(test)]
#[allow(unused)]
mod test {
    use crate::{
        common::crypto::{RsaKey, Sha256},
        dkim::{Canonicalization, DkimSigner},
    };

    use rustls_pki_types::{PrivateKeyDer, PrivatePkcs1KeyDer, pem::PemObject};

    const RSA_PRIVATE_KEY: &str = include_str!("../../resources/rsa-private.pem");

    /// How many times a `b=` comparison re-signs before giving up on producing a
    /// buffered and a streamed signature stamped with the same `t=` second.
    const SAME_SECOND_ATTEMPTS: usize = 5;

    /// Loads the RSA key shared by every test in this module.
    fn rsa_key() -> RsaKey<Sha256> {
        RsaKey::<Sha256>::from_key_der(PrivateKeyDer::Pkcs1(
            PrivatePkcs1KeyDer::from_pem_slice(RSA_PRIVATE_KEY.as_bytes()).unwrap(),
        ))
        .unwrap()
    }

    #[test]
    fn streaming_sign_matches_regular_sign() {
        // Test that sign_streaming() produces the same signature as sign()
        let message = concat!(
            "From: bill@example.com\r\n",
            "To: jdoe@example.com\r\n",
            "Subject: TPS Report\r\n",
            "\r\n",
            "I'm going to need those TPS reports ASAP. ",
            "So, if you could do that, that'd be great.\r\n"
        );

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "To", "Subject"]);

        // `sign()` and `finish()` each read the system clock. If a second boundary
        // falls between them, `t=` (and therefore `b=`) legitimately differs, so
        // retry until both signatures share a timestamp.
        let (sig1, sig2) = (0..SAME_SECOND_ATTEMPTS)
            .map(|_| {
                let regular = signer.sign(message.as_bytes()).unwrap();
                let mut stream = signer.sign_streaming();
                stream.write(message.as_bytes());
                (regular, stream.finish().unwrap())
            })
            .find(|(regular, streamed)| regular.t == streamed.t)
            .expect("system clock advanced between every signing attempt");

        // Body hashes should match
        assert_eq!(sig1.bh, sig2.bh, "Body hashes should match");
        // Signed headers should match
        assert_eq!(sig1.h, sig2.h, "Signed headers should match");
        // Same key, content, body hash and timestamp => same signature
        assert_eq!(sig1.b, sig2.b, "Signatures should match");
    }

    #[test]
    fn streaming_sign_multiple_chunks() {
        let header = "From: bill@example.com\r\nTo: jdoe@example.com\r\nSubject: Test\r\n\r\n";
        let body = "Hello World! This is the body.\r\n";

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "To", "Subject"]);

        // Reference: single chunk
        let full_message = format!("{}{}", header, body);
        let reference_sig = signer.sign(full_message.as_bytes()).unwrap();

        // Streaming: multiple chunks
        let mut stream = signer.sign_streaming();
        stream.write(header.as_bytes());
        stream.write(body.as_bytes());
        let streamed_sig = stream.finish().unwrap();

        assert_eq!(
            reference_sig.bh, streamed_sig.bh,
            "Body hashes should match"
        );
    }

    #[test]
    fn streaming_sign_chunked_body() {
        let message = concat!(
            "From: test@example.com\r\n",
            "Subject: Chunked Test\r\n",
            "\r\n",
            "Line 1\r\n",
            "Line 2\r\n",
            "Line 3\r\n",
        );

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        // Reference
        let reference_sig = signer.sign(message.as_bytes()).unwrap();

        // Chunked at various sizes
        for chunk_size in [1, 2, 5, 10, 20] {
            let mut stream = signer.sign_streaming();
            for chunk in message.as_bytes().chunks(chunk_size) {
                stream.write(chunk);
            }
            let streamed_sig = stream.finish().unwrap();

            assert_eq!(
                reference_sig.bh, streamed_sig.bh,
                "Body hash mismatch at chunk_size={}",
                chunk_size
            );
        }
    }

    #[test]
    fn streaming_sign_split_header_boundary() {
        // Test where \r\n\r\n is split across chunks
        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        // Reference
        let message = "From: test@example.com\r\nSubject: Test\r\n\r\nBody";
        let reference_sig = signer.sign(message.as_bytes()).unwrap();

        // Split right at the boundary
        let mut stream = signer.sign_streaming();
        stream.write(b"From: test@example.com\r\n");
        stream.write(b"Subject: Test\r\n");
        stream.write(b"\r\n"); // The second \r\n completing the boundary
        stream.write(b"Body");
        let streamed_sig = stream.finish().unwrap();

        assert_eq!(reference_sig.bh, streamed_sig.bh);
    }

    #[test]
    fn streaming_sign_boundary_split_inside_separator() {
        // Split the `\r\n\r\n` separator across two writes at every offset, with an
        // empty write in between, so a boundary that straddles chunks is always found.
        let headers = "From: test@example.com\r\nSubject: Split\r\n\r\n";
        let body = "Body\r\n";
        let message = format!("{headers}{body}");

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        let reference_sig = signer.sign(message.as_bytes()).unwrap();

        let bytes = message.as_bytes();
        let separator_end = headers.len();
        for split in separator_end - 4..=separator_end {
            let mut stream = signer.sign_streaming();
            stream.write(&bytes[..split]);
            stream.write(&[]);
            stream.write(&bytes[split..]);
            let streamed_sig = stream.finish().unwrap();

            assert_eq!(
                reference_sig.bh, streamed_sig.bh,
                "Body hash mismatch when splitting at byte {}",
                split
            );
        }
    }

    #[test]
    fn streaming_sign_empty_body() {
        let message = "From: test@example.com\r\nSubject: Empty\r\n\r\n";

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        let reference_sig = signer.sign(message.as_bytes()).unwrap();

        let mut stream = signer.sign_streaming();
        stream.write(message.as_bytes());
        let streamed_sig = stream.finish().unwrap();

        assert_eq!(reference_sig.bh, streamed_sig.bh);
    }

    #[test]
    fn streaming_sign_simple_canonicalization() {
        let message = concat!(
            "From: test@example.com\r\n",
            "Subject: Simple Canon Test\r\n",
            "\r\n",
            "Body with   spaces\r\n",
        );

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"])
            .header_canonicalization(Canonicalization::Simple)
            .body_canonicalization(Canonicalization::Simple);

        // Retry until both signatures share `t=`; see streaming_sign_matches_regular_sign.
        let (reference_sig, streamed_sig) = (0..SAME_SECOND_ATTEMPTS)
            .map(|_| {
                let regular = signer.sign(message.as_bytes()).unwrap();
                let mut stream = signer.sign_streaming();
                stream.write(message.as_bytes());
                (regular, stream.finish().unwrap())
            })
            .find(|(regular, streamed)| regular.t == streamed.t)
            .expect("system clock advanced between every signing attempt");

        assert_eq!(reference_sig.bh, streamed_sig.bh);
        assert_eq!(reference_sig.b, streamed_sig.b);
    }

    #[test]
    fn streaming_sign_folded_headers() {
        // Test with folded (multi-line) headers
        let message = concat!(
            "From: test@example.com\r\n",
            "Subject: This is a very long subject line that\r\n",
            " continues on the next line\r\n",
            "\r\n",
            "Body\r\n",
        );

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]);

        let reference_sig = signer.sign(message.as_bytes()).unwrap();

        let mut stream = signer.sign_streaming();
        stream.write(message.as_bytes());
        let streamed_sig = stream.finish().unwrap();

        assert_eq!(reference_sig.bh, streamed_sig.bh);
    }

    #[test]
    fn streaming_sign_no_matching_headers_error() {
        let message = "X-Custom: value\r\n\r\nBody\r\n";

        let signer = DkimSigner::from_key(rsa_key())
            .domain("example.com")
            .selector("default")
            .headers(["From", "Subject"]); // These headers don't exist in message

        let mut stream = signer.sign_streaming();
        stream.write(message.as_bytes());
        let result = stream.finish();

        assert!(matches!(result, Err(crate::Error::NoHeadersFound)));
    }
}