absurder_sql/
lib.rs

1#[cfg(target_arch = "wasm32")]
2use std::rc::Rc;
3#[cfg(target_arch = "wasm32")]
4use wasm_bindgen::prelude::*;
5
6// Conditional rusqlite import: same crate, different features
7// Make this public so child crates can use it
8// When encryption is enabled, rusqlite uses bundled-sqlcipher-vendored-openssl feature
9// When bundled-sqlite is enabled, rusqlite uses bundled feature
10#[cfg(all(
11    not(target_arch = "wasm32"),
12    any(
13        feature = "bundled-sqlite",
14        feature = "encryption",
15        feature = "encryption-commoncrypto",
16        feature = "encryption-ios"
17    )
18))]
19pub extern crate rusqlite;
20
21// Enable better panic messages and memory allocation
22#[cfg(feature = "console_error_panic_hook")]
23pub use console_error_panic_hook::set_once as set_panic_hook;
24
25#[cfg(feature = "wee_alloc")]
26#[global_allocator]
27static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT;
28
29// Initialize logging infrastructure for WASM
30#[cfg(all(target_arch = "wasm32", feature = "console_log"))]
31#[wasm_bindgen(start)]
32pub fn init_logger() {
33    // Initialize console_log for browser logging
34    // Use Info level for production, Debug for development
35    #[cfg(debug_assertions)]
36    let log_level = log::Level::Debug;
37    #[cfg(not(debug_assertions))]
38    let log_level = log::Level::Info;
39
40    console_log::init_with_level(log_level).expect("Failed to initialize console_log");
41
42    log::info!("AbsurderSQL logging initialized at level: {:?}", log_level);
43}
44
45/// SINGLE SOURCE OF TRUTH: Normalize database name to ALWAYS include .db extension
46/// This matches main branch behavior where all GLOBAL_STORAGE keys use name WITH .db
47/// Used by: Database::new, import_from_file, sync_internal, IndexedDB operations
48#[inline]
49#[cfg(target_arch = "wasm32")]
50fn normalize_db_name(name: &str) -> String {
51    // Main branch uses full name with .db - we must do the same for consistency
52    // BlockStorage, GLOBAL_STORAGE, and IndexedDB all use this normalized name
53    if name.ends_with(".db") {
54        name.to_string()
55    } else {
56        format!("{}.db", name)
57    }
58}
59
60// Module declarations
61mod cleanup;
62#[cfg(target_arch = "wasm32")]
63pub mod connection_pool;
64#[cfg(not(target_arch = "wasm32"))]
65pub mod database;
66pub mod storage;
67pub mod types;
68pub mod vfs;
69#[cfg(not(target_arch = "wasm32"))]
70pub use database::PreparedStatement;
71pub mod utils;
72
73#[cfg(feature = "telemetry")]
74pub mod telemetry;
75
76// Re-export main public API
77#[cfg(not(target_arch = "wasm32"))]
78pub use database::SqliteIndexedDB;
79
80// WASM: Track databases currently being opened to serialize SQLite connection initialization
81#[cfg(target_arch = "wasm32")]
82thread_local! {
83    static DB_OPEN_IN_PROGRESS: std::cell::RefCell<std::collections::HashSet<String>> =
84        std::cell::RefCell::new(std::collections::HashSet::new());
85}
86
87// Type alias for native platforms
88#[cfg(not(target_arch = "wasm32"))]
89pub type Database = SqliteIndexedDB;
90
91pub use types::DatabaseConfig;
92pub use types::{ColumnValue, DatabaseError, QueryResult, Row, TransactionOptions};
93
94// Re-export VFS
95pub use vfs::indexeddb_vfs::IndexedDBVFS;
96
97/// DRY macro for async storage operations with interior mutability
98#[cfg(target_arch = "wasm32")]
99macro_rules! with_storage_async {
100    ($storage:expr, $operation:expr, |$s:ident| $body:expr) => {{
101        // BlockStorage uses RefCell for interior mutability, no outer borrow needed
102        let $s = &*$storage;
103        Some($body.await)
104    }};
105}
106
107// WASM Database implementation using sqlite-wasm-rs
108#[cfg(target_arch = "wasm32")]
109#[wasm_bindgen]
110pub struct Database {
111    #[wasm_bindgen(skip)]
112    connection_state: Rc<crate::connection_pool::ConnectionState>,
113    #[allow(dead_code)]
114    name: String,
115    #[wasm_bindgen(skip)]
116    on_data_change_callback: Option<js_sys::Function>,
117    #[wasm_bindgen(skip)]
118    allow_non_leader_writes: bool,
119    #[wasm_bindgen(skip)]
120    optimistic_updates_manager:
121        std::cell::RefCell<crate::storage::optimistic_updates::OptimisticUpdatesManager>,
122    #[wasm_bindgen(skip)]
123    coordination_metrics_manager:
124        std::cell::RefCell<crate::storage::coordination_metrics::CoordinationMetricsManager>,
125    #[wasm_bindgen(skip)]
126    #[cfg(feature = "telemetry")]
127    metrics: Option<crate::telemetry::Metrics>,
128    #[wasm_bindgen(skip)]
129    #[cfg(feature = "telemetry")]
130    span_recorder: Option<crate::telemetry::SpanRecorder>,
131    #[wasm_bindgen(skip)]
132    #[cfg(feature = "telemetry")]
133    span_context: Option<crate::telemetry::SpanContext>,
134    #[wasm_bindgen(skip)]
135    max_export_size_bytes: Option<u64>,
136}
137
138#[cfg(target_arch = "wasm32")]
139impl Database {
140    /// Get the SQLite database pointer from the shared connection
141    /// Uses Cell::get() to avoid RefCell borrow checking - eliminates reentrancy panics
142    fn db(&self) -> *mut sqlite_wasm_rs::sqlite3 {
143        let db_ptr = self.connection_state.db.get();
144        if db_ptr.is_null() {
145            panic!("Database connection is null for {}", self.name);
146        }
147        db_ptr
148    }
149
150    /// Check if a SQL statement is a write operation
151    fn is_write_operation(sql: &str) -> bool {
152        let upper = sql.trim().to_uppercase();
153        upper.starts_with("INSERT")
154            || upper.starts_with("UPDATE")
155            || upper.starts_with("DELETE")
156            || upper.starts_with("REPLACE")
157    }
158
159    /// Get metrics for observability
160    ///
161    /// Returns a reference to the Metrics instance for tracking queries, errors, and performance
162    #[cfg(feature = "telemetry")]
163    pub fn metrics(&self) -> Option<&crate::telemetry::Metrics> {
164        self.metrics.as_ref()
165    }
166
167    /// Check write permission - only leader can write (unless override enabled)
168    async fn check_write_permission(&mut self, sql: &str) -> Result<(), DatabaseError> {
169        if !Self::is_write_operation(sql) {
170            // Not a write operation, allow it
171            return Ok(());
172        }
173
174        // Check if non-leader writes are allowed
175        if self.allow_non_leader_writes {
176            log::info!("WRITE_ALLOWED: Non-leader writes enabled for {}", self.name);
177            return Ok(());
178        }
179
180        // Check if this instance is the leader
181        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
182
183        let db_name = &self.name;
184        let storage_rc = get_storage_with_fallback(db_name);
185
186        if let Some(storage) = storage_rc {
187            let is_leader = with_storage_async!(storage, "check_write_permission", |s| s
188                .is_leader())
189            .ok_or_else(|| {
190                DatabaseError::new("BORROW_CONFLICT", "Failed to check leader status")
191            })?;
192
193            if !is_leader {
194                log::error!("WRITE_DENIED: Instance is not leader for {}", db_name);
195                return Err(DatabaseError::new(
196                    "WRITE_PERMISSION_DENIED",
197                    "Only the leader tab can write to this database. Use db.isLeader() to check status or call db.allowNonLeaderWrites(true) for single-tab mode.",
198                ));
199            }
200
201            log::info!("WRITE_ALLOWED: Instance is leader for {}", db_name);
202            Ok(())
203        } else {
204            // No storage found - allow by default (single-instance mode)
205            log::info!(
206                "WRITE_ALLOWED: No storage found for {} (single-instance mode)",
207                db_name
208            );
209            Ok(())
210        }
211    }
212
213    pub async fn new(config: DatabaseConfig) -> Result<Self, DatabaseError> {
214        use std::ffi::{CStr, CString};
215
216        log::info!("Database::new called for {}", config.name);
217
218        // CRITICAL: Use DRY helper to normalize name WITH .db extension
219        // This ensures Database.name, GLOBAL_STORAGE keys, and IndexedDB keys all match
220        let normalized_name = normalize_db_name(&config.name);
221
222        // Use a unique VFS name per database to avoid interference
223        let vfs_name = format!("vfs_{}", normalized_name.trim_end_matches(".db"));
224        let vfs_name_cstr = CString::new(vfs_name.as_str())
225            .map_err(|_| DatabaseError::new("INVALID_VFS_NAME", "Invalid VFS name"))?;
226        let vfs_exists = unsafe {
227            let existing_vfs = sqlite_wasm_rs::sqlite3_vfs_find(vfs_name_cstr.as_ptr());
228            !existing_vfs.is_null()
229        };
230
231        if !vfs_exists {
232            // Create and register VFS only if it doesn't exist
233            log::debug!("Creating IndexedDBVFS for: {}", normalized_name);
234            let vfs = crate::vfs::IndexedDBVFS::new(&normalized_name).await?;
235            log::debug!("Registering VFS as '{}'", vfs_name);
236            vfs.register(&vfs_name)?;
237            log::info!("VFS registered successfully");
238        } else {
239            log::info!("VFS '{}' already registered, reusing existing", vfs_name);
240            // Ensure BlockStorage exists for this database in the registry
241            // The existing VFS will find it via STORAGE_REGISTRY
242            let _vfs = crate::vfs::IndexedDBVFS::new(&normalized_name).await?;
243            log::info!("BlockStorage ensured for {}", normalized_name);
244        }
245
246        // CRITICAL: Synchronize SQLite connection opening to prevent WAL initialization conflicts
247        // Wait if another task is currently opening a connection to this database
248        #[cfg(target_arch = "wasm32")]
249        {
250            const MAX_OPEN_WAIT_MS: u32 = 1000; // Reduced from 5000 - 1s is sufficient
251            const OPEN_POLL_MS: u32 = 10;
252            let max_open_attempts = MAX_OPEN_WAIT_MS / OPEN_POLL_MS;
253
254            for attempt in 0..max_open_attempts {
255                let can_open = DB_OPEN_IN_PROGRESS.with(|opens| {
256                    let mut set = opens.borrow_mut();
257                    if set.contains(&config.name) {
258                        false // Someone else is opening
259                    } else {
260                        set.insert(config.name.clone());
261                        true // We got it
262                    }
263                });
264
265                if can_open {
266                    web_sys::console::log_1(
267                        &format!("[DB] {} - ACQUIRED sqlite open lock", config.name).into(),
268                    );
269                    break;
270                } else {
271                    web_sys::console::log_1(
272                        &format!(
273                            "[DB] {} - Waiting for sqlite open (attempt {})",
274                            config.name, attempt
275                        )
276                        .into(),
277                    );
278                    use wasm_bindgen_futures::JsFuture;
279                    let promise = js_sys::Promise::new(&mut |resolve, _| {
280                        web_sys::window()
281                            .unwrap()
282                            .set_timeout_with_callback_and_timeout_and_arguments_0(
283                                &resolve,
284                                OPEN_POLL_MS as i32,
285                            )
286                            .unwrap();
287                    });
288                    JsFuture::from(promise).await.ok();
289                    continue;
290                }
291            }
292        }
293
294        // Use connection pooling to share connections between instances
295        let (connection_state, db) = {
296            let vfs_name_str = vfs_name.clone(); // Capture the VFS name to use in closure
297            let filename_copy = normalized_name.clone(); // Capture filename for logging
298            let pool_key = normalized_name.trim_end_matches(".db").to_string(); // Pool uses name without .db
299            let state = crate::connection_pool::get_or_create_connection(&pool_key, || {
300                let mut db = std::ptr::null_mut();
301                let db_name = CString::new(normalized_name.clone())
302                    .map_err(|_| "Invalid database name".to_string())?;
303                let vfs_cstr = CString::new(vfs_name_str.as_str())
304                    .map_err(|_| "Invalid VFS name".to_string())?;
305
306                log::info!(
307                    "Opening database: {} with VFS: {}",
308                    filename_copy,
309                    vfs_name_str
310                );
311
312                #[cfg(target_arch = "wasm32")]
313                web_sys::console::log_1(&format!("[OPEN] About to call sqlite3_open_v2...").into());
314
315                let ret = unsafe {
316                    sqlite_wasm_rs::sqlite3_open_v2(
317                        db_name.as_ptr(),
318                        &mut db as *mut _,
319                        sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
320                        vfs_cstr.as_ptr(),
321                    )
322                };
323
324                #[cfg(target_arch = "wasm32")]
325                web_sys::console::log_1(
326                    &format!("[OPEN] sqlite3_open_v2 returned: {}", ret).into(),
327                );
328
329                log::info!(
330                    "sqlite3_open_v2 returned: {} for database: {}",
331                    ret,
332                    filename_copy
333                );
334
335                if ret != sqlite_wasm_rs::SQLITE_OK {
336                    let err_msg = unsafe {
337                        let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
338                        if !msg_ptr.is_null() {
339                            CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
340                        } else {
341                            "Unknown error".to_string()
342                        }
343                    };
344
345                    #[cfg(target_arch = "wasm32")]
346                    web_sys::console::log_1(
347                        &format!(
348                            "[OPEN] ERROR - sqlite3_open_v2 FAILED: ret={}, err={}",
349                            ret, err_msg
350                        )
351                        .into(),
352                    );
353
354                    return Err(format!(
355                        "Failed to open database with IndexedDB VFS: {}",
356                        err_msg
357                    ));
358                }
359
360                log::info!("Database opened successfully with IndexedDB VFS");
361                Ok(db)
362            })
363            .map_err(|e| DatabaseError::new("CONNECTION_POOL_ERROR", &e))?;
364            let db_ptr = state.db.get();
365            (state, db_ptr)
366        };
367
368        // Apply configuration options via PRAGMA statements
369        let exec_sql = |db: *mut sqlite_wasm_rs::sqlite3, sql: &str| -> Result<(), DatabaseError> {
370            let c_sql = CString::new(sql)
371                .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL statement"))?;
372
373            let ret = unsafe {
374                sqlite_wasm_rs::sqlite3_exec(
375                    db,
376                    c_sql.as_ptr(),
377                    None,
378                    std::ptr::null_mut(),
379                    std::ptr::null_mut(),
380                )
381            };
382
383            if ret != sqlite_wasm_rs::SQLITE_OK {
384                let err_msg = unsafe {
385                    let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
386                    if !msg_ptr.is_null() {
387                        CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
388                    } else {
389                        "Unknown error".to_string()
390                    }
391                };
392                log::warn!("Failed to execute SQL '{}': {}", sql, err_msg);
393                return Err(DatabaseError::new(
394                    "SQLITE_ERROR",
395                    &format!("Failed to execute: {}", err_msg),
396                ));
397            }
398            Ok(())
399        };
400
401        // CRITICAL: Set busy_timeout FIRST to handle concurrent access
402        // This makes SQLite wait and retry for up to 10 seconds when the database is locked
403        // instead of immediately returning SQLITE_BUSY errors during parallel operations
404        log::debug!("Setting busy_timeout to 10000ms for concurrent access handling");
405        exec_sql(db, "PRAGMA busy_timeout = 10000")?;
406
407        // Apply page_size (must be set before any tables are created)
408        if let Some(page_size) = config.page_size {
409            log::debug!("Setting page_size to {}", page_size);
410            exec_sql(db, &format!("PRAGMA page_size = {}", page_size))?;
411        }
412
413        // Apply cache_size
414        if let Some(cache_size) = config.cache_size {
415            log::debug!("Setting cache_size to {}", cache_size);
416            exec_sql(db, &format!("PRAGMA cache_size = {}", cache_size))?;
417        }
418
419        // Apply journal_mode
420        // WAL mode is now fully supported via shared memory (xShm*) implementation
421        if let Some(ref journal_mode) = config.journal_mode {
422            log::debug!("Setting journal_mode to {}", journal_mode);
423
424            let pragma_sql = format!("PRAGMA journal_mode = {}", journal_mode);
425            let c_sql = CString::new(pragma_sql.as_str())
426                .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL statement"))?;
427
428            let mut stmt: *mut sqlite_wasm_rs::sqlite3_stmt = std::ptr::null_mut();
429            let ret = unsafe {
430                sqlite_wasm_rs::sqlite3_prepare_v2(
431                    db,
432                    c_sql.as_ptr(),
433                    -1,
434                    &mut stmt as *mut _,
435                    std::ptr::null_mut(),
436                )
437            };
438
439            if ret == sqlite_wasm_rs::SQLITE_OK && !stmt.is_null() {
440                let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
441                if step_ret == sqlite_wasm_rs::SQLITE_ROW {
442                    let result_ptr = unsafe { sqlite_wasm_rs::sqlite3_column_text(stmt, 0) };
443                    if !result_ptr.is_null() {
444                        let result_mode = unsafe {
445                            std::ffi::CStr::from_ptr(result_ptr as *const i8)
446                                .to_string_lossy()
447                                .to_uppercase()
448                        };
449
450                        if result_mode != journal_mode.to_uppercase() {
451                            log::warn!(
452                                "journal_mode {} requested but SQLite set {}",
453                                journal_mode,
454                                result_mode
455                            );
456                        } else {
457                            log::info!("journal_mode successfully set to {}", result_mode);
458                        }
459                    }
460                }
461                unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
462            } else {
463                log::warn!("Failed to prepare journal_mode PRAGMA");
464            }
465        }
466
467        // Apply auto_vacuum (must be set before any tables are created)
468        if let Some(auto_vacuum) = config.auto_vacuum {
469            let vacuum_mode = if auto_vacuum { 1 } else { 0 }; // 0=none, 1=full, 2=incremental
470            log::debug!("Setting auto_vacuum to {}", vacuum_mode);
471            exec_sql(db, &format!("PRAGMA auto_vacuum = {}", vacuum_mode))?;
472        }
473
474        log::info!("Database configuration applied successfully");
475
476        // Initialize metrics for telemetry
477        #[cfg(feature = "telemetry")]
478        let metrics = crate::telemetry::Metrics::new().map_err(|e| {
479            DatabaseError::new(
480                "METRICS_ERROR",
481                &format!("Failed to initialize metrics: {}", e),
482            )
483        })?;
484
485        let database = Database {
486            connection_state,
487            name: normalized_name.clone(), // CRITICAL: Use normalized name WITH .db to match registry
488            on_data_change_callback: None,
489            allow_non_leader_writes: false,
490            optimistic_updates_manager: std::cell::RefCell::new(
491                crate::storage::optimistic_updates::OptimisticUpdatesManager::new(),
492            ),
493            coordination_metrics_manager: std::cell::RefCell::new(
494                crate::storage::coordination_metrics::CoordinationMetricsManager::new(),
495            ),
496            #[cfg(feature = "telemetry")]
497            metrics: Some(metrics),
498            #[cfg(feature = "telemetry")]
499            span_recorder: None,
500            #[cfg(feature = "telemetry")]
501            span_context: Some(crate::telemetry::SpanContext::new()),
502            max_export_size_bytes: config.max_export_size_bytes,
503        };
504
505        // CRITICAL: Release the SQLite open lock ONLY after Database is fully constructed
506        // This ensures WAL initialization and all setup completes before another instance can start
507        #[cfg(target_arch = "wasm32")]
508        {
509            DB_OPEN_IN_PROGRESS.with(|opens| {
510                opens.borrow_mut().remove(&config.name);
511            });
512            web_sys::console::log_1(
513                &format!("[DB] {} - RELEASED sqlite open lock", config.name).into(),
514            );
515        }
516
517        Ok(database)
518    }
519
520    /// Open a database with a specific VFS using connection pooling
521    pub async fn open_with_vfs(filename: &str, vfs_name: &str) -> Result<Self, DatabaseError> {
522        use std::ffi::CString;
523
524        log::info!("Opening database {} with VFS {}", filename, vfs_name);
525
526        // Normalize the database name WITH .db extension
527        let normalized_name = normalize_db_name(filename);
528        let pool_key = normalized_name.trim_end_matches(".db").to_string();
529
530        // Use connection pooling with custom VFS
531        let connection_state = crate::connection_pool::get_or_create_connection(&pool_key, || {
532            let mut db: *mut sqlite_wasm_rs::sqlite3 = std::ptr::null_mut();
533            let db_name = CString::new(normalized_name.clone())
534                .map_err(|_| "Invalid database name".to_string())?;
535            let vfs_cstr = CString::new(vfs_name).map_err(|_| "Invalid VFS name".to_string())?;
536
537            let ret = unsafe {
538                sqlite_wasm_rs::sqlite3_open_v2(
539                    db_name.as_ptr(),
540                    &mut db as *mut _,
541                    sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
542                    vfs_cstr.as_ptr(),
543                )
544            };
545
546            if ret != sqlite_wasm_rs::SQLITE_OK {
547                let err_msg = if !db.is_null() {
548                    unsafe {
549                        let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
550                        if !msg_ptr.is_null() {
551                            std::ffi::CStr::from_ptr(msg_ptr)
552                                .to_string_lossy()
553                                .into_owned()
554                        } else {
555                            "Unknown error".to_string()
556                        }
557                    }
558                } else {
559                    "Failed to open database".to_string()
560                };
561                return Err(format!("SQLITE_ERROR: {}", err_msg));
562            }
563
564            Ok(db)
565        })
566        .map_err(|e| DatabaseError::new("OPEN_ERROR", &e))?;
567
568        log::info!(
569            "Successfully opened database {} with VFS {}",
570            normalized_name,
571            vfs_name
572        );
573
574        // Initialize metrics for telemetry
575        #[cfg(feature = "telemetry")]
576        let metrics = crate::telemetry::Metrics::new().map_err(|e| {
577            DatabaseError::new(
578                "METRICS_ERROR",
579                &format!("Failed to initialize metrics: {}", e),
580            )
581        })?;
582
583        Ok(Database {
584            connection_state,
585            name: normalized_name, // CRITICAL: Store normalized name WITH .db
586            on_data_change_callback: None,
587            allow_non_leader_writes: false,
588            optimistic_updates_manager: std::cell::RefCell::new(
589                crate::storage::optimistic_updates::OptimisticUpdatesManager::new(),
590            ),
591            coordination_metrics_manager: std::cell::RefCell::new(
592                crate::storage::coordination_metrics::CoordinationMetricsManager::new(),
593            ),
594            #[cfg(feature = "telemetry")]
595            metrics: Some(metrics),
596            #[cfg(feature = "telemetry")]
597            span_recorder: None,
598            #[cfg(feature = "telemetry")]
599            span_context: Some(crate::telemetry::SpanContext::new()),
600            max_export_size_bytes: Some(2 * 1024 * 1024 * 1024), // Default 2GB limit
601        })
602    }
603
604    pub async fn execute_internal(&mut self, sql: &str) -> Result<QueryResult, DatabaseError> {
605        use std::ffi::{CStr, CString};
606        let start_time = js_sys::Date::now();
607
608        // Create span for query execution and enter context
609        #[cfg(feature = "telemetry")]
610        let span = if self.span_recorder.is_some() {
611            let query_type = sql
612                .trim()
613                .split_whitespace()
614                .next()
615                .unwrap_or("UNKNOWN")
616                .to_uppercase();
617            let mut builder = crate::telemetry::SpanBuilder::new("execute_query".to_string())
618                .with_attribute("query_type", query_type.clone())
619                .with_attribute("sql", sql.to_string());
620
621            // Attach baggage from context
622            if let Some(ref context) = self.span_context {
623                builder = builder.with_baggage_from_context(context);
624            }
625
626            let span = builder.build();
627
628            // Enter span context
629            if let Some(ref context) = self.span_context {
630                context.enter_span(span.span_id.clone());
631            }
632
633            Some(span)
634        } else {
635            None
636        };
637
638        // Track query execution metrics
639        #[cfg(feature = "telemetry")]
640        #[cfg(feature = "telemetry")]
641        if let Some(metrics) = &self.metrics {
642            metrics.queries_total().inc();
643        }
644
645        // Validate connection pointer before using it
646        if self.db().is_null() {
647            return Err(DatabaseError::new(
648                "NULL_CONNECTION",
649                "Database connection is null",
650            ));
651        }
652
653        let sql_cstr = CString::new(sql)
654            .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL string"))?;
655
656        if sql.trim().to_uppercase().starts_with("SELECT") {
657            let mut stmt = std::ptr::null_mut();
658            let ret = unsafe {
659                sqlite_wasm_rs::sqlite3_prepare_v2(
660                    self.db(),
661                    sql_cstr.as_ptr(),
662                    -1,
663                    &mut stmt,
664                    std::ptr::null_mut(),
665                )
666            };
667
668            if ret != sqlite_wasm_rs::SQLITE_OK {
669                // Get actual error message from SQLite
670                let err_msg = unsafe {
671                    let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
672                    if !msg_ptr.is_null() {
673                        CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
674                    } else {
675                        format!("Unknown error (code: {})", ret)
676                    }
677                };
678
679                // Track error
680                #[cfg(feature = "telemetry")]
681                #[cfg(feature = "telemetry")]
682                if let Some(metrics) = &self.metrics {
683                    metrics.errors_total().inc();
684                }
685
686                // Finish span with error
687                #[cfg(feature = "telemetry")]
688                if let Some(mut s) = span {
689                    s.status = crate::telemetry::SpanStatus::Error(format!(
690                        "Failed to prepare: {}",
691                        err_msg
692                    ));
693                    s.end_time_ms = Some(js_sys::Date::now());
694                    if let Some(recorder) = &self.span_recorder {
695                        recorder.record_span(s);
696                    }
697
698                    // Exit span context
699                    if let Some(ref context) = self.span_context {
700                        context.exit_span();
701                    }
702                }
703
704                return Err(DatabaseError::new(
705                    "SQLITE_ERROR",
706                    &format!("Failed to prepare statement: {}", err_msg),
707                )
708                .with_sql(sql));
709            }
710
711            let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
712            let mut columns = Vec::new();
713            let mut rows = Vec::new();
714
715            // Get column names
716            for i in 0..column_count {
717                let col_name = unsafe {
718                    let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
719                    if name_ptr.is_null() {
720                        format!("col_{}", i)
721                    } else {
722                        std::ffi::CStr::from_ptr(name_ptr)
723                            .to_string_lossy()
724                            .into_owned()
725                    }
726                };
727                columns.push(col_name);
728            }
729
730            // Execute and fetch rows
731            loop {
732                let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
733                if step_ret == sqlite_wasm_rs::SQLITE_ROW {
734                    let mut values = Vec::new();
735                    for i in 0..column_count {
736                        let value = unsafe {
737                            let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
738                            match col_type {
739                                sqlite_wasm_rs::SQLITE_NULL => ColumnValue::Null,
740                                sqlite_wasm_rs::SQLITE_INTEGER => {
741                                    let val = sqlite_wasm_rs::sqlite3_column_int64(stmt, i);
742                                    ColumnValue::Integer(val)
743                                }
744                                sqlite_wasm_rs::SQLITE_FLOAT => {
745                                    let val = sqlite_wasm_rs::sqlite3_column_double(stmt, i);
746                                    ColumnValue::Real(val)
747                                }
748                                sqlite_wasm_rs::SQLITE_TEXT => {
749                                    let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
750                                    if text_ptr.is_null() {
751                                        ColumnValue::Null
752                                    } else {
753                                        let text = std::ffi::CStr::from_ptr(text_ptr as *const i8)
754                                            .to_string_lossy()
755                                            .into_owned();
756                                        ColumnValue::Text(text)
757                                    }
758                                }
759                                sqlite_wasm_rs::SQLITE_BLOB => {
760                                    let blob_ptr = sqlite_wasm_rs::sqlite3_column_blob(stmt, i);
761                                    let blob_size = sqlite_wasm_rs::sqlite3_column_bytes(stmt, i);
762                                    if blob_ptr.is_null() || blob_size == 0 {
763                                        ColumnValue::Blob(vec![])
764                                    } else {
765                                        let blob_slice = std::slice::from_raw_parts(
766                                            blob_ptr as *const u8,
767                                            blob_size as usize,
768                                        );
769                                        ColumnValue::Blob(blob_slice.to_vec())
770                                    }
771                                }
772                                _ => ColumnValue::Null,
773                            }
774                        };
775                        values.push(value);
776                    }
777                    rows.push(Row { values });
778                } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
779                    break;
780                } else {
781                    // Get SQLite error message before finalizing
782                    let err_msg = unsafe {
783                        let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
784                        if !err_ptr.is_null() {
785                            std::ffi::CStr::from_ptr(err_ptr)
786                                .to_string_lossy()
787                                .to_string()
788                        } else {
789                            "Unknown SQLite error".to_string()
790                        }
791                    };
792                    unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
793                    // Track error
794                    #[cfg(feature = "telemetry")]
795                    if let Some(metrics) = &self.metrics {
796                        metrics.errors_total().inc();
797                    }
798                    return Err(DatabaseError::new(
799                        "SQLITE_ERROR",
800                        &format!("Error executing SELECT statement: {}", err_msg),
801                    )
802                    .with_sql(sql));
803                }
804            }
805
806            unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
807            let execution_time_ms = js_sys::Date::now() - start_time;
808
809            // Track query duration
810            #[cfg(feature = "telemetry")]
811            if let Some(metrics) = &self.metrics {
812                metrics.query_duration().observe(execution_time_ms);
813            }
814
815            Ok(QueryResult {
816                columns,
817                rows,
818                affected_rows: 0,
819                last_insert_id: None,
820                execution_time_ms,
821            })
822        } else {
823            // Non-SELECT statements - Use prepare/step to properly handle PRAGMA results
824            let mut stmt: *mut sqlite_wasm_rs::sqlite3_stmt = std::ptr::null_mut();
825            let ret = unsafe {
826                sqlite_wasm_rs::sqlite3_prepare_v2(
827                    self.db(),
828                    sql_cstr.as_ptr(),
829                    -1,
830                    &mut stmt,
831                    std::ptr::null_mut(),
832                )
833            };
834
835            if ret != sqlite_wasm_rs::SQLITE_OK {
836                // Get actual error message from SQLite
837                let err_msg = unsafe {
838                    let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
839                    if !msg_ptr.is_null() {
840                        CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
841                    } else {
842                        format!("Unknown error (code: {})", ret)
843                    }
844                };
845
846                // Track error
847                #[cfg(feature = "telemetry")]
848                if let Some(metrics) = &self.metrics {
849                    metrics.errors_total().inc();
850                }
851                return Err(DatabaseError::new(
852                    "SQLITE_ERROR",
853                    &format!("Failed to prepare statement: {}", err_msg),
854                )
855                .with_sql(sql));
856            }
857
858            // Get column info for PRAGMA statements that return results
859            let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
860            let mut columns = Vec::new();
861            let mut rows = Vec::new();
862
863            if column_count > 0 {
864                // This is a PRAGMA or other statement that returns rows
865                for i in 0..column_count {
866                    let col_name = unsafe {
867                        let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
868                        if name_ptr.is_null() {
869                            format!("column_{}", i)
870                        } else {
871                            std::ffi::CStr::from_ptr(name_ptr)
872                                .to_string_lossy()
873                                .into_owned()
874                        }
875                    };
876                    columns.push(col_name);
877                }
878
879                // Fetch all rows
880                loop {
881                    let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
882                    if step_ret == sqlite_wasm_rs::SQLITE_ROW {
883                        let mut values = Vec::new();
884                        for i in 0..column_count {
885                            let value = unsafe {
886                                let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
887                                match col_type {
888                                    sqlite_wasm_rs::SQLITE_TEXT => {
889                                        let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
890                                        if text_ptr.is_null() {
891                                            ColumnValue::Null
892                                        } else {
893                                            let text =
894                                                std::ffi::CStr::from_ptr(text_ptr as *const i8)
895                                                    .to_string_lossy()
896                                                    .into_owned();
897                                            ColumnValue::Text(text)
898                                        }
899                                    }
900                                    sqlite_wasm_rs::SQLITE_INTEGER => ColumnValue::Integer(
901                                        sqlite_wasm_rs::sqlite3_column_int64(stmt, i),
902                                    ),
903                                    _ => ColumnValue::Null,
904                                }
905                            };
906                            values.push(value);
907                        }
908                        rows.push(Row { values });
909                    } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
910                        break;
911                    } else {
912                        // Get SQLite error message before finalizing
913                        let err_msg = unsafe {
914                            let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
915                            if !err_ptr.is_null() {
916                                std::ffi::CStr::from_ptr(err_ptr)
917                                    .to_string_lossy()
918                                    .to_string()
919                            } else {
920                                "Unknown SQLite error".to_string()
921                            }
922                        };
923                        unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
924                        // Track error
925                        #[cfg(feature = "telemetry")]
926                        if let Some(metrics) = &self.metrics {
927                            metrics.errors_total().inc();
928                        }
929                        return Err(DatabaseError::new(
930                            "SQLITE_ERROR",
931                            &format!("Failed to execute statement: {}", err_msg),
932                        )
933                        .with_sql(sql));
934                    }
935                }
936            } else {
937                // Regular non-SELECT statement
938                let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
939                if step_ret != sqlite_wasm_rs::SQLITE_DONE {
940                    // Get SQLite error message before finalizing
941                    let err_msg = unsafe {
942                        let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
943                        if !err_ptr.is_null() {
944                            std::ffi::CStr::from_ptr(err_ptr)
945                                .to_string_lossy()
946                                .to_string()
947                        } else {
948                            "Unknown SQLite error".to_string()
949                        }
950                    };
951                    unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
952                    // Track error
953                    #[cfg(feature = "telemetry")]
954                    if let Some(metrics) = &self.metrics {
955                        metrics.errors_total().inc();
956                    }
957                    return Err(DatabaseError::new(
958                        "SQLITE_ERROR",
959                        &format!("Failed to execute statement: {}", err_msg),
960                    )
961                    .with_sql(sql));
962                }
963            }
964
965            // Finalize to complete the statement
966            unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
967
968            let affected_rows = unsafe { sqlite_wasm_rs::sqlite3_changes(self.db()) } as u32;
969            let last_insert_id = if sql.trim().to_uppercase().starts_with("INSERT") {
970                Some(unsafe { sqlite_wasm_rs::sqlite3_last_insert_rowid(self.db()) })
971            } else {
972                None
973            };
974
975            let execution_time_ms = js_sys::Date::now() - start_time;
976
977            // Track query duration
978            #[cfg(feature = "telemetry")]
979            if let Some(metrics) = &self.metrics {
980                metrics.query_duration().observe(execution_time_ms);
981            }
982
983            // Finish span successfully
984            #[cfg(feature = "telemetry")]
985            if let Some(mut s) = span {
986                s.status = crate::telemetry::SpanStatus::Ok;
987                s.end_time_ms = Some(js_sys::Date::now());
988                s.attributes
989                    .insert("duration_ms".to_string(), execution_time_ms.to_string());
990                s.attributes
991                    .insert("affected_rows".to_string(), affected_rows.to_string());
992                s.attributes
993                    .insert("row_count".to_string(), rows.len().to_string());
994                if let Some(recorder) = &self.span_recorder {
995                    recorder.record_span(s);
996                }
997
998                // Exit span context
999                if let Some(ref context) = self.span_context {
1000                    context.exit_span();
1001                }
1002            }
1003
1004            Ok(QueryResult {
1005                columns,
1006                rows,
1007                affected_rows,
1008                last_insert_id,
1009                execution_time_ms,
1010            })
1011        }
1012    }
1013
1014    pub async fn execute_with_params_internal(
1015        &mut self,
1016        sql: &str,
1017        params: &[ColumnValue],
1018    ) -> Result<QueryResult, DatabaseError> {
1019        use std::ffi::{CStr, CString};
1020        let start_time = js_sys::Date::now();
1021
1022        // Create span for query execution
1023        #[cfg(feature = "telemetry")]
1024        let span = if self.span_recorder.is_some() {
1025            let query_type = sql
1026                .trim()
1027                .split_whitespace()
1028                .next()
1029                .unwrap_or("UNKNOWN")
1030                .to_uppercase();
1031            let span = crate::telemetry::SpanBuilder::new("execute_query".to_string())
1032                .with_attribute("query_type", query_type.clone())
1033                .with_attribute("sql", sql.to_string())
1034                .build();
1035            Some(span)
1036        } else {
1037            None
1038        };
1039
1040        // Ensure metrics are propagated to BlockStorage before execution
1041        #[cfg(feature = "telemetry")]
1042        self.ensure_metrics_propagated();
1043
1044        // Track query execution metrics
1045        #[cfg(feature = "telemetry")]
1046        if let Some(metrics) = &self.metrics {
1047            metrics.queries_total().inc();
1048        }
1049
1050        let sql_cstr = CString::new(sql)
1051            .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL string"))?;
1052
1053        let mut stmt = std::ptr::null_mut();
1054        let ret = unsafe {
1055            sqlite_wasm_rs::sqlite3_prepare_v2(
1056                self.db(),
1057                sql_cstr.as_ptr(),
1058                -1,
1059                &mut stmt,
1060                std::ptr::null_mut(),
1061            )
1062        };
1063
1064        if ret != sqlite_wasm_rs::SQLITE_OK {
1065            // Get actual error message from SQLite
1066            let err_msg = unsafe {
1067                let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
1068                if !msg_ptr.is_null() {
1069                    CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
1070                } else {
1071                    format!("Unknown error (code: {})", ret)
1072                }
1073            };
1074
1075            // Track error
1076            #[cfg(feature = "telemetry")]
1077            if let Some(metrics) = &self.metrics {
1078                metrics.errors_total().inc();
1079            }
1080
1081            // Finish span with error
1082            #[cfg(feature = "telemetry")]
1083            if let Some(mut s) = span {
1084                s.status =
1085                    crate::telemetry::SpanStatus::Error(format!("Failed to prepare: {}", err_msg));
1086                s.end_time_ms = Some(js_sys::Date::now());
1087                if let Some(recorder) = &self.span_recorder {
1088                    recorder.record_span(s);
1089                }
1090            }
1091
1092            return Err(DatabaseError::new(
1093                "SQLITE_ERROR",
1094                &format!("Failed to prepare statement: {}", err_msg),
1095            )
1096            .with_sql(sql));
1097        }
1098
1099        // Bind parameters
1100        let mut text_cstrings = Vec::new(); // Keep CStrings alive
1101        for (i, param) in params.iter().enumerate() {
1102            let param_index = (i + 1) as i32;
1103            let bind_ret = unsafe {
1104                match param {
1105                    ColumnValue::Null => sqlite_wasm_rs::sqlite3_bind_null(stmt, param_index),
1106                    ColumnValue::Integer(val) => {
1107                        sqlite_wasm_rs::sqlite3_bind_int64(stmt, param_index, *val)
1108                    }
1109                    ColumnValue::Real(val) => {
1110                        sqlite_wasm_rs::sqlite3_bind_double(stmt, param_index, *val)
1111                    }
1112                    ColumnValue::Text(val) => {
1113                        // Sanitize string by removing null bytes (SQLite text shouldn't contain them)
1114                        let sanitized = val.replace('\0', "");
1115                        // Safe: after removing null bytes, CString::new cannot fail
1116                        let text_cstr = CString::new(sanitized.as_str())
1117                            .expect("CString::new should not fail after null byte removal");
1118                        let result = sqlite_wasm_rs::sqlite3_bind_text(
1119                            stmt,
1120                            param_index,
1121                            text_cstr.as_ptr(),
1122                            sanitized.len() as i32,
1123                            sqlite_wasm_rs::SQLITE_TRANSIENT(),
1124                        );
1125                        text_cstrings.push(text_cstr); // Keep alive
1126                        result
1127                    }
1128                    ColumnValue::Blob(val) => sqlite_wasm_rs::sqlite3_bind_blob(
1129                        stmt,
1130                        param_index,
1131                        val.as_ptr() as *const _,
1132                        val.len() as i32,
1133                        sqlite_wasm_rs::SQLITE_TRANSIENT(),
1134                    ),
1135                    _ => sqlite_wasm_rs::sqlite3_bind_null(stmt, param_index),
1136                }
1137            };
1138
1139            if bind_ret != sqlite_wasm_rs::SQLITE_OK {
1140                unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1141                // Track error
1142                #[cfg(feature = "telemetry")]
1143                if let Some(metrics) = &self.metrics {
1144                    metrics.errors_total().inc();
1145                }
1146                return Err(
1147                    DatabaseError::new("SQLITE_ERROR", "Failed to bind parameter").with_sql(sql),
1148                );
1149            }
1150        }
1151
1152        if sql.trim().to_uppercase().starts_with("SELECT") {
1153            let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
1154            let mut columns = Vec::new();
1155            let mut rows = Vec::new();
1156
1157            // Get column names
1158            for i in 0..column_count {
1159                let col_name = unsafe {
1160                    let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
1161                    if name_ptr.is_null() {
1162                        format!("col_{}", i)
1163                    } else {
1164                        std::ffi::CStr::from_ptr(name_ptr)
1165                            .to_string_lossy()
1166                            .into_owned()
1167                    }
1168                };
1169                columns.push(col_name);
1170            }
1171
1172            // Execute and fetch rows
1173            loop {
1174                let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
1175                if step_ret == sqlite_wasm_rs::SQLITE_ROW {
1176                    let mut values = Vec::new();
1177                    for i in 0..column_count {
1178                        let value = unsafe {
1179                            let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
1180                            match col_type {
1181                                sqlite_wasm_rs::SQLITE_NULL => ColumnValue::Null,
1182                                sqlite_wasm_rs::SQLITE_INTEGER => {
1183                                    let val = sqlite_wasm_rs::sqlite3_column_int64(stmt, i);
1184                                    ColumnValue::Integer(val)
1185                                }
1186                                sqlite_wasm_rs::SQLITE_FLOAT => {
1187                                    let val = sqlite_wasm_rs::sqlite3_column_double(stmt, i);
1188                                    ColumnValue::Real(val)
1189                                }
1190                                sqlite_wasm_rs::SQLITE_TEXT => {
1191                                    let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
1192                                    if text_ptr.is_null() {
1193                                        ColumnValue::Null
1194                                    } else {
1195                                        let text = std::ffi::CStr::from_ptr(text_ptr as *const i8)
1196                                            .to_string_lossy()
1197                                            .into_owned();
1198                                        ColumnValue::Text(text)
1199                                    }
1200                                }
1201                                sqlite_wasm_rs::SQLITE_BLOB => {
1202                                    let blob_ptr = sqlite_wasm_rs::sqlite3_column_blob(stmt, i);
1203                                    let blob_size = sqlite_wasm_rs::sqlite3_column_bytes(stmt, i);
1204                                    if blob_ptr.is_null() || blob_size == 0 {
1205                                        ColumnValue::Blob(vec![])
1206                                    } else {
1207                                        let blob_slice = std::slice::from_raw_parts(
1208                                            blob_ptr as *const u8,
1209                                            blob_size as usize,
1210                                        );
1211                                        ColumnValue::Blob(blob_slice.to_vec())
1212                                    }
1213                                }
1214                                _ => ColumnValue::Null,
1215                            }
1216                        };
1217                        values.push(value);
1218                    }
1219                    rows.push(Row { values });
1220                } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
1221                    break;
1222                } else {
1223                    // Get SQLite error message before finalizing
1224                    let err_msg = unsafe {
1225                        let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
1226                        if !err_ptr.is_null() {
1227                            std::ffi::CStr::from_ptr(err_ptr)
1228                                .to_string_lossy()
1229                                .to_string()
1230                        } else {
1231                            "Unknown SQLite error".to_string()
1232                        }
1233                    };
1234                    unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1235                    // Track error
1236                    #[cfg(feature = "telemetry")]
1237                    if let Some(metrics) = &self.metrics {
1238                        metrics.errors_total().inc();
1239                    }
1240                    return Err(DatabaseError::new(
1241                        "SQLITE_ERROR",
1242                        &format!("Error executing SELECT statement: {}", err_msg),
1243                    )
1244                    .with_sql(sql));
1245                }
1246            }
1247
1248            unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1249
1250            let execution_time_ms = js_sys::Date::now() - start_time;
1251
1252            // Track query duration
1253            #[cfg(feature = "telemetry")]
1254            if let Some(metrics) = &self.metrics {
1255                metrics.query_duration().observe(execution_time_ms);
1256            }
1257
1258            // Finish span successfully for SELECT query
1259            #[cfg(feature = "telemetry")]
1260            if let Some(mut s) = span {
1261                s.status = crate::telemetry::SpanStatus::Ok;
1262                s.end_time_ms = Some(js_sys::Date::now());
1263                s.attributes
1264                    .insert("duration_ms".to_string(), execution_time_ms.to_string());
1265                s.attributes
1266                    .insert("row_count".to_string(), rows.len().to_string());
1267                if let Some(recorder) = &self.span_recorder {
1268                    recorder.record_span(s);
1269                }
1270            }
1271
1272            Ok(QueryResult {
1273                columns,
1274                rows,
1275                affected_rows: 0,
1276                last_insert_id: None,
1277                execution_time_ms,
1278            })
1279        } else {
1280            // Non-SELECT statements
1281            let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
1282            // Track error
1283            #[cfg(feature = "telemetry")]
1284            if let Some(metrics) = &self.metrics {
1285                metrics.errors_total().inc();
1286            }
1287            unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1288
1289            if step_ret != sqlite_wasm_rs::SQLITE_DONE {
1290                let err_msg = unsafe {
1291                    let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
1292                    if !err_ptr.is_null() {
1293                        std::ffi::CStr::from_ptr(err_ptr)
1294                            .to_string_lossy()
1295                            .to_string()
1296                    } else {
1297                        "Unknown SQLite error".to_string()
1298                    }
1299                };
1300                return Err(DatabaseError::new(
1301                    "SQLITE_ERROR",
1302                    &format!("Failed to execute statement: {}", err_msg),
1303                )
1304                .with_sql(sql));
1305            }
1306
1307            let execution_time_ms = js_sys::Date::now() - start_time;
1308
1309            // Track query duration
1310            #[cfg(feature = "telemetry")]
1311            if let Some(metrics) = &self.metrics {
1312                metrics.query_duration().observe(execution_time_ms);
1313            }
1314            let affected_rows = unsafe { sqlite_wasm_rs::sqlite3_changes(self.db()) } as u32;
1315            let last_insert_id = if sql.trim().to_uppercase().starts_with("INSERT") {
1316                Some(unsafe { sqlite_wasm_rs::sqlite3_last_insert_rowid(self.db()) })
1317            } else {
1318                None
1319            };
1320
1321            // Finish span successfully
1322            #[cfg(feature = "telemetry")]
1323            if let Some(mut s) = span {
1324                s.status = crate::telemetry::SpanStatus::Ok;
1325                s.end_time_ms = Some(js_sys::Date::now());
1326                s.attributes
1327                    .insert("duration_ms".to_string(), execution_time_ms.to_string());
1328                s.attributes
1329                    .insert("affected_rows".to_string(), affected_rows.to_string());
1330                if let Some(recorder) = &self.span_recorder {
1331                    recorder.record_span(s);
1332                }
1333            }
1334
1335            Ok(QueryResult {
1336                columns: vec![],
1337                rows: vec![],
1338                affected_rows,
1339                last_insert_id,
1340                execution_time_ms,
1341            })
1342        }
1343    }
1344
1345    /// Set telemetry metrics for this database instance
1346    #[cfg(feature = "telemetry")]
1347    pub fn set_metrics(&mut self, metrics: Option<crate::telemetry::Metrics>) {
1348        self.metrics = metrics.clone();
1349        self.ensure_metrics_propagated();
1350    }
1351
1352    /// Set span recorder for distributed tracing
1353    #[cfg(feature = "telemetry")]
1354    pub fn set_span_recorder(&mut self, recorder: Option<crate::telemetry::SpanRecorder>) {
1355        self.span_recorder = recorder;
1356    }
1357
1358    /// Get span context for distributed tracing
1359    #[cfg(feature = "telemetry")]
1360    pub fn get_span_context(&self) -> Option<&crate::telemetry::SpanContext> {
1361        self.span_context.as_ref()
1362    }
1363
1364    /// Get span recorder for distributed tracing
1365    #[cfg(feature = "telemetry")]
1366    pub fn get_span_recorder(&self) -> Option<&crate::telemetry::SpanRecorder> {
1367        self.span_recorder.as_ref()
1368    }
1369    /// Ensure metrics are propagated to BlockStorage
1370    #[cfg(feature = "telemetry")]
1371    fn ensure_metrics_propagated(&self) {
1372        // Propagate metrics to BlockStorage via STORAGE_REGISTRY
1373        #[cfg(target_arch = "wasm32")]
1374        {
1375            use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
1376
1377            if self.metrics.is_none() {
1378                return;
1379            }
1380
1381            let db_name = &self.name;
1382
1383            if let Some(storage_rc) = get_storage_with_fallback(db_name) {
1384                use crate::vfs::indexeddb_vfs::with_storage_borrow_mut;
1385                let _ = with_storage_borrow_mut(&storage_rc, "set_metrics", |storage| {
1386                    storage.set_metrics(self.metrics.clone());
1387                    Ok(())
1388                });
1389            }
1390        }
1391    }
1392
1393    pub async fn close_internal(&mut self) -> Result<(), DatabaseError> {
1394        log::info!("CLOSE_INTERNAL STARTED for: {}", self.name);
1395
1396        // Check if connection is already null (e.g., after import force-close)
1397        if self.connection_state.db.get().is_null() {
1398            log::info!(
1399                "Connection already null for {}, skipping close operations",
1400                self.name
1401            );
1402            return Ok(());
1403        }
1404
1405        // Checkpoint WAL data before close using PASSIVE mode (non-blocking)
1406        log::info!("Checkpointing WAL before close: {}", self.name);
1407        let _ = self
1408            .execute_internal("PRAGMA wal_checkpoint(PASSIVE)")
1409            .await;
1410        log::info!("WAL checkpoint completed for: {}", self.name);
1411
1412        // Sync to IndexedDB before closing to ensure data persists
1413        log::info!("Syncing database before close: {}", self.name);
1414        self.sync_internal().await?;
1415        log::info!("Sync completed for: {}", self.name);
1416
1417        web_sys::console::log_1(
1418            &format!("CLOSE: About to stop leader election for {}", self.name).into(),
1419        );
1420
1421        // Stop leader election before closing
1422        #[cfg(target_arch = "wasm32")]
1423        {
1424            use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
1425
1426            let db_name = &self.name;
1427            web_sys::console::log_1(
1428                &format!("STOP_ELECTION: Getting storage for {}", db_name).into(),
1429            );
1430            log::info!("STOP_ELECTION: Getting storage for {}", db_name);
1431            let storage_rc = get_storage_with_fallback(db_name);
1432
1433            if let Some(storage_rc) = storage_rc {
1434                log::info!("STOP_ELECTION: Found storage for {}, calling stop", db_name);
1435                match with_storage_async!(storage_rc, "stop_leader_election", |storage| storage
1436                    .stop_leader_election())
1437                {
1438                    Some(Ok(())) => {
1439                        log::info!("STOP_ELECTION: Successfully stopped for {}", db_name);
1440                    }
1441                    Some(Err(e)) => {
1442                        log::warn!("STOP_ELECTION: Failed for {}: {:?}", db_name, e);
1443                    }
1444                    None => {
1445                        log::warn!("STOP_ELECTION: Borrow failed for {}", db_name);
1446                    }
1447                }
1448            } else {
1449                log::warn!("STOP_ELECTION: No storage found for {}", db_name);
1450            }
1451            log::info!("STOP_ELECTION: Completed section for {}", db_name);
1452        }
1453
1454        // NOTE: Do NOT call release_connection here
1455        // The Drop impl will handle releasing the connection when the Database instance is dropped
1456        // Calling it here would cause a double-release when both close() and Drop are called
1457
1458        log::info!(
1459            "Closed database: {} (connection will be released on Drop)",
1460            self.name
1461        );
1462        Ok(())
1463    }
1464
1465    /// Query database and return rows (alias for execute that returns rows)
1466    pub async fn query(&mut self, sql: &str) -> Result<Vec<Row>, DatabaseError> {
1467        let result = self.execute_internal(sql).await?;
1468        Ok(result.rows)
1469    }
1470
1471    pub async fn sync_internal(&mut self) -> Result<(), DatabaseError> {
1472        // Start timing for telemetry
1473        #[cfg(all(target_arch = "wasm32", feature = "telemetry"))]
1474        let start_time = js_sys::Date::now();
1475
1476        // Create span for VFS sync operation with automatic context linking
1477        #[cfg(feature = "telemetry")]
1478        let span = if self.span_recorder.is_some() {
1479            let mut builder = crate::telemetry::SpanBuilder::new("vfs_sync".to_string())
1480                .with_attribute("operation", "sync");
1481
1482            // Automatically link to parent span via context and copy baggage
1483            if let Some(ref context) = self.span_context {
1484                builder = builder
1485                    .with_context(context)
1486                    .with_baggage_from_context(context);
1487            }
1488
1489            let span = builder.build();
1490
1491            // Enter this span's context
1492            if let Some(ref context) = self.span_context {
1493                context.enter_span(span.span_id.clone());
1494            }
1495
1496            Some(span)
1497        } else {
1498            None
1499        };
1500
1501        // Track sync operation start
1502        #[cfg(feature = "telemetry")]
1503        if let Some(ref metrics) = self.metrics {
1504            metrics.sync_operations_total().inc();
1505        }
1506
1507        // Track blocks persisted for span attributes
1508        #[cfg(feature = "telemetry")]
1509        let mut blocks_count = 0;
1510
1511        // Trigger VFS sync to persist all blocks to IndexedDB
1512        #[cfg(target_arch = "wasm32")]
1513        {
1514            use crate::storage::vfs_sync;
1515
1516            // Collect blocks from GLOBAL_STORAGE (where VFS writes them)
1517            // CRITICAL: Use self.name WITH .db - matches main branch behavior
1518            // Database.name already normalized by normalize_db_name() in Database::new()
1519            let storage_name = &self.name;
1520
1521            // Advance commit marker
1522            let next_commit = vfs_sync::with_global_commit_marker(|cm| {
1523                #[cfg(target_arch = "wasm32")]
1524                let mut cm_ref = cm.borrow_mut();
1525                #[cfg(not(target_arch = "wasm32"))]
1526                let mut cm_ref = cm.lock();
1527
1528                let current = cm_ref.get(storage_name).copied().unwrap_or(0);
1529                let new_marker = current + 1;
1530                cm_ref.insert(storage_name.to_string(), new_marker);
1531                log::debug!(
1532                    "Advanced commit marker for {} from {} to {}",
1533                    storage_name,
1534                    current,
1535                    new_marker
1536                );
1537                new_marker
1538            });
1539
1540            web_sys::console::log_1(
1541                &format!(
1542                    "[SYNC] Collecting blocks from GLOBAL_STORAGE for: {}",
1543                    storage_name
1544                )
1545                .into(),
1546            );
1547            let (blocks_to_persist, metadata_to_persist) =
1548                vfs_sync::with_global_storage(|storage| {
1549                    #[cfg(target_arch = "wasm32")]
1550                    let storage_map = storage.borrow();
1551                    #[cfg(not(target_arch = "wasm32"))]
1552                    let storage_map = storage.lock();
1553
1554                    let blocks = if let Some(db_storage) = storage_map.get(storage_name) {
1555                        let count = db_storage.len();
1556                        web_sys::console::log_1(
1557                            &format!("[SYNC] Found {} blocks in GLOBAL_STORAGE", count).into(),
1558                        );
1559                        db_storage
1560                            .iter()
1561                            .map(|(&id, data)| (id, data.clone()))
1562                            .collect::<Vec<_>>()
1563                    } else {
1564                        web_sys::console::log_1(
1565                            &format!(
1566                                "[SYNC] No blocks found in GLOBAL_STORAGE for {}",
1567                                storage_name
1568                            )
1569                            .into(),
1570                        );
1571                        Vec::new()
1572                    };
1573
1574                    let metadata = vfs_sync::with_global_metadata(|meta| {
1575                        #[cfg(target_arch = "wasm32")]
1576                        let meta_map = meta.borrow();
1577                        #[cfg(not(target_arch = "wasm32"))]
1578                        let meta_map = meta.lock();
1579                        if let Some(db_meta) = meta_map.get(storage_name) {
1580                            let count = db_meta.len();
1581                            web_sys::console::log_1(
1582                                &format!("[SYNC] Found {} metadata entries", count).into(),
1583                            );
1584                            db_meta
1585                                .iter()
1586                                .map(|(&id, metadata)| (id, metadata.checksum))
1587                                .collect::<Vec<_>>()
1588                        } else {
1589                            web_sys::console::log_1(&format!("[SYNC] No metadata found").into());
1590                            Vec::new()
1591                        }
1592                    });
1593
1594                    (blocks, metadata)
1595                });
1596
1597            web_sys::console::log_1(
1598                &format!(
1599                    "[SYNC] Preparing to persist {} blocks to IndexedDB",
1600                    blocks_to_persist.len()
1601                )
1602                .into(),
1603            );
1604
1605            if !blocks_to_persist.is_empty() {
1606                #[cfg(feature = "telemetry")]
1607                {
1608                    blocks_count = blocks_to_persist.len();
1609                }
1610                web_sys::console::log_1(
1611                    &format!(
1612                        "[SYNC] Persisting {} blocks to IndexedDB",
1613                        blocks_to_persist.len()
1614                    )
1615                    .into(),
1616                );
1617                crate::storage::wasm_indexeddb::persist_to_indexeddb_event_based(
1618                    storage_name,
1619                    blocks_to_persist,
1620                    metadata_to_persist,
1621                    next_commit,
1622                    #[cfg(feature = "telemetry")]
1623                    self.span_recorder.clone(),
1624                    #[cfg(feature = "telemetry")]
1625                    span.as_ref().map(|s| s.span_id.clone()),
1626                )
1627                .await?;
1628                web_sys::console::log_1(
1629                    &format!("[SYNC] Successfully persisted to IndexedDB").into(),
1630                );
1631            } else {
1632                web_sys::console::log_1(
1633                    &format!("[SYNC] WARNING: No blocks to persist - GLOBAL_STORAGE is empty!")
1634                        .into(),
1635                );
1636            }
1637
1638            // Send notification after successful sync
1639            use crate::storage::broadcast_notifications::{
1640                BroadcastNotification, send_change_notification,
1641            };
1642
1643            let notification = BroadcastNotification::DataChanged {
1644                db_name: self.name.clone(),
1645                timestamp: js_sys::Date::now() as u64,
1646            };
1647
1648            log::debug!("Sending DataChanged notification for {}", self.name);
1649
1650            if let Err(e) = send_change_notification(&notification) {
1651                log::warn!("Failed to send change notification: {}", e);
1652                // Don't fail the sync if notification fails
1653            }
1654        }
1655
1656        // Record sync duration
1657        #[cfg(all(target_arch = "wasm32", feature = "telemetry"))]
1658        if let Some(ref metrics) = self.metrics {
1659            let duration_ms = js_sys::Date::now() - start_time;
1660            metrics.sync_duration().observe(duration_ms);
1661        }
1662
1663        // Finish span successfully
1664        #[cfg(feature = "telemetry")]
1665        if let Some(mut s) = span {
1666            s.status = crate::telemetry::SpanStatus::Ok;
1667            #[cfg(target_arch = "wasm32")]
1668            {
1669                s.end_time_ms = Some(js_sys::Date::now());
1670                let duration_ms = s.end_time_ms.unwrap() - s.start_time_ms;
1671                s.attributes
1672                    .insert("duration_ms".to_string(), duration_ms.to_string());
1673            }
1674            #[cfg(not(target_arch = "wasm32"))]
1675            {
1676                let now = std::time::SystemTime::now()
1677                    .duration_since(std::time::UNIX_EPOCH)
1678                    .unwrap()
1679                    .as_millis() as f64;
1680                s.end_time_ms = Some(now);
1681                let duration_ms = s.end_time_ms.unwrap() - s.start_time_ms;
1682                s.attributes
1683                    .insert("duration_ms".to_string(), duration_ms.to_string());
1684            }
1685            s.attributes
1686                .insert("blocks_persisted".to_string(), blocks_count.to_string());
1687            if let Some(recorder) = &self.span_recorder {
1688                recorder.record_span(s);
1689            }
1690
1691            // Exit span context
1692            if let Some(ref context) = self.span_context {
1693                context.exit_span();
1694            }
1695        }
1696
1697        Ok(())
1698    }
1699}
1700
1701#[cfg(target_arch = "wasm32")]
1702impl Drop for Database {
1703    fn drop(&mut self) {
1704        web_sys::console::log_1(&format!("DROP: Releasing connection for {}", self.name).into());
1705
1706        // Release the connection back to the pool
1707        // The pool will close it if this was the last reference
1708        // Pool uses name without .db, so strip it
1709        let pool_key = self.name.trim_end_matches(".db");
1710        crate::connection_pool::release_connection(pool_key);
1711
1712        web_sys::console::log_1(&format!("DROP: Connection released for {}", self.name).into());
1713
1714        // CRITICAL: Stop heartbeat interval synchronously to prevent leaks
1715        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
1716        if let Some(storage_rc) = get_storage_with_fallback(&self.name) {
1717            // No outer borrow needed - BlockStorage uses RefCell for interior mutability
1718            let storage = &*storage_rc;
1719            // Try to borrow manager - if it fails, skip (already being cleaned)
1720            if let Ok(mut manager_ref) = storage.leader_election.try_borrow_mut() {
1721                if let Some(ref mut manager) = *manager_ref {
1722                    // Clear interval if it exists (idempotent)
1723                    if let Some(interval_id) = manager.heartbeat_interval.take() {
1724                        if let Some(window) = web_sys::window() {
1725                            window.clear_interval_with_handle(interval_id);
1726                            web_sys::console::log_1(
1727                                &format!(
1728                                    "DROP: Cleared heartbeat interval {} for {}",
1729                                    interval_id, self.name
1730                                )
1731                                .into(),
1732                            );
1733                        }
1734                    }
1735                    // Note: heartbeat closure is intentionally leaked (via forget())
1736                    // and becomes a no-op when heartbeat_valid is set to false
1737                }
1738            } else {
1739                // Manager already borrowed - skip (first DB is cleaning up)
1740                web_sys::console::log_1(
1741                    &format!(
1742                        "[DROP] Skipping {} (heartbeat already stopped by shared DB)",
1743                        self.name
1744                    )
1745                    .into(),
1746                );
1747            }
1748        }
1749
1750        // Keep BlockStorage in STORAGE_REGISTRY so multiple Database instances
1751        // with the same name share the same BlockStorage and leader election state
1752        // Blocks persist in GLOBAL_STORAGE across Database instances
1753        log::debug!(
1754            "Closed database: {} (BlockStorage remains in registry)",
1755            self.name
1756        );
1757    }
1758}
1759
1760// Add wasm_bindgen exports for the main Database struct
1761#[cfg(target_arch = "wasm32")]
1762#[wasm_bindgen]
1763impl Database {
1764    #[wasm_bindgen(js_name = "newDatabase")]
1765    pub async fn new_wasm(name: String) -> Result<Database, JsValue> {
1766        // Normalize database name: ensure it has .db suffix
1767        let normalized_name = if name.ends_with(".db") {
1768            name.clone()
1769        } else {
1770            format!("{}.db", name)
1771        };
1772
1773        let config = DatabaseConfig {
1774            name: normalized_name.clone(),
1775            version: Some(1),
1776            cache_size: Some(10_000),
1777            page_size: Some(4096),
1778            auto_vacuum: Some(true),
1779            journal_mode: Some("WAL".to_string()),
1780            max_export_size_bytes: Some(2 * 1024 * 1024 * 1024), // 2GB default
1781        };
1782
1783        let db = Database::new(config)
1784            .await
1785            .map_err(|e| JsValue::from_str(&format!("Failed to create database: {}", e)))?;
1786
1787        // Start listening for write queue requests (leader will process them)
1788        Self::start_write_queue_listener(&normalized_name)?;
1789
1790        Ok(db)
1791    }
1792
1793    /// Get the database name
1794    #[wasm_bindgen(getter)]
1795    pub fn name(&self) -> String {
1796        self.name.clone()
1797    }
1798
1799    /// Get all database names stored in IndexedDB
1800    ///
1801    /// Returns an array of database names (sorted alphabetically)
1802    #[wasm_bindgen(js_name = "getAllDatabases")]
1803    pub async fn get_all_databases() -> Result<JsValue, JsValue> {
1804        use crate::storage::vfs_sync::with_global_storage;
1805        use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1806        use std::collections::HashSet;
1807
1808        log::info!("getAllDatabases called");
1809        let mut db_names = HashSet::new();
1810
1811        // Get databases from persistent list (localStorage)
1812        match Self::get_persistent_database_list() {
1813            Ok(persistent_list) => {
1814                log::info!("Persistent list has {} entries", persistent_list.len());
1815                for name in persistent_list {
1816                    log::info!("Found in persistent list: {}", name);
1817                    db_names.insert(name);
1818                }
1819            }
1820            Err(e) => {
1821                log::warn!("Failed to get persistent list: {:?}", e);
1822            }
1823        }
1824
1825        // Get databases from STORAGE_REGISTRY (currently open)
1826        // SAFETY: WASM is single-threaded, no concurrent access possible
1827        STORAGE_REGISTRY.with(|reg| unsafe {
1828            let registry = &*reg.get();
1829            log::info!("STORAGE_REGISTRY has {} entries", registry.len());
1830            for key in registry.keys() {
1831                log::info!("Found in STORAGE_REGISTRY: {}", key);
1832                db_names.insert(key.clone());
1833            }
1834        });
1835
1836        // Get databases from GLOBAL_STORAGE (in-memory persistent storage)
1837        with_global_storage(|storage| {
1838            let storage_borrow = storage.borrow();
1839            log::info!("GLOBAL_STORAGE has {} entries", storage_borrow.len());
1840            for key in storage_borrow.keys() {
1841                log::info!("Found in GLOBAL_STORAGE: {}", key);
1842                db_names.insert(key.clone());
1843            }
1844        });
1845
1846        log::info!("Total unique databases found: {}", db_names.len());
1847
1848        // Convert to sorted vector
1849        let mut result_vec: Vec<String> = db_names.into_iter().collect();
1850        result_vec.sort();
1851
1852        // Convert to JavaScript array
1853        let js_array = js_sys::Array::new();
1854        for name in &result_vec {
1855            log::info!("Returning database: {}", name);
1856            js_array.push(&JsValue::from_str(name));
1857        }
1858
1859        log::info!("getAllDatabases returning {} databases", result_vec.len());
1860
1861        Ok(js_array.into())
1862    }
1863
1864    /// Delete a database from storage
1865    ///
1866    /// Removes database from both STORAGE_REGISTRY and GLOBAL_STORAGE
1867    #[wasm_bindgen(js_name = "deleteDatabase")]
1868    pub async fn delete_database(name: String) -> Result<(), JsValue> {
1869        use crate::storage::vfs_sync::{
1870            with_global_commit_marker, with_global_metadata, with_global_storage,
1871        };
1872
1873        // Normalize database name
1874        let normalized_name = if name.ends_with(".db") {
1875            name.clone()
1876        } else {
1877            format!("{}.db", name)
1878        };
1879
1880        log::info!("Deleting database: {}", normalized_name);
1881
1882        // Remove from STORAGE_REGISTRY
1883        use crate::vfs::indexeddb_vfs::remove_storage_from_registry;
1884        remove_storage_from_registry(&normalized_name);
1885
1886        // Remove from GLOBAL_STORAGE
1887        with_global_storage(|gs| {
1888            #[cfg(target_arch = "wasm32")]
1889            let mut storage = gs.borrow_mut();
1890            #[cfg(not(target_arch = "wasm32"))]
1891            let mut storage = gs.lock();
1892            storage.remove(&normalized_name);
1893        });
1894
1895        // Remove from GLOBAL_METADATA
1896        with_global_metadata(|gm| {
1897            #[cfg(target_arch = "wasm32")]
1898            let mut metadata = gm.borrow_mut();
1899            #[cfg(not(target_arch = "wasm32"))]
1900            let mut metadata = gm.lock();
1901            metadata.remove(&normalized_name);
1902        });
1903
1904        // Remove from commit markers
1905        with_global_commit_marker(|cm| {
1906            #[cfg(target_arch = "wasm32")]
1907            let mut markers = cm.borrow_mut();
1908            #[cfg(not(target_arch = "wasm32"))]
1909            let mut markers = cm.lock();
1910            log::info!(
1911                "Cleared commit marker for {} from GLOBAL storage",
1912                normalized_name
1913            );
1914            markers.remove(&normalized_name);
1915        });
1916
1917        // Delete from IndexedDB
1918        let idb_name = format!("absurder_{}", normalized_name);
1919        let _delete_promise = js_sys::eval(&format!("indexedDB.deleteDatabase('{}')", idb_name))
1920            .map_err(|e| JsValue::from_str(&format!("Failed to delete IndexedDB: {:?}", e)))?;
1921
1922        log::info!("Database deleted: {}", normalized_name);
1923
1924        // Remove from persistent list
1925        Self::remove_database_from_persistent_list(&normalized_name)?;
1926
1927        Ok(())
1928    }
1929
1930    /// Add database name to persistent list in localStorage
1931    #[allow(dead_code)]
1932    fn add_database_to_persistent_list(db_name: &str) -> Result<(), JsValue> {
1933        log::info!("add_database_to_persistent_list called for: {}", db_name);
1934
1935        let window = web_sys::window().ok_or_else(|| {
1936            log::error!("No window object");
1937            JsValue::from_str("No window")
1938        })?;
1939
1940        let storage = window
1941            .local_storage()
1942            .map_err(|e| {
1943                log::error!("Failed to get localStorage: {:?}", e);
1944                JsValue::from_str("No localStorage")
1945            })?
1946            .ok_or_else(|| {
1947                log::error!("localStorage not available");
1948                JsValue::from_str("localStorage not available")
1949            })?;
1950
1951        let key = "absurder_db_list";
1952        let existing = storage.get_item(key).map_err(|e| {
1953            log::error!("Failed to read localStorage key {}: {:?}", key, e);
1954            JsValue::from_str("Failed to read localStorage")
1955        })?;
1956
1957        log::debug!("Existing localStorage value: {:?}", existing);
1958
1959        let mut db_list: Vec<String> = if let Some(json_str) = existing {
1960            match serde_json::from_str(&json_str) {
1961                Ok(list) => {
1962                    log::debug!("Parsed existing list: {:?}", list);
1963                    list
1964                }
1965                Err(e) => {
1966                    log::warn!("Failed to parse localStorage JSON: {}, starting fresh", e);
1967                    Vec::new()
1968                }
1969            }
1970        } else {
1971            log::debug!("No existing list, creating new");
1972            Vec::new()
1973        };
1974
1975        if !db_list.contains(&db_name.to_string()) {
1976            db_list.push(db_name.to_string());
1977            db_list.sort();
1978            log::debug!("Updated list: {:?}", db_list);
1979
1980            let json_str = serde_json::to_string(&db_list).map_err(|e| {
1981                log::error!("Failed to serialize list: {}", e);
1982                JsValue::from_str("Failed to serialize")
1983            })?;
1984
1985            log::debug!("Writing to localStorage: {}", json_str);
1986
1987            storage.set_item(key, &json_str).map_err(|e| {
1988                log::error!("Failed to write to localStorage: {:?}", e);
1989                JsValue::from_str("Failed to write localStorage")
1990            })?;
1991
1992            log::info!("Successfully added {} to persistent database list", db_name);
1993        } else {
1994            log::info!("{} already in persistent list", db_name);
1995        }
1996
1997        Ok(())
1998    }
1999
2000    /// Remove database name from persistent list in localStorage
2001    fn remove_database_from_persistent_list(db_name: &str) -> Result<(), JsValue> {
2002        let window = web_sys::window().ok_or_else(|| JsValue::from_str("No window"))?;
2003        let storage = window
2004            .local_storage()
2005            .map_err(|_| JsValue::from_str("No localStorage"))?
2006            .ok_or_else(|| JsValue::from_str("localStorage not available"))?;
2007
2008        let key = "absurder_db_list";
2009        let existing = storage
2010            .get_item(key)
2011            .map_err(|_| JsValue::from_str("Failed to read localStorage"))?;
2012
2013        if let Some(json_str) = existing {
2014            let mut db_list: Vec<String> =
2015                serde_json::from_str(&json_str).unwrap_or_else(|_| Vec::new());
2016            db_list.retain(|name| name != db_name);
2017            let json_str = serde_json::to_string(&db_list)
2018                .map_err(|_| JsValue::from_str("Failed to serialize"))?;
2019            storage
2020                .set_item(key, &json_str)
2021                .map_err(|_| JsValue::from_str("Failed to write localStorage"))?;
2022            log::info!("Removed {} from persistent database list", db_name);
2023        }
2024
2025        Ok(())
2026    }
2027
2028    /// Get database names from persistent list in localStorage
2029    fn get_persistent_database_list() -> Result<Vec<String>, JsValue> {
2030        log::info!("get_persistent_database_list called");
2031
2032        let window = web_sys::window().ok_or_else(|| {
2033            log::error!("No window object");
2034            JsValue::from_str("No window")
2035        })?;
2036
2037        let storage = window
2038            .local_storage()
2039            .map_err(|e| {
2040                log::error!("Failed to get localStorage: {:?}", e);
2041                JsValue::from_str("No localStorage")
2042            })?
2043            .ok_or_else(|| {
2044                log::error!("localStorage not available");
2045                JsValue::from_str("localStorage not available")
2046            })?;
2047
2048        let key = "absurder_db_list";
2049        let existing = storage.get_item(key).map_err(|e| {
2050            log::error!("Failed to read localStorage key {}: {:?}", key, e);
2051            JsValue::from_str("Failed to read localStorage")
2052        })?;
2053
2054        log::debug!("Read from localStorage: {:?}", existing);
2055
2056        if let Some(json_str) = existing {
2057            match serde_json::from_str::<Vec<String>>(&json_str) {
2058                Ok(db_list) => {
2059                    log::info!(
2060                        "Successfully parsed {} databases from localStorage",
2061                        db_list.len()
2062                    );
2063                    log::debug!("Database list: {:?}", db_list);
2064                    Ok(db_list)
2065                }
2066                Err(e) => {
2067                    log::error!("Failed to parse localStorage JSON: {}", e);
2068                    Ok(Vec::new())
2069                }
2070            }
2071        } else {
2072            log::info!("No persistent database list in localStorage");
2073            Ok(Vec::new())
2074        }
2075    }
2076
2077    /// Start listening for write queue requests (leader processes these)
2078    fn start_write_queue_listener(db_name: &str) -> Result<(), JsValue> {
2079        use crate::storage::write_queue::{
2080            WriteQueueMessage, WriteResponse, register_write_queue_listener,
2081        };
2082        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2083
2084        let db_name_clone = db_name.to_string();
2085
2086        let callback = Closure::wrap(Box::new(move |msg: JsValue| {
2087            let db_name_inner = db_name_clone.clone();
2088
2089            // Parse the message
2090            if let Ok(json_str) = js_sys::JSON::stringify(&msg) {
2091                if let Some(json_str) = json_str.as_string() {
2092                    if let Ok(message) = serde_json::from_str::<WriteQueueMessage>(&json_str) {
2093                        if let WriteQueueMessage::WriteRequest(request) = message {
2094                            log::debug!("Leader received write request: {}", request.request_id);
2095
2096                            // Check if we're the leader
2097                            let storage_rc = get_storage_with_fallback(&db_name_inner);
2098
2099                            if let Some(storage) = storage_rc {
2100                                // Spawn async task to process the write
2101                                wasm_bindgen_futures::spawn_local(async move {
2102                                    let is_leader = with_storage_async!(
2103                                        storage,
2104                                        "write_queue_is_leader",
2105                                        |s| s.is_leader()
2106                                    );
2107                                    if is_leader.is_none() {
2108                                        log::error!("Failed to check leader status");
2109                                        return;
2110                                    }
2111                                    let is_leader = is_leader.unwrap();
2112
2113                                    if !is_leader {
2114                                        log::error!("Not leader, ignoring write request");
2115                                        return;
2116                                    }
2117
2118                                    log::debug!("Processing write request as leader");
2119
2120                                    // Create a temporary database instance to execute the SQL
2121                                    match Database::new_wasm(db_name_inner.clone()).await {
2122                                        Ok(mut db) => {
2123                                            // Execute the SQL
2124                                            match db.execute_internal(&request.sql).await {
2125                                                Ok(result) => {
2126                                                    // Send success response
2127                                                    let response = WriteResponse::Success {
2128                                                        request_id: request.request_id.clone(),
2129                                                        affected_rows: result.affected_rows
2130                                                            as usize,
2131                                                    };
2132
2133                                                    use crate::storage::write_queue::send_write_response;
2134                                                    if let Err(e) = send_write_response(
2135                                                        &db_name_inner,
2136                                                        response,
2137                                                    ) {
2138                                                        log::error!(
2139                                                            "Failed to send response: {}",
2140                                                            e
2141                                                        );
2142                                                    } else {
2143                                                        log::info!(
2144                                                            "Write response sent successfully"
2145                                                        );
2146                                                    }
2147                                                }
2148                                                Err(e) => {
2149                                                    // Send error response
2150                                                    let response = WriteResponse::Error {
2151                                                        request_id: request.request_id.clone(),
2152                                                        error_message: e.to_string(),
2153                                                    };
2154
2155                                                    use crate::storage::write_queue::send_write_response;
2156                                                    if let Err(e) = send_write_response(
2157                                                        &db_name_inner,
2158                                                        response,
2159                                                    ) {
2160                                                        log::error!(
2161                                                            "Failed to send error response: {}",
2162                                                            e
2163                                                        );
2164                                                    }
2165                                                }
2166                                            }
2167                                        }
2168                                        Err(e) => {
2169                                            log::error!(
2170                                                "Failed to create db for write processing: {:?}",
2171                                                e
2172                                            );
2173                                        }
2174                                    }
2175                                });
2176                            }
2177                        }
2178                    }
2179                }
2180            }
2181        }) as Box<dyn FnMut(JsValue)>);
2182
2183        let callback_fn = callback.as_ref().unchecked_ref();
2184        register_write_queue_listener(db_name, callback_fn).map_err(|e| {
2185            JsValue::from_str(&format!("Failed to register write queue listener: {}", e))
2186        })?;
2187
2188        callback.forget();
2189
2190        Ok(())
2191    }
2192
2193    #[wasm_bindgen]
2194    pub async fn execute(&mut self, sql: &str) -> Result<JsValue, JsValue> {
2195        // Check write permission before executing
2196        self.check_write_permission(sql)
2197            .await
2198            .map_err(|e| JsValue::from_str(&format!("Write permission denied: {}", e)))?;
2199
2200        let result = self
2201            .execute_internal(sql)
2202            .await
2203            .map_err(|e| JsValue::from_str(&format!("Query execution failed: {}", e)))?;
2204        serde_wasm_bindgen::to_value(&result).map_err(|e| JsValue::from_str(&e.to_string()))
2205    }
2206
2207    #[wasm_bindgen(js_name = "executeWithParams")]
2208    pub async fn execute_with_params(
2209        &mut self,
2210        sql: &str,
2211        params: JsValue,
2212    ) -> Result<JsValue, JsValue> {
2213        let params: Vec<ColumnValue> = serde_wasm_bindgen::from_value(params)
2214            .map_err(|e| JsValue::from_str(&format!("Invalid parameters: {}", e)))?;
2215
2216        // Check write permission before executing
2217        self.check_write_permission(sql)
2218            .await
2219            .map_err(|e| JsValue::from_str(&format!("Write permission denied: {}", e)))?;
2220
2221        let result = self
2222            .execute_with_params_internal(sql, &params)
2223            .await
2224            .map_err(|e| JsValue::from_str(&format!("Query execution failed: {}", e)))?;
2225        serde_wasm_bindgen::to_value(&result).map_err(|e| JsValue::from_str(&e.to_string()))
2226    }
2227
2228    #[wasm_bindgen]
2229    pub async fn close(&mut self) -> Result<(), JsValue> {
2230        self.close_internal()
2231            .await
2232            .map_err(|e| JsValue::from_str(&format!("Failed to close database: {}", e)))
2233    }
2234
2235    /// Force close connection and remove from pool (for test cleanup)
2236    #[wasm_bindgen(js_name = "forceCloseConnection")]
2237    pub async fn force_close_connection(&mut self) -> Result<(), JsValue> {
2238        // First do normal close to cleanup
2239        let _ = self.close_internal().await;
2240
2241        // Then force-remove from connection pool
2242        // Pool uses name without .db, so strip it
2243        let pool_key = self.name.trim_end_matches(".db");
2244        crate::connection_pool::force_close_connection(pool_key);
2245
2246        // CRITICAL: Single source of truth for ALL cleanup
2247        #[cfg(target_arch = "wasm32")]
2248        {
2249            crate::cleanup::cleanup_all_state(pool_key)
2250                .await
2251                .map_err(|e| JsValue::from_str(&format!("Cleanup failed: {}", e)))?;
2252        }
2253        log::info!("Force closed and removed connection for: {}", self.name);
2254        Ok(())
2255    }
2256
2257    #[wasm_bindgen]
2258    pub async fn sync(&mut self) -> Result<(), JsValue> {
2259        self.sync_internal()
2260            .await
2261            .map_err(|e| JsValue::from_str(&format!("Failed to sync database: {}", e)))
2262    }
2263
2264    /// Allow non-leader writes (for single-tab apps or testing)
2265    #[wasm_bindgen(js_name = "allowNonLeaderWrites")]
2266    pub async fn allow_non_leader_writes(&mut self, allow: bool) -> Result<(), JsValue> {
2267        log::debug!("Setting allowNonLeaderWrites = {} for {}", allow, self.name);
2268        self.allow_non_leader_writes = allow;
2269        Ok(())
2270    }
2271
2272    /// Export database to SQLite .db file format
2273    ///
2274    /// Returns the complete database as a Uint8Array that can be downloaded
2275    /// or saved as a standard SQLite .db file.
2276    ///
2277    /// # Example
2278    /// ```javascript
2279    /// const dbBytes = await db.exportToFile();
2280    /// const blob = new Blob([dbBytes], { type: 'application/x-sqlite3' });
2281    /// const url = URL.createObjectURL(blob);
2282    /// const a = document.createElement('a');
2283    /// a.href = url;
2284    /// a.download = 'database.db';
2285    /// a.click();
2286    /// ```
2287    #[wasm_bindgen(js_name = "exportToFile")]
2288    pub async fn export_to_file(&self) -> Result<js_sys::Uint8Array, JsValue> {
2289        let db_name = self.name.clone();
2290        let max_export_size = self.max_export_size_bytes;
2291
2292        log::info!("[EXPORT] ===== Step 1: Acquiring lock");
2293
2294        // Acquire lock FIRST to serialize operations
2295        let _guard = weblocks::acquire(&db_name, weblocks::AcquireOptions::exclusive()).await?;
2296        log::info!("[EXPORT] ===== Step 2: Lock acquired");
2297
2298        // Get storage and sync AFTER lock - this ensures only one export syncs at a time
2299        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2300        log::info!("[EXPORT] ===== Step 3: Getting storage");
2301        let storage_rc = get_storage_with_fallback(&db_name).ok_or_else(|| {
2302            JsValue::from_str(&format!("Storage not found for database: {}", db_name))
2303        })?;
2304        log::info!("[EXPORT] ===== Step 4: Got storage, reloading cache");
2305
2306        // Reload cache from GLOBAL_STORAGE
2307        #[cfg(target_arch = "wasm32")]
2308        {
2309            storage_rc.reload_cache_from_global_storage();
2310        }
2311
2312        // CRITICAL: Checkpoint WAL to flush SQLite data to VFS blocks before export
2313        // Without this, data stays in SQLite's WAL buffer and doesn't appear in exported bytes
2314        log::info!("[EXPORT] ===== Step 5: Checkpointing WAL");
2315        if !self.connection_state.db.get().is_null() {
2316            // Use raw SQLite call since export_to_file takes &self, not &mut self
2317            use std::ffi::CString;
2318            let pragma = CString::new("PRAGMA wal_checkpoint(PASSIVE)").unwrap();
2319            unsafe {
2320                let mut stmt = std::ptr::null_mut();
2321                let rc = sqlite_wasm_rs::sqlite3_prepare_v2(
2322                    self.connection_state.db.get(),
2323                    pragma.as_ptr(),
2324                    -1,
2325                    &mut stmt,
2326                    std::ptr::null_mut(),
2327                );
2328                if rc == sqlite_wasm_rs::SQLITE_OK && !stmt.is_null() {
2329                    sqlite_wasm_rs::sqlite3_step(stmt);
2330                    sqlite_wasm_rs::sqlite3_finalize(stmt);
2331                    log::info!("[EXPORT] WAL checkpoint completed");
2332                } else {
2333                    log::warn!("[EXPORT] WAL checkpoint failed with rc: {}", rc);
2334                }
2335            }
2336        }
2337
2338        log::info!("[EXPORT] ===== Step 6: Starting sync");
2339        // Sync to ensure all data is persisted before export
2340        storage_rc
2341            .sync()
2342            .await
2343            .map_err(|e| JsValue::from_str(&format!("Sync failed: {}", e)))?;
2344        log::info!("[EXPORT] ===== Step 7: Sync complete");
2345
2346        // Export with configured size limit
2347        log::info!("[EXPORT] Calling export_database_to_bytes");
2348        let db_bytes = {
2349            let storage = &*storage_rc;
2350            crate::storage::export::export_database_to_bytes(storage, max_export_size)
2351                .await
2352                .map_err(|e| {
2353                    log::error!("[EXPORT] Export failed: {}", e);
2354                    JsValue::from_str(&format!("Export failed: {}", e))
2355                })?
2356        };
2357
2358        log::info!("[EXPORT] Export complete: {} bytes", db_bytes.len());
2359
2360        let uint8_array = js_sys::Uint8Array::new_with_length(db_bytes.len() as u32);
2361        uint8_array.copy_from(&db_bytes);
2362
2363        Ok(uint8_array)
2364    }
2365
2366    /// Test method for concurrent locking - simple increment counter
2367    #[wasm_bindgen(js_name = "testLock")]
2368    pub async fn test_lock(&self, value: u32) -> Result<u32, JsValue> {
2369        let lock_name = format!("{}.lock_test", self.name);
2370
2371        log::info!(
2372            "[LOCK-TEST] Acquiring lock: {} with value: {}",
2373            lock_name,
2374            value
2375        );
2376        let _guard = weblocks::acquire(&lock_name, weblocks::AcquireOptions::exclusive()).await?;
2377        log::info!("[LOCK-TEST] Lock acquired, processing value: {}", value);
2378
2379        // Simulate some work
2380        let result = value + 1;
2381
2382        // Small delay to test serialization
2383        let delay_promise = js_sys::Promise::new(&mut |resolve, _reject| {
2384            let window = web_sys::window().unwrap();
2385            let _ = window
2386                .set_timeout_with_callback_and_timeout_and_arguments_0(resolve.unchecked_ref(), 10);
2387        });
2388        wasm_bindgen_futures::JsFuture::from(delay_promise).await?;
2389
2390        log::info!(
2391            "[LOCK-TEST] Lock releasing for: {} with result: {}",
2392            lock_name,
2393            result
2394        );
2395        Ok(result)
2396    }
2397
2398    /// Import SQLite database from .db file bytes
2399    ///
2400    /// Replaces the current database contents with the imported data.
2401    /// This will close the current database connection and clear all existing data.
2402    ///
2403    /// # Arguments
2404    /// * `file_data` - SQLite .db file as Uint8Array
2405    ///
2406    /// # Returns
2407    /// * `Ok(())` - Import successful
2408    /// * `Err(JsValue)` - Import failed (invalid file, validation error, etc.)
2409    ///
2410    /// # Example
2411    /// ```javascript
2412    /// // From file input
2413    /// const fileInput = document.getElementById('dbFile');
2414    /// const file = fileInput.files[0];
2415    /// const arrayBuffer = await file.arrayBuffer();
2416    /// const uint8Array = new Uint8Array(arrayBuffer);
2417    ///
2418    /// await db.importFromFile(uint8Array);
2419    ///
2420    /// // Database is immediately usable after import (no reopen needed)
2421    /// const result = await db.execute('SELECT * FROM imported_table');
2422    /// ```
2423    ///
2424    /// # Warning
2425    /// This operation is destructive and will replace all existing database data.
2426    #[wasm_bindgen(js_name = "importFromFile")]
2427    pub async fn import_from_file(&mut self, file_data: js_sys::Uint8Array) -> Result<(), JsValue> {
2428        log::info!("[IMPORT] Starting import with lock for: {}", self.name);
2429        let db_name = self.name.clone();
2430        let data = file_data.to_vec();
2431
2432        // Acquire lock FIRST to serialize operations
2433        let _guard = weblocks::acquire(&db_name, weblocks::AcquireOptions::exclusive()).await?;
2434        log::info!("[IMPORT] Lock acquired for: {}", db_name);
2435
2436        log::debug!("Import data size: {} bytes", data.len());
2437
2438        // CRITICAL: Force-close database connection BEFORE import
2439        // Must use force_close to remove from connection pool, not just decrement ref_count
2440        // Otherwise new Database instances will reuse stale SQLite connection
2441        log::debug!("Force-closing database connection before import");
2442
2443        // First do normal close to cleanup leader election etc
2444        self.close_internal()
2445            .await
2446            .map_err(|e| JsValue::from_str(&format!("Failed to close before import: {}", e)))?;
2447
2448        // Then force-remove from connection pool
2449        let pool_key = self.name.trim_end_matches(".db");
2450        crate::connection_pool::force_close_connection(pool_key);
2451
2452        // Mark our connection as null since we force-closed it
2453        self.connection_state.db.set(std::ptr::null_mut());
2454        log::debug!("Removed connection from pool for import");
2455
2456        // Call the import function with full name (WITH .db)
2457        crate::storage::import::import_database_from_bytes(&db_name, data)
2458            .await
2459            .map_err(|e| {
2460                log::error!("Import failed for {}: {}", db_name, e);
2461                JsValue::from_str(&format!("Import failed: {}", e))
2462            })?;
2463
2464        log::info!("[IMPORT] Import complete for: {}", db_name);
2465
2466        // Reopen the SQLite connection so this Database instance is usable after import
2467        // The VFS should already exist from when the Database was first created
2468        log::info!("[IMPORT] Reopening connection for: {}", db_name);
2469
2470        use std::ffi::CString;
2471
2472        let vfs_name = format!("vfs_{}", db_name.trim_end_matches(".db"));
2473        let pool_key = db_name.trim_end_matches(".db").to_string();
2474        let db_name_for_closure = db_name.clone();
2475        let vfs_name_for_closure = vfs_name.clone();
2476
2477        let new_state = crate::connection_pool::get_or_create_connection(&pool_key, || {
2478            let mut db = std::ptr::null_mut();
2479            let db_name_cstr = CString::new(db_name_for_closure.clone())
2480                .map_err(|_| "Invalid database name".to_string())?;
2481            let vfs_cstr = CString::new(vfs_name_for_closure.as_str())
2482                .map_err(|_| "Invalid VFS name".to_string())?;
2483
2484            log::info!(
2485                "[IMPORT] Reopening database: {} with VFS: {}",
2486                db_name_for_closure,
2487                vfs_name_for_closure
2488            );
2489
2490            let ret = unsafe {
2491                sqlite_wasm_rs::sqlite3_open_v2(
2492                    db_name_cstr.as_ptr(),
2493                    &mut db as *mut _,
2494                    sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
2495                    vfs_cstr.as_ptr(),
2496                )
2497            };
2498
2499            if ret != sqlite_wasm_rs::SQLITE_OK {
2500                let err_msg = unsafe {
2501                    let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
2502                    if !msg_ptr.is_null() {
2503                        std::ffi::CStr::from_ptr(msg_ptr)
2504                            .to_string_lossy()
2505                            .into_owned()
2506                    } else {
2507                        "Unknown error".to_string()
2508                    }
2509                };
2510                return Err(format!(
2511                    "Failed to reopen database after import: {}",
2512                    err_msg
2513                ));
2514            }
2515
2516            log::info!("[IMPORT] Database reopened successfully");
2517            Ok(db)
2518        })
2519        .map_err(|e| {
2520            JsValue::from_str(&format!("Failed to reopen connection after import: {}", e))
2521        })?;
2522
2523        // Update our connection state to use the new connection
2524        self.connection_state = new_state;
2525        log::info!("[IMPORT] Connection state updated for: {}", db_name);
2526
2527        Ok(())
2528    }
2529
2530    /// Wait for this instance to become leader
2531    #[wasm_bindgen(js_name = "waitForLeadership")]
2532    pub async fn wait_for_leadership(&mut self) -> Result<(), JsValue> {
2533        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2534
2535        // Track leader election attempt
2536        #[cfg(feature = "telemetry")]
2537        if let Some(ref metrics) = self.metrics {
2538            metrics.leader_election_attempts_total().inc();
2539        }
2540
2541        let db_name = &self.name;
2542        let start_time = js_sys::Date::now();
2543
2544        let timeout_ms = 5000.0; // 5 second timeout
2545
2546        loop {
2547            let storage_rc = get_storage_with_fallback(db_name);
2548
2549            if let Some(storage) = storage_rc {
2550                let is_leader =
2551                    match with_storage_async!(storage, "wait_for_leadership", |s| s.is_leader()) {
2552                        Some(v) => v,
2553                        None => continue,
2554                    };
2555
2556                if is_leader {
2557                    log::info!("Became leader for {}", db_name);
2558
2559                    // Record telemetry on successful leadership acquisition
2560                    #[cfg(feature = "telemetry")]
2561                    if let Some(ref metrics) = self.metrics {
2562                        let duration_ms = js_sys::Date::now() - start_time;
2563                        metrics.leader_election_duration().observe(duration_ms);
2564                        metrics.is_leader().set(1.0);
2565                        metrics.leadership_changes_total().inc();
2566                    }
2567
2568                    return Ok(());
2569                }
2570            }
2571
2572            // Check timeout
2573            if js_sys::Date::now() - start_time > timeout_ms {
2574                // Record telemetry on timeout
2575                #[cfg(feature = "telemetry")]
2576                if let Some(ref metrics) = self.metrics {
2577                    let duration_ms = js_sys::Date::now() - start_time;
2578                    metrics.leader_election_duration().observe(duration_ms);
2579                }
2580
2581                return Err(JsValue::from_str("Timeout waiting for leadership"));
2582            }
2583
2584            // Wait a bit before checking again
2585            let promise = js_sys::Promise::new(&mut |resolve, _| {
2586                let window = web_sys::window().expect("should have window");
2587                let _ = window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 100);
2588            });
2589            let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
2590        }
2591    }
2592
2593    /// Request leadership (triggers re-election check)
2594    #[wasm_bindgen(js_name = "requestLeadership")]
2595    pub async fn request_leadership(&mut self) -> Result<(), JsValue> {
2596        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2597
2598        let db_name = &self.name;
2599        log::debug!("Requesting leadership for {}", db_name);
2600
2601        // Record telemetry data before the request
2602        #[cfg(feature = "telemetry")]
2603        let telemetry_data = if self.metrics.is_some() {
2604            let start_time = js_sys::Date::now();
2605            let was_leader = self
2606                .is_leader_wasm()
2607                .await
2608                .ok()
2609                .and_then(|v| v.as_bool())
2610                .unwrap_or(false);
2611
2612            if let Some(ref metrics) = self.metrics {
2613                metrics.leader_elections_total().inc();
2614            }
2615
2616            Some((start_time, was_leader))
2617        } else {
2618            None
2619        };
2620
2621        let storage_rc = get_storage_with_fallback(db_name);
2622
2623        if let Some(storage) = storage_rc {
2624            {
2625                // Trigger leader election
2626                let result = with_storage_async!(storage, "request_leadership", |s| s
2627                    .start_leader_election())
2628                .ok_or_else(|| {
2629                    JsValue::from_str("Failed to acquire storage lock for leadership request")
2630                })?;
2631                result.map_err(|e| {
2632                    JsValue::from_str(&format!("Failed to request leadership: {}", e))
2633                })?;
2634
2635                log::debug!("Re-election triggered for {}", db_name);
2636            } // Drop the borrow here
2637
2638            // Record telemetry after election (after dropping borrow)
2639            #[cfg(feature = "telemetry")]
2640            if let Some((start_time, was_leader)) = telemetry_data {
2641                if let Some(ref metrics) = self.metrics {
2642                    // Record election duration
2643                    let duration_ms = js_sys::Date::now() - start_time;
2644                    metrics.leader_election_duration().observe(duration_ms);
2645
2646                    // Check if leadership status changed
2647                    let is_leader_now = self
2648                        .is_leader_wasm()
2649                        .await
2650                        .ok()
2651                        .and_then(|v| v.as_bool())
2652                        .unwrap_or(false);
2653
2654                    // Update is_leader gauge
2655                    metrics
2656                        .is_leader()
2657                        .set(if is_leader_now { 1.0 } else { 0.0 });
2658
2659                    // Track leadership changes
2660                    if was_leader != is_leader_now {
2661                        metrics.leadership_changes_total().inc();
2662                    }
2663                }
2664            }
2665
2666            Ok(())
2667        } else {
2668            Err(JsValue::from_str(&format!(
2669                "No storage found for database: {}",
2670                db_name
2671            )))
2672        }
2673    }
2674
2675    /// Get leader information
2676    #[wasm_bindgen(js_name = "getLeaderInfo")]
2677    pub async fn get_leader_info(&mut self) -> Result<JsValue, JsValue> {
2678        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2679
2680        let db_name = &self.name;
2681
2682        let storage_rc = get_storage_with_fallback(db_name);
2683
2684        if let Some(storage) = storage_rc {
2685            let is_leader = with_storage_async!(storage, "get_leader_info", |s| s.is_leader())
2686                .ok_or_else(|| {
2687                    JsValue::from_str(&format!(
2688                        "Failed to access storage for database: {}",
2689                        db_name
2690                    ))
2691                })?;
2692
2693            // Get leader info - we'll use simpler data for now
2694            // Real implementation would need public getters on BlockStorage
2695            let leader_id_str = if is_leader {
2696                format!("leader_{}", db_name)
2697            } else {
2698                "unknown".to_string()
2699            };
2700
2701            // Create JavaScript object
2702            let obj = js_sys::Object::new();
2703            js_sys::Reflect::set(&obj, &"isLeader".into(), &JsValue::from_bool(is_leader))?;
2704            js_sys::Reflect::set(&obj, &"leaderId".into(), &JsValue::from_str(&leader_id_str))?;
2705            js_sys::Reflect::set(
2706                &obj,
2707                &"leaseExpiry".into(),
2708                &JsValue::from_f64(js_sys::Date::now()),
2709            )?;
2710
2711            Ok(obj.into())
2712        } else {
2713            Err(JsValue::from_str(&format!(
2714                "No storage found for database: {}",
2715                db_name
2716            )))
2717        }
2718    }
2719
2720    /// Queue a write operation to be executed by the leader
2721    ///
2722    /// Non-leader tabs can use this to request writes from the leader.
2723    /// The write is forwarded via BroadcastChannel and executed by the leader.
2724    ///
2725    /// # Arguments
2726    /// * `sql` - SQL statement to execute (must be a write operation)
2727    ///
2728    /// # Returns
2729    /// Result indicating success or failure
2730    #[wasm_bindgen(js_name = "queueWrite")]
2731    pub async fn queue_write(&mut self, sql: String) -> Result<(), JsValue> {
2732        self.queue_write_with_timeout(sql, 5000).await
2733    }
2734
2735    /// Queue a write operation with a specific timeout
2736    ///
2737    /// # Arguments
2738    /// * `sql` - SQL statement to execute
2739    /// * `timeout_ms` - Timeout in milliseconds
2740    #[wasm_bindgen(js_name = "queueWriteWithTimeout")]
2741    pub async fn queue_write_with_timeout(
2742        &mut self,
2743        sql: String,
2744        timeout_ms: u32,
2745    ) -> Result<(), JsValue> {
2746        use crate::storage::write_queue::{WriteQueueMessage, WriteResponse, send_write_request};
2747        use std::cell::RefCell;
2748        use std::rc::Rc;
2749
2750        log::debug!("Queuing write: {}", sql);
2751
2752        // Check if we're the leader - if so, just execute directly
2753        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2754        let is_leader = {
2755            let storage_rc = get_storage_with_fallback(&self.name);
2756
2757            if let Some(storage) = storage_rc {
2758                with_storage_async!(storage, "queue_write_is_leader", |s| s.is_leader())
2759                    .unwrap_or(false)
2760            } else {
2761                false
2762            }
2763        };
2764
2765        if is_leader {
2766            log::debug!("We are leader, executing directly");
2767            return self
2768                .execute_internal(&sql)
2769                .await
2770                .map(|_| ())
2771                .map_err(|e| JsValue::from_str(&format!("Execute failed: {}", e)));
2772        }
2773
2774        // Send write request to leader
2775        let request_id = send_write_request(&self.name, &sql)
2776            .map_err(|e| JsValue::from_str(&format!("Failed to send write request: {}", e)))?;
2777
2778        log::debug!("Write request sent with ID: {}", request_id);
2779
2780        // Wait for response with timeout
2781        let response_received = Rc::new(RefCell::new(false));
2782        let response_error = Rc::new(RefCell::new(None::<String>));
2783
2784        let response_received_clone = response_received.clone();
2785        let response_error_clone = response_error.clone();
2786        let request_id_clone = request_id.clone();
2787
2788        // Set up listener for response
2789        let callback = Closure::wrap(Box::new(move |msg: JsValue| {
2790            // Parse the message
2791            if let Ok(json_str) = js_sys::JSON::stringify(&msg) {
2792                if let Some(json_str) = json_str.as_string() {
2793                    if let Ok(message) = serde_json::from_str::<WriteQueueMessage>(&json_str) {
2794                        if let WriteQueueMessage::WriteResponse(response) = message {
2795                            match response {
2796                                WriteResponse::Success { request_id, .. } => {
2797                                    if request_id == request_id_clone {
2798                                        *response_received_clone.borrow_mut() = true;
2799                                        log::debug!("Write response received: Success");
2800                                    }
2801                                }
2802                                WriteResponse::Error {
2803                                    request_id,
2804                                    error_message,
2805                                } => {
2806                                    if request_id == request_id_clone {
2807                                        *response_received_clone.borrow_mut() = true;
2808                                        *response_error_clone.borrow_mut() = Some(error_message);
2809                                        log::debug!("Write response received: Error");
2810                                    }
2811                                }
2812                            }
2813                        }
2814                    }
2815                }
2816            }
2817        }) as Box<dyn FnMut(JsValue)>);
2818
2819        // Register listener
2820        use crate::storage::write_queue::register_write_queue_listener;
2821        let callback_fn = callback.as_ref().unchecked_ref();
2822        register_write_queue_listener(&self.name, callback_fn)
2823            .map_err(|e| JsValue::from_str(&format!("Failed to register listener: {}", e)))?;
2824
2825        // Keep callback alive
2826        callback.forget();
2827
2828        // Wait for response with polling (timeout_ms)
2829        let start_time = js_sys::Date::now();
2830        let timeout_f64 = timeout_ms as f64;
2831
2832        loop {
2833            // Check if response received
2834            if *response_received.borrow() {
2835                if let Some(error_msg) = response_error.borrow().as_ref() {
2836                    return Err(JsValue::from_str(&format!("Write failed: {}", error_msg)));
2837                }
2838                log::info!("Write completed successfully");
2839                return Ok(());
2840            }
2841
2842            // Check timeout
2843            let elapsed = js_sys::Date::now() - start_time;
2844            if elapsed > timeout_f64 {
2845                return Err(JsValue::from_str("Write request timed out"));
2846            }
2847
2848            // Wait a bit before checking again
2849            wasm_bindgen_futures::JsFuture::from(js_sys::Promise::new(&mut |resolve, _reject| {
2850                if let Some(window) = web_sys::window() {
2851                    let _ =
2852                        window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 100);
2853                } else {
2854                    log::error!("Window unavailable in timeout handler");
2855                }
2856            }))
2857            .await
2858            .ok();
2859        }
2860    }
2861
2862    #[wasm_bindgen(js_name = "isLeader")]
2863    pub async fn is_leader_wasm(&self) -> Result<JsValue, JsValue> {
2864        // Get the storage from STORAGE_REGISTRY
2865        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2866
2867        let db_name = &self.name;
2868        log::debug!("isLeader() called for database: {} (self.name)", db_name);
2869
2870        let storage_rc = get_storage_with_fallback(db_name);
2871
2872        if let Some(storage) = storage_rc {
2873            log::debug!("Found storage for {}, calling is_leader()", db_name);
2874            let is_leader = with_storage_async!(storage, "is_leader_wasm", |s| s.is_leader())
2875                .ok_or_else(|| {
2876                    JsValue::from_str(&format!(
2877                        "Failed to access storage for database: {}",
2878                        db_name
2879                    ))
2880                })?;
2881            log::debug!("is_leader() = {} for {}", is_leader, db_name);
2882
2883            // Return as JsValue boolean
2884            Ok(JsValue::from_bool(is_leader))
2885        } else {
2886            log::error!("ERROR: No storage found for database: {}", db_name);
2887            Err(JsValue::from_str(&format!(
2888                "No storage found for database: {}",
2889                db_name
2890            )))
2891        }
2892    }
2893
2894    /// Check if this instance is the leader (non-wasm version for internal use/tests)
2895    pub async fn is_leader(&self) -> Result<bool, JsValue> {
2896        let result = self.is_leader_wasm().await?;
2897        Ok(result.as_bool().unwrap_or(false))
2898    }
2899
2900    #[wasm_bindgen(js_name = "onDataChange")]
2901    pub fn on_data_change_wasm(&mut self, callback: &js_sys::Function) -> Result<(), JsValue> {
2902        log::debug!("Registering onDataChange callback for {}", self.name);
2903
2904        // Store the callback
2905        self.on_data_change_callback = Some(callback.clone());
2906
2907        // Register listener for BroadcastChannel notifications from other tabs
2908        use crate::storage::broadcast_notifications::register_change_listener;
2909
2910        let db_name = &self.name;
2911        register_change_listener(db_name, callback).map_err(|e| {
2912            JsValue::from_str(&format!("Failed to register change listener: {}", e))
2913        })?;
2914
2915        log::debug!("onDataChange callback registered for {}", self.name);
2916        Ok(())
2917    }
2918
2919    /// Enable or disable optimistic updates mode
2920    #[wasm_bindgen(js_name = "enableOptimisticUpdates")]
2921    pub async fn enable_optimistic_updates(&mut self, enabled: bool) -> Result<(), JsValue> {
2922        self.optimistic_updates_manager
2923            .borrow_mut()
2924            .set_enabled(enabled);
2925        log::debug!(
2926            "Optimistic updates {}",
2927            if enabled { "enabled" } else { "disabled" }
2928        );
2929        Ok(())
2930    }
2931
2932    /// Check if optimistic mode is enabled
2933    #[wasm_bindgen(js_name = "isOptimisticMode")]
2934    pub async fn is_optimistic_mode(&self) -> bool {
2935        self.optimistic_updates_manager.borrow().is_enabled()
2936    }
2937
2938    /// Track an optimistic write
2939    #[wasm_bindgen(js_name = "trackOptimisticWrite")]
2940    pub async fn track_optimistic_write(&mut self, sql: String) -> Result<String, JsValue> {
2941        let id = self
2942            .optimistic_updates_manager
2943            .borrow_mut()
2944            .track_write(sql);
2945        Ok(id)
2946    }
2947
2948    /// Get count of pending writes
2949    #[wasm_bindgen(js_name = "getPendingWritesCount")]
2950    pub async fn get_pending_writes_count(&self) -> usize {
2951        self.optimistic_updates_manager.borrow().get_pending_count()
2952    }
2953
2954    /// Clear all optimistic writes
2955    #[wasm_bindgen(js_name = "clearOptimisticWrites")]
2956    pub async fn clear_optimistic_writes(&mut self) -> Result<(), JsValue> {
2957        self.optimistic_updates_manager.borrow_mut().clear_all();
2958        Ok(())
2959    }
2960
2961    /// Enable or disable coordination metrics tracking
2962    #[wasm_bindgen(js_name = "enableCoordinationMetrics")]
2963    pub async fn enable_coordination_metrics(&mut self, enabled: bool) -> Result<(), JsValue> {
2964        self.coordination_metrics_manager
2965            .borrow_mut()
2966            .set_enabled(enabled);
2967        Ok(())
2968    }
2969
2970    /// Check if coordination metrics tracking is enabled
2971    #[wasm_bindgen(js_name = "isCoordinationMetricsEnabled")]
2972    pub async fn is_coordination_metrics_enabled(&self) -> bool {
2973        self.coordination_metrics_manager.borrow().is_enabled()
2974    }
2975
2976    /// Record a leadership change
2977    #[wasm_bindgen(js_name = "recordLeadershipChange")]
2978    pub async fn record_leadership_change(&mut self, became_leader: bool) -> Result<(), JsValue> {
2979        self.coordination_metrics_manager
2980            .borrow_mut()
2981            .record_leadership_change(became_leader);
2982        Ok(())
2983    }
2984
2985    /// Record a notification latency in milliseconds
2986    #[wasm_bindgen(js_name = "recordNotificationLatency")]
2987    pub async fn record_notification_latency(&mut self, latency_ms: f64) -> Result<(), JsValue> {
2988        self.coordination_metrics_manager
2989            .borrow_mut()
2990            .record_notification_latency(latency_ms);
2991        Ok(())
2992    }
2993
2994    /// Record a write conflict (non-leader write attempt)
2995    #[wasm_bindgen(js_name = "recordWriteConflict")]
2996    pub async fn record_write_conflict(&mut self) -> Result<(), JsValue> {
2997        self.coordination_metrics_manager
2998            .borrow_mut()
2999            .record_write_conflict();
3000        Ok(())
3001    }
3002
3003    /// Record a follower refresh
3004    #[wasm_bindgen(js_name = "recordFollowerRefresh")]
3005    pub async fn record_follower_refresh(&mut self) -> Result<(), JsValue> {
3006        self.coordination_metrics_manager
3007            .borrow_mut()
3008            .record_follower_refresh();
3009        Ok(())
3010    }
3011
3012    /// Get coordination metrics as JSON string
3013    #[wasm_bindgen(js_name = "getCoordinationMetrics")]
3014    pub async fn get_coordination_metrics(&self) -> Result<String, JsValue> {
3015        self.coordination_metrics_manager
3016            .borrow()
3017            .get_metrics_json()
3018            .map_err(|e| JsValue::from_str(&e))
3019    }
3020
3021    /// Reset all coordination metrics
3022    #[wasm_bindgen(js_name = "resetCoordinationMetrics")]
3023    pub async fn reset_coordination_metrics(&mut self) -> Result<(), JsValue> {
3024        self.coordination_metrics_manager.borrow_mut().reset();
3025        Ok(())
3026    }
3027}
3028
3029// Export WasmColumnValue for WASM
3030#[cfg(target_arch = "wasm32")]
3031#[wasm_bindgen]
3032pub struct WasmColumnValue {
3033    #[allow(dead_code)]
3034    inner: ColumnValue,
3035}
3036
3037#[cfg(target_arch = "wasm32")]
3038#[wasm_bindgen]
3039impl WasmColumnValue {
3040    #[wasm_bindgen(js_name = "createNull")]
3041    pub fn create_null() -> WasmColumnValue {
3042        WasmColumnValue {
3043            inner: ColumnValue::Null,
3044        }
3045    }
3046
3047    #[wasm_bindgen(js_name = "createInteger")]
3048    pub fn create_integer(value: i64) -> WasmColumnValue {
3049        WasmColumnValue {
3050            inner: ColumnValue::Integer(value),
3051        }
3052    }
3053
3054    #[wasm_bindgen(js_name = "createReal")]
3055    pub fn create_real(value: f64) -> WasmColumnValue {
3056        WasmColumnValue {
3057            inner: ColumnValue::Real(value),
3058        }
3059    }
3060
3061    #[wasm_bindgen(js_name = "createText")]
3062    pub fn create_text(value: String) -> WasmColumnValue {
3063        WasmColumnValue {
3064            inner: ColumnValue::Text(value),
3065        }
3066    }
3067
3068    #[wasm_bindgen(js_name = "createBlob")]
3069    pub fn create_blob(value: &[u8]) -> WasmColumnValue {
3070        WasmColumnValue {
3071            inner: ColumnValue::Blob(value.to_vec()),
3072        }
3073    }
3074
3075    #[wasm_bindgen(js_name = "createBigInt")]
3076    pub fn create_bigint(value: &str) -> WasmColumnValue {
3077        WasmColumnValue {
3078            inner: ColumnValue::BigInt(value.to_string()),
3079        }
3080    }
3081
3082    #[wasm_bindgen(js_name = "createDate")]
3083    pub fn create_date(timestamp: f64) -> WasmColumnValue {
3084        WasmColumnValue {
3085            inner: ColumnValue::Date(timestamp as i64),
3086        }
3087    }
3088
3089    #[wasm_bindgen(js_name = "fromJsValue")]
3090    pub fn from_js_value(value: &JsValue) -> WasmColumnValue {
3091        if value.is_null() || value.is_undefined() {
3092            WasmColumnValue {
3093                inner: ColumnValue::Null,
3094            }
3095        } else if let Some(s) = value.as_string() {
3096            // Check if it's a large number string
3097            if let Ok(parsed) = s.parse::<i64>() {
3098                WasmColumnValue {
3099                    inner: ColumnValue::Integer(parsed),
3100                }
3101            } else {
3102                WasmColumnValue {
3103                    inner: ColumnValue::Text(s),
3104                }
3105            }
3106        } else if let Some(n) = value.as_f64() {
3107            if n.fract() == 0.0 && n >= i64::MIN as f64 && n <= i64::MAX as f64 {
3108                WasmColumnValue {
3109                    inner: ColumnValue::Integer(n as i64),
3110                }
3111            } else {
3112                WasmColumnValue {
3113                    inner: ColumnValue::Real(n),
3114                }
3115            }
3116        } else if value.is_object() {
3117            // Check if it's a Date
3118            if js_sys::Date::new(value).get_time().is_finite() {
3119                let timestamp = js_sys::Date::new(value).get_time() as i64;
3120                WasmColumnValue {
3121                    inner: ColumnValue::Date(timestamp),
3122                }
3123            } else {
3124                // Convert to string for other objects
3125                WasmColumnValue {
3126                    inner: ColumnValue::Text(format!("{:?}", value)),
3127                }
3128            }
3129        } else {
3130            WasmColumnValue {
3131                inner: ColumnValue::Null,
3132            }
3133        }
3134    }
3135
3136    // --- Rust-friendly alias constructors used in wasm tests ---
3137    // These mirror the create* methods but with simpler names and
3138    // argument types matching test usage.
3139    pub fn null() -> WasmColumnValue {
3140        Self::create_null()
3141    }
3142
3143    // Tests call integer(42.0), so accept f64 and cast to i64.
3144    pub fn integer(value: f64) -> WasmColumnValue {
3145        Self::create_integer(value as i64)
3146    }
3147
3148    pub fn real(value: f64) -> WasmColumnValue {
3149        Self::create_real(value)
3150    }
3151
3152    pub fn text(value: String) -> WasmColumnValue {
3153        Self::create_text(value)
3154    }
3155
3156    pub fn blob(value: Vec<u8>) -> WasmColumnValue {
3157        Self::create_blob(&value)
3158    }
3159
3160    pub fn big_int(value: String) -> WasmColumnValue {
3161        Self::create_bigint(&value)
3162    }
3163
3164    pub fn date(timestamp_ms: f64) -> WasmColumnValue {
3165        Self::create_date(timestamp_ms)
3166    }
3167}