use super::block_storage::{
    BlockStorage, CorruptionAction, RecoveryMode, RecoveryOptions, RecoveryReport,
};
use crate::types::DatabaseError;

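// Platform-specific locking shim: wasm32 builds keep storage state in a RefCell
// and treat a failed `try_borrow_mut` as reentrancy, while native builds go
// through the mutex's `lock()` call.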
#[allow(unused_macros)]
#[cfg(target_arch = "wasm32")]
macro_rules! lock_mutex {
    ($mutex:expr) => {
        $mutex
            .try_borrow_mut()
            .expect("RefCell borrow failed - reentrancy detected in recovery.rs")
    };
}

#[allow(unused_macros)]
#[cfg(not(target_arch = "wasm32"))]
macro_rules! lock_mutex {
    ($mutex:expr) => {
        $mutex.lock()
    };
}

#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
use crate::storage::{BLOCK_SIZE, ChecksumAlgorithm};
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
use std::collections::HashMap;
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
use std::io::Read;

#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
use std::{fs, io::Write};

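/// Runs startup crash recovery for a `BlockStorage` before it serves requests.
///
/// On native `fs_persist` builds this finalizes or rolls back an interrupted
/// `metadata.json` commit and reconciles on-disk block files with the metadata
/// and allocation tables; it then verifies block integrity according to
/// `opts.mode`, handles corruption per `opts.on_corruption`, and stores the
/// results in `storage.recovery_report`.
///
/// Minimal calling sketch (illustrative only; `storage` and `opts` are assumed
/// to be constructed elsewhere):
///
/// ```ignore
/// // `storage`: an already-opened BlockStorage, `opts`: a RecoveryOptions value.
/// perform_startup_recovery(&mut storage, opts).await?;
/// ```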
pub async fn perform_startup_recovery(
    storage: &mut BlockStorage,
    opts: RecoveryOptions,
) -> Result<(), DatabaseError> {
    let start_time = BlockStorage::now_millis();
    log::info!("Starting startup recovery with mode: {:?}", opts.mode);

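    // Phase 1 (fs_persist only): resolve any `metadata.json.pending` marker left
    // by a crash mid-commit. Finalize it when every referenced block file exists
    // with the expected size; otherwise roll back and keep the old metadata.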
    #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
    {
        use std::io::Write;

        let mut db_dir = storage.base_dir.clone();
        db_dir.push(&storage.db_name);
        let meta_path = db_dir.join("metadata.json");
        let meta_pending_path = db_dir.join("metadata.json.pending");
        let blocks_dir = db_dir.join("blocks");

        if let Ok(pending_content) = std::fs::read_to_string(&meta_pending_path) {
            log::warn!(
                "Found pending metadata commit marker at startup: {:?}",
                meta_pending_path
            );

            let mut finalize = true;
            let mut parsed_val: Option<serde_json::Value> = None;
            if let Ok(val) = serde_json::from_str::<serde_json::Value>(&pending_content) {
                parsed_val = Some(val.clone());
                if let Some(entries) = val.get("entries").and_then(|v| v.as_array()) {
                    for entry in entries {
                        if let Some(arr) = entry.as_array() {
                            if arr.len() == 2 {
                                if let Some(block_id) = arr.first().and_then(|v| v.as_u64()) {
                                    let bpath = blocks_dir.join(format!("block_{}.bin", block_id));
                                    match std::fs::metadata(&bpath) {
                                        Ok(meta) => {
                                            if !meta.is_file() || meta.len() as usize != BLOCK_SIZE
                                            {
                                                log::warn!(
                                                    "Pending commit references block {} but file invalid: {:?}",
                                                    block_id,
                                                    bpath
                                                );
                                                finalize = false;
                                                break;
                                            }
                                        }
                                        Err(_) => {
                                            log::warn!(
                                                "Pending commit references missing block file for id {}: {:?}",
                                                block_id,
                                                bpath
                                            );
                                            finalize = false;
                                            break;
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            } else {
                log::error!("Malformed metadata.json.pending; rolling back");
                finalize = false;
            }

            if finalize {
                if let Ok(mut f) = std::fs::File::create(&meta_path) {
                    let _ = f.write_all(pending_content.as_bytes());
                }
                let _ = std::fs::remove_file(&meta_pending_path);
                log::info!("Finalized pending metadata commit to {:?}", meta_path);

                if let Some(val) = parsed_val {
                    let mut checksums_new: std::collections::HashMap<u64, u64> =
                        std::collections::HashMap::new();
                    let mut algos_new: std::collections::HashMap<u64, ChecksumAlgorithm> =
                        std::collections::HashMap::new();
                    if let Some(entries) = val.get("entries").and_then(|v| v.as_array()) {
                        for entry in entries.iter() {
                            if let Some(arr) = entry.as_array() {
                                if arr.len() == 2 {
                                    if let (Some(bid), Some(meta)) = (
                                        arr.first().and_then(|v| v.as_u64()),
                                        arr.get(1).and_then(|v| v.as_object()),
                                    ) {
                                        if let Some(csum) =
                                            meta.get("checksum").and_then(|v| v.as_u64())
                                        {
                                            checksums_new.insert(bid, csum);
                                        }
                                        let algo_str =
                                            meta.get("algo").and_then(|v| v.as_str()).unwrap_or("");
                                        let algo = match algo_str {
                                            "CRC32" => Some(ChecksumAlgorithm::CRC32),
                                            "FastHash" => Some(ChecksumAlgorithm::FastHash),
                                            _ => None,
                                        };
                                        if let Some(a) = algo {
                                            algos_new.insert(bid, a);
                                        }
                                    }
                                }
                            }
                        }
                    }
                    storage
                        .checksum_manager
                        .replace_all(checksums_new, algos_new);
                }
            } else {
                let _ = std::fs::remove_file(&meta_pending_path);
                log::info!("Rolled back pending metadata commit; kept {:?}", meta_path);
            }
        }
    }

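    // Phase 2 (fs_persist only): reconcile the blocks directory with
    // `metadata.json`, removing stray or invalid block files and dropping
    // metadata entries that no longer have a valid backing file.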
    #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
    {
        use std::collections::HashSet as Set;

        let mut db_dir = storage.base_dir.clone();
        db_dir.push(&storage.db_name);
        let meta_path = db_dir.join("metadata.json");
        let meta_pending_path = db_dir.join("metadata.json.pending");
        let blocks_dir = db_dir.join("blocks");

        let mut entries_val: Vec<serde_json::Value> = Vec::new();
        let mut meta_ids: Set<u64> = Set::new();
        if let Ok(mut f) = fs::File::open(&meta_path) {
            let mut s = String::new();
            if f.read_to_string(&mut s).is_ok() {
                if let Ok(val) = serde_json::from_str::<serde_json::Value>(&s) {
                    if let Some(entries) = val.get("entries").and_then(|v| v.as_array()).cloned() {
                        entries_val = entries;
                        for entry in entries_val.iter() {
                            if let Some(arr) = entry.as_array() {
                                if let Some(id) = arr.first().and_then(|v| v.as_u64()) {
                                    meta_ids.insert(id);
                                }
                            }
                        }
                    }
                }
            }
        }

        let mut file_ids: Set<u64> = Set::new();
        if let Ok(entries) = fs::read_dir(&blocks_dir) {
            for entry in entries.flatten() {
                if let Ok(ft) = entry.file_type() {
                    if ft.is_file() {
                        if let Some(name) = entry.file_name().to_str() {
                            if let Some(id_str) = name
                                .strip_prefix("block_")
                                .and_then(|s| s.strip_suffix(".bin"))
                            {
                                if let Ok(id) = id_str.parse::<u64>() {
                                    file_ids.insert(id);
                                }
                            }
                        }
                    }
                }
            }
        }

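        // Delete block files that exist on disk but have no metadata entry.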
        let stray: Vec<u64> = file_ids.difference(&meta_ids).copied().collect();
        if !stray.is_empty() {
            log::warn!(
                "[fs] Found {} stray block files with no metadata: {:?}",
                stray.len(),
                stray
            );
            for id in &stray {
                let p = blocks_dir.join(format!("block_{}.bin", id));
                match fs::remove_file(&p) {
                    Ok(()) => log::info!("[fs] Removed stray block file {:?}", p),
                    Err(e) => log::error!("[fs] Failed to remove stray block file {:?}: {}", p, e),
                }
            }
            #[cfg(unix)]
            if let Ok(dirf) = fs::OpenOptions::new().read(true).open(&blocks_dir) {
                let _ = dirf.sync_all();
            }
        }

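        // Drop metadata entries whose backing file is missing or not exactly
        // BLOCK_SIZE bytes; invalid-sized files are deleted as they are found.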
        let before_len = entries_val.len();
        let mut deleted_invalid_files: usize = 0;
        if before_len > 0 {
            entries_val.retain(|entry| {
                if let Some(arr) = entry.as_array() {
                    if let Some(id) = arr.first().and_then(|v| v.as_u64()) {
                        let p = blocks_dir.join(format!("block_{}.bin", id));
                        match fs::metadata(&p) {
                            Ok(meta) => {
                                if meta.is_file() && meta.len() as usize == BLOCK_SIZE {
                                    true
                                } else {
                                    log::warn!(
                                        "[fs] Removing metadata for block {} due to invalid file (len={} bytes); deleting {:?}",
                                        id, meta.len(), p
                                    );
                                    match fs::remove_file(&p) {
                                        Ok(()) => {
                                            deleted_invalid_files += 1;
                                            log::info!("[fs] Deleted invalid-sized block file {:?}", p);
                                        }
                                        Err(e) => {
                                            log::error!("[fs] Failed to delete invalid-sized block file {:?}: {}", p, e);
                                        }
                                    }
                                    false
                                }
                            }
                            Err(_) => {
                                log::warn!("[fs] Removing metadata entry for block {} due to missing file {:?}", id, p);
                                false
                            }
                        }
                    } else {
                        true
                    }
                } else {
                    true
                }
            });
        }
        if deleted_invalid_files > 0 {
            #[cfg(unix)]
            if let Ok(dirf) = fs::OpenOptions::new().read(true).open(&blocks_dir) {
                let _ = dirf.sync_all();
            }
            log::info!(
                "[fs] Deleted {} invalid-sized block file(s) during reconciliation",
                deleted_invalid_files
            );
        }
        let meta_changed = entries_val.len() != before_len;

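        // If reconciliation changed the entry set, rewrite metadata.json through
        // a pending file + rename and rebuild the in-memory checksum state.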
        let mut kept_ids: Set<u64> = Set::new();
        if meta_changed {
            for entry in entries_val.iter() {
                if let Some(arr) = entry.as_array() {
                    if let Some(id) = arr.first().and_then(|v| v.as_u64()) {
                        kept_ids.insert(id);
                    }
                }
            }
            let new_val = serde_json::json!({ "entries": entries_val });
            if let Ok(mut f) = fs::File::create(&meta_pending_path) {
                let _ = f.write_all(
                    serde_json::to_string(&new_val)
                        .unwrap_or_else(|_| "{\"entries\":[]}".into())
                        .as_bytes(),
                );
                let _ = f.sync_all();
            }
            let _ = fs::rename(&meta_pending_path, &meta_path);
            if let Ok(f) = fs::File::open(&meta_path) {
                let _ = f.sync_all();
            }
            #[cfg(unix)]
            if let Ok(dirf) = fs::OpenOptions::new().read(true).open(&db_dir) {
                let _ = dirf.sync_all();
            }
            log::info!(
                "[fs] Rewrote metadata.json after reconciliation; entries={}",
                kept_ids.len()
            );

            let mut checksums_new: HashMap<u64, u64> = HashMap::new();
            let mut algos_new: HashMap<u64, ChecksumAlgorithm> = HashMap::new();
            if let Some(entries) = new_val.get("entries").and_then(|v| v.as_array()) {
                for entry in entries.iter() {
                    if let Some(arr) = entry.as_array() {
                        if let (Some(bid), Some(meta)) = (
                            arr.first().and_then(|v| v.as_u64()),
                            arr.get(1).and_then(|v| v.as_object()),
                        ) {
                            if let Some(csum) = meta.get("checksum").and_then(|v| v.as_u64()) {
                                checksums_new.insert(bid, csum);
                            }
                            let algo = match meta.get("algo").and_then(|v| v.as_str()) {
                                Some("CRC32") => Some(ChecksumAlgorithm::CRC32),
                                Some("FastHash") => Some(ChecksumAlgorithm::FastHash),
                                _ => None,
                            };
                            if let Some(a) = algo {
                                algos_new.insert(bid, a);
                            }
                        }
                    }
                }
            }
            storage
                .checksum_manager
                .replace_all(checksums_new, algos_new);
        } else {
            kept_ids = meta_ids;
        }

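        // Bring the in-memory allocation set, next_block_id, and allocations.json
        // in line with the metadata entries that survived reconciliation.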
        let needs_update = {
            let allocated = lock_mutex!(storage.allocated_blocks);
            *allocated != kept_ids
        };

        if needs_update {
            *lock_mutex!(storage.allocated_blocks) = kept_ids.clone();
            let max_id = lock_mutex!(storage.allocated_blocks)
                .iter()
                .copied()
                .max()
                .unwrap_or(0);
            storage
                .next_block_id
                .store(max_id + 1, std::sync::atomic::Ordering::SeqCst);

            let alloc_path = db_dir.join("allocations.json");
            let alloc_tmp = db_dir.join("allocations.json.tmp");
            let mut allocated_vec: Vec<u64> = lock_mutex!(storage.allocated_blocks)
                .iter()
                .copied()
                .collect();
            allocated_vec.sort_unstable();
            let alloc_json = serde_json::json!({ "allocated": allocated_vec });
            if let Ok(mut f) = fs::File::create(&alloc_tmp) {
                let _ = f.write_all(
                    serde_json::to_string(&alloc_json)
                        .unwrap_or_else(|_| "{\"allocated\":[]}".into())
                        .as_bytes(),
                );
                let _ = f.sync_all();
            }
            let _ = fs::rename(&alloc_tmp, &alloc_path);
            if let Ok(f) = fs::File::open(&alloc_path) {
                let _ = f.sync_all();
            }
            #[cfg(unix)]
            if let Ok(dirf) = fs::OpenOptions::new().read(true).open(&db_dir) {
                let _ = dirf.sync_all();
            }
            let allocated_len = lock_mutex!(storage.allocated_blocks).len();
            log::info!(
                "[fs] Rewrote allocations.json after reconciliation; allocated={}",
                allocated_len
            );
        }
    }

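    // Phase 3: verify block integrity according to `opts.mode`; corrupted blocks
    // are reported, repaired, or treated as fatal per `opts.on_corruption`.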
    let mut corrupted_blocks = Vec::new();
    let mut repaired_blocks = Vec::new();

    if matches!(opts.mode, RecoveryMode::Skip) {
        log::info!("Startup recovery skipped by configuration");
        storage.recovery_report = RecoveryReport {
            total_blocks_verified: 0,
            corrupted_blocks: Vec::new(),
            repaired_blocks: Vec::new(),
            verification_duration_ms: BlockStorage::now_millis() - start_time,
        };
        return Ok(());
    }

    let blocks_to_verify = storage.get_blocks_for_verification(&opts.mode).await?;
    let total_verified = blocks_to_verify.len();

    log::info!(
        "Verifying {} blocks during startup recovery",
        total_verified
    );

    for block_id in blocks_to_verify {
        match storage.verify_block_integrity(block_id).await {
            Ok(true) => {
                log::debug!("Block {} passed integrity check", block_id);
            }
            Ok(false) => {
                log::warn!("Block {} failed integrity check", block_id);
                corrupted_blocks.push(block_id);

                match opts.on_corruption {
                    CorruptionAction::Report => {
                        log::info!("Corruption in block {} reported", block_id);
                    }
                    CorruptionAction::Repair => {
                        if storage.repair_corrupted_block(block_id).await? {
                            log::info!("Successfully repaired block {}", block_id);
                            repaired_blocks.push(block_id);
                        } else {
                            log::error!("Failed to repair block {}", block_id);
                        }
                    }
                    CorruptionAction::Fail => {
                        return Err(DatabaseError::new(
                            "STARTUP_RECOVERY_FAILED",
                            &format!(
                                "Corrupted block {} detected and failure policy is active",
                                block_id
                            ),
                        ));
                    }
                }
            }
            Err(e) => {
                log::error!("Error verifying block {}: {}", block_id, e.message);
                if matches!(opts.on_corruption, CorruptionAction::Fail) {
                    return Err(e);
                }
            }
        }
    }

    let duration = BlockStorage::now_millis() - start_time;
    log::info!(
        "Startup recovery completed: {} blocks verified, {} corrupted, {} repaired in {}ms",
        total_verified,
        corrupted_blocks.len(),
        repaired_blocks.len(),
        duration
    );

    storage.recovery_report = RecoveryReport {
        total_blocks_verified: total_verified,
        corrupted_blocks,
        repaired_blocks,
        verification_duration_ms: duration,
    };

    Ok(())
}