block_db/
uncorrupt.rs

1// Authors: Robert Lopez
2
3use crate::{data_file::wal::DataFileLog, error::Error, BlockDB, ConfirmDestructiveAction};
4use serde::{Deserialize, Serialize};
5use std::{
6    collections::{HashMap, VecDeque},
7    path::PathBuf,
8};
9use tokio::fs::rename;
10
/// Action to resolve the corruption of a `BlockDB`.
///
/// Carried inside `Error::Corrupted` and consumed by `BlockDB::uncorrupt`,
/// which performs the steps described on each variant below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UncorruptAction {
    /// Truncate the `DataFile` with this ID to zero length and reset its
    /// in-memory bookkeeping (size, free/used byte counters, data blocks and
    /// free chunk offsets). `BlockDB::uncorrupt` does nothing if no `DataFile`
    /// with this ID is currently loaded.
    Clear {
        data_file_id: String,
    },
    /// Run `BlockDB::recover`.
    Recover,
    /// Unwind a failed batch: delete every `DataFile` listed in
    /// `data_file_ids_to_delete`, write each entry of
    /// `data_file_inverse_log_map` to the WAL of the `DataFile` it is keyed
    /// by, and then run `BlockDB::recover`.
    BadBatchUnwind {
        data_file_ids_to_delete: Vec<String>,
        data_file_inverse_log_map: HashMap<String, Vec<DataFileLog>>,
    },
    /// Rename `temp_path` to `path`.
    // NOTE(review): presumably completes an interrupted `DataFile` compaction
    // whose output was left at `temp_path` — confirm against the compaction code.
    DataFileCompactMove {
        path: PathBuf,
        temp_path: PathBuf,
    },
}
27
28impl BlockDB {
29    /// Attempts to resolve a `BlockDB` corruption state.
30    ///
31    /// Used in response to an `Error::Corrupted`, this method attempts to
32    /// automatically resolve the issue by performing recovery steps.
33    ///
34    /// The only error this method can return is another `Error::Corrupted`,
35    /// indicating that the corruption has not yet been resolved and recovery
36    /// should be retried. This allows multiple safe attempts to uncorrupt
37    /// the database.
38    ///
39    /// If specific `DataFile`s remain deadlocked — even after ensuring
40    /// filesystem and hardware stability — it likely indicates unrecoverable
41    /// corruption in the WAL or binary file contents.
42    ///
43    /// ---
44    /// Example
45    /// ``` let mut block_db = BlockDB::open("./data", None).await?;
46    ///
47    /// // ...
48    ///
49    /// if let Err(err) = block_db
50    ///     .batch(vec![b"Hello", b"World"], vec![])
51    ///     .await
52    /// {
53    ///     if let Error::Corrupted { action, .. } = err {
54    ///         if let Err(Error::Corrupted { action, .. }) = block_db.uncorrupt(action).await {
55    ///             // Store this action to process after ensuring stability
56    ///         }
57    ///     }
58    /// }
59    /// ```
60    pub async fn uncorrupt(&mut self, action: UncorruptAction) -> Result<(), Error> {
61        match action {
62            UncorruptAction::Clear { data_file_id } => {
63                if let Some(data_file) = self.data_files.read().await.get(&data_file_id) {
64                    let mut data_file_writer = data_file.write().await;
65                    data_file_writer
66                        .truncate(0)
67                        .await
68                        .map_err(|err| Error::Corrupted {
69                            err: Box::new(err),
70                            action: UncorruptAction::Clear { data_file_id },
71                        })?;
72
73                    data_file_writer.size = 0;
74                    data_file_writer.free_bytes = 0;
75                    data_file_writer.used_bytes = 0;
76                    data_file_writer.data_blocks = HashMap::new();
77                    data_file_writer.free_chunk_offsets = VecDeque::new();
78                }
79            }
80            UncorruptAction::Recover => {
81                self.recover().await.map_err(|err| Error::Corrupted {
82                    err: Box::new(err),
83                    action: UncorruptAction::Recover,
84                })?;
85            }
86            UncorruptAction::BadBatchUnwind {
87                data_file_ids_to_delete,
88                data_file_inverse_log_map,
89            } => {
90                let mut data_file_ids_to_delete_queue = VecDeque::from(data_file_ids_to_delete);
91
92                while let Some(data_file_id) = data_file_ids_to_delete_queue.pop_front() {
93                    if let Err(err) = self
94                        .delete_data_file(data_file_id, ConfirmDestructiveAction::IKnowWhatImDoing)
95                        .await
96                    {
97                        return Err(Error::Corrupted {
98                            err: Box::new(err),
99                            action: UncorruptAction::BadBatchUnwind {
100                                data_file_ids_to_delete: data_file_ids_to_delete_queue.into(),
101                                data_file_inverse_log_map,
102                            },
103                        });
104                    }
105                }
106
107                let data_files_reader = self.data_files.read().await;
108                let mut data_file_inverse_logs_queue = data_file_inverse_log_map
109                    .into_iter()
110                    .collect::<VecDeque<(String, Vec<DataFileLog>)>>(
111                );
112
113                while let Some((data_file_id, data_file_logs)) =
114                    data_file_inverse_logs_queue.pop_front()
115                {
116                    if let Some(data_file) = data_files_reader.get(&data_file_id) {
117                        if let Err(err) =
118                            data_file.write().await.wal.log_many(&data_file_logs).await
119                        {
120                            return Err(Error::Corrupted {
121                                err: Box::new(Error::Walr(err)),
122                                action: UncorruptAction::BadBatchUnwind {
123                                    data_file_ids_to_delete: vec![],
124                                    data_file_inverse_log_map: data_file_inverse_logs_queue
125                                        .into_iter()
126                                        .collect(),
127                                },
128                            });
129                        }
130                    } else {
131                        debug_assert!(
132                            data_files_reader.get(&data_file_id).is_some(),
133                            "BlockDB::uncorrupt was passed a non-existent DataFile ID {data_file_id}"
134                        );
135                    }
136                }
137
138                drop(data_files_reader);
139                self.recover().await.map_err(|err| Error::Corrupted {
140                    err: Box::new(err),
141                    action: UncorruptAction::Recover,
142                })?;
143            }
144            UncorruptAction::DataFileCompactMove { path, temp_path } => {
145                rename(&temp_path, &path)
146                    .await
147                    .map_err(|err| Error::Corrupted {
148                        err: Box::new(Error::IO(err)),
149                        action: UncorruptAction::DataFileCompactMove { path, temp_path },
150                    })?;
151            }
152        }
153
154        Ok(())
155    }
156}