use crate::config::RepositoryConfig;
use crate::constants::SHALLOW_FLAG;
use crate::constants::{self, DEFAULT_VNODE_SIZE};
use crate::core::versions::MinOxenVersion;
use crate::error::OxenError;
use crate::model::merkle_tree::node::FileNode;
use crate::model::{MetadataEntry, Remote, RemoteRepository};
use crate::storage::{S3Opts, StorageConfig, VersionStore, create_version_store};
use crate::util;
use crate::util::fs::AtomicFile;
use crate::view::RepositoryView;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock, Mutex};
use std::time::{Duration, SystemTime};
/// Process-wide cache of mtime comparison tolerances, keyed by repository path.
///
/// `LocalRepository::mtime_tolerance` stores the result of its filesystem probe here so
/// later calls for the same path can reuse the value instead of probing again.
static MTIME_TOLERANCE_CACHE: LazyLock<Mutex<HashMap<PathBuf, Duration>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
/// A repository on the local filesystem: its root path, the settings loaded from its
/// repository config, and a shared handle to its version store.
#[derive(Debug, Clone)]
pub struct LocalRepository {
    /// Root directory of the repository.
    pub path: PathBuf,
    /// Name of the remote that `remote()` looks up.
    remote_name: Option<String>,
    /// Minimum version string; parsed on demand by `min_version()`.
    min_version: Option<String>,
    /// Remotes configured for this repository.
    remotes: Vec<Remote>,
    /// vnode size override; `vnode_size()` falls back to `DEFAULT_VNODE_SIZE` when unset.
    vnode_size: Option<u64>,
    /// Optional subtree paths; `subtree_paths()` normalizes `"."` entries to the empty path.
    subtree_paths: Option<Vec<PathBuf>>,
    /// Optional depth setting — presumably the clone depth; TODO confirm.
    pub depth: Option<i32>,
    /// Optional vfs flag; `is_vfs()` treats `None` as `false`.
    pub vfs: Option<bool>,
    /// Optional remote-mode flag; `is_remote_mode()` treats `None` as `false`.
    pub remote_mode: Option<bool>,
    /// Name of the current workspace; `delete_workspace` refuses to delete it.
    pub workspace_name: Option<String>,
    /// Names of all known workspaces.
    workspaces: Option<Vec<String>>,
    /// Storage configuration used to create the version store.
    storage_config: StorageConfig,
    /// S3 options passed in at construction time, if any.
    server_s3_opts: Option<S3Opts>,
    /// Shared handle to the version store backing this repository.
    version_store: Arc<dyn VersionStore>,
}
/// A [`LocalRepository`] bundled with an optional list of metadata entries.
#[derive(Debug, Clone)]
pub struct LocalRepositoryWithEntries {
/// The repository itself.
pub local_repo: LocalRepository,
/// Metadata entries associated with the repository, if any were collected.
pub entries: Option<Vec<MetadataEntry>>,
}
impl LocalRepository {
pub fn from_dir(path: impl AsRef<Path>) -> Result<Self, OxenError> {
Self::from_dir_with_server_opts(path, None)
}
/// Loads the repository rooted at `path`, forwarding optional server S3 options to the
/// version store.
///
/// The repository config is read from the location given by `util::fs::config_filepath`.
///
/// # Errors
/// Returns an error if the config cannot be read or the version store cannot be created.
pub fn from_dir_with_server_opts(
    path: impl AsRef<Path>,
    server_s3_opts: Option<&S3Opts>,
) -> Result<Self, OxenError> {
    let repo_dir = path.as_ref();
    let config = RepositoryConfig::from_file(&util::fs::config_filepath(repo_dir))?;
    Self::new_with_server_opts(repo_dir, config, server_s3_opts)
}
/// Returns a reference to the repository's storage configuration.
pub fn storage_config(&self) -> &StorageConfig {
&self.storage_config
}
/// Returns the S3 options supplied when this repository was constructed, if any.
pub fn server_s3_opts(&self) -> Option<&S3Opts> {
self.server_s3_opts.as_ref()
}
/// Returns a new `Arc` handle to the repository's version store.
///
/// Only the reference count is bumped; the store itself is shared, not copied.
pub fn version_store(&self) -> Arc<dyn VersionStore> {
Arc::clone(&self.version_store)
}
/// Loads the repository that contains the process's current working directory.
///
/// # Errors
/// Returns an error if the current directory cannot be determined, if
/// `util::fs::get_repo_root_from_current_dir` finds no repository root (reported via
/// `OxenError::local_repo_not_found` with the current directory), or if loading the
/// repository from that root fails.
pub fn from_current_dir() -> Result<LocalRepository, OxenError> {
    let current_dir = std::env::current_dir().map_err(OxenError::from)?;
    let repo_dir = util::fs::get_repo_root_from_current_dir()
        .ok_or_else(|| OxenError::local_repo_not_found(&current_dir))?;
    LocalRepository::from_dir(&repo_dir)
}
pub fn new(
path: impl AsRef<Path>,
config: RepositoryConfig,
) -> Result<LocalRepository, OxenError> {
Self::new_with_server_opts(path, config, None)
}
pub fn new_with_server_opts(
path: impl AsRef<Path>,
config: RepositoryConfig,
server_s3_opts: Option<&S3Opts>,
) -> Result<LocalRepository, OxenError> {
let path = path.as_ref().to_path_buf();
let storage_config = config.storage.unwrap_or_default();
let version_store = create_version_store(&path, &storage_config, server_s3_opts)?;
Ok(LocalRepository {
path,
remote_name: config.remote_name,
min_version: config.min_version,
remotes: config.remotes,
vnode_size: config.vnode_size,
subtree_paths: config.subtree_paths,
depth: config.depth,
vfs: config.vfs,
remote_mode: config.remote_mode,
workspace_name: config.workspace_name,
workspaces: config.workspaces,
storage_config,
server_s3_opts: server_s3_opts.cloned(),
version_store,
})
}
/// Test-only constructor: clones `base` and swaps in the given `version_store`.
#[cfg(test)]
pub fn new_for_testing(base: &LocalRepository, version_store: Arc<dyn VersionStore>) -> Self {
    let mut repo = base.clone();
    repo.version_store = version_store;
    repo
}
/// Builds a repository for `view`, rooted at `<current dir>/<view.name>`.
///
/// All optional settings start unset, there are no remotes, and the default storage
/// configuration is used.
///
/// # Errors
/// Returns an error if the current directory cannot be determined or the version store
/// cannot be created.
pub fn from_view(view: RepositoryView) -> Result<LocalRepository, OxenError> {
    let cwd = std::env::current_dir()?;
    let path = cwd.join(view.name);
    let storage_config = StorageConfig::default();
    let version_store = create_version_store(&path, &storage_config, None)?;
    let repo = LocalRepository {
        path,
        remote_name: None,
        min_version: None,
        remotes: Vec::new(),
        vnode_size: None,
        subtree_paths: None,
        depth: None,
        vfs: None,
        remote_mode: None,
        workspace_name: None,
        workspaces: None,
        storage_config,
        server_s3_opts: None,
        version_store,
    };
    Ok(repo)
}
/// Builds a repository at `path` whose only remote is `repo.remote`.
///
/// The remote is selected under `constants::DEFAULT_REMOTE_NAME`; every other optional
/// setting starts unset and the default storage configuration is used.
///
/// # Errors
/// Returns an error if the version store cannot be created.
pub fn from_remote(repo: RemoteRepository, path: &Path) -> Result<LocalRepository, OxenError> {
    let path = path.to_path_buf();
    let storage_config = StorageConfig::default();
    let version_store = create_version_store(&path, &storage_config, None)?;
    let local = LocalRepository {
        path,
        remote_name: Some(constants::DEFAULT_REMOTE_NAME.to_owned()),
        min_version: None,
        remotes: vec![repo.remote],
        vnode_size: None,
        subtree_paths: None,
        depth: None,
        vfs: None,
        remote_mode: None,
        workspace_name: None,
        workspaces: None,
        storage_config,
        server_s3_opts: None,
        version_store,
    };
    Ok(local)
}
/// Returns the repository's minimum version, as resolved by `MinOxenVersion::or_earliest`
/// from the stored version string.
///
/// # Panics
/// Panics if the stored version string cannot be parsed.
pub fn min_version(&self) -> MinOxenVersion {
    MinOxenVersion::or_earliest(self.min_version.clone())
        .unwrap_or_else(|err| panic!("Invalid repo version\n{err}"))
}
/// Selects which remote `remote()` returns by name. The name is not validated against
/// the configured remotes.
pub fn set_remote_name(&mut self, name: impl AsRef<str>) {
    let selected = name.as_ref().to_owned();
    self.remote_name = Some(selected);
}
/// Records `version` as the repository's minimum version, stored in its string form.
pub fn set_min_version(&mut self, version: MinOxenVersion) {
self.min_version = Some(version.to_string());
}
/// Returns the configured remotes.
pub fn remotes(&self) -> &Vec<Remote> {
&self.remotes
}
/// Returns the final component of the repository path as a `String`.
///
/// # Panics
/// Panics if the path has no final component (e.g. it ends in `..`) or if that
/// component is not valid UTF-8.
pub fn dirname(&self) -> String {
    let dir_name = self.path.file_name().unwrap();
    dir_name.to_str().unwrap().to_owned()
}
/// Returns the configured vnode size, or `DEFAULT_VNODE_SIZE` when none is set.
pub fn vnode_size(&self) -> u64 {
    match self.vnode_size {
        Some(size) => size,
        None => DEFAULT_VNODE_SIZE,
    }
}
/// Overrides the vnode size for this repository.
pub fn set_vnode_size(&mut self, size: u64) {
self.vnode_size = Some(size);
}
pub fn subtree_paths(&self) -> Option<Vec<PathBuf>> {
self.subtree_paths.as_ref().map(|paths| {
paths
.iter()
.map(|p| {
if p == &PathBuf::from(".") {
PathBuf::from("")
} else {
p.clone()
}
})
.collect()
})
}
/// Replaces the configured subtree paths; `None` clears them.
pub fn set_subtree_paths(&mut self, paths: Option<Vec<PathBuf>>) {
self.subtree_paths = paths;
}
/// Returns the configured depth, if any.
pub fn depth(&self) -> Option<i32> {
self.depth
}
/// Sets the configured depth; `None` clears it.
pub fn set_depth(&mut self, depth: Option<i32>) {
self.depth = depth;
}
/// Sets the remote-mode flag; `None` leaves it unset (read back as `false`
/// by `is_remote_mode`).
pub fn set_remote_mode(&mut self, is_remote: Option<bool>) {
self.remote_mode = is_remote;
}
/// Returns `true` only when the remote-mode flag is explicitly set to `true`.
pub fn is_remote_mode(&self) -> bool {
    matches!(self.remote_mode, Some(true))
}
/// Returns `true` only when the vfs flag is explicitly set to `true`.
pub fn is_vfs(&self) -> bool {
    matches!(self.vfs, Some(true))
}
/// Sets the vfs flag; `None` leaves it unset (read back as `false` by `is_vfs`).
pub fn set_vfs(&mut self, is_vfs: Option<bool>) {
self.vfs = is_vfs;
}
/// Writes the repository's current settings to its config file.
///
/// A `RepositoryConfig` is rebuilt from the in-memory fields (the storage configuration
/// is always written as `Some`) and saved to `util::fs::config_filepath(&self.path)`.
///
/// # Errors
/// Returns an error if the config cannot be saved.
pub fn save(&self) -> Result<(), OxenError> {
    let config = RepositoryConfig {
        storage: Some(self.storage_config.clone()),
        remotes: self.remotes.clone(),
        remote_name: self.remote_name.clone(),
        min_version: self.min_version.clone(),
        vnode_size: self.vnode_size,
        subtree_paths: self.subtree_paths.clone(),
        depth: self.depth,
        vfs: self.vfs,
        remote_mode: self.remote_mode,
        workspaces: self.workspaces.clone(),
        workspace_name: self.workspace_name.clone(),
    };
    let config_path = util::fs::config_filepath(&self.path);
    config.save(&config_path)?;
    Ok(())
}
/// Adds or replaces the remote called `name`, selects it as the current remote, and
/// returns the stored value.
///
/// Every existing remote with the same name is overwritten with the new URL; if none
/// exists the remote is appended.
pub fn set_remote(&mut self, name: impl AsRef<str>, url: impl AsRef<str>) -> Remote {
    let name = name.as_ref();
    let remote = Remote {
        name: name.to_owned(),
        url: url.as_ref().to_owned(),
    };
    self.remote_name = Some(name.to_owned());

    let mut replaced = false;
    for existing in self.remotes.iter_mut().filter(|r| r.name == name) {
        *existing = remote.clone();
        replaced = true;
    }
    if !replaced {
        self.remotes.push(remote.clone());
    }
    remote
}
pub fn delete_remote(&mut self, name: impl AsRef<str>) {
let name = name.as_ref();
let mut new_remotes: Vec<Remote> = vec![];
for i in 0..self.remotes.len() {
if self.remotes[i].name != name {
new_remotes.push(self.remotes[i].clone());
}
}
self.remotes = new_remotes;
}
/// Returns `true` if a remote called `name` is configured.
pub fn has_remote(&self, name: impl AsRef<str>) -> bool {
    let name = name.as_ref();
    self.remotes.iter().any(|remote| remote.name == name)
}
/// Returns a copy of the first configured remote called `name`, or `None` if there is
/// no such remote. Each comparison is logged at trace level.
pub fn get_remote(&self, name: impl AsRef<str>) -> Option<Remote> {
    let name = name.as_ref();
    log::trace!("Checking for remote {name} have {}", self.remotes.len());
    self.remotes
        .iter()
        .find(|remote| {
            log::trace!("comparing: {name} -> {}", remote.name);
            remote.name == name
        })
        .cloned()
}
/// Returns the currently selected remote, or `None` when no remote name is selected or
/// the selected name matches no configured remote.
pub fn remote(&self) -> Option<Remote> {
    let selected = self.remote_name.as_ref()?;
    self.get_remote(selected)
}
/// Registers the workspace `name`, keeping the workspace list free of duplicates.
///
/// Existing workspaces keep their relative order and a new name is appended at the end,
/// so the saved config and the first-match lookup in `set_workspace` are deterministic.
/// Adding a name that is already present leaves the list unchanged.
pub fn add_workspace(&mut self, name: impl AsRef<str>) {
    let workspace_name = name.as_ref();
    let mut workspaces = self.workspaces.take().unwrap_or_default();

    // Drop any duplicates already in the list while preserving first-seen order.
    let mut seen: HashSet<String> = HashSet::with_capacity(workspaces.len() + 1);
    workspaces.retain(|ws| seen.insert(ws.clone()));

    if seen.insert(workspace_name.to_string()) {
        workspaces.push(workspace_name.to_string());
    }
    self.workspaces = Some(workspaces);
}
/// Removes the workspace `name` from the workspace list.
///
/// If the list exists but does not contain `name`, nothing is removed and `Ok(())` is
/// returned.
///
/// # Errors
/// Returns an error if no workspace list exists at all, or if `name` is the current
/// workspace.
pub fn delete_workspace(&mut self, name: impl AsRef<str>) -> Result<(), OxenError> {
    let name = name.as_ref();
    let Some(workspaces) = self.workspaces.as_mut() else {
        return Err(OxenError::basic_str(format!(
            "Error: Cannot delete workspace {name:?} as it does not exist"
        )));
    };
    if self.workspace_name.as_deref() == Some(name) {
        return Err(OxenError::basic_str(
            "Error: Cannot delete current workspace",
        ));
    }
    workspaces.retain(|workspace| workspace.as_str() != name);
    Ok(())
}
/// Returns `true` if a workspace with exactly this `name` is registered.
pub fn has_workspace(&self, name: impl AsRef<str>) -> bool {
    let workspace_name = name.as_ref();
    self.workspaces
        .as_ref()
        .is_some_and(|all| all.iter().any(|ws| ws.as_str() == workspace_name))
}
/// Makes `name` the current workspace.
///
/// If a registered workspace starts with `"<name>: "`, that full entry becomes the
/// current workspace. Otherwise `name` is registered via `add_workspace` and selected
/// as-is. A repository with no workspace list yet is treated as having no workspaces
/// rather than panicking.
///
/// # Errors
/// Currently never fails; the `Result` is kept for interface stability.
pub fn set_workspace(&mut self, name: impl AsRef<str>) -> Result<(), OxenError> {
    let workspace_name = name.as_ref();
    // Build the prefix once instead of once per candidate.
    let prefix = format!("{workspace_name}: ");
    let existing = self
        .workspaces
        .iter()
        .flatten()
        .find(|ws| ws.starts_with(prefix.as_str()))
        .cloned();

    match existing {
        Some(full_name) => self.workspace_name = Some(full_name),
        None => {
            self.add_workspace(workspace_name);
            self.workspace_name = Some(workspace_name.to_string());
        }
    }
    Ok(())
}
/// Returns how many workspaces are registered (0 when there is no workspace list).
pub fn num_workspaces(&self) -> usize {
    self.workspaces.as_ref().map_or(0, Vec::len)
}
/// Creates or removes the shallow flag file inside the repository's hidden directory.
///
/// When `shallow` is `true` the flag file is written with the contents `true`; when it
/// is `false` the flag file is removed if it exists.
///
/// # Errors
/// Returns an error if writing or removing the flag file fails.
pub fn write_is_shallow(&self, shallow: bool) -> Result<(), OxenError> {
    let shallow_flag_path = util::fs::oxen_hidden_dir(&self.path).join(SHALLOW_FLAG);
    log::debug!("Write is shallow [{shallow}] to path: {shallow_flag_path:?}");
    if !shallow {
        if shallow_flag_path.exists() {
            util::fs::remove_file(&shallow_flag_path)?;
        }
        return Ok(());
    }
    AtomicFile::new(&shallow_flag_path).write(b"true")?;
    Ok(())
}
/// Returns the mtime comparison tolerance for this repository.
///
/// The value is looked up in the process-wide cache by repository path. On a miss the
/// repository's hidden directory is probed with `probe_mtime_drift` and the result is
/// cached. The cache lock is never held across the probe's `.await`.
///
/// # Panics
/// Panics if the cache mutex is poisoned.
pub async fn mtime_tolerance(&self) -> Duration {
    let cached = MTIME_TOLERANCE_CACHE
        .lock()
        .expect("mtime tolerance cache poisoned")
        .get(&self.path)
        .copied();
    if let Some(tolerance) = cached {
        return tolerance;
    }

    let probe_dir = self.path.join(constants::OXEN_HIDDEN_DIR);
    let tolerance = probe_mtime_drift(&probe_dir).await;

    let mut cache = MTIME_TOLERANCE_CACHE
        .lock()
        .expect("mtime tolerance cache poisoned");
    cache.insert(self.path.clone(), tolerance);
    tolerance
}
pub async fn mtime_matches(&self, disk: filetime::FileTime, node: filetime::FileTime) -> bool {
if disk == node {
return true;
}
let tolerance = self.mtime_tolerance().await;
if tolerance.is_zero() {
return false;
}
let disk =
SystemTime::UNIX_EPOCH + Duration::new(disk.unix_seconds() as u64, disk.nanoseconds());
let node =
SystemTime::UNIX_EPOCH + Duration::new(node.unix_seconds() as u64, node.nanoseconds());
let diff = if disk >= node {
disk.duration_since(node).unwrap_or_default()
} else {
node.duration_since(disk).unwrap_or_default()
};
diff <= tolerance
}
/// Checks whether the file at `path` is modified relative to `node`.
///
/// The file's metadata is read without following symlinks and passed to
/// [`LocalRepository::is_modified_from_node_with_metadata`]. A missing file is reported
/// as not modified (`Ok(false)`).
///
/// # Errors
/// Returns a file-metadata error for any metadata failure other than "not found", and
/// propagates errors from the classification step.
pub async fn is_modified_from_node(
    &self,
    path: &Path,
    node: &FileNode,
) -> Result<bool, OxenError> {
    match tokio::fs::symlink_metadata(path).await {
        Ok(metadata) => {
            self.is_modified_from_node_with_metadata(path, node, &metadata)
                .await
        }
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            log::debug!("is_modified_from_node: missing path {path:?}, returning false");
            Ok(false)
        }
        Err(err) => Err(OxenError::file_metadata_error(path, err)),
    }
}
/// Checks whether the file at `path` is modified relative to `node`, using metadata the
/// caller has already read.
///
/// The file's mtime is compared against the node's recorded mtime with
/// [`LocalRepository::mtime_matches`]; the result is handed to
/// `util::fs::classify_modified_from_node_with_metadata`, which makes the final decision.
///
/// # Errors
/// Propagates any error from the classification helper.
pub async fn is_modified_from_node_with_metadata(
    &self,
    path: &Path,
    node: &FileNode,
    metadata: &std::fs::Metadata,
) -> Result<bool, OxenError> {
    let recorded_mtime = util::fs::last_modified_time(
        node.last_modified_seconds(),
        node.last_modified_nanoseconds(),
    );
    let disk_mtime = filetime::FileTime::from_last_modification_time(metadata);
    let mtime_matched = self.mtime_matches(disk_mtime, recorded_mtime).await;
    util::fs::classify_modified_from_node_with_metadata(path, node, metadata, mtime_matched)
}
/// Test-only hook: forces the cached mtime tolerance for this repository's path,
/// bypassing the filesystem probe.
///
/// # Panics
/// Panics if the cache mutex is poisoned.
#[cfg(test)]
pub fn set_mtime_tolerance_for_test(&self, tolerance: Duration) {
    let mut cache = MTIME_TOLERANCE_CACHE
        .lock()
        .expect("mtime tolerance cache poisoned");
    cache.insert(self.path.clone(), tolerance);
}
}
/// Probes whether the filesystem under `probe_dir` stores a nanosecond-precision mtime
/// exactly, and returns the tolerance to use when comparing mtimes there.
///
/// A temporary `.oxen-mtime-probe` file is written, its mtime is set to a fixed
/// timestamp with a non-zero nanosecond part, and the mtime is read back. If the value
/// read back differs, a 2 second tolerance is returned. In every other case — including
/// any I/O failure along the way — the tolerance is zero. The probe file is removed on
/// a best-effort basis.
async fn probe_mtime_drift(probe_dir: &Path) -> Duration {
    let probe_path = probe_dir.join(".oxen-mtime-probe");
    let Ok(()) = tokio::fs::write(&probe_path, b"").await else {
        return Duration::ZERO;
    };

    let target = SystemTime::UNIX_EPOCH + Duration::new(1, 123_456_789);
    let requested = filetime::FileTime::from_system_time(target);
    let mtime_set = filetime::set_file_mtime(&probe_path, requested).is_ok();

    // Only read the mtime back when it was actually set; any failure counts as "no drift".
    let observed = if mtime_set {
        tokio::fs::metadata(&probe_path)
            .await
            .ok()
            .and_then(|meta| meta.modified().ok())
    } else {
        None
    };
    let drift_detected = observed.is_some_and(|actual| actual != target);

    let _ = tokio::fs::remove_file(&probe_path).await;

    let tolerance_secs = if drift_detected { 2 } else { 0 };
    Duration::from_secs(tolerance_secs)
}
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use std::time::Duration;
use filetime::FileTime;
use crate::api::requests::RepoNew;
use crate::config::RepositoryConfig;
use crate::error::OxenError;
use crate::model::LocalRepository;
use crate::test;
use tempfile::TempDir;
/// `mtime_matches` must accept exact equality always, and otherwise accept a difference
/// only when it falls within the repository's tolerance.
#[tokio::test]
async fn test_mtime_matches_honors_tolerance() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|repo| async move {
        // All timestamps share the same nanosecond part and differ only in whole seconds.
        let at = |secs: i64| FileTime::from_unix_time(secs, 500_000_000);
        let base = at(1_000_000);
        let same = at(1_000_000);
        let plus_one = at(1_000_001);
        let plus_three = at(1_000_003);

        // Zero tolerance: only exact equality matches.
        repo.set_mtime_tolerance_for_test(Duration::ZERO);
        assert!(
            repo.mtime_matches(base, same).await,
            "exact equality always matches"
        );
        assert!(
            !repo.mtime_matches(base, plus_one).await,
            "1 s drift must not match when tolerance is zero",
        );

        // Two-second tolerance: 1 s is inside the window, 3 s is outside.
        repo.set_mtime_tolerance_for_test(Duration::from_secs(2));
        assert!(
            repo.mtime_matches(base, plus_one).await,
            "1 s drift is inside the 2 s tolerance window",
        );
        assert!(
            !repo.mtime_matches(base, plus_three).await,
            "3 s drift is outside the 2 s tolerance window",
        );
        Ok(())
    })
    .await
}
/// `RepoNew::from_url` splits a repository URL into its namespace and name.
#[test]
fn test_get_dirname_from_url() -> Result<(), OxenError> {
    let parsed = RepoNew::from_url("http://0.0.0.0:3000/repositories/OxenData")?;
    assert_eq!(parsed.namespace, "repositories");
    assert_eq!(parsed.name, "OxenData");
    Ok(())
}
/// A remote added with `set_remote` is reported by `has_remote` and returned unchanged
/// by `get_remote`; an unknown name is reported as absent.
#[test]
fn test_get_set_has_remote() -> Result<(), OxenError> {
    test::run_empty_local_repo_test(|mut local_repo| {
        let url = "http://0.0.0.0:3000/repositories/OxenData";
        let remote_name = "origin";
        assert!(!local_repo.has_remote(remote_name));

        local_repo.set_remote(remote_name, url);

        assert!(local_repo.has_remote(remote_name));
        assert!(!local_repo.has_remote("does-not-exist"));
        let remote = local_repo.get_remote(remote_name).unwrap();
        assert_eq!(remote.name, remote_name);
        assert_eq!(remote.url, url);
        Ok(())
    })
}
/// Deleting a remote makes it unavailable through `get_remote`.
#[test]
fn test_delete_remote() -> Result<(), OxenError> {
    test::run_empty_local_repo_test(|mut local_repo| {
        let (origin_name, origin_url) = ("origin", "http://0.0.0.0:3000/repositories/OxenData");
        let (other_name, other_url) = ("other", "http://0.0.0.0:4000/repositories/OxenData");
        local_repo.set_remote(origin_name, origin_url);
        local_repo.set_remote(other_name, other_url);

        local_repo.delete_remote(origin_name);

        assert!(local_repo.get_remote(origin_name).is_none());
        Ok(())
    })
}
/// A workspace added with `add_workspace` is visible through `has_workspace` and can be
/// selected with `set_workspace`.
#[test]
fn test_add_workspace() -> Result<(), OxenError> {
    let temp_dir = TempDir::new()?;
    let mut repo = LocalRepository::new(
        temp_dir.path().to_path_buf(),
        RepositoryConfig::default(),
    )?;
    let sample_name = "sample";

    repo.add_workspace(sample_name);
    assert!(repo.has_workspace(sample_name));

    repo.set_workspace(sample_name)?;
    assert_eq!(repo.workspace_name, Some(sample_name.to_string()));
    Ok(())
}
/// Adding the same workspace name twice must not create a duplicate entry.
#[test]
fn test_cannot_add_repeat_workspace() -> Result<(), OxenError> {
    let temp_dir = TempDir::new()?;
    let repo_path = temp_dir.path().to_path_buf();
    let mut repo = LocalRepository::new(repo_path, RepositoryConfig::default())?;
    let sample_name = "sample";

    repo.add_workspace(sample_name);
    assert_eq!(repo.num_workspaces(), 1);

    // The repeat is the case under test: the count must stay at one.
    repo.add_workspace(sample_name);
    assert_eq!(repo.num_workspaces(), 1);
    assert!(repo.has_workspace(sample_name));
    Ok(())
}
/// The current workspace cannot be deleted; once another workspace is selected, the
/// first one can be deleted and is then gone from the list.
#[test]
fn test_delete_workspace() -> Result<(), OxenError> {
    let temp_dir = TempDir::new()?;
    let repo_path = temp_dir.path().to_path_buf();
    let mut repo = LocalRepository::new(repo_path, RepositoryConfig::default())?;
    let sample_name = "sample";
    repo.add_workspace(sample_name);
    repo.set_workspace(sample_name)?;

    // Deleting the current workspace is rejected and leaves it in place.
    let result = repo.delete_workspace(sample_name);
    assert!(result.is_err());
    assert!(repo.has_workspace(sample_name));

    let sample_2 = "second";
    repo.add_workspace(sample_2);
    repo.set_workspace(sample_2)?;

    // No longer current, so the deletion succeeds and actually removes the entry.
    repo.delete_workspace(sample_name)?;
    assert!(!repo.has_workspace(sample_name));
    assert!(repo.has_workspace(sample_2));
    assert_eq!(repo.num_workspaces(), 1);
    Ok(())
}
/// A custom storage configuration survives a `save` followed by `from_dir`.
#[test]
fn test_storage_config_custom_path_round_trip() -> Result<(), OxenError> {
    use crate::storage::{StorageConfig, StorageKind};

    let temp_dir = TempDir::new()?;
    let repo_path = temp_dir.path().to_path_buf();
    let custom = StorageConfig {
        kind: StorageKind::Local,
        versions_path: Some(PathBuf::from("/mnt/nfs/customer/.oxen/versions/files")),
    };
    let config = RepositoryConfig {
        storage: Some(custom.clone()),
        ..Default::default()
    };

    // Persist the repository, then load it back from disk.
    LocalRepository::new(&repo_path, config)?.save()?;
    let reloaded = LocalRepository::from_dir(&repo_path)?;

    let reloaded_storage = reloaded.storage_config();
    assert_eq!(reloaded_storage.kind, StorageKind::Local);
    assert_eq!(reloaded_storage.versions_path, custom.versions_path);
    Ok(())
}
}