use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use futures::{StreamExt, TryStream, TryStreamExt};
use once_cell::sync::Lazy;
use regex::Regex;
use tap::TapFallible;
use time::format_description::FormatItem;
use tokio::fs;
use tokio_stream::wrappers::ReadDirStream;
use uuid::Uuid;
use crate::checkers::history::get_history_root_dir;
use crate::checkers::history::operation::OperationImpl;
use crate::checkers::Checker;
use crate::hoard::Direction;
use super::{Error, Operation};
/// Parsed `time` format description for operation timestamps.
///
/// The components are year, month, day, 24-hour hour, minute, second and a
/// six-digit subsecond, laid out as `year_month_day-hour_minute_second.subsecond`.
/// This is the same shape that [`LOG_FILE_REGEX`] accepts in front of the
/// `.log` extension.
///
/// The description is parsed lazily, on first access.
///
/// # Panics
///
/// Panics on first access if the hard-coded description cannot be parsed.
pub static TIME_FORMAT: Lazy<Vec<FormatItem<'static>>> = Lazy::new(|| {
    time::format_description::parse(
        "[year]_[month]_[day]-[hour repr:24]_[minute]_[second].[subsecond digits:6]",
    )
    // The description is a compile-time constant, so failing here is a
    // programming error rather than a runtime condition.
    .expect("invalid time format description")
});
/// Regular expression matching the file name of an operation log.
///
/// A matching name consists of four digits, two `_NN` groups, a `-`, two
/// `NN_` groups, two more digits, a `.`, six digits, and the `.log`
/// extension. The expression is anchored at both ends, so the whole name
/// must match.
///
/// # Panics
///
/// Panics on first access if the hard-coded pattern fails to compile.
pub(crate) static LOG_FILE_REGEX: Lazy<Regex> = Lazy::new(|| {
    const PATTERN: &str = "^[0-9]{4}(_[0-9]{2}){2}-([0-9]{2}_){2}([0-9]{2})\\.[0-9]{6}\\.log$";
    let compiled = Regex::new(PATTERN);
    compiled.expect("invalid log file regex")
});
/// Reports whether `path` points at an operation log file.
///
/// The path qualifies when it is an existing file (as reported by
/// [`Path::is_file`]) and its final component is valid UTF-8 that matches
/// [`LOG_FILE_REGEX`]. The verdict is also emitted as a `trace` event.
#[must_use]
#[tracing::instrument]
pub fn file_is_log(path: &Path) -> bool {
    // Extracting the name is purely lexical, so doing it before the
    // filesystem check does not change what is observed.
    let file_name = path.file_name().and_then(std::ffi::OsStr::to_str);
    let result =
        path.is_file() && matches!(file_name, Some(name) if LOG_FILE_REGEX.is_match(name));

    if !result {
        tracing::trace!("file is NOT operation log");
        return false;
    }

    tracing::trace!("file is operation log");
    true
}
/// Keeps `entry` only if it is a directory whose name parses as a UUID.
///
/// Returns `Ok(Some(entry))` for such directories and `Ok(None)` for
/// everything else (non-directories, names that are not valid UTF-8, names
/// that are not UUIDs). The body never produces an `Err`.
///
/// The function contains no `.await`; it stays `async` so that it can be
/// handed directly to stream combinators such as `try_filter_map`.
#[allow(clippy::unused_async)]
#[tracing::instrument(level = "trace")]
async fn only_valid_uuid_path(entry: fs::DirEntry) -> Result<Option<fs::DirEntry>, Error> {
    tracing::trace!(
        "checking if {} is a system directory",
        entry.path().display()
    );

    if !entry.path().is_dir() {
        return Ok(None);
    }

    let file_name = entry.file_name();
    let is_uuid = match file_name.to_str() {
        Some(name) => {
            tracing::trace!("checking if {} is a valid UUID", name);
            Uuid::parse_str(name).is_ok()
        }
        // A name that is not valid UTF-8 cannot be a UUID string.
        None => false,
    };

    Ok(is_uuid.then_some(entry))
}
#[tracing::instrument(level = "trace")]
async fn log_files_to_delete_from_dir(
path: PathBuf,
) -> Result<impl TryStream<Ok = PathBuf, Error = Error>, Error> {
tracing::trace!("checking files in directory: {}", path.display());
let mut files: Vec<PathBuf> = fs::read_dir(&path)
.await
.map(ReadDirStream::new)?
.map_err(Error::IO)
.try_filter_map(|subentry| async move {
tracing::trace!("checking if {} is a log file", subentry.path().display());
Ok(file_is_log(&subentry.path()).then(|| subentry.path()))
})
.try_collect()
.await
.tap_err(|error| {
tracing::error!(%error, "failed to read contents of {}", path.display());
})?;
files.sort_unstable();
let recent = files.pop();
if let Some(recent) = recent {
let recent = Operation::from_file(&recent).await?;
if recent.direction() == Direction::Restore {
tracing::debug!(
"most recent log is not a backup, making sure to retain a backup log too"
);
let index = Box::pin(
tokio_stream::iter(files.iter().enumerate().rev().map(Ok)).try_filter_map(
|(i, path)| async move {
Operation::from_file(path)
.await
.map(|op| (op.direction() == Direction::Backup).then_some(i))
},
),
)
.try_next()
.await
.tap_err(|error| {
tracing::error!(%error, "error while finding most recent backup");
})?;
if let Some(index) = index {
files.remove(index);
}
}
}
Ok(tokio_stream::iter(files).map(Ok))
}
/// Streams every deletable log file found beneath the directory `entry`.
///
/// Each subdirectory of `entry` is passed to
/// [`log_files_to_delete_from_dir`], and the resulting per-directory streams
/// are flattened into a single stream of paths. Entries that are not
/// directories are skipped.
///
/// # Errors
///
/// Returns an error immediately if `entry` cannot be listed. Errors that
/// occur while reading entries or inspecting subdirectories are yielded as
/// items of the returned stream.
#[tracing::instrument]
async fn log_files_to_delete(
    entry: fs::DirEntry,
) -> Result<impl TryStream<Ok = PathBuf, Error = Error>, Error> {
    let listing = fs::read_dir(entry.path()).await?;

    let hoard_dirs = ReadDirStream::new(listing)
        .map_err(Error::IO)
        .try_filter_map(|hoard_entry| async move {
            let hoard_dir = hoard_entry.path();
            tracing::trace!("found hoard directory: {}", hoard_dir.display());
            Ok(hoard_dir.is_dir().then_some(hoard_dir))
        });

    Ok(hoard_dirs
        .and_then(log_files_to_delete_from_dir)
        .try_flatten())
}
/// Deletes operation log files that no longer need to be retained.
///
/// Every UUID-named directory under the history root (see
/// [`only_valid_uuid_path`]) is scanned with [`log_files_to_delete`], and each
/// path it yields is removed from disk.
///
/// Returns the number of files that were deleted.
///
/// # Errors
///
/// On the first failure (listing a directory, reading an entry, inspecting a
/// log, or deleting a file) processing stops and `Err((count, error))` is
/// returned, where `count` is the number of files deleted before the failure.
/// No further files are deleted after an error, so `count` reflects exactly
/// what was removed by this call.
#[tracing::instrument(level = "trace")]
pub(crate) async fn cleanup_operations() -> Result<u32, (u32, Error)> {
    let root = get_history_root_dir();
    let deletions = fs::read_dir(&root)
        .await
        .map(ReadDirStream::new)
        .map_err(|error| {
            tracing::error!(%error, "failed to list items in {}", root.display());
            (0, Error::IO(error))
        })?
        .map_err(|error| {
            tracing::error!(%error, "failed to read item in {}", root.display());
            Error::IO(error)
        })
        .try_filter_map(only_valid_uuid_path)
        .and_then(log_files_to_delete)
        .try_flatten()
        .and_then(|path| async move {
            tracing::trace!("deleting {}", path.display());
            fs::remove_file(&path).await.map_err(|error| {
                tracing::error!(%error, "failed to delete {}", path.display());
                Error::IO(error)
            })
        });

    // Drive the stream by hand instead of with `fold`: `fold` keeps polling
    // after an `Err` item, which would go on deleting files that are never
    // counted. Returning on the first error keeps the reported count accurate.
    let mut deletions = Box::pin(deletions);
    let mut count: u32 = 0;
    while let Some(outcome) = deletions.next().await {
        outcome.map_err(|error| (count, error))?;
        count += 1;
    }
    Ok(count)
}
#[tracing::instrument(level = "trace")]
async fn all_operations() -> Result<impl TryStream<Ok = Operation, Error = Error>, Error> {
let history_dir = get_history_root_dir();
tracing::trace!(?history_dir);
let iter = fs::read_dir(&history_dir)
.await
.map(ReadDirStream::new)
.tap_err(|error| {
tracing::error!(
%error,
"failed to list items in history dir {}",
history_dir.display(),
);
})?
.try_filter_map(|uuid_entry| async move {
let is_uuid = uuid_entry
.file_name()
.to_str()
.map(Uuid::parse_str)
.transpose()
.ok()
.flatten()
.is_some();
let uuid_path = uuid_entry.path();
(is_uuid && uuid_path.is_dir())
.then_some(uuid_path)
.map(Ok)
.transpose()
})
.and_then(|entry| async move {
fs::read_dir(&entry)
.await
.map(ReadDirStream::new)
.tap_err(|error| {
tracing::error!(
%error,
"failed to list items in system history dir {}",
entry.display(),
);
})
})
.try_flatten()
.try_filter_map(|hoard_entry| async move {
hoard_entry
.path()
.is_dir()
.then(|| hoard_entry.path())
.map(Ok)
.transpose()
}) .and_then(|entry| async move {
fs::read_dir(&entry)
.await
.map(ReadDirStream::new)
.tap_err(|error| {
tracing::error!(
%error,
"failed to list items in hoard history dir {}",
entry.display(),
);
})
})
.try_flatten() .map_ok(|hoard_entry| hoard_entry.path()) .try_filter_map(|path| async move { Ok(file_is_log(&path).then_some(path)) }) .map_err(Error::IO)
.and_then(|path| async move { Operation::from_file(&path).await });
Ok(iter)
}
/// Loads every operation from [`all_operations`] and returns them ordered by
/// ascending timestamp.
///
/// # Errors
///
/// Returns the first error produced while enumerating or loading operations.
#[tracing::instrument(level = "trace")]
async fn sorted_operations() -> Result<Vec<Operation>, Error> {
    let operations = all_operations().await?;
    let mut list = operations.try_collect::<Vec<Operation>>().await?;
    list.sort_unstable_by_key(Operation::timestamp);
    Ok(list)
}
/// Rewrites every stored operation log in the latest on-disk format.
///
/// Operations are processed in the order returned by [`sorted_operations`].
/// For each hoard name, one checksum map and one file set are created on
/// first use and passed to `convert_to_latest_version` for every operation of
/// that hoard, so the conversion can carry state from one operation to the
/// next. Each converted operation is then committed to disk.
///
/// # Errors
///
/// Returns an error if the operations cannot be loaded or if a converted
/// operation cannot be committed to disk. Operations committed before the
/// failure stay upgraded.
#[tracing::instrument(level = "trace")]
pub(crate) async fn upgrade_operations() -> Result<(), Error> {
    tracing::debug!("upgrading operation files to latest version");
    let mut top_file_checksum_map = HashMap::new();
    let mut top_file_set = HashMap::new();
    let all_ops: Vec<_> = sorted_operations().await?;
    tracing::trace!("found operations: {:?}", all_ops);
    for operation in all_ops {
        // The entry API creates the per-hoard state on first sight and hands
        // back a mutable reference in a single lookup, with no panic path.
        let file_checksum_map = top_file_checksum_map
            .entry(operation.hoard_name().clone())
            .or_insert_with(HashMap::new);
        let file_set = top_file_set
            .entry(operation.hoard_name().clone())
            .or_insert_with(HashSet::new);
        tracing::trace!(?operation, "converting operation");
        let operation = operation.convert_to_latest_version(file_checksum_map, file_set);
        tracing::trace!(?operation, "converted operation");
        operation.commit_to_disk().await?;
    }
    Ok(())
}