use std::marker::PhantomData;
use rayon::prelude::*;
use crate::bmt::DEFAULT_BODY_SIZE;
use crate::chunk::ContentChunk;
use super::constants::{LEVEL_LIMIT, compute_spans_inline};
use super::error::{FileError, Result};
use super::mode::{PlainMode, SplitMode};
use super::sync_read_at::SyncReadAt;
use super::tree::TreeParams;
use crate::store::SyncChunkPut;
#[cfg(feature = "encryption")]
use super::mode::EncryptedMode;
/// Synchronous file splitter that fans chunk processing out over rayon's
/// thread pool.
///
/// * `S` is the chunk store that receives every chunk produced by a split.
/// * `M` selects the split mode (see the `SyncParallelSplitter` alias for the
///   plain-mode instantiation).
/// * `BODY_SIZE` is the chunk body size in bytes and defaults to
///   [`DEFAULT_BODY_SIZE`].
pub struct GenericSyncParallelSplitter<S, M, const BODY_SIZE: usize = DEFAULT_BODY_SIZE>
where
    S: SyncChunkPut<BODY_SIZE> + Send + Sync,
    M: SplitMode,
{
    /// Destination for every chunk produced while splitting.
    store: S,
    /// Zero-sized marker tying the splitter to its mode `M`; no value of `M`
    /// is ever stored.
    _mode: PhantomData<M>,
}
/// [`GenericSyncParallelSplitter`] instantiated with [`PlainMode`].
///
/// `BODY_SIZE` defaults to [`DEFAULT_BODY_SIZE`].
pub type SyncParallelSplitter<S, const BODY_SIZE: usize = DEFAULT_BODY_SIZE> =
GenericSyncParallelSplitter<S, PlainMode, BODY_SIZE>;
/// [`GenericSyncParallelSplitter`] instantiated with [`EncryptedMode`].
///
/// Only compiled when the `encryption` feature is enabled. `BODY_SIZE`
/// defaults to [`DEFAULT_BODY_SIZE`].
#[cfg(feature = "encryption")]
pub type EncryptedSyncParallelSplitter<S, const BODY_SIZE: usize = DEFAULT_BODY_SIZE> =
GenericSyncParallelSplitter<S, EncryptedMode, BODY_SIZE>;
impl<S, M, const BODY_SIZE: usize> std::fmt::Debug for GenericSyncParallelSplitter<S, M, BODY_SIZE>
where
    S: SyncChunkPut<BODY_SIZE> + Send + Sync,
    M: SplitMode,
{
    /// Formats the splitter as an opaque struct.
    ///
    /// Only the type name is emitted; no field is printed, which is why
    /// neither `S` nor `M` is required to implement `Debug`.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = formatter.debug_struct("GenericSyncParallelSplitter");
        repr.finish_non_exhaustive()
    }
}
impl<S, M, const BODY_SIZE: usize> GenericSyncParallelSplitter<S, M, BODY_SIZE>
where
    S: SyncChunkPut<BODY_SIZE> + Send + Sync,
    M: SplitMode + Send + Sync,
{
    /// Creates a splitter that writes every chunk it produces into `store`.
    pub const fn new(store: S) -> Self {
        Self {
            store,
            _mode: PhantomData,
        }
    }

    /// Splits `source` into chunks, stores them, and returns the root
    /// reference of the resulting chunk tree.
    ///
    /// Data chunks are produced in parallel first; the reference levels above
    /// them are then built one level at a time (each level in parallel) until
    /// a single reference remains. An empty source is delegated to the
    /// mode's `process_empty`.
    ///
    /// # Errors
    ///
    /// Returns an error if reading from `source`, preparing a chunk, or
    /// writing to the store fails. Processing stops early once a failure is
    /// observed; when several chunks fail concurrently, which of those errors
    /// is returned is not deterministic.
    pub fn split<R: SyncReadAt + Sync>(&self, source: &R) -> Result<M::RootRef> {
        // Validated at compile time, once per `BODY_SIZE` instantiation.
        const { super::constants::assert_valid_body_size::<BODY_SIZE>() };
        let size = source.len();
        let tree = TreeParams::<BODY_SIZE>::new(size);
        if size == 0 {
            return self.handle_empty();
        }
        let spans = compute_spans_inline(BODY_SIZE / M::REF_SIZE);
        let level0_refs = self.create_data_chunks(source, &tree)?;
        self.build_intermediate_levels(level0_refs, size, &spans)
    }

    /// Consumes the splitter and hands back the underlying store.
    pub fn into_store(self) -> S {
        self.store
    }

    /// Produces the root reference for a zero-length source by delegating to
    /// the mode's `process_empty`.
    fn handle_empty(&self) -> Result<M::RootRef> {
        M::process_empty::<BODY_SIZE, S>(&self.store)
    }

    /// Reads, prepares and stores every data chunk of `source` in parallel.
    ///
    /// Returns one reference per data chunk, in source order.
    fn create_data_chunks<R: SyncReadAt + Sync>(
        &self,
        source: &R,
        tree: &TreeParams<BODY_SIZE>,
    ) -> Result<Vec<M::RefBytes>> {
        let data_chunks = tree.data_chunks();
        let size = tree.size();
        // Collecting straight into `Result<Vec<_>>` keeps chunk order and lets
        // rayon stop scheduling further chunks once one of them has failed.
        (0..data_chunks)
            .into_par_iter()
            .map(|i| -> Result<M::RefBytes> {
                let offset = i * BODY_SIZE as u64;
                let remaining = size - offset;
                // Clamp in u64 *before* narrowing to usize: casting first
                // would truncate on 32-bit targets when 4 GiB or more remain,
                // yielding a wrong (possibly zero) chunk length.
                let chunk_size = remaining.min(BODY_SIZE as u64) as usize;
                let mut buf = vec![0u8; chunk_size];
                // NOTE(review): the value returned by `read_at` is discarded.
                // If it reports a byte count, short reads go undetected here —
                // confirm `SyncReadAt::read_at` fills the whole buffer.
                source
                    .read_at(offset, &mut buf)
                    .map_err(|e| FileError::Store(Box::new(e)))?;
                // Only the final chunk may cover fewer than BODY_SIZE bytes.
                let span = if i + 1 == data_chunks {
                    remaining
                } else {
                    BODY_SIZE as u64
                };
                let chunk_bytes = super::helpers::build_intermediate_payload(span, &buf);
                let (chunk, ref_bytes) = M::prepare_chunk::<BODY_SIZE>(chunk_bytes)?;
                self.put_chunk(chunk)?;
                Ok(ref_bytes)
            })
            .collect()
    }

    /// Repeatedly collapses `refs` into the next tree level until a single
    /// reference is left, then converts it into the mode's root reference.
    ///
    /// `refs` is expected to be non-empty; `split` only calls this for a
    /// non-empty source.
    fn build_intermediate_levels(
        &self,
        mut refs: Vec<M::RefBytes>,
        total_size: u64,
        spans: &[u64; LEVEL_LIMIT],
    ) -> Result<M::RootRef> {
        let mut level = 1;
        while refs.len() > 1 {
            refs = self.build_level(&refs, level, total_size, spans)?;
            level += 1;
        }
        M::extract_root(refs[0].as_ref())
    }

    /// Builds one intermediate level: groups `refs` into runs of at most
    /// `refs_per_chunk`, and turns each run into a stored chunk whose
    /// reference is returned (in order).
    ///
    /// A run holding a single reference is not wrapped in a new chunk; that
    /// reference is passed up to the next level unchanged.
    fn build_level(
        &self,
        refs: &[M::RefBytes],
        level: usize,
        total_size: u64,
        spans: &[u64; LEVEL_LIMIT],
    ) -> Result<Vec<M::RefBytes>> {
        let refs_per_chunk = M::refs_per_chunk(BODY_SIZE);
        let chunks_at_level = refs.len().div_ceil(refs_per_chunk);
        // Span, in bytes, assigned to every non-final chunk at this level.
        let max_span = spans[level] * BODY_SIZE as u64;
        (0..chunks_at_level)
            .into_par_iter()
            .map(|i| -> Result<M::RefBytes> {
                let start = i * refs_per_chunk;
                let end = (start + refs_per_chunk).min(refs.len());
                let child_refs = &refs[start..end];
                if child_refs.len() == 1 {
                    return Ok(child_refs[0].clone());
                }
                // The final chunk covers whatever is left of the file after
                // the preceding chunks at this level; all others use max_span.
                let span = if i + 1 == chunks_at_level {
                    total_size.saturating_sub(i as u64 * max_span)
                } else {
                    max_span
                };
                // Payload body is the child references concatenated in order.
                let ref_data: Vec<u8> = child_refs
                    .iter()
                    .flat_map(|r| r.as_ref())
                    .copied()
                    .collect();
                let chunk_bytes = super::helpers::build_intermediate_payload(span, &ref_data);
                let (chunk, ref_bytes) = M::prepare_chunk::<BODY_SIZE>(chunk_bytes)?;
                self.put_chunk(chunk)?;
                Ok(ref_bytes)
            })
            .collect()
    }

    /// Writes `chunk` to the store, mapping a store failure into `FileError`.
    fn put_chunk(&self, chunk: ContentChunk<BODY_SIZE>) -> Result<()> {
        self.store.put(chunk.into()).map_err(FileError::store)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::sync_join;
    use crate::store::MemoryStore;

    /// Splits `data` with the plain parallel splitter into a fresh in-memory
    /// store and returns the root address together with that store.
    fn split_and_store(
        data: &[u8],
    ) -> (crate::chunk::ChunkAddress, MemoryStore<DEFAULT_BODY_SIZE>) {
        let splitter = SyncParallelSplitter::new(MemoryStore::<DEFAULT_BODY_SIZE>::new());
        let root = splitter.split(&data).unwrap();
        (root, splitter.into_store())
    }

    generate_plain_splitter_tests!(split_and_store);

    /// The parallel splitter must yield the same root as the sequential one
    /// and the stored chunks must join back into the original bytes.
    #[test]
    fn test_parallel_splitter_varying_data() {
        let total_len = DEFAULT_BODY_SIZE * 5 + 123;
        let payload: Vec<u8> = (0..total_len).map(|idx| (idx % 256) as u8).collect();

        let (parallel_root, store) = split_and_store(&payload);
        let (sequential_root, _) =
            crate::file::sync_split::<DEFAULT_BODY_SIZE>(&payload).unwrap();
        assert_eq!(parallel_root, sequential_root);

        let round_trip = sync_join(&store, parallel_root).unwrap();
        assert_eq!(round_trip, payload);
    }

    #[cfg(feature = "encryption")]
    mod encrypted {
        use super::*;
        use crate::file::{EncryptedSyncParallelSplitter, sync_join, sync_split_encrypted};
        use crate::store::MemoryStore;

        /// Splits `data` with the encrypted parallel splitter into a fresh
        /// in-memory store and returns the root reference with that store.
        fn encrypted_split_and_store(
            data: &[u8],
        ) -> (
            crate::chunk::encryption::EncryptedChunkRef,
            MemoryStore<DEFAULT_BODY_SIZE>,
        ) {
            let splitter =
                EncryptedSyncParallelSplitter::new(MemoryStore::<DEFAULT_BODY_SIZE>::new());
            let root_ref = splitter.split(&data).unwrap();
            (root_ref, splitter.into_store())
        }

        generate_encrypted_splitter_tests!(encrypted_split_and_store);

        /// Parallel and sequential encrypted splits must store the same number
        /// of chunks and both must round-trip the input.
        #[test]
        fn test_encrypted_parallel_matches_sequential() {
            let total_len = DEFAULT_BODY_SIZE * 5 + 123;
            let payload: Vec<u8> = (0..total_len).map(|idx| (idx % 256) as u8).collect();

            let (parallel_ref, parallel_store) = encrypted_split_and_store(&payload);
            let (sequential_ref, sequential_store) =
                sync_split_encrypted::<DEFAULT_BODY_SIZE>(&payload).unwrap();
            assert_eq!(parallel_store.len(), sequential_store.len());

            let parallel_round_trip = sync_join(&parallel_store, parallel_ref).unwrap();
            assert_eq!(parallel_round_trip, payload);
            let sequential_round_trip = sync_join(&sequential_store, sequential_ref).unwrap();
            assert_eq!(sequential_round_trip, payload);
        }

        /// Two encrypted splits of identical input must produce different
        /// root addresses.
        #[test]
        fn test_encrypted_parallel_nondeterministic() {
            let payload = b"test determinism";
            let (first_ref, _) = encrypted_split_and_store(payload);
            let (second_ref, _) = encrypted_split_and_store(payload);
            assert_ne!(first_ref.address(), second_ref.address());
        }
    }
}