//! Streaming directory upload + recursive Mantaray persistence.
//!
//! Mirrors bee-js `Bee.streamDirectory` (`utils/chunk-stream.ts`) and
//! the recursive `MantarayNode.saveRecursively` flow. Walks a
//! filesystem directory, content-addresses each file in-process via
//! [`FileChunker`], uploads the chunks via `POST /chunks` with
//! bounded concurrency (`N=64`), assembles a Mantaray manifest with
//! one fork per file, and persists the manifest with
//! [`FileApi::save_manifest_recursively`].
//!
//! The optional `on_progress` callback fires once per uploaded chunk
//! (matching bee-js's `onUploadProgress` shape) so callers can render
//! a progress bar without polling.

use std::collections::BTreeMap;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};

use bytes::Bytes;
use futures_util::stream::{FuturesUnordered, StreamExt};

use crate::api::{CollectionUploadOptions, RedundantUploadOptions, UploadOptions, UploadResult};
use crate::manifest::{MantarayNode, marshal};
use crate::swarm::file_chunker::{FileChunker, SealedChunk};
use crate::swarm::{BatchId, Error, Reference};

use super::FileApi;
use super::bzz::{CollectionEntry, read_directory_entries};

/// Per-chunk upload signal emitted by [`FileApi::stream_directory`].
/// Mirrors bee-js `UploadProgress`.
#[derive(Clone, Copy, Debug)]
pub struct StreamProgress {
    /// Total chunks the upload will produce (computed before upload
    /// starts, so the value is stable for the whole upload).
    pub total: usize,
    /// Chunks uploaded so far, counting the chunk this callback
    /// reports (i.e. the post-increment count).
    pub processed: usize,
}

44/// Boxed callback for [`StreamProgress`] events.
45pub type OnStreamProgressFn = Arc<dyn Fn(StreamProgress) + Send + Sync>;
46
47/// Default in-flight upload cap. Matches bee-js's `AsyncQueue(64, 64)`.
48const STREAM_CONCURRENCY: usize = 64;
49
50type ChunkUploadFuture = Pin<Box<dyn Future<Output = Result<UploadResult, Error>> + Send>>;
51
52impl FileApi {
53    /// Persist a [`MantarayNode`] tree recursively, depth-first.
54    ///
55    /// Mirrors bee-js `MantarayNode.saveRecursively` — each child is
56    /// uploaded first (so its `self_address` is populated), then the
57    /// node itself is marshaled and uploaded via `/bytes`. The
58    /// resulting reference is stored on the node's `self_address` and
59    /// returned to the caller.
60    pub async fn save_manifest_recursively(
61        &self,
62        node: &mut MantarayNode,
63        batch_id: &BatchId,
64        opts: Option<&UploadOptions>,
65    ) -> Result<UploadResult, Error> {
66        // Async recursion needs explicit boxing.
67        Box::pin(save_manifest_recursively_inner(self, node, batch_id, opts)).await
68    }
69
70    /// Stream a directory upload chunk-by-chunk.
71    ///
72    /// For each regular file under `dir`, content-addresses it via
73    /// [`FileChunker`], uploads the resulting chunks via `POST /chunks`
74    /// with up to `N=64` concurrent in-flight uploads, then assembles a
75    /// Mantaray manifest with one fork per file (path →
76    /// content-addressed root). Finally calls
77    /// [`FileApi::save_manifest_recursively`] to persist the manifest
78    /// and returns its reference.
79    ///
80    /// Mirrors bee-js `Bee.streamDirectory`. The `on_progress` callback
81    /// fires once per uploaded chunk with `(processed, total)` counts.
82    ///
83    /// Differences from bee-js:
84    /// - File contents are read fully into memory before being fed to
85    ///   the chunker. True file-streaming (read → seal → upload as a
86    ///   pipeline) can be added later if a real use case lands.
87    /// - Per-file metadata (Content-Type / Filename) is not yet set on
88    ///   the Mantaray fork — bee-js sets `Content-Type` from the file
89    ///   extension, but bee-rs leaves manifests metadata-free for now;
90    ///   see [`Self::upload_collection`] for the tar-based path that
91    ///   lets Bee infer types server-side.
92    pub async fn stream_directory(
93        &self,
94        batch_id: &BatchId,
95        dir: impl AsRef<Path>,
96        opts: Option<&CollectionUploadOptions>,
97        on_progress: Option<OnStreamProgressFn>,
98    ) -> Result<UploadResult, Error> {
99        let entries = read_directory_entries(dir.as_ref())?;
100        self.stream_collection_entries(batch_id, &entries, opts, on_progress)
101            .await
102    }
103
104    /// Same as [`Self::stream_directory`] but takes pre-built
105    /// in-memory entries instead of walking the filesystem.
106    pub async fn stream_collection_entries(
107        &self,
108        batch_id: &BatchId,
109        entries: &[CollectionEntry],
110        opts: Option<&CollectionUploadOptions>,
111        on_progress: Option<OnStreamProgressFn>,
112    ) -> Result<UploadResult, Error> {
113        // Pre-count chunks across every file so the progress callback
114        // can render a stable total. The chunker fan-out is fixed
115        // (CHUNK_SIZE leaves, MAX_BRANCHES = 128 per intermediate
116        // level), so the count is a pure function of file size.
117        let total_chunks: usize = entries.iter().map(|e| total_chunks_for(e.data.len())).sum();
118        let processed = Arc::new(AtomicUsize::new(0));
119
120        // Upload-options inherit from the collection's base options.
121        let upload_opts: Option<UploadOptions> = opts.map(|o| o.base.clone());
122
123        let mut manifest = MantarayNode::new();
124        let mut has_index_html = false;
125
126        for entry in entries {
127            // The chunker callback is `FnMut + Send + 'static`, so it
128            // can't borrow a stack-local Vec. Share an `Arc<Mutex<…>>`
129            // instead — locking is uncontended (callback fires
130            // synchronously from this thread) so the cost is just a
131            // pair of atomic ops per chunk.
132            let chunks_buf: Arc<Mutex<Vec<SealedChunk>>> = Arc::new(Mutex::new(Vec::new()));
133            {
134                let chunks_buf = Arc::clone(&chunks_buf);
135                let mut chunker = FileChunker::with_callback(move |sealed| {
136                    chunks_buf
137                        .lock()
138                        .map_err(|_| Error::argument("chunker buffer poisoned"))?
139                        .push(sealed);
140                    Ok(())
141                });
142                chunker.write(&entry.data)?;
143                let _root = chunker.finalize()?;
144            }
145            let chunks = Arc::try_unwrap(chunks_buf)
146                .map_err(|_| Error::argument("chunker buffer still shared"))?
147                .into_inner()
148                .map_err(|_| Error::argument("chunker buffer poisoned"))?;
149
150            // Compute file root from chunker output (last chunk in the
151            // collapsed level stack). FileChunker.finalize already
152            // returns it, so capture it before draining.
153            let root: Reference = chunks
154                .last()
155                .map(|c| c.address.clone())
156                .ok_or_else(|| Error::argument(format!("empty file: {}", entry.path)))?;
157
158            // Stream the chunks: bounded N concurrent uploads.
159            let mut inflight: FuturesUnordered<ChunkUploadFuture> = FuturesUnordered::new();
160
161            for sealed in chunks {
162                while inflight.len() >= STREAM_CONCURRENCY {
163                    if let Some(res) = inflight.next().await {
164                        res?;
165                        bump_progress(&processed, total_chunks, on_progress.as_ref());
166                    }
167                }
168                let api = self.clone();
169                let batch = *batch_id;
170                let opts_clone = upload_opts.clone();
171                let body: Bytes = Bytes::from(sealed.data());
172                let fut: ChunkUploadFuture =
173                    Box::pin(
174                        async move { api.upload_chunk(&batch, body, opts_clone.as_ref()).await },
175                    );
176                inflight.push(fut);
177            }
178            while let Some(res) = inflight.next().await {
179                res?;
180                bump_progress(&processed, total_chunks, on_progress.as_ref());
181            }
182
183            // Add a fork to the manifest pointing at the file's root
184            // chunk. Per bee-js, `Content-Type` / `Filename` would
185            // also be set here; we leave the metadata field empty for
186            // parity with `upload_collection_entries` until the live
187            // soak shows it matters.
188            manifest.add_fork(entry.path.as_bytes(), Some(&root), None);
189            if entry.path == "index.html" {
190                has_index_html = true;
191            }
192        }
193
194        // Apply index_document / error_document metadata on the root
195        // fork. Mirrors bee-js's `mantaray.addFork('/', NULL_ADDRESS, …)`
196        // path.
197        if has_index_html
198            || opts.and_then(|o| o.index_document.as_deref()).is_some()
199            || opts.and_then(|o| o.error_document.as_deref()).is_some()
200        {
201            let mut metadata: BTreeMap<String, String> = BTreeMap::new();
202            if let Some(idx) = opts.and_then(|o| o.index_document.as_deref()) {
203                metadata.insert("website-index-document".to_string(), idx.to_string());
204            } else if has_index_html {
205                metadata.insert(
206                    "website-index-document".to_string(),
207                    "index.html".to_string(),
208                );
209            }
210            if let Some(err) = opts.and_then(|o| o.error_document.as_deref()) {
211                metadata.insert("website-error-document".to_string(), err.to_string());
212            }
213            // The root metadata fork uses `/` as the path and a null
214            // target — Bee treats this as the manifest root's
215            // metadata.
216            manifest.add_fork(b"/", None, Some(&metadata));
217        }
218
219        self.save_manifest_recursively(&mut manifest, batch_id, upload_opts.as_ref())
220            .await
221    }
222}
223
224fn bump_progress(processed: &Arc<AtomicUsize>, total: usize, cb: Option<&OnStreamProgressFn>) {
225    let n = processed.fetch_add(1, Ordering::SeqCst) + 1;
226    if let Some(cb) = cb {
227        cb(StreamProgress {
228            total,
229            processed: n,
230        });
231    }
232}
233
234/// Total number of chunks (leaves + intermediates) a file of `len`
235/// bytes will produce in the Swarm BMT chunker. Matches bee-js
236/// `totalChunks(size)` from `utils/chunk-size.ts`.
237fn total_chunks_for(len: usize) -> usize {
238    use crate::swarm::bmt::CHUNK_SIZE;
239    use crate::swarm::file_chunker::MAX_BRANCHES;
240
241    if len == 0 {
242        return 0;
243    }
244    let mut leaves = len.div_ceil(CHUNK_SIZE);
245    let mut total = leaves;
246    while leaves > 1 {
247        leaves = leaves.div_ceil(MAX_BRANCHES);
248        total += leaves;
249    }
250    total
251}
252
253async fn save_manifest_recursively_inner(
254    api: &FileApi,
255    node: &mut MantarayNode,
256    batch_id: &BatchId,
257    opts: Option<&UploadOptions>,
258) -> Result<UploadResult, Error> {
259    // Save children first so each fork's self_address is populated
260    // before we marshal this node.
261    for fork in node.forks.values_mut() {
262        let result = Box::pin(save_manifest_recursively_inner(
263            api,
264            &mut fork.node,
265            batch_id,
266            opts,
267        ))
268        .await?;
269        fork.node.self_address = Some(reference_to_self_address(&result.reference)?);
270    }
271    let bytes = marshal(node)?;
272    // upload_data takes RedundantUploadOptions (UploadOptions + a
273    // redundancy level). Wrap the caller's UploadOptions so we get
274    // the same pin/encrypt/act/tag/deferred behavior.
275    let redundant = opts.map(|o| RedundantUploadOptions {
276        base: o.clone(),
277        redundancy_level: None,
278    });
279    let result = api.upload_data(batch_id, bytes, redundant.as_ref()).await?;
280    node.self_address = Some(reference_to_self_address(&result.reference)?);
281    Ok(result)
282}
283
284/// Truncate a reference (32 or 64 bytes) to the 32-byte chunk address
285/// that goes into [`MantarayNode::self_address`]. For an encrypted
286/// 64-byte reference, the first 32 bytes are the CAC address; the
287/// trailing 32 bytes are the encryption key, which Mantaray stores
288/// elsewhere.
289fn reference_to_self_address(reference: &Reference) -> Result<[u8; 32], Error> {
290    reference
291        .as_bytes()
292        .first_chunk::<32>()
293        .copied()
294        .ok_or_else(|| Error::argument("manifest child reference < 32 bytes"))
295}
296
#[cfg(test)]
mod tests {
    use super::*;

301    #[test]
302    fn total_chunks_for_matches_known_sizes() {
303        // Empty → 0.
304        assert_eq!(total_chunks_for(0), 0);
305        // Single leaf, no intermediate.
306        assert_eq!(total_chunks_for(1), 1);
307        assert_eq!(total_chunks_for(4096), 1);
308        // 4097 bytes → 2 leaves + 1 intermediate.
309        assert_eq!(total_chunks_for(4097), 3);
310        // Exactly 128 leaves (one full intermediate level).
311        let n = 4096 * 128;
312        assert_eq!(total_chunks_for(n), 128 + 1);
313        // 129 leaves → 2 intermediates + 1 root.
314        let n = 4096 * 129;
315        assert_eq!(total_chunks_for(n), 129 + 2 + 1);
316    }
317}