use anyhow::{anyhow, Result};
use chrono::prelude::*;
use metrics::{describe_gauge, gauge};
use sled::transaction::ConflictableTransactionError;
use sled::Transactional;
use std::convert::TryInto;
use std::ffi::OsStr;
use std::hash::Hasher;
use std::os::unix::prelude::OsStrExt;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use tokio::fs::{create_dir_all, remove_file};
use tokio::io;
use tokio::sync::{Mutex, MutexGuard};
use tracing::info;
use crate::{store::Store, tree::FileTree};
/// A size-limited file archive rooted at a directory.
///
/// Files added through `add` are written below `arch_dir` and recorded in a
/// sled database. When the tracked total size exceeds `limit`, entries are
/// popped from the front of the `"time_tree"` index (keyed by insertion
/// timestamp) and the corresponding files are deleted.
#[derive(Debug)]
pub struct Archivist<T: FileTree> {
/// sled database holding the `"time_tree"` and `"path_tree"` index trees.
db: sled::Db,
/// Root directory of the archive; new file paths are built beneath it.
arch_dir: PathBuf,
/// Maximum total size in bytes (constructors convert from MiB via `* 1024 * 1024`).
limit: u64,
/// Running total, in bytes, of the file sizes currently recorded in the index.
size: Mutex<u64>,
/// The `FileTree` implementation that supplies the base path and file name
/// for each new file; shared behind an async mutex.
tree: Arc<Mutex<T>>,
}
impl<T: FileTree> Archivist<T> {
pub async fn new(dir: &str, tree: T, limit_mb: u64) -> Result<Archivist<T>> {
init_metrics();
create_dir_all(dir).await?;
let db_path = db_path(dir);
let db = sled::Config::default()
.path(db_path)
.create_new(true)
.open()?;
gauge!("archive_size_limit").set((limit_mb * 1024 * 1024) as f64);
Ok(Archivist {
arch_dir: dir.into(),
db,
limit: limit_mb * 1024 * 1024,
size: Mutex::new(0),
tree: Arc::new(Mutex::new(tree)),
})
}
pub async fn open(dir: &str, tree: T, limit_mb: u64) -> Result<Archivist<T>> {
init_metrics();
let db_path = db_path(dir);
let db = sled::Config::default().path(db_path).open()?;
let time_tree = db.open_tree("time_tree")?;
let size = time_tree.iter().try_fold(0, |acc, v| {
if let Ok((_, v)) = v {
Ok::<_, anyhow::Error>(acc + u64::from_le_bytes(v[8..16].try_into()?))
} else {
Ok(acc)
}
})?;
gauge!("archive_size_limit").set((limit_mb * 1024 * 1024) as f64);
gauge!("archive_size").set(size as f64);
Ok(Archivist {
arch_dir: dir.into(),
db,
limit: limit_mb * 1024 * 1024,
size: Mutex::new(size),
tree: Arc::new(Mutex::new(tree)),
})
}
pub async fn add(&self, file_suffix: &str, s: &impl Store) -> Result<PathBuf> {
let path = self.new_filepath(file_suffix).await?;
let mut reader = s.store(&path)?;
let size = self.add_file_to_disk(&path, &mut reader).await?;
self.add_file_to_db(&path, size).await?;
Ok(path)
}
pub async fn tree(&self) -> MutexGuard<'_, T> {
self.tree.lock().await
}
async fn add_file_to_disk<P: AsRef<Path>>(
&self,
path: P,
mut r: &mut Pin<Box<dyn io::AsyncBufRead + Send>>,
) -> Result<u64> {
let mut file = tokio::fs::File::create(path).await?;
Ok(io::copy(&mut r, &mut file).await?)
}
async fn add_file_to_db<P: AsRef<Path>>(&self, path: P, file_size: u64) -> Result<()> {
let path = path.as_ref();
let file = path
.file_name()
.ok_or(anyhow!("Can't get filename from path: {}", path.display()))?;
let base = path
.parent()
.ok_or(anyhow!("Can't get parent fro path: {}", path.display()))?;
let time_tree = self.db.open_tree("time_tree")?;
let path_tree = self.db.open_tree("path_tree")?;
let mut hasher = std::collections::hash_map::DefaultHasher::new();
hasher.write(base.as_os_str().as_bytes());
let hash = hasher.finish();
(&time_tree, &path_tree).transaction(|(tx_time, tx_path)| {
tx_path.insert(&hash.to_le_bytes(), base.as_os_str().as_bytes())?;
tx_time.insert(
&Local::now().timestamp_nanos_opt().unwrap().to_be_bytes(),
new_time_tree_entry(hash, file_size, file),
)?;
Ok::<(), ConflictableTransactionError<sled::Error>>(())
})?;
{
let mut size = self.size.lock().await;
info!(
"Adding file: '{}', size: {:.2} MB, archive: {:.0} MB, limit: {:.0} MB",
path.display(),
file_size as f64 / (1024.0 * 1024.0),
*size as f64 / (1024.0 * 1024.0),
self.limit as f64 / (1024.0 * 1024.0),
);
*size += file_size;
gauge!("archive_size").set(*size as f64);
}
self.check_reduce().await?;
Ok(())
}
async fn new_filepath(&self, file_suffix: &str) -> Result<PathBuf> {
let mut a = PathBuf::from(self.arch_dir.as_path());
let (basename, filename) = self.tree.lock().await.basepath_fileprefix(file_suffix)?;
a.push(basename);
create_dir_all(&a).await?;
a.push(filename);
Ok(a)
}
async fn check_reduce(&self) -> Result<()> {
let mut archive_size = self.size.lock().await;
let time_tree = self.db.open_tree("time_tree")?;
let path_tree = self.db.open_tree("path_tree")?;
while *archive_size > self.limit {
let next = time_tree.pop_min()?;
if let Some((_, v)) = next {
let size = u64::from_le_bytes(v[8..16].try_into()?);
let file = OsStr::from_bytes(&v[16..]);
let path = path_tree.get(&v[0..8])?.expect("Path not found");
let path = OsStr::from_bytes(&path);
let file_path: PathBuf = Path::new(path).join(file);
info!(
"Deleting file: '{:?}', size: {:.2} MB",
file_path,
size as f64 / (1024.0 * 1024.0),
);
let _ = remove_file(&file_path)
.await
.map_err(|s| info!("Error deleting file: {:?} ({})", file, s));
*archive_size -= size;
}
}
Ok(())
}
}
/// Serializes one `"time_tree"` value.
///
/// Layout: bytes `0..8` are `path_hash` (little-endian), bytes `8..16` are
/// `size` (little-endian), and the remainder is the raw bytes of `filename`.
fn new_time_tree_entry(path_hash: u64, size: u64, filename: &OsStr) -> Vec<u8> {
    let name = filename.as_bytes();
    let mut entry = Vec::with_capacity(16 + name.len());
    entry.extend_from_slice(&path_hash.to_le_bytes());
    entry.extend_from_slice(&size.to_le_bytes());
    entry.extend_from_slice(name);
    entry
}
/// Returns the location of the sled database inside the archive directory:
/// `dir` joined with `archive.db`.
fn db_path(dir: &str) -> PathBuf {
    Path::new(dir).join("archive.db")
}
/// Registers unit and description metadata for the archive gauges.
///
/// Both gauges are set from byte counts elsewhere in this file (the limit is
/// `limit_mb * 1024 * 1024` and the size is a sum of copied byte counts), so
/// they are described in bytes rather than kibibytes.
fn init_metrics() {
    describe_gauge!(
        "archive_size",
        metrics::Unit::Bytes,
        "The current size of the archive"
    );
    describe_gauge!(
        "archive_size_limit",
        metrics::Unit::Bytes,
        "The maximum size of the archive"
    );
}