queryer_sql_polars/
lib.rs1use anyhow::{anyhow, Result};
2use convert::Sql;
3use fetcher::retrieve_data;
4use loader::detect_content;
5use polars::{
6 chunked_array::ops::SortMultipleOptions,
7 frame::DataFrame,
8 io::{csv::write::CsvWriter, SerWriter},
9 lazy::frame::IntoLazy,
10};
11use sqlparser::parser::Parser;
12use tracing::info;
13
14mod convert;
15mod dialect;
16mod fetcher;
17mod loader;
18
19use std::ops::{Deref, DerefMut};
20
21pub use dialect::TyrDialect;
22
23#[derive(Debug)]
24pub struct DataSet(DataFrame);
25
26impl Deref for DataSet {
28 type Target = DataFrame;
29
30 fn deref(&self) -> &Self::Target {
31 &self.0
32 }
33}
34
35impl DerefMut for DataSet {
37 fn deref_mut(&mut self) -> &mut Self::Target {
38 &mut self.0
39 }
40}
41
42impl DataSet {
43 pub fn to_csv(&mut self) -> Result<String> {
45 let mut buf = Vec::new();
46 let mut writer = CsvWriter::new(&mut buf);
47 writer.finish(self)?;
48 Ok(String::from_utf8(buf)?)
49 }
50}
51
52pub async fn query<T: AsRef<str>>(sql: T, suffix: &str) -> Result<DataSet> {
54 let ast = Parser::parse_sql(&TyrDialect::default(), sql.as_ref())?;
55
56 if ast.len() != 1 {
57 return Err(anyhow!("Only support single sql at the moment"));
58 }
59
60 let sql = &ast[0];
61
62 let Sql {
63 source,
64 condition,
65 selection,
66 offset,
67 limit,
68 order_by,
69 } = sql.try_into()?;
70
71 info!("retrieving data from source: {}", source);
72
73 let data = retrieve_data(source.to_string()).await?;
75 let ds = detect_content(data, suffix)?.load()?;
76
77 let mut filtered = match condition {
78 Some(expr) => ds.0.lazy().filter(expr),
79 None => ds.0.lazy(),
80 };
81
82 filtered = order_by.into_iter().fold(filtered, |acc, (col, desc)| {
83 acc.sort(
84 [&col],
85 SortMultipleOptions::new().with_order_descending(desc),
86 )
87 });
88
89 if offset.is_some() || limit.is_some() {
90 filtered = filtered.slice(offset.unwrap_or(0), limit.unwrap_or(1000) as u32);
91 }
92
93 Ok(DataSet(filtered.select(selection).collect()?))
94}