absurder_sql/storage/block_storage.rs

#[allow(unused_imports)]
use super::metadata::BlockMetadataPersist;
use super::metadata::{ChecksumAlgorithm, ChecksumManager};
#[cfg(any(
    target_arch = "wasm32",
    all(
        not(target_arch = "wasm32"),
        any(test, debug_assertions),
        not(feature = "fs_persist")
    )
))]
use super::vfs_sync;
use crate::types::DatabaseError;
#[cfg(target_arch = "wasm32")]
use js_sys::Date;
#[cfg(not(target_arch = "wasm32"))]
use parking_lot::Mutex;
#[cfg(target_arch = "wasm32")]
use std::cell::RefCell;
use std::collections::{HashMap, HashSet, VecDeque};
#[allow(unused_imports)]
use std::sync::{
    Arc,
    atomic::{AtomicBool, AtomicU64, Ordering},
};
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use std::time::{Instant, SystemTime, UNIX_EPOCH};
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::mpsc;
#[cfg(not(target_arch = "wasm32"))]
use tokio::task::JoinHandle as TokioJoinHandle;

// FS persistence imports (native only when feature is enabled)
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
use std::path::PathBuf;

// Global storage management moved to vfs_sync module

// Persistent metadata storage moved to metadata module

// Auto-sync messaging
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug)]
pub(super) enum SyncRequest {
    Timer(tokio::sync::oneshot::Sender<()>),
    Debounce(tokio::sync::oneshot::Sender<()>),
}

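/// Options controlling startup integrity verification.
///
/// # Example
/// A minimal sketch; the path assumes the same public re-export used by the
/// `on_database_import` example further below.
/// ```rust,no_run
/// # use absurder_sql::storage::block_storage::{CorruptionAction, RecoveryMode, RecoveryOptions};
/// // Verify every allocated block at startup; attempt repair on corruption.
/// let opts = RecoveryOptions {
///     mode: RecoveryMode::Full,
///     on_corruption: CorruptionAction::Repair,
/// };
/// ```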
#[derive(Clone, Debug, Default)]
pub struct RecoveryOptions {
    pub mode: RecoveryMode,
    pub on_corruption: CorruptionAction,
}

#[derive(Clone, Debug, Default)]
pub enum RecoveryMode {
    #[default]
    Full,
    Sample {
        count: usize,
    },
    Skip,
}

#[derive(Clone, Debug, Default)]
pub enum CorruptionAction {
    #[default]
    Report,
    Repair,
    Fail,
}

#[derive(Clone, Debug, PartialEq)]
pub enum CrashRecoveryAction {
    NoActionNeeded,
    Rollback,
    Finalize,
}

#[derive(Clone, Debug, Default)]
pub struct RecoveryReport {
    pub total_blocks_verified: usize,
    pub corrupted_blocks: Vec<u64>,
    pub repaired_blocks: Vec<u64>,
    pub verification_duration_ms: u64,
}

// On-disk JSON schema for fs_persist
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(serde::Serialize, serde::Deserialize, Default)]
#[allow(dead_code)]
struct FsMeta {
    entries: Vec<(u64, BlockMetadataPersist)>,
}

#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(serde::Serialize, serde::Deserialize, Default)]
#[allow(dead_code)]
struct FsAlloc {
    allocated: Vec<u64>,
}

#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(serde::Serialize, serde::Deserialize, Default)]
#[allow(dead_code)]
struct FsDealloc {
    tombstones: Vec<u64>,
}

// Metadata mirror for native builds
#[cfg(not(target_arch = "wasm32"))]
thread_local! {
    pub(super) static GLOBAL_METADATA_TEST: parking_lot::Mutex<HashMap<String, HashMap<u64, BlockMetadataPersist>>> = parking_lot::Mutex::new(HashMap::new());
}

// Test-only commit marker mirror for native builds (when fs_persist is disabled)
#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions)))]
thread_local! {
    static GLOBAL_COMMIT_MARKER_TEST: parking_lot::Mutex<HashMap<String, u64>> = parking_lot::Mutex::new(HashMap::new());
}

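/// Tuning knobs for background auto-sync; each trigger is optional.
///
/// # Example
/// A minimal sketch with illustrative values, combining a periodic timer, a
/// dirty-block threshold, and a write debounce.
/// ```rust,no_run
/// # use absurder_sql::storage::block_storage::SyncPolicy;
/// let policy = SyncPolicy {
///     interval_ms: Some(500),           // timer-driven sync every 500 ms
///     max_dirty: Some(64),              // sync once 64 blocks are dirty
///     max_dirty_bytes: Some(64 * 4096), // ...or once 256 KiB is dirty
///     debounce_ms: Some(50),            // coalesce bursts of writes
///     verify_after_write: false,
/// };
/// ```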
#[derive(Clone, Debug)]
pub struct SyncPolicy {
    pub interval_ms: Option<u64>,
    pub max_dirty: Option<usize>,
    pub max_dirty_bytes: Option<usize>,
    pub debounce_ms: Option<u64>,
    pub verify_after_write: bool,
}

#[cfg(not(target_arch = "wasm32"))]
impl Drop for BlockStorage {
    fn drop(&mut self) {
        if let Some(stop) = &self.auto_sync_stop {
            stop.store(true, Ordering::SeqCst);
        }
        if let Some(handle) = self.auto_sync_thread.take() {
            let _ = handle.join();
        }
        if let Some(handle) = self.debounce_thread.take() {
            let _ = handle.join();
        }
        if let Some(task) = self.tokio_timer_task.take() {
            task.abort();
        }
        if let Some(task) = self.tokio_debounce_task.take() {
            task.abort();
        }
        self.auto_sync_stop = None;
    }
}

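/// Fixed size of a storage block in bytes (4 KiB).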
pub const BLOCK_SIZE: usize = 4096;
#[allow(dead_code)]
pub(super) const DEFAULT_CACHE_CAPACITY: usize = 128;
#[allow(dead_code)]
const STORE_NAME: &str = "sqlite_blocks";
#[allow(dead_code)]
const METADATA_STORE: &str = "metadata";

// Helper macro to abstract lock API differences: RefCell for WASM, Mutex for native
// WASM: Use try_borrow_mut and handle reentrancy at call sites
#[cfg(target_arch = "wasm32")]
macro_rules! lock_mutex {
    ($mutex:expr) => {
        $mutex
            .try_borrow_mut()
            .expect("RefCell borrow failed - reentrancy detected in block_storage.rs")
    };
}

#[cfg(not(target_arch = "wasm32"))]
macro_rules! lock_mutex {
    ($mutex:expr) => {
        $mutex.lock()
    };
}

// Safe read macro for WASM that handles reentrancy
#[allow(unused_macros)]
#[cfg(target_arch = "wasm32")]
macro_rules! read_lock {
    ($mutex:expr) => {
        $mutex
            .try_borrow()
            .expect("RefCell borrow failed - likely reentrancy issue")
    };
}

#[allow(unused_macros)]
#[cfg(not(target_arch = "wasm32"))]
macro_rules! read_lock {
    ($mutex:expr) => {
        $mutex.lock()
    };
}

// Try-lock macro for reentrancy-safe operations
#[allow(unused_macros)]
#[cfg(target_arch = "wasm32")]
macro_rules! try_lock_mutex {
    ($mutex:expr) => {
        $mutex.ok()
    };
}

#[allow(unused_macros)]
#[cfg(not(target_arch = "wasm32"))]
macro_rules! try_lock_mutex {
    ($mutex:expr) => {
        Some($mutex.lock())
    };
}

// Try-read macro for reentrancy-safe read operations
#[allow(unused_macros)]
#[cfg(target_arch = "wasm32")]
macro_rules! try_read_lock {
    ($mutex:expr) => {
        $mutex.try_borrow().ok()
    };
}

#[allow(unused_macros)]
#[cfg(not(target_arch = "wasm32"))]
macro_rules! try_read_lock {
    ($mutex:expr) => {
        Some($mutex.lock())
    };
}
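
// Illustrative call-site sketch (hypothetical): on native targets
// `lock_mutex!(self.cache)` expands to `self.cache.lock()` and yields a
// parking_lot guard; on WASM it expands to `self.cache.try_borrow_mut()` and
// panics on reentrant borrows. For example:
//
//     let mut cache = lock_mutex!(self.cache);
//     cache.insert(block_id, data);
//     drop(cache); // release before taking another lock on the same field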

pub struct BlockStorage {
    // WASM: RefCell for zero-cost interior mutability (single-threaded)
    // Native: Mutex for thread safety
    #[cfg(target_arch = "wasm32")]
    pub(super) cache: RefCell<HashMap<u64, Vec<u8>>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) cache: Mutex<HashMap<u64, Vec<u8>>>,

    #[cfg(target_arch = "wasm32")]
    pub(super) dirty_blocks: Arc<RefCell<HashMap<u64, Vec<u8>>>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) dirty_blocks: Arc<Mutex<HashMap<u64, Vec<u8>>>>,

    #[cfg(target_arch = "wasm32")]
    pub(super) allocated_blocks: RefCell<HashSet<u64>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) allocated_blocks: Mutex<HashSet<u64>>,

    #[cfg(target_arch = "wasm32")]
    #[allow(dead_code)]
    pub(super) deallocated_blocks: RefCell<HashSet<u64>>,
    #[cfg(not(target_arch = "wasm32"))]
    #[allow(dead_code)]
    pub(super) deallocated_blocks: Mutex<HashSet<u64>>,
    pub(super) next_block_id: AtomicU64,
    pub(super) capacity: usize,

    #[cfg(target_arch = "wasm32")]
    pub(super) lru_order: RefCell<VecDeque<u64>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) lru_order: Mutex<VecDeque<u64>>,

    // Checksum management (moved to metadata module)
    pub(super) checksum_manager: ChecksumManager,
    #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
    pub(super) base_dir: PathBuf,
    pub(super) db_name: String,

    // Background sync settings
    #[cfg(target_arch = "wasm32")]
    pub(super) auto_sync_interval: RefCell<Option<Duration>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) auto_sync_interval: Mutex<Option<Duration>>,

    #[cfg(not(target_arch = "wasm32"))]
    pub(super) last_auto_sync: Instant,

    #[cfg(target_arch = "wasm32")]
    pub(super) policy: RefCell<Option<SyncPolicy>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) policy: Mutex<Option<SyncPolicy>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) auto_sync_stop: Option<Arc<AtomicBool>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) auto_sync_thread: Option<std::thread::JoinHandle<()>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) debounce_thread: Option<std::thread::JoinHandle<()>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) tokio_timer_task: Option<TokioJoinHandle<()>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) tokio_debounce_task: Option<TokioJoinHandle<()>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) last_write_ms: Arc<AtomicU64>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) threshold_hit: Arc<AtomicBool>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) sync_count: Arc<AtomicU64>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) timer_sync_count: Arc<AtomicU64>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) debounce_sync_count: Arc<AtomicU64>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) last_sync_duration_ms: Arc<AtomicU64>,

    // Auto-sync channel for real sync operations
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) sync_sender: Option<mpsc::UnboundedSender<SyncRequest>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) sync_receiver: Option<mpsc::UnboundedReceiver<SyncRequest>>,

    // Startup recovery report
    pub(super) recovery_report: RecoveryReport,

    // Leader election manager (WASM only) - wrapped in RefCell for interior mutability
    #[cfg(target_arch = "wasm32")]
    pub leader_election: std::cell::RefCell<Option<super::leader_election::LeaderElectionManager>>,

    // Observability manager
    pub(super) observability: super::observability::ObservabilityManager,

    // Telemetry metrics (optional)
    #[cfg(feature = "telemetry")]
    pub(super) metrics: Option<crate::telemetry::Metrics>,
}

impl BlockStorage {
    /// Create a new BlockStorage synchronously without IndexedDB restoration.
    /// Used for auto-registration in the VFS when existing data is detected.
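    ///
    /// # Example
    /// A minimal sketch (WASM-only constructor, hence `ignore`):
    /// ```ignore
    /// let storage = BlockStorage::new_sync("mydb");
    /// ```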
    #[cfg(target_arch = "wasm32")]
    pub fn new_sync(db_name: &str) -> Self {
        log::info!(
            "Creating BlockStorage synchronously for database: {}",
            db_name
        );

        // Load existing data from GLOBAL_STORAGE to support multi-connection scenarios
        use crate::storage::vfs_sync::with_global_storage;
        let (cache, allocated_blocks, max_block_id) = with_global_storage(|storage_map| {
            if let Some(db_storage) = storage_map.borrow().get(db_name) {
                let cache = db_storage.clone();
                let allocated = db_storage.keys().copied().collect::<HashSet<_>>();
                let max_id = db_storage.keys().max().copied().unwrap_or(0);
                (cache, allocated, max_id)
            } else {
                (HashMap::new(), HashSet::new(), 0)
            }
        });

        log::info!(
            "Loaded {} blocks from GLOBAL_STORAGE for {} (max_block_id={})",
            cache.len(),
            db_name,
            max_block_id
        );

        // CRITICAL: Always reload checksums from GLOBAL_METADATA since cache might be stale
        // This handles the case where import writes new data after close() reloaded cache
        use crate::storage::vfs_sync::with_global_metadata;
        let checksum_manager = with_global_metadata(|metadata_map| {
            if let Some(db_metadata) = metadata_map.borrow().get(db_name) {
                let mut checksums = HashMap::new();
                let mut algos = HashMap::new();
                for (block_id, meta) in db_metadata {
                    checksums.insert(*block_id, meta.checksum);
                    algos.insert(*block_id, meta.algo);
                }
                ChecksumManager::with_data(checksums, algos, ChecksumAlgorithm::FastHash)
            } else {
                ChecksumManager::new(ChecksumAlgorithm::FastHash)
            }
        });

        Self {
            cache: RefCell::new(cache),
            dirty_blocks: Arc::new(RefCell::new(HashMap::new())),
            allocated_blocks: RefCell::new(allocated_blocks),
            deallocated_blocks: RefCell::new(HashSet::new()),
            next_block_id: AtomicU64::new(max_block_id + 1),
            capacity: DEFAULT_CACHE_CAPACITY,
            lru_order: RefCell::new(VecDeque::new()),
            checksum_manager,
            db_name: db_name.to_string(),
            auto_sync_interval: RefCell::new(None),
            policy: RefCell::new(None),
            #[cfg(not(target_arch = "wasm32"))]
            last_auto_sync: Instant::now(),
            #[cfg(not(target_arch = "wasm32"))]
            auto_sync_stop: None,
            #[cfg(not(target_arch = "wasm32"))]
            auto_sync_thread: None,
            #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
            base_dir: std::path::PathBuf::from(
                std::env::var("ABSURDERSQL_FS_BASE")
                    .unwrap_or_else(|_| "./test_storage".to_string()),
            ),
            #[cfg(not(target_arch = "wasm32"))]
            debounce_thread: None,
            #[cfg(not(target_arch = "wasm32"))]
            tokio_timer_task: None,
            #[cfg(not(target_arch = "wasm32"))]
            tokio_debounce_task: None,
            #[cfg(not(target_arch = "wasm32"))]
            last_write_ms: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            threshold_hit: Arc::new(AtomicBool::new(false)),
            #[cfg(not(target_arch = "wasm32"))]
            sync_count: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            timer_sync_count: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            debounce_sync_count: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            last_sync_duration_ms: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            sync_sender: None,
            #[cfg(not(target_arch = "wasm32"))]
            sync_receiver: None,
            recovery_report: RecoveryReport::default(),
            #[cfg(target_arch = "wasm32")]
            leader_election: std::cell::RefCell::new(None),
            observability: super::observability::ObservabilityManager::new(),
            #[cfg(feature = "telemetry")]
            metrics: None,
        }
    }

    #[cfg(target_arch = "wasm32")]
    pub async fn new(db_name: &str) -> Result<Self, DatabaseError> {
        super::constructors::new_wasm(db_name).await
    }

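    /// Create a new `BlockStorage`, restoring allocations, checksums, and
    /// deallocation tombstones from disk when the `fs_persist` feature is
    /// enabled.
    ///
    /// # Example
    /// A minimal sketch, mirroring the example on `on_database_import`:
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::BlockStorage;
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// let storage = BlockStorage::new("mydb").await?;
    /// # Ok(())
    /// # }
    /// ```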
    #[cfg(not(target_arch = "wasm32"))]
    pub async fn new(db_name: &str) -> Result<Self, DatabaseError> {
        log::info!("Creating BlockStorage for database: {}", db_name);

        // Initialize allocation tracking for native
        let (allocated_blocks, next_block_id) = {
            #[cfg(feature = "fs_persist")]
            {
                // fs_persist: restore allocation from filesystem
                let mut allocated_blocks = HashSet::new();
                let mut next_block_id: u64 = 1;

                let base_path = std::env::var("ABSURDERSQL_FS_BASE")
                    .unwrap_or_else(|_| "./test_storage".to_string());
                let mut alloc_path = std::path::PathBuf::from(base_path);
                alloc_path.push(db_name);
                alloc_path.push("allocations.json");

                if let Ok(content) = std::fs::read_to_string(&alloc_path) {
                    if let Ok(alloc_data) = serde_json::from_str::<serde_json::Value>(&content) {
                        if let Some(allocated_array) = alloc_data["allocated"].as_array() {
                            for block_id_val in allocated_array {
                                if let Some(block_id) = block_id_val.as_u64() {
                                    allocated_blocks.insert(block_id);
                                }
                            }
                            next_block_id = allocated_blocks.iter().max().copied().unwrap_or(0) + 1;
                        }
                    }
                }

                (allocated_blocks, next_block_id)
            }

            #[cfg(not(feature = "fs_persist"))]
            {
                // Native test mode: use default allocation
                (HashSet::new(), 1)
            }
        };

        // Initialize checksums and checksum algorithms
        let checksums_init: HashMap<u64, u64> = {
            #[cfg(feature = "fs_persist")]
            {
                let mut map = HashMap::new();
                let base_path = std::env::var("ABSURDERSQL_FS_BASE")
                    .unwrap_or_else(|_| "./test_storage".to_string());
                let mut meta_path = std::path::PathBuf::from(base_path);
                meta_path.push(db_name);
                meta_path.push("metadata.json");

                if let Ok(content) = std::fs::read_to_string(&meta_path) {
                    if let Ok(meta_data) = serde_json::from_str::<serde_json::Value>(&content) {
                        if let Some(entries) = meta_data["entries"].as_array() {
                            for entry in entries {
                                if let Some(arr) = entry.as_array() {
                                    if let (Some(block_id), Some(obj)) = (
                                        arr.first().and_then(|v| v.as_u64()),
                                        arr.get(1).and_then(|v| v.as_object()),
                                    ) {
                                        if let Some(checksum) =
                                            obj.get("checksum").and_then(|v| v.as_u64())
                                        {
                                            map.insert(block_id, checksum);
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
                map
            }

            #[cfg(not(feature = "fs_persist"))]
            {
                // Native test mode: restore checksums from global test storage
                #[allow(unused_mut)]
                let mut map = HashMap::new();
                #[cfg(any(test, debug_assertions))]
                GLOBAL_METADATA_TEST.with(|meta| {
                    #[cfg(target_arch = "wasm32")]
                    let meta_map = meta.borrow_mut();
                    #[cfg(not(target_arch = "wasm32"))]
                    let meta_map = meta.lock();
                    if let Some(db_meta) = meta_map.get(db_name) {
                        for (block_id, metadata) in db_meta.iter() {
                            map.insert(*block_id, metadata.checksum);
                        }
                    }
                });
                map
            }
        };

        let checksum_algos_init: HashMap<u64, ChecksumAlgorithm> = {
            #[cfg(feature = "fs_persist")]
            {
                let mut map = HashMap::new();
                let base_path = std::env::var("ABSURDERSQL_FS_BASE")
                    .unwrap_or_else(|_| "./test_storage".to_string());
                let mut meta_path = std::path::PathBuf::from(base_path);
                meta_path.push(db_name);
                meta_path.push("metadata.json");

                if let Ok(content) = std::fs::read_to_string(&meta_path) {
                    if let Ok(meta_data) = serde_json::from_str::<serde_json::Value>(&content) {
                        if let Some(entries) = meta_data["entries"].as_array() {
                            for entry in entries {
                                if let Some(arr) = entry.as_array() {
                                    if let (Some(block_id), Some(obj)) = (
                                        arr.first().and_then(|v| v.as_u64()),
                                        arr.get(1).and_then(|v| v.as_object()),
                                    ) {
                                        let algo = obj
                                            .get("algo")
                                            .and_then(|v| v.as_str())
                                            .and_then(|s| match s {
                                                "CRC32" => Some(ChecksumAlgorithm::CRC32),
                                                "FastHash" => Some(ChecksumAlgorithm::FastHash),
                                                _ => None,
                                            })
                                            .unwrap_or(ChecksumAlgorithm::FastHash);
                                        map.insert(block_id, algo);
                                    }
                                }
                            }
                        }
                    }
                }
                map
            }

            #[cfg(not(feature = "fs_persist"))]
            {
                // Native test mode: restore algorithms from global test storage
                #[allow(unused_mut)]
                let mut map = HashMap::new();
                #[cfg(any(test, debug_assertions))]
                GLOBAL_METADATA_TEST.with(|meta| {
                    #[cfg(target_arch = "wasm32")]
                    let meta_map = meta.borrow_mut();
                    #[cfg(not(target_arch = "wasm32"))]
                    let meta_map = meta.lock();
                    if let Some(db_meta) = meta_map.get(db_name) {
                        for (block_id, metadata) in db_meta.iter() {
                            map.insert(*block_id, metadata.algo);
                        }
                    }
                });
                map
            }
        };

        // Determine default checksum algorithm from environment (fs_persist native), fallback to FastHash
        #[cfg(feature = "fs_persist")]
        let checksum_algo_default = match std::env::var("DATASYNC_CHECKSUM_ALGO").ok().as_deref() {
            Some("CRC32") => ChecksumAlgorithm::CRC32,
            _ => ChecksumAlgorithm::FastHash,
        };
        #[cfg(not(feature = "fs_persist"))]
        let checksum_algo_default = ChecksumAlgorithm::FastHash;

        // Load deallocated blocks from filesystem
        let deallocated_blocks_init: HashSet<u64> = {
            #[cfg(feature = "fs_persist")]
            {
                let mut set = HashSet::new();
                let base_path = std::env::var("ABSURDERSQL_FS_BASE")
                    .unwrap_or_else(|_| "./test_storage".to_string());
                let mut path = std::path::PathBuf::from(base_path);
                path.push(db_name);
                let mut dealloc_path = path.clone();
                dealloc_path.push("deallocated.json");
                if let Ok(content) = std::fs::read_to_string(&dealloc_path) {
                    if let Ok(dealloc_data) = serde_json::from_str::<serde_json::Value>(&content) {
                        if let Some(tombstones_array) = dealloc_data["tombstones"].as_array() {
                            for tombstone_val in tombstones_array {
                                if let Some(block_id) = tombstone_val.as_u64() {
                                    set.insert(block_id);
                                }
                            }
                        }
                    }
                }
                set
            }
            #[cfg(not(feature = "fs_persist"))]
            {
                HashSet::new()
            }
        };

        Ok(BlockStorage {
            db_name: db_name.to_string(),
            cache: Mutex::new(HashMap::new()),
            lru_order: Mutex::new(VecDeque::new()),
            capacity: 1000,
            checksum_manager: ChecksumManager::with_data(
                checksums_init,
                checksum_algos_init,
                checksum_algo_default,
            ),
            dirty_blocks: Arc::new(Mutex::new(HashMap::new())),
            allocated_blocks: Mutex::new(allocated_blocks),
            next_block_id: AtomicU64::new(next_block_id),
            deallocated_blocks: Mutex::new(deallocated_blocks_init),
            policy: Mutex::new(None),
            auto_sync_interval: Mutex::new(None),
            #[cfg(not(target_arch = "wasm32"))]
            last_auto_sync: Instant::now(),
            #[cfg(not(target_arch = "wasm32"))]
            auto_sync_stop: None,
            #[cfg(not(target_arch = "wasm32"))]
            auto_sync_thread: None,
            #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
            base_dir: std::path::PathBuf::from(
                std::env::var("ABSURDERSQL_FS_BASE")
                    .unwrap_or_else(|_| "./test_storage".to_string()),
            ),
            #[cfg(not(target_arch = "wasm32"))]
            debounce_thread: None,
            #[cfg(not(target_arch = "wasm32"))]
            tokio_timer_task: None,
            #[cfg(not(target_arch = "wasm32"))]
            tokio_debounce_task: None,
            #[cfg(not(target_arch = "wasm32"))]
            last_write_ms: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            threshold_hit: Arc::new(AtomicBool::new(false)),
            #[cfg(not(target_arch = "wasm32"))]
            sync_count: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            timer_sync_count: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            debounce_sync_count: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            last_sync_duration_ms: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            sync_sender: None,
            #[cfg(not(target_arch = "wasm32"))]
            sync_receiver: None,
            recovery_report: RecoveryReport::default(),
            #[cfg(target_arch = "wasm32")]
            leader_election: std::cell::RefCell::new(None),
            observability: super::observability::ObservabilityManager::new(),
            #[cfg(feature = "telemetry")]
            metrics: None,
        })
    }

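    /// Create a `BlockStorage` with a custom LRU cache capacity, in blocks.
    ///
    /// # Example
    /// A minimal sketch; the capacity value is illustrative:
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::BlockStorage;
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// // Cache at most 256 blocks (1 MiB at 4 KiB per block).
    /// let storage = BlockStorage::new_with_capacity("mydb", 256).await?;
    /// # Ok(())
    /// # }
    /// ```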
    pub async fn new_with_capacity(db_name: &str, capacity: usize) -> Result<Self, DatabaseError> {
        let mut s = Self::new(db_name).await?;
        s.capacity = capacity;
        Ok(s)
    }

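    /// Create a `BlockStorage` and run startup recovery with the given options.
    ///
    /// # Example
    /// A minimal sketch: verify a deterministic sample of 16 blocks and report
    /// (but do not repair) any corruption found.
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::{
    /// #     BlockStorage, CorruptionAction, RecoveryMode, RecoveryOptions,
    /// # };
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// let opts = RecoveryOptions {
    ///     mode: RecoveryMode::Sample { count: 16 },
    ///     on_corruption: CorruptionAction::Report,
    /// };
    /// let storage = BlockStorage::new_with_recovery_options("mydb", opts).await?;
    /// println!("corrupted: {:?}", storage.get_recovery_report().corrupted_blocks);
    /// # Ok(())
    /// # }
    /// ```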
    pub async fn new_with_recovery_options(
        db_name: &str,
        recovery_opts: RecoveryOptions,
    ) -> Result<Self, DatabaseError> {
        let mut storage = Self::new(db_name).await?;

        storage.perform_startup_recovery(recovery_opts).await?;

        Ok(storage)
    }

    pub fn get_recovery_report(&self) -> &RecoveryReport {
        &self.recovery_report
    }

    async fn perform_startup_recovery(
        &mut self,
        opts: RecoveryOptions,
    ) -> Result<(), DatabaseError> {
        super::recovery::perform_startup_recovery(self, opts).await
    }

    pub(super) async fn get_blocks_for_verification(
        &self,
        mode: &RecoveryMode,
    ) -> Result<Vec<u64>, DatabaseError> {
        let all_blocks: Vec<u64> = lock_mutex!(self.allocated_blocks).iter().copied().collect();

        match mode {
            RecoveryMode::Full => Ok(all_blocks),
            RecoveryMode::Sample { count } => {
                let sample_count = (*count).min(all_blocks.len());
                let mut sampled = all_blocks;
                sampled.sort_unstable(); // Deterministic sampling
                sampled.truncate(sample_count);
                Ok(sampled)
            }
            RecoveryMode::Skip => Ok(Vec::new()),
        }
    }

    pub(super) async fn verify_block_integrity(
        &mut self,
        block_id: u64,
    ) -> Result<bool, DatabaseError> {
        // Read the block data
        let data = match self.read_block_from_storage(block_id).await {
            Ok(data) => data,
            Err(_) => {
                log::warn!(
                    "Could not read block {} for integrity verification",
                    block_id
                );
                return Ok(false);
            }
        };

        // Verify against stored checksum
        match self.verify_against_stored_checksum(block_id, &data) {
            Ok(()) => Ok(true),
            Err(e) => {
                log::warn!(
                    "Block {} failed checksum verification: {}",
                    block_id,
                    e.message
                );
                Ok(false)
            }
        }
    }

    async fn read_block_from_storage(&mut self, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
        // Try to read from filesystem first (fs_persist mode)
        #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
        {
            let mut blocks_dir = self.base_dir.clone();
            blocks_dir.push(&self.db_name);
            blocks_dir.push("blocks");
            let block_file = blocks_dir.join(format!("block_{}.bin", block_id));

            if let Ok(data) = std::fs::read(&block_file) {
                if data.len() == BLOCK_SIZE {
                    return Ok(data);
                }
            }
        }

        // Fallback to test storage for native tests
        #[cfg(all(
            not(target_arch = "wasm32"),
            any(test, debug_assertions),
            not(feature = "fs_persist")
        ))]
        {
            let mut found_data = None;
            vfs_sync::with_global_storage(|storage| {
                #[cfg(target_arch = "wasm32")]
                let storage_map = storage;
                #[cfg(not(target_arch = "wasm32"))]
                let storage_map = storage.borrow();
                if let Some(db_storage) = storage_map.get(&self.db_name) {
                    if let Some(data) = db_storage.get(&block_id) {
                        found_data = Some(data.clone());
                    }
                }
            });
            if let Some(data) = found_data {
                return Ok(data);
            }
        }

        // WASM global storage
        #[cfg(target_arch = "wasm32")]
        {
            let mut found_data = None;
            vfs_sync::with_global_storage(|storage_map| {
                if let Some(db_storage) = storage_map.borrow().get(&self.db_name) {
                    if let Some(data) = db_storage.get(&block_id) {
                        found_data = Some(data.clone());
                    }
                }
            });
            if let Some(data) = found_data {
                return Ok(data);
            }
        }

        Err(DatabaseError::new(
            "BLOCK_NOT_FOUND",
            &format!("Block {} not found in storage", block_id),
        ))
    }

    pub(super) async fn repair_corrupted_block(
        &mut self,
        block_id: u64,
    ) -> Result<bool, DatabaseError> {
        log::info!("Attempting to repair corrupted block {}", block_id);

        // For now, repair by removing the corrupted block and clearing its metadata
        // In a real implementation, this might involve restoring from backup or rebuilding

        // Remove from cache
        lock_mutex!(self.cache).remove(&block_id);

        // Remove checksum metadata
        self.checksum_manager.remove_checksum(block_id);

        // Remove from filesystem if fs_persist is enabled
        #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
        {
            let mut blocks_dir = self.base_dir.clone();
            blocks_dir.push(&self.db_name);
            blocks_dir.push("blocks");
            let block_file = blocks_dir.join(format!("block_{}.bin", block_id));
            let _ = std::fs::remove_file(&block_file);
        }

        // Remove from test storage
        #[cfg(all(
            not(target_arch = "wasm32"),
            any(test, debug_assertions),
            not(feature = "fs_persist")
        ))]
        {
            vfs_sync::with_global_storage(|storage| {
                let mut storage_map = storage.borrow_mut();
                if let Some(db_storage) = storage_map.get_mut(&self.db_name) {
                    db_storage.remove(&block_id);
                }
            });
        }

        // Remove from WASM storage
        #[cfg(target_arch = "wasm32")]
        {
            vfs_sync::with_global_storage(|storage_map| {
                if let Some(db_storage) = storage_map.borrow_mut().get_mut(&self.db_name) {
                    db_storage.remove(&block_id);
                }
            });
        }

        log::info!(
            "Corrupted block {} has been removed (repair completed)",
            block_id
        );
        Ok(true)
    }

    pub(super) fn touch_lru(&self, block_id: u64) {
        // Remove any existing occurrence
        let mut lru = lock_mutex!(self.lru_order);
        if let Some(pos) = lru.iter().position(|&id| id == block_id) {
            lru.remove(pos);
        }
        // Push as most-recent
        lru.push_back(block_id);
    }

    pub(super) fn evict_if_needed(&self) {
        // Evict clean LRU blocks until within capacity. Never evict dirty blocks.
        loop {
            // Check capacity and find victim in a single critical section
            let victim_opt = {
                let cache_guard = lock_mutex!(self.cache);
                if cache_guard.len() <= self.capacity {
                    break; // Within capacity, done
                }

                // Find the least-recent block that is NOT dirty
                let dirty_guard = lock_mutex!(self.dirty_blocks);
                let mut lru_guard = lock_mutex!(self.lru_order);

                let victim_pos = lru_guard
                    .iter()
                    .position(|id| !dirty_guard.contains_key(id));

                victim_pos.map(|pos| lru_guard.remove(pos).expect("valid pos"))
                // Guards are dropped here
            };

            match victim_opt {
                Some(victim) => {
                    // Remove from cache (new lock acquisition)
                    lock_mutex!(self.cache).remove(&victim);
                }
                None => {
                    // All blocks are dirty; cannot evict. Allow temporary overflow.
                    break;
                }
            }
        }
    }

    #[inline]
    #[cfg(target_arch = "wasm32")]
    pub fn now_millis() -> u64 {
        // Date::now() returns milliseconds since UNIX epoch as f64
        Date::now() as u64
    }

    #[inline]
    #[cfg(not(target_arch = "wasm32"))]
    pub fn now_millis() -> u64 {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_else(|_| Duration::from_millis(0));
        now.as_millis() as u64
    }

    pub(super) fn verify_against_stored_checksum(
        &self,
        block_id: u64,
        data: &[u8],
    ) -> Result<(), DatabaseError> {
        self.checksum_manager.validate_checksum(block_id, data)
    }

    /// Synchronous block read for environments that require sync access (e.g., VFS callbacks)
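    ///
    /// # Example
    /// A minimal sketch; the block must already exist for the read to succeed:
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::BlockStorage;
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// # let storage = BlockStorage::new("mydb").await?;
    /// let bytes = storage.read_block_sync(1)?;
    /// # Ok(())
    /// # }
    /// ```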
    pub fn read_block_sync(&self, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
        // Implementation moved to io_operations module
        super::io_operations::read_block_sync_impl(self, block_id)
    }

    pub async fn read_block(&self, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
        // Delegate to synchronous implementation (immediately ready)
        self.read_block_sync(block_id)
    }

    /// Synchronous block write for environments that require sync access (e.g., VFS callbacks)
    #[cfg(target_arch = "wasm32")]
    pub fn write_block_sync(&self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
        super::io_operations::write_block_sync_impl(self, block_id, data)
    }

    #[cfg(not(target_arch = "wasm32"))]
    pub fn write_block_sync(&mut self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
        super::io_operations::write_block_sync_impl(self, block_id, data)
    }

    #[cfg(target_arch = "wasm32")]
    pub async fn write_block(&self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
        self.write_block_sync(block_id, data)
    }

    #[cfg(not(target_arch = "wasm32"))]
    pub async fn write_block(&mut self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
        self.write_block_sync(block_id, data)?;

        // If threshold was hit and debounce is NOT enabled, perform inline sync
        if self.threshold_hit.load(std::sync::atomic::Ordering::SeqCst) {
            let has_debounce = lock_mutex!(self.policy)
                .as_ref()
                .and_then(|p| p.debounce_ms)
                .is_some();
            if !has_debounce {
                log::debug!("Threshold hit without debounce: performing inline sync");
                self.sync_implementation()?;
                self.threshold_hit
                    .store(false, std::sync::atomic::Ordering::SeqCst);
            }
        }

        Ok(())
    }

    /// Synchronous batch write of blocks
    #[cfg(target_arch = "wasm32")]
    pub fn write_blocks_sync(&self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
        self.maybe_auto_sync();
        for (block_id, data) in items {
            self.write_block_sync(block_id, data)?;
        }
        Ok(())
    }

    #[cfg(not(target_arch = "wasm32"))]
    pub fn write_blocks_sync(&mut self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
        self.maybe_auto_sync();
        for (block_id, data) in items {
            self.write_block_sync(block_id, data)?;
        }
        Ok(())
    }

    /// Async batch write wrapper
    #[cfg(target_arch = "wasm32")]
    pub async fn write_blocks(&self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
        self.write_blocks_sync(items)
    }

    #[cfg(not(target_arch = "wasm32"))]
    pub async fn write_blocks(&mut self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
        self.write_blocks_sync(items)
    }

    /// Synchronous batch read of blocks, preserving input order
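    ///
    /// # Example
    /// A minimal sketch pairing the batch write and batch read paths:
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::BlockStorage;
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// # let mut storage = BlockStorage::new("mydb").await?;
    /// storage.write_blocks_sync(vec![(1, vec![0u8; 4096]), (2, vec![1u8; 4096])])?;
    /// let blocks = storage.read_blocks_sync(&[1, 2])?;
    /// assert_eq!(blocks.len(), 2); // results preserve input order
    /// # Ok(())
    /// # }
    /// ```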
    pub fn read_blocks_sync(&self, block_ids: &[u64]) -> Result<Vec<Vec<u8>>, DatabaseError> {
        self.maybe_auto_sync();
        let mut results = Vec::with_capacity(block_ids.len());
        for &id in block_ids {
            results.push(self.read_block_sync(id)?);
        }
        Ok(results)
    }

    /// Async batch read wrapper
    pub async fn read_blocks(&self, block_ids: &[u64]) -> Result<Vec<Vec<u8>>, DatabaseError> {
        self.read_blocks_sync(block_ids)
    }

    /// Get block checksum for verification
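    /// (Checksums are tracked as `u64` internally; this accessor truncates the
    /// value to its low 32 bits.)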
    pub fn get_block_checksum(&self, block_id: u64) -> Option<u32> {
        self.checksum_manager
            .get_checksum(block_id)
            .map(|checksum| checksum as u32)
    }

    /// Get current commit marker for this database (WASM only, for testing)
    #[cfg(target_arch = "wasm32")]
    pub fn get_commit_marker(&self) -> u64 {
        vfs_sync::with_global_commit_marker(|cm| {
            cm.borrow().get(&self.db_name).copied().unwrap_or(0)
        })
    }

    /// Check if this database has any blocks in storage (WASM only)
    #[cfg(target_arch = "wasm32")]
    pub fn has_any_blocks(&self) -> bool {
        vfs_sync::with_global_storage(|storage_map| {
            storage_map
                .borrow()
                .get(&self.db_name)
                .map_or(false, |blocks| !blocks.is_empty())
        })
    }

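    /// Verify a single block against its stored checksum, reading the block
    /// into cache first if it is not already cached.
    ///
    /// # Example
    /// A minimal sketch; returns `Err` on a checksum mismatch:
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::BlockStorage;
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// # let mut storage = BlockStorage::new("mydb").await?;
    /// storage.verify_block_checksum(1).await?;
    /// # Ok(())
    /// # }
    /// ```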
    pub async fn verify_block_checksum(&mut self, block_id: u64) -> Result<(), DatabaseError> {
        // If cached, verify directly against cached bytes
        if let Some(bytes) = lock_mutex!(self.cache).get(&block_id).cloned() {
            return self.verify_against_stored_checksum(block_id, &bytes);
        }
        // Otherwise, a read will populate cache and also verify
        let data = self.read_block_sync(block_id)?;
        self.verify_against_stored_checksum(block_id, &data)
    }

    // Always available for testing (integration tests need this in release mode)
    #[allow(unused_mut)]
    pub fn get_block_metadata_for_testing(&mut self) -> HashMap<u64, (u64, u32, u64)> {
        // Returns map of block_id -> (checksum, version, last_modified_ms)
        #[cfg(target_arch = "wasm32")]
        {
            let mut out = HashMap::new();
            vfs_sync::with_global_metadata(|meta_map| {
                if let Some(db_meta) = meta_map.borrow().get(&self.db_name) {
                    for (bid, m) in db_meta.iter() {
                        out.insert(*bid, (m.checksum, m.version, m.last_modified_ms));
                    }
                }
            });
            out
        }
        #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
        {
            use super::fs_persist::FsMeta;
            use std::io::Read;
            let mut out = HashMap::new();
            let base: PathBuf = self.base_dir.clone();
            let mut db_dir = base.clone();
            db_dir.push(&self.db_name);
            let mut meta_path = db_dir.clone();
            meta_path.push("metadata.json");
            if let Ok(mut f) = std::fs::File::open(&meta_path) {
                let mut s = String::new();
                if f.read_to_string(&mut s).is_ok() {
                    if let Ok(meta) = serde_json::from_str::<FsMeta>(&s) {
                        for (block_id, metadata) in meta.entries {
                            out.insert(
                                block_id,
                                (
                                    metadata.checksum,
                                    metadata.version,
                                    metadata.last_modified_ms,
                                ),
                            );
                        }
                    }
                }
            }
            out
        }
        #[cfg(all(
            not(target_arch = "wasm32"),
            any(test, debug_assertions),
            not(feature = "fs_persist")
        ))]
        {
            let mut out = HashMap::new();
            GLOBAL_METADATA_TEST.with(|meta| {
                #[cfg(target_arch = "wasm32")]
                let meta_map = meta.borrow_mut();
                #[cfg(not(target_arch = "wasm32"))]
                let meta_map = meta.lock();
                if let Some(db_meta) = meta_map.get(&self.db_name) {
                    for (bid, m) in db_meta.iter() {
                        out.insert(*bid, (m.checksum, m.version, m.last_modified_ms));
                    }
                }
            });
            out
        }
    }

    // Always available for testing (integration tests need this in release mode)
    pub fn set_block_checksum_for_testing(&mut self, block_id: u64, checksum: u64) {
        self.checksum_manager
            .set_checksum_for_testing(block_id, checksum);
    }

    /// Getter for dirty_blocks for fs_persist and auto_sync modules
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) fn get_dirty_blocks(&self) -> &Arc<Mutex<HashMap<u64, Vec<u8>>>> {
        &self.dirty_blocks
    }

    #[cfg(target_arch = "wasm32")]
    #[allow(invalid_reference_casting)]
    pub async fn sync(&self) -> Result<(), DatabaseError> {
        // WASM: Use unsafe cast to get mutable access
        let self_mut = unsafe { &mut *(self as *const Self as *mut Self) };
        let result = self_mut.sync_implementation();
        // Give time for the spawned IndexedDB operations to complete
        wasm_bindgen_futures::JsFuture::from(js_sys::Promise::resolve(
            &wasm_bindgen::JsValue::UNDEFINED,
        ))
        .await
        .ok();
        result
    }

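    /// Flush all pending dirty blocks via the shared sync implementation.
    ///
    /// # Example
    /// A minimal sketch of the write-then-sync flow:
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::BlockStorage;
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// # let mut storage = BlockStorage::new("mydb").await?;
    /// storage.write_block(1, vec![0u8; 4096]).await?;
    /// storage.sync().await?; // dirty blocks are persisted here
    /// # Ok(())
    /// # }
    /// ```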
    #[cfg(not(target_arch = "wasm32"))]
    pub async fn sync(&mut self) -> Result<(), DatabaseError> {
        self.sync_implementation()
    }

    /// Synchronous version of sync() for immediate persistence
    #[cfg(target_arch = "wasm32")]
    #[allow(invalid_reference_casting)]
    pub fn sync_now(&self) -> Result<(), DatabaseError> {
        // WASM can use &self due to RefCell interior mutability
        // Cast self to mutable for the implementation
        let self_mut = unsafe { &mut *(self as *const Self as *mut Self) };
        self_mut.sync_implementation()
    }

    #[cfg(not(target_arch = "wasm32"))]
    pub fn sync_now(&mut self) -> Result<(), DatabaseError> {
        self.sync_implementation()
    }

    /// Internal sync implementation shared by sync() and sync_now()
    fn sync_implementation(&mut self) -> Result<(), DatabaseError> {
        super::sync_operations::sync_implementation_impl(self)
    }

    /// Sync blocks to global storage without advancing commit marker
    /// Used by VFS x_sync callback to persist blocks but maintain commit marker lag
    #[cfg(target_arch = "wasm32")]
    pub fn sync_blocks_only(&mut self) -> Result<(), DatabaseError> {
        super::wasm_vfs_sync::sync_blocks_only(self)
    }

    /// Async version of sync for WASM that properly awaits IndexedDB persistence
    #[cfg(target_arch = "wasm32")]
    pub async fn sync_async(&self) -> Result<(), DatabaseError> {
        super::wasm_indexeddb::sync_async(self).await
    }

    /// Drain all pending dirty blocks and stop background auto-sync (if enabled).
    /// Safe to call multiple times.
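    ///
    /// # Example
    /// A minimal sketch of a graceful shutdown:
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::BlockStorage;
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// # let mut storage = BlockStorage::new("mydb").await?;
    /// storage.drain_and_shutdown(); // final flush, then stop timers and threads
    /// # Ok(())
    /// # }
    /// ```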
1223    #[cfg(target_arch = "wasm32")]
1224    #[allow(invalid_reference_casting)]
1225    pub fn drain_and_shutdown(&self) {
1226        // WASM: Use unsafe cast to get mutable access
1227        let self_mut = unsafe { &mut *(self as *const Self as *mut Self) };
1228        if let Err(e) = self_mut.sync_now() {
1229            log::error!("drain_and_shutdown: sync_now failed: {}", e.message);
1230        }
1231        *lock_mutex!(self.auto_sync_interval) = None;
1232    }
1233
1234    #[cfg(not(target_arch = "wasm32"))]
1235    pub fn drain_and_shutdown(&mut self) {
1236        if let Err(e) = self.sync_now() {
1237            log::error!("drain_and_shutdown: sync_now failed: {}", e.message);
1238        }
1239        *lock_mutex!(self.auto_sync_interval) = None;
1240        #[cfg(not(target_arch = "wasm32"))]
1241        {
1242            if let Some(stop) = &self.auto_sync_stop {
1243                stop.store(true, Ordering::SeqCst);
1244            }
1245            if let Some(handle) = self.auto_sync_thread.take() {
1246                let _ = handle.join();
1247            }
1248            if let Some(handle) = self.debounce_thread.take() {
1249                let _ = handle.join();
1250            }
1251            if let Some(task) = self.tokio_timer_task.take() {
1252                task.abort();
1253            }
1254            if let Some(task) = self.tokio_debounce_task.take() {
1255                task.abort();
1256            }
1257            self.auto_sync_stop = None;
1258            self.threshold_hit.store(false, Ordering::SeqCst);
1259        }
1260    }
1261
1262    pub fn clear_cache(&self) {
1263        lock_mutex!(self.cache).clear();
1264        lock_mutex!(self.lru_order).clear();
1265    }
1266
1267    /// Handle notification that the database has been imported
1268    ///
1269    /// This method should be called after a database import to ensure
1270    /// that any cached data is invalidated and fresh data is read from storage.
1271    ///
1272    /// # Returns
1273    /// * `Ok(())` - Cache cleared successfully
1274    /// * `Err(DatabaseError)` - If cache clearing fails
1275    ///
1276    /// # Example
1277    /// ```rust,no_run
1278    /// # use absurder_sql::storage::block_storage::BlockStorage;
1279    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
1280    /// let mut storage = BlockStorage::new("mydb").await?;
1281    /// // ... database is imported externally ...
1282    /// storage.on_database_import().await?;
1283    /// # Ok(())
1284    /// # }
1285    /// ```
1286    pub async fn on_database_import(&mut self) -> Result<(), DatabaseError> {
1287        log::info!(
1288            "Clearing cache for database '{}' after import",
1289            self.db_name
1290        );
1291
1292        // Clear the LRU cache to force re-reading from storage
1293        self.clear_cache();
1294
1295        // Also clear dirty blocks since they're now stale
1296        lock_mutex!(self.dirty_blocks).clear();
1297
1298        // Clear checksum manager's cache to reload from new metadata
1299        self.checksum_manager.clear_checksums();
1300
1301        // Reload allocated blocks from global storage/allocation map
1302        #[cfg(target_arch = "wasm32")]
1303        {
1304            use super::vfs_sync::with_global_allocation_map;
1305            *lock_mutex!(self.allocated_blocks) = with_global_allocation_map(|gam| {
1306                gam.borrow()
1307                    .get(&self.db_name)
1308                    .cloned()
1309                    .unwrap_or_else(std::collections::HashSet::new)
1310            });
1311            log::debug!(
1312                "Reloaded {} allocated blocks from global allocation map",
1313                lock_mutex!(self.allocated_blocks).len()
1314            );
1315
1316            // Checksums are now managed by ChecksumManager, which loads from metadata on demand
1317            log::debug!("Checksum data will be reloaded from metadata on next verification");
1318        }
1319
1320        #[cfg(not(target_arch = "wasm32"))]
1321        {
1322            #[cfg(feature = "fs_persist")]
1323            {
1324                // Reload from filesystem allocations.json
1325                let mut alloc_path = self.base_dir.clone();
1326                alloc_path.push(&self.db_name);
1327                alloc_path.push("allocations.json");
1328
1329                if let Ok(content) = std::fs::read_to_string(&alloc_path) {
1330                    if let Ok(alloc_data) = serde_json::from_str::<serde_json::Value>(&content) {
1331                        if let Some(allocated_array) = alloc_data["allocated"].as_array() {
1332                            lock_mutex!(self.allocated_blocks).clear();
1333                            for block_id_val in allocated_array {
1334                                if let Some(block_id) = block_id_val.as_u64() {
1335                                    lock_mutex!(self.allocated_blocks).insert(block_id);
1336                                }
1337                            }
1338                            log::debug!(
1339                                "Reloaded {} allocated blocks from filesystem",
1340                                lock_mutex!(self.allocated_blocks).len()
1341                            );
1342                        }
1343                    }
1344                }
1345
1346                // Reload checksums from filesystem metadata.json
1347                let mut meta_path = self.base_dir.clone();
1348                meta_path.push(&self.db_name);
1349                meta_path.push("metadata.json");
1350
1351                if let Ok(content) = std::fs::read_to_string(&meta_path) {
1352                    if let Ok(meta_data) = serde_json::from_str::<serde_json::Value>(&content) {
1353                        if let Some(entries) = meta_data["entries"].as_array() {
1354                            let mut new_checksums = HashMap::new();
1355                            let mut new_algos = HashMap::new();
1356
1357                            for entry in entries {
1358                                if let (Some(block_id), Some(checksum), Some(algo_str)) = (
1359                                    entry[0].as_u64(),
1360                                    entry[1]["checksum"].as_u64(),
1361                                    entry[1]["algo"].as_str(),
1362                                ) {
1363                                    new_checksums.insert(block_id, checksum);
1364
1365                                    let algo = match algo_str {
1366                                        "CRC32" => super::metadata::ChecksumAlgorithm::CRC32,
1367                                        _ => super::metadata::ChecksumAlgorithm::FastHash,
1368                                    };
1369                                    new_algos.insert(block_id, algo);
1370                                }
1371                            }
1372
1373                            self.checksum_manager
1374                                .replace_all(new_checksums.clone(), new_algos);
1375                            log::debug!(
1376                                "Reloaded {} checksums from filesystem metadata",
1377                                new_checksums.len()
1378                            );
1379                        }
1380                    }
1381                } else {
1382                    log::debug!("No metadata file found, checksums will be empty after import");
1383                }
1384            }
1385
1386            #[cfg(not(feature = "fs_persist"))]
1387            {
1388                // Native test mode: reload from GLOBAL_ALLOCATION_MAP
1389                use super::vfs_sync::with_global_allocation_map;
1390
                *lock_mutex!(self.allocated_blocks) = with_global_allocation_map(|gam| {
                    #[cfg(target_arch = "wasm32")]
                    let map = gam;
                    #[cfg(not(target_arch = "wasm32"))]
                    let map = gam.borrow();
                    map.get(&self.db_name).cloned().unwrap_or_default()
                });
1400                log::debug!(
1401                    "Reloaded {} allocated blocks from global allocation map (native test)",
1402                    lock_mutex!(self.allocated_blocks).len()
1403                );
1404
1405                // Checksums are managed by ChecksumManager, loaded from metadata on demand
1406                log::debug!("Checksum data will be reloaded from metadata on next verification");
1407            }
1408        }
1409
1410        log::info!(
1411            "Cache and allocation state refreshed for '{}'",
1412            self.db_name
1413        );
1414
1415        Ok(())
1416    }
1417
1418    /// Reload cache from GLOBAL_STORAGE (WASM only, for multi-connection support)
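    ///
    /// A minimal usage sketch (hedged; the exact call site depends on the
    /// surrounding VFS wiring):
    /// ```ignore
    /// // After another tab syncs new data into GLOBAL_STORAGE, refresh this
    /// // connection's cache so subsequent reads see the fresh blocks.
    /// storage.reload_cache_from_global_storage();
    /// ```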
1419    #[cfg(target_arch = "wasm32")]
1420    pub fn reload_cache_from_global_storage(&self) {
1421        use crate::storage::vfs_sync::{with_global_metadata, with_global_storage};
1422
1423        log::info!("[RELOAD] Starting cache reload for: {}", self.db_name);
1424
1425        #[cfg(target_arch = "wasm32")]
1426        web_sys::console::log_1(
1427            &format!("[RELOAD] BlockStorage.db_name = {}", self.db_name).into(),
1428        );
1429
1430        let fresh_cache = with_global_storage(|storage_map| {
1431            if let Some(db_storage) = storage_map.borrow().get(&self.db_name) {
1432                log::info!(
1433                    "[RELOAD] Found {} blocks in GLOBAL_STORAGE",
1434                    db_storage.len()
1435                );
1436                db_storage.clone()
1437            } else {
1438                log::warn!(
1439                    "[RELOAD] No blocks found in GLOBAL_STORAGE for {}",
1440                    self.db_name
1441                );
1442                std::collections::HashMap::new()
1443            }
1444        });
1445
1446        let block_count = fresh_cache.len();
1447        log::info!(
1448            "[RELOAD] Loading {} blocks into cache and marking as dirty",
1449            block_count
1450        );
1451
1452        // CRITICAL: Also reload checksums from GLOBAL_METADATA to match the fresh cache
1453        // Without this, cached reads will verify against stale checksums (e.g., after import)
1454        with_global_metadata(|metadata_map| {
1455            if let Some(db_metadata) = metadata_map.borrow().get(&self.db_name) {
1456                log::info!("[RELOAD] Found {} metadata entries", db_metadata.len());
1457                // Clear existing checksums
1458                self.checksum_manager.clear_checksums();
1459
1460                // Load checksums from metadata
1461                let mut new_checksums = std::collections::HashMap::new();
1462                let mut new_algos = std::collections::HashMap::new();
1463                for (block_id, meta) in db_metadata {
1464                    new_checksums.insert(*block_id, meta.checksum);
1465                    new_algos.insert(*block_id, meta.algo);
1466                }
1467                self.checksum_manager.replace_all(new_checksums, new_algos);
1468            } else {
1469                log::warn!("[RELOAD] No metadata found for {}", self.db_name);
1470                // No metadata exists, clear checksums
1471                self.checksum_manager.clear_checksums();
1472            }
1473        });
1474
1475        // Replace cache contents while preserving LRU order for blocks that still exist
        let old_lru = {
            let mut lru = lock_mutex!(self.lru_order);
            std::mem::take(&mut *lru)
        };
        // Replace cache contents (no need to mark dirty - sync reads from
        // GLOBAL_STORAGE); hold the cache lock once for the whole batch
        {
            let mut cache = lock_mutex!(self.cache);
            cache.clear();
            for (block_id, block_data) in fresh_cache {
                cache.insert(block_id, block_data);
            }
        }
1486
1487        log::info!(
1488            "[RELOAD] Cache reloaded with {} blocks",
1489            lock_mutex!(self.cache).len()
1490        );
1491
        // Restore LRU order for blocks that still exist, then append new blocks,
        // holding each lock once instead of re-locking per iteration
        {
            let cache = lock_mutex!(self.cache);
            let mut lru = lock_mutex!(self.lru_order);
            for block_id in old_lru {
                if cache.contains_key(&block_id) {
                    lru.push_back(block_id);
                }
            }
            // Add any new blocks not in the old LRU order
            for block_id in cache.keys().copied() {
                if !lru.contains(&block_id) {
                    lru.push_back(block_id);
                }
            }
        }
1506
1507        log::info!(
1508            "[RELOAD] Reload complete: {} blocks in cache, {} dirty",
1509            lock_mutex!(self.cache).len(),
1510            lock_mutex!(self.dirty_blocks).len()
1511        );
1512
1513        // CRITICAL DEBUG: Check if block 0 is valid SQLite header
1514        #[cfg(target_arch = "wasm32")]
1515        {
1516            if let Some(block_0) = lock_mutex!(self.cache).get(&0) {
1517                let is_valid = block_0.len() >= 16 && &block_0[0..16] == b"SQLite format 3\0";
1518                web_sys::console::log_1(
1519                    &format!(
1520                        "[RELOAD] Block 0 in cache: {} bytes, valid SQLite header: {}",
1521                        block_0.len(),
1522                        is_valid
1523                    )
1524                    .into(),
1525                );
1526                if block_0.len() >= 16 {
1527                    web_sys::console::log_1(
1528                        &format!("[RELOAD] Block 0 header [0..16]: {:02x?}", &block_0[0..16])
1529                            .into(),
1530                    );
1531                }
1532
1533                // CRITICAL: Check database size field at offset 24-27
1534                if block_0.len() >= 28 {
1535                    let db_size_bytes = [block_0[24], block_0[25], block_0[26], block_0[27]];
1536                    let db_size_pages = u32::from_be_bytes(db_size_bytes);
1537                    web_sys::console::log_1(
1538                        &format!(
1539                            "[RELOAD] Database size field (offset 24-27): {} pages ({:02x?})",
1540                            db_size_pages, db_size_bytes
1541                        )
1542                        .into(),
1543                    );
1544                }
1545
1546                // DUMP FULL HEADER for analysis
1547                if block_0.len() >= 100 {
1548                    web_sys::console::log_1(
1549                        &format!(
1550                            "[RELOAD] Full header dump [16-31]: {:02x?}",
1551                            &block_0[16..32]
1552                        )
1553                        .into(),
1554                    );
1555                    web_sys::console::log_1(
1556                        &format!(
1557                            "[RELOAD] Full header dump [32-47]: {:02x?}",
1558                            &block_0[32..48]
1559                        )
1560                        .into(),
1561                    );
1562                    web_sys::console::log_1(
1563                        &format!(
1564                            "[RELOAD] Full header dump [48-63]: {:02x?}",
1565                            &block_0[48..64]
1566                        )
1567                        .into(),
1568                    );
1569                    web_sys::console::log_1(
1570                        &format!(
1571                            "[RELOAD] Full header dump [64-79]: {:02x?}",
1572                            &block_0[64..80]
1573                        )
1574                        .into(),
1575                    );
1576                    web_sys::console::log_1(
1577                        &format!(
1578                            "[RELOAD] Full header dump [80-99]: {:02x?}",
1579                            &block_0[80..100]
1580                        )
1581                        .into(),
1582                    );
1583                }
            } else {
                web_sys::console::log_1(
                    &"[RELOAD] ERROR: Block 0 NOT in cache after reload!".into(),
                );
            }
1589        }
1590    }
1591
1592    pub fn get_cache_size(&self) -> usize {
1593        lock_mutex!(self.cache).len()
1594    }
1595
1596    pub fn get_dirty_count(&self) -> usize {
1597        lock_mutex!(self.dirty_blocks).len()
1598    }
1599
1600    pub fn get_db_name(&self) -> &str {
1601        &self.db_name
1602    }
1603
1604    pub fn is_cached(&self, block_id: u64) -> bool {
1605        lock_mutex!(self.cache).contains_key(&block_id)
1606    }
1607
1608    /// Allocate a new block and return its ID
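    ///
    /// A minimal sketch of the allocate/write/deallocate round trip (hedged;
    /// error handling elided, `storage` is a `BlockStorage`):
    /// ```ignore
    /// let id = storage.allocate_block().await?;
    /// storage.write_block(id, vec![0u8; BLOCK_SIZE]).await?;
    /// storage.deallocate_block(id).await?;
    /// ```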
1609    pub async fn allocate_block(&mut self) -> Result<u64, DatabaseError> {
1610        super::allocation::allocate_block_impl(self).await
1611    }
1612
1613    /// Deallocate a block and mark it as available for reuse
1614    pub async fn deallocate_block(&mut self, block_id: u64) -> Result<(), DatabaseError> {
1615        super::allocation::deallocate_block_impl(self, block_id).await
1616    }
1617
1618    /// Get the number of currently allocated blocks
1619    pub fn get_allocated_count(&self) -> usize {
1620        lock_mutex!(self.allocated_blocks).len()
1621    }
1622
1623    /// Crash simulation: simulate crash during IndexedDB commit
    /// If `blocks_written` is true, blocks are written to IndexedDB but the commit marker doesn't advance.
    /// If `blocks_written` is false, the crash occurs before any blocks are written.
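    ///
    /// Test-only usage sketch (hedged; paired with `perform_crash_recovery`):
    /// ```ignore
    /// // Simulate a crash after blocks reach IndexedDB but before the commit
    /// // marker advances, then let recovery resolve the torn state.
    /// storage.crash_simulation_sync(true).await?;
    /// // Under determine_recovery_action's heuristic, a fully written set of
    /// // blocks at the next expected version is finalized.
    /// let action = storage.perform_crash_recovery().await?;
    /// assert_eq!(action, CrashRecoveryAction::Finalize);
    /// ```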
1626    #[cfg(target_arch = "wasm32")]
1627    pub async fn crash_simulation_sync(
1628        &mut self,
1629        blocks_written: bool,
1630    ) -> Result<(), DatabaseError> {
1631        log::info!(
1632            "CRASH SIMULATION: Starting crash simulation with blocks_written={}",
1633            blocks_written
1634        );
1635
1636        if blocks_written {
1637            // Simulate crash after blocks are written but before commit marker advances
1638            // This is the most critical crash scenario to test
1639
1640            // Step 1: Write blocks to IndexedDB (simulate partial transaction completion)
1641            let dirty_blocks = {
1642                let dirty = lock_mutex!(self.dirty_blocks);
1643                dirty.clone()
1644            };
1645
1646            if !dirty_blocks.is_empty() {
1647                log::info!(
1648                    "CRASH SIMULATION: Writing {} blocks to IndexedDB before crash",
1649                    dirty_blocks.len()
1650                );
1651
                // Use the existing IndexedDB persistence logic but don't advance commit marker
                let next_commit = self.get_commit_marker() + 1;
                let metadata_to_persist: Vec<(u64, u64)> = dirty_blocks
                    .keys()
                    .map(|&block_id| (block_id, next_commit))
                    .collect();
1660
1661                log::debug!(
1662                    "CRASH SIMULATION: About to call persist_to_indexeddb for {} blocks",
1663                    dirty_blocks.len()
1664                );
1665
1666                // Write blocks and metadata to IndexedDB
1667                super::wasm_indexeddb::persist_to_indexeddb(
1668                    &self.db_name,
1669                    dirty_blocks,
1670                    metadata_to_persist,
1671                )
1672                .await?;
1673
1674                log::info!("CRASH SIMULATION: persist_to_indexeddb completed successfully");
1675                log::info!(
1676                    "CRASH SIMULATION: Blocks written to IndexedDB, simulating crash before commit marker advance"
1677                );
1678
1679                // Clear dirty blocks (they're now in IndexedDB)
1680                lock_mutex!(self.dirty_blocks).clear();
1681
1682                // DON'T advance commit marker - this simulates the crash
1683                // In a real crash, the commit marker update would fail
1684
1685                return Ok(());
1686            } else {
1687                log::info!("CRASH SIMULATION: No dirty blocks to write");
1688                return Ok(());
1689            }
1690        } else {
1691            // Simulate crash before blocks are written
1692            log::info!("CRASH SIMULATION: Simulating crash before blocks are written to IndexedDB");
1693
1694            // Just return success - blocks remain dirty, nothing written to IndexedDB
1695            return Ok(());
1696        }
1697    }
1698
1699    /// Crash simulation: simulate partial block writes during IndexedDB commit
1700    /// Only specified blocks are written to IndexedDB before crash
1701    #[cfg(target_arch = "wasm32")]
1702    pub async fn crash_simulation_partial_sync(
1703        &mut self,
1704        blocks_to_write: &[u64],
1705    ) -> Result<(), DatabaseError> {
1706        log::info!(
1707            "CRASH SIMULATION: Starting partial crash simulation for {} blocks",
1708            blocks_to_write.len()
1709        );
1710
1711        let dirty_blocks = {
1712            let dirty = lock_mutex!(self.dirty_blocks);
1713            dirty.clone()
1714        };
1715
1716        // Filter to only the blocks we want to "successfully" write before crash
1717        let partial_blocks: std::collections::HashMap<u64, Vec<u8>> = dirty_blocks
1718            .into_iter()
1719            .filter(|(block_id, _)| blocks_to_write.contains(block_id))
1720            .collect();
1721
1722        if !partial_blocks.is_empty() {
1723            log::info!(
1724                "CRASH SIMULATION: Writing {} out of {} blocks before crash",
1725                partial_blocks.len(),
1726                blocks_to_write.len()
1727            );
1728
            let next_commit = self.get_commit_marker() + 1;
            let metadata_to_persist: Vec<(u64, u64)> = partial_blocks
                .keys()
                .map(|&block_id| (block_id, next_commit))
                .collect();
1736
1737            // Write only the partial blocks to IndexedDB
1738            super::wasm_indexeddb::persist_to_indexeddb(
1739                &self.db_name,
1740                partial_blocks.clone(),
1741                metadata_to_persist,
1742            )
1743            .await?;
1744
1745            // Remove only the written blocks from dirty_blocks
1746            {
1747                let mut dirty = lock_mutex!(self.dirty_blocks);
1748                for block_id in partial_blocks.keys() {
1749                    dirty.remove(block_id);
1750                }
1751            }
1752
1753            log::info!(
1754                "CRASH SIMULATION: Partial blocks written, simulating crash before commit marker advance"
1755            );
1756
1757            // DON'T advance commit marker - simulates crash during transaction
1758        }
1759
1760        Ok(())
1761    }
1762
1763    /// Perform crash recovery: detect and handle incomplete IndexedDB transactions
1764    /// This method detects inconsistencies between IndexedDB state and commit markers
1765    /// and either finalizes or rolls back incomplete transactions
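    ///
    /// A hedged sketch of acting on the returned variant:
    /// ```ignore
    /// match storage.perform_crash_recovery().await? {
    ///     CrashRecoveryAction::NoActionNeeded => { /* state already consistent */ }
    ///     CrashRecoveryAction::Rollback => { /* partial transaction discarded */ }
    ///     CrashRecoveryAction::Finalize => { /* completed transaction made visible */ }
    /// }
    /// ```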
1766    #[cfg(target_arch = "wasm32")]
1767    pub async fn perform_crash_recovery(&mut self) -> Result<CrashRecoveryAction, DatabaseError> {
1768        log::info!(
1769            "CRASH RECOVERY: Starting crash recovery scan for database: {}",
1770            self.db_name
1771        );
1772
1773        // Step 1: Get current commit marker
1774        let current_marker = self.get_commit_marker();
1775        log::info!("CRASH RECOVERY: Current commit marker: {}", current_marker);
1776
1777        // Step 2: Scan IndexedDB for blocks with versions > commit marker
1778        // These represent incomplete transactions that need recovery
1779        let inconsistent_blocks = self.scan_for_inconsistent_blocks(current_marker).await?;
1780
1781        if inconsistent_blocks.is_empty() {
1782            log::info!("CRASH RECOVERY: No inconsistent blocks found, system is consistent");
1783            return Ok(CrashRecoveryAction::NoActionNeeded);
1784        }
1785
1786        log::info!(
1787            "CRASH RECOVERY: Found {} inconsistent blocks that need recovery",
1788            inconsistent_blocks.len()
1789        );
1790
1791        // Step 3: Determine recovery action based on transaction completeness
1792        let recovery_action = self.determine_recovery_action(&inconsistent_blocks).await?;
1793
1794        match recovery_action {
1795            CrashRecoveryAction::Rollback => {
1796                log::info!("CRASH RECOVERY: Performing rollback of incomplete transaction");
1797                self.rollback_incomplete_transaction(&inconsistent_blocks)
1798                    .await?;
1799            }
1800            CrashRecoveryAction::Finalize => {
1801                log::info!("CRASH RECOVERY: Performing finalization of complete transaction");
1802                self.finalize_complete_transaction(&inconsistent_blocks)
1803                    .await?;
1804            }
1805            CrashRecoveryAction::NoActionNeeded => {
1806                // Already handled above
1807            }
1808        }
1809
1810        log::info!("CRASH RECOVERY: Recovery completed successfully");
1811        Ok(recovery_action)
1812    }
1813
1814    /// Scan IndexedDB for blocks with versions greater than the commit marker
1815    #[cfg(target_arch = "wasm32")]
1816    async fn scan_for_inconsistent_blocks(
1817        &self,
1818        commit_marker: u64,
1819    ) -> Result<Vec<(u64, u64)>, DatabaseError> {
1820        log::info!(
1821            "CRASH RECOVERY: Scanning for blocks with version > {}",
1822            commit_marker
1823        );
1824
1825        // This is a simplified implementation - in a real system we'd scan IndexedDB directly
1826        // For now, we'll check the global metadata storage
1827        let mut inconsistent_blocks = Vec::new();
1828
1829        vfs_sync::with_global_metadata(|meta_map| {
1830            if let Some(db_meta) = meta_map.borrow().get(&self.db_name) {
1831                for (block_id, metadata) in db_meta.iter() {
1832                    if metadata.version as u64 > commit_marker {
1833                        log::info!(
1834                            "CRASH RECOVERY: Found inconsistent block {} with version {} > marker {}",
1835                            block_id,
1836                            metadata.version,
1837                            commit_marker
1838                        );
1839                        inconsistent_blocks.push((*block_id, metadata.version as u64));
1840                    }
1841                }
1842            }
1843        });
1844
1845        Ok(inconsistent_blocks)
1846    }
1847
1848    /// Determine whether to rollback or finalize based on transaction completeness
1849    #[cfg(target_arch = "wasm32")]
1850    async fn determine_recovery_action(
1851        &self,
1852        inconsistent_blocks: &[(u64, u64)],
1853    ) -> Result<CrashRecoveryAction, DatabaseError> {
1854        // Simple heuristic: if all inconsistent blocks have the same version (next expected commit),
1855        // then the transaction was likely complete and should be finalized.
1856        // Otherwise, rollback to maintain consistency.
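        //
        // Worked example: with commit marker 5, inconsistent blocks at versions
        // [6, 6, 6] are finalized (one complete transaction at the expected next
        // commit), while [6, 7] or [7, 7] are rolled back.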
1857
1858        let expected_next_commit = self.get_commit_marker() + 1;
1859        let all_same_version = inconsistent_blocks
1860            .iter()
1861            .all(|(_, version)| *version == expected_next_commit);
1862
1863        if all_same_version && !inconsistent_blocks.is_empty() {
1864            log::info!(
1865                "CRASH RECOVERY: All inconsistent blocks have expected version {}, finalizing transaction",
1866                expected_next_commit
1867            );
1868            Ok(CrashRecoveryAction::Finalize)
1869        } else {
1870            log::info!(
1871                "CRASH RECOVERY: Inconsistent block versions detected, rolling back transaction"
1872            );
1873            Ok(CrashRecoveryAction::Rollback)
1874        }
1875    }
1876
1877    /// Rollback incomplete transaction by removing inconsistent blocks
1878    #[cfg(target_arch = "wasm32")]
1879    async fn rollback_incomplete_transaction(
1880        &mut self,
1881        inconsistent_blocks: &[(u64, u64)],
1882    ) -> Result<(), DatabaseError> {
1883        log::info!(
1884            "CRASH RECOVERY: Rolling back {} inconsistent blocks",
1885            inconsistent_blocks.len()
1886        );
1887
1888        // Remove inconsistent blocks from global metadata
1889        vfs_sync::with_global_metadata(|meta_map| {
1890            if let Some(db_meta) = meta_map.borrow_mut().get_mut(&self.db_name) {
1891                for (block_id, _) in inconsistent_blocks {
1892                    log::info!(
1893                        "CRASH RECOVERY: Removing inconsistent block {} from metadata",
1894                        block_id
1895                    );
1896                    db_meta.remove(block_id);
1897                }
1898            }
1899        });
1900
1901        // Remove inconsistent blocks from global storage
1902        vfs_sync::with_global_storage(|storage_map| {
1903            if let Some(db_storage) = storage_map.borrow_mut().get_mut(&self.db_name) {
1904                for (block_id, _) in inconsistent_blocks {
1905                    log::info!(
1906                        "CRASH RECOVERY: Removing inconsistent block {} from global storage",
1907                        block_id
1908                    );
1909                    db_storage.remove(block_id);
1910                }
1911            }
1912        });
1913
        // Clear any cached data for these blocks, holding each lock once
        {
            let mut cache = lock_mutex!(self.cache);
            let mut lru = lock_mutex!(self.lru_order);
            for (block_id, _) in inconsistent_blocks {
                cache.remove(block_id);
                // Remove from LRU order
                lru.retain(|&id| id != *block_id);
            }
        }
1920
1921        // Remove inconsistent blocks from IndexedDB to avoid accumulating orphaned data
1922        let block_ids_to_delete: Vec<u64> = inconsistent_blocks.iter().map(|(id, _)| *id).collect();
1923        if !block_ids_to_delete.is_empty() {
1924            log::info!(
1925                "CRASH RECOVERY: Deleting {} blocks from IndexedDB",
1926                block_ids_to_delete.len()
1927            );
1928            super::wasm_indexeddb::delete_blocks_from_indexeddb(
1929                &self.db_name,
1930                &block_ids_to_delete,
1931            )
1932            .await?;
1933            log::info!("CRASH RECOVERY: Successfully deleted blocks from IndexedDB");
1934        }
1935
1936        log::info!("CRASH RECOVERY: Rollback completed");
1937        Ok(())
1938    }
1939
1940    /// Finalize complete transaction by advancing commit marker
1941    #[cfg(target_arch = "wasm32")]
1942    async fn finalize_complete_transaction(
1943        &mut self,
1944        inconsistent_blocks: &[(u64, u64)],
1945    ) -> Result<(), DatabaseError> {
1946        log::info!(
1947            "CRASH RECOVERY: Finalizing transaction for {} blocks",
1948            inconsistent_blocks.len()
1949        );
1950
1951        // Find the target commit marker (should be consistent across all blocks)
1952        if let Some((_, target_version)) = inconsistent_blocks.first() {
1953            let new_commit_marker = *target_version;
1954
1955            // Advance the commit marker to make the blocks visible
1956            vfs_sync::with_global_commit_marker(|cm| {
1957                cm.borrow_mut()
1958                    .insert(self.db_name.clone(), new_commit_marker);
1959            });
1960
1961            log::info!(
1962                "CRASH RECOVERY: Advanced commit marker from {} to {}",
1963                self.get_commit_marker(),
1964                new_commit_marker
1965            );
1966
1967            // Update checksums for the finalized blocks
1968            for (block_id, _) in inconsistent_blocks {
1969                // Read the block data to compute and store checksum
1970                if let Ok(data) = self.read_block_sync(*block_id) {
1971                    self.checksum_manager.store_checksum(*block_id, &data);
1972                    log::info!(
1973                        "CRASH RECOVERY: Updated checksum for finalized block {}",
1974                        block_id
1975                    );
1976                }
1977            }
1978        }
1979
1980        log::info!("CRASH RECOVERY: Finalization completed");
1981        Ok(())
1982    }
1983
1984    // Leader Election Methods (WASM only)
1985
1986    /// Start leader election process
1987    #[cfg(target_arch = "wasm32")]
1988    pub async fn start_leader_election(&self) -> Result<(), DatabaseError> {
1989        if self.leader_election.borrow().is_none() {
1990            log::debug!(
1991                "BlockStorage::start_leader_election() - Creating new LeaderElectionManager for {}",
1992                self.db_name
1993            );
1994            let mut manager =
1995                super::leader_election::LeaderElectionManager::new(self.db_name.clone());
1996            log::debug!("BlockStorage::start_leader_election() - Calling manager.start_election()");
1997            manager.start_election().await?;
1998            log::debug!(
1999                "BlockStorage::start_leader_election() - Election started, storing manager"
2000            );
2001            *self.leader_election.borrow_mut() = Some(manager);
2002        } else {
2003            // If election is already running, force leadership takeover (requestLeadership)
2004            log::debug!(
2005                "BlockStorage::start_leader_election() - Election already exists, forcing leadership"
2006            );
2007            if let Some(ref mut manager) = *self.leader_election.borrow_mut() {
2008                manager.force_become_leader().await?;
2009            }
2010        }
2011        Ok(())
2012    }
2013
2014    /// Check if this instance WAS the leader WITHOUT triggering re-election
2015    /// Used during cleanup to avoid starting new elections
2016    #[cfg(target_arch = "wasm32")]
2017    pub fn was_leader(&self) -> bool {
2018        self.leader_election
2019            .borrow()
2020            .as_ref()
2021            .map(|m| m.state.borrow().is_leader)
2022            .unwrap_or(false)
2023    }
2024
2025    /// Stop heartbeat interval synchronously (for Drop implementation)
2026    /// This is idempotent and safe to call multiple times
2027    /// Silently skips if already stopped or if manager is borrowed
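    ///
    /// A hedged Drop sketch (`Wrapper` is a hypothetical owner type):
    /// ```ignore
    /// impl Drop for Wrapper {
    ///     fn drop(&mut self) {
    ///         self.storage.stop_heartbeat_sync(); // safe to call repeatedly
    ///     }
    /// }
    /// ```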
2028    #[cfg(target_arch = "wasm32")]
2029    pub fn stop_heartbeat_sync(&self) {
2030        // Use try_borrow_mut to avoid deadlock when multiple DBs drop simultaneously
2031        if let Ok(mut manager_ref) = self.leader_election.try_borrow_mut() {
2032            if let Some(ref mut manager) = *manager_ref {
2033                // Only clear if interval exists (idempotent)
2034                if let Some(interval_id) = manager.heartbeat_interval.take() {
2035                    if let Some(window) = web_sys::window() {
2036                        window.clear_interval_with_handle(interval_id);
2037                        web_sys::console::log_1(
2038                            &format!(
2039                                "[DROP] Cleared heartbeat interval {} for {}",
2040                                interval_id, self.db_name
2041                            )
2042                            .into(),
2043                        );
2044                    }
2045                }
2046                // Also drop the closure to release Rc references
2047                if manager.heartbeat_closure.take().is_some() {
2048                    web_sys::console::log_1(
2049                        &format!("[DROP] Released heartbeat closure for {}", self.db_name).into(),
2050                    );
2051                }
2052            }
2053        } else {
2054            // Already borrowed (e.g., first DB is still dropping) - skip silently
2055            web_sys::console::log_1(
2056                &format!(
2057                    "[DROP] Skipping heartbeat stop for {} (already handled)",
2058                    self.db_name
2059                )
2060                .into(),
2061            );
2062        }
2063    }
2064
2065    /// Check if this instance is the leader (with re-election on lease expiry)
2066    #[cfg(target_arch = "wasm32")]
2067    pub async fn is_leader(&self) -> bool {
2068        // Start leader election if not already started
2069        if self.leader_election.borrow().is_none() {
2070            log::debug!(
2071                "BlockStorage::is_leader() - Starting leader election for {}",
2072                self.db_name
2073            );
2074            if let Err(e) = self.start_leader_election().await {
2075                log::error!("Failed to start leader election: {:?}", e);
2076                return false;
2077            }
2078            log::debug!("BlockStorage::is_leader() - Leader election started successfully");
2079        } else {
2080            log::debug!(
2081                "BlockStorage::is_leader() - Leader election already exists for {}",
2082                self.db_name
2083            );
2084        }
2085
2086        if let Some(ref mut manager) = *self.leader_election.borrow_mut() {
2087            let is_leader = manager.is_leader().await;
2088
2089            // If no current leader (lease expired), trigger re-election
2090            if !is_leader {
2091                let state = manager.state.borrow();
2092                if state.leader_id.is_none() {
2093                    log::debug!(
2094                        "No current leader for {} - triggering re-election",
2095                        self.db_name
2096                    );
2097                    drop(state);
2098                    let _ = manager.try_become_leader().await;
2099
2100                    // Start heartbeat if we became leader
2101                    let new_is_leader = manager.state.borrow().is_leader;
2102                    if new_is_leader && manager.heartbeat_interval.is_none() {
2103                        let _ = manager.start_heartbeat();
2104                    }
2105
2106                    log::debug!(
2107                        "is_leader() for {} = {} (after re-election)",
2108                        self.db_name,
2109                        new_is_leader
2110                    );
2111                    return new_is_leader;
2112                }
2113            }
2114
2115            log::debug!("is_leader() for {} = {}", self.db_name, is_leader);
2116            is_leader
2117        } else {
2118            log::debug!("No leader election manager for {}", self.db_name);
2119            false
2120        }
2121    }
2122
2123    /// Stop leader election (e.g., when tab is closing)
2124    #[cfg(target_arch = "wasm32")]
    pub async fn stop_leader_election(&self) -> Result<(), DatabaseError> {
        // Use try_borrow_mut so a re-entrant call cannot panic; if the manager
        // is already borrowed elsewhere, skip stopping rather than crash
        let manager = match self.leader_election.try_borrow_mut() {
            Ok(mut slot) => slot.take(),
            Err(_) => None,
        };
        if let Some(mut manager) = manager {
            manager.stop_election().await?;
        }
        Ok(())
    }
2136
2137    /// Send a leader heartbeat (for testing)
2138    #[cfg(target_arch = "wasm32")]
2139    pub async fn send_leader_heartbeat(&self) -> Result<(), DatabaseError> {
2140        if let Some(ref manager) = *self.leader_election.borrow() {
2141            manager.send_heartbeat().await
2142        } else {
2143            Err(DatabaseError::new(
2144                "LEADER_ELECTION_ERROR",
2145                "Leader election not started",
2146            ))
2147        }
2148    }
2149
2150    /// Get timestamp of last received leader heartbeat
2151    #[cfg(target_arch = "wasm32")]
2152    pub async fn get_last_leader_heartbeat(&self) -> Result<u64, DatabaseError> {
2153        if let Some(ref manager) = *self.leader_election.borrow() {
2154            Ok(manager.get_last_heartbeat().await)
2155        } else {
2156            Err(DatabaseError::new(
2157                "LEADER_ELECTION_ERROR",
2158                "Leader election not started",
2159            ))
2160        }
2161    }
2162
2163    // Observability Methods
2164
2165    /// Get comprehensive metrics for observability
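    ///
    /// A minimal polling sketch (hedged; fields as defined in
    /// `super::observability::StorageMetrics`):
    /// ```ignore
    /// let m = storage.get_metrics();
    /// log::info!(
    ///     "dirty={} ({} bytes), syncs={}, error_rate={}",
    ///     m.dirty_count, m.dirty_bytes, m.sync_count, m.error_rate
    /// );
    /// ```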
2166    pub fn get_metrics(&self) -> super::observability::StorageMetrics {
2167        let dirty_count = self.get_dirty_count();
2168        let dirty_bytes = dirty_count * BLOCK_SIZE;
2169
2170        #[cfg(not(target_arch = "wasm32"))]
2171        let (sync_count, timer_sync_count, debounce_sync_count, last_sync_duration_ms) = {
2172            (
2173                self.sync_count.load(Ordering::SeqCst),
2174                self.timer_sync_count.load(Ordering::SeqCst),
2175                self.debounce_sync_count.load(Ordering::SeqCst),
2176                self.last_sync_duration_ms.load(Ordering::SeqCst),
2177            )
2178        };
2179
        #[cfg(target_arch = "wasm32")]
        let (sync_count, timer_sync_count, debounce_sync_count, last_sync_duration_ms) = {
            // For WASM, the observability manager tracks sync_count; timer/debounce
            // counts are native-only (reported as 0), and a nominal 1 ms duration
            // keeps the throughput calculation below well-defined
            (self.observability.get_sync_count(), 0, 0, 1)
        };
2185
2186        let error_count = self.observability.get_error_count();
2187        let checksum_failures = self.observability.get_checksum_failures();
2188
2189        // Calculate throughput and error rate
2190        let total_operations = sync_count + error_count;
2191        let (throughput_blocks_per_sec, throughput_bytes_per_sec) = self
2192            .observability
2193            .calculate_throughput(last_sync_duration_ms);
2194        let error_rate = self.observability.calculate_error_rate(total_operations);
2195
2196        super::observability::StorageMetrics {
2197            dirty_count,
2198            dirty_bytes,
2199            sync_count,
2200            timer_sync_count,
2201            debounce_sync_count,
2202            error_count,
2203            checksum_failures,
2204            last_sync_duration_ms,
2205            throughput_blocks_per_sec,
2206            throughput_bytes_per_sec,
2207            error_rate,
2208        }
2209    }
2210
2211    /// Set sync event callbacks
2212    #[cfg(not(target_arch = "wasm32"))]
2213    pub fn set_sync_callbacks(
2214        &mut self,
2215        on_sync_start: super::observability::SyncStartCallback,
2216        on_sync_success: super::observability::SyncSuccessCallback,
2217        on_sync_failure: super::observability::SyncFailureCallback,
2218    ) {
2219        self.observability.sync_start_callback = Some(on_sync_start);
2220        self.observability.sync_success_callback = Some(on_sync_success);
2221        self.observability.sync_failure_callback = Some(on_sync_failure);
2222    }
2223
2224    /// Set backpressure callback
2225    #[cfg(not(target_arch = "wasm32"))]
2226    pub fn set_backpressure_callback(
2227        &mut self,
2228        callback: super::observability::BackpressureCallback,
2229    ) {
2230        self.observability.backpressure_callback = Some(callback);
2231    }
2232
2233    /// Set error callback
2234    #[cfg(not(target_arch = "wasm32"))]
2235    pub fn set_error_callback(&mut self, callback: super::observability::ErrorCallback) {
2236        self.observability.error_callback = Some(callback);
2237    }
2238
2239    /// Set WASM sync success callback
2240    #[cfg(target_arch = "wasm32")]
2241    pub fn set_sync_success_callback(
2242        &mut self,
2243        callback: super::observability::WasmSyncSuccessCallback,
2244    ) {
2245        self.observability.wasm_sync_success_callback = Some(callback);
2246    }
2247
2248    /// Check if auto-sync is currently enabled
2249    pub fn is_auto_sync_enabled(&self) -> bool {
2250        lock_mutex!(self.auto_sync_interval).is_some()
2251    }
2252
2253    /// Get the current sync policy (if any)
2254    pub fn get_sync_policy(&self) -> Option<super::SyncPolicy> {
2255        lock_mutex!(self.policy).clone()
2256    }
2257
2258    /// Force synchronization with durability guarantees
2259    ///
2260    /// This method ensures that all dirty blocks are persisted to durable storage
2261    /// (IndexedDB in WASM, filesystem in native) and waits for the operation to complete.
2262    /// This is called by VFS xSync to provide SQLite's durability guarantees.
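    ///
    /// A minimal durability sketch (hedged; error handling elided):
    /// ```ignore
    /// storage.write_block(0, vec![0u8; BLOCK_SIZE]).await?;
    /// storage.force_sync().await?; // data is durable once this returns
    /// ```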
2263    pub async fn force_sync(&mut self) -> Result<(), DatabaseError> {
2264        log::info!("force_sync: Starting forced synchronization with durability guarantees");
2265
2266        let dirty_count = self.get_dirty_count();
2267        if dirty_count == 0 {
2268            log::debug!("force_sync: No dirty blocks to sync");
2269            return Ok(());
2270        }
2271
2272        log::info!(
2273            "force_sync: Syncing {} dirty blocks with durability guarantee",
2274            dirty_count
2275        );
2276
2277        // Just use the regular sync - it already waits for persistence in WASM
2278        self.sync().await?;
2279
2280        log::info!("force_sync: Successfully completed forced synchronization");
2281        Ok(())
2282    }
2283
2284    /// Set telemetry metrics (used for instrumentation)
2285    #[cfg(feature = "telemetry")]
2286    pub fn set_metrics(&mut self, metrics: Option<crate::telemetry::Metrics>) {
2287        self.metrics = metrics;
2288    }
2289
2290    /// Get telemetry metrics
2291    #[cfg(feature = "telemetry")]
2292    pub fn metrics(&self) -> Option<&crate::telemetry::Metrics> {
2293        self.metrics.as_ref()
2294    }
2295
2296    /// Create a test instance with minimal setup
2297    #[cfg(feature = "telemetry")]
2298    pub fn new_for_test() -> Self {
2299        Self {
2300            cache: Mutex::new(HashMap::new()),
2301            dirty_blocks: Arc::new(parking_lot::Mutex::new(HashMap::new())),
2302            allocated_blocks: Mutex::new(HashSet::new()),
2303            deallocated_blocks: Mutex::new(HashSet::new()),
2304            next_block_id: AtomicU64::new(1),
2305            capacity: 128,
2306            lru_order: Mutex::new(VecDeque::new()),
2307            checksum_manager: crate::storage::metadata::ChecksumManager::new(
2308                crate::storage::metadata::ChecksumAlgorithm::FastHash,
2309            ),
2310            #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
2311            base_dir: std::path::PathBuf::from("/tmp/test"),
2312            db_name: "test.db".to_string(),
2313            auto_sync_interval: Mutex::new(None),
2314            #[cfg(not(target_arch = "wasm32"))]
2315            last_auto_sync: std::time::Instant::now(),
2316            policy: Mutex::new(None),
2317            #[cfg(not(target_arch = "wasm32"))]
2318            auto_sync_stop: None,
2319            #[cfg(not(target_arch = "wasm32"))]
2320            auto_sync_thread: None,
2321            #[cfg(not(target_arch = "wasm32"))]
2322            debounce_thread: None,
2323            #[cfg(not(target_arch = "wasm32"))]
2324            tokio_timer_task: None,
2325            #[cfg(not(target_arch = "wasm32"))]
2326            tokio_debounce_task: None,
2327            #[cfg(not(target_arch = "wasm32"))]
2328            last_write_ms: Arc::new(std::sync::atomic::AtomicU64::new(0)),
2329            #[cfg(not(target_arch = "wasm32"))]
2330            threshold_hit: Arc::new(std::sync::atomic::AtomicBool::new(false)),
2331            #[cfg(not(target_arch = "wasm32"))]
2332            sync_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
2333            #[cfg(not(target_arch = "wasm32"))]
2334            timer_sync_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
2335            #[cfg(not(target_arch = "wasm32"))]
2336            debounce_sync_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
2337            #[cfg(not(target_arch = "wasm32"))]
2338            last_sync_duration_ms: Arc::new(std::sync::atomic::AtomicU64::new(0)),
2339            #[cfg(not(target_arch = "wasm32"))]
2340            sync_sender: None,
2341            #[cfg(not(target_arch = "wasm32"))]
2342            sync_receiver: None,
2343            recovery_report: RecoveryReport::default(),
2344            #[cfg(target_arch = "wasm32")]
2345            leader_election: std::cell::RefCell::new(None),
2346            observability: super::observability::ObservabilityManager::new(),
2347            metrics: None,
2348        }
2349    }
2350}
2351
2352#[cfg(all(test, target_arch = "wasm32"))]
2353mod wasm_commit_marker_tests {
2354    use super::*;
2355    use wasm_bindgen_test::*;
2356
2357    wasm_bindgen_test_configure!(run_in_browser);
2358
2359    // Helper: set commit marker for a db name in WASM global
2360    fn set_commit_marker(db: &str, v: u64) {
2361        super::vfs_sync::with_global_commit_marker(|cm| {
2362            cm.borrow_mut().insert(db.to_string(), v);
2363        });
2364    }
2365
2366    // Helper: get commit marker for a db name in WASM global
2367    fn get_commit_marker(db: &str) -> u64 {
        super::vfs_sync::with_global_commit_marker(|cm| cm.borrow().get(db).copied().unwrap_or(0))
2369    }
2370
2371    #[wasm_bindgen_test]
2372    async fn gating_returns_zeroed_until_marker_catches_up_wasm() {
2373        let db = "cm_gating_wasm";
2374        let mut s = BlockStorage::new(db).await.expect("create storage");
2375
2376        // Write a block (starts at version 1, uncommitted)
2377        let bid = s.allocate_block().await.expect("alloc block");
2378        let data_v1 = vec![0x33u8; BLOCK_SIZE];
2379        s.write_block(bid, data_v1.clone()).await.expect("write v1");
2380
2381        // Before sync, commit marker is 0, block version is 1, so should be invisible
2382        s.clear_cache();
2383        let out0 = s.read_block(bid).await.expect("read before commit");
2384        assert_eq!(
2385            out0,
2386            vec![0u8; BLOCK_SIZE],
2387            "uncommitted data must read as zeroed"
2388        );
2389
2390        // After sync, commit marker advances to 1, block version is 1, so should be visible
2391        s.sync().await.expect("sync v1");
2392        s.clear_cache();
2393        let out1 = s.read_block(bid).await.expect("read after commit");
2394        assert_eq!(out1, data_v1, "committed data should be visible");
2395    }
2396
2397    #[wasm_bindgen_test]
2398    async fn invisible_blocks_skip_checksum_verification_wasm() {
2399        let db = "cm_checksum_skip_wasm";
2400        let mut s = BlockStorage::new(db).await.expect("create storage");
2401
2402        let bid = s.allocate_block().await.expect("alloc block");
2403        let data = vec![0x44u8; BLOCK_SIZE];
2404        s.write_block(bid, data).await.expect("write v1");
2405        s.sync().await.expect("sync v1"); // commit marker advances to 1, block version is 1
2406
2407        // Make the block invisible by moving commit marker back to 0
2408        set_commit_marker(db, 0);
2409
2410        // Corrupt the stored checksum; invisible reads must NOT verify checksum
2411        s.set_block_checksum_for_testing(bid, 1234567);
2412        s.clear_cache();
2413        let out = s
2414            .read_block(bid)
2415            .await
2416            .expect("read while invisible should not error");
2417        assert_eq!(
2418            out,
2419            vec![0u8; BLOCK_SIZE],
2420            "invisible block reads as zeroed"
2421        );
2422
2423        // Now make it visible again; checksum verification should trigger and fail
2424        set_commit_marker(db, 1);
2425        s.clear_cache();
2426        let err = s
2427            .read_block(bid)
2428            .await
2429            .expect_err("expected checksum mismatch once visible");
2430        assert_eq!(err.code, "CHECKSUM_MISMATCH");
2431    }
2432
2433    #[wasm_bindgen_test]
2434    async fn commit_marker_advances_and_versions_track_syncs_wasm() {
2435        let db = "cm_versions_wasm";
2436        let mut s = BlockStorage::new_with_capacity(db, 8)
2437            .await
2438            .expect("create storage");
2439
2440        let b1 = s.allocate_block().await.expect("alloc b1");
2441        let b2 = s.allocate_block().await.expect("alloc b2");
2442
2443        s.write_block(b1, vec![1u8; BLOCK_SIZE])
2444            .await
2445            .expect("write b1 v1");
2446        s.write_block(b2, vec![2u8; BLOCK_SIZE])
2447            .await
2448            .expect("write b2 v1");
2449        s.sync().await.expect("sync #1");
2450
2451        let cm1 = get_commit_marker(db);
2452        assert_eq!(cm1, 1, "first sync should advance commit marker to 1");
2453        let meta1 = s.get_block_metadata_for_testing();
2454        assert_eq!(meta1.get(&b1).unwrap().1 as u64, cm1);
2455        assert_eq!(meta1.get(&b2).unwrap().1 as u64, cm1);
2456
2457        // Update only b1 and sync again; only b1's version should bump
2458        s.write_block(b1, vec![3u8; BLOCK_SIZE])
2459            .await
2460            .expect("write b1 v2");
2461        s.sync().await.expect("sync #2");
2462
2463        let cm2 = get_commit_marker(db);
2464        assert_eq!(cm2, 2, "second sync should advance commit marker to 2");
2465        let meta2 = s.get_block_metadata_for_testing();
2466        assert_eq!(
2467            meta2.get(&b1).unwrap().1 as u64,
2468            cm2,
2469            "updated block tracks new version"
2470        );
2471        assert_eq!(
2472            meta2.get(&b2).unwrap().1 as u64,
2473            1,
2474            "unchanged block retains prior version"
2475        );
2476    }
2477}
2478
2479#[cfg(all(test, not(target_arch = "wasm32"), not(feature = "fs_persist")))]
2480mod commit_marker_tests {
2481    use super::*;
2482
2483    // Helper: set commit marker for a db name in test-global mirror
2484    fn set_commit_marker(db: &str, v: u64) {
2485        super::vfs_sync::with_global_commit_marker(|cm| {
2486            cm.borrow_mut().insert(db.to_string(), v);
2487        });
2488    }
2489
2490    // Helper: get commit marker for a db name in test-global mirror
2491    fn get_commit_marker(db: &str) -> u64 {
2492        super::vfs_sync::with_global_commit_marker(|cm| cm.borrow().get(db).copied().unwrap_or(0))
2493    }
2494
2495    #[tokio::test(flavor = "current_thread")]
2496    async fn gating_returns_zeroed_until_marker_catches_up() {
2497        let db = "cm_gating_basic";
2498        println!("DEBUG: Creating BlockStorage for {}", db);
2499        let mut s = BlockStorage::new(db).await.expect("create storage");
2500        println!("DEBUG: BlockStorage created successfully");
2501
2502        // Write a block (starts at version 1, uncommitted)
2503        let bid = s.allocate_block().await.expect("alloc block");
2504        println!("DEBUG: Allocated block {}", bid);
2505        let data_v1 = vec![0x11u8; BLOCK_SIZE];
2506        s.write_block(bid, data_v1.clone()).await.expect("write v1");
2507        println!("DEBUG: Wrote block {} with data", bid);
2508
2509        // Before sync, commit marker is 0, block version is 1, so should be invisible
2510        s.clear_cache();
2511        let out0 = s.read_block(bid).await.expect("read before commit");
2512        assert_eq!(
2513            out0,
2514            vec![0u8; BLOCK_SIZE],
2515            "uncommitted data must read as zeroed"
2516        );
2517        println!("DEBUG: Pre-sync read returned zeroed data as expected");
2518
2519        // After sync, commit marker advances to 1, block version is 1, so should be visible
2520        println!("DEBUG: About to call sync");
2521        s.sync().await.expect("sync v1");
2522        println!("DEBUG: Sync completed successfully");
2523
2524        // Debug: Check commit marker and metadata after sync
2525        let commit_marker = get_commit_marker(db);
2526        println!("DEBUG: Commit marker after sync: {}", commit_marker);
2527
2528        s.clear_cache();
2529        let out1 = s.read_block(bid).await.expect("read after commit");
2530
2531        // Debug: Print what we got vs what we expected
2532        println!("DEBUG: Expected data: {:?}", &data_v1[..8]);
2533        println!("DEBUG: Actual data: {:?}", &out1[..8]);
2534        println!(
2535            "DEBUG: Data lengths - expected: {}, actual: {}",
2536            data_v1.len(),
2537            out1.len()
2538        );
2539
2540        // Check if data matches without panicking
2541        let data_matches = out1 == data_v1;
2542        println!("DEBUG: Data matches: {}", data_matches);
2543
2544        if !data_matches {
2545            println!("DEBUG: Data mismatch detected - investigating further");
2546            // Check if it's all zeros (uncommitted)
2547            let is_all_zeros = out1.iter().all(|&b| b == 0);
2548            println!("DEBUG: Is all zeros: {}", is_all_zeros);
2549
2550            // Check metadata and commit marker state
2551            println!("DEBUG: Final commit marker: {}", get_commit_marker(db));
2552
2553            panic!("Data mismatch: expected committed data to be visible after sync");
2554        }
2555
2556        println!("DEBUG: Test passed - data is visible after commit");
2557    }
2558
2559    #[tokio::test(flavor = "current_thread")]
2560    async fn invisible_blocks_skip_checksum_verification() {
2561        let db = "cm_checksum_skip";
2562        let mut s = BlockStorage::new(db).await.expect("create storage");
2563
2564        let bid = s.allocate_block().await.expect("alloc block");
2565        let data = vec![0xAAu8; BLOCK_SIZE];
2566        s.write_block(bid, data.clone()).await.expect("write v1");
2567        s.sync().await.expect("sync v1"); // commit marker advances to 1, block version is 1
2568
2569        // Make the block invisible by moving commit marker back to 0
2570        set_commit_marker(db, 0);
2571
2572        // Corrupt the stored checksum; invisible reads must NOT verify checksum
2573        s.set_block_checksum_for_testing(bid, 1234567);
2574        s.clear_cache();
2575        let out = s
2576            .read_block(bid)
2577            .await
2578            .expect("read while invisible should not error");
2579        assert_eq!(
2580            out,
2581            vec![0u8; BLOCK_SIZE],
2582            "invisible block reads as zeroed"
2583        );
2584
2585        // Now make it visible again; checksum verification should trigger and fail
2586        set_commit_marker(db, 1);
2587        s.clear_cache();
2588        let err = s
2589            .read_block(bid)
2590            .await
2591            .expect_err("expected checksum mismatch once visible");
2592        assert_eq!(err.code, "CHECKSUM_MISMATCH");
2593    }
2594
2595    #[tokio::test(flavor = "current_thread")]
2596    async fn commit_marker_advances_and_versions_track_syncs() {
2597        let db = "cm_versions";
2598        let mut s = BlockStorage::new_with_capacity(db, 8)
2599            .await
2600            .expect("create storage");
2601
2602        let b1 = s.allocate_block().await.expect("alloc b1");
2603        let b2 = s.allocate_block().await.expect("alloc b2");
2604
2605        s.write_block(b1, vec![1u8; BLOCK_SIZE])
2606            .await
2607            .expect("write b1 v1");
2608        s.write_block(b2, vec![2u8; BLOCK_SIZE])
2609            .await
2610            .expect("write b2 v1");
2611        s.sync().await.expect("sync #1");
2612
2613        let cm1 = get_commit_marker(db);
2614        assert_eq!(cm1, 1, "first sync should advance commit marker to 1");
2615        let meta1 = s.get_block_metadata_for_testing();
2616        assert_eq!(meta1.get(&b1).unwrap().1 as u64, cm1);
2617        assert_eq!(meta1.get(&b2).unwrap().1 as u64, cm1);
2618
2619        // Update only b1 and sync again; only b1's version should bump
2620        s.write_block(b1, vec![3u8; BLOCK_SIZE])
2621            .await
2622            .expect("write b1 v2");
2623        s.sync().await.expect("sync #2");
2624
2625        let cm2 = get_commit_marker(db);
2626        assert_eq!(cm2, 2, "second sync should advance commit marker to 2");
2627        let meta2 = s.get_block_metadata_for_testing();
2628        assert_eq!(
2629            meta2.get(&b1).unwrap().1 as u64,
2630            cm2,
2631            "updated block tracks new version"
2632        );
2633        assert_eq!(
2634            meta2.get(&b2).unwrap().1 as u64,
2635            1,
2636            "unchanged block retains prior version"
2637        );
2638    }
2639}