use polars::datatypes::BooleanChunked;
use crate::core::df::tabular;
use crate::error::OxenError;
use crate::model::compare::tabular_compare::TabularCompare;
use crate::model::compare::tabular_compare_summary::TabularCompareSummary;
use crate::model::{Commit, LocalRepository, Schema};
use crate::opts::DFOpts;
use crate::view::schema::SchemaWithPath;
use crate::view::JsonDataFrame;
use crate::{api, util};
use polars::prelude::ChunkCompare;
use polars::prelude::{DataFrame, DataFrameJoinOps};
use std::path::{Path, PathBuf};
/// Compares two tabular files, each resolved at its own commit, on a set of key and target
/// columns.
///
/// Both paths are mapped to version files through
/// `api::local::diff::get_version_file_from_commit`. The version files are read with
/// `DFOpts::empty()`, narrowed to the `keys` columns followed by the `targets` columns, and
/// handed to `compute_row_comparison` together with `opts`.
///
/// # Errors
///
/// Returns an error when:
/// * the version file lookup fails for either commit,
/// * either version file is not tabular according to `util::fs::is_tabular`,
/// * either version file does not exist,
/// * either data frame cannot be read or narrowed to the required columns,
/// * either schema lacks one of the `keys` or `targets` (`OxenError::InvalidSchema`, carrying
///   the offending schema),
/// * the row comparison itself fails.
pub fn compare_files(
    repo: &LocalRepository,
    file_1: PathBuf,
    commit_1: Commit,
    file_2: PathBuf,
    commit_2: Commit,
    keys: Vec<String>,
    targets: Vec<String>,
    opts: DFOpts,
) -> Result<TabularCompare, OxenError> {
    // Resolve each (commit, path) pair to its version file.
    let left_version = api::local::diff::get_version_file_from_commit(repo, &commit_1, &file_1)?;
    let right_version = api::local::diff::get_version_file_from_commit(repo, &commit_2, &file_2)?;

    // Only tabular data can be compared row by row.
    let both_tabular = util::fs::is_tabular(&left_version) && util::fs::is_tabular(&right_version);
    if !both_tabular {
        return Err(OxenError::invalid_file_type(format!(
            "Compare not supported for non-tabular files, found {file_1:?} and {file_2:?}",
        )));
    }

    if !left_version.exists() {
        return Err(OxenError::entry_does_not_exist(left_version));
    }
    if !right_version.exists() {
        return Err(OxenError::entry_does_not_exist(right_version));
    }

    let left_df = tabular::read_df(&left_version, DFOpts::empty())?;
    let right_df = tabular::read_df(&right_version, DFOpts::empty())?;
    let left_schema = Schema::from_polars(&left_df.schema());
    let right_schema = Schema::from_polars(&right_df.schema());

    // Keys first, then targets: this is the column set both sides must provide.
    let required_fields: Vec<String> = [keys.as_slice(), targets.as_slice()].concat();
    println!("required fields are {:?}", required_fields);

    // The left schema is checked before the right one, so it is reported first.
    for schema in [left_schema, right_schema] {
        if !schema.has_field_names(&required_fields) {
            return Err(OxenError::InvalidSchema(Box::new(schema)));
        }
    }

    // Drop every column that takes no part in the comparison.
    let left_df = left_df.select(&required_fields)?;
    let right_df = right_df.select(&required_fields)?;

    let key_names: Vec<&str> = keys.iter().map(String::as_str).collect();
    let target_names: Vec<&str> = targets.iter().map(String::as_str).collect();

    compute_row_comparison(
        &left_df,
        &right_df,
        &file_1,
        &file_2,
        key_names,
        target_names,
        opts,
    )
}
fn compute_row_comparison(
df_1: &DataFrame,
df_2: &DataFrame,
path_1: &Path,
path_2: &Path,
keys: Vec<&str>,
targets: Vec<&str>,
opts: DFOpts,
) -> Result<TabularCompare, OxenError> {
let mut joined_df = df_1.outer_join(df_2, keys.clone(), keys.clone())?;
for target in targets.iter() {
let left_before = target.to_string();
let left_after = format!("{}.left", target);
let right_before = format!("{}_right", target);
let right_after = format!("{}.right", target);
joined_df.rename(&left_before, &left_after)?;
joined_df.rename(&right_before, &right_after)?;
}
println!(
"columns of df after join are {:?}",
joined_df.get_column_names()
);
let df1_unique = joined_df.filter(
&joined_df
.column(format!("{}.right", targets[0]).as_str())?
.is_null(),
)?;
let df2_unique = joined_df.filter(
&joined_df
.column(format!("{}.left", targets[0]).as_str())?
.is_null(),
)?;
let different_conditions = targets
.iter()
.map(|target| {
let left = format!("{}.left", target);
let right = format!("{}.right", target);
let left_col = joined_df.column(&left)?;
let right_col = joined_df.column(&right)?;
Ok(left_col.not_equal(right_col)?)
})
.collect::<Result<Vec<BooleanChunked>, OxenError>>()?;
let different_mask = different_conditions
.into_iter()
.reduce(|acc, mask| acc | mask)
.unwrap();
let different_targets = joined_df.filter(&different_mask)?;
let same_conditions = targets
.iter()
.map(|target| {
let left = format!("{}.left", target);
let right = format!("{}.right", target);
let left_col = joined_df.column(&left)?;
let right_col = joined_df.column(&right)?;
Ok(left_col.equal(right_col)?)
})
.collect::<Result<Vec<BooleanChunked>, OxenError>>()?;
let same_mask = same_conditions
.into_iter()
.reduce(|acc, mask| acc & mask)
.unwrap();
let mut same_targets = joined_df.filter(&same_mask)?;
for target in targets.iter() {
let left = format!("{}.left", target);
let right = format!("{}.right", target);
same_targets = same_targets.drop(&right)?;
same_targets.rename(&left, target)?;
}
println!("different targets are {:?}", different_targets);
println!("same targets are {:?}", same_targets);
println!("df1 unique are {:?}", df1_unique);
println!("df2 unique are {:?}", df2_unique);
let summary = TabularCompareSummary {
num_left_only_rows: df1_unique.height(),
num_right_only_rows: df2_unique.height(),
num_diff_rows: different_targets.height(),
num_match_rows: same_targets.height(),
};
let match_rows = JsonDataFrame::from_df_opts(same_targets, opts.clone());
let diff_rows = JsonDataFrame::from_df_opts(different_targets, opts.clone());
let schema_left = SchemaWithPath {
path: path_1.as_os_str().to_str().map(|s| s.to_owned()).unwrap(),
schema: Schema::from_polars(&df_1.schema()),
};
let schema_right = SchemaWithPath {
path: path_2.as_os_str().to_str().map(|s| s.to_owned()).unwrap(),
schema: Schema::from_polars(&df_2.schema()),
};
let tabular_compare = TabularCompare {
summary,
schema_left: Some(schema_left),
schema_right: Some(schema_right),
keys: keys
.iter()
.map(|key| key.to_string())
.collect::<Vec<String>>(),
targets: targets
.iter()
.map(|target| target.to_string())
.collect::<Vec<String>>(),
match_rows: Some(match_rows),
diff_rows: Some(diff_rows),
};
Ok(tabular_compare)
}