1#[cfg(target_arch = "wasm32")]
2use std::rc::Rc;
3#[cfg(target_arch = "wasm32")]
4use wasm_bindgen::prelude::*;
5
6#[cfg(all(
11 not(target_arch = "wasm32"),
12 any(
13 feature = "bundled-sqlite",
14 feature = "encryption",
15 feature = "encryption-commoncrypto",
16 feature = "encryption-ios"
17 )
18))]
19pub extern crate rusqlite;
20
21#[cfg(feature = "console_error_panic_hook")]
23pub use console_error_panic_hook::set_once as set_panic_hook;
24
// When the `wee_alloc` feature is enabled, install `wee_alloc` as the global allocator
// for the whole crate.
#[cfg(feature = "wee_alloc")]
#[global_allocator]
static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT;
28
/// Installs the `console_log` logger and announces the chosen level.
///
/// Registered as the `wasm_bindgen` start function. Debug builds log at `Debug`,
/// all other builds at `Info`.
///
/// # Panics
/// Panics with "Failed to initialize console_log" if `console_log::init_with_level` fails.
#[cfg(all(target_arch = "wasm32", feature = "console_log"))]
#[wasm_bindgen(start)]
pub fn init_logger() {
    // `cfg!` is resolved at compile time, so exactly one level is ever selected.
    let log_level = if cfg!(debug_assertions) {
        log::Level::Debug
    } else {
        log::Level::Info
    };

    console_log::init_with_level(log_level).expect("Failed to initialize console_log");

    log::info!("AbsurderSQL logging initialized at level: {:?}", log_level);
}
44
45#[inline]
49#[cfg(target_arch = "wasm32")]
/// Returns `name` with a `.db` suffix, appending one only when it is missing.
///
/// The suffix check is case-sensitive: `"x.DB"` becomes `"x.DB.db"`.
fn normalize_db_name(name: &str) -> String {
    let mut normalized = String::from(name);
    if !normalized.ends_with(".db") {
        normalized.push_str(".db");
    }
    normalized
}
59
60mod cleanup;
62#[cfg(target_arch = "wasm32")]
63pub mod connection_pool;
64#[cfg(not(target_arch = "wasm32"))]
65pub mod database;
66pub mod storage;
67pub mod types;
68pub mod vfs;
69#[cfg(not(target_arch = "wasm32"))]
70pub use database::PreparedStatement;
71pub mod utils;
72
73#[cfg(feature = "telemetry")]
74pub mod telemetry;
75
76#[cfg(not(target_arch = "wasm32"))]
78pub use database::SqliteIndexedDB;
79
// Per-thread set of database names for which an open is currently in progress.
// `Database::new` inserts the configured name before opening the SQLite connection and
// removes it again once the `Database` value has been built; other callers that find the
// name present wait and retry.
#[cfg(target_arch = "wasm32")]
thread_local! {
    static DB_OPEN_IN_PROGRESS: std::cell::RefCell<std::collections::HashSet<String>> =
        std::cell::RefCell::new(std::collections::HashSet::new());
}
86
/// On non-wasm32 targets, `Database` is an alias for [`SqliteIndexedDB`].
#[cfg(not(target_arch = "wasm32"))]
pub type Database = SqliteIndexedDB;
90
91pub use types::DatabaseConfig;
92pub use types::{ColumnValue, DatabaseError, QueryResult, Row, TransactionOptions};
93
94pub use vfs::indexeddb_vfs::IndexedDBVFS;
96
// Runs an async operation against a storage handle.
//
// Expansion: binds `$s` to `&*$storage`, awaits `$body`, and wraps the awaited value in
// `Some`. As written, the macro never produces `None`, and the `$operation` argument is
// accepted but not used by the expansion.
#[cfg(target_arch = "wasm32")]
macro_rules! with_storage_async {
    ($storage:expr, $operation:expr, |$s:ident| $body:expr) => {{
        let $s = &*$storage;
        Some($body.await)
    }};
}
106
/// Database handle for wasm32 builds, exported through `wasm_bindgen`.
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub struct Database {
    // Shared connection state from the connection pool; `db()` reads the raw SQLite
    // handle out of it.
    #[wasm_bindgen(skip)]
    connection_state: Rc<crate::connection_pool::ConnectionState>,
    // Database name; the constructors in this file store the normalized (`.db`-suffixed) name.
    #[allow(dead_code)]
    name: String,
    // Optional JavaScript function; the constructors in this file initialize it to `None`.
    // NOTE(review): presumably invoked on data changes — confirm against the setter.
    #[wasm_bindgen(skip)]
    on_data_change_callback: Option<js_sys::Function>,
    // When true, `check_write_permission` skips the leader check and allows the write.
    #[wasm_bindgen(skip)]
    allow_non_leader_writes: bool,
    // Interior-mutable manager for optimistic updates (created empty by the constructors).
    #[wasm_bindgen(skip)]
    optimistic_updates_manager:
        std::cell::RefCell<crate::storage::optimistic_updates::OptimisticUpdatesManager>,
    // Interior-mutable manager for coordination metrics (created empty by the constructors).
    #[wasm_bindgen(skip)]
    coordination_metrics_manager:
        std::cell::RefCell<crate::storage::coordination_metrics::CoordinationMetricsManager>,
    // Telemetry counters/histograms; only present with the `telemetry` feature.
    #[wasm_bindgen(skip)]
    #[cfg(feature = "telemetry")]
    metrics: Option<crate::telemetry::Metrics>,
    // Optional span sink; query spans are only built when this is `Some`.
    #[wasm_bindgen(skip)]
    #[cfg(feature = "telemetry")]
    span_recorder: Option<crate::telemetry::SpanRecorder>,
    // Optional span context that query spans enter and exit.
    #[wasm_bindgen(skip)]
    #[cfg(feature = "telemetry")]
    span_context: Option<crate::telemetry::SpanContext>,
    // Optional size limit in bytes; taken from the config in `new`, 2 GiB in `open_with_vfs`.
    // NOTE(review): presumably enforced by the export path — not visible in this chunk.
    #[wasm_bindgen(skip)]
    max_export_size_bytes: Option<u64>,
}
137
138#[cfg(target_arch = "wasm32")]
139impl Database {
140 fn db(&self) -> *mut sqlite_wasm_rs::sqlite3 {
143 let db_ptr = self.connection_state.db.get();
144 if db_ptr.is_null() {
145 panic!("Database connection is null for {}", self.name);
146 }
147 db_ptr
148 }
149
/// Reports whether `sql` begins with a data-modifying keyword
/// (`INSERT`, `UPDATE`, `DELETE` or `REPLACE`).
///
/// Leading whitespace and leading SQL comments (`-- …` to end of line, `/* … */`) are
/// skipped before the keyword is examined, because SQLite ignores them when it parses the
/// statement; without this a write prefixed by a comment would not be recognized.
/// The keyword comparison is ASCII case-insensitive, matching SQLite's keyword rules, and
/// does not allocate an upper-cased copy of the statement.
///
/// Only the four keywords above are recognized: DDL (`CREATE`, `DROP`, …) and statements
/// that start with `WITH` are reported as non-writes.
fn is_write_operation(sql: &str) -> bool {
    const WRITE_KEYWORDS: [&str; 4] = ["INSERT", "UPDATE", "DELETE", "REPLACE"];

    // Strips any run of leading whitespace and comments. An unterminated comment swallows
    // the rest of the input, leaving an empty statement.
    fn skip_leading_trivia(mut rest: &str) -> &str {
        loop {
            rest = rest.trim_start();
            if let Some(after) = rest.strip_prefix("--") {
                rest = after.find('\n').map_or("", |nl| &after[nl + 1..]);
            } else if let Some(after) = rest.strip_prefix("/*") {
                rest = after.find("*/").map_or("", |end| &after[end + 2..]);
            } else {
                return rest;
            }
        }
    }

    // Compare bytes so that slicing can never land inside a multi-byte character.
    let head = skip_leading_trivia(sql).as_bytes();
    WRITE_KEYWORDS.iter().any(|keyword| {
        head.get(..keyword.len())
            .map_or(false, |prefix| prefix.eq_ignore_ascii_case(keyword.as_bytes()))
    })
}
158
/// Borrows the telemetry metrics attached to this database, if any.
///
/// Only compiled with the `telemetry` feature.
#[cfg(feature = "telemetry")]
pub fn metrics(&self) -> Option<&crate::telemetry::Metrics> {
    Option::as_ref(&self.metrics)
}
166
167 async fn check_write_permission(&mut self, sql: &str) -> Result<(), DatabaseError> {
169 if !Self::is_write_operation(sql) {
170 return Ok(());
172 }
173
174 if self.allow_non_leader_writes {
176 log::info!("WRITE_ALLOWED: Non-leader writes enabled for {}", self.name);
177 return Ok(());
178 }
179
180 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
182
183 let db_name = &self.name;
184 let storage_rc = get_storage_with_fallback(db_name);
185
186 if let Some(storage) = storage_rc {
187 let is_leader = with_storage_async!(storage, "check_write_permission", |s| s
188 .is_leader())
189 .ok_or_else(|| {
190 DatabaseError::new("BORROW_CONFLICT", "Failed to check leader status")
191 })?;
192
193 if !is_leader {
194 log::error!("WRITE_DENIED: Instance is not leader for {}", db_name);
195 return Err(DatabaseError::new(
196 "WRITE_PERMISSION_DENIED",
197 "Only the leader tab can write to this database. Use db.isLeader() to check status or call db.allowNonLeaderWrites(true) for single-tab mode.",
198 ));
199 }
200
201 log::info!("WRITE_ALLOWED: Instance is leader for {}", db_name);
202 Ok(())
203 } else {
204 log::info!(
206 "WRITE_ALLOWED: No storage found for {} (single-instance mode)",
207 db_name
208 );
209 Ok(())
210 }
211 }
212
213 pub async fn new(config: DatabaseConfig) -> Result<Self, DatabaseError> {
214 use std::ffi::{CStr, CString};
215
216 log::info!("Database::new called for {}", config.name);
217
218 let normalized_name = normalize_db_name(&config.name);
221
222 let vfs_name = format!("vfs_{}", normalized_name.trim_end_matches(".db"));
224 let vfs_name_cstr = CString::new(vfs_name.as_str())
225 .map_err(|_| DatabaseError::new("INVALID_VFS_NAME", "Invalid VFS name"))?;
226 let vfs_exists = unsafe {
227 let existing_vfs = sqlite_wasm_rs::sqlite3_vfs_find(vfs_name_cstr.as_ptr());
228 !existing_vfs.is_null()
229 };
230
231 if !vfs_exists {
232 log::debug!("Creating IndexedDBVFS for: {}", normalized_name);
234 let vfs = crate::vfs::IndexedDBVFS::new(&normalized_name).await?;
235 log::debug!("Registering VFS as '{}'", vfs_name);
236 vfs.register(&vfs_name)?;
237 log::info!("VFS registered successfully");
238 } else {
239 log::info!("VFS '{}' already registered, reusing existing", vfs_name);
240 let _vfs = crate::vfs::IndexedDBVFS::new(&normalized_name).await?;
243 log::info!("BlockStorage ensured for {}", normalized_name);
244 }
245
246 #[cfg(target_arch = "wasm32")]
249 {
250 const MAX_OPEN_WAIT_MS: u32 = 1000; const OPEN_POLL_MS: u32 = 10;
252 let max_open_attempts = MAX_OPEN_WAIT_MS / OPEN_POLL_MS;
253
254 for attempt in 0..max_open_attempts {
255 let can_open = DB_OPEN_IN_PROGRESS.with(|opens| {
256 let mut set = opens.borrow_mut();
257 if set.contains(&config.name) {
258 false } else {
260 set.insert(config.name.clone());
261 true }
263 });
264
265 if can_open {
266 web_sys::console::log_1(
267 &format!("[DB] {} - ACQUIRED sqlite open lock", config.name).into(),
268 );
269 break;
270 } else {
271 web_sys::console::log_1(
272 &format!(
273 "[DB] {} - Waiting for sqlite open (attempt {})",
274 config.name, attempt
275 )
276 .into(),
277 );
278 use wasm_bindgen_futures::JsFuture;
279 let promise = js_sys::Promise::new(&mut |resolve, _| {
280 web_sys::window()
281 .unwrap()
282 .set_timeout_with_callback_and_timeout_and_arguments_0(
283 &resolve,
284 OPEN_POLL_MS as i32,
285 )
286 .unwrap();
287 });
288 JsFuture::from(promise).await.ok();
289 continue;
290 }
291 }
292 }
293
294 let (connection_state, db) = {
296 let vfs_name_str = vfs_name.clone(); let filename_copy = normalized_name.clone(); let pool_key = normalized_name.trim_end_matches(".db").to_string(); let state = crate::connection_pool::get_or_create_connection(&pool_key, || {
300 let mut db = std::ptr::null_mut();
301 let db_name = CString::new(normalized_name.clone())
302 .map_err(|_| "Invalid database name".to_string())?;
303 let vfs_cstr = CString::new(vfs_name_str.as_str())
304 .map_err(|_| "Invalid VFS name".to_string())?;
305
306 log::info!(
307 "Opening database: {} with VFS: {}",
308 filename_copy,
309 vfs_name_str
310 );
311
312 #[cfg(target_arch = "wasm32")]
313 web_sys::console::log_1(&format!("[OPEN] About to call sqlite3_open_v2...").into());
314
315 let ret = unsafe {
316 sqlite_wasm_rs::sqlite3_open_v2(
317 db_name.as_ptr(),
318 &mut db as *mut _,
319 sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
320 vfs_cstr.as_ptr(),
321 )
322 };
323
324 #[cfg(target_arch = "wasm32")]
325 web_sys::console::log_1(
326 &format!("[OPEN] sqlite3_open_v2 returned: {}", ret).into(),
327 );
328
329 log::info!(
330 "sqlite3_open_v2 returned: {} for database: {}",
331 ret,
332 filename_copy
333 );
334
335 if ret != sqlite_wasm_rs::SQLITE_OK {
336 let err_msg = unsafe {
337 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
338 if !msg_ptr.is_null() {
339 CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
340 } else {
341 "Unknown error".to_string()
342 }
343 };
344
345 #[cfg(target_arch = "wasm32")]
346 web_sys::console::log_1(
347 &format!(
348 "[OPEN] ERROR - sqlite3_open_v2 FAILED: ret={}, err={}",
349 ret, err_msg
350 )
351 .into(),
352 );
353
354 return Err(format!(
355 "Failed to open database with IndexedDB VFS: {}",
356 err_msg
357 ));
358 }
359
360 log::info!("Database opened successfully with IndexedDB VFS");
361 Ok(db)
362 })
363 .map_err(|e| DatabaseError::new("CONNECTION_POOL_ERROR", &e))?;
364 let db_ptr = state.db.get();
365 (state, db_ptr)
366 };
367
368 let exec_sql = |db: *mut sqlite_wasm_rs::sqlite3, sql: &str| -> Result<(), DatabaseError> {
370 let c_sql = CString::new(sql)
371 .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL statement"))?;
372
373 let ret = unsafe {
374 sqlite_wasm_rs::sqlite3_exec(
375 db,
376 c_sql.as_ptr(),
377 None,
378 std::ptr::null_mut(),
379 std::ptr::null_mut(),
380 )
381 };
382
383 if ret != sqlite_wasm_rs::SQLITE_OK {
384 let err_msg = unsafe {
385 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
386 if !msg_ptr.is_null() {
387 CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
388 } else {
389 "Unknown error".to_string()
390 }
391 };
392 log::warn!("Failed to execute SQL '{}': {}", sql, err_msg);
393 return Err(DatabaseError::new(
394 "SQLITE_ERROR",
395 &format!("Failed to execute: {}", err_msg),
396 ));
397 }
398 Ok(())
399 };
400
401 log::debug!("Setting busy_timeout to 10000ms for concurrent access handling");
405 exec_sql(db, "PRAGMA busy_timeout = 10000")?;
406
407 if let Some(page_size) = config.page_size {
409 log::debug!("Setting page_size to {}", page_size);
410 exec_sql(db, &format!("PRAGMA page_size = {}", page_size))?;
411 }
412
413 if let Some(cache_size) = config.cache_size {
415 log::debug!("Setting cache_size to {}", cache_size);
416 exec_sql(db, &format!("PRAGMA cache_size = {}", cache_size))?;
417 }
418
419 if let Some(ref journal_mode) = config.journal_mode {
422 log::debug!("Setting journal_mode to {}", journal_mode);
423
424 let pragma_sql = format!("PRAGMA journal_mode = {}", journal_mode);
425 let c_sql = CString::new(pragma_sql.as_str())
426 .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL statement"))?;
427
428 let mut stmt: *mut sqlite_wasm_rs::sqlite3_stmt = std::ptr::null_mut();
429 let ret = unsafe {
430 sqlite_wasm_rs::sqlite3_prepare_v2(
431 db,
432 c_sql.as_ptr(),
433 -1,
434 &mut stmt as *mut _,
435 std::ptr::null_mut(),
436 )
437 };
438
439 if ret == sqlite_wasm_rs::SQLITE_OK && !stmt.is_null() {
440 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
441 if step_ret == sqlite_wasm_rs::SQLITE_ROW {
442 let result_ptr = unsafe { sqlite_wasm_rs::sqlite3_column_text(stmt, 0) };
443 if !result_ptr.is_null() {
444 let result_mode = unsafe {
445 std::ffi::CStr::from_ptr(result_ptr as *const i8)
446 .to_string_lossy()
447 .to_uppercase()
448 };
449
450 if result_mode != journal_mode.to_uppercase() {
451 log::warn!(
452 "journal_mode {} requested but SQLite set {}",
453 journal_mode,
454 result_mode
455 );
456 } else {
457 log::info!("journal_mode successfully set to {}", result_mode);
458 }
459 }
460 }
461 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
462 } else {
463 log::warn!("Failed to prepare journal_mode PRAGMA");
464 }
465 }
466
467 if let Some(auto_vacuum) = config.auto_vacuum {
469 let vacuum_mode = if auto_vacuum { 1 } else { 0 }; log::debug!("Setting auto_vacuum to {}", vacuum_mode);
471 exec_sql(db, &format!("PRAGMA auto_vacuum = {}", vacuum_mode))?;
472 }
473
474 log::info!("Database configuration applied successfully");
475
476 #[cfg(feature = "telemetry")]
478 let metrics = crate::telemetry::Metrics::new().map_err(|e| {
479 DatabaseError::new(
480 "METRICS_ERROR",
481 &format!("Failed to initialize metrics: {}", e),
482 )
483 })?;
484
485 let database = Database {
486 connection_state,
487 name: normalized_name.clone(), on_data_change_callback: None,
489 allow_non_leader_writes: false,
490 optimistic_updates_manager: std::cell::RefCell::new(
491 crate::storage::optimistic_updates::OptimisticUpdatesManager::new(),
492 ),
493 coordination_metrics_manager: std::cell::RefCell::new(
494 crate::storage::coordination_metrics::CoordinationMetricsManager::new(),
495 ),
496 #[cfg(feature = "telemetry")]
497 metrics: Some(metrics),
498 #[cfg(feature = "telemetry")]
499 span_recorder: None,
500 #[cfg(feature = "telemetry")]
501 span_context: Some(crate::telemetry::SpanContext::new()),
502 max_export_size_bytes: config.max_export_size_bytes,
503 };
504
505 #[cfg(target_arch = "wasm32")]
508 {
509 DB_OPEN_IN_PROGRESS.with(|opens| {
510 opens.borrow_mut().remove(&config.name);
511 });
512 web_sys::console::log_1(
513 &format!("[DB] {} - RELEASED sqlite open lock", config.name).into(),
514 );
515 }
516
517 Ok(database)
518 }
519
520 pub async fn open_with_vfs(filename: &str, vfs_name: &str) -> Result<Self, DatabaseError> {
522 use std::ffi::CString;
523
524 log::info!("Opening database {} with VFS {}", filename, vfs_name);
525
526 let normalized_name = normalize_db_name(filename);
528 let pool_key = normalized_name.trim_end_matches(".db").to_string();
529
530 let connection_state = crate::connection_pool::get_or_create_connection(&pool_key, || {
532 let mut db: *mut sqlite_wasm_rs::sqlite3 = std::ptr::null_mut();
533 let db_name = CString::new(normalized_name.clone())
534 .map_err(|_| "Invalid database name".to_string())?;
535 let vfs_cstr = CString::new(vfs_name).map_err(|_| "Invalid VFS name".to_string())?;
536
537 let ret = unsafe {
538 sqlite_wasm_rs::sqlite3_open_v2(
539 db_name.as_ptr(),
540 &mut db as *mut _,
541 sqlite_wasm_rs::SQLITE_OPEN_READWRITE | sqlite_wasm_rs::SQLITE_OPEN_CREATE,
542 vfs_cstr.as_ptr(),
543 )
544 };
545
546 if ret != sqlite_wasm_rs::SQLITE_OK {
547 let err_msg = if !db.is_null() {
548 unsafe {
549 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(db);
550 if !msg_ptr.is_null() {
551 std::ffi::CStr::from_ptr(msg_ptr)
552 .to_string_lossy()
553 .into_owned()
554 } else {
555 "Unknown error".to_string()
556 }
557 }
558 } else {
559 "Failed to open database".to_string()
560 };
561 return Err(format!("SQLITE_ERROR: {}", err_msg));
562 }
563
564 Ok(db)
565 })
566 .map_err(|e| DatabaseError::new("OPEN_ERROR", &e))?;
567
568 log::info!(
569 "Successfully opened database {} with VFS {}",
570 normalized_name,
571 vfs_name
572 );
573
574 #[cfg(feature = "telemetry")]
576 let metrics = crate::telemetry::Metrics::new().map_err(|e| {
577 DatabaseError::new(
578 "METRICS_ERROR",
579 &format!("Failed to initialize metrics: {}", e),
580 )
581 })?;
582
583 Ok(Database {
584 connection_state,
585 name: normalized_name, on_data_change_callback: None,
587 allow_non_leader_writes: false,
588 optimistic_updates_manager: std::cell::RefCell::new(
589 crate::storage::optimistic_updates::OptimisticUpdatesManager::new(),
590 ),
591 coordination_metrics_manager: std::cell::RefCell::new(
592 crate::storage::coordination_metrics::CoordinationMetricsManager::new(),
593 ),
594 #[cfg(feature = "telemetry")]
595 metrics: Some(metrics),
596 #[cfg(feature = "telemetry")]
597 span_recorder: None,
598 #[cfg(feature = "telemetry")]
599 span_context: Some(crate::telemetry::SpanContext::new()),
600 max_export_size_bytes: Some(2 * 1024 * 1024 * 1024), })
602 }
603
604 pub async fn execute_internal(&mut self, sql: &str) -> Result<QueryResult, DatabaseError> {
605 use std::ffi::{CStr, CString};
606 let start_time = js_sys::Date::now();
607
608 #[cfg(feature = "telemetry")]
610 let span = if self.span_recorder.is_some() {
611 let query_type = sql
612 .trim()
613 .split_whitespace()
614 .next()
615 .unwrap_or("UNKNOWN")
616 .to_uppercase();
617 let mut builder = crate::telemetry::SpanBuilder::new("execute_query".to_string())
618 .with_attribute("query_type", query_type.clone())
619 .with_attribute("sql", sql.to_string());
620
621 if let Some(ref context) = self.span_context {
623 builder = builder.with_baggage_from_context(context);
624 }
625
626 let span = builder.build();
627
628 if let Some(ref context) = self.span_context {
630 context.enter_span(span.span_id.clone());
631 }
632
633 Some(span)
634 } else {
635 None
636 };
637
638 #[cfg(feature = "telemetry")]
640 #[cfg(feature = "telemetry")]
641 if let Some(metrics) = &self.metrics {
642 metrics.queries_total().inc();
643 }
644
645 if self.db().is_null() {
647 return Err(DatabaseError::new(
648 "NULL_CONNECTION",
649 "Database connection is null",
650 ));
651 }
652
653 let sql_cstr = CString::new(sql)
654 .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL string"))?;
655
656 if sql.trim().to_uppercase().starts_with("SELECT") {
657 let mut stmt = std::ptr::null_mut();
658 let ret = unsafe {
659 sqlite_wasm_rs::sqlite3_prepare_v2(
660 self.db(),
661 sql_cstr.as_ptr(),
662 -1,
663 &mut stmt,
664 std::ptr::null_mut(),
665 )
666 };
667
668 if ret != sqlite_wasm_rs::SQLITE_OK {
669 let err_msg = unsafe {
671 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
672 if !msg_ptr.is_null() {
673 CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
674 } else {
675 format!("Unknown error (code: {})", ret)
676 }
677 };
678
679 #[cfg(feature = "telemetry")]
681 #[cfg(feature = "telemetry")]
682 if let Some(metrics) = &self.metrics {
683 metrics.errors_total().inc();
684 }
685
686 #[cfg(feature = "telemetry")]
688 if let Some(mut s) = span {
689 s.status = crate::telemetry::SpanStatus::Error(format!(
690 "Failed to prepare: {}",
691 err_msg
692 ));
693 s.end_time_ms = Some(js_sys::Date::now());
694 if let Some(recorder) = &self.span_recorder {
695 recorder.record_span(s);
696 }
697
698 if let Some(ref context) = self.span_context {
700 context.exit_span();
701 }
702 }
703
704 return Err(DatabaseError::new(
705 "SQLITE_ERROR",
706 &format!("Failed to prepare statement: {}", err_msg),
707 )
708 .with_sql(sql));
709 }
710
711 let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
712 let mut columns = Vec::new();
713 let mut rows = Vec::new();
714
715 for i in 0..column_count {
717 let col_name = unsafe {
718 let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
719 if name_ptr.is_null() {
720 format!("col_{}", i)
721 } else {
722 std::ffi::CStr::from_ptr(name_ptr)
723 .to_string_lossy()
724 .into_owned()
725 }
726 };
727 columns.push(col_name);
728 }
729
730 loop {
732 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
733 if step_ret == sqlite_wasm_rs::SQLITE_ROW {
734 let mut values = Vec::new();
735 for i in 0..column_count {
736 let value = unsafe {
737 let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
738 match col_type {
739 sqlite_wasm_rs::SQLITE_NULL => ColumnValue::Null,
740 sqlite_wasm_rs::SQLITE_INTEGER => {
741 let val = sqlite_wasm_rs::sqlite3_column_int64(stmt, i);
742 ColumnValue::Integer(val)
743 }
744 sqlite_wasm_rs::SQLITE_FLOAT => {
745 let val = sqlite_wasm_rs::sqlite3_column_double(stmt, i);
746 ColumnValue::Real(val)
747 }
748 sqlite_wasm_rs::SQLITE_TEXT => {
749 let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
750 if text_ptr.is_null() {
751 ColumnValue::Null
752 } else {
753 let text = std::ffi::CStr::from_ptr(text_ptr as *const i8)
754 .to_string_lossy()
755 .into_owned();
756 ColumnValue::Text(text)
757 }
758 }
759 sqlite_wasm_rs::SQLITE_BLOB => {
760 let blob_ptr = sqlite_wasm_rs::sqlite3_column_blob(stmt, i);
761 let blob_size = sqlite_wasm_rs::sqlite3_column_bytes(stmt, i);
762 if blob_ptr.is_null() || blob_size == 0 {
763 ColumnValue::Blob(vec![])
764 } else {
765 let blob_slice = std::slice::from_raw_parts(
766 blob_ptr as *const u8,
767 blob_size as usize,
768 );
769 ColumnValue::Blob(blob_slice.to_vec())
770 }
771 }
772 _ => ColumnValue::Null,
773 }
774 };
775 values.push(value);
776 }
777 rows.push(Row { values });
778 } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
779 break;
780 } else {
781 let err_msg = unsafe {
783 let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
784 if !err_ptr.is_null() {
785 std::ffi::CStr::from_ptr(err_ptr)
786 .to_string_lossy()
787 .to_string()
788 } else {
789 "Unknown SQLite error".to_string()
790 }
791 };
792 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
793 #[cfg(feature = "telemetry")]
795 if let Some(metrics) = &self.metrics {
796 metrics.errors_total().inc();
797 }
798 return Err(DatabaseError::new(
799 "SQLITE_ERROR",
800 &format!("Error executing SELECT statement: {}", err_msg),
801 )
802 .with_sql(sql));
803 }
804 }
805
806 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
807 let execution_time_ms = js_sys::Date::now() - start_time;
808
809 #[cfg(feature = "telemetry")]
811 if let Some(metrics) = &self.metrics {
812 metrics.query_duration().observe(execution_time_ms);
813 }
814
815 Ok(QueryResult {
816 columns,
817 rows,
818 affected_rows: 0,
819 last_insert_id: None,
820 execution_time_ms,
821 })
822 } else {
823 let mut stmt: *mut sqlite_wasm_rs::sqlite3_stmt = std::ptr::null_mut();
825 let ret = unsafe {
826 sqlite_wasm_rs::sqlite3_prepare_v2(
827 self.db(),
828 sql_cstr.as_ptr(),
829 -1,
830 &mut stmt,
831 std::ptr::null_mut(),
832 )
833 };
834
835 if ret != sqlite_wasm_rs::SQLITE_OK {
836 let err_msg = unsafe {
838 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
839 if !msg_ptr.is_null() {
840 CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
841 } else {
842 format!("Unknown error (code: {})", ret)
843 }
844 };
845
846 #[cfg(feature = "telemetry")]
848 if let Some(metrics) = &self.metrics {
849 metrics.errors_total().inc();
850 }
851 return Err(DatabaseError::new(
852 "SQLITE_ERROR",
853 &format!("Failed to prepare statement: {}", err_msg),
854 )
855 .with_sql(sql));
856 }
857
858 let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
860 let mut columns = Vec::new();
861 let mut rows = Vec::new();
862
863 if column_count > 0 {
864 for i in 0..column_count {
866 let col_name = unsafe {
867 let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
868 if name_ptr.is_null() {
869 format!("column_{}", i)
870 } else {
871 std::ffi::CStr::from_ptr(name_ptr)
872 .to_string_lossy()
873 .into_owned()
874 }
875 };
876 columns.push(col_name);
877 }
878
879 loop {
881 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
882 if step_ret == sqlite_wasm_rs::SQLITE_ROW {
883 let mut values = Vec::new();
884 for i in 0..column_count {
885 let value = unsafe {
886 let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
887 match col_type {
888 sqlite_wasm_rs::SQLITE_TEXT => {
889 let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
890 if text_ptr.is_null() {
891 ColumnValue::Null
892 } else {
893 let text =
894 std::ffi::CStr::from_ptr(text_ptr as *const i8)
895 .to_string_lossy()
896 .into_owned();
897 ColumnValue::Text(text)
898 }
899 }
900 sqlite_wasm_rs::SQLITE_INTEGER => ColumnValue::Integer(
901 sqlite_wasm_rs::sqlite3_column_int64(stmt, i),
902 ),
903 _ => ColumnValue::Null,
904 }
905 };
906 values.push(value);
907 }
908 rows.push(Row { values });
909 } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
910 break;
911 } else {
912 let err_msg = unsafe {
914 let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
915 if !err_ptr.is_null() {
916 std::ffi::CStr::from_ptr(err_ptr)
917 .to_string_lossy()
918 .to_string()
919 } else {
920 "Unknown SQLite error".to_string()
921 }
922 };
923 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
924 #[cfg(feature = "telemetry")]
926 if let Some(metrics) = &self.metrics {
927 metrics.errors_total().inc();
928 }
929 return Err(DatabaseError::new(
930 "SQLITE_ERROR",
931 &format!("Failed to execute statement: {}", err_msg),
932 )
933 .with_sql(sql));
934 }
935 }
936 } else {
937 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
939 if step_ret != sqlite_wasm_rs::SQLITE_DONE {
940 let err_msg = unsafe {
942 let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
943 if !err_ptr.is_null() {
944 std::ffi::CStr::from_ptr(err_ptr)
945 .to_string_lossy()
946 .to_string()
947 } else {
948 "Unknown SQLite error".to_string()
949 }
950 };
951 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
952 #[cfg(feature = "telemetry")]
954 if let Some(metrics) = &self.metrics {
955 metrics.errors_total().inc();
956 }
957 return Err(DatabaseError::new(
958 "SQLITE_ERROR",
959 &format!("Failed to execute statement: {}", err_msg),
960 )
961 .with_sql(sql));
962 }
963 }
964
965 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
967
968 let affected_rows = unsafe { sqlite_wasm_rs::sqlite3_changes(self.db()) } as u32;
969 let last_insert_id = if sql.trim().to_uppercase().starts_with("INSERT") {
970 Some(unsafe { sqlite_wasm_rs::sqlite3_last_insert_rowid(self.db()) })
971 } else {
972 None
973 };
974
975 let execution_time_ms = js_sys::Date::now() - start_time;
976
977 #[cfg(feature = "telemetry")]
979 if let Some(metrics) = &self.metrics {
980 metrics.query_duration().observe(execution_time_ms);
981 }
982
983 #[cfg(feature = "telemetry")]
985 if let Some(mut s) = span {
986 s.status = crate::telemetry::SpanStatus::Ok;
987 s.end_time_ms = Some(js_sys::Date::now());
988 s.attributes
989 .insert("duration_ms".to_string(), execution_time_ms.to_string());
990 s.attributes
991 .insert("affected_rows".to_string(), affected_rows.to_string());
992 s.attributes
993 .insert("row_count".to_string(), rows.len().to_string());
994 if let Some(recorder) = &self.span_recorder {
995 recorder.record_span(s);
996 }
997
998 if let Some(ref context) = self.span_context {
1000 context.exit_span();
1001 }
1002 }
1003
1004 Ok(QueryResult {
1005 columns,
1006 rows,
1007 affected_rows,
1008 last_insert_id,
1009 execution_time_ms,
1010 })
1011 }
1012 }
1013
1014 pub async fn execute_with_params_internal(
1015 &mut self,
1016 sql: &str,
1017 params: &[ColumnValue],
1018 ) -> Result<QueryResult, DatabaseError> {
1019 use std::ffi::{CStr, CString};
1020 let start_time = js_sys::Date::now();
1021
1022 #[cfg(feature = "telemetry")]
1024 let span = if self.span_recorder.is_some() {
1025 let query_type = sql
1026 .trim()
1027 .split_whitespace()
1028 .next()
1029 .unwrap_or("UNKNOWN")
1030 .to_uppercase();
1031 let span = crate::telemetry::SpanBuilder::new("execute_query".to_string())
1032 .with_attribute("query_type", query_type.clone())
1033 .with_attribute("sql", sql.to_string())
1034 .build();
1035 Some(span)
1036 } else {
1037 None
1038 };
1039
1040 #[cfg(feature = "telemetry")]
1042 self.ensure_metrics_propagated();
1043
1044 #[cfg(feature = "telemetry")]
1046 if let Some(metrics) = &self.metrics {
1047 metrics.queries_total().inc();
1048 }
1049
1050 let sql_cstr = CString::new(sql)
1051 .map_err(|_| DatabaseError::new("INVALID_SQL", "Invalid SQL string"))?;
1052
1053 let mut stmt = std::ptr::null_mut();
1054 let ret = unsafe {
1055 sqlite_wasm_rs::sqlite3_prepare_v2(
1056 self.db(),
1057 sql_cstr.as_ptr(),
1058 -1,
1059 &mut stmt,
1060 std::ptr::null_mut(),
1061 )
1062 };
1063
1064 if ret != sqlite_wasm_rs::SQLITE_OK {
1065 let err_msg = unsafe {
1067 let msg_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
1068 if !msg_ptr.is_null() {
1069 CStr::from_ptr(msg_ptr).to_string_lossy().into_owned()
1070 } else {
1071 format!("Unknown error (code: {})", ret)
1072 }
1073 };
1074
1075 #[cfg(feature = "telemetry")]
1077 if let Some(metrics) = &self.metrics {
1078 metrics.errors_total().inc();
1079 }
1080
1081 #[cfg(feature = "telemetry")]
1083 if let Some(mut s) = span {
1084 s.status =
1085 crate::telemetry::SpanStatus::Error(format!("Failed to prepare: {}", err_msg));
1086 s.end_time_ms = Some(js_sys::Date::now());
1087 if let Some(recorder) = &self.span_recorder {
1088 recorder.record_span(s);
1089 }
1090 }
1091
1092 return Err(DatabaseError::new(
1093 "SQLITE_ERROR",
1094 &format!("Failed to prepare statement: {}", err_msg),
1095 )
1096 .with_sql(sql));
1097 }
1098
1099 let mut text_cstrings = Vec::new(); for (i, param) in params.iter().enumerate() {
1102 let param_index = (i + 1) as i32;
1103 let bind_ret = unsafe {
1104 match param {
1105 ColumnValue::Null => sqlite_wasm_rs::sqlite3_bind_null(stmt, param_index),
1106 ColumnValue::Integer(val) => {
1107 sqlite_wasm_rs::sqlite3_bind_int64(stmt, param_index, *val)
1108 }
1109 ColumnValue::Real(val) => {
1110 sqlite_wasm_rs::sqlite3_bind_double(stmt, param_index, *val)
1111 }
1112 ColumnValue::Text(val) => {
1113 let sanitized = val.replace('\0', "");
1115 let text_cstr = CString::new(sanitized.as_str())
1117 .expect("CString::new should not fail after null byte removal");
1118 let result = sqlite_wasm_rs::sqlite3_bind_text(
1119 stmt,
1120 param_index,
1121 text_cstr.as_ptr(),
1122 sanitized.len() as i32,
1123 sqlite_wasm_rs::SQLITE_TRANSIENT(),
1124 );
1125 text_cstrings.push(text_cstr); result
1127 }
1128 ColumnValue::Blob(val) => sqlite_wasm_rs::sqlite3_bind_blob(
1129 stmt,
1130 param_index,
1131 val.as_ptr() as *const _,
1132 val.len() as i32,
1133 sqlite_wasm_rs::SQLITE_TRANSIENT(),
1134 ),
1135 _ => sqlite_wasm_rs::sqlite3_bind_null(stmt, param_index),
1136 }
1137 };
1138
1139 if bind_ret != sqlite_wasm_rs::SQLITE_OK {
1140 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1141 #[cfg(feature = "telemetry")]
1143 if let Some(metrics) = &self.metrics {
1144 metrics.errors_total().inc();
1145 }
1146 return Err(
1147 DatabaseError::new("SQLITE_ERROR", "Failed to bind parameter").with_sql(sql),
1148 );
1149 }
1150 }
1151
1152 if sql.trim().to_uppercase().starts_with("SELECT") {
1153 let column_count = unsafe { sqlite_wasm_rs::sqlite3_column_count(stmt) };
1154 let mut columns = Vec::new();
1155 let mut rows = Vec::new();
1156
1157 for i in 0..column_count {
1159 let col_name = unsafe {
1160 let name_ptr = sqlite_wasm_rs::sqlite3_column_name(stmt, i);
1161 if name_ptr.is_null() {
1162 format!("col_{}", i)
1163 } else {
1164 std::ffi::CStr::from_ptr(name_ptr)
1165 .to_string_lossy()
1166 .into_owned()
1167 }
1168 };
1169 columns.push(col_name);
1170 }
1171
1172 loop {
1174 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
1175 if step_ret == sqlite_wasm_rs::SQLITE_ROW {
1176 let mut values = Vec::new();
1177 for i in 0..column_count {
1178 let value = unsafe {
1179 let col_type = sqlite_wasm_rs::sqlite3_column_type(stmt, i);
1180 match col_type {
1181 sqlite_wasm_rs::SQLITE_NULL => ColumnValue::Null,
1182 sqlite_wasm_rs::SQLITE_INTEGER => {
1183 let val = sqlite_wasm_rs::sqlite3_column_int64(stmt, i);
1184 ColumnValue::Integer(val)
1185 }
1186 sqlite_wasm_rs::SQLITE_FLOAT => {
1187 let val = sqlite_wasm_rs::sqlite3_column_double(stmt, i);
1188 ColumnValue::Real(val)
1189 }
1190 sqlite_wasm_rs::SQLITE_TEXT => {
1191 let text_ptr = sqlite_wasm_rs::sqlite3_column_text(stmt, i);
1192 if text_ptr.is_null() {
1193 ColumnValue::Null
1194 } else {
1195 let text = std::ffi::CStr::from_ptr(text_ptr as *const i8)
1196 .to_string_lossy()
1197 .into_owned();
1198 ColumnValue::Text(text)
1199 }
1200 }
1201 sqlite_wasm_rs::SQLITE_BLOB => {
1202 let blob_ptr = sqlite_wasm_rs::sqlite3_column_blob(stmt, i);
1203 let blob_size = sqlite_wasm_rs::sqlite3_column_bytes(stmt, i);
1204 if blob_ptr.is_null() || blob_size == 0 {
1205 ColumnValue::Blob(vec![])
1206 } else {
1207 let blob_slice = std::slice::from_raw_parts(
1208 blob_ptr as *const u8,
1209 blob_size as usize,
1210 );
1211 ColumnValue::Blob(blob_slice.to_vec())
1212 }
1213 }
1214 _ => ColumnValue::Null,
1215 }
1216 };
1217 values.push(value);
1218 }
1219 rows.push(Row { values });
1220 } else if step_ret == sqlite_wasm_rs::SQLITE_DONE {
1221 break;
1222 } else {
1223 let err_msg = unsafe {
1225 let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
1226 if !err_ptr.is_null() {
1227 std::ffi::CStr::from_ptr(err_ptr)
1228 .to_string_lossy()
1229 .to_string()
1230 } else {
1231 "Unknown SQLite error".to_string()
1232 }
1233 };
1234 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1235 #[cfg(feature = "telemetry")]
1237 if let Some(metrics) = &self.metrics {
1238 metrics.errors_total().inc();
1239 }
1240 return Err(DatabaseError::new(
1241 "SQLITE_ERROR",
1242 &format!("Error executing SELECT statement: {}", err_msg),
1243 )
1244 .with_sql(sql));
1245 }
1246 }
1247
1248 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1249
1250 let execution_time_ms = js_sys::Date::now() - start_time;
1251
1252 #[cfg(feature = "telemetry")]
1254 if let Some(metrics) = &self.metrics {
1255 metrics.query_duration().observe(execution_time_ms);
1256 }
1257
1258 #[cfg(feature = "telemetry")]
1260 if let Some(mut s) = span {
1261 s.status = crate::telemetry::SpanStatus::Ok;
1262 s.end_time_ms = Some(js_sys::Date::now());
1263 s.attributes
1264 .insert("duration_ms".to_string(), execution_time_ms.to_string());
1265 s.attributes
1266 .insert("row_count".to_string(), rows.len().to_string());
1267 if let Some(recorder) = &self.span_recorder {
1268 recorder.record_span(s);
1269 }
1270 }
1271
1272 Ok(QueryResult {
1273 columns,
1274 rows,
1275 affected_rows: 0,
1276 last_insert_id: None,
1277 execution_time_ms,
1278 })
1279 } else {
1280 let step_ret = unsafe { sqlite_wasm_rs::sqlite3_step(stmt) };
1282 #[cfg(feature = "telemetry")]
1284 if let Some(metrics) = &self.metrics {
1285 metrics.errors_total().inc();
1286 }
1287 unsafe { sqlite_wasm_rs::sqlite3_finalize(stmt) };
1288
1289 if step_ret != sqlite_wasm_rs::SQLITE_DONE {
1290 let err_msg = unsafe {
1291 let err_ptr = sqlite_wasm_rs::sqlite3_errmsg(self.db());
1292 if !err_ptr.is_null() {
1293 std::ffi::CStr::from_ptr(err_ptr)
1294 .to_string_lossy()
1295 .to_string()
1296 } else {
1297 "Unknown SQLite error".to_string()
1298 }
1299 };
1300 return Err(DatabaseError::new(
1301 "SQLITE_ERROR",
1302 &format!("Failed to execute statement: {}", err_msg),
1303 )
1304 .with_sql(sql));
1305 }
1306
1307 let execution_time_ms = js_sys::Date::now() - start_time;
1308
1309 #[cfg(feature = "telemetry")]
1311 if let Some(metrics) = &self.metrics {
1312 metrics.query_duration().observe(execution_time_ms);
1313 }
1314 let affected_rows = unsafe { sqlite_wasm_rs::sqlite3_changes(self.db()) } as u32;
1315 let last_insert_id = if sql.trim().to_uppercase().starts_with("INSERT") {
1316 Some(unsafe { sqlite_wasm_rs::sqlite3_last_insert_rowid(self.db()) })
1317 } else {
1318 None
1319 };
1320
1321 #[cfg(feature = "telemetry")]
1323 if let Some(mut s) = span {
1324 s.status = crate::telemetry::SpanStatus::Ok;
1325 s.end_time_ms = Some(js_sys::Date::now());
1326 s.attributes
1327 .insert("duration_ms".to_string(), execution_time_ms.to_string());
1328 s.attributes
1329 .insert("affected_rows".to_string(), affected_rows.to_string());
1330 if let Some(recorder) = &self.span_recorder {
1331 recorder.record_span(s);
1332 }
1333 }
1334
1335 Ok(QueryResult {
1336 columns: vec![],
1337 rows: vec![],
1338 affected_rows,
1339 last_insert_id,
1340 execution_time_ms,
1341 })
1342 }
1343 }
1344
/// Installs (or clears, with `None`) the metrics handle used by this database and
/// then forwards it to the storage layer via `ensure_metrics_propagated`.
#[cfg(feature = "telemetry")]
pub fn set_metrics(&mut self, metrics: Option<crate::telemetry::Metrics>) {
    // `metrics` is already owned by this call, so it is moved into the field
    // rather than cloned and then dropped.
    self.metrics = metrics;
    self.ensure_metrics_propagated();
}
1351
/// Swaps in a new span recorder; passing `None` removes the current one.
#[cfg(feature = "telemetry")]
pub fn set_span_recorder(&mut self, recorder: Option<crate::telemetry::SpanRecorder>) {
    // The previously stored recorder (if any) is dropped when this function returns.
    let _previous = std::mem::replace(&mut self.span_recorder, recorder);
}
1357
/// Borrows the span context stored on this database, if one is set.
#[cfg(feature = "telemetry")]
pub fn get_span_context(&self) -> Option<&crate::telemetry::SpanContext> {
    Option::as_ref(&self.span_context)
}
1363
/// Borrows the span recorder stored on this database, if one is set.
#[cfg(feature = "telemetry")]
pub fn get_span_recorder(&self) -> Option<&crate::telemetry::SpanRecorder> {
    Option::as_ref(&self.span_recorder)
}
/// Copies this database's metrics handle onto the storage registered under the
/// same name.
///
/// Does nothing when no metrics are set, when no storage is found for the name,
/// or on non-wasm32 targets. The result of the storage borrow is ignored.
#[cfg(feature = "telemetry")]
fn ensure_metrics_propagated(&self) {
    #[cfg(target_arch = "wasm32")]
    {
        use crate::vfs::indexeddb_vfs::{get_storage_with_fallback, with_storage_borrow_mut};

        let Some(metrics) = self.metrics.as_ref() else {
            return;
        };
        let Some(storage_rc) = get_storage_with_fallback(&self.name) else {
            return;
        };

        // Best-effort: a failed borrow simply leaves the storage without metrics.
        let _ = with_storage_borrow_mut(&storage_rc, "set_metrics", |storage| {
            storage.set_metrics(Some(metrics.clone()));
            Ok(())
        });
    }
}
1392
1393 pub async fn close_internal(&mut self) -> Result<(), DatabaseError> {
1394 log::info!("CLOSE_INTERNAL STARTED for: {}", self.name);
1395
1396 if self.connection_state.db.get().is_null() {
1398 log::info!(
1399 "Connection already null for {}, skipping close operations",
1400 self.name
1401 );
1402 return Ok(());
1403 }
1404
1405 log::info!("Checkpointing WAL before close: {}", self.name);
1407 let _ = self
1408 .execute_internal("PRAGMA wal_checkpoint(PASSIVE)")
1409 .await;
1410 log::info!("WAL checkpoint completed for: {}", self.name);
1411
1412 log::info!("Syncing database before close: {}", self.name);
1414 self.sync_internal().await?;
1415 log::info!("Sync completed for: {}", self.name);
1416
1417 web_sys::console::log_1(
1418 &format!("CLOSE: About to stop leader election for {}", self.name).into(),
1419 );
1420
1421 #[cfg(target_arch = "wasm32")]
1423 {
1424 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
1425
1426 let db_name = &self.name;
1427 web_sys::console::log_1(
1428 &format!("STOP_ELECTION: Getting storage for {}", db_name).into(),
1429 );
1430 log::info!("STOP_ELECTION: Getting storage for {}", db_name);
1431 let storage_rc = get_storage_with_fallback(db_name);
1432
1433 if let Some(storage_rc) = storage_rc {
1434 log::info!("STOP_ELECTION: Found storage for {}, calling stop", db_name);
1435 match with_storage_async!(storage_rc, "stop_leader_election", |storage| storage
1436 .stop_leader_election())
1437 {
1438 Some(Ok(())) => {
1439 log::info!("STOP_ELECTION: Successfully stopped for {}", db_name);
1440 }
1441 Some(Err(e)) => {
1442 log::warn!("STOP_ELECTION: Failed for {}: {:?}", db_name, e);
1443 }
1444 None => {
1445 log::warn!("STOP_ELECTION: Borrow failed for {}", db_name);
1446 }
1447 }
1448 } else {
1449 log::warn!("STOP_ELECTION: No storage found for {}", db_name);
1450 }
1451 log::info!("STOP_ELECTION: Completed section for {}", db_name);
1452 }
1453
1454 log::info!(
1459 "Closed database: {} (connection will be released on Drop)",
1460 self.name
1461 );
1462 Ok(())
1463 }
1464
1465 pub async fn query(&mut self, sql: &str) -> Result<Vec<Row>, DatabaseError> {
1467 let result = self.execute_internal(sql).await?;
1468 Ok(result.rows)
1469 }
1470
1471 pub async fn sync_internal(&mut self) -> Result<(), DatabaseError> {
1472 #[cfg(all(target_arch = "wasm32", feature = "telemetry"))]
1474 let start_time = js_sys::Date::now();
1475
1476 #[cfg(feature = "telemetry")]
1478 let span = if self.span_recorder.is_some() {
1479 let mut builder = crate::telemetry::SpanBuilder::new("vfs_sync".to_string())
1480 .with_attribute("operation", "sync");
1481
1482 if let Some(ref context) = self.span_context {
1484 builder = builder
1485 .with_context(context)
1486 .with_baggage_from_context(context);
1487 }
1488
1489 let span = builder.build();
1490
1491 if let Some(ref context) = self.span_context {
1493 context.enter_span(span.span_id.clone());
1494 }
1495
1496 Some(span)
1497 } else {
1498 None
1499 };
1500
1501 #[cfg(feature = "telemetry")]
1503 if let Some(ref metrics) = self.metrics {
1504 metrics.sync_operations_total().inc();
1505 }
1506
1507 #[cfg(feature = "telemetry")]
1509 let mut blocks_count = 0;
1510
1511 #[cfg(target_arch = "wasm32")]
1513 {
1514 use crate::storage::vfs_sync;
1515
1516 let storage_name = &self.name;
1520
1521 let next_commit = vfs_sync::with_global_commit_marker(|cm| {
1523 #[cfg(target_arch = "wasm32")]
1524 let mut cm_ref = cm.borrow_mut();
1525 #[cfg(not(target_arch = "wasm32"))]
1526 let mut cm_ref = cm.lock();
1527
1528 let current = cm_ref.get(storage_name).copied().unwrap_or(0);
1529 let new_marker = current + 1;
1530 cm_ref.insert(storage_name.to_string(), new_marker);
1531 log::debug!(
1532 "Advanced commit marker for {} from {} to {}",
1533 storage_name,
1534 current,
1535 new_marker
1536 );
1537 new_marker
1538 });
1539
1540 web_sys::console::log_1(
1541 &format!(
1542 "[SYNC] Collecting blocks from GLOBAL_STORAGE for: {}",
1543 storage_name
1544 )
1545 .into(),
1546 );
1547 let (blocks_to_persist, metadata_to_persist) =
1548 vfs_sync::with_global_storage(|storage| {
1549 #[cfg(target_arch = "wasm32")]
1550 let storage_map = storage.borrow();
1551 #[cfg(not(target_arch = "wasm32"))]
1552 let storage_map = storage.lock();
1553
1554 let blocks = if let Some(db_storage) = storage_map.get(storage_name) {
1555 let count = db_storage.len();
1556 web_sys::console::log_1(
1557 &format!("[SYNC] Found {} blocks in GLOBAL_STORAGE", count).into(),
1558 );
1559 db_storage
1560 .iter()
1561 .map(|(&id, data)| (id, data.clone()))
1562 .collect::<Vec<_>>()
1563 } else {
1564 web_sys::console::log_1(
1565 &format!(
1566 "[SYNC] No blocks found in GLOBAL_STORAGE for {}",
1567 storage_name
1568 )
1569 .into(),
1570 );
1571 Vec::new()
1572 };
1573
1574 let metadata = vfs_sync::with_global_metadata(|meta| {
1575 #[cfg(target_arch = "wasm32")]
1576 let meta_map = meta.borrow();
1577 #[cfg(not(target_arch = "wasm32"))]
1578 let meta_map = meta.lock();
1579 if let Some(db_meta) = meta_map.get(storage_name) {
1580 let count = db_meta.len();
1581 web_sys::console::log_1(
1582 &format!("[SYNC] Found {} metadata entries", count).into(),
1583 );
1584 db_meta
1585 .iter()
1586 .map(|(&id, metadata)| (id, metadata.checksum))
1587 .collect::<Vec<_>>()
1588 } else {
1589 web_sys::console::log_1(&format!("[SYNC] No metadata found").into());
1590 Vec::new()
1591 }
1592 });
1593
1594 (blocks, metadata)
1595 });
1596
1597 web_sys::console::log_1(
1598 &format!(
1599 "[SYNC] Preparing to persist {} blocks to IndexedDB",
1600 blocks_to_persist.len()
1601 )
1602 .into(),
1603 );
1604
1605 if !blocks_to_persist.is_empty() {
1606 #[cfg(feature = "telemetry")]
1607 {
1608 blocks_count = blocks_to_persist.len();
1609 }
1610 web_sys::console::log_1(
1611 &format!(
1612 "[SYNC] Persisting {} blocks to IndexedDB",
1613 blocks_to_persist.len()
1614 )
1615 .into(),
1616 );
1617 crate::storage::wasm_indexeddb::persist_to_indexeddb_event_based(
1618 storage_name,
1619 blocks_to_persist,
1620 metadata_to_persist,
1621 next_commit,
1622 #[cfg(feature = "telemetry")]
1623 self.span_recorder.clone(),
1624 #[cfg(feature = "telemetry")]
1625 span.as_ref().map(|s| s.span_id.clone()),
1626 )
1627 .await?;
1628 web_sys::console::log_1(
1629 &format!("[SYNC] Successfully persisted to IndexedDB").into(),
1630 );
1631 } else {
1632 web_sys::console::log_1(
1633 &format!("[SYNC] WARNING: No blocks to persist - GLOBAL_STORAGE is empty!")
1634 .into(),
1635 );
1636 }
1637
1638 use crate::storage::broadcast_notifications::{
1640 BroadcastNotification, send_change_notification,
1641 };
1642
1643 let notification = BroadcastNotification::DataChanged {
1644 db_name: self.name.clone(),
1645 timestamp: js_sys::Date::now() as u64,
1646 };
1647
1648 log::debug!("Sending DataChanged notification for {}", self.name);
1649
1650 if let Err(e) = send_change_notification(¬ification) {
1651 log::warn!("Failed to send change notification: {}", e);
1652 }
1654 }
1655
1656 #[cfg(all(target_arch = "wasm32", feature = "telemetry"))]
1658 if let Some(ref metrics) = self.metrics {
1659 let duration_ms = js_sys::Date::now() - start_time;
1660 metrics.sync_duration().observe(duration_ms);
1661 }
1662
1663 #[cfg(feature = "telemetry")]
1665 if let Some(mut s) = span {
1666 s.status = crate::telemetry::SpanStatus::Ok;
1667 #[cfg(target_arch = "wasm32")]
1668 {
1669 s.end_time_ms = Some(js_sys::Date::now());
1670 let duration_ms = s.end_time_ms.unwrap() - s.start_time_ms;
1671 s.attributes
1672 .insert("duration_ms".to_string(), duration_ms.to_string());
1673 }
1674 #[cfg(not(target_arch = "wasm32"))]
1675 {
1676 let now = std::time::SystemTime::now()
1677 .duration_since(std::time::UNIX_EPOCH)
1678 .unwrap()
1679 .as_millis() as f64;
1680 s.end_time_ms = Some(now);
1681 let duration_ms = s.end_time_ms.unwrap() - s.start_time_ms;
1682 s.attributes
1683 .insert("duration_ms".to_string(), duration_ms.to_string());
1684 }
1685 s.attributes
1686 .insert("blocks_persisted".to_string(), blocks_count.to_string());
1687 if let Some(recorder) = &self.span_recorder {
1688 recorder.record_span(s);
1689 }
1690
1691 if let Some(ref context) = self.span_context {
1693 context.exit_span();
1694 }
1695 }
1696
1697 Ok(())
1698 }
1699}
1700
/// Releases the pooled connection and tears down this database's leader-election
/// heartbeat when the handle is dropped.
#[cfg(target_arch = "wasm32")]
impl Drop for Database {
    /// Releases the connection under the name with trailing `.db` removed, then
    /// clears the heartbeat interval and closure on the storage's leader-election
    /// manager if that manager can be borrowed.
    fn drop(&mut self) {
        use crate::vfs::indexeddb_vfs::get_storage_with_fallback;

        web_sys::console::log_1(&format!("DROP: Releasing connection for {}", self.name).into());
        crate::connection_pool::release_connection(self.name.trim_end_matches(".db"));
        web_sys::console::log_1(&format!("DROP: Connection released for {}", self.name).into());

        if let Some(storage_rc) = get_storage_with_fallback(&self.name) {
            match storage_rc.leader_election.try_borrow_mut() {
                Ok(mut election_slot) => {
                    if let Some(manager) = election_slot.as_mut() {
                        // The interval id is taken even when no window is available.
                        let pending_interval = manager.heartbeat_interval.take();
                        if let Some(interval_id) = pending_interval {
                            if let Some(window) = web_sys::window() {
                                window.clear_interval_with_handle(interval_id);
                                web_sys::console::log_1(
                                    &format!(
                                        "DROP: Cleared heartbeat interval {} for {}",
                                        interval_id, self.name
                                    )
                                    .into(),
                                );
                            }
                        }

                        let had_closure = manager.heartbeat_closure.take().is_some();
                        if had_closure {
                            web_sys::console::log_1(
                                &format!("DROP: Released heartbeat closure for {}", self.name)
                                    .into(),
                            );
                        }
                    }
                }
                Err(_) => {
                    // The manager is currently borrowed elsewhere; leave it untouched.
                    web_sys::console::log_1(
                        &format!(
                            "[DROP] Skipping {} (heartbeat already stopped by shared DB)",
                            self.name
                        )
                        .into(),
                    );
                }
            }
        }

        log::debug!(
            "Closed database: {} (BlockStorage remains in registry)",
            self.name
        );
    }
}
1763
1764#[cfg(target_arch = "wasm32")]
1766#[wasm_bindgen]
1767impl Database {
1768 #[wasm_bindgen(js_name = "newDatabase")]
1769 pub async fn new_wasm(name: String) -> Result<Database, JsValue> {
1770 let normalized_name = if name.ends_with(".db") {
1772 name.clone()
1773 } else {
1774 format!("{}.db", name)
1775 };
1776
1777 let config = DatabaseConfig {
1778 name: normalized_name.clone(),
1779 version: Some(1),
1780 cache_size: Some(10_000),
1781 page_size: Some(4096),
1782 auto_vacuum: Some(true),
1783 journal_mode: Some("WAL".to_string()),
1784 max_export_size_bytes: Some(2 * 1024 * 1024 * 1024), };
1786
1787 let db = Database::new(config)
1788 .await
1789 .map_err(|e| JsValue::from_str(&format!("Failed to create database: {}", e)))?;
1790
1791 Self::start_write_queue_listener(&normalized_name)?;
1793
1794 Ok(db)
1795 }
1796
1797 #[wasm_bindgen(getter)]
1799 pub fn name(&self) -> String {
1800 self.name.clone()
1801 }
1802
1803 #[wasm_bindgen(js_name = "getAllDatabases")]
1807 pub async fn get_all_databases() -> Result<JsValue, JsValue> {
1808 use crate::storage::vfs_sync::with_global_storage;
1809 use crate::vfs::indexeddb_vfs::STORAGE_REGISTRY;
1810 use std::collections::HashSet;
1811
1812 log::info!("getAllDatabases called");
1813 let mut db_names = HashSet::new();
1814
1815 match Self::get_persistent_database_list() {
1817 Ok(persistent_list) => {
1818 log::info!("Persistent list has {} entries", persistent_list.len());
1819 for name in persistent_list {
1820 log::info!("Found in persistent list: {}", name);
1821 db_names.insert(name);
1822 }
1823 }
1824 Err(e) => {
1825 log::warn!("Failed to get persistent list: {:?}", e);
1826 }
1827 }
1828
1829 STORAGE_REGISTRY.with(|reg| unsafe {
1832 let registry = &*reg.get();
1833 log::info!("STORAGE_REGISTRY has {} entries", registry.len());
1834 for key in registry.keys() {
1835 log::info!("Found in STORAGE_REGISTRY: {}", key);
1836 db_names.insert(key.clone());
1837 }
1838 });
1839
1840 with_global_storage(|storage| {
1842 let storage_borrow = storage.borrow();
1843 log::info!("GLOBAL_STORAGE has {} entries", storage_borrow.len());
1844 for key in storage_borrow.keys() {
1845 log::info!("Found in GLOBAL_STORAGE: {}", key);
1846 db_names.insert(key.clone());
1847 }
1848 });
1849
1850 log::info!("Total unique databases found: {}", db_names.len());
1851
1852 let mut result_vec: Vec<String> = db_names.into_iter().collect();
1854 result_vec.sort();
1855
1856 let js_array = js_sys::Array::new();
1858 for name in &result_vec {
1859 log::info!("Returning database: {}", name);
1860 js_array.push(&JsValue::from_str(name));
1861 }
1862
1863 log::info!("getAllDatabases returning {} databases", result_vec.len());
1864
1865 Ok(js_array.into())
1866 }
1867
1868 #[wasm_bindgen(js_name = "deleteDatabase")]
1872 pub async fn delete_database(name: String) -> Result<(), JsValue> {
1873 use crate::storage::vfs_sync::{
1874 with_global_commit_marker, with_global_metadata, with_global_storage,
1875 };
1876
1877 let normalized_name = if name.ends_with(".db") {
1879 name.clone()
1880 } else {
1881 format!("{}.db", name)
1882 };
1883
1884 log::info!("Deleting database: {}", normalized_name);
1885
1886 use crate::vfs::indexeddb_vfs::remove_storage_from_registry;
1888 remove_storage_from_registry(&normalized_name);
1889
1890 with_global_storage(|gs| {
1892 #[cfg(target_arch = "wasm32")]
1893 let mut storage = gs.borrow_mut();
1894 #[cfg(not(target_arch = "wasm32"))]
1895 let mut storage = gs.lock();
1896 storage.remove(&normalized_name);
1897 });
1898
1899 with_global_metadata(|gm| {
1901 #[cfg(target_arch = "wasm32")]
1902 let mut metadata = gm.borrow_mut();
1903 #[cfg(not(target_arch = "wasm32"))]
1904 let mut metadata = gm.lock();
1905 metadata.remove(&normalized_name);
1906 });
1907
1908 with_global_commit_marker(|cm| {
1910 #[cfg(target_arch = "wasm32")]
1911 let mut markers = cm.borrow_mut();
1912 #[cfg(not(target_arch = "wasm32"))]
1913 let mut markers = cm.lock();
1914 log::info!(
1915 "Cleared commit marker for {} from GLOBAL storage",
1916 normalized_name
1917 );
1918 markers.remove(&normalized_name);
1919 });
1920
1921 let idb_name = format!("absurder_{}", normalized_name);
1923 let _delete_promise = js_sys::eval(&format!("indexedDB.deleteDatabase('{}')", idb_name))
1924 .map_err(|e| JsValue::from_str(&format!("Failed to delete IndexedDB: {:?}", e)))?;
1925
1926 log::info!("Database deleted: {}", normalized_name);
1927
1928 Self::remove_database_from_persistent_list(&normalized_name)?;
1930
1931 Ok(())
1932 }
1933
1934 #[allow(dead_code)]
1936 fn add_database_to_persistent_list(db_name: &str) -> Result<(), JsValue> {
1937 log::info!("add_database_to_persistent_list called for: {}", db_name);
1938
1939 let window = web_sys::window().ok_or_else(|| {
1940 log::error!("No window object");
1941 JsValue::from_str("No window")
1942 })?;
1943
1944 let storage = window
1945 .local_storage()
1946 .map_err(|e| {
1947 log::error!("Failed to get localStorage: {:?}", e);
1948 JsValue::from_str("No localStorage")
1949 })?
1950 .ok_or_else(|| {
1951 log::error!("localStorage not available");
1952 JsValue::from_str("localStorage not available")
1953 })?;
1954
1955 let key = "absurder_db_list";
1956 let existing = storage.get_item(key).map_err(|e| {
1957 log::error!("Failed to read localStorage key {}: {:?}", key, e);
1958 JsValue::from_str("Failed to read localStorage")
1959 })?;
1960
1961 log::debug!("Existing localStorage value: {:?}", existing);
1962
1963 let mut db_list: Vec<String> = if let Some(json_str) = existing {
1964 match serde_json::from_str(&json_str) {
1965 Ok(list) => {
1966 log::debug!("Parsed existing list: {:?}", list);
1967 list
1968 }
1969 Err(e) => {
1970 log::warn!("Failed to parse localStorage JSON: {}, starting fresh", e);
1971 Vec::new()
1972 }
1973 }
1974 } else {
1975 log::debug!("No existing list, creating new");
1976 Vec::new()
1977 };
1978
1979 if !db_list.contains(&db_name.to_string()) {
1980 db_list.push(db_name.to_string());
1981 db_list.sort();
1982 log::debug!("Updated list: {:?}", db_list);
1983
1984 let json_str = serde_json::to_string(&db_list).map_err(|e| {
1985 log::error!("Failed to serialize list: {}", e);
1986 JsValue::from_str("Failed to serialize")
1987 })?;
1988
1989 log::debug!("Writing to localStorage: {}", json_str);
1990
1991 storage.set_item(key, &json_str).map_err(|e| {
1992 log::error!("Failed to write to localStorage: {:?}", e);
1993 JsValue::from_str("Failed to write localStorage")
1994 })?;
1995
1996 log::info!("Successfully added {} to persistent database list", db_name);
1997 } else {
1998 log::info!("{} already in persistent list", db_name);
1999 }
2000
2001 Ok(())
2002 }
2003
2004 fn remove_database_from_persistent_list(db_name: &str) -> Result<(), JsValue> {
2006 let window = web_sys::window().ok_or_else(|| JsValue::from_str("No window"))?;
2007 let storage = window
2008 .local_storage()
2009 .map_err(|_| JsValue::from_str("No localStorage"))?
2010 .ok_or_else(|| JsValue::from_str("localStorage not available"))?;
2011
2012 let key = "absurder_db_list";
2013 let existing = storage
2014 .get_item(key)
2015 .map_err(|_| JsValue::from_str("Failed to read localStorage"))?;
2016
2017 if let Some(json_str) = existing {
2018 let mut db_list: Vec<String> =
2019 serde_json::from_str(&json_str).unwrap_or_else(|_| Vec::new());
2020 db_list.retain(|name| name != db_name);
2021 let json_str = serde_json::to_string(&db_list)
2022 .map_err(|_| JsValue::from_str("Failed to serialize"))?;
2023 storage
2024 .set_item(key, &json_str)
2025 .map_err(|_| JsValue::from_str("Failed to write localStorage"))?;
2026 log::info!("Removed {} from persistent database list", db_name);
2027 }
2028
2029 Ok(())
2030 }
2031
2032 fn get_persistent_database_list() -> Result<Vec<String>, JsValue> {
2034 log::info!("get_persistent_database_list called");
2035
2036 let window = web_sys::window().ok_or_else(|| {
2037 log::error!("No window object");
2038 JsValue::from_str("No window")
2039 })?;
2040
2041 let storage = window
2042 .local_storage()
2043 .map_err(|e| {
2044 log::error!("Failed to get localStorage: {:?}", e);
2045 JsValue::from_str("No localStorage")
2046 })?
2047 .ok_or_else(|| {
2048 log::error!("localStorage not available");
2049 JsValue::from_str("localStorage not available")
2050 })?;
2051
2052 let key = "absurder_db_list";
2053 let existing = storage.get_item(key).map_err(|e| {
2054 log::error!("Failed to read localStorage key {}: {:?}", key, e);
2055 JsValue::from_str("Failed to read localStorage")
2056 })?;
2057
2058 log::debug!("Read from localStorage: {:?}", existing);
2059
2060 if let Some(json_str) = existing {
2061 match serde_json::from_str::<Vec<String>>(&json_str) {
2062 Ok(db_list) => {
2063 log::info!(
2064 "Successfully parsed {} databases from localStorage",
2065 db_list.len()
2066 );
2067 log::debug!("Database list: {:?}", db_list);
2068 Ok(db_list)
2069 }
2070 Err(e) => {
2071 log::error!("Failed to parse localStorage JSON: {}", e);
2072 Ok(Vec::new())
2073 }
2074 }
2075 } else {
2076 log::info!("No persistent database list in localStorage");
2077 Ok(Vec::new())
2078 }
2079 }
2080
2081 fn start_write_queue_listener(db_name: &str) -> Result<(), JsValue> {
2083 use crate::storage::write_queue::{
2084 WriteQueueMessage, WriteResponse, register_write_queue_listener,
2085 };
2086 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2087
2088 let db_name_clone = db_name.to_string();
2089
2090 let callback = Closure::wrap(Box::new(move |msg: JsValue| {
2091 let db_name_inner = db_name_clone.clone();
2092
2093 if let Ok(json_str) = js_sys::JSON::stringify(&msg) {
2095 if let Some(json_str) = json_str.as_string() {
2096 if let Ok(message) = serde_json::from_str::<WriteQueueMessage>(&json_str) {
2097 if let WriteQueueMessage::WriteRequest(request) = message {
2098 log::debug!("Leader received write request: {}", request.request_id);
2099
2100 let storage_rc = get_storage_with_fallback(&db_name_inner);
2102
2103 if let Some(storage) = storage_rc {
2104 wasm_bindgen_futures::spawn_local(async move {
2106 let is_leader = with_storage_async!(
2107 storage,
2108 "write_queue_is_leader",
2109 |s| s.is_leader()
2110 );
2111 if is_leader.is_none() {
2112 log::error!("Failed to check leader status");
2113 return;
2114 }
2115 let is_leader = is_leader.unwrap();
2116
2117 if !is_leader {
2118 log::error!("Not leader, ignoring write request");
2119 return;
2120 }
2121
2122 log::debug!("Processing write request as leader");
2123
2124 match Database::new_wasm(db_name_inner.clone()).await {
2126 Ok(mut db) => {
2127 match db.execute_internal(&request.sql).await {
2129 Ok(result) => {
2130 let response = WriteResponse::Success {
2132 request_id: request.request_id.clone(),
2133 affected_rows: result.affected_rows
2134 as usize,
2135 };
2136
2137 use crate::storage::write_queue::send_write_response;
2138 if let Err(e) = send_write_response(
2139 &db_name_inner,
2140 response,
2141 ) {
2142 log::error!(
2143 "Failed to send response: {}",
2144 e
2145 );
2146 } else {
2147 log::info!(
2148 "Write response sent successfully"
2149 );
2150 }
2151 }
2152 Err(e) => {
2153 let response = WriteResponse::Error {
2155 request_id: request.request_id.clone(),
2156 error_message: e.to_string(),
2157 };
2158
2159 use crate::storage::write_queue::send_write_response;
2160 if let Err(e) = send_write_response(
2161 &db_name_inner,
2162 response,
2163 ) {
2164 log::error!(
2165 "Failed to send error response: {}",
2166 e
2167 );
2168 }
2169 }
2170 }
2171 }
2172 Err(e) => {
2173 log::error!(
2174 "Failed to create db for write processing: {:?}",
2175 e
2176 );
2177 }
2178 }
2179 });
2180 }
2181 }
2182 }
2183 }
2184 }
2185 }) as Box<dyn FnMut(JsValue)>);
2186
2187 let callback_fn = callback.as_ref().unchecked_ref();
2188 register_write_queue_listener(db_name, callback_fn).map_err(|e| {
2189 JsValue::from_str(&format!("Failed to register write queue listener: {}", e))
2190 })?;
2191
2192 callback.forget();
2193
2194 Ok(())
2195 }
2196
2197 #[wasm_bindgen]
2198 pub async fn execute(&mut self, sql: &str) -> Result<JsValue, JsValue> {
2199 self.check_write_permission(sql)
2201 .await
2202 .map_err(|e| JsValue::from_str(&format!("Write permission denied: {}", e)))?;
2203
2204 let result = self
2205 .execute_internal(sql)
2206 .await
2207 .map_err(|e| JsValue::from_str(&format!("Query execution failed: {}", e)))?;
2208 serde_wasm_bindgen::to_value(&result).map_err(|e| JsValue::from_str(&e.to_string()))
2209 }
2210
2211 #[wasm_bindgen(js_name = "executeWithParams")]
2212 pub async fn execute_with_params(
2213 &mut self,
2214 sql: &str,
2215 params: JsValue,
2216 ) -> Result<JsValue, JsValue> {
2217 let params: Vec<ColumnValue> = serde_wasm_bindgen::from_value(params)
2218 .map_err(|e| JsValue::from_str(&format!("Invalid parameters: {}", e)))?;
2219
2220 self.check_write_permission(sql)
2222 .await
2223 .map_err(|e| JsValue::from_str(&format!("Write permission denied: {}", e)))?;
2224
2225 let result = self
2226 .execute_with_params_internal(sql, ¶ms)
2227 .await
2228 .map_err(|e| JsValue::from_str(&format!("Query execution failed: {}", e)))?;
2229 serde_wasm_bindgen::to_value(&result).map_err(|e| JsValue::from_str(&e.to_string()))
2230 }
2231
2232 #[wasm_bindgen]
2233 pub async fn close(&mut self) -> Result<(), JsValue> {
2234 self.close_internal()
2235 .await
2236 .map_err(|e| JsValue::from_str(&format!("Failed to close database: {}", e)))
2237 }
2238
2239 #[wasm_bindgen(js_name = "forceCloseConnection")]
2241 pub async fn force_close_connection(&mut self) -> Result<(), JsValue> {
2242 let _ = self.close_internal().await;
2244
2245 let pool_key = self.name.trim_end_matches(".db");
2248 crate::connection_pool::force_close_connection(pool_key);
2249
2250 #[cfg(target_arch = "wasm32")]
2252 {
2253 crate::cleanup::cleanup_all_state(pool_key)
2254 .await
2255 .map_err(|e| JsValue::from_str(&format!("Cleanup failed: {}", e)))?;
2256 }
2257 log::info!("Force closed and removed connection for: {}", self.name);
2258 Ok(())
2259 }
2260
2261 #[wasm_bindgen]
2262 pub async fn sync(&mut self) -> Result<(), JsValue> {
2263 self.sync_internal()
2264 .await
2265 .map_err(|e| JsValue::from_str(&format!("Failed to sync database: {}", e)))
2266 }
2267
2268 #[wasm_bindgen(js_name = "allowNonLeaderWrites")]
2270 pub async fn allow_non_leader_writes(&mut self, allow: bool) -> Result<(), JsValue> {
2271 log::debug!("Setting allowNonLeaderWrites = {} for {}", allow, self.name);
2272 self.allow_non_leader_writes = allow;
2273 Ok(())
2274 }
2275
2276 #[wasm_bindgen(js_name = "exportToFile")]
2292 pub async fn export_to_file(&self) -> Result<js_sys::Uint8Array, JsValue> {
2293 let db_name = self.name.clone();
2294 let max_export_size = self.max_export_size_bytes;
2295
2296 log::info!("[EXPORT] ===== Step 1: Acquiring lock");
2297
2298 let _guard = weblocks::acquire(&db_name, weblocks::AcquireOptions::exclusive()).await?;
2300 log::info!("[EXPORT] ===== Step 2: Lock acquired");
2301
2302 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2304 log::info!("[EXPORT] ===== Step 3: Getting storage");
2305 let storage_rc = get_storage_with_fallback(&db_name).ok_or_else(|| {
2306 JsValue::from_str(&format!("Storage not found for database: {}", db_name))
2307 })?;
2308 log::info!("[EXPORT] ===== Step 4: Got storage, reloading cache");
2309
2310 #[cfg(target_arch = "wasm32")]
2312 {
2313 storage_rc.reload_cache_from_global_storage();
2314 }
2315
2316 log::info!("[EXPORT] ===== Step 5: Checkpointing WAL");
2319 if !self.connection_state.db.get().is_null() {
2320 use std::ffi::CString;
2322 let pragma = CString::new("PRAGMA wal_checkpoint(PASSIVE)").unwrap();
2323 unsafe {
2324 let mut stmt = std::ptr::null_mut();
2325 let rc = sqlite_wasm_rs::sqlite3_prepare_v2(
2326 self.connection_state.db.get(),
2327 pragma.as_ptr(),
2328 -1,
2329 &mut stmt,
2330 std::ptr::null_mut(),
2331 );
2332 if rc == sqlite_wasm_rs::SQLITE_OK && !stmt.is_null() {
2333 sqlite_wasm_rs::sqlite3_step(stmt);
2334 sqlite_wasm_rs::sqlite3_finalize(stmt);
2335 log::info!("[EXPORT] WAL checkpoint completed");
2336 } else {
2337 log::warn!("[EXPORT] WAL checkpoint failed with rc: {}", rc);
2338 }
2339 }
2340 }
2341
2342 log::info!("[EXPORT] ===== Step 6: Starting sync");
2343 storage_rc
2345 .sync()
2346 .await
2347 .map_err(|e| JsValue::from_str(&format!("Sync failed: {}", e)))?;
2348 log::info!("[EXPORT] ===== Step 7: Sync complete");
2349
2350 log::info!("[EXPORT] Calling export_database_to_bytes");
2352 let db_bytes = {
2353 let storage = &*storage_rc;
2354 crate::storage::export::export_database_to_bytes(storage, max_export_size)
2355 .await
2356 .map_err(|e| {
2357 log::error!("[EXPORT] Export failed: {}", e);
2358 JsValue::from_str(&format!("Export failed: {}", e))
2359 })?
2360 };
2361
2362 log::info!("[EXPORT] Export complete: {} bytes", db_bytes.len());
2363
2364 let uint8_array = js_sys::Uint8Array::new_with_length(db_bytes.len() as u32);
2365 uint8_array.copy_from(&db_bytes);
2366
2367 Ok(uint8_array)
2368 }
2369
2370 #[wasm_bindgen(js_name = "testLock")]
2372 pub async fn test_lock(&self, value: u32) -> Result<u32, JsValue> {
2373 let lock_name = format!("{}.lock_test", self.name);
2374
2375 log::info!(
2376 "[LOCK-TEST] Acquiring lock: {} with value: {}",
2377 lock_name,
2378 value
2379 );
2380 let _guard = weblocks::acquire(&lock_name, weblocks::AcquireOptions::exclusive()).await?;
2381 log::info!("[LOCK-TEST] Lock acquired, processing value: {}", value);
2382
2383 let result = value + 1;
2385
2386 let delay_promise = js_sys::Promise::new(&mut |resolve, _reject| {
2388 let window = web_sys::window().unwrap();
2389 let _ = window
2390 .set_timeout_with_callback_and_timeout_and_arguments_0(resolve.unchecked_ref(), 10);
2391 });
2392 wasm_bindgen_futures::JsFuture::from(delay_promise).await?;
2393
2394 log::info!(
2395 "[LOCK-TEST] Lock releasing for: {} with result: {}",
2396 lock_name,
2397 result
2398 );
2399 Ok(result)
2400 }
2401
2402 #[wasm_bindgen(js_name = "importFromFile")]
2432 pub async fn import_from_file(&mut self, file_data: js_sys::Uint8Array) -> Result<(), JsValue> {
2433 log::info!("[IMPORT] Starting import with lock for: {}", self.name);
2434 let db_name = self.name.clone();
2435 let data = file_data.to_vec();
2436
2437 let _guard = weblocks::acquire(&db_name, weblocks::AcquireOptions::exclusive()).await?;
2439 log::info!("[IMPORT] Lock acquired for: {}", db_name);
2440
2441 log::debug!("Import data size: {} bytes", data.len());
2442
2443 log::debug!("Force-closing database connection before import");
2447
2448 self.close_internal()
2450 .await
2451 .map_err(|e| JsValue::from_str(&format!("Failed to close before import: {}", e)))?;
2452
2453 let pool_key = self.name.trim_end_matches(".db");
2455 crate::connection_pool::force_close_connection(pool_key);
2456
2457 self.connection_state.db.set(std::ptr::null_mut());
2459 log::debug!("Removed connection from pool for import");
2460
2461 crate::storage::import::import_database_from_bytes(&db_name, data)
2463 .await
2464 .map_err(|e| {
2465 log::error!("Import failed for {}: {}", db_name, e);
2466 JsValue::from_str(&format!("Import failed: {}", e))
2467 })?;
2468
2469 log::info!("[IMPORT] Import complete for: {}", db_name);
2470
2471 Ok(())
2475 }
2476
2477 #[wasm_bindgen(js_name = "waitForLeadership")]
2479 pub async fn wait_for_leadership(&mut self) -> Result<(), JsValue> {
2480 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2481
2482 #[cfg(feature = "telemetry")]
2484 if let Some(ref metrics) = self.metrics {
2485 metrics.leader_election_attempts_total().inc();
2486 }
2487
2488 let db_name = &self.name;
2489 let start_time = js_sys::Date::now();
2490
2491 let timeout_ms = 5000.0; loop {
2494 let storage_rc = get_storage_with_fallback(db_name);
2495
2496 if let Some(storage) = storage_rc {
2497 let is_leader =
2498 match with_storage_async!(storage, "wait_for_leadership", |s| s.is_leader()) {
2499 Some(v) => v,
2500 None => continue,
2501 };
2502
2503 if is_leader {
2504 log::info!("Became leader for {}", db_name);
2505
2506 #[cfg(feature = "telemetry")]
2508 if let Some(ref metrics) = self.metrics {
2509 let duration_ms = js_sys::Date::now() - start_time;
2510 metrics.leader_election_duration().observe(duration_ms);
2511 metrics.is_leader().set(1.0);
2512 metrics.leadership_changes_total().inc();
2513 }
2514
2515 return Ok(());
2516 }
2517 }
2518
2519 if js_sys::Date::now() - start_time > timeout_ms {
2521 #[cfg(feature = "telemetry")]
2523 if let Some(ref metrics) = self.metrics {
2524 let duration_ms = js_sys::Date::now() - start_time;
2525 metrics.leader_election_duration().observe(duration_ms);
2526 }
2527
2528 return Err(JsValue::from_str("Timeout waiting for leadership"));
2529 }
2530
2531 let promise = js_sys::Promise::new(&mut |resolve, _| {
2533 let window = web_sys::window().expect("should have window");
2534 let _ = window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 100);
2535 });
2536 let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
2537 }
2538 }
2539
2540 #[wasm_bindgen(js_name = "requestLeadership")]
2542 pub async fn request_leadership(&mut self) -> Result<(), JsValue> {
2543 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2544
2545 let db_name = &self.name;
2546 log::debug!("Requesting leadership for {}", db_name);
2547
2548 #[cfg(feature = "telemetry")]
2550 let telemetry_data = if self.metrics.is_some() {
2551 let start_time = js_sys::Date::now();
2552 let was_leader = self
2553 .is_leader_wasm()
2554 .await
2555 .ok()
2556 .and_then(|v| v.as_bool())
2557 .unwrap_or(false);
2558
2559 if let Some(ref metrics) = self.metrics {
2560 metrics.leader_elections_total().inc();
2561 }
2562
2563 Some((start_time, was_leader))
2564 } else {
2565 None
2566 };
2567
2568 let storage_rc = get_storage_with_fallback(db_name);
2569
2570 if let Some(storage) = storage_rc {
2571 {
2572 let result = with_storage_async!(storage, "request_leadership", |s| s
2574 .start_leader_election())
2575 .ok_or_else(|| {
2576 JsValue::from_str("Failed to acquire storage lock for leadership request")
2577 })?;
2578 result.map_err(|e| {
2579 JsValue::from_str(&format!("Failed to request leadership: {}", e))
2580 })?;
2581
2582 log::debug!("Re-election triggered for {}", db_name);
2583 } #[cfg(feature = "telemetry")]
2587 if let Some((start_time, was_leader)) = telemetry_data {
2588 if let Some(ref metrics) = self.metrics {
2589 let duration_ms = js_sys::Date::now() - start_time;
2591 metrics.leader_election_duration().observe(duration_ms);
2592
2593 let is_leader_now = self
2595 .is_leader_wasm()
2596 .await
2597 .ok()
2598 .and_then(|v| v.as_bool())
2599 .unwrap_or(false);
2600
2601 metrics
2603 .is_leader()
2604 .set(if is_leader_now { 1.0 } else { 0.0 });
2605
2606 if was_leader != is_leader_now {
2608 metrics.leadership_changes_total().inc();
2609 }
2610 }
2611 }
2612
2613 Ok(())
2614 } else {
2615 Err(JsValue::from_str(&format!(
2616 "No storage found for database: {}",
2617 db_name
2618 )))
2619 }
2620 }
2621
2622 #[wasm_bindgen(js_name = "getLeaderInfo")]
2624 pub async fn get_leader_info(&mut self) -> Result<JsValue, JsValue> {
2625 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2626
2627 let db_name = &self.name;
2628
2629 let storage_rc = get_storage_with_fallback(db_name);
2630
2631 if let Some(storage) = storage_rc {
2632 let is_leader = with_storage_async!(storage, "get_leader_info", |s| s.is_leader())
2633 .ok_or_else(|| {
2634 JsValue::from_str(&format!(
2635 "Failed to access storage for database: {}",
2636 db_name
2637 ))
2638 })?;
2639
2640 let leader_id_str = if is_leader {
2643 format!("leader_{}", db_name)
2644 } else {
2645 "unknown".to_string()
2646 };
2647
2648 let obj = js_sys::Object::new();
2650 js_sys::Reflect::set(&obj, &"isLeader".into(), &JsValue::from_bool(is_leader))?;
2651 js_sys::Reflect::set(&obj, &"leaderId".into(), &JsValue::from_str(&leader_id_str))?;
2652 js_sys::Reflect::set(
2653 &obj,
2654 &"leaseExpiry".into(),
2655 &JsValue::from_f64(js_sys::Date::now()),
2656 )?;
2657
2658 Ok(obj.into())
2659 } else {
2660 Err(JsValue::from_str(&format!(
2661 "No storage found for database: {}",
2662 db_name
2663 )))
2664 }
2665 }
2666
2667 #[wasm_bindgen(js_name = "queueWrite")]
2678 pub async fn queue_write(&mut self, sql: String) -> Result<(), JsValue> {
2679 self.queue_write_with_timeout(sql, 5000).await
2680 }
2681
2682 #[wasm_bindgen(js_name = "queueWriteWithTimeout")]
2688 pub async fn queue_write_with_timeout(
2689 &mut self,
2690 sql: String,
2691 timeout_ms: u32,
2692 ) -> Result<(), JsValue> {
2693 use crate::storage::write_queue::{WriteQueueMessage, WriteResponse, send_write_request};
2694 use std::cell::RefCell;
2695 use std::rc::Rc;
2696
2697 log::debug!("Queuing write: {}", sql);
2698
2699 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2701 let is_leader = {
2702 let storage_rc = get_storage_with_fallback(&self.name);
2703
2704 if let Some(storage) = storage_rc {
2705 with_storage_async!(storage, "queue_write_is_leader", |s| s.is_leader())
2706 .unwrap_or(false)
2707 } else {
2708 false
2709 }
2710 };
2711
2712 if is_leader {
2713 log::debug!("We are leader, executing directly");
2714 return self
2715 .execute_internal(&sql)
2716 .await
2717 .map(|_| ())
2718 .map_err(|e| JsValue::from_str(&format!("Execute failed: {}", e)));
2719 }
2720
2721 let request_id = send_write_request(&self.name, &sql)
2723 .map_err(|e| JsValue::from_str(&format!("Failed to send write request: {}", e)))?;
2724
2725 log::debug!("Write request sent with ID: {}", request_id);
2726
2727 let response_received = Rc::new(RefCell::new(false));
2729 let response_error = Rc::new(RefCell::new(None::<String>));
2730
2731 let response_received_clone = response_received.clone();
2732 let response_error_clone = response_error.clone();
2733 let request_id_clone = request_id.clone();
2734
2735 let callback = Closure::wrap(Box::new(move |msg: JsValue| {
2737 if let Ok(json_str) = js_sys::JSON::stringify(&msg) {
2739 if let Some(json_str) = json_str.as_string() {
2740 if let Ok(message) = serde_json::from_str::<WriteQueueMessage>(&json_str) {
2741 if let WriteQueueMessage::WriteResponse(response) = message {
2742 match response {
2743 WriteResponse::Success { request_id, .. } => {
2744 if request_id == request_id_clone {
2745 *response_received_clone.borrow_mut() = true;
2746 log::debug!("Write response received: Success");
2747 }
2748 }
2749 WriteResponse::Error {
2750 request_id,
2751 error_message,
2752 } => {
2753 if request_id == request_id_clone {
2754 *response_received_clone.borrow_mut() = true;
2755 *response_error_clone.borrow_mut() = Some(error_message);
2756 log::debug!("Write response received: Error");
2757 }
2758 }
2759 }
2760 }
2761 }
2762 }
2763 }
2764 }) as Box<dyn FnMut(JsValue)>);
2765
2766 use crate::storage::write_queue::register_write_queue_listener;
2768 let callback_fn = callback.as_ref().unchecked_ref();
2769 register_write_queue_listener(&self.name, callback_fn)
2770 .map_err(|e| JsValue::from_str(&format!("Failed to register listener: {}", e)))?;
2771
2772 callback.forget();
2774
2775 let start_time = js_sys::Date::now();
2777 let timeout_f64 = timeout_ms as f64;
2778
2779 loop {
2780 if *response_received.borrow() {
2782 if let Some(error_msg) = response_error.borrow().as_ref() {
2783 return Err(JsValue::from_str(&format!("Write failed: {}", error_msg)));
2784 }
2785 log::info!("Write completed successfully");
2786 return Ok(());
2787 }
2788
2789 let elapsed = js_sys::Date::now() - start_time;
2791 if elapsed > timeout_f64 {
2792 return Err(JsValue::from_str("Write request timed out"));
2793 }
2794
2795 wasm_bindgen_futures::JsFuture::from(js_sys::Promise::new(&mut |resolve, _reject| {
2797 if let Some(window) = web_sys::window() {
2798 let _ =
2799 window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 100);
2800 } else {
2801 log::error!("Window unavailable in timeout handler");
2802 }
2803 }))
2804 .await
2805 .ok();
2806 }
2807 }
2808
2809 #[wasm_bindgen(js_name = "isLeader")]
2810 pub async fn is_leader_wasm(&self) -> Result<JsValue, JsValue> {
2811 use crate::vfs::indexeddb_vfs::get_storage_with_fallback;
2813
2814 let db_name = &self.name;
2815 log::debug!("isLeader() called for database: {} (self.name)", db_name);
2816
2817 let storage_rc = get_storage_with_fallback(db_name);
2818
2819 if let Some(storage) = storage_rc {
2820 log::debug!("Found storage for {}, calling is_leader()", db_name);
2821 let is_leader = with_storage_async!(storage, "is_leader_wasm", |s| s.is_leader())
2822 .ok_or_else(|| {
2823 JsValue::from_str(&format!(
2824 "Failed to access storage for database: {}",
2825 db_name
2826 ))
2827 })?;
2828 log::debug!("is_leader() = {} for {}", is_leader, db_name);
2829
2830 Ok(JsValue::from_bool(is_leader))
2832 } else {
2833 log::error!("ERROR: No storage found for database: {}", db_name);
2834 Err(JsValue::from_str(&format!(
2835 "No storage found for database: {}",
2836 db_name
2837 )))
2838 }
2839 }
2840
2841 pub async fn is_leader(&self) -> Result<bool, JsValue> {
2843 let result = self.is_leader_wasm().await?;
2844 Ok(result.as_bool().unwrap_or(false))
2845 }
2846
2847 #[wasm_bindgen(js_name = "onDataChange")]
2848 pub fn on_data_change_wasm(&mut self, callback: &js_sys::Function) -> Result<(), JsValue> {
2849 log::debug!("Registering onDataChange callback for {}", self.name);
2850
2851 self.on_data_change_callback = Some(callback.clone());
2853
2854 use crate::storage::broadcast_notifications::register_change_listener;
2856
2857 let db_name = &self.name;
2858 register_change_listener(db_name, callback).map_err(|e| {
2859 JsValue::from_str(&format!("Failed to register change listener: {}", e))
2860 })?;
2861
2862 log::debug!("onDataChange callback registered for {}", self.name);
2863 Ok(())
2864 }
2865
2866 #[wasm_bindgen(js_name = "enableOptimisticUpdates")]
2868 pub async fn enable_optimistic_updates(&mut self, enabled: bool) -> Result<(), JsValue> {
2869 self.optimistic_updates_manager
2870 .borrow_mut()
2871 .set_enabled(enabled);
2872 log::debug!(
2873 "Optimistic updates {}",
2874 if enabled { "enabled" } else { "disabled" }
2875 );
2876 Ok(())
2877 }
2878
2879 #[wasm_bindgen(js_name = "isOptimisticMode")]
2881 pub async fn is_optimistic_mode(&self) -> bool {
2882 self.optimistic_updates_manager.borrow().is_enabled()
2883 }
2884
2885 #[wasm_bindgen(js_name = "trackOptimisticWrite")]
2887 pub async fn track_optimistic_write(&mut self, sql: String) -> Result<String, JsValue> {
2888 let id = self
2889 .optimistic_updates_manager
2890 .borrow_mut()
2891 .track_write(sql);
2892 Ok(id)
2893 }
2894
2895 #[wasm_bindgen(js_name = "getPendingWritesCount")]
2897 pub async fn get_pending_writes_count(&self) -> usize {
2898 self.optimistic_updates_manager.borrow().get_pending_count()
2899 }
2900
2901 #[wasm_bindgen(js_name = "clearOptimisticWrites")]
2903 pub async fn clear_optimistic_writes(&mut self) -> Result<(), JsValue> {
2904 self.optimistic_updates_manager.borrow_mut().clear_all();
2905 Ok(())
2906 }
2907
2908 #[wasm_bindgen(js_name = "enableCoordinationMetrics")]
2910 pub async fn enable_coordination_metrics(&mut self, enabled: bool) -> Result<(), JsValue> {
2911 self.coordination_metrics_manager
2912 .borrow_mut()
2913 .set_enabled(enabled);
2914 Ok(())
2915 }
2916
2917 #[wasm_bindgen(js_name = "isCoordinationMetricsEnabled")]
2919 pub async fn is_coordination_metrics_enabled(&self) -> bool {
2920 self.coordination_metrics_manager.borrow().is_enabled()
2921 }
2922
2923 #[wasm_bindgen(js_name = "recordLeadershipChange")]
2925 pub async fn record_leadership_change(&mut self, became_leader: bool) -> Result<(), JsValue> {
2926 self.coordination_metrics_manager
2927 .borrow_mut()
2928 .record_leadership_change(became_leader);
2929 Ok(())
2930 }
2931
2932 #[wasm_bindgen(js_name = "recordNotificationLatency")]
2934 pub async fn record_notification_latency(&mut self, latency_ms: f64) -> Result<(), JsValue> {
2935 self.coordination_metrics_manager
2936 .borrow_mut()
2937 .record_notification_latency(latency_ms);
2938 Ok(())
2939 }
2940
2941 #[wasm_bindgen(js_name = "recordWriteConflict")]
2943 pub async fn record_write_conflict(&mut self) -> Result<(), JsValue> {
2944 self.coordination_metrics_manager
2945 .borrow_mut()
2946 .record_write_conflict();
2947 Ok(())
2948 }
2949
2950 #[wasm_bindgen(js_name = "recordFollowerRefresh")]
2952 pub async fn record_follower_refresh(&mut self) -> Result<(), JsValue> {
2953 self.coordination_metrics_manager
2954 .borrow_mut()
2955 .record_follower_refresh();
2956 Ok(())
2957 }
2958
2959 #[wasm_bindgen(js_name = "getCoordinationMetrics")]
2961 pub async fn get_coordination_metrics(&self) -> Result<String, JsValue> {
2962 self.coordination_metrics_manager
2963 .borrow()
2964 .get_metrics_json()
2965 .map_err(|e| JsValue::from_str(&e))
2966 }
2967
2968 #[wasm_bindgen(js_name = "resetCoordinationMetrics")]
2970 pub async fn reset_coordination_metrics(&mut self) -> Result<(), JsValue> {
2971 self.coordination_metrics_manager.borrow_mut().reset();
2972 Ok(())
2973 }
2974}
2975
/// JavaScript-facing wrapper around a [`ColumnValue`].
///
/// Values are built through the associated constructor functions; the wrapped
/// value is not exposed through any accessor in this block.
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub struct WasmColumnValue {
    // Never read within this block; the lint is silenced rather than removing
    // the payload.
    #[allow(dead_code)]
    inner: ColumnValue,
}

#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
impl WasmColumnValue {
    /// Creates a `Null` value.
    #[wasm_bindgen(js_name = "createNull")]
    pub fn create_null() -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Null,
        }
    }

    /// Creates an `Integer` value from a 64-bit signed integer.
    #[wasm_bindgen(js_name = "createInteger")]
    pub fn create_integer(value: i64) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Integer(value),
        }
    }

    /// Creates a `Real` value from a 64-bit float.
    #[wasm_bindgen(js_name = "createReal")]
    pub fn create_real(value: f64) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Real(value),
        }
    }

    /// Creates a `Text` value, taking ownership of the string.
    #[wasm_bindgen(js_name = "createText")]
    pub fn create_text(value: String) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Text(value),
        }
    }

    /// Creates a `Blob` value by copying the given bytes.
    #[wasm_bindgen(js_name = "createBlob")]
    pub fn create_blob(value: &[u8]) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Blob(value.to_vec()),
        }
    }

    /// Creates a `BigInt` value from its string form. The string is stored
    /// as given; it is not validated here.
    #[wasm_bindgen(js_name = "createBigInt")]
    pub fn create_bigint(value: &str) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::BigInt(value.to_string()),
        }
    }

    /// Creates a `Date` value from a numeric timestamp.
    ///
    /// The float is converted with `as i64`: the fractional part is dropped,
    /// `NaN` becomes `0`, and out-of-range values saturate.
    #[wasm_bindgen(js_name = "createDate")]
    pub fn create_date(timestamp: f64) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Date(timestamp as i64),
        }
    }

    /// Converts an arbitrary JavaScript value into a column value.
    ///
    /// Rules, checked in this order:
    /// 1. `null` / `undefined` become `Null`.
    /// 2. A string that parses as an `i64` becomes `Integer`; any other
    ///    string becomes `Text`.
    /// 3. A number with no fractional part inside the `i64` range becomes
    ///    `Integer`; any other number becomes `Real`.
    /// 4. An object for which `new Date(value)` yields a finite time becomes
    ///    `Date` with that time; any other object becomes `Text` holding its
    ///    debug representation.
    /// 5. Anything else becomes `Null`.
    #[wasm_bindgen(js_name = "fromJsValue")]
    pub fn from_js_value(value: &JsValue) -> WasmColumnValue {
        let inner = if value.is_null() || value.is_undefined() {
            ColumnValue::Null
        } else if let Some(text) = value.as_string() {
            match text.parse::<i64>() {
                Ok(parsed) => ColumnValue::Integer(parsed),
                Err(_) => ColumnValue::Text(text),
            }
        } else if let Some(n) = value.as_f64() {
            if n.fract() == 0.0 && n >= i64::MIN as f64 && n <= i64::MAX as f64 {
                ColumnValue::Integer(n as i64)
            } else {
                ColumnValue::Real(n)
            }
        } else if value.is_object() {
            // Build the Date once and reuse its time, instead of crossing
            // into JavaScript twice for the same value.
            let millis = js_sys::Date::new(value).get_time();
            if millis.is_finite() {
                ColumnValue::Date(millis as i64)
            } else {
                ColumnValue::Text(format!("{:?}", value))
            }
        } else {
            ColumnValue::Null
        };

        WasmColumnValue { inner }
    }

    /// Shorthand for [`WasmColumnValue::create_null`].
    pub fn null() -> WasmColumnValue {
        Self::create_null()
    }

    /// Creates an `Integer` value from a float.
    ///
    /// The float is converted with `as i64`: the fractional part is dropped,
    /// `NaN` becomes `0`, and out-of-range values saturate.
    pub fn integer(value: f64) -> WasmColumnValue {
        Self::create_integer(value as i64)
    }

    /// Shorthand for [`WasmColumnValue::create_real`].
    pub fn real(value: f64) -> WasmColumnValue {
        Self::create_real(value)
    }

    /// Shorthand for [`WasmColumnValue::create_text`].
    pub fn text(value: String) -> WasmColumnValue {
        Self::create_text(value)
    }

    /// Creates a `Blob` value, taking ownership of the bytes without copying
    /// them.
    pub fn blob(value: Vec<u8>) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::Blob(value),
        }
    }

    /// Creates a `BigInt` value, taking ownership of the string without
    /// copying it.
    pub fn big_int(value: String) -> WasmColumnValue {
        WasmColumnValue {
            inner: ColumnValue::BigInt(value),
        }
    }

    /// Shorthand for [`WasmColumnValue::create_date`]; the same conversion
    /// rules apply to `timestamp_ms`.
    pub fn date(timestamp_ms: f64) -> WasmColumnValue {
        Self::create_date(timestamp_ms)
    }
}