pub struct WAL<Log, State, ReplayContext>where
Log: Serialize + for<'a> Deserialize<'a>,
State: Default + Serialize + for<'a> Deserialize<'a>,
ReplayContext: Clone,{
pub size: usize,
pub options: WALOptions,
pub data_path: PathBuf,
/* private fields */
}
Expand description
Write Ahead Log(WAL)
Persists the generic State
by applying Log
(s) to
all mutable actions before performing those actions
State is then checkpoint
ed, and can be replay
ed later
from disk to rebuild the State
via the replay_handler
with optional ReplayContext
.
Performance can be tuned via the WALOptions
argument in open
,
but WALOptions.divider
must stay consistent.
Disk Structure:
./<data_path>
./wal.bin // Logs file
./checkpoint.bin // State checkpoint file
./temporary-checkpoint.bin // Temporary file during `checkpoint`
Fields§
§size: usize
§options: WALOptions
§data_path: PathBuf
Implementations§
Source§impl<Log, State, ReplayContext> WAL<Log, State, ReplayContext>where
Log: Serialize + for<'a> Deserialize<'a>,
State: Default + Serialize + for<'a> Deserialize<'a>,
ReplayContext: Clone,
impl<Log, State, ReplayContext> WAL<Log, State, ReplayContext>where
Log: Serialize + for<'a> Deserialize<'a>,
State: Default + Serialize + for<'a> Deserialize<'a>,
ReplayContext: Clone,
Sourcepub fn should_checkpoint(&self) -> bool
pub fn should_checkpoint(&self) -> bool
Returns if over or equal to max size and should call the
checkpoint
method
Example Usage:
let mut wal: WAL::<TreeLog, Tree> = ...;
let tree: Tree = ...;
if wal.should_checkpoint() {
wal.checkpoint(&tree).await?;
}
Sourcepub async fn checkpoint(&mut self, state: &State) -> Result<(), Error>
pub async fn checkpoint(&mut self, state: &State) -> Result<(), Error>
Checkpoints the current state of the persisted data to disk, then truncates the WAL to 0
Example Usage:
let mut wal: WAL::<TreeLog, Tree> = ...;
let tree: Tree = ...;
if wal.should_checkpoint() {
wal.checkpoint(&tree).await?;
}
Source§impl<Log, State, ReplayContext> WAL<Log, State, ReplayContext>where
Log: Serialize + for<'a> Deserialize<'a>,
State: Default + Serialize + for<'a> Deserialize<'a>,
ReplayContext: Clone,
impl<Log, State, ReplayContext> WAL<Log, State, ReplayContext>where
Log: Serialize + for<'a> Deserialize<'a>,
State: Default + Serialize + for<'a> Deserialize<'a>,
ReplayContext: Clone,
Sourcepub async fn log(&mut self, log: &Log) -> Result<(), Error>
pub async fn log(&mut self, log: &Log) -> Result<(), Error>
Log once to the WAL
Example Usage:
let mut wal: WAL<CacheLog, Cache> = WAL::open("./cache_wal", Box::new(replay_handler), None).await?;
let mut cache = wal.replay().await?;
let log = CacheLog::Insert {
key: "some id".to_string(),
value: "some data".to_string(),
};
wal.log(&log).await?;
Sourcepub async fn log_many(&mut self, logs: &Vec<Log>) -> Result<(), Error>
pub async fn log_many(&mut self, logs: &Vec<Log>) -> Result<(), Error>
Log many as a single batch log
Example Usage:
let mut wal: WAL<CacheLog, Cache> = WAL::open("./cache_wal", Box::new(replay_handler), None).await?;
let mut cache = wal.replay().await?;
let mut logs = vec![];
for string in vec!["1".to_string(), "2".to_string()] {
logs.push(CacheLog::Insert {
key: string.clone(),
value: string,
});
}
wal.log_many(&logs).await?;
Source§impl<Log, State, ReplayContext> WAL<Log, State, ReplayContext>where
Log: Serialize + for<'a> Deserialize<'a>,
State: Default + Serialize + for<'a> Deserialize<'a>,
ReplayContext: Clone,
impl<Log, State, ReplayContext> WAL<Log, State, ReplayContext>where
Log: Serialize + for<'a> Deserialize<'a>,
State: Default + Serialize + for<'a> Deserialize<'a>,
ReplayContext: Clone,
Sourcepub async fn replay(
&mut self,
context: Option<ReplayContext>,
) -> Result<State, Error>
pub async fn replay( &mut self, context: Option<ReplayContext>, ) -> Result<State, Error>
Replay the log file against the current checkpoint to construct
the current state, then checkpoints the current state to truncate
the WAL. Each log batch is handled with the WAL.replay_handler
.
context
is optional data you can provide during the replay to
aid in the replay process, but is not persisted or checkpointed.
Example Usage:
fn replay_handler(
logs: Vec<CacheLog>,
checkpoint: Arc<Mutex<Cache>>,
context: Option<ReplayContext>,
) -> ReplayHandlerResult {
Box::pin(async move {
let mut cache = cache.lock().await;
for log in logs {
match log {
CacheLog::Insert { key, value } => cache.insert(key, value),
CacheLog::Remove(key) => cache.remove(&key),
};
}
Ok(())
})
}
let mut wal: WAL<CacheLog, Cache> = WAL::open("./cache_wal", Box::new(replay_handler), None)
.await?;
let mut cache = wal.replay().await?;
Source§impl<Log, State, ReplayContext> WAL<Log, State, ReplayContext>where
Log: Serialize + for<'a> Deserialize<'a>,
State: Default + Serialize + for<'a> Deserialize<'a>,
ReplayContext: Clone,
impl<Log, State, ReplayContext> WAL<Log, State, ReplayContext>where
Log: Serialize + for<'a> Deserialize<'a>,
State: Default + Serialize + for<'a> Deserialize<'a>,
ReplayContext: Clone,
Sourcepub async fn undo(&mut self) -> Result<(), Error>
pub async fn undo(&mut self) -> Result<(), Error>
Undoes the last log
or log_many
by truncation
Does nothing if already undone, or no log/log_many call
Example Usage:
let mut wal: WAL<CacheLog, Cache> = ...;
wal.log_many(&logs).await?;
wal.undo().await?; // Undoes `log_many`
wal.log(&log).await?;
wal.undo().await?; // Undoes `log`
wal.undo().await?; // Does nothing
Source§impl<Log, State, ReplayContext> WAL<Log, State, ReplayContext>where
Log: Serialize + for<'a> Deserialize<'a>,
State: Default + Serialize + for<'a> Deserialize<'a>,
ReplayContext: Clone,
impl<Log, State, ReplayContext> WAL<Log, State, ReplayContext>where
Log: Serialize + for<'a> Deserialize<'a>,
State: Default + Serialize + for<'a> Deserialize<'a>,
ReplayContext: Clone,
Sourcepub async fn open<P: AsRef<Path>>(
data_path: P,
replay_handler: ReplayHandler<Log, State, ReplayContext>,
options: Option<WALOptions>,
) -> Result<WAL<Log, State, ReplayContext>, Error>
pub async fn open<P: AsRef<Path>>( data_path: P, replay_handler: ReplayHandler<Log, State, ReplayContext>, options: Option<WALOptions>, ) -> Result<WAL<Log, State, ReplayContext>, Error>
Open a WAL by data_path
If data_path
does not exist, will create it and
setup a new empty WAL
Note: WALOptions.divider
must stay consistent.
Note: Does not replay
the WAL, must call manually.
Example Usage:
fn replay_handler(
logs: Vec<CacheLog>,
checkpoint: Arc<Mutex<Cache>>,
context: Option<ReplayContext>,
) -> ReplayHandlerResult {
Box::pin(async move {
let mut cache = cache.lock().await;
for log in logs {
match log {
CacheLog::Insert { key, value } => cache.insert(key, value),
CacheLog::Remove(key) => cache.remove(&key),
};
}
Ok(())
})
}
let mut wal: WAL<CacheLog, Cache, ReplayContext> = WAL::open("./cache_wal", Box::new(replay_handler), None)
.await?;
let mut cache = wal.replay().await?;