use std::collections::BTreeMap;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use bytes::Bytes;
use futures_util::stream::{FuturesUnordered, StreamExt};
use crate::api::{CollectionUploadOptions, RedundantUploadOptions, UploadOptions, UploadResult};
use crate::manifest::{MantarayNode, marshal};
use crate::swarm::file_chunker::{FileChunker, SealedChunk};
use crate::swarm::{BatchId, Error, Reference};
use super::FileApi;
use super::bzz::{CollectionEntry, read_directory_entries};
/// Snapshot of streaming-upload progress, reported once per uploaded chunk.
#[derive(Clone, Copy, Debug)]
pub struct StreamProgress {
    // Total number of chunks expected for the whole collection
    // (precomputed via `total_chunks_for` before any upload starts).
    pub total: usize,
    // Number of chunks uploaded so far, including the one just completed.
    pub processed: usize,
}
/// Progress callback invoked after each chunk upload completes; `Arc` so it
/// can be shared across concurrent upload tasks.
pub type OnStreamProgressFn = Arc<dyn Fn(StreamProgress) + Send + Sync>;
/// Upper bound on chunk uploads kept in flight at once per file.
const STREAM_CONCURRENCY: usize = 64;
// Boxed, type-erased future for a single chunk upload so differently-shaped
// async blocks can share one `FuturesUnordered` queue.
// NOTE(review): `Future` has no explicit `use` here — it resolves via the
// 2024-edition prelude; add `use std::future::Future;` if this crate ever
// targets an older edition.
type ChunkUploadFuture = Pin<Box<dyn Future<Output = Result<UploadResult, Error>> + Send>>;
impl FileApi {
    /// Uploads `node` and every descendant manifest node, children first, and
    /// returns the upload result of the root node.
    ///
    /// Thin wrapper around [`save_manifest_recursively_inner`]; the
    /// `Box::pin` heap-allocates the recursive future so this `async fn`'s
    /// state machine stays a fixed size.
    pub async fn save_manifest_recursively(
        &self,
        node: &mut MantarayNode,
        batch_id: &BatchId,
        opts: Option<&UploadOptions>,
    ) -> Result<UploadResult, Error> {
        Box::pin(save_manifest_recursively_inner(self, node, batch_id, opts)).await
    }

    /// Streams a directory tree to the network as a collection.
    ///
    /// Reads all entries under `dir` eagerly via [`read_directory_entries`],
    /// then delegates to [`Self::stream_collection_entries`].
    pub async fn stream_directory(
        &self,
        batch_id: &BatchId,
        dir: impl AsRef<Path>,
        opts: Option<&CollectionUploadOptions>,
        on_progress: Option<OnStreamProgressFn>,
    ) -> Result<UploadResult, Error> {
        let entries = read_directory_entries(dir.as_ref())?;
        self.stream_collection_entries(batch_id, &entries, opts, on_progress)
            .await
    }

    /// Uploads a set of in-memory files as a single collection manifest.
    ///
    /// Per entry: the file is chunked locally, every sealed chunk is uploaded
    /// with at most [`STREAM_CONCURRENCY`] uploads in flight, and the file's
    /// root reference is recorded as a manifest fork under its path. After
    /// all entries are processed, website metadata (index/error documents) is
    /// attached when configured, and the manifest trie itself is uploaded.
    ///
    /// `on_progress`, when provided, is invoked once per uploaded chunk with
    /// running totals. The first failed chunk upload aborts the whole stream.
    pub async fn stream_collection_entries(
        &self,
        batch_id: &BatchId,
        entries: &[CollectionEntry],
        opts: Option<&CollectionUploadOptions>,
        on_progress: Option<OnStreamProgressFn>,
    ) -> Result<UploadResult, Error> {
        // Precompute the chunk total so progress reports have a stable
        // denominator. NOTE(review): assumes `total_chunks_for` matches the
        // number of sealed chunks `FileChunker` actually emits per file —
        // confirm against the chunker implementation.
        let total_chunks: usize = entries.iter().map(|e| total_chunks_for(e.data.len())).sum();
        let processed = Arc::new(AtomicUsize::new(0));
        let upload_opts: Option<UploadOptions> = opts.map(|o| o.base.clone());
        let mut manifest = MantarayNode::new();
        let mut has_index_html = false;
        for entry in entries {
            // The chunker callback is a `move` closure, so the chunk buffer
            // is shared through an Arc<Mutex<..>> and unwrapped again as soon
            // as the chunker (and its closure) is dropped below.
            let chunks_buf: Arc<Mutex<Vec<SealedChunk>>> = Arc::new(Mutex::new(Vec::new()));
            {
                let chunks_buf = Arc::clone(&chunks_buf);
                let mut chunker = FileChunker::with_callback(move |sealed| {
                    chunks_buf
                        .lock()
                        .map_err(|_| Error::argument("chunker buffer poisoned"))?
                        .push(sealed);
                    Ok(())
                });
                chunker.write(&entry.data)?;
                // NOTE(review): the root returned by `finalize()` is
                // discarded; the root is instead recovered from the last
                // sealed chunk below — consider using this value directly.
                let _root = chunker.finalize()?;
            }
            // The scope above dropped the only clone of the Arc, so this
            // unwrap cannot observe another live handle.
            let chunks = Arc::try_unwrap(chunks_buf)
                .map_err(|_| Error::argument("chunker buffer still shared"))?
                .into_inner()
                .map_err(|_| Error::argument("chunker buffer poisoned"))?;
            // NOTE(review): assumes the chunker seals chunks bottom-up so the
            // final sealed chunk is the file's root — confirm. Empty files
            // produce no chunks and are rejected here.
            let root: Reference = chunks
                .last()
                .map(|c| c.address.clone())
                .ok_or_else(|| Error::argument(format!("empty file: {}", entry.path)))?;
            // Bounded-concurrency upload window over this file's chunks.
            let mut inflight: FuturesUnordered<ChunkUploadFuture> = FuturesUnordered::new();
            for sealed in chunks {
                // Backpressure: wait for an upload slot before pushing more.
                while inflight.len() >= STREAM_CONCURRENCY {
                    if let Some(res) = inflight.next().await {
                        res?;
                        bump_progress(&processed, total_chunks, on_progress.as_ref());
                    }
                }
                // Each task owns clones of everything it needs so the future
                // can be 'static and run independently.
                let api = self.clone();
                let batch = *batch_id;
                let opts_clone = upload_opts.clone();
                let body: Bytes = Bytes::from(sealed.data());
                let fut: ChunkUploadFuture =
                    Box::pin(
                        async move { api.upload_chunk(&batch, body, opts_clone.as_ref()).await },
                    );
                inflight.push(fut);
            }
            // Drain the remaining in-flight uploads for this entry.
            while let Some(res) = inflight.next().await {
                res?;
                bump_progress(&processed, total_chunks, on_progress.as_ref());
            }
            manifest.add_fork(entry.path.as_bytes(), Some(&root), None);
            if entry.path == "index.html" {
                has_index_html = true;
            }
        }
        // Attach website metadata at the manifest root when an index/error
        // document is configured, or when an `index.html` entry implies one.
        if has_index_html
            || opts.and_then(|o| o.index_document.as_deref()).is_some()
            || opts.and_then(|o| o.error_document.as_deref()).is_some()
        {
            let mut metadata: BTreeMap<String, String> = BTreeMap::new();
            // An explicitly configured index document wins over the implicit
            // `index.html` entry.
            if let Some(idx) = opts.and_then(|o| o.index_document.as_deref()) {
                metadata.insert("website-index-document".to_string(), idx.to_string());
            } else if has_index_html {
                metadata.insert(
                    "website-index-document".to_string(),
                    "index.html".to_string(),
                );
            }
            if let Some(err) = opts.and_then(|o| o.error_document.as_deref()) {
                metadata.insert("website-error-document".to_string(), err.to_string());
            }
            manifest.add_fork(b"/", None, Some(&metadata));
        }
        // Finally persist the manifest trie itself (children before parents).
        self.save_manifest_recursively(&mut manifest, batch_id, upload_opts.as_ref())
            .await
    }
}
/// Records one completed chunk upload and notifies the optional progress
/// callback with the updated counts.
///
/// `processed` is the shared counter of chunks uploaded so far; `total` is
/// the precomputed chunk count for the whole collection. `SeqCst` keeps the
/// counts reported to the observer monotonic across concurrent tasks.
///
/// Takes `&AtomicUsize` rather than `&Arc<AtomicUsize>`: the function only
/// reads/updates the counter and never clones the handle, and existing
/// `&arc` call sites still work via deref coercion.
fn bump_progress(processed: &AtomicUsize, total: usize, cb: Option<&OnStreamProgressFn>) {
    // fetch_add returns the previous value, so +1 yields the count that
    // includes this chunk.
    let n = processed.fetch_add(1, Ordering::SeqCst) + 1;
    if let Some(cb) = cb {
        cb(StreamProgress {
            total,
            processed: n,
        });
    }
}
/// Total number of chunks (data leaves plus all intermediate tree nodes)
/// produced when chunking `len` bytes of content; zero for empty input.
fn total_chunks_for(len: usize) -> usize {
    use crate::swarm::bmt::CHUNK_SIZE;
    use crate::swarm::file_chunker::MAX_BRANCHES;
    if len == 0 {
        return 0;
    }
    // Walk the tree level by level: start with the leaf count, then each
    // higher level packs up to MAX_BRANCHES references per node, stopping
    // once a single root remains. Summing the levels counts every chunk.
    std::iter::successors(Some(len.div_ceil(CHUNK_SIZE)), |&nodes| {
        (nodes > 1).then(|| nodes.div_ceil(MAX_BRANCHES))
    })
    .sum()
}
/// Depth-first upload of a manifest trie.
///
/// Children are persisted before their parent so each fork's `self_address`
/// is known when the parent is marshalled; the returned result is the upload
/// of `node` itself.
async fn save_manifest_recursively_inner(
    api: &FileApi,
    node: &mut MantarayNode,
    batch_id: &BatchId,
    opts: Option<&UploadOptions>,
) -> Result<UploadResult, Error> {
    // Recurse into every fork first. `Box::pin` breaks the infinitely-sized
    // future that direct async recursion would otherwise create.
    for fork in node.forks.values_mut() {
        let child = Box::pin(save_manifest_recursively_inner(
            api,
            &mut fork.node,
            batch_id,
            opts,
        ))
        .await?;
        fork.node.self_address = Some(reference_to_self_address(&child.reference)?);
    }
    // With all child addresses in place, serialize and upload this node.
    let payload = marshal(node)?;
    let upload_opts = opts.map(|base| RedundantUploadOptions {
        base: base.clone(),
        redundancy_level: None,
    });
    let uploaded = api.upload_data(batch_id, payload, upload_opts.as_ref()).await?;
    node.self_address = Some(reference_to_self_address(&uploaded.reference)?);
    Ok(uploaded)
}
fn reference_to_self_address(reference: &Reference) -> Result<[u8; 32], Error> {
reference
.as_bytes()
.first_chunk::<32>()
.copied()
.ok_or_else(|| Error::argument("manifest child reference < 32 bytes"))
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn total_chunks_for_matches_known_sizes() {
        // (input length, expected chunk count): empty input, sub-chunk,
        // exact one chunk, two leaves + root, exactly one full branch level,
        // and one leaf past a full branch level (two intermediates + root).
        let cases = [
            (0, 0),
            (1, 1),
            (4096, 1),
            (4097, 3),
            (4096 * 128, 128 + 1),
            (4096 * 129, 129 + 2 + 1),
        ];
        for (len, expected) in cases {
            assert_eq!(total_chunks_for(len), expected, "len = {len}");
        }
    }
}