1use super::{error::Error, BlockDB};
4use serde::{Deserialize, Serialize};
5use std::{
6 collections::HashSet,
7 ops::{Deref, DerefMut},
8 path::PathBuf,
9 sync::Arc,
10};
11use tokio::sync::Mutex;
12use walr::{options::WALOptionsBuilder, ReplayHandlerResult, WAL};
13
14#[derive(Debug, Serialize, Deserialize)]
15pub enum BlockDBLog {
16 AddDataFile(String),
17 AddDataFiles(HashSet<String>),
18 DeleteDataFile(String),
19 DeleteDataFiles,
20}
21
22#[derive(Debug, Default, Serialize, Deserialize)]
23pub struct BlockDBState {
24 pub valid_data_files: HashSet<String>,
25}
26
27#[derive(Debug)]
28pub struct BlockDBWAL(WAL<BlockDBLog, BlockDBState, ()>);
29
30impl Deref for BlockDBWAL {
31 type Target = WAL<BlockDBLog, BlockDBState, ()>;
32
33 fn deref(&self) -> &Self::Target {
34 &self.0
35 }
36}
37
38impl DerefMut for BlockDBWAL {
39 fn deref_mut(&mut self) -> &mut Self::Target {
40 &mut self.0
41 }
42}
43
44fn replay_handler(
45 logs: Vec<BlockDBLog>,
46 checkpoint: Arc<Mutex<BlockDBState>>,
47 _context: Option<()>,
48) -> ReplayHandlerResult {
49 Box::pin(async move {
50 let mut checkpoint = checkpoint.lock().await;
51
52 for log in logs {
53 match log {
54 BlockDBLog::AddDataFile(id) => {
55 checkpoint.valid_data_files.insert(id);
56 }
57 BlockDBLog::AddDataFiles(ids) => {
58 checkpoint.valid_data_files.extend(ids);
59 }
60 BlockDBLog::DeleteDataFile(ref id) => {
61 checkpoint.valid_data_files.remove(id);
62 }
63 BlockDBLog::DeleteDataFiles => {
64 checkpoint.valid_data_files.clear();
65 }
66 }
67 }
68
69 Ok(())
70 })
71}
72
73impl BlockDBWAL {
74 pub async fn open(data_path: PathBuf) -> Result<Self, Error> {
75 Ok(Self(
76 WAL::open(
77 data_path,
78 Box::new(replay_handler),
79 Some(
80 WALOptionsBuilder::new()
81 .max_size((size_of::<BlockDBLog>() + 2) * 1_000)
82 .build(),
83 ),
84 )
85 .await?,
86 ))
87 }
88}
89
90impl BlockDB {
91 pub async fn wal_checkpoint_handler(
92 &self,
93 wal_writer: &mut WAL<BlockDBLog, BlockDBState, ()>,
94 ) -> Result<(), Error> {
95 if wal_writer.should_checkpoint() {
96 wal_writer
97 .checkpoint(&BlockDBState {
98 valid_data_files: self.data_files.read().await.keys().cloned().collect(),
99 })
100 .await?;
101 }
102
103 Ok(())
104 }
105}