use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::io::{self, Read};
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};
use async_trait::async_trait;
use super::cache::*;
use super::consts::*;
use super::file::*;
use super::layer::*;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use tar::*;
use tokio::io::AsyncWriteExt;
/// Import/export of layers as a single in-memory "pack".
///
/// Layers are identified by their `[u32; 5]` name. The blanket implementation
/// in this file encodes a pack as a gzip-compressed tar archive containing one
/// directory per layer.
#[async_trait]
pub trait Packable {
    /// Serializes the layers named by `layer_ids` into a pack and returns its bytes.
    async fn export_layers(
        &self,
        layer_ids: Box<dyn Iterator<Item = [u32; 5]> + Send>,
    ) -> io::Result<Vec<u8>>;

    /// Reads `pack` and stores the layers named by `layer_ids` found in it.
    async fn import_layers(
        &self,
        pack: &[u8],
        layer_ids: Box<dyn Iterator<Item = [u32; 5]> + Send>,
    ) -> io::Result<()>;
}
/// Blanket [`Packable`] implementation for every [`PersistentLayerStore`].
///
/// The pack format is a gzip-compressed tar archive in which every layer is a
/// top-level directory named after the layer (see `name_to_string`).
#[async_trait]
impl<T: PersistentLayerStore> Packable for T {
    /// Exports the given layers as a gzip-compressed tar archive.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while reading layer files from the store
    /// or while finalizing the tar/gzip streams.
    async fn export_layers(
        &self,
        layer_ids: Box<dyn Iterator<Item = [u32; 5]> + Send>,
    ) -> io::Result<Vec<u8>> {
        // The modification time is only archive metadata, so a system clock
        // set before the Unix epoch falls back to 0 instead of panicking.
        let mtime = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs());
        let mut enc = GzEncoder::new(Vec::new(), Compression::default());
        {
            // Scope the builder so its mutable borrow of `enc` ends before
            // the gzip stream is finished below.
            let mut tar = tar::Builder::new(&mut enc);
            for id in layer_ids {
                tar_append_layer(&mut tar, self, id, mtime).await?;
            }
            tar.finish()?;
        }
        enc.finish()
    }

    /// Imports the layers named by `layer_ids` from `pack`.
    ///
    /// A directory is created for every requested layer up front. Regular
    /// file entries whose first path component is one of the requested layer
    /// names are then written into the store, and finally every requested
    /// layer is finalized. Entries belonging to other layers are ignored.
    ///
    /// NOTE: this uses `tokio::task::block_in_place` and
    /// `Handle::block_on`, so it has to be called from within a
    /// multi-threaded tokio runtime.
    ///
    /// # Errors
    ///
    /// Returns `InvalidData` when an entry path has no file name or the file
    /// name is not valid UTF-8, and any I/O error raised while decoding the
    /// archive or writing to the store.
    async fn import_layers(
        &self,
        pack: &[u8],
        layer_ids: Box<dyn Iterator<Item = [u32; 5]> + Send>,
    ) -> io::Result<()> {
        // Upper bound for the buffer preallocated from a tar header. The
        // declared size is untrusted input; larger entries still import
        // correctly because the buffer grows while reading.
        const MAX_PREALLOCATION: u64 = 16 * 1024 * 1024;

        let mut layer_id_set = HashSet::new();
        for id in layer_ids {
            layer_id_set.insert(name_to_string(id));
            self.create_named_directory(id).await?;
        }

        let handle = tokio::runtime::Handle::current();
        // Decoding the archive is synchronous, so it runs in a blocking
        // section and re-enters the runtime for each asynchronous write.
        tokio::task::block_in_place(|| {
            let cursor = io::Cursor::new(pack);
            let tar = GzDecoder::new(cursor);
            let mut archive = Archive::new(tar);
            for e in archive.entries()? {
                let mut entry = e?;
                let path = entry.path()?;
                let file_name = path
                    .file_name()
                    .ok_or_else(|| {
                        io::Error::new(
                            io::ErrorKind::InvalidData,
                            "archive entry path has no file name",
                        )
                    })?
                    .to_str()
                    .ok_or_else(|| {
                        io::Error::new(
                            io::ErrorKind::InvalidData,
                            "unexpected non-utf8 directory name",
                        )
                    })?
                    .to_owned();

                // The first path component is the layer directory name.
                let layer_id = path.iter().next().and_then(|p| p.to_str()).unwrap_or("");
                if !layer_id_set.contains(layer_id) {
                    continue;
                }
                let layer_id_arr = string_to_name(layer_id)
                    .expect("names in layer_id_set were produced by name_to_string");

                let header = entry.header();
                if !header.entry_type().is_file() {
                    continue;
                }
                // Capped at MAX_PREALLOCATION, so the cast cannot truncate.
                let capacity = header.size()?.min(MAX_PREALLOCATION) as usize;
                let mut content = Vec::with_capacity(capacity);
                entry.read_to_end(&mut content)?;

                handle.block_on(async move {
                    let file = self.get_file(layer_id_arr, &file_name).await?;
                    let mut writer = file.open_write().await?;
                    writer.write_all(&content).await?;
                    writer.flush().await?;
                    writer.sync_all().await?;
                    Ok::<_, io::Error>(())
                })?;
            }

            for layer_id in layer_id_set {
                let layer_id_arr = string_to_name(&layer_id)
                    .expect("names in layer_id_set were produced by name_to_string");
                handle.block_on(self.finalize_layer(layer_id_arr))?;
            }
            Ok(())
        })
    }
}
/// Appends the store file `file_name` of `layer` to the archive as
/// `layer_path/file_name`, with mode `0o644` and modification time `mtime`.
///
/// # Errors
///
/// Returns `NotFound` when the file does not exist in the store, and
/// otherwise any I/O error raised while reading the file or while writing
/// the archive entry.
async fn tar_append_file<S: PersistentLayerStore, W: io::Write>(
    store: &S,
    tar: &mut tar::Builder<W>,
    layer: [u32; 5],
    layer_path: &PathBuf,
    file_name: &str,
    mtime: u64,
) -> io::Result<()> {
    if !store.file_exists(layer, file_name).await? {
        return Err(io::Error::new(
            io::ErrorKind::NotFound,
            "file does not exist",
        ));
    }

    let file = store.get_file(layer, file_name).await?;
    let contents = file.map().await?;
    let cursor = io::Cursor::new(&contents);
    let path = layer_path.join(file_name);

    let mut header = Header::new_gnu();
    header.set_mode(0o644);
    header.set_size(file.size().await? as u64);
    header.set_mtime(mtime);

    // Writing the entry is synchronous; a failed write is returned to the
    // caller instead of panicking.
    tokio::task::block_in_place(|| tar.append_data(&mut header, path, cursor))
}
/// Appends the store file `file_name` of `layer` to the archive as
/// `layer_path/file_name` if the file exists; does nothing when it is absent.
///
/// The entry is written with mode `0o644` and modification time `mtime`.
///
/// # Errors
///
/// Returns any I/O error raised while querying or reading the store, or
/// while writing the archive entry.
async fn tar_append_file_if_exists<S: PersistentLayerStore, W: io::Write>(
    store: &S,
    tar: &mut tar::Builder<W>,
    layer: [u32; 5],
    layer_path: &PathBuf,
    file_name: &str,
    mtime: u64,
) -> io::Result<()> {
    if !store.file_exists(layer, file_name).await? {
        // Optional file: absence is not an error.
        return Ok(());
    }

    let file = store.get_file(layer, file_name).await?;
    let contents = file.map().await?;
    let cursor = io::Cursor::new(&contents);
    let path = layer_path.join(file_name);

    let mut header = Header::new_gnu();
    header.set_mode(0o644);
    header.set_size(file.size().await? as u64);
    header.set_mtime(mtime);

    // Writing the entry is synchronous; a failed write is returned to the
    // caller instead of panicking.
    tokio::task::block_in_place(|| tar.append_data(&mut header, path, cursor))
}
/// Appends one complete layer to the archive.
///
/// A directory entry named after the layer (mode `0o755`) is written first,
/// followed by the layer's files:
///
/// 1. every file in `SHARED_REQUIRED_FILES`,
/// 2. every existing file in `SHARED_OPTIONAL_FILES` except
///    `FILENAMES.rollup`, which is never exported,
/// 3. the child-layer file lists when the layer has a `FILENAMES.parent`
///    file, and the base-layer file lists otherwise.
///
/// # Errors
///
/// Returns `NotFound` when a required file is missing, and any other I/O
/// error raised while reading the store or writing the archive.
async fn tar_append_layer<W: io::Write, S: PersistentLayerStore>(
    tar: &mut tar::Builder<W>,
    store: &S,
    layer: [u32; 5],
    mtime: u64,
) -> io::Result<()> {
    let mut header = Header::new_gnu();
    header.set_mode(0o755);
    header.set_entry_type(EntryType::Directory);
    header.set_mtime(mtime);
    header.set_size(0);

    let layer_name = name_to_string(layer);
    let mut path = PathBuf::new();
    path.push(layer_name);

    // Directory entries carry no data; a failed write is propagated rather
    // than turned into a panic.
    tokio::task::block_in_place(|| tar.append_data(&mut header, &path, std::io::empty()))?;

    for f in &SHARED_REQUIRED_FILES {
        tar_append_file(store, tar, layer, &path, f, mtime).await?;
    }
    for f in &SHARED_OPTIONAL_FILES {
        // The rollup file is deliberately left out of the pack.
        if f == &FILENAMES.rollup {
            continue;
        }
        tar_append_file_if_exists(store, tar, layer, &path, f, mtime).await?;
    }

    // The presence of a parent file is what distinguishes a child layer
    // from a base layer here.
    if store.file_exists(layer, FILENAMES.parent).await? {
        for f in &CHILD_LAYER_REQUIRED_FILES {
            tar_append_file(store, tar, layer, &path, f, mtime).await?;
        }
        for f in &CHILD_LAYER_OPTIONAL_FILES {
            tar_append_file_if_exists(store, tar, layer, &path, f, mtime).await?;
        }
    } else {
        for f in &BASE_LAYER_REQUIRED_FILES {
            tar_append_file(store, tar, layer, &path, f, mtime).await?;
        }
        for f in &BASE_LAYER_OPTIONAL_FILES {
            tar_append_file_if_exists(store, tar, layer, &path, f, mtime).await?;
        }
    }
    Ok(())
}
/// Errors that can occur while inspecting a pack.
#[derive(Debug)]
pub enum PackError {
    /// A layer was not found. Not produced by any code visible in this file.
    LayerNotFound,
    /// An underlying I/O error, e.g. while decoding the archive.
    Io(io::Error),
    /// Bytes that were expected to be text were not valid UTF-8.
    Utf8Error(std::str::Utf8Error),
}

impl Display for PackError {
    /// Formats the error using its `Debug` representation.
    fn fmt(&self, formatter: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
        write!(formatter, "{:?}", self)
    }
}

/// Makes `PackError` usable as a standard error, e.g. behind
/// `Box<dyn std::error::Error>`.
impl std::error::Error for PackError {
    /// Returns the wrapped error for the `Io` and `Utf8Error` variants.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::LayerNotFound => None,
            Self::Io(err) => Some(err),
            Self::Utf8Error(err) => Some(err),
        }
    }
}

impl From<io::Error> for PackError {
    /// Wraps an I/O error so it can be propagated with `?`.
    fn from(err: io::Error) -> Self {
        Self::Io(err)
    }
}

impl From<std::str::Utf8Error> for PackError {
    /// Wraps a UTF-8 decoding error so it can be propagated with `?`.
    fn from(err: std::str::Utf8Error) -> Self {
        Self::Utf8Error(err)
    }
}
/// Scans a gzip-compressed tar pack and maps every layer in it to its parent.
///
/// The first component of each entry path is parsed as the layer name. When
/// an entry's file name is `parent.hex`, its first 40 bytes are read as the
/// parent layer name and recorded as `Some(parent)`. A layer for which no
/// such entry is seen maps to `None`.
///
/// # Errors
///
/// Returns [`PackError::Io`] when the archive cannot be decoded, when an
/// entry path is empty or not valid unicode, or when the parent file holds
/// fewer than 40 bytes. Returns [`PackError::Utf8Error`] when the parent
/// name is not valid UTF-8. Errors from `string_to_name` are propagated.
pub fn pack_layer_parents<R: io::Read>(
    readable: R,
) -> Result<HashMap<[u32; 5], Option<[u32; 5]>>, PackError> {
    let tar = GzDecoder::new(readable);
    let mut archive = Archive::new(tar);
    let mut result_map = HashMap::new();
    for e in archive.entries()? {
        let mut entry = e?;
        let path = entry.path()?;

        // The pack is external input: malformed paths are reported as
        // errors instead of panicking.
        let layer_component = path.iter().next().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                "expected path to have at least one component",
            )
        })?;
        let layer_name = layer_component.to_str().ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidData, "expected proper unicode path")
        })?;
        let id = string_to_name(layer_name)?;

        // A path without a final file name simply is not the parent file.
        let is_parent_file = path
            .file_name()
            .map_or(false, |name| name == "parent.hex");

        if is_parent_file {
            let mut parent_id_bytes = [0u8; 40];
            entry.read_exact(&mut parent_id_bytes)?;
            let parent_id_str = std::str::from_utf8(&parent_id_bytes)?;
            let parent_id = string_to_name(parent_id_str)?;
            result_map.insert(id, Some(parent_id));
        } else {
            // Register the layer without overwriting a parent that was
            // already recorded for it.
            result_map.entry(id).or_insert(None);
        }
    }
    Ok(result_map)
}
/// [`Packable`] for the cached store: both operations are forwarded
/// unchanged to the wrapped `inner` store.
#[async_trait]
impl Packable for CachedLayerStore {
    /// Delegates the export to the inner store.
    async fn export_layers(
        &self,
        layer_ids: Box<dyn Iterator<Item = [u32; 5]> + Send>,
    ) -> io::Result<Vec<u8>> {
        self.inner.export_layers(layer_ids).await
    }

    /// Delegates the import to the inner store.
    async fn import_layers(
        &self,
        pack: &[u8],
        layer_ids: Box<dyn Iterator<Item = [u32; 5]> + Send>,
    ) -> io::Result<()> {
        self.inner.import_layers(pack, layer_ids).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layer::*;
    use crate::storage::directory::*;
    use std::sync::Arc;
    use tempfile::tempdir;

    /// Exports a base layer and a rolled-up child layer from one directory
    /// store, imports the pack into a second store, and checks that the
    /// imported child layer yields the expected triples.
    #[tokio::test(flavor = "multi_thread")]
    async fn export_import_layer_with_rollup() {
        let dir1 = tempdir().unwrap();
        let source_store = Arc::new(DirectoryLayerStore::new(dir1.path()));
        let dir2 = tempdir().unwrap();
        let target_store = Arc::new(DirectoryLayerStore::new(dir2.path()));

        // Base layer with two triples.
        let mut base_builder = source_store.create_base_layer().await.unwrap();
        let base_name = base_builder.name();
        base_builder.add_value_triple(ValueTriple::new_node("cow", "likes", "duck"));
        base_builder.add_value_triple(ValueTriple::new_node("duck", "hates", "cow"));
        base_builder.commit_boxed().await.unwrap();

        // Child layer that replaces one of the base triples.
        let mut child_builder = source_store.create_child_layer(base_name).await.unwrap();
        let child_name = child_builder.name();
        child_builder.remove_value_triple(ValueTriple::new_node("duck", "hates", "cow"));
        child_builder.add_value_triple(ValueTriple::new_node("duck", "likes", "cow"));
        child_builder.commit_boxed().await.unwrap();

        // Roll up the child in the source store before exporting.
        let unrolled_layer = source_store.get_layer(child_name).await.unwrap().unwrap();
        source_store.clone().rollup(unrolled_layer).await.unwrap();

        let pack = source_store
            .export_layers(Box::new(vec![base_name, child_name].into_iter()))
            .await
            .unwrap();
        target_store
            .import_layers(&pack, Box::new(vec![base_name, child_name].into_iter()))
            .await
            .unwrap();

        let imported_layer = target_store.get_layer(child_name).await.unwrap().unwrap();
        let actual: Vec<_> = imported_layer
            .triples()
            .map(|t| imported_layer.id_triple_to_string(&t).unwrap())
            .collect();
        let expected = vec![
            ValueTriple::new_node("cow", "likes", "duck"),
            ValueTriple::new_node("duck", "likes", "cow"),
        ];
        assert_eq!(expected, actual);
    }
}