use std::cmp::Ordering;
use std::fs::{read_dir, remove_file};
use std::path::{Path, PathBuf};
use biometrics::{Collector, Counter};
use mani::{Edit, Manifest, ManifestIterator};
use setsum::Setsum;
use sst::merging_cursor::MergingCursor;
use sst::{Cursor, Sst, SstCursor};
use zerror::Z;
use zerror_core::ErrorCore;
use super::{
Error, IoToZ, LsmtkOptions, MANI_ROOT, SST_FILE, TRASH_LOG, TRASH_ROOT, TRASH_SST, VERIFY_ROOT,
};
/// Clicked once for every path listed in the verifier's manifest that the clean-up pass visits,
/// whether or not the file is still present on disk.
static RM_FILE: Counter = Counter::new("lsmtk.verifier.verifier_rm_file");
/// Clicked each time a fully-processed manifest fragment is removed from disk.
static RM_MANI: Counter = Counter::new("lsmtk.verifier.verifier_rm_mani");
/// Clicked once per manifest edit that `LsmVerifier::verify_one` finishes checking.
static EDIT_VERIFIED: Counter = Counter::new("lsmtk.verifier.verifier_edit_verified");
/// Clicked once per manifest fragment after `LsmVerifier::verify_one` has iterated all of its
/// edits (before the final output-setsum comparison).
static MANI_VERIFIED: Counter = Counter::new("lsmtk.verifier.verifier_mani_verified");
/// Register every counter declared by the verifier with `collector`.
pub fn register_biometrics(collector: &Collector) {
    for counter in [&RM_FILE, &RM_MANI, &EDIT_VERIFIED, &MANI_VERIFIED] {
        collector.register_counter(counter);
    }
}
/// Verifies the manifest fragments of an lsmtk tree and cleans up the trash files they made
/// obsolete.  Progress is recorded in a manifest of the verifier's own.
pub struct LsmVerifier {
/// Root directory of the tree; built from `options.path` in `open`.
root: PathBuf,
/// The verifier's own manifest, opened at `VERIFY_ROOT(&root)`.  Its 'M' info holds the last
/// fragment processed, its 'O' info the accumulated output setsum, and its strings the trash
/// file names awaiting removal.
mani: Manifest,
/// Options the verifier was opened with (also supplies the garbage-collection policy).
options: LsmtkOptions,
}
impl LsmVerifier {
/// Open a verifier for the tree located at `options.path`.
///
/// The verifier's own manifest is opened (using `options.mani`) under `VERIFY_ROOT` of the
/// tree's root directory.
///
/// # Errors
///
/// Propagates any error from `Manifest::open`.
pub fn open(options: LsmtkOptions) -> Result<Self, Error> {
    let root = PathBuf::from(&options.path);
    let mani_options = options.mani.clone();
    let verify_root = VERIFY_ROOT(&root);
    let mani = Manifest::open(mani_options, verify_root)?;
    Ok(Self {
        root,
        mani,
        options,
    })
}
/// Verify and clean up every manifest fragment except the last two.
///
/// `list_mani_fragments` returns the numbered backups in ascending order followed by the live
/// manifest, so the two entries skipped here are the live manifest and the highest-numbered
/// backup (when one exists).
/// NOTE(review): presumably those two may still be in use by a running tree -- confirm.
///
/// # Errors
///
/// Stops at, and returns, the first error from listing or from processing a fragment.
pub fn verify(&mut self) -> Result<(), Error> {
    let fragments = list_mani_fragments(&self.root)?;
    let keep = fragments.len().saturating_sub(2);
    for fragment in &fragments[..keep] {
        self.process_one(fragment)?;
    }
    Ok(())
}
/// Verify one manifest fragment and then remove it along with the trash it made obsolete.
///
/// Steps, in order:
/// 1. Finish any clean-up recorded in the verifier's manifest.
/// 2. Return early if `entry` is the fragment already recorded under 'M'.
/// 3. Verify `entry`, starting from the setsum recorded under 'O' (empty setsum if absent).
/// 4. Record, in a single edit, the trash file names to delete plus the new 'O' and 'M' values.
/// 5. Run the clean-up again so the recorded files (and `entry`) are removed.
///
/// # Errors
///
/// Returns `Error::Backoff` when an SST or log that should be deleted is not yet present in
/// the trash, and propagates verification, manifest, and I/O errors.
///
/// # Panics
///
/// Panics if the verifier's manifest still lists strings after the initial clean-up.
fn process_one(&mut self, entry: &PathBuf) -> Result<(), Error> {
    self.possibly_complete_processing(entry)?;
    let already_processed = match self.mani.info('M') {
        Some(last_entry_processed) => {
            mani::extract_backup(last_entry_processed) == mani::extract_backup(entry)
        }
        None => false,
    };
    if already_processed {
        return Ok(());
    }
    assert!(self.mani.strs().count() == 0);
    let starting_setsum = setsum_from_info_default('O', self.mani.info('O'))?;
    let (ending_setsum, trash_ssts, trash_logs) = self.verify_one(entry, starting_setsum)?;
    let mut pending = Edit::default();
    for sst_setsum in &trash_ssts {
        let sst_path = TRASH_SST(&self.root, *sst_setsum);
        let sst_name = basename_string(&sst_path)?;
        if !sst_path.exists() {
            return Err(Error::Backoff {
                core: ErrorCore::default(),
                path: sst_name,
            });
        }
        pending.add(&sst_name)?;
    }
    for log_number in &trash_logs {
        let log_path = TRASH_LOG(&self.root, *log_number);
        let log_name = basename_string(&log_path)?;
        if !log_path.exists() {
            return Err(Error::Backoff {
                core: ErrorCore::default(),
                path: log_name,
            });
        }
        pending.add(&log_name)?;
    }
    pending.info('O', &ending_setsum.hexdigest())?;
    pending.info('M', &basename_string(entry)?)?;
    self.mani.apply(pending)?;
    self.possibly_complete_processing(entry)
}
/// Carry out any clean-up the verifier's manifest says is still outstanding.
///
/// Does nothing when no fragment has been recorded under 'M'.  Otherwise:
/// - fails if the recorded fragment's backup number is greater than `entry`'s;
/// - removes `entry` itself when it is the recorded fragment and still exists;
/// - deletes every file named in the verifier's manifest from under `TRASH_ROOT` (files that
///   are already gone are skipped) and drops those names from the manifest.
///
/// # Errors
///
/// Returns `Error::Corruption` for out-of-order fragments and propagates file-removal and
/// manifest errors.
fn possibly_complete_processing(&mut self, entry: &PathBuf) -> Result<(), Error> {
    let last_entry_processed = match self.mani.info('M') {
        Some(last_entry_processed) => last_entry_processed,
        None => return Ok(()),
    };
    let recorded_num = mani::extract_backup(last_entry_processed);
    let entry_num = mani::extract_backup(entry);
    if recorded_num > entry_num {
        return Err(Error::Corruption {
            core: ErrorCore::default(),
            context: "clean up saw log out of order".to_string(),
        }
        .with_info("old log num", recorded_num)
        .with_info("new log num", entry_num));
    }
    if recorded_num == entry_num {
        if entry.exists() {
            RM_MANI.click();
            remove_file(entry).as_z().with_info("path", entry)?;
        }
    }
    let mut cleanup = Edit::default();
    for name in self.mani.strs() {
        RM_FILE.click();
        let victim = TRASH_ROOT(&self.root).join(name);
        if victim.exists() {
            remove_file(&victim).as_z().with_info("path", victim)?;
        }
        cleanup.rm(name)?;
    }
    self.mani.apply(cleanup)?;
    Ok(())
}
/// Verify every edit in the manifest fragment `entry`, starting from the accumulated setsum
/// `acc`.
///
/// Every edit must carry 'I' (inputs), 'O' (outputs) and 'D' (discard) setsums.  The first edit
/// is only required to have 'O' equal to `acc`.  Every later edit must satisfy:
/// - 'I' equals the running accumulator;
/// - 'I' == 'O' + 'D';
/// - 'D' equals (sum of removed setsums) - (sum of added setsums);
/// - when 'D' is non-empty and the edit removes something, `verify_gc` accepts the edit.
///
/// Returns the final accumulator, the setsum of every removed entry seen (including those of
/// the first edit), and the log numbers found in the 'L' info of non-first edits.
///
/// # Errors
///
/// Returns `Error::Corruption` when any check above fails, when a digest or 'L' field cannot be
/// parsed, or when the last edit's 'O' does not match the final accumulator (which is also the
/// case for a fragment with no edits).  Errors from iteration and `verify_gc` are propagated.
fn verify_one(
&self,
entry: &PathBuf,
mut acc: Setsum,
) -> Result<(Setsum, Vec<Setsum>, Vec<u64>), Error> {
let mani_iter = ManifestIterator::open(entry)?;
let mut ssts_to_remove = vec![];
let mut logs_to_remove = vec![];
// 'O' of the most recent edit; stays None if the fragment has no edits.
let mut last_outputs = None;
let mut first = true;
for edit in mani_iter {
let edit = edit?;
let inputs = setsum_from_info('I', edit.get_info('I'))?;
let outputs = setsum_from_info('O', edit.get_info('O'))?;
let discard = setsum_from_info('D', edit.get_info('D'))?;
// The first edit must pick up exactly where the previous fragment left off.
if first && outputs != acc {
let err = Error::Corruption {
core: ErrorCore::default(),
context: "manifest does not continue with accumulated setsum".to_string(),
}
.with_info("outputs", outputs.hexdigest())
.with_info("acc", acc.hexdigest())
.with_info("fragment", entry.to_string_lossy());
return Err(err);
}
// Each later edit must consume exactly what has been accumulated so far.
if !first && inputs != acc {
let err = Error::Corruption {
core: ErrorCore::default(),
context: "manifest does not continue with accumulated setsum".to_string(),
}
.with_info("inputs", inputs.hexdigest())
.with_info("acc", acc.hexdigest())
.with_info("fragment", entry.to_string_lossy());
return Err(err);
}
if !first && inputs != outputs + discard {
let err = Error::Corruption {
core: ErrorCore::default(),
context: "manifest does not balance inputs == outputs + discard".to_string(),
}
.with_info("inputs", inputs.hexdigest())
.with_info("outputs", outputs.hexdigest())
.with_info("discard", discard.hexdigest())
.with_info("discard^-1", (Setsum::default() - discard).hexdigest())
.with_info("inputs - outputs", (inputs - outputs).hexdigest());
return Err(err);
}
last_outputs = Some(outputs);
// Recompute the discard from the edit's contents: removed setsums minus added setsums.
let mut computed_discard = Setsum::default();
for added in edit.added() {
let setsum = Setsum::from_hexdigest(added).ok_or(Error::Corruption {
core: ErrorCore::default(),
context: format!("manifest added has bad digest: {added}"),
})?;
computed_discard -= setsum;
}
for rmed in edit.rmed() {
let setsum = Setsum::from_hexdigest(rmed).ok_or(Error::Corruption {
core: ErrorCore::default(),
context: format!("manifest rmed has bad digest: {rmed}"),
})?;
computed_discard += setsum;
// Removed entries are collected for every edit, the first one included.
ssts_to_remove.push(setsum);
}
if !first {
// An optional 'L' info names a log number to report back to the caller.
if let Some(log_num) = edit.get_info('L') {
let log_num: u64 = log_num.parse().map_err(|_| Error::Corruption {
core: ErrorCore::default(),
context: format!("manifest has bad L field: got {log_num:?}"),
})?;
logs_to_remove.push(log_num);
}
if discard != computed_discard {
return Err(Error::Corruption {
core: ErrorCore::default(),
context: format!(
"manifest has bad discard: expected {discard:?}, but got {computed_discard:?}"
),
});
}
// Only edits that both drop data and remove entries are replayed against the SSTs.
if discard != Setsum::default() && edit.rmed().count() > 0 {
self.verify_gc(&edit, discard)?;
}
// Since inputs == acc and inputs == outputs + discard, this leaves acc == outputs.
acc -= computed_discard;
}
first = false;
EDIT_VERIFIED.click();
}
MANI_VERIFIED.click();
// The accumulator must agree with the 'O' of the final edit.
if last_outputs != Some(acc) {
return Err(Error::Corruption {
core: ErrorCore::default(),
context: format!("manifest has bad output setsum: expected {acc:?}"),
});
}
Ok((acc, ssts_to_remove, logs_to_remove))
}
/// Replay the compaction described by `edit` against the SSTs on disk and confirm that the
/// data it dropped matches `discard` and was permitted by the garbage-collection policy.
///
/// The SSTs removed by `edit` form the input and the SSTs added by it form the output.  A
/// second pass over the input is fed through `self.options.gc_policy.collector`.
/// NOTE(review): the keys that collector yields are treated as "keys that must be retained";
/// that reading comes from the "data loss" check below -- confirm against the gc policy docs.
/// The meaning of the `0` passed to `collector` is not visible here.
///
/// Walking input and output in key order:
/// - an input key missing from the output is added to the computed discard, unless the
///   collector yielded that key, in which case it is reported as data loss;
/// - an output key missing from the input is reported as data construction.
///
/// The data-loss check also applies to the input keys left over once the output is
/// exhausted; previously those were counted as discarded without consulting the collector.
///
/// # Errors
///
/// Returns `Error::Corruption` for data loss, data construction, a bad digest, or a computed
/// discard that differs from `discard`; `Error::LogicError` if the collector falls behind the
/// input; and propagates cursor and file errors.
fn verify_gc(&self, edit: &Edit, discard: Setsum) -> Result<(), Error> {
    fn from_hexdigest(hex_digest: &str) -> Result<Setsum, Error> {
        match Setsum::from_hexdigest(hex_digest) {
            Some(setsum) => Ok(setsum),
            None => Err(Error::Corruption {
                core: ErrorCore::default(),
                context: format!("manifest field has bad digest: {hex_digest}"),
            }),
        }
    }
    // The removed SSTs are read twice: once raw, once through the garbage collector.
    let mut input_cursors: Vec<SstCursor> = vec![];
    let mut gc_cursors: Vec<SstCursor> = vec![];
    for rm in edit.rmed() {
        let cursor = self.get_cursor(from_hexdigest(rm)?)?;
        input_cursors.push(cursor.clone());
        gc_cursors.push(cursor);
    }
    let mut output_cursors: Vec<SstCursor> = vec![];
    for add in edit.added() {
        output_cursors.push(self.get_cursor(from_hexdigest(add)?)?);
    }
    let mut input = MergingCursor::new(input_cursors)?;
    let mut output = MergingCursor::new(output_cursors)?;
    let mut gc = MergingCursor::new(gc_cursors)?;
    // Position every cursor on its first entry.
    input.seek_to_first()?;
    input.next()?;
    output.seek_to_first()?;
    output.next()?;
    gc.seek_to_first()?;
    gc.next()?;
    let mut gc = self.options.gc_policy.collector(gc, 0)?;
    let mut gc_next = gc.next()?;
    let mut computed_discard = Setsum::default();
    while let (Some(i), Some(o)) = (input.key(), output.key()) {
        // True when the collector yielded exactly the current input key.
        let mut must_return = false;
        if let Some(gc_next) = gc_next {
            match gc_next.cmp(&i) {
                Ordering::Less => {
                    return Err(Error::LogicError {
                        core: ErrorCore::default(),
                        context: "gc key less than input".to_string(),
                    })
                    .with_info("gc", gc_next)
                    .with_info("input", i);
                }
                Ordering::Equal => {
                    must_return = true;
                }
                Ordering::Greater => {}
            };
        }
        match i.cmp(&o) {
            Ordering::Less => {
                // The input key is absent from the output, i.e. it was dropped.
                if must_return {
                    return Err(Error::Corruption {
                        core: ErrorCore::default(),
                        context: "data loss".to_string(),
                    })
                    .with_info("input", i);
                }
                let mut setsum = sst::Setsum::default();
                setsum.insert(input.key_value().unwrap());
                computed_discard += setsum.into_inner();
                input.next()?;
            }
            Ordering::Greater => {
                // The output holds a key that the input never contained.
                return Err(Error::Corruption {
                    core: ErrorCore::default(),
                    context: "data construction".to_string(),
                })
                .with_info("output", o);
            }
            Ordering::Equal => {
                input.next()?;
                output.next()?;
            }
        };
        if must_return {
            gc_next = gc.next()?;
        }
    }
    if let Some(o) = output.key() {
        return Err(Error::Corruption {
            core: ErrorCore::default(),
            context: "data construction".to_string(),
        })
        .with_info("output", o);
    }
    // The output is exhausted, so every remaining input key was dropped.  Apply the same
    // collector check as the main loop: a key the collector yielded must not be among them.
    while let Some(kv) = input.key_value() {
        if let (Some(gc_key), Some(i)) = (gc_next, input.key()) {
            match gc_key.cmp(&i) {
                Ordering::Less => {
                    return Err(Error::LogicError {
                        core: ErrorCore::default(),
                        context: "gc key less than input".to_string(),
                    })
                    .with_info("gc", gc_key)
                    .with_info("input", i);
                }
                Ordering::Equal => {
                    return Err(Error::Corruption {
                        core: ErrorCore::default(),
                        context: "data loss".to_string(),
                    })
                    .with_info("input", i);
                }
                Ordering::Greater => {}
            };
        }
        let mut setsum = sst::Setsum::default();
        setsum.insert(kv);
        computed_discard += setsum.into_inner();
        input.next()?;
    }
    if computed_discard != discard {
        return Err(Error::Corruption {
            core: ErrorCore::default(),
            context: "garbage collection has bad discard".to_string(),
        })
        .with_info("discard", discard.hexdigest())
        .with_info("discard^-1", (Setsum::default() - discard).hexdigest())
        .with_info("computed_discard", computed_discard.hexdigest())
        .with_info(
            "computed_discard^-1",
            (Setsum::default() - computed_discard).hexdigest(),
        );
    }
    Ok(())
}
/// Open a cursor over the SST identified by `setsum`.
///
/// The trash location is tried first, then the live SST location, then the trash location
/// once more.  Only the error from the final attempt is reported.
/// NOTE(review): the second trash attempt presumably covers a file being moved from the live
/// directory into the trash between the first two attempts -- confirm.
fn get_cursor(&self, setsum: Setsum) -> Result<sst::SstCursor, Error> {
    let trash_path = TRASH_SST(&self.root, setsum);
    let sst_path = SST_FILE(&self.root, setsum);
    let handle = sst::file_manager::open_without_manager(&trash_path)
        .or_else(|_| sst::file_manager::open_without_manager(sst_path))
        .or_else(|_| sst::file_manager::open_without_manager(&trash_path))?;
    let table = Sst::from_file_handle(handle)?;
    Ok(table.cursor())
}
}
/// Stateless checker for the setsum bookkeeping of a single manifest fragment.  Unlike
/// `LsmVerifier` it never opens SSTs and never modifies anything on disk.
pub struct ManifestVerifier {}
impl ManifestVerifier {
pub fn open() -> Result<Self, Error> {
Ok(ManifestVerifier {})
}
pub fn verify(&self, entry: &PathBuf) -> Result<Vec<(Setsum, Setsum, Setsum)>, Error> {
let mani_iter = ManifestIterator::open(entry)?;
let mut first = true;
let mut acc = Setsum::default();
let mut ret = vec![];
for edit in mani_iter {
let edit = edit?;
let inputs = setsum_from_info('I', edit.get_info('I'))?;
let outputs = setsum_from_info('O', edit.get_info('O'))?;
let discard = setsum_from_info('D', edit.get_info('D'))?;
if first {
acc = outputs;
} else {
ret.push((inputs, outputs, discard));
if inputs != acc {
let err = Error::Corruption {
core: ErrorCore::default(),
context: "manifest does not continue with accumulated setsum".to_string(),
}
.with_info("inputs", inputs.hexdigest())
.with_info("acc", acc.hexdigest())
.with_info("fragment", entry.to_string_lossy());
return Err(err);
}
if inputs != outputs + discard {
let err = Error::Corruption {
core: ErrorCore::default(),
context: "manifest does not balance inputs == outputs + discard"
.to_string(),
}
.with_info("inputs", inputs.hexdigest())
.with_info("outputs", outputs.hexdigest())
.with_info("discard", discard.hexdigest())
.with_info("discard^-1", (Setsum::default() - discard).hexdigest())
.with_info("inputs - outputs", (inputs - outputs).hexdigest());
return Err(err);
}
}
let mut computed_discard = Setsum::default();
for added in edit.added() {
let setsum = Setsum::from_hexdigest(added).ok_or(Error::Corruption {
core: ErrorCore::default(),
context: format!("manifest added has bad digest: {added}"),
})?;
computed_discard -= setsum;
}
for rmed in edit.rmed() {
let setsum = Setsum::from_hexdigest(rmed).ok_or(Error::Corruption {
core: ErrorCore::default(),
context: format!("manifest rmed has bad digest: {rmed}"),
})?;
computed_discard += setsum;
}
if !first {
if discard != computed_discard {
return Err(Error::Corruption {
core: ErrorCore::default(),
context: format!(
"manifest has bad discard: expected {discard:?}, but got {computed_discard:?}"
),
})
.with_info("discard", discard.hexdigest())
.with_info("discard^-1", (Setsum::default() - discard).hexdigest())
.with_info("computed_discard", computed_discard.hexdigest())
.with_info("computed_discard^-1", (Setsum::default() - computed_discard).hexdigest());
}
acc -= computed_discard;
}
first = false;
}
Ok(ret)
}
}
fn basename_string<P: AsRef<Path>>(path: P) -> Result<String, Error> {
if let Some(file_name) = path.as_ref().file_name() {
let file_name_string = file_name.to_string_lossy().to_string();
if PathBuf::from(&file_name_string) != file_name {
Err(Error::Corruption {
core: ErrorCore::default(),
context: "file name contains lossy characters".to_string(),
})
.with_info("path", path.as_ref().to_string_lossy())
} else {
Ok(file_name_string)
}
} else {
Err(Error::Corruption {
core: ErrorCore::default(),
context: "file name has no basename".to_string(),
})
.with_info("path", path.as_ref().to_string_lossy())
}
}
fn setsum_from_info(info: char, value: Option<&String>) -> Result<Setsum, Error> {
let hex_digest = match value {
Some(hex_digest) => hex_digest,
None => {
return Err(Error::Corruption {
core: ErrorCore::default(),
context: format!("manifest edit missing '{info}'"),
});
}
};
match Setsum::from_hexdigest(hex_digest) {
Some(setsum) => Ok(setsum),
None => Err(Error::Corruption {
core: ErrorCore::default(),
context: format!("manifest '{info}' field has bad digest: {hex_digest}"),
}),
}
}
fn setsum_from_info_default(info: char, value: Option<&str>) -> Result<Setsum, Error> {
let hex_digest = match value {
Some(hex_digest) => hex_digest,
None => {
return Ok(Setsum::default());
}
};
match Setsum::from_hexdigest(hex_digest) {
Some(setsum) => Ok(setsum),
None => Err(Error::Corruption {
core: ErrorCore::default(),
context: format!("manifest '{info}' field has bad digest: {hex_digest}"),
}),
}
}
/// List the manifest fragments under `MANI_ROOT(root)`, oldest first.
///
/// Directory entries that `mani::extract_backup` recognises are returned as backup paths in
/// ascending order of their extracted number; everything else in the directory is ignored.
/// The path of the live manifest is always appended as the final element, whether or not it
/// was seen in the directory listing.
///
/// # Errors
///
/// Propagates errors from reading the directory.
pub fn list_mani_fragments<P: AsRef<Path>>(root: P) -> Result<Vec<PathBuf>, Error> {
    let mani_root = MANI_ROOT(root.as_ref());
    let mut listed = vec![];
    for dirent in read_dir(&mani_root)? {
        listed.push(dirent?.path());
    }
    let mut backup_nums = listed
        .iter()
        .filter_map(mani::extract_backup)
        .collect::<Vec<_>>();
    backup_nums.sort();
    let mut fragments = Vec::with_capacity(backup_nums.len() + 1);
    for backup_num in backup_nums {
        fragments.push(mani::BACKUP(&mani_root, backup_num));
    }
    fragments.push(mani::MANIFEST(&mani_root));
    Ok(fragments)
}