Skip to main content

astra_core/
wal.rs

1use std::fs::{File, OpenOptions};
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4
5use parking_lot::Mutex;
6use serde::{Deserialize, Serialize};
7
8use crate::errors::StoreError;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
11#[serde(tag = "op", rename_all = "snake_case")]
12pub enum WalEntry {
13    Put {
14        key: Vec<u8>,
15        value: Vec<u8>,
16        lease: i64,
17        revision: i64,
18    },
19    Delete {
20        key: Vec<u8>,
21        revision: i64,
22    },
23}
24
25#[derive(Debug)]
26pub struct UnifiedWal {
27    path: PathBuf,
28    file: Mutex<File>,
29}
30
31impl UnifiedWal {
32    pub fn open(data_dir: &Path) -> Result<Self, StoreError> {
33        std::fs::create_dir_all(data_dir)?;
34        let path = data_dir.join("unified-raft.wal");
35        let file = OpenOptions::new().create(true).append(true).open(&path)?;
36
37        Ok(Self {
38            path,
39            file: Mutex::new(file),
40        })
41    }
42
43    pub fn append(&self, entry: &WalEntry) -> Result<(), StoreError> {
44        let mut guard = self.file.lock();
45        let line = serde_json::to_string(entry)?;
46        guard.write_all(line.as_bytes())?;
47        guard.write_all(b"\n")?;
48        guard.sync_data()?;
49        Ok(())
50    }
51
52    pub fn replay(&self) -> Result<Vec<WalEntry>, StoreError> {
53        let file = OpenOptions::new().read(true).open(&self.path)?;
54        let reader = BufReader::new(file);
55
56        let mut out = Vec::new();
57        for line in reader.lines() {
58            let line = line?;
59            if line.trim().is_empty() {
60                continue;
61            }
62            out.push(serde_json::from_str::<WalEntry>(&line)?);
63        }
64        Ok(out)
65    }
66}