use crate::{
encrypt::encrypt_chunk,
shrink_data_map,
utils::{get_num_chunks, get_pad_key_and_iv, get_start_end_positions},
ChunkInfo, DataMap, EncryptedChunk, Error, Result,
};
use bytes::Bytes;
use xor_name::XorName;
/// Internal item produced by one step of the encryption state machine.
///
/// `EncryptionStream::next_internal` yields `Chunk` for every encrypted chunk
/// (including chunks produced while shrinking the data map) and `DataMap`
/// once, after all chunks have been handed out.
#[derive(Debug, Clone)]
enum ChunkOrDataMap {
/// An encrypted chunk that should be passed on to the caller.
Chunk(EncryptedChunk),
/// The final data map; `ChunkStream` stores it and ends iteration.
DataMap(DataMap),
}
/// Iterator over the encrypted chunks of an [`EncryptionStream`].
///
/// Created by [`EncryptionStream::chunks`]. It borrows the stream mutably, so
/// the data map can only be read from the stream after this iterator is dropped.
pub struct ChunkStream<'a, I> {
/// The encryption stream that is advanced on every call to `next`.
stream: &'a mut EncryptionStream<I>,
}
impl<'a, I> Iterator for ChunkStream<'a, I>
where
I: Iterator<Item = Bytes>,
{
type Item = Result<(XorName, Bytes)>;
fn next(&mut self) -> Option<Self::Item> {
match self.stream.next_internal() {
Some(Ok(ChunkOrDataMap::Chunk(chunk))) => {
let hash = XorName::from_content(&chunk.content);
let content = chunk.content;
Some(Ok((hash, content)))
}
Some(Ok(ChunkOrDataMap::DataMap(datamap))) => {
self.stream.final_datamap = Some(datamap);
None
}
Some(Err(e)) => Some(Err(e)),
None => None,
}
}
}
/// State of an in-progress streaming encryption.
///
/// Input is pulled from `data_iter`, buffered until a whole chunk is available,
/// and encrypted chunk by chunk. The first two chunks are held back until every
/// source hash is known; the data map is produced last.
pub struct EncryptionStream<I> {
/// Source of the plaintext; pulled one item at a time.
data_iter: I,
/// Declared total size of the plaintext in bytes; used to compute chunk boundaries.
data_size: usize,
/// Plaintext bytes received from `data_iter` that have not yet been cut into a chunk.
buffer: Vec<u8>,
/// Number of chunks whose plaintext has been taken out of `buffer` so far.
chunks_processed: usize,
/// Number of chunks the input is split into (`get_num_chunks(data_size)`).
total_chunks: usize,
/// Hash of each chunk's plaintext; `None` until that chunk has been processed.
src_hashes: Vec<Option<XorName>>,
/// Plaintext of chunks 0 and 1, kept until finalization when all source hashes exist.
deferred_chunks: [Option<Bytes>; 2],
/// Per-chunk metadata for the data map; `None` until that chunk has been processed.
chunk_infos: Vec<Option<ChunkInfo>>,
/// Set once `data_iter` has returned `None`.
input_complete: bool,
/// Set once every chunk has been yielded and only the data map remains to be returned.
is_complete: bool,
/// The resulting data map, available after finalization.
final_datamap: Option<DataMap>,
/// Extra chunks returned by `shrink_data_map` that still have to be yielded.
shrinking_chunks: Vec<EncryptedChunk>,
/// Index of the next entry of `shrinking_chunks` to yield.
shrinking_chunk_index: usize,
}
impl<I> EncryptionStream<I>
where
I: Iterator<Item = Bytes>,
{
fn new(data_size: usize, data_iter: I) -> Result<Self> {
if data_size < crate::MIN_ENCRYPTABLE_BYTES {
return Err(Error::Generic(format!(
"File too small for self-encryption! Required size at least {}",
crate::MIN_ENCRYPTABLE_BYTES
)));
}
let total_chunks = get_num_chunks(data_size);
if total_chunks < 3 {
return Err(Error::Generic(
"File must be large enough to generate at least 3 chunks".to_string(),
));
}
Ok(Self {
data_iter,
data_size,
buffer: Vec::new(),
chunks_processed: 0,
total_chunks,
src_hashes: vec![None; total_chunks],
deferred_chunks: [None, None],
chunk_infos: vec![None; total_chunks],
input_complete: false,
is_complete: false,
final_datamap: None,
shrinking_chunks: Vec::new(),
shrinking_chunk_index: 0,
})
}
fn try_process_chunks(&mut self) -> Result<Option<ChunkOrDataMap>> {
while self.chunks_processed < self.total_chunks {
let chunk_index = self.chunks_processed;
let (chunk_start, chunk_end) = get_start_end_positions(self.data_size, chunk_index);
let chunk_size = chunk_end - chunk_start;
let buffer_start = chunk_start.saturating_sub(self.get_processed_bytes());
if self.buffer.len() >= buffer_start + chunk_size || self.input_complete {
let actual_chunk_size = if self.input_complete {
std::cmp::min(chunk_size, self.buffer.len().saturating_sub(buffer_start))
} else {
chunk_size
};
if actual_chunk_size == 0 {
break;
}
let chunk_data = if buffer_start + actual_chunk_size <= self.buffer.len() {
Bytes::from(
self.buffer[buffer_start..buffer_start + actual_chunk_size].to_vec(),
)
} else {
break; };
let src_hash = XorName::from_content(&chunk_data);
self.src_hashes[chunk_index] = Some(src_hash);
self.chunk_infos[chunk_index] = Some(ChunkInfo {
index: chunk_index,
dst_hash: XorName::from_content(&[]), src_hash,
src_size: chunk_data.len(),
});
if chunk_index < 2 {
self.deferred_chunks[chunk_index] = Some(chunk_data);
} else if self.can_encrypt_chunk(chunk_index) {
let encrypted_chunk = self.encrypt_chunk(chunk_index, chunk_data)?;
self.chunks_processed += 1;
if buffer_start + actual_chunk_size <= self.buffer.len() {
let _ = self
.buffer
.drain(buffer_start..buffer_start + actual_chunk_size);
}
return Ok(Some(ChunkOrDataMap::Chunk(encrypted_chunk)));
}
self.chunks_processed += 1;
if buffer_start + actual_chunk_size <= self.buffer.len() {
let _ = self
.buffer
.drain(buffer_start..buffer_start + actual_chunk_size);
}
} else {
break;
}
}
if self.input_complete && self.chunks_processed >= self.total_chunks {
return self.finalize_encryption();
}
Ok(None)
}
fn get_processed_bytes(&self) -> usize {
let mut processed = 0;
for i in 0..self.chunks_processed {
let (start, end) = get_start_end_positions(self.data_size, i);
processed += end - start;
}
processed
}
fn can_encrypt_chunk(&self, chunk_index: usize) -> bool {
if chunk_index < 2 {
self.src_hashes.iter().all(|h| h.is_some())
} else {
let (n1, n2) = crate::utils::get_n_1_n_2(chunk_index, self.total_chunks);
self.src_hashes[chunk_index].is_some()
&& self.src_hashes[n1].is_some()
&& self.src_hashes[n2].is_some()
}
}
fn encrypt_chunk(&mut self, chunk_index: usize, chunk_data: Bytes) -> Result<EncryptedChunk> {
let mut src_hashes = vec![XorName::from_content(&[]); self.total_chunks];
for (i, hash_opt) in self.src_hashes.iter().enumerate() {
if let Some(hash) = hash_opt {
src_hashes[i] = *hash;
}
}
let pki = get_pad_key_and_iv(chunk_index, &src_hashes);
let encrypted_content = encrypt_chunk(chunk_data, pki)?;
let dst_hash = XorName::from_content(&encrypted_content);
if let Some(chunk_info) = &mut self.chunk_infos[chunk_index] {
chunk_info.dst_hash = dst_hash;
}
Ok(EncryptedChunk {
content: encrypted_content,
})
}
fn finalize_encryption(&mut self) -> Result<Option<ChunkOrDataMap>> {
if self.is_complete {
return Ok(None);
}
let src_hashes: Result<Vec<XorName>> = self
.src_hashes
.iter()
.enumerate()
.map(|(i, h)| {
h.ok_or_else(|| Error::Generic(format!("Missing source hash for chunk {i}")))
})
.collect();
let src_hashes = src_hashes?;
for chunk_index in 0..2.min(self.total_chunks) {
if let Some(chunk_data) = self.deferred_chunks[chunk_index].take() {
let pki = get_pad_key_and_iv(chunk_index, &src_hashes);
let encrypted_content = encrypt_chunk(chunk_data, pki)?;
let dst_hash = XorName::from_content(&encrypted_content);
if let Some(chunk_info) = &mut self.chunk_infos[chunk_index] {
chunk_info.dst_hash = dst_hash;
}
return Ok(Some(ChunkOrDataMap::Chunk(EncryptedChunk {
content: encrypted_content,
})));
}
}
let mut final_chunk_infos = Vec::new();
for chunk_info_opt in &self.chunk_infos {
if let Some(chunk_info) = chunk_info_opt {
final_chunk_infos.push(chunk_info.clone());
} else {
return Err(Error::Generic("Missing chunk info".to_string()));
}
}
final_chunk_infos.sort_by_key(|info| info.index);
let data_map = DataMap::new(final_chunk_infos);
let (shrunk_map, shrink_chunks) = shrink_data_map(data_map, |_hash, _content| {
Ok(())
})?;
self.shrinking_chunks = shrink_chunks;
self.final_datamap = Some(shrunk_map);
if !self.shrinking_chunks.is_empty()
&& self.shrinking_chunk_index < self.shrinking_chunks.len()
{
let chunk = &self.shrinking_chunks[self.shrinking_chunk_index];
self.shrinking_chunk_index += 1;
return Ok(Some(ChunkOrDataMap::Chunk(chunk.clone())));
}
self.is_complete = true;
Ok(Some(ChunkOrDataMap::DataMap(
self.final_datamap.as_ref().unwrap().clone(),
)))
}
}
/// Stepping logic shared by the public iterators over the stream.
impl<I> EncryptionStream<I>
where
    I: Iterator<Item = Bytes>,
{
    /// Produces the next encrypted chunk or, at the very end, the data map.
    ///
    /// Order of precedence:
    /// 1. chunks created by shrinking the data map that are still pending;
    /// 2. once the stream is complete, the stored data map (or `None` if there is none);
    /// 3. otherwise input is pulled from `data_iter` until a result can be
    ///    produced or the input is exhausted.
    ///
    /// Input is pulled in a loop rather than by recursion, so the stack depth
    /// does not grow with the number of input items needed to fill a chunk.
    fn next_internal(&mut self) -> Option<Result<ChunkOrDataMap>> {
        if let Some(chunk) = self
            .shrinking_chunks
            .get(self.shrinking_chunk_index)
            .cloned()
        {
            self.shrinking_chunk_index += 1;
            if self.shrinking_chunk_index >= self.shrinking_chunks.len() {
                self.is_complete = true;
            }
            return Some(Ok(ChunkOrDataMap::Chunk(chunk)));
        }

        if self.is_complete {
            return self
                .final_datamap
                .clone()
                .map(|datamap| Ok(ChunkOrDataMap::DataMap(datamap)));
        }

        loop {
            match self.try_process_chunks() {
                Ok(Some(result)) => return Some(Ok(result)),
                Ok(None) => {}
                Err(e) => return Some(Err(e)),
            }

            // Nothing could be produced and no more input will arrive.
            if self.input_complete {
                return None;
            }

            match self.data_iter.next() {
                Some(data) => self.buffer.extend_from_slice(&data),
                // Remember that the input ended; the next loop iteration gives
                // `try_process_chunks` the chance to flush and finalize.
                None => self.input_complete = true,
            }
        }
    }
}
/// Public accessors of the stream.
impl<I> EncryptionStream<I> {
    /// Returns an iterator that drives the encryption and yields the encrypted chunks.
    ///
    /// The data map becomes available through [`Self::datamap`] or
    /// [`Self::into_datamap`] after the iterator has been run to its end.
    pub fn chunks(&mut self) -> ChunkStream<'_, I> {
        ChunkStream { stream: self }
    }

    /// Returns the data map, or `None` while the encryption has not produced one yet.
    pub fn datamap(&self) -> Option<&DataMap> {
        Option::as_ref(&self.final_datamap)
    }

    /// Consumes the stream and returns its data map.
    ///
    /// # Panics
    ///
    /// Panics when no data map has been stored yet, i.e. the iterator returned
    /// by [`Self::chunks`] was not consumed to its end.
    pub fn into_datamap(self) -> DataMap {
        let Self { final_datamap, .. } = self;
        final_datamap
            .expect("Encryption not complete - ensure chunks() iterator was fully consumed")
    }
}
/// Starts a streaming encryption of `data_size` bytes supplied piecewise by `data_iter`.
///
/// Nothing is read from `data_iter` here; input is pulled while the returned
/// stream's [`EncryptionStream::chunks`] iterator is consumed.
///
/// # Errors
///
/// Returns an error when `data_size` is too small to be self-encrypted.
pub fn stream_encrypt<I>(data_size: usize, data_iter: I) -> Result<EncryptionStream<I>>
where
    I: Iterator<Item = Bytes>,
{
    let stream = EncryptionStream::new(data_size, data_iter)?;
    Ok(stream)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::random_bytes;
    use std::collections::HashMap;

    /// Splits `data` into owned pieces of at most `piece_len` bytes, mimicking
    /// input that arrives incrementally.
    fn pieces(data: &[u8], piece_len: usize) -> impl Iterator<Item = Bytes> + '_ {
        data.chunks(piece_len)
            .map(|piece| Bytes::from(piece.to_vec()))
    }

    /// A small input fed in 1 KiB pieces yields chunks and a non-empty data map.
    #[test]
    fn test_stream_encrypt_basic() -> Result<()> {
        let data_size = 50_000;
        let original_data = random_bytes(data_size);

        let mut stream = stream_encrypt(data_size, pieces(&original_data, 1024))?;
        let encrypted_chunks = stream.chunks().collect::<Result<Vec<_>>>()?;

        let data_map = stream
            .datamap()
            .expect("Should have completed with DataMap");
        assert_ne!(data_map.len(), 0, "DataMap should have chunks");
        assert_ne!(
            encrypted_chunks.len(),
            0,
            "Should have yielded encrypted chunks"
        );
        Ok(())
    }

    /// The whole input delivered as a single item is handled as well.
    #[test]
    fn test_stream_encrypt_single_chunk() -> Result<()> {
        let data_size = 10_000;
        let original_data = random_bytes(data_size);

        let mut stream = stream_encrypt(data_size, std::iter::once(original_data))?;
        let chunks = stream.chunks().collect::<Result<Vec<_>>>()?;

        assert!(!chunks.is_empty());
        let _datamap = stream.datamap().expect("Should have DataMap");
        Ok(())
    }

    /// A multi-megabyte input produces more than one encrypted chunk.
    #[test]
    fn test_stream_encrypt_large_file() -> Result<()> {
        let data_size = 5_000_000;
        let original_data = random_bytes(data_size);
        let chunk_size = 64 * 1024;

        let mut stream = stream_encrypt(data_size, pieces(&original_data, chunk_size))?;
        let encrypted_chunks = stream.chunks().collect::<Result<Vec<_>>>()?;

        let _data_map = stream.datamap().expect("Should complete with DataMap");
        assert!(
            encrypted_chunks.len() > 1,
            "Large file should produce multiple chunks"
        );
        Ok(())
    }

    /// Chunks can be consumed one at a time without keeping them around.
    #[test]
    fn test_stream_encrypt_memory_efficiency() -> Result<()> {
        let data_size = 10_000_000;
        let chunk_size = 8192;
        let original_data = random_bytes(data_size);

        let mut stream = stream_encrypt(data_size, pieces(&original_data, chunk_size))?;
        // Count the chunks while dropping each one immediately.
        let encrypted_count = stream
            .chunks()
            .try_fold(0usize, |count, chunk_result| chunk_result.map(|_| count + 1))?;

        assert!(
            encrypted_count > 0,
            "Should yield encrypted chunks progressively"
        );
        Ok(())
    }

    /// Inputs below the minimum encryptable size are rejected up front.
    #[test]
    fn test_stream_encrypt_too_small() {
        let data_size = 2;
        let tiny_input = std::iter::once(Bytes::from(vec![42u8; data_size]));

        let result = stream_encrypt(data_size, tiny_input);

        assert!(
            result.is_err(),
            "Should reject files that are too small for self-encryption"
        );
    }

    /// Both the in-memory and the streaming encryptor produce chunks for the same input.
    #[test]
    fn test_stream_encrypt_consistency() -> Result<()> {
        let data_size = 200_000;
        let original_data = random_bytes(data_size);

        let (_standard_data_map, standard_chunks) = crate::encrypt(original_data.clone())?;

        let mut stream = stream_encrypt(data_size, pieces(&original_data, 4096))?;
        let streaming_chunks = stream.chunks().collect::<Result<Vec<_>>>()?;
        let _streaming_data_map = stream.datamap().expect("Should have DataMap");

        let standard_storage: HashMap<XorName, Vec<u8>> = standard_chunks
            .into_iter()
            .map(|chunk| {
                (
                    XorName::from_content(&chunk.content),
                    chunk.content.to_vec(),
                )
            })
            .collect();
        let streaming_storage: HashMap<XorName, Vec<u8>> = streaming_chunks
            .into_iter()
            .map(|(hash, content)| (hash, content.to_vec()))
            .collect();

        assert_ne!(standard_storage.len(), 0, "Standard should produce chunks");
        assert_ne!(
            streaming_storage.len(),
            0,
            "Streaming should produce chunks"
        );
        Ok(())
    }
}