Skip to main content

nectar_primitives/file/
sync_splitter.rs

1//! File splitter for producing BMT chunks from data streams.
2//!
3//! Buffers data via `Write`, then delegates to `GenericSyncParallelSplitter`
4//! on `finish()` for parallel chunk hashing.
5
6use std::fmt;
7use std::io::{self, Write};
8use std::marker::PhantomData;
9
10use crate::bmt::DEFAULT_BODY_SIZE;
11
12use super::error::{FileError, Result};
13use super::mode::{PlainMode, SplitMode};
14use super::sync_splitter_parallel::GenericSyncParallelSplitter;
15use crate::store::SyncChunkPut;
16
17#[cfg(feature = "encryption")]
18use super::mode::EncryptedMode;
19
20/// Generic splitter parameterized by chunk mode.
21///
22/// Buffers data written via `Write` and delegates to `GenericSyncParallelSplitter`
23/// on `finish()` for parallel chunk hashing.
24pub struct GenericSyncSplitter<S, M: SplitMode, const BODY_SIZE: usize = DEFAULT_BODY_SIZE>
25where
26    S: SyncChunkPut<BODY_SIZE>,
27{
28    store: S,
29    span_length: u64,
30    buffer: Vec<u8>,
31    _mode: PhantomData<M>,
32}
33
34/// Plain (unencrypted) file splitter.
35pub type SyncSplitter<S, const BODY_SIZE: usize = DEFAULT_BODY_SIZE> =
36    GenericSyncSplitter<S, PlainMode, BODY_SIZE>;
37
38/// Encrypted file splitter.
39#[cfg(feature = "encryption")]
40pub type EncryptedSyncSplitter<S, const BODY_SIZE: usize = DEFAULT_BODY_SIZE> =
41    GenericSyncSplitter<S, EncryptedMode, BODY_SIZE>;
42
43impl<S, M, const BODY_SIZE: usize> fmt::Debug for GenericSyncSplitter<S, M, BODY_SIZE>
44where
45    S: SyncChunkPut<BODY_SIZE>,
46    M: SplitMode,
47{
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        f.debug_struct("GenericSyncSplitter")
50            .field("span_length", &self.span_length)
51            .field("length", &self.buffer.len())
52            .finish_non_exhaustive()
53    }
54}
55
56impl<S, M, const BODY_SIZE: usize> GenericSyncSplitter<S, M, BODY_SIZE>
57where
58    S: SyncChunkPut<BODY_SIZE>,
59    M: SplitMode,
60{
61    /// Create a splitter for data of known size.
62    pub fn new(store: S, span_length: u64) -> Self {
63        const { super::constants::assert_valid_body_size::<BODY_SIZE>() };
64
65        Self {
66            store,
67            span_length,
68            buffer: Vec::with_capacity(span_length.min(BODY_SIZE as u64 * 2) as usize),
69            _mode: PhantomData,
70        }
71    }
72
73    /// Bytes written so far.
74    pub const fn len(&self) -> u64 {
75        self.buffer.len() as u64
76    }
77
78    /// Whether any data has been written.
79    pub const fn is_empty(&self) -> bool {
80        self.buffer.is_empty()
81    }
82
83    /// Declared span length.
84    pub const fn span_length(&self) -> u64 {
85        self.span_length
86    }
87}
88
89impl<S, M, const BODY_SIZE: usize> GenericSyncSplitter<S, M, BODY_SIZE>
90where
91    S: SyncChunkPut<BODY_SIZE> + Send + Sync,
92    M: SplitMode + Send + Sync,
93{
94    /// Finalize and return the root reference and store.
95    pub fn finish(self) -> Result<(M::RootRef, S)> {
96        if self.buffer.len() as u64 != self.span_length {
97            return Err(FileError::SpanMismatch {
98                expected: self.span_length,
99                actual: self.buffer.len() as u64,
100            });
101        }
102
103        if self.buffer.is_empty() {
104            let root = M::process_empty::<BODY_SIZE, S>(&self.store)?;
105            return Ok((root, self.store));
106        }
107
108        let parallel = GenericSyncParallelSplitter::<S, M, BODY_SIZE>::new(self.store);
109        let root = parallel.split(&self.buffer)?;
110        let store = parallel.into_store();
111        Ok((root, store))
112    }
113}
114
115impl<S, M, const BODY_SIZE: usize> Write for GenericSyncSplitter<S, M, BODY_SIZE>
116where
117    S: SyncChunkPut<BODY_SIZE>,
118    M: SplitMode,
119{
120    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
121        let remaining = self.span_length.saturating_sub(self.buffer.len() as u64) as usize;
122        let to_write = buf.len().min(remaining);
123        if to_write == 0 && !buf.is_empty() {
124            return Err(io::Error::other(
125                FileError::WritePastSpan {
126                    span: self.span_length,
127                    written: self.span_length + 1,
128                }
129                .to_string(),
130            ));
131        }
132        self.buffer.extend_from_slice(&buf[..to_write]);
133        Ok(to_write)
134    }
135
136    fn flush(&mut self) -> io::Result<()> {
137        Ok(())
138    }
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;

    fn split_and_store(
        data: &[u8],
    ) -> (crate::chunk::ChunkAddress, MemoryStore<DEFAULT_BODY_SIZE>) {
        let store = MemoryStore::<DEFAULT_BODY_SIZE>::new();
        let mut splitter = SyncSplitter::new(store, data.len() as u64);
        splitter.write_all(data).unwrap();
        splitter.finish().unwrap()
    }

    generate_plain_splitter_tests!(split_and_store);

    #[test]
    fn test_splitter_incremental_writes() {
        let mut data = vec![0u8; DEFAULT_BODY_SIZE * 2 + 100];
        rand::RngExt::fill(&mut rand::rng(), &mut data);
        let store = MemoryStore::<DEFAULT_BODY_SIZE>::new();
        let mut splitter = SyncSplitter::new(store, data.len() as u64);

        for chunk in data.chunks(100) {
            splitter.write_all(chunk).unwrap();
        }
        let (root, store) = splitter.finish().unwrap();

        assert_eq!(store.len(), 4);
        assert!(!root.is_zero());
    }

    #[test]
    fn test_splitter_deterministic() {
        let data = vec![0x56; DEFAULT_BODY_SIZE * 3];

        let (root1, _) = {
            let store = MemoryStore::<DEFAULT_BODY_SIZE>::new();
            let mut splitter = SyncSplitter::new(store, data.len() as u64);
            splitter.write_all(&data).unwrap();
            splitter.finish().unwrap()
        };

        let (root2, _) = {
            let store = MemoryStore::<DEFAULT_BODY_SIZE>::new();
            let mut splitter = SyncSplitter::new(store, data.len() as u64);
            splitter.write_all(&data).unwrap();
            splitter.finish().unwrap()
        };

        assert_eq!(root1, root2);
    }

    #[test]
    fn test_splitter_write_past_span() {
        let store = MemoryStore::<DEFAULT_BODY_SIZE>::new();
        let mut splitter = SyncSplitter::new(store, 10);

        let result = splitter.write_all(b"this is more than 10 bytes");
        assert!(result.is_err());
    }

    /// A single overshooting `write` is a short write, not an error; only the
    /// next non-empty write past the full span fails. Accepted bytes are kept.
    #[test]
    fn test_splitter_short_write_at_span_boundary() {
        let store = MemoryStore::<DEFAULT_BODY_SIZE>::new();
        let mut splitter = SyncSplitter::<_, DEFAULT_BODY_SIZE>::new(store, 10);

        let accepted = splitter.write(b"this is more than 10 bytes").unwrap();
        assert_eq!(accepted, 10);
        assert_eq!(splitter.len(), 10);

        // Empty writes are always fine, even with a full span.
        assert_eq!(splitter.write(b"").unwrap(), 0);

        // Any further data is rejected and leaves the buffer untouched.
        assert!(splitter.write(b"x").is_err());
        assert_eq!(splitter.len(), 10);

        // The bytes accepted before the rejection still satisfy the span.
        assert!(splitter.finish().is_ok());
    }

    /// A zero-length span accepts no data but still finishes cleanly.
    #[test]
    fn test_splitter_empty_span_rejects_data() {
        let store = MemoryStore::<DEFAULT_BODY_SIZE>::new();
        let mut splitter = SyncSplitter::<_, DEFAULT_BODY_SIZE>::new(store, 0);

        assert!(splitter.is_empty());
        assert!(splitter.write(b"a").is_err());
        assert!(splitter.is_empty());
        assert!(splitter.finish().is_ok());
    }

    #[test]
    fn test_splitter_span_mismatch() {
        let store = MemoryStore::<DEFAULT_BODY_SIZE>::new();
        let mut splitter = SyncSplitter::new(store, 100);

        splitter.write_all(b"short").unwrap();
        let result = splitter.finish();

        assert!(matches!(result, Err(FileError::SpanMismatch { .. })));
    }

    #[cfg(feature = "encryption")]
    mod encrypted {
        use super::*;

        fn encrypted_split_and_store(
            data: &[u8],
        ) -> (
            crate::chunk::encryption::EncryptedChunkRef,
            MemoryStore<DEFAULT_BODY_SIZE>,
        ) {
            let store = MemoryStore::<DEFAULT_BODY_SIZE>::new();
            let mut splitter = EncryptedSyncSplitter::new(store, data.len() as u64);
            splitter.write_all(data).unwrap();
            splitter.finish().unwrap()
        }

        generate_encrypted_splitter_tests!(encrypted_split_and_store);

        #[test]
        fn test_encrypted_splitter_write_past_span() {
            let store = MemoryStore::<DEFAULT_BODY_SIZE>::new();
            let mut splitter = EncryptedSyncSplitter::<_, DEFAULT_BODY_SIZE>::new(store, 10);

            let result = splitter.write_all(b"this is more than 10 bytes");
            assert!(result.is_err());
        }

        #[test]
        fn test_encrypted_splitter_span_mismatch() {
            let store = MemoryStore::<DEFAULT_BODY_SIZE>::new();
            let mut splitter = EncryptedSyncSplitter::<_, DEFAULT_BODY_SIZE>::new(store, 100);

            splitter.write_all(b"short").unwrap();
            let result = splitter.finish();

            assert!(matches!(result, Err(FileError::SpanMismatch { .. })));
        }

        #[test]
        fn test_encrypted_differs_from_plaintext() {
            let data = b"test data for encryption comparison";
            let store = MemoryStore::<DEFAULT_BODY_SIZE>::new();
            let mut splitter = SyncSplitter::new(store, data.len() as u64);
            splitter.write_all(data).unwrap();
            let (plain_root, _) = splitter.finish().unwrap();

            let store = MemoryStore::<DEFAULT_BODY_SIZE>::new();
            let mut enc_splitter = EncryptedSyncSplitter::new(store, data.len() as u64);
            enc_splitter.write_all(data).unwrap();
            let (enc_root, _) = enc_splitter.finish().unwrap();

            assert_ne!(enc_root.address(), &plain_root);
        }
    }
}