use std::collections::{HashMap, HashSet};
use std::sync::RwLock;
use super::LabelId;
use crate::storage::index::{BloomSegment, HasBloom, IndexBase, IndexKind, IndexStats};
use crate::storage::primitives::BloomFilter;
pub struct NodeSecondaryIndex {
by_type: RwLock<HashMap<LabelId, HashSet<String>>>,
by_label: RwLock<HashMap<String, HashSet<String>>>,
label_bloom: RwLock<BloomSegment>,
entry_count: RwLock<usize>,
}
impl NodeSecondaryIndex {
pub fn new(expected_labels: usize) -> Self {
Self {
by_type: RwLock::new(HashMap::new()),
by_label: RwLock::new(HashMap::new()),
label_bloom: RwLock::new(BloomSegment::with_capacity(expected_labels.max(1024))),
entry_count: RwLock::new(0),
}
}
pub fn insert(&self, node_id: &str, label_id: LabelId, label: &str) {
let mut delta = 0usize;
if let Ok(mut by_type) = self.by_type.write() {
if by_type
.entry(label_id)
.or_default()
.insert(node_id.to_string())
{
delta += 1;
}
}
if let Ok(mut by_label) = self.by_label.write() {
if by_label
.entry(label.to_string())
.or_default()
.insert(node_id.to_string())
{
delta += 1;
}
}
if let Ok(mut bloom) = self.label_bloom.write() {
bloom.insert(label.as_bytes());
}
if delta > 0 {
if let Ok(mut c) = self.entry_count.write() {
*c = c.saturating_add(delta);
}
}
}
pub fn remove(&self, node_id: &str, label_id: LabelId, label: &str) {
let mut delta = 0usize;
if let Ok(mut by_type) = self.by_type.write() {
if let Some(set) = by_type.get_mut(&label_id) {
if set.remove(node_id) {
delta += 1;
}
if set.is_empty() {
by_type.remove(&label_id);
}
}
}
if let Ok(mut by_label) = self.by_label.write() {
if let Some(set) = by_label.get_mut(label) {
if set.remove(node_id) {
delta += 1;
}
if set.is_empty() {
by_label.remove(label);
}
}
}
if delta > 0 {
if let Ok(mut c) = self.entry_count.write() {
*c = c.saturating_sub(delta);
}
}
}
pub fn nodes_by_type(&self, label_id: LabelId) -> Vec<String> {
self.by_type
.read()
.map(|map| {
map.get(&label_id)
.map(|set| set.iter().cloned().collect())
.unwrap_or_default()
})
.unwrap_or_default()
}
pub fn nodes_by_label(&self, label: &str) -> Vec<String> {
if let Ok(bloom) = self.label_bloom.read() {
if bloom.definitely_absent(label.as_bytes()) {
return Vec::new();
}
}
self.by_label
.read()
.map(|map| {
map.get(label)
.map(|set| set.iter().cloned().collect())
.unwrap_or_default()
})
.unwrap_or_default()
}
pub fn count_by_type(&self, label_id: LabelId) -> usize {
self.by_type
.read()
.map(|m| m.get(&label_id).map(|s| s.len()).unwrap_or(0))
.unwrap_or(0)
}
pub fn label_id_counts(&self) -> Vec<(LabelId, u64)> {
self.by_type
.read()
.map(|map| {
map.iter()
.map(|(id, set)| (*id, set.len() as u64))
.collect()
})
.unwrap_or_default()
}
pub fn distinct_labels(&self) -> usize {
self.by_label.read().map(|m| m.len()).unwrap_or(0)
}
pub fn distinct_types(&self) -> usize {
self.by_type.read().map(|m| m.len()).unwrap_or(0)
}
pub fn clear(&self) {
if let Ok(mut m) = self.by_type.write() {
m.clear();
}
if let Ok(mut m) = self.by_label.write() {
m.clear();
}
if let Ok(mut b) = self.label_bloom.write() {
*b = BloomSegment::with_capacity(1024);
}
if let Ok(mut c) = self.entry_count.write() {
*c = 0;
}
}
}
impl Default for NodeSecondaryIndex {
fn default() -> Self {
Self::new(1024)
}
}
impl HasBloom for NodeSecondaryIndex {
fn bloom_segment(&self) -> Option<&BloomSegment> {
None
}
fn definitely_absent(&self, key: &[u8]) -> bool {
self.label_bloom
.read()
.map(|b| b.definitely_absent(key))
.unwrap_or(false)
}
}
impl NodeSecondaryIndex {
pub fn may_contain_label(&self, label: &str) -> bool {
!HasBloom::definitely_absent(self, label.as_bytes())
}
}
impl IndexBase for NodeSecondaryIndex {
fn name(&self) -> &str {
"graph.node_secondary"
}
fn kind(&self) -> IndexKind {
IndexKind::Inverted
}
fn stats(&self) -> IndexStats {
let entries = self.entry_count.read().map(|c| *c).unwrap_or(0);
let distinct_keys = self.distinct_labels() + self.distinct_types();
IndexStats {
entries,
distinct_keys,
approx_bytes: 0,
kind: IndexKind::Inverted,
has_bloom: true,
index_correlation: 0.0,
}
}
fn bloom(&self) -> Option<&BloomFilter> {
None
}
fn definitely_absent(&self, key_bytes: &[u8]) -> bool {
<Self as HasBloom>::definitely_absent(self, key_bytes)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn insert_and_lookup_by_type() {
let idx = NodeSecondaryIndex::new(64);
idx.insert("host:1", LabelId::new(1), "Web Server");
idx.insert("host:2", LabelId::new(1), "DB Server");
idx.insert("svc:1", LabelId::new(2), "HTTP");
let hosts = idx.nodes_by_type(LabelId::new(1));
assert_eq!(hosts.len(), 2);
assert!(hosts.contains(&"host:1".to_string()));
assert!(hosts.contains(&"host:2".to_string()));
let services = idx.nodes_by_type(LabelId::new(2));
assert_eq!(services, vec!["svc:1".to_string()]);
assert!(idx.nodes_by_type(LabelId::new(4)).is_empty());
}
#[test]
fn lookup_by_label() {
let idx = NodeSecondaryIndex::new(64);
idx.insert("host:1", LabelId::new(1), "Web Server");
idx.insert("host:2", LabelId::new(1), "Web Server");
idx.insert("host:3", LabelId::new(1), "DB Server");
let web = idx.nodes_by_label("Web Server");
assert_eq!(web.len(), 2);
let db = idx.nodes_by_label("DB Server");
assert_eq!(db, vec!["host:3".to_string()]);
}
#[test]
fn bloom_prunes_absent_label() {
let idx = NodeSecondaryIndex::new(64);
idx.insert("host:1", LabelId::new(1), "Web Server");
assert!(idx.may_contain_label("Web Server"));
assert!(idx.nodes_by_label("DefinitelyNotThere").is_empty());
}
#[test]
fn remove_shrinks_buckets() {
let idx = NodeSecondaryIndex::new(64);
idx.insert("host:1", LabelId::new(1), "A");
idx.insert("host:2", LabelId::new(1), "A");
idx.remove("host:1", LabelId::new(1), "A");
let remaining = idx.nodes_by_label("A");
assert_eq!(remaining, vec!["host:2".to_string()]);
assert_eq!(idx.count_by_type(LabelId::new(1)), 1);
idx.remove("host:2", LabelId::new(1), "A");
assert!(idx.nodes_by_label("A").is_empty());
assert_eq!(idx.count_by_type(LabelId::new(1)), 0);
}
#[test]
fn clear_resets_everything() {
let idx = NodeSecondaryIndex::new(64);
idx.insert("a", LabelId::new(1), "L");
idx.clear();
assert_eq!(idx.count_by_type(LabelId::new(1)), 0);
assert!(idx.nodes_by_label("L").is_empty());
}
#[test]
fn stats_reflect_insertions() {
let idx = NodeSecondaryIndex::new(64);
idx.insert("a", LabelId::new(1), "x");
idx.insert("b", LabelId::new(2), "y");
let s = idx.stats();
assert_eq!(s.entries, 4);
assert!(s.has_bloom);
assert_eq!(s.kind, IndexKind::Inverted);
}
#[test]
fn concurrent_inserts_are_consistent() {
use std::sync::Arc;
use std::thread;
let idx = Arc::new(NodeSecondaryIndex::new(1024));
let mut handles = vec![];
for t in 0..4 {
let idx_c = Arc::clone(&idx);
handles.push(thread::spawn(move || {
for i in 0..100 {
let id = format!("node:{}:{}", t, i);
idx_c.insert(&id, LabelId::new(1), "bulk");
}
}));
}
for h in handles {
h.join().unwrap();
}
assert_eq!(idx.count_by_type(LabelId::new(1)), 400);
assert_eq!(idx.nodes_by_label("bulk").len(), 400);
}
}