use bytesize::ByteSize;
use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use std::collections::{HashMap, HashSet};
use std::io::{Read, Write};
use std::path::{Component, Path, PathBuf};
use std::str;
use tar::Archive;
use crate::constants::{NODES_DIR, OXEN_HIDDEN_DIR, TREE_DIR};
use crate::core::db::merkle_node::MerkleNodeDB;
use crate::core::db::merkle_node::merkle_node_db::{MerkleDbError, node_db_path};
use crate::core::node_sync_status;
use crate::core::v_latest::index::CommitMerkleTree as CommitMerkleTreeLatest;
use crate::core::v_latest::index::CommitMerkleTree;
use crate::core::v_old::v0_19_0::index::CommitMerkleTree as CommitMerkleTreeV0_19_0;
use crate::core::versions::MinOxenVersion;
use crate::error::OxenError;
use crate::model::merkle_tree::merkle_transport::{PackOptions, UnpackOptions};
use crate::model::merkle_tree::node::{
CommitNode, DirNodeWithPath, EMerkleTreeNode, FileNode, FileNodeWithDir, MerkleTreeNode,
};
use crate::model::{
Commit, EntryDataType, LocalRepository, MerkleHash, MerkleTreeNodeType, PartialNode,
TMerkleTreeNode,
};
use crate::util::fs::AtomicFile;
use crate::{repositories, util};
/// Load the commit (root) node of `commit`'s Merkle tree without loading its children.
///
/// Repositories whose minimum version is `V0_19_0` are read with the legacy tree reader;
/// every other version uses the latest reader. The returned `Option` is passed through
/// unchanged from whichever reader handled the call.
pub fn get_root(
    repo: &LocalRepository,
    commit: &Commit,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if repo.min_version() == MinOxenVersion::V0_19_0 {
        CommitMerkleTreeV0_19_0::root_without_children(repo, commit)
    } else {
        CommitMerkleTreeLatest::root_without_children(repo, commit)
    }
}
/// Load the commit (root) node of `commit`'s Merkle tree together with its children.
///
/// Dispatches on the repository's minimum version: `V0_19_0` uses the legacy reader,
/// anything newer uses the latest reader.
pub fn get_root_with_children(
    repo: &LocalRepository,
    commit: &Commit,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if repo.min_version() == MinOxenVersion::V0_19_0 {
        CommitMerkleTreeV0_19_0::root_with_children(repo, commit)
    } else {
        CommitMerkleTreeLatest::root_with_children(repo, commit)
    }
}
/// Load `commit`'s tree with children, forwarding the hash-tracking sets to the reader.
///
/// `base_hashes`, `unique_hashes` and `shared_hashes` are only used by the latest tree
/// reader. For `V0_19_0` repositories they are ignored: the tree is loaded with plain
/// `root_with_children`, and the mutable sets are left untouched.
pub fn get_root_with_children_and_node_hashes(
    repo: &LocalRepository,
    commit: &Commit,
    base_hashes: Option<&HashSet<MerkleHash>>,
    unique_hashes: Option<&mut HashSet<MerkleHash>>,
    shared_hashes: Option<&mut HashSet<MerkleHash>>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if repo.min_version() == MinOxenVersion::V0_19_0 {
        // The legacy reader has no hash-bookkeeping variant.
        return CommitMerkleTreeV0_19_0::root_with_children(repo, commit);
    }
    CommitMerkleTreeLatest::root_with_children_and_node_hashes(
        repo,
        commit,
        base_hashes,
        unique_hashes,
        shared_hashes,
    )
}
/// Load `commit`'s tree with children, forwarding the hash-tracking sets and the
/// `partial_nodes` map to the reader.
///
/// Only the latest tree reader consumes the extra arguments. For `V0_19_0` repositories
/// the tree is loaded with plain `root_with_children`, and the sets and map are not
/// modified.
pub fn get_root_with_children_and_partial_nodes(
    repo: &LocalRepository,
    commit: &Commit,
    base_hashes: Option<&HashSet<MerkleHash>>,
    unique_hashes: Option<&mut HashSet<MerkleHash>>,
    shared_hashes: Option<&mut HashSet<MerkleHash>>,
    partial_nodes: &mut HashMap<PathBuf, PartialNode>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if repo.min_version() == MinOxenVersion::V0_19_0 {
        // The legacy reader has no partial-node support.
        return CommitMerkleTreeV0_19_0::root_with_children(repo, commit);
    }
    CommitMerkleTreeLatest::root_with_children_and_partial_nodes(
        repo,
        commit,
        base_hashes,
        unique_hashes,
        shared_hashes,
        partial_nodes,
    )
}
/// Return the root directory node hanging off a commit node.
///
/// # Errors
/// Returns an error if:
/// - `node` is not a `Commit` node;
/// - it does not have exactly one child;
/// - that child is not a `Dir` node.
pub fn get_root_dir(node: &MerkleTreeNode) -> Result<&MerkleTreeNode, OxenError> {
    let node_type = node.node.node_type();
    if node_type != MerkleTreeNodeType::Commit {
        return Err(OxenError::basic_str(format!(
            "Expected a commit node, but got: '{node_type:?}'"
        )));
    }

    let child_count = node.children.len();
    if child_count != 1 {
        return Err(OxenError::basic_str(format!(
            "Commit node should have exactly one child (root directory) but got: {} from {}",
            child_count, node
        )));
    }

    let root_dir = &node.children[0];
    let child_type = root_dir.node.node_type();
    if child_type != MerkleTreeNodeType::Dir {
        return Err(OxenError::basic_str(format!(
            "The child of a commit node should be a directory, but got: '{child_type:?}'"
        )));
    }
    Ok(root_dir)
}
/// Read the node identified by `hash` without recursively loading its descendants.
pub fn get_node_by_id(
    repo: &LocalRepository,
    hash: &MerkleHash,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    const LOAD_RECURSIVE: bool = false;
    if repo.min_version() == MinOxenVersion::V0_19_0 {
        CommitMerkleTreeV0_19_0::read_node(repo, hash, LOAD_RECURSIVE)
    } else {
        CommitMerkleTreeLatest::read_node(repo, hash, LOAD_RECURSIVE)
    }
}
/// Read the node identified by `hash`, asking the reader to load its descendants
/// recursively.
pub fn get_node_by_id_with_children(
    repo: &LocalRepository,
    hash: &MerkleHash,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    const LOAD_RECURSIVE: bool = true;
    if repo.min_version() == MinOxenVersion::V0_19_0 {
        CommitMerkleTreeV0_19_0::read_node(repo, hash, LOAD_RECURSIVE)
    } else {
        CommitMerkleTreeLatest::read_node(repo, hash, LOAD_RECURSIVE)
    }
}
pub fn get_commit_node_version(
repo: &LocalRepository,
commit: &Commit,
) -> Result<MinOxenVersion, OxenError> {
let commit_id = commit.id.parse()?;
let Some(commit_node) = repositories::tree::get_node_by_id(repo, &commit_id)? else {
return Err(OxenError::commit_id_does_not_exist(&commit.id));
};
let EMerkleTreeNode::Commit(commit_node) = &commit_node.node else {
log::error!("Commit node is not a commit node");
return Err(OxenError::commit_id_does_not_exist(&commit.id));
};
Ok(commit_node.version())
}
/// Report whether `path` is a key in `commit`'s directory-hash index.
///
/// Always consults the latest tree implementation's index, regardless of the
/// repository's minimum version.
pub fn has_dir(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
) -> Result<bool, OxenError> {
    let index = CommitMerkleTreeLatest::dir_hashes(repo, commit)?;
    let dir = path.as_ref();
    Ok(index.contains_key(dir))
}
pub fn has_path(
repo: &LocalRepository,
commit: &Commit,
path: impl AsRef<Path>,
) -> Result<bool, OxenError> {
let path = path.as_ref();
let dir_hashes = CommitMerkleTreeLatest::dir_hashes(repo, commit)?;
match dir_hashes.get(path) {
Some(dir_hash) => {
let node = get_node_by_id_with_children(repo, dir_hash)?.unwrap();
Ok(node.get_by_path(path)?.is_some())
}
None => {
let parent = path.parent().unwrap();
if let Some(parent_hash) = dir_hashes.get(parent) {
let node = get_node_by_id_with_children(repo, parent_hash)?.unwrap();
Ok(node.get_by_path(path)?.is_some())
} else {
Ok(false)
}
}
}
}
/// Look up the node at `path` in `commit`'s tree without loading its children.
///
/// Lookup failures are not propagated: they are logged at `warn` level and reported as
/// `Ok(None)`.
pub fn get_node_by_path(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    let load_recursive = false;
    let lookup = if repo.min_version() == MinOxenVersion::V0_19_0 {
        CommitMerkleTreeV0_19_0::from_path(repo, commit, path, load_recursive)
            .map(|tree| Some(tree.root))
    } else {
        CommitMerkleTreeLatest::read_from_path(repo, commit, path, load_recursive)
    };
    lookup.or_else(|e| {
        log::warn!("Error getting node by path: {e:?}");
        Ok(None)
    })
}
/// Look up the node at `path` in `commit`'s tree, loading its children recursively.
///
/// Unlike `get_node_by_path`, lookup errors are propagated to the caller.
pub fn get_node_by_path_with_children(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    let load_recursive = true;
    if repo.min_version() == MinOxenVersion::V0_19_0 {
        let tree = CommitMerkleTreeV0_19_0::from_path(repo, commit, path, load_recursive)?;
        return Ok(Some(tree.root));
    }
    let found = CommitMerkleTreeLatest::read_from_path(repo, commit, path, load_recursive)?;
    Ok(found)
}
/// Return the file node at `path` in `commit`, if that path names a file.
///
/// Yields `Ok(None)` when `get_node_by_path` finds nothing, or when the node found is
/// not a file node.
pub fn get_file_by_path(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
) -> Result<Option<FileNode>, OxenError> {
    let found = get_node_by_path(repo, commit, &path)?;
    // `found` is owned, so the file node can be moved out instead of cloned.
    Ok(found.and_then(|node| match node.node {
        EMerkleTreeNode::File(file_node) => Some(file_node),
        _ => None,
    }))
}
/// Load the directory at `path` in `commit` with its immediate children.
///
/// `dir_hashes` is a pre-fetched directory-hash index. Only the latest reader uses it;
/// the `V0_19_0` reader ignores it. The call is wrapped in a performance guard.
pub fn get_dir_with_children(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
    dir_hashes: Option<&HashMap<PathBuf, MerkleHash>>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    // Bound to a named variable so the guard lives until the function returns.
    let _perf = crate::perf_guard!("tree::get_dir_with_children");
    if repo.min_version() == MinOxenVersion::V0_19_0 {
        CommitMerkleTreeV0_19_0::dir_with_children(repo, commit, path)
    } else {
        CommitMerkleTreeLatest::dir_with_children(repo, commit, path, dir_hashes)
    }
}
/// Load the directory node at `path` in `commit` without its children.
///
/// `dir_hashes` is forwarded only to the latest reader; the `V0_19_0` reader ignores it.
pub fn get_dir_without_children(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
    dir_hashes: Option<&HashMap<PathBuf, MerkleHash>>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if repo.min_version() == MinOxenVersion::V0_19_0 {
        CommitMerkleTreeV0_19_0::dir_without_children(repo, commit, path)
    } else {
        CommitMerkleTreeLatest::dir_without_children(repo, commit, path, dir_hashes)
    }
}
/// Load the directory at `path` in `commit` together with its entire subtree.
///
/// `dir_hashes` is forwarded only to the latest reader; the `V0_19_0` reader ignores it.
pub fn get_dir_with_children_recursive(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
    dir_hashes: Option<&HashMap<PathBuf, MerkleHash>>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if repo.min_version() == MinOxenVersion::V0_19_0 {
        CommitMerkleTreeV0_19_0::dir_with_children_recursive(repo, commit, path)
    } else {
        CommitMerkleTreeLatest::dir_with_children_recursive(repo, commit, path, dir_hashes)
    }
}
/// Load part or all of `commit`'s tree, depending on which limits are supplied.
///
/// | subtree | depth | behaviour                                        |
/// |---------|-------|--------------------------------------------------|
/// | none    | none  | full tree via `get_root_with_children`           |
/// | none    | `d`   | from `"."` to depth `d`                          |
/// | `p`     | none  | from `p` with depth `-1`                         |
/// | `p`     | `d`   | from `p` to depth `d`                            |
pub fn get_subtree(
    repo: &LocalRepository,
    commit: &Commit,
    maybe_subtree: &Option<PathBuf>,
    maybe_depth: &Option<i32>,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    match (maybe_subtree.as_ref(), *maybe_depth) {
        (None, None) => {
            log::debug!("Getting full tree for commit {commit}");
            get_root_with_children(repo, commit)
        }
        (None, Some(depth)) => {
            log::debug!("Getting tree from root with depth {depth} for commit {commit}");
            get_subtree_by_depth(repo, commit, PathBuf::from("."), depth)
        }
        (Some(subtree), None) => {
            log::debug!("Getting subtree {subtree:?} for commit {commit} with depth -1");
            get_subtree_by_depth(repo, commit, subtree, -1)
        }
        (Some(subtree), Some(depth)) => {
            log::debug!("Getting subtree {subtree:?} with depth {depth} for commit {commit}");
            get_subtree_by_depth(repo, commit, subtree, depth)
        }
    }
}
/// Load the subtree rooted at `path` in `commit`, limited to `depth` levels.
///
/// The meaning of `depth` (including negative values) is defined by the underlying
/// version-specific reader.
pub fn get_subtree_by_depth(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
    depth: i32,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if repo.min_version() == MinOxenVersion::V0_19_0 {
        return CommitMerkleTreeV0_19_0::from_path_depth(repo, commit, path, depth);
    }
    CommitMerkleTreeLatest::read_depth_from_path(repo, commit, path, depth)
}
/// Load the subtree at `path` to `depth` levels, forwarding the hash-tracking sets.
///
/// Only the latest reader uses `base_hashes`, `unique_hashes` and `shared_hashes`. For
/// `V0_19_0` repositories the subtree is read with `from_path_depth`, and the sets are
/// left untouched.
pub fn get_subtree_by_depth_with_unique_children(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
    base_hashes: Option<&HashSet<MerkleHash>>,
    unique_hashes: Option<&mut HashSet<MerkleHash>>,
    shared_hashes: Option<&mut HashSet<MerkleHash>>,
    depth: i32,
) -> Result<Option<MerkleTreeNode>, OxenError> {
    if repo.min_version() == MinOxenVersion::V0_19_0 {
        return CommitMerkleTreeV0_19_0::from_path_depth(repo, commit, path, depth);
    }
    CommitMerkleTreeLatest::read_depth_from_path_and_collect_hashes(
        repo,
        commit,
        path,
        base_hashes,
        unique_hashes,
        shared_hashes,
        depth,
    )
}
/// Read the nodes at each of `paths` in `commit`, keyed by path, using the
/// version-appropriate tree reader.
pub fn list_nodes_from_paths(
    repo: &LocalRepository,
    commit: &Commit,
    paths: &[PathBuf],
) -> Result<HashMap<PathBuf, MerkleTreeNode>, OxenError> {
    if repo.min_version() == MinOxenVersion::V0_19_0 {
        CommitMerkleTreeV0_19_0::read_nodes(repo, commit, paths)
    } else {
        CommitMerkleTreeLatest::read_nodes(repo, commit, paths)
    }
}
/// Return clones of the entries stored under a directory node's VNode children.
///
/// Only the grandchildren reached through `VNode` children are returned; non-VNode
/// direct children are ignored.
///
/// # Errors
/// Returns an error if `node` is not a `Dir` node.
pub fn list_files_and_folders(node: &MerkleTreeNode) -> Result<Vec<MerkleTreeNode>, OxenError> {
    let node_type = node.node.node_type();
    if node_type != MerkleTreeNodeType::Dir {
        return Err(OxenError::basic_str(format!(
            "list_files_and_folders Merkle tree node is not a directory: '{node_type:?}'"
        )));
    }
    let entries = node
        .children
        .iter()
        .filter(|child| matches!(child.node, EMerkleTreeNode::VNode(_)))
        .flat_map(|vnode| vnode.children.iter().cloned())
        .collect();
    Ok(entries)
}
/// Map each file or directory entry under a directory node's VNodes to a clone of its
/// node, keyed by the entry's name.
///
/// Entries that are neither files nor directories are skipped. If two entries share a
/// name, the one visited last wins.
///
/// # Errors
/// Returns an error if `node` is not a `Dir` node.
pub fn list_files_and_folders_map(
    node: &MerkleTreeNode,
) -> Result<HashMap<PathBuf, MerkleTreeNode>, OxenError> {
    let node_type = node.node.node_type();
    if node_type != MerkleTreeNodeType::Dir {
        return Err(OxenError::basic_str(format!(
            "list_files_and_folders_map Merkle tree node is not a directory: '{node_type:?}'"
        )));
    }
    let entries = node
        .children
        .iter()
        .filter(|child| matches!(child.node, EMerkleTreeNode::VNode(_)))
        .flat_map(|vnode| vnode.children.iter())
        .filter_map(|entry| {
            let name = match &entry.node {
                EMerkleTreeNode::File(file_node) => PathBuf::from(file_node.name()),
                EMerkleTreeNode::Directory(dir_node) => PathBuf::from(dir_node.name()),
                _ => return None,
            };
            Some((name, entry.clone()))
        })
        .collect();
    Ok(entries)
}
/// Add to `hashes` every node returned by `get_nodes_along_paths`, except those already
/// in `starting_node_hashes`.
///
/// The first entry of `paths` is loaded (with children) as the traversal root.
///
/// # Errors
/// - `"No paths provided"` if `paths` is empty.
/// - `"Node not found"` if the first path does not resolve.
/// - Any error from loading or traversing the tree.
pub fn collect_nodes_along_path(
    repo: &LocalRepository,
    commit: &Commit,
    paths: Vec<PathBuf>,
    starting_node_hashes: &mut HashSet<MerkleHash>,
    hashes: &mut HashSet<MerkleHash>,
) -> Result<(), OxenError> {
    let Some(root_path) = paths.first() else {
        return Err(OxenError::basic_str("No paths provided"));
    };
    let Some(root) = get_node_by_path_with_children(repo, commit, root_path)? else {
        return Err(OxenError::basic_str("Node not found"));
    };
    let (_root_node, visited) = root.get_nodes_along_paths(paths)?;
    hashes.extend(
        visited
            .into_iter()
            .map(|visited_node| visited_node.hash)
            .filter(|hash| !starting_node_hashes.contains(hash)),
    );
    Ok(())
}
/// Return the subset of `hashes` that `node_sync_status::node_is_synced` reports as
/// not synced.
pub fn list_missing_node_hashes(
    repo: &LocalRepository,
    hashes: &HashSet<MerkleHash>,
) -> Result<HashSet<MerkleHash>, OxenError> {
    let unsynced = hashes
        .iter()
        .filter(|hash| !node_sync_status::node_is_synced(repo, *hash))
        .copied()
        .collect();
    Ok(unsynced)
}
/// Read node `hash` to depth 1 and return the file hashes it reports as missing.
///
/// # Errors
/// Returns `"Node {hash} not found"` if the node cannot be read, and propagates reader
/// errors.
pub async fn list_missing_file_hashes(
    repo: &LocalRepository,
    hash: &MerkleHash,
) -> Result<HashSet<MerkleHash>, OxenError> {
    match repo.min_version() {
        MinOxenVersion::V0_19_0 => {
            let Some(node) = CommitMerkleTreeV0_19_0::read_depth(repo, hash, 1)? else {
                return Err(OxenError::basic_str(format!("Node {hash} not found")));
            };
            node.list_missing_file_hashes(repo).await
        }
        _ => {
            let Some(node) = CommitMerkleTreeLatest::read_depth(repo, hash, 1)? else {
                return Err(OxenError::basic_str(format!("Node {hash} not found")));
            };
            node.list_missing_file_hashes(repo).await
        }
    }
}
/// Collect file hashes referenced by `commit_ids` whose content is missing from the
/// repository's version store.
///
/// How candidates are gathered for each commit:
/// - With `subtree_paths`: every listed subtree is loaded via `get_subtree` with
///   `depth`.
/// - Otherwise: the full tree is loaded.
/// - Every file node's hash becomes a candidate.
///
/// Candidates are then checked against the version store.
///
/// # Errors
/// - Returns `OxenError::RevisionNotFound` if a commit id cannot be resolved.
/// - Propagates tree-loading and version-store errors.
///
/// Subtrees or roots that cannot be found are logged and skipped.
pub async fn list_missing_file_hashes_from_commits(
    repo: &LocalRepository,
    commit_ids: &HashSet<MerkleHash>,
    subtree_paths: &Option<Vec<PathBuf>>,
    depth: &Option<i32>,
) -> Result<HashSet<MerkleHash>, OxenError> {
    log::debug!(
        "list_missing_file_hashes_from_commits checking {} commit ids, subtree paths: {:?}, depth: {:?}",
        commit_ids.len(),
        subtree_paths,
        depth
    );
    let mut candidate_hashes: HashSet<MerkleHash> = HashSet::new();
    for commit_id in commit_ids {
        let commit_id_str = commit_id.to_string();
        let Some(commit) = repositories::commits::get_by_id(repo, &commit_id_str)? else {
            log::error!("list_missing_file_hashes_from_commits Commit {commit_id_str} not found");
            return Err(OxenError::RevisionNotFound(commit_id_str.into()));
        };
        if let Some(subtree_paths) = subtree_paths {
            for path in subtree_paths {
                let Some(tree) =
                    repositories::tree::get_subtree(repo, &commit, &Some(path.clone()), depth)?
                else {
                    // Include the path and commit; otherwise this warning is not actionable.
                    log::warn!(
                        "list_missing_file_hashes_from_commits subtree not found for path {path:?} in commit {commit_id_str}"
                    );
                    continue;
                };
                tree.walk_tree(|node| {
                    if node.is_file() {
                        candidate_hashes.insert(node.hash);
                    }
                });
            }
        } else {
            let Some(tree) = get_root_with_children(repo, &commit)? else {
                log::warn!(
                    "list_missing_file_hashes_from_commits root not found for commit: {commit:?}"
                );
                continue;
            };
            tree.walk_tree(|node| {
                if node.is_file() {
                    candidate_hashes.insert(node.hash);
                }
            });
        }
    }
    log::debug!(
        "list_missing_file_hashes_from_commits candidate_hashes count: {}",
        candidate_hashes.len()
    );
    list_missing_file_hashes_from_hashes(repo, &candidate_hashes).await
}
/// Files and directories gathered from a Merkle subtree by `dir_entries_with_paths`.
#[derive(Debug, Default)]
pub struct DirEntries {
/// Each file node paired with its path (traversal base path joined with the file name).
pub files: HashSet<(FileNode, PathBuf)>,
/// Paths of the directory nodes visited, including the starting node's path when the
/// starting node is itself a directory.
pub dirs: HashSet<PathBuf>,
}
/// Walk the subtree under `node` and collect every file (with its full path) and every
/// directory path, rooted at `base_path`.
///
/// How the walk treats each node kind:
/// - Directories extend the path with their name.
/// - VNodes are transparent and keep the current path.
/// - Any other node kinds are ignored.
pub fn dir_entries_with_paths(node: &MerkleTreeNode, base_path: &Path) -> DirEntries {
    // Accumulate into one `DirEntries` instead of merging per-level results.
    fn collect(node: &MerkleTreeNode, base: &Path, acc: &mut DirEntries) {
        if let EMerkleTreeNode::Directory(_) = &node.node {
            acc.dirs.insert(base.to_path_buf());
        }
        for child in node.children.iter() {
            match &child.node {
                EMerkleTreeNode::File(file_node) => {
                    acc.files
                        .insert((file_node.clone(), base.join(file_node.name())));
                }
                EMerkleTreeNode::Directory(dir_node) => {
                    collect(child, &base.join(dir_node.name()), acc);
                }
                EMerkleTreeNode::VNode(_) => collect(child, base, acc),
                _ => {}
            }
        }
    }

    let mut entries = DirEntries::default();
    collect(node, base_path, &mut entries);
    entries
}
/// Collect files (keyed by full path) from the parts of the subtree under `node` whose
/// container hashes are not in `shared_hashes`.
///
/// Traversal rules:
/// - A Directory/VNode/Commit node whose hash is shared contributes nothing; its
///   children are not visited.
/// - A File node contributes nothing by itself; files are picked up from their parent.
///
/// # Errors
/// Returns `"Unexpected node type"` for any other node kind.
pub fn unique_dir_entries(
    base_path: &Path,
    node: &MerkleTreeNode,
    shared_hashes: &HashSet<MerkleHash>,
) -> Result<HashMap<PathBuf, FileNode>, OxenError> {
    match &node.node {
        EMerkleTreeNode::File(_) => return Ok(HashMap::new()),
        EMerkleTreeNode::Directory(_) | EMerkleTreeNode::VNode(_) | EMerkleTreeNode::Commit(_) => {}
        _ => {
            return Err(OxenError::basic_str(format!(
                "Unexpected node type: {:?}",
                node.node.node_type()
            )));
        }
    }

    let mut entries = HashMap::new();
    if shared_hashes.contains(&node.hash) {
        return Ok(entries);
    }
    for child in node.children.iter() {
        match &child.node {
            EMerkleTreeNode::File(file_node) => {
                entries.insert(base_path.join(file_node.name()), file_node.clone());
            }
            EMerkleTreeNode::Directory(dir_node) => {
                let child_base = base_path.join(dir_node.name());
                let nested = unique_dir_entries(&child_base, child, shared_hashes)?;
                entries.extend(nested);
            }
            EMerkleTreeNode::VNode(_) => {
                let nested = unique_dir_entries(base_path, child, shared_hashes)?;
                entries.extend(nested);
            }
            _ => {}
        }
    }
    Ok(entries)
}
/// Return the hashes in `hashes` for which the version store has no stored version.
///
/// Each hash is checked one at a time, in iteration order.
async fn list_missing_file_hashes_from_hashes(
    repo: &LocalRepository,
    hashes: &HashSet<MerkleHash>,
) -> Result<HashSet<MerkleHash>, OxenError> {
    let version_store = repo.version_store();
    let mut missing = HashSet::new();
    for hash in hashes.iter() {
        let key = hash.to_string();
        let exists = version_store.version_exists(&key).await?;
        if !exists {
            missing.insert(*hash);
        }
    }
    Ok(missing)
}
/// List every file under `node`, each paired with its containing directory.
///
/// Directory paths are built starting from `subtree_path`.
pub fn list_all_files(
    node: &MerkleTreeNode,
    subtree_path: &PathBuf,
) -> Result<HashSet<FileNodeWithDir>, OxenError> {
    let mut collected = HashSet::new();
    r_list_all_files(node, subtree_path.as_path(), &mut collected)?;
    Ok(collected)
}
/// Recursive helper for `list_all_files`: add files below `node` to `file_nodes`.
///
/// Directories extend `traversed_path` with their name; VNodes keep it unchanged.
fn r_list_all_files(
    node: &MerkleTreeNode,
    traversed_path: impl AsRef<Path>,
    file_nodes: &mut HashSet<FileNodeWithDir>,
) -> Result<(), OxenError> {
    let dir = traversed_path.as_ref();
    for child in node.children.iter() {
        match &child.node {
            EMerkleTreeNode::VNode(_) => r_list_all_files(child, dir, file_nodes)?,
            EMerkleTreeNode::Directory(dir_node) => {
                r_list_all_files(child, dir.join(dir_node.name()), file_nodes)?
            }
            EMerkleTreeNode::File(file_node) => {
                file_nodes.insert(FileNodeWithDir {
                    file_node: file_node.to_owned(),
                    dir: dir.to_path_buf(),
                });
            }
            _ => {}
        }
    }
    Ok(())
}
pub fn list_all_dirs(node: &MerkleTreeNode) -> Result<HashSet<DirNodeWithPath>, OxenError> {
let mut dir_nodes = HashSet::new();
r_list_all_dirs(node, PathBuf::from(""), &mut dir_nodes)?;
Ok(dir_nodes)
}
/// Recursive helper for `list_all_dirs`.
///
/// Records each directory child with its path, then descends into it. VNodes are
/// traversed without extending the path.
fn r_list_all_dirs(
    node: &MerkleTreeNode,
    traversed_path: impl AsRef<Path>,
    dir_nodes: &mut HashSet<DirNodeWithPath>,
) -> Result<(), OxenError> {
    let parent = traversed_path.as_ref();
    for child in node.children.iter() {
        match &child.node {
            EMerkleTreeNode::VNode(_) => r_list_all_dirs(child, parent, dir_nodes)?,
            EMerkleTreeNode::Directory(dir_node) => {
                let dir_path = parent.join(dir_node.name());
                dir_nodes.insert(DirNodeWithPath {
                    dir_node: dir_node.to_owned(),
                    path: dir_path.clone(),
                });
                r_list_all_dirs(child, &dir_path, dir_nodes)?;
            }
            _ => {}
        }
    }
    Ok(())
}
pub fn list_files_and_dirs(
root: &MerkleTreeNode,
) -> Result<(HashSet<FileNodeWithDir>, HashSet<DirNodeWithPath>), OxenError> {
let mut file_nodes = HashSet::new();
let mut dir_nodes = HashSet::new();
r_list_files_and_dirs(root, PathBuf::new(), &mut file_nodes, &mut dir_nodes)?;
Ok((file_nodes, dir_nodes))
}
/// Recursive helper for `list_files_and_dirs`.
///
/// Behaviour by node kind:
/// - If `node` is itself a file, it is recorded under `traversed_path` and recursion
///   stops.
/// - Directory children are recorded (unless their joined path is empty, i.e. the root
///   directory) and then descended into.
/// - VNodes are traversed without changing the path.
fn r_list_files_and_dirs(
    node: &MerkleTreeNode,
    traversed_path: impl AsRef<Path>,
    file_nodes: &mut HashSet<FileNodeWithDir>,
    dir_nodes: &mut HashSet<DirNodeWithPath>,
) -> Result<(), OxenError> {
    let here = traversed_path.as_ref();
    if let EMerkleTreeNode::File(file_node) = &node.node {
        file_nodes.insert(FileNodeWithDir {
            file_node: file_node.to_owned(),
            dir: here.to_owned(),
        });
        return Ok(());
    }
    for child in node.children.iter() {
        match &child.node {
            EMerkleTreeNode::VNode(_) => {
                r_list_files_and_dirs(child, here, file_nodes, dir_nodes)?;
            }
            EMerkleTreeNode::Directory(dir_node) => {
                let dir_path = here.join(dir_node.name());
                // The root directory joins to "" and is intentionally not reported.
                if dir_path != Path::new("") {
                    dir_nodes.insert(DirNodeWithPath {
                        dir_node: dir_node.to_owned(),
                        path: dir_path.clone(),
                    });
                }
                r_list_files_and_dirs(child, &dir_path, file_nodes, dir_nodes)?;
            }
            EMerkleTreeNode::File(file_node) => {
                file_nodes.insert(FileNodeWithDir {
                    file_node: file_node.to_owned(),
                    dir: here.to_owned(),
                });
            }
            _ => {}
        }
    }
    Ok(())
}
/// List every file in `commit` whose data type is `EntryDataType::Tabular`.
///
/// Each returned node's name is rewritten to its full path.
pub fn list_tabular_files_in_repo(
    repo: &LocalRepository,
    commit: &Commit,
) -> Result<HashSet<FileNode>, OxenError> {
    list_files_by_type(repo, commit, &EntryDataType::Tabular)
}
/// List every file in `commit` whose data type equals `data_type`.
///
/// Each returned node's name is replaced by its full path from the tree root. If the
/// commit has no root, a warning is logged and an empty set is returned.
pub fn list_files_by_type(
    repo: &LocalRepository,
    commit: &Commit,
    data_type: &EntryDataType,
) -> Result<HashSet<FileNode>, OxenError> {
    let mut matches = HashSet::new();
    match get_root_with_children(repo, commit)? {
        Some(tree) => r_list_files_by_type(&tree, data_type, &mut matches, PathBuf::new())?,
        None => log::warn!("get_root_with_children returned None for commit: {commit:?}"),
    }
    Ok(matches)
}
/// Recursive helper for `list_files_by_type`.
///
/// Adds matching files with their name set to `traversed_path/<name>`. Directories
/// extend the path; VNodes leave it unchanged.
fn r_list_files_by_type(
    node: &MerkleTreeNode,
    data_type: &EntryDataType,
    file_nodes: &mut HashSet<FileNode>,
    traversed_path: impl AsRef<Path>,
) -> Result<(), OxenError> {
    let prefix = traversed_path.as_ref();
    for child in node.children.iter() {
        match &child.node {
            EMerkleTreeNode::VNode(_) => {
                r_list_files_by_type(child, data_type, file_nodes, prefix)?;
            }
            EMerkleTreeNode::Directory(dir_node) => {
                r_list_files_by_type(child, data_type, file_nodes, prefix.join(dir_node.name()))?;
            }
            EMerkleTreeNode::File(file_node) if file_node.data_type() == data_type => {
                let full_path = prefix.join(file_node.name());
                let mut renamed = file_node.to_owned();
                renamed.set_name(&full_path.to_string_lossy());
                file_nodes.insert(renamed);
            }
            _ => {}
        }
    }
    Ok(())
}
/// Copy the directory-hash database of `original_commit_id` to the location used by
/// `new_commit_id`.
pub fn cp_dir_hashes_to(
    repo: &LocalRepository,
    original_commit_id: &MerkleHash,
    new_commit_id: &MerkleHash,
) -> Result<(), OxenError> {
    let [source, destination] = [original_commit_id, new_commit_id]
        .map(|id| CommitMerkleTree::dir_hash_db_path_from_commit_id(repo, &id.to_string()));
    util::fs::copy_dir_all(source, destination)?;
    Ok(())
}
/// Build a gzipped tar of the repository's whole `tree/nodes` directory in memory.
///
/// A missing nodes directory is tolerated and produces an empty archive.
pub fn compress_tree(repository: &LocalRepository) -> Result<Vec<u8>, OxenError> {
    let mut tarball = Vec::new();
    write_all_tar(&repository.path, &mut tarball, true)?;
    let size_bytes: u64 = u64::try_from(tarball.len()).unwrap_or(u64::MAX);
    log::debug!("Compressed entire tree size is {}", ByteSize::b(size_bytes));
    Ok(tarball)
}
/// Build an in-memory gzipped tar of the node directories for `hashes`.
///
/// Uses the server-canonical layout (`tree/nodes/...`). Hashes without a node directory
/// are logged and skipped.
pub fn compress_nodes(
    repository: &LocalRepository,
    hashes: &HashSet<MerkleHash>,
) -> Result<Vec<u8>, OxenError> {
    log::debug!("Compressing {} unique nodes...", hashes.len());
    let skip_missing_node_dir = true;
    let mut packed: Vec<u8> = Vec::new();
    write_hashes_tar(
        &repository.path,
        hashes,
        PackOptions::ServerCanonical,
        &mut packed,
        skip_missing_node_dir,
    )?;
    Ok(packed)
}
/// Build an in-memory gzipped tar containing the single node `hash`, logging its size.
pub fn compress_node(
    repository: &LocalRepository,
    hash: &MerkleHash,
) -> Result<Vec<u8>, OxenError> {
    let just_this_node = HashSet::from([*hash]);
    let packed = compress_nodes(repository, &just_this_node)?;
    let size_bytes: u64 = u64::try_from(packed.len()).unwrap_or(u64::MAX);
    log::debug!(
        "Compressed node {} size is {}",
        hash,
        ByteSize::b(size_bytes)
    );
    Ok(packed)
}
/// Build an in-memory gzipped tar of the commit nodes for `commits`.
///
/// # Errors
/// Fails on the first commit whose hash cannot be computed.
pub fn compress_commits(
    repository: &LocalRepository,
    commits: &[Commit],
) -> Result<Vec<u8>, OxenError> {
    let mut commit_hashes = HashSet::with_capacity(commits.len());
    for commit in commits {
        commit_hashes.insert(commit.hash()?);
    }
    compress_nodes(repository, &commit_hashes)
}
/// Extract an in-memory gzipped node tar into the repository's `.oxen` directory.
///
/// Existing node files are not overwritten. Returns the node hashes whose directories
/// were extracted.
pub fn unpack_nodes(
    repository: &LocalRepository,
    buffer: &[u8],
) -> Result<HashSet<MerkleHash>, OxenError> {
    let oxen_hidden = repository.path.join(OXEN_HIDDEN_DIR);
    let extracted = extract_tar_under(buffer, &oxen_hidden, false)?;
    Ok(extracted)
}
/// Write a gzipped tar of the node directories for `hashes` to `out`.
///
/// Archive layout depends on `opts`:
/// - `ServerCanonical`: entries are rooted at `tree/nodes/{prefix}`.
/// - `LegacyClientPush`: entries are rooted at `{prefix}`.
///
/// `opts` also selects the gzip compression level.
///
/// # Errors
/// - A missing node directory is an error (`MissingNodeDir`) unless
///   `skip_missing_node_dir` is set, in which case it is logged and skipped.
/// - Tar/IO errors are propagated.
fn write_hashes_tar<W: Write>(
    repo_path: &Path,
    hashes: &HashSet<MerkleHash>,
    opts: PackOptions,
    out: W,
    skip_missing_node_dir: bool,
) -> Result<(), MerkleDbError> {
    let mut builder = tar::Builder::new(GzEncoder::new(out, pack_options_compression(opts)));
    for hash in hashes.iter() {
        let dir_prefix = hash.to_hex_hash().node_db_prefix();
        let node_dir = node_db_path(repo_path, hash);
        if !node_dir.exists() {
            if !skip_missing_node_dir {
                return Err(MerkleDbError::MissingNodeDir(*hash));
            }
            log::warn!("Skipping missing node dir for hash {}", hash.to_hex_hash());
            continue;
        }
        let archive_dir: PathBuf = match opts {
            PackOptions::ServerCanonical => Path::new(TREE_DIR).join(NODES_DIR).join(&dir_prefix),
            PackOptions::LegacyClientPush => PathBuf::from(&dir_prefix),
        };
        builder.append_dir_all(&archive_dir, node_dir)?;
    }
    // Write the tar trailer, then the gzip trailer.
    builder.finish()?;
    let encoder = builder.into_inner()?;
    encoder.finish()?;
    Ok(())
}
/// Choose the gzip compression level for a pack layout.
///
/// Server-canonical packs use the fast level; legacy client pushes use the library
/// default.
fn pack_options_compression(opts: PackOptions) -> Compression {
    match opts {
        PackOptions::LegacyClientPush => Compression::default(),
        PackOptions::ServerCanonical => Compression::fast(),
    }
}
/// Write a gzipped tar of the repository's entire `.oxen/tree/nodes` directory to
/// `out`, rooted at `tree/nodes` inside the archive.
///
/// If the nodes directory is missing:
/// - with `skip_missing_node_dir`, a warning is logged and an empty archive is written;
/// - otherwise `MissingTreeNodesDir` is returned.
fn write_all_tar<W: Write>(
    repo_path: &Path,
    out: W,
    skip_missing_node_dir: bool,
) -> Result<(), MerkleDbError> {
    // The builder is created before the existence check, as in the original flow.
    let mut builder = tar::Builder::new(GzEncoder::new(out, Compression::fast()));
    let archive_root = Path::new(TREE_DIR).join(NODES_DIR);
    let nodes_dir = repo_path.join(OXEN_HIDDEN_DIR).join(TREE_DIR).join(NODES_DIR);
    if !nodes_dir.exists() {
        if !skip_missing_node_dir {
            return Err(MerkleDbError::MissingTreeNodesDir);
        }
        log::warn!("Missing oxen tree/nodes dir in this repository: resulting in empty tarball");
    } else {
        builder.append_dir_all(&archive_root, nodes_dir)?;
    }
    builder.finish()?;
    let encoder = builder.into_inner()?;
    encoder.finish()?;
    Ok(())
}
fn extract_tar_under<R: Read>(
reader: R,
oxen_hidden: &Path,
overwrite_existing: bool,
) -> Result<HashSet<MerkleHash>, MerkleDbError> {
let mut hashes: HashSet<MerkleHash> = HashSet::new();
let decoder = GzDecoder::new(reader);
let mut archive = Archive::new(decoder);
let entries = archive.entries().map_err(MerkleDbError::CannotReadMerkle)?;
let tree_nodes_prefix = Path::new(TREE_DIR).join(NODES_DIR);
for entry in entries {
let Ok(mut file) = entry else {
log::error!("Could not unpack file in merkle tar archive");
continue;
};
let path = file.path()?.into_owned();
if path.components().any(|c| matches!(c, Component::ParentDir)) {
return Err(MerkleDbError::PathTraversal(path.display().to_string()));
}
let entry_type = file.header().entry_type();
if !entry_type.is_file() && !entry_type.is_dir() {
return Err(MerkleDbError::UnsupportedTarEntry {
path: path.display().to_string(),
});
}
let dst_path = if path.starts_with(&tree_nodes_prefix) {
oxen_hidden.join(&path)
} else {
oxen_hidden.join(&tree_nodes_prefix).join(&path)
};
if dst_path.exists() && !overwrite_existing {
log::info!("Node already exists at {dst_path:?}, skipping");
continue;
}
if entry_type.is_dir() {
std::fs::create_dir_all(&dst_path)?;
} else {
AtomicFile::new(&dst_path)
.stream(&mut file)
.map_err(|err| MerkleDbError::FsTransport(Box::new(err)))?;
}
if let Some(hash) = extract_hash_from_entry_path(&dst_path, oxen_hidden)? {
hashes.insert(hash);
}
}
Ok(hashes)
}
/// Derive the node hash encoded in an extracted entry's destination path, if any.
///
/// `dst_path` must lie under `oxen_hidden/tree/nodes/`. The part below that prefix is
/// classified by its components:
/// - 0 or 1 components (`tree/nodes/` itself or a `{prefix}` dir): `Ok(None)`.
/// - 2 components (`{prefix}/{suffix}`): the concatenation is parsed as a base-16
///   `u128` and returned as `Some(MerkleHash)`.
/// - 3 components ending in `node` or `children`: a file inside a node dir, `Ok(None)`.
/// - anything else: an `InvalidTarStructure` error.
///
/// # Errors
/// `InvalidTarStructure` is returned when `dst_path`:
/// - is outside `oxen_hidden` or outside `tree/nodes/`;
/// - contains a non-`Normal` or non-UTF-8 component;
/// - has an unexpected layout.
///
/// `InvalidNodeIdHex` is returned when `{prefix}{suffix}` is not valid hex for a `u128`.
fn extract_hash_from_entry_path(
dst_path: &Path,
oxen_hidden: &Path,
) -> Result<Option<MerkleHash>, MerkleDbError> {
let tree_nodes_prefix = Path::new(TREE_DIR).join(NODES_DIR);
// Builds an `InvalidTarStructure` error for this entry with the given reason.
let invalid_structure = |reason: &str| MerkleDbError::InvalidTarStructure {
entry_path: dst_path.display().to_string(),
reason: reason.to_string(),
};
let rel = dst_path.strip_prefix(oxen_hidden).map_err(|_| {
invalid_structure("entry resolved outside of the repo's `.oxen/` directory")
})?;
let under_tree_nodes = rel
.strip_prefix(&tree_nodes_prefix)
.map_err(|_| invalid_structure("entry path is not under `tree/nodes/`"))?;
// Collect the remaining segments as UTF-8 strings so they can be slice-matched below.
let mut components: Vec<&str> = Vec::new();
for component in under_tree_nodes.components() {
let Component::Normal(segment) = component else {
return Err(invalid_structure(
"entry path contains a non-`Normal` component",
));
};
let Some(s) = segment.to_str() else {
return Err(invalid_structure("entry path contains a non-UTF-8 segment"));
};
components.push(s);
}
match components.as_slice() {
// `tree/nodes/` itself, or a `{prefix}` directory: no hash yet.
[] | [_] => Ok(None),
// `{prefix}/{suffix}` node directory: the two halves together form the hex id.
[prefix, suffix] => {
let id = format!("{prefix}{suffix}");
let hash_value = u128::from_str_radix(&id, 16)
.map_err(|source| MerkleDbError::InvalidNodeIdHex { id, source })?;
Ok(Some(MerkleHash::new(hash_value)))
}
// Files inside a node directory; only these two leaf names are allowed.
[_, _, leaf] if *leaf == "node" || *leaf == "children" => Ok(None),
[_, _, other] => Err(invalid_structure(&format!(
"leaf file under `tree/nodes/{{prefix}}/{{suffix}}/` must be `node` or `children`, got `{other}`"
))),
_ => Err(invalid_structure(
"entry has more components than `tree/nodes/{prefix}/{suffix}/[node|children]`",
)),
}
}
/// Write a gzipped tar of the node directories for `hashes` to `out` using layout `opts`.
///
/// Hashes without a node directory on disk are logged and skipped.
pub(crate) fn pack_nodes(
    repo: &LocalRepository,
    hashes: &HashSet<MerkleHash>,
    opts: PackOptions,
    out: &mut dyn Write,
) -> Result<(), OxenError> {
    let skip_missing_node_dir = true;
    write_hashes_tar(&repo.path, hashes, opts, out, skip_missing_node_dir)?;
    Ok(())
}
/// Test-only: write a gzipped tar of the repository's whole `tree/nodes` directory to
/// `out`, tolerating a missing nodes directory.
#[cfg(test)]
pub(crate) fn pack_all(repo: &LocalRepository, out: &mut dyn Write) -> Result<(), OxenError> {
    write_all_tar(&repo.path, out, true).map_err(OxenError::from)
}
/// Estimate the uncompressed tar size of packing `hashes`.
///
/// How the estimate is built, per existing node directory:
/// - one 512-byte header for the directory itself;
/// - per regular file: one header plus its size rounded up to a 512-byte block;
/// - per subdirectory: one header (subdirectory contents are not walked).
///
/// Missing directories and unreadable entries are skipped. Arithmetic saturates at
/// `u64::MAX`.
pub(crate) fn pack_nodes_byte_estimate(
    repo: &LocalRepository,
    hashes: &HashSet<MerkleHash>,
) -> u64 {
    const TAR_HEADER_BYTES: u64 = 512;
    const TAR_BLOCK_SIZE: u64 = 512;
    hashes
        .iter()
        .map(|hash| node_db_path(&repo.path, hash))
        .filter(|node_dir| node_dir.exists())
        .fold(0u64, |total, node_dir| {
            let with_dir_header = total.saturating_add(TAR_HEADER_BYTES);
            let Ok(listing) = std::fs::read_dir(&node_dir) else {
                return with_dir_header;
            };
            listing
                .flatten()
                .filter_map(|entry| entry.metadata().ok())
                .fold(with_dir_header, |acc, meta| {
                    if meta.is_file() {
                        let padded = meta
                            .len()
                            .div_ceil(TAR_BLOCK_SIZE)
                            .saturating_mul(TAR_BLOCK_SIZE);
                        acc.saturating_add(TAR_HEADER_BYTES.saturating_add(padded))
                    } else if meta.is_dir() {
                        acc.saturating_add(TAR_HEADER_BYTES)
                    } else {
                        acc
                    }
                })
        })
}
/// Extract a gzipped node tar from `reader` into the repository's `.oxen` directory.
///
/// Existing files are replaced only when `opts` is `UnpackOptions::Overwrite`. Returns
/// the hashes of node directories that were written.
pub(crate) fn unpack(
    repo: &LocalRepository,
    reader: &mut dyn Read,
    opts: UnpackOptions,
) -> Result<HashSet<MerkleHash>, OxenError> {
    let oxen_hidden = repo.path.join(OXEN_HIDDEN_DIR);
    let overwrite_existing = matches!(opts, UnpackOptions::Overwrite);
    let written = extract_tar_under(reader, &oxen_hidden, overwrite_existing)?;
    Ok(written)
}
/// Persist an in-memory tree rooted at a commit node to the repository's node store.
///
/// A fresh `CommitNode` is built from the commit's options and used as the root record.
///
/// # Errors
/// `"Expected commit node"` if `node` is not a commit; otherwise propagates storage
/// errors.
pub fn write_tree(repo: &LocalRepository, node: &MerkleTreeNode) -> Result<(), OxenError> {
    match &node.node {
        EMerkleTreeNode::Commit(commit_node) => {
            let root_record = CommitNode::new(repo, commit_node.get_opts())?;
            p_write_tree(repo, node, &root_record)
        }
        _ => Err(OxenError::basic_str("Expected commit node")),
    }
}
/// Recursively write `node`'s children into a node DB opened for `node_impl`.
///
/// Child handling:
/// - VNode and Directory children are added and then written recursively.
/// - File children are only added.
/// - Any other child kind aborts with an error.
fn p_write_tree(
    repo: &LocalRepository,
    node: &MerkleTreeNode,
    node_impl: &impl TMerkleTreeNode,
) -> Result<(), OxenError> {
    let mut db = MerkleNodeDB::open_read_write(&repo.path, node_impl, node.parent_id)?;
    for child in node.children.iter() {
        match &child.node {
            EMerkleTreeNode::File(file_node) => {
                db.add_child(file_node)?;
            }
            EMerkleTreeNode::Directory(dir_node) => {
                db.add_child(dir_node)?;
                p_write_tree(repo, child, dir_node)?;
            }
            EMerkleTreeNode::VNode(vnode) => {
                db.add_child(vnode)?;
                p_write_tree(repo, child, vnode)?;
            }
            node => {
                return Err(OxenError::basic_str(format!(
                    "p_write_tree unexpected node type: {node:?}"
                )));
            }
        }
    }
    db.close()?;
    Ok(())
}
/// Gather the node hashes for every commit in `commits` via
/// `get_node_hashes_for_commit`.
///
/// The same (initially empty) baseline set is shared across all commits.
pub fn get_all_node_hashes_for_commits(
    repository: &LocalRepository,
    commits: &[Commit],
    maybe_subtrees: &Option<Vec<PathBuf>>,
    maybe_depth: &Option<i32>,
    is_download: bool,
) -> Result<HashSet<MerkleHash>, OxenError> {
    log::debug!(
        "Getting ALL node hashes for {} commits... and subtree paths {:?}",
        commits.len(),
        maybe_subtrees
    );
    let mut starting_node_hashes = HashSet::new();
    let mut collected: HashSet<MerkleHash> = HashSet::new();
    for commit in commits.iter() {
        get_node_hashes_for_commit(
            repository,
            commit,
            maybe_subtrees,
            maybe_depth,
            is_download,
            &mut starting_node_hashes,
            &mut collected,
        )?;
    }
    log::debug!("All node hashes: {}", collected.len());
    Ok(collected)
}
/// Gather node hashes introduced by `commits[1..]` relative to `commits[0]`.
///
/// How it works:
/// - The first commit seeds a baseline set via `populate_starting_hashes`, once per
///   subtree, or once for the whole tree when no subtrees are given.
/// - If only one commit is supplied, that baseline set itself is returned.
///
/// # Errors
/// `"Must provide at least one commit"` if `commits` is empty.
pub fn get_node_hashes_between_commits(
    repository: &LocalRepository,
    commits: &[Commit],
    maybe_subtrees: &Option<Vec<PathBuf>>,
    maybe_depth: &Option<i32>,
    is_download: bool,
) -> Result<HashSet<MerkleHash>, OxenError> {
    log::debug!(
        "Getting new node hashes for {} commits... and subtree paths {:?}",
        commits.len(),
        maybe_subtrees
    );
    let Some((first_commit, new_commits)) = commits.split_first() else {
        return Err(OxenError::basic_str("Must provide at least one commit"));
    };

    // One baseline pass per subtree, or a single whole-tree pass (`None`).
    let baseline_roots: Vec<Option<PathBuf>> = match maybe_subtrees {
        Some(subtrees) => subtrees
            .iter()
            .map(|subtree| Some(subtree.to_path_buf()))
            .collect(),
        None => vec![None],
    };
    let mut starting_node_hashes = HashSet::new();
    for root in &baseline_roots {
        populate_starting_hashes(
            repository,
            first_commit,
            root,
            maybe_depth,
            &mut starting_node_hashes,
        )?;
    }

    if new_commits.is_empty() {
        return Ok(starting_node_hashes);
    }

    let mut new_node_hashes: HashSet<MerkleHash> = HashSet::new();
    for commit in new_commits.iter() {
        get_node_hashes_for_commit(
            repository,
            commit,
            maybe_subtrees,
            maybe_depth,
            is_download,
            &mut starting_node_hashes,
            &mut new_node_hashes,
        )?;
    }
    log::debug!("New node hashes: {}", new_node_hashes.len());
    Ok(new_node_hashes)
}
pub fn get_node_hashes_for_commit(
repository: &LocalRepository,
commit: &Commit,
maybe_subtrees: &Option<Vec<PathBuf>>,
maybe_depth: &Option<i32>,
is_download: bool,
starting_node_hashes: &mut HashSet<MerkleHash>,
new_node_hashes: &mut HashSet<MerkleHash>,
) -> Result<(), OxenError> {
log::debug!("get_node_hashes_for_commit: {}", commit.id);
if let Some(subtrees) = maybe_subtrees {
let mut all_parent_paths: Vec<PathBuf> = Vec::new();
log::debug!("subtrees: {subtrees:?}");
for subtree_path in subtrees {
let path = subtree_path.clone();
let mut current_path = path.clone();
all_parent_paths.push(current_path.clone());
while let Some(parent) = current_path.parent() {
all_parent_paths.push(parent.to_path_buf());
current_path = parent.to_path_buf();
}
all_parent_paths.reverse();
}
log::debug!("all_parent_paths: {all_parent_paths:?}");
for subtree in subtrees {
get_node_hashes_for_subtree(
repository,
commit,
&Some(subtree.clone()),
maybe_depth,
starting_node_hashes,
new_node_hashes,
)?;
}
log::debug!("new_node_hashes: {}", new_node_hashes.len());
if !is_download {
collect_nodes_along_path(
repository,
commit,
all_parent_paths,
starting_node_hashes,
new_node_hashes,
)?;
}
} else {
get_node_hashes_for_subtree(
repository,
commit,
&None,
maybe_depth,
starting_node_hashes,
new_node_hashes,
)?;
}
log::debug!("new_node_hashes: {}", new_node_hashes.len());
new_node_hashes.insert(commit.hash()?);
Ok(())
}
fn get_node_hashes_for_subtree(
repository: &LocalRepository,
commit: &Commit,
subtree_path: &Option<PathBuf>,
depth: &Option<i32>,
base_hashes: &mut HashSet<MerkleHash>,
new_node_hashes: &mut HashSet<MerkleHash>,
) -> Result<(), OxenError> {
let mut unique_hashes = HashSet::new();
let Ok(Some(_)) = CommitMerkleTreeLatest::read_depth_from_path_and_collect_hashes(
repository,
commit,
subtree_path.clone().unwrap_or(PathBuf::from(".")),
Some(base_hashes),
Some(&mut unique_hashes),
None,
depth.unwrap_or(-1),
) else {
return Ok(());
};
base_hashes.extend(unique_hashes.clone());
new_node_hashes.extend(unique_hashes);
Ok(())
}
pub fn populate_starting_hashes(
repository: &LocalRepository,
commit: &Commit,
subtree_path: &Option<PathBuf>,
depth: &Option<i32>,
unique_hashes: &mut HashSet<MerkleHash>,
) -> Result<(), OxenError> {
let Ok(Some(_)) = CommitMerkleTreeLatest::read_depth_from_path_and_collect_hashes(
repository,
commit,
subtree_path.clone().unwrap_or(PathBuf::from(".")),
None,
Some(unique_hashes),
None,
depth.unwrap_or(-1),
) else {
return Ok(());
};
Ok(())
}
pub fn get_ancestor_nodes(
repo: &LocalRepository,
commit: &Commit,
subtree_paths: &Vec<PathBuf>,
unique_hashes: &mut HashSet<MerkleHash>,
) -> Result<(), OxenError> {
for subtree_path in subtree_paths {
let parent = subtree_path.parent();
match parent {
Some(parent) => {
let ancestors = parent
.ancestors()
.map(|p| p.to_path_buf())
.collect::<Vec<PathBuf>>();
for ancestor in ancestors.iter().rev() {
let Some(node) =
repositories::tree::get_node_by_path_with_children(repo, commit, ancestor)?
else {
return Err(OxenError::basic_str(format!(
"Ancestor {ancestor:?} for subtree path {subtree_path:?} not found in merkle tree"
)));
};
let ancestor_node_hashes = node.list_dir_and_vnode_hashes()?;
log::debug!(
"get_ancestor_nodes found {} hashes for subtree path {:?}",
ancestor_node_hashes.len(),
subtree_path
);
unique_hashes.extend(ancestor_node_hashes);
}
}
None => {
break; }
}
}
Ok(())
}
pub fn print_tree(repo: &LocalRepository, commit: &Commit) -> Result<(), OxenError> {
let tree = get_root_with_children(repo, commit)?.unwrap();
match repo.min_version() {
MinOxenVersion::V0_19_0 => {
CommitMerkleTreeV0_19_0::print_node(&tree);
}
_ => {
CommitMerkleTreeLatest::print_node(&tree);
}
}
Ok(())
}
pub fn print_tree_depth_subtree(
repo: &LocalRepository,
commit: &Commit,
depth: i32,
subtree: &PathBuf,
) -> Result<(), OxenError> {
let tree = get_subtree_by_depth(repo, commit, subtree, depth)?.unwrap();
match repo.min_version() {
MinOxenVersion::V0_19_0 => {
CommitMerkleTreeV0_19_0::print_node_depth(&tree, depth);
}
_ => {
CommitMerkleTreeLatest::print_node_depth(&tree, depth);
}
}
Ok(())
}
pub fn print_tree_path(
repo: &LocalRepository,
commit: &Commit,
path: impl AsRef<Path>,
) -> Result<(), OxenError> {
let tree = get_node_by_path_with_children(repo, commit, path)?.unwrap();
match repo.min_version() {
MinOxenVersion::V0_19_0 => {
CommitMerkleTreeV0_19_0::print_node(&tree);
}
_ => {
CommitMerkleTreeLatest::print_node(&tree);
}
}
Ok(())
}
pub fn print_tree_depth(
repo: &LocalRepository,
commit: &Commit,
depth: i32,
) -> Result<(), OxenError> {
let tree = get_root_with_children(repo, commit)?.unwrap();
match repo.min_version() {
MinOxenVersion::V0_19_0 => {
CommitMerkleTreeV0_19_0::print_node_depth(&tree, depth);
}
_ => {
CommitMerkleTreeLatest::print_node_depth(&tree, depth);
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::OxenError;
use crate::model::Commit;
use crate::opts::RmOpts;
use crate::repositories;
use crate::test;
use crate::util;
use bytesize::ByteSize;
use std::collections::BTreeMap;
use std::path::PathBuf;
/// Verifies that `list_tabular_files_in_repo` counts only tabular files
/// (`.tsv` / `.csv` here) and tracks additions and a directory removal.
#[tokio::test]
async fn test_list_tabular_files_in_repo() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|repo| async move {
        // Nested directory holding two tabular files and one markdown file.
        let dir_path = repo
            .path
            .join("data")
            .join("train")
            .join("images")
            .join("cats");
        util::fs::create_dir_all(&dir_path)?;
        let filename = "cats.tsv";
        let filepath = dir_path.join(filename);
        util::fs::write(filepath, "1\t2\t3\nhello\tworld\tsup\n")?;
        let filename = "dogs.csv";
        let filepath = dir_path.join(filename);
        util::fs::write(filepath, "1,2,3\nhello,world,sup\n")?;
        let filename = "README.md";
        let filepath = dir_path.join(filename);
        util::fs::write(filepath, "readme....")?;
        // Root-level files. `labels.txt` has tab-separated content but a `.txt`
        // extension; the expected count below shows it is not listed.
        let filename = "labels.tsv";
        let filepath = repo.path.join(filename);
        util::fs::write(filepath, "1\t2\t3\nhello\tworld\tsup\n")?;
        let filename = "labels.txt";
        let filepath = repo.path.join(filename);
        util::fs::write(filepath, "1\t2\t3\nhello\tworld\tsup\n")?;
        repositories::add(&repo, &repo.path).await?;
        let commit = repositories::commit(&repo, "Adding all the data")?;
        let files = repositories::tree::list_tabular_files_in_repo(&repo, &commit)?;
        // cats.tsv, dogs.csv, labels.tsv
        assert_eq!(files.len(), 3);
        let filename = "dogs.tsv";
        let filepath = repo.path.join(filename);
        util::fs::write(filepath, "1\t2\t3\nhello\tworld\tsup\n")?;
        repositories::add(&repo, &repo.path).await?;
        let commit = repositories::commit(&repo, "Adding additional file")?;
        let files = repositories::tree::list_tabular_files_in_repo(&repo, &commit)?;
        // ...plus the new root-level dogs.tsv
        assert_eq!(files.len(), 4);
        // Delete the nested directory on disk, then record the removal via `rm`.
        util::fs::remove_dir_all(&dir_path)?;
        let opts = RmOpts {
            path: dir_path,
            staged: false,
            recursive: true,
        };
        repositories::rm(&repo, &opts).await?;
        let commit = repositories::commit(&repo, "Removing dir")?;
        let files = repositories::tree::list_tabular_files_in_repo(&repo, &commit)?;
        // Only the root-level labels.tsv and dogs.tsv remain.
        assert_eq!(files.len(), 2);
        Ok(())
    })
    .await
}
/// Two files with identical contents (so, presumably, identical content hashes)
/// must both be staged and both be findable by path after committing.
#[tokio::test]
async fn test_merkle_two_files_same_hash() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|local_repo| async move {
        let p1 = "hi.txt";
        let p2 = "bye.txt";
        let path_1 = local_repo.path.join(p1);
        let path_2 = local_repo.path.join(p2);
        let common_contents = "the same file";
        test::write_txt_file_to_path(&path_1, common_contents)?;
        test::write_txt_file_to_path(&path_2, common_contents)?;
        repositories::add(&local_repo, &path_1).await?;
        repositories::add(&local_repo, &path_2).await?;
        let status = repositories::status(&local_repo).await?;
        log::debug!("staged files here are {:?}", status.staged_files);
        // Both paths are staged independently despite sharing contents.
        assert_eq!(status.staged_files.len(), 2);
        assert!(status.staged_files.contains_key(&PathBuf::from(p1)));
        assert!(status.staged_files.contains_key(&PathBuf::from(p2)));
        let commit = repositories::commit(&local_repo, "add two files")?;
        assert!(repositories::tree::has_path(
            &local_repo,
            &commit,
            PathBuf::from(p1)
        )?);
        assert!(repositories::tree::has_path(
            &local_repo,
            &commit,
            PathBuf::from(p2)
        )?);
        Ok(())
    })
    .await
}
/// Exercises `get_node_hashes_between_commits` over two commits added on top
/// of the training-data repo: unfiltered, restricted to a subtree, and
/// depth-limited.
///
/// `assert_eq!` is used so a failure reports both the actual and expected
/// counts.
#[tokio::test]
async fn test_get_node_hashes_between_commits() -> Result<(), OxenError> {
    test::run_local_repo_training_data_committed_async(|repo| async move {
        let starting_commit = repositories::commits::head_commit(&repo)?;
        println!("Starting commit: {}", starting_commit.id);

        // commit1: one root-level file and one file in a new directory.
        let new_file1 = repo.path.join("new_file1.txt");
        let new_file2 = repo.path.join("new_dir1").join("new_file2.txt");
        util::fs::create_dir_all(new_file2.parent().unwrap())?;
        util::fs::write(&new_file1, "This is new file 1 content")?;
        util::fs::write(&new_file2, "This is new file 2 content")?;
        repositories::add(&repo, &new_file1).await?;
        repositories::add(&repo, &new_file2).await?;
        let commit1 = repositories::commit(&repo, "Add first batch of new files")?;

        // commit2: one file in another new directory.
        let new_file3 = repo.path.join("new_dir2").join("new_file3.csv");
        util::fs::create_dir_all(new_file3.parent().unwrap())?;
        util::fs::write(&new_file3, "col1,col2,col3\nval1,val2,val3\n")?;
        repositories::add(&repo, &new_file3).await?;
        let commit2 = repositories::commit(&repo, "Add second batch of new files")?;

        // The first commit in the list is the baseline.
        let commit_list = vec![starting_commit, commit1.clone(), commit2.clone()];

        // No subtree filter, no depth limit, not a download.
        let new_hashes = super::get_node_hashes_between_commits(
            &repo,
            &commit_list,
            &None,
            &None,
            false,
        )?;
        assert_eq!(
            new_hashes.len(),
            10,
            "Should have found 10 unique node hashes for new commits, got {}",
            new_hashes.len()
        );
        assert!(
            new_hashes.contains(&commit1.hash()?),
            "Should include first new commit hash"
        );
        assert!(
            new_hashes.contains(&commit2.hash()?),
            "Should include second new commit hash"
        );

        let subtree_path = PathBuf::from("new_dir1");
        let subtree_hashes = super::get_node_hashes_between_commits(
            &repo,
            &commit_list,
            &Some(vec![subtree_path]),
            &None,
            false,
        )?;
        assert_eq!(
            subtree_hashes.len(),
            8,
            "Subtree filter should return 8 hashes, got {}",
            subtree_hashes.len()
        );

        // Depth 0: only the top level of each commit's tree.
        let depth_limited_hashes = super::get_node_hashes_between_commits(
            &repo,
            &commit_list,
            &None,
            &Some(0),
            false,
        )?;
        assert_eq!(
            depth_limited_hashes.len(),
            6,
            "Depth filter should return 6 hashes, got {}",
            depth_limited_hashes.len()
        );
        Ok(())
    })
    .await
}
/// Decodes a gzipped tar `buffer` into a map of entry path to entry bytes.
///
/// A `BTreeMap` is used so that two archives can be compared independently of
/// the order in which their entries were written. Panics on any decode error;
/// this is a test helper.
fn list_tar_entries(buffer: &[u8]) -> BTreeMap<PathBuf, Vec<u8>> {
    let mut archive = Archive::new(GzDecoder::new(buffer));
    let by_path: BTreeMap<PathBuf, Vec<u8>> = archive
        .entries()
        .expect("tar entries failed")
        .map(|maybe_entry| {
            let mut entry = maybe_entry.expect("tar entry failed");
            let entry_path = entry.path().expect("tar path failed").into_owned();
            let mut contents = Vec::new();
            std::io::Read::read_to_end(&mut entry, &mut contents).expect("tar read failed");
            (entry_path, contents)
        })
        .collect();
    by_path
}
/// Legacy reference: gzip-tars the repository's entire node directory (via
/// `compress_full_tree`) and returns the bytes.
///
/// It is kept verbatim so tests can compare its output with `pack_all`.
fn compress_tree(repository: &LocalRepository) -> Result<Vec<u8>, OxenError> {
    let enc = GzEncoder::new(Vec::new(), Compression::fast());
    let mut tar = tar::Builder::new(enc);
    compress_full_tree(repository, &mut tar)?;
    tar.finish()?;
    // Finish the tar stream first, then the gzip stream, to get the final bytes.
    let buffer: Vec<u8> = tar.into_inner()?.finish()?;
    // Saturate rather than fail if usize does not fit in u64 (log only).
    let total_size: u64 = u64::try_from(buffer.len()).unwrap_or(u64::MAX);
    log::debug!("Compressed entire tree size is {}", ByteSize::b(total_size));
    Ok(buffer)
}
/// Legacy reference: appends `<repo>/OXEN_HIDDEN_DIR/TREE_DIR/NODES_DIR`
/// recursively to `tar` under the archive path `TREE_DIR/NODES_DIR`.
///
/// Nothing is appended when the directory does not exist.
fn compress_full_tree(
    repository: &LocalRepository,
    tar: &mut tar::Builder<GzEncoder<Vec<u8>>>,
) -> Result<(), OxenError> {
    // Path inside the archive (relative; no hidden-dir prefix).
    let tar_subdir = Path::new(TREE_DIR).join(NODES_DIR);
    // Path on disk.
    let nodes_dir = repository
        .path
        .join(OXEN_HIDDEN_DIR)
        .join(TREE_DIR)
        .join(NODES_DIR);
    log::debug!("Compressing tree in dir {nodes_dir:?}");
    if nodes_dir.exists() {
        tar.append_dir_all(&tar_subdir, nodes_dir)?;
    }
    Ok(())
}
/// Legacy reference: gzip-tars the node directory of each hash in `hashes`.
///
/// Each directory is archived under `TREE_DIR/NODES_DIR/<node_db_prefix>`.
/// Hashes whose node directory does not exist on disk are skipped. Kept
/// verbatim for wire-format comparison with `pack_nodes`.
fn compress_nodes(
    repository: &LocalRepository,
    hashes: &HashSet<MerkleHash>,
) -> Result<Vec<u8>, OxenError> {
    let enc = GzEncoder::new(Vec::new(), Compression::fast());
    let mut tar = tar::Builder::new(enc);
    log::debug!("Compressing {} unique nodes...", hashes.len());
    for hash in hashes {
        let dir_prefix = hash.to_hex_hash().node_db_prefix();
        let tar_subdir = Path::new(TREE_DIR).join(NODES_DIR).join(dir_prefix);
        let node_dir = node_db_path(&repository.path, hash);
        if node_dir.exists() {
            tar.append_dir_all(&tar_subdir, node_dir)?;
        }
    }
    tar.finish()?;
    let buffer: Vec<u8> = tar.into_inner()?.finish()?;
    Ok(buffer)
}
/// Legacy reference: gzip-tars the node directory of a single `hash` under
/// `TREE_DIR/NODES_DIR/<node_db_prefix>`.
///
/// If the directory does not exist, the result is an empty archive. Kept
/// verbatim for wire-format comparison with `pack_nodes`.
fn compress_node(
    repository: &LocalRepository,
    hash: &MerkleHash,
) -> Result<Vec<u8>, OxenError> {
    let dir_prefix = hash.to_hex_hash().node_db_prefix();
    let tar_subdir = Path::new(TREE_DIR).join(NODES_DIR).join(dir_prefix);
    let enc = GzEncoder::new(Vec::new(), Compression::fast());
    let mut tar = tar::Builder::new(enc);
    let node_dir = node_db_path(&repository.path, hash);
    if node_dir.exists() {
        tar.append_dir_all(&tar_subdir, node_dir)?;
    }
    tar.finish()?;
    let buffer: Vec<u8> = tar.into_inner()?.finish()?;
    // Saturate rather than fail if usize does not fit in u64 (log only).
    let total_size: u64 = u64::try_from(buffer.len()).unwrap_or(u64::MAX);
    log::debug!(
        "Compressed node {} size is {}",
        hash,
        ByteSize::b(total_size)
    );
    Ok(buffer)
}
/// Legacy reference: gzip-tars the commit node directory of each commit in
/// `commits` under `TREE_DIR/NODES_DIR/<node_db_prefix>`.
///
/// Commits whose node directory is missing are skipped. Kept verbatim for
/// wire-format comparison with `pack_nodes`.
fn compress_commits(
    repository: &LocalRepository,
    commits: &Vec<Commit>,
) -> Result<Vec<u8>, OxenError> {
    let enc = GzEncoder::new(Vec::new(), Compression::fast());
    let mut tar = tar::Builder::new(enc);
    for commit in commits {
        // The commit's merkle node is keyed by the commit hash.
        let hash = commit.hash()?;
        let dir_prefix = hash.to_hex_hash().node_db_prefix();
        let tar_subdir = Path::new(TREE_DIR).join(NODES_DIR).join(dir_prefix);
        let node_dir = node_db_path(&repository.path, &hash);
        log::debug!("Compressing commit from dir {node_dir:?}");
        if node_dir.exists() {
            tar.append_dir_all(&tar_subdir, node_dir)?;
        }
    }
    tar.finish()?;
    let buffer: Vec<u8> = tar.into_inner()?.finish()?;
    Ok(buffer)
}
/// Legacy reference for the node-download unpack path.
///
/// Gunzips `buffer` asynchronously and extracts the tar into `<repo>/OXEN_HIDDEN_DIR`
/// using `util::fs::unpack_async_tar_archive`. The destination directory is
/// created first. Kept verbatim so tests can compare its on-disk result with
/// `unpack`.
async fn node_download_request_unpack_old(
    repository: &LocalRepository,
    buffer: &[u8],
) -> Result<(), OxenError> {
    // Copy into an owned cursor so the async reader does not borrow `buffer`.
    let cursor = futures::io::Cursor::new(buffer.to_vec());
    let decoder = async_compression::futures::bufread::GzipDecoder::new(
        futures::io::BufReader::new(cursor),
    );
    let archive = async_tar::Archive::new(decoder);
    let dst = repository.path.join(OXEN_HIDDEN_DIR);
    util::fs::create_dir_all(&dst)?;
    util::fs::unpack_async_tar_archive(archive, &dst).await?;
    Ok(())
}
/// Legacy reference for the client-push ("create nodes") upload body.
///
/// Gzip-tars (default compression) each node directory under just
/// `<node_db_prefix>`, with no `TREE_DIR/NODES_DIR` prefix. Unlike the other
/// helpers there is no existence check, so a missing node directory surfaces
/// as whatever error `append_dir_all` returns. Kept verbatim for comparison
/// with `pack_nodes(.., PackOptions::LegacyClientPush, ..)`.
fn create_nodes_pack_old(
    local_repo: &LocalRepository,
    nodes: &HashSet<MerkleHash>,
) -> Result<Vec<u8>, OxenError> {
    let enc = GzEncoder::new(Vec::new(), Compression::default());
    let mut tar = tar::Builder::new(enc);
    let node_path = local_repo
        .path
        .join(OXEN_HIDDEN_DIR)
        .join(TREE_DIR)
        .join(NODES_DIR);
    for node_hash in nodes {
        let dir_prefix = node_hash.to_hex_hash().node_db_prefix();
        let node_dir = node_path.join(&dir_prefix);
        tar.append_dir_all(dir_prefix, node_dir)?;
    }
    tar.finish()?;
    let buffer: Vec<u8> = tar.into_inner()?.finish()?;
    Ok(buffer)
}
/// Legacy reference: extracts a gzipped tar of node directories into
/// `<repo>/OXEN_HIDDEN_DIR/TREE_DIR/NODES_DIR`.
///
/// Returns the hashes reconstructed from the extracted entry paths. Behaviour:
/// - Entries that cannot be read are logged and skipped.
/// - Entries whose destination already exists are skipped.
/// - For every extracted entry that is not a `node` or `children` file, the
///   last two path components are concatenated and parsed as a hash.
///
/// Panics if an entry path cannot be read, if a parent directory cannot be
/// created, or if a path component is not valid UTF-8. Kept verbatim for
/// comparison with `unpack`.
fn unpack_nodes(
    repository: &LocalRepository,
    buffer: &[u8],
) -> Result<HashSet<MerkleHash>, OxenError> {
    let mut hashes: HashSet<MerkleHash> = HashSet::new();
    log::debug!("Unpacking nodes from buffer...");
    let decoder = GzDecoder::new(buffer);
    log::debug!("Decoder created");
    let mut archive = Archive::new(decoder);
    log::debug!("Archive created");
    let Ok(entries) = archive.entries() else {
        return Err(OxenError::basic_str(
            "Could not unpack tree database from archive",
        ));
    };
    log::debug!("Extracting entries...");
    for file in entries {
        let Ok(mut file) = file else {
            log::error!("Could not unpack file in archive...");
            continue;
        };
        let path = file.path().unwrap();
        let oxen_hidden_path = repository.path.join(OXEN_HIDDEN_DIR);
        // Entry paths are relative to the nodes directory (client-push format).
        let dst_path = oxen_hidden_path.join(TREE_DIR).join(NODES_DIR).join(path);
        if let Some(parent) = dst_path.parent() {
            util::fs::create_dir_all(parent).expect("Could not create parent dir");
        }
        if dst_path.exists() {
            log::debug!("Node already exists at {dst_path:?}");
            continue;
        }
        file.unpack(&dst_path)?;
        if !dst_path.ends_with("node") && !dst_path.ends_with("children") {
            // Join the last two components (prefix dir + suffix dir) to
            // rebuild the hex id.
            let id = dst_path
                .components()
                .rev()
                .take(2)
                .map(|c| c.as_os_str().to_str().unwrap())
                .collect::<Vec<&str>>()
                .into_iter()
                .rev()
                .collect::<String>();
            hashes.insert(id.parse()?);
        }
    }
    Ok(hashes)
}
/// Round trip: pack every node with `pack_all`, then unpack into a freshly
/// initialised repo with `UnpackOptions::Overwrite`. Every hash that `unpack`
/// reports must exist in the target repo.
#[tokio::test]
async fn test_transport_round_trip() -> Result<(), OxenError> {
    test::run_one_commit_local_repo_test(|repo| {
        let mut packed = Vec::new();
        pack_all(&repo, &mut packed).expect("pack_all failed");
        assert!(!packed.is_empty(), "pack_all produced empty buffer");
        let tmp = tempfile::TempDir::new()?;
        let clone = repositories::init(tmp.path())?;
        // `&mut &packed[..]` passes a byte-slice reader.
        let installed =
            unpack(&clone, &mut &packed[..], UnpackOptions::Overwrite).expect("unpack failed");
        assert!(!installed.is_empty(), "unpack installed no nodes");
        for hash in &installed {
            assert!(
                MerkleNodeDB::exists(&clone.path, hash),
                "expected installed hash {hash} to be readable"
            );
        }
        Ok(())
    })
    .await
}
/// `pack_nodes(.., ServerCanonical, ..)` must produce the same tar entry set
/// (path to bytes) as the legacy `compress_nodes` helper, here for the head
/// commit's node.
#[tokio::test]
async fn test_compress_nodes_wire_format_unchanged() -> Result<(), OxenError> {
    test::run_one_commit_local_repo_test(|repo| {
        let head = repositories::commits::head_commit(&repo)?;
        let hashes = HashSet::from_iter([head.hash().expect("no commit for head")]);
        let old_pack_method =
            compress_nodes(&repo, &hashes).expect("could not compress Merkle tree nodes");
        let new_pack_method = {
            let mut via_pack = Vec::new();
            pack_nodes(&repo, &hashes, PackOptions::ServerCanonical, &mut via_pack)
                .expect("pack_nodes failed");
            via_pack
        };
        // Compare decoded entries, not raw bytes, so gzip/tar framing
        // differences do not matter.
        assert_eq!(
            list_tar_entries(&old_pack_method),
            list_tar_entries(&new_pack_method),
            "tar entry set differs between compress_nodes helper and pack_nodes"
        );
        Ok(())
    })
    .await
}
/// `pack_all` must produce the same tar entry set as the legacy
/// `compress_tree` helper.
#[tokio::test]
async fn test_compress_tree_wire_format_unchanged() -> Result<(), OxenError> {
    test::run_one_commit_local_repo_test(|repo| {
        let old_pack_method = compress_tree(&repo)?;
        let new_pack_method = {
            let mut via_pack = Vec::new();
            pack_all(&repo, &mut via_pack).expect("pack_all failed");
            via_pack
        };
        assert_eq!(
            list_tar_entries(&old_pack_method),
            list_tar_entries(&new_pack_method),
            "tar entry set differs between compress_tree helper and pack_all"
        );
        Ok(())
    })
    .await
}
/// `pack_nodes` with a single hash must produce the same tar entry set as the
/// legacy single-node `compress_node` helper.
#[tokio::test]
async fn test_compress_node_wire_format_unchanged() -> Result<(), OxenError> {
    test::run_one_commit_local_repo_test(|repo| {
        let head = repositories::commits::head_commit(&repo)?;
        let hash = head.hash().expect("no commit for head");
        let old_pack_method =
            compress_node(&repo, &hash).expect("could not compress Merkle tree node");
        let new_pack_method = {
            let hashes = HashSet::from_iter([hash]);
            let mut via_pack = Vec::new();
            pack_nodes(&repo, &hashes, PackOptions::ServerCanonical, &mut via_pack)
                .expect("pack_nodes failed");
            via_pack
        };
        assert_eq!(
            list_tar_entries(&old_pack_method),
            list_tar_entries(&new_pack_method),
            "tar entry set differs between compress_node helper and pack_nodes"
        );
        Ok(())
    })
    .await
}
#[tokio::test]
async fn test_compress_commits_wire_format_unchanged() -> Result<(), OxenError> {
test::run_one_commit_local_repo_test(|repo| {
let head = repositories::commits::head_commit(&repo)?;
let commits: Vec<Commit> = vec![head];
let old_pack_method =
compress_commits(&repo, &commits).expect("could not compress Merkle tree commits");
let new_pack_method = {
let mut hashes = HashSet::with_capacity(commits.len());
for c in &commits {
hashes.insert(c.hash().expect("no hash for commit"));
}
let mut via_pack = Vec::new();
pack_nodes(&repo, &hashes, PackOptions::ServerCanonical, &mut via_pack)
.expect("pack_nodes failed");
via_pack
};
assert_eq!(
list_tar_entries(&old_pack_method),
list_tar_entries(&new_pack_method),
"tar entry set differs between compress_commits helper and pack_nodes"
);
Ok(())
})
.await
}
/// Builds a gzipped tar in the client-push layout: each node directory sits
/// under just `<node_db_prefix>`, with no `TREE_DIR/NODES_DIR` prefix.
///
/// Hashes whose node directory does not exist are skipped. Used as input for
/// comparing `unpack_nodes` with `unpack`.
fn compress_nodes_client_push_format(
    repo: &LocalRepository,
    hashes: &HashSet<MerkleHash>,
) -> Result<Vec<u8>, OxenError> {
    let enc = GzEncoder::new(Vec::new(), Compression::default());
    let mut tar = tar::Builder::new(enc);
    let node_path = repo
        .path
        .join(OXEN_HIDDEN_DIR)
        .join(TREE_DIR)
        .join(NODES_DIR);
    for hash in hashes {
        let dir_prefix = hash.to_hex_hash().node_db_prefix();
        let node_dir = node_path.join(&dir_prefix);
        if node_dir.exists() {
            tar.append_dir_all(dir_prefix, node_dir)?;
        }
    }
    tar.finish()?;
    Ok(tar.into_inner()?.finish()?)
}
/// The legacy `unpack_nodes` and the new `unpack` (`SkipExisting`) must:
/// - report the same hash set for a client-push-format archive, and
/// - leave every reported hash readable in their respective target repos.
#[tokio::test]
async fn test_unpack_nodes_unchanged() -> Result<(), OxenError> {
    test::run_one_commit_local_repo_test(|repo| {
        let head = repositories::commits::head_commit(&repo)?;
        let hashes = HashSet::from_iter([head.hash().expect("no commit for head")]);
        let bytes = compress_nodes_client_push_format(&repo, &hashes)
            .expect("client-push-format pack failed");
        // Separate fresh repos so the two unpackers cannot see each other's output.
        let tmp_old = tempfile::TempDir::new()?;
        let repo_old = repositories::init(tmp_old.path())?;
        let old_hashes = unpack_nodes(&repo_old, &bytes).expect("old unpack_nodes failed");
        let tmp_new = tempfile::TempDir::new()?;
        let repo_new = repositories::init(tmp_new.path())?;
        let new_hashes = unpack(&repo_new, &mut &bytes[..], UnpackOptions::SkipExisting)
            .expect("new unpack failed");
        assert_eq!(
            old_hashes, new_hashes,
            "reported hash sets differ between unpack_nodes helper and unpack"
        );
        // Guard against the comparison passing trivially on empty input.
        assert!(
            !new_hashes.is_empty(),
            "no hashes were unpacked — test input was empty"
        );
        for h in &new_hashes {
            assert!(
                MerkleNodeDB::exists(&repo_old.path, h),
                "hash {h} not readable in repo unpacked via legacy unpack_nodes"
            );
            assert!(
                MerkleNodeDB::exists(&repo_new.path, h),
                "hash {h} not readable in repo unpacked via unpack"
            );
        }
        Ok(())
    })
    .await
}
/// Snapshots every regular file under `root` as a map of path (relative to
/// `root`) to file bytes.
///
/// Symlinks are not followed, walk errors are ignored, and a missing `root`
/// yields an empty map. Panics if a listed file cannot be read; this is a test
/// helper.
fn collect_dir_contents(root: &Path) -> BTreeMap<PathBuf, Vec<u8>> {
    if !root.exists() {
        return BTreeMap::new();
    }
    walkdir::WalkDir::new(root)
        .follow_links(false)
        .into_iter()
        .filter_map(Result::ok)
        .filter(|dir_entry| dir_entry.path().is_file())
        .map(|dir_entry| {
            let file_path = dir_entry.path();
            let relative = file_path
                .strip_prefix(root)
                .unwrap_or(file_path)
                .to_path_buf();
            let contents = std::fs::read(file_path).expect("failed to read file under root");
            (relative, contents)
        })
        .collect()
}
/// Unpacking a `pack_all` archive must leave byte-identical node trees on disk
/// whether it goes through the legacy `node_download_request_unpack_old` or
/// the new `unpack` (`Overwrite`).
///
/// Every hash that `unpack` reports must also be readable.
#[tokio::test]
async fn test_node_download_request_unpack_unchanged() -> Result<(), OxenError> {
    test::run_one_commit_local_repo_test_async(|repo| async move {
        let mut packed = Vec::new();
        pack_all(&repo, &mut packed).expect("pack_all failed");
        assert!(!packed.is_empty(), "pack_all produced empty buffer");
        let tmp_old = tempfile::TempDir::new()?;
        let repo_old = repositories::init(tmp_old.path())?;
        node_download_request_unpack_old(&repo_old, &packed)
            .await
            .expect("old unpack failed");
        let tmp_new = tempfile::TempDir::new()?;
        let repo_new = repositories::init(tmp_new.path())?;
        let installed = unpack(&repo_new, &mut &packed[..], UnpackOptions::Overwrite)
            .expect("new unpack failed");
        // Snapshot each repo's nodes directory (relative path -> bytes).
        let old_tree = collect_dir_contents(
            &repo_old
                .path
                .join(OXEN_HIDDEN_DIR)
                .join(TREE_DIR)
                .join(NODES_DIR),
        );
        let new_tree = collect_dir_contents(
            &repo_new
                .path
                .join(OXEN_HIDDEN_DIR)
                .join(TREE_DIR)
                .join(NODES_DIR),
        );
        // Guard against the comparison passing trivially on empty input.
        assert!(
            !old_tree.is_empty(),
            "no files installed via old node_download_request unpack — test input was empty"
        );
        assert_eq!(
            old_tree, new_tree,
            "on-disk merkle node trees differ between old node_download_request \
             unpack and new unpack"
        );
        assert!(!installed.is_empty(), "unpack reported no hashes");
        for h in &installed {
            assert!(
                MerkleNodeDB::exists(&repo_new.path, h),
                "hash {h} not readable in repo unpacked via unpack"
            );
        }
        Ok(())
    })
    .await
}
/// An archive entry named `tree/nodes/../escape` must make `unpack` fail with
/// a "Path traversal" error.
#[tokio::test]
async fn test_unpack_rejects_path_traversal() -> Result<(), OxenError> {
    let mut buf = Vec::new();
    // Scoped so the encoder's mutable borrow of `buf` ends before unpacking.
    {
        let enc = GzEncoder::new(&mut buf, Compression::fast());
        let mut tar = tar::Builder::new(enc);
        let mut header = tar::Header::new_old();
        header.set_size(0);
        header.set_mode(0o644);
        header.set_entry_type(tar::EntryType::Regular);
        // The name bytes are written straight into the old-style header,
        // presumably because the tar crate's path setters refuse `..`
        // components.
        let name_bytes = b"tree/nodes/../escape";
        let old = header.as_old_mut();
        old.name[..name_bytes.len()].copy_from_slice(name_bytes);
        // Recompute the checksum after editing the raw header.
        header.set_cksum();
        tar.append(&header, std::io::Cursor::new(Vec::new()))?;
        tar.finish()?;
        tar.into_inner()?.finish()?;
    }
    let tmp = tempfile::TempDir::new()?;
    let repo = repositories::init(tmp.path())?;
    let err = unpack(&repo, &mut &buf[..], UnpackOptions::Overwrite)
        .expect_err("path traversal must be rejected");
    let msg = format!("{err}");
    assert!(
        msg.contains("Path traversal"),
        "unexpected error message: {msg}"
    );
    Ok(())
}
/// A symlink entry (here pointing at `/etc/passwd`) must make `unpack` fail
/// with an "Unsupported tar entry" error.
#[tokio::test]
async fn test_unpack_rejects_unsupported_entry_type() -> Result<(), OxenError> {
    let mut buf = Vec::new();
    // Scoped so the encoder's mutable borrow of `buf` ends before unpacking.
    {
        let enc = GzEncoder::new(&mut buf, Compression::fast());
        let mut tar = tar::Builder::new(enc);
        let mut header = tar::Header::new_gnu();
        header.set_size(0);
        header.set_mode(0o777);
        header.set_entry_type(tar::EntryType::Symlink);
        header.set_link_name("/etc/passwd")?;
        header.set_cksum();
        tar.append_data(
            &mut header,
            "tree/nodes/some_link",
            std::io::Cursor::new(Vec::new()),
        )?;
        tar.finish()?;
        tar.into_inner()?.finish()?;
    }
    let tmp = tempfile::TempDir::new()?;
    let repo = repositories::init(tmp.path())?;
    let err = unpack(&repo, &mut &buf[..], UnpackOptions::Overwrite)
        .expect_err("unsupported entry type must be rejected");
    let msg = format!("{err}");
    assert!(
        msg.contains("Unsupported tar entry"),
        "unexpected error message: {msg}"
    );
    Ok(())
}
/// Regression test: a hash whose hex form is shorter than 32 characters
/// (leading zero nibbles are not printed) must still be reported by `unpack`
/// after a `pack_nodes` round trip.
#[tokio::test]
async fn test_unpack_recovers_hash_with_leading_zero_nibbles() -> Result<(), OxenError> {
    test::run_empty_local_repo_test(|repo| {
        let stripped_hash = MerkleHash::new(0x1234_u128);
        let hex = stripped_hash.to_string();
        // Precondition: confirm the hash really does render short.
        assert!(
            hex.len() < 32,
            "expected hex form < 32 chars to exercise the regression, got {hex:?}"
        );
        let prefix = stripped_hash.to_hex_hash().node_db_prefix();
        let nodes_root = repo
            .path
            .join(crate::constants::OXEN_HIDDEN_DIR)
            .join(crate::constants::TREE_DIR)
            .join(crate::constants::NODES_DIR);
        // Fabricate a node directory with placeholder `node`/`children` files.
        let node_dir = nodes_root.join(prefix);
        std::fs::create_dir_all(&node_dir)?;
        std::fs::write(node_dir.join("node"), b"node-bytes")?;
        std::fs::write(node_dir.join("children"), b"children-bytes")?;
        let hashes = HashSet::from_iter([stripped_hash]);
        let mut buf = Vec::new();
        pack_nodes(&repo, &hashes, PackOptions::ServerCanonical, &mut buf)
            .expect("pack_nodes failed");
        let tmp = tempfile::TempDir::new()?;
        let target = repositories::init(tmp.path())?;
        let installed =
            unpack(&target, &mut &buf[..], UnpackOptions::Overwrite).expect("unpack failed");
        assert!(
            installed.contains(&stripped_hash),
            "unpack must report the short-hex hash; got {installed:?}"
        );
        Ok(())
    })
}
/// A node directory whose joined prefix+suffix (`abczzzznothex`) is not valid
/// hex must make `unpack` fail with an "Invalid merkle node id" error that
/// names the offending id.
#[tokio::test]
async fn test_unpack_rejects_non_hex_node_id() -> Result<(), OxenError> {
    let mut buf = Vec::new();
    // Scoped so the encoder's mutable borrow of `buf` ends before unpacking.
    {
        let enc = GzEncoder::new(&mut buf, Compression::fast());
        let mut tar = tar::Builder::new(enc);
        // Directory entries only: nodes root, prefix dir, bogus suffix dir.
        for path in &["tree/nodes", "tree/nodes/abc", "tree/nodes/abc/zzzznothex"] {
            let mut header = tar::Header::new_gnu();
            header.set_size(0);
            header.set_mode(0o755);
            header.set_entry_type(tar::EntryType::Directory);
            header.set_cksum();
            tar.append_data(&mut header, path, std::io::Cursor::new(Vec::new()))?;
        }
        tar.finish()?;
        tar.into_inner()?.finish()?;
    }
    let tmp = tempfile::TempDir::new()?;
    let repo = repositories::init(tmp.path())?;
    let err = unpack(&repo, &mut &buf[..], UnpackOptions::Overwrite)
        .expect_err("non-hex node id must be rejected");
    let msg = format!("{err}");
    assert!(
        msg.contains("Invalid merkle node id") && msg.contains("abczzzznothex"),
        "expected InvalidNodeIdHex error mentioning the offending id; got {msg}"
    );
    Ok(())
}
/// An entry nested one level below a hash directory
/// (`tree/nodes/abc/def0123/extra/node`) must make `unpack` fail with an
/// "Invalid merkle tar archive structure" error.
#[tokio::test]
async fn test_unpack_rejects_path_too_deep() -> Result<(), OxenError> {
    let mut buf = Vec::new();
    // Scoped so the encoder's mutable borrow of `buf` ends before unpacking.
    {
        let enc = GzEncoder::new(&mut buf, Compression::fast());
        let mut tar = tar::Builder::new(enc);
        // Directory chain, including the unexpected `extra` level.
        for path in &[
            "tree/nodes",
            "tree/nodes/abc",
            "tree/nodes/abc/def0123",
            "tree/nodes/abc/def0123/extra",
        ] {
            let mut header = tar::Header::new_gnu();
            header.set_size(0);
            header.set_mode(0o755);
            header.set_entry_type(tar::EntryType::Directory);
            header.set_cksum();
            tar.append_data(&mut header, path, std::io::Cursor::new(Vec::new()))?;
        }
        // A `node` file placed inside the too-deep directory.
        let mut file_header = tar::Header::new_gnu();
        file_header.set_size(0);
        file_header.set_mode(0o644);
        file_header.set_entry_type(tar::EntryType::Regular);
        file_header.set_cksum();
        tar.append_data(
            &mut file_header,
            "tree/nodes/abc/def0123/extra/node",
            std::io::Cursor::new(Vec::new()),
        )?;
        tar.finish()?;
        tar.into_inner()?.finish()?;
    }
    let tmp = tempfile::TempDir::new()?;
    let repo = repositories::init(tmp.path())?;
    let err = unpack(&repo, &mut &buf[..], UnpackOptions::Overwrite)
        .expect_err("over-deep entry must be rejected");
    let msg = format!("{err}");
    assert!(
        msg.contains("Invalid merkle tar archive structure"),
        "expected InvalidTarStructure error; got {msg}"
    );
    Ok(())
}
/// A file inside a hash directory whose name is not one of the expected leaf
/// names (here `unexpected.txt`) must make `unpack` fail with an
/// "Invalid merkle tar archive structure" error naming the file.
#[tokio::test]
async fn test_unpack_rejects_unknown_leaf_filename() -> Result<(), OxenError> {
    let mut buf = Vec::new();
    // Scoped so the encoder's mutable borrow of `buf` ends before unpacking.
    {
        let enc = GzEncoder::new(&mut buf, Compression::fast());
        let mut tar = tar::Builder::new(enc);
        for path in &["tree/nodes", "tree/nodes/abc", "tree/nodes/abc/def0123"] {
            let mut header = tar::Header::new_gnu();
            header.set_size(0);
            header.set_mode(0o755);
            header.set_entry_type(tar::EntryType::Directory);
            header.set_cksum();
            tar.append_data(&mut header, path, std::io::Cursor::new(Vec::new()))?;
        }
        let mut file_header = tar::Header::new_gnu();
        file_header.set_size(0);
        file_header.set_mode(0o644);
        file_header.set_entry_type(tar::EntryType::Regular);
        file_header.set_cksum();
        tar.append_data(
            &mut file_header,
            "tree/nodes/abc/def0123/unexpected.txt",
            std::io::Cursor::new(Vec::new()),
        )?;
        tar.finish()?;
        tar.into_inner()?.finish()?;
    }
    let tmp = tempfile::TempDir::new()?;
    let repo = repositories::init(tmp.path())?;
    let err = unpack(&repo, &mut &buf[..], UnpackOptions::Overwrite)
        .expect_err("unknown leaf filename must be rejected");
    let msg = format!("{err}");
    assert!(
        msg.contains("Invalid merkle tar archive structure") && msg.contains("unexpected.txt"),
        "expected InvalidTarStructure error mentioning the offending filename; got {msg}"
    );
    Ok(())
}
/// `extract_hash_from_entry_path` must classify entry paths under
/// `.oxen/tree/nodes` the same way regardless of the platform's path
/// separator.
///
/// Only the `<prefix>/<suffix>` hash directory yields `Some(hash)`. The nodes
/// root, the prefix directory, and the `node`/`children` leaf files all yield
/// `None`.
#[test]
fn test_extract_hash_from_entry_path_is_path_separator_agnostic() {
    let hash = MerkleHash::new(0xfeed_face_u128);
    let hex = hash.to_string();
    // Split the hex id into a 3-char prefix directory and the remainder.
    let prefix: String = hex.chars().take(3).collect();
    let suffix: String = hex.chars().skip(3).collect();
    let oxen_hidden = PathBuf::from("repo").join(".oxen");
    // Builds repo/.oxen/tree/nodes/<tail...>; empty segments are skipped so
    // `&[""]` yields the nodes root itself.
    let under_tree_nodes = |tail: &[&str]| -> PathBuf {
        let mut s = PathBuf::from("repo")
            .join(".oxen")
            .join("tree")
            .join("nodes");
        for t in tail {
            if !t.is_empty() {
                s.push(t);
            }
        }
        s
    };
    // Nodes root: not a hash.
    let p = under_tree_nodes(&[""]);
    assert_eq!(
        extract_hash_from_entry_path(&p, &oxen_hidden)
            .expect("Could not extract hash from entry path"),
        None
    );
    // Prefix directory only: not a hash.
    let p = under_tree_nodes(&[prefix.as_str()]);
    assert_eq!(
        extract_hash_from_entry_path(&p, &oxen_hidden)
            .expect("Could not extract hash from entry path"),
        None
    );
    // Prefix + suffix: the hash directory.
    let p = under_tree_nodes(&[prefix.as_str(), suffix.as_str()]);
    assert_eq!(
        extract_hash_from_entry_path(&p, &oxen_hidden)
            .expect("Could not extract hash from entry path"),
        Some(hash),
        "{prefix}/{suffix} must classify as the hash dir on every platform"
    );
    // Leaf files inside the hash directory: not reported.
    let p = under_tree_nodes(&[prefix.as_str(), suffix.as_str(), "node"]);
    assert_eq!(
        extract_hash_from_entry_path(&p, &oxen_hidden)
            .expect("Could not extract hash from entry path"),
        None
    );
    let p = under_tree_nodes(&[prefix.as_str(), suffix.as_str(), "children"]);
    assert_eq!(
        extract_hash_from_entry_path(&p, &oxen_hidden)
            .expect("Could not extract hash from entry path"),
        None
    );
    // Same hash directory built purely with `Path::join`.
    let p = oxen_hidden
        .join("tree")
        .join("nodes")
        .join(&prefix)
        .join(&suffix);
    assert_eq!(
        extract_hash_from_entry_path(&p, &oxen_hidden).unwrap(),
        Some(hash),
        "platform-native joins must classify the same as forward-slash literals"
    );
}
/// `pack_nodes(.., PackOptions::LegacyClientPush, ..)` must produce the same
/// tar entry set as the legacy `create_nodes_pack_old` upload body.
#[tokio::test]
async fn test_create_nodes_wire_format_unchanged() -> Result<(), OxenError> {
    test::run_one_commit_local_repo_test(|repo| {
        let head = repositories::commits::head_commit(&repo)?;
        let hashes = HashSet::from_iter([head.hash().expect("no commit for head")]);
        let old_pack =
            create_nodes_pack_old(&repo, &hashes).expect("legacy create_nodes pack failed");
        let new_pack = {
            let mut buf = Vec::new();
            pack_nodes(&repo, &hashes, PackOptions::LegacyClientPush, &mut buf)
                .expect("new pack failed");
            buf
        };
        assert_eq!(
            list_tar_entries(&old_pack),
            list_tar_entries(&new_pack),
            "tar entry set differs between create_nodes pack body and new upload pack"
        );
        Ok(())
    })
    .await
}
/// Packing an empty hash set must succeed. Unpacking the resulting (empty)
/// archive must also succeed and report no hashes.
#[tokio::test]
async fn test_unpack_empty_tarball() -> Result<(), OxenError> {
    test::run_one_commit_local_repo_test(|repo| {
        let mut buf = Vec::new();
        pack_nodes(
            &repo,
            &HashSet::new(),
            PackOptions::ServerCanonical,
            &mut buf,
        )
        .expect("pack_nodes(empty) must not error");
        let tmp = tempfile::TempDir::new()?;
        let target = repositories::init(tmp.path())?;
        let installed = unpack(&target, &mut &buf[..], UnpackOptions::Overwrite)
            .expect("unpack of empty tarball must not error");
        assert!(
            installed.is_empty(),
            "expected empty hash set from unpacking an empty tarball, got {} entries",
            installed.len()
        );
        Ok(())
    })
    .await
}
#[tokio::test]
async fn test_pack_nodes_skips_absent_hashes() -> Result<(), OxenError> {
test::run_one_commit_local_repo_test(|repo| {
let head = repositories::commits::head_commit(&repo).expect("no head commit");
let head_hash = head.hash().expect("no commit for head");
let absent = MerkleHash::new(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF_u128);
let mut hashes = HashSet::with_capacity(2);
hashes.insert(head_hash);
hashes.insert(absent);
let mut buf = Vec::new();
pack_nodes(&repo, &hashes, PackOptions::ServerCanonical, &mut buf)
.expect("pack_nodes failed");
let to_fwd = |p: &Path| p.to_string_lossy().replace('\\', "/");
let head_prefix = to_fwd(&head_hash.to_hex_hash().node_db_prefix());
let absent_prefix = to_fwd(&absent.to_hex_hash().node_db_prefix());
let entries = list_tar_entries(&buf);
let any_head = entries.keys().any(|p| to_fwd(p).contains(&head_prefix));
let any_absent = entries.keys().any(|p| to_fwd(p).contains(&absent_prefix));
assert!(any_head, "expected entries for the head hash prefix");
assert!(
!any_absent,
"expected no entries for the absent hash prefix; got at least one matching entry"
);
Ok(())
})
.await
}
#[tokio::test]
async fn test_pack_nodes_byte_estimate_is_upper_bound() -> Result<(), OxenError> {
test::run_one_commit_local_repo_test(|repo| {
let head = repositories::commits::head_commit(&repo)?;
let head_hash = head.hash().expect("no commit for head");
let mut hashes = HashSet::new();
hashes.insert(head_hash);
let estimate = pack_nodes_byte_estimate(&repo, &hashes);
assert!(estimate > 0, "estimate must be non-zero for a present hash");
let mut buf = Vec::new();
pack_nodes(&repo, &hashes, PackOptions::ServerCanonical, &mut buf)
.expect("pack_nodes failed");
assert!(
estimate >= buf.len() as u64,
"estimate ({estimate}) should be an upper bound on actual gzipped output \
({} bytes)",
buf.len()
);
let absent = MerkleHash::new(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF_u128);
let absent_only: HashSet<_> = HashSet::from_iter([absent]);
assert_eq!(
pack_nodes_byte_estimate(&repo, &absent_only),
0,
"absent hash should contribute 0 to the estimate"
);
let mut mixed = HashSet::new();
mixed.insert(head_hash);
mixed.insert(absent);
assert_eq!(
pack_nodes_byte_estimate(&repo, &mixed),
pack_nodes_byte_estimate(&repo, &hashes),
"absent hash must not change the estimate when added to a present hash"
);
Ok(())
})
.await
}
/// Round-trips a full pack into a freshly initialised repository that has
/// the VFS flag enabled, then checks that every hash `unpack` reports as
/// installed passes `MerkleNodeDB::exists` in that repository.
#[tokio::test]
async fn test_unpack_via_vfs_branch() -> Result<(), OxenError> {
    test::run_one_commit_local_repo_test(|repo| {
        let mut tarball = Vec::new();
        pack_all(&repo, &mut tarball).expect("pack_all failed");
        assert!(!tarball.is_empty(), "pack_all produced empty buffer");

        let scratch = tempfile::TempDir::new()?;
        let mut vfs_repo = repositories::init(scratch.path())?;
        vfs_repo.set_vfs(Some(true));
        assert!(vfs_repo.is_vfs(), "vfs flag should be on for this test");

        let mut reader = &tarball[..];
        let installed = unpack(&vfs_repo, &mut reader, UnpackOptions::Overwrite)
            .expect("unpack via vfs branch failed");
        assert!(!installed.is_empty(), "vfs unpack reported no installed hashes");

        for h in installed.iter() {
            let readable = MerkleNodeDB::exists(&vfs_repo.path, h);
            assert!(readable, "hash {h} not readable in vfs-cloned repo");
        }
        Ok(())
    })
    .await
}
}