1#![allow(dead_code)]
7#![allow(missing_docs)]
8#![allow(clippy::too_many_arguments)]
9
10use crate::error::{IoError, Result};
11use crate::metadata::Metadata;
12use scirs2_core::ndarray::{Array1, Array2, ArrayView2};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15
// Backend modules. Each one is compiled only when the Cargo feature named in
// its `cfg` attribute is enabled.
#[cfg(feature = "sqlite")]
pub mod sqlite;

#[cfg(feature = "postgres")]
pub mod postgres;

#[cfg(feature = "mysql")]
pub mod mysql;

#[cfg(feature = "duckdb")]
pub mod duckdb;

// Submodules that are always compiled, independent of the backend features.
pub mod pool;

pub mod bulk;
pub mod timeseries;

// Re-exported so callers can name `ConnectionPool` without the `pool::` path.
pub use self::pool::ConnectionPool;
38
/// Kind of database backend a [`DatabaseConfig`] refers to.
///
/// `DatabaseConnector::connect` only has connectors for `SQLite`,
/// `PostgreSQL`, `MySQL` and `DuckDB` (each behind a Cargo feature); every
/// other variant is rejected there as "not yet implemented".
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DatabaseType {
    /// PostgreSQL (`postgresql://` scheme, default port 5432).
    PostgreSQL,
    /// MySQL (`mysql://` scheme, default port 3306).
    MySQL,
    /// SQLite (`sqlite://` scheme; no host or port is used).
    SQLite,
    /// MongoDB (`mongodb://` scheme, default port 27017).
    MongoDB,
    /// InfluxDB (`influxdb://` scheme).
    InfluxDB,
    /// Redis (`redis://` scheme).
    Redis,
    /// Cassandra (`cassandra://` scheme).
    Cassandra,
    /// DuckDB (`duckdb://` scheme).
    DuckDB,
}
59
/// Connection settings for a single database.
///
/// Built with `DatabaseConfig::new` and refined through its chained setter
/// methods; `connection_string` renders the settings as a URL.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseConfig {
    /// Backend this configuration targets.
    pub db_type: DatabaseType,
    /// Server host name; `connection_string` uses `localhost` when unset.
    pub host: Option<String>,
    /// Server port; `connection_string` uses a per-backend default when unset.
    pub port: Option<u16>,
    /// Database name. It is placed verbatim after the scheme/authority part
    /// of the connection string (for SQLite: directly after `sqlite://`).
    pub database: String,
    /// User name, set together with `password` by `credentials`.
    pub username: Option<String>,
    /// Password, set together with `username` by `credentials`.
    pub password: Option<String>,
    /// Free-form key/value options collected by `option`. Nothing visible in
    /// this file interprets them; presumably backends read them — confirm.
    pub options: HashMap<String, String>,
}
78
79impl DatabaseConfig {
80 pub fn new(db_type: DatabaseType, database: impl Into<String>) -> Self {
82 Self {
83 db_type,
84 host: None,
85 port: None,
86 database: database.into(),
87 username: None,
88 password: None,
89 options: HashMap::new(),
90 }
91 }
92
93 pub fn host(mut self, host: impl Into<String>) -> Self {
95 self.host = Some(host.into());
96 self
97 }
98
99 pub fn port(mut self, port: u16) -> Self {
101 self.port = Some(port);
102 self
103 }
104
105 pub fn credentials(mut self, username: impl Into<String>, password: impl Into<String>) -> Self {
107 self.username = Some(username.into());
108 self.password = Some(password.into());
109 self
110 }
111
112 pub fn option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
114 self.options.insert(key.into(), value.into());
115 self
116 }
117
118 pub fn connection_string(&self) -> String {
120 match self.db_type {
121 DatabaseType::PostgreSQL => {
122 let host = self.host.as_deref().unwrap_or("localhost");
123 let port = self.port.unwrap_or(5432);
124 let user = self.username.as_deref().unwrap_or("postgres");
125 format!(
126 "postgresql://{}:password@{}:{}/{}",
127 user, host, port, self.database
128 )
129 }
130 DatabaseType::MySQL => {
131 let host = self.host.as_deref().unwrap_or("localhost");
132 let port = self.port.unwrap_or(3306);
133 let user = self.username.as_deref().unwrap_or("root");
134 format!(
135 "mysql://{}:password@{}:{}/{}",
136 user, host, port, self.database
137 )
138 }
139 DatabaseType::SQLite => {
140 format!("sqlite://{}", self.database)
141 }
142 DatabaseType::MongoDB => {
143 let host = self.host.as_deref().unwrap_or("localhost");
144 let port = self.port.unwrap_or(27017);
145 format!("mongodb://{}:{}/{}", host, port, self.database)
146 }
147 _ => format!("{}://{}", self.db_type.as_str(), self.database),
148 }
149 }
150}
151
152impl DatabaseType {
153 fn as_str(&self) -> &'static str {
154 match self {
155 Self::PostgreSQL => "postgresql",
156 Self::MySQL => "mysql",
157 Self::SQLite => "sqlite",
158 Self::MongoDB => "mongodb",
159 Self::InfluxDB => "influxdb",
160 Self::Redis => "redis",
161 Self::Cassandra => "cassandra",
162 Self::DuckDB => "duckdb",
163 }
164 }
165}
166
/// Incrementally assembled description of a query.
///
/// The `build_sql` and `build_mongo` methods render it as SQL text or as a
/// MongoDB-style JSON document.
pub struct QueryBuilder {
    /// Statement kind; decides how the builder is rendered.
    pub(crate) query_type: QueryType,
    /// Table (or, for `build_mongo`, collection) name.
    pub(crate) table: String,
    /// Column names, joined with `", "` when rendering SQL.
    pub(crate) columns: Vec<String>,
    /// Raw condition strings, joined with `" AND "` in the `WHERE` clause.
    pub(crate) conditions: Vec<String>,
    /// Values for an `INSERT`; `build_sql` emits one `?` placeholder per entry.
    pub(crate) values: Vec<serde_json::Value>,
    /// Pre-formatted `"<column> ASC|DESC"` ordering, if any.
    pub(crate) order_by: Option<String>,
    /// Maximum number of rows (`LIMIT`), if any.
    pub(crate) limit: Option<usize>,
    /// Number of rows to skip (`OFFSET`), if any.
    pub(crate) offset: Option<usize>,
}
178
/// Statement kind carried by a [`QueryBuilder`].
///
/// In this file only `Select` and `Insert` are rendered by
/// `QueryBuilder::build_sql`; the remaining kinds produce an empty string.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) enum QueryType {
    /// `SELECT` statement.
    Select,
    /// `INSERT` statement.
    Insert,
    /// `UPDATE` statement (not rendered in this file).
    Update,
    /// `DELETE` statement (not rendered in this file).
    Delete,
    /// `CREATE TABLE` statement (not rendered in this file).
    CreateTable,
}
188
189impl QueryBuilder {
190 pub fn select(table: impl Into<String>) -> Self {
192 Self {
193 query_type: QueryType::Select,
194 table: table.into(),
195 columns: vec!["*".to_string()],
196 conditions: Vec::new(),
197 values: Vec::new(),
198 order_by: None,
199 limit: None,
200 offset: None,
201 }
202 }
203
204 pub fn insert(table: impl Into<String>) -> Self {
206 Self {
207 query_type: QueryType::Insert,
208 table: table.into(),
209 columns: Vec::new(),
210 conditions: Vec::new(),
211 values: Vec::new(),
212 order_by: None,
213 limit: None,
214 offset: None,
215 }
216 }
217
218 pub fn columns(mut self, columns: Vec<impl Into<String>>) -> Self {
220 self.columns = columns.into_iter().map(|c| c.into()).collect();
221 self
222 }
223
224 pub fn where_clause(mut self, condition: impl Into<String>) -> Self {
226 self.conditions.push(condition.into());
227 self
228 }
229
230 pub fn values(mut self, values: Vec<serde_json::Value>) -> Self {
232 self.values = values;
233 self
234 }
235
236 pub fn order_by(mut self, column: impl Into<String>, desc: bool) -> Self {
238 self.order_by = Some(format!(
239 "{} {}",
240 column.into(),
241 if desc { "DESC" } else { "ASC" }
242 ));
243 self
244 }
245
246 pub fn limit(mut self, limit: usize) -> Self {
248 self.limit = Some(limit);
249 self
250 }
251
252 pub fn offset(mut self, offset: usize) -> Self {
254 self.offset = Some(offset);
255 self
256 }
257
258 pub fn build_sql(&self) -> String {
260 match self.query_type {
261 QueryType::Select => {
262 let mut sql = format!("SELECT {} FROM {}", self.columns.join(", "), self.table);
263
264 if !self.conditions.is_empty() {
265 sql.push_str(&format!(" WHERE {}", self.conditions.join(" AND ")));
266 }
267
268 if let Some(order) = &self.order_by {
269 sql.push_str(&format!(" ORDER BY {order}"));
270 }
271
272 if let Some(limit) = self.limit {
273 sql.push_str(&format!(" LIMIT {limit}"));
274 }
275
276 if let Some(offset) = self.offset {
277 sql.push_str(&format!(" OFFSET {offset}"));
278 }
279
280 sql
281 }
282 QueryType::Insert => {
283 format!(
284 "INSERT INTO {} ({}) VALUES ({})",
285 self.table,
286 self.columns.join(", "),
287 self.values
288 .iter()
289 .map(|_| "?")
290 .collect::<Vec<_>>()
291 .join(", ")
292 )
293 }
294 _ => String::new(),
295 }
296 }
297
298 pub fn build_mongo(&self) -> serde_json::Value {
300 match self.query_type {
301 QueryType::Select => {
302 let mut query = serde_json::json!({});
303
304 for condition in &self.conditions {
306 if let Some((field, value)) = condition.split_once(" = ") {
308 query[field] = serde_json::json!(value.trim_matches('\''));
309 }
310 }
311
312 serde_json::json!({
313 "collection": self.table,
314 "filter": query,
315 "limit": self.limit,
316 "skip": self.offset,
317 })
318 }
319 _ => serde_json::json!({}),
320 }
321 }
322}
323
/// Tabular query result: a header of column names plus rows of JSON values.
#[derive(Debug, Clone)]
pub struct ResultSet {
    /// Column names, in row order.
    pub columns: Vec<String>,
    /// Row data; each inner vector is one row. Nothing here enforces that a
    /// row has exactly one value per column.
    pub rows: Vec<Vec<serde_json::Value>>,
    /// Metadata attached to the result; starts as `Metadata::new()`.
    pub metadata: Metadata,
}
334
335impl ResultSet {
336 pub fn new(columns: Vec<String>) -> Self {
338 Self {
339 columns,
340 rows: Vec::new(),
341 metadata: Metadata::new(),
342 }
343 }
344
345 pub fn add_row(&mut self, row: Vec<serde_json::Value>) {
347 self.rows.push(row);
348 }
349
350 pub fn row_count(&self) -> usize {
352 self.rows.len()
353 }
354
355 pub fn column_count(&self) -> usize {
357 self.columns.len()
358 }
359
360 pub fn to_array(&self) -> Result<Array2<f64>> {
362 let mut data = Vec::new();
363
364 for row in &self.rows {
365 for value in row {
366 let num = value.as_f64().ok_or_else(|| {
367 IoError::ConversionError("Non-numeric value in result set".to_string())
368 })?;
369 data.push(num);
370 }
371 }
372
373 Array2::from_shape_vec((self.row_count(), self.column_count()), data)
374 .map_err(|e| IoError::Other(e.to_string()))
375 }
376
377 pub fn get_column(&self, name: &str) -> Result<Array1<f64>> {
379 let col_idx = self
380 .columns
381 .iter()
382 .position(|c| c == name)
383 .ok_or_else(|| IoError::Other(format!("Column '{name}' not found")))?;
384
385 let mut data = Vec::new();
386 for row in &self.rows {
387 let num = row[col_idx].as_f64().ok_or_else(|| {
388 IoError::ConversionError("Non-numeric value in column".to_string())
389 })?;
390 data.push(num);
391 }
392
393 Ok(Array1::from_vec(data))
394 }
395}
396
/// Operations a database backend exposes to the rest of the crate.
///
/// Implementors must be `Send + Sync`. `DatabaseConnector::connect` hands
/// connections out as `Box<dyn DatabaseConnection>`. The exact semantics of
/// each method are defined by the backend implementations, which are not part
/// of this file.
pub trait DatabaseConnection: Send + Sync {
    /// Runs the query described by `query` and returns its rows.
    fn query(&self, query: &QueryBuilder) -> Result<ResultSet>;

    /// Runs the statement `sql` with `params`.
    /// NOTE(review): presumably `params` are bound to placeholders in order —
    /// confirm against the backend implementations.
    fn execute_sql(&self, sql: &str, params: &[serde_json::Value]) -> Result<ResultSet>;

    /// Inserts the 2-D array `data` into `table` under the given `columns`.
    /// NOTE(review): the returned `usize` is presumably the number of rows
    /// inserted — confirm against the backend implementations.
    fn insert_array(&self, table: &str, data: ArrayView2<f64>, columns: &[&str]) -> Result<usize>;

    /// Creates `table` according to `schema`.
    fn create_table(&self, table: &str, schema: &TableSchema) -> Result<()>;

    /// Reports whether `table` exists.
    fn table_exists(&self, table: &str) -> Result<bool>;

    /// Returns the schema of `table`.
    fn get_schema(&self, table: &str) -> Result<TableSchema>;
}
417
/// Description of a table: its name, columns, optional primary key and indexes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Column definitions.
    pub columns: Vec<ColumnDef>,
    /// Names of the primary-key columns, or `None` when there is no primary key.
    pub primary_key: Option<Vec<String>>,
    /// Indexes defined on the table.
    pub indexes: Vec<Index>,
}
430
/// Definition of a single table column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnDef {
    /// Column name.
    pub name: String,
    /// Column data type.
    pub data_type: DataType,
    /// Whether the column is nullable.
    pub nullable: bool,
    /// Default value as JSON, or `None` when no default is specified.
    pub default: Option<serde_json::Value>,
}
443
/// Column data types usable in a [`ColumnDef`].
///
/// Serialized with lowercase variant names (`rename_all = "lowercase"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DataType {
    /// Integer column.
    Integer,
    /// Big-integer column.
    BigInt,
    /// Float column.
    Float,
    /// Double column.
    Double,
    /// Decimal column.
    /// NOTE(review): the two `u8` fields are presumably (precision, scale) —
    /// confirm against the backends.
    Decimal(u8, u8),
    /// Varchar column.
    /// NOTE(review): the `usize` is presumably the maximum length — confirm.
    Varchar(usize),
    /// Text column.
    Text,
    /// Boolean column.
    Boolean,
    /// Date column.
    Date,
    /// Timestamp column.
    Timestamp,
    /// JSON column.
    Json,
    /// Binary column.
    Binary,
}
472
/// Definition of an index on a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Index {
    /// Index name.
    pub name: String,
    /// Names of the indexed columns.
    pub columns: Vec<String>,
    /// Whether the index is unique.
    pub unique: bool,
}
479
/// Field-less type whose associated function `connect` opens a connection for
/// a [`DatabaseConfig`].
pub struct DatabaseConnector;
482
483impl DatabaseConnector {
484 pub fn connect(config: &DatabaseConfig) -> Result<Box<dyn DatabaseConnection>> {
486 match config.db_type {
487 #[cfg(feature = "sqlite")]
488 DatabaseType::SQLite => Ok(Box::new(sqlite::SQLiteConnection::new(config)?)),
489 #[cfg(not(feature = "sqlite"))]
490 DatabaseType::SQLite => Err(IoError::UnsupportedFormat(
491 "SQLite support not enabled. Enable 'sqlite' feature.".to_string(),
492 )),
493
494 #[cfg(feature = "postgres")]
495 DatabaseType::PostgreSQL => Ok(Box::new(postgres::PostgreSQLConnection::new(config)?)),
496 #[cfg(not(feature = "postgres"))]
497 DatabaseType::PostgreSQL => Err(IoError::UnsupportedFormat(
498 "PostgreSQL support not enabled. Enable 'postgres' feature.".to_string(),
499 )),
500
501 #[cfg(feature = "mysql")]
502 DatabaseType::MySQL => Ok(Box::new(mysql::MySQLConnection::new(config)?)),
503 #[cfg(not(feature = "mysql"))]
504 DatabaseType::MySQL => Err(IoError::UnsupportedFormat(
505 "MySQL support not enabled. Enable 'mysql' feature.".to_string(),
506 )),
507
508 #[cfg(feature = "duckdb")]
509 DatabaseType::DuckDB => Ok(Box::new(duckdb::DuckDBConnection::new(config)?)),
510 #[cfg(not(feature = "duckdb"))]
511 DatabaseType::DuckDB => Err(IoError::UnsupportedFormat(
512 "DuckDB support not enabled. Enable 'duckdb' feature.".to_string(),
513 )),
514
515 _ => Err(IoError::UnsupportedFormat(format!(
516 "Database type {:?} not yet implemented",
517 config.db_type
518 ))),
519 }
520 }
521}
522
#[cfg(test)]
mod tests {
    use super::*;

    /// A new SQLite configuration keeps its type and database name and
    /// renders as a `sqlite://` connection string.
    #[test]
    fn test_database_config() {
        let cfg = DatabaseConfig::new(DatabaseType::SQLite, "test.db");

        assert_eq!(cfg.db_type, DatabaseType::SQLite);
        assert_eq!(cfg.database, "test.db");
        assert_eq!(cfg.connection_string(), "sqlite://test.db");
    }

    /// A `SELECT` with explicit columns, one condition and a limit renders
    /// all three parts into the SQL text.
    #[test]
    fn test_query_builder() {
        let sql = QueryBuilder::select("users")
            .columns(vec!["id", "name", "email"])
            .where_clause("age > 21")
            .limit(10)
            .build_sql();

        let expected_fragments = [
            "SELECT id, name, email FROM users",
            "WHERE age > 21",
            "LIMIT 10",
        ];
        for fragment in expected_fragments {
            assert!(sql.contains(fragment));
        }
    }
}