Skip to main content

winterbaume_sqlengine_duckdb/
athena.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::{Arc, Mutex};
4
5use duckdb::Connection;
6use winterbaume_athena::backend::{AthenaQueryBackend, QueryResult};
7
8pub struct DuckDbAthenaQueryBackend {
9    conn: Arc<Mutex<Connection>>,
10}
11
12impl DuckDbAthenaQueryBackend {
13    /// Create a backend that executes queries against the given shared DuckDB
14    /// connection.  The caller retains an `Arc<Mutex<Connection>>` handle and
15    /// can use it to seed the database or share it across multiple backends.
16    ///
17    /// Each query briefly locks the mutex to call [`Connection::try_clone`],
18    /// which creates a lightweight handle to the same underlying database.
19    pub fn new(conn: Arc<Mutex<Connection>>) -> Self {
20        Self { conn }
21    }
22}
23
24impl AthenaQueryBackend for DuckDbAthenaQueryBackend {
25    fn execute_query(&self, sql: String) -> Pin<Box<dyn Future<Output = QueryResult> + Send>> {
26        let conn = self
27            .conn
28            .lock()
29            .expect("DuckDB connection mutex poisoned")
30            .try_clone()
31            .expect("failed to clone DuckDB connection");
32        Box::pin(async move {
33            let duckdb_sql = papera::transpile(&sql, papera::SourceDialect::Trino).unwrap_or(sql);
34            match crate::exec::execute_duckdb_sql(&conn, &duckdb_sql) {
35                Ok(r) => QueryResult {
36                    columns: r.columns,
37                    rows: r.rows,
38                    error: None,
39                },
40                Err(e) => QueryResult {
41                    columns: vec![],
42                    rows: vec![],
43                    error: Some(e),
44                },
45            }
46        })
47    }
48}