nectar-primitives 0.1.0

Core primitives for Ethereum Swarm: chunks, addresses, and binary merkle trees
Documentation
//! Sync parallel file splitter using random-access data sources.

use std::marker::PhantomData;

use rayon::prelude::*;

use crate::bmt::DEFAULT_BODY_SIZE;
use crate::chunk::ContentChunk;

use super::constants::{LEVEL_LIMIT, compute_spans_inline};
use super::error::{FileError, Result};
use super::mode::{PlainMode, SplitMode};
use super::sync_read_at::SyncReadAt;
use super::tree::TreeParams;
use crate::store::SyncChunkPut;

#[cfg(feature = "encryption")]
use super::mode::EncryptedMode;

/// Parallel splitter that chunks a random-access data source.
///
/// Level-0 data chunks are read at their known offsets concurrently; the
/// intermediate levels are then assembled from the resulting references.
/// The mode parameter `M` decides how each chunk is prepared, and every
/// chunk produced is handed to the wrapped store.
pub struct GenericSyncParallelSplitter<S, M: SplitMode, const BODY_SIZE: usize = DEFAULT_BODY_SIZE>
where
    S: SyncChunkPut<BODY_SIZE> + Send + Sync,
{
    /// Zero-sized marker tying the splitter to its split mode.
    _mode: PhantomData<M>,
    /// Destination for every chunk the splitter produces.
    store: S,
}

/// Parallel splitter that stores chunks without encryption ([`PlainMode`]).
pub type SyncParallelSplitter<S, const BODY_SIZE: usize = DEFAULT_BODY_SIZE> =
    GenericSyncParallelSplitter<S, PlainMode, BODY_SIZE>;

#[cfg(feature = "encryption")]
/// Parallel splitter that encrypts every chunk it stores ([`EncryptedMode`]).
///
/// Only available with the `encryption` feature.
pub type EncryptedSyncParallelSplitter<S, const BODY_SIZE: usize = DEFAULT_BODY_SIZE> =
    GenericSyncParallelSplitter<S, EncryptedMode, BODY_SIZE>;

impl<S, M, const BODY_SIZE: usize> std::fmt::Debug for GenericSyncParallelSplitter<S, M, BODY_SIZE>
where
    M: SplitMode,
    S: SyncChunkPut<BODY_SIZE> + Send + Sync,
{
    /// Formats as `GenericSyncParallelSplitter { .. }`.
    ///
    /// The store and mode marker are deliberately omitted, so `S` does not
    /// need to implement `Debug`.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("GenericSyncParallelSplitter");
        builder.finish_non_exhaustive()
    }
}

impl<S, M, const BODY_SIZE: usize> GenericSyncParallelSplitter<S, M, BODY_SIZE>
where
    S: SyncChunkPut<BODY_SIZE> + Send + Sync,
    M: SplitMode + Send + Sync,
{
    /// Create a parallel splitter that writes every chunk it produces to `store`.
    pub const fn new(store: S) -> Self {
        Self {
            store,
            _mode: PhantomData,
        }
    }

    /// Split data from a random-access source and return the root reference.
    ///
    /// An empty source is delegated to the mode's empty-input handling.
    /// Otherwise the level-0 data chunks are read and stored in parallel.
    /// Each higher level is then built, also in parallel, from the
    /// references of the level below until a single reference remains.
    ///
    /// # Errors
    ///
    /// Returns an error if reading from `source` fails, if the split mode
    /// fails to prepare a chunk, or if the store rejects a chunk.
    pub fn split<R: SyncReadAt + Sync>(&self, source: &R) -> Result<M::RootRef> {
        const { super::constants::assert_valid_body_size::<BODY_SIZE>() };
        let size = source.len();

        if size == 0 {
            return self.handle_empty();
        }

        let tree = TreeParams::<BODY_SIZE>::new(size);
        // Derive span sizes from the same branching factor `build_level` uses
        // to group references, so span arithmetic can never disagree with
        // the grouping.
        let spans = compute_spans_inline(M::refs_per_chunk(BODY_SIZE));

        // Level 0: create data chunks in parallel.
        let level0_refs = self.create_data_chunks(source, &tree)?;

        // Levels 1..: fold references upward until one root remains.
        self.build_intermediate_levels(level0_refs, size, &spans)
    }

    /// Consume the splitter and hand back the store holding the written chunks.
    pub fn into_store(self) -> S {
        self.store
    }

    /// Delegate empty-input handling to the split mode.
    fn handle_empty(&self) -> Result<M::RootRef> {
        M::process_empty::<BODY_SIZE, S>(&self.store)
    }

    /// Read, wrap, prepare and store every level-0 data chunk in parallel.
    ///
    /// Returns the chunk references in source order.
    fn create_data_chunks<R: SyncReadAt + Sync>(
        &self,
        source: &R,
        tree: &TreeParams<BODY_SIZE>,
    ) -> Result<Vec<M::RefBytes>> {
        let data_chunks = tree.data_chunks();
        let size = tree.size();

        (0..data_chunks)
            .into_par_iter()
            .map(|i| {
                let offset = i * BODY_SIZE as u64;
                // Clamp in u64 before narrowing. Casting `size - offset` to
                // usize first would truncate on 32-bit targets (e.g. wasm32)
                // for sources over 4 GiB and yield a bogus chunk length.
                let chunk_size = (size - offset).min(BODY_SIZE as u64) as usize;

                let mut buf = vec![0u8; chunk_size];
                // NOTE(review): any success value returned by `read_at` is
                // discarded. This assumes it either fills `buf` completely or
                // returns an error; confirm against the `SyncReadAt` contract.
                source
                    .read_at(offset, &mut buf)
                    .map_err(|e| FileError::Store(Box::new(e)))?;

                // Only the final data chunk may be shorter than BODY_SIZE.
                let span = if i + 1 == data_chunks {
                    size - offset
                } else {
                    BODY_SIZE as u64
                };

                let chunk_bytes = super::helpers::build_intermediate_payload(span, &buf);

                let (chunk, ref_bytes) = M::prepare_chunk::<BODY_SIZE>(chunk_bytes)?;
                self.put_chunk(chunk)?;
                Ok(ref_bytes)
            })
            // Collecting straight into `Result` preserves order, stops
            // scheduling new work once an error is seen, and avoids an
            // intermediate `Vec<Result<_>>`.
            .collect()
    }

    /// Build parent levels from `refs` until a single reference remains,
    /// then convert that reference into the mode's root reference.
    ///
    /// `refs` must be non-empty; `refs[0]` panics otherwise.
    fn build_intermediate_levels(
        &self,
        mut refs: Vec<M::RefBytes>,
        total_size: u64,
        spans: &[u64; LEVEL_LIMIT],
    ) -> Result<M::RootRef> {
        let mut level = 1;

        while refs.len() > 1 {
            refs = self.build_level(&refs, level, total_size, spans)?;
            level += 1;
        }

        // Exactly one reference is left: it addresses the root.
        M::extract_root(refs[0].as_ref())
    }

    /// Build one intermediate level from the references of the level below.
    ///
    /// `refs` is split into groups of up to `M::refs_per_chunk(BODY_SIZE)`
    /// references. Each group is stored as a chunk, and the new, shorter list
    /// of references is returned in order. A full chunk at this level covers
    /// `spans[level] * BODY_SIZE` bytes; only the last chunk may cover less.
    fn build_level(
        &self,
        refs: &[M::RefBytes],
        level: usize,
        total_size: u64,
        spans: &[u64; LEVEL_LIMIT],
    ) -> Result<Vec<M::RefBytes>> {
        let refs_per_chunk = M::refs_per_chunk(BODY_SIZE);
        let chunks_at_level = refs.len().div_ceil(refs_per_chunk);
        let max_span = spans[level] * BODY_SIZE as u64;

        (0..chunks_at_level)
            .into_par_iter()
            .map(|i| {
                let start = i * refs_per_chunk;
                let end = (start + refs_per_chunk).min(refs.len());
                let child_refs = &refs[start..end];

                // Single reference: carry up without wrapping (dangling chunk optimization)
                if let [only] = child_refs {
                    return Ok(only.clone());
                }

                // Every chunk but the last is full; the last covers whatever remains.
                let span = if i + 1 == chunks_at_level {
                    total_size.saturating_sub(i as u64 * max_span)
                } else {
                    max_span
                };

                // Concatenate child references into one pre-sized buffer.
                let mut ref_data = Vec::with_capacity(child_refs.len() * M::REF_SIZE);
                for child in child_refs {
                    ref_data.extend_from_slice(child.as_ref());
                }
                let chunk_bytes = super::helpers::build_intermediate_payload(span, &ref_data);

                let (chunk, ref_bytes) = M::prepare_chunk::<BODY_SIZE>(chunk_bytes)?;
                self.put_chunk(chunk)?;
                Ok(ref_bytes)
            })
            .collect()
    }

    /// Hand a finished chunk to the store, converting store errors into [`FileError`].
    fn put_chunk(&self, chunk: ContentChunk<BODY_SIZE>) -> Result<()> {
        self.store.put(chunk.into()).map_err(FileError::store)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::sync_join;
    use crate::store::MemoryStore;

    /// Deterministic repeating byte pattern `0, 1, ..., 255, 0, ...` of length `len`.
    fn cyclic_pattern(len: usize) -> Vec<u8> {
        (0..len).map(|idx| (idx % 256) as u8).collect()
    }

    /// Split `data` with the plain parallel splitter.
    ///
    /// Returns the root address together with the store that received the chunks.
    fn split_and_store(
        data: &[u8],
    ) -> (crate::chunk::ChunkAddress, MemoryStore<DEFAULT_BODY_SIZE>) {
        let splitter = SyncParallelSplitter::new(MemoryStore::<DEFAULT_BODY_SIZE>::new());
        let root_address = splitter.split(&data).unwrap();
        (root_address, splitter.into_store())
    }

    generate_plain_splitter_tests!(split_and_store);

    #[test]
    fn test_parallel_splitter_varying_data() {
        let input = cyclic_pattern(DEFAULT_BODY_SIZE * 5 + 123);

        let (parallel_root, chunk_store) = split_and_store(&input);

        // The parallel splitter must agree with the sequential one.
        let (sequential_root, _) = crate::file::sync_split::<DEFAULT_BODY_SIZE>(&input).unwrap();
        assert_eq!(parallel_root, sequential_root);

        let round_tripped = sync_join(&chunk_store, parallel_root).unwrap();
        assert_eq!(round_tripped, input);
    }

    #[cfg(feature = "encryption")]
    mod encrypted {
        use super::*;
        use crate::file::{EncryptedSyncParallelSplitter, sync_join, sync_split_encrypted};
        use crate::store::MemoryStore;

        /// Split `data` with the encrypted parallel splitter.
        ///
        /// Returns the encrypted root reference and the populated store.
        fn encrypted_split_and_store(
            data: &[u8],
        ) -> (
            crate::chunk::encryption::EncryptedChunkRef,
            MemoryStore<DEFAULT_BODY_SIZE>,
        ) {
            let splitter =
                EncryptedSyncParallelSplitter::new(MemoryStore::<DEFAULT_BODY_SIZE>::new());
            let root_reference = splitter.split(&data).unwrap();
            (root_reference, splitter.into_store())
        }

        generate_encrypted_splitter_tests!(encrypted_split_and_store);

        #[test]
        fn test_encrypted_parallel_matches_sequential() {
            let input = cyclic_pattern(DEFAULT_BODY_SIZE * 5 + 123);

            let (parallel_ref, parallel_store) = encrypted_split_and_store(&input);
            let (sequential_ref, sequential_store) =
                sync_split_encrypted::<DEFAULT_BODY_SIZE>(&input).unwrap();

            // Keys differ between runs, but the tree shape (chunk count) must match.
            assert_eq!(parallel_store.len(), sequential_store.len());

            let from_parallel = sync_join(&parallel_store, parallel_ref).unwrap();
            assert_eq!(from_parallel, input);

            let from_sequential = sync_join(&sequential_store, sequential_ref).unwrap();
            assert_eq!(from_sequential, input);
        }

        #[test]
        fn test_encrypted_parallel_nondeterministic() {
            let payload = b"test determinism";
            let (first, _) = encrypted_split_and_store(payload);
            let (second, _) = encrypted_split_and_store(payload);

            // Different random keys each time
            assert_ne!(first.address(), second.address());
        }
    }
}