xdl_database/drivers/
duckdb.rs1use crate::{recordset::ColumnInfo, DatabaseError, DatabaseResult, Recordset};
4use duckdb::{params, Connection};
5use serde_json::Value as JsonValue;
6use std::sync::Mutex;
7
8pub struct DuckDBConnection {
10 conn: Option<Mutex<Connection>>,
11}
12
13impl std::fmt::Debug for DuckDBConnection {
14 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15 f.debug_struct("DuckDBConnection")
16 .field("conn", &self.conn.is_some())
17 .finish()
18 }
19}
20
21impl DuckDBConnection {
22 pub async fn connect(connection_string: &str) -> DatabaseResult<Self> {
24 let path = connection_string
26 .trim_start_matches("duckdb://")
27 .trim_start_matches("duckdb:");
28
29 let conn = Connection::open(path).map_err(|e| {
30 DatabaseError::connection_error(format!("DuckDB connection failed: {}", e))
31 })?;
32
33 Ok(Self {
34 conn: Some(Mutex::new(conn)),
35 })
36 }
37
38 pub async fn execute(&self, query: &str) -> DatabaseResult<Recordset> {
40 let conn_mutex = self.conn.as_ref().ok_or(DatabaseError::NotConnected)?;
41 let conn = conn_mutex
42 .lock()
43 .map_err(|e| DatabaseError::query_error(format!("Lock failed: {}", e)))?;
44
45 let mut stmt = conn
46 .prepare(query)
47 .map_err(|e| DatabaseError::query_error(format!("Prepare failed: {}", e)))?;
48
49 let column_count = stmt.column_count();
50 let columns: Vec<ColumnInfo> = (0..column_count)
51 .map(|i| ColumnInfo {
52 name: stmt
53 .column_name(i)
54 .map(|s| s.to_string())
55 .unwrap_or_else(|_| "unknown".to_string()),
56 data_type: "unknown".to_string(),
57 ordinal: i,
58 })
59 .collect();
60
61 let mut rows_data = Vec::new();
62 let mut rows = stmt
63 .query(params![])
64 .map_err(|e| DatabaseError::query_error(format!("Query failed: {}", e)))?;
65
66 while let Some(row) = rows
67 .next()
68 .map_err(|e| DatabaseError::query_error(format!("Row fetch failed: {}", e)))?
69 {
70 let mut row_data = Vec::new();
71
72 for i in 0..column_count {
73 let val: Result<Option<String>, _> = row.get(i);
74 let json_val = match val {
75 Ok(Some(s)) => JsonValue::String(s),
76 Ok(None) => JsonValue::Null,
77 Err(_) => JsonValue::Null,
78 };
79 row_data.push(json_val);
80 }
81
82 rows_data.push(row_data);
83 }
84
85 Ok(Recordset::new(columns, rows_data))
86 }
87
88 pub async fn execute_command(&self, command: &str) -> DatabaseResult<u64> {
90 let conn_mutex = self.conn.as_ref().ok_or(DatabaseError::NotConnected)?;
91 let conn = conn_mutex
92 .lock()
93 .map_err(|e| DatabaseError::query_error(format!("Lock failed: {}", e)))?;
94
95 let affected = conn
96 .execute(command, params![])
97 .map_err(|e| DatabaseError::query_error(format!("Command failed: {}", e)))?;
98
99 Ok(affected as u64)
100 }
101
102 pub async fn close(&mut self) -> DatabaseResult<()> {
104 self.conn = None;
105 Ok(())
106 }
107
108 pub async fn is_connected(&self) -> bool {
110 self.conn.is_some()
111 }
112}