Skip to main content

reddb_server/storage/unified/devx/
query.rs

1//! Query Builder and Query Types
2//!
3//! Fluent query API for multi-modal queries across tables, graphs, and vectors.
4
5use std::collections::HashMap;
6use std::sync::Arc;
7
8use super::super::{EntityData, EntityId, EntityKind, MetadataValue, UnifiedEntity, UnifiedStore};
9use super::error::DevXError;
10use super::helpers::cosine_similarity;
11use crate::storage::schema::Value;
12
13// ============================================================================
14// Query Builder
15// ============================================================================
16
17/// Fluent query builder for multi-modal queries
18pub struct QueryBuilder {
19    store: Arc<UnifiedStore>,
20    collections: Option<Vec<String>>,
21    vector_query: Option<Vec<f32>>,
22    similarity_threshold: f32,
23    property_filters: Vec<(String, PropertyFilter)>,
24    metadata_filters: Vec<(String, MetadataFilter)>,
25    expand_edges: Vec<(String, u32)>, // (edge_label, depth)
26    limit: usize,
27    offset: usize,
28}
29
30impl QueryBuilder {
31    pub(crate) fn new(store: Arc<UnifiedStore>) -> Self {
32        Self {
33            store,
34            collections: None,
35            vector_query: None,
36            similarity_threshold: 0.7,
37            property_filters: Vec::new(),
38            metadata_filters: Vec::new(),
39            expand_edges: Vec::new(),
40            limit: 100,
41            offset: 0,
42        }
43    }
44
45    /// Search in specific collections
46    pub fn collection(mut self, name: impl Into<String>) -> Self {
47        self.collections.get_or_insert(Vec::new()).push(name.into());
48        self
49    }
50
51    /// Search in multiple collections
52    pub fn collections(mut self, names: impl IntoIterator<Item = impl Into<String>>) -> Self {
53        let cols = self.collections.get_or_insert(Vec::new());
54        for name in names {
55            cols.push(name.into());
56        }
57        self
58    }
59
60    /// Vector similarity search
61    pub fn similar_to(mut self, vector: Vec<f32>, threshold: f32) -> Self {
62        self.vector_query = Some(vector);
63        self.similarity_threshold = threshold;
64        self
65    }
66
67    /// Filter by property value (equality)
68    pub fn where_prop(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
69        self.property_filters
70            .push((key.into(), PropertyFilter::Eq(value.into())));
71        self
72    }
73
74    /// Filter by property (greater than)
75    pub fn where_prop_gt(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
76        self.property_filters
77            .push((key.into(), PropertyFilter::Gt(value.into())));
78        self
79    }
80
81    /// Filter by property (less than)
82    pub fn where_prop_lt(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
83        self.property_filters
84            .push((key.into(), PropertyFilter::Lt(value.into())));
85        self
86    }
87
88    /// Filter by property (contains text)
89    pub fn where_prop_contains(
90        mut self,
91        key: impl Into<String>,
92        substr: impl Into<String>,
93    ) -> Self {
94        self.property_filters
95            .push((key.into(), PropertyFilter::Contains(substr.into())));
96        self
97    }
98
99    /// Filter by metadata
100    pub fn where_meta(mut self, key: impl Into<String>, value: impl Into<MetadataValue>) -> Self {
101        self.metadata_filters
102            .push((key.into(), MetadataFilter::Eq(value.into())));
103        self
104    }
105
106    /// Expand through edges
107    pub fn expand(mut self, edge_label: impl Into<String>, depth: u32) -> Self {
108        self.expand_edges.push((edge_label.into(), depth));
109        self
110    }
111
112    /// Set result limit
113    pub fn limit(mut self, limit: usize) -> Self {
114        self.limit = limit;
115        self
116    }
117
118    /// Set offset for pagination
119    pub fn offset(mut self, offset: usize) -> Self {
120        self.offset = offset;
121        self
122    }
123
124    /// Execute the query
125    pub fn execute(mut self) -> Result<QueryResult, DevXError> {
126        let mut results = Vec::new();
127
128        let collections = self
129            .collections
130            .take()
131            .unwrap_or_else(|| self.store.list_collections());
132
133        for col_name in &collections {
134            let manager = match self.store.get_collection(col_name) {
135                Some(m) => m,
136                None => continue,
137            };
138
139            let entities = manager.query_all(|_| true);
140
141            for entity in entities {
142                let mut score = 0.0f32;
143                let mut include = true;
144
145                // Vector similarity
146                if let Some(ref query_vec) = self.vector_query {
147                    let sim = match &entity.data {
148                        EntityData::Vector(v) => cosine_similarity(query_vec, &v.dense),
149                        _ => entity
150                            .embeddings()
151                            .iter()
152                            .map(|e| cosine_similarity(query_vec, &e.vector))
153                            .fold(0.0f32, f32::max),
154                    };
155
156                    if sim < self.similarity_threshold {
157                        include = false;
158                    } else {
159                        score = sim;
160                    }
161                }
162
163                // Property filters
164                if include {
165                    let props = self.extract_properties(&entity);
166                    for (key, filter) in &self.property_filters {
167                        if !filter.matches(props.get(key)) {
168                            include = false;
169                            break;
170                        }
171                    }
172                }
173
174                if include {
175                    results.push(QueryResultItem {
176                        entity,
177                        collection: col_name.clone(),
178                        score,
179                        expanded: Vec::new(),
180                    });
181                }
182            }
183        }
184
185        // Sort by score
186        results.sort_by(|a, b| {
187            b.score
188                .partial_cmp(&a.score)
189                .unwrap_or(std::cmp::Ordering::Equal)
190        });
191
192        // Expand edges
193        if !self.expand_edges.is_empty() {
194            for item in &mut results {
195                for (edge_label, depth) in &self.expand_edges {
196                    let expanded = self.expand_entity(item.entity.id, edge_label, *depth);
197                    item.expanded.extend(expanded);
198                }
199            }
200        }
201
202        // Apply pagination
203        let total = results.len();
204        let results: Vec<_> = results
205            .into_iter()
206            .skip(self.offset)
207            .take(self.limit)
208            .collect();
209
210        Ok(QueryResult {
211            items: results,
212            total,
213            offset: self.offset,
214            limit: self.limit,
215        })
216    }
217
218    fn extract_properties(&self, entity: &UnifiedEntity) -> HashMap<String, Value> {
219        match &entity.data {
220            EntityData::Node(n) => n.properties.clone(),
221            EntityData::Edge(e) => e.properties.clone(),
222            EntityData::Row(r) => r.named.clone().unwrap_or_default(),
223            EntityData::Vector(_) => HashMap::new(),
224            EntityData::TimeSeries(_) => HashMap::new(),
225            EntityData::QueueMessage(_) => HashMap::new(),
226        }
227    }
228
229    fn expand_entity(&self, id: EntityId, edge_label: &str, depth: u32) -> Vec<ExpandedEntity> {
230        if depth == 0 {
231            return Vec::new();
232        }
233
234        let mut expanded = Vec::new();
235        let refs = self.store.get_refs_from(id);
236
237        for (target_id, _ref_type, collection) in refs {
238            if let Some(entity) = self.store.get(&collection, target_id) {
239                // Check if edge matches label
240                let matches = match &entity.kind {
241                    EntityKind::GraphEdge(ref edge) => edge.label == edge_label,
242                    _ => true, // Include non-edge entities
243                };
244
245                if matches {
246                    expanded.push(ExpandedEntity {
247                        entity,
248                        collection,
249                        depth,
250                    });
251
252                    // Recursively expand
253                    if depth > 1 {
254                        let sub_expanded = self.expand_entity(target_id, edge_label, depth - 1);
255                        expanded.extend(sub_expanded);
256                    }
257                }
258            }
259        }
260
261        expanded
262    }
263}
264
265// ============================================================================
266// Filter Types
267// ============================================================================
268
269/// Property filter operations
270#[derive(Debug, Clone)]
271pub enum PropertyFilter {
272    Eq(Value),
273    Gt(Value),
274    Lt(Value),
275    Contains(String),
276}
277
278impl PropertyFilter {
279    pub fn matches(&self, value: Option<&Value>) -> bool {
280        match (self, value) {
281            (PropertyFilter::Eq(expected), Some(actual)) => expected == actual,
282            (PropertyFilter::Contains(substr), Some(Value::Text(s))) => s.contains(substr),
283            (PropertyFilter::Gt(expected), Some(actual)) => match (expected, actual) {
284                (Value::Integer(e), Value::Integer(a)) => a > e,
285                (Value::Float(e), Value::Float(a)) => a > e,
286                _ => false,
287            },
288            (PropertyFilter::Lt(expected), Some(actual)) => match (expected, actual) {
289                (Value::Integer(e), Value::Integer(a)) => a < e,
290                (Value::Float(e), Value::Float(a)) => a < e,
291                _ => false,
292            },
293            _ => false,
294        }
295    }
296}
297
298/// Metadata filter operations
299#[derive(Debug, Clone)]
300pub enum MetadataFilter {
301    Eq(MetadataValue),
302}
303
304// ============================================================================
305// Result Types
306// ============================================================================
307
308/// Query result container
309#[derive(Debug)]
310pub struct QueryResult {
311    pub items: Vec<QueryResultItem>,
312    pub total: usize,
313    pub offset: usize,
314    pub limit: usize,
315}
316
317impl QueryResult {
318    pub fn is_empty(&self) -> bool {
319        self.items.is_empty()
320    }
321
322    pub fn len(&self) -> usize {
323        self.items.len()
324    }
325}
326
327/// Single item in query results
328#[derive(Debug)]
329pub struct QueryResultItem {
330    pub entity: UnifiedEntity,
331    pub collection: String,
332    pub score: f32,
333    pub expanded: Vec<ExpandedEntity>,
334}
335
336/// Expanded entity from graph traversal
337#[derive(Debug)]
338pub struct ExpandedEntity {
339    pub entity: UnifiedEntity,
340    pub collection: String,
341    pub depth: u32,
342}