pg_core/client/rust/
stream.rs

1//! Streaming mode.
2
3use alloc::string::ToString;
4
5use crate::artifacts::{PublicKey, SigningKeyExt, UserSecretKey, VerifyingKey};
6use crate::client::*;
7use crate::error::Error;
8use crate::identity::{EncryptionPolicy, Policy};
9use ibe::kem::cgw_kv::CGWKV;
10use ibs::gg::{Identity, Signature, Signer, Verifier, SIG_BYTES};
11
12use aead::stream::{DecryptorBE32, EncryptorBE32};
13use aead::KeyInit;
14use aes_gcm::Aes128Gcm;
15use alloc::vec::Vec;
16use futures::io::{AsyncRead, AsyncWrite};
17use futures::io::{AsyncReadExt, AsyncWriteExt};
18use futures::TryFutureExt;
19use rand::{CryptoRng, RngCore};
20
21/// Configures an [`Sealer`] to process a payload stream.
22#[derive(Debug)]
23pub struct SealerStreamConfig {
24    /// Segment size.
25    segment_size: u32,
26    /// AEAD key.
27    key: [u8; KEY_SIZE],
28    /// AEAD nonce.
29    nonce: [u8; STREAM_NONCE_SIZE],
30}
31
32/// Configures an [`Unsealer`] to process a payload stream.
33#[derive(Debug)]
34pub struct UnsealerStreamConfig {
35    segment_size: u32,
36}
37
38impl SealerConfig for SealerStreamConfig {}
39impl UnsealerConfig for UnsealerStreamConfig {}
40impl crate::client::sealed::SealerConfig for SealerStreamConfig {}
41impl crate::client::sealed::UnsealerConfig for UnsealerStreamConfig {}
42
43impl<'r, Rng: RngCore + CryptoRng> Sealer<'r, Rng, SealerStreamConfig> {
44    /// Construct a new [`Sealer`] that can process streaming payloads.
45    pub fn new(
46        pk: &PublicKey<CGWKV>,
47        policies: &EncryptionPolicy,
48        pub_sign_key: &SigningKeyExt,
49        rng: &'r mut Rng,
50    ) -> Result<Self, Error> {
51        let (header, ss) = Header::new(pk, policies, rng)?;
52
53        let (segment_size, _) = stream_mode_checked(&header)?;
54        let Algorithm::Aes128Gcm(iv) = header.algo;
55
56        let mut key = [0u8; KEY_SIZE];
57        let mut nonce = [0u8; STREAM_NONCE_SIZE];
58
59        key.copy_from_slice(&ss.0[..KEY_SIZE]);
60        nonce.copy_from_slice(&iv.0[..STREAM_NONCE_SIZE]);
61
62        Ok(Sealer {
63            rng,
64            header,
65            pub_sign_key: pub_sign_key.clone(),
66            priv_sign_key: None,
67            config: SealerStreamConfig {
68                segment_size,
69                key,
70                nonce,
71            },
72        })
73    }
74
75    /// Optional: Add a size hint.
76    ///
77    /// This can help the receiver save some reallocations.
78    pub fn with_size_hint(mut self, size_hint: (u64, Option<u64>)) -> Self {
79        self.header.mode = Mode::Streaming {
80            segment_size: self.config.segment_size,
81            size_hint,
82        };
83
84        self
85    }
86
87    /// Seals payload data from an [`AsyncRead`] into an [`AsyncWrite`].
88    pub async fn seal<R, W>(self, mut r: R, mut w: W) -> Result<(), Error>
89    where
90        R: AsyncRead + Unpin,
91        W: AsyncWrite + Unpin,
92    {
93        w.write_all(&PRELUDE).await?;
94        w.write_all(&VERSION_V3.to_be_bytes()).await?;
95
96        let header_vec = bincode::serialize(&self.header)?;
97        w.write_all(&u32::try_from(header_vec.len())?.to_be_bytes())
98            .await?;
99        w.write_all(&header_vec).await?;
100
101        let mut signer = Signer::default().chain(&header_vec);
102        let header_sig = signer.clone().sign(&self.pub_sign_key.key.0, self.rng);
103        let header_sig_ext = SignatureExt {
104            sig: header_sig,
105            pol: self.pub_sign_key.policy.clone(),
106        };
107        let header_sig_bytes = bincode::serialize(&header_sig_ext)?;
108
109        w.write_all(&u32::try_from(header_sig_bytes.len())?.to_be_bytes())
110            .await?;
111        w.write_all(&header_sig_bytes).await?;
112
113        let aead = Aes128Gcm::new_from_slice(&self.config.key)?;
114        let mut enc = EncryptorBE32::from_aead(aead, &self.config.nonce.into());
115
116        // Check for a private signing key, otherwise fall back to the public one.
117        let signing_key = self.priv_sign_key.unwrap_or(self.pub_sign_key);
118
119        let pol_bytes = bincode::serialize(&signing_key.policy)?;
120        let pol_len = pol_bytes.len();
121
122        if pol_len + POL_SIZE_SIZE > self.config.segment_size as usize {
123            return Err(Error::ConstraintViolation);
124        }
125
126        let mut buf = vec![0; self.config.segment_size as usize + TAG_SIZE];
127
128        buf[..POL_SIZE_SIZE].copy_from_slice(&u32::try_from(pol_len)?.to_be_bytes());
129        buf[POL_SIZE_SIZE..POL_SIZE_SIZE + pol_len].copy_from_slice(&pol_bytes);
130
131        let mut buf_tail = POL_SIZE_SIZE + pol_len;
132        let mut start = buf_tail;
133
134        // First segment: DEM.K (pol_len || pol || m_0 || sig_0 )
135        // Other segments: DEM.K (m_i || sig_0)
136
137        let mut counter: u32 = 0;
138
139        loop {
140            let read = r
141                .read(&mut buf[buf_tail..self.config.segment_size as usize])
142                .await?;
143            buf_tail += read;
144
145            if buf_tail == self.config.segment_size as usize {
146                buf.truncate(buf_tail);
147
148                signer.update(&buf[start..]);
149                let sig = signer
150                    .clone()
151                    .chain(&counter.to_be_bytes())
152                    .chain(&[0x00])
153                    .sign(&signing_key.key.0, self.rng);
154                bincode::serialize_into(&mut buf, &sig)?;
155
156                enc.encrypt_next_in_place(b"", &mut buf)?;
157
158                w.write_all(&buf).await?;
159
160                buf_tail = 0;
161                start = 0;
162                counter = counter.checked_add(1).unwrap(); // cannot fail, otherwise
163                                                           // encrypt_next_in_place would have
164                                                           // failed too.                                                // encrypt_next_in_place not failing
165            } else if read == 0 {
166                buf.truncate(buf_tail);
167
168                signer.update(&buf[start..]);
169                let sig_final = signer
170                    .chain(&counter.to_be_bytes())
171                    .chain(&[0x01])
172                    .sign(&signing_key.key.0, self.rng);
173                bincode::serialize_into(&mut buf, &sig_final)?;
174
175                enc.encrypt_last_in_place(b"", &mut buf)?;
176
177                w.write_all(&buf).await?;
178                break;
179            }
180        }
181
182        w.flush().await?;
183        w.close().await?;
184
185        Ok(())
186    }
187}
188
189impl<R> Unsealer<R, UnsealerStreamConfig>
190where
191    R: AsyncRead + Unpin,
192{
193    /// Create a new [`Unsealer`] that starts reading from an [`AsyncRead`].
194    ///
195    /// Errors if the bytestream is not a legitimate PostGuard bytestream.
196    pub async fn new(mut r: R, pk: &VerifyingKey) -> Result<Self, Error> {
197        let mut preamble = [0u8; PREAMBLE_SIZE];
198        r.read_exact(&mut preamble)
199            .map_err(|_e| Error::NotPostGuard)
200            .await?;
201
202        let (version, header_len) = preamble_checked(&preamble)?;
203        let mut header_raw = Vec::with_capacity(header_len);
204
205        // Limit reader to not read past header
206        let mut r = r.take(header_len as u64);
207
208        r.read_to_end(&mut header_raw)
209            .map_err(|_e| Error::ConstraintViolation)
210            .await?;
211
212        let mut r = r.into_inner();
213
214        let mut header_sig_len_bytes = [0u8; SIG_SIZE_SIZE];
215        r.read_exact(&mut header_sig_len_bytes)
216            .map_err(|_e| Error::FormatViolation("no header signature length".to_string()))
217            .await?;
218        let header_sig_len = u32::from_be_bytes(header_sig_len_bytes);
219
220        let mut header_sig_raw = Vec::with_capacity(header_sig_len as usize);
221        let mut r = r.take(header_sig_len as u64);
222
223        r.read_to_end(&mut header_sig_raw).await?;
224
225        let h_sig_ext: SignatureExt = bincode::deserialize(&header_sig_raw)?;
226
227        let verifier = Verifier::default().chain(&header_raw);
228        let pub_id = h_sig_ext.pol.derive_ibs()?;
229
230        if !verifier.clone().verify(&pk.0, &h_sig_ext.sig, &pub_id) {
231            return Err(Error::IncorrectSignature);
232        }
233
234        let header: Header = bincode::deserialize(&header_raw)?;
235        let (segment_size, _) = stream_mode_checked(&header)?;
236
237        Ok(Unsealer {
238            version,
239            header,
240            pub_id: h_sig_ext.pol,
241            config: UnsealerStreamConfig { segment_size },
242            r: r.into_inner(), // This (new) reader is locked to the payload.
243            verifier,
244            vk: pk.clone(),
245        })
246    }
247
248    /// Unseal the remaining data (which is now only payload) into an [`AsyncWrite`].
249    pub async fn unseal<W: AsyncWrite + Unpin>(
250        mut self,
251        ident: &str,
252        usk: &UserSecretKey<CGWKV>,
253        mut w: W,
254    ) -> Result<VerificationResult, Error> {
255        let rec_info = self
256            .header
257            .recipients
258            .get(ident)
259            .ok_or_else(|| Error::UnknownIdentifier(ident.to_string()))?;
260
261        let ss = rec_info.decaps(usk)?;
262        let key = &ss.0[..KEY_SIZE];
263        let aead = Aes128Gcm::new_from_slice(key)?;
264
265        let Algorithm::Aes128Gcm(iv) = self.header.algo;
266        let nonce = &iv.0[..STREAM_NONCE_SIZE];
267
268        let mut dec = DecryptorBE32::from_aead(aead, nonce.into());
269
270        let bufsize: usize = self.config.segment_size as usize + SIG_BYTES + TAG_SIZE;
271        let mut buf = vec![0u8; bufsize];
272        let mut buf_tail = 0;
273        let mut counter: u32 = 0;
274        let mut pol_id: Option<(Policy, Identity)> = None;
275
276        fn extract_policy(buf: &mut Vec<u8>) -> Result<Option<(Policy, Identity)>, Error> {
277            let pol_len = u32::from_be_bytes(buf[..POL_SIZE_SIZE].try_into()?) as usize;
278            let pol_bytes = &buf[POL_SIZE_SIZE..POL_SIZE_SIZE + pol_len];
279            let pol: Policy = bincode::deserialize(pol_bytes)?;
280            let id = pol.derive_ibs()?;
281
282            buf.drain(..POL_SIZE_SIZE + pol_len);
283
284            Ok(Some((pol, id)))
285        }
286
287        fn verify_segment<'a>(
288            seg: &'a [u8],
289            verifier: &mut Verifier,
290            vk: &VerifyingKey,
291            id: &Identity,
292            counter: u32,
293            is_last: bool,
294        ) -> Result<&'a [u8], Error> {
295            debug_assert!(seg.len() > SIG_BYTES);
296
297            let (m, sig_bytes) = seg.split_at(seg.len() - SIG_BYTES);
298            let sig: Signature = bincode::deserialize(sig_bytes)?;
299            verifier.update(m);
300
301            if !verifier
302                .clone()
303                .chain(&counter.to_be_bytes())
304                .chain(&[is_last as u8])
305                .verify(&vk.0, &sig, id)
306            {
307                return Err(Error::IncorrectSignature);
308            }
309
310            Ok(m)
311        }
312
313        loop {
314            let read = self.r.read(&mut buf[buf_tail..bufsize]).await?;
315            buf_tail += read;
316
317            if buf_tail == bufsize {
318                dec.decrypt_next_in_place(b"", &mut buf)?;
319
320                if counter == 0 {
321                    pol_id = extract_policy(&mut buf)?;
322                }
323
324                let m = verify_segment(
325                    &buf,
326                    &mut self.verifier,
327                    &self.vk,
328                    &pol_id.as_ref().unwrap().1,
329                    counter,
330                    false,
331                )?;
332
333                w.write_all(m).await?;
334
335                buf_tail = 0;
336                buf.resize(bufsize, 0);
337                counter += 1;
338            } else if read == 0 {
339                buf.truncate(buf_tail);
340                dec.decrypt_last_in_place(b"", &mut buf)?;
341
342                if counter == 0 {
343                    pol_id = extract_policy(&mut buf)?;
344                }
345
346                let m = verify_segment(
347                    &buf,
348                    &mut self.verifier,
349                    &self.vk,
350                    &pol_id.as_ref().unwrap().1,
351                    counter,
352                    true,
353                )?;
354
355                w.write_all(m).await?;
356
357                break;
358            }
359        }
360
361        w.close().await?;
362
363        let private_id = pol_id.unwrap().0;
364        let private = if self.pub_id == private_id {
365            None
366        } else {
367            Some(private_id)
368        };
369
370        Ok(VerificationResult {
371            public: self.pub_id,
372            private,
373        })
374    }
375}
376
377#[cfg(test)]
378mod tests {
379    use super::{Sealer, SealerStreamConfig, Unsealer, UnsealerStreamConfig};
380    use crate::client::VerificationResult;
381    use crate::error::Error;
382    use crate::test::TestSetup;
383    use crate::{PREAMBLE_SIZE, SYMMETRIC_CRYPTO_DEFAULT_CHUNK, TAG_SIZE};
384    use alloc::string::String;
385    use alloc::vec::Vec;
386    use futures::{executor::block_on, io::AllowStdIo};
387    use rand::{thread_rng, Rng, RngCore};
388    use std::io::Cursor;
389    use tokio::io::AsyncReadExt;
390
391    const LENGTHS: &[u32] = &[
392        1,
393        512,
394        SYMMETRIC_CRYPTO_DEFAULT_CHUNK - 3,
395        SYMMETRIC_CRYPTO_DEFAULT_CHUNK,
396        SYMMETRIC_CRYPTO_DEFAULT_CHUNK + 3,
397        3 * SYMMETRIC_CRYPTO_DEFAULT_CHUNK,
398        3 * SYMMETRIC_CRYPTO_DEFAULT_CHUNK + 16,
399        3 * SYMMETRIC_CRYPTO_DEFAULT_CHUNK - 17,
400    ];
401
402    fn seal_helper(setup: &TestSetup, plain: &[u8]) -> Vec<u8> {
403        let mut rng = rand::thread_rng();
404
405        let mut input = AllowStdIo::new(Cursor::new(plain));
406        let mut output = AllowStdIo::new(Vec::new());
407
408        let signing_key = &setup.signing_keys[0];
409
410        block_on(async {
411            Sealer::<_, SealerStreamConfig>::new(
412                &setup.ibe_pk,
413                &setup.policy,
414                &signing_key,
415                &mut rng,
416            )
417            .unwrap()
418            .seal(&mut input, &mut output)
419            .await
420            .unwrap();
421        });
422
423        output.into_inner()
424    }
425
426    fn unseal_helper(setup: &TestSetup, ct: &[u8]) -> (Vec<u8>, VerificationResult) {
427        let mut input = AllowStdIo::new(Cursor::new(ct));
428        let mut output = AllowStdIo::new(Vec::new());
429
430        // sometimes decrypt as Bob, sometimes decrypt as Charlie
431        let (id, usk_id) = if thread_rng().gen::<bool>() {
432            ("Bob", setup.usks[2].clone())
433        } else {
434            ("Charlie", setup.usks[3].clone())
435        };
436
437        let vr = block_on(async {
438            let unsealer = Unsealer::<_, UnsealerStreamConfig>::new(&mut input, &setup.ibs_pk)
439                .await
440                .unwrap();
441
442            // Normally, a user would need to retrieve a usk here via the PKG,
443            // but in this case we own the master key pair.
444            unsealer.unseal(id, &usk_id, &mut output).await.unwrap()
445        });
446
447        (output.into_inner(), vr)
448    }
449
450    fn seal_and_unseal(setup: &TestSetup, plain: Vec<u8>) {
451        let ct = seal_helper(setup, &plain);
452        let (plain2, vr) = unseal_helper(setup, &ct);
453
454        assert_eq!(&plain, &plain2);
455        assert_eq!(&vr.public, &setup.signing_keys[0].policy);
456        assert_eq!(vr.private, None);
457    }
458
459    fn rand_vec(length: usize) -> Vec<u8> {
460        let mut vec = vec![0u8; length];
461        rand::thread_rng().fill_bytes(&mut vec);
462        vec
463    }
464
465    #[test]
466    fn test_reflection_seal_unsealer() {
467        let mut rng = rand::thread_rng();
468        let setup = TestSetup::new(&mut rng);
469
470        for l in LENGTHS {
471            seal_and_unseal(&setup, rand_vec(*l as usize));
472        }
473    }
474
475    #[test]
476    #[should_panic]
477    fn test_corrupt_header() {
478        let mut rng = rand::thread_rng();
479        let setup = TestSetup::new(&mut rng);
480
481        let plain = rand_vec(100);
482        let mut ct = seal_helper(&setup, &plain);
483
484        // Flip a byte that is guaranteed to be in the header.
485        ct[PREAMBLE_SIZE + 2] = !ct[PREAMBLE_SIZE + 2];
486
487        // This should panic, because of the header signature.
488        let _plain2 = unseal_helper(&setup, &ct);
489    }
490
491    #[test]
492    #[should_panic]
493    fn test_corrupt_payload() {
494        let mut rng = rand::thread_rng();
495        let setup = TestSetup::new(&mut rng);
496
497        let plain = rand_vec(100);
498        let mut ct = seal_helper(&setup, &plain);
499
500        // Flip a byte that is guaranteed to be in the encrypted payload.
501        let ct_len = ct.len();
502        ct[ct_len - TAG_SIZE - 5] = !ct[ct_len - TAG_SIZE - 5];
503
504        // This should panic, because of the AEAD.
505        let _plain2 = unseal_helper(&setup, &ct);
506    }
507
508    #[test]
509    #[should_panic]
510    fn test_corrupt_tag() {
511        let mut rng = rand::thread_rng();
512        let setup = TestSetup::new(&mut rng);
513
514        let plain = rand_vec(100);
515        let mut ct = seal_helper(&setup, &plain);
516
517        let len = ct.len();
518        ct[len - 5] = !ct[len - 5];
519
520        // This should panic as well.
521        let _plain2 = unseal_helper(&setup, &ct);
522    }
523
524    #[tokio::test]
525    async fn test_tokio_file() -> Result<(), Error> {
526        use futures::AsyncWriteExt;
527        use tokio::fs::{File, OpenOptions};
528        use tokio_util::compat::TokioAsyncReadCompatExt;
529
530        let mut rng = rand::thread_rng();
531        let setup = TestSetup::new(&mut rng);
532
533        let signing_key = &setup.signing_keys[0];
534
535        let in_name = std::env::temp_dir().join("foo.txt");
536        let out_name = std::env::temp_dir().join("foo.enc");
537        let orig_name = std::env::temp_dir().join("foo2.txt");
538
539        let mut file = OpenOptions::new()
540            .create(true)
541            .write(true)
542            .truncate(true)
543            .open(&in_name)
544            .await?
545            .compat();
546
547        file.write_all(b"SECRET DATA").await?;
548        file.close().await?;
549
550        let mut in_file = File::open(&in_name).await?.compat();
551        let mut out_file = OpenOptions::new()
552            .create(true)
553            .write(true)
554            .truncate(true)
555            .open(&out_name)
556            .await?
557            .compat();
558
559        Sealer::<_, SealerStreamConfig>::new(&setup.ibe_pk, &setup.policy, signing_key, &mut rng)?
560            .seal(&mut in_file, &mut out_file)
561            .await?;
562
563        in_file.close().await?;
564        out_file.close().await?;
565
566        let mut out_file = File::open(&out_name).await?.compat();
567        let mut orig_file = OpenOptions::new()
568            .create(true)
569            .write(true)
570            .truncate(true)
571            .open(&orig_name)
572            .await?
573            .compat();
574
575        let id = "Bob";
576        let usk = &setup.usks[2];
577
578        Unsealer::<_, UnsealerStreamConfig>::new(&mut out_file, &setup.ibs_pk)
579            .await?
580            .unseal(id, usk, &mut orig_file)
581            .await?;
582
583        out_file.close().await?;
584        orig_file.close().await?;
585
586        let mut buf = String::new();
587        File::open(&orig_name)
588            .await?
589            .read_to_string(&mut buf)
590            .await?;
591
592        assert_eq!(buf.as_bytes(), b"SECRET DATA");
593
594        Ok(())
595    }
596
597    #[tokio::test]
598    async fn test_cursor() -> Result<(), Error> {
599        use futures::io::Cursor;
600
601        let mut rng = rand::thread_rng();
602        let setup = TestSetup::new(&mut rng);
603
604        let signing_key = &setup.signing_keys[0];
605
606        let mut input = Cursor::new(b"SECRET DATA");
607        let mut encrypted = Vec::new();
608
609        Sealer::<_, SealerStreamConfig>::new(&setup.ibe_pk, &setup.policy, signing_key, &mut rng)?
610            .seal(&mut input, &mut encrypted)
611            .await?;
612
613        let mut original = Vec::new();
614        let id = "Bob";
615        let usk = &setup.usks[2];
616        Unsealer::<_, UnsealerStreamConfig>::new(&mut Cursor::new(encrypted), &setup.ibs_pk)
617            .await?
618            .unseal(id, usk, &mut original)
619            .await?;
620
621        assert_eq!(input.into_inner().to_vec(), original);
622        Ok(())
623    }
624}