#[cfg(target_arch = "wasm32")]
use std::rc::Rc;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen::prelude::*;
#[cfg(all(
not(target_arch = "wasm32"),
any(
feature = "bundled-sqlite",
feature = "encryption",
feature = "encryption-commoncrypto",
feature = "encryption-ios"
)
))]
pub extern crate rusqlite;
#[cfg(feature = "console_error_panic_hook")]
pub use console_error_panic_hook::set_once as set_panic_hook;
// When the opt-in `wee_alloc` feature is enabled, install wee_alloc as the
// global allocator to shrink the compiled wasm binary size.
#[cfg(feature = "wee_alloc")]
#[global_allocator]
static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT;
#[cfg(all(target_arch = "wasm32", feature = "console_log"))]
#[wasm_bindgen(start)]
/// Initializes console logging for the wasm module (runs automatically at
/// module start via `#[wasm_bindgen(start)]`).
///
/// Debug builds log at `Debug`, release builds at `Info`.
///
/// # Panics
/// Panics if the console logger cannot be installed (e.g. if a logger was
/// already set).
pub fn init_logger() {
    // cfg!() evaluates at compile time, so this is equivalent to the
    // attribute-gated two-binding form.
    let level = if cfg!(debug_assertions) {
        log::Level::Debug
    } else {
        log::Level::Info
    };
    console_log::init_with_level(level).expect("Failed to initialize console_log");
    log::info!("AbsurderSQL logging initialized at level: {:?}", level);
}
#[inline]
#[cfg(target_arch = "wasm32")]
/// Ensures a database name carries the `.db` suffix, appending it when absent.
fn normalize_db_name(name: &str) -> String {
    let mut normalized = name.to_string();
    if !normalized.ends_with(".db") {
        normalized.push_str(".db");
    }
    normalized
}
mod cleanup;
#[cfg(target_arch = "wasm32")]
pub mod connection_pool;
#[cfg(not(target_arch = "wasm32"))]
pub mod database;
pub mod storage;
pub mod types;
pub mod vfs;
#[cfg(not(target_arch = "wasm32"))]
pub use database::PreparedStatement;
pub mod utils;
#[cfg(feature = "telemetry")]
pub mod telemetry;
#[cfg(not(target_arch = "wasm32"))]
pub use database::SqliteIndexedDB;
#[cfg(target_arch = "wasm32")]
thread_local! {
    // Names of databases currently inside `Database::new`'s sqlite-open
    // critical section. Acts as a simple mutual-exclusion guard so two
    // concurrent opens of the same database on this thread don't race
    // through sqlite3_open_v2 at the same time.
    static DB_OPEN_IN_PROGRESS: std::cell::RefCell<std::collections::HashSet<String>> =
        std::cell::RefCell::new(std::collections::HashSet::new());
}
#[cfg(not(target_arch = "wasm32"))]
pub type Database = SqliteIndexedDB;
pub use types::DatabaseConfig;
pub use types::{ColumnValue, DatabaseError, QueryResult, Row, TransactionOptions};
pub use vfs::indexeddb_vfs::IndexedDBVFS;
#[cfg(target_arch = "wasm32")]
// Awaits an async operation `$body` against the storage handle `$storage`,
// binding the dereferenced storage to `$s`. The `$operation` label argument is
// currently unused, and the macro always yields `Some(result)` — callers that
// match on `None` treat it as a borrow conflict, which this implementation can
// no longer produce. NOTE(review): presumably a remnant of an earlier
// try_borrow-based version — confirm whether the Option wrapper is still needed.
macro_rules! with_storage_async {
    ($storage:expr, $operation:expr, |$s:ident| $body:expr) => {{
        let $s = &*$storage;
        Some($body.await)
    }};
}
/// wasm32 `Database` handle exposed to JavaScript via wasm-bindgen.
///
/// Wraps a pooled SQLite connection backed by the IndexedDB VFS, plus
/// per-instance multi-tab coordination state (leader election gating,
/// optimistic updates, coordination metrics) and optional telemetry.
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub struct Database {
    // Shared, pooled connection state; the raw sqlite3 pointer lives here.
    #[wasm_bindgen(skip)]
    connection_state: Rc<crate::connection_pool::ConnectionState>,
    // Normalized database name (with `.db` suffix); used for logging and
    // storage/coordination lookups.
    #[allow(dead_code)]
    name: String,
    // Optional JS callback invoked on data changes.
    #[wasm_bindgen(skip)]
    on_data_change_callback: Option<js_sys::Function>,
    // When true, write statements are allowed even if this tab is not the
    // elected leader (single-tab mode escape hatch).
    #[wasm_bindgen(skip)]
    allow_non_leader_writes: bool,
    #[wasm_bindgen(skip)]
    optimistic_updates_manager:
        std::cell::RefCell<crate::storage::optimistic_updates::OptimisticUpdatesManager>,
    #[wasm_bindgen(skip)]
    coordination_metrics_manager:
        std::cell::RefCell<crate::storage::coordination_metrics::CoordinationMetricsManager>,
    // Telemetry registry, span recorder, and active span context; all three
    // only exist when the `telemetry` feature is compiled in.
    #[wasm_bindgen(skip)]
    #[cfg(feature = "telemetry")]
    metrics: Option<crate::telemetry::Metrics>,
    #[wasm_bindgen(skip)]
    #[cfg(feature = "telemetry")]
    span_recorder: Option<crate::telemetry::SpanRecorder>,
    #[wasm_bindgen(skip)]
    #[cfg(feature = "telemetry")]
    span_context: Option<crate::telemetry::SpanContext>,
    // Optional cap on export size in bytes (None = unlimited).
    #[wasm_bindgen(skip)]
    max_export_size_bytes: Option<u64>,
}
#[cfg(target_arch = "wasm32")]
impl Database {
/// Returns the raw sqlite3 handle for this connection.
///
/// # Panics
/// Panics if the pooled connection has already been released (null pointer).
fn db(&self) -> *mut sqlite_wasm_rs::sqlite3 {
    let handle = self.connection_state.db.get();
    assert!(
        !handle.is_null(),
        "Database connection is null for {}",
        self.name
    );
    handle
}
/// Returns true when the statement's leading keyword is a row-mutating
/// operation (INSERT/UPDATE/DELETE/REPLACE). Comparison is case-insensitive
/// and ignores leading whitespace. DDL (CREATE/DROP/ALTER) is intentionally
/// not treated as a write here.
fn is_write_operation(sql: &str) -> bool {
    const WRITE_PREFIXES: [&str; 4] = ["INSERT", "UPDATE", "DELETE", "REPLACE"];
    let normalized = sql.trim().to_uppercase();
    WRITE_PREFIXES
        .iter()
        .any(|prefix| normalized.starts_with(prefix))
}
/// Read-only accessor for the telemetry metrics registry, if one was
/// initialized for this database.
#[cfg(feature = "telemetry")]
pub fn metrics(&self) -> Option<&crate::telemetry::Metrics> {
    self.metrics.as_ref()
}
/// Gates write statements behind multi-tab leader election.
///
/// Read statements and non-mutating SQL pass through immediately. Writes are
/// allowed when non-leader writes are explicitly enabled, when this instance
/// is the elected leader, or when no shared storage exists for this database
/// (single-instance mode). Otherwise returns `WRITE_PERMISSION_DENIED`.
async fn check_write_permission(&mut self, sql: &str) -> Result<(), DatabaseError> {
    if !Self::is_write_operation(sql) {
        return Ok(());
    }
    if self.allow_non_leader_writes {
        log::info!("WRITE_ALLOWED: Non-leader writes enabled for {}", self.name);
        return Ok(());
    }
    use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
    let db_name = &self.name;
    let storage_rc = get_storage_with_fallback(db_name);
    if let Some(storage) = storage_rc {
        // with_storage_async! currently always yields Some; the BORROW_CONFLICT
        // arm is kept for safety should the macro regain fallibility.
        let is_leader = with_storage_async!(storage, "check_write_permission", |s| s
            .is_leader())
        .ok_or_else(|| {
            DatabaseError::new("BORROW_CONFLICT", "Failed to check leader status")
        })?;
        if !is_leader {
            log::error!("WRITE_DENIED: Instance is not leader for {}", db_name);
            return Err(DatabaseError::new(
                "WRITE_PERMISSION_DENIED",
                "Only the leader tab can write to this database. Use db.isLeader() to check status or call db.allowNonLeaderWrites(true) for single-tab mode.",
            ));
        }
        log::info!("WRITE_ALLOWED: Instance is leader for {}", db_name);
        Ok(())
    } else {
        // No shared storage registered: nothing to coordinate with.
        log::info!(
            "WRITE_ALLOWED: No storage found for {} (single-instance mode)",
            db_name
        );
        Ok(())
    }
}
/// Opens (or reuses) a database connection described by `config`.
///
/// Steps: register (or reuse) the per-database IndexedDB VFS, serialize the
/// sqlite open against other in-flight opens of the same name, get or create
/// a pooled connection via `sqlite3_open_v2`, then apply the PRAGMA settings
/// from `config` (busy_timeout, page_size, cache_size, journal_mode,
/// auto_vacuum).
///
/// # Errors
/// Returns `DatabaseError` on VFS creation/registration failure, connection
/// pool / sqlite open failure, or PRAGMA execution failure.
pub async fn new(config: DatabaseConfig) -> Result<Self, DatabaseError> {
    use std::ffi::{CStr, CString};
    log::info!("Database::new called for {}", config.name);
    let normalized_name = normalize_db_name(&config.name);
    // One VFS instance per database, named after the name without the suffix.
    let vfs_name = format!("vfs_{}", normalized_name.trim_end_matches(".db"));
    let vfs_name_cstr = CString::new(vfs_name.as_str())
        .map_err(|_| DatabaseError::new("INVALID_VFS_NAME", "Invalid VFS name"))?;
    // SAFETY: sqlite3_vfs_find only reads the NUL-terminated name.
    let vfs_exists = unsafe {
        let existing_vfs = sqlite_wasm_rs::sqlite3_vfs_find(vfs_name_cstr.as_ptr());
        !existing_vfs.is_null()
    };
    if !vfs_exists {
        log::debug!("Creating IndexedDBVFS for: {}", normalized_name);
        let vfs = crate::vfs::IndexedDBVFS::new(&normalized_name).await?;
        log::debug!("Registering VFS as '{}'", vfs_name);
        vfs.register(&vfs_name)?;
        log::info!("VFS registered successfully");
    } else {
        log::info!("VFS '{}' already registered, reusing existing", vfs_name);
        // Constructing the VFS again ensures the backing BlockStorage exists
        // even though the sqlite-side registration is reused.
        let _vfs = crate::vfs::IndexedDBVFS::new(&normalized_name).await?;
        log::info!("BlockStorage ensured for {}", normalized_name);
    }
    // NOTE(review): this inner cfg is redundant — the whole impl is already
    // gated on target_arch = "wasm32".
    #[cfg(target_arch = "wasm32")]
    {
        // Serialize sqlite opens of the same database: poll the
        // DB_OPEN_IN_PROGRESS set until we can claim the name, for at most
        // MAX_OPEN_WAIT_MS. NOTE(review): if the wait budget is exhausted we
        // fall through and open anyway; and any `?`-error between here and
        // the removal at the end of this function leaves the name in the set,
        // forcing later opens through the full wait. Confirm whether a
        // drop-guard is wanted.
        const MAX_OPEN_WAIT_MS: u32 = 1000;
        const OPEN_POLL_MS: u32 = 10;
        let max_open_attempts = MAX_OPEN_WAIT_MS / OPEN_POLL_MS;
        for attempt in 0..max_open_attempts {
            let can_open = DB_OPEN_IN_PROGRESS.with(|opens| {
                let mut set = opens.borrow_mut();
                if set.contains(&config.name) {
                    false
                } else {
                    set.insert(config.name.clone());
                    true
                }
            });
            if can_open {
                web_sys::console::log_1(
                    &format!("[DB] {} - ACQUIRED sqlite open lock", config.name).into(),
                );
                break;
            } else {
                web_sys::console::log_1(
                    &format!(
                        "[DB] {} - Waiting for sqlite open (attempt {})",
                        config.name, attempt
                    )
                    .into(),
                );
                // Sleep OPEN_POLL_MS via a JS setTimeout wrapped in a Promise.
                use wasm_bindgen_futures::JsFuture;
                let promise = js_sys::Promise::new(&mut |resolve, _| {
                    web_sys::window()
                        .unwrap()
                        .set_timeout_with_callback_and_timeout_and_arguments_0(
                            &resolve,
                            OPEN_POLL_MS as i32,
                        )
                        .unwrap();
                });
                JsFuture::from(promise).await.ok();
                continue;
            }
        }
    }
    // Get or create the pooled connection; the closure only runs when no
    // pooled connection exists yet for this database.
    let (connection_state, db) = {
        let vfs_name_str = vfs_name.clone();
        let filename_copy = normalized_name.clone();
        let pool_key = normalized_name.trim_end_matches(".db").to_string();
        let state = crate::connection_pool::get_or_create_connection(&pool_key, || {
            let mut db = std::ptr::null_mut();
            let db_name = CString::new(normalized_name.clone())
                .map_err(|_| "Invalid database name".to_string())?;
            let vfs_cstr = CString::new(vfs_name_str.as_str())
                .map_err(|_| "Invalid VFS name".to_string())?;
            log::info!(
                "Opening database: {} with VFS: {}",
                filename_copy,
                vfs_name_str
            );
            #[cfg(target_arch = "wasm32")]
            web_sys::console::log_1(&format!("[OPEN] About to call sqlite3_open_v2...").into());
            // SAFETY: db_name/vfs_cstr are valid NUL-terminated strings and
            // `db` is a valid out-pointer for the connection handle.
            let ret = unsafe {
                sqlite_wasm_rs::sqlite3_open_v2(
                    db_name.as_ptr(),
                    &mut db as *mut _,
                    sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
                    vfs_cstr.as_ptr(),
                )
            };
            #[cfg(target_arch = "wasm32")]
            web_sys::console::log_1(
                &format!("[OPEN] sqlite3_open_v2 returned: {}", ret).into(),
            );
            log::info!(
                "sqlite3_open_v2 returned: {} for database: {}",
                ret,
                filename_copy
            );
            if ret != sqlite_wasm_rs::SQLITE_OK {
                // sqlite3_open_v2 usually returns a handle even on failure;
                // pull the human-readable message off it.
                let err_msg = unsafe {
                    let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
                    if !msg_ptr.is_null() {
                        CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
                    } else {
                        "Unknown error".to_string()
                    }
                };
                #[cfg(target_arch = "wasm32")]
                web_sys::console::log_1(
                    &format!(
                        "[OPEN] ERROR - sqlite3_open_v2 FAILED: ret={}, err={}",
                        ret, err_msg
                    )
                    .into(),
                );
                return Err(format!(
                    "Failed to open database with IndexedDB VFS: {}",
                    err_msg
                ));
            }
            log::info!("Database opened successfully with IndexedDB VFS");
            Ok(db)
        })
        .map_err(|e| DatabaseError::new("CONNECTION_POOL_ERROR", &e))?;
        let db_ptr = state.db.get();
        (state, db_ptr)
    };
    // Small helper: run a statement with sqlite3_exec, mapping failures to
    // DatabaseError with the sqlite error message attached.
    let exec_sql = |db: *mut sqlite_wasm_rs::sqlite3, sql: &str| -> Result<(), DatabaseError> {
        let c_sql = CString::new(sql)
            .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL statement"))?;
        let ret = unsafe {
            sqlite_wasm_rs::sqlite3_exec(
                db,
                c_sql.as_ptr(),
                None,
                std::ptr::null_mut(),
                std::ptr::null_mut(),
            )
        };
        if ret != sqlite_wasm_rs::SQLITE_OK {
            let err_msg = unsafe {
                let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
                if !msg_ptr.is_null() {
                    CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
                } else {
                    "Unknown error".to_string()
                }
            };
            log::warn!("Failed to execute SQL '{}': {}", sql, err_msg);
            return Err(DatabaseError::new(
                "SQLITE_ERROR",
                &format!("Failed to execute: {}", err_msg),
            ));
        }
        Ok(())
    };
    log::debug!("Setting busy_timeout to 10000ms for concurrent access handling");
    exec_sql(db, "PRAGMA busy_timeout = 10000")?;
    if let Some(page_size) = config.page_size {
        log::debug!("Setting page_size to {}", page_size);
        exec_sql(db, &format!("PRAGMA page_size = {}", page_size))?;
    }
    if let Some(cache_size) = config.cache_size {
        log::debug!("Setting cache_size to {}", cache_size);
        exec_sql(db, &format!("PRAGMA cache_size = {}", cache_size))?;
    }
    if let Some(ref journal_mode) = config.journal_mode {
        // journal_mode is prepared/stepped (not exec'd) because the PRAGMA
        // returns the mode actually applied, which may differ from the
        // requested one; we log a warning on mismatch instead of failing.
        log::debug!("Setting journal_mode to {}", journal_mode);
        let pragma_sql = format!("PRAGMA journal_mode = {}", journal_mode);
        let c_sql = CString::new(pragma_sql.as_str())
            .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL statement"))?;
        let mut stmt: *mut sqlite_wasm_rs::sqlite3_stmt = std::ptr::null_mut();
        let ret = unsafe {
            sqlite_wasm_rs::sqlite3_prepare_v2(
                db,
                c_sql.as_ptr(),
                -1,
                &mut stmt as *mut _,
                std::ptr::null_mut(),
            )
        };
        if ret == sqlite_wasm_rs::SQLITE_OK && !stmt.is_null() {
            let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
            if step_ret == sqlite_wasm_rs::SQLITE_ROW {
                let result_ptr = unsafe { sqlite_wasm_rs::sqlite3_column_text(stmt, 0) };
                if !result_ptr.is_null() {
                    let result_mode = unsafe {
                        std::ffi::CStr::from_ptr(result_ptr as *const i8)
                            .to_string_lossy()
                            .to_uppercase()
                    };
                    if result_mode != journal_mode.to_uppercase() {
                        log::warn!(
                            "journal_mode {} requested but SQLite set {}",
                            journal_mode,
                            result_mode
                        );
                    } else {
                        log::info!("journal_mode successfully set to {}", result_mode);
                    }
                }
            }
            unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
        } else {
            log::warn!("Failed to prepare journal_mode PRAGMA");
        }
    }
    if let Some(auto_vacuum) = config.auto_vacuum {
        // PRAGMA auto_vacuum takes 0 (NONE) or 1 (FULL).
        let vacuum_mode = if auto_vacuum { 1 } else { 0 };
        log::debug!("Setting auto_vacuum to {}", vacuum_mode);
        exec_sql(db, &format!("PRAGMA auto_vacuum = {}", vacuum_mode))?;
    }
    log::info!("Database configuration applied successfully");
    #[cfg(feature = "telemetry")]
    let metrics = crate::telemetry::Metrics::new().map_err(|e| {
        DatabaseError::new(
            "METRICS_ERROR",
            &format!("Failed to initialize metrics: {}", e),
        )
    })?;
    let database = Database {
        connection_state,
        name: normalized_name.clone(),
        on_data_change_callback: None,
        allow_non_leader_writes: false,
        optimistic_updates_manager: std::cell::RefCell::new(
            crate::storage::optimistic_updates::OptimisticUpdatesManager::new(),
        ),
        coordination_metrics_manager: std::cell::RefCell::new(
            crate::storage::coordination_metrics::CoordinationMetricsManager::new(),
        ),
        #[cfg(feature = "telemetry")]
        metrics: Some(metrics),
        #[cfg(feature = "telemetry")]
        span_recorder: None,
        #[cfg(feature = "telemetry")]
        span_context: Some(crate::telemetry::SpanContext::new()),
        max_export_size_bytes: config.max_export_size_bytes,
    };
    // Release the open lock claimed above (redundant cfg — see earlier note).
    #[cfg(target_arch = "wasm32")]
    {
        DB_OPEN_IN_PROGRESS.with(|opens| {
            opens.borrow_mut().remove(&config.name);
        });
        web_sys::console::log_1(
            &format!("[DB] {} - RELEASED sqlite open lock", config.name).into(),
        );
    }
    Ok(database)
}
/// Opens `filename` through an already-registered VFS named `vfs_name`,
/// reusing a pooled connection when one exists.
///
/// Unlike `new`, this does not create/register the VFS, apply PRAGMAs, or
/// serialize concurrent opens; `max_export_size_bytes` defaults to 2 GiB.
///
/// # Errors
/// Returns `OPEN_ERROR` when the pooled open fails, or `METRICS_ERROR` when
/// telemetry initialization fails.
pub async fn open_with_vfs(filename: &str, vfs_name: &str) -> Result<Self, DatabaseError> {
    use std::ffi::CString;
    log::info!("Opening database {} with VFS {}", filename, vfs_name);
    let normalized_name = normalize_db_name(filename);
    let pool_key = normalized_name.trim_end_matches(".db").to_string();
    let connection_state = crate::connection_pool::get_or_create_connection(&pool_key, || {
        let mut db: *mut sqlite_wasm_rs::sqlite3 = std::ptr::null_mut();
        let db_name = CString::new(normalized_name.clone())
            .map_err(|_| "Invalid database name".to_string())?;
        let vfs_cstr = CString::new(vfs_name).map_err(|_| "Invalid VFS name".to_string())?;
        // SAFETY: both strings are valid NUL-terminated C strings and `db`
        // is a valid out-pointer.
        let ret = unsafe {
            sqlite_wasm_rs::sqlite3_open_v2(
                db_name.as_ptr(),
                &mut db as *mut _,
                sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
                vfs_cstr.as_ptr(),
            )
        };
        if ret != sqlite_wasm_rs::SQLITE_OK {
            // A handle may exist even on failure; prefer its error message.
            let err_msg = if !db.is_null() {
                unsafe {
                    let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
                    if !msg_ptr.is_null() {
                        std::ffi::CStr::from_ptr(msg_ptr)
                            .to_string_lossy()
                            .into_owned()
                    } else {
                        "Unknown error".to_string()
                    }
                }
            } else {
                "Failed to open database".to_string()
            };
            return Err(format!("SQLITE_ERROR: {}", err_msg));
        }
        Ok(db)
    })
    .map_err(|e| DatabaseError::new("OPEN_ERROR", &e))?;
    log::info!(
        "Successfully opened database {} with VFS {}",
        normalized_name,
        vfs_name
    );
    #[cfg(feature = "telemetry")]
    let metrics = crate::telemetry::Metrics::new().map_err(|e| {
        DatabaseError::new(
            "METRICS_ERROR",
            &format!("Failed to initialize metrics: {}", e),
        )
    })?;
    Ok(Database {
        connection_state,
        name: normalized_name,
        on_data_change_callback: None,
        allow_non_leader_writes: false,
        optimistic_updates_manager: std::cell::RefCell::new(
            crate::storage::optimistic_updates::OptimisticUpdatesManager::new(),
        ),
        coordination_metrics_manager: std::cell::RefCell::new(
            crate::storage::coordination_metrics::CoordinationMetricsManager::new(),
        ),
        #[cfg(feature = "telemetry")]
        metrics: Some(metrics),
        #[cfg(feature = "telemetry")]
        span_recorder: None,
        #[cfg(feature = "telemetry")]
        span_context: Some(crate::telemetry::SpanContext::new()),
        // Default export cap: 2 GiB.
        max_export_size_bytes: Some(2 * 1024 * 1024 * 1024),
    })
}
/// Executes a single SQL statement with no bound parameters.
///
/// Statements whose trimmed text starts with SELECT are stepped row-by-row
/// and fully materialized into `QueryResult.rows`; all other statements
/// report `affected_rows` (and `last_insert_id` for INSERTs). Telemetry
/// spans and counters are recorded when the `telemetry` feature is active.
///
/// # Errors
/// `NULL_CONNECTION` if the pooled connection is gone, `INVALID_SQL` for SQL
/// containing NUL bytes, `SQLITE_ERROR` (with the statement attached) for
/// prepare/step failures.
pub async fn execute_internal(&mut self, sql: &str) -> Result<QueryResult, DatabaseError> {
    use std::ffi::{CStr, CString};
    let start_time = js_sys::Date::now();
    // Open a tracing span keyed by the leading SQL keyword, if a recorder is attached.
    #[cfg(feature = "telemetry")]
    let span = if self.span_recorder.is_some() {
        let query_type = sql
            .trim()
            .split_whitespace()
            .next()
            .unwrap_or("UNKNOWN")
            .to_uppercase();
        let mut builder = crate::telemetry::SpanBuilder::new("execute_query".to_string())
            .with_attribute("query_type", query_type.clone())
            .with_attribute("sql", sql.to_string());
        if let Some(ref context) = self.span_context {
            builder = builder.with_baggage_from_context(context);
        }
        let span = builder.build();
        if let Some(ref context) = self.span_context {
            context.enter_span(span.span_id.clone());
        }
        Some(span)
    } else {
        None
    };
    // NOTE(review): duplicated cfg attribute; harmless but redundant.
    #[cfg(feature = "telemetry")]
    #[cfg(feature = "telemetry")]
    if let Some(metrics) = &self.metrics {
        metrics.queries_total().inc();
    }
    if self.db().is_null() {
        return Err(DatabaseError::new(
            "NULL_CONNECTION",
            "Database connection is null",
        ));
    }
    let sql_cstr = CString::new(sql)
        .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL string"))?;
    if sql.trim().to_uppercase().starts_with("SELECT") {
        // --- SELECT path: prepare, read column names, step rows, materialize. ---
        let mut stmt = std::ptr::null_mut();
        let ret = unsafe {
            sqlite_wasm_rs::sqlite3_prepare_v2(
                self.db(),
                sql_cstr.as_ptr(),
                -1,
                &mut stmt,
                std::ptr::null_mut(),
            )
        };
        if ret != sqlite_wasm_rs::SQLITE_OK {
            let err_msg = unsafe {
                let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
                if !msg_ptr.is_null() {
                    CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
                } else {
                    format!("Unknown error (code: {})", ret)
                }
            };
            // NOTE(review): duplicated cfg attribute here too.
            #[cfg(feature = "telemetry")]
            #[cfg(feature = "telemetry")]
            if let Some(metrics) = &self.metrics {
                metrics.errors_total().inc();
            }
            #[cfg(feature = "telemetry")]
            if let Some(mut s) = span {
                s.status = crate::telemetry::SpanStatus::Error(format!(
                    "Failed to prepare: {}",
                    err_msg
                ));
                s.end_time_ms = Some(js_sys::Date::now());
                if let Some(recorder) = &self.span_recorder {
                    recorder.record_span(s);
                }
                if let Some(ref context) = self.span_context {
                    context.exit_span();
                }
            }
            return Err(DatabaseError::new(
                "SQLITE_ERROR",
                &format!("Failed to prepare statement: {}", err_msg),
            )
            .with_sql(sql));
        }
        let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
        let mut columns = Vec::new();
        let mut rows = Vec::new();
        for i in 0..column_count {
            let col_name = unsafe {
                let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
                if name_ptr.is_null() {
                    format!("col_{}", i)
                } else {
                    std::ffi::CStr::from_ptr(name_ptr)
                        .to_string_lossy()
                        .into_owned()
                }
            };
            columns.push(col_name);
        }
        loop {
            let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
            if step_ret == sqlite_wasm_rs::SQLITE_ROW {
                // Convert each column by its declared runtime type.
                let mut values = Vec::new();
                for i in 0..column_count {
                    let value = unsafe {
                        let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
                        match col_type {
                            sqlite_wasm_rs::SQLITE_NULL => ColumnValue::Null,
                            sqlite_wasm_rs::SQLITE_INTEGER => {
                                let val = sqlite_wasm_rs::sqlite3_column_int64(stmt, i);
                                ColumnValue::Integer(val)
                            }
                            sqlite_wasm_rs::SQLITE_FLOAT => {
                                let val = sqlite_wasm_rs::sqlite3_column_double(stmt, i);
                                ColumnValue::Real(val)
                            }
                            sqlite_wasm_rs::SQLITE_TEXT => {
                                let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
                                if text_ptr.is_null() {
                                    ColumnValue::Null
                                } else {
                                    let text = std::ffi::CStr::from_ptr(text_ptr as *const i8)
                                        .to_string_lossy()
                                        .into_owned();
                                    ColumnValue::Text(text)
                                }
                            }
                            sqlite_wasm_rs::SQLITE_BLOB => {
                                let blob_ptr = sqlite_wasm_rs::sqlite3_column_blob(stmt, i);
                                let blob_size = sqlite_wasm_rs::sqlite3_column_bytes(stmt, i);
                                if blob_ptr.is_null() || blob_size == 0 {
                                    ColumnValue::Blob(vec![])
                                } else {
                                    // Copy the blob out before the next step invalidates it.
                                    let blob_slice = std::slice::from_raw_parts(
                                        blob_ptr as *const u8,
                                        blob_size as usize,
                                    );
                                    ColumnValue::Blob(blob_slice.to_vec())
                                }
                            }
                            _ => ColumnValue::Null,
                        }
                    };
                    values.push(value);
                }
                rows.push(Row { values });
            } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
                break;
            } else {
                let err_msg = unsafe {
                    let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
                    if !err_ptr.is_null() {
                        std::ffi::CStr::from_ptr(err_ptr)
                            .to_string_lossy()
                            .to_string()
                    } else {
                        "Unknown SQLite error".to_string()
                    }
                };
                unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
                // NOTE(review): unlike the prepare-failure path, this error
                // path never records/ends `span` or exits span_context.
                #[cfg(feature = "telemetry")]
                if let Some(metrics) = &self.metrics {
                    metrics.errors_total().inc();
                }
                return Err(DatabaseError::new(
                    "SQLITE_ERROR",
                    &format!("Error executing SELECT statement: {}", err_msg),
                )
                .with_sql(sql));
            }
        }
        unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
        let execution_time_ms = js_sys::Date::now() - start_time;
        #[cfg(feature = "telemetry")]
        if let Some(metrics) = &self.metrics {
            metrics.query_duration().observe(execution_time_ms);
        }
        // NOTE(review): the successful SELECT return also skips span
        // completion (only the non-SELECT branch records span Ok below).
        Ok(QueryResult {
            columns,
            rows,
            affected_rows: 0,
            last_insert_id: None,
            execution_time_ms,
        })
    } else {
        // --- Non-SELECT path: still collects rows for statements that
        // return data (e.g. PRAGMA, RETURNING), but only TEXT/INTEGER
        // columns are decoded here. ---
        let mut stmt: *mut sqlite_wasm_rs::sqlite3_stmt = std::ptr::null_mut();
        let ret = unsafe {
            sqlite_wasm_rs::sqlite3_prepare_v2(
                self.db(),
                sql_cstr.as_ptr(),
                -1,
                &mut stmt,
                std::ptr::null_mut(),
            )
        };
        if ret != sqlite_wasm_rs::SQLITE_OK {
            let err_msg = unsafe {
                let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
                if !msg_ptr.is_null() {
                    CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
                } else {
                    format!("Unknown error (code: {})", ret)
                }
            };
            #[cfg(feature = "telemetry")]
            if let Some(metrics) = &self.metrics {
                metrics.errors_total().inc();
            }
            return Err(DatabaseError::new(
                "SQLITE_ERROR",
                &format!("Failed to prepare statement: {}", err_msg),
            )
            .with_sql(sql));
        }
        let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
        let mut columns = Vec::new();
        let mut rows = Vec::new();
        if column_count > 0 {
            for i in 0..column_count {
                let col_name = unsafe {
                    let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
                    if name_ptr.is_null() {
                        format!("column_{}", i)
                    } else {
                        std::ffi::CStr::from_ptr(name_ptr)
                            .to_string_lossy()
                            .into_owned()
                    }
                };
                columns.push(col_name);
            }
            loop {
                let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
                if step_ret == sqlite_wasm_rs::SQLITE_ROW {
                    let mut values = Vec::new();
                    for i in 0..column_count {
                        let value = unsafe {
                            let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
                            match col_type {
                                sqlite_wasm_rs::SQLITE_TEXT => {
                                    let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
                                    if text_ptr.is_null() {
                                        ColumnValue::Null
                                    } else {
                                        let text =
                                            std::ffi::CStr::from_ptr(text_ptr as *const i8)
                                                .to_string_lossy()
                                                .into_owned();
                                        ColumnValue::Text(text)
                                    }
                                }
                                sqlite_wasm_rs::SQLITE_INTEGER => ColumnValue::Integer(
                                    sqlite_wasm_rs::sqlite3_column_int64(stmt, i),
                                ),
                                // FLOAT/BLOB fall through to Null on this path.
                                _ => ColumnValue::Null,
                            }
                        };
                        values.push(value);
                    }
                    rows.push(Row { values });
                } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
                    break;
                } else {
                    let err_msg = unsafe {
                        let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
                        if !err_ptr.is_null() {
                            std::ffi::CStr::from_ptr(err_ptr)
                                .to_string_lossy()
                                .to_string()
                        } else {
                            "Unknown SQLite error".to_string()
                        }
                    };
                    unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
                    #[cfg(feature = "telemetry")]
                    if let Some(metrics) = &self.metrics {
                        metrics.errors_total().inc();
                    }
                    return Err(DatabaseError::new(
                        "SQLITE_ERROR",
                        &format!("Failed to execute statement: {}", err_msg),
                    )
                    .with_sql(sql));
                }
            }
        } else {
            // No result columns: a single step either completes or errors.
            let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
            if step_ret != sqlite_wasm_rs::SQLITE_DONE {
                let err_msg = unsafe {
                    let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
                    if !err_ptr.is_null() {
                        std::ffi::CStr::from_ptr(err_ptr)
                            .to_string_lossy()
                            .to_string()
                    } else {
                        "Unknown SQLite error".to_string()
                    }
                };
                unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
                #[cfg(feature = "telemetry")]
                if let Some(metrics) = &self.metrics {
                    metrics.errors_total().inc();
                }
                return Err(DatabaseError::new(
                    "SQLITE_ERROR",
                    &format!("Failed to execute statement: {}", err_msg),
                )
                .with_sql(sql));
            }
        }
        unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
        let affected_rows = unsafe { sqlite_wasm_rs::sqlite3_changes(self.db()) } as u32;
        let last_insert_id = if sql.trim().to_uppercase().starts_with("INSERT") {
            Some(unsafe { sqlite_wasm_rs::sqlite3_last_insert_rowid(self.db()) })
        } else {
            None
        };
        let execution_time_ms = js_sys::Date::now() - start_time;
        #[cfg(feature = "telemetry")]
        if let Some(metrics) = &self.metrics {
            metrics.query_duration().observe(execution_time_ms);
        }
        #[cfg(feature = "telemetry")]
        if let Some(mut s) = span {
            s.status = crate::telemetry::SpanStatus::Ok;
            s.end_time_ms = Some(js_sys::Date::now());
            s.attributes
                .insert("duration_ms".to_string(), execution_time_ms.to_string());
            s.attributes
                .insert("affected_rows".to_string(), affected_rows.to_string());
            s.attributes
                .insert("row_count".to_string(), rows.len().to_string());
            if let Some(recorder) = &self.span_recorder {
                recorder.record_span(s);
            }
            if let Some(ref context) = self.span_context {
                context.exit_span();
            }
        }
        Ok(QueryResult {
            columns,
            rows,
            affected_rows,
            last_insert_id,
            execution_time_ms,
        })
    }
}
/// Executes a single SQL statement with positional bound parameters.
///
/// Parameters are bound 1-based in order; NUL bytes inside TEXT parameters
/// are stripped before binding. SELECT statements materialize all rows;
/// other statements report `affected_rows` (and `last_insert_id` for
/// INSERTs). Telemetry spans and counters are recorded when the `telemetry`
/// feature is active.
///
/// # Errors
/// `INVALID_SQL` for SQL containing NUL bytes, `SQLITE_ERROR` (with the
/// statement attached) for prepare/bind/step failures.
pub async fn execute_with_params_internal(
    &mut self,
    sql: &str,
    params: &[ColumnValue],
) -> Result<QueryResult, DatabaseError> {
    use std::ffi::{CStr, CString};
    let start_time = js_sys::Date::now();
    // Open a tracing span keyed by the leading SQL keyword, if a recorder is attached.
    #[cfg(feature = "telemetry")]
    let span = if self.span_recorder.is_some() {
        let query_type = sql
            .trim()
            .split_whitespace()
            .next()
            .unwrap_or("UNKNOWN")
            .to_uppercase();
        let span = crate::telemetry::SpanBuilder::new("execute_query".to_string())
            .with_attribute("query_type", query_type.clone())
            .with_attribute("sql", sql.to_string())
            .build();
        Some(span)
    } else {
        None
    };
    #[cfg(feature = "telemetry")]
    self.ensure_metrics_propagated();
    #[cfg(feature = "telemetry")]
    if let Some(metrics) = &self.metrics {
        metrics.queries_total().inc();
    }
    let sql_cstr = CString::new(sql)
        .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL string"))?;
    let mut stmt = std::ptr::null_mut();
    let ret = unsafe {
        sqlite_wasm_rs::sqlite3_prepare_v2(
            self.db(),
            sql_cstr.as_ptr(),
            -1,
            &mut stmt,
            std::ptr::null_mut(),
        )
    };
    if ret != sqlite_wasm_rs::SQLITE_OK {
        let err_msg = unsafe {
            let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
            if !msg_ptr.is_null() {
                CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
            } else {
                format!("Unknown error (code: {})", ret)
            }
        };
        #[cfg(feature = "telemetry")]
        if let Some(metrics) = &self.metrics {
            metrics.errors_total().inc();
        }
        #[cfg(feature = "telemetry")]
        if let Some(mut s) = span {
            s.status =
                crate::telemetry::SpanStatus::Error(format!("Failed to prepare: {}", err_msg));
            s.end_time_ms = Some(js_sys::Date::now());
            if let Some(recorder) = &self.span_recorder {
                recorder.record_span(s);
            }
        }
        return Err(DatabaseError::new(
            "SQLITE_ERROR",
            &format!("Failed to prepare statement: {}", err_msg),
        )
        .with_sql(sql));
    }
    // SQLITE_TRANSIENT makes SQLite copy the bytes, but keep the CStrings
    // alive until every bind call has returned.
    let mut text_cstrings = Vec::new();
    for (i, param) in params.iter().enumerate() {
        // SQLite bind indices are 1-based.
        let param_index = (i + 1) as i32;
        let bind_ret = unsafe {
            match param {
                ColumnValue::Null => sqlite_wasm_rs::sqlite3_bind_null(stmt, param_index),
                ColumnValue::Integer(val) => {
                    sqlite_wasm_rs::sqlite3_bind_int64(stmt, param_index, *val)
                }
                ColumnValue::Real(val) => {
                    sqlite_wasm_rs::sqlite3_bind_double(stmt, param_index, *val)
                }
                ColumnValue::Text(val) => {
                    // Interior NUL bytes cannot survive a C string; strip them.
                    let sanitized = val.replace('\0', "");
                    let text_cstr = CString::new(sanitized.as_str())
                        .expect("CString::new should not fail after null byte removal");
                    let result = sqlite_wasm_rs::sqlite3_bind_text(
                        stmt,
                        param_index,
                        text_cstr.as_ptr(),
                        sanitized.len() as i32,
                        sqlite_wasm_rs::SQLITE_TRANSIENT(),
                    );
                    text_cstrings.push(text_cstr);
                    result
                }
                ColumnValue::Blob(val) => sqlite_wasm_rs::sqlite3_bind_blob(
                    stmt,
                    param_index,
                    val.as_ptr() as *const _,
                    val.len() as i32,
                    sqlite_wasm_rs::SQLITE_TRANSIENT(),
                ),
                _ => sqlite_wasm_rs::sqlite3_bind_null(stmt, param_index),
            }
        };
        if bind_ret != sqlite_wasm_rs::SQLITE_OK {
            unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
            #[cfg(feature = "telemetry")]
            if let Some(metrics) = &self.metrics {
                metrics.errors_total().inc();
            }
            return Err(
                DatabaseError::new("SQLITE_ERROR", "Failed to bind parameter").with_sql(sql),
            );
        }
    }
    if sql.trim().to_uppercase().starts_with("SELECT") {
        // --- SELECT path: read column names, step rows, materialize. ---
        let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
        let mut columns = Vec::new();
        let mut rows = Vec::new();
        for i in 0..column_count {
            let col_name = unsafe {
                let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
                if name_ptr.is_null() {
                    format!("col_{}", i)
                } else {
                    std::ffi::CStr::from_ptr(name_ptr)
                        .to_string_lossy()
                        .into_owned()
                }
            };
            columns.push(col_name);
        }
        loop {
            let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
            if step_ret == sqlite_wasm_rs::SQLITE_ROW {
                let mut values = Vec::new();
                for i in 0..column_count {
                    let value = unsafe {
                        let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
                        match col_type {
                            sqlite_wasm_rs::SQLITE_NULL => ColumnValue::Null,
                            sqlite_wasm_rs::SQLITE_INTEGER => {
                                let val = sqlite_wasm_rs::sqlite3_column_int64(stmt, i);
                                ColumnValue::Integer(val)
                            }
                            sqlite_wasm_rs::SQLITE_FLOAT => {
                                let val = sqlite_wasm_rs::sqlite3_column_double(stmt, i);
                                ColumnValue::Real(val)
                            }
                            sqlite_wasm_rs::SQLITE_TEXT => {
                                let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
                                if text_ptr.is_null() {
                                    ColumnValue::Null
                                } else {
                                    let text = std::ffi::CStr::from_ptr(text_ptr as *const i8)
                                        .to_string_lossy()
                                        .into_owned();
                                    ColumnValue::Text(text)
                                }
                            }
                            sqlite_wasm_rs::SQLITE_BLOB => {
                                let blob_ptr = sqlite_wasm_rs::sqlite3_column_blob(stmt, i);
                                let blob_size = sqlite_wasm_rs::sqlite3_column_bytes(stmt, i);
                                if blob_ptr.is_null() || blob_size == 0 {
                                    ColumnValue::Blob(vec![])
                                } else {
                                    // Copy out before the next step invalidates it.
                                    let blob_slice = std::slice::from_raw_parts(
                                        blob_ptr as *const u8,
                                        blob_size as usize,
                                    );
                                    ColumnValue::Blob(blob_slice.to_vec())
                                }
                            }
                            _ => ColumnValue::Null,
                        }
                    };
                    values.push(value);
                }
                rows.push(Row { values });
            } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
                break;
            } else {
                let err_msg = unsafe {
                    let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
                    if !err_ptr.is_null() {
                        std::ffi::CStr::from_ptr(err_ptr)
                            .to_string_lossy()
                            .to_string()
                    } else {
                        "Unknown SQLite error".to_string()
                    }
                };
                unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
                #[cfg(feature = "telemetry")]
                if let Some(metrics) = &self.metrics {
                    metrics.errors_total().inc();
                }
                return Err(DatabaseError::new(
                    "SQLITE_ERROR",
                    &format!("Error executing SELECT statement: {}", err_msg),
                )
                .with_sql(sql));
            }
        }
        unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
        let execution_time_ms = js_sys::Date::now() - start_time;
        #[cfg(feature = "telemetry")]
        if let Some(metrics) = &self.metrics {
            metrics.query_duration().observe(execution_time_ms);
        }
        #[cfg(feature = "telemetry")]
        if let Some(mut s) = span {
            s.status = crate::telemetry::SpanStatus::Ok;
            s.end_time_ms = Some(js_sys::Date::now());
            s.attributes
                .insert("duration_ms".to_string(), execution_time_ms.to_string());
            s.attributes
                .insert("row_count".to_string(), rows.len().to_string());
            if let Some(recorder) = &self.span_recorder {
                recorder.record_span(s);
            }
        }
        Ok(QueryResult {
            columns,
            rows,
            affected_rows: 0,
            last_insert_id: None,
            execution_time_ms,
        })
    } else {
        // --- Non-SELECT path: single step to completion. ---
        let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
        if step_ret != sqlite_wasm_rs::SQLITE_DONE {
            // Read the error message before finalize, consistent with the
            // other error paths in this file.
            let err_msg = unsafe {
                let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
                if !err_ptr.is_null() {
                    std::ffi::CStr::from_ptr(err_ptr)
                        .to_string_lossy()
                        .to_string()
                } else {
                    "Unknown SQLite error".to_string()
                }
            };
            unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
            // BUG FIX: errors_total was previously incremented before the
            // step result was checked, counting every successful non-SELECT
            // statement as an error. Count failures only.
            #[cfg(feature = "telemetry")]
            if let Some(metrics) = &self.metrics {
                metrics.errors_total().inc();
            }
            return Err(DatabaseError::new(
                "SQLITE_ERROR",
                &format!("Failed to execute statement: {}", err_msg),
            )
            .with_sql(sql));
        }
        unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
        let execution_time_ms = js_sys::Date::now() - start_time;
        #[cfg(feature = "telemetry")]
        if let Some(metrics) = &self.metrics {
            metrics.query_duration().observe(execution_time_ms);
        }
        let affected_rows = unsafe { sqlite_wasm_rs::sqlite3_changes(self.db()) } as u32;
        let last_insert_id = if sql.trim().to_uppercase().starts_with("INSERT") {
            Some(unsafe { sqlite_wasm_rs::sqlite3_last_insert_rowid(self.db()) })
        } else {
            None
        };
        #[cfg(feature = "telemetry")]
        if let Some(mut s) = span {
            s.status = crate::telemetry::SpanStatus::Ok;
            s.end_time_ms = Some(js_sys::Date::now());
            s.attributes
                .insert("duration_ms".to_string(), execution_time_ms.to_string());
            s.attributes
                .insert("affected_rows".to_string(), affected_rows.to_string());
            if let Some(recorder) = &self.span_recorder {
                recorder.record_span(s);
            }
        }
        Ok(QueryResult {
            columns: vec![],
            rows: vec![],
            affected_rows,
            last_insert_id,
            execution_time_ms,
        })
    }
}
/// Installs (or clears) the telemetry metrics registry for this database and
/// propagates it to the underlying storage layer.
#[cfg(feature = "telemetry")]
pub fn set_metrics(&mut self, metrics: Option<crate::telemetry::Metrics>) {
    // `metrics` is passed by value, so it can be stored directly; the
    // previous `.clone()` duplicated it only to drop the original.
    self.metrics = metrics;
    self.ensure_metrics_propagated();
}
/// Installs (or clears) the span recorder used by the telemetry span paths.
#[cfg(feature = "telemetry")]
pub fn set_span_recorder(&mut self, recorder: Option<crate::telemetry::SpanRecorder>) {
    self.span_recorder = recorder;
}
/// Borrow the active span context, if telemetry is configured.
#[cfg(feature = "telemetry")]
pub fn get_span_context(&self) -> Option<&crate::telemetry::SpanContext> {
    self.span_context.as_ref()
}
/// Borrow the installed span recorder, if any.
#[cfg(feature = "telemetry")]
pub fn get_span_recorder(&self) -> Option<&crate::telemetry::SpanRecorder> {
    self.span_recorder.as_ref()
}
/// Best-effort push of this instance's metrics handle into the shared
/// storage layer so VFS-level operations record into the same registry.
/// No-op when metrics are unset, when no storage exists for this database,
/// or on non-wasm targets; borrow failures are silently ignored.
#[cfg(feature = "telemetry")]
fn ensure_metrics_propagated(&self) {
    #[cfg(target_arch = "wasm32")]
    {
        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
        if self.metrics.is_none() {
            return;
        }
        let db_name = &self.name;
        if let Some(storage_rc) = get_storage_with_fallback(db_name) {
            use crate::vfs::indexeddb_vfs::with_storage_borrow_mut;
            // Best-effort: a concurrent borrow just skips propagation.
            let _ = with_storage_borrow_mut(&storage_rc, "set_metrics", |storage| {
                storage.set_metrics(self.metrics.clone());
                Ok(())
            });
        }
    }
}
/// Flushes and shuts down this database handle.
///
/// Order matters: checkpoint the WAL (best-effort), sync dirty state to
/// IndexedDB, then stop leader election. The pooled sqlite connection itself
/// is NOT closed here — it is released when the pool entry is dropped.
/// Calling this on an already-null connection is a no-op.
pub async fn close_internal(&mut self) -> Result<(), DatabaseError> {
    log::info!("CLOSE_INTERNAL STARTED for: {}", self.name);
    if self.connection_state.db.get().is_null() {
        log::info!(
            "Connection already null for {}, skipping close operations",
            self.name
        );
        return Ok(());
    }
    log::info!("Checkpointing WAL before close: {}", self.name);
    // Best-effort: checkpoint failure is not fatal to close.
    let _ = self
        .execute_internal("PRAGMA wal_checkpoint(PASSIVE)")
        .await;
    log::info!("WAL checkpoint completed for: {}", self.name);
    log::info!("Syncing database before close: {}", self.name);
    self.sync_internal().await?;
    log::info!("Sync completed for: {}", self.name);
    web_sys::console::log_1(
        &format!("CLOSE: About to stop leader election for {}", self.name).into(),
    );
    // NOTE(review): this inner cfg is redundant — the whole impl is already
    // gated on target_arch = "wasm32".
    #[cfg(target_arch = "wasm32")]
    {
        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
        let db_name = &self.name;
        web_sys::console::log_1(
            &format!("STOP_ELECTION: Getting storage for {}", db_name).into(),
        );
        log::info!("STOP_ELECTION: Getting storage for {}", db_name);
        let storage_rc = get_storage_with_fallback(db_name);
        if let Some(storage_rc) = storage_rc {
            log::info!("STOP_ELECTION: Found storage for {}, calling stop", db_name);
            // All failure modes are logged but never abort the close.
            match with_storage_async!(storage_rc, "stop_leader_election", |storage| storage
                .stop_leader_election())
            {
                Some(Ok(())) => {
                    log::info!("STOP_ELECTION: Successfully stopped for {}", db_name);
                }
                Some(Err(e)) => {
                    log::warn!("STOP_ELECTION: Failed for {}: {:?}", db_name, e);
                }
                None => {
                    log::warn!("STOP_ELECTION: Borrow failed for {}", db_name);
                }
            }
        } else {
            log::warn!("STOP_ELECTION: No storage found for {}", db_name);
        }
        log::info!("STOP_ELECTION: Completed section for {}", db_name);
    }
    log::info!(
        "Closed database: {} (connection will be released on Drop)",
        self.name
    );
    Ok(())
}
/// Execute `sql` and return only the result rows, discarding the execution
/// metadata (affected rows, timing, last insert id).
pub async fn query(&mut self, sql: &str) -> Result<Vec<Row>, DatabaseError> {
    self.execute_internal(sql).await.map(|result| result.rows)
}
/// Flush this database's in-memory state to IndexedDB and notify other tabs.
///
/// On wasm the sequence is: (1) advance the per-database commit marker,
/// (2) snapshot all cached blocks plus (block id, checksum) metadata out of
/// the process-wide caches, (3) persist the snapshot via the event-based
/// IndexedDB path, and (4) broadcast a `DataChanged` notification. With the
/// `telemetry` feature enabled, a `vfs_sync` span and sync counters /
/// duration histograms are recorded around the whole operation.
pub async fn sync_internal(&mut self) -> Result<(), DatabaseError> {
    #[cfg(all(target_arch = "wasm32", feature = "telemetry"))]
    let start_time = js_sys::Date::now();
    // Only build a span when a recorder is installed; inherit trace context
    // and baggage from the ambient span context if one exists.
    #[cfg(feature = "telemetry")]
    let span = if self.span_recorder.is_some() {
        let mut builder = crate::telemetry::SpanBuilder::new("vfs_sync".to_string())
            .with_attribute("operation", "sync");
        if let Some(ref context) = self.span_context {
            builder = builder
                .with_context(context)
                .with_baggage_from_context(context);
        }
        let span = builder.build();
        if let Some(ref context) = self.span_context {
            context.enter_span(span.span_id.clone());
        }
        Some(span)
    } else {
        None
    };
    #[cfg(feature = "telemetry")]
    if let Some(ref metrics) = self.metrics {
        metrics.sync_operations_total().inc();
    }
    // Number of blocks persisted by this sync; reported as a span attribute.
    #[cfg(feature = "telemetry")]
    let mut blocks_count = 0;
    #[cfg(target_arch = "wasm32")]
    {
        use crate::storage::vfs_sync;
        let storage_name = &self.name;
        // Step 1: bump the commit marker so readers can detect this commit.
        // The cfg split mirrors the container type: RefCell on wasm, a lock
        // elsewhere.
        let next_commit = vfs_sync::with_global_commit_marker(|cm| {
            #[cfg(target_arch = "wasm32")]
            let mut cm_ref = cm.borrow_mut();
            #[cfg(not(target_arch = "wasm32"))]
            let mut cm_ref = cm.lock();
            let current = cm_ref.get(storage_name).copied().unwrap_or(0);
            let new_marker = current + 1;
            cm_ref.insert(storage_name.to_string(), new_marker);
            log::debug!(
                "Advanced commit marker for {} from {} to {}",
                storage_name,
                current,
                new_marker
            );
            new_marker
        });
        web_sys::console::log_1(
            &format!(
                "[SYNC] Collecting blocks from GLOBAL_STORAGE for: {}",
                storage_name
            )
            .into(),
        );
        // Step 2: snapshot blocks and metadata into owned Vecs so that all
        // borrows of the global caches are released before the await below.
        let (blocks_to_persist, metadata_to_persist) =
            vfs_sync::with_global_storage(|storage| {
                #[cfg(target_arch = "wasm32")]
                let storage_map = storage.borrow();
                #[cfg(not(target_arch = "wasm32"))]
                let storage_map = storage.lock();
                let blocks = if let Some(db_storage) = storage_map.get(storage_name) {
                    let count = db_storage.len();
                    web_sys::console::log_1(
                        &format!("[SYNC] Found {} blocks in GLOBAL_STORAGE", count).into(),
                    );
                    db_storage
                        .iter()
                        .map(|(&id, data)| (id, data.clone()))
                        .collect::<Vec<_>>()
                } else {
                    web_sys::console::log_1(
                        &format!(
                            "[SYNC] No blocks found in GLOBAL_STORAGE for {}",
                            storage_name
                        )
                        .into(),
                    );
                    Vec::new()
                };
                // Metadata is reduced to (block id, checksum) pairs.
                let metadata = vfs_sync::with_global_metadata(|meta| {
                    #[cfg(target_arch = "wasm32")]
                    let meta_map = meta.borrow();
                    #[cfg(not(target_arch = "wasm32"))]
                    let meta_map = meta.lock();
                    if let Some(db_meta) = meta_map.get(storage_name) {
                        let count = db_meta.len();
                        web_sys::console::log_1(
                            &format!("[SYNC] Found {} metadata entries", count).into(),
                        );
                        db_meta
                            .iter()
                            .map(|(&id, metadata)| (id, metadata.checksum))
                            .collect::<Vec<_>>()
                    } else {
                        web_sys::console::log_1(&format!("[SYNC] No metadata found").into());
                        Vec::new()
                    }
                });
                (blocks, metadata)
            });
        web_sys::console::log_1(
            &format!(
                "[SYNC] Preparing to persist {} blocks to IndexedDB",
                blocks_to_persist.len()
            )
            .into(),
        );
        // Step 3: persist the snapshot (skipped entirely when empty).
        if !blocks_to_persist.is_empty() {
            #[cfg(feature = "telemetry")]
            {
                blocks_count = blocks_to_persist.len();
            }
            web_sys::console::log_1(
                &format!(
                    "[SYNC] Persisting {} blocks to IndexedDB",
                    blocks_to_persist.len()
                )
                .into(),
            );
            crate::storage::wasm_indexeddb::persist_to_indexeddb_event_based(
                storage_name,
                blocks_to_persist,
                metadata_to_persist,
                next_commit,
                #[cfg(feature = "telemetry")]
                self.span_recorder.clone(),
                #[cfg(feature = "telemetry")]
                span.as_ref().map(|s| s.span_id.clone()),
            )
            .await?;
            web_sys::console::log_1(
                &format!("[SYNC] Successfully persisted to IndexedDB").into(),
            );
        } else {
            web_sys::console::log_1(
                &format!("[SYNC] WARNING: No blocks to persist - GLOBAL_STORAGE is empty!")
                    .into(),
            );
        }
        // Step 4: tell other tabs the data changed. Failure is non-fatal.
        use crate::storage::broadcast_notifications::{
            BroadcastNotification, send_change_notification,
        };
        let notification = BroadcastNotification::DataChanged {
            db_name: self.name.clone(),
            timestamp: js_sys::Date::now() as u64,
        };
        log::debug!("Sending DataChanged notification for {}", self.name);
        if let Err(e) = send_change_notification(&notification) {
            log::warn!("Failed to send change notification: {}", e);
        }
    }
    #[cfg(all(target_arch = "wasm32", feature = "telemetry"))]
    if let Some(ref metrics) = self.metrics {
        let duration_ms = js_sys::Date::now() - start_time;
        metrics.sync_duration().observe(duration_ms);
    }
    // Finish the span: stamp end time / duration with the platform clock,
    // record it, and pop it off the ambient context.
    #[cfg(feature = "telemetry")]
    if let Some(mut s) = span {
        s.status = crate::telemetry::SpanStatus::Ok;
        #[cfg(target_arch = "wasm32")]
        {
            s.end_time_ms = Some(js_sys::Date::now());
            let duration_ms = s.end_time_ms.unwrap() - s.start_time_ms;
            s.attributes
                .insert("duration_ms".to_string(), duration_ms.to_string());
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_millis() as f64;
            s.end_time_ms = Some(now);
            let duration_ms = s.end_time_ms.unwrap() - s.start_time_ms;
            s.attributes
                .insert("duration_ms".to_string(), duration_ms.to_string());
        }
        s.attributes
            .insert("blocks_persisted".to_string(), blocks_count.to_string());
        if let Some(recorder) = &self.span_recorder {
            recorder.record_span(s);
        }
        if let Some(ref context) = self.span_context {
            context.exit_span();
        }
    }
    Ok(())
}
}
#[cfg(target_arch = "wasm32")]
impl Drop for Database {
    /// Release the pooled SQLite connection and stop this instance's leader
    /// heartbeat timer. The BlockStorage itself intentionally stays in the
    /// registry so other handles to the same database keep working.
    fn drop(&mut self) {
        web_sys::console::log_1(&format!("DROP: Releasing connection for {}", self.name).into());
        // The connection pool is keyed without the ".db" suffix.
        let pool_key = self.name.trim_end_matches(".db");
        crate::connection_pool::release_connection(pool_key);
        web_sys::console::log_1(&format!("DROP: Connection released for {}", self.name).into());
        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
        if let Some(storage_rc) = get_storage_with_fallback(&self.name) {
            let storage = &*storage_rc;
            // try_borrow_mut: Drop must never panic, so if the election
            // state is already borrowed elsewhere we simply skip cleanup.
            if let Ok(mut manager_ref) = storage.leader_election.try_borrow_mut() {
                if let Some(ref mut manager) = *manager_ref {
                    // take() clears the stored id so the interval cannot be
                    // cleared twice by another handle.
                    if let Some(interval_id) = manager.heartbeat_interval.take() {
                        if let Some(window) = web_sys::window() {
                            window.clear_interval_with_handle(interval_id);
                            web_sys::console::log_1(
                                &format!(
                                    "DROP: Cleared heartbeat interval {} for {}",
                                    interval_id, self.name
                                )
                                .into(),
                            );
                        }
                    }
                }
            } else {
                web_sys::console::log_1(
                    &format!(
                        "[DROP] Skipping {} (heartbeat already stopped by shared DB)",
                        self.name
                    )
                    .into(),
                );
            }
        }
        log::debug!(
            "Closed database: {} (BlockStorage remains in registry)",
            self.name
        );
    }
}
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
impl Database {
#[wasm_bindgen(js_name = "newDatabase")]
pub async fn new_wasm(name: String) -> Result<Database, JsValue> {
let normalized_name = if name.ends_with(".db") {
name.clone()
} else {
format!("{}.db", name)
};
let config = DatabaseConfig {
name: normalized_name.clone(),
version: Some(1),
cache_size: Some(10_000),
page_size: Some(4096),
auto_vacuum: Some(true),
journal_mode: Some("WAL".to_string()),
max_export_size_bytes: Some(2 * 1024 * 1024 * 1024), };
let db = Database::new(config)
.await
.map_err(|e| JsValue::from_str(&format!("Failed to create database: {}", e)))?;
Self::start_write_queue_listener(&normalized_name)?;
Ok(db)
}
#[wasm_bindgen(getter)]
/// JS property getter returning the (normalized) database name.
pub fn name(&self) -> String {
    self.name.as_str().to_owned()
}
#[wasm_bindgen(js_name = "getAllDatabases")]
/// Enumerate every known database name by unioning three sources: the
/// persisted localStorage list, the per-thread STORAGE_REGISTRY, and the
/// process-wide GLOBAL_STORAGE block cache. Returns a sorted JS string
/// array; a failure to read the persistent list is logged, not fatal.
pub async fn get_all_databases() -> Result<JsValue, JsValue> {
    use crate::storage::vfs_sync::with_global_storage;
    use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
    use std::collections::HashSet;
    log::info!("getAllDatabases called");
    // HashSet deduplicates names appearing in more than one source.
    let mut db_names = HashSet::new();
    match Self::get_persistent_database_list() {
        Ok(persistent_list) => {
            log::info!("Persistent list has {} entries", persistent_list.len());
            for name in persistent_list {
                log::info!("Found in persistent list: {}", name);
                db_names.insert(name);
            }
        }
        Err(e) => {
            log::warn!("Failed to get persistent list: {:?}", e);
        }
    }
    // SAFETY: read-only access to the UnsafeCell-backed thread_local
    // registry; assumes no mutable borrow is live on this thread —
    // NOTE(review): confirm against the other STORAGE_REGISTRY accessors.
    STORAGE_REGISTRY.with(|reg| unsafe {
        let registry = &*reg.get();
        log::info!("STORAGE_REGISTRY has {} entries", registry.len());
        for key in registry.keys() {
            log::info!("Found in STORAGE_REGISTRY: {}", key);
            db_names.insert(key.clone());
        }
    });
    with_global_storage(|storage| {
        let storage_borrow = storage.borrow();
        log::info!("GLOBAL_STORAGE has {} entries", storage_borrow.len());
        for key in storage_borrow.keys() {
            log::info!("Found in GLOBAL_STORAGE: {}", key);
            db_names.insert(key.clone());
        }
    });
    log::info!("Total unique databases found: {}", db_names.len());
    // Sort for a stable, user-friendly ordering before handing to JS.
    let mut result_vec: Vec<String> = db_names.into_iter().collect();
    result_vec.sort();
    let js_array = js_sys::Array::new();
    for name in &result_vec {
        log::info!("Returning database: {}", name);
        js_array.push(&JsValue::from_str(name));
    }
    log::info!("getAllDatabases returning {} databases", result_vec.len());
    Ok(js_array.into())
}
#[wasm_bindgen(js_name = "deleteDatabase")]
pub async fn delete_database(name: String) -> Result<(), JsValue> {
use crate::storage::vfs_sync::{
with_global_commit_marker, with_global_metadata, with_global_storage,
};
let normalized_name = if name.ends_with(".db") {
name.clone()
} else {
format!("{}.db", name)
};
log::info!("Deleting database: {}", normalized_name);
use crate::vfs::indexeddb_vfs::remove_storage_from_registry;
remove_storage_from_registry(&normalized_name);
with_global_storage(|gs| {
#[cfg(target_arch = "wasm32")]
let mut storage = gs.borrow_mut();
#[cfg(not(target_arch = "wasm32"))]
let mut storage = gs.lock();
storage.remove(&normalized_name);
});
with_global_metadata(|gm| {
#[cfg(target_arch = "wasm32")]
let mut metadata = gm.borrow_mut();
#[cfg(not(target_arch = "wasm32"))]
let mut metadata = gm.lock();
metadata.remove(&normalized_name);
});
with_global_commit_marker(|cm| {
#[cfg(target_arch = "wasm32")]
let mut markers = cm.borrow_mut();
#[cfg(not(target_arch = "wasm32"))]
let mut markers = cm.lock();
log::info!(
"Cleared commit marker for {} from GLOBAL storage",
normalized_name
);
markers.remove(&normalized_name);
});
let idb_name = format!("absurder_{}", normalized_name);
let _delete_promise = js_sys::eval(&format!("indexedDB.deleteDatabase('{}')", idb_name))
.map_err(|e| JsValue::from_str(&format!("Failed to delete IndexedDB: {:?}", e)))?;
log::info!("Database deleted: {}", normalized_name);
Self::remove_database_from_persistent_list(&normalized_name)?;
Ok(())
}
#[allow(dead_code)]
/// Record `db_name` in the `absurder_db_list` localStorage key so the
/// database can be rediscovered across page loads. Idempotent: names that
/// are already listed are left untouched.
fn add_database_to_persistent_list(db_name: &str) -> Result<(), JsValue> {
    log::info!("add_database_to_persistent_list called for: {}", db_name);
    let window = web_sys::window().ok_or_else(|| {
        log::error!("No window object");
        JsValue::from_str("No window")
    })?;
    let storage = window
        .local_storage()
        .map_err(|e| {
            log::error!("Failed to get localStorage: {:?}", e);
            JsValue::from_str("No localStorage")
        })?
        .ok_or_else(|| {
            log::error!("localStorage not available");
            JsValue::from_str("localStorage not available")
        })?;
    let key = "absurder_db_list";
    let existing = storage.get_item(key).map_err(|e| {
        log::error!("Failed to read localStorage key {}: {:?}", key, e);
        JsValue::from_str("Failed to read localStorage")
    })?;
    log::debug!("Existing localStorage value: {:?}", existing);
    // Decode the stored JSON list; any problem falls back to an empty list.
    let mut db_list: Vec<String> = match existing {
        Some(json_str) => match serde_json::from_str(&json_str) {
            Ok(list) => {
                log::debug!("Parsed existing list: {:?}", list);
                list
            }
            Err(e) => {
                log::warn!("Failed to parse localStorage JSON: {}, starting fresh", e);
                Vec::new()
            }
        },
        None => {
            log::debug!("No existing list, creating new");
            Vec::new()
        }
    };
    // Early-out when already present: nothing to write back.
    if db_list.iter().any(|n| n == db_name) {
        log::info!("{} already in persistent list", db_name);
        return Ok(());
    }
    db_list.push(db_name.to_string());
    db_list.sort();
    log::debug!("Updated list: {:?}", db_list);
    let json_str = serde_json::to_string(&db_list).map_err(|e| {
        log::error!("Failed to serialize list: {}", e);
        JsValue::from_str("Failed to serialize")
    })?;
    log::debug!("Writing to localStorage: {}", json_str);
    storage.set_item(key, &json_str).map_err(|e| {
        log::error!("Failed to write to localStorage: {:?}", e);
        JsValue::from_str("Failed to write localStorage")
    })?;
    log::info!("Successfully added {} to persistent database list", db_name);
    Ok(())
}
/// Remove `db_name` from the persisted `absurder_db_list` localStorage key.
/// No-op (no write, no log) when the key is absent or the name is not in
/// the list.
fn remove_database_from_persistent_list(db_name: &str) -> Result<(), JsValue> {
    let window = web_sys::window().ok_or_else(|| JsValue::from_str("No window"))?;
    let storage = window
        .local_storage()
        .map_err(|_| JsValue::from_str("No localStorage"))?
        .ok_or_else(|| JsValue::from_str("localStorage not available"))?;
    let key = "absurder_db_list";
    let existing = storage
        .get_item(key)
        .map_err(|_| JsValue::from_str("Failed to read localStorage"))?;
    if let Some(json_str) = existing {
        // A corrupted list is treated as empty rather than failing a delete.
        let mut db_list: Vec<String> = serde_json::from_str(&json_str).unwrap_or_default();
        let before = db_list.len();
        db_list.retain(|name| name != db_name);
        // Only rewrite localStorage (and log) when the name was actually
        // removed; the old code wrote back and claimed removal every time.
        if db_list.len() != before {
            let json_str = serde_json::to_string(&db_list)
                .map_err(|_| JsValue::from_str("Failed to serialize"))?;
            storage
                .set_item(key, &json_str)
                .map_err(|_| JsValue::from_str("Failed to write localStorage"))?;
            log::info!("Removed {} from persistent database list", db_name);
        }
    }
    Ok(())
}
/// Read the persisted database-name list from the `absurder_db_list`
/// localStorage key. Missing or unparseable data yields an empty list
/// rather than an error; only localStorage access failures are errors.
fn get_persistent_database_list() -> Result<Vec<String>, JsValue> {
    log::info!("get_persistent_database_list called");
    let window = web_sys::window().ok_or_else(|| {
        log::error!("No window object");
        JsValue::from_str("No window")
    })?;
    let storage = window
        .local_storage()
        .map_err(|e| {
            log::error!("Failed to get localStorage: {:?}", e);
            JsValue::from_str("No localStorage")
        })?
        .ok_or_else(|| {
            log::error!("localStorage not available");
            JsValue::from_str("localStorage not available")
        })?;
    let key = "absurder_db_list";
    let existing = storage.get_item(key).map_err(|e| {
        log::error!("Failed to read localStorage key {}: {:?}", key, e);
        JsValue::from_str("Failed to read localStorage")
    })?;
    log::debug!("Read from localStorage: {:?}", existing);
    let Some(json_str) = existing else {
        log::info!("No persistent database list in localStorage");
        return Ok(Vec::new());
    };
    match serde_json::from_str::<Vec<String>>(&json_str) {
        Ok(db_list) => {
            log::info!(
                "Successfully parsed {} databases from localStorage",
                db_list.len()
            );
            log::debug!("Database list: {:?}", db_list);
            Ok(db_list)
        }
        Err(e) => {
            log::error!("Failed to parse localStorage JSON: {}", e);
            Ok(Vec::new())
        }
    }
}
/// Install a BroadcastChannel-style listener that lets this tab, when it is
/// the leader, execute write requests queued by non-leader tabs and send
/// back Success/Error responses keyed by request id.
///
/// NOTE(review): the closure is `forget()`-ed and never unregistered, so
/// each call leaks one listener for the lifetime of the page — presumably
/// acceptable since this is called once per database open; verify.
fn start_write_queue_listener(db_name: &str) -> Result<(), JsValue> {
    use crate::storage::write_queue::{
        WriteQueueMessage, WriteResponse, register_write_queue_listener,
    };
    use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
    let db_name_clone = db_name.to_string();
    let callback = Closure::wrap(Box::new(move |msg: JsValue| {
        let db_name_inner = db_name_clone.clone();
        // Round-trip through JSON.stringify so serde can decode the message.
        if let Ok(json_str) = js_sys::JSON::stringify(&msg) {
            if let Some(json_str) = json_str.as_string() {
                if let Ok(message) = serde_json::from_str::<WriteQueueMessage>(&json_str) {
                    // Only WriteRequest messages are handled here; responses
                    // are consumed by the requesting tab's own listener.
                    if let WriteQueueMessage::WriteRequest(request) = message {
                        log::debug!("Leader received write request: {}", request.request_id);
                        let storage_rc = get_storage_with_fallback(&db_name_inner);
                        if let Some(storage) = storage_rc {
                            // The actual work is async, so hand it to the
                            // browser event loop.
                            wasm_bindgen_futures::spawn_local(async move {
                                let is_leader = with_storage_async!(
                                    storage,
                                    "write_queue_is_leader",
                                    |s| s.is_leader()
                                );
                                if is_leader.is_none() {
                                    log::error!("Failed to check leader status");
                                    return;
                                }
                                let is_leader = is_leader.unwrap();
                                // Non-leaders drop the request; the real
                                // leader's listener will pick it up.
                                if !is_leader {
                                    log::error!("Not leader, ignoring write request");
                                    return;
                                }
                                log::debug!("Processing write request as leader");
                                // Open a fresh handle (pool-backed) to run
                                // the queued SQL.
                                match Database::new_wasm(db_name_inner.clone()).await {
                                    Ok(mut db) => {
                                        match db.execute_internal(&request.sql).await {
                                            Ok(result) => {
                                                let response = WriteResponse::Success {
                                                    request_id: request.request_id.clone(),
                                                    affected_rows: result.affected_rows
                                                        as usize,
                                                };
                                                use crate::storage::write_queue::send_write_response;
                                                if let Err(e) = send_write_response(
                                                    &db_name_inner,
                                                    response,
                                                ) {
                                                    log::error!(
                                                        "Failed to send response: {}",
                                                        e
                                                    );
                                                } else {
                                                    log::info!(
                                                        "Write response sent successfully"
                                                    );
                                                }
                                            }
                                            Err(e) => {
                                                // Report execution failure back
                                                // to the requesting tab.
                                                let response = WriteResponse::Error {
                                                    request_id: request.request_id.clone(),
                                                    error_message: e.to_string(),
                                                };
                                                use crate::storage::write_queue::send_write_response;
                                                if let Err(e) = send_write_response(
                                                    &db_name_inner,
                                                    response,
                                                ) {
                                                    log::error!(
                                                        "Failed to send error response: {}",
                                                        e
                                                    );
                                                }
                                            }
                                        }
                                    }
                                    Err(e) => {
                                        log::error!(
                                            "Failed to create db for write processing: {:?}",
                                            e
                                        );
                                    }
                                }
                            });
                        }
                    }
                }
            }
        }
    }) as Box<dyn FnMut(JsValue)>);
    let callback_fn = callback.as_ref().unchecked_ref();
    register_write_queue_listener(db_name, callback_fn).map_err(|e| {
        JsValue::from_str(&format!("Failed to register write queue listener: {}", e))
    })?;
    // Leak the closure so it stays alive for the page lifetime.
    callback.forget();
    Ok(())
}
#[wasm_bindgen]
/// JS entry point: enforce leader/write rules for `sql`, run it, and return
/// the serialized `QueryResult`.
pub async fn execute(&mut self, sql: &str) -> Result<JsValue, JsValue> {
    if let Err(e) = self.check_write_permission(sql).await {
        return Err(JsValue::from_str(&format!("Write permission denied: {}", e)));
    }
    match self.execute_internal(sql).await {
        Ok(result) => {
            serde_wasm_bindgen::to_value(&result).map_err(|e| JsValue::from_str(&e.to_string()))
        }
        Err(e) => Err(JsValue::from_str(&format!("Query execution failed: {}", e))),
    }
}
#[wasm_bindgen(js_name = "executeWithParams")]
/// JS entry point: deserialize the bound parameters, enforce write
/// permission, run the parameterized statement, and return the serialized
/// result.
pub async fn execute_with_params(
    &mut self,
    sql: &str,
    params: JsValue,
) -> Result<JsValue, JsValue> {
    let params: Vec<ColumnValue> = match serde_wasm_bindgen::from_value(params) {
        Ok(p) => p,
        Err(e) => return Err(JsValue::from_str(&format!("Invalid parameters: {}", e))),
    };
    if let Err(e) = self.check_write_permission(sql).await {
        return Err(JsValue::from_str(&format!("Write permission denied: {}", e)));
    }
    match self.execute_with_params_internal(sql, &params).await {
        Ok(result) => {
            serde_wasm_bindgen::to_value(&result).map_err(|e| JsValue::from_str(&e.to_string()))
        }
        Err(e) => Err(JsValue::from_str(&format!("Query execution failed: {}", e))),
    }
}
#[wasm_bindgen]
/// JS entry point that gracefully closes the database (see
/// `close_internal`), mapping errors to `JsValue`.
pub async fn close(&mut self) -> Result<(), JsValue> {
    match self.close_internal().await {
        Ok(()) => Ok(()),
        Err(e) => Err(JsValue::from_str(&format!("Failed to close database: {}", e))),
    }
}
#[wasm_bindgen(js_name = "forceCloseConnection")]
/// Best-effort graceful close, then forcibly evict the pooled connection
/// and clean up all per-database state.
pub async fn force_close_connection(&mut self) -> Result<(), JsValue> {
    // Errors from the graceful close are deliberately ignored: the
    // connection is being torn down regardless.
    let _ = self.close_internal().await;
    let pool_key = self.name.trim_end_matches(".db");
    crate::connection_pool::force_close_connection(pool_key);
    #[cfg(target_arch = "wasm32")]
    {
        if let Err(e) = crate::cleanup::cleanup_all_state(pool_key).await {
            return Err(JsValue::from_str(&format!("Cleanup failed: {}", e)));
        }
    }
    log::info!("Force closed and removed connection for: {}", self.name);
    Ok(())
}
#[wasm_bindgen]
/// JS entry point that flushes in-memory state to IndexedDB (see
/// `sync_internal`), mapping errors to `JsValue`.
pub async fn sync(&mut self) -> Result<(), JsValue> {
    match self.sync_internal().await {
        Ok(()) => Ok(()),
        Err(e) => Err(JsValue::from_str(&format!("Failed to sync database: {}", e))),
    }
}
#[wasm_bindgen(js_name = "allowNonLeaderWrites")]
/// Toggle whether this instance may execute writes without holding
/// leadership. Declared `async` only so the JS side sees a Promise.
pub async fn allow_non_leader_writes(&mut self, allow: bool) -> Result<(), JsValue> {
    log::debug!("Setting allowNonLeaderWrites = {} for {}", allow, self.name);
    self.allow_non_leader_writes = allow;
    Ok(())
}
#[wasm_bindgen(js_name = "exportToFile")]
/// Export the whole database as a byte array, suitable for download.
///
/// Sequence: take an exclusive Web Lock on the database name, reload the
/// block cache from the global store, checkpoint the WAL, sync to
/// IndexedDB, then serialize the database (capped by
/// `max_export_size_bytes`).
pub async fn export_to_file(&self) -> Result<js_sys::Uint8Array, JsValue> {
    let db_name = self.name.clone();
    let max_export_size = self.max_export_size_bytes;
    log::info!("[EXPORT] ===== Step 1: Acquiring lock");
    // Exclusive lock blocks concurrent imports/exports across tabs.
    let _guard = weblocks::acquire(&db_name, weblocks::AcquireOptions::exclusive()).await?;
    log::info!("[EXPORT] ===== Step 2: Lock acquired");
    use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
    log::info!("[EXPORT] ===== Step 3: Getting storage");
    let storage_rc = get_storage_with_fallback(&db_name).ok_or_else(|| {
        JsValue::from_str(&format!("Storage not found for database: {}", db_name))
    })?;
    log::info!("[EXPORT] ===== Step 4: Got storage, reloading cache");
    #[cfg(target_arch = "wasm32")]
    {
        storage_rc.reload_cache_from_global_storage();
    }
    log::info!("[EXPORT] ===== Step 5: Checkpointing WAL");
    // Checkpoint directly over the raw handle so WAL contents land in the
    // main database image before it is serialized. Failure is non-fatal.
    if !self.connection_state.db.get().is_null() {
        use std::ffi::CString;
        let pragma = CString::new("PRAGMA wal_checkpoint(PASSIVE)").unwrap();
        // SAFETY: the handle was checked non-null above and the statement is
        // finalized before leaving the block.
        unsafe {
            let mut stmt = std::ptr::null_mut();
            let rc = sqlite_wasm_rs::sqlite3_prepare_v2(
                self.connection_state.db.get(),
                pragma.as_ptr(),
                -1,
                &mut stmt,
                std::ptr::null_mut(),
            );
            if rc == sqlite_wasm_rs::SQLITE_OK && !stmt.is_null() {
                sqlite_wasm_rs::sqlite3_step(stmt);
                sqlite_wasm_rs::sqlite3_finalize(stmt);
                log::info!("[EXPORT] WAL checkpoint completed");
            } else {
                log::warn!("[EXPORT] WAL checkpoint failed with rc: {}", rc);
            }
        }
    }
    log::info!("[EXPORT] ===== Step 6: Starting sync");
    // Persist everything so the export reads a consistent on-disk image.
    storage_rc
        .sync()
        .await
        .map_err(|e| JsValue::from_str(&format!("Sync failed: {}", e)))?;
    log::info!("[EXPORT] ===== Step 7: Sync complete");
    log::info!("[EXPORT] Calling export_database_to_bytes");
    let db_bytes = {
        let storage = &*storage_rc;
        crate::storage::export::export_database_to_bytes(storage, max_export_size)
            .await
            .map_err(|e| {
                log::error!("[EXPORT] Export failed: {}", e);
                JsValue::from_str(&format!("Export failed: {}", e))
            })?
    };
    log::info!("[EXPORT] Export complete: {} bytes", db_bytes.len());
    // Copy into a JS-owned Uint8Array for the caller.
    let uint8_array = js_sys::Uint8Array::new_with_length(db_bytes.len() as u32);
    uint8_array.copy_from(&db_bytes);
    Ok(uint8_array)
}
#[wasm_bindgen(js_name = "testLock")]
pub async fn test_lock(&self, value: u32) -> Result<u32, JsValue> {
let lock_name = format!("{}.lock_test", self.name);
log::info!(
"[LOCK-TEST] Acquiring lock: {} with value: {}",
lock_name,
value
);
let _guard = weblocks::acquire(&lock_name, weblocks::AcquireOptions::exclusive()).await?;
log::info!("[LOCK-TEST] Lock acquired, processing value: {}", value);
let result = value + 1;
let delay_promise = js_sys::Promise::new(&mut |resolve, _reject| {
let window = web_sys::window().unwrap();
let _ = window
.set_timeout_with_callback_and_timeout_and_arguments_0(resolve.unchecked_ref(), 10);
});
wasm_bindgen_futures::JsFuture::from(delay_promise).await?;
log::info!(
"[LOCK-TEST] Lock releasing for: {} with result: {}",
lock_name,
result
);
Ok(result)
}
#[wasm_bindgen(js_name = "importFromFile")]
/// Replace this database's contents with an exported byte image.
///
/// Under an exclusive Web Lock: close and evict the current connection,
/// write the imported bytes into storage, then reopen a pooled connection
/// through the per-database VFS and adopt it as this instance's state.
pub async fn import_from_file(&mut self, file_data: js_sys::Uint8Array) -> Result<(), JsValue> {
    log::info!("[IMPORT] Starting import with lock for: {}", self.name);
    let db_name = self.name.clone();
    // Copy the JS buffer into Rust memory up front.
    let data = file_data.to_vec();
    let _guard = weblocks::acquire(&db_name, weblocks::AcquireOptions::exclusive()).await?;
    log::info!("[IMPORT] Lock acquired for: {}", db_name);
    log::debug!("Import data size: {} bytes", data.len());
    log::debug!("Force-closing database connection before import");
    // The old connection must be fully gone before storage is overwritten.
    self.close_internal()
        .await
        .map_err(|e| JsValue::from_str(&format!("Failed to close before import: {}", e)))?;
    let pool_key = self.name.trim_end_matches(".db");
    crate::connection_pool::force_close_connection(pool_key);
    // Null the handle so close_internal's guard treats us as closed.
    self.connection_state.db.set(std::ptr::null_mut());
    log::debug!("Removed connection from pool for import");
    crate::storage::import::import_database_from_bytes(&db_name, data)
        .await
        .map_err(|e| {
            log::error!("Import failed for {}: {}", db_name, e);
            JsValue::from_str(&format!("Import failed: {}", e))
        })?;
    log::info!("[IMPORT] Import complete for: {}", db_name);
    log::info!("[IMPORT] Reopening connection for: {}", db_name);
    use std::ffi::CString;
    // VFS is registered per-database under "vfs_<name without .db>".
    let vfs_name = format!("vfs_{}", db_name.trim_end_matches(".db"));
    let pool_key = db_name.trim_end_matches(".db").to_string();
    let db_name_for_closure = db_name.clone();
    let vfs_name_for_closure = vfs_name.clone();
    let new_state = crate::connection_pool::get_or_create_connection(&pool_key, || {
        let mut db = std::ptr::null_mut();
        let db_name_cstr = CString::new(db_name_for_closure.clone())
            .map_err(|_| "Invalid database name".to_string())?;
        let vfs_cstr = CString::new(vfs_name_for_closure.as_str())
            .map_err(|_| "Invalid VFS name".to_string())?;
        log::info!(
            "[IMPORT] Reopening database: {} with VFS: {}",
            db_name_for_closure,
            vfs_name_for_closure
        );
        // SAFETY: both CStrings outlive the call; `db` receives the handle.
        let ret = unsafe {
            sqlite_wasm_rs::sqlite3_open_v2(
                db_name_cstr.as_ptr(),
                &mut db as *mut _,
                sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
                vfs_cstr.as_ptr(),
            )
        };
        if ret != sqlite_wasm_rs::SQLITE_OK {
            // SQLite still allocates a handle on failure; read its message.
            let err_msg = unsafe {
                let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
                if !msg_ptr.is_null() {
                    std::ffi::CStr::from_ptr(msg_ptr)
                        .to_string_lossy()
                        .into_owned()
                } else {
                    "Unknown error".to_string()
                }
            };
            return Err(format!(
                "Failed to reopen database after import: {}",
                err_msg
            ));
        }
        log::info!("[IMPORT] Database reopened successfully");
        Ok(db)
    })
    .map_err(|e| {
        JsValue::from_str(&format!("Failed to reopen connection after import: {}", e))
    })?;
    self.connection_state = new_state;
    log::info!("[IMPORT] Connection state updated for: {}", db_name);
    Ok(())
}
#[wasm_bindgen(js_name = "waitForLeadership")]
pub async fn wait_for_leadership(&mut self) -> Result<(), JsValue> {
use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
#[cfg(feature = "telemetry")]
if let Some(ref metrics) = self.metrics {
metrics.leader_election_attempts_total().inc();
}
let db_name = &self.name;
let start_time = js_sys::Date::now();
let timeout_ms = 5000.0;
loop {
let storage_rc = get_storage_with_fallback(db_name);
if let Some(storage) = storage_rc {
let is_leader =
match with_storage_async!(storage, "wait_for_leadership", |s| s.is_leader()) {
Some(v) => v,
None => continue,
};
if is_leader {
log::info!("Became leader for {}", db_name);
#[cfg(feature = "telemetry")]
if let Some(ref metrics) = self.metrics {
let duration_ms = js_sys::Date::now() - start_time;
metrics.leader_election_duration().observe(duration_ms);
metrics.is_leader().set(1.0);
metrics.leadership_changes_total().inc();
}
return Ok(());
}
}
if js_sys::Date::now() - start_time > timeout_ms {
#[cfg(feature = "telemetry")]
if let Some(ref metrics) = self.metrics {
let duration_ms = js_sys::Date::now() - start_time;
metrics.leader_election_duration().observe(duration_ms);
}
return Err(JsValue::from_str("Timeout waiting for leadership"));
}
let promise = js_sys::Promise::new(&mut |resolve, _| {
let window = web_sys::window().expect("should have window");
let _ = window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 100);
});
let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
}
}
#[wasm_bindgen(js_name = "requestLeadership")]
/// Trigger a leader (re-)election for this database's storage, recording
/// election metrics and leadership transitions when telemetry is enabled.
pub async fn request_leadership(&mut self) -> Result<(), JsValue> {
    use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
    let db_name = &self.name;
    log::debug!("Requesting leadership for {}", db_name);
    // Capture pre-election leadership so a change can be counted afterward.
    #[cfg(feature = "telemetry")]
    let telemetry_data = if self.metrics.is_some() {
        let start_time = js_sys::Date::now();
        let was_leader = self
            .is_leader_wasm()
            .await
            .ok()
            .and_then(|v| v.as_bool())
            .unwrap_or(false);
        if let Some(ref metrics) = self.metrics {
            metrics.leader_elections_total().inc();
        }
        Some((start_time, was_leader))
    } else {
        None
    };
    let storage_rc = get_storage_with_fallback(db_name);
    if let Some(storage) = storage_rc {
        {
            // Outer `None` = storage borrow failed; inner Err = the
            // election itself failed to start.
            let result = with_storage_async!(storage, "request_leadership", |s| s
                .start_leader_election())
            .ok_or_else(|| {
                JsValue::from_str("Failed to acquire storage lock for leadership request")
            })?;
            result.map_err(|e| {
                JsValue::from_str(&format!("Failed to request leadership: {}", e))
            })?;
            log::debug!("Re-election triggered for {}", db_name);
        }
        // Record duration, new leadership state, and any transition.
        #[cfg(feature = "telemetry")]
        if let Some((start_time, was_leader)) = telemetry_data {
            if let Some(ref metrics) = self.metrics {
                let duration_ms = js_sys::Date::now() - start_time;
                metrics.leader_election_duration().observe(duration_ms);
                let is_leader_now = self
                    .is_leader_wasm()
                    .await
                    .ok()
                    .and_then(|v| v.as_bool())
                    .unwrap_or(false);
                metrics
                    .is_leader()
                    .set(if is_leader_now { 1.0 } else { 0.0 });
                if was_leader != is_leader_now {
                    metrics.leadership_changes_total().inc();
                }
            }
        }
        Ok(())
    } else {
        Err(JsValue::from_str(&format!(
            "No storage found for database: {}",
            db_name
        )))
    }
}
#[wasm_bindgen(js_name = "getLeaderInfo")]
/// Return a JS object `{ isLeader, leaderId, leaseExpiry }` describing this
/// tab's view of leadership for the database.
///
/// NOTE(review): `leaderId` is synthesized locally ("leader_<db>" when this
/// tab leads, otherwise "unknown") and `leaseExpiry` is just `Date.now()`,
/// not a real lease deadline — confirm whether callers rely on these as
/// placeholders only.
pub async fn get_leader_info(&mut self) -> Result<JsValue, JsValue> {
    use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
    let db_name = &self.name;
    let storage_rc = get_storage_with_fallback(db_name);
    if let Some(storage) = storage_rc {
        // `None` from the macro means the storage borrow failed.
        let is_leader = with_storage_async!(storage, "get_leader_info", |s| s.is_leader())
            .ok_or_else(|| {
                JsValue::from_str(&format!(
                    "Failed to access storage for database: {}",
                    db_name
                ))
            })?;
        let leader_id_str = if is_leader {
            format!("leader_{}", db_name)
        } else {
            "unknown".to_string()
        };
        let obj = js_sys::Object::new();
        js_sys::Reflect::set(&obj, &"isLeader".into(), &JsValue::from_bool(is_leader))?;
        js_sys::Reflect::set(&obj, &"leaderId".into(), &JsValue::from_str(&leader_id_str))?;
        js_sys::Reflect::set(
            &obj,
            &"leaseExpiry".into(),
            &JsValue::from_f64(js_sys::Date::now()),
        )?;
        Ok(obj.into())
    } else {
        Err(JsValue::from_str(&format!(
            "No storage found for database: {}",
            db_name
        )))
    }
}
#[wasm_bindgen(js_name = "queueWrite")]
/// Queue a write using the default response timeout (see
/// `queue_write_with_timeout`).
pub async fn queue_write(&mut self, sql: String) -> Result<(), JsValue> {
    // Named constant instead of a magic number for the default wait.
    const DEFAULT_WRITE_TIMEOUT_MS: u32 = 5000;
    self.queue_write_with_timeout(sql, DEFAULT_WRITE_TIMEOUT_MS)
        .await
}
#[wasm_bindgen(js_name = "queueWriteWithTimeout")]
/// Execute a write from a possibly-non-leader tab.
///
/// If this tab is leader the SQL runs directly; otherwise the request is
/// broadcast to the leader and this method polls (100 ms intervals) for the
/// matching Success/Error response, failing after `timeout_ms`.
///
/// NOTE(review): the response listener closure is `forget()`-ed and never
/// unregistered, so every non-leader call leaks one listener — verify
/// whether `register_write_queue_listener` deduplicates or an unregister
/// API should be added.
pub async fn queue_write_with_timeout(
    &mut self,
    sql: String,
    timeout_ms: u32,
) -> Result<(), JsValue> {
    use crate::storage::write_queue::{WriteQueueMessage, WriteResponse, send_write_request};
    use std::cell::RefCell;
    use std::rc::Rc;
    log::debug!("Queuing write: {}", sql);
    use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
    // Missing storage or a failed borrow both degrade to "not leader".
    let is_leader = {
        let storage_rc = get_storage_with_fallback(&self.name);
        if let Some(storage) = storage_rc {
            with_storage_async!(storage, "queue_write_is_leader", |s| s.is_leader())
                .unwrap_or(false)
        } else {
            false
        }
    };
    // Fast path: the leader executes locally with no broadcast round-trip.
    if is_leader {
        log::debug!("We are leader, executing directly");
        return self
            .execute_internal(&sql)
            .await
            .map(|_| ())
            .map_err(|e| JsValue::from_str(&format!("Execute failed: {}", e)));
    }
    let request_id = send_write_request(&self.name, &sql)
        .map_err(|e| JsValue::from_str(&format!("Failed to send write request: {}", e)))?;
    log::debug!("Write request sent with ID: {}", request_id);
    // Shared flags the listener closure sets and the poll loop reads
    // (single-threaded event loop, so Rc<RefCell<_>> suffices).
    let response_received = Rc::new(RefCell::new(false));
    let response_error = Rc::new(RefCell::new(None::<String>));
    let response_received_clone = response_received.clone();
    let response_error_clone = response_error.clone();
    let request_id_clone = request_id.clone();
    let callback = Closure::wrap(Box::new(move |msg: JsValue| {
        if let Ok(json_str) = js_sys::JSON::stringify(&msg) {
            if let Some(json_str) = json_str.as_string() {
                if let Ok(message) = serde_json::from_str::<WriteQueueMessage>(&json_str) {
                    if let WriteQueueMessage::WriteResponse(response) = message {
                        // Only react to the response matching our request id.
                        match response {
                            WriteResponse::Success { request_id, .. } => {
                                if request_id == request_id_clone {
                                    *response_received_clone.borrow_mut() = true;
                                    log::debug!("Write response received: Success");
                                }
                            }
                            WriteResponse::Error {
                                request_id,
                                error_message,
                            } => {
                                if request_id == request_id_clone {
                                    *response_received_clone.borrow_mut() = true;
                                    *response_error_clone.borrow_mut() = Some(error_message);
                                    log::debug!("Write response received: Error");
                                }
                            }
                        }
                    }
                }
            }
        }
    }) as Box<dyn FnMut(JsValue)>);
    use crate::storage::write_queue::register_write_queue_listener;
    let callback_fn = callback.as_ref().unchecked_ref();
    register_write_queue_listener(&self.name, callback_fn)
        .map_err(|e| JsValue::from_str(&format!("Failed to register listener: {}", e)))?;
    // Keep the closure alive after this function returns (leaks; see NOTE).
    callback.forget();
    let start_time = js_sys::Date::now();
    let timeout_f64 = timeout_ms as f64;
    // Poll for the response flag, yielding 100 ms per iteration.
    loop {
        if *response_received.borrow() {
            if let Some(error_msg) = response_error.borrow().as_ref() {
                return Err(JsValue::from_str(&format!("Write failed: {}", error_msg)));
            }
            log::info!("Write completed successfully");
            return Ok(());
        }
        let elapsed = js_sys::Date::now() - start_time;
        if elapsed > timeout_f64 {
            return Err(JsValue::from_str("Write request timed out"));
        }
        wasm_bindgen_futures::JsFuture::from(js_sys::Promise::new(&mut |resolve, _reject| {
            if let Some(window) = web_sys::window() {
                let _ =
                    window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 100);
            } else {
                log::error!("Window unavailable in timeout handler");
            }
        }))
        .await
        .ok();
    }
}
/// JS-facing leadership probe: resolves the storage backend for this
/// database name and reports whether this tab currently holds leadership.
///
/// Returns a JS boolean on success, or a JS error string when no storage
/// is registered for the database or the storage cannot be accessed.
#[wasm_bindgen(js_name = "isLeader")]
pub async fn is_leader_wasm(&self) -> Result<JsValue, JsValue> {
    use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
    let db_name = &self.name;
    log::debug!("isLeader() called for database: {} (self.name)", db_name);
    match get_storage_with_fallback(db_name) {
        Some(storage) => {
            log::debug!("Found storage for {}, calling is_leader()", db_name);
            // with_storage_async! yields None when the storage cannot be
            // borrowed/accessed; surface that as a JS error.
            let leader = with_storage_async!(storage, "is_leader_wasm", |s| s.is_leader())
                .ok_or_else(|| {
                    JsValue::from_str(&format!(
                        "Failed to access storage for database: {}",
                        db_name
                    ))
                })?;
            log::debug!("is_leader() = {} for {}", leader, db_name);
            Ok(JsValue::from_bool(leader))
        }
        None => {
            log::error!("ERROR: No storage found for database: {}", db_name);
            Err(JsValue::from_str(&format!(
                "No storage found for database: {}",
                db_name
            )))
        }
    }
}
/// Rust-facing convenience wrapper over [`is_leader_wasm`]: unwraps the JS
/// boolean into a plain `bool`, treating any non-boolean result as `false`.
pub async fn is_leader(&self) -> Result<bool, JsValue> {
    let value = self.is_leader_wasm().await?;
    Ok(matches!(value.as_bool(), Some(true)))
}
/// Registers a JS callback that fires when this database's data changes.
///
/// Stores a clone of the callback on `self` and subscribes it to the
/// broadcast change-notification channel for this database name.
#[wasm_bindgen(js_name = "onDataChange")]
pub fn on_data_change_wasm(&mut self, callback: &js_sys::Function) -> Result<(), JsValue> {
    use crate::storage::broadcast_notifications::register_change_listener;
    log::debug!("Registering onDataChange callback for {}", self.name);
    self.on_data_change_callback = Some(callback.clone());
    register_change_listener(&self.name, callback).map_err(|e| {
        JsValue::from_str(&format!("Failed to register change listener: {}", e))
    })?;
    log::debug!("onDataChange callback registered for {}", self.name);
    Ok(())
}
/// Turns optimistic-update tracking on or off for this database handle.
#[wasm_bindgen(js_name = "enableOptimisticUpdates")]
pub async fn enable_optimistic_updates(&mut self, enabled: bool) -> Result<(), JsValue> {
    let mut manager = self.optimistic_updates_manager.borrow_mut();
    manager.set_enabled(enabled);
    let state = if enabled { "enabled" } else { "disabled" };
    log::debug!("Optimistic updates {}", state);
    Ok(())
}
/// Reports whether optimistic-update tracking is currently enabled.
#[wasm_bindgen(js_name = "isOptimisticMode")]
pub async fn is_optimistic_mode(&self) -> bool {
    let manager = self.optimistic_updates_manager.borrow();
    manager.is_enabled()
}
/// Records `sql` as a pending optimistic write and returns the tracking ID
/// assigned by the optimistic-updates manager.
#[wasm_bindgen(js_name = "trackOptimisticWrite")]
pub async fn track_optimistic_write(&mut self, sql: String) -> Result<String, JsValue> {
    let mut manager = self.optimistic_updates_manager.borrow_mut();
    let write_id = manager.track_write(sql);
    Ok(write_id)
}
/// Returns how many optimistic writes are still pending confirmation.
#[wasm_bindgen(js_name = "getPendingWritesCount")]
pub async fn get_pending_writes_count(&self) -> usize {
    let manager = self.optimistic_updates_manager.borrow();
    manager.get_pending_count()
}
/// Discards every tracked optimistic write.
#[wasm_bindgen(js_name = "clearOptimisticWrites")]
pub async fn clear_optimistic_writes(&mut self) -> Result<(), JsValue> {
    let mut manager = self.optimistic_updates_manager.borrow_mut();
    manager.clear_all();
    Ok(())
}
/// Enables or disables collection of cross-tab coordination metrics.
#[wasm_bindgen(js_name = "enableCoordinationMetrics")]
pub async fn enable_coordination_metrics(&mut self, enabled: bool) -> Result<(), JsValue> {
    let mut metrics = self.coordination_metrics_manager.borrow_mut();
    metrics.set_enabled(enabled);
    Ok(())
}
/// Reports whether coordination-metrics collection is currently enabled.
#[wasm_bindgen(js_name = "isCoordinationMetricsEnabled")]
pub async fn is_coordination_metrics_enabled(&self) -> bool {
    let metrics = self.coordination_metrics_manager.borrow();
    metrics.is_enabled()
}
/// Records a leadership transition (`became_leader` = gained vs. lost)
/// in the coordination metrics.
#[wasm_bindgen(js_name = "recordLeadershipChange")]
pub async fn record_leadership_change(&mut self, became_leader: bool) -> Result<(), JsValue> {
    let mut metrics = self.coordination_metrics_manager.borrow_mut();
    metrics.record_leadership_change(became_leader);
    Ok(())
}
/// Records one observed notification latency sample, in milliseconds.
#[wasm_bindgen(js_name = "recordNotificationLatency")]
pub async fn record_notification_latency(&mut self, latency_ms: f64) -> Result<(), JsValue> {
    let mut metrics = self.coordination_metrics_manager.borrow_mut();
    metrics.record_notification_latency(latency_ms);
    Ok(())
}
/// Increments the write-conflict counter in the coordination metrics.
#[wasm_bindgen(js_name = "recordWriteConflict")]
pub async fn record_write_conflict(&mut self) -> Result<(), JsValue> {
    let mut metrics = self.coordination_metrics_manager.borrow_mut();
    metrics.record_write_conflict();
    Ok(())
}
/// Increments the follower-refresh counter in the coordination metrics.
#[wasm_bindgen(js_name = "recordFollowerRefresh")]
pub async fn record_follower_refresh(&mut self) -> Result<(), JsValue> {
    let mut metrics = self.coordination_metrics_manager.borrow_mut();
    metrics.record_follower_refresh();
    Ok(())
}
/// Serializes the current coordination metrics to a JSON string, mapping
/// any serialization error into a JS error value.
#[wasm_bindgen(js_name = "getCoordinationMetrics")]
pub async fn get_coordination_metrics(&self) -> Result<String, JsValue> {
    let metrics = self.coordination_metrics_manager.borrow();
    match metrics.get_metrics_json() {
        Ok(json) => Ok(json),
        Err(e) => Err(JsValue::from_str(&e)),
    }
}
/// Resets all coordination metrics to their initial state.
#[wasm_bindgen(js_name = "resetCoordinationMetrics")]
pub async fn reset_coordination_metrics(&mut self) -> Result<(), JsValue> {
    let mut metrics = self.coordination_metrics_manager.borrow_mut();
    metrics.reset();
    Ok(())
}
}
// JS-visible wrapper around a native `ColumnValue`, constructed through the
// factory methods on the `impl` below (createNull, createInteger, ...).
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub struct WasmColumnValue {
    // The wrapped SQL value. Marked dead_code because it is only read by
    // Rust-side conversion code, not through any wasm-exported accessor
    // visible in this file.
    #[allow(dead_code)]
    inner: ColumnValue,
}
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
impl WasmColumnValue {
    /// Creates a SQL NULL value.
    #[wasm_bindgen(js_name = "createNull")]
    pub fn create_null() -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Null,
        }
    }
    /// Creates a 64-bit integer value.
    #[wasm_bindgen(js_name = "createInteger")]
    pub fn create_integer(value: i64) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Integer(value),
        }
    }
    /// Creates a floating-point (REAL) value.
    #[wasm_bindgen(js_name = "createReal")]
    pub fn create_real(value: f64) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Real(value),
        }
    }
    /// Creates a TEXT value.
    #[wasm_bindgen(js_name = "createText")]
    pub fn create_text(value: String) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Text(value),
        }
    }
    /// Creates a BLOB value from raw bytes (copied into an owned buffer).
    #[wasm_bindgen(js_name = "createBlob")]
    pub fn create_blob(value: &[u8]) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Blob(value.to_vec()),
        }
    }
    /// Creates a BigInt value from its decimal string representation
    /// (string-typed to survive the JS/wasm boundary without precision loss).
    #[wasm_bindgen(js_name = "createBigInt")]
    pub fn create_bigint(value: &str) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::BigInt(value.to_string()),
        }
    }
    /// Creates a Date value from a JS millisecond timestamp.
    /// The f64 -> i64 cast saturates at the i64 range; fractional
    /// milliseconds are truncated.
    #[wasm_bindgen(js_name = "createDate")]
    pub fn create_date(timestamp: f64) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Date(timestamp as i64),
        }
    }
    /// Best-effort conversion from an arbitrary JS value:
    /// - null/undefined -> Null
    /// - string -> Integer when it parses as i64, otherwise Text
    /// - number -> Integer when integral and in i64 range, otherwise Real
    /// - object -> Date when it yields a finite timestamp, otherwise its
    ///   Debug rendering as Text
    /// - anything else -> Null
    #[wasm_bindgen(js_name = "fromJsValue")]
    pub fn from_js_value(value: &JsValue) -> WasmColumnValue {
        if value.is_null() || value.is_undefined() {
            WasmColumnValue {
                inner: ColumnValue::Null,
            }
        } else if let Some(s) = value.as_string() {
            // Numeric-looking strings are promoted to integers so callers
            // can pass large IDs as strings without float precision loss.
            if let Ok(parsed) = s.parse::<i64>() {
                WasmColumnValue {
                    inner: ColumnValue::Integer(parsed),
                }
            } else {
                WasmColumnValue {
                    inner: ColumnValue::Text(s),
                }
            }
        } else if let Some(n) = value.as_f64() {
            if n.fract() == 0.0 && n >= i64::MIN as f64 && n <= i64::MAX as f64 {
                WasmColumnValue {
                    inner: ColumnValue::Integer(n as i64),
                }
            } else {
                WasmColumnValue {
                    inner: ColumnValue::Real(n),
                }
            }
        } else if value.is_object() {
            // Construct the Date wrapper once (the original built it twice:
            // once for the finiteness probe and again for the timestamp).
            let date = js_sys::Date::new(value);
            let time = date.get_time();
            if time.is_finite() {
                WasmColumnValue {
                    inner: ColumnValue::Date(time as i64),
                }
            } else {
                WasmColumnValue {
                    inner: ColumnValue::Text(format!("{:?}", value)),
                }
            }
        } else {
            WasmColumnValue {
                inner: ColumnValue::Null,
            }
        }
    }
    /// Alias for [`create_null`].
    pub fn null() -> WasmColumnValue {
        Self::create_null()
    }
    /// Alias for [`create_integer`], taking a JS number; the cast truncates
    /// the fractional part and saturates at the i64 range.
    pub fn integer(value: f64) -> WasmColumnValue {
        Self::create_integer(value as i64)
    }
    /// Alias for [`create_real`].
    pub fn real(value: f64) -> WasmColumnValue {
        Self::create_real(value)
    }
    /// Alias for [`create_text`].
    pub fn text(value: String) -> WasmColumnValue {
        Self::create_text(value)
    }
    /// Alias for [`create_blob`].
    pub fn blob(value: Vec<u8>) -> WasmColumnValue {
        Self::create_blob(&value)
    }
    /// Alias for [`create_bigint`].
    pub fn big_int(value: String) -> WasmColumnValue {
        Self::create_bigint(&value)
    }
    /// Alias for [`create_date`].
    pub fn date(timestamp_ms: f64) -> WasmColumnValue {
        Self::create_date(timestamp_ms)
    }
}