Skip to main content

rustbasic_core/sql/
any.rs

1use std::sync::Arc;
2use serde_json::Value;
3use super::error::Error;
4use super::row::{AnyColumn, AnyTypeInfo, DbValue};
5pub use super::row::AnyRow;
6
7pub trait Database {}
8impl Database for Any {}
9
10pub struct Any;
11
12pub struct AnyArguments<'q> {
13    pub _marker: std::marker::PhantomData<&'q ()>,
14}
15
16pub fn install_default_drivers() {}
17
18#[derive(Clone)]
19pub enum AnyPool {
20    #[cfg(feature = "sqlite")]
21    Sqlite(Arc<SqlitePoolInner>),
22    #[cfg(feature = "mysql")]
23    MySql(crate::sql::driver::mysql::MySqlPool),
24}
25
26impl std::fmt::Debug for AnyPool {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        match self {
29            #[cfg(feature = "sqlite")]
30            AnyPool::Sqlite(inner) => f.debug_struct("SqlitePool")
31                .field("path", &inner.path)
32                .finish(),
33            #[cfg(feature = "mysql")]
34            AnyPool::MySql(_) => write!(f, "MySqlPool"),
35        }
36    }
37}
38
39#[cfg(feature = "sqlite")]
40pub struct SqlitePoolInner {
41    pub path: String,
42    pub connections: tokio::sync::Mutex<Vec<crate::sql::driver::sqlite::SqliteConnection>>,
43}
44
45pub struct SqliteConnection {
46    #[cfg(feature = "sqlite")]
47    pub conn: Option<crate::sql::driver::sqlite::SqliteConnection>,
48    #[cfg(feature = "sqlite")]
49    pub pool: Option<Arc<SqlitePoolInner>>,
50}
51
52impl std::fmt::Debug for SqliteConnection {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        write!(f, "SqliteConnection")
55    }
56}
57
58impl Drop for SqliteConnection {
59    fn drop(&mut self) {
60        #[cfg(feature = "sqlite")]
61        if let (Some(conn), Some(pool)) = (self.conn.take(), &self.pool) {
62            let pool = pool.clone();
63            tokio::spawn(async move {
64                let mut conns = pool.connections.lock().await;
65                conns.push(conn);
66            });
67        }
68    }
69}
70
71pub enum AnyConnection {
72    #[cfg(feature = "sqlite")]
73    Sqlite(SqliteConnection),
74    #[cfg(feature = "mysql")]
75    MySql(Option<crate::sql::driver::mysql::PoolConnection>),
76}
77
78impl std::fmt::Debug for AnyConnection {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        match self {
81            #[cfg(feature = "sqlite")]
82            AnyConnection::Sqlite(_) => write!(f, "AnyConnection::Sqlite"),
83            #[cfg(feature = "mysql")]
84            AnyConnection::MySql(_) => write!(f, "AnyConnection::MySql"),
85        }
86    }
87}
88
89pub struct AnyQueryResult {
90    pub rows_affected: u64,
91    pub last_insert_id: Option<i64>,
92}
93
94impl AnyQueryResult {
95    pub fn rows_affected(&self) -> u64 {
96        self.rows_affected
97    }
98
99    pub fn last_insert_id(&self) -> Option<i64> {
100        self.last_insert_id
101    }
102}
103
104#[cfg(feature = "mysql")]
105#[derive(Clone)]
106pub struct MySqlPool(pub crate::sql::driver::mysql::MySqlPool);
107
108#[cfg(feature = "mysql")]
109impl MySqlPool {
110    pub async fn connect(url: &str) -> Result<Self, Error> {
111        let parsed = parse_mysql_url(url)?;
112        let pool = crate::sql::driver::mysql::MySqlPool::new(
113            &parsed.host,
114            parsed.port,
115            &parsed.user,
116            &parsed.password,
117            &parsed.database,
118        );
119        {
120            let mut conn = pool.acquire().map_err(|e| Error::Database(e.to_string()))?;
121            let _ping = conn.execute("SELECT 1", &[]).map_err(|e| Error::Database(e.to_string()))?;
122        }
123        Ok(MySqlPool(pool))
124    }
125}
126
127impl AnyPool {
128    pub async fn connect(url: &str) -> Result<Self, Error> {
129        if url.starts_with("sqlite:") {
130            #[cfg(feature = "sqlite")]
131            {
132                let path = url.trim_start_matches("sqlite:")
133                    .split('?')
134                    .next()
135                    .unwrap_or(url);
136                if let Some(parent) = std::path::Path::new(path).parent() {
137                    let _ = std::fs::create_dir_all(parent);
138                }
139                let conn = crate::sql::driver::sqlite::SqliteConnection::connect(path)
140                    .map_err(|e| Error::Database(e.to_string()))?;
141
142                let inner = Arc::new(SqlitePoolInner {
143                    path: path.to_string(),
144                    connections: tokio::sync::Mutex::new(vec![conn]),
145                });
146                Ok(AnyPool::Sqlite(inner))
147            }
148            #[cfg(not(feature = "sqlite"))]
149            {
150                let _ = url;
151                Err(Error::Database("sqlite feature not enabled".to_string()))
152            }
153        } else if url.starts_with("mysql://") {
154            #[cfg(feature = "mysql")]
155            {
156                let parsed = parse_mysql_url(url)?;
157                let pool = crate::sql::driver::mysql::MySqlPool::new(
158                    &parsed.host,
159                    parsed.port,
160                    &parsed.user,
161                    &parsed.password,
162                    &parsed.database,
163                );
164                {
165                    let mut conn = pool.acquire().map_err(|e| Error::Database(e.to_string()))?;
166                    let _ping = conn.execute("SELECT 1", &[]).map_err(|e| Error::Database(e.to_string()))?;
167                }
168                Ok(AnyPool::MySql(pool))
169            }
170            #[cfg(not(feature = "mysql"))]
171            {
172                let _ = url;
173                Err(Error::Database(
174                    "DB_CONNECTION=mysql terdeteksi, tapi fitur mysql belum aktif.\n\
175                     Tambahkan features = [\"mysql\"] pada rustbasic-core di Cargo.toml project Anda".to_string()
176                ))
177            }
178        } else {
179            Err(Error::Database(format!("Unsupported database URL prefix: {}", url)))
180        }
181    }
182
183    pub async fn acquire(&self) -> Result<PoolConnection, Error> {
184        let conn = match self {
185            #[cfg(feature = "sqlite")]
186            AnyPool::Sqlite(pool) => {
187                let mut conns = pool.connections.lock().await;
188                let conn = if let Some(c) = conns.pop() {
189                    c
190                } else {
191                    let path = &pool.path;
192                    let c = crate::sql::driver::sqlite::SqliteConnection::connect(path)
193                        .map_err(|e| Error::Database(e.to_string()))?;
194                    c
195                };
196                AnyConnection::Sqlite(SqliteConnection {
197                    conn: Some(conn),
198                    pool: Some(pool.clone()),
199                })
200            }
201            #[cfg(feature = "mysql")]
202            AnyPool::MySql(pool) => {
203                let conn = pool.acquire()
204                    .map_err(|e| Error::Database(e.to_string()))?;
205                AnyConnection::MySql(Some(conn))
206            }
207        };
208        Ok(PoolConnection { conn })
209    }
210
211    pub fn backend_name(&self) -> &str {
212        match self {
213            #[cfg(feature = "sqlite")]
214            AnyPool::Sqlite(_) => "SQLite",
215            #[cfg(feature = "mysql")]
216            AnyPool::MySql(_) => "MySQL",
217        }
218    }
219}
220
221impl AnyConnection {
222    pub fn backend_name(&self) -> &str {
223        match self {
224            #[cfg(feature = "sqlite")]
225            AnyConnection::Sqlite(_) => "SQLite",
226            #[cfg(feature = "mysql")]
227            AnyConnection::MySql(_) => "MySQL",
228        }
229    }
230}
231
232#[derive(Debug)]
233pub struct PoolConnection {
234    pub conn: AnyConnection,
235}
236
237impl std::ops::Deref for PoolConnection {
238    type Target = AnyConnection;
239    fn deref(&self) -> &Self::Target {
240        &self.conn
241    }
242}
243
244impl std::ops::DerefMut for PoolConnection {
245    fn deref_mut(&mut self) -> &mut Self::Target {
246        &mut self.conn
247    }
248}
249
250#[allow(async_fn_in_trait)]
251pub trait Executor {
252    type Database: Database;
253
254    async fn execute(self, sql: &str, arguments: &[Value]) -> Result<AnyQueryResult, Error>;
255    async fn fetch_all(self, sql: &str, arguments: &[Value]) -> Result<Vec<AnyRow>, Error>;
256    async fn fetch_optional(self, sql: &str, arguments: &[Value]) -> Result<Option<AnyRow>, Error>;
257    async fn fetch_one(self, sql: &str, arguments: &[Value]) -> Result<AnyRow, Error>;
258}
259
260impl<'c> Executor for &'c AnyPool {
261    type Database = Any;
262
263    async fn execute(self, sql: &str, arguments: &[Value]) -> Result<AnyQueryResult, Error> {
264        let mut conn = self.acquire().await?;
265        conn.execute_internal(sql, arguments).await
266    }
267
268    async fn fetch_all(self, sql: &str, arguments: &[Value]) -> Result<Vec<AnyRow>, Error> {
269        let mut conn = self.acquire().await?;
270        conn.fetch_all_internal(sql, arguments).await
271    }
272
273    async fn fetch_optional(self, sql: &str, arguments: &[Value]) -> Result<Option<AnyRow>, Error> {
274        let mut conn = self.acquire().await?;
275        conn.fetch_optional_internal(sql, arguments).await
276    }
277
278    async fn fetch_one(self, sql: &str, arguments: &[Value]) -> Result<AnyRow, Error> {
279        let mut conn = self.acquire().await?;
280        conn.fetch_one_internal(sql, arguments).await
281    }
282}
283
284impl<'c> Executor for &'c mut AnyConnection {
285    type Database = Any;
286
287    async fn execute(self, sql: &str, arguments: &[Value]) -> Result<AnyQueryResult, Error> {
288        self.execute_internal(sql, arguments).await
289    }
290
291    async fn fetch_all(self, sql: &str, arguments: &[Value]) -> Result<Vec<AnyRow>, Error> {
292        self.fetch_all_internal(sql, arguments).await
293    }
294
295    async fn fetch_optional(self, sql: &str, arguments: &[Value]) -> Result<Option<AnyRow>, Error> {
296        self.fetch_optional_internal(sql, arguments).await
297    }
298
299    async fn fetch_one(self, sql: &str, arguments: &[Value]) -> Result<AnyRow, Error> {
300        self.fetch_one_internal(sql, arguments).await
301    }
302}
303
304#[cfg(feature = "mysql")]
305impl<'c> Executor for &'c MySqlPool {
306    type Database = Any;
307
308    async fn execute(self, sql: &str, arguments: &[Value]) -> Result<AnyQueryResult, Error> {
309        let mut conn = self.0.acquire()
310            .map_err(|e| Error::Database(e.to_string()))?;
311        let sql_args: Vec<crate::sql::driver::SqlValue> = arguments.iter().map(json_to_sql_value).collect();
312        let res = conn.execute(sql, &sql_args)
313            .map_err(|e| Error::Database(e.to_string()))?;
314        Ok(AnyQueryResult {
315            rows_affected: res.rows_affected,
316            last_insert_id: Some(res.last_insert_id as i64),
317        })
318    }
319
320    async fn fetch_all(self, _sql: &str, _arguments: &[Value]) -> Result<Vec<AnyRow>, Error> {
321        Err(Error::Database("Not implemented for MySqlPool".to_string()))
322    }
323
324    async fn fetch_optional(self, _sql: &str, _arguments: &[Value]) -> Result<Option<AnyRow>, Error> {
325        Err(Error::Database("Not implemented for MySqlPool".to_string()))
326    }
327
328    async fn fetch_one(self, _sql: &str, _arguments: &[Value]) -> Result<AnyRow, Error> {
329        Err(Error::Database("Not implemented for MySqlPool".to_string()))
330    }
331}
332
333impl AnyConnection {
334    pub async fn execute_internal(&mut self, sql: &str, arguments: &[Value]) -> Result<AnyQueryResult, Error> {
335        let sql_args: Vec<crate::sql::driver::SqlValue> = arguments.iter().map(json_to_sql_value).collect();
336        match self {
337            #[cfg(feature = "sqlite")]
338            AnyConnection::Sqlite(s_conn) => {
339                let mut conn = s_conn.conn.take().ok_or_else(|| Error::Database("SQLite connection already used or dropped".to_string()))?;
340                let sql_str = sql.to_string();
341                let (conn_ret, res) = tokio::task::spawn_blocking(move || {
342                    let result = conn.execute(&sql_str, &sql_args)
343                        .map_err(|e| Error::Database(e.to_string()));
344                    (conn, result)
345                }).await.map_err(|e| Error::Database(e.to_string()))?;
346                s_conn.conn = Some(conn_ret);
347                let query_res = res?;
348                Ok(AnyQueryResult {
349                    rows_affected: query_res.rows_affected,
350                    last_insert_id: Some(query_res.last_insert_id as i64),
351                })
352            }
353            #[cfg(feature = "mysql")]
354            AnyConnection::MySql(m_conn) => {
355                let mut conn = m_conn.take().ok_or_else(|| Error::Database("MySQL connection already used or dropped".to_string()))?;
356                let sql_str = sql.to_string();
357                let (conn_ret, res) = tokio::task::spawn_blocking(move || {
358                    let result = conn.execute(&sql_str, &sql_args)
359                        .map_err(|e| Error::Database(e.to_string()));
360                    (conn, result)
361                }).await.map_err(|e| Error::Database(e.to_string()))?;
362                *m_conn = Some(conn_ret);
363                let query_res = res?;
364                Ok(AnyQueryResult {
365                    rows_affected: query_res.rows_affected,
366                    last_insert_id: Some(query_res.last_insert_id as i64),
367                })
368            }
369        }
370    }
371
372    pub async fn fetch_all_internal(&mut self, sql: &str, arguments: &[Value]) -> Result<Vec<AnyRow>, Error> {
373        let sql_args: Vec<crate::sql::driver::SqlValue> = arguments.iter().map(json_to_sql_value).collect();
374        match self {
375            #[cfg(feature = "sqlite")]
376            AnyConnection::Sqlite(s_conn) => {
377                let mut conn = s_conn.conn.take().ok_or_else(|| Error::Database("SQLite connection already used or dropped".to_string()))?;
378                let sql_str = sql.to_string();
379                let (conn_ret, res) = tokio::task::spawn_blocking(move || {
380                    let result = conn.query(&sql_str, &sql_args)
381                        .map_err(|e| Error::Database(e.to_string()));
382                    (conn, result)
383                }).await.map_err(|e| Error::Database(e.to_string()))?;
384                s_conn.conn = Some(conn_ret);
385                let rows = res?;
386                let any_rows = rows.into_iter().map(sql_row_to_any_row).collect();
387                Ok(any_rows)
388            }
389            #[cfg(feature = "mysql")]
390            AnyConnection::MySql(m_conn) => {
391                let mut conn = m_conn.take().ok_or_else(|| Error::Database("MySQL connection already used or dropped".to_string()))?;
392                let sql_str = sql.to_string();
393                let (conn_ret, res) = tokio::task::spawn_blocking(move || {
394                    let result = conn.query(&sql_str, &sql_args)
395                        .map_err(|e| Error::Database(e.to_string()));
396                    (conn, result)
397                }).await.map_err(|e| Error::Database(e.to_string()))?;
398                *m_conn = Some(conn_ret);
399                let rows = res?;
400                let any_rows = rows.into_iter().map(sql_row_to_any_row).collect();
401                Ok(any_rows)
402            }
403        }
404    }
405
406    pub async fn fetch_optional_internal(&mut self, sql: &str, arguments: &[Value]) -> Result<Option<AnyRow>, Error> {
407        let mut rows = self.fetch_all_internal(sql, arguments).await?;
408        if rows.is_empty() {
409            Ok(None)
410        } else {
411            Ok(Some(rows.remove(0)))
412        }
413    }
414
415    pub async fn fetch_one_internal(&mut self, sql: &str, arguments: &[Value]) -> Result<AnyRow, Error> {
416        let mut rows = self.fetch_all_internal(sql, arguments).await?;
417        if rows.is_empty() {
418            Err(Error::RowNotFound)
419        } else {
420            Ok(rows.remove(0))
421        }
422    }
423}
424
425fn json_to_sql_value(val: &Value) -> crate::sql::driver::SqlValue {
426    match val {
427        Value::Null => crate::sql::driver::SqlValue::Null,
428        Value::Bool(b) => crate::sql::driver::SqlValue::Integer(if *b { 1 } else { 0 }),
429        Value::Number(n) => {
430            if let Some(i) = n.as_i64() {
431                crate::sql::driver::SqlValue::Integer(i)
432            } else if let Some(f) = n.as_f64() {
433                crate::sql::driver::SqlValue::Real(f)
434            } else {
435                crate::sql::driver::SqlValue::Real(0.0)
436            }
437        }
438        Value::String(s) => crate::sql::driver::SqlValue::Text(s.clone()),
439        Value::Array(arr) => {
440            let bytes: Vec<u8> = arr.iter().filter_map(|v| v.as_u64().map(|b| b as u8)).collect();
441            crate::sql::driver::SqlValue::Blob(bytes)
442        }
443        _ => crate::sql::driver::SqlValue::Text(val.to_string()),
444    }
445}
446
447fn sql_row_to_any_row(row: crate::sql::driver::SqlRow) -> AnyRow {
448    let mut columns = Vec::new();
449    let mut values = Vec::new();
450
451    for col in &row.columns {
452        columns.push(AnyColumn {
453            name: col.name.clone(),
454            type_info: AnyTypeInfo {
455                name: "UNKNOWN".to_string(),
456            },
457        });
458    }
459
460    for val in &row.values {
461        let db_val = match val {
462            crate::sql::driver::SqlValue::Null => DbValue::Null,
463            crate::sql::driver::SqlValue::Text(s) => DbValue::Text(s.clone()),
464            crate::sql::driver::SqlValue::Blob(b) => DbValue::Blob(b.clone()),
465            crate::sql::driver::SqlValue::Integer(i) => DbValue::Integer(*i),
466            crate::sql::driver::SqlValue::Real(f) => DbValue::Real(*f),
467        };
468        values.push(db_val);
469    }
470
471    AnyRow { columns, values }
472}
473
474#[cfg(feature = "mysql")]
475struct MysqlUrl {
476    host: String,
477    port: u16,
478    user: String,
479    password: String,
480    database: String,
481}
482
483#[cfg(feature = "mysql")]
484fn parse_mysql_url(url: &str) -> Result<MysqlUrl, Error> {
485    if !url.starts_with("mysql://") {
486        return Err(Error::Database("Invalid MySQL URL scheme".into()));
487    }
488    let s = &url["mysql://".len()..];
489    
490    let (creds, host_db) = if let Some(idx) = s.find('@') {
491        (&s[..idx], &s[idx + 1..])
492    } else {
493        ("", s)
494    };
495    
496    let mut user = String::new();
497    let mut password = String::new();
498    if !creds.is_empty() {
499        if let Some(colon_idx) = creds.find(':') {
500            user = creds[..colon_idx].to_string();
501            password = creds[colon_idx + 1..].to_string();
502        } else {
503            user = creds.to_string();
504        }
505    }
506    
507    let (host_port, database) = if let Some(slash_idx) = host_db.find('/') {
508        (&host_db[..slash_idx], host_db[slash_idx + 1..].to_string())
509    } else {
510        (host_db, String::new())
511    };
512    
513    let mut host = "127.0.0.1".to_string();
514    let mut port = 3306;
515    if !host_port.is_empty() {
516        if let Some(colon_idx) = host_port.find(':') {
517            host = host_port[..colon_idx].to_string();
518            if let Ok(p) = host_port[colon_idx + 1..].parse::<u16>() {
519                port = p;
520            }
521        } else {
522            host = host_port.to_string();
523        }
524    }
525    
526    Ok(MysqlUrl {
527        host,
528        port,
529        user,
530        password,
531        database,
532    })
533}