1use std::fs;
9use std::io;
10use std::io::BufRead;
11use std::io::Read;
12use std::io::Seek;
13use std::io::SeekFrom;
14use std::io::Write;
15use std::path::Path;
16
17use crate::errors::IoResultExt;
18use crate::errors::ResultExt;
19use crate::lock::ScopedDirLock;
20use crate::log::GenericPath;
21use crate::log::Log;
22use crate::log::LogMetadata;
23use crate::log::OpenOptions;
24use crate::log::META_FILE;
25use crate::log::PRIMARY_FILE;
26use crate::log::PRIMARY_HEADER;
27use crate::log::PRIMARY_START_OFFSET;
28use crate::repair::OpenOptionsOutput;
29use crate::repair::OpenOptionsRepair;
30use crate::repair::RepairMessage;
31use crate::utils;
32use crate::utils::mmap_path;
33
34impl OpenOptions {
36 pub fn repair(&self, dir: impl Into<GenericPath>) -> crate::Result<String> {
45 let dir = dir.into();
46 let dir = match dir.as_opt_path() {
47 Some(dir) => dir,
48 None => return Ok(format!("{:?} is not on disk. Nothing to repair.\n", &dir)),
49 };
50
51 let result: crate::Result<_> = (|| {
52 if !dir.exists() {
53 return Ok(format!("{:?} does not exist. Nothing to repair.\n", dir));
54 }
55
56 let lock = ScopedDirLock::new(dir)?;
57 let mut message = RepairMessage::new(dir);
58 message += &format!("Processing IndexedLog: {:?}\n", dir);
59
60 let primary_path = dir.join(PRIMARY_FILE);
61 let meta_path = dir.join(META_FILE);
62
63 (|| -> crate::Result<()> {
65 #[allow(clippy::never_loop)]
66 let header_corrupted = loop {
67 if let Err(e) = primary_path.metadata() {
68 if e.kind() == io::ErrorKind::NotFound {
69 break true;
70 }
71 }
72 let mut file = fs::OpenOptions::new()
73 .read(true)
74 .open(&primary_path)
75 .context(&primary_path, "cannot open for read")?;
76 let mut buf = [0; PRIMARY_START_OFFSET as usize];
77 break match file.read_exact(&mut buf) {
78 Ok(_) => buf != PRIMARY_HEADER,
79 Err(_) => true,
80 };
81 };
82 if header_corrupted {
83 let mut file = fs::OpenOptions::new()
84 .write(true)
85 .create(true)
86 .open(&primary_path)
87 .context(&primary_path, "cannot open for write")?;
88 file.write_all(PRIMARY_HEADER)
89 .context(&primary_path, "cannot re-write header")?;
90 let _ = utils::fix_perm_file(&file, false);
91 message += "Fixed header in log\n";
92 }
93 Ok(())
94 })()
95 .context("while making sure log has the right header")?;
96
97 (|| -> crate::Result<()> {
99 let primary_len = primary_path
100 .metadata()
101 .context(&primary_path, "cannot read fs metadata")?
102 .len();
103 match LogMetadata::read_file(&meta_path)
104 .context("repair cannot fix metadata corruption")
105 {
106 Ok(meta) => {
107 if meta.primary_len > primary_len {
109 use fs2::FileExt;
110 let file = fs::OpenOptions::new()
114 .write(true)
115 .open(&primary_path)
116 .context(&primary_path, "cannot open for write")?;
117 file.allocate(meta.primary_len)
118 .context(&primary_path, "cannot fallocate")?;
119 message += &format!(
120 "Extended log to {:?} bytes required by meta\n",
121 meta.primary_len
122 );
123 }
124 }
125 Err(meta_err) => {
126 let meta = LogMetadata::new_with_primary_len(primary_len);
128 meta.write_file(&meta_path, self.fsync)
129 .context("while recreating meta")
130 .source(meta_err)?;
131 message += "Rebuilt metadata\n";
132 }
133 }
134 Ok(())
135 })()
136 .context("while making sure log.length >= meta.log_length")?;
137
138 let mut log = self
145 .open_with_lock(&dir.into(), &lock)
146 .or_else(|_| {
147 self.clone()
148 .index_defs(Vec::new())
149 .open(GenericPath::from(dir))
150 })
151 .context("cannot open log for repair")?;
152
153 let mut iter = log.iter();
154
155 let mut entry_count = 0;
157 while let Some(Ok(_)) = iter.next() {
158 entry_count += 1;
159 }
160
161 let valid_len = iter.next_offset;
162 assert!(valid_len >= PRIMARY_START_OFFSET);
163 assert!(valid_len <= log.meta.primary_len);
164
165 if valid_len == log.meta.primary_len {
166 message += &format!(
167 "Verified {} entries, {} bytes in log\n",
168 entry_count, valid_len
169 );
170 } else {
171 message += &format!(
172 "Verified first {} entries, {} of {} bytes in log\n",
173 entry_count, valid_len, log.meta.primary_len
174 );
175
176 (|| -> crate::Result<()> {
178 let mut primary_file = fs::OpenOptions::new()
179 .read(true)
180 .open(&primary_path)
181 .context(&primary_path, "cannot open for read")?;
182 let backup_path = dir.join(format!(
183 "log.bak.epoch{}.offset{}",
184 log.meta.epoch, valid_len
185 ));
186 let mut backup_file = fs::OpenOptions::new()
187 .create_new(true)
188 .write(true)
189 .open(&backup_path)
190 .context(&backup_path, "cannot open")?;
191
192 primary_file
193 .seek(SeekFrom::Start(valid_len))
194 .context(&primary_path, "cannot seek")?;
195
196 let mut reader = io::BufReader::new(primary_file);
197 loop {
198 let len = {
199 let buf = reader.fill_buf().context(&primary_path, "cannot read")?;
200 if buf.is_empty() {
201 break;
202 }
203 backup_file
204 .write_all(buf)
205 .context(&backup_path, "cannot write")?;
206 buf.len()
207 };
208 reader.consume(len);
209 }
210 message += &format!("Backed up corrupted log to {:?}\n", backup_path);
211 Ok(())
212 })()
213 .context("while trying to backup corrupted log")?;
214
215 log.meta.primary_len = valid_len;
219 log.meta.indexes.clear();
220 log.meta.epoch = log.meta.epoch.wrapping_add(1);
221 log.disk_buf = mmap_path(&primary_path, valid_len)?;
222
223 log.meta
224 .write_file(&meta_path, log.open_options.fsync)
225 .context("while trying to update metadata with verified log length")?;
226 message += &format!("Reset log size to {}\n", valid_len);
227 }
228
229 log.open_options.index_defs = self.index_defs.clone();
233 message += &log
234 .rebuild_indexes_with_lock(false, &lock)
235 .context("while trying to update indexes with reapired log")?;
236
237 Ok(message.into_string())
238 })();
239
240 result.context(|| format!("in log::OpenOptions::repair({:?})", dir))
241 }
242}
243
244impl OpenOptionsRepair for OpenOptions {
245 fn open_options_repair(&self, dir: impl AsRef<Path>) -> crate::Result<String> {
246 OpenOptions::repair(self, dir.as_ref())
247 }
248}
249
250impl OpenOptionsOutput for OpenOptions {
251 type Output = Log;
252
253 fn open_path(&self, path: &Path) -> crate::Result<Self::Output> {
254 self.open(path)
255 }
256}
257
258impl OpenOptions {
259 pub fn delete_content(&self, dir: impl Into<GenericPath>) -> crate::Result<()> {
264 let dir = dir.into();
265 let dir = match dir.as_opt_path() {
266 Some(dir) => dir,
267 None => return Ok(()),
268 };
269 let result: crate::Result<()> = (|| {
270 utils::mkdir_p(dir)?;
272
273 let lock = ScopedDirLock::new(dir)?;
275
276 let meta = LogMetadata::new_with_primary_len(PRIMARY_START_OFFSET);
278 let meta_path = dir.join(META_FILE);
279 meta.write_file(meta_path, self.fsync)?;
280
281 let primary_path = dir.join(PRIMARY_FILE);
283 utils::atomic_write_plain(&primary_path, PRIMARY_HEADER, self.fsync)?;
284
285 let log = self
287 .clone()
288 .create(true)
289 .open_with_lock(&dir.into(), &lock)
290 .context("cannot open")?;
291 log.rebuild_indexes_with_lock(true, &lock)?;
292
293 Ok(())
294 })();
295
296 result.context(|| format!("in log::OpenOptions::delete_content({:?})", dir))
297 }
298}