supertable_core/
polars.rs1use crate::table::Table;
2use anyhow::Result;
3use polars::prelude::*;
4
5pub struct PolarsConnector {
7 table: Table,
8}
9
10impl PolarsConnector {
11 pub fn new(table: Table) -> Self {
12 Self { table }
13 }
14
15 pub async fn scan(&self) -> Result<LazyFrame> {
17 let scanner = self.table.new_scan();
18 let tasks = scanner.plan().await?;
19
20 let mut buffer: Vec<u8> = Vec::new();
23 let schema = self.table.metadata.current_schema();
24 let arrow_schema = schema.to_arrow_schema_ref();
25
26 {
27 let mut writer = parquet::arrow::ArrowWriter::try_new(&mut buffer, arrow_schema, None)?;
28
29 let reader = crate::reader::TableReader::new(self.table.storage.clone());
30 for task in tasks {
31 let batches = reader.read_task(task).await?;
32 for batch in &batches {
33 writer.write(batch)?;
34 }
35 }
36 writer.close()?;
37 }
38
39 let cursor = std::io::Cursor::new(buffer);
41 let df = ParquetReader::new(cursor).finish()?;
42 Ok(df.lazy())
43 }
44}