Skip to main content

reddb_server/storage/unified/store/
builder.rs

1use super::*;
2
3impl Default for UnifiedStore {
4    fn default() -> Self {
5        Self::new()
6    }
7}
8
/// Builder for creating entities with a fluent API.
///
/// Holds the entity under construction together with the store and the
/// collection name it will be inserted into.
pub struct EntityBuilder {
    /// Shared handle to the store that receives the entity on insert.
    store: Arc<UnifiedStore>,
    /// Name of the collection the entity is inserted into.
    collection: String,
    /// The entity being assembled.
    entity: UnifiedEntity,
}
15
16impl EntityBuilder {
17    /// Start building an entity
18    pub fn new(
19        store: Arc<UnifiedStore>,
20        collection: impl Into<String>,
21        kind: EntityKind,
22        data: EntityData,
23    ) -> Self {
24        let collection_name = collection.into();
25        let _ = store.get_or_create_collection(&collection_name);
26        let id = store.next_entity_id();
27
28        Self {
29            store,
30            collection: collection_name,
31            entity: UnifiedEntity::new(id, kind, data),
32        }
33    }
34
35    /// Add metadata
36    pub fn metadata(self, key: impl Into<String>, value: MetadataValue) -> Self {
37        // Store metadata separately via manager after insert
38        self
39    }
40
41    /// Add an embedding
42    pub fn embedding(
43        mut self,
44        name: impl Into<String>,
45        vector: Vec<f32>,
46        model: impl Into<String>,
47    ) -> Self {
48        self.entity
49            .add_embedding(EmbeddingSlot::new(name, vector, model));
50        self
51    }
52
53    /// Add a cross-reference
54    pub fn cross_ref(
55        mut self,
56        target: EntityId,
57        target_collection: impl Into<String>,
58        ref_type: RefType,
59    ) -> Self {
60        self.entity.add_cross_ref(CrossRef::new(
61            self.entity.id,
62            target,
63            target_collection,
64            ref_type,
65        ));
66        self
67    }
68
69    /// Build and insert the entity
70    pub fn insert(self) -> Result<EntityId, StoreError> {
71        self.store.insert(&self.collection, self.entity)
72    }
73}
74
75#[cfg(test)]
76mod tests {
77    use super::*;
78    use crate::storage::schema::Value;
79    use std::collections::HashMap;
80    use std::path::PathBuf;
81
82    #[test]
83    fn test_store_basic() {
84        let store = UnifiedStore::new();
85        store.create_collection("hosts").unwrap();
86
87        let entity = UnifiedEntity::table_row(
88            store.next_entity_id(),
89            "hosts",
90            1,
91            vec![Value::text("192.168.1.1".to_string())],
92        );
93
94        let id = store.insert("hosts", entity).unwrap();
95        assert!(store.get("hosts", id).is_some());
96    }
97
98    #[test]
99    fn test_store_auto_create() {
100        let store = UnifiedStore::new();
101
102        let entity =
103            UnifiedEntity::vector(store.next_entity_id(), "embeddings", vec![0.1, 0.2, 0.3]);
104
105        let id = store.insert_auto("new_collection", entity).unwrap();
106        assert!(store.get("new_collection", id).is_some());
107    }
108
109    #[test]
110    fn test_cross_references() {
111        let store = UnifiedStore::new();
112
113        // Create hosts collection
114        let host_entity = UnifiedEntity::table_row(
115            store.next_entity_id(),
116            "hosts",
117            1,
118            vec![Value::text("192.168.1.1".to_string())],
119        );
120        let host_id = store.insert_auto("hosts", host_entity).unwrap();
121
122        // Create vulns collection
123        let vuln_entity = UnifiedEntity::table_row(
124            store.next_entity_id(),
125            "vulns",
126            1,
127            vec![Value::text("CVE-2024-1234".to_string())],
128        );
129        let vuln_id = store.insert_auto("vulns", vuln_entity).unwrap();
130
131        // Add cross-reference
132        store
133            .add_cross_ref("hosts", host_id, "vulns", vuln_id, RefType::RelatedTo, 1.0)
134            .unwrap();
135
136        // Verify forward reference
137        let refs = store.get_refs_from(host_id);
138        assert_eq!(refs.len(), 1);
139        assert_eq!(refs[0].0, vuln_id);
140
141        // Verify reverse reference
142        let back_refs = store.get_refs_to(vuln_id);
143        assert_eq!(back_refs.len(), 1);
144        assert_eq!(back_refs[0].0, host_id);
145    }
146
147    /// Pins issue #113: when a delete batch covers entities with no
148    /// inbound *or* outbound cross-refs, `unindex_cross_refs_batch` must
149    /// take the read-only fast path and skip the `reverse_refs` write
150    /// lock entirely. Observable via `unindex_cross_refs_fast_path_hits`.
151    #[test]
152    fn unindex_cross_refs_batch_takes_fast_path_when_no_inbound_refs() {
153        let store = UnifiedStore::new();
154
155        // Insert plain rows with zero cross-refs.
156        let mut ids = Vec::new();
157        for i in 0..16 {
158            let row = UnifiedEntity::table_row(
159                store.next_entity_id(),
160                "rows",
161                (i + 1) as u64,
162                vec![Value::text(format!("v{i}"))],
163            );
164            ids.push(store.insert_auto("rows", row).unwrap());
165        }
166
167        let before = store.unindex_cross_refs_fast_path_hits();
168        store.delete_batch("rows", &ids).unwrap();
169        let after = store.unindex_cross_refs_fast_path_hits();
170
171        // Exactly one fast-path hit recorded for the single batch call.
172        assert_eq!(
173            after - before,
174            1,
175            "expected unindex_cross_refs_batch to take the read-only fast path"
176        );
177
178        // And of course, the rows are gone.
179        for id in &ids {
180            assert!(store.get("rows", *id).is_none());
181        }
182    }
183
184    /// Inverse of the above: when at least one deleted entity *does*
185    /// have an inbound cross-ref, the slow path must run and the
186    /// fast-path counter must NOT advance.
187    #[test]
188    fn unindex_cross_refs_batch_uses_slow_path_when_inbound_refs_exist() {
189        let store = UnifiedStore::new();
190
191        let host = UnifiedEntity::table_row(
192            store.next_entity_id(),
193            "hosts",
194            1,
195            vec![Value::text("h".to_string())],
196        );
197        let host_id = store.insert_auto("hosts", host).unwrap();
198
199        let vuln = UnifiedEntity::table_row(
200            store.next_entity_id(),
201            "vulns",
202            1,
203            vec![Value::text("v".to_string())],
204        );
205        let vuln_id = store.insert_auto("vulns", vuln).unwrap();
206
207        store
208            .add_cross_ref("hosts", host_id, "vulns", vuln_id, RefType::RelatedTo, 1.0)
209            .unwrap();
210
211        let before = store.unindex_cross_refs_fast_path_hits();
212        // Deleting the *target* (vuln_id) — it has an inbound ref from host_id,
213        // so reverse_refs has it as a key and the slow path is mandatory.
214        store.delete_batch("vulns", &[vuln_id]).unwrap();
215        let after = store.unindex_cross_refs_fast_path_hits();
216
217        assert_eq!(
218            after, before,
219            "fast-path counter must not advance when inbound refs exist"
220        );
221
222        // Reverse ref is gone.
223        assert!(store.get_refs_to(vuln_id).is_empty());
224    }
225
226    /// Manual perf check for the #113 fast path. Pollutes `reverse_refs`
227    /// with a large dictionary of unrelated entries, then deletes a fresh
228    /// batch of cross-ref-free rows. The pre-fix code paid O(|reverse_refs|)
229    /// in `values_mut()` per call; the fast path makes it O(|batch|).
230    /// `cargo test --release -p reddb-server -- --ignored unindex_cross_refs_batch_fast_path_perf --nocapture`.
231    #[test]
232    #[ignore]
233    fn unindex_cross_refs_batch_fast_path_perf() {
234        let store = UnifiedStore::new();
235
236        // Seed a fat reverse_refs dictionary by creating cross-refs in
237        // a *separate* set of entities. These entries will be untouched
238        // by the delete batch below.
239        const SEED: usize = 50_000;
240        let mut sources = Vec::with_capacity(SEED);
241        let mut targets = Vec::with_capacity(SEED);
242        for i in 0..SEED {
243            let s = UnifiedEntity::table_row(store.next_entity_id(), "src", (i + 1) as u64, vec![]);
244            sources.push(store.insert_auto("src", s).unwrap());
245            let t = UnifiedEntity::table_row(store.next_entity_id(), "tgt", (i + 1) as u64, vec![]);
246            targets.push(store.insert_auto("tgt", t).unwrap());
247        }
248        for (s, t) in sources.iter().zip(targets.iter()) {
249            store
250                .add_cross_ref("src", *s, "tgt", *t, RefType::RelatedTo, 1.0)
251                .unwrap();
252        }
253
254        // Now insert a fresh batch of plain rows (no cross-refs at all).
255        const BATCH: usize = 100;
256        let mut victims = Vec::with_capacity(BATCH);
257        for i in 0..BATCH {
258            let row =
259                UnifiedEntity::table_row(store.next_entity_id(), "rows", (i + 1) as u64, vec![]);
260            victims.push(store.insert_auto("rows", row).unwrap());
261        }
262
263        let before_hits = store.unindex_cross_refs_fast_path_hits();
264        let start = std::time::Instant::now();
265        store.delete_batch("rows", &victims).unwrap();
266        let elapsed = start.elapsed();
267        let after_hits = store.unindex_cross_refs_fast_path_hits();
268
269        eprintln!(
270            "delete_batch({BATCH}) over {SEED} unrelated reverse-refs: \
271             {elapsed:?}, fast_path_hits +{}",
272            after_hits - before_hits,
273        );
274        assert_eq!(after_hits - before_hits, 1);
275    }
276
277    #[test]
278    fn test_expand_refs() {
279        let store = UnifiedStore::new();
280
281        // Create a chain: A → B → C
282        let _ = store.get_or_create_collection("test");
283
284        let a = UnifiedEntity::vector(store.next_entity_id(), "v", vec![0.1]);
285        let a_id = store.insert_auto("test", a).unwrap();
286
287        let b = UnifiedEntity::vector(store.next_entity_id(), "v", vec![0.2]);
288        let b_id = store.insert_auto("test", b).unwrap();
289
290        let c = UnifiedEntity::vector(store.next_entity_id(), "v", vec![0.3]);
291        let c_id = store.insert_auto("test", c).unwrap();
292
293        store
294            .add_cross_ref("test", a_id, "test", b_id, RefType::SimilarTo, 0.9)
295            .unwrap();
296        store
297            .add_cross_ref("test", b_id, "test", c_id, RefType::SimilarTo, 0.8)
298            .unwrap();
299
300        // Expand from A with depth 2
301        let expanded = store.expand_refs(a_id, 2, None);
302        assert_eq!(expanded.len(), 2); // Should find B and C
303    }
304
305    #[test]
306    fn test_query_all_collections() {
307        let store = UnifiedStore::new();
308
309        // Insert into multiple collections
310        store
311            .insert_auto(
312                "hosts",
313                UnifiedEntity::table_row(store.next_entity_id(), "hosts", 1, vec![]),
314            )
315            .unwrap();
316
317        store
318            .insert_auto(
319                "vulns",
320                UnifiedEntity::table_row(store.next_entity_id(), "vulns", 1, vec![]),
321            )
322            .unwrap();
323
324        // Query all
325        let results = store.query_all(|_| true);
326        assert_eq!(results.len(), 2);
327    }
328
329    #[test]
330    fn test_stats() {
331        let store = UnifiedStore::new();
332
333        let _ = store.get_or_create_collection("test");
334        for i in 0..5 {
335            store
336                .insert_auto(
337                    "test",
338                    UnifiedEntity::vector(store.next_entity_id(), "v", vec![i as f32]),
339                )
340                .unwrap();
341        }
342
343        let stats = store.stats();
344        assert_eq!(stats.collection_count, 1);
345        assert_eq!(stats.total_entities, 5);
346    }
347
348    struct FileGuard {
349        path: PathBuf,
350    }
351
352    impl Drop for FileGuard {
353        fn drop(&mut self) {
354            let _ = std::fs::remove_file(&self.path);
355        }
356    }
357
358    fn temp_path(name: &str) -> (FileGuard, PathBuf) {
359        let path =
360            std::env::temp_dir().join(format!("rb_store_{}_{}.rdb", name, std::process::id()));
361        let guard = FileGuard { path: path.clone() };
362        let _ = std::fs::remove_file(&path);
363        (guard, path)
364    }
365
366    #[test]
367    fn test_cross_refs_persist_file_mode() {
368        let (_guard, path) = temp_path("file");
369        let store = UnifiedStore::new();
370
371        let row = UnifiedEntity::table_row(
372            store.next_entity_id(),
373            "hosts",
374            1,
375            vec![Value::text("10.0.0.1".to_string())],
376        );
377        let row_id = store.insert_auto("hosts", row).unwrap();
378
379        let node =
380            UnifiedEntity::graph_node(store.next_entity_id(), "host", "asset", HashMap::new());
381        let node_id = store.insert_auto("graph", node).unwrap();
382
383        let vector =
384            UnifiedEntity::vector(store.next_entity_id(), "embeddings", vec![0.1, 0.2, 0.3]);
385        let vector_id = store.insert_auto("embeddings", vector).unwrap();
386
387        store
388            .add_cross_ref("hosts", row_id, "graph", node_id, RefType::RowToNode, 1.0)
389            .unwrap();
390        store
391            .add_cross_ref(
392                "graph",
393                node_id,
394                "embeddings",
395                vector_id,
396                RefType::NodeToVector,
397                1.0,
398            )
399            .unwrap();
400
401        store.save_to_file(&path).unwrap();
402
403        let loaded = UnifiedStore::load_from_file(&path).unwrap();
404        let refs = loaded.get_refs_from(row_id);
405        assert!(refs.iter().any(|(id, kind, coll)| {
406            *id == node_id && *kind == RefType::RowToNode && coll == "graph"
407        }));
408
409        let graph_refs = loaded.get_refs_from(node_id);
410        assert!(graph_refs.iter().any(|(id, kind, coll)| {
411            *id == vector_id && *kind == RefType::NodeToVector && coll == "embeddings"
412        }));
413
414        let expanded = loaded.expand_refs(row_id, 2, None);
415        assert!(expanded
416            .iter()
417            .any(|(entity, depth, _)| { entity.id == node_id && *depth == 1 }));
418        assert!(expanded
419            .iter()
420            .any(|(entity, depth, _)| { entity.id == vector_id && *depth == 2 }));
421    }
422
423    #[test]
424    fn test_cross_refs_persist_paged_mode() {
425        let (_guard, path) = temp_path("paged");
426        let store = UnifiedStore::open(&path).unwrap();
427
428        let row = UnifiedEntity::table_row(store.next_entity_id(), "hosts", 1, vec![]);
429        let row_id = store.insert_auto("hosts", row).unwrap();
430
431        let node =
432            UnifiedEntity::graph_node(store.next_entity_id(), "host", "asset", HashMap::new());
433        let node_id = store.insert_auto("graph", node).unwrap();
434
435        store
436            .add_cross_ref("hosts", row_id, "graph", node_id, RefType::RowToNode, 1.0)
437            .unwrap();
438
439        store.persist().unwrap();
440
441        drop(store);
442
443        let loaded = UnifiedStore::open(&path).unwrap();
444        let refs = loaded.get_refs_from(row_id);
445        assert!(refs.iter().any(|(id, kind, coll)| {
446            *id == node_id && *kind == RefType::RowToNode && coll == "graph"
447        }));
448    }
449
450    #[test]
451    fn test_paged_mode_survives_multiple_reopens() {
452        let (_guard, path) = temp_path("paged_multi_reopen");
453
454        let store = UnifiedStore::open(&path).unwrap();
455        store.set_config_tree(
456            "red.system",
457            &crate::json!({
458                "hostname": "test-host",
459                "arch": "x86_64",
460                "started_at": 123_u64
461            }),
462        );
463        let initial = store
464            .get_collection("red_config")
465            .map(|m| m.query_all(|_| true).len())
466            .unwrap_or(0);
467        assert!(initial >= 3);
468        drop(store);
469
470        let reopened = UnifiedStore::open(&path).unwrap();
471        let first_reopen = reopened
472            .get_collection("red_config")
473            .map(|m| m.query_all(|_| true).len())
474            .unwrap_or(0);
475        assert_eq!(first_reopen, initial);
476        drop(reopened);
477
478        let reopened_again = UnifiedStore::open(&path).unwrap();
479        let second_reopen = reopened_again
480            .get_collection("red_config")
481            .map(|m| m.query_all(|_| true).len())
482            .unwrap_or(0);
483        assert_eq!(second_reopen, initial);
484    }
485
486    #[test]
487    fn test_global_ids_unique_across_collections() {
488        let store = UnifiedStore::new();
489
490        let entity_a = UnifiedEntity::table_row(EntityId::new(0), "alpha", 1, vec![]);
491        let entity_b = UnifiedEntity::table_row(EntityId::new(0), "beta", 1, vec![]);
492
493        let id_a = store.insert_auto("alpha", entity_a).unwrap();
494        let id_b = store.insert_auto("beta", entity_b).unwrap();
495
496        assert_ne!(id_a, id_b);
497
498        store
499            .add_cross_ref("alpha", id_a, "beta", id_b, RefType::RelatedTo, 1.0)
500            .unwrap();
501
502        let expanded = store.expand_refs(id_a, 1, None);
503        assert!(expanded.iter().any(|(entity, _, _)| entity.id == id_b));
504    }
505}