Skip to main content

reddb_server/storage/query/engine/registry/in_memory_impl.rs

1use super::*;
2
3impl InMemoryEngine {
4    /// Create empty engine
5    pub fn new() -> Self {
6        Self {
7            data: Arc::new(HashMap::new()),
8        }
9    }
10
11    /// Create with data
12    pub fn with_data(data: HashMap<String, Vec<Binding>>) -> Self {
13        Self {
14            data: Arc::new(data),
15        }
16    }
17
18    /// Execute BGP
19    fn execute_bgp(&self, bgp: &OpBGP) -> Box<dyn BindingIterator> {
20        // For now, return empty. Real impl would lookup in data store
21        let mut bindings = Vec::new();
22
23        // Simple pattern matching simulation
24        for triple in &bgp.triples {
25            if let Pattern::Uri(pred) = &triple.predicate {
26                if let Some(data) = self.data.get(pred) {
27                    bindings.extend(data.clone());
28                }
29            }
30        }
31
32        Box::new(QueryIterBase::new(bindings))
33    }
34
35    /// Execute triple
36    fn execute_triple(&self, triple: &OpTriple) -> Box<dyn BindingIterator> {
37        let bgp = OpBGP::from_triples(vec![triple.triple.clone()]);
38        self.execute_bgp(&bgp)
39    }
40
41    /// Execute join
42    fn execute_join(
43        &self,
44        left: Box<dyn BindingIterator>,
45        right_op: Op,
46    ) -> Box<dyn BindingIterator> {
47        let engine = self.clone();
48        let right_vars = right_op.vars();
49
50        Box::new(QueryIterJoin::new(
51            left,
52            move || engine.execute_op(&right_op),
53            right_vars,
54        ))
55    }
56
57    /// Execute filter
58    fn execute_filter(
59        &self,
60        sub: Box<dyn BindingIterator>,
61        filter: &FilterExpr,
62    ) -> Box<dyn BindingIterator> {
63        let filter = filter.clone();
64        Box::new(QueryIterFilter::new(sub, move |b| filter.evaluate(b)))
65    }
66
67    /// Execute union
68    fn execute_union(
69        &self,
70        left: Box<dyn BindingIterator>,
71        right: Box<dyn BindingIterator>,
72    ) -> Box<dyn BindingIterator> {
73        Box::new(QueryIterUnion::new(vec![left, right]))
74    }
75
76    /// Execute intersect (set intersection)
77    fn execute_intersect(
78        &self,
79        left: Box<dyn BindingIterator>,
80        mut right: Box<dyn BindingIterator>,
81    ) -> Box<dyn BindingIterator> {
82        // Collect right side into a set for O(1) lookups
83        let mut right_bindings: Vec<Binding> = Vec::new();
84        while let Ok(Some(binding)) = right.next_binding() {
85            right_bindings.push(binding);
86        }
87        let right_set: std::collections::HashSet<_> =
88            right_bindings.iter().map(binding_hash).collect();
89
90        // Filter left side to only include bindings that appear in right
91        Box::new(QueryIterFilter::new(left, move |b| {
92            right_set.contains(&binding_hash(b))
93        }))
94    }
95
96    /// Execute project
97    fn execute_project(
98        &self,
99        sub: Box<dyn BindingIterator>,
100        vars: &[Var],
101    ) -> Box<dyn BindingIterator> {
102        Box::new(QueryIterProject::new(sub, vars.to_vec()))
103    }
104
105    /// Execute distinct
106    fn execute_distinct(&self, sub: Box<dyn BindingIterator>) -> Box<dyn BindingIterator> {
107        Box::new(QueryIterDistinct::new(sub))
108    }
109
110    /// Execute slice
111    fn execute_slice(
112        &self,
113        sub: Box<dyn BindingIterator>,
114        offset: u64,
115        limit: Option<u64>,
116    ) -> Box<dyn BindingIterator> {
117        Box::new(QueryIterSlice::new(sub, offset, limit))
118    }
119
120    /// Execute order
121    fn execute_order(
122        &self,
123        sub: Box<dyn BindingIterator>,
124        keys: &[OrderKey],
125    ) -> Box<dyn BindingIterator> {
126        let sort_keys: Vec<SortKey> = keys
127            .iter()
128            .filter_map(|k| {
129                if let ExprTerm::Var(v) = &k.expr {
130                    Some(SortKey {
131                        var: v.clone(),
132                        ascending: k.ascending,
133                    })
134                } else {
135                    None
136                }
137            })
138            .collect();
139
140        Box::new(QueryIterSort::new(sub, sort_keys))
141    }
142
143    /// Execute table
144    fn execute_table(&self, table: &OpTable) -> Box<dyn BindingIterator> {
145        let mut bindings = Vec::new();
146
147        for row in &table.rows {
148            let mut builder = crate::storage::query::engine::binding::BindingBuilder::new();
149            for (i, var) in table.vars.iter().enumerate() {
150                if let Some(Some(value)) = row.get(i) {
151                    builder = builder.add(var.clone(), value.clone());
152                }
153            }
154            bindings.push(builder.build());
155        }
156
157        Box::new(QueryIterBase::new(bindings))
158    }
159
160    fn collect_bindings(iter: Box<dyn BindingIterator>) -> Vec<Binding> {
161        let query_iter = QueryIter::new(iter);
162        query_iter
163            .collect::<Result<Vec<_>, _>>()
164            .unwrap_or_default()
165    }
166
167    fn execute_group_op(&self, group: &OpGroup) -> Box<dyn BindingIterator> {
168        let sub = self.execute_op(&group.sub_op);
169        let bindings = Self::collect_bindings(sub);
170        let results = Self::group_bindings(bindings, &group.group_vars, &group.aggregates);
171        Box::new(QueryIterBase::new(results))
172    }
173
174    fn execute_extend_op(&self, extend: &OpExtend) -> Box<dyn BindingIterator> {
175        let sub = self.execute_op(&extend.sub_op);
176        let bindings = Self::collect_bindings(sub);
177
178        let result: Vec<Binding> = bindings
179            .into_iter()
180            .filter_map(|binding| {
181                let existing = binding.get(&extend.var).cloned();
182                let evaluated = extend.expr.evaluate(&binding);
183
184                match (existing, evaluated) {
185                    (Some(current), Some(value)) => {
186                        if current == value {
187                            Some(binding)
188                        } else {
189                            None
190                        }
191                    }
192                    (Some(_), None) => Some(binding),
193                    (None, Some(value)) => Some(binding.extend(extend.var.clone(), value)),
194                    (None, None) => Some(binding),
195                }
196            })
197            .collect();
198
199        Box::new(QueryIterBase::new(result))
200    }
201
202    fn execute_minus_op(&self, minus: &OpMinus) -> Box<dyn BindingIterator> {
203        let left = Self::collect_bindings(self.execute_op(&minus.left));
204        let right = Self::collect_bindings(self.execute_op(&minus.right));
205
206        let result: Vec<Binding> = left
207            .into_iter()
208            .filter(|binding| {
209                !right.iter().any(|candidate| {
210                    bindings_share_vars(binding, candidate)
211                        && bindings_compatible(binding, candidate)
212                })
213            })
214            .collect();
215
216        Box::new(QueryIterBase::new(result))
217    }
218
219    fn group_bindings(
220        bindings: Vec<Binding>,
221        group_vars: &[Var],
222        aggregates: &[(Var, Aggregate)],
223    ) -> Vec<Binding> {
224        let mut groups: HashMap<Vec<Option<Value>>, Vec<Binding>> = HashMap::new();
225        let mut group_order: Vec<Vec<Option<Value>>> = Vec::new();
226
227        for binding in bindings {
228            let key_values: Vec<Option<Value>> =
229                group_vars.iter().map(|v| binding.get(v).cloned()).collect();
230
231            if !groups.contains_key(&key_values) {
232                group_order.push(key_values.clone());
233            }
234            groups.entry(key_values).or_default().push(binding);
235        }
236
237        let mut results = Vec::new();
238
239        for key_values in group_order {
240            let Some(group_bindings) = groups.get(&key_values) else {
241                continue;
242            };
243            if group_bindings.is_empty() {
244                continue;
245            }
246
247            let mut result = Binding::empty();
248
249            for (idx, var) in group_vars.iter().enumerate() {
250                if let Some(Some(value)) = key_values.get(idx) {
251                    result = result.extend(var.clone(), value.clone());
252                }
253            }
254
255            for (result_var, agg) in aggregates {
256                if let Some(mut aggregator) = Self::build_aggregator(agg) {
257                    for binding in group_bindings {
258                        let value = Self::aggregate_value(agg, binding);
259                        aggregator.accumulate(value.as_ref());
260                    }
261                    let agg_value = aggregator.finalize();
262                    result = result.extend(result_var.clone(), agg_value);
263                }
264            }
265
266            results.push(result);
267        }
268
269        results
270    }
271
272    fn build_aggregator(agg: &Aggregate) -> Option<Box<dyn Aggregator>> {
273        match agg {
274            Aggregate::Count(None) => Some(Box::new(CountAggregator::count_all())),
275            Aggregate::Count(Some(_)) => Some(Box::new(CountAggregator::count_column())),
276            Aggregate::CountDistinct(_) => Some(Box::new(CountDistinctAggregator::new())),
277            Aggregate::Sum(_) => Some(Box::new(SumAggregator::new())),
278            Aggregate::Avg(_) => Some(Box::new(AvgAggregator::new())),
279            Aggregate::Min(_) => Some(Box::new(MinAggregator::new())),
280            Aggregate::Max(_) => Some(Box::new(MaxAggregator::new())),
281            Aggregate::Sample(_) => Some(Box::new(SampleAggregator::new())),
282            Aggregate::GroupConcat(_, sep) => {
283                Some(Box::new(GroupConcatAggregator::new(sep.clone())))
284            }
285        }
286    }
287
288    fn aggregate_value(agg: &Aggregate, binding: &Binding) -> Option<Value> {
289        match agg {
290            Aggregate::Count(None) => None,
291            Aggregate::Count(Some(expr))
292            | Aggregate::CountDistinct(expr)
293            | Aggregate::Sum(expr)
294            | Aggregate::Avg(expr)
295            | Aggregate::Min(expr)
296            | Aggregate::Max(expr)
297            | Aggregate::Sample(expr)
298            | Aggregate::GroupConcat(expr, _) => expr.evaluate(binding),
299        }
300    }
301
302    /// Execute an Op recursively
303    pub(crate) fn execute_op(&self, op: &Op) -> Box<dyn BindingIterator> {
304        match op {
305            Op::BGP(bgp) => self.execute_bgp(bgp),
306            Op::Triple(triple) => self.execute_triple(triple),
307            Op::Join(join) => {
308                let left = self.execute_op(&join.left);
309                self.execute_join(left, (*join.right).clone())
310            }
311            Op::LeftJoin(lj) => {
312                // Simplified: execute as regular join with null extension
313                let left = self.execute_op(&lj.left);
314                self.execute_join(left, (*lj.right).clone())
315            }
316            Op::Filter(filter) => {
317                let sub = self.execute_op(&filter.sub_op);
318                self.execute_filter(sub, &filter.filter)
319            }
320            Op::Union(union) => {
321                let left = self.execute_op(&union.left);
322                let right = self.execute_op(&union.right);
323                self.execute_union(left, right)
324            }
325            Op::Project(project) => {
326                let sub = self.execute_op(&project.sub_op);
327                self.execute_project(sub, &project.vars)
328            }
329            Op::Distinct(distinct) => {
330                let sub = self.execute_op(&distinct.sub_op);
331                self.execute_distinct(sub)
332            }
333            Op::Reduced(reduced) => {
334                // Reduced is like distinct but weaker - for now, same impl
335                let sub = self.execute_op(&reduced.sub_op);
336                self.execute_distinct(sub)
337            }
338            Op::Slice(slice) => {
339                let sub = self.execute_op(&slice.sub_op);
340                self.execute_slice(sub, slice.offset, slice.limit)
341            }
342            Op::Order(order) => {
343                let sub = self.execute_op(&order.sub_op);
344                self.execute_order(sub, &order.keys)
345            }
346            Op::Group(group) => self.execute_group_op(group),
347            Op::Extend(extend) => self.execute_extend_op(extend),
348            Op::Minus(minus) => self.execute_minus_op(minus),
349            Op::RightJoin(rj) => {
350                // Right join: swap left/right, execute as left join, swap back
351                let left = self.execute_op(&rj.left);
352                let right = self.execute_op(&rj.right);
353                // Simplified: execute right side, join with left
354                self.execute_join(right, (*rj.left).clone())
355            }
356            Op::CrossJoin(cj) => {
357                // Cross join: Cartesian product
358                let left = self.execute_op(&cj.left);
359                self.execute_join(left, (*cj.right).clone())
360            }
361            Op::Intersect(inter) => {
362                // Set intersection: only bindings that appear in both sides
363                let left = self.execute_op(&inter.left);
364                let right = self.execute_op(&inter.right);
365                self.execute_intersect(left, right)
366            }
367            Op::Table(table) => self.execute_table(table),
368            Op::Sequence(seq) => {
369                // Execute in sequence, join results
370                if seq.ops.is_empty() {
371                    return Box::new(QueryIterBase::single(Binding::empty()));
372                }
373
374                let mut result = self.execute_op(&seq.ops[0]);
375                for op in seq.ops.iter().skip(1) {
376                    let right = op.clone();
377                    result = self.execute_join(result, right);
378                }
379                result
380            }
381            Op::Disjunction(disj) => {
382                // Union all branches
383                if disj.ops.is_empty() {
384                    return Box::new(QueryIterBase::empty());
385                }
386
387                let iters: Vec<Box<dyn BindingIterator>> =
388                    disj.ops.iter().map(|op| self.execute_op(op)).collect();
389
390                Box::new(QueryIterUnion::new(iters))
391            }
392            Op::Null(_) => Box::new(QueryIterBase::empty()),
393        }
394    }
395}