use anyhow::{anyhow, Result};
use futures::executor::ThreadPool;
use futures::task::Spawn;
use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};
pub use self::chunk::{Chunk, ChunkID, ChunkSettings, UnpackedChunk};
pub use crate::repository::backend::{Backend, Index, SegmentDescriptor};
pub use crate::repository::compression::Compression;
pub use crate::repository::encryption::Encryption;
pub use crate::repository::hmac::HMAC;
pub use crate::repository::key::{EncryptedKey, Key};
use crate::repository::pipeline::Pipeline;
#[cfg(feature = "profile")]
use flamer::*;
pub mod backend;
pub mod chunk;
pub mod compression;
pub mod encryption;
pub mod hmac;
pub mod key;
pub mod pipeline;
/// High-level handle to a deduplicating chunk repository: pairs a storage
/// backend with the settings and key material used to pack and unpack chunks.
#[derive(Clone)]
pub struct Repository<T> {
    // Storage backend; supplies chunk I/O, the index, and the manifest
    backend: T,
    // Compression setting handed to the pipeline when packing chunks
    compression: Compression,
    // HMAC algorithm handed to the pipeline when packing chunks
    hmac: HMAC,
    // Encryption setting handed to the pipeline when packing chunks
    encryption: Encryption,
    // Key used by the pipeline for packing and by `Chunk::unpack` for reading
    key: Key,
    // Chunk-processing pipeline, driven by a caller- or internally-supplied spawner
    pipeline: Pipeline,
}
impl<T: Backend + 'static> Repository<T> {
/// Creates a new `Repository` over `backend`, spawning an internal thread
/// pool to drive the chunk-processing pipeline.
///
/// Prefer [`Repository::with`] when the caller already has a task spawner.
///
/// # Panics
///
/// Panics if the operating system refuses to spawn the thread pool's
/// worker threads.
pub fn new(
    backend: T,
    compression: Compression,
    hmac: HMAC,
    encryption: Encryption,
    key: Key,
) -> Repository<T> {
    // ThreadPool creation only fails when the OS cannot spawn threads;
    // state that invariant instead of a bare unwrap.
    let pool = ThreadPool::new().expect("unable to spawn thread pool for chunk pipeline");
    let pipeline = Pipeline::new(pool);
    Repository {
        backend,
        compression,
        hmac,
        encryption,
        key,
        pipeline,
    }
}
pub fn with(backend: T, settings: ChunkSettings, key: Key, pool: impl Spawn) -> Repository<T> {
let pipeline = Pipeline::new(pool);
Repository {
backend,
key,
pipeline,
compression: settings.compression,
hmac: settings.hmac,
encryption: settings.encryption,
}
}
/// Asks the backend's index to commit itself to storage.
///
/// # Panics
///
/// Panics if the index commit fails.
#[cfg_attr(feature = "profile", flame)]
pub async fn commit_index(&self) {
    let index = self.backend.get_index();
    index.commit_index().await.expect("Unable to commit index");
}
/// Writes an already-packed chunk to the backend, deduplicating by ID.
///
/// Returns the chunk's ID plus a flag that is `true` when the chunk was
/// already present (and therefore not rewritten). The manifest chunk is
/// exempt from deduplication and is always rewritten.
///
/// # Errors
///
/// Returns an error if the backend fails to store the chunk or the index
/// entry cannot be recorded.
#[cfg_attr(feature = "profile", flame)]
pub async fn write_raw(&mut self, chunk: &Chunk) -> Result<(ChunkID, bool)> {
    let id = chunk.get_id();
    if self.has_chunk(id).await && id != ChunkID::manifest_id() {
        Ok((id, true))
    } else {
        let mut buff = Vec::<u8>::new();
        // Serialization into an in-memory Vec cannot hit I/O errors.
        chunk
            .serialize(&mut Serializer::new(&mut buff))
            .expect("serializing a chunk into an in-memory buffer should not fail");
        // Reuse the id bound above rather than recomputing chunk.get_id()
        // twice more, as the original did.
        let location = self.backend.write_chunk(buff, id).await?;
        self.backend.get_index().set_chunk(id, location).await?;
        Ok((id, false))
    }
}
/// Runs `data` through the pipeline (compression, encryption, HMAC) and
/// stores the resulting chunk. See [`Repository::write_raw`] for the
/// meaning of the returned pair.
#[cfg_attr(feature = "profile", flame)]
pub async fn write_chunk(&mut self, data: Vec<u8>) -> Result<(ChunkID, bool)> {
    let key = self.key.clone();
    let (_, packed) = self
        .pipeline
        .process(data, self.compression, self.encryption, self.hmac, key)
        .await;
    self.write_raw(&packed).await
}
/// Packs and stores a batch of buffers, returning one `(id, existed)` pair
/// per input, in order.
pub async fn write_chunks(&mut self, data: Vec<Vec<u8>>) -> Result<Vec<(ChunkID, bool)>> {
    let key = self.key.clone();
    let packed = self
        .pipeline
        .process_multiple(data, self.compression, self.encryption, self.hmac, key)
        .await;
    // Writes are sequential: each chunk must land before the next starts.
    let mut outcomes = Vec::new();
    for chunk in packed {
        let outcome = self.write_raw(&chunk).await?;
        outcomes.push(outcome);
    }
    Ok(outcomes)
}
/// Stores an `UnpackedChunk`, preserving the ID it already carries.
pub async fn write_unpacked_chunk(&mut self, data: UnpackedChunk) -> Result<(ChunkID, bool)> {
    let chunk_id = data.id();
    let payload = data.consuming_data();
    self.write_chunk_with_id(payload, chunk_id).await
}
/// Packs `data` through the pipeline under a caller-chosen `id` (rather
/// than a content-derived one) and stores the result.
#[cfg_attr(feature = "profile", flame)]
pub async fn write_chunk_with_id(
    &mut self,
    data: Vec<u8>,
    id: ChunkID,
) -> Result<(ChunkID, bool)> {
    let key = self.key.clone();
    let packed = self
        .pipeline
        .process_with_id(data, id, self.compression, self.encryption, self.hmac, key)
        .await;
    self.write_raw(&packed).await
}
/// Reports whether the index already holds an entry for `id`.
pub async fn has_chunk(&self, id: ChunkID) -> bool {
    let index = self.backend.get_index();
    index.lookup_chunk(id).await.is_some()
}
/// Reads the chunk identified by `id` from the backend, unpacks it with
/// the repository key, and returns the original data.
///
/// # Errors
///
/// Returns an error if the chunk is not present in the repository, if the
/// backend read fails, or if unpacking fails.
#[cfg_attr(feature = "profile", flame)]
pub async fn read_chunk(&mut self, id: ChunkID) -> Result<Vec<u8>> {
    // Hit the index once; the original called has_chunk and then
    // lookup_chunk, performing the same lookup twice.
    let location = match self.backend.get_index().lookup_chunk(id).await {
        Some(location) => location,
        // Typo fixed: the original message read "reposiotry".
        None => return Err(anyhow!("Chunk not in repository")),
    };
    let chunk_bytes = self.backend.read_chunk(location).await?;
    let mut de = Deserializer::new(&chunk_bytes[..]);
    let chunk: Chunk = Deserialize::deserialize(&mut de)
        .expect("stored chunk bytes should deserialize; repository may be corrupt");
    let data = chunk.unpack(&self.key)?;
    Ok(data)
}
/// Returns the number of chunks currently recorded in the index.
pub async fn count_chunk(&self) -> usize {
    let index = self.backend.get_index();
    index.count_chunk().await
}
/// Returns the compression/HMAC/encryption settings this repository uses
/// to pack chunks.
pub fn chunk_settings(&self) -> ChunkSettings {
    ChunkSettings {
        compression: self.compression,
        hmac: self.hmac,
        encryption: self.encryption,
    }
}
pub fn key(&self) -> &Key {
&self.key
}
/// Returns the backend's manifest handle.
pub fn backend_manifest(&self) -> T::Manifest {
    let backend = &self.backend;
    backend.get_manifest()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::repository::backend::mem::*;
    use futures::executor::block_on;
    use rand::prelude::*;

    /// Builds an in-memory repository using ZStd level-1 compression,
    /// Blake2b HMAC, and AES-256-CTR encryption.
    fn get_repo_mem(key: Key) -> Repository<Mem> {
        let settings = ChunkSettings {
            compression: Compression::ZStd { level: 1 },
            hmac: HMAC::Blake2b,
            encryption: Encryption::new_aes256ctr(),
        };
        let pool = ThreadPool::new().unwrap();
        let backend = Mem::new(settings, &pool);
        Repository::with(backend, settings, key, pool)
    }

    /// Round-trips three random payloads through the repository and checks
    /// that each reads back byte-identical.
    #[test]
    fn repository_add_read() {
        block_on(async {
            let key = Key::random(32);
            let size = 7 * 10_u64.pow(3);
            let mut payloads = vec![vec![0_u8; size as usize]; 3];
            for buf in payloads.iter_mut() {
                thread_rng().fill_bytes(buf);
            }
            let mut repo = get_repo_mem(key);
            println!("Adding Chunks");
            let mut ids = Vec::new();
            for buf in payloads.iter() {
                ids.push(repo.write_chunk(buf.clone()).await.unwrap().0);
            }
            println!("Reading Chunks");
            for (id, original) in ids.iter().zip(payloads.iter()) {
                let restored = repo.read_chunk(*id).await.unwrap();
                assert_eq!(&restored, original);
            }
        });
    }

    /// Writing identical data twice must deduplicate: the second write
    /// reports "already present", the chunk count stays at one, and both
    /// writes yield the same ID.
    #[test]
    fn double_add() {
        block_on(async {
            let mut repo = get_repo_mem(Key::random(32));
            assert_eq!(repo.count_chunk().await, 0);
            let payload = [1_u8; 8192];
            // First write: chunk is new, so the "existed" flag is false.
            let (first_id, first_existed) = repo.write_chunk(payload.to_vec()).await.unwrap();
            assert!(!first_existed);
            assert_eq!(repo.count_chunk().await, 1);
            // Second write of the same bytes: deduplicated, flag is true.
            let (second_id, second_existed) = repo.write_chunk(payload.to_vec()).await.unwrap();
            assert_eq!(repo.count_chunk().await, 1);
            assert!(second_existed);
            assert_eq!(first_id, second_id);
            std::mem::drop(repo);
        });
    }
}