noosphere_sphere/
walker.rs

1use anyhow::Result;
2use noosphere_core::data::{Did, IdentityIpld, Jwt, Link, MapOperation, MemoIpld};
3use std::{collections::BTreeSet, marker::PhantomData};
4
5use async_stream::try_stream;
6use noosphere_storage::Storage;
7use tokio::io::AsyncRead;
8use tokio_stream::{Stream, StreamExt};
9
10use crate::{
11    content::{SphereContentRead, SphereFile},
12    internal::SphereContextInternal,
13    HasSphereContext, SphereAuthorityRead, SpherePetnameRead,
14};
15
16/// A [SphereWalker] makes it possible to convert anything that implements
17/// [HasSphereContext] into an async [Stream] over sphere content, allowing
18/// incremental iteration over both the breadth of content at any version, or
19/// the depth of changes over a range of history.
20pub struct SphereWalker<'a, C, S>
21where
22    C: HasSphereContext<S>,
23    S: Storage + 'static,
24{
25    has_sphere_context: &'a C,
26    storage: PhantomData<S>,
27}
28
29impl<'a, C, S> From<&'a C> for SphereWalker<'a, C, S>
30where
31    C: HasSphereContext<S>,
32    S: Storage + 'static,
33{
34    fn from(has_sphere_context: &'a C) -> Self {
35        SphereWalker {
36            has_sphere_context,
37            storage: Default::default(),
38        }
39    }
40}
41
42impl<'a, C, S> SphereWalker<'a, C, S>
43where
44    C: SphereAuthorityRead<S> + HasSphereContext<S>,
45    S: Storage + 'static,
46{
47    /// Get a stream that yields a link to every authorization to access the
48    /// sphere along with its corresponding [DelegationIpld]. Note that since a
49    /// revocation may be issued without necessarily removing its revoked
50    /// delegation, this will yield all authorizations regardless of revocation
51    /// status.
52    pub fn authorization_stream(
53        &self,
54    ) -> impl Stream<Item = Result<(String, Did, Link<Jwt>)>> + '_ {
55        try_stream! {
56            let sphere = self.has_sphere_context.to_sphere().await?;
57            let delegations = sphere.get_authority().await?.get_delegations().await?;
58            let stream = delegations.into_stream().await?;
59
60            for await entry in stream {
61                let (link, delegation) = entry?;
62                let ucan = delegation.resolve_ucan(sphere.store()).await?;
63                yield (delegation.name, Did(ucan.audience().to_string()), link);
64            }
65        }
66    }
67
68    /// Get a [BTreeSet] whose members are all the [Link<Jwt>]s to
69    /// authorizations that enable sphere access as of this version of the
70    /// sphere. Note that the full space of authoriztions may be very large; for
71    /// a more space-efficient approach, use
72    /// [SphereWalker::authorization_stream] to incrementally access all
73    /// authorizations in the sphere.
74    ///
75    /// This method is forgiving of missing or corrupted data, and will yield an
76    /// incomplete set of authorizations in the case that some or all names are
77    /// not able to be accessed.
78    pub async fn list_authorizations(&self) -> Result<BTreeSet<Link<Jwt>>> {
79        let sphere_identity = self.has_sphere_context.identity().await?;
80        let authorization_stream = self.authorization_stream();
81
82        tokio::pin!(authorization_stream);
83
84        Ok(authorization_stream
85            .fold(BTreeSet::new(), |mut delegations, another_delegation| {
86                match another_delegation {
87                    Ok((_, _, delegation)) => {
88                        delegations.insert(delegation);
89                    }
90                    Err(error) => {
91                        warn!(
92                            "Could not read a petname from {}: {}",
93                            sphere_identity, error
94                        )
95                    }
96                };
97                delegations
98            })
99            .await)
100    }
101}
102
103impl<'a, C, S> SphereWalker<'a, C, S>
104where
105    C: SpherePetnameRead<S> + HasSphereContext<S>,
106    S: Storage + 'static,
107{
108    /// Same as [SphereWalker::petname_stream], but consumes the [SphereWalker].
109    /// This is useful in cases where it would otherwise be necessary to borrow
110    /// a reference to [SphereWalker] for a static lifetime.
111    pub fn into_petname_stream(self) -> impl Stream<Item = Result<(String, IdentityIpld)>> + 'a {
112        try_stream! {
113            let sphere = self.has_sphere_context.to_sphere().await?;
114            let petnames = sphere.get_address_book().await?.get_identities().await?;
115            let stream = petnames.into_stream().await?;
116
117            for await entry in stream {
118                let (petname, address) = entry?;
119                yield (petname, address);
120            }
121        }
122    }
123
124    /// Get a stream that yields every petname in the namespace along with its
125    /// corresponding [AddressIpld]. This is useful for iterating over sphere
126    /// petnames incrementally without having to load the entire index into
127    /// memory at once.
128    pub fn petname_stream(&self) -> impl Stream<Item = Result<(String, IdentityIpld)>> + '_ {
129        try_stream! {
130            let sphere = self.has_sphere_context.to_sphere().await?;
131            let petnames = sphere.get_address_book().await?.get_identities().await?;
132            let stream = petnames.into_stream().await?;
133
134            for await entry in stream {
135                let (petname, address) = entry?;
136                yield (petname, address);
137            }
138        }
139    }
140
141    /// Get a stream that yields the set of petnames that changed at each
142    /// revision of the backing sphere, up to but excluding an optional `since`
143    /// CID parameter. To stream the entire history, pass `None` as the
144    /// parameter.
145    pub fn petname_change_stream<'b>(
146        &'b self,
147        since: Option<&'a Link<MemoIpld>>,
148    ) -> impl Stream<Item = Result<(Link<MemoIpld>, BTreeSet<String>)>> + 'b {
149        try_stream! {
150            let sphere = self.has_sphere_context.to_sphere().await?;
151            let since = since.cloned();
152            let stream = sphere.into_identities_changelog_stream(since.as_ref());
153
154            for await change in stream {
155                let (cid, changelog) = change?;
156                let mut changed_petnames = BTreeSet::new();
157
158                for operation in changelog.changes {
159                    let petname = match operation {
160                        MapOperation::Add { key, .. } => key,
161                        MapOperation::Remove { key } => key,
162                    };
163                    changed_petnames.insert(petname);
164                }
165
166                yield (cid, changed_petnames);
167            }
168        }
169    }
170
171    /// Get a stream that yields the set of petnames that changed at each
172    /// revision of the backing sphere, up to but excluding an optional `since`
173    /// CID parameter. To stream the entire history, pass `None` as the
174    /// parameter.
175    pub fn into_petname_change_stream(
176        self,
177        since: Option<&'a Link<MemoIpld>>,
178    ) -> impl Stream<Item = Result<(Link<MemoIpld>, BTreeSet<String>)>> + '_ {
179        try_stream! {
180            let sphere = self.has_sphere_context.to_sphere().await?;
181            let since = since.cloned();
182            let stream = sphere.into_identities_changelog_stream(since.as_ref());
183
184            for await change in stream {
185                let (cid, changelog) = change?;
186                let mut changed_petnames = BTreeSet::new();
187
188                for operation in changelog.changes {
189                    let petname = match operation {
190                        MapOperation::Add { key, .. } => key,
191                        MapOperation::Remove { key } => key,
192                    };
193                    changed_petnames.insert(petname);
194                }
195
196                yield (cid, changed_petnames);
197            }
198        }
199    }
200
201    /// Get a [BTreeSet] whose members are all the petnames that have addresses
202    /// as of this version of the sphere. Note that the full space of names may
203    /// be very large; for a more space-efficient approach, use
204    /// [SphereWalker::petname_stream] to incrementally access all petnames in
205    /// the sphere.
206    ///
207    /// This method is forgiving of missing or corrupted data, and will yield an
208    /// incomplete set of names in the case that some or all names are not able
209    /// to be accessed.
210    pub async fn list_petnames(&self) -> Result<BTreeSet<String>> {
211        let sphere_identity = self.has_sphere_context.identity().await?;
212        let petname_stream = self.petname_stream();
213
214        tokio::pin!(petname_stream);
215
216        Ok(petname_stream
217            .fold(BTreeSet::new(), |mut petnames, another_petname| {
218                match another_petname {
219                    Ok((petname, _)) => {
220                        petnames.insert(petname);
221                    }
222                    Err(error) => {
223                        warn!(
224                            "Could not read a petname from {}: {}",
225                            sphere_identity, error
226                        )
227                    }
228                };
229                petnames
230            })
231            .await)
232    }
233
234    /// Get a [BTreeSet] whose members are all the petnames whose values have
235    /// changed at least once since the provided version of the sphere
236    /// (exclusive of the provided version; use `None` to get all petnames
237    /// changed since the beginning of the sphere's history).
238    ///
239    /// This method is forgiving of missing or corrupted history, and will yield
240    /// an incomplete set of changes in the case that some or all changes are
241    /// not able to be accessed.
242    ///
243    /// Note that this operation will scale in memory consumption and duration
244    /// proportionally to the size of the sphere and the length of its history.
245    /// For a more efficient method of accessing changes, consider using
246    /// [SphereWalker::petname_change_stream] instead.
247    pub async fn petname_changes(
248        &self,
249        since: Option<&Link<MemoIpld>>,
250    ) -> Result<BTreeSet<String>> {
251        let sphere_identity = self.has_sphere_context.identity().await?;
252        let change_stream = self.petname_change_stream(since);
253
254        tokio::pin!(change_stream);
255
256        Ok(change_stream
257            .fold(BTreeSet::new(), |mut all, some| {
258                match some {
259                    Ok((_, mut changes)) => all.append(&mut changes),
260                    Err(error) => warn!(
261                        "Could not read some changes from {}: {}",
262                        sphere_identity, error
263                    ),
264                };
265                all
266            })
267            .await)
268    }
269}
270
271impl<'a, C, S> SphereWalker<'a, C, S>
272where
273    C: SphereContentRead<S> + HasSphereContext<S>,
274    S: Storage + 'static,
275{
276    /// Same as [SphereWalker::content_stream], but consumes the [SphereWalker].
277    /// This is useful in cases where it would otherwise be necessary to borrow
278    /// a reference to [SphereWalker] for a static lifetime.
279    pub fn into_content_stream(
280        self,
281    ) -> impl Stream<Item = Result<(String, SphereFile<impl AsyncRead>)>> + 'a {
282        try_stream! {
283            let sphere = self.has_sphere_context.to_sphere().await?;
284            let content = sphere.get_content().await?;
285            let stream = content.into_stream().await?;
286
287            for await entry in stream {
288                let (key, memo_link) = entry?;
289                let file = self.has_sphere_context.get_file(sphere.cid(), memo_link).await?;
290
291                yield (key.clone(), file);
292            }
293        }
294    }
295
296    /// Get a stream that yields every slug in the namespace along with its
297    /// corresponding [SphereFile]. This is useful for iterating over sphere
298    /// content incrementally without having to load the entire index into
299    /// memory at once.
300    pub fn content_stream(
301        &self,
302    ) -> impl Stream<Item = Result<(String, SphereFile<impl AsyncRead>)>> + '_ {
303        try_stream! {
304            let sphere = self.has_sphere_context.to_sphere().await?;
305            let links = sphere.get_content().await?;
306            let stream = links.into_stream().await?;
307
308            for await entry in stream {
309                let (key, memo) = entry?;
310                let file = self.has_sphere_context.get_file(sphere.cid(), memo).await?;
311
312                yield (key.clone(), file);
313            }
314        }
315    }
316
317    /// Get a stream that yields the set of slugs that changed at each revision
318    /// of the backing sphere, up to but excluding an optional CID. To stream
319    /// the entire history, pass `None` as the parameter.
320    pub fn into_content_change_stream(
321        self,
322        since: Option<&'a Link<MemoIpld>>,
323    ) -> impl Stream<Item = Result<(Link<MemoIpld>, BTreeSet<String>)>> + '_ {
324        try_stream! {
325            let sphere = self.has_sphere_context.to_sphere().await?;
326            let since = since.cloned();
327            let stream = sphere.into_content_changelog_stream(since.as_ref());
328
329            for await change in stream {
330                let (cid, changelog) = change?;
331                let mut changed_slugs = BTreeSet::new();
332
333                for operation in changelog.changes {
334                    let slug = match operation {
335                        MapOperation::Add { key, .. } => key,
336                        MapOperation::Remove { key } => key,
337                    };
338                    changed_slugs.insert(slug);
339                }
340
341                yield (cid, changed_slugs);
342            }
343        }
344    }
345
346    /// Get a stream that yields the set of slugs that changed at each revision
347    /// of the backing sphere, up to but excluding an optional CID. To stream
348    /// the entire history, pass `None` as the parameter.
349    pub fn content_change_stream<'b>(
350        &'b self,
351        since: Option<&'b Link<MemoIpld>>,
352    ) -> impl Stream<Item = Result<(Link<MemoIpld>, BTreeSet<String>)>> + 'b {
353        try_stream! {
354            let sphere = self.has_sphere_context.to_sphere().await?;
355            let since = since.cloned();
356            let stream = sphere.into_content_changelog_stream(since.as_ref());
357
358            for await change in stream {
359                let (cid, changelog) = change?;
360                let mut changed_slugs = BTreeSet::new();
361
362                for operation in changelog.changes {
363                    let slug = match operation {
364                        MapOperation::Add { key, .. } => key,
365                        MapOperation::Remove { key } => key,
366                    };
367                    changed_slugs.insert(slug);
368                }
369
370                yield (cid, changed_slugs);
371            }
372        }
373    }
374
375    /// Get a [BTreeSet] whose members are all the slugs that have values as of
376    /// this version of the sphere. Note that the full space of slugs may be
377    /// very large; for a more space-efficient approach, use
378    /// [SphereWalker::content_stream] or [SphereWalker::into_content_stream] to
379    /// incrementally access all slugs in the sphere.
380    ///
381    /// This method is forgiving of missing or corrupted data, and will yield an
382    /// incomplete set of links in the case that some or all links are not able
383    /// to be accessed.
384    pub async fn list_slugs(&self) -> Result<BTreeSet<String>> {
385        let sphere_identity = self.has_sphere_context.identity().await?;
386        let link_stream = self.content_stream();
387
388        tokio::pin!(link_stream);
389
390        Ok(link_stream
391            .fold(BTreeSet::new(), |mut links, another_link| {
392                match another_link {
393                    Ok((slug, _)) => {
394                        links.insert(slug);
395                    }
396                    Err(error) => {
397                        warn!("Could not read a link from {}: {}", sphere_identity, error)
398                    }
399                };
400                links
401            })
402            .await)
403    }
404
405    /// Get a [BTreeSet] whose members are all the slugs whose values have
406    /// changed at least once since the provided version of the sphere
407    /// (exclusive of the provided version; use `None` to get all slugs changed
408    /// since the beginning of the sphere's history).
409    ///
410    /// This method is forgiving of missing or corrupted history, and will yield
411    /// an incomplete set of changes in the case that some or all changes are
412    /// not able to be accessed.
413    ///
414    /// Note that this operation will scale in memory consumption and duration
415    /// proportionally to the size of the sphere and the length of its history.
416    /// For a more efficient method of accessing changes, consider using
417    /// [SphereWalker::content_change_stream] instead.
418    pub async fn content_changes(
419        &self,
420        since: Option<&Link<MemoIpld>>,
421    ) -> Result<BTreeSet<String>> {
422        let sphere_identity = self.has_sphere_context.identity().await?;
423        let change_stream = self.content_change_stream(since);
424
425        tokio::pin!(change_stream);
426
427        Ok(change_stream
428            .fold(BTreeSet::new(), |mut all, some| {
429                match some {
430                    Ok((_, mut changes)) => all.append(&mut changes),
431                    Err(error) => warn!(
432                        "Could not read some changes from {}: {}",
433                        sphere_identity, error
434                    ),
435                };
436                all
437            })
438            .await)
439    }
440}
441
#[cfg(test)]
mod tests {
    use anyhow::Result;
    use std::collections::BTreeSet;

    use noosphere_core::data::{ContentType, Did};
    use tokio::io::AsyncReadExt;
    use tokio_stream::StreamExt;

    #[cfg(target_arch = "wasm32")]
    use wasm_bindgen_test::wasm_bindgen_test;

    #[cfg(target_arch = "wasm32")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use super::SphereWalker;
    use crate::helpers::{simulated_sphere_context, SimulationAccess};
    use crate::{HasMutableSphereContext, SphereAuthorityWrite, SphereContentWrite, SphereCursor};

    /// A walker built from a cursor and one built from the underlying context
    /// must report the same slugs.
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    async fn it_can_be_initialized_with_a_context_or_a_cursor() {
        let (sphere_context, _) = simulated_sphere_context(SimulationAccess::ReadWrite, None)
            .await
            .unwrap();
        let mut cursor = SphereCursor::latest(sphere_context.clone());

        // Each inner list is written and then saved as one revision; slugs
        // repeat across revisions, leaving five distinct slugs in total
        let changes = vec![
            vec!["dogs", "birds"],
            vec!["cats", "dogs"],
            vec!["birds"],
            vec!["cows", "beetles"],
        ];

        for change in changes {
            for slug in change {
                cursor
                    .write(slug, &ContentType::Subtext, b"are cool".as_ref(), None)
                    .await
                    .unwrap();
            }

            cursor.save(None).await.unwrap();
        }

        let walker_cursor = SphereWalker::from(&cursor);
        let walker_context = SphereWalker::from(&sphere_context);

        let slugs_cursor = walker_cursor.list_slugs().await.unwrap();
        let slugs_context = walker_context.list_slugs().await.unwrap();

        assert_eq!(slugs_cursor.len(), 5);
        assert_eq!(slugs_cursor, slugs_context);
    }

    /// Listing slugs reflects the current state of the sphere, including the
    /// removal of a slug in a later revision.
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    async fn it_can_list_all_slugs_currently_in_a_sphere() {
        let (sphere_context, _) = simulated_sphere_context(SimulationAccess::ReadWrite, None)
            .await
            .unwrap();
        let mut cursor = SphereCursor::latest(sphere_context);

        // Each inner list is written and then saved as one revision; slugs
        // repeat across revisions, leaving five distinct slugs in total
        let changes = vec![
            vec!["dogs", "birds"],
            vec!["cats", "dogs"],
            vec!["birds"],
            vec!["cows", "beetles"],
        ];

        for change in changes {
            for slug in change {
                cursor
                    .write(slug, &ContentType::Subtext, b"are cool".as_ref(), None)
                    .await
                    .unwrap();
            }

            cursor.save(None).await.unwrap();
        }

        // The walker borrows a clone so that `cursor` stays free to be mutated
        let walker_cursor = cursor.clone();
        let walker = SphereWalker::from(&walker_cursor);
        let slugs = walker.list_slugs().await.unwrap();

        assert_eq!(slugs.len(), 5);

        cursor.remove("dogs").await.unwrap();
        cursor.save(None).await.unwrap();

        let slugs = walker.list_slugs().await.unwrap();

        assert_eq!(slugs.len(), 4);
    }

    /// Listing authorizations yields every added authorization plus the one
    /// that already exists in the simulated sphere.
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    async fn it_can_list_all_authorizations_currently_in_a_sphere() -> Result<()> {
        let (sphere_context, _) =
            simulated_sphere_context(SimulationAccess::ReadWrite, None).await?;

        let mut cursor = SphereCursor::latest(sphere_context);
        let authorizations_to_add = 10;

        for i in 0..authorizations_to_add {
            cursor
                .authorize(&format!("foo{}", i), &Did(format!("did:key:foo{}", i)))
                .await?;
        }

        cursor.save(None).await?;

        let authorizations = SphereWalker::from(&cursor).list_authorizations().await?;

        assert_eq!(authorizations.len(), authorizations_to_add + 1);

        Ok(())
    }

    /// Streaming the content of a sphere yields every slug together with the
    /// body that was written for it.
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    async fn it_can_stream_the_whole_index() {
        let (sphere_context, _) = simulated_sphere_context(SimulationAccess::ReadWrite, None)
            .await
            .unwrap();
        let mut cursor = SphereCursor::latest(sphere_context);

        let expected = BTreeSet::<(String, String)>::from([
            ("cats".into(), "Cats are awesome".into()),
            ("dogs".into(), "Dogs are pretty cool".into()),
            ("birds".into(), "Birds rights".into()),
            ("mice".into(), "Mice like cookies".into()),
        ]);

        for (slug, content) in &expected {
            cursor
                .write(slug.as_str(), &ContentType::Subtext, content.as_ref(), None)
                .await
                .unwrap();

            cursor.save(None).await.unwrap();
        }

        let mut actual = BTreeSet::new();
        let walker = SphereWalker::from(&cursor);
        let stream = walker.content_stream();

        tokio::pin!(stream);

        while let Some(entry) = stream.next().await {
            // Surface a stream error directly instead of silently ending the
            // loop and failing later on an unexplained set mismatch
            let (slug, mut file) = entry.unwrap();
            let mut contents = String::new();
            file.contents.read_to_string(&mut contents).await.unwrap();
            actual.insert((slug, contents));
        }

        assert_eq!(expected, actual);
    }
}