#![doc(html_root_url = "https://docs.rs/polars-sqlite/0.3.8")]
use std::error::Error;
use polars::{series::Series, prelude::{ChunkApply}}; use polars::prelude::{DataFrame, AnyValue, Schema, DataType}; use anyvalue_dataframe::{from_any, to_any};
use anyvalue_dataframe::{row_schema, named_schema};
use sqlite;
use itertools::Itertools;
use iter_tuple::{struct_derive, tuple_sqlite3, tuple_derive};
pub trait ToSqlite3ValueVec {
fn to_sqlite3_vec(&self) -> Vec<(&'_ str, sqlite::Value)>;
}
pub trait IntoAnyValueVec<'a> {
fn into_vec(self) -> Vec<AnyValue<'a>>;
}
pub fn df_from_sl3<F>(dbn: &str, n: &Vec<&str>,
qry: &str, p: &[(&str, sqlite::Value)],
mut f: F) -> Result<DataFrame, Box<dyn Error>>
where F: for<'a> FnMut(&'a sqlite::Row) -> Vec<AnyValue<'_>> {
let cn = sqlite::open(dbn)?;
let stmt = cn.prepare(qry)?;
let rows: Vec<sqlite::Row> =
stmt.into_iter().bind::<&[(_, sqlite::Value)]>(p)?
.map(|row| row.expect("row")).collect();
if rows.len() == 0 { return Ok(DataFrame::new(Vec::<Series>::new())?) }
let rows: Vec<polars::frame::row::Row> = rows.iter().map(|row|
row_schema(f(row))).collect();
let schema = Schema::from(&rows[0]);
let mut df = DataFrame::from_rows_iter_and_schema(rows.iter(), &schema)
.expect("create DataFrame"); df.set_column_names(&n).expect("set column names");
Ok(df)
}
pub fn df_from_sl3_type<F>(dbn: &str, n: &Vec<&str>, t: &Vec<DataType>,
qry: &str, p: &[(&str, sqlite::Value)],
mut f: F) -> Result<DataFrame, Box<dyn Error>>
where F: for<'a> FnMut(&'a sqlite::Row) -> Vec<AnyValue<'_>> {
let sels = n.iter().enumerate().map(|(i, n)|
Series::from_any_values_and_dtype(n, &vec![], &t[i]).expect("series")
).collect::<Vec<_>>();
let mut df = DataFrame::new(sels)?;
let cn = sqlite::open(dbn)?;
let stmt = cn.prepare(qry)?;
df = stmt.into_iter().bind::<&[(_, sqlite::Value)]>(p)?
.map(|row| row.expect("row")).map(|row| {
let rows = vec![row_schema(f(&row))];
let schema = Schema::from(&rows[0]);
let mut s_df = DataFrame::from_rows_iter_and_schema(rows.iter(), &schema)
.expect("create temporary DataFrame"); s_df.set_column_names(&n).expect("set column names");
s_df
}).fold(df, |s, a| s.vstack(&a).expect("vstack"));
Ok(df)
}
pub fn sl3_cols(n: &Vec<&str>, an: (bool, usize)) -> String {
n.iter().enumerate().map(|(i, s)|
if an.0 && an.1 == i { None } else { Some(format!("{}", s)) }
).filter_map(|s| s).collect::<Vec<_>>().join(", ")
}
pub fn sl3_tags(n: &Vec<&str>, an: (bool, usize)) -> String {
n.iter().enumerate().map(|(i, s)|
if an.0 && an.1 == i { None } else { Some(format!(":{}", s)) }
).filter_map(|s| s).collect::<Vec<_>>().join(", ")
}
pub fn sl3_insert_row<T>(cn: &sqlite::Connection, qry: &str,
row: &T, an: (bool, usize)) -> Result<(), Box<dyn Error>>
where T: ToSqlite3ValueVec {
let mut stmt = cn.prepare(qry)?;
let mut s = row.to_sqlite3_vec();
if an.0 { s.remove(an.1); } stmt.bind_iter::<_, (_, sqlite::Value)>(s)?;
stmt.next()?;
Ok(())
}
pub fn sl3_insert<T>(dbn: &str, qry: &str,
v: &Vec<T>, an: (bool, usize)) -> Result<(), Box<dyn Error>>
where T: ToSqlite3ValueVec {
let cn = sqlite::open(dbn)?;
for row in v.iter() { sl3_insert_row::<T>(&cn, qry, row, an)? }
Ok(())
}
pub fn sl3_insert_df<'a, F, G>(dbn: &str, qry: &str,
df: &DataFrame, an: (bool, usize), mut f: F, mut g: G) ->
Result<(), Box<dyn Error>> where
F: FnMut(&sqlite::Connection, &str, &Vec<AnyValue<'_>>, (bool, usize)) ->
Result<(), Box<dyn Error>>,
G: FnMut() -> polars::frame::row::Row<'a> {
let cn = sqlite::open(dbn)?;
let mut row = g();
for i in (0..df.height()).into_iter() {
df.get_row_amortized(i, &mut row);
f(&cn, qry, &row.0, an)?
}
Ok(())
}
#[struct_derive((id, b, u, s, i, f),
(UInt64, Boolean, Binary, Utf8, Int8, Float32))]
#[tuple_sqlite3(UInt64, Boolean, Binary, Utf8, Int8, Float32)]
#[tuple_derive(UInt64, Boolean, Binary, Utf8, Int8, Float32)]
pub type Tester<'a> = (u64, bool, Vec<u8>, &'a str, i8, f32);
pub fn tester(dbn: &str) -> Result<(), Box<dyn Error>> {
let n = StTester::members();
let t = StTester::types();
let qry = "select * from tbl where id > :id;";
let p = vec![(":id", 0.into())];
let df = df_from_sl3(dbn, &n, qry, &p,
|row| StTester::from(row).into_vec());
let mut df = df.expect("read DataFrame");
tester_sub(&n, &mut df, "via StTester")?;
let df = df_from_sl3_type(dbn, &n, &t, qry, &p,
|row| StTester::from(row).into_vec());
let mut df = df.expect("read DataFrame");
tester_sub(&n, &mut df, "via StTester with names and types")?;
let df = df_from_sl3(dbn, &n, qry, &p,
|row| RecTester::from(row).into_iter().collect());
let mut df = df.expect("read DataFrame");
tester_sub(&n, &mut df, "via RecTester")?;
let df = df_from_sl3_type(dbn, &n, &t, qry, &p,
|row| RecTester::from(row).into_iter().collect());
let mut df = df.expect("read DataFrame");
tester_sub(&n, &mut df, "via RecTester with names and types")?;
Ok(())
}
pub fn tester_sub(n: &Vec<&str>, df: &mut DataFrame, inf: &str) ->
Result<(), Box<dyn Error>> {
println!("tester: {}", inf);
let sc = named_schema(&df, n.to_vec());
println!("{:?}", sc);
println!("{}", df);
let columns = df.get_columns();
let column_3 = columns[3].utf8().expect("s as str"); let series_s = Series::from(column_3.apply_with_idx(|(i, s)|
format!("{}{:?}", s, (0..2).into_iter().map(|j|
i + j).collect_tuple::<(usize, usize)>().expect("tuple")).into()
));
df.replace("s", series_s).expect("replace df is_ok");
println!("{}", df);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_polars_sqlite() {
assert_eq!(RecTester::types(), StTester::types());
assert_eq!(tester("./res/test_sqlite3_read.sl3").expect("tester"), ());
}
}