reddb_server/storage/query/engine/registry/
in_memory_impl.rs1use super::*;
2
3impl InMemoryEngine {
4 pub fn new() -> Self {
6 Self {
7 data: Arc::new(HashMap::new()),
8 }
9 }
10
11 pub fn with_data(data: HashMap<String, Vec<Binding>>) -> Self {
13 Self {
14 data: Arc::new(data),
15 }
16 }
17
18 fn execute_bgp(&self, bgp: &OpBGP) -> Box<dyn BindingIterator> {
20 let mut bindings = Vec::new();
22
23 for triple in &bgp.triples {
25 if let Pattern::Uri(pred) = &triple.predicate {
26 if let Some(data) = self.data.get(pred) {
27 bindings.extend(data.clone());
28 }
29 }
30 }
31
32 Box::new(QueryIterBase::new(bindings))
33 }
34
35 fn execute_triple(&self, triple: &OpTriple) -> Box<dyn BindingIterator> {
37 let bgp = OpBGP::from_triples(vec![triple.triple.clone()]);
38 self.execute_bgp(&bgp)
39 }
40
41 fn execute_join(
43 &self,
44 left: Box<dyn BindingIterator>,
45 right_op: Op,
46 ) -> Box<dyn BindingIterator> {
47 let engine = self.clone();
48 let right_vars = right_op.vars();
49
50 Box::new(QueryIterJoin::new(
51 left,
52 move || engine.execute_op(&right_op),
53 right_vars,
54 ))
55 }
56
57 fn execute_filter(
59 &self,
60 sub: Box<dyn BindingIterator>,
61 filter: &FilterExpr,
62 ) -> Box<dyn BindingIterator> {
63 let filter = filter.clone();
64 Box::new(QueryIterFilter::new(sub, move |b| filter.evaluate(b)))
65 }
66
67 fn execute_union(
69 &self,
70 left: Box<dyn BindingIterator>,
71 right: Box<dyn BindingIterator>,
72 ) -> Box<dyn BindingIterator> {
73 Box::new(QueryIterUnion::new(vec![left, right]))
74 }
75
76 fn execute_intersect(
78 &self,
79 left: Box<dyn BindingIterator>,
80 mut right: Box<dyn BindingIterator>,
81 ) -> Box<dyn BindingIterator> {
82 let mut right_bindings: Vec<Binding> = Vec::new();
84 while let Ok(Some(binding)) = right.next_binding() {
85 right_bindings.push(binding);
86 }
87 let right_set: std::collections::HashSet<_> =
88 right_bindings.iter().map(binding_hash).collect();
89
90 Box::new(QueryIterFilter::new(left, move |b| {
92 right_set.contains(&binding_hash(b))
93 }))
94 }
95
96 fn execute_project(
98 &self,
99 sub: Box<dyn BindingIterator>,
100 vars: &[Var],
101 ) -> Box<dyn BindingIterator> {
102 Box::new(QueryIterProject::new(sub, vars.to_vec()))
103 }
104
105 fn execute_distinct(&self, sub: Box<dyn BindingIterator>) -> Box<dyn BindingIterator> {
107 Box::new(QueryIterDistinct::new(sub))
108 }
109
110 fn execute_slice(
112 &self,
113 sub: Box<dyn BindingIterator>,
114 offset: u64,
115 limit: Option<u64>,
116 ) -> Box<dyn BindingIterator> {
117 Box::new(QueryIterSlice::new(sub, offset, limit))
118 }
119
120 fn execute_order(
122 &self,
123 sub: Box<dyn BindingIterator>,
124 keys: &[OrderKey],
125 ) -> Box<dyn BindingIterator> {
126 let sort_keys: Vec<SortKey> = keys
127 .iter()
128 .filter_map(|k| {
129 if let ExprTerm::Var(v) = &k.expr {
130 Some(SortKey {
131 var: v.clone(),
132 ascending: k.ascending,
133 })
134 } else {
135 None
136 }
137 })
138 .collect();
139
140 Box::new(QueryIterSort::new(sub, sort_keys))
141 }
142
143 fn execute_table(&self, table: &OpTable) -> Box<dyn BindingIterator> {
145 let mut bindings = Vec::new();
146
147 for row in &table.rows {
148 let mut builder = crate::storage::query::engine::binding::BindingBuilder::new();
149 for (i, var) in table.vars.iter().enumerate() {
150 if let Some(Some(value)) = row.get(i) {
151 builder = builder.add(var.clone(), value.clone());
152 }
153 }
154 bindings.push(builder.build());
155 }
156
157 Box::new(QueryIterBase::new(bindings))
158 }
159
160 fn collect_bindings(iter: Box<dyn BindingIterator>) -> Vec<Binding> {
161 let query_iter = QueryIter::new(iter);
162 query_iter
163 .collect::<Result<Vec<_>, _>>()
164 .unwrap_or_default()
165 }
166
167 fn execute_group_op(&self, group: &OpGroup) -> Box<dyn BindingIterator> {
168 let sub = self.execute_op(&group.sub_op);
169 let bindings = Self::collect_bindings(sub);
170 let results = Self::group_bindings(bindings, &group.group_vars, &group.aggregates);
171 Box::new(QueryIterBase::new(results))
172 }
173
174 fn execute_extend_op(&self, extend: &OpExtend) -> Box<dyn BindingIterator> {
175 let sub = self.execute_op(&extend.sub_op);
176 let bindings = Self::collect_bindings(sub);
177
178 let result: Vec<Binding> = bindings
179 .into_iter()
180 .filter_map(|binding| {
181 let existing = binding.get(&extend.var).cloned();
182 let evaluated = extend.expr.evaluate(&binding);
183
184 match (existing, evaluated) {
185 (Some(current), Some(value)) => {
186 if current == value {
187 Some(binding)
188 } else {
189 None
190 }
191 }
192 (Some(_), None) => Some(binding),
193 (None, Some(value)) => Some(binding.extend(extend.var.clone(), value)),
194 (None, None) => Some(binding),
195 }
196 })
197 .collect();
198
199 Box::new(QueryIterBase::new(result))
200 }
201
202 fn execute_minus_op(&self, minus: &OpMinus) -> Box<dyn BindingIterator> {
203 let left = Self::collect_bindings(self.execute_op(&minus.left));
204 let right = Self::collect_bindings(self.execute_op(&minus.right));
205
206 let result: Vec<Binding> = left
207 .into_iter()
208 .filter(|binding| {
209 !right.iter().any(|candidate| {
210 bindings_share_vars(binding, candidate)
211 && bindings_compatible(binding, candidate)
212 })
213 })
214 .collect();
215
216 Box::new(QueryIterBase::new(result))
217 }
218
219 fn group_bindings(
220 bindings: Vec<Binding>,
221 group_vars: &[Var],
222 aggregates: &[(Var, Aggregate)],
223 ) -> Vec<Binding> {
224 let mut groups: HashMap<Vec<Option<Value>>, Vec<Binding>> = HashMap::new();
225 let mut group_order: Vec<Vec<Option<Value>>> = Vec::new();
226
227 for binding in bindings {
228 let key_values: Vec<Option<Value>> =
229 group_vars.iter().map(|v| binding.get(v).cloned()).collect();
230
231 if !groups.contains_key(&key_values) {
232 group_order.push(key_values.clone());
233 }
234 groups.entry(key_values).or_default().push(binding);
235 }
236
237 let mut results = Vec::new();
238
239 for key_values in group_order {
240 let Some(group_bindings) = groups.get(&key_values) else {
241 continue;
242 };
243 if group_bindings.is_empty() {
244 continue;
245 }
246
247 let mut result = Binding::empty();
248
249 for (idx, var) in group_vars.iter().enumerate() {
250 if let Some(Some(value)) = key_values.get(idx) {
251 result = result.extend(var.clone(), value.clone());
252 }
253 }
254
255 for (result_var, agg) in aggregates {
256 if let Some(mut aggregator) = Self::build_aggregator(agg) {
257 for binding in group_bindings {
258 let value = Self::aggregate_value(agg, binding);
259 aggregator.accumulate(value.as_ref());
260 }
261 let agg_value = aggregator.finalize();
262 result = result.extend(result_var.clone(), agg_value);
263 }
264 }
265
266 results.push(result);
267 }
268
269 results
270 }
271
272 fn build_aggregator(agg: &Aggregate) -> Option<Box<dyn Aggregator>> {
273 match agg {
274 Aggregate::Count(None) => Some(Box::new(CountAggregator::count_all())),
275 Aggregate::Count(Some(_)) => Some(Box::new(CountAggregator::count_column())),
276 Aggregate::CountDistinct(_) => Some(Box::new(CountDistinctAggregator::new())),
277 Aggregate::Sum(_) => Some(Box::new(SumAggregator::new())),
278 Aggregate::Avg(_) => Some(Box::new(AvgAggregator::new())),
279 Aggregate::Min(_) => Some(Box::new(MinAggregator::new())),
280 Aggregate::Max(_) => Some(Box::new(MaxAggregator::new())),
281 Aggregate::Sample(_) => Some(Box::new(SampleAggregator::new())),
282 Aggregate::GroupConcat(_, sep) => {
283 Some(Box::new(GroupConcatAggregator::new(sep.clone())))
284 }
285 }
286 }
287
288 fn aggregate_value(agg: &Aggregate, binding: &Binding) -> Option<Value> {
289 match agg {
290 Aggregate::Count(None) => None,
291 Aggregate::Count(Some(expr))
292 | Aggregate::CountDistinct(expr)
293 | Aggregate::Sum(expr)
294 | Aggregate::Avg(expr)
295 | Aggregate::Min(expr)
296 | Aggregate::Max(expr)
297 | Aggregate::Sample(expr)
298 | Aggregate::GroupConcat(expr, _) => expr.evaluate(binding),
299 }
300 }
301
302 pub(crate) fn execute_op(&self, op: &Op) -> Box<dyn BindingIterator> {
304 match op {
305 Op::BGP(bgp) => self.execute_bgp(bgp),
306 Op::Triple(triple) => self.execute_triple(triple),
307 Op::Join(join) => {
308 let left = self.execute_op(&join.left);
309 self.execute_join(left, (*join.right).clone())
310 }
311 Op::LeftJoin(lj) => {
312 let left = self.execute_op(&lj.left);
314 self.execute_join(left, (*lj.right).clone())
315 }
316 Op::Filter(filter) => {
317 let sub = self.execute_op(&filter.sub_op);
318 self.execute_filter(sub, &filter.filter)
319 }
320 Op::Union(union) => {
321 let left = self.execute_op(&union.left);
322 let right = self.execute_op(&union.right);
323 self.execute_union(left, right)
324 }
325 Op::Project(project) => {
326 let sub = self.execute_op(&project.sub_op);
327 self.execute_project(sub, &project.vars)
328 }
329 Op::Distinct(distinct) => {
330 let sub = self.execute_op(&distinct.sub_op);
331 self.execute_distinct(sub)
332 }
333 Op::Reduced(reduced) => {
334 let sub = self.execute_op(&reduced.sub_op);
336 self.execute_distinct(sub)
337 }
338 Op::Slice(slice) => {
339 let sub = self.execute_op(&slice.sub_op);
340 self.execute_slice(sub, slice.offset, slice.limit)
341 }
342 Op::Order(order) => {
343 let sub = self.execute_op(&order.sub_op);
344 self.execute_order(sub, &order.keys)
345 }
346 Op::Group(group) => self.execute_group_op(group),
347 Op::Extend(extend) => self.execute_extend_op(extend),
348 Op::Minus(minus) => self.execute_minus_op(minus),
349 Op::RightJoin(rj) => {
350 let left = self.execute_op(&rj.left);
352 let right = self.execute_op(&rj.right);
353 self.execute_join(right, (*rj.left).clone())
355 }
356 Op::CrossJoin(cj) => {
357 let left = self.execute_op(&cj.left);
359 self.execute_join(left, (*cj.right).clone())
360 }
361 Op::Intersect(inter) => {
362 let left = self.execute_op(&inter.left);
364 let right = self.execute_op(&inter.right);
365 self.execute_intersect(left, right)
366 }
367 Op::Table(table) => self.execute_table(table),
368 Op::Sequence(seq) => {
369 if seq.ops.is_empty() {
371 return Box::new(QueryIterBase::single(Binding::empty()));
372 }
373
374 let mut result = self.execute_op(&seq.ops[0]);
375 for op in seq.ops.iter().skip(1) {
376 let right = op.clone();
377 result = self.execute_join(result, right);
378 }
379 result
380 }
381 Op::Disjunction(disj) => {
382 if disj.ops.is_empty() {
384 return Box::new(QueryIterBase::empty());
385 }
386
387 let iters: Vec<Box<dyn BindingIterator>> =
388 disj.ops.iter().map(|op| self.execute_op(op)).collect();
389
390 Box::new(QueryIterUnion::new(iters))
391 }
392 Op::Null(_) => Box::new(QueryIterBase::empty()),
393 }
394 }
395}