vineyard_datafusion/ds/
dataframe.rs1use std::sync::Arc;
16
17use arrow_array::RecordBatch;
18use arrow_schema::{Field, Schema};
19use datafusion::common::DataFusionError;
20use datafusion::datasource::memory::MemTable;
21use datafusion::datasource::TableProvider;
22
23use vineyard::client::*;
24use vineyard::ds::arrow::{Table, TableBuilder};
25use vineyard::ds::dataframe::DataFrame as VineyardDataFrame;
26
27pub fn error(error: DataFusionError) -> VineyardError {
36 VineyardError::invalid(format!("{}", error))
37}
38
/// A vineyard object exposed to DataFusion as an in-memory table.
///
/// It is populated from either a vineyard dataframe or a vineyard arrow `Table`
/// (see the `Object::construct` implementation for `DataFrame`).
#[derive(Debug)]
pub struct DataFrame {
    /// Metadata of the vineyard object the table was built from
    /// (`ObjectMeta::default()` for an unconstructed `DataFrame`).
    meta: ObjectMeta,
    /// The record batches of the object, held as a DataFusion `MemTable`.
    dataframe: MemTable,
}
44
45impl Default for DataFrame {
46 fn default() -> Self {
47 DataFrame {
48 meta: ObjectMeta::default(),
49 dataframe: MemTable::try_new(Arc::new(Schema::new(Vec::<Field>::new())), vec![])
50 .unwrap(),
51 }
52 }
53}
54
// Associates the vineyard typename "vineyard::Table" with `DataFrame`.
// NOTE(review): `construct` also accepts `typename::<Table>()`; if the arrow `Table` type uses
// this same string, both types share one typename — confirm that this is intended.
impl_typename!(DataFrame, "vineyard::Table");
56
57impl Object for DataFrame {
58 fn construct(&mut self, meta: ObjectMeta) -> Result<()> {
59 let ty = meta.get_typename()?;
60 if ty == typename::<VineyardDataFrame>() {
61 return self.construct_from_pandas_dataframe(meta);
62 } else if ty == typename::<Table>() {
63 return self.construct_from_arrow_table(meta);
64 } else {
65 return Err(VineyardError::type_error(format!(
66 "cannot construct DataFrame from this metadata: {}",
67 ty
68 )));
69 }
70 }
71}
72
// Registers `DataFrame` with vineyard's object registration machinery; presumably this lets
// objects be resolved to this type by typename — confirm in the macro definition.
register_vineyard_object!(DataFrame);
74
75impl DataFrame {
76 pub fn new_boxed(meta: ObjectMeta) -> Result<Box<dyn Object>> {
77 let mut object = Box::<Self>::default();
78 object.construct(meta)?;
79 Ok(object)
80 }
81
82 fn construct_from_pandas_dataframe(&mut self, meta: ObjectMeta) -> Result<()> {
83 vineyard_assert_typename(typename::<VineyardDataFrame>(), meta.get_typename()?)?;
84 let dataframe = downcast_object::<VineyardDataFrame>(VineyardDataFrame::new_boxed(meta)?)?;
85 let recordbatch = dataframe.recordbatch()?;
86 self.meta = dataframe.metadata();
87 self.dataframe =
88 MemTable::try_new(recordbatch.schema(), vec![vec![recordbatch]]).map_err(error)?;
89 return Ok(());
90 }
91
92 fn construct_from_arrow_table(&mut self, meta: ObjectMeta) -> Result<()> {
93 vineyard_assert_typename(typename::<Table>(), meta.get_typename()?)?;
94 let table = downcast_object::<Table>(Table::new_boxed(meta)?)?;
95 let schema = table.schema().clone();
96 let batches: Vec<Vec<RecordBatch>> = table
97 .batches()
98 .iter()
99 .map(|batch| vec![batch.as_ref().as_ref().clone()])
100 .collect();
101 self.meta = table.metadata();
102 self.dataframe = MemTable::try_new(Arc::new(schema), batches).map_err(error)?;
103 return Ok(());
104 }
105
106 pub fn table(self) -> MemTable {
107 return self.dataframe;
108 }
109
110 pub fn table_provider(self) -> Arc<dyn TableProvider> {
111 return Arc::new(self.dataframe);
112 }
113}
114
115impl AsRef<MemTable> for DataFrame {
116 fn as_ref(&self) -> &MemTable {
117 &self.dataframe
118 }
119}
120
/// Builder that writes arrow record batches into vineyard through the wrapped [`TableBuilder`]
/// and, when sealed, returns the result as a [`DataFrame`] object.
pub struct ArrowDataFrameBuilder(pub TableBuilder);
123
124impl ObjectBuilder for ArrowDataFrameBuilder {
125 fn sealed(&self) -> bool {
126 self.0.sealed()
127 }
128
129 fn set_sealed(&mut self, sealed: bool) {
130 self.0.set_sealed(sealed)
131 }
132}
133
134impl ObjectBase for ArrowDataFrameBuilder {
135 fn build(&mut self, client: &mut IPCClient) -> Result<()> {
136 self.0.build(client)
137 }
138
139 fn seal(self, client: &mut IPCClient) -> Result<Box<dyn Object>> {
140 let table = downcast_object::<Table>(self.0.seal(client)?)?;
141 return DataFrame::new_boxed(table.metadata());
142 }
143}
144
145impl ArrowDataFrameBuilder {
146 pub fn new(client: &mut IPCClient, batches: &[RecordBatch]) -> Result<Self> {
147 assert!(
148 !batches.is_empty(),
149 "cannot build a dataframe from empty record batch collections"
150 );
151 return Ok(ArrowDataFrameBuilder(TableBuilder::new(
152 client,
153 batches[0].schema().as_ref(),
154 batches,
155 )?));
156 }
157}