use crate::event::Event;
use crate::log::EventReader;
use crate::snapshot::{self, Snapshot};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::any::Any;
use std::io;
use std::path::{Path, PathBuf};
/// Signature of a view's reducer: it takes the current state by value, borrows
/// one [`Event`], and returns the next state.
pub type ReduceFn<S> = fn(S, &Event) -> S;
/// Private module holding the `Sealed` marker trait.
///
/// The module itself is not `pub`, so code outside this module's scope cannot
/// name `sealed::Sealed` and therefore cannot implement any trait that lists it
/// as a supertrait.
mod sealed {
/// Marker supertrait; it has no methods and exists only to restrict who can
/// implement the traits that require it.
pub trait Sealed {}
}
/// Operations on a view that do not mention its state type.
///
/// The trait requires `sealed::Sealed`, so it can only be implemented where
/// that private marker trait is nameable. In this file it is implemented for
/// `View<S>`.
pub trait ViewOps: sealed::Sealed {
/// Refreshes the view from `reader` without returning the state.
///
/// For `View<S>` this calls `View::refresh` and discards the returned
/// reference, propagating any I/O error.
fn refresh_boxed(&mut self, reader: &EventReader) -> io::Result<()>;
/// Resets the view's log position to the start.
///
/// For `View<S>` this sets the offset to 0, clears the stored hash, keeps the
/// in-memory state as is, and writes a snapshot reflecting that.
fn reset_offset(&mut self) -> io::Result<()>;
/// Returns the view's name.
fn view_name(&self) -> &str;
/// Returns `self` as `&dyn Any` (presumably so callers can downcast to the
/// concrete `View<S>` -- confirm against callers).
fn as_any(&self) -> &dyn Any;
/// Mutable counterpart of [`ViewOps::as_any`].
fn as_any_mut(&mut self) -> &mut dyn Any;
}
/// A named state of type `S` derived by folding log events through a reducer,
/// with a snapshot file used to avoid replaying the log from the beginning.
pub struct View<S> {
/// View name; used in the snapshot file name and in warning messages.
name: String,
/// Function applied to the state for every event read from the log.
reducer: ReduceFn<S>,
/// Location of this view's snapshot: `<views_dir>/<name>.snapshot.json`.
snapshot_path: PathBuf,
/// Current in-memory state.
state: S,
/// Position in the active log up to which events have been folded into
/// `state`; it is compared against the active log size when a snapshot is
/// verified.
offset: u64,
/// Hash of the last log line folded into `state`; compared with the hash of
/// the line before `offset` when a snapshot is verified.
hash: String,
/// Whether the on-disk snapshot has already been consulted (or deliberately
/// bypassed by a rebuild), so that later refreshes skip loading it.
loaded: bool,
/// When set, the next refresh replays the log via `read_full` instead of
/// reading from `offset`.
needs_full_replay: bool,
}
impl<S> std::fmt::Debug for View<S>
where
    S: std::fmt::Debug,
{
    /// Formats the view as `View { name, snapshot_path, state, offset }`.
    ///
    /// The reducer, the stored hash and the internal bookkeeping flags are
    /// deliberately left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            name,
            snapshot_path,
            state,
            offset,
            ..
        } = self;

        let mut out = f.debug_struct("View");
        out.field("name", name);
        out.field("snapshot_path", snapshot_path);
        out.field("state", state);
        out.field("offset", offset);
        out.finish()
    }
}
impl<S> View<S>
where
    S: Serialize + DeserializeOwned + Default + Clone,
{
    /// Creates a view called `name` whose snapshot lives in `views_dir`.
    ///
    /// The snapshot path is `<views_dir>/<name>.snapshot.json`. Nothing is read
    /// from disk here: the view starts with `S::default()` at offset 0, and the
    /// snapshot is consulted on the first call to [`View::refresh`].
    pub fn new(name: &str, reducer: ReduceFn<S>, views_dir: &Path) -> Self {
        let snapshot_path = views_dir.join(format!("{name}.snapshot.json"));
        View {
            name: name.to_string(),
            reducer,
            snapshot_path,
            state: S::default(),
            offset: 0,
            hash: String::new(),
            loaded: false,
            needs_full_replay: false,
        }
    }

    /// Brings the view up to date with the log and returns the resulting state.
    ///
    /// On the first call the snapshot is loaded and checked against the log; a
    /// missing or invalid snapshot schedules a full replay. Otherwise only the
    /// events after the stored offset are folded in. A new snapshot is written
    /// whenever at least one event was processed.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error from loading or saving the snapshot or from
    /// reading the log. The view stays consistent after an error:
    ///
    /// * a failed snapshot load or verification is retried on the next call;
    /// * a failed full replay is retried from scratch on the next call;
    /// * a failed incremental read keeps the events that were already folded
    ///   in, together with the offset and hash that match them.
    pub fn refresh(&mut self, reader: &EventReader) -> io::Result<&S> {
        if !self.loaded {
            self.load_snapshot(reader)?;
        }

        let processed = if self.needs_full_replay {
            self.replay_full(reader)?
        } else {
            self.replay_incremental(reader)?
        };

        if processed {
            snapshot::save(
                &self.snapshot_path,
                &Snapshot::new(self.state.clone(), self.offset, self.hash.clone()),
            )?;
        }
        Ok(&self.state)
    }

    /// Returns the current in-memory state without touching the log.
    pub fn state(&self) -> &S {
        &self.state
    }

    /// Deletes the snapshot, discards the in-memory state and replays the log
    /// from scratch.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error from deleting the snapshot or from the
    /// subsequent [`View::refresh`]. If deleting the snapshot fails, the view
    /// is left untouched.
    pub fn rebuild(&mut self, reader: &EventReader) -> io::Result<&S> {
        snapshot::delete(&self.snapshot_path)?;
        self.reset_for_full_replay();
        // The snapshot was just removed, so there is nothing to load.
        self.loaded = true;
        self.refresh(reader)
    }

    /// Returns the view's name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Loads the on-disk snapshot (if any) and validates it against the log.
    ///
    /// `loaded` is only set once loading and verification have both completed
    /// without an I/O error, so a transient failure cannot leave an unverified
    /// snapshot permanently trusted.
    fn load_snapshot(&mut self, reader: &EventReader) -> io::Result<()> {
        match snapshot::load::<S>(&self.snapshot_path)? {
            Some(snap) => {
                self.state = snap.state;
                self.offset = snap.offset;
                self.hash = snap.hash;
            }
            None => self.reset_for_full_replay(),
        }

        if self.offset > 0 {
            match self.verify_snapshot(reader)? {
                SnapshotValidity::Valid => {}
                SnapshotValidity::OffsetBeyondEof => {
                    log::warn!(
                        "eventfold: view '{}': snapshot offset {} is beyond log EOF, rebuilding",
                        self.name,
                        self.offset
                    );
                    self.reset_for_full_replay();
                }
                SnapshotValidity::HashMismatch => {
                    log::warn!(
                        "eventfold: view '{}': snapshot hash mismatch, rebuilding",
                        self.name
                    );
                    self.reset_for_full_replay();
                }
            }
        }

        self.loaded = true;
        Ok(())
    }

    /// Drops all derived data and schedules a full replay on the next refresh.
    fn reset_for_full_replay(&mut self) {
        self.state = S::default();
        self.offset = 0;
        self.hash = String::new();
        self.needs_full_replay = true;
    }

    /// Rebuilds the state from `reader.read_full()`.
    ///
    /// Returns whether any event was processed. Nothing is committed to `self`
    /// until the whole replay (including the final log-size query) has
    /// succeeded; on error `needs_full_replay` stays set so the replay is
    /// retried.
    fn replay_full(&mut self, reader: &EventReader) -> io::Result<bool> {
        // A full replay always starts from the empty state; every path that
        // sets `needs_full_replay` also resets `self.state` to the default.
        let mut state = S::default();
        let mut last_hash = None;
        for result in reader.read_full()? {
            let (event, line_hash) = result?;
            state = (self.reducer)(state, &event);
            last_hash = Some(line_hash);
        }

        let processed = last_hash.is_some();
        if let Some(hash) = last_hash {
            // Query the size before committing anything so that a failure here
            // leaves the view still marked for replay.
            let end_of_log = reader.active_log_size()?;
            self.offset = end_of_log;
            self.hash = hash;
        }
        self.state = state;
        self.needs_full_replay = false;
        Ok(processed)
    }

    /// Folds the events after `self.offset` into the current state.
    ///
    /// Returns whether any event was processed. If reading fails part-way, the
    /// events already applied are kept, with `offset` and `hash` pointing at
    /// the last good event, and the error is returned.
    fn replay_incremental(&mut self, reader: &EventReader) -> io::Result<bool> {
        // Open the iterator before moving the state out, so a failure here
        // leaves the view exactly as it was.
        let events = reader.read_from(self.offset)?;

        let mut state = std::mem::take(&mut self.state);
        let mut processed = false;
        let mut outcome: io::Result<()> = Ok(());
        for result in events {
            match result {
                Ok((event, next_offset, line_hash)) => {
                    state = (self.reducer)(state, &event);
                    self.offset = next_offset;
                    self.hash = line_hash;
                    processed = true;
                }
                Err(err) => {
                    outcome = Err(err);
                    break;
                }
            }
        }
        // Always put the state back, even when a read failed.
        self.state = state;
        outcome.map(|()| processed)
    }

    /// Checks the loaded offset and hash against the active log.
    ///
    /// * offset past the end of the active log: `OffsetBeyondEof`;
    /// * the line before the offset hashes differently: `HashMismatch`;
    /// * offset 0, matching hash, or no hash available for comparison: `Valid`.
    fn verify_snapshot(&self, reader: &EventReader) -> io::Result<SnapshotValidity> {
        let file_size = reader.active_log_size()?;
        if self.offset > file_size {
            return Ok(SnapshotValidity::OffsetBeyondEof);
        }
        if self.offset == 0 {
            return Ok(SnapshotValidity::Valid);
        }
        match reader.read_line_hash_before(self.offset)? {
            Some(hash) if hash == self.hash => Ok(SnapshotValidity::Valid),
            Some(_) => Ok(SnapshotValidity::HashMismatch),
            None => Ok(SnapshotValidity::Valid),
        }
    }
}
// Marks every `View<S>` with the private `Sealed` trait, which `ViewOps`
// requires as a supertrait.
impl<S> sealed::Sealed for View<S> {}
impl<S> ViewOps for View<S>
where
    S: Serialize + DeserializeOwned + Default + Clone + 'static,
{
    /// Runs [`View::refresh`] and drops the returned state reference.
    fn refresh_boxed(&mut self, reader: &EventReader) -> io::Result<()> {
        self.refresh(reader).map(|_| ())
    }

    /// Rewinds the view to offset 0 with an empty hash, keeps the in-memory
    /// state, and persists that combination as the new snapshot.
    ///
    /// NOTE(review): if this runs before the first refresh, the in-memory state
    /// is still `S::default()` and that is what gets written over any existing
    /// snapshot -- confirm callers always refresh first.
    fn reset_offset(&mut self) -> io::Result<()> {
        self.offset = 0;
        self.hash = String::new();

        let rewound = Snapshot::new(self.state.clone(), self.offset, self.hash.clone());
        snapshot::save(&self.snapshot_path, &rewound)
    }

    /// Returns the view's name.
    fn view_name(&self) -> &str {
        self.name.as_str()
    }

    /// Exposes the view as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Exposes the view as `&mut dyn Any`.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
/// Result of checking a loaded snapshot against the active log.
enum SnapshotValidity {
/// The snapshot can be used: its offset is 0, the hash of the line before the
/// offset matches the stored hash, or no hash was available to compare.
Valid,
/// The snapshot's offset is larger than the active log's size.
OffsetBeyondEof,
/// The line before the snapshot's offset has a different hash than the one
/// stored in the snapshot.
HashMismatch,
}