use crate::constants::{CACHE_DIR, COMPARES_DIR, LEFT_COMPARE_COMMIT, RIGHT_COMPARE_COMMIT};
use crate::core::merge::entry_merge_conflict_reader::EntryMergeConflictReader;
use crate::core::versions::MinOxenVersion;
use crate::model::entry::commit_entry::CommitPath;
use crate::model::merkle_tree::node::FileNode;
use crate::core;
use crate::core::df::tabular;
use crate::error::OxenError;
use crate::model::diff::diff_entry_status::DiffEntryStatus;
use crate::model::diff::tabular_diff::{
TabularDiff, TabularDiffDupes, TabularDiffMods, TabularDiffParameters, TabularDiffSchemas,
TabularDiffSummary, TabularSchemaDiff,
};
use crate::model::staged_data::StagedDataOpts;
use crate::model::{
Commit, CommitEntry, DataFrameDiff, DiffEntry, EntryDataType, LocalRepository, ParsedResource,
Schema,
};
use crate::storage::version_store::VersionStore;
use crate::view::Pagination;
use crate::{constants, repositories, util};
use polars::prelude::DataFrame;
use polars::prelude::IntoLazy;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use crate::model::diff::DiffResult;
use crate::model::diff::diff_entries_counts::DiffEntriesCounts;
use crate::model::diff::schema_diff::SchemaDiff;
use crate::model::diff::{AddRemoveModifyCounts, TextDiff};
use crate::opts::{DFOpts, DiffOpts};
pub mod join_diff;
pub mod utf8_diff;
/// Column name handed to `tabular::df_hash_rows_on_cols` by `hash_dfs` when hashing the
/// target columns (presumably the name of the hash column that gets added — confirm in
/// `core::df::tabular`).
const TARGETS_HASH_COL: &str = "_targets_hash";
/// Column name handed to `tabular::df_hash_rows_on_cols` by `hash_dfs` when hashing the
/// key columns (presumably the name of the hash column that gets added — confirm in
/// `core::df::tabular`).
const KEYS_HASH_COL: &str = "_keys_hash";
/// File name, relative to a compare's diff directory, that `write_diff_dupes` writes the
/// serialized dupes JSON to.
const DUPES_PATH: &str = "dupes.json";
fn is_files_tabular(file_1: impl AsRef<Path>, file_2: impl AsRef<Path>) -> bool {
util::fs::is_tabular(file_1.as_ref()) && util::fs::is_tabular(file_2.as_ref())
}
fn is_files_utf8(file_1: impl AsRef<Path>, file_2: impl AsRef<Path>) -> bool {
util::fs::is_utf8(file_1.as_ref()) && util::fs::is_utf8(file_2.as_ref())
}
/// Top-level diff dispatcher.
///
/// Resolves the repository from `opts.repo_dir` (or the current directory). When no
/// repository can be opened, falls back to a plain two-file comparison of `path_1` and
/// `path_2`. Otherwise the combination of `path_2`, `revision_1` and `revision_2` selects
/// the mode:
///
/// * both revisions (with or without `path_2`) -> [`diff_revs`]
/// * only `revision_1`, no `path_2`            -> [`diff_uncommitted`]
/// * only `path_2`, no revisions               -> [`diff_files`]
///
/// # Errors
/// Returns an error when neither a repository nor a second path is available, when the
/// argument combination matches none of the modes above, or when the selected diff fails.
pub async fn diff(opts: DiffOpts) -> Result<Vec<DiffResult>, OxenError> {
    log::debug!(
        "Starting diff function with keys: {:?} and targets: {:?}",
        opts.keys,
        opts.targets
    );
    log::debug!("opts: {opts:?}");

    let repo_lookup = if let Some(dir) = &opts.repo_dir {
        log::debug!("dir: {dir:?}");
        LocalRepository::from_dir(dir)
    } else {
        log::debug!("current dir");
        LocalRepository::from_current_dir()
    };

    let repo = match repo_lookup {
        Ok(repo) => repo,
        Err(e) => {
            // No repository: the only thing left to do is compare two files on disk.
            log::error!("Failed to get repo: {e}. Comparing files...");
            let Some(path_2) = opts.path_2 else {
                return Err(OxenError::basic_str(
                    "Error: `oxen diff` requires a repo or file paths",
                ));
            };
            let result = diff_files(opts.path_1, path_2, opts.keys, opts.targets, vec![]).await?;
            return Ok(vec![result]);
        }
    };

    log::debug!(
        "Processing paths and revisions - path_2: {:?}, revision_1: {:?}, revision_2: {:?}, path_1: {:?}",
        opts.path_2,
        opts.revision_1,
        opts.revision_2,
        opts.path_1
    );

    match (&opts.path_2, &opts.revision_1, &opts.revision_2) {
        // Two revisions, each with its own path.
        (Some(path_2), Some(rev_1), Some(rev_2)) => {
            diff_revs(&repo, rev_1, &opts.path_1, rev_2, path_2, &opts).await
        }
        // Two revisions sharing a single path.
        (None, Some(rev_1), Some(rev_2)) => {
            diff_revs(&repo, rev_1, &opts.path_1, rev_2, &opts.path_1, &opts).await
        }
        // One revision against the working directory.
        (None, Some(rev_1), None) => diff_uncommitted(&repo, rev_1, &opts.path_1, &opts).await,
        // Two plain files, no revisions involved.
        (Some(path_2), None, None) => {
            let result = diff_files(
                &opts.path_1,
                path_2,
                opts.keys.clone(),
                opts.targets.clone(),
                vec![],
            )
            .await?;
            log::debug!("🚀 Direct file comparison completed successfully");
            Ok(vec![result])
        }
        _ => Err(OxenError::basic_str(
            "Single file comparison mode not supported",
        )),
    }
}
/// Diffs the unstaged files under `path_1` against their versions in revision `rev_1`.
///
/// The set of files comes from `repositories::status::status_from_opts` restricted to
/// `path_1`. For each unstaged file the committed node is looked up in `rev_1`; a failed
/// lookup is logged and treated as "no committed version" so the remaining files are still
/// diffed.
///
/// # Errors
/// Returns an error if the status cannot be computed, `rev_1` cannot be resolved, or any
/// individual file diff fails.
pub async fn diff_uncommitted(
    repo: &LocalRepository,
    rev_1: &str,
    path_1: &Path,
    opts: &DiffOpts,
) -> Result<Vec<DiffResult>, OxenError> {
    let status_opts = StagedDataOpts::from_paths(&[path_1.to_path_buf()]);
    let status = repositories::status::status_from_opts(repo, &status_opts)?;
    let unstaged_files = status.unstaged_files();
    log::debug!("unstaged_files: {unstaged_files:?}");

    let commit_1 = repositories::revisions::get(repo, rev_1)?
        .ok_or_else(|| OxenError::revision_not_found(rev_1.to_string().into()))?;
    log::debug!("commit_1: {commit_1:?}");

    let mut diff_result = Vec::new();
    for file in unstaged_files {
        log::debug!("file: {file:?}");
        // Best effort: a failed lookup is handled like a file that has no committed version.
        let node_1 = match repositories::entries::get_file(repo, &commit_1, file.as_path()) {
            Ok(node) => node,
            Err(err) => {
                log::error!("Failed to get file {file:?}: {err}");
                None
            }
        };
        log::debug!("node_1: {node_1:?}");

        let file_diff = diff_file_and_node(
            repo,
            node_1,
            file.as_path(),
            opts.keys.clone(),
            opts.targets.clone(),
            vec![],
        )
        .await?;
        diff_result.push(file_diff);
    }

    // Only the count is logged; dumping the whole vector on every iteration made the debug
    // output grow quadratically with the number of files.
    log::debug!("diff_uncommitted computed {} diffs", diff_result.len());
    Ok(diff_result)
}
/// Diffs `rev_1:path_1` against `rev_2:path_2`.
///
/// The structural diff from [`diff_path`] (with `rev_1`/`path_1` as the base side and
/// `rev_2`/`path_2` as the head side) yields one entry per changed file. A content diff is
/// then computed for each entry via [`diff_commits`]. Entries whose content diff fails are
/// logged at debug level and skipped rather than aborting the whole diff.
///
/// # Errors
/// Returns an error if either revision cannot be resolved or the structural diff fails.
pub async fn diff_revs(
    repo: &LocalRepository,
    rev_1: &str,
    path_1: &Path,
    rev_2: &str,
    path_2: &Path,
    opts: &DiffOpts,
) -> Result<Vec<DiffResult>, OxenError> {
    log::debug!(
        "Comparing revisions: {}:{} with {}:{}",
        rev_1,
        path_1.display(),
        rev_2,
        path_2.display()
    );
    let commit_1 = repositories::revisions::get(repo, rev_1)?
        .ok_or_else(|| OxenError::revision_not_found(rev_1.to_string().into()))?;
    let commit_2 = repositories::revisions::get(repo, rev_2)?
        .ok_or_else(|| OxenError::revision_not_found(rev_2.to_string().into()))?;

    // commit_1 / path_1 is the base side, commit_2 / path_2 is the head side.
    let dir_diff = diff_path(repo, &commit_1, &commit_2, path_1, path_2, opts).await?;
    log::debug!(
        "Directory structural diff found {} entries",
        dir_diff.entries.len()
    );

    let mut content_diffs = vec![];
    for entry in dir_diff.entries {
        log::debug!("entry: {entry:#?}");

        // Choose the path to look up in each commit. The base resource belongs to commit_1
        // and the head resource to commit_2; pairing them the other way round compares the
        // wrong files whenever path_1 != path_2. When only one side exists, the same path is
        // looked up in both commits so the missing side resolves to "no file".
        let (path_in_1, path_in_2) = match (entry.base_resource, entry.head_resource) {
            (Some(base_res), Some(head_res)) => (base_res.path, head_res.path),
            (Some(base_res), None) => (base_res.path.clone(), base_res.path),
            (None, Some(head_res)) => (head_res.path.clone(), head_res.path),
            (None, None) => continue,
        };

        log::debug!("Computing content diff for file: {path_in_2:?}");
        let cpath_1 = CommitPath {
            commit: Some(commit_1.clone()),
            path: path_in_1,
        };
        let cpath_2 = CommitPath {
            commit: Some(commit_2.clone()),
            path: path_in_2.clone(),
        };

        match diff_commits(
            repo,
            cpath_1,
            cpath_2,
            opts.keys.clone(),
            opts.targets.clone(),
            vec![],
        )
        .await
        {
            Ok(result) => {
                log::debug!("Content diff successful for file: {path_in_2:?}");
                content_diffs.push(result);
            }
            // Best effort: one failing file must not abort the rest of the diff.
            Err(err) => {
                log::debug!("Failed to compute diff for file {path_in_2:?}: {err}");
            }
        }
    }

    log::debug!(
        "Directory diff completed with {} content diffs",
        content_diffs.len()
    );
    Ok(content_diffs)
}
pub async fn diff_commits(
repo: &LocalRepository,
cpath_1: CommitPath,
cpath_2: CommitPath,
keys: Vec<String>,
targets: Vec<String>,
display: Vec<String>,
) -> Result<DiffResult, OxenError> {
log::debug!("Compare command called with: {cpath_1:?} and {cpath_2:?}");
let (node_1, node_2) = match (cpath_1.commit, cpath_2.commit) {
(Some(commit_1), Some(commit_2)) => {
log::debug!("Diffing commits: {commit_1:?} and {commit_2:?}");
let node_1 = repositories::entries::get_file(repo, &commit_1, &cpath_1.path)?;
log::debug!("Diffing commits: {commit_1:?} and {commit_2:?}");
let merger = EntryMergeConflictReader::new(repo)?;
log::debug!("Diffing merger:",);
let node_2 = match merger.has_conflicts()? {
true => {
let merger = EntryMergeConflictReader::new(repo)?;
log::debug!("Diffing merger: ");
repositories::entries::get_file(
repo,
&merger.get_conflict_commit()?.unwrap(),
&cpath_2.path,
)?
}
false => repositories::entries::get_file(repo, &commit_2, &cpath_2.path)?,
};
(node_1, node_2)
}
(Some(commit_1), None) => {
let node_1 = repositories::entries::get_file(repo, &commit_1, &cpath_1.path)?;
(node_1, None)
}
(None, Some(commit_2)) => {
let node_2 = repositories::entries::get_file(repo, &commit_2, &cpath_2.path)?;
(None, node_2)
}
(None, None) => (None, None),
};
let compare_result =
repositories::diffs::diff_file_nodes(repo, node_1, node_2, keys, targets, display).await?;
Ok(compare_result)
}
pub async fn diff_path(
repo: &LocalRepository,
base_commit: &Commit,
head_commit: &Commit,
base_path: impl AsRef<Path>,
head_path: impl AsRef<Path>,
opts: &DiffOpts,
) -> Result<DiffEntriesCounts, OxenError> {
match (base_path.as_ref().is_file(), head_path.as_ref().is_file()) {
(true, true) => {
let diff_entry = DiffEntry {
filename: head_path.as_ref().to_string_lossy().to_string(),
head_resource: Some(ParsedResource {
commit: Some(head_commit.clone()),
path: head_path.as_ref().to_path_buf(),
..ParsedResource::default() }),
base_resource: Some(ParsedResource {
commit: Some(base_commit.clone()),
path: base_path.as_ref().to_path_buf(),
..ParsedResource::default() }),
..DiffEntry::default()
};
let diff_entries = DiffEntriesCounts {
entries: vec![diff_entry],
counts: AddRemoveModifyCounts {
added: 0,
removed: 0,
modified: 1, },
pagination: Pagination::default(), };
Ok(diff_entries)
}
(false, false) => {
list_diff_entries(
repo,
base_commit,
head_commit,
base_path.as_ref().to_path_buf(),
head_path.as_ref().to_path_buf(),
opts.page,
opts.page_size,
)
.await
}
_ => {
println!("Unable to compare directory and file");
Err(OxenError::basic_str(
"Cannot compare a directory with a file",
))
}
}
}
/// Diffs two files on disk, with no repository involved.
///
/// Tabular files (per [`is_files_tabular`]) get a tabular diff, UTF-8 files (per
/// [`is_files_utf8`]) get a text diff; the tabular check is tried first.
///
/// # Errors
/// Returns an invalid-file-type error when the pair is neither tabular nor UTF-8, and
/// propagates read and diff failures.
pub async fn diff_files(
    path_1: impl AsRef<Path>,
    path_2: impl AsRef<Path>,
    keys: Vec<String>,
    targets: Vec<String>,
    display: Vec<String>,
) -> Result<DiffResult, OxenError> {
    let left = path_1.as_ref();
    let right = path_2.as_ref();
    log::debug!("Compare command called with: {:?} and {:?}", left, right);

    if is_files_tabular(left, right) {
        let result = tabular(left, right, keys, targets, display).await?;
        return Ok(DiffResult::Tabular(result));
    }

    if is_files_utf8(left, right) {
        let file_content_1 = util::fs::read_file(Some(left))?;
        let file_content_2 = util::fs::read_file(Some(right))?;
        let result = utf8_diff::diff(
            Some(file_content_1),
            Some(left.to_path_buf()),
            Some(file_content_2),
            Some(right.to_path_buf()),
        )?;
        return Ok(DiffResult::Text(result));
    }

    Err(OxenError::invalid_file_type(format!(
        "Compare not supported for files, found {:?} and {:?}",
        left, right
    )))
}
/// Diffs a file on disk (`file_path`) against an optional committed `file_node`.
///
/// With a node, the node's data type selects the diff; without one, the data type reported
/// by `util::fs::file_data_type` for the on-disk file is used. Only tabular and text data
/// are supported.
///
/// # Errors
/// Returns an invalid-file-type error for any other data type and propagates failures from
/// the underlying diff.
pub async fn diff_file_and_node(
    repo: &LocalRepository,
    file_node: Option<FileNode>,
    file_path: impl AsRef<Path>,
    keys: Vec<String>,
    targets: Vec<String>,
    display: Vec<String>,
) -> Result<DiffResult, OxenError> {
    let Some(file_node) = file_node else {
        // No committed counterpart: classify the working-directory file by itself.
        return match util::fs::file_data_type(file_path.as_ref()) {
            EntryDataType::Tabular => {
                let result = diff_tabular_file_and_file_node(
                    repo, None, file_path, keys, targets, display,
                )
                .await?;
                Ok(DiffResult::Tabular(result))
            }
            EntryDataType::Text => {
                log::debug!(
                    "diff_file_and_node: diffing text files {:?}",
                    file_path.as_ref()
                );
                let result = diff_text_file_and_node(repo, None, file_path).await?;
                log::debug!("diff_file_and_node: diffing text files {result:?}");
                Ok(result)
            }
            _ => Err(OxenError::invalid_file_type(format!(
                "Compare not supported for files, found {:?}",
                file_path.as_ref(),
            ))),
        };
    };

    match file_node.data_type() {
        EntryDataType::Tabular => {
            let result = diff_tabular_file_and_file_node(
                repo,
                Some(&file_node),
                file_path,
                keys,
                targets,
                display,
            )
            .await?;
            Ok(DiffResult::Tabular(result))
        }
        EntryDataType::Text => {
            log::debug!(
                "diff_file_and_node: diffing text files {:?} and {:?}",
                file_path.as_ref(),
                file_node.hash().to_string()
            );
            let result = diff_text_file_and_node(repo, Some(&file_node), file_path).await?;
            log::debug!(
                "diff_file_and_node: diffing text files {:?} and {:?}",
                file_node.hash().to_string(),
                result
            );
            Ok(result)
        }
        _ => Err(OxenError::invalid_file_type(format!(
            "Compare not supported for files, found {:?} and {:?}",
            file_path.as_ref(),
            file_node.hash().to_string()
        ))),
    }
}
/// Diffs two optional committed file nodes.
///
/// The nodes' data types select a tabular or a text diff; when both nodes are present they
/// must be of the same kind. `filename1` / `filename2` on the result are set from the names
/// of whichever nodes are present.
///
/// # Errors
/// Returns an invalid-file-type error for unsupported or mismatched data types, a basic
/// error when both nodes are `None`, and propagates failures from the underlying diff.
pub async fn diff_file_nodes(
    repo: &LocalRepository,
    file_1: Option<FileNode>,
    file_2: Option<FileNode>,
    keys: Vec<String>,
    targets: Vec<String>,
    display: Vec<String>,
) -> Result<DiffResult, OxenError> {
    match (file_1, file_2) {
        (Some(file_1), Some(file_2)) => {
            log::debug!(
                "diff_file_nodes data types: {:?} and {:?}",
                file_1.data_type(),
                file_2.data_type()
            );
            match (file_1.data_type(), file_2.data_type()) {
                (EntryDataType::Tabular, EntryDataType::Tabular) => {
                    let mut result = diff_tabular_file_nodes(
                        repo,
                        Some(&file_1),
                        Some(&file_2),
                        keys,
                        targets,
                        display,
                    )
                    .await?;
                    result.filename1 = Some(file_1.name().to_string());
                    result.filename2 = Some(file_2.name().to_string());
                    Ok(DiffResult::Tabular(result))
                }
                (EntryDataType::Text, EntryDataType::Text) => {
                    let mut result =
                        diff_text_file_nodes(repo, Some(&file_1), Some(&file_2)).await?;
                    result.filename1 = Some(file_1.name().to_string());
                    result.filename2 = Some(file_2.name().to_string());
                    Ok(DiffResult::Text(result))
                }
                _ => Err(OxenError::invalid_file_type(format!(
                    "Compare not supported for files, found {:?} and {:?}",
                    file_1.data_type(),
                    file_2.data_type()
                ))),
            }
        }
        (Some(file_1), None) => match file_1.data_type() {
            EntryDataType::Tabular => {
                let mut result =
                    diff_tabular_file_nodes(repo, Some(&file_1), None, keys, targets, display)
                        .await?;
                result.filename1 = Some(file_1.name().to_string());
                Ok(DiffResult::Tabular(result))
            }
            EntryDataType::Text => {
                let mut result = diff_text_file_nodes(repo, Some(&file_1), None).await?;
                result.filename1 = Some(file_1.name().to_string());
                Ok(DiffResult::Text(result))
            }
            _ => Err(OxenError::invalid_file_type(format!(
                "Compare not supported for files, found {:?}",
                file_1.data_type()
            ))),
        },
        (None, Some(file_2)) => match file_2.data_type() {
            EntryDataType::Tabular => {
                let mut result =
                    diff_tabular_file_nodes(repo, None, Some(&file_2), keys, targets, display)
                        .await?;
                // Mirror the (Some, None) arm so the present side always carries its name.
                result.filename2 = Some(file_2.name().to_string());
                Ok(DiffResult::Tabular(result))
            }
            EntryDataType::Text => {
                let mut result = diff_text_file_nodes(repo, None, Some(&file_2)).await?;
                // Mirror the (Some, None) arm so the present side always carries its name.
                result.filename2 = Some(file_2.name().to_string());
                Ok(DiffResult::Text(result))
            }
            _ => Err(OxenError::invalid_file_type(format!(
                "Compare not supported for files, found {:?}",
                file_2.data_type()
            ))),
        },
        (None, None) => Err(OxenError::basic_str(
            "Could not find one or both of the files to compare",
        )),
    }
}
/// Tabular diff between an optional committed `file_node` (left side) and the file at
/// `file_1_path` on disk (right side).
///
/// Without a committed node the left side is an empty DataFrame. In that case the key and
/// target columns are validated against the on-disk file's schema on both sides, the same
/// way [`diff_tabular_file_nodes`] treats a missing side. Validating against the empty
/// schema would reject every explicit key for a file that has no committed version.
///
/// # Errors
/// Propagates failures from the version store, DataFrame reading, field validation and
/// [`diff_dfs`].
pub async fn diff_tabular_file_and_file_node(
    repo: &LocalRepository,
    file_node: Option<&FileNode>,
    file_1_path: impl AsRef<Path>,
    keys: Vec<String>,
    targets: Vec<String>,
    display: Vec<String>,
) -> Result<TabularDiff, OxenError> {
    let (df_1, df_2, schema_1) = match file_node {
        Some(file_node) => {
            let version_store = repo.version_store()?;
            let file_node_path = version_store
                .get_version_path(&file_node.hash().to_string())
                .await?;
            let df_1 = tabular::read_df_with_extension(
                file_node_path,
                file_node.extension(),
                &DFOpts::empty(),
            )
            .await?;
            let df_2 = tabular::read_df(file_1_path, DFOpts::empty()).await?;
            let schema_1 = Schema::from_polars(df_1.schema());
            (df_1, df_2, schema_1)
        }
        None => {
            let df_1 = tabular::new_df();
            let df_2 = tabular::read_df(file_1_path, DFOpts::empty()).await?;
            // Nothing committed to validate against, so use the on-disk schema for both sides.
            let schema_1 = Schema::from_polars(df_2.schema());
            (df_1, df_2, schema_1)
        }
    };
    let schema_2 = Schema::from_polars(df_2.schema());
    validate_required_fields(schema_1, schema_2, keys.clone(), targets.clone())?;
    diff_dfs(&df_1, &df_2, keys, targets, display)
}
/// Tabular diff between two optional committed file nodes.
///
/// A missing side is represented by an empty DataFrame. In that case the key and target
/// columns are validated against the schema of the side that exists, passed for both sides.
///
/// # Errors
/// Returns a basic error when both nodes are `None`; otherwise propagates failures from the
/// version store, DataFrame reading, field validation and [`diff_dfs`].
pub async fn diff_tabular_file_nodes(
    repo: &LocalRepository,
    file_1: Option<&FileNode>,
    file_2: Option<&FileNode>,
    keys: Vec<String>,
    targets: Vec<String>,
    display: Vec<String>,
) -> Result<TabularDiff, OxenError> {
    // Each arm yields the two frames plus the two schemas that validation should see.
    let (df_1, df_2, schema_1, schema_2) = match (file_1, file_2) {
        (Some(node_1), Some(node_2)) => {
            let version_store = repo.version_store()?;
            let path_1 = version_store
                .get_version_path(&node_1.hash().to_string())
                .await?;
            let path_2 = version_store
                .get_version_path(&node_2.hash().to_string())
                .await?;
            let left =
                tabular::read_df_with_extension(path_1, node_1.extension(), &DFOpts::empty())
                    .await?;
            let right =
                tabular::read_df_with_extension(path_2, node_2.extension(), &DFOpts::empty())
                    .await?;
            let left_schema = Schema::from_polars(left.schema());
            let right_schema = Schema::from_polars(right.schema());
            (left, right, left_schema, right_schema)
        }
        (Some(node_1), None) => {
            let version_store = repo.version_store()?;
            let path_1 = version_store
                .get_version_path(&node_1.hash().to_string())
                .await?;
            let left =
                tabular::read_df_with_extension(path_1, node_1.extension(), &DFOpts::empty())
                    .await?;
            let left_schema = Schema::from_polars(left.schema());
            (left, tabular::new_df(), left_schema.clone(), left_schema)
        }
        (None, Some(node_2)) => {
            let version_store = repo.version_store()?;
            let path_2 = version_store
                .get_version_path(&node_2.hash().to_string())
                .await?;
            let left = tabular::new_df();
            let right =
                tabular::read_df_with_extension(path_2, node_2.extension(), &DFOpts::empty())
                    .await?;
            let right_schema = Schema::from_polars(right.schema());
            (left, right, right_schema.clone(), right_schema)
        }
        (None, None) => {
            return Err(OxenError::basic_str(
                "Could not find one or both of the files to compare",
            ));
        }
    };

    validate_required_fields(schema_1, schema_2, keys.clone(), targets.clone())?;
    diff_dfs(&df_1, &df_2, keys, targets, display)
}
/// Text diff between an optional committed `file_node` and the file at `file_path` on disk.
///
/// The committed content (if any) is read from the version store by the node's hash; the
/// on-disk content is read with `util::fs::read_file`. `file_path` is passed to
/// `utf8_diff::diff` as the path for both sides.
///
/// # Errors
/// Propagates failures from the version store, file reading and `utf8_diff::diff`.
pub async fn diff_text_file_and_node(
    repo: &LocalRepository,
    file_node: Option<&FileNode>,
    file_path: impl AsRef<Path>,
) -> Result<DiffResult, OxenError> {
    let version_store = repo.version_store()?;

    let committed_text = match file_node {
        Some(node) => {
            let hash = node.hash().to_string();
            Some(read_version_file_to_string(&version_store, &hash).await?)
        }
        None => None,
    };
    let working_text = util::fs::read_file(Some(&file_path))?;

    let shown_path = file_path.as_ref();
    let text_diff = utf8_diff::diff(
        committed_text,
        Some(shown_path.to_path_buf()),
        Some(working_text),
        Some(shown_path.to_path_buf()),
    )?;
    Ok(DiffResult::Text(text_diff))
}
pub async fn diff_text_file_nodes(
repo: &LocalRepository,
file_1: Option<&FileNode>,
file_2: Option<&FileNode>,
) -> Result<TextDiff, OxenError> {
let version_store = repo.version_store()?;
match (file_1, file_2) {
(Some(file_1), Some(file_2)) => {
let file_hash_1 = file_1.hash().to_string();
let file_content_1 = read_version_file_to_string(&version_store, &file_hash_1).await?;
let version_path_1 = version_store.get_version_path(&file_hash_1).await?;
let file_hash_2 = file_2.hash().to_string();
let file_content_2 = read_version_file_to_string(&version_store, &file_hash_2).await?;
let version_path_2 = version_store.get_version_path(&file_hash_2).await?;
utf8_diff::diff(
Some(file_content_1),
Some(version_path_1.to_pathbuf()),
Some(file_content_2),
Some(version_path_2.to_pathbuf()),
)
}
(Some(file_1), None) => {
let file_hash_1 = file_1.hash().to_string();
let file_content_1 = read_version_file_to_string(&version_store, &file_hash_1).await?;
let version_path_1 = version_store.get_version_path(&file_hash_1).await?;
utf8_diff::diff(
Some(file_content_1),
Some(version_path_1.to_pathbuf()),
None,
None,
)
}
(None, Some(file_2)) => {
let file_hash_2 = file_2.hash().to_string();
let file_content_2 = read_version_file_to_string(&version_store, &file_hash_2).await?;
let version_path_2 = version_store.get_version_path(&file_hash_2).await?;
utf8_diff::diff(
None,
None,
Some(file_content_2),
Some(version_path_2.to_pathbuf()),
)
}
(None, None) => Err(OxenError::basic_str(
"Could not find one or both of the files to compare",
)),
}
}
/// Tabular diff between two files on disk.
///
/// Both files are read into DataFrames, the requested key and target columns are validated
/// against their schemas, and the comparison is handed to [`diff_dfs`].
///
/// # Errors
/// Propagates failures from reading either file, field validation and [`diff_dfs`].
pub async fn tabular(
    file_1: impl AsRef<Path>,
    file_2: impl AsRef<Path>,
    keys: Vec<String>,
    targets: Vec<String>,
    display: Vec<String>,
) -> Result<TabularDiff, OxenError> {
    let left = tabular::read_df(file_1, DFOpts::empty()).await?;
    let right = tabular::read_df(file_2, DFOpts::empty()).await?;

    validate_required_fields(
        Schema::from_polars(left.schema()),
        Schema::from_polars(right.schema()),
        keys.clone(),
        targets.clone(),
    )?;

    diff_dfs(&left, &right, keys, targets, display)
}
/// Fetches the version identified by `hash` from the version store and decodes it as UTF-8.
///
/// # Errors
/// Propagates the store's error if the version cannot be fetched. If the bytes are not valid
/// UTF-8, logs a warning and returns a basic error naming the hash.
async fn read_version_file_to_string(
    version_store: &Arc<dyn VersionStore>,
    hash: impl AsRef<str>,
) -> Result<String, OxenError> {
    let bytes = version_store.get_version(hash.as_ref()).await?;
    String::from_utf8(bytes).map_err(|_| {
        let err = format!("could not decode version {} as UTF-8", hash.as_ref());
        log::warn!("{err}");
        OxenError::basic_str(&err)
    })
}
/// Checks that the requested key and target columns exist in the schemas being compared.
///
/// Every key must be present in both schemas. Each target must be present in at least one
/// of them.
///
/// # Errors
/// Returns an incompatible-schemas error carrying the first schema that lacks the keys, or
/// carrying `schema_1` when some target is missing from both schemas.
fn validate_required_fields(
    schema_1: Schema,
    schema_2: Schema,
    keys: Vec<String>,
    targets: Vec<String>,
) -> Result<(), OxenError> {
    if !schema_1.has_field_names(&keys) {
        return Err(OxenError::incompatible_schemas(schema_1));
    }
    if !schema_2.has_field_names(&keys) {
        return Err(OxenError::incompatible_schemas(schema_2));
    }

    // A target only has to exist on one side (it may have been added or removed).
    let target_missing_everywhere = targets
        .iter()
        .any(|target| !schema_1.has_field_name(target) && !schema_2.has_field_name(target));
    if target_missing_everywhere {
        return Err(OxenError::incompatible_schemas(schema_1));
    }

    Ok(())
}
/// Runs the tabular diff between two DataFrames.
///
/// Steps: compute the column-level schema diff, fill in defaults for keys, targets and
/// display columns, hash both frames on the key and target columns, then delegate to
/// `join_diff::diff`.
///
/// # Errors
/// Propagates failures from default resolution, row hashing and `join_diff::diff`.
pub fn diff_dfs(
    df_1: &DataFrame,
    df_2: &DataFrame,
    keys: Vec<String>,
    targets: Vec<String>,
    display: Vec<String>,
) -> Result<TabularDiff, OxenError> {
    let schema_diff = get_schema_diff(df_1, df_2);

    let (keys, targets) = get_keys_targets_smart_defaults(keys, targets, &schema_diff)?;
    let display = get_display_smart_defaults(&keys, &targets, display, &schema_diff);

    log::debug!("df_1 is {df_1:?}");
    log::debug!("df_2 is {df_2:?}");

    // Hashing works on owned copies, so the callers' frames are left untouched.
    let (hashed_1, hashed_2) = hash_dfs(df_1.clone(), df_2.clone(), &keys, &targets)?;
    join_diff::diff(&hashed_1, &hashed_2, schema_diff, &keys, &targets, &display)
}
/// Classifies the columns of two DataFrames by name.
///
/// * `added_cols`: in `df2` but not in `df1`, in `df2` column order.
/// * `removed_cols`: in `df1` but not in `df2`, in `df1` column order.
/// * `unchanged_cols`: in both, in `df1` column order.
///
/// The lists follow DataFrame column order rather than hash-set iteration order, so the
/// result (and every default derived from it) is the same on every run.
fn get_schema_diff(df1: &DataFrame, df2: &DataFrame) -> SchemaDiff {
    let df1_cols = df1.get_column_names();
    let df2_cols = df2.get_column_names();

    // The sets are used for membership tests only; ordering comes from the column vectors.
    let df1_set: HashSet<_> = df1_cols.iter().collect();
    let df2_set: HashSet<_> = df2_cols.iter().collect();

    let added_cols: Vec<String> = df2_cols
        .iter()
        .filter(|col| !df1_set.contains(col))
        .map(|col| (*col).to_string())
        .collect();
    let removed_cols: Vec<String> = df1_cols
        .iter()
        .filter(|col| !df2_set.contains(col))
        .map(|col| (*col).to_string())
        .collect();
    let unchanged_cols: Vec<String> = df1_cols
        .iter()
        .filter(|col| df2_set.contains(col))
        .map(|col| (*col).to_string())
        .collect();

    SchemaDiff {
        added_cols,
        removed_cols,
        unchanged_cols,
    }
}
/// Fills in key and target columns the caller left unspecified.
///
/// * keys and targets given: both are returned unchanged.
/// * only keys given: targets become the unchanged columns that are not keys.
/// * neither given: keys become all unchanged columns, targets the added columns followed
///   by the removed columns.
///
/// # Errors
/// Targets without keys are rejected.
fn get_keys_targets_smart_defaults(
    keys: Vec<String>,
    targets: Vec<String>,
    schema_diff: &SchemaDiff,
) -> Result<(Vec<String>, Vec<String>), OxenError> {
    log::debug!("get_keys_targets_smart_defaults keys {keys:?} targets {targets:?}");

    if keys.is_empty() {
        if !targets.is_empty() {
            return Err(OxenError::basic_str(
                "Must specify at least one key column if specifying target columns.",
            ));
        }
        // Nothing specified: key on everything shared, target everything that changed.
        let default_keys: Vec<String> = schema_diff.unchanged_cols.iter().cloned().collect();
        let default_targets: Vec<String> = schema_diff
            .added_cols
            .iter()
            .chain(schema_diff.removed_cols.iter())
            .cloned()
            .collect();
        return Ok((default_keys, default_targets));
    }

    if !targets.is_empty() {
        return Ok((keys, targets));
    }

    // Keys only: every other shared column becomes a target.
    let default_targets: Vec<String> = schema_diff
        .unchanged_cols
        .iter()
        .filter(|c| !keys.contains(c))
        .cloned()
        .collect();
    Ok((keys, default_targets))
}
/// Chooses the display columns for a tabular diff.
///
/// An explicit, non-empty `display` list is returned as is. Otherwise the default consists
/// of every column that is neither a key nor a target: `<col>.left` and `<col>.right` for
/// unchanged columns, `<col>.left` for removed columns and `<col>.right` for added columns,
/// in that order.
fn get_display_smart_defaults(
    keys: &[String],
    targets: &[String],
    display: Vec<String>,
    schema_diff: &SchemaDiff,
) -> Vec<String> {
    if !display.is_empty() {
        return display;
    }

    // A column is displayable when it plays no role as a key or a target.
    let unassigned = |col: &String| !keys.contains(col) && !targets.contains(col);

    let both_sides = schema_diff
        .unchanged_cols
        .iter()
        .filter(|col| unassigned(*col))
        .flat_map(|col| [format!("{col}.left"), format!("{col}.right")]);
    let left_only = schema_diff
        .removed_cols
        .iter()
        .filter(|col| unassigned(*col))
        .map(|col| format!("{col}.left"));
    let right_only = schema_diff
        .added_cols
        .iter()
        .filter(|col| unassigned(*col))
        .map(|col| format!("{col}.right"));

    both_sides.chain(left_only).chain(right_only).collect()
}
/// Runs `tabular::df_hash_rows_on_cols` on both frames, first for the target columns
/// (`TARGETS_HASH_COL`) and then for the key columns (`KEYS_HASH_COL`), and returns the
/// resulting frames.
///
/// # Errors
/// Propagates the first hashing failure.
fn hash_dfs(
    left_df: DataFrame,
    right_df: DataFrame,
    keys: &[String],
    targets: &[String],
) -> Result<(DataFrame, DataFrame), OxenError> {
    // Targets on both sides first, then keys on both sides.
    let left_with_targets = tabular::df_hash_rows_on_cols(left_df, targets, TARGETS_HASH_COL)?;
    let right_with_targets = tabular::df_hash_rows_on_cols(right_df, targets, TARGETS_HASH_COL)?;
    let left_hashed = tabular::df_hash_rows_on_cols(left_with_targets, keys, KEYS_HASH_COL)?;
    let right_hashed = tabular::df_hash_rows_on_cols(right_with_targets, keys, KEYS_HASH_COL)?;
    Ok((left_hashed, right_hashed))
}
/// Counts the distinct row hashes that occur in `head_df` but not in `base_df`.
///
/// # Errors
/// Propagates failures from `tabular::df_hash_rows`.
///
/// # Panics
/// Panics if the row-hash column is missing, is not a string column, or contains a null.
pub fn count_added_rows(base_df: DataFrame, head_df: DataFrame) -> Result<usize, OxenError> {
    /// Collects the distinct values of the row-hash column.
    fn unique_row_hashes(df: &DataFrame) -> HashSet<String> {
        df.column(constants::ROW_HASH_COL_NAME)
            .unwrap()
            .str()
            .unwrap()
            .into_iter()
            .map(|v| v.unwrap().to_string())
            .collect()
    }

    let base_df = tabular::df_hash_rows(base_df)?;
    let head_df = tabular::df_hash_rows(head_df)?;

    let base_hashes = unique_row_hashes(&base_df);
    let head_hashes = unique_row_hashes(&head_df);

    Ok(head_hashes.difference(&base_hashes).count())
}
/// Counts the distinct row hashes that occur in `base_df` but not in `head_df`.
///
/// # Errors
/// Propagates failures from `tabular::df_hash_rows`.
///
/// # Panics
/// Panics if the row-hash column is missing, is not a string column, or contains a null.
pub fn count_removed_rows(base_df: DataFrame, head_df: DataFrame) -> Result<usize, OxenError> {
    /// Collects the distinct values of the row-hash column.
    fn unique_row_hashes(df: &DataFrame) -> HashSet<String> {
        df.column(constants::ROW_HASH_COL_NAME)
            .unwrap()
            .str()
            .unwrap()
            .into_iter()
            .map(|v| v.unwrap().to_string())
            .collect()
    }

    let base_df = tabular::df_hash_rows(base_df)?;
    let head_df = tabular::df_hash_rows(head_df)?;

    let base_hashes = unique_row_hashes(&base_df);
    let head_hashes = unique_row_hashes(&head_df);

    Ok(base_hashes.difference(&head_hashes).count())
}
/// Computes which row indices were added and which were removed between two DataFrames.
///
/// Both frames are row-hashed. Added indices are positions in `head_df` whose hash does not
/// occur in `base_df`; removed indices are positions in `base_df` whose hash does not occur
/// in `head_df`. Both lists are sorted ascending. Rows sharing a hash collapse to a single
/// map entry, so at most one index (the last one seen) is reported per distinct hash.
///
/// # Errors
/// Propagates failures from `tabular::df_hash_rows`.
///
/// # Panics
/// Panics if the row-hash column is missing, is not a string column, or contains a null.
pub fn compute_new_row_indices(
    base_df: &DataFrame,
    head_df: &DataFrame,
) -> Result<(Vec<u32>, Vec<u32>), OxenError> {
    /// Maps each row hash to the index of the row that produced it.
    fn index_by_row_hash(df: &DataFrame) -> HashMap<String, u32> {
        df.column(constants::ROW_HASH_COL_NAME)
            .unwrap()
            .str()
            .unwrap()
            .into_iter()
            .enumerate()
            .map(|(i, v)| (v.unwrap().to_string(), i as u32))
            .collect()
    }

    let base_df = tabular::df_hash_rows(base_df.clone())?;
    let head_df = tabular::df_hash_rows(head_df.clone())?;
    log::debug!("diff_current got current hashes base_df {base_df:?}");
    log::debug!("diff_current got current hashes head_df {head_df:?}");

    let base_lookup = index_by_row_hash(&base_df);
    let head_lookup = index_by_row_hash(&head_df);

    let mut added_indices: Vec<u32> = head_lookup
        .iter()
        .filter_map(|(hash, index)| (!base_lookup.contains_key(hash)).then_some(*index))
        .collect();
    added_indices.sort();

    let mut removed_indices: Vec<u32> = base_lookup
        .iter()
        .filter_map(|(hash, index)| (!head_lookup.contains_key(hash)).then_some(*index))
        .collect();
    removed_indices.sort();

    log::debug!("diff_current added_indices {:?}", added_indices.len());
    log::debug!("diff_current removed_indices {:?}", removed_indices.len());
    Ok((added_indices, removed_indices))
}
/// Builds a row-level `DataFrameDiff` for two DataFrames that share `schema`.
///
/// Added rows are taken from `head_df` and removed rows from `base_df`, each after a
/// transform built from `schema`'s columns. A side with no changes is reported as `None`.
/// Column-level fields are always `None`.
///
/// # Errors
/// Propagates failures from index computation, `tabular::transform` and `tabular::take`.
pub async fn compute_new_rows(
    base_df: &DataFrame,
    head_df: &DataFrame,
    schema: &Schema,
) -> Result<DataFrameDiff, OxenError> {
    let (added_indices, removed_indices) = compute_new_row_indices(base_df, head_df)?;

    let added_rows = if added_indices.is_empty() {
        None
    } else {
        let projected =
            tabular::transform(head_df.clone(), DFOpts::from_schema_columns(schema)).await?;
        Some(tabular::take(projected.lazy(), added_indices)?)
    };
    log::debug!("diff_current added_rows {added_rows:?}");

    let removed_rows = if removed_indices.is_empty() {
        None
    } else {
        let projected =
            tabular::transform(base_df.clone(), DFOpts::from_schema_columns(schema)).await?;
        Some(tabular::take(projected.lazy(), removed_indices)?)
    };
    log::debug!("diff_current removed_rows {removed_rows:?}");

    Ok(DataFrameDiff {
        head_schema: Some(schema.to_owned()),
        base_schema: Some(schema.to_owned()),
        added_rows,
        removed_rows,
        added_cols: None,
        removed_cols: None,
    })
}
/// Builds a row-level `DataFrameDiff` where row matching and row extraction use different
/// frames.
///
/// Added and removed indices are computed from `base_df` / `head_df`. The rows themselves
/// are taken from `proj_head_df` (added, transformed with `head_schema`'s columns) and
/// `proj_base_df` (removed, transformed with `base_schema`'s columns). A side with no
/// changes is reported as `None`. Column-level fields are always `None`.
///
/// # Errors
/// Propagates failures from index computation, `tabular::transform` and `tabular::take`.
pub async fn compute_new_rows_proj(
    base_df: &DataFrame,
    head_df: &DataFrame,
    proj_base_df: &DataFrame,
    proj_head_df: &DataFrame,
    base_schema: &Schema,
    head_schema: &Schema,
) -> Result<DataFrameDiff, OxenError> {
    let (added_indices, removed_indices) = compute_new_row_indices(base_df, head_df)?;

    let added_rows = if !added_indices.is_empty() {
        let opts = DFOpts::from_schema_columns(head_schema);
        let proj_head_df = tabular::transform(proj_head_df.clone(), opts).await?;
        Some(tabular::take(proj_head_df.lazy(), added_indices)?)
    } else {
        None
    };
    log::debug!("diff_current added_rows {added_rows:?}");

    let removed_rows = if !removed_indices.is_empty() {
        let opts = DFOpts::from_schema_columns(base_schema);
        let proj_base_df = tabular::transform(proj_base_df.clone(), opts).await?;
        Some(tabular::take(proj_base_df.lazy(), removed_indices)?)
    } else {
        None
    };
    log::debug!("diff_current removed_rows {removed_rows:?}");

    Ok(DataFrameDiff {
        // Each schema goes in its own slot; the two used to be swapped.
        head_schema: Some(head_schema.to_owned()),
        base_schema: Some(base_schema.to_owned()),
        added_rows,
        removed_rows,
        added_cols: None,
        removed_cols: None,
    })
}
/// Builds a column-level `DataFrameDiff` between two DataFrames.
///
/// Added fields (per `head_schema.added_fields(base_schema)`) are projected out of
/// `head_df`; removed fields (per `head_schema.removed_fields(base_schema)`) are projected
/// out of `base_df`. A projection with zero columns is reported as `None`. Row-level fields
/// are always `None`.
///
/// # Errors
/// Propagates failures from `tabular::transform`.
pub async fn compute_new_columns_from_dfs(
    base_df: DataFrame,
    head_df: DataFrame,
    base_schema: &Schema,
    head_schema: &Schema,
) -> Result<DataFrameDiff, OxenError> {
    let added_fields = head_schema.added_fields(base_schema);
    let removed_fields = head_schema.removed_fields(base_schema);

    let added_cols = if !added_fields.is_empty() {
        let opts = DFOpts::from_columns(added_fields);
        let df_added = tabular::transform(head_df, opts).await?;
        log::debug!("Got added col df: {df_added}");
        (df_added.width() > 0).then_some(df_added)
    } else {
        None
    };

    let removed_cols = if !removed_fields.is_empty() {
        let opts = DFOpts::from_columns(removed_fields);
        let df_removed = tabular::transform(base_df, opts).await?;
        log::debug!("Got removed col df: {df_removed}");
        (df_removed.width() > 0).then_some(df_removed)
    } else {
        None
    };

    Ok(DataFrameDiff {
        // The head slot carries the head schema; it used to be filled from `base_schema`.
        head_schema: Some(head_schema.to_owned()),
        base_schema: Some(base_schema.to_owned()),
        added_rows: None,
        removed_rows: None,
        added_cols,
        removed_cols,
    })
}
/// Version-dispatching wrapper around `core::v_latest::diff::diff_entries`.
///
/// # Panics
/// Panics when the repository's minimum version is `V0_10_0`, which is no longer supported.
pub async fn diff_entries(
    repo: &LocalRepository,
    file_path: impl AsRef<Path>,
    base_entry: Option<FileNode>,
    base_commit: &Commit,
    head_entry: Option<FileNode>,
    head_commit: &Commit,
    df_opts: DFOpts,
) -> Result<DiffEntry, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        panic!("v0.10.0 no longer supported");
    }

    // Every other version is served by the latest implementation.
    core::v_latest::diff::diff_entries(
        repo,
        file_path,
        base_entry,
        base_commit,
        head_entry,
        head_commit,
        df_opts,
    )
    .await
}
/// Version-dispatching wrapper around `core::v_latest::diff::list_changed_dirs`.
///
/// # Panics
/// Panics when the repository's minimum version is `V0_10_0`, which is no longer supported.
pub fn list_changed_dirs(
    repo: &LocalRepository,
    base_commit: &Commit,
    head_commit: &Commit,
) -> Result<Vec<(PathBuf, DiffEntryStatus)>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        panic!("v0.10.0 no longer supported");
    }

    // Every other version is served by the latest implementation.
    core::v_latest::diff::list_changed_dirs(repo, base_commit, head_commit)
}
/// Persists a computed tabular diff under `compare_id`.
///
/// Three writes happen in order, stopping at the first failure: the commit ids of the two
/// compared entries, the diff DataFrame cache, and the dupes summary.
///
/// # Errors
/// Propagates the first write failure.
pub fn cache_tabular_diff(
    repo: &LocalRepository,
    compare_id: &str,
    commit_entry_1: CommitEntry,
    commit_entry_2: CommitEntry,
    diff: &TabularDiff,
) -> Result<(), OxenError> {
    let left_entry = Some(commit_entry_1);
    let right_entry = Some(commit_entry_2);

    write_diff_commit_ids(repo, compare_id, &left_entry, &right_entry)?;
    write_diff_df_cache(repo, compare_id, diff)?;
    write_diff_dupes(repo, compare_id, &diff.summary.dupes)
}
/// Removes the diff directory for `compare_id`, if it exists.
///
/// A missing directory is not an error.
///
/// # Errors
/// Propagates the I/O error if the directory exists but cannot be removed.
pub fn delete_df_diff(repo: &LocalRepository, compare_id: &str) -> Result<(), OxenError> {
    let compare_dir = get_diff_dir(repo, compare_id);
    if !compare_dir.exists() {
        return Ok(());
    }

    log::debug!("delete_df_compare() found compare_dir, deleting: {compare_dir:?}");
    std::fs::remove_dir_all(&compare_dir)?;
    Ok(())
}
/// Serializes `dupes` to JSON and writes it to `DUPES_PATH` inside the diff directory for
/// `compare_id`, creating the directory first when it does not exist.
///
/// # Errors
/// Propagates directory-creation, serialization and file-write failures.
fn write_diff_dupes(
    repo: &LocalRepository,
    compare_id: &str,
    dupes: &TabularDiffDupes,
) -> Result<(), OxenError> {
    let compare_dir = get_diff_dir(repo, compare_id);
    if !compare_dir.exists() {
        util::fs::create_dir_all(&compare_dir)?;
    }

    let target = compare_dir.join(DUPES_PATH);
    let serialized = serde_json::to_string(&dupes)?;
    std::fs::write(target, serialized)?;
    Ok(())
}
/// Loads the cached tabular diff for `compare_id`, if one exists and is still
/// valid for the two entries being compared.
///
/// Returns `Ok(None)` (a cache miss, so the caller should recompute) when:
/// - the cached commit-id files are missing,
/// - either `compare_entry_1` or `compare_entry_2` is `None`,
/// - the cached commit ids differ from the commit ids of the given entries
///   (the cache was written for other revisions of the files), or
/// - the cached diff file itself is missing.
///
/// On a hit, both full data frames are re-read from the version store to
/// rebuild the schemas and the column-level schema diff, while the row-level
/// contents and counts come from the cached diff data frame.
///
/// # Errors
///
/// Propagates failures from reading the cache files, resolving the version
/// files, reading any of the data frames, or deserializing the dupes file.
pub async fn get_cached_diff(
    repo: &LocalRepository,
    compare_id: &str,
    compare_entry_1: Option<CommitEntry>,
    compare_entry_2: Option<CommitEntry>,
) -> Result<Option<DiffResult>, OxenError> {
    let (cached_left_id, cached_right_id) = match get_diff_commit_ids(repo, compare_id)? {
        (Some(left_id), Some(right_id)) => (left_id, right_id),
        _ => return Ok(None),
    };
    let (left_entry, right_entry) = match (compare_entry_1, compare_entry_2) {
        (Some(left), Some(right)) => (left, right),
        _ => return Ok(None),
    };

    // The cache is keyed only by `compare_id`, so it may have been written for
    // different commits of the same files. Treat a mismatch as a miss instead
    // of returning a stale diff.
    if cached_left_id != left_entry.commit_id || cached_right_id != right_entry.commit_id {
        return Ok(None);
    }

    // The commit-id files are written before the diff itself, so the diff may
    // be absent if caching was interrupted; that is a miss, not an error.
    let diff_cache_path = get_diff_cache_path(repo, compare_id);
    if !diff_cache_path.exists() {
        return Ok(None);
    }

    let left_version_path = repositories::revisions::get_version_file_from_commit_id(
        repo,
        left_entry.commit_id,
        &left_entry.path,
    )
    .await?;
    let left_full_df = tabular::read_df(&*left_version_path, DFOpts::empty()).await?;

    let right_version_path = repositories::revisions::get_version_file_from_commit_id(
        repo,
        right_entry.commit_id,
        &right_entry.path,
    )
    .await?;
    let right_full_df = tabular::read_df(&*right_version_path, DFOpts::empty()).await?;

    // Convert each polars schema once and reuse it for both the schema diff
    // and the returned summary.
    let left_schema = Schema::from_polars(left_full_df.schema());
    let right_schema = Schema::from_polars(right_full_df.schema());
    let schema_diff = TabularSchemaDiff::from_schemas(&left_schema, &right_schema)?;

    let diff_df = tabular::read_df(diff_cache_path, DFOpts::empty()).await?;
    let schemas = TabularDiffSchemas {
        left: left_schema,
        right: right_schema,
        diff: Schema::from_polars(diff_df.schema()),
    };

    let row_mods = AddRemoveModifyCounts::from_diff_df(&diff_df)?;
    let tab_diff_summary = TabularDiffSummary {
        schemas,
        modifications: TabularDiffMods {
            row_counts: row_mods,
            col_changes: schema_diff,
        },
        dupes: read_dupes(repo, compare_id)?,
    };

    // Keys/targets and file names are not persisted in the cache, so they are
    // returned empty.
    let diff_results = TabularDiff {
        summary: tab_diff_summary,
        parameters: TabularDiffParameters::empty(),
        contents: diff_df,
        filename1: None,
        filename2: None,
    };
    Ok(Some(DiffResult::Tabular(diff_results)))
}
/// Reads the cached duplicate summary for `compare_id`.
///
/// Falls back to `TabularDiffDupes::empty()` when no `DUPES_PATH` file has
/// been written for this compare.
fn read_dupes(repo: &LocalRepository, compare_id: &str) -> Result<TabularDiffDupes, OxenError> {
    let dupes_path = get_diff_dir(repo, compare_id).join(DUPES_PATH);
    if dupes_path.exists() {
        let raw_json = std::fs::read_to_string(dupes_path)?;
        let dupes: TabularDiffDupes = serde_json::from_str(&raw_json)?;
        Ok(dupes)
    } else {
        Ok(TabularDiffDupes::empty())
    }
}
/// Lists one page of diff entries between `base_commit` and `head_commit`,
/// restricted to `base_dir` / `head_dir`, together with their counts.
///
/// All arguments, including `page` and `page_size`, are forwarded untouched to
/// `core::v_latest::diff::list_diff_entries`.
///
/// # Panics
///
/// Panics when the repository reports `MinOxenVersion::V0_10_0`, which is no
/// longer supported.
pub async fn list_diff_entries(
    repo: &LocalRepository,
    base_commit: &Commit,
    head_commit: &Commit,
    base_dir: PathBuf,
    head_dir: PathBuf,
    page: usize,
    page_size: usize,
) -> Result<DiffEntriesCounts, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        panic!("v0.10.0 no longer supported");
    }

    core::v_latest::diff::list_diff_entries(
        repo,
        base_commit,
        head_commit,
        base_dir,
        head_dir,
        page,
        page_size,
    )
    .await
}
/// Writes the contents of `diff` to the cached diff file for `compare_id`,
/// creating the compare cache directory first if needed.
fn write_diff_df_cache(
    repo: &LocalRepository,
    compare_id: &str,
    diff: &TabularDiff,
) -> Result<(), OxenError> {
    let cache_dir = get_diff_dir(repo, compare_id);
    if !cache_dir.exists() {
        util::fs::create_dir_all(&cache_dir)?;
    }

    log::debug!("getting diff cache path");
    let diff_path = get_diff_cache_path(repo, compare_id);
    log::debug!("about to create at path {diff_path:?}");

    // `write_df` takes the frame mutably, while we only hold a shared
    // reference to the diff, so work on a copy.
    let mut contents = diff.contents.clone();
    tabular::write_df(&mut contents, &diff_path)?;
    Ok(())
}
/// Reads the left and right commit ids recorded in the compare cache for
/// `compare_id`.
///
/// Returns `(None, None)` unless the cache directory and both id files exist;
/// the two ids are therefore always either both present or both absent.
fn get_diff_commit_ids(
    repo: &LocalRepository,
    compare_id: &str,
) -> Result<(Option<String>, Option<String>), OxenError> {
    let cache_dir = get_diff_dir(repo, compare_id);
    let left_file = cache_dir.join(LEFT_COMPARE_COMMIT);
    let right_file = cache_dir.join(RIGHT_COMPARE_COMMIT);

    let ids_recorded = cache_dir.exists() && left_file.exists() && right_file.exists();
    if !ids_recorded {
        return Ok((None, None));
    }

    let left_id = std::fs::read_to_string(left_file)?;
    let right_id = std::fs::read_to_string(right_file)?;
    Ok((Some(left_id), Some(right_id)))
}
fn write_diff_commit_ids(
repo: &LocalRepository,
compare_id: &str,
left_entry: &Option<CommitEntry>,
right_entry: &Option<CommitEntry>,
) -> Result<(), OxenError> {
let compare_dir = get_diff_dir(repo, compare_id);
if !compare_dir.exists() {
util::fs::create_dir_all(&compare_dir)?;
}
let left_path = compare_dir.join(LEFT_COMPARE_COMMIT);
let right_path = compare_dir.join(RIGHT_COMPARE_COMMIT);
if let Some(commit_entry) = left_entry {
let left_id = &commit_entry.commit_id;
std::fs::write(left_path, left_id)?;
}
if let Some(commit_entry) = right_entry {
let right_id = &commit_entry.commit_id;
std::fs::write(right_path, right_id)?;
}
Ok(())
}
/// Tallies how many of `entries` are added, removed, and modified.
///
/// Entries flagged as directories (`is_dir`) are not counted.
///
/// # Panics
///
/// Panics if an entry's `status` string cannot be parsed into a
/// [`DiffEntryStatus`].
pub fn get_add_remove_modify_counts(entries: &[DiffEntry]) -> AddRemoveModifyCounts {
    let mut counts = AddRemoveModifyCounts {
        added: 0,
        removed: 0,
        modified: 0,
    };

    for file_entry in entries.iter().filter(|entry| !entry.is_dir) {
        let status = DiffEntryStatus::from_str(&file_entry.status).unwrap();
        match status {
            DiffEntryStatus::Added => counts.added += 1,
            DiffEntryStatus::Removed => counts.removed += 1,
            DiffEntryStatus::Modified => counts.modified += 1,
        }
    }

    counts
}
/// Path of the cached diff data frame (`diff.parquet`) inside the compare
/// cache directory for `compare_id`.
fn get_diff_cache_path(repo: &LocalRepository, compare_id: &str) -> PathBuf {
    get_diff_dir(repo, compare_id).join("diff.parquet")
}
/// Directory that holds every cached artifact for `compare_id`:
/// `<oxen hidden dir>/CACHE_DIR/COMPARES_DIR/<compare_id>`.
///
/// The path is only computed here; nothing is created on disk.
pub fn get_diff_dir(repo: &LocalRepository, compare_id: &str) -> PathBuf {
    let hidden_dir = util::fs::oxen_hidden_dir(&repo.path);
    let compares_root = hidden_dir.join(CACHE_DIR).join(COMPARES_DIR);
    compares_root.join(compare_id)
}
#[cfg(test)]
mod tests {
use std::path::Path;
use std::path::PathBuf;
use crate::error::OxenError;
use crate::model::diff::diff_entry_status::DiffEntryStatus;
use crate::opts::DiffOpts;
use crate::opts::RmOpts;
use crate::repositories;
use crate::test;
use crate::util;
use polars::lazy::dsl::{col, lit};
use polars::lazy::frame::IntoLazy;
use crate::constants::DIFF_STATUS_COL;
use crate::model::diff::{ChangeType, DiffResult};
use crate::model::entry::commit_entry::CommitPath;
/// Committing two brand-new files must surface exactly two `Added` diff entries.
#[tokio::test]
async fn test_diff_entries_add_multiple() -> Result<(), OxenError> {
    test::run_bounding_box_csv_repo_test_fully_committed_async(|repo| async move {
        // Remember the commit we diff against before changing anything.
        let base_commit = repositories::commits::head_commit(&repo)?;

        let hello_path = repo.path.join("Hello.txt");
        let world_path = repo.path.join("World.txt");
        test::write_txt_file_to_path(&hello_path, "Hello")?;
        test::write_txt_file_to_path(&world_path, "World")?;
        repositories::add(&repo, &hello_path).await?;
        repositories::add(&repo, &world_path).await?;
        let head_commit = repositories::commit(&repo, "Adding two files")?;

        let diff = repositories::diffs::list_diff_entries(
            &repo,
            &base_commit,
            &head_commit,
            PathBuf::from(""),
            PathBuf::from(""),
            0,
            10,
        )
        .await?;
        let entries = diff.entries;

        assert_eq!(2, entries.len());
        let added = DiffEntryStatus::Added.to_string();
        for entry in &entries {
            assert_eq!(added, entry.status);
        }
        Ok(())
    })
    .await
}
/// Rewriting the train bounding-box CSV must yield three diff entries, all
/// `Modified` (presumably the file plus its two parent directories).
#[tokio::test]
async fn test_diff_entries_modify_one_tabular() -> Result<(), OxenError> {
    test::run_bounding_box_csv_repo_test_fully_committed_async(|repo| async move {
        let bbox_rel_path = Path::new("annotations")
            .join("train")
            .join("bounding_box.csv");
        let bbox_abs_path = repo.path.join(bbox_rel_path);
        let base_commit = repositories::commits::head_commit(&repo)?;

        // Overwrite the CSV with a shorter table and commit the change.
        let modified_bbox = test::modify_txt_file(
            bbox_abs_path,
            r"
file,label,min_x,min_y,width,height
train/dog_1.jpg,dog,101.5,32.0,385,330
train/dog_2.jpg,dog,7.0,29.5,246,247
train/cat_2.jpg,cat,30.5,44.0,333,396
",
        )?;
        repositories::add(&repo, modified_bbox).await?;
        let head_commit = repositories::commit(&repo, "Removing a row from train bbox data")?;

        let diff = repositories::diffs::list_diff_entries(
            &repo,
            &base_commit,
            &head_commit,
            PathBuf::from(""),
            PathBuf::from(""),
            0,
            10,
        )
        .await?;
        let entries = diff.entries;

        assert_eq!(3, entries.len());
        let modified = DiffEntryStatus::Modified.to_string();
        for entry in &entries {
            println!("==================");
            println!("entry {entry:?}");
            println!("==================");
            assert_eq!(modified, entry.status);
        }
        Ok(())
    })
    .await
}
/// Removing the train bounding-box CSV must report the file as `Removed` and
/// both of its parent directories as `Modified`.
#[tokio::test]
async fn test_diff_entries_remove_one_tabular_file() -> Result<(), OxenError> {
    test::run_bounding_box_csv_repo_test_fully_committed_async(|repo| async move {
        let bbox_rel_path = Path::new("annotations")
            .join("train")
            .join("bounding_box.csv");
        let bbox_abs_path = repo.path.join(&bbox_rel_path);
        let base_commit = repositories::commits::head_commit(&repo)?;

        // Delete the file on disk, stage the removal, and commit it.
        util::fs::remove_file(bbox_abs_path)?;
        let rm_opts = RmOpts::from_path(&bbox_rel_path);
        repositories::rm(&repo, &rm_opts)?;
        let head_commit = repositories::commit(&repo, "Removing a the training data file")?;

        let diff = repositories::diffs::list_diff_entries(
            &repo,
            &base_commit,
            &head_commit,
            PathBuf::from(""),
            PathBuf::from(""),
            0,
            10,
        )
        .await?;
        let entries = diff.entries;
        for (idx, entry) in entries.iter().enumerate() {
            println!("entry {}: {:?}", idx, entry);
        }
        assert_eq!(3, entries.len());

        // Top-level directory.
        let annotations_entry = entries.iter().find(|e| e.filename == "annotations");
        assert!(annotations_entry.is_some());
        assert_eq!(
            annotations_entry.unwrap().status,
            DiffEntryStatus::Modified.to_string()
        );

        // Intermediate directory.
        let train_dir = Path::new("annotations").join("train");
        let train_dir_name = train_dir.to_str().unwrap();
        let train_entry = entries.iter().find(|e| e.filename == train_dir_name);
        assert!(train_entry.is_some());
        assert_eq!(
            train_entry.unwrap().status,
            DiffEntryStatus::Modified.to_string()
        );

        // The removed file itself.
        let removed_file = Path::new("annotations")
            .join("train")
            .join("bounding_box.csv");
        let removed_file_name = removed_file.to_str().unwrap();
        let removed_entry = entries.iter().find(|e| e.filename == removed_file_name);
        assert!(removed_entry.is_some());
        assert_eq!(
            removed_entry.unwrap().status,
            DiffEntryStatus::Removed.to_string()
        );
        Ok(())
    })
    .await
}
/// Two files added in one commit and one file removed in the next must give
/// five diff entries with `added == 2` and `removed == 1`.
#[tokio::test]
async fn test_diff_get_add_remove_modify_counts() -> Result<(), OxenError> {
    test::run_bounding_box_csv_repo_test_fully_committed_async(|repo| async move {
        let base_commit = repositories::commits::head_commit(&repo)?;

        // First commit: add two new text files.
        let hello_path = repo.path.join("Hello.txt");
        let world_path = repo.path.join("World.txt");
        test::write_txt_file_to_path(&hello_path, "Hello")?;
        test::write_txt_file_to_path(&world_path, "World")?;
        repositories::add(&repo, &hello_path).await?;
        repositories::add(&repo, &world_path).await?;
        // NOTE(review): this commit message does not describe the change above;
        // it is kept as-is to avoid altering the fixture history.
        repositories::commit(&repo, "Removing a row from train bbox data")?;

        // Second commit: remove the train bounding-box CSV.
        let bbox_rel_path = Path::new("annotations")
            .join("train")
            .join("bounding_box.csv");
        let bbox_abs_path = repo.path.join(&bbox_rel_path);
        util::fs::remove_file(bbox_abs_path)?;
        let rm_opts = RmOpts::from_path(&bbox_rel_path);
        repositories::rm(&repo, &rm_opts)?;
        let head_commit = repositories::commit(&repo, "Removing a the training data file")?;

        let diff = repositories::diffs::list_diff_entries(
            &repo,
            &base_commit,
            &head_commit,
            PathBuf::from(""),
            PathBuf::from(""),
            0,
            10,
        )
        .await?;
        let entries = diff.entries;
        for (idx, entry) in entries.iter().enumerate() {
            println!("entry {}: {:?}", idx, entry);
        }

        let counts = repositories::diffs::get_add_remove_modify_counts(&entries);
        assert_eq!(5, entries.len());
        assert_eq!(2, counts.added);
        assert_eq!(1, counts.removed);
        Ok(())
    })
    .await
}
#[tokio::test]
async fn test_diff_entries_in_dir_at_root() -> Result<(), OxenError> {
test::run_bounding_box_csv_repo_test_fully_committed_async(|repo| async move {
let bbox_filename = Path::new("annotations")
.join("train")
.join("bounding_box.csv");
let bbox_file = repo.path.join(&bbox_filename);
let new_bbox_filename = Path::new("annotations")
.join("train")
.join("new_bounding_box.csv");
let new_bbox_file = repo.path.join(&new_bbox_filename);
let new_root_filename = Path::new("READMENOW.md");
let new_root_file = repo.path.join(new_root_filename);
let add_dir = PathBuf::from("annotations").join("schmannotations");
let add_dir_added_file = PathBuf::from("annotations")
.join("schmannotations")
.join("added_file.txt");
let add_root_dir = PathBuf::from("not_annotations");
let add_root_dir_added_file = PathBuf::from("not_annotations").join("added_file.txt");
util::fs::create_dir_all(repo.path.join(add_dir))?;
util::fs::create_dir_all(repo.path.join(add_root_dir))?;
test::write_txt_file_to_path(&new_root_file, "Hello,world")?;
test::write_txt_file_to_path(&new_bbox_file, "Hello,world")?;
test::write_txt_file_to_path(repo.path.join(add_dir_added_file), "Hello,world!!")?;
test::write_txt_file_to_path(repo.path.join(add_root_dir_added_file), "Hello,world!!")?;
let base_commit = repositories::commits::head_commit(&repo)?;
util::fs::remove_file(bbox_file)?;
let opts = RmOpts::from_path(&bbox_filename);
repositories::rm(&repo, &opts)?;
repositories::add(&repo, &repo.path).await?;
let head_commit = repositories::commit(&repo, "Removing a the training data file")?;
let entries = repositories::diffs::list_diff_entries(
&repo,
&base_commit,
&head_commit,
PathBuf::from(""),
PathBuf::from(""),
0,
10,
)
.await?;
let entries = entries.entries;
let annotation_diff_entries = repositories::diffs::list_diff_entries(
&repo,
&base_commit,
&head_commit,
PathBuf::from("annotations"),
PathBuf::from("annotations"),
0,
10,
)
.await?;
log::debug!("Got entries: {entries:?}");
assert_eq!(9, entries.len());
assert_eq!(entries[0].status, DiffEntryStatus::Modified.to_string());
assert_eq!(entries[1].status, DiffEntryStatus::Added.to_string());
assert_eq!(entries[2].status, DiffEntryStatus::Modified.to_string());
assert_eq!(entries[3].status, DiffEntryStatus::Added.to_string());
assert_eq!(entries[4].status, DiffEntryStatus::Added.to_string());
assert_eq!(entries[5].status, DiffEntryStatus::Added.to_string());
assert_eq!(entries[6].status, DiffEntryStatus::Removed.to_string());
assert_eq!(entries[7].status, DiffEntryStatus::Added.to_string());
assert_eq!(entries[8].status, DiffEntryStatus::Added.to_string());
log::debug!(
"Got annotation_diff_entries: {:?}",
annotation_diff_entries.entries
);
assert_eq!(5, annotation_diff_entries.entries.len());
assert_eq!(
annotation_diff_entries.entries[0].status,
DiffEntryStatus::Added.to_string()
);
assert_eq!(
annotation_diff_entries.entries[1].status,
DiffEntryStatus::Modified.to_string()
);
assert_eq!(
annotation_diff_entries.entries[2].status,
DiffEntryStatus::Added.to_string()
);
assert_eq!(
annotation_diff_entries.entries[3].status,
DiffEntryStatus::Removed.to_string()
);
assert_eq!(
annotation_diff_entries.entries[4].status,
DiffEntryStatus::Added.to_string()
);
Ok(())
})
.await
}
/// Removing one tabular file must be reflected in the aggregate counts
/// (0 added, 1 removed, 0 modified) while the listing still has three entries,
/// the first of which is `Modified`.
#[tokio::test]
async fn test_diff_entries_remove_one_tabular_in_dir() -> Result<(), OxenError> {
    test::run_bounding_box_csv_repo_test_fully_committed_async(|repo| async move {
        let bbox_rel_path = Path::new("annotations")
            .join("train")
            .join("bounding_box.csv");
        let bbox_abs_path = repo.path.join(&bbox_rel_path);
        let base_commit = repositories::commits::head_commit(&repo)?;

        util::fs::remove_file(bbox_abs_path)?;
        let rm_opts = RmOpts::from_path(&bbox_rel_path);
        repositories::rm(&repo, &rm_opts)?;
        let head_commit = repositories::commit(&repo, "Removing a the training data file")?;

        let diff = repositories::diffs::list_diff_entries(
            &repo,
            &base_commit,
            &head_commit,
            PathBuf::from(""),
            PathBuf::from(""),
            0,
            10,
        )
        .await?;

        // Aggregate counts reported alongside the listing.
        println!("counts: {:?}", diff.counts);
        assert_eq!(0, diff.counts.added);
        assert_eq!(1, diff.counts.removed);
        assert_eq!(0, diff.counts.modified);

        let entries = diff.entries;
        for (idx, entry) in entries.iter().enumerate() {
            println!("entry {}: {:?}", idx, entry);
        }
        assert_eq!(3, entries.len());
        assert_eq!(entries[0].status, DiffEntryStatus::Modified.to_string());
        Ok(())
    })
    .await
}
/// Diffing two text files that differ only in their last line must produce
/// two unchanged lines followed by one removed and one added line.
#[tokio::test]
async fn test_diff_txt_files() -> Result<(), OxenError> {
    test::run_empty_dir_test_async(|dir| async move {
        let file1 = dir.join("file1.txt");
        let file2 = dir.join("file2.txt");
        println!("file1: {file1:?}");
        println!("file2: {file2:?}");
        util::fs::write_to_path(&file1, "hello\nhi\nhow are you?")?;
        util::fs::write_to_path(&file2, "hello\nhi\nhow are you doing?")?;

        // Plain file-to-file diff: no revisions, keys or targets.
        let opts = DiffOpts {
            repo_dir: Some(dir.canonicalize()?),
            path_1: file1,
            path_2: Some(file2),
            keys: vec![],
            targets: vec![],
            revision_1: None,
            revision_2: None,
            output: None,
            page: 0,
            page_size: 10,
        };
        println!("!!!!!!!!!");
        let results = repositories::diffs::diff(opts).await?;

        match results.first() {
            Some(DiffResult::Text(text_diff)) => {
                let lines = &text_diff.lines;
                println!("!!!!");
                for line in lines {
                    println!("!!!!");
                    println!("{line:?}");
                }

                let expected = [
                    (ChangeType::Unchanged, "hello"),
                    (ChangeType::Unchanged, "hi"),
                    (ChangeType::Removed, "how are you?"),
                    (ChangeType::Added, "how are you doing?"),
                ];
                assert_eq!(lines.len(), 4);
                for (line, (modification, text)) in lines.iter().zip(expected.iter()) {
                    assert_eq!(&line.modification, modification);
                    assert_eq!(&line.text, *text);
                }
            }
            _ => panic!("expected text result"),
        }
        Ok(())
    })
    .await
}
/// With no keys and no targets, a row that differs in a single value must
/// show up as one removed row plus one added row.
#[tokio::test]
async fn test_compare_one_added_one_removed_no_keys_no_targets() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|repo| async move {
        let left_csv = "a,b,c\n1,2,3\n4,5,6\n";
        let right_csv = "a,b,c\n1,2,3\n4,5,2\n";
        let left_path = PathBuf::from("file1.csv");
        let right_path = PathBuf::from("file2.csv");
        tokio::fs::write(repo.path.join(&left_path), left_csv).await?;
        tokio::fs::write(repo.path.join(&right_path), right_csv).await?;
        repositories::add(&repo, repo.path.clone()).await?;
        let commit = repositories::commit(&repo, "two files")?;

        // Both sides come from the same commit; only the paths differ.
        let left = CommitPath {
            commit: Some(commit.clone()),
            path: left_path.clone(),
        };
        let right = CommitPath {
            commit: Some(commit.clone()),
            path: right_path.clone(),
        };
        let compare_result =
            repositories::diffs::diff_commits(&repo, left, right, vec![], vec![], vec![]).await?;

        let diff_col = DIFF_STATUS_COL;
        match compare_result {
            DiffResult::Tabular(result) => {
                let df = result.contents;
                assert_eq!(df.height(), 2);
                assert_eq!(df.width(), 4);

                // Number of diff rows carrying the given status.
                let rows_with_status = |status: &str| -> Result<usize, OxenError> {
                    let matching = df
                        .clone()
                        .lazy()
                        .filter(col(diff_col).eq(lit(status)))
                        .collect()?;
                    Ok(matching.height())
                };
                let added_rows = rows_with_status("added")?;
                let removed_rows = rows_with_status("removed")?;
                assert_eq!(added_rows, 1);
                assert_eq!(removed_rows, 1);
            }
            _ => panic!("expected tabular result"),
        }
        Ok(())
    })
    .await
}
/// Keyed on `a`,`b` with target `c`: expects one added, two removed and one
/// modified row (the row whose target value changed).
#[tokio::test]
async fn test_compare_all_types_with_keys_and_targets() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|repo| async move {
        let left_csv = "a,b,c\n1,2,1\n3,4,1\n5,6,1\n7,8,1";
        let right_csv = "a,b,c\n1,2,1\n3,4,1234\n9,10,1";
        let left_path = PathBuf::from("file1.csv");
        let right_path = PathBuf::from("file2.csv");
        tokio::fs::write(repo.path.join(&left_path), left_csv).await?;
        tokio::fs::write(repo.path.join(&right_path), right_csv).await?;
        repositories::add(&repo, repo.path.clone()).await?;
        let commit = repositories::commit(&repo, "two files")?;

        let left = CommitPath {
            commit: Some(commit.clone()),
            path: left_path.clone(),
        };
        let right = CommitPath {
            commit: Some(commit.clone()),
            path: right_path.clone(),
        };
        let keys = vec!["a".to_string(), "b".to_string()];
        let targets = vec!["c".to_string()];
        let compare_result =
            repositories::diffs::diff_commits(&repo, left, right, keys, targets, vec![]).await?;

        let diff_col = DIFF_STATUS_COL;
        match compare_result {
            DiffResult::Tabular(result) => {
                let df = result.contents;
                assert_eq!(df.height(), 4);
                assert_eq!(df.width(), 5);

                // Number of diff rows carrying the given status.
                let rows_with_status = |status: &str| -> Result<usize, OxenError> {
                    let matching = df
                        .clone()
                        .lazy()
                        .filter(col(diff_col).eq(lit(status)))
                        .collect()?;
                    Ok(matching.height())
                };
                let added_rows = rows_with_status("added")?;
                let removed_rows = rows_with_status("removed")?;
                let modified_rows = rows_with_status("modified")?;
                assert_eq!(added_rows, 1);
                assert_eq!(removed_rows, 2);
                assert_eq!(modified_rows, 1);
            }
            _ => panic!("expected tabular result"),
        }
        Ok(())
    })
    .await
}
/// Keyed on `a`,`b` with target `c`, where no shared key changes its target:
/// expects one added and two removed rows, and no modified rows.
#[tokio::test]
async fn test_compare_all_types_with_keys_and_same_targets() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|repo| async move {
        let left_csv = "a,b,c\n1,2,1\n3,4,1\n5,6,1\n7,8,1";
        let right_csv = "a,b,c\n1,2,1\n3,4,1\n9,10,1";
        let left_path = PathBuf::from("file1.csv");
        let right_path = PathBuf::from("file2.csv");
        tokio::fs::write(repo.path.join(&left_path), left_csv).await?;
        tokio::fs::write(repo.path.join(&right_path), right_csv).await?;
        repositories::add(&repo, repo.path.clone()).await?;
        let commit = repositories::commit(&repo, "two files")?;

        let left = CommitPath {
            commit: Some(commit.clone()),
            path: left_path.clone(),
        };
        let right = CommitPath {
            commit: Some(commit.clone()),
            path: right_path.clone(),
        };
        let keys = vec!["a".to_string(), "b".to_string()];
        let targets = vec!["c".to_string()];
        let compare_result =
            repositories::diffs::diff_commits(&repo, left, right, keys, targets, vec![]).await?;

        let diff_col = DIFF_STATUS_COL;
        match compare_result {
            DiffResult::Tabular(result) => {
                let df = result.contents;
                assert_eq!(df.height(), 3);
                assert_eq!(df.width(), 5);

                // Number of diff rows carrying the given status.
                let rows_with_status = |status: &str| -> Result<usize, OxenError> {
                    let matching = df
                        .clone()
                        .lazy()
                        .filter(col(diff_col).eq(lit(status)))
                        .collect()?;
                    Ok(matching.height())
                };
                let added_rows = rows_with_status("added")?;
                let removed_rows = rows_with_status("removed")?;
                let modified_rows = rows_with_status("modified")?;
                assert_eq!(added_rows, 1);
                assert_eq!(removed_rows, 2);
                assert_eq!(modified_rows, 0);
            }
            _ => panic!("expected tabular result"),
        }
        Ok(())
    })
    .await
}
/// Comparing two identical CSVs with keys and targets must give an empty diff
/// that still has the full seven-column diff schema.
///
/// The test is a no-op on Windows.
#[tokio::test]
async fn test_compare_same_files_with_targets() -> Result<(), OxenError> {
    if std::env::consts::OS == "windows" {
        return Ok(());
    }
    test::run_empty_local_repo_test_async(|repo| async move {
        let left_csv = "a,b,c,d\n1,2,3,4\n4,5,6,7\n";
        let right_csv = "a,b,c,d\n1,2,3,4\n4,5,6,7\n";
        let left_path = PathBuf::from("file1.csv");
        let right_path = PathBuf::from("file2.csv");
        tokio::fs::write(repo.path.join(&left_path), left_csv).await?;
        tokio::fs::write(repo.path.join(&right_path), right_csv).await?;
        repositories::add(&repo, repo.path.clone()).await?;
        let commit = repositories::commit(&repo, "two files")?;

        let left = CommitPath {
            commit: Some(commit.clone()),
            path: left_path.clone(),
        };
        let right = CommitPath {
            commit: Some(commit.clone()),
            path: right_path.clone(),
        };
        let keys = vec!["a".to_string(), "b".to_string()];
        let targets = vec!["c".to_string(), "d".to_string()];
        let compare_result =
            repositories::diffs::diff_commits(&repo, left, right, keys, targets, vec![]).await?;

        let diff_col = DIFF_STATUS_COL;
        match compare_result {
            DiffResult::Tabular(result) => {
                let df = result.contents;
                assert_eq!(df.height(), 0);
                assert_eq!(df.width(), 7);

                // Number of diff rows carrying the given status.
                let rows_with_status = |status: &str| -> Result<usize, OxenError> {
                    let matching = df
                        .clone()
                        .lazy()
                        .filter(col(diff_col).eq(lit(status)))
                        .collect()?;
                    Ok(matching.height())
                };
                let added_rows = rows_with_status("added")?;
                let removed_rows = rows_with_status("removed")?;
                let modified_rows = rows_with_status("modified")?;
                assert_eq!(added_rows, 0);
                assert_eq!(removed_rows, 0);
                assert_eq!(modified_rows, 0);
            }
            _ => panic!("expected tabular result"),
        }
        Ok(())
    })
    .await
}
/// With no keys or targets and an extra column `e` on the right side, expects
/// four diff rows: one added, one removed and two modified.
#[tokio::test]
async fn compare_no_keys_no_targets_added_column() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|repo| async move {
        let left_csv = "a,b,c,d\n1,2,3,4\n4,5,6,7\n8,7,6,5";
        let right_csv = "a,b,c,d,e\n1,2,3,4,5\n4,5,6,7,8\n9,8,7,6,5";
        let left_path = PathBuf::from("file1.csv");
        let right_path = PathBuf::from("file2.csv");
        tokio::fs::write(repo.path.join(&left_path), left_csv).await?;
        tokio::fs::write(repo.path.join(&right_path), right_csv).await?;
        repositories::add(&repo, repo.path.clone()).await?;
        let commit = repositories::commit(&repo, "two files")?;

        let left = CommitPath {
            commit: Some(commit.clone()),
            path: left_path.clone(),
        };
        let right = CommitPath {
            commit: Some(commit.clone()),
            path: right_path.clone(),
        };
        let compare_result =
            repositories::diffs::diff_commits(&repo, left, right, vec![], vec![], vec![]).await?;

        let diff_col = DIFF_STATUS_COL;
        match compare_result {
            DiffResult::Tabular(result) => {
                let df = result.contents;
                assert_eq!(df.height(), 4);

                // Number of diff rows carrying the given status.
                let rows_with_status = |status: &str| -> Result<usize, OxenError> {
                    let matching = df
                        .clone()
                        .lazy()
                        .filter(col(diff_col).eq(lit(status)))
                        .collect()?;
                    Ok(matching.height())
                };
                let added_rows = rows_with_status("added")?;
                let removed_rows = rows_with_status("removed")?;
                let modified_rows = rows_with_status("modified")?;
                assert_eq!(added_rows, 1);
                assert_eq!(removed_rows, 1);
                assert_eq!(modified_rows, 2);
            }
            _ => panic!("expected tabular result"),
        }
        Ok(())
    })
    .await
}
/// Keys `a`,`b`,`c` with no explicit targets: a change in the remaining column
/// `d` for a shared key must count as `modified`, alongside one added and one
/// removed row.
#[tokio::test]
async fn test_compare_keys_no_targets_implies_modified() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|repo| async move {
        let left_csv = "a,b,c,d\n1,2,3,4\n4,5,6,7\n0,0,0,0\n";
        let right_csv = "a,b,c,d\n1,2,3,4\n4,5,6,8\n1,1,1,1\n";
        let left_path = PathBuf::from("file1.csv");
        let right_path = PathBuf::from("file2.csv");
        tokio::fs::write(repo.path.join(&left_path), left_csv).await?;
        tokio::fs::write(repo.path.join(&right_path), right_csv).await?;
        repositories::add(&repo, repo.path.clone()).await?;
        let commit = repositories::commit(&repo, "two files")?;

        let left = CommitPath {
            commit: Some(commit.clone()),
            path: left_path.clone(),
        };
        let right = CommitPath {
            commit: Some(commit.clone()),
            path: right_path.clone(),
        };
        let keys = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        let compare_result =
            repositories::diffs::diff_commits(&repo, left, right, keys, vec![], vec![]).await?;

        let diff_col = DIFF_STATUS_COL;
        match compare_result {
            DiffResult::Tabular(result) => {
                let df = result.contents;
                assert_eq!(df.height(), 3);

                // Number of diff rows carrying the given status.
                let rows_with_status = |status: &str| -> Result<usize, OxenError> {
                    let matching = df
                        .clone()
                        .lazy()
                        .filter(col(diff_col).eq(lit(status)))
                        .collect()?;
                    Ok(matching.height())
                };
                let added_rows = rows_with_status("added")?;
                let removed_rows = rows_with_status("removed")?;
                let modified_rows = rows_with_status("modified")?;
                assert_eq!(added_rows, 1);
                assert_eq!(removed_rows, 1);
                assert_eq!(modified_rows, 1);
            }
            _ => panic!("expected tabular result"),
        }
        Ok(())
    })
    .await
}
}