1use std::{
16 collections::HashMap,
17 fmt::Debug,
18 future::Future,
19 hash::{Hash, Hasher},
20 sync::Arc,
21 time::Duration,
22};
23
24use drasi_query_ast::ast::Query;
25use hashers::jenkins::spooky_hash::SpookyHasher;
26use tokio::{
27 select,
28 sync::{Mutex, Notify},
29 task::JoinHandle,
30};
31
32use crate::{
33 evaluation::{
34 context::{ChangeContext, QueryPartEvaluationContext, QueryVariables},
35 EvaluationError, ExpressionEvaluationContext, ExpressionEvaluator, InstantQueryClock,
36 QueryPartEvaluator,
37 },
38 interface::{
39 ElementIndex, FutureQueue, FutureQueueConsumer, IndexError, MiddlewareError, QueryClock,
40 SessionControl, SessionGuard,
41 },
42 middleware::SourceMiddlewarePipelineCollection,
43 models::{Element, SourceChange},
44 path_solver::{
45 match_path::{MatchPath, SlotElementSpec},
46 solution::{MatchPathSolution, SolutionSignature},
47 MatchPathSolver, MatchSolveContext,
48 },
49};
50
51pub struct DueFutureResult {
55 pub results: Vec<QueryPartEvaluationContext>,
56 pub source_id: Arc<str>,
58}
59
60pub struct ContinuousQuery {
61 expression_evaluator: Arc<ExpressionEvaluator>,
62 part_evaluator: Arc<QueryPartEvaluator>,
63 element_index: Arc<dyn ElementIndex>,
64 path_solver: Arc<MatchPathSolver>,
65 match_path: Arc<MatchPath>,
66 query: Arc<Query>,
67 future_consumer_shutdown_request: Arc<Notify>,
68 future_queue: Arc<dyn FutureQueue>,
69 future_queue_task: Mutex<Option<JoinHandle<()>>>,
70 change_lock: Mutex<()>,
71 source_pipelines: SourceMiddlewarePipelineCollection,
72 session_control: Arc<dyn SessionControl>,
73}
74
75impl ContinuousQuery {
76 #[allow(clippy::too_many_arguments)]
77 pub fn new(
78 query: Arc<Query>,
79 match_path: Arc<MatchPath>,
80 expression_evaluator: Arc<ExpressionEvaluator>,
81 element_index: Arc<dyn ElementIndex>,
82 path_solver: Arc<MatchPathSolver>,
83 part_evaluator: Arc<QueryPartEvaluator>,
84 future_queue: Arc<dyn FutureQueue>,
85 source_pipelines: SourceMiddlewarePipelineCollection,
86 session_control: Arc<dyn SessionControl>,
87 ) -> Self {
88 Self {
89 expression_evaluator,
90 element_index,
91 path_solver,
92 match_path,
93 part_evaluator,
94 query,
95 future_consumer_shutdown_request: Arc::new(Notify::new()),
96 future_queue,
97 future_queue_task: Mutex::new(None),
98 change_lock: Mutex::new(()),
99 source_pipelines,
100 session_control,
101 }
102 }
103
104 #[tracing::instrument(skip_all, err, level = "debug")]
105 pub async fn process_source_change(
106 &self,
107 change: SourceChange,
108 ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
109 let _lock = self.change_lock.lock().await;
110 let guard = SessionGuard::begin(self.session_control.clone()).await?;
111
112 let changes = self.execute_source_middleware(change).await?;
113 let result = self.process_changes_inner(changes).await?;
114
115 guard.commit().await?;
116 Ok(result)
117 }
118
119 #[tracing::instrument(skip_all, err, level = "debug")]
126 pub async fn process_source_change_with_hook<F, Fut>(
127 &self,
128 change: SourceChange,
129 pre_commit_hook: F,
130 ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError>
131 where
132 F: FnOnce() -> Fut + Send,
133 Fut: Future<Output = Result<(), IndexError>> + Send,
134 {
135 let _lock = self.change_lock.lock().await;
136 let guard = SessionGuard::begin(self.session_control.clone()).await?;
137
138 let changes = self.execute_source_middleware(change).await?;
139 let result = self.process_changes_inner(changes).await?;
140
141 pre_commit_hook().await?;
142 guard.commit().await?;
143 Ok(result)
144 }
145
146 #[tracing::instrument(skip_all, err, level = "debug")]
154 pub async fn process_due_futures(&self) -> Result<Option<DueFutureResult>, EvaluationError> {
155 let _lock = self.change_lock.lock().await;
156 let guard = SessionGuard::begin(self.session_control.clone()).await?;
157
158 let future_ref = match self.future_queue.pop().await {
159 Ok(Some(fr)) => fr,
160 Ok(None) => {
161 guard.commit().await?;
162 return Ok(None);
163 }
164 Err(e) => return Err(EvaluationError::from(e)),
165 };
166
167 let source_id = future_ref.element_ref.source_id.clone();
168 let change = SourceChange::Future { future_ref };
169 let changes = self.execute_source_middleware(change).await?;
170 let results = self.process_changes_inner(changes).await?;
171 guard.commit().await?;
172 Ok(Some(DueFutureResult { results, source_id }))
173 }
174
175 pub fn future_queue(&self) -> Arc<dyn FutureQueue> {
177 self.future_queue.clone()
178 }
179
180 #[tracing::instrument(skip_all, err, level = "debug")]
183 async fn process_changes_inner(
184 &self,
185 changes: Vec<SourceChange>,
186 ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
187 let mut result = Vec::new();
188
189 for change in changes {
190 let base_variables = QueryVariables::new(); let after_clock = Arc::new(InstantQueryClock::from_source_change(&change));
192
193 let solution_changes = self
194 .build_solution_changes(&base_variables, change, after_clock.clone())
195 .await?;
196 let before_clock = match solution_changes.before_clock {
197 Some(before_clock) => before_clock,
198 None => after_clock.clone(),
199 };
200
201 let mut aggregation_results = CollapsedAggregationResults::new();
202
203 for (solution_signature, part_context) in solution_changes.changes {
204 let change_results = match self
205 .project_solution(
206 part_context,
207 &ChangeContext {
208 solution_signature,
209 before_clock: before_clock.clone(),
210 after_clock: after_clock.clone(),
211 before_anchor_element: solution_changes.before_anchor_element.clone(),
212 after_anchor_element: solution_changes.anchor_element.clone(),
213 is_future_reprocess: solution_changes.is_future_reprocess,
214 before_grouping_hash: solution_signature,
215 after_grouping_hash: solution_signature,
216 },
217 )
218 .await
219 {
220 Ok(results) => results,
221 Err(EvaluationError::DivideByZero) => {
222 log::debug!("Skipping solution due to DivideByZero in projection");
223 continue;
224 }
225 Err(e) => return Err(e),
226 };
227 change_results.into_iter().for_each(|ctx| {
228 match &ctx {
229 QueryPartEvaluationContext::Aggregation {
230 before,
231 after,
232 default_before,
233 ..
234 } => {
235 if let Some(before) = before {
236 if before == after && !default_before {
237 return;
238 }
239 }
240
241 aggregation_results.insert(ctx);
242 }
243 QueryPartEvaluationContext::Updating { before, after, .. } => {
244 if before == after {
245 return;
246 }
247 result.push(ctx);
248 }
249 _ => result.push(ctx),
250 };
251 });
252 }
253
254 for ctx in aggregation_results.into_result_vec() {
255 result.push(ctx);
256 }
257 }
258
259 Ok(result)
260 }
261
262 #[tracing::instrument(skip_all, err, level = "debug")]
263 async fn build_solution_changes(
264 &self,
265 base_variables: &QueryVariables,
266 change: SourceChange,
267 clock: Arc<dyn QueryClock>,
268 ) -> Result<SolutionChangesResult, EvaluationError> {
269 let mut result = SolutionChangesResult::new();
270 let mut before_change_solutions = HashMap::new();
271 let mut after_change_solutions = HashMap::new();
272
273 match change {
274 SourceChange::Insert { element } => {
275 let element = Arc::new(element);
276 let affinity_slots = self
277 .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
278 .await?;
279 let solutions = self
280 .resolve_solutions(element.clone(), affinity_slots, true)
281 .await?;
282
283 for (signature, solution) in solutions {
284 if let Some(blank_optional_solution) =
285 solution.get_empty_optional_solution(&self.match_path)
286 {
287 before_change_solutions.insert(signature, blank_optional_solution);
288 }
289 after_change_solutions.insert(signature, solution);
290 }
291
292 result.anchor_element = Some(element);
293 }
294 SourceChange::Update { mut element } => {
295 if let Some(prev_version) = self
296 .element_index
297 .get_element(element.get_reference())
298 .await?
299 {
300 let prev_timestamp = prev_version.get_effective_from();
301 let before_clock =
302 Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
303 let affinity_slots = self
304 .get_slots_with_affinity(
305 base_variables,
306 prev_version.clone(),
307 before_clock.clone(),
308 )
309 .await?;
310 let solutions = self
311 .resolve_solutions(prev_version.clone(), affinity_slots, false)
312 .await?;
313 for (signature, solution) in solutions {
314 before_change_solutions.insert(signature, solution);
315 }
316 element.merge_missing_properties(prev_version.as_ref());
317 result.before_clock = Some(before_clock);
318 result.before_anchor_element = Some(prev_version);
319 }
320
321 let element = Arc::new(element);
322 let affinity_slots = self
323 .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
324 .await?;
325 let solutions = self
326 .resolve_solutions(element.clone(), affinity_slots, true)
327 .await?;
328
329 for (signature, solution) in solutions {
330 after_change_solutions.insert(signature, solution);
331 }
332
333 result.anchor_element = Some(element);
334 }
335 SourceChange::Delete { metadata } => {
336 if let Some(element) = self.element_index.get_element(&metadata.reference).await? {
337 let prev_timestamp = element.get_effective_from();
338 let before_clock =
339 Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
340 let affinity_slots = self
341 .get_slots_with_affinity(
342 base_variables,
343 element.clone(),
344 before_clock.clone(),
345 )
346 .await?;
347 let solutions = self
348 .resolve_solutions(element.clone(), affinity_slots, false)
349 .await?;
350 for (signature, solution) in solutions {
351 if let Some(blank_optional_solution) =
352 solution.get_empty_optional_solution(&self.match_path)
353 {
354 after_change_solutions.insert(signature, blank_optional_solution);
355 }
356
357 before_change_solutions.insert(signature, solution);
358 }
359 result.before_clock = Some(before_clock);
360 result.before_anchor_element = Some(element);
361
362 match self.element_index.delete_element(&metadata.reference).await {
363 Ok(_) => {}
364 Err(e) => return Err(EvaluationError::from(e)),
365 }
366 }
367 }
368 SourceChange::Future { future_ref } => {
369 result.is_future_reprocess = true;
370 if let Some(element) = self
371 .element_index
372 .get_element(&future_ref.element_ref)
373 .await?
374 {
375 let prev_timestamp = element.get_effective_from();
376 if prev_timestamp >= future_ref.due_time {
377 return Ok(result);
379 }
380
381 let before_clock =
382 Arc::new(InstantQueryClock::new(prev_timestamp, prev_timestamp));
383
384 let affinity_slots = self
385 .get_slots_with_affinity(
386 base_variables,
387 element.clone(),
388 before_clock.clone(),
389 )
390 .await?;
391
392 let before_solutions = self
393 .resolve_solutions(element.clone(), affinity_slots, false)
394 .await?;
395 for (signature, solution) in before_solutions {
396 before_change_solutions.insert(signature, solution);
397 }
398
399 result.before_clock = Some(before_clock);
400 result.before_anchor_element = Some(element.clone());
401
402 let affinity_slots = self
403 .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
404 .await?;
405
406 let after_solutions = self
407 .resolve_solutions(element.clone(), affinity_slots, false)
408 .await?;
409 for (signature, solution) in after_solutions {
410 after_change_solutions.insert(signature, solution);
411 }
412
413 result.anchor_element = Some(element);
414 }
415 }
416 }
417
418 for (sig, before_sol) in &before_change_solutions {
419 match after_change_solutions.get(sig) {
420 Some(after_sol) => result.changes.push((
421 *sig,
422 QueryPartEvaluationContext::Updating {
423 before: before_sol.into_query_variables(&self.match_path, base_variables),
424 after: after_sol.into_query_variables(&self.match_path, base_variables),
425 row_signature: 0,
426 },
427 )),
428 None => result.changes.push((
429 *sig,
430 QueryPartEvaluationContext::Removing {
431 before: before_sol.into_query_variables(&self.match_path, base_variables),
432 row_signature: 0,
433 },
434 )),
435 }
436 }
437
438 for (sig, after_sol) in &after_change_solutions {
439 if !before_change_solutions.contains_key(sig) {
440 result.changes.push((
441 *sig,
442 QueryPartEvaluationContext::Adding {
443 after: after_sol.into_query_variables(&self.match_path, base_variables),
444 row_signature: 0,
445 },
446 ))
447 }
448 }
449
450 Ok(result)
451 }
452
453 async fn resolve_solutions(
454 &self,
455 anchor_element: Arc<Element>,
456 affinity_slots: Vec<usize>,
457 update_index: bool,
458 ) -> Result<HashMap<u64, MatchPathSolution>, EvaluationError> {
459 if update_index {
460 self.element_index
461 .set_element(anchor_element.as_ref(), &affinity_slots)
462 .await?;
463 }
464
465 let mut result = HashMap::new();
466
467 for slot_num in affinity_slots {
468 let solution = self
469 .path_solver
470 .solve(self.match_path.clone(), anchor_element.clone(), slot_num)
471 .await?;
472 result.extend(solution.into_iter());
473 }
474
475 Ok(result)
476 }
477
478 async fn get_slots_with_affinity(
479 &self,
480 variables: &QueryVariables,
481 anchor_element: Arc<Element>,
482 clock: Arc<dyn QueryClock>,
483 ) -> Result<Vec<usize>, EvaluationError> {
484 let context = MatchSolveContext::new(variables, clock);
485
486 let mut affinity_slots = Vec::new();
487
488 for (slot_num, slot) in self.match_path.slots.iter().enumerate() {
489 if self
490 .match_element_to_slot(&context, &slot.spec, anchor_element.clone())
491 .await?
492 {
493 affinity_slots.push(slot_num);
494 }
495 }
496
497 Ok(affinity_slots)
498 }
499
500 async fn match_element_to_slot(
501 &self,
502 context: &MatchSolveContext<'_>,
503 element_spec: &SlotElementSpec,
504 element: Arc<Element>,
505 ) -> Result<bool, EvaluationError> {
506 let metadata = element.get_metadata();
507 let mut label_match = element_spec.labels.is_empty();
508
509 for label in &element_spec.labels {
510 if metadata.labels.contains(label) {
511 label_match = true;
512 break;
513 }
514 }
515
516 if !label_match {
517 return Ok(false);
518 }
519
520 let mut variables = context.variables.clone();
521
522 let element_variable = element.to_expression_variable();
523
524 if element_spec.annotation.is_some() {
525 variables.insert(
526 element_spec
527 .annotation
528 .clone()
529 .unwrap()
530 .to_string()
531 .into_boxed_str(),
532 element_variable.clone(),
533 );
534 }
535
536 variables.insert("".into(), element_variable);
537
538 let eval_context = ExpressionEvaluationContext::from_slot(
539 &variables,
540 context.clock.clone(),
541 &metadata.reference,
542 );
543
544 for predicate in &element_spec.predicates {
545 let result = self
546 .expression_evaluator
547 .evaluate_predicate(&eval_context, predicate)
548 .await?;
549 if !result {
550 return Ok(false);
551 }
552 }
553
554 Ok(true)
555 }
556
557 #[tracing::instrument(skip_all, err, level = "debug")]
558 async fn project_solution(
559 &self,
560 part_context: QueryPartEvaluationContext,
561 change_context: &ChangeContext,
562 ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
563 let mut result = Vec::new();
564 let mut contexts = vec![(part_context, change_context.clone())];
565
566 let mut part_num = 0;
567
568 for part in &self.query.parts {
569 part_num += 1;
570 result.clear();
571
572 for (part_context, change_context) in &contexts {
573 let new_contexts = self
574 .part_evaluator
575 .evaluate(part_context.clone(), part_num, part, change_context)
576 .await?;
577
578 let mut aggregation_results = CollapsedAggregationResults::new();
579
580 new_contexts.into_iter().for_each(|ctx| match &ctx {
581 QueryPartEvaluationContext::Aggregation { .. } => {
582 aggregation_results.insert(ctx)
583 }
584 QueryPartEvaluationContext::Noop => (),
585 _ => result.push((ctx, change_context.clone())),
586 });
587
588 for actx in aggregation_results.into_vec_with_context(change_context) {
589 result.push(actx);
590 }
591 }
592 contexts = result.clone();
593 }
594
595 Ok(result
596 .into_iter()
597 .map(|(ctx, cc)| match ctx {
598 QueryPartEvaluationContext::Adding { after, .. } => {
599 QueryPartEvaluationContext::Adding {
600 after,
601 row_signature: cc.solution_signature,
602 }
603 }
604 QueryPartEvaluationContext::Updating { before, after, .. } => {
605 QueryPartEvaluationContext::Updating {
606 before,
607 after,
608 row_signature: cc.solution_signature,
609 }
610 }
611 QueryPartEvaluationContext::Removing { before, .. } => {
612 QueryPartEvaluationContext::Removing {
613 before,
614 row_signature: cc.solution_signature,
615 }
616 }
617 QueryPartEvaluationContext::Aggregation {
618 before,
619 after,
620 grouping_keys,
621 default_before,
622 default_after,
623 ..
624 } => QueryPartEvaluationContext::Aggregation {
625 before,
626 after,
627 grouping_keys,
628 default_before,
629 default_after,
630 row_signature: cc.after_grouping_hash,
631 },
632 QueryPartEvaluationContext::Noop => QueryPartEvaluationContext::Noop,
633 })
634 .collect())
635 }
636
637 #[tracing::instrument(skip_all, err, level = "debug")]
638 async fn execute_source_middleware(
639 &self,
640 change: SourceChange,
641 ) -> Result<Vec<SourceChange>, MiddlewareError> {
642 let source_id = change.get_reference().source_id.clone();
643 let mut source_changes = vec![change];
644
645 let pipeline = match self.source_pipelines.get(source_id) {
646 Some(pipeline) => pipeline,
647 None => return Ok(source_changes),
648 };
649
650 let mut new_source_changes = Vec::new();
651 for source_change in source_changes {
652 new_source_changes.append(
653 &mut pipeline
654 .process(source_change, self.element_index.clone())
655 .await?,
656 );
657 }
658
659 source_changes = new_source_changes;
660
661 Ok(source_changes)
662 }
663
664 pub async fn set_future_consumer(&self, consumer: Arc<dyn FutureQueueConsumer>) {
665 let mut future_queue_task = self.future_queue_task.lock().await;
666 if let Some(c) = future_queue_task.take() {
667 c.abort();
668 }
669
670 let queue = self.future_queue.clone();
671 let shutdown_request = self.future_consumer_shutdown_request.clone();
672
673 let task = tokio::spawn(async move {
674 let idle_interval = Duration::from_secs(1);
675 let error_interval = Duration::from_secs(5);
676 loop {
677 select! {
678 _ = shutdown_request.notified() => {
679 log::info!("Future queue consumer shutting down");
680 break;
681 }
682 peek = queue.peek_due_time() => {
683 match peek {
684 Ok(Some(due_time)) => {
685 if due_time > consumer.now() {
686 tokio::time::sleep(idle_interval).await;
687 continue;
688 }
689 }
690 Ok(None) => {
691 tokio::time::sleep(idle_interval).await;
692 continue;
693 }
694 Err(e) => {
695 log::error!("Future queue consumer error: {e:?}");
696 tokio::time::sleep(error_interval).await;
697 continue;
698 }
699 };
700
701 match consumer.on_items_due().await {
703 Ok(_) => {}
704 Err(e) => {
705 log::error!("Future queue consumer error: {e:?}");
706 consumer.on_error(e).await;
707 tokio::time::sleep(error_interval).await;
708 }
709 }
710 }
711 }
712 }
713 });
714
715 _ = future_queue_task.insert(task);
716 }
717
718 pub async fn terminate_future_consumer(&self) {
719 let mut future_queue_task = self.future_queue_task.lock().await;
720 if let Some(task) = future_queue_task.take() {
721 self.future_consumer_shutdown_request.notify_one();
722 select! {
723 _ = task => {
724 log::info!("Future queue consumer terminated");
725 }
726 _ = tokio::time::sleep(Duration::from_secs(10)) => {
727 log::error!("Future queue consumer termination timeout");
728 }
729 }
730 }
731 }
732
733 pub fn get_query(&self) -> Arc<Query> {
734 self.query.clone()
735 }
736}
737
738impl Drop for ContinuousQuery {
739 fn drop(&mut self) {
740 self.future_consumer_shutdown_request.notify_one();
741 }
742}
743
744impl Debug for ContinuousQuery {
745 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
746 f.debug_struct("ContinuousQuery")
747 .field("query", &self.query)
748 .finish()
749 }
750}
751
752struct SolutionChangesResult {
753 pub changes: Vec<(SolutionSignature, QueryPartEvaluationContext)>,
754 pub anchor_element: Option<Arc<Element>>,
755 pub before_clock: Option<Arc<dyn QueryClock>>,
756 pub before_anchor_element: Option<Arc<Element>>,
757 pub is_future_reprocess: bool,
758}
759
760impl SolutionChangesResult {
761 fn new() -> Self {
762 Self {
763 changes: Vec::new(),
764 before_clock: None,
765 anchor_element: None,
766 before_anchor_element: None,
767 is_future_reprocess: false,
768 }
769 }
770}
771
772struct CollapsedAggregationResults {
773 data: HashMap<u64, (QueryPartEvaluationContext, u64)>,
775}
776
777impl CollapsedAggregationResults {
778 fn new() -> Self {
779 Self {
780 data: HashMap::new(),
781 }
782 }
783
784 fn insert(&mut self, context: QueryPartEvaluationContext) {
785 if let QueryPartEvaluationContext::Aggregation {
786 before,
787 after,
788 grouping_keys,
789 default_before,
790 default_after,
791 ..
792 } = context
793 {
794 let after_key = extract_grouping_value_hash(&grouping_keys, &after);
795 let before_key = match &before {
796 Some(before) => extract_grouping_value_hash(&grouping_keys, before),
797 None => after_key,
798 };
799
800 match self.data.remove(&after_key) {
801 Some((existing, before_key)) => {
802 if let QueryPartEvaluationContext::Aggregation {
803 before: existing_before,
804 ..
805 } = existing
806 {
807 self.data.insert(
808 after_key,
809 (
810 QueryPartEvaluationContext::Aggregation {
811 before: existing_before,
812 default_before,
813 default_after,
814 after,
815 grouping_keys,
816 row_signature: after_key,
817 },
818 before_key,
819 ),
820 );
821 }
822 }
823 None => {
824 self.data.insert(
825 after_key,
826 (
827 QueryPartEvaluationContext::Aggregation {
828 before,
829 after,
830 grouping_keys,
831 default_before,
832 default_after,
833 row_signature: after_key,
834 },
835 before_key,
836 ),
837 );
838 }
839 }
840 }
841 }
842
843 fn into_vec_with_context(
844 self,
845 change_context: &ChangeContext,
846 ) -> Vec<(QueryPartEvaluationContext, ChangeContext)> {
847 self.data
848 .into_iter()
849 .map(|(after_key, (v, before_key))| {
850 let mut change_context = change_context.clone();
851 change_context.before_grouping_hash = before_key;
852 change_context.after_grouping_hash = after_key;
853 (v, change_context)
854 })
855 .collect()
856 }
857
858 fn into_result_vec(self) -> Vec<QueryPartEvaluationContext> {
859 self.data
860 .into_iter()
861 .map(|(after_key, (ctx, _))| match ctx {
862 QueryPartEvaluationContext::Aggregation {
863 before,
864 after,
865 grouping_keys,
866 default_before,
867 default_after,
868 ..
869 } => QueryPartEvaluationContext::Aggregation {
870 before,
871 after,
872 grouping_keys,
873 default_before,
874 default_after,
875 row_signature: after_key,
876 },
877 other => other,
878 })
879 .collect()
880 }
881}
882
883fn extract_grouping_value_hash(grouping_keys: &Vec<String>, variables: &QueryVariables) -> u64 {
884 let mut hasher = SpookyHasher::default();
885
886 for key in grouping_keys {
887 match variables.get(key.as_str()) {
888 Some(v) => v.hash_for_groupby(&mut hasher),
889 None => 0.hash(&mut hasher),
890 };
891 }
892 hasher.finish()
893}