polars_sqlite/
lib.rs

1#![doc(html_root_url = "https://docs.rs/polars-sqlite/0.3.9")]
2//! Rust sqlite3 traits for polars dataframe
3//!
4
5use std::error::Error;
6use polars::{series::Series, prelude::{ChunkApply}}; // , NamedFrom
7use polars::prelude::{DataFrame, AnyValue, Schema, DataType}; // , Field
8use anyvalue_dataframe::{from_any, to_any};
9use anyvalue_dataframe::{row_schema, named_schema};
10use sqlite;
11
12use itertools::Itertools;
13use iter_tuple::{struct_derive, tuple_sqlite3, tuple_derive};
14
15/// trait ToSqlite3ValueVec
16pub trait ToSqlite3ValueVec {
17  ///
18  fn to_sqlite3_vec(&self) -> Vec<(&'_ str, sqlite::Value)>;
19}
20
21/// trait IntoAnyValueVec
22pub trait IntoAnyValueVec<'a> {
23  ///
24  fn into_vec(self) -> Vec<AnyValue<'a>>;
25}
26
27/// create DataFrame from sl3
28/// - n: &amp;Vec&lt;&amp;str&gt;
29/// - p: &amp;[(&amp;str, sqlite::Value)]
30/// - f: FnMut(&amp;'a sqlite::Row) -> Vec&lt;AnyValue&lt;'_&gt;&gt;
31/// - df_from_sl3("db.sl3", n, "select * from tbl;", p, f).expect("df")
32pub fn df_from_sl3<F>(dbn: &str, n: &Vec<&str>,
33  qry: &str, p: &[(&str, sqlite::Value)],
34  mut f: F) -> Result<DataFrame, Box<dyn Error>>
35  where F: for<'a> FnMut(&'a sqlite::Row) -> Vec<AnyValue<'_>> {
36  let cn = sqlite::open(dbn)?;
37  let stmt = cn.prepare(qry)?;
38  let rows: Vec<sqlite::Row> =
39    stmt.into_iter().bind::<&[(_, sqlite::Value)]>(p)?
40    .map(|row| row.expect("row")).collect();
41  if rows.len() == 0 { return Ok(DataFrame::new(Vec::<Series>::new())?) }
42  // convert row as sqlite::Row to polars::frame::row::Row
43  let rows: Vec<polars::frame::row::Row> = rows.iter().map(|row|
44    row_schema(f(row))).collect();
45  let schema = Schema::from(&rows[0]);
46  let mut df = DataFrame::from_rows_iter_and_schema(rows.iter(), &schema)
47  .expect("create DataFrame"); // noname
48  df.set_column_names(&n).expect("set column names");
49  Ok(df)
50}
51
52/// create DataFrame from sl3 with column names and DataTypes
53/// - n: &amp;Vec&lt;&amp;str&gt;
54/// - t: &amp;Vec&lt;DataType&gt;
55/// - p: &amp;[(&amp;str, sqlite::Value)]
56/// - f: FnMut(&amp;'a sqlite::Row) -> Vec&lt;AnyValue&lt;'_&gt;&gt;
57/// - df_from_sl3_type("db.sl3", n, t, "select * from tbl;", p, f).expect("df")
58pub fn df_from_sl3_type<F>(dbn: &str, n: &Vec<&str>, t: &Vec<DataType>,
59  qry: &str, p: &[(&str, sqlite::Value)],
60  mut f: F) -> Result<DataFrame, Box<dyn Error>>
61  where F: for<'a> FnMut(&'a sqlite::Row) -> Vec<AnyValue<'_>> {
62  let sels = n.iter().enumerate().map(|(i, n)|
63    // Series::new(n, &Vec::<AnyValue<'_>>::new()) // Int32 default (NamedFrom)
64    // Series::from_any_values(n, &vec![]).expect("series") // Int32 default
65    Series::from_any_values_and_dtype(n, &vec![], &t[i]).expect("series")
66  ).collect::<Vec<_>>();
67  let mut df = DataFrame::new(sels)?;
68  // println!("{:?}", named_schema(&df, n.clone()));
69  let cn = sqlite::open(dbn)?;
70  let stmt = cn.prepare(qry)?;
71  df = stmt.into_iter().bind::<&[(_, sqlite::Value)]>(p)?
72  .map(|row| row.expect("row")).map(|row| {
73    // convert row as sqlite::Row to polars::frame::row::Row
74    // create temporary single element vec as single row
75    let rows = vec![row_schema(f(&row))];
76    let schema = Schema::from(&rows[0]);
77    let mut s_df = DataFrame::from_rows_iter_and_schema(rows.iter(), &schema)
78    .expect("create temporary DataFrame"); // noname
79    s_df.set_column_names(&n).expect("set column names");
80    s_df
81  }).fold(df, |s, a| s.vstack(&a).expect("vstack"));
82  // }).reduce(|s, a| s.vstack(&a).expect("vstack")).expect("reduce"); // len>0
83  Ok(df)
84}
85
/// Joins the column names in `n` with `", "`, e.g. for the column list of an
/// `insert` statement.
///
/// - `n`: column names
/// - `an`: `(skip, index)`; when `skip` is true the name at `index` is left
///   out. An `index` outside `n` skips nothing.
pub fn sl3_cols(n: &Vec<&str>, an: (bool, usize)) -> String {
  let (skip, skip_at) = an;
  n.iter().enumerate()
    .filter(|&(i, _)| !(skip && skip_at == i))
    .map(|(_, s)| *s)
    .collect::<Vec<&str>>().join(", ")
}
93
/// Joins the names in `n`, each prefixed with `:`, with `", "`, e.g. for the
/// named-parameter list of an `insert` statement.
///
/// - `n`: column names
/// - `an`: `(skip, index)`; when `skip` is true the name at `index` is left
///   out. An `index` outside `n` skips nothing.
pub fn sl3_tags(n: &Vec<&str>, an: (bool, usize)) -> String {
  let (skip, skip_at) = an;
  n.iter().enumerate()
    .filter(|&(i, _)| !(skip && skip_at == i))
    .map(|(_, s)| format!(":{}", s))
    .collect::<Vec<_>>().join(", ")
}
101
102/// insert row
103pub fn sl3_insert_row<T>(cn: &sqlite::Connection, qry: &str,
104  row: &T, an: (bool, usize)) -> Result<(), Box<dyn Error>>
105  where T: ToSqlite3ValueVec {
106  let mut stmt = cn.prepare(qry)?;
107  let mut s = row.to_sqlite3_vec();
108  if an.0 { s.remove(an.1); } // will panic when out of index range
109  stmt.bind_iter::<_, (_, sqlite::Value)>(s)?;
110  stmt.next()?;
111  Ok(())
112}
113
114/// insert
115/// - an: (bool, usize) = (when bool is true, skip usize number)
116pub fn sl3_insert<T>(dbn: &str, qry: &str,
117  v: &Vec<T>, an: (bool, usize)) -> Result<(), Box<dyn Error>>
118  where T: ToSqlite3ValueVec {
119  let cn = sqlite::open(dbn)?;
120  for row in v.iter() { sl3_insert_row::<T>(&cn, qry, row, an)? }
121  Ok(())
122}
123
124/// insert df
125pub fn sl3_insert_df<'a, F, G>(dbn: &str, qry: &str,
126  df: &DataFrame, an: (bool, usize), mut f: F, mut g: G) ->
127  Result<(), Box<dyn Error>> where
128  F: FnMut(&sqlite::Connection, &str, &Vec<AnyValue<'_>>, (bool, usize)) ->
129    Result<(), Box<dyn Error>>,
130  G: FnMut() -> polars::frame::row::Row<'a> {
131  let cn = sqlite::open(dbn)?;
132  let mut row = g();
133  for i in (0..df.height()).into_iter() {
134    df.get_row_amortized(i, &mut row);
135    f(&cn, qry, &row.0, an)?
136  }
137  Ok(())
138}
139
/// Builds the assignment list of an `update` statement: every name in `n`
/// becomes `name=:name`, and the parts are joined with `", "`.
pub fn sl3_asgns(n: &Vec<&str>) -> String {
  let mut parts: Vec<String> = Vec::with_capacity(n.len());
  for name in n {
    parts.push(format!("{}=:{}", name, name));
  }
  parts.join(", ")
}
144
145/// kvs
146pub fn sl3_kvs<'a, T>(row: &'a T, pick: &Vec<&str>) ->
147  Vec<(&'a str, sqlite::Value)> where T: ToSqlite3ValueVec {
148  let mut s = row.to_sqlite3_vec();
149  for i in (0..s.len()).rev() { // rev index never out of range
150    if !pick.contains(&&s[i].0[1..]) { s.remove(i); }
151  }
152  s
153}
154
155/// update row
156pub fn sl3_update_row(cn: &sqlite::Connection, qry: &str,
157  p: Vec<(&str, sqlite::Value)>) -> Result<(), Box<dyn Error>> {
158  let mut stmt = cn.prepare(qry)?;
159  stmt.bind_iter::<_, (_, sqlite::Value)>(p)?;
160  stmt.next()?;
161  Ok(())
162}
163
164/// update
165/// - f: |cn, qry, row, pick| { sl3_update_row(cn, qry, sl3_kvs(row, pick)) }
166pub fn sl3_update<T, F>(dbn: &str, qry: &str,
167  v: &Vec<T>, pick: &Vec<&str>, mut f: F) -> Result<(), Box<dyn Error>> where
168  T: ToSqlite3ValueVec,
169  F: FnMut(&sqlite::Connection, &str, &T, &Vec<&str>) ->
170    Result<(), Box<dyn Error>> {
171  let cn = sqlite::open(dbn)?;
172  for row in v.iter() { f(&cn, qry, row, pick)? }
173  Ok(())
174}
175
176/// update df
177pub fn sl3_update_df<'a, F, G>(dbn: &str, qry: &str,
178  df: &DataFrame, pick: &Vec<&str>, mut f: F, mut g: G) ->
179  Result<(), Box<dyn Error>> where
180  F: FnMut(&sqlite::Connection, &str, &Vec<AnyValue<'_>>, &Vec<&str>) ->
181    Result<(), Box<dyn Error>>,
182  G: FnMut() -> polars::frame::row::Row<'a> {
183  let cn = sqlite::open(dbn)?;
184  let mut row = g();
185  for i in (0..df.height()).into_iter() {
186    df.get_row_amortized(i, &mut row);
187    f(&cn, qry, &row.0, pick)?
188  }
189  Ok(())
190}
191
/// Converts an expression into the target type via `.into()`, selected by a
/// polars data-type tag: `to_sl3!(Int32, v)`.
///
/// - integer tags other than `Int64` cast with `as i64` first; for a `u64`
///   above `i64::MAX` that cast wraps to a negative number
/// - `Float32` casts with `as f64` first
/// - `Boolean` becomes the text `"T"` or `"F"`
/// - `Binary` converts from a slice over the whole value
/// - every other form hands the expression to `.into()` unchanged
#[macro_export]
macro_rules! to_sl3 {
  // signed integers
  (Int64, $v:expr) => { $v.into() };
  (Int32, $v:expr) => { ($v as i64).into() };
  (Int16, $v:expr) => { ($v as i64).into() };
  (Int8, $v:expr) => { ($v as i64).into() };
  // unsigned integers
  (UInt64, $v:expr) => { ($v as i64).into() };
  (UInt32, $v:expr) => { ($v as i64).into() };
  (UInt16, $v:expr) => { ($v as i64).into() };
  (UInt8, $v:expr) => { ($v as i64).into() };
  // floats (Decimal in polars latest)
  (Float64, $v:expr) => { $v.into() };
  (Float32, $v:expr) => { ($v as f64).into() };
  // text: `Utf8` in polars version 0.25.1, `String` in polars latest
  (Utf8, $v:expr) => { $v.into() };
  (String, $v:expr) => { $v.into() };
  (Boolean, $v:expr) => { (if $v {"T"} else {"F"}).into() };
  (Binary, $v:expr) => { (&$v[..]).into() };
  // pass-through arms, must check later
  (Null, $v:expr) => { $v.into() };
  (Unknown, $v:expr) => { $v.into() };
  ($v:expr) => { $v.into() }
}
213// pub to_sl3;
214
215/// for tester
216/// - create table tbl (
217/// -   id integer primary key autoincrement,
218/// -   b varchar(1),
219/// -   u blob,
220/// -   s text(16),
221/// -   i integer,
222/// -   f float
223/// - );
224/// - insert into tbl (b, u, s, i, f) values ("T", x'ee55', 'abc', -1, 2.0);
225/// - insert into tbl (b, u, s, i, f) values ("F", x'2324', 'XYZ', 2, -1.0);
226/// - select typeof(b), typeof(u), typeof(s), typeof(i), typeof(f) from tbl;
227/// - text|blob|text|integer|real
228#[struct_derive((id, b, u, s, i, f),
229  (UInt64, Boolean, Binary, Utf8, Int8, Float32))]
230#[tuple_sqlite3(UInt64, Boolean, Binary, Utf8, Int8, Float32)]
231#[tuple_derive(UInt64, Boolean, Binary, Utf8, Int8, Float32)]
232pub type Tester<'a> = (u64, bool, Vec<u8>, &'a str, i8, f32);
233
234/// tester
235/// - select * from tbl;
236pub fn tester(dbn: &str) -> Result<(), Box<dyn Error>> {
237  let n = StTester::members();
238  let t = StTester::types();
239  let qry = "select * from tbl where id > :id;";
240  let p = vec![(":id", 0.into())];
241/**/
242  let df = df_from_sl3(dbn, &n, qry, &p,
243    |row| StTester::from(row).into_vec());
244  let mut df = df.expect("read DataFrame");
245  tester_sub(&n, &mut df, "via StTester")?;
246/**/
247/**/
248  let df = df_from_sl3_type(dbn, &n, &t, qry, &p,
249    |row| StTester::from(row).into_vec());
250  let mut df = df.expect("read DataFrame");
251  tester_sub(&n, &mut df, "via StTester with names and types")?;
252/**/
253/**/
254  let df = df_from_sl3(dbn, &n, qry, &p,
255    |row| RecTester::from(row).into_iter().collect());
256  let mut df = df.expect("read DataFrame");
257  tester_sub(&n, &mut df, "via RecTester")?;
258/**/
259/**/
260  let df = df_from_sl3_type(dbn, &n, &t, qry, &p,
261    |row| RecTester::from(row).into_iter().collect());
262  let mut df = df.expect("read DataFrame");
263  tester_sub(&n, &mut df, "via RecTester with names and types")?;
264/**/
265  Ok(())
266}
267
268/// tester sub
269pub fn tester_sub(n: &Vec<&str>, df: &mut DataFrame, inf: &str) ->
270  Result<(), Box<dyn Error>> {
271  println!("tester: {}", inf);
272
273  let sc = named_schema(&df, n.to_vec());
274  println!("{:?}", sc);
275  println!("{}", df);
276
277  let columns = df.get_columns();
278  let column_3 = columns[3].utf8().expect("s as str"); // ChunkedArray
279  let series_s = Series::from(column_3.apply_with_idx(|(i, s)|
280    // std::borrow::Cow::Borrowed(s) // through
281    format!("{}{:?}", s, (0..2).into_iter().map(|j|
282      i + j).collect_tuple::<(usize, usize)>().expect("tuple")).into()
283  ));
284  df.replace("s", series_s).expect("replace df is_ok");
285  println!("{}", df);
286
287  Ok(())
288}
289
/// Unit tests; they need the fixture database `./res/test_sqlite3_read.sl3`.
#[cfg(test)]
mod tests {
  use super::*;

  /// Run with `-- --nocapture` or `-- --show-output` to see the frames.
  #[test]
  fn test_polars_sqlite() {
    let rec_types = RecTester::types();
    let st_types = StTester::types();
    assert_eq!(rec_types, st_types);
    tester("./res/test_sqlite3_read.sl3").expect("tester");
  }
}