use crate::datatypes::Value;
use crate::graph::schema::{InternedKey, StringInterner, TypeIdIndex};
use memmap2::Mmap;
use petgraph::graph::NodeIndex;
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, RwLock};
/// Magic bytes expected at the very start of `id_indices.bin`.
const MAGIC: &[u8; 8] = b"KGLIIDXR";
/// Format version stored in the header; files with any other version are ignored on load.
const VERSION: u32 = 1;
/// Size in bytes of the fixed header: magic (8), version (4), type count (4),
/// directory offset (8) and data offset (8).
const HEADER_BYTES: usize = 32;
/// Size in bytes of one directory record: type key (8), variant (1), padding (7),
/// entry count (8), payload offset (8), payload length (8), reserved (8).
const DIR_ENTRY_BYTES: usize = 48;
/// Read-only view over an `id_indices.bin` file, backed by a memory map.
pub struct IdIndexBase {
// Mapping of the whole file; payload ranges from the directory are resolved against it.
mmap: Arc<Mmap>,
// Directory record per type name, for every type key the interner could resolve at load time.
dir: HashMap<String, BaseEntry>,
// Deserialized general-variant maps keyed by type name, filled on first use.
general_cache: RwLock<HashMap<String, Arc<HashMap<Value, NodeIndex>>>>,
}
/// One parsed directory record describing where a type's payload lives in the file.
#[derive(Clone, Copy)]
struct BaseEntry {
// Payload encoding: 0 = two `u32` columns (keys, then node indices), 1 = bincode map.
variant: u8,
// Number of id -> node pairs in the payload.
num_entries: u32,
// Byte offset of the payload from the start of the file.
payload_off: u64,
// Byte length of the payload.
payload_len: u64,
}
impl IdIndexBase {
/// Opens `id_indices.bin` inside `dir` and parses its header and directory.
///
/// Returns `Ok(None)` when the file does not exist, is shorter than the
/// header, or carries a different magic or version. Directory records whose
/// type key `interner` cannot resolve are skipped. Payload ranges are not
/// checked here; they are validated when a payload is first read.
///
/// # Errors
///
/// Propagates I/O errors from opening, inspecting or mapping the file.
/// Returns `InvalidData` when the header describes a directory that does not
/// fit inside the file, or when a resolvable record declares more entries
/// than fit in a `u32`.
pub fn load_from(dir: &Path, interner: &StringInterner) -> std::io::Result<Option<Self>> {
    fn invalid(msg: &'static str) -> std::io::Error {
        std::io::Error::new(std::io::ErrorKind::InvalidData, msg)
    }
    // Little-endian field readers; callers guarantee `at + width <= bytes.len()`.
    fn le_u32(bytes: &[u8], at: usize) -> u32 {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(&bytes[at..at + 4]);
        u32::from_le_bytes(raw)
    }
    fn le_u64(bytes: &[u8], at: usize) -> u64 {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&bytes[at..at + 8]);
        u64::from_le_bytes(raw)
    }

    let path = dir.join("id_indices.bin");
    if !path.exists() {
        return Ok(None);
    }
    let file = std::fs::File::open(&path)?;
    // Checked before mapping so that a too-short (possibly empty) file is never mapped.
    if file.metadata()?.len() < HEADER_BYTES as u64 {
        return Ok(None);
    }
    // SAFETY: memmap2 requires that the file is not truncated or rewritten while
    // the mapping is alive. Nothing in this function can enforce that.
    // NOTE(review): confirm writers replace the file instead of rewriting it in place.
    let mmap = unsafe { Mmap::map(&file)? };
    // Re-check against the mapping itself: it is what every later index refers to.
    if mmap.len() < HEADER_BYTES {
        return Ok(None);
    }
    if &mmap[..8] != MAGIC {
        return Ok(None);
    }
    if le_u32(&mmap, 8) != VERSION {
        return Ok(None);
    }
    let num_types = le_u32(&mmap, 12) as usize;
    // Header bytes 24..32 hold the data offset; the reader does not need it
    // because every directory record carries an absolute payload offset.
    //
    // All directory arithmetic is checked: the header comes from disk and a
    // corrupt offset or count must produce an error, not an overflow or panic.
    let dir_range = usize::try_from(le_u64(&mmap, 16)).ok().and_then(|start| {
        let dir_bytes = num_types.checked_mul(DIR_ENTRY_BYTES)?;
        let end = start.checked_add(dir_bytes)?;
        (end <= mmap.len()).then_some(start..end)
    });
    let Some(dir_range) = dir_range else {
        return Err(invalid("id_indices.bin truncated at directory"));
    };

    // `num_types` is now known to be bounded by the file size, so this
    // reservation cannot be blown up by a corrupt header.
    let mut dir_map: HashMap<String, BaseEntry> = HashMap::with_capacity(num_types);
    for record in mmap[dir_range].chunks_exact(DIR_ENTRY_BYTES) {
        let type_key = le_u64(record, 0);
        let Some(name) = interner.try_resolve(InternedKey::from_u64(type_key)) else {
            continue;
        };
        // The count is stored as u64 but kept as u32; reject instead of truncating.
        let num_entries = u32::try_from(le_u64(record, 16))
            .map_err(|_| invalid("id_indices.bin directory entry count exceeds u32"))?;
        dir_map.insert(
            name.to_string(),
            BaseEntry {
                variant: record[8],
                num_entries,
                payload_off: le_u64(record, 24),
                payload_len: le_u64(record, 32),
            },
        );
    }
    Ok(Some(Self {
        mmap: Arc::new(mmap),
        dir: dir_map,
        general_cache: RwLock::new(HashMap::new()),
    }))
}
/// Reports whether the loaded directory has a record for the type called `name`.
pub fn contains(&self, name: &str) -> bool {
    self.dir.get(name).is_some()
}
/// Resolves `id` to a node index within the type called `name`.
///
/// Returns `None` when the type is not in the directory, its variant tag is
/// neither 0 (integer) nor 1 (general), or the id is not present.
pub fn lookup(&self, name: &str, id: &Value) -> Option<NodeIndex> {
    self.dir.get(name).and_then(|entry| {
        if entry.variant == 0 {
            self.lookup_integer(entry, id)
        } else if entry.variant == 1 {
            self.lookup_general(name, entry, id)
        } else {
            None
        }
    })
}
/// Builds an owned `TypeIdIndex` for the type called `name` from the file.
///
/// Integer payloads are copied into a fresh `HashMap<u32, NodeIndex>`; general
/// payloads are cloned out of the cached deserialized map. Returns `None` when
/// the type is unknown, its variant tag is unrecognised, or its payload cannot
/// be read.
pub fn materialize(&self, name: &str) -> Option<TypeIdIndex> {
    let entry = self.dir.get(name)?;
    if entry.variant == 0 {
        let (keys, idxs) = self.integer_slices(entry)?;
        let mut by_id: HashMap<u32, NodeIndex> = HashMap::with_capacity(keys.len());
        by_id.extend(
            keys.iter()
                .copied()
                .zip(idxs.iter().map(|&raw| NodeIndex::new(raw as usize))),
        );
        return Some(TypeIdIndex::Integer(by_id));
    }
    if entry.variant == 1 {
        let shared = self.general_map(name, entry)?;
        return Some(TypeIdIndex::General((*shared).clone()));
    }
    None
}
/// Borrows an integer-variant payload as two `u32` columns: the keys,
/// followed by the parallel node indices.
///
/// Returns `None` when the declared payload is shorter than
/// `2 * num_entries * 4` bytes, when the range does not fit inside the
/// mapping (including arithmetic overflow on corrupt offsets), or when the
/// payload does not start on a `u32`-aligned address.
///
/// The words are read in native byte order while the writer in this file
/// emits little-endian, so the columns are only meaningful on little-endian
/// targets.
fn integer_slices(&self, entry: &BaseEntry) -> Option<(&[u32], &[u32])> {
    const WORD: usize = std::mem::size_of::<u32>();
    let n = entry.num_entries as usize;
    // Offsets and counts come from disk, so every step is overflow-checked.
    let total = n.checked_mul(2 * WORD)?;
    if entry.payload_len < total as u64 {
        return None;
    }
    let off = usize::try_from(entry.payload_off).ok()?;
    let bytes = self.mmap.get(off..off.checked_add(total)?)?;
    if n == 0 {
        // Nothing to reinterpret; avoids forming a typed pointer at an
        // arbitrary (possibly unaligned) offset.
        let empty: &[u32] = &[];
        return Some((empty, empty));
    }
    // Payload offsets are arbitrary byte positions (an integer payload may
    // follow a bincode blob of any length), so alignment has to be verified
    // before the bytes can be viewed as `u32`.
    if (bytes.as_ptr() as usize) % std::mem::align_of::<u32>() != 0 {
        return None;
    }
    // SAFETY: `bytes` is a bounds-checked slice of exactly `2 * n * 4` bytes
    // borrowed from `self.mmap`, its start was just verified to be aligned
    // for `u32`, and every bit pattern is a valid `u32`. The returned slices
    // borrow from `self`, which keeps the mapping alive.
    let words = unsafe { std::slice::from_raw_parts(bytes.as_ptr().cast::<u32>(), 2 * n) };
    Some(words.split_at(n))
}
/// Binary-searches the key column of an integer payload for `id`.
///
/// Returns `None` when `id` cannot be coerced to `u32`, the payload cannot be
/// read, or the key is absent.
fn lookup_integer(&self, entry: &BaseEntry, id: &Value) -> Option<NodeIndex> {
    let wanted = coerce_to_u32(id)?;
    let (keys, idxs) = self.integer_slices(entry)?;
    match keys.binary_search(&wanted) {
        Ok(pos) => Some(NodeIndex::new(idxs[pos] as usize)),
        Err(_) => None,
    }
}
/// Looks `id` up in a general-variant map, retrying with equivalent numeric
/// representations when the exact key is absent.
///
/// Fallbacks: `Int64` retries as `UniqueId` when it fits in `u32`; `UniqueId`
/// retries as `Int64`; an integral `Float64` retries as `Int64` and then as
/// `UniqueId`. Floats outside the `i64` range are never coerced, because
/// `as i64` saturates and would otherwise alias them to `i64::MIN`/`i64::MAX`.
fn lookup_general(&self, name: &str, entry: &BaseEntry, id: &Value) -> Option<NodeIndex> {
    // 2^63 as f64: the smallest magnitude at which a positive float no longer
    // fits in i64 (-2^63 itself is representable and therefore allowed).
    const I64_BOUND: f64 = 9_223_372_036_854_775_808.0;

    let map = self.general_map(name, entry)?;
    if let Some(&idx) = map.get(id) {
        return Some(idx);
    }
    let as_unique = |i: i64| u32::try_from(i).ok().map(Value::UniqueId);
    match id {
        Value::Int64(i) => map.get(&as_unique(*i)?).copied(),
        Value::UniqueId(u) => map.get(&Value::Int64(i64::from(*u))).copied(),
        Value::Float64(f) => {
            // NaN and infinities fail the `fract` test; huge finite values
            // fail the range test.
            if f.fract() != 0.0 || *f < -I64_BOUND || *f >= I64_BOUND {
                return None;
            }
            let i = *f as i64;
            if let Some(&idx) = map.get(&Value::Int64(i)) {
                return Some(idx);
            }
            map.get(&as_unique(i)?).copied()
        }
        _ => None,
    }
}
/// Returns the deserialized general-variant map for `name`, decoding it from
/// the mapped payload on first use and caching it afterwards.
///
/// Returns `None` when the payload range does not fit inside the mapping
/// (including overflow on corrupt offsets) or bincode cannot decode it.
///
/// # Panics
///
/// Panics if the cache lock has been poisoned.
fn general_map(&self, name: &str, entry: &BaseEntry) -> Option<Arc<HashMap<Value, NodeIndex>>> {
    if let Some(hit) = self.general_cache.read().unwrap().get(name) {
        return Some(Arc::clone(hit));
    }
    // Offset and length come from disk: convert and add with overflow checks
    // so a corrupt record yields `None` instead of a panic or a wrapped range.
    let off = usize::try_from(entry.payload_off).ok()?;
    let len = usize::try_from(entry.payload_len).ok()?;
    let blob = self.mmap.get(off..off.checked_add(len)?)?;
    let map: HashMap<Value, NodeIndex> = bincode::deserialize(blob).ok()?;
    // Decoding ran without the lock held, so another thread may have cached
    // this type in the meantime; keep whichever entry landed first so all
    // callers share one allocation.
    let mut cache = self.general_cache.write().unwrap();
    let shared = cache.entry(name.to_string()).or_insert_with(|| Arc::new(map));
    Some(Arc::clone(shared))
}
}
/// Id-index collection that layers in-memory entries over an optional on-disk base.
#[derive(Default)]
pub struct IdIndexStore {
// In-memory indices by type name; always consulted before the base.
overlay: RwLock<HashMap<String, TypeIdIndex>>,
// Names of base types hidden by `remove` or `clear`.
removed: std::collections::HashSet<String>,
// Shared read-only base, if one was loaded.
base: Option<Arc<IdIndexBase>>,
}
impl Clone for IdIndexStore {
fn clone(&self) -> Self {
Self {
overlay: RwLock::new(self.overlay.read().unwrap().clone()),
removed: self.removed.clone(),
base: self.base.clone(),
}
}
}
impl IdIndexStore {
pub fn new() -> Self {
Self::default()
}
pub fn from_base(base: IdIndexBase) -> Self {
Self {
overlay: RwLock::new(HashMap::new()),
removed: std::collections::HashSet::new(),
base: Some(Arc::new(base)),
}
}
/// Reports whether an index for the type `name` is visible: present in the
/// overlay, or present in the base and not hidden by a removal.
pub fn contains_key(&self, name: &str) -> bool {
    let in_overlay = self.overlay.read().unwrap().contains_key(name);
    if in_overlay {
        return true;
    }
    !self.removed.contains(name)
        && self
            .base
            .as_deref()
            .map_or(false, |base| base.contains(name))
}
/// Looks `id` up for type `name`, building the type's index on demand.
///
/// The overlay is consulted first, then the base (unless the type was
/// removed). Only when neither holds the type is `build` invoked; it runs
/// with no lock held, and its result is stored in the overlay unless another
/// entry for `name` appeared in the meantime, in which case the existing
/// entry is kept and queried.
pub fn lookup_or_build(
    &self,
    name: &str,
    id: &Value,
    build: impl FnOnce() -> TypeIdIndex,
) -> Option<NodeIndex> {
    if let Some(existing) = self.overlay.read().unwrap().get(name) {
        return existing.get(id);
    }
    let visible_base = self
        .base
        .as_deref()
        .filter(|base| !self.removed.contains(name) && base.contains(name));
    if let Some(base) = visible_base {
        return base.lookup(name, id);
    }
    let fresh = build();
    let mut overlay = self.overlay.write().unwrap();
    overlay.entry(name.to_string()).or_insert(fresh).get(id)
}
/// Looks `id` up for type `name` without building anything.
///
/// An overlay entry for the type is authoritative, even when it does not
/// contain the id. Otherwise the base is consulted unless the type has been
/// removed.
pub fn lookup(&self, name: &str, id: &Value) -> Option<NodeIndex> {
    if let Some(existing) = self.overlay.read().unwrap().get(name) {
        return existing.get(id);
    }
    if self.removed.contains(name) {
        return None;
    }
    let base = self.base.as_deref()?;
    if !base.contains(name) {
        return None;
    }
    base.lookup(name, id)
}
/// Returns the complete id -> node map for type `name` as an owned `HashMap`.
///
/// Uses the overlay entry when one exists; otherwise materializes from the
/// base unless the type has been removed. Returns `None` when the type is not
/// visible or the base payload cannot be read.
pub fn materialize_type(&self, name: &str) -> Option<HashMap<Value, NodeIndex>> {
    if let Some(existing) = self.overlay.read().unwrap().get(name) {
        return Some(existing.iter().collect());
    }
    if self.removed.contains(name) {
        return None;
    }
    self.base
        .as_deref()
        .filter(|base| base.contains(name))
        .and_then(|base| base.materialize(name))
        .map(|index| index.iter().collect())
}
pub fn insert(&mut self, name: String, idx: TypeIdIndex) {
self.removed.remove(&name);
self.overlay.get_mut().unwrap().insert(name, idx);
}
/// Removes type `name` from the store and returns its overlay entry, if any.
///
/// When the base also holds the type, the name is recorded as removed so the
/// base entry stops being visible. A type present only in the base yields
/// `None`.
pub fn remove(&mut self, name: &str) -> Option<TypeIdIndex> {
    let evicted = self.overlay.get_mut().unwrap().remove(name);
    let present_in_base = match &self.base {
        Some(base) => base.contains(name),
        None => false,
    };
    if present_in_base {
        self.removed.insert(name.to_owned());
    }
    evicted
}
/// Empties the overlay and marks every base type as removed, so no type
/// remains visible.
pub fn clear(&mut self) {
    self.overlay.get_mut().unwrap().clear();
    let Some(base) = self.base.as_deref() else {
        return;
    };
    for key in base.dir.keys() {
        self.removed.insert(key.clone());
    }
}
/// Returns the number of visible types: every overlay entry, plus every base
/// entry that is neither shadowed by the overlay nor marked as removed.
///
/// Overlay entries are counted unconditionally. `lookup_or_build` takes
/// `&self` and therefore cannot clear a pending removal, so after `remove` or
/// `clear` a rebuilt type can sit in the overlay while its name is still in
/// the removed-set; such a type is visible to `contains_key` and must be
/// counted here as well.
pub fn len(&self) -> usize {
    let overlay = self.overlay.read().unwrap();
    let base_only = self.base.as_deref().map_or(0, |base| {
        base.dir
            .keys()
            .filter(|key| {
                !overlay.contains_key(key.as_str()) && !self.removed.contains(key.as_str())
            })
            .count()
    });
    overlay.len() + base_only
}
/// Reports whether `len()` is zero.
pub fn is_empty(&self) -> bool {
    matches!(self.len(), 0)
}
/// Returns an owned copy of every visible index, without the type names.
pub fn values(&self) -> Vec<TypeIdIndex> {
    let pairs = self.snapshot();
    let mut indices = Vec::with_capacity(pairs.len());
    for (_, index) in pairs {
        indices.push(index);
    }
    indices
}
pub fn iter(&self) -> Vec<(String, TypeIdIndex)> {
self.snapshot()
}
/// Collects owned copies of all visible indices: overlay entries first, then
/// base entries that are neither shadowed by the overlay nor removed.
///
/// Base entries whose payload cannot be materialized are skipped.
fn snapshot(&self) -> Vec<(String, TypeIdIndex)> {
    let overlay = self.overlay.read().unwrap();
    let mut out: Vec<(String, TypeIdIndex)> = Vec::with_capacity(overlay.len());
    for (name, index) in overlay.iter() {
        out.push((name.clone(), index.clone()));
    }
    let Some(base) = self.base.as_deref() else {
        return out;
    };
    let base_only = base.dir.keys().filter(|key| {
        !(overlay.contains_key(key.as_str()) || self.removed.contains(key.as_str()))
    });
    for key in base_only {
        if let Some(materialized) = base.materialize(key) {
            out.push((key.clone(), materialized));
        }
    }
    out
}
pub fn entry_or_default(&mut self, name: String) -> &mut TypeIdIndex {
let needs_materialize = {
let overlay = self.overlay.get_mut().unwrap();
!overlay.contains_key(&name) && !self.removed.contains(&name)
};
if needs_materialize {
if let Some(base) = self.base.as_deref() {
if let Some(materialized) = base.materialize(&name) {
self.overlay
.get_mut()
.unwrap()
.insert(name.clone(), materialized);
}
}
}
self.removed.remove(&name);
self.overlay.get_mut().unwrap().entry(name).or_default()
}
/// Replaces the whole store with `map`: the overlay becomes `map`, pending
/// removals are forgotten, and the base is detached.
pub fn replace_with(&mut self, map: HashMap<String, TypeIdIndex>) {
    let previous = std::mem::replace(self.overlay.get_mut().unwrap(), map);
    drop(previous);
    self.removed.clear();
    self.base.take();
}
}
/// Converts an id value to `u32` when it represents an integer in `0..=u32::MAX`.
///
/// `UniqueId` passes through unchanged, `Int64` must be in range, and
/// `Float64` must have no fractional part and be in range. Every other value
/// yields `None`.
fn coerce_to_u32(id: &Value) -> Option<u32> {
    let as_i64 = match id {
        Value::UniqueId(u) => return Some(*u),
        Value::Int64(i) => *i,
        // `as i64` saturates for out-of-range floats, and the saturated value
        // is rejected by the range check below.
        Value::Float64(f) if f.fract() == 0.0 => *f as i64,
        _ => return None,
    };
    u32::try_from(as_i64).ok()
}
/// Serializes every visible index in `store` to `id_indices.bin` inside `dir`.
///
/// Layout: a 32-byte header, one 48-byte directory record per type (sorted by
/// interned type key), then the payloads. Integer indices are written as a
/// sorted little-endian `u32` key column followed by the parallel `u32`
/// node-index column; general indices are written as a bincode blob. Each
/// payload starts on a `u32`-aligned offset, with zero padding in between,
/// because the reader in this file views integer payloads in place as `u32`
/// slices. Directory records carry absolute offsets, so the padding is
/// transparent to readers.
///
/// The file is written to `id_indices.bin.tmp` and then renamed over the
/// final name, so an existing memory mapping of the previous file is never
/// truncated or rewritten underneath its readers.
///
/// Type names are interned into a private clone of `interner`.
/// NOTE(review): a name that is not already in the caller's interner gets a
/// key only in that clone — confirm loaders can resolve such keys.
///
/// # Errors
///
/// Returns a message when there are more types than fit in the `u32` header
/// count, a node index does not fit in `u32`, bincode serialization fails, or
/// the file cannot be written.
pub fn write_id_indices_bin(
    dir: &Path,
    store: &IdIndexStore,
    interner: &StringInterner,
) -> Result<(), String> {
    const PAYLOAD_ALIGN: usize = std::mem::align_of::<u32>();

    struct Plan {
        type_key: u64,
        variant: u8,
        num_entries: u64,
        payload_off: usize,
        data: Vec<u8>,
    }

    let mut interner_clone = interner.clone();
    let mut entries: Vec<(u64, TypeIdIndex)> = Vec::new();
    for (name, materialized) in store.iter() {
        let key = interner_clone.get_or_intern(&name).as_u64();
        entries.push((key, materialized));
    }
    entries.sort_by_key(|(key, _)| *key);

    let num_types = u32::try_from(entries.len())
        .map_err(|_| format!("id_indices.bin: {} types exceed the u32 directory count", entries.len()))?;
    let data_offset = HEADER_BYTES + DIR_ENTRY_BYTES * entries.len();

    let mut plans: Vec<Plan> = Vec::with_capacity(entries.len());
    let mut cursor = data_offset;
    for (type_key, idx) in &entries {
        let (variant, num_entries, data) = match idx {
            TypeIdIndex::Integer(map) => {
                let mut pairs: Vec<(u32, u32)> = Vec::with_capacity(map.len());
                for (k, v) in map.iter() {
                    let node = u32::try_from(v.index()).map_err(|_| {
                        format!("id_indices node index {} does not fit in u32", v.index())
                    })?;
                    pairs.push((*k, node));
                }
                // Keys are unique map keys, so an unstable sort is deterministic.
                pairs.sort_unstable_by_key(|(k, _)| *k);
                let mut data = Vec::with_capacity(pairs.len() * 8);
                for (k, _) in &pairs {
                    data.extend_from_slice(&k.to_le_bytes());
                }
                for (_, v) in &pairs {
                    data.extend_from_slice(&v.to_le_bytes());
                }
                (0u8, pairs.len() as u64, data)
            }
            TypeIdIndex::General(map) => {
                let blob = bincode::serialize(map)
                    .map_err(|e| format!("id_indices General-variant bincode failed: {}", e))?;
                (1u8, map.len() as u64, blob)
            }
        };
        // Round the cursor up so this payload starts aligned, whatever the
        // length of the blob before it.
        cursor += (PAYLOAD_ALIGN - cursor % PAYLOAD_ALIGN) % PAYLOAD_ALIGN;
        let len = data.len();
        plans.push(Plan {
            type_key: *type_key,
            variant,
            num_entries,
            payload_off: cursor,
            data,
        });
        cursor += len;
    }

    let total = cursor;
    let mut out: Vec<u8> = Vec::with_capacity(total);
    out.extend_from_slice(MAGIC);
    out.extend_from_slice(&VERSION.to_le_bytes());
    out.extend_from_slice(&num_types.to_le_bytes());
    // The directory starts immediately after the header.
    out.extend_from_slice(&(HEADER_BYTES as u64).to_le_bytes());
    out.extend_from_slice(&(data_offset as u64).to_le_bytes());
    for plan in &plans {
        out.extend_from_slice(&plan.type_key.to_le_bytes());
        out.push(plan.variant);
        out.extend_from_slice(&[0u8; 7]);
        out.extend_from_slice(&plan.num_entries.to_le_bytes());
        out.extend_from_slice(&(plan.payload_off as u64).to_le_bytes());
        out.extend_from_slice(&(plan.data.len() as u64).to_le_bytes());
        out.extend_from_slice(&[0u8; 8]);
    }
    for plan in plans {
        // Zero-fill the alignment gap (if any) before the payload.
        out.resize(plan.payload_off, 0);
        out.extend_from_slice(&plan.data);
    }
    debug_assert_eq!(out.len(), total);

    let final_path = dir.join("id_indices.bin");
    let tmp_path = dir.join("id_indices.bin.tmp");
    std::fs::write(&tmp_path, &out)
        .map_err(|e| format!("Failed to write id_indices.bin: {}", e))?;
    std::fs::rename(&tmp_path, &final_path).map_err(|e| {
        // Best-effort cleanup; the rename error is the one worth reporting.
        let _ = std::fs::remove_file(&tmp_path);
        format!("Failed to write id_indices.bin: {}", e)
    })?;
    Ok(())
}