1use anyhow::Result;
2use noosphere_core::data::{Did, IdentityIpld, Jwt, Link, MapOperation, MemoIpld};
3use std::{collections::BTreeSet, marker::PhantomData};
4
5use async_stream::try_stream;
6use noosphere_storage::Storage;
7use tokio::io::AsyncRead;
8use tokio_stream::{Stream, StreamExt};
9
10use crate::{
11 content::{SphereContentRead, SphereFile},
12 internal::SphereContextInternal,
13 HasSphereContext, SphereAuthorityRead, SpherePetnameRead,
14};
15
16pub struct SphereWalker<'a, C, S>
21where
22 C: HasSphereContext<S>,
23 S: Storage + 'static,
24{
25 has_sphere_context: &'a C,
26 storage: PhantomData<S>,
27}
28
29impl<'a, C, S> From<&'a C> for SphereWalker<'a, C, S>
30where
31 C: HasSphereContext<S>,
32 S: Storage + 'static,
33{
34 fn from(has_sphere_context: &'a C) -> Self {
35 SphereWalker {
36 has_sphere_context,
37 storage: Default::default(),
38 }
39 }
40}
41
42impl<'a, C, S> SphereWalker<'a, C, S>
43where
44 C: SphereAuthorityRead<S> + HasSphereContext<S>,
45 S: Storage + 'static,
46{
47 pub fn authorization_stream(
53 &self,
54 ) -> impl Stream<Item = Result<(String, Did, Link<Jwt>)>> + '_ {
55 try_stream! {
56 let sphere = self.has_sphere_context.to_sphere().await?;
57 let delegations = sphere.get_authority().await?.get_delegations().await?;
58 let stream = delegations.into_stream().await?;
59
60 for await entry in stream {
61 let (link, delegation) = entry?;
62 let ucan = delegation.resolve_ucan(sphere.store()).await?;
63 yield (delegation.name, Did(ucan.audience().to_string()), link);
64 }
65 }
66 }
67
68 pub async fn list_authorizations(&self) -> Result<BTreeSet<Link<Jwt>>> {
79 let sphere_identity = self.has_sphere_context.identity().await?;
80 let authorization_stream = self.authorization_stream();
81
82 tokio::pin!(authorization_stream);
83
84 Ok(authorization_stream
85 .fold(BTreeSet::new(), |mut delegations, another_delegation| {
86 match another_delegation {
87 Ok((_, _, delegation)) => {
88 delegations.insert(delegation);
89 }
90 Err(error) => {
91 warn!(
92 "Could not read a petname from {}: {}",
93 sphere_identity, error
94 )
95 }
96 };
97 delegations
98 })
99 .await)
100 }
101}
102
103impl<'a, C, S> SphereWalker<'a, C, S>
104where
105 C: SpherePetnameRead<S> + HasSphereContext<S>,
106 S: Storage + 'static,
107{
108 pub fn into_petname_stream(self) -> impl Stream<Item = Result<(String, IdentityIpld)>> + 'a {
112 try_stream! {
113 let sphere = self.has_sphere_context.to_sphere().await?;
114 let petnames = sphere.get_address_book().await?.get_identities().await?;
115 let stream = petnames.into_stream().await?;
116
117 for await entry in stream {
118 let (petname, address) = entry?;
119 yield (petname, address);
120 }
121 }
122 }
123
124 pub fn petname_stream(&self) -> impl Stream<Item = Result<(String, IdentityIpld)>> + '_ {
129 try_stream! {
130 let sphere = self.has_sphere_context.to_sphere().await?;
131 let petnames = sphere.get_address_book().await?.get_identities().await?;
132 let stream = petnames.into_stream().await?;
133
134 for await entry in stream {
135 let (petname, address) = entry?;
136 yield (petname, address);
137 }
138 }
139 }
140
141 pub fn petname_change_stream<'b>(
146 &'b self,
147 since: Option<&'a Link<MemoIpld>>,
148 ) -> impl Stream<Item = Result<(Link<MemoIpld>, BTreeSet<String>)>> + 'b {
149 try_stream! {
150 let sphere = self.has_sphere_context.to_sphere().await?;
151 let since = since.cloned();
152 let stream = sphere.into_identities_changelog_stream(since.as_ref());
153
154 for await change in stream {
155 let (cid, changelog) = change?;
156 let mut changed_petnames = BTreeSet::new();
157
158 for operation in changelog.changes {
159 let petname = match operation {
160 MapOperation::Add { key, .. } => key,
161 MapOperation::Remove { key } => key,
162 };
163 changed_petnames.insert(petname);
164 }
165
166 yield (cid, changed_petnames);
167 }
168 }
169 }
170
171 pub fn into_petname_change_stream(
176 self,
177 since: Option<&'a Link<MemoIpld>>,
178 ) -> impl Stream<Item = Result<(Link<MemoIpld>, BTreeSet<String>)>> + '_ {
179 try_stream! {
180 let sphere = self.has_sphere_context.to_sphere().await?;
181 let since = since.cloned();
182 let stream = sphere.into_identities_changelog_stream(since.as_ref());
183
184 for await change in stream {
185 let (cid, changelog) = change?;
186 let mut changed_petnames = BTreeSet::new();
187
188 for operation in changelog.changes {
189 let petname = match operation {
190 MapOperation::Add { key, .. } => key,
191 MapOperation::Remove { key } => key,
192 };
193 changed_petnames.insert(petname);
194 }
195
196 yield (cid, changed_petnames);
197 }
198 }
199 }
200
201 pub async fn list_petnames(&self) -> Result<BTreeSet<String>> {
211 let sphere_identity = self.has_sphere_context.identity().await?;
212 let petname_stream = self.petname_stream();
213
214 tokio::pin!(petname_stream);
215
216 Ok(petname_stream
217 .fold(BTreeSet::new(), |mut petnames, another_petname| {
218 match another_petname {
219 Ok((petname, _)) => {
220 petnames.insert(petname);
221 }
222 Err(error) => {
223 warn!(
224 "Could not read a petname from {}: {}",
225 sphere_identity, error
226 )
227 }
228 };
229 petnames
230 })
231 .await)
232 }
233
234 pub async fn petname_changes(
248 &self,
249 since: Option<&Link<MemoIpld>>,
250 ) -> Result<BTreeSet<String>> {
251 let sphere_identity = self.has_sphere_context.identity().await?;
252 let change_stream = self.petname_change_stream(since);
253
254 tokio::pin!(change_stream);
255
256 Ok(change_stream
257 .fold(BTreeSet::new(), |mut all, some| {
258 match some {
259 Ok((_, mut changes)) => all.append(&mut changes),
260 Err(error) => warn!(
261 "Could not read some changes from {}: {}",
262 sphere_identity, error
263 ),
264 };
265 all
266 })
267 .await)
268 }
269}
270
271impl<'a, C, S> SphereWalker<'a, C, S>
272where
273 C: SphereContentRead<S> + HasSphereContext<S>,
274 S: Storage + 'static,
275{
276 pub fn into_content_stream(
280 self,
281 ) -> impl Stream<Item = Result<(String, SphereFile<impl AsyncRead>)>> + 'a {
282 try_stream! {
283 let sphere = self.has_sphere_context.to_sphere().await?;
284 let content = sphere.get_content().await?;
285 let stream = content.into_stream().await?;
286
287 for await entry in stream {
288 let (key, memo_link) = entry?;
289 let file = self.has_sphere_context.get_file(sphere.cid(), memo_link).await?;
290
291 yield (key.clone(), file);
292 }
293 }
294 }
295
296 pub fn content_stream(
301 &self,
302 ) -> impl Stream<Item = Result<(String, SphereFile<impl AsyncRead>)>> + '_ {
303 try_stream! {
304 let sphere = self.has_sphere_context.to_sphere().await?;
305 let links = sphere.get_content().await?;
306 let stream = links.into_stream().await?;
307
308 for await entry in stream {
309 let (key, memo) = entry?;
310 let file = self.has_sphere_context.get_file(sphere.cid(), memo).await?;
311
312 yield (key.clone(), file);
313 }
314 }
315 }
316
317 pub fn into_content_change_stream(
321 self,
322 since: Option<&'a Link<MemoIpld>>,
323 ) -> impl Stream<Item = Result<(Link<MemoIpld>, BTreeSet<String>)>> + '_ {
324 try_stream! {
325 let sphere = self.has_sphere_context.to_sphere().await?;
326 let since = since.cloned();
327 let stream = sphere.into_content_changelog_stream(since.as_ref());
328
329 for await change in stream {
330 let (cid, changelog) = change?;
331 let mut changed_slugs = BTreeSet::new();
332
333 for operation in changelog.changes {
334 let slug = match operation {
335 MapOperation::Add { key, .. } => key,
336 MapOperation::Remove { key } => key,
337 };
338 changed_slugs.insert(slug);
339 }
340
341 yield (cid, changed_slugs);
342 }
343 }
344 }
345
346 pub fn content_change_stream<'b>(
350 &'b self,
351 since: Option<&'b Link<MemoIpld>>,
352 ) -> impl Stream<Item = Result<(Link<MemoIpld>, BTreeSet<String>)>> + 'b {
353 try_stream! {
354 let sphere = self.has_sphere_context.to_sphere().await?;
355 let since = since.cloned();
356 let stream = sphere.into_content_changelog_stream(since.as_ref());
357
358 for await change in stream {
359 let (cid, changelog) = change?;
360 let mut changed_slugs = BTreeSet::new();
361
362 for operation in changelog.changes {
363 let slug = match operation {
364 MapOperation::Add { key, .. } => key,
365 MapOperation::Remove { key } => key,
366 };
367 changed_slugs.insert(slug);
368 }
369
370 yield (cid, changed_slugs);
371 }
372 }
373 }
374
375 pub async fn list_slugs(&self) -> Result<BTreeSet<String>> {
385 let sphere_identity = self.has_sphere_context.identity().await?;
386 let link_stream = self.content_stream();
387
388 tokio::pin!(link_stream);
389
390 Ok(link_stream
391 .fold(BTreeSet::new(), |mut links, another_link| {
392 match another_link {
393 Ok((slug, _)) => {
394 links.insert(slug);
395 }
396 Err(error) => {
397 warn!("Could not read a link from {}: {}", sphere_identity, error)
398 }
399 };
400 links
401 })
402 .await)
403 }
404
405 pub async fn content_changes(
419 &self,
420 since: Option<&Link<MemoIpld>>,
421 ) -> Result<BTreeSet<String>> {
422 let sphere_identity = self.has_sphere_context.identity().await?;
423 let change_stream = self.content_change_stream(since);
424
425 tokio::pin!(change_stream);
426
427 Ok(change_stream
428 .fold(BTreeSet::new(), |mut all, some| {
429 match some {
430 Ok((_, mut changes)) => all.append(&mut changes),
431 Err(error) => warn!(
432 "Could not read some changes from {}: {}",
433 sphere_identity, error
434 ),
435 };
436 all
437 })
438 .await)
439 }
440}
441
#[cfg(test)]
mod tests {
    use anyhow::Result;
    use std::collections::BTreeSet;

    use noosphere_core::data::{ContentType, Did};
    use tokio::io::AsyncReadExt;
    use tokio_stream::StreamExt;

    #[cfg(target_arch = "wasm32")]
    use wasm_bindgen_test::wasm_bindgen_test;

    #[cfg(target_arch = "wasm32")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use super::SphereWalker;
    use crate::helpers::{simulated_sphere_context, SimulationAccess};
    use crate::{HasMutableSphereContext, SphereAuthorityWrite, SphereContentWrite, SphereCursor};

    /// A walker built from a cursor and one built from the underlying
    /// context must list the same slugs.
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    async fn it_can_be_initialized_with_a_context_or_a_cursor() {
        let (sphere_context, _) = simulated_sphere_context(SimulationAccess::ReadWrite, None)
            .await
            .unwrap();
        let mut cursor = SphereCursor::latest(sphere_context.clone());

        // Every inner list is written and then saved on its own; some slugs
        // repeat, leaving five distinct slugs overall.
        let revisions = vec![
            vec!["dogs", "birds"],
            vec!["cats", "dogs"],
            vec!["birds"],
            vec!["cows", "beetles"],
        ];

        for revision in revisions {
            for slug in revision {
                cursor
                    .write(slug, &ContentType::Subtext, b"are cool".as_ref(), None)
                    .await
                    .unwrap();
            }

            cursor.save(None).await.unwrap();
        }

        let slugs_from_cursor = SphereWalker::from(&cursor).list_slugs().await.unwrap();
        let slugs_from_context = SphereWalker::from(&sphere_context)
            .list_slugs()
            .await
            .unwrap();

        assert_eq!(slugs_from_cursor.len(), 5);
        assert_eq!(slugs_from_cursor, slugs_from_context);
    }

    /// Listing slugs reflects both the written content and a later removal.
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    async fn it_can_list_all_slugs_currently_in_a_sphere() {
        let (sphere_context, _) = simulated_sphere_context(SimulationAccess::ReadWrite, None)
            .await
            .unwrap();
        let mut cursor = SphereCursor::latest(sphere_context);

        let revisions = vec![
            vec!["dogs", "birds"],
            vec!["cats", "dogs"],
            vec!["birds"],
            vec!["cows", "beetles"],
        ];

        for revision in revisions {
            for slug in revision {
                cursor
                    .write(slug, &ContentType::Subtext, b"are cool".as_ref(), None)
                    .await
                    .unwrap();
            }

            cursor.save(None).await.unwrap();
        }

        // The walker borrows a clone so that `cursor` stays free to be
        // mutated below while the walker is still alive.
        let cursor_for_walker = cursor.clone();
        let walker = SphereWalker::from(&cursor_for_walker);

        let slugs_before_removal = walker.list_slugs().await.unwrap();
        assert_eq!(slugs_before_removal.len(), 5);

        cursor.remove("dogs").await.unwrap();
        cursor.save(None).await.unwrap();

        // The same walker is reused and is expected to observe the removal.
        let slugs_after_removal = walker.list_slugs().await.unwrap();
        assert_eq!(slugs_after_removal.len(), 4);
    }

    /// Listing authorizations returns every added authorization plus one
    /// more (presumably the one the simulated sphere starts with).
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    async fn it_can_list_all_authorizations_currently_in_a_sphere() -> Result<()> {
        let (sphere_context, _) =
            simulated_sphere_context(SimulationAccess::ReadWrite, None).await?;

        let mut cursor = SphereCursor::latest(sphere_context);
        let authorizations_to_add = 10;

        for index in 0..authorizations_to_add {
            let name = format!("foo{}", index);
            let did = Did(format!("did:key:foo{}", index));

            cursor.authorize(&name, &did).await?;
        }

        cursor.save(None).await?;

        let walker = SphereWalker::from(&cursor);
        let listed = walker.list_authorizations().await?;

        assert_eq!(listed.len(), authorizations_to_add + 1);

        Ok(())
    }

    /// Streaming the content yields every slug together with the exact
    /// bytes that were written for it.
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    async fn it_can_stream_the_whole_index() {
        let (sphere_context, _) = simulated_sphere_context(SimulationAccess::ReadWrite, None)
            .await
            .unwrap();
        let mut cursor = SphereCursor::latest(sphere_context);

        let expected = BTreeSet::<(String, String)>::from([
            ("cats".into(), "Cats are awesome".into()),
            ("dogs".into(), "Dogs are pretty cool".into()),
            ("birds".into(), "Birds rights".into()),
            ("mice".into(), "Mice like cookies".into()),
        ]);

        // One save per entry, so every slug lands in its own revision.
        for (slug, body) in &expected {
            cursor
                .write(slug.as_str(), &ContentType::Subtext, body.as_ref(), None)
                .await
                .unwrap();

            cursor.save(None).await.unwrap();
        }

        let walker = SphereWalker::from(&cursor);
        let files = walker.content_stream();

        tokio::pin!(files);

        let mut actual = BTreeSet::new();

        // The loop stops at the first error; a truncated result is then
        // caught by the equality assertion below.
        while let Some(Ok((slug, mut file))) = files.next().await {
            let mut body = String::new();
            file.contents.read_to_string(&mut body).await.unwrap();
            actual.insert((slug, body));
        }

        assert_eq!(expected, actual);
    }
}