1use std::{
16 collections::HashMap,
17 fmt::Debug,
18 hash::{Hash, Hasher},
19 sync::Arc,
20 time::Duration,
21};
22
23use drasi_query_ast::ast::Query;
24use hashers::jenkins::spooky_hash::SpookyHasher;
25use tokio::{
26 select,
27 sync::{Mutex, Notify},
28 task::JoinHandle,
29};
30
31use crate::{
32 evaluation::{
33 context::{ChangeContext, QueryPartEvaluationContext, QueryVariables},
34 EvaluationError, ExpressionEvaluationContext, ExpressionEvaluator, InstantQueryClock,
35 QueryPartEvaluator,
36 },
37 interface::{ElementIndex, FutureQueue, FutureQueueConsumer, MiddlewareError, QueryClock},
38 middleware::SourceMiddlewarePipelineCollection,
39 models::{Element, SourceChange},
40 path_solver::{
41 match_path::{MatchPath, SlotElementSpec},
42 solution::{MatchPathSolution, SolutionSignature},
43 MatchPathSolver, MatchSolveContext,
44 },
45};
46
47pub struct ContinuousQuery {
48 expression_evaluator: Arc<ExpressionEvaluator>,
49 part_evaluator: Arc<QueryPartEvaluator>,
50 element_index: Arc<dyn ElementIndex>,
51 path_solver: Arc<MatchPathSolver>,
52 match_path: Arc<MatchPath>,
53 query: Arc<Query>,
54 future_consumer_shutdown_request: Arc<Notify>,
55 future_queue: Arc<dyn FutureQueue>,
56 future_queue_task: Mutex<Option<JoinHandle<()>>>,
57 change_lock: Mutex<()>,
58 source_pipelines: SourceMiddlewarePipelineCollection,
59}
60
61impl ContinuousQuery {
62 #[allow(clippy::too_many_arguments)]
63 pub fn new(
64 query: Arc<Query>,
65 match_path: Arc<MatchPath>,
66 expression_evaluator: Arc<ExpressionEvaluator>,
67 element_index: Arc<dyn ElementIndex>,
68 path_solver: Arc<MatchPathSolver>,
69 part_evaluator: Arc<QueryPartEvaluator>,
70 future_queue: Arc<dyn FutureQueue>,
71 source_pipelines: SourceMiddlewarePipelineCollection,
72 ) -> Self {
73 Self {
74 expression_evaluator,
75 element_index,
76 path_solver,
77 match_path,
78 part_evaluator,
79 query,
80 future_consumer_shutdown_request: Arc::new(Notify::new()),
81 future_queue,
82 future_queue_task: Mutex::new(None),
83 change_lock: Mutex::new(()),
84 source_pipelines,
85 }
86 }
87
88 #[tracing::instrument(skip_all, err)]
89 pub async fn process_source_change(
90 &self,
91 change: SourceChange,
92 ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
93 let _lock = self.change_lock.lock().await;
95 let mut result = Vec::new();
96
97 let changes = self.execute_source_middleware(change).await?;
98
99 for change in changes {
100 let base_variables = QueryVariables::new(); let after_clock = Arc::new(InstantQueryClock::from_source_change(&change));
102
103 let solution_changes = self
104 .build_solution_changes(&base_variables, change, after_clock.clone())
105 .await?;
106 let before_clock = match solution_changes.before_clock {
107 Some(before_clock) => before_clock,
108 None => after_clock.clone(),
109 };
110
111 let mut aggregation_results = CollapsedAggregationResults::new();
112
113 for (solution_signature, part_context) in solution_changes.changes {
114 let change_results = self
115 .project_solution(
116 part_context,
117 &ChangeContext {
118 solution_signature,
119 before_clock: before_clock.clone(),
120 after_clock: after_clock.clone(),
121 before_anchor_element: solution_changes.before_anchor_element.clone(),
122 after_anchor_element: solution_changes.anchor_element.clone(),
123 is_future_reprocess: solution_changes.is_future_reprocess,
124 before_grouping_hash: solution_signature,
125 after_grouping_hash: solution_signature,
126 },
127 )
128 .await?;
129 change_results.into_iter().for_each(|ctx| {
130 match &ctx {
131 QueryPartEvaluationContext::Aggregation { before, after, .. } => {
132 if let Some(before) = before {
133 if before == after {
134 return;
135 }
136 }
137
138 aggregation_results.insert(ctx);
139 }
140 QueryPartEvaluationContext::Updating { before, after } => {
141 if before == after {
142 return;
143 }
144 result.push(ctx);
145 }
146 _ => result.push(ctx),
147 };
148 });
149 }
150
151 for ctx in aggregation_results.into_result_vec() {
152 result.push(ctx);
153 }
154 }
155 Ok(result)
157 }
158
159 #[tracing::instrument(skip_all, err)]
160 async fn build_solution_changes(
161 &self,
162 base_variables: &QueryVariables,
163 change: SourceChange,
164 clock: Arc<dyn QueryClock>,
165 ) -> Result<SolutionChangesResult, EvaluationError> {
166 let mut result = SolutionChangesResult::new();
167 let mut before_change_solutions = HashMap::new();
168 let mut after_change_solutions = HashMap::new();
169
170 match change {
171 SourceChange::Insert { element } => {
172 let element = Arc::new(element);
173 let solutions = self
174 .resolve_solutions(base_variables, element.clone(), clock.clone(), true)
175 .await?;
176
177 for (signature, solution) in solutions {
178 after_change_solutions.insert(signature, solution);
179 }
180
181 result.anchor_element = Some(element);
182 }
183 SourceChange::Update { mut element } => {
184 if let Some(prev_version) = self
185 .element_index
186 .get_element(element.get_reference())
187 .await?
188 {
189 let prev_timestamp = prev_version.get_effective_from();
190 let before_clock =
191 Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
192 let solutions = self
193 .resolve_solutions(
194 base_variables,
195 prev_version.clone(),
196 before_clock.clone(),
197 false,
198 )
199 .await?;
200 for (signature, solution) in solutions {
201 before_change_solutions.insert(signature, solution);
202 }
203 element.merge_missing_properties(prev_version.as_ref());
204 result.before_clock = Some(before_clock);
205 result.before_anchor_element = Some(prev_version);
206 }
207
208 let element = Arc::new(element);
209 let solutions = self
210 .resolve_solutions(base_variables, element.clone(), clock.clone(), true)
211 .await?;
212
213 for (signature, solution) in solutions {
214 after_change_solutions.insert(signature, solution);
215 }
216
217 result.anchor_element = Some(element);
218 }
219 SourceChange::Delete { metadata } => {
220 if let Some(element) = self.element_index.get_element(&metadata.reference).await? {
221 let prev_timestamp = element.get_effective_from();
222 let before_clock =
223 Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
224 let solutions = self
225 .resolve_solutions(
226 base_variables,
227 element.clone(),
228 before_clock.clone(),
229 false,
230 )
231 .await?;
232 for (signature, solution) in solutions {
233 before_change_solutions.insert(signature, solution);
234 }
235 result.before_clock = Some(before_clock);
236 result.before_anchor_element = Some(element);
237
238 match self.element_index.delete_element(&metadata.reference).await {
239 Ok(_) => {}
240 Err(e) => return Err(EvaluationError::from(e)),
241 }
242 }
243 }
244 SourceChange::Future { future_ref } => {
245 result.is_future_reprocess = true;
246 if let Some(element) = self
247 .element_index
248 .get_element(&future_ref.element_ref)
249 .await?
250 {
251 let prev_timestamp = element.get_effective_from();
252 if prev_timestamp >= future_ref.due_time {
253 return Ok(result);
255 }
256
257 let before_clock =
258 Arc::new(InstantQueryClock::new(prev_timestamp, prev_timestamp));
259
260 let before_solutions = self
261 .resolve_solutions(
262 base_variables,
263 element.clone(),
264 before_clock.clone(),
265 false,
266 )
267 .await?;
268 for (signature, solution) in before_solutions {
269 before_change_solutions.insert(signature, solution);
270 }
271
272 result.before_clock = Some(before_clock);
273 result.before_anchor_element = Some(element.clone());
274
275 let after_solutions = self
276 .resolve_solutions(base_variables, element.clone(), clock.clone(), false)
277 .await?;
278 for (signature, solution) in after_solutions {
279 after_change_solutions.insert(signature, solution);
280 }
281
282 result.anchor_element = Some(element);
283 }
284 }
285 }
286
287 for (sig, before_sol) in &before_change_solutions {
288 match after_change_solutions.get(sig) {
289 Some(after_sol) => result.changes.push((
290 *sig,
291 QueryPartEvaluationContext::Updating {
292 before: before_sol.into_query_variables(&self.match_path, base_variables),
293 after: after_sol.into_query_variables(&self.match_path, base_variables),
294 },
295 )),
296 None => result.changes.push((
297 *sig,
298 QueryPartEvaluationContext::Removing {
299 before: before_sol.into_query_variables(&self.match_path, base_variables),
300 },
301 )),
302 }
303 }
304
305 for (sig, after_sol) in &after_change_solutions {
306 if !before_change_solutions.contains_key(sig) {
307 result.changes.push((
308 *sig,
309 QueryPartEvaluationContext::Adding {
310 after: after_sol.into_query_variables(&self.match_path, base_variables),
311 },
312 ))
313 }
314 }
315
316 Ok(result)
317 }
318
319 async fn resolve_solutions(
320 &self,
321 variables: &QueryVariables,
322 anchor_element: Arc<Element>,
323 clock: Arc<dyn QueryClock>,
324 update_index: bool,
325 ) -> Result<HashMap<u64, MatchPathSolution>, EvaluationError> {
326 let context = MatchSolveContext::new(variables, clock);
327
328 let mut affinity_slots = Vec::new();
329
330 for (slot_num, slot) in self.match_path.slots.iter().enumerate() {
331 if self
332 .match_element_to_slot(&context, &slot.spec, anchor_element.clone())
333 .await?
334 {
335 affinity_slots.push(slot_num);
336 }
337 }
338
339 if update_index {
340 self.element_index
341 .set_element(anchor_element.as_ref(), &affinity_slots)
342 .await?;
343 }
344
345 let mut result = HashMap::new();
346
347 for slot_num in affinity_slots {
348 let solution = self
349 .path_solver
350 .solve(self.match_path.clone(), anchor_element.clone(), slot_num)
351 .await?;
352 result.extend(solution.into_iter());
353 }
354
355 Ok(result)
356 }
357
358 async fn match_element_to_slot(
359 &self,
360 context: &MatchSolveContext<'_>,
361 element_spec: &SlotElementSpec,
362 element: Arc<Element>,
363 ) -> Result<bool, EvaluationError> {
364 let metadata = element.get_metadata();
365 let mut label_match = element_spec.labels.is_empty();
366
367 for label in &element_spec.labels {
368 if metadata.labels.contains(label) {
369 label_match = true;
370 break;
371 }
372 }
373
374 if !label_match {
375 return Ok(false);
376 }
377
378 let mut variables = context.variables.clone();
379
380 let element_variable = element.to_expression_variable();
381
382 if element_spec.annotation.is_some() {
383 variables.insert(
384 element_spec
385 .annotation
386 .clone()
387 .unwrap()
388 .to_string()
389 .into_boxed_str(),
390 element_variable.clone(),
391 );
392 }
393
394 variables.insert("".into(), element_variable);
395
396 let eval_context = ExpressionEvaluationContext::new(&variables, context.clock.clone());
397
398 for predicate in &element_spec.predicates {
399 let result = self
400 .expression_evaluator
401 .evaluate_predicate(&eval_context, predicate)
402 .await?;
403 if !result {
404 return Ok(false);
405 }
406 }
407
408 Ok(true)
409 }
410
411 #[tracing::instrument(skip_all, err)]
412 async fn project_solution(
413 &self,
414 part_context: QueryPartEvaluationContext,
415 change_context: &ChangeContext,
416 ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
417 let mut result = Vec::new();
418 let mut contexts = vec![(part_context, change_context.clone())];
419
420 let mut part_num = 0;
421
422 for part in &self.query.parts {
423 part_num += 1;
424 result.clear();
425
426 for (part_context, change_context) in &contexts {
427 let new_contexts = self
428 .part_evaluator
429 .evaluate(part_context.clone(), part_num, part, change_context)
430 .await?;
431
432 let mut aggregation_results = CollapsedAggregationResults::new();
433
434 new_contexts.into_iter().for_each(|ctx| match &ctx {
435 QueryPartEvaluationContext::Aggregation { .. } => {
436 aggregation_results.insert(ctx)
437 }
438 QueryPartEvaluationContext::Noop => (),
439 _ => result.push((ctx, change_context.clone())),
440 });
441
442 for actx in aggregation_results.into_vec_with_context(change_context) {
443 result.push(actx);
444 }
445 }
446 contexts = result.clone();
447 }
448
449 Ok(result.into_iter().map(|(ctx, _)| ctx).collect())
450 }
451
452 #[tracing::instrument(skip_all, err)]
453 async fn execute_source_middleware(
454 &self,
455 change: SourceChange,
456 ) -> Result<Vec<SourceChange>, MiddlewareError> {
457 let source_id = change.get_reference().source_id.clone();
458 let mut source_changes = vec![change];
459
460 let pipeline = match self.source_pipelines.get(source_id) {
461 Some(pipeline) => pipeline,
462 None => return Ok(source_changes),
463 };
464
465 let mut new_source_changes = Vec::new();
466 for source_change in source_changes {
467 new_source_changes.append(&mut pipeline.process(source_change).await?);
468 }
469
470 source_changes = new_source_changes;
471
472 Ok(source_changes)
473 }
474
475 pub async fn set_future_consumer(&self, consumer: Arc<dyn FutureQueueConsumer>) {
476 let mut future_queue_task = self.future_queue_task.lock().await;
477 if let Some(c) = future_queue_task.take() {
478 c.abort();
479 }
480
481 let queue = self.future_queue.clone();
482 let shutdown_request = self.future_consumer_shutdown_request.clone();
483
484 let task = tokio::spawn(async move {
485 let idle_interval = Duration::from_secs(1);
486 let error_interval = Duration::from_secs(5);
487 loop {
488 select! {
489 _ = shutdown_request.notified() => {
490 log::info!("Future queue consumer shutting down");
491 break;
492 }
493 peek = queue.peek_due_time() => {
494 let fut_ref = match peek {
495 Ok(Some(due_time)) => {
496 if due_time > consumer.now() {
497 tokio::time::sleep(idle_interval).await;
498 continue;
499 }
500 match queue.pop().await {
501 Ok(Some(element)) => element,
502 Ok(None) => {
503 tokio::time::sleep(idle_interval).await;
504 continue;
505 }
506 Err(e) => {
507 log::error!("Future queue consumer error: {:?}", e);
508 tokio::time::sleep(error_interval).await;
509 continue;
510 }
511 }
512 }
513 Ok(None) => {
514 tokio::time::sleep(idle_interval).await;
515 continue;
516 }
517 Err(e) => {
518 log::error!("Future queue consumer error: {:?}", e);
519 tokio::time::sleep(error_interval).await;
520 continue;
521 }
522 };
523
524 log::info!("Future queue consumer processing {}", &fut_ref.element_ref);
525
526 match consumer.on_due(&fut_ref).await {
527 Ok(_) => log::info!("Future queue consumer processed {}", &fut_ref.element_ref),
528 Err(e) => {
529 log::error!("Future queue consumer error: {:?}", e);
530 consumer.on_error(&fut_ref, e).await;
531 }
532 }
533 }
534 }
535 }
536 });
537
538 _ = future_queue_task.insert(task);
539 }
540
541 pub async fn terminate_future_consumer(&self) {
542 let mut future_queue_task = self.future_queue_task.lock().await;
543 if let Some(task) = future_queue_task.take() {
544 self.future_consumer_shutdown_request.notify_one();
545 select! {
546 _ = task => {
547 log::info!("Future queue consumer terminated");
548 }
549 _ = tokio::time::sleep(Duration::from_secs(10)) => {
550 log::error!("Future queue consumer termination timeout");
551 }
552 }
553 }
554 }
555
556 pub fn get_query(&self) -> Arc<Query> {
557 self.query.clone()
558 }
559}
560
561impl Drop for ContinuousQuery {
562 fn drop(&mut self) {
563 self.future_consumer_shutdown_request.notify_one();
564 }
565}
566
567impl Debug for ContinuousQuery {
568 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
569 f.debug_struct("ContinuousQuery")
570 .field("query", &self.query)
571 .finish()
572 }
573}
574
575struct SolutionChangesResult {
576 pub changes: Vec<(SolutionSignature, QueryPartEvaluationContext)>,
577 pub anchor_element: Option<Arc<Element>>,
578 pub before_clock: Option<Arc<dyn QueryClock>>,
579 pub before_anchor_element: Option<Arc<Element>>,
580 pub is_future_reprocess: bool,
581}
582
583impl SolutionChangesResult {
584 fn new() -> Self {
585 Self {
586 changes: Vec::new(),
587 before_clock: None,
588 anchor_element: None,
589 before_anchor_element: None,
590 is_future_reprocess: false,
591 }
592 }
593}
594
595struct CollapsedAggregationResults {
596 data: HashMap<u64, (QueryPartEvaluationContext, u64)>,
598}
599
600impl CollapsedAggregationResults {
601 fn new() -> Self {
602 Self {
603 data: HashMap::new(),
604 }
605 }
606
607 fn insert(&mut self, context: QueryPartEvaluationContext) {
608 if let QueryPartEvaluationContext::Aggregation {
609 before,
610 after,
611 grouping_keys,
612 default_before,
613 default_after,
614 } = context
615 {
616 let after_key = extract_grouping_value_hash(&grouping_keys, &after);
617 let before_key = match &before {
618 Some(before) => extract_grouping_value_hash(&grouping_keys, before),
619 None => after_key,
620 };
621
622 match self.data.remove(&after_key) {
623 Some((existing, before_key)) => {
624 if let QueryPartEvaluationContext::Aggregation {
625 before: existing_before,
626 ..
627 } = existing
628 {
629 self.data.insert(
630 after_key,
631 (
632 QueryPartEvaluationContext::Aggregation {
633 before: existing_before,
634 default_before,
635 default_after,
636 after,
637 grouping_keys,
638 },
639 before_key,
640 ),
641 );
642 }
643 }
644 None => {
645 self.data.insert(
646 after_key,
647 (
648 QueryPartEvaluationContext::Aggregation {
649 before,
650 after,
651 grouping_keys,
652 default_before,
653 default_after,
654 },
655 before_key,
656 ),
657 );
658 }
659 }
660 }
661 }
662
663 fn into_vec_with_context(
664 self,
665 change_context: &ChangeContext,
666 ) -> Vec<(QueryPartEvaluationContext, ChangeContext)> {
667 self.data
668 .into_iter()
669 .map(|(after_key, (v, before_key))| {
670 let mut change_context = change_context.clone();
671 change_context.before_grouping_hash = before_key;
672 change_context.after_grouping_hash = after_key;
673 (v, change_context)
674 })
675 .collect()
676 }
677
678 fn into_result_vec(self) -> Vec<QueryPartEvaluationContext> {
679 self.data.into_iter().map(|(_, (v, _))| v).collect()
680 }
681}
682
683fn extract_grouping_value_hash(grouping_keys: &Vec<String>, variables: &QueryVariables) -> u64 {
684 let mut hasher = SpookyHasher::default();
685
686 for key in grouping_keys {
687 match variables.get(key.as_str()) {
688 Some(v) => v.hash_for_groupby(&mut hasher),
689 None => 0.hash(&mut hasher),
690 };
691 }
692 hasher.finish()
693}