Skip to main content

drasi_core/query/
continuous_query.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    fmt::Debug,
18    hash::{Hash, Hasher},
19    sync::Arc,
20    time::Duration,
21};
22
23use drasi_query_ast::ast::Query;
24use hashers::jenkins::spooky_hash::SpookyHasher;
25use tokio::{
26    select,
27    sync::{Mutex, Notify},
28    task::JoinHandle,
29};
30
31use crate::{
32    evaluation::{
33        context::{ChangeContext, QueryPartEvaluationContext, QueryVariables},
34        EvaluationError, ExpressionEvaluationContext, ExpressionEvaluator, InstantQueryClock,
35        QueryPartEvaluator,
36    },
37    interface::{
38        ElementIndex, FutureQueue, FutureQueueConsumer, MiddlewareError, QueryClock,
39        SessionControl, SessionGuard,
40    },
41    middleware::SourceMiddlewarePipelineCollection,
42    models::{Element, SourceChange},
43    path_solver::{
44        match_path::{MatchPath, SlotElementSpec},
45        solution::{MatchPathSolution, SolutionSignature},
46        MatchPathSolver, MatchSolveContext,
47    },
48};
49
50/// Result of processing a due future item.
51/// Contains the evaluation results and the source_id from the popped future's element_ref,
52/// needed by the lib crate to record provenance in QueryResult metadata.
53pub struct DueFutureResult {
54    pub results: Vec<QueryPartEvaluationContext>,
55    /// The source_id from the popped future's element_ref.
56    pub source_id: Arc<str>,
57}
58
59pub struct ContinuousQuery {
60    expression_evaluator: Arc<ExpressionEvaluator>,
61    part_evaluator: Arc<QueryPartEvaluator>,
62    element_index: Arc<dyn ElementIndex>,
63    path_solver: Arc<MatchPathSolver>,
64    match_path: Arc<MatchPath>,
65    query: Arc<Query>,
66    future_consumer_shutdown_request: Arc<Notify>,
67    future_queue: Arc<dyn FutureQueue>,
68    future_queue_task: Mutex<Option<JoinHandle<()>>>,
69    change_lock: Mutex<()>,
70    source_pipelines: SourceMiddlewarePipelineCollection,
71    session_control: Arc<dyn SessionControl>,
72}
73
74impl ContinuousQuery {
75    #[allow(clippy::too_many_arguments)]
76    pub fn new(
77        query: Arc<Query>,
78        match_path: Arc<MatchPath>,
79        expression_evaluator: Arc<ExpressionEvaluator>,
80        element_index: Arc<dyn ElementIndex>,
81        path_solver: Arc<MatchPathSolver>,
82        part_evaluator: Arc<QueryPartEvaluator>,
83        future_queue: Arc<dyn FutureQueue>,
84        source_pipelines: SourceMiddlewarePipelineCollection,
85        session_control: Arc<dyn SessionControl>,
86    ) -> Self {
87        Self {
88            expression_evaluator,
89            element_index,
90            path_solver,
91            match_path,
92            part_evaluator,
93            query,
94            future_consumer_shutdown_request: Arc::new(Notify::new()),
95            future_queue,
96            future_queue_task: Mutex::new(None),
97            change_lock: Mutex::new(()),
98            source_pipelines,
99            session_control,
100        }
101    }
102
103    #[tracing::instrument(skip_all, err, level = "debug")]
104    pub async fn process_source_change(
105        &self,
106        change: SourceChange,
107    ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
108        let _lock = self.change_lock.lock().await;
109        let guard = SessionGuard::begin(self.session_control.clone()).await?;
110
111        let changes = self.execute_source_middleware(change).await?;
112        let result = self.process_changes_inner(changes).await?;
113
114        guard.commit().await?;
115        Ok(result)
116    }
117
118    /// Atomically pop a due future from the queue and process it within a single session.
119    ///
120    /// Returns `Ok(None)` when the queue is empty (stale peek).
121    /// Returns `Ok(Some(DueFutureResult))` with results and the original source_id.
122    ///
123    /// Pop happens inside the session → atomic with all downstream index writes.
124    /// If a crash occurs before commit, the pop rolls back and the item stays in the queue.
125    #[tracing::instrument(skip_all, err, level = "debug")]
126    pub async fn process_due_futures(&self) -> Result<Option<DueFutureResult>, EvaluationError> {
127        let _lock = self.change_lock.lock().await;
128        let guard = SessionGuard::begin(self.session_control.clone()).await?;
129
130        let future_ref = match self.future_queue.pop().await {
131            Ok(Some(fr)) => fr,
132            Ok(None) => {
133                guard.commit().await?;
134                return Ok(None);
135            }
136            Err(e) => return Err(EvaluationError::from(e)),
137        };
138
139        let source_id = future_ref.element_ref.source_id.clone();
140        let change = SourceChange::Future { future_ref };
141        let changes = self.execute_source_middleware(change).await?;
142        let results = self.process_changes_inner(changes).await?;
143        guard.commit().await?;
144        Ok(Some(DueFutureResult { results, source_id }))
145    }
146
147    /// Expose the ContinuousQuery's future queue for external polling.
148    pub fn future_queue(&self) -> Arc<dyn FutureQueue> {
149        self.future_queue.clone()
150    }
151
152    /// Inner processing logic shared by `process_source_change` and `process_due_futures`.
153    /// Must be called within an active session and while holding `change_lock`.
154    #[tracing::instrument(skip_all, err, level = "debug")]
155    async fn process_changes_inner(
156        &self,
157        changes: Vec<SourceChange>,
158    ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
159        let mut result = Vec::new();
160
161        for change in changes {
162            let base_variables = QueryVariables::new(); //todo: get query parameters
163            let after_clock = Arc::new(InstantQueryClock::from_source_change(&change));
164
165            let solution_changes = self
166                .build_solution_changes(&base_variables, change, after_clock.clone())
167                .await?;
168            let before_clock = match solution_changes.before_clock {
169                Some(before_clock) => before_clock,
170                None => after_clock.clone(),
171            };
172
173            let mut aggregation_results = CollapsedAggregationResults::new();
174
175            for (solution_signature, part_context) in solution_changes.changes {
176                let change_results = match self
177                    .project_solution(
178                        part_context,
179                        &ChangeContext {
180                            solution_signature,
181                            before_clock: before_clock.clone(),
182                            after_clock: after_clock.clone(),
183                            before_anchor_element: solution_changes.before_anchor_element.clone(),
184                            after_anchor_element: solution_changes.anchor_element.clone(),
185                            is_future_reprocess: solution_changes.is_future_reprocess,
186                            before_grouping_hash: solution_signature,
187                            after_grouping_hash: solution_signature,
188                        },
189                    )
190                    .await
191                {
192                    Ok(results) => results,
193                    Err(EvaluationError::DivideByZero) => {
194                        log::debug!("Skipping solution due to DivideByZero in projection");
195                        continue;
196                    }
197                    Err(e) => return Err(e),
198                };
199                change_results.into_iter().for_each(|ctx| {
200                    match &ctx {
201                        QueryPartEvaluationContext::Aggregation {
202                            before,
203                            after,
204                            default_before,
205                            ..
206                        } => {
207                            if let Some(before) = before {
208                                if before == after && !default_before {
209                                    return;
210                                }
211                            }
212
213                            aggregation_results.insert(ctx);
214                        }
215                        QueryPartEvaluationContext::Updating { before, after, .. } => {
216                            if before == after {
217                                return;
218                            }
219                            result.push(ctx);
220                        }
221                        _ => result.push(ctx),
222                    };
223                });
224            }
225
226            for ctx in aggregation_results.into_result_vec() {
227                result.push(ctx);
228            }
229        }
230
231        Ok(result)
232    }
233
234    #[tracing::instrument(skip_all, err, level = "debug")]
235    async fn build_solution_changes(
236        &self,
237        base_variables: &QueryVariables,
238        change: SourceChange,
239        clock: Arc<dyn QueryClock>,
240    ) -> Result<SolutionChangesResult, EvaluationError> {
241        let mut result = SolutionChangesResult::new();
242        let mut before_change_solutions = HashMap::new();
243        let mut after_change_solutions = HashMap::new();
244
245        match change {
246            SourceChange::Insert { element } => {
247                let element = Arc::new(element);
248                let affinity_slots = self
249                    .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
250                    .await?;
251                let solutions = self
252                    .resolve_solutions(element.clone(), affinity_slots, true)
253                    .await?;
254
255                for (signature, solution) in solutions {
256                    if let Some(blank_optional_solution) =
257                        solution.get_empty_optional_solution(&self.match_path)
258                    {
259                        before_change_solutions.insert(signature, blank_optional_solution);
260                    }
261                    after_change_solutions.insert(signature, solution);
262                }
263
264                result.anchor_element = Some(element);
265            }
266            SourceChange::Update { mut element } => {
267                if let Some(prev_version) = self
268                    .element_index
269                    .get_element(element.get_reference())
270                    .await?
271                {
272                    let prev_timestamp = prev_version.get_effective_from();
273                    let before_clock =
274                        Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
275                    let affinity_slots = self
276                        .get_slots_with_affinity(
277                            base_variables,
278                            prev_version.clone(),
279                            before_clock.clone(),
280                        )
281                        .await?;
282                    let solutions = self
283                        .resolve_solutions(prev_version.clone(), affinity_slots, false)
284                        .await?;
285                    for (signature, solution) in solutions {
286                        before_change_solutions.insert(signature, solution);
287                    }
288                    element.merge_missing_properties(prev_version.as_ref());
289                    result.before_clock = Some(before_clock);
290                    result.before_anchor_element = Some(prev_version);
291                }
292
293                let element = Arc::new(element);
294                let affinity_slots = self
295                    .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
296                    .await?;
297                let solutions = self
298                    .resolve_solutions(element.clone(), affinity_slots, true)
299                    .await?;
300
301                for (signature, solution) in solutions {
302                    after_change_solutions.insert(signature, solution);
303                }
304
305                result.anchor_element = Some(element);
306            }
307            SourceChange::Delete { metadata } => {
308                if let Some(element) = self.element_index.get_element(&metadata.reference).await? {
309                    let prev_timestamp = element.get_effective_from();
310                    let before_clock =
311                        Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
312                    let affinity_slots = self
313                        .get_slots_with_affinity(
314                            base_variables,
315                            element.clone(),
316                            before_clock.clone(),
317                        )
318                        .await?;
319                    let solutions = self
320                        .resolve_solutions(element.clone(), affinity_slots, false)
321                        .await?;
322                    for (signature, solution) in solutions {
323                        if let Some(blank_optional_solution) =
324                            solution.get_empty_optional_solution(&self.match_path)
325                        {
326                            after_change_solutions.insert(signature, blank_optional_solution);
327                        }
328
329                        before_change_solutions.insert(signature, solution);
330                    }
331                    result.before_clock = Some(before_clock);
332                    result.before_anchor_element = Some(element);
333
334                    match self.element_index.delete_element(&metadata.reference).await {
335                        Ok(_) => {}
336                        Err(e) => return Err(EvaluationError::from(e)),
337                    }
338                }
339            }
340            SourceChange::Future { future_ref } => {
341                result.is_future_reprocess = true;
342                if let Some(element) = self
343                    .element_index
344                    .get_element(&future_ref.element_ref)
345                    .await?
346                {
347                    let prev_timestamp = element.get_effective_from();
348                    if prev_timestamp >= future_ref.due_time {
349                        // element already processed with due time expired, don't duplicate
350                        return Ok(result);
351                    }
352
353                    let before_clock =
354                        Arc::new(InstantQueryClock::new(prev_timestamp, prev_timestamp));
355
356                    let affinity_slots = self
357                        .get_slots_with_affinity(
358                            base_variables,
359                            element.clone(),
360                            before_clock.clone(),
361                        )
362                        .await?;
363
364                    let before_solutions = self
365                        .resolve_solutions(element.clone(), affinity_slots, false)
366                        .await?;
367                    for (signature, solution) in before_solutions {
368                        before_change_solutions.insert(signature, solution);
369                    }
370
371                    result.before_clock = Some(before_clock);
372                    result.before_anchor_element = Some(element.clone());
373
374                    let affinity_slots = self
375                        .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
376                        .await?;
377
378                    let after_solutions = self
379                        .resolve_solutions(element.clone(), affinity_slots, false)
380                        .await?;
381                    for (signature, solution) in after_solutions {
382                        after_change_solutions.insert(signature, solution);
383                    }
384
385                    result.anchor_element = Some(element);
386                }
387            }
388        }
389
390        for (sig, before_sol) in &before_change_solutions {
391            match after_change_solutions.get(sig) {
392                Some(after_sol) => result.changes.push((
393                    *sig,
394                    QueryPartEvaluationContext::Updating {
395                        before: before_sol.into_query_variables(&self.match_path, base_variables),
396                        after: after_sol.into_query_variables(&self.match_path, base_variables),
397                        row_signature: 0,
398                    },
399                )),
400                None => result.changes.push((
401                    *sig,
402                    QueryPartEvaluationContext::Removing {
403                        before: before_sol.into_query_variables(&self.match_path, base_variables),
404                        row_signature: 0,
405                    },
406                )),
407            }
408        }
409
410        for (sig, after_sol) in &after_change_solutions {
411            if !before_change_solutions.contains_key(sig) {
412                result.changes.push((
413                    *sig,
414                    QueryPartEvaluationContext::Adding {
415                        after: after_sol.into_query_variables(&self.match_path, base_variables),
416                        row_signature: 0,
417                    },
418                ))
419            }
420        }
421
422        Ok(result)
423    }
424
425    async fn resolve_solutions(
426        &self,
427        anchor_element: Arc<Element>,
428        affinity_slots: Vec<usize>,
429        update_index: bool,
430    ) -> Result<HashMap<u64, MatchPathSolution>, EvaluationError> {
431        if update_index {
432            self.element_index
433                .set_element(anchor_element.as_ref(), &affinity_slots)
434                .await?;
435        }
436
437        let mut result = HashMap::new();
438
439        for slot_num in affinity_slots {
440            let solution = self
441                .path_solver
442                .solve(self.match_path.clone(), anchor_element.clone(), slot_num)
443                .await?;
444            result.extend(solution.into_iter());
445        }
446
447        Ok(result)
448    }
449
450    async fn get_slots_with_affinity(
451        &self,
452        variables: &QueryVariables,
453        anchor_element: Arc<Element>,
454        clock: Arc<dyn QueryClock>,
455    ) -> Result<Vec<usize>, EvaluationError> {
456        let context = MatchSolveContext::new(variables, clock);
457
458        let mut affinity_slots = Vec::new();
459
460        for (slot_num, slot) in self.match_path.slots.iter().enumerate() {
461            if self
462                .match_element_to_slot(&context, &slot.spec, anchor_element.clone())
463                .await?
464            {
465                affinity_slots.push(slot_num);
466            }
467        }
468
469        Ok(affinity_slots)
470    }
471
472    async fn match_element_to_slot(
473        &self,
474        context: &MatchSolveContext<'_>,
475        element_spec: &SlotElementSpec,
476        element: Arc<Element>,
477    ) -> Result<bool, EvaluationError> {
478        let metadata = element.get_metadata();
479        let mut label_match = element_spec.labels.is_empty();
480
481        for label in &element_spec.labels {
482            if metadata.labels.contains(label) {
483                label_match = true;
484                break;
485            }
486        }
487
488        if !label_match {
489            return Ok(false);
490        }
491
492        let mut variables = context.variables.clone();
493
494        let element_variable = element.to_expression_variable();
495
496        if element_spec.annotation.is_some() {
497            variables.insert(
498                element_spec
499                    .annotation
500                    .clone()
501                    .unwrap()
502                    .to_string()
503                    .into_boxed_str(),
504                element_variable.clone(),
505            );
506        }
507
508        variables.insert("".into(), element_variable);
509
510        let eval_context = ExpressionEvaluationContext::from_slot(
511            &variables,
512            context.clock.clone(),
513            &metadata.reference,
514        );
515
516        for predicate in &element_spec.predicates {
517            let result = self
518                .expression_evaluator
519                .evaluate_predicate(&eval_context, predicate)
520                .await?;
521            if !result {
522                return Ok(false);
523            }
524        }
525
526        Ok(true)
527    }
528
529    #[tracing::instrument(skip_all, err, level = "debug")]
530    async fn project_solution(
531        &self,
532        part_context: QueryPartEvaluationContext,
533        change_context: &ChangeContext,
534    ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
535        let mut result = Vec::new();
536        let mut contexts = vec![(part_context, change_context.clone())];
537
538        let mut part_num = 0;
539
540        for part in &self.query.parts {
541            part_num += 1;
542            result.clear();
543
544            for (part_context, change_context) in &contexts {
545                let new_contexts = self
546                    .part_evaluator
547                    .evaluate(part_context.clone(), part_num, part, change_context)
548                    .await?;
549
550                let mut aggregation_results = CollapsedAggregationResults::new();
551
552                new_contexts.into_iter().for_each(|ctx| match &ctx {
553                    QueryPartEvaluationContext::Aggregation { .. } => {
554                        aggregation_results.insert(ctx)
555                    }
556                    QueryPartEvaluationContext::Noop => (),
557                    _ => result.push((ctx, change_context.clone())),
558                });
559
560                for actx in aggregation_results.into_vec_with_context(change_context) {
561                    result.push(actx);
562                }
563            }
564            contexts = result.clone();
565        }
566
567        Ok(result
568            .into_iter()
569            .map(|(ctx, cc)| match ctx {
570                QueryPartEvaluationContext::Adding { after, .. } => {
571                    QueryPartEvaluationContext::Adding {
572                        after,
573                        row_signature: cc.solution_signature,
574                    }
575                }
576                QueryPartEvaluationContext::Updating { before, after, .. } => {
577                    QueryPartEvaluationContext::Updating {
578                        before,
579                        after,
580                        row_signature: cc.solution_signature,
581                    }
582                }
583                QueryPartEvaluationContext::Removing { before, .. } => {
584                    QueryPartEvaluationContext::Removing {
585                        before,
586                        row_signature: cc.solution_signature,
587                    }
588                }
589                QueryPartEvaluationContext::Aggregation {
590                    before,
591                    after,
592                    grouping_keys,
593                    default_before,
594                    default_after,
595                    ..
596                } => QueryPartEvaluationContext::Aggregation {
597                    before,
598                    after,
599                    grouping_keys,
600                    default_before,
601                    default_after,
602                    row_signature: cc.after_grouping_hash,
603                },
604                QueryPartEvaluationContext::Noop => QueryPartEvaluationContext::Noop,
605            })
606            .collect())
607    }
608
609    #[tracing::instrument(skip_all, err, level = "debug")]
610    async fn execute_source_middleware(
611        &self,
612        change: SourceChange,
613    ) -> Result<Vec<SourceChange>, MiddlewareError> {
614        let source_id = change.get_reference().source_id.clone();
615        let mut source_changes = vec![change];
616
617        let pipeline = match self.source_pipelines.get(source_id) {
618            Some(pipeline) => pipeline,
619            None => return Ok(source_changes),
620        };
621
622        let mut new_source_changes = Vec::new();
623        for source_change in source_changes {
624            new_source_changes.append(
625                &mut pipeline
626                    .process(source_change, self.element_index.clone())
627                    .await?,
628            );
629        }
630
631        source_changes = new_source_changes;
632
633        Ok(source_changes)
634    }
635
636    pub async fn set_future_consumer(&self, consumer: Arc<dyn FutureQueueConsumer>) {
637        let mut future_queue_task = self.future_queue_task.lock().await;
638        if let Some(c) = future_queue_task.take() {
639            c.abort();
640        }
641
642        let queue = self.future_queue.clone();
643        let shutdown_request = self.future_consumer_shutdown_request.clone();
644
645        let task = tokio::spawn(async move {
646            let idle_interval = Duration::from_secs(1);
647            let error_interval = Duration::from_secs(5);
648            loop {
649                select! {
650                    _ = shutdown_request.notified() => {
651                        log::info!("Future queue consumer shutting down");
652                        break;
653                    }
654                    peek = queue.peek_due_time() => {
655                        match peek {
656                            Ok(Some(due_time)) => {
657                                if due_time > consumer.now() {
658                                    tokio::time::sleep(idle_interval).await;
659                                    continue;
660                                }
661                            }
662                            Ok(None) => {
663                                tokio::time::sleep(idle_interval).await;
664                                continue;
665                            }
666                            Err(e) => {
667                                log::error!("Future queue consumer error: {e:?}");
668                                tokio::time::sleep(error_interval).await;
669                                continue;
670                            }
671                        };
672
673                        // Items are due — delegate to consumer which calls process_due_futures()
674                        match consumer.on_items_due().await {
675                            Ok(_) => {}
676                            Err(e) => {
677                                log::error!("Future queue consumer error: {e:?}");
678                                consumer.on_error(e).await;
679                                tokio::time::sleep(error_interval).await;
680                            }
681                        }
682                    }
683                }
684            }
685        });
686
687        _ = future_queue_task.insert(task);
688    }
689
690    pub async fn terminate_future_consumer(&self) {
691        let mut future_queue_task = self.future_queue_task.lock().await;
692        if let Some(task) = future_queue_task.take() {
693            self.future_consumer_shutdown_request.notify_one();
694            select! {
695                _ = task => {
696                    log::info!("Future queue consumer terminated");
697                }
698                _ = tokio::time::sleep(Duration::from_secs(10)) => {
699                    log::error!("Future queue consumer termination timeout");
700                }
701            }
702        }
703    }
704
705    pub fn get_query(&self) -> Arc<Query> {
706        self.query.clone()
707    }
708}
709
710impl Drop for ContinuousQuery {
711    fn drop(&mut self) {
712        self.future_consumer_shutdown_request.notify_one();
713    }
714}
715
716impl Debug for ContinuousQuery {
717    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
718        f.debug_struct("ContinuousQuery")
719            .field("query", &self.query)
720            .finish()
721    }
722}
723
724struct SolutionChangesResult {
725    pub changes: Vec<(SolutionSignature, QueryPartEvaluationContext)>,
726    pub anchor_element: Option<Arc<Element>>,
727    pub before_clock: Option<Arc<dyn QueryClock>>,
728    pub before_anchor_element: Option<Arc<Element>>,
729    pub is_future_reprocess: bool,
730}
731
732impl SolutionChangesResult {
733    fn new() -> Self {
734        Self {
735            changes: Vec::new(),
736            before_clock: None,
737            anchor_element: None,
738            before_anchor_element: None,
739            is_future_reprocess: false,
740        }
741    }
742}
743
744struct CollapsedAggregationResults {
745    // [hash of after change grouping keys] -> (context, hash of before change grouping keys)
746    data: HashMap<u64, (QueryPartEvaluationContext, u64)>,
747}
748
749impl CollapsedAggregationResults {
750    fn new() -> Self {
751        Self {
752            data: HashMap::new(),
753        }
754    }
755
756    fn insert(&mut self, context: QueryPartEvaluationContext) {
757        if let QueryPartEvaluationContext::Aggregation {
758            before,
759            after,
760            grouping_keys,
761            default_before,
762            default_after,
763            ..
764        } = context
765        {
766            let after_key = extract_grouping_value_hash(&grouping_keys, &after);
767            let before_key = match &before {
768                Some(before) => extract_grouping_value_hash(&grouping_keys, before),
769                None => after_key,
770            };
771
772            match self.data.remove(&after_key) {
773                Some((existing, before_key)) => {
774                    if let QueryPartEvaluationContext::Aggregation {
775                        before: existing_before,
776                        ..
777                    } = existing
778                    {
779                        self.data.insert(
780                            after_key,
781                            (
782                                QueryPartEvaluationContext::Aggregation {
783                                    before: existing_before,
784                                    default_before,
785                                    default_after,
786                                    after,
787                                    grouping_keys,
788                                    row_signature: after_key,
789                                },
790                                before_key,
791                            ),
792                        );
793                    }
794                }
795                None => {
796                    self.data.insert(
797                        after_key,
798                        (
799                            QueryPartEvaluationContext::Aggregation {
800                                before,
801                                after,
802                                grouping_keys,
803                                default_before,
804                                default_after,
805                                row_signature: after_key,
806                            },
807                            before_key,
808                        ),
809                    );
810                }
811            }
812        }
813    }
814
815    fn into_vec_with_context(
816        self,
817        change_context: &ChangeContext,
818    ) -> Vec<(QueryPartEvaluationContext, ChangeContext)> {
819        self.data
820            .into_iter()
821            .map(|(after_key, (v, before_key))| {
822                let mut change_context = change_context.clone();
823                change_context.before_grouping_hash = before_key;
824                change_context.after_grouping_hash = after_key;
825                (v, change_context)
826            })
827            .collect()
828    }
829
830    fn into_result_vec(self) -> Vec<QueryPartEvaluationContext> {
831        self.data
832            .into_iter()
833            .map(|(after_key, (ctx, _))| match ctx {
834                QueryPartEvaluationContext::Aggregation {
835                    before,
836                    after,
837                    grouping_keys,
838                    default_before,
839                    default_after,
840                    ..
841                } => QueryPartEvaluationContext::Aggregation {
842                    before,
843                    after,
844                    grouping_keys,
845                    default_before,
846                    default_after,
847                    row_signature: after_key,
848                },
849                other => other,
850            })
851            .collect()
852    }
853}
854
855fn extract_grouping_value_hash(grouping_keys: &Vec<String>, variables: &QueryVariables) -> u64 {
856    let mut hasher = SpookyHasher::default();
857
858    for key in grouping_keys {
859        match variables.get(key.as_str()) {
860            Some(v) => v.hash_for_groupby(&mut hasher),
861            None => 0.hash(&mut hasher),
862        };
863    }
864    hasher.finish()
865}