reddb_server/storage/unified/devx/
query.rs1use std::collections::HashMap;
6use std::sync::Arc;
7
8use super::super::{EntityData, EntityId, EntityKind, MetadataValue, UnifiedEntity, UnifiedStore};
9use super::error::DevXError;
10use super::helpers::cosine_similarity;
11use crate::storage::schema::Value;
12
13pub struct QueryBuilder {
19 store: Arc<UnifiedStore>,
20 collections: Option<Vec<String>>,
21 vector_query: Option<Vec<f32>>,
22 similarity_threshold: f32,
23 property_filters: Vec<(String, PropertyFilter)>,
24 metadata_filters: Vec<(String, MetadataFilter)>,
25 expand_edges: Vec<(String, u32)>, limit: usize,
27 offset: usize,
28}
29
30impl QueryBuilder {
31 pub(crate) fn new(store: Arc<UnifiedStore>) -> Self {
32 Self {
33 store,
34 collections: None,
35 vector_query: None,
36 similarity_threshold: 0.7,
37 property_filters: Vec::new(),
38 metadata_filters: Vec::new(),
39 expand_edges: Vec::new(),
40 limit: 100,
41 offset: 0,
42 }
43 }
44
45 pub fn collection(mut self, name: impl Into<String>) -> Self {
47 self.collections.get_or_insert(Vec::new()).push(name.into());
48 self
49 }
50
51 pub fn collections(mut self, names: impl IntoIterator<Item = impl Into<String>>) -> Self {
53 let cols = self.collections.get_or_insert(Vec::new());
54 for name in names {
55 cols.push(name.into());
56 }
57 self
58 }
59
60 pub fn similar_to(mut self, vector: Vec<f32>, threshold: f32) -> Self {
62 self.vector_query = Some(vector);
63 self.similarity_threshold = threshold;
64 self
65 }
66
67 pub fn where_prop(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
69 self.property_filters
70 .push((key.into(), PropertyFilter::Eq(value.into())));
71 self
72 }
73
74 pub fn where_prop_gt(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
76 self.property_filters
77 .push((key.into(), PropertyFilter::Gt(value.into())));
78 self
79 }
80
81 pub fn where_prop_lt(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
83 self.property_filters
84 .push((key.into(), PropertyFilter::Lt(value.into())));
85 self
86 }
87
88 pub fn where_prop_contains(
90 mut self,
91 key: impl Into<String>,
92 substr: impl Into<String>,
93 ) -> Self {
94 self.property_filters
95 .push((key.into(), PropertyFilter::Contains(substr.into())));
96 self
97 }
98
99 pub fn where_meta(mut self, key: impl Into<String>, value: impl Into<MetadataValue>) -> Self {
101 self.metadata_filters
102 .push((key.into(), MetadataFilter::Eq(value.into())));
103 self
104 }
105
106 pub fn expand(mut self, edge_label: impl Into<String>, depth: u32) -> Self {
108 self.expand_edges.push((edge_label.into(), depth));
109 self
110 }
111
112 pub fn limit(mut self, limit: usize) -> Self {
114 self.limit = limit;
115 self
116 }
117
118 pub fn offset(mut self, offset: usize) -> Self {
120 self.offset = offset;
121 self
122 }
123
124 pub fn execute(mut self) -> Result<QueryResult, DevXError> {
126 let mut results = Vec::new();
127
128 let collections = self
129 .collections
130 .take()
131 .unwrap_or_else(|| self.store.list_collections());
132
133 for col_name in &collections {
134 let manager = match self.store.get_collection(col_name) {
135 Some(m) => m,
136 None => continue,
137 };
138
139 let entities = manager.query_all(|_| true);
140
141 for entity in entities {
142 let mut score = 0.0f32;
143 let mut include = true;
144
145 if let Some(ref query_vec) = self.vector_query {
147 let sim = match &entity.data {
148 EntityData::Vector(v) => cosine_similarity(query_vec, &v.dense),
149 _ => entity
150 .embeddings()
151 .iter()
152 .map(|e| cosine_similarity(query_vec, &e.vector))
153 .fold(0.0f32, f32::max),
154 };
155
156 if sim < self.similarity_threshold {
157 include = false;
158 } else {
159 score = sim;
160 }
161 }
162
163 if include {
165 let props = self.extract_properties(&entity);
166 for (key, filter) in &self.property_filters {
167 if !filter.matches(props.get(key)) {
168 include = false;
169 break;
170 }
171 }
172 }
173
174 if include {
175 results.push(QueryResultItem {
176 entity,
177 collection: col_name.clone(),
178 score,
179 expanded: Vec::new(),
180 });
181 }
182 }
183 }
184
185 results.sort_by(|a, b| {
187 b.score
188 .partial_cmp(&a.score)
189 .unwrap_or(std::cmp::Ordering::Equal)
190 });
191
192 if !self.expand_edges.is_empty() {
194 for item in &mut results {
195 for (edge_label, depth) in &self.expand_edges {
196 let expanded = self.expand_entity(item.entity.id, edge_label, *depth);
197 item.expanded.extend(expanded);
198 }
199 }
200 }
201
202 let total = results.len();
204 let results: Vec<_> = results
205 .into_iter()
206 .skip(self.offset)
207 .take(self.limit)
208 .collect();
209
210 Ok(QueryResult {
211 items: results,
212 total,
213 offset: self.offset,
214 limit: self.limit,
215 })
216 }
217
218 fn extract_properties(&self, entity: &UnifiedEntity) -> HashMap<String, Value> {
219 match &entity.data {
220 EntityData::Node(n) => n.properties.clone(),
221 EntityData::Edge(e) => e.properties.clone(),
222 EntityData::Row(r) => r.named.clone().unwrap_or_default(),
223 EntityData::Vector(_) => HashMap::new(),
224 EntityData::TimeSeries(_) => HashMap::new(),
225 EntityData::QueueMessage(_) => HashMap::new(),
226 }
227 }
228
229 fn expand_entity(&self, id: EntityId, edge_label: &str, depth: u32) -> Vec<ExpandedEntity> {
230 if depth == 0 {
231 return Vec::new();
232 }
233
234 let mut expanded = Vec::new();
235 let refs = self.store.get_refs_from(id);
236
237 for (target_id, _ref_type, collection) in refs {
238 if let Some(entity) = self.store.get(&collection, target_id) {
239 let matches = match &entity.kind {
241 EntityKind::GraphEdge(ref edge) => edge.label == edge_label,
242 _ => true, };
244
245 if matches {
246 expanded.push(ExpandedEntity {
247 entity,
248 collection,
249 depth,
250 });
251
252 if depth > 1 {
254 let sub_expanded = self.expand_entity(target_id, edge_label, depth - 1);
255 expanded.extend(sub_expanded);
256 }
257 }
258 }
259 }
260
261 expanded
262 }
263}
264
265#[derive(Debug, Clone)]
271pub enum PropertyFilter {
272 Eq(Value),
273 Gt(Value),
274 Lt(Value),
275 Contains(String),
276}
277
278impl PropertyFilter {
279 pub fn matches(&self, value: Option<&Value>) -> bool {
280 match (self, value) {
281 (PropertyFilter::Eq(expected), Some(actual)) => expected == actual,
282 (PropertyFilter::Contains(substr), Some(Value::Text(s))) => s.contains(substr),
283 (PropertyFilter::Gt(expected), Some(actual)) => match (expected, actual) {
284 (Value::Integer(e), Value::Integer(a)) => a > e,
285 (Value::Float(e), Value::Float(a)) => a > e,
286 _ => false,
287 },
288 (PropertyFilter::Lt(expected), Some(actual)) => match (expected, actual) {
289 (Value::Integer(e), Value::Integer(a)) => a < e,
290 (Value::Float(e), Value::Float(a)) => a < e,
291 _ => false,
292 },
293 _ => false,
294 }
295 }
296}
297
298#[derive(Debug, Clone)]
300pub enum MetadataFilter {
301 Eq(MetadataValue),
302}
303
304#[derive(Debug)]
310pub struct QueryResult {
311 pub items: Vec<QueryResultItem>,
312 pub total: usize,
313 pub offset: usize,
314 pub limit: usize,
315}
316
317impl QueryResult {
318 pub fn is_empty(&self) -> bool {
319 self.items.is_empty()
320 }
321
322 pub fn len(&self) -> usize {
323 self.items.len()
324 }
325}
326
327#[derive(Debug)]
329pub struct QueryResultItem {
330 pub entity: UnifiedEntity,
331 pub collection: String,
332 pub score: f32,
333 pub expanded: Vec<ExpandedEntity>,
334}
335
336#[derive(Debug)]
338pub struct ExpandedEntity {
339 pub entity: UnifiedEntity,
340 pub collection: String,
341 pub depth: u32,
342}