Skip to main content

co_storage/library/
node_reader.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use crate::{Storage, StorageError};
5use cid::Cid;
6use co_primitives::{BlockSerializer, MultiCodec, Node};
7use serde::de::DeserializeOwned;
8use std::collections::VecDeque;
9
10// pub fn node_reader_fn<T, F>(storage: &dyn Storage, cid: &Cid, f: &F) -> anyhow::Result<()>
11// where
12// 	T: Clone + DeserializeOwned,
13// 	F: Fn(T),
14// {
15// 	// get block
16// 	let block = storage.get(cid)?;
17// 	if block.cid().codec() != Into::<u64>::into(DagCborCodec) {
18// 		return Err(StorageError::InvalidArgument)?
19// 	}
20
21// 	// get node
22// 	let node: Node<T> = from_cbor(block.data()).map_err(|_| StorageError::InvalidArgument)?;
23
24// 	// read
25// 	match node {
26// 		Node::Node(links) =>
27// 			for link in links {
28// 				node_reader_fn(storage, link.as_ref(), f)?;
29// 			},
30// 		Node::Leaf(entries) =>
31// 			for value in entries.into_iter() {
32// 				f(value);
33// 			},
34// 	}
35
36// 	// result
37// 	Ok(())
38// }
39
40pub fn node_reader<'a, T, S: Storage>(storage: &'a S, cid: &'a Cid) -> impl Iterator<Item = anyhow::Result<T>> + 'a
41where
42	T: Clone + DeserializeOwned + 'static,
43{
44	NodeIterator::new(storage, cid)
45}
46
47struct NodeIterator<'a, T, S>
48where
49	T: Clone + DeserializeOwned,
50{
51	storage: &'a S,
52	stack: VecDeque<Cid>,
53	entries: VecDeque<T>,
54}
55impl<'a, T, S> NodeIterator<'a, T, S>
56where
57	T: Clone + DeserializeOwned,
58	S: Storage,
59{
60	pub fn new(storage: &'a S, cid: &Cid) -> Self {
61		let mut stack = VecDeque::new();
62		stack.push_front(*cid);
63		Self { storage, stack, entries: Default::default() }
64	}
65}
66impl<'a, T, S> Iterator for NodeIterator<'a, T, S>
67where
68	T: Clone + DeserializeOwned,
69	S: Storage,
70{
71	type Item = anyhow::Result<T>;
72
73	fn next(&mut self) -> Option<Self::Item> {
74		// read node
75		while self.entries.is_empty() && !self.stack.is_empty() {
76			if let Some(next_cid) = self.stack.pop_front() {
77				let node = match read_node(self.storage, &next_cid) {
78					Ok(n) => n,
79					Err(e) => return Some(Err(e.into())),
80				};
81				match node {
82					Node::Node(links) => {
83						self.stack.extend(links.into_iter().map(|link| -> Cid { link.into() }));
84					},
85					Node::Leaf(entries) => self.entries = entries.into(),
86				}
87			}
88		}
89
90		// read
91		self.entries.pop_front().map(|entry| Ok(entry))
92	}
93}
94
95fn read_node<T: Clone + DeserializeOwned, S: Storage>(storage: &S, cid: &Cid) -> Result<Node<T>, StorageError> {
96	// get block
97	let block = storage.get(MultiCodec::with_cbor(cid)?)?;
98
99	// get node
100	let node: Node<T> = BlockSerializer::new()
101		.deserialize(&block)
102		.map_err(|e| StorageError::InvalidArgument(e.into()))?;
103
104	// result
105	Ok(node)
106}
107
108// enum NodeIteratorState<'a, T>
109// where
110// 	T: Clone + DeserializeOwned,
111// {
112// 	Start(&'a dyn Storage),
113// 	Node(&'a dyn Storage, Node<T>),
114// 	End,
115// }
116
117// struct NodeIterator<'a, T>
118// where
119// 	T: Clone + DeserializeOwned,
120// {
121// 	state: NodeIteratorState<'a, T>,
122// }
123
124// impl<'a, T> Iterator for NodeIterator<'a, T>
125// where
126// 	T: Clone + DeserializeOwned,
127// {
128// 	type Item = anyhow::Result<T>;
129
130// 	fn next(&mut self) -> Option<Self::Item> {
131// 		match self.state {
132// 			NodeIteratorState::Start(storage) => {
133// 				// get block
134// 				let block = storage.get(cid)?;
135// 				if block.cid().codec() != Into::<u64>::into(DagCborCodec) {
136// 					return Err(StorageError::InvalidArgument)?
137// 				}
138
139// 				// get node
140// 				let node: Node<T> =
141// 					from_cbor(block.data()).map_err(|_| StorageError::InvalidArgument)?;
142
143// 			},
144// 			NodeIteratorState::Node(storage, node) => todo!(),
145// 			NodeIteratorState::End => None,
146// 		}
147// 	}
148// }