1use super::dag_cbor_size_serializer::DagCborSizeSerializer;
5use crate::{Block, BlockSerializer, DefaultParams, Link, OptionLink, StoreParams};
6use cid::Cid;
7use either::Either;
8use serde::{Deserialize, Serialize};
9use std::{
10 marker::PhantomData,
11 mem::{swap, take},
12};
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub enum Node<T> {
16 #[serde(rename = "n")]
17 Node(Vec<Link<Self>>),
18 #[serde(rename = "l")]
19 Leaf(Vec<T>),
20}
21impl<T> Node<T> {
22 pub fn is_empty(&self) -> bool {
23 match self {
24 Node::Node(items) => items.is_empty(),
25 Node::Leaf(links) => links.is_empty(),
26 }
27 }
28}
29impl<T> Default for Node<T> {
30 fn default() -> Self {
31 Node::Leaf(vec![])
32 }
33}
34impl<T> NodeReader<T> for Node<T> {
35 type Filter = ();
36
37 fn read(self, _: &Self::Filter) -> Either<Vec<Cid>, Vec<T>> {
38 match self {
39 Node::Node(links) => Either::Left(links.into_iter().map(Into::into).collect()),
40 Node::Leaf(items) => Either::Right(items),
41 }
42 }
43}
44
/// Decodes a node into either the CIDs of its children (`Left`) or the items it
/// stores (`Right`).
pub trait NodeReader<T> {
    /// Extra state handed to [`NodeReader::read`]. The `Default` bound presumably lets a
    /// reader be driven without an explicit filter — TODO confirm against callers.
    type Filter: Default;

    /// Consumes the node and returns its child CIDs or its items. How `filter` is applied
    /// is implementation-defined; the implementation for [`Node`] ignores it.
    fn read(self, filter: &Self::Filter) -> Either<Vec<Cid>, Vec<T>>;
}
50
51#[derive(Debug, thiserror::Error)]
52pub enum NodeBuilderError {
53 #[error("Encoding failed")]
54 Encoding(#[source] anyhow::Error),
55
56 #[error("Invalid argument")]
57 InvalidArgument(#[source] anyhow::Error),
58}
59
60pub trait NodeSerializer<N, T> {
61 fn nodes(&mut self, nodes: Vec<Link<N>>) -> Result<N, NodeBuilderError>;
62
63 fn leaf(&mut self, entries: Vec<T>) -> Result<N, NodeBuilderError>;
64
65 fn serialize(&mut self, max_block_size: usize, node: N) -> Result<Block, NodeBuilderError>;
66
67 fn item_size_hint(&self, item: &T) -> Option<usize> {
68 let _item = item;
69 None
70 }
71}
72
/// Stateless [`NodeSerializer`] producing [`Node`] values.
///
/// Derives the common traits so it can be debug-printed and freely copied; it carries no
/// data, so `Default` and [`DefaultNodeSerializer::new`] produce identical values.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultNodeSerializer {}

impl DefaultNodeSerializer {
    /// Creates the serializer.
    pub fn new() -> Self {
        Self {}
    }
}
84impl<T> NodeSerializer<Node<T>, T> for DefaultNodeSerializer
85where
86 T: Serialize,
87{
88 fn nodes(&mut self, nodes: Vec<Link<Node<T>>>) -> Result<Node<T>, NodeBuilderError> {
89 Ok(Node::Node(nodes))
90 }
91
92 fn leaf(&mut self, entries: Vec<T>) -> Result<Node<T>, NodeBuilderError> {
93 Ok(Node::Leaf(entries))
94 }
95
96 fn serialize(&mut self, max_block_size: usize, node: Node<T>) -> Result<Block, NodeBuilderError> {
97 BlockSerializer::new()
98 .with_max_block_size(max_block_size)
99 .serialize(&node)
100 .map_err(|err| NodeBuilderError::Encoding(err.into()))
101 }
102
103 fn item_size_hint(&self, item: &T) -> Option<usize> {
104 let mut serializer = DagCborSizeSerializer::new();
105 item.serialize(&mut serializer).ok()?;
106 Some(serializer.size)
107 }
108}
109
110pub struct NodeBuilder<T, N = Node<T>, S = DefaultNodeSerializer>
114where
115 T: Clone,
116 S: NodeSerializer<N, T>,
117{
118 _node: PhantomData<N>,
119
120 items: Vec<T>,
122 items_size: usize,
123 items_size_max: usize,
124
125 blocks: Vec<Link<N>>,
127
128 pending_blocks: Vec<Block>,
130
131 max_children: usize,
133 max_block_size: usize,
134
135 serializer: S,
137}
138impl<T, N, S> NodeBuilder<T, N, S>
139where
140 T: Clone + Serialize,
141 S: NodeSerializer<N, T>,
142{
143 pub fn new(max_block_size: usize, max_children: usize, serializer: S) -> Self {
144 Self {
145 _node: Default::default(),
146 items: Vec::new(),
147 items_size: 0,
148 items_size_max: max_block_size * 3 / 4,
149 max_block_size,
150 blocks: Vec::new(),
151 pending_blocks: Vec::new(),
152 max_children,
153 serializer,
154 }
155 }
156
157 pub fn with_items_size_max(mut self, items_size_max: usize) -> Self {
158 debug_assert!(items_size_max > 0);
159 self.items_size_max = items_size_max;
160 self
161 }
162
163 pub fn push(&mut self, item: T) -> Result<(), NodeBuilderError> {
164 let item_size = self.serializer.item_size_hint(&item).unwrap_or(0);
166 if self.items_size + item_size >= self.items_size_max {
167 self.flush()?;
168 }
169
170 self.items_size += item_size;
172 self.items.push(item);
173
174 if self.items.len() >= self.max_children || self.items_size >= self.items_size_max {
176 self.flush()?;
177 }
178
179 Ok(())
181 }
182
183 pub fn extend(&mut self, items: impl IntoIterator<Item = T>) -> Result<(), NodeBuilderError> {
184 for item in items {
185 self.push(item)?;
186 }
187 Ok(())
188 }
189
190 pub fn take_blocks(&mut self) -> impl Iterator<Item = Block> {
192 let mut blocks = Vec::new();
193 swap(&mut self.pending_blocks, &mut blocks);
194 blocks.into_iter()
195 }
196
197 fn flush(&mut self) -> Result<(), NodeBuilderError> {
199 let leaf = self.serializer.leaf(take(&mut self.items))?;
200 let block = self.serializer.serialize(self.max_block_size, leaf)?;
201 self.items_size = 0;
202 self.blocks.push(block.cid().into());
203 self.pending_blocks.push(block);
204 Ok(())
205 }
206
207 fn flush_level(&mut self) -> Result<(), NodeBuilderError> {
209 let mut level_link_blocks = self
210 .blocks
211 .as_slice()
212 .chunks(self.max_children)
213 .map(|chunk| {
214 let node = self.serializer.nodes(chunk.to_vec())?;
215 let block = self.serializer.serialize(self.max_block_size, node)?;
216 Ok(block)
217 })
218 .collect::<Result<Vec<Block>, NodeBuilderError>>()?;
219 let level_links = level_link_blocks
220 .iter()
221 .map(|block| block.cid().into())
222 .collect::<Vec<Link<N>>>();
223
224 self.pending_blocks.append(&mut level_link_blocks);
226
227 self.blocks = level_links;
229
230 Ok(())
232 }
233
234 pub fn into_blocks(mut self) -> Result<(OptionLink<N>, Vec<Block>), NodeBuilderError> {
237 if self.items.is_empty() && self.blocks.is_empty() {
239 return Ok((Default::default(), Default::default()));
240 }
241
242 let (node, mut blocks) = self.take_node()?;
244 let root = self.serializer.serialize(self.max_block_size, node)?;
245 let root_link = root.cid().into();
246 blocks.push(root);
247 Ok((root_link, blocks))
248 }
249
250 pub fn into_node(mut self) -> Result<(N, Vec<Block>), NodeBuilderError> {
254 self.take_node()
255 }
256
257 fn take_node(&mut self) -> Result<(N, Vec<Block>), NodeBuilderError> {
259 if self.blocks.is_empty() {
261 let node = self.serializer.leaf(take(&mut self.items))?;
262 return Ok((node, Default::default()));
263 }
264
265 if !self.items.is_empty() {
267 self.flush()?;
268 }
269 while self.blocks.len() > self.max_children {
270 self.flush_level()?;
271 }
272
273 let node = self.serializer.nodes(take(&mut self.blocks))?;
275
276 Ok((node, take(&mut self.pending_blocks)))
278 }
279}
280impl<T> Default for NodeBuilder<T, Node<T>, DefaultNodeSerializer>
281where
282 T: Clone + Serialize,
283{
284 fn default() -> Self {
285 Self::new(DefaultParams::MAX_BLOCK_SIZE, 174, DefaultNodeSerializer::new())
286 }
287}
288
#[cfg(test)]
mod tests {
    use crate::{
        library::node_builder::{DefaultNodeSerializer, Node, NodeBuilder},
        DefaultParams, StoreParams,
    };

    /// Builder with the default block size, the default serializer and the given fan-out.
    fn builder_with<T: Clone + serde::Serialize>(max_children: usize) -> NodeBuilder<T> {
        NodeBuilder::new(DefaultParams::MAX_BLOCK_SIZE, max_children, DefaultNodeSerializer::new())
    }

    #[test]
    fn into_blocks() {
        let mut builder = builder_with::<u8>(2);
        builder.extend(1..=8).unwrap();

        let (_root, blocks) = builder.into_blocks().unwrap();
        // Four leaves of two items, two inner nodes, one root.
        assert_eq!(blocks.len(), 7);
        insta::assert_debug_snapshot!(blocks);
    }

    #[test]
    fn single() {
        let mut builder = builder_with::<u8>(2);
        builder.push(1).unwrap();

        let (_root, blocks) = builder.into_blocks().unwrap();
        assert_eq!(blocks.len(), 1);
    }

    #[test]
    fn empty() {
        let builder = builder_with::<u8>(2);

        let (_root, blocks) = builder.into_blocks().unwrap();
        assert!(blocks.is_empty());
    }

    #[test]
    fn roundtrip() {
        let mut builder = builder_with::<u8>(2);
        builder.extend(1..=8).unwrap();

        let (_root, blocks) = builder.into_blocks().unwrap();

        let nodes = blocks
            .iter()
            .map(|block| serde_ipld_dagcbor::from_slice::<Node<u8>>(block.data()).unwrap())
            .collect::<Vec<Node<u8>>>();
        insta::assert_yaml_snapshot!(nodes);
    }

    #[test]
    fn big_blocks() {
        let mut builder = builder_with::<Vec<u8>>(174);
        let item_len = DefaultParams::MAX_BLOCK_SIZE / 10;
        builder.extend((0..11).map(|_| vec![0u8; item_len])).unwrap();

        let (_root, blocks) = builder.into_blocks().unwrap();
        assert_eq!(blocks.len(), 3);
    }
}