use crate::chunker::{Chunker, Slice, SlicerSettings};
use crate::repository::backend::common::manifest::ManifestTransaction;
use crate::repository::{Backend, ChunkID, Repository};
use anyhow::Result;
use chrono::prelude::*;
use parking_lot::RwLock;
use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::io::{Empty, Read, Write};
use std::sync::Arc;
#[cfg(feature = "profile")]
use flame::*;
#[cfg(feature = "profile")]
use flamer::flame;
/// A contiguous byte range within a sparse object.
///
/// NOTE(review): `put_sparse_object` shifts chunk starts by `extent.start`,
/// and the tests slice input with `start..end` (exclusive end), while
/// `get_extent` filters with `x.start <= extent.end` — the intended
/// inclusivity of `end` should be confirmed before relying on it.
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub struct Extent {
// Offset of the first byte of the extent
pub start: u64,
// Offset of the end of the extent (see inclusivity note above)
pub end: u64,
}
/// A pointer to an `Archive` that has been serialized and written to a
/// repository: the archive's name, the ID of the chunk holding its
/// metadata, and the timestamp carried over from the archive.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Hash)]
pub struct StoredArchive {
// Human-readable name of the archive
name: String,
// ID of the chunk containing the serialized `Archive`
id: ChunkID,
// Timestamp copied from the archive when it was stored
timestamp: DateTime<FixedOffset>,
}
impl StoredArchive {
    /// Loads the `Archive` this pointer refers to from the repository.
    ///
    /// # Errors
    ///
    /// Returns an error if the metadata chunk cannot be read, or if it
    /// fails to deserialize.
    pub async fn load(&self, repo: &mut Repository<impl Backend>) -> Result<Archive> {
        let bytes = repo.read_chunk(self.id).await?;
        let mut de = Deserializer::new(&bytes[..]);
        // Propagate deserialization failures instead of panicking: this
        // method already returns a Result, so corrupt metadata becomes a
        // recoverable error for the caller.
        let archive: Archive = Deserialize::deserialize(&mut de)?;
        Ok(archive)
    }
    /// Constructs a placeholder archive pointer for use in tests.
    #[cfg(test)]
    pub fn dummy_archive() -> StoredArchive {
        // Capture the clock once so the instant and the offset applied to
        // it come from the same reading.
        let now = Local::now();
        StoredArchive {
            name: "Test".to_string(),
            id: ChunkID::manifest_id(),
            timestamp: now.with_timezone(now.offset()),
        }
    }
    /// Returns the timestamp recorded in this pointer.
    pub fn timestamp(&self) -> DateTime<FixedOffset> {
        self.timestamp
    }
    /// Returns the ID of the chunk containing the serialized archive.
    pub fn id(&self) -> ChunkID {
        self.id
    }
    /// Returns the archive's name.
    pub fn name(&self) -> &str {
        &self.name
    }
}
impl From<ManifestTransaction> for StoredArchive {
    /// Converts a manifest transaction into the archive pointer it records.
    fn from(item: ManifestTransaction) -> Self {
        let name = item.name().to_string();
        let id = item.pointer();
        let timestamp = item.timestamp();
        StoredArchive {
            name,
            id,
            timestamp,
        }
    }
}
/// Location of one span of an object's data: which chunk holds the bytes,
/// and the byte range the span covers within the object.
#[derive(Serialize, Deserialize, Copy, Clone, Eq, PartialEq, Debug)]
pub struct ChunkLocation {
// ID of the chunk containing this span's data
id: ChunkID,
// Offset of the first byte this span covers within the object
start: u64,
// Number of bytes this span covers
length: u64,
}
impl PartialOrd for ChunkLocation {
    fn partial_cmp(&self, other: &ChunkLocation) -> Option<Ordering> {
        // Delegate to the total ordering defined below.
        Some(Ord::cmp(self, other))
    }
}
impl Ord for ChunkLocation {
    /// Chunk locations order solely by their starting offset within the
    /// object, so sorting a location list yields restore order.
    fn cmp(&self, other: &ChunkLocation) -> Ordering {
        Ord::cmp(&self.start, &other.start)
    }
}
/// An in-memory archive: a mapping from namespaced object paths to the
/// chunk locations that make up each object.
///
/// Cloning an `Archive` is shallow with respect to its contents: the object
/// map lives behind an `Arc<RwLock<..>>`, so clones share the map and see
/// each other's insertions (name, namespace, and timestamp are copied).
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Archive {
// Name of the archive
name: String,
// Shared map from namespaced path to that object's chunk locations
objects: Arc<RwLock<HashMap<String, Vec<ChunkLocation>>>>,
// Namespace components; see canonical_namespace()
namespace: Vec<String>,
// Timestamp taken when the archive was created
timestamp: DateTime<FixedOffset>,
}
impl Archive {
/// Creates a new, empty archive with the given name, an empty namespace,
/// and the current local time as its timestamp.
pub fn new(name: &str) -> Archive {
    // Capture the clock once so the timestamp and the offset applied to it
    // come from the same instant (two separate `Local::now()` calls could
    // straddle an offset change).
    let now = Local::now();
    Archive {
        name: name.to_string(),
        objects: Arc::new(RwLock::new(HashMap::new())),
        namespace: Vec::new(),
        timestamp: now.with_timezone(now.offset()),
    }
}
/// Chunks `from_reader` and stores it in the repository under the
/// namespaced `path`, recording each chunk's location in this archive.
///
/// # Errors
///
/// Returns an error if writing any chunk to the repository fails.
#[cfg_attr(feature = "profile", flame)]
pub async fn put_object<R: Read + Send>(
    &mut self,
    chunker: &Chunker<impl SlicerSettings<Empty> + SlicerSettings<R>>,
    repository: &mut Repository<impl Backend>,
    path: &str,
    from_reader: R,
) -> Result<()> {
    let mut locations: Vec<ChunkLocation> = Vec::new();
    // Prefix with the canonical namespace so objects in different
    // namespaces cannot collide.
    let path = self.canonical_namespace() + path.trim();
    #[cfg(feature = "profile")]
    flame::start("Packing chunks");
    let settings = repository.chunk_settings();
    let key = repository.key();
    let slices = chunker.chunked_iterator(from_reader, 0, &settings, key);
    for Slice { data, start, end } in slices {
        let id = repository.write_unpacked_chunk(data).await?.0;
        locations.push(ChunkLocation {
            id,
            start,
            // Length of the inclusive span [start, end].
            length: end - start + 1,
        });
    }
    #[cfg(feature = "profile")]
    flame::end("Packing chunks");
    let mut objects = self.objects.write();
    // `path` is already an owned String; insert it without re-cloning.
    objects.insert(path, locations);
    Ok(())
}
/// Stores a sparse object in the repository, chunking one reader per
/// extent and shifting chunk starts by each extent's `start` so recorded
/// locations are offsets into the whole (sparse) object.
///
/// # Errors
///
/// Returns an error if writing any chunk to the repository fails.
pub async fn put_sparse_object<R: Read + Send>(
    &mut self,
    chunker: &Chunker<impl SlicerSettings<Empty> + SlicerSettings<R>>,
    repository: &mut Repository<impl Backend>,
    path: &str,
    from_readers: Vec<(Extent, R)>,
) -> Result<()> {
    let mut locations: Vec<ChunkLocation> = Vec::new();
    let path = self.canonical_namespace() + path.trim();
    // The chunk settings do not change between extents; fetch them once,
    // matching the pattern used in put_object.
    let settings = repository.chunk_settings();
    for (extent, read) in from_readers {
        let key = repository.key();
        let slices = chunker.chunked_iterator(read, 0, &settings, key);
        for Slice { data, start, end } in slices {
            let id = repository.write_unpacked_chunk(data).await?.0;
            locations.push(ChunkLocation {
                id,
                // Shift from extent-relative to object-relative offset.
                start: start + extent.start,
                length: end - start + 1,
            });
        }
    }
    let mut objects = self.objects.write();
    // `path` is already an owned String; insert it without re-cloning.
    objects.insert(path, locations);
    Ok(())
}
/// Records an object with no data chunks (e.g. an empty file) under the
/// namespaced `path`.
pub fn put_empty(&mut self, path: &str) {
    // Apply the same canonical-namespace prefix (and trim) as
    // put_object/get_object: without it, an empty object stored here would
    // be keyed differently from how get_object looks it up, and could
    // never be retrieved.
    let path = self.canonical_namespace() + path.trim();
    let locations: Vec<ChunkLocation> = Vec::new();
    let mut objects = self.objects.write();
    objects.insert(path, locations);
}
/// Restores the object at the namespaced `path`, writing its bytes to
/// `restore_to`. Gaps between recorded chunks are filled with zero bytes.
/// A missing object is treated as empty: nothing is written and `Ok(())`
/// is returned.
///
/// # Errors
///
/// Returns an error if reading a chunk or writing to `restore_to` fails.
#[cfg_attr(feature = "profile", flame)]
pub async fn get_object(
    &self,
    repository: &mut Repository<impl Backend>,
    path: &str,
    mut restore_to: impl Write,
) -> Result<()> {
    let path = self.canonical_namespace() + path.trim();
    // Clone the location list so the read lock is logically released from
    // the data before the async chunk reads below.
    let objects = self.objects.read();
    let locations = objects.get(&path).cloned();
    let mut locations = if let Some(locations) = locations {
        locations
    } else {
        return Ok(());
    };
    // Objects stored via put_empty have no chunks; the locations[0] access
    // below would panic, so return early with nothing to write.
    if locations.is_empty() {
        return Ok(());
    }
    locations.sort_unstable();
    let mut last_index = locations[0].start;
    for location in &locations {
        let id = location.id;
        let start = location.start;
        // Zero-fill any gap between the previous chunk and this one so
        // sparse regions restore as runs of zero bytes.
        if start > last_index + 1 {
            let zero = [0_u8];
            for _ in last_index + 1..start {
                restore_to.write_all(&zero)?;
            }
        }
        let bytes = repository.read_chunk(id).await?;
        restore_to.write_all(&bytes)?;
        // Index of the last byte written so far.
        last_index = start + location.length - 1;
    }
    Ok(())
}
/// Restores only the portion of the object at `path` covered by `extent`,
/// writing it to `restore_to`. Gaps between chunks inside the extent are
/// zero-filled; a missing object writes nothing and returns `Ok(())`.
///
/// # Errors
///
/// Returns an error if reading a chunk or writing to `restore_to` fails.
pub async fn get_extent(
&self,
repository: &mut Repository<impl Backend>,
path: &str,
extent: Extent,
mut restore_to: impl Write,
) -> Result<()> {
let path = self.canonical_namespace() + path.trim();
let objects = self.objects.read();
let locations = objects.get(&path.to_string()).cloned();
let mut locations = if let Some(locations) = locations {
locations
} else {
// Missing object: treat as empty rather than erroring.
return Ok(());
};
locations.sort_unstable();
// Keep only chunks whose start falls inside the requested extent.
// NOTE(review): a chunk beginning before extent.start that overlaps into
// the extent is excluded here — this assumes chunk boundaries always
// align with extent boundaries (as put_sparse_object produces) — confirm.
let locations = locations
.iter()
.filter(|x| x.start >= extent.start && x.start <= extent.end);
let mut last_index = extent.start;
for location in locations {
let id = location.id;
let start = location.start;
// Zero-fill any gap before this chunk within the extent.
if start > last_index + 1 {
let zero = [0_u8];
for _ in last_index + 1..start {
restore_to.write_all(&zero)?;
}
}
let bytes = repository.read_chunk(id).await?;
restore_to.write_all(&bytes)?;
// Index of the last byte written so far.
last_index = start + location.length - 1;
}
Ok(())
}
/// Restores a sparse object extent by extent, writing each extent's bytes
/// into its paired writer via `get_extent`.
///
/// # Errors
///
/// Returns the first error produced by any underlying `get_extent` call.
pub async fn get_sparse_object(
    &self,
    repository: &mut Repository<impl Backend>,
    path: &str,
    mut to_writers: Vec<(Extent, impl Write)>,
) -> Result<()> {
    for (extent, writer) in to_writers.iter_mut() {
        self.get_extent(repository, path, *extent, writer).await?;
    }
    Ok(())
}
/// Returns the archive's namespace as a colon-delimited string that always
/// ends with a trailing ':' (an empty namespace yields ":").
pub fn canonical_namespace(&self) -> String {
    format!("{}:", self.namespace.join(":"))
}
/// Returns a copy of this archive with `name` appended to its namespace.
/// The object map is shared with the original (it lives behind an Arc).
pub fn namespace_append(&self, name: &str) -> Archive {
    let mut archive = self.clone();
    archive.namespace.push(name.to_string());
    archive
}
/// Serializes this archive, writes it to the repository, commits the
/// index, and returns a `StoredArchive` pointer that can later `load` it.
///
/// # Panics
///
/// Panics if serialization fails or the metadata chunk cannot be written.
pub async fn store(self, repo: &mut Repository<impl Backend>) -> StoredArchive {
    let mut bytes = Vec::<u8>::new();
    self.serialize(&mut Serializer::new(&mut bytes))
        .expect("Unable to serialize archive.");
    let id = repo
        .write_chunk(bytes)
        .await
        // Typo fix: "metatdata" -> "metadata".
        .expect("Unable to write archive metadata to repository.")
        .0;
    // Commit so the stored archive survives in the repository's index.
    repo.commit_index().await;
    StoredArchive {
        id,
        name: self.name,
        timestamp: self.timestamp,
    }
}
/// Returns the archive's name.
#[cfg_attr(tarpaulin, skip)]
pub fn name(&self) -> &str {
&self.name
}
/// Returns the timestamp taken when the archive was created.
#[cfg_attr(tarpaulin, skip)]
pub fn timestamp(&self) -> &DateTime<FixedOffset> {
&self.timestamp
}
}
#[cfg(test)]
#[cfg_attr(tarpaulin, skip)]
mod tests {
use super::*;
use crate::chunker::slicer::fastcdc::FastCDC;
use crate::chunker::*;
use crate::repository::backend::mem::Mem;
use crate::repository::ChunkSettings;
use crate::repository::Key;
use futures::executor::{block_on, ThreadPool};
use quickcheck_macros::quickcheck;
use rand::prelude::*;
use std::fs;
use std::io::{BufReader, Cursor, Empty, Seek, SeekFrom};
use std::path::Path;
use tempfile::tempdir;
// Builds a repository backed by in-memory storage for tests, using
// lightweight chunk settings and a fresh thread pool.
fn get_repo_mem(key: Key) -> Repository<impl Backend> {
let pool = ThreadPool::new().unwrap();
let settings = ChunkSettings::lightweight();
let backend = Mem::new(settings, &pool);
Repository::with(backend, settings, key, pool)
}
// Round-trips a seeded random 32 KiB buffer through put_object/get_object
// and verifies the restored bytes match the input exactly.
#[quickcheck]
fn single_add_get(seed: u64) -> bool {
block_on(async {
println!("Seed: {}", seed);
let slicer: FastCDC<Empty> = FastCDC::new_defaults();
let chunker = Chunker::new(slicer.copy_settings());
let key = Key::random(32);
// 2 * 2^14 = 32 KiB of deterministic pseudo-random data.
let size = 2 * 2_usize.pow(14);
let mut data = vec![0_u8; size];
let mut rand = SmallRng::seed_from_u64(seed);
rand.fill_bytes(&mut data);
let mut repo = get_repo_mem(key);
let mut archive = Archive::new("test");
// Write the data through a real temp file to exercise the Read path.
let testdir = tempdir().unwrap();
let input_file_path = testdir.path().join(Path::new("file1"));
{
let mut input_file = fs::File::create(input_file_path.clone()).unwrap();
input_file.write_all(&data).unwrap();
}
let mut input_file = BufReader::new(fs::File::open(input_file_path).unwrap());
archive
.put_object(&chunker, &mut repo, "FileOne", &mut input_file)
.await
.unwrap();
let mut buf = Cursor::new(Vec::<u8>::new());
archive
.get_object(&mut repo, "FileOne", &mut buf)
.await
.unwrap();
let output = buf.into_inner();
println!("Input length: {}", data.len());
println!("Output length: {}", output.len());
// Compare byte-by-byte, logging every mismatch for debugging.
// NOTE(review): output[i] is indexed without checking output.len(); a
// short output would panic here rather than fail the property cleanly.
let mut mismatch = false;
for i in 0..data.len() {
if data[i] != output[i] {
println!(
"Byte {} was different in output. Input val: {:X?} Output val {:X?}",
i, data[i], output[i]
);
mismatch = true;
}
}
!mismatch
})
}
// Builds a random set of disjoint extents, stores them as a sparse object,
// restores each extent into a buffer, and verifies the reconstruction.
#[quickcheck]
fn sparse_add_get(seed: u64) -> bool {
block_on(async {
let slicer: FastCDC<Empty> = FastCDC::new_defaults();
let chunker = Chunker::new(slicer.copy_settings());
let key = Key::random(32);
let mut repo = get_repo_mem(key);
let mut archive = Archive::new("test");
let mut rng = SmallRng::seed_from_u64(seed);
// Generate 1..=9 extents of random length, separated by random gaps.
let mut extents: Vec<Extent> = Vec::new();
let extent_count: usize = rng.gen_range(1, 10);
let mut next_start: u64 = 0;
let mut final_size: usize = 0;
for _ in 0..extent_count {
let extent_length = rng.gen_range(256, 16384);
let extent = Extent {
start: next_start,
end: next_start + extent_length,
};
// Track the total size: the buffer ends where the last extent ends.
final_size = (next_start + extent_length) as usize;
extents.push(extent);
let jump = rng.gen_range(256, 16384);
next_start = next_start + extent_length + jump;
}
// Fill only the extent-covered regions with random bytes; gaps stay zero,
// matching the zero-fill behavior expected on restore.
let mut test_input = vec![0_u8; final_size];
for Extent { start, end } in extents.clone() {
for i in start..end {
test_input[i as usize] = rng.gen();
}
}
// Pair each extent with a reader over its slice of the input buffer.
let mut extent_list = Vec::new();
for extent in extents.clone() {
extent_list.push((
extent,
&test_input[extent.start as usize..extent.end as usize],
));
}
archive
.put_sparse_object(&chunker, &mut repo, "test", extent_list)
.await
.expect("Archive Put Failed");
let test_output = Vec::new();
println!("Output is a buffer of {} bytes.", final_size);
let mut cursor = Cursor::new(test_output);
// Restore each extent at its own offset within the output buffer.
for (i, extent) in extents.clone().iter().enumerate() {
println!("Getting extent #{} : {:?}", i, extent);
cursor
.seek(SeekFrom::Start(extent.start))
.expect("Out of bounds");
archive
.get_extent(&mut repo, "test", *extent, &mut cursor)
.await
.expect("Archive Get Failed");
}
let test_output = cursor.into_inner();
println!("Input is now a buffer of {} bytes.", test_input.len());
println!("Output is now a buffer of {} bytes.", test_output.len());
// Log the first difference (with surrounding context) before the final
// equality check decides the property.
for i in 0..test_input.len() {
if test_output[i] != test_input[i] {
println!("Difference at {}", i);
println!("Orig: {:?}", &test_input[i - 2..i + 3]);
println!("New: {:?}", &test_output[i - 2..i + 3]);
break;
}
}
std::mem::drop(repo);
test_input == test_output
})
}
#[test]
fn default_namespace() {
    // A fresh archive has an empty namespace, which canonicalizes to ":".
    let archive = Archive::new("test");
    assert_eq!(archive.canonical_namespace(), ":");
}
#[test]
fn namespace_append() {
    // Appends chain left to right; components join with ':' and the
    // canonical form keeps a trailing ':'.
    let archive = Archive::new("test")
        .namespace_append("1")
        .namespace_append("2");
    let namespace = archive.canonical_namespace();
    println!("Namespace: {}", namespace);
    assert_eq!(namespace, "1:2:");
}
// Verifies that clones of an archive share the underlying object map: an
// object put through one clone is retrievable through the other.
#[test]
fn namespaced_insertions() {
block_on(async {
let slicer: FastCDC<Empty> = FastCDC::new_defaults();
let chunker = Chunker::new(slicer.copy_settings());
let key = Key::random(32);
let mut repo = get_repo_mem(key);
let mut obj1 = Cursor::new([1_u8; 32]);
let mut obj2 = Cursor::new([2_u8; 32]);
let mut archive_1 = Archive::new("test");
// archive_2 shares archive_1's object map (Arc inside Archive).
let mut archive_2 = archive_1.clone();
archive_1
.put_object(&chunker, &mut repo, "1", &mut obj1)
.await
.unwrap();
archive_2
.put_object(&chunker, &mut repo, "2", &mut obj2)
.await
.unwrap();
// Cross-retrieve: each clone fetches the object the other one put.
let mut restore_1 = Cursor::new(Vec::<u8>::new());
archive_2
.get_object(&mut repo, "1", &mut restore_1)
.await
.unwrap();
let mut restore_2 = Cursor::new(Vec::<u8>::new());
archive_1
.get_object(&mut repo, "2", &mut restore_2)
.await
.unwrap();
let obj1 = obj1.into_inner();
let obj2 = obj2.into_inner();
let restore1 = restore_1.into_inner();
let restore2 = restore_2.into_inner();
assert_eq!(&obj1[..], &restore1[..]);
assert_eq!(&obj2[..], &restore2[..]);
});
}
// Stores an archive, then loads it back through its StoredArchive pointer
// and verifies a previously put object can still be restored from it.
#[test]
fn commit_and_load() {
block_on(async {
let slicer: FastCDC<Empty> = FastCDC::new_defaults();
let chunker = Chunker::new(slicer.copy_settings());
let key = Key::random(32);
let mut repo = get_repo_mem(key);
// 32 bytes of predictable data: 0, 1, ..., 31.
let mut obj1 = [0_u8; 32];
for i in 0..obj1.len() {
obj1[i] = i as u8;
}
let mut obj1 = Cursor::new(obj1);
let mut archive = Archive::new("test");
archive
.put_object(&chunker, &mut repo, "1", &mut obj1)
.await
.expect("Unable to put object in archive");
// Serialize the archive into the repository; returns a pointer to it.
let stored_archive = archive.store(&mut repo).await;
// Round-trip: reload the archive from the repository via the pointer.
let archive = stored_archive
.load(&mut repo)
.await
.expect("Unable to load archive from repository");
let mut obj_restore = Cursor::new(Vec::new());
archive
.get_object(&mut repo, "1", &mut obj_restore)
.await
.expect("Unable to restore object from archive");
assert_eq!(&obj1.into_inner()[..], &obj_restore.into_inner()[..]);
});
}
}