block_db/
wal.rs

1// Authors: Robert Lopez
2
3use super::{error::Error, BlockDB};
4use serde::{Deserialize, Serialize};
5use std::{
6    collections::HashSet,
7    ops::{Deref, DerefMut},
8    path::PathBuf,
9    sync::Arc,
10};
11use tokio::sync::Mutex;
12use walr::{options::WALOptionsBuilder, ReplayHandlerResult, WAL};
13
14#[derive(Debug, Serialize, Deserialize)]
15pub enum BlockDBLog {
16    AddDataFile(String),
17    AddDataFiles(HashSet<String>),
18    DeleteDataFile(String),
19    DeleteDataFiles,
20}
21
22#[derive(Debug, Default, Serialize, Deserialize)]
23pub struct BlockDBState {
24    pub valid_data_files: HashSet<String>,
25}
26
27#[derive(Debug)]
28pub struct BlockDBWAL(WAL<BlockDBLog, BlockDBState, ()>);
29
30impl Deref for BlockDBWAL {
31    type Target = WAL<BlockDBLog, BlockDBState, ()>;
32
33    fn deref(&self) -> &Self::Target {
34        &self.0
35    }
36}
37
38impl DerefMut for BlockDBWAL {
39    fn deref_mut(&mut self) -> &mut Self::Target {
40        &mut self.0
41    }
42}
43
44fn replay_handler(
45    logs: Vec<BlockDBLog>,
46    checkpoint: Arc<Mutex<BlockDBState>>,
47    _context: Option<()>,
48) -> ReplayHandlerResult {
49    Box::pin(async move {
50        let mut checkpoint = checkpoint.lock().await;
51
52        for log in logs {
53            match log {
54                BlockDBLog::AddDataFile(id) => {
55                    checkpoint.valid_data_files.insert(id);
56                }
57                BlockDBLog::AddDataFiles(ids) => {
58                    checkpoint.valid_data_files.extend(ids);
59                }
60                BlockDBLog::DeleteDataFile(ref id) => {
61                    checkpoint.valid_data_files.remove(id);
62                }
63                BlockDBLog::DeleteDataFiles => {
64                    checkpoint.valid_data_files.clear();
65                }
66            }
67        }
68
69        Ok(())
70    })
71}
72
73impl BlockDBWAL {
74    pub async fn open(data_path: PathBuf) -> Result<Self, Error> {
75        Ok(Self(
76            WAL::open(
77                data_path,
78                Box::new(replay_handler),
79                Some(
80                    WALOptionsBuilder::new()
81                        .max_size((size_of::<BlockDBLog>() + 2) * 1_000)
82                        .build(),
83                ),
84            )
85            .await?,
86        ))
87    }
88}
89
90impl BlockDB {
91    pub async fn wal_checkpoint_handler(
92        &self,
93        wal_writer: &mut WAL<BlockDBLog, BlockDBState, ()>,
94    ) -> Result<(), Error> {
95        if wal_writer.should_checkpoint() {
96            wal_writer
97                .checkpoint(&BlockDBState {
98                    valid_data_files: self.data_files.read().await.keys().cloned().collect(),
99                })
100                .await?;
101        }
102
103        Ok(())
104    }
105}