walrus_rust/
wal.rs

1use memmap2::MmapMut;
2use rkyv::{Archive, Deserialize, Serialize};
3use std::cell::UnsafeCell;
4use std::collections::HashMap;
5use std::fs;
6use std::fs::OpenOptions;
7use std::path::Path;
8use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering};
9use std::sync::mpsc;
10use std::sync::{Arc, Mutex, OnceLock, RwLock};
11use std::thread;
12use std::time::Duration;
13use std::time::SystemTime;
14
15// Macro to conditionally print debug messages
16macro_rules! debug_print {
17    ($($arg:tt)*) => {
18        if std::env::var("WALRUS_QUIET").is_err() {
19            println!($($arg)*);
20        }
21    };
22}
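
// Usage note (summary of the check above): set the WALRUS_QUIET environment variable
// to any value to silence these messages; when it is unset they are printed to stdout.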
23
24const DEFAULT_BLOCK_SIZE: u64 = 10 * 1024 * 1024; // 10mb
25const BLOCKS_PER_FILE: u64 = 100;
26const MAX_ALLOC: u64 = 1 * 1024 * 1024 * 1024; // 1 GiB cap per block
27const PREFIX_META_SIZE: usize = 64;
28const MAX_FILE_SIZE: u64 = DEFAULT_BLOCK_SIZE * BLOCKS_PER_FILE;
29
30fn now_millis_str() -> String {
31    let ms = SystemTime::now()
32        .duration_since(SystemTime::UNIX_EPOCH)
33        .unwrap_or_else(|_| std::time::Duration::from_secs(0))
34        .as_millis();
35    ms.to_string()
36}
37
38fn checksum64(data: &[u8]) -> u64 {
39    // FNV-1a 64-bit checksum
40    const FNV_OFFSET: u64 = 0xcbf29ce484222325;
41    const FNV_PRIME: u64 = 0x00000100000001B3;
42    let mut hash = FNV_OFFSET;
43    for &b in data {
44        hash ^= b as u64;
45        hash = hash.wrapping_mul(FNV_PRIME);
46    }
47    hash
48}
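
// Minimal sanity check for the FNV-1a helper above (an illustrative sketch added here,
// not part of the original file): equal payloads hash equally and a single flipped byte
// changes the hash, which is how `Block::read` detects corrupted entries further below.
#[cfg(test)]
mod checksum64_tests {
    use super::checksum64;

    #[test]
    fn detects_single_byte_corruption() {
        let sum = checksum64(b"walrus");
        assert_eq!(sum, checksum64(b"walrus"));
        assert_ne!(sum, checksum64(b"walrut"));
    }
}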
49
#[derive(Clone, Debug)]
pub struct Entry {
53    pub data: Vec<u8>,
54}
55
56#[derive(Archive, Deserialize, Serialize, Debug)]
57#[archive(check_bytes)]
58struct Metadata {
59    read_size: usize,
60    owned_by: String,
61    next_block_start: u64,
62    checksum: u64,
63}
64
65#[derive(Clone, Debug)]
66pub struct Block {
67    id: u64,
68    file_path: String,
69    offset: u64,
70    limit: u64,
71    mmap: Arc<SharedMmap>,
72    used: u64,
73}
74
75impl Block {
76    fn write(
77        &self,
78        in_block_offset: u64,
79        data: &[u8],
80        owned_by: &str,
81        next_block_start: u64,
82    ) -> std::io::Result<()> {
83        debug_assert!(
84            in_block_offset + (data.len() as u64 + PREFIX_META_SIZE as u64) <= self.limit
85        );
86
87        let new_meta = Metadata {
88            read_size: data.len(),
89            owned_by: owned_by.to_string(),
90            next_block_start,
91            checksum: checksum64(data),
92        };
93
94        let meta_bytes = rkyv::to_bytes::<_, 256>(&new_meta).map_err(|e| {
95            std::io::Error::new(
96                std::io::ErrorKind::Other,
97                format!("serialize metadata failed: {:?}", e),
98            )
99        })?;
100        if meta_bytes.len() > PREFIX_META_SIZE - 2 {
101            return Err(std::io::Error::new(
102                std::io::ErrorKind::InvalidData,
103                "metadata too large",
104            ));
105        }
106
107        let mut meta_buffer = vec![0u8; PREFIX_META_SIZE];
108        // Store actual length in first 2 bytes (little endian)
109        meta_buffer[0] = (meta_bytes.len() & 0xFF) as u8;
110        meta_buffer[1] = ((meta_bytes.len() >> 8) & 0xFF) as u8;
111        // Copy actual metadata starting at byte 2
112        meta_buffer[2..2 + meta_bytes.len()].copy_from_slice(&meta_bytes);
113
114        // Combine and write
115        let mut combined = Vec::with_capacity(PREFIX_META_SIZE + data.len());
116        combined.extend_from_slice(&meta_buffer);
117        combined.extend_from_slice(data);
118
119        let file_offset = self.offset + in_block_offset;
120        self.mmap.write(file_offset as usize, &combined);
121        Ok(())
122    }
123
124    fn read(&self, in_block_offset: u64) -> std::io::Result<(Entry, usize)> {
125        let mut meta_buffer = vec![0; PREFIX_META_SIZE];
126        let file_offset = self.offset + in_block_offset;
127        self.mmap.read(file_offset as usize, &mut meta_buffer);
128
129        // Read the actual metadata length from first 2 bytes
130        let meta_len = (meta_buffer[0] as usize) | ((meta_buffer[1] as usize) << 8);
131
132        if meta_len == 0 || meta_len > PREFIX_META_SIZE - 2 {
133            return Err(std::io::Error::new(
134                std::io::ErrorKind::InvalidData,
135                format!("invalid metadata length: {}", meta_len),
136            ));
137        }
138
139        // Deserialize only the actual metadata bytes (skip the 2-byte length prefix)
140        let mut aligned = rkyv::AlignedVec::with_capacity(meta_len);
141        aligned.extend_from_slice(&meta_buffer[2..2 + meta_len]);
142
143        // SAFETY: `aligned` contains bytes we just read from our own file format.
144        // We bounded `meta_len` to PREFIX_META_SIZE and copy into an `AlignedVec`,
145        // which satisfies alignment requirements of rkyv.
146        let archived = unsafe { rkyv::archived_root::<Metadata>(&aligned[..]) };
147        let meta: Metadata = archived.deserialize(&mut rkyv::Infallible).map_err(|_| {
148            std::io::Error::new(
149                std::io::ErrorKind::InvalidData,
150                "failed to deserialize metadata",
151            )
152        })?;
153        let actual_entry_size = meta.read_size;
154
155        // Read the actual data
156        let new_offset = file_offset + PREFIX_META_SIZE as u64;
157        let mut ret_buffer = vec![0; actual_entry_size];
158        self.mmap.read(new_offset as usize, &mut ret_buffer);
159
160        // Verify checksum
161        let expected = meta.checksum;
162        if checksum64(&ret_buffer) != expected {
163            debug_print!(
164                "[reader] checksum mismatch; skipping corrupted entry at offset={} in file={}, block_id={}",
165                in_block_offset,
166                self.file_path,
167                self.id
168            );
169            return Err(std::io::Error::new(
170                std::io::ErrorKind::InvalidData,
171                "checksum mismatch, data corruption detected",
172            ));
173        }
174
175        let consumed = PREFIX_META_SIZE + actual_entry_size;
176        Ok((Entry { data: ret_buffer }, consumed))
177    }
178}
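
// On-disk entry layout produced by `Block::write` and consumed by `Block::read`
// (an illustrative summary of the code above; `entry_frame_len` below is a sketch
// added here and is not used elsewhere in this file):
//
//   bytes [0, 2)                      metadata length `n` as a little-endian u16
//   bytes [2, 2 + n)                  rkyv-serialized `Metadata`
//   bytes [2 + n, PREFIX_META_SIZE)   zero padding up to the 64-byte prefix
//   bytes [PREFIX_META_SIZE, ..)      the raw payload (`Metadata.read_size` bytes)
#[allow(dead_code)]
fn entry_frame_len(payload_len: usize) -> usize {
    // Matches the `consumed` value returned by `Block::read`.
    PREFIX_META_SIZE + payload_len
}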
179
180fn make_new_file() -> std::io::Result<String> {
181    let file_name = now_millis_str();
182    let file_path = format!("wal_files/{}", file_name);
183    let f = std::fs::File::create(&file_path)?;
184    f.set_len(MAX_FILE_SIZE)?;
185    Ok(file_path)
186}
187
// Hands out block descriptors to writers; allocator state is guarded by a spin lock.
189struct BlockAllocator {
190    next_block: UnsafeCell<Block>,
191    lock: AtomicBool,
192}
193
194impl BlockAllocator {
195    pub fn new() -> std::io::Result<Self> {
196        std::fs::create_dir_all("wal_files").ok();
197        let file1 = make_new_file()?;
198        let mmap: Arc<SharedMmap> = SharedMmapKeeper::get_mmap_arc(&file1)?;
199        debug_print!(
200            "[alloc] init: created file={}, max_file_size={}B, block_size={}B",
201            file1,
202            MAX_FILE_SIZE,
203            DEFAULT_BLOCK_SIZE
204        );
205        Ok(BlockAllocator {
206            next_block: UnsafeCell::new(Block {
207                id: 1,
208                offset: 0,
209                limit: DEFAULT_BLOCK_SIZE,
210                file_path: file1,
211                mmap,
212                used: 0,
213            }),
214            lock: AtomicBool::new(false),
215        })
216    }
217
218    // Allocate the next available block with proper locking
219    /// SAFETY: Caller must ensure the returned `Block` is treated as uniquely
220    /// owned by a single writer until it is sealed. Internally, a spin lock
221    /// ensures exclusive mutable access to `next_block` while computing the
222    /// next allocation, so the interior `UnsafeCell` is not concurrently
223    /// accessed mutably.
224    pub unsafe fn get_next_available_block(&self) -> std::io::Result<Block> {
225        self.lock();
226        // SAFETY: Guarded by `self.lock()` above, providing exclusive access
227        // to `next_block` so creating a `&mut` from `UnsafeCell` is sound.
228        let data = unsafe { &mut *self.next_block.get() };
229        let prev_block_file_path = data.file_path.clone();
230        if data.offset >= MAX_FILE_SIZE {
231            // mark previous file as fully allocated before switching
232            FileStateTracker::set_fully_allocated(prev_block_file_path);
233            data.file_path = make_new_file()?;
234            data.mmap = SharedMmapKeeper::get_mmap_arc(&data.file_path)?;
235            data.offset = 0;
236            data.used = 0;
237            debug_print!("[alloc] rolled over to new file: {}", data.file_path);
238        }
239
240        // set the cur block as locked
241        BlockStateTracker::register_block(data.id as usize, &data.file_path);
242        FileStateTracker::register_file_if_absent(&data.file_path);
243        FileStateTracker::add_block_to_file_state(&data.file_path);
244        FileStateTracker::set_block_locked(data.id as usize);
245        let ret = data.clone();
246        data.offset += DEFAULT_BLOCK_SIZE;
247        data.id += 1;
248        self.unlock();
249        debug_print!(
250            "[alloc] handout: block_id={}, file={}, offset={}, limit={}",
251            ret.id,
252            ret.file_path,
253            ret.offset,
254            ret.limit
255        );
256        Ok(ret)
257    }
258
259    /// SAFETY: Caller must ensure the resulting `Block` remains uniquely used
260    /// by one writer and not read concurrently while being written. The
261    /// internal spin lock provides exclusive access to mutate allocator state.
262    pub unsafe fn alloc_block(&self, want_bytes: u64) -> std::io::Result<Block> {
263        if want_bytes == 0 || want_bytes > MAX_ALLOC {
264            return Err(std::io::Error::new(
265                std::io::ErrorKind::InvalidInput,
266                "invalid allocation size, a single entry can't be more than 1gb",
267            ));
268        }
269        let alloc_units = (want_bytes + DEFAULT_BLOCK_SIZE - 1) / DEFAULT_BLOCK_SIZE;
270        let alloc_size = alloc_units * DEFAULT_BLOCK_SIZE;
271        debug_print!(
272            "[alloc] alloc_block: want_bytes={}, units={}, size={}",
273            want_bytes,
274            alloc_units,
275            alloc_size
276        );
277
278        self.lock();
279        // SAFETY: Guarded by `self.lock()` above, providing exclusive access
280        // to `next_block` so creating a `&mut` from `UnsafeCell` is sound.
281        let data = unsafe { &mut *self.next_block.get() };
282        if data.offset + alloc_size > MAX_FILE_SIZE {
283            let prev_block_file_path = data.file_path.clone();
284            data.file_path = make_new_file()?;
285            data.mmap = SharedMmapKeeper::get_mmap_arc(&data.file_path)?;
286            data.offset = 0;
287            // mark the previous file fully allocated now
288            FileStateTracker::set_fully_allocated(prev_block_file_path);
289            debug_print!(
290                "[alloc] file rollover for sized alloc -> {}",
291                data.file_path
292            );
293        }
294        let ret = Block {
295            id: data.id,
296            file_path: data.file_path.clone(),
297            offset: data.offset,
298            limit: alloc_size,
299            mmap: data.mmap.clone(),
300            used: 0,
301        };
302        // register the new block before handing it out
303        BlockStateTracker::register_block(ret.id as usize, &ret.file_path);
304        FileStateTracker::register_file_if_absent(&ret.file_path);
305        FileStateTracker::add_block_to_file_state(&ret.file_path);
306        FileStateTracker::set_block_locked(ret.id as usize);
307        data.offset += alloc_size;
308        data.id += 1;
309        self.unlock();
310        debug_print!(
311            "[alloc] handout(sized): block_id={}, file={}, offset={}, limit={}",
312            ret.id,
313            ret.file_path,
314            ret.offset,
315            ret.limit
316        );
317        Ok(ret)
318    }
319
    /*
    The critical section here is tiny except when a new file has to be created,
    and that cost is amortized across many allocations. In the common case the
    lock is held for a handful of microseconds at most, so the syscall overhead
    of a blocking mutex isn't worth it; spinning for a hundred or two cycles is
    negligible in comparison.
    */
323    fn lock(&self) {
324        // Spin lock implementation
325        while self
326            .lock
327            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
328            .is_err()
329        {
330            std::hint::spin_loop();
331        }
332    }
333
334    fn unlock(&self) {
335        self.lock.store(false, Ordering::Release);
336    }
337}
338
339// SAFETY: `BlockAllocator` uses an internal spin lock to guard all mutable
340// access to `next_block`. It does not expose references to its interior
341// without holding that lock, so concurrent access across threads is safe.
342unsafe impl Sync for BlockAllocator {}
343// SAFETY: The type contains only thread-safe primitives and does not rely on
344// thread-affine resources; moving it to another thread is safe.
345unsafe impl Send for BlockAllocator {}
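
// Sizing sketch for `alloc_block` above (illustrative only; this helper restates the
// rounding arithmetic and is not called by the allocator): requests are rounded up to
// whole DEFAULT_BLOCK_SIZE units, so e.g. a 25 MiB entry reserves 30 MiB of file space.
#[allow(dead_code)]
fn rounded_alloc_size(want_bytes: u64) -> u64 {
    let units = (want_bytes + DEFAULT_BLOCK_SIZE - 1) / DEFAULT_BLOCK_SIZE;
    units * DEFAULT_BLOCK_SIZE
}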
346
347struct Writer {
348    allocator: Arc<BlockAllocator>,
349    current_block: Mutex<Block>,
350    reader: Arc<Reader>,
351    col: String,
352    publisher: Arc<mpsc::Sender<String>>,
353    current_offset: Mutex<u64>,
354}
355
356impl Writer {
357    pub fn new(
358        allocator: Arc<BlockAllocator>,
359        current_block: Block,
360        reader: Arc<Reader>,
361        col: String,
362        publisher: Arc<mpsc::Sender<String>>,
363    ) -> Self {
364        Writer {
365            allocator,
366            current_block: Mutex::new(current_block),
367            reader,
368            col: col.clone(),
369            publisher,
370            current_offset: Mutex::new(0),
371        }
372    }
373
374    pub fn write(&self, data: &[u8]) -> std::io::Result<()> {
375        let mut block = self.current_block.lock().map_err(|_| {
376            std::io::Error::new(std::io::ErrorKind::Other, "current_block lock poisoned")
377        })?;
378        let mut cur = self.current_offset.lock().map_err(|_| {
379            std::io::Error::new(std::io::ErrorKind::Other, "current_offset lock poisoned")
380        })?;
381
382        let need = (PREFIX_META_SIZE as u64) + (data.len() as u64);
383        if *cur + need > block.limit {
384            debug_print!(
385                "[writer] sealing: col={}, block_id={}, used={}, need={}, limit={}",
386                self.col,
387                block.id,
388                *cur,
389                need,
390                block.limit
391            );
392            FileStateTracker::set_block_unlocked(block.id as usize);
393            let mut sealed = block.clone();
394            sealed.used = *cur;
395            sealed.mmap.flush()?;
396            let _ = self.reader.append_block_to_chain(&self.col, sealed);
397            debug_print!("[writer] appended sealed block to chain: col={}", self.col);
398            // switch to new block
399            // SAFETY: We hold `current_block` and `current_offset` mutexes, so
400            // this writer has exclusive ownership of the active block. The
401            // allocator's internal lock ensures unique block handout.
402            let new_block = unsafe { self.allocator.alloc_block(need) }?;
403            debug_print!(
404                "[writer] switched to new block: col={}, new_block_id={}",
405                self.col,
406                new_block.id
407            );
408            *block = new_block;
409            *cur = 0;
410        }
411        let next_block_start = block.offset + block.limit; // simplistic for now
412        block.write(*cur, data, &self.col, next_block_start)?;
413        debug_print!(
414            "[writer] wrote: col={}, block_id={}, offset_before={}, bytes={}, offset_after={}",
415            self.col,
416            block.id,
417            *cur,
418            need,
419            *cur + need
420        );
421        *cur += need;
422        let _ = self.publisher.send(block.file_path.clone());
423        Ok(())
424    }
425}
426
427#[derive(Debug)]
428struct SharedMmap {
429    mmap: MmapMut,
430    last_touched_at: AtomicU64,
431}
432
433// SAFETY: `SharedMmap` provides interior mutability only via methods that
434// enforce bounds and perform atomic timestamp updates; the underlying
435// `MmapMut` supports concurrent reads and explicit flushes.
436unsafe impl Sync for SharedMmap {}
437// SAFETY: The struct holds an `MmapMut` which is safe to move between threads;
438// timestamps are atomics, so sending is sound.
439unsafe impl Send for SharedMmap {}
440
441impl SharedMmap {
442    pub fn new(path: &str) -> std::io::Result<Arc<Self>> {
443        let file = OpenOptions::new().read(true).write(true).open(path)?;
444
445        // SAFETY: `file` is opened read/write and lives for the duration of this
446        // mapping; `memmap2` upholds aliasing invariants for `MmapMut`.
447        let mmap = unsafe { MmapMut::map_mut(&file)? };
448        let now_ms = SystemTime::now()
449            .duration_since(SystemTime::UNIX_EPOCH)
450            .unwrap_or_else(|_| std::time::Duration::from_secs(0))
451            .as_millis() as u64;
452        Ok(Arc::new(Self {
453            mmap,
454            last_touched_at: AtomicU64::new(now_ms),
455        }))
456    }
457
458    pub fn write(&self, offset: usize, data: &[u8]) {
        // Bounds checks (active in debug builds) before the raw copy
        debug_assert!(offset <= self.mmap.len());
        debug_assert!(self.mmap.len() - offset >= data.len());
        // SAFETY: The destination range [offset, offset + data.len()) lies within
        // the mmap and cannot overlap `data`. The pointer is therefore valid for
        // `data.len()` bytes of writes, and because the BlockAllocator hands out
        // non-overlapping block segments, no two writers ever write to the same
        // range concurrently.
467        unsafe {
468            let ptr = self.mmap.as_ptr() as *mut u8; // Get pointer when needed
469            std::ptr::copy_nonoverlapping(data.as_ptr(), ptr.add(offset), data.len());
470        }
471        let now_ms = SystemTime::now()
472            .duration_since(SystemTime::UNIX_EPOCH)
473            .unwrap_or_else(|_| std::time::Duration::from_secs(0))
474            .as_millis() as u64;
475        self.last_touched_at.store(now_ms, Ordering::Relaxed);
476    }
477
478    pub fn read(&self, offset: usize, dest: &mut [u8]) {
479        debug_assert!(offset + dest.len() <= self.mmap.len());
480        let src = &self.mmap[offset..offset + dest.len()];
481        dest.copy_from_slice(src);
482    }
483
484    pub fn len(&self) -> usize {
485        self.mmap.len()
486    }
487
488    pub fn flush(&self) -> std::io::Result<()> {
489        self.mmap.flush()
490    }
491}
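
// Concurrency note (an assumption made explicit from the SAFETY comments above):
// correctness of the unchecked copy in `write` relies on the BlockAllocator handing
// out disjoint block ranges, so concurrent writers never touch overlapping bytes.
// A reader racing with an in-flight write is protected by the per-entry checksum in
// `Block::read`, which reports torn data as corruption instead of returning it.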
492
493struct SharedMmapKeeper {
494    data: HashMap<String, Arc<SharedMmap>>,
495}
496
impl SharedMmapKeeper {
    fn new() -> Self {
        Self {
            data: HashMap::new(),
        }
    }

    // Single process-wide keeper. Both accessors below must share this map;
    // declaring a separate function-local static in each would give every
    // accessor its own (always-empty) cache.
    fn keeper() -> &'static RwLock<SharedMmapKeeper> {
        static MMAP_KEEPER: OnceLock<RwLock<SharedMmapKeeper>> = OnceLock::new();
        MMAP_KEEPER.get_or_init(|| RwLock::new(SharedMmapKeeper::new()))
    }

    // Fast path: many readers concurrently
    fn get_mmap_arc_read(path: &str) -> Option<Arc<SharedMmap>> {
        let keeper = Self::keeper().read().ok()?;
        keeper.data.get(path).cloned()
    }

    // Read-mostly accessor that escalates to the write lock only on a miss
    fn get_mmap_arc(path: &str) -> std::io::Result<Arc<SharedMmap>> {
        if let Some(existing) = Self::get_mmap_arc_read(path) {
            return Ok(existing);
        }

        let mut keeper = Self::keeper().write().map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::Other, "mmap keeper write lock poisoned")
        })?;
        // Double-check under the write lock: another thread may have mapped
        // the file between our read miss and acquiring the write lock.
        if let Some(existing) = keeper.data.get(path) {
            return Ok(existing.clone());
        }
        let mmap_arc = SharedMmap::new(path)?;
        keeper.data.insert(path.to_string(), mmap_arc.clone());
        Ok(mmap_arc)
    }
}
542
543#[derive(Debug)]
544struct ColReaderInfo {
545    chain: Vec<Block>,
546    cur_block_idx: usize,
547    cur_block_offset: u64,
548    reads_since_persist: u32,
549    // In-memory progress for tail (active writer block). This allows AtLeastOnce
550    // to advance between reads within a single process without persisting every time.
551    tail_block_id: u64,
552    tail_offset: u64,
553    // Ensure we only hydrate from persisted index once per process per column
554    hydrated_from_index: bool,
555}
556
557struct Reader {
558    data: RwLock<HashMap<String, Arc<RwLock<ColReaderInfo>>>>,
559}
560
561impl Reader {
562    fn new() -> Self {
563        Self {
564            data: RwLock::new(HashMap::new()),
565        }
566    }
567
568    fn get_chain_for_col(&self, col: &str) -> Option<Vec<Block>> {
569        let arc_info = {
570            let map = self.data.read().ok()?;
571            map.get(col)?.clone()
572        };
573        let info = arc_info.read().ok()?;
574        Some(info.chain.clone())
575    }
576
577    // internal
578    fn append_block_to_chain(&self, col: &str, block: Block) -> std::io::Result<()> {
579        // fast path: try read-lock map and use per-column lock
580        if let Some(info_arc) = {
581            let map = self.data.read().map_err(|_| {
582                std::io::Error::new(std::io::ErrorKind::Other, "reader map read lock poisoned")
583            })?;
584            map.get(col).cloned()
585        } {
586            let mut info = info_arc.write().map_err(|_| {
587                std::io::Error::new(std::io::ErrorKind::Other, "col info write lock poisoned")
588            })?;
589            let before = info.chain.len();
590            info.chain.push(block.clone());
591            debug_print!(
592                "[reader] chain append(fast): col={}, block_id={}, chain_len {}->{}",
593                col,
594                block.id,
595                before,
596                before + 1
597            );
598            return Ok(());
599        }
600
601        // slow path
602        let info_arc = {
603            let mut map = self.data.write().map_err(|_| {
604                std::io::Error::new(std::io::ErrorKind::Other, "reader map write lock poisoned")
605            })?;
606            map.entry(col.to_string())
607                .or_insert_with(|| {
608                    Arc::new(RwLock::new(ColReaderInfo {
609                        chain: Vec::new(),
610                        cur_block_idx: 0,
611                        cur_block_offset: 0,
612                        reads_since_persist: 0,
613                        tail_block_id: 0,
614                        tail_offset: 0,
615                        hydrated_from_index: false,
616                    }))
617                })
618                .clone()
619        };
620        let mut info = info_arc.write().map_err(|_| {
621            std::io::Error::new(std::io::ErrorKind::Other, "col info write lock poisoned")
622        })?;
623        info.chain.push(block.clone());
624        debug_print!(
625            "[reader] chain append(slow/new): col={}, block_id={}, chain_len {}->{}",
626            col,
627            block.id,
628            0,
629            1
630        );
631        Ok(())
632    }
633}
634
635#[derive(Archive, Deserialize, Serialize, Debug, Clone)]
636pub struct BlockPos {
637    pub cur_block_idx: u64,
638    pub cur_block_offset: u64,
639}
640
641pub struct WalIndex {
642    store: HashMap<String, BlockPos>,
643    path: String,
644}
645
646impl WalIndex {
647    pub fn new(file_name: &str) -> std::io::Result<Self> {
649        fs::create_dir_all("./wal_files").ok();
650        let path = format!("./wal_files/{}_index.db", file_name);
651
652        let store = Path::new(&path)
653            .exists()
654            .then(|| fs::read(&path).ok())
655            .flatten()
656            .and_then(|bytes| {
657                if bytes.is_empty() {
658                    return None;
659                }
660                // SAFETY: `bytes` comes from our persisted index file which we control;
661                // we only proceed when the file is non-empty and rkyv can interpret it.
662                let archived = unsafe { rkyv::archived_root::<HashMap<String, BlockPos>>(&bytes) };
663                archived.deserialize(&mut rkyv::Infallible).ok()
664            })
665            .unwrap_or_default();
666
667        Ok(Self {
668            store,
669            path: path.to_string(),
670        })
671    }
672
673    pub fn set(&mut self, key: String, idx: u64, offset: u64) -> std::io::Result<()> {
674        self.store.insert(
675            key,
676            BlockPos {
677                cur_block_idx: idx,
678                cur_block_offset: offset,
679            },
680        );
681        self.persist()
682    }
683
684    pub fn get(&self, key: &str) -> Option<&BlockPos> {
685        self.store.get(key)
686    }
687
688    pub fn remove(&mut self, key: &str) -> std::io::Result<Option<BlockPos>> {
689        let result = self.store.remove(key);
690        if result.is_some() {
691            self.persist()?;
692        }
693        Ok(result)
694    }
695
696    fn persist(&self) -> std::io::Result<()> {
697        let tmp_path = format!("{}.tmp", self.path);
698        let bytes = rkyv::to_bytes::<_, 256>(&self.store).map_err(|e| {
699            std::io::Error::new(
700                std::io::ErrorKind::Other,
701                format!("index serialize failed: {:?}", e),
702            )
703        })?;
704
705        fs::write(&tmp_path, &bytes)?;
706        fs::File::open(&tmp_path)?.sync_all()?;
707        fs::rename(&tmp_path, &self.path)?;
708        Ok(())
709    }
710}
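
// Persistence note (summary of `persist` above): the index is written to `<path>.tmp`,
// fsynced, and then renamed over the live file, so a crash mid-write leaves either the
// old or the new index intact rather than a torn file. The containing directory is not
// fsynced here, so the rename itself may still be lost on power failure.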
711
712// public APIs
713#[derive(Clone, Copy, Debug)]
714pub enum ReadConsistency {
715    StrictlyAtOnce,
716    AtLeastOnce { persist_every: u32 },
717}
718
719#[derive(Clone, Copy, Debug)]
720pub enum FsyncSchedule {
721    Milliseconds(u64),
722}
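
// Consistency cheat-sheet (an informal summary of `should_persist` further below):
//   ReadConsistency::StrictlyAtOnce
//       persist the read offset after every successful read.
//   ReadConsistency::AtLeastOnce { persist_every: N }
//       persist at most every N reads, so after a crash up to N-1 already-delivered
//       entries may be read again.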
723
724pub struct Walrus {
725    allocator: Arc<BlockAllocator>,
726    reader: Arc<Reader>,
727    writers: RwLock<HashMap<String, Arc<Writer>>>,
728    fsync_tx: Arc<mpsc::Sender<String>>,
729    read_offset_index: Arc<RwLock<WalIndex>>,
730    read_consistency: ReadConsistency,
731    fsync_schedule: FsyncSchedule,
732}
733
734impl Walrus {
735    pub fn new() -> std::io::Result<Self> {
736        Self::with_consistency(ReadConsistency::StrictlyAtOnce)
737    }
738
739    pub fn with_consistency(mode: ReadConsistency) -> std::io::Result<Self> {
740        Self::with_consistency_and_schedule(mode, FsyncSchedule::Milliseconds(1000))
741    }
742
743    pub fn with_consistency_and_schedule(
744        mode: ReadConsistency,
745        fsync_schedule: FsyncSchedule,
746    ) -> std::io::Result<Self> {
747        debug_print!("[walrus] new");
748        let allocator = Arc::new(BlockAllocator::new()?);
749        let reader = Arc::new(Reader::new());
750        let (tx, rx) = mpsc::channel::<String>();
751        let tx_arc = Arc::new(tx);
752        let (del_tx, del_rx) = mpsc::channel::<String>();
753        let del_tx_arc = Arc::new(del_tx);
754        let _ = DELETION_TX.set(del_tx_arc.clone());
755        let pool: HashMap<String, MmapMut> = HashMap::new();
756        let tick = Arc::new(AtomicU64::new(0));
757        let sleep_millis = match fsync_schedule {
758            FsyncSchedule::Milliseconds(ms) => ms.max(1),
759        };
760        // background flusher
761        thread::spawn(move || {
762            let mut pool = pool;
763            let tick = tick;
764            let del_rx = del_rx;
765            let mut delete_pending = std::collections::HashSet::new();
766            loop {
767                thread::sleep(Duration::from_millis(sleep_millis));
768                let mut unique = std::collections::HashSet::new();
769                while let Ok(path) = rx.try_recv() {
770                    unique.insert(path);
771                }
772                if !unique.is_empty() {
773                    debug_print!("[flush] scheduling {} paths", unique.len());
774                }
775                for path in unique.into_iter() {
776                    // Skip if file doesn't exist
777                    if !std::path::Path::new(&path).exists() {
778                        debug_print!("[flush] file does not exist, skipping: {}", path);
779                        continue;
780                    }
781
782                    if !pool.contains_key(&path) {
783                        match OpenOptions::new().read(true).write(true).open(&path) {
784                            Ok(file) => {
785                                // SAFETY: The file is opened read/write and lives at least until
786                                // the created mapping is inserted into `pool`, which owns it.
787                                match unsafe { MmapMut::map_mut(&file) } {
788                                    Ok(mmap) => {
789                                        pool.insert(path.clone(), mmap);
790                                    }
791                                    Err(e) => {
792                                        debug_print!(
793                                            "[flush] failed to create memory map for {}: {}",
794                                            path,
795                                            e
796                                        );
797                                        continue;
798                                    }
799                                }
800                            }
801                            Err(e) => {
802                                debug_print!(
803                                    "[flush] failed to open file for flushing {}: {}",
804                                    path,
805                                    e
806                                );
807                                continue;
808                            }
809                        }
810                    }
811                    if let Some(mmap) = pool.get_mut(&path) {
812                        if let Err(e) = mmap.flush() {
813                            debug_print!("[flush] flush error for {}: {}", path, e);
814                        }
815                    }
816                }
817                // collect deletion requests
818                while let Ok(path) = del_rx.try_recv() {
819                    debug_print!("[reclaim] deletion requested: {}", path);
820                    delete_pending.insert(path);
821                }
822                let n = tick.fetch_add(1, Ordering::Relaxed) + 1;
823                if n >= 1000 {
824                    // WARN: we clean up once every 1000 times the fsync runs
825                    if tick
826                        .compare_exchange(n, 0, Ordering::AcqRel, Ordering::Relaxed)
827                        .is_ok()
828                    {
829                        let mut empty: HashMap<String, MmapMut> = HashMap::new();
                        std::mem::swap(&mut pool, &mut empty); // drop and rebuild the mmap pool every 1000 flush ticks so it cannot grow without bound
831                        // perform batched deletions now that mmaps are dropped
832                        for path in delete_pending.drain() {
833                            match fs::remove_file(&path) {
834                                Ok(_) => debug_print!("[reclaim] deleted file {}", path),
835                                Err(e) => {
836                                    debug_print!("[reclaim] delete failed for {}: {}", path, e)
837                                }
838                            }
839                        }
840                    }
841                }
842            }
843        });
844        let idx = WalIndex::new("read_offset_idx")?;
845        let instance = Walrus {
846            allocator,
847            reader,
848            writers: RwLock::new(HashMap::new()),
849            fsync_tx: tx_arc,
850            read_offset_index: Arc::new(RwLock::new(idx)),
851            read_consistency: mode,
852            fsync_schedule,
853        };
854        instance.startup_chore()?;
855        Ok(instance)
856    }
857
858    pub fn append_for_topic(&self, col_name: &str, raw_bytes: &[u8]) -> std::io::Result<()> {
859        let writer = {
860            if let Some(w) = {
861                let map = self.writers.read().map_err(|_| {
862                    std::io::Error::new(std::io::ErrorKind::Other, "writers read lock poisoned")
863                })?;
864                map.get(col_name).cloned()
865            } {
866                w
867            } else {
868                let mut map = self.writers.write().map_err(|_| {
869                    std::io::Error::new(std::io::ErrorKind::Other, "writers write lock poisoned")
870                })?;
871                if let Some(w) = map.get(col_name).cloned() {
872                    w
873                } else {
874                    // SAFETY: The returned block will be held by this writer only
875                    // and appended/sealed before being exposed to readers.
876                    let initial_block = unsafe { self.allocator.get_next_available_block()? };
877                    let w = Arc::new(Writer::new(
878                        self.allocator.clone(),
879                        initial_block,
880                        self.reader.clone(),
881                        col_name.to_string(),
882                        self.fsync_tx.clone(),
883                    ));
884                    map.insert(col_name.to_string(), w.clone());
885                    w
886                }
887            }
888        };
889        writer.write(raw_bytes)
890    }
891
892    pub fn read_next(&self, col_name: &str) -> std::io::Result<Option<Entry>> {
893        const TAIL_FLAG: u64 = 1u64 << 63;
894        let info_arc = if let Some(arc) = {
895            let map = self.reader.data.read().map_err(|_| {
896                std::io::Error::new(std::io::ErrorKind::Other, "reader map read lock poisoned")
897            })?;
898            map.get(col_name).cloned()
899        } {
900            arc
901        } else {
902            let mut map = self.reader.data.write().map_err(|_| {
903                std::io::Error::new(std::io::ErrorKind::Other, "reader map write lock poisoned")
904            })?;
905            map.entry(col_name.to_string())
906                .or_insert_with(|| {
907                    Arc::new(RwLock::new(ColReaderInfo {
908                        chain: Vec::new(),
909                        cur_block_idx: 0,
910                        cur_block_offset: 0,
911                        reads_since_persist: 0,
912                        tail_block_id: 0,
913                        tail_offset: 0,
914                        hydrated_from_index: false,
915                    }))
916                })
917                .clone()
918        };
919        let mut info = info_arc.write().map_err(|_| {
920            std::io::Error::new(std::io::ErrorKind::Other, "col info write lock poisoned")
921        })?;
922        debug_print!(
923            "[reader] read_next start: col={}, chain_len={}, idx={}, offset={}",
924            col_name,
925            info.chain.len(),
926            info.cur_block_idx,
927            info.cur_block_offset
928        );
929
930        // Load persisted position (supports tail sentinel)
931        let mut persisted_tail: Option<(u64 /*block_id*/, u64 /*offset*/)> = None;
932        if !info.hydrated_from_index {
933            if let Ok(idx_guard) = self.read_offset_index.read() {
934                if let Some(pos) = idx_guard.get(col_name) {
935                    if (pos.cur_block_idx & TAIL_FLAG) != 0 {
936                        let tail_block_id = pos.cur_block_idx & (!TAIL_FLAG);
937                        persisted_tail = Some((tail_block_id, pos.cur_block_offset));
938                        // sealed state is considered caught up
939                        info.cur_block_idx = info.chain.len();
940                        info.cur_block_offset = 0;
941                    } else {
942                        let mut ib = pos.cur_block_idx as usize;
943                        if ib > info.chain.len() {
944                            ib = info.chain.len();
945                        }
946                        info.cur_block_idx = ib;
947                        if ib < info.chain.len() {
948                            let used = info.chain[ib].used;
949                            info.cur_block_offset = pos.cur_block_offset.min(used);
950                        } else {
951                            info.cur_block_offset = 0;
952                        }
953                    }
954                    info.hydrated_from_index = true;
955                } else {
956                    // No persisted state present; mark hydrated to avoid re-checking every call
957                    info.hydrated_from_index = true;
958                }
959            }
960        }
961
962        // If we have a persisted tail and some sealed blocks were recovered, fold into the last block
963        if let Some((_, tail_off)) = persisted_tail {
964            if !info.chain.is_empty() {
965                let ib = info.chain.len() - 1;
966                info.cur_block_idx = ib;
967                info.cur_block_offset = tail_off.min(info.chain[ib].used);
968                if self.should_persist(&mut info, true) {
969                    if let Ok(mut idx_guard) = self.read_offset_index.write() {
970                        let _ = idx_guard.set(
971                            col_name.to_string(),
972                            info.cur_block_idx as u64,
973                            info.cur_block_offset,
974                        );
975                    }
976                }
977                persisted_tail = None;
978            }
979        }
980
981        loop {
982            // Sealed chain path
983            if info.cur_block_idx < info.chain.len() {
984                let idx = info.cur_block_idx;
985                let off = info.cur_block_offset;
986                let block = info.chain[idx].clone();
987
988                if off >= block.used {
989                    debug_print!(
990                        "[reader] read_next: advance block col={}, block_id={}, offset={}, used={}",
991                        col_name,
992                        block.id,
993                        off,
994                        block.used
995                    );
996                    BlockStateTracker::set_checkpointed_true(block.id as usize);
997                    info.cur_block_idx += 1;
998                    info.cur_block_offset = 0;
999                    continue;
1000                }
1001
1002                match block.read(off) {
1003                    Ok((entry, consumed)) => {
1004                        info.cur_block_offset = off + consumed as u64;
1005                        if self.should_persist(&mut info, false) {
1006                            if let Ok(mut idx_guard) = self.read_offset_index.write() {
1007                                let _ = idx_guard.set(
1008                                    col_name.to_string(),
1009                                    info.cur_block_idx as u64,
1010                                    info.cur_block_offset,
1011                                );
1012                            }
1013                        }
1014                        debug_print!(
1015                            "[reader] read_next: OK col={}, block_id={}, consumed={}, new_offset={}",
1016                            col_name,
1017                            block.id,
1018                            consumed,
1019                            info.cur_block_offset
1020                        );
1021                        return Ok(Some(entry));
1022                    }
1023                    Err(_) => {
1024                        debug_print!(
1025                            "[reader] read_next: read error; skip block col={}, block_id={}, offset={}",
1026                            col_name,
1027                            block.id,
1028                            off
1029                        );
1030                        info.cur_block_idx += 1;
1031                        info.cur_block_offset = 0;
1032                        continue;
1033                    }
1034                }
1035            }
1036
1037            // Tail path ---
1038
1039            let writer_arc = {
1040                let map = self.writers.read().map_err(|_| {
1041                    std::io::Error::new(std::io::ErrorKind::Other, "writers read lock poisoned")
1042                })?;
1043                match map.get(col_name) {
1044                    Some(w) => w.clone(),
1045                    None => return Ok(None),
1046                }
1047            };
1048            let (active_block, written) = {
1049                let blk = writer_arc.current_block.lock().map_err(|_| {
1050                    std::io::Error::new(std::io::ErrorKind::Other, "current_block lock poisoned")
1051                })?;
1052                let off = writer_arc.current_offset.lock().map_err(|_| {
1053                    std::io::Error::new(std::io::ErrorKind::Other, "current_offset lock poisoned")
1054                })?;
1055                (blk.clone(), *off)
1056            };
1057
1058            // If persisted tail points to a different block and that block is now sealed in chain, fold it
1059            if let Some((tail_block_id, tail_off)) = persisted_tail {
1060                if tail_block_id != active_block.id {
1061                    if let Some((idx, _)) = info
1062                        .chain
1063                        .iter()
1064                        .enumerate()
1065                        .find(|(_, b)| b.id == tail_block_id)
1066                    {
1067                        info.cur_block_idx = idx;
1068                        info.cur_block_offset = tail_off.min(info.chain[idx].used);
1069                        if self.should_persist(&mut info, true) {
1070                            if let Ok(mut idx_guard) = self.read_offset_index.write() {
1071                                let _ = idx_guard.set(
1072                                    col_name.to_string(),
1073                                    info.cur_block_idx as u64,
1074                                    info.cur_block_offset,
1075                                );
1076                            }
1077                        }
1078                        persisted_tail = None; // sealed now
1079                        continue;
1080                    } else {
1081                        // rebase tail to current active block at 0
1082                        persisted_tail = Some((active_block.id, 0));
1083                        if self.should_persist(&mut info, true) {
1084                            if let Ok(mut idx_guard) = self.read_offset_index.write() {
1085                                let _ = idx_guard.set(
1086                                    col_name.to_string(),
1087                                    active_block.id | TAIL_FLAG,
1088                                    0,
1089                                );
1090                            }
1091                        }
1092                    }
1093                }
1094            } else {
1095                // No persisted tail; init at current active block start
1096                persisted_tail = Some((active_block.id, 0));
1097                if self.should_persist(&mut info, true) {
1098                    if let Ok(mut idx_guard) = self.read_offset_index.write() {
1099                        let _ = idx_guard.set(col_name.to_string(), active_block.id | TAIL_FLAG, 0);
1100                    }
1101                }
1102            }
1103
            // Choose the best known tail offset: prefer in-memory progress for the current active block
            let (tail_block_id, mut tail_off) = match persisted_tail {
                Some(v) => v,
                None => return Ok(None),
            };
            if tail_block_id == active_block.id {
                // Use the max of the persisted offset and this process's in-memory offset
                if info.tail_block_id == active_block.id {
                    tail_off = tail_off.max(info.tail_offset);
                }
            } else {
                // The writer rotated after we recorded persisted_tail; loop so the
                // fold/rebase logic above can catch up with the new active block.
                continue;
            }
1121
1122            if tail_off < written {
1123                match active_block.read(tail_off) {
1124                    Ok((entry, consumed)) => {
1125                        let new_off = tail_off + consumed as u64;
1126                        // Update in-memory tail progress immediately for AtLeastOnce
1127                        info.tail_block_id = active_block.id;
1128                        info.tail_offset = new_off;
1129                        persisted_tail = Some((tail_block_id, new_off));
1130                        if self.should_persist(&mut info, false) {
1131                            if let Ok(mut idx_guard) = self.read_offset_index.write() {
1132                                let _ = idx_guard.set(
1133                                    col_name.to_string(),
1134                                    tail_block_id | TAIL_FLAG,
1135                                    new_off,
1136                                );
1137                            }
1138                        }
1139                        debug_print!(
1140                            "[reader] read_next: tail OK col={}, block_id={}, consumed={}, new_tail_off={}",
1141                            col_name,
1142                            active_block.id,
1143                            consumed,
1144                            new_off
1145                        );
1146                        return Ok(Some(entry));
1147                    }
1148                    Err(_) => {
1149                        debug_print!(
1150                            "[reader] read_next: tail read error col={}, block_id={}, offset={}",
1151                            col_name,
1152                            active_block.id,
1153                            tail_off
1154                        );
1155                        return Ok(None);
1156                    }
1157                }
1158            } else {
1159                debug_print!(
1160                    "[reader] read_next: tail caught up col={}, block_id={}, off={}, written={}",
1161                    col_name,
1162                    active_block.id,
1163                    tail_off,
1164                    written
1165                );
1166                return Ok(None);
1167            }
1168        }
1169    }
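
    // Index-encoding sketch for the TAIL_FLAG convention used in `read_next` above
    // (these helpers are illustrative additions and are not called by the original
    // code): sealed-chain positions persist a plain chain index, while tail positions
    // persist the active block id with the top bit set.
    #[allow(dead_code)]
    fn encode_tail_position(block_id: u64) -> u64 {
        block_id | (1u64 << 63)
    }

    #[allow(dead_code)]
    fn is_tail_position(encoded: u64) -> bool {
        encoded & (1u64 << 63) != 0
    }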
1170
1171    fn should_persist(&self, info: &mut ColReaderInfo, force: bool) -> bool {
1172        match self.read_consistency {
1173            ReadConsistency::StrictlyAtOnce => true,
1174            ReadConsistency::AtLeastOnce { persist_every } => {
1175                let every = persist_every.max(1);
1176                if force {
1177                    info.reads_since_persist = 0;
1178                    return true;
1179                }
1180                let next = info.reads_since_persist.saturating_add(1);
1181                if next >= every {
1182                    info.reads_since_persist = 0;
1183                    true
1184                } else {
1185                    info.reads_since_persist = next;
1186                    false
1187                }
1188            }
1189        }
1190    }
1191
1192    fn startup_chore(&self) -> std::io::Result<()> {
1193        // Minimal recovery: scan wal_files, build reader chains, and rebuild trackers
1194        let dir = match fs::read_dir("./wal_files") {
1195            Ok(d) => d,
1196            Err(_) => return Ok(()),
1197        };
1198        let mut files: Vec<String> = Vec::new();
1199        for entry in dir.flatten() {
1200            let path = entry.path();
1201            if let Some(s) = path.to_str() {
1202                // skip index files
1203                if s.ends_with("_index.db") {
1204                    continue;
1205                }
1206                files.push(s.to_string());
1207            }
1208        }
1209        files.sort();
1210        if !files.is_empty() {
1211            debug_print!("[recovery] scanning files: {}", files.len());
1212        }
1213
        // block ids here are synthetic; they are reassigned sequentially during recovery
1215        let mut next_block_id: usize = 1;
1216        let mut seen_files = std::collections::HashSet::new();
1217
1218        for file_path in files.iter() {
1219            let mmap = match SharedMmapKeeper::get_mmap_arc(file_path) {
1220                Ok(m) => m,
1221                Err(e) => {
1222                    debug_print!("[recovery] mmap open failed for {}: {}", file_path, e);
1223                    continue;
1224                }
1225            };
1226            seen_files.insert(file_path.clone());
1227            FileStateTracker::register_file_if_absent(file_path);
1228            debug_print!("[recovery] file {}", file_path);
1229
1230            let mut block_offset: u64 = 0;
1231            while block_offset + DEFAULT_BLOCK_SIZE <= MAX_FILE_SIZE {
1232                // heuristic: if first bytes are zero, assume no more blocks
1233                let mut probe = [0u8; 8];
1234                mmap.read(block_offset as usize, &mut probe);
1235                if probe.iter().all(|&b| b == 0) {
1236                    break;
1237                }
1238
1239                let mut used: u64 = 0;
1240                let mut col_name: Option<String> = None;
1241
1242                // try to read first metadata to get column name (with 2-byte length prefix)
1243                let mut meta_buf = vec![0u8; PREFIX_META_SIZE];
1244                mmap.read(block_offset as usize, &mut meta_buf);
1245                let meta_len = (meta_buf[0] as usize) | ((meta_buf[1] as usize) << 8);
1246                if meta_len == 0 || meta_len > PREFIX_META_SIZE - 2 {
1247                    break;
1248                }
1249                let mut aligned = rkyv::AlignedVec::with_capacity(meta_len);
1250                aligned.extend_from_slice(&meta_buf[2..2 + meta_len]);
                // SAFETY: `aligned` is built from a bounded metadata slice read from
                // our own file format; copying into `AlignedVec` satisfies rkyv's
                // alignment requirements.
1255                let archived = unsafe { rkyv::archived_root::<Metadata>(&aligned[..]) };
1256                let md: Metadata = match archived.deserialize(&mut rkyv::Infallible) {
1257                    Ok(m) => m,
1258                    Err(_) => {
1259                        break;
1260                    }
1261                };
1262                col_name = Some(md.owned_by);
1263
1264                // scan entries to compute used
1265                let block_stub = Block {
1266                    id: next_block_id as u64,
1267                    file_path: file_path.clone(),
1268                    offset: block_offset,
1269                    limit: DEFAULT_BLOCK_SIZE,
1270                    mmap: mmap.clone(),
1271                    used: 0,
1272                };
1273                let mut in_block_off: u64 = 0;
1274                loop {
1275                    match block_stub.read(in_block_off) {
1276                        Ok((_entry, consumed)) => {
1277                            used += consumed as u64;
1278                            in_block_off += consumed as u64;
1279                            if in_block_off >= DEFAULT_BLOCK_SIZE {
1280                                break;
1281                            }
1282                        }
1283                        Err(_) => break,
1284                    }
1285                }
1286                if used == 0 {
1287                    break;
1288                }
1289
1290                let block = Block {
1291                    id: next_block_id as u64,
1292                    file_path: file_path.clone(),
1293                    offset: block_offset,
1294                    limit: DEFAULT_BLOCK_SIZE,
1295                    mmap: mmap.clone(),
1296                    used,
1297                };
1298                // register and append
1299                BlockStateTracker::register_block(next_block_id, file_path);
1300                FileStateTracker::add_block_to_file_state(file_path);
1301                if let Some(col) = col_name {
1302                    let _ = self.reader.append_block_to_chain(&col, block.clone());
1303                    debug_print!(
1304                        "[recovery] appended block: file={}, block_id={}, used={}, col={}",
1305                        file_path,
1306                        block.id,
1307                        block.used,
1308                        col
1309                    );
1310                }
1311                next_block_id += 1;
1312                block_offset += DEFAULT_BLOCK_SIZE;
1313            }
1314        }
1315
1316        // restore read positions and checkpoint state
1317        if let Ok(idx_guard) = self.read_offset_index.read() {
1318            let map = self.reader.data.read().ok();
1319            if let Some(map) = map {
1320                for (col, info_arc) in map.iter() {
1321                    if let Some(pos) = idx_guard.get(col) {
1322                        let mut info = match info_arc.write() {
1323                            Ok(v) => v,
1324                            Err(_) => continue,
1325                        };
1326                        let mut ib = pos.cur_block_idx as usize;
1327                        if ib > info.chain.len() {
1328                            ib = info.chain.len();
1329                        }
1330                        info.cur_block_idx = ib;
1331                        if ib < info.chain.len() {
1332                            let used = info.chain[ib].used;
1333                            info.cur_block_offset = pos.cur_block_offset.min(used);
1334                        } else {
1335                            info.cur_block_offset = 0;
1336                        }
1337                        for i in 0..ib {
1338                            BlockStateTracker::set_checkpointed_true(info.chain[i].id as usize);
1339                        }
1340                        if ib < info.chain.len() && info.cur_block_offset >= info.chain[ib].used {
1341                            BlockStateTracker::set_checkpointed_true(info.chain[ib].id as usize);
1342                        }
1343                    }
1344                }
1345            }
1346        }
1347
1348        // enqueue deletion checks
1349        for f in seen_files.into_iter() {
1350            flush_check(f);
1351        }
1352        Ok(())
1353    }
1354}
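
// End-to-end usage sketch for the public API above (illustrative only; the topic name
// and payloads are made up and this function is not part of the original file).
#[allow(dead_code)]
fn walrus_usage_example() -> std::io::Result<()> {
    let wal = Walrus::with_consistency_and_schedule(
        ReadConsistency::AtLeastOnce { persist_every: 64 },
        FsyncSchedule::Milliseconds(500),
    )?;
    wal.append_for_topic("orders", b"order-1")?;
    wal.append_for_topic("orders", b"order-2")?;
    // Drain whatever is currently readable for this topic.
    while let Some(entry) = wal.read_next("orders")? {
        debug_print!("[example] read {} bytes", entry.data.len());
    }
    Ok(())
}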
1355
1356static DELETION_TX: OnceLock<Arc<mpsc::Sender<String>>> = OnceLock::new();
1357
1358fn flush_check(file_path: String) {
1359    // readiness check fast path; hook actual reclamation later
1360    if let Some((locked, checkpointed, total, fully_allocated)) =
1361        FileStateTracker::get_state_snapshot(&file_path)
1362    {
1363        let ready_to_delete = fully_allocated && locked == 0 && total > 0 && checkpointed >= total;
1364        if ready_to_delete {
1365            if let Some(tx) = DELETION_TX.get() {
1366                let _ = tx.send(file_path);
1367            }
1368        }
1369    }
1370}
1371
1372struct BlockState {
1373    is_checkpointed: AtomicBool,
1374    file_path: String,
1375}
1376
1377struct BlockStateTracker {}
1378
1379impl BlockStateTracker {
1380    fn map() -> &'static RwLock<HashMap<usize, BlockState>> {
1381        static MAP: OnceLock<RwLock<HashMap<usize, BlockState>>> = OnceLock::new();
1382        MAP.get_or_init(|| RwLock::new(HashMap::new()))
1383    }
1384
1385    fn new() {
1386        let _ = Self::map();
1387    }
1388
1389    fn register_block(block_id: usize, file_path: &str) {
1390        let map = Self::map();
1391        if let Ok(mut w) = map.write() {
1392            w.entry(block_id).or_insert_with(|| BlockState {
1393                is_checkpointed: AtomicBool::new(false),
1394                file_path: file_path.to_string(),
1395            });
1396        }
1397    }
1398
1399    fn get_file_path_for_block(block_id: usize) -> Option<String> {
1400        let map = Self::map();
1401        let r = map.read().ok()?;
1402        r.get(&block_id).map(|b| b.file_path.clone())
1403    }
1404
1405    fn set_checkpointed_true(block_id: usize) {
1406        let path_opt = {
1407            let map = Self::map();
1408            if let Ok(r) = map.read() {
1409                if let Some(b) = r.get(&block_id) {
1410                    b.is_checkpointed.store(true, Ordering::Release);
1411                    Some(b.file_path.clone())
1412                } else {
1413                    None
1414                }
1415            } else {
1416                None
1417            }
1418        };
1419
1420        if let Some(path) = path_opt {
1421            FileStateTracker::inc_checkpoint_for_file(&path);
1422            flush_check(path);
1423        }
1424    }
1425}
1426
1427struct FileState {
1428    locked_block_ctr: AtomicU16,
1429    checkpoint_block_ctr: AtomicU16,
1430    total_blocks: AtomicU16,
1431    is_fully_allocated: AtomicBool,
1432}
1433
1434struct FileStateTracker {}
1435
1436impl FileStateTracker {
1437    fn map() -> &'static RwLock<HashMap<String, FileState>> {
1438        static MAP: OnceLock<RwLock<HashMap<String, FileState>>> = OnceLock::new();
1439        MAP.get_or_init(|| RwLock::new(HashMap::new()))
1440    }
1441
1442    fn new() {
1443        let _ = Self::map();
1444    }
1445
1446    fn register_file_if_absent(file_path: &str) {
1447        let map = Self::map();
1448        let mut w = map.write().expect("file state map write lock poisoned");
1449        w.entry(file_path.to_string()).or_insert_with(|| FileState {
1450            locked_block_ctr: AtomicU16::new(0),
1451            checkpoint_block_ctr: AtomicU16::new(0),
1452            total_blocks: AtomicU16::new(0),
1453            is_fully_allocated: AtomicBool::new(false),
1454        });
1455    }
1456
1457    fn add_block_to_file_state(file_path: &str) {
1458        Self::register_file_if_absent(file_path);
1459        let map = Self::map();
1460        if let Ok(r) = map.read() {
1461            if let Some(st) = r.get(file_path) {
1462                st.total_blocks.fetch_add(1, Ordering::AcqRel);
1463            }
1464        }
1465    }
1466
1467    fn set_fully_allocated(file_path: String) {
1468        Self::register_file_if_absent(&file_path);
1469        let map = Self::map();
1470        if let Ok(r) = map.read() {
1471            if let Some(st) = r.get(&file_path) {
1472                st.is_fully_allocated.store(true, Ordering::Release);
1473            }
1474        }
1475        flush_check(file_path);
1476    }
1477
1478    fn set_block_locked(block_id: usize) {
1479        if let Some(path) = BlockStateTracker::get_file_path_for_block(block_id) {
1480            let map = Self::map();
1481            if let Ok(r) = map.read() {
1482                if let Some(st) = r.get(&path) {
1483                    st.locked_block_ctr.fetch_add(1, Ordering::AcqRel);
1484                }
1485            }
1486        }
1487    }
1488
1489    fn set_block_unlocked(block_id: usize) {
1490        if let Some(path) = BlockStateTracker::get_file_path_for_block(block_id) {
1491            let map = Self::map();
1492            if let Ok(r) = map.read() {
1493                if let Some(st) = r.get(&path) {
1494                    st.locked_block_ctr.fetch_sub(1, Ordering::AcqRel);
1495                }
1496            }
1497            flush_check(path);
1498        }
1499    }
1500
1501    fn inc_checkpoint_for_file(file_path: &str) {
1502        let map = Self::map();
1503        if let Ok(r) = map.read() {
1504            if let Some(st) = r.get(file_path) {
1505                st.checkpoint_block_ctr.fetch_add(1, Ordering::AcqRel);
1506            }
1507        }
1508    }
1509
1510    fn get_state_snapshot(file_path: &str) -> Option<(u16, u16, u16, bool)> {
1511        let map = Self::map();
1512        let r = map.read().ok()?;
1513        let st = r.get(file_path)?;
1514        let locked = st.locked_block_ctr.load(Ordering::Acquire);
1515        let checkpointed = st.checkpoint_block_ctr.load(Ordering::Acquire);
1516        let total = st.total_blocks.load(Ordering::Acquire);
1517        let fully = st.is_fully_allocated.load(Ordering::Acquire);
1518        Some((locked, checkpointed, total, fully))
1519    }
1520}