vineyard_polars/ds/
dataframe.rs

1// Copyright 2020-2023 Alibaba Group Holding Limited.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use arrow2::array;
16use arrow2::datatypes;
17use itertools::izip;
18use polars_core::prelude as polars;
19use serde_json::Value;
20
21use vineyard::client::*;
22use vineyard::ds::arrow::{Table, TableBuilder};
23use vineyard::ds::dataframe::DataFrame as VineyardDataFrame;
24
25/// Convert a Polars error to a Vineyard error, as orphan impls are not allowed
26/// in Rust
27///
28/// Usage:
29///
30/// ```ignore
31/// let x = polars::DataFrame::new(...).map_err(error)?;
32/// ```
33pub fn error(error: polars::PolarsError) -> VineyardError {
34    VineyardError::invalid(format!("{}", error))
35}
36
/// A polars dataframe resolved from a vineyard object.
///
/// Instances are populated by [`Object::construct`], which accepts either a
/// pandas-compatible `VineyardDataFrame` or an arrow [`Table`] as the source object.
#[derive(Debug, Default)]
pub struct DataFrame {
    /// Metadata of the vineyard object this dataframe was constructed from.
    meta: ObjectMeta,
    /// The polars dataframe holding the converted column data.
    dataframe: polars::DataFrame,
}

// NOTE(review): this registers the polars `DataFrame` under the typename
// "vineyard::Table", presumably the same typename the arrow `Table` uses — confirm this is
// intended and does not clash with `Table`'s own registration.
impl_typename!(DataFrame, "vineyard::Table");
44
45impl Object for DataFrame {
46    fn construct(&mut self, meta: ObjectMeta) -> Result<()> {
47        let ty = meta.get_typename()?;
48        if ty == typename::<VineyardDataFrame>() {
49            return self.construct_from_pandas_dataframe(meta);
50        } else if ty == typename::<Table>() {
51            return self.construct_from_arrow_table(meta);
52        } else {
53            return Err(VineyardError::type_error(format!(
54                "cannot construct DataFrame from this metadata: {}",
55                ty
56            )));
57        }
58    }
59}
60
61register_vineyard_object!(DataFrame);
62
63impl DataFrame {
64    pub fn new_boxed(meta: ObjectMeta) -> Result<Box<dyn Object>> {
65        let mut object = Box::<Self>::default();
66        object.construct(meta)?;
67        Ok(object)
68    }
69
70    fn construct_from_pandas_dataframe(&mut self, meta: ObjectMeta) -> Result<()> {
71        vineyard_assert_typename(typename::<VineyardDataFrame>(), meta.get_typename()?)?;
72        let dataframe = downcast_object::<VineyardDataFrame>(VineyardDataFrame::new_boxed(meta)?)?;
73        let names = dataframe.names().to_vec();
74        let columns: Vec<Box<dyn array::Array>> = dataframe
75            .columns()
76            .iter()
77            .map(|c| array::from_data(&c.array().to_data()))
78            .collect();
79        let series: Vec<polars_core::series::Series> = names
80            .iter()
81            .zip(columns)
82            .map(|(name, column)| {
83                let datatype = polars::DataType::from(column.data_type());
84                unsafe {
85                    polars_core::series::Series::from_chunks_and_dtype_unchecked(
86                        name,
87                        vec![column],
88                        &datatype,
89                    )
90                }
91            })
92            .collect::<Vec<_>>();
93        self.meta = dataframe.metadata();
94        self.dataframe = polars::DataFrame::new(series).map_err(error)?;
95        return Ok(());
96    }
97
98    fn construct_from_arrow_table(&mut self, meta: ObjectMeta) -> Result<()> {
99        vineyard_assert_typename(typename::<Table>(), meta.get_typename()?)?;
100        let table = downcast_object::<Table>(Table::new_boxed(meta)?)?;
101        let schema = table.schema();
102        let names = schema
103            .fields()
104            .iter()
105            .map(|f| f.name().clone())
106            .collect::<Vec<_>>();
107        let types = schema
108            .fields()
109            .iter()
110            .map(|f| f.data_type().clone())
111            .collect::<Vec<_>>();
112        let mut columns: Vec<Vec<Box<dyn array::Array>>> = Vec::with_capacity(table.num_columns());
113        for index in 0..table.num_columns() {
114            let mut chunks = Vec::with_capacity(table.num_batches());
115            for batch in table.batches() {
116                let batch = batch.as_ref().as_ref();
117                let chunk = batch.column(index);
118                chunks.push(array::from_data(&chunk.to_data()));
119            }
120            columns.push(chunks);
121        }
122        let series: Vec<polars_core::series::Series> = izip!(&names, types, columns)
123            .map(|(name, datatype, chunks)| unsafe {
124                polars_core::series::Series::from_chunks_and_dtype_unchecked(
125                    name,
126                    chunks,
127                    &polars::DataType::from(&datatypes::DataType::from(datatype)),
128                )
129            })
130            .collect::<Vec<_>>();
131        self.meta = table.metadata();
132        self.dataframe = polars::DataFrame::new(series).map_err(error)?;
133        return Ok(());
134    }
135}
136
/// Borrows the underlying polars dataframe.
impl AsRef<polars::DataFrame> for DataFrame {
    fn as_ref(&self) -> &polars::DataFrame {
        &self.dataframe
    }
}
142
/// Builds a polars dataframe into a pandas-compatible vineyard dataframe.
pub struct PandasDataFrameBuilder {
    /// Set once `build` has run.
    sealed: bool,
    /// Column names, in column order.
    names: Vec<String>,
    /// Column values as vineyard objects, parallel to `names`.
    columns: Vec<Box<dyn Object>>,
}
149
/// Tracks the sealed state in the builder's own `sealed` flag.
impl ObjectBuilder for PandasDataFrameBuilder {
    /// Returns whether the builder has been sealed.
    fn sealed(&self) -> bool {
        self.sealed
    }

    /// Records the sealed state.
    fn set_sealed(&mut self, sealed: bool) {
        self.sealed = sealed;
    }
}
159
160impl ObjectBase for PandasDataFrameBuilder {
161    fn build(&mut self, _client: &mut IPCClient) -> Result<()> {
162        if self.sealed {
163            return Ok(());
164        }
165        self.set_sealed(true);
166        return Ok(());
167    }
168
169    fn seal(mut self, client: &mut IPCClient) -> Result<Box<dyn Object>> {
170        self.build(client)?;
171        let mut meta = ObjectMeta::new_from_typename(typename::<DataFrame>());
172        meta.add_usize("__values_-size", self.names.len());
173        meta.add_isize("partition_index_row_", -1);
174        meta.add_isize("partition_index_column_", -1);
175        meta.add_isize("row_batch_index_", -1);
176        for (index, (name, column)) in self.names.iter().zip(self.columns).enumerate() {
177            meta.add_value(
178                &format!("__values_-key-{}", index),
179                Value::String(name.into()),
180            );
181            meta.add_member(&format!("__values_-value-{}", index), column)?;
182        }
183        let metadata = client.create_metadata(&meta)?;
184        return DataFrame::new_boxed(metadata);
185    }
186}
187
188impl PandasDataFrameBuilder {
189    pub fn new(client: &mut IPCClient, dataframe: &polars::DataFrame) -> Result<Self> {
190        let mut names = Vec::with_capacity(dataframe.width());
191        let mut columns = Vec::with_capacity(dataframe.width());
192        for column in dataframe.get_columns() {
193            let column = column.rechunk(); // FIXME(avoid copying)
194            names.push(column.name().into());
195            columns.push(column.chunks()[0].clone());
196        }
197        return Self::new_from_arrays(client, names, columns);
198    }
199
200    pub fn new_from_columns(names: Vec<String>, columns: Vec<Box<dyn Object>>) -> Result<Self> {
201        return Ok(PandasDataFrameBuilder {
202            sealed: false,
203            names,
204            columns,
205        });
206    }
207
208    pub fn new_from_arrays(
209        client: &mut IPCClient,
210        names: Vec<String>,
211        arrays: Vec<Box<dyn array::Array>>,
212    ) -> Result<Self> {
213        use vineyard::ds::tensor::build_tensor;
214
215        let mut columns = Vec::with_capacity(arrays.len());
216        for array in arrays {
217            columns.push(build_tensor(client, array.into())?);
218        }
219        return Ok(PandasDataFrameBuilder {
220            sealed: false,
221            names,
222            columns,
223        });
224    }
225}
226
/// Builds a polars dataframe into an arrow-table-compatible vineyard dataframe.
///
/// This wraps a [`TableBuilder`]; sealing it yields a [`DataFrame`] resolved from the
/// sealed table's metadata.
pub struct ArrowDataFrameBuilder(pub TableBuilder);
229
/// Delegates the sealed state to the wrapped [`TableBuilder`].
impl ObjectBuilder for ArrowDataFrameBuilder {
    /// Returns whether the wrapped table builder has been sealed.
    fn sealed(&self) -> bool {
        self.0.sealed()
    }

    /// Forwards the sealed state to the wrapped table builder.
    fn set_sealed(&mut self, sealed: bool) {
        self.0.set_sealed(sealed)
    }
}
239
240impl ObjectBase for ArrowDataFrameBuilder {
241    fn build(&mut self, client: &mut IPCClient) -> Result<()> {
242        self.0.build(client)
243    }
244
245    fn seal(self, client: &mut IPCClient) -> Result<Box<dyn Object>> {
246        let table = downcast_object::<Table>(self.0.seal(client)?)?;
247        return DataFrame::new_boxed(table.metadata());
248    }
249}
250
251impl ArrowDataFrameBuilder {
252    pub fn new(client: &mut IPCClient, dataframe: &polars::DataFrame) -> Result<Self> {
253        let mut names = Vec::with_capacity(dataframe.width());
254        let mut datatypes = Vec::with_capacity(dataframe.width());
255        let mut columns = Vec::with_capacity(dataframe.width());
256        for column in dataframe.get_columns() {
257            names.push(column.name().into());
258            datatypes.push(column.dtype().to_arrow());
259            columns.push(column.chunks().clone());
260        }
261        return Self::new_from_columns(client, names, datatypes, columns);
262    }
263
264    /// batches[0]: the first record batch
265    /// batches[0][0]: the first column of the first record batch
266    pub fn new_from_batch_columns(
267        client: &mut IPCClient,
268        names: Vec<String>,
269        datatypes: Vec<datatypes::DataType>,
270        num_rows: Vec<usize>,
271        num_columns: usize,
272        batches: Vec<Vec<Box<dyn Object>>>,
273    ) -> Result<Self> {
274        let schema = arrow_schema::Schema::new(
275            izip!(names, datatypes)
276                .map(|(name, datatype)| {
277                    arrow_schema::Field::from(datatypes::Field::new(name, datatype, false))
278                })
279                .collect::<Vec<_>>(),
280        );
281        return Ok(ArrowDataFrameBuilder(TableBuilder::new_from_batch_columns(
282            client,
283            &schema,
284            num_rows,
285            num_columns,
286            batches,
287        )?));
288    }
289
290    /// batches[0]: the first record batch
291    /// batches[0][0]: the first column of the first record batch
292    pub fn new_from_batches(
293        client: &mut IPCClient,
294        names: Vec<String>,
295        datatypes: Vec<datatypes::DataType>,
296        batches: Vec<Vec<Box<dyn array::Array>>>,
297    ) -> Result<Self> {
298        use vineyard::ds::arrow::build_array;
299
300        let mut num_rows = Vec::with_capacity(batches.len());
301        let mut num_columns = 0;
302        let mut chunks = Vec::with_capacity(batches.len());
303        for batch in batches {
304            let mut columns = Vec::with_capacity(batch.len());
305            num_columns = columns.len();
306            if num_columns == 0 {
307                num_rows.push(0);
308            } else {
309                num_rows.push(batch[0].len());
310            }
311            for array in batch {
312                columns.push(build_array(client, array.into())?);
313            }
314            chunks.push(columns);
315        }
316        return Self::new_from_batch_columns(
317            client,
318            names,
319            datatypes,
320            num_rows,
321            num_columns,
322            chunks,
323        );
324    }
325
326    /// columns[0]: the first column
327    /// columns[0][0]: the first chunk of the first column
328    pub fn new_from_columns(
329        client: &mut IPCClient,
330        names: Vec<String>,
331        datatypes: Vec<datatypes::DataType>,
332        columns: Vec<Vec<Box<dyn array::Array>>>,
333    ) -> Result<Self> {
334        use vineyard::ds::arrow::build_array;
335
336        let mut num_rows = Vec::new();
337        let num_columns = columns.len();
338        let mut chunks = Vec::new();
339        for (column_index, column) in columns.into_iter().enumerate() {
340            for (chunk_index, chunk) in column.into_iter().enumerate() {
341                if column_index == 0 {
342                    chunks.push(Vec::new());
343                    num_rows.push(chunk.len());
344                }
345                chunks[chunk_index].push(build_array(client, chunk.into())?);
346            }
347        }
348        return Self::new_from_batch_columns(
349            client,
350            names,
351            datatypes,
352            num_rows,
353            num_columns,
354            chunks,
355        );
356    }
357}