use bytesize::ByteSize;
use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::str;
use tar::Archive;
use crate::constants::{NODES_DIR, OXEN_HIDDEN_DIR, TREE_DIR};
use crate::core::commit_sync_status;
use crate::core::db::merkle_node::MerkleNodeDB;
use crate::core::db::merkle_node::merkle_node_db::{node_db_path, node_db_prefix};
use crate::core::node_sync_status;
use crate::core::v_latest::index::CommitMerkleTree as CommitMerkleTreeLatest;
use crate::core::v_latest::index::CommitMerkleTree;
use crate::core::v_old::v0_19_0::index::CommitMerkleTree as CommitMerkleTreeV0_19_0;
use crate::core::versions::MinOxenVersion;
use crate::error::OxenError;
use crate::model::merkle_tree::node::{
CommitNode, DirNodeWithPath, EMerkleTreeNode, FileNode, FileNodeWithDir, MerkleTreeNode,
};
use crate::model::{
Commit, EntryDataType, LocalRepository, MerkleHash, MerkleTreeNodeType, PartialNode,
TMerkleTreeNode,
};
use crate::{repositories, util};
/// Loads the root merkle node for `commit` without loading its children.
///
/// Repositories whose minimum version is `V0_19_0` are read with the legacy
/// tree implementation; every other version uses the latest implementation.
pub fn get_root(
    repo: &LocalRepository,
    commit: &Commit,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_19_0) {
        CommitMerkleTreeV0_19_0::root_without_children(repo, commit)
    } else {
        CommitMerkleTreeLatest::root_without_children(repo, commit)
    }
}
/// Loads the root merkle node for `commit` together with its children.
///
/// The reader is selected from the repository's minimum oxen version.
pub fn get_root_with_children(
    repo: &LocalRepository,
    commit: &Commit,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    let is_legacy = matches!(repo.min_version(), MinOxenVersion::V0_19_0);
    if is_legacy {
        return CommitMerkleTreeV0_19_0::root_with_children(repo, commit);
    }
    CommitMerkleTreeLatest::root_with_children(repo, commit)
}
/// Loads the root node with children for `commit`, handing the optional hash
/// sets through to the latest tree reader.
///
/// On the `V0_19_0` path the plain `root_with_children` reader is used and
/// `base_hashes`, `unique_hashes` and `shared_hashes` are not consulted or
/// filled.
pub fn get_root_with_children_and_node_hashes(
    repo: &LocalRepository,
    commit: &Commit,
    base_hashes: Option<&HashSet<MerkleHash>>,
    unique_hashes: Option<&mut HashSet<MerkleHash>>,
    shared_hashes: Option<&mut HashSet<MerkleHash>>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_19_0) {
        // Legacy reader has no hash-collecting variant.
        return CommitMerkleTreeV0_19_0::root_with_children(repo, commit);
    }
    CommitMerkleTreeLatest::root_with_children_and_node_hashes(
        repo,
        commit,
        base_hashes,
        unique_hashes,
        shared_hashes,
    )
}
/// Loads the root node with children for `commit`, passing the hash sets and
/// the `partial_nodes` map through to the latest tree reader.
///
/// On the `V0_19_0` path the plain `root_with_children` reader is used; the
/// hash sets and `partial_nodes` are left untouched.
pub fn get_root_with_children_and_partial_nodes(
    repo: &LocalRepository,
    commit: &Commit,
    base_hashes: Option<&HashSet<MerkleHash>>,
    unique_hashes: Option<&mut HashSet<MerkleHash>>,
    shared_hashes: Option<&mut HashSet<MerkleHash>>,
    partial_nodes: &mut HashMap<PathBuf, PartialNode>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_19_0) {
        // Legacy reader has no partial-node variant.
        return CommitMerkleTreeV0_19_0::root_with_children(repo, commit);
    }
    CommitMerkleTreeLatest::root_with_children_and_partial_nodes(
        repo,
        commit,
        base_hashes,
        unique_hashes,
        shared_hashes,
        partial_nodes,
    )
}
/// Returns the root directory node held by a commit node.
///
/// # Errors
///
/// Fails when `node` is not a commit node, when it does not have exactly one
/// child, or when that single child is not a directory node.
pub fn get_root_dir(node: &MerkleTreeNode) -> Result<&MerkleTreeNode, OxenError> {
    let node_type = node.node.node_type();
    if !matches!(node_type, MerkleTreeNodeType::Commit) {
        return Err(OxenError::basic_str(format!(
            "Expected a commit node, but got: '{:?}'",
            node_type
        )));
    }

    let child_count = node.children.len();
    if child_count != 1 {
        return Err(OxenError::basic_str(format!(
            "Commit node should have exactly one child (root directory) but got: {} from {}",
            child_count, node
        )));
    }

    // Exactly one child is guaranteed by the check above.
    let only_child = &node.children[0];
    match only_child.node.node_type() {
        MerkleTreeNodeType::Dir => Ok(only_child),
        other => Err(OxenError::basic_str(format!(
            "The child of a commit node should be a directory, but got: '{:?}'",
            other
        ))),
    }
}
/// Reads the node identified by `hash`, passing `load_recursive = false` to
/// the version-specific reader.
pub fn get_node_by_id(
    repo: &LocalRepository,
    hash: &MerkleHash,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    const LOAD_RECURSIVE: bool = false;
    if matches!(repo.min_version(), MinOxenVersion::V0_19_0) {
        CommitMerkleTreeV0_19_0::read_node(repo, hash, LOAD_RECURSIVE)
    } else {
        CommitMerkleTreeLatest::read_node(repo, hash, LOAD_RECURSIVE)
    }
}
/// Reads the node identified by `hash`, passing `load_recursive = true` to
/// the version-specific reader.
pub fn get_node_by_id_with_children(
    repo: &LocalRepository,
    hash: &MerkleHash,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    const LOAD_RECURSIVE: bool = true;
    if matches!(repo.min_version(), MinOxenVersion::V0_19_0) {
        CommitMerkleTreeV0_19_0::read_node(repo, hash, LOAD_RECURSIVE)
    } else {
        CommitMerkleTreeLatest::read_node(repo, hash, LOAD_RECURSIVE)
    }
}
/// Returns the version recorded on the commit node stored for `commit`.
///
/// # Errors
///
/// Fails when `commit.id` cannot be parsed, when no node exists for that id,
/// or when the stored node is not a commit node (both of the latter are
/// reported as "commit id does not exist").
pub fn get_commit_node_version(
    repo: &LocalRepository,
    commit: &Commit,
) -> Result<MinOxenVersion, OxenError> {
    let commit_id = commit.id.parse()?;
    let stored = repositories::tree::get_node_by_id(repo, &commit_id)?
        .ok_or_else(|| OxenError::commit_id_does_not_exist(&commit.id))?;

    match &stored.node {
        EMerkleTreeNode::Commit(commit_node) => Ok(commit_node.version()),
        _ => {
            log::error!("Commit node is not a commit node");
            Err(OxenError::commit_id_does_not_exist(&commit.id))
        }
    }
}
/// Reports whether `path` is a key in the directory-hash table of `commit`.
pub fn has_dir(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
) -> Result<bool, OxenError> {
    let dir_path = path.as_ref();
    let known_dirs = CommitMerkleTreeLatest::dir_hashes(repo, commit)?;
    let found = known_dirs.contains_key(dir_path);
    Ok(found)
}
pub fn has_path(
repo: &LocalRepository,
commit: &Commit,
path: impl AsRef<Path>,
) -> Result<bool, OxenError> {
let path = path.as_ref();
let dir_hashes = CommitMerkleTreeLatest::dir_hashes(repo, commit)?;
match dir_hashes.get(path) {
Some(dir_hash) => {
let node = get_node_by_id_with_children(repo, dir_hash)?.unwrap();
Ok(node.get_by_path(path)?.is_some())
}
None => {
let parent = path.parent().unwrap();
if let Some(parent_hash) = dir_hashes.get(parent) {
let node = get_node_by_id_with_children(repo, parent_hash)?.unwrap();
Ok(node.get_by_path(path)?.is_some())
} else {
Ok(false)
}
}
}
}
/// Looks up the node at `path` in `commit` without recursive loading.
///
/// Any error raised by the underlying reader is logged at `warn` level and
/// turned into `Ok(None)`, so this function never returns `Err` itself.
pub fn get_node_by_path(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    let load_recursive = false;
    let lookup = match repo.min_version() {
        MinOxenVersion::V0_19_0 => {
            CommitMerkleTreeV0_19_0::from_path(repo, commit, path, load_recursive)
                .map(|tree| Some(tree.root))
        }
        _ => CommitMerkleTreeLatest::read_from_path(repo, commit, path, load_recursive),
    };

    match lookup {
        Ok(maybe_node) => Ok(maybe_node),
        Err(e) => {
            log::warn!("Error getting node by path: {e:?}");
            Ok(None)
        }
    }
}
/// Looks up the node at `path` in `commit` with recursive loading enabled.
///
/// Unlike `get_node_by_path`, reader errors are propagated to the caller.
/// On the `V0_19_0` path a successful read always yields `Some(root)`.
pub fn get_node_by_path_with_children(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    let load_recursive = true;
    if matches!(repo.min_version(), MinOxenVersion::V0_19_0) {
        let tree = CommitMerkleTreeV0_19_0::from_path(repo, commit, path, load_recursive)?;
        return Ok(Some(tree.root));
    }
    let maybe_node = CommitMerkleTreeLatest::read_from_path(repo, commit, path, load_recursive)?;
    Ok(maybe_node)
}
/// Returns the file node stored at `path` in `commit`.
///
/// Yields `Ok(None)` when nothing is found at `path` or when the node found
/// there is not a file node. Lookup errors are already turned into `None` by
/// `get_node_by_path`.
pub fn get_file_by_path(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
) -> Result<Option<FileNode>, OxenError> {
    let Some(found) = get_node_by_path(repo, commit, &path)? else {
        return Ok(None);
    };
    match found.node {
        // The node is owned here, so the file node can be moved out directly
        // instead of being cloned.
        EMerkleTreeNode::File(file_node) => Ok(Some(file_node)),
        _ => Ok(None),
    }
}
/// Loads the directory node at `path` in `commit` together with its children.
///
/// `dir_hashes` is forwarded only to the latest reader; the `V0_19_0` reader
/// does not take it.
pub fn get_dir_with_children(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
    dir_hashes: Option<&HashMap<PathBuf, MerkleHash>>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    // Guard is held for the duration of the call.
    let _perf = crate::perf_guard!("tree::get_dir_with_children");
    if matches!(repo.min_version(), MinOxenVersion::V0_19_0) {
        CommitMerkleTreeV0_19_0::dir_with_children(repo, commit, path)
    } else {
        CommitMerkleTreeLatest::dir_with_children(repo, commit, path, dir_hashes)
    }
}
/// Loads the directory node at `path` in `commit` without its children.
///
/// `dir_hashes` is forwarded only to the latest reader; the `V0_19_0` reader
/// does not take it.
pub fn get_dir_without_children(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
    dir_hashes: Option<&HashMap<PathBuf, MerkleHash>>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_19_0) {
        return CommitMerkleTreeV0_19_0::dir_without_children(repo, commit, path);
    }
    CommitMerkleTreeLatest::dir_without_children(repo, commit, path, dir_hashes)
}
/// Loads the directory node at `path` in `commit` using the readers'
/// `dir_with_children_recursive` entry point.
///
/// `dir_hashes` is forwarded only to the latest reader; the `V0_19_0` reader
/// does not take it.
pub fn get_dir_with_children_recursive(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
    dir_hashes: Option<&HashMap<PathBuf, MerkleHash>>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_19_0) {
        return CommitMerkleTreeV0_19_0::dir_with_children_recursive(repo, commit, path);
    }
    CommitMerkleTreeLatest::dir_with_children_recursive(repo, commit, path, dir_hashes)
}
/// Loads the directory node at `path` in `commit`, forwarding the hash sets
/// to the latest reader's `dir_with_unique_children`.
///
/// On the `V0_19_0` path the plain `dir_with_children` reader is used and
/// `base_hashes`, `unique_hashes` and `dir_hashes` are ignored.
pub fn get_dir_with_unique_children(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
    base_hashes: &HashSet<MerkleHash>,
    unique_hashes: &mut HashSet<MerkleHash>,
    dir_hashes: Option<&HashMap<PathBuf, MerkleHash>>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_19_0) {
        // Legacy reader has no unique-children variant.
        return CommitMerkleTreeV0_19_0::dir_with_children(repo, commit, path);
    }
    CommitMerkleTreeLatest::dir_with_unique_children(
        repo,
        commit,
        path,
        base_hashes,
        unique_hashes,
        dir_hashes,
    )
}
/// Loads a tree for `commit`, optionally restricted to a subtree path and/or
/// a depth.
///
/// * subtree and depth given: that subtree at that depth.
/// * subtree only: that subtree with depth `-1`.
/// * depth only: the path `"."` at that depth.
/// * neither: the full root with children.
pub fn get_subtree(
    repo: &LocalRepository,
    commit: &Commit,
    maybe_subtree: &Option<PathBuf>,
    maybe_depth: &Option<i32>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if let Some(subtree) = maybe_subtree {
        return match maybe_depth {
            Some(depth) => {
                log::debug!("Getting subtree {subtree:?} with depth {depth} for commit {commit}");
                get_subtree_by_depth(repo, commit, subtree, *depth)
            }
            None => {
                log::debug!("Getting subtree {subtree:?} for commit {commit} with depth -1");
                get_subtree_by_depth(repo, commit, subtree, -1)
            }
        };
    }

    if let Some(depth) = maybe_depth {
        log::debug!("Getting tree from root with depth {depth} for commit {commit}");
        return get_subtree_by_depth(repo, commit, PathBuf::from("."), *depth);
    }

    log::debug!("Getting full tree for commit {commit}");
    get_root_with_children(repo, commit)
}
/// Loads the node at `path` in `commit`, passing `depth` on to the
/// version-specific depth-limited reader.
pub fn get_subtree_by_depth(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
    depth: i32,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_19_0) {
        CommitMerkleTreeV0_19_0::from_path_depth(repo, commit, path, depth)
    } else {
        CommitMerkleTreeLatest::read_depth_from_path(repo, commit, path, depth)
    }
}
/// Loads the node at `path` in `commit` at the given `depth`, forwarding the
/// hash sets to the latest reader's hash-collecting variant.
///
/// On the `V0_19_0` path `from_path_depth` is used and the hash sets are
/// neither read nor filled.
pub fn get_subtree_by_depth_with_unique_children(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
    base_hashes: Option<&HashSet<MerkleHash>>,
    unique_hashes: Option<&mut HashSet<MerkleHash>>,
    shared_hashes: Option<&mut HashSet<MerkleHash>>,
    depth: i32,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_19_0) {
        // Legacy reader has no hash-collecting variant.
        return CommitMerkleTreeV0_19_0::from_path_depth(repo, commit, path, depth);
    }
    CommitMerkleTreeLatest::read_depth_from_path_and_collect_hashes(
        repo,
        commit,
        path,
        base_hashes,
        unique_hashes,
        shared_hashes,
        depth,
    )
}
/// Reads the nodes for `paths` in `commit` and returns them keyed by path,
/// using the reader selected by the repository's minimum oxen version.
pub fn list_nodes_from_paths(
    repo: &LocalRepository,
    commit: &Commit,
    paths: &[PathBuf],
) -> Result<HashMap<PathBuf, MerkleTreeNode>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_19_0) {
        CommitMerkleTreeV0_19_0::read_nodes(repo, commit, paths)
    } else {
        CommitMerkleTreeLatest::read_nodes(repo, commit, paths)
    }
}
/// Collects clones of the grandchildren of a directory node that sit under
/// its VNode children, in iteration order.
///
/// Direct children that are not VNodes are skipped.
///
/// # Errors
///
/// Fails when `node` is not a directory node.
pub fn list_files_and_folders(node: &MerkleTreeNode) -> Result<Vec<MerkleTreeNode>, OxenError> {
    let node_type = node.node.node_type();
    if MerkleTreeNodeType::Dir != node_type {
        return Err(OxenError::basic_str(format!(
            "list_files_and_folders Merkle tree node is not a directory: '{:?}'",
            node_type
        )));
    }

    let entries: Vec<MerkleTreeNode> = node
        .children
        .iter()
        .filter(|child| matches!(&child.node, EMerkleTreeNode::VNode(_)))
        .flat_map(|vnode| vnode.children.iter().cloned())
        .collect();
    Ok(entries)
}
/// Same collection as `list_files_and_folders`, returned as a `HashSet`:
/// clones of every node found under the VNode children of a directory node.
///
/// # Errors
///
/// Fails when `node` is not a directory node.
pub fn list_files_and_folders_set(
    node: &MerkleTreeNode,
) -> Result<HashSet<MerkleTreeNode>, OxenError> {
    let node_type = node.node.node_type();
    if MerkleTreeNodeType::Dir != node_type {
        return Err(OxenError::basic_str(format!(
            "list_files_and_folders Merkle tree node is not a directory: '{:?}'",
            node_type
        )));
    }

    let entries: HashSet<MerkleTreeNode> = node
        .children
        .iter()
        .filter(|child| matches!(&child.node, EMerkleTreeNode::VNode(_)))
        .flat_map(|vnode| vnode.children.iter().cloned())
        .collect();
    Ok(entries)
}
/// Builds a map from entry name to node for the file and directory nodes
/// found under the VNode children of a directory node.
///
/// Entries of any other node type are skipped. If two entries share a name
/// the later one replaces the earlier one.
///
/// # Errors
///
/// Fails when `node` is not a directory node.
pub fn list_files_and_folders_map(
    node: &MerkleTreeNode,
) -> Result<HashMap<PathBuf, MerkleTreeNode>, OxenError> {
    let node_type = node.node.node_type();
    if MerkleTreeNodeType::Dir != node_type {
        return Err(OxenError::basic_str(format!(
            "list_files_and_folders_map Merkle tree node is not a directory: '{:?}'",
            node_type
        )));
    }

    let vnodes = node
        .children
        .iter()
        .filter(|child| matches!(&child.node, EMerkleTreeNode::VNode(_)));

    let mut by_name = HashMap::new();
    for entry in vnodes.flat_map(|vnode| vnode.children.iter()) {
        let key = match &entry.node {
            EMerkleTreeNode::File(file_node) => PathBuf::from(file_node.name()),
            EMerkleTreeNode::Directory(dir_node) => PathBuf::from(dir_node.name()),
            _ => continue,
        };
        by_name.insert(key, entry.clone());
    }
    Ok(by_name)
}
/// Adds to `hashes` the hash of every node that lies along `paths`, except
/// those already present in `starting_node_hashes`.
///
/// The node for the first entry of `paths` is loaded with children and then
/// asked for the nodes along all of `paths`.
///
/// # Errors
///
/// Fails when `paths` is empty, when the first path has no node, or when a
/// reader call fails.
pub fn collect_nodes_along_path(
    repo: &LocalRepository,
    commit: &Commit,
    paths: Vec<PathBuf>,
    starting_node_hashes: &mut HashSet<MerkleHash>,
    hashes: &mut HashSet<MerkleHash>,
) -> Result<(), OxenError> {
    let Some(root_path) = paths.first() else {
        return Err(OxenError::basic_str("No paths provided"));
    };
    let Some(start) = get_node_by_path_with_children(repo, commit, root_path)? else {
        return Err(OxenError::basic_str("Node not found"));
    };

    let (_root_node, along_path) = start.get_nodes_along_paths(paths)?;
    hashes.extend(
        along_path
            .into_iter()
            .map(|n| n.hash)
            .filter(|hash| !starting_node_hashes.contains(hash)),
    );
    Ok(())
}
/// Returns the subset of `hashes` for which `node_sync_status::node_is_synced`
/// reports `false`.
pub fn list_missing_node_hashes(
    repo: &LocalRepository,
    hashes: &HashSet<MerkleHash>,
) -> Result<HashSet<MerkleHash>, OxenError> {
    let missing: HashSet<MerkleHash> = hashes
        .iter()
        .filter(|hash| !node_sync_status::node_is_synced(repo, hash))
        .copied()
        .collect();
    Ok(missing)
}
/// Reads the node `hash` with a depth of 1 and returns the missing file
/// hashes that node reports for `repo`.
///
/// # Errors
///
/// Fails with a "Node … not found" error when the reader yields no node, or
/// when reading or listing fails.
pub async fn list_missing_file_hashes(
    repo: &LocalRepository,
    hash: &MerkleHash,
) -> Result<HashSet<MerkleHash>, OxenError> {
    let maybe_node = match repo.min_version() {
        MinOxenVersion::V0_19_0 => CommitMerkleTreeV0_19_0::read_depth(repo, hash, 1)?,
        _ => CommitMerkleTreeLatest::read_depth(repo, hash, 1)?,
    };
    let node = maybe_node
        .ok_or_else(|| OxenError::basic_str(format!("Node {hash} not found")))?;
    node.list_missing_file_hashes(repo).await
}
pub async fn list_missing_file_hashes_from_commits(
repo: &LocalRepository,
commit_ids: &HashSet<MerkleHash>,
subtree_paths: &Option<Vec<PathBuf>>,
depth: &Option<i32>,
) -> Result<HashSet<MerkleHash>, OxenError> {
log::debug!(
"list_missing_file_hashes_from_commits checking {} commit ids, subtree paths: {:?}, depth: {:?}",
commit_ids.len(),
subtree_paths,
depth
);
let mut candidate_hashes: HashSet<MerkleHash> = HashSet::new();
for commit_id in commit_ids {
let commit_id_str = commit_id.to_string();
let Some(commit) = repositories::commits::get_by_id(repo, &commit_id_str)? else {
log::error!("list_missing_file_hashes_from_commits Commit {commit_id_str} not found");
return Err(OxenError::RevisionNotFound(commit_id_str.into()));
};
if let Some(subtree_paths) = subtree_paths {
for path in subtree_paths {
let Some(tree) =
repositories::tree::get_subtree(repo, &commit, &Some(path.clone()), depth)?
else {
log::warn!("list_missing_file_hashes_from_commits subtree not found for path");
continue;
};
tree.walk_tree(|node| {
if node.is_file() {
candidate_hashes.insert(node.hash);
}
});
}
} else {
let Some(tree) = get_root_with_children(repo, &commit)? else {
log::warn!(
"list_missing_file_hashes_from_commits root not found for commit: {commit:?}"
);
continue;
};
tree.walk_tree(|node| {
if node.is_file() {
candidate_hashes.insert(node.hash);
}
});
}
}
log::debug!(
"list_missing_file_hashes_from_commits candidate_hashes count: {}",
candidate_hashes.len()
);
list_missing_file_hashes_from_hashes(repo, &candidate_hashes).await
}
/// Recursively collects `(file node, path)` pairs for every file beneath
/// `node`, building each path from `base_path`.
///
/// Directory children extend the path with their name; VNode children keep
/// the current path. A file node passed directly yields an empty set.
///
/// # Errors
///
/// Fails when `node` is neither a directory, VNode, commit nor file node.
pub fn dir_entries_with_paths(
    node: &MerkleTreeNode,
    base_path: &PathBuf,
) -> Result<HashSet<(FileNode, PathBuf)>, OxenError> {
    let mut collected = HashSet::new();

    match &node.node {
        EMerkleTreeNode::File(_) => return Ok(collected),
        EMerkleTreeNode::Directory(_)
        | EMerkleTreeNode::VNode(_)
        | EMerkleTreeNode::Commit(_) => {}
        _ => {
            return Err(OxenError::basic_str(format!(
                "Unexpected node type: {:?}",
                node.node.node_type()
            )));
        }
    }

    for child in &node.children {
        match &child.node {
            EMerkleTreeNode::File(file_node) => {
                let file_path = base_path.join(file_node.name());
                collected.insert((file_node.clone(), file_path));
            }
            EMerkleTreeNode::Directory(dir_node) => {
                let nested_base = base_path.join(dir_node.name());
                let nested = dir_entries_with_paths(child, &nested_base)?;
                collected.extend(nested);
            }
            EMerkleTreeNode::VNode(_) => {
                // VNodes do not contribute a path component.
                let nested = dir_entries_with_paths(child, base_path)?;
                collected.extend(nested);
            }
            _ => {}
        }
    }
    Ok(collected)
}
/// Recursively collects a path-to-file-node map for files beneath `node`,
/// skipping any directory, VNode or commit node whose hash is contained in
/// `shared_hashes`.
///
/// Directory children extend the path with their name; VNode children keep
/// the current path. A file node passed directly yields an empty map.
///
/// # Errors
///
/// Fails when `node` is neither a directory, VNode, commit nor file node.
pub fn unique_dir_entries(
    base_path: &PathBuf,
    node: &MerkleTreeNode,
    shared_hashes: &HashSet<MerkleHash>,
) -> Result<HashMap<PathBuf, FileNode>, OxenError> {
    let mut collected = HashMap::new();

    match &node.node {
        EMerkleTreeNode::File(_) => return Ok(collected),
        EMerkleTreeNode::Directory(_)
        | EMerkleTreeNode::VNode(_)
        | EMerkleTreeNode::Commit(_) => {}
        _ => {
            return Err(OxenError::basic_str(format!(
                "Unexpected node type: {:?}",
                node.node.node_type()
            )));
        }
    }

    // A shared container contributes nothing; its subtree is not descended.
    if shared_hashes.contains(&node.hash) {
        return Ok(collected);
    }

    for child in &node.children {
        match &child.node {
            EMerkleTreeNode::File(file_node) => {
                let file_path = base_path.join(file_node.name());
                collected.insert(file_path, file_node.clone());
            }
            EMerkleTreeNode::Directory(dir_node) => {
                let nested_base = base_path.join(dir_node.name());
                let nested = unique_dir_entries(&nested_base, child, shared_hashes)?;
                collected.extend(nested);
            }
            EMerkleTreeNode::VNode(_) => {
                let nested = unique_dir_entries(base_path, child, shared_hashes)?;
                collected.extend(nested);
            }
            _ => {}
        }
    }
    Ok(collected)
}
/// Returns the subset of `hashes` for which
/// `commit_sync_status::commit_is_synced` reports `false`.
pub fn list_unsynced_commit_hashes(
    repo: &LocalRepository,
    hashes: &HashSet<MerkleHash>,
) -> Result<HashSet<MerkleHash>, OxenError> {
    let unsynced: HashSet<MerkleHash> = hashes
        .iter()
        .filter(|hash| !commit_sync_status::commit_is_synced(repo, hash))
        .copied()
        .collect();
    Ok(unsynced)
}
/// Returns the subset of `hashes` that the repository's version store does
/// not report as existing. Hashes are checked one at a time.
async fn list_missing_file_hashes_from_hashes(
    repo: &LocalRepository,
    hashes: &HashSet<MerkleHash>,
) -> Result<HashSet<MerkleHash>, OxenError> {
    let version_store = repo.version_store()?;
    let mut missing = HashSet::new();
    for hash in hashes {
        let present = version_store.version_exists(&hash.to_string()).await?;
        if present {
            continue;
        }
        missing.insert(*hash);
    }
    Ok(missing)
}
/// Collects every file node beneath `node`, each paired with the directory
/// path it was reached through, starting from `subtree_path`.
pub fn list_all_files(
    node: &MerkleTreeNode,
    subtree_path: &PathBuf,
) -> Result<HashSet<FileNodeWithDir>, OxenError> {
    let mut found = HashSet::new();
    r_list_all_files(node, subtree_path, &mut found)?;
    Ok(found)
}
/// Recursive worker for `list_all_files`.
///
/// File children are recorded with `traversed_path` as their directory,
/// directory children are descended into with their name appended, and VNode
/// children are descended into with the path unchanged.
fn r_list_all_files(
    node: &MerkleTreeNode,
    traversed_path: impl AsRef<Path>,
    file_nodes: &mut HashSet<FileNodeWithDir>,
) -> Result<(), OxenError> {
    let current_dir = traversed_path.as_ref();
    for child in &node.children {
        match &child.node {
            EMerkleTreeNode::VNode(_) => {
                r_list_all_files(child, current_dir, file_nodes)?;
            }
            EMerkleTreeNode::Directory(dir_node) => {
                let child_dir = current_dir.join(dir_node.name());
                r_list_all_files(child, child_dir, file_nodes)?;
            }
            EMerkleTreeNode::File(file_node) => {
                let entry = FileNodeWithDir {
                    file_node: file_node.to_owned(),
                    dir: current_dir.to_owned(),
                };
                file_nodes.insert(entry);
            }
            _ => {}
        }
    }
    Ok(())
}
/// Collects every directory node beneath `node`, each paired with its path
/// relative to an empty starting path.
pub fn list_all_dirs(node: &MerkleTreeNode) -> Result<HashSet<DirNodeWithPath>, OxenError> {
    let mut found = HashSet::new();
    let start = PathBuf::new();
    r_list_all_dirs(node, start, &mut found)?;
    Ok(found)
}
/// Recursive worker for `list_all_dirs`.
///
/// Each directory child is recorded under `traversed_path` joined with its
/// name and then descended into; VNode children are descended into with the
/// path unchanged. Other node types are ignored.
fn r_list_all_dirs(
    node: &MerkleTreeNode,
    traversed_path: impl AsRef<Path>,
    dir_nodes: &mut HashSet<DirNodeWithPath>,
) -> Result<(), OxenError> {
    let current_dir = traversed_path.as_ref();
    for child in &node.children {
        match &child.node {
            EMerkleTreeNode::VNode(_) => {
                r_list_all_dirs(child, current_dir, dir_nodes)?;
            }
            EMerkleTreeNode::Directory(dir_node) => {
                let child_dir = current_dir.join(dir_node.name());
                let entry = DirNodeWithPath {
                    dir_node: dir_node.to_owned(),
                    path: child_dir.to_owned(),
                };
                dir_nodes.insert(entry);
                r_list_all_dirs(child, child_dir, dir_nodes)?;
            }
            _ => {}
        }
    }
    Ok(())
}
/// Collects, in one traversal starting from an empty path, all file nodes
/// (with their directory) and all directory nodes (with their path) beneath
/// `root`.
pub fn list_files_and_dirs(
    root: &MerkleTreeNode,
) -> Result<(HashSet<FileNodeWithDir>, HashSet<DirNodeWithPath>), OxenError> {
    let mut files = HashSet::new();
    let mut dirs = HashSet::new();
    let start = PathBuf::new();
    r_list_files_and_dirs(root, start, &mut files, &mut dirs)?;
    Ok((files, dirs))
}
/// Recursive worker for `list_files_and_dirs`.
///
/// If `node` itself is a file it is recorded and the call returns. Otherwise
/// file children are recorded under `traversed_path`, directory children are
/// recorded (unless their joined path is empty) and descended into, and
/// VNode children are descended into with the path unchanged.
fn r_list_files_and_dirs(
    node: &MerkleTreeNode,
    traversed_path: impl AsRef<Path>,
    file_nodes: &mut HashSet<FileNodeWithDir>,
    dir_nodes: &mut HashSet<DirNodeWithPath>,
) -> Result<(), OxenError> {
    let current_dir = traversed_path.as_ref();

    if let EMerkleTreeNode::File(file_node) = &node.node {
        let entry = FileNodeWithDir {
            file_node: file_node.to_owned(),
            dir: current_dir.to_owned(),
        };
        file_nodes.insert(entry);
        return Ok(());
    }

    for child in &node.children {
        match &child.node {
            EMerkleTreeNode::VNode(_) => {
                r_list_files_and_dirs(child, current_dir, file_nodes, dir_nodes)?;
            }
            EMerkleTreeNode::Directory(dir_node) => {
                let child_dir = current_dir.join(dir_node.name());
                // An empty joined path is not recorded as a directory entry.
                if child_dir != Path::new("") {
                    let entry = DirNodeWithPath {
                        dir_node: dir_node.to_owned(),
                        path: child_dir.to_owned(),
                    };
                    dir_nodes.insert(entry);
                }
                r_list_files_and_dirs(child, child_dir, file_nodes, dir_nodes)?;
            }
            EMerkleTreeNode::File(file_node) => {
                let entry = FileNodeWithDir {
                    file_node: file_node.to_owned(),
                    dir: current_dir.to_owned(),
                };
                file_nodes.insert(entry);
            }
            _ => {}
        }
    }
    Ok(())
}
/// Lists the file nodes of `commit` whose data type is
/// `EntryDataType::Tabular`; see `list_files_by_type`.
pub fn list_tabular_files_in_repo(
    repo: &LocalRepository,
    commit: &Commit,
) -> Result<HashSet<FileNode>, OxenError> {
    let tabular = list_files_by_type(repo, commit, &EntryDataType::Tabular)?;
    Ok(tabular)
}
/// Lists the file nodes in the full tree of `commit` whose data type equals
/// `data_type`. Each returned node has its name replaced by its full path.
///
/// When the commit has no root tree a warning is logged and an empty set is
/// returned.
pub fn list_files_by_type(
    repo: &LocalRepository,
    commit: &Commit,
    data_type: &EntryDataType,
) -> Result<HashSet<FileNode>, OxenError> {
    let mut matches = HashSet::new();
    match get_root_with_children(repo, commit)? {
        Some(tree) => {
            r_list_files_by_type(&tree, data_type, &mut matches, PathBuf::new())?;
        }
        None => {
            log::warn!("get_root_with_children returned None for commit: {commit:?}");
        }
    }
    Ok(matches)
}
/// Recursive worker for `list_files_by_type`.
///
/// Matching file children are copied, renamed to `traversed_path` joined with
/// their name (lossily converted to a string) and inserted into `file_nodes`.
/// Directory children are descended into with their name appended; VNode
/// children with the path unchanged.
fn r_list_files_by_type(
    node: &MerkleTreeNode,
    data_type: &EntryDataType,
    file_nodes: &mut HashSet<FileNode>,
    traversed_path: impl AsRef<Path>,
) -> Result<(), OxenError> {
    let current_dir = traversed_path.as_ref();
    for child in &node.children {
        match &child.node {
            EMerkleTreeNode::VNode(_) => {
                r_list_files_by_type(child, data_type, file_nodes, current_dir)?;
            }
            EMerkleTreeNode::Directory(dir_node) => {
                let child_dir = current_dir.join(dir_node.name());
                r_list_files_by_type(child, data_type, file_nodes, child_dir)?;
            }
            EMerkleTreeNode::File(file_node) => {
                if file_node.data_type() != data_type {
                    continue;
                }
                let mut renamed = file_node.to_owned();
                let full_path = current_dir.join(renamed.name());
                renamed.set_name(&full_path.to_string_lossy());
                file_nodes.insert(renamed);
            }
            _ => {}
        }
    }
    Ok(())
}
/// Copies the directory-hash database directory of `original_commit_id` to
/// the location used for `new_commit_id`.
pub fn cp_dir_hashes_to(
    repo: &LocalRepository,
    original_commit_id: &MerkleHash,
    new_commit_id: &MerkleHash,
) -> Result<(), OxenError> {
    let src_id = original_commit_id.to_string();
    let dst_id = new_commit_id.to_string();
    let src_path = CommitMerkleTree::dir_hash_db_path_from_commit_id(repo, &src_id);
    let dst_path = CommitMerkleTree::dir_hash_db_path_from_commit_id(repo, &dst_id);
    util::fs::copy_dir_all(src_path, dst_path)?;
    Ok(())
}
/// Packs the repository's whole nodes directory into an in-memory gzip'd tar
/// archive (fast compression) and returns the bytes.
pub fn compress_tree(repository: &LocalRepository) -> Result<Vec<u8>, OxenError> {
    let encoder = GzEncoder::new(Vec::new(), Compression::fast());
    let mut archive = tar::Builder::new(encoder);
    compress_full_tree(repository, &mut archive)?;
    archive.finish()?;

    let gz = archive.into_inner()?;
    let bytes: Vec<u8> = gz.finish()?;

    // Saturate rather than fail if the length does not fit in a u64.
    let total_size: u64 = u64::try_from(bytes.len()).unwrap_or(u64::MAX);
    log::debug!("Compressed entire tree size is {}", ByteSize::b(total_size));
    Ok(bytes)
}
/// Appends the repository's nodes directory to `tar` under
/// `TREE_DIR/NODES_DIR`. Nothing is appended when the directory is absent.
pub fn compress_full_tree(
    repository: &LocalRepository,
    tar: &mut tar::Builder<GzEncoder<Vec<u8>>>,
) -> Result<(), OxenError> {
    let archive_subdir = Path::new(TREE_DIR).join(NODES_DIR);
    let nodes_dir = repository.path.join(OXEN_HIDDEN_DIR).join(&archive_subdir);
    log::debug!("Compressing tree in dir {nodes_dir:?}");
    if !nodes_dir.exists() {
        return Ok(());
    }
    tar.append_dir_all(&archive_subdir, nodes_dir)?;
    Ok(())
}
/// Packs the on-disk directories of the given node hashes into an in-memory
/// gzip'd tar archive and returns the bytes.
///
/// Each node directory is stored under `TREE_DIR/NODES_DIR/<node prefix>`.
/// Hashes whose directory does not exist are skipped.
pub fn compress_nodes(
    repository: &LocalRepository,
    hashes: &HashSet<MerkleHash>,
) -> Result<Vec<u8>, OxenError> {
    let encoder = GzEncoder::new(Vec::new(), Compression::fast());
    let mut archive = tar::Builder::new(encoder);
    log::debug!("Compressing {} unique nodes...", hashes.len());

    let nodes_root = Path::new(TREE_DIR).join(NODES_DIR);
    for hash in hashes {
        let node_dir = node_db_path(repository, hash);
        if !node_dir.exists() {
            continue;
        }
        let archive_subdir = nodes_root.join(node_db_prefix(hash));
        archive.append_dir_all(&archive_subdir, node_dir)?;
    }

    archive.finish()?;
    let gz = archive.into_inner()?;
    let bytes: Vec<u8> = gz.finish()?;
    Ok(bytes)
}
/// Packs the on-disk directory of a single node into an in-memory gzip'd tar
/// archive and returns the bytes.
///
/// The directory is stored under `TREE_DIR/NODES_DIR/<node prefix>`; when it
/// does not exist the archive is finished without any entries.
pub fn compress_node(
    repository: &LocalRepository,
    hash: &MerkleHash,
) -> Result<Vec<u8>, OxenError> {
    let archive_subdir = Path::new(TREE_DIR)
        .join(NODES_DIR)
        .join(node_db_prefix(hash));

    let encoder = GzEncoder::new(Vec::new(), Compression::fast());
    let mut archive = tar::Builder::new(encoder);

    let node_dir = node_db_path(repository, hash);
    if node_dir.exists() {
        archive.append_dir_all(&archive_subdir, node_dir)?;
    }
    archive.finish()?;

    let gz = archive.into_inner()?;
    let bytes: Vec<u8> = gz.finish()?;

    // Saturate rather than fail if the length does not fit in a u64.
    let total_size: u64 = u64::try_from(bytes.len()).unwrap_or(u64::MAX);
    log::debug!(
        "Compressed node {} size is {}",
        hash,
        ByteSize::b(total_size)
    );
    Ok(bytes)
}
/// Packs the on-disk node directories of the given commits into an in-memory
/// gzip'd tar archive and returns the bytes.
///
/// Each directory is stored under `TREE_DIR/NODES_DIR/<node prefix>`.
/// Commits whose node directory does not exist are skipped.
pub fn compress_commits(
    repository: &LocalRepository,
    commits: &Vec<Commit>,
) -> Result<Vec<u8>, OxenError> {
    let encoder = GzEncoder::new(Vec::new(), Compression::fast());
    let mut archive = tar::Builder::new(encoder);

    let nodes_root = Path::new(TREE_DIR).join(NODES_DIR);
    for commit in commits {
        let hash = commit.hash()?;
        let node_dir = node_db_path(repository, &hash);
        log::debug!("Compressing commit from dir {node_dir:?}");
        if !node_dir.exists() {
            continue;
        }
        let archive_subdir = nodes_root.join(node_db_prefix(&hash));
        archive.append_dir_all(&archive_subdir, node_dir)?;
    }

    archive.finish()?;
    let gz = archive.into_inner()?;
    let bytes: Vec<u8> = gz.finish()?;
    Ok(bytes)
}
/// Unpacks a gzip'd tar archive of node directories into the repository's
/// nodes directory and returns the hashes of the entries that were written.
///
/// Each entry path is joined onto `<repo>/OXEN_HIDDEN_DIR/TREE_DIR/NODES_DIR`.
/// Entries whose destination already exists are skipped. For every unpacked
/// entry whose destination does not end in `node` or `children`, the last two
/// path components are concatenated and parsed as a hash.
///
/// Entries that cannot be read from the archive are logged and skipped.
///
/// # Errors
///
/// Fails when the archive cannot be opened, when an entry path cannot be read
/// or is not valid UTF-8, when an entry path contains anything other than
/// normal or `.` components (so it could resolve outside the nodes
/// directory), when a directory or file cannot be written, or when the
/// derived id does not parse as a hash.
pub fn unpack_nodes(
    repository: &LocalRepository,
    buffer: &[u8],
) -> Result<HashSet<MerkleHash>, OxenError> {
    let mut hashes: HashSet<MerkleHash> = HashSet::new();
    log::debug!("Unpacking nodes from buffer...");
    let decoder = GzDecoder::new(buffer);
    log::debug!("Decoder created");
    let mut archive = Archive::new(decoder);
    log::debug!("Archive created");
    let Ok(entries) = archive.entries() else {
        return Err(OxenError::basic_str(
            "Could not unpack tree database from archive",
        ));
    };
    log::debug!("Extracting entries...");

    // Destination root is the same for every entry, so build it once.
    let nodes_root = repository
        .path
        .join(OXEN_HIDDEN_DIR)
        .join(TREE_DIR)
        .join(NODES_DIR);

    for file in entries {
        let Ok(mut file) = file else {
            log::error!("Could not unpack file in archive...");
            continue;
        };

        // Scoped so the borrow of `file` taken by `path()` ends before unpacking.
        let dst_path = {
            let path = file.path()?;
            // The archive is external input: `Entry::unpack` performs no path
            // sanitisation, so reject `..`, root and prefix components here.
            let is_unsafe = path.components().any(|component| {
                !matches!(
                    component,
                    std::path::Component::Normal(_) | std::path::Component::CurDir
                )
            });
            if is_unsafe {
                return Err(OxenError::basic_str(format!(
                    "Refusing to unpack archive entry with unsafe path: {path:?}"
                )));
            }
            nodes_root.join(path)
        };

        if let Some(parent) = dst_path.parent() {
            util::fs::create_dir_all(parent)?;
        }
        if dst_path.exists() {
            log::debug!("Node already exists at {dst_path:?}");
            continue;
        }
        file.unpack(&dst_path)?;

        if !dst_path.ends_with("node") && !dst_path.ends_with("children") {
            // The id is the last two path components concatenated in order.
            let tail = dst_path
                .components()
                .rev()
                .take(2)
                .map(|component| component.as_os_str().to_str())
                .collect::<Option<Vec<&str>>>()
                .ok_or_else(|| {
                    OxenError::basic_str(format!(
                        "Archive entry path is not valid UTF-8: {dst_path:?}"
                    ))
                })?;
            let id: String = tail.into_iter().rev().collect();
            hashes.insert(id.parse()?);
        }
    }
    Ok(hashes)
}
/// Writes the tree rooted at the commit node `node` into the repository's
/// node database.
///
/// A fresh `CommitNode` is built from the options of the given commit node
/// and used as the root for the recursive write.
///
/// # Errors
///
/// Fails when `node` is not a commit node or when any write fails.
pub fn write_tree(repo: &LocalRepository, node: &MerkleTreeNode) -> Result<(), OxenError> {
    match &node.node {
        EMerkleTreeNode::Commit(source) => {
            let commit_node = CommitNode::new(repo, source.get_opts())?;
            p_write_tree(repo, node, &commit_node)?;
            Ok(())
        }
        _ => Err(OxenError::basic_str("Expected commit node")),
    }
}
fn p_write_tree(
repo: &LocalRepository,
node: &MerkleTreeNode,
node_impl: &impl TMerkleTreeNode,
) -> Result<(), OxenError> {
let parent_id = node.parent_id;
let mut db = MerkleNodeDB::open_read_write(repo, node_impl, parent_id)?;
for child in &node.children {
match &child.node {
EMerkleTreeNode::VNode(vnode) => {
db.add_child(vnode)?;
p_write_tree(repo, child, vnode)?;
}
EMerkleTreeNode::Directory(dir_node) => {
db.add_child(dir_node)?;
p_write_tree(repo, child, dir_node)?;
}
EMerkleTreeNode::File(file_node) => {
db.add_child(file_node)?;
}
node => {
panic!("p_write_tree Unexpected node type: {node:?}");
}
}
}
db.close()?;
Ok(())
}
/// Collects node hashes for every commit in `commits` by calling
/// `get_node_hashes_for_commit` on each, sharing one starting-hash set and
/// one result set across all commits.
///
/// Returns the accumulated result set.
pub fn get_all_node_hashes_for_commits(
    repository: &LocalRepository,
    commits: &[Commit],
    maybe_subtrees: &Option<Vec<PathBuf>>,
    maybe_depth: &Option<i32>,
    is_download: bool,
) -> Result<HashSet<MerkleHash>, OxenError> {
    log::debug!(
        "Getting ALL node hashes for {} commits... and subtree paths {:?}",
        commits.len(),
        maybe_subtrees
    );

    let mut seen_hashes = HashSet::new();
    let mut all_node_hashes: HashSet<MerkleHash> = HashSet::new();
    commits.iter().try_for_each(|commit| {
        get_node_hashes_for_commit(
            repository,
            commit,
            maybe_subtrees,
            maybe_depth,
            is_download,
            &mut seen_hashes,
            &mut all_node_hashes,
        )
    })?;

    log::debug!("All node hashes: {}", all_node_hashes.len());
    Ok(all_node_hashes)
}
/// Collects the node hashes introduced by `commits[1..]` relative to the
/// first commit.
///
/// The starting set is populated from the first commit (once per subtree, or
/// once for the whole tree when no subtrees are given). If there are no
/// further commits, that starting set is returned as-is. Otherwise the
/// hashes gathered by `get_node_hashes_for_commit` for the remaining commits
/// are returned.
///
/// # Errors
///
/// Fails when `commits` is empty or when gathering hashes fails.
pub fn get_node_hashes_between_commits(
    repository: &LocalRepository,
    commits: &[Commit],
    maybe_subtrees: &Option<Vec<PathBuf>>,
    maybe_depth: &Option<i32>,
    is_download: bool,
) -> Result<HashSet<MerkleHash>, OxenError> {
    log::debug!(
        "Getting new node hashes for {} commits... and subtree paths {:?}",
        commits.len(),
        maybe_subtrees
    );
    let Some((first_commit, new_commits)) = commits.split_first() else {
        return Err(OxenError::basic_str("Must provide at least one commit"));
    };

    // One starting point per requested subtree, or a single `None` for the
    // whole tree.
    let starting_points: Vec<Option<PathBuf>> = match maybe_subtrees {
        Some(subtrees) => subtrees.iter().cloned().map(Some).collect(),
        None => vec![None],
    };

    let mut starting_node_hashes = HashSet::new();
    for starting_point in &starting_points {
        populate_starting_hashes(
            repository,
            first_commit,
            starting_point,
            maybe_depth,
            &mut starting_node_hashes,
        )?;
    }

    if new_commits.is_empty() {
        return Ok(starting_node_hashes);
    }

    let mut new_node_hashes: HashSet<MerkleHash> = HashSet::new();
    for commit in new_commits {
        get_node_hashes_for_commit(
            repository,
            commit,
            maybe_subtrees,
            maybe_depth,
            is_download,
            &mut starting_node_hashes,
            &mut new_node_hashes,
        )?;
    }
    log::debug!("New node hashes: {}", new_node_hashes.len());
    Ok(new_node_hashes)
}
pub fn get_node_hashes_for_commit(
repository: &LocalRepository,
commit: &Commit,
maybe_subtrees: &Option<Vec<PathBuf>>,
maybe_depth: &Option<i32>,
is_download: bool,
starting_node_hashes: &mut HashSet<MerkleHash>,
new_node_hashes: &mut HashSet<MerkleHash>,
) -> Result<(), OxenError> {
log::debug!("get_node_hashes_for_commit: {}", commit.id);
if let Some(subtrees) = maybe_subtrees {
let mut all_parent_paths: Vec<PathBuf> = Vec::new();
log::debug!("subtrees: {subtrees:?}");
for subtree_path in subtrees {
let path = subtree_path.clone();
let mut current_path = path.clone();
all_parent_paths.push(current_path.clone());
while let Some(parent) = current_path.parent() {
all_parent_paths.push(parent.to_path_buf());
current_path = parent.to_path_buf();
}
all_parent_paths.reverse();
}
log::debug!("all_parent_paths: {all_parent_paths:?}");
for subtree in subtrees {
get_node_hashes_for_subtree(
repository,
commit,
&Some(subtree.clone()),
maybe_depth,
starting_node_hashes,
new_node_hashes,
)?;
}
log::debug!("new_node_hashes: {}", new_node_hashes.len());
if !is_download {
collect_nodes_along_path(
repository,
commit,
all_parent_paths,
starting_node_hashes,
new_node_hashes,
)?;
}
} else {
get_node_hashes_for_subtree(
repository,
commit,
&None,
maybe_depth,
starting_node_hashes,
new_node_hashes,
)?;
}
log::debug!("new_node_hashes: {}", new_node_hashes.len());
new_node_hashes.insert(commit.hash()?);
Ok(())
}
fn get_node_hashes_for_subtree(
repository: &LocalRepository,
commit: &Commit,
subtree_path: &Option<PathBuf>,
depth: &Option<i32>,
base_hashes: &mut HashSet<MerkleHash>,
new_node_hashes: &mut HashSet<MerkleHash>,
) -> Result<(), OxenError> {
let mut unique_hashes = HashSet::new();
let Ok(Some(_)) = CommitMerkleTreeLatest::read_depth_from_path_and_collect_hashes(
repository,
commit,
subtree_path.clone().unwrap_or(PathBuf::from(".")),
Some(base_hashes),
Some(&mut unique_hashes),
None,
depth.unwrap_or(-1),
) else {
return Ok(());
};
base_hashes.extend(unique_hashes.clone());
new_node_hashes.extend(unique_hashes);
Ok(())
}
/// Fills `unique_hashes` by reading one subtree of `commit`.
///
/// The read starts at `subtree_path` (or `"."` when it is `None`) and is limited
/// to `depth` (or `-1` when it is `None`). No base set is supplied; `unique_hashes`
/// is passed directly as the read's unique-hash collector.
///
/// This function always returns `Ok(())`: the outcome of the read (node found,
/// no node, or error) is not inspected.
/// NOTE(review): a read error is discarded here rather than propagated — confirm
/// that best-effort behaviour is what callers expect.
pub fn populate_starting_hashes(
    repository: &LocalRepository,
    commit: &Commit,
    subtree_path: &Option<PathBuf>,
    depth: &Option<i32>,
    unique_hashes: &mut HashSet<MerkleHash>,
) -> Result<(), OxenError> {
    let start_path = subtree_path
        .clone()
        .unwrap_or_else(|| PathBuf::from("."));
    let max_depth = depth.unwrap_or(-1);

    // Only the side effect on `unique_hashes` matters; the returned node/error is dropped.
    let _ = CommitMerkleTreeLatest::read_depth_from_path_and_collect_hashes(
        repository,
        commit,
        start_path,
        None,
        Some(unique_hashes),
        None,
        max_depth,
    );

    Ok(())
}
pub fn get_ancestor_nodes(
repo: &LocalRepository,
commit: &Commit,
subtree_paths: &Vec<PathBuf>,
unique_hashes: &mut HashSet<MerkleHash>,
) -> Result<(), OxenError> {
for subtree_path in subtree_paths {
let parent = subtree_path.parent();
match parent {
Some(parent) => {
let ancestors = parent
.ancestors()
.map(|p| p.to_path_buf())
.collect::<Vec<PathBuf>>();
for ancestor in ancestors.iter().rev() {
let Some(node) =
repositories::tree::get_node_by_path_with_children(repo, commit, ancestor)?
else {
return Err(OxenError::basic_str(format!(
"Ancestor {ancestor:?} for subtree path {subtree_path:?} not found in merkle tree"
)));
};
let ancestor_node_hashes = node.list_dir_and_vnode_hashes()?;
log::debug!(
"get_ancestor_nodes found {} hashes for subtree path {:?}",
ancestor_node_hashes.len(),
subtree_path
);
unique_hashes.extend(ancestor_node_hashes);
}
}
None => {
break; }
}
}
Ok(())
}
pub fn print_tree(repo: &LocalRepository, commit: &Commit) -> Result<(), OxenError> {
let tree = get_root_with_children(repo, commit)?.unwrap();
match repo.min_version() {
MinOxenVersion::V0_19_0 => {
CommitMerkleTreeV0_19_0::print_node(&tree);
}
_ => {
CommitMerkleTreeLatest::print_node(&tree);
}
}
Ok(())
}
pub fn print_tree_depth_subtree(
repo: &LocalRepository,
commit: &Commit,
depth: i32,
subtree: &PathBuf,
) -> Result<(), OxenError> {
let tree = get_subtree_by_depth(repo, commit, subtree, depth)?.unwrap();
match repo.min_version() {
MinOxenVersion::V0_19_0 => {
CommitMerkleTreeV0_19_0::print_node_depth(&tree, depth);
}
_ => {
CommitMerkleTreeLatest::print_node_depth(&tree, depth);
}
}
Ok(())
}
pub fn print_tree_path(
repo: &LocalRepository,
commit: &Commit,
path: impl AsRef<Path>,
) -> Result<(), OxenError> {
let tree = get_node_by_path_with_children(repo, commit, path)?.unwrap();
match repo.min_version() {
MinOxenVersion::V0_19_0 => {
CommitMerkleTreeV0_19_0::print_node(&tree);
}
_ => {
CommitMerkleTreeLatest::print_node(&tree);
}
}
Ok(())
}
pub fn print_tree_depth(
repo: &LocalRepository,
commit: &Commit,
depth: i32,
) -> Result<(), OxenError> {
let tree = get_root_with_children(repo, commit)?.unwrap();
match repo.min_version() {
MinOxenVersion::V0_19_0 => {
CommitMerkleTreeV0_19_0::print_node_depth(&tree, depth);
}
_ => {
CommitMerkleTreeLatest::print_node_depth(&tree, depth);
}
}
Ok(())
}
#[cfg(test)]
mod tests {
    use crate::error::OxenError;
    use crate::opts::RmOpts;
    use crate::repositories;
    use crate::test;
    use crate::util;
    use std::path::PathBuf;

    /// Commits a mix of tabular and non-tabular files and checks the number of
    /// tabular files reported after the initial commit, after adding one more
    /// file, and after removing the nested directory.
    #[tokio::test]
    async fn test_list_tabular_files_in_repo() -> Result<(), OxenError> {
        test::run_empty_local_repo_test_async(|repo| async move {
            let tab_separated = "1\t2\t3\nhello\tworld\tsup\n";

            // Nested directory: data/train/images/cats
            let nested: PathBuf = ["data", "train", "images", "cats"].iter().collect();
            let cats_dir = repo.path.join(nested);
            util::fs::create_dir_all(&cats_dir)?;

            // Files inside the nested directory.
            for (name, contents) in [
                ("cats.tsv", tab_separated),
                ("dogs.csv", "1,2,3\nhello,world,sup\n"),
                ("README.md", "readme...."),
            ] {
                util::fs::write(cats_dir.join(name), contents)?;
            }

            // Files at the repository root; both carry tab-separated content.
            for name in ["labels.tsv", "labels.txt"] {
                util::fs::write(repo.path.join(name), tab_separated)?;
            }

            repositories::add(&repo, &repo.path).await?;
            let first_commit = repositories::commit(&repo, "Adding all the data")?;
            let tabular = repositories::tree::list_tabular_files_in_repo(&repo, &first_commit)?;
            assert_eq!(tabular.len(), 3);

            // One more tabular file at the root.
            util::fs::write(repo.path.join("dogs.tsv"), tab_separated)?;
            repositories::add(&repo, &repo.path).await?;
            let second_commit = repositories::commit(&repo, "Adding additional file")?;
            let tabular = repositories::tree::list_tabular_files_in_repo(&repo, &second_commit)?;
            assert_eq!(tabular.len(), 4);

            // Drop the nested directory from disk and from the repository.
            util::fs::remove_dir_all(&cats_dir)?;
            let mut rm_opts = RmOpts::from_path(cats_dir);
            rm_opts.recursive = true;
            repositories::rm(&repo, &rm_opts)?;
            let third_commit = repositories::commit(&repo, "Removing dir")?;
            let tabular = repositories::tree::list_tabular_files_in_repo(&repo, &third_commit)?;
            assert_eq!(tabular.len(), 2);

            Ok(())
        })
        .await
    }

    /// Two files with identical contents must both be staged and both be
    /// present in the tree after committing.
    #[tokio::test]
    async fn test_merkle_two_files_same_hash() -> Result<(), OxenError> {
        test::run_empty_local_repo_test_async(|local_repo| async move {
            let names = ["hi.txt", "bye.txt"];
            let shared_contents = "the same file";

            // Write both files first, then stage them in the same order.
            let full_paths = names.map(|name| local_repo.path.join(name));
            for full_path in &full_paths {
                test::write_txt_file_to_path(full_path, shared_contents)?;
            }
            for full_path in &full_paths {
                repositories::add(&local_repo, full_path).await?;
            }

            let repo_status = repositories::status(&local_repo)?;
            log::debug!("staged files here are {:?}", repo_status.staged_files);
            assert_eq!(repo_status.staged_files.len(), 2);
            for name in names {
                assert!(repo_status.staged_files.contains_key(&PathBuf::from(name)));
            }

            let commit = repositories::commit(&local_repo, "add two files")?;
            for name in names {
                assert!(repositories::tree::has_path(
                    &local_repo,
                    &commit,
                    PathBuf::from(name)
                )?);
            }

            Ok(())
        })
        .await
    }

    /// Builds two commits on top of the training-data fixture and checks the
    /// number of node hashes reported with no filter, with a subtree filter,
    /// and with a depth filter.
    #[tokio::test]
    async fn test_get_node_hashes_between_commits() -> Result<(), OxenError> {
        test::run_local_repo_training_data_committed_async(|repo| async move {
            let base_commit = repositories::commits::head_commit(&repo)?;
            println!("Starting commit: {}", base_commit.id);

            // First commit: one file at the root and one in a new directory.
            let root_file = repo.path.join("new_file1.txt");
            let nested_file = repo.path.join("new_dir1").join("new_file2.txt");
            util::fs::create_dir_all(nested_file.parent().unwrap())?;
            util::fs::write(&root_file, "This is new file 1 content")?;
            util::fs::write(&nested_file, "This is new file 2 content")?;
            repositories::add(&repo, &root_file).await?;
            repositories::add(&repo, &nested_file).await?;
            let first_commit = repositories::commit(&repo, "Add first batch of new files")?;

            // Second commit: a CSV in another new directory.
            let csv_file = repo.path.join("new_dir2").join("new_file3.csv");
            util::fs::create_dir_all(csv_file.parent().unwrap())?;
            util::fs::write(&csv_file, "col1,col2,col3\nval1,val2,val3\n")?;
            repositories::add(&repo, &csv_file).await?;
            let second_commit = repositories::commit(&repo, "Add second batch of new files")?;

            let commits = vec![base_commit, first_commit.clone(), second_commit.clone()];

            // No subtree filter, no depth filter.
            let all_hashes =
                super::get_node_hashes_between_commits(&repo, &commits, &None, &None, false)?;
            let all_count = all_hashes.len();
            assert!(
                all_count == 10,
                "Should have found 10 unique node hashes for new commits, got {}",
                all_count
            );
            assert!(
                all_hashes.contains(&first_commit.hash()?),
                "Should include first new commit hash"
            );
            assert!(
                all_hashes.contains(&second_commit.hash()?),
                "Should include second new commit hash"
            );

            // Restricted to the `new_dir1` subtree.
            let only_new_dir1 = Some(vec![PathBuf::from("new_dir1")]);
            let subtree_count = super::get_node_hashes_between_commits(
                &repo,
                &commits,
                &only_new_dir1,
                &None,
                false,
            )?
            .len();
            assert!(
                subtree_count == 8,
                "Subtree filter should return 8 hashes, got {}",
                subtree_count
            );

            // Restricted to depth 0.
            let depth_count =
                super::get_node_hashes_between_commits(&repo, &commits, &None, &Some(0), false)?
                    .len();
            assert!(
                depth_count == 6,
                "Depth filter should return 6 hashes, got {}",
                depth_count
            );

            Ok(())
        })
        .await
    }
}