use crate::{
encrypt::encrypt_chunk,
shrink_data_map,
utils::{get_num_chunks, get_pad_key_and_nonce, get_start_end_positions},
ChunkInfo, DataMap, EncryptedChunk, Error, Result,
};
use bytes::Bytes;
use xor_name::XorName;
/// Internal item produced by `EncryptionStream::next_internal`: either one
/// encrypted chunk ready to be stored, or the final (shrunk) data map that
/// ends the stream.
#[derive(Debug, Clone)]
enum ChunkOrDataMap {
    /// An encrypted chunk to hand to the consumer.
    Chunk(EncryptedChunk),
    /// The final data map, emitted once after all chunks.
    DataMap(DataMap),
}
/// Borrowing iterator over the encrypted chunks of an [`EncryptionStream`].
///
/// Yields `(content_hash, encrypted_bytes)` pairs. When the underlying stream
/// produces the final data map, it is stashed on the parent stream (see
/// `EncryptionStream::datamap`) and iteration stops.
pub struct ChunkStream<'a, I> {
    // Mutable borrow so iteration can advance the parent stream's state.
    stream: &'a mut EncryptionStream<I>,
}
impl<'a, I> Iterator for ChunkStream<'a, I>
where
    I: Iterator<Item = Bytes>,
{
    type Item = Result<(XorName, Bytes)>;

    /// Pull the next encrypted chunk from the underlying stream.
    ///
    /// Chunk items are yielded as `(content_hash, encrypted_bytes)`. When the
    /// stream hands back the final data map instead of a chunk, the map is
    /// stored on the parent `EncryptionStream` (retrievable via `datamap()`)
    /// and the iterator terminates. Errors are passed straight through.
    fn next(&mut self) -> Option<Self::Item> {
        match self.stream.next_internal()? {
            Ok(ChunkOrDataMap::Chunk(chunk)) => {
                let content = chunk.content;
                let hash = crate::hash::content_hash(&content);
                Some(Ok((hash, content)))
            }
            Ok(ChunkOrDataMap::DataMap(map)) => {
                // Capture the data map for later retrieval; chunk iteration ends.
                self.stream.final_datamap = Some(map);
                None
            }
            Err(err) => Some(Err(err)),
        }
    }
}
/// Incremental self-encryption state machine.
///
/// Pulls raw bytes from `data_iter`, buffers them, hashes each chunk as it
/// becomes available, and encrypts chunks as soon as the source hashes their
/// keys depend on are known.
pub struct EncryptionStream<I> {
    /// Source of raw input bytes.
    data_iter: I,
    /// Declared total input size; used to compute chunk count and boundaries.
    data_size: usize,
    /// Raw bytes received but not yet carved into chunks.
    buffer: Vec<u8>,
    /// Number of source chunks handled so far (hashed; encrypted for index >= 2).
    chunks_processed: usize,
    /// Total number of chunks `data_size` will produce.
    total_chunks: usize,
    /// Source-content hash of each chunk, filled in as chunks are seen.
    src_hashes: Vec<Option<XorName>>,
    /// Raw contents of chunks 0 and 1, held back until every source hash is
    /// known (their encryption keys are derived from the full hash list —
    /// see `finalize_encryption`).
    deferred_chunks: [Option<Bytes>; 2],
    /// Per-chunk metadata destined for the final `DataMap`.
    chunk_infos: Vec<Option<ChunkInfo>>,
    /// True once `data_iter` has been exhausted.
    input_complete: bool,
    /// True once every chunk (including data-map shrink chunks) has been
    /// emitted; thereafter only the data map remains to be returned.
    is_complete: bool,
    /// The shrunk data map, set during finalization.
    final_datamap: Option<DataMap>,
    /// Extra chunks produced by shrinking the data map, emitted after the
    /// regular content chunks.
    shrinking_chunks: Vec<EncryptedChunk>,
    /// Next index into `shrinking_chunks` to emit.
    shrinking_chunk_index: usize,
}
impl<I> EncryptionStream<I>
where
    I: Iterator<Item = Bytes>,
{
    /// Validate `data_size` and set up empty per-chunk bookkeeping.
    ///
    /// # Errors
    /// Returns `Error::Generic` when the input is smaller than
    /// `MIN_ENCRYPTABLE_BYTES` or would yield fewer than 3 chunks.
    fn new(data_size: usize, data_iter: I) -> Result<Self> {
        if data_size < crate::MIN_ENCRYPTABLE_BYTES {
            return Err(Error::Generic(format!(
                "File too small for self-encryption! Required size at least {}",
                crate::MIN_ENCRYPTABLE_BYTES
            )));
        }
        let total_chunks = get_num_chunks(data_size);
        if total_chunks < 3 {
            return Err(Error::Generic(
                "File must be large enough to generate at least 3 chunks".to_string(),
            ));
        }
        Ok(Self {
            data_iter,
            data_size,
            buffer: Vec::new(),
            chunks_processed: 0,
            total_chunks,
            src_hashes: vec![None; total_chunks],
            deferred_chunks: [None, None],
            chunk_infos: vec![None; total_chunks],
            input_complete: false,
            is_complete: false,
            final_datamap: None,
            shrinking_chunks: Vec::new(),
            shrinking_chunk_index: 0,
        })
    }

    /// Carve as many complete chunks as possible out of `buffer`.
    ///
    /// Every chunk that is fully buffered (or partially buffered once the
    /// input is exhausted) gets its source hash and `ChunkInfo` recorded.
    /// Chunks 0 and 1 are stashed in `deferred_chunks`; chunks >= 2 are
    /// encrypted immediately when their dependency hashes are present, in
    /// which case the encrypted chunk is returned and processing resumes on
    /// the next call. Once input is exhausted and all chunks are hashed,
    /// delegates to `finalize_encryption`.
    fn try_process_chunks(&mut self) -> Result<Option<ChunkOrDataMap>> {
        while self.chunks_processed < self.total_chunks {
            let chunk_index = self.chunks_processed;
            let (chunk_start, chunk_end) = get_start_end_positions(self.data_size, chunk_index);
            let chunk_size = chunk_end - chunk_start;
            // Translate the absolute chunk offset into an offset within the
            // (already partially drained) buffer.
            let buffer_start = chunk_start.saturating_sub(self.get_processed_bytes());
            if self.buffer.len() >= buffer_start + chunk_size || self.input_complete {
                // After the input ends, accept a short final chunk.
                let actual_chunk_size = if self.input_complete {
                    std::cmp::min(chunk_size, self.buffer.len().saturating_sub(buffer_start))
                } else {
                    chunk_size
                };
                if actual_chunk_size == 0 {
                    break;
                }
                let chunk_data = if buffer_start + actual_chunk_size <= self.buffer.len() {
                    Bytes::from(
                        self.buffer[buffer_start..buffer_start + actual_chunk_size].to_vec(),
                    )
                } else {
                    // Range no longer valid against the buffer; wait for more data.
                    break;
                };
                let src_hash = crate::hash::content_hash(&chunk_data);
                let entry = self.src_hashes.get_mut(chunk_index).ok_or_else(|| {
                    Error::Generic(format!(
                        "chunk index {chunk_index} out of bounds for src_hashes"
                    ))
                })?;
                *entry = Some(src_hash);
                let info_entry = self.chunk_infos.get_mut(chunk_index).ok_or_else(|| {
                    Error::Generic(format!(
                        "chunk index {chunk_index} out of bounds for chunk_infos"
                    ))
                })?;
                // `dst_hash` starts as a placeholder (hash of empty content);
                // it is overwritten when the chunk is actually encrypted.
                *info_entry = Some(ChunkInfo {
                    index: chunk_index,
                    dst_hash: crate::hash::content_hash(&[]),
                    src_hash,
                    src_size: chunk_data.len(),
                });
                if chunk_index < 2 {
                    // The first two chunks cannot be encrypted yet: their keys
                    // are derived from the complete source-hash list (see
                    // `finalize_encryption`), so keep their raw bytes around.
                    let deferred_entry =
                        self.deferred_chunks.get_mut(chunk_index).ok_or_else(|| {
                            Error::Generic(format!(
                                "chunk index {chunk_index} out of bounds for deferred_chunks"
                            ))
                        })?;
                    *deferred_entry = Some(chunk_data);
                } else if self.can_encrypt_chunk(chunk_index) {
                    let encrypted_chunk = self.encrypt_chunk(chunk_index, chunk_data)?;
                    self.chunks_processed += 1;
                    // Drop the consumed bytes before handing the chunk out.
                    if buffer_start + actual_chunk_size <= self.buffer.len() {
                        let _ = self
                            .buffer
                            .drain(buffer_start..buffer_start + actual_chunk_size);
                    }
                    return Ok(Some(ChunkOrDataMap::Chunk(encrypted_chunk)));
                }
                // Deferred (or not-yet-encryptable) chunk: still count it as
                // processed and release its bytes from the buffer.
                self.chunks_processed += 1;
                if buffer_start + actual_chunk_size <= self.buffer.len() {
                    let _ = self
                        .buffer
                        .drain(buffer_start..buffer_start + actual_chunk_size);
                }
            } else {
                // Not enough buffered data for this chunk yet.
                break;
            }
        }
        if self.input_complete && self.chunks_processed >= self.total_chunks {
            return self.finalize_encryption();
        }
        Ok(None)
    }

    /// Total input bytes already consumed: the summed sizes of all chunks
    /// processed so far.
    ///
    /// NOTE(review): O(chunks_processed) per call and invoked from the
    /// `try_process_chunks` loop, so overall cost is quadratic in chunk
    /// count; a running counter would be cheaper — confirm chunk positions
    /// are contiguous before simplifying.
    fn get_processed_bytes(&self) -> usize {
        let mut processed = 0;
        for i in 0..self.chunks_processed {
            let (start, end) = get_start_end_positions(self.data_size, i);
            processed += end - start;
        }
        processed
    }

    /// Whether every source hash needed to encrypt `chunk_index` is known.
    ///
    /// Chunks 0 and 1 require the complete hash list; later chunks require
    /// their own hash plus those of the two chunks identified by
    /// `get_n_1_n_2`.
    fn can_encrypt_chunk(&self, chunk_index: usize) -> bool {
        if chunk_index < 2 {
            self.src_hashes.iter().all(|h| h.is_some())
        } else {
            let Ok((n1, n2)) = crate::utils::get_n_1_n_2(chunk_index, self.total_chunks) else {
                return false;
            };
            self.src_hashes
                .get(chunk_index)
                .and_then(|h| h.as_ref())
                .is_some()
                && self.src_hashes.get(n1).and_then(|h| h.as_ref()).is_some()
                && self.src_hashes.get(n2).and_then(|h| h.as_ref()).is_some()
        }
    }

    /// Encrypt a single chunk using the source hashes gathered so far, and
    /// record its `dst_hash` in `chunk_infos`.
    ///
    /// Hashes not yet known are substituted with the hash of empty content.
    /// NOTE(review): this assumes the hashes `get_pad_key_and_nonce` actually
    /// reads for `chunk_index` have already been recorded (guaranteed here by
    /// sequential processing plus `can_encrypt_chunk`) — confirm if call
    /// sites change.
    fn encrypt_chunk(&mut self, chunk_index: usize, chunk_data: Bytes) -> Result<EncryptedChunk> {
        let mut src_hashes = vec![crate::hash::content_hash(&[]); self.total_chunks];
        for (i, hash_opt) in self.src_hashes.iter().enumerate() {
            if let Some(hash) = hash_opt {
                let entry = src_hashes.get_mut(i).ok_or_else(|| {
                    Error::Generic(format!("index {i} out of bounds for src_hashes"))
                })?;
                *entry = *hash;
            }
        }
        let pki = get_pad_key_and_nonce(chunk_index, &src_hashes, 0)?;
        let encrypted_content = encrypt_chunk(chunk_data, pki)?;
        let dst_hash = crate::hash::content_hash(&encrypted_content);
        if let Some(chunk_info) = self
            .chunk_infos
            .get_mut(chunk_index)
            .and_then(|c| c.as_mut())
        {
            chunk_info.dst_hash = dst_hash;
        }
        Ok(EncryptedChunk {
            content: encrypted_content,
        })
    }

    /// Finish the stream once all source chunks have been hashed.
    ///
    /// Emits at most one item per call, in order: the deferred chunks 0 and 1
    /// (now encryptable since every source hash is known), then the chunks
    /// produced by shrinking the data map, and finally the shrunk `DataMap`
    /// itself. Safe to call repeatedly; `is_complete` and the `take()` on
    /// deferred chunks prevent duplicate emission.
    fn finalize_encryption(&mut self) -> Result<Option<ChunkOrDataMap>> {
        if self.is_complete {
            return Ok(None);
        }
        // All source hashes must be present at this point.
        let src_hashes: Result<Vec<XorName>> = self
            .src_hashes
            .iter()
            .enumerate()
            .map(|(i, h)| {
                h.ok_or_else(|| Error::Generic(format!("Missing source hash for chunk {i}")))
            })
            .collect();
        let src_hashes = src_hashes?;
        // Emit the deferred chunks one per call (take() empties each slot).
        for chunk_index in 0..2.min(self.total_chunks) {
            let chunk_data = self
                .deferred_chunks
                .get_mut(chunk_index)
                .and_then(|entry| entry.take());
            if let Some(chunk_data) = chunk_data {
                let pki = get_pad_key_and_nonce(chunk_index, &src_hashes, 0)?;
                let encrypted_content = encrypt_chunk(chunk_data, pki)?;
                let dst_hash = crate::hash::content_hash(&encrypted_content);
                if let Some(chunk_info) = self
                    .chunk_infos
                    .get_mut(chunk_index)
                    .and_then(|c| c.as_mut())
                {
                    chunk_info.dst_hash = dst_hash;
                }
                return Ok(Some(ChunkOrDataMap::Chunk(EncryptedChunk {
                    content: encrypted_content,
                })));
            }
        }
        // Assemble the data map from the recorded chunk infos.
        let mut final_chunk_infos = Vec::new();
        for chunk_info_opt in &self.chunk_infos {
            if let Some(chunk_info) = chunk_info_opt {
                final_chunk_infos.push(chunk_info.clone());
            } else {
                return Err(Error::Generic("Missing chunk info".to_string()));
            }
        }
        final_chunk_infos.sort_by_key(|info| info.index);
        let data_map = DataMap::new(final_chunk_infos);
        // The store callback is a no-op: shrink chunks are returned to us and
        // emitted through the stream instead.
        let (shrunk_map, shrink_chunks) = shrink_data_map(data_map, |_hash, _content| {
            Ok(())
        })?;
        self.shrinking_chunks = shrink_chunks;
        self.final_datamap = Some(shrunk_map);
        // Hand out the first shrink chunk now; `next_internal` drains the rest.
        if !self.shrinking_chunks.is_empty()
            && self.shrinking_chunk_index < self.shrinking_chunks.len()
        {
            let chunk = self
                .shrinking_chunks
                .get(self.shrinking_chunk_index)
                .ok_or_else(|| Error::Generic("Shrinking chunk index out of bounds".to_string()))?
                .clone();
            self.shrinking_chunk_index += 1;
            return Ok(Some(ChunkOrDataMap::Chunk(chunk)));
        }
        self.is_complete = true;
        match self.final_datamap.as_ref() {
            Some(dm) => Ok(Some(ChunkOrDataMap::DataMap(dm.clone()))),
            None => Err(Error::Generic(
                "Final datamap missing after finalization".to_string(),
            )),
        }
    }
}
impl<I> EncryptionStream<I>
where
    I: Iterator<Item = Bytes>,
{
    /// Advance the stream and produce the next item, if any.
    ///
    /// Order of work per call:
    /// 1. drain any pending data-map "shrink" chunks;
    /// 2. if everything has been emitted, hand back the final data map;
    /// 3. otherwise try to carve/encrypt a chunk from the current buffer,
    ///    pulling more input from `data_iter` as needed.
    fn next_internal(&mut self) -> Option<Result<ChunkOrDataMap>> {
        loop {
            // Pending chunks produced by shrinking the data map take priority.
            if !self.shrinking_chunks.is_empty()
                && self.shrinking_chunk_index < self.shrinking_chunks.len()
            {
                let chunk = match self.shrinking_chunks.get(self.shrinking_chunk_index) {
                    Some(c) => c.clone(),
                    None => {
                        return Some(Err(Error::Generic(
                            "Shrinking chunk index out of bounds".to_string(),
                        )));
                    }
                };
                self.shrinking_chunk_index += 1;
                if self.shrinking_chunk_index >= self.shrinking_chunks.len() {
                    // Last shrink chunk: the next call returns the data map.
                    self.is_complete = true;
                }
                return Some(Ok(ChunkOrDataMap::Chunk(chunk)));
            }
            if self.is_complete {
                if let Some(datamap) = &self.final_datamap {
                    return Some(Ok(ChunkOrDataMap::DataMap(datamap.clone())));
                }
                return None;
            }
            // Try to make progress with what is already buffered.
            match self.try_process_chunks() {
                Ok(Some(result)) => return Some(Ok(result)),
                Ok(None) => {}
                Err(e) => return Some(Err(e)),
            }
            if !self.input_complete {
                match self.data_iter.next() {
                    Some(data) => {
                        self.buffer.extend_from_slice(&data);
                        match self.try_process_chunks() {
                            Ok(Some(result)) => return Some(Ok(result)),
                            Ok(None) => continue,
                            Err(e) => return Some(Err(e)),
                        }
                    }
                    None => {
                        // Input exhausted: flush remaining buffered chunks and
                        // begin finalization.
                        self.input_complete = true;
                        match self.try_process_chunks() {
                            Ok(Some(result)) => return Some(Ok(result)),
                            Ok(None) => return None,
                            Err(e) => return Some(Err(e)),
                        }
                    }
                }
            } else {
                return None;
            }
        }
    }
}
impl<I> EncryptionStream<I> {
    /// Borrow the final `DataMap`, available once chunk iteration has
    /// completed (i.e. after the `chunks()` iterator returned `None`).
    pub fn datamap(&self) -> Option<&DataMap> {
        self.final_datamap.as_ref()
    }

    /// Consume the stream and take ownership of the final `DataMap`, if any.
    pub fn into_datamap(self) -> Option<DataMap> {
        self.final_datamap
    }

    /// Iterator over this stream's encrypted chunks; each item is a `Result`
    /// holding the chunk's content hash together with its encrypted bytes.
    pub fn chunks(&mut self) -> ChunkStream<'_, I> {
        ChunkStream { stream: self }
    }
}
/// Begin streaming self-encryption over `data_iter`, which is expected to
/// supply `data_size` bytes in total.
///
/// Chunks are pulled lazily via [`EncryptionStream::chunks`]; the final data
/// map becomes available through [`EncryptionStream::datamap`] afterwards.
///
/// # Errors
/// Returns an error when `data_size` is below the minimum encryptable size or
/// too small to produce at least three chunks.
pub fn stream_encrypt<I>(data_size: usize, data_iter: I) -> Result<EncryptionStream<I>>
where
    I: Iterator<Item = Bytes>,
{
    let stream = EncryptionStream::new(data_size, data_iter)?;
    Ok(stream)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::random_bytes;
    use std::collections::HashMap;

    /// Moderate-size input fed in 1 KiB pieces: the stream must yield chunks
    /// and finish with a non-empty data map.
    #[test]
    fn test_stream_encrypt_basic() -> Result<()> {
        let data_size = 50_000;
        let original_data = random_bytes(data_size);
        let data_iter = original_data
            .chunks(1024)
            .map(|piece| Bytes::from(piece.to_vec()));
        let mut stream = stream_encrypt(data_size, data_iter)?;
        let encrypted_chunks: Vec<(XorName, Bytes)> =
            stream.chunks().collect::<Result<_, _>>()?;
        let data_map = stream
            .datamap()
            .expect("Should have completed with DataMap");
        assert_ne!(data_map.len(), 0, "DataMap should have chunks");
        assert_ne!(
            encrypted_chunks.len(),
            0,
            "Should have yielded encrypted chunks"
        );
        Ok(())
    }

    /// The whole input arriving as a single `Bytes` item still works.
    #[test]
    fn test_stream_encrypt_single_chunk() -> Result<()> {
        let data_size = 10_000;
        let original_data = random_bytes(data_size);
        let mut stream = stream_encrypt(data_size, std::iter::once(original_data))?;
        let chunks: Vec<(XorName, Bytes)> = stream.chunks().collect::<Result<_, _>>()?;
        assert!(!chunks.is_empty());
        let _datamap = stream.datamap().expect("Should have DataMap");
        Ok(())
    }

    /// A multi-megabyte input fed in 64 KiB pieces yields more than one chunk.
    #[test]
    fn test_stream_encrypt_large_file() -> Result<()> {
        let data_size = 5_000_000;
        let original_data = random_bytes(data_size);
        let chunk_size = 64 * 1024;
        let data_iter = original_data
            .chunks(chunk_size)
            .map(|piece| Bytes::from(piece.to_vec()));
        let mut stream = stream_encrypt(data_size, data_iter)?;
        let encrypted_chunks: Vec<(XorName, Bytes)> =
            stream.chunks().collect::<Result<_, _>>()?;
        let _data_map = stream.datamap().expect("Should complete with DataMap");
        assert!(
            encrypted_chunks.len() > 1,
            "Large file should produce multiple chunks"
        );
        Ok(())
    }

    /// Feeding small (8 KiB) pieces of a large input should still stream
    /// chunks out progressively.
    #[test]
    fn test_stream_encrypt_memory_efficiency() -> Result<()> {
        let data_size = 10_000_000;
        let chunk_size = 8192;
        let original_data = random_bytes(data_size);
        let data_iter = original_data
            .chunks(chunk_size)
            .map(|piece| Bytes::from(piece.to_vec()));
        let mut stream = stream_encrypt(data_size, data_iter)?;
        let _max_buffer_size = 0;
        let mut encrypted_count = 0;
        for result in stream.chunks() {
            let (_hash, _content) = result?;
            encrypted_count += 1;
        }
        assert!(
            encrypted_count > 0,
            "Should yield encrypted chunks progressively"
        );
        Ok(())
    }

    /// Inputs below the minimum encryptable size are rejected up front.
    #[test]
    fn test_stream_encrypt_too_small() {
        let data_size = 2;
        let data_iter = std::iter::once(Bytes::from(vec![42u8; data_size]));
        assert!(
            stream_encrypt(data_size, data_iter).is_err(),
            "Should reject files that are too small for self-encryption"
        );
    }

    /// Streaming and non-streaming encryption should both produce chunk sets.
    #[test]
    fn test_stream_encrypt_consistency() -> Result<()> {
        let data_size = 200_000;
        let original_data = random_bytes(data_size);
        let (_standard_data_map, standard_chunks) = crate::encrypt(original_data.clone())?;
        let data_iter = original_data
            .chunks(4096)
            .map(|piece| Bytes::from(piece.to_vec()));
        let mut stream = stream_encrypt(data_size, data_iter)?;
        let streaming_chunks: Vec<(XorName, Bytes)> =
            stream.chunks().collect::<Result<_, _>>()?;
        let _streaming_data_map = stream.datamap().expect("Should have DataMap");
        // Index both result sets by content hash.
        let mut standard_storage = HashMap::new();
        for chunk in standard_chunks {
            let hash = crate::hash::content_hash(&chunk.content);
            let _ = standard_storage.insert(hash, chunk.content.to_vec());
        }
        let mut streaming_storage = HashMap::new();
        for (hash, content) in streaming_chunks {
            let _ = streaming_storage.insert(hash, content.to_vec());
        }
        assert_ne!(standard_storage.len(), 0, "Standard should produce chunks");
        assert_ne!(
            streaming_storage.len(),
            0,
            "Streaming should produce chunks"
        );
        Ok(())
    }
}