1use crate::types::{DatabaseConfig, QueryResult, ColumnValue, Row, DatabaseError};
2use crate::vfs::IndexedDBVFS;
3use rusqlite::{Connection, params_from_iter, Statement};
4use std::time::Instant;
5
6#[cfg(feature = "fs_persist")]
7use crate::storage::BlockStorage;
8#[cfg(feature = "fs_persist")]
9use std::path::PathBuf;
10
11pub struct PreparedStatement<'conn> {
13 stmt: Statement<'conn>,
14}
15
16impl<'conn> PreparedStatement<'conn> {
17 pub async fn execute(&mut self, params: &[ColumnValue]) -> Result<QueryResult, DatabaseError> {
19 log::debug!("Executing prepared statement with {} parameters", params.len());
20 let start_time = Instant::now();
21
22 let rusqlite_params: Vec<rusqlite::types::Value> = params.iter()
24 .map(|p| p.to_rusqlite_value())
25 .collect();
26
27 let mut result = QueryResult {
28 columns: Vec::new(),
29 rows: Vec::new(),
30 affected_rows: 0,
31 last_insert_id: None,
32 execution_time_ms: 0.0,
33 };
34
35 result.columns = self.stmt.column_names().iter()
37 .map(|name| name.to_string())
38 .collect();
39
40 let is_select = !result.columns.is_empty();
42
43 if is_select {
44 let rows = self.stmt.query_map(params_from_iter(rusqlite_params.iter()), |row| {
46 let mut values = Vec::new();
47 for i in 0..result.columns.len() {
48 let value = row.get_ref(i)?;
49 values.push(ColumnValue::from_rusqlite_value(&value.into()));
50 }
51 Ok(Row { values })
52 }).map_err(|e| DatabaseError::from(e))?;
53
54 for row in rows {
55 result.rows.push(row.map_err(|e| DatabaseError::from(e))?);
56 }
57 } else {
58 self.stmt.execute(params_from_iter(rusqlite_params.iter()))
60 .map_err(|e| DatabaseError::from(e))?;
61
62 }
65
66 result.execution_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
67 log::debug!("Prepared statement executed in {:.2}ms, {} rows returned",
68 result.execution_time_ms, result.rows.len());
69
70 Ok(result)
71 }
72
73 pub fn finalize(self) -> Result<(), DatabaseError> {
77 Ok(())
79 }
80}
81
82pub struct SqliteIndexedDB {
84 connection: Connection,
85 #[allow(dead_code)]
86 vfs: IndexedDBVFS,
87 config: DatabaseConfig,
88 #[cfg(feature = "fs_persist")]
89 storage: BlockStorage,
90 transaction_depth: u32,
92}
93
94impl SqliteIndexedDB {
95 pub async fn new(config: DatabaseConfig) -> Result<Self, DatabaseError> {
96 log::info!("Creating SQLiteIndexedDB with config: {:?}", config);
97
98 let vfs = IndexedDBVFS::new(&config.name).await?;
100
101 #[cfg(feature = "fs_persist")]
103 {
104 let storage_name = config.name.strip_suffix(".db")
106 .unwrap_or(&config.name)
107 .to_string();
108
109 let storage = BlockStorage::new(&storage_name).await
111 .map_err(|e| DatabaseError::new("BLOCKSTORAGE_ERROR", &e.to_string()))?;
112
113 let base_dir = std::env::var("ABSURDERSQL_FS_BASE")
115 .unwrap_or_else(|_| "./absurdersql_storage".to_string());
116
117 let db_file_path = PathBuf::from(base_dir)
119 .join(&storage_name)
120 .join("database.sqlite");
121
122 if let Some(parent) = db_file_path.parent() {
124 std::fs::create_dir_all(parent)
125 .map_err(|e| DatabaseError::new("IO_ERROR", &format!("Failed to create directory: {}", e)))?;
126 }
127
128 let connection = Connection::open(&db_file_path)
130 .map_err(|e| DatabaseError::from(e))?;
131
132 log::info!("Native database opened with filesystem persistence: {:?}", db_file_path);
133
134 return Self::configure_connection(connection, vfs, config, storage);
135 }
136
137 #[cfg(not(feature = "fs_persist"))]
139 {
140 let connection = Connection::open_in_memory()
141 .map_err(|e| DatabaseError::from(e))?;
142
143 return Self::configure_connection(connection, vfs, config);
144 }
145 }
146
147 #[cfg(feature = "fs_persist")]
148 fn configure_connection(
149 connection: Connection,
150 vfs: IndexedDBVFS,
151 config: DatabaseConfig,
152 storage: BlockStorage,
153 ) -> Result<Self, DatabaseError> {
154 let mut instance = Self {
155 connection,
156 vfs,
157 config,
158 storage,
159 transaction_depth: 0,
160 };
161 instance.apply_pragmas()?;
162 Ok(instance)
163 }
164
165 #[cfg(not(feature = "fs_persist"))]
166 fn configure_connection(
167 connection: Connection,
168 vfs: IndexedDBVFS,
169 config: DatabaseConfig,
170 ) -> Result<Self, DatabaseError> {
171 let mut instance = Self {
172 connection,
173 vfs,
174 config,
175 transaction_depth: 0,
176 };
177 instance.apply_pragmas()?;
178 Ok(instance)
179 }
180
181 fn apply_pragmas(&mut self) -> Result<(), DatabaseError> {
182 if let Some(cache_size) = self.config.cache_size {
184 let sql = format!("PRAGMA cache_size = {}", cache_size);
185 log::debug!("Setting cache_size: {}", sql);
186 let mut stmt = self.connection.prepare(&sql)
187 .map_err(|e| {
188 log::warn!("Failed to prepare cache_size statement: {}", e);
189 DatabaseError::from(e)
190 })?;
191 let _ = stmt.query_map([], |_| Ok(()))
192 .map_err(|e| {
193 log::warn!("Failed to set cache_size: {}", e);
194 DatabaseError::from(e)
195 })?;
196 }
197
198 if let Some(page_size) = self.config.page_size {
199 let sql = format!("PRAGMA page_size = {}", page_size);
200 log::debug!("Setting page_size: {}", sql);
201 let mut stmt = self.connection.prepare(&sql)
202 .map_err(|e| {
203 log::warn!("Failed to prepare page_size statement: {}", e);
204 DatabaseError::from(e)
205 })?;
206 let _ = stmt.query_map([], |_| Ok(()))
207 .map_err(|e| {
208 log::warn!("Failed to set page_size: {}", e);
209 DatabaseError::from(e)
210 })?;
211 }
212
213 if let Some(journal_mode) = &self.config.journal_mode {
214 let sql = format!("PRAGMA journal_mode = {}", journal_mode);
215 log::debug!("Setting journal_mode: {}", sql);
216 let mut stmt = self.connection.prepare(&sql)
217 .map_err(|e| {
218 log::warn!("Failed to prepare journal_mode statement: {}", e);
219 DatabaseError::from(e)
220 })?;
221 let _ = stmt.query_map([], |_| Ok(()))
222 .map_err(|e| {
223 log::warn!("Failed to set journal_mode: {}", e);
224 DatabaseError::from(e)
225 })?;
226 }
227
228 log::info!("SQLiteIndexedDB configured successfully");
229 Ok(())
230 }
231
232 pub async fn execute(&mut self, sql: &str) -> Result<QueryResult, DatabaseError> {
233 self.execute_with_params(sql, &[]).await
234 }
235
236 pub fn prepare(&mut self, sql: &str) -> Result<PreparedStatement<'_>, DatabaseError> {
252 log::debug!("Preparing SQL statement: {}", sql);
253 let stmt = self.connection.prepare(sql)
254 .map_err(|e| DatabaseError::from(e).with_sql(sql))?;
255 Ok(PreparedStatement { stmt })
256 }
257
258 pub async fn execute_with_params(&mut self, sql: &str, params: &[ColumnValue]) -> Result<QueryResult, DatabaseError> {
259 log::debug!("Executing SQL: {}", sql);
260 let start_time = Instant::now();
261
262 let rusqlite_params: Vec<rusqlite::types::Value> = params.iter()
264 .map(|p| p.to_rusqlite_value())
265 .collect();
266
267 let trimmed_sql = sql.trim_start().to_lowercase();
269 let is_select = trimmed_sql.starts_with("select") ||
270 trimmed_sql.starts_with("with") ||
271 trimmed_sql.starts_with("pragma");
272
273 let mut result = QueryResult {
274 columns: Vec::new(),
275 rows: Vec::new(),
276 affected_rows: 0,
277 last_insert_id: None,
278 execution_time_ms: 0.0,
279 };
280
281 if is_select {
282 let mut stmt = self.connection.prepare(sql)
284 .map_err(|e| DatabaseError::from(e).with_sql(sql))?;
285
286 result.columns = stmt.column_names().iter()
288 .map(|name| name.to_string())
289 .collect();
290
291 let rows = stmt.query_map(params_from_iter(rusqlite_params.iter()), |row| {
293 let mut values = Vec::new();
294 for i in 0..result.columns.len() {
295 let value = row.get_ref(i)?;
296 values.push(ColumnValue::from_rusqlite_value(&value.into()));
297 }
298 Ok(Row { values })
299 }).map_err(|e| DatabaseError::from(e).with_sql(sql))?;
300
301 for row in rows {
302 result.rows.push(row.map_err(|e| DatabaseError::from(e).with_sql(sql))?);
303 }
304 } else {
305 let changes = self.connection.execute(sql, params_from_iter(rusqlite_params.iter()))
307 .map_err(|e| DatabaseError::from(e).with_sql(sql))?;
308
309 result.affected_rows = changes as u32;
310
311 if trimmed_sql.starts_with("insert") {
313 result.last_insert_id = Some(self.connection.last_insert_rowid());
314 }
315 }
316
317 result.execution_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
318
319 log::debug!("SQL executed in {:.2}ms, {} rows affected/returned",
320 result.execution_time_ms,
321 if is_select { result.rows.len() } else { result.affected_rows as usize });
322
323 if trimmed_sql.starts_with("begin") {
325 self.transaction_depth += 1;
326 log::debug!("Transaction BEGIN, depth now: {}", self.transaction_depth);
327 } else if trimmed_sql.starts_with("commit") || trimmed_sql.starts_with("end") {
328 if self.transaction_depth > 0 {
329 self.transaction_depth -= 1;
330 log::debug!("Transaction COMMIT, depth now: {}", self.transaction_depth);
331 }
332 } else if trimmed_sql.starts_with("rollback") {
333 if self.transaction_depth > 0 {
334 self.transaction_depth -= 1;
335 log::debug!("Transaction ROLLBACK, depth now: {}", self.transaction_depth);
336 }
337 }
338
339 if !is_select && self.transaction_depth == 0 {
341 self.sync().await?;
342 }
343
344 Ok(result)
345 }
346
347 pub async fn execute_batch(&mut self, statements: &[String]) -> Result<(), DatabaseError> {
351 log::debug!("Executing batch of {} statements", statements.len());
352 let start_time = Instant::now();
353
354 for (i, sql) in statements.iter().enumerate() {
355 self.execute(sql).await.map_err(|e| {
356 log::error!("Batch execution failed at statement {}: {}", i, sql);
357 e.with_sql(sql)
358 })?;
359 }
360
361 let duration = start_time.elapsed().as_secs_f64() * 1000.0;
362 log::debug!("Batch of {} statements executed in {:.2}ms", statements.len(), duration);
363
364 Ok(())
365 }
366
367 pub async fn sync(&mut self) -> Result<(), DatabaseError> {
368 #[cfg(feature = "fs_persist")]
369 {
370 log::debug!("Syncing database to filesystem");
371 self.storage.sync().await
372 .map_err(|e| DatabaseError::new("SYNC_ERROR", &e.to_string()))?;
373 }
374
375 #[cfg(not(feature = "fs_persist"))]
376 {
377 log::debug!("Native mode without fs_persist - no sync needed");
380 }
381
382 Ok(())
383 }
384
385 pub async fn close(&mut self) -> Result<(), DatabaseError> {
386 log::info!("Closing database");
387 self.sync().await?;
388 Ok(())
390 }
391
392 pub fn get_connection(&self) -> &Connection {
393 &self.connection
394 }
395
396 #[cfg(feature = "fs_persist")]
398 pub fn get_storage(&self) -> &BlockStorage {
399 &self.storage
400 }
401
402 #[cfg(all(not(target_arch = "wasm32"), any(feature = "encryption", feature = "encryption-commoncrypto", feature = "encryption-ios")))]
413 pub async fn new_encrypted(config: DatabaseConfig, key: &str) -> Result<Self, DatabaseError> {
414 log::info!("Creating encrypted SQLiteIndexedDB with config: {:?}", config);
415
416 if key.len() < 8 {
418 return Err(DatabaseError::new(
419 "ENCRYPTION_ERROR",
420 "Encryption key must be at least 8 characters long"
421 ));
422 }
423
424 let vfs_name = format!("{}_vfs_metadata", config.name);
427 let vfs = IndexedDBVFS::new(&vfs_name).await?;
428
429 #[cfg(feature = "fs_persist")]
431 {
432 let storage_name = format!("{}_vfs_storage",
434 config.name.strip_suffix(".db").unwrap_or(&config.name));
435
436 let storage = BlockStorage::new(&storage_name).await
438 .map_err(|e| DatabaseError::new("BLOCKSTORAGE_ERROR", &e.to_string()))?;
439
440 let db_file_path = PathBuf::from(&config.name);
442
443 if let Some(parent) = db_file_path.parent() {
445 std::fs::create_dir_all(parent)
446 .map_err(|e| DatabaseError::new("IO_ERROR", &format!("Failed to create directory: {}", e)))?;
447 }
448
449 if db_file_path.exists() && db_file_path.is_dir() {
453 std::fs::remove_dir_all(&db_file_path)
454 .map_err(|e| DatabaseError::new("IO_ERROR", &format!("Failed to remove existing VFS directory: {}", e)))?;
455 }
456
457 let connection = Connection::open(&db_file_path)
459 .map_err(|e| DatabaseError::from(e))?;
460
461 let escaped_key = key.replace("'", "''");
464 connection.execute_batch(&format!("PRAGMA key = '{}';", escaped_key))
465 .map_err(|e| DatabaseError::new("ENCRYPTION_ERROR", &format!("Failed to set encryption key: {}", e)))?;
466
467 connection.execute("CREATE TABLE IF NOT EXISTS _encryption_check (id INTEGER PRIMARY KEY)", [])
469 .map_err(|e| DatabaseError::new("ENCRYPTION_ERROR", &format!("Failed to verify encryption: {}", e)))?;
470
471 connection.execute("DROP TABLE _encryption_check", [])
473 .map_err(|e| DatabaseError::new("ENCRYPTION_ERROR", &format!("Failed to cleanup test table: {}", e)))?;
474
475 log::info!("Encrypted native database opened with filesystem persistence: {:?}", db_file_path);
476
477 return Self::configure_connection(connection, vfs, config, storage);
478 }
479
480 #[cfg(not(feature = "fs_persist"))]
482 {
483 let connection = Connection::open_in_memory()
484 .map_err(|e| DatabaseError::from(e))?;
485
486 let escaped_key = key.replace("'", "''");
488 connection.execute_batch(&format!("PRAGMA key = '{}';", escaped_key))
489 .map_err(|e| DatabaseError::new("ENCRYPTION_ERROR", &format!("Failed to set encryption key: {}", e)))?;
490
491 return Self::configure_connection(connection, vfs, config);
492 }
493 }
494
495 #[cfg(all(not(target_arch = "wasm32"), any(feature = "encryption", feature = "encryption-commoncrypto", feature = "encryption-ios")))]
505 pub async fn rekey(&self, new_key: &str) -> Result<(), DatabaseError> {
506 log::info!("Rekeying encrypted database");
507
508 if new_key.len() < 8 {
510 return Err(DatabaseError::new(
511 "ENCRYPTION_ERROR",
512 "New encryption key must be at least 8 characters long"
513 ));
514 }
515
516 let escaped_key = new_key.replace("'", "''");
518
519 self.connection.execute_batch(&format!("PRAGMA rekey = '{}';", escaped_key))
521 .map_err(|e| DatabaseError::new("ENCRYPTION_ERROR", &format!("Failed to rekey database: {}", e)))?;
522
523 self.connection.execute_batch("PRAGMA cipher_version;")
525 .map_err(|e| DatabaseError::new("ENCRYPTION_ERROR", &format!("New key verification failed: {}", e)))?;
526
527 log::info!("Successfully rekeyed database");
528 Ok(())
529 }
530}