1use std::sync::Arc;
2use serde_json::Value;
3use super::error::Error;
4use super::row::{AnyColumn, AnyTypeInfo, DbValue};
5pub use super::row::AnyRow;
6
/// Unit type naming the driver-agnostic backend. It is used as the
/// `Executor::Database` associated type by the implementations in this file.
pub struct Any;

/// Marker trait for database backends; it declares no methods.
pub trait Database {}

impl Database for Any {}
11
/// Placeholder for a bound-argument list, tied to the query lifetime `'q`.
///
/// The struct stores no data; the executors in this file receive their bind
/// values as `&[serde_json::Value]` slices instead. `Debug` and `Default` are
/// derived so the placeholder can be logged and created with
/// `AnyArguments::default()` without naming the marker field.
#[derive(Debug, Default)]
pub struct AnyArguments<'q> {
    /// Zero-sized marker that only carries the `'q` lifetime.
    pub _marker: std::marker::PhantomData<&'q ()>,
}
15
/// Does nothing. Backends in this file are selected at compile time through
/// the `sqlite` / `mysql` Cargo features, so no runtime registration happens
/// here.
pub fn install_default_drivers() {
    // Intentionally empty.
}
17
/// Connection pool handle for whichever backend is compiled in.
///
/// Each variant exists only when its Cargo feature is enabled.
#[derive(Clone)]
pub enum AnyPool {
    /// SQLite pool; clones share the same `SqlitePoolInner` through the `Arc`.
    #[cfg(feature = "sqlite")]
    Sqlite(Arc<SqlitePoolInner>),
    /// MySQL pool provided by the crate's MySQL driver.
    #[cfg(feature = "mysql")]
    MySql(crate::sql::driver::mysql::MySqlPool),
}
25
26impl std::fmt::Debug for AnyPool {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 match self {
29 #[cfg(feature = "sqlite")]
30 AnyPool::Sqlite(inner) => f.debug_struct("SqlitePool")
31 .field("path", &inner.path)
32 .finish(),
33 #[cfg(feature = "mysql")]
34 AnyPool::MySql(_) => write!(f, "MySqlPool"),
35 }
36 }
37}
38
/// Shared state behind `AnyPool::Sqlite`.
#[cfg(feature = "sqlite")]
pub struct SqlitePoolInner {
    /// Database path handed to the driver whenever a connection is opened.
    pub path: String,
    /// Idle connections available for reuse, guarded by an async mutex.
    pub connections: tokio::sync::Mutex<Vec<crate::sql::driver::sqlite::SqliteConnection>>,
}
44
/// A SQLite driver connection together with the pool it was checked out of.
pub struct SqliteConnection {
    /// Driver connection; `None` while a query has taken it out or after it
    /// has been handed back.
    #[cfg(feature = "sqlite")]
    pub conn: Option<crate::sql::driver::sqlite::SqliteConnection>,
    /// Pool the connection is handed back to on drop, if any.
    #[cfg(feature = "sqlite")]
    pub pool: Option<Arc<SqlitePoolInner>>,
}

impl std::fmt::Debug for SqliteConnection {
    /// Prints only the type name; the fields are not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("SqliteConnection")
    }
}
57
58impl Drop for SqliteConnection {
59 fn drop(&mut self) {
60 #[cfg(feature = "sqlite")]
61 if let (Some(conn), Some(pool)) = (self.conn.take(), &self.pool) {
62 let pool = pool.clone();
63 tokio::spawn(async move {
64 let mut conns = pool.connections.lock().await;
65 conns.push(conn);
66 });
67 }
68 }
69}
70
/// A single checked-out connection to whichever backend is compiled in.
pub enum AnyConnection {
    /// SQLite connection wrapper.
    #[cfg(feature = "sqlite")]
    Sqlite(SqliteConnection),
    /// MySQL driver connection; `None` while a query has taken it out (the
    /// `*_internal` methods take it and put it back).
    #[cfg(feature = "mysql")]
    MySql(Option<crate::sql::driver::mysql::PoolConnection>),
}
77
78impl std::fmt::Debug for AnyConnection {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 match self {
81 #[cfg(feature = "sqlite")]
82 AnyConnection::Sqlite(_) => write!(f, "AnyConnection::Sqlite"),
83 #[cfg(feature = "mysql")]
84 AnyConnection::MySql(_) => write!(f, "AnyConnection::MySql"),
85 }
86 }
87}
88
/// Outcome of a statement that does not return rows.
///
/// The common value traits are derived so results can be logged, copied,
/// compared in tests and created with `AnyQueryResult::default()`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AnyQueryResult {
    /// Number of rows affected, as reported by the driver.
    pub rows_affected: u64,
    /// Insert id reported by the driver, when one is available.
    pub last_insert_id: Option<i64>,
}

impl AnyQueryResult {
    /// Returns the number of rows the statement affected.
    pub fn rows_affected(&self) -> u64 {
        self.rows_affected
    }

    /// Returns the insert id recorded for the statement, if any.
    pub fn last_insert_id(&self) -> Option<i64> {
        self.last_insert_id
    }
}
103
/// Newtype over the driver's MySQL pool; `Executor` is implemented for
/// `&MySqlPool` in this file.
#[cfg(feature = "mysql")]
#[derive(Clone)]
pub struct MySqlPool(pub crate::sql::driver::mysql::MySqlPool);

#[cfg(feature = "mysql")]
impl MySqlPool {
    /// Builds a pool from a `mysql://` URL and verifies it with `SELECT 1`.
    ///
    /// # Errors
    /// Returns the error from `parse_mysql_url` when the URL is rejected, or
    /// `Error::Database` when acquiring the probe connection or running the
    /// probe statement fails.
    ///
    /// NOTE(review): the driver calls below are synchronous and run directly
    /// on the calling task — confirm that is acceptable for callers.
    pub async fn connect(url: &str) -> Result<Self, Error> {
        let MysqlUrl { host, port, user, password, database } = parse_mysql_url(url)?;
        let inner =
            crate::sql::driver::mysql::MySqlPool::new(&host, port, &user, &password, &database);

        // Probe the server once; the probe connection is released before the
        // pool is handed to the caller.
        let mut probe = inner
            .acquire()
            .map_err(|e| Error::Database(e.to_string()))?;
        probe
            .execute("SELECT 1", &[])
            .map_err(|e| Error::Database(e.to_string()))?;
        drop(probe);

        Ok(Self(inner))
    }
}
126
127impl AnyPool {
128 pub async fn connect(url: &str) -> Result<Self, Error> {
129 if url.starts_with("sqlite:") {
130 #[cfg(feature = "sqlite")]
131 {
132 let path = url.trim_start_matches("sqlite:")
133 .split('?')
134 .next()
135 .unwrap_or(url);
136 if let Some(parent) = std::path::Path::new(path).parent() {
137 let _ = std::fs::create_dir_all(parent);
138 }
139 let conn = crate::sql::driver::sqlite::SqliteConnection::connect(path)
140 .map_err(|e| Error::Database(e.to_string()))?;
141
142 let inner = Arc::new(SqlitePoolInner {
143 path: path.to_string(),
144 connections: tokio::sync::Mutex::new(vec![conn]),
145 });
146 Ok(AnyPool::Sqlite(inner))
147 }
148 #[cfg(not(feature = "sqlite"))]
149 {
150 let _ = url;
151 Err(Error::Database("sqlite feature not enabled".to_string()))
152 }
153 } else if url.starts_with("mysql://") {
154 #[cfg(feature = "mysql")]
155 {
156 let parsed = parse_mysql_url(url)?;
157 let pool = crate::sql::driver::mysql::MySqlPool::new(
158 &parsed.host,
159 parsed.port,
160 &parsed.user,
161 &parsed.password,
162 &parsed.database,
163 );
164 {
165 let mut conn = pool.acquire().map_err(|e| Error::Database(e.to_string()))?;
166 let _ping = conn.execute("SELECT 1", &[]).map_err(|e| Error::Database(e.to_string()))?;
167 }
168 Ok(AnyPool::MySql(pool))
169 }
170 #[cfg(not(feature = "mysql"))]
171 {
172 let _ = url;
173 Err(Error::Database(
174 "DB_CONNECTION=mysql terdeteksi, tapi fitur mysql belum aktif.\n\
175 Tambahkan features = [\"mysql\"] pada rustbasic-core di Cargo.toml project Anda".to_string()
176 ))
177 }
178 } else {
179 Err(Error::Database(format!("Unsupported database URL prefix: {}", url)))
180 }
181 }
182
183 pub async fn acquire(&self) -> Result<PoolConnection, Error> {
184 let conn = match self {
185 #[cfg(feature = "sqlite")]
186 AnyPool::Sqlite(pool) => {
187 let mut conns = pool.connections.lock().await;
188 let conn = if let Some(c) = conns.pop() {
189 c
190 } else {
191 let path = &pool.path;
192 let c = crate::sql::driver::sqlite::SqliteConnection::connect(path)
193 .map_err(|e| Error::Database(e.to_string()))?;
194 c
195 };
196 AnyConnection::Sqlite(SqliteConnection {
197 conn: Some(conn),
198 pool: Some(pool.clone()),
199 })
200 }
201 #[cfg(feature = "mysql")]
202 AnyPool::MySql(pool) => {
203 let conn = pool.acquire()
204 .map_err(|e| Error::Database(e.to_string()))?;
205 AnyConnection::MySql(Some(conn))
206 }
207 };
208 Ok(PoolConnection { conn })
209 }
210
211 pub fn backend_name(&self) -> &str {
212 match self {
213 #[cfg(feature = "sqlite")]
214 AnyPool::Sqlite(_) => "SQLite",
215 #[cfg(feature = "mysql")]
216 AnyPool::MySql(_) => "MySQL",
217 }
218 }
219}
220
221impl AnyConnection {
222 pub fn backend_name(&self) -> &str {
223 match self {
224 #[cfg(feature = "sqlite")]
225 AnyConnection::Sqlite(_) => "SQLite",
226 #[cfg(feature = "mysql")]
227 AnyConnection::MySql(_) => "MySQL",
228 }
229 }
230}
231
232#[derive(Debug)]
233pub struct PoolConnection {
234 pub conn: AnyConnection,
235}
236
237impl std::ops::Deref for PoolConnection {
238 type Target = AnyConnection;
239 fn deref(&self) -> &Self::Target {
240 &self.conn
241 }
242}
243
244impl std::ops::DerefMut for PoolConnection {
245 fn deref_mut(&mut self) -> &mut Self::Target {
246 &mut self.conn
247 }
248}
249
// `async fn` in a public trait triggers the `async_fn_in_trait` lint, which
// warns that the returned futures carry no auto-trait bounds such as `Send`.
// The attribute silences that lint for this trait.
#[allow(async_fn_in_trait)]
/// Something that can run SQL with JSON-encoded bind arguments.
///
/// Every method takes `self` by value; the implementations in this file are
/// written for reference types (`&AnyPool`, `&mut AnyConnection`, ...).
pub trait Executor {
    /// Backend marker type associated with the executor.
    type Database: Database;

    /// Runs a statement and reports the affected-row count and insert id.
    async fn execute(self, sql: &str, arguments: &[Value]) -> Result<AnyQueryResult, Error>;
    /// Runs a query and returns every row it produced.
    async fn fetch_all(self, sql: &str, arguments: &[Value]) -> Result<Vec<AnyRow>, Error>;
    /// Runs a query and returns its first row, or `None` when there is none.
    async fn fetch_optional(self, sql: &str, arguments: &[Value]) -> Result<Option<AnyRow>, Error>;
    /// Runs a query and returns its first row, failing when there is none.
    async fn fetch_one(self, sql: &str, arguments: &[Value]) -> Result<AnyRow, Error>;
}
259
260impl<'c> Executor for &'c AnyPool {
261 type Database = Any;
262
263 async fn execute(self, sql: &str, arguments: &[Value]) -> Result<AnyQueryResult, Error> {
264 let mut conn = self.acquire().await?;
265 conn.execute_internal(sql, arguments).await
266 }
267
268 async fn fetch_all(self, sql: &str, arguments: &[Value]) -> Result<Vec<AnyRow>, Error> {
269 let mut conn = self.acquire().await?;
270 conn.fetch_all_internal(sql, arguments).await
271 }
272
273 async fn fetch_optional(self, sql: &str, arguments: &[Value]) -> Result<Option<AnyRow>, Error> {
274 let mut conn = self.acquire().await?;
275 conn.fetch_optional_internal(sql, arguments).await
276 }
277
278 async fn fetch_one(self, sql: &str, arguments: &[Value]) -> Result<AnyRow, Error> {
279 let mut conn = self.acquire().await?;
280 conn.fetch_one_internal(sql, arguments).await
281 }
282}
283
284impl<'c> Executor for &'c mut AnyConnection {
285 type Database = Any;
286
287 async fn execute(self, sql: &str, arguments: &[Value]) -> Result<AnyQueryResult, Error> {
288 self.execute_internal(sql, arguments).await
289 }
290
291 async fn fetch_all(self, sql: &str, arguments: &[Value]) -> Result<Vec<AnyRow>, Error> {
292 self.fetch_all_internal(sql, arguments).await
293 }
294
295 async fn fetch_optional(self, sql: &str, arguments: &[Value]) -> Result<Option<AnyRow>, Error> {
296 self.fetch_optional_internal(sql, arguments).await
297 }
298
299 async fn fetch_one(self, sql: &str, arguments: &[Value]) -> Result<AnyRow, Error> {
300 self.fetch_one_internal(sql, arguments).await
301 }
302}
303
/// Executor for the MySQL-only pool wrapper. Only `execute` is supported;
/// the `fetch_*` methods report "Not implemented for MySqlPool".
#[cfg(feature = "mysql")]
impl<'c> Executor for &'c MySqlPool {
    type Database = Any;

    /// Runs a statement on a connection taken from the wrapped pool.
    ///
    /// The synchronous driver call runs inside `tokio::task::spawn_blocking`,
    /// the same way `AnyConnection::execute_internal` does it, so the async
    /// executor thread is not blocked while the server works. The connection
    /// is dropped on the blocking thread once the statement finishes.
    ///
    /// # Errors
    /// Returns `Error::Database` when no connection can be acquired, the
    /// blocking task fails to join, or the driver reports an error.
    async fn execute(self, sql: &str, arguments: &[Value]) -> Result<AnyQueryResult, Error> {
        let sql_args: Vec<crate::sql::driver::SqlValue> =
            arguments.iter().map(json_to_sql_value).collect();
        let mut conn = self
            .0
            .acquire()
            .map_err(|e| Error::Database(e.to_string()))?;
        let sql_str = sql.to_string();

        let res = tokio::task::spawn_blocking(move || {
            conn.execute(&sql_str, &sql_args)
                .map_err(|e| Error::Database(e.to_string()))
        })
        .await
        .map_err(|e| Error::Database(e.to_string()))??;

        Ok(AnyQueryResult {
            rows_affected: res.rows_affected,
            last_insert_id: Some(res.last_insert_id as i64),
        })
    }

    /// Always fails: row fetching is not implemented for `MySqlPool`.
    async fn fetch_all(self, _sql: &str, _arguments: &[Value]) -> Result<Vec<AnyRow>, Error> {
        Err(Error::Database("Not implemented for MySqlPool".to_string()))
    }

    /// Always fails: row fetching is not implemented for `MySqlPool`.
    async fn fetch_optional(self, _sql: &str, _arguments: &[Value]) -> Result<Option<AnyRow>, Error> {
        Err(Error::Database("Not implemented for MySqlPool".to_string()))
    }

    /// Always fails: row fetching is not implemented for `MySqlPool`.
    async fn fetch_one(self, _sql: &str, _arguments: &[Value]) -> Result<AnyRow, Error> {
        Err(Error::Database("Not implemented for MySqlPool".to_string()))
    }
}
332
333impl AnyConnection {
334 pub async fn execute_internal(&mut self, sql: &str, arguments: &[Value]) -> Result<AnyQueryResult, Error> {
335 let sql_args: Vec<crate::sql::driver::SqlValue> = arguments.iter().map(json_to_sql_value).collect();
336 match self {
337 #[cfg(feature = "sqlite")]
338 AnyConnection::Sqlite(s_conn) => {
339 let mut conn = s_conn.conn.take().ok_or_else(|| Error::Database("SQLite connection already used or dropped".to_string()))?;
340 let sql_str = sql.to_string();
341 let (conn_ret, res) = tokio::task::spawn_blocking(move || {
342 let result = conn.execute(&sql_str, &sql_args)
343 .map_err(|e| Error::Database(e.to_string()));
344 (conn, result)
345 }).await.map_err(|e| Error::Database(e.to_string()))?;
346 s_conn.conn = Some(conn_ret);
347 let query_res = res?;
348 Ok(AnyQueryResult {
349 rows_affected: query_res.rows_affected,
350 last_insert_id: Some(query_res.last_insert_id as i64),
351 })
352 }
353 #[cfg(feature = "mysql")]
354 AnyConnection::MySql(m_conn) => {
355 let mut conn = m_conn.take().ok_or_else(|| Error::Database("MySQL connection already used or dropped".to_string()))?;
356 let sql_str = sql.to_string();
357 let (conn_ret, res) = tokio::task::spawn_blocking(move || {
358 let result = conn.execute(&sql_str, &sql_args)
359 .map_err(|e| Error::Database(e.to_string()));
360 (conn, result)
361 }).await.map_err(|e| Error::Database(e.to_string()))?;
362 *m_conn = Some(conn_ret);
363 let query_res = res?;
364 Ok(AnyQueryResult {
365 rows_affected: query_res.rows_affected,
366 last_insert_id: Some(query_res.last_insert_id as i64),
367 })
368 }
369 }
370 }
371
372 pub async fn fetch_all_internal(&mut self, sql: &str, arguments: &[Value]) -> Result<Vec<AnyRow>, Error> {
373 let sql_args: Vec<crate::sql::driver::SqlValue> = arguments.iter().map(json_to_sql_value).collect();
374 match self {
375 #[cfg(feature = "sqlite")]
376 AnyConnection::Sqlite(s_conn) => {
377 let mut conn = s_conn.conn.take().ok_or_else(|| Error::Database("SQLite connection already used or dropped".to_string()))?;
378 let sql_str = sql.to_string();
379 let (conn_ret, res) = tokio::task::spawn_blocking(move || {
380 let result = conn.query(&sql_str, &sql_args)
381 .map_err(|e| Error::Database(e.to_string()));
382 (conn, result)
383 }).await.map_err(|e| Error::Database(e.to_string()))?;
384 s_conn.conn = Some(conn_ret);
385 let rows = res?;
386 let any_rows = rows.into_iter().map(sql_row_to_any_row).collect();
387 Ok(any_rows)
388 }
389 #[cfg(feature = "mysql")]
390 AnyConnection::MySql(m_conn) => {
391 let mut conn = m_conn.take().ok_or_else(|| Error::Database("MySQL connection already used or dropped".to_string()))?;
392 let sql_str = sql.to_string();
393 let (conn_ret, res) = tokio::task::spawn_blocking(move || {
394 let result = conn.query(&sql_str, &sql_args)
395 .map_err(|e| Error::Database(e.to_string()));
396 (conn, result)
397 }).await.map_err(|e| Error::Database(e.to_string()))?;
398 *m_conn = Some(conn_ret);
399 let rows = res?;
400 let any_rows = rows.into_iter().map(sql_row_to_any_row).collect();
401 Ok(any_rows)
402 }
403 }
404 }
405
406 pub async fn fetch_optional_internal(&mut self, sql: &str, arguments: &[Value]) -> Result<Option<AnyRow>, Error> {
407 let mut rows = self.fetch_all_internal(sql, arguments).await?;
408 if rows.is_empty() {
409 Ok(None)
410 } else {
411 Ok(Some(rows.remove(0)))
412 }
413 }
414
415 pub async fn fetch_one_internal(&mut self, sql: &str, arguments: &[Value]) -> Result<AnyRow, Error> {
416 let mut rows = self.fetch_all_internal(sql, arguments).await?;
417 if rows.is_empty() {
418 Err(Error::RowNotFound)
419 } else {
420 Ok(rows.remove(0))
421 }
422 }
423}
424
425fn json_to_sql_value(val: &Value) -> crate::sql::driver::SqlValue {
426 match val {
427 Value::Null => crate::sql::driver::SqlValue::Null,
428 Value::Bool(b) => crate::sql::driver::SqlValue::Integer(if *b { 1 } else { 0 }),
429 Value::Number(n) => {
430 if let Some(i) = n.as_i64() {
431 crate::sql::driver::SqlValue::Integer(i)
432 } else if let Some(f) = n.as_f64() {
433 crate::sql::driver::SqlValue::Real(f)
434 } else {
435 crate::sql::driver::SqlValue::Real(0.0)
436 }
437 }
438 Value::String(s) => crate::sql::driver::SqlValue::Text(s.clone()),
439 Value::Array(arr) => {
440 let bytes: Vec<u8> = arr.iter().filter_map(|v| v.as_u64().map(|b| b as u8)).collect();
441 crate::sql::driver::SqlValue::Blob(bytes)
442 }
443 _ => crate::sql::driver::SqlValue::Text(val.to_string()),
444 }
445}
446
447fn sql_row_to_any_row(row: crate::sql::driver::SqlRow) -> AnyRow {
448 let mut columns = Vec::new();
449 let mut values = Vec::new();
450
451 for col in &row.columns {
452 columns.push(AnyColumn {
453 name: col.name.clone(),
454 type_info: AnyTypeInfo {
455 name: "UNKNOWN".to_string(),
456 },
457 });
458 }
459
460 for val in &row.values {
461 let db_val = match val {
462 crate::sql::driver::SqlValue::Null => DbValue::Null,
463 crate::sql::driver::SqlValue::Text(s) => DbValue::Text(s.clone()),
464 crate::sql::driver::SqlValue::Blob(b) => DbValue::Blob(b.clone()),
465 crate::sql::driver::SqlValue::Integer(i) => DbValue::Integer(*i),
466 crate::sql::driver::SqlValue::Real(f) => DbValue::Real(*f),
467 };
468 values.push(db_val);
469 }
470
471 AnyRow { columns, values }
472}
473
/// Components extracted from a `mysql://` connection URL by
/// `parse_mysql_url`.
#[cfg(feature = "mysql")]
struct MysqlUrl {
    /// Server host name or address.
    host: String,
    /// Server TCP port.
    port: u16,
    /// User name; empty when the URL carries no credentials.
    user: String,
    /// Password; empty when the URL carries none.
    password: String,
    /// Database name; empty when the URL has no path component.
    database: String,
}
482
/// Parses `mysql://[user[:password]@][host][:port][/database][?query]`.
///
/// * Missing host defaults to `127.0.0.1`, missing port to `3306`.
/// * The password may contain `:` and `@` (it is split from the user at the
///   first `:` and from the host at the last `@` of the authority part).
/// * A trailing `?query` is discarded and never becomes part of the host or
///   database name.
/// * No percent-decoding is performed.
///
/// # Errors
/// Returns `Error::Database` when the scheme is not `mysql://` or when a
/// port is present but is not a valid `u16`.
#[cfg(feature = "mysql")]
fn parse_mysql_url(url: &str) -> Result<MysqlUrl, Error> {
    let Some(rest) = url.strip_prefix("mysql://") else {
        return Err(Error::Database("Invalid MySQL URL scheme".into()));
    };

    // Separate `user:password` from `host:port/db`. The split point is the
    // last '@' before the first '/' that follows the first '@': a host name
    // cannot contain '@', so any earlier '@' must belong to the password.
    let (creds, host_db) = match rest.find('@') {
        None => ("", rest),
        Some(first_at) => {
            let authority_end = rest[first_at..]
                .find('/')
                .map_or(rest.len(), |offset| first_at + offset);
            let at = rest[..authority_end].rfind('@').unwrap_or(first_at);
            (&rest[..at], &rest[at + 1..])
        }
    };

    // Split at the first ':' so the password itself may contain ':'.
    let (user, password) = match creds.split_once(':') {
        Some((user, password)) => (user.to_string(), password.to_string()),
        None => (creds.to_string(), String::new()),
    };

    // Connection options after '?' are not part of the host or database.
    let host_db = host_db.split('?').next().unwrap_or(host_db);
    let (host_port, database) = host_db.split_once('/').unwrap_or((host_db, ""));

    let (host_part, port_part) = host_port.split_once(':').unwrap_or((host_port, ""));
    let host = if host_part.is_empty() {
        "127.0.0.1".to_string()
    } else {
        host_part.to_string()
    };
    // An absent or empty port means "default"; a malformed one is an error
    // instead of a silent fallback to 3306.
    let port = if port_part.is_empty() {
        3306
    } else {
        port_part
            .parse::<u16>()
            .map_err(|_| Error::Database(format!("Invalid MySQL port: {}", port_part)))?
    };

    Ok(MysqlUrl {
        host,
        port,
        user,
        password,
        database: database.to_string(),
    })
}