1use super::PublicNodeSerializable;
4use crate::{
5 error::FsError,
6 public::{PublicDirectory, PublicFile},
7 traits::Id,
8};
9use anyhow::{Result, bail};
10use async_once_cell::OnceCell;
11use chrono::{DateTime, Utc};
12use std::{cmp::Ordering, collections::BTreeSet};
13use wnfs_common::{BlockStore, Cid, Storable, utils::Arc};
14
/// A node that is either a [`PublicFile`] or a [`PublicDirectory`].
///
/// Each variant keeps its payload behind an `Arc`, so cloning a `PublicNode`
/// clones the shared pointer rather than the file or directory itself.
#[derive(Debug, Clone)]
pub enum PublicNode {
    /// A file node.
    File(Arc<PublicFile>),
    /// A directory node.
    Dir(Arc<PublicDirectory>),
}
37
38impl PublicNode {
43 pub fn upsert_mtime(&mut self, time: DateTime<Utc>) {
67 match self {
68 Self::File(file) => {
69 Arc::make_mut(file).metadata.upsert_mtime(time);
70 }
71 Self::Dir(dir) => {
72 Arc::make_mut(dir).metadata.upsert_mtime(time);
73 }
74 }
75 }
76
77 pub fn update_previous(&self, cids: Vec<Cid>) -> Self {
100 match self {
101 Self::File(file) => {
102 let mut file = (**file).clone();
103 file.previous = cids.into_iter().collect();
104 Self::File(Arc::new(file))
105 }
106 Self::Dir(dir) => {
107 let mut dir = (**dir).clone();
108 dir.previous = cids.into_iter().collect();
109 Self::Dir(Arc::new(dir))
110 }
111 }
112 }
113
114 pub fn get_previous(&self) -> &BTreeSet<Cid> {
133 match self {
134 Self::File(file) => file.get_previous(),
135 Self::Dir(dir) => dir.get_previous(),
136 }
137 }
138
139 pub fn as_dir(&self) -> Result<Arc<PublicDirectory>> {
154 Ok(match self {
155 Self::Dir(dir) => Arc::clone(dir),
156 _ => bail!(FsError::NotADirectory),
157 })
158 }
159
160 pub(crate) fn as_dir_mut(&mut self) -> Result<&mut Arc<PublicDirectory>> {
162 Ok(match self {
163 Self::Dir(dir) => dir,
164 _ => bail!(FsError::NotADirectory),
165 })
166 }
167
168 pub fn as_file(&self) -> Result<Arc<PublicFile>> {
183 Ok(match self {
184 Self::File(file) => Arc::clone(file),
185 _ => bail!(FsError::NotAFile),
186 })
187 }
188
189 pub fn as_file_mut(&mut self) -> Result<&mut Arc<PublicFile>> {
191 match self {
192 Self::File(file) => Ok(file),
193 _ => bail!(FsError::NotAFile),
194 }
195 }
196
197 pub fn is_dir(&self) -> bool {
211 matches!(self, Self::Dir(_))
212 }
213
214 pub fn is_file(&self) -> bool {
228 matches!(self, Self::File(_))
229 }
230
/// Compares this node with `other` by causal history, i.e. by following
/// `previous` links.
///
/// Both nodes are first stored in `store` to obtain their CIDs (the
/// "roots"). The result is:
///
/// - `Some(Ordering::Equal)` if both roots are the same CID,
/// - `Some(Ordering::Less)` if this node's root shows up in `other`'s history,
/// - `Some(Ordering::Greater)` if `other`'s root shows up in this node's history,
/// - `None` if the search ends without either root being found.
///
/// The search walks both histories in lock-step: each loop iteration checks
/// the current "frontier" of `previous` CIDs on both sides and then advances
/// both frontiers by one generation.
///
/// # Errors
///
/// Propagates any error from storing either node or from loading an
/// ancestor node out of `store`.
pub async fn causal_compare(
    &self,
    other: &Self,
    store: &impl BlockStore,
) -> Result<Option<Ordering>> {
    /// Advances a frontier by one generation.
    ///
    /// Loads every node named in `previous_set` and gathers their `previous`
    /// CIDs. A CID is kept only if inserting it into `visited_cids` reports
    /// it as new, so the returned set holds CIDs seen for the first time and
    /// all of them are recorded as visited.
    async fn next_previous_set(
        previous_set: BTreeSet<Cid>,
        visited_cids: &mut BTreeSet<Cid>,
        store: &impl BlockStore,
    ) -> Result<BTreeSet<Cid>> {
        let mut previous = BTreeSet::new();

        for cid in previous_set {
            let node = PublicNode::load(&cid, store).await?;
            previous.extend(
                node.get_previous()
                    .iter()
                    // `insert` returns `true` only for CIDs not visited before.
                    .filter(|cid| visited_cids.insert(**cid))
                    .cloned(),
            );
        }

        Ok(previous)
    }

    // Storing yields the CID that identifies each node.
    let our_root = self.store(store).await?;
    let other_root = other.store(store).await?;

    if our_root == other_root {
        return Ok(Some(Ordering::Equal));
    }

    // The initial frontiers are the direct predecessors of each node.
    let mut our_previous_set = self.get_previous().clone();
    let mut other_previous_set = other.get_previous().clone();

    let mut our_visited = BTreeSet::new();
    let mut other_visited = BTreeSet::new();

    loop {
        // Our root appears in the other's history: we come before it.
        if other_previous_set.contains(&our_root) {
            return Ok(Some(Ordering::Less));
        }

        // The other's root appears in our history: we come after it.
        if our_previous_set.contains(&other_root) {
            return Ok(Some(Ordering::Greater));
        }

        // Early exit: a non-empty frontier that lies entirely inside the set
        // of CIDs the other side has already visited yields `None`.
        // NOTE(review): presumably because such a frontier can no longer
        // lead to the other root — confirm.
        let our_is_true_subset =
            !our_previous_set.is_empty() && our_previous_set.is_subset(&other_visited);
        let other_is_true_subset =
            !other_previous_set.is_empty() && other_previous_set.is_subset(&our_visited);
        if our_is_true_subset || other_is_true_subset {
            return Ok(None);
        }

        // Step both frontiers one generation further back.
        our_previous_set = next_previous_set(our_previous_set, &mut our_visited, store).await?;
        other_previous_set =
            next_previous_set(other_previous_set, &mut other_visited, store).await?;

        // Both histories are exhausted without finding either root.
        if our_previous_set.is_empty() && other_previous_set.is_empty() {
            return Ok(None);
        }
    }
}
308}
309
310impl Id for PublicNode {
311 fn get_id(&self) -> String {
312 match self {
313 PublicNode::File(file) => file.get_id(),
314 PublicNode::Dir(dir) => dir.get_id(),
315 }
316 }
317}
318
319impl PartialEq for PublicNode {
320 fn eq(&self, other: &PublicNode) -> bool {
321 match (self, other) {
322 (Self::File(self_file), Self::File(other_file)) => {
323 Arc::ptr_eq(self_file, other_file) || self_file == other_file
324 }
325 (Self::Dir(self_dir), Self::Dir(other_dir)) => {
326 Arc::ptr_eq(self_dir, other_dir) || self_dir == other_dir
327 }
328 _ => false,
329 }
330 }
331}
332
333impl From<PublicFile> for PublicNode {
334 fn from(file: PublicFile) -> Self {
335 Self::File(Arc::new(file))
336 }
337}
338
339impl From<PublicDirectory> for PublicNode {
340 fn from(dir: PublicDirectory) -> Self {
341 Self::Dir(Arc::new(dir))
342 }
343}
344
345impl Storable for PublicNode {
346 type Serializable = PublicNodeSerializable;
347
348 async fn to_serializable(&self, store: &impl BlockStore) -> Result<Self::Serializable> {
349 Ok(match self {
350 Self::File(file) => file.to_serializable(store).await?,
351 Self::Dir(dir) => dir.to_serializable(store).await?,
352 })
353 }
354
355 async fn from_serializable(
356 cid: Option<&Cid>,
357 serializable: Self::Serializable,
358 ) -> Result<Self> {
359 Ok(match serializable {
361 PublicNodeSerializable::File(file) => Self::File(Arc::new(
362 PublicFile::from_serializable(cid, PublicNodeSerializable::File(file)).await?,
363 )),
364 PublicNodeSerializable::Dir(dir) => Self::Dir(Arc::new(
365 PublicDirectory::from_serializable(cid, PublicNodeSerializable::Dir(dir)).await?,
366 )),
367 })
368 }
369
370 fn persisted_as(&self) -> Option<&OnceCell<Cid>> {
371 match self {
372 PublicNode::File(file) => file.as_ref().persisted_as(),
373 PublicNode::Dir(dir) => dir.as_ref().persisted_as(),
374 }
375 }
376}
377
#[cfg(test)]
mod tests {
    use crate::public::{PublicDirectory, PublicFile, PublicNode};
    use chrono::Utc;
    use testresult::TestResult;
    use wnfs_common::{MemoryBlockStore, Storable};

    // Storing a file node and a directory node and loading them back by CID
    // must give nodes equal to the ones that were stored.
    #[async_std::test]
    async fn serialized_public_node_can_be_deserialized() -> TestResult {
        let store = &MemoryBlockStore::new();
        let dir_node = PublicNode::from(PublicDirectory::new(Utc::now()));
        let fresh_file = PublicNode::from(PublicFile::new(Utc::now()));

        // The file node goes through one store/load round-trip before the
        // actual check.
        // NOTE(review): presumably so the in-memory node is in the same
        // state as a loaded one — confirm.
        let first_file_cid = fresh_file.store(store).await?;
        let file_node = PublicNode::load(&first_file_cid, store).await?;

        let dir_cid = dir_node.store(store).await?;
        let file_cid = file_node.store(store).await?;

        let reloaded_file = PublicNode::load(&file_cid, store).await?;
        let reloaded_dir = PublicNode::load(&dir_cid, store).await?;

        assert_eq!(reloaded_file, file_node);
        assert_eq!(reloaded_dir, dir_node);

        Ok(())
    }
}
410
411#[cfg(test)]
412mod proptests {
413 use super::*;
414 use futures::{StreamExt, TryStreamExt, stream};
415 use proptest::{collection::vec, prelude::*};
416 use test_strategy::proptest;
417 use wnfs_common::MemoryBlockStore;
418
/// One step that `State::run` applies to the set of heads.
#[derive(Debug, Clone, Copy)]
enum Operation {
    /// Store the head selected by the index (taken modulo the number of
    /// heads) and prepare its next revision.
    Write(usize),
    /// Replace all heads with a single new directory whose `previous` set
    /// holds the CIDs of the stored heads.
    Merge,
    /// Clone the head selected by the index (taken modulo the number of
    /// heads), give the clone a new mtime, and append it as another head.
    Fork(usize),
}
425
/// Test fixture that evolves a list of directory heads by applying
/// `Operation`s.
#[derive(Debug, Clone)]
struct State {
    /// The current heads; `head_node` reports the one at index 0.
    heads: Vec<Arc<PublicDirectory>>,
    /// Incremented on every fork and used as the timestamp for the fork's
    /// mtime.
    fork_num: i64,
}
431
432 impl State {
433 pub fn new(init_time: i64) -> Self {
434 Self {
435 heads: vec![Arc::new(PublicDirectory::new(Self::time(init_time)))],
436 fork_num: 0,
437 }
438 }
439
440 fn time(n: i64) -> DateTime<Utc> {
441 DateTime::<Utc>::from_timestamp(n, 0).unwrap()
442 }
443
444 pub fn get_head(&self, n: usize) -> &Arc<PublicDirectory> {
445 let len = self.heads.len();
446 debug_assert!(len > 0);
447 &self.heads[n % len] }
449
450 pub fn get_head_mut(&mut self, n: usize) -> &mut Arc<PublicDirectory> {
451 let len = self.heads.len();
452 debug_assert!(len > 0);
453 &mut self.heads[n % len] }
455
456 pub async fn run(&mut self, op: &Operation, store: &impl BlockStore) -> Result<()> {
457 match op {
458 Operation::Write(n) => {
459 let head = self.get_head_mut(*n);
460 head.store(store).await?;
461 head.prepare_next_revision();
462 }
463 Operation::Merge => {
464 let head_cids = stream::iter(self.heads.iter())
465 .then(|head| head.store(store))
466 .try_collect::<BTreeSet<_>>()
467 .await?;
468 let mut dir = PublicDirectory::new(Self::time(0));
469 dir.previous = head_cids;
470 self.heads = vec![Arc::new(dir)];
471 }
472 Operation::Fork(n) => {
473 let mut head = (**self.get_head(*n)).clone();
474 self.fork_num += 1;
475 head.metadata.upsert_mtime(Self::time(self.fork_num));
477 self.heads.push(Arc::new(head));
478 }
479 }
480 Ok(())
481 }
482
483 pub async fn run_all(
484 &mut self,
485 ops: impl IntoIterator<Item = Operation>,
486 store: &impl BlockStore,
487 ) -> Result<()> {
488 for op in ops {
489 self.run(&op, store).await?;
490 }
491 Ok(())
492 }
493
494 pub fn head_node(&self) -> PublicNode {
495 debug_assert!(!self.heads.is_empty());
496 PublicNode::Dir(Arc::clone(&self.heads[0]))
497 }
498 }
499
500 fn op() -> impl Strategy<Value = Operation> {
501 (0..=2, 0..16).prop_map(|(op, idx)| match op {
502 0 => Operation::Write(idx as usize),
503 1 => Operation::Merge,
504 2 => Operation::Fork(idx as usize),
505 _ => unreachable!(
506 "This case should be impossible. Values generated are only 0, 1, and 2"
507 ),
508 })
509 }
510
511 async fn run_ops(
512 init_time: i64,
513 operations: impl IntoIterator<Item = Operation>,
514 store: &impl BlockStore,
515 ) -> Result<PublicNode> {
516 let mut state = State::new(init_time);
517 state.run_all(operations, store).await?;
518 Ok(state.head_node())
519 }
520
521 #[proptest]
522 fn test_reflexivity(#[strategy(vec(op(), 0..100))] operations: Vec<Operation>) {
523 async_std::task::block_on(async move {
524 let mut state = State::new(0);
525 let store = &MemoryBlockStore::new();
526
527 state.run_all(operations, store).await.unwrap();
528 let head_one = state.head_node();
529 let head_two = state.head_node();
530
531 prop_assert_eq!(
532 head_one.causal_compare(&head_two, store).await.unwrap(),
533 Some(Ordering::Equal)
534 );
535
536 Ok(())
537 })?;
538 }
539
540 #[proptest(cases = 256, max_global_rejects = 10_000)]
541 fn test_asymmetry(
542 #[strategy(vec(op(), 0..30))] operations_one: Vec<Operation>,
543 #[strategy(vec(op(), 0..30))] operations_two: Vec<Operation>,
544 ) {
545 async_std::task::block_on(async move {
546 let store = &MemoryBlockStore::new();
547 let node_one = run_ops(0, operations_one, store).await.unwrap();
548 let node_two = run_ops(0, operations_two, store).await.unwrap();
549
550 let Some(cmp) = node_one.causal_compare(&node_two, store).await.unwrap() else {
551 return Err(TestCaseError::reject("not testing causally incomparable"));
552 };
553
554 let Some(cmp_rev) = node_two.causal_compare(&node_one, store).await.unwrap() else {
555 return Err(TestCaseError::fail(
556 "causally comparable one way, but not the other",
557 ));
558 };
559
560 prop_assert_eq!(cmp.reverse(), cmp_rev);
561
562 Ok(())
563 })?;
564 }
565
566 #[proptest(cases = 100, max_global_rejects = 10_000)]
567 fn test_transitivity(
568 #[strategy(vec(op(), 0..20))] operations0: Vec<Operation>,
569 #[strategy(vec(op(), 0..20))] operations1: Vec<Operation>,
570 #[strategy(vec(op(), 0..20))] operations2: Vec<Operation>,
571 ) {
572 async_std::task::block_on(async move {
573 let store = &MemoryBlockStore::new();
574 let node0 = run_ops(0, operations0, store).await.unwrap();
575 let node1 = run_ops(0, operations1, store).await.unwrap();
576 let node2 = run_ops(0, operations2, store).await.unwrap();
577
578 let Some(cmp_0_1) = node0.causal_compare(&node1, store).await.unwrap() else {
579 return Err(TestCaseError::reject("not testing causally incomparable"));
580 };
581
582 let Some(cmp_1_2) = node1.causal_compare(&node2, store).await.unwrap() else {
583 return Err(TestCaseError::reject("not testing causally incomparable"));
584 };
585
586 let Some(cmp_0_2) = node0.causal_compare(&node2, store).await.unwrap() else {
587 return Err(TestCaseError::reject("not testing causally incomparable"));
588 };
589
590 match (cmp_0_1, cmp_1_2) {
591 (Ordering::Equal, Ordering::Equal) => prop_assert_eq!(cmp_0_2, Ordering::Equal),
592 (Ordering::Less, Ordering::Less) => prop_assert_eq!(cmp_0_2, Ordering::Less),
593 (Ordering::Less, Ordering::Equal) => prop_assert_eq!(cmp_0_2, Ordering::Less),
594 (Ordering::Equal, Ordering::Less) => prop_assert_eq!(cmp_0_2, Ordering::Less),
595 (Ordering::Equal, Ordering::Greater) => prop_assert_eq!(cmp_0_2, Ordering::Greater),
596 (Ordering::Greater, Ordering::Equal) => prop_assert_eq!(cmp_0_2, Ordering::Greater),
597 (Ordering::Greater, Ordering::Greater) => {
598 prop_assert_eq!(cmp_0_2, Ordering::Greater)
599 }
600 (Ordering::Less, Ordering::Greater) => {
601 return Err(TestCaseError::reject(
602 "a < b and b > c, there's no transitivity to test here",
603 ));
604 }
605 (Ordering::Greater, Ordering::Less) => {
606 return Err(TestCaseError::reject(
607 "a > b and b < c, there's no transitivity to test here",
608 ));
609 }
610 }
611
612 Ok(())
613 })?;
614 }
615
616 #[proptest]
617 fn test_different_roots_incomparable(
618 #[strategy(vec(op(), 0..100))] operations0: Vec<Operation>,
619 #[strategy(vec(op(), 0..100))] operations1: Vec<Operation>,
620 ) {
621 async_std::task::block_on(async move {
622 let store = &MemoryBlockStore::new();
623 let node0 = run_ops(0, operations0, store).await.unwrap();
624 let node1 = run_ops(1, operations1, store).await.unwrap();
625
626 prop_assert_eq!(node0.causal_compare(&node1, store).await.unwrap(), None);
627 prop_assert_eq!(node1.causal_compare(&node0, store).await.unwrap(), None);
628 Ok(())
629 })?;
630 }
631
632 #[proptest]
633 fn test_ops_after_merge_makes_greater(
634 #[strategy(vec(op(), 0..100))] operations: Vec<Operation>,
635 #[strategy(vec(op(), 0..100))] more_ops: Vec<Operation>,
636 ) {
637 async_std::task::block_on(async move {
638 let mut state = State::new(0);
639 let store = &MemoryBlockStore::new();
640
641 state.run_all(operations, store).await.unwrap();
642 let head_one = state.head_node();
643 state.run(&Operation::Merge, store).await.unwrap();
644 state.run_all(more_ops, store).await.unwrap();
645 let head_two = state.head_node();
646
647 prop_assert_eq!(
648 head_one.causal_compare(&head_two, store).await.unwrap(),
649 Some(Ordering::Less)
650 );
651 prop_assert_eq!(
652 head_two.causal_compare(&head_one, store).await.unwrap(),
653 Some(Ordering::Greater)
654 );
655
656 Ok(())
657 })?;
658 }
659}
660
661#[cfg(test)]
662mod snapshot_tests {
663 use super::*;
664 use chrono::TimeZone;
665 use wnfs_common::utils::SnapshotBlockStore;
666
// Snapshot test: stores a new directory node and a new file node, both
// created at 1970-01-01 00:00:00 UTC, and snapshots the blocks recorded
// under their CIDs.
#[async_std::test]
async fn public_file_and_directory_nodes() {
    let store = &SnapshotBlockStore::default();
    // A fixed timestamp is used for both nodes.
    let time = Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap();

    let dir_node: PublicNode = PublicDirectory::new(time).into();
    let file_node: PublicNode = PublicFile::new(time).into();

    let dir_cid = dir_node.store(store).await.unwrap();
    let file_cid = file_node.store(store).await.unwrap();

    let dir = store.get_block_snapshot(&dir_cid).await.unwrap();
    let file = store.get_block_snapshot(&file_cid).await.unwrap();

    // NOTE(review): these snapshots are unnamed, so insta presumably keys
    // them by test name and call order — keep the two assertions in this
    // order.
    insta::assert_json_snapshot!(dir);
    insta::assert_json_snapshot!(file);
}
684
// Snapshot test: writes the same content at three paths of increasing depth
// under one root directory and snapshots what `get_dag_snapshot` returns for
// the root's CID.
#[async_std::test]
async fn public_fs() {
    let store = &SnapshotBlockStore::default();
    // A fixed timestamp is used for the root and for every write.
    let time = Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap();

    // One path with a single segment, one with two, and one with three.
    let paths = [
        vec!["text.txt".into()],
        vec!["music".into(), "jazz".into()],
        vec!["videos".into(), "movies".into(), "anime".into()],
    ];

    let root_dir = &mut PublicDirectory::new_rc(time);
    // The empty root is stored once up front; the returned CID is discarded.
    // NOTE(review): presumably this makes the later revision record the empty
    // root as its predecessor — confirm.
    let _ = root_dir.store(store).await.unwrap();

    for path in paths.iter() {
        root_dir
            .write(path, b"Hello, World!".to_vec(), time, store)
            .await
            .unwrap();
    }

    let cid = root_dir.store(store).await.unwrap();

    // NOTE(review): presumably covers every block reachable from `cid` —
    // verify against `SnapshotBlockStore::get_dag_snapshot`.
    let values = store.get_dag_snapshot(cid).await.unwrap();
    insta::assert_json_snapshot!(values)
}
711}