archivist 0.4.5

Store files in a time- or index-based directory hierarchy, automatically deleting the oldest files when the size limit is reached
Documentation
use anyhow::{anyhow, Result};
use chrono::prelude::*;
use metrics::{describe_gauge, gauge};
use sled::transaction::ConflictableTransactionError;
use sled::Transactional;
use std::convert::TryInto;
use std::ffi::OsStr;
use std::hash::Hasher;
use std::os::unix::prelude::OsStrExt;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use tokio::fs::{create_dir_all, remove_file};
use tokio::io;
use tokio::sync::{Mutex, MutexGuard};
use tracing::info;

use crate::{store::Store, tree::FileTree};

/// A size-limited file archive whose bookkeeping lives in a `sled` database.
///
/// New files are placed below `arch_dir` at a location chosen by the
/// [`FileTree`] implementation `T`. Every stored file is recorded in the
/// database so that the oldest entries can be removed again once the tracked
/// size exceeds `limit`.
#[derive(Debug)]
pub struct Archivist<T: FileTree> {
    /// Database handle; the code opens the trees `time_tree` and `path_tree` on it.
    db: sled::Db,
    /// Root directory of the archive.
    arch_dir: PathBuf,
    /// Size limit in bytes (the constructors convert from mebibytes).
    limit: u64,
    /// Running total of the recorded file sizes in bytes.
    size: Mutex<u64>,
    /// Provider of the directory / file name for each newly added file.
    tree: Arc<Mutex<T>>,
}

impl<T: FileTree> Archivist<T> {
    /// Creates a new, empty archive rooted at `dir`.
    ///
    /// The directory is created if necessary and a fresh database is created
    /// inside it (`create_new(true)` is set, so an already existing database
    /// is not reused). The tracked archive size starts at zero.
    ///
    /// * `dir` - root directory of the archive
    /// * `tree` - strategy that chooses the path of each new file
    /// * `limit_mb` - size limit in mebibytes; the byte value saturates at
    ///   `u64::MAX` instead of overflowing
    ///
    /// # Errors
    /// Returns an error if the directory cannot be created or the database
    /// cannot be opened.
    pub async fn new(dir: &str, tree: T, limit_mb: u64) -> Result<Archivist<T>> {
        init_metrics();
        create_dir_all(dir).await?;
        let db = sled::Config::default()
            .path(db_path(dir))
            .create_new(true)
            .open()?;

        // Saturate rather than overflow: a huge `limit_mb` (e.g. `u64::MAX` used as
        // "unlimited") must not panic in debug builds or wrap to a tiny limit in release.
        let limit = limit_mb.saturating_mul(1024 * 1024);
        gauge!("archive_size_limit").set(limit as f64);

        Ok(Archivist {
            arch_dir: dir.into(),
            db,
            limit,
            size: Mutex::new(0),
            tree: Arc::new(Mutex::new(tree)),
        })
    }

    pub async fn open(dir: &str, tree: T, limit_mb: u64) -> Result<Archivist<T>> {
        init_metrics();
        let db_path = db_path(dir);
        let db = sled::Config::default().path(db_path).open()?;
        let time_tree = db.open_tree("time_tree")?;
        let size = time_tree.iter().try_fold(0, |acc, v| {
            if let Ok((_, v)) = v {
                Ok::<_, anyhow::Error>(acc + u64::from_le_bytes(v[8..16].try_into()?))
            } else {
                Ok(acc)
            }
        })?;

        gauge!("archive_size_limit").set((limit_mb * 1024 * 1024) as f64);
        gauge!("archive_size").set(size as f64);
        Ok(Archivist {
            arch_dir: dir.into(),
            db,
            limit: limit_mb * 1024 * 1024,
            size: Mutex::new(size),
            tree: Arc::new(Mutex::new(tree)),
        })
    }

    /// Stores a new file in the archive and returns the path it was written to.
    ///
    /// A fresh path is requested for `file_suffix`, `s` is asked for a reader
    /// for that path, the reader's content is copied to disk, and finally the
    /// file is recorded in the database (which may trigger removal of old files).
    ///
    /// # Errors
    /// Propagates any error from path creation, from `s.store`, from writing
    /// the file, or from the database update.
    pub async fn add(&self, file_suffix: &str, s: &impl Store) -> Result<PathBuf> {
        let target = self.new_filepath(file_suffix).await?;

        let mut source = s.store(&target)?;
        let bytes_written = self.add_file_to_disk(&target, &mut source).await?;

        self.add_file_to_db(&target, bytes_written).await?;
        Ok(target)
    }

    pub async fn tree(&self) -> MutexGuard<'_, T> {
        self.tree.lock().await
    }

    /// Creates the file at `path` and fills it with everything read from `r`.
    ///
    /// Returns the number of bytes copied.
    ///
    /// # Errors
    /// Returns an error if the file cannot be created or the copy fails.
    async fn add_file_to_disk<P: AsRef<Path>>(
        &self,
        path: P,
        mut r: &mut Pin<Box<dyn io::AsyncBufRead + Send>>,
    ) -> Result<u64> {
        let mut destination = tokio::fs::File::create(path).await?;
        let copied = io::copy(&mut r, &mut destination).await?;
        Ok(copied)
    }

    /// Records a stored file in the database and updates the tracked archive size.
    ///
    /// The parent directory of `path` is stored in `path_tree` under a hash of
    /// its bytes. A `time_tree` entry, keyed by the current timestamp in
    /// nanoseconds (big-endian), stores that hash, `file_size` and the file
    /// name. Both inserts happen in one transaction. Afterwards the size
    /// counter and the `archive_size` gauge are increased and `check_reduce`
    /// is run.
    ///
    /// # Errors
    /// Returns an error if `path` has no file name or parent, if the current
    /// time cannot be expressed as a nanosecond timestamp, if the database
    /// transaction fails, or if `check_reduce` fails.
    async fn add_file_to_db<P: AsRef<Path>>(&self, path: P, file_size: u64) -> Result<()> {
        let path = path.as_ref();
        // `ok_or_else` builds the error only on the failure path.
        let file = path
            .file_name()
            .ok_or_else(|| anyhow!("Can't get filename from path: {}", path.display()))?;
        let base = path
            .parent()
            .ok_or_else(|| anyhow!("Can't get parent from path: {}", path.display()))?;

        let time_tree = self.db.open_tree("time_tree")?;
        let path_tree = self.db.open_tree("path_tree")?;
        let base_bytes = base.as_os_str().as_bytes();
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        hasher.write(base_bytes);
        let hash = hasher.finish();

        // Build key and value once, outside the transaction closure: the closure may be
        // retried on conflict, and an unrepresentable timestamp becomes an error
        // instead of a panic.
        let timestamp = Local::now().timestamp_nanos_opt().ok_or_else(|| {
            anyhow!("Current time can't be represented as a nanosecond timestamp")
        })?;
        let time_key = timestamp.to_be_bytes();
        let path_key = hash.to_le_bytes();
        let entry = new_time_tree_entry(hash, file_size, file);

        (&time_tree, &path_tree).transaction(|(tx_time, tx_path)| {
            tx_path.insert(&path_key, base_bytes)?;
            tx_time.insert(&time_key, entry.as_slice())?;
            Ok::<(), ConflictableTransactionError<sled::Error>>(())
        })?;

        // extra scope for size-mutex lock to avoid deadlock in check_reduce below
        {
            let mut size = self.size.lock().await;
            // The logged archive size is the value before this file is added.
            info!(
                "Adding file: '{}', size: {:.2} MB, archive: {:.0} MB, limit: {:.0} MB",
                path.display(),
                file_size as f64 / (1024.0 * 1024.0),
                *size as f64 / (1024.0 * 1024.0),
                self.limit as f64 / (1024.0 * 1024.0),
            );

            *size += file_size;
            gauge!("archive_size").set(*size as f64);
        }

        self.check_reduce().await?;
        Ok(())
    }

    /// Builds the full path for a new file and makes sure its directory exists.
    ///
    /// The file tree supplies a directory part and a file name for
    /// `file_suffix`; the directory is created below the archive root and the
    /// file name is appended to it.
    ///
    /// # Errors
    /// Returns an error if the tree cannot provide a path or the directory
    /// cannot be created.
    async fn new_filepath(&self, file_suffix: &str) -> Result<PathBuf> {
        let (basename, filename) = self.tree.lock().await.basepath_fileprefix(file_suffix)?;

        let directory = self.arch_dir.join(basename);
        create_dir_all(&directory).await?;

        Ok(directory.join(filename))
    }

    async fn check_reduce(&self) -> Result<()> {
        let mut archive_size = self.size.lock().await;
        let time_tree = self.db.open_tree("time_tree")?;
        let path_tree = self.db.open_tree("path_tree")?;
        while *archive_size > self.limit {
            let next = time_tree.pop_min()?;
            if let Some((_, v)) = next {
                let size = u64::from_le_bytes(v[8..16].try_into()?);
                let file = OsStr::from_bytes(&v[16..]);
                let path = path_tree.get(&v[0..8])?.expect("Path not found");
                let path = OsStr::from_bytes(&path);
                let file_path: PathBuf = Path::new(path).join(file);

                info!(
                    "Deleting file: '{:?}', size: {:.2} MB",
                    file_path,
                    size as f64 / (1024.0 * 1024.0),
                );
                // file may not exist, which is ok
                let _ = remove_file(&file_path)
                    .await
                    .map_err(|s| info!("Error deleting file: {:?} ({})", file, s));
                *archive_size -= size;
            }
        }
        Ok(())
    }
}

/// Serializes one `time_tree` value.
///
/// Layout: 8 bytes `path_hash` (little-endian), 8 bytes `size`
/// (little-endian), followed by the raw bytes of `filename`.
fn new_time_tree_entry(path_hash: u64, size: u64, filename: &OsStr) -> Vec<u8> {
    let name = filename.as_bytes();
    let mut entry = Vec::with_capacity(16 + name.len());
    entry.extend_from_slice(&path_hash.to_le_bytes());
    entry.extend_from_slice(&size.to_le_bytes());
    entry.extend_from_slice(name);
    entry
}

/// Returns the location of the database inside the archive directory `dir`,
/// i.e. `dir` joined with `archive.db`.
fn db_path(dir: &str) -> PathBuf {
    Path::new(dir).join("archive.db")
}

/// Registers unit and description metadata for the archive gauges.
///
/// Both gauges are set to byte counts elsewhere in this file (the limit is
/// converted from mebibytes to bytes, the size is a sum of file sizes in
/// bytes), so they are described with `Unit::Bytes`.
fn init_metrics() {
    describe_gauge!(
        "archive_size",
        metrics::Unit::Bytes,
        "The current size of the archive"
    );
    describe_gauge!(
        "archive_size_limit",
        metrics::Unit::Bytes,
        "The maximum size of the archive"
    );
}