absurder_sql/storage/
io_operations.rs

1//! I/O operations for BlockStorage
2//! This module contains block reading and writing functionality
3
4// Reentrancy-safe lock macros
// wasm32 flavour: the guarded field is expected to offer `try_borrow_mut()`.
// A failed borrow is treated as a reentrancy bug and aborts with the message below.
#[cfg(target_arch = "wasm32")]
macro_rules! lock_mutex {
    ($cell:expr) => {
        $cell
            .try_borrow_mut()
            .expect("RefCell borrow failed - reentrancy detected in io_operations.rs")
    };
}

// Native flavour: simply forwards to the field's `lock()` method and hands back
// whatever that call returns.
#[cfg(not(target_arch = "wasm32"))]
macro_rules! lock_mutex {
    ($guarded:expr) => {
        $guarded.lock()
    };
}
20
// wasm32 flavour: attempts a mutable borrow and turns a failed attempt into `None`
// so callers can skip the operation instead of panicking.
#[allow(unused_macros)]
#[cfg(target_arch = "wasm32")]
macro_rules! try_lock_mutex {
    ($cell:expr) => {
        $cell.try_borrow_mut().ok()
    };
}

// Native flavour: always yields `Some`, wrapping the result of a plain `lock()` call.
// No non-blocking acquisition is attempted here.
#[allow(unused_macros)]
#[cfg(not(target_arch = "wasm32"))]
macro_rules! try_lock_mutex {
    ($guarded:expr) => {
        Option::Some($guarded.lock())
    };
}
36
// wasm32 flavour: attempts a shared borrow; `None` means the value is currently
// mutably borrowed and the caller should carry on without it.
#[cfg(target_arch = "wasm32")]
macro_rules! try_read_lock {
    ($cell:expr) => {
        $cell.try_borrow().ok()
    };
}

// Native flavour: always yields `Some`, wrapping the result of a plain `lock()` call.
#[cfg(not(target_arch = "wasm32"))]
macro_rules! try_read_lock {
    ($guarded:expr) => {
        Option::Some($guarded.lock())
    };
}
50
51use super::block_storage::{BLOCK_SIZE, BlockStorage};
52use crate::types::DatabaseError;
53#[cfg(not(target_arch = "wasm32"))]
54use std::sync::atomic::Ordering;
55
56#[cfg(any(
57    target_arch = "wasm32",
58    all(not(target_arch = "wasm32"), any(test, debug_assertions))
59))]
60use super::vfs_sync;
61
62#[cfg(target_arch = "wasm32")]
63use super::metadata::{BlockMetadataPersist, ChecksumAlgorithm};
64#[cfg(target_arch = "wasm32")]
65use std::collections::HashMap;
66
67#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
68use std::{fs, io::Read, path::PathBuf};
69
70#[cfg(all(
71    not(target_arch = "wasm32"),
72    any(test, debug_assertions),
73    not(feature = "fs_persist")
74))]
75use super::block_storage::GLOBAL_METADATA_TEST;
76
77/// Synchronous block read implementation
78pub fn read_block_sync_impl(
79    storage: &BlockStorage,
80    block_id: u64,
81) -> Result<Vec<u8>, DatabaseError> {
82    // Skip auto_sync check for reads - only writes trigger sync
83
84    // Check cache first - use try_read_lock to handle reentrancy
85    let cached_data = try_read_lock!(storage.cache).and_then(|cache| cache.get(&block_id).cloned());
86
87    #[cfg(target_arch = "wasm32")]
88    {
89        let cache_hit = cached_data.is_some();
90        web_sys::console::log_1(
91            &format!(
92                "[READ_DEBUG] db={}, block_id={}, cache_hit={}",
93                storage.db_name, block_id, cache_hit
94            )
95            .into(),
96        );
97    }
98
99    if let Some(data) = cached_data {
100        // Record cache hit
101        #[cfg(feature = "telemetry")]
102        if let Some(ref metrics) = storage.metrics {
103            metrics.cache_hits().inc();
104        }
105        // Verify checksum even for cached data to catch corruption
106        // Skip block 0 as it's the SQLite header which can be modified by SQLite
107        if block_id != 0 {
108            storage.verify_against_stored_checksum(block_id, &data)?
109        }
110        // Only update LRU when close to capacity to avoid O(n) overhead on every read
111        // This maintains correctness for eviction while optimizing hot-path performance
112        if let Some(cache) = try_read_lock!(storage.cache) {
113            if cache.len() > (storage.capacity * 4 / 5) {
114                drop(cache); // Drop read lock before calling touch_lru
115                storage.touch_lru(block_id);
116            }
117        }
118
119        return Ok(data);
120    }
121
122    // Record cache miss
123    #[cfg(feature = "telemetry")]
124    if let Some(ref metrics) = storage.metrics {
125        metrics.cache_misses().inc();
126        metrics.indexeddb_operations_total().inc();
127    }
128
129    // For WASM, check global storage for persistence across instances
130    #[cfg(target_arch = "wasm32")]
131    {
132        // Single combined lookup for commit marker, visibility, and data
133        let (data, is_visible) = vfs_sync::with_global_commit_marker(|cm| {
134            let committed = cm.borrow().get(&storage.db_name).copied().unwrap_or(0);
135
136            // Block 0 (database header) is always visible
137            if block_id == 0 {
138                let data = vfs_sync::with_global_storage(|gs| {
139                    let storage_map = gs.borrow();
140                    let result = storage_map
141                        .get(&storage.db_name)
142                        .and_then(|db_storage| db_storage.get(&block_id))
143                        .cloned();
144
145                    #[cfg(target_arch = "wasm32")]
146                    {
147                        let found_in_gs = result.is_some();
148                        let gs_has_db = storage_map.contains_key(&storage.db_name);
149                        web_sys::console::log_1(&format!(
150                                "[READ_DEBUG] GLOBAL_STORAGE lookup: db={}, gs_has_db={}, found_block_0={}",
151                                storage.db_name, gs_has_db, found_in_gs
152                            ).into());
153                    }
154
155                    result.unwrap_or_else(|| vec![0; BLOCK_SIZE])
156                });
157                return (data, true);
158            }
159
160            // For other blocks, check visibility and get data in one pass
161            vfs_sync::with_global_metadata(|meta| {
162                let meta_borrow = meta.borrow();
163                let has_metadata = meta_borrow
164                    .get(&storage.db_name)
165                    .and_then(|db_meta| db_meta.get(&block_id))
166                    .is_some();
167
168                if has_metadata {
169                    // Has metadata - check if visible based on commit marker
170                    let is_visible = meta_borrow
171                        .get(&storage.db_name)
172                        .and_then(|db_meta| db_meta.get(&block_id))
173                        .map(|m| (m.version as u64) <= committed)
174                        .unwrap_or(false);
175
176                    if is_visible {
177                        // Visible - return actual data
178                        let data = vfs_sync::with_global_storage(|gs| {
179                            gs.borrow()
180                                .get(&storage.db_name)
181                                .and_then(|db_storage| db_storage.get(&block_id))
182                                .cloned()
183                                .unwrap_or_else(|| vec![0; BLOCK_SIZE])
184                        });
185                        (data, true)
186                    } else {
187                        // Not visible (version > commit marker) - return zeroed data for SQLite
188                        (vec![0; BLOCK_SIZE], false)
189                    }
190                } else {
191                    // No metadata - check if data exists in global storage
192                    let data = vfs_sync::with_global_storage(|gs| {
193                        gs.borrow()
194                            .get(&storage.db_name)
195                            .and_then(|db_storage| db_storage.get(&block_id))
196                            .cloned()
197                    });
198
199                    match data {
200                        Some(data) => (data, true),          // Old data before metadata tracking
201                        None => (vec![0; BLOCK_SIZE], true), // Return zeros for RMW (read-modify-write)
202                    }
203                }
204            })
205        });
206
207        // Verify checksum ONLY for visible blocks in WASM
208        // Skip block 0 as it's the SQLite header which can be modified by SQLite
209        if is_visible && block_id != 0 {
210            if let Err(e) = storage.verify_against_stored_checksum(block_id, &data) {
211                return Err(e);
212            }
213        }
214
215        // Cache for future reads (skip eviction check for performance)
216        // Use try_lock to handle reentrancy gracefully - skip caching if borrowed
217        if let Some(mut cache) = try_lock_mutex!(storage.cache) {
218            cache.insert(block_id, data.clone());
219        }
220
221        return Ok(data);
222    }
223
224    // For native fs_persist, read from filesystem if allocated
225    #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
226    {
227        let base: PathBuf = storage.base_dir.clone();
228        let mut dir = base.clone();
229        dir.push(&storage.db_name);
230        let mut blocks = dir.clone();
231        blocks.push("blocks");
232        let mut block_path = blocks.clone();
233        block_path.push(format!("block_{}.bin", block_id));
234        // If the block was explicitly deallocated (tombstoned), refuse reads
235        if lock_mutex!(storage.deallocated_blocks).contains(&block_id) {
236            return Err(DatabaseError::new(
237                "BLOCK_NOT_ALLOCATED",
238                &format!("Block {} is not allocated", block_id),
239            ));
240        }
241        if let Ok(mut f) = fs::File::open(&block_path) {
242            let mut data = vec![0u8; BLOCK_SIZE];
243            f.read_exact(&mut data).map_err(|e| {
244                DatabaseError::new(
245                    "IO_ERROR",
246                    &format!("read block {} failed: {}", block_id, e),
247                )
248            })?;
249            lock_mutex!(storage.cache).insert(block_id, data.clone());
250            storage.verify_against_stored_checksum(block_id, &data)?;
251            storage.touch_lru(block_id);
252            storage.evict_if_needed();
253            return Ok(data);
254        }
255        // If file missing, treat as zeroed data (compat). This covers never-written blocks
256        // and avoids depending on allocated_blocks for read behavior.
257        let data = vec![0; BLOCK_SIZE];
258        lock_mutex!(storage.cache).insert(block_id, data.clone());
259        storage.verify_against_stored_checksum(block_id, &data)?;
260        storage.touch_lru(block_id);
261        storage.evict_if_needed();
262        Ok(data)
263    }
264
265    // For native tests, check test-global storage for persistence across instances (when fs_persist disabled)
266    #[cfg(all(
267        not(target_arch = "wasm32"),
268        any(test, debug_assertions),
269        not(feature = "fs_persist")
270    ))]
271    {
272        // Enforce commit gating in native test path as well
273        let committed: u64 = vfs_sync::with_global_commit_marker(|cm| {
274            #[cfg(target_arch = "wasm32")]
275            let cm = cm;
276            #[cfg(not(target_arch = "wasm32"))]
277            let cm = cm.borrow();
278            cm.get(&storage.db_name).copied().unwrap_or(0)
279        });
280        let is_visible: bool = GLOBAL_METADATA_TEST.with(|meta| {
281            #[cfg(target_arch = "wasm32")]
282            let meta_map = meta.borrow_mut();
283            #[cfg(not(target_arch = "wasm32"))]
284            let meta_map = meta.lock();
285            if let Some(db_meta) = meta_map.get(&storage.db_name) {
286                if let Some(m) = db_meta.get(&block_id) {
287                    return (m.version as u64) <= committed;
288                }
289            }
290            false
291        });
292        let data = if is_visible {
293            vfs_sync::with_global_storage(|gs| {
294                #[cfg(target_arch = "wasm32")]
295                let storage_map = gs;
296                #[cfg(not(target_arch = "wasm32"))]
297                let storage_map = gs.borrow();
298                if let Some(db_storage) = storage_map.get(&storage.db_name) {
299                    if let Some(data) = db_storage.get(&block_id) {
300                        log::debug!(
301                            "[test] Block {} found in global storage (sync, committed visible)",
302                            block_id
303                        );
304                        return data.clone();
305                    }
306                }
307                vec![0; BLOCK_SIZE]
308            })
309        } else {
310            log::debug!(
311                "[test] Block {} not visible due to commit gating (committed={}, treating as zeroed)",
312                block_id,
313                committed
314            );
315            vec![0; BLOCK_SIZE]
316        };
317
318        // Check if block is actually allocated before returning zeroed data
319        if !lock_mutex!(storage.allocated_blocks).contains(&block_id) && !is_visible {
320            let error = DatabaseError::new(
321                "BLOCK_NOT_FOUND",
322                &format!("Block {} not found in storage", block_id),
323            );
324            // Record error for observability
325            storage.observability.record_error(&error);
326            return Err(error);
327        }
328
329        lock_mutex!(storage.cache).insert(block_id, data.clone());
330        log::debug!(
331            "[test] Block {} cached from global storage (sync)",
332            block_id
333        );
334        // Verify checksum only if the block is visible under the commit marker
335        if is_visible {
336            if let Err(e) = storage.verify_against_stored_checksum(block_id, &data) {
337                log::error!(
338                    "[test] Checksum verification failed for block {} (test storage): {}",
339                    block_id,
340                    e.message
341                );
342                storage.observability.record_error(&e);
343                return Err(e);
344            }
345        }
346        storage.touch_lru(block_id);
347        storage.evict_if_needed();
348        return Ok(data);
349    }
350
351    // Unreachable: all build configurations should hit one of the above code paths
352    #[cfg(not(any(
353        target_arch = "wasm32",
354        all(not(target_arch = "wasm32"), feature = "fs_persist"),
355        all(
356            not(target_arch = "wasm32"),
357            any(test, debug_assertions),
358            not(feature = "fs_persist")
359        )
360    )))]
361    unreachable!("No storage backend configured for this build")
362}
363
364/// Synchronous block write implementation  
365#[cfg(target_arch = "wasm32")]
366pub fn write_block_sync_impl(
367    storage: &BlockStorage,
368    block_id: u64,
369    data: Vec<u8>,
370) -> Result<(), DatabaseError> {
371    write_block_impl_inner(storage, block_id, data)
372}
373
374#[cfg(not(target_arch = "wasm32"))]
375pub fn write_block_sync_impl(
376    storage: &mut BlockStorage,
377    block_id: u64,
378    data: Vec<u8>,
379) -> Result<(), DatabaseError> {
380    write_block_impl_inner(storage, block_id, data)
381}
382
383fn write_block_impl_inner(
384    storage: &BlockStorage,
385    block_id: u64,
386    data: Vec<u8>,
387) -> Result<(), DatabaseError> {
388    // Record IndexedDB write operation
389    #[cfg(feature = "telemetry")]
390    if let Some(ref metrics) = storage.metrics {
391        metrics.indexeddb_operations_total().inc();
392    }
393
394    storage.maybe_auto_sync();
395
396    // Check for backpressure conditions
397    let dirty_count = storage.get_dirty_count();
398    if dirty_count > 100 {
399        // Threshold for backpressure
400        storage
401            .observability
402            .record_backpressure("high", "too_many_dirty_blocks");
403    }
404
405    if data.len() != BLOCK_SIZE {
406        return Err(DatabaseError::new(
407            "INVALID_BLOCK_SIZE",
408            &format!(
409                "Block size must be {} bytes, got {}",
410                BLOCK_SIZE,
411                data.len()
412            ),
413        ));
414    }
415
416    // If requested by policy, verify existing data integrity BEFORE accepting the new write.
417    // This prevents overwriting a block whose prior contents no longer match the stored checksum.
418    let verify_before = lock_mutex!(storage.policy)
419        .as_ref()
420        .map(|p| p.verify_after_write)
421        .unwrap_or(false);
422    if verify_before {
423        #[cfg(not(target_arch = "wasm32"))]
424        {
425            if let Some(bytes) = lock_mutex!(storage.cache).get(&block_id).cloned() {
426                if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
427                    log::error!(
428                        "verify_after_write: pre-write checksum verification failed for block {}: {}",
429                        block_id,
430                        e.message
431                    );
432                    return Err(e);
433                }
434            }
435        }
436        #[cfg(target_arch = "wasm32")]
437        {
438            if let Some(bytes) = lock_mutex!(storage.cache).get(&block_id).cloned() {
439                if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
440                    log::error!(
441                        "verify_after_write: pre-write checksum verification failed for block {}: {}",
442                        block_id,
443                        e.message
444                    );
445                    return Err(e);
446                }
447            } else {
448                let maybe_bytes = vfs_sync::with_global_storage(|gs| {
449                    let storage_map = gs;
450                    storage_map
451                        .borrow()
452                        .get(&storage.db_name)
453                        .and_then(|db| db.get(&block_id))
454                        .cloned()
455                });
456                if let Some(bytes) = maybe_bytes {
457                    if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
458                        log::error!(
459                            "verify_after_write: pre-write checksum verification failed for block {}: {}",
460                            block_id,
461                            e.message
462                        );
463                        return Err(e);
464                    }
465                }
466            }
467        }
468        #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions)))]
469        {
470            if let Some(bytes) = lock_mutex!(storage.cache).get(&block_id).cloned() {
471                if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
472                    log::error!(
473                        "[test] verify_after_write: pre-write checksum verification failed for block {}: {}",
474                        block_id,
475                        e.message
476                    );
477                    return Err(e);
478                }
479            } else {
480                let maybe_bytes = vfs_sync::with_global_storage(|gs| {
481                    let storage_map = gs.borrow();
482                    storage_map
483                        .get(&storage.db_name)
484                        .and_then(|db| db.get(&block_id))
485                        .cloned()
486                });
487                if let Some(bytes) = maybe_bytes {
488                    if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
489                        log::error!(
490                            "[test] verify_after_write: pre-write checksum verification failed for block {}: {}",
491                            block_id,
492                            e.message
493                        );
494                        return Err(e);
495                    }
496                }
497            }
498        }
499    }
500
501    // For WASM, immediately persist to global storage FIRST for cross-instance visibility
502    #[cfg(target_arch = "wasm32")]
503    {
504        // Check if this block already exists in global storage with committed data
505        let existing_data = vfs_sync::with_global_storage(|gs| {
506            let storage_map = gs;
507            if let Some(db_storage) = storage_map.borrow().get(&storage.db_name) {
508                db_storage.get(&block_id).cloned()
509            } else {
510                None
511            }
512        });
513
514        // Check if there's existing metadata for this block
515        let has_committed_metadata = vfs_sync::with_global_metadata(|meta| {
516            if let Some(db_meta) = meta.borrow().get(&storage.db_name) {
517                if let Some(metadata) = db_meta.get(&block_id) {
518                    // If version > 0, this block has been committed before
519                    metadata.version > 0
520                } else {
521                    false
522                }
523            } else {
524                false
525            }
526        });
527
528        // Only overwrite if there's no committed data or if this is a legitimate update
529        let should_write = if let Some(existing) = existing_data {
530            if has_committed_metadata {
531                // CRITICAL FIX: Always allow writes during transactions to ensure schema changes persist
532                true // Always allow writes when there's committed metadata
533            } else if existing.iter().zip(data.iter()).all(|(a, b)| a == b) {
534                // If the data is identical, skip the write
535                false
536            } else {
537                // Check if the new data is richer (has more non-zero bytes) than existing
538                let existing_non_zero = existing.iter().filter(|&&b| b != 0).count();
539                let new_non_zero = data.iter().filter(|&&b| b != 0).count();
540
541                if new_non_zero > existing_non_zero {
542                    true
543                } else if new_non_zero < existing_non_zero {
544                    false
545                } else {
546                    true
547                }
548            }
549        } else {
550            // Check if there's committed data in global storage that we haven't seen yet
551            let has_global_committed_data = vfs_sync::with_global_metadata(|meta| {
552                if let Some(db_meta) = meta.borrow().get(&storage.db_name) {
553                    if let Some(metadata) = db_meta.get(&block_id) {
554                        metadata.version > 0
555                    } else {
556                        false
557                    }
558                } else {
559                    false
560                }
561            });
562
563            if has_global_committed_data {
564                true // Allow transactional writes even when committed data exists
565            } else {
566                // No existing data and no committed metadata, safe to write
567                true
568            }
569        };
570
571        if should_write {
572            vfs_sync::with_global_storage(|gs| {
573                let mut storage_map = gs.borrow_mut();
574                let db_storage = storage_map
575                    .entry(storage.db_name.clone())
576                    .or_insert_with(HashMap::new);
577
578                // Log what we're about to write vs what exists
579                // Block overwrite (debug logging removed for performance)
580
581                db_storage.insert(block_id, data.clone());
582            });
583        }
584
585        // Always ensure metadata exists for the block, and UPDATE checksum if we wrote new data
586        vfs_sync::with_global_metadata(|meta| {
587            let mut meta_guard = meta.borrow_mut();
588            let db_meta = meta_guard
589                .entry(storage.db_name.clone())
590                .or_insert_with(HashMap::new);
591
592            // Calculate checksum for the data that will be stored (either new or existing)
593            let stored_data = if should_write {
594                data.clone()
595            } else {
596                // Use existing data from global storage
597                vfs_sync::with_global_storage(|gs| {
598                    let storage_map = gs;
599                    if let Some(db_storage) = storage_map.borrow().get(&storage.db_name) {
600                        if let Some(existing) = db_storage.get(&block_id) {
601                            existing.clone()
602                        } else {
603                            data.clone() // Fallback to new data
604                        }
605                    } else {
606                        data.clone() // Fallback to new data
607                    }
608                })
609            };
610
611            let checksum = {
612                let mut hasher = crc32fast::Hasher::new();
613                hasher.update(&stored_data);
614                hasher.finalize() as u64
615            };
616
617            // If metadata exists, preserve the version number but update the checksum
618            let version = if let Some(existing_meta) = db_meta.get(&block_id) {
619                existing_meta.version
620            } else {
621                1 // Start at version 1 so uncommitted data is hidden (commit marker starts at 0)
622            };
623
624            db_meta.insert(
625                block_id,
626                BlockMetadataPersist {
627                    checksum,
628                    version,
629                    last_modified_ms: 0, // Will be updated during sync
630                    algo: ChecksumAlgorithm::CRC32,
631                },
632            );
633        });
634
635        // Also create/update metadata for native test path
636        #[cfg(all(
637            not(target_arch = "wasm32"),
638            any(test, debug_assertions),
639            not(feature = "fs_persist")
640        ))]
641        GLOBAL_METADATA_TEST.with(|meta| {
642            let mut meta_map = meta.borrow_mut();
643            let db_meta = meta_map
644                .entry(storage.db_name.clone())
645                .or_insert_with(HashMap::new);
646
647            // Calculate checksum for the data that will be stored (either new or existing)
648            let stored_data = if should_write {
649                data.clone()
650            } else {
651                // Use existing data from global test storage
652                vfs_sync::with_global_storage(|gs| {
653                    let storage_map = gs.lock();
654                    if let Some(db_storage) = storage_map.get(&storage.db_name) {
655                        if let Some(existing) = db_storage.get(&block_id) {
656                            existing.clone()
657                        } else {
658                            data.clone() // Fallback to new data
659                        }
660                    } else {
661                        data.clone() // Fallback to new data
662                    }
663                })
664            };
665
666            let checksum = {
667                let mut hasher = crc32fast::Hasher::new();
668                hasher.update(&stored_data);
669                hasher.finalize() as u64
670            };
671
672            // If metadata exists, preserve the version number but update the checksum
673            let version = if let Some(existing_meta) = db_meta.get(&block_id) {
674                existing_meta.version
675            } else {
676                1 // Start at version 1 so uncommitted data is hidden (commit marker starts at 0)
677            };
678
679            db_meta.insert(
680                block_id,
681                BlockMetadataPersist {
682                    checksum,
683                    version,
684                    last_modified_ms: 0, // Will be updated during sync
685                    algo: ChecksumAlgorithm::CRC32,
686                },
687            );
688            log::debug!(
689                "Updated test metadata for block {} with checksum {} (version {})",
690                block_id,
691                checksum,
692                version
693            );
694        });
695    }
696
697    // Update cache and mark as dirty
698    lock_mutex!(storage.cache).insert(block_id, data.clone());
699    {
700        let mut dirty = lock_mutex!(storage.dirty_blocks);
701        dirty.insert(block_id, data);
702    }
703    // Update checksum metadata on write
704    if let Some(bytes) = lock_mutex!(storage.cache).get(&block_id) {
705        storage.checksum_manager.store_checksum(block_id, bytes);
706    }
707    // Record write time for debounce tracking (native)
708    #[cfg(not(target_arch = "wasm32"))]
709    {
710        storage
711            .last_write_ms
712            .store(BlockStorage::now_millis(), Ordering::SeqCst);
713    }
714
715    // Policy-based triggers: thresholds
716    let (max_dirty_opt, max_bytes_opt) = lock_mutex!(storage.policy)
717        .as_ref()
718        .map(|p| (p.max_dirty, p.max_dirty_bytes))
719        .unwrap_or((None, None));
720
721    let mut threshold_reached = false;
722    if let Some(max_dirty) = max_dirty_opt {
723        let cur = lock_mutex!(storage.dirty_blocks).len();
724        if cur >= max_dirty {
725            threshold_reached = true;
726        }
727    }
728    if let Some(max_bytes) = max_bytes_opt {
729        let cur_bytes: usize = {
730            let m = lock_mutex!(storage.dirty_blocks);
731            m.values().map(|v| v.len()).sum()
732        };
733        if cur_bytes >= max_bytes {
734            threshold_reached = true;
735        }
736    }
737
738    if threshold_reached {
739        let debounce_ms_opt = lock_mutex!(storage.policy)
740            .as_ref()
741            .and_then(|p| p.debounce_ms);
742        if let Some(_debounce) = debounce_ms_opt {
743            // Debounce enabled: mark threshold and let debounce thread flush after inactivity
744            #[cfg(not(target_arch = "wasm32"))]
745            {
746                storage.threshold_hit.store(true, Ordering::SeqCst);
747            }
748        } else {
749            // No debounce: flush immediately
750            #[cfg(target_arch = "wasm32")]
751            #[allow(invalid_reference_casting)]
752            {
753                // WASM: Use unsafe cast to call sync_now
754                let storage_mut =
755                    unsafe { &mut *(storage as *const BlockStorage as *mut BlockStorage) };
756                let _ = storage_mut.sync_now();
757            }
758            #[cfg(not(target_arch = "wasm32"))]
759            {
760                // Native: Mark that threshold was hit so write_block can sync inline
761                if storage.sync_sender.is_some() {
762                    log::info!("Threshold reached: marking for inline sync");
763                    storage.threshold_hit.store(true, Ordering::SeqCst);
764                } else {
765                    log::warn!("Backpressure threshold reached but no auto-sync enabled");
766                }
767            }
768        }
769    }
770
771    storage.touch_lru(block_id);
772    storage.evict_if_needed();
773
774    // Update storage and cache size gauges
775    #[cfg(feature = "telemetry")]
776    if let Some(ref metrics) = storage.metrics {
777        // Update storage bytes gauge
778        let cache_guard = storage.cache.lock();
779        let total_bytes: usize = cache_guard.values().map(|v| v.len()).sum();
780        metrics.storage_bytes().set(total_bytes as f64);
781
782        // Update cache size bytes gauge
783        let cache_bytes: usize = cache_guard.len() * BLOCK_SIZE;
784        metrics.cache_size_bytes().set(cache_bytes as f64);
785    }
786
787    Ok(())
788}