Skip to main content

haystack_core/graph/
entity_graph.rs

1// EntityGraph — in-memory entity store with bitmap indexing and ref adjacency.
2
3use std::collections::HashMap;
4
5use indexmap::IndexMap;
6use parking_lot::Mutex;
7use rayon::prelude::*;
8
9use crate::data::{HCol, HDict, HGrid};
10use crate::filter::{FilterNode, matches_with_ns, parse_filter};
11use crate::kinds::{HRef, Kind};
12use crate::ontology::{DefNamespace, ValidationIssue};
13
14use super::adjacency::RefAdjacency;
15use super::bitmap::TagBitmapIndex;
16use super::changelog::{ChangelogGap, DiffOp, GraphDiff};
17use super::columnar::ColumnarStore;
18use super::csr::CsrAdjacency;
19use super::query_planner;
20use super::value_index::ValueIndex;
21
/// Errors returned by EntityGraph operations.
#[derive(Debug, thiserror::Error)]
pub enum GraphError {
    /// The entity dict carries no `id` tag.
    #[error("entity missing 'id' tag")]
    MissingId,
    /// The entity's `id` tag is present but is not a `Ref`.
    #[error("entity id must be a Ref")]
    InvalidId,
    /// An entity with the same ref value is already stored; the payload is
    /// that ref value.
    #[error("entity already exists: {0}")]
    DuplicateRef(String),
    /// No entity is stored under the requested ref value; the payload is
    /// that ref value.
    #[error("entity not found: {0}")]
    NotFound(String),
    /// A filter expression could not be parsed; the payload is the parser's
    /// error rendered as text.
    #[error("filter error: {0}")]
    Filter(String),
    /// No fresh internal id can be allocated because the counter has passed
    /// `MAX_ENTITY_ID`.
    #[error("entity ID space exhausted (max {MAX_ENTITY_ID})")]
    IdExhausted,
}
38
/// Maximum entity ID — constrained by RoaringBitmap (u32) and snapshot format.
///
/// Internal ids are stored as `usize`; allocation of a fresh id is refused
/// once the `next_id` counter exceeds this bound.
const MAX_ENTITY_ID: usize = u32::MAX as usize;
41
/// Core entity graph with bitmap tag indexing and bidirectional ref adjacency.
///
/// Entities are keyed by their ref value (a `String`); every stored entity
/// is also assigned a compact numeric id that the bitmap, adjacency, value
/// and columnar indexes use.
pub struct EntityGraph {
    /// ref_val -> entity dict
    entities: HashMap<String, HDict>,
    /// ref_val -> internal numeric id (for bitmap indexing)
    id_map: HashMap<String, usize>,
    /// internal numeric id -> ref_val
    reverse_id: HashMap<usize, String>,
    /// Next internal id to assign.
    next_id: usize,
    /// Freelist of recycled entity IDs from removed entities.
    free_ids: Vec<usize>,
    /// Tag bitmap index for fast has/missing queries.
    tag_index: TagBitmapIndex,
    /// Bidirectional ref adjacency for graph traversal.
    adjacency: RefAdjacency,
    /// Optional ontology namespace for spec-aware operations.
    namespace: Option<DefNamespace>,
    /// Monotonic version counter, incremented on every mutation.
    version: u64,
    /// Ordered list of mutations.
    changelog: std::collections::VecDeque<GraphDiff>,
    /// Maximum number of changelog entries retained.
    changelog_capacity: usize,
    /// Lowest version still present in the changelog (0 = no evictions yet).
    floor_version: u64,
    /// LRU query cache: (filter, version) → matching ref_vals.
    /// Uses Mutex for interior mutability since read_all takes &self.
    query_cache: Mutex<QueryCache>,
    /// Parsed filter AST cache: filter_string → AST (version-independent).
    /// Also behind a Mutex because it is filled from `read_all(&self)`.
    ast_cache: Mutex<HashMap<String, FilterNode>>,
    /// Optional B-Tree value indexes for comparison-based filter acceleration.
    value_index: ValueIndex,
    /// CSR snapshot of adjacency for cache-friendly traversal.
    /// Rebuilt lazily via `rebuild_csr()`. `None` until first build.
    csr: Option<CsrAdjacency>,
    /// Version at which the CSR was last rebuilt (for staleness detection).
    csr_version: u64,
    /// Patch buffer for incremental CSR updates.
    csr_patch: super::csr::CsrPatch,
    /// Columnar storage for cache-friendly single-tag scans.
    columnar: ColumnarStore,
    /// WL structural fingerprint index.
    structural: super::structural::StructuralIndex,
}
87
/// Bounded LRU cache for filter query results, backed by an `IndexMap`.
///
/// Key lookups are hash-based. Recency is encoded in the map's insertion
/// order, so bumping an entry to the back or evicting the front shifts the
/// entries in between (linear in the cache size, which is bounded).
struct QueryCache {
    /// (filter, version) → matching ref_vals. Most-recently-used at the back.
    entries: IndexMap<(String, u64), Vec<String>>,
    /// Maximum number of entries kept before eviction kicks in.
    capacity: usize,
}
94
95impl QueryCache {
96    fn new(capacity: usize) -> Self {
97        Self {
98            entries: IndexMap::with_capacity(capacity),
99            capacity,
100        }
101    }
102
103    fn get(&mut self, filter: &str, version: u64) -> Option<&[String]> {
104        // Move to back (most recently used) on access.
105        let key = (filter.to_string(), version);
106        let idx = self.entries.get_index_of(&key)?;
107        self.entries.move_index(idx, self.entries.len() - 1);
108        self.entries.get(&key).map(|v| v.as_slice())
109    }
110
111    fn insert(&mut self, filter: String, version: u64, ref_vals: Vec<String>) {
112        if self.entries.len() >= self.capacity {
113            // First try purging stale entries from older versions.
114            self.purge_stale(version);
115        }
116        if self.entries.len() >= self.capacity {
117            // Still at capacity — evict least recently used (front).
118            self.entries.shift_remove_index(0);
119        }
120        self.entries.insert((filter, version), ref_vals);
121    }
122
123    /// Remove all entries whose version is older than `min_version`.
124    fn purge_stale(&mut self, min_version: u64) {
125        self.entries
126            .retain(|(_filter, version), _| *version >= min_version);
127    }
128}
129
/// Pick a query-cache capacity proportional to the number of entities.
///
/// One slot is budgeted per 100 entities, but the result never drops below
/// 256 and never exceeds 1024.
fn query_cache_capacity_for(entity_count: usize) -> usize {
    const MIN_CAPACITY: usize = 256;
    const MAX_CAPACITY: usize = 1024;

    let proportional = entity_count / 100;
    proportional.clamp(MIN_CAPACITY, MAX_CAPACITY)
}
135
/// Query-cache capacity used when a graph is first created.
const DEFAULT_QUERY_CACHE_CAPACITY: usize = 256;

/// Number of CSR patch ops before triggering a full rebuild.
const CSR_PATCH_THRESHOLD: usize = 1000;

/// Common Haystack fields to auto-index for O(log N) value lookups.
const AUTO_INDEX_FIELDS: &[&str] = &[
    "siteRef", "equipRef", "dis", "curVal", "area", "geoCity", "kind", "unit",
];
145
146impl EntityGraph {
147    /// Create an empty entity graph with standard Haystack fields auto-indexed
148    /// and default changelog capacity (50,000).
149    pub fn new() -> Self {
150        Self::with_changelog_capacity(super::changelog::DEFAULT_CHANGELOG_CAPACITY)
151    }
152
153    /// Create an empty entity graph with a custom changelog capacity.
154    pub fn with_changelog_capacity(capacity: usize) -> Self {
155        let capacity = capacity.max(1); // Ensure at least 1 entry
156        let mut value_index = ValueIndex::new();
157        for field in AUTO_INDEX_FIELDS {
158            value_index.index_field(field);
159        }
160        Self {
161            entities: HashMap::new(),
162            id_map: HashMap::new(),
163            reverse_id: HashMap::new(),
164            next_id: 0,
165            free_ids: Vec::new(),
166            tag_index: TagBitmapIndex::new(),
167            adjacency: RefAdjacency::new(),
168            namespace: None,
169            version: 0,
170            changelog: std::collections::VecDeque::new(),
171            changelog_capacity: capacity,
172            floor_version: 0,
173            query_cache: Mutex::new(QueryCache::new(DEFAULT_QUERY_CACHE_CAPACITY)),
174            ast_cache: Mutex::new(HashMap::new()),
175            value_index,
176            csr: None,
177            csr_version: 0,
178            csr_patch: super::csr::CsrPatch::new(),
179            columnar: ColumnarStore::new(),
180            structural: super::structural::StructuralIndex::new(),
181        }
182    }
183
184    /// Create an entity graph with an ontology namespace.
185    pub fn with_namespace(ns: DefNamespace) -> Self {
186        Self {
187            namespace: Some(ns),
188            ..Self::new()
189        }
190    }
191
192    // ── Value Indexes ──
193
    /// Register a field for B-Tree value indexing. Enables O(log N) range
    /// queries (e.g. `temp > 72`) for this field.
    ///
    /// This only forwards the registration to the value index; it does not
    /// scan entities that are already stored. Call it before entities are
    /// added, or follow it with `rebuild_value_index` for existing data.
    pub fn index_field(&mut self, field: &str) {
        self.value_index.index_field(field);
    }
200
201    /// Rebuild the value index for all indexed fields from the current entities.
202    pub fn rebuild_value_index(&mut self) {
203        self.value_index.clear();
204        for (ref_val, entity) in &self.entities {
205            if let Some(&eid) = self.id_map.get(ref_val.as_str()) {
206                for (name, val) in entity.iter() {
207                    if self.value_index.has_index(name) {
208                        self.value_index.add(eid, name, val);
209                    }
210                }
211            }
212        }
213    }
214
    /// Returns a shared reference to the value index (for use by the query
    /// planner). The index itself is not modified by this call.
    pub fn value_index(&self) -> &ValueIndex {
        &self.value_index
    }
219
220    // ── Columnar Storage ──
221
    /// Register a tag for columnar storage. Enables cache-friendly sequential
    /// scans for this tag.
    ///
    /// This only forwards the registration to the columnar store; it does not
    /// scan entities that are already stored. Call it before entities are
    /// added, or follow it with `rebuild_columnar` for existing data.
    pub fn track_column(&mut self, tag: &str) {
        self.columnar.track_tag(tag);
    }
228
229    /// Rebuild all tracked columnar data from current entities.
230    pub fn rebuild_columnar(&mut self) {
231        self.columnar.clear();
232        self.columnar.ensure_capacity(self.next_id);
233        for (ref_val, entity) in &self.entities {
234            if let Some(&eid) = self.id_map.get(ref_val.as_str()) {
235                for (name, val) in entity.iter() {
236                    if self.columnar.is_tracked(name) {
237                        self.columnar.set(eid, name, val);
238                    }
239                }
240            }
241        }
242    }
243
    /// Returns a shared reference to the columnar store for direct column
    /// scans. The store itself is not modified by this call.
    pub fn columnar(&self) -> &ColumnarStore {
        &self.columnar
    }
248
249    // ── CRUD ──
250
251    /// Add an entity to the graph.
252    ///
253    /// The entity must have an `id` tag that is a `Ref`. Returns the ref
254    /// value string on success.
255    pub fn add(&mut self, entity: HDict) -> Result<String, GraphError> {
256        let ref_val = extract_ref_val(&entity)?;
257
258        if self.entities.contains_key(&ref_val) {
259            return Err(GraphError::DuplicateRef(ref_val));
260        }
261
262        let eid = if let Some(recycled) = self.free_ids.pop() {
263            recycled
264        } else {
265            if self.next_id > MAX_ENTITY_ID {
266                return Err(GraphError::IdExhausted);
267            }
268            let id = self.next_id;
269            self.next_id = self.next_id.checked_add(1).ok_or(GraphError::InvalidId)?;
270            id
271        };
272
273        self.id_map.insert(ref_val.clone(), eid);
274        self.reverse_id.insert(eid, ref_val.clone());
275
276        // Index before inserting (borrows entity immutably, self mutably).
277        self.index_tags(eid, &entity);
278        self.index_refs(eid, &entity);
279
280        // Clone for the changelog, then move the entity into the map.
281        let entity_for_log = entity.clone();
282        self.entities.insert(ref_val.clone(), entity);
283
284        self.version += 1;
285        // Patch CSR instead of invalidating.
286        if self.csr.is_some() {
287            if let Some(entity) = self.entities.get(&ref_val) {
288                for (name, val) in entity.iter() {
289                    if let Kind::Ref(r) = val
290                        && name != "id"
291                    {
292                        self.csr_patch.add_edge(eid, name, &r.val);
293                    }
294                }
295            }
296            if self.csr_patch.len() > CSR_PATCH_THRESHOLD {
297                self.rebuild_csr();
298                self.csr_patch = super::csr::CsrPatch::new();
299            }
300        }
301        self.push_changelog(GraphDiff {
302            version: self.version,
303            timestamp: 0,
304            op: DiffOp::Add,
305            ref_val: ref_val.clone(),
306            old: None,
307            new: Some(entity_for_log),
308            changed_tags: None,
309            previous_tags: None,
310        });
311
312        // Resize query cache if entity count crossed a threshold.
313        let target_cap = query_cache_capacity_for(self.entities.len());
314        let mut cache = self.query_cache.lock();
315        if cache.capacity < target_cap {
316            cache.capacity = target_cap;
317        }
318
319        self.structural.mark_stale();
320        Ok(ref_val)
321    }
322
323    /// Add an entity without changelog tracking or version bump.
324    ///
325    /// Used for bulk restore from snapshots. Caller must call `finalize_bulk()`
326    /// after all bulk adds to rebuild CSR and set version.
327    pub fn add_bulk(&mut self, entity: HDict) -> Result<String, GraphError> {
328        let ref_val = extract_ref_val(&entity)?;
329        if self.entities.contains_key(&ref_val) {
330            return Err(GraphError::DuplicateRef(ref_val));
331        }
332
333        let eid = if let Some(recycled) = self.free_ids.pop() {
334            recycled
335        } else {
336            if self.next_id > MAX_ENTITY_ID {
337                return Err(GraphError::IdExhausted);
338            }
339            let id = self.next_id;
340            self.next_id = self.next_id.checked_add(1).ok_or(GraphError::InvalidId)?;
341            id
342        };
343
344        self.id_map.insert(ref_val.clone(), eid);
345        self.reverse_id.insert(eid, ref_val.clone());
346        self.index_tags(eid, &entity);
347        self.index_refs(eid, &entity);
348        self.entities.insert(ref_val.clone(), entity);
349
350        Ok(ref_val)
351    }
352
353    /// Finalize bulk load: rebuild CSR and set version.
354    pub fn finalize_bulk(&mut self, target_version: u64) {
355        self.version = target_version;
356        self.rebuild_csr();
357    }
358
    /// Get a reference to an entity by ref value.
    ///
    /// Returns `None` when no entity is stored under `ref_val`.
    pub fn get(&self, ref_val: &str) -> Option<&HDict> {
        self.entities.get(ref_val)
    }
363
364    /// Update an existing entity by merging `changes` into it.
365    ///
366    /// Tags in `changes` overwrite existing tags; `Kind::Remove` tags are
367    /// deleted. The `id` tag cannot be changed.
368    pub fn update(&mut self, ref_val: &str, changes: HDict) -> Result<(), GraphError> {
369        let eid = *self
370            .id_map
371            .get(ref_val)
372            .ok_or_else(|| GraphError::NotFound(ref_val.to_string()))?;
373
374        let mut old_entity = self
375            .entities
376            .remove(ref_val)
377            .ok_or_else(|| GraphError::NotFound(ref_val.to_string()))?;
378
379        // Compute delta for changelog before mutating.
380        let mut prev_tags = HDict::new();
381        let mut changed = HDict::new();
382        for (key, new_val) in changes.iter() {
383            if let Some(old_val) = old_entity.get(key) {
384                prev_tags.set(key, old_val.clone());
385            }
386            changed.set(key, new_val.clone());
387        }
388
389        // Clone old for delta comparison, then merge.
390        let old_snapshot = old_entity.clone();
391        old_entity.merge(&changes);
392
393        // Delta indexing: only update what changed.
394        self.update_tags_delta(eid, &old_snapshot, &old_entity);
395
396        // Re-index refs only if ref edges changed.
397        if Self::refs_changed(&old_snapshot, &old_entity) {
398            self.adjacency.remove(eid);
399            self.index_refs(eid, &old_entity);
400            if self.csr.is_some() {
401                self.csr_patch.remove_entity(eid);
402                for (name, val) in old_entity.iter() {
403                    if let Kind::Ref(r) = val
404                        && name != "id"
405                    {
406                        self.csr_patch.add_edge(eid, name, &r.val);
407                    }
408                }
409                if self.csr_patch.len() > CSR_PATCH_THRESHOLD {
410                    self.rebuild_csr();
411                    self.csr_patch = super::csr::CsrPatch::new();
412                }
413            }
414        }
415
416        self.entities.insert(ref_val.to_string(), old_entity);
417
418        self.version += 1;
419        self.push_changelog(GraphDiff {
420            version: self.version,
421            timestamp: 0,
422            op: DiffOp::Update,
423            ref_val: ref_val.to_string(),
424            old: None,
425            new: None,
426            changed_tags: Some(changed),
427            previous_tags: Some(prev_tags),
428        });
429
430        // Invalidate query cache.
431        self.query_cache.lock().entries.clear();
432
433        self.structural.mark_stale();
434        Ok(())
435    }
436
437    /// Remove an entity from the graph. Returns the removed entity.
438    pub fn remove(&mut self, ref_val: &str) -> Result<HDict, GraphError> {
439        let eid = self
440            .id_map
441            .remove(ref_val)
442            .ok_or_else(|| GraphError::NotFound(ref_val.to_string()))?;
443
444        self.reverse_id.remove(&eid);
445
446        let entity = self
447            .entities
448            .remove(ref_val)
449            .ok_or_else(|| GraphError::NotFound(ref_val.to_string()))?;
450
451        self.remove_indexing(eid, &entity);
452        self.free_ids.push(eid);
453
454        self.version += 1;
455        if self.csr.is_some() {
456            self.csr_patch.remove_entity(eid);
457            if self.csr_patch.len() > CSR_PATCH_THRESHOLD {
458                self.rebuild_csr();
459                self.csr_patch = super::csr::CsrPatch::new();
460            }
461        }
462        self.push_changelog(GraphDiff {
463            version: self.version,
464            timestamp: 0,
465            op: DiffOp::Remove,
466            ref_val: ref_val.to_string(),
467            old: Some(entity.clone()),
468            new: None,
469            changed_tags: None,
470            previous_tags: None,
471        });
472
473        self.structural.mark_stale();
474        Ok(entity)
475    }
476
477    // ── Structural Index ──
478
479    /// Get the structural index if it has been computed and is current.
480    /// Returns `None` if stale — call `recompute_structural()` under a
481    /// write lock first.
482    pub fn structural_index(&self) -> Option<&super::structural::StructuralIndex> {
483        if self.structural.is_stale() {
484            None
485        } else {
486            Some(&self.structural)
487        }
488    }
489
490    /// Force recomputation of structural fingerprints.
491    pub fn recompute_structural(&mut self) {
492        let entities = &self.entities;
493        let id_map = &self.id_map;
494        let adjacency = &self.adjacency;
495        self.structural
496            .compute(entities, id_map, |ref_val| match id_map.get(ref_val) {
497                Some(&eid) => adjacency.targets_from(eid, None),
498                None => Vec::new(),
499            });
500    }
501
502    // ── Query ──
503
504    /// Run a filter expression and return matching entities as a grid.
505    pub fn read(&self, filter_expr: &str, limit: usize) -> Result<HGrid, GraphError> {
506        let results = self.read_all(filter_expr, limit)?;
507
508        if results.is_empty() {
509            return Ok(HGrid::new());
510        }
511
512        // Collect all unique column names.
513        let mut seen: std::collections::HashSet<String> =
514            std::collections::HashSet::with_capacity(results.len().min(64));
515        for entity in &results {
516            for name in entity.tag_names() {
517                seen.insert(name.to_string());
518            }
519        }
520        let mut col_set: Vec<String> = seen.into_iter().collect();
521        col_set.sort();
522        let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
523        let rows: Vec<HDict> = results.into_iter().cloned().collect();
524
525        Ok(HGrid::from_parts(HDict::new(), cols, rows))
526    }
527
    /// Run a filter expression and return matching entities as references.
    ///
    /// `limit == 0` means "no limit"; any other value caps the number of
    /// returned entities. Results come from the query cache when an entry for
    /// `(filter_expr, current version)` exists; otherwise the filter is
    /// parsed (or taken from the AST cache), narrowed through the query
    /// planner's candidate bitmap when one is produced, and then evaluated in
    /// full against each remaining entity.
    ///
    /// Only unlimited queries populate the query cache.
    ///
    /// # Errors
    /// [`GraphError::Filter`] if `filter_expr` cannot be parsed.
    pub fn read_all(&self, filter_expr: &str, limit: usize) -> Result<Vec<&HDict>, GraphError> {
        // A limit of 0 is treated as unbounded.
        let effective_limit = if limit == 0 { usize::MAX } else { limit };

        // Check query cache (version-keyed, so mutations auto-invalidate).
        {
            let mut cache = self.query_cache.lock();
            if let Some(cached_refs) = cache.get(filter_expr, self.version) {
                let mut results = Vec::new();
                for rv in cached_refs {
                    if results.len() >= effective_limit {
                        break;
                    }
                    // Cached ref values with no stored entity are skipped.
                    if let Some(entity) = self.entities.get(rv) {
                        results.push(entity);
                    }
                }
                return Ok(results);
            }
        }

        // Use cached AST or parse and cache it (ASTs are version-independent).
        // NOTE(review): nothing in this method bounds or evicts the AST cache.
        let ast = {
            let mut ast_cache = self.ast_cache.lock();
            if let Some(cached) = ast_cache.get(filter_expr) {
                cached.clone()
            } else {
                let parsed =
                    parse_filter(filter_expr).map_err(|e| GraphError::Filter(e.to_string()))?;
                ast_cache.insert(filter_expr.to_string(), parsed.clone());
                parsed
            }
        };

        // Phase 1: bitmap acceleration (with value index + adjacency).
        let max_id = self.next_id;
        let candidates = query_planner::bitmap_candidates_with_values(
            &ast,
            &self.tag_index,
            &self.value_index,
            &self.adjacency,
            max_id,
        );

        // Phase 2: full filter evaluation.
        // The resolver lets the filter follow refs to other stored entities.
        let resolver = |r: &HRef| -> Option<&HDict> { self.entities.get(&r.val) };
        let ns = self.namespace.as_ref();

        // Use parallel evaluation only for large unlimited queries where rayon
        // overhead is worthwhile. Bounded queries always use sequential + early exit.
        const PARALLEL_THRESHOLD: usize = 500;
        let use_parallel = effective_limit == usize::MAX;

        let mut results: Vec<&HDict>;

        if let Some(ref bitmap) = candidates {
            // The planner produced a candidate set: evaluate only those ids.
            let candidate_ids: Vec<usize> = TagBitmapIndex::iter_set_bits(bitmap).collect();

            if candidate_ids.len() >= PARALLEL_THRESHOLD && use_parallel {
                results = candidate_ids
                    .par_iter()
                    .filter_map(|&eid| {
                        let ref_val = self.reverse_id.get(&eid)?;
                        let entity = self.entities.get(ref_val)?;
                        if matches_with_ns(&ast, entity, Some(&resolver), ns) {
                            Some(entity)
                        } else {
                            None
                        }
                    })
                    .collect();
            } else {
                results = Vec::new();
                for eid in TagBitmapIndex::iter_set_bits(bitmap) {
                    if results.len() >= effective_limit {
                        break;
                    }
                    if let Some(ref_val) = self.reverse_id.get(&eid)
                        && let Some(entity) = self.entities.get(ref_val)
                        && matches_with_ns(&ast, entity, Some(&resolver), ns)
                    {
                        results.push(entity);
                    }
                }
            }
        } else {
            // No candidate set: fall back to scanning every entity.
            let entity_count = self.entities.len();

            if entity_count >= PARALLEL_THRESHOLD && use_parallel {
                results = self
                    .entities
                    .par_iter()
                    .filter_map(|(_, entity)| {
                        if matches_with_ns(&ast, entity, Some(&resolver), ns) {
                            Some(entity)
                        } else {
                            None
                        }
                    })
                    .collect();
            } else {
                results = Vec::new();
                for entity in self.entities.values() {
                    if results.len() >= effective_limit {
                        break;
                    }
                    if matches_with_ns(&ast, entity, Some(&resolver), ns) {
                        results.push(entity);
                    }
                }
            }
        }

        // Safety net: the parallel paths only run when unbounded and the
        // sequential paths stop at the limit, so this should not trigger.
        if results.len() > effective_limit {
            results.truncate(effective_limit);
        }

        // Populate cache with result ref_vals (only for unlimited queries to
        // avoid caching partial results that depend on limit).
        if limit == 0 {
            // Entities whose `id` tag is missing or not a Ref are left out
            // of the cached list.
            let ref_vals: Vec<String> = results
                .iter()
                .filter_map(|e| {
                    e.get("id").and_then(|k| match k {
                        Kind::Ref(r) => Some(r.val.clone()),
                        _ => None,
                    })
                })
                .collect();
            let mut cache = self.query_cache.lock();
            cache.insert(filter_expr.to_string(), self.version, ref_vals);
        }

        Ok(results)
    }
663
664    // ── Ref traversal ──
665
666    /// Get ref values that the given entity points to.
667    pub fn refs_from(&self, ref_val: &str, ref_type: Option<&str>) -> Vec<String> {
668        match self.id_map.get(ref_val) {
669            Some(&eid) => {
670                if let Some(csr) = &self.csr {
671                    csr.targets_from_patched(eid, ref_type, &self.csr_patch)
672                } else {
673                    self.adjacency.targets_from(eid, ref_type)
674                }
675            }
676            None => Vec::new(),
677        }
678    }
679
680    /// Get ref values of entities that point to the given entity.
681    pub fn refs_to(&self, ref_val: &str, ref_type: Option<&str>) -> Vec<String> {
682        if let Some(csr) = &self.csr {
683            csr.sources_to_patched(ref_val, ref_type, &self.csr_patch)
684                .iter()
685                .filter_map(|eid| self.reverse_id.get(eid).cloned())
686                .collect()
687        } else {
688            self.adjacency
689                .sources_to(ref_val, ref_type)
690                .iter()
691                .filter_map(|eid| self.reverse_id.get(eid).cloned())
692                .collect()
693        }
694    }
695
696    /// Rebuild the CSR snapshot from the current adjacency.
697    /// Should be called after a batch of mutations (e.g., import, sync).
698    pub fn rebuild_csr(&mut self) {
699        let max_id = if self.next_id > 0 { self.next_id } else { 0 };
700        self.csr = Some(CsrAdjacency::from_ref_adjacency(&self.adjacency, max_id));
701        self.csr_version = self.version;
702        self.csr_patch = super::csr::CsrPatch::new();
703    }
704
705    /// Returns true if the CSR snapshot is stale (version mismatch).
706    pub fn csr_is_stale(&self) -> bool {
707        match &self.csr {
708            Some(_) => self.csr_version != self.version,
709            None => true,
710        }
711    }
712
713    // ── Graph traversal ──
714
715    /// Return all edges in the graph as `(source_ref, ref_tag, target_ref)` tuples.
716    pub fn all_edges(&self) -> Vec<(String, String, String)> {
717        let mut edges = Vec::new();
718        for (&eid, ref_val) in &self.reverse_id {
719            if let Some(fwd) = self.adjacency.forward_raw().get(&eid) {
720                for (ref_tag, target) in fwd {
721                    edges.push((ref_val.clone(), ref_tag.clone(), target.clone()));
722                }
723            }
724        }
725        edges
726    }
727
    /// BFS neighborhood: return entities and edges within `hops` of `ref_val`.
    ///
    /// `ref_types` optionally restricts which ref tags are traversed.
    /// Returns `(entities, edges)` where edges are `(source, ref_tag, target)`.
    ///
    /// Both outgoing and incoming refs are followed. The start entity is
    /// always included when it exists; an unknown `ref_val` yields two empty
    /// vectors. Entities are returned sorted by id value and edges sorted
    /// and deduplicated.
    pub fn neighbors(
        &self,
        ref_val: &str,
        hops: usize,
        ref_types: Option<&[&str]>,
    ) -> (Vec<&HDict>, Vec<(String, String, String)>) {
        use std::collections::{HashSet, VecDeque};

        let start_eid = match self.id_map.get(ref_val) {
            Some(&eid) => eid,
            None => return (Vec::new(), Vec::new()),
        };

        // `visited` holds internal ids; the queue pairs each id with its
        // distance (in hops) from the start entity.
        let mut visited: HashSet<usize> = HashSet::new();
        let mut queue: VecDeque<(usize, usize)> = VecDeque::new();
        let mut result_entities: Vec<&HDict> = Vec::with_capacity(16);
        let mut result_edges: Vec<(String, String, String)> = Vec::with_capacity(16);

        visited.insert(start_eid);
        queue.push_back((start_eid, 0));

        if let Some(entity) = self.entities.get(ref_val) {
            result_entities.push(entity);
        }

        while let Some((current_eid, depth)) = queue.pop_front() {
            // Nodes at the hop limit are kept in the result but not expanded.
            if depth >= hops {
                continue;
            }
            let current_ref = match self.reverse_id.get(&current_eid) {
                Some(s) => s.as_str(),
                None => continue,
            };

            // Traverse forward edges
            if let Some(fwd) = self.adjacency.forward_raw().get(&current_eid) {
                for (ref_tag, target) in fwd {
                    if let Some(types) = ref_types
                        && !types.iter().any(|t| t == ref_tag)
                    {
                        continue;
                    }
                    // The edge is recorded even when the target is not a
                    // stored entity; only stored targets are enqueued.
                    result_edges.push((current_ref.to_string(), ref_tag.clone(), target.clone()));
                    if let Some(&target_eid) = self.id_map.get(target.as_str())
                        && visited.insert(target_eid)
                    {
                        if let Some(entity) = self.entities.get(target.as_str()) {
                            result_entities.push(entity);
                        }
                        queue.push_back((target_eid, depth + 1));
                    }
                }
            }
            // Traverse reverse edges
            if let Some(rev) = self.adjacency.reverse_raw().get(current_ref) {
                for (ref_tag, source_eid) in rev {
                    if let Some(types) = ref_types
                        && !types.iter().any(|t| t == ref_tag)
                    {
                        continue;
                    }
                    // Sources without a ref value on record are ignored entirely.
                    if let Some(source_ref) = self.reverse_id.get(source_eid) {
                        result_edges.push((
                            source_ref.clone(),
                            ref_tag.clone(),
                            current_ref.to_string(),
                        ));
                        if visited.insert(*source_eid) {
                            if let Some(entity) = self.entities.get(source_ref.as_str()) {
                                result_entities.push(entity);
                            }
                            queue.push_back((*source_eid, depth + 1));
                        }
                    }
                }
            }
        }

        // Sort by id value; entities without an id sort as the empty string.
        result_entities.sort_by(|a, b| {
            let a_id = a.id().map(|r| r.val.as_str()).unwrap_or("");
            let b_id = b.id().map(|r| r.val.as_str()).unwrap_or("");
            a_id.cmp(b_id)
        });
        result_edges.sort();
        // Deduplicate edges (reverse traversal can produce duplicates)
        result_edges.dedup();

        (result_entities, result_edges)
    }
821
822    /// BFS shortest path from `from` to `to`. Returns ordered ref_vals, or
823    /// empty vec if no path exists.
824    pub fn shortest_path(&self, from: &str, to: &str) -> Vec<String> {
825        use std::collections::{HashMap as StdHashMap, VecDeque};
826
827        if from == to {
828            return vec![from.to_string()];
829        }
830        let from_eid = match self.id_map.get(from) {
831            Some(&eid) => eid,
832            None => return Vec::new(),
833        };
834        let to_eid = match self.id_map.get(to) {
835            Some(&eid) => eid,
836            None => return Vec::new(),
837        };
838
839        // parent map: child_eid -> parent_eid (usize::MAX = root sentinel)
840        let mut parents: StdHashMap<usize, usize> = StdHashMap::new();
841        let mut queue: VecDeque<usize> = VecDeque::new();
842        parents.insert(from_eid, usize::MAX);
843        queue.push_back(from_eid);
844
845        while let Some(current_eid) = queue.pop_front() {
846            let current_ref = match self.reverse_id.get(&current_eid) {
847                Some(s) => s.as_str(),
848                None => continue,
849            };
850
851            // Forward edges
852            if let Some(fwd) = self.adjacency.forward_raw().get(&current_eid) {
853                for (_, target) in fwd {
854                    if let Some(&target_eid) = self.id_map.get(target.as_str())
855                        && let std::collections::hash_map::Entry::Vacant(e) =
856                            parents.entry(target_eid)
857                    {
858                        e.insert(current_eid);
859                        if target_eid == to_eid {
860                            return Self::reconstruct_path_usize(
861                                &parents,
862                                to_eid,
863                                &self.reverse_id,
864                            );
865                        }
866                        queue.push_back(target_eid);
867                    }
868                }
869            }
870            // Reverse edges
871            if let Some(rev) = self.adjacency.reverse_raw().get(current_ref) {
872                for (_, source_eid) in rev {
873                    if !parents.contains_key(source_eid) {
874                        parents.insert(*source_eid, current_eid);
875                        if *source_eid == to_eid {
876                            return Self::reconstruct_path_usize(
877                                &parents,
878                                to_eid,
879                                &self.reverse_id,
880                            );
881                        }
882                        queue.push_back(*source_eid);
883                    }
884                }
885            }
886        }
887
888        Vec::new() // No path found
889    }
890
891    /// Reconstruct path from usize-based BFS parent map.
892    fn reconstruct_path_usize(
893        parents: &std::collections::HashMap<usize, usize>,
894        to_eid: usize,
895        reverse_id: &HashMap<usize, String>,
896    ) -> Vec<String> {
897        let mut path_eids = vec![to_eid];
898        let mut current = to_eid;
899        while let Some(&parent) = parents.get(&current) {
900            if parent == usize::MAX {
901                break;
902            }
903            path_eids.push(parent);
904            current = parent;
905        }
906        path_eids.reverse();
907        path_eids
908            .iter()
909            .filter_map(|eid| reverse_id.get(eid).cloned())
910            .collect()
911    }
912
913    /// Return the subtree rooted at `root` up to `max_depth` levels.
914    ///
915    /// Walks reverse refs (children referencing parent). Returns entities
916    /// paired with their depth from root. Root is depth 0.
917    pub fn subtree(&self, root: &str, max_depth: usize) -> Vec<(&HDict, usize)> {
918        use std::collections::{HashSet, VecDeque};
919
920        let mut visited: HashSet<String> = HashSet::new();
921        let mut queue: VecDeque<(String, usize)> = VecDeque::new();
922        let mut results: Vec<(&HDict, usize)> = Vec::new();
923
924        visited.insert(root.to_string());
925        queue.push_back((root.to_string(), 0));
926
927        if let Some(entity) = self.entities.get(root) {
928            results.push((entity, 0));
929        } else {
930            return Vec::new();
931        }
932
933        while let Some((current, depth)) = queue.pop_front() {
934            if depth >= max_depth {
935                continue;
936            }
937            // Children = entities that reference current (reverse refs)
938            let child_refs = self.refs_to(&current, None);
939            for child_ref in child_refs {
940                if visited.insert(child_ref.clone())
941                    && let Some(entity) = self.entities.get(&child_ref)
942                {
943                    results.push((entity, depth + 1));
944                    queue.push_back((child_ref, depth + 1));
945                }
946            }
947        }
948
949        results
950    }
951
952    // ── Haystack Hierarchy Helpers ──
953
954    /// Walk a chain of ref tags starting from an entity.
955    ///
956    /// For example, `ref_chain("point-1", &["equipRef", "siteRef"])` follows
957    /// `point-1` → its `equipRef` → that entity's `siteRef`, returning the
958    /// ordered path of resolved entities (excluding the starting entity).
959    pub fn ref_chain(&self, ref_val: &str, ref_tags: &[&str]) -> Vec<&HDict> {
960        let mut result = Vec::with_capacity(ref_tags.len());
961        let mut current = ref_val.to_string();
962        for tag in ref_tags {
963            let entity = match self.entities.get(&current) {
964                Some(e) => e,
965                None => break,
966            };
967            match entity.get(tag) {
968                Some(Kind::Ref(r)) => {
969                    current = r.val.clone();
970                    if let Some(target) = self.entities.get(&current) {
971                        result.push(target);
972                    } else {
973                        break;
974                    }
975                }
976                _ => break,
977            }
978        }
979        result
980    }
981
982    /// Resolve the site for any entity by walking `equipRef` → `siteRef`.
983    ///
984    /// If the entity itself has a `site` marker, returns it directly.
985    /// Otherwise walks the standard Haystack ref chain.
986    pub fn site_for(&self, ref_val: &str) -> Option<&HDict> {
987        let entity = self.entities.get(ref_val)?;
988        // If the entity is itself a site, return it.
989        if entity.has("site") {
990            return Some(entity);
991        }
992        // Check direct siteRef.
993        if let Some(Kind::Ref(r)) = entity.get("siteRef") {
994            return self.entities.get(&r.val);
995        }
996        // Walk equipRef → siteRef.
997        if let Some(Kind::Ref(r)) = entity.get("equipRef")
998            && let Some(equip) = self.entities.get(&r.val)
999            && let Some(Kind::Ref(sr)) = equip.get("siteRef")
1000        {
1001            return self.entities.get(&sr.val);
1002        }
1003        None
1004    }
1005
1006    /// All direct children: entities with any ref tag pointing to this entity.
1007    pub fn children(&self, ref_val: &str) -> Vec<&HDict> {
1008        self.refs_to(ref_val, None)
1009            .iter()
1010            .filter_map(|r| self.entities.get(r))
1011            .collect()
1012    }
1013
1014    /// All points for an equip — children with the `point` marker.
1015    ///
1016    /// Optionally filter further with a filter expression.
1017    pub fn equip_points(&self, equip_ref: &str, filter: Option<&str>) -> Vec<&HDict> {
1018        let points: Vec<&HDict> = self
1019            .children(equip_ref)
1020            .into_iter()
1021            .filter(|e| e.has("point"))
1022            .collect();
1023        match filter {
1024            Some(expr) => {
1025                let ast = match crate::filter::parse_filter(expr) {
1026                    Ok(ast) => ast,
1027                    Err(_) => return points,
1028                };
1029                points
1030                    .into_iter()
1031                    .filter(|e| crate::filter::matches(&ast, e, None))
1032                    .collect()
1033            }
1034            None => points,
1035        }
1036    }
1037
1038    // ── Spec-aware ──
1039
1040    /// Find all entities that structurally fit a spec/type name.
1041    ///
1042    /// Requires a namespace to be set. Returns empty if no namespace.
1043    pub fn entities_fitting(&self, spec_name: &str) -> Vec<&HDict> {
1044        match &self.namespace {
1045            Some(ns) => self
1046                .entities
1047                .values()
1048                .filter(|e| ns.fits(e, spec_name))
1049                .collect(),
1050            None => Vec::new(),
1051        }
1052    }
1053
1054    /// Validate all entities against the namespace and check for dangling refs.
1055    ///
1056    /// Returns empty if no namespace is set and no dangling refs exist.
1057    pub fn validate(&self) -> Vec<ValidationIssue> {
1058        let mut issues: Vec<ValidationIssue> = match &self.namespace {
1059            Some(ns) => self
1060                .entities
1061                .values()
1062                .flat_map(|e| ns.validate_entity(e))
1063                .collect(),
1064            None => Vec::new(),
1065        };
1066
1067        // Check for dangling refs: Ref values (except `id`) that point to
1068        // entities not present in the graph.
1069        for entity in self.entities.values() {
1070            let entity_ref = entity.id().map(|r| r.val.as_str());
1071            for (name, val) in entity.iter() {
1072                if name == "id" {
1073                    continue;
1074                }
1075                if let Kind::Ref(r) = val
1076                    && !self.entities.contains_key(&r.val)
1077                {
1078                    issues.push(ValidationIssue {
1079                        entity: entity_ref.map(|s| s.to_string()),
1080                        issue_type: "dangling_ref".to_string(),
1081                        detail: format!(
1082                            "tag '{}' references '{}' which does not exist in the graph",
1083                            name, r.val
1084                        ),
1085                    });
1086                }
1087            }
1088        }
1089
1090        issues
1091    }
1092
1093    // ── Serialization ──
1094
1095    /// Convert matching entities to a grid.
1096    ///
1097    /// If `filter_expr` is empty, exports all entities.
1098    /// Otherwise, delegates to `read`.
1099    pub fn to_grid(&self, filter_expr: &str) -> Result<HGrid, GraphError> {
1100        if filter_expr.is_empty() {
1101            let entities: Vec<&HDict> = self.entities.values().collect();
1102            return Ok(Self::entities_to_grid(&entities));
1103        }
1104        self.read(filter_expr, 0)
1105    }
1106
1107    /// Build a grid from a slice of entity references.
1108    fn entities_to_grid(entities: &[&HDict]) -> HGrid {
1109        if entities.is_empty() {
1110            return HGrid::new();
1111        }
1112
1113        let mut col_set: Vec<String> = Vec::new();
1114        let mut seen = std::collections::HashSet::new();
1115        for entity in entities {
1116            for name in entity.tag_names() {
1117                if seen.insert(name.to_string()) {
1118                    col_set.push(name.to_string());
1119                }
1120            }
1121        }
1122        col_set.sort();
1123        let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
1124        let rows: Vec<HDict> = entities.iter().map(|e| (*e).clone()).collect();
1125
1126        HGrid::from_parts(HDict::new(), cols, rows)
1127    }
1128
1129    /// Build an EntityGraph from a grid.
1130    ///
1131    /// Rows without a valid `id` Ref tag are silently skipped.
1132    pub fn from_grid(grid: &HGrid, namespace: Option<DefNamespace>) -> Result<Self, GraphError> {
1133        let mut graph = match namespace {
1134            Some(ns) => Self::with_namespace(ns),
1135            None => Self::new(),
1136        };
1137        for row in &grid.rows {
1138            if row.id().is_some() {
1139                graph.add(row.clone())?;
1140            }
1141        }
1142        // Bulk import: build CSR once after all adds.
1143        graph.rebuild_csr();
1144        Ok(graph)
1145    }
1146
1147    // ── Change tracking ──
1148
1149    /// Get changelog entries since a given version.
1150    ///
1151    /// Returns `Err(ChangelogGap)` if the requested version has been evicted
1152    /// from the changelog, signalling the subscriber must do a full resync.
1153    pub fn changes_since(&self, version: u64) -> Result<Vec<&GraphDiff>, ChangelogGap> {
1154        let target = version + 1;
1155        // If the floor has advanced past the requested version, the subscriber
1156        // has fallen behind and missed entries.
1157        if self.floor_version > 0 && version < self.floor_version {
1158            return Err(ChangelogGap {
1159                subscriber_version: version,
1160                floor_version: self.floor_version,
1161            });
1162        }
1163        // Binary search: versions are monotonically increasing in the VecDeque.
1164        // partition_point finds the first entry where version >= target.
1165        let start = self.changelog.partition_point(|d| d.version < target);
1166        Ok(self.changelog.iter().skip(start).collect())
1167    }
1168
1169    /// The lowest version still retained in the changelog.
1170    ///
1171    /// Returns 0 if no entries have been evicted.
1172    pub fn floor_version(&self) -> u64 {
1173        self.floor_version
1174    }
1175
1176    /// The configured changelog capacity.
1177    pub fn changelog_capacity(&self) -> usize {
1178        self.changelog_capacity
1179    }
1180
1181    /// Current query cache capacity.
1182    pub fn query_cache_capacity(&self) -> usize {
1183        self.query_cache.lock().capacity
1184    }
1185
1186    /// Current graph version (monotonically increasing).
1187    pub fn version(&self) -> u64 {
1188        self.version
1189    }
1190
1191    // ── Container ──
1192
1193    /// Number of entities in the graph.
1194    pub fn len(&self) -> usize {
1195        self.entities.len()
1196    }
1197
1198    /// Returns `true` if the graph has no entities.
1199    pub fn is_empty(&self) -> bool {
1200        self.entities.is_empty()
1201    }
1202
1203    /// Returns `true` if an entity with the given ref value exists.
1204    pub fn contains(&self, ref_val: &str) -> bool {
1205        self.entities.contains_key(ref_val)
1206    }
1207
1208    /// Returns references to all entities in the graph.
1209    pub fn all(&self) -> Vec<&HDict> {
1210        self.entities.values().collect()
1211    }
1212
1213    // ── Internal indexing ──
1214
1215    /// Add tag bitmap entries for an entity.
1216    fn index_tags(&mut self, entity_id: usize, entity: &HDict) {
1217        let tags: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
1218        self.tag_index.add(entity_id, &tags);
1219
1220        // Update value indexes for any indexed fields present on this entity.
1221        for (name, val) in entity.iter() {
1222            if self.value_index.has_index(name) {
1223                self.value_index.add(entity_id, name, val);
1224            }
1225            // Update columnar store for tracked tags.
1226            if self.columnar.is_tracked(name) {
1227                self.columnar.set(entity_id, name, val);
1228            }
1229        }
1230    }
1231
1232    /// Add ref adjacency entries for an entity.
1233    fn index_refs(&mut self, entity_id: usize, entity: &HDict) {
1234        for (name, val) in entity.iter() {
1235            if let Kind::Ref(r) = val {
1236                // Skip the "id" tag — it is the entity's own identity,
1237                // not a reference edge.
1238                if name != "id" {
1239                    self.adjacency.add(entity_id, name, &r.val);
1240                }
1241            }
1242        }
1243    }
1244
1245    /// Remove all index entries for an entity.
1246    fn remove_indexing(&mut self, entity_id: usize, entity: &HDict) {
1247        let tags: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
1248        self.tag_index.remove(entity_id, &tags);
1249        self.adjacency.remove(entity_id);
1250
1251        // Remove from value indexes.
1252        for (name, val) in entity.iter() {
1253            if self.value_index.has_index(name) {
1254                self.value_index.remove(entity_id, name, val);
1255            }
1256        }
1257
1258        // Clear columnar data for this entity.
1259        self.columnar.clear_entity(entity_id);
1260    }
1261
1262    /// Update only the changed tags in the tag bitmap index.
1263    fn update_tags_delta(&mut self, entity_id: usize, old: &HDict, new: &HDict) {
1264        let old_tags: std::collections::HashSet<&str> = old.tag_names().collect();
1265        let new_tags: std::collections::HashSet<&str> = new.tag_names().collect();
1266
1267        // Tags removed: clear bits.
1268        let removed: Vec<String> = old_tags
1269            .difference(&new_tags)
1270            .map(|s| s.to_string())
1271            .collect();
1272        if !removed.is_empty() {
1273            self.tag_index.remove(entity_id, &removed);
1274        }
1275
1276        // Tags added: set bits.
1277        let added: Vec<String> = new_tags
1278            .difference(&old_tags)
1279            .map(|s| s.to_string())
1280            .collect();
1281        if !added.is_empty() {
1282            self.tag_index.add(entity_id, &added);
1283        }
1284
1285        // Update value indexes for changed fields only.
1286        for (name, new_val) in new.iter() {
1287            if self.value_index.has_index(name) {
1288                if let Some(old_val) = old.get(name) {
1289                    if old_val != new_val {
1290                        self.value_index.remove(entity_id, name, old_val);
1291                        self.value_index.add(entity_id, name, new_val);
1292                    }
1293                } else {
1294                    self.value_index.add(entity_id, name, new_val);
1295                }
1296            }
1297            if self.columnar.is_tracked(name) {
1298                self.columnar.set(entity_id, name, new_val);
1299            }
1300        }
1301
1302        // Remove value indexes for removed fields.
1303        for name in &removed {
1304            if self.value_index.has_index(name)
1305                && let Some(old_val) = old.get(name.as_str())
1306            {
1307                self.value_index.remove(entity_id, name, old_val);
1308            }
1309        }
1310    }
1311
1312    /// Check if ref edges changed between old and new entity.
1313    fn refs_changed(old: &HDict, new: &HDict) -> bool {
1314        for (name, val) in new.iter() {
1315            if name != "id"
1316                && let Kind::Ref(_) = val
1317                && old.get(name) != Some(val)
1318            {
1319                return true;
1320            }
1321        }
1322        // Check for removed refs.
1323        for (name, val) in old.iter() {
1324            if name != "id"
1325                && let Kind::Ref(_) = val
1326                && new.get(name).is_none()
1327            {
1328                return true;
1329            }
1330        }
1331        false
1332    }
1333
1334    /// Build a full hierarchy subtree as a structured tree.
1335    /// `root` is the entity ref, `max_depth` limits recursion (0 = root only).
1336    pub fn hierarchy_tree(&self, root: &str, max_depth: usize) -> Option<HierarchyNode> {
1337        let entity = self.entities.get(root)?.clone();
1338        Some(self.build_subtree(root, &entity, 0, max_depth))
1339    }
1340
1341    fn build_subtree(
1342        &self,
1343        ref_val: &str,
1344        entity: &HDict,
1345        depth: usize,
1346        max_depth: usize,
1347    ) -> HierarchyNode {
1348        let children = if depth < max_depth {
1349            self.children(ref_val)
1350                .into_iter()
1351                .filter_map(|child| {
1352                    let child_id = child.id()?.val.clone();
1353                    Some(self.build_subtree(&child_id, child, depth + 1, max_depth))
1354                })
1355                .collect()
1356        } else {
1357            Vec::new()
1358        };
1359        HierarchyNode {
1360            entity: entity.clone(),
1361            children,
1362            depth,
1363        }
1364    }
1365
1366    /// Determine the most specific entity type from its markers.
1367    ///
1368    /// Returns the most specific marker tag that identifies the entity type.
1369    /// E.g., an entity with `equip` + `ahu` markers returns `"ahu"` (most specific).
1370    pub fn classify(&self, ref_val: &str) -> Option<String> {
1371        let entity = self.entities.get(ref_val)?;
1372        classify_entity(entity)
1373    }
1374
1375    /// Append a diff to the changelog, capping at the configured capacity.
1376    fn push_changelog(&mut self, mut diff: GraphDiff) {
1377        diff.timestamp = GraphDiff::now_nanos();
1378        self.changelog.push_back(diff);
1379        while self.changelog.len() > self.changelog_capacity {
1380            if let Some(evicted) = self.changelog.pop_front() {
1381                self.floor_version = evicted.version;
1382            }
1383        }
1384    }
1385}
1386
1387impl Default for EntityGraph {
1388    fn default() -> Self {
1389        Self::new()
1390    }
1391}
1392
1393/// A node in a hierarchy tree produced by [`EntityGraph::hierarchy_tree`].
1394#[derive(Debug, Clone)]
1395pub struct HierarchyNode {
1396    pub entity: HDict,
1397    pub children: Vec<HierarchyNode>,
1398    pub depth: usize,
1399}
1400
1401/// Extract the ref value string from an entity's `id` tag.
1402fn extract_ref_val(entity: &HDict) -> Result<String, GraphError> {
1403    match entity.get("id") {
1404        Some(Kind::Ref(r)) => Ok(r.val.clone()),
1405        Some(_) => Err(GraphError::InvalidId),
1406        None => Err(GraphError::MissingId),
1407    }
1408}
1409
/// Priority-ordered list of marker tags from most specific to least specific.
/// The first match wins.
#[rustfmt::skip] // keep one group per line, with its label on the line above
const CLASSIFY_PRIORITY: &[&str] = &[
    // Point subtypes
    "sensor", "cmd", "sp",
    // Equipment subtypes
    "ahu", "vav", "boiler", "chiller", "meter",
    // Base categories
    "point", "equip",
    // Space types
    "room", "floor", "zone", "space",
    // Site
    "site",
    // Other well-known
    "weather", "device", "network",
];
1421
1422/// Classify an entity by returning the most specific recognized marker tag.
1423fn classify_entity(entity: &HDict) -> Option<String> {
1424    for &tag in CLASSIFY_PRIORITY {
1425        if entity.has(tag) {
1426            return Some(tag.to_string());
1427        }
1428    }
1429    None
1430}
1431
1432#[cfg(test)]
1433mod tests {
1434    use super::*;
1435    use crate::kinds::Number;
1436
1437    fn make_site(id: &str) -> HDict {
1438        let mut d = HDict::new();
1439        d.set("id", Kind::Ref(HRef::from_val(id)));
1440        d.set("site", Kind::Marker);
1441        d.set("dis", Kind::Str(format!("Site {id}")));
1442        d.set(
1443            "area",
1444            Kind::Number(Number::new(4500.0, Some("ft\u{00b2}".into()))),
1445        );
1446        d
1447    }
1448
1449    fn make_equip(id: &str, site_ref: &str) -> HDict {
1450        let mut d = HDict::new();
1451        d.set("id", Kind::Ref(HRef::from_val(id)));
1452        d.set("equip", Kind::Marker);
1453        d.set("dis", Kind::Str(format!("Equip {id}")));
1454        d.set("siteRef", Kind::Ref(HRef::from_val(site_ref)));
1455        d
1456    }
1457
1458    fn make_point(id: &str, equip_ref: &str) -> HDict {
1459        let mut d = HDict::new();
1460        d.set("id", Kind::Ref(HRef::from_val(id)));
1461        d.set("point", Kind::Marker);
1462        d.set("sensor", Kind::Marker);
1463        d.set("temp", Kind::Marker);
1464        d.set("dis", Kind::Str(format!("Point {id}")));
1465        d.set("equipRef", Kind::Ref(HRef::from_val(equip_ref)));
1466        d.set(
1467            "curVal",
1468            Kind::Number(Number::new(72.5, Some("\u{00b0}F".into()))),
1469        );
1470        d
1471    }
1472
1473    // ── Add tests ──
1474
1475    #[test]
1476    fn add_entity_with_valid_id() {
1477        let mut g = EntityGraph::new();
1478        let result = g.add(make_site("site-1"));
1479        assert!(result.is_ok());
1480        assert_eq!(result.unwrap(), "site-1");
1481        assert_eq!(g.len(), 1);
1482    }
1483
1484    #[test]
1485    fn add_entity_missing_id_fails() {
1486        let mut g = EntityGraph::new();
1487        let entity = HDict::new();
1488        let err = g.add(entity).unwrap_err();
1489        assert!(matches!(err, GraphError::MissingId));
1490    }
1491
1492    #[test]
1493    fn add_entity_non_ref_id_fails() {
1494        let mut g = EntityGraph::new();
1495        let mut entity = HDict::new();
1496        entity.set("id", Kind::Str("not-a-ref".into()));
1497        let err = g.add(entity).unwrap_err();
1498        assert!(matches!(err, GraphError::InvalidId));
1499    }
1500
1501    #[test]
1502    fn add_duplicate_ref_fails() {
1503        let mut g = EntityGraph::new();
1504        g.add(make_site("site-1")).unwrap();
1505        let err = g.add(make_site("site-1")).unwrap_err();
1506        assert!(matches!(err, GraphError::DuplicateRef(_)));
1507    }
1508
1509    // ── Get tests ──
1510
1511    #[test]
1512    fn get_existing_entity() {
1513        let mut g = EntityGraph::new();
1514        g.add(make_site("site-1")).unwrap();
1515        let entity = g.get("site-1").unwrap();
1516        assert!(entity.has("site"));
1517        assert_eq!(entity.get("dis"), Some(&Kind::Str("Site site-1".into())));
1518    }
1519
1520    #[test]
1521    fn get_missing_entity_returns_none() {
1522        let g = EntityGraph::new();
1523        assert!(g.get("nonexistent").is_none());
1524    }
1525
1526    // ── Update tests ──
1527
1528    #[test]
1529    fn update_merges_changes() {
1530        let mut g = EntityGraph::new();
1531        g.add(make_site("site-1")).unwrap();
1532
1533        let mut changes = HDict::new();
1534        changes.set("dis", Kind::Str("Updated Site".into()));
1535        changes.set("geoCity", Kind::Str("Richmond".into()));
1536        g.update("site-1", changes).unwrap();
1537
1538        let entity = g.get("site-1").unwrap();
1539        assert_eq!(entity.get("dis"), Some(&Kind::Str("Updated Site".into())));
1540        assert_eq!(entity.get("geoCity"), Some(&Kind::Str("Richmond".into())));
1541        assert!(entity.has("site")); // unchanged
1542    }
1543
1544    #[test]
1545    fn update_missing_entity_fails() {
1546        let mut g = EntityGraph::new();
1547        let err = g.update("nonexistent", HDict::new()).unwrap_err();
1548        assert!(matches!(err, GraphError::NotFound(_)));
1549    }
1550
1551    // ── Remove tests ──
1552
1553    #[test]
1554    fn remove_entity() {
1555        let mut g = EntityGraph::new();
1556        g.add(make_site("site-1")).unwrap();
1557        let removed = g.remove("site-1").unwrap();
1558        assert!(removed.has("site"));
1559        assert!(g.get("site-1").is_none());
1560        assert_eq!(g.len(), 0);
1561    }
1562
1563    #[test]
1564    fn remove_missing_entity_fails() {
1565        let mut g = EntityGraph::new();
1566        let err = g.remove("nonexistent").unwrap_err();
1567        assert!(matches!(err, GraphError::NotFound(_)));
1568    }
1569
1570    #[test]
1571    fn id_freelist_recycles_removed_ids() {
1572        let mut g = EntityGraph::new();
1573
1574        // Add 3 entities: IDs 0, 1, 2
1575        for i in 0..3 {
1576            let mut e = HDict::new();
1577            e.set("id", Kind::Ref(HRef::from_val(format!("e-{i}"))));
1578            g.add(e).unwrap();
1579        }
1580
1581        // Remove entity 1 (frees ID 1)
1582        g.remove("e-1").unwrap();
1583
1584        // Add a new entity — should reuse ID 1, not allocate ID 3
1585        let mut e = HDict::new();
1586        e.set("id", Kind::Ref(HRef::from_val("e-new")));
1587        g.add(e).unwrap();
1588
1589        // Graph should have 3 entities and next_id should still be 3
1590        assert_eq!(g.len(), 3);
1591    }
1592
1593    // ── Version / changelog tests ──
1594
1595    #[test]
1596    fn version_increments_on_mutations() {
1597        let mut g = EntityGraph::new();
1598        assert_eq!(g.version(), 0);
1599
1600        g.add(make_site("site-1")).unwrap();
1601        assert_eq!(g.version(), 1);
1602
1603        g.update("site-1", HDict::new()).unwrap();
1604        assert_eq!(g.version(), 2);
1605
1606        g.remove("site-1").unwrap();
1607        assert_eq!(g.version(), 3);
1608    }
1609
1610    #[test]
1611    fn changelog_records_add_update_remove() {
1612        let mut g = EntityGraph::new();
1613        g.add(make_site("site-1")).unwrap();
1614        g.update("site-1", HDict::new()).unwrap();
1615        g.remove("site-1").unwrap();
1616
1617        let changes = g.changes_since(0).unwrap();
1618        assert_eq!(changes.len(), 3);
1619
1620        // Add: has new, no old, no deltas.
1621        assert_eq!(changes[0].op, DiffOp::Add);
1622        assert_eq!(changes[0].ref_val, "site-1");
1623        assert!(changes[0].old.is_none());
1624        assert!(changes[0].new.is_some());
1625        assert!(changes[0].changed_tags.is_none());
1626
1627        // Update: has deltas, no old/new.
1628        assert_eq!(changes[1].op, DiffOp::Update);
1629        assert!(changes[1].old.is_none());
1630        assert!(changes[1].new.is_none());
1631        assert!(changes[1].changed_tags.is_some());
1632        assert!(changes[1].previous_tags.is_some());
1633
1634        // Remove: has old, no new, no deltas.
1635        assert_eq!(changes[2].op, DiffOp::Remove);
1636        assert!(changes[2].old.is_some());
1637        assert!(changes[2].new.is_none());
1638        assert!(changes[2].changed_tags.is_none());
1639    }
1640
1641    #[test]
1642    fn changes_since_returns_subset() {
1643        let mut g = EntityGraph::new();
1644        g.add(make_site("site-1")).unwrap(); // v1
1645        g.add(make_site("site-2")).unwrap(); // v2
1646        g.add(make_site("site-3")).unwrap(); // v3
1647
1648        let since_v2 = g.changes_since(2).unwrap();
1649        assert_eq!(since_v2.len(), 1);
1650        assert_eq!(since_v2[0].ref_val, "site-3");
1651    }
1652
1653    #[test]
1654    fn configurable_changelog_capacity() {
1655        let mut g = EntityGraph::with_changelog_capacity(3);
1656        assert_eq!(g.changelog_capacity(), 3);
1657
1658        // Add 5 entities — first 2 should be evicted from changelog.
1659        for i in 0..5 {
1660            g.add(make_site(&format!("site-{i}"))).unwrap();
1661        }
1662
1663        assert_eq!(g.version(), 5);
1664        assert_eq!(g.floor_version(), 2); // v1 and v2 evicted
1665
1666        // Can still get changes from v2 onwards.
1667        let changes = g.changes_since(2).unwrap();
1668        assert_eq!(changes.len(), 3); // v3, v4, v5
1669
1670        // Requesting from v1 (evicted) should return ChangelogGap.
1671        let gap = g.changes_since(1).unwrap_err();
1672        assert_eq!(gap.subscriber_version, 1);
1673        assert_eq!(gap.floor_version, 2);
1674    }
1675
1676    #[test]
1677    fn changelog_gap_on_version_zero_after_eviction() {
1678        let mut g = EntityGraph::with_changelog_capacity(2);
1679        for i in 0..4 {
1680            g.add(make_site(&format!("site-{i}"))).unwrap();
1681        }
1682
1683        // Requesting since v0 after evictions should return gap.
1684        let gap = g.changes_since(0).unwrap_err();
1685        assert_eq!(gap.subscriber_version, 0);
1686        assert!(gap.floor_version > 0);
1687    }
1688
1689    #[test]
1690    fn no_gap_when_capacity_sufficient() {
1691        let mut g = EntityGraph::with_changelog_capacity(100);
1692        for i in 0..50 {
1693            g.add(make_site(&format!("site-{i}"))).unwrap();
1694        }
1695        assert_eq!(g.floor_version(), 0);
1696        let changes = g.changes_since(0).unwrap();
1697        assert_eq!(changes.len(), 50);
1698    }
1699
1700    #[test]
1701    fn changelog_entries_have_timestamps() {
1702        let mut g = EntityGraph::new();
1703        g.add(make_site("site-1")).unwrap();
1704        g.update("site-1", HDict::new()).unwrap();
1705        g.remove("site-1").unwrap();
1706
1707        let changes = g.changes_since(0).unwrap();
1708        for diff in &changes {
1709            assert!(diff.timestamp > 0, "timestamp should be positive");
1710        }
1711        // Timestamps should be non-decreasing.
1712        for pair in changes.windows(2) {
1713            assert!(pair[1].timestamp >= pair[0].timestamp);
1714        }
1715    }
1716
1717    #[test]
1718    fn update_diff_carries_delta_tags() {
1719        let mut g = EntityGraph::new();
1720        let mut site = HDict::new();
1721        site.set("id", Kind::Ref(HRef::from_val("site-1")));
1722        site.set("site", Kind::Marker);
1723        site.set("dis", Kind::Str("Original".into()));
1724        site.set("area", Kind::Number(Number::unitless(1000.0)));
1725        g.add(site).unwrap();
1726
1727        let mut changes = HDict::new();
1728        changes.set("dis", Kind::Str("Updated".into()));
1729        g.update("site-1", changes).unwrap();
1730
1731        let diffs = g.changes_since(1).unwrap(); // skip the Add
1732        assert_eq!(diffs.len(), 1);
1733        let diff = &diffs[0];
1734        assert_eq!(diff.op, DiffOp::Update);
1735
1736        // old/new should be None for Update (delta only).
1737        assert!(diff.old.is_none());
1738        assert!(diff.new.is_none());
1739
1740        // changed_tags has the new value.
1741        let ct = diff.changed_tags.as_ref().unwrap();
1742        assert_eq!(ct.get("dis"), Some(&Kind::Str("Updated".into())));
1743        assert!(ct.get("area").is_none()); // unchanged tag not included
1744
1745        // previous_tags has the old value.
1746        let pt = diff.previous_tags.as_ref().unwrap();
1747        assert_eq!(pt.get("dis"), Some(&Kind::Str("Original".into())));
1748    }
1749
1750    // ── Container tests ──
1751
1752    #[test]
1753    fn contains_check() {
1754        let mut g = EntityGraph::new();
1755        g.add(make_site("site-1")).unwrap();
1756        assert!(g.contains("site-1"));
1757        assert!(!g.contains("site-2"));
1758    }
1759
1760    #[test]
1761    fn len_and_is_empty() {
1762        let mut g = EntityGraph::new();
1763        assert!(g.is_empty());
1764        assert_eq!(g.len(), 0);
1765
1766        g.add(make_site("site-1")).unwrap();
1767        assert!(!g.is_empty());
1768        assert_eq!(g.len(), 1);
1769    }
1770
1771    // ── Query tests ──
1772
1773    #[test]
1774    fn read_with_simple_has_filter() {
1775        let mut g = EntityGraph::new();
1776        g.add(make_site("site-1")).unwrap();
1777        g.add(make_equip("equip-1", "site-1")).unwrap();
1778
1779        let results = g.read_all("site", 0).unwrap();
1780        assert_eq!(results.len(), 1);
1781        assert!(results[0].has("site"));
1782    }
1783
1784    #[test]
1785    fn read_with_comparison_filter() {
1786        let mut g = EntityGraph::new();
1787        g.add(make_point("pt-1", "equip-1")).unwrap();
1788
1789        let results = g.read_all("curVal > 70\u{00b0}F", 0).unwrap();
1790        assert_eq!(results.len(), 1);
1791    }
1792
1793    #[test]
1794    fn read_with_and_filter() {
1795        let mut g = EntityGraph::new();
1796        g.add(make_point("pt-1", "equip-1")).unwrap();
1797        g.add(make_equip("equip-1", "site-1")).unwrap();
1798
1799        let results = g.read_all("point and sensor", 0).unwrap();
1800        assert_eq!(results.len(), 1);
1801    }
1802
1803    #[test]
1804    fn read_with_or_filter() {
1805        let mut g = EntityGraph::new();
1806        g.add(make_site("site-1")).unwrap();
1807        g.add(make_equip("equip-1", "site-1")).unwrap();
1808
1809        let results = g.read_all("site or equip", 0).unwrap();
1810        assert_eq!(results.len(), 2);
1811    }
1812
1813    #[test]
1814    fn read_limit_parameter_works() {
1815        let mut g = EntityGraph::new();
1816        g.add(make_site("site-1")).unwrap();
1817        g.add(make_site("site-2")).unwrap();
1818        g.add(make_site("site-3")).unwrap();
1819
1820        let results = g.read_all("site", 2).unwrap();
1821        assert_eq!(results.len(), 2);
1822    }
1823
1824    #[test]
1825    fn read_returns_grid() {
1826        let mut g = EntityGraph::new();
1827        g.add(make_site("site-1")).unwrap();
1828        g.add(make_site("site-2")).unwrap();
1829
1830        let grid = g.read("site", 0).unwrap();
1831        assert_eq!(grid.len(), 2);
1832        assert!(grid.col("site").is_some());
1833        assert!(grid.col("id").is_some());
1834    }
1835
1836    #[test]
1837    fn read_invalid_filter() {
1838        let g = EntityGraph::new();
1839        let err = g.read("!!!", 0).unwrap_err();
1840        assert!(matches!(err, GraphError::Filter(_)));
1841    }
1842
1843    #[test]
1844    fn query_cache_returns_same_results() {
1845        let mut g = EntityGraph::new();
1846        g.add(make_site("site-1")).unwrap();
1847        g.add(make_equip("equip-1", "site-1")).unwrap();
1848        g.add(make_point("pt-1", "equip-1")).unwrap();
1849
1850        // First call populates cache
1851        let results1 = g.read_all("site", 0).unwrap();
1852        assert_eq!(results1.len(), 1);
1853
1854        // Second call should hit cache and return same results
1855        let results2 = g.read_all("site", 0).unwrap();
1856        assert_eq!(results2.len(), 1);
1857        assert_eq!(results1[0].get("id"), results2[0].get("id"));
1858    }
1859
1860    #[test]
1861    fn query_cache_invalidated_by_mutation() {
1862        let mut g = EntityGraph::new();
1863        g.add(make_site("site-1")).unwrap();
1864
1865        let results = g.read_all("site", 0).unwrap();
1866        assert_eq!(results.len(), 1);
1867
1868        // Add another site — cache should be invalidated by version bump
1869        g.add(make_site("site-2")).unwrap();
1870
1871        let results = g.read_all("site", 0).unwrap();
1872        assert_eq!(results.len(), 2);
1873    }
1874
1875    #[test]
1876    fn query_cache_capacity_scales_with_entity_count() {
1877        let mut g = EntityGraph::new();
1878        // Default cache should start at 256
1879        assert_eq!(g.query_cache_capacity(), 256);
1880        for i in 0..500 {
1881            let mut e = HDict::new();
1882            e.set("id", Kind::Ref(HRef::from_val(format!("e-{i}"))));
1883            e.set("site", Kind::Marker);
1884            g.add(e).unwrap();
1885        }
1886        // For 500 entities: (500/100).clamp(256, 1024) = 256 (still minimum)
1887        assert!(g.query_cache_capacity() >= 256);
1888    }
1889
1890    // ── Ref traversal tests ──
1891
1892    #[test]
1893    fn refs_from_returns_targets() {
1894        let mut g = EntityGraph::new();
1895        g.add(make_site("site-1")).unwrap();
1896        g.add(make_equip("equip-1", "site-1")).unwrap();
1897
1898        let targets = g.refs_from("equip-1", None);
1899        assert_eq!(targets, vec!["site-1".to_string()]);
1900    }
1901
1902    #[test]
1903    fn refs_to_returns_sources() {
1904        let mut g = EntityGraph::new();
1905        g.add(make_site("site-1")).unwrap();
1906        g.add(make_equip("equip-1", "site-1")).unwrap();
1907        g.add(make_equip("equip-2", "site-1")).unwrap();
1908
1909        let mut sources = g.refs_to("site-1", None);
1910        sources.sort();
1911        assert_eq!(sources.len(), 2);
1912    }
1913
1914    #[test]
1915    fn type_filtered_ref_queries() {
1916        let mut g = EntityGraph::new();
1917        g.add(make_site("site-1")).unwrap();
1918        g.add(make_equip("equip-1", "site-1")).unwrap();
1919
1920        let targets = g.refs_from("equip-1", Some("siteRef"));
1921        assert_eq!(targets, vec!["site-1".to_string()]);
1922
1923        let targets = g.refs_from("equip-1", Some("equipRef"));
1924        assert!(targets.is_empty());
1925    }
1926
1927    #[test]
1928    fn refs_from_nonexistent_entity() {
1929        let g = EntityGraph::new();
1930        assert!(g.refs_from("nonexistent", None).is_empty());
1931    }
1932
1933    #[test]
1934    fn refs_to_nonexistent_entity() {
1935        let g = EntityGraph::new();
1936        assert!(g.refs_to("nonexistent", None).is_empty());
1937    }
1938
1939    #[test]
1940    fn csr_survives_single_mutation() {
1941        let mut g = EntityGraph::new();
1942        let mut site = HDict::new();
1943        site.set("id", Kind::Ref(HRef::from_val("site-1")));
1944        site.set("site", Kind::Marker);
1945        g.add(site).unwrap();
1946
1947        let mut equip = HDict::new();
1948        equip.set("id", Kind::Ref(HRef::from_val("equip-1")));
1949        equip.set("equip", Kind::Marker);
1950        equip.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
1951        g.add(equip).unwrap();
1952
1953        g.rebuild_csr();
1954
1955        // Mutate: add a new entity (should NOT destroy CSR)
1956        let mut point = HDict::new();
1957        point.set("id", Kind::Ref(HRef::from_val("point-1")));
1958        point.set("point", Kind::Marker);
1959        point.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
1960        g.add(point).unwrap();
1961
1962        // Forward refs should still work (CSR + patch overlay)
1963        let refs = g.refs_from("equip-1", Some("siteRef"));
1964        assert_eq!(refs, vec!["site-1".to_string()]);
1965
1966        // New entity's refs should be found via patch
1967        let refs = g.refs_from("point-1", Some("equipRef"));
1968        assert_eq!(refs, vec!["equip-1".to_string()]);
1969    }
1970
1971    // ── Serialization tests ──
1972
1973    #[test]
1974    fn from_grid_round_trip() {
1975        let mut g = EntityGraph::new();
1976        g.add(make_site("site-1")).unwrap();
1977        g.add(make_equip("equip-1", "site-1")).unwrap();
1978
1979        let grid = g.to_grid("site or equip").unwrap();
1980        assert_eq!(grid.len(), 2);
1981
1982        let g2 = EntityGraph::from_grid(&grid, None).unwrap();
1983        assert_eq!(g2.len(), 2);
1984        assert!(g2.contains("site-1"));
1985        assert!(g2.contains("equip-1"));
1986    }
1987
1988    #[test]
1989    fn to_grid_empty_result() {
1990        let g = EntityGraph::new();
1991        let grid = g.to_grid("site").unwrap();
1992        assert!(grid.is_empty());
1993    }
1994
1995    // ── Update re-indexes correctly ──
1996
1997    #[test]
1998    fn update_reindexes_tags() {
1999        let mut g = EntityGraph::new();
2000        g.add(make_site("site-1")).unwrap();
2001
2002        // Should find the site with "site" filter.
2003        assert_eq!(g.read_all("site", 0).unwrap().len(), 1);
2004
2005        // Remove the "site" marker via update.
2006        let mut changes = HDict::new();
2007        changes.set("site", Kind::Remove);
2008        g.update("site-1", changes).unwrap();
2009
2010        // Should no longer match "site" filter.
2011        assert_eq!(g.read_all("site", 0).unwrap().len(), 0);
2012    }
2013
2014    #[test]
2015    fn update_reindexes_refs() {
2016        let mut g = EntityGraph::new();
2017        g.add(make_site("site-1")).unwrap();
2018        g.add(make_site("site-2")).unwrap();
2019        g.add(make_equip("equip-1", "site-1")).unwrap();
2020
2021        // Initially equip-1 points to site-1.
2022        assert_eq!(g.refs_from("equip-1", None), vec!["site-1".to_string()]);
2023
2024        // Move equip-1 to site-2.
2025        let mut changes = HDict::new();
2026        changes.set("siteRef", Kind::Ref(HRef::from_val("site-2")));
2027        g.update("equip-1", changes).unwrap();
2028
2029        assert_eq!(g.refs_from("equip-1", None), vec!["site-2".to_string()]);
2030        assert!(g.refs_to("site-1", None).is_empty());
2031    }
2032
2033    #[test]
2034    fn update_delta_indexing_preserves_unchanged_tags() {
2035        let mut g = EntityGraph::new();
2036        let mut e = HDict::new();
2037        e.set("id", Kind::Ref(HRef::from_val("p-1")));
2038        e.set("point", Kind::Marker);
2039        e.set("sensor", Kind::Marker);
2040        e.set("curVal", Kind::Number(Number::unitless(72.0)));
2041        e.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
2042        g.add(e).unwrap();
2043
2044        // Update only curVal — point, sensor, siteRef should remain indexed.
2045        let mut changes = HDict::new();
2046        changes.set("curVal", Kind::Number(Number::unitless(75.0)));
2047        g.update("p-1", changes).unwrap();
2048
2049        // Verify tag bitmap still has point and sensor.
2050        let results = g.read_all("point and sensor", 0).unwrap();
2051        assert_eq!(results.len(), 1);
2052
2053        // Verify ref adjacency still works.
2054        let refs = g.refs_from("p-1", Some("siteRef"));
2055        assert_eq!(refs, vec!["site-1".to_string()]);
2056
2057        // Verify value index has the new curVal.
2058        let results = g.read_all("curVal >= 74", 0).unwrap();
2059        assert_eq!(results.len(), 1);
2060    }
2061
2062    // ── Dangling ref validation ──
2063
2064    #[test]
2065    fn validate_detects_dangling_refs() {
2066        let mut g = EntityGraph::new();
2067        g.add(make_site("site-1")).unwrap();
2068        // equip-1 has siteRef pointing to "site-1" (exists) — no issue
2069        g.add(make_equip("equip-1", "site-1")).unwrap();
2070        // equip-2 has siteRef pointing to "site-999" (does not exist) — dangling
2071        g.add(make_equip("equip-2", "site-999")).unwrap();
2072
2073        let issues = g.validate();
2074        assert!(!issues.is_empty());
2075
2076        let dangling: Vec<_> = issues
2077            .iter()
2078            .filter(|i| i.issue_type == "dangling_ref")
2079            .collect();
2080        assert_eq!(dangling.len(), 1);
2081        assert_eq!(dangling[0].entity.as_deref(), Some("equip-2"));
2082        assert!(dangling[0].detail.contains("site-999"));
2083        assert!(dangling[0].detail.contains("siteRef"));
2084    }
2085
2086    // ── Empty filter exports all ──
2087
2088    #[test]
2089    fn to_grid_empty_filter_exports_all() {
2090        let mut g = EntityGraph::new();
2091        g.add(make_site("site-1")).unwrap();
2092        g.add(make_equip("equip-1", "site-1")).unwrap();
2093        g.add(make_point("pt-1", "equip-1")).unwrap();
2094
2095        let grid = g.to_grid("").unwrap();
2096        assert_eq!(grid.len(), 3);
2097        assert!(grid.col("id").is_some());
2098    }
2099
    // ── Changelog bounding and from_grid input validation ──
2101
2102    #[test]
2103    fn changelog_bounded_to_max_size() {
2104        // Use a small capacity to test bounding without 50K iterations.
2105        let mut graph = EntityGraph::with_changelog_capacity(100);
2106        for i in 0..200 {
2107            let mut d = HDict::new();
2108            d.set("id", Kind::Ref(HRef::from_val(format!("e{i}"))));
2109            d.set("dis", Kind::Str(format!("Entity {i}")));
2110            graph.add(d).unwrap();
2111        }
2112        // Changelog should be capped at capacity.
2113        // Requesting since floor_version should succeed.
2114        let floor = graph.floor_version();
2115        assert!(floor > 0);
2116        let changes = graph.changes_since(floor).unwrap();
2117        assert!(changes.len() <= 100);
2118        // Latest changes should still be present.
2119        assert!(graph.changes_since(199).unwrap().len() <= 1);
2120        // Old versions should return gap.
2121        assert!(graph.changes_since(0).is_err());
2122    }
2123
2124    #[test]
2125    fn from_grid_skips_rows_without_id() {
2126        let cols = vec![HCol::new("id"), HCol::new("dis"), HCol::new("site")];
2127
2128        let mut row_with_id = HDict::new();
2129        row_with_id.set("id", Kind::Ref(HRef::from_val("site-1")));
2130        row_with_id.set("site", Kind::Marker);
2131        row_with_id.set("dis", Kind::Str("Has ID".into()));
2132
2133        // Row with string id (not a Ref) — should be skipped.
2134        let mut row_bad_id = HDict::new();
2135        row_bad_id.set("id", Kind::Str("not-a-ref".into()));
2136        row_bad_id.set("dis", Kind::Str("Bad ID".into()));
2137
2138        // Row with no id at all — should be skipped.
2139        let mut row_no_id = HDict::new();
2140        row_no_id.set("dis", Kind::Str("No ID".into()));
2141
2142        let grid = HGrid::from_parts(HDict::new(), cols, vec![row_with_id, row_bad_id, row_no_id]);
2143        let g = EntityGraph::from_grid(&grid, None).unwrap();
2144
2145        assert_eq!(g.len(), 1);
2146        assert!(g.contains("site-1"));
2147    }
2148
2149    // ── Graph traversal method tests ──
2150
2151    fn build_hierarchy_graph() -> EntityGraph {
2152        let mut g = EntityGraph::new();
2153        g.add(make_site("s1")).unwrap();
2154        g.add(make_site("s2")).unwrap();
2155        g.add(make_equip("e1", "s1")).unwrap();
2156        g.add(make_equip("e2", "s1")).unwrap();
2157        g.add(make_equip("e3", "s2")).unwrap();
2158        g.add(make_point("p1", "e1")).unwrap();
2159        g.add(make_point("p2", "e1")).unwrap();
2160        g.add(make_point("p3", "e2")).unwrap();
2161        g
2162    }
2163
2164    #[test]
2165    fn all_edges_returns_all_ref_relationships() {
2166        let g = build_hierarchy_graph();
2167        let edges = g.all_edges();
2168        // e1->s1 (siteRef), e2->s1, e3->s2, p1->e1 (equipRef), p2->e1, p3->e2
2169        assert_eq!(edges.len(), 6);
2170        assert!(
2171            edges
2172                .iter()
2173                .any(|(s, t, d)| s == "e1" && t == "siteRef" && d == "s1")
2174        );
2175        assert!(
2176            edges
2177                .iter()
2178                .any(|(s, t, d)| s == "p1" && t == "equipRef" && d == "e1")
2179        );
2180    }
2181
2182    #[test]
2183    fn all_edges_empty_graph() {
2184        let g = EntityGraph::new();
2185        assert!(g.all_edges().is_empty());
2186    }
2187
2188    #[test]
2189    fn neighbors_one_hop() {
2190        let g = build_hierarchy_graph();
2191        let (entities, edges) = g.neighbors("e1", 1, None);
2192        // e1 + s1 (forward via siteRef) + p1, p2 (reverse from equipRef)
2193        let ids: Vec<String> = entities
2194            .iter()
2195            .filter_map(|e| e.id().map(|r| r.val.clone()))
2196            .collect();
2197        assert!(ids.contains(&"e1".to_string()));
2198        assert!(ids.contains(&"s1".to_string()));
2199        assert!(ids.contains(&"p1".to_string()));
2200        assert!(ids.contains(&"p2".to_string()));
2201        assert!(!edges.is_empty());
2202    }
2203
2204    #[test]
2205    fn neighbors_with_ref_type_filter() {
2206        let g = build_hierarchy_graph();
2207        let (entities, edges) = g.neighbors("e1", 1, Some(&["siteRef"]));
2208        // Only forward siteRef edge: e1->s1
2209        let ids: Vec<String> = entities
2210            .iter()
2211            .filter_map(|e| e.id().map(|r| r.val.clone()))
2212            .collect();
2213        assert!(ids.contains(&"e1".to_string()));
2214        assert!(ids.contains(&"s1".to_string()));
2215        // Should not include p1/p2 (those connect via equipRef)
2216        assert!(!ids.contains(&"p1".to_string()));
2217        // Edges should only contain siteRef
2218        assert!(edges.iter().all(|(_, tag, _)| tag == "siteRef"));
2219    }
2220
2221    #[test]
2222    fn neighbors_zero_hops() {
2223        let g = build_hierarchy_graph();
2224        let (entities, edges) = g.neighbors("e1", 0, None);
2225        assert_eq!(entities.len(), 1);
2226        assert!(edges.is_empty());
2227    }
2228
2229    #[test]
2230    fn neighbors_nonexistent_entity() {
2231        let g = build_hierarchy_graph();
2232        let (entities, _) = g.neighbors("nonexistent", 1, None);
2233        assert!(entities.is_empty());
2234    }
2235
2236    #[test]
2237    fn shortest_path_direct() {
2238        let g = build_hierarchy_graph();
2239        let path = g.shortest_path("e1", "s1");
2240        assert_eq!(path, vec!["e1".to_string(), "s1".to_string()]);
2241    }
2242
2243    #[test]
2244    fn shortest_path_two_hops() {
2245        let g = build_hierarchy_graph();
2246        let path = g.shortest_path("p1", "s1");
2247        // p1 -> e1 -> s1
2248        assert_eq!(path.len(), 3);
2249        assert_eq!(path[0], "p1");
2250        assert_eq!(path[2], "s1");
2251    }
2252
2253    #[test]
2254    fn shortest_path_same_node() {
2255        let g = build_hierarchy_graph();
2256        let path = g.shortest_path("s1", "s1");
2257        assert_eq!(path, vec!["s1".to_string()]);
2258    }
2259
2260    #[test]
2261    fn shortest_path_no_connection() {
2262        // s1 and s2 are not connected to each other directly
2263        let g = build_hierarchy_graph();
2264        let path = g.shortest_path("s1", "s2");
2265        // They are disconnected (no edges between them)
2266        assert!(path.is_empty());
2267    }
2268
2269    #[test]
2270    fn shortest_path_nonexistent() {
2271        let g = build_hierarchy_graph();
2272        let path = g.shortest_path("s1", "nonexistent");
2273        assert!(path.is_empty());
2274    }
2275
2276    #[test]
2277    fn subtree_from_site() {
2278        let g = build_hierarchy_graph();
2279        let tree = g.subtree("s1", 10);
2280        // s1 (depth 0), e1 (1), e2 (1), p1 (2), p2 (2), p3 (2)
2281        assert_eq!(tree.len(), 6);
2282        // Root is at depth 0
2283        assert_eq!(tree[0].0.id().unwrap().val, "s1");
2284        assert_eq!(tree[0].1, 0);
2285        // Equips at depth 1
2286        let depth_1: Vec<_> = tree.iter().filter(|(_, d)| *d == 1).collect();
2287        assert_eq!(depth_1.len(), 2);
2288        // Points at depth 2
2289        let depth_2: Vec<_> = tree.iter().filter(|(_, d)| *d == 2).collect();
2290        assert_eq!(depth_2.len(), 3);
2291    }
2292
2293    #[test]
2294    fn subtree_max_depth_1() {
2295        let g = build_hierarchy_graph();
2296        let tree = g.subtree("s1", 1);
2297        // s1 (depth 0), e1 (1), e2 (1) — no points (depth 2)
2298        assert_eq!(tree.len(), 3);
2299    }
2300
2301    #[test]
2302    fn subtree_nonexistent_root() {
2303        let g = build_hierarchy_graph();
2304        let tree = g.subtree("nonexistent", 10);
2305        assert!(tree.is_empty());
2306    }
2307
2308    #[test]
2309    fn subtree_leaf_node() {
2310        let g = build_hierarchy_graph();
2311        let tree = g.subtree("p1", 10);
2312        // p1 has no children referencing it
2313        assert_eq!(tree.len(), 1);
2314        assert_eq!(tree[0].0.id().unwrap().val, "p1");
2315    }
2316
2317    // ── Haystack Hierarchy Helper tests ──
2318
2319    #[test]
2320    fn ref_chain_walks_equip_to_site() {
2321        let g = build_hierarchy_graph();
2322        // p1 → equipRef=e1 → siteRef=s1
2323        let chain = g.ref_chain("p1", &["equipRef", "siteRef"]);
2324        assert_eq!(chain.len(), 2);
2325        assert_eq!(chain[0].id().unwrap().val, "e1");
2326        assert_eq!(chain[1].id().unwrap().val, "s1");
2327    }
2328
2329    #[test]
2330    fn ref_chain_stops_on_missing_tag() {
2331        let g = build_hierarchy_graph();
2332        // e1 has siteRef but no spaceRef — should return just the site.
2333        let chain = g.ref_chain("e1", &["siteRef", "spaceRef"]);
2334        assert_eq!(chain.len(), 1);
2335        assert_eq!(chain[0].id().unwrap().val, "s1");
2336    }
2337
2338    #[test]
2339    fn ref_chain_empty_for_nonexistent() {
2340        let g = build_hierarchy_graph();
2341        let chain = g.ref_chain("nonexistent", &["equipRef"]);
2342        assert!(chain.is_empty());
2343    }
2344
2345    #[test]
2346    fn site_for_returns_site_itself() {
2347        let g = build_hierarchy_graph();
2348        let site = g.site_for("s1").unwrap();
2349        assert_eq!(site.id().unwrap().val, "s1");
2350    }
2351
2352    #[test]
2353    fn site_for_walks_from_point() {
2354        let g = build_hierarchy_graph();
2355        // p1 → equipRef=e1 → siteRef=s1
2356        let site = g.site_for("p1").unwrap();
2357        assert_eq!(site.id().unwrap().val, "s1");
2358    }
2359
2360    #[test]
2361    fn site_for_walks_from_equip() {
2362        let g = build_hierarchy_graph();
2363        let site = g.site_for("e1").unwrap();
2364        assert_eq!(site.id().unwrap().val, "s1");
2365    }
2366
2367    #[test]
2368    fn children_returns_direct_refs() {
2369        let g = build_hierarchy_graph();
2370        let kids = g.children("s1");
2371        // e1 and e2 reference s1 via siteRef.
2372        let ids: Vec<&str> = kids.iter().map(|e| e.id().unwrap().val.as_str()).collect();
2373        assert!(ids.contains(&"e1"));
2374        assert!(ids.contains(&"e2"));
2375    }
2376
2377    #[test]
2378    fn equip_points_returns_points_only() {
2379        let g = build_hierarchy_graph();
2380        let points = g.equip_points("e1", None);
2381        assert_eq!(points.len(), 2); // p1, p2
2382        for p in &points {
2383            assert!(p.has("point"));
2384        }
2385    }
2386
2387    #[test]
2388    fn equip_points_with_filter() {
2389        let mut g = build_hierarchy_graph();
2390        // Existing points already have temp marker. Add one with flow instead.
2391        let mut pf = HDict::new();
2392        pf.set("id", Kind::Ref(HRef::from_val("pf")));
2393        pf.set("point", Kind::Marker);
2394        pf.set("flow", Kind::Marker);
2395        pf.set("equipRef", Kind::Ref(HRef::from_val("e1")));
2396        g.add(pf).unwrap();
2397
2398        let temp_points = g.equip_points("e1", Some("temp"));
2399        // Only p1 and p2 have temp (the existing ones).
2400        assert_eq!(temp_points.len(), 2);
2401        assert!(temp_points.iter().all(|p| p.has("temp")));
2402    }
2403
2404    // ── Hierarchy tree tests ──
2405
2406    #[test]
2407    fn hierarchy_tree_from_site() {
2408        let g = build_hierarchy_graph();
2409        let tree = g.hierarchy_tree("s1", 10).unwrap();
2410        assert_eq!(tree.depth, 0);
2411        assert_eq!(tree.entity.id().unwrap().val, "s1");
2412        // s1 has children e1, e2
2413        assert_eq!(tree.children.len(), 2);
2414        let child_ids: Vec<String> = tree
2415            .children
2416            .iter()
2417            .map(|c| c.entity.id().unwrap().val.clone())
2418            .collect();
2419        assert!(child_ids.contains(&"e1".to_string()));
2420        assert!(child_ids.contains(&"e2".to_string()));
2421        // e1 has children p1, p2
2422        let e1_node = tree
2423            .children
2424            .iter()
2425            .find(|c| c.entity.id().unwrap().val == "e1")
2426            .unwrap();
2427        assert_eq!(e1_node.children.len(), 2);
2428        let point_ids: Vec<String> = e1_node
2429            .children
2430            .iter()
2431            .map(|c| c.entity.id().unwrap().val.clone())
2432            .collect();
2433        assert!(point_ids.contains(&"p1".to_string()));
2434        assert!(point_ids.contains(&"p2".to_string()));
2435    }
2436
2437    #[test]
2438    fn hierarchy_tree_max_depth() {
2439        let g = build_hierarchy_graph();
2440        // depth 0 = root only
2441        let tree = g.hierarchy_tree("s1", 0).unwrap();
2442        assert!(tree.children.is_empty());
2443        // depth 1 = root + direct children
2444        let tree = g.hierarchy_tree("s1", 1).unwrap();
2445        assert_eq!(tree.children.len(), 2);
2446        assert!(tree.children.iter().all(|c| c.children.is_empty()));
2447    }
2448
2449    #[test]
2450    fn hierarchy_tree_missing_root() {
2451        let g = build_hierarchy_graph();
2452        assert!(g.hierarchy_tree("nonexistent", 10).is_none());
2453    }
2454
2455    // ── Classify tests ──
2456
2457    #[test]
2458    fn classify_site() {
2459        let g = build_hierarchy_graph();
2460        assert_eq!(g.classify("s1").unwrap(), "site");
2461    }
2462
2463    #[test]
2464    fn classify_equip() {
2465        let mut g = EntityGraph::new();
2466        let mut d = HDict::new();
2467        d.set("id", Kind::Ref(HRef::from_val("ahu-1")));
2468        d.set("equip", Kind::Marker);
2469        d.set("ahu", Kind::Marker);
2470        g.add(d).unwrap();
2471        assert_eq!(g.classify("ahu-1").unwrap(), "ahu");
2472    }
2473
2474    #[test]
2475    fn classify_point() {
2476        let g = build_hierarchy_graph();
2477        // Points have point + sensor + temp markers; sensor is most specific.
2478        assert_eq!(g.classify("p1").unwrap(), "sensor");
2479    }
2480
2481    #[test]
2482    fn classify_unknown() {
2483        let mut g = EntityGraph::new();
2484        let mut d = HDict::new();
2485        d.set("id", Kind::Ref(HRef::from_val("x1")));
2486        d.set("custom", Kind::Marker);
2487        g.add(d).unwrap();
2488        assert!(g.classify("x1").is_none());
2489    }
2490
2491    #[test]
2492    fn changes_since_binary_search_equivalence() {
2493        let mut g = EntityGraph::new();
2494        for i in 0..100 {
2495            let mut e = HDict::new();
2496            e.set("id", Kind::Ref(HRef::from_val(format!("e-{i}"))));
2497            e.set("site", Kind::Marker);
2498            g.add(e).unwrap();
2499        }
2500        // After 100 adds, version is 100.
2501        // changes_since(50) should return versions 51..=100 (50 entries).
2502        let changes = g.changes_since(50).unwrap();
2503        assert_eq!(changes.len(), 50);
2504        assert_eq!(changes.first().unwrap().version, 51);
2505        assert_eq!(changes.last().unwrap().version, 100);
2506    }
2507
2508    #[test]
2509    fn add_bulk_skips_changelog() {
2510        let mut g = EntityGraph::new();
2511        for i in 0..10 {
2512            let mut e = HDict::new();
2513            e.set("id", Kind::Ref(HRef::from_val(format!("e-{i}"))));
2514            e.set("site", Kind::Marker);
2515            g.add_bulk(e).unwrap();
2516        }
2517        assert_eq!(g.len(), 10);
2518        assert_eq!(g.version(), 0); // version not bumped during bulk load
2519        assert!(g.changes_since(0).unwrap().is_empty()); // no changelog entries
2520    }
2521}