Skip to main content

winterbaume_sqlengine_duckdb/
redshift.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::{Arc, Mutex};
4
5use duckdb::Connection;
6use winterbaume_redshiftdata::backend::{RedshiftQueryBackend, StatementResult};
7
8pub struct DuckDbRedshiftQueryBackend {
9    conn: Arc<Mutex<Connection>>,
10}
11
12impl DuckDbRedshiftQueryBackend {
13    /// Create a backend that executes queries against the given shared DuckDB
14    /// connection.  The caller retains an `Arc<Mutex<Connection>>` handle and
15    /// can use it to seed the database or share it across multiple backends.
16    ///
17    /// Each query briefly locks the mutex to call [`Connection::try_clone`],
18    /// which creates a lightweight handle to the same underlying database.
19    pub fn new(conn: Arc<Mutex<Connection>>) -> Self {
20        Self { conn }
21    }
22}
23
24fn run_sql(conn: &Connection, sql: &str) -> StatementResult {
25    let duckdb_sql =
26        papera::transpile(sql, papera::SourceDialect::Redshift).unwrap_or_else(|_| sql.to_string());
27    match crate::exec::execute_duckdb_sql(conn, &duckdb_sql) {
28        Ok(r) => StatementResult {
29            columns: r.columns,
30            rows: r.rows,
31            error: None,
32        },
33        Err(e) => StatementResult {
34            columns: vec![],
35            rows: vec![],
36            error: Some(e),
37        },
38    }
39}
40
41impl RedshiftQueryBackend for DuckDbRedshiftQueryBackend {
42    fn execute_statement(
43        &self,
44        sql: String,
45    ) -> Pin<Box<dyn Future<Output = StatementResult> + Send>> {
46        let conn = self
47            .conn
48            .lock()
49            .expect("DuckDB connection mutex poisoned")
50            .try_clone()
51            .expect("failed to clone DuckDB connection");
52        Box::pin(async move { run_sql(&conn, &sql) })
53    }
54
55    fn batch_execute(
56        &self,
57        sqls: Vec<String>,
58    ) -> Pin<Box<dyn Future<Output = StatementResult> + Send>> {
59        let conn = self
60            .conn
61            .lock()
62            .expect("DuckDB connection mutex poisoned")
63            .try_clone()
64            .expect("failed to clone DuckDB connection");
65        Box::pin(async move {
66            let mut last = StatementResult::default();
67            for sql in &sqls {
68                let result = run_sql(&conn, sql);
69                if result.error.is_some() {
70                    return result;
71                }
72                last = result;
73            }
74            last
75        })
76    }
77}