walr/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Authors: Robert Lopez

mod checkpoint;
mod iter;
mod log;
mod replay;
mod truncate;
mod undo;
mod util;

pub mod error;
pub mod options;

#[cfg(test)]
mod tests;

use error::Error;
use options::WALOptions;
use serde::{Deserialize, Serialize};
use std::{
    future::Future,
    marker::PhantomData,
    path::{Path, PathBuf},
    pin::Pin,
    sync::Arc,
};
use tokio::{fs::File, io::BufWriter, sync::Mutex};
use util::{
    encode::serialize,
    fs::{create_directory, file_size, open_file, path_exists, write_file},
};

/// Future produced by a [`ReplayHandler`].
///
/// A pinned, boxed, `Send` future that resolves to `Ok(())` on success,
/// or to a boxed `std::error::Error` reported by the handler.
pub type ReplayHandlerResult =
    Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error>>> + Send>>;

/// Callback used to rebuild `State` when the WAL is replayed.
///
/// It is handed a `Vec` of `Log` entries together with the shared,
/// `Mutex`-guarded `State`, and returns a [`ReplayHandlerResult`] future.
///
/// Note: the boxed closure itself carries no `Send`/`Sync` bound.
pub type ReplayHandler<Log, State> =
    Box<dyn Fn(Vec<Log>, Arc<Mutex<State>>) -> ReplayHandlerResult>;

/// Write-ahead log (WAL).
///
/// Persists a generic `State` by recording a `Log` entry for every
/// mutating action *before* that action is performed.
///
/// The `State` can be `checkpoint`ed, and later `replay`ed from disk
/// to rebuild the `State` via the `replay_handler`.
///
/// Performance can be tuned via the [`WALOptions`] argument to [`WAL::open`],
/// but `WALOptions.divider` must stay consistent.
///
/// Disk structure:
/// ```text
/// ./<data_path>
///     ./wal.bin                  // Logs file
///     ./checkpoint.bin           // State checkpoint file
///     ./temporary_checkpoint.bin // Temporary file during `checkpoint`
/// ```
///
/// [See basic example here](https://gitlab.com/robertlopezdev/walr/-/blob/main/example.rs)
pub struct WAL<Log, State>
where
    Log: Serialize + for<'a> Deserialize<'a>,
    State: Default + Serialize + for<'a> Deserialize<'a>,
{
    /// Size of the log file; initialized in [`WAL::open`] from `file_size` of `wal.bin`.
    // NOTE(review): presumably a byte count kept in sync by the write/truncate
    // paths in other modules — confirm.
    pub size: usize,
    /// Options this WAL was opened with.
    pub options: WALOptions,
    /// Directory containing `wal.bin`, `checkpoint.bin` and `temporary_checkpoint.bin`.
    pub data_path: PathBuf,

    /// Path to `<data_path>/wal.bin`.
    path: PathBuf,
    /// Buffered writer over the log file opened in [`WAL::open`].
    writer: BufWriter<File>,
    /// `None` right after `open`.
    // NOTE(review): presumably the size of the most recently appended log,
    // used by the undo path — confirm.
    last_log_size: Option<usize>,
    /// User callback that applies replayed logs to the `State`.
    replay_handler: ReplayHandler<Log, State>,
    /// Path to `<data_path>/checkpoint.bin`.
    checkpoint_path: PathBuf,
    /// Path to `<data_path>/temporary_checkpoint.bin`.
    temporary_checkpoint_path: PathBuf,
    _log_phantom: PhantomData<Log>,
    _checkpoint_phantom: PhantomData<State>,
}

impl<Log, State> WAL<Log, State>
where
    Log: Serialize + for<'a> Deserialize<'a>,
    State: Default + Serialize + for<'a> Deserialize<'a>,
{
    /// Open a WAL rooted at `data_path`.
    ///
    /// If `data_path` does not exist, it is created. When neither `wal.bin`
    /// nor `checkpoint.bin` exists yet (a brand-new WAL), a serialized
    /// `State::default()` is written as the initial checkpoint. An existing
    /// checkpoint is never overwritten.
    ///
    /// *Note*: `WALOptions.divider` must stay consistent.
    ///
    /// *Note*: Does not `replay` the WAL; call `replay` manually.
    ///
    /// # Errors
    ///
    /// Propagates any [`Error`] from creating `data_path`, checking for the
    /// existing files, serializing/writing the default checkpoint, or
    /// opening/sizing `wal.bin`.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// fn replay_handler(
    ///     logs: Vec<CacheLog>,
    ///     cache: Arc<Mutex<Cache>>,
    /// ) -> ReplayHandlerResult {
    ///     Box::pin(async move {
    ///         let mut cache = cache.lock().await;
    ///
    ///         for log in logs {
    ///             match log {
    ///                 CacheLog::Insert { key, value } => cache.insert(key, value),
    ///                 CacheLog::Remove(key) => cache.remove(&key),
    ///             };
    ///         }
    ///
    ///         Ok(())
    ///     })
    /// }
    ///
    /// let mut wal: WAL<CacheLog, Cache> =
    ///     WAL::open("./cache_wal", Box::new(replay_handler), None).await?;
    ///
    /// let mut cache = wal.replay().await?;
    /// ```
    pub async fn open<P: AsRef<Path>>(
        data_path: P,
        replay_handler: ReplayHandler<Log, State>,
        options: Option<WALOptions>,
    ) -> Result<WAL<Log, State>, Error> {
        let data_path = data_path.as_ref().to_path_buf();
        let path = data_path.join("wal.bin");
        let options = options.unwrap_or_default();
        let checkpoint_path = data_path.join("checkpoint.bin");
        let temporary_checkpoint_path = data_path.join("temporary_checkpoint.bin");

        create_directory(&data_path).await?;

        // Seed a default checkpoint only for a brand-new WAL. Keying this on
        // `wal.bin` alone would overwrite an existing `checkpoint.bin` whenever
        // the log file happened to be missing, silently discarding persisted state.
        let is_new_wal = !path_exists(&path).await? && !path_exists(&checkpoint_path).await?;
        if is_new_wal {
            let default_checkpoint = serialize(&State::default())?;
            write_file(&checkpoint_path, &default_checkpoint).await?;
        }

        // NOTE(review): the positional boolean flags are opaque here; presumably
        // read/write/create/append/truncate-style options — confirm against
        // `util::fs::open_file`.
        let file = open_file(&path, false, true, true, false).await?;
        let size = file_size(&file).await?;

        Ok(Self {
            size,
            path,
            writer: BufWriter::new(file),
            options,
            data_path,
            last_log_size: None,
            replay_handler,
            checkpoint_path,
            temporary_checkpoint_path,
            _log_phantom: PhantomData,
            _checkpoint_phantom: PhantomData,
        })
    }
}