1use crate::{BlockLinks, BlockStorage, StorageError};
5use async_trait::async_trait;
6use cid::Cid;
7use futures::Stream;
8use std::collections::{BinaryHeap, HashMap, VecDeque};
9
/// Streams the differences between the DAG rooted at `prev` and the DAG rooted at `next`.
///
/// The new DAG is walked breadth-first from `next` (`stack` is used as a FIFO queue:
/// `pop_front` + `push_back`/`extend`). Each visited CID is matched against a pool of pending
/// references into the previous DAG (`prev_links`). The pool starts with a single `Deep`
/// reference to `prev` and is expanded one level at a time by `make_shallow` whenever a
/// lookup misses:
///
/// * matched a `Deep` / `DeepNotFound` reference: the same CID exists in the previous DAG, and
///   nothing below it is visited;
/// * matched a `Shallow` reference: the previous block's children are already pending in the
///   pool, so this block's children are queued to be matched against them;
/// * no match, even after up to `look_ahead_depth` extra expansions (default `1`): the block
///   is yielded as [`BlockDiff::Added`] and, if `follow.follow` allows it, its children are
///   queued.
///
/// When the walk is over, every reference still pending is yielded as [`BlockDiff::Removed`].
/// This happens once per remaining reference, so the same CID can be reported more than
/// once. For a remaining `Deep` reference, its descendants are also reported as removed when
/// `follow.follow_previous` allows it. Remaining `DeepNotFound` references are not reported.
///
/// # Errors
///
/// Errors from `follow`, from storage and from link decoding are propagated, except for:
/// * a `NotFound` while expanding a previous block, which is recorded as `DeepNotFound`;
/// * a `NotFound` while loading the children of an added block, which is still reported as
///   added;
/// * any error while enumerating the descendants of a removed block, which is skipped.
pub fn block_diff<S, F>(
    storage: S,
    prev: Option<Cid>,
    next: Cid,
    block_links: BlockLinks,
    look_ahead_depth: Option<u8>,
    mut follow: F,
) -> impl Stream<Item = Result<BlockDiff, StorageError>>
where
    S: BlockStorage + Clone + 'static,
    F: BlockDiffFollow + 'static,
{
    async_stream::try_stream! {
        // Blocks of the new DAG still to visit (processed FIFO, i.e. breadth-first).
        let mut stack = VecDeque::<Cid>::new();
        stack.push_back(next);

        // Pending references into the previous DAG, keyed by CID; one heap entry per reference.
        let mut prev_links: HashMap<Cid, BinaryHeap<ReferenceDepth>> = HashMap::new();
        if let Some(prev) = prev {
            prev_links.insert(prev, BinaryHeap::from([ReferenceDepth::Deep]));
        }

        while let Some(next) = stack.pop_front() {
            // Number of extra pool expansions allowed before `next` is declared added.
            let mut allow_resolve_next_levels = look_ahead_depth.unwrap_or(1);
            loop {
                match pop_reference(&mut prev_links, &next) {
                    Some(ReferenceDepth::Deep) | Some(ReferenceDepth::DeepNotFound) => {
                        // The same CID is present in the previous DAG: do not descend into it.
                    },
                    Some(ReferenceDepth::Shallow) => {
                        // The previous copy was expanded, so its children are pending in the
                        // pool; queue our children so they can be matched against them.
                        stack.extend(extract_links_children(&storage, &next, &block_links).await?);
                    },
                    None => {
                        // Expand the previous DAG by one more level. Note this runs before the
                        // budget check, so a final expansion happens even when no retry is left.
                        make_shallow(&storage, &block_links, &mut prev_links, &mut follow).await?;

                        if allow_resolve_next_levels > 0 {
                            // Retry the lookup against the freshly expanded pool.
                            allow_resolve_next_levels -= 1;
                            continue;
                        }

                        if follow.follow(&next).await? {
                            match extract_links_children(&storage, &next, &block_links).await {
                                Ok(children) => {
                                    stack.extend(children.into_iter());
                                },
                                Err(StorageError::NotFound(_, _)) => {
                                    // Missing block: still reported as added, just without children.
                                },
                                Err(err) => {
                                    // Any other error terminates the stream.
                                    Err(err)?;
                                },
                            }
                        }

                        yield BlockDiff::Added(next);
                    },
                }
                // Every path except the `continue` above finishes with this block.
                break;
            }
        }

        // Whatever is left in the pool was not matched by the new DAG.
        for (cid, references) in prev_links.into_iter() {
            for reference in references {
                match reference {
                    ReferenceDepth::Deep => {
                        yield BlockDiff::Removed(cid);
                        if follow.follow_previous(&cid).await? {
                            // Never expanded: report its whole subtree as removed as well.
                            for await descendant in extract_links_descendants(&storage, &block_links, cid) {
                                match descendant {
                                    Ok(descendant) => {
                                        yield BlockDiff::Removed(descendant);
                                    },
                                    Err(_err) => {
                                        // Best effort: unreadable descendants are skipped.
                                    },
                                }
                            }
                        }
                    },
                    ReferenceDepth::Shallow => {
                        // Children are tracked as their own pool entries, so only this block.
                        yield BlockDiff::Removed(cid);
                    },
                    ReferenceDepth::DeepNotFound => {
                        // The previous block could not be loaded from storage; not reported.
                    },
                }
            }
        }
    }
}
117
/// Streams the blocks added in the DAG rooted at `next` relative to the DAG rooted at `prev`,
/// each paired with the block through which it was reached.
///
/// The matching logic is the same as in [`block_diff`]: a breadth-first walk of the new DAG
/// against a pool of pending references into the previous DAG, which is expanded one level
/// at a time (up to `look_ahead_depth` extra expansions per miss, default `1`).
///
/// Each item is `(parent, cid)`. `parent` is `None` for the root `next`; otherwise it is the
/// CID whose links produced `cid` in the new DAG. Removed blocks are not reported. The
/// previous DAG is still expanded, so `follow.follow_previous` is still consulted.
///
/// # Errors
///
/// Errors from `follow`, from storage and from link decoding are propagated. A `NotFound`
/// while loading the children of an added block is ignored, and the block is still yielded.
pub fn block_diff_added_with_parent<S, F>(
    storage: S,
    prev: Option<Cid>,
    next: Cid,
    block_links: BlockLinks,
    look_ahead_depth: Option<u8>,
    mut follow: F,
) -> impl Stream<Item = Result<(Option<Cid>, Cid), StorageError>>
where
    S: BlockStorage + Clone + 'static,
    F: BlockDiffFollow + 'static,
{
    async_stream::try_stream! {
        // `(parent, cid)` pairs still to visit, processed FIFO (breadth-first).
        let mut stack = VecDeque::<(Option<Cid>, Cid)>::new();
        stack.push_back((None, next));

        // Pending references into the previous DAG, keyed by CID; one heap entry per reference.
        let mut prev_links: HashMap<Cid, BinaryHeap<ReferenceDepth>> = HashMap::new();
        if let Some(prev) = prev {
            prev_links.insert(prev, BinaryHeap::from([ReferenceDepth::Deep]));
        }

        while let Some((next_parent, next)) = stack.pop_front() {
            // Number of extra pool expansions allowed before `next` is declared added.
            let mut allow_resolve_next_levels = look_ahead_depth.unwrap_or(1);
            loop {
                match pop_reference(&mut prev_links, &next) {
                    Some(ReferenceDepth::Deep) | Some(ReferenceDepth::DeepNotFound) => {
                        // The same CID is present in the previous DAG: do not descend into it.
                    },
                    Some(ReferenceDepth::Shallow) => {
                        // The previous copy's children are pending; match our children against them.
                        stack.extend(extract_links_children(&storage, &next, &block_links).await?.into_iter().map(|child| (Some(next), child)));
                    },
                    None => {
                        // Expand the previous DAG one more level (also when no retry is left).
                        make_shallow(&storage, &block_links, &mut prev_links, &mut follow).await?;

                        if allow_resolve_next_levels > 0 {
                            // Retry the lookup against the freshly expanded pool.
                            allow_resolve_next_levels -= 1;
                            continue;
                        }

                        if follow.follow(&next).await? {
                            match extract_links_children(&storage, &next, &block_links).await {
                                Ok(children) => {
                                    stack.extend(children.into_iter().map(|child| (Some(next), child)));
                                },
                                Err(StorageError::NotFound(_, _)) => {
                                    // Missing block: still yielded, just without children.
                                },
                                Err(err) => {
                                    // Any other error terminates the stream.
                                    Err(err)?;
                                },
                            }
                        }

                        yield (next_parent, next);
                    },
                }
                // Every path except the `continue` above finishes with this block.
                break;
            }
        }
    }
}
198
199#[async_trait]
200pub trait BlockDiffFollow: Send + Sync {
201 async fn follow(&mut self, cid: &Cid) -> Result<bool, StorageError>;
203
204 async fn follow_previous(&mut self, cid: &Cid) -> Result<bool, StorageError> {
206 self.follow(cid).await
207 }
208}
209
210fn pop_reference(prev_links: &mut HashMap<Cid, BinaryHeap<ReferenceDepth>>, next: &Cid) -> Option<ReferenceDepth> {
212 let reference = match prev_links.get_mut(next) {
214 Some(prev) => prev.pop(),
215 None => None,
216 };
217
218 if reference.is_some() {
220 if let Some(references) = prev_links.get(next) {
221 if references.is_empty() {
222 prev_links.remove(next);
223 }
224 }
225 }
226
227 reference
229}
230
231async fn make_shallow<S, F>(
233 storage: &S,
234 links: &BlockLinks,
235 prev_links: &mut HashMap<Cid, BinaryHeap<ReferenceDepth>>,
236 follow: &mut F,
237) -> Result<(), StorageError>
238where
239 S: BlockStorage + Clone + 'static,
240 F: BlockDiffFollow + 'static,
241{
242 let deep_referencs: Vec<Cid> = prev_links
243 .iter()
244 .filter_map(|(cid, references)| match references.peek() {
245 Some(ReferenceDepth::Deep) => Some(*cid),
246 _ => None,
247 })
248 .collect();
249 for cid in deep_referencs {
250 if follow.follow_previous(&cid).await? {
251 make_reference_shallow(storage, links, prev_links, cid).await?;
252 }
253 }
254 Ok(())
255}
256
257async fn make_reference_shallow<S>(
259 storage: &S,
260 block_links: &BlockLinks,
261 prev_links: &mut HashMap<Cid, BinaryHeap<ReferenceDepth>>,
262 cid: Cid,
263) -> Result<(), StorageError>
264where
265 S: BlockStorage + Clone + 'static,
266{
267 let links = if let Some(prev) = prev_links.get_mut(&cid) {
269 if let Some(mut reference) = prev.peek_mut() {
270 match *reference {
271 ReferenceDepth::Deep => match extract_links_children(storage, &cid, block_links).await {
272 Ok(links) => {
273 *reference = ReferenceDepth::Shallow;
274 Some(links)
275 },
276 Err(StorageError::NotFound(_, _)) => {
277 *reference = ReferenceDepth::DeepNotFound;
278 None
279 },
280 Err(err) => return Err(err),
281 },
282 _ => None,
283 }
284 } else {
285 None
286 }
287 } else {
288 None
289 };
290
291 if let Some(links) = links {
293 for link in links {
294 prev_links.entry(link).or_default().push(ReferenceDepth::Deep);
295 }
296 }
297
298 Ok(())
299}
300
301async fn extract_links_children<S>(storage: &S, reference: &Cid, links: &BlockLinks) -> Result<Vec<Cid>, StorageError>
303where
304 S: BlockStorage + Clone + 'static,
305{
306 let block = storage.get(reference).await?;
307 let result = links.links(&block).map_err(StorageError::Internal)?.collect();
308 Ok(result)
309}
310
311fn extract_links_descendants<'a, S>(
314 storage: &'a S,
315 block_links: &'a BlockLinks,
316 reference: Cid,
317) -> impl Stream<Item = Result<Cid, StorageError>> + use<'a, S>
318where
319 S: BlockStorage + Clone + 'static,
320{
321 async_stream::stream! {
322 let mut stack = VecDeque::new();
323 stack.push_back(reference);
324 while let Some(reference) = stack.pop_front() {
325 let block = match storage.get(&reference).await {
326 Ok(block) => block,
327 Err(err) => {
328 yield Err(err);
329 continue;
330 }
331 };
332 let links = match block_links.links(&block) {
333 Ok(links) => links,
334 Err(err) => {
335 yield Err(StorageError::Internal(err));
336 continue;
337 },
338 };
339 for link in links {
340 yield Ok(link);
341 stack.push_back(link);
342 }
343 }
344 }
345}
346
/// State of one pending reference to a block of the previous DAG.
///
/// The declaration order is load-bearing. The derived `Ord` ranks
/// `Deep < Shallow < DeepNotFound`, and `BinaryHeap` is a max-heap. As a result:
/// * `pop_reference` consumes `DeepNotFound` first, then `Shallow`, then `Deep`;
/// * `peek()` only reports `Deep` when every entry for a CID is `Deep`.
///
/// Do not reorder the variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum ReferenceDepth {
    /// Not expanded yet: the block's links are not tracked as separate references.
    Deep,

    /// Expanded one level: each link of the block was recorded as a new `Deep` reference
    /// (see `make_reference_shallow`).
    Shallow,

    /// Expansion was attempted, but the block was not found in storage.
    DeepNotFound,
}
358
359#[derive(Debug, PartialEq)]
360pub enum BlockDiff {
361 Added(Cid),
363
364 Removed(Cid),
366}
367
#[cfg(test)]
mod tests {
    use super::BlockDiffFollow;
    use crate::{
        library::{
            block_diff::{block_diff, block_diff_added_with_parent, BlockDiff},
            test::TestStorage,
        },
        BlockStorageExt, StorageError,
    };
    use async_trait::async_trait;
    use cid::Cid;
    use futures::TryStreamExt;
    use serde::{Deserialize, Serialize};
    use std::collections::BTreeSet;

    /// Follows every block in both the new and the previous DAG.
    pub struct FollowAll;

    #[async_trait]
    impl BlockDiffFollow for FollowAll {
        async fn follow_previous(&mut self, _cid: &Cid) -> Result<bool, StorageError> {
            Ok(true)
        }

        async fn follow(&mut self, _cid: &Cid) -> Result<bool, StorageError> {
            Ok(true)
        }
    }

    /// Follows every block except the listed ones (per side).
    #[derive(Debug, Clone)]
    struct FollowAllExcept {
        previous: BTreeSet<Cid>,
        next: BTreeSet<Cid>,
    }

    #[async_trait]
    impl BlockDiffFollow for FollowAllExcept {
        async fn follow_previous(&mut self, cid: &Cid) -> Result<bool, StorageError> {
            Ok(!self.previous.contains(cid))
        }

        async fn follow(&mut self, cid: &Cid) -> Result<bool, StorageError> {
            Ok(!self.next.contains(cid))
        }
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct Node {
        id: u32,
        nodes: Vec<Cid>,
    }

    /// CIDs of the shared fixture tree:
    ///
    /// ```text
    ///              node7
    ///            /       \
    ///        node5       node6
    ///        /   \       /   \
    ///    node1 node2 node3 node4
    /// ```
    struct Tree {
        node1: Cid,
        node2: Cid,
        node5: Cid,
        node6: Cid,
        node7: Cid,
    }

    /// Stores one `Node` and returns its CID.
    async fn store(storage: &TestStorage, id: u32, nodes: Vec<Cid>) -> Cid {
        storage.set_serialized(&Node { id, nodes }).await.unwrap()
    }

    /// Stores the fixture tree (ids 1..=7).
    async fn build_tree(storage: &TestStorage) -> Tree {
        let node1 = store(storage, 1, vec![]).await;
        let node2 = store(storage, 2, vec![]).await;
        let node3 = store(storage, 3, vec![]).await;
        let node4 = store(storage, 4, vec![]).await;
        let node5 = store(storage, 5, vec![node1, node2]).await;
        let node6 = store(storage, 6, vec![node3, node4]).await;
        let node7 = store(storage, 7, vec![node5, node6]).await;
        Tree { node1, node2, node5, node6, node7 }
    }

    /// Stores a variant of the tree where node5 (id 50) and the root (id 70) changed.
    ///
    /// Returns `(node5_change, node7_change)`.
    async fn change_node5(storage: &TestStorage, tree: &Tree) -> (Cid, Cid) {
        let node5_change = store(storage, 50, vec![tree.node1, tree.node2]).await;
        let node7_change = store(storage, 70, vec![node5_change, tree.node6]).await;
        (node5_change, node7_change)
    }

    #[tokio::test]
    async fn test_one_level() {
        let storage = TestStorage::default();
        let tree = build_tree(&storage).await;
        let (node5_change, node7_change) = change_node5(&storage, &tree).await;

        let diff = block_diff(
            storage.clone(),
            Some(tree.node7),
            node7_change,
            Default::default(),
            Default::default(),
            FollowAll,
        )
        .try_collect::<Vec<BlockDiff>>()
        .await
        .unwrap();
        assert_eq!(diff.len(), 4);
        assert!(diff.contains(&BlockDiff::Added(node7_change)));
        assert!(diff.contains(&BlockDiff::Added(node5_change)));
        assert!(diff.contains(&BlockDiff::Removed(tree.node7)));
        assert!(diff.contains(&BlockDiff::Removed(tree.node5)));

        let added = block_diff_added_with_parent(
            storage.clone(),
            Some(tree.node7),
            node7_change,
            Default::default(),
            Default::default(),
            FollowAll,
        )
        .try_collect::<Vec<(Option<Cid>, Cid)>>()
        .await
        .unwrap();
        assert_eq!(added.len(), 2);
        assert!(added.contains(&(None, node7_change)));
        assert!(added.contains(&(Some(node7_change), node5_change)));
    }

    #[tokio::test]
    async fn test_reparent_root() {
        let storage = TestStorage::default();
        let tree = build_tree(&storage).await;
        let node8_change = store(&storage, 8, vec![tree.node7]).await;

        let diff = block_diff(
            storage.clone(),
            Some(tree.node7),
            node8_change,
            Default::default(),
            Default::default(),
            FollowAll,
        )
        .try_collect::<Vec<BlockDiff>>()
        .await
        .unwrap();
        assert_eq!(diff.len(), 1);
        assert!(diff.contains(&BlockDiff::Added(node8_change)));

        let added = block_diff_added_with_parent(
            storage.clone(),
            Some(tree.node7),
            node8_change,
            Default::default(),
            Default::default(),
            FollowAll,
        )
        .try_collect::<Vec<(Option<Cid>, Cid)>>()
        .await
        .unwrap();
        assert_eq!(added.len(), 1);
        assert!(added.contains(&(None, node8_change)));
    }

    #[tokio::test]
    async fn test_follow() {
        let storage = TestStorage::default();
        let tree = build_tree(&storage).await;
        let (_node5_change, node7_change) = change_node5(&storage, &tree).await;

        // Neither root may be descended into, so only the roots themselves differ.
        let do_not_follow_7 = FollowAllExcept {
            previous: BTreeSet::from([tree.node7]),
            next: BTreeSet::from([node7_change]),
        };

        let diff = block_diff(
            storage.clone(),
            Some(tree.node7),
            node7_change,
            Default::default(),
            Default::default(),
            do_not_follow_7.clone(),
        )
        .try_collect::<Vec<BlockDiff>>()
        .await
        .unwrap();
        assert_eq!(diff.len(), 2);
        assert!(diff.contains(&BlockDiff::Added(node7_change)));
        assert!(diff.contains(&BlockDiff::Removed(tree.node7)));

        let added = block_diff_added_with_parent(
            storage.clone(),
            Some(tree.node7),
            node7_change,
            Default::default(),
            Default::default(),
            do_not_follow_7.clone(),
        )
        .try_collect::<Vec<(Option<Cid>, Cid)>>()
        .await
        .unwrap();
        assert_eq!(added.len(), 1);
        assert!(added.contains(&(None, node7_change)));
    }
}