1mod block;
5mod car;
6mod ledger;
7mod memo;
8mod walk;
9
10pub use block::*;
11pub use car::*;
12pub use ledger::*;
13pub use memo::*;
14pub use walk::*;
15
16#[cfg(test)]
17mod tests {
18 use anyhow::Result;
19 use cid::Cid;
20 use libipld_core::{codec::Codec, ipld::Ipld, raw::RawCodec};
21 use std::collections::BTreeSet;
22 use ucan::{crypto::KeyMaterial, store::UcanJwtStore};
23
24 use crate::{
25 authority::{generate_ed25519_key, Access},
26 context::{
27 HasMutableSphereContext, HasSphereContext, SphereAuthorityWrite, SphereContentRead,
28 SphereContentWrite, SpherePetnameWrite,
29 },
30 data::{BodyChunkIpld, ContentType, Link, LinkRecord, MemoIpld},
31 helpers::{
32 make_valid_link_record, simulated_sphere_context, touch_all_sphere_blocks,
33 SimulatedHasMutableSphereContext,
34 },
35 stream::{from_car_stream, memo_body_stream, memo_history_stream, to_car_stream},
36 tracing::initialize_tracing,
37 view::{BodyChunkDecoder, Sphere},
38 };
39 use libipld_cbor::DagCborCodec;
40 use noosphere_storage::{BlockStore, MemoryStore, UcanStore};
41 use tokio_stream::StreamExt;
42
43 #[cfg(target_arch = "wasm32")]
44 use wasm_bindgen_test::wasm_bindgen_test;
45
46 #[cfg(target_arch = "wasm32")]
47 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
48
49 pub const SCAFFOLD_CHANGES: &[(&[&str], &[&str])] = &[
50 (&["dogs", "birds"], &["alice", "bob"]),
51 (&["cats", "dogs"], &["gordon"]),
52 (&["birds"], &["cdata"]),
53 (&["cows", "beetles"], &["jordan", "ben"]),
54 ];
55
56 pub async fn scaffold_sphere_context_with_history(
57 ) -> Result<(SimulatedHasMutableSphereContext, Vec<Link<MemoIpld>>)> {
58 let (mut sphere_context, _) = simulated_sphere_context(Access::ReadWrite, None).await?;
59 let mut versions = Vec::new();
60 let store = sphere_context.sphere_context().await?.db().clone();
61
62 for (content_change, petname_change) in SCAFFOLD_CHANGES.iter() {
63 for slug in *content_change {
64 sphere_context
65 .write(
66 slug,
67 &ContentType::Subtext,
68 format!("{} are cool", slug).as_bytes(),
69 None,
70 )
71 .await?;
72 }
73
74 for petname in *petname_change {
75 let (id, record, _) = make_valid_link_record(&mut UcanStore(store.clone())).await?;
76 sphere_context.set_petname(petname, Some(id)).await?;
77 versions.push(sphere_context.save(None).await?);
78 sphere_context.set_petname_record(petname, &record).await?;
79 }
80
81 versions.push(sphere_context.save(None).await?);
82 }
83
84 let additional_device_credential = generate_ed25519_key();
85 let additional_device_did = additional_device_credential.get_did().await?.into();
86 let additional_device_authorization = sphere_context
87 .authorize("otherdevice", &additional_device_did)
88 .await?;
89
90 sphere_context.save(None).await?;
91
92 sphere_context
93 .revoke_authorization(&additional_device_authorization)
94 .await?;
95
96 sphere_context.save(None).await?;
97
98 Ok((sphere_context, versions))
99 }
100
101 #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
102 #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
103 async fn it_includes_all_link_records_and_proofs_from_the_address_book() -> Result<()> {
104 initialize_tracing(None);
105
106 let (mut sphere_context, _) = simulated_sphere_context(Access::ReadWrite, None).await?;
107 let mut db = sphere_context.sphere_context().await?.db().clone();
108
109 let (foo_did, foo_link_record, foo_link_record_link) =
110 make_valid_link_record(&mut db).await?;
111
112 sphere_context.set_petname("foo", Some(foo_did)).await?;
113 sphere_context.save(None).await?;
114 sphere_context
115 .set_petname_record("foo", &foo_link_record)
116 .await?;
117 let final_version = sphere_context.save(None).await?;
118
119 let mut other_store = MemoryStore::default();
120
121 let stream = memo_body_stream(
122 sphere_context.sphere_context().await?.db().clone(),
123 &final_version,
124 false,
125 );
126
127 tokio::pin!(stream);
128
129 while let Some((cid, block)) = stream.try_next().await? {
130 debug!("Received {cid}");
131 other_store.put_block(&cid, &block).await?;
132 }
133
134 let ucan_store = UcanStore(other_store);
135
136 let link_record =
137 LinkRecord::try_from(ucan_store.require_token(&foo_link_record_link).await?)?;
138
139 assert_eq!(link_record, foo_link_record);
140
141 link_record.collect_proofs(&ucan_store).await?;
142
143 Ok(())
144 }
145
146 #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
147 #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
148 async fn it_can_stream_all_blocks_in_a_sphere_version() -> Result<()> {
149 initialize_tracing(None);
150
151 let (sphere_context, _) = scaffold_sphere_context_with_history().await?;
152
153 let final_version = sphere_context.version().await?;
154
155 let mut other_store = MemoryStore::default();
156
157 let mut received = BTreeSet::new();
158
159 let stream = memo_body_stream(
160 sphere_context.sphere_context().await?.db().clone(),
161 &final_version,
162 false,
163 );
164
165 tokio::pin!(stream);
166
167 while let Some((cid, block)) = stream.try_next().await? {
168 debug!("Received {cid}");
169 assert!(
170 !received.contains(&cid),
171 "Got {cid} but we already received it",
172 );
173 received.insert(cid);
174 other_store.put_block(&cid, &block).await?;
175 }
176
177 let sphere = Sphere::at(&final_version, &other_store);
178
179 let content = sphere.get_content().await?;
180 let identities = sphere.get_address_book().await?.get_identities().await?;
181
182 for (content_change, petname_change) in SCAFFOLD_CHANGES.iter() {
183 for slug in *content_change {
184 let _ = content.get(&slug.to_string()).await?.cloned().unwrap();
185 }
186
187 for petname in *petname_change {
188 let _ = identities.get(&petname.to_string()).await?;
189 }
190 }
191
192 touch_all_sphere_blocks(&sphere).await?;
193
194 Ok(())
195 }
196
197 #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
198 #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
199 async fn it_can_stream_all_delta_blocks_for_a_range_of_history() -> Result<()> {
200 initialize_tracing(None);
201
202 let (sphere_context, versions) = scaffold_sphere_context_with_history().await?;
203
204 let original_store = sphere_context.sphere_context().await?.db().clone();
205
206 let mut other_store = MemoryStore::default();
207
208 let first_version = versions.first().unwrap();
209 let stream = memo_body_stream(original_store.clone(), first_version, false);
210
211 tokio::pin!(stream);
212
213 while let Some((cid, block)) = stream.try_next().await? {
214 other_store.put_block(&cid, &block).await?;
215 }
216
217 let sphere = Sphere::at(first_version, &other_store);
218
219 touch_all_sphere_blocks(&sphere).await?;
220
221 for i in 1..=3 {
222 let version = versions.get(i).unwrap();
223 let sphere = Sphere::at(version, &other_store);
224
225 assert!(touch_all_sphere_blocks(&sphere).await.is_err());
226 }
227
228 let stream = memo_history_stream(
229 original_store,
230 versions.last().unwrap(),
231 Some(first_version),
232 false,
233 );
234
235 tokio::pin!(stream);
236
237 while let Some((cid, block)) = stream.try_next().await? {
238 other_store.put_block(&cid, &block).await?;
239 }
240
241 for i in 1..=3 {
242 let version = versions.get(i).unwrap();
243 let sphere = Sphere::at(version, &other_store);
244 sphere.hydrate().await?;
245
246 touch_all_sphere_blocks(&sphere).await?;
247 }
248
249 Ok(())
250 }
251
252 #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
253 #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
254 async fn it_can_stream_all_blocks_in_some_sphere_content() -> Result<()> {
255 initialize_tracing(None);
256
257 let (mut sphere_context, _) = simulated_sphere_context(Access::ReadWrite, None).await?;
258 let mut db = sphere_context.sphere_context().await?.db_mut().clone();
259
260 let chunks = [b"foo", b"bar", b"baz"];
261
262 let mut next_chunk_cid = None;
263
264 for bytes in chunks.iter().rev() {
265 next_chunk_cid = Some(
266 db.save::<DagCborCodec, _>(&BodyChunkIpld {
267 bytes: bytes.to_vec(),
268 next: next_chunk_cid,
269 })
270 .await?,
271 );
272 }
273
274 let content_cid = sphere_context
275 .link("foo", &ContentType::Bytes, &next_chunk_cid.unwrap(), None)
276 .await?;
277
278 let stream = memo_body_stream(
279 sphere_context.sphere_context().await?.db().clone(),
280 &content_cid,
281 false,
282 );
283
284 let mut store = MemoryStore::default();
285
286 tokio::pin!(stream);
287
288 while let Some((cid, block)) = stream.try_next().await? {
289 store.put_block(&cid, &block).await?;
290 }
291
292 let memo = store.load::<DagCborCodec, MemoIpld>(&content_cid).await?;
293
294 let mut buffer = Vec::new();
295 let body_stream = BodyChunkDecoder(&memo.body, &store).stream();
296
297 tokio::pin!(body_stream);
298
299 while let Some(bytes) = body_stream.try_next().await? {
300 buffer.append(&mut Vec::from(bytes));
301 }
302
303 assert_eq!(buffer.as_slice(), b"foobarbaz");
304
305 Ok(())
306 }
307
308 #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
309 #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
310 async fn it_can_stream_all_blocks_in_a_sphere_version_as_a_car() -> Result<()> {
311 initialize_tracing(None);
312
313 let (mut sphere_context, _) = scaffold_sphere_context_with_history().await?;
314
315 let mut db = sphere_context.sphere_context().await?.db().clone();
316 let (id, link_record, _) = make_valid_link_record(&mut db).await?;
317 sphere_context.set_petname("hasrecord", Some(id)).await?;
318 sphere_context.save(None).await?;
319 sphere_context
320 .set_petname_record("hasrecord", &link_record)
321 .await?;
322 sphere_context.save(None).await?;
323
324 let final_version = sphere_context.version().await?;
325
326 let mut other_store = MemoryStore::default();
327
328 let stream = to_car_stream(
329 vec![final_version.into()],
330 memo_body_stream(db.clone(), &final_version, false),
331 );
332
333 let block_stream = from_car_stream(stream);
334
335 let mut received = BTreeSet::new();
336 tokio::pin!(block_stream);
337
338 while let Some((cid, block)) = block_stream.try_next().await? {
339 debug!("Received {cid}");
340 assert!(
341 !received.contains(&cid),
342 "Got {cid} but we already received it",
343 );
344 received.insert(cid);
345 other_store.put_block(&cid, &block).await?;
346 }
347
348 let sphere = Sphere::at(&final_version, &other_store);
349
350 let content = sphere.get_content().await?;
351 let identities = sphere.get_address_book().await?.get_identities().await?;
352
353 for (content_change, petname_change) in SCAFFOLD_CHANGES.iter() {
354 for slug in *content_change {
355 let _ = content.get(&slug.to_string()).await?.cloned().unwrap();
356 }
357
358 for petname in *petname_change {
359 let _ = identities.get(&petname.to_string()).await?;
360 }
361 }
362
363 let has_record = identities.get(&"hasrecord".into()).await?.unwrap();
364 let has_record_version = has_record.link_record(&UcanStore(other_store)).await;
365
366 assert!(
367 has_record_version.is_some(),
368 "We got a resolved link record from the stream"
369 );
370
371 touch_all_sphere_blocks(&sphere).await?;
372
373 Ok(())
374 }
375
376 #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
377 #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
378 async fn it_only_omits_memo_parent_references_when_streaming_sphere_body_with_content(
379 ) -> Result<()> {
380 initialize_tracing(None);
381
382 let (sphere_context, mut versions) = scaffold_sphere_context_with_history().await?;
383
384 debug!(
385 "Versions: {:#?}",
386 versions
387 .iter()
388 .map(|cid| cid.to_string())
389 .collect::<Vec<String>>()
390 );
391
392 let store = sphere_context.lock().await.db().clone();
393
394 let last_version = versions.pop().unwrap();
395 let last_version_parent = versions.pop().unwrap();
396
397 let mut links_referenced = BTreeSet::new();
398 let mut links_included = BTreeSet::new();
399
400 links_referenced.insert(*last_version);
402
403 let stream = memo_body_stream(store.clone(), &last_version, true);
404
405 tokio::pin!(stream);
406
407 while let Some((cid, block)) = stream.try_next().await? {
408 if cid == *last_version {
409 let memo = store.load::<DagCborCodec, MemoIpld>(&cid).await?;
411 assert_eq!(memo.parent, Some(last_version_parent));
412
413 let codec = DagCborCodec;
414 let mut root_references = BTreeSet::new();
415 codec.references::<Ipld, BTreeSet<Cid>>(&block, &mut root_references)?;
416
417 assert!(root_references.contains(&last_version_parent));
418 }
419
420 links_included.insert(cid);
421
422 match cid.codec() {
423 codec if codec == u64::from(DagCborCodec) => {
424 let codec = DagCborCodec;
425 codec.references::<Ipld, BTreeSet<Cid>>(&block, &mut links_referenced)?;
426 }
427 codec if codec == u64::from(RawCodec) => {
428 let codec = DagCborCodec;
429 codec.references::<Ipld, BTreeSet<Cid>>(&block, &mut links_referenced)?;
430 }
431 _ => {
432 unreachable!("No other codecs are used in our DAGs");
433 }
434 }
435 }
436
437 assert!(
438 !links_included.contains(&last_version_parent),
439 "Parent version should not be included"
440 );
441
442 let difference = links_referenced
443 .difference(&links_included)
444 .collect::<Vec<&Cid>>();
445
446 debug!(
447 "Difference: {:#?}",
448 difference
449 .iter()
450 .map(|cid| cid.to_string())
451 .collect::<Vec<String>>()
452 );
453
454 let last_dogs_version = sphere_context
457 .read("dogs")
458 .await?
459 .unwrap()
460 .memo
461 .parent
462 .unwrap();
463 let last_birds_version = sphere_context
464 .read("birds")
465 .await?
466 .unwrap()
467 .memo
468 .parent
469 .unwrap();
470
471 let expected_difference: Vec<&Cid> = vec![
472 &last_version_parent,
473 &last_birds_version,
474 &last_dogs_version,
475 ];
476
477 assert_eq!(difference.len(), expected_difference.len());
478
479 for cid in expected_difference {
480 assert!(difference.contains(&cid));
481 }
482
483 Ok(())
484 }
485}