xdl_database/
lib.rs

1//! XDL Database Connectivity Module
2//!
3//! Provides unified database access for XDL programs supporting:
4//! - PostgreSQL
5//! - MySQL
6//! - DuckDB
7//! - SQLite
8//! - ODBC (generic)
9//! - Redis
10//! - Apache Kafka (streaming)
11//!
12//! # Example
13//! ```xdl
14//! ; Create a database object
15//! objdb = OBJ_NEW('XDLdbDatabase')
16//!
17//! ; Connect to PostgreSQL
18//! conn_str = 'postgresql://user:password@localhost:5432/dbname'
19//! objdb->Connect, CONNECTION=conn_str
20//!
21//! ; Execute a query
22//! recordset = objdb->ExecuteSQL('SELECT * FROM my_table')
23//!
24//! ; Get data
25//! data = recordset->GetData()
26//!
27//! ; Cleanup
28//! recordset->Destroy()
29//! objdb->Disconnect()
30//! OBJ_DESTROY, objdb
31//! ```
32
33use std::collections::HashMap;
34use std::sync::Arc;
35use tokio::sync::RwLock;
36use xdl_core::{XdlError, XdlResult};
37
38pub mod connection;
39pub mod drivers;
40pub mod error;
41pub mod recordset;
42
43pub use connection::DatabaseConnection;
44pub use error::{DatabaseError, DatabaseResult};
45pub use recordset::Recordset;
46
/// Database backend kinds that a connection string can resolve to.
///
/// `Unknown` is returned by [`DatabaseType::from_connection_string`] when no
/// detection rule matches.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DatabaseType {
    PostgreSQL,
    MySQL,
    DuckDB,
    SQLite,
    ODBC,
    Redis,
    Kafka,
    Unknown,
}

impl DatabaseType {
    /// Infer the database type from a connection string.
    ///
    /// Matching is case-insensitive and is applied in three tiers, most
    /// specific first:
    ///
    /// 1. Explicit schemes: `postgresql://`, `postgres://`, `mysql://`,
    ///    `sqlite:`, `duckdb://`, `redis://`, `kafka://`.
    /// 2. ODBC-style strings containing `driver=`.
    /// 3. Filename heuristics for scheme-less strings: `.sqlite` or
    ///    `:memory:` select SQLite; `.duckdb` or `.db` select DuckDB.
    ///
    /// The heuristics run last so that a host name or file path embedded in a
    /// URL (for example `redis://cache.db.internal:6379`) cannot override the
    /// scheme the caller wrote explicitly.
    ///
    /// Returns [`DatabaseType::Unknown`] when nothing matches.
    pub fn from_connection_string(conn_str: &str) -> Self {
        let lower = conn_str.to_lowercase();

        // Tier 1: an explicit scheme is unambiguous and always wins.
        if lower.starts_with("postgresql://") || lower.starts_with("postgres://") {
            return DatabaseType::PostgreSQL;
        }
        if lower.starts_with("mysql://") {
            return DatabaseType::MySQL;
        }
        // "sqlite:" also covers the "sqlite://" form.
        if lower.starts_with("sqlite:") {
            return DatabaseType::SQLite;
        }
        if lower.starts_with("duckdb://") {
            return DatabaseType::DuckDB;
        }
        if lower.starts_with("redis://") {
            return DatabaseType::Redis;
        }
        if lower.starts_with("kafka://") {
            return DatabaseType::Kafka;
        }

        // Tier 2: ODBC strings name their driver explicitly, so they must not
        // be reclassified because of a file path in a `Database=` attribute.
        if lower.contains("driver=") {
            return DatabaseType::ODBC;
        }

        // Tier 3: scheme-less strings are treated as file paths / special names.
        if lower.contains(".sqlite") || lower.contains(":memory:") {
            DatabaseType::SQLite
        } else if lower.contains(".duckdb") || lower.contains(".db") {
            // A bare `.db` file is ambiguous; it is mapped to DuckDB here.
            DatabaseType::DuckDB
        } else {
            DatabaseType::Unknown
        }
    }
}
91
92/// Main database object for XDL
93#[derive(Debug)]
94pub struct XDLDatabase {
95    connection: Option<Arc<RwLock<DatabaseConnection>>>,
96    db_type: Option<DatabaseType>,
97    connection_string: Option<String>,
98    last_error: Option<String>,
99}
100
101impl XDLDatabase {
102    /// Create a new database object
103    pub fn new() -> Self {
104        Self {
105            connection: None,
106            db_type: None,
107            connection_string: None,
108            last_error: None,
109        }
110    }
111
112    /// Connect to a database using a connection string
113    pub async fn connect(&mut self, connection_string: &str) -> XdlResult<()> {
114        // Determine database type from connection string
115        let db_type = DatabaseType::from_connection_string(connection_string);
116
117        if db_type == DatabaseType::Unknown {
118            return Err(XdlError::RuntimeError(format!(
119                "Unable to determine database type from connection string: {}",
120                connection_string
121            )));
122        }
123
124        // Create connection based on type
125        let conn = DatabaseConnection::new(connection_string, db_type.clone())
126            .await
127            .map_err(|e| XdlError::RuntimeError(format!("Connection failed: {}", e)))?;
128
129        self.connection = Some(Arc::new(RwLock::new(conn)));
130        self.db_type = Some(db_type);
131        self.connection_string = Some(connection_string.to_string());
132        self.last_error = None;
133
134        Ok(())
135    }
136
137    /// Disconnect from the database
138    pub async fn disconnect(&mut self) -> XdlResult<()> {
139        if let Some(conn) = &self.connection {
140            let mut connection = conn.write().await;
141            connection
142                .close()
143                .await
144                .map_err(|e| XdlError::RuntimeError(format!("Disconnect failed: {}", e)))?;
145        }
146
147        self.connection = None;
148        self.db_type = None;
149
150        Ok(())
151    }
152
153    /// Execute a SQL query and return a recordset
154    pub async fn execute_sql(&self, query: &str) -> XdlResult<Recordset> {
155        let conn = self
156            .connection
157            .as_ref()
158            .ok_or_else(|| XdlError::RuntimeError("Not connected to database".to_string()))?;
159
160        let connection = conn.read().await;
161        let recordset = connection
162            .execute(query)
163            .await
164            .map_err(|e| XdlError::RuntimeError(format!("Query execution failed: {}", e)))?;
165
166        Ok(recordset)
167    }
168
169    /// Execute a SQL command (INSERT, UPDATE, DELETE) without returning data
170    pub async fn execute_command(&self, command: &str) -> XdlResult<u64> {
171        let conn = self
172            .connection
173            .as_ref()
174            .ok_or_else(|| XdlError::RuntimeError("Not connected to database".to_string()))?;
175
176        let connection = conn.read().await;
177        let rows_affected = connection
178            .execute_command(command)
179            .await
180            .map_err(|e| XdlError::RuntimeError(format!("Command execution failed: {}", e)))?;
181
182        Ok(rows_affected)
183    }
184
185    /// Check if connected
186    pub fn is_connected(&self) -> bool {
187        self.connection.is_some()
188    }
189
190    /// Get the database type
191    pub fn database_type(&self) -> Option<&DatabaseType> {
192        self.db_type.as_ref()
193    }
194
195    /// Get last error message
196    pub fn last_error(&self) -> Option<&str> {
197        self.last_error.as_deref()
198    }
199}
200
201impl Default for XDLDatabase {
202    fn default() -> Self {
203        Self::new()
204    }
205}
206
207/// Global database object registry for XDL object system integration
208/// Maps object IDs to database instances
209pub struct DatabaseRegistry {
210    databases: RwLock<HashMap<usize, Arc<RwLock<XDLDatabase>>>>,
211    recordsets: RwLock<HashMap<usize, Arc<RwLock<Recordset>>>>,
212    next_id: RwLock<usize>,
213}
214
215impl DatabaseRegistry {
216    /// Create a new registry
217    pub fn new() -> Self {
218        Self {
219            databases: RwLock::new(HashMap::new()),
220            recordsets: RwLock::new(HashMap::new()),
221            next_id: RwLock::new(1),
222        }
223    }
224
225    /// Register a new database object and return its ID
226    pub async fn register_database(&self, db: XDLDatabase) -> usize {
227        let mut next_id = self.next_id.write().await;
228        let id = *next_id;
229        *next_id += 1;
230
231        let mut databases = self.databases.write().await;
232        databases.insert(id, Arc::new(RwLock::new(db)));
233
234        id
235    }
236
237    /// Get a database by ID
238    pub async fn get_database(&self, id: usize) -> Option<Arc<RwLock<XDLDatabase>>> {
239        let databases = self.databases.read().await;
240        databases.get(&id).cloned()
241    }
242
243    /// Remove a database from the registry
244    pub async fn unregister_database(&self, id: usize) {
245        let mut databases = self.databases.write().await;
246        databases.remove(&id);
247    }
248
249    /// Register a recordset and return its ID
250    pub async fn register_recordset(&self, recordset: Recordset) -> usize {
251        let mut next_id = self.next_id.write().await;
252        let id = *next_id;
253        *next_id += 1;
254
255        let mut recordsets = self.recordsets.write().await;
256        recordsets.insert(id, Arc::new(RwLock::new(recordset)));
257
258        id
259    }
260
261    /// Get a recordset by ID
262    pub async fn get_recordset(&self, id: usize) -> Option<Arc<RwLock<Recordset>>> {
263        let recordsets = self.recordsets.read().await;
264        recordsets.get(&id).cloned()
265    }
266
267    /// Remove a recordset from the registry
268    pub async fn unregister_recordset(&self, id: usize) {
269        let mut recordsets = self.recordsets.write().await;
270        recordsets.remove(&id);
271    }
272}
273
274impl Default for DatabaseRegistry {
275    fn default() -> Self {
276        Self::new()
277    }
278}
279
// Global registry instance shared by the whole process.
// `lazy_static` builds it with `DatabaseRegistry::new()` the first time the
// static is dereferenced.
lazy_static::lazy_static! {
    pub static ref GLOBAL_DB_REGISTRY: DatabaseRegistry = DatabaseRegistry::new();
}
284
#[cfg(test)]
mod tests {
    use super::*;

    /// Each sample connection string must resolve to the listed backend.
    #[test]
    fn test_database_type_detection() {
        let cases = [
            ("postgresql://localhost/db", DatabaseType::PostgreSQL),
            ("mysql://localhost/db", DatabaseType::MySQL),
            ("test.duckdb", DatabaseType::DuckDB),
            ("redis://localhost:6379", DatabaseType::Redis),
            ("DRIVER={PostgreSQL};SERVER=localhost", DatabaseType::ODBC),
        ];

        for (conn_str, expected) in cases.iter() {
            assert_eq!(DatabaseType::from_connection_string(conn_str), *expected);
        }
    }
}