1use std::{
16 collections::HashMap,
17 fmt::Debug,
18 hash::{Hash, Hasher},
19 sync::Arc,
20 time::Duration,
21};
22
23use drasi_query_ast::ast::Query;
24use hashers::jenkins::spooky_hash::SpookyHasher;
25use tokio::{
26 select,
27 sync::{Mutex, Notify},
28 task::JoinHandle,
29};
30
31use crate::{
32 evaluation::{
33 context::{ChangeContext, QueryPartEvaluationContext, QueryVariables},
34 EvaluationError, ExpressionEvaluationContext, ExpressionEvaluator, InstantQueryClock,
35 QueryPartEvaluator,
36 },
37 interface::{ElementIndex, FutureQueue, FutureQueueConsumer, MiddlewareError, QueryClock},
38 middleware::SourceMiddlewarePipelineCollection,
39 models::{Element, SourceChange},
40 path_solver::{
41 match_path::{MatchPath, SlotElementSpec},
42 solution::{MatchPathSolution, SolutionSignature},
43 MatchPathSolver, MatchSolveContext,
44 },
45};
46
47pub struct ContinuousQuery {
48 expression_evaluator: Arc<ExpressionEvaluator>,
49 part_evaluator: Arc<QueryPartEvaluator>,
50 element_index: Arc<dyn ElementIndex>,
51 path_solver: Arc<MatchPathSolver>,
52 match_path: Arc<MatchPath>,
53 query: Arc<Query>,
54 future_consumer_shutdown_request: Arc<Notify>,
55 future_queue: Arc<dyn FutureQueue>,
56 future_queue_task: Mutex<Option<JoinHandle<()>>>,
57 change_lock: Mutex<()>,
58 source_pipelines: SourceMiddlewarePipelineCollection,
59}
60
61impl ContinuousQuery {
62 #[allow(clippy::too_many_arguments)]
63 pub fn new(
64 query: Arc<Query>,
65 match_path: Arc<MatchPath>,
66 expression_evaluator: Arc<ExpressionEvaluator>,
67 element_index: Arc<dyn ElementIndex>,
68 path_solver: Arc<MatchPathSolver>,
69 part_evaluator: Arc<QueryPartEvaluator>,
70 future_queue: Arc<dyn FutureQueue>,
71 source_pipelines: SourceMiddlewarePipelineCollection,
72 ) -> Self {
73 Self {
74 expression_evaluator,
75 element_index,
76 path_solver,
77 match_path,
78 part_evaluator,
79 query,
80 future_consumer_shutdown_request: Arc::new(Notify::new()),
81 future_queue,
82 future_queue_task: Mutex::new(None),
83 change_lock: Mutex::new(()),
84 source_pipelines,
85 }
86 }
87
88 #[tracing::instrument(skip_all, err, level = "debug")]
89 pub async fn process_source_change(
90 &self,
91 change: SourceChange,
92 ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
93 let _lock = self.change_lock.lock().await;
95 let mut result = Vec::new();
96
97 let changes = self.execute_source_middleware(change).await?;
98
99 for change in changes {
100 let base_variables = QueryVariables::new(); let after_clock = Arc::new(InstantQueryClock::from_source_change(&change));
102
103 let solution_changes = self
104 .build_solution_changes(&base_variables, change, after_clock.clone())
105 .await?;
106 let before_clock = match solution_changes.before_clock {
107 Some(before_clock) => before_clock,
108 None => after_clock.clone(),
109 };
110
111 let mut aggregation_results = CollapsedAggregationResults::new();
112
113 for (solution_signature, part_context) in solution_changes.changes {
114 let change_results = self
115 .project_solution(
116 part_context,
117 &ChangeContext {
118 solution_signature,
119 before_clock: before_clock.clone(),
120 after_clock: after_clock.clone(),
121 before_anchor_element: solution_changes.before_anchor_element.clone(),
122 after_anchor_element: solution_changes.anchor_element.clone(),
123 is_future_reprocess: solution_changes.is_future_reprocess,
124 before_grouping_hash: solution_signature,
125 after_grouping_hash: solution_signature,
126 },
127 )
128 .await?;
129 change_results.into_iter().for_each(|ctx| {
130 match &ctx {
131 QueryPartEvaluationContext::Aggregation {
132 before,
133 after,
134 default_before,
135 ..
136 } => {
137 if let Some(before) = before {
138 if before == after && !default_before {
139 return;
140 }
141 }
142
143 aggregation_results.insert(ctx);
144 }
145 QueryPartEvaluationContext::Updating { before, after } => {
146 if before == after {
147 return;
148 }
149 result.push(ctx);
150 }
151 _ => result.push(ctx),
152 };
153 });
154 }
155
156 for ctx in aggregation_results.into_result_vec() {
157 result.push(ctx);
158 }
159 }
160 Ok(result)
162 }
163
164 #[tracing::instrument(skip_all, err, level = "debug")]
165 async fn build_solution_changes(
166 &self,
167 base_variables: &QueryVariables,
168 change: SourceChange,
169 clock: Arc<dyn QueryClock>,
170 ) -> Result<SolutionChangesResult, EvaluationError> {
171 let mut result = SolutionChangesResult::new();
172 let mut before_change_solutions = HashMap::new();
173 let mut after_change_solutions = HashMap::new();
174
175 match change {
176 SourceChange::Insert { element } => {
177 let element = Arc::new(element);
178 let affinity_slots = self
179 .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
180 .await?;
181 let solutions = self
182 .resolve_solutions(element.clone(), affinity_slots, true)
183 .await?;
184
185 for (signature, solution) in solutions {
186 if let Some(blank_optional_solution) =
187 solution.get_empty_optional_solution(&self.match_path)
188 {
189 before_change_solutions.insert(signature, blank_optional_solution);
190 }
191 after_change_solutions.insert(signature, solution);
192 }
193
194 result.anchor_element = Some(element);
195 }
196 SourceChange::Update { mut element } => {
197 if let Some(prev_version) = self
198 .element_index
199 .get_element(element.get_reference())
200 .await?
201 {
202 let prev_timestamp = prev_version.get_effective_from();
203 let before_clock =
204 Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
205 let affinity_slots = self
206 .get_slots_with_affinity(
207 base_variables,
208 prev_version.clone(),
209 before_clock.clone(),
210 )
211 .await?;
212 let solutions = self
213 .resolve_solutions(prev_version.clone(), affinity_slots, false)
214 .await?;
215 for (signature, solution) in solutions {
216 before_change_solutions.insert(signature, solution);
217 }
218 element.merge_missing_properties(prev_version.as_ref());
219 result.before_clock = Some(before_clock);
220 result.before_anchor_element = Some(prev_version);
221 }
222
223 let element = Arc::new(element);
224 let affinity_slots = self
225 .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
226 .await?;
227 let solutions = self
228 .resolve_solutions(element.clone(), affinity_slots, true)
229 .await?;
230
231 for (signature, solution) in solutions {
232 after_change_solutions.insert(signature, solution);
233 }
234
235 result.anchor_element = Some(element);
236 }
237 SourceChange::Delete { metadata } => {
238 if let Some(element) = self.element_index.get_element(&metadata.reference).await? {
239 let prev_timestamp = element.get_effective_from();
240 let before_clock =
241 Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
242 let affinity_slots = self
243 .get_slots_with_affinity(
244 base_variables,
245 element.clone(),
246 before_clock.clone(),
247 )
248 .await?;
249 let solutions = self
250 .resolve_solutions(element.clone(), affinity_slots, false)
251 .await?;
252 for (signature, solution) in solutions {
253 if let Some(blank_optional_solution) =
254 solution.get_empty_optional_solution(&self.match_path)
255 {
256 after_change_solutions.insert(signature, blank_optional_solution);
257 }
258
259 before_change_solutions.insert(signature, solution);
260 }
261 result.before_clock = Some(before_clock);
262 result.before_anchor_element = Some(element);
263
264 match self.element_index.delete_element(&metadata.reference).await {
265 Ok(_) => {}
266 Err(e) => return Err(EvaluationError::from(e)),
267 }
268 }
269 }
270 SourceChange::Future { future_ref } => {
271 result.is_future_reprocess = true;
272 if let Some(element) = self
273 .element_index
274 .get_element(&future_ref.element_ref)
275 .await?
276 {
277 let prev_timestamp = element.get_effective_from();
278 if prev_timestamp >= future_ref.due_time {
279 return Ok(result);
281 }
282
283 let before_clock =
284 Arc::new(InstantQueryClock::new(prev_timestamp, prev_timestamp));
285
286 let affinity_slots = self
287 .get_slots_with_affinity(
288 base_variables,
289 element.clone(),
290 before_clock.clone(),
291 )
292 .await?;
293
294 let before_solutions = self
295 .resolve_solutions(element.clone(), affinity_slots, false)
296 .await?;
297 for (signature, solution) in before_solutions {
298 before_change_solutions.insert(signature, solution);
299 }
300
301 result.before_clock = Some(before_clock);
302 result.before_anchor_element = Some(element.clone());
303
304 let affinity_slots = self
305 .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
306 .await?;
307
308 let after_solutions = self
309 .resolve_solutions(element.clone(), affinity_slots, false)
310 .await?;
311 for (signature, solution) in after_solutions {
312 after_change_solutions.insert(signature, solution);
313 }
314
315 result.anchor_element = Some(element);
316 }
317 }
318 }
319
320 for (sig, before_sol) in &before_change_solutions {
321 match after_change_solutions.get(sig) {
322 Some(after_sol) => result.changes.push((
323 *sig,
324 QueryPartEvaluationContext::Updating {
325 before: before_sol.into_query_variables(&self.match_path, base_variables),
326 after: after_sol.into_query_variables(&self.match_path, base_variables),
327 },
328 )),
329 None => result.changes.push((
330 *sig,
331 QueryPartEvaluationContext::Removing {
332 before: before_sol.into_query_variables(&self.match_path, base_variables),
333 },
334 )),
335 }
336 }
337
338 for (sig, after_sol) in &after_change_solutions {
339 if !before_change_solutions.contains_key(sig) {
340 result.changes.push((
341 *sig,
342 QueryPartEvaluationContext::Adding {
343 after: after_sol.into_query_variables(&self.match_path, base_variables),
344 },
345 ))
346 }
347 }
348
349 Ok(result)
350 }
351
352 async fn resolve_solutions(
353 &self,
354 anchor_element: Arc<Element>,
355 affinity_slots: Vec<usize>,
356 update_index: bool,
357 ) -> Result<HashMap<u64, MatchPathSolution>, EvaluationError> {
358 if update_index {
359 self.element_index
360 .set_element(anchor_element.as_ref(), &affinity_slots)
361 .await?;
362 }
363
364 let mut result = HashMap::new();
365
366 for slot_num in affinity_slots {
367 let solution = self
368 .path_solver
369 .solve(self.match_path.clone(), anchor_element.clone(), slot_num)
370 .await?;
371 result.extend(solution.into_iter());
372 }
373
374 Ok(result)
375 }
376
377 async fn get_slots_with_affinity(
378 &self,
379 variables: &QueryVariables,
380 anchor_element: Arc<Element>,
381 clock: Arc<dyn QueryClock>,
382 ) -> Result<Vec<usize>, EvaluationError> {
383 let context = MatchSolveContext::new(variables, clock);
384
385 let mut affinity_slots = Vec::new();
386
387 for (slot_num, slot) in self.match_path.slots.iter().enumerate() {
388 if self
389 .match_element_to_slot(&context, &slot.spec, anchor_element.clone())
390 .await?
391 {
392 affinity_slots.push(slot_num);
393 }
394 }
395
396 Ok(affinity_slots)
397 }
398
399 async fn match_element_to_slot(
400 &self,
401 context: &MatchSolveContext<'_>,
402 element_spec: &SlotElementSpec,
403 element: Arc<Element>,
404 ) -> Result<bool, EvaluationError> {
405 let metadata = element.get_metadata();
406 let mut label_match = element_spec.labels.is_empty();
407
408 for label in &element_spec.labels {
409 if metadata.labels.contains(label) {
410 label_match = true;
411 break;
412 }
413 }
414
415 if !label_match {
416 return Ok(false);
417 }
418
419 let mut variables = context.variables.clone();
420
421 let element_variable = element.to_expression_variable();
422
423 if element_spec.annotation.is_some() {
424 variables.insert(
425 element_spec
426 .annotation
427 .clone()
428 .unwrap()
429 .to_string()
430 .into_boxed_str(),
431 element_variable.clone(),
432 );
433 }
434
435 variables.insert("".into(), element_variable);
436
437 let eval_context = ExpressionEvaluationContext::from_slot(
438 &variables,
439 context.clock.clone(),
440 &metadata.reference,
441 );
442
443 for predicate in &element_spec.predicates {
444 let result = self
445 .expression_evaluator
446 .evaluate_predicate(&eval_context, predicate)
447 .await?;
448 if !result {
449 return Ok(false);
450 }
451 }
452
453 Ok(true)
454 }
455
456 #[tracing::instrument(skip_all, err, level = "debug")]
457 async fn project_solution(
458 &self,
459 part_context: QueryPartEvaluationContext,
460 change_context: &ChangeContext,
461 ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
462 let mut result = Vec::new();
463 let mut contexts = vec![(part_context, change_context.clone())];
464
465 let mut part_num = 0;
466
467 for part in &self.query.parts {
468 part_num += 1;
469 result.clear();
470
471 for (part_context, change_context) in &contexts {
472 let new_contexts = self
473 .part_evaluator
474 .evaluate(part_context.clone(), part_num, part, change_context)
475 .await?;
476
477 let mut aggregation_results = CollapsedAggregationResults::new();
478
479 new_contexts.into_iter().for_each(|ctx| match &ctx {
480 QueryPartEvaluationContext::Aggregation { .. } => {
481 aggregation_results.insert(ctx)
482 }
483 QueryPartEvaluationContext::Noop => (),
484 _ => result.push((ctx, change_context.clone())),
485 });
486
487 for actx in aggregation_results.into_vec_with_context(change_context) {
488 result.push(actx);
489 }
490 }
491 contexts = result.clone();
492 }
493
494 Ok(result.into_iter().map(|(ctx, _)| ctx).collect())
495 }
496
497 #[tracing::instrument(skip_all, err, level = "debug")]
498 async fn execute_source_middleware(
499 &self,
500 change: SourceChange,
501 ) -> Result<Vec<SourceChange>, MiddlewareError> {
502 let source_id = change.get_reference().source_id.clone();
503 let mut source_changes = vec![change];
504
505 let pipeline = match self.source_pipelines.get(source_id) {
506 Some(pipeline) => pipeline,
507 None => return Ok(source_changes),
508 };
509
510 let mut new_source_changes = Vec::new();
511 for source_change in source_changes {
512 new_source_changes.append(
513 &mut pipeline
514 .process(source_change, self.element_index.clone())
515 .await?,
516 );
517 }
518
519 source_changes = new_source_changes;
520
521 Ok(source_changes)
522 }
523
524 pub async fn set_future_consumer(&self, consumer: Arc<dyn FutureQueueConsumer>) {
525 let mut future_queue_task = self.future_queue_task.lock().await;
526 if let Some(c) = future_queue_task.take() {
527 c.abort();
528 }
529
530 let queue = self.future_queue.clone();
531 let shutdown_request = self.future_consumer_shutdown_request.clone();
532
533 let task = tokio::spawn(async move {
534 let idle_interval = Duration::from_secs(1);
535 let error_interval = Duration::from_secs(5);
536 loop {
537 select! {
538 _ = shutdown_request.notified() => {
539 log::info!("Future queue consumer shutting down");
540 break;
541 }
542 peek = queue.peek_due_time() => {
543 let fut_ref = match peek {
544 Ok(Some(due_time)) => {
545 if due_time > consumer.now() {
546 tokio::time::sleep(idle_interval).await;
547 continue;
548 }
549 match queue.pop().await {
550 Ok(Some(element)) => element,
551 Ok(None) => {
552 tokio::time::sleep(idle_interval).await;
553 continue;
554 }
555 Err(e) => {
556 log::error!("Future queue consumer error: {e:?}");
557 tokio::time::sleep(error_interval).await;
558 continue;
559 }
560 }
561 }
562 Ok(None) => {
563 tokio::time::sleep(idle_interval).await;
564 continue;
565 }
566 Err(e) => {
567 log::error!("Future queue consumer error: {e:?}");
568 tokio::time::sleep(error_interval).await;
569 continue;
570 }
571 };
572
573 log::info!("Future queue consumer processing {}", &fut_ref.element_ref);
574
575 match consumer.on_due(&fut_ref).await {
576 Ok(_) => log::info!("Future queue consumer processed {}", &fut_ref.element_ref),
577 Err(e) => {
578 log::error!("Future queue consumer error: {e:?}");
579 consumer.on_error(&fut_ref, e).await;
580 }
581 }
582 }
583 }
584 }
585 });
586
587 _ = future_queue_task.insert(task);
588 }
589
590 pub async fn terminate_future_consumer(&self) {
591 let mut future_queue_task = self.future_queue_task.lock().await;
592 if let Some(task) = future_queue_task.take() {
593 self.future_consumer_shutdown_request.notify_one();
594 select! {
595 _ = task => {
596 log::info!("Future queue consumer terminated");
597 }
598 _ = tokio::time::sleep(Duration::from_secs(10)) => {
599 log::error!("Future queue consumer termination timeout");
600 }
601 }
602 }
603 }
604
605 pub fn get_query(&self) -> Arc<Query> {
606 self.query.clone()
607 }
608}
609
610impl Drop for ContinuousQuery {
611 fn drop(&mut self) {
612 self.future_consumer_shutdown_request.notify_one();
613 }
614}
615
616impl Debug for ContinuousQuery {
617 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
618 f.debug_struct("ContinuousQuery")
619 .field("query", &self.query)
620 .finish()
621 }
622}
623
624struct SolutionChangesResult {
625 pub changes: Vec<(SolutionSignature, QueryPartEvaluationContext)>,
626 pub anchor_element: Option<Arc<Element>>,
627 pub before_clock: Option<Arc<dyn QueryClock>>,
628 pub before_anchor_element: Option<Arc<Element>>,
629 pub is_future_reprocess: bool,
630}
631
632impl SolutionChangesResult {
633 fn new() -> Self {
634 Self {
635 changes: Vec::new(),
636 before_clock: None,
637 anchor_element: None,
638 before_anchor_element: None,
639 is_future_reprocess: false,
640 }
641 }
642}
643
644struct CollapsedAggregationResults {
645 data: HashMap<u64, (QueryPartEvaluationContext, u64)>,
647}
648
649impl CollapsedAggregationResults {
650 fn new() -> Self {
651 Self {
652 data: HashMap::new(),
653 }
654 }
655
656 fn insert(&mut self, context: QueryPartEvaluationContext) {
657 if let QueryPartEvaluationContext::Aggregation {
658 before,
659 after,
660 grouping_keys,
661 default_before,
662 default_after,
663 } = context
664 {
665 let after_key = extract_grouping_value_hash(&grouping_keys, &after);
666 let before_key = match &before {
667 Some(before) => extract_grouping_value_hash(&grouping_keys, before),
668 None => after_key,
669 };
670
671 match self.data.remove(&after_key) {
672 Some((existing, before_key)) => {
673 if let QueryPartEvaluationContext::Aggregation {
674 before: existing_before,
675 ..
676 } = existing
677 {
678 self.data.insert(
679 after_key,
680 (
681 QueryPartEvaluationContext::Aggregation {
682 before: existing_before,
683 default_before,
684 default_after,
685 after,
686 grouping_keys,
687 },
688 before_key,
689 ),
690 );
691 }
692 }
693 None => {
694 self.data.insert(
695 after_key,
696 (
697 QueryPartEvaluationContext::Aggregation {
698 before,
699 after,
700 grouping_keys,
701 default_before,
702 default_after,
703 },
704 before_key,
705 ),
706 );
707 }
708 }
709 }
710 }
711
712 fn into_vec_with_context(
713 self,
714 change_context: &ChangeContext,
715 ) -> Vec<(QueryPartEvaluationContext, ChangeContext)> {
716 self.data
717 .into_iter()
718 .map(|(after_key, (v, before_key))| {
719 let mut change_context = change_context.clone();
720 change_context.before_grouping_hash = before_key;
721 change_context.after_grouping_hash = after_key;
722 (v, change_context)
723 })
724 .collect()
725 }
726
727 fn into_result_vec(self) -> Vec<QueryPartEvaluationContext> {
728 self.data.into_iter().map(|(_, (v, _))| v).collect()
729 }
730}
731
732fn extract_grouping_value_hash(grouping_keys: &Vec<String>, variables: &QueryVariables) -> u64 {
733 let mut hasher = SpookyHasher::default();
734
735 for key in grouping_keys {
736 match variables.get(key.as_str()) {
737 Some(v) => v.hash_for_groupby(&mut hasher),
738 None => 0.hash(&mut hasher),
739 };
740 }
741 hasher.finish()
742}