indexedlog/log/
repair.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */
7
8use std::fs;
9use std::io;
10use std::io::BufRead;
11use std::io::Read;
12use std::io::Seek;
13use std::io::SeekFrom;
14use std::io::Write;
15use std::path::Path;
16
17use crate::errors::IoResultExt;
18use crate::errors::ResultExt;
19use crate::lock::ScopedDirLock;
20use crate::log::GenericPath;
21use crate::log::Log;
22use crate::log::LogMetadata;
23use crate::log::OpenOptions;
24use crate::log::META_FILE;
25use crate::log::PRIMARY_FILE;
26use crate::log::PRIMARY_HEADER;
27use crate::log::PRIMARY_START_OFFSET;
28use crate::repair::OpenOptionsOutput;
29use crate::repair::OpenOptionsRepair;
30use crate::repair::RepairMessage;
31use crate::utils;
32use crate::utils::mmap_path;
33
34// Repair
35impl OpenOptions {
36    /// Attempt to repair a broken [`Log`] at the given directory.
37    ///
38    /// This is done by truncating entries in the primary log, and rebuilding
39    /// corrupted indexes.
40    ///
41    /// Backup files are written for further investigation.
42    ///
43    /// Return message useful for human consumption.
44    pub fn repair(&self, dir: impl Into<GenericPath>) -> crate::Result<String> {
45        let dir = dir.into();
46        let dir = match dir.as_opt_path() {
47            Some(dir) => dir,
48            None => return Ok(format!("{:?} is not on disk. Nothing to repair.\n", &dir)),
49        };
50
51        let result: crate::Result<_> = (|| {
52            if !dir.exists() {
53                return Ok(format!("{:?} does not exist. Nothing to repair.\n", dir));
54            }
55
56            let lock = ScopedDirLock::new(dir)?;
57            let mut message = RepairMessage::new(dir);
58            message += &format!("Processing IndexedLog: {:?}\n", dir);
59
60            let primary_path = dir.join(PRIMARY_FILE);
61            let meta_path = dir.join(META_FILE);
62
63            // Make sure the header of the primary log file is okay.
64            (|| -> crate::Result<()> {
65                #[allow(clippy::never_loop)]
66                let header_corrupted = loop {
67                    if let Err(e) = primary_path.metadata() {
68                        if e.kind() == io::ErrorKind::NotFound {
69                            break true;
70                        }
71                    }
72                    let mut file = fs::OpenOptions::new()
73                        .read(true)
74                        .open(&primary_path)
75                        .context(&primary_path, "cannot open for read")?;
76                    let mut buf = [0; PRIMARY_START_OFFSET as usize];
77                    break match file.read_exact(&mut buf) {
78                        Ok(_) => buf != PRIMARY_HEADER,
79                        Err(_) => true,
80                    };
81                };
82                if header_corrupted {
83                    let mut file = fs::OpenOptions::new()
84                        .write(true)
85                        .create(true)
86                        .open(&primary_path)
87                        .context(&primary_path, "cannot open for write")?;
88                    file.write_all(PRIMARY_HEADER)
89                        .context(&primary_path, "cannot re-write header")?;
90                    let _ = utils::fix_perm_file(&file, false);
91                    message += "Fixed header in log\n";
92                }
93                Ok(())
94            })()
95            .context("while making sure log has the right header")?;
96
97            // Make sure the "primary_len" is large enough.
98            (|| -> crate::Result<()> {
99                let primary_len = primary_path
100                    .metadata()
101                    .context(&primary_path, "cannot read fs metadata")?
102                    .len();
103                match LogMetadata::read_file(&meta_path)
104                    .context("repair cannot fix metadata corruption")
105                {
106                    Ok(meta) => {
107                        // If metadata can be read, trust it.
108                        if meta.primary_len > primary_len {
109                            use fs2::FileExt;
110                            // Log was truncated for some reason...
111                            // (This should be relatively rare)
112                            // Fill Log with 0s.
113                            let file = fs::OpenOptions::new()
114                                .write(true)
115                                .open(&primary_path)
116                                .context(&primary_path, "cannot open for write")?;
117                            file.allocate(meta.primary_len)
118                                .context(&primary_path, "cannot fallocate")?;
119                            message += &format!(
120                                "Extended log to {:?} bytes required by meta\n",
121                                meta.primary_len
122                            );
123                        }
124                    }
125                    Err(meta_err) => {
126                        // Attempt to rebuild metadata.
127                        let meta = LogMetadata::new_with_primary_len(primary_len);
128                        meta.write_file(&meta_path, self.fsync)
129                            .context("while recreating meta")
130                            .source(meta_err)?;
131                        message += "Rebuilt metadata\n";
132                    }
133                }
134                Ok(())
135            })()
136            .context("while making sure log.length >= meta.log_length")?;
137
138            // Reload the latest log without indexes.
139            //
140            // At this time log is likely open-able.
141            //
142            // Try to open it with indexes so we might reuse them. If that
143            // fails, retry with all indexes disabled.
144            let mut log = self
145                .open_with_lock(&dir.into(), &lock)
146                .or_else(|_| {
147                    self.clone()
148                        .index_defs(Vec::new())
149                        .open(GenericPath::from(dir))
150                })
151                .context("cannot open log for repair")?;
152
153            let mut iter = log.iter();
154
155            // Read entries until hitting a checksum error.
156            let mut entry_count = 0;
157            while let Some(Ok(_)) = iter.next() {
158                entry_count += 1;
159            }
160
161            let valid_len = iter.next_offset;
162            assert!(valid_len >= PRIMARY_START_OFFSET);
163            assert!(valid_len <= log.meta.primary_len);
164
165            if valid_len == log.meta.primary_len {
166                message += &format!(
167                    "Verified {} entries, {} bytes in log\n",
168                    entry_count, valid_len
169                );
170            } else {
171                message += &format!(
172                    "Verified first {} entries, {} of {} bytes in log\n",
173                    entry_count, valid_len, log.meta.primary_len
174                );
175
176                // Backup the part to be truncated.
177                (|| -> crate::Result<()> {
178                    let mut primary_file = fs::OpenOptions::new()
179                        .read(true)
180                        .open(&primary_path)
181                        .context(&primary_path, "cannot open for read")?;
182                    let backup_path = dir.join(format!(
183                        "log.bak.epoch{}.offset{}",
184                        log.meta.epoch, valid_len
185                    ));
186                    let mut backup_file = fs::OpenOptions::new()
187                        .create_new(true)
188                        .write(true)
189                        .open(&backup_path)
190                        .context(&backup_path, "cannot open")?;
191
192                    primary_file
193                        .seek(SeekFrom::Start(valid_len))
194                        .context(&primary_path, "cannot seek")?;
195
196                    let mut reader = io::BufReader::new(primary_file);
197                    loop {
198                        let len = {
199                            let buf = reader.fill_buf().context(&primary_path, "cannot read")?;
200                            if buf.is_empty() {
201                                break;
202                            }
203                            backup_file
204                                .write_all(buf)
205                                .context(&backup_path, "cannot write")?;
206                            buf.len()
207                        };
208                        reader.consume(len);
209                    }
210                    message += &format!("Backed up corrupted log to {:?}\n", backup_path);
211                    Ok(())
212                })()
213                .context("while trying to backup corrupted log")?;
214
215                // Update metadata. Invalidate indexes.
216                // Bump epoch since this is a non-append-only change.
217                // Reload disk buffer.
218                log.meta.primary_len = valid_len;
219                log.meta.indexes.clear();
220                log.meta.epoch = log.meta.epoch.wrapping_add(1);
221                log.disk_buf = mmap_path(&primary_path, valid_len)?;
222
223                log.meta
224                    .write_file(&meta_path, log.open_options.fsync)
225                    .context("while trying to update metadata with verified log length")?;
226                message += &format!("Reset log size to {}\n", valid_len);
227            }
228
229            // Also rebuild corrupted indexes.
230            // Without this, indexes are empty until the next `sync`, which
231            // can lead to bad performance.
232            log.open_options.index_defs = self.index_defs.clone();
233            message += &log
234                .rebuild_indexes_with_lock(false, &lock)
235                .context("while trying to update indexes with reapired log")?;
236
237            Ok(message.into_string())
238        })();
239
240        result.context(|| format!("in log::OpenOptions::repair({:?})", dir))
241    }
242}
243
244impl OpenOptionsRepair for OpenOptions {
245    fn open_options_repair(&self, dir: impl AsRef<Path>) -> crate::Result<String> {
246        OpenOptions::repair(self, dir.as_ref())
247    }
248}
249
250impl OpenOptionsOutput for OpenOptions {
251    type Output = Log;
252
253    fn open_path(&self, path: &Path) -> crate::Result<Self::Output> {
254        self.open(path)
255    }
256}
257
258impl OpenOptions {
259    /// Attempt to change a [`Log`] at the given directory so it becomes
260    /// empty and hopefully recovers from some corrupted state.
261    ///
262    /// Warning: This deletes data, and there is no backup!
263    pub fn delete_content(&self, dir: impl Into<GenericPath>) -> crate::Result<()> {
264        let dir = dir.into();
265        let dir = match dir.as_opt_path() {
266            Some(dir) => dir,
267            None => return Ok(()),
268        };
269        let result: crate::Result<()> = (|| {
270            // Ensure the directory exist.
271            utils::mkdir_p(dir)?;
272
273            // Prevent other writers.
274            let lock = ScopedDirLock::new(dir)?;
275
276            // Replace the metadata to an empty state.
277            let meta = LogMetadata::new_with_primary_len(PRIMARY_START_OFFSET);
278            let meta_path = dir.join(META_FILE);
279            meta.write_file(meta_path, self.fsync)?;
280
281            // Replace the primary log.
282            let primary_path = dir.join(PRIMARY_FILE);
283            utils::atomic_write_plain(&primary_path, PRIMARY_HEADER, self.fsync)?;
284
285            // Replace indexes so they become empty.
286            let log = self
287                .clone()
288                .create(true)
289                .open_with_lock(&dir.into(), &lock)
290                .context("cannot open")?;
291            log.rebuild_indexes_with_lock(true, &lock)?;
292
293            Ok(())
294        })();
295
296        result.context(|| format!("in log::OpenOptions::delete_content({:?})", dir))
297    }
298}