use polars::prelude::DataFrame;
use polars::{lazy::dsl::Expr, prelude::*};
use serde::{Deserialize, Serialize};
use crate::api;
use crate::model::{CommitEntry, LocalRepository, Schema};
use crate::opts::DFOpts;
use crate::view::{JsonDataFrame, JsonDataFrameView};
use super::tabular_diff_summary::{TabularDiffSummary, TabularDiffSummaryImpl, TabularDiffWrapper};
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct TabularDiff {
pub tabular: TabularDiffImpl,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct TabularDiffImpl {
#[serde(flatten)]
pub summary: TabularDiffSummary,
pub base_schema: Option<Schema>,
pub head_schema: Option<Schema>,
pub added_rows: Option<JsonDataFrame>,
pub added_rows_view: Option<JsonDataFrameView>,
pub removed_rows: Option<JsonDataFrame>,
pub removed_rows_view: Option<JsonDataFrameView>,
pub added_cols: Option<JsonDataFrame>,
pub added_cols_view: Option<JsonDataFrameView>,
pub removed_cols: Option<JsonDataFrame>,
pub removed_cols_view: Option<JsonDataFrameView>,
}
impl TabularDiff {
pub fn from_commit_entries(
repo: &LocalRepository,
base_entry: &Option<CommitEntry>,
head_entry: &Option<CommitEntry>,
df_opts: DFOpts,
) -> TabularDiff {
let base_df = TabularDiffWrapper::maybe_get_df(repo, base_entry);
let head_df = TabularDiffWrapper::maybe_get_df(repo, head_entry);
let schema_has_changed = TabularDiffWrapper::schema_has_changed(&base_df, &head_df);
let base_schema = TabularDiff::maybe_get_schema(&base_df);
let head_schema = TabularDiff::maybe_get_schema(&head_df);
if base_schema.is_some() && head_schema.is_some() {
let base_schema = base_schema.clone().unwrap();
let head_schema = head_schema.clone().unwrap();
let base_df = base_df.unwrap();
let head_df = head_df.unwrap();
if schema_has_changed {
let df_diff = api::local::diff::compute_new_columns_from_dfs(
base_df.clone(),
head_df.clone(),
&base_schema,
&head_schema,
)
.unwrap();
let added_cols = df_diff
.added_cols
.clone()
.map(|df| JsonDataFrame::from_df_opts(df, df_opts.clone()));
let added_cols_view = df_diff
.added_cols
.map(|df| JsonDataFrameView::from_df_opts(df, head_schema.clone(), &df_opts));
let removed_cols = df_diff
.removed_cols
.clone()
.map(|df| JsonDataFrame::from_df_opts(df, df_opts.clone()));
let removed_cols_view = df_diff
.removed_cols
.map(|df| JsonDataFrameView::from_df_opts(df, head_schema.clone(), &df_opts));
let common_fields = base_schema.common_fields(&head_schema);
let column_names = common_fields
.iter()
.map(|field| field.name.clone())
.collect::<Vec<String>>();
log::debug!("COMMON FIELDS: {:?}", column_names);
let (added_rows_view, removed_rows_view) = if column_names.is_empty() {
(None, None)
} else {
let cols = column_names.iter().map(|c| col(c)).collect::<Vec<Expr>>();
let common_base_df = base_df.clone().lazy().select(&cols).collect().unwrap();
let common_head_df = head_df.clone().lazy().select(&cols).collect().unwrap();
log::debug!("common_base_df: {:?}", common_base_df);
log::debug!("common_head_df: {:?}", common_head_df);
let df_diff = api::local::diff::compute_new_rows_proj(
&common_base_df,
&common_head_df,
&base_df,
&head_df,
&base_schema,
&head_schema,
)
.unwrap();
log::debug!("ADDED ROWS: {:?}", df_diff.added_rows);
log::debug!("REMOVED ROWS: {:?}", df_diff.removed_rows);
let added_rows_view = df_diff.added_rows.map(|df| {
JsonDataFrameView::from_df_opts(df, base_schema.clone(), &df_opts)
});
let removed_rows_view = df_diff.removed_rows.map(|df| {
JsonDataFrameView::from_df_opts(df, base_schema.clone(), &df_opts)
});
(added_rows_view, removed_rows_view)
};
let summary = TabularDiffSummary {
summary: TabularDiffSummaryImpl {
num_added_rows: added_rows_view
.as_ref()
.map(|df| df.size.height)
.unwrap_or(0),
num_added_cols: added_cols
.as_ref()
.map(|df| df.full_size.width)
.unwrap_or(0),
num_removed_rows: removed_rows_view
.as_ref()
.map(|df| df.size.height)
.unwrap_or(0),
num_removed_cols: removed_cols
.as_ref()
.map(|df| df.full_size.width)
.unwrap_or(0),
schema_has_changed,
},
};
log::debug!("summary: {:?}", summary);
return TabularDiff {
tabular: TabularDiffImpl {
summary,
base_schema: Some(base_schema),
head_schema: Some(head_schema),
added_rows: None,
added_rows_view,
removed_rows: None,
removed_rows_view,
added_cols,
added_cols_view,
removed_cols,
removed_cols_view,
},
};
} else {
let df_diff =
api::local::diff::compute_new_rows(&base_df, &head_df, &base_schema).unwrap();
let added_rows = df_diff
.added_rows
.clone()
.map(|df| JsonDataFrame::from_df_opts(df, df_opts.clone()));
let added_rows_view = df_diff
.added_rows
.map(|df| JsonDataFrameView::from_df_opts(df, base_schema.clone(), &df_opts));
let removed_rows = df_diff
.removed_rows
.clone()
.map(|df| JsonDataFrame::from_df_opts(df, df_opts.clone()));
let removed_rows_view = df_diff
.removed_rows
.map(|df| JsonDataFrameView::from_df_opts(df, base_schema.clone(), &df_opts));
let summary = TabularDiffSummary {
summary: TabularDiffSummaryImpl {
num_added_rows: added_rows
.as_ref()
.map(|df| df.full_size.height)
.unwrap_or(0),
num_added_cols: 0,
num_removed_rows: removed_rows
.as_ref()
.map(|df| df.full_size.height)
.unwrap_or(0),
num_removed_cols: 0,
schema_has_changed,
},
};
return TabularDiff {
tabular: TabularDiffImpl {
summary,
base_schema: Some(base_schema),
head_schema: Some(head_schema),
added_rows,
added_rows_view,
removed_rows,
removed_rows_view,
added_cols: None,
added_cols_view: None,
removed_cols: None,
removed_cols_view: None,
},
};
}
}
if base_schema.is_none() && head_schema.is_some() {
let head_schema = head_schema.clone().unwrap();
let head_df = head_df.unwrap();
let added_df = Some(JsonDataFrame::from_df_opts(
head_df.clone(),
df_opts.clone(),
));
let added_df_view = Some(JsonDataFrameView::from_df_opts(
head_df,
head_schema.clone(),
&df_opts,
));
let summary = TabularDiffSummary {
summary: TabularDiffSummaryImpl {
num_added_rows: added_df.as_ref().map(|df| df.full_size.height).unwrap_or(0),
num_added_cols: added_df.as_ref().map(|df| df.full_size.width).unwrap_or(0),
num_removed_rows: 0,
num_removed_cols: 0,
schema_has_changed,
},
};
return TabularDiff {
tabular: TabularDiffImpl {
summary,
base_schema: None,
head_schema: Some(head_schema),
added_rows: added_df,
added_rows_view: added_df_view,
removed_rows: None,
removed_rows_view: None,
added_cols: None,
added_cols_view: None,
removed_cols: None,
removed_cols_view: None,
},
};
}
if base_schema.is_some() && head_schema.is_none() {
let base_schema = base_schema.clone().unwrap();
let base_df = base_df.unwrap();
let removed_df = Some(JsonDataFrame::from_df_opts(
base_df.clone(),
df_opts.clone(),
));
let removed_df_view = Some(JsonDataFrameView::from_df_opts(
base_df,
base_schema.clone(),
&df_opts,
));
let summary = TabularDiffSummary {
summary: TabularDiffSummaryImpl {
num_added_rows: 0,
num_added_cols: 0,
num_removed_rows: removed_df
.as_ref()
.map(|df| df.full_size.height)
.unwrap_or(0),
num_removed_cols: removed_df
.as_ref()
.map(|df| df.full_size.width)
.unwrap_or(0),
schema_has_changed,
},
};
return TabularDiff {
tabular: TabularDiffImpl {
summary,
base_schema: Some(base_schema),
head_schema: None,
added_rows: None,
added_rows_view: None,
removed_rows: removed_df,
removed_rows_view: removed_df_view,
added_cols: None,
added_cols_view: None,
removed_cols: None,
removed_cols_view: None,
},
};
}
let summary = TabularDiffSummary {
summary: TabularDiffSummaryImpl {
num_added_rows: 0,
num_added_cols: 0,
num_removed_rows: 0,
num_removed_cols: 0,
schema_has_changed,
},
};
TabularDiff {
tabular: TabularDiffImpl {
summary,
base_schema,
head_schema,
added_rows: None,
added_rows_view: None,
removed_rows: None,
removed_rows_view: None,
added_cols: None,
added_cols_view: None,
removed_cols: None,
removed_cols_view: None,
},
}
}
pub fn maybe_get_schema(df: &Option<DataFrame>) -> Option<Schema> {
df.as_ref().map(|df| Schema::from_polars(&df.schema()))
}
}