Skip to main content

co_primitives/library/
node_builder.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use super::dag_cbor_size_serializer::DagCborSizeSerializer;
5use crate::{Block, BlockSerializer, DefaultParams, Link, OptionLink, StoreParams};
6use cid::Cid;
7use either::Either;
8use serde::{Deserialize, Serialize};
9use std::{
10	marker::PhantomData,
11	mem::{swap, take},
12};
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub enum Node<T> {
16	#[serde(rename = "n")]
17	Node(Vec<Link<Self>>),
18	#[serde(rename = "l")]
19	Leaf(Vec<T>),
20}
21impl<T> Node<T> {
22	pub fn is_empty(&self) -> bool {
23		match self {
24			Node::Node(items) => items.is_empty(),
25			Node::Leaf(links) => links.is_empty(),
26		}
27	}
28}
29impl<T> Default for Node<T> {
30	fn default() -> Self {
31		Node::Leaf(vec![])
32	}
33}
34impl<T> NodeReader<T> for Node<T> {
35	type Filter = ();
36
37	fn read(self, _: &Self::Filter) -> Either<Vec<Cid>, Vec<T>> {
38		match self {
39			Node::Node(links) => Either::Left(links.into_iter().map(Into::into).collect()),
40			Node::Leaf(items) => Either::Right(items),
41		}
42	}
43}
44
45pub trait NodeReader<T> {
46	type Filter: Default;
47
48	fn read(self, filter: &Self::Filter) -> Either<Vec<Cid>, Vec<T>>;
49}
50
51#[derive(Debug, thiserror::Error)]
52pub enum NodeBuilderError {
53	#[error("Encoding failed")]
54	Encoding(#[source] anyhow::Error),
55
56	#[error("Invalid argument")]
57	InvalidArgument(#[source] anyhow::Error),
58}
59
60pub trait NodeSerializer<N, T> {
61	fn nodes(&mut self, nodes: Vec<Link<N>>) -> Result<N, NodeBuilderError>;
62
63	fn leaf(&mut self, entries: Vec<T>) -> Result<N, NodeBuilderError>;
64
65	fn serialize(&mut self, max_block_size: usize, node: N) -> Result<Block, NodeBuilderError>;
66
67	fn item_size_hint(&self, item: &T) -> Option<usize> {
68		let _item = item;
69		None
70	}
71}
72
/// Stateless [`NodeSerializer`] that produces [`Node`] values and encodes them with `BlockSerializer`.
#[derive(Debug, Clone)]
pub struct DefaultNodeSerializer {}
impl Default for DefaultNodeSerializer {
	fn default() -> Self {
		Self::new()
	}
}
impl DefaultNodeSerializer {
	/// Creates a new serializer. It carries no state.
	pub fn new() -> Self {
		Self {}
	}
}
84impl<T> NodeSerializer<Node<T>, T> for DefaultNodeSerializer
85where
86	T: Serialize,
87{
88	fn nodes(&mut self, nodes: Vec<Link<Node<T>>>) -> Result<Node<T>, NodeBuilderError> {
89		Ok(Node::Node(nodes))
90	}
91
92	fn leaf(&mut self, entries: Vec<T>) -> Result<Node<T>, NodeBuilderError> {
93		Ok(Node::Leaf(entries))
94	}
95
96	fn serialize(&mut self, max_block_size: usize, node: Node<T>) -> Result<Block, NodeBuilderError> {
97		BlockSerializer::new()
98			.with_max_block_size(max_block_size)
99			.serialize(&node)
100			.map_err(|err| NodeBuilderError::Encoding(err.into()))
101	}
102
103	fn item_size_hint(&self, item: &T) -> Option<usize> {
104		let mut serializer = DagCborSizeSerializer::new();
105		item.serialize(&mut serializer).ok()?;
106		Some(serializer.size)
107	}
108}
109
110/// Create a balances merkle tree of Node blocks.
111///
112/// Note: This implementation requires the data to fit into memory.
113pub struct NodeBuilder<T, N = Node<T>, S = DefaultNodeSerializer>
114where
115	T: Clone,
116	S: NodeSerializer<N, T>,
117{
118	_node: PhantomData<N>,
119
120	// Current Items.
121	items: Vec<T>,
122	items_size: usize,
123	items_size_max: usize,
124
125	/// Block leaf references.
126	blocks: Vec<Link<N>>,
127
128	/// Computed leaf blocks to store.
129	pending_blocks: Vec<Block>,
130
131	/// Max children for each block.
132	max_children: usize,
133	max_block_size: usize,
134
135	/// Serializer.
136	serializer: S,
137}
138impl<T, N, S> NodeBuilder<T, N, S>
139where
140	T: Clone + Serialize,
141	S: NodeSerializer<N, T>,
142{
143	pub fn new(max_block_size: usize, max_children: usize, serializer: S) -> Self {
144		Self {
145			_node: Default::default(),
146			items: Vec::new(),
147			items_size: 0,
148			items_size_max: max_block_size * 3 / 4,
149			max_block_size,
150			blocks: Vec::new(),
151			pending_blocks: Vec::new(),
152			max_children,
153			serializer,
154		}
155	}
156
157	pub fn with_items_size_max(mut self, items_size_max: usize) -> Self {
158		debug_assert!(items_size_max > 0);
159		self.items_size_max = items_size_max;
160		self
161	}
162
163	pub fn push(&mut self, item: T) -> Result<(), NodeBuilderError> {
164		// size
165		let item_size = self.serializer.item_size_hint(&item).unwrap_or(0);
166		if self.items_size + item_size >= self.items_size_max {
167			self.flush()?;
168		}
169
170		// push item
171		self.items_size += item_size;
172		self.items.push(item);
173
174		// full?
175		if self.items.len() >= self.max_children || self.items_size >= self.items_size_max {
176			self.flush()?;
177		}
178
179		// done
180		Ok(())
181	}
182
183	pub fn extend(&mut self, items: impl IntoIterator<Item = T>) -> Result<(), NodeBuilderError> {
184		for item in items {
185			self.push(item)?;
186		}
187		Ok(())
188	}
189
190	/// Take blocks from builder that already has been created.
191	pub fn take_blocks(&mut self) -> impl Iterator<Item = Block> {
192		let mut blocks = Vec::new();
193		swap(&mut self.pending_blocks, &mut blocks);
194		blocks.into_iter()
195	}
196
197	/// Flush items into new leaf block.
198	fn flush(&mut self) -> Result<(), NodeBuilderError> {
199		let leaf = self.serializer.leaf(take(&mut self.items))?;
200		let block = self.serializer.serialize(self.max_block_size, leaf)?;
201		self.items_size = 0;
202		self.blocks.push(block.cid().into());
203		self.pending_blocks.push(block);
204		Ok(())
205	}
206
207	/// Flush blocks into new node block.
208	fn flush_level(&mut self) -> Result<(), NodeBuilderError> {
209		let mut level_link_blocks = self
210			.blocks
211			.as_slice()
212			.chunks(self.max_children)
213			.map(|chunk| {
214				let node = self.serializer.nodes(chunk.to_vec())?;
215				let block = self.serializer.serialize(self.max_block_size, node)?;
216				Ok(block)
217			})
218			.collect::<Result<Vec<Block>, NodeBuilderError>>()?;
219		let level_links = level_link_blocks
220			.iter()
221			.map(|block| block.cid().into())
222			.collect::<Vec<Link<N>>>();
223
224		// store created link blocks
225		self.pending_blocks.append(&mut level_link_blocks);
226
227		// apply level
228		self.blocks = level_links;
229
230		// result
231		Ok(())
232	}
233
234	/// Convert builder into blocks.
235	/// All blocks that are not yet taken using [`NodeBuilder::take_blocks`] are returned.
236	pub fn into_blocks(mut self) -> Result<(OptionLink<N>, Vec<Block>), NodeBuilderError> {
237		// empty?
238		if self.items.is_empty() && self.blocks.is_empty() {
239			return Ok((Default::default(), Default::default()));
240		}
241
242		// node
243		let (node, mut blocks) = self.take_node()?;
244		let root = self.serializer.serialize(self.max_block_size, node)?;
245		let root_link = root.cid().into();
246		blocks.push(root);
247		Ok((root_link, blocks))
248	}
249
250	/// Convert builder into a node and blocks if needed.
251	/// All blocks that are not yet taken using [`NodeBuilder::take_blocks`] are returned.
252	/// The root node is returned directly and not put into a block.
253	pub fn into_node(mut self) -> Result<(N, Vec<Block>), NodeBuilderError> {
254		self.take_node()
255	}
256
257	/// Take node and blocks. The serializer will be left empty.
258	fn take_node(&mut self) -> Result<(N, Vec<Block>), NodeBuilderError> {
259		// return a leaf if have no full blocks
260		if self.blocks.is_empty() {
261			let node = self.serializer.leaf(take(&mut self.items))?;
262			return Ok((node, Default::default()));
263		}
264
265		// flush
266		if !self.items.is_empty() {
267			self.flush()?;
268		}
269		while self.blocks.len() > self.max_children {
270			self.flush_level()?;
271		}
272
273		// node
274		let node = self.serializer.nodes(take(&mut self.blocks))?;
275
276		// result
277		Ok((node, take(&mut self.pending_blocks)))
278	}
279}
280impl<T> Default for NodeBuilder<T, Node<T>, DefaultNodeSerializer>
281where
282	T: Clone + Serialize,
283{
284	fn default() -> Self {
285		Self::new(DefaultParams::MAX_BLOCK_SIZE, 174, DefaultNodeSerializer::new())
286	}
287}
288
#[cfg(test)]
mod tests {
	use crate::{
		library::node_builder::{DefaultNodeSerializer, Node, NodeBuilder},
		DefaultParams, StoreParams,
	};
	use std::iter::repeat_n;

	/// Builder over `u8` items using the default block size and a fan-out of two.
	fn binary_builder() -> NodeBuilder<u8> {
		NodeBuilder::<u8>::new(DefaultParams::MAX_BLOCK_SIZE, 2, DefaultNodeSerializer::new())
	}

	#[test]
	fn into_blocks() {
		// 8 items with fan-out 2: 4 leaves, 2 inner nodes, 1 root
		let mut builder = binary_builder();
		builder.extend(1..=8).unwrap();

		let (_root, blocks) = builder.into_blocks().unwrap();
		assert_eq!(blocks.len(), 7);
		insta::assert_debug_snapshot!(blocks);
	}

	#[test]
	fn single() {
		// a single item stays in the root leaf
		let mut builder = binary_builder();
		builder.extend([1]).unwrap();

		let (_root, blocks) = builder.into_blocks().unwrap();
		assert_eq!(blocks.len(), 1);
	}

	#[test]
	fn empty() {
		let builder = binary_builder();

		let (_root, blocks) = builder.into_blocks().unwrap();
		assert_eq!(blocks.len(), 0);
	}

	#[test]
	fn roundtrip() {
		let mut builder = binary_builder();
		builder.extend(1..=8).unwrap();

		let (_root, blocks) = builder.into_blocks().unwrap();

		// decode every block back into a node
		let mut nodes: Vec<Node<u8>> = Vec::with_capacity(blocks.len());
		for block in &blocks {
			nodes.push(serde_ipld_dagcbor::from_slice::<Node<u8>>(block.data()).unwrap());
		}
		insta::assert_yaml_snapshot!(nodes);
	}

	#[test]
	fn big_blocks() {
		// items of a tenth of a block each: the size limit, not the fan-out, splits the leaves
		let mut builder = NodeBuilder::<Vec<u8>>::new(DefaultParams::MAX_BLOCK_SIZE, 174, DefaultNodeSerializer::new());
		let item_len = DefaultParams::MAX_BLOCK_SIZE / 10;
		builder
			.extend((0..11).map(|_| repeat_n(0u8, item_len).collect::<Vec<u8>>()))
			.unwrap();

		let (_root, blocks) = builder.into_blocks().unwrap();
		assert_eq!(blocks.len(), 3);
	}
}