nectar_primitives/file/
sync_splitter_parallel.rs1use std::marker::PhantomData;
4
5use rayon::prelude::*;
6
7use crate::bmt::DEFAULT_BODY_SIZE;
8use crate::chunk::ContentChunk;
9
10use super::constants::{LEVEL_LIMIT, compute_spans_inline};
11use super::error::{FileError, Result};
12use super::mode::{PlainMode, SplitMode};
13use super::sync_read_at::SyncReadAt;
14use super::tree::TreeParams;
15use crate::store::SyncChunkPut;
16
17#[cfg(feature = "encryption")]
18use super::mode::EncryptedMode;
19
20pub struct GenericSyncParallelSplitter<S, M: SplitMode, const BODY_SIZE: usize = DEFAULT_BODY_SIZE>
25where
26 S: SyncChunkPut<BODY_SIZE> + Send + Sync,
27{
28 store: S,
29 _mode: PhantomData<M>,
30}
31
32pub type SyncParallelSplitter<S, const BODY_SIZE: usize = DEFAULT_BODY_SIZE> =
34 GenericSyncParallelSplitter<S, PlainMode, BODY_SIZE>;
35
36#[cfg(feature = "encryption")]
38pub type EncryptedSyncParallelSplitter<S, const BODY_SIZE: usize = DEFAULT_BODY_SIZE> =
39 GenericSyncParallelSplitter<S, EncryptedMode, BODY_SIZE>;
40
41impl<S, M, const BODY_SIZE: usize> std::fmt::Debug for GenericSyncParallelSplitter<S, M, BODY_SIZE>
42where
43 S: SyncChunkPut<BODY_SIZE> + Send + Sync,
44 M: SplitMode,
45{
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("GenericSyncParallelSplitter")
48 .finish_non_exhaustive()
49 }
50}
51
52impl<S, M, const BODY_SIZE: usize> GenericSyncParallelSplitter<S, M, BODY_SIZE>
53where
54 S: SyncChunkPut<BODY_SIZE> + Send + Sync,
55 M: SplitMode + Send + Sync,
56{
57 pub const fn new(store: S) -> Self {
59 Self {
60 store,
61 _mode: PhantomData,
62 }
63 }
64
65 pub fn split<R: SyncReadAt + Sync>(&self, source: &R) -> Result<M::RootRef> {
67 const { super::constants::assert_valid_body_size::<BODY_SIZE>() };
68 let size = source.len();
69 let tree = TreeParams::<BODY_SIZE>::new(size);
70
71 if size == 0 {
72 return self.handle_empty();
73 }
74
75 let spans = compute_spans_inline(BODY_SIZE / M::REF_SIZE);
76
77 let level0_refs = self.create_data_chunks(source, &tree)?;
79
80 self.build_intermediate_levels(level0_refs, size, &spans)
82 }
83
84 pub fn into_store(self) -> S {
86 self.store
87 }
88
89 fn handle_empty(&self) -> Result<M::RootRef> {
90 M::process_empty::<BODY_SIZE, S>(&self.store)
91 }
92
93 fn create_data_chunks<R: SyncReadAt + Sync>(
94 &self,
95 source: &R,
96 tree: &TreeParams<BODY_SIZE>,
97 ) -> Result<Vec<M::RefBytes>> {
98 let data_chunks = tree.data_chunks();
99 let size = tree.size();
100
101 let results: Vec<Result<M::RefBytes>> = (0..data_chunks)
102 .into_par_iter()
103 .map(|i| {
104 let offset = i * BODY_SIZE as u64;
105 let chunk_size = ((size - offset) as usize).min(BODY_SIZE);
106
107 let mut buf = vec![0u8; chunk_size];
108 source
109 .read_at(offset, &mut buf)
110 .map_err(|e| FileError::Store(Box::new(e)))?;
111
112 let span = if i + 1 == data_chunks {
113 size - offset
114 } else {
115 BODY_SIZE as u64
116 };
117
118 let chunk_bytes = super::helpers::build_intermediate_payload(span, &buf);
119
120 let (chunk, ref_bytes) = M::prepare_chunk::<BODY_SIZE>(chunk_bytes)?;
121 self.put_chunk(chunk)?;
122 Ok(ref_bytes)
123 })
124 .collect();
125
126 results.into_iter().collect()
127 }
128
129 fn build_intermediate_levels(
130 &self,
131 mut refs: Vec<M::RefBytes>,
132 total_size: u64,
133 spans: &[u64; LEVEL_LIMIT],
134 ) -> Result<M::RootRef> {
135 let mut level = 1;
136
137 while refs.len() > 1 {
138 refs = self.build_level(&refs, level, total_size, spans)?;
139 level += 1;
140 }
141
142 M::extract_root(refs[0].as_ref())
144 }
145
146 fn build_level(
147 &self,
148 refs: &[M::RefBytes],
149 level: usize,
150 total_size: u64,
151 spans: &[u64; LEVEL_LIMIT],
152 ) -> Result<Vec<M::RefBytes>> {
153 let refs_per_chunk = M::refs_per_chunk(BODY_SIZE);
154 let chunks_at_level = refs.len().div_ceil(refs_per_chunk);
155 let max_span = spans[level] * BODY_SIZE as u64;
156
157 let results: Vec<Result<M::RefBytes>> = (0..chunks_at_level)
158 .into_par_iter()
159 .map(|i| {
160 let start = i * refs_per_chunk;
161 let end = (start + refs_per_chunk).min(refs.len());
162 let child_refs = &refs[start..end];
163
164 if child_refs.len() == 1 {
166 return Ok(child_refs[0].clone());
167 }
168
169 let span = if i + 1 == chunks_at_level {
170 total_size.saturating_sub(i as u64 * max_span)
171 } else {
172 max_span
173 };
174
175 let ref_data: Vec<u8> = child_refs
176 .iter()
177 .flat_map(|r| r.as_ref())
178 .copied()
179 .collect();
180 let chunk_bytes = super::helpers::build_intermediate_payload(span, &ref_data);
181
182 let (chunk, ref_bytes) = M::prepare_chunk::<BODY_SIZE>(chunk_bytes)?;
183 self.put_chunk(chunk)?;
184 Ok(ref_bytes)
185 })
186 .collect();
187
188 results.into_iter().collect()
189 }
190
191 fn put_chunk(&self, chunk: ContentChunk<BODY_SIZE>) -> Result<()> {
192 self.store.put(chunk.into()).map_err(FileError::store)
193 }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::sync_join;
    use crate::store::MemoryStore;

    /// Runs the plain parallel splitter over `data`. Returns the root address together
    /// with the store that now holds every chunk.
    fn split_and_store(
        data: &[u8],
    ) -> (crate::chunk::ChunkAddress, MemoryStore<DEFAULT_BODY_SIZE>) {
        let splitter = SyncParallelSplitter::new(MemoryStore::<DEFAULT_BODY_SIZE>::new());
        let root = splitter.split(&data).unwrap();
        (root, splitter.into_store())
    }

    generate_plain_splitter_tests!(split_and_store);

    /// Checks a multi-chunk input with a short tail. The parallel root must equal the
    /// sequential root, and joining must reproduce the input bytes.
    #[test]
    fn test_parallel_splitter_varying_data() {
        let len = DEFAULT_BODY_SIZE * 5 + 123;
        let data: Vec<u8> = (0..len).map(|i| (i % 256) as u8).collect();

        let (root, store) = split_and_store(&data);
        let (seq_root, _) = crate::file::sync_split::<DEFAULT_BODY_SIZE>(&data).unwrap();
        assert_eq!(root, seq_root);

        assert_eq!(sync_join(&store, root).unwrap(), data);
    }

    #[cfg(feature = "encryption")]
    mod encrypted {
        use super::*;
        use crate::file::{EncryptedSyncParallelSplitter, sync_join, sync_split_encrypted};
        use crate::store::MemoryStore;

        /// Runs the encrypted parallel splitter over `data`. Returns the root reference
        /// together with the populated store.
        fn encrypted_split_and_store(
            data: &[u8],
        ) -> (
            crate::chunk::encryption::EncryptedChunkRef,
            MemoryStore<DEFAULT_BODY_SIZE>,
        ) {
            let splitter =
                EncryptedSyncParallelSplitter::new(MemoryStore::<DEFAULT_BODY_SIZE>::new());
            let root_ref = splitter.split(&data).unwrap();
            (root_ref, splitter.into_store())
        }

        generate_encrypted_splitter_tests!(encrypted_split_and_store);

        /// Encrypted roots differ between runs (see the nondeterminism test), so this does
        /// not compare references. It checks that both splitters store the same number of
        /// chunks and that each result joins back to the input.
        #[test]
        fn test_encrypted_parallel_matches_sequential() {
            let len = DEFAULT_BODY_SIZE * 5 + 123;
            let data: Vec<u8> = (0..len).map(|i| (i % 256) as u8).collect();

            let (par_ref, par_store) = encrypted_split_and_store(&data);
            let (seq_ref, seq_store) = sync_split_encrypted::<DEFAULT_BODY_SIZE>(&data).unwrap();

            assert_eq!(par_store.len(), seq_store.len());

            assert_eq!(sync_join(&par_store, par_ref).unwrap(), data);
            assert_eq!(sync_join(&seq_store, seq_ref).unwrap(), data);
        }

        /// Splitting identical input twice must yield different root addresses.
        #[test]
        fn test_encrypted_parallel_nondeterministic() {
            let data = b"test determinism";
            let (first, _) = encrypted_split_and_store(data);
            let (second, _) = encrypted_split_and_store(data);

            assert_ne!(first.address(), second.address());
        }
    }
}