scirs2_io/database/
mod.rs

1//! Database connectivity for scientific data
2//!
3//! Provides interfaces for reading and writing scientific data to various
4//! database systems, including SQL and NoSQL databases.
5
6#![allow(dead_code)]
7#![allow(missing_docs)]
8#![allow(clippy::too_many_arguments)]
9
10use crate::error::{IoError, Result};
11use crate::metadata::Metadata;
12use scirs2_core::ndarray::{Array1, Array2, ArrayView2};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15
16// Re-export database implementations
17#[cfg(feature = "sqlite")]
18pub mod sqlite;
19
20#[cfg(feature = "postgres")]
21pub mod postgres;
22
23#[cfg(feature = "mysql")]
24pub mod mysql;
25
26#[cfg(feature = "duckdb")]
27pub mod duckdb;
28
29// Connection pooling
30pub mod pool;
31
32// Specialized modules
33pub mod bulk;
34pub mod timeseries;
35
36// Re-export commonly used types
37pub use self::pool::ConnectionPool;
38
39/// Supported database types
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
41pub enum DatabaseType {
42    /// PostgreSQL database
43    PostgreSQL,
44    /// MySQL/MariaDB database
45    MySQL,
46    /// SQLite database
47    SQLite,
48    /// MongoDB (NoSQL)
49    MongoDB,
50    /// InfluxDB (Time series)
51    InfluxDB,
52    /// Redis (Key-value)
53    Redis,
54    /// Cassandra (Wide column)
55    Cassandra,
56    /// DuckDB (Analytical)
57    DuckDB,
58}
59
60/// Database connection configuration
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct DatabaseConfig {
63    /// The type of database (SQLite, PostgreSQL, etc.)
64    pub db_type: DatabaseType,
65    /// Host address for remote databases
66    pub host: Option<String>,
67    /// Port number for database connection
68    pub port: Option<u16>,
69    /// Database name or file path
70    pub database: String,
71    /// Username for authentication
72    pub username: Option<String>,
73    /// Password for authentication
74    pub password: Option<String>,
75    /// Additional connection options
76    pub options: HashMap<String, String>,
77}
78
79impl DatabaseConfig {
80    /// Create a new database configuration
81    pub fn new(db_type: DatabaseType, database: impl Into<String>) -> Self {
82        Self {
83            db_type,
84            host: None,
85            port: None,
86            database: database.into(),
87            username: None,
88            password: None,
89            options: HashMap::new(),
90        }
91    }
92
93    /// Set host
94    pub fn host(mut self, host: impl Into<String>) -> Self {
95        self.host = Some(host.into());
96        self
97    }
98
99    /// Set port
100    pub fn port(mut self, port: u16) -> Self {
101        self.port = Some(port);
102        self
103    }
104
105    /// Set credentials
106    pub fn credentials(mut self, username: impl Into<String>, password: impl Into<String>) -> Self {
107        self.username = Some(username.into());
108        self.password = Some(password.into());
109        self
110    }
111
112    /// Add connection option
113    pub fn option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
114        self.options.insert(key.into(), value.into());
115        self
116    }
117
118    /// Build connection string
119    pub fn connection_string(&self) -> String {
120        match self.db_type {
121            DatabaseType::PostgreSQL => {
122                let host = self.host.as_deref().unwrap_or("localhost");
123                let port = self.port.unwrap_or(5432);
124                let user = self.username.as_deref().unwrap_or("postgres");
125                format!(
126                    "postgresql://{}:password@{}:{}/{}",
127                    user, host, port, self.database
128                )
129            }
130            DatabaseType::MySQL => {
131                let host = self.host.as_deref().unwrap_or("localhost");
132                let port = self.port.unwrap_or(3306);
133                let user = self.username.as_deref().unwrap_or("root");
134                format!(
135                    "mysql://{}:password@{}:{}/{}",
136                    user, host, port, self.database
137                )
138            }
139            DatabaseType::SQLite => {
140                format!("sqlite://{}", self.database)
141            }
142            DatabaseType::MongoDB => {
143                let host = self.host.as_deref().unwrap_or("localhost");
144                let port = self.port.unwrap_or(27017);
145                format!("mongodb://{}:{}/{}", host, port, self.database)
146            }
147            _ => format!("{}://{}", self.db_type.as_str(), self.database),
148        }
149    }
150}
151
152impl DatabaseType {
153    fn as_str(&self) -> &'static str {
154        match self {
155            Self::PostgreSQL => "postgresql",
156            Self::MySQL => "mysql",
157            Self::SQLite => "sqlite",
158            Self::MongoDB => "mongodb",
159            Self::InfluxDB => "influxdb",
160            Self::Redis => "redis",
161            Self::Cassandra => "cassandra",
162            Self::DuckDB => "duckdb",
163        }
164    }
165}
166
167/// Database query builder
168pub struct QueryBuilder {
169    pub(crate) query_type: QueryType,
170    pub(crate) table: String,
171    pub(crate) columns: Vec<String>,
172    pub(crate) conditions: Vec<String>,
173    pub(crate) values: Vec<serde_json::Value>,
174    pub(crate) order_by: Option<String>,
175    pub(crate) limit: Option<usize>,
176    pub(crate) offset: Option<usize>,
177}
178
/// Statement kind produced by a [`QueryBuilder`].
#[allow(dead_code)] // only Select and Insert have public constructors so far
#[derive(Debug, Clone)]
pub(crate) enum QueryType {
    /// `SELECT ... FROM ...`
    Select,
    /// `INSERT INTO ... VALUES ...`
    Insert,
    /// `UPDATE ... SET ...`
    Update,
    /// `DELETE FROM ...`
    Delete,
    /// `CREATE TABLE ...`
    CreateTable,
}
188
189impl QueryBuilder {
190    /// Create a SELECT query
191    pub fn select(table: impl Into<String>) -> Self {
192        Self {
193            query_type: QueryType::Select,
194            table: table.into(),
195            columns: vec!["*".to_string()],
196            conditions: Vec::new(),
197            values: Vec::new(),
198            order_by: None,
199            limit: None,
200            offset: None,
201        }
202    }
203
204    /// Create an INSERT query
205    pub fn insert(table: impl Into<String>) -> Self {
206        Self {
207            query_type: QueryType::Insert,
208            table: table.into(),
209            columns: Vec::new(),
210            conditions: Vec::new(),
211            values: Vec::new(),
212            order_by: None,
213            limit: None,
214            offset: None,
215        }
216    }
217
218    /// Specify columns
219    pub fn columns(mut self, columns: Vec<impl Into<String>>) -> Self {
220        self.columns = columns.into_iter().map(|c| c.into()).collect();
221        self
222    }
223
224    /// Add WHERE condition
225    pub fn where_clause(mut self, condition: impl Into<String>) -> Self {
226        self.conditions.push(condition.into());
227        self
228    }
229
230    /// Add values for INSERT
231    pub fn values(mut self, values: Vec<serde_json::Value>) -> Self {
232        self.values = values;
233        self
234    }
235
236    /// Set ORDER BY
237    pub fn order_by(mut self, column: impl Into<String>, desc: bool) -> Self {
238        self.order_by = Some(format!(
239            "{} {}",
240            column.into(),
241            if desc { "DESC" } else { "ASC" }
242        ));
243        self
244    }
245
246    /// Set LIMIT
247    pub fn limit(mut self, limit: usize) -> Self {
248        self.limit = Some(limit);
249        self
250    }
251
252    /// Set OFFSET
253    pub fn offset(mut self, offset: usize) -> Self {
254        self.offset = Some(offset);
255        self
256    }
257
258    /// Build SQL query string
259    pub fn build_sql(&self) -> String {
260        match self.query_type {
261            QueryType::Select => {
262                let mut sql = format!("SELECT {} FROM {}", self.columns.join(", "), self.table);
263
264                if !self.conditions.is_empty() {
265                    sql.push_str(&format!(" WHERE {}", self.conditions.join(" AND ")));
266                }
267
268                if let Some(order) = &self.order_by {
269                    sql.push_str(&format!(" ORDER BY {order}"));
270                }
271
272                if let Some(limit) = self.limit {
273                    sql.push_str(&format!(" LIMIT {limit}"));
274                }
275
276                if let Some(offset) = self.offset {
277                    sql.push_str(&format!(" OFFSET {offset}"));
278                }
279
280                sql
281            }
282            QueryType::Insert => {
283                format!(
284                    "INSERT INTO {} ({}) VALUES ({})",
285                    self.table,
286                    self.columns.join(", "),
287                    self.values
288                        .iter()
289                        .map(|_| "?")
290                        .collect::<Vec<_>>()
291                        .join(", ")
292                )
293            }
294            _ => String::new(),
295        }
296    }
297
298    /// Build MongoDB query
299    pub fn build_mongo(&self) -> serde_json::Value {
300        match self.query_type {
301            QueryType::Select => {
302                let mut query = serde_json::json!({});
303
304                // Convert SQL-like conditions to MongoDB query
305                for condition in &self.conditions {
306                    // Simple parsing - in real implementation would be more sophisticated
307                    if let Some((field, value)) = condition.split_once(" = ") {
308                        query[field] = serde_json::json!(value.trim_matches('\''));
309                    }
310                }
311
312                serde_json::json!({
313                    "collection": self.table,
314                    "filter": query,
315                    "limit": self.limit,
316                    "skip": self.offset,
317                })
318            }
319            _ => serde_json::json!({}),
320        }
321    }
322}
323
324/// Database result set
325#[derive(Debug, Clone)]
326pub struct ResultSet {
327    /// Column names in the result set
328    pub columns: Vec<String>,
329    /// Data rows as JSON values
330    pub rows: Vec<Vec<serde_json::Value>>,
331    /// Additional metadata about the result set
332    pub metadata: Metadata,
333}
334
335impl ResultSet {
336    /// Create new result set
337    pub fn new(columns: Vec<String>) -> Self {
338        Self {
339            columns,
340            rows: Vec::new(),
341            metadata: Metadata::new(),
342        }
343    }
344
345    /// Add a row
346    pub fn add_row(&mut self, row: Vec<serde_json::Value>) {
347        self.rows.push(row);
348    }
349
350    /// Get number of rows
351    pub fn row_count(&self) -> usize {
352        self.rows.len()
353    }
354
355    /// Get number of columns
356    pub fn column_count(&self) -> usize {
357        self.columns.len()
358    }
359
360    /// Convert to Array2<f64> if all values are numeric
361    pub fn to_array(&self) -> Result<Array2<f64>> {
362        let mut data = Vec::new();
363
364        for row in &self.rows {
365            for value in row {
366                let num = value.as_f64().ok_or_else(|| {
367                    IoError::ConversionError("Non-numeric value in result set".to_string())
368                })?;
369                data.push(num);
370            }
371        }
372
373        Array2::from_shape_vec((self.row_count(), self.column_count()), data)
374            .map_err(|e| IoError::Other(e.to_string()))
375    }
376
377    /// Get column by name as Array1
378    pub fn get_column(&self, name: &str) -> Result<Array1<f64>> {
379        let col_idx = self
380            .columns
381            .iter()
382            .position(|c| c == name)
383            .ok_or_else(|| IoError::Other(format!("Column '{name}' not found")))?;
384
385        let mut data = Vec::new();
386        for row in &self.rows {
387            let num = row[col_idx].as_f64().ok_or_else(|| {
388                IoError::ConversionError("Non-numeric value in column".to_string())
389            })?;
390            data.push(num);
391        }
392
393        Ok(Array1::from_vec(data))
394    }
395}
396
397/// Database connection trait
398pub trait DatabaseConnection: Send + Sync {
399    /// Execute a query and return results
400    fn query(&self, query: &QueryBuilder) -> Result<ResultSet>;
401
402    /// Execute a raw SQL query
403    fn execute_sql(&self, sql: &str, params: &[serde_json::Value]) -> Result<ResultSet>;
404
405    /// Insert data from Array2
406    fn insert_array(&self, table: &str, data: ArrayView2<f64>, columns: &[&str]) -> Result<usize>;
407
408    /// Create table from schema
409    fn create_table(&self, table: &str, schema: &TableSchema) -> Result<()>;
410
411    /// Check if table exists
412    fn table_exists(&self, table: &str) -> Result<bool>;
413
414    /// Get table schema
415    fn get_schema(&self, table: &str) -> Result<TableSchema>;
416}
417
418/// Table schema definition
419#[derive(Debug, Clone, Serialize, Deserialize)]
420pub struct TableSchema {
421    /// Table name
422    pub name: String,
423    /// Column definitions
424    pub columns: Vec<ColumnDef>,
425    /// Primary key column names
426    pub primary_key: Option<Vec<String>>,
427    /// Index definitions
428    pub indexes: Vec<Index>,
429}
430
431/// Column definition for database tables
432#[derive(Debug, Clone, Serialize, Deserialize)]
433pub struct ColumnDef {
434    /// Column name
435    pub name: String,
436    /// Data type of the column
437    pub data_type: DataType,
438    /// Whether the column allows NULL values
439    pub nullable: bool,
440    /// Default value for the column
441    pub default: Option<serde_json::Value>,
442}
443
444/// Database data types
445#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
446#[serde(rename_all = "lowercase")]
447pub enum DataType {
448    /// 32-bit integer
449    Integer,
450    /// 64-bit integer
451    BigInt,
452    /// 32-bit floating point
453    Float,
454    /// 64-bit floating point
455    Double,
456    /// Decimal with precision and scale
457    Decimal(u8, u8),
458    /// Variable-length character string
459    Varchar(usize),
460    /// Text string of unlimited length
461    Text,
462    /// Boolean true/false
463    Boolean,
464    /// Date value
465    Date,
466    /// Timestamp with date and time
467    Timestamp,
468    /// JSON document
469    Json,
470    Binary,
471}
472
473#[derive(Debug, Clone, Serialize, Deserialize)]
474pub struct Index {
475    pub name: String,
476    pub columns: Vec<String>,
477    pub unique: bool,
478}
479
/// Stateless factory that opens connections from a [`DatabaseConfig`].
///
/// All functionality lives in [`DatabaseConnector::connect`].
pub struct DatabaseConnector;
482
483impl DatabaseConnector {
484    /// Create a new database connection
485    pub fn connect(config: &DatabaseConfig) -> Result<Box<dyn DatabaseConnection>> {
486        match config.db_type {
487            #[cfg(feature = "sqlite")]
488            DatabaseType::SQLite => Ok(Box::new(sqlite::SQLiteConnection::new(config)?)),
489            #[cfg(not(feature = "sqlite"))]
490            DatabaseType::SQLite => Err(IoError::UnsupportedFormat(
491                "SQLite support not enabled. Enable 'sqlite' feature.".to_string(),
492            )),
493
494            #[cfg(feature = "postgres")]
495            DatabaseType::PostgreSQL => Ok(Box::new(postgres::PostgreSQLConnection::new(config)?)),
496            #[cfg(not(feature = "postgres"))]
497            DatabaseType::PostgreSQL => Err(IoError::UnsupportedFormat(
498                "PostgreSQL support not enabled. Enable 'postgres' feature.".to_string(),
499            )),
500
501            #[cfg(feature = "mysql")]
502            DatabaseType::MySQL => Ok(Box::new(mysql::MySQLConnection::new(config)?)),
503            #[cfg(not(feature = "mysql"))]
504            DatabaseType::MySQL => Err(IoError::UnsupportedFormat(
505                "MySQL support not enabled. Enable 'mysql' feature.".to_string(),
506            )),
507
508            #[cfg(feature = "duckdb")]
509            DatabaseType::DuckDB => Ok(Box::new(duckdb::DuckDBConnection::new(config)?)),
510            #[cfg(not(feature = "duckdb"))]
511            DatabaseType::DuckDB => Err(IoError::UnsupportedFormat(
512                "DuckDB support not enabled. Enable 'duckdb' feature.".to_string(),
513            )),
514
515            _ => Err(IoError::UnsupportedFormat(format!(
516                "Database type {:?} not yet implemented",
517                config.db_type
518            ))),
519        }
520    }
521}
522
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_database_config() {
        let config = DatabaseConfig::new(DatabaseType::SQLite, "test.db");
        assert_eq!(config.db_type, DatabaseType::SQLite);
        assert_eq!(config.database, "test.db");
        assert_eq!(config.connection_string(), "sqlite://test.db");
    }

    #[test]
    fn test_query_builder() {
        let query = QueryBuilder::select("users")
            .columns(vec!["id", "name", "email"])
            .where_clause("age > 21")
            .limit(10);

        let sql = query.build_sql();
        assert!(sql.contains("SELECT id, name, email FROM users"));
        assert!(sql.contains("WHERE age > 21"));
        assert!(sql.contains("LIMIT 10"));
    }

    #[test]
    fn test_select_with_conditions_order_and_paging() {
        let sql = QueryBuilder::select("t")
            .where_clause("a > 1")
            .where_clause("b < 2")
            .order_by("a", true)
            .limit(5)
            .offset(10)
            .build_sql();
        assert_eq!(
            sql,
            "SELECT * FROM t WHERE a > 1 AND b < 2 ORDER BY a DESC LIMIT 5 OFFSET 10"
        );
    }

    #[test]
    fn test_insert_uses_placeholders() {
        let sql = QueryBuilder::insert("t")
            .columns(vec!["a", "b"])
            .values(vec![serde_json::json!(1), serde_json::json!("x")])
            .build_sql();
        assert_eq!(sql, "INSERT INTO t (a, b) VALUES (?, ?)");
    }

    #[test]
    fn test_result_set_numeric_conversions() {
        let mut rs = ResultSet::new(vec!["x".to_string(), "y".to_string()]);
        rs.add_row(vec![serde_json::json!(1.0), serde_json::json!(2.0)]);
        rs.add_row(vec![serde_json::json!(3), serde_json::json!(4)]);
        assert_eq!(rs.row_count(), 2);
        assert_eq!(rs.column_count(), 2);

        let arr = rs.to_array().unwrap();
        assert_eq!(arr.dim(), (2, 2));
        assert_eq!(arr[[1, 0]], 3.0);

        let y = rs.get_column("y").unwrap();
        assert_eq!(y.to_vec(), vec![2.0, 4.0]);
        assert!(rs.get_column("missing").is_err());
    }

    #[test]
    fn test_result_set_rejects_non_numeric() {
        let mut rs = ResultSet::new(vec!["x".to_string()]);
        rs.add_row(vec![serde_json::json!("text")]);
        assert!(rs.to_array().is_err());
        assert!(rs.get_column("x").is_err());
    }
}