1#[cfg(target_arch = "wasm32")]
5use super::metadata::{BlockMetadataPersist, ChecksumAlgorithm};
6#[cfg(target_arch = "wasm32")]
7use super::{BlockStorage, vfs_sync};
8#[cfg(target_arch = "wasm32")]
9use crate::types::DatabaseError;
10#[cfg(target_arch = "wasm32")]
11use futures::channel::oneshot;
12#[cfg(target_arch = "wasm32")]
13use futures::lock::Mutex;
14#[cfg(target_arch = "wasm32")]
15use std::cell::RefCell;
16#[cfg(target_arch = "wasm32")]
17use std::collections::HashMap;
18#[cfg(target_arch = "wasm32")]
19use std::sync::Arc;
20
#[cfg(target_arch = "wasm32")]
thread_local! {
    /// Per-thread async gate used by the IndexedDB restore and persist routines in this file.
    ///
    /// Callers clone the inner `Arc` out of the `RefCell` and await `lock()` on it, so the
    /// guarded sections of those routines never interleave on the same thread.
    static INDEXEDDB_MUTEX: RefCell<Arc<Mutex<()>>> = {
        let gate = Mutex::new(());
        RefCell::new(Arc::new(gate))
    };
}
27
/// Mutably borrows a `RefCell`-style value via `try_borrow_mut`.
///
/// Panics with a reentrancy hint when the cell is already borrowed.
#[allow(unused_macros)] // The only visible user is wasm32-gated, so other targets never expand it.
macro_rules! lock_mutex {
    ($cell:expr) => {
        $cell.try_borrow_mut().expect("RefCell borrow failed - reentrancy issue")
    };
}
37
/// Looks up the `indexedDB` property on the JS global object and returns it as an `IdbFactory`.
///
/// The lookup goes through `js_sys::global()` and `Reflect::get` rather than a specific
/// global type.
///
/// # Errors
/// * `INDEXEDDB_ACCESS_ERROR` - reading the property threw.
/// * `INDEXEDDB_UNAVAILABLE` - the property is `null` or `undefined`.
/// * `INDEXEDDB_TYPE_ERROR` - the property is present but is not an `IdbFactory`.
#[cfg(target_arch = "wasm32")]
fn get_indexeddb_factory() -> Result<web_sys::IdbFactory, DatabaseError> {
    use wasm_bindgen::JsCast;

    let property = wasm_bindgen::JsValue::from_str("indexedDB");
    let candidate = match js_sys::Reflect::get(&js_sys::global(), &property) {
        Ok(value) => value,
        Err(e) => {
            let detail = format!("Failed to access indexedDB property: {:?}", e);
            return Err(DatabaseError::new("INDEXEDDB_ACCESS_ERROR", &detail));
        }
    };

    if candidate.is_undefined() || candidate.is_null() {
        return Err(DatabaseError::new(
            "INDEXEDDB_UNAVAILABLE",
            "IndexedDB is not supported in this environment",
        ));
    }

    match candidate.dyn_into::<web_sys::IdbFactory>() {
        Ok(factory) => Ok(factory),
        Err(_) => Err(DatabaseError::new(
            "INDEXEDDB_TYPE_ERROR",
            "indexedDB property is not an IdbFactory",
        )),
    }
}
80
/// Issues an IndexedDB open request for `db_name` at `version`.
///
/// Only the request object is returned; the caller is responsible for attaching
/// `onsuccess` / `onerror` / `onupgradeneeded` handlers and awaiting the outcome.
///
/// # Errors
/// Propagates any error from [`get_indexeddb_factory`], or `INDEXEDDB_OPEN_ERROR` when the
/// `open` call itself throws.
#[cfg(target_arch = "wasm32")]
fn open_indexeddb(db_name: &str, version: u32) -> Result<web_sys::IdbOpenDbRequest, DatabaseError> {
    let idb = get_indexeddb_factory()?;

    match idb.open_with_u32(db_name, version) {
        Ok(request) => Ok(request),
        Err(e) => {
            let detail = format!("Failed to open IndexedDB '{}': {:?}", db_name, e);
            Err(DatabaseError::new("INDEXEDDB_OPEN_ERROR", &detail))
        }
    }
}
93
/// Reports whether a commit marker for `db_name` is already registered in the global
/// commit-marker map.
///
/// Returns `Ok(true)` when a marker exists and `Ok(false)` otherwise. As written, this only
/// consults the in-memory map and never returns `Err`.
#[cfg(target_arch = "wasm32")]
pub async fn perform_indexeddb_recovery_scan(db_name: &str) -> Result<bool, DatabaseError> {
    log::debug!("Starting IndexedDB recovery scan for {}", db_name);

    let marker_present =
        vfs_sync::with_global_commit_marker(|markers| markers.borrow().contains_key(db_name));

    if !marker_present {
        log::debug!("Recovery scan - no existing state found for {}", db_name);
        return Ok(false);
    }

    log::debug!(
        "Recovery scan - found existing commit marker for {}",
        db_name
    );
    Ok(true)
}
120
/// Restores `db_name` from IndexedDB, skipping the reload when blocks are already in memory.
///
/// Each attempt runs `restore_from_indexeddb_internal(db_name, false)` under `with_retry`.
#[cfg(target_arch = "wasm32")]
pub async fn restore_from_indexeddb(db_name: &str) -> Result<(), DatabaseError> {
    use super::retry_logic::with_retry;

    // Every attempt gets its own owned copy of the name to move into its future.
    let owned_name = String::from(db_name);
    let attempt = || {
        let name = owned_name.clone();
        async move { restore_from_indexeddb_internal(&name, false).await }
    };

    with_retry("restore_from_indexeddb", attempt).await
}
134
/// Restores `db_name` from IndexedDB with the `force` flag set.
///
/// Each attempt runs `restore_from_indexeddb_internal(db_name, true)` under `with_retry`, so
/// the IndexedDB read happens even when blocks are already present in memory.
#[cfg(target_arch = "wasm32")]
pub async fn restore_from_indexeddb_force(db_name: &str) -> Result<(), DatabaseError> {
    use super::retry_logic::with_retry;

    // Every attempt gets its own owned copy of the name to move into its future.
    let owned_name = String::from(db_name);
    let forced_attempt = || {
        let name = owned_name.clone();
        async move { restore_from_indexeddb_internal(&name, true).await }
    };

    with_retry("restore_from_indexeddb_force", forced_attempt).await
}
148
/// Restores the commit marker and blocks for `db_name` from the `block_storage` IndexedDB
/// database into the global in-memory maps.
///
/// Flow:
/// 1. Take `INDEXEDDB_MUTEX` so restore and persist never interleave.
/// 2. If a commit marker is already known in memory:
///    * blocks present and `force == false`: nothing to do;
///    * otherwise open IndexedDB and reload the blocks. A failed open is an error
///      (`INDEXEDDB_OPEN_FAILED`).
/// 3. If no marker is known: open IndexedDB, read `<db_name>:commit_marker` from the
///    `metadata` store, register it, and reload the blocks. A failed open or a missing
///    marker is not an error on this path; the function logs and returns `Ok(())`.
///
/// Both paths install success, error and upgrade handlers on the open request, so a failed
/// open can never leave the future pending while the mutex is held, and a freshly created
/// database always gets its `blocks` / `metadata` stores. The connection is closed before
/// returning.
///
/// # Errors
/// Errors from `open_indexeddb`, from creating the metadata transaction or request, and from
/// `restore_blocks_from_indexeddb` are propagated.
#[cfg(target_arch = "wasm32")]
async fn restore_from_indexeddb_internal(db_name: &str, force: bool) -> Result<(), DatabaseError> {
    web_sys::console::log_1(
        &format!(
            "[RESTORE] restore_from_indexeddb_internal called for: {} (force={})",
            db_name, force
        )
        .into(),
    );

    let mutex = INDEXEDDB_MUTEX.with(|m| m.borrow().clone());
    web_sys::console::log_1(&"[RESTORE] Acquiring IndexedDB mutex...".into());
    let _guard = mutex.lock().await;
    web_sys::console::log_1(&"[RESTORE] Mutex acquired, proceeding with restoration".into());

    log::debug!("Starting restoration for {}", db_name);

    let existing_marker =
        vfs_sync::with_global_commit_marker(|cm| cm.borrow().get(db_name).copied());

    let (has_blocks, block_count, block_0_size) = vfs_sync::with_global_storage(|gs| {
        if let Some(db_storage) = gs.borrow().get(db_name) {
            let count = db_storage.len();
            let b0_size = db_storage.get(&0).map(|d| d.len()).unwrap_or(0);
            (!db_storage.is_empty(), count, b0_size)
        } else {
            (false, 0, 0)
        }
    });

    web_sys::console::log_1(
        &format!(
            "[RESTORE] Commit marker: {:?}, Has blocks: {} (count={}, block_0_size={})",
            existing_marker, has_blocks, block_count, block_0_size
        )
        .into(),
    );

    if existing_marker.is_some() {
        log::debug!("Found existing commit marker for {}", db_name);

        if has_blocks && !force {
            log::debug!(
                "Blocks already loaded for {}, skipping restoration",
                db_name
            );
            web_sys::console::log_1(
                &"[RESTORE] Blocks already loaded, skipping IndexedDB restore".into(),
            );
            return Ok(());
        } else if has_blocks && force {
            web_sys::console::log_1(
                &"[RESTORE] Blocks exist but force=true, reloading from IndexedDB".into(),
            );
        }

        log::debug!("Commit marker exists but no blocks - opening IndexedDB to restore blocks");
        web_sys::console::log_1(&"[RESTORE] Opening IndexedDB to restore blocks".into());

        let open_req = open_indexeddb("block_storage", 2)?;
        let db = match await_block_storage_open(&open_req).await {
            Ok(db) => db,
            Err(reason) => {
                log::error!(
                    "Failed to open IndexedDB for block restoration: {}",
                    reason
                );
                return Err(DatabaseError::new(
                    "INDEXEDDB_OPEN_FAILED",
                    "Failed to open IndexedDB for block restoration",
                ));
            }
        };

        web_sys::console::log_1(
            &"[RESTORE] IndexedDB opened, starting block restoration".into(),
        );
        let restored = restore_blocks_from_indexeddb(&db, db_name).await;
        // Close before propagating so the connection is released on failure as well.
        db.close();
        restored?;
        web_sys::console::log_1(&"[RESTORE] Block restoration complete".into());
        return Ok(());
    }

    log::debug!(
        "No existing commit marker found for {}, trying IndexedDB restoration",
        db_name
    );

    let open_req = open_indexeddb("block_storage", 2)?;
    let db = match await_block_storage_open(&open_req).await {
        Ok(db) => db,
        Err(reason) => {
            // Without a known marker there is nothing that must be restored, so an
            // unavailable database is treated as "no persisted state".
            log::error!("Failed to open IndexedDB: {}", reason);
            log::debug!("No commit marker found for {} in IndexedDB", db_name);
            return Ok(());
        }
    };
    log::info!("Successfully opened IndexedDB");
    log::debug!("Available stores: {:?}", db.object_store_names().length());

    let outcome = match read_persisted_commit_marker(&db, db_name).await {
        Ok(Some(commit)) => {
            vfs_sync::with_global_commit_marker(|cm| {
                cm.borrow_mut().insert(db_name.to_string(), commit);
            });
            log::debug!("Restored commit marker {} for {}", commit, db_name);
            log::debug!("About to call restore_blocks_from_indexeddb");
            restore_blocks_from_indexeddb(&db, db_name)
                .await
                .map(|()| true)
        }
        Ok(None) => Ok(false),
        Err(e) => Err(e),
    };
    // Close before propagating so the connection is released on failure as well.
    db.close();

    if outcome? {
        log::info!("Successfully restored blocks");
    } else {
        log::debug!("No commit marker found for {} in IndexedDB", db_name);
    }
    Ok(())
}

/// Waits for an IndexedDB open request to settle and returns the opened database.
///
/// Installs an upgrade handler that creates the `blocks` and `metadata` stores when they are
/// missing, plus success and error handlers. All handlers are detached again before the
/// closures are dropped, including when this future is dropped early.
#[cfg(target_arch = "wasm32")]
async fn await_block_storage_open(
    open_req: &web_sys::IdbOpenDbRequest,
) -> Result<web_sys::IdbDatabase, String> {
    use wasm_bindgen::JsCast;
    use wasm_bindgen::closure::Closure;

    /// Extracts the database carried by an open-request event.
    fn database_from_event(event: &web_sys::Event) -> Result<web_sys::IdbDatabase, String> {
        use wasm_bindgen::JsCast;

        let target = event
            .target()
            .ok_or_else(|| "open event has no target".to_string())?;
        let request: web_sys::IdbOpenDbRequest = target
            .dyn_into()
            .map_err(|_| "open event target is not an IdbOpenDbRequest".to_string())?;
        let value = request
            .result()
            .map_err(|e| format!("open request has no result: {:?}", e))?;
        value
            .dyn_into::<web_sys::IdbDatabase>()
            .map_err(|_| "open request result is not an IdbDatabase".to_string())
    }

    /// Clears the handlers so JS can never call into a closure that was already dropped.
    struct DetachOpenHandlers<'a>(&'a web_sys::IdbOpenDbRequest);
    impl Drop for DetachOpenHandlers<'_> {
        fn drop(&mut self) {
            self.0.set_onupgradeneeded(None);
            self.0.set_onsuccess(None);
            self.0.set_onerror(None);
        }
    }

    let on_upgrade = Closure::wrap(Box::new(move |event: web_sys::Event| {
        log::debug!("Upgrade handler called in restore_from_indexeddb");
        let db = match database_from_event(&event) {
            Ok(db) => db,
            Err(reason) => {
                log::error!("Restore upgrade handler failed: {}", reason);
                return;
            }
        };

        for &store_name in ["blocks", "metadata"].iter() {
            if db.object_store_names().contains(store_name) {
                continue;
            }
            match db.create_object_store(store_name) {
                Ok(_) => log::info!("Created {} store in restore upgrade", store_name),
                Err(e) => log::error!(
                    "Failed to create {} store in restore upgrade: {:?}",
                    store_name,
                    e
                ),
            }
        }
    }) as Box<dyn FnMut(_)>);

    let (tx, rx) = futures::channel::oneshot::channel::<Result<web_sys::IdbDatabase, String>>();
    let tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx)));

    let on_success = {
        let tx = tx.clone();
        Closure::wrap(Box::new(move |event: web_sys::Event| {
            if let Some(sender) = tx.borrow_mut().take() {
                let _ = sender.send(database_from_event(&event));
            }
        }) as Box<dyn FnMut(_)>)
    };

    let on_error = {
        let tx = tx.clone();
        Closure::wrap(Box::new(move |event: web_sys::Event| {
            if let Some(sender) = tx.borrow_mut().take() {
                let _ = sender.send(Err(format!("IndexedDB open failed: {:?}", event)));
            }
        }) as Box<dyn FnMut(_)>)
    };

    open_req.set_onupgradeneeded(Some(on_upgrade.as_ref().unchecked_ref()));
    open_req.set_onsuccess(Some(on_success.as_ref().unchecked_ref()));
    open_req.set_onerror(Some(on_error.as_ref().unchecked_ref()));
    // Declared after the closures so it is dropped (and detaches them) first.
    let _detach = DetachOpenHandlers(open_req);

    match rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err("IndexedDB open channel failed".to_string()),
    }
}

/// Reads `<db_name>:commit_marker` from the `metadata` store of `db`.
///
/// Returns `Ok(None)` when the store or key is missing, the stored value is not a number,
/// or the get request itself reports failure (these cases are logged). Errors creating the
/// transaction, store handle or request are returned as `Err`.
#[cfg(target_arch = "wasm32")]
async fn read_persisted_commit_marker(
    db: &web_sys::IdbDatabase,
    db_name: &str,
) -> Result<Option<u64>, DatabaseError> {
    use wasm_bindgen::JsCast;
    use wasm_bindgen::JsValue;
    use wasm_bindgen::closure::Closure;

    /// Clears the handlers so JS can never call into a closure that was already dropped.
    struct DetachGetHandlers<'a>(&'a web_sys::IdbRequest);
    impl Drop for DetachGetHandlers<'_> {
        fn drop(&mut self) {
            self.0.set_onsuccess(None);
            self.0.set_onerror(None);
        }
    }

    if !db.object_store_names().contains("metadata") {
        log::debug!("No metadata store found");
        return Ok(None);
    }
    log::debug!("Found metadata store");

    let transaction = db.transaction_with_str("metadata").map_err(|e| {
        DatabaseError::new(
            "TRANSACTION_ERROR",
            &format!("Failed to create transaction: {:?}", e),
        )
    })?;
    let store = transaction.object_store("metadata").map_err(|e| {
        DatabaseError::new(
            "STORE_ERROR",
            &format!("Failed to access metadata store: {:?}", e),
        )
    })?;

    let commit_key = format!("{}:commit_marker", db_name);
    log::debug!("Looking for key: {}", commit_key);
    let get_req = store.get(&JsValue::from_str(&commit_key)).map_err(|e| {
        DatabaseError::new(
            "GET_ERROR",
            &format!("Failed to create get request: {:?}", e),
        )
    })?;

    let (tx, rx) = futures::channel::oneshot::channel::<Result<JsValue, String>>();
    let tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx)));

    let on_success = {
        let tx = tx.clone();
        Closure::wrap(Box::new(move |event: web_sys::Event| {
            if let Some(sender) = tx.borrow_mut().take() {
                let value = event
                    .target()
                    .ok_or_else(|| "get event has no target".to_string())
                    .and_then(|target| {
                        target
                            .dyn_into::<web_sys::IdbRequest>()
                            .map_err(|_| "get event target is not an IdbRequest".to_string())
                    })
                    .and_then(|request| {
                        request
                            .result()
                            .map_err(|e| format!("get request has no result: {:?}", e))
                    });
                let _ = sender.send(value);
            }
        }) as Box<dyn FnMut(_)>)
    };

    let on_error = {
        let tx = tx.clone();
        Closure::wrap(Box::new(move |event: web_sys::Event| {
            if let Some(sender) = tx.borrow_mut().take() {
                let _ = sender.send(Err(format!("Get request failed: {:?}", event)));
            }
        }) as Box<dyn FnMut(_)>)
    };

    get_req.set_onsuccess(Some(on_success.as_ref().unchecked_ref()));
    get_req.set_onerror(Some(on_error.as_ref().unchecked_ref()));
    // Declared after the closures so it is dropped (and detaches them) first.
    let _detach = DetachGetHandlers(&get_req);

    match rx.await {
        Ok(Ok(value)) => {
            log::debug!("Get request succeeded");
            if value.is_undefined() || value.is_null() {
                log::debug!("Result is null or undefined");
                return Ok(None);
            }
            log::debug!("Result is not null/undefined");
            match value.as_f64() {
                // The marker is stored as a JS number; the cast saturates on out-of-range input.
                Some(marker) => Ok(Some(marker as u64)),
                None => {
                    log::debug!("Result is not a number: {:?}", value);
                    Ok(None)
                }
            }
        }
        Ok(Err(e)) => {
            log::error!("Get request failed: {}", e);
            Ok(None)
        }
        Err(_) => {
            log::error!("Get request channel failed");
            Ok(None)
        }
    }
}
461
/// Splits a persisted key of the form `<prefix><block_id>[:<version>]`.
///
/// `prefix` is expected to be `"<db_name>:"`. Stripping it as a whole (instead of splitting
/// the full key on `:`) keeps database names that themselves contain `:` working.
///
/// Returns `None` when the key does not start with `prefix` or the block id segment is not a
/// `u64` (for example `<db_name>:commit_marker`). The version is `None` when the segment is
/// missing or not a `u64`.
#[allow(dead_code)] // Only called from wasm32-gated code; kept target-independent for unit tests.
fn split_restore_key(key: &str, prefix: &str) -> Option<(u64, Option<u64>)> {
    let rest = key.strip_prefix(prefix)?;
    let mut segments = rest.split(':');
    let block_id = segments.next()?.parse::<u64>().ok()?;
    let version = segments.next().and_then(|raw| raw.parse::<u64>().ok());
    Some((block_id, version))
}

/// Reduces `(block_id, version, payload)` entries to the highest version per block id.
///
/// IndexedDB cursors yield string keys in lexicographic order (`…:10` before `…:9`), so
/// "last entry wins" would pick a stale version. On equal versions the later entry wins.
#[allow(dead_code)] // Only called from wasm32-gated code; kept target-independent for unit tests.
fn keep_latest_restored_versions<T>(
    entries: Vec<(u64, u64, T)>,
) -> std::collections::HashMap<u64, (u64, T)> {
    let mut latest: std::collections::HashMap<u64, (u64, T)> =
        std::collections::HashMap::with_capacity(entries.len());
    for (block_id, version, payload) in entries {
        let superseded = latest
            .get(&block_id)
            .map_or(false, |current| current.0 > version);
        if !superseded {
            latest.insert(block_id, (version, payload));
        }
    }
    latest
}

/// Walks a cursor over `key_range` in `store` and returns every `(key, value)` pair whose key
/// is a string.
///
/// Request errors and failures to advance the cursor are reported as `INDEXEDDB_ERROR`
/// instead of leaving the future pending. The handlers are detached before the closures are
/// dropped, so nothing collected here stays alive after the call.
#[cfg(target_arch = "wasm32")]
async fn collect_restore_cursor_entries(
    store: &web_sys::IdbObjectStore,
    key_range: &web_sys::IdbKeyRange,
    store_label: &str,
) -> Result<Vec<(String, wasm_bindgen::JsValue)>, DatabaseError> {
    use wasm_bindgen::JsCast;
    use wasm_bindgen::closure::Closure;

    /// Clears the handlers so JS can never call into a closure that was already dropped.
    struct DetachCursorHandlers<'a>(&'a web_sys::IdbRequest);
    impl Drop for DetachCursorHandlers<'_> {
        fn drop(&mut self) {
            self.0.set_onsuccess(None);
            self.0.set_onerror(None);
        }
    }

    let cursor_req = store.open_cursor_with_range(key_range).map_err(|_| {
        DatabaseError::new(
            "INDEXEDDB_ERROR",
            &format!("Failed to open {} cursor", store_label),
        )
    })?;

    let (done_tx, done_rx) = futures::channel::oneshot::channel::<Result<(), String>>();
    let done_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(done_tx)));
    let entries: std::rc::Rc<std::cell::RefCell<Vec<(String, wasm_bindgen::JsValue)>>> =
        std::rc::Rc::new(std::cell::RefCell::new(Vec::new()));

    let on_success = {
        let done_tx = done_tx.clone();
        let entries = entries.clone();
        Closure::wrap(Box::new(move |event: web_sys::Event| {
            // Ok(true): cursor exhausted. Ok(false): entry recorded and cursor advanced.
            let step = (|| -> Result<bool, String> {
                let target = event
                    .target()
                    .ok_or_else(|| "cursor event has no target".to_string())?;
                let request: web_sys::IdbRequest = target
                    .dyn_into()
                    .map_err(|_| "cursor event target is not an IdbRequest".to_string())?;
                let result = request
                    .result()
                    .map_err(|e| format!("cursor request has no result: {:?}", e))?;
                if result.is_null() || result.is_undefined() {
                    return Ok(true);
                }

                let cursor: web_sys::IdbCursorWithValue = result.unchecked_into();
                let key = cursor
                    .key()
                    .map_err(|e| format!("failed to read cursor key: {:?}", e))?;
                let value = cursor
                    .value()
                    .map_err(|e| format!("failed to read cursor value: {:?}", e))?;
                // The range is bounded by string keys; anything else is skipped defensively.
                if let Some(key) = key.as_string() {
                    entries.borrow_mut().push((key, value));
                }
                cursor
                    .continue_()
                    .map_err(|e| format!("failed to advance cursor: {:?}", e))?;
                Ok(false)
            })();

            let outcome = match step {
                Ok(false) => return,
                Ok(true) => Ok(()),
                Err(reason) => Err(reason),
            };
            if let Some(sender) = done_tx.borrow_mut().take() {
                let _ = sender.send(outcome);
            }
        }) as Box<dyn FnMut(_)>)
    };

    let on_error = {
        let done_tx = done_tx.clone();
        Closure::wrap(Box::new(move |event: web_sys::Event| {
            if let Some(sender) = done_tx.borrow_mut().take() {
                let _ = sender.send(Err(format!("cursor request failed: {:?}", event)));
            }
        }) as Box<dyn FnMut(_)>)
    };

    cursor_req.set_onsuccess(Some(on_success.as_ref().unchecked_ref()));
    cursor_req.set_onerror(Some(on_error.as_ref().unchecked_ref()));
    // Declared after the closures so it is dropped (and detaches them) first.
    let _detach = DetachCursorHandlers(&cursor_req);

    let finished = done_rx.await;
    let collected = std::mem::take(&mut *entries.borrow_mut());

    match finished {
        Ok(Ok(())) => Ok(collected),
        Ok(Err(reason)) => Err(DatabaseError::new(
            "INDEXEDDB_ERROR",
            &format!("Failed to read {} cursor: {}", store_label, reason),
        )),
        Err(_) => Err(DatabaseError::new(
            "INDEXEDDB_ERROR",
            &format!("{} cursor channel closed unexpectedly", store_label),
        )),
    }
}

/// Loads every persisted block and metadata entry for `db_name` from `db` into the global
/// in-memory maps.
///
/// Steps:
/// 1. Scan the `blocks` store for keys `"<db_name>:<block_id>[:<version>]"` and keep the
///    highest version of each block.
/// 2. Write blocks that are not already present into global storage (existing entries are
///    left untouched) and rebuild the allocation map from the resulting key set.
/// 3. Scan the `metadata` store, keep the highest version per block, and register metadata
///    for every block restored in step 1. The checksum is computed over the same bytes that
///    were selected in step 1.
///
/// # Errors
/// Returns `INDEXEDDB_ERROR` when the transaction, stores, key range or cursors cannot be
/// created, or when a cursor fails while iterating.
#[cfg(target_arch = "wasm32")]
async fn restore_blocks_from_indexeddb(
    db: &web_sys::IdbDatabase,
    db_name: &str,
) -> Result<(), DatabaseError> {
    use std::collections::HashSet;
    use wasm_bindgen::JsCast;
    use wasm_bindgen::JsValue;

    log::debug!("Restoring blocks for {} from IndexedDB", db_name);

    let store_names = js_sys::Array::new();
    store_names.push(&JsValue::from_str("blocks"));
    store_names.push(&JsValue::from_str("metadata"));

    let transaction = db
        .transaction_with_str_sequence(&store_names)
        .map_err(|_| DatabaseError::new("INDEXEDDB_ERROR", "Failed to create transaction"))?;

    let blocks_store = transaction
        .object_store("blocks")
        .map_err(|_| DatabaseError::new("INDEXEDDB_ERROR", "Failed to get blocks store"))?;

    let metadata_store = transaction
        .object_store("metadata")
        .map_err(|_| DatabaseError::new("INDEXEDDB_ERROR", "Failed to get metadata store"))?;

    // All keys of this database share the "<db_name>:" prefix.
    let key_start = format!("{}:", db_name);
    let key_end = format!("{}:\u{FFFF}", db_name);
    web_sys::console::log_1(
        &format!(
            "[RESTORE] Searching IndexedDB for keys from '{}' to '{}'",
            key_start, key_end
        )
        .into(),
    );

    let key_range =
        web_sys::IdbKeyRange::bound(&JsValue::from_str(&key_start), &JsValue::from_str(&key_end))
            .map_err(|_| DatabaseError::new("INDEXEDDB_ERROR", "Failed to create key range"))?;

    // ---- Blocks ----
    let block_entries =
        collect_restore_cursor_entries(&blocks_store, &key_range, "blocks").await?;

    let mut versioned_blocks: Vec<(u64, u64, Vec<u8>)> = Vec::with_capacity(block_entries.len());
    for (key, value) in block_entries {
        web_sys::console::log_1(&format!("[RESTORE] Found key in IndexedDB: {}", key).into());

        let (block_id, version) = match split_restore_key(&key, &key_start) {
            Some(parsed) => parsed,
            None => continue,
        };
        if let Ok(array) = value.dyn_into::<js_sys::Uint8Array>() {
            let mut data = vec![0u8; array.length() as usize];
            array.copy_to(&mut data);
            web_sys::console::log_1(
                &format!("[RESTORE] Block {} has {} bytes", block_id, data.len()).into(),
            );
            // Keys without a parseable version rank lowest.
            versioned_blocks.push((block_id, version.unwrap_or(0), data));
        }
    }

    let latest_blocks = keep_latest_restored_versions(versioned_blocks);

    log::info!(
        "Restored {} unique blocks from IndexedDB (after deduplication)",
        latest_blocks.len()
    );
    web_sys::console::log_1(
        &format!(
            "[RESTORE] Restored {} unique blocks from IndexedDB for {}",
            latest_blocks.len(),
            db_name
        )
        .into(),
    );

    let total_deduped = latest_blocks.len();
    let blocks_written = vfs_sync::with_global_storage(|gs| {
        let mut storage_map = gs.borrow_mut();
        let db_storage = storage_map
            .entry(db_name.to_string())
            .or_insert_with(HashMap::new);
        let mut count = 0usize;
        for (block_id, (_version, data)) in &latest_blocks {
            if db_storage.contains_key(block_id) {
                web_sys::console::log_1(
                    &format!(
                        "[RESTORE] Skipping block {} - already in GLOBAL_STORAGE",
                        block_id
                    )
                    .into(),
                );
                continue;
            }
            web_sys::console::log_1(
                &format!(
                    "[RESTORE] Writing block {} to GLOBAL_STORAGE[{}]",
                    block_id, db_name
                )
                .into(),
            );
            db_storage.insert(*block_id, data.clone());
            count += 1;
        }
        count
    });

    web_sys::console::log_1(
        &format!(
            "[RESTORE] Wrote {} new blocks to GLOBAL_STORAGE (skipped {} existing)",
            blocks_written,
            total_deduped - blocks_written
        )
        .into(),
    );

    // The allocation map mirrors whatever block ids global storage now holds.
    let allocated_ids = vfs_sync::with_global_storage(|gs| {
        let storage_map = gs.borrow();
        storage_map
            .get(db_name)
            .map(|db_storage| db_storage.keys().copied().collect::<HashSet<u64>>())
            .unwrap_or_default()
    });

    vfs_sync::with_global_allocation_map(|gam| {
        gam.borrow_mut()
            .insert(db_name.to_string(), allocated_ids.clone());
    });
    web_sys::console::log_1(
        &format!(
            "[RESTORE] Updated allocation map with {} blocks",
            allocated_ids.len()
        )
        .into(),
    );

    // ---- Metadata ----
    let metadata_entries =
        collect_restore_cursor_entries(&metadata_store, &key_range, "metadata").await?;

    let mut versioned_metadata: Vec<(u64, u64, u32)> = Vec::with_capacity(metadata_entries.len());
    for (key, value) in metadata_entries {
        // "<db_name>:commit_marker" has no numeric block id and is skipped by the parser.
        let (block_id, key_version) = match split_restore_key(&key, &key_start) {
            Some((block_id, Some(key_version))) => (block_id, key_version),
            _ => continue,
        };
        if key_version > u64::from(u32::MAX) {
            continue;
        }
        if let Some(stored_version) = value.as_f64() {
            versioned_metadata.push((block_id, key_version, stored_version as u32));
        }
    }

    log::info!(
        "Restored {} metadata entries from IndexedDB",
        versioned_metadata.len()
    );
    let latest_metadata = keep_latest_restored_versions(versioned_metadata);

    vfs_sync::with_global_metadata(|gm| {
        let mut meta_map = gm.borrow_mut();
        let db_meta = meta_map
            .entry(db_name.to_string())
            .or_insert_with(HashMap::new);

        for (block_id, (_key_version, stored_version)) in &latest_metadata {
            if let Some((_, data)) = latest_blocks.get(block_id) {
                let checksum = {
                    use std::collections::hash_map::DefaultHasher;
                    use std::hash::{Hash, Hasher};
                    let mut hasher = DefaultHasher::new();
                    data.hash(&mut hasher);
                    hasher.finish()
                };

                db_meta.insert(
                    *block_id,
                    BlockMetadataPersist {
                        checksum,
                        version: *stored_version,
                        last_modified_ms: 0,
                        algo: ChecksumAlgorithm::FastHash,
                    },
                );
            }
        }
    });

    log::info!("Successfully restored blocks and metadata for {}", db_name);
    Ok(())
}
755
/// Persists `blocks`, `metadata` and `commit_marker` for `db_name` to IndexedDB, retrying
/// through `with_retry`.
///
/// Every attempt hands its own copy of `blocks` and `metadata` to
/// `persist_to_indexeddb_event_based_internal`. The copies are made per attempt directly
/// from the arguments, so there is no extra up-front copy of the payload.
///
/// With the `telemetry` feature enabled and a `span_recorder` supplied, a
/// `persist_indexeddb` span is recorded with the block and metadata counts, the duration and
/// an ok/error status; `parent_span_id`, when given, becomes the span's parent.
///
/// # Errors
/// Returns whatever `with_retry` returns for the wrapped operation.
#[cfg(target_arch = "wasm32")]
pub async fn persist_to_indexeddb_event_based(
    db_name: &str,
    blocks: Vec<(u64, Vec<u8>)>,
    metadata: Vec<(u64, u64)>,
    commit_marker: u64,
    #[cfg(feature = "telemetry")] span_recorder: Option<crate::telemetry::SpanRecorder>,
    #[cfg(feature = "telemetry")] parent_span_id: Option<String>,
) -> Result<(), DatabaseError> {
    use super::retry_logic::with_retry;

    #[cfg(feature = "telemetry")]
    let span = if span_recorder.is_some() {
        let mut builder = crate::telemetry::SpanBuilder::new("persist_indexeddb".to_string())
            .with_attribute("blocks_count", blocks.len().to_string())
            .with_attribute("metadata_count", metadata.len().to_string());

        if let Some(ref parent_id) = parent_span_id {
            builder = builder.with_parent(parent_id.clone());
        }

        Some(builder.build())
    } else {
        None
    };

    let db_name = db_name.to_string();

    let result = with_retry("persist_to_indexeddb", || {
        // The retry closure may run several times, so each attempt clones from the originals.
        let db_name = db_name.clone();
        let blocks = blocks.clone();
        let metadata = metadata.clone();
        async move {
            persist_to_indexeddb_event_based_internal(&db_name, blocks, metadata, commit_marker)
                .await
        }
    })
    .await;

    #[cfg(feature = "telemetry")]
    if let Some(mut s) = span {
        let end_time_ms = js_sys::Date::now();
        s.end_time_ms = Some(end_time_ms);
        let duration_ms = end_time_ms - s.start_time_ms;
        s.attributes
            .insert("duration_ms".to_string(), duration_ms.to_string());

        s.status = if result.is_ok() {
            crate::telemetry::SpanStatus::Ok
        } else {
            crate::telemetry::SpanStatus::Error("IndexedDB persistence failed".to_string())
        };

        if let Some(recorder) = span_recorder {
            recorder.record_span(s);
        }
    }

    result
}
822
/// Writes `blocks`, `metadata` and `commit_marker` for `db_name` to the `block_storage`
/// IndexedDB database in a single read-write transaction.
///
/// Keys are `"<db_name>:<block_id>:<version>"` in both stores; the commit marker is stored
/// under `"<db_name>:commit_marker"` in the `metadata` store. The version of a block is the
/// first entry for its id in `metadata`.
///
/// Failure handling:
/// * A `put` that fails synchronously aborts the transaction and is returned as an error
///   rather than being ignored.
/// * The transaction's `abort` event is observed as well as `error`, so an abort at commit
///   time resolves the wait instead of leaving it pending with the mutex and slot held.
/// * The database connection is closed on every exit path once it has been opened.
/// * Event handlers are detached before their closures are dropped; nothing is leaked.
///
/// # Errors
/// Returns `INDEXEDDB_ERROR`, `TRANSACTION_ERROR` or `STORE_ERROR` describing the failing
/// step, or any error from `open_indexeddb`.
#[cfg(target_arch = "wasm32")]
async fn persist_to_indexeddb_event_based_internal(
    db_name: &str,
    blocks: Vec<(u64, Vec<u8>)>,
    metadata: Vec<(u64, u64)>,
    commit_marker: u64,
) -> Result<(), DatabaseError> {
    use wasm_bindgen::JsCast;
    use wasm_bindgen::closure::Closure;

    /// Clears the open-request handlers so JS never calls a dropped closure.
    struct DetachOpenHandlers<'a>(&'a web_sys::IdbOpenDbRequest);
    impl Drop for DetachOpenHandlers<'_> {
        fn drop(&mut self) {
            self.0.set_onupgradeneeded(None);
            self.0.set_onsuccess(None);
            self.0.set_onerror(None);
        }
    }

    /// Clears the transaction handlers so JS never calls a dropped closure.
    struct DetachTransactionHandlers<'a>(&'a web_sys::IdbTransaction);
    impl Drop for DetachTransactionHandlers<'_> {
        fn drop(&mut self) {
            self.0.set_oncomplete(None);
            self.0.set_onerror(None);
            self.0.set_onabort(None);
        }
    }

    /// Closes the connection on every exit path after the transaction phase has started.
    struct CloseOnDrop<'a>(&'a web_sys::IdbDatabase);
    impl Drop for CloseOnDrop<'_> {
        fn drop(&mut self) {
            self.0.close();
            log::debug!("Closed IndexedDB connection after transaction completion");
        }
    }

    /// Releases the IndexedDB transaction slot when dropped.
    struct SlotGuard;
    impl Drop for SlotGuard {
        fn drop(&mut self) {
            super::indexeddb_queue::release_indexeddb_slot();
            web_sys::console::log_1(&"[GUARD] Released IndexedDB slot via guard".into());
        }
    }

    /// Queues one `put`; on a synchronous failure the transaction is aborted so nothing
    /// partial is committed.
    fn queue_put(
        transaction: &web_sys::IdbTransaction,
        store: &web_sys::IdbObjectStore,
        value: &wasm_bindgen::JsValue,
        key: &str,
    ) -> Result<(), DatabaseError> {
        match store.put_with_key(value, &wasm_bindgen::JsValue::from_str(key)) {
            Ok(_) => Ok(()),
            Err(e) => {
                let _ = transaction.abort();
                Err(DatabaseError::new(
                    "INDEXEDDB_ERROR",
                    &format!("Failed to queue write for key '{}': {:?}", key, e),
                ))
            }
        }
    }

    let mutex = INDEXEDDB_MUTEX.with(|m| m.borrow().clone());
    log::debug!("PERSIST: Acquiring IndexedDB mutex...");
    let _guard = mutex.lock().await;
    log::debug!("PERSIST: Mutex acquired, proceeding with persistence");

    log::debug!("persist_to_indexeddb_event_based starting");

    let open_req = open_indexeddb("block_storage", 2)?;
    log::info!("Created open request for block_storage version 2");

    // ---- Open phase: the closures live only for the duration of this block. ----
    let opened = {
        let upgrade_closure = Closure::wrap(Box::new(move |event: web_sys::Event| {
            log::debug!("IndexedDB upgrade handler called");

            match (|| -> Result<(), Box<dyn std::error::Error>> {
                let target = event.target().ok_or("No event target")?;
                log::debug!("Got event target in upgrade handler");

                let request: web_sys::IdbOpenDbRequest = target
                    .dyn_into()
                    .map_err(|_| "Failed to cast to IdbOpenDbRequest")?;
                log::debug!("Cast to IdbOpenDbRequest in upgrade handler");

                let result = request
                    .result()
                    .map_err(|_| "Failed to get result from request")?;
                log::debug!("Got result from request in upgrade handler");

                let db: web_sys::IdbDatabase = result
                    .dyn_into()
                    .map_err(|_| "Failed to cast result to IdbDatabase")?;
                log::debug!("Cast result to IdbDatabase in upgrade handler");

                if !db.object_store_names().contains("blocks") {
                    db.create_object_store("blocks")
                        .map_err(|_| "Failed to create blocks store")?;
                    log::info!("Created blocks object store");
                }
                if !db.object_store_names().contains("metadata") {
                    db.create_object_store("metadata")
                        .map_err(|_| "Failed to create metadata store")?;
                    log::info!("Created metadata object store");
                }

                log::info!("Upgrade handler completed successfully");
                Ok(())
            })() {
                Ok(_) => {}
                Err(e) => {
                    log::error!("Upgrade handler error: {}", e);
                }
            }
        }) as Box<dyn FnMut(_)>);

        let (open_tx, open_rx) = oneshot::channel::<Result<web_sys::IdbDatabase, String>>();
        let open_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(open_tx)));

        let success_closure = {
            let open_tx = open_tx.clone();
            Closure::wrap(Box::new(move |event: web_sys::Event| {
                log::info!("IndexedDB open success handler called");
                let sender = match open_tx.borrow_mut().take() {
                    Some(sender) => sender,
                    None => {
                        log::debug!("No sender available in RefCell");
                        return;
                    }
                };
                log::debug!("Got sender from RefCell");

                // Any unexpected event shape is reported through the channel, not a panic.
                let opened = (|| -> Result<web_sys::IdbDatabase, String> {
                    let target = event
                        .target()
                        .ok_or_else(|| "No event target".to_string())?;
                    log::debug!("Got event target");
                    let request: web_sys::IdbOpenDbRequest = target
                        .dyn_into()
                        .map_err(|_| "Failed to cast to IdbOpenDbRequest".to_string())?;
                    log::debug!("Cast to IdbOpenDbRequest");
                    let result = request
                        .result()
                        .map_err(|e| format!("Failed to get result from request: {:?}", e))?;
                    log::debug!("Got result from request");
                    let db: web_sys::IdbDatabase = result
                        .dyn_into()
                        .map_err(|_| "Failed to cast result to IdbDatabase".to_string())?;
                    log::debug!("Cast result to IdbDatabase");
                    Ok(db)
                })();

                log::debug!("Sending database to channel");
                let send_result = sender.send(opened);
                log::debug!("Channel send result: {:?}", send_result.is_ok());
            }) as Box<dyn FnMut(_)>)
        };

        let error_closure = {
            let open_tx = open_tx.clone();
            Closure::wrap(Box::new(move |_event: web_sys::Event| {
                log::error!("IndexedDB open error handler called");
                if let Some(sender) = open_tx.borrow_mut().take() {
                    let _ = sender.send(Err("Failed to open IndexedDB".to_string()));
                }
            }) as Box<dyn FnMut(_)>)
        };

        open_req.set_onupgradeneeded(Some(upgrade_closure.as_ref().unchecked_ref()));
        open_req.set_onsuccess(Some(success_closure.as_ref().unchecked_ref()));
        open_req.set_onerror(Some(error_closure.as_ref().unchecked_ref()));
        // Declared after the closures so it is dropped (and detaches them) first.
        let _detach = DetachOpenHandlers(&open_req);

        log::debug!("About to await open_rx channel");
        let awaited = open_rx.await;
        awaited
    };

    let db = match opened {
        Ok(Ok(db)) => {
            log::info!("Successfully received database from channel");
            db
        }
        Ok(Err(e)) => {
            log::error!("Database open error: {}", e);
            return Err(DatabaseError::new("INDEXEDDB_ERROR", &e));
        }
        Err(_) => {
            log::error!("Channel error while waiting for database");
            return Err(DatabaseError::new("INDEXEDDB_ERROR", "Channel error"));
        }
    };

    log::debug!("Starting IndexedDB transaction");

    let store_names_list = db.object_store_names();
    log::debug!("Available object stores: {}", store_names_list.length());
    for i in 0..store_names_list.length() {
        if let Some(name) = store_names_list.get(i) {
            log::debug!("Store {}: {:?}", i, name);
        }
    }

    if !store_names_list.contains("blocks") || !store_names_list.contains("metadata") {
        log::debug!("Required object stores missing, cannot create transaction");
        db.close();
        return Err(DatabaseError::new(
            "INDEXEDDB_ERROR",
            "Required object stores not found",
        ));
    }

    super::indexeddb_queue::acquire_indexeddb_slot().await;
    log::info!("Acquired IndexedDB transaction slot");
    let _slot_guard = SlotGuard;
    // Declared after the slot guard so the connection closes before the slot is released.
    let _close_guard = CloseOnDrop(&db);

    let store_names = js_sys::Array::new();
    store_names.push(&"blocks".into());
    store_names.push(&"metadata".into());
    let transaction = db
        .transaction_with_str_sequence_and_mode(
            &store_names,
            web_sys::IdbTransactionMode::Readwrite,
        )
        .map_err(|e| {
            DatabaseError::new(
                "TRANSACTION_ERROR",
                &format!("Failed to create transaction: {:?}", e),
            )
        })?;
    log::info!("Created IndexedDB transaction");

    let blocks_store = transaction.object_store("blocks").map_err(|e| {
        DatabaseError::new(
            "STORE_ERROR",
            &format!("Failed to access blocks store: {:?}", e),
        )
    })?;
    let metadata_store = transaction.object_store("metadata").map_err(|e| {
        DatabaseError::new(
            "STORE_ERROR",
            &format!("Failed to access metadata store: {:?}", e),
        )
    })?;

    // First metadata entry per block id wins, matching a front-to-back search of `metadata`.
    let mut version_by_block: HashMap<u64, u64> = HashMap::with_capacity(metadata.len());
    for (block_id, version) in &metadata {
        version_by_block.entry(*block_id).or_insert(*version);
    }

    for (block_id, block_data) in &blocks {
        // NOTE(review): blocks without a metadata entry are not written; confirm this is intended.
        let version = match version_by_block.get(block_id) {
            Some(version) => version,
            None => continue,
        };
        let key = format!("{}:{}:{}", db_name, block_id, version);
        let value = js_sys::Uint8Array::from(&block_data[..]);
        log::debug!("Storing block with idempotent key: {}", key);
        web_sys::console::log_1(
            &format!("[PERSIST] Writing block to IndexedDB with key: {}", key).into(),
        );
        queue_put(&transaction, &blocks_store, &value, &key)?;
    }

    for (block_id, version) in &metadata {
        let key = format!("{}:{}:{}", db_name, block_id, version);
        let value = js_sys::Number::from(*version as f64);
        log::debug!("Storing metadata with idempotent key: {}", key);
        queue_put(&transaction, &metadata_store, &value, &key)?;
    }

    let commit_key = format!("{}:commit_marker", db_name);
    let commit_value = js_sys::Number::from(commit_marker as f64);
    queue_put(&transaction, &metadata_store, &commit_value, &commit_key)?;

    // ---- Wait for the transaction to finish (complete, error or abort). ----
    let (tx_tx, tx_rx) = oneshot::channel::<Result<(), String>>();
    let tx_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx_tx)));

    let complete_closure = {
        let tx_tx = tx_tx.clone();
        Closure::wrap(Box::new(move |_event: web_sys::Event| {
            if let Some(sender) = tx_tx.borrow_mut().take() {
                let _ = sender.send(Ok(()));
            }
        }) as Box<dyn FnMut(_)>)
    };

    let tx_error_closure = {
        let tx_tx = tx_tx.clone();
        Closure::wrap(Box::new(move |_event: web_sys::Event| {
            if let Some(sender) = tx_tx.borrow_mut().take() {
                let _ = sender.send(Err("Transaction failed".to_string()));
            }
        }) as Box<dyn FnMut(_)>)
    };

    let tx_abort_closure = {
        let tx_tx = tx_tx.clone();
        Closure::wrap(Box::new(move |_event: web_sys::Event| {
            if let Some(sender) = tx_tx.borrow_mut().take() {
                let _ = sender.send(Err("Transaction aborted".to_string()));
            }
        }) as Box<dyn FnMut(_)>)
    };

    transaction.set_oncomplete(Some(complete_closure.as_ref().unchecked_ref()));
    transaction.set_onerror(Some(tx_error_closure.as_ref().unchecked_ref()));
    transaction.set_onabort(Some(tx_abort_closure.as_ref().unchecked_ref()));
    // Declared after the closures so it is dropped (and detaches them) first.
    let _detach_tx = DetachTransactionHandlers(&transaction);

    let result = match tx_rx.await {
        Ok(Ok(())) => {
            log::info!("IndexedDB persistence completed successfully");
            Ok(())
        }
        Ok(Err(e)) => Err(DatabaseError::new("INDEXEDDB_ERROR", &e)),
        Err(_) => Err(DatabaseError::new("INDEXEDDB_ERROR", "Channel error")),
    };

    // Guards run in reverse declaration order on return: detach transaction handlers, close
    // the connection, release the slot, then unlock the mutex.
    result
}
1098
/// Commits the in-memory block cache of `storage` and persists it to IndexedDB.
///
/// Steps, in order:
/// 1. Reads the current commit marker for `storage.db_name` (0 when absent) and
///    derives the next one.
/// 2. Snapshots the cache and selects the blocks whose data must be written: a
///    block is selected when the global storage holds no copy of it, or when the
///    cached copy has strictly more non-zero bytes than the committed copy.
/// 3. Updates the global storage, the global metadata and the global commit
///    marker.
/// 4. If at least one block was selected, awaits
///    `persist_to_indexeddb_event_based`.
/// 5. Clears the dirty-block set.
///
/// Every cached block receives a metadata entry at the new commit version,
/// whether or not its data was selected in step 2.
/// NOTE(review): confirm that readers tolerate a metadata version for which no
/// block data was written under the same version.
///
/// # Errors
///
/// Returns the `DatabaseError` produced by the IndexedDB persistence step. The
/// global state has already been updated at that point and the dirty-block set
/// is left untouched.
///
/// # Panics
///
/// Panics if `storage.cache` or `storage.dirty_blocks` is already borrowed
/// (see `lock_mutex!`).
#[cfg(target_arch = "wasm32")]
pub async fn sync_async(storage: &BlockStorage) -> Result<(), DatabaseError> {
    /// Renders the first eight bytes of `bytes` as space-separated hex pairs.
    fn hex_preview(bytes: &[u8]) -> String {
        bytes
            .iter()
            .take(8)
            .map(|b| format!("{:02x}", b))
            .collect::<Vec<_>>()
            .join(" ")
    }

    log::debug!("Using ASYNC sync_async method");
    let current_commit = vfs_sync::with_global_commit_marker(|cm| {
        cm.borrow().get(&storage.db_name).copied().unwrap_or(0)
    });

    let next_commit = current_commit + 1;
    log::debug!(
        "Current commit marker for {}: {}",
        storage.db_name,
        current_commit
    );
    log::debug!(
        "Next commit marker for {}: {}",
        storage.db_name,
        next_commit
    );

    // Copy the cache out first so the RefCell borrow is released before any
    // global state is touched.
    let cache_snapshot: Vec<(u64, Vec<u8>)> = lock_mutex!(storage.cache)
        .iter()
        .map(|(k, v)| (*k, v.clone()))
        .collect();

    let mut to_persist = Vec::new();
    let mut metadata_to_persist = Vec::with_capacity(cache_snapshot.len());

    for (block_id, block_data) in cache_snapshot {
        let should_update = vfs_sync::with_global_storage(|storage_global| {
            let committed_dbs = storage_global.borrow();
            let committed = committed_dbs
                .get(&storage.db_name)
                .and_then(|db_storage| db_storage.get(&block_id));

            match committed {
                // Nothing committed yet for this block (or database): always persist.
                None => true,
                Some(existing_data) => {
                    let existing_non_zero = existing_data.iter().filter(|&&b| b != 0).count();
                    let cache_non_zero = block_data.iter().filter(|&&b| b != 0).count();
                    // The committed copy wins unless the cache holds more non-zero bytes.
                    let cache_is_richer = cache_non_zero > existing_non_zero;

                    if cache_is_richer {
                        web_sys::console::log_1(&format!(
                            "DEBUG: SYNC updating committed block {} with richer cache data - existing: {}, cache: {}",
                            block_id,
                            hex_preview(existing_data),
                            hex_preview(&block_data)
                        ).into());
                    } else {
                        web_sys::console::log_1(&format!(
                            "DEBUG: SYNC preserving committed block {} - existing: {}, cache: {} - SKIPPING",
                            block_id,
                            hex_preview(existing_data),
                            hex_preview(&block_data)
                        ).into());
                    }

                    cache_is_richer
                }
            }
        });

        if should_update {
            // The snapshot entry is owned, so the buffer is moved rather than cloned.
            to_persist.push((block_id, block_data));
        }

        metadata_to_persist.push((block_id, next_commit));
        log::debug!(
            "SYNC updating metadata for block {} to version {}",
            block_id,
            next_commit
        );
    }

    vfs_sync::with_global_storage(|storage_global| {
        let mut guard = storage_global.borrow_mut();
        let db_storage = guard.entry(storage.db_name.clone()).or_default();
        for (block_id, block_data) in &to_persist {
            // `to_persist` is still needed for the IndexedDB write below.
            db_storage.insert(*block_id, block_data.clone());
        }
    });

    vfs_sync::with_global_metadata(|metadata| {
        let mut guard = metadata.borrow_mut();
        let db_metadata = guard.entry(storage.db_name.clone()).or_default();
        for (block_id, version) in &metadata_to_persist {
            db_metadata.insert(
                *block_id,
                BlockMetadataPersist {
                    // Narrowing cast: the stored version field is 32 bits wide.
                    version: *version as u32,
                    checksum: 0,
                    algo: ChecksumAlgorithm::FastHash,
                    last_modified_ms: js_sys::Date::now() as u64,
                },
            );
        }
    });

    vfs_sync::with_global_commit_marker(|cm| {
        cm.borrow_mut()
            .insert(storage.db_name.clone(), next_commit);
    });

    if !to_persist.is_empty() {
        log::debug!(
            "Awaiting IndexedDB persistence for {} blocks",
            to_persist.len()
        );
        persist_to_indexeddb_event_based(
            &storage.db_name,
            to_persist,
            metadata_to_persist,
            next_commit,
            #[cfg(feature = "telemetry")]
            None,
            #[cfg(feature = "telemetry")]
            None,
        )
        .await?;
    }

    // Only reached when persistence succeeded (or there was nothing to write).
    lock_mutex!(storage.dirty_blocks).clear();

    Ok(())
}
1247
/// Persists `blocks` and their `metadata` for `db_name` by delegating to
/// `persist_to_indexeddb_event_based`.
///
/// The map is first flattened into a list of `(block_id, data)` pairs; the
/// commit-marker argument passed to the event-based routine is fixed at `0`.
///
/// NOTE(review): the event-based routine stores its commit-marker argument
/// under the `{db_name}:commit_marker` key, so this path appears to write a
/// marker of 0 — confirm that this is intended.
///
/// # Errors
///
/// Returns whatever `DatabaseError` the event-based routine reports.
#[cfg(target_arch = "wasm32")]
pub async fn persist_to_indexeddb(
    db_name: &str,
    blocks: std::collections::HashMap<u64, Vec<u8>>,
    metadata: Vec<(u64, u64)>,
) -> Result<(), DatabaseError> {
    log::debug!("persist_to_indexeddb called for {} blocks", blocks.len());

    // Flatten the map into the pair list the event-based routine expects.
    let mut block_entries: Vec<(u64, Vec<u8>)> = Vec::with_capacity(blocks.len());
    block_entries.extend(blocks);
    log::debug!(
        "Converted HashMap to Vec, now have {} block entries",
        block_entries.len()
    );

    log::debug!("About to call persist_to_indexeddb_event_based");

    let outcome = persist_to_indexeddb_event_based(
        db_name,
        block_entries,
        metadata,
        0,
        #[cfg(feature = "telemetry")]
        None,
        #[cfg(feature = "telemetry")]
        None,
    )
    .await;

    let succeeded = outcome.is_ok();
    log::debug!(
        "persist_to_indexeddb_event_based completed with result: {:?}",
        succeeded
    );

    outcome
}
1288
/// Deletes every stored version of the given blocks (data and metadata) that
/// belong to `db_name` from the `block_storage` IndexedDB database.
///
/// Keys in both object stores have the form `{db_name}:{block_id}:{version}`,
/// so each block is removed by sweeping a cursor over the key range
/// `[{db_name}:{block_id}:, {db_name}:{block_id}:\u{FFFF}]` in the `blocks`
/// store and then in the `metadata` store. All sweeps share one read-write
/// transaction, which is entered only after an IndexedDB slot has been
/// acquired; the slot is released again on every exit path.
///
/// Returns `Ok(())` immediately when `block_ids` is empty.
///
/// Resource handling:
/// * Event-handler closures are kept alive only while their request or
///   transaction is in flight, then detached and dropped instead of leaked.
/// * Completion, error and abort handlers are installed on the transaction
///   before the first request is issued, so a failure during a sweep is
///   reported instead of leaving the future pending forever.
/// * The database connection is closed on every exit path.
///
/// # Errors
///
/// Returns an `INDEXEDDB_ERROR` `DatabaseError` when the database cannot be
/// opened, the transaction, object stores, key range or cursors cannot be
/// created, a cursor step fails, or the transaction errors or aborts. When a
/// sweep fails the transaction is aborted so no partial deletion is committed.
#[cfg(target_arch = "wasm32")]
pub async fn delete_blocks_from_indexeddb(
    db_name: &str,
    block_ids: &[u64],
) -> Result<(), DatabaseError> {
    use futures::channel::oneshot;
    use wasm_bindgen::JsCast;
    use wasm_bindgen::closure::Closure;

    /// Releases the IndexedDB transaction slot when it goes out of scope.
    struct SlotGuard;
    impl Drop for SlotGuard {
        fn drop(&mut self) {
            super::indexeddb_queue::release_indexeddb_slot();
            web_sys::console::log_1(&"[GUARD] Released IndexedDB slot via guard (delete)".into());
        }
    }

    /// Closes the borrowed database connection when it goes out of scope.
    struct CloseOnDrop<'a>(&'a web_sys::IdbDatabase);
    impl Drop for CloseOnDrop<'_> {
        fn drop(&mut self) {
            self.0.close();
            log::debug!("Closed IndexedDB connection after delete");
        }
    }

    /// Deletes every record visited by the cursor behind `request` and resolves
    /// once the cursor is exhausted or a step fails.
    async fn drain_cursor(request: web_sys::IdbRequest, log_keys: bool) -> Result<(), String> {
        use futures::channel::oneshot;
        use wasm_bindgen::JsCast;
        use wasm_bindgen::closure::Closure;

        let (done_tx, done_rx) = oneshot::channel::<Result<(), String>>();
        let done_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(done_tx)));

        let on_success = {
            let done_tx = done_tx.clone();
            Closure::wrap(Box::new(move |event: web_sys::Event| {
                // Ok(true): another success event will follow; Ok(false): cursor exhausted.
                let step = (|| -> Result<bool, String> {
                    let target = event.target().ok_or("Cursor event has no target")?;
                    let source: web_sys::IdbRequest = target.unchecked_into();
                    let result = source
                        .result()
                        .map_err(|_| "Cursor request has no result")?;
                    if result.is_null() || result.is_undefined() {
                        return Ok(false);
                    }
                    let cursor: web_sys::IdbCursorWithValue = result.unchecked_into();

                    if log_keys {
                        if let Ok(key) = cursor.key() {
                            if let Some(key_str) = key.as_string() {
                                web_sys::console::log_1(
                                    &format!("[DELETE] Deleting key: {}", key_str).into(),
                                );
                            }
                        }
                    }

                    cursor
                        .delete()
                        .map_err(|_| "Failed to delete record under cursor")?;
                    cursor
                        .continue_()
                        .map_err(|_| "Failed to advance deletion cursor")?;
                    Ok(true)
                })();

                let finished = match step {
                    Ok(true) => None,
                    Ok(false) => Some(Ok(())),
                    Err(e) => Some(Err(e)),
                };
                if let Some(outcome) = finished {
                    if let Some(sender) = done_tx.borrow_mut().take() {
                        let _ = sender.send(outcome);
                    }
                }
            }) as Box<dyn FnMut(_)>)
        };

        let on_error = {
            let done_tx = done_tx.clone();
            Closure::wrap(Box::new(move |_event: web_sys::Event| {
                if let Some(sender) = done_tx.borrow_mut().take() {
                    let _ = sender.send(Err("Cursor request failed during deletion".to_string()));
                }
            }) as Box<dyn FnMut(_)>)
        };

        request.set_onsuccess(Some(on_success.as_ref().unchecked_ref()));
        request.set_onerror(Some(on_error.as_ref().unchecked_ref()));

        let outcome = done_rx.await;

        // The request is finished: detach the handlers before freeing them.
        request.set_onsuccess(None);
        request.set_onerror(None);
        drop(on_success);
        drop(on_error);

        match outcome {
            Ok(result) => result,
            Err(_) => Err("Channel error during cursor deletion".to_string()),
        }
    }

    if block_ids.is_empty() {
        return Ok(());
    }

    log::debug!(
        "delete_blocks_from_indexeddb - deleting {} blocks for {}",
        block_ids.len(),
        db_name
    );

    let open_req = open_indexeddb("block_storage", 2)?;

    // Creates the object stores when the database is new or being upgraded.
    let upgrade_closure = Closure::wrap(Box::new(move |event: web_sys::Event| {
        log::debug!("IndexedDB upgrade handler called during delete");

        let upgraded = (|| -> Result<(), Box<dyn std::error::Error>> {
            let target = event.target().ok_or("No event target")?;
            let request: web_sys::IdbOpenDbRequest = target
                .dyn_into()
                .map_err(|_| "Failed to cast to IdbOpenDbRequest")?;
            let result = request
                .result()
                .map_err(|_| "Failed to get result from request")?;
            let db: web_sys::IdbDatabase = result
                .dyn_into()
                .map_err(|_| "Failed to cast result to IdbDatabase")?;

            if !db.object_store_names().contains("blocks") {
                db.create_object_store("blocks")
                    .map_err(|_| "Failed to create blocks store")?;
            }
            if !db.object_store_names().contains("metadata") {
                db.create_object_store("metadata")
                    .map_err(|_| "Failed to create metadata store")?;
            }

            Ok(())
        })();
        if let Err(e) = upgraded {
            log::error!("Upgrade handler error: {}", e);
        }
    }) as Box<dyn FnMut(_)>);
    open_req.set_onupgradeneeded(Some(upgrade_closure.as_ref().unchecked_ref()));

    let (open_tx, open_rx) = oneshot::channel::<Result<web_sys::IdbDatabase, String>>();
    let open_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(open_tx)));

    let success_closure = {
        let open_tx = open_tx.clone();
        Closure::wrap(Box::new(move |event: web_sys::Event| {
            if let Some(sender) = open_tx.borrow_mut().take() {
                // Report malformed events through the channel instead of panicking
                // inside a JS callback.
                let opened = event
                    .target()
                    .ok_or_else(|| "Open success event has no target".to_string())
                    .and_then(|target| {
                        target
                            .dyn_into::<web_sys::IdbOpenDbRequest>()
                            .map_err(|_| "Open event target is not an IdbOpenDbRequest".to_string())
                    })
                    .and_then(|request| {
                        request
                            .result()
                            .map_err(|_| "Open request has no result".to_string())
                    })
                    .and_then(|result| {
                        result
                            .dyn_into::<web_sys::IdbDatabase>()
                            .map_err(|_| "Open result is not an IdbDatabase".to_string())
                    });
                let _ = sender.send(opened);
            }
        }) as Box<dyn FnMut(_)>)
    };

    let error_closure = {
        let open_tx = open_tx.clone();
        Closure::wrap(Box::new(move |_event: web_sys::Event| {
            if let Some(sender) = open_tx.borrow_mut().take() {
                let _ = sender.send(Err("Failed to open IndexedDB".to_string()));
            }
        }) as Box<dyn FnMut(_)>)
    };

    open_req.set_onsuccess(Some(success_closure.as_ref().unchecked_ref()));
    open_req.set_onerror(Some(error_closure.as_ref().unchecked_ref()));

    let opened = open_rx.await;

    // The open request has settled: detach and free its handlers.
    open_req.set_onupgradeneeded(None);
    open_req.set_onsuccess(None);
    open_req.set_onerror(None);
    drop(upgrade_closure);
    drop(success_closure);
    drop(error_closure);

    let db = match opened {
        Ok(Ok(db)) => db,
        Ok(Err(e)) => return Err(DatabaseError::new("INDEXEDDB_ERROR", &e)),
        Err(_) => return Err(DatabaseError::new("INDEXEDDB_ERROR", "Channel error")),
    };
    let _close_guard = CloseOnDrop(&db);

    super::indexeddb_queue::acquire_indexeddb_slot().await;
    log::info!("Acquired IndexedDB transaction slot for delete");
    let _slot_guard = SlotGuard;

    let store_names = js_sys::Array::new();
    store_names.push(&"blocks".into());
    store_names.push(&"metadata".into());
    let transaction = db
        .transaction_with_str_sequence_and_mode(
            &store_names,
            web_sys::IdbTransactionMode::Readwrite,
        )
        .map_err(|_| {
            DatabaseError::new("INDEXEDDB_ERROR", "Failed to create delete transaction")
        })?;

    // Install the transaction handlers before issuing any request so that an
    // error or abort raised during the sweeps cannot be missed.
    let (tx_tx, tx_rx) = oneshot::channel::<Result<(), String>>();
    let tx_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx_tx)));

    let complete_closure = {
        let tx_tx = tx_tx.clone();
        Closure::wrap(Box::new(move |_event: web_sys::Event| {
            if let Some(sender) = tx_tx.borrow_mut().take() {
                let _ = sender.send(Ok(()));
            }
        }) as Box<dyn FnMut(_)>)
    };

    let tx_error_closure = {
        let tx_tx = tx_tx.clone();
        Closure::wrap(Box::new(move |_event: web_sys::Event| {
            if let Some(sender) = tx_tx.borrow_mut().take() {
                let _ = sender.send(Err("Delete transaction failed".to_string()));
            }
        }) as Box<dyn FnMut(_)>)
    };

    transaction.set_oncomplete(Some(complete_closure.as_ref().unchecked_ref()));
    transaction.set_onerror(Some(tx_error_closure.as_ref().unchecked_ref()));
    transaction.set_onabort(Some(tx_error_closure.as_ref().unchecked_ref()));

    // Run the sweeps inside one async block so every failure funnels through
    // the same cleanup below.
    let sweep = async {
        let blocks_store = transaction
            .object_store("blocks")
            .map_err(|_| DatabaseError::new("INDEXEDDB_ERROR", "Failed to get blocks store"))?;
        let metadata_store = transaction
            .object_store("metadata")
            .map_err(|_| DatabaseError::new("INDEXEDDB_ERROR", "Failed to get metadata store"))?;

        for block_id in block_ids {
            // Every version of this block shares the `{db}:{block}:` key prefix.
            let key_prefix_start = format!("{}:{}:", db_name, block_id);
            let key_prefix_end = format!("{}:{}:\u{FFFF}", db_name, block_id);

            let key_range = web_sys::IdbKeyRange::bound(
                &wasm_bindgen::JsValue::from_str(&key_prefix_start),
                &wasm_bindgen::JsValue::from_str(&key_prefix_end),
            )
            .map_err(|_| {
                DatabaseError::new("INDEXEDDB_ERROR", "Failed to create key range for deletion")
            })?;

            let blocks_cursor_req = blocks_store
                .open_cursor_with_range(&key_range)
                .map_err(|_| {
                    DatabaseError::new("INDEXEDDB_ERROR", "Failed to open cursor for deletion")
                })?;
            drain_cursor(blocks_cursor_req, true)
                .await
                .map_err(|e| DatabaseError::new("INDEXEDDB_ERROR", &e))?;

            let metadata_cursor_req = metadata_store
                .open_cursor_with_range(&key_range)
                .map_err(|_| {
                    DatabaseError::new(
                        "INDEXEDDB_ERROR",
                        "Failed to open metadata cursor for deletion",
                    )
                })?;
            drain_cursor(metadata_cursor_req, false)
                .await
                .map_err(|e| DatabaseError::new("INDEXEDDB_ERROR", &e))?;

            log::debug!("Deleted block {} (all versions) from IndexedDB", block_id);
        }

        Ok::<(), DatabaseError>(())
    }
    .await;

    let outcome = match sweep {
        Ok(()) => match tx_rx.await {
            Ok(Ok(())) => {
                log::info!(
                    "Successfully deleted {} blocks from IndexedDB",
                    block_ids.len()
                );
                Ok(())
            }
            Ok(Err(e)) => Err(DatabaseError::new("INDEXEDDB_ERROR", &e)),
            Err(_) => Err(DatabaseError::new(
                "INDEXEDDB_ERROR",
                "Channel error during deletion",
            )),
        },
        Err(e) => {
            // Do not commit a partial deletion; abort() fails harmlessly if the
            // transaction has already finished.
            let _ = transaction.abort();
            Err(e)
        }
    };

    // Detach before dropping so a late event cannot call a freed closure.
    transaction.set_oncomplete(None);
    transaction.set_onerror(None);
    transaction.set_onabort(None);
    drop(complete_closure);
    drop(tx_error_closure);

    outcome
}
1575
/// Deletes every IndexedDB record that belongs to `db_name` from the
/// `block_storage` database: all block data, all block metadata and the commit
/// marker.
///
/// Both object stores are swept with a cursor over the key range
/// `[{db_name}:, {db_name}:\u{FFFF}]` inside a single read-write transaction,
/// which is entered only after an IndexedDB slot has been acquired; the slot is
/// released again on every exit path.
///
/// A cursor that fails part-way through a sweep is logged and counted as zero
/// deletions; the failure then surfaces through the transaction outcome.
///
/// The commit marker is written elsewhere in this file under
/// `{db_name}:commit_marker`, so that key is deleted explicitly. The
/// `{db_name}_commit_marker` spelling this function previously targeted is
/// removed as well, in case such a record exists.
///
/// Resource handling:
/// * Event-handler closures are kept alive only while their request or
///   transaction is in flight, then detached and dropped instead of leaked.
/// * Completion, error and abort handlers are installed on the transaction
///   before the first request is issued, so a failure during a sweep is
///   reported instead of leaving the future pending forever.
/// * The database connection is closed on every exit path.
///
/// # Errors
///
/// Returns an `INDEXEDDB_ERROR` `DatabaseError` when the database cannot be
/// opened, the transaction, object stores, key range or cursors cannot be
/// created, or the transaction errors or aborts.
#[cfg(target_arch = "wasm32")]
pub async fn delete_all_database_blocks_from_indexeddb(db_name: &str) -> Result<(), DatabaseError> {
    use futures::channel::oneshot;
    use wasm_bindgen::JsCast;
    use wasm_bindgen::closure::Closure;

    /// Releases the IndexedDB transaction slot when it goes out of scope.
    struct SlotGuard;
    impl Drop for SlotGuard {
        fn drop(&mut self) {
            super::indexeddb_queue::release_indexeddb_slot();
            web_sys::console::log_1(&"[DELETE_ALL] Released IndexedDB slot via guard".into());
        }
    }

    /// Closes the borrowed database connection when it goes out of scope.
    struct CloseOnDrop<'a>(&'a web_sys::IdbDatabase);
    impl Drop for CloseOnDrop<'_> {
        fn drop(&mut self) {
            self.0.close();
            log::debug!("[DELETE_ALL] Closed IndexedDB connection");
        }
    }

    /// Deletes every record visited by the cursor behind `request` and resolves
    /// with the number of deletions once the cursor is exhausted or a step
    /// fails. `key_label` names the record kind in the per-key console log.
    async fn drain_cursor(
        request: web_sys::IdbRequest,
        key_label: &'static str,
    ) -> Result<u32, String> {
        use futures::channel::oneshot;
        use wasm_bindgen::JsCast;
        use wasm_bindgen::closure::Closure;

        let (done_tx, done_rx) = oneshot::channel::<Result<u32, String>>();
        let done_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(done_tx)));
        let deleted = std::rc::Rc::new(std::cell::Cell::new(0u32));

        let on_success = {
            let done_tx = done_tx.clone();
            let deleted = deleted.clone();
            Closure::wrap(Box::new(move |event: web_sys::Event| {
                // Ok(true): another success event will follow; Ok(false): cursor exhausted.
                let step = (|| -> Result<bool, String> {
                    let target = event.target().ok_or("Cursor event has no target")?;
                    let source: web_sys::IdbRequest = target.unchecked_into();
                    let result = source
                        .result()
                        .map_err(|_| "Cursor request has no result")?;
                    if result.is_null() || result.is_undefined() {
                        return Ok(false);
                    }
                    let cursor: web_sys::IdbCursorWithValue = result.unchecked_into();

                    if let Ok(key) = cursor.key() {
                        if let Some(key_str) = key.as_string() {
                            web_sys::console::log_1(
                                &format!("[DELETE_ALL] Deleting {} key: {}", key_label, key_str)
                                    .into(),
                            );
                        }
                    }

                    cursor
                        .delete()
                        .map_err(|_| "Failed to delete record under cursor")?;
                    deleted.set(deleted.get() + 1);
                    cursor
                        .continue_()
                        .map_err(|_| "Failed to advance deletion cursor")?;
                    Ok(true)
                })();

                let finished = match step {
                    Ok(true) => None,
                    Ok(false) => Some(Ok(deleted.get())),
                    Err(e) => Some(Err(e)),
                };
                if let Some(outcome) = finished {
                    if let Some(sender) = done_tx.borrow_mut().take() {
                        let _ = sender.send(outcome);
                    }
                }
            }) as Box<dyn FnMut(_)>)
        };

        let on_error = {
            let done_tx = done_tx.clone();
            Closure::wrap(Box::new(move |_event: web_sys::Event| {
                if let Some(sender) = done_tx.borrow_mut().take() {
                    let _ = sender.send(Err("Cursor request failed".to_string()));
                }
            }) as Box<dyn FnMut(_)>)
        };

        request.set_onsuccess(Some(on_success.as_ref().unchecked_ref()));
        request.set_onerror(Some(on_error.as_ref().unchecked_ref()));

        let outcome = done_rx.await;

        // The request is finished: detach the handlers before freeing them.
        request.set_onsuccess(None);
        request.set_onerror(None);
        drop(on_success);
        drop(on_error);

        match outcome {
            Ok(result) => result,
            Err(_) => Err("Channel error during cursor deletion".to_string()),
        }
    }

    log::info!(
        "[DELETE_ALL] Starting deletion of ALL blocks for database: {}",
        db_name
    );
    web_sys::console::log_1(
        &format!(
            "[DELETE_ALL] Deleting all IndexedDB entries for: {}",
            db_name
        )
        .into(),
    );

    let open_req = open_indexeddb("block_storage", 2)?;

    // Creates the object stores when the database is new or being upgraded.
    let upgrade_closure = Closure::wrap(Box::new(move |event: web_sys::Event| {
        let upgraded = (|| -> Result<(), Box<dyn std::error::Error>> {
            let target = event.target().ok_or("No event target")?;
            let request: web_sys::IdbOpenDbRequest =
                target.dyn_into().map_err(|_| "Cast failed")?;
            let result = request.result().map_err(|_| "No result")?;
            let db: web_sys::IdbDatabase = result
                .dyn_into()
                .map_err(|_| "Cast to IdbDatabase failed")?;

            if !db.object_store_names().contains("blocks") {
                db.create_object_store("blocks")
                    .map_err(|_| "Create blocks store failed")?;
            }
            if !db.object_store_names().contains("metadata") {
                db.create_object_store("metadata")
                    .map_err(|_| "Create metadata store failed")?;
            }
            Ok(())
        })();
        if let Err(e) = upgraded {
            log::error!("[DELETE_ALL] Upgrade handler error: {}", e);
        }
    }) as Box<dyn FnMut(_)>);
    open_req.set_onupgradeneeded(Some(upgrade_closure.as_ref().unchecked_ref()));

    let (open_tx, open_rx) = oneshot::channel::<Result<web_sys::IdbDatabase, String>>();
    let open_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(open_tx)));

    let success_closure = {
        let open_tx = open_tx.clone();
        Closure::wrap(Box::new(move |event: web_sys::Event| {
            if let Some(sender) = open_tx.borrow_mut().take() {
                // Report malformed events through the channel instead of panicking
                // inside a JS callback.
                let opened = event
                    .target()
                    .ok_or_else(|| "Open success event has no target".to_string())
                    .and_then(|target| {
                        target
                            .dyn_into::<web_sys::IdbOpenDbRequest>()
                            .map_err(|_| "Open event target is not an IdbOpenDbRequest".to_string())
                    })
                    .and_then(|request| {
                        request
                            .result()
                            .map_err(|_| "Open request has no result".to_string())
                    })
                    .and_then(|result| {
                        result
                            .dyn_into::<web_sys::IdbDatabase>()
                            .map_err(|_| "Open result is not an IdbDatabase".to_string())
                    });
                let _ = sender.send(opened);
            }
        }) as Box<dyn FnMut(_)>)
    };

    let error_closure = {
        let open_tx = open_tx.clone();
        Closure::wrap(Box::new(move |_event: web_sys::Event| {
            if let Some(sender) = open_tx.borrow_mut().take() {
                let _ = sender.send(Err("Failed to open IndexedDB".to_string()));
            }
        }) as Box<dyn FnMut(_)>)
    };

    open_req.set_onsuccess(Some(success_closure.as_ref().unchecked_ref()));
    open_req.set_onerror(Some(error_closure.as_ref().unchecked_ref()));

    let opened = open_rx.await;

    // The open request has settled: detach and free its handlers.
    open_req.set_onupgradeneeded(None);
    open_req.set_onsuccess(None);
    open_req.set_onerror(None);
    drop(upgrade_closure);
    drop(success_closure);
    drop(error_closure);

    let db = match opened {
        Ok(Ok(db)) => db,
        Ok(Err(e)) => return Err(DatabaseError::new("INDEXEDDB_ERROR", &e)),
        Err(_) => {
            return Err(DatabaseError::new(
                "INDEXEDDB_ERROR",
                "Channel error opening DB",
            ));
        }
    };
    let _close_guard = CloseOnDrop(&db);

    super::indexeddb_queue::acquire_indexeddb_slot().await;
    web_sys::console::log_1(&"[DELETE_ALL] Acquired IndexedDB slot".into());
    let _slot_guard = SlotGuard;

    let store_names = js_sys::Array::new();
    store_names.push(&"blocks".into());
    store_names.push(&"metadata".into());
    let transaction = db
        .transaction_with_str_sequence_and_mode(
            &store_names,
            web_sys::IdbTransactionMode::Readwrite,
        )
        .map_err(|_| DatabaseError::new("INDEXEDDB_ERROR", "Failed to create transaction"))?;

    // Install the transaction handlers before issuing any request so that an
    // error or abort raised during the sweeps cannot be missed.
    let (tx_tx, tx_rx) = oneshot::channel::<Result<(), String>>();
    let tx_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx_tx)));

    let complete_closure = {
        let tx_tx = tx_tx.clone();
        Closure::wrap(Box::new(move |_event: web_sys::Event| {
            if let Some(sender) = tx_tx.borrow_mut().take() {
                let _ = sender.send(Ok(()));
            }
        }) as Box<dyn FnMut(_)>)
    };

    let tx_error_closure = {
        let tx_tx = tx_tx.clone();
        Closure::wrap(Box::new(move |_event: web_sys::Event| {
            if let Some(sender) = tx_tx.borrow_mut().take() {
                let _ = sender.send(Err("Transaction failed".to_string()));
            }
        }) as Box<dyn FnMut(_)>)
    };

    transaction.set_oncomplete(Some(complete_closure.as_ref().unchecked_ref()));
    transaction.set_onerror(Some(tx_error_closure.as_ref().unchecked_ref()));
    transaction.set_onabort(Some(tx_error_closure.as_ref().unchecked_ref()));

    // Run the sweeps inside one async block so every failure funnels through
    // the same cleanup below.
    let sweep = async {
        let blocks_store = transaction
            .object_store("blocks")
            .map_err(|_| DatabaseError::new("INDEXEDDB_ERROR", "Failed to get blocks store"))?;
        let metadata_store = transaction
            .object_store("metadata")
            .map_err(|_| DatabaseError::new("INDEXEDDB_ERROR", "Failed to get metadata store"))?;

        // Every key of this database starts with `{db_name}:`.
        let key_prefix_start = format!("{}:", db_name);
        let key_prefix_end = format!("{}:\u{FFFF}", db_name);

        let key_range = web_sys::IdbKeyRange::bound(
            &wasm_bindgen::JsValue::from_str(&key_prefix_start),
            &wasm_bindgen::JsValue::from_str(&key_prefix_end),
        )
        .map_err(|_| DatabaseError::new("INDEXEDDB_ERROR", "Failed to create key range"))?;

        web_sys::console::log_1(
            &format!(
                "[DELETE_ALL] Key range: {} to {}",
                key_prefix_start, key_prefix_end
            )
            .into(),
        );

        let blocks_cursor_req = blocks_store
            .open_cursor_with_range(&key_range)
            .map_err(|_| DatabaseError::new("INDEXEDDB_ERROR", "Failed to open blocks cursor"))?;
        let blocks_count = match drain_cursor(blocks_cursor_req, "block").await {
            Ok(count) => count,
            Err(e) => {
                log::error!("[DELETE_ALL] Error deleting blocks: {}", e);
                0
            }
        };

        web_sys::console::log_1(&format!("[DELETE_ALL] Deleted {} blocks", blocks_count).into());

        let metadata_cursor_req = metadata_store
            .open_cursor_with_range(&key_range)
            .map_err(|_| DatabaseError::new("INDEXEDDB_ERROR", "Failed to open metadata cursor"))?;
        let meta_count = match drain_cursor(metadata_cursor_req, "metadata").await {
            Ok(count) => count,
            Err(e) => {
                log::error!("[DELETE_ALL] Error deleting metadata: {}", e);
                0
            }
        };

        web_sys::console::log_1(
            &format!("[DELETE_ALL] Deleted {} metadata entries", meta_count).into(),
        );

        // The marker is stored under `{db}:commit_marker`; the underscore form is
        // the key this function used to target and is removed too, just in case.
        let commit_marker_keys = [
            format!("{}:commit_marker", db_name),
            format!("{}_commit_marker", db_name),
        ];
        for commit_marker_key in &commit_marker_keys {
            web_sys::console::log_1(
                &format!("[DELETE_ALL] Deleting commit marker: {}", commit_marker_key).into(),
            );
            // Best effort: a failure here surfaces through the transaction outcome.
            let _ = metadata_store.delete(&wasm_bindgen::JsValue::from_str(commit_marker_key));
        }

        Ok::<(u32, u32), DatabaseError>((blocks_count, meta_count))
    }
    .await;

    let outcome = match sweep {
        Ok((blocks_count, meta_count)) => match tx_rx.await {
            Ok(Ok(())) => {
                log::info!(
                    "[DELETE_ALL] Successfully deleted all IndexedDB data for {}: {} blocks, {} metadata entries",
                    db_name,
                    blocks_count,
                    meta_count
                );
                web_sys::console::log_1(
                    &format!(
                        "[DELETE_ALL] Complete: {} blocks, {} metadata for {}",
                        blocks_count, meta_count, db_name
                    )
                    .into(),
                );
                Ok(())
            }
            Ok(Err(e)) => Err(DatabaseError::new("INDEXEDDB_ERROR", &e)),
            Err(_) => Err(DatabaseError::new(
                "INDEXEDDB_ERROR",
                "Channel error during delete all",
            )),
        },
        Err(e) => {
            // Do not commit a partial deletion; abort() fails harmlessly if the
            // transaction has already finished.
            let _ = transaction.abort();
            Err(e)
        }
    };

    // Detach before dropping so a late event cannot call a freed closure.
    transaction.set_oncomplete(None);
    transaction.set_onerror(None);
    transaction.set_onabort(None);
    drop(complete_closure);
    drop(tx_error_closure);

    outcome
}