bee/swarm/
file_chunker.rs1use crate::swarm::bmt::{CHUNK_SIZE, SEGMENT_SIZE, calculate_chunk_address};
14use crate::swarm::errors::Error;
15use crate::swarm::typed_bytes::{Reference, SPAN_LENGTH, Span};
16
17pub const MAX_BRANCHES: usize = CHUNK_SIZE / SEGMENT_SIZE;
19
/// A pending reference held at one level of the chunk tree under construction.
#[derive(Debug, Clone)]
struct LevelRef {
    /// Address of the referenced chunk, as computed by `calculate_chunk_address`.
    addr: [u8; 32],
    /// Number of file bytes covered by the referenced chunk.
    span: u64,
}
25
26#[derive(Clone, Debug, PartialEq, Eq)]
29pub struct ChunkerRoot {
30 pub address: Reference,
32 pub span: Span,
34}
35
36type OnChunkCallback = Box<dyn FnMut(SealedChunk) -> Result<(), Error> + Send>;
38
39pub struct FileChunker {
47 on_chunk: Option<OnChunkCallback>,
48 leaf_buf: Vec<u8>,
49 levels: Vec<Vec<LevelRef>>,
50}
51
52#[derive(Clone, Debug, PartialEq, Eq)]
54pub struct SealedChunk {
55 pub address: Reference,
57 pub span: Span,
59 pub payload: Vec<u8>,
61}
62
63impl SealedChunk {
64 pub fn data(&self) -> Vec<u8> {
66 let mut out = Vec::with_capacity(SPAN_LENGTH + self.payload.len());
67 out.extend_from_slice(self.span.as_bytes());
68 out.extend_from_slice(&self.payload);
69 out
70 }
71}
72
73impl Default for FileChunker {
74 fn default() -> Self {
75 Self::new()
76 }
77}
78
79impl FileChunker {
80 pub fn new() -> Self {
83 Self {
84 on_chunk: None,
85 leaf_buf: Vec::new(),
86 levels: Vec::new(),
87 }
88 }
89
90 pub fn with_callback<F>(callback: F) -> Self
93 where
94 F: FnMut(SealedChunk) -> Result<(), Error> + Send + 'static,
95 {
96 Self {
97 on_chunk: Some(Box::new(callback)),
98 leaf_buf: Vec::new(),
99 levels: Vec::new(),
100 }
101 }
102
103 pub fn write(&mut self, data: &[u8]) -> Result<usize, Error> {
106 let mut remaining = data;
107 let mut written = 0;
108 while !remaining.is_empty() {
109 let room = CHUNK_SIZE - self.leaf_buf.len();
110 let take = remaining.len().min(room);
111 self.leaf_buf.extend_from_slice(&remaining[..take]);
112 remaining = &remaining[take..];
113 written += take;
114 if self.leaf_buf.len() == CHUNK_SIZE {
115 self.flush_leaf()?;
116 }
117 }
118 Ok(written)
119 }
120
121 pub fn finalize(mut self) -> Result<ChunkerRoot, Error> {
125 if self.levels.is_empty() && self.leaf_buf.is_empty() {
126 return Err(Error::argument("FileChunker: no input"));
127 }
128 if !self.leaf_buf.is_empty() {
129 self.flush_leaf()?;
130 }
131
132 let mut level = 0;
136 while level < self.levels.len() {
137 if level == self.levels.len() - 1 && self.levels[level].len() == 1 {
138 break;
139 }
140 if !self.levels[level].is_empty() {
141 self.collapse_level(level)?;
142 }
143 level += 1;
144 }
145
146 let root_level = self.levels.len() - 1;
147 let root = self.levels[root_level][0].clone();
148
149 Ok(ChunkerRoot {
150 address: Reference::new(&root.addr)?,
151 span: Span::from_u64(root.span),
152 })
153 }
154
155 fn flush_leaf(&mut self) -> Result<(), Error> {
156 let payload = std::mem::take(&mut self.leaf_buf);
157 if payload.is_empty() {
158 return Ok(());
159 }
160 let span = Span::from_u64(payload.len() as u64);
161
162 let mut full = Vec::with_capacity(SPAN_LENGTH + payload.len());
163 full.extend_from_slice(span.as_bytes());
164 full.extend_from_slice(&payload);
165 let addr = calculate_chunk_address(&full)?;
166
167 if let Some(cb) = self.on_chunk.as_mut() {
168 cb(SealedChunk {
169 address: Reference::new(&addr)?,
170 span,
171 payload: payload.clone(),
172 })?;
173 }
174
175 if self.levels.is_empty() {
176 self.levels.push(Vec::new());
177 }
178 self.levels[0].push(LevelRef {
179 addr,
180 span: payload.len() as u64,
181 });
182 if self.levels[0].len() == MAX_BRANCHES {
183 self.collapse_level(0)?;
184 }
185 Ok(())
186 }
187
188 fn collapse_level(&mut self, level: usize) -> Result<(), Error> {
189 let refs = std::mem::take(&mut self.levels[level]);
190 if refs.is_empty() {
191 return Ok(());
192 }
193
194 let mut payload = Vec::with_capacity(refs.len() * SEGMENT_SIZE);
195 let mut total_span = 0u64;
196 for r in &refs {
197 payload.extend_from_slice(&r.addr);
198 total_span += r.span;
199 }
200 let span = Span::from_u64(total_span);
201
202 let mut full = Vec::with_capacity(SPAN_LENGTH + payload.len());
203 full.extend_from_slice(span.as_bytes());
204 full.extend_from_slice(&payload);
205 let addr = calculate_chunk_address(&full)?;
206
207 if let Some(cb) = self.on_chunk.as_mut() {
208 cb(SealedChunk {
209 address: Reference::new(&addr)?,
210 span,
211 payload: payload.clone(),
212 })?;
213 }
214
215 if level + 1 >= self.levels.len() {
216 self.levels.push(Vec::new());
217 }
218 self.levels[level + 1].push(LevelRef {
219 addr,
220 span: total_span,
221 });
222 if self.levels[level + 1].len() == MAX_BRANCHES {
223 self.collapse_level(level + 1)?;
224 }
225 Ok(())
226 }
227}
228
229#[cfg(test)]
230mod tests {
231 use super::*;
232 use crate::swarm::bmt::make_content_addressed_chunk;
233
234 #[test]
235 fn single_chunk_matches_direct_cac() {
236 let mut chunker = FileChunker::new();
237 chunker.write(b"hello world").unwrap();
238 let root = chunker.finalize().unwrap();
239 let direct = make_content_addressed_chunk(b"hello world").unwrap();
240 assert_eq!(root.address, direct.address);
241 assert_eq!(root.span, direct.span);
242 }
243
244 #[test]
245 fn empty_input_errors() {
246 let chunker = FileChunker::new();
247 assert!(chunker.finalize().is_err());
248 }
249
250 #[test]
251 fn callback_fires_for_every_chunk() {
252 use std::sync::{Arc, Mutex};
255 let count = Arc::new(Mutex::new(0usize));
256 let count_clone = count.clone();
257 let mut chunker = FileChunker::with_callback(move |_c| {
258 *count_clone.lock().unwrap() += 1;
259 Ok(())
260 });
261 let payload = vec![0xabu8; CHUNK_SIZE * 2];
262 chunker.write(&payload).unwrap();
263 let _ = chunker.finalize().unwrap();
264 assert_eq!(*count.lock().unwrap(), 3);
266 }
267
268 #[test]
269 fn root_span_is_total_byte_count() {
270 let mut chunker = FileChunker::new();
271 let payload = vec![0xcdu8; CHUNK_SIZE * 2 + 10];
272 chunker.write(&payload).unwrap();
273 let root = chunker.finalize().unwrap();
274 assert_eq!(root.span.to_u64(), (CHUNK_SIZE * 2 + 10) as u64);
275 }
276}