1use crate::{data_file::wal::DataFileLog, error::Error, BlockDB, ConfirmDestructiveAction};
4use serde::{Deserialize, Serialize};
5use std::{
6 collections::{HashMap, VecDeque},
7 path::PathBuf,
8};
9use tokio::fs::rename;
10
/// A repair step that can be applied via `BlockDB::uncorrupt`.
///
/// Values of this type are carried inside `Error::Corrupted`, so a failed
/// repair hands back the (possibly reduced) remaining work for a retry.
/// Serializable so the pending action can be persisted or reported.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UncorruptAction {
    /// Truncate the identified data file to zero bytes and reset its
    /// in-memory bookkeeping (size, free/used byte counts, block map,
    /// free-chunk offsets).
    Clear {
        data_file_id: String,
    },
    /// Re-run the database's recovery routine (`BlockDB::recover`).
    Recover,
    /// Unwind a failed batch: first delete the listed data files, then
    /// append the inverse WAL logs to each remaining data file's log,
    /// and finally run recovery.
    BadBatchUnwind {
        /// IDs of data files created by the bad batch, still to be deleted.
        data_file_ids_to_delete: Vec<String>,
        /// Per-data-file inverse logs still to be written to each WAL.
        data_file_inverse_log_map: HashMap<String, Vec<DataFileLog>>,
    },
    /// Finish an interrupted compaction by renaming `temp_path` to `path`.
    DataFileCompactMove {
        /// Final location of the compacted data file.
        path: PathBuf,
        /// Temporary file produced by compaction, awaiting the rename.
        temp_path: PathBuf,
    },
}
27
28impl BlockDB {
29 pub async fn uncorrupt(&mut self, action: UncorruptAction) -> Result<(), Error> {
61 match action {
62 UncorruptAction::Clear { data_file_id } => {
63 if let Some(data_file) = self.data_files.read().await.get(&data_file_id) {
64 let mut data_file_writer = data_file.write().await;
65 data_file_writer
66 .truncate(0)
67 .await
68 .map_err(|err| Error::Corrupted {
69 err: Box::new(err),
70 action: UncorruptAction::Clear { data_file_id },
71 })?;
72
73 data_file_writer.size = 0;
74 data_file_writer.free_bytes = 0;
75 data_file_writer.used_bytes = 0;
76 data_file_writer.data_blocks = HashMap::new();
77 data_file_writer.free_chunk_offsets = VecDeque::new();
78 }
79 }
80 UncorruptAction::Recover => {
81 self.recover().await.map_err(|err| Error::Corrupted {
82 err: Box::new(err),
83 action: UncorruptAction::Recover,
84 })?;
85 }
86 UncorruptAction::BadBatchUnwind {
87 data_file_ids_to_delete,
88 data_file_inverse_log_map,
89 } => {
90 let mut data_file_ids_to_delete_queue = VecDeque::from(data_file_ids_to_delete);
91
92 while let Some(data_file_id) = data_file_ids_to_delete_queue.pop_front() {
93 if let Err(err) = self
94 .delete_data_file(data_file_id, ConfirmDestructiveAction::IKnowWhatImDoing)
95 .await
96 {
97 return Err(Error::Corrupted {
98 err: Box::new(err),
99 action: UncorruptAction::BadBatchUnwind {
100 data_file_ids_to_delete: data_file_ids_to_delete_queue.into(),
101 data_file_inverse_log_map,
102 },
103 });
104 }
105 }
106
107 let data_files_reader = self.data_files.read().await;
108 let mut data_file_inverse_logs_queue = data_file_inverse_log_map
109 .into_iter()
110 .collect::<VecDeque<(String, Vec<DataFileLog>)>>(
111 );
112
113 while let Some((data_file_id, data_file_logs)) =
114 data_file_inverse_logs_queue.pop_front()
115 {
116 if let Some(data_file) = data_files_reader.get(&data_file_id) {
117 if let Err(err) =
118 data_file.write().await.wal.log_many(&data_file_logs).await
119 {
120 return Err(Error::Corrupted {
121 err: Box::new(Error::Walr(err)),
122 action: UncorruptAction::BadBatchUnwind {
123 data_file_ids_to_delete: vec![],
124 data_file_inverse_log_map: data_file_inverse_logs_queue
125 .into_iter()
126 .collect(),
127 },
128 });
129 }
130 } else {
131 debug_assert!(
132 data_files_reader.get(&data_file_id).is_some(),
133 "BlockDB::uncorrupt was passed a non-existent DataFile ID {data_file_id}"
134 );
135 }
136 }
137
138 drop(data_files_reader);
139 self.recover().await.map_err(|err| Error::Corrupted {
140 err: Box::new(err),
141 action: UncorruptAction::Recover,
142 })?;
143 }
144 UncorruptAction::DataFileCompactMove { path, temp_path } => {
145 rename(&temp_path, &path)
146 .await
147 .map_err(|err| Error::Corrupted {
148 err: Box::new(Error::IO(err)),
149 action: UncorruptAction::DataFileCompactMove { path, temp_path },
150 })?;
151 }
152 }
153
154 Ok(())
155 }
156}