absurder_sql/
lib.rs

1#[cfg(target_arch = "wasm32")]
2use wasm_bindgen::prelude::*;
3
4// Conditional rusqlite import: same crate, different features
5// Make this public so child crates can use it
6// When encryption is enabled, rusqlite uses bundled-sqlcipher-vendored-openssl feature
7// When bundled-sqlite is enabled, rusqlite uses bundled feature
8#[cfg(all(not(target_arch = "wasm32"), any(feature = "bundled-sqlite", feature = "encryption", feature = "encryption-commoncrypto", feature = "encryption-ios")))]
9pub extern crate rusqlite;
10
11// Enable better panic messages and memory allocation
12#[cfg(feature = "console_error_panic_hook")]
13pub use console_error_panic_hook::set_once as set_panic_hook;
14
15#[cfg(feature = "wee_alloc")]
16#[global_allocator]
17static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT;
18
19// Initialize logging infrastructure for WASM
20#[cfg(all(target_arch = "wasm32", feature = "console_log"))]
21#[wasm_bindgen(start)]
22pub fn init_logger() {
23    // Initialize console_log for browser logging
24    // Use Info level for production, Debug for development
25    #[cfg(debug_assertions)]
26    let log_level = log::Level::Debug;
27    #[cfg(not(debug_assertions))]
28    let log_level = log::Level::Info;
29    
30    console_log::init_with_level(log_level)
31        .expect("Failed to initialize console_log");
32    
33    log::info!("AbsurderSQL logging initialized at level: {:?}", log_level);
34}
35
36// Module declarations
37pub mod types;
38pub mod storage;
39pub mod vfs;
40#[cfg(not(target_arch = "wasm32"))]
41pub mod database;
42#[cfg(not(target_arch = "wasm32"))]
43pub use database::PreparedStatement;
44pub mod utils;
45#[cfg(feature = "telemetry")]
46pub mod telemetry;
47
48// Re-export main public API
49#[cfg(not(target_arch = "wasm32"))]
50pub use database::SqliteIndexedDB;
51
52// Type alias for native platforms
53#[cfg(not(target_arch = "wasm32"))]
54pub type Database = SqliteIndexedDB;
55
56pub use types::DatabaseConfig;
57pub use types::{QueryResult, ColumnValue, DatabaseError, TransactionOptions, Row};
58
59// Re-export VFS
60pub use vfs::indexeddb_vfs::IndexedDBVFS;
61
62// WASM Database implementation using sqlite-wasm-rs
63#[cfg(target_arch = "wasm32")]
64#[wasm_bindgen]
65pub struct Database {
66    db: *mut sqlite_wasm_rs::sqlite3,
67    #[allow(dead_code)]
68    name: String,
69    #[wasm_bindgen(skip)]
70    on_data_change_callback: Option<js_sys::Function>,
71    #[wasm_bindgen(skip)]
72    allow_non_leader_writes: bool,
73    #[wasm_bindgen(skip)]
74    optimistic_updates_manager: std::cell::RefCell<crate::storage::optimistic_updates::OptimisticUpdatesManager>,
75    #[wasm_bindgen(skip)]
76    coordination_metrics_manager: std::cell::RefCell<crate::storage::coordination_metrics::CoordinationMetricsManager>,
77    #[wasm_bindgen(skip)]
78    #[cfg(feature = "telemetry")]
79    metrics: Option<crate::telemetry::Metrics>,
80    #[wasm_bindgen(skip)]
81    #[cfg(feature = "telemetry")]
82    span_recorder: Option<crate::telemetry::SpanRecorder>,
83    #[wasm_bindgen(skip)]
84    #[cfg(feature = "telemetry")]
85    span_context: Option<crate::telemetry::SpanContext>,
86    #[wasm_bindgen(skip)]
87    max_export_size_bytes: Option<u64>,
88}
89
90#[cfg(target_arch = "wasm32")]
91impl Database {
92    /// Check if a SQL statement is a write operation
93    fn is_write_operation(sql: &str) -> bool {
94        let upper = sql.trim().to_uppercase();
95        upper.starts_with("INSERT") 
96            || upper.starts_with("UPDATE")
97            || upper.starts_with("DELETE")
98            || upper.starts_with("REPLACE")
99    }
100    
101    /// Get metrics for observability
102    ///
103    /// Returns a reference to the Metrics instance for tracking queries, errors, and performance
104    #[cfg(feature = "telemetry")]
105    pub fn metrics(&self) -> Option<&crate::telemetry::Metrics> {
106        self.metrics.as_ref()
107    }
108    
109    /// Check write permission - only leader can write (unless override enabled)
110    async fn check_write_permission(&mut self, sql: &str) -> Result<(), DatabaseError> {
111        if !Self::is_write_operation(sql) {
112            // Not a write operation, allow it
113            return Ok(());
114        }
115        
116        // Check if non-leader writes are allowed
117        if self.allow_non_leader_writes {
118            log::info!("WRITE_ALLOWED: Non-leader writes enabled for {}", self.name);
119            return Ok(());
120        }
121        
122        // Check if this instance is the leader
123        use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
124        
125        let db_name = &self.name;
126        let storage_rc = STORAGE_REGISTRY.with(|reg| {
127            let registry = reg.borrow();
128            registry.get(db_name).cloned()
129                .or_else(|| registry.get(&format!("{}.db", db_name)).cloned())
130                .or_else(|| {
131                    if db_name.ends_with(".db") {
132                        registry.get(&db_name[..db_name.len()-3]).cloned()
133                    } else {
134                        None
135                    }
136                })
137        });
138        
139        if let Some(storage) = storage_rc {
140            let mut storage_mut = storage.borrow_mut();
141            let is_leader = storage_mut.is_leader().await;
142            
143            if !is_leader {
144                log::error!("WRITE_DENIED: Instance is not leader for {}", db_name);
145                return Err(DatabaseError::new(
146                    "WRITE_PERMISSION_DENIED",
147                    "Only the leader tab can write to this database. Use db.isLeader() to check status or call db.allowNonLeaderWrites(true) for single-tab mode."
148                ));
149            }
150            
151            log::info!("WRITE_ALLOWED: Instance is leader for {}", db_name);
152            Ok(())
153        } else {
154            // No storage found - allow by default (single-instance mode)
155            log::info!("WRITE_ALLOWED: No storage found for {} (single-instance mode)", db_name);
156            Ok(())
157        }
158    }
159    
160    pub async fn new(config: DatabaseConfig) -> Result<Self, DatabaseError> {
161        use std::ffi::{CString, CStr};
162        
163        // Use IndexedDB VFS for persistent storage
164        log::debug!("Creating IndexedDBVFS for: {}", config.name);
165        let vfs = crate::vfs::IndexedDBVFS::new(&config.name).await?;
166        log::debug!("Registering VFS as 'indexeddb'");
167        vfs.register("indexeddb")?;
168        log::info!("VFS registered successfully");
169        
170        let mut db = std::ptr::null_mut();
171        let filename = if config.name.ends_with(".db") {
172            config.name.clone()
173        } else {
174            format!("{}.db", config.name)
175        };
176        
177        let db_name = CString::new(filename.clone())
178            .map_err(|_| DatabaseError::new("INVALID_NAME", "Invalid database name"))?;
179        let vfs_name = CString::new("indexeddb")
180            .map_err(|_| DatabaseError::new("INVALID_VFS", "Invalid VFS name"))?;
181        
182        log::debug!("Opening database: {} with VFS: indexeddb", filename);
183        let ret = unsafe {
184            sqlite_wasm_rs::sqlite3_open_v2(
185                db_name.as_ptr(),
186                &mut db as *mut _,
187                sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
188                vfs_name.as_ptr()
189            )
190        };
191        log::debug!("sqlite3_open_v2 returned: {}", ret);
192        
193        if ret != sqlite_wasm_rs::SQLITE_OK {
194            let err_msg = unsafe {
195                let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
196                if !msg_ptr.is_null() {
197                    CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
198                } else {
199                    "Unknown error".to_string()
200                }
201            };
202            return Err(DatabaseError::new("SQLITE_ERROR", &format!("Failed to open database with IndexedDB VFS: {}", err_msg)));
203        }
204        
205        log::info!("Database opened successfully with IndexedDB VFS");
206        
207        // Apply configuration options via PRAGMA statements
208        let exec_sql = |db: *mut sqlite_wasm_rs::sqlite3, sql: &str| -> Result<(), DatabaseError> {
209            let c_sql = CString::new(sql)
210                .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL statement"))?;
211            
212            let ret = unsafe {
213                sqlite_wasm_rs::sqlite3_exec(
214                    db,
215                    c_sql.as_ptr(),
216                    None,
217                    std::ptr::null_mut(),
218                    std::ptr::null_mut()
219                )
220            };
221            
222            if ret != sqlite_wasm_rs::SQLITE_OK {
223                let err_msg = unsafe {
224                    let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
225                    if !msg_ptr.is_null() {
226                        CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
227                    } else {
228                        "Unknown error".to_string()
229                    }
230                };
231                log::warn!("Failed to execute SQL '{}': {}", sql, err_msg);
232                return Err(DatabaseError::new("SQLITE_ERROR", &format!("Failed to execute: {}", err_msg)));
233            }
234            Ok(())
235        };
236        
237        // Apply page_size (must be set before any tables are created)
238        if let Some(page_size) = config.page_size {
239            log::debug!("Setting page_size to {}", page_size);
240            exec_sql(db, &format!("PRAGMA page_size = {}", page_size))?;
241        }
242        
243        // Apply cache_size
244        if let Some(cache_size) = config.cache_size {
245            log::debug!("Setting cache_size to {}", cache_size);
246            exec_sql(db, &format!("PRAGMA cache_size = {}", cache_size))?;
247        }
248        
249        // Apply journal_mode
250        // WAL mode is now fully supported via shared memory (xShm*) implementation
251        if let Some(ref journal_mode) = config.journal_mode {
252            log::debug!("Setting journal_mode to {}", journal_mode);
253            
254            let pragma_sql = format!("PRAGMA journal_mode = {}", journal_mode);
255            let c_sql = CString::new(pragma_sql.as_str())
256                .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL statement"))?;
257            
258            let mut stmt: *mut sqlite_wasm_rs::sqlite3_stmt = std::ptr::null_mut();
259            let ret = unsafe {
260                sqlite_wasm_rs::sqlite3_prepare_v2(
261                    db,
262                    c_sql.as_ptr(),
263                    -1,
264                    &mut stmt as *mut _,
265                    std::ptr::null_mut()
266                )
267            };
268            
269            if ret == sqlite_wasm_rs::SQLITE_OK && !stmt.is_null() {
270                let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
271                if step_ret == sqlite_wasm_rs::SQLITE_ROW {
272                    let result_ptr = unsafe { sqlite_wasm_rs::sqlite3_column_text(stmt, 0) };
273                    if !result_ptr.is_null() {
274                        let result_mode = unsafe {
275                            std::ffi::CStr::from_ptr(result_ptr as *const i8)
276                                .to_string_lossy()
277                                .to_uppercase()
278                        };
279                        
280                        if result_mode != journal_mode.to_uppercase() {
281                            log::warn!(
282                                "journal_mode {} requested but SQLite set {}",
283                                journal_mode, result_mode
284                            );
285                        } else {
286                            log::info!("journal_mode successfully set to {}", result_mode);
287                        }
288                    }
289                }
290                unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
291            } else {
292                log::warn!("Failed to prepare journal_mode PRAGMA");
293            }
294        }
295        
296        // Apply auto_vacuum (must be set before any tables are created)
297        if let Some(auto_vacuum) = config.auto_vacuum {
298            let vacuum_mode = if auto_vacuum { 1 } else { 0 }; // 0=none, 1=full, 2=incremental
299            log::debug!("Setting auto_vacuum to {}", vacuum_mode);
300            exec_sql(db, &format!("PRAGMA auto_vacuum = {}", vacuum_mode))?;
301        }
302        
303        log::info!("Database configuration applied successfully");
304        
305        // Initialize metrics for telemetry
306        #[cfg(feature = "telemetry")]
307        let metrics = crate::telemetry::Metrics::new()
308            .map_err(|e| DatabaseError::new("METRICS_ERROR", &format!("Failed to initialize metrics: {}", e)))?;
309        
310        Ok(Database {
311            db,
312            name: config.name.clone(),
313            on_data_change_callback: None,
314            allow_non_leader_writes: false,
315            optimistic_updates_manager: std::cell::RefCell::new(crate::storage::optimistic_updates::OptimisticUpdatesManager::new()),
316            coordination_metrics_manager: std::cell::RefCell::new(crate::storage::coordination_metrics::CoordinationMetricsManager::new()),
317            #[cfg(feature = "telemetry")]
318            metrics: Some(metrics),
319            #[cfg(feature = "telemetry")]
320            span_recorder: None,
321            #[cfg(feature = "telemetry")]
322            span_context: Some(crate::telemetry::SpanContext::new()),
323            max_export_size_bytes: config.max_export_size_bytes,
324        })
325    }
326    
327    /// Open a database with a specific VFS
328    pub async fn open_with_vfs(filename: &str, vfs_name: &str) -> Result<Self, DatabaseError> {
329        use std::ffi::CString;
330        
331        log::info!("Opening database {} with VFS {}", filename, vfs_name);
332        
333        let mut db: *mut sqlite_wasm_rs::sqlite3 = std::ptr::null_mut();
334        let db_name = CString::new(filename)
335            .map_err(|_| DatabaseError::new("INVALID_NAME", "Invalid database name"))?;
336        let vfs_cstr = CString::new(vfs_name)
337            .map_err(|_| DatabaseError::new("INVALID_VFS", "Invalid VFS name"))?;
338        
339        let ret = unsafe {
340            sqlite_wasm_rs::sqlite3_open_v2(
341                db_name.as_ptr(),
342                &mut db as *mut _,
343                sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
344                vfs_cstr.as_ptr()
345            )
346        };
347        
348        if ret != sqlite_wasm_rs::SQLITE_OK {
349            let err_msg = if !db.is_null() {
350                unsafe {
351                    let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
352                    if !msg_ptr.is_null() {
353                        std::ffi::CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
354                    } else {
355                        "Unknown error".to_string()
356                    }
357                }
358            } else {
359                "Failed to open database".to_string()
360            };
361            return Err(DatabaseError::new("SQLITE_ERROR", &err_msg));
362        }
363        
364        // Extract database name from filename (strip "file:" prefix if present)
365        let name = filename.strip_prefix("file:").unwrap_or(filename)
366            .strip_suffix(".db").unwrap_or(filename)
367            .to_string();
368        
369        log::info!("Successfully opened database {} with VFS {}", name, vfs_name);
370        
371        // Initialize metrics for telemetry
372        #[cfg(feature = "telemetry")]
373        let metrics = crate::telemetry::Metrics::new()
374            .map_err(|e| DatabaseError::new("METRICS_ERROR", &format!("Failed to initialize metrics: {}", e)))?;
375        
376        Ok(Database {
377            db,
378            name,
379            on_data_change_callback: None,
380            allow_non_leader_writes: false,
381            optimistic_updates_manager: std::cell::RefCell::new(crate::storage::optimistic_updates::OptimisticUpdatesManager::new()),
382            coordination_metrics_manager: std::cell::RefCell::new(crate::storage::coordination_metrics::CoordinationMetricsManager::new()),
383            #[cfg(feature = "telemetry")]
384            metrics: Some(metrics),
385            #[cfg(feature = "telemetry")]
386            span_recorder: None,
387            #[cfg(feature = "telemetry")]
388            span_context: Some(crate::telemetry::SpanContext::new()),
389            max_export_size_bytes: Some(2 * 1024 * 1024 * 1024), // Default 2GB limit
390        })
391    }
392    
393    pub async fn execute_internal(&mut self, sql: &str) -> Result<QueryResult, DatabaseError> {
394        use std::ffi::CString;
395        let start_time = js_sys::Date::now();
396        
397        // Create span for query execution and enter context
398        #[cfg(feature = "telemetry")]
399        let span = if self.span_recorder.is_some() {
400            let query_type = sql.trim().split_whitespace().next().unwrap_or("UNKNOWN").to_uppercase();
401            let mut builder = crate::telemetry::SpanBuilder::new("execute_query".to_string())
402                .with_attribute("query_type", query_type.clone())
403                .with_attribute("sql", sql.to_string());
404            
405            // Attach baggage from context
406            if let Some(ref context) = self.span_context {
407                builder = builder.with_baggage_from_context(context);
408            }
409            
410            let span = builder.build();
411            
412            // Enter span context
413            if let Some(ref context) = self.span_context {
414                context.enter_span(span.span_id.clone());
415            }
416            
417            Some(span)
418        } else {
419            None
420        };
421        
422        // Track query execution metrics
423        #[cfg(feature = "telemetry")]
424        #[cfg(feature = "telemetry")]
425        if let Some(metrics) = &self.metrics {
426            metrics.queries_total().inc();
427        }
428        
429        let sql_cstr = CString::new(sql)
430            .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL string"))?;
431        
432        if sql.trim().to_uppercase().starts_with("SELECT") {
433            let mut stmt = std::ptr::null_mut();
434            let ret = unsafe {
435                sqlite_wasm_rs::sqlite3_prepare_v2(
436                    self.db,
437                    sql_cstr.as_ptr(),
438                    -1,
439                    &mut stmt,
440                    std::ptr::null_mut()
441                )
442            };
443            
444            if ret != sqlite_wasm_rs::SQLITE_OK {
445                // Track error
446                #[cfg(feature = "telemetry")]
447        #[cfg(feature = "telemetry")]
448                if let Some(metrics) = &self.metrics {
449                    metrics.errors_total().inc();
450                }
451                
452                // Finish span with error
453                #[cfg(feature = "telemetry")]
454                if let Some(mut s) = span {
455                    s.status = crate::telemetry::SpanStatus::Error("Failed to prepare statement".to_string());
456                    s.end_time_ms = Some(js_sys::Date::now());
457                    if let Some(recorder) = &self.span_recorder {
458                        recorder.record_span(s);
459                    }
460                    
461                    // Exit span context
462                    if let Some(ref context) = self.span_context {
463                        context.exit_span();
464                    }
465                }
466                
467                return Err(DatabaseError::new("SQLITE_ERROR", "Failed to prepare statement").with_sql(sql));
468            }
469            
470            let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
471            let mut columns = Vec::new();
472            let mut rows = Vec::new();
473            
474            // Get column names
475            for i in 0..column_count {
476                let col_name = unsafe {
477                    let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
478                    if name_ptr.is_null() {
479                        format!("col_{}", i)
480                    } else {
481                        std::ffi::CStr::from_ptr(name_ptr).to_string_lossy().into_owned()
482                    }
483                };
484                columns.push(col_name);
485            }
486            
487            // Execute and fetch rows
488            loop {
489                let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
490                if step_ret == sqlite_wasm_rs::SQLITE_ROW {
491                    let mut values = Vec::new();
492                    for i in 0..column_count {
493                        let value = unsafe {
494                            let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
495                            match col_type {
496                                sqlite_wasm_rs::SQLITE_NULL => ColumnValue::Null,
497                                sqlite_wasm_rs::SQLITE_INTEGER => {
498                                    let val = sqlite_wasm_rs::sqlite3_column_int64(stmt, i);
499                                    ColumnValue::Integer(val)
500                                },
501                                sqlite_wasm_rs::SQLITE_FLOAT => {
502                                    let val = sqlite_wasm_rs::sqlite3_column_double(stmt, i);
503                                    ColumnValue::Real(val)
504                                },
505                                sqlite_wasm_rs::SQLITE_TEXT => {
506                                    let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
507                                    if text_ptr.is_null() {
508                                        ColumnValue::Null
509                                    } else {
510                                        let text = std::ffi::CStr::from_ptr(text_ptr as *const i8).to_string_lossy().into_owned();
511                                        ColumnValue::Text(text)
512                                    }
513                                },
514                                sqlite_wasm_rs::SQLITE_BLOB => {
515                                    let blob_ptr = sqlite_wasm_rs::sqlite3_column_blob(stmt, i);
516                                    let blob_size = sqlite_wasm_rs::sqlite3_column_bytes(stmt, i);
517                                    if blob_ptr.is_null() || blob_size == 0 {
518                                        ColumnValue::Blob(vec![])
519                                    } else {
520                                        let blob_slice = std::slice::from_raw_parts(blob_ptr as *const u8, blob_size as usize);
521                                        ColumnValue::Blob(blob_slice.to_vec())
522                                    }
523                                },
524                                _ => ColumnValue::Null,
525                            }
526                        };
527                        values.push(value);
528                    }
529                    rows.push(Row { values });
530                } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
531                    break;
532                } else {
533                    // Get SQLite error message before finalizing
534                    let err_msg = unsafe {
535                        let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db);
536                        if !err_ptr.is_null() {
537                            std::ffi::CStr::from_ptr(err_ptr)
538                                .to_string_lossy()
539                                .to_string()
540                        } else {
541                            "Unknown SQLite error".to_string()
542                        }
543                    };
544                    unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
545                    // Track error
546        #[cfg(feature = "telemetry")]
547                    if let Some(metrics) = &self.metrics {
548                        metrics.errors_total().inc();
549                    }
550                    return Err(DatabaseError::new("SQLITE_ERROR", &format!("Error executing SELECT statement: {}", err_msg)).with_sql(sql));
551                }
552            }
553            
554            unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
555            let execution_time_ms = js_sys::Date::now() - start_time;
556            
557            // Track query duration
558        #[cfg(feature = "telemetry")]
559            if let Some(metrics) = &self.metrics {
560                metrics.query_duration().observe(execution_time_ms);
561            }
562            
563            Ok(QueryResult {
564                columns,
565                rows,
566                affected_rows: 0,
567                last_insert_id: None,
568                execution_time_ms,
569            })
570        } else {
571            // Non-SELECT statements - Use prepare/step to properly handle PRAGMA results
572            let mut stmt: *mut sqlite_wasm_rs::sqlite3_stmt = std::ptr::null_mut();
573            let ret = unsafe {
574                sqlite_wasm_rs::sqlite3_prepare_v2(
575                    self.db,
576                    sql_cstr.as_ptr(),
577                    -1,
578                    &mut stmt,
579                    std::ptr::null_mut()
580                )
581            };
582            
583            if ret != sqlite_wasm_rs::SQLITE_OK {
584                // Track error
585        #[cfg(feature = "telemetry")]
586                if let Some(metrics) = &self.metrics {
587                    metrics.errors_total().inc();
588                }
589                return Err(DatabaseError::new("SQLITE_ERROR", "Failed to prepare statement").with_sql(sql));
590            }
591            
592            // Get column info for PRAGMA statements that return results
593            let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
594            let mut columns = Vec::new();
595            let mut rows = Vec::new();
596            
597            if column_count > 0 {
598                // This is a PRAGMA or other statement that returns rows
599                for i in 0..column_count {
600                    let col_name = unsafe {
601                        let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
602                        if name_ptr.is_null() {
603                            format!("column_{}", i)
604                        } else {
605                            std::ffi::CStr::from_ptr(name_ptr).to_string_lossy().into_owned()
606                        }
607                    };
608                    columns.push(col_name);
609                }
610                
611                // Fetch all rows
612                loop {
613                    let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
614                    if step_ret == sqlite_wasm_rs::SQLITE_ROW {
615                        let mut values = Vec::new();
616                        for i in 0..column_count {
617                            let value = unsafe {
618                                let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
619                                match col_type {
620                                    sqlite_wasm_rs::SQLITE_TEXT => {
621                                        let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
622                                        if text_ptr.is_null() {
623                                            ColumnValue::Null
624                                        } else {
625                                            let text = std::ffi::CStr::from_ptr(text_ptr as *const i8).to_string_lossy().into_owned();
626                                            ColumnValue::Text(text)
627                                        }
628                                    },
629                                    sqlite_wasm_rs::SQLITE_INTEGER => {
630                                        ColumnValue::Integer(sqlite_wasm_rs::sqlite3_column_int64(stmt, i))
631                                    },
632                                    _ => ColumnValue::Null,
633                                }
634                            };
635                            values.push(value);
636                        }
637                        rows.push(Row { values });
638                    } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
639                        break;
640                    } else {
641                        // Get SQLite error message before finalizing
642                        let err_msg = unsafe {
643                            let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db);
644                            if !err_ptr.is_null() {
645                                std::ffi::CStr::from_ptr(err_ptr)
646                                    .to_string_lossy()
647                                    .to_string()
648                            } else {
649                                "Unknown SQLite error".to_string()
650                            }
651                        };
652                        unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
653                        // Track error
654        #[cfg(feature = "telemetry")]
655                        if let Some(metrics) = &self.metrics {
656                            metrics.errors_total().inc();
657                        }
658                        return Err(DatabaseError::new("SQLITE_ERROR", &format!("Failed to execute statement: {}", err_msg)).with_sql(sql));
659                    }
660                }
661            } else {
662                // Regular non-SELECT statement
663                let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
664                if step_ret != sqlite_wasm_rs::SQLITE_DONE {
665                    // Get SQLite error message before finalizing
666                    let err_msg = unsafe {
667                        let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db);
668                        if !err_ptr.is_null() {
669                            std::ffi::CStr::from_ptr(err_ptr)
670                                .to_string_lossy()
671                                .to_string()
672                        } else {
673                            "Unknown SQLite error".to_string()
674                        }
675                    };
676                    unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
677                    // Track error
678        #[cfg(feature = "telemetry")]
679                    if let Some(metrics) = &self.metrics {
680                        metrics.errors_total().inc();
681                    }
682                    return Err(DatabaseError::new("SQLITE_ERROR", &format!("Failed to execute statement: {}", err_msg)).with_sql(sql));
683                }
684            }
685            
686            // Finalize to complete the statement
687            unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
688            
689            let affected_rows = unsafe { sqlite_wasm_rs::sqlite3_changes(self.db) } as u32;
690            let last_insert_id = if sql.trim().to_uppercase().starts_with("INSERT") {
691                Some(unsafe { sqlite_wasm_rs::sqlite3_last_insert_rowid(self.db) })
692            } else {
693                None
694            };
695            
696            let execution_time_ms = js_sys::Date::now() - start_time;
697            
698            // Track query duration
699        #[cfg(feature = "telemetry")]
700            if let Some(metrics) = &self.metrics {
701                metrics.query_duration().observe(execution_time_ms);
702            }
703            
704            // Finish span successfully
705            #[cfg(feature = "telemetry")]
706            if let Some(mut s) = span {
707                s.status = crate::telemetry::SpanStatus::Ok;
708                s.end_time_ms = Some(js_sys::Date::now());
709                s.attributes.insert("duration_ms".to_string(), execution_time_ms.to_string());
710                s.attributes.insert("affected_rows".to_string(), affected_rows.to_string());
711                s.attributes.insert("row_count".to_string(), rows.len().to_string());
712                if let Some(recorder) = &self.span_recorder {
713                    recorder.record_span(s);
714                }
715                
716                // Exit span context
717                if let Some(ref context) = self.span_context {
718                    context.exit_span();
719                }
720            }
721            
722            Ok(QueryResult {
723                columns,
724                rows,
725                affected_rows,
726                last_insert_id,
727                execution_time_ms,
728            })
729        }
730    }
731    
732    pub async fn execute_with_params_internal(&mut self, sql: &str, params: &[ColumnValue]) -> Result<QueryResult, DatabaseError> {
733        use std::ffi::CString;
734        let start_time = js_sys::Date::now();
735        
736        // Create span for query execution
737        #[cfg(feature = "telemetry")]
738        let span = if self.span_recorder.is_some() {
739            let query_type = sql.trim().split_whitespace().next().unwrap_or("UNKNOWN").to_uppercase();
740            let span = crate::telemetry::SpanBuilder::new("execute_query".to_string())
741                .with_attribute("query_type", query_type.clone())
742                .with_attribute("sql", sql.to_string())
743                .build();
744            Some(span)
745        } else {
746            None
747        };
748        
749        // Ensure metrics are propagated to BlockStorage before execution
750        #[cfg(feature = "telemetry")]
751        self.ensure_metrics_propagated();
752        
753        // Track query execution metrics
754        #[cfg(feature = "telemetry")]
755        if let Some(metrics) = &self.metrics {
756            metrics.queries_total().inc();
757        }
758        
759        let sql_cstr = CString::new(sql)
760            .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL string"))?;
761        
762        let mut stmt = std::ptr::null_mut();
763        let ret = unsafe {
764            sqlite_wasm_rs::sqlite3_prepare_v2(
765                self.db,
766                sql_cstr.as_ptr(),
767                -1,
768                &mut stmt,
769                std::ptr::null_mut()
770            )
771        };
772        
773        if ret != sqlite_wasm_rs::SQLITE_OK {
774            // Track error
775        #[cfg(feature = "telemetry")]
776            if let Some(metrics) = &self.metrics {
777                metrics.errors_total().inc();
778            }
779            
780            // Finish span with error
781            #[cfg(feature = "telemetry")]
782            if let Some(mut s) = span {
783                s.status = crate::telemetry::SpanStatus::Error("Failed to prepare statement".to_string());
784                s.end_time_ms = Some(js_sys::Date::now());
785                if let Some(recorder) = &self.span_recorder {
786                    recorder.record_span(s);
787                }
788            }
789            
790            return Err(DatabaseError::new("SQLITE_ERROR", "Failed to prepare statement").with_sql(sql));
791        }
792        
793        // Bind parameters
794        let mut text_cstrings = Vec::new(); // Keep CStrings alive
795        for (i, param) in params.iter().enumerate() {
796            let param_index = (i + 1) as i32;
797            let bind_ret = unsafe {
798                match param {
799                    ColumnValue::Null => sqlite_wasm_rs::sqlite3_bind_null(stmt, param_index),
800                    ColumnValue::Integer(val) => sqlite_wasm_rs::sqlite3_bind_int64(stmt, param_index, *val),
801                    ColumnValue::Real(val) => sqlite_wasm_rs::sqlite3_bind_double(stmt, param_index, *val),
802                    ColumnValue::Text(val) => {
803                        // Sanitize string by removing null bytes (SQLite text shouldn't contain them)
804                        let sanitized = val.replace('\0', "");
805                        // Safe: after removing null bytes, CString::new cannot fail
806                        let text_cstr = CString::new(sanitized.as_str())
807                            .expect("CString::new should not fail after null byte removal");
808                        let result = sqlite_wasm_rs::sqlite3_bind_text(
809                            stmt, 
810                            param_index, 
811                            text_cstr.as_ptr(), 
812                            sanitized.len() as i32, 
813                            sqlite_wasm_rs::SQLITE_TRANSIENT()
814                        );
815                        text_cstrings.push(text_cstr); // Keep alive
816                        result
817                    },
818                    ColumnValue::Blob(val) => {
819                        sqlite_wasm_rs::sqlite3_bind_blob(
820                            stmt, 
821                            param_index, 
822                            val.as_ptr() as *const _, 
823                            val.len() as i32, 
824                            sqlite_wasm_rs::SQLITE_TRANSIENT()
825                        )
826                    },
827                    _ => sqlite_wasm_rs::sqlite3_bind_null(stmt, param_index),
828                }
829            };
830            
831            if bind_ret != sqlite_wasm_rs::SQLITE_OK {
832                unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
833                // Track error
834        #[cfg(feature = "telemetry")]
835                if let Some(metrics) = &self.metrics {
836                    metrics.errors_total().inc();
837                }
838                return Err(DatabaseError::new("SQLITE_ERROR", "Failed to bind parameter").with_sql(sql));
839            }
840        }
841        
842        if sql.trim().to_uppercase().starts_with("SELECT") {
843            let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
844            let mut columns = Vec::new();
845            let mut rows = Vec::new();
846            
847            // Get column names
848            for i in 0..column_count {
849                let col_name = unsafe {
850                    let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
851                    if name_ptr.is_null() {
852                        format!("col_{}", i)
853                    } else {
854                        std::ffi::CStr::from_ptr(name_ptr).to_string_lossy().into_owned()
855                    }
856                };
857                columns.push(col_name);
858            }
859            
860            // Execute and fetch rows
861            loop {
862                let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
863                if step_ret == sqlite_wasm_rs::SQLITE_ROW {
864                    let mut values = Vec::new();
865                    for i in 0..column_count {
866                        let value = unsafe {
867                            let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
868                            match col_type {
869                                sqlite_wasm_rs::SQLITE_NULL => ColumnValue::Null,
870                                sqlite_wasm_rs::SQLITE_INTEGER => {
871                                    let val = sqlite_wasm_rs::sqlite3_column_int64(stmt, i);
872                                    ColumnValue::Integer(val)
873                                },
874                                sqlite_wasm_rs::SQLITE_FLOAT => {
875                                    let val = sqlite_wasm_rs::sqlite3_column_double(stmt, i);
876                                    ColumnValue::Real(val)
877                                },
878                                sqlite_wasm_rs::SQLITE_TEXT => {
879                                    let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
880                                    if text_ptr.is_null() {
881                                        ColumnValue::Null
882                                    } else {
883                                        let text = std::ffi::CStr::from_ptr(text_ptr as *const i8).to_string_lossy().into_owned();
884                                        ColumnValue::Text(text)
885                                    }
886                                },
887                                sqlite_wasm_rs::SQLITE_BLOB => {
888                                    let blob_ptr = sqlite_wasm_rs::sqlite3_column_blob(stmt, i);
889                                    let blob_size = sqlite_wasm_rs::sqlite3_column_bytes(stmt, i);
890                                    if blob_ptr.is_null() || blob_size == 0 {
891                                        ColumnValue::Blob(vec![])
892                                    } else {
893                                        let blob_slice = std::slice::from_raw_parts(blob_ptr as *const u8, blob_size as usize);
894                                        ColumnValue::Blob(blob_slice.to_vec())
895                                    }
896                                },
897                                _ => ColumnValue::Null,
898                            }
899                        };
900                        values.push(value);
901                    }
902                    rows.push(Row { values });
903                } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
904                    break;
905                } else {
906                    // Get SQLite error message before finalizing
907                    let err_msg = unsafe {
908                        let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db);
909                        if !err_ptr.is_null() {
910                            std::ffi::CStr::from_ptr(err_ptr)
911                                .to_string_lossy()
912                                .to_string()
913                        } else {
914                            "Unknown SQLite error".to_string()
915                        }
916                    };
917                    unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
918                    // Track error
919        #[cfg(feature = "telemetry")]
920                    if let Some(metrics) = &self.metrics {
921                        metrics.errors_total().inc();
922                    }
923                    return Err(DatabaseError::new("SQLITE_ERROR", &format!("Error executing SELECT statement: {}", err_msg)).with_sql(sql));
924                }
925            }
926            
927            unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
928            
929            
930            let execution_time_ms = js_sys::Date::now() - start_time;
931            
932            // Track query duration
933        #[cfg(feature = "telemetry")]
934            if let Some(metrics) = &self.metrics {
935                metrics.query_duration().observe(execution_time_ms);
936            }
937            
938            // Finish span successfully for SELECT query
939            #[cfg(feature = "telemetry")]
940            if let Some(mut s) = span {
941                s.status = crate::telemetry::SpanStatus::Ok;
942                s.end_time_ms = Some(js_sys::Date::now());
943                s.attributes.insert("duration_ms".to_string(), execution_time_ms.to_string());
944                s.attributes.insert("row_count".to_string(), rows.len().to_string());
945                if let Some(recorder) = &self.span_recorder {
946                    recorder.record_span(s);
947                }
948            }
949            
950            Ok(QueryResult {
951                columns,
952                rows,
953                affected_rows: 0,
954                last_insert_id: None,
955                execution_time_ms,
956            })
957        } else {
958            // Non-SELECT statements
959            let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
960                // Track error
961        #[cfg(feature = "telemetry")]
962                if let Some(metrics) = &self.metrics {
963                    metrics.errors_total().inc();
964                }
965            unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
966            
967            if step_ret != sqlite_wasm_rs::SQLITE_DONE {
968                let err_msg = unsafe {
969                    let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db);
970                    if !err_ptr.is_null() {
971                        std::ffi::CStr::from_ptr(err_ptr)
972                            .to_string_lossy()
973                            .to_string()
974                    } else {
975                        "Unknown SQLite error".to_string()
976                    }
977                };
978                return Err(DatabaseError::new("SQLITE_ERROR", &format!("Failed to execute statement: {}", err_msg)).with_sql(sql));
979            }
980            
981            
982            let execution_time_ms = js_sys::Date::now() - start_time;
983            
984            // Track query duration
985        #[cfg(feature = "telemetry")]
986            if let Some(metrics) = &self.metrics {
987                metrics.query_duration().observe(execution_time_ms);
988            }
989            let affected_rows = unsafe { sqlite_wasm_rs::sqlite3_changes(self.db) } as u32;
990            let last_insert_id = if sql.trim().to_uppercase().starts_with("INSERT") {
991                Some(unsafe { sqlite_wasm_rs::sqlite3_last_insert_rowid(self.db) })
992            } else {
993                None
994            };
995            
996            // Finish span successfully
997            #[cfg(feature = "telemetry")]
998            if let Some(mut s) = span {
999                s.status = crate::telemetry::SpanStatus::Ok;
1000                s.end_time_ms = Some(js_sys::Date::now());
1001                s.attributes.insert("duration_ms".to_string(), execution_time_ms.to_string());
1002                s.attributes.insert("affected_rows".to_string(), affected_rows.to_string());
1003                if let Some(recorder) = &self.span_recorder {
1004                    recorder.record_span(s);
1005                }
1006            }
1007            
1008            Ok(QueryResult {
1009                columns: vec![],
1010                rows: vec![],
1011                affected_rows,
1012                last_insert_id,
1013                execution_time_ms,
1014            })
1015        }
1016    }
1017    
1018    /// Set telemetry metrics for this database instance
1019    #[cfg(feature = "telemetry")]
1020    pub fn set_metrics(&mut self, metrics: Option<crate::telemetry::Metrics>) {
1021        self.metrics = metrics.clone();
1022        self.ensure_metrics_propagated();
1023    }
1024
1025    /// Set span recorder for distributed tracing
1026    #[cfg(feature = "telemetry")]
1027    pub fn set_span_recorder(&mut self, recorder: Option<crate::telemetry::SpanRecorder>) {
1028        self.span_recorder = recorder;
1029    }
1030
1031    /// Get span context for distributed tracing
1032    #[cfg(feature = "telemetry")]
1033    pub fn get_span_context(&self) -> Option<&crate::telemetry::SpanContext> {
1034        self.span_context.as_ref()
1035    }
1036
1037    /// Get span recorder for distributed tracing
1038    #[cfg(feature = "telemetry")]
1039    pub fn get_span_recorder(&self) -> Option<&crate::telemetry::SpanRecorder> {
1040        self.span_recorder.as_ref()
1041    }
1042    /// Ensure metrics are propagated to BlockStorage
1043    #[cfg(feature = "telemetry")]
1044    fn ensure_metrics_propagated(&self) {
1045        // Propagate metrics to BlockStorage via STORAGE_REGISTRY
1046        #[cfg(target_arch = "wasm32")]
1047        {
1048            use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1049            
1050            if self.metrics.is_none() {
1051                return;
1052            }
1053            
1054            let db_name = &self.name;
1055            
1056            STORAGE_REGISTRY.with(|reg| {
1057                let registry = reg.borrow();
1058                if let Some(storage_rc) = registry.get(db_name)
1059                    .or_else(|| registry.get(&format!("{}.db", db_name)))
1060                    .or_else(|| {
1061                        if db_name.ends_with(".db") {
1062                            registry.get(&db_name[..db_name.len()-3])
1063                        } else {
1064                            None
1065                        }
1066                    })
1067                {
1068                    let mut storage = storage_rc.borrow_mut();
1069                    storage.set_metrics(self.metrics.clone());
1070                }
1071            });
1072        }
1073    }
1074    
1075    pub async fn close_internal(&mut self) -> Result<(), DatabaseError> {
1076        // Checkpoint WAL data before close using PASSIVE mode (non-blocking)
1077        // This ensures we don't block other database instances in concurrent scenarios
1078        log::debug!("Checkpointing WAL before close: {}", self.name);
1079        let _ = self.execute_internal("PRAGMA wal_checkpoint(PASSIVE)").await;
1080        
1081        // Sync to IndexedDB before closing to ensure schema persists
1082        log::debug!("Syncing database before close: {}", self.name);
1083        self.sync_internal().await?;
1084        
1085        // Stop leader election before closing
1086        #[cfg(target_arch = "wasm32")]
1087        {
1088            use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1089            
1090            let db_name = &self.name;
1091            let storage_rc = STORAGE_REGISTRY.with(|reg| {
1092                let registry = reg.borrow();
1093                registry.get(db_name).cloned()
1094                    .or_else(|| registry.get(&format!("{}.db", db_name)).cloned())
1095            });
1096            
1097            if let Some(storage_rc) = storage_rc {
1098                if let Ok(mut storage) = storage_rc.try_borrow_mut() {
1099                    log::debug!("Stopping leader election for {}", db_name);
1100                    let _ = storage.stop_leader_election().await;
1101                }
1102            }
1103        }
1104        
1105        if !self.db.is_null() {
1106            unsafe {
1107                sqlite_wasm_rs::sqlite3_close(self.db);
1108                self.db = std::ptr::null_mut();
1109            }
1110        }
1111        Ok(())
1112    }
1113    
1114    /// Query database and return rows (alias for execute that returns rows)
1115    pub async fn query(&mut self, sql: &str) -> Result<Vec<Row>, DatabaseError> {
1116        let result = self.execute_internal(sql).await?;
1117        Ok(result.rows)
1118    }
1119    
1120    pub async fn sync_internal(&mut self) -> Result<(), DatabaseError> {
1121        // Start timing for telemetry
1122        #[cfg(all(target_arch = "wasm32", feature = "telemetry"))]
1123        let start_time = js_sys::Date::now();
1124        
1125        // Create span for VFS sync operation with automatic context linking
1126        #[cfg(feature = "telemetry")]
1127        let span = if self.span_recorder.is_some() {
1128            let mut builder = crate::telemetry::SpanBuilder::new("vfs_sync".to_string())
1129                .with_attribute("operation", "sync");
1130            
1131            // Automatically link to parent span via context and copy baggage
1132            if let Some(ref context) = self.span_context {
1133                builder = builder.with_context(context).with_baggage_from_context(context);
1134            }
1135            
1136            let span = builder.build();
1137            
1138            // Enter this span's context
1139            if let Some(ref context) = self.span_context {
1140                context.enter_span(span.span_id.clone());
1141            }
1142            
1143            Some(span)
1144        } else {
1145            None
1146        };
1147        
1148        // Track sync operation start
1149        #[cfg(feature = "telemetry")]
1150        if let Some(ref metrics) = self.metrics {
1151            metrics.sync_operations_total().inc();
1152        }
1153        
1154        // Track blocks persisted for span attributes
1155        #[cfg(feature = "telemetry")]
1156        let mut blocks_count = 0;
1157        
1158        // Trigger VFS sync to persist all blocks to IndexedDB
1159        #[cfg(target_arch = "wasm32")]
1160        {
1161            use crate::storage::vfs_sync;
1162            
1163            // Advance commit marker
1164            let next_commit = vfs_sync::with_global_commit_marker(|cm| {
1165                let mut cm = cm.borrow_mut();
1166                let current = cm.get(&self.name).copied().unwrap_or(0);
1167                let new_marker = current + 1;
1168                cm.insert(self.name.clone(), new_marker);
1169                log::debug!("Advanced commit marker for {} from {} to {}", self.name, current, new_marker);
1170                new_marker
1171            });
1172            
1173            // Collect blocks from GLOBAL_STORAGE (where VFS writes them)
1174            let (blocks_to_persist, metadata_to_persist) = vfs_sync::with_global_storage(|storage| {
1175                let storage_map = storage.borrow();
1176                let blocks = if let Some(db_storage) = storage_map.get(&self.name) {
1177                    db_storage.iter().map(|(&id, data)| (id, data.clone())).collect::<Vec<_>>()
1178                } else {
1179                    Vec::new()
1180                };
1181                
1182                let metadata = vfs_sync::with_global_metadata(|meta| {
1183                    let meta_map = meta.borrow();
1184                    if let Some(db_meta) = meta_map.get(&self.name) {
1185                        db_meta.iter().map(|(&id, metadata)| (id, metadata.checksum)).collect::<Vec<_>>()
1186                    } else {
1187                        Vec::new()
1188                    }
1189                });
1190                
1191                (blocks, metadata)
1192            });
1193            
1194            if !blocks_to_persist.is_empty() {
1195                #[cfg(feature = "telemetry")]
1196                {
1197                    blocks_count = blocks_to_persist.len();
1198                }
1199                log::debug!("Persisting {} blocks to IndexedDB for {}", blocks_to_persist.len(), self.name);
1200                crate::storage::wasm_indexeddb::persist_to_indexeddb_event_based(
1201                    &self.name,
1202                    blocks_to_persist,
1203                    metadata_to_persist,
1204                    next_commit,
1205                    #[cfg(feature = "telemetry")]
1206                    self.span_recorder.clone(),
1207                    #[cfg(feature = "telemetry")]
1208                    span.as_ref().map(|s| s.span_id.clone()),
1209                ).await?;
1210                log::debug!("Successfully persisted {} to IndexedDB (awaited)", self.name);
1211            } else {
1212                log::debug!("No blocks to persist for {}", self.name);
1213            }
1214            
1215            // Send notification after successful sync
1216            use crate::storage::broadcast_notifications::{BroadcastNotification, send_change_notification};
1217            
1218            let notification = BroadcastNotification::DataChanged {
1219                db_name: self.name.clone(),
1220                timestamp: js_sys::Date::now() as u64,
1221            };
1222            
1223            log::debug!("Sending DataChanged notification for {}", self.name);
1224            
1225            if let Err(e) = send_change_notification(&notification) {
1226                log::warn!("Failed to send change notification: {}", e);
1227                // Don't fail the sync if notification fails
1228            }
1229        }
1230        
1231        // Record sync duration
1232        #[cfg(all(target_arch = "wasm32", feature = "telemetry"))]
1233        if let Some(ref metrics) = self.metrics {
1234            let duration_ms = js_sys::Date::now() - start_time;
1235            metrics.sync_duration().observe(duration_ms);
1236        }
1237        
1238        // Finish span successfully
1239        #[cfg(feature = "telemetry")]
1240        if let Some(mut s) = span {
1241            s.status = crate::telemetry::SpanStatus::Ok;
1242            #[cfg(target_arch = "wasm32")]
1243            {
1244                s.end_time_ms = Some(js_sys::Date::now());
1245                let duration_ms = s.end_time_ms.unwrap() - s.start_time_ms;
1246                s.attributes.insert("duration_ms".to_string(), duration_ms.to_string());
1247            }
1248            #[cfg(not(target_arch = "wasm32"))]
1249            {
1250                let now = std::time::SystemTime::now()
1251                    .duration_since(std::time::UNIX_EPOCH)
1252                    .unwrap()
1253                    .as_millis() as f64;
1254                s.end_time_ms = Some(now);
1255                let duration_ms = s.end_time_ms.unwrap() - s.start_time_ms;
1256                s.attributes.insert("duration_ms".to_string(), duration_ms.to_string());
1257            }
1258            s.attributes.insert("blocks_persisted".to_string(), blocks_count.to_string());
1259            if let Some(recorder) = &self.span_recorder {
1260                recorder.record_span(s);
1261            }
1262            
1263            // Exit span context
1264            if let Some(ref context) = self.span_context {
1265                context.exit_span();
1266            }
1267        }
1268        
1269        Ok(())
1270    }
1271}
1272
1273#[cfg(target_arch = "wasm32")]
1274impl Drop for Database {
1275    fn drop(&mut self) {
1276        if !self.db.is_null() {
1277            unsafe {
1278                sqlite_wasm_rs::sqlite3_close(self.db);
1279            }
1280        }
1281        
1282        // Keep BlockStorage in STORAGE_REGISTRY so multiple Database instances
1283        // with the same name share the same BlockStorage and leader election state
1284        // Blocks persist in GLOBAL_STORAGE across Database instances
1285        log::debug!("Closed database: {} (BlockStorage remains in registry)", self.name);
1286    }
1287}
1288
1289// Add wasm_bindgen exports for the main Database struct
1290#[cfg(target_arch = "wasm32")]
1291#[wasm_bindgen]
1292impl Database {
1293    #[wasm_bindgen(js_name = "newDatabase")]
1294    pub async fn new_wasm(name: String) -> Result<Database, JsValue> {
1295        let config = DatabaseConfig {
1296            name: name.clone(),
1297            version: Some(1),
1298            cache_size: Some(10_000),
1299            page_size: Some(4096),
1300            auto_vacuum: Some(true),
1301            journal_mode: Some("WAL".to_string()),
1302            max_export_size_bytes: Some(2 * 1024 * 1024 * 1024), // 2GB default
1303        };
1304        
1305        let db = Database::new(config)
1306            .await
1307            .map_err(|e| JsValue::from_str(&format!("Failed to create database: {}", e)))?;
1308        
1309        // Start listening for write queue requests (leader will process them)
1310        Self::start_write_queue_listener(&name)?;
1311        
1312        Ok(db)
1313    }
1314    
1315    /// Start listening for write queue requests (leader processes these)
1316    fn start_write_queue_listener(db_name: &str) -> Result<(), JsValue> {
1317        use crate::storage::write_queue::{register_write_queue_listener, WriteQueueMessage, WriteResponse};
1318        use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1319        
1320        let db_name_clone = db_name.to_string();
1321        
1322        let callback = Closure::wrap(Box::new(move |msg: JsValue| {
1323            let db_name_inner = db_name_clone.clone();
1324            
1325            // Parse the message
1326            if let Ok(json_str) = js_sys::JSON::stringify(&msg) {
1327                if let Some(json_str) = json_str.as_string() {
1328                    if let Ok(message) = serde_json::from_str::<WriteQueueMessage>(&json_str) {
1329                        if let WriteQueueMessage::WriteRequest(request) = message {
1330                            log::debug!("Leader received write request: {}", request.request_id);
1331                            
1332                            // Check if we're the leader
1333                            let storage_rc = STORAGE_REGISTRY.with(|reg| {
1334                                let registry = reg.borrow();
1335                                registry.get(&db_name_inner).cloned()
1336                                    .or_else(|| registry.get(&format!("{}.db", &db_name_inner)).cloned())
1337                            });
1338                            
1339                            if let Some(storage) = storage_rc {
1340                                // Spawn async task to process the write
1341                                wasm_bindgen_futures::spawn_local(async move {
1342                                    let is_leader = {
1343                                        let mut storage_mut = storage.borrow_mut();
1344                                        storage_mut.is_leader().await
1345                                    };
1346                                    
1347                                    if !is_leader {
1348                                        log::error!("Not leader, ignoring write request");
1349                                        return;
1350                                    }
1351                                    
1352                                    log::debug!("Processing write request as leader");
1353                                    
1354                                    // Create a temporary database instance to execute the SQL
1355                                    match Database::new_wasm(db_name_inner.clone()).await {
1356                                        Ok(mut db) => {
1357                                            // Execute the SQL
1358                                            match db.execute_internal(&request.sql).await {
1359                                                Ok(result) => {
1360                                                    // Send success response
1361                                                    let response = WriteResponse::Success {
1362                                                        request_id: request.request_id.clone(),
1363                                                        affected_rows: result.affected_rows as usize,
1364                                                    };
1365                                                    
1366                                                    use crate::storage::write_queue::send_write_response;
1367                                                    if let Err(e) = send_write_response(&db_name_inner, response) {
1368                                                        log::error!("Failed to send response: {}", e);
1369                                                    } else {
1370                                                        log::info!("Write response sent successfully");
1371                                                    }
1372                                                }
1373                                                Err(e) => {
1374                                                    // Send error response
1375                                                    let response = WriteResponse::Error {
1376                                                        request_id: request.request_id.clone(),
1377                                                        error_message: e.to_string(),
1378                                                    };
1379                                                    
1380                                                    use crate::storage::write_queue::send_write_response;
1381                                                    if let Err(e) = send_write_response(&db_name_inner, response) {
1382                                                        log::error!("Failed to send error response: {}", e);
1383                                                    }
1384                                                }
1385                                            }
1386                                        }
1387                                        Err(e) => {
1388                                            log::error!("Failed to create db for write processing: {:?}", e);
1389                                        }
1390                                    }
1391                                });
1392                            }
1393                        }
1394                    }
1395                }
1396            }
1397        }) as Box<dyn FnMut(JsValue)>);
1398        
1399        let callback_fn = callback.as_ref().unchecked_ref();
1400        register_write_queue_listener(db_name, callback_fn)
1401            .map_err(|e| JsValue::from_str(&format!("Failed to register write queue listener: {}", e)))?;
1402        
1403        callback.forget();
1404        
1405        Ok(())
1406    }
1407
1408    #[wasm_bindgen]
1409    pub async fn execute(&mut self, sql: &str) -> Result<JsValue, JsValue> {
1410        // Check write permission before executing
1411        self.check_write_permission(sql)
1412            .await
1413            .map_err(|e| JsValue::from_str(&format!("Write permission denied: {}", e)))?;
1414        
1415        let result = self.execute_internal(sql)
1416            .await
1417            .map_err(|e| JsValue::from_str(&format!("Query execution failed: {}", e)))?;
1418        serde_wasm_bindgen::to_value(&result).map_err(|e| JsValue::from_str(&e.to_string()))
1419    }
1420
1421    #[wasm_bindgen(js_name = "executeWithParams")]
1422    pub async fn execute_with_params(&mut self, sql: &str, params: JsValue) -> Result<JsValue, JsValue> {
1423        let params: Vec<ColumnValue> = serde_wasm_bindgen::from_value(params)
1424            .map_err(|e| JsValue::from_str(&format!("Invalid parameters: {}", e)))?;
1425        
1426        // Check write permission before executing
1427        self.check_write_permission(sql)
1428            .await
1429            .map_err(|e| JsValue::from_str(&format!("Write permission denied: {}", e)))?;
1430        
1431        let result = self.execute_with_params_internal(sql, &params)
1432            .await
1433            .map_err(|e| JsValue::from_str(&format!("Query execution failed: {}", e)))?;
1434        serde_wasm_bindgen::to_value(&result).map_err(|e| JsValue::from_str(&e.to_string()))
1435    }
1436
1437    #[wasm_bindgen]
1438    pub async fn close(&mut self) -> Result<(), JsValue> {
1439        self.close_internal()
1440            .await
1441            .map_err(|e| JsValue::from_str(&format!("Failed to close database: {}", e)))
1442    }
1443
1444    #[wasm_bindgen]
1445    pub async fn sync(&mut self) -> Result<(), JsValue> {
1446        self.sync_internal()
1447            .await
1448            .map_err(|e| JsValue::from_str(&format!("Failed to sync database: {}", e)))
1449    }
1450
1451    /// Allow non-leader writes (for single-tab apps or testing)
1452    #[wasm_bindgen(js_name = "allowNonLeaderWrites")]
1453    pub async fn allow_non_leader_writes(&mut self, allow: bool) -> Result<(), JsValue> {
1454        log::debug!("Setting allowNonLeaderWrites = {} for {}", allow, self.name);
1455        self.allow_non_leader_writes = allow;
1456        Ok(())
1457    }
1458
1459    /// Export database to SQLite .db file format
1460    /// 
1461    /// Returns the complete database as a Uint8Array that can be downloaded
1462    /// or saved as a standard SQLite .db file.
1463    /// 
1464    /// # Example
1465    /// ```javascript
1466    /// const dbBytes = await db.exportToFile();
1467    /// const blob = new Blob([dbBytes], { type: 'application/x-sqlite3' });
1468    /// const url = URL.createObjectURL(blob);
1469    /// const a = document.createElement('a');
1470    /// a.href = url;
1471    /// a.download = 'database.db';
1472    /// a.click();
1473    /// ```
1474    #[wasm_bindgen(js_name = "exportToFile")]
1475    pub async fn export_to_file(&mut self) -> Result<js_sys::Uint8Array, JsValue> {
1476        log::info!("Exporting database: {}", self.name);
1477        
1478        // Acquire export/import lock to prevent concurrent operations
1479        let _lock_guard = crate::storage::export_import_lock::acquire_export_import_lock(&self.name)
1480            .await
1481            .map_err(|e| JsValue::from_str(&format!("Failed to acquire export lock: {}", e)))?;
1482        
1483        log::debug!("Export lock acquired for: {}", self.name);
1484        
1485        // Trigger a non-blocking WAL checkpoint to ensure we get latest data
1486        // Use PASSIVE mode so it doesn't block other connections
1487        let _ = self.execute("PRAGMA wal_checkpoint(PASSIVE)").await;
1488        
1489        // Get storage from registry
1490        use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1491        
1492        let storage_rc = STORAGE_REGISTRY.with(|reg| {
1493            let registry = reg.borrow();
1494            registry.get(&self.name).cloned()
1495                .or_else(|| registry.get(&format!("{}.db", &self.name)).cloned())
1496                .or_else(|| {
1497                    if self.name.ends_with(".db") {
1498                        registry.get(&self.name[..self.name.len()-3]).cloned()
1499                    } else {
1500                        None
1501                    }
1502                })
1503        });
1504        
1505        let storage_rc = storage_rc.ok_or_else(|| {
1506            JsValue::from_str(&format!("Storage not found for database: {}", self.name))
1507        })?;
1508        
1509        // Call export function
1510        let mut storage = storage_rc.borrow_mut();
1511        
1512        // Reload cache from GLOBAL_STORAGE to ensure we have latest data
1513        #[cfg(target_arch = "wasm32")]
1514        storage.reload_cache_from_global_storage();
1515        
1516        // Export with configured size limit from DatabaseConfig
1517        let db_bytes = crate::storage::export::export_database_to_bytes(&mut *storage, self.max_export_size_bytes)
1518            .await
1519            .map_err(|e| JsValue::from_str(&format!("Export failed: {}", e)))?;
1520        
1521        log::info!("Export complete: {} bytes", db_bytes.len());
1522        
1523        // Convert to Uint8Array for JavaScript
1524        let uint8_array = js_sys::Uint8Array::new_with_length(db_bytes.len() as u32);
1525        uint8_array.copy_from(&db_bytes);
1526        
1527        Ok(uint8_array)
1528        // Lock automatically released when _lock_guard is dropped
1529    }
1530
1531    /// Import SQLite database from .db file bytes
1532    /// 
1533    /// Replaces the current database contents with the imported data.
1534    /// This will close the current database connection and clear all existing data.
1535    /// 
1536    /// # Arguments
1537    /// * `file_data` - SQLite .db file as Uint8Array
1538    /// 
1539    /// # Returns
1540    /// * `Ok(())` - Import successful
1541    /// * `Err(JsValue)` - Import failed (invalid file, validation error, etc.)
1542    /// 
1543    /// # Example
1544    /// ```javascript
1545    /// // From file input
1546    /// const fileInput = document.getElementById('dbFile');
1547    /// const file = fileInput.files[0];
1548    /// const arrayBuffer = await file.arrayBuffer();
1549    /// const uint8Array = new Uint8Array(arrayBuffer);
1550    /// 
1551    /// await db.importFromFile(uint8Array);
1552    /// 
1553    /// // Database is now replaced - you may need to reopen connections
1554    /// ```
1555    /// 
1556    /// # Warning
1557    /// This operation is destructive and will replace all existing database data.
1558    /// The database connection will be closed and must be reopened after import.
1559    #[wasm_bindgen(js_name = "importFromFile")]
1560    pub async fn import_from_file(&mut self, file_data: js_sys::Uint8Array) -> Result<(), JsValue> {
1561        log::info!("Importing database: {}", self.name);
1562        
1563        // Acquire export/import lock to prevent concurrent operations
1564        let _lock_guard = crate::storage::export_import_lock::acquire_export_import_lock(&self.name)
1565            .await
1566            .map_err(|e| JsValue::from_str(&format!("Failed to acquire import lock: {}", e)))?;
1567        
1568        log::debug!("Import lock acquired for: {}", self.name);
1569        
1570        // Convert Uint8Array to Vec<u8>
1571        let data = file_data.to_vec();
1572        log::debug!("Import data size: {} bytes", data.len());
1573        
1574        // Close the database connection first
1575        log::debug!("Closing database connection before import");
1576        self.close().await?;
1577        
1578        // Call the import function from storage module
1579        crate::storage::import::import_database_from_bytes(&self.name, data)
1580            .await
1581            .map_err(|e| {
1582                log::error!("Import failed for {}: {}", self.name, e);
1583                JsValue::from_str(&format!("Import failed: {}", e))
1584            })?;
1585        
1586        log::info!("Import complete for: {}", self.name);
1587        
1588        // Note: The user will need to create a new Database instance to use the imported data
1589        // We don't automatically reopen here to give the user control
1590        
1591        Ok(())
1592        // Lock automatically released when _lock_guard is dropped
1593    }
1594
1595    /// Wait for this instance to become leader
1596    #[wasm_bindgen(js_name = "waitForLeadership")]
1597    pub async fn wait_for_leadership(&mut self) -> Result<(), JsValue> {
1598        use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1599        
1600        // Track leader election attempt
1601        #[cfg(feature = "telemetry")]
1602        if let Some(ref metrics) = self.metrics {
1603            metrics.leader_elections_total().inc();
1604        }
1605        
1606        let db_name = &self.name;
1607        log::debug!("Waiting for leadership for {}", db_name);
1608        
1609        // Poll until we become leader (with timeout)
1610        let start_time = js_sys::Date::now();
1611        let timeout_ms = 5000.0; // 5 second timeout
1612        
1613        loop {
1614            let storage_rc = STORAGE_REGISTRY.with(|reg| {
1615                let registry = reg.borrow();
1616                registry.get(db_name).cloned()
1617                    .or_else(|| registry.get(&format!("{}.db", db_name)).cloned())
1618                    .or_else(|| {
1619                        if db_name.ends_with(".db") {
1620                            registry.get(&db_name[..db_name.len()-3]).cloned()
1621                        } else {
1622                            None
1623                        }
1624                    })
1625            });
1626            
1627            if let Some(storage) = storage_rc {
1628                let mut storage_mut = storage.borrow_mut();
1629                let is_leader = storage_mut.is_leader().await;
1630                
1631                if is_leader {
1632                    log::info!("Became leader for {}", db_name);
1633                    
1634                    // Record telemetry on successful leadership acquisition
1635                    #[cfg(feature = "telemetry")]
1636                    if let Some(ref metrics) = self.metrics {
1637                        let duration_ms = js_sys::Date::now() - start_time;
1638                        metrics.leader_election_duration().observe(duration_ms);
1639                        metrics.is_leader().set(1.0);
1640                        metrics.leadership_changes_total().inc();
1641                    }
1642                    
1643                    return Ok(());
1644                }
1645            }
1646            
1647            // Check timeout
1648            if js_sys::Date::now() - start_time > timeout_ms {
1649                // Record telemetry on timeout
1650                #[cfg(feature = "telemetry")]
1651                if let Some(ref metrics) = self.metrics {
1652                    let duration_ms = js_sys::Date::now() - start_time;
1653                    metrics.leader_election_duration().observe(duration_ms);
1654                }
1655                
1656                return Err(JsValue::from_str("Timeout waiting for leadership"));
1657            }
1658            
1659            // Wait a bit before checking again
1660            let promise = js_sys::Promise::new(&mut |resolve, _| {
1661                let window = web_sys::window().expect("should have window");
1662                let _ = window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 100);
1663            });
1664            let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
1665        }
1666    }
1667
1668    /// Request leadership (triggers re-election check)
1669    #[wasm_bindgen(js_name = "requestLeadership")]
1670    pub async fn request_leadership(&mut self) -> Result<(), JsValue> {
1671        use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1672        
1673        let db_name = &self.name;
1674        log::debug!("Requesting leadership for {}", db_name);
1675        
1676        // Telemetry setup
1677        #[cfg(feature = "telemetry")]
1678        let telemetry_data = if self.metrics.is_some() {
1679            let start_time = js_sys::Date::now();
1680            let was_leader = self.is_leader_wasm().await.ok()
1681                .and_then(|v| v.as_bool())
1682                .unwrap_or(false);
1683            
1684            if let Some(ref metrics) = self.metrics {
1685                metrics.leader_elections_total().inc();
1686            }
1687            
1688            Some((start_time, was_leader))
1689        } else {
1690            None
1691        };
1692        
1693        let storage_rc = STORAGE_REGISTRY.with(|reg| {
1694            let registry = reg.borrow();
1695            registry.get(db_name).cloned()
1696                .or_else(|| registry.get(&format!("{}.db", db_name)).cloned())
1697                .or_else(|| {
1698                    if db_name.ends_with(".db") {
1699                        registry.get(&db_name[..db_name.len()-3]).cloned()
1700                    } else {
1701                        None
1702                    }
1703                })
1704        });
1705        
1706        if let Some(storage) = storage_rc {
1707            {
1708                let mut storage_mut = storage.borrow_mut();
1709                
1710                // Trigger leader election
1711                storage_mut.start_leader_election().await
1712                    .map_err(|e| JsValue::from_str(&format!("Failed to request leadership: {}", e)))?;
1713                        
1714                log::debug!("Re-election triggered for {}", db_name);
1715            } // Drop the borrow here
1716            
1717            // Record telemetry after election (after dropping borrow)
1718            #[cfg(feature = "telemetry")]
1719            if let Some((start_time, was_leader)) = telemetry_data {
1720                if let Some(ref metrics) = self.metrics {
1721                    // Record election duration
1722                    let duration_ms = js_sys::Date::now() - start_time;
1723                    metrics.leader_election_duration().observe(duration_ms);
1724                    
1725                    // Check if leadership status changed
1726                    let is_leader_now = self.is_leader_wasm().await.ok()
1727                        .and_then(|v| v.as_bool())
1728                        .unwrap_or(false);
1729                    
1730                    // Update is_leader gauge
1731                    metrics.is_leader().set(if is_leader_now { 1.0 } else { 0.0 });
1732                    
1733                    // Track leadership changes
1734                    if was_leader != is_leader_now {
1735                        metrics.leadership_changes_total().inc();
1736                    }
1737                }
1738            }
1739            
1740            Ok(())
1741        } else {
1742            Err(JsValue::from_str(&format!("No storage found for database: {}", db_name)))
1743        }
1744    }
1745
1746    /// Get leader information
1747    #[wasm_bindgen(js_name = "getLeaderInfo")]
1748    pub async fn get_leader_info(&mut self) -> Result<JsValue, JsValue> {
1749        use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1750        
1751        let db_name = &self.name;
1752        
1753        let storage_rc = STORAGE_REGISTRY.with(|reg| {
1754            let registry = reg.borrow();
1755            registry.get(db_name).cloned()
1756                .or_else(|| registry.get(&format!("{}.db", db_name)).cloned())
1757                .or_else(|| {
1758                    if db_name.ends_with(".db") {
1759                        registry.get(&db_name[..db_name.len()-3]).cloned()
1760                    } else {
1761                        None
1762                    }
1763                })
1764        });
1765        
1766        if let Some(storage) = storage_rc {
1767            let mut storage_mut = storage.borrow_mut();
1768            let is_leader = storage_mut.is_leader().await;
1769            
1770            // Get leader info - we'll use simpler data for now
1771            // Real implementation would need public getters on BlockStorage
1772            let leader_id_str = if is_leader {
1773                format!("leader_{}", db_name)
1774            } else {
1775                "unknown".to_string()
1776            };
1777            
1778            // Create JavaScript object
1779            let obj = js_sys::Object::new();
1780            js_sys::Reflect::set(&obj, &"isLeader".into(), &JsValue::from_bool(is_leader))?;
1781            js_sys::Reflect::set(&obj, &"leaderId".into(), &JsValue::from_str(&leader_id_str))?;
1782            js_sys::Reflect::set(&obj, &"leaseExpiry".into(), &JsValue::from_f64(js_sys::Date::now()))?;
1783            
1784            Ok(obj.into())
1785        } else {
1786            Err(JsValue::from_str(&format!("No storage found for database: {}", db_name)))
1787        }
1788    }
1789
1790    /// Queue a write operation to be executed by the leader
1791    /// 
1792    /// Non-leader tabs can use this to request writes from the leader.
1793    /// The write is forwarded via BroadcastChannel and executed by the leader.
1794    /// 
1795    /// # Arguments
1796    /// * `sql` - SQL statement to execute (must be a write operation)
1797    /// 
1798    /// # Returns
1799    /// Result indicating success or failure
1800    #[wasm_bindgen(js_name = "queueWrite")]
1801    pub async fn queue_write(&mut self, sql: String) -> Result<(), JsValue> {
1802        self.queue_write_with_timeout(sql, 5000).await
1803    }
1804    
1805    /// Queue a write operation with a specific timeout
1806    /// 
1807    /// # Arguments
1808    /// * `sql` - SQL statement to execute
1809    /// * `timeout_ms` - Timeout in milliseconds
1810    #[wasm_bindgen(js_name = "queueWriteWithTimeout")]
1811    pub async fn queue_write_with_timeout(&mut self, sql: String, timeout_ms: u32) -> Result<(), JsValue> {
1812        use crate::storage::write_queue::{send_write_request, WriteResponse, WriteQueueMessage};
1813        use std::cell::RefCell;
1814        use std::rc::Rc;
1815        
1816        log::debug!("Queuing write: {}", sql);
1817        
1818        // Check if we're the leader - if so, just execute directly
1819        use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1820        let is_leader = {
1821            let storage_rc = STORAGE_REGISTRY.with(|reg| {
1822                let registry = reg.borrow();
1823                registry.get(&self.name).cloned()
1824                    .or_else(|| registry.get(&format!("{}.db", &self.name)).cloned())
1825            });
1826            
1827            if let Some(storage) = storage_rc {
1828                let mut storage_mut = storage.borrow_mut();
1829                storage_mut.is_leader().await
1830            } else {
1831                false
1832            }
1833        };
1834        
1835        if is_leader {
1836            log::debug!("We are leader, executing directly");
1837            return self.execute_internal(&sql).await
1838                .map(|_| ())
1839                .map_err(|e| JsValue::from_str(&format!("Execute failed: {}", e)));
1840        }
1841        
1842        // Send write request to leader
1843        let request_id = send_write_request(&self.name, &sql)
1844            .map_err(|e| JsValue::from_str(&format!("Failed to send write request: {}", e)))?;
1845        
1846        log::debug!("Write request sent with ID: {}", request_id);
1847        
1848        // Wait for response with timeout
1849        let response_received = Rc::new(RefCell::new(false));
1850        let response_error = Rc::new(RefCell::new(None::<String>));
1851        
1852        let response_received_clone = response_received.clone();
1853        let response_error_clone = response_error.clone();
1854        let request_id_clone = request_id.clone();
1855        
1856        // Set up listener for response
1857        let callback = Closure::wrap(Box::new(move |msg: JsValue| {
1858            // Parse the message
1859            if let Ok(json_str) = js_sys::JSON::stringify(&msg) {
1860                if let Some(json_str) = json_str.as_string() {
1861                    if let Ok(message) = serde_json::from_str::<WriteQueueMessage>(&json_str) {
1862                        if let WriteQueueMessage::WriteResponse(response) = message {
1863                            match response {
1864                                WriteResponse::Success { request_id, .. } => {
1865                                    if request_id == request_id_clone {
1866                                        *response_received_clone.borrow_mut() = true;
1867                                        log::debug!("Write response received: Success");
1868                                    }
1869                                }
1870                                WriteResponse::Error { request_id, error_message } => {
1871                                    if request_id == request_id_clone {
1872                                        *response_received_clone.borrow_mut() = true;
1873                                        *response_error_clone.borrow_mut() = Some(error_message);
1874                                        log::debug!("Write response received: Error");
1875                                    }
1876                                }
1877                            }
1878                        }
1879                    }
1880                }
1881            }
1882        }) as Box<dyn FnMut(JsValue)>);
1883        
1884        // Register listener
1885        use crate::storage::write_queue::register_write_queue_listener;
1886        let callback_fn = callback.as_ref().unchecked_ref();
1887        register_write_queue_listener(&self.name, callback_fn)
1888            .map_err(|e| JsValue::from_str(&format!("Failed to register listener: {}", e)))?;
1889        
1890        // Keep callback alive
1891        callback.forget();
1892        
1893        // Wait for response with polling (timeout_ms)
1894        let start_time = js_sys::Date::now();
1895        let timeout_f64 = timeout_ms as f64;
1896        
1897        loop {
1898            // Check if response received
1899            if *response_received.borrow() {
1900                if let Some(error_msg) = response_error.borrow().as_ref() {
1901                    return Err(JsValue::from_str(&format!("Write failed: {}", error_msg)));
1902                }
1903                log::info!("Write completed successfully");
1904                return Ok(());
1905            }
1906            
1907            // Check timeout
1908            let elapsed = js_sys::Date::now() - start_time;
1909            if elapsed > timeout_f64 {
1910                return Err(JsValue::from_str("Write request timed out"));
1911            }
1912            
1913            // Wait a bit before checking again
1914            wasm_bindgen_futures::JsFuture::from(js_sys::Promise::new(&mut |resolve, _reject| {
1915                if let Some(window) = web_sys::window() {
1916                    let _ = window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 100);
1917                } else {
1918                    log::error!("Window unavailable in timeout handler");
1919                }
1920            })).await.ok();
1921        }
1922    }
1923
1924    #[wasm_bindgen(js_name = "isLeader")]
1925    pub async fn is_leader_wasm(&self) -> Result<JsValue, JsValue> {
1926        // Get the storage from STORAGE_REGISTRY
1927        use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1928        
1929        let db_name = &self.name;
1930        log::debug!("isLeader() called for database: {} (self.name)", db_name);
1931        
1932        // Show what's in the registry
1933        STORAGE_REGISTRY.with(|reg| {
1934            let registry = reg.borrow();
1935            log::debug!("STORAGE_REGISTRY keys: {:?}", registry.keys().collect::<Vec<_>>());
1936        });
1937        
1938        let storage_rc = STORAGE_REGISTRY.with(|reg| {
1939            let registry = reg.borrow();
1940            // Try both with and without .db extension
1941            registry.get(db_name).cloned()
1942                .or_else(|| registry.get(&format!("{}.db", db_name)).cloned())
1943                .or_else(|| {
1944                    if db_name.ends_with(".db") {
1945                        registry.get(&db_name[..db_name.len()-3]).cloned()
1946                    } else {
1947                        None
1948                    }
1949                })
1950        });
1951        
1952        if let Some(storage) = storage_rc {
1953            log::debug!("Found storage for {}, calling is_leader()", db_name);
1954            let mut storage_mut = storage.borrow_mut();
1955            let is_leader = storage_mut.is_leader().await;
1956            log::debug!("is_leader() = {} for {}", is_leader, db_name);
1957            
1958            // Return as JsValue boolean
1959            Ok(JsValue::from_bool(is_leader))
1960        } else {
1961            log::error!("ERROR: No storage found for database: {}", db_name);
1962            Err(JsValue::from_str(&format!("No storage found for database: {}", db_name)))
1963        }
1964    }
1965
1966    /// Check if this instance is the leader (non-wasm version for internal use/tests)
1967    pub async fn is_leader(&self) -> Result<bool, JsValue> {
1968        let result = self.is_leader_wasm().await?;
1969        Ok(result.as_bool().unwrap_or(false))
1970    }
1971
1972    #[wasm_bindgen(js_name = "onDataChange")]
1973    pub fn on_data_change_wasm(&mut self, callback: &js_sys::Function) -> Result<(), JsValue> {
1974        log::debug!("Registering onDataChange callback for {}", self.name);
1975        
1976        // Store the callback
1977        self.on_data_change_callback = Some(callback.clone());
1978        
1979        // Register listener for BroadcastChannel notifications from other tabs
1980        use crate::storage::broadcast_notifications::register_change_listener;
1981        
1982        let db_name = &self.name;
1983        register_change_listener(db_name, callback)
1984            .map_err(|e| JsValue::from_str(&format!("Failed to register change listener: {}", e)))?;
1985        
1986        log::debug!("onDataChange callback registered for {}", self.name);
1987        Ok(())
1988    }
1989
1990    /// Enable or disable optimistic updates mode
1991    #[wasm_bindgen(js_name = "enableOptimisticUpdates")]
1992    pub async fn enable_optimistic_updates(&mut self, enabled: bool) -> Result<(), JsValue> {
1993        self.optimistic_updates_manager.borrow_mut().set_enabled(enabled);
1994        log::debug!("Optimistic updates {}", if enabled { "enabled" } else { "disabled" });
1995        Ok(())
1996    }
1997
1998    /// Check if optimistic mode is enabled
1999    #[wasm_bindgen(js_name = "isOptimisticMode")]
2000    pub async fn is_optimistic_mode(&self) -> bool {
2001        self.optimistic_updates_manager.borrow().is_enabled()
2002    }
2003
2004    /// Track an optimistic write
2005    #[wasm_bindgen(js_name = "trackOptimisticWrite")]
2006    pub async fn track_optimistic_write(&mut self, sql: String) -> Result<String, JsValue> {
2007        let id = self.optimistic_updates_manager.borrow_mut().track_write(sql);
2008        Ok(id)
2009    }
2010
2011    /// Get count of pending writes
2012    #[wasm_bindgen(js_name = "getPendingWritesCount")]
2013    pub async fn get_pending_writes_count(&self) -> usize {
2014        self.optimistic_updates_manager.borrow().get_pending_count()
2015    }
2016
2017    /// Clear all optimistic writes
2018    #[wasm_bindgen(js_name = "clearOptimisticWrites")]
2019    pub async fn clear_optimistic_writes(&mut self) -> Result<(), JsValue> {
2020        self.optimistic_updates_manager.borrow_mut().clear_all();
2021        Ok(())
2022    }
2023
2024    /// Enable or disable coordination metrics tracking
2025    #[wasm_bindgen(js_name = "enableCoordinationMetrics")]
2026    pub async fn enable_coordination_metrics(&mut self, enabled: bool) -> Result<(), JsValue> {
2027        self.coordination_metrics_manager.borrow_mut().set_enabled(enabled);
2028        Ok(())
2029    }
2030
2031    /// Check if coordination metrics tracking is enabled
2032    #[wasm_bindgen(js_name = "isCoordinationMetricsEnabled")]
2033    pub async fn is_coordination_metrics_enabled(&self) -> bool {
2034        self.coordination_metrics_manager.borrow().is_enabled()
2035    }
2036
2037    /// Record a leadership change
2038    #[wasm_bindgen(js_name = "recordLeadershipChange")]
2039    pub async fn record_leadership_change(&mut self, became_leader: bool) -> Result<(), JsValue> {
2040        self.coordination_metrics_manager.borrow_mut().record_leadership_change(became_leader);
2041        Ok(())
2042    }
2043
2044    /// Record a notification latency in milliseconds
2045    #[wasm_bindgen(js_name = "recordNotificationLatency")]
2046    pub async fn record_notification_latency(&mut self, latency_ms: f64) -> Result<(), JsValue> {
2047        self.coordination_metrics_manager.borrow_mut().record_notification_latency(latency_ms);
2048        Ok(())
2049    }
2050
2051    /// Record a write conflict (non-leader write attempt)
2052    #[wasm_bindgen(js_name = "recordWriteConflict")]
2053    pub async fn record_write_conflict(&mut self) -> Result<(), JsValue> {
2054        self.coordination_metrics_manager.borrow_mut().record_write_conflict();
2055        Ok(())
2056    }
2057
2058    /// Record a follower refresh
2059    #[wasm_bindgen(js_name = "recordFollowerRefresh")]
2060    pub async fn record_follower_refresh(&mut self) -> Result<(), JsValue> {
2061        self.coordination_metrics_manager.borrow_mut().record_follower_refresh();
2062        Ok(())
2063    }
2064
2065    /// Get coordination metrics as JSON string
2066    #[wasm_bindgen(js_name = "getCoordinationMetrics")]
2067    pub async fn get_coordination_metrics(&self) -> Result<String, JsValue> {
2068        self.coordination_metrics_manager.borrow().get_metrics_json()
2069            .map_err(|e| JsValue::from_str(&e))
2070    }
2071
2072    /// Reset all coordination metrics
2073    #[wasm_bindgen(js_name = "resetCoordinationMetrics")]
2074    pub async fn reset_coordination_metrics(&mut self) -> Result<(), JsValue> {
2075        self.coordination_metrics_manager.borrow_mut().reset();
2076        Ok(())
2077    }
2078}
2079
2080// Export WasmColumnValue for WASM
2081#[cfg(target_arch = "wasm32")]
2082#[wasm_bindgen]
2083pub struct WasmColumnValue {
2084    #[allow(dead_code)]
2085    inner: ColumnValue,
2086}
2087
2088#[cfg(target_arch = "wasm32")]
2089#[wasm_bindgen]
2090impl WasmColumnValue {
2091    #[wasm_bindgen(js_name = "createNull")]
2092    pub fn create_null() -> WasmColumnValue {
2093        WasmColumnValue {
2094            inner: ColumnValue::Null,
2095        }
2096    }
2097
2098    #[wasm_bindgen(js_name = "createInteger")]
2099    pub fn create_integer(value: i64) -> WasmColumnValue {
2100        WasmColumnValue {
2101            inner: ColumnValue::Integer(value),
2102        }
2103    }
2104
2105    #[wasm_bindgen(js_name = "createReal")]
2106    pub fn create_real(value: f64) -> WasmColumnValue {
2107        WasmColumnValue {
2108            inner: ColumnValue::Real(value),
2109        }
2110    }
2111
2112    #[wasm_bindgen(js_name = "createText")]
2113    pub fn create_text(value: String) -> WasmColumnValue {
2114        WasmColumnValue {
2115            inner: ColumnValue::Text(value),
2116        }
2117    }
2118
2119    #[wasm_bindgen(js_name = "createBlob")]
2120    pub fn create_blob(value: &[u8]) -> WasmColumnValue {
2121        WasmColumnValue {
2122            inner: ColumnValue::Blob(value.to_vec()),
2123        }
2124    }
2125
2126    #[wasm_bindgen(js_name = "createBigInt")]
2127    pub fn create_bigint(value: &str) -> WasmColumnValue {
2128        WasmColumnValue {
2129            inner: ColumnValue::BigInt(value.to_string()),
2130        }
2131    }
2132
2133    #[wasm_bindgen(js_name = "createDate")]
2134    pub fn create_date(timestamp: f64) -> WasmColumnValue {
2135        WasmColumnValue {
2136            inner: ColumnValue::Date(timestamp as i64),
2137        }
2138    }
2139
2140    #[wasm_bindgen(js_name = "fromJsValue")]
2141    pub fn from_js_value(value: &JsValue) -> WasmColumnValue {
2142        if value.is_null() || value.is_undefined() {
2143            WasmColumnValue {
2144                inner: ColumnValue::Null,
2145            }
2146        } else if let Some(s) = value.as_string() {
2147            // Check if it's a large number string
2148            if let Ok(parsed) = s.parse::<i64>() {
2149                WasmColumnValue {
2150                    inner: ColumnValue::Integer(parsed),
2151                }
2152            } else {
2153                WasmColumnValue {
2154                    inner: ColumnValue::Text(s),
2155                }
2156            }
2157        } else if let Some(n) = value.as_f64() {
2158            if n.fract() == 0.0 && n >= i64::MIN as f64 && n <= i64::MAX as f64 {
2159                WasmColumnValue {
2160                    inner: ColumnValue::Integer(n as i64),
2161                }
2162            } else {
2163                WasmColumnValue {
2164                    inner: ColumnValue::Real(n),
2165                }
2166            }
2167        } else if value.is_object() {
2168            // Check if it's a Date
2169            if js_sys::Date::new(value).get_time().is_finite() {
2170                let timestamp = js_sys::Date::new(value).get_time() as i64;
2171                WasmColumnValue {
2172                    inner: ColumnValue::Date(timestamp),
2173                }
2174            } else {
2175                // Convert to string for other objects
2176                WasmColumnValue {
2177                    inner: ColumnValue::Text(format!("{:?}", value)),
2178                }
2179            }
2180        } else {
2181            WasmColumnValue {
2182                inner: ColumnValue::Null,
2183            }
2184        }
2185    }
2186
2187    // --- Rust-friendly alias constructors used in wasm tests ---
2188    // These mirror the create* methods but with simpler names and
2189    // argument types matching test usage.
2190    pub fn null() -> WasmColumnValue { Self::create_null() }
2191
2192    // Tests call integer(42.0), so accept f64 and cast to i64.
2193    pub fn integer(value: f64) -> WasmColumnValue { Self::create_integer(value as i64) }
2194
2195    pub fn real(value: f64) -> WasmColumnValue { Self::create_real(value) }
2196
2197    pub fn text(value: String) -> WasmColumnValue { Self::create_text(value) }
2198
2199    pub fn blob(value: Vec<u8>) -> WasmColumnValue { Self::create_blob(&value) }
2200
2201    pub fn big_int(value: String) -> WasmColumnValue { Self::create_bigint(&value) }
2202
2203    pub fn date(timestamp_ms: f64) -> WasmColumnValue { Self::create_date(timestamp_ms) }
2204}