// bee/file/stream.rs
//! Streaming directory upload + recursive Mantaray persistence.
//!
//! Mirrors bee-js `Bee.streamDirectory` (`utils/chunk-stream.ts`) and
//! the recursive `MantarayNode.saveRecursively` flow. Walks a
//! filesystem directory, content-addresses each file in-process via
//! [`FileChunker`], uploads the chunks via `POST /chunks` with
//! bounded concurrency (`N=64`), assembles a Mantaray manifest with
//! one fork per file, and persists the manifest with
//! [`FileApi::save_manifest_recursively`].
//!
//! The optional `on_progress` callback fires once per uploaded chunk
//! (matching bee-js's `onUploadProgress` shape) so callers can render
//! a progress bar without polling.
14
15use std::collections::BTreeMap;
16use std::path::Path;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::sync::Mutex;
20use std::sync::atomic::{AtomicUsize, Ordering};
21
22use bytes::Bytes;
23use futures_util::stream::{FuturesUnordered, StreamExt};
24
25use crate::api::{CollectionUploadOptions, RedundantUploadOptions, UploadOptions, UploadResult};
26use crate::manifest::{MantarayNode, marshal};
27use crate::swarm::file_chunker::{FileChunker, SealedChunk};
28use crate::swarm::{BatchId, Error, Reference};
29
30use super::FileApi;
31use super::bzz::{CollectionEntry, read_directory_entries};
32
/// Per-chunk upload signal emitted by [`FileApi::stream_directory`].
/// Mirrors bee-js `UploadProgress`.
#[derive(Clone, Copy, Debug)]
pub struct StreamProgress {
    /// Total chunks the upload will produce. Computed once, before
    /// the upload starts, so it is stable for the whole run.
    pub total: usize,
    /// Chunks uploaded so far, *including* the chunk this event
    /// reports (i.e. the counter value after the increment).
    pub processed: usize,
}
43
/// Shared callback for [`StreamProgress`] events. `Arc` so it can be
/// cloned cheaply; `Send + Sync` so it may cross task boundaries.
pub type OnStreamProgressFn = Arc<dyn Fn(StreamProgress) + Send + Sync>;
46
/// Default in-flight upload cap. Matches bee-js's `AsyncQueue(64, 64)`.
const STREAM_CONCURRENCY: usize = 64;

/// Boxed, pinned chunk-upload future, so heterogeneous `async move`
/// blocks share one concrete type inside `FuturesUnordered`.
// NOTE(review): `Future` is not imported here — this resolves via the
// Rust 2024 prelude; earlier editions need `use std::future::Future`.
// Confirm the crate's edition.
type ChunkUploadFuture = Pin<Box<dyn Future<Output = Result<UploadResult, Error>> + Send>>;
51
impl FileApi {
    /// Persist a [`MantarayNode`] tree recursively, depth-first.
    ///
    /// Mirrors bee-js `MantarayNode.saveRecursively` — each child is
    /// uploaded first (so its `self_address` is populated), then the
    /// node itself is marshaled and uploaded via `/bytes`. The
    /// resulting reference is stored on the node's `self_address` and
    /// returned to the caller.
    ///
    /// # Errors
    ///
    /// Propagates any marshal or upload failure from the recursive
    /// walk.
    pub async fn save_manifest_recursively(
        &self,
        node: &mut MantarayNode,
        batch_id: &BatchId,
        opts: Option<&UploadOptions>,
    ) -> Result<UploadResult, Error> {
        // Async recursion needs explicit boxing — a plain recursive
        // `async fn` would have an infinitely-sized future type.
        Box::pin(save_manifest_recursively_inner(self, node, batch_id, opts)).await
    }

    /// Stream a directory upload chunk-by-chunk.
    ///
    /// For each regular file under `dir`, content-addresses it via
    /// [`FileChunker`], uploads the resulting chunks via `POST /chunks`
    /// with up to `N=64` concurrent in-flight uploads, then assembles a
    /// Mantaray manifest with one fork per file (path →
    /// content-addressed root). Finally calls
    /// [`FileApi::save_manifest_recursively`] to persist the manifest
    /// and returns its reference.
    ///
    /// Mirrors bee-js `Bee.streamDirectory`. The `on_progress` callback
    /// fires once per uploaded chunk with `(processed, total)` counts.
    ///
    /// Differences from bee-js:
    /// - File contents are read fully into memory before being fed to
    ///   the chunker. True file-streaming (read → seal → upload as a
    ///   pipeline) can be added later if a real use case lands.
    /// - Per-file metadata (Content-Type / Filename) is not yet set on
    ///   the Mantaray fork — bee-js sets `Content-Type` from the file
    ///   extension, but bee-rs leaves manifests metadata-free for now;
    ///   see [`Self::upload_collection`] for the tar-based path that
    ///   lets Bee infer types server-side.
    pub async fn stream_directory(
        &self,
        batch_id: &BatchId,
        dir: impl AsRef<Path>,
        opts: Option<&CollectionUploadOptions>,
        on_progress: Option<OnStreamProgressFn>,
    ) -> Result<UploadResult, Error> {
        // Materialize the directory walk, then reuse the in-memory
        // entry path below.
        let entries = read_directory_entries(dir.as_ref())?;
        self.stream_collection_entries(batch_id, &entries, opts, on_progress)
            .await
    }

    /// Same as [`Self::stream_directory`] but takes pre-built
    /// in-memory entries instead of walking the filesystem.
    ///
    /// # Errors
    ///
    /// Fails on chunker errors, any chunk-upload failure, an empty
    /// file entry (see the NOTE below), or a manifest persistence
    /// failure.
    pub async fn stream_collection_entries(
        &self,
        batch_id: &BatchId,
        entries: &[CollectionEntry],
        opts: Option<&CollectionUploadOptions>,
        on_progress: Option<OnStreamProgressFn>,
    ) -> Result<UploadResult, Error> {
        // Pre-count chunks across every file so the progress callback
        // can render a stable total. The chunker fan-out is fixed
        // (CHUNK_SIZE leaves, MAX_BRANCHES = 128 per intermediate
        // level), so the count is a pure function of file size.
        let total_chunks: usize = entries.iter().map(|e| total_chunks_for(e.data.len())).sum();
        let processed = Arc::new(AtomicUsize::new(0));

        // Upload-options inherit from the collection's base options.
        let upload_opts: Option<UploadOptions> = opts.map(|o| o.base.clone());

        let mut manifest = MantarayNode::new();
        let mut has_index_html = false;

        for entry in entries {
            // The chunker callback is `FnMut + Send + 'static`, so it
            // can't borrow a stack-local Vec. Share an `Arc<Mutex<…>>`
            // instead — locking is uncontended (callback fires
            // synchronously from this thread) so the cost is just a
            // pair of atomic ops per chunk.
            let chunks_buf: Arc<Mutex<Vec<SealedChunk>>> = Arc::new(Mutex::new(Vec::new()));
            {
                // Scope the chunker (and its Arc clone) so the buffer
                // is uniquely owned again at `try_unwrap` below.
                let chunks_buf = Arc::clone(&chunks_buf);
                let mut chunker = FileChunker::with_callback(move |sealed| {
                    chunks_buf
                        .lock()
                        .map_err(|_| Error::argument("chunker buffer poisoned"))?
                        .push(sealed);
                    Ok(())
                });
                chunker.write(&entry.data)?;
                // NOTE(review): finalize()'s returned root is discarded
                // here; the code below re-derives it from the last
                // sealed chunk instead.
                let _root = chunker.finalize()?;
            }
            let chunks = Arc::try_unwrap(chunks_buf)
                .map_err(|_| Error::argument("chunker buffer still shared"))?
                .into_inner()
                .map_err(|_| Error::argument("chunker buffer poisoned"))?;

            // Compute file root from chunker output (last chunk in the
            // collapsed level stack). FileChunker.finalize already
            // returns it, so capture it before draining.
            // NOTE(review): a zero-byte file yields no chunks and
            // hard-errors here — confirm bee-js parity for empty
            // entries (e.g. `.gitkeep`).
            let root: Reference = chunks
                .last()
                .map(|c| c.address.clone())
                .ok_or_else(|| Error::argument(format!("empty file: {}", entry.path)))?;

            // Stream the chunks: bounded N concurrent uploads.
            let mut inflight: FuturesUnordered<ChunkUploadFuture> = FuturesUnordered::new();

            for sealed in chunks {
                // Backpressure: don't enqueue a new upload until the
                // in-flight set drops below the cap. Any upload error
                // aborts the whole collection upload.
                while inflight.len() >= STREAM_CONCURRENCY {
                    if let Some(res) = inflight.next().await {
                        res?;
                        bump_progress(&processed, total_chunks, on_progress.as_ref());
                    }
                }
                // Each future owns clones of the API handle, batch id,
                // options, and chunk body so it is `'static`.
                let api = self.clone();
                let batch = *batch_id;
                let opts_clone = upload_opts.clone();
                let body: Bytes = Bytes::from(sealed.data());
                let fut: ChunkUploadFuture =
                    Box::pin(
                        async move { api.upload_chunk(&batch, body, opts_clone.as_ref()).await },
                    );
                inflight.push(fut);
            }
            // Drain the remaining in-flight uploads before moving on
            // to the next file.
            while let Some(res) = inflight.next().await {
                res?;
                bump_progress(&processed, total_chunks, on_progress.as_ref());
            }

            // Add a fork to the manifest pointing at the file's root
            // chunk. Per bee-js, `Content-Type` / `Filename` would
            // also be set here; we leave the metadata field empty for
            // parity with `upload_collection_entries` until the live
            // soak shows it matters.
            manifest.add_fork(entry.path.as_bytes(), Some(&root), None);
            // Only a top-level `index.html` (exact path match) enables
            // the implicit website-index metadata below.
            if entry.path == "index.html" {
                has_index_html = true;
            }
        }

        // Apply index_document / error_document metadata on the root
        // fork. Mirrors bee-js's `mantaray.addFork('/', NULL_ADDRESS, …)`
        // path. An explicit `index_document` option takes precedence
        // over the implicit `index.html` detection.
        if has_index_html
            || opts.and_then(|o| o.index_document.as_deref()).is_some()
            || opts.and_then(|o| o.error_document.as_deref()).is_some()
        {
            let mut metadata: BTreeMap<String, String> = BTreeMap::new();
            if let Some(idx) = opts.and_then(|o| o.index_document.as_deref()) {
                metadata.insert("website-index-document".to_string(), idx.to_string());
            } else if has_index_html {
                metadata.insert(
                    "website-index-document".to_string(),
                    "index.html".to_string(),
                );
            }
            if let Some(err) = opts.and_then(|o| o.error_document.as_deref()) {
                metadata.insert("website-error-document".to_string(), err.to_string());
            }
            // The root metadata fork uses `/` as the path and a null
            // target — Bee treats this as the manifest root's
            // metadata.
            manifest.add_fork(b"/", None, Some(&metadata));
        }

        self.save_manifest_recursively(&mut manifest, batch_id, upload_opts.as_ref())
            .await
    }
}
223
224fn bump_progress(processed: &Arc<AtomicUsize>, total: usize, cb: Option<&OnStreamProgressFn>) {
225 let n = processed.fetch_add(1, Ordering::SeqCst) + 1;
226 if let Some(cb) = cb {
227 cb(StreamProgress {
228 total,
229 processed: n,
230 });
231 }
232}
233
234/// Total number of chunks (leaves + intermediates) a file of `len`
235/// bytes will produce in the Swarm BMT chunker. Matches bee-js
236/// `totalChunks(size)` from `utils/chunk-size.ts`.
237fn total_chunks_for(len: usize) -> usize {
238 use crate::swarm::bmt::CHUNK_SIZE;
239 use crate::swarm::file_chunker::MAX_BRANCHES;
240
241 if len == 0 {
242 return 0;
243 }
244 let mut leaves = len.div_ceil(CHUNK_SIZE);
245 let mut total = leaves;
246 while leaves > 1 {
247 leaves = leaves.div_ceil(MAX_BRANCHES);
248 total += leaves;
249 }
250 total
251}
252
253async fn save_manifest_recursively_inner(
254 api: &FileApi,
255 node: &mut MantarayNode,
256 batch_id: &BatchId,
257 opts: Option<&UploadOptions>,
258) -> Result<UploadResult, Error> {
259 // Save children first so each fork's self_address is populated
260 // before we marshal this node.
261 for fork in node.forks.values_mut() {
262 let result = Box::pin(save_manifest_recursively_inner(
263 api,
264 &mut fork.node,
265 batch_id,
266 opts,
267 ))
268 .await?;
269 fork.node.self_address = Some(reference_to_self_address(&result.reference)?);
270 }
271 let bytes = marshal(node)?;
272 // upload_data takes RedundantUploadOptions (UploadOptions + a
273 // redundancy level). Wrap the caller's UploadOptions so we get
274 // the same pin/encrypt/act/tag/deferred behavior.
275 let redundant = opts.map(|o| RedundantUploadOptions {
276 base: o.clone(),
277 redundancy_level: None,
278 });
279 let result = api.upload_data(batch_id, bytes, redundant.as_ref()).await?;
280 node.self_address = Some(reference_to_self_address(&result.reference)?);
281 Ok(result)
282}
283
284/// Truncate a reference (32 or 64 bytes) to the 32-byte chunk address
285/// that goes into [`MantarayNode::self_address`]. For an encrypted
286/// 64-byte reference, the first 32 bytes are the CAC address; the
287/// trailing 32 bytes are the encryption key, which Mantaray stores
288/// elsewhere.
289fn reference_to_self_address(reference: &Reference) -> Result<[u8; 32], Error> {
290 reference
291 .as_bytes()
292 .first_chunk::<32>()
293 .copied()
294 .ok_or_else(|| Error::argument("manifest child reference < 32 bytes"))
295}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// Table-driven check of chunk counts at the interesting size
    /// boundaries (empty, single leaf, first intermediate, full
    /// level, level overflow).
    #[test]
    fn total_chunks_for_matches_known_sizes() {
        let cases: [(usize, usize); 6] = [
            // Empty → 0.
            (0, 0),
            // Single leaf, no intermediate.
            (1, 1),
            (4096, 1),
            // 4097 bytes → 2 leaves + 1 intermediate.
            (4097, 3),
            // Exactly 128 leaves (one full intermediate level) + root.
            (4096 * 128, 129),
            // 129 leaves → 2 intermediates + 1 root.
            (4096 * 129, 132),
        ];
        for (len, expected) in cases {
            assert_eq!(total_chunks_for(len), expected, "len={len}");
        }
    }
}
317}