1use memmap2::MmapMut;
2use rkyv::{Archive, Deserialize, Serialize};
3use std::cell::UnsafeCell;
4use std::collections::HashMap;
5use std::fs;
6use std::fs::OpenOptions;
7use std::path::Path;
8use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering};
9use std::sync::mpsc;
10use std::sync::{Arc, Mutex, OnceLock, RwLock};
11use std::thread;
12use std::time::Duration;
13use std::time::SystemTime;
14
/// Prints a diagnostic line (same arguments as `println!`) unless the
/// `WALRUS_QUIET` environment variable is set to a valid-Unicode value.
macro_rules! debug_print {
    ($($fmt_args:tt)*) => {
        match std::env::var("WALRUS_QUIET") {
            Ok(_) => {}
            Err(_) => println!($($fmt_args)*),
        }
    };
}
23
/// Size in bytes of one regular WAL block (10 MiB).
const DEFAULT_BLOCK_SIZE: u64 = 10 << 20;
/// Number of regular blocks that fit into one WAL file.
const BLOCKS_PER_FILE: u64 = 100;
/// Largest single allocation request accepted by the block allocator (1 GiB).
/// NOTE(review): this is larger than `MAX_FILE_SIZE` (1000 MiB).
const MAX_ALLOC: u64 = 1 << 30;
/// Bytes reserved in front of every entry for its length-prefixed metadata.
const PREFIX_META_SIZE: usize = 64;
/// Size in bytes every WAL file is pre-sized to.
const MAX_FILE_SIZE: u64 = BLOCKS_PER_FILE * DEFAULT_BLOCK_SIZE;
29
/// Returns the current Unix time in milliseconds as a decimal string.
///
/// If the system clock reports a time before the Unix epoch, `"0"` is returned.
fn now_millis_str() -> String {
    let since_epoch = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::ZERO,
    };
    since_epoch.as_millis().to_string()
}
37
/// Computes the 64-bit FNV-1a hash of `data`; used as the per-entry payload checksum.
fn checksum64(data: &[u8]) -> u64 {
    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    data.iter()
        .fold(FNV_OFFSET, |acc, &byte| (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME))
}
49
/// One log record as returned to readers: the entry's payload bytes,
/// without the metadata header that precedes them on disk.
#[derive(Clone, Debug)]

pub struct Entry {
    pub data: Vec<u8>,
}
55
/// Per-entry header, serialized with rkyv into the `PREFIX_META_SIZE`-byte
/// area in front of each entry's payload (after a 2-byte little-endian length).
///
/// NOTE(review): changing these fields changes the archived bytes written to
/// disk, so existing WAL files would no longer decode; treat edits as a format change.
#[derive(Archive, Deserialize, Serialize, Debug)]
#[archive(check_bytes)]
struct Metadata {
    // Payload length in bytes (the bytes that follow the header).
    read_size: usize,
    // Topic/column name the entry was written for.
    owned_by: String,
    // Value supplied by the writer; `Writer::write` passes `block.offset + block.limit`.
    next_block_start: u64,
    // `checksum64` (FNV-1a) of the payload bytes.
    checksum: u64,
}
64
65#[derive(Clone, Debug)]
66pub struct Block {
67 id: u64,
68 file_path: String,
69 offset: u64,
70 limit: u64,
71 mmap: Arc<SharedMmap>,
72 used: u64,
73}
74
75impl Block {
76 fn write(
77 &self,
78 in_block_offset: u64,
79 data: &[u8],
80 owned_by: &str,
81 next_block_start: u64,
82 ) -> std::io::Result<()> {
83 debug_assert!(
84 in_block_offset + (data.len() as u64 + PREFIX_META_SIZE as u64) <= self.limit
85 );
86
87 let new_meta = Metadata {
88 read_size: data.len(),
89 owned_by: owned_by.to_string(),
90 next_block_start,
91 checksum: checksum64(data),
92 };
93
94 let meta_bytes = rkyv::to_bytes::<_, 256>(&new_meta).map_err(|e| {
95 std::io::Error::new(
96 std::io::ErrorKind::Other,
97 format!("serialize metadata failed: {:?}", e),
98 )
99 })?;
100 if meta_bytes.len() > PREFIX_META_SIZE - 2 {
101 return Err(std::io::Error::new(
102 std::io::ErrorKind::InvalidData,
103 "metadata too large",
104 ));
105 }
106
107 let mut meta_buffer = vec![0u8; PREFIX_META_SIZE];
108 meta_buffer[0] = (meta_bytes.len() & 0xFF) as u8;
110 meta_buffer[1] = ((meta_bytes.len() >> 8) & 0xFF) as u8;
111 meta_buffer[2..2 + meta_bytes.len()].copy_from_slice(&meta_bytes);
113
114 let mut combined = Vec::with_capacity(PREFIX_META_SIZE + data.len());
116 combined.extend_from_slice(&meta_buffer);
117 combined.extend_from_slice(data);
118
119 let file_offset = self.offset + in_block_offset;
120 self.mmap.write(file_offset as usize, &combined);
121 Ok(())
122 }
123
124 fn read(&self, in_block_offset: u64) -> std::io::Result<(Entry, usize)> {
125 let mut meta_buffer = vec![0; PREFIX_META_SIZE];
126 let file_offset = self.offset + in_block_offset;
127 self.mmap.read(file_offset as usize, &mut meta_buffer);
128
129 let meta_len = (meta_buffer[0] as usize) | ((meta_buffer[1] as usize) << 8);
131
132 if meta_len == 0 || meta_len > PREFIX_META_SIZE - 2 {
133 return Err(std::io::Error::new(
134 std::io::ErrorKind::InvalidData,
135 format!("invalid metadata length: {}", meta_len),
136 ));
137 }
138
139 let mut aligned = rkyv::AlignedVec::with_capacity(meta_len);
141 aligned.extend_from_slice(&meta_buffer[2..2 + meta_len]);
142
143 let archived = unsafe { rkyv::archived_root::<Metadata>(&aligned[..]) };
147 let meta: Metadata = archived.deserialize(&mut rkyv::Infallible).map_err(|_| {
148 std::io::Error::new(
149 std::io::ErrorKind::InvalidData,
150 "failed to deserialize metadata",
151 )
152 })?;
153 let actual_entry_size = meta.read_size;
154
155 let new_offset = file_offset + PREFIX_META_SIZE as u64;
157 let mut ret_buffer = vec![0; actual_entry_size];
158 self.mmap.read(new_offset as usize, &mut ret_buffer);
159
160 let expected = meta.checksum;
162 if checksum64(&ret_buffer) != expected {
163 debug_print!(
164 "[reader] checksum mismatch; skipping corrupted entry at offset={} in file={}, block_id={}",
165 in_block_offset,
166 self.file_path,
167 self.id
168 );
169 return Err(std::io::Error::new(
170 std::io::ErrorKind::InvalidData,
171 "checksum mismatch, data corruption detected",
172 ));
173 }
174
175 let consumed = PREFIX_META_SIZE + actual_entry_size;
176 Ok((Entry { data: ret_buffer }, consumed))
177 }
178}
179
180fn make_new_file() -> std::io::Result<String> {
181 let file_name = now_millis_str();
182 let file_path = format!("wal_files/{}", file_name);
183 let f = std::fs::File::create(&file_path)?;
184 f.set_len(MAX_FILE_SIZE)?;
185 Ok(file_path)
186}
187
188struct BlockAllocator {
190 next_block: UnsafeCell<Block>,
191 lock: AtomicBool,
192}
193
194impl BlockAllocator {
195 pub fn new() -> std::io::Result<Self> {
196 std::fs::create_dir_all("wal_files").ok();
197 let file1 = make_new_file()?;
198 let mmap: Arc<SharedMmap> = SharedMmapKeeper::get_mmap_arc(&file1)?;
199 debug_print!(
200 "[alloc] init: created file={}, max_file_size={}B, block_size={}B",
201 file1,
202 MAX_FILE_SIZE,
203 DEFAULT_BLOCK_SIZE
204 );
205 Ok(BlockAllocator {
206 next_block: UnsafeCell::new(Block {
207 id: 1,
208 offset: 0,
209 limit: DEFAULT_BLOCK_SIZE,
210 file_path: file1,
211 mmap,
212 used: 0,
213 }),
214 lock: AtomicBool::new(false),
215 })
216 }
217
218 pub unsafe fn get_next_available_block(&self) -> std::io::Result<Block> {
225 self.lock();
226 let data = unsafe { &mut *self.next_block.get() };
229 let prev_block_file_path = data.file_path.clone();
230 if data.offset >= MAX_FILE_SIZE {
231 FileStateTracker::set_fully_allocated(prev_block_file_path);
233 data.file_path = make_new_file()?;
234 data.mmap = SharedMmapKeeper::get_mmap_arc(&data.file_path)?;
235 data.offset = 0;
236 data.used = 0;
237 debug_print!("[alloc] rolled over to new file: {}", data.file_path);
238 }
239
240 BlockStateTracker::register_block(data.id as usize, &data.file_path);
242 FileStateTracker::register_file_if_absent(&data.file_path);
243 FileStateTracker::add_block_to_file_state(&data.file_path);
244 FileStateTracker::set_block_locked(data.id as usize);
245 let ret = data.clone();
246 data.offset += DEFAULT_BLOCK_SIZE;
247 data.id += 1;
248 self.unlock();
249 debug_print!(
250 "[alloc] handout: block_id={}, file={}, offset={}, limit={}",
251 ret.id,
252 ret.file_path,
253 ret.offset,
254 ret.limit
255 );
256 Ok(ret)
257 }
258
259 pub unsafe fn alloc_block(&self, want_bytes: u64) -> std::io::Result<Block> {
263 if want_bytes == 0 || want_bytes > MAX_ALLOC {
264 return Err(std::io::Error::new(
265 std::io::ErrorKind::InvalidInput,
266 "invalid allocation size, a single entry can't be more than 1gb",
267 ));
268 }
269 let alloc_units = (want_bytes + DEFAULT_BLOCK_SIZE - 1) / DEFAULT_BLOCK_SIZE;
270 let alloc_size = alloc_units * DEFAULT_BLOCK_SIZE;
271 debug_print!(
272 "[alloc] alloc_block: want_bytes={}, units={}, size={}",
273 want_bytes,
274 alloc_units,
275 alloc_size
276 );
277
278 self.lock();
279 let data = unsafe { &mut *self.next_block.get() };
282 if data.offset + alloc_size > MAX_FILE_SIZE {
283 let prev_block_file_path = data.file_path.clone();
284 data.file_path = make_new_file()?;
285 data.mmap = SharedMmapKeeper::get_mmap_arc(&data.file_path)?;
286 data.offset = 0;
287 FileStateTracker::set_fully_allocated(prev_block_file_path);
289 debug_print!(
290 "[alloc] file rollover for sized alloc -> {}",
291 data.file_path
292 );
293 }
294 let ret = Block {
295 id: data.id,
296 file_path: data.file_path.clone(),
297 offset: data.offset,
298 limit: alloc_size,
299 mmap: data.mmap.clone(),
300 used: 0,
301 };
302 BlockStateTracker::register_block(ret.id as usize, &ret.file_path);
304 FileStateTracker::register_file_if_absent(&ret.file_path);
305 FileStateTracker::add_block_to_file_state(&ret.file_path);
306 FileStateTracker::set_block_locked(ret.id as usize);
307 data.offset += alloc_size;
308 data.id += 1;
309 self.unlock();
310 debug_print!(
311 "[alloc] handout(sized): block_id={}, file={}, offset={}, limit={}",
312 ret.id,
313 ret.file_path,
314 ret.offset,
315 ret.limit
316 );
317 Ok(ret)
318 }
319
320 fn lock(&self) {
324 while self
326 .lock
327 .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
328 .is_err()
329 {
330 std::hint::spin_loop();
331 }
332 }
333
334 fn unlock(&self) {
335 self.lock.store(false, Ordering::Release);
336 }
337}
338
339unsafe impl Sync for BlockAllocator {}
343unsafe impl Send for BlockAllocator {}
346
347struct Writer {
348 allocator: Arc<BlockAllocator>,
349 current_block: Mutex<Block>,
350 reader: Arc<Reader>,
351 col: String,
352 publisher: Arc<mpsc::Sender<String>>,
353 current_offset: Mutex<u64>,
354}
355
356impl Writer {
357 pub fn new(
358 allocator: Arc<BlockAllocator>,
359 current_block: Block,
360 reader: Arc<Reader>,
361 col: String,
362 publisher: Arc<mpsc::Sender<String>>,
363 ) -> Self {
364 Writer {
365 allocator,
366 current_block: Mutex::new(current_block),
367 reader,
368 col: col.clone(),
369 publisher,
370 current_offset: Mutex::new(0),
371 }
372 }
373
374 pub fn write(&self, data: &[u8]) -> std::io::Result<()> {
375 let mut block = self.current_block.lock().map_err(|_| {
376 std::io::Error::new(std::io::ErrorKind::Other, "current_block lock poisoned")
377 })?;
378 let mut cur = self.current_offset.lock().map_err(|_| {
379 std::io::Error::new(std::io::ErrorKind::Other, "current_offset lock poisoned")
380 })?;
381
382 let need = (PREFIX_META_SIZE as u64) + (data.len() as u64);
383 if *cur + need > block.limit {
384 debug_print!(
385 "[writer] sealing: col={}, block_id={}, used={}, need={}, limit={}",
386 self.col,
387 block.id,
388 *cur,
389 need,
390 block.limit
391 );
392 FileStateTracker::set_block_unlocked(block.id as usize);
393 let mut sealed = block.clone();
394 sealed.used = *cur;
395 sealed.mmap.flush()?;
396 let _ = self.reader.append_block_to_chain(&self.col, sealed);
397 debug_print!("[writer] appended sealed block to chain: col={}", self.col);
398 let new_block = unsafe { self.allocator.alloc_block(need) }?;
403 debug_print!(
404 "[writer] switched to new block: col={}, new_block_id={}",
405 self.col,
406 new_block.id
407 );
408 *block = new_block;
409 *cur = 0;
410 }
411 let next_block_start = block.offset + block.limit; block.write(*cur, data, &self.col, next_block_start)?;
413 debug_print!(
414 "[writer] wrote: col={}, block_id={}, offset_before={}, bytes={}, offset_after={}",
415 self.col,
416 block.id,
417 *cur,
418 need,
419 *cur + need
420 );
421 *cur += need;
422 let _ = self.publisher.send(block.file_path.clone());
423 Ok(())
424 }
425}
426
427#[derive(Debug)]
428struct SharedMmap {
429 mmap: MmapMut,
430 last_touched_at: AtomicU64,
431}
432
433unsafe impl Sync for SharedMmap {}
437unsafe impl Send for SharedMmap {}
440
441impl SharedMmap {
442 pub fn new(path: &str) -> std::io::Result<Arc<Self>> {
443 let file = OpenOptions::new().read(true).write(true).open(path)?;
444
445 let mmap = unsafe { MmapMut::map_mut(&file)? };
448 let now_ms = SystemTime::now()
449 .duration_since(SystemTime::UNIX_EPOCH)
450 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
451 .as_millis() as u64;
452 Ok(Arc::new(Self {
453 mmap,
454 last_touched_at: AtomicU64::new(now_ms),
455 }))
456 }
457
458 pub fn write(&self, offset: usize, data: &[u8]) {
459 debug_assert!(offset <= self.mmap.len());
461 debug_assert!(self.mmap.len() - offset >= data.len());
462 unsafe {
468 let ptr = self.mmap.as_ptr() as *mut u8; std::ptr::copy_nonoverlapping(data.as_ptr(), ptr.add(offset), data.len());
470 }
471 let now_ms = SystemTime::now()
472 .duration_since(SystemTime::UNIX_EPOCH)
473 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
474 .as_millis() as u64;
475 self.last_touched_at.store(now_ms, Ordering::Relaxed);
476 }
477
478 pub fn read(&self, offset: usize, dest: &mut [u8]) {
479 debug_assert!(offset + dest.len() <= self.mmap.len());
480 let src = &self.mmap[offset..offset + dest.len()];
481 dest.copy_from_slice(src);
482 }
483
484 pub fn len(&self) -> usize {
485 self.mmap.len()
486 }
487
488 pub fn flush(&self) -> std::io::Result<()> {
489 self.mmap.flush()
490 }
491}
492
493struct SharedMmapKeeper {
494 data: HashMap<String, Arc<SharedMmap>>,
495}
496
497impl SharedMmapKeeper {
498 fn new() -> Self {
499 Self {
500 data: HashMap::new(),
501 }
502 }
503
504 fn get_mmap_arc_read(path: &str) -> Option<Arc<SharedMmap>> {
506 static MMAP_KEEPER: OnceLock<RwLock<SharedMmapKeeper>> = OnceLock::new();
507 let keeper_lock = MMAP_KEEPER.get_or_init(|| RwLock::new(SharedMmapKeeper::new()));
508 let keeper = keeper_lock.read().ok()?;
509 keeper.data.get(path).cloned()
510 }
511
512 fn get_mmap_arc(path: &str) -> std::io::Result<Arc<SharedMmap>> {
514 if let Some(existing) = Self::get_mmap_arc_read(path) {
515 return Ok(existing);
516 }
517
518 static MMAP_KEEPER: OnceLock<RwLock<SharedMmapKeeper>> = OnceLock::new();
519 let keeper_lock = MMAP_KEEPER.get_or_init(|| RwLock::new(SharedMmapKeeper::new()));
520
521 {
523 let keeper = keeper_lock.read().map_err(|_| {
524 std::io::Error::new(std::io::ErrorKind::Other, "mmap keeper read lock poisoned")
525 })?;
526 if let Some(existing) = keeper.data.get(path) {
527 return Ok(existing.clone());
528 }
529 }
530
531 let mut keeper = keeper_lock.write().map_err(|_| {
532 std::io::Error::new(std::io::ErrorKind::Other, "mmap keeper write lock poisoned")
533 })?;
534 if let Some(existing) = keeper.data.get(path) {
535 return Ok(existing.clone());
536 }
537 let mmap_arc = SharedMmap::new(path)?;
538 keeper.data.insert(path.to_string(), mmap_arc.clone());
539 Ok(mmap_arc)
540 }
541}
542
543#[derive(Debug)]
544struct ColReaderInfo {
545 chain: Vec<Block>,
546 cur_block_idx: usize,
547 cur_block_offset: u64,
548 reads_since_persist: u32,
549 tail_block_id: u64,
552 tail_offset: u64,
553 hydrated_from_index: bool,
555}
556
557struct Reader {
558 data: RwLock<HashMap<String, Arc<RwLock<ColReaderInfo>>>>,
559}
560
561impl Reader {
562 fn new() -> Self {
563 Self {
564 data: RwLock::new(HashMap::new()),
565 }
566 }
567
568 fn get_chain_for_col(&self, col: &str) -> Option<Vec<Block>> {
569 let arc_info = {
570 let map = self.data.read().ok()?;
571 map.get(col)?.clone()
572 };
573 let info = arc_info.read().ok()?;
574 Some(info.chain.clone())
575 }
576
577 fn append_block_to_chain(&self, col: &str, block: Block) -> std::io::Result<()> {
579 if let Some(info_arc) = {
581 let map = self.data.read().map_err(|_| {
582 std::io::Error::new(std::io::ErrorKind::Other, "reader map read lock poisoned")
583 })?;
584 map.get(col).cloned()
585 } {
586 let mut info = info_arc.write().map_err(|_| {
587 std::io::Error::new(std::io::ErrorKind::Other, "col info write lock poisoned")
588 })?;
589 let before = info.chain.len();
590 info.chain.push(block.clone());
591 debug_print!(
592 "[reader] chain append(fast): col={}, block_id={}, chain_len {}->{}",
593 col,
594 block.id,
595 before,
596 before + 1
597 );
598 return Ok(());
599 }
600
601 let info_arc = {
603 let mut map = self.data.write().map_err(|_| {
604 std::io::Error::new(std::io::ErrorKind::Other, "reader map write lock poisoned")
605 })?;
606 map.entry(col.to_string())
607 .or_insert_with(|| {
608 Arc::new(RwLock::new(ColReaderInfo {
609 chain: Vec::new(),
610 cur_block_idx: 0,
611 cur_block_offset: 0,
612 reads_since_persist: 0,
613 tail_block_id: 0,
614 tail_offset: 0,
615 hydrated_from_index: false,
616 }))
617 })
618 .clone()
619 };
620 let mut info = info_arc.write().map_err(|_| {
621 std::io::Error::new(std::io::ErrorKind::Other, "col info write lock poisoned")
622 })?;
623 info.chain.push(block.clone());
624 debug_print!(
625 "[reader] chain append(slow/new): col={}, block_id={}, chain_len {}->{}",
626 col,
627 block.id,
628 0,
629 1
630 );
631 Ok(())
632 }
633}
634
635#[derive(Archive, Deserialize, Serialize, Debug, Clone)]
636pub struct BlockPos {
637 pub cur_block_idx: u64,
638 pub cur_block_offset: u64,
639}
640
641pub struct WalIndex {
642 store: HashMap<String, BlockPos>,
643 path: String,
644}
645
646impl WalIndex {
647 pub fn new(file_name: &str) -> std::io::Result<Self> {
648 fs::create_dir_all("./wal_files").ok();
650 let path = format!("./wal_files/{}_index.db", file_name);
651
652 let store = Path::new(&path)
653 .exists()
654 .then(|| fs::read(&path).ok())
655 .flatten()
656 .and_then(|bytes| {
657 if bytes.is_empty() {
658 return None;
659 }
660 let archived = unsafe { rkyv::archived_root::<HashMap<String, BlockPos>>(&bytes) };
663 archived.deserialize(&mut rkyv::Infallible).ok()
664 })
665 .unwrap_or_default();
666
667 Ok(Self {
668 store,
669 path: path.to_string(),
670 })
671 }
672
673 pub fn set(&mut self, key: String, idx: u64, offset: u64) -> std::io::Result<()> {
674 self.store.insert(
675 key,
676 BlockPos {
677 cur_block_idx: idx,
678 cur_block_offset: offset,
679 },
680 );
681 self.persist()
682 }
683
684 pub fn get(&self, key: &str) -> Option<&BlockPos> {
685 self.store.get(key)
686 }
687
688 pub fn remove(&mut self, key: &str) -> std::io::Result<Option<BlockPos>> {
689 let result = self.store.remove(key);
690 if result.is_some() {
691 self.persist()?;
692 }
693 Ok(result)
694 }
695
696 fn persist(&self) -> std::io::Result<()> {
697 let tmp_path = format!("{}.tmp", self.path);
698 let bytes = rkyv::to_bytes::<_, 256>(&self.store).map_err(|e| {
699 std::io::Error::new(
700 std::io::ErrorKind::Other,
701 format!("index serialize failed: {:?}", e),
702 )
703 })?;
704
705 fs::write(&tmp_path, &bytes)?;
706 fs::File::open(&tmp_path)?.sync_all()?;
707 fs::rename(&tmp_path, &self.path)?;
708 Ok(())
709 }
710}
711
/// Controls how often `Walrus::read_next` persists the read position
/// (decided by `Walrus::should_persist`).
#[derive(Clone, Copy, Debug)]
pub enum ReadConsistency {
    /// Persist the read position on every persist opportunity.
    StrictlyAtOnce,
    /// Persist on every `persist_every`-th read (0 is treated as 1), and
    /// whenever `read_next` forces a persist.
    AtLeastOnce { persist_every: u32 },
}
718
/// Schedule of the background flush thread started by
/// `Walrus::with_consistency_and_schedule`.
#[derive(Clone, Copy, Debug)]
pub enum FsyncSchedule {
    /// Sleep this many milliseconds between flush passes (0 is raised to 1).
    Milliseconds(u64),
}
723
/// A write-ahead log of independent topics backed by memory-mapped files in
/// `./wal_files`.
pub struct Walrus {
    // Block allocator shared by all topic writers.
    allocator: Arc<BlockAllocator>,
    // Per-topic chains of sealed blocks and read cursors.
    reader: Arc<Reader>,
    // One writer per topic, created lazily by `append_for_topic`.
    writers: RwLock<HashMap<String, Arc<Writer>>>,
    // Sender side of the channel drained by the background flush thread;
    // carries the paths of files that were written.
    fsync_tx: Arc<mpsc::Sender<String>>,
    // Persisted per-topic read positions.
    read_offset_index: Arc<RwLock<WalIndex>>,
    read_consistency: ReadConsistency,
    fsync_schedule: FsyncSchedule,
}
733
734impl Walrus {
735 pub fn new() -> std::io::Result<Self> {
736 Self::with_consistency(ReadConsistency::StrictlyAtOnce)
737 }
738
739 pub fn with_consistency(mode: ReadConsistency) -> std::io::Result<Self> {
740 Self::with_consistency_and_schedule(mode, FsyncSchedule::Milliseconds(1000))
741 }
742
743 pub fn with_consistency_and_schedule(
744 mode: ReadConsistency,
745 fsync_schedule: FsyncSchedule,
746 ) -> std::io::Result<Self> {
747 debug_print!("[walrus] new");
748 let allocator = Arc::new(BlockAllocator::new()?);
749 let reader = Arc::new(Reader::new());
750 let (tx, rx) = mpsc::channel::<String>();
751 let tx_arc = Arc::new(tx);
752 let (del_tx, del_rx) = mpsc::channel::<String>();
753 let del_tx_arc = Arc::new(del_tx);
754 let _ = DELETION_TX.set(del_tx_arc.clone());
755 let pool: HashMap<String, MmapMut> = HashMap::new();
756 let tick = Arc::new(AtomicU64::new(0));
757 let sleep_millis = match fsync_schedule {
758 FsyncSchedule::Milliseconds(ms) => ms.max(1),
759 };
760 thread::spawn(move || {
762 let mut pool = pool;
763 let tick = tick;
764 let del_rx = del_rx;
765 let mut delete_pending = std::collections::HashSet::new();
766 loop {
767 thread::sleep(Duration::from_millis(sleep_millis));
768 let mut unique = std::collections::HashSet::new();
769 while let Ok(path) = rx.try_recv() {
770 unique.insert(path);
771 }
772 if !unique.is_empty() {
773 debug_print!("[flush] scheduling {} paths", unique.len());
774 }
775 for path in unique.into_iter() {
776 if !std::path::Path::new(&path).exists() {
778 debug_print!("[flush] file does not exist, skipping: {}", path);
779 continue;
780 }
781
782 if !pool.contains_key(&path) {
783 match OpenOptions::new().read(true).write(true).open(&path) {
784 Ok(file) => {
785 match unsafe { MmapMut::map_mut(&file) } {
788 Ok(mmap) => {
789 pool.insert(path.clone(), mmap);
790 }
791 Err(e) => {
792 debug_print!(
793 "[flush] failed to create memory map for {}: {}",
794 path,
795 e
796 );
797 continue;
798 }
799 }
800 }
801 Err(e) => {
802 debug_print!(
803 "[flush] failed to open file for flushing {}: {}",
804 path,
805 e
806 );
807 continue;
808 }
809 }
810 }
811 if let Some(mmap) = pool.get_mut(&path) {
812 if let Err(e) = mmap.flush() {
813 debug_print!("[flush] flush error for {}: {}", path, e);
814 }
815 }
816 }
817 while let Ok(path) = del_rx.try_recv() {
819 debug_print!("[reclaim] deletion requested: {}", path);
820 delete_pending.insert(path);
821 }
822 let n = tick.fetch_add(1, Ordering::Relaxed) + 1;
823 if n >= 1000 {
824 if tick
826 .compare_exchange(n, 0, Ordering::AcqRel, Ordering::Relaxed)
827 .is_ok()
828 {
829 let mut empty: HashMap<String, MmapMut> = HashMap::new();
830 std::mem::swap(&mut pool, &mut empty); for path in delete_pending.drain() {
833 match fs::remove_file(&path) {
834 Ok(_) => debug_print!("[reclaim] deleted file {}", path),
835 Err(e) => {
836 debug_print!("[reclaim] delete failed for {}: {}", path, e)
837 }
838 }
839 }
840 }
841 }
842 }
843 });
844 let idx = WalIndex::new("read_offset_idx")?;
845 let instance = Walrus {
846 allocator,
847 reader,
848 writers: RwLock::new(HashMap::new()),
849 fsync_tx: tx_arc,
850 read_offset_index: Arc::new(RwLock::new(idx)),
851 read_consistency: mode,
852 fsync_schedule,
853 };
854 instance.startup_chore()?;
855 Ok(instance)
856 }
857
858 pub fn append_for_topic(&self, col_name: &str, raw_bytes: &[u8]) -> std::io::Result<()> {
859 let writer = {
860 if let Some(w) = {
861 let map = self.writers.read().map_err(|_| {
862 std::io::Error::new(std::io::ErrorKind::Other, "writers read lock poisoned")
863 })?;
864 map.get(col_name).cloned()
865 } {
866 w
867 } else {
868 let mut map = self.writers.write().map_err(|_| {
869 std::io::Error::new(std::io::ErrorKind::Other, "writers write lock poisoned")
870 })?;
871 if let Some(w) = map.get(col_name).cloned() {
872 w
873 } else {
874 let initial_block = unsafe { self.allocator.get_next_available_block()? };
877 let w = Arc::new(Writer::new(
878 self.allocator.clone(),
879 initial_block,
880 self.reader.clone(),
881 col_name.to_string(),
882 self.fsync_tx.clone(),
883 ));
884 map.insert(col_name.to_string(), w.clone());
885 w
886 }
887 }
888 };
889 writer.write(raw_bytes)
890 }
891
892 pub fn read_next(&self, col_name: &str) -> std::io::Result<Option<Entry>> {
893 const TAIL_FLAG: u64 = 1u64 << 63;
894 let info_arc = if let Some(arc) = {
895 let map = self.reader.data.read().map_err(|_| {
896 std::io::Error::new(std::io::ErrorKind::Other, "reader map read lock poisoned")
897 })?;
898 map.get(col_name).cloned()
899 } {
900 arc
901 } else {
902 let mut map = self.reader.data.write().map_err(|_| {
903 std::io::Error::new(std::io::ErrorKind::Other, "reader map write lock poisoned")
904 })?;
905 map.entry(col_name.to_string())
906 .or_insert_with(|| {
907 Arc::new(RwLock::new(ColReaderInfo {
908 chain: Vec::new(),
909 cur_block_idx: 0,
910 cur_block_offset: 0,
911 reads_since_persist: 0,
912 tail_block_id: 0,
913 tail_offset: 0,
914 hydrated_from_index: false,
915 }))
916 })
917 .clone()
918 };
919 let mut info = info_arc.write().map_err(|_| {
920 std::io::Error::new(std::io::ErrorKind::Other, "col info write lock poisoned")
921 })?;
922 debug_print!(
923 "[reader] read_next start: col={}, chain_len={}, idx={}, offset={}",
924 col_name,
925 info.chain.len(),
926 info.cur_block_idx,
927 info.cur_block_offset
928 );
929
930 let mut persisted_tail: Option<(u64 , u64 )> = None;
932 if !info.hydrated_from_index {
933 if let Ok(idx_guard) = self.read_offset_index.read() {
934 if let Some(pos) = idx_guard.get(col_name) {
935 if (pos.cur_block_idx & TAIL_FLAG) != 0 {
936 let tail_block_id = pos.cur_block_idx & (!TAIL_FLAG);
937 persisted_tail = Some((tail_block_id, pos.cur_block_offset));
938 info.cur_block_idx = info.chain.len();
940 info.cur_block_offset = 0;
941 } else {
942 let mut ib = pos.cur_block_idx as usize;
943 if ib > info.chain.len() {
944 ib = info.chain.len();
945 }
946 info.cur_block_idx = ib;
947 if ib < info.chain.len() {
948 let used = info.chain[ib].used;
949 info.cur_block_offset = pos.cur_block_offset.min(used);
950 } else {
951 info.cur_block_offset = 0;
952 }
953 }
954 info.hydrated_from_index = true;
955 } else {
956 info.hydrated_from_index = true;
958 }
959 }
960 }
961
962 if let Some((_, tail_off)) = persisted_tail {
964 if !info.chain.is_empty() {
965 let ib = info.chain.len() - 1;
966 info.cur_block_idx = ib;
967 info.cur_block_offset = tail_off.min(info.chain[ib].used);
968 if self.should_persist(&mut info, true) {
969 if let Ok(mut idx_guard) = self.read_offset_index.write() {
970 let _ = idx_guard.set(
971 col_name.to_string(),
972 info.cur_block_idx as u64,
973 info.cur_block_offset,
974 );
975 }
976 }
977 persisted_tail = None;
978 }
979 }
980
981 loop {
982 if info.cur_block_idx < info.chain.len() {
984 let idx = info.cur_block_idx;
985 let off = info.cur_block_offset;
986 let block = info.chain[idx].clone();
987
988 if off >= block.used {
989 debug_print!(
990 "[reader] read_next: advance block col={}, block_id={}, offset={}, used={}",
991 col_name,
992 block.id,
993 off,
994 block.used
995 );
996 BlockStateTracker::set_checkpointed_true(block.id as usize);
997 info.cur_block_idx += 1;
998 info.cur_block_offset = 0;
999 continue;
1000 }
1001
1002 match block.read(off) {
1003 Ok((entry, consumed)) => {
1004 info.cur_block_offset = off + consumed as u64;
1005 if self.should_persist(&mut info, false) {
1006 if let Ok(mut idx_guard) = self.read_offset_index.write() {
1007 let _ = idx_guard.set(
1008 col_name.to_string(),
1009 info.cur_block_idx as u64,
1010 info.cur_block_offset,
1011 );
1012 }
1013 }
1014 debug_print!(
1015 "[reader] read_next: OK col={}, block_id={}, consumed={}, new_offset={}",
1016 col_name,
1017 block.id,
1018 consumed,
1019 info.cur_block_offset
1020 );
1021 return Ok(Some(entry));
1022 }
1023 Err(_) => {
1024 debug_print!(
1025 "[reader] read_next: read error; skip block col={}, block_id={}, offset={}",
1026 col_name,
1027 block.id,
1028 off
1029 );
1030 info.cur_block_idx += 1;
1031 info.cur_block_offset = 0;
1032 continue;
1033 }
1034 }
1035 }
1036
1037 let writer_arc = {
1040 let map = self.writers.read().map_err(|_| {
1041 std::io::Error::new(std::io::ErrorKind::Other, "writers read lock poisoned")
1042 })?;
1043 match map.get(col_name) {
1044 Some(w) => w.clone(),
1045 None => return Ok(None),
1046 }
1047 };
1048 let (active_block, written) = {
1049 let blk = writer_arc.current_block.lock().map_err(|_| {
1050 std::io::Error::new(std::io::ErrorKind::Other, "current_block lock poisoned")
1051 })?;
1052 let off = writer_arc.current_offset.lock().map_err(|_| {
1053 std::io::Error::new(std::io::ErrorKind::Other, "current_offset lock poisoned")
1054 })?;
1055 (blk.clone(), *off)
1056 };
1057
1058 if let Some((tail_block_id, tail_off)) = persisted_tail {
1060 if tail_block_id != active_block.id {
1061 if let Some((idx, _)) = info
1062 .chain
1063 .iter()
1064 .enumerate()
1065 .find(|(_, b)| b.id == tail_block_id)
1066 {
1067 info.cur_block_idx = idx;
1068 info.cur_block_offset = tail_off.min(info.chain[idx].used);
1069 if self.should_persist(&mut info, true) {
1070 if let Ok(mut idx_guard) = self.read_offset_index.write() {
1071 let _ = idx_guard.set(
1072 col_name.to_string(),
1073 info.cur_block_idx as u64,
1074 info.cur_block_offset,
1075 );
1076 }
1077 }
1078 persisted_tail = None; continue;
1080 } else {
1081 persisted_tail = Some((active_block.id, 0));
1083 if self.should_persist(&mut info, true) {
1084 if let Ok(mut idx_guard) = self.read_offset_index.write() {
1085 let _ = idx_guard.set(
1086 col_name.to_string(),
1087 active_block.id | TAIL_FLAG,
1088 0,
1089 );
1090 }
1091 }
1092 }
1093 }
1094 } else {
1095 persisted_tail = Some((active_block.id, 0));
1097 if self.should_persist(&mut info, true) {
1098 if let Ok(mut idx_guard) = self.read_offset_index.write() {
1099 let _ = idx_guard.set(col_name.to_string(), active_block.id | TAIL_FLAG, 0);
1100 }
1101 }
1102 }
1103
1104 let (tail_block_id, mut tail_off) = match persisted_tail {
1106 Some(v) => v,
1107 None => return Ok(None),
1108 };
1109 if tail_block_id == active_block.id {
1110 if info.tail_block_id == active_block.id {
1112 tail_off = tail_off.max(info.tail_offset);
1113 }
1114 } else {
1115 }
1117 if tail_block_id != active_block.id {
1119 continue;
1120 }
1121
1122 if tail_off < written {
1123 match active_block.read(tail_off) {
1124 Ok((entry, consumed)) => {
1125 let new_off = tail_off + consumed as u64;
1126 info.tail_block_id = active_block.id;
1128 info.tail_offset = new_off;
1129 persisted_tail = Some((tail_block_id, new_off));
1130 if self.should_persist(&mut info, false) {
1131 if let Ok(mut idx_guard) = self.read_offset_index.write() {
1132 let _ = idx_guard.set(
1133 col_name.to_string(),
1134 tail_block_id | TAIL_FLAG,
1135 new_off,
1136 );
1137 }
1138 }
1139 debug_print!(
1140 "[reader] read_next: tail OK col={}, block_id={}, consumed={}, new_tail_off={}",
1141 col_name,
1142 active_block.id,
1143 consumed,
1144 new_off
1145 );
1146 return Ok(Some(entry));
1147 }
1148 Err(_) => {
1149 debug_print!(
1150 "[reader] read_next: tail read error col={}, block_id={}, offset={}",
1151 col_name,
1152 active_block.id,
1153 tail_off
1154 );
1155 return Ok(None);
1156 }
1157 }
1158 } else {
1159 debug_print!(
1160 "[reader] read_next: tail caught up col={}, block_id={}, off={}, written={}",
1161 col_name,
1162 active_block.id,
1163 tail_off,
1164 written
1165 );
1166 return Ok(None);
1167 }
1168 }
1169 }
1170
1171 fn should_persist(&self, info: &mut ColReaderInfo, force: bool) -> bool {
1172 match self.read_consistency {
1173 ReadConsistency::StrictlyAtOnce => true,
1174 ReadConsistency::AtLeastOnce { persist_every } => {
1175 let every = persist_every.max(1);
1176 if force {
1177 info.reads_since_persist = 0;
1178 return true;
1179 }
1180 let next = info.reads_since_persist.saturating_add(1);
1181 if next >= every {
1182 info.reads_since_persist = 0;
1183 true
1184 } else {
1185 info.reads_since_persist = next;
1186 false
1187 }
1188 }
1189 }
1190 }
1191
1192 fn startup_chore(&self) -> std::io::Result<()> {
1193 let dir = match fs::read_dir("./wal_files") {
1195 Ok(d) => d,
1196 Err(_) => return Ok(()),
1197 };
1198 let mut files: Vec<String> = Vec::new();
1199 for entry in dir.flatten() {
1200 let path = entry.path();
1201 if let Some(s) = path.to_str() {
1202 if s.ends_with("_index.db") {
1204 continue;
1205 }
1206 files.push(s.to_string());
1207 }
1208 }
1209 files.sort();
1210 if !files.is_empty() {
1211 debug_print!("[recovery] scanning files: {}", files.len());
1212 }
1213
1214 let mut next_block_id: usize = 1;
1216 let mut seen_files = std::collections::HashSet::new();
1217
1218 for file_path in files.iter() {
1219 let mmap = match SharedMmapKeeper::get_mmap_arc(file_path) {
1220 Ok(m) => m,
1221 Err(e) => {
1222 debug_print!("[recovery] mmap open failed for {}: {}", file_path, e);
1223 continue;
1224 }
1225 };
1226 seen_files.insert(file_path.clone());
1227 FileStateTracker::register_file_if_absent(file_path);
1228 debug_print!("[recovery] file {}", file_path);
1229
1230 let mut block_offset: u64 = 0;
1231 while block_offset + DEFAULT_BLOCK_SIZE <= MAX_FILE_SIZE {
1232 let mut probe = [0u8; 8];
1234 mmap.read(block_offset as usize, &mut probe);
1235 if probe.iter().all(|&b| b == 0) {
1236 break;
1237 }
1238
1239 let mut used: u64 = 0;
1240 let mut col_name: Option<String> = None;
1241
1242 let mut meta_buf = vec![0u8; PREFIX_META_SIZE];
1244 mmap.read(block_offset as usize, &mut meta_buf);
1245 let meta_len = (meta_buf[0] as usize) | ((meta_buf[1] as usize) << 8);
1246 if meta_len == 0 || meta_len > PREFIX_META_SIZE - 2 {
1247 break;
1248 }
1249 let mut aligned = rkyv::AlignedVec::with_capacity(meta_len);
1250 aligned.extend_from_slice(&meta_buf[2..2 + meta_len]);
1251 let archived = unsafe { rkyv::archived_root::<Metadata>(&aligned[..]) };
1256 let md: Metadata = match archived.deserialize(&mut rkyv::Infallible) {
1257 Ok(m) => m,
1258 Err(_) => {
1259 break;
1260 }
1261 };
1262 col_name = Some(md.owned_by);
1263
1264 let block_stub = Block {
1266 id: next_block_id as u64,
1267 file_path: file_path.clone(),
1268 offset: block_offset,
1269 limit: DEFAULT_BLOCK_SIZE,
1270 mmap: mmap.clone(),
1271 used: 0,
1272 };
1273 let mut in_block_off: u64 = 0;
1274 loop {
1275 match block_stub.read(in_block_off) {
1276 Ok((_entry, consumed)) => {
1277 used += consumed as u64;
1278 in_block_off += consumed as u64;
1279 if in_block_off >= DEFAULT_BLOCK_SIZE {
1280 break;
1281 }
1282 }
1283 Err(_) => break,
1284 }
1285 }
1286 if used == 0 {
1287 break;
1288 }
1289
1290 let block = Block {
1291 id: next_block_id as u64,
1292 file_path: file_path.clone(),
1293 offset: block_offset,
1294 limit: DEFAULT_BLOCK_SIZE,
1295 mmap: mmap.clone(),
1296 used,
1297 };
1298 BlockStateTracker::register_block(next_block_id, file_path);
1300 FileStateTracker::add_block_to_file_state(file_path);
1301 if let Some(col) = col_name {
1302 let _ = self.reader.append_block_to_chain(&col, block.clone());
1303 debug_print!(
1304 "[recovery] appended block: file={}, block_id={}, used={}, col={}",
1305 file_path,
1306 block.id,
1307 block.used,
1308 col
1309 );
1310 }
1311 next_block_id += 1;
1312 block_offset += DEFAULT_BLOCK_SIZE;
1313 }
1314 }
1315
1316 if let Ok(idx_guard) = self.read_offset_index.read() {
1318 let map = self.reader.data.read().ok();
1319 if let Some(map) = map {
1320 for (col, info_arc) in map.iter() {
1321 if let Some(pos) = idx_guard.get(col) {
1322 let mut info = match info_arc.write() {
1323 Ok(v) => v,
1324 Err(_) => continue,
1325 };
1326 let mut ib = pos.cur_block_idx as usize;
1327 if ib > info.chain.len() {
1328 ib = info.chain.len();
1329 }
1330 info.cur_block_idx = ib;
1331 if ib < info.chain.len() {
1332 let used = info.chain[ib].used;
1333 info.cur_block_offset = pos.cur_block_offset.min(used);
1334 } else {
1335 info.cur_block_offset = 0;
1336 }
1337 for i in 0..ib {
1338 BlockStateTracker::set_checkpointed_true(info.chain[i].id as usize);
1339 }
1340 if ib < info.chain.len() && info.cur_block_offset >= info.chain[ib].used {
1341 BlockStateTracker::set_checkpointed_true(info.chain[ib].id as usize);
1342 }
1343 }
1344 }
1345 }
1346 }
1347
1348 for f in seen_files.into_iter() {
1350 flush_check(f);
1351 }
1352 Ok(())
1353 }
1354}
1355
1356static DELETION_TX: OnceLock<Arc<mpsc::Sender<String>>> = OnceLock::new();
1357
1358fn flush_check(file_path: String) {
1359 if let Some((locked, checkpointed, total, fully_allocated)) =
1361 FileStateTracker::get_state_snapshot(&file_path)
1362 {
1363 let ready_to_delete = fully_allocated && locked == 0 && total > 0 && checkpointed >= total;
1364 if ready_to_delete {
1365 if let Some(tx) = DELETION_TX.get() {
1366 let _ = tx.send(file_path);
1367 }
1368 }
1369 }
1370}
1371
/// Bookkeeping for one block, stored in `BlockStateTracker`'s global map.
struct BlockState {
    /// Path of the WAL file that contains this block.
    file_path: String,
    /// Flipped to `true` by `BlockStateTracker::set_checkpointed_true`.
    is_checkpointed: AtomicBool,
}

/// Field-less handle; all block state lives in the static map returned by
/// `BlockStateTracker::map`.
struct BlockStateTracker {}
1378
1379impl BlockStateTracker {
1380 fn map() -> &'static RwLock<HashMap<usize, BlockState>> {
1381 static MAP: OnceLock<RwLock<HashMap<usize, BlockState>>> = OnceLock::new();
1382 MAP.get_or_init(|| RwLock::new(HashMap::new()))
1383 }
1384
1385 fn new() {
1386 let _ = Self::map();
1387 }
1388
1389 fn register_block(block_id: usize, file_path: &str) {
1390 let map = Self::map();
1391 if let Ok(mut w) = map.write() {
1392 w.entry(block_id).or_insert_with(|| BlockState {
1393 is_checkpointed: AtomicBool::new(false),
1394 file_path: file_path.to_string(),
1395 });
1396 }
1397 }
1398
1399 fn get_file_path_for_block(block_id: usize) -> Option<String> {
1400 let map = Self::map();
1401 let r = map.read().ok()?;
1402 r.get(&block_id).map(|b| b.file_path.clone())
1403 }
1404
1405 fn set_checkpointed_true(block_id: usize) {
1406 let path_opt = {
1407 let map = Self::map();
1408 if let Ok(r) = map.read() {
1409 if let Some(b) = r.get(&block_id) {
1410 b.is_checkpointed.store(true, Ordering::Release);
1411 Some(b.file_path.clone())
1412 } else {
1413 None
1414 }
1415 } else {
1416 None
1417 }
1418 };
1419
1420 if let Some(path) = path_opt {
1421 FileStateTracker::inc_checkpoint_for_file(&path);
1422 flush_check(path);
1423 }
1424 }
1425}
1426
/// Per-file counters that `flush_check` reads (via
/// `FileStateTracker::get_state_snapshot`) to decide whether a WAL file can be
/// queued for deletion.
struct FileState {
    /// Number of blocks registered in this file.
    total_blocks: AtomicU16,
    /// Incremented by `set_block_locked`, decremented by `set_block_unlocked`.
    locked_block_ctr: AtomicU16,
    /// Number of this file's blocks reported as checkpointed.
    checkpoint_block_ctr: AtomicU16,
    /// Set by `FileStateTracker::set_fully_allocated`.
    is_fully_allocated: AtomicBool,
}

/// Field-less handle; all file state lives in the static map returned by
/// `FileStateTracker::map`.
struct FileStateTracker {}
1435
1436impl FileStateTracker {
1437 fn map() -> &'static RwLock<HashMap<String, FileState>> {
1438 static MAP: OnceLock<RwLock<HashMap<String, FileState>>> = OnceLock::new();
1439 MAP.get_or_init(|| RwLock::new(HashMap::new()))
1440 }
1441
1442 fn new() {
1443 let _ = Self::map();
1444 }
1445
1446 fn register_file_if_absent(file_path: &str) {
1447 let map = Self::map();
1448 let mut w = map.write().expect("file state map write lock poisoned");
1449 w.entry(file_path.to_string()).or_insert_with(|| FileState {
1450 locked_block_ctr: AtomicU16::new(0),
1451 checkpoint_block_ctr: AtomicU16::new(0),
1452 total_blocks: AtomicU16::new(0),
1453 is_fully_allocated: AtomicBool::new(false),
1454 });
1455 }
1456
1457 fn add_block_to_file_state(file_path: &str) {
1458 Self::register_file_if_absent(file_path);
1459 let map = Self::map();
1460 if let Ok(r) = map.read() {
1461 if let Some(st) = r.get(file_path) {
1462 st.total_blocks.fetch_add(1, Ordering::AcqRel);
1463 }
1464 }
1465 }
1466
1467 fn set_fully_allocated(file_path: String) {
1468 Self::register_file_if_absent(&file_path);
1469 let map = Self::map();
1470 if let Ok(r) = map.read() {
1471 if let Some(st) = r.get(&file_path) {
1472 st.is_fully_allocated.store(true, Ordering::Release);
1473 }
1474 }
1475 flush_check(file_path);
1476 }
1477
1478 fn set_block_locked(block_id: usize) {
1479 if let Some(path) = BlockStateTracker::get_file_path_for_block(block_id) {
1480 let map = Self::map();
1481 if let Ok(r) = map.read() {
1482 if let Some(st) = r.get(&path) {
1483 st.locked_block_ctr.fetch_add(1, Ordering::AcqRel);
1484 }
1485 }
1486 }
1487 }
1488
1489 fn set_block_unlocked(block_id: usize) {
1490 if let Some(path) = BlockStateTracker::get_file_path_for_block(block_id) {
1491 let map = Self::map();
1492 if let Ok(r) = map.read() {
1493 if let Some(st) = r.get(&path) {
1494 st.locked_block_ctr.fetch_sub(1, Ordering::AcqRel);
1495 }
1496 }
1497 flush_check(path);
1498 }
1499 }
1500
1501 fn inc_checkpoint_for_file(file_path: &str) {
1502 let map = Self::map();
1503 if let Ok(r) = map.read() {
1504 if let Some(st) = r.get(file_path) {
1505 st.checkpoint_block_ctr.fetch_add(1, Ordering::AcqRel);
1506 }
1507 }
1508 }
1509
1510 fn get_state_snapshot(file_path: &str) -> Option<(u16, u16, u16, bool)> {
1511 let map = Self::map();
1512 let r = map.read().ok()?;
1513 let st = r.get(file_path)?;
1514 let locked = st.locked_block_ctr.load(Ordering::Acquire);
1515 let checkpointed = st.checkpoint_block_ctr.load(Ordering::Acquire);
1516 let total = st.total_blocks.load(Ordering::Acquire);
1517 let fully = st.is_fully_allocated.load(Ordering::Acquire);
1518 Some((locked, checkpointed, total, fully))
1519 }
1520}