reddb_server/storage/unified/index/
manager_impl.rs1use super::*;
2
3impl IntegratedIndexManager {
4 pub fn new() -> Self {
6 Self::with_config(IntegratedIndexConfig::default())
7 }
8
9 pub fn with_config(config: IntegratedIndexConfig) -> Self {
11 let now = SystemTime::now()
12 .duration_since(UNIX_EPOCH)
13 .map(|d| d.as_secs())
14 .unwrap_or(0);
15
16 let mut status = HashMap::new();
17
18 if config.enable_hnsw {
20 status.insert((IndexType::Hnsw, None), IndexStatus::Ready);
21 } else {
22 status.insert((IndexType::Hnsw, None), IndexStatus::Disabled);
23 }
24 if config.enable_fulltext {
25 status.insert((IndexType::Fulltext, None), IndexStatus::Ready);
26 } else {
27 status.insert((IndexType::Fulltext, None), IndexStatus::Disabled);
28 }
29 if config.enable_metadata {
30 status.insert((IndexType::Metadata, None), IndexStatus::Ready);
31 } else {
32 status.insert((IndexType::Metadata, None), IndexStatus::Disabled);
33 }
34 if config.enable_graph {
35 status.insert((IndexType::Graph, None), IndexStatus::Ready);
36 } else {
37 status.insert((IndexType::Graph, None), IndexStatus::Disabled);
38 }
39
40 Self {
41 config,
42 text_index: InvertedIndex::new(),
43 metadata_index: RwLock::new(MetadataStorage::new()),
44 hnsw_indices: RwLock::new(HashMap::new()),
45 graph_index: GraphAdjacencyIndex::new(),
46 index_status: RwLock::new(status),
47 event_history: RwLock::new(Vec::new()),
48 created_at: now,
49 }
50 }
51
52 pub fn index_vector(&self, collection: &str, id: EntityId, vector: &[f32]) {
54 if !self.config.enable_hnsw {
55 return;
56 }
57
58 {
59 let mut indices = self.hnsw_indices.write();
60 let info = indices
61 .entry(collection.to_string())
62 .or_insert_with(|| HnswIndexInfo {
63 dimension: vector.len(),
64 vectors: HashMap::new(),
65 entry_point: None,
66 });
67
68 if info.dimension != vector.len() && !info.vectors.is_empty() {
70 return; }
72
73 info.vectors.insert(id, vector.to_vec());
74 if info.entry_point.is_none() {
75 info.entry_point = Some(id);
76 }
77 }
78 }
79
80 pub fn search_similar(
82 &self,
83 collection: &str,
84 query: &[f32],
85 k: usize,
86 ) -> Vec<VectorSearchResult> {
87 let indices = self.hnsw_indices.read();
88
89 let info = match indices.get(collection) {
90 Some(i) => i,
91 None => return Vec::new(),
92 };
93
94 if query.len() != info.dimension {
95 return Vec::new();
96 }
97
98 let mut results: Vec<VectorSearchResult> = info
100 .vectors
101 .iter()
102 .map(|(id, vec)| {
103 let similarity = cosine_similarity(query, vec);
104 VectorSearchResult {
105 entity_id: *id,
106 collection: collection.to_string(),
107 similarity,
108 }
109 })
110 .collect();
111
112 results.sort_by(|a, b| {
113 b.similarity
114 .partial_cmp(&a.similarity)
115 .unwrap_or(std::cmp::Ordering::Equal)
116 .then_with(|| a.entity_id.cmp(&b.entity_id))
117 });
118 results.truncate(k);
119 results
120 }
121
122 pub fn index_text(&self, collection: &str, id: EntityId, field: &str, content: &str) {
124 if !self.config.enable_fulltext {
125 return;
126 }
127 self.text_index
128 .index_document(collection, id, field, content);
129 }
130
131 pub fn search_text(&self, query: &str, limit: usize) -> Vec<TextSearchResult> {
133 self.text_index.search(query, limit)
134 }
135
136 pub fn autocomplete(&self, prefix: &str, limit: usize) -> Vec<String> {
138 self.text_index.search_prefix(prefix, limit)
139 }
140
141 pub fn index_metadata(&self, _collection: &str, id: EntityId, metadata: &Metadata) {
143 if !self.config.enable_metadata {
144 return;
145 }
146 {
148 let mut storage = self.metadata_index.write();
149 for (key, value) in &metadata.fields {
150 storage.set(id, key.clone(), value.clone());
151 }
152 }
153 }
154
155 pub fn query_metadata(&self, key: &str, filter: MetadataQueryFilter) -> Vec<EntityId> {
157 let storage = self.metadata_index.read();
158
159 match filter {
160 MetadataQueryFilter::Equals(ref value) => storage.filter_eq(key, value),
161 MetadataQueryFilter::Range { min, max } => {
162 let min_int = min.as_ref().and_then(|v| {
164 if let MetadataValue::Int(n) = v {
165 Some(*n)
166 } else {
167 None
168 }
169 });
170 let max_int = max.as_ref().and_then(|v| {
171 if let MetadataValue::Int(n) = v {
172 Some(*n)
173 } else {
174 None
175 }
176 });
177 if min_int.is_some() || max_int.is_some() {
178 return storage.filter_int_range(key, min_int, max_int);
179 }
180 Vec::new()
181 }
182 MetadataQueryFilter::Contains(ref substring) => {
183 storage.filter_string_prefix(key, substring)
184 }
185 MetadataQueryFilter::In(ref values) => {
186 let mut result = Vec::new();
188 for value in values {
189 result.extend(storage.filter_eq(key, value));
190 }
191 result.sort();
192 result.dedup();
193 result
194 }
195 }
196 }
197
198 pub fn remove_entity(&self, id: EntityId) {
200 self.text_index.remove_document(id);
202
203 {
205 let mut indices = self.hnsw_indices.write();
206 for info in indices.values_mut() {
207 info.vectors.remove(&id);
208 }
209 }
210
211 self.graph_index.remove_edge(id);
213
214 }
216
217 pub fn index_edge(
223 &self,
224 edge_id: EntityId,
225 source_id: EntityId,
226 target_id: EntityId,
227 label: &str,
228 weight: f32,
229 ) {
230 if !self.config.enable_graph {
231 return;
232 }
233 self.graph_index
234 .index_edge(edge_id, source_id, target_id, label, weight);
235 }
236
237 pub fn get_neighbors(
239 &self,
240 node_id: EntityId,
241 direction: EdgeDirection,
242 label_filter: Option<&str>,
243 ) -> Vec<AdjacencyEntry> {
244 self.graph_index
245 .get_neighbors(node_id, direction, label_filter)
246 }
247
248 pub fn get_edges_by_label(&self, label: &str) -> Vec<EntityId> {
250 self.graph_index.get_edges_by_label(label)
251 }
252
253 pub fn node_degree(&self, node_id: EntityId, direction: EdgeDirection) -> usize {
255 match direction {
256 EdgeDirection::Outgoing => self.graph_index.out_degree(node_id),
257 EdgeDirection::Incoming => self.graph_index.in_degree(node_id),
258 EdgeDirection::Both => self.graph_index.degree(node_id),
259 }
260 }
261
262 pub fn graph_index(&self) -> &GraphAdjacencyIndex {
264 &self.graph_index
265 }
266
267 pub fn create_index(
273 &self,
274 index_type: IndexType,
275 collection: Option<&str>,
276 ) -> Result<(), String> {
277 let key = (index_type, collection.map(|s| s.to_string()));
278
279 {
281 let status = self.index_status.read();
282 if let Some(IndexStatus::Ready) = status.get(&key) {
283 return Err(format!("Index {:?} already exists", index_type));
284 }
285 }
286
287 self.index_status
289 .write()
290 .insert(key.clone(), IndexStatus::Building { progress: 0.0 });
291
292 self.index_status
294 .write()
295 .insert(key.clone(), IndexStatus::Ready);
296
297 self.record_event(IndexEvent {
299 index_type,
300 collection: collection.map(|s| s.to_string()),
301 event: IndexEventKind::Created,
302 timestamp: Self::now(),
303 });
304
305 Ok(())
306 }
307
308 pub fn drop_index(
310 &self,
311 index_type: IndexType,
312 collection: Option<&str>,
313 ) -> Result<(), String> {
314 let key = (index_type, collection.map(|s| s.to_string()));
315
316 match index_type {
318 IndexType::Hnsw => {
319 if let Some(coll) = collection {
320 self.hnsw_indices.write().remove(coll);
321 } else {
322 self.hnsw_indices.write().clear();
323 }
324 }
325 IndexType::Graph => {
326 self.graph_index.clear();
327 }
328 _ => {}
330 }
331
332 self.index_status.write().remove(&key);
334
335 self.record_event(IndexEvent {
337 index_type,
338 collection: collection.map(|s| s.to_string()),
339 event: IndexEventKind::Dropped,
340 timestamp: Self::now(),
341 });
342
343 Ok(())
344 }
345
346 pub fn rebuild_index(
348 &self,
349 index_type: IndexType,
350 collection: Option<&str>,
351 ) -> Result<(), String> {
352 let key = (index_type, collection.map(|s| s.to_string()));
353
354 self.index_status
356 .write()
357 .insert(key.clone(), IndexStatus::Building { progress: 0.0 });
358
359 match index_type {
361 IndexType::Hnsw => {
362 if let Some(coll) = collection {
363 let mut indices = self.hnsw_indices.write();
364 if let Some(info) = indices.get_mut(coll) {
365 info.vectors.clear();
366 info.entry_point = None;
367 }
368 }
369 }
370 IndexType::Graph => {
371 self.graph_index.clear();
372 }
373 _ => {}
374 }
375
376 self.index_status
378 .write()
379 .insert(key.clone(), IndexStatus::Ready);
380
381 self.record_event(IndexEvent {
383 index_type,
384 collection: collection.map(|s| s.to_string()),
385 event: IndexEventKind::Rebuilt,
386 timestamp: Self::now(),
387 });
388
389 Ok(())
390 }
391
392 pub fn index_status(&self, index_type: IndexType, collection: Option<&str>) -> IndexStatus {
394 let key = (index_type, collection.map(|s| s.to_string()));
395 self.index_status
396 .read()
397 .get(&key)
398 .cloned()
399 .unwrap_or(IndexStatus::Disabled)
400 }
401
402 pub fn all_index_statuses(&self) -> HashMap<(IndexType, Option<String>), IndexStatus> {
404 self.index_status.read().clone()
405 }
406
407 pub fn event_history(&self) -> Vec<IndexEvent> {
409 self.event_history.read().clone()
410 }
411
412 pub fn stats(&self) -> IndexStats {
418 let now = Self::now();
419
420 let vector_count = self
421 .hnsw_indices
422 .read()
423 .values()
424 .map(|info| info.vectors.len())
425 .sum();
426
427 let (document_count, term_count) = {
428 let i = self.text_index.index.read();
429 let terms = i.len();
430 let docs: HashSet<EntityId> = i
431 .values()
432 .flat_map(|postings| postings.iter().map(|p| p.entity_id))
433 .collect();
434 (docs.len(), terms)
435 };
436
437 IndexStats {
438 vector_count,
439 document_count,
440 term_count,
441 metadata_entries: 0, graph_node_count: self.graph_index.node_count(),
443 graph_edge_count: self.graph_index.edge_count(),
444 created_at: self.created_at,
445 updated_at: now,
446 }
447 }
448
449 pub fn config(&self) -> &IntegratedIndexConfig {
451 &self.config
452 }
453
454 fn record_event(&self, event: IndexEvent) {
459 let mut history = self.event_history.write();
460 history.push(event);
461 if history.len() > 1000 {
463 history.drain(0..100);
464 }
465 }
466
/// Returns the current Unix time in whole seconds, or 0 when the system
/// clock reads earlier than the Unix epoch.
fn now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
473}