we_trust_sqlite/
executor.rs1use bytes::Bytes;
2use std::collections::BTreeMap;
3use std::sync::Arc;
4use yykv_types::{DsError, DsValue};
5
6type LimboConn = limbo::Connection;
8
9fn ds_value_to_limbo(value: &DsValue) -> limbo::Value {
11 match value {
12 DsValue::Null => limbo::Value::Null,
13 DsValue::Int(i) => limbo::Value::Integer(*i),
14 DsValue::Float(f) => limbo::Value::Real(*f as f64),
15 DsValue::Text(s) => limbo::Value::Text(s.clone()),
16 DsValue::Bytes(b) => limbo::Value::Blob(b.to_vec()),
17 DsValue::Bool(b) => limbo::Value::Integer(if *b { 1 } else { 0 }),
18 _ => limbo::Value::Null,
19 }
20}
21
22fn limbo_value_to_ds(value: &limbo::Value) -> DsValue {
24 match value {
25 limbo::Value::Null => DsValue::Null,
26 limbo::Value::Integer(i) => DsValue::Int(*i),
27 limbo::Value::Real(f) => DsValue::Float(*f),
28 limbo::Value::Text(s) => DsValue::Text(s.clone()),
29 limbo::Value::Blob(b) => DsValue::Bytes(Bytes::from(b.clone())),
30 }
31}
32
33type Result<T> = std::result::Result<T, DsError>;
34
35pub struct SqliteExecutor {
37 conn: Arc<LimboConn>,
38}
39
40impl SqliteExecutor {
41 pub fn new(conn: Arc<LimboConn>) -> Self {
43 Self { conn }
44 }
45
46 pub async fn execute(&self, sql: &str, params: &[DsValue]) -> Result<u64> {
48 let limbo_params: Vec<limbo::Value> = params.iter().map(ds_value_to_limbo).collect();
50 let result = self.conn.execute(sql, limbo_params).await.map_err(|e| DsError::query(e.to_string()))?;
51 Ok(result as u64)
52 }
53
54 pub async fn query(&self, sql: &str, params: &[DsValue]) -> Result<Vec<DsValue>> {
56 let limbo_params: Vec<limbo::Value> = params.iter().map(ds_value_to_limbo).collect();
58 let mut rows = self.conn.query(sql, limbo_params).await.map_err(|e| DsError::query(e.to_string()))?;
59 let mut results = Vec::new();
60
61 while let Ok(Some(row)) = rows.next().await {
62 let mut fields = BTreeMap::new();
63
64 let stmt = self.conn.prepare(sql).await.map_err(|e| DsError::query(e.to_string()))?;
66 let columns = stmt.columns();
67
68 for (i, column) in columns.iter().enumerate() {
69 let value = self.get_value(&row, i)?;
70 fields.insert(column.name().to_string(), value);
71 }
72
73 results.push(DsValue::Dict(fields));
74 }
75
76 Ok(results)
77 }
78
79 fn get_value(&self, row: &limbo::Row, index: usize) -> Result<DsValue> {
81 let value = row.get_value(index).map_err(|e| DsError::query(e.to_string()))?;
83 Ok(limbo_value_to_ds(&value))
84 }
85}