grafeo_core/graph/lpg/store/search.rs
1use super::{LpgStore, value_in_range};
2use crate::graph::lpg::property::CompareOp;
3use crate::index::zone_map::ZoneMapEntry;
4use grafeo_common::types::{HashableValue, NodeId, PropertyKey, Value};
5
6impl LpgStore {
7 /// Finds all nodes whose property value falls within a range.
8 ///
9 /// Uses zone maps to skip the scan entirely when no values could possibly
10 /// match. This is the primary building block for range predicates in query
11 /// execution.
12 ///
13 /// # Arguments
14 ///
15 /// * `property` - The property key to check
16 /// * `min` - Optional lower bound
17 /// * `max` - Optional upper bound
18 /// * `min_inclusive` - Whether the lower bound is inclusive
19 /// * `max_inclusive` - Whether the upper bound is inclusive
20 ///
21 /// # Example
22 ///
23 /// ```
24 /// use grafeo_core::graph::lpg::LpgStore;
25 /// use grafeo_common::types::Value;
26 ///
27 /// let store = LpgStore::new();
28 /// let n1 = store.create_node(&["Person"]);
29 /// let n2 = store.create_node(&["Person"]);
30 /// store.set_node_property(n1, "age", Value::from(25i64));
31 /// store.set_node_property(n2, "age", Value::from(35i64));
32 ///
33 /// // Find nodes where age > 30
34 /// let result = store.find_nodes_in_range(
35 /// "age",
36 /// Some(&Value::from(30i64)),
37 /// None,
38 /// false, // exclusive lower bound
39 /// true, // inclusive upper bound (doesn't matter since None)
40 /// );
41 /// assert_eq!(result.len(), 1); // Only n2 matches
42 /// ```
43 #[must_use]
44 pub fn find_nodes_in_range(
45 &self,
46 property: &str,
47 min: Option<&Value>,
48 max: Option<&Value>,
49 min_inclusive: bool,
50 max_inclusive: bool,
51 ) -> Vec<NodeId> {
52 let key = PropertyKey::new(property);
53
54 // Check zone map first - if no values could match, return empty
55 if !self
56 .node_properties
57 .might_match_range(&key, min, max, min_inclusive, max_inclusive)
58 {
59 return Vec::new();
60 }
61
62 // Scan all nodes and filter by range
63 self.node_ids()
64 .into_iter()
65 .filter(|&node_id| {
66 self.node_properties
67 .get(node_id, &key)
68 .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
69 })
70 .collect()
71 }
72
73 /// Finds nodes matching multiple property equality conditions.
74 ///
75 /// This is more efficient than intersecting multiple single-property lookups
76 /// because it can use indexes when available and short-circuits on the first
77 /// miss.
78 ///
79 /// # Example
80 ///
81 /// ```
82 /// use grafeo_core::graph::lpg::LpgStore;
83 /// use grafeo_common::types::Value;
84 ///
85 /// let store = LpgStore::new();
86 /// let alice = store.create_node(&["Person"]);
87 /// store.set_node_property(alice, "name", Value::from("Alice"));
88 /// store.set_node_property(alice, "city", Value::from("NYC"));
89 ///
90 /// // Find nodes where name = "Alice" AND city = "NYC"
91 /// let matches = store.find_nodes_by_properties(&[
92 /// ("name", Value::from("Alice")),
93 /// ("city", Value::from("NYC")),
94 /// ]);
95 /// assert!(matches.contains(&alice));
96 /// ```
97 #[must_use]
98 pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
99 if conditions.is_empty() {
100 return self.node_ids();
101 }
102
103 // Find the most selective condition (smallest result set) to start
104 // If any condition has an index, use that first
105 let mut best_start: Option<(usize, Vec<NodeId>)> = None;
106 let indexes = self.property_indexes.read();
107
108 for (i, (prop, value)) in conditions.iter().enumerate() {
109 let key = PropertyKey::new(*prop);
110 let hv = HashableValue::new(value.clone());
111
112 if let Some(index) = indexes.get(&key) {
113 let matches: Vec<NodeId> = index
114 .get(&hv)
115 .map(|nodes| nodes.iter().copied().collect())
116 .unwrap_or_default();
117
118 // Short-circuit if any indexed condition has no matches
119 if matches.is_empty() {
120 return Vec::new();
121 }
122
123 // Use smallest indexed result as starting point
124 if best_start
125 .as_ref()
126 .is_none_or(|(_, best)| matches.len() < best.len())
127 {
128 best_start = Some((i, matches));
129 }
130 }
131 }
132 drop(indexes);
133
134 // Start from best indexed result or fall back to full node scan
135 let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
136 // No indexes available, start with first condition via full scan
137 let (prop, value) = &conditions[0];
138 (0, self.find_nodes_by_property(prop, value))
139 });
140
141 // Filter candidates through remaining conditions
142 for (i, (prop, value)) in conditions.iter().enumerate() {
143 if i == start_idx {
144 continue;
145 }
146
147 let key = PropertyKey::new(*prop);
148 candidates.retain(|&node_id| {
149 self.node_properties
150 .get(node_id, &key)
151 .is_some_and(|v| v == *value)
152 });
153
154 // Short-circuit if no candidates remain
155 if candidates.is_empty() {
156 return Vec::new();
157 }
158 }
159
160 candidates
161 }
162
163 /// Finds all nodes that have a specific property value.
164 ///
165 /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
166 /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
167 ///
168 /// # Example
169 ///
170 /// ```
171 /// use grafeo_core::graph::lpg::LpgStore;
172 /// use grafeo_common::types::Value;
173 ///
174 /// let store = LpgStore::new();
175 /// store.create_property_index("city"); // Optional but makes lookups fast
176 ///
177 /// let alice = store.create_node(&["Person"]);
178 /// let bob = store.create_node(&["Person"]);
179 /// store.set_node_property(alice, "city", Value::from("NYC"));
180 /// store.set_node_property(bob, "city", Value::from("NYC"));
181 ///
182 /// let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
183 /// assert_eq!(nyc_people.len(), 2);
184 /// ```
185 #[must_use]
186 pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
187 let key = PropertyKey::new(property);
188 let hv = HashableValue::new(value.clone());
189
190 // Try indexed lookup first
191 let indexes = self.property_indexes.read();
192 if let Some(index) = indexes.get(&key) {
193 if let Some(nodes) = index.get(&hv) {
194 return nodes.iter().copied().collect();
195 }
196 return Vec::new();
197 }
198 drop(indexes);
199
200 // Fall back to full scan
201 self.node_ids()
202 .into_iter()
203 .filter(|&node_id| {
204 self.node_properties
205 .get(node_id, &key)
206 .is_some_and(|v| v == *value)
207 })
208 .collect()
209 }
210
211 /// Finds nodes whose property matches an operator filter.
212 ///
213 /// The `filter_value` is either a scalar (equality) or a `Value::Map` with
214 /// `$`-prefixed operator keys like `$gt`, `$lt`, `$gte`, `$lte`, `$in`,
215 /// `$nin`, `$ne`, `$contains`.
216 pub fn find_nodes_matching_filter(&self, property: &str, filter_value: &Value) -> Vec<NodeId> {
217 let key = PropertyKey::new(property);
218 self.node_ids()
219 .into_iter()
220 .filter(|&node_id| {
221 self.node_properties
222 .get(node_id, &key)
223 .is_some_and(|v| Self::matches_filter(&v, filter_value))
224 })
225 .collect()
226 }
227
228 /// Checks if a node property value matches a filter value.
229 ///
230 /// - Scalar filter: equality check
231 /// - Map filter with `$`-prefixed keys: operator evaluation
232 fn matches_filter(node_value: &Value, filter_value: &Value) -> bool {
233 match filter_value {
234 Value::Map(ops) if ops.keys().any(|k| k.as_str().starts_with('$')) => {
235 ops.iter().all(|(op_key, op_val)| {
236 match op_key.as_str() {
237 "$gt" => {
238 Self::compare_values(node_value, op_val)
239 == Some(std::cmp::Ordering::Greater)
240 }
241 "$gte" => matches!(
242 Self::compare_values(node_value, op_val),
243 Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
244 ),
245 "$lt" => {
246 Self::compare_values(node_value, op_val)
247 == Some(std::cmp::Ordering::Less)
248 }
249 "$lte" => matches!(
250 Self::compare_values(node_value, op_val),
251 Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
252 ),
253 "$ne" => node_value != op_val,
254 "$in" => match op_val {
255 Value::List(items) => items.iter().any(|v| v == node_value),
256 _ => false,
257 },
258 "$nin" => match op_val {
259 Value::List(items) => !items.iter().any(|v| v == node_value),
260 _ => true,
261 },
262 "$contains" => match (node_value, op_val) {
263 (Value::String(a), Value::String(b)) => a.contains(b.as_str()),
264 _ => false,
265 },
266 _ => false, // Unknown operator — no match
267 }
268 })
269 }
270 _ => node_value == filter_value, // Equality (backward compatible)
271 }
272 }
273
274 /// Compares two values for ordering (cross-type numeric comparison supported).
275 fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
276 match (a, b) {
277 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
278 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
279 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
280 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
281 (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
282 (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
283 (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
284 (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
285 (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
286 _ => None,
287 }
288 }
289
290 // === Zone Map Support ===
291
292 /// Checks if a node property predicate might match any nodes.
293 ///
294 /// Uses zone maps for early filtering. Returns `true` if there might be
295 /// matching nodes, `false` if there definitely aren't.
296 #[must_use]
297 pub fn node_property_might_match(
298 &self,
299 property: &PropertyKey,
300 op: CompareOp,
301 value: &Value,
302 ) -> bool {
303 self.node_properties.might_match(property, op, value)
304 }
305
306 /// Checks if an edge property predicate might match any edges.
307 #[must_use]
308 pub fn edge_property_might_match(
309 &self,
310 property: &PropertyKey,
311 op: CompareOp,
312 value: &Value,
313 ) -> bool {
314 self.edge_properties.might_match(property, op, value)
315 }
316
317 /// Gets the zone map for a node property.
318 #[must_use]
319 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
320 self.node_properties.zone_map(property)
321 }
322
323 /// Gets the zone map for an edge property.
324 #[must_use]
325 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
326 self.edge_properties.zone_map(property)
327 }
328
329 /// Rebuilds zone maps for all properties.
330 #[doc(hidden)]
331 pub fn rebuild_zone_maps(&self) {
332 self.node_properties.rebuild_zone_maps();
333 self.edge_properties.rebuild_zone_maps();
334 }
335}