xdl_database/drivers/
duckdb.rs

1//! DuckDB database driver
2
3use crate::{recordset::ColumnInfo, DatabaseError, DatabaseResult, Recordset};
4use duckdb::{params, Connection};
5use serde_json::Value as JsonValue;
6use std::sync::Mutex;
7
8/// DuckDB connection
9pub struct DuckDBConnection {
10    conn: Option<Mutex<Connection>>,
11}
12
13impl std::fmt::Debug for DuckDBConnection {
14    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15        f.debug_struct("DuckDBConnection")
16            .field("conn", &self.conn.is_some())
17            .finish()
18    }
19}
20
21impl DuckDBConnection {
22    /// Connect to a DuckDB database
23    pub async fn connect(connection_string: &str) -> DatabaseResult<Self> {
24        // Extract file path from connection string
25        let path = connection_string
26            .trim_start_matches("duckdb://")
27            .trim_start_matches("duckdb:");
28
29        let conn = Connection::open(path).map_err(|e| {
30            DatabaseError::connection_error(format!("DuckDB connection failed: {}", e))
31        })?;
32
33        Ok(Self {
34            conn: Some(Mutex::new(conn)),
35        })
36    }
37
38    /// Execute a SELECT query
39    pub async fn execute(&self, query: &str) -> DatabaseResult<Recordset> {
40        let conn_mutex = self.conn.as_ref().ok_or(DatabaseError::NotConnected)?;
41        let conn = conn_mutex
42            .lock()
43            .map_err(|e| DatabaseError::query_error(format!("Lock failed: {}", e)))?;
44
45        let mut stmt = conn
46            .prepare(query)
47            .map_err(|e| DatabaseError::query_error(format!("Prepare failed: {}", e)))?;
48
49        let column_count = stmt.column_count();
50        let columns: Vec<ColumnInfo> = (0..column_count)
51            .map(|i| ColumnInfo {
52                name: stmt
53                    .column_name(i)
54                    .map(|s| s.to_string())
55                    .unwrap_or_else(|_| "unknown".to_string()),
56                data_type: "unknown".to_string(),
57                ordinal: i,
58            })
59            .collect();
60
61        let mut rows_data = Vec::new();
62        let mut rows = stmt
63            .query(params![])
64            .map_err(|e| DatabaseError::query_error(format!("Query failed: {}", e)))?;
65
66        while let Some(row) = rows
67            .next()
68            .map_err(|e| DatabaseError::query_error(format!("Row fetch failed: {}", e)))?
69        {
70            let mut row_data = Vec::new();
71
72            for i in 0..column_count {
73                let val: Result<Option<String>, _> = row.get(i);
74                let json_val = match val {
75                    Ok(Some(s)) => JsonValue::String(s),
76                    Ok(None) => JsonValue::Null,
77                    Err(_) => JsonValue::Null,
78                };
79                row_data.push(json_val);
80            }
81
82            rows_data.push(row_data);
83        }
84
85        Ok(Recordset::new(columns, rows_data))
86    }
87
88    /// Execute a command
89    pub async fn execute_command(&self, command: &str) -> DatabaseResult<u64> {
90        let conn_mutex = self.conn.as_ref().ok_or(DatabaseError::NotConnected)?;
91        let conn = conn_mutex
92            .lock()
93            .map_err(|e| DatabaseError::query_error(format!("Lock failed: {}", e)))?;
94
95        let affected = conn
96            .execute(command, params![])
97            .map_err(|e| DatabaseError::query_error(format!("Command failed: {}", e)))?;
98
99        Ok(affected as u64)
100    }
101
102    /// Close the connection
103    pub async fn close(&mut self) -> DatabaseResult<()> {
104        self.conn = None;
105        Ok(())
106    }
107
108    /// Check if connected
109    pub async fn is_connected(&self) -> bool {
110        self.conn.is_some()
111    }
112}