btree-store 0.1.5

A persistent, embedded key-value storage engine in Rust featuring a Copy-On-Write (COW) B-Tree, ACID compliance, and crash safety with multi-bucket support
Documentation
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use crate::node::Node;
use crate::store::Store;
use crate::{Error, Result, Tree};
use parking_lot::Mutex;

pub type Lid = u32;
pub type Pid = u32;

pub(crate) fn encode_u32_key(value: u32) -> [u8; 4] {
    value.to_be_bytes()
}

pub(crate) fn decode_u32_key(bytes: &[u8]) -> Result<u32> {
    if bytes.len() != 4 {
        return Err(Error::Corruption);
    }
    Ok(u32::from_be_bytes(bytes.try_into().unwrap()))
}

const LID_PID_CACHE_SHARDS: usize = 64;
const LID_PID_CACHE_CAPACITY: usize = 8192;

struct LidPidCacheEntry {
    lid: Lid,
    pid: Pid,
    usage: bool,
}

struct LidPidCacheShard {
    entries: Vec<Option<LidPidCacheEntry>>,
    map: HashMap<Lid, usize>,
    hand: usize,
    capacity: usize,
}

impl LidPidCacheShard {
    fn new(capacity: usize) -> Self {
        Self {
            entries: (0..capacity).map(|_| None).collect(),
            map: HashMap::with_capacity(capacity),
            hand: 0,
            capacity,
        }
    }

    fn get(&mut self, lid: Lid) -> Option<Pid> {
        if let Some(&idx) = self.map.get(&lid)
            && let Some(entry) = &mut self.entries[idx]
        {
            entry.usage = true;
            return Some(entry.pid);
        }
        None
    }

    fn put(&mut self, lid: Lid, pid: Pid) {
        if let Some(&idx) = self.map.get(&lid)
            && let Some(entry) = &mut self.entries[idx]
        {
            entry.usage = true;
            entry.pid = pid;
            return;
        }

        loop {
            let evict = match &mut self.entries[self.hand] {
                None => true,
                Some(entry) => {
                    if entry.usage {
                        entry.usage = false;
                        false
                    } else {
                        self.map.remove(&entry.lid);
                        true
                    }
                }
            };

            if evict {
                self.entries[self.hand] = Some(LidPidCacheEntry {
                    lid,
                    pid,
                    usage: true,
                });
                self.map.insert(lid, self.hand);
                self.hand = (self.hand + 1) % self.capacity;
                return;
            }
            self.hand = (self.hand + 1) % self.capacity;
        }
    }

    fn invalidate(&mut self, lid: Lid) {
        if let Some(&idx) = self.map.get(&lid) {
            self.entries[idx] = None;
            self.map.remove(&lid);
        }
    }

    fn clear(&mut self) {
        for slot in &mut self.entries {
            *slot = None;
        }
        self.map.clear();
        self.hand = 0;
    }
}

struct LidPidCache {
    shards: Vec<Mutex<LidPidCacheShard>>,
}

impl LidPidCache {
    fn new(capacity: usize) -> Self {
        let shard_cap = std::cmp::max(1, capacity / LID_PID_CACHE_SHARDS);
        let mut shards = Vec::with_capacity(LID_PID_CACHE_SHARDS);
        for _ in 0..LID_PID_CACHE_SHARDS {
            shards.push(Mutex::new(LidPidCacheShard::new(shard_cap)));
        }
        Self { shards }
    }

    fn shard(&self, lid: Lid) -> &Mutex<LidPidCacheShard> {
        let idx = (lid as usize) % LID_PID_CACHE_SHARDS;
        &self.shards[idx]
    }

    fn get(&self, lid: Lid) -> Option<Pid> {
        self.shard(lid).lock().get(lid)
    }

    fn put(&self, lid: Lid, pid: Pid) {
        self.shard(lid).lock().put(lid, pid)
    }

    fn invalidate(&self, lid: Lid) {
        self.shard(lid).lock().invalidate(lid)
    }

    fn clear(&self) {
        for shard in &self.shards {
            shard.lock().clear();
        }
    }
}

pub trait PageStore: Send + Sync {
    fn alloc_pages(&self, nr_pages: u32, alloc: &mut HashSet<u32>) -> Result<Vec<Lid>>;

    fn alloc_page(&self, alloc: &mut HashSet<u32>) -> Result<Lid> {
        let pages = self.alloc_pages(1, alloc)?;
        Ok(pages[0])
    }

    fn schedule_free(&self, lid: Lid, freed: &mut Vec<(u32, u32)>) -> Result<()>;
    fn free_pages_immediate(&self, page_id: u32, nr_pages: u32) -> Result<()>;
    fn unfree_pages_immediate(&self, page_id: u32, nr_pages: u32) -> Result<()>;
    fn load_node(&self, lid: Lid) -> Result<Arc<Node>>;
    fn load_page(&self, lid: Lid) -> Result<Vec<u8>>;
    fn load_data(&self, lids: &[Lid], len: usize) -> Result<Vec<u8>>;
    fn read_data(&self, lids: &[Lid], buf: &mut [u8]) -> Result<()>;
    fn write_data(&self, lids: &[Lid], data: &[u8]) -> Result<()>;
    fn write_page(&self, lid: Lid, data: &[u8]) -> Result<()>;
}

pub struct LogicalStore {
    store: Arc<Store>,
    mapping_tree: Arc<Tree>,
    reverse_tree: Arc<Tree>,
    lid_cache: LidPidCache,
    lid_cache_seq: AtomicU64,
}

impl LogicalStore {
    pub fn new(store: Arc<Store>, mapping_tree: Arc<Tree>, reverse_tree: Arc<Tree>) -> Self {
        let (seq, _) = store.shared_snapshot();
        Self {
            store,
            mapping_tree,
            reverse_tree,
            lid_cache: LidPidCache::new(LID_PID_CACHE_CAPACITY),
            lid_cache_seq: AtomicU64::new(seq),
        }
    }

    pub(crate) fn clear_lid_cache(&self) {
        let (seq, _) = self.store.shared_snapshot();
        self.lid_cache.clear();
        self.lid_cache_seq.store(seq, Ordering::Release);
    }

    fn refresh_lid_cache(&self) {
        let (seq, _) = self.store.shared_snapshot();
        let cached = self.lid_cache_seq.load(Ordering::Acquire);
        if cached != seq {
            self.lid_cache.clear();
            self.lid_cache_seq.store(seq, Ordering::Release);
        }
    }

    fn resolve_pid(&self, lid: Lid) -> Result<Pid> {
        self.refresh_lid_cache();
        if let Some(pid) = self.lid_cache.get(lid) {
            return Ok(pid);
        }
        let key = encode_u32_key(lid);
        let value = self.mapping_tree.get(&key)?;
        let pid = decode_u32_key(&value)?;
        self.lid_cache.put(lid, pid);
        Ok(pid)
    }

    fn map_lids(&self, lids: &[Lid]) -> Result<Vec<Pid>> {
        let mut pids = Vec::with_capacity(lids.len());
        for &lid in lids {
            pids.push(self.resolve_pid(lid)?);
        }
        Ok(pids)
    }
}

impl PageStore for Store {
    fn alloc_pages(&self, nr_pages: u32, alloc: &mut HashSet<u32>) -> Result<Vec<Lid>> {
        let pages = self.alloc_pages(nr_pages)?;
        for &pid in &pages {
            alloc.insert(pid);
        }
        Ok(pages)
    }

    fn schedule_free(&self, lid: Lid, freed: &mut Vec<(u32, u32)>) -> Result<()> {
        if lid == 0 {
            return Ok(());
        }
        freed.push((lid, 1));
        Ok(())
    }

    fn free_pages_immediate(&self, page_id: u32, nr_pages: u32) -> Result<()> {
        self.free_pages(page_id, nr_pages)
    }

    fn unfree_pages_immediate(&self, page_id: u32, nr_pages: u32) -> Result<()> {
        self.unfree_pages(page_id, nr_pages)
    }

    fn load_node(&self, lid: Lid) -> Result<Arc<Node>> {
        self.load_node(lid)
    }

    fn load_page(&self, lid: Lid) -> Result<Vec<u8>> {
        self.load_page(lid)
    }

    fn load_data(&self, lids: &[Lid], len: usize) -> Result<Vec<u8>> {
        self.load_data(lids, len)
    }

    fn read_data(&self, lids: &[Lid], buf: &mut [u8]) -> Result<()> {
        self.read_data(lids, buf)
    }

    fn write_data(&self, lids: &[Lid], data: &[u8]) -> Result<()> {
        self.write_data(lids, data)
    }

    fn write_page(&self, lid: Lid, data: &[u8]) -> Result<()> {
        self.write_page(lid, data)
    }
}

impl PageStore for LogicalStore {
    fn alloc_pages(&self, nr_pages: u32, alloc: &mut HashSet<u32>) -> Result<Vec<Lid>> {
        if nr_pages == 0 {
            return Ok(Vec::new());
        }

        let pids = self.store.alloc_pages(nr_pages)?;
        for &pid in &pids {
            alloc.insert(pid);
        }

        let lids = self.store.alloc_lids(nr_pages)?;

        for (lid, pid) in lids.iter().zip(pids.iter()) {
            let lid_key = encode_u32_key(*lid);
            let pid_value = encode_u32_key(*pid);
            let pid_key = encode_u32_key(*pid);
            let lid_value = encode_u32_key(*lid);
            self.mapping_tree.put(&lid_key, &pid_value)?;
            self.reverse_tree.put(&pid_key, &lid_value)?;
            self.lid_cache.put(*lid, *pid);
        }

        Ok(lids)
    }

    fn schedule_free(&self, lid: Lid, freed: &mut Vec<(u32, u32)>) -> Result<()> {
        if lid == 0 {
            return Ok(());
        }
        let pid = self.resolve_pid(lid)?;
        let lid_key = encode_u32_key(lid);
        let pid_key = encode_u32_key(pid);
        self.mapping_tree.del(&lid_key)?;
        self.reverse_tree.del(&pid_key)?;
        self.lid_cache.invalidate(lid);
        freed.push((pid, 1));
        Ok(())
    }

    fn free_pages_immediate(&self, page_id: u32, nr_pages: u32) -> Result<()> {
        self.store.free_pages(page_id, nr_pages)
    }

    fn unfree_pages_immediate(&self, page_id: u32, nr_pages: u32) -> Result<()> {
        self.store.unfree_pages(page_id, nr_pages)
    }

    fn load_node(&self, lid: Lid) -> Result<Arc<Node>> {
        let pid = self.resolve_pid(lid)?;
        self.store.load_node(pid)
    }

    fn load_page(&self, lid: Lid) -> Result<Vec<u8>> {
        let pid = self.resolve_pid(lid)?;
        self.store.load_page(pid)
    }

    fn load_data(&self, lids: &[Lid], len: usize) -> Result<Vec<u8>> {
        let pids = self.map_lids(lids)?;
        self.store.load_data(&pids, len)
    }

    fn read_data(&self, lids: &[Lid], buf: &mut [u8]) -> Result<()> {
        let pids = self.map_lids(lids)?;
        self.store.read_data(&pids, buf)
    }

    fn write_data(&self, lids: &[Lid], data: &[u8]) -> Result<()> {
        let pids = self.map_lids(lids)?;
        self.store.write_data(&pids, data)
    }

    fn write_page(&self, lid: Lid, data: &[u8]) -> Result<()> {
        let pid = self.resolve_pid(lid)?;
        self.store.write_page(pid, data)
    }
}