iridium-db 0.4.0

A high-performance vector-graph hybrid storage and indexing engine
use std::collections::{BTreeMap, HashMap};
use std::io::{Cursor, Read};
use std::path::{Path, PathBuf};

use roaring::RoaringBitmap;

use crate::core::reactor::Reactor;

/// File name of the persisted bitmap store; it is joined onto the store directory.
const BITMAP_STORE_FILE: &str = "ir.bitmap.v1";
/// Magic bytes written at the very start of the store file and verified when decoding it.
const BITMAP_STORE_MAGIC: &[u8] = b"IRBM1\n";

/// Errors reported by the bitmap store.
#[derive(Debug)]
pub enum BitmapError {
    /// An underlying I/O operation failed. This is also what a read past the end of
    /// the encoded store data produces, since decoding goes through `Read::read_exact`.
    Io(std::io::Error),
    /// Persisted data could not be interpreted, e.g. a wrong magic header, a key that is
    /// not valid UTF-8, or a roaring bitmap (de)serialization failure.
    Corrupt(String),
    /// A caller-supplied value was rejected, e.g. an empty key part, a node id that does
    /// not fit in `u32`, or a key string too long for its `u16` length prefix.
    InvalidInput(String),
}

/// Result alias used throughout this module; the error type is always [`BitmapError`].
pub type Result<T> = std::result::Result<T, BitmapError>;

impl From<std::io::Error> for BitmapError {
    fn from(err: std::io::Error) -> Self {
        Self::Io(err)
    }
}

/// Identifies one posting list: an index name paired with a value key.
///
/// The derived `Ord` compares `index_name` first and then `value_key`; the encoder
/// relies on that ordering to write entries in a deterministic order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct BitmapKey {
    /// Name of the index the posting list belongs to.
    index_name: String,
    /// Value within that index whose postings are stored.
    value_key: String,
}

/// Posting-list store backed by roaring bitmaps.
///
/// Reads combine the committed bitmaps with two in-memory deltas (pending additions
/// and pending removals); a flush merges the deltas into the committed state and
/// writes the whole store to a single file inside `dir`.
#[derive(Debug, Clone)]
pub struct BitmapStore {
    /// Directory that holds the store file.
    dir: PathBuf,
    /// Bitmaps loaded from the store file plus everything merged by earlier flushes.
    committed: HashMap<BitmapKey, RoaringBitmap>,
    /// Node ids added since the last flush, per key.
    mutable_add: HashMap<BitmapKey, RoaringBitmap>,
    /// Node ids removed since the last flush, per key.
    mutable_remove: HashMap<BitmapKey, RoaringBitmap>,
    /// Set by every accepted add/remove; cleared once a flush has written the file.
    dirty: bool,
}

impl BitmapStore {
    /// Opens the bitmap store located in `dir`, creating the directory if needed.
    ///
    /// When the store file can be stat'ed through the reactor it is read and decoded
    /// into the committed state; otherwise the store starts out empty. The returned
    /// store never has pending deltas and is not dirty.
    ///
    /// NOTE(review): any failure of `metadata_len` (not only "file not found") is
    /// treated as "no store file yet" — confirm that this is intended.
    ///
    /// # Errors
    /// Propagates failures from creating the directory, reading the file, or decoding
    /// its contents.
    pub fn load_or_create_with_reactor(dir: PathBuf, reactor: &dyn Reactor) -> Result<Self> {
        reactor.create_dir_all(&dir)?;
        let file = store_path(&dir);
        let committed = if reactor.metadata_len(&file).is_ok() {
            let raw = reactor.read_file(&file)?;
            decode_store(&raw)?
        } else {
            HashMap::new()
        };
        Ok(Self {
            dir,
            committed,
            mutable_add: HashMap::new(),
            mutable_remove: HashMap::new(),
            dirty: false,
        })
    }

    /// Records `node_id` as a pending posting for `(index_name, value_key)`.
    ///
    /// Both key parts are trimmed before use. The id is added to the pending-add
    /// delta and cleared from the pending-remove delta for the same key, so the most
    /// recent operation on an id wins. The store is marked dirty.
    ///
    /// # Errors
    /// Returns [`BitmapError::InvalidInput`] when
    /// - either trimmed key part is empty,
    /// - `node_id` does not fit in `u32`, or
    /// - either trimmed key part is longer than `u16::MAX` bytes.
    pub fn add_posting(&mut self, index_name: &str, value_key: &str, node_id: u64) -> Result<()> {
        let index_name = index_name.trim();
        let value_key = value_key.trim();
        if index_name.is_empty() || value_key.is_empty() {
            return Err(BitmapError::InvalidInput(
                "bitmap posting requires non-empty index_name and value_key".to_string(),
            ));
        }
        let node_id_u32 = u32::try_from(node_id).map_err(|_| {
            BitmapError::InvalidInput("bitmap node_id exceeds u32 range".to_string())
        })?;
        // The store file writes every key part behind a u16 length prefix. Reject an
        // oversized part now: accepting it would succeed here but make every later
        // flush fail while encoding the key.
        let max_key_len = usize::from(u16::MAX);
        if index_name.len() > max_key_len || value_key.len() > max_key_len {
            return Err(BitmapError::InvalidInput(
                "bitmap string key exceeds u16".to_string(),
            ));
        }
        let key = BitmapKey {
            index_name: index_name.to_string(),
            value_key: value_key.to_string(),
        };
        // Only touch the removal delta if one exists for this key; there is no need to
        // clone the key and insert an empty placeholder bitmap.
        if let Some(pending_removals) = self.mutable_remove.get_mut(&key) {
            pending_removals.remove(node_id_u32);
        }
        self.mutable_add.entry(key).or_default().insert(node_id_u32);
        self.dirty = true;
        Ok(())
    }

    /// Records the removal of `node_id` from the posting list `(index_name, value_key)`.
    ///
    /// Both key parts are trimmed before use. The id is added to the pending-remove
    /// delta and cleared from the pending-add delta for the same key, so the most
    /// recent operation on an id wins. Removing an id that was never added is
    /// accepted. The store is marked dirty.
    ///
    /// # Errors
    /// Returns [`BitmapError::InvalidInput`] when either trimmed key part is empty or
    /// `node_id` does not fit in `u32`.
    pub fn remove_posting(
        &mut self,
        index_name: &str,
        value_key: &str,
        node_id: u64,
    ) -> Result<()> {
        let index_name = index_name.trim();
        let value_key = value_key.trim();
        if index_name.is_empty() || value_key.is_empty() {
            return Err(BitmapError::InvalidInput(
                "bitmap posting requires non-empty index_name and value_key".to_string(),
            ));
        }
        let node_id_u32 = u32::try_from(node_id).map_err(|_| {
            BitmapError::InvalidInput("bitmap node_id exceeds u32 range".to_string())
        })?;
        let key = BitmapKey {
            index_name: index_name.to_string(),
            value_key: value_key.to_string(),
        };
        // Only touch the addition delta if one exists for this key; there is no need to
        // clone the key and insert an empty placeholder bitmap.
        if let Some(pending_additions) = self.mutable_add.get_mut(&key) {
            pending_additions.remove(node_id_u32);
        }
        self.mutable_remove
            .entry(key)
            .or_default()
            .insert(node_id_u32);
        self.dirty = true;
        Ok(())
    }

    /// Returns the node ids currently posted under `(index_name, value_key)`.
    ///
    /// Convenience wrapper around [`Self::postings_in_range_limit`] that asks for the
    /// range `0..u64::MAX` with no limit.
    pub fn postings(&self, index_name: &str, value_key: &str) -> Vec<u64> {
        let (lowest, highest_exclusive) = (u64::MIN, u64::MAX);
        self.postings_in_range_limit(index_name, value_key, lowest, highest_exclusive, None)
    }

    pub fn postings_in_range_limit(
        &self,
        index_name: &str,
        value_key: &str,
        range_start: u64,
        range_end_exclusive: u64,
        limit: Option<usize>,
    ) -> Vec<u64> {
        let key = BitmapKey {
            index_name: index_name.to_string(),
            value_key: value_key.to_string(),
        };
        let mut combined = self
            .committed
            .get(&key)
            .cloned()
            .unwrap_or_else(RoaringBitmap::new);
        if let Some(delta) = self.mutable_add.get(&key) {
            combined |= delta;
        }
        if let Some(delta) = self.mutable_remove.get(&key) {
            combined -= delta;
        }
        let start_u32 = u32::try_from(range_start).unwrap_or(u32::MAX);
        let end_u32 = u32::try_from(range_end_exclusive).unwrap_or(u32::MAX);
        if start_u32 >= end_u32 {
            return Vec::new();
        }

        let mut out = Vec::new();
        for value in combined.range(start_u32..end_u32) {
            out.push(value as u64);
            if let Some(max) = limit {
                if out.len() >= max {
                    break;
                }
            }
        }
        out
    }

    /// Merges the pending deltas into the committed bitmaps and writes the store file.
    ///
    /// Does nothing when the store is not dirty. Otherwise pending additions are
    /// unioned in first, then pending removals are subtracted; a committed bitmap
    /// that ends up empty after a subtraction is dropped. The complete committed
    /// state is then encoded and handed to `reactor.write_file`, and the dirty flag
    /// is cleared only after that write succeeded.
    ///
    /// # Errors
    /// Propagates encoding and write failures. The deltas have already been merged
    /// in memory at that point and the store stays dirty.
    pub fn flush_with_reactor(&mut self, reactor: &dyn Reactor) -> Result<()> {
        use std::collections::hash_map::Entry;

        if !self.dirty {
            return Ok(());
        }

        for (key, additions) in self.mutable_add.drain() {
            *self.committed.entry(key).or_default() |= additions;
        }

        for (key, removals) in self.mutable_remove.drain() {
            // Removals for keys without a committed bitmap have nothing to act on.
            if let Entry::Occupied(mut slot) = self.committed.entry(key) {
                *slot.get_mut() -= removals;
                if slot.get().is_empty() {
                    slot.remove();
                }
            }
        }

        let encoded = encode_store(&self.committed)?;
        let target = store_path(&self.dir);
        reactor.write_file(&target, &encoded)?;
        self.dirty = false;
        Ok(())
    }

    /// Reports whether an add or remove has been accepted since the last successful
    /// flush (or since the store was opened).
    pub fn is_dirty(&self) -> bool {
        self.dirty
    }
}

/// Builds the path of the store file inside `dir`.
fn store_path(dir: &Path) -> PathBuf {
    let mut location = dir.to_path_buf();
    location.push(BITMAP_STORE_FILE);
    location
}

/// Serializes the committed bitmaps into the store file format.
///
/// Layout: the magic header, a little-endian `u32` entry count, then for every entry
/// (in ascending key order, so the output is deterministic) the `u16`-length-prefixed
/// `index_name`, the `u16`-length-prefixed `value_key`, a little-endian `u32` byte
/// length and the serialized roaring bitmap.
///
/// # Errors
/// - [`BitmapError::InvalidInput`] when a key part does not fit its `u16` length
///   prefix, or when the entry count or a bitmap payload does not fit in `u32`.
/// - [`BitmapError::Corrupt`] when the roaring serializer reports a failure.
fn encode_store(entries: &HashMap<BitmapKey, RoaringBitmap>) -> Result<Vec<u8>> {
    let mut out = Vec::new();
    out.extend_from_slice(BITMAP_STORE_MAGIC);
    // Sort by borrowing: ordering the entries does not require cloning every key and
    // every bitmap.
    let sorted = entries
        .iter()
        .collect::<BTreeMap<&BitmapKey, &RoaringBitmap>>();
    let entry_count = u32::try_from(sorted.len()).map_err(|_| {
        BitmapError::InvalidInput("bitmap store entry count exceeds u32".to_string())
    })?;
    out.extend_from_slice(&entry_count.to_le_bytes());

    // Scratch buffer reused for every bitmap so its length is known before the
    // payload is appended.
    let mut bitmap_bytes = Vec::new();
    for (key, bitmap) in sorted {
        write_len_prefixed_string(&mut out, &key.index_name)?;
        write_len_prefixed_string(&mut out, &key.value_key)?;
        bitmap_bytes.clear();
        bitmap.serialize_into(&mut bitmap_bytes).map_err(|err| {
            BitmapError::Corrupt(format!("failed to serialize roaring bitmap: {}", err))
        })?;
        let payload_len = u32::try_from(bitmap_bytes.len()).map_err(|_| {
            BitmapError::InvalidInput("serialized bitmap exceeds u32 length".to_string())
        })?;
        out.extend_from_slice(&payload_len.to_le_bytes());
        out.extend_from_slice(&bitmap_bytes);
    }
    Ok(out)
}

fn decode_store(data: &[u8]) -> Result<HashMap<BitmapKey, RoaringBitmap>> {
    let mut cursor = Cursor::new(data);
    let mut magic = vec![0u8; BITMAP_STORE_MAGIC.len()];
    cursor.read_exact(&mut magic)?;
    if magic.as_slice() != BITMAP_STORE_MAGIC {
        return Err(BitmapError::Corrupt(
            "invalid bitmap store magic".to_string(),
        ));
    }
    let count = read_u32(&mut cursor)? as usize;
    let mut out = HashMap::new();
    for _ in 0..count {
        let index_name = read_len_prefixed_string(&mut cursor)?;
        let value_key = read_len_prefixed_string(&mut cursor)?;
        let bitmap_len = read_u32(&mut cursor)? as usize;
        let mut bitmap_bytes = vec![0u8; bitmap_len];
        cursor.read_exact(&mut bitmap_bytes)?;
        let bitmap =
            RoaringBitmap::deserialize_from(&mut Cursor::new(bitmap_bytes)).map_err(|err| {
                BitmapError::Corrupt(format!("failed to deserialize roaring bitmap: {}", err))
            })?;
        out.insert(
            BitmapKey {
                index_name,
                value_key,
            },
            bitmap,
        );
    }
    Ok(out)
}

/// Appends `value` to `out` as a little-endian `u16` byte length followed by its
/// UTF-8 bytes.
///
/// # Errors
/// Returns [`BitmapError::InvalidInput`] when `value` is longer than `u16::MAX` bytes;
/// `out` is left untouched in that case.
fn write_len_prefixed_string(out: &mut Vec<u8>, value: &str) -> Result<()> {
    let payload = value.as_bytes();
    match u16::try_from(payload.len()) {
        Ok(prefix) => {
            out.extend_from_slice(&prefix.to_le_bytes());
            out.extend_from_slice(payload);
            Ok(())
        }
        Err(_) => Err(BitmapError::InvalidInput(
            "bitmap string key exceeds u16".to_string(),
        )),
    }
}

/// Reads one string stored as a little-endian `u16` byte length followed by that many
/// UTF-8 bytes, advancing `cursor` past it.
///
/// # Errors
/// - [`BitmapError::Io`] when the data ends before the prefix or the payload is
///   complete.
/// - [`BitmapError::Corrupt`] when the payload is not valid UTF-8.
fn read_len_prefixed_string(cursor: &mut Cursor<&[u8]>) -> Result<String> {
    let mut prefix = [0u8; 2];
    cursor.read_exact(&mut prefix)?;

    let mut payload = vec![0u8; usize::from(u16::from_le_bytes(prefix))];
    cursor.read_exact(&mut payload)?;

    match String::from_utf8(payload) {
        Ok(text) => Ok(text),
        Err(_) => Err(BitmapError::Corrupt(
            "invalid utf8 in bitmap key".to_string(),
        )),
    }
}

fn read_u32(cursor: &mut Cursor<&[u8]>) -> Result<u32> {
    let mut buf = [0u8; 4];
    cursor.read_exact(&mut buf)?;
    Ok(u32::from_le_bytes(buf))
}

// Unit tests for the bitmap store: flush/reload round trip, range and limit
// handling, and removal of postings.
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a fresh directory under the system temp dir. The name combines
    /// `prefix`, the process id and the current time in nanoseconds so that separate
    /// tests and runs do not share a directory. The directory is not removed afterwards.
    fn temp_dir(prefix: &str) -> PathBuf {
        let mut dir = std::env::temp_dir();
        let stamp = format!(
            "{}_{}_{}",
            prefix,
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );
        dir.push(stamp);
        std::fs::create_dir_all(&dir).expect("temp dir create");
        dir
    }

    /// Postings that were added and flushed must be visible after reopening the store
    /// from the same directory.
    #[test]
    fn bitmap_store_flush_and_reload_round_trip() {
        let dir = temp_dir("bitmap_store_round_trip");
        let reactor =
            crate::core::reactor::DeterministicReactor::new(std::time::SystemTime::UNIX_EPOCH, 1);
        let mut store = BitmapStore::load_or_create_with_reactor(dir.clone(), &reactor).unwrap();
        store.add_posting("idx_country", "US", 10).unwrap();
        store.add_posting("idx_country", "US", 12).unwrap();
        store.add_posting("idx_country", "GB", 11).unwrap();
        store.flush_with_reactor(&reactor).unwrap();

        let reloaded = BitmapStore::load_or_create_with_reactor(dir, &reactor).unwrap();
        assert_eq!(reloaded.postings("idx_country", "US"), vec![10, 12]);
        assert_eq!(reloaded.postings("idx_country", "GB"), vec![11]);
    }

    /// Range queries work on unflushed postings, exclude the end bound, and stop at
    /// the requested limit.
    #[test]
    fn postings_in_range_limit_respects_range_and_limit() {
        let dir = temp_dir("bitmap_store_range_limit");
        let reactor =
            crate::core::reactor::DeterministicReactor::new(std::time::SystemTime::UNIX_EPOCH, 2);
        let mut store = BitmapStore::load_or_create_with_reactor(dir, &reactor).unwrap();
        for node_id in [5_u64, 10, 15, 20, 25, 30] {
            store.add_posting("idx_country", "US", node_id).unwrap();
        }
        // 5 lies below the range start and 30 is excluded by the exclusive end.
        let bounded = store.postings_in_range_limit("idx_country", "US", 10, 30, Some(3));
        assert_eq!(bounded, vec![10, 15, 20]);
        let all_in_range = store.postings_in_range_limit("idx_country", "US", 10, 30, None);
        assert_eq!(all_in_range, vec![10, 15, 20, 25]);
    }

    /// A removal must be visible immediately (before the flush) and must still hold
    /// after flushing and reopening the store.
    #[test]
    fn remove_posting_updates_pending_and_persisted_state() {
        let dir = temp_dir("bitmap_store_remove");
        let reactor =
            crate::core::reactor::DeterministicReactor::new(std::time::SystemTime::UNIX_EPOCH, 3);
        let mut store = BitmapStore::load_or_create_with_reactor(dir.clone(), &reactor).unwrap();
        store.add_posting("idx_country", "US", 10).unwrap();
        store.add_posting("idx_country", "US", 12).unwrap();
        store.flush_with_reactor(&reactor).unwrap();

        store.remove_posting("idx_country", "US", 10).unwrap();
        // The removal is still pending here; reads already take it into account.
        assert_eq!(store.postings("idx_country", "US"), vec![12]);
        store.flush_with_reactor(&reactor).unwrap();

        let reloaded = BitmapStore::load_or_create_with_reactor(dir, &reactor).unwrap();
        assert_eq!(reloaded.postings("idx_country", "US"), vec![12]);
    }
}