Skip to main content

sendcipher_core/
stream_encryptor.rs

1/* Created on 2025-10-13 */
2/* Copyright (c) 2025-2026 Youcef Lemsafer */
3/* SPDX-License-Identifier: MIT */
4use crate::chunking::*;
5use crate::crypto;
6use crate::crypto::CypherContext;
7use crate::crypto::*;
8use crate::error;
9use crate::parallel_mapper::*;
10use std::collections::BTreeMap;
11use std::sync::Arc;
12use std::sync::RwLock;
13use std::u64;
14
15pub struct StreamEncryptor<C: ChunkGenerator> {
16    /// Chunks generator
17    chunk_generator: C,
18    /// Encryption data
19    encryption_context: CypherContext,
20    /// The manifest (lists the chunks)
21    manifest: Manifest,
22    /// Temporary table of chunks
23    chunks: Arc<RwLock<BTreeMap<u64, ChunkDescriptor>>>,
24    /// Whether the encryption is finalized
25    is_finalized: bool,
26    /// Parallel mapper used for parallel encryption
27    par_mapper:
28        Option<DynParallelMapper<Chunk, Result<(u64, Blob, ChunkDescriptor), crate::error::Error>>>,
29}
30
31// Crate only constructors
32impl<C: ChunkGenerator> StreamEncryptor<C> {
33    pub(crate) fn new(
34        file_name: &str,
35        chunk_generator: C,
36        make_key_wrapper: impl FnOnce(&Vec<u8>) -> Result<AnyKeyWrapper, crate::error::Error>,
37    ) -> Result<Self, crate::error::Error> {
38        let manifest = Manifest::new(file_name.to_string(), 0)?;
39        let file_enc_ctx =
40            crypto::prepare_file_encryption(file_name, manifest.mfp(), make_key_wrapper)
41                .map_err(|e| error::Error::EncryptionError(e.to_string()))?;
42
43        let inst = Self {
44            chunk_generator: chunk_generator,
45            encryption_context: file_enc_ctx,
46            manifest,
47            chunks: Arc::new(RwLock::new(BTreeMap::<u64, ChunkDescriptor>::new())),
48            is_finalized: false,
49            par_mapper: None,
50        };
51        Ok(inst)
52    }
53}
54
55impl StreamEncryptor<RandomChunkGenerator> {
56    pub fn with_rand_chunks(
57        file_name: &str,
58        password: &str,
59        chunking_threshold: u64,
60        min_chunk_size: u64,
61        max_chunk_size: u64,
62    ) -> Result<Self, error::Error> {
63        let chunk_generator =
64            RandomChunkGenerator::new(chunking_threshold, min_chunk_size, max_chunk_size);
65        Self::new(file_name, chunk_generator, |k| {
66            Ok(AnyKeyWrapper::Argon2id(
67                Argon2idKeyWrapper::with_default_parameters(password, k)?,
68            ))
69        })
70    }
71
72    pub fn with_rand_chunks_seed(
73        file_name: &str,
74        password: &str,
75        chunking_threshold: u64,
76        min_chunk_size: u64,
77        max_chunk_size: u64,
78        seed: u128,
79    ) -> Result<Self, error::Error> {
80        let chunk_generator = RandomChunkGenerator::with_seed(
81            chunking_threshold,
82            min_chunk_size,
83            max_chunk_size,
84            seed,
85        );
86        Self::new(file_name, chunk_generator, |k| {
87            Ok(AnyKeyWrapper::Argon2id(
88                Argon2idKeyWrapper::with_default_parameters(password, k)?,
89            ))
90        })
91    }
92}
93
94impl<C: ChunkGenerator> StreamEncryptor<C> {
95    /// Processes given data.
96    ///
97    /// This is the main processing loop to be called repetitively until end of stream.
98    pub fn process_data(&mut self, data: &[u8]) -> Vec<Chunk> {
99        self.chunk_generator.process_data(data)
100    }
101
102    /// Finalizes the encryption
103    ///
104    /// Must be called when the stream is exhausted to get the last remaining chunks to encrypt.
105    pub fn on_end_of_data(&mut self) -> Vec<Chunk> {
106        let remaining_chunks = self.chunk_generator.signal_eos();
107        // Now we know the size so we put it in the manifest...
108        let file_size = self.chunk_generator.chunked_bytes_count();
109        self.manifest.set_file_size(file_size);
110        remaining_chunks
111    }
112
113    /// Returns the identifier of the algorithm to use for computing chunk checksums
114    pub fn chunk_hash_algorithm(&self) -> ChecksumAlgorithm {
115        self.manifest.checksum_algorithm()
116    }
117
118    /// Returns chunk encryption context
119    pub(crate) fn get_encryption_context(
120        &self,
121        chunk: &Chunk,
122    ) -> Result<CypherContext, crate::error::Error> {
123        Self::derive_chunk_encryption_context(&self.encryption_context, chunk.index())
124    }
125
126    /// Returns the given chunk as encrypted data
127    pub fn encrypt_chunk(&self, chunk: &Chunk) -> Result<Blob, error::Error> {
128        //println!("StreamEncryptor::encrypt_chunk, chunk {}, data: {:?}", chunk.index(), &chunk.data()[..128.min(chunk.data().len())]);
129        let (blob, checksum) = Self::do_encrypt_chunk(
130            &self.get_encryption_context(chunk)?,
131            chunk.data(),
132            self.chunk_hash_algorithm(),
133        )?;
134
135        let span = chunk.span();
136        self.insert_chunk_descriptor(
137            chunk.index(),
138            ChunkDescriptor::new("".to_string(), checksum, span.start(), span.size()),
139        )?;
140        Ok(blob)
141    }
142
143    fn insert_chunk_descriptor(
144        &self,
145        chunk_index: u64,
146        chunk_descriptor: ChunkDescriptor,
147    ) -> Result<(), error::Error> {
148        let mut chunks = self.chunks.write().unwrap();
149        let opt_value = chunks.get_mut(&chunk_index);
150        match opt_value {
151            Some(_) => Err(crate::error::Error::LogicError(
152                "Chunk already inserted".to_string(),
153            )),
154            None => {
155                chunks.insert(chunk_index, chunk_descriptor);
156                Ok(())
157            }
158        }
159    }
160
161    /// Returns the given chunks as encrypted data
162    ///
163    /// @pre c.is_ready() for all c in chunks
164    pub fn encrypt_chunks(&self, chunks: &Vec<Chunk>) -> Result<Vec<(u64, Blob)>, error::Error> {
165        chunks
166            .iter()
167            .map(|chunk| {
168                let data = self.encrypt_chunk(chunk)?;
169                Ok((chunk.index(), data))
170            })
171            .collect()
172    }
173
174    fn update_mapper(&mut self, max_threads: u32) {
175        if self.par_mapper.is_some()
176            && self.par_mapper.as_ref().unwrap().concurrency() == max_threads
177        {
178            return;
179        }
180        let checksum_algo = self.manifest.checksum_algorithm();
181        let file_enc_ctx_clone = self.encryption_context.clone();
182        self.par_mapper = Some(DynParallelMapper::<
183            Chunk,
184            Result<(u64, Blob, ChunkDescriptor), crate::error::Error>,
185        >::new(
186            max_threads,
187            Box::new(move |chunk| {
188                let encryption_context =
189                    Self::derive_chunk_encryption_context(&file_enc_ctx_clone, chunk.index())?;
190                let (blob, checksum) =
191                    Self::do_encrypt_chunk(&encryption_context, chunk.data(), checksum_algo)?;
192                let span = chunk.span();
193                Ok((
194                    chunk.index(),
195                    blob,
196                    ChunkDescriptor::new("".to_string(), checksum, span.start(), span.size()),
197                ))
198            }),
199        ));
200    }
201
202    pub fn parallel_encrypt_chunks(
203        &mut self,
204        max_threads: u32,
205        chunks: &Vec<Chunk>,
206    ) -> Result<Vec<(u64, Blob)>, error::Error> {
207        self.update_mapper(max_threads);
208        let results = self.par_mapper.as_mut().unwrap().process_all(chunks);
209
210        let mut result = Vec::with_capacity(results.len());
211        for res in results {
212            if res.is_ok() {
213                let (chunk_index, blob, chunk_desc) = res.unwrap();
214                result.push((chunk_index, blob));
215                self.insert_chunk_descriptor(chunk_index, chunk_desc)?;
216            } else {
217                return Err(res.err().unwrap());
218            }
219        }
220        Ok(result)
221    }
222
223    fn update_chunk_id(&self, chunk_index: u64, chunk_id: &str) -> Result<(), error::Error> {
224        let mut chunks = self.chunks.write().unwrap();
225        let opt_value = chunks.get_mut(&chunk_index);
226        match opt_value {
227            Some(chunk_desc) => Ok(chunk_desc.set_id(chunk_id.to_string())),
228            None => Err(crate::error::Error::LogicError(
229                "Chunk not found".to_string(),
230            )),
231        }
232    }
233
234    ///
235    fn derive_chunk_encryption_context(
236        main_encryption_context: &CypherContext,
237        chunk_index: u64,
238    ) -> Result<CypherContext, crate::error::Error> {
239        let mut chunk_encryption_context = main_encryption_context.clone();
240        Ok(chunk_encryption_context
241            .setup_chunk_encryption(chunk_index)?
242            .clone())
243    }
244
245    /// Returns encrypted data resulting from encryption of given chunk data.
246    /// Advanced! Must remain crate only, use at your own risk.
247    ///
248    /// @param[in] encryption_context encryption context (master key, params, etc..)
249    /// @param[in] chunk_index index of the chunk
250    /// @param[in] chunk_data data to be encrypted
251    /// @param[in] span offset and length the chunk corresponds to in the untransformed file
252    /// @return A couple where first element is the encrypted blob and the second
253    /// is the checksum of the encrypted blob
254    pub(crate) fn do_encrypt_chunk(
255        encryption_context: &CypherContext,
256        chunk_data: &[u8],
257        checksum_algorithm: ChecksumAlgorithm,
258    ) -> Result<(Blob, Vec<u8>), error::Error> {
259        //println!("StreamEncryptor::do_encrypt_chunk called on data = {:?}", &chunk_data[..128.min(chunk_data.len())]);
260        let encrypted_chunk = crypto::encrypt_to_blob(chunk_data, &mut encryption_context.clone())
261            .map_err(|e| error::Error::Any(e.to_string()))?;
262        let mut checksum_computer = checksum_algorithm.get_checksum_computer();
263        checksum_computer.update(encrypted_chunk.data());
264        Ok((encrypted_chunk, checksum_computer.finalize()))
265    }
266
267    /// Associates a string id to an encrypted chunk identified by its index
268    pub fn register_encrypted_chunk(
269        &self,
270        chunk_index: u64,
271        id: &str,
272    ) -> Result<(), crate::error::Error> {
273        self.update_chunk_id(chunk_index, id)
274    }
275
276    ///
277    pub(crate) fn register_encrypted_chunk_descriptor(
278        &mut self,
279        chunk_index: u64,
280        chunk_desc: ChunkDescriptor,
281    ) {
282        self.chunks.write().unwrap().insert(chunk_index, chunk_desc);
283    }
284
285    /// Finalizes the encryption and returns the encrypted manifest
286    /// @pre on_end_of_data has been called and all chunks have been encrypted and registered
287    pub fn finalize(&mut self) -> Result<Blob, crate::error::Error> {
288        if self.is_finalized {
289            return Err(error::Error::LogicError(
290                "Manifest has already been finalized".to_string(),
291            ));
292        }
293        let dst = self.manifest.chunks_mut();
294        {
295            let mut src = self.chunks.write().unwrap();
296            let src_len = src.len();
297            *dst = Vec::with_capacity(src_len);
298            dst.resize(
299                src_len,
300                ChunkDescriptor::new("".to_string(), vec![], u64::MAX, u64::MAX),
301            );
302            for idx in 0..src_len {
303                let opt_chunk_desc = src.remove(&(idx as u64));
304                match opt_chunk_desc {
305                    Some(chunk_desc) => dst[idx] = chunk_desc,
306                    None => {
307                        return Err(error::Error::LogicError(format!(
308                            "Missing chunk descriptor for chunk {}",
309                            idx
310                        )));
311                    }
312                }
313            }
314        }
315
316        let manifest_bytes = self.manifest.to_bytes()?;
317        let blob = crypto::encrypt_to_blob(
318            &manifest_bytes,
319            &mut self.encryption_context.clone().setup_manifest_encryption(),
320        )?;
321        self.is_finalized = true;
322        Ok(blob)
323    }
324
325    /// Gets the id assigned to chunk at index chunk_index
326    /// @pre chunk of index chunk_index as been registered by calling register_encrypted_chunk
327    pub fn get_registered_chunk_id(&self, chunk_index: u64) -> Result<String, error::Error> {
328        if self.is_finalized {
329            if chunk_index >= self.manifest.chunks().len() as u64 {
330                return Err(error::Error::Any(format!(
331                    "Index {} is out of bounds",
332                    chunk_index
333                )));
334            }
335            return Ok(self.manifest.chunks()[chunk_index as usize].id().clone());
336        }
337        let chunks = self.chunks.read().unwrap();
338        let entry = chunks.get_key_value(&chunk_index);
339        if entry.is_none() {
340            return Err(error::Error::Any(format!(
341                "Failed to get the id of the chunk at index {}",
342                chunk_index
343            )));
344        }
345        Ok(entry.unwrap().1.id().clone())
346    }
347
348    /// Returns the total number of chunks so far
349    /// Increases as we keep piling chunks but may decrease e.g. on finalize if merging last chunks
350    pub fn get_chunks_count(&self) -> u64 {
351        self.chunk_generator.chunks_count()
352    }
353
354    /// Returns the number of registered encrypted chunks i.e.
355    /// the number of chunks on which register_encrypted_chunk has
356    /// been called.
357    pub fn get_registered_chunks_count(&self) -> u64 {
358        if self.is_finalized {
359            return self.manifest.chunks_count() as u64;
360        }
361        return self.chunks.read().unwrap().len() as u64;
362    }
363
364    /// Returns the chunk ids
365    /// @pre All chunks have been registered and finalize() has been called
366    pub fn get_chunk_ids(&self) -> Vec<String> {
367        self.manifest
368            .chunks()
369            .iter()
370            .map(|c| c.id().clone())
371            .collect()
372    }
373}
374
#[cfg(test)]
pub(crate) mod tests {

    use super::*;
    use crate::lcg::*;
    use crate::test_utils::*;

    /// Builds an encryptor over `chunk_generator` with a fixed file name and password. It uses
    /// the Argon2id parameters from the test utilities.
    fn create_encryptor(
        chunk_generator: RandomChunkGenerator,
    ) -> StreamEncryptor<RandomChunkGenerator> {
        StreamEncryptor::new("whatever_file_name", chunk_generator, |k| {
            Ok(AnyKeyWrapper::Argon2id(Argon2idKeyWrapper::new(
                "whatever!password",
                &create_argon2id_params_for_tests(),
                k,
            )?))
        })
        .unwrap()
    }

    #[test]
    fn test_chunking() {
        let mut start = std::time::Instant::now();
        let min_chunk_size = 512 * 1024u64;
        let max_chunk_size = 2 * 1024 * 1024u64;
        let chunk_generator =
            RandomChunkGenerator::with_seed(0, min_chunk_size, max_chunk_size, 1u128);
        let mut encryptor = create_encryptor(chunk_generator);

        log::debug!("Encrypter construction: {:?}", start.elapsed());

        start = std::time::Instant::now();
        let mut lcg = Lcg::new(LCG_PARAMS[4].0, LCG_PARAMS[4].1);
        let num_bytes = 5 * 1024 * 1024;
        let mut data = Vec::with_capacity(num_bytes);
        let start_filling = std::time::Instant::now();
        for _ in 0..num_bytes / std::mem::size_of::<u64>() {
            data.extend_from_slice(&lcg.next().to_le_bytes());
        }
        log::debug!(
            "Data allocation/filling: {:?} (size: {}), filling only: {:?}",
            start.elapsed(),
            data.len(),
            start_filling.elapsed()
        );

        start = std::time::Instant::now();
        let mut chunks = encryptor.process_data(&data);
        log::debug!("Data processing: {:?}", start.elapsed());
        start = std::time::Instant::now();
        chunks.append(&mut encryptor.on_end_of_data());
        log::debug!("Finalization: {:?}", start.elapsed());

        let data_size: usize = chunks.iter().map(|c| c.size() as usize).sum();
        assert_eq!(data.len(), data_size);
    }

    #[test]
    fn test_encryption() {
        // Set up
        let mut start = std::time::Instant::now();
        let min_chunk_size = 8 * 1024 * 1024u64;
        let max_chunk_size = 24 * 1024 * 1024u64;
        let chunk_generator =
            RandomChunkGenerator::with_seed(0, min_chunk_size, max_chunk_size, 1u128);
        let mut encryptor = create_encryptor(chunk_generator);

        let mut lcg = Lcg::new(LCG_PARAMS[4].0, LCG_PARAMS[4].1);
        let num_bytes = 5 * 1024 * 1024;
        let mut data = Vec::with_capacity(num_bytes);
        for _ in 0..num_bytes / std::mem::size_of::<u64>() {
            data.extend_from_slice(&lcg.next().to_le_bytes());
        }
        log::debug!("Setup took {:?}", start.elapsed());

        // Processing
        start = std::time::Instant::now();
        let mut chunks = Vec::new();
        for _ in 0..10 {
            // Simple test case, just reuse the same block again and again
            chunks.extend(encryptor.process_data(&data));
        }
        chunks.extend(encryptor.on_end_of_data());
        log::debug!("Chunking took {:?}", start.elapsed());
        log::debug!("Number of chunks: {}", chunks.len());
        log::debug!(
            "Total size in bytes: {}",
            encryptor.chunk_generator.chunked_bytes_count()
        );

        start = std::time::Instant::now();
        chunks.iter().for_each(|chnk| {
            encryptor.encrypt_chunk(chnk).unwrap();
            // Registration must succeed: a silently ignored error would hide a real bug.
            encryptor
                .register_encrypted_chunk(chnk.index(), &chnk.index().to_string())
                .unwrap();
        });

        log::debug!("Encryption took {:?}", start.elapsed());

        {
            let chunks_in_encryptor = encryptor.chunks.read().unwrap();

            assert_eq!(chunks.len(), chunks_in_encryptor.len());
        }
        encryptor.finalize().expect("Finalize should succeed");
        {
            let chunks_in_encryptor = encryptor.chunks.read().unwrap();

            assert_eq!(0, chunks_in_encryptor.len());
        }
        assert_eq!(chunks.len(), encryptor.manifest.chunks_count());
    }

    #[test]
    fn test_parallel_encryption() {
        // Set up
        let mut start = std::time::Instant::now();
        let min_chunk_size = 8 * 1024 * 1024u64;
        let max_chunk_size = 24 * 1024 * 1024u64;
        let num_threads = 8u32;

        let chunk_generator =
            RandomChunkGenerator::with_seed(0, min_chunk_size, max_chunk_size, 3u128);
        let mut encryptor = create_encryptor(chunk_generator);

        let mut lcg = Lcg::new(LCG_PARAMS[4].0, LCG_PARAMS[4].1);
        let num_bytes = 4 * 1024 * 1024;
        let mut data = Vec::with_capacity(num_bytes);
        log::debug!("Setup took {:?}", start.elapsed());

        // Processing
        let mut chunking_duration = core::time::Duration::ZERO;
        let mut encryption_duration = core::time::Duration::ZERO;
        let mut lcg_duration = core::time::Duration::ZERO;
        let mut chunks = Vec::new();
        let lcg_value_size = std::mem::size_of::<u64>();
        for i in 0..256 {
            // 256 * 4MB = 1GB
            let mut k = 0;
            start = std::time::Instant::now();
            (0..num_bytes / lcg_value_size).for_each(|_| {
                if i == 0 {
                    data.extend_from_slice(&lcg.next().to_le_bytes());
                } else {
                    data[k..k + lcg_value_size].copy_from_slice(&lcg.next().to_le_bytes());
                    k += lcg_value_size;
                }
            });
            lcg_duration += start.elapsed();
            start = std::time::Instant::now();
            chunks.extend(encryptor.process_data(&data));
            chunking_duration += start.elapsed();
            start = std::time::Instant::now();
            if chunks.len() >= num_threads as usize {
                // Yes limit the number of chunks!! We are simulating the processing of a 1GB file!
                let encrypted_chunks = encryptor
                    .parallel_encrypt_chunks(num_threads, &chunks)
                    .unwrap();
                assert_eq!(encrypted_chunks.len(), chunks.len());
                chunks
                    .iter()
                    .try_for_each(|chnk| {
                        encryptor.register_encrypted_chunk(chnk.index(), &chnk.index().to_string())
                    })
                    .expect("registering encrypted chunks should succeed");
                chunks.clear();
            }
            encryption_duration += start.elapsed();
        }
        start = std::time::Instant::now();
        chunks.extend(encryptor.on_end_of_data());
        chunking_duration += start.elapsed();
        start = std::time::Instant::now();
        let encrypted_chunks = encryptor.encrypt_chunks(&chunks).unwrap();
        assert_eq!(encrypted_chunks.len(), chunks.len());
        chunks
            .iter()
            .try_for_each(|chnk| {
                encryptor.register_encrypted_chunk(chnk.index(), &chnk.index().to_string())
            })
            .expect("registering encrypted chunks should succeed");
        chunks.clear();
        encryption_duration += start.elapsed();
        log::debug!("Chunking took {:?}", chunking_duration);
        log::debug!(
            "Encryption using up to {} threads took {:?}",
            num_threads,
            encryption_duration
        );
        log::debug!(
            "Generating {} values using the LCG::next took {:?}",
            encryptor.chunk_generator.chunked_bytes_count() as usize / lcg_value_size,
            lcg_duration
        );
        log::debug!(
            "Total size in bytes: {}",
            encryptor.chunk_generator.chunked_bytes_count()
        );

        let chunks = &encryptor.chunks.read().unwrap();
        assert_eq!(
            chunks.len(),
            encryptor.chunk_generator.chunks_count() as usize
        );
        // Keys must be exactly 0, 1, 2, ...; the zip stops at the end of the key sequence.
        let mismatch_pos = chunks
            .keys()
            .zip(0u64..)
            .position(|(&actual, expected)| actual != expected);
        assert_eq!(
            mismatch_pos, None,
            "Chunk keys not sequential. First mismatch at position: {:?}",
            mismatch_pos
        );
    }
}