use std::collections::{BTreeMap, HashMap, HashSet};
use std::time::{SystemTime, UNIX_EPOCH};
use parking_lot::RwLock;
use super::entity::EntityId;
use super::metadata::{Metadata, MetadataStorage, MetadataValue};
/// Selects which adjacency lists `GraphAdjacencyIndex::get_neighbors` reads for a node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EdgeDirection {
/// Only edges whose source is the queried node.
Outgoing,
/// Only edges whose target is the queried node.
Incoming,
/// Outgoing edges first, followed by incoming edges.
Both,
}
/// One edge as seen from a single endpoint, stored in that node's adjacency list.
#[derive(Debug, Clone)]
pub struct AdjacencyEntry {
/// Identifier of the edge itself.
pub edge_id: EntityId,
/// The node at the other end of the edge: the target in an outgoing list, the source in an
/// incoming list.
pub neighbor_id: EntityId,
/// Edge label; the label filter of `get_neighbors` compares it for exact equality.
pub label: String,
/// Weight passed to `index_edge`; stored as-is and not interpreted in this part of the file.
pub weight: f32,
}
/// Adjacency index over directed, labelled graph edges.
///
/// Every field sits behind its own `RwLock` and there is no single lock covering the whole
/// index, so an update that touches several fields is not atomic as a whole.
pub struct GraphAdjacencyIndex {
/// Source node id -> entries for the edges leaving that node.
outgoing: RwLock<HashMap<EntityId, Vec<AdjacencyEntry>>>,
/// Target node id -> entries for the edges arriving at that node.
incoming: RwLock<HashMap<EntityId, Vec<AdjacencyEntry>>>,
/// Edge label -> ids of the edges carrying that label.
by_label: RwLock<HashMap<String, HashSet<EntityId>>>,
/// Running count of indexed edges.
edge_count: RwLock<usize>,
/// Cached count of distinct node ids that appear as a key in `outgoing` or `incoming`.
node_count: RwLock<usize>,
}
impl GraphAdjacencyIndex {
    /// Creates an empty index with both counters at zero.
    pub fn new() -> Self {
        Self {
            outgoing: RwLock::new(HashMap::new()),
            incoming: RwLock::new(HashMap::new()),
            by_label: RwLock::new(HashMap::new()),
            edge_count: RwLock::new(0),
            node_count: RwLock::new(0),
        }
    }

    /// Registers the directed edge `source_id -> target_id`.
    ///
    /// The edge is appended to the source's outgoing list and the target's incoming list, added
    /// to the label index, and both counters are refreshed.
    ///
    /// No de-duplication is performed: indexing the same `edge_id` twice stores two adjacency
    /// entries and counts the edge twice. Call [`remove_edge`](Self::remove_edge) first to
    /// replace an edge.
    pub fn index_edge(
        &self,
        edge_id: EntityId,
        source_id: EntityId,
        target_id: EntityId,
        label: &str,
        weight: f32,
    ) {
        // Each lock guard is a statement-scoped temporary, so no two write locks overlap.
        self.outgoing
            .write()
            .entry(source_id)
            .or_default()
            .push(AdjacencyEntry {
                edge_id,
                neighbor_id: target_id,
                label: label.to_string(),
                weight,
            });
        self.incoming
            .write()
            .entry(target_id)
            .or_default()
            .push(AdjacencyEntry {
                edge_id,
                neighbor_id: source_id,
                label: label.to_string(),
                weight,
            });
        self.by_label
            .write()
            .entry(label.to_string())
            .or_default()
            .insert(edge_id);
        *self.edge_count.write() += 1;
        self.update_node_count();
    }

    /// Removes every trace of `edge_id` from the index.
    ///
    /// The edge counter is decreased by the number of outgoing entries actually removed, so
    /// removing an unknown edge leaves it untouched. Adjacency lists and label sets that become
    /// empty are dropped, and the node count is recomputed so nodes without remaining edges stop
    /// being counted.
    ///
    /// This scans every adjacency list, i.e. it is linear in the number of indexed edges.
    pub fn remove_edge(&self, edge_id: EntityId) {
        let removed = {
            let mut outgoing = self.outgoing.write();
            Self::purge_edge(&mut outgoing, edge_id)
        };
        {
            let mut incoming = self.incoming.write();
            Self::purge_edge(&mut incoming, edge_id);
        }
        self.by_label.write().retain(|_, edges| {
            edges.remove(&edge_id);
            !edges.is_empty()
        });
        if removed > 0 {
            let mut count = self.edge_count.write();
            *count = count.saturating_sub(removed);
        }
        self.update_node_count();
    }

    /// Returns the adjacency entries of `node_id` in the requested `direction`.
    ///
    /// With `label_filter` set, only entries whose label equals it are returned. For
    /// [`EdgeDirection::Both`] the outgoing entries come first, then the incoming ones; a
    /// self-loop therefore appears twice.
    pub fn get_neighbors(
        &self,
        node_id: EntityId,
        direction: EdgeDirection,
        label_filter: Option<&str>,
    ) -> Vec<AdjacencyEntry> {
        let mut results = Vec::new();
        if matches!(direction, EdgeDirection::Outgoing | EdgeDirection::Both) {
            let outgoing = self.outgoing.read();
            Self::collect_matching(&outgoing, node_id, label_filter, &mut results);
        }
        if matches!(direction, EdgeDirection::Incoming | EdgeDirection::Both) {
            let incoming = self.incoming.read();
            Self::collect_matching(&incoming, node_id, label_filter, &mut results);
        }
        results
    }

    /// Returns the ids of all edges carrying `label` (in unspecified order), or an empty vector
    /// when the label is unknown.
    pub fn get_edges_by_label(&self, label: &str) -> Vec<EntityId> {
        let by_label = self.by_label.read();
        by_label
            .get(label)
            .map(|edges| edges.iter().copied().collect())
            .unwrap_or_default()
    }

    /// Number of edges leaving `node_id`.
    pub fn out_degree(&self, node_id: EntityId) -> usize {
        self.outgoing.read().get(&node_id).map_or(0, Vec::len)
    }

    /// Number of edges arriving at `node_id`.
    pub fn in_degree(&self, node_id: EntityId) -> usize {
        self.incoming.read().get(&node_id).map_or(0, Vec::len)
    }

    /// Sum of out-degree and in-degree; a self-loop contributes two.
    pub fn degree(&self, node_id: EntityId) -> usize {
        self.out_degree(node_id) + self.in_degree(node_id)
    }

    /// Number of edges currently indexed.
    pub fn edge_count(&self) -> usize {
        *self.edge_count.read()
    }

    /// Number of distinct nodes that are an endpoint of at least one indexed edge.
    pub fn node_count(&self) -> usize {
        *self.node_count.read()
    }

    /// Drops all edges and resets both counters to zero.
    pub fn clear(&self) {
        self.outgoing.write().clear();
        self.incoming.write().clear();
        self.by_label.write().clear();
        *self.edge_count.write() = 0;
        *self.node_count.write() = 0;
    }

    /// Recomputes the cached node count from the adjacency maps.
    ///
    /// Counts every outgoing key plus every incoming key that is not also an outgoing key, which
    /// avoids materialising temporary key sets. Both read locks are held together, always in the
    /// order `outgoing` then `incoming`.
    fn update_node_count(&self) {
        let distinct_nodes = {
            let outgoing = self.outgoing.read();
            let incoming = self.incoming.read();
            outgoing.len()
                + incoming
                    .keys()
                    .filter(|id| !outgoing.contains_key(*id))
                    .count()
        };
        *self.node_count.write() = distinct_nodes;
    }

    /// Removes all entries for `edge_id` from `adjacency`, drops lists that end up empty and
    /// returns how many entries were removed.
    fn purge_edge(
        adjacency: &mut HashMap<EntityId, Vec<AdjacencyEntry>>,
        edge_id: EntityId,
    ) -> usize {
        let mut removed = 0;
        adjacency.retain(|_, entries| {
            let before = entries.len();
            entries.retain(|entry| entry.edge_id != edge_id);
            removed += before - entries.len();
            !entries.is_empty()
        });
        removed
    }

    /// Appends to `out` a clone of every entry of `node_id` in `adjacency` that passes the
    /// optional label filter.
    fn collect_matching(
        adjacency: &HashMap<EntityId, Vec<AdjacencyEntry>>,
        node_id: EntityId,
        label_filter: Option<&str>,
        out: &mut Vec<AdjacencyEntry>,
    ) {
        if let Some(entries) = adjacency.get(&node_id) {
            out.extend(
                entries
                    .iter()
                    .filter(|entry| label_filter.is_none_or(|label| entry.label == label))
                    .cloned(),
            );
        }
    }
}
impl Default for GraphAdjacencyIndex {
/// Returns an empty index; delegates to `GraphAdjacencyIndex::new`.
fn default() -> Self {
Self::new()
}
}
/// Location of a single token occurrence.
///
/// NOTE(review): nothing in this part of the file constructs or reads this type; the field notes
/// below are inferred from the names and should be confirmed against its users.
#[derive(Debug, Clone)]
pub struct TokenPosition {
/// Entity the token belongs to.
pub entity_id: EntityId,
/// Presumably the name of the field the token was found in.
pub field: String,
/// Presumably the token's position within that field — TODO confirm the unit (token vs. byte).
pub position: u32,
}
/// One posting: the occurrences of a single term in one indexed field of one entity.
#[derive(Debug, Clone)]
pub struct PostingEntry {
/// Entity whose content contains the term.
pub entity_id: EntityId,
/// Collection name passed to `InvertedIndex::index_document`.
pub collection: String,
/// Field name passed to `InvertedIndex::index_document`.
pub field: String,
/// Zero-based token positions of the term within the tokenized content.
pub positions: Vec<u32>,
/// Occurrences of the term divided by the total number of tokens in the content.
pub term_frequency: f32,
}
/// Full-text inverted index mapping lower-cased terms to their postings.
pub struct InvertedIndex {
/// Term -> postings. A `BTreeMap` keeps the terms ordered, which `search_prefix` relies on for
/// its range scan.
index: RwLock<BTreeMap<String, Vec<PostingEntry>>>,
/// Incremented once per `index_document` call, i.e. per indexed field rather than per entity.
doc_count: RwLock<usize>,
/// Collection name -> field names registered through `add_indexed_field`.
indexed_fields: RwLock<HashMap<String, HashSet<String>>>,
}
impl InvertedIndex {
    /// Creates an empty index with a document count of zero.
    pub fn new() -> Self {
        Self {
            index: RwLock::new(BTreeMap::new()),
            doc_count: RwLock::new(0),
            indexed_fields: RwLock::new(HashMap::new()),
        }
    }

    /// Records that `field` of `collection` is meant to be full-text indexed.
    pub fn add_indexed_field(&self, collection: &str, field: &str) {
        self.indexed_fields
            .write()
            .entry(collection.to_string())
            .or_default()
            .insert(field.to_string());
    }

    /// Tokenizes `content` and adds one posting per distinct term for `entity_id`.
    ///
    /// Each call bumps the document counter by one, even when `content` yields no tokens.
    /// Indexing the same entity/field again appends further postings instead of replacing the
    /// old ones; call [`remove_document`](Self::remove_document) first to re-index.
    pub fn index_document(
        &self,
        collection: &str,
        entity_id: EntityId,
        field: &str,
        content: &str,
    ) {
        let tokens = self.tokenize(content);
        let term_count = tokens.len() as f32;

        // Group token positions by term; consuming `tokens` avoids cloning every token.
        let mut term_positions: HashMap<String, Vec<u32>> = HashMap::new();
        for (position, token) in tokens.into_iter().enumerate() {
            // Saturate instead of silently wrapping for (unrealistically) huge inputs.
            let position = u32::try_from(position).unwrap_or(u32::MAX);
            term_positions.entry(token).or_default().push(position);
        }

        {
            let mut index = self.index.write();
            for (term, positions) in term_positions {
                // This loop only runs when at least one token exists, so `term_count >= 1`.
                let term_frequency = positions.len() as f32 / term_count;
                index.entry(term).or_default().push(PostingEntry {
                    entity_id,
                    collection: collection.to_string(),
                    field: field.to_string(),
                    positions,
                    term_frequency,
                });
            }
        }
        *self.doc_count.write() += 1;
    }

    /// Removes every posting of `entity_id` and drops terms that no longer have any posting, so
    /// they stop showing up in [`search_prefix`](Self::search_prefix).
    ///
    /// NOTE(review): the document counter is not decreased here. It is incremented per
    /// `index_document` call, and the number of calls made for an entity cannot be recovered
    /// reliably from the postings alone.
    pub fn remove_document(&self, entity_id: EntityId) {
        let mut index = self.index.write();
        index.retain(|_, postings| {
            postings.retain(|posting| posting.entity_id != entity_id);
            !postings.is_empty()
        });
    }

    /// Runs an AND query: returns the entities that contain every term of `query`, best first.
    ///
    /// The score of an entity is the sum of `term_frequency * idf` over all of its postings for
    /// the query terms, using the smoothed IDF `ln(1 + N / (df + 1))`. That IDF is never
    /// negative, so a term present in most documents cannot push a better match down the
    /// ranking. At most `limit` results are returned; the relative order of equal scores is
    /// unspecified.
    pub fn search(&self, query: &str, limit: usize) -> Vec<TextSearchResult> {
        let terms = self.tokenize(query);
        if terms.is_empty() {
            return Vec::new();
        }

        let index = self.index.read();
        let doc_count = *self.doc_count.read();

        // AND semantics: a single unknown term means nothing can match.
        let mut term_postings: Vec<&Vec<PostingEntry>> = Vec::with_capacity(terms.len());
        for term in &terms {
            let Some(postings) = index.get(term) else {
                return Vec::new();
            };
            term_postings.push(postings);
        }
        let Some((first_postings, remaining)) = term_postings.split_first() else {
            return Vec::new();
        };

        // Seed the candidates from the first term. Scores accumulate (rather than overwrite) so
        // an entity with postings in several fields is treated like it is for later terms.
        let mut scores: HashMap<EntityId, f32> = HashMap::new();
        let mut collections: HashMap<EntityId, &str> = HashMap::new();
        let first_idf = Self::smoothed_idf(doc_count, first_postings.len());
        for posting in first_postings.iter() {
            *scores.entry(posting.entity_id).or_insert(0.0) += posting.term_frequency * first_idf;
            // Keep the collection of the entity's first posting.
            collections
                .entry(posting.entity_id)
                .or_insert(posting.collection.as_str());
        }

        // Every further term narrows the candidates and adds its own contribution.
        for postings in remaining {
            let idf = Self::smoothed_idf(doc_count, postings.len());
            let entities_in_term: HashSet<EntityId> =
                postings.iter().map(|posting| posting.entity_id).collect();
            scores.retain(|id, _| entities_in_term.contains(id));
            for posting in postings.iter() {
                if let Some(score) = scores.get_mut(&posting.entity_id) {
                    *score += posting.term_frequency * idf;
                }
            }
        }

        let mut results: Vec<TextSearchResult> = scores
            .into_iter()
            .map(|(entity_id, score)| TextSearchResult {
                entity_id,
                collection: collections
                    .get(&entity_id)
                    .map(|name| (*name).to_string())
                    .unwrap_or_default(),
                score,
                matched_terms: terms.clone(),
            })
            .collect();
        // `total_cmp` is a total order even for NaN, which `sort_by` requires.
        results.sort_by(|a, b| b.score.total_cmp(&a.score));
        results.truncate(limit);
        results
    }

    /// Returns up to `limit` indexed terms starting with `prefix` (case-insensitive), in
    /// ascending lexicographic order.
    pub fn search_prefix(&self, prefix: &str, limit: usize) -> Vec<String> {
        let prefix_lower = prefix.to_lowercase();
        let index = self.index.read();
        index
            .range(prefix_lower.clone()..)
            .take_while(|(term, _)| term.starts_with(&prefix_lower))
            .take(limit)
            .map(|(term, _)| term.clone())
            .collect()
    }

    /// Lower-cases `text`, splits it on every non-alphanumeric character and keeps the pieces
    /// that are at least two bytes long.
    fn tokenize(&self, text: &str) -> Vec<String> {
        text.to_lowercase()
            .split(|c: char| !c.is_alphanumeric())
            .filter(|piece| piece.len() >= 2)
            .map(str::to_string)
            .collect()
    }

    /// Smoothed inverse document frequency `ln(1 + N / (df + 1))`; always `>= 0`.
    fn smoothed_idf(doc_count: usize, doc_frequency: usize) -> f32 {
        (doc_count as f32 / (doc_frequency as f32 + 1.0)).ln_1p()
    }
}
impl Default for InvertedIndex {
/// Returns an empty index; delegates to `InvertedIndex::new`.
fn default() -> Self {
Self::new()
}
}
/// A single hit returned by `InvertedIndex::search`.
#[derive(Debug, Clone)]
pub struct TextSearchResult {
/// Entity that matched every query term.
pub entity_id: EntityId,
/// Collection recorded on the entity's posting for the first query term.
pub collection: String,
/// Relevance score built from term-frequency times inverse-document-frequency products;
/// `search` returns hits in descending score order.
pub score: f32,
/// All tokenized query terms (the same list for every hit of one search).
pub matched_terms: Vec<String>,
}
/// Configuration for `IntegratedIndexManager`.
///
/// NOTE(review): the code that consumes these settings is not in this part of the file, so the
/// field notes describe the apparent intent only.
#[derive(Debug, Clone)]
pub struct IntegratedIndexConfig {
/// Toggle for the HNSW (vector) index.
pub enable_hnsw: bool,
/// Toggle for the full-text index.
pub enable_fulltext: bool,
/// Toggle for the metadata index.
pub enable_metadata: bool,
/// Toggle for the graph adjacency index.
pub enable_graph: bool,
/// Presumably the HNSW `M` parameter (links per node) — TODO confirm against the consumer.
pub hnsw_m: usize,
/// Presumably the HNSW `ef_construction` build-time parameter — TODO confirm.
pub hnsw_ef_construction: usize,
/// Presumably the HNSW `ef_search` query-time parameter — TODO confirm.
pub hnsw_ef_search: usize,
}
impl Default for IntegratedIndexConfig {
/// Enables all four index kinds and sets `hnsw_m = 16`, `hnsw_ef_construction = 100` and
/// `hnsw_ef_search = 50`.
fn default() -> Self {
Self {
enable_hnsw: true,
enable_fulltext: true,
enable_metadata: true,
enable_graph: true,
hnsw_m: 16,
hnsw_ef_construction: 100,
hnsw_ef_search: 50,
}
}
}
/// Snapshot of index sizes; every field defaults to zero through the derived `Default`.
///
/// NOTE(review): the code that fills this struct is not in this part of the file; the field
/// notes follow the field names.
#[derive(Debug, Clone, Default)]
pub struct IndexStats {
/// Number of indexed vectors.
pub vector_count: usize,
/// Number of indexed text documents.
pub document_count: usize,
/// Number of distinct terms in the full-text index.
pub term_count: usize,
/// Number of metadata entries.
pub metadata_entries: usize,
/// Number of nodes known to the graph index.
pub graph_node_count: usize,
/// Number of edges known to the graph index.
pub graph_edge_count: usize,
/// Creation timestamp — presumably relative to the Unix epoch; unit not visible here.
pub created_at: u64,
/// Last-update timestamp — presumably relative to the Unix epoch; unit not visible here.
pub updated_at: u64,
}
/// The kinds of index handled by `IntegratedIndexManager`; also used as part of the key of its
/// status map.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IndexType {
/// Vector similarity index.
Hnsw,
/// Full-text inverted index.
Fulltext,
/// Metadata index.
Metadata,
/// Graph adjacency index.
Graph,
}
/// Lifecycle state of one index.
#[derive(Debug, Clone)]
pub enum IndexStatus {
/// The index can be used.
Ready,
/// The index is being built; `progress` is presumably a 0.0..=1.0 fraction — TODO confirm.
Building { progress: f32 },
/// The index exists but is marked as out of date.
Stale,
/// The index is turned off.
Disabled,
/// The index is in an error state; the payload carries a message.
Error(String),
}
/// One entry of the index manager's event history.
#[derive(Debug, Clone)]
pub struct IndexEvent {
/// Kind of index the event refers to.
pub index_type: IndexType,
/// Collection the event refers to, if any.
pub collection: Option<String>,
/// What happened.
pub event: IndexEventKind,
/// When it happened — presumably relative to the Unix epoch; unit not visible here.
pub timestamp: u64,
}
/// The kinds of change recorded in an `IndexEvent`.
#[derive(Debug, Clone)]
pub enum IndexEventKind {
/// An index was created.
Created,
/// An index was dropped.
Dropped,
/// An index was rebuilt.
Rebuilt,
/// An index was updated; `entries_affected` carries the number of entries involved.
Updated { entries_affected: usize },
}
/// Bundles the text, metadata, vector and graph indexes together with their status and event
/// bookkeeping.
///
/// NOTE(review): the methods of this type are not defined in this part of the file (presumably
/// in the `manager_impl` module), so the field notes describe the stored data only.
pub struct IntegratedIndexManager {
/// Configuration of the manager.
config: IntegratedIndexConfig,
/// Full-text index.
text_index: InvertedIndex,
/// Metadata storage behind a lock.
metadata_index: RwLock<MetadataStorage>,
/// Per-key vector index data; the key is presumably the collection name — TODO confirm.
hnsw_indices: RwLock<HashMap<String, HnswIndexInfo>>,
/// Graph adjacency index.
graph_index: GraphAdjacencyIndex,
/// Status per (index type, optional collection) pair.
index_status: RwLock<HashMap<(IndexType, Option<String>), IndexStatus>>,
/// Recorded index events.
event_history: RwLock<Vec<IndexEvent>>,
/// Creation timestamp — presumably relative to the Unix epoch; unit not visible here.
created_at: u64,
}
/// Vector data kept for one entry of `IntegratedIndexManager::hnsw_indices`.
struct HnswIndexInfo {
/// Vector dimensionality — presumably every stored vector has this length; TODO confirm.
dimension: usize,
/// Stored vectors keyed by entity id.
vectors: HashMap<EntityId, Vec<f32>>,
/// Optional entry-point entity; how it is chosen and used is not visible here.
entry_point: Option<EntityId>,
}
pub mod incremental;
mod manager_impl;
pub use incremental::{IncrementalIndexMaintainer, IndexDeltaOp, SecondaryIndexHandle};
impl Default for IntegratedIndexManager {
/// Delegates to `IntegratedIndexManager::new`, which is defined outside this part of the file.
fn default() -> Self {
Self::new()
}
}
/// A single hit of a vector similarity search.
#[derive(Debug, Clone)]
pub struct VectorSearchResult {
/// Entity whose vector matched.
pub entity_id: EntityId,
/// Collection the hit belongs to.
pub collection: String,
/// Similarity to the query vector — presumably the cosine similarity computed by
/// `cosine_similarity`; TODO confirm against the search implementation.
pub similarity: f32,
}
/// Predicate applied to a metadata value.
///
/// NOTE(review): the evaluation logic is not in this part of the file; the variant notes follow
/// the variant names.
#[derive(Debug, Clone)]
pub enum MetadataQueryFilter {
/// The value must equal the given value.
Equals(MetadataValue),
/// The value must lie between `min` and `max`; a `None` bound leaves that side open.
/// Whether the bounds are inclusive is not visible here.
Range {
min: Option<MetadataValue>,
max: Option<MetadataValue>,
},
/// The value must contain the given string.
Contains(String),
/// The value must be one of the listed values.
In(Vec<MetadataValue>),
}
/// Cosine of the angle between `a` and `b`.
///
/// Returns `0.0` when the slices are empty, differ in length, or either one has zero magnitude.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.is_empty() || a.len() != b.len() {
        return 0.0;
    }

    // Euclidean length of a vector.
    let magnitude = |v: &[f32]| v.iter().map(|c| c * c).sum::<f32>().sqrt();

    let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
    let magnitude_a = magnitude(a);
    let magnitude_b = magnitude(b);

    if magnitude_a == 0.0 || magnitude_b == 0.0 {
        0.0
    } else {
        dot_product / (magnitude_a * magnitude_b)
    }
}
// Unit tests for the inverted index, the graph adjacency index, `cosine_similarity` and the
// `IntegratedIndexManager` entry points (whose implementations live outside this file section).
#[cfg(test)]
mod tests {
use super::*;
// A single-term query returns every document containing that term.
#[test]
fn test_inverted_index_basic() {
let index = InvertedIndex::new();
index.index_document(
"docs",
EntityId(1),
"content",
"hello world rust programming",
);
index.index_document("docs", EntityId(2), "content", "rust is fast and safe");
index.index_document("docs", EntityId(3), "content", "python is easy to learn");
let results = index.search("rust", 10);
assert_eq!(results.len(), 2);
assert!(results.iter().any(|r| r.entity_id == EntityId(1)));
assert!(results.iter().any(|r| r.entity_id == EntityId(2)));
}
// Multi-term queries use AND semantics: only documents containing all terms match.
#[test]
fn test_inverted_index_and_query() {
let index = InvertedIndex::new();
index.index_document("docs", EntityId(1), "content", "rust programming language");
index.index_document("docs", EntityId(2), "content", "rust systems programming");
index.index_document(
"docs",
EntityId(3),
"content",
"python programming language",
);
let results = index.search("rust programming", 10);
assert_eq!(results.len(), 2);
let results = index.search("language programming", 10);
assert_eq!(results.len(), 2);
}
// Prefix search returns every indexed term starting with the prefix, including the prefix itself.
#[test]
fn test_prefix_search() {
let index = InvertedIndex::new();
index.index_document("docs", EntityId(1), "content", "programming rust rustacean");
let suggestions = index.search_prefix("rust", 10);
assert!(suggestions.contains(&"rust".to_string()));
assert!(suggestions.contains(&"rustacean".to_string()));
}
// The vector identical to the query must be the top hit with similarity close to 1.
#[test]
fn test_vector_search() {
let manager = IntegratedIndexManager::new();
manager.index_vector("embeddings", EntityId(1), &[1.0, 0.0, 0.0]);
manager.index_vector("embeddings", EntityId(2), &[0.9, 0.1, 0.0]);
manager.index_vector("embeddings", EntityId(3), &[0.0, 1.0, 0.0]);
let results = manager.search_similar("embeddings", &[1.0, 0.0, 0.0], 2);
assert_eq!(results.len(), 2);
assert_eq!(results[0].entity_id, EntityId(1));
assert!(results[0].similarity > 0.99);
}
// Identical vectors score ~1, orthogonal vectors score ~0.
#[test]
fn test_cosine_similarity() {
let a = [1.0, 0.0, 0.0];
let b = [1.0, 0.0, 0.0];
assert!((cosine_similarity(&a, &b) - 1.0).abs() < 0.001);
let c = [0.0, 1.0, 0.0];
assert!(cosine_similarity(&a, &c).abs() < 0.001);
}
// Outgoing, incoming and combined neighbor lookups on the chain 1 -> 2 -> 3.
#[test]
fn test_graph_adjacency_basic() {
let index = GraphAdjacencyIndex::new();
index.index_edge(EntityId(100), EntityId(1), EntityId(2), "KNOWS", 1.0);
index.index_edge(EntityId(101), EntityId(2), EntityId(3), "KNOWS", 1.0);
let neighbors = index.get_neighbors(EntityId(1), EdgeDirection::Outgoing, None);
assert_eq!(neighbors.len(), 1);
assert_eq!(neighbors[0].neighbor_id, EntityId(2));
let neighbors = index.get_neighbors(EntityId(2), EdgeDirection::Incoming, None);
assert_eq!(neighbors.len(), 1);
assert_eq!(neighbors[0].neighbor_id, EntityId(1));
let neighbors = index.get_neighbors(EntityId(2), EdgeDirection::Both, None);
assert_eq!(neighbors.len(), 2);
}
// The label filter keeps only edges whose label matches exactly.
#[test]
fn test_graph_adjacency_label_filter() {
let index = GraphAdjacencyIndex::new();
index.index_edge(EntityId(100), EntityId(1), EntityId(2), "KNOWS", 1.0);
index.index_edge(EntityId(101), EntityId(1), EntityId(3), "WORKS_WITH", 1.0);
index.index_edge(EntityId(102), EntityId(1), EntityId(4), "KNOWS", 1.0);
let neighbors = index.get_neighbors(EntityId(1), EdgeDirection::Outgoing, Some("KNOWS"));
assert_eq!(neighbors.len(), 2);
let neighbors =
index.get_neighbors(EntityId(1), EdgeDirection::Outgoing, Some("WORKS_WITH"));
assert_eq!(neighbors.len(), 1);
assert_eq!(neighbors[0].neighbor_id, EntityId(3));
}
// In-, out- and total degree for a node with four outgoing edges.
#[test]
fn test_graph_adjacency_degree() {
let index = GraphAdjacencyIndex::new();
index.index_edge(EntityId(100), EntityId(1), EntityId(2), "LINK", 1.0);
index.index_edge(EntityId(101), EntityId(1), EntityId(3), "LINK", 1.0);
index.index_edge(EntityId(102), EntityId(1), EntityId(4), "LINK", 1.0);
index.index_edge(EntityId(103), EntityId(1), EntityId(5), "LINK", 1.0);
assert_eq!(index.out_degree(EntityId(1)), 4);
assert_eq!(index.in_degree(EntityId(1)), 0);
assert_eq!(index.degree(EntityId(1)), 4);
assert_eq!(index.in_degree(EntityId(2)), 1);
assert_eq!(index.out_degree(EntityId(2)), 0);
}
// The label index returns exactly the edge ids registered under each label.
#[test]
fn test_graph_adjacency_edge_by_label() {
let index = GraphAdjacencyIndex::new();
index.index_edge(EntityId(100), EntityId(1), EntityId(2), "A", 1.0);
index.index_edge(EntityId(101), EntityId(2), EntityId(3), "B", 1.0);
index.index_edge(EntityId(102), EntityId(3), EntityId(4), "A", 1.0);
let edges_a = index.get_edges_by_label("A");
assert_eq!(edges_a.len(), 2);
assert!(edges_a.contains(&EntityId(100)));
assert!(edges_a.contains(&EntityId(102)));
let edges_b = index.get_edges_by_label("B");
assert_eq!(edges_b.len(), 1);
assert!(edges_b.contains(&EntityId(101)));
}
// Removing an edge lowers the edge count and drops it from the neighbor list.
#[test]
fn test_graph_adjacency_remove() {
let index = GraphAdjacencyIndex::new();
index.index_edge(EntityId(100), EntityId(1), EntityId(2), "LINK", 1.0);
index.index_edge(EntityId(101), EntityId(1), EntityId(3), "LINK", 1.0);
assert_eq!(index.edge_count(), 2);
index.remove_edge(EntityId(100));
assert_eq!(index.edge_count(), 1);
let neighbors = index.get_neighbors(EntityId(1), EdgeDirection::Outgoing, None);
assert_eq!(neighbors.len(), 1);
assert_eq!(neighbors[0].neighbor_id, EntityId(3));
}
// Creating an index reports `Ready`; dropping it afterwards succeeds.
#[test]
fn test_index_lifecycle_create_drop() {
let manager = IntegratedIndexManager::new();
let result = manager.create_index(IndexType::Hnsw, Some("my_collection"));
assert!(result.is_ok());
let status = manager.index_status(IndexType::Hnsw, Some("my_collection"));
assert!(matches!(status, IndexStatus::Ready));
let result = manager.drop_index(IndexType::Hnsw, Some("my_collection"));
assert!(result.is_ok());
}
// Rebuilding a populated vector index succeeds and leaves it `Ready`.
#[test]
fn test_index_lifecycle_rebuild() {
let manager = IntegratedIndexManager::new();
manager.index_vector("test", EntityId(1), &[1.0, 0.0, 0.0]);
manager.index_vector("test", EntityId(2), &[0.0, 1.0, 0.0]);
let result = manager.rebuild_index(IndexType::Hnsw, Some("test"));
assert!(result.is_ok());
let status = manager.index_status(IndexType::Hnsw, Some("test"));
assert!(matches!(status, IndexStatus::Ready));
}
// Manager statistics reflect the edges and nodes added through the manager.
#[test]
fn test_index_stats_with_graph() {
let manager = IntegratedIndexManager::new();
manager.index_edge(EntityId(100), EntityId(1), EntityId(2), "LINK", 1.0);
manager.index_edge(EntityId(101), EntityId(2), EntityId(3), "LINK", 1.0);
let stats = manager.stats();
assert_eq!(stats.graph_edge_count, 2);
assert!(stats.graph_node_count >= 2); }
// Graph operations invoked through the manager behave like the underlying adjacency index.
#[test]
fn test_integrated_manager_graph_operations() {
let manager = IntegratedIndexManager::new();
manager.index_edge(EntityId(100), EntityId(1), EntityId(2), "KNOWS", 1.0);
manager.index_edge(EntityId(101), EntityId(2), EntityId(3), "KNOWS", 0.5);
let neighbors = manager.get_neighbors(EntityId(1), EdgeDirection::Outgoing, None);
assert_eq!(neighbors.len(), 1);
assert_eq!(neighbors[0].neighbor_id, EntityId(2));
assert_eq!(neighbors[0].weight, 1.0);
assert_eq!(manager.node_degree(EntityId(2), EdgeDirection::Both), 2);
}
}