1#[cfg(target_arch = "wasm32")]
2use wasm_bindgen::prelude::*;
3
4#[cfg(all(not(target_arch = "wasm32"), any(feature = "bundled-sqlite", feature = "encryption", feature = "encryption-commoncrypto", feature = "encryption-ios")))]
9pub extern crate rusqlite;
10
11#[cfg(feature = "console_error_panic_hook")]
13pub use console_error_panic_hook::set_once as set_panic_hook;
14
15#[cfg(feature = "wee_alloc")]
16#[global_allocator]
17static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT;
18
19#[cfg(all(target_arch = "wasm32", feature = "console_log"))]
21#[wasm_bindgen(start)]
22pub fn init_logger() {
23 #[cfg(debug_assertions)]
26 let log_level = log::Level::Debug;
27 #[cfg(not(debug_assertions))]
28 let log_level = log::Level::Info;
29
30 console_log::init_with_level(log_level)
31 .expect("Failed to initialize console_log");
32
33 log::info!("AbsurderSQL logging initialized at level: {:?}", log_level);
34}
35
36pub mod types;
38pub mod storage;
39pub mod vfs;
40#[cfg(not(target_arch = "wasm32"))]
41pub mod database;
42#[cfg(not(target_arch = "wasm32"))]
43pub use database::PreparedStatement;
44pub mod utils;
45#[cfg(feature = "telemetry")]
46pub mod telemetry;
47
48#[cfg(not(target_arch = "wasm32"))]
50pub use database::SqliteIndexedDB;
51
52#[cfg(not(target_arch = "wasm32"))]
54pub type Database = SqliteIndexedDB;
55
56pub use types::DatabaseConfig;
57pub use types::{QueryResult, ColumnValue, DatabaseError, TransactionOptions, Row};
58
59pub use vfs::indexeddb_vfs::IndexedDBVFS;
61
62#[cfg(target_arch = "wasm32")]
64#[wasm_bindgen]
65pub struct Database {
66 db: *mut sqlite_wasm_rs::sqlite3,
67 #[allow(dead_code)]
68 name: String,
69 #[wasm_bindgen(skip)]
70 on_data_change_callback: Option<js_sys::Function>,
71 #[wasm_bindgen(skip)]
72 allow_non_leader_writes: bool,
73 #[wasm_bindgen(skip)]
74 optimistic_updates_manager: std::cell::RefCell<crate::storage::optimistic_updates::OptimisticUpdatesManager>,
75 #[wasm_bindgen(skip)]
76 coordination_metrics_manager: std::cell::RefCell<crate::storage::coordination_metrics::CoordinationMetricsManager>,
77 #[wasm_bindgen(skip)]
78 #[cfg(feature = "telemetry")]
79 metrics: Option<crate::telemetry::Metrics>,
80 #[wasm_bindgen(skip)]
81 #[cfg(feature = "telemetry")]
82 span_recorder: Option<crate::telemetry::SpanRecorder>,
83 #[wasm_bindgen(skip)]
84 #[cfg(feature = "telemetry")]
85 span_context: Option<crate::telemetry::SpanContext>,
86 #[wasm_bindgen(skip)]
87 max_export_size_bytes: Option<u64>,
88}
89
90#[cfg(target_arch = "wasm32")]
91impl Database {
92 fn is_write_operation(sql: &str) -> bool {
94 let upper = sql.trim().to_uppercase();
95 upper.starts_with("INSERT")
96 || upper.starts_with("UPDATE")
97 || upper.starts_with("DELETE")
98 || upper.starts_with("REPLACE")
99 }
100
101 #[cfg(feature = "telemetry")]
105 pub fn metrics(&self) -> Option<&crate::telemetry::Metrics> {
106 self.metrics.as_ref()
107 }
108
109 async fn check_write_permission(&mut self, sql: &str) -> Result<(), DatabaseError> {
111 if !Self::is_write_operation(sql) {
112 return Ok(());
114 }
115
116 if self.allow_non_leader_writes {
118 log::info!("WRITE_ALLOWED: Non-leader writes enabled for {}", self.name);
119 return Ok(());
120 }
121
122 use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
124
125 let db_name = &self.name;
126 let storage_rc = STORAGE_REGISTRY.with(|reg| {
127 let registry = reg.borrow();
128 registry.get(db_name).cloned()
129 .or_else(|| registry.get(&format!("{}.db", db_name)).cloned())
130 .or_else(|| {
131 if db_name.ends_with(".db") {
132 registry.get(&db_name[..db_name.len()-3]).cloned()
133 } else {
134 None
135 }
136 })
137 });
138
139 if let Some(storage) = storage_rc {
140 let mut storage_mut = storage.borrow_mut();
141 let is_leader = storage_mut.is_leader().await;
142
143 if !is_leader {
144 log::error!("WRITE_DENIED: Instance is not leader for {}", db_name);
145 return Err(DatabaseError::new(
146 "WRITE_PERMISSION_DENIED",
147 "Only the leader tab can write to this database. Use db.isLeader() to check status or call db.allowNonLeaderWrites(true) for single-tab mode."
148 ));
149 }
150
151 log::info!("WRITE_ALLOWED: Instance is leader for {}", db_name);
152 Ok(())
153 } else {
154 log::info!("WRITE_ALLOWED: No storage found for {} (single-instance mode)", db_name);
156 Ok(())
157 }
158 }
159
160 pub async fn new(config: DatabaseConfig) -> Result<Self, DatabaseError> {
161 use std::ffi::{CString, CStr};
162
163 log::debug!("Creating IndexedDBVFS for: {}", config.name);
165 let vfs = crate::vfs::IndexedDBVFS::new(&config.name).await?;
166 log::debug!("Registering VFS as 'indexeddb'");
167 vfs.register("indexeddb")?;
168 log::info!("VFS registered successfully");
169
170 let mut db = std::ptr::null_mut();
171 let filename = if config.name.ends_with(".db") {
172 config.name.clone()
173 } else {
174 format!("{}.db", config.name)
175 };
176
177 let db_name = CString::new(filename.clone())
178 .map_err(|_| DatabaseError::new("INVALID_NAME", "Invalid database name"))?;
179 let vfs_name = CString::new("indexeddb")
180 .map_err(|_| DatabaseError::new("INVALID_VFS", "Invalid VFS name"))?;
181
182 log::debug!("Opening database: {} with VFS: indexeddb", filename);
183 let ret = unsafe {
184 sqlite_wasm_rs::sqlite3_open_v2(
185 db_name.as_ptr(),
186 &mut db as *mut _,
187 sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
188 vfs_name.as_ptr()
189 )
190 };
191 log::debug!("sqlite3_open_v2 returned: {}", ret);
192
193 if ret != sqlite_wasm_rs::SQLITE_OK {
194 let err_msg = unsafe {
195 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
196 if !msg_ptr.is_null() {
197 CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
198 } else {
199 "Unknown error".to_string()
200 }
201 };
202 return Err(DatabaseError::new("SQLITE_ERROR", &format!("Failed to open database with IndexedDB VFS: {}", err_msg)));
203 }
204
205 log::info!("Database opened successfully with IndexedDB VFS");
206
207 let exec_sql = |db: *mut sqlite_wasm_rs::sqlite3, sql: &str| -> Result<(), DatabaseError> {
209 let c_sql = CString::new(sql)
210 .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL statement"))?;
211
212 let ret = unsafe {
213 sqlite_wasm_rs::sqlite3_exec(
214 db,
215 c_sql.as_ptr(),
216 None,
217 std::ptr::null_mut(),
218 std::ptr::null_mut()
219 )
220 };
221
222 if ret != sqlite_wasm_rs::SQLITE_OK {
223 let err_msg = unsafe {
224 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
225 if !msg_ptr.is_null() {
226 CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
227 } else {
228 "Unknown error".to_string()
229 }
230 };
231 log::warn!("Failed to execute SQL '{}': {}", sql, err_msg);
232 return Err(DatabaseError::new("SQLITE_ERROR", &format!("Failed to execute: {}", err_msg)));
233 }
234 Ok(())
235 };
236
237 if let Some(page_size) = config.page_size {
239 log::debug!("Setting page_size to {}", page_size);
240 exec_sql(db, &format!("PRAGMA page_size = {}", page_size))?;
241 }
242
243 if let Some(cache_size) = config.cache_size {
245 log::debug!("Setting cache_size to {}", cache_size);
246 exec_sql(db, &format!("PRAGMA cache_size = {}", cache_size))?;
247 }
248
249 if let Some(ref journal_mode) = config.journal_mode {
252 log::debug!("Setting journal_mode to {}", journal_mode);
253
254 let pragma_sql = format!("PRAGMA journal_mode = {}", journal_mode);
255 let c_sql = CString::new(pragma_sql.as_str())
256 .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL statement"))?;
257
258 let mut stmt: *mut sqlite_wasm_rs::sqlite3_stmt = std::ptr::null_mut();
259 let ret = unsafe {
260 sqlite_wasm_rs::sqlite3_prepare_v2(
261 db,
262 c_sql.as_ptr(),
263 -1,
264 &mut stmt as *mut _,
265 std::ptr::null_mut()
266 )
267 };
268
269 if ret == sqlite_wasm_rs::SQLITE_OK && !stmt.is_null() {
270 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
271 if step_ret == sqlite_wasm_rs::SQLITE_ROW {
272 let result_ptr = unsafe { sqlite_wasm_rs::sqlite3_column_text(stmt, 0) };
273 if !result_ptr.is_null() {
274 let result_mode = unsafe {
275 std::ffi::CStr::from_ptr(result_ptr as *const i8)
276 .to_string_lossy()
277 .to_uppercase()
278 };
279
280 if result_mode != journal_mode.to_uppercase() {
281 log::warn!(
282 "journal_mode {} requested but SQLite set {}",
283 journal_mode, result_mode
284 );
285 } else {
286 log::info!("journal_mode successfully set to {}", result_mode);
287 }
288 }
289 }
290 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
291 } else {
292 log::warn!("Failed to prepare journal_mode PRAGMA");
293 }
294 }
295
296 if let Some(auto_vacuum) = config.auto_vacuum {
298 let vacuum_mode = if auto_vacuum { 1 } else { 0 }; log::debug!("Setting auto_vacuum to {}", vacuum_mode);
300 exec_sql(db, &format!("PRAGMA auto_vacuum = {}", vacuum_mode))?;
301 }
302
303 log::info!("Database configuration applied successfully");
304
305 #[cfg(feature = "telemetry")]
307 let metrics = crate::telemetry::Metrics::new()
308 .map_err(|e| DatabaseError::new("METRICS_ERROR", &format!("Failed to initialize metrics: {}", e)))?;
309
310 Ok(Database {
311 db,
312 name: config.name.clone(),
313 on_data_change_callback: None,
314 allow_non_leader_writes: false,
315 optimistic_updates_manager: std::cell::RefCell::new(crate::storage::optimistic_updates::OptimisticUpdatesManager::new()),
316 coordination_metrics_manager: std::cell::RefCell::new(crate::storage::coordination_metrics::CoordinationMetricsManager::new()),
317 #[cfg(feature = "telemetry")]
318 metrics: Some(metrics),
319 #[cfg(feature = "telemetry")]
320 span_recorder: None,
321 #[cfg(feature = "telemetry")]
322 span_context: Some(crate::telemetry::SpanContext::new()),
323 max_export_size_bytes: config.max_export_size_bytes,
324 })
325 }
326
327 pub async fn open_with_vfs(filename: &str, vfs_name: &str) -> Result<Self, DatabaseError> {
329 use std::ffi::CString;
330
331 log::info!("Opening database {} with VFS {}", filename, vfs_name);
332
333 let mut db: *mut sqlite_wasm_rs::sqlite3 = std::ptr::null_mut();
334 let db_name = CString::new(filename)
335 .map_err(|_| DatabaseError::new("INVALID_NAME", "Invalid database name"))?;
336 let vfs_cstr = CString::new(vfs_name)
337 .map_err(|_| DatabaseError::new("INVALID_VFS", "Invalid VFS name"))?;
338
339 let ret = unsafe {
340 sqlite_wasm_rs::sqlite3_open_v2(
341 db_name.as_ptr(),
342 &mut db as *mut _,
343 sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
344 vfs_cstr.as_ptr()
345 )
346 };
347
348 if ret != sqlite_wasm_rs::SQLITE_OK {
349 let err_msg = if !db.is_null() {
350 unsafe {
351 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
352 if !msg_ptr.is_null() {
353 std::ffi::CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
354 } else {
355 "Unknown error".to_string()
356 }
357 }
358 } else {
359 "Failed to open database".to_string()
360 };
361 return Err(DatabaseError::new("SQLITE_ERROR", &err_msg));
362 }
363
364 let name = filename.strip_prefix("file:").unwrap_or(filename)
366 .strip_suffix(".db").unwrap_or(filename)
367 .to_string();
368
369 log::info!("Successfully opened database {} with VFS {}", name, vfs_name);
370
371 #[cfg(feature = "telemetry")]
373 let metrics = crate::telemetry::Metrics::new()
374 .map_err(|e| DatabaseError::new("METRICS_ERROR", &format!("Failed to initialize metrics: {}", e)))?;
375
376 Ok(Database {
377 db,
378 name,
379 on_data_change_callback: None,
380 allow_non_leader_writes: false,
381 optimistic_updates_manager: std::cell::RefCell::new(crate::storage::optimistic_updates::OptimisticUpdatesManager::new()),
382 coordination_metrics_manager: std::cell::RefCell::new(crate::storage::coordination_metrics::CoordinationMetricsManager::new()),
383 #[cfg(feature = "telemetry")]
384 metrics: Some(metrics),
385 #[cfg(feature = "telemetry")]
386 span_recorder: None,
387 #[cfg(feature = "telemetry")]
388 span_context: Some(crate::telemetry::SpanContext::new()),
389 max_export_size_bytes: Some(2 * 1024 * 1024 * 1024), })
391 }
392
393 pub async fn execute_internal(&mut self, sql: &str) -> Result<QueryResult, DatabaseError> {
394 use std::ffi::CString;
395 let start_time = js_sys::Date::now();
396
397 #[cfg(feature = "telemetry")]
399 let span = if self.span_recorder.is_some() {
400 let query_type = sql.trim().split_whitespace().next().unwrap_or("UNKNOWN").to_uppercase();
401 let mut builder = crate::telemetry::SpanBuilder::new("execute_query".to_string())
402 .with_attribute("query_type", query_type.clone())
403 .with_attribute("sql", sql.to_string());
404
405 if let Some(ref context) = self.span_context {
407 builder = builder.with_baggage_from_context(context);
408 }
409
410 let span = builder.build();
411
412 if let Some(ref context) = self.span_context {
414 context.enter_span(span.span_id.clone());
415 }
416
417 Some(span)
418 } else {
419 None
420 };
421
422 #[cfg(feature = "telemetry")]
424 #[cfg(feature = "telemetry")]
425 if let Some(metrics) = &self.metrics {
426 metrics.queries_total().inc();
427 }
428
429 let sql_cstr = CString::new(sql)
430 .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL string"))?;
431
432 if sql.trim().to_uppercase().starts_with("SELECT") {
433 let mut stmt = std::ptr::null_mut();
434 let ret = unsafe {
435 sqlite_wasm_rs::sqlite3_prepare_v2(
436 self.db,
437 sql_cstr.as_ptr(),
438 -1,
439 &mut stmt,
440 std::ptr::null_mut()
441 )
442 };
443
444 if ret != sqlite_wasm_rs::SQLITE_OK {
445 #[cfg(feature = "telemetry")]
447 #[cfg(feature = "telemetry")]
448 if let Some(metrics) = &self.metrics {
449 metrics.errors_total().inc();
450 }
451
452 #[cfg(feature = "telemetry")]
454 if let Some(mut s) = span {
455 s.status = crate::telemetry::SpanStatus::Error("Failed to prepare statement".to_string());
456 s.end_time_ms = Some(js_sys::Date::now());
457 if let Some(recorder) = &self.span_recorder {
458 recorder.record_span(s);
459 }
460
461 if let Some(ref context) = self.span_context {
463 context.exit_span();
464 }
465 }
466
467 return Err(DatabaseError::new("SQLITE_ERROR", "Failed to prepare statement").with_sql(sql));
468 }
469
470 let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
471 let mut columns = Vec::new();
472 let mut rows = Vec::new();
473
474 for i in 0..column_count {
476 let col_name = unsafe {
477 let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
478 if name_ptr.is_null() {
479 format!("col_{}", i)
480 } else {
481 std::ffi::CStr::from_ptr(name_ptr).to_string_lossy().into_owned()
482 }
483 };
484 columns.push(col_name);
485 }
486
487 loop {
489 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
490 if step_ret == sqlite_wasm_rs::SQLITE_ROW {
491 let mut values = Vec::new();
492 for i in 0..column_count {
493 let value = unsafe {
494 let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
495 match col_type {
496 sqlite_wasm_rs::SQLITE_NULL => ColumnValue::Null,
497 sqlite_wasm_rs::SQLITE_INTEGER => {
498 let val = sqlite_wasm_rs::sqlite3_column_int64(stmt, i);
499 ColumnValue::Integer(val)
500 },
501 sqlite_wasm_rs::SQLITE_FLOAT => {
502 let val = sqlite_wasm_rs::sqlite3_column_double(stmt, i);
503 ColumnValue::Real(val)
504 },
505 sqlite_wasm_rs::SQLITE_TEXT => {
506 let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
507 if text_ptr.is_null() {
508 ColumnValue::Null
509 } else {
510 let text = std::ffi::CStr::from_ptr(text_ptr as *const i8).to_string_lossy().into_owned();
511 ColumnValue::Text(text)
512 }
513 },
514 sqlite_wasm_rs::SQLITE_BLOB => {
515 let blob_ptr = sqlite_wasm_rs::sqlite3_column_blob(stmt, i);
516 let blob_size = sqlite_wasm_rs::sqlite3_column_bytes(stmt, i);
517 if blob_ptr.is_null() || blob_size == 0 {
518 ColumnValue::Blob(vec![])
519 } else {
520 let blob_slice = std::slice::from_raw_parts(blob_ptr as *const u8, blob_size as usize);
521 ColumnValue::Blob(blob_slice.to_vec())
522 }
523 },
524 _ => ColumnValue::Null,
525 }
526 };
527 values.push(value);
528 }
529 rows.push(Row { values });
530 } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
531 break;
532 } else {
533 let err_msg = unsafe {
535 let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db);
536 if !err_ptr.is_null() {
537 std::ffi::CStr::from_ptr(err_ptr)
538 .to_string_lossy()
539 .to_string()
540 } else {
541 "Unknown SQLite error".to_string()
542 }
543 };
544 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
545 #[cfg(feature = "telemetry")]
547 if let Some(metrics) = &self.metrics {
548 metrics.errors_total().inc();
549 }
550 return Err(DatabaseError::new("SQLITE_ERROR", &format!("Error executing SELECT statement: {}", err_msg)).with_sql(sql));
551 }
552 }
553
554 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
555 let execution_time_ms = js_sys::Date::now() - start_time;
556
557 #[cfg(feature = "telemetry")]
559 if let Some(metrics) = &self.metrics {
560 metrics.query_duration().observe(execution_time_ms);
561 }
562
563 Ok(QueryResult {
564 columns,
565 rows,
566 affected_rows: 0,
567 last_insert_id: None,
568 execution_time_ms,
569 })
570 } else {
571 let mut stmt: *mut sqlite_wasm_rs::sqlite3_stmt = std::ptr::null_mut();
573 let ret = unsafe {
574 sqlite_wasm_rs::sqlite3_prepare_v2(
575 self.db,
576 sql_cstr.as_ptr(),
577 -1,
578 &mut stmt,
579 std::ptr::null_mut()
580 )
581 };
582
583 if ret != sqlite_wasm_rs::SQLITE_OK {
584 #[cfg(feature = "telemetry")]
586 if let Some(metrics) = &self.metrics {
587 metrics.errors_total().inc();
588 }
589 return Err(DatabaseError::new("SQLITE_ERROR", "Failed to prepare statement").with_sql(sql));
590 }
591
592 let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
594 let mut columns = Vec::new();
595 let mut rows = Vec::new();
596
597 if column_count > 0 {
598 for i in 0..column_count {
600 let col_name = unsafe {
601 let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
602 if name_ptr.is_null() {
603 format!("column_{}", i)
604 } else {
605 std::ffi::CStr::from_ptr(name_ptr).to_string_lossy().into_owned()
606 }
607 };
608 columns.push(col_name);
609 }
610
611 loop {
613 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
614 if step_ret == sqlite_wasm_rs::SQLITE_ROW {
615 let mut values = Vec::new();
616 for i in 0..column_count {
617 let value = unsafe {
618 let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
619 match col_type {
620 sqlite_wasm_rs::SQLITE_TEXT => {
621 let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
622 if text_ptr.is_null() {
623 ColumnValue::Null
624 } else {
625 let text = std::ffi::CStr::from_ptr(text_ptr as *const i8).to_string_lossy().into_owned();
626 ColumnValue::Text(text)
627 }
628 },
629 sqlite_wasm_rs::SQLITE_INTEGER => {
630 ColumnValue::Integer(sqlite_wasm_rs::sqlite3_column_int64(stmt, i))
631 },
632 _ => ColumnValue::Null,
633 }
634 };
635 values.push(value);
636 }
637 rows.push(Row { values });
638 } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
639 break;
640 } else {
641 let err_msg = unsafe {
643 let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db);
644 if !err_ptr.is_null() {
645 std::ffi::CStr::from_ptr(err_ptr)
646 .to_string_lossy()
647 .to_string()
648 } else {
649 "Unknown SQLite error".to_string()
650 }
651 };
652 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
653 #[cfg(feature = "telemetry")]
655 if let Some(metrics) = &self.metrics {
656 metrics.errors_total().inc();
657 }
658 return Err(DatabaseError::new("SQLITE_ERROR", &format!("Failed to execute statement: {}", err_msg)).with_sql(sql));
659 }
660 }
661 } else {
662 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
664 if step_ret != sqlite_wasm_rs::SQLITE_DONE {
665 let err_msg = unsafe {
667 let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db);
668 if !err_ptr.is_null() {
669 std::ffi::CStr::from_ptr(err_ptr)
670 .to_string_lossy()
671 .to_string()
672 } else {
673 "Unknown SQLite error".to_string()
674 }
675 };
676 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
677 #[cfg(feature = "telemetry")]
679 if let Some(metrics) = &self.metrics {
680 metrics.errors_total().inc();
681 }
682 return Err(DatabaseError::new("SQLITE_ERROR", &format!("Failed to execute statement: {}", err_msg)).with_sql(sql));
683 }
684 }
685
686 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
688
689 let affected_rows = unsafe { sqlite_wasm_rs::sqlite3_changes(self.db) } as u32;
690 let last_insert_id = if sql.trim().to_uppercase().starts_with("INSERT") {
691 Some(unsafe { sqlite_wasm_rs::sqlite3_last_insert_rowid(self.db) })
692 } else {
693 None
694 };
695
696 let execution_time_ms = js_sys::Date::now() - start_time;
697
698 #[cfg(feature = "telemetry")]
700 if let Some(metrics) = &self.metrics {
701 metrics.query_duration().observe(execution_time_ms);
702 }
703
704 #[cfg(feature = "telemetry")]
706 if let Some(mut s) = span {
707 s.status = crate::telemetry::SpanStatus::Ok;
708 s.end_time_ms = Some(js_sys::Date::now());
709 s.attributes.insert("duration_ms".to_string(), execution_time_ms.to_string());
710 s.attributes.insert("affected_rows".to_string(), affected_rows.to_string());
711 s.attributes.insert("row_count".to_string(), rows.len().to_string());
712 if let Some(recorder) = &self.span_recorder {
713 recorder.record_span(s);
714 }
715
716 if let Some(ref context) = self.span_context {
718 context.exit_span();
719 }
720 }
721
722 Ok(QueryResult {
723 columns,
724 rows,
725 affected_rows,
726 last_insert_id,
727 execution_time_ms,
728 })
729 }
730 }
731
732 pub async fn execute_with_params_internal(&mut self, sql: &str, params: &[ColumnValue]) -> Result<QueryResult, DatabaseError> {
733 use std::ffi::CString;
734 let start_time = js_sys::Date::now();
735
736 #[cfg(feature = "telemetry")]
738 let span = if self.span_recorder.is_some() {
739 let query_type = sql.trim().split_whitespace().next().unwrap_or("UNKNOWN").to_uppercase();
740 let span = crate::telemetry::SpanBuilder::new("execute_query".to_string())
741 .with_attribute("query_type", query_type.clone())
742 .with_attribute("sql", sql.to_string())
743 .build();
744 Some(span)
745 } else {
746 None
747 };
748
749 #[cfg(feature = "telemetry")]
751 self.ensure_metrics_propagated();
752
753 #[cfg(feature = "telemetry")]
755 if let Some(metrics) = &self.metrics {
756 metrics.queries_total().inc();
757 }
758
759 let sql_cstr = CString::new(sql)
760 .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL string"))?;
761
762 let mut stmt = std::ptr::null_mut();
763 let ret = unsafe {
764 sqlite_wasm_rs::sqlite3_prepare_v2(
765 self.db,
766 sql_cstr.as_ptr(),
767 -1,
768 &mut stmt,
769 std::ptr::null_mut()
770 )
771 };
772
773 if ret != sqlite_wasm_rs::SQLITE_OK {
774 #[cfg(feature = "telemetry")]
776 if let Some(metrics) = &self.metrics {
777 metrics.errors_total().inc();
778 }
779
780 #[cfg(feature = "telemetry")]
782 if let Some(mut s) = span {
783 s.status = crate::telemetry::SpanStatus::Error("Failed to prepare statement".to_string());
784 s.end_time_ms = Some(js_sys::Date::now());
785 if let Some(recorder) = &self.span_recorder {
786 recorder.record_span(s);
787 }
788 }
789
790 return Err(DatabaseError::new("SQLITE_ERROR", "Failed to prepare statement").with_sql(sql));
791 }
792
793 let mut text_cstrings = Vec::new(); for (i, param) in params.iter().enumerate() {
796 let param_index = (i + 1) as i32;
797 let bind_ret = unsafe {
798 match param {
799 ColumnValue::Null => sqlite_wasm_rs::sqlite3_bind_null(stmt, param_index),
800 ColumnValue::Integer(val) => sqlite_wasm_rs::sqlite3_bind_int64(stmt, param_index, *val),
801 ColumnValue::Real(val) => sqlite_wasm_rs::sqlite3_bind_double(stmt, param_index, *val),
802 ColumnValue::Text(val) => {
803 let sanitized = val.replace('\0', "");
805 let text_cstr = CString::new(sanitized.as_str())
807 .expect("CString::new should not fail after null byte removal");
808 let result = sqlite_wasm_rs::sqlite3_bind_text(
809 stmt,
810 param_index,
811 text_cstr.as_ptr(),
812 sanitized.len() as i32,
813 sqlite_wasm_rs::SQLITE_TRANSIENT()
814 );
815 text_cstrings.push(text_cstr); result
817 },
818 ColumnValue::Blob(val) => {
819 sqlite_wasm_rs::sqlite3_bind_blob(
820 stmt,
821 param_index,
822 val.as_ptr() as *const _,
823 val.len() as i32,
824 sqlite_wasm_rs::SQLITE_TRANSIENT()
825 )
826 },
827 _ => sqlite_wasm_rs::sqlite3_bind_null(stmt, param_index),
828 }
829 };
830
831 if bind_ret != sqlite_wasm_rs::SQLITE_OK {
832 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
833 #[cfg(feature = "telemetry")]
835 if let Some(metrics) = &self.metrics {
836 metrics.errors_total().inc();
837 }
838 return Err(DatabaseError::new("SQLITE_ERROR", "Failed to bind parameter").with_sql(sql));
839 }
840 }
841
842 if sql.trim().to_uppercase().starts_with("SELECT") {
843 let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
844 let mut columns = Vec::new();
845 let mut rows = Vec::new();
846
847 for i in 0..column_count {
849 let col_name = unsafe {
850 let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
851 if name_ptr.is_null() {
852 format!("col_{}", i)
853 } else {
854 std::ffi::CStr::from_ptr(name_ptr).to_string_lossy().into_owned()
855 }
856 };
857 columns.push(col_name);
858 }
859
860 loop {
862 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
863 if step_ret == sqlite_wasm_rs::SQLITE_ROW {
864 let mut values = Vec::new();
865 for i in 0..column_count {
866 let value = unsafe {
867 let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
868 match col_type {
869 sqlite_wasm_rs::SQLITE_NULL => ColumnValue::Null,
870 sqlite_wasm_rs::SQLITE_INTEGER => {
871 let val = sqlite_wasm_rs::sqlite3_column_int64(stmt, i);
872 ColumnValue::Integer(val)
873 },
874 sqlite_wasm_rs::SQLITE_FLOAT => {
875 let val = sqlite_wasm_rs::sqlite3_column_double(stmt, i);
876 ColumnValue::Real(val)
877 },
878 sqlite_wasm_rs::SQLITE_TEXT => {
879 let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
880 if text_ptr.is_null() {
881 ColumnValue::Null
882 } else {
883 let text = std::ffi::CStr::from_ptr(text_ptr as *const i8).to_string_lossy().into_owned();
884 ColumnValue::Text(text)
885 }
886 },
887 sqlite_wasm_rs::SQLITE_BLOB => {
888 let blob_ptr = sqlite_wasm_rs::sqlite3_column_blob(stmt, i);
889 let blob_size = sqlite_wasm_rs::sqlite3_column_bytes(stmt, i);
890 if blob_ptr.is_null() || blob_size == 0 {
891 ColumnValue::Blob(vec![])
892 } else {
893 let blob_slice = std::slice::from_raw_parts(blob_ptr as *const u8, blob_size as usize);
894 ColumnValue::Blob(blob_slice.to_vec())
895 }
896 },
897 _ => ColumnValue::Null,
898 }
899 };
900 values.push(value);
901 }
902 rows.push(Row { values });
903 } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
904 break;
905 } else {
906 let err_msg = unsafe {
908 let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db);
909 if !err_ptr.is_null() {
910 std::ffi::CStr::from_ptr(err_ptr)
911 .to_string_lossy()
912 .to_string()
913 } else {
914 "Unknown SQLite error".to_string()
915 }
916 };
917 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
918 #[cfg(feature = "telemetry")]
920 if let Some(metrics) = &self.metrics {
921 metrics.errors_total().inc();
922 }
923 return Err(DatabaseError::new("SQLITE_ERROR", &format!("Error executing SELECT statement: {}", err_msg)).with_sql(sql));
924 }
925 }
926
927 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
928
929
930 let execution_time_ms = js_sys::Date::now() - start_time;
931
932 #[cfg(feature = "telemetry")]
934 if let Some(metrics) = &self.metrics {
935 metrics.query_duration().observe(execution_time_ms);
936 }
937
938 #[cfg(feature = "telemetry")]
940 if let Some(mut s) = span {
941 s.status = crate::telemetry::SpanStatus::Ok;
942 s.end_time_ms = Some(js_sys::Date::now());
943 s.attributes.insert("duration_ms".to_string(), execution_time_ms.to_string());
944 s.attributes.insert("row_count".to_string(), rows.len().to_string());
945 if let Some(recorder) = &self.span_recorder {
946 recorder.record_span(s);
947 }
948 }
949
950 Ok(QueryResult {
951 columns,
952 rows,
953 affected_rows: 0,
954 last_insert_id: None,
955 execution_time_ms,
956 })
957 } else {
958 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
960 #[cfg(feature = "telemetry")]
962 if let Some(metrics) = &self.metrics {
963 metrics.errors_total().inc();
964 }
965 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
966
967 if step_ret != sqlite_wasm_rs::SQLITE_DONE {
968 let err_msg = unsafe {
969 let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db);
970 if !err_ptr.is_null() {
971 std::ffi::CStr::from_ptr(err_ptr)
972 .to_string_lossy()
973 .to_string()
974 } else {
975 "Unknown SQLite error".to_string()
976 }
977 };
978 return Err(DatabaseError::new("SQLITE_ERROR", &format!("Failed to execute statement: {}", err_msg)).with_sql(sql));
979 }
980
981
982 let execution_time_ms = js_sys::Date::now() - start_time;
983
984 #[cfg(feature = "telemetry")]
986 if let Some(metrics) = &self.metrics {
987 metrics.query_duration().observe(execution_time_ms);
988 }
989 let affected_rows = unsafe { sqlite_wasm_rs::sqlite3_changes(self.db) } as u32;
990 let last_insert_id = if sql.trim().to_uppercase().starts_with("INSERT") {
991 Some(unsafe { sqlite_wasm_rs::sqlite3_last_insert_rowid(self.db) })
992 } else {
993 None
994 };
995
996 #[cfg(feature = "telemetry")]
998 if let Some(mut s) = span {
999 s.status = crate::telemetry::SpanStatus::Ok;
1000 s.end_time_ms = Some(js_sys::Date::now());
1001 s.attributes.insert("duration_ms".to_string(), execution_time_ms.to_string());
1002 s.attributes.insert("affected_rows".to_string(), affected_rows.to_string());
1003 if let Some(recorder) = &self.span_recorder {
1004 recorder.record_span(s);
1005 }
1006 }
1007
1008 Ok(QueryResult {
1009 columns: vec![],
1010 rows: vec![],
1011 affected_rows,
1012 last_insert_id,
1013 execution_time_ms,
1014 })
1015 }
1016 }
1017
1018 #[cfg(feature = "telemetry")]
1020 pub fn set_metrics(&mut self, metrics: Option<crate::telemetry::Metrics>) {
1021 self.metrics = metrics.clone();
1022 self.ensure_metrics_propagated();
1023 }
1024
1025 #[cfg(feature = "telemetry")]
1027 pub fn set_span_recorder(&mut self, recorder: Option<crate::telemetry::SpanRecorder>) {
1028 self.span_recorder = recorder;
1029 }
1030
1031 #[cfg(feature = "telemetry")]
1033 pub fn get_span_context(&self) -> Option<&crate::telemetry::SpanContext> {
1034 self.span_context.as_ref()
1035 }
1036
1037 #[cfg(feature = "telemetry")]
1039 pub fn get_span_recorder(&self) -> Option<&crate::telemetry::SpanRecorder> {
1040 self.span_recorder.as_ref()
1041 }
1042 #[cfg(feature = "telemetry")]
1044 fn ensure_metrics_propagated(&self) {
1045 #[cfg(target_arch = "wasm32")]
1047 {
1048 use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1049
1050 if self.metrics.is_none() {
1051 return;
1052 }
1053
1054 let db_name = &self.name;
1055
1056 STORAGE_REGISTRY.with(|reg| {
1057 let registry = reg.borrow();
1058 if let Some(storage_rc) = registry.get(db_name)
1059 .or_else(|| registry.get(&format!("{}.db", db_name)))
1060 .or_else(|| {
1061 if db_name.ends_with(".db") {
1062 registry.get(&db_name[..db_name.len()-3])
1063 } else {
1064 None
1065 }
1066 })
1067 {
1068 let mut storage = storage_rc.borrow_mut();
1069 storage.set_metrics(self.metrics.clone());
1070 }
1071 });
1072 }
1073 }
1074
1075 pub async fn close_internal(&mut self) -> Result<(), DatabaseError> {
1076 log::debug!("Checkpointing WAL before close: {}", self.name);
1079 let _ = self.execute_internal("PRAGMA wal_checkpoint(PASSIVE)").await;
1080
1081 log::debug!("Syncing database before close: {}", self.name);
1083 self.sync_internal().await?;
1084
1085 #[cfg(target_arch = "wasm32")]
1087 {
1088 use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1089
1090 let db_name = &self.name;
1091 let storage_rc = STORAGE_REGISTRY.with(|reg| {
1092 let registry = reg.borrow();
1093 registry.get(db_name).cloned()
1094 .or_else(|| registry.get(&format!("{}.db", db_name)).cloned())
1095 });
1096
1097 if let Some(storage_rc) = storage_rc {
1098 if let Ok(mut storage) = storage_rc.try_borrow_mut() {
1099 log::debug!("Stopping leader election for {}", db_name);
1100 let _ = storage.stop_leader_election().await;
1101 }
1102 }
1103 }
1104
1105 if !self.db.is_null() {
1106 unsafe {
1107 sqlite_wasm_rs::sqlite3_close(self.db);
1108 self.db = std::ptr::null_mut();
1109 }
1110 }
1111 Ok(())
1112 }
1113
1114 pub async fn query(&mut self, sql: &str) -> Result<Vec<Row>, DatabaseError> {
1116 let result = self.execute_internal(sql).await?;
1117 Ok(result.rows)
1118 }
1119
1120 pub async fn sync_internal(&mut self) -> Result<(), DatabaseError> {
1121 #[cfg(all(target_arch = "wasm32", feature = "telemetry"))]
1123 let start_time = js_sys::Date::now();
1124
1125 #[cfg(feature = "telemetry")]
1127 let span = if self.span_recorder.is_some() {
1128 let mut builder = crate::telemetry::SpanBuilder::new("vfs_sync".to_string())
1129 .with_attribute("operation", "sync");
1130
1131 if let Some(ref context) = self.span_context {
1133 builder = builder.with_context(context).with_baggage_from_context(context);
1134 }
1135
1136 let span = builder.build();
1137
1138 if let Some(ref context) = self.span_context {
1140 context.enter_span(span.span_id.clone());
1141 }
1142
1143 Some(span)
1144 } else {
1145 None
1146 };
1147
1148 #[cfg(feature = "telemetry")]
1150 if let Some(ref metrics) = self.metrics {
1151 metrics.sync_operations_total().inc();
1152 }
1153
1154 #[cfg(feature = "telemetry")]
1156 let mut blocks_count = 0;
1157
1158 #[cfg(target_arch = "wasm32")]
1160 {
1161 use crate::storage::vfs_sync;
1162
1163 let next_commit = vfs_sync::with_global_commit_marker(|cm| {
1165 let mut cm = cm.borrow_mut();
1166 let current = cm.get(&self.name).copied().unwrap_or(0);
1167 let new_marker = current + 1;
1168 cm.insert(self.name.clone(), new_marker);
1169 log::debug!("Advanced commit marker for {} from {} to {}", self.name, current, new_marker);
1170 new_marker
1171 });
1172
1173 let (blocks_to_persist, metadata_to_persist) = vfs_sync::with_global_storage(|storage| {
1175 let storage_map = storage.borrow();
1176 let blocks = if let Some(db_storage) = storage_map.get(&self.name) {
1177 db_storage.iter().map(|(&id, data)| (id, data.clone())).collect::<Vec<_>>()
1178 } else {
1179 Vec::new()
1180 };
1181
1182 let metadata = vfs_sync::with_global_metadata(|meta| {
1183 let meta_map = meta.borrow();
1184 if let Some(db_meta) = meta_map.get(&self.name) {
1185 db_meta.iter().map(|(&id, metadata)| (id, metadata.checksum)).collect::<Vec<_>>()
1186 } else {
1187 Vec::new()
1188 }
1189 });
1190
1191 (blocks, metadata)
1192 });
1193
1194 if !blocks_to_persist.is_empty() {
1195 #[cfg(feature = "telemetry")]
1196 {
1197 blocks_count = blocks_to_persist.len();
1198 }
1199 log::debug!("Persisting {} blocks to IndexedDB for {}", blocks_to_persist.len(), self.name);
1200 crate::storage::wasm_indexeddb::persist_to_indexeddb_event_based(
1201 &self.name,
1202 blocks_to_persist,
1203 metadata_to_persist,
1204 next_commit,
1205 #[cfg(feature = "telemetry")]
1206 self.span_recorder.clone(),
1207 #[cfg(feature = "telemetry")]
1208 span.as_ref().map(|s| s.span_id.clone()),
1209 ).await?;
1210 log::debug!("Successfully persisted {} to IndexedDB (awaited)", self.name);
1211 } else {
1212 log::debug!("No blocks to persist for {}", self.name);
1213 }
1214
1215 use crate::storage::broadcast_notifications::{BroadcastNotification, send_change_notification};
1217
1218 let notification = BroadcastNotification::DataChanged {
1219 db_name: self.name.clone(),
1220 timestamp: js_sys::Date::now() as u64,
1221 };
1222
1223 log::debug!("Sending DataChanged notification for {}", self.name);
1224
1225 if let Err(e) = send_change_notification(¬ification) {
1226 log::warn!("Failed to send change notification: {}", e);
1227 }
1229 }
1230
1231 #[cfg(all(target_arch = "wasm32", feature = "telemetry"))]
1233 if let Some(ref metrics) = self.metrics {
1234 let duration_ms = js_sys::Date::now() - start_time;
1235 metrics.sync_duration().observe(duration_ms);
1236 }
1237
1238 #[cfg(feature = "telemetry")]
1240 if let Some(mut s) = span {
1241 s.status = crate::telemetry::SpanStatus::Ok;
1242 #[cfg(target_arch = "wasm32")]
1243 {
1244 s.end_time_ms = Some(js_sys::Date::now());
1245 let duration_ms = s.end_time_ms.unwrap() - s.start_time_ms;
1246 s.attributes.insert("duration_ms".to_string(), duration_ms.to_string());
1247 }
1248 #[cfg(not(target_arch = "wasm32"))]
1249 {
1250 let now = std::time::SystemTime::now()
1251 .duration_since(std::time::UNIX_EPOCH)
1252 .unwrap()
1253 .as_millis() as f64;
1254 s.end_time_ms = Some(now);
1255 let duration_ms = s.end_time_ms.unwrap() - s.start_time_ms;
1256 s.attributes.insert("duration_ms".to_string(), duration_ms.to_string());
1257 }
1258 s.attributes.insert("blocks_persisted".to_string(), blocks_count.to_string());
1259 if let Some(recorder) = &self.span_recorder {
1260 recorder.record_span(s);
1261 }
1262
1263 if let Some(ref context) = self.span_context {
1265 context.exit_span();
1266 }
1267 }
1268
1269 Ok(())
1270 }
1271}
1272
1273#[cfg(target_arch = "wasm32")]
1274impl Drop for Database {
1275 fn drop(&mut self) {
1276 if !self.db.is_null() {
1277 unsafe {
1278 sqlite_wasm_rs::sqlite3_close(self.db);
1279 }
1280 }
1281
1282 log::debug!("Closed database: {} (BlockStorage remains in registry)", self.name);
1286 }
1287}
1288
1289#[cfg(target_arch = "wasm32")]
1291#[wasm_bindgen]
1292impl Database {
1293 #[wasm_bindgen(js_name = "newDatabase")]
1294 pub async fn new_wasm(name: String) -> Result<Database, JsValue> {
1295 let normalized_name = if name.ends_with(".db") {
1297 name.clone()
1298 } else {
1299 format!("{}.db", name)
1300 };
1301
1302 let config = DatabaseConfig {
1303 name: normalized_name.clone(),
1304 version: Some(1),
1305 cache_size: Some(10_000),
1306 page_size: Some(4096),
1307 auto_vacuum: Some(true),
1308 journal_mode: Some("WAL".to_string()),
1309 max_export_size_bytes: Some(2 * 1024 * 1024 * 1024), };
1311
1312 let db = Database::new(config)
1313 .await
1314 .map_err(|e| JsValue::from_str(&format!("Failed to create database: {}", e)))?;
1315
1316 Self::start_write_queue_listener(&normalized_name)?;
1318
1319 if let Err(e) = Self::add_database_to_persistent_list(&normalized_name) {
1321 log::warn!("Failed to add database to persistent list: {:?}", e);
1322 }
1323
1324 Ok(db)
1325 }
1326
1327 #[wasm_bindgen(js_name = "getAllDatabases")]
1331 pub async fn get_all_databases() -> Result<JsValue, JsValue> {
1332 use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1333 use crate::storage::vfs_sync::with_global_storage;
1334 use std::collections::HashSet;
1335
1336 log::info!("getAllDatabases called");
1337 let mut db_names = HashSet::new();
1338
1339 match Self::get_persistent_database_list() {
1341 Ok(persistent_list) => {
1342 log::info!("Persistent list has {} entries", persistent_list.len());
1343 for name in persistent_list {
1344 log::info!("Found in persistent list: {}", name);
1345 db_names.insert(name);
1346 }
1347 }
1348 Err(e) => {
1349 log::warn!("Failed to get persistent list: {:?}", e);
1350 }
1351 }
1352
1353 STORAGE_REGISTRY.with(|reg| {
1355 let registry = reg.borrow();
1356 log::info!("STORAGE_REGISTRY has {} entries", registry.len());
1357 for key in registry.keys() {
1358 log::info!("Found in STORAGE_REGISTRY: {}", key);
1359 db_names.insert(key.clone());
1360 }
1361 });
1362
1363 with_global_storage(|gs| {
1365 let storage = gs.borrow();
1366 log::info!("GLOBAL_STORAGE has {} entries", storage.len());
1367 for key in storage.keys() {
1368 log::info!("Found in GLOBAL_STORAGE: {}", key);
1369 db_names.insert(key.clone());
1370 }
1371 });
1372
1373 log::info!("Total unique databases found: {}", db_names.len());
1374
1375 let mut result_vec: Vec<String> = db_names.into_iter().collect();
1377 result_vec.sort();
1378
1379 let js_array = js_sys::Array::new();
1381 for name in &result_vec {
1382 log::info!("Returning database: {}", name);
1383 js_array.push(&JsValue::from_str(name));
1384 }
1385
1386 log::info!("getAllDatabases returning {} databases", result_vec.len());
1387
1388 Ok(js_array.into())
1389 }
1390
1391 #[wasm_bindgen(js_name = "deleteDatabase")]
1395 pub async fn delete_database(name: String) -> Result<(), JsValue> {
1396 use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1397 use crate::storage::vfs_sync::{with_global_storage, with_global_metadata, with_global_commit_marker};
1398
1399 let normalized_name = if name.ends_with(".db") {
1401 name.clone()
1402 } else {
1403 format!("{}.db", name)
1404 };
1405
1406 log::info!("Deleting database: {}", normalized_name);
1407
1408 STORAGE_REGISTRY.with(|reg| {
1410 let mut registry = reg.borrow_mut();
1411 registry.remove(&normalized_name);
1412 });
1413
1414 with_global_storage(|gs| {
1416 let mut storage = gs.borrow_mut();
1417 storage.remove(&normalized_name);
1418 });
1419
1420 with_global_metadata(|gm| {
1422 let mut metadata = gm.borrow_mut();
1423 metadata.remove(&normalized_name);
1424 });
1425
1426 with_global_commit_marker(|cm| {
1428 let mut markers = cm.borrow_mut();
1429 markers.remove(&normalized_name);
1430 });
1431
1432 let idb_name = format!("absurder_{}", normalized_name);
1434 let _delete_promise = js_sys::eval(&format!(
1435 "indexedDB.deleteDatabase('{}')",
1436 idb_name
1437 )).map_err(|e| JsValue::from_str(&format!("Failed to delete IndexedDB: {:?}", e)))?;
1438
1439 log::info!("Database deleted: {}", normalized_name);
1440
1441 Self::remove_database_from_persistent_list(&normalized_name)?;
1443
1444 Ok(())
1445 }
1446
1447 fn add_database_to_persistent_list(db_name: &str) -> Result<(), JsValue> {
1449 log::info!("add_database_to_persistent_list called for: {}", db_name);
1450
1451 let window = web_sys::window().ok_or_else(|| {
1452 log::error!("No window object");
1453 JsValue::from_str("No window")
1454 })?;
1455
1456 let storage = window.local_storage()
1457 .map_err(|e| {
1458 log::error!("Failed to get localStorage: {:?}", e);
1459 JsValue::from_str("No localStorage")
1460 })?
1461 .ok_or_else(|| {
1462 log::error!("localStorage not available");
1463 JsValue::from_str("localStorage not available")
1464 })?;
1465
1466 let key = "absurder_db_list";
1467 let existing = storage.get_item(key).map_err(|e| {
1468 log::error!("Failed to read localStorage key {}: {:?}", key, e);
1469 JsValue::from_str("Failed to read localStorage")
1470 })?;
1471
1472 log::debug!("Existing localStorage value: {:?}", existing);
1473
1474 let mut db_list: Vec<String> = if let Some(json_str) = existing {
1475 match serde_json::from_str(&json_str) {
1476 Ok(list) => {
1477 log::debug!("Parsed existing list: {:?}", list);
1478 list
1479 }
1480 Err(e) => {
1481 log::warn!("Failed to parse localStorage JSON: {}, starting fresh", e);
1482 Vec::new()
1483 }
1484 }
1485 } else {
1486 log::debug!("No existing list, creating new");
1487 Vec::new()
1488 };
1489
1490 if !db_list.contains(&db_name.to_string()) {
1491 db_list.push(db_name.to_string());
1492 db_list.sort();
1493 log::debug!("Updated list: {:?}", db_list);
1494
1495 let json_str = serde_json::to_string(&db_list).map_err(|e| {
1496 log::error!("Failed to serialize list: {}", e);
1497 JsValue::from_str("Failed to serialize")
1498 })?;
1499
1500 log::debug!("Writing to localStorage: {}", json_str);
1501
1502 storage.set_item(key, &json_str).map_err(|e| {
1503 log::error!("Failed to write to localStorage: {:?}", e);
1504 JsValue::from_str("Failed to write localStorage")
1505 })?;
1506
1507 log::info!("Successfully added {} to persistent database list", db_name);
1508 } else {
1509 log::info!("{} already in persistent list", db_name);
1510 }
1511
1512 Ok(())
1513 }
1514
1515 fn remove_database_from_persistent_list(db_name: &str) -> Result<(), JsValue> {
1517 let window = web_sys::window().ok_or_else(|| JsValue::from_str("No window"))?;
1518 let storage = window.local_storage()
1519 .map_err(|_| JsValue::from_str("No localStorage"))?
1520 .ok_or_else(|| JsValue::from_str("localStorage not available"))?;
1521
1522 let key = "absurder_db_list";
1523 let existing = storage.get_item(key).map_err(|_| JsValue::from_str("Failed to read localStorage"))?;
1524
1525 if let Some(json_str) = existing {
1526 let mut db_list: Vec<String> = serde_json::from_str(&json_str).unwrap_or_else(|_| Vec::new());
1527 db_list.retain(|name| name != db_name);
1528 let json_str = serde_json::to_string(&db_list).map_err(|_| JsValue::from_str("Failed to serialize"))?;
1529 storage.set_item(key, &json_str).map_err(|_| JsValue::from_str("Failed to write localStorage"))?;
1530 log::info!("Removed {} from persistent database list", db_name);
1531 }
1532
1533 Ok(())
1534 }
1535
1536 fn get_persistent_database_list() -> Result<Vec<String>, JsValue> {
1538 log::info!("get_persistent_database_list called");
1539
1540 let window = web_sys::window().ok_or_else(|| {
1541 log::error!("No window object");
1542 JsValue::from_str("No window")
1543 })?;
1544
1545 let storage = window.local_storage()
1546 .map_err(|e| {
1547 log::error!("Failed to get localStorage: {:?}", e);
1548 JsValue::from_str("No localStorage")
1549 })?
1550 .ok_or_else(|| {
1551 log::error!("localStorage not available");
1552 JsValue::from_str("localStorage not available")
1553 })?;
1554
1555 let key = "absurder_db_list";
1556 let existing = storage.get_item(key).map_err(|e| {
1557 log::error!("Failed to read localStorage key {}: {:?}", key, e);
1558 JsValue::from_str("Failed to read localStorage")
1559 })?;
1560
1561 log::debug!("Read from localStorage: {:?}", existing);
1562
1563 if let Some(json_str) = existing {
1564 match serde_json::from_str::<Vec<String>>(&json_str) {
1565 Ok(db_list) => {
1566 log::info!("Successfully parsed {} databases from localStorage", db_list.len());
1567 log::debug!("Database list: {:?}", db_list);
1568 Ok(db_list)
1569 }
1570 Err(e) => {
1571 log::error!("Failed to parse localStorage JSON: {}", e);
1572 Ok(Vec::new())
1573 }
1574 }
1575 } else {
1576 log::info!("No persistent database list in localStorage");
1577 Ok(Vec::new())
1578 }
1579 }
1580
1581 fn start_write_queue_listener(db_name: &str) -> Result<(), JsValue> {
1583 use crate::storage::write_queue::{register_write_queue_listener, WriteQueueMessage, WriteResponse};
1584 use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1585
1586 let db_name_clone = db_name.to_string();
1587
1588 let callback = Closure::wrap(Box::new(move |msg: JsValue| {
1589 let db_name_inner = db_name_clone.clone();
1590
1591 if let Ok(json_str) = js_sys::JSON::stringify(&msg) {
1593 if let Some(json_str) = json_str.as_string() {
1594 if let Ok(message) = serde_json::from_str::<WriteQueueMessage>(&json_str) {
1595 if let WriteQueueMessage::WriteRequest(request) = message {
1596 log::debug!("Leader received write request: {}", request.request_id);
1597
1598 let storage_rc = STORAGE_REGISTRY.with(|reg| {
1600 let registry = reg.borrow();
1601 registry.get(&db_name_inner).cloned()
1602 .or_else(|| registry.get(&format!("{}.db", &db_name_inner)).cloned())
1603 });
1604
1605 if let Some(storage) = storage_rc {
1606 wasm_bindgen_futures::spawn_local(async move {
1608 let is_leader = {
1609 let mut storage_mut = storage.borrow_mut();
1610 storage_mut.is_leader().await
1611 };
1612
1613 if !is_leader {
1614 log::error!("Not leader, ignoring write request");
1615 return;
1616 }
1617
1618 log::debug!("Processing write request as leader");
1619
1620 match Database::new_wasm(db_name_inner.clone()).await {
1622 Ok(mut db) => {
1623 match db.execute_internal(&request.sql).await {
1625 Ok(result) => {
1626 let response = WriteResponse::Success {
1628 request_id: request.request_id.clone(),
1629 affected_rows: result.affected_rows as usize,
1630 };
1631
1632 use crate::storage::write_queue::send_write_response;
1633 if let Err(e) = send_write_response(&db_name_inner, response) {
1634 log::error!("Failed to send response: {}", e);
1635 } else {
1636 log::info!("Write response sent successfully");
1637 }
1638 }
1639 Err(e) => {
1640 let response = WriteResponse::Error {
1642 request_id: request.request_id.clone(),
1643 error_message: e.to_string(),
1644 };
1645
1646 use crate::storage::write_queue::send_write_response;
1647 if let Err(e) = send_write_response(&db_name_inner, response) {
1648 log::error!("Failed to send error response: {}", e);
1649 }
1650 }
1651 }
1652 }
1653 Err(e) => {
1654 log::error!("Failed to create db for write processing: {:?}", e);
1655 }
1656 }
1657 });
1658 }
1659 }
1660 }
1661 }
1662 }
1663 }) as Box<dyn FnMut(JsValue)>);
1664
1665 let callback_fn = callback.as_ref().unchecked_ref();
1666 register_write_queue_listener(db_name, callback_fn)
1667 .map_err(|e| JsValue::from_str(&format!("Failed to register write queue listener: {}", e)))?;
1668
1669 callback.forget();
1670
1671 Ok(())
1672 }
1673
1674 #[wasm_bindgen]
1675 pub async fn execute(&mut self, sql: &str) -> Result<JsValue, JsValue> {
1676 self.check_write_permission(sql)
1678 .await
1679 .map_err(|e| JsValue::from_str(&format!("Write permission denied: {}", e)))?;
1680
1681 let result = self.execute_internal(sql)
1682 .await
1683 .map_err(|e| JsValue::from_str(&format!("Query execution failed: {}", e)))?;
1684 serde_wasm_bindgen::to_value(&result).map_err(|e| JsValue::from_str(&e.to_string()))
1685 }
1686
1687 #[wasm_bindgen(js_name = "executeWithParams")]
1688 pub async fn execute_with_params(&mut self, sql: &str, params: JsValue) -> Result<JsValue, JsValue> {
1689 let params: Vec<ColumnValue> = serde_wasm_bindgen::from_value(params)
1690 .map_err(|e| JsValue::from_str(&format!("Invalid parameters: {}", e)))?;
1691
1692 self.check_write_permission(sql)
1694 .await
1695 .map_err(|e| JsValue::from_str(&format!("Write permission denied: {}", e)))?;
1696
1697 let result = self.execute_with_params_internal(sql, ¶ms)
1698 .await
1699 .map_err(|e| JsValue::from_str(&format!("Query execution failed: {}", e)))?;
1700 serde_wasm_bindgen::to_value(&result).map_err(|e| JsValue::from_str(&e.to_string()))
1701 }
1702
1703 #[wasm_bindgen]
1704 pub async fn close(&mut self) -> Result<(), JsValue> {
1705 self.close_internal()
1706 .await
1707 .map_err(|e| JsValue::from_str(&format!("Failed to close database: {}", e)))
1708 }
1709
1710 #[wasm_bindgen]
1711 pub async fn sync(&mut self) -> Result<(), JsValue> {
1712 self.sync_internal()
1713 .await
1714 .map_err(|e| JsValue::from_str(&format!("Failed to sync database: {}", e)))
1715 }
1716
1717 #[wasm_bindgen(js_name = "allowNonLeaderWrites")]
1719 pub async fn allow_non_leader_writes(&mut self, allow: bool) -> Result<(), JsValue> {
1720 log::debug!("Setting allowNonLeaderWrites = {} for {}", allow, self.name);
1721 self.allow_non_leader_writes = allow;
1722 Ok(())
1723 }
1724
1725 #[wasm_bindgen(js_name = "exportToFile")]
1741 pub async fn export_to_file(&mut self) -> Result<js_sys::Uint8Array, JsValue> {
1742 log::info!("Exporting database: {}", self.name);
1743
1744 let _lock_guard = crate::storage::export_import_lock::acquire_export_import_lock(&self.name)
1746 .await
1747 .map_err(|e| JsValue::from_str(&format!("Failed to acquire export lock: {}", e)))?;
1748
1749 log::debug!("Export lock acquired for: {}", self.name);
1750
1751 let _ = self.execute("PRAGMA wal_checkpoint(PASSIVE)").await;
1754
1755 use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1757
1758 let storage_rc = STORAGE_REGISTRY.with(|reg| {
1759 let registry = reg.borrow();
1760 registry.get(&self.name).cloned()
1761 .or_else(|| registry.get(&format!("{}.db", &self.name)).cloned())
1762 .or_else(|| {
1763 if self.name.ends_with(".db") {
1764 registry.get(&self.name[..self.name.len()-3]).cloned()
1765 } else {
1766 None
1767 }
1768 })
1769 });
1770
1771 let storage_rc = storage_rc.ok_or_else(|| {
1772 JsValue::from_str(&format!("Storage not found for database: {}", self.name))
1773 })?;
1774
1775 let mut storage = storage_rc.borrow_mut();
1777
1778 #[cfg(target_arch = "wasm32")]
1780 storage.reload_cache_from_global_storage();
1781
1782 let db_bytes = crate::storage::export::export_database_to_bytes(&mut *storage, self.max_export_size_bytes)
1784 .await
1785 .map_err(|e| JsValue::from_str(&format!("Export failed: {}", e)))?;
1786
1787 log::info!("Export complete: {} bytes", db_bytes.len());
1788
1789 let uint8_array = js_sys::Uint8Array::new_with_length(db_bytes.len() as u32);
1791 uint8_array.copy_from(&db_bytes);
1792
1793 Ok(uint8_array)
1794 }
1796
1797 #[wasm_bindgen(js_name = "importFromFile")]
1826 pub async fn import_from_file(&mut self, file_data: js_sys::Uint8Array) -> Result<(), JsValue> {
1827 log::info!("Importing database: {}", self.name);
1828
1829 let _lock_guard = crate::storage::export_import_lock::acquire_export_import_lock(&self.name)
1831 .await
1832 .map_err(|e| JsValue::from_str(&format!("Failed to acquire import lock: {}", e)))?;
1833
1834 log::debug!("Import lock acquired for: {}", self.name);
1835
1836 let data = file_data.to_vec();
1838 log::debug!("Import data size: {} bytes", data.len());
1839
1840 log::debug!("Closing database connection before import");
1842 self.close().await?;
1843
1844 crate::storage::import::import_database_from_bytes(&self.name, data)
1846 .await
1847 .map_err(|e| {
1848 log::error!("Import failed for {}: {}", self.name, e);
1849 JsValue::from_str(&format!("Import failed: {}", e))
1850 })?;
1851
1852 log::info!("Import complete for: {}", self.name);
1853
1854 Ok(())
1858 }
1860
1861 #[wasm_bindgen(js_name = "waitForLeadership")]
1863 pub async fn wait_for_leadership(&mut self) -> Result<(), JsValue> {
1864 use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1865
1866 #[cfg(feature = "telemetry")]
1868 if let Some(ref metrics) = self.metrics {
1869 metrics.leader_elections_total().inc();
1870 }
1871
1872 let db_name = &self.name;
1873 log::debug!("Waiting for leadership for {}", db_name);
1874
1875 let start_time = js_sys::Date::now();
1877 let timeout_ms = 5000.0; loop {
1880 let storage_rc = STORAGE_REGISTRY.with(|reg| {
1881 let registry = reg.borrow();
1882 registry.get(db_name).cloned()
1883 .or_else(|| registry.get(&format!("{}.db", db_name)).cloned())
1884 .or_else(|| {
1885 if db_name.ends_with(".db") {
1886 registry.get(&db_name[..db_name.len()-3]).cloned()
1887 } else {
1888 None
1889 }
1890 })
1891 });
1892
1893 if let Some(storage) = storage_rc {
1894 let mut storage_mut = storage.borrow_mut();
1895 let is_leader = storage_mut.is_leader().await;
1896
1897 if is_leader {
1898 log::info!("Became leader for {}", db_name);
1899
1900 #[cfg(feature = "telemetry")]
1902 if let Some(ref metrics) = self.metrics {
1903 let duration_ms = js_sys::Date::now() - start_time;
1904 metrics.leader_election_duration().observe(duration_ms);
1905 metrics.is_leader().set(1.0);
1906 metrics.leadership_changes_total().inc();
1907 }
1908
1909 return Ok(());
1910 }
1911 }
1912
1913 if js_sys::Date::now() - start_time > timeout_ms {
1915 #[cfg(feature = "telemetry")]
1917 if let Some(ref metrics) = self.metrics {
1918 let duration_ms = js_sys::Date::now() - start_time;
1919 metrics.leader_election_duration().observe(duration_ms);
1920 }
1921
1922 return Err(JsValue::from_str("Timeout waiting for leadership"));
1923 }
1924
1925 let promise = js_sys::Promise::new(&mut |resolve, _| {
1927 let window = web_sys::window().expect("should have window");
1928 let _ = window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 100);
1929 });
1930 let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
1931 }
1932 }
1933
1934 #[wasm_bindgen(js_name = "requestLeadership")]
1936 pub async fn request_leadership(&mut self) -> Result<(), JsValue> {
1937 use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1938
1939 let db_name = &self.name;
1940 log::debug!("Requesting leadership for {}", db_name);
1941
1942 #[cfg(feature = "telemetry")]
1944 let telemetry_data = if self.metrics.is_some() {
1945 let start_time = js_sys::Date::now();
1946 let was_leader = self.is_leader_wasm().await.ok()
1947 .and_then(|v| v.as_bool())
1948 .unwrap_or(false);
1949
1950 if let Some(ref metrics) = self.metrics {
1951 metrics.leader_elections_total().inc();
1952 }
1953
1954 Some((start_time, was_leader))
1955 } else {
1956 None
1957 };
1958
1959 let storage_rc = STORAGE_REGISTRY.with(|reg| {
1960 let registry = reg.borrow();
1961 registry.get(db_name).cloned()
1962 .or_else(|| registry.get(&format!("{}.db", db_name)).cloned())
1963 .or_else(|| {
1964 if db_name.ends_with(".db") {
1965 registry.get(&db_name[..db_name.len()-3]).cloned()
1966 } else {
1967 None
1968 }
1969 })
1970 });
1971
1972 if let Some(storage) = storage_rc {
1973 {
1974 let mut storage_mut = storage.borrow_mut();
1975
1976 storage_mut.start_leader_election().await
1978 .map_err(|e| JsValue::from_str(&format!("Failed to request leadership: {}", e)))?;
1979
1980 log::debug!("Re-election triggered for {}", db_name);
1981 } #[cfg(feature = "telemetry")]
1985 if let Some((start_time, was_leader)) = telemetry_data {
1986 if let Some(ref metrics) = self.metrics {
1987 let duration_ms = js_sys::Date::now() - start_time;
1989 metrics.leader_election_duration().observe(duration_ms);
1990
1991 let is_leader_now = self.is_leader_wasm().await.ok()
1993 .and_then(|v| v.as_bool())
1994 .unwrap_or(false);
1995
1996 metrics.is_leader().set(if is_leader_now { 1.0 } else { 0.0 });
1998
1999 if was_leader != is_leader_now {
2001 metrics.leadership_changes_total().inc();
2002 }
2003 }
2004 }
2005
2006 Ok(())
2007 } else {
2008 Err(JsValue::from_str(&format!("No storage found for database: {}", db_name)))
2009 }
2010 }
2011
2012 #[wasm_bindgen(js_name = "getLeaderInfo")]
2014 pub async fn get_leader_info(&mut self) -> Result<JsValue, JsValue> {
2015 use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
2016
2017 let db_name = &self.name;
2018
2019 let storage_rc = STORAGE_REGISTRY.with(|reg| {
2020 let registry = reg.borrow();
2021 registry.get(db_name).cloned()
2022 .or_else(|| registry.get(&format!("{}.db", db_name)).cloned())
2023 .or_else(|| {
2024 if db_name.ends_with(".db") {
2025 registry.get(&db_name[..db_name.len()-3]).cloned()
2026 } else {
2027 None
2028 }
2029 })
2030 });
2031
2032 if let Some(storage) = storage_rc {
2033 let mut storage_mut = storage.borrow_mut();
2034 let is_leader = storage_mut.is_leader().await;
2035
2036 let leader_id_str = if is_leader {
2039 format!("leader_{}", db_name)
2040 } else {
2041 "unknown".to_string()
2042 };
2043
2044 let obj = js_sys::Object::new();
2046 js_sys::Reflect::set(&obj, &"isLeader".into(), &JsValue::from_bool(is_leader))?;
2047 js_sys::Reflect::set(&obj, &"leaderId".into(), &JsValue::from_str(&leader_id_str))?;
2048 js_sys::Reflect::set(&obj, &"leaseExpiry".into(), &JsValue::from_f64(js_sys::Date::now()))?;
2049
2050 Ok(obj.into())
2051 } else {
2052 Err(JsValue::from_str(&format!("No storage found for database: {}", db_name)))
2053 }
2054 }
2055
2056 #[wasm_bindgen(js_name = "queueWrite")]
2067 pub async fn queue_write(&mut self, sql: String) -> Result<(), JsValue> {
2068 self.queue_write_with_timeout(sql, 5000).await
2069 }
2070
2071 #[wasm_bindgen(js_name = "queueWriteWithTimeout")]
2077 pub async fn queue_write_with_timeout(&mut self, sql: String, timeout_ms: u32) -> Result<(), JsValue> {
2078 use crate::storage::write_queue::{send_write_request, WriteResponse, WriteQueueMessage};
2079 use std::cell::RefCell;
2080 use std::rc::Rc;
2081
2082 log::debug!("Queuing write: {}", sql);
2083
2084 use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
2086 let is_leader = {
2087 let storage_rc = STORAGE_REGISTRY.with(|reg| {
2088 let registry = reg.borrow();
2089 registry.get(&self.name).cloned()
2090 .or_else(|| registry.get(&format!("{}.db", &self.name)).cloned())
2091 });
2092
2093 if let Some(storage) = storage_rc {
2094 let mut storage_mut = storage.borrow_mut();
2095 storage_mut.is_leader().await
2096 } else {
2097 false
2098 }
2099 };
2100
2101 if is_leader {
2102 log::debug!("We are leader, executing directly");
2103 return self.execute_internal(&sql).await
2104 .map(|_| ())
2105 .map_err(|e| JsValue::from_str(&format!("Execute failed: {}", e)));
2106 }
2107
2108 let request_id = send_write_request(&self.name, &sql)
2110 .map_err(|e| JsValue::from_str(&format!("Failed to send write request: {}", e)))?;
2111
2112 log::debug!("Write request sent with ID: {}", request_id);
2113
2114 let response_received = Rc::new(RefCell::new(false));
2116 let response_error = Rc::new(RefCell::new(None::<String>));
2117
2118 let response_received_clone = response_received.clone();
2119 let response_error_clone = response_error.clone();
2120 let request_id_clone = request_id.clone();
2121
2122 let callback = Closure::wrap(Box::new(move |msg: JsValue| {
2124 if let Ok(json_str) = js_sys::JSON::stringify(&msg) {
2126 if let Some(json_str) = json_str.as_string() {
2127 if let Ok(message) = serde_json::from_str::<WriteQueueMessage>(&json_str) {
2128 if let WriteQueueMessage::WriteResponse(response) = message {
2129 match response {
2130 WriteResponse::Success { request_id, .. } => {
2131 if request_id == request_id_clone {
2132 *response_received_clone.borrow_mut() = true;
2133 log::debug!("Write response received: Success");
2134 }
2135 }
2136 WriteResponse::Error { request_id, error_message } => {
2137 if request_id == request_id_clone {
2138 *response_received_clone.borrow_mut() = true;
2139 *response_error_clone.borrow_mut() = Some(error_message);
2140 log::debug!("Write response received: Error");
2141 }
2142 }
2143 }
2144 }
2145 }
2146 }
2147 }
2148 }) as Box<dyn FnMut(JsValue)>);
2149
2150 use crate::storage::write_queue::register_write_queue_listener;
2152 let callback_fn = callback.as_ref().unchecked_ref();
2153 register_write_queue_listener(&self.name, callback_fn)
2154 .map_err(|e| JsValue::from_str(&format!("Failed to register listener: {}", e)))?;
2155
2156 callback.forget();
2158
2159 let start_time = js_sys::Date::now();
2161 let timeout_f64 = timeout_ms as f64;
2162
2163 loop {
2164 if *response_received.borrow() {
2166 if let Some(error_msg) = response_error.borrow().as_ref() {
2167 return Err(JsValue::from_str(&format!("Write failed: {}", error_msg)));
2168 }
2169 log::info!("Write completed successfully");
2170 return Ok(());
2171 }
2172
2173 let elapsed = js_sys::Date::now() - start_time;
2175 if elapsed > timeout_f64 {
2176 return Err(JsValue::from_str("Write request timed out"));
2177 }
2178
2179 wasm_bindgen_futures::JsFuture::from(js_sys::Promise::new(&mut |resolve, _reject| {
2181 if let Some(window) = web_sys::window() {
2182 let _ = window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 100);
2183 } else {
2184 log::error!("Window unavailable in timeout handler");
2185 }
2186 })).await.ok();
2187 }
2188 }
2189
2190 #[wasm_bindgen(js_name = "isLeader")]
2191 pub async fn is_leader_wasm(&self) -> Result<JsValue, JsValue> {
2192 use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
2194
2195 let db_name = &self.name;
2196 log::debug!("isLeader() called for database: {} (self.name)", db_name);
2197
2198 STORAGE_REGISTRY.with(|reg| {
2200 let registry = reg.borrow();
2201 log::debug!("STORAGE_REGISTRY keys: {:?}", registry.keys().collect::<Vec<_>>());
2202 });
2203
2204 let storage_rc = STORAGE_REGISTRY.with(|reg| {
2205 let registry = reg.borrow();
2206 registry.get(db_name).cloned()
2208 .or_else(|| registry.get(&format!("{}.db", db_name)).cloned())
2209 .or_else(|| {
2210 if db_name.ends_with(".db") {
2211 registry.get(&db_name[..db_name.len()-3]).cloned()
2212 } else {
2213 None
2214 }
2215 })
2216 });
2217
2218 if let Some(storage) = storage_rc {
2219 log::debug!("Found storage for {}, calling is_leader()", db_name);
2220 let mut storage_mut = storage.borrow_mut();
2221 let is_leader = storage_mut.is_leader().await;
2222 log::debug!("is_leader() = {} for {}", is_leader, db_name);
2223
2224 Ok(JsValue::from_bool(is_leader))
2226 } else {
2227 log::error!("ERROR: No storage found for database: {}", db_name);
2228 Err(JsValue::from_str(&format!("No storage found for database: {}", db_name)))
2229 }
2230 }
2231
2232 pub async fn is_leader(&self) -> Result<bool, JsValue> {
2234 let result = self.is_leader_wasm().await?;
2235 Ok(result.as_bool().unwrap_or(false))
2236 }
2237
2238 #[wasm_bindgen(js_name = "onDataChange")]
2239 pub fn on_data_change_wasm(&mut self, callback: &js_sys::Function) -> Result<(), JsValue> {
2240 log::debug!("Registering onDataChange callback for {}", self.name);
2241
2242 self.on_data_change_callback = Some(callback.clone());
2244
2245 use crate::storage::broadcast_notifications::register_change_listener;
2247
2248 let db_name = &self.name;
2249 register_change_listener(db_name, callback)
2250 .map_err(|e| JsValue::from_str(&format!("Failed to register change listener: {}", e)))?;
2251
2252 log::debug!("onDataChange callback registered for {}", self.name);
2253 Ok(())
2254 }
2255
2256 #[wasm_bindgen(js_name = "enableOptimisticUpdates")]
2258 pub async fn enable_optimistic_updates(&mut self, enabled: bool) -> Result<(), JsValue> {
2259 self.optimistic_updates_manager.borrow_mut().set_enabled(enabled);
2260 log::debug!("Optimistic updates {}", if enabled { "enabled" } else { "disabled" });
2261 Ok(())
2262 }
2263
2264 #[wasm_bindgen(js_name = "isOptimisticMode")]
2266 pub async fn is_optimistic_mode(&self) -> bool {
2267 self.optimistic_updates_manager.borrow().is_enabled()
2268 }
2269
2270 #[wasm_bindgen(js_name = "trackOptimisticWrite")]
2272 pub async fn track_optimistic_write(&mut self, sql: String) -> Result<String, JsValue> {
2273 let id = self.optimistic_updates_manager.borrow_mut().track_write(sql);
2274 Ok(id)
2275 }
2276
2277 #[wasm_bindgen(js_name = "getPendingWritesCount")]
2279 pub async fn get_pending_writes_count(&self) -> usize {
2280 self.optimistic_updates_manager.borrow().get_pending_count()
2281 }
2282
2283 #[wasm_bindgen(js_name = "clearOptimisticWrites")]
2285 pub async fn clear_optimistic_writes(&mut self) -> Result<(), JsValue> {
2286 self.optimistic_updates_manager.borrow_mut().clear_all();
2287 Ok(())
2288 }
2289
2290 #[wasm_bindgen(js_name = "enableCoordinationMetrics")]
2292 pub async fn enable_coordination_metrics(&mut self, enabled: bool) -> Result<(), JsValue> {
2293 self.coordination_metrics_manager.borrow_mut().set_enabled(enabled);
2294 Ok(())
2295 }
2296
2297 #[wasm_bindgen(js_name = "isCoordinationMetricsEnabled")]
2299 pub async fn is_coordination_metrics_enabled(&self) -> bool {
2300 self.coordination_metrics_manager.borrow().is_enabled()
2301 }
2302
2303 #[wasm_bindgen(js_name = "recordLeadershipChange")]
2305 pub async fn record_leadership_change(&mut self, became_leader: bool) -> Result<(), JsValue> {
2306 self.coordination_metrics_manager.borrow_mut().record_leadership_change(became_leader);
2307 Ok(())
2308 }
2309
2310 #[wasm_bindgen(js_name = "recordNotificationLatency")]
2312 pub async fn record_notification_latency(&mut self, latency_ms: f64) -> Result<(), JsValue> {
2313 self.coordination_metrics_manager.borrow_mut().record_notification_latency(latency_ms);
2314 Ok(())
2315 }
2316
2317 #[wasm_bindgen(js_name = "recordWriteConflict")]
2319 pub async fn record_write_conflict(&mut self) -> Result<(), JsValue> {
2320 self.coordination_metrics_manager.borrow_mut().record_write_conflict();
2321 Ok(())
2322 }
2323
2324 #[wasm_bindgen(js_name = "recordFollowerRefresh")]
2326 pub async fn record_follower_refresh(&mut self) -> Result<(), JsValue> {
2327 self.coordination_metrics_manager.borrow_mut().record_follower_refresh();
2328 Ok(())
2329 }
2330
2331 #[wasm_bindgen(js_name = "getCoordinationMetrics")]
2333 pub async fn get_coordination_metrics(&self) -> Result<String, JsValue> {
2334 self.coordination_metrics_manager.borrow().get_metrics_json()
2335 .map_err(|e| JsValue::from_str(&e))
2336 }
2337
2338 #[wasm_bindgen(js_name = "resetCoordinationMetrics")]
2340 pub async fn reset_coordination_metrics(&mut self) -> Result<(), JsValue> {
2341 self.coordination_metrics_manager.borrow_mut().reset();
2342 Ok(())
2343 }
2344}
2345
2346#[cfg(target_arch = "wasm32")]
2348#[wasm_bindgen]
2349pub struct WasmColumnValue {
2350 #[allow(dead_code)]
2351 inner: ColumnValue,
2352}
2353
2354#[cfg(target_arch = "wasm32")]
2355#[wasm_bindgen]
2356impl WasmColumnValue {
2357 #[wasm_bindgen(js_name = "createNull")]
2358 pub fn create_null() -> WasmColumnValue {
2359 WasmColumnValue {
2360 inner: ColumnValue::Null,
2361 }
2362 }
2363
2364 #[wasm_bindgen(js_name = "createInteger")]
2365 pub fn create_integer(value: i64) -> WasmColumnValue {
2366 WasmColumnValue {
2367 inner: ColumnValue::Integer(value),
2368 }
2369 }
2370
2371 #[wasm_bindgen(js_name = "createReal")]
2372 pub fn create_real(value: f64) -> WasmColumnValue {
2373 WasmColumnValue {
2374 inner: ColumnValue::Real(value),
2375 }
2376 }
2377
2378 #[wasm_bindgen(js_name = "createText")]
2379 pub fn create_text(value: String) -> WasmColumnValue {
2380 WasmColumnValue {
2381 inner: ColumnValue::Text(value),
2382 }
2383 }
2384
2385 #[wasm_bindgen(js_name = "createBlob")]
2386 pub fn create_blob(value: &[u8]) -> WasmColumnValue {
2387 WasmColumnValue {
2388 inner: ColumnValue::Blob(value.to_vec()),
2389 }
2390 }
2391
2392 #[wasm_bindgen(js_name = "createBigInt")]
2393 pub fn create_bigint(value: &str) -> WasmColumnValue {
2394 WasmColumnValue {
2395 inner: ColumnValue::BigInt(value.to_string()),
2396 }
2397 }
2398
2399 #[wasm_bindgen(js_name = "createDate")]
2400 pub fn create_date(timestamp: f64) -> WasmColumnValue {
2401 WasmColumnValue {
2402 inner: ColumnValue::Date(timestamp as i64),
2403 }
2404 }
2405
2406 #[wasm_bindgen(js_name = "fromJsValue")]
2407 pub fn from_js_value(value: &JsValue) -> WasmColumnValue {
2408 if value.is_null() || value.is_undefined() {
2409 WasmColumnValue {
2410 inner: ColumnValue::Null,
2411 }
2412 } else if let Some(s) = value.as_string() {
2413 if let Ok(parsed) = s.parse::<i64>() {
2415 WasmColumnValue {
2416 inner: ColumnValue::Integer(parsed),
2417 }
2418 } else {
2419 WasmColumnValue {
2420 inner: ColumnValue::Text(s),
2421 }
2422 }
2423 } else if let Some(n) = value.as_f64() {
2424 if n.fract() == 0.0 && n >= i64::MIN as f64 && n <= i64::MAX as f64 {
2425 WasmColumnValue {
2426 inner: ColumnValue::Integer(n as i64),
2427 }
2428 } else {
2429 WasmColumnValue {
2430 inner: ColumnValue::Real(n),
2431 }
2432 }
2433 } else if value.is_object() {
2434 if js_sys::Date::new(value).get_time().is_finite() {
2436 let timestamp = js_sys::Date::new(value).get_time() as i64;
2437 WasmColumnValue {
2438 inner: ColumnValue::Date(timestamp),
2439 }
2440 } else {
2441 WasmColumnValue {
2443 inner: ColumnValue::Text(format!("{:?}", value)),
2444 }
2445 }
2446 } else {
2447 WasmColumnValue {
2448 inner: ColumnValue::Null,
2449 }
2450 }
2451 }
2452
2453 pub fn null() -> WasmColumnValue { Self::create_null() }
2457
2458 pub fn integer(value: f64) -> WasmColumnValue { Self::create_integer(value as i64) }
2460
2461 pub fn real(value: f64) -> WasmColumnValue { Self::create_real(value) }
2462
2463 pub fn text(value: String) -> WasmColumnValue { Self::create_text(value) }
2464
2465 pub fn blob(value: Vec<u8>) -> WasmColumnValue { Self::create_blob(&value) }
2466
2467 pub fn big_int(value: String) -> WasmColumnValue { Self::create_bigint(&value) }
2468
2469 pub fn date(timestamp_ms: f64) -> WasmColumnValue { Self::create_date(timestamp_ms) }
2470}