1#[allow(unused_imports)]
2use super::metadata::BlockMetadataPersist;
3use super::metadata::{ChecksumAlgorithm, ChecksumManager};
4#[cfg(any(
5 target_arch = "wasm32",
6 all(
7 not(target_arch = "wasm32"),
8 any(test, debug_assertions),
9 not(feature = "fs_persist")
10 )
11))]
12use super::vfs_sync;
13use crate::types::DatabaseError;
14#[cfg(target_arch = "wasm32")]
15use js_sys::Date;
16#[cfg(not(target_arch = "wasm32"))]
17use parking_lot::Mutex;
18#[cfg(target_arch = "wasm32")]
19use std::cell::RefCell;
20use std::collections::{HashMap, HashSet, VecDeque};
21#[allow(unused_imports)]
22use std::sync::{
23 Arc,
24 atomic::{AtomicBool, AtomicU64, Ordering},
25};
26use std::time::Duration;
27#[cfg(not(target_arch = "wasm32"))]
28use std::time::{Instant, SystemTime, UNIX_EPOCH};
29#[cfg(not(target_arch = "wasm32"))]
30use tokio::sync::mpsc;
31#[cfg(not(target_arch = "wasm32"))]
32use tokio::task::JoinHandle as TokioJoinHandle;
33
34#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
36use std::path::PathBuf;
37
38#[cfg(not(target_arch = "wasm32"))]
44#[derive(Debug)]
45pub(super) enum SyncRequest {
46 Timer(tokio::sync::oneshot::Sender<()>),
47 Debounce(tokio::sync::oneshot::Sender<()>),
48}
49
50#[derive(Clone, Debug, Default)]
51pub struct RecoveryOptions {
52 pub mode: RecoveryMode,
53 pub on_corruption: CorruptionAction,
54}
55
/// Selects how many allocated blocks are verified during startup recovery.
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare modes
/// directly instead of pattern matching.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum RecoveryMode {
    /// Verify every allocated block (the default).
    #[default]
    Full,
    /// Verify at most `count` blocks.
    Sample {
        /// Upper bound on the number of blocks to verify.
        count: usize,
    },
    /// Verify nothing.
    Skip,
}
65
/// Policy applied when startup recovery finds a corrupted block.
///
/// `PartialEq`/`Eq` are derived so the configured action can be compared
/// directly. The exact handling of each variant is implemented in the
/// recovery module, outside this file.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum CorruptionAction {
    /// Default action.
    #[default]
    Report,
    /// Attempt a repair of the corrupted block.
    Repair,
    /// Treat corruption as a failure.
    Fail,
}
73
/// Outcome categories for crash recovery.
///
/// All variants are field-less, so equality is total; `Eq` is derived
/// alongside `PartialEq` to make that explicit.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CrashRecoveryAction {
    /// Nothing had to be done.
    NoActionNeeded,
    /// Pending work was (or must be) rolled back.
    Rollback,
    /// Pending work was (or must be) finalized.
    Finalize,
}
80
/// Summary of a startup recovery run, exposed via
/// `BlockStorage::get_recovery_report`.
///
/// `PartialEq`/`Eq` are derived so reports can be compared in assertions.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RecoveryReport {
    /// Number of blocks that were verified.
    pub total_blocks_verified: usize,
    /// Ids of blocks found to be corrupted.
    pub corrupted_blocks: Vec<u64>,
    /// Ids of blocks that were repaired.
    pub repaired_blocks: Vec<u64>,
    /// Duration of the verification pass, in milliseconds.
    pub verification_duration_ms: u64,
}
88
/// Serde shape holding `(block id, metadata)` pairs under an `entries` key.
///
/// This matches the `entries` array that `BlockStorage::new` reads from
/// `metadata.json`. The type is flagged `dead_code` in this module.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Serialize, serde::Deserialize)]
#[allow(dead_code)]
struct FsMeta {
    entries: Vec<(u64, BlockMetadataPersist)>,
}
96
/// Serde shape holding allocated block ids under an `allocated` key.
///
/// This matches the `allocated` array that `BlockStorage::new` reads from
/// `allocations.json`. The type is flagged `dead_code` in this module.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Serialize, serde::Deserialize)]
#[allow(dead_code)]
struct FsAlloc {
    allocated: Vec<u64>,
}
103
/// Serde shape holding deallocated block ids under a `tombstones` key.
///
/// This matches the `tombstones` array that `BlockStorage::new` reads from
/// `deallocated.json`. The type is flagged `dead_code` in this module.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Serialize, serde::Deserialize)]
#[allow(dead_code)]
struct FsDealloc {
    tombstones: Vec<u64>,
}
110
111#[cfg(not(target_arch = "wasm32"))]
113thread_local! {
114 pub(super) static GLOBAL_METADATA_TEST: parking_lot::Mutex<HashMap<String, HashMap<u64, BlockMetadataPersist>>> = parking_lot::Mutex::new(HashMap::new());
115}
116
117#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions)))]
119thread_local! {
120 static GLOBAL_COMMIT_MARKER_TEST: parking_lot::Mutex<HashMap<String, u64>> = parking_lot::Mutex::new(HashMap::new());
121}
122
/// Configuration for automatic syncing, stored in `BlockStorage::policy`.
///
/// `PartialEq`/`Eq` are derived so policies can be compared in assertions.
/// How each knob is interpreted is decided by the auto-sync code outside this
/// file; the field notes below describe only what the names and types state.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SyncPolicy {
    /// Optional sync interval, in milliseconds.
    pub interval_ms: Option<u64>,
    /// Optional dirty-block count threshold.
    pub max_dirty: Option<usize>,
    /// Optional dirty-byte threshold.
    pub max_dirty_bytes: Option<usize>,
    /// Optional debounce window, in milliseconds. When present, the native
    /// `write_block` does not sync inline after a threshold hit.
    pub debounce_ms: Option<u64>,
    /// Whether writes should be verified after they are performed.
    pub verify_after_write: bool,
}
131
132#[cfg(not(target_arch = "wasm32"))]
133impl Drop for BlockStorage {
134 fn drop(&mut self) {
135 if let Some(stop) = &self.auto_sync_stop {
136 stop.store(true, Ordering::SeqCst);
137 }
138 if let Some(handle) = self.auto_sync_thread.take() {
139 let _ = handle.join();
140 }
141 if let Some(handle) = self.debounce_thread.take() {
142 let _ = handle.join();
143 }
144 if let Some(task) = self.tokio_timer_task.take() {
145 task.abort();
146 }
147 if let Some(task) = self.tokio_debounce_task.take() {
148 task.abort();
149 }
150 self.auto_sync_stop = None;
151 }
152}
153
/// Size of one storage block in bytes (4 KiB). Block files read from disk
/// are only accepted when they are exactly this long.
pub const BLOCK_SIZE: usize = 4 * 1024;
155#[allow(dead_code)]
156pub(super) const DEFAULT_CACHE_CAPACITY: usize = 128;
/// Name of the store holding block data (presumably the IndexedDB object
/// store used on wasm — TODO confirm against the IndexedDB code).
#[allow(dead_code)]
const STORE_NAME: &str = "sqlite_blocks";
/// Name of the store holding block metadata.
#[allow(dead_code)]
const METADATA_STORE: &str = "metadata";
161
// Lock helpers that hide the difference between the two interior-mutability
// containers used by `BlockStorage`:
//   * wasm32 stores state in `RefCell`, so "locking" is a checked borrow;
//   * native stores state in `parking_lot::Mutex`, so "locking" is `.lock()`.
//
// Each helper exists once per target. Previously the wasm `try_lock_mutex!`
// was defined twice: the first definition expanded to the bare cell and the
// second (which shadowed it) expanded to `$mutex.ok()`, which is not a
// `RefCell` method. It now mirrors `try_read_lock!` and the native variant.

// wasm32: exclusive borrow; panics if the cell is already borrowed.
#[cfg(target_arch = "wasm32")]
macro_rules! lock_mutex {
    ($mutex:expr) => {
        $mutex
            .try_borrow_mut()
            .expect("RefCell borrow failed - reentrancy detected in block_storage.rs")
    };
}

// native: acquire the mutex guard.
#[cfg(not(target_arch = "wasm32"))]
macro_rules! lock_mutex {
    ($mutex:expr) => {
        $mutex.lock()
    };
}

// wasm32: shared borrow; panics if the cell is mutably borrowed.
#[allow(unused_macros)]
#[cfg(target_arch = "wasm32")]
macro_rules! read_lock {
    ($mutex:expr) => {
        $mutex
            .try_borrow()
            .expect("RefCell borrow failed - likely reentrancy issue")
    };
}

// native: a mutex has no shared mode, so this is the same as `lock_mutex!`.
#[allow(unused_macros)]
#[cfg(not(target_arch = "wasm32"))]
macro_rules! read_lock {
    ($mutex:expr) => {
        $mutex.lock()
    };
}

// wasm32: exclusive borrow that yields `None` instead of panicking.
#[allow(unused_macros)]
#[cfg(target_arch = "wasm32")]
macro_rules! try_lock_mutex {
    ($mutex:expr) => {
        $mutex.try_borrow_mut().ok()
    };
}

// native: blocks until the guard is available, so the result is always `Some`.
#[allow(unused_macros)]
#[cfg(not(target_arch = "wasm32"))]
macro_rules! try_lock_mutex {
    ($mutex:expr) => {
        Some($mutex.lock())
    };
}

// wasm32: shared borrow that yields `None` instead of panicking.
#[allow(unused_macros)]
#[cfg(target_arch = "wasm32")]
macro_rules! try_read_lock {
    ($mutex:expr) => {
        $mutex.try_borrow().ok()
    };
}

// native: blocks until the guard is available, so the result is always `Some`.
#[allow(unused_macros)]
#[cfg(not(target_arch = "wasm32"))]
macro_rules! try_read_lock {
    ($mutex:expr) => {
        Some($mutex.lock())
    };
}
241
242pub struct BlockStorage {
243 #[cfg(target_arch = "wasm32")]
246 pub(super) cache: RefCell<HashMap<u64, Vec<u8>>>,
247 #[cfg(not(target_arch = "wasm32"))]
248 pub(super) cache: Mutex<HashMap<u64, Vec<u8>>>,
249
250 #[cfg(target_arch = "wasm32")]
251 pub(super) dirty_blocks: Arc<RefCell<HashMap<u64, Vec<u8>>>>,
252 #[cfg(not(target_arch = "wasm32"))]
253 pub(super) dirty_blocks: Arc<Mutex<HashMap<u64, Vec<u8>>>>,
254
255 #[cfg(target_arch = "wasm32")]
256 pub(super) allocated_blocks: RefCell<HashSet<u64>>,
257 #[cfg(not(target_arch = "wasm32"))]
258 pub(super) allocated_blocks: Mutex<HashSet<u64>>,
259
260 #[cfg(target_arch = "wasm32")]
261 #[allow(dead_code)]
262 pub(super) deallocated_blocks: RefCell<HashSet<u64>>,
263 #[cfg(not(target_arch = "wasm32"))]
264 #[allow(dead_code)]
265 pub(super) deallocated_blocks: Mutex<HashSet<u64>>,
266 pub(super) next_block_id: AtomicU64,
267 pub(super) capacity: usize,
268
269 #[cfg(target_arch = "wasm32")]
270 pub(super) lru_order: RefCell<VecDeque<u64>>,
271 #[cfg(not(target_arch = "wasm32"))]
272 pub(super) lru_order: Mutex<VecDeque<u64>>,
273
274 pub(super) checksum_manager: ChecksumManager,
276 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
277 pub(super) base_dir: PathBuf,
278 pub(super) db_name: String,
279
280 #[cfg(target_arch = "wasm32")]
282 pub(super) auto_sync_interval: RefCell<Option<Duration>>,
283 #[cfg(not(target_arch = "wasm32"))]
284 pub(super) auto_sync_interval: Mutex<Option<Duration>>,
285
286 #[cfg(not(target_arch = "wasm32"))]
287 pub(super) last_auto_sync: Instant,
288
289 #[cfg(target_arch = "wasm32")]
290 pub(super) policy: RefCell<Option<SyncPolicy>>,
291 #[cfg(not(target_arch = "wasm32"))]
292 pub(super) policy: Mutex<Option<SyncPolicy>>,
293 #[cfg(not(target_arch = "wasm32"))]
294 pub(super) auto_sync_stop: Option<Arc<AtomicBool>>,
295 #[cfg(not(target_arch = "wasm32"))]
296 pub(super) auto_sync_thread: Option<std::thread::JoinHandle<()>>,
297 #[cfg(not(target_arch = "wasm32"))]
298 pub(super) debounce_thread: Option<std::thread::JoinHandle<()>>,
299 #[cfg(not(target_arch = "wasm32"))]
300 pub(super) tokio_timer_task: Option<TokioJoinHandle<()>>,
301 #[cfg(not(target_arch = "wasm32"))]
302 pub(super) tokio_debounce_task: Option<TokioJoinHandle<()>>,
303 #[cfg(not(target_arch = "wasm32"))]
304 pub(super) last_write_ms: Arc<AtomicU64>,
305 #[cfg(not(target_arch = "wasm32"))]
306 pub(super) threshold_hit: Arc<AtomicBool>,
307 #[cfg(not(target_arch = "wasm32"))]
308 pub(super) sync_count: Arc<AtomicU64>,
309 #[cfg(not(target_arch = "wasm32"))]
310 pub(super) timer_sync_count: Arc<AtomicU64>,
311 #[cfg(not(target_arch = "wasm32"))]
312 pub(super) debounce_sync_count: Arc<AtomicU64>,
313 #[cfg(not(target_arch = "wasm32"))]
314 pub(super) last_sync_duration_ms: Arc<AtomicU64>,
315
316 #[cfg(not(target_arch = "wasm32"))]
318 pub(super) sync_sender: Option<mpsc::UnboundedSender<SyncRequest>>,
319 #[cfg(not(target_arch = "wasm32"))]
320 pub(super) sync_receiver: Option<mpsc::UnboundedReceiver<SyncRequest>>,
321
322 pub(super) recovery_report: RecoveryReport,
324
325 #[cfg(target_arch = "wasm32")]
327 pub leader_election: std::cell::RefCell<Option<super::leader_election::LeaderElectionManager>>,
328
329 pub(super) observability: super::observability::ObservabilityManager,
331
332 #[cfg(feature = "telemetry")]
334 pub(super) metrics: Option<crate::telemetry::Metrics>,
335}
336
337impl BlockStorage {
/// Builds a `BlockStorage` synchronously from the in-process global state
/// (wasm only).
///
/// * The cache and the allocated-block set are seeded from the global
///   storage entry for `db_name`; both start empty when there is none.
/// * `next_block_id` starts at one past the highest stored block id
///   (`1` when nothing is stored).
/// * Checksums and their algorithms are seeded from the global metadata
///   entry for `db_name`, with `FastHash` as the default algorithm.
///
/// Field initialisers guarded by `not(target_arch = "wasm32")` were removed:
/// this function only exists on wasm32, so they could never be compiled.
#[cfg(target_arch = "wasm32")]
pub fn new_sync(db_name: &str) -> Self {
    use crate::storage::vfs_sync::{with_global_metadata, with_global_storage};

    log::info!(
        "Creating BlockStorage synchronously for database: {}",
        db_name
    );

    let (cache, allocated_blocks, max_block_id) =
        with_global_storage(|storage_map| match storage_map.borrow().get(db_name) {
            Some(db_storage) => {
                let allocated = db_storage.keys().copied().collect::<HashSet<_>>();
                let max_id = db_storage.keys().max().copied().unwrap_or(0);
                (db_storage.clone(), allocated, max_id)
            }
            None => (HashMap::new(), HashSet::new(), 0),
        });

    log::info!(
        "Loaded {} blocks from GLOBAL_STORAGE for {} (max_block_id={})",
        cache.len(),
        db_name,
        max_block_id
    );

    let checksum_manager =
        with_global_metadata(|metadata_map| match metadata_map.borrow().get(db_name) {
            Some(db_metadata) => {
                let mut checksums = HashMap::with_capacity(db_metadata.len());
                let mut algos = HashMap::with_capacity(db_metadata.len());
                for (block_id, meta) in db_metadata {
                    checksums.insert(*block_id, meta.checksum);
                    algos.insert(*block_id, meta.algo);
                }
                ChecksumManager::with_data(checksums, algos, ChecksumAlgorithm::FastHash)
            }
            None => ChecksumManager::new(ChecksumAlgorithm::FastHash),
        });

    Self {
        cache: RefCell::new(cache),
        dirty_blocks: Arc::new(RefCell::new(HashMap::new())),
        allocated_blocks: RefCell::new(allocated_blocks),
        deallocated_blocks: RefCell::new(HashSet::new()),
        next_block_id: AtomicU64::new(max_block_id + 1),
        capacity: DEFAULT_CACHE_CAPACITY,
        lru_order: RefCell::new(VecDeque::new()),
        checksum_manager,
        db_name: db_name.to_string(),
        auto_sync_interval: RefCell::new(None),
        policy: RefCell::new(None),
        recovery_report: RecoveryReport::default(),
        leader_election: RefCell::new(None),
        observability: super::observability::ObservabilityManager::new(),
        #[cfg(feature = "telemetry")]
        metrics: None,
    }
}
437
/// Asynchronously creates the storage for `db_name` on wasm.
///
/// All work is delegated to `constructors::new_wasm`; its errors are
/// returned unchanged.
#[cfg(target_arch = "wasm32")]
pub async fn new(db_name: &str) -> Result<Self, DatabaseError> {
    let storage = super::constructors::new_wasm(db_name).await;
    storage
}
442
443 #[cfg(not(target_arch = "wasm32"))]
444 pub async fn new(db_name: &str) -> Result<Self, DatabaseError> {
445 log::info!("Creating BlockStorage for database: {}", db_name);
446
447 let (allocated_blocks, next_block_id) = {
449 #[cfg(feature = "fs_persist")]
450 {
451 let mut allocated_blocks = HashSet::new();
453 let mut next_block_id: u64 = 1;
454
455 let base_path = std::env::var("ABSURDERSQL_FS_BASE")
456 .unwrap_or_else(|_| "./test_storage".to_string());
457 let mut alloc_path = std::path::PathBuf::from(base_path);
458 alloc_path.push(db_name);
459 alloc_path.push("allocations.json");
460
461 if let Ok(content) = std::fs::read_to_string(&alloc_path) {
462 if let Ok(alloc_data) = serde_json::from_str::<serde_json::Value>(&content) {
463 if let Some(allocated_array) = alloc_data["allocated"].as_array() {
464 for block_id_val in allocated_array {
465 if let Some(block_id) = block_id_val.as_u64() {
466 allocated_blocks.insert(block_id);
467 }
468 }
469 next_block_id = allocated_blocks.iter().max().copied().unwrap_or(0) + 1;
470 }
471 }
472 }
473
474 (allocated_blocks, next_block_id)
475 }
476
477 #[cfg(not(feature = "fs_persist"))]
478 {
479 (HashSet::new(), 1)
481 }
482 };
483
484 let checksums_init: HashMap<u64, u64> = {
486 #[cfg(feature = "fs_persist")]
487 {
488 let mut map = HashMap::new();
489 let base_path = std::env::var("ABSURDERSQL_FS_BASE")
490 .unwrap_or_else(|_| "./test_storage".to_string());
491 let mut meta_path = std::path::PathBuf::from(base_path);
492 meta_path.push(db_name);
493 meta_path.push("metadata.json");
494
495 if let Ok(content) = std::fs::read_to_string(&meta_path) {
496 if let Ok(meta_data) = serde_json::from_str::<serde_json::Value>(&content) {
497 if let Some(entries) = meta_data["entries"].as_array() {
498 for entry in entries {
499 if let Some(arr) = entry.as_array() {
500 if let (Some(block_id), Some(obj)) = (
501 arr.first().and_then(|v| v.as_u64()),
502 arr.get(1).and_then(|v| v.as_object()),
503 ) {
504 if let Some(checksum) =
505 obj.get("checksum").and_then(|v| v.as_u64())
506 {
507 map.insert(block_id, checksum);
508 }
509 }
510 }
511 }
512 }
513 }
514 }
515 map
516 }
517
518 #[cfg(not(feature = "fs_persist"))]
519 {
520 #[allow(unused_mut)]
522 let mut map = HashMap::new();
523 #[cfg(any(test, debug_assertions))]
524 GLOBAL_METADATA_TEST.with(|meta| {
525 #[cfg(target_arch = "wasm32")]
526 let meta_map = meta.borrow_mut();
527 #[cfg(not(target_arch = "wasm32"))]
528 let meta_map = meta.lock();
529 if let Some(db_meta) = meta_map.get(db_name) {
530 for (block_id, metadata) in db_meta.iter() {
531 map.insert(*block_id, metadata.checksum);
532 }
533 }
534 });
535 map
536 }
537 };
538
539 let checksum_algos_init: HashMap<u64, ChecksumAlgorithm> = {
540 #[cfg(feature = "fs_persist")]
541 {
542 let mut map = HashMap::new();
543 let base_path = std::env::var("ABSURDERSQL_FS_BASE")
544 .unwrap_or_else(|_| "./test_storage".to_string());
545 let mut meta_path = std::path::PathBuf::from(base_path);
546 meta_path.push(db_name);
547 meta_path.push("metadata.json");
548
549 if let Ok(content) = std::fs::read_to_string(&meta_path) {
550 if let Ok(meta_data) = serde_json::from_str::<serde_json::Value>(&content) {
551 if let Some(entries) = meta_data["entries"].as_array() {
552 for entry in entries {
553 if let Some(arr) = entry.as_array() {
554 if let (Some(block_id), Some(obj)) = (
555 arr.first().and_then(|v| v.as_u64()),
556 arr.get(1).and_then(|v| v.as_object()),
557 ) {
558 let algo = obj
559 .get("algo")
560 .and_then(|v| v.as_str())
561 .and_then(|s| match s {
562 "CRC32" => Some(ChecksumAlgorithm::CRC32),
563 "FastHash" => Some(ChecksumAlgorithm::FastHash),
564 _ => None,
565 })
566 .unwrap_or(ChecksumAlgorithm::FastHash);
567 map.insert(block_id, algo);
568 }
569 }
570 }
571 }
572 }
573 }
574 map
575 }
576
577 #[cfg(not(feature = "fs_persist"))]
578 {
579 #[allow(unused_mut)]
581 let mut map = HashMap::new();
582 #[cfg(any(test, debug_assertions))]
583 GLOBAL_METADATA_TEST.with(|meta| {
584 #[cfg(target_arch = "wasm32")]
585 let meta_map = meta.borrow_mut();
586 #[cfg(not(target_arch = "wasm32"))]
587 let meta_map = meta.lock();
588 if let Some(db_meta) = meta_map.get(db_name) {
589 for (block_id, metadata) in db_meta.iter() {
590 map.insert(*block_id, metadata.algo);
591 }
592 }
593 });
594 map
595 }
596 };
597
598 #[cfg(feature = "fs_persist")]
600 let checksum_algo_default = match std::env::var("DATASYNC_CHECKSUM_ALGO").ok().as_deref() {
601 Some("CRC32") => ChecksumAlgorithm::CRC32,
602 _ => ChecksumAlgorithm::FastHash,
603 };
604 #[cfg(not(feature = "fs_persist"))]
605 let checksum_algo_default = ChecksumAlgorithm::FastHash;
606
607 let deallocated_blocks_init: HashSet<u64> = {
609 #[cfg(feature = "fs_persist")]
610 {
611 let mut set = HashSet::new();
612 let base_path = std::env::var("ABSURDERSQL_FS_BASE")
613 .unwrap_or_else(|_| "./test_storage".to_string());
614 let mut path = std::path::PathBuf::from(base_path);
615 path.push(db_name);
616 let mut dealloc_path = path.clone();
617 dealloc_path.push("deallocated.json");
618 if let Ok(content) = std::fs::read_to_string(&dealloc_path) {
619 if let Ok(dealloc_data) = serde_json::from_str::<serde_json::Value>(&content) {
620 if let Some(tombstones_array) = dealloc_data["tombstones"].as_array() {
621 for tombstone_val in tombstones_array {
622 if let Some(block_id) = tombstone_val.as_u64() {
623 set.insert(block_id);
624 }
625 }
626 }
627 }
628 }
629 set
630 }
631 #[cfg(not(feature = "fs_persist"))]
632 {
633 HashSet::new()
634 }
635 };
636
637 Ok(BlockStorage {
638 db_name: db_name.to_string(),
639 cache: Mutex::new(HashMap::new()),
640 lru_order: Mutex::new(VecDeque::new()),
641 capacity: 1000,
642 checksum_manager: ChecksumManager::with_data(
643 checksums_init,
644 checksum_algos_init,
645 checksum_algo_default,
646 ),
647 dirty_blocks: Arc::new(Mutex::new(HashMap::new())),
648 allocated_blocks: Mutex::new(allocated_blocks),
649 next_block_id: AtomicU64::new(next_block_id),
650 deallocated_blocks: Mutex::new(deallocated_blocks_init),
651 policy: Mutex::new(None),
652 auto_sync_interval: Mutex::new(None),
653 #[cfg(not(target_arch = "wasm32"))]
654 last_auto_sync: Instant::now(),
655 #[cfg(not(target_arch = "wasm32"))]
656 auto_sync_stop: None,
657 #[cfg(not(target_arch = "wasm32"))]
658 auto_sync_thread: None,
659 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
660 base_dir: std::path::PathBuf::from(
661 std::env::var("ABSURDERSQL_FS_BASE")
662 .unwrap_or_else(|_| "./test_storage".to_string()),
663 ),
664 #[cfg(not(target_arch = "wasm32"))]
665 debounce_thread: None,
666 #[cfg(not(target_arch = "wasm32"))]
667 tokio_timer_task: None,
668 #[cfg(not(target_arch = "wasm32"))]
669 tokio_debounce_task: None,
670 #[cfg(not(target_arch = "wasm32"))]
671 last_write_ms: Arc::new(AtomicU64::new(0)),
672 #[cfg(not(target_arch = "wasm32"))]
673 threshold_hit: Arc::new(AtomicBool::new(false)),
674 #[cfg(not(target_arch = "wasm32"))]
675 sync_count: Arc::new(AtomicU64::new(0)),
676 #[cfg(not(target_arch = "wasm32"))]
677 timer_sync_count: Arc::new(AtomicU64::new(0)),
678 #[cfg(not(target_arch = "wasm32"))]
679 debounce_sync_count: Arc::new(AtomicU64::new(0)),
680 #[cfg(not(target_arch = "wasm32"))]
681 last_sync_duration_ms: Arc::new(AtomicU64::new(0)),
682 #[cfg(not(target_arch = "wasm32"))]
683 sync_sender: None,
684 #[cfg(not(target_arch = "wasm32"))]
685 sync_receiver: None,
686 recovery_report: RecoveryReport::default(),
687 #[cfg(target_arch = "wasm32")]
688 leader_election: std::cell::RefCell::new(None),
689 observability: super::observability::ObservabilityManager::new(),
690 #[cfg(feature = "telemetry")]
691 metrics: None,
692 })
693 }
694
695 pub async fn new_with_capacity(db_name: &str, capacity: usize) -> Result<Self, DatabaseError> {
696 let mut s = Self::new(db_name).await?;
697 s.capacity = capacity;
698 Ok(s)
699 }
700
701 pub async fn new_with_recovery_options(
702 db_name: &str,
703 recovery_opts: RecoveryOptions,
704 ) -> Result<Self, DatabaseError> {
705 let mut storage = Self::new(db_name).await?;
706
707 storage.perform_startup_recovery(recovery_opts).await?;
708
709 Ok(storage)
710 }
711
712 pub fn get_recovery_report(&self) -> &RecoveryReport {
713 &self.recovery_report
714 }
715
716 async fn perform_startup_recovery(
717 &mut self,
718 opts: RecoveryOptions,
719 ) -> Result<(), DatabaseError> {
720 super::recovery::perform_startup_recovery(self, opts).await
721 }
722
723 pub(super) async fn get_blocks_for_verification(
724 &self,
725 mode: &RecoveryMode,
726 ) -> Result<Vec<u64>, DatabaseError> {
727 let all_blocks: Vec<u64> = lock_mutex!(self.allocated_blocks).iter().copied().collect();
728
729 match mode {
730 RecoveryMode::Full => Ok(all_blocks),
731 RecoveryMode::Sample { count } => {
732 let sample_count = (*count).min(all_blocks.len());
733 let mut sampled = all_blocks;
734 sampled.sort_unstable(); sampled.truncate(sample_count);
736 Ok(sampled)
737 }
738 RecoveryMode::Skip => Ok(Vec::new()),
739 }
740 }
741
742 pub(super) async fn verify_block_integrity(
743 &mut self,
744 block_id: u64,
745 ) -> Result<bool, DatabaseError> {
746 let data = match self.read_block_from_storage(block_id).await {
748 Ok(data) => data,
749 Err(_) => {
750 log::warn!(
751 "Could not read block {} for integrity verification",
752 block_id
753 );
754 return Ok(false);
755 }
756 };
757
758 match self.verify_against_stored_checksum(block_id, &data) {
760 Ok(()) => Ok(true),
761 Err(e) => {
762 log::warn!(
763 "Block {} failed checksum verification: {}",
764 block_id,
765 e.message
766 );
767 Ok(false)
768 }
769 }
770 }
771
772 async fn read_block_from_storage(&mut self, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
773 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
775 {
776 let mut blocks_dir = self.base_dir.clone();
777 blocks_dir.push(&self.db_name);
778 blocks_dir.push("blocks");
779 let block_file = blocks_dir.join(format!("block_{}.bin", block_id));
780
781 if let Ok(data) = std::fs::read(&block_file) {
782 if data.len() == BLOCK_SIZE {
783 return Ok(data);
784 }
785 }
786 }
787
788 #[cfg(all(
790 not(target_arch = "wasm32"),
791 any(test, debug_assertions),
792 not(feature = "fs_persist")
793 ))]
794 {
795 let mut found_data = None;
796 vfs_sync::with_global_storage(|storage| {
797 #[cfg(target_arch = "wasm32")]
798 let storage_map = storage;
799 #[cfg(not(target_arch = "wasm32"))]
800 let storage_map = storage.borrow();
801 if let Some(db_storage) = storage_map.get(&self.db_name) {
802 if let Some(data) = db_storage.get(&block_id) {
803 found_data = Some(data.clone());
804 }
805 }
806 });
807 if let Some(data) = found_data {
808 return Ok(data);
809 }
810 }
811
812 #[cfg(target_arch = "wasm32")]
814 {
815 let mut found_data = None;
816 vfs_sync::with_global_storage(|storage_map| {
817 if let Some(db_storage) = storage_map.borrow().get(&self.db_name) {
818 if let Some(data) = db_storage.get(&block_id) {
819 found_data = Some(data.clone());
820 }
821 }
822 });
823 if let Some(data) = found_data {
824 return Ok(data);
825 }
826 }
827
828 Err(DatabaseError::new(
829 "BLOCK_NOT_FOUND",
830 &format!("Block {} not found in storage", block_id),
831 ))
832 }
833
834 pub(super) async fn repair_corrupted_block(
835 &mut self,
836 block_id: u64,
837 ) -> Result<bool, DatabaseError> {
838 log::info!("Attempting to repair corrupted block {}", block_id);
839
840 lock_mutex!(self.cache).remove(&block_id);
845
846 self.checksum_manager.remove_checksum(block_id);
848
849 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
851 {
852 let mut blocks_dir = self.base_dir.clone();
853 blocks_dir.push(&self.db_name);
854 blocks_dir.push("blocks");
855 let block_file = blocks_dir.join(format!("block_{}.bin", block_id));
856 let _ = std::fs::remove_file(&block_file);
857 }
858
859 #[cfg(all(
861 not(target_arch = "wasm32"),
862 any(test, debug_assertions),
863 not(feature = "fs_persist")
864 ))]
865 {
866 vfs_sync::with_global_storage(|storage| {
867 let mut storage_map = storage.borrow_mut();
868 if let Some(db_storage) = storage_map.get_mut(&self.db_name) {
869 db_storage.remove(&block_id);
870 }
871 });
872 }
873
874 #[cfg(target_arch = "wasm32")]
876 {
877 vfs_sync::with_global_storage(|storage_map| {
878 if let Some(db_storage) = storage_map.borrow_mut().get_mut(&self.db_name) {
879 db_storage.remove(&block_id);
880 }
881 });
882 }
883
884 log::info!(
885 "Corrupted block {} has been removed (repair completed)",
886 block_id
887 );
888 Ok(true)
889 }
890
891 pub(super) fn touch_lru(&self, block_id: u64) {
892 let mut lru = lock_mutex!(self.lru_order);
894 if let Some(pos) = lru.iter().position(|&id| id == block_id) {
895 lru.remove(pos);
896 }
897 lru.push_back(block_id);
899 }
900
901 pub(super) fn evict_if_needed(&self) {
902 loop {
904 let victim_opt = {
906 let cache_guard = lock_mutex!(self.cache);
907 if cache_guard.len() <= self.capacity {
908 break; }
910
911 let dirty_guard = lock_mutex!(self.dirty_blocks);
913 let mut lru_guard = lock_mutex!(self.lru_order);
914
915 let victim_pos = lru_guard
916 .iter()
917 .position(|id| !dirty_guard.contains_key(id));
918
919 victim_pos.map(|pos| lru_guard.remove(pos).expect("valid pos"))
920 };
922
923 match victim_opt {
924 Some(victim) => {
925 lock_mutex!(self.cache).remove(&victim);
927 }
928 None => {
929 break;
931 }
932 }
933 }
934 }
935
/// Current time in milliseconds, taken from JavaScript's `Date.now()` and
/// converted from `f64` to `u64`.
#[inline]
#[cfg(target_arch = "wasm32")]
pub fn now_millis() -> u64 {
    let js_now = Date::now();
    js_now as u64
}
942
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Yields `0` if the clock reports a time before the epoch.
#[inline]
#[cfg(not(target_arch = "wasm32"))]
pub fn now_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
951
952 pub(super) fn verify_against_stored_checksum(
953 &self,
954 block_id: u64,
955 data: &[u8],
956 ) -> Result<(), DatabaseError> {
957 self.checksum_manager.validate_checksum(block_id, data)
958 }
959
960 pub fn read_block_sync(&self, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
962 super::io_operations::read_block_sync_impl(self, block_id)
964 }
965
966 pub async fn read_block(&self, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
967 self.read_block_sync(block_id)
969 }
970
/// Writes a block synchronously (wasm; takes `&self`) by delegating to
/// `io_operations::write_block_sync_impl`.
#[cfg(target_arch = "wasm32")]
pub fn write_block_sync(&self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
    let outcome = super::io_operations::write_block_sync_impl(self, block_id, data);
    outcome
}
976
977 #[cfg(not(target_arch = "wasm32"))]
978 pub fn write_block_sync(&mut self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
979 super::io_operations::write_block_sync_impl(self, block_id, data)
980 }
981
/// Async-signature wrapper around [`Self::write_block_sync`] on wasm; the
/// write itself runs synchronously.
#[cfg(target_arch = "wasm32")]
pub async fn write_block(&self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
    let outcome = self.write_block_sync(block_id, data);
    outcome
}
986
987 #[cfg(not(target_arch = "wasm32"))]
988 pub async fn write_block(&mut self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
989 self.write_block_sync(block_id, data)?;
990
991 if self.threshold_hit.load(std::sync::atomic::Ordering::SeqCst) {
993 let has_debounce = lock_mutex!(self.policy)
994 .as_ref()
995 .and_then(|p| p.debounce_ms)
996 .is_some();
997 if !has_debounce {
998 log::debug!("Threshold hit without debounce: performing inline sync");
999 self.sync_implementation()?;
1000 self.threshold_hit
1001 .store(false, std::sync::atomic::Ordering::SeqCst);
1002 }
1003 }
1004
1005 Ok(())
1006 }
1007
/// Writes several blocks in order (wasm).
///
/// `maybe_auto_sync` is invoked once up front; writing then stops at the
/// first failing block and returns that error.
#[cfg(target_arch = "wasm32")]
pub fn write_blocks_sync(&self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
    self.maybe_auto_sync();
    items
        .into_iter()
        .try_for_each(|(block_id, data)| self.write_block_sync(block_id, data))
}
1017
1018 #[cfg(not(target_arch = "wasm32"))]
1019 pub fn write_blocks_sync(&mut self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
1020 self.maybe_auto_sync();
1021 for (block_id, data) in items {
1022 self.write_block_sync(block_id, data)?;
1023 }
1024 Ok(())
1025 }
1026
/// Async-signature wrapper around [`Self::write_blocks_sync`] on wasm.
#[cfg(target_arch = "wasm32")]
pub async fn write_blocks(&self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
    let outcome = self.write_blocks_sync(items);
    outcome
}
1032
1033 #[cfg(not(target_arch = "wasm32"))]
1034 pub async fn write_blocks(&mut self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
1035 self.write_blocks_sync(items)
1036 }
1037
1038 pub fn read_blocks_sync(&self, block_ids: &[u64]) -> Result<Vec<Vec<u8>>, DatabaseError> {
1040 self.maybe_auto_sync();
1041 let mut results = Vec::with_capacity(block_ids.len());
1042 for &id in block_ids {
1043 results.push(self.read_block_sync(id)?);
1044 }
1045 Ok(results)
1046 }
1047
1048 pub async fn read_blocks(&self, block_ids: &[u64]) -> Result<Vec<Vec<u8>>, DatabaseError> {
1050 self.read_blocks_sync(block_ids)
1051 }
1052
1053 pub fn get_block_checksum(&self, block_id: u64) -> Option<u32> {
1055 self.checksum_manager
1056 .get_checksum(block_id)
1057 .map(|checksum| checksum as u32)
1058 }
1059
/// Returns the global commit marker recorded for this database, or `0`
/// when none has been recorded (wasm only).
#[cfg(target_arch = "wasm32")]
pub fn get_commit_marker(&self) -> u64 {
    vfs_sync::with_global_commit_marker(|cm| match cm.borrow().get(&self.db_name) {
        Some(&marker) => marker,
        None => 0,
    })
}
1067
/// Reports whether the global storage holds at least one block for this
/// database (wasm only). A missing entry counts as "no blocks".
#[cfg(target_arch = "wasm32")]
pub fn has_any_blocks(&self) -> bool {
    vfs_sync::with_global_storage(|storage_map| {
        match storage_map.borrow().get(&self.db_name) {
            Some(blocks) => !blocks.is_empty(),
            None => false,
        }
    })
}
1078
1079 pub async fn verify_block_checksum(&mut self, block_id: u64) -> Result<(), DatabaseError> {
1080 if let Some(bytes) = lock_mutex!(self.cache).get(&block_id).cloned() {
1082 return self.verify_against_stored_checksum(block_id, &bytes);
1083 }
1084 let data = self.read_block_sync(block_id)?;
1086 self.verify_against_stored_checksum(block_id, &data)
1087 }
1088
1089 #[allow(unused_mut)]
1091 pub fn get_block_metadata_for_testing(&mut self) -> HashMap<u64, (u64, u32, u64)> {
1092 #[cfg(target_arch = "wasm32")]
1094 {
1095 let mut out = HashMap::new();
1096 vfs_sync::with_global_metadata(|meta_map| {
1097 if let Some(db_meta) = meta_map.borrow().get(&self.db_name) {
1098 for (bid, m) in db_meta.iter() {
1099 out.insert(*bid, (m.checksum, m.version, m.last_modified_ms));
1100 }
1101 }
1102 });
1103 out
1104 }
1105 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
1106 {
1107 use super::fs_persist::FsMeta;
1108 use std::io::Read;
1109 let mut out = HashMap::new();
1110 let base: PathBuf = self.base_dir.clone();
1111 let mut db_dir = base.clone();
1112 db_dir.push(&self.db_name);
1113 let mut meta_path = db_dir.clone();
1114 meta_path.push("metadata.json");
1115 if let Ok(mut f) = std::fs::File::open(&meta_path) {
1116 let mut s = String::new();
1117 if f.read_to_string(&mut s).is_ok() {
1118 if let Ok(meta) = serde_json::from_str::<FsMeta>(&s) {
1119 for (block_id, metadata) in meta.entries {
1120 out.insert(
1121 block_id,
1122 (
1123 metadata.checksum,
1124 metadata.version,
1125 metadata.last_modified_ms,
1126 ),
1127 );
1128 }
1129 }
1130 }
1131 }
1132 out
1133 }
1134 #[cfg(all(
1135 not(target_arch = "wasm32"),
1136 any(test, debug_assertions),
1137 not(feature = "fs_persist")
1138 ))]
1139 {
1140 let mut out = HashMap::new();
1141 GLOBAL_METADATA_TEST.with(|meta| {
1142 #[cfg(target_arch = "wasm32")]
1143 let meta_map = meta.borrow_mut();
1144 #[cfg(not(target_arch = "wasm32"))]
1145 let meta_map = meta.lock();
1146 if let Some(db_meta) = meta_map.get(&self.db_name) {
1147 for (bid, m) in db_meta.iter() {
1148 out.insert(*bid, (m.checksum, m.version, m.last_modified_ms));
1149 }
1150 }
1151 });
1152 out
1153 }
1154 }
1155
1156 pub fn set_block_checksum_for_testing(&mut self, block_id: u64, checksum: u64) {
1158 self.checksum_manager
1159 .set_checksum_for_testing(block_id, checksum);
1160 }
1161
1162 #[cfg(not(target_arch = "wasm32"))]
1164 pub(super) fn get_dirty_blocks(&self) -> &Arc<Mutex<HashMap<u64, Vec<u8>>>> {
1165 &self.dirty_blocks
1166 }
1167
/// Runs a sync on WASM and then awaits an already-resolved JS promise.
///
/// Returns the result of `sync_implementation`; the outcome of awaiting the
/// promise is discarded.
#[cfg(target_arch = "wasm32")]
#[allow(invalid_reference_casting)]
pub async fn sync(&self) -> Result<(), DatabaseError> {
    // NOTE(review): this manufactures a `&mut Self` from `&self`. Rust gives no
    // soundness guarantee for such a cast; presumably it relies on the WASM
    // target being single-threaded and on no other reference being used
    // concurrently -- confirm.
    let storage = unsafe { &mut *(self as *const Self as *mut Self) };
    let outcome = storage.sync_implementation();
    let resolved = js_sys::Promise::resolve(&wasm_bindgen::JsValue::UNDEFINED);
    wasm_bindgen_futures::JsFuture::from(resolved).await.ok();
    outcome
}
1182
1183 #[cfg(not(target_arch = "wasm32"))]
1184 pub async fn sync(&mut self) -> Result<(), DatabaseError> {
1185 self.sync_implementation()
1186 }
1187
/// Synchronous sync entry point on WASM; delegates to `sync_implementation`.
#[cfg(target_arch = "wasm32")]
#[allow(invalid_reference_casting)]
pub fn sync_now(&self) -> Result<(), DatabaseError> {
    // NOTE(review): casts `&self` to `&mut Self`; Rust gives no soundness
    // guarantee for this. Presumably it relies on single-threaded WASM and on
    // no overlapping references -- confirm.
    let storage = unsafe { &mut *(self as *const Self as *mut Self) };
    let outcome = storage.sync_implementation();
    outcome
}
1197
1198 #[cfg(not(target_arch = "wasm32"))]
1199 pub fn sync_now(&mut self) -> Result<(), DatabaseError> {
1200 self.sync_implementation()
1201 }
1202
1203 fn sync_implementation(&mut self) -> Result<(), DatabaseError> {
1205 super::sync_operations::sync_implementation_impl(self)
1206 }
1207
/// WASM only: delegates to `wasm_vfs_sync::sync_blocks_only`.
#[cfg(target_arch = "wasm32")]
pub fn sync_blocks_only(&mut self) -> Result<(), DatabaseError> {
    use super::wasm_vfs_sync::sync_blocks_only as sync_blocks_only_impl;
    sync_blocks_only_impl(self)
}
1214
/// WASM only: delegates to `wasm_indexeddb::sync_async` and awaits it.
#[cfg(target_arch = "wasm32")]
pub async fn sync_async(&self) -> Result<(), DatabaseError> {
    let pending = super::wasm_indexeddb::sync_async(self);
    pending.await
}
1220
/// Flushes pending data with `sync_now` and disables auto-sync (WASM).
///
/// A sync failure is logged, not propagated, so shutdown always completes.
#[cfg(target_arch = "wasm32")]
pub fn drain_and_shutdown(&self) {
    // On WASM `sync_now` takes `&self`, so no `&self` -> `&mut Self` cast is
    // needed here.
    if let Err(e) = self.sync_now() {
        log::error!("drain_and_shutdown: sync_now failed: {}", e.message);
    }
    *lock_mutex!(self.auto_sync_interval) = None;
}
1233
1234 #[cfg(not(target_arch = "wasm32"))]
1235 pub fn drain_and_shutdown(&mut self) {
1236 if let Err(e) = self.sync_now() {
1237 log::error!("drain_and_shutdown: sync_now failed: {}", e.message);
1238 }
1239 *lock_mutex!(self.auto_sync_interval) = None;
1240 #[cfg(not(target_arch = "wasm32"))]
1241 {
1242 if let Some(stop) = &self.auto_sync_stop {
1243 stop.store(true, Ordering::SeqCst);
1244 }
1245 if let Some(handle) = self.auto_sync_thread.take() {
1246 let _ = handle.join();
1247 }
1248 if let Some(handle) = self.debounce_thread.take() {
1249 let _ = handle.join();
1250 }
1251 if let Some(task) = self.tokio_timer_task.take() {
1252 task.abort();
1253 }
1254 if let Some(task) = self.tokio_debounce_task.take() {
1255 task.abort();
1256 }
1257 self.auto_sync_stop = None;
1258 self.threshold_hit.store(false, Ordering::SeqCst);
1259 }
1260 }
1261
1262 pub fn clear_cache(&self) {
1263 lock_mutex!(self.cache).clear();
1264 lock_mutex!(self.lru_order).clear();
1265 }
1266
1267 pub async fn on_database_import(&mut self) -> Result<(), DatabaseError> {
1287 log::info!(
1288 "Clearing cache for database '{}' after import",
1289 self.db_name
1290 );
1291
1292 self.clear_cache();
1294
1295 lock_mutex!(self.dirty_blocks).clear();
1297
1298 self.checksum_manager.clear_checksums();
1300
1301 #[cfg(target_arch = "wasm32")]
1303 {
1304 use super::vfs_sync::with_global_allocation_map;
1305 *lock_mutex!(self.allocated_blocks) = with_global_allocation_map(|gam| {
1306 gam.borrow()
1307 .get(&self.db_name)
1308 .cloned()
1309 .unwrap_or_else(std::collections::HashSet::new)
1310 });
1311 log::debug!(
1312 "Reloaded {} allocated blocks from global allocation map",
1313 lock_mutex!(self.allocated_blocks).len()
1314 );
1315
1316 log::debug!("Checksum data will be reloaded from metadata on next verification");
1318 }
1319
1320 #[cfg(not(target_arch = "wasm32"))]
1321 {
1322 #[cfg(feature = "fs_persist")]
1323 {
1324 let mut alloc_path = self.base_dir.clone();
1326 alloc_path.push(&self.db_name);
1327 alloc_path.push("allocations.json");
1328
1329 if let Ok(content) = std::fs::read_to_string(&alloc_path) {
1330 if let Ok(alloc_data) = serde_json::from_str::<serde_json::Value>(&content) {
1331 if let Some(allocated_array) = alloc_data["allocated"].as_array() {
1332 lock_mutex!(self.allocated_blocks).clear();
1333 for block_id_val in allocated_array {
1334 if let Some(block_id) = block_id_val.as_u64() {
1335 lock_mutex!(self.allocated_blocks).insert(block_id);
1336 }
1337 }
1338 log::debug!(
1339 "Reloaded {} allocated blocks from filesystem",
1340 lock_mutex!(self.allocated_blocks).len()
1341 );
1342 }
1343 }
1344 }
1345
1346 let mut meta_path = self.base_dir.clone();
1348 meta_path.push(&self.db_name);
1349 meta_path.push("metadata.json");
1350
1351 if let Ok(content) = std::fs::read_to_string(&meta_path) {
1352 if let Ok(meta_data) = serde_json::from_str::<serde_json::Value>(&content) {
1353 if let Some(entries) = meta_data["entries"].as_array() {
1354 let mut new_checksums = HashMap::new();
1355 let mut new_algos = HashMap::new();
1356
1357 for entry in entries {
1358 if let (Some(block_id), Some(checksum), Some(algo_str)) = (
1359 entry[0].as_u64(),
1360 entry[1]["checksum"].as_u64(),
1361 entry[1]["algo"].as_str(),
1362 ) {
1363 new_checksums.insert(block_id, checksum);
1364
1365 let algo = match algo_str {
1366 "CRC32" => super::metadata::ChecksumAlgorithm::CRC32,
1367 _ => super::metadata::ChecksumAlgorithm::FastHash,
1368 };
1369 new_algos.insert(block_id, algo);
1370 }
1371 }
1372
1373 self.checksum_manager
1374 .replace_all(new_checksums.clone(), new_algos);
1375 log::debug!(
1376 "Reloaded {} checksums from filesystem metadata",
1377 new_checksums.len()
1378 );
1379 }
1380 }
1381 } else {
1382 log::debug!("No metadata file found, checksums will be empty after import");
1383 }
1384 }
1385
1386 #[cfg(not(feature = "fs_persist"))]
1387 {
1388 use super::vfs_sync::with_global_allocation_map;
1390
1391 *lock_mutex!(self.allocated_blocks) = with_global_allocation_map(|gam| {
1392 #[cfg(target_arch = "wasm32")]
1393 let map = gam;
1394 #[cfg(not(target_arch = "wasm32"))]
1395 let map = gam.borrow();
1396 map.get(&self.db_name)
1397 .cloned()
1398 .unwrap_or_else(std::collections::HashSet::new)
1399 });
1400 log::debug!(
1401 "Reloaded {} allocated blocks from global allocation map (native test)",
1402 lock_mutex!(self.allocated_blocks).len()
1403 );
1404
1405 log::debug!("Checksum data will be reloaded from metadata on next verification");
1407 }
1408 }
1409
1410 log::info!(
1411 "Cache and allocation state refreshed for '{}'",
1412 self.db_name
1413 );
1414
1415 Ok(())
1416 }
1417
/// Replaces the in-memory cache and checksum state with the contents of the
/// global storage and metadata maps for this database (WASM only).
///
/// Steps:
/// 1. Clone this database's blocks out of global storage (empty if absent).
/// 2. Rebuild the checksum manager from global metadata, or clear it when
///    there is none.
/// 3. Swap the cache contents for the cloned blocks.
/// 4. Rebuild the LRU order: ids from the previous order that are still
///    cached keep their relative order; all other cached ids are appended.
/// 5. Log diagnostics about block 0 (SQLite header bytes) to the console.
#[cfg(target_arch = "wasm32")]
pub fn reload_cache_from_global_storage(&self) {
    use crate::storage::vfs_sync::{with_global_metadata, with_global_storage};

    log::info!("[RELOAD] Starting cache reload for: {}", self.db_name);

    web_sys::console::log_1(
        &format!("[RELOAD] BlockStorage.db_name = {}", self.db_name).into(),
    );

    let fresh_cache = with_global_storage(|storage_map| {
        if let Some(db_storage) = storage_map.borrow().get(&self.db_name) {
            log::info!(
                "[RELOAD] Found {} blocks in GLOBAL_STORAGE",
                db_storage.len()
            );
            db_storage.clone()
        } else {
            log::warn!(
                "[RELOAD] No blocks found in GLOBAL_STORAGE for {}",
                self.db_name
            );
            std::collections::HashMap::new()
        }
    });

    let block_count = fresh_cache.len();
    // NOTE(review): the message mentions marking blocks dirty, but nothing in
    // this function touches `dirty_blocks` -- confirm the intended wording.
    log::info!(
        "[RELOAD] Loading {} blocks into cache and marking as dirty",
        block_count
    );

    with_global_metadata(|metadata_map| {
        if let Some(db_metadata) = metadata_map.borrow().get(&self.db_name) {
            log::info!("[RELOAD] Found {} metadata entries", db_metadata.len());
            self.checksum_manager.clear_checksums();

            let mut new_checksums = std::collections::HashMap::new();
            let mut new_algos = std::collections::HashMap::new();
            for (block_id, meta) in db_metadata {
                new_checksums.insert(*block_id, meta.checksum);
                new_algos.insert(*block_id, meta.algo);
            }
            self.checksum_manager.replace_all(new_checksums, new_algos);
        } else {
            log::warn!("[RELOAD] No metadata found for {}", self.db_name);
            self.checksum_manager.clear_checksums();
        }
    });

    // Take the previous ordering out, leaving an empty queue behind.
    let old_lru = {
        let mut lru = lock_mutex!(self.lru_order);
        std::mem::replace(&mut *lru, std::collections::VecDeque::new())
    };

    {
        let mut cache = lock_mutex!(self.cache);
        cache.clear();
        cache.extend(fresh_cache);
    }

    log::info!(
        "[RELOAD] Cache reloaded with {} blocks",
        lock_mutex!(self.cache).len()
    );

    // Rebuild the LRU queue in one pass under a single pair of locks. The
    // `queued` set replaces a linear `VecDeque::contains` scan per cached
    // block, which made the rebuild quadratic.
    {
        let cache = lock_mutex!(self.cache);
        let mut lru = lock_mutex!(self.lru_order);
        let mut queued: HashSet<u64> = HashSet::with_capacity(cache.len());

        for block_id in old_lru {
            if cache.contains_key(&block_id) {
                queued.insert(block_id);
                lru.push_back(block_id);
            }
        }
        for &block_id in cache.keys() {
            // `insert` returns true only for ids not queued yet.
            if queued.insert(block_id) {
                lru.push_back(block_id);
            }
        }
    }

    log::info!(
        "[RELOAD] Reload complete: {} blocks in cache, {} dirty",
        lock_mutex!(self.cache).len(),
        lock_mutex!(self.dirty_blocks).len()
    );

    // Console diagnostics for block 0.
    if let Some(block_0) = lock_mutex!(self.cache).get(&0) {
        let is_valid = block_0.len() >= 16 && &block_0[0..16] == b"SQLite format 3\0";
        web_sys::console::log_1(
            &format!(
                "[RELOAD] Block 0 in cache: {} bytes, valid SQLite header: {}",
                block_0.len(),
                is_valid
            )
            .into(),
        );
        if block_0.len() >= 16 {
            web_sys::console::log_1(
                &format!("[RELOAD] Block 0 header [0..16]: {:02x?}", &block_0[0..16]).into(),
            );
        }

        if block_0.len() >= 28 {
            // Bytes 24..28 are decoded as a big-endian u32 and reported as a page count.
            let db_size_bytes = [block_0[24], block_0[25], block_0[26], block_0[27]];
            let db_size_pages = u32::from_be_bytes(db_size_bytes);
            web_sys::console::log_1(
                &format!(
                    "[RELOAD] Database size field (offset 24-27): {} pages ({:02x?})",
                    db_size_pages, db_size_bytes
                )
                .into(),
            );
        }

        if block_0.len() >= 100 {
            // Half-open byte ranges; the label prints the inclusive end.
            let ranges: [(usize, usize); 5] = [(16, 32), (32, 48), (48, 64), (64, 80), (80, 100)];
            for &(start, end) in ranges.iter() {
                web_sys::console::log_1(
                    &format!(
                        "[RELOAD] Full header dump [{}-{}]: {:02x?}",
                        start,
                        end - 1,
                        &block_0[start..end]
                    )
                    .into(),
                );
            }
        }
    } else {
        web_sys::console::log_1(&wasm_bindgen::JsValue::from_str(
            "[RELOAD] ERROR: Block 0 NOT in cache after reload!",
        ));
    }
}
1591
1592 pub fn get_cache_size(&self) -> usize {
1593 lock_mutex!(self.cache).len()
1594 }
1595
1596 pub fn get_dirty_count(&self) -> usize {
1597 lock_mutex!(self.dirty_blocks).len()
1598 }
1599
1600 pub fn get_db_name(&self) -> &str {
1601 &self.db_name
1602 }
1603
1604 pub fn is_cached(&self, block_id: u64) -> bool {
1605 lock_mutex!(self.cache).contains_key(&block_id)
1606 }
1607
1608 pub async fn allocate_block(&mut self) -> Result<u64, DatabaseError> {
1610 super::allocation::allocate_block_impl(self).await
1611 }
1612
1613 pub async fn deallocate_block(&mut self, block_id: u64) -> Result<(), DatabaseError> {
1615 super::allocation::deallocate_block_impl(self, block_id).await
1616 }
1617
1618 pub fn get_allocated_count(&self) -> usize {
1620 lock_mutex!(self.allocated_blocks).len()
1621 }
1622
/// Simulates a crash during sync (WASM only).
///
/// With `blocks_written == false` nothing is persisted. With
/// `blocks_written == true` every dirty block is persisted to IndexedDB,
/// tagged with version `commit marker + 1`, and the dirty map is cleared; the
/// commit marker itself is not advanced here.
///
/// # Errors
/// Propagates a failure of `persist_to_indexeddb`.
#[cfg(target_arch = "wasm32")]
pub async fn crash_simulation_sync(
    &mut self,
    blocks_written: bool,
) -> Result<(), DatabaseError> {
    log::info!(
        "CRASH SIMULATION: Starting crash simulation with blocks_written={}",
        blocks_written
    );

    if !blocks_written {
        log::info!("CRASH SIMULATION: Simulating crash before blocks are written to IndexedDB");
        return Ok(());
    }

    // Snapshot the dirty map so the lock is not held across the await below.
    let pending = {
        let guard = lock_mutex!(self.dirty_blocks);
        guard.clone()
    };

    if pending.is_empty() {
        log::info!("CRASH SIMULATION: No dirty blocks to write");
        return Ok(());
    }

    log::info!(
        "CRASH SIMULATION: Writing {} blocks to IndexedDB before crash",
        pending.len()
    );

    let mut metadata_to_persist: Vec<(u64, u64)> = Vec::with_capacity(pending.len());
    for &block_id in pending.keys() {
        let next_commit = self.get_commit_marker() + 1;
        metadata_to_persist.push((block_id, next_commit));
    }

    log::debug!(
        "CRASH SIMULATION: About to call persist_to_indexeddb for {} blocks",
        pending.len()
    );

    super::wasm_indexeddb::persist_to_indexeddb(&self.db_name, pending, metadata_to_persist)
        .await?;

    log::info!("CRASH SIMULATION: persist_to_indexeddb completed successfully");
    log::info!(
        "CRASH SIMULATION: Blocks written to IndexedDB, simulating crash before commit marker advance"
    );

    lock_mutex!(self.dirty_blocks).clear();

    Ok(())
}
1698
/// Simulates a crash after only some dirty blocks were persisted (WASM only).
///
/// Dirty blocks whose id appears in `blocks_to_write` are persisted to
/// IndexedDB with version `commit marker + 1` and removed from the dirty
/// map; the commit marker is not advanced. Ids in `blocks_to_write` that are
/// not dirty are ignored, and nothing happens when no dirty block matches.
///
/// # Errors
/// Propagates a failure of `persist_to_indexeddb`; the dirty map is left
/// untouched in that case.
#[cfg(target_arch = "wasm32")]
pub async fn crash_simulation_partial_sync(
    &mut self,
    blocks_to_write: &[u64],
) -> Result<(), DatabaseError> {
    log::info!(
        "CRASH SIMULATION: Starting partial crash simulation for {} blocks",
        blocks_to_write.len()
    );

    // Set membership instead of a slice scan per dirty block.
    let wanted: HashSet<u64> = blocks_to_write.iter().copied().collect();

    // Copy only the selected payloads rather than cloning the whole dirty map.
    // The lock is released before the await below.
    let partial_blocks: std::collections::HashMap<u64, Vec<u8>> = {
        let dirty = lock_mutex!(self.dirty_blocks);
        dirty
            .iter()
            .filter(|(block_id, _)| wanted.contains(*block_id))
            .map(|(block_id, data)| (*block_id, data.clone()))
            .collect()
    };

    if !partial_blocks.is_empty() {
        log::info!(
            "CRASH SIMULATION: Writing {} out of {} blocks before crash",
            partial_blocks.len(),
            blocks_to_write.len()
        );

        // Keep the ids so the payload map can be moved into the persist call.
        let written_ids: Vec<u64> = partial_blocks.keys().copied().collect();
        let next_commit = self.get_commit_marker() + 1;
        let metadata_to_persist: Vec<(u64, u64)> = written_ids
            .iter()
            .map(|&block_id| (block_id, next_commit))
            .collect();

        super::wasm_indexeddb::persist_to_indexeddb(
            &self.db_name,
            partial_blocks,
            metadata_to_persist,
        )
        .await?;

        {
            let mut dirty = lock_mutex!(self.dirty_blocks);
            for block_id in &written_ids {
                dirty.remove(block_id);
            }
        }

        log::info!(
            "CRASH SIMULATION: Partial blocks written, simulating crash before commit marker advance"
        );
    }

    Ok(())
}
1762
/// Detects and repairs the effects of an interrupted sync (WASM only).
///
/// Blocks whose metadata version exceeds the current commit marker are
/// treated as inconsistent. If there are none, `NoActionNeeded` is returned.
/// Otherwise `determine_recovery_action` decides between rolling the blocks
/// back and finalizing them, that action is carried out, and it is returned.
///
/// # Errors
/// Propagates errors from the scan, the decision, and the chosen action.
#[cfg(target_arch = "wasm32")]
pub async fn perform_crash_recovery(&mut self) -> Result<CrashRecoveryAction, DatabaseError> {
    log::info!(
        "CRASH RECOVERY: Starting crash recovery scan for database: {}",
        self.db_name
    );

    let marker = self.get_commit_marker();
    log::info!("CRASH RECOVERY: Current commit marker: {}", marker);

    let suspects = self.scan_for_inconsistent_blocks(marker).await?;
    if suspects.is_empty() {
        log::info!("CRASH RECOVERY: No inconsistent blocks found, system is consistent");
        return Ok(CrashRecoveryAction::NoActionNeeded);
    }

    log::info!(
        "CRASH RECOVERY: Found {} inconsistent blocks that need recovery",
        suspects.len()
    );

    let action = self.determine_recovery_action(&suspects).await?;

    if action == CrashRecoveryAction::Rollback {
        log::info!("CRASH RECOVERY: Performing rollback of incomplete transaction");
        self.rollback_incomplete_transaction(&suspects).await?;
    } else if action == CrashRecoveryAction::Finalize {
        log::info!("CRASH RECOVERY: Performing finalization of complete transaction");
        self.finalize_complete_transaction(&suspects).await?;
    }
    // `NoActionNeeded` requires no work.

    log::info!("CRASH RECOVERY: Recovery completed successfully");
    Ok(action)
}
1813
/// Collects `(block_id, version)` for every block of this database whose
/// metadata version is greater than `commit_marker`.
///
/// Reads the global metadata map only; an unknown database yields an empty
/// list.
#[cfg(target_arch = "wasm32")]
async fn scan_for_inconsistent_blocks(
    &self,
    commit_marker: u64,
) -> Result<Vec<(u64, u64)>, DatabaseError> {
    log::info!(
        "CRASH RECOVERY: Scanning for blocks with version > {}",
        commit_marker
    );

    let mut found = Vec::new();

    vfs_sync::with_global_metadata(|meta_map| {
        let all_meta = meta_map.borrow();
        let db_meta = match all_meta.get(&self.db_name) {
            Some(db_meta) => db_meta,
            None => return,
        };
        for (block_id, metadata) in db_meta.iter() {
            let version = metadata.version as u64;
            if version <= commit_marker {
                continue;
            }
            log::info!(
                "CRASH RECOVERY: Found inconsistent block {} with version {} > marker {}",
                block_id,
                metadata.version,
                commit_marker
            );
            found.push((*block_id, version));
        }
    });

    Ok(found)
}
1847
/// Chooses between finalizing and rolling back the inconsistent blocks.
///
/// Returns `Finalize` when the list is non-empty and every block carries
/// version `commit marker + 1`; otherwise returns `Rollback`.
#[cfg(target_arch = "wasm32")]
async fn determine_recovery_action(
    &self,
    inconsistent_blocks: &[(u64, u64)],
) -> Result<CrashRecoveryAction, DatabaseError> {
    let expected_next_commit = self.get_commit_marker() + 1;

    let any_unexpected = inconsistent_blocks
        .iter()
        .any(|&(_, version)| version != expected_next_commit);
    let finalize = !any_unexpected && !inconsistent_blocks.is_empty();

    let action = if finalize {
        log::info!(
            "CRASH RECOVERY: All inconsistent blocks have expected version {}, finalizing transaction",
            expected_next_commit
        );
        CrashRecoveryAction::Finalize
    } else {
        log::info!(
            "CRASH RECOVERY: Inconsistent block versions detected, rolling back transaction"
        );
        CrashRecoveryAction::Rollback
    };
    Ok(action)
}
1876
/// Discards the given blocks everywhere they are stored (WASM only).
///
/// The blocks are removed from global metadata, global storage, the local
/// cache and LRU order, and finally deleted from IndexedDB. Only the block id
/// of each `(block_id, version)` pair is used.
///
/// # Errors
/// Propagates a failure of `delete_blocks_from_indexeddb`; the in-memory
/// removals have already happened by then.
#[cfg(target_arch = "wasm32")]
async fn rollback_incomplete_transaction(
    &mut self,
    inconsistent_blocks: &[(u64, u64)],
) -> Result<(), DatabaseError> {
    log::info!(
        "CRASH RECOVERY: Rolling back {} inconsistent blocks",
        inconsistent_blocks.len()
    );

    vfs_sync::with_global_metadata(|meta_map| {
        if let Some(db_meta) = meta_map.borrow_mut().get_mut(&self.db_name) {
            for (block_id, _) in inconsistent_blocks {
                log::info!(
                    "CRASH RECOVERY: Removing inconsistent block {} from metadata",
                    block_id
                );
                db_meta.remove(block_id);
            }
        }
    });

    vfs_sync::with_global_storage(|storage_map| {
        if let Some(db_storage) = storage_map.borrow_mut().get_mut(&self.db_name) {
            for (block_id, _) in inconsistent_blocks {
                log::info!(
                    "CRASH RECOVERY: Removing inconsistent block {} from global storage",
                    block_id
                );
                db_storage.remove(block_id);
            }
        }
    });

    // Evict from the cache under one lock, then prune the LRU queue in a
    // single pass instead of one full `retain` scan per rolled-back block.
    let doomed: HashSet<u64> = inconsistent_blocks.iter().map(|(id, _)| *id).collect();
    {
        let mut cache = lock_mutex!(self.cache);
        for block_id in &doomed {
            cache.remove(block_id);
        }
    }
    lock_mutex!(self.lru_order).retain(|id| !doomed.contains(id));

    let block_ids_to_delete: Vec<u64> = inconsistent_blocks.iter().map(|(id, _)| *id).collect();
    if !block_ids_to_delete.is_empty() {
        log::info!(
            "CRASH RECOVERY: Deleting {} blocks from IndexedDB",
            block_ids_to_delete.len()
        );
        super::wasm_indexeddb::delete_blocks_from_indexeddb(
            &self.db_name,
            &block_ids_to_delete,
        )
        .await?;
        log::info!("CRASH RECOVERY: Successfully deleted blocks from IndexedDB");
    }

    log::info!("CRASH RECOVERY: Rollback completed");
    Ok(())
}
1939
/// Completes an interrupted transaction by advancing the commit marker to the
/// version carried by the inconsistent blocks (WASM only).
///
/// The target version is taken from the first entry of `inconsistent_blocks`;
/// an empty slice makes this a no-op. After the marker is advanced, the
/// checksum of every listed block that can be read through `read_block_sync`
/// is recomputed and stored. Blocks that fail to read are skipped.
#[cfg(target_arch = "wasm32")]
async fn finalize_complete_transaction(
    &mut self,
    inconsistent_blocks: &[(u64, u64)],
) -> Result<(), DatabaseError> {
    log::info!(
        "CRASH RECOVERY: Finalizing transaction for {} blocks",
        inconsistent_blocks.len()
    );

    if let Some((_, target_version)) = inconsistent_blocks.first() {
        let new_commit_marker = *target_version;
        // Read the marker before overwriting it so the log line below reports
        // the real previous value.
        let previous_commit_marker = self.get_commit_marker();

        vfs_sync::with_global_commit_marker(|cm| {
            cm.borrow_mut()
                .insert(self.db_name.clone(), new_commit_marker);
        });

        log::info!(
            "CRASH RECOVERY: Advanced commit marker from {} to {}",
            previous_commit_marker,
            new_commit_marker
        );

        // Checksums are refreshed only after the marker has been advanced.
        for (block_id, _) in inconsistent_blocks {
            if let Ok(data) = self.read_block_sync(*block_id) {
                self.checksum_manager.store_checksum(*block_id, &data);
                log::info!(
                    "CRASH RECOVERY: Updated checksum for finalized block {}",
                    block_id
                );
            }
        }
    }

    log::info!("CRASH RECOVERY: Finalization completed");
    Ok(())
}
1983
/// Starts leader election for this database, or forces leadership when an
/// election manager already exists (WASM only).
///
/// # Errors
/// Propagates errors from `start_election` or `force_become_leader`.
#[cfg(target_arch = "wasm32")]
pub async fn start_leader_election(&self) -> Result<(), DatabaseError> {
    let already_started = self.leader_election.borrow().is_some();

    if already_started {
        log::debug!(
            "BlockStorage::start_leader_election() - Election already exists, forcing leadership"
        );
        // NOTE(review): the mutable `RefCell` borrow stays alive across this
        // await; another borrow of `leader_election` made while it is
        // suspended would panic -- confirm callers cannot interleave.
        if let Some(ref mut manager) = *self.leader_election.borrow_mut() {
            manager.force_become_leader().await?;
        }
        return Ok(());
    }

    log::debug!(
        "BlockStorage::start_leader_election() - Creating new LeaderElectionManager for {}",
        self.db_name
    );
    let mut manager = super::leader_election::LeaderElectionManager::new(self.db_name.clone());
    log::debug!("BlockStorage::start_leader_election() - Calling manager.start_election()");
    manager.start_election().await?;
    log::debug!(
        "BlockStorage::start_leader_election() - Election started, storing manager"
    );
    *self.leader_election.borrow_mut() = Some(manager);

    Ok(())
}
2013
/// Reports the `is_leader` flag stored in the election manager's state.
/// Returns `false` when no election manager exists.
#[cfg(target_arch = "wasm32")]
pub fn was_leader(&self) -> bool {
    let slot = self.leader_election.borrow();
    match slot.as_ref() {
        Some(manager) => manager.state.borrow().is_leader,
        None => false,
    }
}
2024
/// Synchronously stops the leader heartbeat (WASM only).
///
/// Clears the heartbeat interval on the window, if both exist, and releases
/// the heartbeat closure. When `leader_election` is already borrowed
/// elsewhere, the stop is skipped and only a console message is written.
#[cfg(target_arch = "wasm32")]
pub fn stop_heartbeat_sync(&self) {
    let mut slot = match self.leader_election.try_borrow_mut() {
        Ok(slot) => slot,
        Err(_) => {
            web_sys::console::log_1(
                &format!(
                    "[DROP] Skipping heartbeat stop for {} (already handled)",
                    self.db_name
                )
                .into(),
            );
            return;
        }
    };

    let manager = match slot.as_mut() {
        Some(manager) => manager,
        None => return,
    };

    if let Some(interval_id) = manager.heartbeat_interval.take() {
        // The interval id is taken even when no window is available.
        if let Some(window) = web_sys::window() {
            window.clear_interval_with_handle(interval_id);
            web_sys::console::log_1(
                &format!(
                    "[DROP] Cleared heartbeat interval {} for {}",
                    interval_id, self.db_name
                )
                .into(),
            );
        }
    }

    let released = manager.heartbeat_closure.take().is_some();
    if released {
        web_sys::console::log_1(
            &format!("[DROP] Released heartbeat closure for {}", self.db_name).into(),
        );
    }
}
2064
/// Reports whether this instance is currently the leader (WASM only).
///
/// Starts leader election first when no election manager exists yet, and
/// returns `false` if that start fails. When the manager says this instance is
/// not the leader and its state records no leader at all, a re-election is
/// attempted via `try_become_leader`, and the heartbeat is started if
/// leadership was gained and no heartbeat interval is set.
#[cfg(target_arch = "wasm32")]
pub async fn is_leader(&self) -> bool {
    // Lazily create the election manager on first use.
    if self.leader_election.borrow().is_none() {
        log::debug!(
            "BlockStorage::is_leader() - Starting leader election for {}",
            self.db_name
        );
        if let Err(e) = self.start_leader_election().await {
            log::error!("Failed to start leader election: {:?}", e);
            return false;
        }
        log::debug!("BlockStorage::is_leader() - Leader election started successfully");
    } else {
        log::debug!(
            "BlockStorage::is_leader() - Leader election already exists for {}",
            self.db_name
        );
    }

    // NOTE(review): this mutable `RefCell` borrow stays alive across the
    // awaits below; any other borrow of `leader_election` made while this
    // future is suspended would panic -- confirm callers cannot interleave.
    if let Some(ref mut manager) = *self.leader_election.borrow_mut() {
        let is_leader = manager.is_leader().await;

        if !is_leader {
            let state = manager.state.borrow();
            if state.leader_id.is_none() {
                log::debug!(
                    "No current leader for {} - triggering re-election",
                    self.db_name
                );
                // Release the state borrow before `try_become_leader` runs.
                drop(state);
                // The result is ignored; the outcome is read back from state below.
                let _ = manager.try_become_leader().await;

                let new_is_leader = manager.state.borrow().is_leader;
                if new_is_leader && manager.heartbeat_interval.is_none() {
                    let _ = manager.start_heartbeat();
                }

                log::debug!(
                    "is_leader() for {} = {} (after re-election)",
                    self.db_name,
                    new_is_leader
                );
                return new_is_leader;
            }
        }

        log::debug!("is_leader() for {} = {}", self.db_name, is_leader);
        is_leader
    } else {
        log::debug!("No leader election manager for {}", self.db_name);
        false
    }
}
2122
/// Stops leader election and drops the election manager (WASM only).
///
/// Does nothing when no election manager exists.
///
/// # Errors
/// Returns `LEADER_ELECTION_ERROR` when `leader_election` is already borrowed
/// elsewhere (previously a panic), and propagates errors from `stop_election`.
#[cfg(target_arch = "wasm32")]
pub async fn stop_leader_election(&self) -> Result<(), DatabaseError> {
    // Take the manager out in its own statement so the `RefCell` borrow is
    // released before the await below.
    let taken = self
        .leader_election
        .try_borrow_mut()
        .map_err(|_| {
            DatabaseError::new(
                "LEADER_ELECTION_ERROR",
                "Failed to borrow leader_election in stop_leader_election",
            )
        })?
        .take();

    if let Some(mut manager) = taken {
        manager.stop_election().await?;
    }
    Ok(())
}
2136
/// Sends a leader heartbeat through the election manager (WASM only).
///
/// # Errors
/// Returns `LEADER_ELECTION_ERROR` when leader election has not been started,
/// and otherwise whatever `send_heartbeat` returns.
#[cfg(target_arch = "wasm32")]
pub async fn send_leader_heartbeat(&self) -> Result<(), DatabaseError> {
    match *self.leader_election.borrow() {
        Some(ref manager) => manager.send_heartbeat().await,
        None => Err(DatabaseError::new(
            "LEADER_ELECTION_ERROR",
            "Leader election not started",
        )),
    }
}
2149
/// Returns the value reported by the election manager's `get_last_heartbeat`
/// (WASM only).
///
/// # Errors
/// Returns `LEADER_ELECTION_ERROR` when leader election has not been started.
#[cfg(target_arch = "wasm32")]
pub async fn get_last_leader_heartbeat(&self) -> Result<u64, DatabaseError> {
    match *self.leader_election.borrow() {
        Some(ref manager) => {
            let last = manager.get_last_heartbeat().await;
            Ok(last)
        }
        None => Err(DatabaseError::new(
            "LEADER_ELECTION_ERROR",
            "Leader election not started",
        )),
    }
}
2162
2163 pub fn get_metrics(&self) -> super::observability::StorageMetrics {
2167 let dirty_count = self.get_dirty_count();
2168 let dirty_bytes = dirty_count * BLOCK_SIZE;
2169
2170 #[cfg(not(target_arch = "wasm32"))]
2171 let (sync_count, timer_sync_count, debounce_sync_count, last_sync_duration_ms) = {
2172 (
2173 self.sync_count.load(Ordering::SeqCst),
2174 self.timer_sync_count.load(Ordering::SeqCst),
2175 self.debounce_sync_count.load(Ordering::SeqCst),
2176 self.last_sync_duration_ms.load(Ordering::SeqCst),
2177 )
2178 };
2179
2180 #[cfg(target_arch = "wasm32")]
2181 let (sync_count, timer_sync_count, debounce_sync_count, last_sync_duration_ms) = {
2182 (self.observability.get_sync_count(), 0, 0, 1)
2184 };
2185
2186 let error_count = self.observability.get_error_count();
2187 let checksum_failures = self.observability.get_checksum_failures();
2188
2189 let total_operations = sync_count + error_count;
2191 let (throughput_blocks_per_sec, throughput_bytes_per_sec) = self
2192 .observability
2193 .calculate_throughput(last_sync_duration_ms);
2194 let error_rate = self.observability.calculate_error_rate(total_operations);
2195
2196 super::observability::StorageMetrics {
2197 dirty_count,
2198 dirty_bytes,
2199 sync_count,
2200 timer_sync_count,
2201 debounce_sync_count,
2202 error_count,
2203 checksum_failures,
2204 last_sync_duration_ms,
2205 throughput_blocks_per_sec,
2206 throughput_bytes_per_sec,
2207 error_rate,
2208 }
2209 }
2210
2211 #[cfg(not(target_arch = "wasm32"))]
2213 pub fn set_sync_callbacks(
2214 &mut self,
2215 on_sync_start: super::observability::SyncStartCallback,
2216 on_sync_success: super::observability::SyncSuccessCallback,
2217 on_sync_failure: super::observability::SyncFailureCallback,
2218 ) {
2219 self.observability.sync_start_callback = Some(on_sync_start);
2220 self.observability.sync_success_callback = Some(on_sync_success);
2221 self.observability.sync_failure_callback = Some(on_sync_failure);
2222 }
2223
2224 #[cfg(not(target_arch = "wasm32"))]
2226 pub fn set_backpressure_callback(
2227 &mut self,
2228 callback: super::observability::BackpressureCallback,
2229 ) {
2230 self.observability.backpressure_callback = Some(callback);
2231 }
2232
2233 #[cfg(not(target_arch = "wasm32"))]
2235 pub fn set_error_callback(&mut self, callback: super::observability::ErrorCallback) {
2236 self.observability.error_callback = Some(callback);
2237 }
2238
/// Installs the WASM sync-success callback, replacing any previous one.
#[cfg(target_arch = "wasm32")]
pub fn set_sync_success_callback(
    &mut self,
    callback: super::observability::WasmSyncSuccessCallback,
) {
    let slot = &mut self.observability.wasm_sync_success_callback;
    *slot = Some(callback);
}
2247
2248 pub fn is_auto_sync_enabled(&self) -> bool {
2250 lock_mutex!(self.auto_sync_interval).is_some()
2251 }
2252
2253 pub fn get_sync_policy(&self) -> Option<super::SyncPolicy> {
2255 lock_mutex!(self.policy).clone()
2256 }
2257
2258 pub async fn force_sync(&mut self) -> Result<(), DatabaseError> {
2264 log::info!("force_sync: Starting forced synchronization with durability guarantees");
2265
2266 let dirty_count = self.get_dirty_count();
2267 if dirty_count == 0 {
2268 log::debug!("force_sync: No dirty blocks to sync");
2269 return Ok(());
2270 }
2271
2272 log::info!(
2273 "force_sync: Syncing {} dirty blocks with durability guarantee",
2274 dirty_count
2275 );
2276
2277 self.sync().await?;
2279
2280 log::info!("force_sync: Successfully completed forced synchronization");
2281 Ok(())
2282 }
2283
/// Replaces the telemetry metrics handle; `None` removes it.
#[cfg(feature = "telemetry")]
pub fn set_metrics(&mut self, metrics: Option<crate::telemetry::Metrics>) {
    let slot = &mut self.metrics;
    *slot = metrics;
}
2289
/// Borrows the telemetry metrics handle, if one is set.
#[cfg(feature = "telemetry")]
pub fn metrics(&self) -> Option<&crate::telemetry::Metrics> {
    Option::as_ref(&self.metrics)
}
2295
/// Builds a storage instance for tests (requires the `telemetry` feature).
///
/// Everything starts empty or unset: database name `test.db`, capacity 128,
/// next block id 1, FastHash checksums, no auto-sync, no background threads
/// or tasks, no metrics. With `fs_persist` the base directory is `/tmp/test`.
#[cfg(feature = "telemetry")]
pub fn new_for_test() -> Self {
    Self {
        cache: Mutex::new(HashMap::new()),
        dirty_blocks: Arc::new(parking_lot::Mutex::new(HashMap::new())),
        allocated_blocks: Mutex::new(HashSet::new()),
        deallocated_blocks: Mutex::new(HashSet::new()),
        next_block_id: AtomicU64::new(1),
        capacity: 128,
        lru_order: Mutex::new(VecDeque::new()),
        checksum_manager: ChecksumManager::new(ChecksumAlgorithm::FastHash),
        #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
        base_dir: PathBuf::from("/tmp/test"),
        db_name: String::from("test.db"),
        auto_sync_interval: Mutex::new(None),
        #[cfg(not(target_arch = "wasm32"))]
        last_auto_sync: Instant::now(),
        policy: Mutex::new(None),
        // Background sync machinery: nothing is running in a test instance.
        #[cfg(not(target_arch = "wasm32"))]
        auto_sync_stop: None,
        #[cfg(not(target_arch = "wasm32"))]
        auto_sync_thread: None,
        #[cfg(not(target_arch = "wasm32"))]
        debounce_thread: None,
        #[cfg(not(target_arch = "wasm32"))]
        tokio_timer_task: None,
        #[cfg(not(target_arch = "wasm32"))]
        tokio_debounce_task: None,
        // Counters and flags all start at zero / false.
        #[cfg(not(target_arch = "wasm32"))]
        last_write_ms: Arc::new(AtomicU64::new(0)),
        #[cfg(not(target_arch = "wasm32"))]
        threshold_hit: Arc::new(AtomicBool::new(false)),
        #[cfg(not(target_arch = "wasm32"))]
        sync_count: Arc::new(AtomicU64::new(0)),
        #[cfg(not(target_arch = "wasm32"))]
        timer_sync_count: Arc::new(AtomicU64::new(0)),
        #[cfg(not(target_arch = "wasm32"))]
        debounce_sync_count: Arc::new(AtomicU64::new(0)),
        #[cfg(not(target_arch = "wasm32"))]
        last_sync_duration_ms: Arc::new(AtomicU64::new(0)),
        #[cfg(not(target_arch = "wasm32"))]
        sync_sender: None,
        #[cfg(not(target_arch = "wasm32"))]
        sync_receiver: None,
        recovery_report: RecoveryReport::default(),
        #[cfg(target_arch = "wasm32")]
        leader_election: RefCell::new(None),
        observability: super::observability::ObservabilityManager::new(),
        metrics: None,
    }
}
2350}
2351
#[cfg(all(test, target_arch = "wasm32"))]
mod wasm_commit_marker_tests {
    use super::*;
    use wasm_bindgen_test::*;

    wasm_bindgen_test_configure!(run_in_browser);

    /// Overwrites the global commit marker recorded for `db` with `v`.
    fn set_commit_marker(db: &str, v: u64) {
        super::vfs_sync::with_global_commit_marker(|markers| {
            let mut table = markers.borrow_mut();
            table.insert(db.to_string(), v);
        });
    }

    /// Returns the global commit marker recorded for `db`, or 0 when there is none.
    fn get_commit_marker(db: &str) -> u64 {
        vfs_sync::with_global_commit_marker(|markers| {
            let table = markers.borrow();
            table.get(db).copied().unwrap_or(0)
        })
    }

    /// A block that was written but not yet synced must read back as zeros;
    /// once `sync` has run, the written bytes must be returned.
    #[wasm_bindgen_test]
    async fn gating_returns_zeroed_until_marker_catches_up_wasm() {
        let db_name = "cm_gating_wasm";
        let mut storage = BlockStorage::new(db_name).await.expect("create storage");

        let block_id = storage.allocate_block().await.expect("alloc block");
        let payload = vec![0x33u8; BLOCK_SIZE];
        storage
            .write_block(block_id, payload.clone())
            .await
            .expect("write v1");

        // The cache is cleared before every read (presumably so the read is
        // not answered from a cached copy).
        storage.clear_cache();
        let before_commit = storage
            .read_block(block_id)
            .await
            .expect("read before commit");
        let zeroed = vec![0u8; BLOCK_SIZE];
        assert_eq!(before_commit, zeroed, "uncommitted data must read as zeroed");

        storage.sync().await.expect("sync v1");
        storage.clear_cache();
        let after_commit = storage
            .read_block(block_id)
            .await
            .expect("read after commit");
        assert_eq!(after_commit, payload, "committed data should be visible");
    }

    /// With the commit marker forced back to 0 and the stored checksum
    /// replaced, a read must still succeed and yield zeros; after the marker
    /// is set to 1 the same read must fail with `CHECKSUM_MISMATCH`.
    #[wasm_bindgen_test]
    async fn invisible_blocks_skip_checksum_verification_wasm() {
        let db_name = "cm_checksum_skip_wasm";
        let mut storage = BlockStorage::new(db_name).await.expect("create storage");

        let block_id = storage.allocate_block().await.expect("alloc block");
        let payload = vec![0x44u8; BLOCK_SIZE];
        storage.write_block(block_id, payload).await.expect("write v1");
        storage.sync().await.expect("sync v1");
        set_commit_marker(db_name, 0);

        // Replace the stored checksum with a value the test expects not to
        // match the block contents.
        storage.set_block_checksum_for_testing(block_id, 1234567);
        storage.clear_cache();
        let hidden_read = storage
            .read_block(block_id)
            .await
            .expect("read while invisible should not error");
        let zeroed = vec![0u8; BLOCK_SIZE];
        assert_eq!(hidden_read, zeroed, "invisible block reads as zeroed");

        set_commit_marker(db_name, 1);
        storage.clear_cache();
        let failure = storage
            .read_block(block_id)
            .await
            .expect_err("expected checksum mismatch once visible");
        assert_eq!(failure.code, "CHECKSUM_MISMATCH");
    }

    /// Each `sync` must raise the commit marker by one, and the second field
    /// of a block's metadata entry (the "version" in the assertion messages)
    /// must equal the marker of the last sync that included that block.
    #[wasm_bindgen_test]
    async fn commit_marker_advances_and_versions_track_syncs_wasm() {
        let db_name = "cm_versions_wasm";
        let mut storage = BlockStorage::new_with_capacity(db_name, 8)
            .await
            .expect("create storage");

        let b1 = storage.allocate_block().await.expect("alloc b1");
        let b2 = storage.allocate_block().await.expect("alloc b2");

        storage
            .write_block(b1, vec![1u8; BLOCK_SIZE])
            .await
            .expect("write b1 v1");
        storage
            .write_block(b2, vec![2u8; BLOCK_SIZE])
            .await
            .expect("write b2 v1");
        storage.sync().await.expect("sync #1");

        let first_marker = get_commit_marker(db_name);
        assert_eq!(
            first_marker, 1,
            "first sync should advance commit marker to 1"
        );
        let first_meta = storage.get_block_metadata_for_testing();
        for block in [b1, b2] {
            assert_eq!(first_meta.get(&block).unwrap().1 as u64, first_marker);
        }

        // Only b1 is rewritten before the second sync.
        storage
            .write_block(b1, vec![3u8; BLOCK_SIZE])
            .await
            .expect("write b1 v2");
        storage.sync().await.expect("sync #2");

        let second_marker = get_commit_marker(db_name);
        assert_eq!(
            second_marker, 2,
            "second sync should advance commit marker to 2"
        );
        let second_meta = storage.get_block_metadata_for_testing();
        let b1_version = second_meta.get(&b1).unwrap().1 as u64;
        let b2_version = second_meta.get(&b2).unwrap().1 as u64;
        assert_eq!(b1_version, second_marker, "updated block tracks new version");
        assert_eq!(b2_version, 1, "unchanged block retains prior version");
    }
}
2478
#[cfg(all(test, not(target_arch = "wasm32"), not(feature = "fs_persist")))]
mod commit_marker_tests {
    use super::*;

    /// Overwrites the global commit marker recorded for `db` with `v`.
    fn set_commit_marker(db: &str, v: u64) {
        super::vfs_sync::with_global_commit_marker(|cm| {
            cm.borrow_mut().insert(db.to_string(), v);
        });
    }

    /// Returns the global commit marker recorded for `db`, or 0 when there is none.
    fn get_commit_marker(db: &str) -> u64 {
        super::vfs_sync::with_global_commit_marker(|cm| cm.borrow().get(db).copied().unwrap_or(0))
    }

    /// A block that was written but not yet synced must read back as zeros;
    /// once `sync` has run, the written bytes must be returned.
    #[tokio::test(flavor = "current_thread")]
    async fn gating_returns_zeroed_until_marker_catches_up() {
        let db = "cm_gating_basic";
        let mut s = BlockStorage::new(db).await.expect("create storage");

        let bid = s.allocate_block().await.expect("alloc block");
        let data_v1 = vec![0x11u8; BLOCK_SIZE];
        s.write_block(bid, data_v1.clone()).await.expect("write v1");

        // The cache is cleared before every read (presumably so the read is
        // not answered from a cached copy).
        s.clear_cache();
        let out0 = s.read_block(bid).await.expect("read before commit");
        assert_eq!(
            out0,
            vec![0u8; BLOCK_SIZE],
            "uncommitted data must read as zeroed"
        );

        s.sync().await.expect("sync v1");

        s.clear_cache();
        let out1 = s.read_block(bid).await.expect("read after commit");
        // The commit marker is included in the failure message only; it is
        // looked up lazily, when the assertion actually fails.
        assert_eq!(
            out1,
            data_v1,
            "expected committed data to be visible after sync (commit marker: {})",
            get_commit_marker(db)
        );
    }

    /// With the commit marker forced back to 0 and the stored checksum
    /// replaced, a read must still succeed and yield zeros; after the marker
    /// is set to 1 the same read must fail with `CHECKSUM_MISMATCH`.
    #[tokio::test(flavor = "current_thread")]
    async fn invisible_blocks_skip_checksum_verification() {
        let db = "cm_checksum_skip";
        let mut s = BlockStorage::new(db).await.expect("create storage");

        let bid = s.allocate_block().await.expect("alloc block");
        let data = vec![0xAAu8; BLOCK_SIZE];
        // `data` is not needed again, so it is moved rather than cloned.
        s.write_block(bid, data).await.expect("write v1");
        s.sync().await.expect("sync v1");
        set_commit_marker(db, 0);

        // Replace the stored checksum with a value the test expects not to
        // match the block contents.
        s.set_block_checksum_for_testing(bid, 1234567);
        s.clear_cache();
        let out = s
            .read_block(bid)
            .await
            .expect("read while invisible should not error");
        assert_eq!(
            out,
            vec![0u8; BLOCK_SIZE],
            "invisible block reads as zeroed"
        );

        set_commit_marker(db, 1);
        s.clear_cache();
        let err = s
            .read_block(bid)
            .await
            .expect_err("expected checksum mismatch once visible");
        assert_eq!(err.code, "CHECKSUM_MISMATCH");
    }

    /// Each `sync` must raise the commit marker by one, and the second field
    /// of a block's metadata entry (the "version" in the assertion messages)
    /// must equal the marker of the last sync that included that block.
    #[tokio::test(flavor = "current_thread")]
    async fn commit_marker_advances_and_versions_track_syncs() {
        let db = "cm_versions";
        let mut s = BlockStorage::new_with_capacity(db, 8)
            .await
            .expect("create storage");

        let b1 = s.allocate_block().await.expect("alloc b1");
        let b2 = s.allocate_block().await.expect("alloc b2");

        s.write_block(b1, vec![1u8; BLOCK_SIZE])
            .await
            .expect("write b1 v1");
        s.write_block(b2, vec![2u8; BLOCK_SIZE])
            .await
            .expect("write b2 v1");
        s.sync().await.expect("sync #1");

        let cm1 = get_commit_marker(db);
        assert_eq!(cm1, 1, "first sync should advance commit marker to 1");
        let meta1 = s.get_block_metadata_for_testing();
        assert_eq!(meta1.get(&b1).unwrap().1 as u64, cm1);
        assert_eq!(meta1.get(&b2).unwrap().1 as u64, cm1);

        // Only b1 is rewritten before the second sync.
        s.write_block(b1, vec![3u8; BLOCK_SIZE])
            .await
            .expect("write b1 v2");
        s.sync().await.expect("sync #2");

        let cm2 = get_commit_marker(db);
        assert_eq!(cm2, 2, "second sync should advance commit marker to 2");
        let meta2 = s.get_block_metadata_for_testing();
        assert_eq!(
            meta2.get(&b1).unwrap().1 as u64,
            cm2,
            "updated block tracks new version"
        );
        assert_eq!(
            meta2.get(&b2).unwrap().1 as u64,
            1,
            "unchanged block retains prior version"
        );
    }
}