Skip to main content

liboxen/view/
json_data_frame_view.rs

// This is the new data frame format; deprecate JsonDataFrameSliceResponse in favor of it.
2
3use std::io::BufWriter;
4use std::str;
5
6use polars::prelude::*;
7use serde::{Deserialize, Serialize};
8use std::io::Cursor;
9use utoipa::ToSchema;
10
11use super::StatusMessage;
12use super::data_frames::DataFrameColumnChange;
13use super::data_frames::DataFrameRowChange;
14use crate::constants;
15use crate::core::df::tabular;
16use crate::error::OxenError;
17use crate::model::Commit;
18use crate::model::DataFrameSize;
19use crate::model::data_frame::DataFrameSchemaSize;
20use crate::opts::df_opts::DFOptsView;
21
22use crate::view::Pagination;
23use crate::view::entries::ResourceVersion;
24use crate::{model::Schema, opts::DFOpts};
25
26#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
27pub struct JsonDataFrameView {
28    pub schema: Schema,
29    pub size: DataFrameSize,
30    pub data: serde_json::Value,
31    pub pagination: Pagination,
32    #[serde(flatten)]
33    pub opts: DFOptsView,
34}
35
36#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
37pub struct JsonDataFrameViews {
38    pub source: DataFrameSchemaSize,
39    pub view: JsonDataFrameView,
40}
41
42#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
43pub struct JsonDataFrameViewResponse {
44    #[serde(flatten)]
45    pub status: StatusMessage,
46    pub data_frame: JsonDataFrameViews,
47    pub commit: Option<Commit>,
48    pub resource: Option<ResourceVersion>,
49    pub derived_resource: Option<DerivedDFResource>,
50}
51
52#[derive(Serialize, Deserialize, Debug, Clone)]
53pub struct WorkspaceJsonDataFrameViewResponse {
54    #[serde(flatten)]
55    pub status: StatusMessage,
56    pub data_frame: Option<JsonDataFrameViews>,
57    pub commit: Option<Commit>,
58    pub resource: Option<ResourceVersion>,
59    pub derived_resource: Option<DerivedDFResource>,
60    pub is_indexed: bool,
61}
62
63#[derive(Serialize, Deserialize, Debug, Clone)]
64pub struct JsonDataFrameRowResponse {
65    #[serde(flatten)]
66    pub status: StatusMessage,
67    pub diff: Option<Vec<DataFrameRowChange>>,
68    pub data_frame: JsonDataFrameViews,
69    pub commit: Option<Commit>,
70    pub resource: Option<ResourceVersion>,
71    pub derived_resource: Option<DerivedDFResource>,
72    pub row_id: Option<String>,
73    pub row_index: Option<usize>,
74}
75
76#[derive(Serialize, Deserialize, Debug, Clone)]
77pub struct VecBatchUpdateResponse {
78    #[serde(flatten)]
79    pub status: StatusMessage,
80    pub rows: Vec<BatchUpdateResponse>,
81}
82
83#[derive(Serialize, Deserialize, Debug, Clone)]
84pub struct BatchUpdateResponse {
85    pub row_id: String,
86    pub code: i32,
87    pub error: Option<String>,
88}
89
90#[derive(Serialize, Deserialize, Debug, Clone)]
91pub struct JsonDataFrameColumnResponse {
92    #[serde(flatten)]
93    pub status: StatusMessage,
94    pub diff: Option<Vec<DataFrameColumnChange>>,
95    pub data_frame: JsonDataFrameViews,
96    pub commit: Option<Commit>,
97    pub resource: Option<ResourceVersion>,
98    pub derived_resource: Option<DerivedDFResource>,
99}
100#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
101#[serde(rename_all = "snake_case")]
102pub enum DFResourceType {
103    Compare,
104    Diff,
105    Query,
106}
107
108#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
109pub struct DerivedDFResource {
110    pub resource_id: String,
111    pub path: String,
112    pub resource_type: DFResourceType,
113}
114
115impl WorkspaceJsonDataFrameViewResponse {
116    pub async fn empty() -> WorkspaceJsonDataFrameViewResponse {
117        WorkspaceJsonDataFrameViewResponse {
118            status: StatusMessage::resource_found(),
119            data_frame: Some(JsonDataFrameViews::empty().await),
120            commit: None,
121            resource: None,
122            derived_resource: None,
123            is_indexed: false,
124        }
125    }
126}
127
128impl JsonDataFrameViews {
129    pub async fn empty() -> JsonDataFrameViews {
130        JsonDataFrameViews::from_df_and_opts(DataFrame::empty(), Schema::empty(), &DFOpts::empty())
131            .await
132    }
133}
134
135impl JsonDataFrameView {
136    pub fn empty() -> JsonDataFrameView {
137        JsonDataFrameView::empty_with_schema(&Schema::empty(), 0, &DFOpts::empty())
138    }
139
140    pub async fn from_df_opts(
141        df: DataFrame,
142        og_schema: Schema,
143        opts: &DFOpts,
144    ) -> JsonDataFrameView {
145        let full_width = df.width();
146        let full_height = df.height();
147
148        let page_size = opts.page_size.unwrap_or(constants::DEFAULT_PAGE_SIZE);
149        let page = opts.page.unwrap_or(constants::DEFAULT_PAGE_NUM);
150
151        let start = if page == 0 { 0 } else { page_size * (page - 1) };
152        let end = page_size * page;
153
154        let total_pages = (full_height as f64 / page_size as f64).ceil() as usize;
155
156        let mut opts = opts.clone();
157
158        if df.height() == 0 {
159            return JsonDataFrameView::empty_with_schema(&og_schema, full_height, &opts);
160        };
161
162        opts.slice = Some(format!("{start}..{end}"));
163        let opts_view = DFOptsView::from_df_opts(&opts);
164        let mut sliced_df = tabular::transform(df, opts).await.unwrap();
165
166        // Merge the metadata from the original schema
167        let mut slice_schema = Schema::from_polars(sliced_df.schema());
168        slice_schema.update_metadata_from_schema(&og_schema);
169
170        JsonDataFrameView {
171            schema: slice_schema,
172            size: DataFrameSize {
173                height: full_height,
174                width: full_width,
175            },
176            data: JsonDataFrameView::json_from_df(&mut sliced_df),
177            pagination: Pagination {
178                page_number: page,
179                page_size,
180                total_pages,
181                total_entries: full_height,
182            },
183            opts: opts_view,
184        }
185    }
186
187    pub async fn from_df_opts_unpaginated(
188        df: DataFrame,
189        og_schema: Schema,
190        og_height: usize,
191        opts: &DFOpts,
192    ) -> JsonDataFrameView {
193        let full_width = df.width();
194        let view_height = df.height();
195
196        // Unpaginated means we don't need to slice the df
197        let mut opts = opts.clone();
198        opts.slice = None;
199
200        let opts_view = DFOptsView::from_df_opts(&opts);
201        let mut sliced_df = tabular::transform(df, opts.clone()).await.unwrap();
202
203        // Merge the metadata from the original schema
204        let mut slice_schema = Schema::from_polars(sliced_df.schema());
205        log::debug!("OG schema {og_schema:?}");
206        log::debug!("Pre-Slice schema {slice_schema:?}");
207        slice_schema.update_metadata_from_schema(&og_schema);
208        log::debug!("Slice schema {slice_schema:?}");
209
210        let page_size = opts.page_size.unwrap_or(constants::DEFAULT_PAGE_SIZE);
211        let page_number = opts.page.unwrap_or(constants::DEFAULT_PAGE_NUM);
212
213        let total_pages = (og_height as f64 / page_size as f64).ceil() as usize;
214
215        JsonDataFrameView {
216            schema: slice_schema,
217            size: DataFrameSize {
218                height: view_height,
219                width: full_width,
220            },
221            data: JsonDataFrameView::json_from_df(&mut sliced_df),
222            pagination: Pagination {
223                page_number,
224                page_size,
225                total_pages,
226                total_entries: og_height,
227            },
228            opts: opts_view,
229        }
230    }
231
232    pub async fn to_df(&self) -> DataFrame {
233        if self.data == serde_json::Value::Null {
234            DataFrame::empty()
235        } else {
236            // The fields were coming out of order, so we need to reorder them
237            let columns = self.schema.fields_names();
238            log::debug!("Got columns: {columns:?}");
239
240            match &self.data {
241                serde_json::Value::Array(arr) => {
242                    if !arr.is_empty() {
243                        let data = self.data.to_string();
244                        let content = Cursor::new(data.as_bytes());
245                        // log::debug!("Deserializing df: [{}]", data);
246                        let df = JsonReader::new(content).finish().unwrap();
247
248                        let opts = DFOpts::from_column_names(columns);
249                        tabular::transform(df, opts).await.unwrap()
250                    } else {
251                        let cols = columns
252                            .iter()
253                            .map(|name| {
254                                Column::Series(
255                                    Series::new(PlSmallStr::from_str(name), Vec::<&str>::new())
256                                        .into(),
257                                )
258                            })
259                            .collect::<Vec<Column>>();
260                        DataFrame::new(cols).unwrap()
261                    }
262                }
263                _ => {
264                    log::error!("Could not parse non-array json data: {:?}", self.data);
265                    DataFrame::empty()
266                }
267            }
268        }
269    }
270
271    pub fn json_from_df(df: &mut DataFrame) -> serde_json::Value {
272        log::debug!("Serializing df: [{df}]");
273
274        sanitize_df_for_serialization(df).expect("Error cleaning df before serialization");
275
276        // TODO: catch errors
277        let data: Vec<u8> = Vec::new();
278        let mut buf = BufWriter::new(data);
279
280        let mut writer = JsonWriter::new(&mut buf).with_json_format(JsonFormat::Json);
281        writer.finish(df).expect("Could not write df json buffer");
282
283        let buffer = buf.into_inner().expect("Could not get buffer");
284
285        let json_str = str::from_utf8(&buffer).unwrap();
286
287        serde_json::from_str(json_str).unwrap()
288    }
289
290    fn empty_with_schema(
291        schema: &Schema,
292        total_entries: usize,
293        opts: &DFOpts,
294    ) -> JsonDataFrameView {
295        let mut default_df = DataFrame::empty();
296        JsonDataFrameView {
297            schema: schema.to_owned(),
298            size: DataFrameSize {
299                height: 0,
300                width: schema.fields_names().len(),
301            },
302            data: JsonDataFrameView::json_from_df(&mut default_df),
303            pagination: Pagination {
304                page_number: 0,
305                page_size: 0,
306                total_pages: 0,
307                total_entries,
308            },
309            opts: DFOptsView::from_df_opts(opts),
310        }
311    }
312}
313
314impl JsonDataFrameViews {
315    pub async fn from_df_and_opts(
316        df: DataFrame,
317        og_schema: Schema,
318        opts: &DFOpts,
319    ) -> JsonDataFrameViews {
320        let source = DataFrameSchemaSize::from_df(&df, &og_schema);
321        let view = JsonDataFrameView::from_df_opts(df, og_schema, opts).await;
322        JsonDataFrameViews { source, view }
323    }
324
325    // To avoid duplicate pagination when the pagination has already been applied
326    // most commonly from duckdb / other sql
327    pub async fn from_df_and_opts_unpaginated(
328        df: DataFrame,
329        og_schema: Schema,
330        og_height: usize,
331        opts: &DFOpts,
332    ) -> JsonDataFrameViews {
333        let source = DataFrameSchemaSize::from_df(&df, &og_schema);
334        let view =
335            JsonDataFrameView::from_df_opts_unpaginated(df, og_schema, og_height, opts).await;
336        JsonDataFrameViews { source, view }
337    }
338}
339
340fn sanitize_df_for_serialization(df: &mut DataFrame) -> Result<(), OxenError> {
341    let schema = df.schema();
342    let mut updates: Vec<(usize, Column)> = Vec::new();
343
344    // Collect all updates first
345    for (idx, _field) in schema.iter_fields().enumerate() {
346        let series = df.select_at_idx(idx).unwrap();
347
348        if let Some(new_series) = match series.dtype() {
349            DataType::Binary | DataType::BinaryOffset => {
350                Some(cast_binary_to_string_with_fallback(series, "[binary]"))
351            }
352            DataType::Struct(subfields) => {
353                if has_binary_in_struct(subfields) {
354                    Some(cast_binary_to_string_with_fallback(
355                        series,
356                        &format!("struct[{}]", subfields.len()),
357                    ))
358                } else {
359                    None
360                }
361            }
362            DataType::List(subtype) => match **subtype {
363                DataType::Binary | DataType::BinaryOffset => Some(
364                    cast_binary_to_string_with_fallback(series, "[list[binary]]"),
365                ),
366                DataType::Struct(ref subfields) => {
367                    if has_binary_in_struct(subfields) {
368                        Some(cast_binary_to_string_with_fallback(
369                            series,
370                            &format!("[list[struct[{}]]]", subfields.len()),
371                        ))
372                    } else {
373                        None
374                    }
375                }
376                _ => None,
377            },
378            _ => None,
379        } {
380            updates.push((idx, new_series));
381        }
382    }
383
384    // Apply updates after collecting them all
385    for (idx, new_series) in updates {
386        df.replace_column(idx, new_series)?;
387    }
388
389    Ok(())
390}
391
392fn has_binary_in_struct(subfields: &[Field]) -> bool {
393    subfields.iter().any(|field| {
394        matches!(field.dtype(), DataType::Binary | DataType::BinaryOffset)
395            || field.dtype().to_string().contains("Binary")
396    })
397}
398
399fn cast_binary_to_string_with_fallback(series: &Column, out: &str) -> Column {
400    let res = series.cast(&DataType::String);
401    if let Ok(series) = res {
402        series
403    } else {
404        let mut vec = vec![out];
405        vec.resize(series.len(), out);
406        Column::new(series.name().clone(), vec)
407    }
408}