Skip to main content

nectar_primitives/file/
sync_splitter_parallel.rs

1//! Sync parallel file splitter using random-access data sources.
2
3use std::marker::PhantomData;
4
5use rayon::prelude::*;
6
7use crate::bmt::DEFAULT_BODY_SIZE;
8use crate::chunk::ContentChunk;
9
10use super::constants::{LEVEL_LIMIT, compute_spans_inline};
11use super::error::{FileError, Result};
12use super::mode::{PlainMode, SplitMode};
13use super::sync_read_at::SyncReadAt;
14use super::tree::TreeParams;
15use crate::store::SyncChunkPut;
16
17#[cfg(feature = "encryption")]
18use super::mode::EncryptedMode;
19
20/// Parallel file splitter using random-access data sources.
21///
22/// Splits files by reading chunks at known offsets in parallel,
23/// then building intermediate levels.
24pub struct GenericSyncParallelSplitter<S, M: SplitMode, const BODY_SIZE: usize = DEFAULT_BODY_SIZE>
25where
26    S: SyncChunkPut<BODY_SIZE> + Send + Sync,
27{
28    store: S,
29    _mode: PhantomData<M>,
30}
31
32/// Plain (unencrypted) parallel splitter.
33pub type SyncParallelSplitter<S, const BODY_SIZE: usize = DEFAULT_BODY_SIZE> =
34    GenericSyncParallelSplitter<S, PlainMode, BODY_SIZE>;
35
/// [`GenericSyncParallelSplitter`] configured for encrypted output
/// (`EncryptedMode`).
///
/// Only available when the `encryption` feature is enabled.
#[cfg(feature = "encryption")]
pub type EncryptedSyncParallelSplitter<S, const BODY_SIZE: usize = DEFAULT_BODY_SIZE> =
    GenericSyncParallelSplitter<S, EncryptedMode, BODY_SIZE>;
40
41impl<S, M, const BODY_SIZE: usize> std::fmt::Debug for GenericSyncParallelSplitter<S, M, BODY_SIZE>
42where
43    S: SyncChunkPut<BODY_SIZE> + Send + Sync,
44    M: SplitMode,
45{
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("GenericSyncParallelSplitter")
48            .finish_non_exhaustive()
49    }
50}
51
52impl<S, M, const BODY_SIZE: usize> GenericSyncParallelSplitter<S, M, BODY_SIZE>
53where
54    S: SyncChunkPut<BODY_SIZE> + Send + Sync,
55    M: SplitMode + Send + Sync,
56{
57    /// Create a parallel splitter with the given chunk store.
58    pub const fn new(store: S) -> Self {
59        Self {
60            store,
61            _mode: PhantomData,
62        }
63    }
64
65    /// Split data from a random-access source.
66    pub fn split<R: SyncReadAt + Sync>(&self, source: &R) -> Result<M::RootRef> {
67        const { super::constants::assert_valid_body_size::<BODY_SIZE>() };
68        let size = source.len();
69        let tree = TreeParams::<BODY_SIZE>::new(size);
70
71        if size == 0 {
72            return self.handle_empty();
73        }
74
75        let spans = compute_spans_inline(BODY_SIZE / M::REF_SIZE);
76
77        // Level 0: Create data chunks in parallel
78        let level0_refs = self.create_data_chunks(source, &tree)?;
79
80        // Build intermediate levels
81        self.build_intermediate_levels(level0_refs, size, &spans)
82    }
83
84    /// Consume the splitter and return the store.
85    pub fn into_store(self) -> S {
86        self.store
87    }
88
89    fn handle_empty(&self) -> Result<M::RootRef> {
90        M::process_empty::<BODY_SIZE, S>(&self.store)
91    }
92
93    fn create_data_chunks<R: SyncReadAt + Sync>(
94        &self,
95        source: &R,
96        tree: &TreeParams<BODY_SIZE>,
97    ) -> Result<Vec<M::RefBytes>> {
98        let data_chunks = tree.data_chunks();
99        let size = tree.size();
100
101        let results: Vec<Result<M::RefBytes>> = (0..data_chunks)
102            .into_par_iter()
103            .map(|i| {
104                let offset = i * BODY_SIZE as u64;
105                let chunk_size = ((size - offset) as usize).min(BODY_SIZE);
106
107                let mut buf = vec![0u8; chunk_size];
108                source
109                    .read_at(offset, &mut buf)
110                    .map_err(|e| FileError::Store(Box::new(e)))?;
111
112                let span = if i + 1 == data_chunks {
113                    size - offset
114                } else {
115                    BODY_SIZE as u64
116                };
117
118                let chunk_bytes = super::helpers::build_intermediate_payload(span, &buf);
119
120                let (chunk, ref_bytes) = M::prepare_chunk::<BODY_SIZE>(chunk_bytes)?;
121                self.put_chunk(chunk)?;
122                Ok(ref_bytes)
123            })
124            .collect();
125
126        results.into_iter().collect()
127    }
128
129    fn build_intermediate_levels(
130        &self,
131        mut refs: Vec<M::RefBytes>,
132        total_size: u64,
133        spans: &[u64; LEVEL_LIMIT],
134    ) -> Result<M::RootRef> {
135        let mut level = 1;
136
137        while refs.len() > 1 {
138            refs = self.build_level(&refs, level, total_size, spans)?;
139            level += 1;
140        }
141
142        // Extract root reference from the single remaining ref
143        M::extract_root(refs[0].as_ref())
144    }
145
146    fn build_level(
147        &self,
148        refs: &[M::RefBytes],
149        level: usize,
150        total_size: u64,
151        spans: &[u64; LEVEL_LIMIT],
152    ) -> Result<Vec<M::RefBytes>> {
153        let refs_per_chunk = M::refs_per_chunk(BODY_SIZE);
154        let chunks_at_level = refs.len().div_ceil(refs_per_chunk);
155        let max_span = spans[level] * BODY_SIZE as u64;
156
157        let results: Vec<Result<M::RefBytes>> = (0..chunks_at_level)
158            .into_par_iter()
159            .map(|i| {
160                let start = i * refs_per_chunk;
161                let end = (start + refs_per_chunk).min(refs.len());
162                let child_refs = &refs[start..end];
163
164                // Single reference: carry up without wrapping (dangling chunk optimization)
165                if child_refs.len() == 1 {
166                    return Ok(child_refs[0].clone());
167                }
168
169                let span = if i + 1 == chunks_at_level {
170                    total_size.saturating_sub(i as u64 * max_span)
171                } else {
172                    max_span
173                };
174
175                let ref_data: Vec<u8> = child_refs
176                    .iter()
177                    .flat_map(|r| r.as_ref())
178                    .copied()
179                    .collect();
180                let chunk_bytes = super::helpers::build_intermediate_payload(span, &ref_data);
181
182                let (chunk, ref_bytes) = M::prepare_chunk::<BODY_SIZE>(chunk_bytes)?;
183                self.put_chunk(chunk)?;
184                Ok(ref_bytes)
185            })
186            .collect();
187
188        results.into_iter().collect()
189    }
190
191    fn put_chunk(&self, chunk: ContentChunk<BODY_SIZE>) -> Result<()> {
192        self.store.put(chunk.into()).map_err(FileError::store)
193    }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::sync_join;
    use crate::store::MemoryStore;

    /// Splits `data` with a fresh in-memory store; returns the root and the populated store.
    fn split_and_store(
        data: &[u8],
    ) -> (crate::chunk::ChunkAddress, MemoryStore<DEFAULT_BODY_SIZE>) {
        let splitter = SyncParallelSplitter::new(MemoryStore::<DEFAULT_BODY_SIZE>::new());
        let root = splitter.split(&data).unwrap();
        (root, splitter.into_store())
    }

    generate_plain_splitter_tests!(split_and_store);

    #[test]
    fn test_parallel_splitter_varying_data() {
        // Repeating 0..=255 pattern spanning five full chunks plus a partial one.
        let input: Vec<u8> = (0..=255u8)
            .cycle()
            .take(DEFAULT_BODY_SIZE * 5 + 123)
            .collect();

        let (parallel_root, parallel_store) = split_and_store(&input);

        // The parallel tree must be identical to the sequential one.
        let (sequential_root, _) = crate::file::sync_split::<DEFAULT_BODY_SIZE>(&input).unwrap();
        assert_eq!(parallel_root, sequential_root);

        assert_eq!(sync_join(&parallel_store, parallel_root).unwrap(), input);
    }

    #[cfg(feature = "encryption")]
    mod encrypted {
        use super::*;
        use crate::file::{EncryptedSyncParallelSplitter, sync_join, sync_split_encrypted};
        use crate::store::MemoryStore;

        /// Encrypted counterpart of `split_and_store`.
        fn encrypted_split_and_store(
            data: &[u8],
        ) -> (
            crate::chunk::encryption::EncryptedChunkRef,
            MemoryStore<DEFAULT_BODY_SIZE>,
        ) {
            let splitter =
                EncryptedSyncParallelSplitter::new(MemoryStore::<DEFAULT_BODY_SIZE>::new());
            let root_ref = splitter.split(&data).unwrap();
            (root_ref, splitter.into_store())
        }

        generate_encrypted_splitter_tests!(encrypted_split_and_store);

        #[test]
        fn test_encrypted_parallel_matches_sequential() {
            let input: Vec<u8> = (0..=255u8)
                .cycle()
                .take(DEFAULT_BODY_SIZE * 5 + 123)
                .collect();

            let (par_ref, par_store) = encrypted_split_and_store(&input);
            let (seq_ref, seq_store) = sync_split_encrypted::<DEFAULT_BODY_SIZE>(&input).unwrap();

            // Keys are random, so the roots differ; compare chunk counts and round-trips.
            assert_eq!(par_store.len(), seq_store.len());

            assert_eq!(sync_join(&par_store, par_ref).unwrap(), input);
            assert_eq!(sync_join(&seq_store, seq_ref).unwrap(), input);
        }

        #[test]
        fn test_encrypted_parallel_nondeterministic() {
            let first = encrypted_split_and_store(b"test determinism").0;
            let second = encrypted_split_and_store(b"test determinism").0;

            // Different random keys each time
            assert_ne!(first.address(), second.address());
        }
    }
}