Skip to main content

drasi_core/query/
continuous_query.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    fmt::Debug,
18    future::Future,
19    hash::{Hash, Hasher},
20    sync::Arc,
21    time::Duration,
22};
23
24use drasi_query_ast::ast::Query;
25use hashers::jenkins::spooky_hash::SpookyHasher;
26use tokio::{
27    select,
28    sync::{Mutex, Notify},
29    task::JoinHandle,
30};
31
32use crate::{
33    evaluation::{
34        context::{ChangeContext, QueryPartEvaluationContext, QueryVariables},
35        EvaluationError, ExpressionEvaluationContext, ExpressionEvaluator, InstantQueryClock,
36        QueryPartEvaluator,
37    },
38    interface::{
39        ElementIndex, FutureQueue, FutureQueueConsumer, IndexError, MiddlewareError, QueryClock,
40        SessionControl, SessionGuard,
41    },
42    middleware::SourceMiddlewarePipelineCollection,
43    models::{Element, SourceChange},
44    path_solver::{
45        match_path::{MatchPath, SlotElementSpec},
46        solution::{MatchPathSolution, SolutionSignature},
47        MatchPathSolver, MatchSolveContext,
48    },
49};
50
51/// Result of processing a due future item.
52/// Contains the evaluation results and the source_id from the popped future's element_ref,
53/// needed by the lib crate to record provenance in QueryResult metadata.
54pub struct DueFutureResult {
55    pub results: Vec<QueryPartEvaluationContext>,
56    /// The source_id from the popped future's element_ref.
57    pub source_id: Arc<str>,
58}
59
60pub struct ContinuousQuery {
61    expression_evaluator: Arc<ExpressionEvaluator>,
62    part_evaluator: Arc<QueryPartEvaluator>,
63    element_index: Arc<dyn ElementIndex>,
64    path_solver: Arc<MatchPathSolver>,
65    match_path: Arc<MatchPath>,
66    query: Arc<Query>,
67    future_consumer_shutdown_request: Arc<Notify>,
68    future_queue: Arc<dyn FutureQueue>,
69    future_queue_task: Mutex<Option<JoinHandle<()>>>,
70    change_lock: Mutex<()>,
71    source_pipelines: SourceMiddlewarePipelineCollection,
72    session_control: Arc<dyn SessionControl>,
73}
74
75impl ContinuousQuery {
76    #[allow(clippy::too_many_arguments)]
77    pub fn new(
78        query: Arc<Query>,
79        match_path: Arc<MatchPath>,
80        expression_evaluator: Arc<ExpressionEvaluator>,
81        element_index: Arc<dyn ElementIndex>,
82        path_solver: Arc<MatchPathSolver>,
83        part_evaluator: Arc<QueryPartEvaluator>,
84        future_queue: Arc<dyn FutureQueue>,
85        source_pipelines: SourceMiddlewarePipelineCollection,
86        session_control: Arc<dyn SessionControl>,
87    ) -> Self {
88        Self {
89            expression_evaluator,
90            element_index,
91            path_solver,
92            match_path,
93            part_evaluator,
94            query,
95            future_consumer_shutdown_request: Arc::new(Notify::new()),
96            future_queue,
97            future_queue_task: Mutex::new(None),
98            change_lock: Mutex::new(()),
99            source_pipelines,
100            session_control,
101        }
102    }
103
104    #[tracing::instrument(skip_all, err, level = "debug")]
105    pub async fn process_source_change(
106        &self,
107        change: SourceChange,
108    ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
109        let _lock = self.change_lock.lock().await;
110        let guard = SessionGuard::begin(self.session_control.clone()).await?;
111
112        let changes = self.execute_source_middleware(change).await?;
113        let result = self.process_changes_inner(changes).await?;
114
115        guard.commit().await?;
116        Ok(result)
117    }
118
119    /// Process a source change with a pre-commit hook that runs inside the session.
120    ///
121    /// The hook executes after index updates but before the session commits,
122    /// allowing callers to stage additional writes (e.g. checkpoint data) into
123    /// the same atomic transaction. The change_lock is held for the entire
124    /// duration, preserving serialization.
125    #[tracing::instrument(skip_all, err, level = "debug")]
126    pub async fn process_source_change_with_hook<F, Fut>(
127        &self,
128        change: SourceChange,
129        pre_commit_hook: F,
130    ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError>
131    where
132        F: FnOnce() -> Fut + Send,
133        Fut: Future<Output = Result<(), IndexError>> + Send,
134    {
135        let _lock = self.change_lock.lock().await;
136        let guard = SessionGuard::begin(self.session_control.clone()).await?;
137
138        let changes = self.execute_source_middleware(change).await?;
139        let result = self.process_changes_inner(changes).await?;
140
141        pre_commit_hook().await?;
142        guard.commit().await?;
143        Ok(result)
144    }
145
146    /// Atomically pop a due future from the queue and process it within a single session.
147    ///
148    /// Returns `Ok(None)` when the queue is empty (stale peek).
149    /// Returns `Ok(Some(DueFutureResult))` with results and the original source_id.
150    ///
151    /// Pop happens inside the session → atomic with all downstream index writes.
152    /// If a crash occurs before commit, the pop rolls back and the item stays in the queue.
153    #[tracing::instrument(skip_all, err, level = "debug")]
154    pub async fn process_due_futures(&self) -> Result<Option<DueFutureResult>, EvaluationError> {
155        let _lock = self.change_lock.lock().await;
156        let guard = SessionGuard::begin(self.session_control.clone()).await?;
157
158        let future_ref = match self.future_queue.pop().await {
159            Ok(Some(fr)) => fr,
160            Ok(None) => {
161                guard.commit().await?;
162                return Ok(None);
163            }
164            Err(e) => return Err(EvaluationError::from(e)),
165        };
166
167        let source_id = future_ref.element_ref.source_id.clone();
168        let change = SourceChange::Future { future_ref };
169        let changes = self.execute_source_middleware(change).await?;
170        let results = self.process_changes_inner(changes).await?;
171        guard.commit().await?;
172        Ok(Some(DueFutureResult { results, source_id }))
173    }
174
175    /// Expose the ContinuousQuery's future queue for external polling.
176    pub fn future_queue(&self) -> Arc<dyn FutureQueue> {
177        self.future_queue.clone()
178    }
179
180    /// Inner processing logic shared by `process_source_change` and `process_due_futures`.
181    /// Must be called within an active session and while holding `change_lock`.
182    #[tracing::instrument(skip_all, err, level = "debug")]
183    async fn process_changes_inner(
184        &self,
185        changes: Vec<SourceChange>,
186    ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
187        let mut result = Vec::new();
188
189        for change in changes {
190            let base_variables = QueryVariables::new(); //todo: get query parameters
191            let after_clock = Arc::new(InstantQueryClock::from_source_change(&change));
192
193            let solution_changes = self
194                .build_solution_changes(&base_variables, change, after_clock.clone())
195                .await?;
196            let before_clock = match solution_changes.before_clock {
197                Some(before_clock) => before_clock,
198                None => after_clock.clone(),
199            };
200
201            let mut aggregation_results = CollapsedAggregationResults::new();
202
203            for (solution_signature, part_context) in solution_changes.changes {
204                let change_results = match self
205                    .project_solution(
206                        part_context,
207                        &ChangeContext {
208                            solution_signature,
209                            before_clock: before_clock.clone(),
210                            after_clock: after_clock.clone(),
211                            before_anchor_element: solution_changes.before_anchor_element.clone(),
212                            after_anchor_element: solution_changes.anchor_element.clone(),
213                            is_future_reprocess: solution_changes.is_future_reprocess,
214                            before_grouping_hash: solution_signature,
215                            after_grouping_hash: solution_signature,
216                        },
217                    )
218                    .await
219                {
220                    Ok(results) => results,
221                    Err(EvaluationError::DivideByZero) => {
222                        log::debug!("Skipping solution due to DivideByZero in projection");
223                        continue;
224                    }
225                    Err(e) => return Err(e),
226                };
227                change_results.into_iter().for_each(|ctx| {
228                    match &ctx {
229                        QueryPartEvaluationContext::Aggregation {
230                            before,
231                            after,
232                            default_before,
233                            ..
234                        } => {
235                            if let Some(before) = before {
236                                if before == after && !default_before {
237                                    return;
238                                }
239                            }
240
241                            aggregation_results.insert(ctx);
242                        }
243                        QueryPartEvaluationContext::Updating { before, after, .. } => {
244                            if before == after {
245                                return;
246                            }
247                            result.push(ctx);
248                        }
249                        _ => result.push(ctx),
250                    };
251                });
252            }
253
254            for ctx in aggregation_results.into_result_vec() {
255                result.push(ctx);
256            }
257        }
258
259        Ok(result)
260    }
261
262    #[tracing::instrument(skip_all, err, level = "debug")]
263    async fn build_solution_changes(
264        &self,
265        base_variables: &QueryVariables,
266        change: SourceChange,
267        clock: Arc<dyn QueryClock>,
268    ) -> Result<SolutionChangesResult, EvaluationError> {
269        let mut result = SolutionChangesResult::new();
270        let mut before_change_solutions = HashMap::new();
271        let mut after_change_solutions = HashMap::new();
272
273        match change {
274            SourceChange::Insert { element } => {
275                let element = Arc::new(element);
276                let affinity_slots = self
277                    .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
278                    .await?;
279                let solutions = self
280                    .resolve_solutions(element.clone(), affinity_slots, true)
281                    .await?;
282
283                for (signature, solution) in solutions {
284                    if let Some(blank_optional_solution) =
285                        solution.get_empty_optional_solution(&self.match_path)
286                    {
287                        before_change_solutions.insert(signature, blank_optional_solution);
288                    }
289                    after_change_solutions.insert(signature, solution);
290                }
291
292                result.anchor_element = Some(element);
293            }
294            SourceChange::Update { mut element } => {
295                if let Some(prev_version) = self
296                    .element_index
297                    .get_element(element.get_reference())
298                    .await?
299                {
300                    let prev_timestamp = prev_version.get_effective_from();
301                    let before_clock =
302                        Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
303                    let affinity_slots = self
304                        .get_slots_with_affinity(
305                            base_variables,
306                            prev_version.clone(),
307                            before_clock.clone(),
308                        )
309                        .await?;
310                    let solutions = self
311                        .resolve_solutions(prev_version.clone(), affinity_slots, false)
312                        .await?;
313                    for (signature, solution) in solutions {
314                        before_change_solutions.insert(signature, solution);
315                    }
316                    element.merge_missing_properties(prev_version.as_ref());
317                    result.before_clock = Some(before_clock);
318                    result.before_anchor_element = Some(prev_version);
319                }
320
321                let element = Arc::new(element);
322                let affinity_slots = self
323                    .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
324                    .await?;
325                let solutions = self
326                    .resolve_solutions(element.clone(), affinity_slots, true)
327                    .await?;
328
329                for (signature, solution) in solutions {
330                    after_change_solutions.insert(signature, solution);
331                }
332
333                result.anchor_element = Some(element);
334            }
335            SourceChange::Delete { metadata } => {
336                if let Some(element) = self.element_index.get_element(&metadata.reference).await? {
337                    let prev_timestamp = element.get_effective_from();
338                    let before_clock =
339                        Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
340                    let affinity_slots = self
341                        .get_slots_with_affinity(
342                            base_variables,
343                            element.clone(),
344                            before_clock.clone(),
345                        )
346                        .await?;
347                    let solutions = self
348                        .resolve_solutions(element.clone(), affinity_slots, false)
349                        .await?;
350                    for (signature, solution) in solutions {
351                        if let Some(blank_optional_solution) =
352                            solution.get_empty_optional_solution(&self.match_path)
353                        {
354                            after_change_solutions.insert(signature, blank_optional_solution);
355                        }
356
357                        before_change_solutions.insert(signature, solution);
358                    }
359                    result.before_clock = Some(before_clock);
360                    result.before_anchor_element = Some(element);
361
362                    match self.element_index.delete_element(&metadata.reference).await {
363                        Ok(_) => {}
364                        Err(e) => return Err(EvaluationError::from(e)),
365                    }
366                }
367            }
368            SourceChange::Future { future_ref } => {
369                result.is_future_reprocess = true;
370                if let Some(element) = self
371                    .element_index
372                    .get_element(&future_ref.element_ref)
373                    .await?
374                {
375                    let prev_timestamp = element.get_effective_from();
376                    if prev_timestamp >= future_ref.due_time {
377                        // element already processed with due time expired, don't duplicate
378                        return Ok(result);
379                    }
380
381                    let before_clock =
382                        Arc::new(InstantQueryClock::new(prev_timestamp, prev_timestamp));
383
384                    let affinity_slots = self
385                        .get_slots_with_affinity(
386                            base_variables,
387                            element.clone(),
388                            before_clock.clone(),
389                        )
390                        .await?;
391
392                    let before_solutions = self
393                        .resolve_solutions(element.clone(), affinity_slots, false)
394                        .await?;
395                    for (signature, solution) in before_solutions {
396                        before_change_solutions.insert(signature, solution);
397                    }
398
399                    result.before_clock = Some(before_clock);
400                    result.before_anchor_element = Some(element.clone());
401
402                    let affinity_slots = self
403                        .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
404                        .await?;
405
406                    let after_solutions = self
407                        .resolve_solutions(element.clone(), affinity_slots, false)
408                        .await?;
409                    for (signature, solution) in after_solutions {
410                        after_change_solutions.insert(signature, solution);
411                    }
412
413                    result.anchor_element = Some(element);
414                }
415            }
416        }
417
418        for (sig, before_sol) in &before_change_solutions {
419            match after_change_solutions.get(sig) {
420                Some(after_sol) => result.changes.push((
421                    *sig,
422                    QueryPartEvaluationContext::Updating {
423                        before: before_sol.into_query_variables(&self.match_path, base_variables),
424                        after: after_sol.into_query_variables(&self.match_path, base_variables),
425                        row_signature: 0,
426                    },
427                )),
428                None => result.changes.push((
429                    *sig,
430                    QueryPartEvaluationContext::Removing {
431                        before: before_sol.into_query_variables(&self.match_path, base_variables),
432                        row_signature: 0,
433                    },
434                )),
435            }
436        }
437
438        for (sig, after_sol) in &after_change_solutions {
439            if !before_change_solutions.contains_key(sig) {
440                result.changes.push((
441                    *sig,
442                    QueryPartEvaluationContext::Adding {
443                        after: after_sol.into_query_variables(&self.match_path, base_variables),
444                        row_signature: 0,
445                    },
446                ))
447            }
448        }
449
450        Ok(result)
451    }
452
453    async fn resolve_solutions(
454        &self,
455        anchor_element: Arc<Element>,
456        affinity_slots: Vec<usize>,
457        update_index: bool,
458    ) -> Result<HashMap<u64, MatchPathSolution>, EvaluationError> {
459        if update_index {
460            self.element_index
461                .set_element(anchor_element.as_ref(), &affinity_slots)
462                .await?;
463        }
464
465        let mut result = HashMap::new();
466
467        for slot_num in affinity_slots {
468            let solution = self
469                .path_solver
470                .solve(self.match_path.clone(), anchor_element.clone(), slot_num)
471                .await?;
472            result.extend(solution.into_iter());
473        }
474
475        Ok(result)
476    }
477
478    async fn get_slots_with_affinity(
479        &self,
480        variables: &QueryVariables,
481        anchor_element: Arc<Element>,
482        clock: Arc<dyn QueryClock>,
483    ) -> Result<Vec<usize>, EvaluationError> {
484        let context = MatchSolveContext::new(variables, clock);
485
486        let mut affinity_slots = Vec::new();
487
488        for (slot_num, slot) in self.match_path.slots.iter().enumerate() {
489            if self
490                .match_element_to_slot(&context, &slot.spec, anchor_element.clone())
491                .await?
492            {
493                affinity_slots.push(slot_num);
494            }
495        }
496
497        Ok(affinity_slots)
498    }
499
500    async fn match_element_to_slot(
501        &self,
502        context: &MatchSolveContext<'_>,
503        element_spec: &SlotElementSpec,
504        element: Arc<Element>,
505    ) -> Result<bool, EvaluationError> {
506        let metadata = element.get_metadata();
507        let mut label_match = element_spec.labels.is_empty();
508
509        for label in &element_spec.labels {
510            if metadata.labels.contains(label) {
511                label_match = true;
512                break;
513            }
514        }
515
516        if !label_match {
517            return Ok(false);
518        }
519
520        let mut variables = context.variables.clone();
521
522        let element_variable = element.to_expression_variable();
523
524        if element_spec.annotation.is_some() {
525            variables.insert(
526                element_spec
527                    .annotation
528                    .clone()
529                    .unwrap()
530                    .to_string()
531                    .into_boxed_str(),
532                element_variable.clone(),
533            );
534        }
535
536        variables.insert("".into(), element_variable);
537
538        let eval_context = ExpressionEvaluationContext::from_slot(
539            &variables,
540            context.clock.clone(),
541            &metadata.reference,
542        );
543
544        for predicate in &element_spec.predicates {
545            let result = self
546                .expression_evaluator
547                .evaluate_predicate(&eval_context, predicate)
548                .await?;
549            if !result {
550                return Ok(false);
551            }
552        }
553
554        Ok(true)
555    }
556
557    #[tracing::instrument(skip_all, err, level = "debug")]
558    async fn project_solution(
559        &self,
560        part_context: QueryPartEvaluationContext,
561        change_context: &ChangeContext,
562    ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
563        let mut result = Vec::new();
564        let mut contexts = vec![(part_context, change_context.clone())];
565
566        let mut part_num = 0;
567
568        for part in &self.query.parts {
569            part_num += 1;
570            result.clear();
571
572            for (part_context, change_context) in &contexts {
573                let new_contexts = self
574                    .part_evaluator
575                    .evaluate(part_context.clone(), part_num, part, change_context)
576                    .await?;
577
578                let mut aggregation_results = CollapsedAggregationResults::new();
579
580                new_contexts.into_iter().for_each(|ctx| match &ctx {
581                    QueryPartEvaluationContext::Aggregation { .. } => {
582                        aggregation_results.insert(ctx)
583                    }
584                    QueryPartEvaluationContext::Noop => (),
585                    _ => result.push((ctx, change_context.clone())),
586                });
587
588                for actx in aggregation_results.into_vec_with_context(change_context) {
589                    result.push(actx);
590                }
591            }
592            contexts = result.clone();
593        }
594
595        Ok(result
596            .into_iter()
597            .map(|(ctx, cc)| match ctx {
598                QueryPartEvaluationContext::Adding { after, .. } => {
599                    QueryPartEvaluationContext::Adding {
600                        after,
601                        row_signature: cc.solution_signature,
602                    }
603                }
604                QueryPartEvaluationContext::Updating { before, after, .. } => {
605                    QueryPartEvaluationContext::Updating {
606                        before,
607                        after,
608                        row_signature: cc.solution_signature,
609                    }
610                }
611                QueryPartEvaluationContext::Removing { before, .. } => {
612                    QueryPartEvaluationContext::Removing {
613                        before,
614                        row_signature: cc.solution_signature,
615                    }
616                }
617                QueryPartEvaluationContext::Aggregation {
618                    before,
619                    after,
620                    grouping_keys,
621                    default_before,
622                    default_after,
623                    ..
624                } => QueryPartEvaluationContext::Aggregation {
625                    before,
626                    after,
627                    grouping_keys,
628                    default_before,
629                    default_after,
630                    row_signature: cc.after_grouping_hash,
631                },
632                QueryPartEvaluationContext::Noop => QueryPartEvaluationContext::Noop,
633            })
634            .collect())
635    }
636
637    #[tracing::instrument(skip_all, err, level = "debug")]
638    async fn execute_source_middleware(
639        &self,
640        change: SourceChange,
641    ) -> Result<Vec<SourceChange>, MiddlewareError> {
642        let source_id = change.get_reference().source_id.clone();
643        let mut source_changes = vec![change];
644
645        let pipeline = match self.source_pipelines.get(source_id) {
646            Some(pipeline) => pipeline,
647            None => return Ok(source_changes),
648        };
649
650        let mut new_source_changes = Vec::new();
651        for source_change in source_changes {
652            new_source_changes.append(
653                &mut pipeline
654                    .process(source_change, self.element_index.clone())
655                    .await?,
656            );
657        }
658
659        source_changes = new_source_changes;
660
661        Ok(source_changes)
662    }
663
664    pub async fn set_future_consumer(&self, consumer: Arc<dyn FutureQueueConsumer>) {
665        let mut future_queue_task = self.future_queue_task.lock().await;
666        if let Some(c) = future_queue_task.take() {
667            c.abort();
668        }
669
670        let queue = self.future_queue.clone();
671        let shutdown_request = self.future_consumer_shutdown_request.clone();
672
673        let task = tokio::spawn(async move {
674            let idle_interval = Duration::from_secs(1);
675            let error_interval = Duration::from_secs(5);
676            loop {
677                select! {
678                    _ = shutdown_request.notified() => {
679                        log::info!("Future queue consumer shutting down");
680                        break;
681                    }
682                    peek = queue.peek_due_time() => {
683                        match peek {
684                            Ok(Some(due_time)) => {
685                                if due_time > consumer.now() {
686                                    tokio::time::sleep(idle_interval).await;
687                                    continue;
688                                }
689                            }
690                            Ok(None) => {
691                                tokio::time::sleep(idle_interval).await;
692                                continue;
693                            }
694                            Err(e) => {
695                                log::error!("Future queue consumer error: {e:?}");
696                                tokio::time::sleep(error_interval).await;
697                                continue;
698                            }
699                        };
700
701                        // Items are due — delegate to consumer which calls process_due_futures()
702                        match consumer.on_items_due().await {
703                            Ok(_) => {}
704                            Err(e) => {
705                                log::error!("Future queue consumer error: {e:?}");
706                                consumer.on_error(e).await;
707                                tokio::time::sleep(error_interval).await;
708                            }
709                        }
710                    }
711                }
712            }
713        });
714
715        _ = future_queue_task.insert(task);
716    }
717
718    pub async fn terminate_future_consumer(&self) {
719        let mut future_queue_task = self.future_queue_task.lock().await;
720        if let Some(task) = future_queue_task.take() {
721            self.future_consumer_shutdown_request.notify_one();
722            select! {
723                _ = task => {
724                    log::info!("Future queue consumer terminated");
725                }
726                _ = tokio::time::sleep(Duration::from_secs(10)) => {
727                    log::error!("Future queue consumer termination timeout");
728                }
729            }
730        }
731    }
732
733    pub fn get_query(&self) -> Arc<Query> {
734        self.query.clone()
735    }
736}
737
738impl Drop for ContinuousQuery {
739    fn drop(&mut self) {
740        self.future_consumer_shutdown_request.notify_one();
741    }
742}
743
744impl Debug for ContinuousQuery {
745    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
746        f.debug_struct("ContinuousQuery")
747            .field("query", &self.query)
748            .finish()
749    }
750}
751
752struct SolutionChangesResult {
753    pub changes: Vec<(SolutionSignature, QueryPartEvaluationContext)>,
754    pub anchor_element: Option<Arc<Element>>,
755    pub before_clock: Option<Arc<dyn QueryClock>>,
756    pub before_anchor_element: Option<Arc<Element>>,
757    pub is_future_reprocess: bool,
758}
759
760impl SolutionChangesResult {
761    fn new() -> Self {
762        Self {
763            changes: Vec::new(),
764            before_clock: None,
765            anchor_element: None,
766            before_anchor_element: None,
767            is_future_reprocess: false,
768        }
769    }
770}
771
772struct CollapsedAggregationResults {
773    // [hash of after change grouping keys] -> (context, hash of before change grouping keys)
774    data: HashMap<u64, (QueryPartEvaluationContext, u64)>,
775}
776
777impl CollapsedAggregationResults {
778    fn new() -> Self {
779        Self {
780            data: HashMap::new(),
781        }
782    }
783
784    fn insert(&mut self, context: QueryPartEvaluationContext) {
785        if let QueryPartEvaluationContext::Aggregation {
786            before,
787            after,
788            grouping_keys,
789            default_before,
790            default_after,
791            ..
792        } = context
793        {
794            let after_key = extract_grouping_value_hash(&grouping_keys, &after);
795            let before_key = match &before {
796                Some(before) => extract_grouping_value_hash(&grouping_keys, before),
797                None => after_key,
798            };
799
800            match self.data.remove(&after_key) {
801                Some((existing, before_key)) => {
802                    if let QueryPartEvaluationContext::Aggregation {
803                        before: existing_before,
804                        ..
805                    } = existing
806                    {
807                        self.data.insert(
808                            after_key,
809                            (
810                                QueryPartEvaluationContext::Aggregation {
811                                    before: existing_before,
812                                    default_before,
813                                    default_after,
814                                    after,
815                                    grouping_keys,
816                                    row_signature: after_key,
817                                },
818                                before_key,
819                            ),
820                        );
821                    }
822                }
823                None => {
824                    self.data.insert(
825                        after_key,
826                        (
827                            QueryPartEvaluationContext::Aggregation {
828                                before,
829                                after,
830                                grouping_keys,
831                                default_before,
832                                default_after,
833                                row_signature: after_key,
834                            },
835                            before_key,
836                        ),
837                    );
838                }
839            }
840        }
841    }
842
843    fn into_vec_with_context(
844        self,
845        change_context: &ChangeContext,
846    ) -> Vec<(QueryPartEvaluationContext, ChangeContext)> {
847        self.data
848            .into_iter()
849            .map(|(after_key, (v, before_key))| {
850                let mut change_context = change_context.clone();
851                change_context.before_grouping_hash = before_key;
852                change_context.after_grouping_hash = after_key;
853                (v, change_context)
854            })
855            .collect()
856    }
857
858    fn into_result_vec(self) -> Vec<QueryPartEvaluationContext> {
859        self.data
860            .into_iter()
861            .map(|(after_key, (ctx, _))| match ctx {
862                QueryPartEvaluationContext::Aggregation {
863                    before,
864                    after,
865                    grouping_keys,
866                    default_before,
867                    default_after,
868                    ..
869                } => QueryPartEvaluationContext::Aggregation {
870                    before,
871                    after,
872                    grouping_keys,
873                    default_before,
874                    default_after,
875                    row_signature: after_key,
876                },
877                other => other,
878            })
879            .collect()
880    }
881}
882
883fn extract_grouping_value_hash(grouping_keys: &Vec<String>, variables: &QueryVariables) -> u64 {
884    let mut hasher = SpookyHasher::default();
885
886    for key in grouping_keys {
887        match variables.get(key.as_str()) {
888            Some(v) => v.hash_for_groupby(&mut hasher),
889            None => 0.hash(&mut hasher),
890        };
891    }
892    hasher.finish()
893}