// walr/lib.rs
// Authors: Robert Lopez
mod checkpoint;
mod iter;
mod log;
mod replay;
mod truncate;
mod undo;
mod util;
pub mod error;
pub mod options;
#[cfg(test)]
mod tests;
use error::Error;
use options::WALOptions;
use serde::{Deserialize, Serialize};
use std::{
future::Future,
marker::PhantomData,
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
};
use tokio::{fs::File, io::BufWriter, sync::Mutex};
use util::{
encode::serialize,
fs::{create_directory, file_size, open_file, path_exists, write_file},
};
/// Callback type stored by a `WAL` and used to rebuild `State` from logs.
///
/// A boxed `Fn` that takes an owned `Vec<Log>` plus a shared
/// `Arc<Mutex<State>>` (the `Mutex` is `tokio::sync::Mutex`) and returns a
/// `ReplayHandlerResult` future.
///
/// The trait object carries no `Send`/`Sync` bound.
// NOTE(review): how the logs are batched and how often the handler is called
// is decided by the `replay` module, which is not visible here — confirm there.
pub type ReplayHandler<Log, State> =
Box<dyn Fn(Vec<Log>, Arc<Mutex<State>>) -> ReplayHandlerResult>;
/// Return type of a `ReplayHandler`.
///
/// A pinned, boxed, `Send` future that resolves to `Ok(())` on success or to
/// a boxed `std::error::Error` on failure.
pub type ReplayHandlerResult =
Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error>>> + Send>>;
/// Write Ahead Log (WAL)
///
/// Persists the generic `State` by applying `Log`(s) to
/// all mutable actions before performing those actions.
///
/// State is then `checkpoint`ed, and can be `replay`ed later
/// from disk to rebuild the `State` via the `replay_handler`.
///
/// Performance can be tuned via the `WALOptions` argument in `open`,
/// but `WALOptions.divider` must stay consistent.
///
/// Disk Structure:
/// ```text
/// ./<data_path>
///     ./wal.bin                   // Logs file
///     ./checkpoint.bin            // State checkpoint file
///     ./temporary_checkpoint.bin  // Temporary file during `checkpoint`
/// ```
///
/// [See basic example here](https://gitlab.com/robertlopezdev/walr/-/blob/main/example.rs)
pub struct WAL<Log, State>
where
Log: Serialize + for<'a> Deserialize<'a>,
State: Default + Serialize + for<'a> Deserialize<'a>,
{
/// Size of the log file; `open` initialises it from `file_size` of `wal.bin`.
// NOTE(review): presumably a byte count — confirm against `util::fs::file_size`.
pub size: usize,
/// Options supplied to `open`, or `WALOptions::default()` when `None` was passed.
pub options: WALOptions,
/// Directory that holds every file belonging to this WAL.
pub data_path: PathBuf,
/// Path of the log file, `<data_path>/wal.bin`.
path: PathBuf,
/// Buffered writer wrapping the log file handle opened in `open`.
writer: BufWriter<File>,
/// `None` right after `open`.
// NOTE(review): presumably the size of the most recently written log, kept
// for `undo` — confirm in the `log`/`undo` modules.
last_log_size: Option<usize>,
/// User-supplied callback handed to `open`.
replay_handler: ReplayHandler<Log, State>,
/// Path of the checkpoint file, `<data_path>/checkpoint.bin`.
checkpoint_path: PathBuf,
/// Path of the temporary checkpoint file, `<data_path>/temporary_checkpoint.bin`.
temporary_checkpoint_path: PathBuf,
/// Zero-sized marker for the `Log` type parameter.
_log_phantom: PhantomData<Log>,
/// Zero-sized marker for the `State` type parameter.
_checkpoint_phantom: PhantomData<State>,
}
impl<Log, State> WAL<Log, State>
where
Log: Serialize + for<'a> Deserialize<'a>,
State: Default + Serialize + for<'a> Deserialize<'a>,
{
/// Open a WAL by `data_path`
///
/// If `data_path` does not exist, will create it and
/// setup a new empty WAL
///
/// *Note*: `WALOptions.divider` must stay consistent.
///
/// *Note*: Does not `replay` the WAL, must call manually.
///
/// ---
/// Example Usage:
/// ```
///
/// fn replay_handler(
/// logs: Vec<CacheLog>,
/// checkpoint: Arc<Mutex<Cache>>,
/// ) -> ReplayHandlerResult {
/// Box::pin(async move {
/// let mut cache = cache.lock().await;
///
/// for log in logs {
/// match log {
/// CacheLog::Insert { key, value } => cache.insert(key, value),
/// CacheLog::Remove(key) => cache.remove(&key),
/// };
/// }
///
/// Ok(())
/// })
/// }
///
/// let mut wal: WAL<CacheLog, Cache> = WAL::open("./cache_wal", Box::new(replay_handler), None)
/// .await?;
///
/// let mut cache = wal.replay().await?;
/// ```
pub async fn open<P: AsRef<Path>>(
data_path: P,
replay_handler: ReplayHandler<Log, State>,
options: Option<WALOptions>,
) -> Result<WAL<Log, State>, Error> {
let data_path = data_path.as_ref().to_path_buf();
let path = data_path.join("wal.bin");
let options = options.unwrap_or_default();
let checkpoint_path = data_path.join("checkpoint.bin");
let temporary_checkpoint_path = data_path.join("temporary_checkpoint.bin");
create_directory(&data_path).await?;
if !path_exists(&path).await? {
let default_checkpoint = serialize(&State::default())?;
write_file(&checkpoint_path, &default_checkpoint).await?;
}
let file = open_file(&path, false, true, true, false).await?;
let size = file_size(&file).await?;
Ok(Self {
size,
path,
writer: BufWriter::new(file),
options,
data_path,
last_log_size: None,
replay_handler,
checkpoint_path,
temporary_checkpoint_path,
_log_phantom: PhantomData,
_checkpoint_phantom: PhantomData,
})
}
}