1use arrow2::array;
16use arrow2::datatypes;
17use itertools::izip;
18use polars_core::prelude as polars;
19use serde_json::Value;
20
21use vineyard::client::*;
22use vineyard::ds::arrow::{Table, TableBuilder};
23use vineyard::ds::dataframe::DataFrame as VineyardDataFrame;
24
25pub fn error(error: polars::PolarsError) -> VineyardError {
34 VineyardError::invalid(format!("{}", error))
35}
36
37#[derive(Debug, Default)]
38pub struct DataFrame {
39 meta: ObjectMeta,
40 dataframe: polars::DataFrame,
41}
42
43impl_typename!(DataFrame, "vineyard::Table");
44
45impl Object for DataFrame {
46 fn construct(&mut self, meta: ObjectMeta) -> Result<()> {
47 let ty = meta.get_typename()?;
48 if ty == typename::<VineyardDataFrame>() {
49 return self.construct_from_pandas_dataframe(meta);
50 } else if ty == typename::<Table>() {
51 return self.construct_from_arrow_table(meta);
52 } else {
53 return Err(VineyardError::type_error(format!(
54 "cannot construct DataFrame from this metadata: {}",
55 ty
56 )));
57 }
58 }
59}
60
61register_vineyard_object!(DataFrame);
62
63impl DataFrame {
64 pub fn new_boxed(meta: ObjectMeta) -> Result<Box<dyn Object>> {
65 let mut object = Box::<Self>::default();
66 object.construct(meta)?;
67 Ok(object)
68 }
69
70 fn construct_from_pandas_dataframe(&mut self, meta: ObjectMeta) -> Result<()> {
71 vineyard_assert_typename(typename::<VineyardDataFrame>(), meta.get_typename()?)?;
72 let dataframe = downcast_object::<VineyardDataFrame>(VineyardDataFrame::new_boxed(meta)?)?;
73 let names = dataframe.names().to_vec();
74 let columns: Vec<Box<dyn array::Array>> = dataframe
75 .columns()
76 .iter()
77 .map(|c| array::from_data(&c.array().to_data()))
78 .collect();
79 let series: Vec<polars_core::series::Series> = names
80 .iter()
81 .zip(columns)
82 .map(|(name, column)| {
83 let datatype = polars::DataType::from(column.data_type());
84 unsafe {
85 polars_core::series::Series::from_chunks_and_dtype_unchecked(
86 name,
87 vec![column],
88 &datatype,
89 )
90 }
91 })
92 .collect::<Vec<_>>();
93 self.meta = dataframe.metadata();
94 self.dataframe = polars::DataFrame::new(series).map_err(error)?;
95 return Ok(());
96 }
97
98 fn construct_from_arrow_table(&mut self, meta: ObjectMeta) -> Result<()> {
99 vineyard_assert_typename(typename::<Table>(), meta.get_typename()?)?;
100 let table = downcast_object::<Table>(Table::new_boxed(meta)?)?;
101 let schema = table.schema();
102 let names = schema
103 .fields()
104 .iter()
105 .map(|f| f.name().clone())
106 .collect::<Vec<_>>();
107 let types = schema
108 .fields()
109 .iter()
110 .map(|f| f.data_type().clone())
111 .collect::<Vec<_>>();
112 let mut columns: Vec<Vec<Box<dyn array::Array>>> = Vec::with_capacity(table.num_columns());
113 for index in 0..table.num_columns() {
114 let mut chunks = Vec::with_capacity(table.num_batches());
115 for batch in table.batches() {
116 let batch = batch.as_ref().as_ref();
117 let chunk = batch.column(index);
118 chunks.push(array::from_data(&chunk.to_data()));
119 }
120 columns.push(chunks);
121 }
122 let series: Vec<polars_core::series::Series> = izip!(&names, types, columns)
123 .map(|(name, datatype, chunks)| unsafe {
124 polars_core::series::Series::from_chunks_and_dtype_unchecked(
125 name,
126 chunks,
127 &polars::DataType::from(&datatypes::DataType::from(datatype)),
128 )
129 })
130 .collect::<Vec<_>>();
131 self.meta = table.metadata();
132 self.dataframe = polars::DataFrame::new(series).map_err(error)?;
133 return Ok(());
134 }
135}
136
137impl AsRef<polars::DataFrame> for DataFrame {
138 fn as_ref(&self) -> &polars::DataFrame {
139 &self.dataframe
140 }
141}
142
143pub struct PandasDataFrameBuilder {
145 sealed: bool,
146 names: Vec<String>,
147 columns: Vec<Box<dyn Object>>,
148}
149
150impl ObjectBuilder for PandasDataFrameBuilder {
151 fn sealed(&self) -> bool {
152 self.sealed
153 }
154
155 fn set_sealed(&mut self, sealed: bool) {
156 self.sealed = sealed;
157 }
158}
159
160impl ObjectBase for PandasDataFrameBuilder {
161 fn build(&mut self, _client: &mut IPCClient) -> Result<()> {
162 if self.sealed {
163 return Ok(());
164 }
165 self.set_sealed(true);
166 return Ok(());
167 }
168
169 fn seal(mut self, client: &mut IPCClient) -> Result<Box<dyn Object>> {
170 self.build(client)?;
171 let mut meta = ObjectMeta::new_from_typename(typename::<DataFrame>());
172 meta.add_usize("__values_-size", self.names.len());
173 meta.add_isize("partition_index_row_", -1);
174 meta.add_isize("partition_index_column_", -1);
175 meta.add_isize("row_batch_index_", -1);
176 for (index, (name, column)) in self.names.iter().zip(self.columns).enumerate() {
177 meta.add_value(
178 &format!("__values_-key-{}", index),
179 Value::String(name.into()),
180 );
181 meta.add_member(&format!("__values_-value-{}", index), column)?;
182 }
183 let metadata = client.create_metadata(&meta)?;
184 return DataFrame::new_boxed(metadata);
185 }
186}
187
188impl PandasDataFrameBuilder {
189 pub fn new(client: &mut IPCClient, dataframe: &polars::DataFrame) -> Result<Self> {
190 let mut names = Vec::with_capacity(dataframe.width());
191 let mut columns = Vec::with_capacity(dataframe.width());
192 for column in dataframe.get_columns() {
193 let column = column.rechunk(); names.push(column.name().into());
195 columns.push(column.chunks()[0].clone());
196 }
197 return Self::new_from_arrays(client, names, columns);
198 }
199
200 pub fn new_from_columns(names: Vec<String>, columns: Vec<Box<dyn Object>>) -> Result<Self> {
201 return Ok(PandasDataFrameBuilder {
202 sealed: false,
203 names,
204 columns,
205 });
206 }
207
208 pub fn new_from_arrays(
209 client: &mut IPCClient,
210 names: Vec<String>,
211 arrays: Vec<Box<dyn array::Array>>,
212 ) -> Result<Self> {
213 use vineyard::ds::tensor::build_tensor;
214
215 let mut columns = Vec::with_capacity(arrays.len());
216 for array in arrays {
217 columns.push(build_tensor(client, array.into())?);
218 }
219 return Ok(PandasDataFrameBuilder {
220 sealed: false,
221 names,
222 columns,
223 });
224 }
225}
226
227pub struct ArrowDataFrameBuilder(pub TableBuilder);
229
230impl ObjectBuilder for ArrowDataFrameBuilder {
231 fn sealed(&self) -> bool {
232 self.0.sealed()
233 }
234
235 fn set_sealed(&mut self, sealed: bool) {
236 self.0.set_sealed(sealed)
237 }
238}
239
240impl ObjectBase for ArrowDataFrameBuilder {
241 fn build(&mut self, client: &mut IPCClient) -> Result<()> {
242 self.0.build(client)
243 }
244
245 fn seal(self, client: &mut IPCClient) -> Result<Box<dyn Object>> {
246 let table = downcast_object::<Table>(self.0.seal(client)?)?;
247 return DataFrame::new_boxed(table.metadata());
248 }
249}
250
251impl ArrowDataFrameBuilder {
252 pub fn new(client: &mut IPCClient, dataframe: &polars::DataFrame) -> Result<Self> {
253 let mut names = Vec::with_capacity(dataframe.width());
254 let mut datatypes = Vec::with_capacity(dataframe.width());
255 let mut columns = Vec::with_capacity(dataframe.width());
256 for column in dataframe.get_columns() {
257 names.push(column.name().into());
258 datatypes.push(column.dtype().to_arrow());
259 columns.push(column.chunks().clone());
260 }
261 return Self::new_from_columns(client, names, datatypes, columns);
262 }
263
264 pub fn new_from_batch_columns(
267 client: &mut IPCClient,
268 names: Vec<String>,
269 datatypes: Vec<datatypes::DataType>,
270 num_rows: Vec<usize>,
271 num_columns: usize,
272 batches: Vec<Vec<Box<dyn Object>>>,
273 ) -> Result<Self> {
274 let schema = arrow_schema::Schema::new(
275 izip!(names, datatypes)
276 .map(|(name, datatype)| {
277 arrow_schema::Field::from(datatypes::Field::new(name, datatype, false))
278 })
279 .collect::<Vec<_>>(),
280 );
281 return Ok(ArrowDataFrameBuilder(TableBuilder::new_from_batch_columns(
282 client,
283 &schema,
284 num_rows,
285 num_columns,
286 batches,
287 )?));
288 }
289
290 pub fn new_from_batches(
293 client: &mut IPCClient,
294 names: Vec<String>,
295 datatypes: Vec<datatypes::DataType>,
296 batches: Vec<Vec<Box<dyn array::Array>>>,
297 ) -> Result<Self> {
298 use vineyard::ds::arrow::build_array;
299
300 let mut num_rows = Vec::with_capacity(batches.len());
301 let mut num_columns = 0;
302 let mut chunks = Vec::with_capacity(batches.len());
303 for batch in batches {
304 let mut columns = Vec::with_capacity(batch.len());
305 num_columns = columns.len();
306 if num_columns == 0 {
307 num_rows.push(0);
308 } else {
309 num_rows.push(batch[0].len());
310 }
311 for array in batch {
312 columns.push(build_array(client, array.into())?);
313 }
314 chunks.push(columns);
315 }
316 return Self::new_from_batch_columns(
317 client,
318 names,
319 datatypes,
320 num_rows,
321 num_columns,
322 chunks,
323 );
324 }
325
326 pub fn new_from_columns(
329 client: &mut IPCClient,
330 names: Vec<String>,
331 datatypes: Vec<datatypes::DataType>,
332 columns: Vec<Vec<Box<dyn array::Array>>>,
333 ) -> Result<Self> {
334 use vineyard::ds::arrow::build_array;
335
336 let mut num_rows = Vec::new();
337 let num_columns = columns.len();
338 let mut chunks = Vec::new();
339 for (column_index, column) in columns.into_iter().enumerate() {
340 for (chunk_index, chunk) in column.into_iter().enumerate() {
341 if column_index == 0 {
342 chunks.push(Vec::new());
343 num_rows.push(chunk.len());
344 }
345 chunks[chunk_index].push(build_array(client, chunk.into())?);
346 }
347 }
348 return Self::new_from_batch_columns(
349 client,
350 names,
351 datatypes,
352 num_rows,
353 num_columns,
354 chunks,
355 );
356 }
357}