use crate::config::RepositoryConfig;
use crate::constants::SHALLOW_FLAG;
use crate::constants::{self, DEFAULT_VNODE_SIZE, MIN_OXEN_VERSION};
use crate::core::versions::MinOxenVersion;
use crate::error::OxenError;
use crate::model::merkle_tree::node::FileNode;
use crate::model::{MetadataEntry, Remote, RemoteRepository};
use crate::opts::StorageOpts;
use crate::storage::{StorageConfig, VersionStore, create_version_store};
use crate::util;
use crate::view::RepositoryView;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock, Mutex};
use std::time::{Duration, SystemTime};
use utoipa::ToSchema;
/// Process-wide cache of mtime comparison tolerances, keyed by repository path.
///
/// `LocalRepository::mtime_tolerance` reads from and inserts into this map so the
/// filesystem probe does not have to be repeated once a value is cached for a path;
/// `set_mtime_tolerance_for_test` overwrites entries in test builds.
static MTIME_TOLERANCE_CACHE: LazyLock<Mutex<HashMap<PathBuf, Duration>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
// Handle to a repository on the local filesystem: its root path plus the settings that
// `from_dir` loads from, and `save` writes back to, the repository config file.
//
// NOTE: plain `//` comments are used on purpose; `///` doc comments would be picked up by
// the `ToSchema` derive as schema descriptions.
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
pub struct LocalRepository {
    // Root directory of the repository.
    #[schema(value_type = String)]
    pub path: PathBuf,
    // Name of the remote that `remote()` looks up.
    remote_name: Option<String>,
    // Minimum version string; parsed on demand by `min_version()`.
    min_version: Option<String>,
    // All configured remotes.
    remotes: Vec<Remote>,
    // Optional vnode size override; `vnode_size()` falls back to `DEFAULT_VNODE_SIZE`.
    vnode_size: Option<u64>,
    // Optional subtree paths; `subtree_paths()` normalizes "." to the empty path.
    #[schema(value_type = Option<Vec<String>>)]
    subtree_paths: Option<Vec<PathBuf>>,
    // Optional depth setting, stored and returned verbatim.
    pub depth: Option<i32>,
    // Optional VFS flag; `is_vfs()` treats `None` as `false`.
    pub vfs: Option<bool>,
    // Optional remote-mode flag; `is_remote_mode()` treats `None` as `false`.
    pub remote_mode: Option<bool>,
    // Currently selected workspace, if any.
    pub workspace_name: Option<String>,
    // Known workspace names, if any.
    workspaces: Option<Vec<String>>,
    // Version store handle; never (de)serialized, populated by the constructors and the
    // `init_*` / `set_version_store` methods.
    #[serde(skip)]
    #[schema(ignore)]
    version_store: Option<Arc<dyn VersionStore>>,
}
/// A [`LocalRepository`] bundled with an optional list of metadata entries.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LocalRepositoryWithEntries {
// The repository itself.
pub local_repo: LocalRepository,
// Entries associated with the repository; `None` when no entries are attached.
pub entries: Option<Vec<MetadataEntry>>,
}
impl LocalRepository {
pub fn from_dir(path: impl AsRef<Path>) -> Result<Self, OxenError> {
let path = path.as_ref().to_path_buf();
let config_path = util::fs::config_filepath(&path);
let config = RepositoryConfig::from_file(&config_path)?;
let mut repo = LocalRepository {
path,
remote_name: config.remote_name,
min_version: config.min_version,
remotes: config.remotes,
vnode_size: config.vnode_size,
subtree_paths: config.subtree_paths.clone(),
depth: config.depth,
version_store: None,
vfs: config.vfs,
remote_mode: config.remote_mode,
workspace_name: config.workspace_name,
workspaces: config.workspaces,
};
let storage_opts = if let Some(storage_config) = config.storage {
StorageOpts::from_repo_config(&repo, &storage_config)?
} else {
StorageOpts::from_path(&repo.path, true)
};
let store = create_version_store(&repo.path, &storage_opts)?;
repo.version_store = Some(store);
Ok(repo)
}
/// Returns a shared handle to the repository's version store.
///
/// # Errors
/// Returns an error if no version store has been initialized on this repository.
pub fn version_store(&self) -> Result<Arc<dyn VersionStore>, OxenError> {
    self.version_store
        .clone()
        .ok_or_else(|| OxenError::basic_str("Version store not initialized"))
}
/// Creates a version store for this repository from `storage_opts` and installs it,
/// replacing any previously set store.
///
/// # Errors
/// Returns an error if the version store cannot be created.
pub fn init_version_store(&mut self, storage_opts: &StorageOpts) -> Result<(), OxenError> {
    self.version_store = Some(create_version_store(&self.path, storage_opts)?);
    Ok(())
}
/// Creates and installs a version store using storage options derived from the
/// repository path.
///
/// # Errors
/// Returns an error if the version store cannot be created.
pub fn init_default_version_store(&mut self) -> Result<(), OxenError> {
    let default_opts = StorageOpts::from_path(&self.path, true);
    self.version_store = Some(create_version_store(&self.path, &default_opts)?);
    Ok(())
}
/// Creates a version store from `storage_opts`, runs its async `init`, and installs it.
///
/// Unlike `init_version_store`, the store is only installed after `init` succeeds.
///
/// # Errors
/// Returns an error if the store cannot be created or its initialization fails.
pub async fn set_version_store(&mut self, storage_opts: &StorageOpts) -> Result<(), OxenError> {
    let store = create_version_store(&self.path, storage_opts)?;
    store.init().await?;
    self.version_store = Some(store);
    Ok(())
}
/// Loads the repository that contains the process's current working directory.
///
/// # Errors
/// Returns an error if the current directory cannot be determined, if no repository
/// root is found from it, or if loading the repository from that root fails.
pub fn from_current_dir() -> Result<LocalRepository, OxenError> {
    let current_dir = std::env::current_dir().map_err(OxenError::from)?;
    // The current directory is only used to build the "not found" error.
    let repo_dir = util::fs::get_repo_root_from_current_dir()
        .ok_or_else(|| OxenError::local_repo_not_found(&current_dir))?;
    LocalRepository::from_dir(&repo_dir)
}
pub fn new(
path: impl AsRef<Path>,
storage_opts: Option<StorageOpts>,
) -> Result<LocalRepository, OxenError> {
let mut repo = LocalRepository {
path: path.as_ref().to_path_buf(),
remotes: vec![],
remote_name: None,
min_version: Some(MIN_OXEN_VERSION.to_string()),
vnode_size: None,
subtree_paths: None,
depth: None,
version_store: None,
vfs: None,
remote_mode: None,
workspace_name: None,
workspaces: None,
};
if let Some(storage_opts) = storage_opts {
repo.init_version_store(&storage_opts)?;
} else {
repo.init_default_version_store()?;
}
Ok(repo)
}
/// Creates a new in-memory repository at `path` with the given `min_version` string and
/// no remotes or workspaces.
///
/// The version store is built from `storage_opts` when given, otherwise from the
/// path-based defaults.
///
/// # Errors
/// Returns an error if the version store cannot be created.
pub fn new_from_version(
    path: impl AsRef<Path>,
    min_version: impl AsRef<str>,
    storage_opts: Option<StorageOpts>,
) -> Result<LocalRepository, OxenError> {
    let path = path.as_ref().to_path_buf();
    let min_version = Some(min_version.as_ref().to_string());
    let mut repo = LocalRepository {
        path,
        min_version,
        remote_name: None,
        remotes: vec![],
        vnode_size: None,
        subtree_paths: None,
        depth: None,
        vfs: None,
        remote_mode: None,
        workspace_name: None,
        workspaces: None,
        version_store: None,
    };
    match storage_opts {
        Some(opts) => repo.init_version_store(&opts)?,
        None => repo.init_default_version_store()?,
    }
    Ok(repo)
}
/// Builds a repository located at `<current dir>/<view.name>` with every optional
/// setting unset and a default version store.
///
/// # Errors
/// Returns an error if the current directory cannot be determined or the version
/// store cannot be created.
pub fn from_view(view: RepositoryView) -> Result<LocalRepository, OxenError> {
    let path = std::env::current_dir()?.join(view.name);
    let mut repo = LocalRepository {
        path,
        remote_name: None,
        min_version: None,
        remotes: vec![],
        vnode_size: None,
        subtree_paths: None,
        depth: None,
        vfs: None,
        remote_mode: None,
        workspace_name: None,
        workspaces: None,
        version_store: None,
    };
    repo.init_default_version_store()?;
    Ok(repo)
}
/// Builds a repository at `path` whose only remote is `repo.remote`, registered as the
/// selected remote under `DEFAULT_REMOTE_NAME`, with a default version store.
///
/// # Errors
/// Returns an error if the version store cannot be created.
pub fn from_remote(repo: RemoteRepository, path: &Path) -> Result<LocalRepository, OxenError> {
    let default_remote_name = String::from(constants::DEFAULT_REMOTE_NAME);
    let mut local_repo = LocalRepository {
        path: path.to_path_buf(),
        remote_name: Some(default_remote_name),
        remotes: vec![repo.remote],
        min_version: None,
        vnode_size: None,
        subtree_paths: None,
        depth: None,
        vfs: None,
        remote_mode: None,
        workspace_name: None,
        workspaces: None,
        version_store: None,
    };
    local_repo.init_default_version_store()?;
    Ok(local_repo)
}
/// Parses the stored minimum version via `MinOxenVersion::or_earliest`.
///
/// # Panics
/// Panics if the stored version string is rejected by `MinOxenVersion::or_earliest`.
pub fn min_version(&self) -> MinOxenVersion {
    MinOxenVersion::or_earliest(self.min_version.clone())
        .unwrap_or_else(|err| panic!("Invalid repo version\n{err}"))
}
/// Selects `name` as the remote returned by `remote()`; the remote list is not touched.
pub fn set_remote_name(&mut self, name: impl AsRef<str>) {
    let selected = String::from(name.as_ref());
    self.remote_name = Some(selected);
}
/// Stores `version` (in its string form) as the repository's minimum version.
pub fn set_min_version(&mut self, version: MinOxenVersion) {
    let version_string = version.to_string();
    self.min_version = Some(version_string);
}
/// Returns all remotes configured on this repository.
pub fn remotes(&self) -> &Vec<Remote> {
&self.remotes
}
/// Returns the final component of the repository path as a `String`.
///
/// # Panics
/// Panics if the path has no final component or that component is not valid UTF-8.
pub fn dirname(&self) -> String {
    let dir_name = self.path.file_name().unwrap();
    dir_name.to_str().unwrap().to_owned()
}
/// Returns the configured vnode size, or `DEFAULT_VNODE_SIZE` when none is set.
pub fn vnode_size(&self) -> u64 {
    match self.vnode_size {
        Some(size) => size,
        None => DEFAULT_VNODE_SIZE,
    }
}
/// Overrides the vnode size returned by `vnode_size()`.
pub fn set_vnode_size(&mut self, size: u64) {
self.vnode_size = Some(size);
}
pub fn subtree_paths(&self) -> Option<Vec<PathBuf>> {
self.subtree_paths.as_ref().map(|paths| {
paths
.iter()
.map(|p| {
if p == &PathBuf::from(".") {
PathBuf::from("")
} else {
p.clone()
}
})
.collect()
})
}
/// Replaces the configured subtree paths; `None` clears them.
pub fn set_subtree_paths(&mut self, paths: Option<Vec<PathBuf>>) {
self.subtree_paths = paths;
}
/// Returns the configured depth, if any.
pub fn depth(&self) -> Option<i32> {
self.depth
}
/// Sets the configured depth; `None` clears it.
pub fn set_depth(&mut self, depth: Option<i32>) {
self.depth = depth;
}
/// Sets the remote-mode flag; `None` is read back as `false` by `is_remote_mode()`.
pub fn set_remote_mode(&mut self, is_remote: Option<bool>) {
self.remote_mode = is_remote;
}
/// Returns `true` only when the remote-mode flag is explicitly set to `true`.
pub fn is_remote_mode(&self) -> bool {
    matches!(self.remote_mode, Some(true))
}
/// Returns `true` only when the VFS flag is explicitly set to `true`.
pub fn is_vfs(&self) -> bool {
    matches!(self.vfs, Some(true))
}
/// Sets the VFS flag; `None` is read back as `false` by `is_vfs()`.
pub fn set_vfs(&mut self, is_vfs: Option<bool>) {
self.vfs = is_vfs;
}
/// Writes the repository settings to the repository's config file.
///
/// When a version store is set, its storage type is recorded. For the `"local"` type the
/// versions path is recorded too: relative to the repository root when it lies under the
/// hidden oxen directory, otherwise exactly as reported by the store.
///
/// # Errors
/// Returns an error if a local store reports no `path` setting, if the versions path
/// cannot be made relative to the repository root, or if writing the config fails.
pub fn save(&self) -> Result<(), OxenError> {
    let config_path = util::fs::config_filepath(&self.path);
    let storage = self
        .version_store
        .as_ref()
        .map(|store| -> Result<StorageConfig, OxenError> {
            let settings = store.storage_settings();
            match store.storage_type() {
                "local" => {
                    let path = settings.get("path").ok_or_else(|| {
                        OxenError::basic_str("Storage settings missing 'path' key")
                    })?;
                    let versions_path = if util::fs::is_relative_to_dir(
                        path,
                        util::fs::oxen_hidden_dir(&self.path),
                    ) {
                        // Propagate the failure instead of panicking; the closure
                        // already returns a Result.
                        util::fs::path_relative_to_dir(path, &self.path)?
                    } else {
                        PathBuf::from(path)
                    };
                    Ok(StorageConfig {
                        kind: store.storage_type().to_string(),
                        versions_path: Some(versions_path),
                    })
                }
                // Non-local stores have no versions path to record.
                _ => Ok(StorageConfig {
                    kind: store.storage_type().to_string(),
                    versions_path: None,
                }),
            }
        })
        .transpose()?;
    let config = RepositoryConfig {
        remote_name: self.remote_name.clone(),
        remotes: self.remotes.clone(),
        subtree_paths: self.subtree_paths.clone(),
        depth: self.depth,
        min_version: self.min_version.clone(),
        vnode_size: self.vnode_size,
        storage,
        vfs: self.vfs,
        remote_mode: self.remote_mode,
        workspace_name: self.workspace_name.clone(),
        workspaces: self.workspaces.clone(),
    };
    config.save(&config_path)
}
/// Registers `url` under `name` and selects that remote as the current one.
///
/// Every existing remote with the same name is overwritten; if there is none, the new
/// remote is appended. Returns the remote that was stored.
pub fn set_remote(&mut self, name: impl AsRef<str>, url: impl AsRef<str>) -> Remote {
    let name = name.as_ref();
    self.remote_name = Some(name.to_owned());
    let remote = Remote {
        name: name.to_owned(),
        url: url.as_ref().to_owned(),
    };

    let mut replaced_existing = false;
    for existing in self.remotes.iter_mut().filter(|r| r.name == name) {
        *existing = remote.clone();
        replaced_existing = true;
    }
    if !replaced_existing {
        self.remotes.push(remote.clone());
    }
    remote
}
pub fn delete_remote(&mut self, name: impl AsRef<str>) {
let name = name.as_ref();
let mut new_remotes: Vec<Remote> = vec![];
for i in 0..self.remotes.len() {
if self.remotes[i].name != name {
new_remotes.push(self.remotes[i].clone());
}
}
self.remotes = new_remotes;
}
/// Returns `true` if a remote named `name` is configured.
pub fn has_remote(&self, name: impl AsRef<str>) -> bool {
    let name = name.as_ref();
    self.remotes.iter().any(|remote| remote.name == name)
}
/// Returns a copy of the first remote named `name`, or `None` if there is none.
///
/// Emits trace-level log lines for the lookup and for each comparison.
pub fn get_remote(&self, name: impl AsRef<str>) -> Option<Remote> {
    let name = name.as_ref();
    log::trace!("Checking for remote {name} have {}", self.remotes.len());
    self.remotes
        .iter()
        .find(|remote| {
            log::trace!("comparing: {name} -> {}", remote.name);
            remote.name == name
        })
        .cloned()
}
/// Returns the currently selected remote, or `None` when no remote name is selected or
/// the selected name does not match any configured remote.
pub fn remote(&self) -> Option<Remote> {
    let selected = self.remote_name.as_ref()?;
    self.get_remote(selected)
}
/// Adds `name` to the list of known workspaces if it is not already present.
///
/// Existing entries keep their relative order (duplicates already in the list are
/// collapsed to their first occurrence) and a new name is appended at the end, so the
/// stored order is deterministic.
pub fn add_workspace(&mut self, name: impl AsRef<str>) {
    let workspace_name = name.as_ref();
    let workspaces = self.workspaces.get_or_insert_with(Vec::new);

    // `seen` is only a membership filter; the Vec itself carries the order, so the
    // result never depends on hash iteration order.
    let mut seen: HashSet<String> = HashSet::with_capacity(workspaces.len() + 1);
    workspaces.retain(|workspace| seen.insert(workspace.clone()));
    if seen.insert(workspace_name.to_string()) {
        workspaces.push(workspace_name.to_string());
    }
}
/// Removes `name` from the list of known workspaces.
///
/// If the list exists but does not contain `name`, the call succeeds without changes.
///
/// # Errors
/// Returns an error if the repository has no workspace list at all, or if `name` is the
/// currently selected workspace.
pub fn delete_workspace(&mut self, name: impl AsRef<str>) -> Result<(), OxenError> {
    let name = name.as_ref();
    let Some(workspaces) = self.workspaces.as_mut() else {
        return Err(OxenError::basic_str(format!(
            "Error: Cannot delete workspace {name:?} as it does not exist"
        )));
    };
    if self.workspace_name.as_deref() == Some(name) {
        return Err(OxenError::basic_str(
            "Error: Cannot delete current workspace",
        ));
    }
    // Filter in place rather than cloning the whole list and rebuilding it.
    workspaces.retain(|workspace| workspace.as_str() != name);
    Ok(())
}
/// Returns `true` if `name` exactly matches one of the known workspaces.
pub fn has_workspace(&self, name: impl AsRef<str>) -> bool {
    let workspace_name = name.as_ref();
    // Borrow the list instead of cloning it (and the name) just to test membership.
    self.workspaces
        .as_ref()
        .is_some_and(|workspaces| workspaces.iter().any(|ws| ws == workspace_name))
}
/// Selects the current workspace.
///
/// If a known workspace starts with `"<name>: "`, the first such entry becomes the
/// current workspace. Otherwise `name` itself is added to the known workspaces and
/// selected. A repository with no workspace list is treated as having no workspaces
/// rather than causing a panic.
///
/// # Errors
/// Currently never fails; the `Result` is kept for interface stability.
pub fn set_workspace(&mut self, name: impl AsRef<str>) -> Result<(), OxenError> {
    let workspace_name = name.as_ref();
    // Built once instead of once per candidate.
    let prefix = format!("{workspace_name}: ");
    let existing = self
        .workspaces
        .iter()
        .flatten()
        .find(|ws| ws.starts_with(&prefix))
        .cloned();
    match existing {
        Some(ws_name) => self.workspace_name = Some(ws_name),
        None => {
            self.add_workspace(workspace_name);
            self.workspace_name = Some(workspace_name.to_string());
        }
    }
    Ok(())
}
/// Returns the number of known workspaces (`0` when there is no workspace list).
pub fn num_workspaces(&self) -> usize {
    self.workspaces.as_ref().map_or(0, Vec::len)
}
/// Persists the shallow marker for this repository.
///
/// When `shallow` is true the `SHALLOW_FLAG` file in the hidden oxen directory is
/// written with the content `"true"`; otherwise the file is removed if it exists.
///
/// # Errors
/// Returns an error if writing or removing the flag file fails.
pub fn write_is_shallow(&self, shallow: bool) -> Result<(), OxenError> {
    let hidden_dir = util::fs::oxen_hidden_dir(&self.path);
    let shallow_flag_path = hidden_dir.join(SHALLOW_FLAG);
    log::debug!("Write is shallow [{shallow}] to path: {shallow_flag_path:?}");

    if shallow {
        util::fs::write_to_path(&shallow_flag_path, "true")?;
        return Ok(());
    }
    if shallow_flag_path.exists() {
        util::fs::remove_file(&shallow_flag_path)?;
    }
    Ok(())
}
/// Returns the mtime comparison tolerance for this repository.
///
/// The value is served from `MTIME_TOLERANCE_CACHE` when present; otherwise it is
/// obtained by probing the hidden oxen directory and then cached under the repository
/// path.
///
/// # Panics
/// Panics if the cache mutex is poisoned.
pub async fn mtime_tolerance(&self) -> Duration {
    // The guard is a temporary of this statement, so it is released before any await.
    let cached = MTIME_TOLERANCE_CACHE
        .lock()
        .expect("mtime tolerance cache poisoned")
        .get(&self.path)
        .copied();
    if let Some(tolerance) = cached {
        return tolerance;
    }

    let probe_dir = self.path.join(constants::OXEN_HIDDEN_DIR);
    let tolerance = probe_mtime_drift(&probe_dir).await;
    MTIME_TOLERANCE_CACHE
        .lock()
        .expect("mtime tolerance cache poisoned")
        .insert(self.path.clone(), tolerance);
    tolerance
}
/// Returns whether two modification times should be treated as equal.
///
/// Exactly equal timestamps always match. Otherwise they match only when the
/// repository's tolerance (see `mtime_tolerance`) is non-zero and the absolute
/// difference between the two timestamps does not exceed it.
///
/// The difference is computed with signed nanosecond arithmetic, so timestamps before
/// the Unix epoch are compared correctly instead of wrapping in an unsigned cast.
pub async fn mtime_matches(&self, disk: filetime::FileTime, node: filetime::FileTime) -> bool {
    if disk == node {
        return true;
    }
    let tolerance = self.mtime_tolerance().await;
    if tolerance.is_zero() {
        return false;
    }

    const NANOS_PER_SEC: i128 = 1_000_000_000;
    // `nanoseconds()` is the sub-second part counted forward from `unix_seconds()`,
    // so the two always add, even for negative seconds.
    let total_nanos = |t: filetime::FileTime| -> i128 {
        i128::from(t.unix_seconds()) * NANOS_PER_SEC + i128::from(t.nanoseconds())
    };
    let diff_nanos = (total_nanos(disk) - total_nanos(node)).unsigned_abs();
    diff_nanos <= tolerance.as_nanos()
}
/// Reports whether the file at `path` is modified relative to `node`.
///
/// The file's metadata is read without following symlinks. A missing file is reported
/// as not modified (`Ok(false)`).
///
/// # Errors
/// Returns an error if the metadata cannot be read for a reason other than the path not
/// existing, or if the underlying classification fails.
pub async fn is_modified_from_node(
    &self,
    path: &Path,
    node: &FileNode,
) -> Result<bool, OxenError> {
    match tokio::fs::symlink_metadata(path).await {
        Ok(metadata) => {
            self.is_modified_from_node_with_metadata(path, node, &metadata)
                .await
        }
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            log::debug!("is_modified_from_node: missing path {path:?}, returning false");
            Ok(false)
        }
        Err(err) => Err(OxenError::file_metadata_error(path, err)),
    }
}
/// Same as `is_modified_from_node`, but uses metadata the caller has already read.
///
/// The on-disk modification time from `metadata` is compared with the one recorded on
/// `node` via `mtime_matches`; the outcome is passed on to
/// `util::fs::classify_modified_from_node_with_metadata`, which makes the decision.
///
/// # Errors
/// Returns whatever error the classification helper reports.
pub async fn is_modified_from_node_with_metadata(
    &self,
    path: &Path,
    node: &FileNode,
    metadata: &std::fs::Metadata,
) -> Result<bool, OxenError> {
    let disk_mtime = filetime::FileTime::from_last_modification_time(metadata);
    let recorded_mtime = util::fs::last_modified_time(
        node.last_modified_seconds(),
        node.last_modified_nanoseconds(),
    );
    let mtime_matched = self.mtime_matches(disk_mtime, recorded_mtime).await;
    util::fs::classify_modified_from_node_with_metadata(path, node, metadata, mtime_matched)
}
/// Test-only hook: forces the cached mtime tolerance for this repository's path,
/// overwriting any existing entry.
///
/// # Panics
/// Panics if the cache mutex is poisoned.
#[cfg(test)]
pub fn set_mtime_tolerance_for_test(&self, tolerance: Duration) {
    let mut cache = MTIME_TOLERANCE_CACHE
        .lock()
        .expect("mtime tolerance cache poisoned");
    cache.insert(self.path.clone(), tolerance);
}
}
/// Probes whether a modification time written into `probe_dir` reads back unchanged.
///
/// A scratch file `.oxen-mtime-probe` is created, its mtime is set to a fixed timestamp
/// with a nanosecond component, the mtime is read back, and the file is removed
/// (best effort). Returns a 2 second tolerance when the value read back differs from
/// the one written, and `Duration::ZERO` otherwise, including when any step of the probe
/// fails.
async fn probe_mtime_drift(probe_dir: &Path) -> Duration {
    let probe_path = probe_dir.join(".oxen-mtime-probe");
    if tokio::fs::write(&probe_path, b"").await.is_err() {
        return Duration::ZERO;
    }

    let target = SystemTime::UNIX_EPOCH + Duration::new(1, 123_456_789);
    let requested = filetime::FileTime::from_system_time(target);
    // `None` means "could not observe": treated the same as "no drift".
    let observed = match filetime::set_file_mtime(&probe_path, requested) {
        Ok(()) => tokio::fs::metadata(&probe_path)
            .await
            .ok()
            .and_then(|meta| meta.modified().ok()),
        Err(_) => None,
    };

    // Cleanup is best effort; a leftover probe file does not affect the result.
    let _ = tokio::fs::remove_file(&probe_path).await;

    match observed {
        Some(actual) if actual != target => Duration::from_secs(2),
        _ => Duration::ZERO,
    }
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use filetime::FileTime;
use crate::error::OxenError;
use crate::model::{LocalRepository, RepoNew};
use crate::test;
use tempfile::TempDir;
// Checks `mtime_matches` with the tolerance forced to zero and then to two seconds.
#[tokio::test]
async fn test_mtime_matches_honors_tolerance() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|repo| async move {
        const SECS: i64 = 1_000_000;
        const NANOS: u32 = 500_000_000;
        let base = FileTime::from_unix_time(SECS, NANOS);
        let same = FileTime::from_unix_time(SECS, NANOS);
        let plus_one = FileTime::from_unix_time(SECS + 1, NANOS);
        let plus_three = FileTime::from_unix_time(SECS + 3, NANOS);

        // Zero tolerance: only exact equality matches.
        repo.set_mtime_tolerance_for_test(Duration::ZERO);
        let exact = repo.mtime_matches(base, same).await;
        assert!(exact, "exact equality always matches");
        let drift_at_zero = repo.mtime_matches(base, plus_one).await;
        assert!(
            !drift_at_zero,
            "1 s drift must not match when tolerance is zero",
        );

        // Two-second tolerance: 1 s passes, 3 s does not.
        repo.set_mtime_tolerance_for_test(Duration::from_secs(2));
        let inside = repo.mtime_matches(base, plus_one).await;
        assert!(inside, "1 s drift is inside the 2 s tolerance window");
        let outside = repo.mtime_matches(base, plus_three).await;
        assert!(!outside, "3 s drift is outside the 2 s tolerance window");
        Ok(())
    })
    .await
}
// `RepoNew::from_url` should split the URL into namespace and repository name.
#[test]
fn test_get_dirname_from_url() -> Result<(), OxenError> {
    let parsed = RepoNew::from_url("http://0.0.0.0:3000/repositories/OxenData")?;
    assert_eq!(parsed.name, "OxenData");
    assert_eq!(parsed.namespace, "repositories");
    Ok(())
}
// Setting a remote makes it visible through both `has_remote` and `get_remote`.
#[test]
fn test_get_set_has_remote() -> Result<(), OxenError> {
    test::run_empty_local_repo_test(|mut local_repo| {
        let url = "http://0.0.0.0:3000/repositories/OxenData";
        let remote_name = "origin";
        assert!(!local_repo.has_remote(remote_name));

        local_repo.set_remote(remote_name, url);

        // The test name promises `has`, so exercise it alongside `get`.
        assert!(local_repo.has_remote(remote_name));
        let remote = local_repo.get_remote(remote_name).unwrap();
        assert_eq!(remote.name, remote_name);
        assert_eq!(remote.url, url);
        Ok(())
    })
}
// Deleting one of two remotes makes it unreachable through `get_remote`.
#[test]
fn test_delete_remote() -> Result<(), OxenError> {
    test::run_empty_local_repo_test(|mut local_repo| {
        let (origin_name, origin_url) = ("origin", "http://0.0.0.0:3000/repositories/OxenData");
        let (other_name, other_url) = ("other", "http://0.0.0.0:4000/repositories/OxenData");
        local_repo.set_remote(origin_name, origin_url);
        local_repo.set_remote(other_name, other_url);

        local_repo.delete_remote(origin_name);

        assert!(local_repo.get_remote(origin_name).is_none());
        Ok(())
    })
}
// An added workspace is reported by `has_workspace` and can be selected.
#[test]
fn test_add_workspace() -> Result<(), OxenError> {
    let temp_dir = TempDir::new()?;
    let mut repo = LocalRepository::new(temp_dir.path().to_path_buf(), None)?;
    let sample_name = "sample";

    repo.add_workspace(sample_name);
    assert!(repo.has_workspace(sample_name));

    repo.set_workspace(sample_name)?;
    assert_eq!(repo.workspace_name, Some(sample_name.to_string()));
    Ok(())
}
// Adding the same workspace name twice must not create a second entry.
#[test]
fn test_cannot_add_repeat_workspace() -> Result<(), OxenError> {
    let temp_dir = TempDir::new()?;
    let repo_path = temp_dir.path().to_path_buf();
    let mut repo = LocalRepository::new(repo_path, None)?;
    let sample_name = "sample";
    repo.add_workspace(sample_name);
    assert_eq!(repo.num_workspaces(), 1);
    // The repeat add is the point of this test; it was missing before.
    repo.add_workspace(sample_name);
    assert_eq!(repo.num_workspaces(), 1);
    Ok(())
}
// The current workspace cannot be deleted; a non-current one can, and is then gone.
#[test]
fn test_delete_workspace() -> Result<(), OxenError> {
    let temp_dir = TempDir::new()?;
    let repo_path = temp_dir.path().to_path_buf();
    let mut repo = LocalRepository::new(repo_path, None)?;
    let sample_name = "sample";
    repo.add_workspace(sample_name);
    repo.set_workspace(sample_name)?;
    // Deleting the currently selected workspace is rejected and leaves it in place.
    let result = repo.delete_workspace(sample_name);
    assert!(result.is_err());
    assert!(repo.has_workspace(sample_name));
    let sample_2 = "second";
    repo.add_workspace(sample_2);
    repo.set_workspace(sample_2)?;
    // Once another workspace is current, the first one can be removed.
    repo.delete_workspace(sample_name)?;
    assert!(!repo.has_workspace(sample_name));
    assert!(repo.has_workspace(sample_2));
    Ok(())
}
}