use crate::constants::{CACHE_DIR, HISTORY_DIR};
use crate::core::cache::cacher_status::{CacherStatus, CacherStatusType};
use crate::core::db::{self, str_json_db};
use crate::error::OxenError;
use crate::model::{Commit, LocalRepository};
use crate::util;
use super::cachers::{content_stats, content_validator, repo_size};
use lazy_static::lazy_static;
use rocksdb::{DBWithThreadMode, MultiThreaded};
use std::path::PathBuf;
type CommitCacher = fn(&LocalRepository, &Commit) -> Result<(), OxenError>;
lazy_static! {
static ref CACHERS: Vec<(String, CommitCacher)> = vec![
(String::from("COMMIT_CONTENT_IS_VALID"), content_validator::compute as CommitCacher),
(String::from("REPO_SIZE"), repo_size::compute as CommitCacher),
(String::from("COMMIT_STATS"), content_stats::compute as CommitCacher),
];
}
fn cached_status_db_path(repo: &LocalRepository, commit: &Commit) -> PathBuf {
util::fs::oxen_hidden_dir(&repo.path)
.join(HISTORY_DIR)
.join(&commit.id)
.join(CACHE_DIR)
.join("status.db")
}
fn cached_status_lock_path(repo: &LocalRepository, commit: &Commit) -> PathBuf {
util::fs::oxen_hidden_dir(&repo.path)
.join(HISTORY_DIR)
.join(&commit.id)
.join(CACHE_DIR)
.join("LOCK")
}
pub fn get_status(
repo: &LocalRepository,
commit: &Commit,
) -> Result<Option<CacherStatusType>, OxenError> {
let vals = get_all_statuses(repo, commit)?;
if vals.iter().any(|v| CacherStatusType::Pending == v.status) {
return Ok(Some(CacherStatusType::Pending));
}
if vals.iter().any(|v| CacherStatusType::Failed == v.status) {
return Ok(Some(CacherStatusType::Failed));
}
Ok(vals
.into_iter()
.find(|v| CacherStatusType::Success == v.status)
.map(|v| v.status))
}
pub fn get_failures(
repo: &LocalRepository,
commit: &Commit,
) -> Result<Vec<CacherStatus>, OxenError> {
let db_path = cached_status_db_path(repo, commit);
let opts = db::opts::default();
let db: DBWithThreadMode<MultiThreaded> =
DBWithThreadMode::open(&opts, dunce::simplified(&db_path))?;
let vals = str_json_db::list_vals::<MultiThreaded, CacherStatus>(&db)?
.into_iter()
.filter(|v| v.status == CacherStatusType::Failed)
.collect();
Ok(vals)
}
pub fn get_all_statuses(
repo: &LocalRepository,
commit: &Commit,
) -> Result<Vec<CacherStatus>, OxenError> {
let db_path = cached_status_db_path(repo, commit);
let lock_path = cached_status_lock_path(repo, commit);
if !db_path.exists() {
return Ok(vec![]);
}
if lock_path.exists() {
return Ok(vec![CacherStatus::pending()]);
}
log::warn!("get_all_statuses Opening db connection {:?}", db_path);
let opts = db::opts::default();
let error_if_log_file_exist = false;
let db = DBWithThreadMode::open_for_read_only(
&opts,
dunce::simplified(&db_path),
error_if_log_file_exist,
);
match db {
Ok(db) => {
let vals = str_json_db::list_vals::<MultiThreaded, CacherStatus>(&db)?;
Ok(vals)
}
Err(_) => {
log::debug!("Could not open db....still processing");
Ok(vec![CacherStatus::pending()])
}
}
}
fn get_db_connection(
repo: &LocalRepository,
commit: &Commit,
) -> Result<DBWithThreadMode<MultiThreaded>, OxenError> {
let db_path = cached_status_db_path(repo, commit);
let opts = db::opts::default();
let sleep_time = 100;
let mut num_attempts = 5;
while num_attempts >= 1 {
match DBWithThreadMode::open(&opts, dunce::simplified(&db_path)) {
Ok(db) => return Ok(db),
Err(err) => {
let time = sleep_time * num_attempts;
log::warn!(
"Could not open db connection sleeping {time}s attempt {num_attempts} {err:?}"
);
std::thread::sleep(std::time::Duration::from_millis(time));
num_attempts -= 1;
}
}
}
Err(OxenError::basic_str("Could not open db"))
}
pub fn force_remove_lock(repo: &LocalRepository, commit: &Commit) -> Result<(), OxenError> {
let lock_path = cached_status_lock_path(repo, commit);
if lock_path.exists() {
log::warn!("force_remove_lock Deleting lock file {:?}", lock_path);
util::fs::remove_file(lock_path)?;
}
Ok(())
}
pub fn run_all(repo: &LocalRepository, commit: &Commit, force: bool) -> Result<(), OxenError> {
let lock_path = cached_status_lock_path(repo, commit);
log::warn!("run_all called on commit {} force? {}", commit, force);
if lock_path.exists() {
log::warn!("run_all LOCK file exists...skipping {:?}", lock_path);
return Err(OxenError::basic_str("Already processing"));
}
if let Some(parent) = lock_path.parent() {
if !parent.exists() {
util::fs::create_dir_all(parent)?;
}
}
log::debug!("run_all Creating lock file {:?}", lock_path);
util::fs::write_to_path(&lock_path, "LOCK")?;
let db: DBWithThreadMode<MultiThreaded> = get_db_connection(repo, commit)?;
for (name, cacher) in CACHERS.iter() {
log::debug!("run_all running {:?}", name);
if let Some(val) = str_json_db::get::<MultiThreaded, &str, CacherStatus>(&db, name)? {
if CacherStatusType::Success == val.status && !force {
log::debug!("run_all skipping {:?}", name);
continue;
}
}
let pending_status = CacherStatus::pending();
str_json_db::put(&db, name, &pending_status)?;
match cacher(repo, commit) {
Ok(_) => {
let status_success = CacherStatus::success();
str_json_db::put(&db, name, &status_success)?;
log::debug!("run_all done running {:?}", name);
}
Err(err) => {
let err = format!("{err}");
log::error!("{}", err);
let status_failed = CacherStatus::failed(&err);
str_json_db::put(&db, name, &status_failed)?;
}
}
}
log::debug!("run_all Deleting lock file {:?}", lock_path);
util::fs::remove_file(lock_path)?;
Ok(())
}