use crate::graph::schema::InternedKey;
use crate::graph::storage::mapped::mmap_vec::{MmapBytes, MmapOrVec};
use petgraph::graph::NodeIndex;
use std::fs;
use std::path::{Path, PathBuf};
const FILE_PREFIX: &str = "property_index_";
const GLOBAL_PREFIX: &str = "global_index_";
pub struct PropertyIndex {
keys: MmapBytes,
offsets: MmapOrVec<u64>,
ids: MmapOrVec<u32>,
count: usize,
}
fn sanitise(s: &str) -> String {
s.chars()
.map(|c| {
if c.is_ascii_alphanumeric() || c == '_' || c == '-' {
c
} else {
'_'
}
})
.collect()
}
pub fn file_paths(
data_dir: &Path,
node_type: &str,
property: &str,
) -> (PathBuf, PathBuf, PathBuf, PathBuf) {
let stem = format!(
"{}{}_{}",
FILE_PREFIX,
sanitise(node_type),
sanitise(property)
);
(
data_dir.join(format!("{}_meta.bin", stem)),
data_dir.join(format!("{}_keys.bin", stem)),
data_dir.join(format!("{}_offsets.bin", stem)),
data_dir.join(format!("{}_ids.bin", stem)),
)
}
pub fn global_file_paths(data_dir: &Path, property: &str) -> (PathBuf, PathBuf, PathBuf, PathBuf) {
let stem = format!("{}{}", GLOBAL_PREFIX, sanitise(property));
(
data_dir.join(format!("{}_meta.bin", stem)),
data_dir.join(format!("{}_keys.bin", stem)),
data_dir.join(format!("{}_offsets.bin", stem)),
data_dir.join(format!("{}_ids.bin", stem)),
)
}
pub fn scan_data_dir(data_dir: &Path) -> Vec<(String, String)> {
let Ok(entries) = fs::read_dir(data_dir) else {
return Vec::new();
};
let mut out = Vec::new();
for entry in entries.flatten() {
let Some(name) = entry.file_name().to_str().map(str::to_owned) else {
continue;
};
if let Some(stem) = name.strip_prefix(FILE_PREFIX) {
if let Some(stem) = stem.strip_suffix("_meta.bin") {
if let Some(sep) = stem.rfind('_') {
let (ty, prop) = stem.split_at(sep);
let prop = &prop[1..];
if !ty.is_empty() && !prop.is_empty() {
out.push((ty.to_string(), prop.to_string()));
}
}
}
}
}
out
}
pub fn scan_segment_hashes(segment_dir: &Path) -> Vec<(u64, u64)> {
scan_data_dir(segment_dir)
.into_iter()
.map(|(t, p)| {
(
InternedKey::from_str(&t).as_u64(),
InternedKey::from_str(&p).as_u64(),
)
})
.collect()
}
fn write_meta(path: &Path, count: usize, keys_len: usize) -> std::io::Result<()> {
use std::io::Write;
let mut f = fs::File::create(path)?;
f.write_all(&(count as u64).to_le_bytes())?;
f.write_all(&(keys_len as u64).to_le_bytes())?;
Ok(())
}
fn read_meta(path: &Path) -> std::io::Result<(usize, usize)> {
let bytes = fs::read(path)?;
if bytes.len() < 16 {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"meta file too small",
));
}
let count = u64::from_le_bytes(bytes[0..8].try_into().unwrap()) as usize;
let keys_len = u64::from_le_bytes(bytes[8..16].try_into().unwrap()) as usize;
Ok((count, keys_len))
}
impl PropertyIndex {
#[allow(dead_code)] pub fn len(&self) -> usize {
self.count
}
fn key_at(&self, i: usize) -> &[u8] {
let start = self.offsets.get(i) as usize;
let end = self.offsets.get(i + 1) as usize;
self.keys.slice(start, end)
}
fn lower_bound(&self, target: &[u8]) -> usize {
let (mut lo, mut hi) = (0usize, self.count);
while lo < hi {
let mid = lo + (hi - lo) / 2;
if self.key_at(mid) < target {
lo = mid + 1;
} else {
hi = mid;
}
}
lo
}
pub fn lookup_eq_str(&self, value: &str) -> Vec<NodeIndex> {
let target = value.as_bytes();
let start = self.lower_bound(target);
if start >= self.count || self.key_at(start) != target {
return Vec::new();
}
let mut out = Vec::new();
let mut i = start;
while i < self.count && self.key_at(i) == target {
out.push(NodeIndex::new(self.ids.get(i) as usize));
i += 1;
}
out
}
pub fn lookup_prefix_str(&self, prefix: &str, limit: usize) -> Vec<NodeIndex> {
if limit == 0 {
return Vec::new();
}
let target = prefix.as_bytes();
let start = self.lower_bound(target);
let mut out = Vec::with_capacity(limit.min(16));
let mut i = start;
while i < self.count && out.len() < limit {
let key = self.key_at(i);
if !key.starts_with(target) {
break;
}
out.push(NodeIndex::new(self.ids.get(i) as usize));
i += 1;
}
out
}
pub fn build(
data_dir: &Path,
node_type: &str,
property: &str,
entries: Vec<(String, u32)>,
) -> std::io::Result<Self> {
let paths = file_paths(data_dir, node_type, property);
Self::build_at(&paths.0, &paths.1, &paths.2, &paths.3, entries)
}
pub fn build_global(
data_dir: &Path,
property: &str,
entries: Vec<(String, u32)>,
) -> std::io::Result<Self> {
let paths = global_file_paths(data_dir, property);
Self::build_at(&paths.0, &paths.1, &paths.2, &paths.3, entries)
}
fn build_at(
meta_path: &Path,
keys_path: &Path,
offsets_path: &Path,
ids_path: &Path,
mut entries: Vec<(String, u32)>,
) -> std::io::Result<Self> {
entries.sort_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));
let total_bytes: usize = entries.iter().map(|(k, _)| k.len()).sum();
let count = entries.len();
let mut keys_buf = MmapBytes::mapped(keys_path, total_bytes.max(4096))
.unwrap_or_else(|_| MmapBytes::new());
let mut offsets_buf = MmapOrVec::<u64>::mapped(offsets_path, count + 1)
.unwrap_or_else(|_| MmapOrVec::with_capacity(count + 1));
let mut ids_buf = MmapOrVec::<u32>::mapped(ids_path, count.max(1))
.unwrap_or_else(|_| MmapOrVec::with_capacity(count));
let mut offset: u64 = 0;
for (key, id) in &entries {
offsets_buf.push(offset);
keys_buf.extend(key.as_bytes());
ids_buf.push(*id);
offset += key.len() as u64;
}
offsets_buf.push(offset);
write_meta(meta_path, count, total_bytes)?;
Ok(PropertyIndex {
keys: keys_buf,
offsets: offsets_buf,
ids: ids_buf,
count,
})
}
pub fn open(data_dir: &Path, node_type: &str, property: &str) -> std::io::Result<Option<Self>> {
let paths = file_paths(data_dir, node_type, property);
Self::open_at(&paths.0, &paths.1, &paths.2, &paths.3)
}
pub fn open_global(data_dir: &Path, property: &str) -> std::io::Result<Option<Self>> {
let paths = global_file_paths(data_dir, property);
Self::open_at(&paths.0, &paths.1, &paths.2, &paths.3)
}
fn open_at(
meta_path: &Path,
keys_path: &Path,
offsets_path: &Path,
ids_path: &Path,
) -> std::io::Result<Option<Self>> {
if !meta_path.exists()
|| !keys_path.exists()
|| !offsets_path.exists()
|| !ids_path.exists()
{
return Ok(None);
}
let (count, keys_len) = read_meta(meta_path)?;
let keys = MmapBytes::load_mapped(keys_path, keys_len)?;
let offsets = MmapOrVec::<u64>::load_mapped(offsets_path, count + 1)?;
let ids = MmapOrVec::<u32>::load_mapped(ids_path, count)?;
Ok(Some(PropertyIndex {
keys,
offsets,
ids,
count,
}))
}
#[allow(dead_code)] pub fn remove_files(data_dir: &Path, node_type: &str, property: &str) -> std::io::Result<()> {
let paths = file_paths(data_dir, node_type, property);
for p in [&paths.0, &paths.1, &paths.2, &paths.3] {
if p.exists() {
fs::remove_file(p)?;
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn tmp_dir() -> PathBuf {
let p = std::env::temp_dir().join(format!(
"kglite_prop_idx_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
));
fs::create_dir_all(&p).unwrap();
p
}
#[test]
fn equality_lookup_finds_single_match() {
let dir = tmp_dir();
let idx = PropertyIndex::build(
&dir,
"Human",
"label",
vec![
("Alice".into(), 1),
("Bob".into(), 2),
("Charlie".into(), 3),
],
)
.unwrap();
assert_eq!(idx.lookup_eq_str("Bob"), vec![NodeIndex::new(2)]);
assert_eq!(idx.lookup_eq_str("Alice"), vec![NodeIndex::new(1)]);
assert!(idx.lookup_eq_str("Missing").is_empty());
}
#[test]
fn equality_lookup_returns_all_duplicates() {
let dir = tmp_dir();
let idx = PropertyIndex::build(
&dir,
"Human",
"label",
vec![
("Alice".into(), 1),
("Alice".into(), 7),
("Alice".into(), 4),
("Bob".into(), 2),
],
)
.unwrap();
let hits = idx.lookup_eq_str("Alice");
assert_eq!(
hits,
vec![NodeIndex::new(1), NodeIndex::new(4), NodeIndex::new(7)]
);
}
#[test]
fn prefix_lookup_respects_limit_and_sort_order() {
let dir = tmp_dir();
let idx = PropertyIndex::build(
&dir,
"Human",
"label",
vec![
("Oslo".into(), 10),
("Ottawa".into(), 11),
("Oxford".into(), 12),
("Paris".into(), 13),
],
)
.unwrap();
let hits = idx.lookup_prefix_str("O", 10);
assert_eq!(
hits,
vec![NodeIndex::new(10), NodeIndex::new(11), NodeIndex::new(12)]
);
let hits = idx.lookup_prefix_str("O", 2);
assert_eq!(hits.len(), 2);
assert_eq!(hits[0], NodeIndex::new(10));
}
#[test]
fn persistence_roundtrip_via_open() {
let dir = tmp_dir();
{
let _ = PropertyIndex::build(
&dir,
"Human",
"label",
vec![("Alice".into(), 1), ("Bob".into(), 2)],
)
.unwrap();
} let reopened = PropertyIndex::open(&dir, "Human", "label")
.unwrap()
.expect("index files should be present");
assert_eq!(reopened.lookup_eq_str("Bob"), vec![NodeIndex::new(2)]);
assert_eq!(reopened.len(), 2);
}
#[test]
fn scan_data_dir_discovers_built_indexes() {
let dir = tmp_dir();
let _ = PropertyIndex::build(&dir, "Human", "label", vec![("A".into(), 1)]).unwrap();
let _ = PropertyIndex::build(&dir, "Paper", "title", vec![("Z".into(), 2)]).unwrap();
let mut pairs = scan_data_dir(&dir);
pairs.sort();
assert_eq!(
pairs,
vec![
("Human".to_string(), "label".to_string()),
("Paper".to_string(), "title".to_string()),
]
);
}
#[test]
fn remove_files_cleans_up() {
let dir = tmp_dir();
let _ = PropertyIndex::build(&dir, "Human", "label", vec![("A".into(), 1)]).unwrap();
PropertyIndex::remove_files(&dir, "Human", "label").unwrap();
assert!(PropertyIndex::open(&dir, "Human", "label")
.unwrap()
.is_none());
}
#[test]
fn empty_index_lookup_returns_empty() {
let dir = tmp_dir();
let idx = PropertyIndex::build(&dir, "Human", "label", Vec::new()).unwrap();
assert!(idx.lookup_eq_str("anything").is_empty());
assert!(idx.lookup_prefix_str("x", 10).is_empty());
}
#[test]
fn scan_segment_hashes_returns_hashed_pairs() {
let tmp = tempfile::TempDir::new().unwrap();
let dir = tmp.path();
let _ = PropertyIndex::build(dir, "Human", "label", vec![("A".into(), 1)]).unwrap();
let _ = PropertyIndex::build(dir, "Paper", "title", vec![("Z".into(), 2)]).unwrap();
let mut pairs = scan_segment_hashes(dir);
pairs.sort();
let mut expected = vec![
(
InternedKey::from_str("Human").as_u64(),
InternedKey::from_str("label").as_u64(),
),
(
InternedKey::from_str("Paper").as_u64(),
InternedKey::from_str("title").as_u64(),
),
];
expected.sort();
assert_eq!(pairs, expected);
}
}