Skip to main content

haystack_core/graph/
columnar.rs

1// Columnar entity storage — struct-of-arrays layout for cache-friendly scans.
2//
3// Provides an auxiliary column-oriented view of entity data, indexed by
4// entity numeric ID (same IDs used in bitmap/value indexes). This enables
5// SIMD-friendly, cache-line-efficient iteration over a single tag across
6// all entities — ideal for scan-heavy filter queries.
7//
8// This is a **supplementary** store; the authoritative data remains in the
9// HashMap<String, HDict>. Columns are lazily populated and rebuilt on demand.
10
11use std::collections::HashMap;
12
13use crate::kinds::Kind;
14
15/// Column-oriented storage for entity tags, indexed by entity ID.
16///
17/// Each "column" is a `Vec<Option<Kind>>` where index `i` corresponds to
18/// entity ID `i`. Missing entities or missing tags for that entity are `None`.
19///
20/// Benefits:
21/// - Sequential memory access when scanning a single tag across all entities
22/// - Cache-line prefetching works optimally (no pointer chasing through HDict)
23/// - Natural alignment with bitmap indexes (same entity ID space)
24pub struct ColumnarStore {
25    /// Tag name → column of values indexed by entity ID.
26    columns: HashMap<String, Vec<Option<Kind>>>,
27    /// Set of tag names that are actively tracked as columns.
28    tracked_tags: Vec<String>,
29    /// Allocated capacity (max entity ID + 1).
30    capacity: usize,
31}
32
33impl ColumnarStore {
34    pub fn new() -> Self {
35        Self {
36            columns: HashMap::new(),
37            tracked_tags: Vec::new(),
38            capacity: 0,
39        }
40    }
41
42    /// Register a tag to be tracked as a column. Must call `rebuild()` after
43    /// registering new tags to populate from existing entities.
44    pub fn track_tag(&mut self, tag: &str) {
45        if !self.tracked_tags.iter().any(|t| t == tag) {
46            self.tracked_tags.push(tag.to_string());
47            self.columns
48                .insert(tag.to_string(), vec![None; self.capacity]);
49        }
50    }
51
52    /// Returns true if the given tag is tracked as a column.
53    pub fn is_tracked(&self, tag: &str) -> bool {
54        self.columns.contains_key(tag)
55    }
56
57    /// Set the value for entity `eid` at the given tag column.
58    pub fn set(&mut self, eid: usize, tag: &str, value: &Kind) {
59        if let Some(col) = self.columns.get_mut(tag) {
60            if eid >= col.len() {
61                col.resize(eid + 1, None);
62                if eid >= self.capacity {
63                    self.capacity = eid + 1;
64                }
65            }
66            col[eid] = Some(value.clone());
67        }
68    }
69
70    /// Clear the value for entity `eid` at the given tag column.
71    pub fn clear_entity(&mut self, eid: usize) {
72        for col in self.columns.values_mut() {
73            if eid < col.len() {
74                col[eid] = None;
75            }
76        }
77    }
78
79    /// Ensure all columns can hold at least `new_capacity` entries.
80    pub fn ensure_capacity(&mut self, new_capacity: usize) {
81        if new_capacity > self.capacity {
82            for col in self.columns.values_mut() {
83                col.resize(new_capacity, None);
84            }
85            self.capacity = new_capacity;
86        }
87    }
88
89    /// Get a column slice for a tracked tag. Returns None if the tag is not tracked.
90    pub fn column(&self, tag: &str) -> Option<&[Option<Kind>]> {
91        self.columns.get(tag).map(|c| c.as_slice())
92    }
93
94    /// Get the value for a specific entity and tag.
95    pub fn get(&self, eid: usize, tag: &str) -> Option<&Kind> {
96        self.columns.get(tag)?.get(eid).and_then(|opt| opt.as_ref())
97    }
98
99    /// Scan a column and return entity IDs where the predicate matches.
100    /// This is the primary performance advantage: sequential memory access.
101    pub fn scan_column<F>(&self, tag: &str, predicate: F) -> Vec<usize>
102    where
103        F: Fn(&Kind) -> bool,
104    {
105        match self.columns.get(tag) {
106            Some(col) => col
107                .iter()
108                .enumerate()
109                .filter_map(|(eid, val)| val.as_ref().filter(|v| predicate(v)).map(|_| eid))
110                .collect(),
111            None => Vec::new(),
112        }
113    }
114
115    /// Number of tracked columns.
116    pub fn column_count(&self) -> usize {
117        self.columns.len()
118    }
119
120    /// Current capacity (max entity IDs).
121    pub fn capacity(&self) -> usize {
122        self.capacity
123    }
124
125    /// Clear all column data (keeps tracked tags registered).
126    pub fn clear(&mut self) {
127        for col in self.columns.values_mut() {
128            col.fill(None);
129        }
130    }
131
132    /// Tracked tag names.
133    pub fn tracked_tags(&self) -> &[String] {
134        &self.tracked_tags
135    }
136}
137
138impl Default for ColumnarStore {
139    fn default() -> Self {
140        Self::new()
141    }
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kinds::Number;

    // Values are checked with `matches!` rather than `assert_eq!` so the tests
    // do not depend on `Kind` implementing `PartialEq` or `Debug`.

    #[test]
    fn track_and_set_values() {
        let mut store = ColumnarStore::new();
        store.track_tag("temp");
        store.ensure_capacity(3);

        store.set(0, "temp", &Kind::Number(Number::unitless(72.0)));
        store.set(2, "temp", &Kind::Number(Number::unitless(68.5)));

        assert!(matches!(store.get(0, "temp"), Some(Kind::Number(n)) if n.val == 72.0));
        assert!(store.get(1, "temp").is_none()); // Entity 1 has no temp
        assert!(matches!(store.get(2, "temp"), Some(Kind::Number(n)) if n.val == 68.5));
    }

    #[test]
    fn set_overwrites_previous_value() {
        let mut store = ColumnarStore::new();
        store.track_tag("temp");
        store.ensure_capacity(1);

        store.set(0, "temp", &Kind::Number(Number::unitless(72.0)));
        store.set(0, "temp", &Kind::Number(Number::unitless(68.5)));

        assert!(matches!(store.get(0, "temp"), Some(Kind::Number(n)) if n.val == 68.5));
    }

    #[test]
    fn scan_column_numeric() {
        let mut store = ColumnarStore::new();
        store.track_tag("temp");
        store.ensure_capacity(5);

        store.set(0, "temp", &Kind::Number(Number::unitless(72.0)));
        store.set(1, "temp", &Kind::Number(Number::unitless(68.5)));
        store.set(2, "temp", &Kind::Number(Number::unitless(75.0)));
        store.set(3, "temp", &Kind::Number(Number::unitless(65.0)));

        let above_70: Vec<usize> = store.scan_column("temp", |k| match k {
            Kind::Number(n) => n.val > 70.0,
            _ => false,
        });
        assert_eq!(above_70, vec![0, 2]); // 72.0 and 75.0
    }

    #[test]
    fn scan_column_string() {
        let mut store = ColumnarStore::new();
        store.track_tag("dis");
        store.ensure_capacity(3);

        store.set(0, "dis", &Kind::Str("Building A".to_string()));
        store.set(1, "dis", &Kind::Str("Building B".to_string()));
        store.set(2, "dis", &Kind::Str("AHU-1".to_string()));

        let buildings: Vec<usize> = store.scan_column("dis", |k| match k {
            Kind::Str(s) => s.starts_with("Building"),
            _ => false,
        });
        assert_eq!(buildings, vec![0, 1]);
    }

    #[test]
    fn clear_entity() {
        let mut store = ColumnarStore::new();
        store.track_tag("temp");
        store.track_tag("dis");
        store.ensure_capacity(2);

        store.set(0, "temp", &Kind::Number(Number::unitless(72.0)));
        store.set(0, "dis", &Kind::Str("Sensor 1".to_string()));
        store.set(1, "temp", &Kind::Number(Number::unitless(60.0)));

        store.clear_entity(0);
        assert!(store.get(0, "temp").is_none());
        assert!(store.get(0, "dis").is_none());
        // Other entities are untouched.
        assert!(matches!(store.get(1, "temp"), Some(Kind::Number(n)) if n.val == 60.0));
    }

    #[test]
    fn out_of_range_access_is_safe() {
        let mut store = ColumnarStore::new();
        store.track_tag("temp");
        store.ensure_capacity(2);

        assert!(store.get(50, "temp").is_none());
        store.clear_entity(50); // Must not panic or grow the store.
        assert_eq!(store.capacity(), 2);
    }

    #[test]
    fn untracked_tag_ignored() {
        let mut store = ColumnarStore::new();
        store.track_tag("temp");

        store.set(0, "humidity", &Kind::Number(Number::unitless(50.0)));
        assert!(store.get(0, "humidity").is_none());
        assert!(!store.is_tracked("humidity"));
        assert!(store.column("humidity").is_none());
    }

    #[test]
    fn track_tag_is_idempotent() {
        let mut store = ColumnarStore::new();
        store.track_tag("temp");
        store.set(0, "temp", &Kind::Number(Number::unitless(72.0)));

        // Re-registering must neither duplicate the tag nor wipe its data.
        store.track_tag("temp");

        assert_eq!(store.column_count(), 1);
        assert_eq!(store.tracked_tags().to_vec(), vec!["temp".to_string()]);
        assert!(matches!(store.get(0, "temp"), Some(Kind::Number(n)) if n.val == 72.0));
    }

    #[test]
    fn tracked_tags_keep_registration_order() {
        let mut store = ColumnarStore::new();
        store.track_tag("temp");
        store.track_tag("dis");

        assert_eq!(store.column_count(), 2);
        assert_eq!(
            store.tracked_tags().to_vec(),
            vec!["temp".to_string(), "dis".to_string()]
        );
    }

    #[test]
    fn new_column_matches_existing_capacity() {
        let mut store = ColumnarStore::new();
        store.ensure_capacity(4);
        store.track_tag("temp");

        assert_eq!(store.column("temp").map(|c| c.len()), Some(4));
    }

    #[test]
    fn auto_extend_capacity() {
        let mut store = ColumnarStore::new();
        store.track_tag("temp");

        // Set entity beyond initial capacity — should auto-extend.
        store.set(100, "temp", &Kind::Number(Number::unitless(72.0)));
        assert!(matches!(store.get(100, "temp"), Some(Kind::Number(n)) if n.val == 72.0));
        assert!(store.capacity() >= 101);
    }

    #[test]
    fn column_returns_slice() {
        let mut store = ColumnarStore::new();
        store.track_tag("temp");
        store.ensure_capacity(3);
        store.set(1, "temp", &Kind::Number(Number::unitless(72.0)));

        let col = store.column("temp").unwrap();
        assert_eq!(col.len(), 3);
        assert!(col[0].is_none());
        assert!(col[1].is_some());
        assert!(col[2].is_none());
    }

    #[test]
    fn clear_keeps_tags_and_length() {
        let mut store = ColumnarStore::new();
        store.track_tag("temp");
        store.ensure_capacity(2);
        store.set(1, "temp", &Kind::Number(Number::unitless(72.0)));

        store.clear();

        assert!(store.get(1, "temp").is_none());
        assert!(store.is_tracked("temp"));
        assert_eq!(store.capacity(), 2);
        assert_eq!(store.column("temp").map(|c| c.len()), Some(2));
    }

    #[test]
    fn scan_empty_column() {
        let mut store = ColumnarStore::new();
        store.track_tag("temp");
        let results = store.scan_column("temp", |_| true);
        assert!(results.is_empty());
    }

    #[test]
    fn scan_untracked_column() {
        let store = ColumnarStore::new();
        let results = store.scan_column("nonexistent", |_| true);
        assert!(results.is_empty());
    }
}