1use std::{
16 collections::HashMap,
17 fmt::Debug,
18 hash::{Hash, Hasher},
19 sync::Arc,
20 time::Duration,
21};
22
23use drasi_query_ast::ast::Query;
24use hashers::jenkins::spooky_hash::SpookyHasher;
25use tokio::{
26 select,
27 sync::{Mutex, Notify},
28 task::JoinHandle,
29};
30
31use crate::{
32 evaluation::{
33 context::{ChangeContext, QueryPartEvaluationContext, QueryVariables},
34 EvaluationError, ExpressionEvaluationContext, ExpressionEvaluator, InstantQueryClock,
35 QueryPartEvaluator,
36 },
37 interface::{
38 ElementIndex, FutureQueue, FutureQueueConsumer, MiddlewareError, QueryClock,
39 SessionControl, SessionGuard,
40 },
41 middleware::SourceMiddlewarePipelineCollection,
42 models::{Element, SourceChange},
43 path_solver::{
44 match_path::{MatchPath, SlotElementSpec},
45 solution::{MatchPathSolution, SolutionSignature},
46 MatchPathSolver, MatchSolveContext,
47 },
48};
49
/// Outcome of processing one item taken from the future queue.
///
/// Returned (wrapped in `Some`) by `ContinuousQuery::process_due_futures`.
pub struct DueFutureResult {
    /// Evaluation contexts produced by re-processing the future.
    pub results: Vec<QueryPartEvaluationContext>,
    /// `source_id` of the element reference the future pointed at.
    pub source_id: Arc<str>,
}
58
/// A query that is evaluated incrementally as source changes arrive.
///
/// Changes are processed one at a time (see `change_lock`); each one is
/// turned into before/after solution sets and projected through the query
/// parts to yield `QueryPartEvaluationContext` values.
pub struct ContinuousQuery {
    /// Evaluates the predicates attached to match-path slots.
    expression_evaluator: Arc<ExpressionEvaluator>,
    /// Evaluates each query part while projecting a solution.
    part_evaluator: Arc<QueryPartEvaluator>,
    /// Store of elements; read for previous versions and updated/deleted
    /// while solution changes are built.
    element_index: Arc<dyn ElementIndex>,
    /// Solves the match path starting from an anchor element and slot.
    path_solver: Arc<MatchPathSolver>,
    /// The match path whose slots incoming elements are tested against.
    match_path: Arc<MatchPath>,
    /// The parsed query; its parts are evaluated in order.
    query: Arc<Query>,
    /// Notified to ask the spawned future-queue consumer task to stop.
    future_consumer_shutdown_request: Arc<Notify>,
    /// Queue of futures; popped by `process_due_futures` and peeked by the
    /// consumer task.
    future_queue: Arc<dyn FutureQueue>,
    /// Handle of the currently spawned consumer task, if any.
    future_queue_task: Mutex<Option<JoinHandle<()>>>,
    /// Held for the whole of `process_source_change` / `process_due_futures`
    /// so that those calls never interleave.
    change_lock: Mutex<()>,
    /// Middleware pipelines, looked up by the change's source id.
    source_pipelines: SourceMiddlewarePipelineCollection,
    /// Used to begin a `SessionGuard` around each processing call.
    session_control: Arc<dyn SessionControl>,
}
73
74impl ContinuousQuery {
75 #[allow(clippy::too_many_arguments)]
76 pub fn new(
77 query: Arc<Query>,
78 match_path: Arc<MatchPath>,
79 expression_evaluator: Arc<ExpressionEvaluator>,
80 element_index: Arc<dyn ElementIndex>,
81 path_solver: Arc<MatchPathSolver>,
82 part_evaluator: Arc<QueryPartEvaluator>,
83 future_queue: Arc<dyn FutureQueue>,
84 source_pipelines: SourceMiddlewarePipelineCollection,
85 session_control: Arc<dyn SessionControl>,
86 ) -> Self {
87 Self {
88 expression_evaluator,
89 element_index,
90 path_solver,
91 match_path,
92 part_evaluator,
93 query,
94 future_consumer_shutdown_request: Arc::new(Notify::new()),
95 future_queue,
96 future_queue_task: Mutex::new(None),
97 change_lock: Mutex::new(()),
98 source_pipelines,
99 session_control,
100 }
101 }
102
103 #[tracing::instrument(skip_all, err, level = "debug")]
104 pub async fn process_source_change(
105 &self,
106 change: SourceChange,
107 ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
108 let _lock = self.change_lock.lock().await;
109 let guard = SessionGuard::begin(self.session_control.clone()).await?;
110
111 let changes = self.execute_source_middleware(change).await?;
112 let result = self.process_changes_inner(changes).await?;
113
114 guard.commit().await?;
115 Ok(result)
116 }
117
118 #[tracing::instrument(skip_all, err, level = "debug")]
126 pub async fn process_due_futures(&self) -> Result<Option<DueFutureResult>, EvaluationError> {
127 let _lock = self.change_lock.lock().await;
128 let guard = SessionGuard::begin(self.session_control.clone()).await?;
129
130 let future_ref = match self.future_queue.pop().await {
131 Ok(Some(fr)) => fr,
132 Ok(None) => {
133 guard.commit().await?;
134 return Ok(None);
135 }
136 Err(e) => return Err(EvaluationError::from(e)),
137 };
138
139 let source_id = future_ref.element_ref.source_id.clone();
140 let change = SourceChange::Future { future_ref };
141 let changes = self.execute_source_middleware(change).await?;
142 let results = self.process_changes_inner(changes).await?;
143 guard.commit().await?;
144 Ok(Some(DueFutureResult { results, source_id }))
145 }
146
147 pub fn future_queue(&self) -> Arc<dyn FutureQueue> {
149 self.future_queue.clone()
150 }
151
152 #[tracing::instrument(skip_all, err, level = "debug")]
155 async fn process_changes_inner(
156 &self,
157 changes: Vec<SourceChange>,
158 ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
159 let mut result = Vec::new();
160
161 for change in changes {
162 let base_variables = QueryVariables::new(); let after_clock = Arc::new(InstantQueryClock::from_source_change(&change));
164
165 let solution_changes = self
166 .build_solution_changes(&base_variables, change, after_clock.clone())
167 .await?;
168 let before_clock = match solution_changes.before_clock {
169 Some(before_clock) => before_clock,
170 None => after_clock.clone(),
171 };
172
173 let mut aggregation_results = CollapsedAggregationResults::new();
174
175 for (solution_signature, part_context) in solution_changes.changes {
176 let change_results = match self
177 .project_solution(
178 part_context,
179 &ChangeContext {
180 solution_signature,
181 before_clock: before_clock.clone(),
182 after_clock: after_clock.clone(),
183 before_anchor_element: solution_changes.before_anchor_element.clone(),
184 after_anchor_element: solution_changes.anchor_element.clone(),
185 is_future_reprocess: solution_changes.is_future_reprocess,
186 before_grouping_hash: solution_signature,
187 after_grouping_hash: solution_signature,
188 },
189 )
190 .await
191 {
192 Ok(results) => results,
193 Err(EvaluationError::DivideByZero) => {
194 log::debug!("Skipping solution due to DivideByZero in projection");
195 continue;
196 }
197 Err(e) => return Err(e),
198 };
199 change_results.into_iter().for_each(|ctx| {
200 match &ctx {
201 QueryPartEvaluationContext::Aggregation {
202 before,
203 after,
204 default_before,
205 ..
206 } => {
207 if let Some(before) = before {
208 if before == after && !default_before {
209 return;
210 }
211 }
212
213 aggregation_results.insert(ctx);
214 }
215 QueryPartEvaluationContext::Updating { before, after, .. } => {
216 if before == after {
217 return;
218 }
219 result.push(ctx);
220 }
221 _ => result.push(ctx),
222 };
223 });
224 }
225
226 for ctx in aggregation_results.into_result_vec() {
227 result.push(ctx);
228 }
229 }
230
231 Ok(result)
232 }
233
234 #[tracing::instrument(skip_all, err, level = "debug")]
235 async fn build_solution_changes(
236 &self,
237 base_variables: &QueryVariables,
238 change: SourceChange,
239 clock: Arc<dyn QueryClock>,
240 ) -> Result<SolutionChangesResult, EvaluationError> {
241 let mut result = SolutionChangesResult::new();
242 let mut before_change_solutions = HashMap::new();
243 let mut after_change_solutions = HashMap::new();
244
245 match change {
246 SourceChange::Insert { element } => {
247 let element = Arc::new(element);
248 let affinity_slots = self
249 .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
250 .await?;
251 let solutions = self
252 .resolve_solutions(element.clone(), affinity_slots, true)
253 .await?;
254
255 for (signature, solution) in solutions {
256 if let Some(blank_optional_solution) =
257 solution.get_empty_optional_solution(&self.match_path)
258 {
259 before_change_solutions.insert(signature, blank_optional_solution);
260 }
261 after_change_solutions.insert(signature, solution);
262 }
263
264 result.anchor_element = Some(element);
265 }
266 SourceChange::Update { mut element } => {
267 if let Some(prev_version) = self
268 .element_index
269 .get_element(element.get_reference())
270 .await?
271 {
272 let prev_timestamp = prev_version.get_effective_from();
273 let before_clock =
274 Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
275 let affinity_slots = self
276 .get_slots_with_affinity(
277 base_variables,
278 prev_version.clone(),
279 before_clock.clone(),
280 )
281 .await?;
282 let solutions = self
283 .resolve_solutions(prev_version.clone(), affinity_slots, false)
284 .await?;
285 for (signature, solution) in solutions {
286 before_change_solutions.insert(signature, solution);
287 }
288 element.merge_missing_properties(prev_version.as_ref());
289 result.before_clock = Some(before_clock);
290 result.before_anchor_element = Some(prev_version);
291 }
292
293 let element = Arc::new(element);
294 let affinity_slots = self
295 .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
296 .await?;
297 let solutions = self
298 .resolve_solutions(element.clone(), affinity_slots, true)
299 .await?;
300
301 for (signature, solution) in solutions {
302 after_change_solutions.insert(signature, solution);
303 }
304
305 result.anchor_element = Some(element);
306 }
307 SourceChange::Delete { metadata } => {
308 if let Some(element) = self.element_index.get_element(&metadata.reference).await? {
309 let prev_timestamp = element.get_effective_from();
310 let before_clock =
311 Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
312 let affinity_slots = self
313 .get_slots_with_affinity(
314 base_variables,
315 element.clone(),
316 before_clock.clone(),
317 )
318 .await?;
319 let solutions = self
320 .resolve_solutions(element.clone(), affinity_slots, false)
321 .await?;
322 for (signature, solution) in solutions {
323 if let Some(blank_optional_solution) =
324 solution.get_empty_optional_solution(&self.match_path)
325 {
326 after_change_solutions.insert(signature, blank_optional_solution);
327 }
328
329 before_change_solutions.insert(signature, solution);
330 }
331 result.before_clock = Some(before_clock);
332 result.before_anchor_element = Some(element);
333
334 match self.element_index.delete_element(&metadata.reference).await {
335 Ok(_) => {}
336 Err(e) => return Err(EvaluationError::from(e)),
337 }
338 }
339 }
340 SourceChange::Future { future_ref } => {
341 result.is_future_reprocess = true;
342 if let Some(element) = self
343 .element_index
344 .get_element(&future_ref.element_ref)
345 .await?
346 {
347 let prev_timestamp = element.get_effective_from();
348 if prev_timestamp >= future_ref.due_time {
349 return Ok(result);
351 }
352
353 let before_clock =
354 Arc::new(InstantQueryClock::new(prev_timestamp, prev_timestamp));
355
356 let affinity_slots = self
357 .get_slots_with_affinity(
358 base_variables,
359 element.clone(),
360 before_clock.clone(),
361 )
362 .await?;
363
364 let before_solutions = self
365 .resolve_solutions(element.clone(), affinity_slots, false)
366 .await?;
367 for (signature, solution) in before_solutions {
368 before_change_solutions.insert(signature, solution);
369 }
370
371 result.before_clock = Some(before_clock);
372 result.before_anchor_element = Some(element.clone());
373
374 let affinity_slots = self
375 .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
376 .await?;
377
378 let after_solutions = self
379 .resolve_solutions(element.clone(), affinity_slots, false)
380 .await?;
381 for (signature, solution) in after_solutions {
382 after_change_solutions.insert(signature, solution);
383 }
384
385 result.anchor_element = Some(element);
386 }
387 }
388 }
389
390 for (sig, before_sol) in &before_change_solutions {
391 match after_change_solutions.get(sig) {
392 Some(after_sol) => result.changes.push((
393 *sig,
394 QueryPartEvaluationContext::Updating {
395 before: before_sol.into_query_variables(&self.match_path, base_variables),
396 after: after_sol.into_query_variables(&self.match_path, base_variables),
397 row_signature: 0,
398 },
399 )),
400 None => result.changes.push((
401 *sig,
402 QueryPartEvaluationContext::Removing {
403 before: before_sol.into_query_variables(&self.match_path, base_variables),
404 row_signature: 0,
405 },
406 )),
407 }
408 }
409
410 for (sig, after_sol) in &after_change_solutions {
411 if !before_change_solutions.contains_key(sig) {
412 result.changes.push((
413 *sig,
414 QueryPartEvaluationContext::Adding {
415 after: after_sol.into_query_variables(&self.match_path, base_variables),
416 row_signature: 0,
417 },
418 ))
419 }
420 }
421
422 Ok(result)
423 }
424
425 async fn resolve_solutions(
426 &self,
427 anchor_element: Arc<Element>,
428 affinity_slots: Vec<usize>,
429 update_index: bool,
430 ) -> Result<HashMap<u64, MatchPathSolution>, EvaluationError> {
431 if update_index {
432 self.element_index
433 .set_element(anchor_element.as_ref(), &affinity_slots)
434 .await?;
435 }
436
437 let mut result = HashMap::new();
438
439 for slot_num in affinity_slots {
440 let solution = self
441 .path_solver
442 .solve(self.match_path.clone(), anchor_element.clone(), slot_num)
443 .await?;
444 result.extend(solution.into_iter());
445 }
446
447 Ok(result)
448 }
449
450 async fn get_slots_with_affinity(
451 &self,
452 variables: &QueryVariables,
453 anchor_element: Arc<Element>,
454 clock: Arc<dyn QueryClock>,
455 ) -> Result<Vec<usize>, EvaluationError> {
456 let context = MatchSolveContext::new(variables, clock);
457
458 let mut affinity_slots = Vec::new();
459
460 for (slot_num, slot) in self.match_path.slots.iter().enumerate() {
461 if self
462 .match_element_to_slot(&context, &slot.spec, anchor_element.clone())
463 .await?
464 {
465 affinity_slots.push(slot_num);
466 }
467 }
468
469 Ok(affinity_slots)
470 }
471
472 async fn match_element_to_slot(
473 &self,
474 context: &MatchSolveContext<'_>,
475 element_spec: &SlotElementSpec,
476 element: Arc<Element>,
477 ) -> Result<bool, EvaluationError> {
478 let metadata = element.get_metadata();
479 let mut label_match = element_spec.labels.is_empty();
480
481 for label in &element_spec.labels {
482 if metadata.labels.contains(label) {
483 label_match = true;
484 break;
485 }
486 }
487
488 if !label_match {
489 return Ok(false);
490 }
491
492 let mut variables = context.variables.clone();
493
494 let element_variable = element.to_expression_variable();
495
496 if element_spec.annotation.is_some() {
497 variables.insert(
498 element_spec
499 .annotation
500 .clone()
501 .unwrap()
502 .to_string()
503 .into_boxed_str(),
504 element_variable.clone(),
505 );
506 }
507
508 variables.insert("".into(), element_variable);
509
510 let eval_context = ExpressionEvaluationContext::from_slot(
511 &variables,
512 context.clock.clone(),
513 &metadata.reference,
514 );
515
516 for predicate in &element_spec.predicates {
517 let result = self
518 .expression_evaluator
519 .evaluate_predicate(&eval_context, predicate)
520 .await?;
521 if !result {
522 return Ok(false);
523 }
524 }
525
526 Ok(true)
527 }
528
529 #[tracing::instrument(skip_all, err, level = "debug")]
530 async fn project_solution(
531 &self,
532 part_context: QueryPartEvaluationContext,
533 change_context: &ChangeContext,
534 ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
535 let mut result = Vec::new();
536 let mut contexts = vec![(part_context, change_context.clone())];
537
538 let mut part_num = 0;
539
540 for part in &self.query.parts {
541 part_num += 1;
542 result.clear();
543
544 for (part_context, change_context) in &contexts {
545 let new_contexts = self
546 .part_evaluator
547 .evaluate(part_context.clone(), part_num, part, change_context)
548 .await?;
549
550 let mut aggregation_results = CollapsedAggregationResults::new();
551
552 new_contexts.into_iter().for_each(|ctx| match &ctx {
553 QueryPartEvaluationContext::Aggregation { .. } => {
554 aggregation_results.insert(ctx)
555 }
556 QueryPartEvaluationContext::Noop => (),
557 _ => result.push((ctx, change_context.clone())),
558 });
559
560 for actx in aggregation_results.into_vec_with_context(change_context) {
561 result.push(actx);
562 }
563 }
564 contexts = result.clone();
565 }
566
567 Ok(result
568 .into_iter()
569 .map(|(ctx, cc)| match ctx {
570 QueryPartEvaluationContext::Adding { after, .. } => {
571 QueryPartEvaluationContext::Adding {
572 after,
573 row_signature: cc.solution_signature,
574 }
575 }
576 QueryPartEvaluationContext::Updating { before, after, .. } => {
577 QueryPartEvaluationContext::Updating {
578 before,
579 after,
580 row_signature: cc.solution_signature,
581 }
582 }
583 QueryPartEvaluationContext::Removing { before, .. } => {
584 QueryPartEvaluationContext::Removing {
585 before,
586 row_signature: cc.solution_signature,
587 }
588 }
589 QueryPartEvaluationContext::Aggregation {
590 before,
591 after,
592 grouping_keys,
593 default_before,
594 default_after,
595 ..
596 } => QueryPartEvaluationContext::Aggregation {
597 before,
598 after,
599 grouping_keys,
600 default_before,
601 default_after,
602 row_signature: cc.after_grouping_hash,
603 },
604 QueryPartEvaluationContext::Noop => QueryPartEvaluationContext::Noop,
605 })
606 .collect())
607 }
608
609 #[tracing::instrument(skip_all, err, level = "debug")]
610 async fn execute_source_middleware(
611 &self,
612 change: SourceChange,
613 ) -> Result<Vec<SourceChange>, MiddlewareError> {
614 let source_id = change.get_reference().source_id.clone();
615 let mut source_changes = vec![change];
616
617 let pipeline = match self.source_pipelines.get(source_id) {
618 Some(pipeline) => pipeline,
619 None => return Ok(source_changes),
620 };
621
622 let mut new_source_changes = Vec::new();
623 for source_change in source_changes {
624 new_source_changes.append(
625 &mut pipeline
626 .process(source_change, self.element_index.clone())
627 .await?,
628 );
629 }
630
631 source_changes = new_source_changes;
632
633 Ok(source_changes)
634 }
635
636 pub async fn set_future_consumer(&self, consumer: Arc<dyn FutureQueueConsumer>) {
637 let mut future_queue_task = self.future_queue_task.lock().await;
638 if let Some(c) = future_queue_task.take() {
639 c.abort();
640 }
641
642 let queue = self.future_queue.clone();
643 let shutdown_request = self.future_consumer_shutdown_request.clone();
644
645 let task = tokio::spawn(async move {
646 let idle_interval = Duration::from_secs(1);
647 let error_interval = Duration::from_secs(5);
648 loop {
649 select! {
650 _ = shutdown_request.notified() => {
651 log::info!("Future queue consumer shutting down");
652 break;
653 }
654 peek = queue.peek_due_time() => {
655 match peek {
656 Ok(Some(due_time)) => {
657 if due_time > consumer.now() {
658 tokio::time::sleep(idle_interval).await;
659 continue;
660 }
661 }
662 Ok(None) => {
663 tokio::time::sleep(idle_interval).await;
664 continue;
665 }
666 Err(e) => {
667 log::error!("Future queue consumer error: {e:?}");
668 tokio::time::sleep(error_interval).await;
669 continue;
670 }
671 };
672
673 match consumer.on_items_due().await {
675 Ok(_) => {}
676 Err(e) => {
677 log::error!("Future queue consumer error: {e:?}");
678 consumer.on_error(e).await;
679 tokio::time::sleep(error_interval).await;
680 }
681 }
682 }
683 }
684 }
685 });
686
687 _ = future_queue_task.insert(task);
688 }
689
690 pub async fn terminate_future_consumer(&self) {
691 let mut future_queue_task = self.future_queue_task.lock().await;
692 if let Some(task) = future_queue_task.take() {
693 self.future_consumer_shutdown_request.notify_one();
694 select! {
695 _ = task => {
696 log::info!("Future queue consumer terminated");
697 }
698 _ = tokio::time::sleep(Duration::from_secs(10)) => {
699 log::error!("Future queue consumer termination timeout");
700 }
701 }
702 }
703 }
704
705 pub fn get_query(&self) -> Arc<Query> {
706 self.query.clone()
707 }
708}
709
710impl Drop for ContinuousQuery {
711 fn drop(&mut self) {
712 self.future_consumer_shutdown_request.notify_one();
713 }
714}
715
716impl Debug for ContinuousQuery {
717 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
718 f.debug_struct("ContinuousQuery")
719 .field("query", &self.query)
720 .finish()
721 }
722}
723
724struct SolutionChangesResult {
725 pub changes: Vec<(SolutionSignature, QueryPartEvaluationContext)>,
726 pub anchor_element: Option<Arc<Element>>,
727 pub before_clock: Option<Arc<dyn QueryClock>>,
728 pub before_anchor_element: Option<Arc<Element>>,
729 pub is_future_reprocess: bool,
730}
731
732impl SolutionChangesResult {
733 fn new() -> Self {
734 Self {
735 changes: Vec::new(),
736 before_clock: None,
737 anchor_element: None,
738 before_anchor_element: None,
739 is_future_reprocess: false,
740 }
741 }
742}
743
/// Accumulates `Aggregation` contexts, keeping a single entry per group.
///
/// Entries are keyed by the hash of the grouping values taken from the
/// `after` side; each entry also remembers the hash of its `before` side.
struct CollapsedAggregationResults {
    /// after-group hash -> (aggregation context, before-group hash)
    data: HashMap<u64, (QueryPartEvaluationContext, u64)>,
}
748
749impl CollapsedAggregationResults {
750 fn new() -> Self {
751 Self {
752 data: HashMap::new(),
753 }
754 }
755
756 fn insert(&mut self, context: QueryPartEvaluationContext) {
757 if let QueryPartEvaluationContext::Aggregation {
758 before,
759 after,
760 grouping_keys,
761 default_before,
762 default_after,
763 ..
764 } = context
765 {
766 let after_key = extract_grouping_value_hash(&grouping_keys, &after);
767 let before_key = match &before {
768 Some(before) => extract_grouping_value_hash(&grouping_keys, before),
769 None => after_key,
770 };
771
772 match self.data.remove(&after_key) {
773 Some((existing, before_key)) => {
774 if let QueryPartEvaluationContext::Aggregation {
775 before: existing_before,
776 ..
777 } = existing
778 {
779 self.data.insert(
780 after_key,
781 (
782 QueryPartEvaluationContext::Aggregation {
783 before: existing_before,
784 default_before,
785 default_after,
786 after,
787 grouping_keys,
788 row_signature: after_key,
789 },
790 before_key,
791 ),
792 );
793 }
794 }
795 None => {
796 self.data.insert(
797 after_key,
798 (
799 QueryPartEvaluationContext::Aggregation {
800 before,
801 after,
802 grouping_keys,
803 default_before,
804 default_after,
805 row_signature: after_key,
806 },
807 before_key,
808 ),
809 );
810 }
811 }
812 }
813 }
814
815 fn into_vec_with_context(
816 self,
817 change_context: &ChangeContext,
818 ) -> Vec<(QueryPartEvaluationContext, ChangeContext)> {
819 self.data
820 .into_iter()
821 .map(|(after_key, (v, before_key))| {
822 let mut change_context = change_context.clone();
823 change_context.before_grouping_hash = before_key;
824 change_context.after_grouping_hash = after_key;
825 (v, change_context)
826 })
827 .collect()
828 }
829
830 fn into_result_vec(self) -> Vec<QueryPartEvaluationContext> {
831 self.data
832 .into_iter()
833 .map(|(after_key, (ctx, _))| match ctx {
834 QueryPartEvaluationContext::Aggregation {
835 before,
836 after,
837 grouping_keys,
838 default_before,
839 default_after,
840 ..
841 } => QueryPartEvaluationContext::Aggregation {
842 before,
843 after,
844 grouping_keys,
845 default_before,
846 default_after,
847 row_signature: after_key,
848 },
849 other => other,
850 })
851 .collect()
852 }
853}
854
855fn extract_grouping_value_hash(grouping_keys: &Vec<String>, variables: &QueryVariables) -> u64 {
856 let mut hasher = SpookyHasher::default();
857
858 for key in grouping_keys {
859 match variables.get(key.as_str()) {
860 Some(v) => v.hash_for_groupby(&mut hasher),
861 None => 0.hash(&mut hasher),
862 };
863 }
864 hasher.finish()
865}