vineyard_datafusion/ds/dataframe.rs

1// Copyright 2020-2023 Alibaba Group Holding Limited.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use arrow_array::RecordBatch;
18use arrow_schema::{Field, Schema};
19use datafusion::common::DataFusionError;
20use datafusion::datasource::memory::MemTable;
21use datafusion::datasource::TableProvider;
22
23use vineyard::client::*;
24use vineyard::ds::arrow::{Table, TableBuilder};
25use vineyard::ds::dataframe::DataFrame as VineyardDataFrame;
26
27/// Convert a datafusion error to a Vineyard error, as orphan impls are not allowed
28/// in Rust
29///
30/// Usage:
31///
32/// ```ignore
33/// let x = ctx.select(...).map_err(error)?;
34/// ```
35pub fn error(error: DataFusionError) -> VineyardError {
36    VineyardError::invalid(format!("{}", error))
37}
38
39#[derive(Debug)]
40pub struct DataFrame {
41    meta: ObjectMeta,
42    dataframe: MemTable,
43}
44
45impl Default for DataFrame {
46    fn default() -> Self {
47        DataFrame {
48            meta: ObjectMeta::default(),
49            dataframe: MemTable::try_new(Arc::new(Schema::new(Vec::<Field>::new())), vec![])
50                .unwrap(),
51        }
52    }
53}
54
55impl_typename!(DataFrame, "vineyard::Table");
56
57impl Object for DataFrame {
58    fn construct(&mut self, meta: ObjectMeta) -> Result<()> {
59        let ty = meta.get_typename()?;
60        if ty == typename::<VineyardDataFrame>() {
61            return self.construct_from_pandas_dataframe(meta);
62        } else if ty == typename::<Table>() {
63            return self.construct_from_arrow_table(meta);
64        } else {
65            return Err(VineyardError::type_error(format!(
66                "cannot construct DataFrame from this metadata: {}",
67                ty
68            )));
69        }
70    }
71}
72
73register_vineyard_object!(DataFrame);
74
75impl DataFrame {
76    pub fn new_boxed(meta: ObjectMeta) -> Result<Box<dyn Object>> {
77        let mut object = Box::<Self>::default();
78        object.construct(meta)?;
79        Ok(object)
80    }
81
82    fn construct_from_pandas_dataframe(&mut self, meta: ObjectMeta) -> Result<()> {
83        vineyard_assert_typename(typename::<VineyardDataFrame>(), meta.get_typename()?)?;
84        let dataframe = downcast_object::<VineyardDataFrame>(VineyardDataFrame::new_boxed(meta)?)?;
85        let recordbatch = dataframe.recordbatch()?;
86        self.meta = dataframe.metadata();
87        self.dataframe =
88            MemTable::try_new(recordbatch.schema(), vec![vec![recordbatch]]).map_err(error)?;
89        return Ok(());
90    }
91
92    fn construct_from_arrow_table(&mut self, meta: ObjectMeta) -> Result<()> {
93        vineyard_assert_typename(typename::<Table>(), meta.get_typename()?)?;
94        let table = downcast_object::<Table>(Table::new_boxed(meta)?)?;
95        let schema = table.schema().clone();
96        let batches: Vec<Vec<RecordBatch>> = table
97            .batches()
98            .iter()
99            .map(|batch| vec![batch.as_ref().as_ref().clone()])
100            .collect();
101        self.meta = table.metadata();
102        self.dataframe = MemTable::try_new(Arc::new(schema), batches).map_err(error)?;
103        return Ok(());
104    }
105
106    pub fn table(self) -> MemTable {
107        return self.dataframe;
108    }
109
110    pub fn table_provider(self) -> Arc<dyn TableProvider> {
111        return Arc::new(self.dataframe);
112    }
113}
114
115impl AsRef<MemTable> for DataFrame {
116    fn as_ref(&self) -> &MemTable {
117        &self.dataframe
118    }
119}
120
121/// Building a polars dataframe into a arrow's table-compatible dataframe.
122pub struct ArrowDataFrameBuilder(pub TableBuilder);
123
124impl ObjectBuilder for ArrowDataFrameBuilder {
125    fn sealed(&self) -> bool {
126        self.0.sealed()
127    }
128
129    fn set_sealed(&mut self, sealed: bool) {
130        self.0.set_sealed(sealed)
131    }
132}
133
134impl ObjectBase for ArrowDataFrameBuilder {
135    fn build(&mut self, client: &mut IPCClient) -> Result<()> {
136        self.0.build(client)
137    }
138
139    fn seal(self, client: &mut IPCClient) -> Result<Box<dyn Object>> {
140        let table = downcast_object::<Table>(self.0.seal(client)?)?;
141        return DataFrame::new_boxed(table.metadata());
142    }
143}
144
145impl ArrowDataFrameBuilder {
146    pub fn new(client: &mut IPCClient, batches: &[RecordBatch]) -> Result<Self> {
147        assert!(
148            !batches.is_empty(),
149            "cannot build a dataframe from empty record batch collections"
150        );
151        return Ok(ArrowDataFrameBuilder(TableBuilder::new(
152            client,
153            batches[0].schema().as_ref(),
154            batches,
155        )?));
156    }
157}