Skip to main content

grafeo_core/graph/lpg/store/
search.rs

1use super::{LpgStore, value_in_range};
2use crate::graph::lpg::property::CompareOp;
3use crate::index::zone_map::ZoneMapEntry;
4use grafeo_common::types::{HashableValue, NodeId, PropertyKey, Value};
5
6impl LpgStore {
7    /// Finds all nodes whose property value falls within a range.
8    ///
9    /// Uses zone maps to skip the scan entirely when no values could possibly
10    /// match. This is the primary building block for range predicates in query
11    /// execution.
12    ///
13    /// # Arguments
14    ///
15    /// * `property` - The property key to check
16    /// * `min` - Optional lower bound
17    /// * `max` - Optional upper bound
18    /// * `min_inclusive` - Whether the lower bound is inclusive
19    /// * `max_inclusive` - Whether the upper bound is inclusive
20    ///
21    /// # Example
22    ///
23    /// ```
24    /// use grafeo_core::graph::lpg::LpgStore;
25    /// use grafeo_common::types::Value;
26    ///
27    /// let store = LpgStore::new();
28    /// let n1 = store.create_node(&["Person"]);
29    /// let n2 = store.create_node(&["Person"]);
30    /// store.set_node_property(n1, "age", Value::from(25i64));
31    /// store.set_node_property(n2, "age", Value::from(35i64));
32    ///
33    /// // Find nodes where age > 30
34    /// let result = store.find_nodes_in_range(
35    ///     "age",
36    ///     Some(&Value::from(30i64)),
37    ///     None,
38    ///     false, // exclusive lower bound
39    ///     true,  // inclusive upper bound (doesn't matter since None)
40    /// );
41    /// assert_eq!(result.len(), 1); // Only n2 matches
42    /// ```
43    #[must_use]
44    pub fn find_nodes_in_range(
45        &self,
46        property: &str,
47        min: Option<&Value>,
48        max: Option<&Value>,
49        min_inclusive: bool,
50        max_inclusive: bool,
51    ) -> Vec<NodeId> {
52        let key = PropertyKey::new(property);
53
54        // Check zone map first - if no values could match, return empty
55        if !self
56            .node_properties
57            .might_match_range(&key, min, max, min_inclusive, max_inclusive)
58        {
59            return Vec::new();
60        }
61
62        // Scan all nodes and filter by range
63        self.node_ids()
64            .into_iter()
65            .filter(|&node_id| {
66                self.node_properties
67                    .get(node_id, &key)
68                    .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
69            })
70            .collect()
71    }
72
73    /// Finds nodes matching multiple property equality conditions.
74    ///
75    /// This is more efficient than intersecting multiple single-property lookups
76    /// because it can use indexes when available and short-circuits on the first
77    /// miss.
78    ///
79    /// # Example
80    ///
81    /// ```
82    /// use grafeo_core::graph::lpg::LpgStore;
83    /// use grafeo_common::types::Value;
84    ///
85    /// let store = LpgStore::new();
86    /// let alice = store.create_node(&["Person"]);
87    /// store.set_node_property(alice, "name", Value::from("Alice"));
88    /// store.set_node_property(alice, "city", Value::from("NYC"));
89    ///
90    /// // Find nodes where name = "Alice" AND city = "NYC"
91    /// let matches = store.find_nodes_by_properties(&[
92    ///     ("name", Value::from("Alice")),
93    ///     ("city", Value::from("NYC")),
94    /// ]);
95    /// assert!(matches.contains(&alice));
96    /// ```
97    #[must_use]
98    pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
99        if conditions.is_empty() {
100            return self.node_ids();
101        }
102
103        // Find the most selective condition (smallest result set) to start
104        // If any condition has an index, use that first
105        let mut best_start: Option<(usize, Vec<NodeId>)> = None;
106        let indexes = self.property_indexes.read();
107
108        for (i, (prop, value)) in conditions.iter().enumerate() {
109            let key = PropertyKey::new(*prop);
110            let hv = HashableValue::new(value.clone());
111
112            if let Some(index) = indexes.get(&key) {
113                let matches: Vec<NodeId> = index
114                    .get(&hv)
115                    .map(|nodes| nodes.iter().copied().collect())
116                    .unwrap_or_default();
117
118                // Short-circuit if any indexed condition has no matches
119                if matches.is_empty() {
120                    return Vec::new();
121                }
122
123                // Use smallest indexed result as starting point
124                if best_start
125                    .as_ref()
126                    .is_none_or(|(_, best)| matches.len() < best.len())
127                {
128                    best_start = Some((i, matches));
129                }
130            }
131        }
132        drop(indexes);
133
134        // Start from best indexed result or fall back to full node scan
135        let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
136            // No indexes available, start with first condition via full scan
137            let (prop, value) = &conditions[0];
138            (0, self.find_nodes_by_property(prop, value))
139        });
140
141        // Filter candidates through remaining conditions
142        for (i, (prop, value)) in conditions.iter().enumerate() {
143            if i == start_idx {
144                continue;
145            }
146
147            let key = PropertyKey::new(*prop);
148            candidates.retain(|&node_id| {
149                self.node_properties
150                    .get(node_id, &key)
151                    .is_some_and(|v| v == *value)
152            });
153
154            // Short-circuit if no candidates remain
155            if candidates.is_empty() {
156                return Vec::new();
157            }
158        }
159
160        candidates
161    }
162
163    /// Finds all nodes that have a specific property value.
164    ///
165    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
166    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
167    ///
168    /// # Example
169    ///
170    /// ```
171    /// use grafeo_core::graph::lpg::LpgStore;
172    /// use grafeo_common::types::Value;
173    ///
174    /// let store = LpgStore::new();
175    /// store.create_property_index("city"); // Optional but makes lookups fast
176    ///
177    /// let alice = store.create_node(&["Person"]);
178    /// let bob = store.create_node(&["Person"]);
179    /// store.set_node_property(alice, "city", Value::from("NYC"));
180    /// store.set_node_property(bob, "city", Value::from("NYC"));
181    ///
182    /// let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
183    /// assert_eq!(nyc_people.len(), 2);
184    /// ```
185    #[must_use]
186    pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
187        let key = PropertyKey::new(property);
188        let hv = HashableValue::new(value.clone());
189
190        // Try indexed lookup first
191        let indexes = self.property_indexes.read();
192        if let Some(index) = indexes.get(&key) {
193            if let Some(nodes) = index.get(&hv) {
194                return nodes.iter().copied().collect();
195            }
196            return Vec::new();
197        }
198        drop(indexes);
199
200        // Fall back to full scan
201        self.node_ids()
202            .into_iter()
203            .filter(|&node_id| {
204                self.node_properties
205                    .get(node_id, &key)
206                    .is_some_and(|v| v == *value)
207            })
208            .collect()
209    }
210
211    /// Finds nodes whose property matches an operator filter.
212    ///
213    /// The `filter_value` is either a scalar (equality) or a `Value::Map` with
214    /// `$`-prefixed operator keys like `$gt`, `$lt`, `$gte`, `$lte`, `$in`,
215    /// `$nin`, `$ne`, `$contains`.
216    pub fn find_nodes_matching_filter(&self, property: &str, filter_value: &Value) -> Vec<NodeId> {
217        let key = PropertyKey::new(property);
218        self.node_ids()
219            .into_iter()
220            .filter(|&node_id| {
221                self.node_properties
222                    .get(node_id, &key)
223                    .is_some_and(|v| Self::matches_filter(&v, filter_value))
224            })
225            .collect()
226    }
227
228    /// Checks if a node property value matches a filter value.
229    ///
230    /// - Scalar filter: equality check
231    /// - Map filter with `$`-prefixed keys: operator evaluation
232    fn matches_filter(node_value: &Value, filter_value: &Value) -> bool {
233        match filter_value {
234            Value::Map(ops) if ops.keys().any(|k| k.as_str().starts_with('$')) => {
235                ops.iter().all(|(op_key, op_val)| {
236                    match op_key.as_str() {
237                        "$gt" => {
238                            Self::compare_values(node_value, op_val)
239                                == Some(std::cmp::Ordering::Greater)
240                        }
241                        "$gte" => matches!(
242                            Self::compare_values(node_value, op_val),
243                            Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
244                        ),
245                        "$lt" => {
246                            Self::compare_values(node_value, op_val)
247                                == Some(std::cmp::Ordering::Less)
248                        }
249                        "$lte" => matches!(
250                            Self::compare_values(node_value, op_val),
251                            Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
252                        ),
253                        "$ne" => node_value != op_val,
254                        "$in" => match op_val {
255                            Value::List(items) => items.iter().any(|v| v == node_value),
256                            _ => false,
257                        },
258                        "$nin" => match op_val {
259                            Value::List(items) => !items.iter().any(|v| v == node_value),
260                            _ => true,
261                        },
262                        "$contains" => match (node_value, op_val) {
263                            (Value::String(a), Value::String(b)) => a.contains(b.as_str()),
264                            _ => false,
265                        },
266                        _ => false, // Unknown operator — no match
267                    }
268                })
269            }
270            _ => node_value == filter_value, // Equality (backward compatible)
271        }
272    }
273
274    /// Compares two values for ordering (cross-type numeric comparison supported).
275    fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
276        match (a, b) {
277            (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
278            (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
279            (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
280            (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
281            (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
282            (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
283            (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
284            (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
285            (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
286            _ => None,
287        }
288    }
289
290    // === Zone Map Support ===
291
292    /// Checks if a node property predicate might match any nodes.
293    ///
294    /// Uses zone maps for early filtering. Returns `true` if there might be
295    /// matching nodes, `false` if there definitely aren't.
296    #[must_use]
297    pub fn node_property_might_match(
298        &self,
299        property: &PropertyKey,
300        op: CompareOp,
301        value: &Value,
302    ) -> bool {
303        self.node_properties.might_match(property, op, value)
304    }
305
306    /// Checks if an edge property predicate might match any edges.
307    #[must_use]
308    pub fn edge_property_might_match(
309        &self,
310        property: &PropertyKey,
311        op: CompareOp,
312        value: &Value,
313    ) -> bool {
314        self.edge_properties.might_match(property, op, value)
315    }
316
317    /// Gets the zone map for a node property.
318    #[must_use]
319    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
320        self.node_properties.zone_map(property)
321    }
322
323    /// Gets the zone map for an edge property.
324    #[must_use]
325    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
326        self.edge_properties.zone_map(property)
327    }
328
329    /// Rebuilds zone maps for all properties.
330    #[doc(hidden)]
331    pub fn rebuild_zone_maps(&self) {
332        self.node_properties.rebuild_zone_maps();
333        self.edge_properties.rebuild_zone_maps();
334    }
335}