use std::io::BufWriter;
use std::str;
use polars::prelude::*;
use serde::{Deserialize, Serialize};
use std::io::Cursor;
use utoipa::ToSchema;
use super::StatusMessage;
use super::data_frames::DataFrameColumnChange;
use super::data_frames::DataFrameRowChange;
use crate::constants;
use crate::core::df::tabular;
use crate::error::OxenError;
use crate::model::Commit;
use crate::model::DataFrameSize;
use crate::model::data_frame::DataFrameSchemaSize;
use crate::opts::df_opts::DFOptsView;
use crate::view::Pagination;
use crate::view::entries::ResourceVersion;
use crate::{model::Schema, opts::DFOpts};
/// JSON-serializable view of a data frame: its schema, dimensions, row data,
/// pagination info, and the options that were used to produce it.
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
pub struct JsonDataFrameView {
/// Schema describing the columns of `data`.
pub schema: Schema,
/// Reported dimensions (height and width) of the data frame.
pub size: DataFrameSize,
/// Row data as a JSON value; `to_df` treats an array as rows and `null` as an empty frame.
pub data: serde_json::Value,
/// Page number, page size, and totals for this view.
pub pagination: Pagination,
/// View of the `DFOpts` used to build this view; serialized inline (flattened) into this object.
#[serde(flatten)]
pub opts: DFOptsView,
}
/// Pairs a summary of the source data frame with the rendered view of it.
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
pub struct JsonDataFrameViews {
/// Schema/size summary built from the input data frame and its original schema.
pub source: DataFrameSchemaSize,
/// The view produced from the data frame after applying the options.
pub view: JsonDataFrameView,
}
/// Response body carrying a data frame view together with optional commit and
/// resource information.
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
pub struct JsonDataFrameViewResponse {
/// Status fields; serialized inline (flattened) into the response object.
#[serde(flatten)]
pub status: StatusMessage,
/// Source summary and rendered view of the data frame.
pub data_frame: JsonDataFrameViews,
/// Commit attached to this response, if any.
pub commit: Option<Commit>,
/// Resource version attached to this response, if any.
pub resource: Option<ResourceVersion>,
/// Derived resource (compare / diff / query) attached to this response, if any.
pub derived_resource: Option<DerivedDFResource>,
}
/// Workspace variant of the data frame view response: the data frame itself is
/// optional and an `is_indexed` flag is included.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WorkspaceJsonDataFrameViewResponse {
/// Status fields; serialized inline (flattened) into the response object.
#[serde(flatten)]
pub status: StatusMessage,
/// Source summary and rendered view of the data frame, when one is available.
pub data_frame: Option<JsonDataFrameViews>,
/// Commit attached to this response, if any.
pub commit: Option<Commit>,
/// Resource version attached to this response, if any.
pub resource: Option<ResourceVersion>,
/// Derived resource (compare / diff / query) attached to this response, if any.
pub derived_resource: Option<DerivedDFResource>,
/// NOTE(review): presumably whether the data frame has been indexed in the
/// workspace — confirm against callers. `empty()` sets this to `false`.
pub is_indexed: bool,
}
/// Response body for a row-level operation: the data frame view plus optional
/// row changes and row identification.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct JsonDataFrameRowResponse {
/// Status fields; serialized inline (flattened) into the response object.
#[serde(flatten)]
pub status: StatusMessage,
/// Row changes included with this response, if any.
pub diff: Option<Vec<DataFrameRowChange>>,
/// Source summary and rendered view of the data frame.
pub data_frame: JsonDataFrameViews,
/// Commit attached to this response, if any.
pub commit: Option<Commit>,
/// Resource version attached to this response, if any.
pub resource: Option<ResourceVersion>,
/// Derived resource (compare / diff / query) attached to this response, if any.
pub derived_resource: Option<DerivedDFResource>,
/// Identifier of the row this response refers to, if any.
/// NOTE(review): the id format is not visible in this file.
pub row_id: Option<String>,
/// Index of the row this response refers to, if any.
pub row_index: Option<usize>,
}
/// Response body for a batch update: an overall status plus one result per row.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct VecBatchUpdateResponse {
/// Status fields; serialized inline (flattened) into the response object.
#[serde(flatten)]
pub status: StatusMessage,
/// Per-row results of the batch update.
pub rows: Vec<BatchUpdateResponse>,
}
/// Result of updating a single row as part of a batch.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct BatchUpdateResponse {
/// Identifier of the row this result belongs to.
pub row_id: String,
/// Numeric result code for this row.
/// NOTE(review): presumably an HTTP-style status code — confirm against the producer.
pub code: i32,
/// Error message for this row, if any.
pub error: Option<String>,
}
/// Response body for a column-level operation: the data frame view plus
/// optional column changes.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct JsonDataFrameColumnResponse {
/// Status fields; serialized inline (flattened) into the response object.
#[serde(flatten)]
pub status: StatusMessage,
/// Column changes included with this response, if any.
pub diff: Option<Vec<DataFrameColumnChange>>,
/// Source summary and rendered view of the data frame.
pub data_frame: JsonDataFrameViews,
/// Commit attached to this response, if any.
pub commit: Option<Commit>,
/// Resource version attached to this response, if any.
pub resource: Option<ResourceVersion>,
/// Derived resource (compare / diff / query) attached to this response, if any.
pub derived_resource: Option<DerivedDFResource>,
}
/// Kind of derived data frame resource.
///
/// Variants are serialized in snake case: `compare`, `diff`, `query`.
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum DFResourceType {
/// Serialized as `compare`.
Compare,
/// Serialized as `diff`.
Diff,
/// Serialized as `query`.
Query,
}
/// Describes a data frame resource derived from another one (see `DFResourceType`).
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
pub struct DerivedDFResource {
/// Identifier of the derived resource.
pub resource_id: String,
/// Path of the derived resource.
/// NOTE(review): whether this is a filesystem path or a URL path is not visible here.
pub path: String,
/// Which kind of derived resource this is.
pub resource_type: DFResourceType,
}
impl WorkspaceJsonDataFrameViewResponse {
    /// Builds a "resource found" response that wraps an empty data frame view.
    ///
    /// No commit, resource, or derived resource is attached, and `is_indexed`
    /// is `false`.
    pub async fn empty() -> WorkspaceJsonDataFrameViewResponse {
        let status = StatusMessage::resource_found();
        let empty_views = JsonDataFrameViews::empty().await;
        Self {
            status,
            data_frame: Some(empty_views),
            commit: None,
            resource: None,
            derived_resource: None,
            is_indexed: false,
        }
    }
}
impl JsonDataFrameViews {
    /// Builds the views for an empty data frame, using an empty schema and
    /// empty options.
    pub async fn empty() -> JsonDataFrameViews {
        let df = DataFrame::empty();
        let schema = Schema::empty();
        let opts = DFOpts::empty();
        Self::from_df_and_opts(df, schema, &opts).await
    }
}
impl JsonDataFrameView {
pub fn empty() -> JsonDataFrameView {
JsonDataFrameView::empty_with_schema(&Schema::empty(), 0, &DFOpts::empty())
}
pub async fn from_df_opts(
df: DataFrame,
og_schema: Schema,
opts: &DFOpts,
) -> JsonDataFrameView {
let full_width = df.width();
let full_height = df.height();
let page_size = opts.page_size.unwrap_or(constants::DEFAULT_PAGE_SIZE);
let page = opts.page.unwrap_or(constants::DEFAULT_PAGE_NUM);
let start = if page == 0 { 0 } else { page_size * (page - 1) };
let end = page_size * page;
let total_pages = (full_height as f64 / page_size as f64).ceil() as usize;
let mut opts = opts.clone();
if df.height() == 0 {
return JsonDataFrameView::empty_with_schema(&og_schema, full_height, &opts);
};
opts.slice = Some(format!("{start}..{end}"));
let opts_view = DFOptsView::from_df_opts(&opts);
let mut sliced_df = tabular::transform(df, opts).await.unwrap();
let mut slice_schema = Schema::from_polars(sliced_df.schema());
slice_schema.update_metadata_from_schema(&og_schema);
JsonDataFrameView {
schema: slice_schema,
size: DataFrameSize {
height: full_height,
width: full_width,
},
data: JsonDataFrameView::json_from_df(&mut sliced_df),
pagination: Pagination {
page_number: page,
page_size,
total_pages,
total_entries: full_height,
},
opts: opts_view,
}
}
pub async fn from_df_opts_unpaginated(
df: DataFrame,
og_schema: Schema,
og_height: usize,
opts: &DFOpts,
) -> JsonDataFrameView {
let full_width = df.width();
let view_height = df.height();
let mut opts = opts.clone();
opts.slice = None;
let opts_view = DFOptsView::from_df_opts(&opts);
let mut sliced_df = tabular::transform(df, opts.clone()).await.unwrap();
let mut slice_schema = Schema::from_polars(sliced_df.schema());
log::debug!("OG schema {og_schema:?}");
log::debug!("Pre-Slice schema {slice_schema:?}");
slice_schema.update_metadata_from_schema(&og_schema);
log::debug!("Slice schema {slice_schema:?}");
let page_size = opts.page_size.unwrap_or(constants::DEFAULT_PAGE_SIZE);
let page_number = opts.page.unwrap_or(constants::DEFAULT_PAGE_NUM);
let total_pages = (og_height as f64 / page_size as f64).ceil() as usize;
JsonDataFrameView {
schema: slice_schema,
size: DataFrameSize {
height: view_height,
width: full_width,
},
data: JsonDataFrameView::json_from_df(&mut sliced_df),
pagination: Pagination {
page_number,
page_size,
total_pages,
total_entries: og_height,
},
opts: opts_view,
}
}
pub async fn to_df(&self) -> DataFrame {
if self.data == serde_json::Value::Null {
DataFrame::empty()
} else {
let columns = self.schema.fields_names();
log::debug!("Got columns: {columns:?}");
match &self.data {
serde_json::Value::Array(arr) => {
if !arr.is_empty() {
let data = self.data.to_string();
let content = Cursor::new(data.as_bytes());
let df = JsonReader::new(content).finish().unwrap();
let opts = DFOpts::from_column_names(columns);
tabular::transform(df, opts).await.unwrap()
} else {
let cols = columns
.iter()
.map(|name| {
Column::Series(
Series::new(PlSmallStr::from_str(name), Vec::<&str>::new())
.into(),
)
})
.collect::<Vec<Column>>();
DataFrame::new(cols).unwrap()
}
}
_ => {
log::error!("Could not parse non-array json data: {:?}", self.data);
DataFrame::empty()
}
}
}
}
pub fn json_from_df(df: &mut DataFrame) -> serde_json::Value {
log::debug!("Serializing df: [{df}]");
sanitize_df_for_serialization(df).expect("Error cleaning df before serialization");
let data: Vec<u8> = Vec::new();
let mut buf = BufWriter::new(data);
let mut writer = JsonWriter::new(&mut buf).with_json_format(JsonFormat::Json);
writer.finish(df).expect("Could not write df json buffer");
let buffer = buf.into_inner().expect("Could not get buffer");
let json_str = str::from_utf8(&buffer).unwrap();
serde_json::from_str(json_str).unwrap()
}
fn empty_with_schema(
schema: &Schema,
total_entries: usize,
opts: &DFOpts,
) -> JsonDataFrameView {
let mut default_df = DataFrame::empty();
JsonDataFrameView {
schema: schema.to_owned(),
size: DataFrameSize {
height: 0,
width: schema.fields_names().len(),
},
data: JsonDataFrameView::json_from_df(&mut default_df),
pagination: Pagination {
page_number: 0,
page_size: 0,
total_pages: 0,
total_entries,
},
opts: DFOptsView::from_df_opts(opts),
}
}
}
impl JsonDataFrameViews {
    /// Builds the source summary and a paginated view for `df`.
    ///
    /// The summary is taken from `df` and `og_schema` before the frame is
    /// handed to `JsonDataFrameView::from_df_opts`.
    pub async fn from_df_and_opts(
        df: DataFrame,
        og_schema: Schema,
        opts: &DFOpts,
    ) -> JsonDataFrameViews {
        let source_summary = DataFrameSchemaSize::from_df(&df, &og_schema);
        let paginated = JsonDataFrameView::from_df_opts(df, og_schema, opts).await;
        Self {
            source: source_summary,
            view: paginated,
        }
    }

    /// Builds the source summary and an unpaginated view for `df`.
    ///
    /// The summary is taken from `df` and `og_schema` before the frame is
    /// handed to `JsonDataFrameView::from_df_opts_unpaginated`, which uses
    /// `og_height` for the pagination totals.
    pub async fn from_df_and_opts_unpaginated(
        df: DataFrame,
        og_schema: Schema,
        og_height: usize,
        opts: &DFOpts,
    ) -> JsonDataFrameViews {
        let source_summary = DataFrameSchemaSize::from_df(&df, &og_schema);
        let unpaginated =
            JsonDataFrameView::from_df_opts_unpaginated(df, og_schema, og_height, opts).await;
        Self {
            source: source_summary,
            view: unpaginated,
        }
    }
}
fn sanitize_df_for_serialization(df: &mut DataFrame) -> Result<(), OxenError> {
let schema = df.schema();
let mut updates: Vec<(usize, Column)> = Vec::new();
for (idx, _field) in schema.iter_fields().enumerate() {
let series = df.select_at_idx(idx).unwrap();
if let Some(new_series) = match series.dtype() {
DataType::Binary | DataType::BinaryOffset => {
Some(cast_binary_to_string_with_fallback(series, "[binary]"))
}
DataType::Struct(subfields) => {
if has_binary_in_struct(subfields) {
Some(cast_binary_to_string_with_fallback(
series,
&format!("struct[{}]", subfields.len()),
))
} else {
None
}
}
DataType::List(subtype) => match **subtype {
DataType::Binary | DataType::BinaryOffset => Some(
cast_binary_to_string_with_fallback(series, "[list[binary]]"),
),
DataType::Struct(ref subfields) => {
if has_binary_in_struct(subfields) {
Some(cast_binary_to_string_with_fallback(
series,
&format!("[list[struct[{}]]]", subfields.len()),
))
} else {
None
}
}
_ => None,
},
_ => None,
} {
updates.push((idx, new_series));
}
}
for (idx, new_series) in updates {
df.replace_column(idx, new_series)?;
}
Ok(())
}
fn has_binary_in_struct(subfields: &[Field]) -> bool {
subfields.iter().any(|field| {
matches!(field.dtype(), DataType::Binary | DataType::BinaryOffset)
|| field.dtype().to_string().contains("Binary")
})
}
/// Casts `series` to a string column, or — if the cast fails — returns a
/// column of the same name and length where every value is the placeholder
/// `out`.
fn cast_binary_to_string_with_fallback(series: &Column, out: &str) -> Column {
    match series.cast(&DataType::String) {
        Ok(as_string) => as_string,
        Err(_) => {
            let placeholders = vec![out; series.len()];
            Column::new(series.name().clone(), placeholders)
        }
    }
}