use std::fs;
use std::io;
use std::io::BufRead;
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::path::Path;
use crate::errors::IoResultExt;
use crate::errors::ResultExt;
use crate::lock::ScopedDirLock;
use crate::log::GenericPath;
use crate::log::Log;
use crate::log::LogMetadata;
use crate::log::OpenOptions;
use crate::log::META_FILE;
use crate::log::PRIMARY_FILE;
use crate::log::PRIMARY_HEADER;
use crate::log::PRIMARY_START_OFFSET;
use crate::repair::OpenOptionsOutput;
use crate::repair::OpenOptionsRepair;
use crate::repair::RepairMessage;
use crate::utils;
use crate::utils::mmap_path;
impl OpenOptions {
    /// Attempt to repair the on-disk log stored in `dir`.
    ///
    /// A [`ScopedDirLock`] on `dir` is created up front and kept alive until
    /// the repair finishes. The steps are:
    ///
    /// 1. Rewrite the primary file header if the file is missing, cannot be
    ///    read for `PRIMARY_START_OFFSET` bytes, or does not start with
    ///    `PRIMARY_HEADER`.
    /// 2. Make the primary file and the metadata agree: extend the file if
    ///    the metadata records more bytes than the file has, or recreate the
    ///    metadata from the file length if the metadata cannot be read.
    /// 3. Iterate the entries. If iteration stops before the length recorded
    ///    in the metadata, copy the unverified tail of the primary file to
    ///    `log.bak.epoch{EPOCH}.offset{OFFSET}` inside `dir`, then reset the
    ///    metadata length to the verified length, clear the index records in
    ///    the metadata, and bump the epoch. This function does not itself
    ///    shorten the primary file.
    /// 4. Rebuild indexes using the index definitions of `self`.
    ///
    /// Returns a report, meant for humans, describing what was verified and
    /// changed. If `dir` is not an on-disk path, or does not exist, a
    /// "Nothing to repair" message is returned instead of an error.
    ///
    /// # Errors
    ///
    /// Any I/O, locking, metadata or index failure is returned with context
    /// naming the step that failed. In particular the backup file is opened
    /// with `create_new`, so an already existing backup with the same epoch
    /// and offset makes the repair fail rather than overwrite it.
    ///
    /// # Panics
    ///
    /// Panics if the verified length is smaller than `PRIMARY_START_OFFSET`
    /// or larger than the primary length recorded in the metadata.
    pub fn repair(&self, dir: impl Into<GenericPath>) -> crate::Result<String> {
        let dir = dir.into();
        let dir = match dir.as_opt_path() {
            Some(dir) => dir,
            None => return Ok(format!("{:?} is not on disk. Nothing to repair.\n", &dir)),
        };
        let result: crate::Result<_> = (|| {
            if !dir.exists() {
                return Ok(format!("{:?} does not exist. Nothing to repair.\n", dir));
            }

            let lock = ScopedDirLock::new(dir)?;

            let mut message = RepairMessage::new(dir);
            message += &format!("Processing IndexedLog: {:?}\n", dir);

            let primary_path = dir.join(PRIMARY_FILE);
            let meta_path = dir.join(META_FILE);

            // Step 1: make sure the primary file starts with the expected header.
            (|| -> crate::Result<()> {
                let header_corrupted = match primary_path.metadata() {
                    // A missing file is treated like a bad header: it gets
                    // created with the header below.
                    Err(e) if e.kind() == io::ErrorKind::NotFound => true,
                    // The file exists, or `metadata()` failed for another
                    // reason; in the latter case `open` reports the error.
                    _ => {
                        let mut file = fs::OpenOptions::new()
                            .read(true)
                            .open(&primary_path)
                            .context(&primary_path, "cannot open for read")?;
                        let mut buf = [0; PRIMARY_START_OFFSET as usize];
                        match file.read_exact(&mut buf) {
                            Ok(_) => buf != PRIMARY_HEADER,
                            // A read failure (e.g. the file is too short)
                            // also counts as a corrupted header.
                            Err(_) => true,
                        }
                    }
                };
                if header_corrupted {
                    // No `truncate`: only the leading bytes are overwritten,
                    // whatever follows them in the file is kept.
                    let mut file = fs::OpenOptions::new()
                        .write(true)
                        .create(true)
                        .open(&primary_path)
                        .context(&primary_path, "cannot open for write")?;
                    file.write_all(PRIMARY_HEADER)
                        .context(&primary_path, "cannot re-write header")?;
                    // Best effort: a failure to fix permissions is ignored.
                    let _ = utils::fix_perm_file(&file, false);
                    message += "Fixed header in log\n";
                }
                Ok(())
            })()
            .context("while making sure log has the right header")?;

            // Step 2: make sure the primary file is at least as long as the
            // metadata says, or rebuild the metadata if it is unreadable.
            (|| -> crate::Result<()> {
                let primary_len = primary_path
                    .metadata()
                    .context(&primary_path, "cannot read fs metadata")?
                    .len();
                match LogMetadata::read_file(&meta_path)
                    .context("repair cannot fix metadata corruption")
                {
                    Ok(meta) => {
                        if meta.primary_len > primary_len {
                            use fs2::FileExt;
                            let file = fs::OpenOptions::new()
                                .write(true)
                                .open(&primary_path)
                                .context(&primary_path, "cannot open for write")?;
                            // Grow the file to the length the metadata expects.
                            file.allocate(meta.primary_len)
                                .context(&primary_path, "cannot fallocate")?;
                            message += &format!(
                                "Extended log to {:?} bytes required by meta\n",
                                meta.primary_len
                            );
                        }
                    }
                    Err(meta_err) => {
                        // Unreadable metadata: recreate it so that it covers
                        // the whole primary file. The original read error is
                        // attached as the source if the rewrite fails too.
                        let meta = LogMetadata::new_with_primary_len(primary_len);
                        meta.write_file(&meta_path, self.fsync)
                            .context("while recreating meta")
                            .source(meta_err)?;
                        message += "Rebuilt metadata\n";
                    }
                }
                Ok(())
            })()
            .context("while making sure log.length >= meta.log_length")?;

            // Step 3: open the log. If opening with the configured index
            // definitions fails, retry without any index definitions.
            let mut log = self
                .open_with_lock(&dir.into(), &lock)
                .or_else(|_| {
                    self.clone()
                        .index_defs(Vec::new())
                        .open(GenericPath::from(dir))
                })
                .context("cannot open log for repair")?;

            // Count entries until the first `None` or `Err`. A `for` loop is
            // not used because `iter.next_offset` is read afterwards.
            let mut iter = log.iter();
            let mut entry_count = 0;
            while let Some(Ok(_)) = iter.next() {
                entry_count += 1;
            }

            let valid_len = iter.next_offset;
            assert!(valid_len >= PRIMARY_START_OFFSET);
            assert!(valid_len <= log.meta.primary_len);

            if valid_len == log.meta.primary_len {
                message += &format!(
                    "Verified {} entries, {} bytes in log\n",
                    entry_count, valid_len
                );
            } else {
                message += &format!(
                    "Verified first {} entries, {} of {} bytes in log\n",
                    entry_count, valid_len, log.meta.primary_len
                );

                // Copy everything after the verified prefix into a backup
                // file before the metadata stops referring to it.
                (|| -> crate::Result<()> {
                    let mut primary_file = fs::OpenOptions::new()
                        .read(true)
                        .open(&primary_path)
                        .context(&primary_path, "cannot open for read")?;
                    let backup_path = dir.join(format!(
                        "log.bak.epoch{}.offset{}",
                        log.meta.epoch, valid_len
                    ));
                    // `create_new` refuses to overwrite an existing backup.
                    let mut backup_file = fs::OpenOptions::new()
                        .create_new(true)
                        .write(true)
                        .open(&backup_path)
                        .context(&backup_path, "cannot open")?;

                    primary_file
                        .seek(SeekFrom::Start(valid_len))
                        .context(&primary_path, "cannot seek")?;

                    // Stream the tail chunk by chunk through the reader's
                    // internal buffer; an empty buffer means end of file.
                    let mut reader = io::BufReader::new(primary_file);
                    loop {
                        let len = {
                            let buf = reader.fill_buf().context(&primary_path, "cannot read")?;
                            if buf.is_empty() {
                                break;
                            }
                            backup_file
                                .write_all(buf)
                                .context(&backup_path, "cannot write")?;
                            buf.len()
                        };
                        reader.consume(len);
                    }
                    message += &format!("Backed up corrupted log to {:?}\n", backup_path);
                    Ok(())
                })()
                .context("while trying to backup corrupted log")?;

                // Shrink the logical log to the verified prefix. The index
                // records are cleared here and rebuilt in step 4.
                log.meta.primary_len = valid_len;
                log.meta.indexes.clear();
                log.meta.epoch = log.meta.epoch.wrapping_add(1);
                log.disk_buf = mmap_path(&primary_path, valid_len)?;

                log.meta
                    .write_file(&meta_path, log.open_options.fsync)
                    .context("while trying to update metadata with verified log length")?;
                message += &format!("Reset log size to {}\n", valid_len);
            }

            // Step 4: the log may have been opened without index definitions
            // (see the fallback above), so restore them before rebuilding.
            log.open_options.index_defs = self.index_defs.clone();
            message += &log
                .rebuild_indexes_with_lock(false, &lock)
                .context("while trying to update indexes with repaired log")?;

            Ok(message.into_string())
        })();

        result.context(|| format!("in log::OpenOptions::repair({:?})", dir))
    }
}
/// Lets [`OpenOptions`] be used where an [`OpenOptionsRepair`] is expected.
impl OpenOptionsRepair for OpenOptions {
    /// Repair the log stored at `dir` by delegating to the inherent
    /// `OpenOptions::repair`, returning its report or error unchanged.
    fn open_options_repair(&self, dir: impl AsRef<Path>) -> crate::Result<String> {
        let log_dir: &Path = dir.as_ref();
        self.repair(log_dir)
    }
}
impl OpenOptionsOutput for OpenOptions {
type Output = Log;
fn open_path(&self, path: &Path) -> crate::Result<Self::Output> {
self.open(path)
}
}
impl OpenOptions {
pub fn delete_content(&self, dir: impl Into<GenericPath>) -> crate::Result<()> {
let dir = dir.into();
let dir = match dir.as_opt_path() {
Some(dir) => dir,
None => return Ok(()),
};
let result: crate::Result<()> = (|| {
utils::mkdir_p(dir)?;
let lock = ScopedDirLock::new(dir)?;
let meta = LogMetadata::new_with_primary_len(PRIMARY_START_OFFSET);
let meta_path = dir.join(META_FILE);
meta.write_file(&meta_path, self.fsync)?;
let primary_path = dir.join(PRIMARY_FILE);
utils::atomic_write_plain(&primary_path, PRIMARY_HEADER, self.fsync)?;
let log = self
.clone()
.create(true)
.open_with_lock(&dir.into(), &lock)
.context("cannot open")?;
log.rebuild_indexes_with_lock(true, &lock)?;
Ok(())
})();
result.context(|| format!("in log::OpenOptions::delete_content({:?})", dir))
}
}