noosphere_core/stream/
mod.rs

1//! Utilities to support producing streams of blocks, as well as converting
2//! streams of blocks to and from CARv1-encoded byte streams
3
4mod block;
5mod car;
6mod ledger;
7mod memo;
8mod walk;
9
10pub use block::*;
11pub use car::*;
12pub use ledger::*;
13pub use memo::*;
14pub use walk::*;
15
16#[cfg(test)]
17mod tests {
18    use anyhow::Result;
19    use cid::Cid;
20    use libipld_core::{codec::Codec, ipld::Ipld, raw::RawCodec};
21    use std::collections::BTreeSet;
22    use ucan::{crypto::KeyMaterial, store::UcanJwtStore};
23
24    use crate::{
25        authority::{generate_ed25519_key, Access},
26        context::{
27            HasMutableSphereContext, HasSphereContext, SphereAuthorityWrite, SphereContentRead,
28            SphereContentWrite, SpherePetnameWrite,
29        },
30        data::{BodyChunkIpld, ContentType, Link, LinkRecord, MemoIpld},
31        helpers::{
32            make_valid_link_record, simulated_sphere_context, touch_all_sphere_blocks,
33            SimulatedHasMutableSphereContext,
34        },
35        stream::{from_car_stream, memo_body_stream, memo_history_stream, to_car_stream},
36        tracing::initialize_tracing,
37        view::{BodyChunkDecoder, Sphere},
38    };
39    use libipld_cbor::DagCborCodec;
40    use noosphere_storage::{BlockStore, MemoryStore, UcanStore};
41    use tokio_stream::StreamExt;
42
43    #[cfg(target_arch = "wasm32")]
44    use wasm_bindgen_test::wasm_bindgen_test;
45
46    #[cfg(target_arch = "wasm32")]
47    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
48
    /// Fixture script replayed by `scaffold_sphere_context_with_history`.
    ///
    /// Each entry describes one batch of changes: the first slice lists the
    /// content slugs to write, and the second slice lists the petnames to
    /// assign in that batch.
    pub const SCAFFOLD_CHANGES: &[(&[&str], &[&str])] = &[
        (&["dogs", "birds"], &["alice", "bob"]),
        (&["cats", "dogs"], &["gordon"]),
        (&["birds"], &["cdata"]),
        (&["cows", "beetles"], &["jordan", "ben"]),
    ];
55
56    pub async fn scaffold_sphere_context_with_history(
57    ) -> Result<(SimulatedHasMutableSphereContext, Vec<Link<MemoIpld>>)> {
58        let (mut sphere_context, _) = simulated_sphere_context(Access::ReadWrite, None).await?;
59        let mut versions = Vec::new();
60        let store = sphere_context.sphere_context().await?.db().clone();
61
62        for (content_change, petname_change) in SCAFFOLD_CHANGES.iter() {
63            for slug in *content_change {
64                sphere_context
65                    .write(
66                        slug,
67                        &ContentType::Subtext,
68                        format!("{} are cool", slug).as_bytes(),
69                        None,
70                    )
71                    .await?;
72            }
73
74            for petname in *petname_change {
75                let (id, record, _) = make_valid_link_record(&mut UcanStore(store.clone())).await?;
76                sphere_context.set_petname(petname, Some(id)).await?;
77                versions.push(sphere_context.save(None).await?);
78                sphere_context.set_petname_record(petname, &record).await?;
79            }
80
81            versions.push(sphere_context.save(None).await?);
82        }
83
84        let additional_device_credential = generate_ed25519_key();
85        let additional_device_did = additional_device_credential.get_did().await?.into();
86        let additional_device_authorization = sphere_context
87            .authorize("otherdevice", &additional_device_did)
88            .await?;
89
90        sphere_context.save(None).await?;
91
92        sphere_context
93            .revoke_authorization(&additional_device_authorization)
94            .await?;
95
96        sphere_context.save(None).await?;
97
98        Ok((sphere_context, versions))
99    }
100
101    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
102    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
103    async fn it_includes_all_link_records_and_proofs_from_the_address_book() -> Result<()> {
104        initialize_tracing(None);
105
106        let (mut sphere_context, _) = simulated_sphere_context(Access::ReadWrite, None).await?;
107        let mut db = sphere_context.sphere_context().await?.db().clone();
108
109        let (foo_did, foo_link_record, foo_link_record_link) =
110            make_valid_link_record(&mut db).await?;
111
112        sphere_context.set_petname("foo", Some(foo_did)).await?;
113        sphere_context.save(None).await?;
114        sphere_context
115            .set_petname_record("foo", &foo_link_record)
116            .await?;
117        let final_version = sphere_context.save(None).await?;
118
119        let mut other_store = MemoryStore::default();
120
121        let stream = memo_body_stream(
122            sphere_context.sphere_context().await?.db().clone(),
123            &final_version,
124            false,
125        );
126
127        tokio::pin!(stream);
128
129        while let Some((cid, block)) = stream.try_next().await? {
130            debug!("Received {cid}");
131            other_store.put_block(&cid, &block).await?;
132        }
133
134        let ucan_store = UcanStore(other_store);
135
136        let link_record =
137            LinkRecord::try_from(ucan_store.require_token(&foo_link_record_link).await?)?;
138
139        assert_eq!(link_record, foo_link_record);
140
141        link_record.collect_proofs(&ucan_store).await?;
142
143        Ok(())
144    }
145
146    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
147    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
148    async fn it_can_stream_all_blocks_in_a_sphere_version() -> Result<()> {
149        initialize_tracing(None);
150
151        let (sphere_context, _) = scaffold_sphere_context_with_history().await?;
152
153        let final_version = sphere_context.version().await?;
154
155        let mut other_store = MemoryStore::default();
156
157        let mut received = BTreeSet::new();
158
159        let stream = memo_body_stream(
160            sphere_context.sphere_context().await?.db().clone(),
161            &final_version,
162            false,
163        );
164
165        tokio::pin!(stream);
166
167        while let Some((cid, block)) = stream.try_next().await? {
168            debug!("Received {cid}");
169            assert!(
170                !received.contains(&cid),
171                "Got {cid} but we already received it",
172            );
173            received.insert(cid);
174            other_store.put_block(&cid, &block).await?;
175        }
176
177        let sphere = Sphere::at(&final_version, &other_store);
178
179        let content = sphere.get_content().await?;
180        let identities = sphere.get_address_book().await?.get_identities().await?;
181
182        for (content_change, petname_change) in SCAFFOLD_CHANGES.iter() {
183            for slug in *content_change {
184                let _ = content.get(&slug.to_string()).await?.cloned().unwrap();
185            }
186
187            for petname in *petname_change {
188                let _ = identities.get(&petname.to_string()).await?;
189            }
190        }
191
192        touch_all_sphere_blocks(&sphere).await?;
193
194        Ok(())
195    }
196
197    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
198    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
199    async fn it_can_stream_all_delta_blocks_for_a_range_of_history() -> Result<()> {
200        initialize_tracing(None);
201
202        let (sphere_context, versions) = scaffold_sphere_context_with_history().await?;
203
204        let original_store = sphere_context.sphere_context().await?.db().clone();
205
206        let mut other_store = MemoryStore::default();
207
208        let first_version = versions.first().unwrap();
209        let stream = memo_body_stream(original_store.clone(), first_version, false);
210
211        tokio::pin!(stream);
212
213        while let Some((cid, block)) = stream.try_next().await? {
214            other_store.put_block(&cid, &block).await?;
215        }
216
217        let sphere = Sphere::at(first_version, &other_store);
218
219        touch_all_sphere_blocks(&sphere).await?;
220
221        for i in 1..=3 {
222            let version = versions.get(i).unwrap();
223            let sphere = Sphere::at(version, &other_store);
224
225            assert!(touch_all_sphere_blocks(&sphere).await.is_err());
226        }
227
228        let stream = memo_history_stream(
229            original_store,
230            versions.last().unwrap(),
231            Some(first_version),
232            false,
233        );
234
235        tokio::pin!(stream);
236
237        while let Some((cid, block)) = stream.try_next().await? {
238            other_store.put_block(&cid, &block).await?;
239        }
240
241        for i in 1..=3 {
242            let version = versions.get(i).unwrap();
243            let sphere = Sphere::at(version, &other_store);
244            sphere.hydrate().await?;
245
246            touch_all_sphere_blocks(&sphere).await?;
247        }
248
249        Ok(())
250    }
251
252    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
253    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
254    async fn it_can_stream_all_blocks_in_some_sphere_content() -> Result<()> {
255        initialize_tracing(None);
256
257        let (mut sphere_context, _) = simulated_sphere_context(Access::ReadWrite, None).await?;
258        let mut db = sphere_context.sphere_context().await?.db_mut().clone();
259
260        let chunks = [b"foo", b"bar", b"baz"];
261
262        let mut next_chunk_cid = None;
263
264        for bytes in chunks.iter().rev() {
265            next_chunk_cid = Some(
266                db.save::<DagCborCodec, _>(&BodyChunkIpld {
267                    bytes: bytes.to_vec(),
268                    next: next_chunk_cid,
269                })
270                .await?,
271            );
272        }
273
274        let content_cid = sphere_context
275            .link("foo", &ContentType::Bytes, &next_chunk_cid.unwrap(), None)
276            .await?;
277
278        let stream = memo_body_stream(
279            sphere_context.sphere_context().await?.db().clone(),
280            &content_cid,
281            false,
282        );
283
284        let mut store = MemoryStore::default();
285
286        tokio::pin!(stream);
287
288        while let Some((cid, block)) = stream.try_next().await? {
289            store.put_block(&cid, &block).await?;
290        }
291
292        let memo = store.load::<DagCborCodec, MemoIpld>(&content_cid).await?;
293
294        let mut buffer = Vec::new();
295        let body_stream = BodyChunkDecoder(&memo.body, &store).stream();
296
297        tokio::pin!(body_stream);
298
299        while let Some(bytes) = body_stream.try_next().await? {
300            buffer.append(&mut Vec::from(bytes));
301        }
302
303        assert_eq!(buffer.as_slice(), b"foobarbaz");
304
305        Ok(())
306    }
307
308    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
309    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
310    async fn it_can_stream_all_blocks_in_a_sphere_version_as_a_car() -> Result<()> {
311        initialize_tracing(None);
312
313        let (mut sphere_context, _) = scaffold_sphere_context_with_history().await?;
314
315        let mut db = sphere_context.sphere_context().await?.db().clone();
316        let (id, link_record, _) = make_valid_link_record(&mut db).await?;
317        sphere_context.set_petname("hasrecord", Some(id)).await?;
318        sphere_context.save(None).await?;
319        sphere_context
320            .set_petname_record("hasrecord", &link_record)
321            .await?;
322        sphere_context.save(None).await?;
323
324        let final_version = sphere_context.version().await?;
325
326        let mut other_store = MemoryStore::default();
327
328        let stream = to_car_stream(
329            vec![final_version.into()],
330            memo_body_stream(db.clone(), &final_version, false),
331        );
332
333        let block_stream = from_car_stream(stream);
334
335        let mut received = BTreeSet::new();
336        tokio::pin!(block_stream);
337
338        while let Some((cid, block)) = block_stream.try_next().await? {
339            debug!("Received {cid}");
340            assert!(
341                !received.contains(&cid),
342                "Got {cid} but we already received it",
343            );
344            received.insert(cid);
345            other_store.put_block(&cid, &block).await?;
346        }
347
348        let sphere = Sphere::at(&final_version, &other_store);
349
350        let content = sphere.get_content().await?;
351        let identities = sphere.get_address_book().await?.get_identities().await?;
352
353        for (content_change, petname_change) in SCAFFOLD_CHANGES.iter() {
354            for slug in *content_change {
355                let _ = content.get(&slug.to_string()).await?.cloned().unwrap();
356            }
357
358            for petname in *petname_change {
359                let _ = identities.get(&petname.to_string()).await?;
360            }
361        }
362
363        let has_record = identities.get(&"hasrecord".into()).await?.unwrap();
364        let has_record_version = has_record.link_record(&UcanStore(other_store)).await;
365
366        assert!(
367            has_record_version.is_some(),
368            "We got a resolved link record from the stream"
369        );
370
371        touch_all_sphere_blocks(&sphere).await?;
372
373        Ok(())
374    }
375
376    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
377    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
378    async fn it_only_omits_memo_parent_references_when_streaming_sphere_body_with_content(
379    ) -> Result<()> {
380        initialize_tracing(None);
381
382        let (sphere_context, mut versions) = scaffold_sphere_context_with_history().await?;
383
384        debug!(
385            "Versions: {:#?}",
386            versions
387                .iter()
388                .map(|cid| cid.to_string())
389                .collect::<Vec<String>>()
390        );
391
392        let store = sphere_context.lock().await.db().clone();
393
394        let last_version = versions.pop().unwrap();
395        let last_version_parent = versions.pop().unwrap();
396
397        let mut links_referenced = BTreeSet::new();
398        let mut links_included = BTreeSet::new();
399
400        // The root is referenced implicitly
401        links_referenced.insert(*last_version);
402
403        let stream = memo_body_stream(store.clone(), &last_version, true);
404
405        tokio::pin!(stream);
406
407        while let Some((cid, block)) = stream.try_next().await? {
408            if cid == *last_version {
409                // Verify that parent of root is what we expect...
410                let memo = store.load::<DagCborCodec, MemoIpld>(&cid).await?;
411                assert_eq!(memo.parent, Some(last_version_parent));
412
413                let codec = DagCborCodec;
414                let mut root_references = BTreeSet::new();
415                codec.references::<Ipld, BTreeSet<Cid>>(&block, &mut root_references)?;
416
417                assert!(root_references.contains(&last_version_parent));
418            }
419
420            links_included.insert(cid);
421
422            match cid.codec() {
423                codec if codec == u64::from(DagCborCodec) => {
424                    let codec = DagCborCodec;
425                    codec.references::<Ipld, BTreeSet<Cid>>(&block, &mut links_referenced)?;
426                }
427                codec if codec == u64::from(RawCodec) => {
428                    let codec = DagCborCodec;
429                    codec.references::<Ipld, BTreeSet<Cid>>(&block, &mut links_referenced)?;
430                }
431                _ => {
432                    unreachable!("No other codecs are used in our DAGs");
433                }
434            }
435        }
436
437        assert!(
438            !links_included.contains(&last_version_parent),
439            "Parent version should not be included"
440        );
441
442        let difference = links_referenced
443            .difference(&links_included)
444            .collect::<Vec<&Cid>>();
445
446        debug!(
447            "Difference: {:#?}",
448            difference
449                .iter()
450                .map(|cid| cid.to_string())
451                .collect::<Vec<String>>()
452        );
453
454        // These files have been each updated once after the first write, so their memos have
455        // parent pointers to old versions that won't be included in the CAR
456        let last_dogs_version = sphere_context
457            .read("dogs")
458            .await?
459            .unwrap()
460            .memo
461            .parent
462            .unwrap();
463        let last_birds_version = sphere_context
464            .read("birds")
465            .await?
466            .unwrap()
467            .memo
468            .parent
469            .unwrap();
470
471        let expected_difference: Vec<&Cid> = vec![
472            &last_version_parent,
473            &last_birds_version,
474            &last_dogs_version,
475        ];
476
477        assert_eq!(difference.len(), expected_difference.len());
478
479        for cid in expected_difference {
480            assert!(difference.contains(&cid));
481        }
482
483        Ok(())
484    }
485}