1use std::io::BufWriter;
4use std::str;
5
6use polars::prelude::*;
7use serde::{Deserialize, Serialize};
8use std::io::Cursor;
9use utoipa::ToSchema;
10
11use super::StatusMessage;
12use super::data_frames::DataFrameColumnChange;
13use super::data_frames::DataFrameRowChange;
14use crate::constants;
15use crate::core::df::tabular;
16use crate::error::OxenError;
17use crate::model::Commit;
18use crate::model::DataFrameSize;
19use crate::model::data_frame::DataFrameSchemaSize;
20use crate::opts::df_opts::DFOptsView;
21
22use crate::view::Pagination;
23use crate::view::entries::ResourceVersion;
24use crate::{model::Schema, opts::DFOpts};
25
26#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
27pub struct JsonDataFrameView {
28 pub schema: Schema,
29 pub size: DataFrameSize,
30 pub data: serde_json::Value,
31 pub pagination: Pagination,
32 #[serde(flatten)]
33 pub opts: DFOptsView,
34}
35
36#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
37pub struct JsonDataFrameViews {
38 pub source: DataFrameSchemaSize,
39 pub view: JsonDataFrameView,
40}
41
42#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
43pub struct JsonDataFrameViewResponse {
44 #[serde(flatten)]
45 pub status: StatusMessage,
46 pub data_frame: JsonDataFrameViews,
47 pub commit: Option<Commit>,
48 pub resource: Option<ResourceVersion>,
49 pub derived_resource: Option<DerivedDFResource>,
50}
51
52#[derive(Serialize, Deserialize, Debug, Clone)]
53pub struct WorkspaceJsonDataFrameViewResponse {
54 #[serde(flatten)]
55 pub status: StatusMessage,
56 pub data_frame: Option<JsonDataFrameViews>,
57 pub commit: Option<Commit>,
58 pub resource: Option<ResourceVersion>,
59 pub derived_resource: Option<DerivedDFResource>,
60 pub is_indexed: bool,
61}
62
63#[derive(Serialize, Deserialize, Debug, Clone)]
64pub struct JsonDataFrameRowResponse {
65 #[serde(flatten)]
66 pub status: StatusMessage,
67 pub diff: Option<Vec<DataFrameRowChange>>,
68 pub data_frame: JsonDataFrameViews,
69 pub commit: Option<Commit>,
70 pub resource: Option<ResourceVersion>,
71 pub derived_resource: Option<DerivedDFResource>,
72 pub row_id: Option<String>,
73 pub row_index: Option<usize>,
74}
75
76#[derive(Serialize, Deserialize, Debug, Clone)]
77pub struct VecBatchUpdateResponse {
78 #[serde(flatten)]
79 pub status: StatusMessage,
80 pub rows: Vec<BatchUpdateResponse>,
81}
82
83#[derive(Serialize, Deserialize, Debug, Clone)]
84pub struct BatchUpdateResponse {
85 pub row_id: String,
86 pub code: i32,
87 pub error: Option<String>,
88}
89
90#[derive(Serialize, Deserialize, Debug, Clone)]
91pub struct JsonDataFrameColumnResponse {
92 #[serde(flatten)]
93 pub status: StatusMessage,
94 pub diff: Option<Vec<DataFrameColumnChange>>,
95 pub data_frame: JsonDataFrameViews,
96 pub commit: Option<Commit>,
97 pub resource: Option<ResourceVersion>,
98 pub derived_resource: Option<DerivedDFResource>,
99}
100#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
101#[serde(rename_all = "snake_case")]
102pub enum DFResourceType {
103 Compare,
104 Diff,
105 Query,
106}
107
108#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
109pub struct DerivedDFResource {
110 pub resource_id: String,
111 pub path: String,
112 pub resource_type: DFResourceType,
113}
114
115impl WorkspaceJsonDataFrameViewResponse {
116 pub async fn empty() -> WorkspaceJsonDataFrameViewResponse {
117 WorkspaceJsonDataFrameViewResponse {
118 status: StatusMessage::resource_found(),
119 data_frame: Some(JsonDataFrameViews::empty().await),
120 commit: None,
121 resource: None,
122 derived_resource: None,
123 is_indexed: false,
124 }
125 }
126}
127
128impl JsonDataFrameViews {
129 pub async fn empty() -> JsonDataFrameViews {
130 JsonDataFrameViews::from_df_and_opts(DataFrame::empty(), Schema::empty(), &DFOpts::empty())
131 .await
132 }
133}
134
135impl JsonDataFrameView {
136 pub fn empty() -> JsonDataFrameView {
137 JsonDataFrameView::empty_with_schema(&Schema::empty(), 0, &DFOpts::empty())
138 }
139
140 pub async fn from_df_opts(
141 df: DataFrame,
142 og_schema: Schema,
143 opts: &DFOpts,
144 ) -> JsonDataFrameView {
145 let full_width = df.width();
146 let full_height = df.height();
147
148 let page_size = opts.page_size.unwrap_or(constants::DEFAULT_PAGE_SIZE);
149 let page = opts.page.unwrap_or(constants::DEFAULT_PAGE_NUM);
150
151 let start = if page == 0 { 0 } else { page_size * (page - 1) };
152 let end = page_size * page;
153
154 let total_pages = (full_height as f64 / page_size as f64).ceil() as usize;
155
156 let mut opts = opts.clone();
157
158 if df.height() == 0 {
159 return JsonDataFrameView::empty_with_schema(&og_schema, full_height, &opts);
160 };
161
162 opts.slice = Some(format!("{start}..{end}"));
163 let opts_view = DFOptsView::from_df_opts(&opts);
164 let mut sliced_df = tabular::transform(df, opts).await.unwrap();
165
166 let mut slice_schema = Schema::from_polars(sliced_df.schema());
168 slice_schema.update_metadata_from_schema(&og_schema);
169
170 JsonDataFrameView {
171 schema: slice_schema,
172 size: DataFrameSize {
173 height: full_height,
174 width: full_width,
175 },
176 data: JsonDataFrameView::json_from_df(&mut sliced_df),
177 pagination: Pagination {
178 page_number: page,
179 page_size,
180 total_pages,
181 total_entries: full_height,
182 },
183 opts: opts_view,
184 }
185 }
186
187 pub async fn from_df_opts_unpaginated(
188 df: DataFrame,
189 og_schema: Schema,
190 og_height: usize,
191 opts: &DFOpts,
192 ) -> JsonDataFrameView {
193 let full_width = df.width();
194 let view_height = df.height();
195
196 let mut opts = opts.clone();
198 opts.slice = None;
199
200 let opts_view = DFOptsView::from_df_opts(&opts);
201 let mut sliced_df = tabular::transform(df, opts.clone()).await.unwrap();
202
203 let mut slice_schema = Schema::from_polars(sliced_df.schema());
205 log::debug!("OG schema {og_schema:?}");
206 log::debug!("Pre-Slice schema {slice_schema:?}");
207 slice_schema.update_metadata_from_schema(&og_schema);
208 log::debug!("Slice schema {slice_schema:?}");
209
210 let page_size = opts.page_size.unwrap_or(constants::DEFAULT_PAGE_SIZE);
211 let page_number = opts.page.unwrap_or(constants::DEFAULT_PAGE_NUM);
212
213 let total_pages = (og_height as f64 / page_size as f64).ceil() as usize;
214
215 JsonDataFrameView {
216 schema: slice_schema,
217 size: DataFrameSize {
218 height: view_height,
219 width: full_width,
220 },
221 data: JsonDataFrameView::json_from_df(&mut sliced_df),
222 pagination: Pagination {
223 page_number,
224 page_size,
225 total_pages,
226 total_entries: og_height,
227 },
228 opts: opts_view,
229 }
230 }
231
232 pub async fn to_df(&self) -> DataFrame {
233 if self.data == serde_json::Value::Null {
234 DataFrame::empty()
235 } else {
236 let columns = self.schema.fields_names();
238 log::debug!("Got columns: {columns:?}");
239
240 match &self.data {
241 serde_json::Value::Array(arr) => {
242 if !arr.is_empty() {
243 let data = self.data.to_string();
244 let content = Cursor::new(data.as_bytes());
245 let df = JsonReader::new(content).finish().unwrap();
247
248 let opts = DFOpts::from_column_names(columns);
249 tabular::transform(df, opts).await.unwrap()
250 } else {
251 let cols = columns
252 .iter()
253 .map(|name| {
254 Column::Series(
255 Series::new(PlSmallStr::from_str(name), Vec::<&str>::new())
256 .into(),
257 )
258 })
259 .collect::<Vec<Column>>();
260 DataFrame::new(cols).unwrap()
261 }
262 }
263 _ => {
264 log::error!("Could not parse non-array json data: {:?}", self.data);
265 DataFrame::empty()
266 }
267 }
268 }
269 }
270
271 pub fn json_from_df(df: &mut DataFrame) -> serde_json::Value {
272 log::debug!("Serializing df: [{df}]");
273
274 sanitize_df_for_serialization(df).expect("Error cleaning df before serialization");
275
276 let data: Vec<u8> = Vec::new();
278 let mut buf = BufWriter::new(data);
279
280 let mut writer = JsonWriter::new(&mut buf).with_json_format(JsonFormat::Json);
281 writer.finish(df).expect("Could not write df json buffer");
282
283 let buffer = buf.into_inner().expect("Could not get buffer");
284
285 let json_str = str::from_utf8(&buffer).unwrap();
286
287 serde_json::from_str(json_str).unwrap()
288 }
289
290 fn empty_with_schema(
291 schema: &Schema,
292 total_entries: usize,
293 opts: &DFOpts,
294 ) -> JsonDataFrameView {
295 let mut default_df = DataFrame::empty();
296 JsonDataFrameView {
297 schema: schema.to_owned(),
298 size: DataFrameSize {
299 height: 0,
300 width: schema.fields_names().len(),
301 },
302 data: JsonDataFrameView::json_from_df(&mut default_df),
303 pagination: Pagination {
304 page_number: 0,
305 page_size: 0,
306 total_pages: 0,
307 total_entries,
308 },
309 opts: DFOptsView::from_df_opts(opts),
310 }
311 }
312}
313
314impl JsonDataFrameViews {
315 pub async fn from_df_and_opts(
316 df: DataFrame,
317 og_schema: Schema,
318 opts: &DFOpts,
319 ) -> JsonDataFrameViews {
320 let source = DataFrameSchemaSize::from_df(&df, &og_schema);
321 let view = JsonDataFrameView::from_df_opts(df, og_schema, opts).await;
322 JsonDataFrameViews { source, view }
323 }
324
325 pub async fn from_df_and_opts_unpaginated(
328 df: DataFrame,
329 og_schema: Schema,
330 og_height: usize,
331 opts: &DFOpts,
332 ) -> JsonDataFrameViews {
333 let source = DataFrameSchemaSize::from_df(&df, &og_schema);
334 let view =
335 JsonDataFrameView::from_df_opts_unpaginated(df, og_schema, og_height, opts).await;
336 JsonDataFrameViews { source, view }
337 }
338}
339
340fn sanitize_df_for_serialization(df: &mut DataFrame) -> Result<(), OxenError> {
341 let schema = df.schema();
342 let mut updates: Vec<(usize, Column)> = Vec::new();
343
344 for (idx, _field) in schema.iter_fields().enumerate() {
346 let series = df.select_at_idx(idx).unwrap();
347
348 if let Some(new_series) = match series.dtype() {
349 DataType::Binary | DataType::BinaryOffset => {
350 Some(cast_binary_to_string_with_fallback(series, "[binary]"))
351 }
352 DataType::Struct(subfields) => {
353 if has_binary_in_struct(subfields) {
354 Some(cast_binary_to_string_with_fallback(
355 series,
356 &format!("struct[{}]", subfields.len()),
357 ))
358 } else {
359 None
360 }
361 }
362 DataType::List(subtype) => match **subtype {
363 DataType::Binary | DataType::BinaryOffset => Some(
364 cast_binary_to_string_with_fallback(series, "[list[binary]]"),
365 ),
366 DataType::Struct(ref subfields) => {
367 if has_binary_in_struct(subfields) {
368 Some(cast_binary_to_string_with_fallback(
369 series,
370 &format!("[list[struct[{}]]]", subfields.len()),
371 ))
372 } else {
373 None
374 }
375 }
376 _ => None,
377 },
378 _ => None,
379 } {
380 updates.push((idx, new_series));
381 }
382 }
383
384 for (idx, new_series) in updates {
386 df.replace_column(idx, new_series)?;
387 }
388
389 Ok(())
390}
391
392fn has_binary_in_struct(subfields: &[Field]) -> bool {
393 subfields.iter().any(|field| {
394 matches!(field.dtype(), DataType::Binary | DataType::BinaryOffset)
395 || field.dtype().to_string().contains("Binary")
396 })
397}
398
399fn cast_binary_to_string_with_fallback(series: &Column, out: &str) -> Column {
400 let res = series.cast(&DataType::String);
401 if let Ok(series) = res {
402 series
403 } else {
404 let mut vec = vec![out];
405 vec.resize(series.len(), out);
406 Column::new(series.name().clone(), vec)
407 }
408}