1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::Arc;
3use parking_lot::Mutex;
4use std::time::Duration;
5#[cfg(not(target_arch = "wasm32"))]
6use std::time::{Instant, SystemTime, UNIX_EPOCH};
7#[cfg(target_arch = "wasm32")]
8use js_sys::Date;
9#[cfg(not(target_arch = "wasm32"))]
10use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
11use crate::types::DatabaseError;
12use super::metadata::{ChecksumManager, ChecksumAlgorithm};
13#[allow(unused_imports)]
14use super::metadata::BlockMetadataPersist;
15#[cfg(any(target_arch = "wasm32", all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist"))))]
16use super::vfs_sync;
17#[cfg(not(target_arch = "wasm32"))]
18use tokio::task::JoinHandle as TokioJoinHandle;
19#[cfg(not(target_arch = "wasm32"))]
20use tokio::sync::mpsc;
21
22#[allow(unused_imports)]
23use std::cell::RefCell;
24
25#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
27use std::path::PathBuf;
28
#[cfg(not(target_arch = "wasm32"))]
/// Message type carried by the storage's internal sync channel
/// (`BlockStorage::sync_sender` / `BlockStorage::sync_receiver`).
///
/// Each variant carries a `oneshot::Sender<()>`; presumably the consumer signals
/// through it once the requested sync has finished — TODO confirm against the
/// channel consumer, which is not visible here.
#[derive(Debug)]
pub(super) enum SyncRequest {
    /// Request originating from the timer-driven auto-sync path (inferred from the
    /// name; verify).
    Timer(tokio::sync::oneshot::Sender<()>),
    /// Request originating from the debounce auto-sync path (inferred from the
    /// name; verify).
    Debounce(tokio::sync::oneshot::Sender<()>),
}
40
41#[derive(Clone, Debug, Default)]
42pub struct RecoveryOptions {
43 pub mode: RecoveryMode,
44 pub on_corruption: CorruptionAction,
45}
46
/// How many allocated blocks startup recovery verifies.
///
/// Defaults to [`RecoveryMode::Full`].
#[derive(Clone, Debug, Default)]
pub enum RecoveryMode {
    /// Verify every allocated block.
    #[default]
    Full,
    /// Verify at most `count` blocks.
    Sample { count: usize },
    /// Skip verification entirely.
    Skip,
}
59
/// What startup recovery should do when a block fails verification.
///
/// Defaults to [`CorruptionAction::Report`].
#[derive(Clone, Debug, Default)]
pub enum CorruptionAction {
    /// Record the corrupted block in the report only.
    #[default]
    Report,
    /// Attempt to repair the corrupted block.
    Repair,
    /// Treat corruption as a hard error.
    Fail,
}

/// Decision produced by crash-recovery analysis (consumer not visible in this file).
#[derive(Clone, Debug, PartialEq)]
pub enum CrashRecoveryAction {
    /// State is consistent; nothing to do.
    NoActionNeeded,
    /// An interrupted operation should be rolled back.
    Rollback,
    /// An interrupted operation should be completed.
    Finalize,
}
79
/// Summary of the most recent startup verification run.
///
/// `Default` yields an empty report (zero counts, empty id lists).
#[derive(Default, Debug, Clone)]
pub struct RecoveryReport {
    /// How many blocks were checked.
    pub total_blocks_verified: usize,
    /// Block ids recorded as corrupted.
    pub corrupted_blocks: Vec<u64>,
    /// Block ids recorded as repaired.
    pub repaired_blocks: Vec<u64>,
    /// Time spent verifying, in milliseconds.
    pub verification_duration_ms: u64,
}
87
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Deserialize, serde::Serialize)]
#[allow(dead_code)]
/// Serde shape of `<base>/<db>/metadata.json`: a list of `(block_id, metadata)` pairs.
struct FsMeta {
    entries: Vec<(u64, BlockMetadataPersist)>,
}
93
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Deserialize, serde::Serialize)]
#[allow(dead_code)]
/// Serde shape matching the `allocated` array of block ids read from `allocations.json`.
struct FsAlloc {
    allocated: Vec<u64>,
}
98
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Deserialize, serde::Serialize)]
#[allow(dead_code)]
/// Serde shape matching the `tombstones` array of block ids read from `deallocated.json`.
struct FsDealloc {
    tombstones: Vec<u64>,
}
103
104#[cfg(not(target_arch = "wasm32"))]
106thread_local! {
107 pub(super) static GLOBAL_METADATA_TEST: RefCell<HashMap<String, HashMap<u64, BlockMetadataPersist>>> = RefCell::new(HashMap::new());
108}
109
#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions)))]
thread_local! {
    /// Per-thread commit-marker map for native test/debug builds:
    /// database name -> marker value. Starts empty on every thread.
    static GLOBAL_COMMIT_MARKER_TEST: RefCell<HashMap<String, u64>> = RefCell::default();
}
115
/// Tuning knobs for automatic syncing (stored in `BlockStorage::policy`).
///
/// Semantics below are inferred from field names; the consumers are not visible here.
#[derive(Debug, Clone)]
pub struct SyncPolicy {
    /// Timer period in milliseconds, if timer-driven sync is enabled (presumably).
    pub interval_ms: Option<u64>,
    /// Dirty-block count threshold (presumably triggers a sync when reached).
    pub max_dirty: Option<usize>,
    /// Dirty-byte threshold (presumably triggers a sync when reached).
    pub max_dirty_bytes: Option<usize>,
    /// Debounce delay in milliseconds (presumably).
    pub debounce_ms: Option<u64>,
    /// Whether written blocks should be re-verified after writing (presumably).
    pub verify_after_write: bool,
}
124
125#[cfg(not(target_arch = "wasm32"))]
126impl Drop for BlockStorage {
127 fn drop(&mut self) {
128 if let Some(stop) = &self.auto_sync_stop {
129 stop.store(true, Ordering::SeqCst);
130 }
131 if let Some(handle) = self.auto_sync_thread.take() {
132 let _ = handle.join();
133 }
134 if let Some(handle) = self.debounce_thread.take() {
135 let _ = handle.join();
136 }
137 if let Some(task) = self.tokio_timer_task.take() {
138 task.abort();
139 }
140 if let Some(task) = self.tokio_debounce_task.take() {
141 task.abort();
142 }
143 self.auto_sync_stop = None;
144 }
145}
146
147pub const BLOCK_SIZE: usize = 4096;
148#[allow(dead_code)]
149pub(super) const DEFAULT_CACHE_CAPACITY: usize = 128;
150#[allow(dead_code)]
151const STORE_NAME: &str = "sqlite_blocks";
152#[allow(dead_code)]
153const METADATA_STORE: &str = "metadata";
154
/// Block-level storage for one database: an in-memory block cache with LRU
/// eviction, a dirty-block buffer, allocation bookkeeping, per-block checksums,
/// and (on native targets) background auto-sync machinery.
pub struct BlockStorage {
    /// Cached block contents keyed by block id.
    pub(super) cache: HashMap<u64, Vec<u8>>,
    /// Blocks with pending (unsynced) writes; shared so background sync can reach them.
    /// `evict_if_needed` never evicts a block that is present here.
    pub(super) dirty_blocks: Arc<Mutex<HashMap<u64, Vec<u8>>>>,
    /// Ids of blocks currently allocated.
    pub(super) allocated_blocks: HashSet<u64>,
    /// Deallocation tombstones (loaded from `deallocated.json` under `fs_persist`).
    #[allow(dead_code)]
    pub(super) deallocated_blocks: HashSet<u64>,
    /// Next block id; constructors set it to the highest allocated id + 1.
    pub(super) next_block_id: u64,
    /// Maximum number of cached blocks before `evict_if_needed` starts evicting.
    pub(super) capacity: usize,
    /// Recency order: front = least recently touched, back = most recent.
    pub(super) lru_order: VecDeque<u64>,
    /// Stored checksums and per-block checksum algorithms.
    pub(super) checksum_manager: ChecksumManager,
    /// Filesystem persistence root (`$ABSURDERSQL_FS_BASE`, default `./test_storage`).
    #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
    pub(super) base_dir: PathBuf,
    /// Name of the database this storage serves.
    pub(super) db_name: String,
    /// Auto-sync interval, if enabled; cleared by `drain_and_shutdown`.
    pub(super) auto_sync_interval: Option<Duration>,
    /// Time of the last auto-sync (initialised at construction).
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) last_auto_sync: Instant,
    /// Optional auto-sync policy.
    pub(super) policy: Option<SyncPolicy>,
    /// Stop flag shared with background sync workers; set on drop/shutdown.
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) auto_sync_stop: Option<Arc<AtomicBool>>,
    /// OS thread driving auto-sync; joined on drop/shutdown.
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) auto_sync_thread: Option<std::thread::JoinHandle<()>>,
    /// OS thread driving debounced sync; joined on drop/shutdown.
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) debounce_thread: Option<std::thread::JoinHandle<()>>,
    /// Tokio task for timer-driven sync; aborted on drop/shutdown.
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) tokio_timer_task: Option<TokioJoinHandle<()>>,
    /// Tokio task for debounced sync; aborted on drop/shutdown.
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) tokio_debounce_task: Option<TokioJoinHandle<()>>,
    /// Timestamp of the last write in ms (presumably from `now_millis`; verify).
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) last_write_ms: Arc<AtomicU64>,
    /// Set when a dirty threshold was hit (inferred from name); reset by `drain_and_shutdown`.
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) threshold_hit: Arc<AtomicBool>,
    /// Total sync counter (inferred from name).
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) sync_count: Arc<AtomicU64>,
    /// Timer-triggered sync counter (inferred from name).
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) timer_sync_count: Arc<AtomicU64>,
    /// Debounce-triggered sync counter (inferred from name).
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) debounce_sync_count: Arc<AtomicU64>,
    /// Duration of the most recent sync in ms (inferred from name).
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) last_sync_duration_ms: Arc<AtomicU64>,

    /// Sending half of the internal `SyncRequest` channel.
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) sync_sender: Option<mpsc::UnboundedSender<SyncRequest>>,
    /// Receiving half of the internal `SyncRequest` channel.
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) sync_receiver: Option<mpsc::UnboundedReceiver<SyncRequest>>,

    /// Result of the last startup recovery run (see `get_recovery_report`).
    pub(super) recovery_report: RecoveryReport,

    /// Leader-election state for wasm (`None` until configured).
    #[cfg(target_arch = "wasm32")]
    pub(super) leader_election: Option<super::leader_election::LeaderElectionManager>,

    /// Observability hooks.
    pub(super) observability: super::observability::ObservabilityManager,

    /// Optional telemetry metrics (`telemetry` feature).
    #[cfg(feature = "telemetry")]
    pub(super) metrics: Option<crate::telemetry::Metrics>,
}
217
218impl BlockStorage {
219 #[cfg(target_arch = "wasm32")]
222 pub fn new_sync(db_name: &str) -> Self {
223 log::info!("Creating BlockStorage synchronously for database: {}", db_name);
224
225 use crate::storage::vfs_sync::with_global_storage;
227 let (cache, allocated_blocks, max_block_id) = with_global_storage(|gs| {
228 let storage_map = gs.borrow();
229 if let Some(db_storage) = storage_map.get(db_name) {
230 let cache = db_storage.clone();
231 let allocated = db_storage.keys().copied().collect::<HashSet<_>>();
232 let max_id = db_storage.keys().max().copied().unwrap_or(0);
233 (cache, allocated, max_id)
234 } else {
235 (HashMap::new(), HashSet::new(), 0)
236 }
237 });
238
239 log::info!("Loaded {} blocks from GLOBAL_STORAGE for {} (max_block_id={})", cache.len(), db_name, max_block_id);
240
241 use crate::storage::vfs_sync::with_global_metadata;
244 let checksum_manager = with_global_metadata(|gm| {
245 let metadata_map = gm.borrow();
246 if let Some(db_metadata) = metadata_map.get(db_name) {
247 let mut checksums = HashMap::new();
248 let mut algos = HashMap::new();
249 for (block_id, meta) in db_metadata {
250 checksums.insert(*block_id, meta.checksum);
251 algos.insert(*block_id, meta.algo);
252 }
253 ChecksumManager::with_data(checksums, algos, ChecksumAlgorithm::FastHash)
254 } else {
255 ChecksumManager::new(ChecksumAlgorithm::FastHash)
256 }
257 });
258
259 Self {
260 cache,
261 dirty_blocks: Arc::new(Mutex::new(HashMap::new())),
262 allocated_blocks,
263 deallocated_blocks: HashSet::new(),
264 next_block_id: max_block_id + 1,
265 capacity: 128,
266 lru_order: VecDeque::new(),
267 checksum_manager,
268 db_name: db_name.to_string(),
269 auto_sync_interval: None,
270 policy: None,
271 #[cfg(not(target_arch = "wasm32"))]
272 last_auto_sync: Instant::now(),
273 #[cfg(not(target_arch = "wasm32"))]
274 auto_sync_stop: None,
275 #[cfg(not(target_arch = "wasm32"))]
276 auto_sync_thread: None,
277 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
278 base_dir: std::path::PathBuf::from(std::env::var("ABSURDERSQL_FS_BASE").unwrap_or_else(|_| "./test_storage".to_string())),
279 #[cfg(not(target_arch = "wasm32"))]
280 debounce_thread: None,
281 #[cfg(not(target_arch = "wasm32"))]
282 tokio_timer_task: None,
283 #[cfg(not(target_arch = "wasm32"))]
284 tokio_debounce_task: None,
285 #[cfg(not(target_arch = "wasm32"))]
286 last_write_ms: Arc::new(AtomicU64::new(0)),
287 #[cfg(not(target_arch = "wasm32"))]
288 threshold_hit: Arc::new(AtomicBool::new(false)),
289 #[cfg(not(target_arch = "wasm32"))]
290 sync_count: Arc::new(AtomicU64::new(0)),
291 #[cfg(not(target_arch = "wasm32"))]
292 timer_sync_count: Arc::new(AtomicU64::new(0)),
293 #[cfg(not(target_arch = "wasm32"))]
294 debounce_sync_count: Arc::new(AtomicU64::new(0)),
295 #[cfg(not(target_arch = "wasm32"))]
296 last_sync_duration_ms: Arc::new(AtomicU64::new(0)),
297 #[cfg(not(target_arch = "wasm32"))]
298 sync_sender: None,
299 #[cfg(not(target_arch = "wasm32"))]
300 sync_receiver: None,
301 recovery_report: RecoveryReport::default(),
302 #[cfg(target_arch = "wasm32")]
303 leader_election: None,
304 observability: super::observability::ObservabilityManager::new(),
305 #[cfg(feature = "telemetry")]
306 metrics: None,
307 }
308 }
309
310 #[cfg(target_arch = "wasm32")]
311 pub async fn new(db_name: &str) -> Result<Self, DatabaseError> {
312 super::constructors::new_wasm(db_name).await
313 }
314
315 #[cfg(not(target_arch = "wasm32"))]
316 pub async fn new(db_name: &str) -> Result<Self, DatabaseError> {
317 log::info!("Creating BlockStorage for database: {}", db_name);
318
319 let (allocated_blocks, next_block_id) = {
321 #[cfg(feature = "fs_persist")]
322 {
323 let mut allocated_blocks = HashSet::new();
325 let mut next_block_id: u64 = 1;
326
327 let base_path = std::env::var("ABSURDERSQL_FS_BASE").unwrap_or_else(|_| "./test_storage".to_string());
328 let mut alloc_path = std::path::PathBuf::from(base_path);
329 alloc_path.push(db_name);
330 alloc_path.push("allocations.json");
331
332 if let Ok(content) = std::fs::read_to_string(&alloc_path) {
333 if let Ok(alloc_data) = serde_json::from_str::<serde_json::Value>(&content) {
334 if let Some(allocated_array) = alloc_data["allocated"].as_array() {
335 for block_id_val in allocated_array {
336 if let Some(block_id) = block_id_val.as_u64() {
337 allocated_blocks.insert(block_id);
338 }
339 }
340 next_block_id = allocated_blocks.iter().max().copied().unwrap_or(0) + 1;
341 }
342 }
343 }
344
345 (allocated_blocks, next_block_id)
346 }
347
348 #[cfg(not(feature = "fs_persist"))]
349 {
350 (HashSet::new(), 1)
352 }
353 };
354
355 let checksums_init: HashMap<u64, u64> = {
357 #[cfg(feature = "fs_persist")]
358 {
359 let mut map = HashMap::new();
360 let base_path = std::env::var("ABSURDERSQL_FS_BASE").unwrap_or_else(|_| "./test_storage".to_string());
361 let mut meta_path = std::path::PathBuf::from(base_path);
362 meta_path.push(db_name);
363 meta_path.push("metadata.json");
364
365 if let Ok(content) = std::fs::read_to_string(&meta_path) {
366 if let Ok(meta_data) = serde_json::from_str::<serde_json::Value>(&content) {
367 if let Some(entries) = meta_data["entries"].as_array() {
368 for entry in entries {
369 if let Some(arr) = entry.as_array() {
370 if let (Some(block_id), Some(obj)) = (arr.get(0).and_then(|v| v.as_u64()), arr.get(1).and_then(|v| v.as_object())) {
371 if let Some(checksum) = obj.get("checksum").and_then(|v| v.as_u64()) {
372 map.insert(block_id, checksum);
373 }
374 }
375 }
376 }
377 }
378 }
379 }
380 map
381 }
382
383 #[cfg(not(feature = "fs_persist"))]
384 {
385 #[allow(unused_mut)]
387 let mut map = HashMap::new();
388 #[cfg(any(test, debug_assertions))]
389 GLOBAL_METADATA_TEST.with(|meta| {
390 let meta_map = meta.borrow();
391 if let Some(db_meta) = meta_map.get(db_name) {
392 for (block_id, metadata) in db_meta.iter() {
393 map.insert(*block_id, metadata.checksum);
394 }
395 }
396 });
397 map
398 }
399 };
400
401 let checksum_algos_init: HashMap<u64, ChecksumAlgorithm> = {
402 #[cfg(feature = "fs_persist")]
403 {
404 let mut map = HashMap::new();
405 let base_path = std::env::var("ABSURDERSQL_FS_BASE").unwrap_or_else(|_| "./test_storage".to_string());
406 let mut meta_path = std::path::PathBuf::from(base_path);
407 meta_path.push(db_name);
408 meta_path.push("metadata.json");
409
410 if let Ok(content) = std::fs::read_to_string(&meta_path) {
411 if let Ok(meta_data) = serde_json::from_str::<serde_json::Value>(&content) {
412 if let Some(entries) = meta_data["entries"].as_array() {
413 for entry in entries {
414 if let Some(arr) = entry.as_array() {
415 if let (Some(block_id), Some(obj)) = (arr.get(0).and_then(|v| v.as_u64()), arr.get(1).and_then(|v| v.as_object())) {
416 let algo = obj.get("algo").and_then(|v| v.as_str())
417 .and_then(|s| match s {
418 "CRC32" => Some(ChecksumAlgorithm::CRC32),
419 "FastHash" => Some(ChecksumAlgorithm::FastHash),
420 _ => None,
421 })
422 .unwrap_or(ChecksumAlgorithm::FastHash);
423 map.insert(block_id, algo);
424 }
425 }
426 }
427 }
428 }
429 }
430 map
431 }
432
433 #[cfg(not(feature = "fs_persist"))]
434 {
435 #[allow(unused_mut)]
437 let mut map = HashMap::new();
438 #[cfg(any(test, debug_assertions))]
439 GLOBAL_METADATA_TEST.with(|meta| {
440 let meta_map = meta.borrow();
441 if let Some(db_meta) = meta_map.get(db_name) {
442 for (block_id, metadata) in db_meta.iter() {
443 map.insert(*block_id, metadata.algo);
444 }
445 }
446 });
447 map
448 }
449 };
450
451 #[cfg(feature = "fs_persist")]
453 let checksum_algo_default = match std::env::var("DATASYNC_CHECKSUM_ALGO").ok().as_deref() {
454 Some("CRC32") => ChecksumAlgorithm::CRC32,
455 _ => ChecksumAlgorithm::FastHash,
456 };
457 #[cfg(not(feature = "fs_persist"))]
458 let checksum_algo_default = ChecksumAlgorithm::FastHash;
459
460 let deallocated_blocks_init: HashSet<u64> = {
462 #[cfg(feature = "fs_persist")]
463 {
464 let mut set = HashSet::new();
465 let base_path = std::env::var("ABSURDERSQL_FS_BASE").unwrap_or_else(|_| "./test_storage".to_string());
466 let mut path = std::path::PathBuf::from(base_path);
467 path.push(db_name);
468 let mut dealloc_path = path.clone();
469 dealloc_path.push("deallocated.json");
470 if let Ok(content) = std::fs::read_to_string(&dealloc_path) {
471 if let Ok(dealloc_data) = serde_json::from_str::<serde_json::Value>(&content) {
472 if let Some(tombstones_array) = dealloc_data["tombstones"].as_array() {
473 for tombstone_val in tombstones_array {
474 if let Some(block_id) = tombstone_val.as_u64() {
475 set.insert(block_id);
476 }
477 }
478 }
479 }
480 }
481 set
482 }
483 #[cfg(not(feature = "fs_persist"))]
484 {
485 HashSet::new()
486 }
487 };
488
489 Ok(BlockStorage {
490 db_name: db_name.to_string(),
491 cache: HashMap::new(),
492 lru_order: VecDeque::new(),
493 capacity: 1000,
494 checksum_manager: ChecksumManager::with_data(
495 checksums_init,
496 checksum_algos_init,
497 checksum_algo_default,
498 ),
499 dirty_blocks: Arc::new(Mutex::new(HashMap::new())),
500 allocated_blocks,
501 next_block_id,
502 deallocated_blocks: deallocated_blocks_init,
503 policy: None,
504 auto_sync_interval: None,
505 #[cfg(not(target_arch = "wasm32"))]
506 last_auto_sync: Instant::now(),
507 #[cfg(not(target_arch = "wasm32"))]
508 auto_sync_stop: None,
509 #[cfg(not(target_arch = "wasm32"))]
510 auto_sync_thread: None,
511 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
512 base_dir: std::path::PathBuf::from(std::env::var("ABSURDERSQL_FS_BASE").unwrap_or_else(|_| "./test_storage".to_string())),
513 #[cfg(not(target_arch = "wasm32"))]
514 debounce_thread: None,
515 #[cfg(not(target_arch = "wasm32"))]
516 tokio_timer_task: None,
517 #[cfg(not(target_arch = "wasm32"))]
518 tokio_debounce_task: None,
519 #[cfg(not(target_arch = "wasm32"))]
520 last_write_ms: Arc::new(AtomicU64::new(0)),
521 #[cfg(not(target_arch = "wasm32"))]
522 threshold_hit: Arc::new(AtomicBool::new(false)),
523 #[cfg(not(target_arch = "wasm32"))]
524 sync_count: Arc::new(AtomicU64::new(0)),
525 #[cfg(not(target_arch = "wasm32"))]
526 timer_sync_count: Arc::new(AtomicU64::new(0)),
527 #[cfg(not(target_arch = "wasm32"))]
528 debounce_sync_count: Arc::new(AtomicU64::new(0)),
529 #[cfg(not(target_arch = "wasm32"))]
530 last_sync_duration_ms: Arc::new(AtomicU64::new(0)),
531 #[cfg(not(target_arch = "wasm32"))]
532 sync_sender: None,
533 #[cfg(not(target_arch = "wasm32"))]
534 sync_receiver: None,
535 recovery_report: RecoveryReport::default(),
536 observability: super::observability::ObservabilityManager::new(),
537 #[cfg(feature = "telemetry")]
538 metrics: None,
539 })
540 }
541
542 pub async fn new_with_capacity(db_name: &str, capacity: usize) -> Result<Self, DatabaseError> {
543 let mut s = Self::new(db_name).await?;
544 s.capacity = capacity;
545 Ok(s)
546 }
547
548 pub async fn new_with_recovery_options(db_name: &str, recovery_opts: RecoveryOptions) -> Result<Self, DatabaseError> {
549 let mut storage = Self::new(db_name).await?;
550
551 storage.perform_startup_recovery(recovery_opts).await?;
552
553 Ok(storage)
554 }
555
556 pub fn get_recovery_report(&self) -> &RecoveryReport {
557 &self.recovery_report
558 }
559
560 async fn perform_startup_recovery(&mut self, opts: RecoveryOptions) -> Result<(), DatabaseError> {
561 super::recovery::perform_startup_recovery(self, opts).await
562 }
563
564 pub(super) async fn get_blocks_for_verification(&self, mode: &RecoveryMode) -> Result<Vec<u64>, DatabaseError> {
565 let all_blocks: Vec<u64> = self.allocated_blocks.iter().copied().collect();
566
567 match mode {
568 RecoveryMode::Full => Ok(all_blocks),
569 RecoveryMode::Sample { count } => {
570 let sample_count = (*count).min(all_blocks.len());
571 let mut sampled = all_blocks;
572 sampled.sort_unstable(); sampled.truncate(sample_count);
574 Ok(sampled)
575 }
576 RecoveryMode::Skip => Ok(Vec::new()),
577 }
578 }
579
580 pub(super) async fn verify_block_integrity(&mut self, block_id: u64) -> Result<bool, DatabaseError> {
581 let data = match self.read_block_from_storage(block_id).await {
583 Ok(data) => data,
584 Err(_) => {
585 log::warn!("Could not read block {} for integrity verification", block_id);
586 return Ok(false);
587 }
588 };
589
590 match self.verify_against_stored_checksum(block_id, &data) {
592 Ok(()) => Ok(true),
593 Err(e) => {
594 log::warn!("Block {} failed checksum verification: {}", block_id, e.message);
595 Ok(false)
596 }
597 }
598 }
599
600 async fn read_block_from_storage(&mut self, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
601 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
603 {
604 let mut blocks_dir = self.base_dir.clone();
605 blocks_dir.push(&self.db_name);
606 blocks_dir.push("blocks");
607 let block_file = blocks_dir.join(format!("block_{}.bin", block_id));
608
609 if let Ok(data) = std::fs::read(&block_file) {
610 if data.len() == BLOCK_SIZE {
611 return Ok(data);
612 }
613 }
614 }
615
616 #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
618 {
619 let mut found_data = None;
620 vfs_sync::with_global_storage(|storage| {
621 let storage_map = storage.borrow();
622 if let Some(db_storage) = storage_map.get(&self.db_name) {
623 if let Some(data) = db_storage.get(&block_id) {
624 found_data = Some(data.clone());
625 }
626 }
627 });
628 if let Some(data) = found_data {
629 return Ok(data);
630 }
631 }
632
633 #[cfg(target_arch = "wasm32")]
635 {
636 let mut found_data = None;
637 vfs_sync::with_global_storage(|storage| {
638 let storage_map = storage.borrow();
639 if let Some(db_storage) = storage_map.get(&self.db_name) {
640 if let Some(data) = db_storage.get(&block_id) {
641 found_data = Some(data.clone());
642 }
643 }
644 });
645 if let Some(data) = found_data {
646 return Ok(data);
647 }
648 }
649
650 Err(DatabaseError::new(
651 "BLOCK_NOT_FOUND",
652 &format!("Block {} not found in storage", block_id)
653 ))
654 }
655
656 pub(super) async fn repair_corrupted_block(&mut self, block_id: u64) -> Result<bool, DatabaseError> {
657 log::info!("Attempting to repair corrupted block {}", block_id);
658
659 self.cache.remove(&block_id);
664
665 self.checksum_manager.remove_checksum(block_id);
667
668 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
670 {
671 let mut blocks_dir = self.base_dir.clone();
672 blocks_dir.push(&self.db_name);
673 blocks_dir.push("blocks");
674 let block_file = blocks_dir.join(format!("block_{}.bin", block_id));
675 let _ = std::fs::remove_file(&block_file);
676 }
677
678 #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
680 {
681 vfs_sync::with_global_storage(|storage| {
682 let mut storage_map = storage.borrow_mut();
683 if let Some(db_storage) = storage_map.get_mut(&self.db_name) {
684 db_storage.remove(&block_id);
685 }
686 });
687 }
688
689 #[cfg(target_arch = "wasm32")]
691 {
692 vfs_sync::with_global_storage(|storage| {
693 let mut storage_map = storage.borrow_mut();
694 if let Some(db_storage) = storage_map.get_mut(&self.db_name) {
695 db_storage.remove(&block_id);
696 }
697 });
698 }
699
700 log::info!("Corrupted block {} has been removed (repair completed)", block_id);
701 Ok(true)
702 }
703
704 pub(super) fn touch_lru(&mut self, block_id: u64) {
705 if let Some(pos) = self.lru_order.iter().position(|&id| id == block_id) {
707 self.lru_order.remove(pos);
708 }
709 self.lru_order.push_back(block_id);
711 }
712
713 pub(super) fn evict_if_needed(&mut self) {
714 while self.cache.len() > self.capacity {
716 let dirty_guard = self.dirty_blocks.lock();
718 let victim_pos = self
719 .lru_order
720 .iter()
721 .position(|id| !dirty_guard.contains_key(id));
722
723 match victim_pos {
724 Some(pos) => {
725 let victim = self.lru_order.remove(pos).expect("valid pos");
726 self.cache.remove(&victim);
727 }
728 None => {
729 break;
731 }
732 }
733 }
734 }
735
736 #[inline]
737 #[cfg(target_arch = "wasm32")]
738 pub fn now_millis() -> u64 {
739 Date::now() as u64
741 }
742
743 #[inline]
744 #[cfg(not(target_arch = "wasm32"))]
745 pub fn now_millis() -> u64 {
746 let now = SystemTime::now()
747 .duration_since(UNIX_EPOCH)
748 .unwrap_or_else(|_| Duration::from_millis(0));
749 now.as_millis() as u64
750 }
751
752 pub(super) fn verify_against_stored_checksum(
753 &self,
754 block_id: u64,
755 data: &[u8],
756 ) -> Result<(), DatabaseError> {
757 self.checksum_manager.validate_checksum(block_id, data)
758 }
759
760 pub fn read_block_sync(&mut self, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
762 super::io_operations::read_block_sync_impl(self, block_id)
764 }
765
766 pub async fn read_block(&mut self, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
767 self.read_block_sync(block_id)
769 }
770
771 pub fn write_block_sync(&mut self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
773 super::io_operations::write_block_sync_impl(self, block_id, data)
775 }
776
777
778 pub async fn write_block(&mut self, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
779 self.write_block_sync(block_id, data)
781 }
782
783 pub fn write_blocks_sync(&mut self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
785 self.maybe_auto_sync();
786 for (block_id, data) in items {
787 self.write_block_sync(block_id, data)?;
788 }
789 Ok(())
790 }
791
792 pub async fn write_blocks(&mut self, items: Vec<(u64, Vec<u8>)>) -> Result<(), DatabaseError> {
794 self.write_blocks_sync(items)
795 }
796
797 pub fn read_blocks_sync(&mut self, block_ids: &[u64]) -> Result<Vec<Vec<u8>>, DatabaseError> {
799 self.maybe_auto_sync();
800 let mut results = Vec::with_capacity(block_ids.len());
801 for &id in block_ids {
802 results.push(self.read_block_sync(id)?);
803 }
804 Ok(results)
805 }
806
807 pub async fn read_blocks(&mut self, block_ids: &[u64]) -> Result<Vec<Vec<u8>>, DatabaseError> {
809 self.read_blocks_sync(block_ids)
810 }
811
812 pub fn get_block_checksum(&self, block_id: u64) -> Option<u32> {
814 self.checksum_manager.get_checksum(block_id).map(|checksum| checksum as u32)
815 }
816
817 #[cfg(target_arch = "wasm32")]
819 pub fn get_commit_marker(&self) -> u64 {
820 vfs_sync::with_global_commit_marker(|cm| {
821 cm.borrow().get(&self.db_name).copied().unwrap_or(0)
822 })
823 }
824
825 #[cfg(target_arch = "wasm32")]
827 pub fn has_any_blocks(&self) -> bool {
828 vfs_sync::with_global_storage(|gs| {
829 gs.borrow().get(&self.db_name).map_or(false, |blocks| !blocks.is_empty())
830 })
831 }
832
833 pub async fn verify_block_checksum(&mut self, block_id: u64) -> Result<(), DatabaseError> {
834 if let Some(bytes) = self.cache.get(&block_id).cloned() {
836 return self.verify_against_stored_checksum(block_id, &bytes);
837 }
838 let data = self.read_block_sync(block_id)?;
840 self.verify_against_stored_checksum(block_id, &data)
841 }
842
843 #[cfg(any(test, debug_assertions))]
844 pub fn get_block_metadata_for_testing(&self) -> HashMap<u64, (u64, u32, u64)> {
845 #[cfg(target_arch = "wasm32")]
847 {
848 let mut out = HashMap::new();
849 vfs_sync::with_global_metadata(|meta| {
850 let meta_map = meta.borrow();
851 if let Some(db_meta) = meta_map.get(&self.db_name) {
852 for (bid, m) in db_meta.iter() {
853 out.insert(*bid, (m.checksum, m.version, m.last_modified_ms));
854 }
855 }
856 });
857 out
858 }
859 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
860 {
861 use std::io::Read;
862 let mut out = HashMap::new();
863 let base: PathBuf = self.base_dir.clone();
864 let mut db_dir = base.clone();
865 db_dir.push(&self.db_name);
866 let mut meta_path = db_dir.clone();
867 meta_path.push("metadata.json");
868 if let Ok(mut f) = std::fs::File::open(&meta_path) {
869 let mut s = String::new();
870 if f.read_to_string(&mut s).is_ok() {
871 if let Ok(parsed) = serde_json::from_str::<FsMeta>(&s) {
872 for (bid, m) in parsed.entries.into_iter() { out.insert(bid, (m.checksum, m.version, m.last_modified_ms)); }
873 }
874 }
875 }
876 out
877 }
878 #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
879 {
880 let mut out = HashMap::new();
881 GLOBAL_METADATA_TEST.with(|meta| {
882 let meta_map = meta.borrow();
883 if let Some(db_meta) = meta_map.get(&self.db_name) {
884 for (bid, m) in db_meta.iter() {
885 out.insert(*bid, (m.checksum, m.version, m.last_modified_ms));
886 }
887 }
888 });
889 out
890 }
891 }
892
893 #[cfg(any(test, debug_assertions))]
894 pub fn set_block_checksum_for_testing(&mut self, block_id: u64, checksum: u64) {
895 self.checksum_manager.set_checksum_for_testing(block_id, checksum);
896 }
897
898 #[cfg(not(target_arch = "wasm32"))]
900 pub(super) fn get_dirty_blocks(&self) -> &Arc<Mutex<HashMap<u64, Vec<u8>>>> {
901 &self.dirty_blocks
902 }
903
904
905 pub async fn sync(&mut self) -> Result<(), DatabaseError> {
906 #[cfg(target_arch = "wasm32")]
908 {
909 let result = self.sync_implementation();
911 wasm_bindgen_futures::JsFuture::from(js_sys::Promise::resolve(&wasm_bindgen::JsValue::UNDEFINED)).await.ok();
913 result
914 }
915 #[cfg(not(target_arch = "wasm32"))]
916 {
917 self.sync_implementation()
918 }
919 }
920
921 pub fn sync_now(&mut self) -> Result<(), DatabaseError> {
923 self.sync_implementation()
924 }
925
926 fn sync_implementation(&mut self) -> Result<(), DatabaseError> {
928 super::sync_operations::sync_implementation_impl(self)
929 }
930
931 #[cfg(target_arch = "wasm32")]
934 pub fn sync_blocks_only(&mut self) -> Result<(), DatabaseError> {
935 super::wasm_vfs_sync::sync_blocks_only(self)
936 }
937
938
939 #[cfg(target_arch = "wasm32")]
941 pub async fn sync_async(&mut self) -> Result<(), DatabaseError> {
942 super::wasm_indexeddb::sync_async(self).await
943 }
944
945 pub fn drain_and_shutdown(&mut self) {
948 if let Err(e) = self.sync_now() {
949 log::error!("drain_and_shutdown: sync_now failed: {}", e.message);
950 }
951 self.auto_sync_interval = None;
952 #[cfg(not(target_arch = "wasm32"))]
953 {
954 if let Some(stop) = &self.auto_sync_stop {
955 stop.store(true, Ordering::SeqCst);
956 }
957 if let Some(handle) = self.auto_sync_thread.take() {
958 let _ = handle.join();
959 }
960 if let Some(handle) = self.debounce_thread.take() {
961 let _ = handle.join();
962 }
963 if let Some(task) = self.tokio_timer_task.take() { task.abort(); }
964 if let Some(task) = self.tokio_debounce_task.take() { task.abort(); }
965 self.auto_sync_stop = None;
966 self.threshold_hit.store(false, Ordering::SeqCst);
967 }
968 }
969
970 pub fn clear_cache(&mut self) {
971 self.cache.clear();
972 self.lru_order.clear();
973 }
974
975 pub async fn on_database_import(&mut self) -> Result<(), DatabaseError> {
995 log::info!("Clearing cache for database '{}' after import", self.db_name);
996
997 self.clear_cache();
999
1000 self.dirty_blocks.lock().clear();
1002
1003 self.checksum_manager.clear_checksums();
1005
1006 #[cfg(target_arch = "wasm32")]
1008 {
1009 use super::vfs_sync::with_global_allocation_map;
1010 self.allocated_blocks = with_global_allocation_map(|gam| {
1011 gam.borrow()
1012 .get(&self.db_name)
1013 .cloned()
1014 .unwrap_or_else(std::collections::HashSet::new)
1015 });
1016 log::debug!("Reloaded {} allocated blocks from global allocation map", self.allocated_blocks.len());
1017
1018 log::debug!("Checksum data will be reloaded from metadata on next verification");
1020 }
1021
1022 #[cfg(not(target_arch = "wasm32"))]
1023 {
1024 #[cfg(feature = "fs_persist")]
1025 {
1026 let mut alloc_path = self.base_dir.clone();
1028 alloc_path.push(&self.db_name);
1029 alloc_path.push("allocations.json");
1030
1031 if let Ok(content) = std::fs::read_to_string(&alloc_path) {
1032 if let Ok(alloc_data) = serde_json::from_str::<serde_json::Value>(&content) {
1033 if let Some(allocated_array) = alloc_data["allocated"].as_array() {
1034 self.allocated_blocks.clear();
1035 for block_id_val in allocated_array {
1036 if let Some(block_id) = block_id_val.as_u64() {
1037 self.allocated_blocks.insert(block_id);
1038 }
1039 }
1040 log::debug!("Reloaded {} allocated blocks from filesystem", self.allocated_blocks.len());
1041 }
1042 }
1043 }
1044
1045 let mut meta_path = self.base_dir.clone();
1047 meta_path.push(&self.db_name);
1048 meta_path.push("metadata.json");
1049
1050 if let Ok(content) = std::fs::read_to_string(&meta_path) {
1051 if let Ok(meta_data) = serde_json::from_str::<serde_json::Value>(&content) {
1052 if let Some(entries) = meta_data["entries"].as_array() {
1053 let mut new_checksums = HashMap::new();
1054 let mut new_algos = HashMap::new();
1055
1056 for entry in entries {
1057 if let (Some(block_id), Some(checksum), Some(algo_str)) = (
1058 entry[0].as_u64(),
1059 entry[1]["checksum"].as_u64(),
1060 entry[1]["algo"].as_str(),
1061 ) {
1062 new_checksums.insert(block_id, checksum);
1063
1064 let algo = match algo_str {
1065 "CRC32" => super::metadata::ChecksumAlgorithm::CRC32,
1066 _ => super::metadata::ChecksumAlgorithm::FastHash,
1067 };
1068 new_algos.insert(block_id, algo);
1069 }
1070 }
1071
1072 self.checksum_manager.replace_all(new_checksums.clone(), new_algos);
1073 log::debug!("Reloaded {} checksums from filesystem metadata", new_checksums.len());
1074 }
1075 }
1076 } else {
1077 log::debug!("No metadata file found, checksums will be empty after import");
1078 }
1079 }
1080
1081 #[cfg(not(feature = "fs_persist"))]
1082 {
1083 use super::vfs_sync::with_global_allocation_map;
1085
1086 self.allocated_blocks = with_global_allocation_map(|gam| {
1087 gam.borrow()
1088 .get(&self.db_name)
1089 .cloned()
1090 .unwrap_or_else(std::collections::HashSet::new)
1091 });
1092 log::debug!("Reloaded {} allocated blocks from global allocation map (native test)", self.allocated_blocks.len());
1093
1094 log::debug!("Checksum data will be reloaded from metadata on next verification");
1096 }
1097 }
1098
1099 log::info!("Cache and allocation state refreshed for '{}'", self.db_name);
1100
1101 Ok(())
1102 }
1103
1104 #[cfg(target_arch = "wasm32")]
1106 pub fn reload_cache_from_global_storage(&mut self) {
1107 use crate::storage::vfs_sync::{with_global_storage, with_global_metadata};
1108 let fresh_cache = with_global_storage(|gs| {
1109 let storage_map = gs.borrow();
1110 if let Some(db_storage) = storage_map.get(&self.db_name) {
1111 db_storage.clone()
1112 } else {
1113 std::collections::HashMap::new()
1114 }
1115 });
1116
1117 with_global_metadata(|gm| {
1120 let metadata_map = gm.borrow();
1121 if let Some(db_metadata) = metadata_map.get(&self.db_name) {
1122 self.checksum_manager.clear_checksums();
1124
1125 let mut new_checksums = std::collections::HashMap::new();
1127 let mut new_algos = std::collections::HashMap::new();
1128 for (block_id, meta) in db_metadata {
1129 new_checksums.insert(*block_id, meta.checksum);
1130 new_algos.insert(*block_id, meta.algo);
1131 }
1132 self.checksum_manager.replace_all(new_checksums, new_algos);
1133 } else {
1134 self.checksum_manager.clear_checksums();
1136 }
1137 });
1138
1139 let old_lru = std::mem::replace(&mut self.lru_order, std::collections::VecDeque::new());
1141 self.cache.clear();
1142
1143 for (block_id, block_data) in fresh_cache {
1145 self.cache.insert(block_id, block_data);
1146 }
1147
1148 for block_id in old_lru {
1150 if self.cache.contains_key(&block_id) {
1151 self.lru_order.push_back(block_id);
1152 }
1153 }
1154
1155 for &block_id in self.cache.keys() {
1157 if !self.lru_order.contains(&block_id) {
1158 self.lru_order.push_back(block_id);
1159 }
1160 }
1161 }
1162
1163 pub fn get_cache_size(&self) -> usize {
1164 self.cache.len()
1165 }
1166
1167 pub fn get_dirty_count(&self) -> usize {
1168 self.dirty_blocks.lock().len()
1169 }
1170
1171 pub fn is_cached(&self, block_id: u64) -> bool {
1172 self.cache.contains_key(&block_id)
1173 }
1174
1175 pub async fn allocate_block(&mut self) -> Result<u64, DatabaseError> {
1177 super::allocation::allocate_block_impl(self).await
1178 }
1179
1180 pub async fn deallocate_block(&mut self, block_id: u64) -> Result<(), DatabaseError> {
1182 super::allocation::deallocate_block_impl(self, block_id).await
1183 }
1184
1185 pub fn get_allocated_count(&self) -> usize {
1187 self.allocated_blocks.len()
1188 }
1189
1190 #[cfg(target_arch = "wasm32")]
1194 pub async fn crash_simulation_sync(&mut self, blocks_written: bool) -> Result<(), DatabaseError> {
1195 log::info!("CRASH SIMULATION: Starting crash simulation with blocks_written={}", blocks_written);
1196
1197 if blocks_written {
1198 let dirty_blocks = {
1203 let dirty = self.dirty_blocks.lock();
1204 dirty.clone()
1205 };
1206
1207 if !dirty_blocks.is_empty() {
1208 log::info!("CRASH SIMULATION: Writing {} blocks to IndexedDB before crash", dirty_blocks.len());
1209
1210 let metadata_to_persist: Vec<(u64, u64)> = dirty_blocks
1212 .keys()
1213 .map(|&block_id| {
1214 let next_commit = self.get_commit_marker() + 1;
1215 (block_id, next_commit)
1216 })
1217 .collect();
1218
1219 log::debug!("CRASH SIMULATION: About to call persist_to_indexeddb for {} blocks", dirty_blocks.len());
1220
1221 super::wasm_indexeddb::persist_to_indexeddb(
1223 &self.db_name,
1224 dirty_blocks,
1225 metadata_to_persist,
1226 ).await?;
1227
1228 log::info!("CRASH SIMULATION: persist_to_indexeddb completed successfully");
1229 log::info!("CRASH SIMULATION: Blocks written to IndexedDB, simulating crash before commit marker advance");
1230
1231 self.dirty_blocks.lock().clear();
1233
1234 return Ok(());
1238 } else {
1239 log::info!("CRASH SIMULATION: No dirty blocks to write");
1240 return Ok(());
1241 }
1242 } else {
1243 log::info!("CRASH SIMULATION: Simulating crash before blocks are written to IndexedDB");
1245
1246 return Ok(());
1248 }
1249 }
1250
1251 #[cfg(target_arch = "wasm32")]
1254 pub async fn crash_simulation_partial_sync(&mut self, blocks_to_write: &[u64]) -> Result<(), DatabaseError> {
1255 log::info!("CRASH SIMULATION: Starting partial crash simulation for {} blocks", blocks_to_write.len());
1256
1257 let dirty_blocks = {
1258 let dirty = self.dirty_blocks.lock();
1259 dirty.clone()
1260 };
1261
1262 let partial_blocks: std::collections::HashMap<u64, Vec<u8>> = dirty_blocks
1264 .into_iter()
1265 .filter(|(block_id, _)| blocks_to_write.contains(block_id))
1266 .collect();
1267
1268 if !partial_blocks.is_empty() {
1269 log::info!("CRASH SIMULATION: Writing {} out of {} blocks before crash",
1270 partial_blocks.len(), blocks_to_write.len());
1271
1272 let metadata_to_persist: Vec<(u64, u64)> = partial_blocks
1273 .keys()
1274 .map(|&block_id| {
1275 let next_commit = self.get_commit_marker() + 1;
1276 (block_id, next_commit)
1277 })
1278 .collect();
1279
1280 super::wasm_indexeddb::persist_to_indexeddb(
1282 &self.db_name,
1283 partial_blocks.clone(),
1284 metadata_to_persist,
1285 ).await?;
1286
1287 {
1289 let mut dirty = self.dirty_blocks.lock();
1290 for block_id in partial_blocks.keys() {
1291 dirty.remove(block_id);
1292 }
1293 }
1294
1295 log::info!("CRASH SIMULATION: Partial blocks written, simulating crash before commit marker advance");
1296
1297 }
1299
1300 Ok(())
1301 }
1302
1303 #[cfg(target_arch = "wasm32")]
1307 pub async fn perform_crash_recovery(&mut self) -> Result<CrashRecoveryAction, DatabaseError> {
1308 log::info!("CRASH RECOVERY: Starting crash recovery scan for database: {}", self.db_name);
1309
1310 let current_marker = self.get_commit_marker();
1312 log::info!("CRASH RECOVERY: Current commit marker: {}", current_marker);
1313
1314 let inconsistent_blocks = self.scan_for_inconsistent_blocks(current_marker).await?;
1317
1318 if inconsistent_blocks.is_empty() {
1319 log::info!("CRASH RECOVERY: No inconsistent blocks found, system is consistent");
1320 return Ok(CrashRecoveryAction::NoActionNeeded);
1321 }
1322
1323 log::info!("CRASH RECOVERY: Found {} inconsistent blocks that need recovery", inconsistent_blocks.len());
1324
1325 let recovery_action = self.determine_recovery_action(&inconsistent_blocks).await?;
1327
1328 match recovery_action {
1329 CrashRecoveryAction::Rollback => {
1330 log::info!("CRASH RECOVERY: Performing rollback of incomplete transaction");
1331 self.rollback_incomplete_transaction(&inconsistent_blocks).await?;
1332 }
1333 CrashRecoveryAction::Finalize => {
1334 log::info!("CRASH RECOVERY: Performing finalization of complete transaction");
1335 self.finalize_complete_transaction(&inconsistent_blocks).await?;
1336 }
1337 CrashRecoveryAction::NoActionNeeded => {
1338 }
1340 }
1341
1342 log::info!("CRASH RECOVERY: Recovery completed successfully");
1343 Ok(recovery_action)
1344 }
1345
1346 #[cfg(target_arch = "wasm32")]
1348 async fn scan_for_inconsistent_blocks(&self, commit_marker: u64) -> Result<Vec<(u64, u64)>, DatabaseError> {
1349 log::info!("CRASH RECOVERY: Scanning for blocks with version > {}", commit_marker);
1350
1351 let mut inconsistent_blocks = Vec::new();
1354
1355 vfs_sync::with_global_metadata(|meta| {
1356 let meta_map = meta.borrow();
1357 if let Some(db_meta) = meta_map.get(&self.db_name) {
1358 for (block_id, metadata) in db_meta.iter() {
1359 if metadata.version as u64 > commit_marker {
1360 log::info!("CRASH RECOVERY: Found inconsistent block {} with version {} > marker {}",
1361 block_id, metadata.version, commit_marker);
1362 inconsistent_blocks.push((*block_id, metadata.version as u64));
1363 }
1364 }
1365 }
1366 });
1367
1368 Ok(inconsistent_blocks)
1369 }
1370
1371 #[cfg(target_arch = "wasm32")]
1373 async fn determine_recovery_action(&self, inconsistent_blocks: &[(u64, u64)]) -> Result<CrashRecoveryAction, DatabaseError> {
1374 let expected_next_commit = self.get_commit_marker() + 1;
1379 let all_same_version = inconsistent_blocks
1380 .iter()
1381 .all(|(_, version)| *version == expected_next_commit);
1382
1383 if all_same_version && !inconsistent_blocks.is_empty() {
1384 log::info!("CRASH RECOVERY: All inconsistent blocks have expected version {}, finalizing transaction", expected_next_commit);
1385 Ok(CrashRecoveryAction::Finalize)
1386 } else {
1387 log::info!("CRASH RECOVERY: Inconsistent block versions detected, rolling back transaction");
1388 Ok(CrashRecoveryAction::Rollback)
1389 }
1390 }
1391
1392 #[cfg(target_arch = "wasm32")]
1394 async fn rollback_incomplete_transaction(&mut self, inconsistent_blocks: &[(u64, u64)]) -> Result<(), DatabaseError> {
1395 log::info!("CRASH RECOVERY: Rolling back {} inconsistent blocks", inconsistent_blocks.len());
1396
1397 vfs_sync::with_global_metadata(|meta| {
1399 let mut meta_map = meta.borrow_mut();
1400 if let Some(db_meta) = meta_map.get_mut(&self.db_name) {
1401 for (block_id, _) in inconsistent_blocks {
1402 log::info!("CRASH RECOVERY: Removing inconsistent block {} from metadata", block_id);
1403 db_meta.remove(block_id);
1404 }
1405 }
1406 });
1407
1408 vfs_sync::with_global_storage(|gs| {
1410 let mut storage_map = gs.borrow_mut();
1411 if let Some(db_storage) = storage_map.get_mut(&self.db_name) {
1412 for (block_id, _) in inconsistent_blocks {
1413 log::info!("CRASH RECOVERY: Removing inconsistent block {} from global storage", block_id);
1414 db_storage.remove(block_id);
1415 }
1416 }
1417 });
1418
1419 for (block_id, _) in inconsistent_blocks {
1421 self.cache.remove(block_id);
1422 self.lru_order.retain(|&id| id != *block_id);
1424 }
1425
1426 let block_ids_to_delete: Vec<u64> = inconsistent_blocks.iter().map(|(id, _)| *id).collect();
1428 if !block_ids_to_delete.is_empty() {
1429 log::info!("CRASH RECOVERY: Deleting {} blocks from IndexedDB", block_ids_to_delete.len());
1430 super::wasm_indexeddb::delete_blocks_from_indexeddb(&self.db_name, &block_ids_to_delete).await?;
1431 log::info!("CRASH RECOVERY: Successfully deleted blocks from IndexedDB");
1432 }
1433
1434 log::info!("CRASH RECOVERY: Rollback completed");
1435 Ok(())
1436 }
1437
1438 #[cfg(target_arch = "wasm32")]
1440 async fn finalize_complete_transaction(&mut self, inconsistent_blocks: &[(u64, u64)]) -> Result<(), DatabaseError> {
1441 log::info!("CRASH RECOVERY: Finalizing transaction for {} blocks", inconsistent_blocks.len());
1442
1443 if let Some((_, target_version)) = inconsistent_blocks.first() {
1445 let new_commit_marker = *target_version;
1446
1447 vfs_sync::with_global_commit_marker(|cm| {
1449 cm.borrow_mut().insert(self.db_name.clone(), new_commit_marker);
1450 });
1451
1452 log::info!("CRASH RECOVERY: Advanced commit marker from {} to {}",
1453 self.get_commit_marker(), new_commit_marker);
1454
1455 for (block_id, _) in inconsistent_blocks {
1457 if let Ok(data) = self.read_block_sync(*block_id) {
1459 self.checksum_manager.store_checksum(*block_id, &data);
1460 log::info!("CRASH RECOVERY: Updated checksum for finalized block {}", block_id);
1461 }
1462 }
1463 }
1464
1465 log::info!("CRASH RECOVERY: Finalization completed");
1466 Ok(())
1467 }
1468
1469 #[cfg(target_arch = "wasm32")]
1473 pub async fn start_leader_election(&mut self) -> Result<(), DatabaseError> {
1474 if self.leader_election.is_none() {
1475 let mut manager = super::leader_election::LeaderElectionManager::new(self.db_name.clone());
1476 manager.start_election().await?;
1477 self.leader_election = Some(manager);
1478 } else {
1479 if let Some(ref mut manager) = self.leader_election {
1481 manager.force_become_leader().await?;
1482 }
1483 }
1484 Ok(())
1485 }
1486
1487 #[cfg(target_arch = "wasm32")]
1489 pub async fn is_leader(&mut self) -> bool {
1490 if self.leader_election.is_none() {
1492 log::debug!("Starting leader election for {}", self.db_name);
1493 if let Err(e) = self.start_leader_election().await {
1494 log::error!("Failed to start leader election: {:?}", e);
1495 return false;
1496 }
1497 }
1498
1499 if let Some(ref mut manager) = self.leader_election {
1500 let is_leader = manager.is_leader().await;
1501
1502 if !is_leader {
1504 let state = manager.state.borrow();
1505 if state.leader_id.is_none() {
1506 log::debug!("No current leader for {} - triggering re-election", self.db_name);
1507 drop(state);
1508 let _ = manager.try_become_leader().await;
1509
1510 let new_is_leader = manager.state.borrow().is_leader;
1512 if new_is_leader && manager.heartbeat_interval.is_none() {
1513 let _ = manager.start_heartbeat();
1514 }
1515
1516 log::debug!("is_leader() for {} = {} (after re-election)", self.db_name, new_is_leader);
1517 return new_is_leader;
1518 }
1519 }
1520
1521 log::debug!("is_leader() for {} = {}", self.db_name, is_leader);
1522 is_leader
1523 } else {
1524 log::debug!("No leader election manager for {}", self.db_name);
1525 false
1526 }
1527 }
1528
1529 #[cfg(target_arch = "wasm32")]
1531 pub async fn stop_leader_election(&mut self) -> Result<(), DatabaseError> {
1532 if let Some(mut manager) = self.leader_election.take() {
1533 manager.stop_election().await?;
1534 }
1535 Ok(())
1536 }
1537
1538 #[cfg(target_arch = "wasm32")]
1540 pub async fn send_leader_heartbeat(&self) -> Result<(), DatabaseError> {
1541 if let Some(ref manager) = self.leader_election {
1542 manager.send_heartbeat().await
1543 } else {
1544 Err(DatabaseError::new("LEADER_ELECTION_ERROR", "Leader election not started"))
1545 }
1546 }
1547
1548 #[cfg(target_arch = "wasm32")]
1550 pub async fn get_last_leader_heartbeat(&self) -> Result<u64, DatabaseError> {
1551 if let Some(ref manager) = self.leader_election {
1552 Ok(manager.get_last_heartbeat().await)
1553 } else {
1554 Err(DatabaseError::new("LEADER_ELECTION_ERROR", "Leader election not started"))
1555 }
1556 }
1557
1558 pub fn get_metrics(&self) -> super::observability::StorageMetrics {
1562 let dirty_count = self.get_dirty_count();
1563 let dirty_bytes = dirty_count * BLOCK_SIZE;
1564
1565 #[cfg(not(target_arch = "wasm32"))]
1566 let (sync_count, timer_sync_count, debounce_sync_count, last_sync_duration_ms) = {
1567 (
1568 self.sync_count.load(Ordering::SeqCst),
1569 self.timer_sync_count.load(Ordering::SeqCst),
1570 self.debounce_sync_count.load(Ordering::SeqCst),
1571 self.last_sync_duration_ms.load(Ordering::SeqCst),
1572 )
1573 };
1574
1575 #[cfg(target_arch = "wasm32")]
1576 let (sync_count, timer_sync_count, debounce_sync_count, last_sync_duration_ms) = {
1577 (self.observability.get_sync_count(), 0, 0, 1)
1579 };
1580
1581 let error_count = self.observability.get_error_count();
1582 let checksum_failures = self.observability.get_checksum_failures();
1583
1584 let total_operations = sync_count + error_count;
1586 let (throughput_blocks_per_sec, throughput_bytes_per_sec) =
1587 self.observability.calculate_throughput(last_sync_duration_ms);
1588 let error_rate = self.observability.calculate_error_rate(total_operations);
1589
1590 super::observability::StorageMetrics {
1591 dirty_count,
1592 dirty_bytes,
1593 sync_count,
1594 timer_sync_count,
1595 debounce_sync_count,
1596 error_count,
1597 checksum_failures,
1598 last_sync_duration_ms,
1599 throughput_blocks_per_sec,
1600 throughput_bytes_per_sec,
1601 error_rate,
1602 }
1603 }
1604
1605 #[cfg(not(target_arch = "wasm32"))]
1607 pub fn set_sync_callbacks(
1608 &mut self,
1609 on_sync_start: super::observability::SyncStartCallback,
1610 on_sync_success: super::observability::SyncSuccessCallback,
1611 on_sync_failure: super::observability::SyncFailureCallback,
1612 ) {
1613 self.observability.sync_start_callback = Some(on_sync_start);
1614 self.observability.sync_success_callback = Some(on_sync_success);
1615 self.observability.sync_failure_callback = Some(on_sync_failure);
1616 }
1617
1618 #[cfg(not(target_arch = "wasm32"))]
1620 pub fn set_backpressure_callback(&mut self, callback: super::observability::BackpressureCallback) {
1621 self.observability.backpressure_callback = Some(callback);
1622 }
1623
1624 #[cfg(not(target_arch = "wasm32"))]
1626 pub fn set_error_callback(&mut self, callback: super::observability::ErrorCallback) {
1627 self.observability.error_callback = Some(callback);
1628 }
1629
1630 #[cfg(target_arch = "wasm32")]
1632 pub fn set_sync_success_callback(&mut self, callback: super::observability::WasmSyncSuccessCallback) {
1633 self.observability.wasm_sync_success_callback = Some(callback);
1634 }
1635
1636 pub fn is_auto_sync_enabled(&self) -> bool {
1638 self.auto_sync_interval.is_some()
1639 }
1640
1641 pub fn get_sync_policy(&self) -> Option<super::SyncPolicy> {
1643 self.policy.clone()
1644 }
1645
1646 pub async fn force_sync(&mut self) -> Result<(), DatabaseError> {
1652 log::info!("force_sync: Starting forced synchronization with durability guarantees");
1653
1654 let dirty_count = self.get_dirty_count();
1655 if dirty_count == 0 {
1656 log::debug!("force_sync: No dirty blocks to sync");
1657 return Ok(());
1658 }
1659
1660 log::info!("force_sync: Syncing {} dirty blocks with durability guarantee", dirty_count);
1661
1662 self.sync().await?;
1664
1665 log::info!("force_sync: Successfully completed forced synchronization");
1666 Ok(())
1667 }
1668
1669 #[cfg(feature = "telemetry")]
1671 pub fn set_metrics(&mut self, metrics: Option<crate::telemetry::Metrics>) {
1672 self.metrics = metrics;
1673 }
1674
1675 #[cfg(feature = "telemetry")]
1677 pub fn metrics(&self) -> Option<&crate::telemetry::Metrics> {
1678 self.metrics.as_ref()
1679 }
1680
1681 #[cfg(feature = "telemetry")]
1683 pub fn new_for_test() -> Self {
1684 Self {
1685 cache: HashMap::new(),
1686 dirty_blocks: Arc::new(parking_lot::Mutex::new(HashMap::new())),
1687 allocated_blocks: HashSet::new(),
1688 deallocated_blocks: HashSet::new(),
1689 next_block_id: 1,
1690 capacity: 128,
1691 lru_order: VecDeque::new(),
1692 checksum_manager: crate::storage::metadata::ChecksumManager::new(
1693 crate::storage::metadata::ChecksumAlgorithm::FastHash
1694 ),
1695 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
1696 base_dir: std::path::PathBuf::from("/tmp/test"),
1697 db_name: "test.db".to_string(),
1698 auto_sync_interval: None,
1699 #[cfg(not(target_arch = "wasm32"))]
1700 last_auto_sync: std::time::Instant::now(),
1701 policy: None,
1702 #[cfg(not(target_arch = "wasm32"))]
1703 auto_sync_stop: None,
1704 #[cfg(not(target_arch = "wasm32"))]
1705 auto_sync_thread: None,
1706 #[cfg(not(target_arch = "wasm32"))]
1707 debounce_thread: None,
1708 #[cfg(not(target_arch = "wasm32"))]
1709 tokio_timer_task: None,
1710 #[cfg(not(target_arch = "wasm32"))]
1711 tokio_debounce_task: None,
1712 #[cfg(not(target_arch = "wasm32"))]
1713 last_write_ms: Arc::new(std::sync::atomic::AtomicU64::new(0)),
1714 #[cfg(not(target_arch = "wasm32"))]
1715 threshold_hit: Arc::new(std::sync::atomic::AtomicBool::new(false)),
1716 #[cfg(not(target_arch = "wasm32"))]
1717 sync_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
1718 #[cfg(not(target_arch = "wasm32"))]
1719 timer_sync_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
1720 #[cfg(not(target_arch = "wasm32"))]
1721 debounce_sync_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
1722 #[cfg(not(target_arch = "wasm32"))]
1723 last_sync_duration_ms: Arc::new(std::sync::atomic::AtomicU64::new(0)),
1724 #[cfg(not(target_arch = "wasm32"))]
1725 sync_sender: None,
1726 #[cfg(not(target_arch = "wasm32"))]
1727 sync_receiver: None,
1728 recovery_report: RecoveryReport::default(),
1729 #[cfg(target_arch = "wasm32")]
1730 leader_election: None,
1731 observability: super::observability::ObservabilityManager::new(),
1732 metrics: None,
1733 }
1734 }
1735}
1736
#[cfg(all(test, target_arch = "wasm32"))]
mod wasm_commit_marker_tests {
    //! Browser tests for commit-marker gating: blocks whose version is ahead of the
    //! global commit marker must read as zeroes and must not be checksum-verified.
    use super::*;
    use wasm_bindgen_test::*;

    wasm_bindgen_test_configure!(run_in_browser);

    /// Forces the global commit marker for `db` to `v`, bypassing `sync()`.
    fn set_commit_marker(db: &str, v: u64) {
        vfs_sync::with_global_commit_marker(|markers| {
            markers.borrow_mut().insert(db.to_string(), v);
        });
    }

    /// Reads the global commit marker for `db`, defaulting to 0 when none is recorded.
    fn get_commit_marker(db: &str) -> u64 {
        vfs_sync::with_global_commit_marker(|markers| {
            markers.borrow().get(db).copied().unwrap_or(0)
        })
    }

    #[wasm_bindgen_test]
    async fn gating_returns_zeroed_until_marker_catches_up_wasm() {
        let db = "cm_gating_wasm";
        let mut storage = BlockStorage::new(db).await.expect("create storage");

        let block = storage.allocate_block().await.expect("alloc block");
        let payload = vec![0x33u8; BLOCK_SIZE];
        storage.write_block(block, payload.clone()).await.expect("write v1");

        // Cold read before sync: the write is not yet covered by the marker.
        storage.clear_cache();
        let before = storage.read_block(block).await.expect("read before commit");
        assert_eq!(before, vec![0u8; BLOCK_SIZE], "uncommitted data must read as zeroed");

        // Cold read after sync: the marker has caught up.
        storage.sync().await.expect("sync v1");
        storage.clear_cache();
        let after = storage.read_block(block).await.expect("read after commit");
        assert_eq!(after, payload, "committed data should be visible");
    }

    #[wasm_bindgen_test]
    async fn invisible_blocks_skip_checksum_verification_wasm() {
        let db = "cm_checksum_skip_wasm";
        let mut storage = BlockStorage::new(db).await.expect("create storage");

        let block = storage.allocate_block().await.expect("alloc block");
        storage
            .write_block(block, vec![0x44u8; BLOCK_SIZE])
            .await
            .expect("write v1");
        storage.sync().await.expect("sync v1");

        // Roll the marker back so the synced block is ahead of it again.
        set_commit_marker(db, 0);

        // Plant a bogus checksum; while invisible the block must not be verified.
        storage.set_block_checksum_for_testing(block, 1234567);
        storage.clear_cache();
        let hidden = storage
            .read_block(block)
            .await
            .expect("read while invisible should not error");
        assert_eq!(hidden, vec![0u8; BLOCK_SIZE], "invisible block reads as zeroed");

        // Once visible again, the bogus checksum must be detected.
        set_commit_marker(db, 1);
        storage.clear_cache();
        let failure = storage
            .read_block(block)
            .await
            .expect_err("expected checksum mismatch once visible");
        assert_eq!(failure.code, "CHECKSUM_MISMATCH");
    }

    #[wasm_bindgen_test]
    async fn commit_marker_advances_and_versions_track_syncs_wasm() {
        let db = "cm_versions_wasm";
        let mut storage = BlockStorage::new_with_capacity(db, 8)
            .await
            .expect("create storage");

        let first = storage.allocate_block().await.expect("alloc b1");
        let second = storage.allocate_block().await.expect("alloc b2");

        storage.write_block(first, vec![1u8; BLOCK_SIZE]).await.expect("write b1 v1");
        storage.write_block(second, vec![2u8; BLOCK_SIZE]).await.expect("write b2 v1");
        storage.sync().await.expect("sync #1");

        let marker_after_first = get_commit_marker(db);
        assert_eq!(marker_after_first, 1, "first sync should advance commit marker to 1");
        let meta_after_first = storage.get_block_metadata_for_testing();
        assert_eq!(meta_after_first.get(&first).unwrap().1 as u64, marker_after_first);
        assert_eq!(meta_after_first.get(&second).unwrap().1 as u64, marker_after_first);

        // Only `first` is rewritten, so only it should pick up the new version.
        storage.write_block(first, vec![3u8; BLOCK_SIZE]).await.expect("write b1 v2");
        storage.sync().await.expect("sync #2");

        let marker_after_second = get_commit_marker(db);
        assert_eq!(marker_after_second, 2, "second sync should advance commit marker to 2");
        let meta_after_second = storage.get_block_metadata_for_testing();
        assert_eq!(meta_after_second.get(&first).unwrap().1 as u64, marker_after_second, "updated block tracks new version");
        assert_eq!(meta_after_second.get(&second).unwrap().1 as u64, 1, "unchanged block retains prior version");
    }
}
1838
#[cfg(all(test, not(target_arch = "wasm32"), not(feature = "fs_persist")))]
mod commit_marker_tests {
    //! Native (in-memory VFS) tests for commit-marker gating: blocks whose version is
    //! ahead of the global commit marker must read as zeroes and must not be
    //! checksum-verified.
    use super::*;

    /// Forces the global commit marker for `db` to `v`, bypassing `sync()`.
    fn set_commit_marker(db: &str, v: u64) {
        super::vfs_sync::with_global_commit_marker(|cm| {
            cm.borrow_mut().insert(db.to_string(), v);
        });
    }

    /// Reads the global commit marker for `db`, defaulting to 0 when none is recorded.
    fn get_commit_marker(db: &str) -> u64 {
        super::vfs_sync::with_global_commit_marker(|cm| cm.borrow().get(db).copied().unwrap_or(0))
    }

    #[tokio::test(flavor = "current_thread")]
    async fn gating_returns_zeroed_until_marker_catches_up() {
        let db = "cm_gating_basic";
        let mut s = BlockStorage::new(db).await.expect("create storage");

        let bid = s.allocate_block().await.expect("alloc block");
        let data_v1 = vec![0x11u8; BLOCK_SIZE];
        s.write_block(bid, data_v1.clone()).await.expect("write v1");

        // Cold read before sync: the write is not yet covered by the commit marker.
        s.clear_cache();
        let out0 = s.read_block(bid).await.expect("read before commit");
        assert_eq!(out0, vec![0u8; BLOCK_SIZE], "uncommitted data must read as zeroed");

        s.sync().await.expect("sync v1");

        // Cold read after sync: the committed data must now be visible.
        s.clear_cache();
        let out1 = s.read_block(bid).await.expect("read after commit");

        // `assert!` instead of `assert_eq!` so a failure does not dump two whole blocks.
        // The message carries the diagnostics the old DEBUG prints produced. The prefix
        // slice is bounds-safe even for a short read.
        assert!(
            out1 == data_v1,
            "Data mismatch: expected committed data to be visible after sync \
             (commit marker = {}, read back all zeros = {}, first bytes = {:?}, len = {})",
            get_commit_marker(db),
            out1.iter().all(|&b| b == 0),
            &out1[..out1.len().min(8)],
            out1.len()
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn invisible_blocks_skip_checksum_verification() {
        let db = "cm_checksum_skip";
        let mut s = BlockStorage::new(db).await.expect("create storage");

        let bid = s.allocate_block().await.expect("alloc block");
        let data = vec![0xAAu8; BLOCK_SIZE];
        s.write_block(bid, data).await.expect("write v1");
        s.sync().await.expect("sync v1");

        // Roll the marker back so the synced block is ahead of it again.
        set_commit_marker(db, 0);

        // Plant a bogus checksum; while the block is invisible it must not be verified.
        s.set_block_checksum_for_testing(bid, 1234567);
        s.clear_cache();
        let out = s.read_block(bid).await.expect("read while invisible should not error");
        assert_eq!(out, vec![0u8; BLOCK_SIZE], "invisible block reads as zeroed");

        // Once visible again, the bogus checksum must be detected.
        set_commit_marker(db, 1);
        s.clear_cache();
        let err = s
            .read_block(bid)
            .await
            .expect_err("expected checksum mismatch once visible");
        assert_eq!(err.code, "CHECKSUM_MISMATCH");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn commit_marker_advances_and_versions_track_syncs() {
        let db = "cm_versions";
        let mut s = BlockStorage::new_with_capacity(db, 8)
            .await
            .expect("create storage");

        let b1 = s.allocate_block().await.expect("alloc b1");
        let b2 = s.allocate_block().await.expect("alloc b2");

        s.write_block(b1, vec![1u8; BLOCK_SIZE]).await.expect("write b1 v1");
        s.write_block(b2, vec![2u8; BLOCK_SIZE]).await.expect("write b2 v1");
        s.sync().await.expect("sync #1");

        let cm1 = get_commit_marker(db);
        assert_eq!(cm1, 1, "first sync should advance commit marker to 1");
        let meta1 = s.get_block_metadata_for_testing();
        assert_eq!(meta1.get(&b1).unwrap().1 as u64, cm1);
        assert_eq!(meta1.get(&b2).unwrap().1 as u64, cm1);

        // Only b1 is rewritten, so only b1 should pick up the new version.
        s.write_block(b1, vec![3u8; BLOCK_SIZE]).await.expect("write b1 v2");
        s.sync().await.expect("sync #2");

        let cm2 = get_commit_marker(db);
        assert_eq!(cm2, 2, "second sync should advance commit marker to 2");
        let meta2 = s.get_block_metadata_for_testing();
        assert_eq!(meta2.get(&b1).unwrap().1 as u64, cm2, "updated block tracks new version");
        assert_eq!(meta2.get(&b2).unwrap().1 as u64, 1, "unchanged block retains prior version");
    }
}