1#[cfg(target_arch = "wasm32")]
2use std::rc::Rc;
3#[cfg(target_arch = "wasm32")]
4use wasm_bindgen::prelude::*;
5
6#[cfg(all(
11 not(target_arch = "wasm32"),
12 any(
13 feature = "bundled-sqlite",
14 feature = "encryption",
15 feature = "encryption-commoncrypto",
16 feature = "encryption-ios"
17 )
18))]
19pub extern crate rusqlite;
20
21#[cfg(feature = "console_error_panic_hook")]
23pub use console_error_panic_hook::set_once as set_panic_hook;
24
/// Global allocator override: when the `wee_alloc` feature is enabled, every
/// heap allocation in the crate goes through `wee_alloc::WeeAlloc`.
#[cfg(feature = "wee_alloc")]
#[global_allocator]
static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT;
28
/// Installs `console_log` as the `log` backend when the WASM module starts.
///
/// Debug builds log at `Debug`, release builds at `Info`.
///
/// # Panics
/// Panics if `console_log::init_with_level` returns an error.
#[cfg(all(target_arch = "wasm32", feature = "console_log"))]
#[wasm_bindgen(start)]
pub fn init_logger() {
    let log_level = if cfg!(debug_assertions) {
        log::Level::Debug
    } else {
        log::Level::Info
    };

    console_log::init_with_level(log_level).expect("Failed to initialize console_log");

    log::info!("AbsurderSQL logging initialized at level: {:?}", log_level);
}
44
45#[inline]
49#[cfg(target_arch = "wasm32")]
/// Returns `name` with a `.db` suffix, appending one only when it is missing.
///
/// The check is case-sensitive: `"x.DB"` becomes `"x.DB.db"`.
fn normalize_db_name(name: &str) -> String {
    let mut normalized = String::from(name);
    if !normalized.ends_with(".db") {
        normalized.push_str(".db");
    }
    normalized
}
59
60mod cleanup;
62#[cfg(target_arch = "wasm32")]
63pub mod connection_pool;
64#[cfg(not(target_arch = "wasm32"))]
65pub mod database;
66pub mod storage;
67pub mod types;
68pub mod vfs;
69#[cfg(not(target_arch = "wasm32"))]
70pub use database::PreparedStatement;
71pub mod utils;
72
73#[cfg(feature = "telemetry")]
74pub mod telemetry;
75
76#[cfg(not(target_arch = "wasm32"))]
78pub use database::SqliteIndexedDB;
79
// Per-thread set of database names (as given in `DatabaseConfig::name`) whose
// open sequence is currently running. `Database::new` inserts the name before
// opening the SQLite connection and removes it once the `Database` value has
// been built; other callers seeing the name present wait and retry.
#[cfg(target_arch = "wasm32")]
thread_local! {
    static DB_OPEN_IN_PROGRESS: std::cell::RefCell<std::collections::HashSet<String>> =
        std::cell::RefCell::new(std::collections::HashSet::new());
}
86
/// On non-WASM targets `Database` is simply an alias for [`SqliteIndexedDB`];
/// the WASM build defines its own `Database` struct instead.
#[cfg(not(target_arch = "wasm32"))]
pub type Database = SqliteIndexedDB;
90
91pub use types::DatabaseConfig;
92pub use types::{ColumnValue, DatabaseError, QueryResult, Row, TransactionOptions};
93
94pub use vfs::indexeddb_vfs::IndexedDBVFS;
96
/// Runs an async expression against a dereferenced storage handle.
///
/// * `$storage` - expression that dereferences (`*`) to the storage object.
/// * `$operation` - accepted for call-site labelling only; the expansion never
///   uses it.
/// * `|$s| $body` - `$s` is bound to `&*$storage` and `$body` is awaited.
///
/// The expansion always evaluates to `Some(result)`; it never yields `None`,
/// so callers' `None` handling is currently unreachable.
#[cfg(target_arch = "wasm32")]
macro_rules! with_storage_async {
    ($storage:expr, $operation:expr, |$s:ident| $body:expr) => {{
        let $s = &*$storage;
        Some($body.await)
    }};
}
106
/// WASM-side database handle exported to JavaScript through `wasm_bindgen`.
///
/// All fields are hidden from the generated JS bindings (either private or
/// marked `skip`); JavaScript interacts only through the exported methods.
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub struct Database {
    // Shared connection state handed out by
    // `connection_pool::get_or_create_connection`; its `db` cell holds the raw
    // `sqlite3*` handle used by every query.
    #[wasm_bindgen(skip)]
    connection_state: Rc<crate::connection_pool::ConnectionState>,
    // Normalized database name (the constructors store the `.db`-suffixed form).
    #[allow(dead_code)]
    name: String,
    // Optional JavaScript callback; both constructors start with `None`.
    // NOTE(review): presumably invoked after data-changing statements — confirm
    // against the code that sets and calls it.
    #[wasm_bindgen(skip)]
    on_data_change_callback: Option<js_sys::Function>,
    // When true, `check_write_permission` skips the leader check entirely.
    // Both constructors start with `false`.
    #[wasm_bindgen(skip)]
    allow_non_leader_writes: bool,
    // Interior-mutable manager from `storage::optimistic_updates`.
    #[wasm_bindgen(skip)]
    optimistic_updates_manager:
        std::cell::RefCell<crate::storage::optimistic_updates::OptimisticUpdatesManager>,
    // Interior-mutable manager from `storage::coordination_metrics`.
    #[wasm_bindgen(skip)]
    coordination_metrics_manager:
        std::cell::RefCell<crate::storage::coordination_metrics::CoordinationMetricsManager>,
    // Telemetry counters/histograms (queries, errors, query duration) updated
    // by the execute paths. Only present with the `telemetry` feature.
    #[wasm_bindgen(skip)]
    #[cfg(feature = "telemetry")]
    metrics: Option<crate::telemetry::Metrics>,
    // Sink for finished spans; spans are only built while this is `Some`.
    // Both constructors start with `None`.
    #[wasm_bindgen(skip)]
    #[cfg(feature = "telemetry")]
    span_recorder: Option<crate::telemetry::SpanRecorder>,
    // Span context entered/exited around query execution and used as a baggage
    // source when building spans.
    #[wasm_bindgen(skip)]
    #[cfg(feature = "telemetry")]
    span_context: Option<crate::telemetry::SpanContext>,
    // Taken from `DatabaseConfig::max_export_size_bytes` by `new`;
    // `open_with_vfs` sets it to 2 GiB.
    // NOTE(review): presumably the upper bound enforced by export — confirm.
    #[wasm_bindgen(skip)]
    max_export_size_bytes: Option<u64>,
}
137
138#[cfg(target_arch = "wasm32")]
139impl Database {
140 fn db(&self) -> *mut sqlite_wasm_rs::sqlite3 {
143 let db_ptr = self.connection_state.db.get();
144 if db_ptr.is_null() {
145 panic!("Database connection is null for {}", self.name);
146 }
147 db_ptr
148 }
149
150 fn is_write_operation(sql: &str) -> bool {
152 let upper = sql.trim().to_uppercase();
153 upper.starts_with("INSERT")
154 || upper.starts_with("UPDATE")
155 || upper.starts_with("DELETE")
156 || upper.starts_with("REPLACE")
157 }
158
159 #[cfg(feature = "telemetry")]
163 pub fn metrics(&self) -> Option<&crate::telemetry::Metrics> {
164 self.metrics.as_ref()
165 }
166
167 async fn check_write_permission(&mut self, sql: &str) -> Result<(), DatabaseError> {
169 if !Self::is_write_operation(sql) {
170 return Ok(());
172 }
173
174 if self.allow_non_leader_writes {
176 log::info!("WRITE_ALLOWED: Non-leader writes enabled for {}", self.name);
177 return Ok(());
178 }
179
180 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
182
183 let db_name = &self.name;
184 let storage_rc = get_storage_with_fallback(db_name);
185
186 if let Some(storage) = storage_rc {
187 let is_leader = with_storage_async!(storage, "check_write_permission", |s| s
188 .is_leader())
189 .ok_or_else(|| {
190 DatabaseError::new("BORROW_CONFLICT", "Failed to check leader status")
191 })?;
192
193 if !is_leader {
194 log::error!("WRITE_DENIED: Instance is not leader for {}", db_name);
195 return Err(DatabaseError::new(
196 "WRITE_PERMISSION_DENIED",
197 "Only the leader tab can write to this database. Use db.isLeader() to check status or call db.allowNonLeaderWrites(true) for single-tab mode.",
198 ));
199 }
200
201 log::info!("WRITE_ALLOWED: Instance is leader for {}", db_name);
202 Ok(())
203 } else {
204 log::info!(
206 "WRITE_ALLOWED: No storage found for {} (single-instance mode)",
207 db_name
208 );
209 Ok(())
210 }
211 }
212
213 pub async fn new(config: DatabaseConfig) -> Result<Self, DatabaseError> {
214 use std::ffi::{CStr, CString};
215
216 log::info!("Database::new called for {}", config.name);
217
218 let normalized_name = normalize_db_name(&config.name);
221
222 let vfs_name = format!("vfs_{}", normalized_name.trim_end_matches(".db"));
224 let vfs_name_cstr = CString::new(vfs_name.as_str())
225 .map_err(|_| DatabaseError::new("INVALID_VFS_NAME", "Invalid VFS name"))?;
226 let vfs_exists = unsafe {
227 let existing_vfs = sqlite_wasm_rs::sqlite3_vfs_find(vfs_name_cstr.as_ptr());
228 !existing_vfs.is_null()
229 };
230
231 if !vfs_exists {
232 log::debug!("Creating IndexedDBVFS for: {}", normalized_name);
234 let vfs = crate::vfs::IndexedDBVFS::new(&normalized_name).await?;
235 log::debug!("Registering VFS as '{}'", vfs_name);
236 vfs.register(&vfs_name)?;
237 log::info!("VFS registered successfully");
238 } else {
239 log::info!("VFS '{}' already registered, reusing existing", vfs_name);
240 let _vfs = crate::vfs::IndexedDBVFS::new(&normalized_name).await?;
243 log::info!("BlockStorage ensured for {}", normalized_name);
244 }
245
246 #[cfg(target_arch = "wasm32")]
249 {
250 const MAX_OPEN_WAIT_MS: u32 = 1000; const OPEN_POLL_MS: u32 = 10;
252 let max_open_attempts = MAX_OPEN_WAIT_MS / OPEN_POLL_MS;
253
254 for attempt in 0..max_open_attempts {
255 let can_open = DB_OPEN_IN_PROGRESS.with(|opens| {
256 let mut set = opens.borrow_mut();
257 if set.contains(&config.name) {
258 false } else {
260 set.insert(config.name.clone());
261 true }
263 });
264
265 if can_open {
266 web_sys::console::log_1(
267 &format!("[DB] {} - ACQUIRED sqlite open lock", config.name).into(),
268 );
269 break;
270 } else {
271 web_sys::console::log_1(
272 &format!(
273 "[DB] {} - Waiting for sqlite open (attempt {})",
274 config.name, attempt
275 )
276 .into(),
277 );
278 use wasm_bindgen_futures::JsFuture;
279 let promise = js_sys::Promise::new(&mut |resolve, _| {
280 web_sys::window()
281 .unwrap()
282 .set_timeout_with_callback_and_timeout_and_arguments_0(
283 &resolve,
284 OPEN_POLL_MS as i32,
285 )
286 .unwrap();
287 });
288 JsFuture::from(promise).await.ok();
289 continue;
290 }
291 }
292 }
293
294 let (connection_state, db) = {
296 let vfs_name_str = vfs_name.clone(); let filename_copy = normalized_name.clone(); let pool_key = normalized_name.trim_end_matches(".db").to_string(); let state = crate::connection_pool::get_or_create_connection(&pool_key, || {
300 let mut db = std::ptr::null_mut();
301 let db_name = CString::new(normalized_name.clone())
302 .map_err(|_| "Invalid database name".to_string())?;
303 let vfs_cstr = CString::new(vfs_name_str.as_str())
304 .map_err(|_| "Invalid VFS name".to_string())?;
305
306 log::info!(
307 "Opening database: {} with VFS: {}",
308 filename_copy,
309 vfs_name_str
310 );
311
312 #[cfg(target_arch = "wasm32")]
313 web_sys::console::log_1(&format!("[OPEN] About to call sqlite3_open_v2...").into());
314
315 let ret = unsafe {
316 sqlite_wasm_rs::sqlite3_open_v2(
317 db_name.as_ptr(),
318 &mut db as *mut _,
319 sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
320 vfs_cstr.as_ptr(),
321 )
322 };
323
324 #[cfg(target_arch = "wasm32")]
325 web_sys::console::log_1(
326 &format!("[OPEN] sqlite3_open_v2 returned: {}", ret).into(),
327 );
328
329 log::info!(
330 "sqlite3_open_v2 returned: {} for database: {}",
331 ret,
332 filename_copy
333 );
334
335 if ret != sqlite_wasm_rs::SQLITE_OK {
336 let err_msg = unsafe {
337 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
338 if !msg_ptr.is_null() {
339 CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
340 } else {
341 "Unknown error".to_string()
342 }
343 };
344
345 #[cfg(target_arch = "wasm32")]
346 web_sys::console::log_1(
347 &format!(
348 "[OPEN] ERROR - sqlite3_open_v2 FAILED: ret={}, err={}",
349 ret, err_msg
350 )
351 .into(),
352 );
353
354 return Err(format!(
355 "Failed to open database with IndexedDB VFS: {}",
356 err_msg
357 ));
358 }
359
360 log::info!("Database opened successfully with IndexedDB VFS");
361 Ok(db)
362 })
363 .map_err(|e| DatabaseError::new("CONNECTION_POOL_ERROR", &e))?;
364 let db_ptr = state.db.get();
365 (state, db_ptr)
366 };
367
368 let exec_sql = |db: *mut sqlite_wasm_rs::sqlite3, sql: &str| -> Result<(), DatabaseError> {
370 let c_sql = CString::new(sql)
371 .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL statement"))?;
372
373 let ret = unsafe {
374 sqlite_wasm_rs::sqlite3_exec(
375 db,
376 c_sql.as_ptr(),
377 None,
378 std::ptr::null_mut(),
379 std::ptr::null_mut(),
380 )
381 };
382
383 if ret != sqlite_wasm_rs::SQLITE_OK {
384 let err_msg = unsafe {
385 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
386 if !msg_ptr.is_null() {
387 CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
388 } else {
389 "Unknown error".to_string()
390 }
391 };
392 log::warn!("Failed to execute SQL '{}': {}", sql, err_msg);
393 return Err(DatabaseError::new(
394 "SQLITE_ERROR",
395 &format!("Failed to execute: {}", err_msg),
396 ));
397 }
398 Ok(())
399 };
400
401 log::debug!("Setting busy_timeout to 10000ms for concurrent access handling");
405 exec_sql(db, "PRAGMA busy_timeout = 10000")?;
406
407 if let Some(page_size) = config.page_size {
409 log::debug!("Setting page_size to {}", page_size);
410 exec_sql(db, &format!("PRAGMA page_size = {}", page_size))?;
411 }
412
413 if let Some(cache_size) = config.cache_size {
415 log::debug!("Setting cache_size to {}", cache_size);
416 exec_sql(db, &format!("PRAGMA cache_size = {}", cache_size))?;
417 }
418
419 if let Some(ref journal_mode) = config.journal_mode {
422 log::debug!("Setting journal_mode to {}", journal_mode);
423
424 let pragma_sql = format!("PRAGMA journal_mode = {}", journal_mode);
425 let c_sql = CString::new(pragma_sql.as_str())
426 .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL statement"))?;
427
428 let mut stmt: *mut sqlite_wasm_rs::sqlite3_stmt = std::ptr::null_mut();
429 let ret = unsafe {
430 sqlite_wasm_rs::sqlite3_prepare_v2(
431 db,
432 c_sql.as_ptr(),
433 -1,
434 &mut stmt as *mut _,
435 std::ptr::null_mut(),
436 )
437 };
438
439 if ret == sqlite_wasm_rs::SQLITE_OK && !stmt.is_null() {
440 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
441 if step_ret == sqlite_wasm_rs::SQLITE_ROW {
442 let result_ptr = unsafe { sqlite_wasm_rs::sqlite3_column_text(stmt, 0) };
443 if !result_ptr.is_null() {
444 let result_mode = unsafe {
445 std::ffi::CStr::from_ptr(result_ptr as *const i8)
446 .to_string_lossy()
447 .to_uppercase()
448 };
449
450 if result_mode != journal_mode.to_uppercase() {
451 log::warn!(
452 "journal_mode {} requested but SQLite set {}",
453 journal_mode,
454 result_mode
455 );
456 } else {
457 log::info!("journal_mode successfully set to {}", result_mode);
458 }
459 }
460 }
461 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
462 } else {
463 log::warn!("Failed to prepare journal_mode PRAGMA");
464 }
465 }
466
467 if let Some(auto_vacuum) = config.auto_vacuum {
469 let vacuum_mode = if auto_vacuum { 1 } else { 0 }; log::debug!("Setting auto_vacuum to {}", vacuum_mode);
471 exec_sql(db, &format!("PRAGMA auto_vacuum = {}", vacuum_mode))?;
472 }
473
474 log::info!("Database configuration applied successfully");
475
476 #[cfg(feature = "telemetry")]
478 let metrics = crate::telemetry::Metrics::new().map_err(|e| {
479 DatabaseError::new(
480 "METRICS_ERROR",
481 &format!("Failed to initialize metrics: {}", e),
482 )
483 })?;
484
485 let database = Database {
486 connection_state,
487 name: normalized_name.clone(), on_data_change_callback: None,
489 allow_non_leader_writes: false,
490 optimistic_updates_manager: std::cell::RefCell::new(
491 crate::storage::optimistic_updates::OptimisticUpdatesManager::new(),
492 ),
493 coordination_metrics_manager: std::cell::RefCell::new(
494 crate::storage::coordination_metrics::CoordinationMetricsManager::new(),
495 ),
496 #[cfg(feature = "telemetry")]
497 metrics: Some(metrics),
498 #[cfg(feature = "telemetry")]
499 span_recorder: None,
500 #[cfg(feature = "telemetry")]
501 span_context: Some(crate::telemetry::SpanContext::new()),
502 max_export_size_bytes: config.max_export_size_bytes,
503 };
504
505 #[cfg(target_arch = "wasm32")]
508 {
509 DB_OPEN_IN_PROGRESS.with(|opens| {
510 opens.borrow_mut().remove(&config.name);
511 });
512 web_sys::console::log_1(
513 &format!("[DB] {} - RELEASED sqlite open lock", config.name).into(),
514 );
515 }
516
517 Ok(database)
518 }
519
520 pub async fn open_with_vfs(filename: &str, vfs_name: &str) -> Result<Self, DatabaseError> {
522 use std::ffi::CString;
523
524 log::info!("Opening database {} with VFS {}", filename, vfs_name);
525
526 let normalized_name = normalize_db_name(filename);
528 let pool_key = normalized_name.trim_end_matches(".db").to_string();
529
530 let connection_state = crate::connection_pool::get_or_create_connection(&pool_key, || {
532 let mut db: *mut sqlite_wasm_rs::sqlite3 = std::ptr::null_mut();
533 let db_name = CString::new(normalized_name.clone())
534 .map_err(|_| "Invalid database name".to_string())?;
535 let vfs_cstr = CString::new(vfs_name).map_err(|_| "Invalid VFS name".to_string())?;
536
537 let ret = unsafe {
538 sqlite_wasm_rs::sqlite3_open_v2(
539 db_name.as_ptr(),
540 &mut db as *mut _,
541 sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
542 vfs_cstr.as_ptr(),
543 )
544 };
545
546 if ret != sqlite_wasm_rs::SQLITE_OK {
547 let err_msg = if !db.is_null() {
548 unsafe {
549 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
550 if !msg_ptr.is_null() {
551 std::ffi::CStr::from_ptr(msg_ptr)
552 .to_string_lossy()
553 .into_owned()
554 } else {
555 "Unknown error".to_string()
556 }
557 }
558 } else {
559 "Failed to open database".to_string()
560 };
561 return Err(format!("SQLITE_ERROR: {}", err_msg));
562 }
563
564 Ok(db)
565 })
566 .map_err(|e| DatabaseError::new("OPEN_ERROR", &e))?;
567
568 log::info!(
569 "Successfully opened database {} with VFS {}",
570 normalized_name,
571 vfs_name
572 );
573
574 #[cfg(feature = "telemetry")]
576 let metrics = crate::telemetry::Metrics::new().map_err(|e| {
577 DatabaseError::new(
578 "METRICS_ERROR",
579 &format!("Failed to initialize metrics: {}", e),
580 )
581 })?;
582
583 Ok(Database {
584 connection_state,
585 name: normalized_name, on_data_change_callback: None,
587 allow_non_leader_writes: false,
588 optimistic_updates_manager: std::cell::RefCell::new(
589 crate::storage::optimistic_updates::OptimisticUpdatesManager::new(),
590 ),
591 coordination_metrics_manager: std::cell::RefCell::new(
592 crate::storage::coordination_metrics::CoordinationMetricsManager::new(),
593 ),
594 #[cfg(feature = "telemetry")]
595 metrics: Some(metrics),
596 #[cfg(feature = "telemetry")]
597 span_recorder: None,
598 #[cfg(feature = "telemetry")]
599 span_context: Some(crate::telemetry::SpanContext::new()),
600 max_export_size_bytes: Some(2 * 1024 * 1024 * 1024), })
602 }
603
604 pub async fn execute_internal(&mut self, sql: &str) -> Result<QueryResult, DatabaseError> {
605 use std::ffi::{CStr, CString};
606 let start_time = js_sys::Date::now();
607
608 #[cfg(feature = "telemetry")]
610 let span = if self.span_recorder.is_some() {
611 let query_type = sql
612 .trim()
613 .split_whitespace()
614 .next()
615 .unwrap_or("UNKNOWN")
616 .to_uppercase();
617 let mut builder = crate::telemetry::SpanBuilder::new("execute_query".to_string())
618 .with_attribute("query_type", query_type.clone())
619 .with_attribute("sql", sql.to_string());
620
621 if let Some(ref context) = self.span_context {
623 builder = builder.with_baggage_from_context(context);
624 }
625
626 let span = builder.build();
627
628 if let Some(ref context) = self.span_context {
630 context.enter_span(span.span_id.clone());
631 }
632
633 Some(span)
634 } else {
635 None
636 };
637
638 #[cfg(feature = "telemetry")]
640 #[cfg(feature = "telemetry")]
641 if let Some(metrics) = &self.metrics {
642 metrics.queries_total().inc();
643 }
644
645 if self.db().is_null() {
647 return Err(DatabaseError::new(
648 "NULL_CONNECTION",
649 "Database connection is null",
650 ));
651 }
652
653 let sql_cstr = CString::new(sql)
654 .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL string"))?;
655
656 if sql.trim().to_uppercase().starts_with("SELECT") {
657 let mut stmt = std::ptr::null_mut();
658 let ret = unsafe {
659 sqlite_wasm_rs::sqlite3_prepare_v2(
660 self.db(),
661 sql_cstr.as_ptr(),
662 -1,
663 &mut stmt,
664 std::ptr::null_mut(),
665 )
666 };
667
668 if ret != sqlite_wasm_rs::SQLITE_OK {
669 let err_msg = unsafe {
671 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
672 if !msg_ptr.is_null() {
673 CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
674 } else {
675 format!("Unknown error (code: {})", ret)
676 }
677 };
678
679 #[cfg(feature = "telemetry")]
681 #[cfg(feature = "telemetry")]
682 if let Some(metrics) = &self.metrics {
683 metrics.errors_total().inc();
684 }
685
686 #[cfg(feature = "telemetry")]
688 if let Some(mut s) = span {
689 s.status = crate::telemetry::SpanStatus::Error(format!(
690 "Failed to prepare: {}",
691 err_msg
692 ));
693 s.end_time_ms = Some(js_sys::Date::now());
694 if let Some(recorder) = &self.span_recorder {
695 recorder.record_span(s);
696 }
697
698 if let Some(ref context) = self.span_context {
700 context.exit_span();
701 }
702 }
703
704 return Err(DatabaseError::new(
705 "SQLITE_ERROR",
706 &format!("Failed to prepare statement: {}", err_msg),
707 )
708 .with_sql(sql));
709 }
710
711 let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
712 let mut columns = Vec::new();
713 let mut rows = Vec::new();
714
715 for i in 0..column_count {
717 let col_name = unsafe {
718 let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
719 if name_ptr.is_null() {
720 format!("col_{}", i)
721 } else {
722 std::ffi::CStr::from_ptr(name_ptr)
723 .to_string_lossy()
724 .into_owned()
725 }
726 };
727 columns.push(col_name);
728 }
729
730 loop {
732 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
733 if step_ret == sqlite_wasm_rs::SQLITE_ROW {
734 let mut values = Vec::new();
735 for i in 0..column_count {
736 let value = unsafe {
737 let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
738 match col_type {
739 sqlite_wasm_rs::SQLITE_NULL => ColumnValue::Null,
740 sqlite_wasm_rs::SQLITE_INTEGER => {
741 let val = sqlite_wasm_rs::sqlite3_column_int64(stmt, i);
742 ColumnValue::Integer(val)
743 }
744 sqlite_wasm_rs::SQLITE_FLOAT => {
745 let val = sqlite_wasm_rs::sqlite3_column_double(stmt, i);
746 ColumnValue::Real(val)
747 }
748 sqlite_wasm_rs::SQLITE_TEXT => {
749 let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
750 if text_ptr.is_null() {
751 ColumnValue::Null
752 } else {
753 let text = std::ffi::CStr::from_ptr(text_ptr as *const i8)
754 .to_string_lossy()
755 .into_owned();
756 ColumnValue::Text(text)
757 }
758 }
759 sqlite_wasm_rs::SQLITE_BLOB => {
760 let blob_ptr = sqlite_wasm_rs::sqlite3_column_blob(stmt, i);
761 let blob_size = sqlite_wasm_rs::sqlite3_column_bytes(stmt, i);
762 if blob_ptr.is_null() || blob_size == 0 {
763 ColumnValue::Blob(vec![])
764 } else {
765 let blob_slice = std::slice::from_raw_parts(
766 blob_ptr as *const u8,
767 blob_size as usize,
768 );
769 ColumnValue::Blob(blob_slice.to_vec())
770 }
771 }
772 _ => ColumnValue::Null,
773 }
774 };
775 values.push(value);
776 }
777 rows.push(Row { values });
778 } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
779 break;
780 } else {
781 let err_msg = unsafe {
783 let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
784 if !err_ptr.is_null() {
785 std::ffi::CStr::from_ptr(err_ptr)
786 .to_string_lossy()
787 .to_string()
788 } else {
789 "Unknown SQLite error".to_string()
790 }
791 };
792 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
793 #[cfg(feature = "telemetry")]
795 if let Some(metrics) = &self.metrics {
796 metrics.errors_total().inc();
797 }
798 return Err(DatabaseError::new(
799 "SQLITE_ERROR",
800 &format!("Error executing SELECT statement: {}", err_msg),
801 )
802 .with_sql(sql));
803 }
804 }
805
806 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
807 let execution_time_ms = js_sys::Date::now() - start_time;
808
809 #[cfg(feature = "telemetry")]
811 if let Some(metrics) = &self.metrics {
812 metrics.query_duration().observe(execution_time_ms);
813 }
814
815 Ok(QueryResult {
816 columns,
817 rows,
818 affected_rows: 0,
819 last_insert_id: None,
820 execution_time_ms,
821 })
822 } else {
823 let mut stmt: *mut sqlite_wasm_rs::sqlite3_stmt = std::ptr::null_mut();
825 let ret = unsafe {
826 sqlite_wasm_rs::sqlite3_prepare_v2(
827 self.db(),
828 sql_cstr.as_ptr(),
829 -1,
830 &mut stmt,
831 std::ptr::null_mut(),
832 )
833 };
834
835 if ret != sqlite_wasm_rs::SQLITE_OK {
836 let err_msg = unsafe {
838 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
839 if !msg_ptr.is_null() {
840 CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
841 } else {
842 format!("Unknown error (code: {})", ret)
843 }
844 };
845
846 #[cfg(feature = "telemetry")]
848 if let Some(metrics) = &self.metrics {
849 metrics.errors_total().inc();
850 }
851 return Err(DatabaseError::new(
852 "SQLITE_ERROR",
853 &format!("Failed to prepare statement: {}", err_msg),
854 )
855 .with_sql(sql));
856 }
857
858 let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
860 let mut columns = Vec::new();
861 let mut rows = Vec::new();
862
863 if column_count > 0 {
864 for i in 0..column_count {
866 let col_name = unsafe {
867 let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
868 if name_ptr.is_null() {
869 format!("column_{}", i)
870 } else {
871 std::ffi::CStr::from_ptr(name_ptr)
872 .to_string_lossy()
873 .into_owned()
874 }
875 };
876 columns.push(col_name);
877 }
878
879 loop {
881 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
882 if step_ret == sqlite_wasm_rs::SQLITE_ROW {
883 let mut values = Vec::new();
884 for i in 0..column_count {
885 let value = unsafe {
886 let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
887 match col_type {
888 sqlite_wasm_rs::SQLITE_TEXT => {
889 let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
890 if text_ptr.is_null() {
891 ColumnValue::Null
892 } else {
893 let text =
894 std::ffi::CStr::from_ptr(text_ptr as *const i8)
895 .to_string_lossy()
896 .into_owned();
897 ColumnValue::Text(text)
898 }
899 }
900 sqlite_wasm_rs::SQLITE_INTEGER => ColumnValue::Integer(
901 sqlite_wasm_rs::sqlite3_column_int64(stmt, i),
902 ),
903 _ => ColumnValue::Null,
904 }
905 };
906 values.push(value);
907 }
908 rows.push(Row { values });
909 } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
910 break;
911 } else {
912 let err_msg = unsafe {
914 let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
915 if !err_ptr.is_null() {
916 std::ffi::CStr::from_ptr(err_ptr)
917 .to_string_lossy()
918 .to_string()
919 } else {
920 "Unknown SQLite error".to_string()
921 }
922 };
923 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
924 #[cfg(feature = "telemetry")]
926 if let Some(metrics) = &self.metrics {
927 metrics.errors_total().inc();
928 }
929 return Err(DatabaseError::new(
930 "SQLITE_ERROR",
931 &format!("Failed to execute statement: {}", err_msg),
932 )
933 .with_sql(sql));
934 }
935 }
936 } else {
937 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
939 if step_ret != sqlite_wasm_rs::SQLITE_DONE {
940 let err_msg = unsafe {
942 let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
943 if !err_ptr.is_null() {
944 std::ffi::CStr::from_ptr(err_ptr)
945 .to_string_lossy()
946 .to_string()
947 } else {
948 "Unknown SQLite error".to_string()
949 }
950 };
951 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
952 #[cfg(feature = "telemetry")]
954 if let Some(metrics) = &self.metrics {
955 metrics.errors_total().inc();
956 }
957 return Err(DatabaseError::new(
958 "SQLITE_ERROR",
959 &format!("Failed to execute statement: {}", err_msg),
960 )
961 .with_sql(sql));
962 }
963 }
964
965 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
967
968 let affected_rows = unsafe { sqlite_wasm_rs::sqlite3_changes(self.db()) } as u32;
969 let last_insert_id = if sql.trim().to_uppercase().starts_with("INSERT") {
970 Some(unsafe { sqlite_wasm_rs::sqlite3_last_insert_rowid(self.db()) })
971 } else {
972 None
973 };
974
975 let execution_time_ms = js_sys::Date::now() - start_time;
976
977 #[cfg(feature = "telemetry")]
979 if let Some(metrics) = &self.metrics {
980 metrics.query_duration().observe(execution_time_ms);
981 }
982
983 #[cfg(feature = "telemetry")]
985 if let Some(mut s) = span {
986 s.status = crate::telemetry::SpanStatus::Ok;
987 s.end_time_ms = Some(js_sys::Date::now());
988 s.attributes
989 .insert("duration_ms".to_string(), execution_time_ms.to_string());
990 s.attributes
991 .insert("affected_rows".to_string(), affected_rows.to_string());
992 s.attributes
993 .insert("row_count".to_string(), rows.len().to_string());
994 if let Some(recorder) = &self.span_recorder {
995 recorder.record_span(s);
996 }
997
998 if let Some(ref context) = self.span_context {
1000 context.exit_span();
1001 }
1002 }
1003
1004 Ok(QueryResult {
1005 columns,
1006 rows,
1007 affected_rows,
1008 last_insert_id,
1009 execution_time_ms,
1010 })
1011 }
1012 }
1013
1014 pub async fn execute_with_params_internal(
1015 &mut self,
1016 sql: &str,
1017 params: &[ColumnValue],
1018 ) -> Result<QueryResult, DatabaseError> {
1019 use std::ffi::{CStr, CString};
1020 let start_time = js_sys::Date::now();
1021
1022 #[cfg(feature = "telemetry")]
1024 let span = if self.span_recorder.is_some() {
1025 let query_type = sql
1026 .trim()
1027 .split_whitespace()
1028 .next()
1029 .unwrap_or("UNKNOWN")
1030 .to_uppercase();
1031 let span = crate::telemetry::SpanBuilder::new("execute_query".to_string())
1032 .with_attribute("query_type", query_type.clone())
1033 .with_attribute("sql", sql.to_string())
1034 .build();
1035 Some(span)
1036 } else {
1037 None
1038 };
1039
1040 #[cfg(feature = "telemetry")]
1042 self.ensure_metrics_propagated();
1043
1044 #[cfg(feature = "telemetry")]
1046 if let Some(metrics) = &self.metrics {
1047 metrics.queries_total().inc();
1048 }
1049
1050 let sql_cstr = CString::new(sql)
1051 .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL string"))?;
1052
1053 let mut stmt = std::ptr::null_mut();
1054 let ret = unsafe {
1055 sqlite_wasm_rs::sqlite3_prepare_v2(
1056 self.db(),
1057 sql_cstr.as_ptr(),
1058 -1,
1059 &mut stmt,
1060 std::ptr::null_mut(),
1061 )
1062 };
1063
1064 if ret != sqlite_wasm_rs::SQLITE_OK {
1065 let err_msg = unsafe {
1067 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
1068 if !msg_ptr.is_null() {
1069 CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
1070 } else {
1071 format!("Unknown error (code: {})", ret)
1072 }
1073 };
1074
1075 #[cfg(feature = "telemetry")]
1077 if let Some(metrics) = &self.metrics {
1078 metrics.errors_total().inc();
1079 }
1080
1081 #[cfg(feature = "telemetry")]
1083 if let Some(mut s) = span {
1084 s.status =
1085 crate::telemetry::SpanStatus::Error(format!("Failed to prepare: {}", err_msg));
1086 s.end_time_ms = Some(js_sys::Date::now());
1087 if let Some(recorder) = &self.span_recorder {
1088 recorder.record_span(s);
1089 }
1090 }
1091
1092 return Err(DatabaseError::new(
1093 "SQLITE_ERROR",
1094 &format!("Failed to prepare statement: {}", err_msg),
1095 )
1096 .with_sql(sql));
1097 }
1098
1099 let mut text_cstrings = Vec::new(); for (i, param) in params.iter().enumerate() {
1102 let param_index = (i + 1) as i32;
1103 let bind_ret = unsafe {
1104 match param {
1105 ColumnValue::Null => sqlite_wasm_rs::sqlite3_bind_null(stmt, param_index),
1106 ColumnValue::Integer(val) => {
1107 sqlite_wasm_rs::sqlite3_bind_int64(stmt, param_index, *val)
1108 }
1109 ColumnValue::Real(val) => {
1110 sqlite_wasm_rs::sqlite3_bind_double(stmt, param_index, *val)
1111 }
1112 ColumnValue::Text(val) => {
1113 let sanitized = val.replace('\0', "");
1115 let text_cstr = CString::new(sanitized.as_str())
1117 .expect("CString::new should not fail after null byte removal");
1118 let result = sqlite_wasm_rs::sqlite3_bind_text(
1119 stmt,
1120 param_index,
1121 text_cstr.as_ptr(),
1122 sanitized.len() as i32,
1123 sqlite_wasm_rs::SQLITE_TRANSIENT(),
1124 );
1125 text_cstrings.push(text_cstr); result
1127 }
1128 ColumnValue::Blob(val) => sqlite_wasm_rs::sqlite3_bind_blob(
1129 stmt,
1130 param_index,
1131 val.as_ptr() as *const _,
1132 val.len() as i32,
1133 sqlite_wasm_rs::SQLITE_TRANSIENT(),
1134 ),
1135 _ => sqlite_wasm_rs::sqlite3_bind_null(stmt, param_index),
1136 }
1137 };
1138
1139 if bind_ret != sqlite_wasm_rs::SQLITE_OK {
1140 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1141 #[cfg(feature = "telemetry")]
1143 if let Some(metrics) = &self.metrics {
1144 metrics.errors_total().inc();
1145 }
1146 return Err(
1147 DatabaseError::new("SQLITE_ERROR", "Failed to bind parameter").with_sql(sql),
1148 );
1149 }
1150 }
1151
1152 if sql.trim().to_uppercase().starts_with("SELECT") {
1153 let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
1154 let mut columns = Vec::new();
1155 let mut rows = Vec::new();
1156
1157 for i in 0..column_count {
1159 let col_name = unsafe {
1160 let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
1161 if name_ptr.is_null() {
1162 format!("col_{}", i)
1163 } else {
1164 std::ffi::CStr::from_ptr(name_ptr)
1165 .to_string_lossy()
1166 .into_owned()
1167 }
1168 };
1169 columns.push(col_name);
1170 }
1171
1172 loop {
1174 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
1175 if step_ret == sqlite_wasm_rs::SQLITE_ROW {
1176 let mut values = Vec::new();
1177 for i in 0..column_count {
1178 let value = unsafe {
1179 let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
1180 match col_type {
1181 sqlite_wasm_rs::SQLITE_NULL => ColumnValue::Null,
1182 sqlite_wasm_rs::SQLITE_INTEGER => {
1183 let val = sqlite_wasm_rs::sqlite3_column_int64(stmt, i);
1184 ColumnValue::Integer(val)
1185 }
1186 sqlite_wasm_rs::SQLITE_FLOAT => {
1187 let val = sqlite_wasm_rs::sqlite3_column_double(stmt, i);
1188 ColumnValue::Real(val)
1189 }
1190 sqlite_wasm_rs::SQLITE_TEXT => {
1191 let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
1192 if text_ptr.is_null() {
1193 ColumnValue::Null
1194 } else {
1195 let text = std::ffi::CStr::from_ptr(text_ptr as *const i8)
1196 .to_string_lossy()
1197 .into_owned();
1198 ColumnValue::Text(text)
1199 }
1200 }
1201 sqlite_wasm_rs::SQLITE_BLOB => {
1202 let blob_ptr = sqlite_wasm_rs::sqlite3_column_blob(stmt, i);
1203 let blob_size = sqlite_wasm_rs::sqlite3_column_bytes(stmt, i);
1204 if blob_ptr.is_null() || blob_size == 0 {
1205 ColumnValue::Blob(vec![])
1206 } else {
1207 let blob_slice = std::slice::from_raw_parts(
1208 blob_ptr as *const u8,
1209 blob_size as usize,
1210 );
1211 ColumnValue::Blob(blob_slice.to_vec())
1212 }
1213 }
1214 _ => ColumnValue::Null,
1215 }
1216 };
1217 values.push(value);
1218 }
1219 rows.push(Row { values });
1220 } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
1221 break;
1222 } else {
1223 let err_msg = unsafe {
1225 let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
1226 if !err_ptr.is_null() {
1227 std::ffi::CStr::from_ptr(err_ptr)
1228 .to_string_lossy()
1229 .to_string()
1230 } else {
1231 "Unknown SQLite error".to_string()
1232 }
1233 };
1234 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1235 #[cfg(feature = "telemetry")]
1237 if let Some(metrics) = &self.metrics {
1238 metrics.errors_total().inc();
1239 }
1240 return Err(DatabaseError::new(
1241 "SQLITE_ERROR",
1242 &format!("Error executing SELECT statement: {}", err_msg),
1243 )
1244 .with_sql(sql));
1245 }
1246 }
1247
1248 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1249
1250 let execution_time_ms = js_sys::Date::now() - start_time;
1251
1252 #[cfg(feature = "telemetry")]
1254 if let Some(metrics) = &self.metrics {
1255 metrics.query_duration().observe(execution_time_ms);
1256 }
1257
1258 #[cfg(feature = "telemetry")]
1260 if let Some(mut s) = span {
1261 s.status = crate::telemetry::SpanStatus::Ok;
1262 s.end_time_ms = Some(js_sys::Date::now());
1263 s.attributes
1264 .insert("duration_ms".to_string(), execution_time_ms.to_string());
1265 s.attributes
1266 .insert("row_count".to_string(), rows.len().to_string());
1267 if let Some(recorder) = &self.span_recorder {
1268 recorder.record_span(s);
1269 }
1270 }
1271
1272 Ok(QueryResult {
1273 columns,
1274 rows,
1275 affected_rows: 0,
1276 last_insert_id: None,
1277 execution_time_ms,
1278 })
1279 } else {
1280 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
1282 #[cfg(feature = "telemetry")]
1284 if let Some(metrics) = &self.metrics {
1285 metrics.errors_total().inc();
1286 }
1287 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1288
1289 if step_ret != sqlite_wasm_rs::SQLITE_DONE {
1290 let err_msg = unsafe {
1291 let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
1292 if !err_ptr.is_null() {
1293 std::ffi::CStr::from_ptr(err_ptr)
1294 .to_string_lossy()
1295 .to_string()
1296 } else {
1297 "Unknown SQLite error".to_string()
1298 }
1299 };
1300 return Err(DatabaseError::new(
1301 "SQLITE_ERROR",
1302 &format!("Failed to execute statement: {}", err_msg),
1303 )
1304 .with_sql(sql));
1305 }
1306
1307 let execution_time_ms = js_sys::Date::now() - start_time;
1308
1309 #[cfg(feature = "telemetry")]
1311 if let Some(metrics) = &self.metrics {
1312 metrics.query_duration().observe(execution_time_ms);
1313 }
1314 let affected_rows = unsafe { sqlite_wasm_rs::sqlite3_changes(self.db()) } as u32;
1315 let last_insert_id = if sql.trim().to_uppercase().starts_with("INSERT") {
1316 Some(unsafe { sqlite_wasm_rs::sqlite3_last_insert_rowid(self.db()) })
1317 } else {
1318 None
1319 };
1320
1321 #[cfg(feature = "telemetry")]
1323 if let Some(mut s) = span {
1324 s.status = crate::telemetry::SpanStatus::Ok;
1325 s.end_time_ms = Some(js_sys::Date::now());
1326 s.attributes
1327 .insert("duration_ms".to_string(), execution_time_ms.to_string());
1328 s.attributes
1329 .insert("affected_rows".to_string(), affected_rows.to_string());
1330 if let Some(recorder) = &self.span_recorder {
1331 recorder.record_span(s);
1332 }
1333 }
1334
1335 Ok(QueryResult {
1336 columns: vec![],
1337 rows: vec![],
1338 affected_rows,
1339 last_insert_id,
1340 execution_time_ms,
1341 })
1342 }
1343 }
1344
1345 #[cfg(feature = "telemetry")]
1347 pub fn set_metrics(&mut self, metrics: Option<crate::telemetry::Metrics>) {
1348 self.metrics = metrics.clone();
1349 self.ensure_metrics_propagated();
1350 }
1351
1352 #[cfg(feature = "telemetry")]
1354 pub fn set_span_recorder(&mut self, recorder: Option<crate::telemetry::SpanRecorder>) {
1355 self.span_recorder = recorder;
1356 }
1357
1358 #[cfg(feature = "telemetry")]
1360 pub fn get_span_context(&self) -> Option<&crate::telemetry::SpanContext> {
1361 self.span_context.as_ref()
1362 }
1363
1364 #[cfg(feature = "telemetry")]
1366 pub fn get_span_recorder(&self) -> Option<&crate::telemetry::SpanRecorder> {
1367 self.span_recorder.as_ref()
1368 }
1369 #[cfg(feature = "telemetry")]
1371 fn ensure_metrics_propagated(&self) {
1372 #[cfg(target_arch = "wasm32")]
1374 {
1375 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
1376
1377 if self.metrics.is_none() {
1378 return;
1379 }
1380
1381 let db_name = &self.name;
1382
1383 if let Some(storage_rc) = get_storage_with_fallback(db_name) {
1384 use crate::vfs::indexeddb_vfs::with_storage_borrow_mut;
1385 let _ = with_storage_borrow_mut(&storage_rc, "set_metrics", |storage| {
1386 storage.set_metrics(self.metrics.clone());
1387 Ok(())
1388 });
1389 }
1390 }
1391 }
1392
1393 pub async fn close_internal(&mut self) -> Result<(), DatabaseError> {
1394 log::info!("CLOSE_INTERNAL STARTED for: {}", self.name);
1395
1396 if self.connection_state.db.get().is_null() {
1398 log::info!(
1399 "Connection already null for {}, skipping close operations",
1400 self.name
1401 );
1402 return Ok(());
1403 }
1404
1405 log::info!("Checkpointing WAL before close: {}", self.name);
1407 let _ = self
1408 .execute_internal("PRAGMA wal_checkpoint(PASSIVE)")
1409 .await;
1410 log::info!("WAL checkpoint completed for: {}", self.name);
1411
1412 log::info!("Syncing database before close: {}", self.name);
1414 self.sync_internal().await?;
1415 log::info!("Sync completed for: {}", self.name);
1416
1417 web_sys::console::log_1(
1418 &format!("CLOSE: About to stop leader election for {}", self.name).into(),
1419 );
1420
1421 #[cfg(target_arch = "wasm32")]
1423 {
1424 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
1425
1426 let db_name = &self.name;
1427 web_sys::console::log_1(
1428 &format!("STOP_ELECTION: Getting storage for {}", db_name).into(),
1429 );
1430 log::info!("STOP_ELECTION: Getting storage for {}", db_name);
1431 let storage_rc = get_storage_with_fallback(db_name);
1432
1433 if let Some(storage_rc) = storage_rc {
1434 log::info!("STOP_ELECTION: Found storage for {}, calling stop", db_name);
1435 match with_storage_async!(storage_rc, "stop_leader_election", |storage| storage
1436 .stop_leader_election())
1437 {
1438 Some(Ok(())) => {
1439 log::info!("STOP_ELECTION: Successfully stopped for {}", db_name);
1440 }
1441 Some(Err(e)) => {
1442 log::warn!("STOP_ELECTION: Failed for {}: {:?}", db_name, e);
1443 }
1444 None => {
1445 log::warn!("STOP_ELECTION: Borrow failed for {}", db_name);
1446 }
1447 }
1448 } else {
1449 log::warn!("STOP_ELECTION: No storage found for {}", db_name);
1450 }
1451 log::info!("STOP_ELECTION: Completed section for {}", db_name);
1452 }
1453
1454 log::info!(
1459 "Closed database: {} (connection will be released on Drop)",
1460 self.name
1461 );
1462 Ok(())
1463 }
1464
1465 pub async fn query(&mut self, sql: &str) -> Result<Vec<Row>, DatabaseError> {
1467 let result = self.execute_internal(sql).await?;
1468 Ok(result.rows)
1469 }
1470
1471 pub async fn sync_internal(&mut self) -> Result<(), DatabaseError> {
1472 #[cfg(all(target_arch = "wasm32", feature = "telemetry"))]
1474 let start_time = js_sys::Date::now();
1475
1476 #[cfg(feature = "telemetry")]
1478 let span = if self.span_recorder.is_some() {
1479 let mut builder = crate::telemetry::SpanBuilder::new("vfs_sync".to_string())
1480 .with_attribute("operation", "sync");
1481
1482 if let Some(ref context) = self.span_context {
1484 builder = builder
1485 .with_context(context)
1486 .with_baggage_from_context(context);
1487 }
1488
1489 let span = builder.build();
1490
1491 if let Some(ref context) = self.span_context {
1493 context.enter_span(span.span_id.clone());
1494 }
1495
1496 Some(span)
1497 } else {
1498 None
1499 };
1500
1501 #[cfg(feature = "telemetry")]
1503 if let Some(ref metrics) = self.metrics {
1504 metrics.sync_operations_total().inc();
1505 }
1506
1507 #[cfg(feature = "telemetry")]
1509 let mut blocks_count = 0;
1510
1511 #[cfg(target_arch = "wasm32")]
1513 {
1514 use crate::storage::vfs_sync;
1515
1516 let storage_name = &self.name;
1520
1521 let next_commit = vfs_sync::with_global_commit_marker(|cm| {
1523 #[cfg(target_arch = "wasm32")]
1524 let mut cm_ref = cm.borrow_mut();
1525 #[cfg(not(target_arch = "wasm32"))]
1526 let mut cm_ref = cm.lock();
1527
1528 let current = cm_ref.get(storage_name).copied().unwrap_or(0);
1529 let new_marker = current + 1;
1530 cm_ref.insert(storage_name.to_string(), new_marker);
1531 log::debug!(
1532 "Advanced commit marker for {} from {} to {}",
1533 storage_name,
1534 current,
1535 new_marker
1536 );
1537 new_marker
1538 });
1539
1540 web_sys::console::log_1(
1541 &format!(
1542 "[SYNC] Collecting blocks from GLOBAL_STORAGE for: {}",
1543 storage_name
1544 )
1545 .into(),
1546 );
1547 let (blocks_to_persist, metadata_to_persist) =
1548 vfs_sync::with_global_storage(|storage| {
1549 #[cfg(target_arch = "wasm32")]
1550 let storage_map = storage.borrow();
1551 #[cfg(not(target_arch = "wasm32"))]
1552 let storage_map = storage.lock();
1553
1554 let blocks = if let Some(db_storage) = storage_map.get(storage_name) {
1555 let count = db_storage.len();
1556 web_sys::console::log_1(
1557 &format!("[SYNC] Found {} blocks in GLOBAL_STORAGE", count).into(),
1558 );
1559 db_storage
1560 .iter()
1561 .map(|(&id, data)| (id, data.clone()))
1562 .collect::<Vec<_>>()
1563 } else {
1564 web_sys::console::log_1(
1565 &format!(
1566 "[SYNC] No blocks found in GLOBAL_STORAGE for {}",
1567 storage_name
1568 )
1569 .into(),
1570 );
1571 Vec::new()
1572 };
1573
1574 let metadata = vfs_sync::with_global_metadata(|meta| {
1575 #[cfg(target_arch = "wasm32")]
1576 let meta_map = meta.borrow();
1577 #[cfg(not(target_arch = "wasm32"))]
1578 let meta_map = meta.lock();
1579 if let Some(db_meta) = meta_map.get(storage_name) {
1580 let count = db_meta.len();
1581 web_sys::console::log_1(
1582 &format!("[SYNC] Found {} metadata entries", count).into(),
1583 );
1584 db_meta
1585 .iter()
1586 .map(|(&id, metadata)| (id, metadata.checksum))
1587 .collect::<Vec<_>>()
1588 } else {
1589 web_sys::console::log_1(&format!("[SYNC] No metadata found").into());
1590 Vec::new()
1591 }
1592 });
1593
1594 (blocks, metadata)
1595 });
1596
1597 web_sys::console::log_1(
1598 &format!(
1599 "[SYNC] Preparing to persist {} blocks to IndexedDB",
1600 blocks_to_persist.len()
1601 )
1602 .into(),
1603 );
1604
1605 if !blocks_to_persist.is_empty() {
1606 #[cfg(feature = "telemetry")]
1607 {
1608 blocks_count = blocks_to_persist.len();
1609 }
1610 web_sys::console::log_1(
1611 &format!(
1612 "[SYNC] Persisting {} blocks to IndexedDB",
1613 blocks_to_persist.len()
1614 )
1615 .into(),
1616 );
1617 crate::storage::wasm_indexeddb::persist_to_indexeddb_event_based(
1618 storage_name,
1619 blocks_to_persist,
1620 metadata_to_persist,
1621 next_commit,
1622 #[cfg(feature = "telemetry")]
1623 self.span_recorder.clone(),
1624 #[cfg(feature = "telemetry")]
1625 span.as_ref().map(|s| s.span_id.clone()),
1626 )
1627 .await?;
1628 web_sys::console::log_1(
1629 &format!("[SYNC] Successfully persisted to IndexedDB").into(),
1630 );
1631 } else {
1632 web_sys::console::log_1(
1633 &format!("[SYNC] WARNING: No blocks to persist - GLOBAL_STORAGE is empty!")
1634 .into(),
1635 );
1636 }
1637
1638 use crate::storage::broadcast_notifications::{
1640 BroadcastNotification, send_change_notification,
1641 };
1642
1643 let notification = BroadcastNotification::DataChanged {
1644 db_name: self.name.clone(),
1645 timestamp: js_sys::Date::now() as u64,
1646 };
1647
1648 log::debug!("Sending DataChanged notification for {}", self.name);
1649
1650 if let Err(e) = send_change_notification(¬ification) {
1651 log::warn!("Failed to send change notification: {}", e);
1652 }
1654 }
1655
1656 #[cfg(all(target_arch = "wasm32", feature = "telemetry"))]
1658 if let Some(ref metrics) = self.metrics {
1659 let duration_ms = js_sys::Date::now() - start_time;
1660 metrics.sync_duration().observe(duration_ms);
1661 }
1662
1663 #[cfg(feature = "telemetry")]
1665 if let Some(mut s) = span {
1666 s.status = crate::telemetry::SpanStatus::Ok;
1667 #[cfg(target_arch = "wasm32")]
1668 {
1669 s.end_time_ms = Some(js_sys::Date::now());
1670 let duration_ms = s.end_time_ms.unwrap() - s.start_time_ms;
1671 s.attributes
1672 .insert("duration_ms".to_string(), duration_ms.to_string());
1673 }
1674 #[cfg(not(target_arch = "wasm32"))]
1675 {
1676 let now = std::time::SystemTime::now()
1677 .duration_since(std::time::UNIX_EPOCH)
1678 .unwrap()
1679 .as_millis() as f64;
1680 s.end_time_ms = Some(now);
1681 let duration_ms = s.end_time_ms.unwrap() - s.start_time_ms;
1682 s.attributes
1683 .insert("duration_ms".to_string(), duration_ms.to_string());
1684 }
1685 s.attributes
1686 .insert("blocks_persisted".to_string(), blocks_count.to_string());
1687 if let Some(recorder) = &self.span_recorder {
1688 recorder.record_span(s);
1689 }
1690
1691 if let Some(ref context) = self.span_context {
1693 context.exit_span();
1694 }
1695 }
1696
1697 Ok(())
1698 }
1699}
1700
#[cfg(target_arch = "wasm32")]
impl Drop for Database {
    /// Releases this handle's pooled connection and stops its heartbeat timer.
    ///
    /// The pool key is the database name without a trailing `.db`. If the
    /// storage's leader-election cell is already borrowed, the heartbeat is left
    /// alone and a console message is written instead.
    fn drop(&mut self) {
        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;

        let db_name = self.name.as_str();

        web_sys::console::log_1(&format!("DROP: Releasing connection for {}", db_name).into());
        crate::connection_pool::release_connection(db_name.trim_end_matches(".db"));
        web_sys::console::log_1(&format!("DROP: Connection released for {}", db_name).into());

        if let Some(storage_rc) = get_storage_with_fallback(&self.name) {
            match storage_rc.leader_election.try_borrow_mut() {
                Ok(mut election) => {
                    // Take the interval id out of the manager so it cannot be cleared twice.
                    let pending_interval = election
                        .as_mut()
                        .and_then(|manager| manager.heartbeat_interval.take());

                    if let Some(interval_id) = pending_interval {
                        if let Some(window) = web_sys::window() {
                            window.clear_interval_with_handle(interval_id);
                            web_sys::console::log_1(
                                &format!(
                                    "DROP: Cleared heartbeat interval {} for {}",
                                    interval_id, self.name
                                )
                                .into(),
                            );
                        }
                    }
                }
                Err(_) => {
                    web_sys::console::log_1(
                        &format!(
                            "[DROP] Skipping {} (heartbeat already stopped by shared DB)",
                            self.name
                        )
                        .into(),
                    );
                }
            }
        }

        log::debug!(
            "Closed database: {} (BlockStorage remains in registry)",
            self.name
        );
    }
}
1759
1760#[cfg(target_arch = "wasm32")]
1762#[wasm_bindgen]
1763impl Database {
1764 #[wasm_bindgen(js_name = "newDatabase")]
1765 pub async fn new_wasm(name: String) -> Result<Database, JsValue> {
1766 let normalized_name = if name.ends_with(".db") {
1768 name.clone()
1769 } else {
1770 format!("{}.db", name)
1771 };
1772
1773 let config = DatabaseConfig {
1774 name: normalized_name.clone(),
1775 version: Some(1),
1776 cache_size: Some(10_000),
1777 page_size: Some(4096),
1778 auto_vacuum: Some(true),
1779 journal_mode: Some("WAL".to_string()),
1780 max_export_size_bytes: Some(2 * 1024 * 1024 * 1024), };
1782
1783 let db = Database::new(config)
1784 .await
1785 .map_err(|e| JsValue::from_str(&format!("Failed to create database: {}", e)))?;
1786
1787 Self::start_write_queue_listener(&normalized_name)?;
1789
1790 Ok(db)
1791 }
1792
1793 #[wasm_bindgen(getter)]
1795 pub fn name(&self) -> String {
1796 self.name.clone()
1797 }
1798
1799 #[wasm_bindgen(js_name = "getAllDatabases")]
1803 pub async fn get_all_databases() -> Result<JsValue, JsValue> {
1804 use crate::storage::vfs_sync::with_global_storage;
1805 use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1806 use std::collections::HashSet;
1807
1808 log::info!("getAllDatabases called");
1809 let mut db_names = HashSet::new();
1810
1811 match Self::get_persistent_database_list() {
1813 Ok(persistent_list) => {
1814 log::info!("Persistent list has {} entries", persistent_list.len());
1815 for name in persistent_list {
1816 log::info!("Found in persistent list: {}", name);
1817 db_names.insert(name);
1818 }
1819 }
1820 Err(e) => {
1821 log::warn!("Failed to get persistent list: {:?}", e);
1822 }
1823 }
1824
1825 STORAGE_REGISTRY.with(|reg| unsafe {
1828 let registry = &*reg.get();
1829 log::info!("STORAGE_REGISTRY has {} entries", registry.len());
1830 for key in registry.keys() {
1831 log::info!("Found in STORAGE_REGISTRY: {}", key);
1832 db_names.insert(key.clone());
1833 }
1834 });
1835
1836 with_global_storage(|storage| {
1838 let storage_borrow = storage.borrow();
1839 log::info!("GLOBAL_STORAGE has {} entries", storage_borrow.len());
1840 for key in storage_borrow.keys() {
1841 log::info!("Found in GLOBAL_STORAGE: {}", key);
1842 db_names.insert(key.clone());
1843 }
1844 });
1845
1846 log::info!("Total unique databases found: {}", db_names.len());
1847
1848 let mut result_vec: Vec<String> = db_names.into_iter().collect();
1850 result_vec.sort();
1851
1852 let js_array = js_sys::Array::new();
1854 for name in &result_vec {
1855 log::info!("Returning database: {}", name);
1856 js_array.push(&JsValue::from_str(name));
1857 }
1858
1859 log::info!("getAllDatabases returning {} databases", result_vec.len());
1860
1861 Ok(js_array.into())
1862 }
1863
1864 #[wasm_bindgen(js_name = "deleteDatabase")]
1868 pub async fn delete_database(name: String) -> Result<(), JsValue> {
1869 use crate::storage::vfs_sync::{
1870 with_global_commit_marker, with_global_metadata, with_global_storage,
1871 };
1872
1873 let normalized_name = if name.ends_with(".db") {
1875 name.clone()
1876 } else {
1877 format!("{}.db", name)
1878 };
1879
1880 log::info!("Deleting database: {}", normalized_name);
1881
1882 use crate::vfs::indexeddb_vfs::remove_storage_from_registry;
1884 remove_storage_from_registry(&normalized_name);
1885
1886 with_global_storage(|gs| {
1888 #[cfg(target_arch = "wasm32")]
1889 let mut storage = gs.borrow_mut();
1890 #[cfg(not(target_arch = "wasm32"))]
1891 let mut storage = gs.lock();
1892 storage.remove(&normalized_name);
1893 });
1894
1895 with_global_metadata(|gm| {
1897 #[cfg(target_arch = "wasm32")]
1898 let mut metadata = gm.borrow_mut();
1899 #[cfg(not(target_arch = "wasm32"))]
1900 let mut metadata = gm.lock();
1901 metadata.remove(&normalized_name);
1902 });
1903
1904 with_global_commit_marker(|cm| {
1906 #[cfg(target_arch = "wasm32")]
1907 let mut markers = cm.borrow_mut();
1908 #[cfg(not(target_arch = "wasm32"))]
1909 let mut markers = cm.lock();
1910 log::info!(
1911 "Cleared commit marker for {} from GLOBAL storage",
1912 normalized_name
1913 );
1914 markers.remove(&normalized_name);
1915 });
1916
1917 let idb_name = format!("absurder_{}", normalized_name);
1919 let _delete_promise = js_sys::eval(&format!("indexedDB.deleteDatabase('{}')", idb_name))
1920 .map_err(|e| JsValue::from_str(&format!("Failed to delete IndexedDB: {:?}", e)))?;
1921
1922 log::info!("Database deleted: {}", normalized_name);
1923
1924 Self::remove_database_from_persistent_list(&normalized_name)?;
1926
1927 Ok(())
1928 }
1929
1930 #[allow(dead_code)]
1932 fn add_database_to_persistent_list(db_name: &str) -> Result<(), JsValue> {
1933 log::info!("add_database_to_persistent_list called for: {}", db_name);
1934
1935 let window = web_sys::window().ok_or_else(|| {
1936 log::error!("No window object");
1937 JsValue::from_str("No window")
1938 })?;
1939
1940 let storage = window
1941 .local_storage()
1942 .map_err(|e| {
1943 log::error!("Failed to get localStorage: {:?}", e);
1944 JsValue::from_str("No localStorage")
1945 })?
1946 .ok_or_else(|| {
1947 log::error!("localStorage not available");
1948 JsValue::from_str("localStorage not available")
1949 })?;
1950
1951 let key = "absurder_db_list";
1952 let existing = storage.get_item(key).map_err(|e| {
1953 log::error!("Failed to read localStorage key {}: {:?}", key, e);
1954 JsValue::from_str("Failed to read localStorage")
1955 })?;
1956
1957 log::debug!("Existing localStorage value: {:?}", existing);
1958
1959 let mut db_list: Vec<String> = if let Some(json_str) = existing {
1960 match serde_json::from_str(&json_str) {
1961 Ok(list) => {
1962 log::debug!("Parsed existing list: {:?}", list);
1963 list
1964 }
1965 Err(e) => {
1966 log::warn!("Failed to parse localStorage JSON: {}, starting fresh", e);
1967 Vec::new()
1968 }
1969 }
1970 } else {
1971 log::debug!("No existing list, creating new");
1972 Vec::new()
1973 };
1974
1975 if !db_list.contains(&db_name.to_string()) {
1976 db_list.push(db_name.to_string());
1977 db_list.sort();
1978 log::debug!("Updated list: {:?}", db_list);
1979
1980 let json_str = serde_json::to_string(&db_list).map_err(|e| {
1981 log::error!("Failed to serialize list: {}", e);
1982 JsValue::from_str("Failed to serialize")
1983 })?;
1984
1985 log::debug!("Writing to localStorage: {}", json_str);
1986
1987 storage.set_item(key, &json_str).map_err(|e| {
1988 log::error!("Failed to write to localStorage: {:?}", e);
1989 JsValue::from_str("Failed to write localStorage")
1990 })?;
1991
1992 log::info!("Successfully added {} to persistent database list", db_name);
1993 } else {
1994 log::info!("{} already in persistent list", db_name);
1995 }
1996
1997 Ok(())
1998 }
1999
2000 fn remove_database_from_persistent_list(db_name: &str) -> Result<(), JsValue> {
2002 let window = web_sys::window().ok_or_else(|| JsValue::from_str("No window"))?;
2003 let storage = window
2004 .local_storage()
2005 .map_err(|_| JsValue::from_str("No localStorage"))?
2006 .ok_or_else(|| JsValue::from_str("localStorage not available"))?;
2007
2008 let key = "absurder_db_list";
2009 let existing = storage
2010 .get_item(key)
2011 .map_err(|_| JsValue::from_str("Failed to read localStorage"))?;
2012
2013 if let Some(json_str) = existing {
2014 let mut db_list: Vec<String> =
2015 serde_json::from_str(&json_str).unwrap_or_else(|_| Vec::new());
2016 db_list.retain(|name| name != db_name);
2017 let json_str = serde_json::to_string(&db_list)
2018 .map_err(|_| JsValue::from_str("Failed to serialize"))?;
2019 storage
2020 .set_item(key, &json_str)
2021 .map_err(|_| JsValue::from_str("Failed to write localStorage"))?;
2022 log::info!("Removed {} from persistent database list", db_name);
2023 }
2024
2025 Ok(())
2026 }
2027
2028 fn get_persistent_database_list() -> Result<Vec<String>, JsValue> {
2030 log::info!("get_persistent_database_list called");
2031
2032 let window = web_sys::window().ok_or_else(|| {
2033 log::error!("No window object");
2034 JsValue::from_str("No window")
2035 })?;
2036
2037 let storage = window
2038 .local_storage()
2039 .map_err(|e| {
2040 log::error!("Failed to get localStorage: {:?}", e);
2041 JsValue::from_str("No localStorage")
2042 })?
2043 .ok_or_else(|| {
2044 log::error!("localStorage not available");
2045 JsValue::from_str("localStorage not available")
2046 })?;
2047
2048 let key = "absurder_db_list";
2049 let existing = storage.get_item(key).map_err(|e| {
2050 log::error!("Failed to read localStorage key {}: {:?}", key, e);
2051 JsValue::from_str("Failed to read localStorage")
2052 })?;
2053
2054 log::debug!("Read from localStorage: {:?}", existing);
2055
2056 if let Some(json_str) = existing {
2057 match serde_json::from_str::<Vec<String>>(&json_str) {
2058 Ok(db_list) => {
2059 log::info!(
2060 "Successfully parsed {} databases from localStorage",
2061 db_list.len()
2062 );
2063 log::debug!("Database list: {:?}", db_list);
2064 Ok(db_list)
2065 }
2066 Err(e) => {
2067 log::error!("Failed to parse localStorage JSON: {}", e);
2068 Ok(Vec::new())
2069 }
2070 }
2071 } else {
2072 log::info!("No persistent database list in localStorage");
2073 Ok(Vec::new())
2074 }
2075 }
2076
2077 fn start_write_queue_listener(db_name: &str) -> Result<(), JsValue> {
2079 use crate::storage::write_queue::{
2080 WriteQueueMessage, WriteResponse, register_write_queue_listener,
2081 };
2082 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2083
2084 let db_name_clone = db_name.to_string();
2085
2086 let callback = Closure::wrap(Box::new(move |msg: JsValue| {
2087 let db_name_inner = db_name_clone.clone();
2088
2089 if let Ok(json_str) = js_sys::JSON::stringify(&msg) {
2091 if let Some(json_str) = json_str.as_string() {
2092 if let Ok(message) = serde_json::from_str::<WriteQueueMessage>(&json_str) {
2093 if let WriteQueueMessage::WriteRequest(request) = message {
2094 log::debug!("Leader received write request: {}", request.request_id);
2095
2096 let storage_rc = get_storage_with_fallback(&db_name_inner);
2098
2099 if let Some(storage) = storage_rc {
2100 wasm_bindgen_futures::spawn_local(async move {
2102 let is_leader = with_storage_async!(
2103 storage,
2104 "write_queue_is_leader",
2105 |s| s.is_leader()
2106 );
2107 if is_leader.is_none() {
2108 log::error!("Failed to check leader status");
2109 return;
2110 }
2111 let is_leader = is_leader.unwrap();
2112
2113 if !is_leader {
2114 log::error!("Not leader, ignoring write request");
2115 return;
2116 }
2117
2118 log::debug!("Processing write request as leader");
2119
2120 match Database::new_wasm(db_name_inner.clone()).await {
2122 Ok(mut db) => {
2123 match db.execute_internal(&request.sql).await {
2125 Ok(result) => {
2126 let response = WriteResponse::Success {
2128 request_id: request.request_id.clone(),
2129 affected_rows: result.affected_rows
2130 as usize,
2131 };
2132
2133 use crate::storage::write_queue::send_write_response;
2134 if let Err(e) = send_write_response(
2135 &db_name_inner,
2136 response,
2137 ) {
2138 log::error!(
2139 "Failed to send response: {}",
2140 e
2141 );
2142 } else {
2143 log::info!(
2144 "Write response sent successfully"
2145 );
2146 }
2147 }
2148 Err(e) => {
2149 let response = WriteResponse::Error {
2151 request_id: request.request_id.clone(),
2152 error_message: e.to_string(),
2153 };
2154
2155 use crate::storage::write_queue::send_write_response;
2156 if let Err(e) = send_write_response(
2157 &db_name_inner,
2158 response,
2159 ) {
2160 log::error!(
2161 "Failed to send error response: {}",
2162 e
2163 );
2164 }
2165 }
2166 }
2167 }
2168 Err(e) => {
2169 log::error!(
2170 "Failed to create db for write processing: {:?}",
2171 e
2172 );
2173 }
2174 }
2175 });
2176 }
2177 }
2178 }
2179 }
2180 }
2181 }) as Box<dyn FnMut(JsValue)>);
2182
2183 let callback_fn = callback.as_ref().unchecked_ref();
2184 register_write_queue_listener(db_name, callback_fn).map_err(|e| {
2185 JsValue::from_str(&format!("Failed to register write queue listener: {}", e))
2186 })?;
2187
2188 callback.forget();
2189
2190 Ok(())
2191 }
2192
2193 #[wasm_bindgen]
2194 pub async fn execute(&mut self, sql: &str) -> Result<JsValue, JsValue> {
2195 self.check_write_permission(sql)
2197 .await
2198 .map_err(|e| JsValue::from_str(&format!("Write permission denied: {}", e)))?;
2199
2200 let result = self
2201 .execute_internal(sql)
2202 .await
2203 .map_err(|e| JsValue::from_str(&format!("Query execution failed: {}", e)))?;
2204 serde_wasm_bindgen::to_value(&result).map_err(|e| JsValue::from_str(&e.to_string()))
2205 }
2206
2207 #[wasm_bindgen(js_name = "executeWithParams")]
2208 pub async fn execute_with_params(
2209 &mut self,
2210 sql: &str,
2211 params: JsValue,
2212 ) -> Result<JsValue, JsValue> {
2213 let params: Vec<ColumnValue> = serde_wasm_bindgen::from_value(params)
2214 .map_err(|e| JsValue::from_str(&format!("Invalid parameters: {}", e)))?;
2215
2216 self.check_write_permission(sql)
2218 .await
2219 .map_err(|e| JsValue::from_str(&format!("Write permission denied: {}", e)))?;
2220
2221 let result = self
2222 .execute_with_params_internal(sql, ¶ms)
2223 .await
2224 .map_err(|e| JsValue::from_str(&format!("Query execution failed: {}", e)))?;
2225 serde_wasm_bindgen::to_value(&result).map_err(|e| JsValue::from_str(&e.to_string()))
2226 }
2227
2228 #[wasm_bindgen]
2229 pub async fn close(&mut self) -> Result<(), JsValue> {
2230 self.close_internal()
2231 .await
2232 .map_err(|e| JsValue::from_str(&format!("Failed to close database: {}", e)))
2233 }
2234
2235 #[wasm_bindgen(js_name = "forceCloseConnection")]
2237 pub async fn force_close_connection(&mut self) -> Result<(), JsValue> {
2238 let _ = self.close_internal().await;
2240
2241 let pool_key = self.name.trim_end_matches(".db");
2244 crate::connection_pool::force_close_connection(pool_key);
2245
2246 #[cfg(target_arch = "wasm32")]
2248 {
2249 crate::cleanup::cleanup_all_state(pool_key)
2250 .await
2251 .map_err(|e| JsValue::from_str(&format!("Cleanup failed: {}", e)))?;
2252 }
2253 log::info!("Force closed and removed connection for: {}", self.name);
2254 Ok(())
2255 }
2256
2257 #[wasm_bindgen]
2258 pub async fn sync(&mut self) -> Result<(), JsValue> {
2259 self.sync_internal()
2260 .await
2261 .map_err(|e| JsValue::from_str(&format!("Failed to sync database: {}", e)))
2262 }
2263
2264 #[wasm_bindgen(js_name = "allowNonLeaderWrites")]
2266 pub async fn allow_non_leader_writes(&mut self, allow: bool) -> Result<(), JsValue> {
2267 log::debug!("Setting allowNonLeaderWrites = {} for {}", allow, self.name);
2268 self.allow_non_leader_writes = allow;
2269 Ok(())
2270 }
2271
2272 #[wasm_bindgen(js_name = "exportToFile")]
2288 pub async fn export_to_file(&self) -> Result<js_sys::Uint8Array, JsValue> {
2289 let db_name = self.name.clone();
2290 let max_export_size = self.max_export_size_bytes;
2291
2292 log::info!("[EXPORT] ===== Step 1: Acquiring lock");
2293
2294 let _guard = weblocks::acquire(&db_name, weblocks::AcquireOptions::exclusive()).await?;
2296 log::info!("[EXPORT] ===== Step 2: Lock acquired");
2297
2298 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2300 log::info!("[EXPORT] ===== Step 3: Getting storage");
2301 let storage_rc = get_storage_with_fallback(&db_name).ok_or_else(|| {
2302 JsValue::from_str(&format!("Storage not found for database: {}", db_name))
2303 })?;
2304 log::info!("[EXPORT] ===== Step 4: Got storage, reloading cache");
2305
2306 #[cfg(target_arch = "wasm32")]
2308 {
2309 storage_rc.reload_cache_from_global_storage();
2310 }
2311
2312 log::info!("[EXPORT] ===== Step 5: Checkpointing WAL");
2315 if !self.connection_state.db.get().is_null() {
2316 use std::ffi::CString;
2318 let pragma = CString::new("PRAGMA wal_checkpoint(PASSIVE)").unwrap();
2319 unsafe {
2320 let mut stmt = std::ptr::null_mut();
2321 let rc = sqlite_wasm_rs::sqlite3_prepare_v2(
2322 self.connection_state.db.get(),
2323 pragma.as_ptr(),
2324 -1,
2325 &mut stmt,
2326 std::ptr::null_mut(),
2327 );
2328 if rc == sqlite_wasm_rs::SQLITE_OK && !stmt.is_null() {
2329 sqlite_wasm_rs::sqlite3_step(stmt);
2330 sqlite_wasm_rs::sqlite3_finalize(stmt);
2331 log::info!("[EXPORT] WAL checkpoint completed");
2332 } else {
2333 log::warn!("[EXPORT] WAL checkpoint failed with rc: {}", rc);
2334 }
2335 }
2336 }
2337
2338 log::info!("[EXPORT] ===== Step 6: Starting sync");
2339 storage_rc
2341 .sync()
2342 .await
2343 .map_err(|e| JsValue::from_str(&format!("Sync failed: {}", e)))?;
2344 log::info!("[EXPORT] ===== Step 7: Sync complete");
2345
2346 log::info!("[EXPORT] Calling export_database_to_bytes");
2348 let db_bytes = {
2349 let storage = &*storage_rc;
2350 crate::storage::export::export_database_to_bytes(storage, max_export_size)
2351 .await
2352 .map_err(|e| {
2353 log::error!("[EXPORT] Export failed: {}", e);
2354 JsValue::from_str(&format!("Export failed: {}", e))
2355 })?
2356 };
2357
2358 log::info!("[EXPORT] Export complete: {} bytes", db_bytes.len());
2359
2360 let uint8_array = js_sys::Uint8Array::new_with_length(db_bytes.len() as u32);
2361 uint8_array.copy_from(&db_bytes);
2362
2363 Ok(uint8_array)
2364 }
2365
2366 #[wasm_bindgen(js_name = "testLock")]
2368 pub async fn test_lock(&self, value: u32) -> Result<u32, JsValue> {
2369 let lock_name = format!("{}.lock_test", self.name);
2370
2371 log::info!(
2372 "[LOCK-TEST] Acquiring lock: {} with value: {}",
2373 lock_name,
2374 value
2375 );
2376 let _guard = weblocks::acquire(&lock_name, weblocks::AcquireOptions::exclusive()).await?;
2377 log::info!("[LOCK-TEST] Lock acquired, processing value: {}", value);
2378
2379 let result = value + 1;
2381
2382 let delay_promise = js_sys::Promise::new(&mut |resolve, _reject| {
2384 let window = web_sys::window().unwrap();
2385 let _ = window
2386 .set_timeout_with_callback_and_timeout_and_arguments_0(resolve.unchecked_ref(), 10);
2387 });
2388 wasm_bindgen_futures::JsFuture::from(delay_promise).await?;
2389
2390 log::info!(
2391 "[LOCK-TEST] Lock releasing for: {} with result: {}",
2392 lock_name,
2393 result
2394 );
2395 Ok(result)
2396 }
2397
2398 #[wasm_bindgen(js_name = "importFromFile")]
2427 pub async fn import_from_file(&mut self, file_data: js_sys::Uint8Array) -> Result<(), JsValue> {
2428 log::info!("[IMPORT] Starting import with lock for: {}", self.name);
2429 let db_name = self.name.clone();
2430 let data = file_data.to_vec();
2431
2432 let _guard = weblocks::acquire(&db_name, weblocks::AcquireOptions::exclusive()).await?;
2434 log::info!("[IMPORT] Lock acquired for: {}", db_name);
2435
2436 log::debug!("Import data size: {} bytes", data.len());
2437
2438 log::debug!("Force-closing database connection before import");
2442
2443 self.close_internal()
2445 .await
2446 .map_err(|e| JsValue::from_str(&format!("Failed to close before import: {}", e)))?;
2447
2448 let pool_key = self.name.trim_end_matches(".db");
2450 crate::connection_pool::force_close_connection(pool_key);
2451
2452 self.connection_state.db.set(std::ptr::null_mut());
2454 log::debug!("Removed connection from pool for import");
2455
2456 crate::storage::import::import_database_from_bytes(&db_name, data)
2458 .await
2459 .map_err(|e| {
2460 log::error!("Import failed for {}: {}", db_name, e);
2461 JsValue::from_str(&format!("Import failed: {}", e))
2462 })?;
2463
2464 log::info!("[IMPORT] Import complete for: {}", db_name);
2465
2466 log::info!("[IMPORT] Reopening connection for: {}", db_name);
2469
2470 use std::ffi::CString;
2471
2472 let vfs_name = format!("vfs_{}", db_name.trim_end_matches(".db"));
2473 let pool_key = db_name.trim_end_matches(".db").to_string();
2474 let db_name_for_closure = db_name.clone();
2475 let vfs_name_for_closure = vfs_name.clone();
2476
2477 let new_state = crate::connection_pool::get_or_create_connection(&pool_key, || {
2478 let mut db = std::ptr::null_mut();
2479 let db_name_cstr = CString::new(db_name_for_closure.clone())
2480 .map_err(|_| "Invalid database name".to_string())?;
2481 let vfs_cstr = CString::new(vfs_name_for_closure.as_str())
2482 .map_err(|_| "Invalid VFS name".to_string())?;
2483
2484 log::info!(
2485 "[IMPORT] Reopening database: {} with VFS: {}",
2486 db_name_for_closure,
2487 vfs_name_for_closure
2488 );
2489
2490 let ret = unsafe {
2491 sqlite_wasm_rs::sqlite3_open_v2(
2492 db_name_cstr.as_ptr(),
2493 &mut db as *mut _,
2494 sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
2495 vfs_cstr.as_ptr(),
2496 )
2497 };
2498
2499 if ret != sqlite_wasm_rs::SQLITE_OK {
2500 let err_msg = unsafe {
2501 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
2502 if !msg_ptr.is_null() {
2503 std::ffi::CStr::from_ptr(msg_ptr)
2504 .to_string_lossy()
2505 .into_owned()
2506 } else {
2507 "Unknown error".to_string()
2508 }
2509 };
2510 return Err(format!(
2511 "Failed to reopen database after import: {}",
2512 err_msg
2513 ));
2514 }
2515
2516 log::info!("[IMPORT] Database reopened successfully");
2517 Ok(db)
2518 })
2519 .map_err(|e| {
2520 JsValue::from_str(&format!("Failed to reopen connection after import: {}", e))
2521 })?;
2522
2523 self.connection_state = new_state;
2525 log::info!("[IMPORT] Connection state updated for: {}", db_name);
2526
2527 Ok(())
2528 }
2529
2530 #[wasm_bindgen(js_name = "waitForLeadership")]
2532 pub async fn wait_for_leadership(&mut self) -> Result<(), JsValue> {
2533 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2534
2535 #[cfg(feature = "telemetry")]
2537 if let Some(ref metrics) = self.metrics {
2538 metrics.leader_election_attempts_total().inc();
2539 }
2540
2541 let db_name = &self.name;
2542 let start_time = js_sys::Date::now();
2543
2544 let timeout_ms = 5000.0; loop {
2547 let storage_rc = get_storage_with_fallback(db_name);
2548
2549 if let Some(storage) = storage_rc {
2550 let is_leader =
2551 match with_storage_async!(storage, "wait_for_leadership", |s| s.is_leader()) {
2552 Some(v) => v,
2553 None => continue,
2554 };
2555
2556 if is_leader {
2557 log::info!("Became leader for {}", db_name);
2558
2559 #[cfg(feature = "telemetry")]
2561 if let Some(ref metrics) = self.metrics {
2562 let duration_ms = js_sys::Date::now() - start_time;
2563 metrics.leader_election_duration().observe(duration_ms);
2564 metrics.is_leader().set(1.0);
2565 metrics.leadership_changes_total().inc();
2566 }
2567
2568 return Ok(());
2569 }
2570 }
2571
2572 if js_sys::Date::now() - start_time > timeout_ms {
2574 #[cfg(feature = "telemetry")]
2576 if let Some(ref metrics) = self.metrics {
2577 let duration_ms = js_sys::Date::now() - start_time;
2578 metrics.leader_election_duration().observe(duration_ms);
2579 }
2580
2581 return Err(JsValue::from_str("Timeout waiting for leadership"));
2582 }
2583
2584 let promise = js_sys::Promise::new(&mut |resolve, _| {
2586 let window = web_sys::window().expect("should have window");
2587 let _ = window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 100);
2588 });
2589 let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
2590 }
2591 }
2592
2593 #[wasm_bindgen(js_name = "requestLeadership")]
2595 pub async fn request_leadership(&mut self) -> Result<(), JsValue> {
2596 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2597
2598 let db_name = &self.name;
2599 log::debug!("Requesting leadership for {}", db_name);
2600
2601 #[cfg(feature = "telemetry")]
2603 let telemetry_data = if self.metrics.is_some() {
2604 let start_time = js_sys::Date::now();
2605 let was_leader = self
2606 .is_leader_wasm()
2607 .await
2608 .ok()
2609 .and_then(|v| v.as_bool())
2610 .unwrap_or(false);
2611
2612 if let Some(ref metrics) = self.metrics {
2613 metrics.leader_elections_total().inc();
2614 }
2615
2616 Some((start_time, was_leader))
2617 } else {
2618 None
2619 };
2620
2621 let storage_rc = get_storage_with_fallback(db_name);
2622
2623 if let Some(storage) = storage_rc {
2624 {
2625 let result = with_storage_async!(storage, "request_leadership", |s| s
2627 .start_leader_election())
2628 .ok_or_else(|| {
2629 JsValue::from_str("Failed to acquire storage lock for leadership request")
2630 })?;
2631 result.map_err(|e| {
2632 JsValue::from_str(&format!("Failed to request leadership: {}", e))
2633 })?;
2634
2635 log::debug!("Re-election triggered for {}", db_name);
2636 } #[cfg(feature = "telemetry")]
2640 if let Some((start_time, was_leader)) = telemetry_data {
2641 if let Some(ref metrics) = self.metrics {
2642 let duration_ms = js_sys::Date::now() - start_time;
2644 metrics.leader_election_duration().observe(duration_ms);
2645
2646 let is_leader_now = self
2648 .is_leader_wasm()
2649 .await
2650 .ok()
2651 .and_then(|v| v.as_bool())
2652 .unwrap_or(false);
2653
2654 metrics
2656 .is_leader()
2657 .set(if is_leader_now { 1.0 } else { 0.0 });
2658
2659 if was_leader != is_leader_now {
2661 metrics.leadership_changes_total().inc();
2662 }
2663 }
2664 }
2665
2666 Ok(())
2667 } else {
2668 Err(JsValue::from_str(&format!(
2669 "No storage found for database: {}",
2670 db_name
2671 )))
2672 }
2673 }
2674
2675 #[wasm_bindgen(js_name = "getLeaderInfo")]
2677 pub async fn get_leader_info(&mut self) -> Result<JsValue, JsValue> {
2678 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2679
2680 let db_name = &self.name;
2681
2682 let storage_rc = get_storage_with_fallback(db_name);
2683
2684 if let Some(storage) = storage_rc {
2685 let is_leader = with_storage_async!(storage, "get_leader_info", |s| s.is_leader())
2686 .ok_or_else(|| {
2687 JsValue::from_str(&format!(
2688 "Failed to access storage for database: {}",
2689 db_name
2690 ))
2691 })?;
2692
2693 let leader_id_str = if is_leader {
2696 format!("leader_{}", db_name)
2697 } else {
2698 "unknown".to_string()
2699 };
2700
2701 let obj = js_sys::Object::new();
2703 js_sys::Reflect::set(&obj, &"isLeader".into(), &JsValue::from_bool(is_leader))?;
2704 js_sys::Reflect::set(&obj, &"leaderId".into(), &JsValue::from_str(&leader_id_str))?;
2705 js_sys::Reflect::set(
2706 &obj,
2707 &"leaseExpiry".into(),
2708 &JsValue::from_f64(js_sys::Date::now()),
2709 )?;
2710
2711 Ok(obj.into())
2712 } else {
2713 Err(JsValue::from_str(&format!(
2714 "No storage found for database: {}",
2715 db_name
2716 )))
2717 }
2718 }
2719
2720 #[wasm_bindgen(js_name = "queueWrite")]
2731 pub async fn queue_write(&mut self, sql: String) -> Result<(), JsValue> {
2732 self.queue_write_with_timeout(sql, 5000).await
2733 }
2734
2735 #[wasm_bindgen(js_name = "queueWriteWithTimeout")]
2741 pub async fn queue_write_with_timeout(
2742 &mut self,
2743 sql: String,
2744 timeout_ms: u32,
2745 ) -> Result<(), JsValue> {
2746 use crate::storage::write_queue::{WriteQueueMessage, WriteResponse, send_write_request};
2747 use std::cell::RefCell;
2748 use std::rc::Rc;
2749
2750 log::debug!("Queuing write: {}", sql);
2751
2752 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2754 let is_leader = {
2755 let storage_rc = get_storage_with_fallback(&self.name);
2756
2757 if let Some(storage) = storage_rc {
2758 with_storage_async!(storage, "queue_write_is_leader", |s| s.is_leader())
2759 .unwrap_or(false)
2760 } else {
2761 false
2762 }
2763 };
2764
2765 if is_leader {
2766 log::debug!("We are leader, executing directly");
2767 return self
2768 .execute_internal(&sql)
2769 .await
2770 .map(|_| ())
2771 .map_err(|e| JsValue::from_str(&format!("Execute failed: {}", e)));
2772 }
2773
2774 let request_id = send_write_request(&self.name, &sql)
2776 .map_err(|e| JsValue::from_str(&format!("Failed to send write request: {}", e)))?;
2777
2778 log::debug!("Write request sent with ID: {}", request_id);
2779
2780 let response_received = Rc::new(RefCell::new(false));
2782 let response_error = Rc::new(RefCell::new(None::<String>));
2783
2784 let response_received_clone = response_received.clone();
2785 let response_error_clone = response_error.clone();
2786 let request_id_clone = request_id.clone();
2787
2788 let callback = Closure::wrap(Box::new(move |msg: JsValue| {
2790 if let Ok(json_str) = js_sys::JSON::stringify(&msg) {
2792 if let Some(json_str) = json_str.as_string() {
2793 if let Ok(message) = serde_json::from_str::<WriteQueueMessage>(&json_str) {
2794 if let WriteQueueMessage::WriteResponse(response) = message {
2795 match response {
2796 WriteResponse::Success { request_id, .. } => {
2797 if request_id == request_id_clone {
2798 *response_received_clone.borrow_mut() = true;
2799 log::debug!("Write response received: Success");
2800 }
2801 }
2802 WriteResponse::Error {
2803 request_id,
2804 error_message,
2805 } => {
2806 if request_id == request_id_clone {
2807 *response_received_clone.borrow_mut() = true;
2808 *response_error_clone.borrow_mut() = Some(error_message);
2809 log::debug!("Write response received: Error");
2810 }
2811 }
2812 }
2813 }
2814 }
2815 }
2816 }
2817 }) as Box<dyn FnMut(JsValue)>);
2818
2819 use crate::storage::write_queue::register_write_queue_listener;
2821 let callback_fn = callback.as_ref().unchecked_ref();
2822 register_write_queue_listener(&self.name, callback_fn)
2823 .map_err(|e| JsValue::from_str(&format!("Failed to register listener: {}", e)))?;
2824
2825 callback.forget();
2827
2828 let start_time = js_sys::Date::now();
2830 let timeout_f64 = timeout_ms as f64;
2831
2832 loop {
2833 if *response_received.borrow() {
2835 if let Some(error_msg) = response_error.borrow().as_ref() {
2836 return Err(JsValue::from_str(&format!("Write failed: {}", error_msg)));
2837 }
2838 log::info!("Write completed successfully");
2839 return Ok(());
2840 }
2841
2842 let elapsed = js_sys::Date::now() - start_time;
2844 if elapsed > timeout_f64 {
2845 return Err(JsValue::from_str("Write request timed out"));
2846 }
2847
2848 wasm_bindgen_futures::JsFuture::from(js_sys::Promise::new(&mut |resolve, _reject| {
2850 if let Some(window) = web_sys::window() {
2851 let _ =
2852 window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 100);
2853 } else {
2854 log::error!("Window unavailable in timeout handler");
2855 }
2856 }))
2857 .await
2858 .ok();
2859 }
2860 }
2861
2862 #[wasm_bindgen(js_name = "isLeader")]
2863 pub async fn is_leader_wasm(&self) -> Result<JsValue, JsValue> {
2864 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2866
2867 let db_name = &self.name;
2868 log::debug!("isLeader() called for database: {} (self.name)", db_name);
2869
2870 let storage_rc = get_storage_with_fallback(db_name);
2871
2872 if let Some(storage) = storage_rc {
2873 log::debug!("Found storage for {}, calling is_leader()", db_name);
2874 let is_leader = with_storage_async!(storage, "is_leader_wasm", |s| s.is_leader())
2875 .ok_or_else(|| {
2876 JsValue::from_str(&format!(
2877 "Failed to access storage for database: {}",
2878 db_name
2879 ))
2880 })?;
2881 log::debug!("is_leader() = {} for {}", is_leader, db_name);
2882
2883 Ok(JsValue::from_bool(is_leader))
2885 } else {
2886 log::error!("ERROR: No storage found for database: {}", db_name);
2887 Err(JsValue::from_str(&format!(
2888 "No storage found for database: {}",
2889 db_name
2890 )))
2891 }
2892 }
2893
2894 pub async fn is_leader(&self) -> Result<bool, JsValue> {
2896 let result = self.is_leader_wasm().await?;
2897 Ok(result.as_bool().unwrap_or(false))
2898 }
2899
2900 #[wasm_bindgen(js_name = "onDataChange")]
2901 pub fn on_data_change_wasm(&mut self, callback: &js_sys::Function) -> Result<(), JsValue> {
2902 log::debug!("Registering onDataChange callback for {}", self.name);
2903
2904 self.on_data_change_callback = Some(callback.clone());
2906
2907 use crate::storage::broadcast_notifications::register_change_listener;
2909
2910 let db_name = &self.name;
2911 register_change_listener(db_name, callback).map_err(|e| {
2912 JsValue::from_str(&format!("Failed to register change listener: {}", e))
2913 })?;
2914
2915 log::debug!("onDataChange callback registered for {}", self.name);
2916 Ok(())
2917 }
2918
2919 #[wasm_bindgen(js_name = "enableOptimisticUpdates")]
2921 pub async fn enable_optimistic_updates(&mut self, enabled: bool) -> Result<(), JsValue> {
2922 self.optimistic_updates_manager
2923 .borrow_mut()
2924 .set_enabled(enabled);
2925 log::debug!(
2926 "Optimistic updates {}",
2927 if enabled { "enabled" } else { "disabled" }
2928 );
2929 Ok(())
2930 }
2931
2932 #[wasm_bindgen(js_name = "isOptimisticMode")]
2934 pub async fn is_optimistic_mode(&self) -> bool {
2935 self.optimistic_updates_manager.borrow().is_enabled()
2936 }
2937
2938 #[wasm_bindgen(js_name = "trackOptimisticWrite")]
2940 pub async fn track_optimistic_write(&mut self, sql: String) -> Result<String, JsValue> {
2941 let id = self
2942 .optimistic_updates_manager
2943 .borrow_mut()
2944 .track_write(sql);
2945 Ok(id)
2946 }
2947
2948 #[wasm_bindgen(js_name = "getPendingWritesCount")]
2950 pub async fn get_pending_writes_count(&self) -> usize {
2951 self.optimistic_updates_manager.borrow().get_pending_count()
2952 }
2953
2954 #[wasm_bindgen(js_name = "clearOptimisticWrites")]
2956 pub async fn clear_optimistic_writes(&mut self) -> Result<(), JsValue> {
2957 self.optimistic_updates_manager.borrow_mut().clear_all();
2958 Ok(())
2959 }
2960
2961 #[wasm_bindgen(js_name = "enableCoordinationMetrics")]
2963 pub async fn enable_coordination_metrics(&mut self, enabled: bool) -> Result<(), JsValue> {
2964 self.coordination_metrics_manager
2965 .borrow_mut()
2966 .set_enabled(enabled);
2967 Ok(())
2968 }
2969
2970 #[wasm_bindgen(js_name = "isCoordinationMetricsEnabled")]
2972 pub async fn is_coordination_metrics_enabled(&self) -> bool {
2973 self.coordination_metrics_manager.borrow().is_enabled()
2974 }
2975
2976 #[wasm_bindgen(js_name = "recordLeadershipChange")]
2978 pub async fn record_leadership_change(&mut self, became_leader: bool) -> Result<(), JsValue> {
2979 self.coordination_metrics_manager
2980 .borrow_mut()
2981 .record_leadership_change(became_leader);
2982 Ok(())
2983 }
2984
2985 #[wasm_bindgen(js_name = "recordNotificationLatency")]
2987 pub async fn record_notification_latency(&mut self, latency_ms: f64) -> Result<(), JsValue> {
2988 self.coordination_metrics_manager
2989 .borrow_mut()
2990 .record_notification_latency(latency_ms);
2991 Ok(())
2992 }
2993
2994 #[wasm_bindgen(js_name = "recordWriteConflict")]
2996 pub async fn record_write_conflict(&mut self) -> Result<(), JsValue> {
2997 self.coordination_metrics_manager
2998 .borrow_mut()
2999 .record_write_conflict();
3000 Ok(())
3001 }
3002
3003 #[wasm_bindgen(js_name = "recordFollowerRefresh")]
3005 pub async fn record_follower_refresh(&mut self) -> Result<(), JsValue> {
3006 self.coordination_metrics_manager
3007 .borrow_mut()
3008 .record_follower_refresh();
3009 Ok(())
3010 }
3011
3012 #[wasm_bindgen(js_name = "getCoordinationMetrics")]
3014 pub async fn get_coordination_metrics(&self) -> Result<String, JsValue> {
3015 self.coordination_metrics_manager
3016 .borrow()
3017 .get_metrics_json()
3018 .map_err(|e| JsValue::from_str(&e))
3019 }
3020
3021 #[wasm_bindgen(js_name = "resetCoordinationMetrics")]
3023 pub async fn reset_coordination_metrics(&mut self) -> Result<(), JsValue> {
3024 self.coordination_metrics_manager.borrow_mut().reset();
3025 Ok(())
3026 }
3027}
3028
/// JavaScript-facing wrapper around a [`ColumnValue`], exported through `wasm_bindgen`
/// on `wasm32` targets only.
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub struct WasmColumnValue {
    // The wrapped value. NOTE(review): the `allow` suggests nothing reads this field on the
    // Rust side yet — confirm before removing it.
    #[allow(dead_code)]
    inner: ColumnValue,
}
3036
/// Constructors for [`WasmColumnValue`].
///
/// Each `create*` function is exported under an explicit camelCase JS name; the shorter
/// functions at the bottom (`null`, `integer`, ...) are aliases that build the same values.
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
impl WasmColumnValue {
    /// Creates a `Null` value.
    #[wasm_bindgen(js_name = "createNull")]
    pub fn create_null() -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Null,
        }
    }

    /// Creates an `Integer` value.
    #[wasm_bindgen(js_name = "createInteger")]
    pub fn create_integer(value: i64) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Integer(value),
        }
    }

    /// Creates a `Real` value.
    #[wasm_bindgen(js_name = "createReal")]
    pub fn create_real(value: f64) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Real(value),
        }
    }

    /// Creates a `Text` value.
    #[wasm_bindgen(js_name = "createText")]
    pub fn create_text(value: String) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Text(value),
        }
    }

    /// Creates a `Blob` value holding a copy of `value`.
    #[wasm_bindgen(js_name = "createBlob")]
    pub fn create_blob(value: &[u8]) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Blob(value.to_vec()),
        }
    }

    /// Creates a `BigInt` value from its string form; the string is stored as given.
    #[wasm_bindgen(js_name = "createBigInt")]
    pub fn create_bigint(value: &str) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::BigInt(value.to_string()),
        }
    }

    /// Creates a `Date` value from a numeric timestamp, converted to `i64` with a
    /// saturating cast (the fractional part is dropped, NaN becomes 0).
    #[wasm_bindgen(js_name = "createDate")]
    pub fn create_date(timestamp: f64) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Date(timestamp as i64),
        }
    }

    /// Converts an arbitrary JS value into a column value.
    ///
    /// Mapping, checked in this order:
    /// - `null` / `undefined` -> `Null`
    /// - string -> `Integer` when the whole string parses as an `i64`, otherwise `Text`
    /// - number -> `Integer` when it has no fractional part and fits in an `i64`,
    ///   otherwise `Real`
    /// - `Date` object with a finite time -> `Date` (the value of `getTime()`)
    /// - any other object, or a `Date` with a non-finite time -> `Text` holding the
    ///   value's debug representation
    /// - everything else -> `Null`
    #[wasm_bindgen(js_name = "fromJsValue")]
    pub fn from_js_value(value: &JsValue) -> WasmColumnValue {
        use wasm_bindgen::JsCast;

        // `i64::MAX as f64` rounds up to 2^63, which itself is NOT representable as an
        // i64, so the bound has to be exclusive; an inclusive check would let 2^63 through
        // and the cast would quietly saturate it to `i64::MAX`.
        const I64_EXCLUSIVE_UPPER: f64 = i64::MAX as f64;

        let inner = if value.is_null() || value.is_undefined() {
            ColumnValue::Null
        } else if let Some(s) = value.as_string() {
            match s.parse::<i64>() {
                Ok(parsed) => ColumnValue::Integer(parsed),
                Err(_) => ColumnValue::Text(s),
            }
        } else if let Some(n) = value.as_f64() {
            // NaN and infinities fail the `fract() == 0.0` test and end up as `Real`.
            if n.fract() == 0.0 && n >= i64::MIN as f64 && n < I64_EXCLUSIVE_UPPER {
                ColumnValue::Integer(n as i64)
            } else {
                ColumnValue::Real(n)
            }
        } else if value.is_object() {
            // Check that the object really IS a Date. Feeding an arbitrary object to
            // `new Date(obj)` coerces it, so e.g. `[2020]` would yield a finite time and be
            // stored as a date by mistake.
            match value.dyn_ref::<js_sys::Date>().map(|date| date.get_time()) {
                Some(time_ms) if time_ms.is_finite() => ColumnValue::Date(time_ms as i64),
                _ => ColumnValue::Text(format!("{:?}", value)),
            }
        } else {
            ColumnValue::Null
        };

        WasmColumnValue { inner }
    }

    /// Alias for [`WasmColumnValue::create_null`].
    pub fn null() -> WasmColumnValue {
        Self::create_null()
    }

    /// Builds an `Integer` from a JS number using a saturating cast to `i64`
    /// (the fractional part is dropped, NaN becomes 0).
    pub fn integer(value: f64) -> WasmColumnValue {
        Self::create_integer(value as i64)
    }

    /// Alias for [`WasmColumnValue::create_real`].
    pub fn real(value: f64) -> WasmColumnValue {
        Self::create_real(value)
    }

    /// Alias for [`WasmColumnValue::create_text`].
    pub fn text(value: String) -> WasmColumnValue {
        Self::create_text(value)
    }

    /// Builds a `Blob` from an owned buffer; the buffer is moved in, not copied again.
    pub fn blob(value: Vec<u8>) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Blob(value),
        }
    }

    /// Builds a `BigInt` from an owned string; the string is moved in, not copied again.
    pub fn big_int(value: String) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::BigInt(value),
        }
    }

    /// Alias for [`WasmColumnValue::create_date`].
    pub fn date(timestamp_ms: f64) -> WasmColumnValue {
        Self::create_date(timestamp_ms)
    }
}