// absurder_sql/storage/block_storage.rs

use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use parking_lot::Mutex;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use std::time::{Instant, SystemTime, UNIX_EPOCH};
#[cfg(target_arch = "wasm32")]
use js_sys::Date;
#[cfg(not(target_arch = "wasm32"))]
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use crate::types::DatabaseError;
use super::metadata::{ChecksumManager, ChecksumAlgorithm};
#[allow(unused_imports)]
use super::metadata::BlockMetadataPersist;
#[cfg(any(target_arch = "wasm32", all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist"))))]
use super::vfs_sync;
#[cfg(not(target_arch = "wasm32"))]
use tokio::task::JoinHandle as TokioJoinHandle;
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::mpsc;

#[allow(unused_imports)]
use std::cell::RefCell;

// FS persistence imports (native only when feature is enabled)
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
use std::path::PathBuf;

// Global storage management moved to vfs_sync module

// Persistent metadata storage moved to metadata module

// Auto-sync messaging
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug)]
pub(super) enum SyncRequest {
    Timer(tokio::sync::oneshot::Sender<()>),
    Debounce(tokio::sync::oneshot::Sender<()>),
}

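/// Options controlling startup integrity verification.
///
/// # Example
/// A minimal configuration sketch; the import path mirrors the doc example
/// further down in this file and is an assumption about how the crate
/// exposes these types:
/// ```rust,no_run
/// # use absurder_sql::storage::block_storage::{RecoveryOptions, RecoveryMode, CorruptionAction};
/// let opts = RecoveryOptions {
///     mode: RecoveryMode::Sample { count: 64 },  // verify a deterministic sample
///     on_corruption: CorruptionAction::Repair,   // drop and rebuild bad blocks
/// };
/// ```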
#[derive(Clone, Debug, Default)]
pub struct RecoveryOptions {
    pub mode: RecoveryMode,
    pub on_corruption: CorruptionAction,
}

#[derive(Clone, Debug)]
pub enum RecoveryMode {
    Full,
    Sample { count: usize },
    Skip,
}

impl Default for RecoveryMode {
    fn default() -> Self {
        RecoveryMode::Full
    }
}

#[derive(Clone, Debug)]
pub enum CorruptionAction {
    Report,
    Repair,
    Fail,
}

#[derive(Clone, Debug, PartialEq)]
pub enum CrashRecoveryAction {
    NoActionNeeded,
    Rollback,
    Finalize,
}

impl Default for CorruptionAction {
    fn default() -> Self {
        CorruptionAction::Report
    }
}

#[derive(Clone, Debug, Default)]
pub struct RecoveryReport {
    pub total_blocks_verified: usize,
    pub corrupted_blocks: Vec<u64>,
    pub repaired_blocks: Vec<u64>,
    pub verification_duration_ms: u64,
}

// On-disk JSON schema for fs_persist
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(serde::Serialize, serde::Deserialize, Default)]
#[allow(dead_code)]
struct FsMeta { entries: Vec<(u64, BlockMetadataPersist)> }

#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(serde::Serialize, serde::Deserialize, Default)]
#[allow(dead_code)]
struct FsAlloc { allocated: Vec<u64> }

#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(serde::Serialize, serde::Deserialize, Default)]
#[allow(dead_code)]
struct FsDealloc { tombstones: Vec<u64> }

// Metadata mirror for native builds
#[cfg(not(target_arch = "wasm32"))]
thread_local! {
    pub(super) static GLOBAL_METADATA_TEST: RefCell<HashMap<String, HashMap<u64, BlockMetadataPersist>>> = RefCell::new(HashMap::new());
}

// Test-only commit marker mirror for native builds (when fs_persist is disabled)
#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions)))]
thread_local! {
    static GLOBAL_COMMIT_MARKER_TEST: RefCell<HashMap<String, u64>> = RefCell::new(HashMap::new());
}

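/// Policy knobs for background auto-sync.
///
/// # Example
/// A minimal sketch; the values below are illustrative, not recommendations:
/// ```rust,no_run
/// # use absurder_sql::storage::block_storage::SyncPolicy;
/// let policy = SyncPolicy {
///     interval_ms: Some(500),         // periodic timer sync
///     max_dirty: Some(64),            // sync once this many blocks are dirty
///     max_dirty_bytes: Some(1 << 20), // ...or this many dirty bytes
///     debounce_ms: Some(50),          // quiesce window after the last write
///     verify_after_write: false,
/// };
/// ```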
#[derive(Clone, Debug)]
pub struct SyncPolicy {
    pub interval_ms: Option<u64>,
    pub max_dirty: Option<usize>,
    pub max_dirty_bytes: Option<usize>,
    pub debounce_ms: Option<u64>,
    pub verify_after_write: bool,
}

#[cfg(not(target_arch = "wasm32"))]
impl Drop for BlockStorage {
    fn drop(&mut self) {
        if let Some(stop) = &self.auto_sync_stop {
            stop.store(true, Ordering::SeqCst);
        }
        if let Some(handle) = self.auto_sync_thread.take() {
            let _ = handle.join();
        }
        if let Some(handle) = self.debounce_thread.take() {
            let _ = handle.join();
        }
        if let Some(task) = self.tokio_timer_task.take() {
            task.abort();
        }
        if let Some(task) = self.tokio_debounce_task.take() {
            task.abort();
        }
        self.auto_sync_stop = None;
    }
}

pub const BLOCK_SIZE: usize = 4096;
#[allow(dead_code)]
pub(super) const DEFAULT_CACHE_CAPACITY: usize = 128;
#[allow(dead_code)]
const STORE_NAME: &str = "sqlite_blocks";
#[allow(dead_code)]
const METADATA_STORE: &str = "metadata";

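/// Block-oriented storage engine backing the VFS: an LRU block cache in
/// front of a dirty-block map, with checksummed persistence to IndexedDB
/// (WASM), the filesystem (`fs_persist`), or in-memory test mirrors.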
pub struct BlockStorage {
    pub(super) cache: HashMap<u64, Vec<u8>>,
    pub(super) dirty_blocks: Arc<Mutex<HashMap<u64, Vec<u8>>>>,
    pub(super) allocated_blocks: HashSet<u64>,
    #[allow(dead_code)]
    pub(super) deallocated_blocks: HashSet<u64>,
    pub(super) next_block_id: u64,
    pub(super) capacity: usize,
    pub(super) lru_order: VecDeque<u64>,
    // Checksum management (moved to metadata module)
    pub(super) checksum_manager: ChecksumManager,
    #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
    pub(super) base_dir: PathBuf,
    pub(super) db_name: String,
    // Background sync settings
    pub(super) auto_sync_interval: Option<Duration>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) last_auto_sync: Instant,
    pub(super) policy: Option<SyncPolicy>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) auto_sync_stop: Option<Arc<AtomicBool>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) auto_sync_thread: Option<std::thread::JoinHandle<()>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) debounce_thread: Option<std::thread::JoinHandle<()>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) tokio_timer_task: Option<TokioJoinHandle<()>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) tokio_debounce_task: Option<TokioJoinHandle<()>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) last_write_ms: Arc<AtomicU64>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) threshold_hit: Arc<AtomicBool>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) sync_count: Arc<AtomicU64>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) timer_sync_count: Arc<AtomicU64>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) debounce_sync_count: Arc<AtomicU64>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) last_sync_duration_ms: Arc<AtomicU64>,

    // Auto-sync channel for real sync operations
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) sync_sender: Option<mpsc::UnboundedSender<SyncRequest>>,
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) sync_receiver: Option<mpsc::UnboundedReceiver<SyncRequest>>,

    // Startup recovery report
    pub(super) recovery_report: RecoveryReport,

    // Leader election manager (WASM only)
    #[cfg(target_arch = "wasm32")]
    pub(super) leader_election: Option<super::leader_election::LeaderElectionManager>,

    // Observability manager
    pub(super) observability: super::observability::ObservabilityManager,

    // Telemetry metrics (optional)
    #[cfg(feature = "telemetry")]
    pub(super) metrics: Option<crate::telemetry::Metrics>,
}

impl BlockStorage {
    /// Create a new BlockStorage synchronously without IndexedDB restoration.
    /// Used for auto-registration in VFS when existing data is detected.
    #[cfg(target_arch = "wasm32")]
    pub fn new_sync(db_name: &str) -> Self {
        log::info!("Creating BlockStorage synchronously for database: {}", db_name);

        // Load existing data from GLOBAL_STORAGE to support multi-connection scenarios
        use crate::storage::vfs_sync::with_global_storage;
        let (cache, allocated_blocks, max_block_id) = with_global_storage(|gs| {
            let storage_map = gs.borrow();
            if let Some(db_storage) = storage_map.get(db_name) {
                let cache = db_storage.clone();
                let allocated = db_storage.keys().copied().collect::<HashSet<_>>();
                let max_id = db_storage.keys().max().copied().unwrap_or(0);
                (cache, allocated, max_id)
            } else {
                (HashMap::new(), HashSet::new(), 0)
            }
        });

        log::info!("Loaded {} blocks from GLOBAL_STORAGE for {} (max_block_id={})", cache.len(), db_name, max_block_id);

        // CRITICAL: Always reload checksums from GLOBAL_METADATA since cache might be stale.
        // This handles the case where import writes new data after close() reloaded cache.
        use crate::storage::vfs_sync::with_global_metadata;
        let checksum_manager = with_global_metadata(|gm| {
            let metadata_map = gm.borrow();
            if let Some(db_metadata) = metadata_map.get(db_name) {
                let mut checksums = HashMap::new();
                let mut algos = HashMap::new();
                for (block_id, meta) in db_metadata {
                    checksums.insert(*block_id, meta.checksum);
                    algos.insert(*block_id, meta.algo);
                }
                ChecksumManager::with_data(checksums, algos, ChecksumAlgorithm::FastHash)
            } else {
                ChecksumManager::new(ChecksumAlgorithm::FastHash)
            }
        });

        Self {
            cache,
            dirty_blocks: Arc::new(Mutex::new(HashMap::new())),
            allocated_blocks,
            deallocated_blocks: HashSet::new(),
            next_block_id: max_block_id + 1,
            capacity: 128,
            lru_order: VecDeque::new(),
            checksum_manager,
            db_name: db_name.to_string(),
            auto_sync_interval: None,
            policy: None,
            #[cfg(not(target_arch = "wasm32"))]
            last_auto_sync: Instant::now(),
            #[cfg(not(target_arch = "wasm32"))]
            auto_sync_stop: None,
            #[cfg(not(target_arch = "wasm32"))]
            auto_sync_thread: None,
            #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
            base_dir: std::path::PathBuf::from(std::env::var("ABSURDERSQL_FS_BASE").unwrap_or_else(|_| "./test_storage".to_string())),
            #[cfg(not(target_arch = "wasm32"))]
            debounce_thread: None,
            #[cfg(not(target_arch = "wasm32"))]
            tokio_timer_task: None,
            #[cfg(not(target_arch = "wasm32"))]
            tokio_debounce_task: None,
            #[cfg(not(target_arch = "wasm32"))]
            last_write_ms: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            threshold_hit: Arc::new(AtomicBool::new(false)),
            #[cfg(not(target_arch = "wasm32"))]
            sync_count: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            timer_sync_count: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            debounce_sync_count: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            last_sync_duration_ms: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            sync_sender: None,
            #[cfg(not(target_arch = "wasm32"))]
            sync_receiver: None,
            recovery_report: RecoveryReport::default(),
            #[cfg(target_arch = "wasm32")]
            leader_election: None,
            observability: super::observability::ObservabilityManager::new(),
            #[cfg(feature = "telemetry")]
            metrics: None,
        }
    }

    #[cfg(target_arch = "wasm32")]
    pub async fn new(db_name: &str) -> Result<Self, DatabaseError> {
        super::constructors::new_wasm(db_name).await
    }

    #[cfg(not(target_arch = "wasm32"))]
    pub async fn new(db_name: &str) -> Result<Self, DatabaseError> {
        log::info!("Creating BlockStorage for database: {}", db_name);

        // Initialize allocation tracking for native
        let (allocated_blocks, next_block_id) = {
            #[cfg(feature = "fs_persist")]
            {
                // fs_persist: restore allocation from filesystem
                let mut allocated_blocks = HashSet::new();
                let mut next_block_id: u64 = 1;

                let base_path = std::env::var("ABSURDERSQL_FS_BASE").unwrap_or_else(|_| "./test_storage".to_string());
                let mut alloc_path = std::path::PathBuf::from(base_path);
                alloc_path.push(db_name);
                alloc_path.push("allocations.json");

                if let Ok(content) = std::fs::read_to_string(&alloc_path) {
                    if let Ok(alloc_data) = serde_json::from_str::<serde_json::Value>(&content) {
                        if let Some(allocated_array) = alloc_data["allocated"].as_array() {
                            for block_id_val in allocated_array {
                                if let Some(block_id) = block_id_val.as_u64() {
                                    allocated_blocks.insert(block_id);
                                }
                            }
                            next_block_id = allocated_blocks.iter().max().copied().unwrap_or(0) + 1;
                        }
                    }
                }

                (allocated_blocks, next_block_id)
            }

            #[cfg(not(feature = "fs_persist"))]
            {
                // Native test mode: use default allocation
                (HashSet::new(), 1)
            }
        };

        // Initialize checksums and checksum algorithms
        let checksums_init: HashMap<u64, u64> = {
            #[cfg(feature = "fs_persist")]
            {
                let mut map = HashMap::new();
                let base_path = std::env::var("ABSURDERSQL_FS_BASE").unwrap_or_else(|_| "./test_storage".to_string());
                let mut meta_path = std::path::PathBuf::from(base_path);
                meta_path.push(db_name);
                meta_path.push("metadata.json");

                if let Ok(content) = std::fs::read_to_string(&meta_path) {
                    if let Ok(meta_data) = serde_json::from_str::<serde_json::Value>(&content) {
                        if let Some(entries) = meta_data["entries"].as_array() {
                            for entry in entries {
                                if let Some(arr) = entry.as_array() {
                                    if let (Some(block_id), Some(obj)) = (arr.get(0).and_then(|v| v.as_u64()), arr.get(1).and_then(|v| v.as_object())) {
                                        if let Some(checksum) = obj.get("checksum").and_then(|v| v.as_u64()) {
                                            map.insert(block_id, checksum);
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
                map
            }

            #[cfg(not(feature = "fs_persist"))]
            {
                // Native test mode: restore checksums from global test storage
                #[allow(unused_mut)]
                let mut map = HashMap::new();
                #[cfg(any(test, debug_assertions))]
                GLOBAL_METADATA_TEST.with(|meta| {
                    let meta_map = meta.borrow();
                    if let Some(db_meta) = meta_map.get(db_name) {
                        for (block_id, metadata) in db_meta.iter() {
                            map.insert(*block_id, metadata.checksum);
                        }
                    }
                });
                map
            }
        };

        let checksum_algos_init: HashMap<u64, ChecksumAlgorithm> = {
            #[cfg(feature = "fs_persist")]
            {
                let mut map = HashMap::new();
                let base_path = std::env::var("ABSURDERSQL_FS_BASE").unwrap_or_else(|_| "./test_storage".to_string());
                let mut meta_path = std::path::PathBuf::from(base_path);
                meta_path.push(db_name);
                meta_path.push("metadata.json");

                if let Ok(content) = std::fs::read_to_string(&meta_path) {
                    if let Ok(meta_data) = serde_json::from_str::<serde_json::Value>(&content) {
                        if let Some(entries) = meta_data["entries"].as_array() {
                            for entry in entries {
                                if let Some(arr) = entry.as_array() {
                                    if let (Some(block_id), Some(obj)) = (arr.get(0).and_then(|v| v.as_u64()), arr.get(1).and_then(|v| v.as_object())) {
                                        let algo = obj.get("algo").and_then(|v| v.as_str())
                                            .and_then(|s| match s {
                                                "CRC32" => Some(ChecksumAlgorithm::CRC32),
                                                "FastHash" => Some(ChecksumAlgorithm::FastHash),
                                                _ => None,
                                            })
                                            .unwrap_or(ChecksumAlgorithm::FastHash);
                                        map.insert(block_id, algo);
                                    }
                                }
                            }
                        }
                    }
                }
                map
            }

            #[cfg(not(feature = "fs_persist"))]
            {
                // Native test mode: restore algorithms from global test storage
                #[allow(unused_mut)]
                let mut map = HashMap::new();
                #[cfg(any(test, debug_assertions))]
                GLOBAL_METADATA_TEST.with(|meta| {
                    let meta_map = meta.borrow();
                    if let Some(db_meta) = meta_map.get(db_name) {
                        for (block_id, metadata) in db_meta.iter() {
                            map.insert(*block_id, metadata.algo);
                        }
                    }
                });
                map
            }
        };

        // Determine default checksum algorithm from environment (fs_persist native), fallback to FastHash
        #[cfg(feature = "fs_persist")]
        let checksum_algo_default = match std::env::var("DATASYNC_CHECKSUM_ALGO").ok().as_deref() {
            Some("CRC32") => ChecksumAlgorithm::CRC32,
            _ => ChecksumAlgorithm::FastHash,
        };
        #[cfg(not(feature = "fs_persist"))]
        let checksum_algo_default = ChecksumAlgorithm::FastHash;

        // Load deallocated blocks from filesystem
        let deallocated_blocks_init: HashSet<u64> = {
            #[cfg(feature = "fs_persist")]
            {
                let mut set = HashSet::new();
                let base_path = std::env::var("ABSURDERSQL_FS_BASE").unwrap_or_else(|_| "./test_storage".to_string());
                let mut path = std::path::PathBuf::from(base_path);
                path.push(db_name);
                let mut dealloc_path = path.clone();
                dealloc_path.push("deallocated.json");
                if let Ok(content) = std::fs::read_to_string(&dealloc_path) {
                    if let Ok(dealloc_data) = serde_json::from_str::<serde_json::Value>(&content) {
                        if let Some(tombstones_array) = dealloc_data["tombstones"].as_array() {
                            for tombstone_val in tombstones_array {
                                if let Some(block_id) = tombstone_val.as_u64() {
                                    set.insert(block_id);
                                }
                            }
                        }
                    }
                }
                set
            }
            #[cfg(not(feature = "fs_persist"))]
            {
                HashSet::new()
            }
        };

        Ok(BlockStorage {
            db_name: db_name.to_string(),
            cache: HashMap::new(),
            lru_order: VecDeque::new(),
            capacity: 1000,
            checksum_manager: ChecksumManager::with_data(
                checksums_init,
                checksum_algos_init,
                checksum_algo_default,
            ),
            dirty_blocks: Arc::new(Mutex::new(HashMap::new())),
            allocated_blocks,
            next_block_id,
            deallocated_blocks: deallocated_blocks_init,
            policy: None,
            auto_sync_interval: None,
            #[cfg(not(target_arch = "wasm32"))]
            last_auto_sync: Instant::now(),
            #[cfg(not(target_arch = "wasm32"))]
            auto_sync_stop: None,
            #[cfg(not(target_arch = "wasm32"))]
            auto_sync_thread: None,
            #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
            base_dir: std::path::PathBuf::from(std::env::var("ABSURDERSQL_FS_BASE").unwrap_or_else(|_| "./test_storage".to_string())),
            #[cfg(not(target_arch = "wasm32"))]
            debounce_thread: None,
            #[cfg(not(target_arch = "wasm32"))]
            tokio_timer_task: None,
            #[cfg(not(target_arch = "wasm32"))]
            tokio_debounce_task: None,
            #[cfg(not(target_arch = "wasm32"))]
            last_write_ms: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            threshold_hit: Arc::new(AtomicBool::new(false)),
            #[cfg(not(target_arch = "wasm32"))]
            sync_count: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            timer_sync_count: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            debounce_sync_count: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            last_sync_duration_ms: Arc::new(AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            sync_sender: None,
            #[cfg(not(target_arch = "wasm32"))]
            sync_receiver: None,
            recovery_report: RecoveryReport::default(),
            observability: super::observability::ObservabilityManager::new(),
            #[cfg(feature = "telemetry")]
            metrics: None,
        })
    }

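    /// Create storage with an explicit LRU cache capacity (in blocks).
    ///
    /// # Example
    /// A minimal sketch, mirroring the doc example further below (assumes an
    /// async runtime such as tokio drives the future):
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::BlockStorage;
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// let storage = BlockStorage::new_with_capacity("mydb", 256).await?;
    /// assert_eq!(storage.get_cache_size(), 0); // nothing cached yet
    /// # Ok(())
    /// # }
    /// ```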
    pub async fn new_with_capacity(db_name: &str, capacity: usize) -> Result<Self, DatabaseError> {
        let mut s = Self::new(db_name).await?;
        s.capacity = capacity;
        Ok(s)
    }

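    /// Create storage and run startup integrity verification per `RecoveryOptions`.
    ///
    /// # Example
    /// A minimal sketch (same import-path assumptions as the other doc
    /// examples in this file):
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::{BlockStorage, RecoveryOptions, RecoveryMode, CorruptionAction};
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// let opts = RecoveryOptions { mode: RecoveryMode::Full, on_corruption: CorruptionAction::Report };
    /// let storage = BlockStorage::new_with_recovery_options("mydb", opts).await?;
    /// let report = storage.get_recovery_report();
    /// println!("verified {} blocks, {} corrupted",
    ///     report.total_blocks_verified, report.corrupted_blocks.len());
    /// # Ok(())
    /// # }
    /// ```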
    pub async fn new_with_recovery_options(db_name: &str, recovery_opts: RecoveryOptions) -> Result<Self, DatabaseError> {
        let mut storage = Self::new(db_name).await?;

        storage.perform_startup_recovery(recovery_opts).await?;

        Ok(storage)
    }

    pub fn get_recovery_report(&self) -> &RecoveryReport {
        &self.recovery_report
    }

    async fn perform_startup_recovery(&mut self, opts: RecoveryOptions) -> Result<(), DatabaseError> {
        super::recovery::perform_startup_recovery(self, opts).await
    }

    pub(super) async fn get_blocks_for_verification(&self, mode: &RecoveryMode) -> Result<Vec<u64>, DatabaseError> {
        let all_blocks: Vec<u64> = self.allocated_blocks.iter().copied().collect();

        match mode {
            RecoveryMode::Full => Ok(all_blocks),
            RecoveryMode::Sample { count } => {
                let sample_count = (*count).min(all_blocks.len());
                let mut sampled = all_blocks;
                sampled.sort_unstable(); // Deterministic sampling
                sampled.truncate(sample_count);
                Ok(sampled)
            }
            RecoveryMode::Skip => Ok(Vec::new()),
        }
    }

    pub(super) async fn verify_block_integrity(&mut self, block_id: u64) -> Result<bool, DatabaseError> {
        // Read the block data
        let data = match self.read_block_from_storage(block_id).await {
            Ok(data) => data,
            Err(_) => {
                log::warn!("Could not read block {} for integrity verification", block_id);
                return Ok(false);
            }
        };

        // Verify against stored checksum
        match self.verify_against_stored_checksum(block_id, &data) {
            Ok(()) => Ok(true),
            Err(e) => {
                log::warn!("Block {} failed checksum verification: {}", block_id, e.message);
                Ok(false)
            }
        }
    }

    async fn read_block_from_storage(&mut self, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
        // Try to read from filesystem first (fs_persist mode)
        #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
        {
            let mut blocks_dir = self.base_dir.clone();
            blocks_dir.push(&self.db_name);
            blocks_dir.push("blocks");
            let block_file = blocks_dir.join(format!("block_{}.bin", block_id));

            if let Ok(data) = std::fs::read(&block_file) {
                if data.len() == BLOCK_SIZE {
                    return Ok(data);
                }
            }
        }

        // Fallback to test storage for native tests
        #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
        {
            let mut found_data = None;
            vfs_sync::with_global_storage(|storage| {
                let storage_map = storage.borrow();
                if let Some(db_storage) = storage_map.get(&self.db_name) {
                    if let Some(data) = db_storage.get(&block_id) {
                        found_data = Some(data.clone());
                    }
                }
            });
            if let Some(data) = found_data {
                return Ok(data);
            }
        }

        // WASM global storage
        #[cfg(target_arch = "wasm32")]
        {
            let mut found_data = None;
            vfs_sync::with_global_storage(|storage| {
                let storage_map = storage.borrow();
                if let Some(db_storage) = storage_map.get(&self.db_name) {
                    if let Some(data) = db_storage.get(&block_id) {
                        found_data = Some(data.clone());
                    }
                }
            });
            if let Some(data) = found_data {
                return Ok(data);
            }
        }

        Err(DatabaseError::new(
            "BLOCK_NOT_FOUND",
            &format!("Block {} not found in storage", block_id)
        ))
    }

    pub(super) async fn repair_corrupted_block(&mut self, block_id: u64) -> Result<bool, DatabaseError> {
        log::info!("Attempting to repair corrupted block {}", block_id);

        // For now, repair by removing the corrupted block and clearing its metadata.
        // In a real implementation, this might involve restoring from backup or rebuilding.

        // Remove from cache
        self.cache.remove(&block_id);

        // Remove checksum metadata
        self.checksum_manager.remove_checksum(block_id);

        // Remove from filesystem if fs_persist is enabled
        #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
        {
            let mut blocks_dir = self.base_dir.clone();
            blocks_dir.push(&self.db_name);
            blocks_dir.push("blocks");
            let block_file = blocks_dir.join(format!("block_{}.bin", block_id));
            let _ = std::fs::remove_file(&block_file);
        }

        // Remove from test storage
        #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
        {
            vfs_sync::with_global_storage(|storage| {
                let mut storage_map = storage.borrow_mut();
                if let Some(db_storage) = storage_map.get_mut(&self.db_name) {
                    db_storage.remove(&block_id);
                }
            });
        }

        // Remove from WASM storage
        #[cfg(target_arch = "wasm32")]
        {
            vfs_sync::with_global_storage(|storage| {
                let mut storage_map = storage.borrow_mut();
                if let Some(db_storage) = storage_map.get_mut(&self.db_name) {
                    db_storage.remove(&block_id);
                }
            });
        }

        log::info!("Corrupted block {} has been removed (repair completed)", block_id);
        Ok(true)
    }

    pub(super) fn touch_lru(&mut self, block_id: u64) {
        // Remove any existing occurrence
        if let Some(pos) = self.lru_order.iter().position(|&id| id == block_id) {
            self.lru_order.remove(pos);
        }
        // Push as most-recent
        self.lru_order.push_back(block_id);
    }

    pub(super) fn evict_if_needed(&mut self) {
        // Evict clean LRU blocks until within capacity. Never evict dirty blocks.
        while self.cache.len() > self.capacity {
            // Find the least-recent block that is NOT dirty
            let dirty_guard = self.dirty_blocks.lock();
            let victim_pos = self
                .lru_order
                .iter()
                .position(|id| !dirty_guard.contains_key(id));

            match victim_pos {
                Some(pos) => {
                    let victim = self.lru_order.remove(pos).expect("valid pos");
                    self.cache.remove(&victim);
                }
                None => {
                    // All blocks are dirty; cannot evict. Allow temporary overflow.
                    break;
                }
            }
        }
    }

    #[inline]
    #[cfg(target_arch = "wasm32")]
    pub fn now_millis() -> u64 {
        // Date::now() returns milliseconds since UNIX epoch as f64
        Date::now() as u64
    }

    #[inline]
    #[cfg(not(target_arch = "wasm32"))]
    pub fn now_millis() -> u64 {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_else(|_| Duration::from_millis(0));
        now.as_millis() as u64
    }

    pub(super) fn verify_against_stored_checksum(
        &self,
        block_id: u64,
        data: &[u8],
    ) -> Result<(), DatabaseError> {
        self.checksum_manager.validate_checksum(block_id, data)
    }

    /// Synchronous block read for environments that require sync access (e.g., VFS callbacks)
    pub fn read_block_sync(&mut self, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
        // Implementation moved to io_operations module
        super::io_operations::read_block_sync_impl(self, block_id)
    }

    pub async fn read_block(&mut self, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
        // Delegate to synchronous implementation (immediately ready)
        self.read_block_sync(block_id)
    }

    /// Synchronous block write for environments that require sync access (e.g., VFS callbacks)
    pub fn write_block_sync(&mut self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
        // Implementation moved to io_operations module
        super::io_operations::write_block_sync_impl(self, block_id, data)
    }

    pub async fn write_block(&mut self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
        // Delegate to synchronous implementation (immediately ready)
        self.write_block_sync(block_id, data)
    }

    /// Synchronous batch write of blocks
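    ///
    /// # Example
    /// A minimal sketch of the batch API; block payloads follow `BLOCK_SIZE`,
    /// and the import path is the same assumption as the other doc examples:
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::{BlockStorage, BLOCK_SIZE};
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// let mut storage = BlockStorage::new("mydb").await?;
    /// storage.write_blocks(vec![(1, vec![0u8; BLOCK_SIZE]), (2, vec![1u8; BLOCK_SIZE])]).await?;
    /// let blocks = storage.read_blocks(&[1, 2]).await?; // preserves input order
    /// assert_eq!(blocks.len(), 2);
    /// # Ok(())
    /// # }
    /// ```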
    pub fn write_blocks_sync(&mut self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
        self.maybe_auto_sync();
        for (block_id, data) in items {
            self.write_block_sync(block_id, data)?;
        }
        Ok(())
    }

    /// Async batch write wrapper
    pub async fn write_blocks(&mut self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
        self.write_blocks_sync(items)
    }

    /// Synchronous batch read of blocks, preserving input order
    pub fn read_blocks_sync(&mut self, block_ids: &[u64]) -> Result<Vec<Vec<u8>>, DatabaseError> {
        self.maybe_auto_sync();
        let mut results = Vec::with_capacity(block_ids.len());
        for &id in block_ids {
            results.push(self.read_block_sync(id)?);
        }
        Ok(results)
    }

    /// Async batch read wrapper
    pub async fn read_blocks(&mut self, block_ids: &[u64]) -> Result<Vec<Vec<u8>>, DatabaseError> {
        self.read_blocks_sync(block_ids)
    }

    /// Get block checksum for verification
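    ///
    /// Note: checksums are stored internally as `u64`; this getter truncates
    /// to the low 32 bits, so it is only suitable for quick comparisons.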
    pub fn get_block_checksum(&self, block_id: u64) -> Option<u32> {
        self.checksum_manager.get_checksum(block_id).map(|checksum| checksum as u32)
    }

    /// Get current commit marker for this database (WASM only, for testing)
    #[cfg(target_arch = "wasm32")]
    pub fn get_commit_marker(&self) -> u64 {
        vfs_sync::with_global_commit_marker(|cm| {
            cm.borrow().get(&self.db_name).copied().unwrap_or(0)
        })
    }

    /// Check if this database has any blocks in storage (WASM only)
    #[cfg(target_arch = "wasm32")]
    pub fn has_any_blocks(&self) -> bool {
        vfs_sync::with_global_storage(|gs| {
            gs.borrow().get(&self.db_name).map_or(false, |blocks| !blocks.is_empty())
        })
    }

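    /// Verify a block against its stored checksum, reading it into the cache
    /// if necessary.
    ///
    /// # Example
    /// A minimal sketch (same import-path assumptions as the other doc examples):
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::{BlockStorage, BLOCK_SIZE};
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// let mut storage = BlockStorage::new("mydb").await?;
    /// storage.write_block(7, vec![0u8; BLOCK_SIZE]).await?;
    /// storage.verify_block_checksum(7).await?; // Err(..) would signal corruption
    /// # Ok(())
    /// # }
    /// ```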
    pub async fn verify_block_checksum(&mut self, block_id: u64) -> Result<(), DatabaseError> {
        // If cached, verify directly against cached bytes
        if let Some(bytes) = self.cache.get(&block_id).cloned() {
            return self.verify_against_stored_checksum(block_id, &bytes);
        }
        // Otherwise, a read will populate cache and also verify
        let data = self.read_block_sync(block_id)?;
        self.verify_against_stored_checksum(block_id, &data)
    }

    #[cfg(any(test, debug_assertions))]
    pub fn get_block_metadata_for_testing(&self) -> HashMap<u64, (u64, u32, u64)> {
        // Returns map of block_id -> (checksum, version, last_modified_ms)
        #[cfg(target_arch = "wasm32")]
        {
            let mut out = HashMap::new();
            vfs_sync::with_global_metadata(|meta| {
                let meta_map = meta.borrow();
                if let Some(db_meta) = meta_map.get(&self.db_name) {
                    for (bid, m) in db_meta.iter() {
                        out.insert(*bid, (m.checksum, m.version, m.last_modified_ms));
                    }
                }
            });
            out
        }
        #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
        {
            use std::io::Read;
            let mut out = HashMap::new();
            let base: PathBuf = self.base_dir.clone();
            let mut db_dir = base.clone();
            db_dir.push(&self.db_name);
            let mut meta_path = db_dir.clone();
            meta_path.push("metadata.json");
            if let Ok(mut f) = std::fs::File::open(&meta_path) {
                let mut s = String::new();
                if f.read_to_string(&mut s).is_ok() {
                    if let Ok(parsed) = serde_json::from_str::<FsMeta>(&s) {
                        for (bid, m) in parsed.entries.into_iter() {
                            out.insert(bid, (m.checksum, m.version, m.last_modified_ms));
                        }
                    }
                }
            }
            out
        }
        #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
        {
            let mut out = HashMap::new();
            GLOBAL_METADATA_TEST.with(|meta| {
                let meta_map = meta.borrow();
                if let Some(db_meta) = meta_map.get(&self.db_name) {
                    for (bid, m) in db_meta.iter() {
                        out.insert(*bid, (m.checksum, m.version, m.last_modified_ms));
                    }
                }
            });
            out
        }
    }

    #[cfg(any(test, debug_assertions))]
    pub fn set_block_checksum_for_testing(&mut self, block_id: u64, checksum: u64) {
        self.checksum_manager.set_checksum_for_testing(block_id, checksum);
    }

    /// Getter for dirty_blocks for fs_persist and auto_sync modules
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) fn get_dirty_blocks(&self) -> &Arc<Mutex<HashMap<u64, Vec<u8>>>> {
        &self.dirty_blocks
    }

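    /// Flush dirty blocks to the backing store.
    ///
    /// # Example
    /// A minimal sketch (same import-path assumptions as the other doc examples):
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::{BlockStorage, BLOCK_SIZE};
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// let mut storage = BlockStorage::new("mydb").await?;
    /// storage.write_block(1, vec![0u8; BLOCK_SIZE]).await?;
    /// storage.sync().await?; // or storage.sync_now()? for the synchronous path
    /// # Ok(())
    /// # }
    /// ```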
    pub async fn sync(&mut self) -> Result<(), DatabaseError> {
        // For WASM, we need to handle the async IndexedDB operations properly
        #[cfg(target_arch = "wasm32")]
        {
            // Call the sync implementation but handle the spawned async operations
            let result = self.sync_implementation();
            // Give time for the spawned IndexedDB operations to complete
            wasm_bindgen_futures::JsFuture::from(js_sys::Promise::resolve(&wasm_bindgen::JsValue::UNDEFINED)).await.ok();
            result
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            self.sync_implementation()
        }
    }

    /// Synchronous version of sync() for immediate persistence
    pub fn sync_now(&mut self) -> Result<(), DatabaseError> {
        self.sync_implementation()
    }

    /// Internal sync implementation shared by sync() and sync_now()
    fn sync_implementation(&mut self) -> Result<(), DatabaseError> {
        super::sync_operations::sync_implementation_impl(self)
    }

    /// Sync blocks to global storage without advancing the commit marker.
    /// Used by the VFS x_sync callback to persist blocks while maintaining commit marker lag.
    #[cfg(target_arch = "wasm32")]
    pub fn sync_blocks_only(&mut self) -> Result<(), DatabaseError> {
        super::wasm_vfs_sync::sync_blocks_only(self)
    }

    /// Async version of sync for WASM that properly awaits IndexedDB persistence
    #[cfg(target_arch = "wasm32")]
    pub async fn sync_async(&mut self) -> Result<(), DatabaseError> {
        super::wasm_indexeddb::sync_async(self).await
    }

    /// Drain all pending dirty blocks and stop background auto-sync (if enabled).
    /// Safe to call multiple times.
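    ///
    /// # Example
    /// A minimal shutdown sketch (same import-path assumptions as above):
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::BlockStorage;
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// let mut storage = BlockStorage::new("mydb").await?;
    /// // ... writes ...
    /// storage.drain_and_shutdown(); // flush dirty blocks, stop timers/threads
    /// # Ok(())
    /// # }
    /// ```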
    pub fn drain_and_shutdown(&mut self) {
        if let Err(e) = self.sync_now() {
            log::error!("drain_and_shutdown: sync_now failed: {}", e.message);
        }
        self.auto_sync_interval = None;
        #[cfg(not(target_arch = "wasm32"))]
        {
            if let Some(stop) = &self.auto_sync_stop {
                stop.store(true, Ordering::SeqCst);
            }
            if let Some(handle) = self.auto_sync_thread.take() {
                let _ = handle.join();
            }
            if let Some(handle) = self.debounce_thread.take() {
                let _ = handle.join();
            }
            if let Some(task) = self.tokio_timer_task.take() {
                task.abort();
            }
            if let Some(task) = self.tokio_debounce_task.take() {
                task.abort();
            }
            self.auto_sync_stop = None;
            self.threshold_hit.store(false, Ordering::SeqCst);
        }
    }

    pub fn clear_cache(&mut self) {
        self.cache.clear();
        self.lru_order.clear();
    }

    /// Handle notification that the database has been imported
    ///
    /// This method should be called after a database import to ensure
    /// that any cached data is invalidated and fresh data is read from storage.
    ///
    /// # Returns
    /// * `Ok(())` - Cache cleared successfully
    /// * `Err(DatabaseError)` - If cache clearing fails
    ///
    /// # Example
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::BlockStorage;
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// let mut storage = BlockStorage::new("mydb").await?;
    /// // ... database is imported externally ...
    /// storage.on_database_import().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn on_database_import(&mut self) -> Result<(), DatabaseError> {
        log::info!("Clearing cache for database '{}' after import", self.db_name);

        // Clear the LRU cache to force re-reading from storage
        self.clear_cache();

        // Also clear dirty blocks since they're now stale
        self.dirty_blocks.lock().clear();

        // Clear checksum manager's cache to reload from new metadata
        self.checksum_manager.clear_checksums();

        // Reload allocated blocks from global storage/allocation map
        #[cfg(target_arch = "wasm32")]
        {
            use super::vfs_sync::with_global_allocation_map;
            self.allocated_blocks = with_global_allocation_map(|gam| {
                gam.borrow()
                    .get(&self.db_name)
                    .cloned()
                    .unwrap_or_else(std::collections::HashSet::new)
            });
            log::debug!("Reloaded {} allocated blocks from global allocation map", self.allocated_blocks.len());

            // Checksums are now managed by ChecksumManager, which loads from metadata on demand
            log::debug!("Checksum data will be reloaded from metadata on next verification");
        }

        #[cfg(not(target_arch = "wasm32"))]
        {
            #[cfg(feature = "fs_persist")]
            {
                // Reload from filesystem allocations.json
                let mut alloc_path = self.base_dir.clone();
                alloc_path.push(&self.db_name);
                alloc_path.push("allocations.json");

                if let Ok(content) = std::fs::read_to_string(&alloc_path) {
                    if let Ok(alloc_data) = serde_json::from_str::<serde_json::Value>(&content) {
                        if let Some(allocated_array) = alloc_data["allocated"].as_array() {
                            self.allocated_blocks.clear();
                            for block_id_val in allocated_array {
                                if let Some(block_id) = block_id_val.as_u64() {
                                    self.allocated_blocks.insert(block_id);
                                }
                            }
                            log::debug!("Reloaded {} allocated blocks from filesystem", self.allocated_blocks.len());
                        }
                    }
                }

                // Reload checksums from filesystem metadata.json
                let mut meta_path = self.base_dir.clone();
                meta_path.push(&self.db_name);
                meta_path.push("metadata.json");

                if let Ok(content) = std::fs::read_to_string(&meta_path) {
                    if let Ok(meta_data) = serde_json::from_str::<serde_json::Value>(&content) {
                        if let Some(entries) = meta_data["entries"].as_array() {
                            let mut new_checksums = HashMap::new();
                            let mut new_algos = HashMap::new();

                            for entry in entries {
                                if let (Some(block_id), Some(checksum), Some(algo_str)) = (
                                    entry[0].as_u64(),
                                    entry[1]["checksum"].as_u64(),
                                    entry[1]["algo"].as_str(),
                                ) {
                                    new_checksums.insert(block_id, checksum);

                                    let algo = match algo_str {
                                        "CRC32" => super::metadata::ChecksumAlgorithm::CRC32,
                                        _ => super::metadata::ChecksumAlgorithm::FastHash,
                                    };
                                    new_algos.insert(block_id, algo);
                                }
                            }

                            self.checksum_manager.replace_all(new_checksums.clone(), new_algos);
                            log::debug!("Reloaded {} checksums from filesystem metadata", new_checksums.len());
                        }
                    }
                } else {
                    log::debug!("No metadata file found, checksums will be empty after import");
                }
            }

            #[cfg(not(feature = "fs_persist"))]
            {
                // Native test mode: reload from GLOBAL_ALLOCATION_MAP
                use super::vfs_sync::with_global_allocation_map;

                self.allocated_blocks = with_global_allocation_map(|gam| {
                    gam.borrow()
                        .get(&self.db_name)
                        .cloned()
                        .unwrap_or_else(std::collections::HashSet::new)
                });
                log::debug!("Reloaded {} allocated blocks from global allocation map (native test)", self.allocated_blocks.len());

                // Checksums are managed by ChecksumManager, loaded from metadata on demand
                log::debug!("Checksum data will be reloaded from metadata on next verification");
            }
        }

        log::info!("Cache and allocation state refreshed for '{}'", self.db_name);

        Ok(())
    }

    /// Reload cache from GLOBAL_STORAGE (WASM only, for multi-connection support)
    #[cfg(target_arch = "wasm32")]
    pub fn reload_cache_from_global_storage(&mut self) {
        use crate::storage::vfs_sync::{with_global_storage, with_global_metadata};
        let fresh_cache = with_global_storage(|gs| {
            let storage_map = gs.borrow();
            if let Some(db_storage) = storage_map.get(&self.db_name) {
                db_storage.clone()
            } else {
                std::collections::HashMap::new()
            }
        });

        // CRITICAL: Also reload checksums from GLOBAL_METADATA to match the fresh cache.
        // Without this, cached reads will verify against stale checksums (e.g., after import).
        with_global_metadata(|gm| {
            let metadata_map = gm.borrow();
            if let Some(db_metadata) = metadata_map.get(&self.db_name) {
                // Clear existing checksums
                self.checksum_manager.clear_checksums();

                // Load checksums from metadata
                let mut new_checksums = std::collections::HashMap::new();
                let mut new_algos = std::collections::HashMap::new();
                for (block_id, meta) in db_metadata {
                    new_checksums.insert(*block_id, meta.checksum);
                    new_algos.insert(*block_id, meta.algo);
                }
                self.checksum_manager.replace_all(new_checksums, new_algos);
            } else {
                // No metadata exists, clear checksums
                self.checksum_manager.clear_checksums();
            }
        });

        // Replace cache contents while preserving LRU order for blocks that still exist
        let old_lru = std::mem::replace(&mut self.lru_order, std::collections::VecDeque::new());
        self.cache.clear();

        // Insert new cache data
        for (block_id, block_data) in fresh_cache {
            self.cache.insert(block_id, block_data);
        }

        // Restore LRU order for blocks that still exist, then add new blocks
        for block_id in old_lru {
            if self.cache.contains_key(&block_id) {
                self.lru_order.push_back(block_id);
            }
        }

        // Add any new blocks not in the old LRU order
        for &block_id in self.cache.keys() {
            if !self.lru_order.contains(&block_id) {
                self.lru_order.push_back(block_id);
            }
        }
    }

    pub fn get_cache_size(&self) -> usize {
        self.cache.len()
    }

    pub fn get_dirty_count(&self) -> usize {
        self.dirty_blocks.lock().len()
    }

    pub fn is_cached(&self, block_id: u64) -> bool {
        self.cache.contains_key(&block_id)
    }

    /// Allocate a new block and return its ID
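    ///
    /// # Example
    /// A minimal allocation round-trip sketch (same import-path assumptions
    /// as the other doc examples):
    /// ```rust,no_run
    /// # use absurder_sql::storage::block_storage::BlockStorage;
    /// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
    /// let mut storage = BlockStorage::new("mydb").await?;
    /// let id = storage.allocate_block().await?;
    /// assert_eq!(storage.get_allocated_count(), 1);
    /// storage.deallocate_block(id).await?; // marked available for reuse
    /// # Ok(())
    /// # }
    /// ```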
1176    pub async fn allocate_block(&mut self) -> Result<u64, DatabaseError> {
1177        super::allocation::allocate_block_impl(self).await
1178    }
1179
1180    /// Deallocate a block and mark it as available for reuse
1181    pub async fn deallocate_block(&mut self, block_id: u64) -> Result<(), DatabaseError> {
1182        super::allocation::deallocate_block_impl(self, block_id).await
1183    }
1184
1185    /// Get the number of currently allocated blocks
1186    pub fn get_allocated_count(&self) -> usize {
1187        self.allocated_blocks.len()
1188    }
1189
1190    /// Crash simulation: simulate crash during IndexedDB commit
1191    /// If `blocks_written` is true, blocks are written to IndexedDB but commit marker doesn't advance
1192    /// If `blocks_written` is false, crash occurs before blocks are written
1193    #[cfg(target_arch = "wasm32")]
1194    pub async fn crash_simulation_sync(&mut self, blocks_written: bool) -> Result<(), DatabaseError> {
1195        log::info!("CRASH SIMULATION: Starting crash simulation with blocks_written={}", blocks_written);
1196        
1197        if blocks_written {
1198            // Simulate crash after blocks are written but before commit marker advances
1199            // This is the most critical crash scenario to test
1200            
1201            // Step 1: Write blocks to IndexedDB (simulate partial transaction completion)
1202            let dirty_blocks = {
1203                let dirty = self.dirty_blocks.lock();
1204                dirty.clone()
1205            };
1206            
1207            if !dirty_blocks.is_empty() {
1208                log::info!("CRASH SIMULATION: Writing {} blocks to IndexedDB before crash", dirty_blocks.len());
1209                
1210                // Use the existing IndexedDB persistence logic but don't advance commit marker
1211                let metadata_to_persist: Vec<(u64, u64)> = dirty_blocks
1212                    .keys()
1213                    .map(|&block_id| {
1214                        let next_commit = self.get_commit_marker() + 1;
1215                        (block_id, next_commit)
1216                    })
1217                    .collect();
1218                
1219                log::debug!("CRASH SIMULATION: About to call persist_to_indexeddb for {} blocks", dirty_blocks.len());
1220                
1221                // Write blocks and metadata to IndexedDB
1222                super::wasm_indexeddb::persist_to_indexeddb(
1223                    &self.db_name,
1224                    dirty_blocks,
1225                    metadata_to_persist,
1226                ).await?;
1227                
1228                log::info!("CRASH SIMULATION: persist_to_indexeddb completed successfully");
1229                log::info!("CRASH SIMULATION: Blocks written to IndexedDB, simulating crash before commit marker advance");
1230                
1231                // Clear dirty blocks (they're now in IndexedDB)
1232                self.dirty_blocks.lock().clear();
1233                
1234                // DON'T advance commit marker - this simulates the crash
1235                // In a real crash, the commit marker update would fail
1236                
1237                return Ok(());
1238            } else {
1239                log::info!("CRASH SIMULATION: No dirty blocks to write");
1240                return Ok(());
1241            }
1242        } else {
1243            // Simulate crash before blocks are written
1244            log::info!("CRASH SIMULATION: Simulating crash before blocks are written to IndexedDB");
1245            
1246            // Just return success - blocks remain dirty, nothing written to IndexedDB
1247            return Ok(());
1248        }
1249    }
1250
1251    /// Crash simulation: simulate partial block writes during IndexedDB commit
1252    /// Only specified blocks are written to IndexedDB before crash
    #[cfg(target_arch = "wasm32")]
    pub async fn crash_simulation_partial_sync(&mut self, blocks_to_write: &[u64]) -> Result<(), DatabaseError> {
        log::info!("CRASH SIMULATION: Starting partial crash simulation for {} blocks", blocks_to_write.len());

        let dirty_blocks = {
            let dirty = self.dirty_blocks.lock();
            dirty.clone()
        };

        // Filter to only the blocks we want to "successfully" write before the crash
        let partial_blocks: std::collections::HashMap<u64, Vec<u8>> = dirty_blocks
            .into_iter()
            .filter(|(block_id, _)| blocks_to_write.contains(block_id))
            .collect();

        if !partial_blocks.is_empty() {
            log::info!("CRASH SIMULATION: Writing {} of {} requested blocks before crash",
                      partial_blocks.len(), blocks_to_write.len());

            let next_commit = self.get_commit_marker() + 1;
            let metadata_to_persist: Vec<(u64, u64)> = partial_blocks
                .keys()
                .map(|&block_id| (block_id, next_commit))
                .collect();

            // Write only the partial blocks to IndexedDB
            super::wasm_indexeddb::persist_to_indexeddb(
                &self.db_name,
                partial_blocks.clone(),
                metadata_to_persist,
            ).await?;

            // Remove only the written blocks from dirty_blocks
            {
                let mut dirty = self.dirty_blocks.lock();
                for block_id in partial_blocks.keys() {
                    dirty.remove(block_id);
                }
            }

            log::info!("CRASH SIMULATION: Partial blocks written, simulating crash before commit marker advance");

            // DON'T advance commit marker - simulates a crash mid-transaction
        }

        Ok(())
    }

    /// Perform crash recovery: detect and handle incomplete IndexedDB transactions.
    /// This method detects inconsistencies between IndexedDB state and the commit marker
    /// and either finalizes or rolls back incomplete transactions.
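    ///
    /// A hedged usage sketch (the db name is illustrative; the match arms just show
    /// the three possible outcomes):
    ///
    /// ```ignore
    /// let mut s = BlockStorage::new("recovery_demo").await?;
    /// match s.perform_crash_recovery().await? {
    ///     CrashRecoveryAction::NoActionNeeded => { /* state was already consistent */ }
    ///     CrashRecoveryAction::Finalize => { /* commit marker advanced; blocks now visible */ }
    ///     CrashRecoveryAction::Rollback => { /* orphaned blocks removed from storage */ }
    /// }
    /// ```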
    #[cfg(target_arch = "wasm32")]
    pub async fn perform_crash_recovery(&mut self) -> Result<CrashRecoveryAction, DatabaseError> {
        log::info!("CRASH RECOVERY: Starting crash recovery scan for database: {}", self.db_name);

        // Step 1: Get the current commit marker
        let current_marker = self.get_commit_marker();
        log::info!("CRASH RECOVERY: Current commit marker: {}", current_marker);

        // Step 2: Scan IndexedDB for blocks with versions > commit marker.
        // These represent incomplete transactions that need recovery.
        let inconsistent_blocks = self.scan_for_inconsistent_blocks(current_marker).await?;

        if inconsistent_blocks.is_empty() {
            log::info!("CRASH RECOVERY: No inconsistent blocks found, system is consistent");
            return Ok(CrashRecoveryAction::NoActionNeeded);
        }

        log::info!("CRASH RECOVERY: Found {} inconsistent blocks that need recovery", inconsistent_blocks.len());

        // Step 3: Determine the recovery action based on transaction completeness
        let recovery_action = self.determine_recovery_action(&inconsistent_blocks).await?;

        match recovery_action {
            CrashRecoveryAction::Rollback => {
                log::info!("CRASH RECOVERY: Performing rollback of incomplete transaction");
                self.rollback_incomplete_transaction(&inconsistent_blocks).await?;
            }
            CrashRecoveryAction::Finalize => {
                log::info!("CRASH RECOVERY: Performing finalization of complete transaction");
                self.finalize_complete_transaction(&inconsistent_blocks).await?;
            }
            CrashRecoveryAction::NoActionNeeded => {
                // Already handled above
            }
        }

        log::info!("CRASH RECOVERY: Recovery completed successfully");
        Ok(recovery_action)
    }

    /// Scan IndexedDB for blocks with versions greater than the commit marker
    #[cfg(target_arch = "wasm32")]
    async fn scan_for_inconsistent_blocks(&self, commit_marker: u64) -> Result<Vec<(u64, u64)>, DatabaseError> {
        log::info!("CRASH RECOVERY: Scanning for blocks with version > {}", commit_marker);

        // Simplified implementation: a production system would scan IndexedDB directly;
        // for now we check the global metadata storage.
        let mut inconsistent_blocks = Vec::new();

        vfs_sync::with_global_metadata(|meta| {
            let meta_map = meta.borrow();
            if let Some(db_meta) = meta_map.get(&self.db_name) {
                for (block_id, metadata) in db_meta.iter() {
                    if metadata.version as u64 > commit_marker {
                        log::info!("CRASH RECOVERY: Found inconsistent block {} with version {} > marker {}",
                                  block_id, metadata.version, commit_marker);
                        inconsistent_blocks.push((*block_id, metadata.version as u64));
                    }
                }
            }
        });

        Ok(inconsistent_blocks)
    }

    /// Determine whether to roll back or finalize based on transaction completeness
    #[cfg(target_arch = "wasm32")]
    async fn determine_recovery_action(&self, inconsistent_blocks: &[(u64, u64)]) -> Result<CrashRecoveryAction, DatabaseError> {
        // Simple heuristic: if all inconsistent blocks have the same version (the next
        // expected commit), the transaction was likely complete and should be finalized.
        // Otherwise, roll back to maintain consistency.

        let expected_next_commit = self.get_commit_marker() + 1;
        let all_same_version = inconsistent_blocks
            .iter()
            .all(|(_, version)| *version == expected_next_commit);

        if all_same_version && !inconsistent_blocks.is_empty() {
            log::info!("CRASH RECOVERY: All inconsistent blocks have expected version {}, finalizing transaction", expected_next_commit);
            Ok(CrashRecoveryAction::Finalize)
        } else {
            log::info!("CRASH RECOVERY: Inconsistent block versions detected, rolling back transaction");
            Ok(CrashRecoveryAction::Rollback)
        }
    }

    /// Roll back an incomplete transaction by removing inconsistent blocks
    #[cfg(target_arch = "wasm32")]
    async fn rollback_incomplete_transaction(&mut self, inconsistent_blocks: &[(u64, u64)]) -> Result<(), DatabaseError> {
        log::info!("CRASH RECOVERY: Rolling back {} inconsistent blocks", inconsistent_blocks.len());

        // Remove inconsistent blocks from global metadata
        vfs_sync::with_global_metadata(|meta| {
            let mut meta_map = meta.borrow_mut();
            if let Some(db_meta) = meta_map.get_mut(&self.db_name) {
                for (block_id, _) in inconsistent_blocks {
                    log::info!("CRASH RECOVERY: Removing inconsistent block {} from metadata", block_id);
                    db_meta.remove(block_id);
                }
            }
        });

        // Remove inconsistent blocks from global storage
        vfs_sync::with_global_storage(|gs| {
            let mut storage_map = gs.borrow_mut();
            if let Some(db_storage) = storage_map.get_mut(&self.db_name) {
                for (block_id, _) in inconsistent_blocks {
                    log::info!("CRASH RECOVERY: Removing inconsistent block {} from global storage", block_id);
                    db_storage.remove(block_id);
                }
            }
        });

        // Clear any cached data for these blocks
        for (block_id, _) in inconsistent_blocks {
            self.cache.remove(block_id);
            // Remove from LRU order
            self.lru_order.retain(|&id| id != *block_id);
        }

        // Remove inconsistent blocks from IndexedDB to avoid accumulating orphaned data
        let block_ids_to_delete: Vec<u64> = inconsistent_blocks.iter().map(|(id, _)| *id).collect();
        if !block_ids_to_delete.is_empty() {
            log::info!("CRASH RECOVERY: Deleting {} blocks from IndexedDB", block_ids_to_delete.len());
            super::wasm_indexeddb::delete_blocks_from_indexeddb(&self.db_name, &block_ids_to_delete).await?;
            log::info!("CRASH RECOVERY: Successfully deleted blocks from IndexedDB");
        }

        log::info!("CRASH RECOVERY: Rollback completed");
        Ok(())
    }

    /// Finalize a complete transaction by advancing the commit marker
    #[cfg(target_arch = "wasm32")]
    async fn finalize_complete_transaction(&mut self, inconsistent_blocks: &[(u64, u64)]) -> Result<(), DatabaseError> {
        log::info!("CRASH RECOVERY: Finalizing transaction for {} blocks", inconsistent_blocks.len());

        // Find the target commit marker (should be consistent across all blocks)
        if let Some((_, target_version)) = inconsistent_blocks.first() {
            let new_commit_marker = *target_version;
            // Capture the old marker before advancing it, so the log below reports
            // the actual transition rather than the already-updated value
            let old_commit_marker = self.get_commit_marker();

            // Advance the commit marker to make the blocks visible
            vfs_sync::with_global_commit_marker(|cm| {
                cm.borrow_mut().insert(self.db_name.clone(), new_commit_marker);
            });

            log::info!("CRASH RECOVERY: Advanced commit marker from {} to {}",
                      old_commit_marker, new_commit_marker);

            // Update checksums for the finalized blocks
            for (block_id, _) in inconsistent_blocks {
                // Read the block data to compute and store its checksum
                if let Ok(data) = self.read_block_sync(*block_id) {
                    self.checksum_manager.store_checksum(*block_id, &data);
                    log::info!("CRASH RECOVERY: Updated checksum for finalized block {}", block_id);
                }
            }
        }

        log::info!("CRASH RECOVERY: Finalization completed");
        Ok(())
    }

    // Leader Election Methods (WASM only)

    /// Start the leader election process
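    ///
    /// A hedged usage sketch for multi-tab coordination (the db name is illustrative,
    /// and gating syncs on leadership is one possible policy, not a requirement):
    ///
    /// ```ignore
    /// let mut s = BlockStorage::new("tabs_demo").await?;
    /// s.start_leader_election().await?;
    /// if s.is_leader().await {
    ///     // Only the leading tab performs the IndexedDB sync in this sketch.
    ///     s.force_sync().await?;
    /// }
    /// ```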
    #[cfg(target_arch = "wasm32")]
    pub async fn start_leader_election(&mut self) -> Result<(), DatabaseError> {
        if self.leader_election.is_none() {
            let mut manager = super::leader_election::LeaderElectionManager::new(self.db_name.clone());
            manager.start_election().await?;
            self.leader_election = Some(manager);
        } else {
            // If an election is already running, force leadership takeover (requestLeadership)
            if let Some(ref mut manager) = self.leader_election {
                manager.force_become_leader().await?;
            }
        }
        Ok(())
    }

    /// Check if this instance is the leader (with re-election on lease expiry)
    #[cfg(target_arch = "wasm32")]
    pub async fn is_leader(&mut self) -> bool {
        // Start leader election if not already started
        if self.leader_election.is_none() {
            log::debug!("Starting leader election for {}", self.db_name);
            if let Err(e) = self.start_leader_election().await {
                log::error!("Failed to start leader election: {:?}", e);
                return false;
            }
        }

        if let Some(ref mut manager) = self.leader_election {
            let is_leader = manager.is_leader().await;

            // If there is no current leader (lease expired), trigger re-election
            if !is_leader {
                let state = manager.state.borrow();
                if state.leader_id.is_none() {
                    log::debug!("No current leader for {} - triggering re-election", self.db_name);
                    drop(state);
                    let _ = manager.try_become_leader().await;

                    // Start the heartbeat if we became leader
                    let new_is_leader = manager.state.borrow().is_leader;
                    if new_is_leader && manager.heartbeat_interval.is_none() {
                        let _ = manager.start_heartbeat();
                    }

                    log::debug!("is_leader() for {} = {} (after re-election)", self.db_name, new_is_leader);
                    return new_is_leader;
                }
            }

            log::debug!("is_leader() for {} = {}", self.db_name, is_leader);
            is_leader
        } else {
            log::debug!("No leader election manager for {}", self.db_name);
            false
        }
    }

    /// Stop leader election (e.g., when the tab is closing)
    #[cfg(target_arch = "wasm32")]
    pub async fn stop_leader_election(&mut self) -> Result<(), DatabaseError> {
        if let Some(mut manager) = self.leader_election.take() {
            manager.stop_election().await?;
        }
        Ok(())
    }

    /// Send a leader heartbeat (for testing)
    #[cfg(target_arch = "wasm32")]
    pub async fn send_leader_heartbeat(&self) -> Result<(), DatabaseError> {
        if let Some(ref manager) = self.leader_election {
            manager.send_heartbeat().await
        } else {
            Err(DatabaseError::new("LEADER_ELECTION_ERROR", "Leader election not started"))
        }
    }

    /// Get the timestamp of the last received leader heartbeat
    #[cfg(target_arch = "wasm32")]
    pub async fn get_last_leader_heartbeat(&self) -> Result<u64, DatabaseError> {
        if let Some(ref manager) = self.leader_election {
            Ok(manager.get_last_heartbeat().await)
        } else {
            Err(DatabaseError::new("LEADER_ELECTION_ERROR", "Leader election not started"))
        }
    }

    // Observability Methods

    /// Get comprehensive metrics for observability
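    ///
    /// A hedged reading sketch (the field names match the struct literal below;
    /// `storage` is an illustrative binding):
    ///
    /// ```ignore
    /// let m = storage.get_metrics();
    /// log::info!("dirty blocks: {} ({} bytes), error rate: {:.3}",
    ///            m.dirty_count, m.dirty_bytes, m.error_rate);
    /// ```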
    pub fn get_metrics(&self) -> super::observability::StorageMetrics {
        let dirty_count = self.get_dirty_count();
        let dirty_bytes = dirty_count * BLOCK_SIZE;

        #[cfg(not(target_arch = "wasm32"))]
        let (sync_count, timer_sync_count, debounce_sync_count, last_sync_duration_ms) = {
            (
                self.sync_count.load(Ordering::SeqCst),
                self.timer_sync_count.load(Ordering::SeqCst),
                self.debounce_sync_count.load(Ordering::SeqCst),
                self.last_sync_duration_ms.load(Ordering::SeqCst),
            )
        };

        #[cfg(target_arch = "wasm32")]
        let (sync_count, timer_sync_count, debounce_sync_count, last_sync_duration_ms) = {
            // On WASM the observability manager tracks sync_count; timer/debounce
            // counts are native-only (reported as 0) and the duration is a 1 ms placeholder.
            (self.observability.get_sync_count(), 0, 0, 1)
        };

        let error_count = self.observability.get_error_count();
        let checksum_failures = self.observability.get_checksum_failures();

        // Calculate throughput and error rate
        let total_operations = sync_count + error_count;
        let (throughput_blocks_per_sec, throughput_bytes_per_sec) =
            self.observability.calculate_throughput(last_sync_duration_ms);
        let error_rate = self.observability.calculate_error_rate(total_operations);
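        // Worked example: with sync_count = 9 and error_count = 1, total_operations = 10;
        // assuming calculate_error_rate divides errors by total operations (an assumption
        // about the helper's definition), error_rate would come out to 0.1.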

        super::observability::StorageMetrics {
            dirty_count,
            dirty_bytes,
            sync_count,
            timer_sync_count,
            debounce_sync_count,
            error_count,
            checksum_failures,
            last_sync_duration_ms,
            throughput_blocks_per_sec,
            throughput_bytes_per_sec,
            error_rate,
        }
    }

    /// Set sync event callbacks
    #[cfg(not(target_arch = "wasm32"))]
    pub fn set_sync_callbacks(
        &mut self,
        on_sync_start: super::observability::SyncStartCallback,
        on_sync_success: super::observability::SyncSuccessCallback,
        on_sync_failure: super::observability::SyncFailureCallback,
    ) {
        self.observability.sync_start_callback = Some(on_sync_start);
        self.observability.sync_success_callback = Some(on_sync_success);
        self.observability.sync_failure_callback = Some(on_sync_failure);
    }

    /// Set backpressure callback
    #[cfg(not(target_arch = "wasm32"))]
    pub fn set_backpressure_callback(&mut self, callback: super::observability::BackpressureCallback) {
        self.observability.backpressure_callback = Some(callback);
    }

    /// Set error callback
    #[cfg(not(target_arch = "wasm32"))]
    pub fn set_error_callback(&mut self, callback: super::observability::ErrorCallback) {
        self.observability.error_callback = Some(callback);
    }

    /// Set WASM sync success callback
    #[cfg(target_arch = "wasm32")]
    pub fn set_sync_success_callback(&mut self, callback: super::observability::WasmSyncSuccessCallback) {
        self.observability.wasm_sync_success_callback = Some(callback);
    }

    /// Check if auto-sync is currently enabled
    pub fn is_auto_sync_enabled(&self) -> bool {
        self.auto_sync_interval.is_some()
    }

    /// Get the current sync policy (if any)
    pub fn get_sync_policy(&self) -> Option<super::SyncPolicy> {
        self.policy.clone()
    }

    /// Force synchronization with durability guarantees
    ///
    /// This method ensures that all dirty blocks are persisted to durable storage
    /// (IndexedDB in WASM, filesystem in native) and waits for the operation to complete.
    /// This is called by VFS xSync to provide SQLite's durability guarantees.
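    ///
    /// A hedged usage sketch (`bid` and the fill data are illustrative):
    ///
    /// ```ignore
    /// storage.write_block(bid, vec![0u8; BLOCK_SIZE]).await?;
    /// storage.force_sync().await?; // returns only after the blocks are durably persisted
    /// ```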
    pub async fn force_sync(&mut self) -> Result<(), DatabaseError> {
        log::info!("force_sync: Starting forced synchronization with durability guarantees");

        let dirty_count = self.get_dirty_count();
        if dirty_count == 0 {
            log::debug!("force_sync: No dirty blocks to sync");
            return Ok(());
        }

        log::info!("force_sync: Syncing {} dirty blocks with durability guarantee", dirty_count);

        // Just use the regular sync - it already waits for persistence in WASM
        self.sync().await?;

        log::info!("force_sync: Successfully completed forced synchronization");
        Ok(())
    }

    /// Set telemetry metrics (used for instrumentation)
    #[cfg(feature = "telemetry")]
    pub fn set_metrics(&mut self, metrics: Option<crate::telemetry::Metrics>) {
        self.metrics = metrics;
    }

    /// Get telemetry metrics
    #[cfg(feature = "telemetry")]
    pub fn metrics(&self) -> Option<&crate::telemetry::Metrics> {
        self.metrics.as_ref()
    }

    /// Create a test instance with minimal setup
    #[cfg(feature = "telemetry")]
    pub fn new_for_test() -> Self {
        Self {
            cache: HashMap::new(),
            dirty_blocks: Arc::new(parking_lot::Mutex::new(HashMap::new())),
            allocated_blocks: HashSet::new(),
            deallocated_blocks: HashSet::new(),
            next_block_id: 1,
            capacity: 128,
            lru_order: VecDeque::new(),
            checksum_manager: crate::storage::metadata::ChecksumManager::new(
                crate::storage::metadata::ChecksumAlgorithm::FastHash
            ),
            #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
            base_dir: std::path::PathBuf::from("/tmp/test"),
            db_name: "test.db".to_string(),
            auto_sync_interval: None,
            #[cfg(not(target_arch = "wasm32"))]
            last_auto_sync: std::time::Instant::now(),
            policy: None,
            #[cfg(not(target_arch = "wasm32"))]
            auto_sync_stop: None,
            #[cfg(not(target_arch = "wasm32"))]
            auto_sync_thread: None,
            #[cfg(not(target_arch = "wasm32"))]
            debounce_thread: None,
            #[cfg(not(target_arch = "wasm32"))]
            tokio_timer_task: None,
            #[cfg(not(target_arch = "wasm32"))]
            tokio_debounce_task: None,
            #[cfg(not(target_arch = "wasm32"))]
            last_write_ms: Arc::new(std::sync::atomic::AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            threshold_hit: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            #[cfg(not(target_arch = "wasm32"))]
            sync_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            timer_sync_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            debounce_sync_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            last_sync_duration_ms: Arc::new(std::sync::atomic::AtomicU64::new(0)),
            #[cfg(not(target_arch = "wasm32"))]
            sync_sender: None,
            #[cfg(not(target_arch = "wasm32"))]
            sync_receiver: None,
            recovery_report: RecoveryReport::default(),
            #[cfg(target_arch = "wasm32")]
            leader_election: None,
            observability: super::observability::ObservabilityManager::new(),
            metrics: None,
        }
    }
}

#[cfg(all(test, target_arch = "wasm32"))]
mod wasm_commit_marker_tests {
    use super::*;
    use wasm_bindgen_test::*;

    wasm_bindgen_test_configure!(run_in_browser);

    // Helper: set commit marker for a db name in WASM global
    fn set_commit_marker(db: &str, v: u64) {
        vfs_sync::with_global_commit_marker(|cm| {
            cm.borrow_mut().insert(db.to_string(), v);
        });
    }

    // Helper: get commit marker for a db name in WASM global
    fn get_commit_marker(db: &str) -> u64 {
        vfs_sync::with_global_commit_marker(|cm| cm.borrow().get(db).copied().unwrap_or(0))
    }

    #[wasm_bindgen_test]
    async fn gating_returns_zeroed_until_marker_catches_up_wasm() {
        let db = "cm_gating_wasm";
        let mut s = BlockStorage::new(db).await.expect("create storage");

        // Write a block (starts at version 1, uncommitted)
        let bid = s.allocate_block().await.expect("alloc block");
        let data_v1 = vec![0x33u8; BLOCK_SIZE];
        s.write_block(bid, data_v1.clone()).await.expect("write v1");

        // Before sync, commit marker is 0, block version is 1, so should be invisible
        s.clear_cache();
        let out0 = s.read_block(bid).await.expect("read before commit");
        assert_eq!(out0, vec![0u8; BLOCK_SIZE], "uncommitted data must read as zeroed");

        // After sync, commit marker advances to 1, block version is 1, so should be visible
        s.sync().await.expect("sync v1");
        s.clear_cache();
        let out1 = s.read_block(bid).await.expect("read after commit");
        assert_eq!(out1, data_v1, "committed data should be visible");
    }

    #[wasm_bindgen_test]
    async fn invisible_blocks_skip_checksum_verification_wasm() {
        let db = "cm_checksum_skip_wasm";
        let mut s = BlockStorage::new(db).await.expect("create storage");

        let bid = s.allocate_block().await.expect("alloc block");
        let data = vec![0x44u8; BLOCK_SIZE];
        s.write_block(bid, data).await.expect("write v1");
        s.sync().await.expect("sync v1"); // commit marker advances to 1, block version is 1

        // Make the block invisible by moving commit marker back to 0
        set_commit_marker(db, 0);

        // Corrupt the stored checksum; invisible reads must NOT verify checksum
        s.set_block_checksum_for_testing(bid, 1234567);
        s.clear_cache();
        let out = s.read_block(bid).await.expect("read while invisible should not error");
        assert_eq!(out, vec![0u8; BLOCK_SIZE], "invisible block reads as zeroed");

        // Now make it visible again; checksum verification should trigger and fail
        set_commit_marker(db, 1);
        s.clear_cache();
        let err = s
            .read_block(bid)
            .await
            .expect_err("expected checksum mismatch once visible");
        assert_eq!(err.code, "CHECKSUM_MISMATCH");
    }

    #[wasm_bindgen_test]
    async fn commit_marker_advances_and_versions_track_syncs_wasm() {
        let db = "cm_versions_wasm";
        let mut s = BlockStorage::new_with_capacity(db, 8)
            .await
            .expect("create storage");

        let b1 = s.allocate_block().await.expect("alloc b1");
        let b2 = s.allocate_block().await.expect("alloc b2");

        s.write_block(b1, vec![1u8; BLOCK_SIZE]).await.expect("write b1 v1");
        s.write_block(b2, vec![2u8; BLOCK_SIZE]).await.expect("write b2 v1");
        s.sync().await.expect("sync #1");

        let cm1 = get_commit_marker(db);
        assert_eq!(cm1, 1, "first sync should advance commit marker to 1");
        let meta1 = s.get_block_metadata_for_testing();
        assert_eq!(meta1.get(&b1).unwrap().1 as u64, cm1);
        assert_eq!(meta1.get(&b2).unwrap().1 as u64, cm1);

        // Update only b1 and sync again; only b1's version should bump
        s.write_block(b1, vec![3u8; BLOCK_SIZE]).await.expect("write b1 v2");
        s.sync().await.expect("sync #2");

        let cm2 = get_commit_marker(db);
        assert_eq!(cm2, 2, "second sync should advance commit marker to 2");
        let meta2 = s.get_block_metadata_for_testing();
        assert_eq!(meta2.get(&b1).unwrap().1 as u64, cm2, "updated block tracks new version");
        assert_eq!(meta2.get(&b2).unwrap().1 as u64, 1, "unchanged block retains prior version");
    }
}

#[cfg(all(test, not(target_arch = "wasm32"), not(feature = "fs_persist")))]
mod commit_marker_tests {
    use super::*;

    // Helper: set commit marker for a db name in test-global mirror
    fn set_commit_marker(db: &str, v: u64) {
        super::vfs_sync::with_global_commit_marker(|cm| {
            cm.borrow_mut().insert(db.to_string(), v);
        });
    }

    // Helper: get commit marker for a db name in test-global mirror
    fn get_commit_marker(db: &str) -> u64 {
        super::vfs_sync::with_global_commit_marker(|cm| cm.borrow().get(db).copied().unwrap_or(0))
    }

    #[tokio::test(flavor = "current_thread")]
    async fn gating_returns_zeroed_until_marker_catches_up() {
        let db = "cm_gating_basic";
        let mut s = BlockStorage::new(db).await.expect("create storage");

        // Write a block (starts at version 1, uncommitted)
        let bid = s.allocate_block().await.expect("alloc block");
        let data_v1 = vec![0x11u8; BLOCK_SIZE];
        s.write_block(bid, data_v1.clone()).await.expect("write v1");

        // Before sync, commit marker is 0, block version is 1, so should be invisible
        s.clear_cache();
        let out0 = s.read_block(bid).await.expect("read before commit");
        assert_eq!(out0, vec![0u8; BLOCK_SIZE], "uncommitted data must read as zeroed");

        // After sync, commit marker advances to 1, block version is 1, so should be visible
        s.sync().await.expect("sync v1");
        assert_eq!(get_commit_marker(db), 1, "sync should advance commit marker to 1");

        s.clear_cache();
        let out1 = s.read_block(bid).await.expect("read after commit");
        assert_eq!(out1, data_v1, "committed data must be visible after sync");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn invisible_blocks_skip_checksum_verification() {
        let db = "cm_checksum_skip";
        let mut s = BlockStorage::new(db).await.expect("create storage");

        let bid = s.allocate_block().await.expect("alloc block");
        let data = vec![0xAAu8; BLOCK_SIZE];
        s.write_block(bid, data.clone()).await.expect("write v1");
        s.sync().await.expect("sync v1"); // commit marker advances to 1, block version is 1

        // Make the block invisible by moving commit marker back to 0
        set_commit_marker(db, 0);

        // Corrupt the stored checksum; invisible reads must NOT verify checksum
        s.set_block_checksum_for_testing(bid, 1234567);
        s.clear_cache();
        let out = s.read_block(bid).await.expect("read while invisible should not error");
        assert_eq!(out, vec![0u8; BLOCK_SIZE], "invisible block reads as zeroed");

        // Now make it visible again; checksum verification should trigger and fail
        set_commit_marker(db, 1);
        s.clear_cache();
        let err = s
            .read_block(bid)
            .await
            .expect_err("expected checksum mismatch once visible");
        assert_eq!(err.code, "CHECKSUM_MISMATCH");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn commit_marker_advances_and_versions_track_syncs() {
        let db = "cm_versions";
        let mut s = BlockStorage::new_with_capacity(db, 8)
            .await
            .expect("create storage");

        let b1 = s.allocate_block().await.expect("alloc b1");
        let b2 = s.allocate_block().await.expect("alloc b2");

        s.write_block(b1, vec![1u8; BLOCK_SIZE]).await.expect("write b1 v1");
        s.write_block(b2, vec![2u8; BLOCK_SIZE]).await.expect("write b2 v1");
        s.sync().await.expect("sync #1");

        let cm1 = get_commit_marker(db);
        assert_eq!(cm1, 1, "first sync should advance commit marker to 1");
        let meta1 = s.get_block_metadata_for_testing();
        assert_eq!(meta1.get(&b1).unwrap().1 as u64, cm1);
        assert_eq!(meta1.get(&b2).unwrap().1 as u64, cm1);

        // Update only b1 and sync again; only b1's version should bump
        s.write_block(b1, vec![3u8; BLOCK_SIZE]).await.expect("write b1 v2");
        s.sync().await.expect("sync #2");

        let cm2 = get_commit_marker(db);
        assert_eq!(cm2, 2, "second sync should advance commit marker to 2");
        let meta2 = s.get_block_metadata_for_testing();
        assert_eq!(meta2.get(&b1).unwrap().1 as u64, cm2, "updated block tracks new version");
        assert_eq!(meta2.get(&b2).unwrap().1 as u64, 1, "unchanged block retains prior version");
    }
}