use crate::wal::block::{Block, Metadata};
use crate::wal::config::{
    DEFAULT_BLOCK_SIZE, FsyncSchedule, MAX_FILE_SIZE, PREFIX_META_SIZE, debug_print,
};
use crate::wal::paths::WalPathManager;
use crate::wal::storage::{SharedMmapKeeper, set_fsync_schedule};
use std::collections::{HashMap, HashSet};
use std::fs;
use std::sync::mpsc;
use std::sync::{Arc, RwLock};

use super::WalIndex;
use super::allocator::{BlockAllocator, BlockStateTracker, FileStateTracker, flush_check};
use super::background::start_background_workers;
use super::reader::Reader;
use super::writer::Writer;
use rkyv::Deserialize;

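/// Policy for persisting read cursors to the on-disk read-offset index.
/// Inferred from the variant names: `StrictlyAtOnce` persists the cursor on
/// every read, while `AtLeastOnce { persist_every }` batches persistence, so
/// entries read since the last persist may be re-delivered after a crash.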
#[derive(Clone, Copy, Debug)]
pub enum ReadConsistency {
    StrictlyAtOnce,
    AtLeastOnce { persist_every: u32 },
}

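/// A write-ahead log handle: owns the block allocator, the per-column
/// writers, the shared reader, and the persisted read-offset index.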
pub struct Walrus {
    pub(super) allocator: Arc<BlockAllocator>,
    pub(super) reader: Arc<Reader>,
    pub(super) writers: RwLock<HashMap<String, Arc<Writer>>>,
    pub(super) fsync_tx: Arc<mpsc::Sender<String>>,
    pub(super) read_offset_index: Arc<RwLock<WalIndex>>,
    pub(super) read_consistency: ReadConsistency,
    pub(super) fsync_schedule: FsyncSchedule,
    pub(super) paths: Arc<WalPathManager>,
}

impl Walrus {
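    /// Opens a WAL at the default path with `StrictlyAtOnce` reads and a
    /// 200 ms fsync schedule.
    ///
    /// A minimal usage sketch; the `walrus` import path below is an
    /// assumption, adjust it to this crate's actual name:
    ///
    /// ```no_run
    /// # use walrus::Walrus; // hypothetical import path
    /// let wal = Walrus::new()?;
    /// # Ok::<(), std::io::Error>(())
    /// ```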
    pub fn new() -> std::io::Result<Self> {
        Self::with_consistency(ReadConsistency::StrictlyAtOnce)
    }

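    /// Same as [`new`](Self::new), but with an explicit read-consistency
    /// mode; the fsync schedule defaults to 200 ms.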
    pub fn with_consistency(mode: ReadConsistency) -> std::io::Result<Self> {
        Self::with_consistency_and_schedule(mode, FsyncSchedule::Milliseconds(200))
    }

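    /// Fully explicit constructor over the default WAL directory.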
    pub fn with_consistency_and_schedule(
        mode: ReadConsistency,
        fsync_schedule: FsyncSchedule,
    ) -> std::io::Result<Self> {
        let paths = Arc::new(WalPathManager::default());
        Self::with_paths(paths, mode, fsync_schedule)
    }

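    /// The `*_for_key` variants mirror the constructors above but scope the
    /// WAL directory to `key` via `WalPathManager::for_key`, so distinct
    /// keys get independent logs.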
    pub fn new_for_key(key: &str) -> std::io::Result<Self> {
        Self::with_consistency_for_key(key, ReadConsistency::StrictlyAtOnce)
    }

    pub fn with_consistency_for_key(key: &str, mode: ReadConsistency) -> std::io::Result<Self> {
        Self::with_consistency_and_schedule_for_key(key, mode, FsyncSchedule::Milliseconds(200))
    }

    pub fn with_consistency_and_schedule_for_key(
        key: &str,
        mode: ReadConsistency,
        fsync_schedule: FsyncSchedule,
    ) -> std::io::Result<Self> {
        let paths = WalPathManager::for_key(key);
        Self::with_paths(Arc::new(paths), mode, fsync_schedule)
    }

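    /// Shared construction path: records the fsync schedule, builds the
    /// allocator and reader, starts the background fsync workers, opens the
    /// persisted read-offset index, and finally replays existing files via
    /// `startup_chore`.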
    fn with_paths(
        paths: Arc<WalPathManager>,
        mode: ReadConsistency,
        fsync_schedule: FsyncSchedule,
    ) -> std::io::Result<Self> {
        debug_print!("[walrus] new");

        set_fsync_schedule(fsync_schedule);

        let allocator = Arc::new(BlockAllocator::new(paths.clone())?);
        let reader = Arc::new(Reader::new());
        let tx_arc = start_background_workers(fsync_schedule);

        let idx = WalIndex::new_in(&paths, "read_offset_idx")?;
        let instance = Walrus {
            allocator,
            reader,
            writers: RwLock::new(HashMap::new()),
            fsync_tx: tx_arc,
            read_offset_index: Arc::new(RwLock::new(idx)),
            read_consistency: mode,
            fsync_schedule,
            paths,
        };
        instance.startup_chore()?;
        Ok(instance)
    }

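    /// Returns the writer for `col_name`, creating it on first use.
    /// Uses double-checked locking: a read-locked fast path first, then the
    /// write lock with a re-check, so concurrent callers create at most one
    /// writer per column.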
    pub(super) fn get_or_create_writer(&self, col_name: &str) -> std::io::Result<Arc<Writer>> {
        if let Some(writer) = {
            let map = self.writers.read().map_err(|_| {
                std::io::Error::new(std::io::ErrorKind::Other, "writers read lock poisoned")
            })?;
            map.get(col_name).cloned()
        } {
            return Ok(writer);
        }

        let mut map = self.writers.write().map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::Other, "writers write lock poisoned")
        })?;

        if let Some(writer) = map.get(col_name).cloned() {
            return Ok(writer);
        }

        let initial_block = unsafe { self.allocator.get_next_available_block()? };
        let writer = Arc::new(Writer::new(
            self.allocator.clone(),
            initial_block,
            self.reader.clone(),
            col_name.to_string(),
            self.fsync_tx.clone(),
            self.fsync_schedule,
        ));
        map.insert(col_name.to_string(), writer.clone());
        Ok(writer)
    }

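    /// Crash recovery: scans every data file under the WAL root, rebuilds
    /// per-column block chains, restores persisted read offsets, and
    /// schedules flush checks for the files it touched.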
    pub(super) fn startup_chore(&self) -> std::io::Result<()> {
        let dir = match fs::read_dir(self.paths.root()) {
            Ok(d) => d,
            Err(_) => return Ok(()),
        };
        let mut files: Vec<String> = Vec::new();
        for entry in dir {
            let entry = match entry {
                Ok(e) => e,
                Err(_) => continue,
            };
            let path = entry.path();
            if let Ok(ft) = entry.file_type() {
                if ft.is_dir() {
                    continue;
                }
            }
            if let Some(s) = path.to_str() {
                if s.ends_with("_index.db") {
                    continue;
                }
                files.push(s.to_string());
            }
        }
        files.sort();
        if !files.is_empty() {
            debug_print!("[recovery] scanning files: {}", files.len());
        }

        let mut next_block_id: usize = 1;
        let mut seen_files = HashSet::new();

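        // Walk each data file block by block, assigning fresh block ids in
        // sorted-file order so recovered chains are deterministic.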
        for file_path in files.iter() {
            let mmap = match SharedMmapKeeper::get_mmap_arc(file_path) {
                Ok(m) => m,
                Err(e) => {
                    debug_print!("[recovery] mmap open failed for {}: {}", file_path, e);
                    continue;
                }
            };
            seen_files.insert(file_path.clone());
            FileStateTracker::register_file_if_absent(file_path);
            debug_print!("[recovery] file {}", file_path);

            let mut block_offset: u64 = 0;
            while block_offset + DEFAULT_BLOCK_SIZE <= MAX_FILE_SIZE {
                let mut probe = [0u8; 8];
                mmap.read(block_offset as usize, &mut probe);
                if probe.iter().all(|&b| b == 0) {
                    break;
                }

                let mut used: u64 = 0;

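                // The block prefix stores a 2-byte little-endian length
                // followed by rkyv-serialized `Metadata`, which names the
                // owning column. The `unsafe` archive access trusts those
                // bytes; a malformed prefix bails out of the scan.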
                let mut meta_buf = vec![0u8; PREFIX_META_SIZE];
                mmap.read(block_offset as usize, &mut meta_buf);
                let meta_len = (meta_buf[0] as usize) | ((meta_buf[1] as usize) << 8);
                if meta_len == 0 || meta_len > PREFIX_META_SIZE - 2 {
                    break;
                }
                let mut aligned = rkyv::AlignedVec::with_capacity(meta_len);
                aligned.extend_from_slice(&meta_buf[2..2 + meta_len]);
                let archived = unsafe { rkyv::archived_root::<Metadata>(&aligned[..]) };
                let md: Metadata = match archived.deserialize(&mut rkyv::Infallible) {
                    Ok(m) => m,
                    Err(_) => {
                        break;
                    }
                };
                let col_name = md.owned_by;

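                // Replay entries through a zero-`used` stub to measure how
                // many bytes of the block are actually occupied.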
                let block_stub = Block {
                    id: next_block_id as u64,
                    file_path: file_path.clone(),
                    offset: block_offset,
                    limit: DEFAULT_BLOCK_SIZE,
                    mmap: mmap.clone(),
                    used: 0,
                };
                let mut in_block_off: u64 = 0;
                loop {
                    match block_stub.read(in_block_off) {
                        Ok((_entry, consumed)) => {
                            used += consumed as u64;
                            in_block_off += consumed as u64;
                            if in_block_off >= DEFAULT_BLOCK_SIZE {
                                break;
                            }
                        }
                        Err(_) => break,
                    }
                }
                if used == 0 {
                    break;
                }

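                // Register the recovered block and append it to its owning
                // column's chain.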
                let block = Block {
                    id: next_block_id as u64,
                    file_path: file_path.clone(),
                    offset: block_offset,
                    limit: DEFAULT_BLOCK_SIZE,
                    mmap: mmap.clone(),
                    used,
                };
                BlockStateTracker::register_block(next_block_id, file_path);
                FileStateTracker::add_block_to_file_state(file_path);
                if !col_name.is_empty() {
                    let _ = self.reader.append_block_to_chain(&col_name, block.clone());
                    debug_print!(
                        "[recovery] appended block: file={}, block_id={}, used={}, col={}",
                        file_path,
                        block.id,
                        block.used,
                        col_name
                    );
                }
                next_block_id += 1;
                block_offset += DEFAULT_BLOCK_SIZE;
            }
        }

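        // Restore persisted read cursors, clamping them to the recovered
        // chains, and mark fully consumed blocks as checkpointed.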
        if let Ok(idx_guard) = self.read_offset_index.read() {
            let map = self.reader.data.read().ok();
            if let Some(map) = map {
                for (col, info_arc) in map.iter() {
                    if let Some(pos) = idx_guard.get(col) {
                        let mut info = match info_arc.write() {
                            Ok(v) => v,
                            Err(_) => continue,
                        };
                        let mut ib = pos.cur_block_idx as usize;
                        if ib > info.chain.len() {
                            ib = info.chain.len();
                        }
                        info.cur_block_idx = ib;
                        if ib < info.chain.len() {
                            let used = info.chain[ib].used;
                            info.cur_block_offset = pos.cur_block_offset.min(used);
                        } else {
                            info.cur_block_offset = 0;
                        }
                        for i in 0..ib {
                            BlockStateTracker::set_checkpointed_true(info.chain[i].id as usize);
                        }
                        if ib < info.chain.len() && info.cur_block_offset >= info.chain[ib].used {
                            BlockStateTracker::set_checkpointed_true(info.chain[ib].id as usize);
                        }
                    }
                }
            }
        }

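        // Kick a flush check for every file touched during recovery.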
        for f in seen_files.into_iter() {
            flush_check(f);
        }
        Ok(())
    }
}