winterbaume_sqlengine_duckdb/
redshift.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::{Arc, Mutex};
4
5use duckdb::Connection;
6use winterbaume_redshiftdata::backend::{RedshiftQueryBackend, StatementResult};
7
8pub struct DuckDbRedshiftQueryBackend {
9 conn: Arc<Mutex<Connection>>,
10}
11
12impl DuckDbRedshiftQueryBackend {
13 pub fn new(conn: Arc<Mutex<Connection>>) -> Self {
20 Self { conn }
21 }
22}
23
24fn run_sql(conn: &Connection, sql: &str) -> StatementResult {
25 let duckdb_sql =
26 papera::transpile(sql, papera::SourceDialect::Redshift).unwrap_or_else(|_| sql.to_string());
27 match crate::exec::execute_duckdb_sql(conn, &duckdb_sql) {
28 Ok(r) => StatementResult {
29 columns: r.columns,
30 rows: r.rows,
31 error: None,
32 },
33 Err(e) => StatementResult {
34 columns: vec![],
35 rows: vec![],
36 error: Some(e),
37 },
38 }
39}
40
41impl RedshiftQueryBackend for DuckDbRedshiftQueryBackend {
42 fn execute_statement(
43 &self,
44 sql: String,
45 ) -> Pin<Box<dyn Future<Output = StatementResult> + Send>> {
46 let conn = self
47 .conn
48 .lock()
49 .expect("DuckDB connection mutex poisoned")
50 .try_clone()
51 .expect("failed to clone DuckDB connection");
52 Box::pin(async move { run_sql(&conn, &sql) })
53 }
54
55 fn batch_execute(
56 &self,
57 sqls: Vec<String>,
58 ) -> Pin<Box<dyn Future<Output = StatementResult> + Send>> {
59 let conn = self
60 .conn
61 .lock()
62 .expect("DuckDB connection mutex poisoned")
63 .try_clone()
64 .expect("failed to clone DuckDB connection");
65 Box::pin(async move {
66 let mut last = StatementResult::default();
67 for sql in &sqls {
68 let result = run_sql(&conn, sql);
69 if result.error.is_some() {
70 return result;
71 }
72 last = result;
73 }
74 last
75 })
76 }
77}