Skip to main content

bee/swarm/
file_chunker.rs

//! Streaming Swarm content-addressed chunker. Mirrors bee-go's
//! `pkg/swarm/file_chunker.go` and the bee-js `MerkleTree` chunker
//! from `cafe-utility`.
//!
//! [`FileChunker`] turns an arbitrary byte stream into a tree of
//! content-addressed chunks (CACs) of at most 4 KiB each. Leaves carry
//! the raw payload; intermediate chunks carry a flat list of child
//! addresses (32 bytes each). The fan-out at every level is
//! [`MAX_BRANCHES`] = `4096 / 32` = 128. Callers stream input via
//! [`FileChunker::write`] and finish with [`FileChunker::finalize`],
//! which returns the root chunk address.
12
13use crate::swarm::bmt::{CHUNK_SIZE, SEGMENT_SIZE, calculate_chunk_address};
14use crate::swarm::errors::Error;
15use crate::swarm::typed_bytes::{Reference, SPAN_LENGTH, Span};
16
17/// Fan-out at every intermediate level (`4096 / 32 = 128`).
18pub const MAX_BRANCHES: usize = CHUNK_SIZE / SEGMENT_SIZE;
19
20#[derive(Clone, Debug)]
21struct LevelRef {
22    addr: [u8; 32],
23    span: u64,
24}
25
26/// Result of finalizing a chunker: the root address plus the total
27/// number of bytes covered by the tree (the root span).
28#[derive(Clone, Debug, PartialEq, Eq)]
29pub struct ChunkerRoot {
30    /// Root address of the chunk tree.
31    pub address: Reference,
32    /// Span of the root chunk: total bytes covered by the tree.
33    pub span: Span,
34}
35
36/// Type alias for the on-chunk callback.
37type OnChunkCallback = Box<dyn FnMut(SealedChunk) -> Result<(), Error> + Send>;
38
39/// Streaming content-addressed chunker.
40///
41/// Construct one via [`FileChunker::new`], push bytes with
42/// [`FileChunker::write`], and call [`FileChunker::finalize`] to get
43/// the root address. Optionally pass an `on_chunk` callback to
44/// [`FileChunker::with_callback`] to be notified as each chunk is
45/// sealed (useful for streaming uploads).
46pub struct FileChunker {
47    on_chunk: Option<OnChunkCallback>,
48    leaf_buf: Vec<u8>,
49    levels: Vec<Vec<LevelRef>>,
50}
51
52/// Snapshot of a sealed chunk, passed to the `on_chunk` callback.
53#[derive(Clone, Debug, PartialEq, Eq)]
54pub struct SealedChunk {
55    /// Chunk address.
56    pub address: Reference,
57    /// Chunk span (8-byte LE u64).
58    pub span: Span,
59    /// Chunk payload (≤ `CHUNK_SIZE` bytes).
60    pub payload: Vec<u8>,
61}
62
63impl SealedChunk {
64    /// Wire form: `span (8) || payload`.
65    pub fn data(&self) -> Vec<u8> {
66        let mut out = Vec::with_capacity(SPAN_LENGTH + self.payload.len());
67        out.extend_from_slice(self.span.as_bytes());
68        out.extend_from_slice(&self.payload);
69        out
70    }
71}
72
73impl Default for FileChunker {
74    fn default() -> Self {
75        Self::new()
76    }
77}
78
79impl FileChunker {
80    /// New chunker without an on-chunk callback. Use this for offline
81    /// hashing.
82    pub fn new() -> Self {
83        Self {
84            on_chunk: None,
85            leaf_buf: Vec::new(),
86            levels: Vec::new(),
87        }
88    }
89
90    /// New chunker with an on-chunk callback. The callback runs
91    /// synchronously as each leaf or intermediate chunk is sealed.
92    pub fn with_callback<F>(callback: F) -> Self
93    where
94        F: FnMut(SealedChunk) -> Result<(), Error> + Send + 'static,
95    {
96        Self {
97            on_chunk: Some(Box::new(callback)),
98            leaf_buf: Vec::new(),
99            levels: Vec::new(),
100        }
101    }
102
103    /// Append bytes to the input stream. As each leaf reaches
104    /// [`CHUNK_SIZE`] it is sealed and propagated up the level stack.
105    pub fn write(&mut self, data: &[u8]) -> Result<usize, Error> {
106        let mut remaining = data;
107        let mut written = 0;
108        while !remaining.is_empty() {
109            let room = CHUNK_SIZE - self.leaf_buf.len();
110            let take = remaining.len().min(room);
111            self.leaf_buf.extend_from_slice(&remaining[..take]);
112            remaining = &remaining[take..];
113            written += take;
114            if self.leaf_buf.len() == CHUNK_SIZE {
115                self.flush_leaf()?;
116            }
117        }
118        Ok(written)
119    }
120
121    /// Seal any trailing partial leaf, collapse the level stack, and
122    /// return the root chunk's address + span. An empty chunker
123    /// (zero bytes written) is rejected as it has no valid root.
124    pub fn finalize(mut self) -> Result<ChunkerRoot, Error> {
125        if self.levels.is_empty() && self.leaf_buf.is_empty() {
126            return Err(Error::argument("FileChunker: no input"));
127        }
128        if !self.leaf_buf.is_empty() {
129            self.flush_leaf()?;
130        }
131
132        // Collapse the level stack from the bottom up. Skip levels
133        // that are already empty; for the highest level holding a
134        // single ref, that ref is the root.
135        let mut level = 0;
136        while level < self.levels.len() {
137            if level == self.levels.len() - 1 && self.levels[level].len() == 1 {
138                break;
139            }
140            if !self.levels[level].is_empty() {
141                self.collapse_level(level)?;
142            }
143            level += 1;
144        }
145
146        let root_level = self.levels.len() - 1;
147        let root = self.levels[root_level][0].clone();
148
149        Ok(ChunkerRoot {
150            address: Reference::new(&root.addr)?,
151            span: Span::from_u64(root.span),
152        })
153    }
154
155    fn flush_leaf(&mut self) -> Result<(), Error> {
156        let payload = std::mem::take(&mut self.leaf_buf);
157        if payload.is_empty() {
158            return Ok(());
159        }
160        let span = Span::from_u64(payload.len() as u64);
161
162        let mut full = Vec::with_capacity(SPAN_LENGTH + payload.len());
163        full.extend_from_slice(span.as_bytes());
164        full.extend_from_slice(&payload);
165        let addr = calculate_chunk_address(&full)?;
166
167        if let Some(cb) = self.on_chunk.as_mut() {
168            cb(SealedChunk {
169                address: Reference::new(&addr)?,
170                span,
171                payload: payload.clone(),
172            })?;
173        }
174
175        if self.levels.is_empty() {
176            self.levels.push(Vec::new());
177        }
178        self.levels[0].push(LevelRef {
179            addr,
180            span: payload.len() as u64,
181        });
182        if self.levels[0].len() == MAX_BRANCHES {
183            self.collapse_level(0)?;
184        }
185        Ok(())
186    }
187
188    fn collapse_level(&mut self, level: usize) -> Result<(), Error> {
189        let refs = std::mem::take(&mut self.levels[level]);
190        if refs.is_empty() {
191            return Ok(());
192        }
193
194        let mut payload = Vec::with_capacity(refs.len() * SEGMENT_SIZE);
195        let mut total_span = 0u64;
196        for r in &refs {
197            payload.extend_from_slice(&r.addr);
198            total_span += r.span;
199        }
200        let span = Span::from_u64(total_span);
201
202        let mut full = Vec::with_capacity(SPAN_LENGTH + payload.len());
203        full.extend_from_slice(span.as_bytes());
204        full.extend_from_slice(&payload);
205        let addr = calculate_chunk_address(&full)?;
206
207        if let Some(cb) = self.on_chunk.as_mut() {
208            cb(SealedChunk {
209                address: Reference::new(&addr)?,
210                span,
211                payload: payload.clone(),
212            })?;
213        }
214
215        if level + 1 >= self.levels.len() {
216            self.levels.push(Vec::new());
217        }
218        self.levels[level + 1].push(LevelRef {
219            addr,
220            span: total_span,
221        });
222        if self.levels[level + 1].len() == MAX_BRANCHES {
223            self.collapse_level(level + 1)?;
224        }
225        Ok(())
226    }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use crate::swarm::bmt::make_content_addressed_chunk;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Push `data` through a fresh chunker in a single `write` and
    /// finalize it.
    fn chunk_all(data: &[u8]) -> ChunkerRoot {
        let mut chunker = FileChunker::new();
        chunker.write(data).unwrap();
        chunker.finalize().unwrap()
    }

    #[test]
    fn single_chunk_matches_direct_cac() {
        let root = chunk_all(b"hello world");
        let expected = make_content_addressed_chunk(b"hello world").unwrap();
        assert_eq!(root.address, expected.address);
        assert_eq!(root.span, expected.span);
    }

    #[test]
    fn empty_input_errors() {
        let result = FileChunker::new().finalize();
        assert!(result.is_err());
    }

    #[test]
    fn callback_fires_for_every_chunk() {
        // Two full leaves sealed under a single parent: three callbacks.
        // An `Arc<AtomicUsize>` satisfies the callback's `Send + 'static`
        // bounds without needing a lock.
        let seen = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&seen);
        let mut chunker = FileChunker::with_callback(move |_chunk| {
            counter.fetch_add(1, Ordering::SeqCst);
            Ok(())
        });
        chunker.write(&vec![0xab_u8; 2 * CHUNK_SIZE]).unwrap();
        chunker.finalize().unwrap();
        assert_eq!(seen.load(Ordering::SeqCst), 3);
    }

    #[test]
    fn root_span_is_total_byte_count() {
        let total = 2 * CHUNK_SIZE + 10;
        let root = chunk_all(&vec![0xcd_u8; total]);
        assert_eq!(root.span.to_u64(), total as u64);
    }
}