/// Acquires exclusive access to a shared container (wasm32 variant).
///
/// Calls `try_borrow_mut()` on the given expression and panics with a
/// reentrancy message if the borrow cannot be obtained.
#[cfg(target_arch = "wasm32")]
macro_rules! lock_mutex {
    ($cell:expr) => {
        $cell
            .try_borrow_mut()
            .expect("RefCell borrow failed - reentrancy detected in io_operations.rs")
    };
}

/// Acquires exclusive access to a shared container (native variant).
///
/// Expands to a plain `.lock()` call on the given expression and yields
/// whatever that call returns.
#[cfg(not(target_arch = "wasm32"))]
macro_rules! lock_mutex {
    ($lock:expr) => {
        $lock.lock()
    };
}
20
/// Attempts to obtain exclusive access without panicking (wasm32 variant).
///
/// Yields `Some(guard)` when `try_borrow_mut()` succeeds and `None` when the
/// container is already borrowed.
#[allow(unused_macros)]
#[cfg(target_arch = "wasm32")]
macro_rules! try_lock_mutex {
    ($cell:expr) => {
        $cell.try_borrow_mut().ok()
    };
}

/// Native counterpart of the wasm32 `try_lock_mutex!`.
///
/// Always yields `Some(..)` wrapping the result of `.lock()`, so callers can
/// use the same `Option`-shaped code on both targets.
#[allow(unused_macros)]
#[cfg(not(target_arch = "wasm32"))]
macro_rules! try_lock_mutex {
    ($lock:expr) => {
        Some($lock.lock())
    };
}
36
/// Attempts to obtain shared (read) access without panicking (wasm32 variant).
///
/// Yields `Some(guard)` when `try_borrow()` succeeds and `None` when the
/// container is currently mutably borrowed.
#[cfg(target_arch = "wasm32")]
macro_rules! try_read_lock {
    ($cell:expr) => {
        $cell.try_borrow().ok()
    };
}

/// Native counterpart of the wasm32 `try_read_lock!`.
///
/// Uses the same `.lock()` call as the write-side macros and always yields
/// `Some(..)`, keeping call sites identical across targets.
#[cfg(not(target_arch = "wasm32"))]
macro_rules! try_read_lock {
    ($lock:expr) => {
        Some($lock.lock())
    };
}
50
51use super::block_storage::{BLOCK_SIZE, BlockStorage};
52use crate::types::DatabaseError;
53#[cfg(not(target_arch = "wasm32"))]
54use std::sync::atomic::Ordering;
55
56#[cfg(any(
57 target_arch = "wasm32",
58 all(not(target_arch = "wasm32"), any(test, debug_assertions))
59))]
60use super::vfs_sync;
61
62#[cfg(target_arch = "wasm32")]
63use super::metadata::{BlockMetadataPersist, ChecksumAlgorithm};
64#[cfg(target_arch = "wasm32")]
65use std::collections::HashMap;
66
67#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
68use std::{fs, io::Read, path::PathBuf};
69
70#[cfg(all(
71 not(target_arch = "wasm32"),
72 any(test, debug_assertions),
73 not(feature = "fs_persist")
74))]
75use super::block_storage::GLOBAL_METADATA_TEST;
76
77pub fn read_block_sync_impl(
79 storage: &BlockStorage,
80 block_id: u64,
81) -> Result<Vec<u8>, DatabaseError> {
82 let cached_data = try_read_lock!(storage.cache).and_then(|cache| cache.get(&block_id).cloned());
86
87 #[cfg(target_arch = "wasm32")]
88 {
89 let cache_hit = cached_data.is_some();
90 web_sys::console::log_1(
91 &format!(
92 "[READ_DEBUG] db={}, block_id={}, cache_hit={}",
93 storage.db_name, block_id, cache_hit
94 )
95 .into(),
96 );
97 }
98
99 if let Some(data) = cached_data {
100 #[cfg(feature = "telemetry")]
102 if let Some(ref metrics) = storage.metrics {
103 metrics.cache_hits().inc();
104 }
105 if block_id != 0 {
108 storage.verify_against_stored_checksum(block_id, &data)?
109 }
110 if let Some(cache) = try_read_lock!(storage.cache) {
113 if cache.len() > (storage.capacity * 4 / 5) {
114 drop(cache); storage.touch_lru(block_id);
116 }
117 }
118
119 return Ok(data);
120 }
121
122 #[cfg(feature = "telemetry")]
124 if let Some(ref metrics) = storage.metrics {
125 metrics.cache_misses().inc();
126 metrics.indexeddb_operations_total().inc();
127 }
128
129 #[cfg(target_arch = "wasm32")]
131 {
132 let (data, is_visible) = vfs_sync::with_global_commit_marker(|cm| {
134 let committed = cm.borrow().get(&storage.db_name).copied().unwrap_or(0);
135
136 if block_id == 0 {
138 let data = vfs_sync::with_global_storage(|gs| {
139 let storage_map = gs.borrow();
140 let result = storage_map
141 .get(&storage.db_name)
142 .and_then(|db_storage| db_storage.get(&block_id))
143 .cloned();
144
145 #[cfg(target_arch = "wasm32")]
146 {
147 let found_in_gs = result.is_some();
148 let gs_has_db = storage_map.contains_key(&storage.db_name);
149 web_sys::console::log_1(&format!(
150 "[READ_DEBUG] GLOBAL_STORAGE lookup: db={}, gs_has_db={}, found_block_0={}",
151 storage.db_name, gs_has_db, found_in_gs
152 ).into());
153 }
154
155 result.unwrap_or_else(|| vec![0; BLOCK_SIZE])
156 });
157 return (data, true);
158 }
159
160 vfs_sync::with_global_metadata(|meta| {
162 let meta_borrow = meta.borrow();
163 let has_metadata = meta_borrow
164 .get(&storage.db_name)
165 .and_then(|db_meta| db_meta.get(&block_id))
166 .is_some();
167
168 if has_metadata {
169 let is_visible = meta_borrow
171 .get(&storage.db_name)
172 .and_then(|db_meta| db_meta.get(&block_id))
173 .map(|m| (m.version as u64) <= committed)
174 .unwrap_or(false);
175
176 if is_visible {
177 let data = vfs_sync::with_global_storage(|gs| {
179 gs.borrow()
180 .get(&storage.db_name)
181 .and_then(|db_storage| db_storage.get(&block_id))
182 .cloned()
183 .unwrap_or_else(|| vec![0; BLOCK_SIZE])
184 });
185 (data, true)
186 } else {
187 (vec![0; BLOCK_SIZE], false)
189 }
190 } else {
191 let data = vfs_sync::with_global_storage(|gs| {
193 gs.borrow()
194 .get(&storage.db_name)
195 .and_then(|db_storage| db_storage.get(&block_id))
196 .cloned()
197 });
198
199 match data {
200 Some(data) => (data, true), None => (vec![0; BLOCK_SIZE], true), }
203 }
204 })
205 });
206
207 if is_visible && block_id != 0 {
210 if let Err(e) = storage.verify_against_stored_checksum(block_id, &data) {
211 return Err(e);
212 }
213 }
214
215 if let Some(mut cache) = try_lock_mutex!(storage.cache) {
218 cache.insert(block_id, data.clone());
219 }
220
221 return Ok(data);
222 }
223
224 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
226 {
227 let base: PathBuf = storage.base_dir.clone();
228 let mut dir = base.clone();
229 dir.push(&storage.db_name);
230 let mut blocks = dir.clone();
231 blocks.push("blocks");
232 let mut block_path = blocks.clone();
233 block_path.push(format!("block_{}.bin", block_id));
234 if lock_mutex!(storage.deallocated_blocks).contains(&block_id) {
236 return Err(DatabaseError::new(
237 "BLOCK_NOT_ALLOCATED",
238 &format!("Block {} is not allocated", block_id),
239 ));
240 }
241 if let Ok(mut f) = fs::File::open(&block_path) {
242 let mut data = vec![0u8; BLOCK_SIZE];
243 f.read_exact(&mut data).map_err(|e| {
244 DatabaseError::new(
245 "IO_ERROR",
246 &format!("read block {} failed: {}", block_id, e),
247 )
248 })?;
249 lock_mutex!(storage.cache).insert(block_id, data.clone());
250 storage.verify_against_stored_checksum(block_id, &data)?;
251 storage.touch_lru(block_id);
252 storage.evict_if_needed();
253 return Ok(data);
254 }
255 let data = vec![0; BLOCK_SIZE];
258 lock_mutex!(storage.cache).insert(block_id, data.clone());
259 storage.verify_against_stored_checksum(block_id, &data)?;
260 storage.touch_lru(block_id);
261 storage.evict_if_needed();
262 Ok(data)
263 }
264
265 #[cfg(all(
267 not(target_arch = "wasm32"),
268 any(test, debug_assertions),
269 not(feature = "fs_persist")
270 ))]
271 {
272 let committed: u64 = vfs_sync::with_global_commit_marker(|cm| {
274 #[cfg(target_arch = "wasm32")]
275 let cm = cm;
276 #[cfg(not(target_arch = "wasm32"))]
277 let cm = cm.borrow();
278 cm.get(&storage.db_name).copied().unwrap_or(0)
279 });
280 let is_visible: bool = GLOBAL_METADATA_TEST.with(|meta| {
281 #[cfg(target_arch = "wasm32")]
282 let meta_map = meta.borrow_mut();
283 #[cfg(not(target_arch = "wasm32"))]
284 let meta_map = meta.lock();
285 if let Some(db_meta) = meta_map.get(&storage.db_name) {
286 if let Some(m) = db_meta.get(&block_id) {
287 return (m.version as u64) <= committed;
288 }
289 }
290 false
291 });
292 let data = if is_visible {
293 vfs_sync::with_global_storage(|gs| {
294 #[cfg(target_arch = "wasm32")]
295 let storage_map = gs;
296 #[cfg(not(target_arch = "wasm32"))]
297 let storage_map = gs.borrow();
298 if let Some(db_storage) = storage_map.get(&storage.db_name) {
299 if let Some(data) = db_storage.get(&block_id) {
300 log::debug!(
301 "[test] Block {} found in global storage (sync, committed visible)",
302 block_id
303 );
304 return data.clone();
305 }
306 }
307 vec![0; BLOCK_SIZE]
308 })
309 } else {
310 log::debug!(
311 "[test] Block {} not visible due to commit gating (committed={}, treating as zeroed)",
312 block_id,
313 committed
314 );
315 vec![0; BLOCK_SIZE]
316 };
317
318 if !lock_mutex!(storage.allocated_blocks).contains(&block_id) && !is_visible {
320 let error = DatabaseError::new(
321 "BLOCK_NOT_FOUND",
322 &format!("Block {} not found in storage", block_id),
323 );
324 storage.observability.record_error(&error);
326 return Err(error);
327 }
328
329 lock_mutex!(storage.cache).insert(block_id, data.clone());
330 log::debug!(
331 "[test] Block {} cached from global storage (sync)",
332 block_id
333 );
334 if is_visible {
336 if let Err(e) = storage.verify_against_stored_checksum(block_id, &data) {
337 log::error!(
338 "[test] Checksum verification failed for block {} (test storage): {}",
339 block_id,
340 e.message
341 );
342 storage.observability.record_error(&e);
343 return Err(e);
344 }
345 }
346 storage.touch_lru(block_id);
347 storage.evict_if_needed();
348 return Ok(data);
349 }
350
351 #[cfg(not(any(
353 target_arch = "wasm32",
354 all(not(target_arch = "wasm32"), feature = "fs_persist"),
355 all(
356 not(target_arch = "wasm32"),
357 any(test, debug_assertions),
358 not(feature = "fs_persist")
359 )
360 )))]
361 unreachable!("No storage backend configured for this build")
362}
363
364#[cfg(target_arch = "wasm32")]
366pub fn write_block_sync_impl(
367 storage: &BlockStorage,
368 block_id: u64,
369 data: Vec<u8>,
370) -> Result<(), DatabaseError> {
371 write_block_impl_inner(storage, block_id, data)
372}
373
374#[cfg(not(target_arch = "wasm32"))]
375pub fn write_block_sync_impl(
376 storage: &mut BlockStorage,
377 block_id: u64,
378 data: Vec<u8>,
379) -> Result<(), DatabaseError> {
380 write_block_impl_inner(storage, block_id, data)
381}
382
383fn write_block_impl_inner(
384 storage: &BlockStorage,
385 block_id: u64,
386 data: Vec<u8>,
387) -> Result<(), DatabaseError> {
388 #[cfg(feature = "telemetry")]
390 if let Some(ref metrics) = storage.metrics {
391 metrics.indexeddb_operations_total().inc();
392 }
393
394 storage.maybe_auto_sync();
395
396 let dirty_count = storage.get_dirty_count();
398 if dirty_count > 100 {
399 storage
401 .observability
402 .record_backpressure("high", "too_many_dirty_blocks");
403 }
404
405 if data.len() != BLOCK_SIZE {
406 return Err(DatabaseError::new(
407 "INVALID_BLOCK_SIZE",
408 &format!(
409 "Block size must be {} bytes, got {}",
410 BLOCK_SIZE,
411 data.len()
412 ),
413 ));
414 }
415
416 let verify_before = lock_mutex!(storage.policy)
419 .as_ref()
420 .map(|p| p.verify_after_write)
421 .unwrap_or(false);
422 if verify_before {
423 #[cfg(not(target_arch = "wasm32"))]
424 {
425 if let Some(bytes) = lock_mutex!(storage.cache).get(&block_id).cloned() {
426 if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
427 log::error!(
428 "verify_after_write: pre-write checksum verification failed for block {}: {}",
429 block_id,
430 e.message
431 );
432 return Err(e);
433 }
434 }
435 }
436 #[cfg(target_arch = "wasm32")]
437 {
438 if let Some(bytes) = lock_mutex!(storage.cache).get(&block_id).cloned() {
439 if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
440 log::error!(
441 "verify_after_write: pre-write checksum verification failed for block {}: {}",
442 block_id,
443 e.message
444 );
445 return Err(e);
446 }
447 } else {
448 let maybe_bytes = vfs_sync::with_global_storage(|gs| {
449 let storage_map = gs;
450 storage_map
451 .borrow()
452 .get(&storage.db_name)
453 .and_then(|db| db.get(&block_id))
454 .cloned()
455 });
456 if let Some(bytes) = maybe_bytes {
457 if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
458 log::error!(
459 "verify_after_write: pre-write checksum verification failed for block {}: {}",
460 block_id,
461 e.message
462 );
463 return Err(e);
464 }
465 }
466 }
467 }
468 #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions)))]
469 {
470 if let Some(bytes) = lock_mutex!(storage.cache).get(&block_id).cloned() {
471 if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
472 log::error!(
473 "[test] verify_after_write: pre-write checksum verification failed for block {}: {}",
474 block_id,
475 e.message
476 );
477 return Err(e);
478 }
479 } else {
480 let maybe_bytes = vfs_sync::with_global_storage(|gs| {
481 let storage_map = gs.borrow();
482 storage_map
483 .get(&storage.db_name)
484 .and_then(|db| db.get(&block_id))
485 .cloned()
486 });
487 if let Some(bytes) = maybe_bytes {
488 if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
489 log::error!(
490 "[test] verify_after_write: pre-write checksum verification failed for block {}: {}",
491 block_id,
492 e.message
493 );
494 return Err(e);
495 }
496 }
497 }
498 }
499 }
500
501 #[cfg(target_arch = "wasm32")]
503 {
504 let existing_data = vfs_sync::with_global_storage(|gs| {
506 let storage_map = gs;
507 if let Some(db_storage) = storage_map.borrow().get(&storage.db_name) {
508 db_storage.get(&block_id).cloned()
509 } else {
510 None
511 }
512 });
513
514 let has_committed_metadata = vfs_sync::with_global_metadata(|meta| {
516 if let Some(db_meta) = meta.borrow().get(&storage.db_name) {
517 if let Some(metadata) = db_meta.get(&block_id) {
518 metadata.version > 0
520 } else {
521 false
522 }
523 } else {
524 false
525 }
526 });
527
528 let should_write = if let Some(existing) = existing_data {
530 if has_committed_metadata {
531 true } else if existing.iter().zip(data.iter()).all(|(a, b)| a == b) {
534 false
536 } else {
537 let existing_non_zero = existing.iter().filter(|&&b| b != 0).count();
539 let new_non_zero = data.iter().filter(|&&b| b != 0).count();
540
541 if new_non_zero > existing_non_zero {
542 true
543 } else if new_non_zero < existing_non_zero {
544 false
545 } else {
546 true
547 }
548 }
549 } else {
550 let has_global_committed_data = vfs_sync::with_global_metadata(|meta| {
552 if let Some(db_meta) = meta.borrow().get(&storage.db_name) {
553 if let Some(metadata) = db_meta.get(&block_id) {
554 metadata.version > 0
555 } else {
556 false
557 }
558 } else {
559 false
560 }
561 });
562
563 if has_global_committed_data {
564 true } else {
566 true
568 }
569 };
570
571 if should_write {
572 vfs_sync::with_global_storage(|gs| {
573 let mut storage_map = gs.borrow_mut();
574 let db_storage = storage_map
575 .entry(storage.db_name.clone())
576 .or_insert_with(HashMap::new);
577
578 db_storage.insert(block_id, data.clone());
582 });
583 }
584
585 vfs_sync::with_global_metadata(|meta| {
587 let mut meta_guard = meta.borrow_mut();
588 let db_meta = meta_guard
589 .entry(storage.db_name.clone())
590 .or_insert_with(HashMap::new);
591
592 let stored_data = if should_write {
594 data.clone()
595 } else {
596 vfs_sync::with_global_storage(|gs| {
598 let storage_map = gs;
599 if let Some(db_storage) = storage_map.borrow().get(&storage.db_name) {
600 if let Some(existing) = db_storage.get(&block_id) {
601 existing.clone()
602 } else {
603 data.clone() }
605 } else {
606 data.clone() }
608 })
609 };
610
611 let checksum = {
612 let mut hasher = crc32fast::Hasher::new();
613 hasher.update(&stored_data);
614 hasher.finalize() as u64
615 };
616
617 let version = if let Some(existing_meta) = db_meta.get(&block_id) {
619 existing_meta.version
620 } else {
621 1 };
623
624 db_meta.insert(
625 block_id,
626 BlockMetadataPersist {
627 checksum,
628 version,
629 last_modified_ms: 0, algo: ChecksumAlgorithm::CRC32,
631 },
632 );
633 });
634
635 #[cfg(all(
637 not(target_arch = "wasm32"),
638 any(test, debug_assertions),
639 not(feature = "fs_persist")
640 ))]
641 GLOBAL_METADATA_TEST.with(|meta| {
642 let mut meta_map = meta.borrow_mut();
643 let db_meta = meta_map
644 .entry(storage.db_name.clone())
645 .or_insert_with(HashMap::new);
646
647 let stored_data = if should_write {
649 data.clone()
650 } else {
651 vfs_sync::with_global_storage(|gs| {
653 let storage_map = gs.lock();
654 if let Some(db_storage) = storage_map.get(&storage.db_name) {
655 if let Some(existing) = db_storage.get(&block_id) {
656 existing.clone()
657 } else {
658 data.clone() }
660 } else {
661 data.clone() }
663 })
664 };
665
666 let checksum = {
667 let mut hasher = crc32fast::Hasher::new();
668 hasher.update(&stored_data);
669 hasher.finalize() as u64
670 };
671
672 let version = if let Some(existing_meta) = db_meta.get(&block_id) {
674 existing_meta.version
675 } else {
676 1 };
678
679 db_meta.insert(
680 block_id,
681 BlockMetadataPersist {
682 checksum,
683 version,
684 last_modified_ms: 0, algo: ChecksumAlgorithm::CRC32,
686 },
687 );
688 log::debug!(
689 "Updated test metadata for block {} with checksum {} (version {})",
690 block_id,
691 checksum,
692 version
693 );
694 });
695 }
696
697 lock_mutex!(storage.cache).insert(block_id, data.clone());
699 {
700 let mut dirty = lock_mutex!(storage.dirty_blocks);
701 dirty.insert(block_id, data);
702 }
703 if let Some(bytes) = lock_mutex!(storage.cache).get(&block_id) {
705 storage.checksum_manager.store_checksum(block_id, bytes);
706 }
707 #[cfg(not(target_arch = "wasm32"))]
709 {
710 storage
711 .last_write_ms
712 .store(BlockStorage::now_millis(), Ordering::SeqCst);
713 }
714
715 let (max_dirty_opt, max_bytes_opt) = lock_mutex!(storage.policy)
717 .as_ref()
718 .map(|p| (p.max_dirty, p.max_dirty_bytes))
719 .unwrap_or((None, None));
720
721 let mut threshold_reached = false;
722 if let Some(max_dirty) = max_dirty_opt {
723 let cur = lock_mutex!(storage.dirty_blocks).len();
724 if cur >= max_dirty {
725 threshold_reached = true;
726 }
727 }
728 if let Some(max_bytes) = max_bytes_opt {
729 let cur_bytes: usize = {
730 let m = lock_mutex!(storage.dirty_blocks);
731 m.values().map(|v| v.len()).sum()
732 };
733 if cur_bytes >= max_bytes {
734 threshold_reached = true;
735 }
736 }
737
738 if threshold_reached {
739 let debounce_ms_opt = lock_mutex!(storage.policy)
740 .as_ref()
741 .and_then(|p| p.debounce_ms);
742 if let Some(_debounce) = debounce_ms_opt {
743 #[cfg(not(target_arch = "wasm32"))]
745 {
746 storage.threshold_hit.store(true, Ordering::SeqCst);
747 }
748 } else {
749 #[cfg(target_arch = "wasm32")]
751 #[allow(invalid_reference_casting)]
752 {
753 let storage_mut =
755 unsafe { &mut *(storage as *const BlockStorage as *mut BlockStorage) };
756 let _ = storage_mut.sync_now();
757 }
758 #[cfg(not(target_arch = "wasm32"))]
759 {
760 if storage.sync_sender.is_some() {
762 log::info!("Threshold reached: marking for inline sync");
763 storage.threshold_hit.store(true, Ordering::SeqCst);
764 } else {
765 log::warn!("Backpressure threshold reached but no auto-sync enabled");
766 }
767 }
768 }
769 }
770
771 storage.touch_lru(block_id);
772 storage.evict_if_needed();
773
774 #[cfg(feature = "telemetry")]
776 if let Some(ref metrics) = storage.metrics {
777 let cache_guard = storage.cache.lock();
779 let total_bytes: usize = cache_guard.values().map(|v| v.len()).sum();
780 metrics.storage_bytes().set(total_bytes as f64);
781
782 let cache_bytes: usize = cache_guard.len() * BLOCK_SIZE;
784 metrics.cache_size_bytes().set(cache_bytes as f64);
785 }
786
787 Ok(())
788}