1use crate::types::{ColumnValue, DatabaseConfig, DatabaseError, QueryResult, Row};
2use crate::vfs::IndexedDBVFS;
3use rusqlite::{Connection, Statement, params_from_iter};
4use std::time::Instant;
5
6#[cfg(feature = "fs_persist")]
7use crate::storage::BlockStorage;
8#[cfg(feature = "fs_persist")]
9use std::path::PathBuf;
10
11pub struct PreparedStatement<'conn> {
13 stmt: Statement<'conn>,
14}
15
16impl<'conn> PreparedStatement<'conn> {
17 pub async fn execute(&mut self, params: &[ColumnValue]) -> Result<QueryResult, DatabaseError> {
19 log::debug!(
20 "Executing prepared statement with {} parameters",
21 params.len()
22 );
23 let start_time = Instant::now();
24
25 let rusqlite_params: Vec<rusqlite::types::Value> =
27 params.iter().map(|p| p.to_rusqlite_value()).collect();
28
29 let mut result = QueryResult {
30 columns: Vec::new(),
31 rows: Vec::new(),
32 affected_rows: 0,
33 last_insert_id: None,
34 execution_time_ms: 0.0,
35 };
36
37 result.columns = self
39 .stmt
40 .column_names()
41 .iter()
42 .map(|name| name.to_string())
43 .collect();
44
45 let is_select = !result.columns.is_empty();
47
48 if is_select {
49 let rows = self
51 .stmt
52 .query_map(params_from_iter(rusqlite_params.iter()), |row| {
53 let mut values = Vec::new();
54 for i in 0..result.columns.len() {
55 let value = row.get_ref(i)?;
56 values.push(ColumnValue::from_rusqlite_value(&value.into()));
57 }
58 Ok(Row { values })
59 })
60 .map_err(DatabaseError::from)?;
61
62 for row in rows {
63 result.rows.push(row.map_err(DatabaseError::from)?);
64 }
65 } else {
66 self.stmt
68 .execute(params_from_iter(rusqlite_params.iter()))
69 .map_err(DatabaseError::from)?;
70
71 }
74
75 result.execution_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
76 log::debug!(
77 "Prepared statement executed in {:.2}ms, {} rows returned",
78 result.execution_time_ms,
79 result.rows.len()
80 );
81
82 Ok(result)
83 }
84
85 pub fn finalize(self) -> Result<(), DatabaseError> {
89 Ok(())
91 }
92}
93
94pub struct SqliteIndexedDB {
96 connection: Connection,
97 #[allow(dead_code)]
98 vfs: IndexedDBVFS,
99 config: DatabaseConfig,
100 #[cfg(feature = "fs_persist")]
101 storage: BlockStorage,
102 transaction_depth: u32,
104}
105
106impl SqliteIndexedDB {
107 pub async fn new(config: DatabaseConfig) -> Result<Self, DatabaseError> {
108 log::info!("Creating SQLiteIndexedDB with config: {:?}", config);
109
110 let vfs = IndexedDBVFS::new(&config.name).await?;
112
113 #[cfg(feature = "fs_persist")]
115 {
116 let storage_name = config
118 .name
119 .strip_suffix(".db")
120 .unwrap_or(&config.name)
121 .to_string();
122
123 let storage = BlockStorage::new(&storage_name)
125 .await
126 .map_err(|e| DatabaseError::new("BLOCKSTORAGE_ERROR", &e.to_string()))?;
127
128 let base_dir = std::env::var("ABSURDERSQL_FS_BASE")
130 .unwrap_or_else(|_| "./absurdersql_storage".to_string());
131
132 let db_file_path = PathBuf::from(base_dir)
134 .join(&storage_name)
135 .join("database.sqlite");
136
137 if let Some(parent) = db_file_path.parent() {
139 std::fs::create_dir_all(parent).map_err(|e| {
140 DatabaseError::new("IO_ERROR", &format!("Failed to create directory: {}", e))
141 })?;
142 }
143
144 let connection = Connection::open(&db_file_path).map_err(DatabaseError::from)?;
146
147 log::info!(
148 "Native database opened with filesystem persistence: {:?}",
149 db_file_path
150 );
151
152 Self::configure_connection(connection, vfs, config, storage)
153 }
154
155 #[cfg(not(feature = "fs_persist"))]
157 {
158 let connection = Connection::open_in_memory().map_err(|e| DatabaseError::from(e))?;
159
160 return Self::configure_connection(connection, vfs, config);
161 }
162 }
163
164 #[cfg(feature = "fs_persist")]
165 fn configure_connection(
166 connection: Connection,
167 vfs: IndexedDBVFS,
168 config: DatabaseConfig,
169 storage: BlockStorage,
170 ) -> Result<Self, DatabaseError> {
171 let mut instance = Self {
172 connection,
173 vfs,
174 config,
175 storage,
176 transaction_depth: 0,
177 };
178 instance.apply_pragmas()?;
179 Ok(instance)
180 }
181
182 #[cfg(not(feature = "fs_persist"))]
183 fn configure_connection(
184 connection: Connection,
185 vfs: IndexedDBVFS,
186 config: DatabaseConfig,
187 ) -> Result<Self, DatabaseError> {
188 let mut instance = Self {
189 connection,
190 vfs,
191 config,
192 transaction_depth: 0,
193 };
194 instance.apply_pragmas()?;
195 Ok(instance)
196 }
197
198 fn apply_pragmas(&mut self) -> Result<(), DatabaseError> {
199 if let Some(cache_size) = self.config.cache_size {
201 let sql = format!("PRAGMA cache_size = {}", cache_size);
202 log::debug!("Setting cache_size: {}", sql);
203 let mut stmt = self.connection.prepare(&sql).map_err(|e| {
204 log::warn!("Failed to prepare cache_size statement: {:?}", e);
205 DatabaseError::from(e)
206 })?;
207 let _ = stmt.query_map([], |_| Ok(())).map_err(|e| {
208 log::warn!("Failed to set cache_size: {:?}", e);
209 DatabaseError::from(e)
210 })?;
211 }
212
213 if let Some(page_size) = self.config.page_size {
214 let sql = format!("PRAGMA page_size = {}", page_size);
215 log::debug!("Setting page_size: {}", sql);
216 let mut stmt = self.connection.prepare(&sql).map_err(|e| {
217 log::warn!("Failed to prepare page_size statement: {:?}", e);
218 DatabaseError::from(e)
219 })?;
220 let _ = stmt.query_map([], |_| Ok(())).map_err(|e| {
221 log::warn!("Failed to set page_size: {:?}", e);
222 DatabaseError::from(e)
223 })?;
224 }
225
226 if let Some(journal_mode) = &self.config.journal_mode {
227 let sql = format!("PRAGMA journal_mode = {}", journal_mode);
228 log::debug!("Setting journal_mode: {}", sql);
229 let mut stmt = self.connection.prepare(&sql).map_err(|e| {
230 log::warn!("Failed to prepare journal_mode statement: {:?}", e);
231 DatabaseError::from(e)
232 })?;
233 let _ = stmt.query_map([], |_| Ok(())).map_err(|e| {
234 log::warn!("Failed to set journal_mode: {:?}", e);
235 DatabaseError::from(e)
236 })?;
237 }
238
239 log::info!("SQLiteIndexedDB configured successfully");
240 Ok(())
241 }
242
243 pub async fn execute(&mut self, sql: &str) -> Result<QueryResult, DatabaseError> {
244 self.execute_with_params(sql, &[]).await
245 }
246
247 pub fn prepare(&mut self, sql: &str) -> Result<PreparedStatement<'_>, DatabaseError> {
263 log::debug!("Preparing SQL statement: {}", sql);
264 let stmt = self
265 .connection
266 .prepare(sql)
267 .map_err(|e| DatabaseError::from(e).with_sql(sql))?;
268 Ok(PreparedStatement { stmt })
269 }
270
271 pub async fn execute_with_params(
272 &mut self,
273 sql: &str,
274 params: &[ColumnValue],
275 ) -> Result<QueryResult, DatabaseError> {
276 log::debug!("Executing SQL: {}", sql);
277 let start_time = Instant::now();
278
279 let rusqlite_params: Vec<rusqlite::types::Value> =
281 params.iter().map(|p| p.to_rusqlite_value()).collect();
282
283 let trimmed_sql = sql.trim_start().to_lowercase();
285 let is_select = trimmed_sql.starts_with("select")
286 || trimmed_sql.starts_with("with")
287 || trimmed_sql.starts_with("pragma");
288
289 let mut result = QueryResult {
290 columns: Vec::new(),
291 rows: Vec::new(),
292 affected_rows: 0,
293 last_insert_id: None,
294 execution_time_ms: 0.0,
295 };
296
297 if is_select {
298 let mut stmt = self
300 .connection
301 .prepare(sql)
302 .map_err(|e| DatabaseError::from(e).with_sql(sql))?;
303
304 result.columns = stmt
306 .column_names()
307 .iter()
308 .map(|name| name.to_string())
309 .collect();
310
311 let rows = stmt
313 .query_map(params_from_iter(rusqlite_params.iter()), |row| {
314 let mut values = Vec::new();
315 for i in 0..result.columns.len() {
316 let value = row.get_ref(i)?;
317 values.push(ColumnValue::from_rusqlite_value(&value.into()));
318 }
319 Ok(Row { values })
320 })
321 .map_err(|e| DatabaseError::from(e).with_sql(sql))?;
322
323 for row in rows {
324 result
325 .rows
326 .push(row.map_err(|e| DatabaseError::from(e).with_sql(sql))?);
327 }
328 } else {
329 let changes = self
331 .connection
332 .execute(sql, params_from_iter(rusqlite_params.iter()))
333 .map_err(|e| DatabaseError::from(e).with_sql(sql))?;
334
335 result.affected_rows = changes as u32;
336
337 if trimmed_sql.starts_with("insert") {
339 result.last_insert_id = Some(self.connection.last_insert_rowid());
340 }
341 }
342
343 result.execution_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
344
345 log::debug!(
346 "SQL executed in {:.2}ms, {} rows affected/returned",
347 result.execution_time_ms,
348 if is_select {
349 result.rows.len()
350 } else {
351 result.affected_rows as usize
352 }
353 );
354
355 if trimmed_sql.starts_with("begin") {
357 self.transaction_depth += 1;
358 log::debug!("Transaction BEGIN, depth now: {}", self.transaction_depth);
359 } else if trimmed_sql.starts_with("commit") || trimmed_sql.starts_with("end") {
360 if self.transaction_depth > 0 {
361 self.transaction_depth -= 1;
362 log::debug!("Transaction COMMIT, depth now: {}", self.transaction_depth);
363 }
364 } else if trimmed_sql.starts_with("rollback") && self.transaction_depth > 0 {
365 self.transaction_depth -= 1;
366 log::debug!(
367 "Transaction ROLLBACK, depth now: {}",
368 self.transaction_depth
369 );
370 }
371
372 if !is_select && self.transaction_depth == 0 {
374 self.sync().await?;
375 }
376
377 Ok(result)
378 }
379
380 pub async fn execute_batch(&mut self, statements: &[String]) -> Result<(), DatabaseError> {
384 log::debug!("Executing batch of {} statements", statements.len());
385 let start_time = Instant::now();
386
387 for (i, sql) in statements.iter().enumerate() {
388 self.execute(sql).await.map_err(|e| {
389 log::error!("Batch execution failed at statement {}: {}", i, sql);
390 e.with_sql(sql)
391 })?;
392 }
393
394 let duration = start_time.elapsed().as_secs_f64() * 1000.0;
395 log::debug!(
396 "Batch of {} statements executed in {:.2}ms",
397 statements.len(),
398 duration
399 );
400
401 Ok(())
402 }
403
404 pub async fn sync(&mut self) -> Result<(), DatabaseError> {
405 #[cfg(feature = "fs_persist")]
406 {
407 log::debug!("Syncing database to filesystem");
408 self.storage
409 .sync()
410 .await
411 .map_err(|e| DatabaseError::new("SYNC_ERROR", &e.to_string()))?;
412 }
413
414 #[cfg(not(feature = "fs_persist"))]
415 {
416 log::debug!("Native mode without fs_persist - no sync needed");
419 }
420
421 Ok(())
422 }
423
424 pub async fn close(&mut self) -> Result<(), DatabaseError> {
425 log::info!("Closing database");
426 self.sync().await?;
427 Ok(())
429 }
430
431 pub fn get_connection(&self) -> &Connection {
432 &self.connection
433 }
434
435 #[cfg(feature = "fs_persist")]
437 pub fn get_storage(&self) -> &BlockStorage {
438 &self.storage
439 }
440
441 #[cfg(all(
452 not(target_arch = "wasm32"),
453 any(
454 feature = "encryption",
455 feature = "encryption-commoncrypto",
456 feature = "encryption-ios"
457 )
458 ))]
459 pub async fn new_encrypted(config: DatabaseConfig, key: &str) -> Result<Self, DatabaseError> {
460 log::info!(
461 "Creating encrypted SQLiteIndexedDB with config: {:?}",
462 config
463 );
464
465 if key.len() < 8 {
467 return Err(DatabaseError::new(
468 "ENCRYPTION_ERROR",
469 "Encryption key must be at least 8 characters long",
470 ));
471 }
472
473 let vfs_name = format!("{}_vfs_metadata", config.name);
476 let vfs = IndexedDBVFS::new(&vfs_name).await?;
477
478 #[cfg(feature = "fs_persist")]
480 {
481 let storage_name = format!(
483 "{}_vfs_storage",
484 config.name.strip_suffix(".db").unwrap_or(&config.name)
485 );
486
487 let storage = BlockStorage::new(&storage_name)
489 .await
490 .map_err(|e| DatabaseError::new("BLOCKSTORAGE_ERROR", &e.to_string()))?;
491
492 let db_file_path = PathBuf::from(&config.name);
494
495 if let Some(parent) = db_file_path.parent() {
497 std::fs::create_dir_all(parent).map_err(|e| {
498 DatabaseError::new("IO_ERROR", &format!("Failed to create directory: {}", e))
499 })?;
500 }
501
502 if db_file_path.exists() && db_file_path.is_dir() {
506 std::fs::remove_dir_all(&db_file_path).map_err(|e| {
507 DatabaseError::new(
508 "IO_ERROR",
509 &format!("Failed to remove existing VFS directory: {}", e),
510 )
511 })?;
512 }
513
514 let connection = Connection::open(&db_file_path).map_err(DatabaseError::from)?;
516
517 let escaped_key = key.replace("'", "''");
520 connection
521 .execute_batch(&format!("PRAGMA key = '{}';", escaped_key))
522 .map_err(|e| {
523 DatabaseError::new(
524 "ENCRYPTION_ERROR",
525 &format!("Failed to set encryption key: {}", e),
526 )
527 })?;
528
529 connection
531 .execute(
532 "CREATE TABLE IF NOT EXISTS _encryption_check (id INTEGER PRIMARY KEY)",
533 [],
534 )
535 .map_err(|e| {
536 DatabaseError::new(
537 "ENCRYPTION_ERROR",
538 &format!("Failed to verify encryption: {}", e),
539 )
540 })?;
541
542 connection
544 .execute("DROP TABLE _encryption_check", [])
545 .map_err(|e| {
546 DatabaseError::new(
547 "ENCRYPTION_ERROR",
548 &format!("Failed to cleanup test table: {}", e),
549 )
550 })?;
551
552 log::info!(
553 "Encrypted native database opened with filesystem persistence: {:?}",
554 db_file_path
555 );
556
557 Self::configure_connection(connection, vfs, config, storage)
558 }
559
560 #[cfg(not(feature = "fs_persist"))]
562 {
563 let connection = Connection::open_in_memory().map_err(|e| DatabaseError::from(e))?;
564
565 let escaped_key = key.replace("'", "''");
567 connection
568 .execute_batch(&format!("PRAGMA key = '{}';", escaped_key))
569 .map_err(|e| {
570 DatabaseError::new(
571 "ENCRYPTION_ERROR",
572 &format!("Failed to set encryption key: {}", e),
573 )
574 })?;
575
576 return Self::configure_connection(connection, vfs, config);
577 }
578 }
579
580 #[cfg(all(
590 not(target_arch = "wasm32"),
591 any(
592 feature = "encryption",
593 feature = "encryption-commoncrypto",
594 feature = "encryption-ios"
595 )
596 ))]
597 pub async fn rekey(&self, new_key: &str) -> Result<(), DatabaseError> {
598 log::info!("Rekeying encrypted database");
599
600 if new_key.len() < 8 {
602 return Err(DatabaseError::new(
603 "ENCRYPTION_ERROR",
604 "New encryption key must be at least 8 characters long",
605 ));
606 }
607
608 let escaped_key = new_key.replace("'", "''");
610
611 self.connection
613 .execute_batch(&format!("PRAGMA rekey = '{}';", escaped_key))
614 .map_err(|e| {
615 DatabaseError::new(
616 "ENCRYPTION_ERROR",
617 &format!("Failed to rekey database: {}", e),
618 )
619 })?;
620
621 self.connection
623 .execute_batch("PRAGMA cipher_version;")
624 .map_err(|e| {
625 DatabaseError::new(
626 "ENCRYPTION_ERROR",
627 &format!("New key verification failed: {}", e),
628 )
629 })?;
630
631 log::info!("Successfully rekeyed database");
632 Ok(())
633 }
634}