drasi_core/query/
continuous_query.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    fmt::Debug,
18    hash::{Hash, Hasher},
19    sync::Arc,
20    time::Duration,
21};
22
23use drasi_query_ast::ast::Query;
24use hashers::jenkins::spooky_hash::SpookyHasher;
25use tokio::{
26    select,
27    sync::{Mutex, Notify},
28    task::JoinHandle,
29};
30
31use crate::{
32    evaluation::{
33        context::{ChangeContext, QueryPartEvaluationContext, QueryVariables},
34        EvaluationError, ExpressionEvaluationContext, ExpressionEvaluator, InstantQueryClock,
35        QueryPartEvaluator,
36    },
37    interface::{ElementIndex, FutureQueue, FutureQueueConsumer, MiddlewareError, QueryClock},
38    middleware::SourceMiddlewarePipelineCollection,
39    models::{Element, SourceChange},
40    path_solver::{
41        match_path::{MatchPath, SlotElementSpec},
42        solution::{MatchPathSolution, SolutionSignature},
43        MatchPathSolver, MatchSolveContext,
44    },
45};
46
/// A query that is re-evaluated incrementally as source changes arrive.
///
/// Source changes are fed in through `process_source_change`; items that become due on
/// the future queue are handed to a consumer by a background task started with
/// `set_future_consumer`.
pub struct ContinuousQuery {
    /// Evaluates the predicates attached to match-path slots.
    expression_evaluator: Arc<ExpressionEvaluator>,
    /// Evaluates each part of `query` while projecting a solution.
    part_evaluator: Arc<QueryPartEvaluator>,
    /// Store of elements; read, updated and deleted from while building solution changes
    /// and also handed to the source middleware pipelines.
    element_index: Arc<dyn ElementIndex>,
    /// Solves `match_path` starting from an anchor element placed in a given slot.
    path_solver: Arc<MatchPathSolver>,
    /// The match path whose slots incoming elements are tested against.
    match_path: Arc<MatchPath>,
    /// The parsed query whose parts are evaluated in order.
    query: Arc<Query>,
    /// Notified to ask the future-queue consumer task to leave its loop.
    future_consumer_shutdown_request: Arc<Notify>,
    /// Queue polled by the consumer task for due items.
    future_queue: Arc<dyn FutureQueue>,
    /// Handle of the currently running consumer task, if one has been started.
    future_queue_task: Mutex<Option<JoinHandle<()>>>,
    /// Held for the whole of `process_source_change` so changes are processed one at a time.
    change_lock: Mutex<()>,
    /// Middleware pipelines, looked up by the source id of an incoming change.
    source_pipelines: SourceMiddlewarePipelineCollection,
}
60
61impl ContinuousQuery {
62    #[allow(clippy::too_many_arguments)]
63    pub fn new(
64        query: Arc<Query>,
65        match_path: Arc<MatchPath>,
66        expression_evaluator: Arc<ExpressionEvaluator>,
67        element_index: Arc<dyn ElementIndex>,
68        path_solver: Arc<MatchPathSolver>,
69        part_evaluator: Arc<QueryPartEvaluator>,
70        future_queue: Arc<dyn FutureQueue>,
71        source_pipelines: SourceMiddlewarePipelineCollection,
72    ) -> Self {
73        Self {
74            expression_evaluator,
75            element_index,
76            path_solver,
77            match_path,
78            part_evaluator,
79            query,
80            future_consumer_shutdown_request: Arc::new(Notify::new()),
81            future_queue,
82            future_queue_task: Mutex::new(None),
83            change_lock: Mutex::new(()),
84            source_pipelines,
85        }
86    }
87
88    #[tracing::instrument(skip_all, err, level = "debug")]
89    pub async fn process_source_change(
90        &self,
91        change: SourceChange,
92    ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
93        //println!("-> process_source_change {:?}", change);
94        let _lock = self.change_lock.lock().await;
95        let mut result = Vec::new();
96
97        let changes = self.execute_source_middleware(change).await?;
98
99        for change in changes {
100            let base_variables = QueryVariables::new(); //todo: get query parameters
101            let after_clock = Arc::new(InstantQueryClock::from_source_change(&change));
102
103            let solution_changes = self
104                .build_solution_changes(&base_variables, change, after_clock.clone())
105                .await?;
106            let before_clock = match solution_changes.before_clock {
107                Some(before_clock) => before_clock,
108                None => after_clock.clone(),
109            };
110
111            let mut aggregation_results = CollapsedAggregationResults::new();
112
113            for (solution_signature, part_context) in solution_changes.changes {
114                let change_results = self
115                    .project_solution(
116                        part_context,
117                        &ChangeContext {
118                            solution_signature,
119                            before_clock: before_clock.clone(),
120                            after_clock: after_clock.clone(),
121                            before_anchor_element: solution_changes.before_anchor_element.clone(),
122                            after_anchor_element: solution_changes.anchor_element.clone(),
123                            is_future_reprocess: solution_changes.is_future_reprocess,
124                            before_grouping_hash: solution_signature,
125                            after_grouping_hash: solution_signature,
126                        },
127                    )
128                    .await?;
129                change_results.into_iter().for_each(|ctx| {
130                    match &ctx {
131                        QueryPartEvaluationContext::Aggregation {
132                            before,
133                            after,
134                            default_before,
135                            ..
136                        } => {
137                            if let Some(before) = before {
138                                if before == after && !default_before {
139                                    return;
140                                }
141                            }
142
143                            aggregation_results.insert(ctx);
144                        }
145                        QueryPartEvaluationContext::Updating { before, after } => {
146                            if before == after {
147                                return;
148                            }
149                            result.push(ctx);
150                        }
151                        _ => result.push(ctx),
152                    };
153                });
154            }
155
156            for ctx in aggregation_results.into_result_vec() {
157                result.push(ctx);
158            }
159        }
160        //println!("--> process_source_change result {:?}", result);
161        Ok(result)
162    }
163
164    #[tracing::instrument(skip_all, err, level = "debug")]
165    async fn build_solution_changes(
166        &self,
167        base_variables: &QueryVariables,
168        change: SourceChange,
169        clock: Arc<dyn QueryClock>,
170    ) -> Result<SolutionChangesResult, EvaluationError> {
171        let mut result = SolutionChangesResult::new();
172        let mut before_change_solutions = HashMap::new();
173        let mut after_change_solutions = HashMap::new();
174
175        match change {
176            SourceChange::Insert { element } => {
177                let element = Arc::new(element);
178                let affinity_slots = self
179                    .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
180                    .await?;
181                let solutions = self
182                    .resolve_solutions(element.clone(), affinity_slots, true)
183                    .await?;
184
185                for (signature, solution) in solutions {
186                    if let Some(blank_optional_solution) =
187                        solution.get_empty_optional_solution(&self.match_path)
188                    {
189                        before_change_solutions.insert(signature, blank_optional_solution);
190                    }
191                    after_change_solutions.insert(signature, solution);
192                }
193
194                result.anchor_element = Some(element);
195            }
196            SourceChange::Update { mut element } => {
197                if let Some(prev_version) = self
198                    .element_index
199                    .get_element(element.get_reference())
200                    .await?
201                {
202                    let prev_timestamp = prev_version.get_effective_from();
203                    let before_clock =
204                        Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
205                    let affinity_slots = self
206                        .get_slots_with_affinity(
207                            base_variables,
208                            prev_version.clone(),
209                            before_clock.clone(),
210                        )
211                        .await?;
212                    let solutions = self
213                        .resolve_solutions(prev_version.clone(), affinity_slots, false)
214                        .await?;
215                    for (signature, solution) in solutions {
216                        before_change_solutions.insert(signature, solution);
217                    }
218                    element.merge_missing_properties(prev_version.as_ref());
219                    result.before_clock = Some(before_clock);
220                    result.before_anchor_element = Some(prev_version);
221                }
222
223                let element = Arc::new(element);
224                let affinity_slots = self
225                    .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
226                    .await?;
227                let solutions = self
228                    .resolve_solutions(element.clone(), affinity_slots, true)
229                    .await?;
230
231                for (signature, solution) in solutions {
232                    after_change_solutions.insert(signature, solution);
233                }
234
235                result.anchor_element = Some(element);
236            }
237            SourceChange::Delete { metadata } => {
238                if let Some(element) = self.element_index.get_element(&metadata.reference).await? {
239                    let prev_timestamp = element.get_effective_from();
240                    let before_clock =
241                        Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
242                    let affinity_slots = self
243                        .get_slots_with_affinity(
244                            base_variables,
245                            element.clone(),
246                            before_clock.clone(),
247                        )
248                        .await?;
249                    let solutions = self
250                        .resolve_solutions(element.clone(), affinity_slots, false)
251                        .await?;
252                    for (signature, solution) in solutions {
253                        if let Some(blank_optional_solution) =
254                            solution.get_empty_optional_solution(&self.match_path)
255                        {
256                            after_change_solutions.insert(signature, blank_optional_solution);
257                        }
258
259                        before_change_solutions.insert(signature, solution);
260                    }
261                    result.before_clock = Some(before_clock);
262                    result.before_anchor_element = Some(element);
263
264                    match self.element_index.delete_element(&metadata.reference).await {
265                        Ok(_) => {}
266                        Err(e) => return Err(EvaluationError::from(e)),
267                    }
268                }
269            }
270            SourceChange::Future { future_ref } => {
271                result.is_future_reprocess = true;
272                if let Some(element) = self
273                    .element_index
274                    .get_element(&future_ref.element_ref)
275                    .await?
276                {
277                    let prev_timestamp = element.get_effective_from();
278                    if prev_timestamp >= future_ref.due_time {
279                        // element already processed with due time expired, don't duplicate
280                        return Ok(result);
281                    }
282
283                    let before_clock =
284                        Arc::new(InstantQueryClock::new(prev_timestamp, prev_timestamp));
285
286                    let affinity_slots = self
287                        .get_slots_with_affinity(
288                            base_variables,
289                            element.clone(),
290                            before_clock.clone(),
291                        )
292                        .await?;
293
294                    let before_solutions = self
295                        .resolve_solutions(element.clone(), affinity_slots, false)
296                        .await?;
297                    for (signature, solution) in before_solutions {
298                        before_change_solutions.insert(signature, solution);
299                    }
300
301                    result.before_clock = Some(before_clock);
302                    result.before_anchor_element = Some(element.clone());
303
304                    let affinity_slots = self
305                        .get_slots_with_affinity(base_variables, element.clone(), clock.clone())
306                        .await?;
307
308                    let after_solutions = self
309                        .resolve_solutions(element.clone(), affinity_slots, false)
310                        .await?;
311                    for (signature, solution) in after_solutions {
312                        after_change_solutions.insert(signature, solution);
313                    }
314
315                    result.anchor_element = Some(element);
316                }
317            }
318        }
319
320        for (sig, before_sol) in &before_change_solutions {
321            match after_change_solutions.get(sig) {
322                Some(after_sol) => result.changes.push((
323                    *sig,
324                    QueryPartEvaluationContext::Updating {
325                        before: before_sol.into_query_variables(&self.match_path, base_variables),
326                        after: after_sol.into_query_variables(&self.match_path, base_variables),
327                    },
328                )),
329                None => result.changes.push((
330                    *sig,
331                    QueryPartEvaluationContext::Removing {
332                        before: before_sol.into_query_variables(&self.match_path, base_variables),
333                    },
334                )),
335            }
336        }
337
338        for (sig, after_sol) in &after_change_solutions {
339            if !before_change_solutions.contains_key(sig) {
340                result.changes.push((
341                    *sig,
342                    QueryPartEvaluationContext::Adding {
343                        after: after_sol.into_query_variables(&self.match_path, base_variables),
344                    },
345                ))
346            }
347        }
348
349        Ok(result)
350    }
351
352    async fn resolve_solutions(
353        &self,
354        anchor_element: Arc<Element>,
355        affinity_slots: Vec<usize>,
356        update_index: bool,
357    ) -> Result<HashMap<u64, MatchPathSolution>, EvaluationError> {
358        if update_index {
359            self.element_index
360                .set_element(anchor_element.as_ref(), &affinity_slots)
361                .await?;
362        }
363
364        let mut result = HashMap::new();
365
366        for slot_num in affinity_slots {
367            let solution = self
368                .path_solver
369                .solve(self.match_path.clone(), anchor_element.clone(), slot_num)
370                .await?;
371            result.extend(solution.into_iter());
372        }
373
374        Ok(result)
375    }
376
377    async fn get_slots_with_affinity(
378        &self,
379        variables: &QueryVariables,
380        anchor_element: Arc<Element>,
381        clock: Arc<dyn QueryClock>,
382    ) -> Result<Vec<usize>, EvaluationError> {
383        let context = MatchSolveContext::new(variables, clock);
384
385        let mut affinity_slots = Vec::new();
386
387        for (slot_num, slot) in self.match_path.slots.iter().enumerate() {
388            if self
389                .match_element_to_slot(&context, &slot.spec, anchor_element.clone())
390                .await?
391            {
392                affinity_slots.push(slot_num);
393            }
394        }
395
396        Ok(affinity_slots)
397    }
398
399    async fn match_element_to_slot(
400        &self,
401        context: &MatchSolveContext<'_>,
402        element_spec: &SlotElementSpec,
403        element: Arc<Element>,
404    ) -> Result<bool, EvaluationError> {
405        let metadata = element.get_metadata();
406        let mut label_match = element_spec.labels.is_empty();
407
408        for label in &element_spec.labels {
409            if metadata.labels.contains(label) {
410                label_match = true;
411                break;
412            }
413        }
414
415        if !label_match {
416            return Ok(false);
417        }
418
419        let mut variables = context.variables.clone();
420
421        let element_variable = element.to_expression_variable();
422
423        if element_spec.annotation.is_some() {
424            variables.insert(
425                element_spec
426                    .annotation
427                    .clone()
428                    .unwrap()
429                    .to_string()
430                    .into_boxed_str(),
431                element_variable.clone(),
432            );
433        }
434
435        variables.insert("".into(), element_variable);
436
437        let eval_context = ExpressionEvaluationContext::from_slot(
438            &variables,
439            context.clock.clone(),
440            &metadata.reference,
441        );
442
443        for predicate in &element_spec.predicates {
444            let result = self
445                .expression_evaluator
446                .evaluate_predicate(&eval_context, predicate)
447                .await?;
448            if !result {
449                return Ok(false);
450            }
451        }
452
453        Ok(true)
454    }
455
456    #[tracing::instrument(skip_all, err, level = "debug")]
457    async fn project_solution(
458        &self,
459        part_context: QueryPartEvaluationContext,
460        change_context: &ChangeContext,
461    ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
462        let mut result = Vec::new();
463        let mut contexts = vec![(part_context, change_context.clone())];
464
465        let mut part_num = 0;
466
467        for part in &self.query.parts {
468            part_num += 1;
469            result.clear();
470
471            for (part_context, change_context) in &contexts {
472                let new_contexts = self
473                    .part_evaluator
474                    .evaluate(part_context.clone(), part_num, part, change_context)
475                    .await?;
476
477                let mut aggregation_results = CollapsedAggregationResults::new();
478
479                new_contexts.into_iter().for_each(|ctx| match &ctx {
480                    QueryPartEvaluationContext::Aggregation { .. } => {
481                        aggregation_results.insert(ctx)
482                    }
483                    QueryPartEvaluationContext::Noop => (),
484                    _ => result.push((ctx, change_context.clone())),
485                });
486
487                for actx in aggregation_results.into_vec_with_context(change_context) {
488                    result.push(actx);
489                }
490            }
491            contexts = result.clone();
492        }
493
494        Ok(result.into_iter().map(|(ctx, _)| ctx).collect())
495    }
496
497    #[tracing::instrument(skip_all, err, level = "debug")]
498    async fn execute_source_middleware(
499        &self,
500        change: SourceChange,
501    ) -> Result<Vec<SourceChange>, MiddlewareError> {
502        let source_id = change.get_reference().source_id.clone();
503        let mut source_changes = vec![change];
504
505        let pipeline = match self.source_pipelines.get(source_id) {
506            Some(pipeline) => pipeline,
507            None => return Ok(source_changes),
508        };
509
510        let mut new_source_changes = Vec::new();
511        for source_change in source_changes {
512            new_source_changes.append(
513                &mut pipeline
514                    .process(source_change, self.element_index.clone())
515                    .await?,
516            );
517        }
518
519        source_changes = new_source_changes;
520
521        Ok(source_changes)
522    }
523
524    pub async fn set_future_consumer(&self, consumer: Arc<dyn FutureQueueConsumer>) {
525        let mut future_queue_task = self.future_queue_task.lock().await;
526        if let Some(c) = future_queue_task.take() {
527            c.abort();
528        }
529
530        let queue = self.future_queue.clone();
531        let shutdown_request = self.future_consumer_shutdown_request.clone();
532
533        let task = tokio::spawn(async move {
534            let idle_interval = Duration::from_secs(1);
535            let error_interval = Duration::from_secs(5);
536            loop {
537                select! {
538                    _ = shutdown_request.notified() => {
539                        log::info!("Future queue consumer shutting down");
540                        break;
541                    }
542                    peek = queue.peek_due_time() => {
543                        let fut_ref = match peek {
544                            Ok(Some(due_time)) => {
545                                if due_time > consumer.now() {
546                                    tokio::time::sleep(idle_interval).await;
547                                    continue;
548                                }
549                                match queue.pop().await {
550                                    Ok(Some(element)) => element,
551                                    Ok(None) => {
552                                        tokio::time::sleep(idle_interval).await;
553                                        continue;
554                                    }
555                                    Err(e) => {
556                                        log::error!("Future queue consumer error: {e:?}");
557                                        tokio::time::sleep(error_interval).await;
558                                        continue;
559                                    }
560                                }
561                            }
562                            Ok(None) => {
563                                tokio::time::sleep(idle_interval).await;
564                                continue;
565                            }
566                            Err(e) => {
567                                log::error!("Future queue consumer error: {e:?}");
568                                tokio::time::sleep(error_interval).await;
569                                continue;
570                            }
571                        };
572
573                        log::info!("Future queue consumer processing {}", &fut_ref.element_ref);
574
575                        match consumer.on_due(&fut_ref).await {
576                            Ok(_) => log::info!("Future queue consumer processed {}", &fut_ref.element_ref),
577                            Err(e) => {
578                                log::error!("Future queue consumer error: {e:?}");
579                                consumer.on_error(&fut_ref, e).await;
580                            }
581                        }
582                    }
583                }
584            }
585        });
586
587        _ = future_queue_task.insert(task);
588    }
589
590    pub async fn terminate_future_consumer(&self) {
591        let mut future_queue_task = self.future_queue_task.lock().await;
592        if let Some(task) = future_queue_task.take() {
593            self.future_consumer_shutdown_request.notify_one();
594            select! {
595                _ = task => {
596                    log::info!("Future queue consumer terminated");
597                }
598                _ = tokio::time::sleep(Duration::from_secs(10)) => {
599                    log::error!("Future queue consumer termination timeout");
600                }
601            }
602        }
603    }
604
605    pub fn get_query(&self) -> Arc<Query> {
606        self.query.clone()
607    }
608}
609
610impl Drop for ContinuousQuery {
611    fn drop(&mut self) {
612        self.future_consumer_shutdown_request.notify_one();
613    }
614}
615
616impl Debug for ContinuousQuery {
617    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
618        f.debug_struct("ContinuousQuery")
619            .field("query", &self.query)
620            .finish()
621    }
622}
623
/// Outcome of resolving one source change into per-solution changes.
struct SolutionChangesResult {
    /// One entry per affected solution: its signature and the adding / updating /
    /// removing context built for it.
    pub changes: Vec<(SolutionSignature, QueryPartEvaluationContext)>,
    /// The changed element as it is after the change, when there is one.
    pub anchor_element: Option<Arc<Element>>,
    /// Clock used to resolve the "before" solutions; `None` when no previous version of
    /// the element was resolved.
    pub before_clock: Option<Arc<dyn QueryClock>>,
    /// The element as it was before the change, when a previous version was found.
    pub before_anchor_element: Option<Arc<Element>>,
    /// True when the change came from the future queue rather than from a source.
    pub is_future_reprocess: bool,
}
631
632impl SolutionChangesResult {
633    fn new() -> Self {
634        Self {
635            changes: Vec::new(),
636            before_clock: None,
637            anchor_element: None,
638            before_anchor_element: None,
639            is_future_reprocess: false,
640        }
641    }
642}
643
/// Accumulates aggregation contexts so that contexts ending up in the same group (same
/// hash of the "after" grouping values) are merged into a single entry.
struct CollapsedAggregationResults {
    // [hash of after-change grouping keys] -> (context, hash of before-change grouping keys)
    data: HashMap<u64, (QueryPartEvaluationContext, u64)>,
}
648
649impl CollapsedAggregationResults {
650    fn new() -> Self {
651        Self {
652            data: HashMap::new(),
653        }
654    }
655
656    fn insert(&mut self, context: QueryPartEvaluationContext) {
657        if let QueryPartEvaluationContext::Aggregation {
658            before,
659            after,
660            grouping_keys,
661            default_before,
662            default_after,
663        } = context
664        {
665            let after_key = extract_grouping_value_hash(&grouping_keys, &after);
666            let before_key = match &before {
667                Some(before) => extract_grouping_value_hash(&grouping_keys, before),
668                None => after_key,
669            };
670
671            match self.data.remove(&after_key) {
672                Some((existing, before_key)) => {
673                    if let QueryPartEvaluationContext::Aggregation {
674                        before: existing_before,
675                        ..
676                    } = existing
677                    {
678                        self.data.insert(
679                            after_key,
680                            (
681                                QueryPartEvaluationContext::Aggregation {
682                                    before: existing_before,
683                                    default_before,
684                                    default_after,
685                                    after,
686                                    grouping_keys,
687                                },
688                                before_key,
689                            ),
690                        );
691                    }
692                }
693                None => {
694                    self.data.insert(
695                        after_key,
696                        (
697                            QueryPartEvaluationContext::Aggregation {
698                                before,
699                                after,
700                                grouping_keys,
701                                default_before,
702                                default_after,
703                            },
704                            before_key,
705                        ),
706                    );
707                }
708            }
709        }
710    }
711
712    fn into_vec_with_context(
713        self,
714        change_context: &ChangeContext,
715    ) -> Vec<(QueryPartEvaluationContext, ChangeContext)> {
716        self.data
717            .into_iter()
718            .map(|(after_key, (v, before_key))| {
719                let mut change_context = change_context.clone();
720                change_context.before_grouping_hash = before_key;
721                change_context.after_grouping_hash = after_key;
722                (v, change_context)
723            })
724            .collect()
725    }
726
727    fn into_result_vec(self) -> Vec<QueryPartEvaluationContext> {
728        self.data.into_iter().map(|(_, (v, _))| v).collect()
729    }
730}
731
732fn extract_grouping_value_hash(grouping_keys: &Vec<String>, variables: &QueryVariables) -> u64 {
733    let mut hasher = SpookyHasher::default();
734
735    for key in grouping_keys {
736        match variables.get(key.as_str()) {
737            Some(v) => v.hash_for_groupby(&mut hasher),
738            None => 0.hash(&mut hasher),
739        };
740    }
741    hasher.finish()
742}