// absurder_sql/storage/recovery.rs

1//! Recovery operations for BlockStorage
2//! This module contains startup recovery and integrity verification functionality
3
4use super::block_storage::{
5    BlockStorage, CorruptionAction, RecoveryMode, RecoveryOptions, RecoveryReport,
6};
7use crate::types::DatabaseError;
8
9// Lock macro for accessing BlockStorage Mutex-wrapped fields
// WASM builds are single-threaded, so BlockStorage fields are wrapped in
// RefCell rather than a real mutex. `try_borrow_mut` does not block: if the
// cell is already borrowed, the `expect` panics immediately, surfacing
// accidental reentrancy in recovery code as a loud bug instead of a deadlock.
#[allow(unused_macros)]
#[cfg(target_arch = "wasm32")]
macro_rules! lock_mutex {
    ($mutex:expr) => {
        $mutex
            .try_borrow_mut()
            .expect("RefCell borrow failed - reentrancy detected in recovery.rs")
    };
}
19
// Native builds use a real mutex. The guard is returned directly from the
// macro, so its lifetime (and therefore the critical section) is bounded by
// the caller's enclosing expression or statement.
// NOTE(review): `.lock()` is used without `.unwrap()`, which suggests a
// parking_lot-style mutex whose `lock()` returns the guard directly rather
// than std's `LockResult` — confirm against BlockStorage's field types.
#[allow(unused_macros)]
#[cfg(not(target_arch = "wasm32"))]
macro_rules! lock_mutex {
    ($mutex:expr) => {
        $mutex.lock()
    };
}
27
28#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
29use crate::storage::{BLOCK_SIZE, ChecksumAlgorithm};
30#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
31use std::collections::HashMap;
32#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
33use std::io::Read;
34
35#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
36use std::{fs, io::Write};
37
38/// Perform startup recovery verification on a BlockStorage instance
39pub async fn perform_startup_recovery(
40    storage: &mut BlockStorage,
41    opts: RecoveryOptions,
42) -> Result<(), DatabaseError> {
43    let start_time = BlockStorage::now_millis();
44    log::info!("Starting startup recovery with mode: {:?}", opts.mode);
45
46    // Handle pending metadata commit markers (fs_persist only)
47    #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
48    {
49        use std::io::Write;
50
51        let mut db_dir = storage.base_dir.clone();
52        db_dir.push(&storage.db_name);
53        let meta_path = db_dir.join("metadata.json");
54        let meta_pending_path = db_dir.join("metadata.json.pending");
55        let blocks_dir = db_dir.join("blocks");
56
57        if let Ok(pending_content) = std::fs::read_to_string(&meta_pending_path) {
58            log::warn!(
59                "Found pending metadata commit marker at startup: {:?}",
60                meta_pending_path
61            );
62
63            let mut finalize = true;
64            let mut parsed_val: Option<serde_json::Value> = None;
65            if let Ok(val) = serde_json::from_str::<serde_json::Value>(&pending_content) {
66                parsed_val = Some(val.clone());
67                if let Some(entries) = val.get("entries").and_then(|v| v.as_array()) {
68                    for entry in entries {
69                        if let Some(arr) = entry.as_array() {
70                            if arr.len() == 2 {
71                                if let Some(block_id) = arr.first().and_then(|v| v.as_u64()) {
72                                    let bpath = blocks_dir.join(format!("block_{}.bin", block_id));
73                                    match std::fs::metadata(&bpath) {
74                                        Ok(meta) => {
75                                            if !meta.is_file() || meta.len() as usize != BLOCK_SIZE
76                                            {
77                                                log::warn!(
78                                                    "Pending commit references block {} but file invalid: {:?}",
79                                                    block_id,
80                                                    bpath
81                                                );
82                                                finalize = false;
83                                                break;
84                                            }
85                                        }
86                                        Err(_) => {
87                                            log::warn!(
88                                                "Pending commit references missing block file for id {}: {:?}",
89                                                block_id,
90                                                bpath
91                                            );
92                                            finalize = false;
93                                            break;
94                                        }
95                                    }
96                                }
97                            }
98                        }
99                    }
100                }
101            } else {
102                // Malformed pending file -> rollback
103                log::error!("Malformed metadata.json.pending; rolling back");
104                finalize = false;
105            }
106
107            if finalize {
108                // Finalize: write pending content to metadata.json and remove pending
109                if let Ok(mut f) = std::fs::File::create(&meta_path) {
110                    let _ = f.write_all(pending_content.as_bytes());
111                }
112                let _ = std::fs::remove_file(&meta_pending_path);
113                log::info!("Finalized pending metadata commit to {:?}", meta_path);
114
115                // Update in-memory checksum and algo maps from finalized metadata
116                if let Some(val) = parsed_val {
117                    let mut checksums_new: std::collections::HashMap<u64, u64> =
118                        std::collections::HashMap::new();
119                    let mut algos_new: std::collections::HashMap<u64, ChecksumAlgorithm> =
120                        std::collections::HashMap::new();
121                    if let Some(entries) = val.get("entries").and_then(|v| v.as_array()) {
122                        for entry in entries.iter() {
123                            if let Some(arr) = entry.as_array() {
124                                if arr.len() == 2 {
125                                    if let (Some(bid), Some(meta)) = (
126                                        arr.first().and_then(|v| v.as_u64()),
127                                        arr.get(1).and_then(|v| v.as_object()),
128                                    ) {
129                                        if let Some(csum) =
130                                            meta.get("checksum").and_then(|v| v.as_u64())
131                                        {
132                                            checksums_new.insert(bid, csum);
133                                        }
134                                        let algo_str =
135                                            meta.get("algo").and_then(|v| v.as_str()).unwrap_or("");
136                                        let algo = match algo_str {
137                                            "CRC32" => Some(ChecksumAlgorithm::CRC32),
138                                            "FastHash" => Some(ChecksumAlgorithm::FastHash),
139                                            _ => None,
140                                        };
141                                        if let Some(a) = algo {
142                                            algos_new.insert(bid, a);
143                                        }
144                                    }
145                                }
146                            }
147                        }
148                    }
149                    storage
150                        .checksum_manager
151                        .replace_all(checksums_new, algos_new);
152                }
153            } else {
154                // Rollback: just remove the pending file, retain existing metadata.json
155                let _ = std::fs::remove_file(&meta_pending_path);
156                log::info!("Rolled back pending metadata commit; kept {:?}", meta_path);
157            }
158        }
159    }
160
161    // Extended scan/reconciliation of blocks vs metadata (fs_persist only)
162    #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
163    {
164        use std::collections::HashSet as Set;
165
166        let mut db_dir = storage.base_dir.clone();
167        db_dir.push(&storage.db_name);
168        let meta_path = db_dir.join("metadata.json");
169        let meta_pending_path = db_dir.join("metadata.json.pending");
170        let blocks_dir = db_dir.join("blocks");
171
172        // Load current metadata entries (preserve fields to keep version/last_modified)
173        let mut entries_val: Vec<serde_json::Value> = Vec::new();
174        let mut meta_ids: Set<u64> = Set::new();
175        if let Ok(mut f) = fs::File::open(&meta_path) {
176            let mut s = String::new();
177            if f.read_to_string(&mut s).is_ok() {
178                if let Ok(val) = serde_json::from_str::<serde_json::Value>(&s) {
179                    if let Some(entries) = val.get("entries").and_then(|v| v.as_array()).cloned() {
180                        entries_val = entries;
181                        for entry in entries_val.iter() {
182                            if let Some(arr) = entry.as_array() {
183                                if let Some(id) = arr.first().and_then(|v| v.as_u64()) {
184                                    meta_ids.insert(id);
185                                }
186                            }
187                        }
188                    }
189                }
190            }
191        }
192
193        // Collect block file IDs from disk
194        let mut file_ids: Set<u64> = Set::new();
195        if let Ok(entries) = fs::read_dir(&blocks_dir) {
196            for entry in entries.flatten() {
197                if let Ok(ft) = entry.file_type() {
198                    if ft.is_file() {
199                        if let Some(name) = entry.file_name().to_str() {
200                            if let Some(id_str) = name
201                                .strip_prefix("block_")
202                                .and_then(|s| s.strip_suffix(".bin"))
203                            {
204                                if let Ok(id) = id_str.parse::<u64>() {
205                                    file_ids.insert(id);
206                                }
207                            }
208                        }
209                    }
210                }
211            }
212        }
213
214        // Remove stray files without metadata
215        let stray: Vec<u64> = file_ids.difference(&meta_ids).copied().collect();
216        if !stray.is_empty() {
217            log::warn!(
218                "[fs] Found {} stray block files with no metadata: {:?}",
219                stray.len(),
220                stray
221            );
222            for id in &stray {
223                let p = blocks_dir.join(format!("block_{}.bin", id));
224                match fs::remove_file(&p) {
225                    Ok(()) => log::info!("[fs] Removed stray block file {:?}", p),
226                    Err(e) => log::error!("[fs] Failed to remove stray block file {:?}: {}", p, e),
227                }
228            }
229            // Fsync blocks directory to persist deletions (best-effort on Unix)
230            #[cfg(unix)]
231            if let Ok(dirf) = fs::OpenOptions::new().read(true).open(&blocks_dir) {
232                let _ = dirf.sync_all();
233            }
234        }
235
236        // Remove metadata entries whose files are missing/invalid
237        let before_len = entries_val.len();
238        // Track deletions of invalid-sized files to fsync directory after
239        let mut deleted_invalid_files: usize = 0;
240        if before_len > 0 {
241            entries_val.retain(|entry| {
242                if let Some(arr) = entry.as_array() {
243                    if let Some(id) = arr.first().and_then(|v| v.as_u64()) {
244                        let p = blocks_dir.join(format!("block_{}.bin", id));
245                        match fs::metadata(&p) {
246                            Ok(meta) => {
247                                if meta.is_file() && meta.len() as usize == BLOCK_SIZE {
248                                    true
249                                } else {
250                                    // Invalid-sized or non-regular file: drop metadata and delete file now
251                                    log::warn!(
252                                        "[fs] Removing metadata for block {} due to invalid file (len={} bytes); deleting {:?}",
253                                        id, meta.len(), p
254                                    );
255                                    match fs::remove_file(&p) {
256                                        Ok(()) => {
257                                            deleted_invalid_files += 1;
258                                            log::info!("[fs] Deleted invalid-sized block file {:?}", p);
259                                        }
260                                        Err(e) => {
261                                            log::error!("[fs] Failed to delete invalid-sized block file {:?}: {}", p, e);
262                                        }
263                                    }
264                                    false
265                                }
266                            }
267                            Err(_) => {
268                                log::warn!("[fs] Removing metadata entry for block {} due to missing file {:?}", id, p);
269                                false
270                            }
271                        }
272                    } else {
273                        true
274                    }
275                } else {
276                    true
277                }
278            });
279        }
280        // If we deleted any invalid-sized files, fsync the blocks directory (best-effort on Unix)
281        if deleted_invalid_files > 0 {
282            #[cfg(unix)]
283            if let Ok(dirf) = fs::OpenOptions::new().read(true).open(&blocks_dir) {
284                let _ = dirf.sync_all();
285            }
286            log::info!(
287                "[fs] Deleted {} invalid-sized block file(s) during reconciliation",
288                deleted_invalid_files
289            );
290        }
291        let meta_changed = entries_val.len() != before_len;
292
293        // If metadata changed, rewrite atomically and update in-memory maps
294        let mut kept_ids: Set<u64> = Set::new();
295        if meta_changed {
296            for entry in entries_val.iter() {
297                if let Some(arr) = entry.as_array() {
298                    if let Some(id) = arr.first().and_then(|v| v.as_u64()) {
299                        kept_ids.insert(id);
300                    }
301                }
302            }
303            let new_val = serde_json::json!({ "entries": entries_val });
304            if let Ok(mut f) = fs::File::create(&meta_pending_path) {
305                let _ = f.write_all(
306                    serde_json::to_string(&new_val)
307                        .unwrap_or_else(|_| "{\"entries\":[]}".into())
308                        .as_bytes(),
309                );
310                let _ = f.sync_all();
311            }
312            let _ = fs::rename(&meta_pending_path, &meta_path);
313            // Fsync metadata.json and its parent dir
314            if let Ok(f) = fs::File::open(&meta_path) {
315                let _ = f.sync_all();
316            }
317            #[cfg(unix)]
318            if let Ok(dirf) = fs::OpenOptions::new().read(true).open(&db_dir) {
319                let _ = dirf.sync_all();
320            }
321            log::info!(
322                "[fs] Rewrote metadata.json after reconciliation; entries={} ",
323                kept_ids.len()
324            );
325
326            // Update in-memory checksum and algo maps to match filtered metadata
327            let mut checksums_new: HashMap<u64, u64> = HashMap::new();
328            let mut algos_new: HashMap<u64, ChecksumAlgorithm> = HashMap::new();
329            if let Some(entries) = new_val.get("entries").and_then(|v| v.as_array()) {
330                for entry in entries.iter() {
331                    if let Some(arr) = entry.as_array() {
332                        if let (Some(bid), Some(meta)) = (
333                            arr.first().and_then(|v| v.as_u64()),
334                            arr.get(1).and_then(|v| v.as_object()),
335                        ) {
336                            if let Some(csum) = meta.get("checksum").and_then(|v| v.as_u64()) {
337                                checksums_new.insert(bid, csum);
338                            }
339                            let algo = match meta.get("algo").and_then(|v| v.as_str()) {
340                                Some("CRC32") => Some(ChecksumAlgorithm::CRC32),
341                                Some("FastHash") => Some(ChecksumAlgorithm::FastHash),
342                                _ => None,
343                            };
344                            if let Some(a) = algo {
345                                algos_new.insert(bid, a);
346                            }
347                        }
348                    }
349                }
350            }
351            storage
352                .checksum_manager
353                .replace_all(checksums_new, algos_new);
354        } else {
355            kept_ids = meta_ids;
356        }
357
358        // Reconcile allocations to the metadata IDs
359        let needs_update = {
360            let allocated = lock_mutex!(storage.allocated_blocks);
361            *allocated != kept_ids
362        };
363
364        if needs_update {
365            *lock_mutex!(storage.allocated_blocks) = kept_ids.clone();
366            let max_id = lock_mutex!(storage.allocated_blocks)
367                .iter()
368                .copied()
369                .max()
370                .unwrap_or(0);
371            storage
372                .next_block_id
373                .store(max_id + 1, std::sync::atomic::Ordering::SeqCst);
374
375            // Persist allocations.json atomically via temp rename
376            let alloc_path = db_dir.join("allocations.json");
377            let alloc_tmp = db_dir.join("allocations.json.tmp");
378            let mut allocated_vec: Vec<u64> = lock_mutex!(storage.allocated_blocks)
379                .iter()
380                .copied()
381                .collect();
382            allocated_vec.sort_unstable();
383            let alloc_json = serde_json::json!({ "allocated": allocated_vec });
384            if let Ok(mut f) = fs::File::create(&alloc_tmp) {
385                let _ = f.write_all(
386                    serde_json::to_string(&alloc_json)
387                        .unwrap_or_else(|_| "{\"allocated\":[]}".into())
388                        .as_bytes(),
389                );
390                let _ = f.sync_all();
391            }
392            let _ = fs::rename(&alloc_tmp, &alloc_path);
393            // Fsync allocations.json and directory (best-effort)
394            if let Ok(f) = fs::File::open(&alloc_path) {
395                let _ = f.sync_all();
396            }
397            #[cfg(unix)]
398            if let Ok(dirf) = fs::OpenOptions::new().read(true).open(&db_dir) {
399                let _ = dirf.sync_all();
400            }
401            let allocated_len = lock_mutex!(storage.allocated_blocks).len();
402            log::info!(
403                "[fs] Rewrote allocations.json after reconciliation; allocated={}",
404                allocated_len
405            );
406        }
407    }
408
409    let mut corrupted_blocks = Vec::new();
410    let mut repaired_blocks = Vec::new();
411
412    // Skip recovery if requested
413    if matches!(opts.mode, RecoveryMode::Skip) {
414        log::info!("Startup recovery skipped by configuration");
415        storage.recovery_report = RecoveryReport {
416            total_blocks_verified: 0,
417            corrupted_blocks: Vec::new(),
418            repaired_blocks: Vec::new(),
419            verification_duration_ms: BlockStorage::now_millis() - start_time,
420        };
421        return Ok(());
422    }
423
424    // Get list of blocks to verify based on mode
425    let blocks_to_verify = storage.get_blocks_for_verification(&opts.mode).await?;
426    let total_verified = blocks_to_verify.len();
427
428    log::info!(
429        "Verifying {} blocks during startup recovery",
430        total_verified
431    );
432
433    // Verify each block
434    for block_id in blocks_to_verify {
435        match storage.verify_block_integrity(block_id).await {
436            Ok(true) => {
437                log::debug!("Block {} passed integrity check", block_id);
438            }
439            Ok(false) => {
440                log::warn!("Block {} failed integrity check", block_id);
441                corrupted_blocks.push(block_id);
442
443                // Handle corruption based on policy
444                match opts.on_corruption {
445                    CorruptionAction::Report => {
446                        log::info!("Corruption in block {} reported", block_id);
447                    }
448                    CorruptionAction::Repair => {
449                        if storage.repair_corrupted_block(block_id).await? {
450                            log::info!("Successfully repaired block {}", block_id);
451                            repaired_blocks.push(block_id);
452                        } else {
453                            log::error!("Failed to repair block {}", block_id);
454                        }
455                    }
456                    CorruptionAction::Fail => {
457                        return Err(DatabaseError::new(
458                            "STARTUP_RECOVERY_FAILED",
459                            &format!(
460                                "Corrupted block {} detected and failure policy is active",
461                                block_id
462                            ),
463                        ));
464                    }
465                }
466            }
467            Err(e) => {
468                log::error!("Error verifying block {}: {}", block_id, e.message);
469                if matches!(opts.on_corruption, CorruptionAction::Fail) {
470                    return Err(e);
471                }
472            }
473        }
474    }
475
476    let duration = BlockStorage::now_millis() - start_time;
477    log::info!(
478        "Startup recovery completed: {} blocks verified, {} corrupted, {} repaired in {}ms",
479        total_verified,
480        corrupted_blocks.len(),
481        repaired_blocks.len(),
482        duration
483    );
484
485    storage.recovery_report = RecoveryReport {
486        total_blocks_verified: total_verified,
487        corrupted_blocks,
488        repaired_blocks,
489        verification_duration_ms: duration,
490    };
491
492    Ok(())
493}