use duckdb::arrow::array::{Array, UInt32Array, UInt8Array};
use duckdb::arrow::record_batch::RecordBatch;
use duckdb::Connection;
use polars::frame::DataFrame;
use polars::prelude::NamedFrom;
use polars::series::Series;
use pregel_rs::pregel::Column;
use rayon::iter::ParallelIterator;
use rayon::prelude::IntoParallelIterator;
use std::path::Path;
use strum::IntoEnumIterator;
use wikidata_rs::dtype::DataType;
use super::Backend;
/// Marker type for the DuckDB storage backend.
///
/// The struct carries no state; its behaviour is supplied entirely by its
/// [`Backend`] implementation.
pub struct DuckDB;
impl Backend for DuckDB {
fn import(path: &str) -> Result<DataFrame, String> {
let stmt = DataType::iter()
.map(|dtype| {
format!(
"SELECT src_id, property_id, dst_id, CAST({:} AS UTINYINT) FROM {:}",
u8::from(&dtype),
dtype.as_ref()
)
})
.collect::<Vec<String>>()
.join(" UNION ");
let connection: Connection = match Path::new(path).try_exists() {
Ok(true) => match Connection::open(Path::new(path)) {
Ok(connection) => connection,
Err(_) => return Err(String::from("Cannot connect to the database")),
},
_ => return Err(String::from("Make sure you provide an existing path")),
};
let mut statement = match connection.prepare(stmt.as_ref()) {
Ok(statement) => statement,
Err(error) => return Err(format!("Cannot prepare the provided statement {}", error)),
};
let batches: Vec<RecordBatch> = match statement.query_arrow([]) {
Ok(arrow) => arrow.collect(),
Err(_) => return Err(String::from("Error executing the Arrow query")),
};
Ok(batches
.into_par_iter()
.map(|batch| {
match DataFrame::new(vec![
Series::new(
Column::Src.as_ref(),
batch
.column(0)
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap()
.values(),
),
Series::new(
Column::Custom("property_id").as_ref(),
batch
.column(1)
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap()
.values(),
),
Series::new(
Column::Dst.as_ref(),
batch
.column(2)
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap()
.values(),
),
Series::new(
Column::Custom("dtype").as_ref(),
batch
.column(3)
.as_any()
.downcast_ref::<UInt8Array>()
.unwrap()
.values(),
),
]) {
Ok(tmp_dataframe) => tmp_dataframe,
Err(_) => DataFrame::empty(),
}
})
.reduce(DataFrame::empty, |acc, e| acc.vstack(&e).unwrap()))
}
fn export(_path: &str, _df: DataFrame) -> Result<(), String> {
todo!()
}
}