walrus_rust/wal/runtime/
walrus.rs

1use crate::wal::block::{Block, Metadata};
2use crate::wal::config::{
3    DEFAULT_BLOCK_SIZE, FsyncSchedule, MAX_FILE_SIZE, PREFIX_META_SIZE, debug_print,
4};
5use crate::wal::paths::WalPathManager;
6use crate::wal::storage::{SharedMmapKeeper, set_fsync_schedule};
7use std::collections::{HashMap, HashSet};
8use std::fs;
9use std::sync::mpsc;
10use std::sync::{Arc, RwLock};
11
12use super::WalIndex;
13use super::allocator::{BlockAllocator, BlockStateTracker, FileStateTracker, flush_check};
14use super::background::start_background_workers;
15use super::reader::Reader;
16use super::writer::Writer;
17use rkyv::Deserialize;
18
/// Read-consistency mode for consumers, chosen at construction time and
/// stored on [`Walrus`] as `read_consistency`.
///
/// NOTE(review): the actual persistence behavior for each mode lives in the
/// reader path, not in this file — the descriptions below are inferred from
/// the variant names; confirm against the reader implementation.
#[derive(Clone, Copy, Debug)]
pub enum ReadConsistency {
    /// Strictest mode: presumably the read position is persisted eagerly so
    /// entries are not re-delivered after a restart.
    StrictlyAtOnce,
    /// Relaxed mode: presumably the read position is persisted only every
    /// `persist_every` reads, so a crash may cause re-delivery of entries
    /// read since the last persist.
    AtLeastOnce { persist_every: u32 },
}
24
/// Main WAL handle: wires together block allocation, per-column writers,
/// shared reader chains, background fsync workers, and the persisted
/// read-offset index. Constructed via `new`/`with_*`; recovery runs in
/// `startup_chore` before the handle is returned.
pub struct Walrus {
    // Hands out free blocks from the data files (see get_or_create_writer).
    pub(super) allocator: Arc<BlockAllocator>,
    // Shared reader state; holds per-column block chains rebuilt on startup.
    pub(super) reader: Arc<Reader>,
    // One cached writer per column name, created lazily by get_or_create_writer.
    pub(super) writers: RwLock<HashMap<String, Arc<Writer>>>,
    // Sender into the background fsync worker spawned by start_background_workers.
    // NOTE(review): payload looks like a file path — confirm in the worker.
    pub(super) fsync_tx: Arc<mpsc::Sender<String>>,
    // Persisted per-column read positions (stored on disk as "read_offset_idx").
    pub(super) read_offset_index: Arc<RwLock<WalIndex>>,
    // Consistency mode selected by the caller at construction.
    pub(super) read_consistency: ReadConsistency,
    // Fsync cadence, forwarded to each Writer and set globally for SharedMmap.
    pub(super) fsync_schedule: FsyncSchedule,
    // Directory layout for this WAL instance (default or per-key namespace).
    pub(super) paths: Arc<WalPathManager>,
}
35
impl Walrus {
    /// Creates a WAL at the default path with `StrictlyAtOnce` consistency
    /// and the default 200ms fsync schedule.
    pub fn new() -> std::io::Result<Self> {
        Self::with_consistency(ReadConsistency::StrictlyAtOnce)
    }

    /// Creates a WAL at the default path with the given consistency mode and
    /// the default 200ms fsync schedule.
    pub fn with_consistency(mode: ReadConsistency) -> std::io::Result<Self> {
        Self::with_consistency_and_schedule(mode, FsyncSchedule::Milliseconds(200))
    }

    /// Creates a WAL at the default path with an explicit consistency mode
    /// and fsync schedule.
    pub fn with_consistency_and_schedule(
        mode: ReadConsistency,
        fsync_schedule: FsyncSchedule,
    ) -> std::io::Result<Self> {
        let paths = Arc::new(WalPathManager::default());
        Self::with_paths(paths, mode, fsync_schedule)
    }

    /// Like [`Walrus::new`], but namespaces the on-disk layout under `key`.
    pub fn new_for_key(key: &str) -> std::io::Result<Self> {
        Self::with_consistency_for_key(key, ReadConsistency::StrictlyAtOnce)
    }

    /// Like [`Walrus::with_consistency`], but namespaces the on-disk layout
    /// under `key`.
    pub fn with_consistency_for_key(key: &str, mode: ReadConsistency) -> std::io::Result<Self> {
        Self::with_consistency_and_schedule_for_key(key, mode, FsyncSchedule::Milliseconds(200))
    }

    /// Fully explicit keyed constructor: consistency mode, fsync schedule,
    /// and a per-`key` path namespace.
    pub fn with_consistency_and_schedule_for_key(
        key: &str,
        mode: ReadConsistency,
        fsync_schedule: FsyncSchedule,
    ) -> std::io::Result<Self> {
        let paths = WalPathManager::for_key(key);
        Self::with_paths(Arc::new(paths), mode, fsync_schedule)
    }

    /// Shared constructor backing every public `new*`/`with_*` entry point.
    ///
    /// Publishes the fsync schedule globally, builds the allocator, reader,
    /// and background workers, opens the persisted read-offset index, then
    /// runs crash recovery (`startup_chore`) before handing back the handle.
    ///
    /// # Errors
    /// Propagates I/O errors from the allocator, the index, or recovery.
    fn with_paths(
        paths: Arc<WalPathManager>,
        mode: ReadConsistency,
        fsync_schedule: FsyncSchedule,
    ) -> std::io::Result<Self> {
        debug_print!("[walrus] new");

        // Store the fsync schedule globally for SharedMmap::new to access
        set_fsync_schedule(fsync_schedule);

        let allocator = Arc::new(BlockAllocator::new(paths.clone())?);
        let reader = Arc::new(Reader::new());
        // Spawns the background workers and returns the sender used to
        // enqueue fsync requests (stored as `fsync_tx`).
        let tx_arc = start_background_workers(fsync_schedule);

        // Persisted per-column read positions, stored as "read_offset_idx"
        // inside the managed path.
        let idx = WalIndex::new_in(&paths, "read_offset_idx")?;
        let instance = Walrus {
            allocator,
            reader,
            writers: RwLock::new(HashMap::new()),
            fsync_tx: tx_arc,
            read_offset_index: Arc::new(RwLock::new(idx)),
            read_consistency: mode,
            fsync_schedule,
            paths,
        };
        // Recover state left behind by a previous run before exposing the
        // instance to callers.
        instance.startup_chore()?;
        Ok(instance)
    }

    /// Returns the cached writer for `col_name`, creating it on first use.
    ///
    /// Uses double-checked locking: a shared read-lock fast path, then a
    /// re-check under the write lock so two concurrent callers cannot create
    /// two writers (and thus allocate two blocks) for the same column.
    ///
    /// # Errors
    /// Fails if either lock is poisoned or block allocation fails.
    pub(super) fn get_or_create_writer(&self, col_name: &str) -> std::io::Result<Arc<Writer>> {
        // Fast path: writer already cached under the read lock.
        if let Some(writer) = {
            let map = self.writers.read().map_err(|_| {
                std::io::Error::new(std::io::ErrorKind::Other, "writers read lock poisoned")
            })?;
            map.get(col_name).cloned()
        } {
            return Ok(writer);
        }

        let mut map = self.writers.write().map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::Other, "writers write lock poisoned")
        })?;

        // Re-check: another thread may have inserted the writer while we
        // waited for the write lock.
        if let Some(writer) = map.get(col_name).cloned() {
            return Ok(writer);
        }

        // SAFETY: The returned block will be held by this writer only
        // and appended/sealed before being exposed to readers.
        let initial_block = unsafe { self.allocator.get_next_available_block()? };
        let writer = Arc::new(Writer::new(
            self.allocator.clone(),
            initial_block,
            self.reader.clone(),
            col_name.to_string(),
            self.fsync_tx.clone(),
            self.fsync_schedule,
        ));
        map.insert(col_name.to_string(), writer.clone());
        Ok(writer)
    }

    /// Crash recovery, run once from `with_paths`.
    ///
    /// Scans every data file under the WAL root, rediscovers blocks and their
    /// owning columns, rebuilds the reader chains and the block/file state
    /// trackers, clamps persisted read positions to what was actually found
    /// on disk, and finally enqueues deletion checks for each scanned file.
    pub(super) fn startup_chore(&self) -> std::io::Result<()> {
        // Minimal recovery: scan wal data dir, build reader chains, and rebuild trackers
        let dir = match fs::read_dir(self.paths.root()) {
            Ok(d) => d,
            // A missing or unreadable root means nothing to recover — treat
            // as a fresh start, not an error.
            Err(_) => return Ok(()),
        };
        let mut files: Vec<String> = Vec::new();
        for entry in dir {
            let entry = match entry {
                Ok(e) => e,
                Err(_) => continue,
            };
            let path = entry.path();
            if let Ok(ft) = entry.file_type() {
                if ft.is_dir() {
                    continue;
                }
            }
            // Non-UTF-8 paths are silently skipped (file paths are stored as
            // Strings throughout this module).
            if let Some(s) = path.to_str() {
                // skip index files
                if s.ends_with("_index.db") {
                    continue;
                }
                files.push(s.to_string());
            }
        }
        // Deterministic scan order across restarts.
        files.sort();
        if !files.is_empty() {
            debug_print!("[recovery] scanning files: {}", files.len());
        }

        // Block ids are synthesized fresh on every recovery (numbered from 1);
        // they are not read back from disk.
        let mut next_block_id: usize = 1;
        let mut seen_files = HashSet::new();

        for file_path in files.iter() {
            let mmap = match SharedMmapKeeper::get_mmap_arc(file_path) {
                Ok(m) => m,
                Err(e) => {
                    debug_print!("[recovery] mmap open failed for {}: {}", file_path, e);
                    continue;
                }
            };
            seen_files.insert(file_path.clone());
            FileStateTracker::register_file_if_absent(file_path);
            debug_print!("[recovery] file {}", file_path);

            // Walk the file in fixed-size block strides.
            let mut block_offset: u64 = 0;
            while block_offset + DEFAULT_BLOCK_SIZE <= MAX_FILE_SIZE {
                // heuristic: if first bytes are zero, assume no more blocks
                let mut probe = [0u8; 8];
                mmap.read(block_offset as usize, &mut probe);
                if probe.iter().all(|&b| b == 0) {
                    break;
                }

                let mut used: u64 = 0;

                // try to read first metadata to get column name (with 2-byte length prefix)
                let mut meta_buf = vec![0u8; PREFIX_META_SIZE];
                mmap.read(block_offset as usize, &mut meta_buf);
                // Little-endian u16 length prefix; zero or out-of-range means
                // the block header is invalid, so stop scanning this file.
                let meta_len = (meta_buf[0] as usize) | ((meta_buf[1] as usize) << 8);
                if meta_len == 0 || meta_len > PREFIX_META_SIZE - 2 {
                    break;
                }
                let mut aligned = rkyv::AlignedVec::with_capacity(meta_len);
                aligned.extend_from_slice(&meta_buf[2..2 + meta_len]);
                // SAFETY: `aligned` was constructed from a bounded metadata
                // slice read from our own file, and copying into `AlignedVec`
                // guarantees the alignment rkyv requires.
                let archived = unsafe { rkyv::archived_root::<Metadata>(&aligned[..]) };
                let md: Metadata = match archived.deserialize(&mut rkyv::Infallible) {
                    Ok(m) => m,
                    Err(_) => {
                        // Undecodable metadata: give up on the rest of the file.
                        break;
                    }
                };
                let col_name = md.owned_by;

                // scan entries to compute used
                let block_stub = Block {
                    id: next_block_id as u64,
                    file_path: file_path.clone(),
                    offset: block_offset,
                    limit: DEFAULT_BLOCK_SIZE,
                    mmap: mmap.clone(),
                    used: 0,
                };
                let mut in_block_off: u64 = 0;
                loop {
                    match block_stub.read(in_block_off) {
                        Ok((_entry, consumed)) => {
                            used += consumed as u64;
                            in_block_off += consumed as u64;
                            if in_block_off >= DEFAULT_BLOCK_SIZE {
                                break;
                            }
                        }
                        // First unreadable entry marks the end of valid data.
                        Err(_) => break,
                    }
                }
                if used == 0 {
                    break;
                }

                // Rebuild the block with its true `used` size for the chain.
                let block = Block {
                    id: next_block_id as u64,
                    file_path: file_path.clone(),
                    offset: block_offset,
                    limit: DEFAULT_BLOCK_SIZE,
                    mmap: mmap.clone(),
                    used,
                };
                // register and append
                BlockStateTracker::register_block(next_block_id, file_path);
                FileStateTracker::add_block_to_file_state(file_path);
                if !col_name.is_empty() {
                    let _ = self.reader.append_block_to_chain(&col_name, block.clone());
                    debug_print!(
                        "[recovery] appended block: file={}, block_id={}, used={}, col={}",
                        file_path,
                        block.id,
                        block.used,
                        col_name
                    );
                }
                next_block_id += 1;
                block_offset += DEFAULT_BLOCK_SIZE;
            }
        }

        // hydrate index into memory and mark checkpointed blocks
        if let Ok(idx_guard) = self.read_offset_index.read() {
            let map = self.reader.data.read().ok();
            if let Some(map) = map {
                for (col, info_arc) in map.iter() {
                    if let Some(pos) = idx_guard.get(col) {
                        let mut info = match info_arc.write() {
                            Ok(v) => v,
                            Err(_) => continue,
                        };
                        // Clamp the persisted position to the recovered chain,
                        // in case the index is ahead of what survives on disk.
                        let mut ib = pos.cur_block_idx as usize;
                        if ib > info.chain.len() {
                            ib = info.chain.len();
                        }
                        info.cur_block_idx = ib;
                        if ib < info.chain.len() {
                            let used = info.chain[ib].used;
                            info.cur_block_offset = pos.cur_block_offset.min(used);
                        } else {
                            info.cur_block_offset = 0;
                        }
                        // Blocks before the current one are fully consumed.
                        for i in 0..ib {
                            BlockStateTracker::set_checkpointed_true(info.chain[i].id as usize);
                        }
                        // The current block is also done if its used bytes
                        // have all been read.
                        if ib < info.chain.len() && info.cur_block_offset >= info.chain[ib].used {
                            BlockStateTracker::set_checkpointed_true(info.chain[ib].id as usize);
                        }
                    }
                }
            }
        }

        // enqueue deletion checks
        for f in seen_files.into_iter() {
            flush_check(f);
        }
        Ok(())
    }
}
302}