use crate::repository::backend::Manifest as ManifestTrait;
pub use crate::repository::backend::{Backend, BackendClone, Index, SegmentDescriptor};
use crate::repository::pipeline::Pipeline;
pub use asuran_core::repository::chunk::{Chunk, ChunkID, ChunkSettings};
pub use asuran_core::repository::compression::Compression;
pub use asuran_core::repository::encryption::Encryption;
pub use asuran_core::repository::hmac::HMAC;
pub use asuran_core::repository::key::{EncryptedKey, Key};
use semver::Version;
use thiserror::Error;
use tracing::{debug, info, instrument, span, trace, Level};
use uuid::Uuid;
use std::collections::HashSet;
pub mod backend;
pub mod pipeline;
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum RepositoryError {
#[error("Chunk Not in Repository")]
ChunkNotFound,
#[error("Chunker Error")]
ChunkerError(#[from] asuran_core::repository::chunk::ChunkError),
#[error("Backend Error")]
BackendError(#[from] backend::BackendError),
}
type Result<T> = std::result::Result<T, RepositoryError>;
/// A deduplicating chunk store layered over a storage backend `T`.
///
/// New data written through the repository is packed with the stored
/// compression, encryption, and HMAC settings and the repository [`Key`],
/// using the processing [`Pipeline`].
#[derive(Clone)]
pub struct Repository<T> {
    /// Storage backend providing chunk storage, the index, and the manifest.
    backend: T,
    /// Compression applied to newly written chunks.
    compression: Compression,
    /// HMAC algorithm applied to newly written chunks.
    hmac: HMAC,
    /// Encryption applied to newly written chunks.
    encryption: Encryption,
    /// Key used to pack and unpack chunks.
    key: Key,
    /// Pipeline that turns raw data into packed chunks.
    pipeline: Pipeline,
    /// The task count the pipeline was constructed with.
    pub queue_depth: usize,
}
impl<T: BackendClone + 'static> Repository<T> {
    /// Creates a repository over `backend` that packs new chunks with the given
    /// `compression`, `hmac`, and `encryption` settings and `key`.
    ///
    /// `pipeline_tasks` is passed to [`Pipeline::new`] and also stored as
    /// [`Repository::queue_depth`].
    #[instrument(skip(key))]
    pub fn new(
        backend: T,
        compression: Compression,
        hmac: HMAC,
        encryption: Encryption,
        key: Key,
        pipeline_tasks: usize,
    ) -> Repository<T> {
        info!("Creating a repository with backend {:?}", backend);
        let pipeline = Pipeline::new(pipeline_tasks);
        Repository {
            backend,
            compression,
            hmac,
            encryption,
            key,
            pipeline,
            queue_depth: pipeline_tasks,
        }
    }

    /// Creates a repository over `backend` using the compression, HMAC, and
    /// encryption taken from `settings`.
    ///
    /// `pipeline_tasks` is passed to [`Pipeline::new`] and also stored as
    /// [`Repository::queue_depth`].
    #[instrument(skip(key))]
    pub fn with(
        backend: T,
        settings: ChunkSettings,
        key: Key,
        pipeline_tasks: usize,
    ) -> Repository<T> {
        info!(
            "Creating a repository with backend {:?} and chunk settings {:?}",
            backend, settings
        );
        let pipeline = Pipeline::new(pipeline_tasks);
        Repository {
            backend,
            key,
            pipeline,
            compression: settings.compression,
            hmac: settings.hmac,
            encryption: settings.encryption,
            queue_depth: pipeline_tasks,
        }
    }

    /// Commits the backend's index.
    ///
    /// # Panics
    ///
    /// Panics if the backend reports an error while committing the index.
    #[instrument(skip(self))]
    pub async fn commit_index(&self) {
        debug!("Committing Index");
        self.backend
            .get_index()
            .commit_index()
            .await
            .expect("Unable to commit index");
    }

    /// Stores an already-packed `chunk`, deduplicating by its ID.
    ///
    /// Returns `(id, true)` without writing anything if the index already has
    /// an entry for the chunk's ID. Otherwise the chunk is written to the
    /// backend, its location is recorded in the index, and `(id, false)` is
    /// returned. A chunk carrying [`ChunkID::manifest_id`] is always rewritten,
    /// even if already present.
    ///
    /// # Errors
    ///
    /// Propagates backend errors from writing the chunk or updating the index.
    pub async fn write_raw(&mut self, chunk: Chunk) -> Result<(ChunkID, bool)> {
        let id = chunk.get_id();
        // Events are emitted through `in_scope` rather than via a `Span::enter`
        // guard, because a guard held across `.await` would leave the span
        // entered on this thread while other tasks run, producing bogus traces.
        let span = span!(Level::DEBUG, "Writing Chunk", ?id);
        span.in_scope(|| debug!("Writing chunk with id {:?}", id));
        // The manifest is always rewritten, so skip the index lookup for it.
        if id != ChunkID::manifest_id() && self.has_chunk(id).await {
            span.in_scope(|| trace!("Chunk already existed, doing nothing."));
            return Ok((id, true));
        }
        span.in_scope(|| trace!("Chunk did not exist, continuing"));
        let location = self.backend.write_chunk(chunk).await?;
        self.backend.get_index().set_chunk(id, location).await?;
        Ok((id, false))
    }

    /// Packs `data` through the pipeline with this repository's settings and
    /// key, then stores it via [`Repository::write_raw`].
    ///
    /// Returns the resulting chunk ID and whether it was already present.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Repository::write_raw`].
    #[instrument(skip(self, data))]
    pub async fn write_chunk(&mut self, data: Vec<u8>) -> Result<(ChunkID, bool)> {
        let chunk = self
            .pipeline
            .process(
                data,
                self.compression,
                self.encryption,
                self.hmac,
                self.key.clone(),
            )
            .await;
        self.write_raw(chunk).await
    }

    /// Packs `data` like [`Repository::write_chunk`], but stores the result
    /// under the caller-supplied `id` instead of the pipeline-produced one.
    ///
    /// The packed payload, MAC, and encryption of the processed chunk are kept;
    /// only the ID is replaced.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Repository::write_raw`].
    #[instrument(skip(self, data))]
    pub async fn write_chunk_with_id(
        &mut self,
        data: Vec<u8>,
        id: ChunkID,
    ) -> Result<(ChunkID, bool)> {
        let processed = self
            .pipeline
            .process(
                data,
                self.compression,
                self.encryption,
                self.hmac,
                self.key.clone(),
            )
            .await;
        let mac = processed.mac();
        let encryption = processed.encryption();
        let payload = (processed.split().1).0;
        let chunk = Chunk::from_parts(payload, self.compression, encryption, self.hmac, mac, id);
        self.write_raw(chunk).await
    }

    /// Returns `true` if the backend index has an entry for `id`.
    #[instrument(skip(self))]
    pub async fn has_chunk(&self, id: ChunkID) -> bool {
        self.backend.get_index().lookup_chunk(id).await.is_some()
    }

    /// Reads the chunk with `id` from the backend and unpacks it with the
    /// repository key, returning the original data.
    ///
    /// # Errors
    ///
    /// Returns [`RepositoryError::ChunkNotFound`] if the index has no entry for
    /// `id`. Propagates backend errors from reading the chunk and chunk errors
    /// from unpacking it.
    #[instrument(skip(self))]
    pub async fn read_chunk(&mut self, id: ChunkID) -> Result<Vec<u8>> {
        // A single lookup both tests for presence and yields the location,
        // avoiding a second index round-trip and an unreachable panic path.
        let location = self
            .backend
            .get_index()
            .lookup_chunk(id)
            .await
            .ok_or(RepositoryError::ChunkNotFound)?;
        let chunk = self.backend.read_chunk(location).await?;
        let data = chunk.unpack(&self.key)?;
        Ok(data)
    }

    /// Returns the number of chunks the backend index reports.
    #[instrument(skip(self))]
    pub async fn count_chunk(&self) -> usize {
        self.backend.get_index().count_chunk().await
    }

    /// Returns the compression, encryption, and HMAC settings this repository
    /// uses for new chunks.
    #[instrument(skip(self))]
    pub fn chunk_settings(&self) -> ChunkSettings {
        ChunkSettings {
            encryption: self.encryption,
            compression: self.compression,
            hmac: self.hmac,
        }
    }

    /// Returns the key used to pack and unpack chunks.
    #[instrument(skip(self))]
    pub fn key(&self) -> &Key {
        &self.key
    }

    /// Returns the backend's manifest handle.
    #[instrument(skip(self))]
    pub fn backend_manifest(&self) -> T::Manifest {
        self.backend.get_manifest()
    }

    /// Consumes the repository and closes its backend.
    #[instrument(skip(self))]
    pub async fn close(mut self) {
        self.backend.close().await;
    }

    /// Returns the `(Version, Uuid)` pairs reported by the backend manifest's
    /// `seen_versions` (presumably the software versions that have accessed
    /// this repository — confirm against the manifest implementation).
    #[instrument(skip(self))]
    pub async fn seen_versions(&self) -> HashSet<(Version, Uuid)> {
        self.backend_manifest().seen_versions().await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::repository::backend::common::sync_backend::BackendHandle;
    use crate::repository::backend::mem::*;
    use rand::prelude::*;

    /// Size, in bytes, of the random payloads written by these tests.
    const CHUNK_SIZE: usize = 7_000;

    /// Builds an in-memory repository (ZStd level 1, Blake2b HMAC,
    /// AES-256-CTR) keyed with `key`.
    fn get_repo_mem(key: Key) -> Repository<BackendHandle<Mem>> {
        let settings = ChunkSettings {
            compression: Compression::ZStd { level: 1 },
            hmac: HMAC::Blake2b,
            encryption: Encryption::new_aes256ctr(),
        };
        let backend = Mem::new(settings, key.clone(), 4);
        Repository::with(backend, settings, key, 2)
    }

    /// Returns `len` bytes of random data.
    fn random_bytes(len: usize) -> Vec<u8> {
        let mut buf = vec![0_u8; len];
        thread_rng().fill_bytes(&mut buf);
        buf
    }

    #[test]
    fn repository_add_read() {
        smol::run(async {
            let mut repo = get_repo_mem(Key::random(32));
            let inputs: Vec<Vec<u8>> = (0..3).map(|_| random_bytes(CHUNK_SIZE)).collect();

            println!("Adding Chunks");
            let mut ids = Vec::with_capacity(inputs.len());
            for data in &inputs {
                ids.push(repo.write_chunk(data.clone()).await.unwrap().0);
            }

            println!("Reading Chunks");
            for (id, data) in ids.into_iter().zip(&inputs) {
                let out = repo.read_chunk(id).await.unwrap();
                assert_eq!(data, &out);
            }
        });
    }

    #[test]
    fn double_add() {
        smol::run(async {
            let mut repo = get_repo_mem(Key::random(32));
            assert_eq!(repo.count_chunk().await, 0);
            let data = [1_u8; 8192];

            // The flag returned by `write_chunk` reports whether the chunk was
            // already present, so the first write must report `false`.
            let (id_1, existed_1) = repo.write_chunk(data.to_vec()).await.unwrap();
            assert!(!existed_1);
            assert_eq!(repo.count_chunk().await, 1);

            let (id_2, existed_2) = repo.write_chunk(data.to_vec()).await.unwrap();
            assert!(existed_2);
            assert_eq!(repo.count_chunk().await, 1);
            assert_eq!(id_1, id_2);
        });
    }

    #[test]
    fn chunk_with_id() {
        smol::run(async {
            let mut repo = get_repo_mem(Key::random(32));
            let data = random_bytes(CHUNK_SIZE);
            let id = ChunkID::manifest_id();
            repo.write_chunk_with_id(data.clone(), id)
                .await
                .expect("Unable to write with id");
            let data_restore = repo
                .read_chunk(id)
                .await
                .expect("Unable to read chunk back out");
            assert_eq!(data, data_restore);
        });
    }
}