use crate::core::db;
pub use crate::core::merge::entry_merge_conflict_db_reader::EntryMergeConflictDBReader;
pub use crate::core::merge::node_merge_conflict_db_reader::NodeMergeConflictDBReader;
use crate::core::merge::node_merge_conflict_reader::NodeMergeConflictReader;
use crate::core::merge::{db_path, node_merge_conflict_writer};
use crate::core::refs::with_ref_manager;
use crate::core::v_latest::commits::{get_commit_or_head, list_between};
use crate::core::v_latest::{add, rm};
use crate::error::OxenError;
use crate::model::merge_conflict::NodeMergeConflict;
use crate::model::merkle_tree::node::{EMerkleTreeNode, MerkleTreeNode};
use crate::model::{Branch, Commit, LocalRepository};
use crate::model::{MerkleHash, PartialNode};
use crate::opts::RmOpts;
use crate::repositories;
use crate::repositories::commits::commit_writer;
use crate::repositories::merge::MergeCommits;
use crate::util;
use rocksdb::DB;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::str;
use super::index::restore;
use super::index::restore::FileToRestore;
/// Accumulator filled while walking a merkle tree during a fast-forward merge.
struct MergeResult {
    /// Files collected for a follow-up working-directory action by the caller.
    pub entries_to_restore: Vec<FileToRestore>,
    /// Paths the restore checks refused to overwrite.
    pub cannot_overwrite_entries: Vec<PathBuf>,
}

impl MergeResult {
    /// Creates a result with both lists empty.
    pub fn new() -> Self {
        Self {
            entries_to_restore: Vec::new(),
            cannot_overwrite_entries: Vec::new(),
        }
    }
}
/// Reports whether merging `merge_branch` into `base_branch` would produce conflicts.
///
/// Each branch's `commit_id` is resolved to a commit, and the result is the
/// negation of [`can_merge_commits`] for that pair.
///
/// # Errors
/// Propagates any error from resolving the commits or from the merge check.
pub async fn has_conflicts(
    repo: &LocalRepository,
    base_branch: &Branch,
    merge_branch: &Branch,
) -> Result<bool, OxenError> {
    let base_id = base_branch.commit_id.clone();
    let base_commit = repositories::commits::get_commit_or_head(repo, Some(base_id))?;

    let merge_id = merge_branch.commit_id.clone();
    let merge_commit = repositories::commits::get_commit_or_head(repo, Some(merge_id))?;

    let mergeable = can_merge_commits(repo, &base_commit, &merge_commit).await?;
    Ok(!mergeable)
}
/// Lists the merge conflicts currently recorded for `repo`.
///
/// If the conflict reader cannot be created, the failure is logged at debug
/// level and an empty list is returned rather than an error.
pub fn list_conflicts(repo: &LocalRepository) -> Result<Vec<NodeMergeConflict>, OxenError> {
    let reader = match NodeMergeConflictReader::new(repo) {
        Ok(reader) => reader,
        Err(e) => {
            log::debug!("Error creating NodeMergeConflictReader: {e}");
            return Ok(Vec::new());
        }
    };
    reader.list_conflicts()
}
/// Marks the conflict recorded for `path` as resolved.
///
/// Delegates to `node_merge_conflict_writer::mark_conflict_as_resolved_in_db`.
pub fn mark_conflict_as_resolved(repo: &LocalRepository, path: &Path) -> Result<(), OxenError> {
    node_merge_conflict_writer::mark_conflict_as_resolved_in_db(repo, path)?;
    Ok(())
}
/// Checks whether `merge_commit` can be merged into `base_commit` without conflicts.
///
/// Returns `Ok(true)` when the commits share no common ancestor, when the
/// merge is a fast-forward, or when the three-way comparison finds no
/// conflicts. The comparison is run with `write_to_disk` set to `false`.
pub async fn can_merge_commits(
    repo: &LocalRepository,
    base_commit: &Commit,
    merge_commit: &Commit,
) -> Result<bool, OxenError> {
    let lca = match lowest_common_ancestor_from_commits(repo, base_commit, merge_commit)? {
        Some(ancestor) => ancestor,
        None => return Ok(true),
    };

    let merge_commits = MergeCommits {
        lca: Some(lca),
        base: base_commit.clone(),
        merge: merge_commit.clone(),
    };
    if merge_commits.is_fast_forward_merge() {
        return Ok(true);
    }

    // The shared-hash set is required by the callee but not needed here.
    let mut scratch_hashes = HashSet::new();
    let conflicts = find_merge_conflicts(repo, &merge_commits, false, &mut scratch_hashes).await?;
    Ok(conflicts.is_empty())
}
/// Lists the paths that would conflict when merging `merge_branch` into `base_branch`.
///
/// Resolves both branches to commits via their `commit_id` and delegates to
/// [`list_conflicts_between_commits`].
pub async fn list_conflicts_between_branches(
    repo: &LocalRepository,
    base_branch: &Branch,
    merge_branch: &Branch,
) -> Result<Vec<PathBuf>, OxenError> {
    let base_id = base_branch.commit_id.clone();
    let base_commit = get_commit_or_head(repo, Some(base_id))?;

    let merge_id = merge_branch.commit_id.clone();
    let merge_commit = get_commit_or_head(repo, Some(merge_id))?;

    list_conflicts_between_commits(repo, &base_commit, &merge_commit).await
}
/// Lists the commits between the lowest common ancestor of the two branches
/// and the head branch's commit.
///
/// # Errors
/// Returns an error when the two commits have no common ancestor, or when
/// resolving the commits or listing the history fails.
pub fn list_commits_between_branches(
    repo: &LocalRepository,
    base_branch: &Branch,
    head_branch: &Branch,
) -> Result<Vec<Commit>, OxenError> {
    log::debug!("list_commits_between_branches() base: {base_branch:?} head: {head_branch:?}");

    let base_commit = get_commit_or_head(repo, Some(base_branch.commit_id.clone()))?;
    let head_commit = get_commit_or_head(repo, Some(head_branch.commit_id.clone()))?;

    let lca = match lowest_common_ancestor_from_commits(repo, &base_commit, &head_commit)? {
        Some(ancestor) => ancestor,
        None => {
            let message = format!(
                "Error: head commit {:?} and base commit {:?} have no common ancestor",
                head_commit.id, base_commit.id
            );
            return Err(OxenError::basic_str(message));
        }
    };

    log::debug!(
        "list_commits_between_branches {base_commit:?} -> {head_commit:?} found lca {lca:?}"
    );
    list_between(repo, &lca, &head_commit)
}
/// Lists the commits between the lowest common ancestor of the two commits
/// and `head_commit`.
///
/// # Errors
/// Returns an error when the commits have no common ancestor, or when the
/// ancestor lookup or history listing fails.
pub fn list_commits_between_commits(
    repo: &LocalRepository,
    base_commit: &Commit,
    head_commit: &Commit,
) -> Result<Vec<Commit>, OxenError> {
    log::debug!("list_commits_between_commits()\nbase: {base_commit}\nhead: {head_commit}");

    let lca = match lowest_common_ancestor_from_commits(repo, base_commit, head_commit)? {
        Some(ancestor) => ancestor,
        None => {
            let message = format!(
                "Error: head commit {:?} and base commit {:?} have no common ancestor",
                head_commit.id, base_commit.id
            );
            return Err(OxenError::basic_str(message));
        }
    };

    log::debug!("For commits {base_commit:?} -> {head_commit:?} found lca {lca:?}");
    log::debug!("Reading history from lca to head");
    list_between(repo, &lca, head_commit)
}
/// Lists the paths that would conflict when merging `merge_commit` into `base_commit`.
///
/// Returns an empty list when the commits share no common ancestor. The
/// comparison is run with `write_to_disk` set to `false`, and each reported
/// path is taken from the conflict's `base_entry`.
pub async fn list_conflicts_between_commits(
    repo: &LocalRepository,
    base_commit: &Commit,
    merge_commit: &Commit,
) -> Result<Vec<PathBuf>, OxenError> {
    let lca = match lowest_common_ancestor_from_commits(repo, base_commit, merge_commit)? {
        Some(ancestor) => ancestor,
        None => return Ok(vec![]),
    };

    let merge_commits = MergeCommits {
        lca: Some(lca),
        base: base_commit.clone(),
        merge: merge_commit.clone(),
    };

    // The shared-hash set is required by the callee but not needed here.
    let mut scratch_hashes = HashSet::new();
    let conflicts = find_merge_conflicts(repo, &merge_commits, false, &mut scratch_hashes).await?;

    let mut conflict_paths = Vec::with_capacity(conflicts.len());
    for conflict in conflicts.iter() {
        conflict_paths.push(conflict.base_entry.1.clone());
    }
    Ok(conflict_paths)
}
/// Merges `merge_branch` into `base_branch`.
///
/// Returns `Ok(None)` immediately when both branches already point at the
/// same commit id. Otherwise resolves both commits, looks up their lowest
/// common ancestor, and hands the triple to `merge_commits`.
pub async fn merge_into_base(
    repo: &LocalRepository,
    merge_branch: &Branch,
    base_branch: &Branch,
) -> Result<Option<Commit>, OxenError> {
    log::debug!("merge_into_base merge {merge_branch} into {base_branch}");

    let already_merged = merge_branch.commit_id == base_branch.commit_id;
    if already_merged {
        return Ok(None);
    }

    let base_commit = get_commit_or_head(repo, Some(base_branch.commit_id.clone()))?;
    let merge_commit = get_commit_or_head(repo, Some(merge_branch.commit_id.clone()))?;
    let lca = lowest_common_ancestor_from_commits(repo, &base_commit, &merge_commit)?;
    log::debug!("merge_into_base base: {base_commit:?} merge: {merge_commit:?} lca: {lca:?}");

    merge_commits(
        repo,
        &MergeCommits {
            lca,
            base: base_commit,
            merge: merge_commit,
        },
    )
    .await
}
/// Merges the local branch named `branch_name` into the current head commit.
///
/// # Errors
/// Returns a "local branch not found" error when no branch has that name, and
/// propagates errors from commit lookup and from the merge itself.
pub async fn merge(
    repo: &LocalRepository,
    branch_name: impl AsRef<str>,
) -> Result<Option<Commit>, OxenError> {
    let branch_name = branch_name.as_ref();

    let maybe_branch = repositories::branches::get_by_name(repo, branch_name)?;
    let merge_branch = maybe_branch.ok_or(OxenError::local_branch_not_found(branch_name))?;

    let base_commit = repositories::commits::head_commit(repo)?;
    let merge_commit = get_commit_or_head(repo, Some(merge_branch.commit_id.clone()))?;
    let lca = lowest_common_ancestor_from_commits(repo, &base_commit, &merge_commit)?;

    merge_commits(
        repo,
        &MergeCommits {
            lca,
            base: base_commit,
            merge: merge_commit,
        },
    )
    .await
}
/// Merges `merge_commit` into `base_commit`.
///
/// Looks up the lowest common ancestor of the two commits and delegates to
/// `merge_commits` with cloned copies of both.
pub async fn merge_commit_into_base(
    repo: &LocalRepository,
    merge_commit: &Commit,
    base_commit: &Commit,
) -> Result<Option<Commit>, OxenError> {
    let lca = lowest_common_ancestor_from_commits(repo, base_commit, merge_commit)?;
    log::debug!(
        "merge_commit_into_base has lca {lca:?} for merge commit {merge_commit:?} and base {base_commit:?}"
    );

    let base = base_commit.clone();
    let merge = merge_commit.clone();
    merge_commits(repo, &MergeCommits { lca, base, merge }).await
}
/// Merges `merge_commit` into `base_commit`, producing any merge commit on `branch`.
///
/// Looks up the lowest common ancestor of the two commits and delegates to
/// `merge_commits_on_branch` with cloned copies of both.
pub async fn merge_commit_into_base_on_branch(
    repo: &LocalRepository,
    merge_commit: &Commit,
    base_commit: &Commit,
    branch: &Branch,
) -> Result<Option<Commit>, OxenError> {
    let lca = lowest_common_ancestor_from_commits(repo, base_commit, merge_commit)?;
    log::debug!(
        "merge_commit_into_branch has lca {lca:?} for merge commit {merge_commit:?} and base {base_commit:?}"
    );

    let base = base_commit.clone();
    let merge = merge_commit.clone();
    let merge_commits = MergeCommits { lca, base, merge };
    merge_commits_on_branch(repo, &merge_commits, branch).await
}
/// Checks whether `path` is recorded in the merge-conflict database of `repo`.
///
/// Opens the RocksDB located at `db_path(repo)` and queries it through
/// `NodeMergeConflictDBReader::has_file`.
pub fn has_file(repo: &LocalRepository, path: &Path) -> Result<bool, OxenError> {
    let db_path = db_path(repo);
    log::debug!("Merger::new() DB {db_path:?}");

    let merge_db = {
        let opts = db::key_val::opts::default();
        DB::open(&opts, dunce::simplified(&db_path))?
    };
    NodeMergeConflictDBReader::has_file(&merge_db, path)
}
pub fn remove_conflict_path(repo: &LocalRepository, path: &Path) -> Result<(), OxenError> {
let db_path = db_path(repo);
log::debug!("Merger::new() DB {db_path:?}");
let opts = db::key_val::opts::default();
let merge_db = DB::open(&opts, dunce::simplified(&db_path))?;
let path_str = path.to_str().unwrap();
let key = path_str.as_bytes();
merge_db.delete(key)?;
Ok(())
}
/// Builds the [`MergeCommits`] triple for merging `branch_name` into the current branch.
///
/// The base is the commit resolved from the current branch's name, the merge
/// side is the commit resolved from `branch_name`, and the LCA is looked up
/// from those two.
///
/// # Errors
/// Returns "No current branch" when the repository has no current branch.
pub fn find_merge_commits<S: AsRef<str>>(
    repo: &LocalRepository,
    branch_name: S,
) -> Result<MergeCommits, OxenError> {
    let branch_name = branch_name.as_ref();

    let Some(current_branch) = repositories::branches::current_branch(repo)? else {
        return Err(OxenError::basic_str("No current branch"));
    };

    let base =
        repositories::commits::get_commit_or_head(repo, Some(current_branch.name.clone()))?;
    let merge = get_commit_or_head(repo, Some(branch_name))?;
    let lca = lowest_common_ancestor_from_commits(repo, &base, &merge)?;

    Ok(MergeCommits { lca, base, merge })
}
/// Performs the merge described by `merge_commits`, creating any merge commit on `branch`.
///
/// A fast-forward merge is delegated to `fast_forward_merge`. Otherwise a
/// three-way comparison is run with `write_to_disk` enabled: with no conflicts
/// a merge commit is created and returned; with conflicts they are written to
/// the merge-conflict database and `Ok(None)` is returned.
async fn merge_commits_on_branch(
    repo: &LocalRepository,
    merge_commits: &MergeCommits,
    branch: &Branch,
) -> Result<Option<Commit>, OxenError> {
    let base = &merge_commits.base;
    let incoming = &merge_commits.merge;

    println!("merge_commits_on_branch {} -> {}", base.id, incoming.id);

    let (lca_id, lca_message) = match merge_commits.lca.as_ref() {
        Some(c) => (c.id.as_str(), c.message.as_str()),
        None => ("None", "None"),
    };
    log::debug!(
        "FOUND MERGE COMMITS:\nLCA: {} -> {}\nBASE: {} -> {}\nMerge: {} -> {}",
        lca_id,
        lca_message,
        base.id,
        base.message,
        incoming.id,
        incoming.message,
    );

    if merge_commits.is_fast_forward_merge() {
        return fast_forward_merge(repo, base, incoming).await;
    }

    log::debug!("Three way merge! {} -> {}", base.id, incoming.id);

    let mut shared_hashes = HashSet::new();
    let conflicts = find_merge_conflicts(repo, merge_commits, true, &mut shared_hashes).await?;
    log::debug!("Got {} conflicts", conflicts.len());

    if conflicts.is_empty() {
        log::debug!("creating merge commit on branch {branch:?}");
        let commit =
            create_merge_commit_on_branch(repo, merge_commits, branch, shared_hashes).await?;
        return Ok(Some(commit));
    }

    println!(
        r"
Found {} conflicts, please resolve them before merging.
oxen checkout --theirs path/to/file_1.txt
oxen checkout --ours path/to/file_2.txt
oxen add path/to/file_1.txt path/to/file_2.txt
oxen commit -m 'Merge conflict resolution'
",
        conflicts.len()
    );

    // Persist the conflicts so they can be listed and resolved later.
    let db_path = db_path(repo);
    log::debug!("Merger::new() DB {db_path:?}");
    let opts = db::key_val::opts::default();
    let merge_db = DB::open(&opts, dunce::simplified(&db_path))?;
    node_merge_conflict_writer::write_conflicts_to_disk(
        repo,
        &merge_db,
        incoming,
        base,
        &conflicts,
    )?;
    Ok(None)
}
/// Finds the lowest common ancestor of the current branch and `branch_name`.
///
/// Both sides are resolved to commits by name before delegating to
/// [`lowest_common_ancestor_from_commits`].
///
/// # Errors
/// Returns "No current branch" when the repository has no current branch.
pub fn lowest_common_ancestor(
    repo: &LocalRepository,
    branch_name: impl AsRef<str>,
) -> Result<Option<Commit>, OxenError> {
    let branch_name = branch_name.as_ref();

    let Some(current_branch) = repositories::branches::current_branch(repo)? else {
        return Err(OxenError::basic_str("No current branch"));
    };

    let current_name = current_branch.name.clone();
    let base_commit = repositories::commits::get_commit_or_head(repo, Some(current_name))?;
    let merge_commit = repositories::commits::get_commit_or_head(repo, Some(branch_name))?;

    lowest_common_ancestor_from_commits(repo, &base_commit, &merge_commit)
}
/// Fast-forwards the working directory and HEAD from `base_commit` to `merge_commit`.
///
/// Returns `Ok(None)` when the two commits are equal, otherwise
/// `Ok(Some(merge_commit))` once HEAD has been updated.
///
/// Steps:
/// 1. Load the merge tree (collecting its node hashes) and the base tree
///    (collecting shared hashes and partial nodes).
/// 2. Walk the merge tree to collect the files to restore.
/// 3. Walk the base tree to collect on-disk files absent from the merge tree,
///    which are then removed.
/// 4. Apply the restores and removals, then set the HEAD commit id.
///
/// # Errors
/// Returns a "cannot overwrite files" error if either walk reports files that
/// must not be overwritten (nothing is written in that case), and an error
/// when either tree's root node cannot be loaded.
async fn fast_forward_merge(
    repo: &LocalRepository,
    base_commit: &Commit,
    merge_commit: &Commit,
) -> Result<Option<Commit>, OxenError> {
    log::debug!("FF merge!");
    if base_commit == merge_commit {
        return Ok(None);
    }

    let mut merge_hashes = HashSet::new();
    let Some(merge_tree) = repositories::tree::get_root_with_children_and_node_hashes(
        repo,
        merge_commit,
        None,
        Some(&mut merge_hashes),
        None,
    )?
    else {
        // This lookup is for the merge commit; the message used to say "base".
        return Err(OxenError::basic_str(
            "Cannot get root node for merge commit",
        ));
    };

    let mut shared_hashes = HashSet::new();
    let mut partial_nodes = HashMap::new();
    let Some(base_tree) = repositories::tree::get_root_with_children_and_partial_nodes(
        repo,
        base_commit,
        Some(&merge_hashes),
        None,
        Some(&mut shared_hashes),
        &mut partial_nodes,
    )?
    else {
        // This lookup is for the base commit; the message used to say "merge".
        return Err(OxenError::basic_str("Cannot get root node for base commit"));
    };

    // Collect the files from the merge tree that need restoring.
    let mut merge_tree_results = MergeResult::new();
    let mut seen_files = HashSet::new();
    r_ff_merge_commit(
        repo,
        &merge_tree,
        PathBuf::from(""),
        &mut merge_tree_results,
        &mut partial_nodes,
        &mut shared_hashes,
        &mut seen_files,
    )?;
    if !merge_tree_results.cannot_overwrite_entries.is_empty() {
        return Err(OxenError::cannot_overwrite_files(
            &merge_tree_results.cannot_overwrite_entries,
        ));
    }

    // Collect the base-only files that need removing.
    let mut base_tree_results = MergeResult::new();
    r_ff_base_dir(
        repo,
        &base_tree,
        PathBuf::from(""),
        &mut base_tree_results,
        &mut shared_hashes,
        &mut seen_files,
    )?;
    if !base_tree_results.cannot_overwrite_entries.is_empty() {
        return Err(OxenError::cannot_overwrite_files(
            &base_tree_results.cannot_overwrite_entries,
        ));
    }

    // Both walks passed, so it is safe to touch the working directory.
    let version_store = repo.version_store()?;
    for entry in merge_tree_results.entries_to_restore.iter() {
        restore::restore_file(repo, &entry.file_node, &entry.path, &version_store).await?;
    }
    for entry in base_tree_results.entries_to_restore.iter() {
        util::fs::remove_file(&entry.path)?;
    }

    with_ref_manager(repo, |manager| manager.set_head_commit_id(&merge_commit.id))?;
    Ok(Some(merge_commit.clone()))
}
/// Recursively walks the merge tree and records the files a fast-forward must restore.
///
/// * File nodes are added to `seen_files`. When a partial node exists for the
///   path in `base_files`, the file is queued for restore only if its hash
///   differs and the restore check passes; a failed restore check always puts
///   it in `cannot_overwrite_entries`. Files without a base counterpart are
///   queued or rejected purely on the restore check.
/// * Directory nodes whose hash is in `shared_hashes` are skipped; otherwise
///   the children of every non-shared vnode are visited.
/// * Commit nodes recurse into their root directory.
///
/// # Errors
/// Returns an error for any other node type, or when a restore check fails.
fn r_ff_merge_commit(
    repo: &LocalRepository,
    merge_node: &MerkleTreeNode,
    path: impl AsRef<Path>,
    results: &mut MergeResult,
    base_files: &mut HashMap<PathBuf, PartialNode>,
    shared_hashes: &mut HashSet<MerkleHash>,
    seen_files: &mut HashSet<PathBuf>,
) -> Result<(), OxenError> {
    let path = path.as_ref();
    match &merge_node.node {
        EMerkleTreeNode::File(merge_file_node) => {
            let file_path = path.join(merge_file_node.name());
            seen_files.insert(file_path.clone());

            match base_files.get(&file_path) {
                Some(base_file_node) => {
                    let should_restore = restore::should_restore_partial_node(
                        repo,
                        Some(base_file_node.clone()),
                        merge_file_node,
                        &file_path,
                    )?;
                    let hash_changed = merge_node.hash != base_file_node.hash;
                    if !hash_changed {
                        log::debug!(
                            "Merge entry has not changed, but still !restore: {file_path:?}"
                        );
                    }
                    match (hash_changed, should_restore) {
                        (true, true) => results.entries_to_restore.push(FileToRestore {
                            file_node: merge_file_node.clone(),
                            path: file_path.clone(),
                        }),
                        (_, false) => results.cannot_overwrite_entries.push(file_path.clone()),
                        // Unchanged and safe: nothing to do.
                        (false, true) => {}
                    }
                }
                None => {
                    if restore::should_restore_file(repo, None, merge_file_node, &file_path)? {
                        results.entries_to_restore.push(FileToRestore {
                            file_node: merge_file_node.clone(),
                            path: file_path.clone(),
                        });
                    } else {
                        results.cannot_overwrite_entries.push(file_path.clone());
                    }
                }
            }
        }
        EMerkleTreeNode::Directory(dir_node) => {
            let dir_path = path.join(dir_node.name());
            if shared_hashes.contains(&merge_node.hash) {
                return Ok(());
            }

            // Collect first: the recursion below needs `shared_hashes` mutably.
            let merge_children: Vec<_> = merge_node
                .children
                .iter()
                .filter(|vnode| !shared_hashes.contains(&vnode.hash))
                .flat_map(|vnode| vnode.children.iter().cloned())
                .collect();

            for child in merge_children.iter() {
                log::debug!("r_ff_merge_commit child_path {child}");
                r_ff_merge_commit(
                    repo,
                    child,
                    &dir_path,
                    results,
                    base_files,
                    shared_hashes,
                    seen_files,
                )?;
            }
        }
        EMerkleTreeNode::Commit(_) => {
            let root_dir = repositories::tree::get_root_dir(merge_node)?;
            r_ff_merge_commit(
                repo,
                root_dir,
                path,
                results,
                base_files,
                shared_hashes,
                seen_files,
            )?;
        }
        _ => {
            return Err(OxenError::basic_str(
                "Got an unexpected node type during checkout",
            ));
        }
    }
    Ok(())
}
/// Recursively walks the base tree and records files a fast-forward must remove.
///
/// A file is considered only when its path is not in `merge_files` and it
/// exists on disk under `repo.path`. If the restore check passes, it is
/// recorded in `entries_to_restore` with its full on-disk path; otherwise its
/// repo-relative path goes into `cannot_overwrite_entries`.
///
/// Directory nodes whose hash is in `shared_hashes` are skipped, as are
/// vnodes with shared hashes. Commit nodes recurse into their root directory.
/// Any other node type is logged and ignored.
fn r_ff_base_dir(
    repo: &LocalRepository,
    base_node: &MerkleTreeNode,
    path: impl AsRef<Path>,
    results: &mut MergeResult,
    shared_hashes: &mut HashSet<MerkleHash>,
    merge_files: &mut HashSet<PathBuf>,
) -> Result<(), OxenError> {
    let path = path.as_ref();
    match &base_node.node {
        EMerkleTreeNode::File(base_file_node) => {
            let file_path = path.join(base_file_node.name());
            if merge_files.contains(&file_path) {
                return Ok(());
            }

            let full_path = repo.path.join(&file_path);
            if !full_path.exists() {
                return Ok(());
            }

            if restore::should_restore_file(repo, None, base_file_node, &file_path)? {
                results.entries_to_restore.push(FileToRestore {
                    file_node: base_file_node.clone(),
                    path: full_path,
                });
            } else {
                results.cannot_overwrite_entries.push(file_path);
            }
        }
        EMerkleTreeNode::Directory(dir_node) => {
            let dir_path = path.join(dir_node.name());
            if shared_hashes.contains(&base_node.hash) {
                return Ok(());
            }

            // Collect first: the recursion below needs `shared_hashes` mutably.
            let base_children: Vec<_> = base_node
                .children
                .iter()
                .filter(|vnode| !shared_hashes.contains(&vnode.hash))
                .flat_map(|vnode| vnode.children.iter().cloned())
                .collect();

            for child in base_children.iter() {
                r_ff_base_dir(repo, child, &dir_path, results, shared_hashes, merge_files)?;
            }
        }
        EMerkleTreeNode::Commit(_) => {
            let root_dir = repositories::tree::get_root_dir(base_node)?;
            r_ff_base_dir(repo, root_dir, path, results, shared_hashes, merge_files)?;
        }
        _ => {
            log::debug!("r_ff_base_dir unknown node type");
        }
    }
    Ok(())
}
/// Performs the merge described by `merge_commits`.
///
/// A fast-forward merge is delegated to `fast_forward_merge`. Otherwise a
/// three-way comparison is run with `write_to_disk` enabled: with no conflicts
/// a merge commit is created and returned; with conflicts a resolution hint is
/// printed, the conflicts are written to the merge-conflict database, and
/// `Ok(None)` is returned.
async fn merge_commits(
    repo: &LocalRepository,
    merge_commits: &MergeCommits,
) -> Result<Option<Commit>, OxenError> {
    let base = &merge_commits.base;
    let incoming = &merge_commits.merge;

    println!("Merge commits {} -> {}", base.id, incoming.id);

    let (lca_id, lca_message) = match merge_commits.lca.as_ref() {
        Some(c) => (c.id.as_str(), c.message.as_str()),
        None => ("None", "None"),
    };
    log::debug!(
        "FOUND MERGE COMMITS:\nLCA: {} -> {}\nBASE: {} -> {}\nMerge: {} -> {}",
        lca_id,
        lca_message,
        base.id,
        base.message,
        incoming.id,
        incoming.message,
    );

    if merge_commits.is_fast_forward_merge() {
        return fast_forward_merge(repo, base, incoming).await;
    }

    log::debug!("Three way merge! {} -> {}", base.id, incoming.id);

    let mut shared_hashes = HashSet::new();
    let conflicts = find_merge_conflicts(repo, merge_commits, true, &mut shared_hashes).await?;
    let has_conflicts = !conflicts.is_empty();

    if has_conflicts {
        println!(
            r"
Found {} conflicts, please resolve them before merging.
oxen checkout --theirs path/to/file_1.txt
oxen checkout --ours path/to/file_2.txt
oxen add path/to/file_1.txt path/to/file_2.txt
oxen commit -m 'Merge conflict resolution'
",
            conflicts.len()
        );
    }
    log::debug!("Got {} conflicts", conflicts.len());

    if !has_conflicts {
        let commit = create_merge_commit(repo, merge_commits, shared_hashes).await?;
        return Ok(Some(commit));
    }

    // Persist the conflicts so they can be listed and resolved later.
    let db_path = db_path(repo);
    log::debug!("Merger::new() DB {db_path:?}");
    let opts = db::key_val::opts::default();
    let merge_db = DB::open(&opts, dunce::simplified(&db_path))?;
    node_merge_conflict_writer::write_conflicts_to_disk(
        repo,
        &merge_db,
        incoming,
        base,
        &conflicts,
    )?;
    Ok(None)
}
/// Stages the repository directory and writes a merge commit with two parents.
///
/// The directory is staged through `add::add_dir_except`, which receives
/// `shared_hashes`. The commit's parents are the base commit followed by the
/// merge commit.
async fn create_merge_commit(
    repo: &LocalRepository,
    merge_commits: &MergeCommits,
    shared_hashes: HashSet<MerkleHash>,
) -> Result<Commit, OxenError> {
    let head_commit = repositories::commits::head_commit(repo)?;
    add::add_dir_except(repo, &Some(head_commit), repo.path.clone(), shared_hashes).await?;

    let base_id = &merge_commits.base.id;
    let merge_id = &merge_commits.merge.id;
    let commit_msg = format!("Merge commit {} into {}", merge_id, base_id);
    log::debug!("create_merge_commit {commit_msg}");

    // Parent order matters: base first, then the commit being merged in.
    let parent_ids: Vec<String> = vec![base_id.clone(), merge_id.clone()];
    commit_writer::commit_with_parent_ids(repo, &commit_msg, parent_ids)
}
/// Stages the repository directory, writes a two-parent merge commit whose
/// message names `branch`, and then removes the staged entries under `/`.
///
/// The commit's parents are the base commit followed by the merge commit.
async fn create_merge_commit_on_branch(
    repo: &LocalRepository,
    merge_commits: &MergeCommits,
    branch: &Branch,
    shared_hashes: HashSet<MerkleHash>,
) -> Result<Commit, OxenError> {
    let head_commit = repositories::commits::head_commit(repo)?;
    add::add_dir_except(repo, &Some(head_commit), repo.path.clone(), shared_hashes).await?;

    let base_id = &merge_commits.base.id;
    let merge_id = &merge_commits.merge.id;
    let commit_msg = format!(
        "Merge commit {} into {} on branch {}",
        merge_id, base_id, branch.name
    );
    log::debug!("create_merge_commit_on_branch {commit_msg}");

    // Parent order matters: base first, then the commit being merged in.
    let parent_ids: Vec<String> = vec![base_id.clone(), merge_id.clone()];
    let commit = commit_writer::commit_with_parent_ids(repo, &commit_msg, parent_ids)?;

    // Remove the staged entries recursively from the root path.
    let root = PathBuf::from("/");
    let mut opts = RmOpts::from_path(root.clone());
    opts.staged = true;
    opts.recursive = true;
    rm::remove_staged(repo, &HashSet::from([root]), &opts)?;

    Ok(commit)
}
/// Finds the lowest common ancestor (LCA) of `base_commit` and `merge_commit`.
///
/// Both histories are listed with their depths. Among the commits present in
/// both, the one with the smallest depth measured from `base_commit` is
/// returned. `Ok(None)` means the histories share no commit.
///
/// Unlike the previous implementation, this never panics when the base
/// history is empty, and it does not clone a placeholder commit up front.
///
/// # Errors
/// Propagates any error from listing either commit history.
pub fn lowest_common_ancestor_from_commits(
    repo: &LocalRepository,
    base_commit: &Commit,
    merge_commit: &Commit,
) -> Result<Option<Commit>, OxenError> {
    log::debug!(
        "lowest_common_ancestor_from_commits: base: {} merge: {}",
        base_commit.id,
        merge_commit.id
    );

    let commit_depths_from_head =
        repositories::commits::list_from_with_depth(repo, base_commit.id.as_str())?;
    let commit_depths_from_merge =
        repositories::commits::list_from_with_depth(repo, merge_commit.id.as_str())?;

    // `min_by_key` keeps the first minimum, matching the old strict `<` update rule.
    let lca = commit_depths_from_merge
        .iter()
        .filter_map(|(commit, _)| {
            commit_depths_from_head
                .get(commit)
                .map(|depth| (commit, *depth))
        })
        .min_by_key(|&(_, depth)| depth)
        .map(|(commit, _)| commit.clone());

    log::debug!("lowest_common_ancestor_from_commits: found lca {lca:?}");
    Ok(lca)
}
pub async fn find_merge_conflicts(
repo: &LocalRepository,
merge_commits: &MergeCommits,
write_to_disk: bool,
shared_hashes: &mut HashSet<MerkleHash>,
) -> Result<Vec<NodeMergeConflict>, OxenError> {
log::debug!("finding merge conflicts");
let mut conflicts: Vec<NodeMergeConflict> = vec![];
let mut entries_to_restore: Vec<FileToRestore> = vec![];
let mut cannot_overwrite_entries: Vec<PathBuf> = vec![];
let mut lca_hashes = HashSet::new();
let mut base_hashes = HashSet::new();
let lca = merge_commits
.lca
.clone()
.unwrap_or(merge_commits.base.clone());
let lca_commit_tree = repositories::tree::get_root_with_children_and_node_hashes(
repo,
&lca,
None,
Some(&mut lca_hashes),
None,
)?
.unwrap();
let base_commit_tree = repositories::tree::get_root_with_children_and_node_hashes(
repo,
&merge_commits.base,
Some(&lca_hashes),
Some(&mut base_hashes),
Some(shared_hashes),
)?
.unwrap();
let merge_commit_tree = repositories::tree::get_root_with_children_and_node_hashes(
repo,
&merge_commits.merge,
Some(&base_hashes),
None,
Some(shared_hashes),
)?
.unwrap();
let starting_path = PathBuf::from("");
let lca_entries =
repositories::tree::unique_dir_entries(&starting_path, &lca_commit_tree, shared_hashes)?;
let base_entries =
repositories::tree::unique_dir_entries(&starting_path, &base_commit_tree, shared_hashes)?;
let merge_entries =
repositories::tree::unique_dir_entries(&starting_path, &merge_commit_tree, shared_hashes)?;
log::debug!("lca_entries.len() {}", lca_entries.len());
log::debug!("base_entries.len() {}", base_entries.len());
log::debug!("merge_entries.len() {}", merge_entries.len());
for merge_entry in merge_entries.iter() {
let entry_path = merge_entry.0;
let merge_file_node = merge_entry.1;
if base_entries.contains_key(entry_path) {
let base_file_node = &base_entries[entry_path];
if lca_entries.contains_key(entry_path) {
let lca_file_node = &lca_entries[entry_path];
if base_file_node.combined_hash() == lca_file_node.combined_hash()
&& base_file_node.combined_hash() != merge_file_node.combined_hash()
&& write_to_disk
{
log::debug!("top update entry");
if restore::should_restore_file(
repo,
Some(base_file_node.clone()),
merge_file_node,
entry_path,
)? {
entries_to_restore.push(FileToRestore {
file_node: merge_file_node.clone(),
path: entry_path.clone(),
});
} else {
cannot_overwrite_entries.push(merge_entry.0.clone());
}
}
if base_file_node.combined_hash() != lca_file_node.combined_hash()
&& lca_file_node.combined_hash() != merge_file_node.combined_hash()
&& base_file_node.combined_hash() != merge_file_node.combined_hash()
{
conflicts.push(NodeMergeConflict {
lca_entry: (lca_file_node.to_owned(), entry_path.to_path_buf()),
base_entry: (base_file_node.to_owned(), entry_path.to_path_buf()),
merge_entry: (merge_file_node.to_owned(), entry_path.to_path_buf()),
});
}
} else {
if base_file_node.combined_hash() != merge_file_node.combined_hash() {
conflicts.push(NodeMergeConflict {
lca_entry: (base_file_node.to_owned(), entry_path.to_path_buf()),
base_entry: (base_file_node.to_owned(), entry_path.to_path_buf()),
merge_entry: (merge_file_node.to_owned(), entry_path.to_path_buf()),
});
}
}
} else if write_to_disk {
log::debug!("bottom update entry {entry_path:?}");
let lca_base_node = lca_commit_tree
.get_by_path(entry_path)?
.and_then(|node| node.file().ok());
if lca_base_node.is_some()
&& let Some(parent) = entry_path.parent()
&& let Some(dir_node) = lca_commit_tree.get_by_path(parent)?
{
shared_hashes.remove(&dir_node.hash);
}
if restore::should_restore_file(repo, lca_base_node, merge_file_node, entry_path)? {
entries_to_restore.push(FileToRestore {
file_node: merge_file_node.clone(),
path: entry_path.to_path_buf(),
});
} else {
cannot_overwrite_entries.push(entry_path.clone());
}
}
}
log::debug!("three_way_merge conflicts.len() {}", conflicts.len());
if cannot_overwrite_entries.is_empty() {
let version_store = repo.version_store()?;
for entry in entries_to_restore.iter() {
restore::restore_file(repo, &entry.file_node, &entry.path, &version_store).await?;
if util::fs::is_tabular(&entry.path)
&& let Some(schema) = repositories::data_frames::schemas::get_by_path(
repo,
&merge_commits.merge,
&entry.path,
)?
{
for field in schema.fields {
if let Some(metadata) = field.metadata {
let _ = repositories::data_frames::schemas::add_column_metadata(
repo,
&entry.path,
&field.name,
&metadata,
)?;
}
}
if let Some(metadata) = schema.metadata {
let _ = repositories::data_frames::schemas::add_schema_metadata(
repo,
&entry.path,
&metadata,
)?;
}
}
}
} else {
return Err(OxenError::cannot_overwrite_files(&cannot_overwrite_entries));
}
Ok(conflicts)
}