absurder_sql/
lib.rs

1#[cfg(target_arch = "wasm32")]
2use std::rc::Rc;
3#[cfg(target_arch = "wasm32")]
4use wasm_bindgen::prelude::*;
5
6// Conditional rusqlite import: same crate, different features
7// Make this public so child crates can use it
8// When encryption is enabled, rusqlite uses bundled-sqlcipher-vendored-openssl feature
9// When bundled-sqlite is enabled, rusqlite uses bundled feature
10#[cfg(all(
11    not(target_arch = "wasm32"),
12    any(
13        feature = "bundled-sqlite",
14        feature = "encryption",
15        feature = "encryption-commoncrypto",
16        feature = "encryption-ios"
17    )
18))]
19pub extern crate rusqlite;
20
21// Enable better panic messages and memory allocation
22#[cfg(feature = "console_error_panic_hook")]
23pub use console_error_panic_hook::set_once as set_panic_hook;
24
25#[cfg(feature = "wee_alloc")]
26#[global_allocator]
27static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT;
28
29// Initialize logging infrastructure for WASM
30#[cfg(all(target_arch = "wasm32", feature = "console_log"))]
31#[wasm_bindgen(start)]
32pub fn init_logger() {
33    // Initialize console_log for browser logging
34    // Use Info level for production, Debug for development
35    #[cfg(debug_assertions)]
36    let log_level = log::Level::Debug;
37    #[cfg(not(debug_assertions))]
38    let log_level = log::Level::Info;
39
40    console_log::init_with_level(log_level).expect("Failed to initialize console_log");
41
42    log::info!("AbsurderSQL logging initialized at level: {:?}", log_level);
43}
44
45/// SINGLE SOURCE OF TRUTH: Normalize database name to ALWAYS include .db extension
46/// This matches main branch behavior where all GLOBAL_STORAGE keys use name WITH .db
47/// Used by: Database::new, import_from_file, sync_internal, IndexedDB operations
48#[inline]
49#[cfg(target_arch = "wasm32")]
50fn normalize_db_name(name: &str) -> String {
51    // Main branch uses full name with .db - we must do the same for consistency
52    // BlockStorage, GLOBAL_STORAGE, and IndexedDB all use this normalized name
53    if name.ends_with(".db") {
54        name.to_string()
55    } else {
56        format!("{}.db", name)
57    }
58}
59
60// Module declarations
61mod cleanup;
62#[cfg(target_arch = "wasm32")]
63pub mod connection_pool;
64#[cfg(not(target_arch = "wasm32"))]
65pub mod database;
66pub mod storage;
67pub mod types;
68pub mod vfs;
69#[cfg(not(target_arch = "wasm32"))]
70pub use database::PreparedStatement;
71pub mod utils;
72
73#[cfg(feature = "telemetry")]
74pub mod telemetry;
75
76// Re-export main public API
77#[cfg(not(target_arch = "wasm32"))]
78pub use database::SqliteIndexedDB;
79
80// WASM: Track databases currently being opened to serialize SQLite connection initialization
81#[cfg(target_arch = "wasm32")]
82thread_local! {
83    static DB_OPEN_IN_PROGRESS: std::cell::RefCell<std::collections::HashSet<String>> =
84        std::cell::RefCell::new(std::collections::HashSet::new());
85}
86
87// Type alias for native platforms
88#[cfg(not(target_arch = "wasm32"))]
89pub type Database = SqliteIndexedDB;
90
91pub use types::DatabaseConfig;
92pub use types::{ColumnValue, DatabaseError, QueryResult, Row, TransactionOptions};
93
94// Re-export VFS
95pub use vfs::indexeddb_vfs::IndexedDBVFS;
96
97/// DRY macro for async storage operations with interior mutability
98#[cfg(target_arch = "wasm32")]
99macro_rules! with_storage_async {
100    ($storage:expr, $operation:expr, |$s:ident| $body:expr) => {{
101        // BlockStorage uses RefCell for interior mutability, no outer borrow needed
102        let $s = &*$storage;
103        Some($body.await)
104    }};
105}
106
107// WASM Database implementation using sqlite-wasm-rs
108#[cfg(target_arch = "wasm32")]
109#[wasm_bindgen]
110pub struct Database {
111    #[wasm_bindgen(skip)]
112    connection_state: Rc<crate::connection_pool::ConnectionState>,
113    #[allow(dead_code)]
114    name: String,
115    #[wasm_bindgen(skip)]
116    on_data_change_callback: Option<js_sys::Function>,
117    #[wasm_bindgen(skip)]
118    allow_non_leader_writes: bool,
119    #[wasm_bindgen(skip)]
120    optimistic_updates_manager:
121        std::cell::RefCell<crate::storage::optimistic_updates::OptimisticUpdatesManager>,
122    #[wasm_bindgen(skip)]
123    coordination_metrics_manager:
124        std::cell::RefCell<crate::storage::coordination_metrics::CoordinationMetricsManager>,
125    #[wasm_bindgen(skip)]
126    #[cfg(feature = "telemetry")]
127    metrics: Option<crate::telemetry::Metrics>,
128    #[wasm_bindgen(skip)]
129    #[cfg(feature = "telemetry")]
130    span_recorder: Option<crate::telemetry::SpanRecorder>,
131    #[wasm_bindgen(skip)]
132    #[cfg(feature = "telemetry")]
133    span_context: Option<crate::telemetry::SpanContext>,
134    #[wasm_bindgen(skip)]
135    max_export_size_bytes: Option<u64>,
136}
137
138#[cfg(target_arch = "wasm32")]
139impl Database {
140    /// Get the SQLite database pointer from the shared connection
141    /// Uses Cell::get() to avoid RefCell borrow checking - eliminates reentrancy panics
142    fn db(&self) -> *mut sqlite_wasm_rs::sqlite3 {
143        let db_ptr = self.connection_state.db.get();
144        if db_ptr.is_null() {
145            panic!("Database connection is null for {}", self.name);
146        }
147        db_ptr
148    }
149
150    /// Check if a SQL statement is a write operation
151    fn is_write_operation(sql: &str) -> bool {
152        let upper = sql.trim().to_uppercase();
153        upper.starts_with("INSERT")
154            || upper.starts_with("UPDATE")
155            || upper.starts_with("DELETE")
156            || upper.starts_with("REPLACE")
157    }
158
159    /// Get metrics for observability
160    ///
161    /// Returns a reference to the Metrics instance for tracking queries, errors, and performance
162    #[cfg(feature = "telemetry")]
163    pub fn metrics(&self) -> Option<&crate::telemetry::Metrics> {
164        self.metrics.as_ref()
165    }
166
167    /// Check write permission - only leader can write (unless override enabled)
168    async fn check_write_permission(&mut self, sql: &str) -> Result<(), DatabaseError> {
169        if !Self::is_write_operation(sql) {
170            // Not a write operation, allow it
171            return Ok(());
172        }
173
174        // Check if non-leader writes are allowed
175        if self.allow_non_leader_writes {
176            log::info!("WRITE_ALLOWED: Non-leader writes enabled for {}", self.name);
177            return Ok(());
178        }
179
180        // Check if this instance is the leader
181        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
182
183        let db_name = &self.name;
184        let storage_rc = get_storage_with_fallback(db_name);
185
186        if let Some(storage) = storage_rc {
187            let is_leader = with_storage_async!(storage, "check_write_permission", |s| s
188                .is_leader())
189            .ok_or_else(|| {
190                DatabaseError::new("BORROW_CONFLICT", "Failed to check leader status")
191            })?;
192
193            if !is_leader {
194                log::error!("WRITE_DENIED: Instance is not leader for {}", db_name);
195                return Err(DatabaseError::new(
196                    "WRITE_PERMISSION_DENIED",
197                    "Only the leader tab can write to this database. Use db.isLeader() to check status or call db.allowNonLeaderWrites(true) for single-tab mode.",
198                ));
199            }
200
201            log::info!("WRITE_ALLOWED: Instance is leader for {}", db_name);
202            Ok(())
203        } else {
204            // No storage found - allow by default (single-instance mode)
205            log::info!(
206                "WRITE_ALLOWED: No storage found for {} (single-instance mode)",
207                db_name
208            );
209            Ok(())
210        }
211    }
212
213    pub async fn new(config: DatabaseConfig) -> Result<Self, DatabaseError> {
214        use std::ffi::{CStr, CString};
215
216        log::info!("Database::new called for {}", config.name);
217
218        // CRITICAL: Use DRY helper to normalize name WITH .db extension
219        // This ensures Database.name, GLOBAL_STORAGE keys, and IndexedDB keys all match
220        let normalized_name = normalize_db_name(&config.name);
221
222        // Use a unique VFS name per database to avoid interference
223        let vfs_name = format!("vfs_{}", normalized_name.trim_end_matches(".db"));
224        let vfs_name_cstr = CString::new(vfs_name.as_str())
225            .map_err(|_| DatabaseError::new("INVALID_VFS_NAME", "Invalid VFS name"))?;
226        let vfs_exists = unsafe {
227            let existing_vfs = sqlite_wasm_rs::sqlite3_vfs_find(vfs_name_cstr.as_ptr());
228            !existing_vfs.is_null()
229        };
230
231        if !vfs_exists {
232            // Create and register VFS only if it doesn't exist
233            log::debug!("Creating IndexedDBVFS for: {}", normalized_name);
234            let vfs = crate::vfs::IndexedDBVFS::new(&normalized_name).await?;
235            log::debug!("Registering VFS as '{}'", vfs_name);
236            vfs.register(&vfs_name)?;
237            log::info!("VFS registered successfully");
238        } else {
239            log::info!("VFS '{}' already registered, reusing existing", vfs_name);
240            // Ensure BlockStorage exists for this database in the registry
241            // The existing VFS will find it via STORAGE_REGISTRY
242            let _vfs = crate::vfs::IndexedDBVFS::new(&normalized_name).await?;
243            log::info!("BlockStorage ensured for {}", normalized_name);
244        }
245
246        // CRITICAL: Synchronize SQLite connection opening to prevent WAL initialization conflicts
247        // Wait if another task is currently opening a connection to this database
248        #[cfg(target_arch = "wasm32")]
249        {
250            const MAX_OPEN_WAIT_MS: u32 = 1000; // Reduced from 5000 - 1s is sufficient
251            const OPEN_POLL_MS: u32 = 10;
252            let max_open_attempts = MAX_OPEN_WAIT_MS / OPEN_POLL_MS;
253
254            for attempt in 0..max_open_attempts {
255                let can_open = DB_OPEN_IN_PROGRESS.with(|opens| {
256                    let mut set = opens.borrow_mut();
257                    if set.contains(&config.name) {
258                        false // Someone else is opening
259                    } else {
260                        set.insert(config.name.clone());
261                        true // We got it
262                    }
263                });
264
265                if can_open {
266                    web_sys::console::log_1(
267                        &format!("[DB] {} - ACQUIRED sqlite open lock", config.name).into(),
268                    );
269                    break;
270                } else {
271                    web_sys::console::log_1(
272                        &format!(
273                            "[DB] {} - Waiting for sqlite open (attempt {})",
274                            config.name, attempt
275                        )
276                        .into(),
277                    );
278                    use wasm_bindgen_futures::JsFuture;
279                    let promise = js_sys::Promise::new(&mut |resolve, _| {
280                        web_sys::window()
281                            .unwrap()
282                            .set_timeout_with_callback_and_timeout_and_arguments_0(
283                                &resolve,
284                                OPEN_POLL_MS as i32,
285                            )
286                            .unwrap();
287                    });
288                    JsFuture::from(promise).await.ok();
289                    continue;
290                }
291            }
292        }
293
294        // Use connection pooling to share connections between instances
295        let (connection_state, db) = {
296            let vfs_name_str = vfs_name.clone(); // Capture the VFS name to use in closure
297            let filename_copy = normalized_name.clone(); // Capture filename for logging
298            let pool_key = normalized_name.trim_end_matches(".db").to_string(); // Pool uses name without .db
299            let state = crate::connection_pool::get_or_create_connection(&pool_key, || {
300                let mut db = std::ptr::null_mut();
301                let db_name = CString::new(normalized_name.clone())
302                    .map_err(|_| "Invalid database name".to_string())?;
303                let vfs_cstr = CString::new(vfs_name_str.as_str())
304                    .map_err(|_| "Invalid VFS name".to_string())?;
305
306                log::info!(
307                    "Opening database: {} with VFS: {}",
308                    filename_copy,
309                    vfs_name_str
310                );
311
312                #[cfg(target_arch = "wasm32")]
313                web_sys::console::log_1(&format!("[OPEN] About to call sqlite3_open_v2...").into());
314
315                let ret = unsafe {
316                    sqlite_wasm_rs::sqlite3_open_v2(
317                        db_name.as_ptr(),
318                        &mut db as *mut _,
319                        sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
320                        vfs_cstr.as_ptr(),
321                    )
322                };
323
324                #[cfg(target_arch = "wasm32")]
325                web_sys::console::log_1(
326                    &format!("[OPEN] sqlite3_open_v2 returned: {}", ret).into(),
327                );
328
329                log::info!(
330                    "sqlite3_open_v2 returned: {} for database: {}",
331                    ret,
332                    filename_copy
333                );
334
335                if ret != sqlite_wasm_rs::SQLITE_OK {
336                    let err_msg = unsafe {
337                        let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
338                        if !msg_ptr.is_null() {
339                            CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
340                        } else {
341                            "Unknown error".to_string()
342                        }
343                    };
344
345                    #[cfg(target_arch = "wasm32")]
346                    web_sys::console::log_1(
347                        &format!(
348                            "[OPEN] ERROR - sqlite3_open_v2 FAILED: ret={}, err={}",
349                            ret, err_msg
350                        )
351                        .into(),
352                    );
353
354                    return Err(format!(
355                        "Failed to open database with IndexedDB VFS: {}",
356                        err_msg
357                    ));
358                }
359
360                log::info!("Database opened successfully with IndexedDB VFS");
361                Ok(db)
362            })
363            .map_err(|e| DatabaseError::new("CONNECTION_POOL_ERROR", &e))?;
364            let db_ptr = state.db.get();
365            (state, db_ptr)
366        };
367
368        // Apply configuration options via PRAGMA statements
369        let exec_sql = |db: *mut sqlite_wasm_rs::sqlite3, sql: &str| -> Result<(), DatabaseError> {
370            let c_sql = CString::new(sql)
371                .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL statement"))?;
372
373            let ret = unsafe {
374                sqlite_wasm_rs::sqlite3_exec(
375                    db,
376                    c_sql.as_ptr(),
377                    None,
378                    std::ptr::null_mut(),
379                    std::ptr::null_mut(),
380                )
381            };
382
383            if ret != sqlite_wasm_rs::SQLITE_OK {
384                let err_msg = unsafe {
385                    let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
386                    if !msg_ptr.is_null() {
387                        CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
388                    } else {
389                        "Unknown error".to_string()
390                    }
391                };
392                log::warn!("Failed to execute SQL '{}': {}", sql, err_msg);
393                return Err(DatabaseError::new(
394                    "SQLITE_ERROR",
395                    &format!("Failed to execute: {}", err_msg),
396                ));
397            }
398            Ok(())
399        };
400
401        // CRITICAL: Set busy_timeout FIRST to handle concurrent access
402        // This makes SQLite wait and retry for up to 10 seconds when the database is locked
403        // instead of immediately returning SQLITE_BUSY errors during parallel operations
404        log::debug!("Setting busy_timeout to 10000ms for concurrent access handling");
405        exec_sql(db, "PRAGMA busy_timeout = 10000")?;
406
407        // Apply page_size (must be set before any tables are created)
408        if let Some(page_size) = config.page_size {
409            log::debug!("Setting page_size to {}", page_size);
410            exec_sql(db, &format!("PRAGMA page_size = {}", page_size))?;
411        }
412
413        // Apply cache_size
414        if let Some(cache_size) = config.cache_size {
415            log::debug!("Setting cache_size to {}", cache_size);
416            exec_sql(db, &format!("PRAGMA cache_size = {}", cache_size))?;
417        }
418
419        // Apply journal_mode
420        // WAL mode is now fully supported via shared memory (xShm*) implementation
421        if let Some(ref journal_mode) = config.journal_mode {
422            log::debug!("Setting journal_mode to {}", journal_mode);
423
424            let pragma_sql = format!("PRAGMA journal_mode = {}", journal_mode);
425            let c_sql = CString::new(pragma_sql.as_str())
426                .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL statement"))?;
427
428            let mut stmt: *mut sqlite_wasm_rs::sqlite3_stmt = std::ptr::null_mut();
429            let ret = unsafe {
430                sqlite_wasm_rs::sqlite3_prepare_v2(
431                    db,
432                    c_sql.as_ptr(),
433                    -1,
434                    &mut stmt as *mut _,
435                    std::ptr::null_mut(),
436                )
437            };
438
439            if ret == sqlite_wasm_rs::SQLITE_OK && !stmt.is_null() {
440                let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
441                if step_ret == sqlite_wasm_rs::SQLITE_ROW {
442                    let result_ptr = unsafe { sqlite_wasm_rs::sqlite3_column_text(stmt, 0) };
443                    if !result_ptr.is_null() {
444                        let result_mode = unsafe {
445                            std::ffi::CStr::from_ptr(result_ptr as *const i8)
446                                .to_string_lossy()
447                                .to_uppercase()
448                        };
449
450                        if result_mode != journal_mode.to_uppercase() {
451                            log::warn!(
452                                "journal_mode {} requested but SQLite set {}",
453                                journal_mode,
454                                result_mode
455                            );
456                        } else {
457                            log::info!("journal_mode successfully set to {}", result_mode);
458                        }
459                    }
460                }
461                unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
462            } else {
463                log::warn!("Failed to prepare journal_mode PRAGMA");
464            }
465        }
466
467        // Apply auto_vacuum (must be set before any tables are created)
468        if let Some(auto_vacuum) = config.auto_vacuum {
469            let vacuum_mode = if auto_vacuum { 1 } else { 0 }; // 0=none, 1=full, 2=incremental
470            log::debug!("Setting auto_vacuum to {}", vacuum_mode);
471            exec_sql(db, &format!("PRAGMA auto_vacuum = {}", vacuum_mode))?;
472        }
473
474        log::info!("Database configuration applied successfully");
475
476        // Initialize metrics for telemetry
477        #[cfg(feature = "telemetry")]
478        let metrics = crate::telemetry::Metrics::new().map_err(|e| {
479            DatabaseError::new(
480                "METRICS_ERROR",
481                &format!("Failed to initialize metrics: {}", e),
482            )
483        })?;
484
485        let database = Database {
486            connection_state,
487            name: normalized_name.clone(), // CRITICAL: Use normalized name WITH .db to match registry
488            on_data_change_callback: None,
489            allow_non_leader_writes: false,
490            optimistic_updates_manager: std::cell::RefCell::new(
491                crate::storage::optimistic_updates::OptimisticUpdatesManager::new(),
492            ),
493            coordination_metrics_manager: std::cell::RefCell::new(
494                crate::storage::coordination_metrics::CoordinationMetricsManager::new(),
495            ),
496            #[cfg(feature = "telemetry")]
497            metrics: Some(metrics),
498            #[cfg(feature = "telemetry")]
499            span_recorder: None,
500            #[cfg(feature = "telemetry")]
501            span_context: Some(crate::telemetry::SpanContext::new()),
502            max_export_size_bytes: config.max_export_size_bytes,
503        };
504
505        // CRITICAL: Release the SQLite open lock ONLY after Database is fully constructed
506        // This ensures WAL initialization and all setup completes before another instance can start
507        #[cfg(target_arch = "wasm32")]
508        {
509            DB_OPEN_IN_PROGRESS.with(|opens| {
510                opens.borrow_mut().remove(&config.name);
511            });
512            web_sys::console::log_1(
513                &format!("[DB] {} - RELEASED sqlite open lock", config.name).into(),
514            );
515        }
516
517        Ok(database)
518    }
519
520    /// Open a database with a specific VFS using connection pooling
521    pub async fn open_with_vfs(filename: &str, vfs_name: &str) -> Result<Self, DatabaseError> {
522        use std::ffi::CString;
523
524        log::info!("Opening database {} with VFS {}", filename, vfs_name);
525
526        // Normalize the database name WITH .db extension
527        let normalized_name = normalize_db_name(filename);
528        let pool_key = normalized_name.trim_end_matches(".db").to_string();
529
530        // Use connection pooling with custom VFS
531        let connection_state = crate::connection_pool::get_or_create_connection(&pool_key, || {
532            let mut db: *mut sqlite_wasm_rs::sqlite3 = std::ptr::null_mut();
533            let db_name = CString::new(normalized_name.clone())
534                .map_err(|_| "Invalid database name".to_string())?;
535            let vfs_cstr = CString::new(vfs_name).map_err(|_| "Invalid VFS name".to_string())?;
536
537            let ret = unsafe {
538                sqlite_wasm_rs::sqlite3_open_v2(
539                    db_name.as_ptr(),
540                    &mut db as *mut _,
541                    sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
542                    vfs_cstr.as_ptr(),
543                )
544            };
545
546            if ret != sqlite_wasm_rs::SQLITE_OK {
547                let err_msg = if !db.is_null() {
548                    unsafe {
549                        let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
550                        if !msg_ptr.is_null() {
551                            std::ffi::CStr::from_ptr(msg_ptr)
552                                .to_string_lossy()
553                                .into_owned()
554                        } else {
555                            "Unknown error".to_string()
556                        }
557                    }
558                } else {
559                    "Failed to open database".to_string()
560                };
561                return Err(format!("SQLITE_ERROR: {}", err_msg));
562            }
563
564            Ok(db)
565        })
566        .map_err(|e| DatabaseError::new("OPEN_ERROR", &e))?;
567
568        log::info!(
569            "Successfully opened database {} with VFS {}",
570            normalized_name,
571            vfs_name
572        );
573
574        // Initialize metrics for telemetry
575        #[cfg(feature = "telemetry")]
576        let metrics = crate::telemetry::Metrics::new().map_err(|e| {
577            DatabaseError::new(
578                "METRICS_ERROR",
579                &format!("Failed to initialize metrics: {}", e),
580            )
581        })?;
582
583        Ok(Database {
584            connection_state,
585            name: normalized_name, // CRITICAL: Store normalized name WITH .db
586            on_data_change_callback: None,
587            allow_non_leader_writes: false,
588            optimistic_updates_manager: std::cell::RefCell::new(
589                crate::storage::optimistic_updates::OptimisticUpdatesManager::new(),
590            ),
591            coordination_metrics_manager: std::cell::RefCell::new(
592                crate::storage::coordination_metrics::CoordinationMetricsManager::new(),
593            ),
594            #[cfg(feature = "telemetry")]
595            metrics: Some(metrics),
596            #[cfg(feature = "telemetry")]
597            span_recorder: None,
598            #[cfg(feature = "telemetry")]
599            span_context: Some(crate::telemetry::SpanContext::new()),
600            max_export_size_bytes: Some(2 * 1024 * 1024 * 1024), // Default 2GB limit
601        })
602    }
603
604    pub async fn execute_internal(&mut self, sql: &str) -> Result<QueryResult, DatabaseError> {
605        use std::ffi::{CStr, CString};
606        let start_time = js_sys::Date::now();
607
608        // Create span for query execution and enter context
609        #[cfg(feature = "telemetry")]
610        let span = if self.span_recorder.is_some() {
611            let query_type = sql
612                .trim()
613                .split_whitespace()
614                .next()
615                .unwrap_or("UNKNOWN")
616                .to_uppercase();
617            let mut builder = crate::telemetry::SpanBuilder::new("execute_query".to_string())
618                .with_attribute("query_type", query_type.clone())
619                .with_attribute("sql", sql.to_string());
620
621            // Attach baggage from context
622            if let Some(ref context) = self.span_context {
623                builder = builder.with_baggage_from_context(context);
624            }
625
626            let span = builder.build();
627
628            // Enter span context
629            if let Some(ref context) = self.span_context {
630                context.enter_span(span.span_id.clone());
631            }
632
633            Some(span)
634        } else {
635            None
636        };
637
638        // Track query execution metrics
639        #[cfg(feature = "telemetry")]
640        #[cfg(feature = "telemetry")]
641        if let Some(metrics) = &self.metrics {
642            metrics.queries_total().inc();
643        }
644
645        // Validate connection pointer before using it
646        if self.db().is_null() {
647            return Err(DatabaseError::new(
648                "NULL_CONNECTION",
649                "Database connection is null",
650            ));
651        }
652
653        let sql_cstr = CString::new(sql)
654            .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL string"))?;
655
656        if sql.trim().to_uppercase().starts_with("SELECT") {
657            let mut stmt = std::ptr::null_mut();
658            let ret = unsafe {
659                sqlite_wasm_rs::sqlite3_prepare_v2(
660                    self.db(),
661                    sql_cstr.as_ptr(),
662                    -1,
663                    &mut stmt,
664                    std::ptr::null_mut(),
665                )
666            };
667
668            if ret != sqlite_wasm_rs::SQLITE_OK {
669                // Get actual error message from SQLite
670                let err_msg = unsafe {
671                    let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
672                    if !msg_ptr.is_null() {
673                        CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
674                    } else {
675                        format!("Unknown error (code: {})", ret)
676                    }
677                };
678
679                // Track error
680                #[cfg(feature = "telemetry")]
681                #[cfg(feature = "telemetry")]
682                if let Some(metrics) = &self.metrics {
683                    metrics.errors_total().inc();
684                }
685
686                // Finish span with error
687                #[cfg(feature = "telemetry")]
688                if let Some(mut s) = span {
689                    s.status = crate::telemetry::SpanStatus::Error(format!(
690                        "Failed to prepare: {}",
691                        err_msg
692                    ));
693                    s.end_time_ms = Some(js_sys::Date::now());
694                    if let Some(recorder) = &self.span_recorder {
695                        recorder.record_span(s);
696                    }
697
698                    // Exit span context
699                    if let Some(ref context) = self.span_context {
700                        context.exit_span();
701                    }
702                }
703
704                return Err(DatabaseError::new(
705                    "SQLITE_ERROR",
706                    &format!("Failed to prepare statement: {}", err_msg),
707                )
708                .with_sql(sql));
709            }
710
711            let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
712            let mut columns = Vec::new();
713            let mut rows = Vec::new();
714
715            // Get column names
716            for i in 0..column_count {
717                let col_name = unsafe {
718                    let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
719                    if name_ptr.is_null() {
720                        format!("col_{}", i)
721                    } else {
722                        std::ffi::CStr::from_ptr(name_ptr)
723                            .to_string_lossy()
724                            .into_owned()
725                    }
726                };
727                columns.push(col_name);
728            }
729
730            // Execute and fetch rows
731            loop {
732                let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
733                if step_ret == sqlite_wasm_rs::SQLITE_ROW {
734                    let mut values = Vec::new();
735                    for i in 0..column_count {
736                        let value = unsafe {
737                            let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
738                            match col_type {
739                                sqlite_wasm_rs::SQLITE_NULL => ColumnValue::Null,
740                                sqlite_wasm_rs::SQLITE_INTEGER => {
741                                    let val = sqlite_wasm_rs::sqlite3_column_int64(stmt, i);
742                                    ColumnValue::Integer(val)
743                                }
744                                sqlite_wasm_rs::SQLITE_FLOAT => {
745                                    let val = sqlite_wasm_rs::sqlite3_column_double(stmt, i);
746                                    ColumnValue::Real(val)
747                                }
748                                sqlite_wasm_rs::SQLITE_TEXT => {
749                                    let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
750                                    if text_ptr.is_null() {
751                                        ColumnValue::Null
752                                    } else {
753                                        let text = std::ffi::CStr::from_ptr(text_ptr as *const i8)
754                                            .to_string_lossy()
755                                            .into_owned();
756                                        ColumnValue::Text(text)
757                                    }
758                                }
759                                sqlite_wasm_rs::SQLITE_BLOB => {
760                                    let blob_ptr = sqlite_wasm_rs::sqlite3_column_blob(stmt, i);
761                                    let blob_size = sqlite_wasm_rs::sqlite3_column_bytes(stmt, i);
762                                    if blob_ptr.is_null() || blob_size == 0 {
763                                        ColumnValue::Blob(vec![])
764                                    } else {
765                                        let blob_slice = std::slice::from_raw_parts(
766                                            blob_ptr as *const u8,
767                                            blob_size as usize,
768                                        );
769                                        ColumnValue::Blob(blob_slice.to_vec())
770                                    }
771                                }
772                                _ => ColumnValue::Null,
773                            }
774                        };
775                        values.push(value);
776                    }
777                    rows.push(Row { values });
778                } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
779                    break;
780                } else {
781                    // Get SQLite error message before finalizing
782                    let err_msg = unsafe {
783                        let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
784                        if !err_ptr.is_null() {
785                            std::ffi::CStr::from_ptr(err_ptr)
786                                .to_string_lossy()
787                                .to_string()
788                        } else {
789                            "Unknown SQLite error".to_string()
790                        }
791                    };
792                    unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
793                    // Track error
794                    #[cfg(feature = "telemetry")]
795                    if let Some(metrics) = &self.metrics {
796                        metrics.errors_total().inc();
797                    }
798                    return Err(DatabaseError::new(
799                        "SQLITE_ERROR",
800                        &format!("Error executing SELECT statement: {}", err_msg),
801                    )
802                    .with_sql(sql));
803                }
804            }
805
806            unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
807            let execution_time_ms = js_sys::Date::now() - start_time;
808
809            // Track query duration
810            #[cfg(feature = "telemetry")]
811            if let Some(metrics) = &self.metrics {
812                metrics.query_duration().observe(execution_time_ms);
813            }
814
815            Ok(QueryResult {
816                columns,
817                rows,
818                affected_rows: 0,
819                last_insert_id: None,
820                execution_time_ms,
821            })
822        } else {
823            // Non-SELECT statements - Use prepare/step to properly handle PRAGMA results
824            let mut stmt: *mut sqlite_wasm_rs::sqlite3_stmt = std::ptr::null_mut();
825            let ret = unsafe {
826                sqlite_wasm_rs::sqlite3_prepare_v2(
827                    self.db(),
828                    sql_cstr.as_ptr(),
829                    -1,
830                    &mut stmt,
831                    std::ptr::null_mut(),
832                )
833            };
834
835            if ret != sqlite_wasm_rs::SQLITE_OK {
836                // Get actual error message from SQLite
837                let err_msg = unsafe {
838                    let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
839                    if !msg_ptr.is_null() {
840                        CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
841                    } else {
842                        format!("Unknown error (code: {})", ret)
843                    }
844                };
845
846                // Track error
847                #[cfg(feature = "telemetry")]
848                if let Some(metrics) = &self.metrics {
849                    metrics.errors_total().inc();
850                }
851                return Err(DatabaseError::new(
852                    "SQLITE_ERROR",
853                    &format!("Failed to prepare statement: {}", err_msg),
854                )
855                .with_sql(sql));
856            }
857
858            // Get column info for PRAGMA statements that return results
859            let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
860            let mut columns = Vec::new();
861            let mut rows = Vec::new();
862
863            if column_count > 0 {
864                // This is a PRAGMA or other statement that returns rows
865                for i in 0..column_count {
866                    let col_name = unsafe {
867                        let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
868                        if name_ptr.is_null() {
869                            format!("column_{}", i)
870                        } else {
871                            std::ffi::CStr::from_ptr(name_ptr)
872                                .to_string_lossy()
873                                .into_owned()
874                        }
875                    };
876                    columns.push(col_name);
877                }
878
879                // Fetch all rows
880                loop {
881                    let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
882                    if step_ret == sqlite_wasm_rs::SQLITE_ROW {
883                        let mut values = Vec::new();
884                        for i in 0..column_count {
885                            let value = unsafe {
886                                let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
887                                match col_type {
888                                    sqlite_wasm_rs::SQLITE_TEXT => {
889                                        let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
890                                        if text_ptr.is_null() {
891                                            ColumnValue::Null
892                                        } else {
893                                            let text =
894                                                std::ffi::CStr::from_ptr(text_ptr as *const i8)
895                                                    .to_string_lossy()
896                                                    .into_owned();
897                                            ColumnValue::Text(text)
898                                        }
899                                    }
900                                    sqlite_wasm_rs::SQLITE_INTEGER => ColumnValue::Integer(
901                                        sqlite_wasm_rs::sqlite3_column_int64(stmt, i),
902                                    ),
903                                    _ => ColumnValue::Null,
904                                }
905                            };
906                            values.push(value);
907                        }
908                        rows.push(Row { values });
909                    } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
910                        break;
911                    } else {
912                        // Get SQLite error message before finalizing
913                        let err_msg = unsafe {
914                            let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
915                            if !err_ptr.is_null() {
916                                std::ffi::CStr::from_ptr(err_ptr)
917                                    .to_string_lossy()
918                                    .to_string()
919                            } else {
920                                "Unknown SQLite error".to_string()
921                            }
922                        };
923                        unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
924                        // Track error
925                        #[cfg(feature = "telemetry")]
926                        if let Some(metrics) = &self.metrics {
927                            metrics.errors_total().inc();
928                        }
929                        return Err(DatabaseError::new(
930                            "SQLITE_ERROR",
931                            &format!("Failed to execute statement: {}", err_msg),
932                        )
933                        .with_sql(sql));
934                    }
935                }
936            } else {
937                // Regular non-SELECT statement
938                let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
939                if step_ret != sqlite_wasm_rs::SQLITE_DONE {
940                    // Get SQLite error message before finalizing
941                    let err_msg = unsafe {
942                        let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
943                        if !err_ptr.is_null() {
944                            std::ffi::CStr::from_ptr(err_ptr)
945                                .to_string_lossy()
946                                .to_string()
947                        } else {
948                            "Unknown SQLite error".to_string()
949                        }
950                    };
951                    unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
952                    // Track error
953                    #[cfg(feature = "telemetry")]
954                    if let Some(metrics) = &self.metrics {
955                        metrics.errors_total().inc();
956                    }
957                    return Err(DatabaseError::new(
958                        "SQLITE_ERROR",
959                        &format!("Failed to execute statement: {}", err_msg),
960                    )
961                    .with_sql(sql));
962                }
963            }
964
965            // Finalize to complete the statement
966            unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
967
968            let affected_rows = unsafe { sqlite_wasm_rs::sqlite3_changes(self.db()) } as u32;
969            let last_insert_id = if sql.trim().to_uppercase().starts_with("INSERT") {
970                Some(unsafe { sqlite_wasm_rs::sqlite3_last_insert_rowid(self.db()) })
971            } else {
972                None
973            };
974
975            let execution_time_ms = js_sys::Date::now() - start_time;
976
977            // Track query duration
978            #[cfg(feature = "telemetry")]
979            if let Some(metrics) = &self.metrics {
980                metrics.query_duration().observe(execution_time_ms);
981            }
982
983            // Finish span successfully
984            #[cfg(feature = "telemetry")]
985            if let Some(mut s) = span {
986                s.status = crate::telemetry::SpanStatus::Ok;
987                s.end_time_ms = Some(js_sys::Date::now());
988                s.attributes
989                    .insert("duration_ms".to_string(), execution_time_ms.to_string());
990                s.attributes
991                    .insert("affected_rows".to_string(), affected_rows.to_string());
992                s.attributes
993                    .insert("row_count".to_string(), rows.len().to_string());
994                if let Some(recorder) = &self.span_recorder {
995                    recorder.record_span(s);
996                }
997
998                // Exit span context
999                if let Some(ref context) = self.span_context {
1000                    context.exit_span();
1001                }
1002            }
1003
1004            Ok(QueryResult {
1005                columns,
1006                rows,
1007                affected_rows,
1008                last_insert_id,
1009                execution_time_ms,
1010            })
1011        }
1012    }
1013
1014    pub async fn execute_with_params_internal(
1015        &mut self,
1016        sql: &str,
1017        params: &[ColumnValue],
1018    ) -> Result<QueryResult, DatabaseError> {
1019        use std::ffi::{CStr, CString};
1020        let start_time = js_sys::Date::now();
1021
1022        // Create span for query execution
1023        #[cfg(feature = "telemetry")]
1024        let span = if self.span_recorder.is_some() {
1025            let query_type = sql
1026                .trim()
1027                .split_whitespace()
1028                .next()
1029                .unwrap_or("UNKNOWN")
1030                .to_uppercase();
1031            let span = crate::telemetry::SpanBuilder::new("execute_query".to_string())
1032                .with_attribute("query_type", query_type.clone())
1033                .with_attribute("sql", sql.to_string())
1034                .build();
1035            Some(span)
1036        } else {
1037            None
1038        };
1039
1040        // Ensure metrics are propagated to BlockStorage before execution
1041        #[cfg(feature = "telemetry")]
1042        self.ensure_metrics_propagated();
1043
1044        // Track query execution metrics
1045        #[cfg(feature = "telemetry")]
1046        if let Some(metrics) = &self.metrics {
1047            metrics.queries_total().inc();
1048        }
1049
1050        let sql_cstr = CString::new(sql)
1051            .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL string"))?;
1052
1053        let mut stmt = std::ptr::null_mut();
1054        let ret = unsafe {
1055            sqlite_wasm_rs::sqlite3_prepare_v2(
1056                self.db(),
1057                sql_cstr.as_ptr(),
1058                -1,
1059                &mut stmt,
1060                std::ptr::null_mut(),
1061            )
1062        };
1063
1064        if ret != sqlite_wasm_rs::SQLITE_OK {
1065            // Get actual error message from SQLite
1066            let err_msg = unsafe {
1067                let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
1068                if !msg_ptr.is_null() {
1069                    CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
1070                } else {
1071                    format!("Unknown error (code: {})", ret)
1072                }
1073            };
1074
1075            // Track error
1076            #[cfg(feature = "telemetry")]
1077            if let Some(metrics) = &self.metrics {
1078                metrics.errors_total().inc();
1079            }
1080
1081            // Finish span with error
1082            #[cfg(feature = "telemetry")]
1083            if let Some(mut s) = span {
1084                s.status =
1085                    crate::telemetry::SpanStatus::Error(format!("Failed to prepare: {}", err_msg));
1086                s.end_time_ms = Some(js_sys::Date::now());
1087                if let Some(recorder) = &self.span_recorder {
1088                    recorder.record_span(s);
1089                }
1090            }
1091
1092            return Err(DatabaseError::new(
1093                "SQLITE_ERROR",
1094                &format!("Failed to prepare statement: {}", err_msg),
1095            )
1096            .with_sql(sql));
1097        }
1098
1099        // Bind parameters
1100        let mut text_cstrings = Vec::new(); // Keep CStrings alive
1101        for (i, param) in params.iter().enumerate() {
1102            let param_index = (i + 1) as i32;
1103            let bind_ret = unsafe {
1104                match param {
1105                    ColumnValue::Null => sqlite_wasm_rs::sqlite3_bind_null(stmt, param_index),
1106                    ColumnValue::Integer(val) => {
1107                        sqlite_wasm_rs::sqlite3_bind_int64(stmt, param_index, *val)
1108                    }
1109                    ColumnValue::Real(val) => {
1110                        sqlite_wasm_rs::sqlite3_bind_double(stmt, param_index, *val)
1111                    }
1112                    ColumnValue::Text(val) => {
1113                        // Sanitize string by removing null bytes (SQLite text shouldn't contain them)
1114                        let sanitized = val.replace('\0', "");
1115                        // Safe: after removing null bytes, CString::new cannot fail
1116                        let text_cstr = CString::new(sanitized.as_str())
1117                            .expect("CString::new should not fail after null byte removal");
1118                        let result = sqlite_wasm_rs::sqlite3_bind_text(
1119                            stmt,
1120                            param_index,
1121                            text_cstr.as_ptr(),
1122                            sanitized.len() as i32,
1123                            sqlite_wasm_rs::SQLITE_TRANSIENT(),
1124                        );
1125                        text_cstrings.push(text_cstr); // Keep alive
1126                        result
1127                    }
1128                    ColumnValue::Blob(val) => sqlite_wasm_rs::sqlite3_bind_blob(
1129                        stmt,
1130                        param_index,
1131                        val.as_ptr() as *const _,
1132                        val.len() as i32,
1133                        sqlite_wasm_rs::SQLITE_TRANSIENT(),
1134                    ),
1135                    _ => sqlite_wasm_rs::sqlite3_bind_null(stmt, param_index),
1136                }
1137            };
1138
1139            if bind_ret != sqlite_wasm_rs::SQLITE_OK {
1140                unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1141                // Track error
1142                #[cfg(feature = "telemetry")]
1143                if let Some(metrics) = &self.metrics {
1144                    metrics.errors_total().inc();
1145                }
1146                return Err(
1147                    DatabaseError::new("SQLITE_ERROR", "Failed to bind parameter").with_sql(sql),
1148                );
1149            }
1150        }
1151
1152        if sql.trim().to_uppercase().starts_with("SELECT") {
1153            let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
1154            let mut columns = Vec::new();
1155            let mut rows = Vec::new();
1156
1157            // Get column names
1158            for i in 0..column_count {
1159                let col_name = unsafe {
1160                    let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
1161                    if name_ptr.is_null() {
1162                        format!("col_{}", i)
1163                    } else {
1164                        std::ffi::CStr::from_ptr(name_ptr)
1165                            .to_string_lossy()
1166                            .into_owned()
1167                    }
1168                };
1169                columns.push(col_name);
1170            }
1171
1172            // Execute and fetch rows
1173            loop {
1174                let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
1175                if step_ret == sqlite_wasm_rs::SQLITE_ROW {
1176                    let mut values = Vec::new();
1177                    for i in 0..column_count {
1178                        let value = unsafe {
1179                            let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
1180                            match col_type {
1181                                sqlite_wasm_rs::SQLITE_NULL => ColumnValue::Null,
1182                                sqlite_wasm_rs::SQLITE_INTEGER => {
1183                                    let val = sqlite_wasm_rs::sqlite3_column_int64(stmt, i);
1184                                    ColumnValue::Integer(val)
1185                                }
1186                                sqlite_wasm_rs::SQLITE_FLOAT => {
1187                                    let val = sqlite_wasm_rs::sqlite3_column_double(stmt, i);
1188                                    ColumnValue::Real(val)
1189                                }
1190                                sqlite_wasm_rs::SQLITE_TEXT => {
1191                                    let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
1192                                    if text_ptr.is_null() {
1193                                        ColumnValue::Null
1194                                    } else {
1195                                        let text = std::ffi::CStr::from_ptr(text_ptr as *const i8)
1196                                            .to_string_lossy()
1197                                            .into_owned();
1198                                        ColumnValue::Text(text)
1199                                    }
1200                                }
1201                                sqlite_wasm_rs::SQLITE_BLOB => {
1202                                    let blob_ptr = sqlite_wasm_rs::sqlite3_column_blob(stmt, i);
1203                                    let blob_size = sqlite_wasm_rs::sqlite3_column_bytes(stmt, i);
1204                                    if blob_ptr.is_null() || blob_size == 0 {
1205                                        ColumnValue::Blob(vec![])
1206                                    } else {
1207                                        let blob_slice = std::slice::from_raw_parts(
1208                                            blob_ptr as *const u8,
1209                                            blob_size as usize,
1210                                        );
1211                                        ColumnValue::Blob(blob_slice.to_vec())
1212                                    }
1213                                }
1214                                _ => ColumnValue::Null,
1215                            }
1216                        };
1217                        values.push(value);
1218                    }
1219                    rows.push(Row { values });
1220                } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
1221                    break;
1222                } else {
1223                    // Get SQLite error message before finalizing
1224                    let err_msg = unsafe {
1225                        let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
1226                        if !err_ptr.is_null() {
1227                            std::ffi::CStr::from_ptr(err_ptr)
1228                                .to_string_lossy()
1229                                .to_string()
1230                        } else {
1231                            "Unknown SQLite error".to_string()
1232                        }
1233                    };
1234                    unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1235                    // Track error
1236                    #[cfg(feature = "telemetry")]
1237                    if let Some(metrics) = &self.metrics {
1238                        metrics.errors_total().inc();
1239                    }
1240                    return Err(DatabaseError::new(
1241                        "SQLITE_ERROR",
1242                        &format!("Error executing SELECT statement: {}", err_msg),
1243                    )
1244                    .with_sql(sql));
1245                }
1246            }
1247
1248            unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1249
1250            let execution_time_ms = js_sys::Date::now() - start_time;
1251
1252            // Track query duration
1253            #[cfg(feature = "telemetry")]
1254            if let Some(metrics) = &self.metrics {
1255                metrics.query_duration().observe(execution_time_ms);
1256            }
1257
1258            // Finish span successfully for SELECT query
1259            #[cfg(feature = "telemetry")]
1260            if let Some(mut s) = span {
1261                s.status = crate::telemetry::SpanStatus::Ok;
1262                s.end_time_ms = Some(js_sys::Date::now());
1263                s.attributes
1264                    .insert("duration_ms".to_string(), execution_time_ms.to_string());
1265                s.attributes
1266                    .insert("row_count".to_string(), rows.len().to_string());
1267                if let Some(recorder) = &self.span_recorder {
1268                    recorder.record_span(s);
1269                }
1270            }
1271
1272            Ok(QueryResult {
1273                columns,
1274                rows,
1275                affected_rows: 0,
1276                last_insert_id: None,
1277                execution_time_ms,
1278            })
1279        } else {
1280            // Non-SELECT statements
1281            let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
1282            // Track error
1283            #[cfg(feature = "telemetry")]
1284            if let Some(metrics) = &self.metrics {
1285                metrics.errors_total().inc();
1286            }
1287            unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1288
1289            if step_ret != sqlite_wasm_rs::SQLITE_DONE {
1290                let err_msg = unsafe {
1291                    let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
1292                    if !err_ptr.is_null() {
1293                        std::ffi::CStr::from_ptr(err_ptr)
1294                            .to_string_lossy()
1295                            .to_string()
1296                    } else {
1297                        "Unknown SQLite error".to_string()
1298                    }
1299                };
1300                return Err(DatabaseError::new(
1301                    "SQLITE_ERROR",
1302                    &format!("Failed to execute statement: {}", err_msg),
1303                )
1304                .with_sql(sql));
1305            }
1306
1307            let execution_time_ms = js_sys::Date::now() - start_time;
1308
1309            // Track query duration
1310            #[cfg(feature = "telemetry")]
1311            if let Some(metrics) = &self.metrics {
1312                metrics.query_duration().observe(execution_time_ms);
1313            }
1314            let affected_rows = unsafe { sqlite_wasm_rs::sqlite3_changes(self.db()) } as u32;
1315            let last_insert_id = if sql.trim().to_uppercase().starts_with("INSERT") {
1316                Some(unsafe { sqlite_wasm_rs::sqlite3_last_insert_rowid(self.db()) })
1317            } else {
1318                None
1319            };
1320
1321            // Finish span successfully
1322            #[cfg(feature = "telemetry")]
1323            if let Some(mut s) = span {
1324                s.status = crate::telemetry::SpanStatus::Ok;
1325                s.end_time_ms = Some(js_sys::Date::now());
1326                s.attributes
1327                    .insert("duration_ms".to_string(), execution_time_ms.to_string());
1328                s.attributes
1329                    .insert("affected_rows".to_string(), affected_rows.to_string());
1330                if let Some(recorder) = &self.span_recorder {
1331                    recorder.record_span(s);
1332                }
1333            }
1334
1335            Ok(QueryResult {
1336                columns: vec![],
1337                rows: vec![],
1338                affected_rows,
1339                last_insert_id,
1340                execution_time_ms,
1341            })
1342        }
1343    }
1344
1345    /// Set telemetry metrics for this database instance
1346    #[cfg(feature = "telemetry")]
1347    pub fn set_metrics(&mut self, metrics: Option<crate::telemetry::Metrics>) {
1348        self.metrics = metrics.clone();
1349        self.ensure_metrics_propagated();
1350    }
1351
1352    /// Set span recorder for distributed tracing
1353    #[cfg(feature = "telemetry")]
1354    pub fn set_span_recorder(&mut self, recorder: Option<crate::telemetry::SpanRecorder>) {
1355        self.span_recorder = recorder;
1356    }
1357
1358    /// Get span context for distributed tracing
1359    #[cfg(feature = "telemetry")]
1360    pub fn get_span_context(&self) -> Option<&crate::telemetry::SpanContext> {
1361        self.span_context.as_ref()
1362    }
1363
1364    /// Get span recorder for distributed tracing
1365    #[cfg(feature = "telemetry")]
1366    pub fn get_span_recorder(&self) -> Option<&crate::telemetry::SpanRecorder> {
1367        self.span_recorder.as_ref()
1368    }
1369    /// Ensure metrics are propagated to BlockStorage
1370    #[cfg(feature = "telemetry")]
1371    fn ensure_metrics_propagated(&self) {
1372        // Propagate metrics to BlockStorage via STORAGE_REGISTRY
1373        #[cfg(target_arch = "wasm32")]
1374        {
1375            use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
1376
1377            if self.metrics.is_none() {
1378                return;
1379            }
1380
1381            let db_name = &self.name;
1382
1383            if let Some(storage_rc) = get_storage_with_fallback(db_name) {
1384                use crate::vfs::indexeddb_vfs::with_storage_borrow_mut;
1385                let _ = with_storage_borrow_mut(&storage_rc, "set_metrics", |storage| {
1386                    storage.set_metrics(self.metrics.clone());
1387                    Ok(())
1388                });
1389            }
1390        }
1391    }
1392
1393    pub async fn close_internal(&mut self) -> Result<(), DatabaseError> {
1394        log::info!("CLOSE_INTERNAL STARTED for: {}", self.name);
1395
1396        // Check if connection is already null (e.g., after import force-close)
1397        if self.connection_state.db.get().is_null() {
1398            log::info!(
1399                "Connection already null for {}, skipping close operations",
1400                self.name
1401            );
1402            return Ok(());
1403        }
1404
1405        // Checkpoint WAL data before close using PASSIVE mode (non-blocking)
1406        log::info!("Checkpointing WAL before close: {}", self.name);
1407        let _ = self
1408            .execute_internal("PRAGMA wal_checkpoint(PASSIVE)")
1409            .await;
1410        log::info!("WAL checkpoint completed for: {}", self.name);
1411
1412        // Sync to IndexedDB before closing to ensure data persists
1413        log::info!("Syncing database before close: {}", self.name);
1414        self.sync_internal().await?;
1415        log::info!("Sync completed for: {}", self.name);
1416
1417        web_sys::console::log_1(
1418            &format!("CLOSE: About to stop leader election for {}", self.name).into(),
1419        );
1420
1421        // Stop leader election before closing
1422        #[cfg(target_arch = "wasm32")]
1423        {
1424            use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
1425
1426            let db_name = &self.name;
1427            web_sys::console::log_1(
1428                &format!("STOP_ELECTION: Getting storage for {}", db_name).into(),
1429            );
1430            log::info!("STOP_ELECTION: Getting storage for {}", db_name);
1431            let storage_rc = get_storage_with_fallback(db_name);
1432
1433            if let Some(storage_rc) = storage_rc {
1434                log::info!("STOP_ELECTION: Found storage for {}, calling stop", db_name);
1435                match with_storage_async!(storage_rc, "stop_leader_election", |storage| storage
1436                    .stop_leader_election())
1437                {
1438                    Some(Ok(())) => {
1439                        log::info!("STOP_ELECTION: Successfully stopped for {}", db_name);
1440                    }
1441                    Some(Err(e)) => {
1442                        log::warn!("STOP_ELECTION: Failed for {}: {:?}", db_name, e);
1443                    }
1444                    None => {
1445                        log::warn!("STOP_ELECTION: Borrow failed for {}", db_name);
1446                    }
1447                }
1448            } else {
1449                log::warn!("STOP_ELECTION: No storage found for {}", db_name);
1450            }
1451            log::info!("STOP_ELECTION: Completed section for {}", db_name);
1452        }
1453
1454        // NOTE: Do NOT call release_connection here
1455        // The Drop impl will handle releasing the connection when the Database instance is dropped
1456        // Calling it here would cause a double-release when both close() and Drop are called
1457
1458        log::info!(
1459            "Closed database: {} (connection will be released on Drop)",
1460            self.name
1461        );
1462        Ok(())
1463    }
1464
1465    /// Query database and return rows (alias for execute that returns rows)
1466    pub async fn query(&mut self, sql: &str) -> Result<Vec<Row>, DatabaseError> {
1467        let result = self.execute_internal(sql).await?;
1468        Ok(result.rows)
1469    }
1470
1471    pub async fn sync_internal(&mut self) -> Result<(), DatabaseError> {
1472        // Start timing for telemetry
1473        #[cfg(all(target_arch = "wasm32", feature = "telemetry"))]
1474        let start_time = js_sys::Date::now();
1475
1476        // Create span for VFS sync operation with automatic context linking
1477        #[cfg(feature = "telemetry")]
1478        let span = if self.span_recorder.is_some() {
1479            let mut builder = crate::telemetry::SpanBuilder::new("vfs_sync".to_string())
1480                .with_attribute("operation", "sync");
1481
1482            // Automatically link to parent span via context and copy baggage
1483            if let Some(ref context) = self.span_context {
1484                builder = builder
1485                    .with_context(context)
1486                    .with_baggage_from_context(context);
1487            }
1488
1489            let span = builder.build();
1490
1491            // Enter this span's context
1492            if let Some(ref context) = self.span_context {
1493                context.enter_span(span.span_id.clone());
1494            }
1495
1496            Some(span)
1497        } else {
1498            None
1499        };
1500
1501        // Track sync operation start
1502        #[cfg(feature = "telemetry")]
1503        if let Some(ref metrics) = self.metrics {
1504            metrics.sync_operations_total().inc();
1505        }
1506
1507        // Track blocks persisted for span attributes
1508        #[cfg(feature = "telemetry")]
1509        let mut blocks_count = 0;
1510
1511        // Trigger VFS sync to persist all blocks to IndexedDB
1512        #[cfg(target_arch = "wasm32")]
1513        {
1514            use crate::storage::vfs_sync;
1515
1516            // Collect blocks from GLOBAL_STORAGE (where VFS writes them)
1517            // CRITICAL: Use self.name WITH .db - matches main branch behavior
1518            // Database.name already normalized by normalize_db_name() in Database::new()
1519            let storage_name = &self.name;
1520
1521            // Advance commit marker
1522            let next_commit = vfs_sync::with_global_commit_marker(|cm| {
1523                #[cfg(target_arch = "wasm32")]
1524                let mut cm_ref = cm.borrow_mut();
1525                #[cfg(not(target_arch = "wasm32"))]
1526                let mut cm_ref = cm.lock();
1527
1528                let current = cm_ref.get(storage_name).copied().unwrap_or(0);
1529                let new_marker = current + 1;
1530                cm_ref.insert(storage_name.to_string(), new_marker);
1531                log::debug!(
1532                    "Advanced commit marker for {} from {} to {}",
1533                    storage_name,
1534                    current,
1535                    new_marker
1536                );
1537                new_marker
1538            });
1539
1540            web_sys::console::log_1(
1541                &format!(
1542                    "[SYNC] Collecting blocks from GLOBAL_STORAGE for: {}",
1543                    storage_name
1544                )
1545                .into(),
1546            );
1547            let (blocks_to_persist, metadata_to_persist) =
1548                vfs_sync::with_global_storage(|storage| {
1549                    #[cfg(target_arch = "wasm32")]
1550                    let storage_map = storage.borrow();
1551                    #[cfg(not(target_arch = "wasm32"))]
1552                    let storage_map = storage.lock();
1553
1554                    let blocks = if let Some(db_storage) = storage_map.get(storage_name) {
1555                        let count = db_storage.len();
1556                        web_sys::console::log_1(
1557                            &format!("[SYNC] Found {} blocks in GLOBAL_STORAGE", count).into(),
1558                        );
1559                        db_storage
1560                            .iter()
1561                            .map(|(&id, data)| (id, data.clone()))
1562                            .collect::<Vec<_>>()
1563                    } else {
1564                        web_sys::console::log_1(
1565                            &format!(
1566                                "[SYNC] No blocks found in GLOBAL_STORAGE for {}",
1567                                storage_name
1568                            )
1569                            .into(),
1570                        );
1571                        Vec::new()
1572                    };
1573
1574                    let metadata = vfs_sync::with_global_metadata(|meta| {
1575                        #[cfg(target_arch = "wasm32")]
1576                        let meta_map = meta.borrow();
1577                        #[cfg(not(target_arch = "wasm32"))]
1578                        let meta_map = meta.lock();
1579                        if let Some(db_meta) = meta_map.get(storage_name) {
1580                            let count = db_meta.len();
1581                            web_sys::console::log_1(
1582                                &format!("[SYNC] Found {} metadata entries", count).into(),
1583                            );
1584                            db_meta
1585                                .iter()
1586                                .map(|(&id, metadata)| (id, metadata.checksum))
1587                                .collect::<Vec<_>>()
1588                        } else {
1589                            web_sys::console::log_1(&format!("[SYNC] No metadata found").into());
1590                            Vec::new()
1591                        }
1592                    });
1593
1594                    (blocks, metadata)
1595                });
1596
1597            web_sys::console::log_1(
1598                &format!(
1599                    "[SYNC] Preparing to persist {} blocks to IndexedDB",
1600                    blocks_to_persist.len()
1601                )
1602                .into(),
1603            );
1604
1605            if !blocks_to_persist.is_empty() {
1606                #[cfg(feature = "telemetry")]
1607                {
1608                    blocks_count = blocks_to_persist.len();
1609                }
1610                web_sys::console::log_1(
1611                    &format!(
1612                        "[SYNC] Persisting {} blocks to IndexedDB",
1613                        blocks_to_persist.len()
1614                    )
1615                    .into(),
1616                );
1617                crate::storage::wasm_indexeddb::persist_to_indexeddb_event_based(
1618                    storage_name,
1619                    blocks_to_persist,
1620                    metadata_to_persist,
1621                    next_commit,
1622                    #[cfg(feature = "telemetry")]
1623                    self.span_recorder.clone(),
1624                    #[cfg(feature = "telemetry")]
1625                    span.as_ref().map(|s| s.span_id.clone()),
1626                )
1627                .await?;
1628                web_sys::console::log_1(
1629                    &format!("[SYNC] Successfully persisted to IndexedDB").into(),
1630                );
1631            } else {
1632                web_sys::console::log_1(
1633                    &format!("[SYNC] WARNING: No blocks to persist - GLOBAL_STORAGE is empty!")
1634                        .into(),
1635                );
1636            }
1637
1638            // Send notification after successful sync
1639            use crate::storage::broadcast_notifications::{
1640                BroadcastNotification, send_change_notification,
1641            };
1642
1643            let notification = BroadcastNotification::DataChanged {
1644                db_name: self.name.clone(),
1645                timestamp: js_sys::Date::now() as u64,
1646            };
1647
1648            log::debug!("Sending DataChanged notification for {}", self.name);
1649
1650            if let Err(e) = send_change_notification(&notification) {
1651                log::warn!("Failed to send change notification: {}", e);
1652                // Don't fail the sync if notification fails
1653            }
1654        }
1655
1656        // Record sync duration
1657        #[cfg(all(target_arch = "wasm32", feature = "telemetry"))]
1658        if let Some(ref metrics) = self.metrics {
1659            let duration_ms = js_sys::Date::now() - start_time;
1660            metrics.sync_duration().observe(duration_ms);
1661        }
1662
1663        // Finish span successfully
1664        #[cfg(feature = "telemetry")]
1665        if let Some(mut s) = span {
1666            s.status = crate::telemetry::SpanStatus::Ok;
1667            #[cfg(target_arch = "wasm32")]
1668            {
1669                s.end_time_ms = Some(js_sys::Date::now());
1670                let duration_ms = s.end_time_ms.unwrap() - s.start_time_ms;
1671                s.attributes
1672                    .insert("duration_ms".to_string(), duration_ms.to_string());
1673            }
1674            #[cfg(not(target_arch = "wasm32"))]
1675            {
1676                let now = std::time::SystemTime::now()
1677                    .duration_since(std::time::UNIX_EPOCH)
1678                    .unwrap()
1679                    .as_millis() as f64;
1680                s.end_time_ms = Some(now);
1681                let duration_ms = s.end_time_ms.unwrap() - s.start_time_ms;
1682                s.attributes
1683                    .insert("duration_ms".to_string(), duration_ms.to_string());
1684            }
1685            s.attributes
1686                .insert("blocks_persisted".to_string(), blocks_count.to_string());
1687            if let Some(recorder) = &self.span_recorder {
1688                recorder.record_span(s);
1689            }
1690
1691            // Exit span context
1692            if let Some(ref context) = self.span_context {
1693                context.exit_span();
1694            }
1695        }
1696
1697        Ok(())
1698    }
1699}
1700
1701#[cfg(target_arch = "wasm32")]
1702impl Drop for Database {
1703    fn drop(&mut self) {
1704        web_sys::console::log_1(&format!("DROP: Releasing connection for {}", self.name).into());
1705
1706        // Release the connection back to the pool
1707        // The pool will close it if this was the last reference
1708        // Pool uses name without .db, so strip it
1709        let pool_key = self.name.trim_end_matches(".db");
1710        crate::connection_pool::release_connection(pool_key);
1711
1712        web_sys::console::log_1(&format!("DROP: Connection released for {}", self.name).into());
1713
1714        // CRITICAL: Stop heartbeat interval synchronously to prevent leaks
1715        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
1716        if let Some(storage_rc) = get_storage_with_fallback(&self.name) {
1717            // No outer borrow needed - BlockStorage uses RefCell for interior mutability
1718            let storage = &*storage_rc;
1719            // Try to borrow manager - if it fails, skip (already being cleaned)
1720            if let Ok(mut manager_ref) = storage.leader_election.try_borrow_mut() {
1721                if let Some(ref mut manager) = *manager_ref {
1722                    // Clear interval if it exists (idempotent)
1723                    if let Some(interval_id) = manager.heartbeat_interval.take() {
1724                        if let Some(window) = web_sys::window() {
1725                            window.clear_interval_with_handle(interval_id);
1726                            web_sys::console::log_1(
1727                                &format!(
1728                                    "DROP: Cleared heartbeat interval {} for {}",
1729                                    interval_id, self.name
1730                                )
1731                                .into(),
1732                            );
1733                        }
1734                    }
1735                    // Drop closure to release Rc references
1736                    if manager.heartbeat_closure.take().is_some() {
1737                        web_sys::console::log_1(
1738                            &format!("DROP: Released heartbeat closure for {}", self.name).into(),
1739                        );
1740                    }
1741                }
1742            } else {
1743                // Manager already borrowed - skip (first DB is cleaning up)
1744                web_sys::console::log_1(
1745                    &format!(
1746                        "[DROP] Skipping {} (heartbeat already stopped by shared DB)",
1747                        self.name
1748                    )
1749                    .into(),
1750                );
1751            }
1752        }
1753
1754        // Keep BlockStorage in STORAGE_REGISTRY so multiple Database instances
1755        // with the same name share the same BlockStorage and leader election state
1756        // Blocks persist in GLOBAL_STORAGE across Database instances
1757        log::debug!(
1758            "Closed database: {} (BlockStorage remains in registry)",
1759            self.name
1760        );
1761    }
1762}
1763
1764// Add wasm_bindgen exports for the main Database struct
1765#[cfg(target_arch = "wasm32")]
1766#[wasm_bindgen]
1767impl Database {
1768    #[wasm_bindgen(js_name = "newDatabase")]
1769    pub async fn new_wasm(name: String) -> Result<Database, JsValue> {
1770        // Normalize database name: ensure it has .db suffix
1771        let normalized_name = if name.ends_with(".db") {
1772            name.clone()
1773        } else {
1774            format!("{}.db", name)
1775        };
1776
1777        let config = DatabaseConfig {
1778            name: normalized_name.clone(),
1779            version: Some(1),
1780            cache_size: Some(10_000),
1781            page_size: Some(4096),
1782            auto_vacuum: Some(true),
1783            journal_mode: Some("WAL".to_string()),
1784            max_export_size_bytes: Some(2 * 1024 * 1024 * 1024), // 2GB default
1785        };
1786
1787        let db = Database::new(config)
1788            .await
1789            .map_err(|e| JsValue::from_str(&format!("Failed to create database: {}", e)))?;
1790
1791        // Start listening for write queue requests (leader will process them)
1792        Self::start_write_queue_listener(&normalized_name)?;
1793
1794        Ok(db)
1795    }
1796
1797    /// Get the database name
1798    #[wasm_bindgen(getter)]
1799    pub fn name(&self) -> String {
1800        self.name.clone()
1801    }
1802
1803    /// Get all database names stored in IndexedDB
1804    ///
1805    /// Returns an array of database names (sorted alphabetically)
1806    #[wasm_bindgen(js_name = "getAllDatabases")]
1807    pub async fn get_all_databases() -> Result<JsValue, JsValue> {
1808        use crate::storage::vfs_sync::with_global_storage;
1809        use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1810        use std::collections::HashSet;
1811
1812        log::info!("getAllDatabases called");
1813        let mut db_names = HashSet::new();
1814
1815        // Get databases from persistent list (localStorage)
1816        match Self::get_persistent_database_list() {
1817            Ok(persistent_list) => {
1818                log::info!("Persistent list has {} entries", persistent_list.len());
1819                for name in persistent_list {
1820                    log::info!("Found in persistent list: {}", name);
1821                    db_names.insert(name);
1822                }
1823            }
1824            Err(e) => {
1825                log::warn!("Failed to get persistent list: {:?}", e);
1826            }
1827        }
1828
1829        // Get databases from STORAGE_REGISTRY (currently open)
1830        // SAFETY: WASM is single-threaded, no concurrent access possible
1831        STORAGE_REGISTRY.with(|reg| unsafe {
1832            let registry = &*reg.get();
1833            log::info!("STORAGE_REGISTRY has {} entries", registry.len());
1834            for key in registry.keys() {
1835                log::info!("Found in STORAGE_REGISTRY: {}", key);
1836                db_names.insert(key.clone());
1837            }
1838        });
1839
1840        // Get databases from GLOBAL_STORAGE (in-memory persistent storage)
1841        with_global_storage(|storage| {
1842            let storage_borrow = storage.borrow();
1843            log::info!("GLOBAL_STORAGE has {} entries", storage_borrow.len());
1844            for key in storage_borrow.keys() {
1845                log::info!("Found in GLOBAL_STORAGE: {}", key);
1846                db_names.insert(key.clone());
1847            }
1848        });
1849
1850        log::info!("Total unique databases found: {}", db_names.len());
1851
1852        // Convert to sorted vector
1853        let mut result_vec: Vec<String> = db_names.into_iter().collect();
1854        result_vec.sort();
1855
1856        // Convert to JavaScript array
1857        let js_array = js_sys::Array::new();
1858        for name in &result_vec {
1859            log::info!("Returning database: {}", name);
1860            js_array.push(&JsValue::from_str(name));
1861        }
1862
1863        log::info!("getAllDatabases returning {} databases", result_vec.len());
1864
1865        Ok(js_array.into())
1866    }
1867
1868    /// Delete a database from storage
1869    ///
1870    /// Removes database from both STORAGE_REGISTRY and GLOBAL_STORAGE
1871    #[wasm_bindgen(js_name = "deleteDatabase")]
1872    pub async fn delete_database(name: String) -> Result<(), JsValue> {
1873        use crate::storage::vfs_sync::{
1874            with_global_commit_marker, with_global_metadata, with_global_storage,
1875        };
1876
1877        // Normalize database name
1878        let normalized_name = if name.ends_with(".db") {
1879            name.clone()
1880        } else {
1881            format!("{}.db", name)
1882        };
1883
1884        log::info!("Deleting database: {}", normalized_name);
1885
1886        // Remove from STORAGE_REGISTRY
1887        use crate::vfs::indexeddb_vfs::remove_storage_from_registry;
1888        remove_storage_from_registry(&normalized_name);
1889
1890        // Remove from GLOBAL_STORAGE
1891        with_global_storage(|gs| {
1892            #[cfg(target_arch = "wasm32")]
1893            let mut storage = gs.borrow_mut();
1894            #[cfg(not(target_arch = "wasm32"))]
1895            let mut storage = gs.lock();
1896            storage.remove(&normalized_name);
1897        });
1898
1899        // Remove from GLOBAL_METADATA
1900        with_global_metadata(|gm| {
1901            #[cfg(target_arch = "wasm32")]
1902            let mut metadata = gm.borrow_mut();
1903            #[cfg(not(target_arch = "wasm32"))]
1904            let mut metadata = gm.lock();
1905            metadata.remove(&normalized_name);
1906        });
1907
1908        // Remove from commit markers
1909        with_global_commit_marker(|cm| {
1910            #[cfg(target_arch = "wasm32")]
1911            let mut markers = cm.borrow_mut();
1912            #[cfg(not(target_arch = "wasm32"))]
1913            let mut markers = cm.lock();
1914            log::info!(
1915                "Cleared commit marker for {} from GLOBAL storage",
1916                normalized_name
1917            );
1918            markers.remove(&normalized_name);
1919        });
1920
1921        // Delete from IndexedDB
1922        let idb_name = format!("absurder_{}", normalized_name);
1923        let _delete_promise = js_sys::eval(&format!("indexedDB.deleteDatabase('{}')", idb_name))
1924            .map_err(|e| JsValue::from_str(&format!("Failed to delete IndexedDB: {:?}", e)))?;
1925
1926        log::info!("Database deleted: {}", normalized_name);
1927
1928        // Remove from persistent list
1929        Self::remove_database_from_persistent_list(&normalized_name)?;
1930
1931        Ok(())
1932    }
1933
1934    /// Add database name to persistent list in localStorage
1935    #[allow(dead_code)]
1936    fn add_database_to_persistent_list(db_name: &str) -> Result<(), JsValue> {
1937        log::info!("add_database_to_persistent_list called for: {}", db_name);
1938
1939        let window = web_sys::window().ok_or_else(|| {
1940            log::error!("No window object");
1941            JsValue::from_str("No window")
1942        })?;
1943
1944        let storage = window
1945            .local_storage()
1946            .map_err(|e| {
1947                log::error!("Failed to get localStorage: {:?}", e);
1948                JsValue::from_str("No localStorage")
1949            })?
1950            .ok_or_else(|| {
1951                log::error!("localStorage not available");
1952                JsValue::from_str("localStorage not available")
1953            })?;
1954
1955        let key = "absurder_db_list";
1956        let existing = storage.get_item(key).map_err(|e| {
1957            log::error!("Failed to read localStorage key {}: {:?}", key, e);
1958            JsValue::from_str("Failed to read localStorage")
1959        })?;
1960
1961        log::debug!("Existing localStorage value: {:?}", existing);
1962
1963        let mut db_list: Vec<String> = if let Some(json_str) = existing {
1964            match serde_json::from_str(&json_str) {
1965                Ok(list) => {
1966                    log::debug!("Parsed existing list: {:?}", list);
1967                    list
1968                }
1969                Err(e) => {
1970                    log::warn!("Failed to parse localStorage JSON: {}, starting fresh", e);
1971                    Vec::new()
1972                }
1973            }
1974        } else {
1975            log::debug!("No existing list, creating new");
1976            Vec::new()
1977        };
1978
1979        if !db_list.contains(&db_name.to_string()) {
1980            db_list.push(db_name.to_string());
1981            db_list.sort();
1982            log::debug!("Updated list: {:?}", db_list);
1983
1984            let json_str = serde_json::to_string(&db_list).map_err(|e| {
1985                log::error!("Failed to serialize list: {}", e);
1986                JsValue::from_str("Failed to serialize")
1987            })?;
1988
1989            log::debug!("Writing to localStorage: {}", json_str);
1990
1991            storage.set_item(key, &json_str).map_err(|e| {
1992                log::error!("Failed to write to localStorage: {:?}", e);
1993                JsValue::from_str("Failed to write localStorage")
1994            })?;
1995
1996            log::info!("Successfully added {} to persistent database list", db_name);
1997        } else {
1998            log::info!("{} already in persistent list", db_name);
1999        }
2000
2001        Ok(())
2002    }
2003
2004    /// Remove database name from persistent list in localStorage
2005    fn remove_database_from_persistent_list(db_name: &str) -> Result<(), JsValue> {
2006        let window = web_sys::window().ok_or_else(|| JsValue::from_str("No window"))?;
2007        let storage = window
2008            .local_storage()
2009            .map_err(|_| JsValue::from_str("No localStorage"))?
2010            .ok_or_else(|| JsValue::from_str("localStorage not available"))?;
2011
2012        let key = "absurder_db_list";
2013        let existing = storage
2014            .get_item(key)
2015            .map_err(|_| JsValue::from_str("Failed to read localStorage"))?;
2016
2017        if let Some(json_str) = existing {
2018            let mut db_list: Vec<String> =
2019                serde_json::from_str(&json_str).unwrap_or_else(|_| Vec::new());
2020            db_list.retain(|name| name != db_name);
2021            let json_str = serde_json::to_string(&db_list)
2022                .map_err(|_| JsValue::from_str("Failed to serialize"))?;
2023            storage
2024                .set_item(key, &json_str)
2025                .map_err(|_| JsValue::from_str("Failed to write localStorage"))?;
2026            log::info!("Removed {} from persistent database list", db_name);
2027        }
2028
2029        Ok(())
2030    }
2031
2032    /// Get database names from persistent list in localStorage
2033    fn get_persistent_database_list() -> Result<Vec<String>, JsValue> {
2034        log::info!("get_persistent_database_list called");
2035
2036        let window = web_sys::window().ok_or_else(|| {
2037            log::error!("No window object");
2038            JsValue::from_str("No window")
2039        })?;
2040
2041        let storage = window
2042            .local_storage()
2043            .map_err(|e| {
2044                log::error!("Failed to get localStorage: {:?}", e);
2045                JsValue::from_str("No localStorage")
2046            })?
2047            .ok_or_else(|| {
2048                log::error!("localStorage not available");
2049                JsValue::from_str("localStorage not available")
2050            })?;
2051
2052        let key = "absurder_db_list";
2053        let existing = storage.get_item(key).map_err(|e| {
2054            log::error!("Failed to read localStorage key {}: {:?}", key, e);
2055            JsValue::from_str("Failed to read localStorage")
2056        })?;
2057
2058        log::debug!("Read from localStorage: {:?}", existing);
2059
2060        if let Some(json_str) = existing {
2061            match serde_json::from_str::<Vec<String>>(&json_str) {
2062                Ok(db_list) => {
2063                    log::info!(
2064                        "Successfully parsed {} databases from localStorage",
2065                        db_list.len()
2066                    );
2067                    log::debug!("Database list: {:?}", db_list);
2068                    Ok(db_list)
2069                }
2070                Err(e) => {
2071                    log::error!("Failed to parse localStorage JSON: {}", e);
2072                    Ok(Vec::new())
2073                }
2074            }
2075        } else {
2076            log::info!("No persistent database list in localStorage");
2077            Ok(Vec::new())
2078        }
2079    }
2080
2081    /// Start listening for write queue requests (leader processes these)
2082    fn start_write_queue_listener(db_name: &str) -> Result<(), JsValue> {
2083        use crate::storage::write_queue::{
2084            WriteQueueMessage, WriteResponse, register_write_queue_listener,
2085        };
2086        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2087
2088        let db_name_clone = db_name.to_string();
2089
2090        let callback = Closure::wrap(Box::new(move |msg: JsValue| {
2091            let db_name_inner = db_name_clone.clone();
2092
2093            // Parse the message
2094            if let Ok(json_str) = js_sys::JSON::stringify(&msg) {
2095                if let Some(json_str) = json_str.as_string() {
2096                    if let Ok(message) = serde_json::from_str::<WriteQueueMessage>(&json_str) {
2097                        if let WriteQueueMessage::WriteRequest(request) = message {
2098                            log::debug!("Leader received write request: {}", request.request_id);
2099
2100                            // Check if we're the leader
2101                            let storage_rc = get_storage_with_fallback(&db_name_inner);
2102
2103                            if let Some(storage) = storage_rc {
2104                                // Spawn async task to process the write
2105                                wasm_bindgen_futures::spawn_local(async move {
2106                                    let is_leader = with_storage_async!(
2107                                        storage,
2108                                        "write_queue_is_leader",
2109                                        |s| s.is_leader()
2110                                    );
2111                                    if is_leader.is_none() {
2112                                        log::error!("Failed to check leader status");
2113                                        return;
2114                                    }
2115                                    let is_leader = is_leader.unwrap();
2116
2117                                    if !is_leader {
2118                                        log::error!("Not leader, ignoring write request");
2119                                        return;
2120                                    }
2121
2122                                    log::debug!("Processing write request as leader");
2123
2124                                    // Create a temporary database instance to execute the SQL
2125                                    match Database::new_wasm(db_name_inner.clone()).await {
2126                                        Ok(mut db) => {
2127                                            // Execute the SQL
2128                                            match db.execute_internal(&request.sql).await {
2129                                                Ok(result) => {
2130                                                    // Send success response
2131                                                    let response = WriteResponse::Success {
2132                                                        request_id: request.request_id.clone(),
2133                                                        affected_rows: result.affected_rows
2134                                                            as usize,
2135                                                    };
2136
2137                                                    use crate::storage::write_queue::send_write_response;
2138                                                    if let Err(e) = send_write_response(
2139                                                        &db_name_inner,
2140                                                        response,
2141                                                    ) {
2142                                                        log::error!(
2143                                                            "Failed to send response: {}",
2144                                                            e
2145                                                        );
2146                                                    } else {
2147                                                        log::info!(
2148                                                            "Write response sent successfully"
2149                                                        );
2150                                                    }
2151                                                }
2152                                                Err(e) => {
2153                                                    // Send error response
2154                                                    let response = WriteResponse::Error {
2155                                                        request_id: request.request_id.clone(),
2156                                                        error_message: e.to_string(),
2157                                                    };
2158
2159                                                    use crate::storage::write_queue::send_write_response;
2160                                                    if let Err(e) = send_write_response(
2161                                                        &db_name_inner,
2162                                                        response,
2163                                                    ) {
2164                                                        log::error!(
2165                                                            "Failed to send error response: {}",
2166                                                            e
2167                                                        );
2168                                                    }
2169                                                }
2170                                            }
2171                                        }
2172                                        Err(e) => {
2173                                            log::error!(
2174                                                "Failed to create db for write processing: {:?}",
2175                                                e
2176                                            );
2177                                        }
2178                                    }
2179                                });
2180                            }
2181                        }
2182                    }
2183                }
2184            }
2185        }) as Box<dyn FnMut(JsValue)>);
2186
2187        let callback_fn = callback.as_ref().unchecked_ref();
2188        register_write_queue_listener(db_name, callback_fn).map_err(|e| {
2189            JsValue::from_str(&format!("Failed to register write queue listener: {}", e))
2190        })?;
2191
2192        callback.forget();
2193
2194        Ok(())
2195    }
2196
2197    #[wasm_bindgen]
2198    pub async fn execute(&mut self, sql: &str) -> Result<JsValue, JsValue> {
2199        // Check write permission before executing
2200        self.check_write_permission(sql)
2201            .await
2202            .map_err(|e| JsValue::from_str(&format!("Write permission denied: {}", e)))?;
2203
2204        let result = self
2205            .execute_internal(sql)
2206            .await
2207            .map_err(|e| JsValue::from_str(&format!("Query execution failed: {}", e)))?;
2208        serde_wasm_bindgen::to_value(&result).map_err(|e| JsValue::from_str(&e.to_string()))
2209    }
2210
2211    #[wasm_bindgen(js_name = "executeWithParams")]
2212    pub async fn execute_with_params(
2213        &mut self,
2214        sql: &str,
2215        params: JsValue,
2216    ) -> Result<JsValue, JsValue> {
2217        let params: Vec<ColumnValue> = serde_wasm_bindgen::from_value(params)
2218            .map_err(|e| JsValue::from_str(&format!("Invalid parameters: {}", e)))?;
2219
2220        // Check write permission before executing
2221        self.check_write_permission(sql)
2222            .await
2223            .map_err(|e| JsValue::from_str(&format!("Write permission denied: {}", e)))?;
2224
2225        let result = self
2226            .execute_with_params_internal(sql, &params)
2227            .await
2228            .map_err(|e| JsValue::from_str(&format!("Query execution failed: {}", e)))?;
2229        serde_wasm_bindgen::to_value(&result).map_err(|e| JsValue::from_str(&e.to_string()))
2230    }
2231
2232    #[wasm_bindgen]
2233    pub async fn close(&mut self) -> Result<(), JsValue> {
2234        self.close_internal()
2235            .await
2236            .map_err(|e| JsValue::from_str(&format!("Failed to close database: {}", e)))
2237    }
2238
2239    /// Force close connection and remove from pool (for test cleanup)
2240    #[wasm_bindgen(js_name = "forceCloseConnection")]
2241    pub async fn force_close_connection(&mut self) -> Result<(), JsValue> {
2242        // First do normal close to cleanup
2243        let _ = self.close_internal().await;
2244
2245        // Then force-remove from connection pool
2246        // Pool uses name without .db, so strip it
2247        let pool_key = self.name.trim_end_matches(".db");
2248        crate::connection_pool::force_close_connection(pool_key);
2249
2250        // CRITICAL: Single source of truth for ALL cleanup
2251        #[cfg(target_arch = "wasm32")]
2252        {
2253            crate::cleanup::cleanup_all_state(pool_key)
2254                .await
2255                .map_err(|e| JsValue::from_str(&format!("Cleanup failed: {}", e)))?;
2256        }
2257        log::info!("Force closed and removed connection for: {}", self.name);
2258        Ok(())
2259    }
2260
2261    #[wasm_bindgen]
2262    pub async fn sync(&mut self) -> Result<(), JsValue> {
2263        self.sync_internal()
2264            .await
2265            .map_err(|e| JsValue::from_str(&format!("Failed to sync database: {}", e)))
2266    }
2267
2268    /// Allow non-leader writes (for single-tab apps or testing)
2269    #[wasm_bindgen(js_name = "allowNonLeaderWrites")]
2270    pub async fn allow_non_leader_writes(&mut self, allow: bool) -> Result<(), JsValue> {
2271        log::debug!("Setting allowNonLeaderWrites = {} for {}", allow, self.name);
2272        self.allow_non_leader_writes = allow;
2273        Ok(())
2274    }
2275
2276    /// Export database to SQLite .db file format
2277    ///
2278    /// Returns the complete database as a Uint8Array that can be downloaded
2279    /// or saved as a standard SQLite .db file.
2280    ///
2281    /// # Example
2282    /// ```javascript
2283    /// const dbBytes = await db.exportToFile();
2284    /// const blob = new Blob([dbBytes], { type: 'application/x-sqlite3' });
2285    /// const url = URL.createObjectURL(blob);
2286    /// const a = document.createElement('a');
2287    /// a.href = url;
2288    /// a.download = 'database.db';
2289    /// a.click();
2290    /// ```
2291    #[wasm_bindgen(js_name = "exportToFile")]
2292    pub async fn export_to_file(&self) -> Result<js_sys::Uint8Array, JsValue> {
2293        let db_name = self.name.clone();
2294        let max_export_size = self.max_export_size_bytes;
2295
2296        log::info!("[EXPORT] ===== Step 1: Acquiring lock");
2297
2298        // Acquire lock FIRST to serialize operations
2299        let _guard = weblocks::acquire(&db_name, weblocks::AcquireOptions::exclusive()).await?;
2300        log::info!("[EXPORT] ===== Step 2: Lock acquired");
2301
2302        // Get storage and sync AFTER lock - this ensures only one export syncs at a time
2303        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2304        log::info!("[EXPORT] ===== Step 3: Getting storage");
2305        let storage_rc = get_storage_with_fallback(&db_name).ok_or_else(|| {
2306            JsValue::from_str(&format!("Storage not found for database: {}", db_name))
2307        })?;
2308        log::info!("[EXPORT] ===== Step 4: Got storage, reloading cache");
2309
2310        // Reload cache from GLOBAL_STORAGE
2311        #[cfg(target_arch = "wasm32")]
2312        {
2313            storage_rc.reload_cache_from_global_storage();
2314        }
2315
2316        // CRITICAL: Checkpoint WAL to flush SQLite data to VFS blocks before export
2317        // Without this, data stays in SQLite's WAL buffer and doesn't appear in exported bytes
2318        log::info!("[EXPORT] ===== Step 5: Checkpointing WAL");
2319        if !self.connection_state.db.get().is_null() {
2320            // Use raw SQLite call since export_to_file takes &self, not &mut self
2321            use std::ffi::CString;
2322            let pragma = CString::new("PRAGMA wal_checkpoint(PASSIVE)").unwrap();
2323            unsafe {
2324                let mut stmt = std::ptr::null_mut();
2325                let rc = sqlite_wasm_rs::sqlite3_prepare_v2(
2326                    self.connection_state.db.get(),
2327                    pragma.as_ptr(),
2328                    -1,
2329                    &mut stmt,
2330                    std::ptr::null_mut(),
2331                );
2332                if rc == sqlite_wasm_rs::SQLITE_OK && !stmt.is_null() {
2333                    sqlite_wasm_rs::sqlite3_step(stmt);
2334                    sqlite_wasm_rs::sqlite3_finalize(stmt);
2335                    log::info!("[EXPORT] WAL checkpoint completed");
2336                } else {
2337                    log::warn!("[EXPORT] WAL checkpoint failed with rc: {}", rc);
2338                }
2339            }
2340        }
2341
2342        log::info!("[EXPORT] ===== Step 6: Starting sync");
2343        // Sync to ensure all data is persisted before export
2344        storage_rc
2345            .sync()
2346            .await
2347            .map_err(|e| JsValue::from_str(&format!("Sync failed: {}", e)))?;
2348        log::info!("[EXPORT] ===== Step 7: Sync complete");
2349
2350        // Export with configured size limit
2351        log::info!("[EXPORT] Calling export_database_to_bytes");
2352        let db_bytes = {
2353            let storage = &*storage_rc;
2354            crate::storage::export::export_database_to_bytes(storage, max_export_size)
2355                .await
2356                .map_err(|e| {
2357                    log::error!("[EXPORT] Export failed: {}", e);
2358                    JsValue::from_str(&format!("Export failed: {}", e))
2359                })?
2360        };
2361
2362        log::info!("[EXPORT] Export complete: {} bytes", db_bytes.len());
2363
2364        let uint8_array = js_sys::Uint8Array::new_with_length(db_bytes.len() as u32);
2365        uint8_array.copy_from(&db_bytes);
2366
2367        Ok(uint8_array)
2368    }
2369
2370    /// Test method for concurrent locking - simple increment counter
2371    #[wasm_bindgen(js_name = "testLock")]
2372    pub async fn test_lock(&self, value: u32) -> Result<u32, JsValue> {
2373        let lock_name = format!("{}.lock_test", self.name);
2374
2375        log::info!(
2376            "[LOCK-TEST] Acquiring lock: {} with value: {}",
2377            lock_name,
2378            value
2379        );
2380        let _guard = weblocks::acquire(&lock_name, weblocks::AcquireOptions::exclusive()).await?;
2381        log::info!("[LOCK-TEST] Lock acquired, processing value: {}", value);
2382
2383        // Simulate some work
2384        let result = value + 1;
2385
2386        // Small delay to test serialization
2387        let delay_promise = js_sys::Promise::new(&mut |resolve, _reject| {
2388            let window = web_sys::window().unwrap();
2389            let _ = window
2390                .set_timeout_with_callback_and_timeout_and_arguments_0(resolve.unchecked_ref(), 10);
2391        });
2392        wasm_bindgen_futures::JsFuture::from(delay_promise).await?;
2393
2394        log::info!(
2395            "[LOCK-TEST] Lock releasing for: {} with result: {}",
2396            lock_name,
2397            result
2398        );
2399        Ok(result)
2400    }
2401
2402    /// Import SQLite database from .db file bytes
2403    ///
2404    /// Replaces the current database contents with the imported data.
2405    /// This will close the current database connection and clear all existing data.
2406    ///
2407    /// # Arguments
2408    /// * `file_data` - SQLite .db file as Uint8Array
2409    ///
2410    /// # Returns
2411    /// * `Ok(())` - Import successful
2412    /// * `Err(JsValue)` - Import failed (invalid file, validation error, etc.)
2413    ///
2414    /// # Example
2415    /// ```javascript
2416    /// // From file input
2417    /// const fileInput = document.getElementById('dbFile');
2418    /// const file = fileInput.files[0];
2419    /// const arrayBuffer = await file.arrayBuffer();
2420    /// const uint8Array = new Uint8Array(arrayBuffer);
2421    ///
2422    /// await db.importFromFile(uint8Array);
2423    ///
2424    /// // Database is now replaced - you may need to reopen connections
2425    /// ```
2426    ///
2427    /// # Warning
2428    /// This operation is destructive and will replace all existing database data.
2429    /// **IMPORTANT:** You MUST call `db.close()` after import and reopen the database
2430    /// for changes to take effect.
2431    #[wasm_bindgen(js_name = "importFromFile")]
2432    pub async fn import_from_file(&mut self, file_data: js_sys::Uint8Array) -> Result<(), JsValue> {
2433        log::info!("[IMPORT] Starting import with lock for: {}", self.name);
2434        let db_name = self.name.clone();
2435        let data = file_data.to_vec();
2436
2437        // Acquire lock FIRST to serialize operations
2438        let _guard = weblocks::acquire(&db_name, weblocks::AcquireOptions::exclusive()).await?;
2439        log::info!("[IMPORT] Lock acquired for: {}", db_name);
2440
2441        log::debug!("Import data size: {} bytes", data.len());
2442
2443        // CRITICAL: Force-close database connection BEFORE import
2444        // Must use force_close to remove from connection pool, not just decrement ref_count
2445        // Otherwise new Database instances will reuse stale SQLite connection
2446        log::debug!("Force-closing database connection before import");
2447
2448        // First do normal close to cleanup leader election etc
2449        self.close_internal()
2450            .await
2451            .map_err(|e| JsValue::from_str(&format!("Failed to close before import: {}", e)))?;
2452
2453        // Then force-remove from connection pool
2454        let pool_key = self.name.trim_end_matches(".db");
2455        crate::connection_pool::force_close_connection(pool_key);
2456
2457        // Mark our connection as null since we force-closed it
2458        self.connection_state.db.set(std::ptr::null_mut());
2459        log::debug!("Removed connection from pool for import");
2460
2461        // Call the import function with full name (WITH .db)
2462        crate::storage::import::import_database_from_bytes(&db_name, data)
2463            .await
2464            .map_err(|e| {
2465                log::error!("Import failed for {}: {}", db_name, e);
2466                JsValue::from_str(&format!("Import failed: {}", e))
2467            })?;
2468
2469        log::info!("[IMPORT] Import complete for: {}", db_name);
2470
2471        // Note: Database connection is closed. User must create a new Database instance
2472        // to use the imported data. This matches main branch behavior.
2473
2474        Ok(())
2475    }
2476
2477    /// Wait for this instance to become leader
2478    #[wasm_bindgen(js_name = "waitForLeadership")]
2479    pub async fn wait_for_leadership(&mut self) -> Result<(), JsValue> {
2480        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2481
2482        // Track leader election attempt
2483        #[cfg(feature = "telemetry")]
2484        if let Some(ref metrics) = self.metrics {
2485            metrics.leader_election_attempts_total().inc();
2486        }
2487
2488        let db_name = &self.name;
2489        let start_time = js_sys::Date::now();
2490
2491        let timeout_ms = 5000.0; // 5 second timeout
2492
2493        loop {
2494            let storage_rc = get_storage_with_fallback(db_name);
2495
2496            if let Some(storage) = storage_rc {
2497                let is_leader =
2498                    match with_storage_async!(storage, "wait_for_leadership", |s| s.is_leader()) {
2499                        Some(v) => v,
2500                        None => continue,
2501                    };
2502
2503                if is_leader {
2504                    log::info!("Became leader for {}", db_name);
2505
2506                    // Record telemetry on successful leadership acquisition
2507                    #[cfg(feature = "telemetry")]
2508                    if let Some(ref metrics) = self.metrics {
2509                        let duration_ms = js_sys::Date::now() - start_time;
2510                        metrics.leader_election_duration().observe(duration_ms);
2511                        metrics.is_leader().set(1.0);
2512                        metrics.leadership_changes_total().inc();
2513                    }
2514
2515                    return Ok(());
2516                }
2517            }
2518
2519            // Check timeout
2520            if js_sys::Date::now() - start_time > timeout_ms {
2521                // Record telemetry on timeout
2522                #[cfg(feature = "telemetry")]
2523                if let Some(ref metrics) = self.metrics {
2524                    let duration_ms = js_sys::Date::now() - start_time;
2525                    metrics.leader_election_duration().observe(duration_ms);
2526                }
2527
2528                return Err(JsValue::from_str("Timeout waiting for leadership"));
2529            }
2530
2531            // Wait a bit before checking again
2532            let promise = js_sys::Promise::new(&mut |resolve, _| {
2533                let window = web_sys::window().expect("should have window");
2534                let _ = window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 100);
2535            });
2536            let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
2537        }
2538    }
2539
2540    /// Request leadership (triggers re-election check)
2541    #[wasm_bindgen(js_name = "requestLeadership")]
2542    pub async fn request_leadership(&mut self) -> Result<(), JsValue> {
2543        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2544
2545        let db_name = &self.name;
2546        log::debug!("Requesting leadership for {}", db_name);
2547
2548        // Record telemetry data before the request
2549        #[cfg(feature = "telemetry")]
2550        let telemetry_data = if self.metrics.is_some() {
2551            let start_time = js_sys::Date::now();
2552            let was_leader = self
2553                .is_leader_wasm()
2554                .await
2555                .ok()
2556                .and_then(|v| v.as_bool())
2557                .unwrap_or(false);
2558
2559            if let Some(ref metrics) = self.metrics {
2560                metrics.leader_elections_total().inc();
2561            }
2562
2563            Some((start_time, was_leader))
2564        } else {
2565            None
2566        };
2567
2568        let storage_rc = get_storage_with_fallback(db_name);
2569
2570        if let Some(storage) = storage_rc {
2571            {
2572                // Trigger leader election
2573                let result = with_storage_async!(storage, "request_leadership", |s| s
2574                    .start_leader_election())
2575                .ok_or_else(|| {
2576                    JsValue::from_str("Failed to acquire storage lock for leadership request")
2577                })?;
2578                result.map_err(|e| {
2579                    JsValue::from_str(&format!("Failed to request leadership: {}", e))
2580                })?;
2581
2582                log::debug!("Re-election triggered for {}", db_name);
2583            } // Drop the borrow here
2584
2585            // Record telemetry after election (after dropping borrow)
2586            #[cfg(feature = "telemetry")]
2587            if let Some((start_time, was_leader)) = telemetry_data {
2588                if let Some(ref metrics) = self.metrics {
2589                    // Record election duration
2590                    let duration_ms = js_sys::Date::now() - start_time;
2591                    metrics.leader_election_duration().observe(duration_ms);
2592
2593                    // Check if leadership status changed
2594                    let is_leader_now = self
2595                        .is_leader_wasm()
2596                        .await
2597                        .ok()
2598                        .and_then(|v| v.as_bool())
2599                        .unwrap_or(false);
2600
2601                    // Update is_leader gauge
2602                    metrics
2603                        .is_leader()
2604                        .set(if is_leader_now { 1.0 } else { 0.0 });
2605
2606                    // Track leadership changes
2607                    if was_leader != is_leader_now {
2608                        metrics.leadership_changes_total().inc();
2609                    }
2610                }
2611            }
2612
2613            Ok(())
2614        } else {
2615            Err(JsValue::from_str(&format!(
2616                "No storage found for database: {}",
2617                db_name
2618            )))
2619        }
2620    }
2621
2622    /// Get leader information
2623    #[wasm_bindgen(js_name = "getLeaderInfo")]
2624    pub async fn get_leader_info(&mut self) -> Result<JsValue, JsValue> {
2625        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2626
2627        let db_name = &self.name;
2628
2629        let storage_rc = get_storage_with_fallback(db_name);
2630
2631        if let Some(storage) = storage_rc {
2632            let is_leader = with_storage_async!(storage, "get_leader_info", |s| s.is_leader())
2633                .ok_or_else(|| {
2634                    JsValue::from_str(&format!(
2635                        "Failed to access storage for database: {}",
2636                        db_name
2637                    ))
2638                })?;
2639
2640            // Get leader info - we'll use simpler data for now
2641            // Real implementation would need public getters on BlockStorage
2642            let leader_id_str = if is_leader {
2643                format!("leader_{}", db_name)
2644            } else {
2645                "unknown".to_string()
2646            };
2647
2648            // Create JavaScript object
2649            let obj = js_sys::Object::new();
2650            js_sys::Reflect::set(&obj, &"isLeader".into(), &JsValue::from_bool(is_leader))?;
2651            js_sys::Reflect::set(&obj, &"leaderId".into(), &JsValue::from_str(&leader_id_str))?;
2652            js_sys::Reflect::set(
2653                &obj,
2654                &"leaseExpiry".into(),
2655                &JsValue::from_f64(js_sys::Date::now()),
2656            )?;
2657
2658            Ok(obj.into())
2659        } else {
2660            Err(JsValue::from_str(&format!(
2661                "No storage found for database: {}",
2662                db_name
2663            )))
2664        }
2665    }
2666
2667    /// Queue a write operation to be executed by the leader
2668    ///
2669    /// Non-leader tabs can use this to request writes from the leader.
2670    /// The write is forwarded via BroadcastChannel and executed by the leader.
2671    ///
2672    /// # Arguments
2673    /// * `sql` - SQL statement to execute (must be a write operation)
2674    ///
2675    /// # Returns
2676    /// Result indicating success or failure
2677    #[wasm_bindgen(js_name = "queueWrite")]
2678    pub async fn queue_write(&mut self, sql: String) -> Result<(), JsValue> {
2679        self.queue_write_with_timeout(sql, 5000).await
2680    }
2681
2682    /// Queue a write operation with a specific timeout
2683    ///
2684    /// # Arguments
2685    /// * `sql` - SQL statement to execute
2686    /// * `timeout_ms` - Timeout in milliseconds
2687    #[wasm_bindgen(js_name = "queueWriteWithTimeout")]
2688    pub async fn queue_write_with_timeout(
2689        &mut self,
2690        sql: String,
2691        timeout_ms: u32,
2692    ) -> Result<(), JsValue> {
2693        use crate::storage::write_queue::{WriteQueueMessage, WriteResponse, send_write_request};
2694        use std::cell::RefCell;
2695        use std::rc::Rc;
2696
2697        log::debug!("Queuing write: {}", sql);
2698
2699        // Check if we're the leader - if so, just execute directly
2700        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2701        let is_leader = {
2702            let storage_rc = get_storage_with_fallback(&self.name);
2703
2704            if let Some(storage) = storage_rc {
2705                with_storage_async!(storage, "queue_write_is_leader", |s| s.is_leader())
2706                    .unwrap_or(false)
2707            } else {
2708                false
2709            }
2710        };
2711
2712        if is_leader {
2713            log::debug!("We are leader, executing directly");
2714            return self
2715                .execute_internal(&sql)
2716                .await
2717                .map(|_| ())
2718                .map_err(|e| JsValue::from_str(&format!("Execute failed: {}", e)));
2719        }
2720
2721        // Send write request to leader
2722        let request_id = send_write_request(&self.name, &sql)
2723            .map_err(|e| JsValue::from_str(&format!("Failed to send write request: {}", e)))?;
2724
2725        log::debug!("Write request sent with ID: {}", request_id);
2726
2727        // Wait for response with timeout
2728        let response_received = Rc::new(RefCell::new(false));
2729        let response_error = Rc::new(RefCell::new(None::<String>));
2730
2731        let response_received_clone = response_received.clone();
2732        let response_error_clone = response_error.clone();
2733        let request_id_clone = request_id.clone();
2734
2735        // Set up listener for response
2736        let callback = Closure::wrap(Box::new(move |msg: JsValue| {
2737            // Parse the message
2738            if let Ok(json_str) = js_sys::JSON::stringify(&msg) {
2739                if let Some(json_str) = json_str.as_string() {
2740                    if let Ok(message) = serde_json::from_str::<WriteQueueMessage>(&json_str) {
2741                        if let WriteQueueMessage::WriteResponse(response) = message {
2742                            match response {
2743                                WriteResponse::Success { request_id, .. } => {
2744                                    if request_id == request_id_clone {
2745                                        *response_received_clone.borrow_mut() = true;
2746                                        log::debug!("Write response received: Success");
2747                                    }
2748                                }
2749                                WriteResponse::Error {
2750                                    request_id,
2751                                    error_message,
2752                                } => {
2753                                    if request_id == request_id_clone {
2754                                        *response_received_clone.borrow_mut() = true;
2755                                        *response_error_clone.borrow_mut() = Some(error_message);
2756                                        log::debug!("Write response received: Error");
2757                                    }
2758                                }
2759                            }
2760                        }
2761                    }
2762                }
2763            }
2764        }) as Box<dyn FnMut(JsValue)>);
2765
2766        // Register listener
2767        use crate::storage::write_queue::register_write_queue_listener;
2768        let callback_fn = callback.as_ref().unchecked_ref();
2769        register_write_queue_listener(&self.name, callback_fn)
2770            .map_err(|e| JsValue::from_str(&format!("Failed to register listener: {}", e)))?;
2771
2772        // Keep callback alive
2773        callback.forget();
2774
2775        // Wait for response with polling (timeout_ms)
2776        let start_time = js_sys::Date::now();
2777        let timeout_f64 = timeout_ms as f64;
2778
2779        loop {
2780            // Check if response received
2781            if *response_received.borrow() {
2782                if let Some(error_msg) = response_error.borrow().as_ref() {
2783                    return Err(JsValue::from_str(&format!("Write failed: {}", error_msg)));
2784                }
2785                log::info!("Write completed successfully");
2786                return Ok(());
2787            }
2788
2789            // Check timeout
2790            let elapsed = js_sys::Date::now() - start_time;
2791            if elapsed > timeout_f64 {
2792                return Err(JsValue::from_str("Write request timed out"));
2793            }
2794
2795            // Wait a bit before checking again
2796            wasm_bindgen_futures::JsFuture::from(js_sys::Promise::new(&mut |resolve, _reject| {
2797                if let Some(window) = web_sys::window() {
2798                    let _ =
2799                        window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 100);
2800                } else {
2801                    log::error!("Window unavailable in timeout handler");
2802                }
2803            }))
2804            .await
2805            .ok();
2806        }
2807    }
2808
2809    #[wasm_bindgen(js_name = "isLeader")]
2810    pub async fn is_leader_wasm(&self) -> Result<JsValue, JsValue> {
2811        // Get the storage from STORAGE_REGISTRY
2812        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2813
2814        let db_name = &self.name;
2815        log::debug!("isLeader() called for database: {} (self.name)", db_name);
2816
2817        let storage_rc = get_storage_with_fallback(db_name);
2818
2819        if let Some(storage) = storage_rc {
2820            log::debug!("Found storage for {}, calling is_leader()", db_name);
2821            let is_leader = with_storage_async!(storage, "is_leader_wasm", |s| s.is_leader())
2822                .ok_or_else(|| {
2823                    JsValue::from_str(&format!(
2824                        "Failed to access storage for database: {}",
2825                        db_name
2826                    ))
2827                })?;
2828            log::debug!("is_leader() = {} for {}", is_leader, db_name);
2829
2830            // Return as JsValue boolean
2831            Ok(JsValue::from_bool(is_leader))
2832        } else {
2833            log::error!("ERROR: No storage found for database: {}", db_name);
2834            Err(JsValue::from_str(&format!(
2835                "No storage found for database: {}",
2836                db_name
2837            )))
2838        }
2839    }
2840
2841    /// Check if this instance is the leader (non-wasm version for internal use/tests)
2842    pub async fn is_leader(&self) -> Result<bool, JsValue> {
2843        let result = self.is_leader_wasm().await?;
2844        Ok(result.as_bool().unwrap_or(false))
2845    }
2846
2847    #[wasm_bindgen(js_name = "onDataChange")]
2848    pub fn on_data_change_wasm(&mut self, callback: &js_sys::Function) -> Result<(), JsValue> {
2849        log::debug!("Registering onDataChange callback for {}", self.name);
2850
2851        // Store the callback
2852        self.on_data_change_callback = Some(callback.clone());
2853
2854        // Register listener for BroadcastChannel notifications from other tabs
2855        use crate::storage::broadcast_notifications::register_change_listener;
2856
2857        let db_name = &self.name;
2858        register_change_listener(db_name, callback).map_err(|e| {
2859            JsValue::from_str(&format!("Failed to register change listener: {}", e))
2860        })?;
2861
2862        log::debug!("onDataChange callback registered for {}", self.name);
2863        Ok(())
2864    }
2865
2866    /// Enable or disable optimistic updates mode
2867    #[wasm_bindgen(js_name = "enableOptimisticUpdates")]
2868    pub async fn enable_optimistic_updates(&mut self, enabled: bool) -> Result<(), JsValue> {
2869        self.optimistic_updates_manager
2870            .borrow_mut()
2871            .set_enabled(enabled);
2872        log::debug!(
2873            "Optimistic updates {}",
2874            if enabled { "enabled" } else { "disabled" }
2875        );
2876        Ok(())
2877    }
2878
2879    /// Check if optimistic mode is enabled
2880    #[wasm_bindgen(js_name = "isOptimisticMode")]
2881    pub async fn is_optimistic_mode(&self) -> bool {
2882        self.optimistic_updates_manager.borrow().is_enabled()
2883    }
2884
2885    /// Track an optimistic write
2886    #[wasm_bindgen(js_name = "trackOptimisticWrite")]
2887    pub async fn track_optimistic_write(&mut self, sql: String) -> Result<String, JsValue> {
2888        let id = self
2889            .optimistic_updates_manager
2890            .borrow_mut()
2891            .track_write(sql);
2892        Ok(id)
2893    }
2894
2895    /// Get count of pending writes
2896    #[wasm_bindgen(js_name = "getPendingWritesCount")]
2897    pub async fn get_pending_writes_count(&self) -> usize {
2898        self.optimistic_updates_manager.borrow().get_pending_count()
2899    }
2900
2901    /// Clear all optimistic writes
2902    #[wasm_bindgen(js_name = "clearOptimisticWrites")]
2903    pub async fn clear_optimistic_writes(&mut self) -> Result<(), JsValue> {
2904        self.optimistic_updates_manager.borrow_mut().clear_all();
2905        Ok(())
2906    }
2907
2908    /// Enable or disable coordination metrics tracking
2909    #[wasm_bindgen(js_name = "enableCoordinationMetrics")]
2910    pub async fn enable_coordination_metrics(&mut self, enabled: bool) -> Result<(), JsValue> {
2911        self.coordination_metrics_manager
2912            .borrow_mut()
2913            .set_enabled(enabled);
2914        Ok(())
2915    }
2916
2917    /// Check if coordination metrics tracking is enabled
2918    #[wasm_bindgen(js_name = "isCoordinationMetricsEnabled")]
2919    pub async fn is_coordination_metrics_enabled(&self) -> bool {
2920        self.coordination_metrics_manager.borrow().is_enabled()
2921    }
2922
2923    /// Record a leadership change
2924    #[wasm_bindgen(js_name = "recordLeadershipChange")]
2925    pub async fn record_leadership_change(&mut self, became_leader: bool) -> Result<(), JsValue> {
2926        self.coordination_metrics_manager
2927            .borrow_mut()
2928            .record_leadership_change(became_leader);
2929        Ok(())
2930    }
2931
2932    /// Record a notification latency in milliseconds
2933    #[wasm_bindgen(js_name = "recordNotificationLatency")]
2934    pub async fn record_notification_latency(&mut self, latency_ms: f64) -> Result<(), JsValue> {
2935        self.coordination_metrics_manager
2936            .borrow_mut()
2937            .record_notification_latency(latency_ms);
2938        Ok(())
2939    }
2940
2941    /// Record a write conflict (non-leader write attempt)
2942    #[wasm_bindgen(js_name = "recordWriteConflict")]
2943    pub async fn record_write_conflict(&mut self) -> Result<(), JsValue> {
2944        self.coordination_metrics_manager
2945            .borrow_mut()
2946            .record_write_conflict();
2947        Ok(())
2948    }
2949
2950    /// Record a follower refresh
2951    #[wasm_bindgen(js_name = "recordFollowerRefresh")]
2952    pub async fn record_follower_refresh(&mut self) -> Result<(), JsValue> {
2953        self.coordination_metrics_manager
2954            .borrow_mut()
2955            .record_follower_refresh();
2956        Ok(())
2957    }
2958
2959    /// Get coordination metrics as JSON string
2960    #[wasm_bindgen(js_name = "getCoordinationMetrics")]
2961    pub async fn get_coordination_metrics(&self) -> Result<String, JsValue> {
2962        self.coordination_metrics_manager
2963            .borrow()
2964            .get_metrics_json()
2965            .map_err(|e| JsValue::from_str(&e))
2966    }
2967
2968    /// Reset all coordination metrics
2969    #[wasm_bindgen(js_name = "resetCoordinationMetrics")]
2970    pub async fn reset_coordination_metrics(&mut self) -> Result<(), JsValue> {
2971        self.coordination_metrics_manager.borrow_mut().reset();
2972        Ok(())
2973    }
2974}
2975
2976// Export WasmColumnValue for WASM
2977#[cfg(target_arch = "wasm32")]
2978#[wasm_bindgen]
2979pub struct WasmColumnValue {
2980    #[allow(dead_code)]
2981    inner: ColumnValue,
2982}
2983
2984#[cfg(target_arch = "wasm32")]
2985#[wasm_bindgen]
2986impl WasmColumnValue {
2987    #[wasm_bindgen(js_name = "createNull")]
2988    pub fn create_null() -> WasmColumnValue {
2989        WasmColumnValue {
2990            inner: ColumnValue::Null,
2991        }
2992    }
2993
2994    #[wasm_bindgen(js_name = "createInteger")]
2995    pub fn create_integer(value: i64) -> WasmColumnValue {
2996        WasmColumnValue {
2997            inner: ColumnValue::Integer(value),
2998        }
2999    }
3000
3001    #[wasm_bindgen(js_name = "createReal")]
3002    pub fn create_real(value: f64) -> WasmColumnValue {
3003        WasmColumnValue {
3004            inner: ColumnValue::Real(value),
3005        }
3006    }
3007
3008    #[wasm_bindgen(js_name = "createText")]
3009    pub fn create_text(value: String) -> WasmColumnValue {
3010        WasmColumnValue {
3011            inner: ColumnValue::Text(value),
3012        }
3013    }
3014
3015    #[wasm_bindgen(js_name = "createBlob")]
3016    pub fn create_blob(value: &[u8]) -> WasmColumnValue {
3017        WasmColumnValue {
3018            inner: ColumnValue::Blob(value.to_vec()),
3019        }
3020    }
3021
3022    #[wasm_bindgen(js_name = "createBigInt")]
3023    pub fn create_bigint(value: &str) -> WasmColumnValue {
3024        WasmColumnValue {
3025            inner: ColumnValue::BigInt(value.to_string()),
3026        }
3027    }
3028
3029    #[wasm_bindgen(js_name = "createDate")]
3030    pub fn create_date(timestamp: f64) -> WasmColumnValue {
3031        WasmColumnValue {
3032            inner: ColumnValue::Date(timestamp as i64),
3033        }
3034    }
3035
3036    #[wasm_bindgen(js_name = "fromJsValue")]
3037    pub fn from_js_value(value: &JsValue) -> WasmColumnValue {
3038        if value.is_null() || value.is_undefined() {
3039            WasmColumnValue {
3040                inner: ColumnValue::Null,
3041            }
3042        } else if let Some(s) = value.as_string() {
3043            // Check if it's a large number string
3044            if let Ok(parsed) = s.parse::<i64>() {
3045                WasmColumnValue {
3046                    inner: ColumnValue::Integer(parsed),
3047                }
3048            } else {
3049                WasmColumnValue {
3050                    inner: ColumnValue::Text(s),
3051                }
3052            }
3053        } else if let Some(n) = value.as_f64() {
3054            if n.fract() == 0.0 && n >= i64::MIN as f64 && n <= i64::MAX as f64 {
3055                WasmColumnValue {
3056                    inner: ColumnValue::Integer(n as i64),
3057                }
3058            } else {
3059                WasmColumnValue {
3060                    inner: ColumnValue::Real(n),
3061                }
3062            }
3063        } else if value.is_object() {
3064            // Check if it's a Date
3065            if js_sys::Date::new(value).get_time().is_finite() {
3066                let timestamp = js_sys::Date::new(value).get_time() as i64;
3067                WasmColumnValue {
3068                    inner: ColumnValue::Date(timestamp),
3069                }
3070            } else {
3071                // Convert to string for other objects
3072                WasmColumnValue {
3073                    inner: ColumnValue::Text(format!("{:?}", value)),
3074                }
3075            }
3076        } else {
3077            WasmColumnValue {
3078                inner: ColumnValue::Null,
3079            }
3080        }
3081    }
3082
3083    // --- Rust-friendly alias constructors used in wasm tests ---
3084    // These mirror the create* methods but with simpler names and
3085    // argument types matching test usage.
3086    pub fn null() -> WasmColumnValue {
3087        Self::create_null()
3088    }
3089
3090    // Tests call integer(42.0), so accept f64 and cast to i64.
3091    pub fn integer(value: f64) -> WasmColumnValue {
3092        Self::create_integer(value as i64)
3093    }
3094
3095    pub fn real(value: f64) -> WasmColumnValue {
3096        Self::create_real(value)
3097    }
3098
3099    pub fn text(value: String) -> WasmColumnValue {
3100        Self::create_text(value)
3101    }
3102
3103    pub fn blob(value: Vec<u8>) -> WasmColumnValue {
3104        Self::create_blob(&value)
3105    }
3106
3107    pub fn big_int(value: String) -> WasmColumnValue {
3108        Self::create_bigint(&value)
3109    }
3110
3111    pub fn date(timestamp_ms: f64) -> WasmColumnValue {
3112        Self::create_date(timestamp_ms)
3113    }
3114}