use std::collections::{BTreeMap, HashMap};
use std::io::{Cursor, Read};
use std::path::{Path, PathBuf};
use roaring::RoaringBitmap;
use crate::core::reactor::Reactor;
/// File name of the persisted bitmap store, joined onto the store directory by `store_path`.
const BITMAP_STORE_FILE: &str = "ir.bitmap.v1";
/// Header bytes written first by `encode_store` and verified by `decode_store` before parsing.
const BITMAP_STORE_MAGIC: &[u8] = b"IRBM1\n";
/// Errors produced by the bitmap store and its on-disk codec.
#[derive(Debug)]
pub enum BitmapError {
    /// An underlying I/O operation failed (including reads that ran out of data).
    Io(std::io::Error),
    /// Store data could not be encoded or decoded.
    Corrupt(String),
    /// A caller-supplied argument was rejected.
    InvalidInput(String),
}

impl std::fmt::Display for BitmapError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(err) => write!(f, "bitmap store I/O error: {}", err),
            Self::Corrupt(msg) => write!(f, "corrupt bitmap store: {}", msg),
            Self::InvalidInput(msg) => write!(f, "invalid bitmap input: {}", msg),
        }
    }
}

impl std::error::Error for BitmapError {
    /// Exposes the wrapped I/O error so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(err) => Some(err),
            Self::Corrupt(_) | Self::InvalidInput(_) => None,
        }
    }
}

/// Convenience alias used throughout the bitmap store.
pub type Result<T> = std::result::Result<T, BitmapError>;

impl From<std::io::Error> for BitmapError {
    /// Wraps an I/O failure so `?` can propagate it as [`BitmapError::Io`].
    fn from(err: std::io::Error) -> Self {
        Self::Io(err)
    }
}
/// Identifies one posting list: an index name paired with a value inside that index.
///
/// The derived `Ord` compares `index_name` first and `value_key` second; `encode_store` relies on
/// this ordering (via a `BTreeMap`) to lay entries out deterministically.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct BitmapKey {
    /// Name of the index the posting list belongs to.
    index_name: String,
    /// Value within that index whose postings are stored.
    value_key: String,
}
/// Roaring-bitmap posting lists keyed by `(index_name, value_key)`, persisted under `dir`.
///
/// Reads observe `committed ∪ mutable_add − mutable_remove`; flushing folds the two pending delta
/// maps into `committed` and rewrites the store file.
#[derive(Clone, Debug)]
pub struct BitmapStore {
    /// Directory containing the store file.
    dir: PathBuf,
    /// Posting lists as of the last load or flush.
    committed: HashMap<BitmapKey, RoaringBitmap>,
    /// Node ids added since the last flush.
    mutable_add: HashMap<BitmapKey, RoaringBitmap>,
    /// Node ids removed since the last flush.
    mutable_remove: HashMap<BitmapKey, RoaringBitmap>,
    /// Set when mutations are pending and cleared after a successful flush.
    dirty: bool,
}
impl BitmapStore {
pub fn load_or_create_with_reactor(dir: PathBuf, reactor: &dyn Reactor) -> Result<Self> {
reactor.create_dir_all(&dir)?;
let path = store_path(&dir);
if reactor.metadata_len(&path).is_ok() {
let data = reactor.read_file(&path)?;
let committed = decode_store(&data)?;
Ok(Self {
dir,
committed,
mutable_add: HashMap::new(),
mutable_remove: HashMap::new(),
dirty: false,
})
} else {
Ok(Self {
dir,
committed: HashMap::new(),
mutable_add: HashMap::new(),
mutable_remove: HashMap::new(),
dirty: false,
})
}
}
pub fn add_posting(&mut self, index_name: &str, value_key: &str, node_id: u64) -> Result<()> {
let index_name = index_name.trim();
let value_key = value_key.trim();
if index_name.is_empty() || value_key.is_empty() {
return Err(BitmapError::InvalidInput(
"bitmap posting requires non-empty index_name and value_key".to_string(),
));
}
let node_id_u32 = u32::try_from(node_id).map_err(|_| {
BitmapError::InvalidInput("bitmap node_id exceeds u32 range".to_string())
})?;
let key = BitmapKey {
index_name: index_name.to_string(),
value_key: value_key.to_string(),
};
self.mutable_remove
.entry(key.clone())
.or_default()
.remove(node_id_u32);
self.mutable_add.entry(key).or_default().insert(node_id_u32);
self.dirty = true;
Ok(())
}
pub fn remove_posting(
&mut self,
index_name: &str,
value_key: &str,
node_id: u64,
) -> Result<()> {
let index_name = index_name.trim();
let value_key = value_key.trim();
if index_name.is_empty() || value_key.is_empty() {
return Err(BitmapError::InvalidInput(
"bitmap posting requires non-empty index_name and value_key".to_string(),
));
}
let node_id_u32 = u32::try_from(node_id).map_err(|_| {
BitmapError::InvalidInput("bitmap node_id exceeds u32 range".to_string())
})?;
let key = BitmapKey {
index_name: index_name.to_string(),
value_key: value_key.to_string(),
};
self.mutable_add
.entry(key.clone())
.or_default()
.remove(node_id_u32);
self.mutable_remove
.entry(key)
.or_default()
.insert(node_id_u32);
self.dirty = true;
Ok(())
}
pub fn postings(&self, index_name: &str, value_key: &str) -> Vec<u64> {
self.postings_in_range_limit(index_name, value_key, 0, u64::MAX, None)
}
pub fn postings_in_range_limit(
&self,
index_name: &str,
value_key: &str,
range_start: u64,
range_end_exclusive: u64,
limit: Option<usize>,
) -> Vec<u64> {
let key = BitmapKey {
index_name: index_name.to_string(),
value_key: value_key.to_string(),
};
let mut combined = self
.committed
.get(&key)
.cloned()
.unwrap_or_else(RoaringBitmap::new);
if let Some(delta) = self.mutable_add.get(&key) {
combined |= delta;
}
if let Some(delta) = self.mutable_remove.get(&key) {
combined -= delta;
}
let start_u32 = u32::try_from(range_start).unwrap_or(u32::MAX);
let end_u32 = u32::try_from(range_end_exclusive).unwrap_or(u32::MAX);
if start_u32 >= end_u32 {
return Vec::new();
}
let mut out = Vec::new();
for value in combined.range(start_u32..end_u32) {
out.push(value as u64);
if let Some(max) = limit {
if out.len() >= max {
break;
}
}
}
out
}
pub fn flush_with_reactor(&mut self, reactor: &dyn Reactor) -> Result<()> {
if !self.dirty {
return Ok(());
}
for (key, bitmap) in self.mutable_add.drain() {
let target = self.committed.entry(key).or_default();
*target |= bitmap;
}
for (key, bitmap) in self.mutable_remove.drain() {
if let Some(target) = self.committed.get_mut(&key) {
*target -= bitmap;
if target.is_empty() {
self.committed.remove(&key);
}
}
}
let encoded = encode_store(&self.committed)?;
reactor.write_file(&store_path(&self.dir), &encoded)?;
self.dirty = false;
Ok(())
}
pub fn is_dirty(&self) -> bool {
self.dirty
}
}
/// Returns the location of the persisted store file inside `dir`.
fn store_path(dir: &Path) -> PathBuf {
    let mut file = dir.to_path_buf();
    file.push(BITMAP_STORE_FILE);
    file
}
/// Serializes all posting lists into the store's on-disk byte layout.
///
/// Layout: `BITMAP_STORE_MAGIC`, a little-endian `u32` entry count, then for each entry two
/// `u16`-length-prefixed strings (index name, value key) followed by a little-endian
/// `u32`-length-prefixed serialized roaring bitmap. Entries are written in ascending
/// `BitmapKey` order, so identical contents always produce identical bytes.
///
/// # Errors
/// * [`BitmapError::InvalidInput`] when a key string exceeds `u16::MAX` bytes, or when the entry
///   count or a serialized bitmap does not fit its `u32` length field.
/// * [`BitmapError::Corrupt`] when roaring fails to serialize a bitmap.
fn encode_store(entries: &HashMap<BitmapKey, RoaringBitmap>) -> Result<Vec<u8>> {
    // Order by reference; there is no need to clone every bitmap just to sort the keys.
    let sorted: BTreeMap<&BitmapKey, &RoaringBitmap> = entries.iter().collect();
    let count = u32::try_from(sorted.len()).map_err(|_| {
        BitmapError::InvalidInput("bitmap store entry count exceeds u32".to_string())
    })?;

    let mut out = Vec::new();
    out.extend_from_slice(BITMAP_STORE_MAGIC);
    out.extend_from_slice(&count.to_le_bytes());

    // Scratch buffer reused across entries.
    let mut bitmap_bytes = Vec::new();
    for (key, bitmap) in sorted {
        write_len_prefixed_string(&mut out, &key.index_name)?;
        write_len_prefixed_string(&mut out, &key.value_key)?;
        bitmap_bytes.clear();
        bitmap.serialize_into(&mut bitmap_bytes).map_err(|err| {
            BitmapError::Corrupt(format!("failed to serialize roaring bitmap: {}", err))
        })?;
        let bitmap_len = u32::try_from(bitmap_bytes.len()).map_err(|_| {
            BitmapError::InvalidInput("serialized roaring bitmap exceeds u32".to_string())
        })?;
        out.extend_from_slice(&bitmap_len.to_le_bytes());
        out.extend_from_slice(&bitmap_bytes);
    }
    Ok(out)
}
fn decode_store(data: &[u8]) -> Result<HashMap<BitmapKey, RoaringBitmap>> {
let mut cursor = Cursor::new(data);
let mut magic = vec![0u8; BITMAP_STORE_MAGIC.len()];
cursor.read_exact(&mut magic)?;
if magic.as_slice() != BITMAP_STORE_MAGIC {
return Err(BitmapError::Corrupt(
"invalid bitmap store magic".to_string(),
));
}
let count = read_u32(&mut cursor)? as usize;
let mut out = HashMap::new();
for _ in 0..count {
let index_name = read_len_prefixed_string(&mut cursor)?;
let value_key = read_len_prefixed_string(&mut cursor)?;
let bitmap_len = read_u32(&mut cursor)? as usize;
let mut bitmap_bytes = vec![0u8; bitmap_len];
cursor.read_exact(&mut bitmap_bytes)?;
let bitmap =
RoaringBitmap::deserialize_from(&mut Cursor::new(bitmap_bytes)).map_err(|err| {
BitmapError::Corrupt(format!("failed to deserialize roaring bitmap: {}", err))
})?;
out.insert(
BitmapKey {
index_name,
value_key,
},
bitmap,
);
}
Ok(out)
}
/// Appends `value` to `out` as a little-endian `u16` byte length followed by its UTF-8 bytes.
///
/// # Errors
/// [`BitmapError::InvalidInput`] when `value` is longer than `u16::MAX` bytes; `out` is left
/// unchanged in that case.
fn write_len_prefixed_string(out: &mut Vec<u8>, value: &str) -> Result<()> {
    let bytes = value.as_bytes();
    match u16::try_from(bytes.len()) {
        Ok(prefix) => {
            out.extend_from_slice(&prefix.to_le_bytes());
            out.extend_from_slice(bytes);
            Ok(())
        }
        Err(_) => Err(BitmapError::InvalidInput(
            "bitmap string key exceeds u16".to_string(),
        )),
    }
}
/// Reads a little-endian `u16` byte length and then that many bytes, decoded as UTF-8.
///
/// # Errors
/// [`BitmapError::Io`] when the data ends early; [`BitmapError::Corrupt`] when the bytes are
/// not valid UTF-8.
fn read_len_prefixed_string(cursor: &mut Cursor<&[u8]>) -> Result<String> {
    let mut prefix = [0u8; 2];
    cursor.read_exact(&mut prefix)?;
    let mut raw = vec![0u8; usize::from(u16::from_le_bytes(prefix))];
    cursor.read_exact(&mut raw)?;
    match String::from_utf8(raw) {
        Ok(text) => Ok(text),
        Err(_) => Err(BitmapError::Corrupt(
            "invalid utf8 in bitmap key".to_string(),
        )),
    }
}
fn read_u32(cursor: &mut Cursor<&[u8]>) -> Result<u32> {
let mut buf = [0u8; 4];
cursor.read_exact(&mut buf)?;
Ok(u32::from_le_bytes(buf))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Scratch directory under the system temp dir, unique per process and creation time.
    ///
    /// Removed (best effort) on drop so repeated test runs do not leave directories behind.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        fn new(prefix: &str) -> Self {
            let stamp = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("system clock is after the UNIX epoch")
                .as_nanos();
            let dir = std::env::temp_dir().join(format!(
                "{}_{}_{}",
                prefix,
                std::process::id(),
                stamp
            ));
            std::fs::create_dir_all(&dir).expect("temp dir create");
            Self(dir)
        }

        fn path(&self) -> PathBuf {
            self.0.clone()
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Cleanup failure must not mask the test's own outcome.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[test]
    fn bitmap_store_flush_and_reload_round_trip() {
        let dir = ScratchDir::new("bitmap_store_round_trip");
        let reactor =
            crate::core::reactor::DeterministicReactor::new(std::time::SystemTime::UNIX_EPOCH, 1);
        let mut store = BitmapStore::load_or_create_with_reactor(dir.path(), &reactor).unwrap();
        store.add_posting("idx_country", "US", 10).unwrap();
        store.add_posting("idx_country", "US", 12).unwrap();
        store.add_posting("idx_country", "GB", 11).unwrap();
        store.flush_with_reactor(&reactor).unwrap();
        let reloaded = BitmapStore::load_or_create_with_reactor(dir.path(), &reactor).unwrap();
        assert_eq!(reloaded.postings("idx_country", "US"), vec![10, 12]);
        assert_eq!(reloaded.postings("idx_country", "GB"), vec![11]);
    }

    #[test]
    fn postings_in_range_limit_respects_range_and_limit() {
        let dir = ScratchDir::new("bitmap_store_range_limit");
        let reactor =
            crate::core::reactor::DeterministicReactor::new(std::time::SystemTime::UNIX_EPOCH, 2);
        let mut store = BitmapStore::load_or_create_with_reactor(dir.path(), &reactor).unwrap();
        for node_id in [5_u64, 10, 15, 20, 25, 30] {
            store.add_posting("idx_country", "US", node_id).unwrap();
        }
        let bounded = store.postings_in_range_limit("idx_country", "US", 10, 30, Some(3));
        assert_eq!(bounded, vec![10, 15, 20]);
        let all_in_range = store.postings_in_range_limit("idx_country", "US", 10, 30, None);
        assert_eq!(all_in_range, vec![10, 15, 20, 25]);
    }

    #[test]
    fn remove_posting_updates_pending_and_persisted_state() {
        let dir = ScratchDir::new("bitmap_store_remove");
        let reactor =
            crate::core::reactor::DeterministicReactor::new(std::time::SystemTime::UNIX_EPOCH, 3);
        let mut store = BitmapStore::load_or_create_with_reactor(dir.path(), &reactor).unwrap();
        store.add_posting("idx_country", "US", 10).unwrap();
        store.add_posting("idx_country", "US", 12).unwrap();
        store.flush_with_reactor(&reactor).unwrap();
        store.remove_posting("idx_country", "US", 10).unwrap();
        assert_eq!(store.postings("idx_country", "US"), vec![12]);
        store.flush_with_reactor(&reactor).unwrap();
        let reloaded = BitmapStore::load_or_create_with_reactor(dir.path(), &reactor).unwrap();
        assert_eq!(reloaded.postings("idx_country", "US"), vec![12]);
    }
}