queryer_sql_polars/
lib.rs

1use anyhow::{anyhow, Result};
2use convert::Sql;
3use fetcher::retrieve_data;
4use loader::detect_content;
5use polars::{
6    chunked_array::ops::SortMultipleOptions,
7    frame::DataFrame,
8    io::{csv::write::CsvWriter, SerWriter},
9    lazy::frame::IntoLazy,
10};
11use sqlparser::parser::Parser;
12use tracing::info;
13
14mod convert;
15mod dialect;
16mod fetcher;
17mod loader;
18
19use std::ops::{Deref, DerefMut};
20
21pub use dialect::TyrDialect;
22
23#[derive(Debug)]
24pub struct DataSet(DataFrame);
25
26/// 让 DataSet 用起来和 DataFrame 一致
27impl Deref for DataSet {
28    type Target = DataFrame;
29
30    fn deref(&self) -> &Self::Target {
31        &self.0
32    }
33}
34
35/// 让 DataSet 用起来和 DataFrame 一致
36impl DerefMut for DataSet {
37    fn deref_mut(&mut self) -> &mut Self::Target {
38        &mut self.0
39    }
40}
41
42impl DataSet {
43    /// 从 DataSet 转换成 csv
44    pub fn to_csv(&mut self) -> Result<String> {
45        let mut buf = Vec::new();
46        let mut writer = CsvWriter::new(&mut buf);
47        writer.finish(self)?;
48        Ok(String::from_utf8(buf)?)
49    }
50}
51
52/// 从 from 中获取数据,从 where 中过滤,最后选取需要返回的列
53pub async fn query<T: AsRef<str>>(sql: T, suffix: &str) -> Result<DataSet> {
54    let ast = Parser::parse_sql(&TyrDialect::default(), sql.as_ref())?;
55
56    if ast.len() != 1 {
57        return Err(anyhow!("Only support single sql at the moment"));
58    }
59
60    let sql = &ast[0];
61
62    let Sql {
63        source,
64        condition,
65        selection,
66        offset,
67        limit,
68        order_by,
69    } = sql.try_into()?;
70
71    info!("retrieving data from source: {}", source);
72
73    // 从 source 读入一个 DataSet
74    let data = retrieve_data(source.to_string()).await?;
75    let ds = detect_content(data, suffix)?.load()?;
76
77    let mut filtered = match condition {
78        Some(expr) => ds.0.lazy().filter(expr),
79        None => ds.0.lazy(),
80    };
81
82    filtered = order_by.into_iter().fold(filtered, |acc, (col, desc)| {
83        acc.sort(
84            [&col],
85            SortMultipleOptions::new().with_order_descending(desc),
86        )
87    });
88
89    if offset.is_some() || limit.is_some() {
90        filtered = filtered.slice(offset.unwrap_or(0), limit.unwrap_or(1000) as u32);
91    }
92
93    Ok(DataSet(filtered.select(selection).collect()?))
94}