Skip to main content

co_primitives/library/
block_diff.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use crate::{BlockLinks, BlockStorage, StorageError};
5use async_trait::async_trait;
6use cid::Cid;
7use futures::Stream;
8use std::collections::{BinaryHeap, HashMap, VecDeque};
9
10/// Find added/removed references in blocks.
11/// If a [`Cid`] is referenced multiple times it will also returnes multiple times.
12/// This diff works recursively - all added/removed [`Cid`] at any depth will be returned.
13/// However there is a look-ahead depth limit, how many nodes are looked down to reuse references (defaults to `1`).
14pub fn block_diff<S, F>(
15	storage: S,
16	prev: Option<Cid>,
17	next: Cid,
18	block_links: BlockLinks,
19	look_ahead_depth: Option<u8>,
20	mut follow: F,
21) -> impl Stream<Item = Result<BlockDiff, StorageError>>
22where
23	S: BlockStorage + Clone + 'static,
24	F: BlockDiffFollow + 'static,
25{
26	async_stream::try_stream! {
27		let mut stack = VecDeque::<Cid>::new();
28		stack.push_back(next);
29
30		// collect prev (one level)
31		let mut prev_links: HashMap<Cid, BinaryHeap<ReferenceDepth>> = HashMap::new();
32		if let Some(prev) = prev {
33			prev_links.insert(prev, BinaryHeap::from([ReferenceDepth::Deep]));
34		}
35
36		// walk next
37		while let Some(next) = stack.pop_front() {
38			// try to reuse a reference
39			let mut allow_resolve_next_levels = look_ahead_depth.unwrap_or(1);
40			loop {
41				match pop_reference(&mut prev_links, &next) {
42					// found deep reference -> (re-)use it
43					Some(ReferenceDepth::Deep) | Some(ReferenceDepth::DeepNotFound) => {
44						// result.push(Diff::Reuse(*next));
45					},
46					// found shallow reference -> (re-)use it and try to reuse its links
47					Some(ReferenceDepth::Shallow) => {
48						// walk children
49						stack.extend(extract_links_children(&storage, &next, &block_links).await?);
50						// result.push(Diff::Reuse(*next));
51					},
52					// no previous reference found
53					None => {
54						// resolve one level of deep references
55						make_shallow(&storage, &block_links, &mut prev_links, &mut follow).await?;
56
57						// retry with next level resolved
58						if allow_resolve_next_levels > 0 {
59							allow_resolve_next_levels -= 1;
60							continue;
61						}
62
63						// walk children
64						if follow.follow(&next).await? {
65							match extract_links_children(&storage, &next, &block_links).await {
66								Ok(children) => {
67									stack.extend(children.into_iter());
68								},
69								Err(StorageError::NotFound(_, _)) => {
70									// ignore and just return the reference
71								},
72								Err(err) => {
73									// forward
74									Err(err)?;
75								},
76							}
77						}
78
79						// add as added
80						yield BlockDiff::Added(next);
81					},
82				}
83				break;
84			}
85		}
86
87		// removed
88		for (cid, references) in prev_links.into_iter() {
89			for reference in references {
90				match reference {
91					ReferenceDepth::Deep => {
92						yield BlockDiff::Removed(cid);
93						if follow.follow_previous(&cid).await? {
94							for await descendant in extract_links_descendants(&storage, &block_links, cid) {
95								match descendant {
96									Ok(descendant) => {
97										yield BlockDiff::Removed(descendant);
98									},
99									Err(_err) => {
100										// ignore not found (others?)
101									},
102								}
103							}
104						}
105					},
106					ReferenceDepth::Shallow => {
107						yield BlockDiff::Removed(cid);
108					},
109					ReferenceDepth::DeepNotFound => {
110						// ignore not found
111					},
112				}
113			}
114		}
115	}
116}
117
118/// Find added references in blocks with its parent.
119/// This diff works recursively - all added [`Cid`] at any depth will be returned.
120/// However there is a look-ahead depth limit, how many nodes are looked down to reuse references (defaults to `1`).
121///
122/// # Args
123/// - `follow` - Whether to follow this [`Cid`] or not.
124pub fn block_diff_added_with_parent<S, F>(
125	storage: S,
126	prev: Option<Cid>,
127	next: Cid,
128	block_links: BlockLinks,
129	look_ahead_depth: Option<u8>,
130	mut follow: F,
131) -> impl Stream<Item = Result<(Option<Cid>, Cid), StorageError>>
132where
133	S: BlockStorage + Clone + 'static,
134	F: BlockDiffFollow + 'static,
135{
136	async_stream::try_stream! {
137		let mut stack = VecDeque::<(Option<Cid>, Cid)>::new();
138		stack.push_back((None, next));
139
140		// collect prev (one level)
141		let mut prev_links: HashMap<Cid, BinaryHeap<ReferenceDepth>> = HashMap::new();
142		if let Some(prev) = prev {
143			prev_links.insert(prev, BinaryHeap::from([ReferenceDepth::Deep]));
144		}
145
146		// walk next
147		while let Some((next_parent, next)) = stack.pop_front() {
148			// try to reuse a reference
149			let mut allow_resolve_next_levels = look_ahead_depth.unwrap_or(1);
150			loop {
151				match pop_reference(&mut prev_links, &next) {
152					// found deep reference -> (re-)use it
153					Some(ReferenceDepth::Deep) | Some(ReferenceDepth::DeepNotFound) => {
154						// result.push(Diff::Reuse(*next));
155					},
156					// found shallow reference -> (re-)use it and try to reuse its links
157					Some(ReferenceDepth::Shallow) => {
158						// walk children
159						stack.extend(extract_links_children(&storage, &next, &block_links).await?.into_iter().map(|child| (Some(next), child)));
160						// result.push(Diff::Reuse(*next));
161					},
162					// no previous reference found
163					None => {
164						// resolve one level of deep references
165						make_shallow(&storage, &block_links, &mut prev_links, &mut follow).await?;
166
167						// retry with next level resolved
168						if allow_resolve_next_levels > 0 {
169							allow_resolve_next_levels -= 1;
170							continue;
171						}
172
173						// walk children
174						if follow.follow(&next).await? {
175							match extract_links_children(&storage, &next, &block_links).await {
176								Ok(children) => {
177									stack.extend(children.into_iter().map(|child| (Some(next), child)));
178								},
179								Err(StorageError::NotFound(_, _)) => {
180									// ignore and just return the reference
181								},
182								Err(err) => {
183									// forward
184									Err(err)?;
185								},
186							}
187						}
188
189						// add as added
190						yield (next_parent, next);
191					},
192				}
193				break;
194			}
195		}
196	}
197}
198
199#[async_trait]
200pub trait BlockDiffFollow: Send + Sync {
201	/// Test if a next Cid should be followed.
202	async fn follow(&mut self, cid: &Cid) -> Result<bool, StorageError>;
203
204	/// Test if a previous Cid should be followed.
205	async fn follow_previous(&mut self, cid: &Cid) -> Result<bool, StorageError> {
206		self.follow(cid).await
207	}
208}
209
210/// Pop a ReferenceDepth for next, if one.
211fn pop_reference(prev_links: &mut HashMap<Cid, BinaryHeap<ReferenceDepth>>, next: &Cid) -> Option<ReferenceDepth> {
212	// pop previous
213	let reference = match prev_links.get_mut(next) {
214		Some(prev) => prev.pop(),
215		None => None,
216	};
217
218	// remove empty keys
219	if reference.is_some() {
220		if let Some(references) = prev_links.get(next) {
221			if references.is_empty() {
222				prev_links.remove(next);
223			}
224		}
225	}
226
227	// result
228	reference
229}
230
231/// Make all deep references shallow.
232async fn make_shallow<S, F>(
233	storage: &S,
234	links: &BlockLinks,
235	prev_links: &mut HashMap<Cid, BinaryHeap<ReferenceDepth>>,
236	follow: &mut F,
237) -> Result<(), StorageError>
238where
239	S: BlockStorage + Clone + 'static,
240	F: BlockDiffFollow + 'static,
241{
242	let deep_referencs: Vec<Cid> = prev_links
243		.iter()
244		.filter_map(|(cid, references)| match references.peek() {
245			Some(ReferenceDepth::Deep) => Some(*cid),
246			_ => None,
247		})
248		.collect();
249	for cid in deep_referencs {
250		if follow.follow_previous(&cid).await? {
251			make_reference_shallow(storage, links, prev_links, cid).await?;
252		}
253	}
254	Ok(())
255}
256
257/// Make a single deep reference shallow.
258async fn make_reference_shallow<S>(
259	storage: &S,
260	block_links: &BlockLinks,
261	prev_links: &mut HashMap<Cid, BinaryHeap<ReferenceDepth>>,
262	cid: Cid,
263) -> Result<(), StorageError>
264where
265	S: BlockStorage + Clone + 'static,
266{
267	// replace one deep reference with shallow
268	let links = if let Some(prev) = prev_links.get_mut(&cid) {
269		if let Some(mut reference) = prev.peek_mut() {
270			match *reference {
271				ReferenceDepth::Deep => match extract_links_children(storage, &cid, block_links).await {
272					Ok(links) => {
273						*reference = ReferenceDepth::Shallow;
274						Some(links)
275					},
276					Err(StorageError::NotFound(_, _)) => {
277						*reference = ReferenceDepth::DeepNotFound;
278						None
279					},
280					Err(err) => return Err(err),
281				},
282				_ => None,
283			}
284		} else {
285			None
286		}
287	} else {
288		None
289	};
290
291	// add new resolved links
292	if let Some(links) = links {
293		for link in links {
294			prev_links.entry(link).or_default().push(ReferenceDepth::Deep);
295		}
296	}
297
298	Ok(())
299}
300
301/// Extract children links from reference.
302async fn extract_links_children<S>(storage: &S, reference: &Cid, links: &BlockLinks) -> Result<Vec<Cid>, StorageError>
303where
304	S: BlockStorage + Clone + 'static,
305{
306	let block = storage.get(reference).await?;
307	let result = links.links(&block).map_err(StorageError::Internal)?.collect();
308	Ok(result)
309}
310
311/// Extract descendant links from reference.
312/// Does not stop after encountering an error.
313fn extract_links_descendants<'a, S>(
314	storage: &'a S,
315	block_links: &'a BlockLinks,
316	reference: Cid,
317) -> impl Stream<Item = Result<Cid, StorageError>> + use<'a, S>
318where
319	S: BlockStorage + Clone + 'static,
320{
321	async_stream::stream! {
322		let mut stack = VecDeque::new();
323		stack.push_back(reference);
324		while let Some(reference) = stack.pop_front() {
325			let block = match storage.get(&reference).await {
326				Ok(block) => block,
327				Err(err) => {
328					yield Err(err);
329					continue;
330				}
331			};
332			let links = match block_links.links(&block) {
333				Ok(links) => links,
334				Err(err) => {
335					yield Err(StorageError::Internal(err));
336					continue;
337				},
338			};
339			for link in links {
340				yield Ok(link);
341				stack.push_back(link);
342			}
343		}
344	}
345}
346
/// How far a previous reference has been resolved.
///
/// The derived ordering follows the declaration order (`Deep < Shallow < DeepNotFound`). As the references of
/// a [`Cid`] are kept in a [`BinaryHeap`], which hands out its greatest element first, the order of the
/// variants decides which reference is popped and peeked first. Do not reorder them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum ReferenceDepth {
	/// Not resolved yet: stands for the block and all of its descendants.
	Deep,

	/// Resolved: stands for the block only, its links have their own references.
	Shallow,

	/// The block could not be found while resolving.
	/// Treated like deep (all descendants) but will not be resolved to shallow again.
	DeepNotFound,
}
358
359#[derive(Debug, PartialEq)]
360pub enum BlockDiff {
361	/// Item has been added.
362	Added(Cid),
363
364	/// Item has been removed.
365	Removed(Cid),
366}
367
#[cfg(test)]
mod tests {
	use super::BlockDiffFollow;
	use crate::{
		library::{
			block_diff::{block_diff, block_diff_added_with_parent, BlockDiff},
			test::TestStorage,
		},
		BlockStorageExt, StorageError,
	};
	use async_trait::async_trait;
	use cid::Cid;
	use futures::TryStreamExt;
	use serde::{Deserialize, Serialize};
	use std::collections::BTreeSet;

	/// Follows every block.
	pub struct FollowAll;
	#[async_trait]
	impl BlockDiffFollow for FollowAll {
		async fn follow_previous(&mut self, _cid: &Cid) -> Result<bool, StorageError> {
			Ok(true)
		}
		async fn follow(&mut self, _cid: &Cid) -> Result<bool, StorageError> {
			Ok(true)
		}
	}

	/// Follows every block except the listed ones.
	#[derive(Debug, Clone)]
	struct FollowAllExcept {
		previous: BTreeSet<Cid>,
		next: BTreeSet<Cid>,
	}
	#[async_trait]
	impl BlockDiffFollow for FollowAllExcept {
		async fn follow_previous(&mut self, cid: &Cid) -> Result<bool, StorageError> {
			Ok(!self.previous.contains(cid))
		}
		async fn follow(&mut self, cid: &Cid) -> Result<bool, StorageError> {
			Ok(!self.next.contains(cid))
		}
	}

	#[derive(Debug, Serialize, Deserialize, PartialEq)]
	struct Node {
		id: u32,
		nodes: Vec<Cid>,
	}

	/// The [`Cid`]s of the base tree which are needed by the tests.
	struct BaseTree {
		node1: Cid,
		node2: Cid,
		node5: Cid,
		node6: Cid,
		node7: Cid,
	}

	/// Store a [`Node`] and return its [`Cid`].
	async fn store_node(storage: &TestStorage, id: u32, nodes: Vec<Cid>) -> Cid {
		storage.set_serialized(&Node { id, nodes }).await.unwrap()
	}

	/// Store the base tree all tests start from.
	///
	/// ```mermaid
	/// flowchart TD
	///   5 --> 1
	///   5 --> 2
	///   6 --> 3
	///   6 --> 4
	///   7 --> 5
	///   7 --> 6
	/// ```
	async fn store_base_tree(storage: &TestStorage) -> BaseTree {
		let node1 = store_node(storage, 1, vec![]).await;
		let node2 = store_node(storage, 2, vec![]).await;
		let node3 = store_node(storage, 3, vec![]).await;
		let node4 = store_node(storage, 4, vec![]).await;
		let node5 = store_node(storage, 5, vec![node1, node2]).await;
		let node6 = store_node(storage, 6, vec![node3, node4]).await;
		let node7 = store_node(storage, 7, vec![node5, node6]).await;
		BaseTree { node1, node2, node5, node6, node7 }
	}

	/// Store a replacement for node 5 and for the root 7, returned as `(5', 7')`.
	///
	/// ```mermaid
	/// flowchart TD
	///   5' --> 1
	///   5' --> 2
	///   6 --> 3
	///   6 --> 4
	///   7' --> 5'
	///   7' --> 6
	/// ```
	async fn store_changed_tree(storage: &TestStorage, base: &BaseTree) -> (Cid, Cid) {
		let node5_change = store_node(storage, 50, vec![base.node1, base.node2]).await;
		let node7_change = store_node(storage, 70, vec![node5_change, base.node6]).await;
		(node5_change, node7_change)
	}

	/// Collect [`block_diff`] from `prev` to `next` with default links and look-ahead.
	async fn collect_diff<F>(storage: &TestStorage, prev: Cid, next: Cid, follow: F) -> Vec<BlockDiff>
	where
		F: BlockDiffFollow + 'static,
	{
		block_diff(storage.clone(), Some(prev), next, Default::default(), Default::default(), follow)
			.try_collect::<Vec<BlockDiff>>()
			.await
			.unwrap()
	}

	/// Collect [`block_diff_added_with_parent`] from `prev` to `next` with default links and look-ahead.
	async fn collect_added_with_parent<F>(
		storage: &TestStorage,
		prev: Cid,
		next: Cid,
		follow: F,
	) -> Vec<(Option<Cid>, Cid)>
	where
		F: BlockDiffFollow + 'static,
	{
		block_diff_added_with_parent(
			storage.clone(),
			Some(prev),
			next,
			Default::default(),
			Default::default(),
			follow,
		)
		.try_collect::<Vec<(Option<Cid>, Cid)>>()
		.await
		.unwrap()
	}

	/// Replace a node (5) and its root (7).
	///
	/// From the base tree (see `store_base_tree`) to the changed tree (see `store_changed_tree`).
	#[tokio::test]
	async fn test_one_level() {
		let storage = TestStorage::default();

		// create
		let base = store_base_tree(&storage).await;

		// update
		let (node5_change, node7_change) = store_changed_tree(&storage, &base).await;

		// diff
		let diff = collect_diff(&storage, base.node7, node7_change, FollowAll).await;
		assert_eq!(diff.len(), 4);
		assert!(diff.contains(&BlockDiff::Added(node7_change)));
		assert!(diff.contains(&BlockDiff::Added(node5_change)));
		assert!(diff.contains(&BlockDiff::Removed(base.node7)));
		assert!(diff.contains(&BlockDiff::Removed(base.node5)));

		// diff added with parent
		let diff = collect_added_with_parent(&storage, base.node7, node7_change, FollowAll).await;
		assert_eq!(diff.len(), 2);
		assert!(diff.contains(&(None, node7_change)));
		assert!(diff.contains(&(Some(node7_change), node5_change)));
	}

	/// Reparent a node (7).
	///
	/// From the base tree (see `store_base_tree`) to:
	/// ```mermaid
	/// flowchart TD
	///   5 --> 1
	///   5 --> 2
	///   6 --> 3
	///   6 --> 4
	///   7 --> 5
	///   7 --> 6
	///   8' --> 7
	/// ```
	#[tokio::test]
	async fn test_reparent_root() {
		let storage = TestStorage::default();

		// create
		let base = store_base_tree(&storage).await;

		// update
		let node8_change = store_node(&storage, 8, vec![base.node7]).await;

		// diff
		let diff = collect_diff(&storage, base.node7, node8_change, FollowAll).await;
		assert_eq!(diff.len(), 1);
		assert!(diff.contains(&BlockDiff::Added(node8_change)));

		// diff added with parent
		let diff = collect_added_with_parent(&storage, base.node7, node8_change, FollowAll).await;
		assert_eq!(diff.len(), 1);
		assert!(diff.contains(&(None, node8_change)));
	}

	/// Replace a node (5) and its root (7) while neither the previous root (7) nor the next root (7') is
	/// followed.
	///
	/// From the base tree (see `store_base_tree`) to the changed tree (see `store_changed_tree`).
	#[tokio::test]
	async fn test_follow() {
		let storage = TestStorage::default();

		// create
		let base = store_base_tree(&storage).await;

		// update
		let (_node5_change, node7_change) = store_changed_tree(&storage, &base).await;

		// follow
		let do_not_follow_7 =
			FollowAllExcept { previous: BTreeSet::from([base.node7]), next: BTreeSet::from([node7_change]) };

		// diff
		let diff = collect_diff(&storage, base.node7, node7_change, do_not_follow_7.clone()).await;
		assert_eq!(diff.len(), 2);
		assert!(diff.contains(&BlockDiff::Added(node7_change)));
		assert!(diff.contains(&BlockDiff::Removed(base.node7)));

		// diff added with parent
		let diff = collect_added_with_parent(&storage, base.node7, node7_change, do_not_follow_7.clone()).await;
		assert_eq!(diff.len(), 1);
		assert!(diff.contains(&(None, node7_change)));
	}
}
664}