armdb 0.1.14

sharded bitcask key-value storage optimized for NVMe
Documentation
use crate::error::{DbError, DbResult};

/// In-memory bitmap for tracking occupied/free slots.
///
/// Each bit represents one slot: 1 = occupied, 0 = free.
/// Backed by `Vec<u64>` — 64 slots per word.
pub struct Bitmap {
    /// Bit storage: 1 = occupied, 0 = free.
    words: Vec<u64>,
    /// Total number of logical slots.
    count: u32,
    /// Number of currently occupied slots.
    occupied: u32,
    /// Index of the first word that *might* contain a free bit.
    /// Invariant: all words before this index are `u64::MAX`.
    first_free_hint: usize,
}

#[inline]
const fn words_for(bits: u32) -> usize {
    bits.div_ceil(64) as usize
}

#[allow(dead_code)]
impl Bitmap {
    /// Create a bitmap with `count` slots, all free.
    pub fn new(count: u32) -> Self {
        let n = words_for(count);
        Self {
            words: vec![0u64; n],
            count,
            occupied: 0,
            first_free_hint: 0,
        }
    }

    /// Find the first free slot, mark it occupied, and return its id.
    ///
    /// Returns `Err(SlotsFull)` when every slot is occupied.
    pub fn alloc(&mut self) -> DbResult<u32> {
        let len = self.words.len();
        for i in self.first_free_hint..len {
            let word = self.words[i];
            if word != u64::MAX {
                let bit = (!word).trailing_zeros(); // first 0-bit
                let slot_id = i as u32 * 64 + bit;
                if slot_id >= self.count {
                    // Trailing bits beyond `count` — no real slot here.
                    break;
                }
                self.words[i] |= 1u64 << bit;
                self.occupied += 1;
                // If this word is now full, future scans can skip it.
                if self.words[i] == u64::MAX {
                    self.first_free_hint = i + 1;
                }
                return Ok(slot_id);
            }
        }
        Err(DbError::SlotsFull)
    }

    /// Mark `slot_id` as occupied.
    ///
    /// # Panics
    /// Panics if `slot_id >= count`.
    pub fn set(&mut self, slot_id: u32) {
        assert!(slot_id < self.count, "slot_id out of range");
        let (wi, bit) = (slot_id as usize / 64, slot_id % 64);
        let mask = 1u64 << bit;
        if self.words[wi] & mask == 0 {
            self.words[wi] |= mask;
            self.occupied += 1;
        }
    }

    /// Mark `slot_id` as free.
    ///
    /// # Panics
    /// Panics if `slot_id >= count`.
    pub fn clear(&mut self, slot_id: u32) {
        assert!(slot_id < self.count, "slot_id out of range");
        let (wi, bit) = (slot_id as usize / 64, slot_id % 64);
        let mask = 1u64 << bit;
        if self.words[wi] & mask != 0 {
            self.words[wi] &= !mask;
            self.occupied -= 1;
            if wi < self.first_free_hint {
                self.first_free_hint = wi;
            }
        }
    }

    /// Returns `true` if `slot_id` is occupied.
    ///
    /// # Panics
    /// Panics if `slot_id >= count`.
    pub fn is_set(&self, slot_id: u32) -> bool {
        assert!(slot_id < self.count, "slot_id out of range");
        let (wi, bit) = (slot_id as usize / 64, slot_id % 64);
        self.words[wi] & (1u64 << bit) != 0
    }

    /// Number of currently occupied slots.
    pub fn occupied(&self) -> u32 {
        self.occupied
    }

    /// Total number of logical slots.
    pub fn count(&self) -> u32 {
        self.count
    }

    /// Returns `true` when every slot is occupied.
    pub fn is_full(&self) -> bool {
        self.occupied == self.count
    }

    /// Grow the bitmap to hold at least `new_count` slots.
    ///
    /// # Panics
    /// Panics if `new_count < count` (shrinking is not supported).
    pub fn grow(&mut self, new_count: u32) {
        assert!(
            new_count >= self.count,
            "cannot shrink bitmap: {} -> {}",
            self.count,
            new_count
        );
        let new_words = words_for(new_count);
        self.words.resize(new_words, 0u64);
        self.count = new_count;
    }

    /// Serialize the bitmap words as a byte slice (little-endian).
    pub fn as_bytes(&self) -> &[u8] {
        // SAFETY: `u64` has no padding; its natural alignment (8) is >= alignment of u8.
        unsafe {
            std::slice::from_raw_parts(
                self.words.as_ptr() as *const u8,
                self.words.len() * std::mem::size_of::<u64>(),
            )
        }
    }

    /// Build a bitmap by marking every slot whose `meta`'s status is OCCUPIED.
    pub fn from_versions(versions: &[u32]) -> Self {
        use crate::fixed::slot;
        let mut b = Bitmap::new(versions.len() as u32);
        for (i, &m) in versions.iter().enumerate() {
            if slot::status_of(m) == slot::STATUS_OCCUPIED {
                b.set(i as u32);
            }
        }
        b
    }

    /// Deserialize a bitmap from raw bytes and a logical slot count.
    ///
    /// Recomputes `occupied` and `first_free_hint` from the data.
    ///
    /// # Panics
    /// Panics if `data.len()` is not a multiple of 8 or does not have
    /// enough words for `count`.
    pub fn from_bytes(data: &[u8], count: u32) -> Self {
        assert!(
            data.len().is_multiple_of(8),
            "data length must be a multiple of 8"
        );
        let n_words = data.len() / 8;
        assert!(
            n_words >= words_for(count),
            "not enough words for {count} slots"
        );

        let mut words = vec![0u64; n_words];
        // SAFETY: copying aligned-enough bytes into Vec<u64>.
        unsafe {
            std::ptr::copy_nonoverlapping(data.as_ptr(), words.as_mut_ptr() as *mut u8, data.len());
        }

        // Mask out any trailing bits beyond `count` so they don't
        // pollute the occupied counter.
        let tail = count % 64;
        if tail != 0 {
            let last = words_for(count) - 1;
            words[last] &= (1u64 << tail) - 1;
        }

        let mut occupied: u32 = 0;
        let mut first_free_hint: usize = n_words; // assume full
        for (i, &w) in words.iter().enumerate() {
            occupied += w.count_ones();
            if first_free_hint == n_words && w != u64::MAX {
                first_free_hint = i;
            }
        }

        // If all words are full but occupied < count, the last word has
        // free tail bits — handle that edge via the general hint logic:
        // `first_free_hint` is already set correctly because the masked
        // last word won't be MAX when tail bits exist.

        Self {
            words,
            count,
            occupied,
            first_free_hint,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_from_versions() {
        use crate::fixed::slot::{STATUS_DELETED, STATUS_FREE, STATUS_OCCUPIED, pack_meta};
        let v = vec![
            pack_meta(STATUS_OCCUPIED, 1),
            pack_meta(STATUS_DELETED, 2),
            pack_meta(STATUS_FREE, 0),
            pack_meta(STATUS_OCCUPIED, 5),
        ];
        let b = Bitmap::from_versions(&v);
        assert!(b.is_set(0));
        assert!(!b.is_set(1));
        assert!(!b.is_set(2));
        assert!(b.is_set(3));
        assert_eq!(b.occupied(), 2);
    }

    #[test]
    fn test_alloc_sequential() {
        let mut bm = Bitmap::new(128);
        for i in 0..128 {
            assert_eq!(bm.alloc().unwrap(), i);
        }
        assert!(bm.is_full());
        assert_eq!(bm.occupied(), 128);
        assert!(bm.alloc().is_err());
    }

    #[test]
    fn test_free_and_realloc() {
        let mut bm = Bitmap::new(64);
        let a = bm.alloc().unwrap();
        let b = bm.alloc().unwrap();
        let c = bm.alloc().unwrap();
        assert_eq!((a, b, c), (0, 1, 2));

        // Free the middle slot.
        bm.clear(b);
        assert_eq!(bm.occupied(), 2);
        assert!(!bm.is_set(b));

        // Next alloc should reuse slot 1.
        let reused = bm.alloc().unwrap();
        assert_eq!(reused, b);
        assert_eq!(bm.occupied(), 3);
    }

    #[test]
    fn test_grow() {
        let mut bm = Bitmap::new(64);
        for _ in 0..64 {
            bm.alloc().unwrap();
        }
        assert!(bm.is_full());

        bm.grow(128);
        assert!(!bm.is_full());
        assert_eq!(bm.count(), 128);

        let slot = bm.alloc().unwrap();
        assert_eq!(slot, 64);
    }

    #[test]
    fn test_serde_roundtrip() {
        let mut bm = Bitmap::new(200);
        bm.set(0);
        bm.set(63);
        bm.set(64);
        bm.set(127);
        bm.set(199);

        let bytes = bm.as_bytes().to_vec();
        let restored = Bitmap::from_bytes(&bytes, 200);

        assert_eq!(restored.count(), 200);
        assert_eq!(restored.occupied(), 5);
        assert!(restored.is_set(0));
        assert!(restored.is_set(63));
        assert!(restored.is_set(64));
        assert!(restored.is_set(127));
        assert!(restored.is_set(199));
        assert!(!restored.is_set(1));
        assert!(!restored.is_set(100));
    }

    #[test]
    fn test_set_idempotent() {
        let mut bm = Bitmap::new(64);
        bm.set(5);
        bm.set(5);
        assert_eq!(bm.occupied(), 1);
    }

    #[test]
    fn test_clear_idempotent() {
        let mut bm = Bitmap::new(64);
        bm.set(5);
        bm.clear(5);
        bm.clear(5);
        assert_eq!(bm.occupied(), 0);
    }

    #[test]
    #[should_panic(expected = "slot_id out of range")]
    fn test_set_out_of_range() {
        let mut bm = Bitmap::new(64);
        bm.set(64);
    }

    #[test]
    fn test_non_aligned_count() {
        // Count not a multiple of 64.
        let mut bm = Bitmap::new(100);
        for i in 0..100 {
            assert_eq!(bm.alloc().unwrap(), i);
        }
        assert!(bm.is_full());
        assert!(bm.alloc().is_err());
    }
}