1#[allow(unused_imports)]
2use super::metadata::BlockMetadataPersist;
3use super::metadata::{ChecksumAlgorithm, ChecksumManager};
4#[cfg(any(
5 target_arch = "wasm32",
6 all(
7 not(target_arch = "wasm32"),
8 any(test, debug_assertions),
9 not(feature = "fs_persist")
10 )
11))]
12use super::vfs_sync;
13use crate::types::DatabaseError;
14#[cfg(target_arch = "wasm32")]
15use js_sys::Date;
16#[cfg(not(target_arch = "wasm32"))]
17use parking_lot::Mutex;
18#[cfg(target_arch = "wasm32")]
19use std::cell::RefCell;
20use std::collections::{HashMap, HashSet, VecDeque};
21#[allow(unused_imports)]
22use std::sync::{
23 Arc,
24 atomic::{AtomicBool, AtomicU64, Ordering},
25};
26use std::time::Duration;
27#[cfg(not(target_arch = "wasm32"))]
28use std::time::{Instant, SystemTime, UNIX_EPOCH};
29#[cfg(not(target_arch = "wasm32"))]
30use tokio::sync::mpsc;
31#[cfg(not(target_arch = "wasm32"))]
32use tokio::task::JoinHandle as TokioJoinHandle;
33
34#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
36use std::path::PathBuf;
37
38#[cfg(not(target_arch = "wasm32"))]
44#[derive(Debug)]
45pub(super) enum SyncRequest {
46 Timer(tokio::sync::oneshot::Sender<()>),
47 Debounce(tokio::sync::oneshot::Sender<()>),
48}
49
50#[derive(Clone, Debug, Default)]
51pub struct RecoveryOptions {
52 pub mode: RecoveryMode,
53 pub on_corruption: CorruptionAction,
54}
55
56#[derive(Clone, Debug, Default)]
57pub enum RecoveryMode {
58 #[default]
59 Full,
60 Sample {
61 count: usize,
62 },
63 Skip,
64}
65
66#[derive(Clone, Debug, Default)]
67pub enum CorruptionAction {
68 #[default]
69 Report,
70 Repair,
71 Fail,
72}
73
74#[derive(Clone, Debug, PartialEq)]
75pub enum CrashRecoveryAction {
76 NoActionNeeded,
77 Rollback,
78 Finalize,
79}
80
81#[derive(Clone, Debug, Default)]
82pub struct RecoveryReport {
83 pub total_blocks_verified: usize,
84 pub corrupted_blocks: Vec<u64>,
85 pub repaired_blocks: Vec<u64>,
86 pub verification_duration_ms: u64,
87}
88
89#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
91#[derive(serde::Serialize, serde::Deserialize, Default)]
92#[allow(dead_code)]
93struct FsMeta {
94 entries: Vec<(u64, BlockMetadataPersist)>,
95}
96
97#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
98#[derive(serde::Serialize, serde::Deserialize, Default)]
99#[allow(dead_code)]
100struct FsAlloc {
101 allocated: Vec<u64>,
102}
103
104#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
105#[derive(serde::Serialize, serde::Deserialize, Default)]
106#[allow(dead_code)]
107struct FsDealloc {
108 tombstones: Vec<u64>,
109}
110
111#[cfg(not(target_arch = "wasm32"))]
113thread_local! {
114 pub(super) static GLOBAL_METADATA_TEST: parking_lot::Mutex<HashMap<String, HashMap<u64, BlockMetadataPersist>>> = parking_lot::Mutex::new(HashMap::new());
115}
116
117#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions)))]
119thread_local! {
120 static GLOBAL_COMMIT_MARKER_TEST: parking_lot::Mutex<HashMap<String, u64>> = parking_lot::Mutex::new(HashMap::new());
121}
122
123#[derive(Clone, Debug)]
124pub struct SyncPolicy {
125 pub interval_ms: Option<u64>,
126 pub max_dirty: Option<usize>,
127 pub max_dirty_bytes: Option<usize>,
128 pub debounce_ms: Option<u64>,
129 pub verify_after_write: bool,
130}
131
132#[cfg(not(target_arch = "wasm32"))]
133impl Drop for BlockStorage {
134 fn drop(&mut self) {
135 if let Some(stop) = &self.auto_sync_stop {
136 stop.store(true, Ordering::SeqCst);
137 }
138 if let Some(handle) = self.auto_sync_thread.take() {
139 let _ = handle.join();
140 }
141 if let Some(handle) = self.debounce_thread.take() {
142 let _ = handle.join();
143 }
144 if let Some(task) = self.tokio_timer_task.take() {
145 task.abort();
146 }
147 if let Some(task) = self.tokio_debounce_task.take() {
148 task.abort();
149 }
150 self.auto_sync_stop = None;
151 }
152}
153
154pub const BLOCK_SIZE: usize = 4096;
155#[allow(dead_code)]
156pub(super) const DEFAULT_CACHE_CAPACITY: usize = 128;
157#[allow(dead_code)]
158const STORE_NAME: &str = "sqlite_blocks";
159#[allow(dead_code)]
160const METADATA_STORE: &str = "metadata";
161
162#[cfg(target_arch = "wasm32")]
165macro_rules! lock_mutex {
166 ($mutex:expr) => {
167 $mutex
168 .try_borrow_mut()
169 .expect("RefCell borrow failed - reentrancy detected in block_storage.rs")
170 };
171}
172
173#[allow(unused_macros)]
175#[cfg(target_arch = "wasm32")]
176macro_rules! try_lock_mutex {
177 ($mutex:expr) => {
178 $mutex
179 };
180}
181
182#[cfg(not(target_arch = "wasm32"))]
183macro_rules! lock_mutex {
184 ($mutex:expr) => {
185 $mutex.lock()
186 };
187}
188
189#[allow(unused_macros)]
191#[cfg(target_arch = "wasm32")]
192macro_rules! read_lock {
193 ($mutex:expr) => {
194 $mutex
195 .try_borrow()
196 .expect("RefCell borrow failed - likely reentrancy issue")
197 };
198}
199
200#[allow(unused_macros)]
201#[cfg(not(target_arch = "wasm32"))]
202macro_rules! read_lock {
203 ($mutex:expr) => {
204 $mutex.lock()
205 };
206}
207
208#[allow(unused_macros)]
210#[cfg(target_arch = "wasm32")]
211macro_rules! try_lock_mutex {
212 ($mutex:expr) => {
213 $mutex.ok()
214 };
215}
216
217#[allow(unused_macros)]
218#[cfg(not(target_arch = "wasm32"))]
219macro_rules! try_lock_mutex {
220 ($mutex:expr) => {
221 Some($mutex.lock())
222 };
223}
224
225#[allow(unused_macros)]
227#[cfg(target_arch = "wasm32")]
228macro_rules! try_read_lock {
229 ($mutex:expr) => {
230 $mutex.try_borrow().ok()
231 };
232}
233
234#[allow(unused_macros)]
235#[cfg(not(target_arch = "wasm32"))]
236macro_rules! try_read_lock {
237 ($mutex:expr) => {
238 Some($mutex.lock())
239 };
240}
241
242pub struct BlockStorage {
243 #[cfg(target_arch = "wasm32")]
246 pub(super) cache: RefCell<HashMap<u64, Vec<u8>>>,
247 #[cfg(not(target_arch = "wasm32"))]
248 pub(super) cache: Mutex<HashMap<u64, Vec<u8>>>,
249
250 #[cfg(target_arch = "wasm32")]
251 pub(super) dirty_blocks: Arc<RefCell<HashMap<u64, Vec<u8>>>>,
252 #[cfg(not(target_arch = "wasm32"))]
253 pub(super) dirty_blocks: Arc<Mutex<HashMap<u64, Vec<u8>>>>,
254
255 #[cfg(target_arch = "wasm32")]
256 pub(super) allocated_blocks: RefCell<HashSet<u64>>,
257 #[cfg(not(target_arch = "wasm32"))]
258 pub(super) allocated_blocks: Mutex<HashSet<u64>>,
259
260 #[cfg(target_arch = "wasm32")]
261 #[allow(dead_code)]
262 pub(super) deallocated_blocks: RefCell<HashSet<u64>>,
263 #[cfg(not(target_arch = "wasm32"))]
264 #[allow(dead_code)]
265 pub(super) deallocated_blocks: Mutex<HashSet<u64>>,
266 pub(super) next_block_id: AtomicU64,
267 pub(super) capacity: usize,
268
269 #[cfg(target_arch = "wasm32")]
270 pub(super) lru_order: RefCell<VecDeque<u64>>,
271 #[cfg(not(target_arch = "wasm32"))]
272 pub(super) lru_order: Mutex<VecDeque<u64>>,
273
274 pub(super) checksum_manager: ChecksumManager,
276 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
277 pub(super) base_dir: PathBuf,
278 pub(super) db_name: String,
279
280 #[cfg(target_arch = "wasm32")]
282 pub(super) auto_sync_interval: RefCell<Option<Duration>>,
283 #[cfg(not(target_arch = "wasm32"))]
284 pub(super) auto_sync_interval: Mutex<Option<Duration>>,
285
286 #[cfg(not(target_arch = "wasm32"))]
287 pub(super) last_auto_sync: Instant,
288
289 #[cfg(target_arch = "wasm32")]
290 pub(super) policy: RefCell<Option<SyncPolicy>>,
291 #[cfg(not(target_arch = "wasm32"))]
292 pub(super) policy: Mutex<Option<SyncPolicy>>,
293 #[cfg(not(target_arch = "wasm32"))]
294 pub(super) auto_sync_stop: Option<Arc<AtomicBool>>,
295 #[cfg(not(target_arch = "wasm32"))]
296 pub(super) auto_sync_thread: Option<std::thread::JoinHandle<()>>,
297 #[cfg(not(target_arch = "wasm32"))]
298 pub(super) debounce_thread: Option<std::thread::JoinHandle<()>>,
299 #[cfg(not(target_arch = "wasm32"))]
300 pub(super) tokio_timer_task: Option<TokioJoinHandle<()>>,
301 #[cfg(not(target_arch = "wasm32"))]
302 pub(super) tokio_debounce_task: Option<TokioJoinHandle<()>>,
303 #[cfg(not(target_arch = "wasm32"))]
304 pub(super) last_write_ms: Arc<AtomicU64>,
305 #[cfg(not(target_arch = "wasm32"))]
306 pub(super) threshold_hit: Arc<AtomicBool>,
307 #[cfg(not(target_arch = "wasm32"))]
308 pub(super) sync_count: Arc<AtomicU64>,
309 #[cfg(not(target_arch = "wasm32"))]
310 pub(super) timer_sync_count: Arc<AtomicU64>,
311 #[cfg(not(target_arch = "wasm32"))]
312 pub(super) debounce_sync_count: Arc<AtomicU64>,
313 #[cfg(not(target_arch = "wasm32"))]
314 pub(super) last_sync_duration_ms: Arc<AtomicU64>,
315
316 #[cfg(not(target_arch = "wasm32"))]
318 pub(super) sync_sender: Option<mpsc::UnboundedSender<SyncRequest>>,
319 #[cfg(not(target_arch = "wasm32"))]
320 pub(super) sync_receiver: Option<mpsc::UnboundedReceiver<SyncRequest>>,
321
322 pub(super) recovery_report: RecoveryReport,
324
325 #[cfg(target_arch = "wasm32")]
327 pub leader_election: std::cell::RefCell<Option<super::leader_election::LeaderElectionManager>>,
328
329 pub(super) observability: super::observability::ObservabilityManager,
331
332 #[cfg(feature = "telemetry")]
334 pub(super) metrics: Option<crate::telemetry::Metrics>,
335}
336
337impl BlockStorage {
338 #[cfg(target_arch = "wasm32")]
341 pub fn new_sync(db_name: &str) -> Self {
342 log::info!(
343 "Creating BlockStorage synchronously for database: {}",
344 db_name
345 );
346
347 use crate::storage::vfs_sync::with_global_storage;
349 let (cache, allocated_blocks, max_block_id) = with_global_storage(|storage_map| {
350 if let Some(db_storage) = storage_map.borrow().get(db_name) {
351 let cache = db_storage.clone();
352 let allocated = db_storage.keys().copied().collect::<HashSet<_>>();
353 let max_id = db_storage.keys().max().copied().unwrap_or(0);
354 (cache, allocated, max_id)
355 } else {
356 (HashMap::new(), HashSet::new(), 0)
357 }
358 });
359
360 log::info!(
361 "Loaded {} blocks from GLOBAL_STORAGE for {} (max_block_id={})",
362 cache.len(),
363 db_name,
364 max_block_id
365 );
366
367 use crate::storage::vfs_sync::with_global_metadata;
370 let checksum_manager = with_global_metadata(|metadata_map| {
371 if let Some(db_metadata) = metadata_map.borrow().get(db_name) {
372 let mut checksums = HashMap::new();
373 let mut algos = HashMap::new();
374 for (block_id, meta) in db_metadata {
375 checksums.insert(*block_id, meta.checksum);
376 algos.insert(*block_id, meta.algo);
377 }
378 ChecksumManager::with_data(checksums, algos, ChecksumAlgorithm::FastHash)
379 } else {
380 ChecksumManager::new(ChecksumAlgorithm::FastHash)
381 }
382 });
383
384 Self {
385 cache: RefCell::new(cache),
386 dirty_blocks: Arc::new(RefCell::new(HashMap::new())),
387 allocated_blocks: RefCell::new(allocated_blocks),
388 deallocated_blocks: RefCell::new(HashSet::new()),
389 next_block_id: AtomicU64::new(max_block_id + 1),
390 capacity: 128,
391 lru_order: RefCell::new(VecDeque::new()),
392 checksum_manager,
393 db_name: db_name.to_string(),
394 auto_sync_interval: RefCell::new(None),
395 policy: RefCell::new(None),
396 #[cfg(not(target_arch = "wasm32"))]
397 last_auto_sync: Instant::now(),
398 #[cfg(not(target_arch = "wasm32"))]
399 auto_sync_stop: None,
400 #[cfg(not(target_arch = "wasm32"))]
401 auto_sync_thread: None,
402 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
403 base_dir: std::path::PathBuf::from(
404 std::env::var("ABSURDERSQL_FS_BASE")
405 .unwrap_or_else(|_| "./test_storage".to_string()),
406 ),
407 #[cfg(not(target_arch = "wasm32"))]
408 debounce_thread: None,
409 #[cfg(not(target_arch = "wasm32"))]
410 tokio_timer_task: None,
411 #[cfg(not(target_arch = "wasm32"))]
412 tokio_debounce_task: None,
413 #[cfg(not(target_arch = "wasm32"))]
414 last_write_ms: Arc::new(AtomicU64::new(0)),
415 #[cfg(not(target_arch = "wasm32"))]
416 threshold_hit: Arc::new(AtomicBool::new(false)),
417 #[cfg(not(target_arch = "wasm32"))]
418 sync_count: Arc::new(AtomicU64::new(0)),
419 #[cfg(not(target_arch = "wasm32"))]
420 timer_sync_count: Arc::new(AtomicU64::new(0)),
421 #[cfg(not(target_arch = "wasm32"))]
422 debounce_sync_count: Arc::new(AtomicU64::new(0)),
423 #[cfg(not(target_arch = "wasm32"))]
424 last_sync_duration_ms: Arc::new(AtomicU64::new(0)),
425 #[cfg(not(target_arch = "wasm32"))]
426 sync_sender: None,
427 #[cfg(not(target_arch = "wasm32"))]
428 sync_receiver: None,
429 recovery_report: RecoveryReport::default(),
430 #[cfg(target_arch = "wasm32")]
431 leader_election: std::cell::RefCell::new(None),
432 observability: super::observability::ObservabilityManager::new(),
433 #[cfg(feature = "telemetry")]
434 metrics: None,
435 }
436 }
437
438 #[cfg(target_arch = "wasm32")]
439 pub async fn new(db_name: &str) -> Result<Self, DatabaseError> {
440 super::constructors::new_wasm(db_name).await
441 }
442
443 #[cfg(not(target_arch = "wasm32"))]
444 pub async fn new(db_name: &str) -> Result<Self, DatabaseError> {
445 log::info!("Creating BlockStorage for database: {}", db_name);
446
447 let (allocated_blocks, next_block_id) = {
449 #[cfg(feature = "fs_persist")]
450 {
451 let mut allocated_blocks = HashSet::new();
453 let mut next_block_id: u64 = 1;
454
455 let base_path = std::env::var("ABSURDERSQL_FS_BASE")
456 .unwrap_or_else(|_| "./test_storage".to_string());
457 let mut alloc_path = std::path::PathBuf::from(base_path);
458 alloc_path.push(db_name);
459 alloc_path.push("allocations.json");
460
461 if let Ok(content) = std::fs::read_to_string(&alloc_path) {
462 if let Ok(alloc_data) = serde_json::from_str::<serde_json::Value>(&content) {
463 if let Some(allocated_array) = alloc_data["allocated"].as_array() {
464 for block_id_val in allocated_array {
465 if let Some(block_id) = block_id_val.as_u64() {
466 allocated_blocks.insert(block_id);
467 }
468 }
469 next_block_id = allocated_blocks.iter().max().copied().unwrap_or(0) + 1;
470 }
471 }
472 }
473
474 (allocated_blocks, next_block_id)
475 }
476
477 #[cfg(not(feature = "fs_persist"))]
478 {
479 (HashSet::new(), 1)
481 }
482 };
483
484 let checksums_init: HashMap<u64, u64> = {
486 #[cfg(feature = "fs_persist")]
487 {
488 let mut map = HashMap::new();
489 let base_path = std::env::var("ABSURDERSQL_FS_BASE")
490 .unwrap_or_else(|_| "./test_storage".to_string());
491 let mut meta_path = std::path::PathBuf::from(base_path);
492 meta_path.push(db_name);
493 meta_path.push("metadata.json");
494
495 if let Ok(content) = std::fs::read_to_string(&meta_path) {
496 if let Ok(meta_data) = serde_json::from_str::<serde_json::Value>(&content) {
497 if let Some(entries) = meta_data["entries"].as_array() {
498 for entry in entries {
499 if let Some(arr) = entry.as_array() {
500 if let (Some(block_id), Some(obj)) = (
501 arr.first().and_then(|v| v.as_u64()),
502 arr.get(1).and_then(|v| v.as_object()),
503 ) {
504 if let Some(checksum) =
505 obj.get("checksum").and_then(|v| v.as_u64())
506 {
507 map.insert(block_id, checksum);
508 }
509 }
510 }
511 }
512 }
513 }
514 }
515 map
516 }
517
518 #[cfg(not(feature = "fs_persist"))]
519 {
520 #[allow(unused_mut)]
522 let mut map = HashMap::new();
523 #[cfg(any(test, debug_assertions))]
524 GLOBAL_METADATA_TEST.with(|meta| {
525 #[cfg(target_arch = "wasm32")]
526 let meta_map = meta.borrow_mut();
527 #[cfg(not(target_arch = "wasm32"))]
528 let meta_map = meta.lock();
529 if let Some(db_meta) = meta_map.get(db_name) {
530 for (block_id, metadata) in db_meta.iter() {
531 map.insert(*block_id, metadata.checksum);
532 }
533 }
534 });
535 map
536 }
537 };
538
539 let checksum_algos_init: HashMap<u64, ChecksumAlgorithm> = {
540 #[cfg(feature = "fs_persist")]
541 {
542 let mut map = HashMap::new();
543 let base_path = std::env::var("ABSURDERSQL_FS_BASE")
544 .unwrap_or_else(|_| "./test_storage".to_string());
545 let mut meta_path = std::path::PathBuf::from(base_path);
546 meta_path.push(db_name);
547 meta_path.push("metadata.json");
548
549 if let Ok(content) = std::fs::read_to_string(&meta_path) {
550 if let Ok(meta_data) = serde_json::from_str::<serde_json::Value>(&content) {
551 if let Some(entries) = meta_data["entries"].as_array() {
552 for entry in entries {
553 if let Some(arr) = entry.as_array() {
554 if let (Some(block_id), Some(obj)) = (
555 arr.first().and_then(|v| v.as_u64()),
556 arr.get(1).and_then(|v| v.as_object()),
557 ) {
558 let algo = obj
559 .get("algo")
560 .and_then(|v| v.as_str())
561 .and_then(|s| match s {
562 "CRC32" => Some(ChecksumAlgorithm::CRC32),
563 "FastHash" => Some(ChecksumAlgorithm::FastHash),
564 _ => None,
565 })
566 .unwrap_or(ChecksumAlgorithm::FastHash);
567 map.insert(block_id, algo);
568 }
569 }
570 }
571 }
572 }
573 }
574 map
575 }
576
577 #[cfg(not(feature = "fs_persist"))]
578 {
579 #[allow(unused_mut)]
581 let mut map = HashMap::new();
582 #[cfg(any(test, debug_assertions))]
583 GLOBAL_METADATA_TEST.with(|meta| {
584 #[cfg(target_arch = "wasm32")]
585 let meta_map = meta.borrow_mut();
586 #[cfg(not(target_arch = "wasm32"))]
587 let meta_map = meta.lock();
588 if let Some(db_meta) = meta_map.get(db_name) {
589 for (block_id, metadata) in db_meta.iter() {
590 map.insert(*block_id, metadata.algo);
591 }
592 }
593 });
594 map
595 }
596 };
597
598 #[cfg(feature = "fs_persist")]
600 let checksum_algo_default = match std::env::var("DATASYNC_CHECKSUM_ALGO").ok().as_deref() {
601 Some("CRC32") => ChecksumAlgorithm::CRC32,
602 _ => ChecksumAlgorithm::FastHash,
603 };
604 #[cfg(not(feature = "fs_persist"))]
605 let checksum_algo_default = ChecksumAlgorithm::FastHash;
606
607 let deallocated_blocks_init: HashSet<u64> = {
609 #[cfg(feature = "fs_persist")]
610 {
611 let mut set = HashSet::new();
612 let base_path = std::env::var("ABSURDERSQL_FS_BASE")
613 .unwrap_or_else(|_| "./test_storage".to_string());
614 let mut path = std::path::PathBuf::from(base_path);
615 path.push(db_name);
616 let mut dealloc_path = path.clone();
617 dealloc_path.push("deallocated.json");
618 if let Ok(content) = std::fs::read_to_string(&dealloc_path) {
619 if let Ok(dealloc_data) = serde_json::from_str::<serde_json::Value>(&content) {
620 if let Some(tombstones_array) = dealloc_data["tombstones"].as_array() {
621 for tombstone_val in tombstones_array {
622 if let Some(block_id) = tombstone_val.as_u64() {
623 set.insert(block_id);
624 }
625 }
626 }
627 }
628 }
629 set
630 }
631 #[cfg(not(feature = "fs_persist"))]
632 {
633 HashSet::new()
634 }
635 };
636
637 Ok(BlockStorage {
638 db_name: db_name.to_string(),
639 cache: Mutex::new(HashMap::new()),
640 lru_order: Mutex::new(VecDeque::new()),
641 capacity: 1000,
642 checksum_manager: ChecksumManager::with_data(
643 checksums_init,
644 checksum_algos_init,
645 checksum_algo_default,
646 ),
647 dirty_blocks: Arc::new(Mutex::new(HashMap::new())),
648 allocated_blocks: Mutex::new(allocated_blocks),
649 next_block_id: AtomicU64::new(next_block_id),
650 deallocated_blocks: Mutex::new(deallocated_blocks_init),
651 policy: Mutex::new(None),
652 auto_sync_interval: Mutex::new(None),
653 #[cfg(not(target_arch = "wasm32"))]
654 last_auto_sync: Instant::now(),
655 #[cfg(not(target_arch = "wasm32"))]
656 auto_sync_stop: None,
657 #[cfg(not(target_arch = "wasm32"))]
658 auto_sync_thread: None,
659 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
660 base_dir: std::path::PathBuf::from(
661 std::env::var("ABSURDERSQL_FS_BASE")
662 .unwrap_or_else(|_| "./test_storage".to_string()),
663 ),
664 #[cfg(not(target_arch = "wasm32"))]
665 debounce_thread: None,
666 #[cfg(not(target_arch = "wasm32"))]
667 tokio_timer_task: None,
668 #[cfg(not(target_arch = "wasm32"))]
669 tokio_debounce_task: None,
670 #[cfg(not(target_arch = "wasm32"))]
671 last_write_ms: Arc::new(AtomicU64::new(0)),
672 #[cfg(not(target_arch = "wasm32"))]
673 threshold_hit: Arc::new(AtomicBool::new(false)),
674 #[cfg(not(target_arch = "wasm32"))]
675 sync_count: Arc::new(AtomicU64::new(0)),
676 #[cfg(not(target_arch = "wasm32"))]
677 timer_sync_count: Arc::new(AtomicU64::new(0)),
678 #[cfg(not(target_arch = "wasm32"))]
679 debounce_sync_count: Arc::new(AtomicU64::new(0)),
680 #[cfg(not(target_arch = "wasm32"))]
681 last_sync_duration_ms: Arc::new(AtomicU64::new(0)),
682 #[cfg(not(target_arch = "wasm32"))]
683 sync_sender: None,
684 #[cfg(not(target_arch = "wasm32"))]
685 sync_receiver: None,
686 recovery_report: RecoveryReport::default(),
687 #[cfg(target_arch = "wasm32")]
688 leader_election: std::cell::RefCell::new(None),
689 observability: super::observability::ObservabilityManager::new(),
690 #[cfg(feature = "telemetry")]
691 metrics: None,
692 })
693 }
694
695 pub async fn new_with_capacity(db_name: &str, capacity: usize) -> Result<Self, DatabaseError> {
696 let mut s = Self::new(db_name).await?;
697 s.capacity = capacity;
698 Ok(s)
699 }
700
701 pub async fn new_with_recovery_options(
702 db_name: &str,
703 recovery_opts: RecoveryOptions,
704 ) -> Result<Self, DatabaseError> {
705 let mut storage = Self::new(db_name).await?;
706
707 storage.perform_startup_recovery(recovery_opts).await?;
708
709 Ok(storage)
710 }
711
712 pub fn get_recovery_report(&self) -> &RecoveryReport {
713 &self.recovery_report
714 }
715
716 async fn perform_startup_recovery(
717 &mut self,
718 opts: RecoveryOptions,
719 ) -> Result<(), DatabaseError> {
720 super::recovery::perform_startup_recovery(self, opts).await
721 }
722
723 pub(super) async fn get_blocks_for_verification(
724 &self,
725 mode: &RecoveryMode,
726 ) -> Result<Vec<u64>, DatabaseError> {
727 let all_blocks: Vec<u64> = lock_mutex!(self.allocated_blocks).iter().copied().collect();
728
729 match mode {
730 RecoveryMode::Full => Ok(all_blocks),
731 RecoveryMode::Sample { count } => {
732 let sample_count = (*count).min(all_blocks.len());
733 let mut sampled = all_blocks;
734 sampled.sort_unstable(); sampled.truncate(sample_count);
736 Ok(sampled)
737 }
738 RecoveryMode::Skip => Ok(Vec::new()),
739 }
740 }
741
742 pub(super) async fn verify_block_integrity(
743 &mut self,
744 block_id: u64,
745 ) -> Result<bool, DatabaseError> {
746 let data = match self.read_block_from_storage(block_id).await {
748 Ok(data) => data,
749 Err(_) => {
750 log::warn!(
751 "Could not read block {} for integrity verification",
752 block_id
753 );
754 return Ok(false);
755 }
756 };
757
758 match self.verify_against_stored_checksum(block_id, &data) {
760 Ok(()) => Ok(true),
761 Err(e) => {
762 log::warn!(
763 "Block {} failed checksum verification: {}",
764 block_id,
765 e.message
766 );
767 Ok(false)
768 }
769 }
770 }
771
772 async fn read_block_from_storage(&mut self, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
773 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
775 {
776 let mut blocks_dir = self.base_dir.clone();
777 blocks_dir.push(&self.db_name);
778 blocks_dir.push("blocks");
779 let block_file = blocks_dir.join(format!("block_{}.bin", block_id));
780
781 if let Ok(data) = std::fs::read(&block_file) {
782 if data.len() == BLOCK_SIZE {
783 return Ok(data);
784 }
785 }
786 }
787
788 #[cfg(all(
790 not(target_arch = "wasm32"),
791 any(test, debug_assertions),
792 not(feature = "fs_persist")
793 ))]
794 {
795 let mut found_data = None;
796 vfs_sync::with_global_storage(|storage| {
797 #[cfg(target_arch = "wasm32")]
798 let storage_map = storage;
799 #[cfg(not(target_arch = "wasm32"))]
800 let storage_map = storage.borrow();
801 if let Some(db_storage) = storage_map.get(&self.db_name) {
802 if let Some(data) = db_storage.get(&block_id) {
803 found_data = Some(data.clone());
804 }
805 }
806 });
807 if let Some(data) = found_data {
808 return Ok(data);
809 }
810 }
811
812 #[cfg(target_arch = "wasm32")]
814 {
815 let mut found_data = None;
816 vfs_sync::with_global_storage(|storage_map| {
817 if let Some(db_storage) = storage_map.borrow().get(&self.db_name) {
818 if let Some(data) = db_storage.get(&block_id) {
819 found_data = Some(data.clone());
820 }
821 }
822 });
823 if let Some(data) = found_data {
824 return Ok(data);
825 }
826 }
827
828 Err(DatabaseError::new(
829 "BLOCK_NOT_FOUND",
830 &format!("Block {} not found in storage", block_id),
831 ))
832 }
833
834 pub(super) async fn repair_corrupted_block(
835 &mut self,
836 block_id: u64,
837 ) -> Result<bool, DatabaseError> {
838 log::info!("Attempting to repair corrupted block {}", block_id);
839
840 lock_mutex!(self.cache).remove(&block_id);
845
846 self.checksum_manager.remove_checksum(block_id);
848
849 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
851 {
852 let mut blocks_dir = self.base_dir.clone();
853 blocks_dir.push(&self.db_name);
854 blocks_dir.push("blocks");
855 let block_file = blocks_dir.join(format!("block_{}.bin", block_id));
856 let _ = std::fs::remove_file(&block_file);
857 }
858
859 #[cfg(all(
861 not(target_arch = "wasm32"),
862 any(test, debug_assertions),
863 not(feature = "fs_persist")
864 ))]
865 {
866 vfs_sync::with_global_storage(|storage| {
867 let mut storage_map = storage.borrow_mut();
868 if let Some(db_storage) = storage_map.get_mut(&self.db_name) {
869 db_storage.remove(&block_id);
870 }
871 });
872 }
873
874 #[cfg(target_arch = "wasm32")]
876 {
877 vfs_sync::with_global_storage(|storage_map| {
878 if let Some(db_storage) = storage_map.borrow_mut().get_mut(&self.db_name) {
879 db_storage.remove(&block_id);
880 }
881 });
882 }
883
884 log::info!(
885 "Corrupted block {} has been removed (repair completed)",
886 block_id
887 );
888 Ok(true)
889 }
890
891 pub(super) fn touch_lru(&self, block_id: u64) {
892 let mut lru = lock_mutex!(self.lru_order);
894 if let Some(pos) = lru.iter().position(|&id| id == block_id) {
895 lru.remove(pos);
896 }
897 lru.push_back(block_id);
899 }
900
901 pub(super) fn evict_if_needed(&self) {
902 loop {
904 let victim_opt = {
906 let cache_guard = lock_mutex!(self.cache);
907 if cache_guard.len() <= self.capacity {
908 break; }
910
911 let dirty_guard = lock_mutex!(self.dirty_blocks);
913 let mut lru_guard = lock_mutex!(self.lru_order);
914
915 let victim_pos = lru_guard
916 .iter()
917 .position(|id| !dirty_guard.contains_key(id));
918
919 victim_pos.map(|pos| lru_guard.remove(pos).expect("valid pos"))
920 };
922
923 match victim_opt {
924 Some(victim) => {
925 lock_mutex!(self.cache).remove(&victim);
927 }
928 None => {
929 break;
931 }
932 }
933 }
934 }
935
936 #[inline]
937 #[cfg(target_arch = "wasm32")]
938 pub fn now_millis() -> u64 {
939 Date::now() as u64
941 }
942
943 #[inline]
944 #[cfg(not(target_arch = "wasm32"))]
945 pub fn now_millis() -> u64 {
946 let now = SystemTime::now()
947 .duration_since(UNIX_EPOCH)
948 .unwrap_or_else(|_| Duration::from_millis(0));
949 now.as_millis() as u64
950 }
951
952 pub(super) fn verify_against_stored_checksum(
953 &self,
954 block_id: u64,
955 data: &[u8],
956 ) -> Result<(), DatabaseError> {
957 self.checksum_manager.validate_checksum(block_id, data)
958 }
959
960 pub fn read_block_sync(&self, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
962 super::io_operations::read_block_sync_impl(self, block_id)
964 }
965
966 pub async fn read_block(&self, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
967 self.read_block_sync(block_id)
969 }
970
971 #[cfg(target_arch = "wasm32")]
973 pub fn write_block_sync(&self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
974 super::io_operations::write_block_sync_impl(self, block_id, data)
975 }
976
977 #[cfg(not(target_arch = "wasm32"))]
978 pub fn write_block_sync(&mut self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
979 super::io_operations::write_block_sync_impl(self, block_id, data)
980 }
981
982 #[cfg(target_arch = "wasm32")]
983 pub async fn write_block(&self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
984 self.write_block_sync(block_id, data)
985 }
986
987 #[cfg(not(target_arch = "wasm32"))]
988 pub async fn write_block(&mut self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
989 self.write_block_sync(block_id, data)?;
990
991 if self.threshold_hit.load(std::sync::atomic::Ordering::SeqCst) {
993 let has_debounce = lock_mutex!(self.policy)
994 .as_ref()
995 .and_then(|p| p.debounce_ms)
996 .is_some();
997 if !has_debounce {
998 log::debug!("Threshold hit without debounce: performing inline sync");
999 self.sync_implementation()?;
1000 self.threshold_hit
1001 .store(false, std::sync::atomic::Ordering::SeqCst);
1002 }
1003 }
1004
1005 Ok(())
1006 }
1007
1008 #[cfg(target_arch = "wasm32")]
1010 pub fn write_blocks_sync(&self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
1011 self.maybe_auto_sync();
1012 for (block_id, data) in items {
1013 self.write_block_sync(block_id, data)?;
1014 }
1015 Ok(())
1016 }
1017
1018 #[cfg(not(target_arch = "wasm32"))]
1019 pub fn write_blocks_sync(&mut self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
1020 self.maybe_auto_sync();
1021 for (block_id, data) in items {
1022 self.write_block_sync(block_id, data)?;
1023 }
1024 Ok(())
1025 }
1026
1027 #[cfg(target_arch = "wasm32")]
1029 pub async fn write_blocks(&self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
1030 self.write_blocks_sync(items)
1031 }
1032
1033 #[cfg(not(target_arch = "wasm32"))]
1034 pub async fn write_blocks(&mut self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
1035 self.write_blocks_sync(items)
1036 }
1037
1038 pub fn read_blocks_sync(&self, block_ids: &[u64]) -> Result<Vec<Vec<u8>>, DatabaseError> {
1040 self.maybe_auto_sync();
1041 let mut results = Vec::with_capacity(block_ids.len());
1042 for &id in block_ids {
1043 results.push(self.read_block_sync(id)?);
1044 }
1045 Ok(results)
1046 }
1047
1048 pub async fn read_blocks(&self, block_ids: &[u64]) -> Result<Vec<Vec<u8>>, DatabaseError> {
1050 self.read_blocks_sync(block_ids)
1051 }
1052
1053 pub fn get_block_checksum(&self, block_id: u64) -> Option<u32> {
1055 self.checksum_manager
1056 .get_checksum(block_id)
1057 .map(|checksum| checksum as u32)
1058 }
1059
1060 #[cfg(target_arch = "wasm32")]
1062 pub fn get_commit_marker(&self) -> u64 {
1063 vfs_sync::with_global_commit_marker(|cm| {
1064 cm.borrow().get(&self.db_name).copied().unwrap_or(0)
1065 })
1066 }
1067
1068 #[cfg(target_arch = "wasm32")]
1070 pub fn has_any_blocks(&self) -> bool {
1071 vfs_sync::with_global_storage(|storage_map| {
1072 storage_map
1073 .borrow()
1074 .get(&self.db_name)
1075 .map_or(false, |blocks| !blocks.is_empty())
1076 })
1077 }
1078
1079 pub async fn verify_block_checksum(&mut self, block_id: u64) -> Result<(), DatabaseError> {
1080 if let Some(bytes) = lock_mutex!(self.cache).get(&block_id).cloned() {
1082 return self.verify_against_stored_checksum(block_id, &bytes);
1083 }
1084 let data = self.read_block_sync(block_id)?;
1086 self.verify_against_stored_checksum(block_id, &data)
1087 }
1088
1089 #[allow(unused_mut)]
1091 pub fn get_block_metadata_for_testing(&mut self) -> HashMap<u64, (u64, u32, u64)> {
1092 #[cfg(target_arch = "wasm32")]
1094 {
1095 let mut out = HashMap::new();
1096 vfs_sync::with_global_metadata(|meta_map| {
1097 if let Some(db_meta) = meta_map.borrow().get(&self.db_name) {
1098 for (bid, m) in db_meta.iter() {
1099 out.insert(*bid, (m.checksum, m.version, m.last_modified_ms));
1100 }
1101 }
1102 });
1103 out
1104 }
1105 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
1106 {
1107 use super::fs_persist::FsMeta;
1108 use std::io::Read;
1109 let mut out = HashMap::new();
1110 let base: PathBuf = self.base_dir.clone();
1111 let mut db_dir = base.clone();
1112 db_dir.push(&self.db_name);
1113 let mut meta_path = db_dir.clone();
1114 meta_path.push("metadata.json");
1115 if let Ok(mut f) = std::fs::File::open(&meta_path) {
1116 let mut s = String::new();
1117 if f.read_to_string(&mut s).is_ok() {
1118 if let Ok(meta) = serde_json::from_str::<FsMeta>(&s) {
1119 for (block_id, metadata) in meta.entries {
1120 out.insert(
1121 block_id,
1122 (
1123 metadata.checksum,
1124 metadata.version,
1125 metadata.last_modified_ms,
1126 ),
1127 );
1128 }
1129 }
1130 }
1131 }
1132 out
1133 }
1134 #[cfg(all(
1135 not(target_arch = "wasm32"),
1136 any(test, debug_assertions),
1137 not(feature = "fs_persist")
1138 ))]
1139 {
1140 let mut out = HashMap::new();
1141 GLOBAL_METADATA_TEST.with(|meta| {
1142 #[cfg(target_arch = "wasm32")]
1143 let meta_map = meta.borrow_mut();
1144 #[cfg(not(target_arch = "wasm32"))]
1145 let meta_map = meta.lock();
1146 if let Some(db_meta) = meta_map.get(&self.db_name) {
1147 for (bid, m) in db_meta.iter() {
1148 out.insert(*bid, (m.checksum, m.version, m.last_modified_ms));
1149 }
1150 }
1151 });
1152 out
1153 }
1154 }
1155
1156 pub fn set_block_checksum_for_testing(&mut self, block_id: u64, checksum: u64) {
1158 self.checksum_manager
1159 .set_checksum_for_testing(block_id, checksum);
1160 }
1161
1162 #[cfg(not(target_arch = "wasm32"))]
1164 pub(super) fn get_dirty_blocks(&self) -> &Arc<Mutex<HashMap<u64, Vec<u8>>>> {
1165 &self.dirty_blocks
1166 }
1167
1168 #[cfg(target_arch = "wasm32")]
1169 #[allow(invalid_reference_casting)]
1170 pub async fn sync(&self) -> Result<(), DatabaseError> {
1171 let self_mut = unsafe { &mut *(self as *const Self as *mut Self) };
1173 let result = self_mut.sync_implementation();
1174 wasm_bindgen_futures::JsFuture::from(js_sys::Promise::resolve(
1176 &wasm_bindgen::JsValue::UNDEFINED,
1177 ))
1178 .await
1179 .ok();
1180 result
1181 }
1182
1183 #[cfg(not(target_arch = "wasm32"))]
1184 pub async fn sync(&mut self) -> Result<(), DatabaseError> {
1185 self.sync_implementation()
1186 }
1187
1188 #[cfg(target_arch = "wasm32")]
1190 #[allow(invalid_reference_casting)]
1191 pub fn sync_now(&self) -> Result<(), DatabaseError> {
1192 let self_mut = unsafe { &mut *(self as *const Self as *mut Self) };
1195 self_mut.sync_implementation()
1196 }
1197
1198 #[cfg(not(target_arch = "wasm32"))]
1199 pub fn sync_now(&mut self) -> Result<(), DatabaseError> {
1200 self.sync_implementation()
1201 }
1202
1203 fn sync_implementation(&mut self) -> Result<(), DatabaseError> {
1205 super::sync_operations::sync_implementation_impl(self)
1206 }
1207
1208 #[cfg(target_arch = "wasm32")]
1211 pub fn sync_blocks_only(&mut self) -> Result<(), DatabaseError> {
1212 super::wasm_vfs_sync::sync_blocks_only(self)
1213 }
1214
1215 #[cfg(target_arch = "wasm32")]
1217 pub async fn sync_async(&self) -> Result<(), DatabaseError> {
1218 super::wasm_indexeddb::sync_async(self).await
1219 }
1220
1221 #[cfg(target_arch = "wasm32")]
1224 #[allow(invalid_reference_casting)]
1225 pub fn drain_and_shutdown(&self) {
1226 let self_mut = unsafe { &mut *(self as *const Self as *mut Self) };
1228 if let Err(e) = self_mut.sync_now() {
1229 log::error!("drain_and_shutdown: sync_now failed: {}", e.message);
1230 }
1231 *lock_mutex!(self.auto_sync_interval) = None;
1232 }
1233
1234 #[cfg(not(target_arch = "wasm32"))]
1235 pub fn drain_and_shutdown(&mut self) {
1236 if let Err(e) = self.sync_now() {
1237 log::error!("drain_and_shutdown: sync_now failed: {}", e.message);
1238 }
1239 *lock_mutex!(self.auto_sync_interval) = None;
1240 #[cfg(not(target_arch = "wasm32"))]
1241 {
1242 if let Some(stop) = &self.auto_sync_stop {
1243 stop.store(true, Ordering::SeqCst);
1244 }
1245 if let Some(handle) = self.auto_sync_thread.take() {
1246 let _ = handle.join();
1247 }
1248 if let Some(handle) = self.debounce_thread.take() {
1249 let _ = handle.join();
1250 }
1251 if let Some(task) = self.tokio_timer_task.take() {
1252 task.abort();
1253 }
1254 if let Some(task) = self.tokio_debounce_task.take() {
1255 task.abort();
1256 }
1257 self.auto_sync_stop = None;
1258 self.threshold_hit.store(false, Ordering::SeqCst);
1259 }
1260 }
1261
1262 pub fn clear_cache(&self) {
1263 lock_mutex!(self.cache).clear();
1264 lock_mutex!(self.lru_order).clear();
1265 }
1266
1267 pub async fn on_database_import(&mut self) -> Result<(), DatabaseError> {
1287 log::info!(
1288 "Clearing cache for database '{}' after import",
1289 self.db_name
1290 );
1291
1292 self.clear_cache();
1294
1295 lock_mutex!(self.dirty_blocks).clear();
1297
1298 self.checksum_manager.clear_checksums();
1300
1301 #[cfg(target_arch = "wasm32")]
1303 {
1304 use super::vfs_sync::with_global_allocation_map;
1305 *lock_mutex!(self.allocated_blocks) = with_global_allocation_map(|gam| {
1306 gam.borrow()
1307 .get(&self.db_name)
1308 .cloned()
1309 .unwrap_or_else(std::collections::HashSet::new)
1310 });
1311 log::debug!(
1312 "Reloaded {} allocated blocks from global allocation map",
1313 lock_mutex!(self.allocated_blocks).len()
1314 );
1315
1316 log::debug!("Checksum data will be reloaded from metadata on next verification");
1318 }
1319
1320 #[cfg(not(target_arch = "wasm32"))]
1321 {
1322 #[cfg(feature = "fs_persist")]
1323 {
1324 let mut alloc_path = self.base_dir.clone();
1326 alloc_path.push(&self.db_name);
1327 alloc_path.push("allocations.json");
1328
1329 if let Ok(content) = std::fs::read_to_string(&alloc_path) {
1330 if let Ok(alloc_data) = serde_json::from_str::<serde_json::Value>(&content) {
1331 if let Some(allocated_array) = alloc_data["allocated"].as_array() {
1332 lock_mutex!(self.allocated_blocks).clear();
1333 for block_id_val in allocated_array {
1334 if let Some(block_id) = block_id_val.as_u64() {
1335 lock_mutex!(self.allocated_blocks).insert(block_id);
1336 }
1337 }
1338 log::debug!(
1339 "Reloaded {} allocated blocks from filesystem",
1340 lock_mutex!(self.allocated_blocks).len()
1341 );
1342 }
1343 }
1344 }
1345
1346 let mut meta_path = self.base_dir.clone();
1348 meta_path.push(&self.db_name);
1349 meta_path.push("metadata.json");
1350
1351 if let Ok(content) = std::fs::read_to_string(&meta_path) {
1352 if let Ok(meta_data) = serde_json::from_str::<serde_json::Value>(&content) {
1353 if let Some(entries) = meta_data["entries"].as_array() {
1354 let mut new_checksums = HashMap::new();
1355 let mut new_algos = HashMap::new();
1356
1357 for entry in entries {
1358 if let (Some(block_id), Some(checksum), Some(algo_str)) = (
1359 entry[0].as_u64(),
1360 entry[1]["checksum"].as_u64(),
1361 entry[1]["algo"].as_str(),
1362 ) {
1363 new_checksums.insert(block_id, checksum);
1364
1365 let algo = match algo_str {
1366 "CRC32" => super::metadata::ChecksumAlgorithm::CRC32,
1367 _ => super::metadata::ChecksumAlgorithm::FastHash,
1368 };
1369 new_algos.insert(block_id, algo);
1370 }
1371 }
1372
1373 self.checksum_manager
1374 .replace_all(new_checksums.clone(), new_algos);
1375 log::debug!(
1376 "Reloaded {} checksums from filesystem metadata",
1377 new_checksums.len()
1378 );
1379 }
1380 }
1381 } else {
1382 log::debug!("No metadata file found, checksums will be empty after import");
1383 }
1384 }
1385
1386 #[cfg(not(feature = "fs_persist"))]
1387 {
1388 use super::vfs_sync::with_global_allocation_map;
1390
1391 *lock_mutex!(self.allocated_blocks) = with_global_allocation_map(|gam| {
1392 #[cfg(target_arch = "wasm32")]
1393 let map = gam;
1394 #[cfg(not(target_arch = "wasm32"))]
1395 let map = gam.borrow();
1396 map.get(&self.db_name)
1397 .cloned()
1398 .unwrap_or_else(std::collections::HashSet::new)
1399 });
1400 log::debug!(
1401 "Reloaded {} allocated blocks from global allocation map (native test)",
1402 lock_mutex!(self.allocated_blocks).len()
1403 );
1404
1405 log::debug!("Checksum data will be reloaded from metadata on next verification");
1407 }
1408 }
1409
1410 log::info!(
1411 "Cache and allocation state refreshed for '{}'",
1412 self.db_name
1413 );
1414
1415 Ok(())
1416 }
1417
1418 #[cfg(target_arch = "wasm32")]
1420 pub fn reload_cache_from_global_storage(&self) {
1421 use crate::storage::vfs_sync::{with_global_metadata, with_global_storage};
1422
1423 log::info!("[RELOAD] Starting cache reload for: {}", self.db_name);
1424
1425 #[cfg(target_arch = "wasm32")]
1426 web_sys::console::log_1(
1427 &format!("[RELOAD] BlockStorage.db_name = {}", self.db_name).into(),
1428 );
1429
1430 let fresh_cache = with_global_storage(|storage_map| {
1431 if let Some(db_storage) = storage_map.borrow().get(&self.db_name) {
1432 log::info!(
1433 "[RELOAD] Found {} blocks in GLOBAL_STORAGE",
1434 db_storage.len()
1435 );
1436 db_storage.clone()
1437 } else {
1438 log::warn!(
1439 "[RELOAD] No blocks found in GLOBAL_STORAGE for {}",
1440 self.db_name
1441 );
1442 std::collections::HashMap::new()
1443 }
1444 });
1445
1446 let block_count = fresh_cache.len();
1447 log::info!(
1448 "[RELOAD] Loading {} blocks into cache and marking as dirty",
1449 block_count
1450 );
1451
1452 with_global_metadata(|metadata_map| {
1455 if let Some(db_metadata) = metadata_map.borrow().get(&self.db_name) {
1456 log::info!("[RELOAD] Found {} metadata entries", db_metadata.len());
1457 self.checksum_manager.clear_checksums();
1459
1460 let mut new_checksums = std::collections::HashMap::new();
1462 let mut new_algos = std::collections::HashMap::new();
1463 for (block_id, meta) in db_metadata {
1464 new_checksums.insert(*block_id, meta.checksum);
1465 new_algos.insert(*block_id, meta.algo);
1466 }
1467 self.checksum_manager.replace_all(new_checksums, new_algos);
1468 } else {
1469 log::warn!("[RELOAD] No metadata found for {}", self.db_name);
1470 self.checksum_manager.clear_checksums();
1472 }
1473 });
1474
1475 let old_lru = {
1477 let mut lru = lock_mutex!(self.lru_order);
1478 std::mem::replace(&mut *lru, std::collections::VecDeque::new())
1479 };
1480 lock_mutex!(self.cache).clear();
1481
1482 for (block_id, block_data) in fresh_cache {
1484 lock_mutex!(self.cache).insert(block_id, block_data);
1485 }
1486
1487 log::info!(
1488 "[RELOAD] Cache reloaded with {} blocks",
1489 lock_mutex!(self.cache).len()
1490 );
1491
1492 for block_id in old_lru {
1494 if lock_mutex!(self.cache).contains_key(&block_id) {
1495 lock_mutex!(self.lru_order).push_back(block_id);
1496 }
1497 }
1498
1499 let block_ids: Vec<u64> = lock_mutex!(self.cache).keys().copied().collect();
1501 for block_id in block_ids {
1502 if !lock_mutex!(self.lru_order).contains(&block_id) {
1503 lock_mutex!(self.lru_order).push_back(block_id);
1504 }
1505 }
1506
1507 log::info!(
1508 "[RELOAD] Reload complete: {} blocks in cache, {} dirty",
1509 lock_mutex!(self.cache).len(),
1510 lock_mutex!(self.dirty_blocks).len()
1511 );
1512
1513 #[cfg(target_arch = "wasm32")]
1515 {
1516 if let Some(block_0) = lock_mutex!(self.cache).get(&0) {
1517 let is_valid = block_0.len() >= 16 && &block_0[0..16] == b"SQLite format 3\0";
1518 web_sys::console::log_1(
1519 &format!(
1520 "[RELOAD] Block 0 in cache: {} bytes, valid SQLite header: {}",
1521 block_0.len(),
1522 is_valid
1523 )
1524 .into(),
1525 );
1526 if block_0.len() >= 16 {
1527 web_sys::console::log_1(
1528 &format!("[RELOAD] Block 0 header [0..16]: {:02x?}", &block_0[0..16])
1529 .into(),
1530 );
1531 }
1532
1533 if block_0.len() >= 28 {
1535 let db_size_bytes = [block_0[24], block_0[25], block_0[26], block_0[27]];
1536 let db_size_pages = u32::from_be_bytes(db_size_bytes);
1537 web_sys::console::log_1(
1538 &format!(
1539 "[RELOAD] Database size field (offset 24-27): {} pages ({:02x?})",
1540 db_size_pages, db_size_bytes
1541 )
1542 .into(),
1543 );
1544 }
1545
1546 if block_0.len() >= 100 {
1548 web_sys::console::log_1(
1549 &format!(
1550 "[RELOAD] Full header dump [16-31]: {:02x?}",
1551 &block_0[16..32]
1552 )
1553 .into(),
1554 );
1555 web_sys::console::log_1(
1556 &format!(
1557 "[RELOAD] Full header dump [32-47]: {:02x?}",
1558 &block_0[32..48]
1559 )
1560 .into(),
1561 );
1562 web_sys::console::log_1(
1563 &format!(
1564 "[RELOAD] Full header dump [48-63]: {:02x?}",
1565 &block_0[48..64]
1566 )
1567 .into(),
1568 );
1569 web_sys::console::log_1(
1570 &format!(
1571 "[RELOAD] Full header dump [64-79]: {:02x?}",
1572 &block_0[64..80]
1573 )
1574 .into(),
1575 );
1576 web_sys::console::log_1(
1577 &format!(
1578 "[RELOAD] Full header dump [80-99]: {:02x?}",
1579 &block_0[80..100]
1580 )
1581 .into(),
1582 );
1583 }
1584 } else {
1585 web_sys::console::log_1(
1586 &format!("[RELOAD] ERROR: Block 0 NOT in cache after reload!").into(),
1587 );
1588 }
1589 }
1590 }
1591
1592 pub fn get_cache_size(&self) -> usize {
1593 lock_mutex!(self.cache).len()
1594 }
1595
1596 pub fn get_dirty_count(&self) -> usize {
1597 lock_mutex!(self.dirty_blocks).len()
1598 }
1599
1600 pub fn get_db_name(&self) -> &str {
1601 &self.db_name
1602 }
1603
1604 pub fn is_cached(&self, block_id: u64) -> bool {
1605 lock_mutex!(self.cache).contains_key(&block_id)
1606 }
1607
1608 pub async fn allocate_block(&mut self) -> Result<u64, DatabaseError> {
1610 super::allocation::allocate_block_impl(self).await
1611 }
1612
1613 pub async fn deallocate_block(&mut self, block_id: u64) -> Result<(), DatabaseError> {
1615 super::allocation::deallocate_block_impl(self, block_id).await
1616 }
1617
1618 pub fn get_allocated_count(&self) -> usize {
1620 lock_mutex!(self.allocated_blocks).len()
1621 }
1622
1623 #[cfg(target_arch = "wasm32")]
1627 pub async fn crash_simulation_sync(
1628 &mut self,
1629 blocks_written: bool,
1630 ) -> Result<(), DatabaseError> {
1631 log::info!(
1632 "CRASH SIMULATION: Starting crash simulation with blocks_written={}",
1633 blocks_written
1634 );
1635
1636 if blocks_written {
1637 let dirty_blocks = {
1642 let dirty = lock_mutex!(self.dirty_blocks);
1643 dirty.clone()
1644 };
1645
1646 if !dirty_blocks.is_empty() {
1647 log::info!(
1648 "CRASH SIMULATION: Writing {} blocks to IndexedDB before crash",
1649 dirty_blocks.len()
1650 );
1651
1652 let metadata_to_persist: Vec<(u64, u64)> = dirty_blocks
1654 .keys()
1655 .map(|&block_id| {
1656 let next_commit = self.get_commit_marker() + 1;
1657 (block_id, next_commit)
1658 })
1659 .collect();
1660
1661 log::debug!(
1662 "CRASH SIMULATION: About to call persist_to_indexeddb for {} blocks",
1663 dirty_blocks.len()
1664 );
1665
1666 super::wasm_indexeddb::persist_to_indexeddb(
1668 &self.db_name,
1669 dirty_blocks,
1670 metadata_to_persist,
1671 )
1672 .await?;
1673
1674 log::info!("CRASH SIMULATION: persist_to_indexeddb completed successfully");
1675 log::info!(
1676 "CRASH SIMULATION: Blocks written to IndexedDB, simulating crash before commit marker advance"
1677 );
1678
1679 lock_mutex!(self.dirty_blocks).clear();
1681
1682 return Ok(());
1686 } else {
1687 log::info!("CRASH SIMULATION: No dirty blocks to write");
1688 return Ok(());
1689 }
1690 } else {
1691 log::info!("CRASH SIMULATION: Simulating crash before blocks are written to IndexedDB");
1693
1694 return Ok(());
1696 }
1697 }
1698
1699 #[cfg(target_arch = "wasm32")]
1702 pub async fn crash_simulation_partial_sync(
1703 &mut self,
1704 blocks_to_write: &[u64],
1705 ) -> Result<(), DatabaseError> {
1706 log::info!(
1707 "CRASH SIMULATION: Starting partial crash simulation for {} blocks",
1708 blocks_to_write.len()
1709 );
1710
1711 let dirty_blocks = {
1712 let dirty = lock_mutex!(self.dirty_blocks);
1713 dirty.clone()
1714 };
1715
1716 let partial_blocks: std::collections::HashMap<u64, Vec<u8>> = dirty_blocks
1718 .into_iter()
1719 .filter(|(block_id, _)| blocks_to_write.contains(block_id))
1720 .collect();
1721
1722 if !partial_blocks.is_empty() {
1723 log::info!(
1724 "CRASH SIMULATION: Writing {} out of {} blocks before crash",
1725 partial_blocks.len(),
1726 blocks_to_write.len()
1727 );
1728
1729 let metadata_to_persist: Vec<(u64, u64)> = partial_blocks
1730 .keys()
1731 .map(|&block_id| {
1732 let next_commit = self.get_commit_marker() + 1;
1733 (block_id, next_commit)
1734 })
1735 .collect();
1736
1737 super::wasm_indexeddb::persist_to_indexeddb(
1739 &self.db_name,
1740 partial_blocks.clone(),
1741 metadata_to_persist,
1742 )
1743 .await?;
1744
1745 {
1747 let mut dirty = lock_mutex!(self.dirty_blocks);
1748 for block_id in partial_blocks.keys() {
1749 dirty.remove(block_id);
1750 }
1751 }
1752
1753 log::info!(
1754 "CRASH SIMULATION: Partial blocks written, simulating crash before commit marker advance"
1755 );
1756
1757 }
1759
1760 Ok(())
1761 }
1762
1763 #[cfg(target_arch = "wasm32")]
1767 pub async fn perform_crash_recovery(&mut self) -> Result<CrashRecoveryAction, DatabaseError> {
1768 log::info!(
1769 "CRASH RECOVERY: Starting crash recovery scan for database: {}",
1770 self.db_name
1771 );
1772
1773 let current_marker = self.get_commit_marker();
1775 log::info!("CRASH RECOVERY: Current commit marker: {}", current_marker);
1776
1777 let inconsistent_blocks = self.scan_for_inconsistent_blocks(current_marker).await?;
1780
1781 if inconsistent_blocks.is_empty() {
1782 log::info!("CRASH RECOVERY: No inconsistent blocks found, system is consistent");
1783 return Ok(CrashRecoveryAction::NoActionNeeded);
1784 }
1785
1786 log::info!(
1787 "CRASH RECOVERY: Found {} inconsistent blocks that need recovery",
1788 inconsistent_blocks.len()
1789 );
1790
1791 let recovery_action = self.determine_recovery_action(&inconsistent_blocks).await?;
1793
1794 match recovery_action {
1795 CrashRecoveryAction::Rollback => {
1796 log::info!("CRASH RECOVERY: Performing rollback of incomplete transaction");
1797 self.rollback_incomplete_transaction(&inconsistent_blocks)
1798 .await?;
1799 }
1800 CrashRecoveryAction::Finalize => {
1801 log::info!("CRASH RECOVERY: Performing finalization of complete transaction");
1802 self.finalize_complete_transaction(&inconsistent_blocks)
1803 .await?;
1804 }
1805 CrashRecoveryAction::NoActionNeeded => {
1806 }
1808 }
1809
1810 log::info!("CRASH RECOVERY: Recovery completed successfully");
1811 Ok(recovery_action)
1812 }
1813
1814 #[cfg(target_arch = "wasm32")]
1816 async fn scan_for_inconsistent_blocks(
1817 &self,
1818 commit_marker: u64,
1819 ) -> Result<Vec<(u64, u64)>, DatabaseError> {
1820 log::info!(
1821 "CRASH RECOVERY: Scanning for blocks with version > {}",
1822 commit_marker
1823 );
1824
1825 let mut inconsistent_blocks = Vec::new();
1828
1829 vfs_sync::with_global_metadata(|meta_map| {
1830 if let Some(db_meta) = meta_map.borrow().get(&self.db_name) {
1831 for (block_id, metadata) in db_meta.iter() {
1832 if metadata.version as u64 > commit_marker {
1833 log::info!(
1834 "CRASH RECOVERY: Found inconsistent block {} with version {} > marker {}",
1835 block_id,
1836 metadata.version,
1837 commit_marker
1838 );
1839 inconsistent_blocks.push((*block_id, metadata.version as u64));
1840 }
1841 }
1842 }
1843 });
1844
1845 Ok(inconsistent_blocks)
1846 }
1847
1848 #[cfg(target_arch = "wasm32")]
1850 async fn determine_recovery_action(
1851 &self,
1852 inconsistent_blocks: &[(u64, u64)],
1853 ) -> Result<CrashRecoveryAction, DatabaseError> {
1854 let expected_next_commit = self.get_commit_marker() + 1;
1859 let all_same_version = inconsistent_blocks
1860 .iter()
1861 .all(|(_, version)| *version == expected_next_commit);
1862
1863 if all_same_version && !inconsistent_blocks.is_empty() {
1864 log::info!(
1865 "CRASH RECOVERY: All inconsistent blocks have expected version {}, finalizing transaction",
1866 expected_next_commit
1867 );
1868 Ok(CrashRecoveryAction::Finalize)
1869 } else {
1870 log::info!(
1871 "CRASH RECOVERY: Inconsistent block versions detected, rolling back transaction"
1872 );
1873 Ok(CrashRecoveryAction::Rollback)
1874 }
1875 }
1876
1877 #[cfg(target_arch = "wasm32")]
1879 async fn rollback_incomplete_transaction(
1880 &mut self,
1881 inconsistent_blocks: &[(u64, u64)],
1882 ) -> Result<(), DatabaseError> {
1883 log::info!(
1884 "CRASH RECOVERY: Rolling back {} inconsistent blocks",
1885 inconsistent_blocks.len()
1886 );
1887
1888 vfs_sync::with_global_metadata(|meta_map| {
1890 if let Some(db_meta) = meta_map.borrow_mut().get_mut(&self.db_name) {
1891 for (block_id, _) in inconsistent_blocks {
1892 log::info!(
1893 "CRASH RECOVERY: Removing inconsistent block {} from metadata",
1894 block_id
1895 );
1896 db_meta.remove(block_id);
1897 }
1898 }
1899 });
1900
1901 vfs_sync::with_global_storage(|storage_map| {
1903 if let Some(db_storage) = storage_map.borrow_mut().get_mut(&self.db_name) {
1904 for (block_id, _) in inconsistent_blocks {
1905 log::info!(
1906 "CRASH RECOVERY: Removing inconsistent block {} from global storage",
1907 block_id
1908 );
1909 db_storage.remove(block_id);
1910 }
1911 }
1912 });
1913
1914 for (block_id, _) in inconsistent_blocks {
1916 lock_mutex!(self.cache).remove(block_id);
1917 lock_mutex!(self.lru_order).retain(|&id| id != *block_id);
1919 }
1920
1921 let block_ids_to_delete: Vec<u64> = inconsistent_blocks.iter().map(|(id, _)| *id).collect();
1923 if !block_ids_to_delete.is_empty() {
1924 log::info!(
1925 "CRASH RECOVERY: Deleting {} blocks from IndexedDB",
1926 block_ids_to_delete.len()
1927 );
1928 super::wasm_indexeddb::delete_blocks_from_indexeddb(
1929 &self.db_name,
1930 &block_ids_to_delete,
1931 )
1932 .await?;
1933 log::info!("CRASH RECOVERY: Successfully deleted blocks from IndexedDB");
1934 }
1935
1936 log::info!("CRASH RECOVERY: Rollback completed");
1937 Ok(())
1938 }
1939
1940 #[cfg(target_arch = "wasm32")]
1942 async fn finalize_complete_transaction(
1943 &mut self,
1944 inconsistent_blocks: &[(u64, u64)],
1945 ) -> Result<(), DatabaseError> {
1946 log::info!(
1947 "CRASH RECOVERY: Finalizing transaction for {} blocks",
1948 inconsistent_blocks.len()
1949 );
1950
1951 if let Some((_, target_version)) = inconsistent_blocks.first() {
1953 let new_commit_marker = *target_version;
1954
1955 vfs_sync::with_global_commit_marker(|cm| {
1957 cm.borrow_mut()
1958 .insert(self.db_name.clone(), new_commit_marker);
1959 });
1960
1961 log::info!(
1962 "CRASH RECOVERY: Advanced commit marker from {} to {}",
1963 self.get_commit_marker(),
1964 new_commit_marker
1965 );
1966
1967 for (block_id, _) in inconsistent_blocks {
1969 if let Ok(data) = self.read_block_sync(*block_id) {
1971 self.checksum_manager.store_checksum(*block_id, &data);
1972 log::info!(
1973 "CRASH RECOVERY: Updated checksum for finalized block {}",
1974 block_id
1975 );
1976 }
1977 }
1978 }
1979
1980 log::info!("CRASH RECOVERY: Finalization completed");
1981 Ok(())
1982 }
1983
1984 #[cfg(target_arch = "wasm32")]
1988 pub async fn start_leader_election(&self) -> Result<(), DatabaseError> {
1989 if self.leader_election.borrow().is_none() {
1990 log::debug!(
1991 "BlockStorage::start_leader_election() - Creating new LeaderElectionManager for {}",
1992 self.db_name
1993 );
1994 let mut manager =
1995 super::leader_election::LeaderElectionManager::new(self.db_name.clone());
1996 log::debug!("BlockStorage::start_leader_election() - Calling manager.start_election()");
1997 manager.start_election().await?;
1998 log::debug!(
1999 "BlockStorage::start_leader_election() - Election started, storing manager"
2000 );
2001 *self.leader_election.borrow_mut() = Some(manager);
2002 } else {
2003 log::debug!(
2005 "BlockStorage::start_leader_election() - Election already exists, forcing leadership"
2006 );
2007 if let Some(ref mut manager) = *self.leader_election.borrow_mut() {
2008 manager.force_become_leader().await?;
2009 }
2010 }
2011 Ok(())
2012 }
2013
2014 #[cfg(target_arch = "wasm32")]
2017 pub fn was_leader(&self) -> bool {
2018 self.leader_election
2019 .borrow()
2020 .as_ref()
2021 .map(|m| m.state.borrow().is_leader)
2022 .unwrap_or(false)
2023 }
2024
2025 #[cfg(target_arch = "wasm32")]
2029 pub fn stop_heartbeat_sync(&self) {
2030 if let Ok(mut manager_ref) = self.leader_election.try_borrow_mut() {
2032 if let Some(ref mut manager) = *manager_ref {
2033 if let Some(interval_id) = manager.heartbeat_interval.take() {
2035 if let Some(window) = web_sys::window() {
2036 window.clear_interval_with_handle(interval_id);
2037 web_sys::console::log_1(
2038 &format!(
2039 "[DROP] Cleared heartbeat interval {} for {}",
2040 interval_id, self.db_name
2041 )
2042 .into(),
2043 );
2044 }
2045 }
2046 }
2049 } else {
2050 web_sys::console::log_1(
2052 &format!(
2053 "[DROP] Skipping heartbeat stop for {} (already handled)",
2054 self.db_name
2055 )
2056 .into(),
2057 );
2058 }
2059 }
2060
2061 #[cfg(target_arch = "wasm32")]
2063 pub async fn is_leader(&self) -> bool {
2064 if self.leader_election.borrow().is_none() {
2066 log::debug!(
2067 "BlockStorage::is_leader() - Starting leader election for {}",
2068 self.db_name
2069 );
2070 if let Err(e) = self.start_leader_election().await {
2071 log::error!("Failed to start leader election: {:?}", e);
2072 return false;
2073 }
2074 log::debug!("BlockStorage::is_leader() - Leader election started successfully");
2075 } else {
2076 log::debug!(
2077 "BlockStorage::is_leader() - Leader election already exists for {}",
2078 self.db_name
2079 );
2080 }
2081
2082 if let Some(ref mut manager) = *self.leader_election.borrow_mut() {
2083 let is_leader = manager.is_leader().await;
2084
2085 if !is_leader {
2087 let state = manager.state.borrow();
2088 if state.leader_id.is_none() {
2089 log::debug!(
2090 "No current leader for {} - triggering re-election",
2091 self.db_name
2092 );
2093 drop(state);
2094 let _ = manager.try_become_leader().await;
2095
2096 let new_is_leader = manager.state.borrow().is_leader;
2098 if new_is_leader && manager.heartbeat_interval.is_none() {
2099 let _ = manager.start_heartbeat();
2100 }
2101
2102 log::debug!(
2103 "is_leader() for {} = {} (after re-election)",
2104 self.db_name,
2105 new_is_leader
2106 );
2107 return new_is_leader;
2108 }
2109 }
2110
2111 log::debug!("is_leader() for {} = {}", self.db_name, is_leader);
2112 is_leader
2113 } else {
2114 log::debug!("No leader election manager for {}", self.db_name);
2115 false
2116 }
2117 }
2118
2119 #[cfg(target_arch = "wasm32")]
2121 pub async fn stop_leader_election(&self) -> Result<(), DatabaseError> {
2122 if let Some(mut manager) = self
2123 .leader_election
2124 .try_borrow_mut()
2125 .expect("Failed to borrow leader_election in stop_leader_election")
2126 .take()
2127 {
2128 manager.stop_election().await?;
2129 }
2130 Ok(())
2131 }
2132
2133 #[cfg(target_arch = "wasm32")]
2135 pub async fn send_leader_heartbeat(&self) -> Result<(), DatabaseError> {
2136 if let Some(ref manager) = *self.leader_election.borrow() {
2137 manager.send_heartbeat().await
2138 } else {
2139 Err(DatabaseError::new(
2140 "LEADER_ELECTION_ERROR",
2141 "Leader election not started",
2142 ))
2143 }
2144 }
2145
2146 #[cfg(target_arch = "wasm32")]
2148 pub async fn get_last_leader_heartbeat(&self) -> Result<u64, DatabaseError> {
2149 if let Some(ref manager) = *self.leader_election.borrow() {
2150 Ok(manager.get_last_heartbeat().await)
2151 } else {
2152 Err(DatabaseError::new(
2153 "LEADER_ELECTION_ERROR",
2154 "Leader election not started",
2155 ))
2156 }
2157 }
2158
2159 pub fn get_metrics(&self) -> super::observability::StorageMetrics {
2163 let dirty_count = self.get_dirty_count();
2164 let dirty_bytes = dirty_count * BLOCK_SIZE;
2165
2166 #[cfg(not(target_arch = "wasm32"))]
2167 let (sync_count, timer_sync_count, debounce_sync_count, last_sync_duration_ms) = {
2168 (
2169 self.sync_count.load(Ordering::SeqCst),
2170 self.timer_sync_count.load(Ordering::SeqCst),
2171 self.debounce_sync_count.load(Ordering::SeqCst),
2172 self.last_sync_duration_ms.load(Ordering::SeqCst),
2173 )
2174 };
2175
2176 #[cfg(target_arch = "wasm32")]
2177 let (sync_count, timer_sync_count, debounce_sync_count, last_sync_duration_ms) = {
2178 (self.observability.get_sync_count(), 0, 0, 1)
2180 };
2181
2182 let error_count = self.observability.get_error_count();
2183 let checksum_failures = self.observability.get_checksum_failures();
2184
2185 let total_operations = sync_count + error_count;
2187 let (throughput_blocks_per_sec, throughput_bytes_per_sec) = self
2188 .observability
2189 .calculate_throughput(last_sync_duration_ms);
2190 let error_rate = self.observability.calculate_error_rate(total_operations);
2191
2192 super::observability::StorageMetrics {
2193 dirty_count,
2194 dirty_bytes,
2195 sync_count,
2196 timer_sync_count,
2197 debounce_sync_count,
2198 error_count,
2199 checksum_failures,
2200 last_sync_duration_ms,
2201 throughput_blocks_per_sec,
2202 throughput_bytes_per_sec,
2203 error_rate,
2204 }
2205 }
2206
2207 #[cfg(not(target_arch = "wasm32"))]
2209 pub fn set_sync_callbacks(
2210 &mut self,
2211 on_sync_start: super::observability::SyncStartCallback,
2212 on_sync_success: super::observability::SyncSuccessCallback,
2213 on_sync_failure: super::observability::SyncFailureCallback,
2214 ) {
2215 self.observability.sync_start_callback = Some(on_sync_start);
2216 self.observability.sync_success_callback = Some(on_sync_success);
2217 self.observability.sync_failure_callback = Some(on_sync_failure);
2218 }
2219
2220 #[cfg(not(target_arch = "wasm32"))]
2222 pub fn set_backpressure_callback(
2223 &mut self,
2224 callback: super::observability::BackpressureCallback,
2225 ) {
2226 self.observability.backpressure_callback = Some(callback);
2227 }
2228
2229 #[cfg(not(target_arch = "wasm32"))]
2231 pub fn set_error_callback(&mut self, callback: super::observability::ErrorCallback) {
2232 self.observability.error_callback = Some(callback);
2233 }
2234
2235 #[cfg(target_arch = "wasm32")]
2237 pub fn set_sync_success_callback(
2238 &mut self,
2239 callback: super::observability::WasmSyncSuccessCallback,
2240 ) {
2241 self.observability.wasm_sync_success_callback = Some(callback);
2242 }
2243
2244 pub fn is_auto_sync_enabled(&self) -> bool {
2246 lock_mutex!(self.auto_sync_interval).is_some()
2247 }
2248
2249 pub fn get_sync_policy(&self) -> Option<super::SyncPolicy> {
2251 lock_mutex!(self.policy).clone()
2252 }
2253
2254 pub async fn force_sync(&mut self) -> Result<(), DatabaseError> {
2260 log::info!("force_sync: Starting forced synchronization with durability guarantees");
2261
2262 let dirty_count = self.get_dirty_count();
2263 if dirty_count == 0 {
2264 log::debug!("force_sync: No dirty blocks to sync");
2265 return Ok(());
2266 }
2267
2268 log::info!(
2269 "force_sync: Syncing {} dirty blocks with durability guarantee",
2270 dirty_count
2271 );
2272
2273 self.sync().await?;
2275
2276 log::info!("force_sync: Successfully completed forced synchronization");
2277 Ok(())
2278 }
2279
2280 #[cfg(feature = "telemetry")]
2282 pub fn set_metrics(&mut self, metrics: Option<crate::telemetry::Metrics>) {
2283 self.metrics = metrics;
2284 }
2285
2286 #[cfg(feature = "telemetry")]
2288 pub fn metrics(&self) -> Option<&crate::telemetry::Metrics> {
2289 self.metrics.as_ref()
2290 }
2291
2292 #[cfg(feature = "telemetry")]
2294 pub fn new_for_test() -> Self {
2295 Self {
2296 cache: Mutex::new(HashMap::new()),
2297 dirty_blocks: Arc::new(parking_lot::Mutex::new(HashMap::new())),
2298 allocated_blocks: Mutex::new(HashSet::new()),
2299 deallocated_blocks: Mutex::new(HashSet::new()),
2300 next_block_id: AtomicU64::new(1),
2301 capacity: 128,
2302 lru_order: Mutex::new(VecDeque::new()),
2303 checksum_manager: crate::storage::metadata::ChecksumManager::new(
2304 crate::storage::metadata::ChecksumAlgorithm::FastHash,
2305 ),
2306 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
2307 base_dir: std::path::PathBuf::from("/tmp/test"),
2308 db_name: "test.db".to_string(),
2309 auto_sync_interval: Mutex::new(None),
2310 #[cfg(not(target_arch = "wasm32"))]
2311 last_auto_sync: std::time::Instant::now(),
2312 policy: Mutex::new(None),
2313 #[cfg(not(target_arch = "wasm32"))]
2314 auto_sync_stop: None,
2315 #[cfg(not(target_arch = "wasm32"))]
2316 auto_sync_thread: None,
2317 #[cfg(not(target_arch = "wasm32"))]
2318 debounce_thread: None,
2319 #[cfg(not(target_arch = "wasm32"))]
2320 tokio_timer_task: None,
2321 #[cfg(not(target_arch = "wasm32"))]
2322 tokio_debounce_task: None,
2323 #[cfg(not(target_arch = "wasm32"))]
2324 last_write_ms: Arc::new(std::sync::atomic::AtomicU64::new(0)),
2325 #[cfg(not(target_arch = "wasm32"))]
2326 threshold_hit: Arc::new(std::sync::atomic::AtomicBool::new(false)),
2327 #[cfg(not(target_arch = "wasm32"))]
2328 sync_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
2329 #[cfg(not(target_arch = "wasm32"))]
2330 timer_sync_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
2331 #[cfg(not(target_arch = "wasm32"))]
2332 debounce_sync_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
2333 #[cfg(not(target_arch = "wasm32"))]
2334 last_sync_duration_ms: Arc::new(std::sync::atomic::AtomicU64::new(0)),
2335 #[cfg(not(target_arch = "wasm32"))]
2336 sync_sender: None,
2337 #[cfg(not(target_arch = "wasm32"))]
2338 sync_receiver: None,
2339 recovery_report: RecoveryReport::default(),
2340 #[cfg(target_arch = "wasm32")]
2341 leader_election: std::cell::RefCell::new(None),
2342 observability: super::observability::ObservabilityManager::new(),
2343 metrics: None,
2344 }
2345 }
2346}
2347
2348#[cfg(all(test, target_arch = "wasm32"))]
2349mod wasm_commit_marker_tests {
2350 use super::*;
2351 use wasm_bindgen_test::*;
2352
2353 wasm_bindgen_test_configure!(run_in_browser);
2354
2355 fn set_commit_marker(db: &str, v: u64) {
2357 super::vfs_sync::with_global_commit_marker(|cm| {
2358 cm.borrow_mut().insert(db.to_string(), v);
2359 });
2360 }
2361
2362 fn get_commit_marker(db: &str) -> u64 {
2364 vfs_sync::with_global_commit_marker(|cm| cm.borrow().get(db).copied().unwrap_or(0))
2365 }
2366
2367 #[wasm_bindgen_test]
2368 async fn gating_returns_zeroed_until_marker_catches_up_wasm() {
2369 let db = "cm_gating_wasm";
2370 let mut s = BlockStorage::new(db).await.expect("create storage");
2371
2372 let bid = s.allocate_block().await.expect("alloc block");
2374 let data_v1 = vec![0x33u8; BLOCK_SIZE];
2375 s.write_block(bid, data_v1.clone()).await.expect("write v1");
2376
2377 s.clear_cache();
2379 let out0 = s.read_block(bid).await.expect("read before commit");
2380 assert_eq!(
2381 out0,
2382 vec![0u8; BLOCK_SIZE],
2383 "uncommitted data must read as zeroed"
2384 );
2385
2386 s.sync().await.expect("sync v1");
2388 s.clear_cache();
2389 let out1 = s.read_block(bid).await.expect("read after commit");
2390 assert_eq!(out1, data_v1, "committed data should be visible");
2391 }
2392
2393 #[wasm_bindgen_test]
2394 async fn invisible_blocks_skip_checksum_verification_wasm() {
2395 let db = "cm_checksum_skip_wasm";
2396 let mut s = BlockStorage::new(db).await.expect("create storage");
2397
2398 let bid = s.allocate_block().await.expect("alloc block");
2399 let data = vec![0x44u8; BLOCK_SIZE];
2400 s.write_block(bid, data).await.expect("write v1");
2401 s.sync().await.expect("sync v1"); set_commit_marker(db, 0);
2405
2406 s.set_block_checksum_for_testing(bid, 1234567);
2408 s.clear_cache();
2409 let out = s
2410 .read_block(bid)
2411 .await
2412 .expect("read while invisible should not error");
2413 assert_eq!(
2414 out,
2415 vec![0u8; BLOCK_SIZE],
2416 "invisible block reads as zeroed"
2417 );
2418
2419 set_commit_marker(db, 1);
2421 s.clear_cache();
2422 let err = s
2423 .read_block(bid)
2424 .await
2425 .expect_err("expected checksum mismatch once visible");
2426 assert_eq!(err.code, "CHECKSUM_MISMATCH");
2427 }
2428
2429 #[wasm_bindgen_test]
2430 async fn commit_marker_advances_and_versions_track_syncs_wasm() {
2431 let db = "cm_versions_wasm";
2432 let mut s = BlockStorage::new_with_capacity(db, 8)
2433 .await
2434 .expect("create storage");
2435
2436 let b1 = s.allocate_block().await.expect("alloc b1");
2437 let b2 = s.allocate_block().await.expect("alloc b2");
2438
2439 s.write_block(b1, vec![1u8; BLOCK_SIZE])
2440 .await
2441 .expect("write b1 v1");
2442 s.write_block(b2, vec![2u8; BLOCK_SIZE])
2443 .await
2444 .expect("write b2 v1");
2445 s.sync().await.expect("sync #1");
2446
2447 let cm1 = get_commit_marker(db);
2448 assert_eq!(cm1, 1, "first sync should advance commit marker to 1");
2449 let meta1 = s.get_block_metadata_for_testing();
2450 assert_eq!(meta1.get(&b1).unwrap().1 as u64, cm1);
2451 assert_eq!(meta1.get(&b2).unwrap().1 as u64, cm1);
2452
2453 s.write_block(b1, vec![3u8; BLOCK_SIZE])
2455 .await
2456 .expect("write b1 v2");
2457 s.sync().await.expect("sync #2");
2458
2459 let cm2 = get_commit_marker(db);
2460 assert_eq!(cm2, 2, "second sync should advance commit marker to 2");
2461 let meta2 = s.get_block_metadata_for_testing();
2462 assert_eq!(
2463 meta2.get(&b1).unwrap().1 as u64,
2464 cm2,
2465 "updated block tracks new version"
2466 );
2467 assert_eq!(
2468 meta2.get(&b2).unwrap().1 as u64,
2469 1,
2470 "unchanged block retains prior version"
2471 );
2472 }
2473}
2474
2475#[cfg(all(test, not(target_arch = "wasm32"), not(feature = "fs_persist")))]
2476mod commit_marker_tests {
2477 use super::*;
2478
2479 fn set_commit_marker(db: &str, v: u64) {
2481 super::vfs_sync::with_global_commit_marker(|cm| {
2482 cm.borrow_mut().insert(db.to_string(), v);
2483 });
2484 }
2485
2486 fn get_commit_marker(db: &str) -> u64 {
2488 super::vfs_sync::with_global_commit_marker(|cm| cm.borrow().get(db).copied().unwrap_or(0))
2489 }
2490
2491 #[tokio::test(flavor = "current_thread")]
2492 async fn gating_returns_zeroed_until_marker_catches_up() {
2493 let db = "cm_gating_basic";
2494 println!("DEBUG: Creating BlockStorage for {}", db);
2495 let mut s = BlockStorage::new(db).await.expect("create storage");
2496 println!("DEBUG: BlockStorage created successfully");
2497
2498 let bid = s.allocate_block().await.expect("alloc block");
2500 println!("DEBUG: Allocated block {}", bid);
2501 let data_v1 = vec![0x11u8; BLOCK_SIZE];
2502 s.write_block(bid, data_v1.clone()).await.expect("write v1");
2503 println!("DEBUG: Wrote block {} with data", bid);
2504
2505 s.clear_cache();
2507 let out0 = s.read_block(bid).await.expect("read before commit");
2508 assert_eq!(
2509 out0,
2510 vec![0u8; BLOCK_SIZE],
2511 "uncommitted data must read as zeroed"
2512 );
2513 println!("DEBUG: Pre-sync read returned zeroed data as expected");
2514
2515 println!("DEBUG: About to call sync");
2517 s.sync().await.expect("sync v1");
2518 println!("DEBUG: Sync completed successfully");
2519
2520 let commit_marker = get_commit_marker(db);
2522 println!("DEBUG: Commit marker after sync: {}", commit_marker);
2523
2524 s.clear_cache();
2525 let out1 = s.read_block(bid).await.expect("read after commit");
2526
2527 println!("DEBUG: Expected data: {:?}", &data_v1[..8]);
2529 println!("DEBUG: Actual data: {:?}", &out1[..8]);
2530 println!(
2531 "DEBUG: Data lengths - expected: {}, actual: {}",
2532 data_v1.len(),
2533 out1.len()
2534 );
2535
2536 let data_matches = out1 == data_v1;
2538 println!("DEBUG: Data matches: {}", data_matches);
2539
2540 if !data_matches {
2541 println!("DEBUG: Data mismatch detected - investigating further");
2542 let is_all_zeros = out1.iter().all(|&b| b == 0);
2544 println!("DEBUG: Is all zeros: {}", is_all_zeros);
2545
2546 println!("DEBUG: Final commit marker: {}", get_commit_marker(db));
2548
2549 panic!("Data mismatch: expected committed data to be visible after sync");
2550 }
2551
2552 println!("DEBUG: Test passed - data is visible after commit");
2553 }
2554
2555 #[tokio::test(flavor = "current_thread")]
2556 async fn invisible_blocks_skip_checksum_verification() {
2557 let db = "cm_checksum_skip";
2558 let mut s = BlockStorage::new(db).await.expect("create storage");
2559
2560 let bid = s.allocate_block().await.expect("alloc block");
2561 let data = vec![0xAAu8; BLOCK_SIZE];
2562 s.write_block(bid, data.clone()).await.expect("write v1");
2563 s.sync().await.expect("sync v1"); set_commit_marker(db, 0);
2567
2568 s.set_block_checksum_for_testing(bid, 1234567);
2570 s.clear_cache();
2571 let out = s
2572 .read_block(bid)
2573 .await
2574 .expect("read while invisible should not error");
2575 assert_eq!(
2576 out,
2577 vec![0u8; BLOCK_SIZE],
2578 "invisible block reads as zeroed"
2579 );
2580
2581 set_commit_marker(db, 1);
2583 s.clear_cache();
2584 let err = s
2585 .read_block(bid)
2586 .await
2587 .expect_err("expected checksum mismatch once visible");
2588 assert_eq!(err.code, "CHECKSUM_MISMATCH");
2589 }
2590
2591 #[tokio::test(flavor = "current_thread")]
2592 async fn commit_marker_advances_and_versions_track_syncs() {
2593 let db = "cm_versions";
2594 let mut s = BlockStorage::new_with_capacity(db, 8)
2595 .await
2596 .expect("create storage");
2597
2598 let b1 = s.allocate_block().await.expect("alloc b1");
2599 let b2 = s.allocate_block().await.expect("alloc b2");
2600
2601 s.write_block(b1, vec![1u8; BLOCK_SIZE])
2602 .await
2603 .expect("write b1 v1");
2604 s.write_block(b2, vec![2u8; BLOCK_SIZE])
2605 .await
2606 .expect("write b2 v1");
2607 s.sync().await.expect("sync #1");
2608
2609 let cm1 = get_commit_marker(db);
2610 assert_eq!(cm1, 1, "first sync should advance commit marker to 1");
2611 let meta1 = s.get_block_metadata_for_testing();
2612 assert_eq!(meta1.get(&b1).unwrap().1 as u64, cm1);
2613 assert_eq!(meta1.get(&b2).unwrap().1 as u64, cm1);
2614
2615 s.write_block(b1, vec![3u8; BLOCK_SIZE])
2617 .await
2618 .expect("write b1 v2");
2619 s.sync().await.expect("sync #2");
2620
2621 let cm2 = get_commit_marker(db);
2622 assert_eq!(cm2, 2, "second sync should advance commit marker to 2");
2623 let meta2 = s.get_block_metadata_for_testing();
2624 assert_eq!(
2625 meta2.get(&b1).unwrap().1 as u64,
2626 cm2,
2627 "updated block tracks new version"
2628 );
2629 assert_eq!(
2630 meta2.get(&b2).unwrap().1 as u64,
2631 1,
2632 "unchanged block retains prior version"
2633 );
2634 }
2635}