use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock, Mutex as StdMutex, PoisonError};
use tokio::fs::File;
use tokio::io::BufReader;
use tokio::sync::Mutex as TokioMutex;
use crate::constants::STAGED_DIR;
use crate::core;
use crate::core::refs::with_ref_manager;
use crate::core::staged::remove_from_cache;
use crate::core::v_latest::workspaces;
use crate::error::OxenError;
use crate::model::merkle_tree::node::file_node::FileNodeOpts;
use crate::model::merkle_tree::node::{
EMerkleTreeNode, FileNode, MerkleTreeNode, StagedMerkleTreeNode,
};
use crate::model::metadata::generic_metadata::GenericMetadata;
use crate::model::{
Branch, Commit, EntryDataType, MerkleHash, NewCommitBody, StagedEntryStatus, Workspace,
};
use crate::repositories;
use crate::util;
use crate::util::progress_bar::FinishOnDropProgressBar;
use crate::view::merge::{MergeConflictFile, Mergeable};
use filetime::FileTime;
use indicatif::ProgressBar;
/// Process-wide registry of commit mutexes, one per key (`commit` keys it by the
/// workspace repo path).
///
/// Entries are created lazily by `commit_lock_for` and pruned by
/// `cleanup_commit_lock` once the registry holds the only remaining reference.
static COMMIT_LOCKS: LazyLock<StdMutex<HashMap<PathBuf, Arc<TokioMutex<()>>>>> =
    LazyLock::new(Default::default);
/// Returns a shared handle to the commit mutex registered for `key`, inserting a
/// fresh mutex on first use.
///
/// A poisoned registry lock is recovered rather than propagated; the map holds only
/// plain `Arc`s, so a panic elsewhere cannot leave it logically inconsistent.
fn commit_lock_for(key: &Path) -> Arc<TokioMutex<()>> {
    let mut registry = match COMMIT_LOCKS.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    Arc::clone(registry.entry(key.to_path_buf()).or_default())
}
/// Drops the registry entry for `key` when the registry itself is the last owner.
///
/// Callers must release their own `Arc` handle before calling this. Otherwise the
/// strong count is still above one and the entry is kept.
fn cleanup_commit_lock(key: &Path) {
    let mut registry = COMMIT_LOCKS.lock().unwrap_or_else(PoisonError::into_inner);
    let unused = registry
        .get(key)
        .is_some_and(|lock| Arc::strong_count(lock) == 1);
    if unused {
        registry.remove(key);
    }
}
pub async fn commit(
workspace: &Workspace,
new_commit: &NewCommitBody,
branch_name: impl AsRef<str>,
) -> Result<Commit, OxenError> {
let lock_key = workspace.workspace_repo.path.clone();
let lock = commit_lock_for(&lock_key);
let result = {
let _guard = lock.lock().await;
commit_inner(workspace, new_commit, branch_name.as_ref()).await
};
drop(lock);
cleanup_commit_lock(&lock_key);
result
}
/// Writes the workspace's staged entries as a new commit on `branch_name`.
///
/// Within this module it is only called from `commit`, which holds the per-workspace
/// commit lock. Steps, in order:
/// 1. Look up `branch_name`, creating it at the workspace's base commit if missing.
/// 2. Read the staged entries and fail with `OxenError::WorkspaceBehind` if any conflict
///    with the branch head.
/// 3. Export indexed tabular files and write the commit.
/// 4. Remove the staged db (cache entry and directory).
/// 5. Point the branch ref at the new commit.
/// 6. Advance a named workspace to the new commit, or delete an unnamed one.
async fn commit_inner(
    workspace: &Workspace,
    new_commit: &NewCommitBody,
    branch_name: &str,
) -> Result<Commit, OxenError> {
    let base_repo = &workspace.base_repo;
    let base_commit = &workspace.commit;

    let branch = match repositories::branches::get_by_name(base_repo, branch_name) {
        Err(OxenError::BranchNotFound(_)) => {
            log::debug!("commit creating branch: {branch_name}");
            repositories::branches::create(base_repo, branch_name, &base_commit.id)?
        }
        lookup => lookup?,
    };
    log::debug!("commit looking up branch: {:#?}", &branch);

    let staged_db_path =
        util::fs::oxen_hidden_dir(&workspace.workspace_repo.path).join(STAGED_DIR);
    log::debug!("workspaces::commit staged db path: {staged_db_path:?}");

    // Scoped so the spinner wrapper is dropped as soon as the commit is written or aborted.
    let created = {
        let spinner = FinishOnDropProgressBar(ProgressBar::new_spinner());
        let (staged_entries, _) =
            core::v_latest::status::read_staged_entries_with_staged_db_manager(
                &workspace.workspace_repo,
                &spinner,
            )?;
        // Refuse to commit when a staged file also changed on the branch since the
        // workspace's base commit.
        let has_conflicts = !list_conflicts(workspace, &staged_entries, &branch)?.is_empty();
        if has_conflicts {
            return Err(OxenError::WorkspaceBehind(Box::new(workspace.clone())));
        }
        let staged_entries = export_tabular_data_frames(workspace, staged_entries).await?;
        repositories::commits::commit_writer::commit_dir_entries(
            base_repo,
            staged_entries,
            new_commit,
            branch_name,
        )?
    };

    log::debug!("Removing staged_db_path: {staged_db_path:?}");
    remove_from_cache(&workspace.workspace_repo.path)?;
    util::fs::remove_dir_all(staged_db_path)?;

    let created_id = created.id.to_owned();
    with_ref_manager(base_repo, |manager| {
        manager.set_branch_commit_id(branch_name, &created_id)
    })?;

    match workspace.name {
        Some(_) => {
            repositories::workspaces::update_commit(workspace, &created_id)?;
        }
        None => {
            repositories::workspaces::delete(workspace)?;
        }
    }
    Ok(created)
}
/// Reports whether the workspace's staged changes can be committed onto `branch_name`.
///
/// The result lists the commits between the workspace's base commit and the branch
/// head. It is mergeable exactly when no staged file conflicts with the branch (see
/// `list_conflicts`); conflicting paths are returned as `MergeConflictFile`s.
///
/// # Errors
/// Propagates the branch lookup error. Returns `OxenError::RevisionNotFound` if the
/// branch head commit is missing. Also propagates errors from commit listing,
/// staged-entry reading, and conflict detection.
pub fn mergeability(
    workspace: &Workspace,
    branch_name: impl AsRef<str>,
) -> Result<Mergeable, OxenError> {
    let branch_name = branch_name.as_ref();
    let branch = repositories::branches::get_by_name(&workspace.base_repo, branch_name)?;
    let base = &workspace.commit;
    let Some(head) = repositories::commits::get_by_id(&workspace.base_repo, &branch.commit_id)?
    else {
        return Err(OxenError::RevisionNotFound(
            branch.commit_id.as_str().into(),
        ));
    };
    log::debug!("workspaces::mergeability base: {base:?}");
    log::debug!("workspaces::mergeability head: {head:?}");
    let commits =
        repositories::merge::list_commits_between_commits(&workspace.base_repo, base, &head)?;
    log::debug!("workspaces::mergeability commits: {commits:?}");

    let staged_db_path =
        util::fs::oxen_hidden_dir(&workspace.workspace_repo.path).join(STAGED_DIR);
    log::debug!("workspaces::mergeability staged db path: {staged_db_path:?}");
    let progress_bar = ProgressBar::new_spinner();
    let (dir_entries, _) = core::v_latest::status::read_staged_entries_with_staged_db_manager(
        &workspace.workspace_repo,
        &progress_bar,
    )?;
    let conflicts = list_conflicts(workspace, &dir_entries, &branch)?;

    // Fields are evaluated in order: `is_empty` runs before `conflicts` is consumed.
    Ok(Mergeable {
        is_mergeable: conflicts.is_empty(),
        conflicts: conflicts
            .into_iter()
            .map(|path| MergeConflictFile {
                path: path.to_string_lossy().to_string(),
            })
            .collect(),
        commits,
    })
}
/// Lists staged file paths that were also changed on `branch` since the workspace's
/// base commit.
///
/// For every staged file entry, the file's node is looked up in the branch head tree
/// and in the workspace base-commit tree. It is reported as a conflict when it exists
/// in both and the node hashes differ. Files absent from either tree, and non-file
/// entries, are skipped. When the branch head *is* the workspace's base commit, an
/// empty list is returned without loading any trees.
///
/// # Errors
/// Returns `OxenError::RevisionNotFound` when the branch head commit or either root
/// tree cannot be found. Lookup and path errors are propagated.
fn list_conflicts(
    workspace: &Workspace,
    dir_entries: &HashMap<PathBuf, Vec<StagedMerkleTreeNode>>,
    branch: &Branch,
) -> Result<Vec<PathBuf>, OxenError> {
    let workspace_commit = &workspace.commit;
    // Fetched once: the commit is needed both for the log line and for loading its tree.
    let Some(branch_commit) =
        repositories::commits::get_by_id(&workspace.base_repo, &branch.commit_id)?
    else {
        return Err(OxenError::RevisionNotFound(
            branch.commit_id.as_str().into(),
        ));
    };
    log::debug!(
        "checking if workspace is behind: {:?} {} == {}",
        branch.name,
        branch_commit,
        workspace_commit
    );
    if branch.commit_id == workspace_commit.id {
        return Ok(vec![]);
    }

    let Some(branch_tree) =
        repositories::tree::get_root_with_children(&workspace.base_repo, &branch_commit)?
    else {
        return Err(OxenError::RevisionNotFound(
            branch.commit_id.as_str().into(),
        ));
    };
    let Some(workspace_tree) =
        repositories::tree::get_root_with_children(&workspace.base_repo, workspace_commit)?
    else {
        return Err(OxenError::RevisionNotFound(
            workspace.commit.id.as_str().into(),
        ));
    };

    let mut conflicts = vec![];
    for (path, entries) in dir_entries {
        for entry in entries {
            let EMerkleTreeNode::File(_) = &entry.node.node else {
                continue;
            };
            log::debug!("checking if workspace is behind: {path:?} -> {entry}");
            let file_path = entry.node.maybe_path()?;
            log::debug!("checking if branch tree has file: {file_path:?}");
            let Some(branch_node) = branch_tree.get_by_path(&file_path)? else {
                log::debug!("branch node not found: {file_path:?}");
                continue;
            };
            let Some(workspace_node) = workspace_tree.get_by_path(&file_path)? else {
                log::debug!("workspace node not found: {file_path:?}");
                continue;
            };
            log::debug!("comparing hashes: {path:?} -> {entry}");
            log::debug!("branch node hash: {:?}", branch_node.hash);
            log::debug!("workspace node hash: {:?}", workspace_node.hash);
            if branch_node.hash == workspace_node.hash {
                log::debug!("branch node hashes match: {path:?} -> {entry}");
                continue;
            }
            log::debug!("got conflict: {file_path:?}");
            conflicts.push(file_path.to_path_buf());
        }
    }
    Ok(conflicts)
}
/// Re-stages indexed tabular files from their workspace data-frame state.
///
/// For each staged file entry whose data type is tabular and which is indexed in the
/// workspace:
/// 1. The file is written to the working directory via `extract_file_node_to_working_dir`.
/// 2. It is rebuilt with `compute_staged_merkle_tree_node`, keeping the original staged status.
///
/// Every other entry (non-tabular, unindexed, or non-file) is passed through unchanged.
/// Entries stay grouped under their original directory key, in their original order.
///
/// # Errors
/// Propagates errors from the index check, the export, and node computation.
async fn export_tabular_data_frames(
    workspace: &Workspace,
    dir_entries: HashMap<PathBuf, Vec<StagedMerkleTreeNode>>,
) -> Result<HashMap<PathBuf, Vec<StagedMerkleTreeNode>>, OxenError> {
    let mut regrouped: HashMap<PathBuf, Vec<StagedMerkleTreeNode>> = HashMap::new();
    for (dir_path, staged_nodes) in dir_entries {
        for staged in staged_nodes {
            log::debug!(
                "workspace commit checking if we want to export tabular data frame: {:?} -> {}",
                dir_path,
                staged.node
            );
            let kept = match &staged.node.node {
                EMerkleTreeNode::File(file_node) => {
                    // Qualify the node name with its directory unless it already carries it.
                    let mut node_path = PathBuf::from(file_node.name());
                    let needs_dir_prefix = !node_path.starts_with(&dir_path)
                        || (dir_path == Path::new("") && node_path.components().count() == 1);
                    if needs_dir_prefix {
                        node_path = dir_path.join(node_path);
                    }

                    // `&&` short-circuits, so the index is only checked for tabular files.
                    let should_export = *file_node.data_type() == EntryDataType::Tabular
                        && repositories::workspaces::data_frames::is_indexed(
                            workspace, &node_path,
                        )?;
                    if should_export {
                        log::debug!(
                            "Exporting tabular data frame: {:?} -> {:?}",
                            node_path,
                            file_node.name()
                        );
                        let exported_path =
                            workspaces::data_frames::extract_file_node_to_working_dir(
                                workspace, &dir_path, file_node,
                            )?;
                        log::debug!("exported path: {exported_path:?}");
                        let new_staged_merkle_tree_node = compute_staged_merkle_tree_node(
                            workspace,
                            &exported_path,
                            staged.status,
                            file_node.data_type().clone(),
                        )
                        .await?;
                        log::debug!(
                            "export_tabular_data_frames new_staged_merkle_tree_node: {new_staged_merkle_tree_node:?}"
                        );
                        new_staged_merkle_tree_node
                    } else {
                        staged
                    }
                }
                _ => staged,
            };
            regrouped.entry(dir_path.clone()).or_default().push(kept);
        }
    }
    Ok(regrouped)
}
/// Builds a staged file node for the file at `path` (a file inside the workspace repo),
/// storing its contents in the base repo's version store.
///
/// The node's content hash comes from the file bytes. Its combined hash mixes in the
/// file's generic metadata; for tabular files, that metadata first has any schema staged
/// in the workspace applied. The node name is `path` relative to the workspace repo root.
///
/// # Errors
/// - `OxenError::TabularExportMissingMetadata` when `data_type` is tabular but no
///   metadata could be derived for the file.
/// - An I/O `InvalidData` error (converted into `OxenError`) when the relative path is
///   not valid UTF-8.
/// - Errors from metadata reads, hashing, version storage, or node construction are
///   propagated.
async fn compute_staged_merkle_tree_node(
    workspace: &Workspace,
    path: &PathBuf,
    status: StagedEntryStatus,
    data_type: EntryDataType,
) -> Result<StagedMerkleTreeNode, OxenError> {
    // One metadata read supplies the mtime, the byte size, and the hashing input.
    let fs_metadata = util::fs::metadata(path)?;
    let mtime = FileTime::from_last_modification_time(&fs_metadata);
    let hash = MerkleHash::new(util::hasher::get_hash_given_metadata(path, &fs_metadata)?);
    let num_bytes = fs_metadata.len();
    let mime_type = util::fs::file_mime_type(path);

    log::debug!("compute_staged_merkle_tree_node path: {path:?}");
    let mut metadata = repositories::metadata::get_file_metadata(path, &data_type)?;
    log::debug!("compute_staged_merkle_tree_node metadata: {metadata:?}");
    if data_type == EntryDataType::Tabular && metadata.is_none() {
        return Err(OxenError::TabularExportMissingMetadata(path.clone()));
    }
    // Carry schema edits staged in the workspace over into the file's tabular metadata.
    // A failed schema lookup is deliberately ignored (best effort).
    if let Ok(Some(staged_schema)) =
        core::v_latest::data_frames::schemas::get_staged_schema_with_staged_db_manager(
            &workspace.workspace_repo,
            path,
        )
        && let Some(GenericMetadata::MetadataTabular(tabular_metadata)) = &mut metadata
    {
        tabular_metadata
            .tabular
            .schema
            .update_metadata_from_schema(&staged_schema);
    }
    let metadata_hash = util::hasher::get_metadata_hash(&metadata)?;
    let combined_hash =
        MerkleHash::new(util::hasher::get_combined_hash(Some(metadata_hash), hash.to_u128())?);

    log::debug!("compute_staged_merkle_tree_node writing file to version store");
    // Reuse the size read above instead of issuing a second stat call.
    let reader = BufReader::new(File::open(path).await?);
    let version_store = workspace.base_repo.version_store();
    version_store
        .store_version_from_reader(&hash.to_string(), Box::new(reader), num_bytes)
        .await?;

    let file_extension = path.extension().unwrap_or_default().to_string_lossy();
    let relative_path = util::fs::path_relative_to_dir(path, &workspace.workspace_repo.path)?;
    // Node names are UTF-8 strings; report an error rather than panicking on other paths.
    let relative_path_str = relative_path.to_str().ok_or_else(|| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("exported path is not valid UTF-8: {relative_path:?}"),
        )
    })?;
    let file_node = FileNode::new(
        &workspace.base_repo,
        FileNodeOpts {
            name: relative_path_str.to_string(),
            hash,
            combined_hash,
            metadata_hash: Some(MerkleHash::new(metadata_hash)),
            num_bytes,
            last_modified_seconds: mtime.unix_seconds(),
            last_modified_nanoseconds: mtime.nanoseconds(),
            data_type,
            metadata,
            mime_type,
            extension: file_extension.to_string(),
        },
    )?;
    Ok(StagedMerkleTreeNode {
        status,
        node: MerkleTreeNode::from_file(file_node),
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::repositories;
    use crate::test;
    use crate::util;

    /// Whether the global commit-lock registry currently has an entry for `key`.
    /// Tolerates poisoning the same way the production helpers do.
    fn registry_contains(key: &Path) -> bool {
        COMMIT_LOCKS
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
            .contains_key(key)
    }

    #[tokio::test]
    async fn test_commit_lock_registry_shares_and_cleans_up() -> Result<(), OxenError> {
        let key = PathBuf::from("test-commit-lock-registry");
        let other_key = PathBuf::from("test-commit-lock-registry-other");

        // Two handles for the same key share one mutex.
        let lock_a = commit_lock_for(&key);
        let lock_b = commit_lock_for(&key);
        let guard = lock_a.lock().await;
        assert!(
            lock_b.try_lock().is_err(),
            "second handle should contend on the same mutex"
        );
        drop(guard);
        assert!(lock_b.try_lock().is_ok());

        // A different key maps to an independent mutex.
        let other = commit_lock_for(&other_key);
        let guard = lock_a.lock().await;
        assert!(other.try_lock().is_ok());
        drop(guard);

        // Cleanup must keep the entry while any caller handle is still alive.
        drop(lock_a);
        cleanup_commit_lock(&key);
        assert!(
            registry_contains(&key),
            "entry must survive while another handle is alive"
        );

        drop(lock_b);
        cleanup_commit_lock(&key);
        assert!(!registry_contains(&key));

        // Do not leak the second key's entry into the process-wide registry.
        drop(other);
        cleanup_commit_lock(&other_key);
        assert!(!registry_contains(&other_key));
        Ok(())
    }

    #[tokio::test]
    async fn test_concurrent_commits_to_same_workspace_are_serialized() -> Result<(), OxenError> {
        test::run_one_commit_local_repo_test_async(|repo| async move {
            let head = repositories::commits::head_commit(&repo)?;
            let workspace = repositories::workspaces::create_with_name(
                &repo,
                &head,
                "concurrent-commit-test",
                Some("concurrent-commit-test-ws".to_string()),
                true,
            )
            .await?;
            for name in ["file1.txt", "file2.txt"] {
                let path = workspace.workspace_repo.path.join(name);
                util::fs::write_to_path(&path, format!("content of {name}"))?;
                repositories::workspaces::files::add(&workspace, &path).await?;
            }
            let body_one = NewCommitBody {
                author: "author".to_string(),
                email: "email".to_string(),
                message: "concurrent commit one".to_string(),
            };
            let body_two = NewCommitBody {
                author: "author".to_string(),
                email: "email".to_string(),
                message: "concurrent commit two".to_string(),
            };
            let (result_one, result_two) = tokio::join!(
                commit(&workspace, &body_one, "main"),
                commit(&workspace, &body_two, "main"),
            );
            let ok_count = usize::from(result_one.is_ok()) + usize::from(result_two.is_ok());
            assert_eq!(
                ok_count, 1,
                "exactly one commit should land, got: {result_one:?} / {result_two:?}"
            );

            // Both calls have returned, so no handle remains and the entry must be pruned.
            assert!(
                !registry_contains(&workspace.workspace_repo.path),
                "commit lock registry entry should be cleaned up after both commits"
            );

            let branch = repositories::branches::get_by_name(&repo, "main")?;
            let head = repositories::commits::get_by_id(&repo, &branch.commit_id)?
                .expect("branch head commit should exist");
            for name in ["file1.txt", "file2.txt"] {
                assert!(
                    repositories::tree::get_file_by_path(&repo, &head, Path::new(name))?
                        .is_some(),
                    "{name} should be committed at the branch head"
                );
            }
            Ok(())
        })
        .await
    }
}