1use crate::chunking::*;
5use crate::crypto;
6use crate::crypto::CypherContext;
7use crate::crypto::*;
8use crate::error;
9use crate::parallel_mapper::*;
10use std::collections::BTreeMap;
11use std::sync::Arc;
12use std::sync::RwLock;
13use std::u64;
14
15pub struct StreamEncryptor<C: ChunkGenerator> {
16 chunk_generator: C,
18 encryption_context: CypherContext,
20 manifest: Manifest,
22 chunks: Arc<RwLock<BTreeMap<u64, ChunkDescriptor>>>,
24 is_finalized: bool,
26 par_mapper:
28 Option<DynParallelMapper<Chunk, Result<(u64, Blob, ChunkDescriptor), crate::error::Error>>>,
29}
30
31impl<C: ChunkGenerator> StreamEncryptor<C> {
33 pub(crate) fn new(
34 file_name: &str,
35 chunk_generator: C,
36 make_key_wrapper: impl FnOnce(&Vec<u8>) -> Result<AnyKeyWrapper, crate::error::Error>,
37 ) -> Result<Self, crate::error::Error> {
38 let manifest = Manifest::new(file_name.to_string(), 0)?;
39 let file_enc_ctx =
40 crypto::prepare_file_encryption(file_name, manifest.mfp(), make_key_wrapper)
41 .map_err(|e| error::Error::EncryptionError(e.to_string()))?;
42
43 let inst = Self {
44 chunk_generator: chunk_generator,
45 encryption_context: file_enc_ctx,
46 manifest,
47 chunks: Arc::new(RwLock::new(BTreeMap::<u64, ChunkDescriptor>::new())),
48 is_finalized: false,
49 par_mapper: None,
50 };
51 Ok(inst)
52 }
53}
54
55impl StreamEncryptor<RandomChunkGenerator> {
56 pub fn with_rand_chunks(
57 file_name: &str,
58 password: &str,
59 chunking_threshold: u64,
60 min_chunk_size: u64,
61 max_chunk_size: u64,
62 ) -> Result<Self, error::Error> {
63 let chunk_generator =
64 RandomChunkGenerator::new(chunking_threshold, min_chunk_size, max_chunk_size);
65 Self::new(file_name, chunk_generator, |k| {
66 Ok(AnyKeyWrapper::Argon2id(
67 Argon2idKeyWrapper::with_default_parameters(password, k)?,
68 ))
69 })
70 }
71
72 pub fn with_rand_chunks_seed(
73 file_name: &str,
74 password: &str,
75 chunking_threshold: u64,
76 min_chunk_size: u64,
77 max_chunk_size: u64,
78 seed: u128,
79 ) -> Result<Self, error::Error> {
80 let chunk_generator = RandomChunkGenerator::with_seed(
81 chunking_threshold,
82 min_chunk_size,
83 max_chunk_size,
84 seed,
85 );
86 Self::new(file_name, chunk_generator, |k| {
87 Ok(AnyKeyWrapper::Argon2id(
88 Argon2idKeyWrapper::with_default_parameters(password, k)?,
89 ))
90 })
91 }
92}
93
94impl<C: ChunkGenerator> StreamEncryptor<C> {
95 pub fn process_data(&mut self, data: &[u8]) -> Vec<Chunk> {
99 self.chunk_generator.process_data(data)
100 }
101
102 pub fn on_end_of_data(&mut self) -> Vec<Chunk> {
106 let remaining_chunks = self.chunk_generator.signal_eos();
107 let file_size = self.chunk_generator.chunked_bytes_count();
109 self.manifest.set_file_size(file_size);
110 remaining_chunks
111 }
112
113 pub fn chunk_hash_algorithm(&self) -> ChecksumAlgorithm {
115 self.manifest.checksum_algorithm()
116 }
117
118 pub(crate) fn get_encryption_context(
120 &self,
121 chunk: &Chunk,
122 ) -> Result<CypherContext, crate::error::Error> {
123 Self::derive_chunk_encryption_context(&self.encryption_context, chunk.index())
124 }
125
126 pub fn encrypt_chunk(&self, chunk: &Chunk) -> Result<Blob, error::Error> {
128 let (blob, checksum) = Self::do_encrypt_chunk(
130 &self.get_encryption_context(chunk)?,
131 chunk.data(),
132 self.chunk_hash_algorithm(),
133 )?;
134
135 let span = chunk.span();
136 self.insert_chunk_descriptor(
137 chunk.index(),
138 ChunkDescriptor::new("".to_string(), checksum, span.start(), span.size()),
139 )?;
140 Ok(blob)
141 }
142
143 fn insert_chunk_descriptor(
144 &self,
145 chunk_index: u64,
146 chunk_descriptor: ChunkDescriptor,
147 ) -> Result<(), error::Error> {
148 let mut chunks = self.chunks.write().unwrap();
149 let opt_value = chunks.get_mut(&chunk_index);
150 match opt_value {
151 Some(_) => Err(crate::error::Error::LogicError(
152 "Chunk already inserted".to_string(),
153 )),
154 None => {
155 chunks.insert(chunk_index, chunk_descriptor);
156 Ok(())
157 }
158 }
159 }
160
161 pub fn encrypt_chunks(&self, chunks: &Vec<Chunk>) -> Result<Vec<(u64, Blob)>, error::Error> {
165 chunks
166 .iter()
167 .map(|chunk| {
168 let data = self.encrypt_chunk(chunk)?;
169 Ok((chunk.index(), data))
170 })
171 .collect()
172 }
173
174 fn update_mapper(&mut self, max_threads: u32) {
175 if self.par_mapper.is_some()
176 && self.par_mapper.as_ref().unwrap().concurrency() == max_threads
177 {
178 return;
179 }
180 let checksum_algo = self.manifest.checksum_algorithm();
181 let file_enc_ctx_clone = self.encryption_context.clone();
182 self.par_mapper = Some(DynParallelMapper::<
183 Chunk,
184 Result<(u64, Blob, ChunkDescriptor), crate::error::Error>,
185 >::new(
186 max_threads,
187 Box::new(move |chunk| {
188 let encryption_context =
189 Self::derive_chunk_encryption_context(&file_enc_ctx_clone, chunk.index())?;
190 let (blob, checksum) =
191 Self::do_encrypt_chunk(&encryption_context, chunk.data(), checksum_algo)?;
192 let span = chunk.span();
193 Ok((
194 chunk.index(),
195 blob,
196 ChunkDescriptor::new("".to_string(), checksum, span.start(), span.size()),
197 ))
198 }),
199 ));
200 }
201
202 pub fn parallel_encrypt_chunks(
203 &mut self,
204 max_threads: u32,
205 chunks: &Vec<Chunk>,
206 ) -> Result<Vec<(u64, Blob)>, error::Error> {
207 self.update_mapper(max_threads);
208 let results = self.par_mapper.as_mut().unwrap().process_all(chunks);
209
210 let mut result = Vec::with_capacity(results.len());
211 for res in results {
212 if res.is_ok() {
213 let (chunk_index, blob, chunk_desc) = res.unwrap();
214 result.push((chunk_index, blob));
215 self.insert_chunk_descriptor(chunk_index, chunk_desc)?;
216 } else {
217 return Err(res.err().unwrap());
218 }
219 }
220 Ok(result)
221 }
222
223 fn update_chunk_id(&self, chunk_index: u64, chunk_id: &str) -> Result<(), error::Error> {
224 let mut chunks = self.chunks.write().unwrap();
225 let opt_value = chunks.get_mut(&chunk_index);
226 match opt_value {
227 Some(chunk_desc) => Ok(chunk_desc.set_id(chunk_id.to_string())),
228 None => Err(crate::error::Error::LogicError(
229 "Chunk not found".to_string(),
230 )),
231 }
232 }
233
234 fn derive_chunk_encryption_context(
236 main_encryption_context: &CypherContext,
237 chunk_index: u64,
238 ) -> Result<CypherContext, crate::error::Error> {
239 let mut chunk_encryption_context = main_encryption_context.clone();
240 Ok(chunk_encryption_context
241 .setup_chunk_encryption(chunk_index)?
242 .clone())
243 }
244
245 pub(crate) fn do_encrypt_chunk(
255 encryption_context: &CypherContext,
256 chunk_data: &[u8],
257 checksum_algorithm: ChecksumAlgorithm,
258 ) -> Result<(Blob, Vec<u8>), error::Error> {
259 let encrypted_chunk = crypto::encrypt_to_blob(chunk_data, &mut encryption_context.clone())
261 .map_err(|e| error::Error::Any(e.to_string()))?;
262 let mut checksum_computer = checksum_algorithm.get_checksum_computer();
263 checksum_computer.update(encrypted_chunk.data());
264 Ok((encrypted_chunk, checksum_computer.finalize()))
265 }
266
267 pub fn register_encrypted_chunk(
269 &self,
270 chunk_index: u64,
271 id: &str,
272 ) -> Result<(), crate::error::Error> {
273 self.update_chunk_id(chunk_index, id)
274 }
275
276 pub(crate) fn register_encrypted_chunk_descriptor(
278 &mut self,
279 chunk_index: u64,
280 chunk_desc: ChunkDescriptor,
281 ) {
282 self.chunks.write().unwrap().insert(chunk_index, chunk_desc);
283 }
284
285 pub fn finalize(&mut self) -> Result<Blob, crate::error::Error> {
288 if self.is_finalized {
289 return Err(error::Error::LogicError(
290 "Manifest has already been finalized".to_string(),
291 ));
292 }
293 let dst = self.manifest.chunks_mut();
294 {
295 let mut src = self.chunks.write().unwrap();
296 let src_len = src.len();
297 *dst = Vec::with_capacity(src_len);
298 dst.resize(
299 src_len,
300 ChunkDescriptor::new("".to_string(), vec![], u64::MAX, u64::MAX),
301 );
302 for idx in 0..src_len {
303 let opt_chunk_desc = src.remove(&(idx as u64));
304 match opt_chunk_desc {
305 Some(chunk_desc) => dst[idx] = chunk_desc,
306 None => {
307 return Err(error::Error::LogicError(format!(
308 "Missing chunk descriptor for chunk {}",
309 idx
310 )));
311 }
312 }
313 }
314 }
315
316 let manifest_bytes = self.manifest.to_bytes()?;
317 let blob = crypto::encrypt_to_blob(
318 &manifest_bytes,
319 &mut self.encryption_context.clone().setup_manifest_encryption(),
320 )?;
321 self.is_finalized = true;
322 Ok(blob)
323 }
324
325 pub fn get_registered_chunk_id(&self, chunk_index: u64) -> Result<String, error::Error> {
328 if self.is_finalized {
329 if chunk_index >= self.manifest.chunks().len() as u64 {
330 return Err(error::Error::Any(format!(
331 "Index {} is out of bounds",
332 chunk_index
333 )));
334 }
335 return Ok(self.manifest.chunks()[chunk_index as usize].id().clone());
336 }
337 let chunks = self.chunks.read().unwrap();
338 let entry = chunks.get_key_value(&chunk_index);
339 if entry.is_none() {
340 return Err(error::Error::Any(format!(
341 "Failed to get the id of the chunk at index {}",
342 chunk_index
343 )));
344 }
345 Ok(entry.unwrap().1.id().clone())
346 }
347
348 pub fn get_chunks_count(&self) -> u64 {
351 self.chunk_generator.chunks_count()
352 }
353
354 pub fn get_registered_chunks_count(&self) -> u64 {
358 if self.is_finalized {
359 return self.manifest.chunks_count() as u64;
360 }
361 return self.chunks.read().unwrap().len() as u64;
362 }
363
364 pub fn get_chunk_ids(&self) -> Vec<String> {
367 self.manifest
368 .chunks()
369 .iter()
370 .map(|c| c.id().clone())
371 .collect()
372 }
373}
374
#[cfg(test)]
pub(crate) mod tests {

    use super::*;
    use crate::lcg::*;
    use crate::test_utils::*;

    /// Builds an encryptor for a fixed file name and password, using the Argon2id test
    /// parameters from `test_utils`.
    fn create_encryptor(
        chunk_generator: RandomChunkGenerator,
    ) -> StreamEncryptor<RandomChunkGenerator> {
        StreamEncryptor::new("whatever_file_name", chunk_generator, |k| {
            Ok(AnyKeyWrapper::Argon2id(Argon2idKeyWrapper::new(
                "whatever!password",
                &create_argon2id_params_for_tests(),
                k,
            )?))
        })
        .unwrap()
    }

    /// The chunks produced for a stream must cover exactly the bytes fed into it.
    #[test]
    fn test_chunking() {
        let mut start = std::time::Instant::now();
        let min_chunk_size = 512 * 1024u64;
        let max_chunk_size = 2 * 1024 * 1024u64;
        let chunk_generator =
            RandomChunkGenerator::with_seed(0, min_chunk_size, max_chunk_size, 1u128);
        let mut encryptor = create_encryptor(chunk_generator);

        log::debug!("Encrypter construction: {:?}", start.elapsed());

        start = std::time::Instant::now();
        let mut lcg = Lcg::new(LCG_PARAMS[4].0, LCG_PARAMS[4].1);
        let num_bytes = 5 * 1024 * 1024;
        let mut data = Vec::with_capacity(num_bytes);
        let start_filling = std::time::Instant::now();
        for _ in 0..num_bytes / std::mem::size_of::<u64>() {
            data.extend_from_slice(&lcg.next().to_le_bytes());
        }
        log::debug!(
            "Data allocation/filling: {:?} (size: {}), filling only: {:?}",
            start.elapsed(),
            data.len(),
            start_filling.elapsed()
        );

        start = std::time::Instant::now();
        let mut chunks = encryptor.process_data(&data);
        log::debug!("Data processing: {:?}", start.elapsed());
        start = std::time::Instant::now();
        chunks.append(&mut encryptor.on_end_of_data());
        log::debug!("Finalization: {:?}", start.elapsed());

        let data_size: usize = chunks.iter().map(|c| c.size() as usize).sum();
        assert_eq!(data.len(), data_size);
    }

    /// Sequential encryption records one descriptor per chunk, and finalization moves them
    /// all into the manifest.
    #[test]
    fn test_encryption() {
        let mut start = std::time::Instant::now();
        let min_chunk_size = 8 * 1024 * 1024u64;
        let max_chunk_size = 24 * 1024 * 1024u64;
        let chunk_generator =
            RandomChunkGenerator::with_seed(0, min_chunk_size, max_chunk_size, 1u128);
        let mut encryptor = create_encryptor(chunk_generator);

        let mut lcg = Lcg::new(LCG_PARAMS[4].0, LCG_PARAMS[4].1);
        let num_bytes = 5 * 1024 * 1024;
        let mut data = Vec::with_capacity(num_bytes);
        for _ in 0..num_bytes / std::mem::size_of::<u64>() {
            data.extend_from_slice(&lcg.next().to_le_bytes());
        }
        log::debug!("Setup took {:?}", start.elapsed());

        start = std::time::Instant::now();
        let mut chunks = Vec::new();
        for _ in 0..10 {
            chunks.extend(encryptor.process_data(&data));
        }
        chunks.extend(encryptor.on_end_of_data());
        log::debug!("Chunking took {:?}", start.elapsed());
        log::debug!("Number of chunks: {}", chunks.len());
        log::debug!(
            "Total size in bytes: {}",
            encryptor.chunk_generator.chunked_bytes_count()
        );

        start = std::time::Instant::now();
        chunks.iter().for_each(|chnk| {
            encryptor.encrypt_chunk(chnk).unwrap();
            // Registration must succeed; previously its Result was silently dropped.
            encryptor
                .register_encrypted_chunk(chnk.index(), &chnk.index().to_string())
                .unwrap();
        });

        log::debug!("Encryption took {:?}", start.elapsed());

        {
            let chunks_in_encryptor = encryptor.chunks.read().unwrap();

            assert_eq!(chunks.len(), chunks_in_encryptor.len());
        }
        encryptor.finalize().expect("Finalize should succeed");
        {
            let chunks_in_encryptor = encryptor.chunks.read().unwrap();

            assert_eq!(0, chunks_in_encryptor.len());
        }
        assert_eq!(chunks.len(), encryptor.manifest.chunks_count());
    }

    /// Parallel encryption records a descriptor for every chunk, keyed 0, 1, 2, ...
    #[test]
    fn test_parallel_encryption() {
        let mut start = std::time::Instant::now();
        let min_chunk_size = 8 * 1024 * 1024u64;
        let max_chunk_size = 24 * 1024 * 1024u64;
        let num_threads = 8u32;

        let chunk_generator =
            RandomChunkGenerator::with_seed(0, min_chunk_size, max_chunk_size, 3u128);
        let mut encryptor = create_encryptor(chunk_generator);

        let mut gcl = Lcg::new(LCG_PARAMS[4].0, LCG_PARAMS[4].1);
        let num_bytes = 4 * 1024 * 1024;
        let mut data = Vec::with_capacity(num_bytes);
        log::debug!("Setup took {:?}", start.elapsed());

        let mut chunking_duration = core::time::Duration::ZERO;
        let mut encryption_duration = core::time::Duration::ZERO;
        let mut gcl_duration = core::time::Duration::ZERO;
        let mut chunks = Vec::new();
        let gcl_value_size = std::mem::size_of::<u64>();
        for i in 0..256 {
            let mut k = 0;
            start = std::time::Instant::now();
            // The first round fills the buffer; later rounds overwrite it in place.
            (0..num_bytes / gcl_value_size).for_each(|_| {
                if i == 0 {
                    data.extend_from_slice(&gcl.next().to_le_bytes());
                } else {
                    data[k..k + gcl_value_size].copy_from_slice(&gcl.next().to_le_bytes());
                    k += gcl_value_size;
                }
            });
            gcl_duration += start.elapsed();
            start = std::time::Instant::now();
            chunks.extend(encryptor.process_data(&data));
            chunking_duration += start.elapsed();
            start = std::time::Instant::now();
            if chunks.len() >= num_threads as usize {
                let encrypted_chunks = encryptor
                    .parallel_encrypt_chunks(num_threads, &chunks)
                    .unwrap();
                assert_eq!(encrypted_chunks.len(), chunks.len());
                chunks
                    .iter()
                    .try_for_each(|chnk| {
                        encryptor.register_encrypted_chunk(chnk.index(), &chnk.index().to_string())
                    })
                    .expect("Registering encrypted chunks should succeed");
                chunks.clear();
            }
            encryption_duration += start.elapsed();
        }
        start = std::time::Instant::now();
        chunks.extend(encryptor.on_end_of_data());
        chunking_duration += start.elapsed();
        start = std::time::Instant::now();
        let encrypted_chunks = encryptor.encrypt_chunks(&chunks).unwrap();
        assert_eq!(encrypted_chunks.len(), chunks.len());
        chunks
            .iter()
            .try_for_each(|chnk| {
                encryptor.register_encrypted_chunk(chnk.index(), &chnk.index().to_string())
            })
            .expect("Registering encrypted chunks should succeed");
        chunks.clear();
        encryption_duration += start.elapsed();
        log::debug!("Chunking took {:?}", chunking_duration);
        log::debug!(
            "Encryption using up to {} threads took {:?}",
            num_threads,
            encryption_duration
        );
        log::debug!(
            "Generating {} values using the LCG::next took {:?}",
            encryptor.chunk_generator.chunked_bytes_count() as usize / gcl_value_size,
            gcl_duration
        );
        log::debug!(
            "Total size in bytes: {}",
            encryptor.chunk_generator.chunked_bytes_count()
        );

        let chunks = &encryptor.chunks.read().unwrap();
        assert_eq!(
            chunks.len(),
            encryptor.chunk_generator.chunks_count() as usize
        );
        // The length is checked above, so comparing keys against 0, 1, 2, ... suffices.
        // (Previously this zipped against a byte count, which is unrelated and underflows
        // on an empty stream.)
        let mismatch_pos = chunks
            .keys()
            .zip(0u64..)
            .position(|(&actual, expected)| actual != expected);
        assert_eq!(
            mismatch_pos, None,
            "Chunk keys not sequential. First mismatch at position: {:?}",
            mismatch_pos
        );
    }
}