pg_core/client/web/
stream.rs

1//! Streaming mode.
2
3use super::aesgcm::{decrypt, encrypt, get_key};
4
5use crate::artifacts::{PublicKey, SigningKeyExt, UserSecretKey, VerifyingKey};
6use crate::client::*;
7use crate::error::Error;
8use crate::identity::{EncryptionPolicy, Policy};
9use crate::util::preamble_checked;
10use ibs::gg::{Identity, Signature, Signer, Verifier, SIG_BYTES};
11
12use futures::{Sink, SinkExt, Stream, StreamExt};
13use ibe::kem::cgw_kv::CGWKV;
14use js_sys::Uint8Array;
15use rand::{CryptoRng, RngCore};
16use wasm_bindgen::{JsCast, JsValue};
17
18use alloc::string::ToString;
19use alloc::vec::Vec;
20
21/// Configures an [`Sealer`] to process a payload stream.
22#[derive(Debug)]
23pub struct StreamSealerConfig {
24    segment_size: u32,
25    key: [u8; KEY_SIZE],
26    nonce: [u8; STREAM_NONCE_SIZE],
27}
28
/// Configures an [`Unsealer`] to process a payload stream.
#[derive(Debug)]
pub struct StreamUnsealerConfig {
    /// Payload segment size as announced by the (verified) header.
    segment_size: u32,
    /// Bytes already pulled from the input stream while parsing the header that belong to the
    /// payload; they are consumed first when unsealing starts.
    spill: Vec<u8>,
}
35
36impl SealerConfig for StreamSealerConfig {}
37impl UnsealerConfig for StreamUnsealerConfig {}
38impl crate::client::sealed::SealerConfig for StreamSealerConfig {}
39impl crate::client::sealed::UnsealerConfig for StreamUnsealerConfig {}
40
41impl<'r, Rng: RngCore + CryptoRng> Sealer<'r, Rng, StreamSealerConfig> {
42    /// Construct a new [`Sealer`] that can process payloads streamingly.
43    pub fn new(
44        pk: &PublicKey<CGWKV>,
45        policies: &EncryptionPolicy,
46        pub_sign_key: &SigningKeyExt,
47        rng: &'r mut Rng,
48    ) -> Result<Self, Error> {
49        let (header, ss) = Header::new(pk, policies, rng)?;
50
51        let (segment_size, _) = stream_mode_checked(&header)?;
52        let Algorithm::Aes128Gcm(iv) = header.algo;
53
54        let mut key = [0u8; KEY_SIZE];
55        let mut nonce = [0u8; STREAM_NONCE_SIZE];
56
57        key.copy_from_slice(&ss.0[..KEY_SIZE]);
58        nonce.copy_from_slice(&iv.0[..STREAM_NONCE_SIZE]);
59
60        Ok(Sealer {
61            rng,
62            header,
63            pub_sign_key: pub_sign_key.clone(),
64            priv_sign_key: None,
65            config: StreamSealerConfig {
66                segment_size,
67                key,
68                nonce,
69            },
70        })
71    }
72
73    /// Seals payload data from a [`Stream`] of [`JsValue`] to a Sink of [`JsValue`].
74    ///
75    /// # Errors
76    ///
77    /// Make sure the [`JsValue`]s *can* dynamically be cast to [`Uint8Array`],
78    /// otherwise this operation *will* error.
79    pub async fn seal<R, W>(mut self, mut r: R, mut w: W) -> Result<(), Error>
80    where
81        R: Stream<Item = Result<JsValue, JsValue>> + Unpin,
82        W: Sink<JsValue, Error = JsValue> + Unpin,
83    {
84        let size_hint = r.size_hint();
85        let new_hint = (size_hint.0 as u64, size_hint.1.map(|x| x as u64));
86
87        self.header = self.header.with_mode(Mode::Streaming {
88            segment_size: self.config.segment_size,
89            size_hint: new_hint,
90        });
91
92        w.feed(Uint8Array::from(&PRELUDE[..]).into()).await?;
93        w.feed(Uint8Array::from(&VERSION_V3.to_be_bytes()[..]).into())
94            .await?;
95
96        let header_vec = bincode::serialize(&self.header)?;
97
98        w.feed(Uint8Array::from(&(header_vec.len() as u32).to_be_bytes()[..]).into())
99            .await?;
100
101        w.feed(Uint8Array::from(&header_vec[..]).into()).await?;
102
103        let mut signer = Signer::default().chain(&header_vec);
104        let header_sig = signer.clone().sign(&self.pub_sign_key.key.0, self.rng);
105        let header_sig_ext = SignatureExt {
106            sig: header_sig,
107            pol: self.pub_sign_key.policy.clone(),
108        };
109        let header_sig_bytes = bincode::serialize(&header_sig_ext)?;
110
111        w.feed(Uint8Array::from(&(header_sig_bytes.len() as u32).to_be_bytes()[..]).into())
112            .await?;
113        w.feed(Uint8Array::from(&header_sig_bytes[..]).into())
114            .await?;
115
116        let key = get_key(&self.config.key).await?;
117
118        // Check for a private signing key, otherwise fall back to the public one.
119        let signing_key = self.priv_sign_key.unwrap_or(self.pub_sign_key);
120
121        let pol_bytes = bincode::serialize(&signing_key.policy)?;
122        let pol_len: u32 = pol_bytes.len() as u32;
123
124        if pol_len + POL_SIZE_SIZE as u32 > self.config.segment_size {
125            return Err(Error::ConstraintViolation.into());
126        }
127
128        let buf = Uint8Array::new_with_length(self.config.segment_size + SIG_BYTES as u32);
129
130        buf.set(
131            &Uint8Array::from(&(pol_len as u32).to_be_bytes()[..]).into(),
132            0,
133        );
134        buf.set(
135            &Uint8Array::from(&pol_bytes[..]).into(),
136            POL_SIZE_SIZE as u32,
137        );
138
139        let mut counter = 0u32;
140        let mut buf_tail: u32 = POL_SIZE_SIZE as u32 + pol_len;
141        let mut start: u32 = buf_tail;
142
143        while let Some(Ok(data)) = r.next().await {
144            let mut array: Uint8Array = data.dyn_into()?;
145
146            while array.byte_length() != 0 {
147                let len = array.byte_length();
148                let rem = self.config.segment_size - buf_tail;
149
150                if len < rem {
151                    buf.set(&array, buf_tail);
152                    array = Uint8Array::new_with_length(0);
153                    buf_tail += len;
154                } else {
155                    buf.set(&array.slice(0, rem), buf_tail);
156                    array = array.slice(rem, len);
157                    buf_tail += rem;
158
159                    signer.update(&buf.slice(start, buf_tail).to_vec());
160                    let sig = signer
161                        .clone()
162                        .chain(&counter.to_be_bytes())
163                        .chain(&[0x00])
164                        .sign(&signing_key.key.0, self.rng);
165                    let sig_bytes = bincode::serialize(&sig)?;
166
167                    buf.set(&Uint8Array::from(&sig_bytes[..]).into(), buf_tail);
168
169                    let ct = encrypt(
170                        &key,
171                        &aead_nonce(&self.config.nonce, counter, false),
172                        &Uint8Array::new_with_length(0),
173                        &buf,
174                    )
175                    .await?;
176
177                    w.feed(ct.into()).await?;
178
179                    counter = counter.checked_add(1).ok_or(Error::Symmetric)?;
180                    buf_tail = 0;
181                    start = 0;
182                }
183            }
184        }
185
186        signer.update(&buf.slice(start, buf_tail).to_vec());
187        let sig = signer
188            .chain(&counter.to_be_bytes())
189            .chain(&[0x01])
190            .sign(&signing_key.key.0, self.rng);
191        let sig_bytes = bincode::serialize(&sig)?;
192
193        buf.set(&Uint8Array::from(&sig_bytes[..]).into(), buf_tail);
194        buf_tail += SIG_BYTES as u32;
195
196        let final_ct = encrypt(
197            &key,
198            &aead_nonce(&self.config.nonce, counter, true),
199            &Uint8Array::new_with_length(0),
200            &buf.slice(0, buf_tail),
201        )
202        .await?;
203
204        w.feed(final_ct.into()).await?;
205
206        w.flush().await?;
207        w.close().await?;
208
209        Ok(())
210    }
211}
212
213// Nonce generation as defined in the STREAM construction.
214fn aead_nonce(nonce: &[u8], counter: u32, last_block: bool) -> [u8; IV_SIZE] {
215    let mut iv = [0u8; IV_SIZE];
216
217    iv[..STREAM_NONCE_SIZE].copy_from_slice(nonce);
218    iv[STREAM_NONCE_SIZE..IV_SIZE - 1].copy_from_slice(&counter.to_be_bytes());
219    iv[IV_SIZE - 1] = last_block as u8;
220
221    iv
222}
223
224async fn read_atleast<R>(mut r: R, buf: &mut [u8], spill: &mut Vec<u8>) -> Result<(), Error>
225where
226    R: Stream<Item = Result<JsValue, JsValue>> + Unpin,
227{
228    let buf_len = buf.len();
229    let spill_len = spill.len();
230
231    if buf_len <= spill_len {
232        buf.copy_from_slice(&spill[..buf_len]);
233        spill.drain(..buf_len);
234
235        Ok(())
236    } else {
237        buf[..spill_len].copy_from_slice(&spill);
238        let mut rem = buf_len - spill_len;
239        spill.clear();
240
241        while let Some(Ok(data)) = r.next().await {
242            let arr: Uint8Array = data.dyn_into()?;
243            let len = arr.byte_length();
244
245            if len as usize >= rem {
246                buf[buf_len - rem..].copy_from_slice(&arr.slice(0, rem as u32).to_vec()[..]);
247                spill.extend_from_slice(&arr.slice(rem as u32, len).to_vec()[..]);
248                rem = 0;
249                break;
250            } else {
251                buf[buf_len - rem..buf_len - rem + len as usize].copy_from_slice(&arr.to_vec()[..]);
252                rem -= len as usize;
253            }
254        }
255
256        if rem == 0 {
257            Ok(())
258        } else {
259            Err(Error::FormatViolation("unexpected EOF".to_string()).into())
260        }
261    }
262}
263
264// Note: It might be easier to work with R: ReadableStream.
265
266impl<R> Unsealer<R, StreamUnsealerConfig>
267where
268    R: Stream<Item = Result<JsValue, JsValue>> + Unpin,
269{
270    /// Create a new [`Unsealer`] that starts reading from a [`Stream<Item = Result<Uint8Array, JsValue>>`][Stream].
271    ///
272    /// # Errors
273    ///
274    /// Errors if the bytestream is not a legitimate PostGuard bytestream.
275    /// Also errors if the items (of type [`JsValue`]) cannot be cast into [`Uint8Array`].
276    pub async fn new(mut r: R, vk: &VerifyingKey) -> Result<Self, Error> {
277        let mut spill = Vec::new();
278
279        let mut preamble = [0u8; PREAMBLE_SIZE];
280        read_atleast(&mut r, &mut preamble, &mut spill).await?;
281        let (version, header_len) = preamble_checked(&preamble)?;
282
283        let mut header_raw = vec![0u8; header_len];
284        read_atleast(&mut r, &mut header_raw, &mut spill).await?;
285
286        let mut h_sig_len_bytes = [0u8; SIG_SIZE_SIZE];
287        read_atleast(&mut r, &mut h_sig_len_bytes, &mut spill).await?;
288        let header_sig_len = u32::from_be_bytes(h_sig_len_bytes);
289
290        let mut header_sig_raw = vec![0u8; header_sig_len as usize];
291        read_atleast(&mut r, &mut header_sig_raw, &mut spill).await?;
292        let h_sig_ext: SignatureExt = bincode::deserialize(&header_sig_raw)?;
293
294        let verifier = Verifier::default().chain(&header_raw);
295        let pub_id = h_sig_ext.pol.derive_ibs()?;
296
297        if !verifier.clone().verify(&vk.0, &h_sig_ext.sig, &pub_id) {
298            return Err(Error::IncorrectSignature.into());
299        }
300
301        let header: Header = bincode::deserialize(&header_raw)?;
302        let (segment_size, _) = stream_mode_checked(&header)?;
303
304        Ok(Unsealer {
305            version,
306            header,
307            pub_id: h_sig_ext.pol,
308            verifier,
309            vk: vk.clone(),
310            r,
311            config: StreamUnsealerConfig {
312                spill,
313                segment_size,
314            },
315        })
316    }
317
318    /// Unseal into an [`Sink<Uint8Array, Error = JsValue>`][Sink].
319    pub async fn unseal<W>(
320        &mut self,
321        ident: &str,
322        usk: &UserSecretKey<CGWKV>,
323        mut w: W,
324    ) -> Result<VerificationResult, Error>
325    where
326        W: Sink<JsValue, Error = JsValue> + Unpin,
327    {
328        let rec_info = self
329            .header
330            .recipients
331            .get(ident)
332            .ok_or_else(|| Error::UnknownIdentifier(ident.to_string()))?;
333
334        let ss = rec_info.decaps(usk)?;
335        let key = get_key(&ss.0[..KEY_SIZE]).await?;
336
337        let Algorithm::Aes128Gcm(iv) = self.header.algo;
338        let nonce = &iv.0[..STREAM_NONCE_SIZE];
339
340        let segment_size: u32 = self.config.segment_size + (SIG_BYTES + TAG_SIZE) as u32;
341
342        let buf = Uint8Array::new_with_length(segment_size);
343        let mut counter = 0u32;
344        let mut buf_tail = 0;
345        let mut pol_id: Option<(Policy, Identity)> = None;
346
347        fn extract_policy(
348            plain: Uint8Array,
349        ) -> Result<(Option<(Policy, Identity)>, Uint8Array), Error> {
350            let pol_len =
351                u32::from_be_bytes(plain.slice(0, POL_SIZE_SIZE as u32).to_vec()[..].try_into()?);
352            let pol_bytes = plain.slice(POL_SIZE_SIZE as u32, POL_SIZE_SIZE as u32 + pol_len);
353            let pol: Policy = bincode::deserialize(&pol_bytes.to_vec())?;
354            let id = pol.derive_ibs()?;
355            let new_plain = plain.slice(POL_SIZE_SIZE as u32 + pol_len, plain.byte_length());
356
357            Ok((Some((pol, id)), new_plain))
358        }
359
360        loop {
361            // First exhaust the spillage, then the rest of the stream.
362            let mut array: Uint8Array = if !self.config.spill.is_empty() {
363                let arr = Uint8Array::from(&self.config.spill[..]);
364                self.config.spill.clear();
365                arr
366            } else if let Some(Ok(data)) = self.r.next().await {
367                data.dyn_into()?
368            } else {
369                break;
370            };
371
372            while array.byte_length() != 0 {
373                let len = array.byte_length();
374                let rem = buf.byte_length() - buf_tail;
375
376                if len < rem {
377                    buf.set(&array, buf_tail);
378                    array = Uint8Array::new_with_length(0);
379                    buf_tail += len;
380                } else {
381                    buf.set(&array.slice(0, rem), buf_tail);
382                    array = array.slice(rem, len);
383
384                    let mut plain = decrypt(
385                        &key,
386                        &aead_nonce(nonce, counter, false),
387                        &Uint8Array::new_with_length(0),
388                        &buf,
389                    )
390                    .await?;
391
392                    if counter == 0 {
393                        (pol_id, plain) = extract_policy(plain)?;
394                    }
395
396                    debug_assert!(plain.byte_length() > SIG_BYTES as u32);
397
398                    let m = plain.slice(0, plain.byte_length() - SIG_BYTES as u32);
399                    let sig =
400                        plain.slice(plain.byte_length() - SIG_BYTES as u32, plain.byte_length());
401                    let sig: Signature = bincode::deserialize(&sig.to_vec())?;
402
403                    self.verifier.update(&m.to_vec());
404
405                    if !self
406                        .verifier
407                        .clone()
408                        .chain(&counter.to_be_bytes())
409                        .chain(&[0x00])
410                        .verify(&self.vk.0, &sig, &pol_id.as_ref().unwrap().1)
411                    {
412                        return Err(Error::IncorrectSignature.into());
413                    }
414
415                    w.feed(m.into()).await?;
416
417                    counter = counter.checked_add(1).ok_or(Error::Symmetric)?;
418                    buf_tail = 0;
419                }
420            }
421        }
422
423        let mut final_plain = decrypt(
424            &key,
425            &aead_nonce(nonce, counter, true),
426            &Uint8Array::new_with_length(0),
427            &buf.slice(0, buf_tail),
428        )
429        .await?;
430
431        if counter == 0 {
432            (pol_id, final_plain) = extract_policy(final_plain)?;
433        }
434
435        debug_assert!(final_plain.byte_length() > SIG_BYTES as u32);
436        let m = final_plain.slice(0, final_plain.byte_length() - SIG_BYTES as u32);
437        let sig = final_plain.slice(
438            final_plain.byte_length() - SIG_BYTES as u32,
439            final_plain.byte_length(),
440        );
441
442        let sig: Signature = bincode::deserialize(&sig.to_vec())?;
443        self.verifier.update(&m.to_vec());
444        if !self
445            .verifier
446            .clone()
447            .chain(&counter.to_be_bytes())
448            .chain(&[0x01])
449            .verify(&self.vk.0, &sig, &pol_id.as_ref().unwrap().1)
450        {
451            return Err(Error::IncorrectSignature.into());
452        }
453
454        w.feed(m.into()).await?;
455
456        w.flush().await?;
457        w.close().await?;
458
459        let private_id = pol_id.unwrap().0;
460        let private = if self.pub_id == private_id {
461            None
462        } else {
463            Some(private_id)
464        };
465
466        Ok(VerificationResult {
467            public: self.pub_id.clone(),
468            private,
469        })
470    }
471}