use crate::client::data_types::chunk::ChunkAddress;
use xor_name::XorName;
use crate::Bytes;
use crate::Client;
use crate::client::GetError;
/// Boxed callback handed to the decryption stream to obtain encrypted chunk contents.
///
/// It receives a batch of `(usize, XorName)` pairs and returns `(usize, Bytes)` pairs.
/// The fetchers in this file pass the `usize` through unchanged so each result can be
/// matched back to its request (presumably the chunk's index in the data map — confirm
/// against `self_encryption`).
///
/// The `Send + Sync` bounds let a stream that owns the fetcher be moved to, and shared
/// between, threads.
type ChunkFetcher = Box<
    dyn Fn(&[(usize, XorName)]) -> self_encryption::Result<Vec<(usize, Bytes)>> + Send + Sync,
>;
/// Streaming and random-access reader over self-encrypted data.
///
/// This is a thin wrapper around `self_encryption::DecryptionStream` that translates
/// its errors into [`GetError`]. It can be consumed as an [`Iterator`] of decrypted
/// byte buffers, or queried for arbitrary byte ranges through the `range*` methods.
pub struct DataStream {
    /// Underlying decryption stream, parameterised with the boxed chunk fetcher.
    streaming_decrypt: self_encryption::DecryptionStream<ChunkFetcher>,
}
impl DataStream {
pub(crate) fn new(client: Client, datamap: self_encryption::DataMap) -> Result<Self, GetError> {
let client_clone = client.clone();
let chunk_fetcher: ChunkFetcher = Box::new(move |chunk_names: &[(usize, XorName)]| -> self_encryption::Result<Vec<(usize, Bytes)>> {
let chunk_addresses: Vec<(usize, ChunkAddress)> = chunk_names
.iter()
.map(|(i, name)| (*i, ChunkAddress::new(*name)))
.collect();
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
client_clone
.fetch_chunks_parallel(&chunk_addresses, chunk_names.len())
.await
})
})
});
let streaming_decrypt = self_encryption::streaming_decrypt(&datamap, chunk_fetcher)
.map_err(|e| GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e)))?;
Ok(Self { streaming_decrypt })
}
pub fn data_size(&self) -> usize {
self.streaming_decrypt.file_size()
}
pub fn get_range(&self, start: usize, len: usize) -> Result<Bytes, GetError> {
self.streaming_decrypt
.get_range(start, len)
.map_err(|e| GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e)))
}
pub fn range(&self, range: std::ops::Range<usize>) -> Result<Bytes, GetError> {
self.streaming_decrypt
.range(range)
.map_err(|e| GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e)))
}
pub fn range_from(&self, start: usize) -> Result<Bytes, GetError> {
self.streaming_decrypt
.range_from(start)
.map_err(|e| GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e)))
}
pub fn range_to(&self, end: usize) -> Result<Bytes, GetError> {
self.streaming_decrypt
.range_to(end)
.map_err(|e| GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e)))
}
pub fn range_full(&self) -> Result<Bytes, GetError> {
self.streaming_decrypt
.range_full()
.map_err(|e| GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e)))
}
pub fn range_inclusive(&self, start: usize, end: usize) -> Result<Bytes, GetError> {
self.streaming_decrypt
.range_inclusive(start, end)
.map_err(|e| GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e)))
}
}
impl Iterator for DataStream {
type Item = Result<Bytes, GetError>;
fn next(&mut self) -> Option<Self::Item> {
match self.streaming_decrypt.next() {
Some(Ok(chunk_bytes)) => {
Some(Ok(chunk_bytes))
}
Some(Err(e)) => {
Some(Err(GetError::Decryption(
crate::self_encryption::Error::SelfEncryption(e),
)))
}
None => {
None
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// In-memory stand-in for the network: encrypted chunk contents keyed by the
    /// `XorName` derived from those contents.
    type ChunkStore = HashMap<xor_name::XorName, crate::Bytes>;

    /// Encrypts `test_data` and returns the deserialized data map together with a
    /// store holding every produced chunk (including the data-map chunk itself).
    fn encrypt_fixture(test_data: &crate::Bytes) -> (self_encryption::DataMap, ChunkStore) {
        let (data_map_chunk, chunks) = crate::self_encryption::encrypt(test_data.clone())
            .expect("Failed to encrypt test data");

        let mut chunk_storage: ChunkStore = chunks
            .iter()
            .map(|chunk| {
                (
                    xor_name::XorName::from_content(&chunk.value),
                    chunk.value.clone(),
                )
            })
            .collect();
        chunk_storage.insert(
            xor_name::XorName::from_content(&data_map_chunk.value),
            data_map_chunk.value.clone(),
        );

        // Go through the `DataMapChunk` wrapper, as a caller holding one would.
        let data_map_chunk = crate::chunk::DataMapChunk(data_map_chunk);
        let restored_data_map: self_encryption::DataMap =
            rmp_serde::from_slice(&data_map_chunk.0.value)
                .expect("Failed to deserialize data map");

        (restored_data_map, chunk_storage)
    }

    /// Builds a fetcher that serves chunks from `chunk_storage` instead of the network.
    fn in_memory_fetcher(chunk_storage: ChunkStore) -> ChunkFetcher {
        Box::new(
            move |chunk_names: &[(usize, xor_name::XorName)]| -> self_encryption::Result<Vec<(usize, crate::Bytes)>> {
                chunk_names
                    .iter()
                    .map(|(i, hash)| {
                        chunk_storage
                            .get(hash)
                            .map(|content| (*i, content.clone()))
                            .ok_or_else(|| {
                                self_encryption::Error::Generic("Chunk not found".to_string())
                            })
                    })
                    .collect()
            },
        )
    }

    /// Builds a `DataStream` over `data_map` that reads its chunks from `chunk_storage`.
    fn in_memory_stream(
        data_map: &self_encryption::DataMap,
        chunk_storage: ChunkStore,
    ) -> DataStream {
        let streaming_decrypt =
            self_encryption::streaming_decrypt(data_map, in_memory_fetcher(chunk_storage))
                .expect("Failed to create streaming decrypt");
        DataStream { streaming_decrypt }
    }

    /// Every range accessor returns exactly the expected slice of the plaintext.
    #[tokio::test]
    async fn test_data_stream_range_access() {
        let test_data = crate::Bytes::from(vec![42u8; 100_000]);
        let (data_map, chunk_storage) = encrypt_fixture(&test_data);
        let data_stream = in_memory_stream(&data_map, chunk_storage);

        assert_eq!(data_stream.data_size(), test_data.len());

        let range_data = data_stream.get_range(1000, 5000).unwrap();
        assert_eq!(range_data.len(), 5000);
        assert_eq!(range_data.as_ref(), &test_data[1000..6000]);

        let range_data2 = data_stream.range(1000..6000).unwrap();
        assert_eq!(range_data2, range_data);

        let from_data = data_stream.range_from(95000).unwrap();
        assert_eq!(from_data.len(), 5000);
        assert_eq!(from_data.as_ref(), &test_data[95000..]);

        let to_data = data_stream.range_to(5000).unwrap();
        assert_eq!(to_data.len(), 5000);
        assert_eq!(to_data.as_ref(), &test_data[..5000]);

        let full_data = data_stream.range_full().unwrap();
        assert_eq!(full_data.len(), test_data.len());
        assert_eq!(full_data.as_ref(), &test_data[..]);

        // Both bounds are inclusive: 1000..=1999 is 1000 bytes.
        let inclusive_data = data_stream.range_inclusive(1000, 1999).unwrap();
        assert_eq!(inclusive_data.len(), 1000);
        assert_eq!(inclusive_data.as_ref(), &test_data[1000..2000]);
    }

    /// Out-of-bounds, boundary and zero-length requests are clamped, not rejected.
    #[tokio::test]
    async fn test_data_stream_range_edge_cases() {
        let test_data = crate::Bytes::from((0..=255u8).cycle().take(5000).collect::<Vec<u8>>());
        let (data_map, chunk_storage) = encrypt_fixture(&test_data);
        let data_stream = in_memory_stream(&data_map, chunk_storage);

        // Entirely past the end.
        let beyond_range = data_stream.get_range(10000, 1000).unwrap();
        assert_eq!(beyond_range.len(), 0);

        // Starting exactly at the end.
        let at_end = data_stream.get_range(5000, 100).unwrap();
        assert_eq!(at_end.len(), 0);

        // Starting inside but running past the end: truncated to what exists.
        let partial_exceed = data_stream.get_range(4800, 400).unwrap();
        assert_eq!(partial_exceed.len(), 200);
        assert_eq!(partial_exceed.as_ref(), &test_data[4800..]);

        let zero_len = data_stream.get_range(2500, 0).unwrap();
        assert_eq!(zero_len.len(), 0);

        let at_start = data_stream.get_range(0, 100).unwrap();
        assert_eq!(at_start.len(), 100);
        assert_eq!(at_start.as_ref(), &test_data[0..100]);
    }

    /// Streaming through `DataStream` yields the same bytes as one-shot decryption.
    #[tokio::test]
    async fn test_data_stream_vs_data_get() {
        let test_data = crate::Bytes::from(vec![42u8; 1_000_000]);
        let (restored_data_map, chunk_storage) = encrypt_fixture(&test_data);

        // One-shot path: gather every chunk up front and decrypt in a single call.
        let encrypted_chunks: Vec<_> = restored_data_map
            .infos()
            .iter()
            .map(|info| {
                let content = chunk_storage
                    .get(&info.dst_hash)
                    .expect("Chunk not found in storage");
                self_encryption::EncryptedChunk {
                    content: content.clone(),
                }
            })
            .collect();
        let data_from_get = self_encryption::decrypt(&restored_data_map, &encrypted_chunks)
            .expect("Failed to decrypt with data_get method");

        // Streaming path: iterate `DataStream` itself so its `Iterator` impl (and the
        // error mapping it performs) is what is actually under test.
        let data_stream = in_memory_stream(&restored_data_map, chunk_storage);
        let mut data_from_stream = Vec::new();
        for chunk_result in data_stream {
            let chunk = chunk_result.expect("Failed to get chunk from stream");
            data_from_stream.extend_from_slice(&chunk);
        }
        let data_from_stream = crate::Bytes::from(data_from_stream);

        assert_eq!(
            data_from_get.len(),
            test_data.len(),
            "data_get length mismatch"
        );
        assert_eq!(
            data_from_stream.len(),
            test_data.len(),
            "data_stream length mismatch"
        );
        assert_eq!(data_from_get, test_data, "data_get content mismatch");
        assert_eq!(data_from_stream, test_data, "data_stream content mismatch");
        assert_eq!(
            data_from_get, data_from_stream,
            "data_get and data_stream results don't match"
        );
    }
}