// drasi_core/query/continuous_query.rs

// Copyright 2024 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::{
16    collections::HashMap,
17    fmt::Debug,
18    hash::{Hash, Hasher},
19    sync::Arc,
20    time::Duration,
21};
22
23use drasi_query_ast::ast::Query;
24use hashers::jenkins::spooky_hash::SpookyHasher;
25use tokio::{
26    select,
27    sync::{Mutex, Notify},
28    task::JoinHandle,
29};
30
31use crate::{
32    evaluation::{
33        context::{ChangeContext, QueryPartEvaluationContext, QueryVariables},
34        EvaluationError, ExpressionEvaluationContext, ExpressionEvaluator, InstantQueryClock,
35        QueryPartEvaluator,
36    },
37    interface::{ElementIndex, FutureQueue, FutureQueueConsumer, MiddlewareError, QueryClock},
38    middleware::SourceMiddlewarePipelineCollection,
39    models::{Element, SourceChange},
40    path_solver::{
41        match_path::{MatchPath, SlotElementSpec},
42        solution::{MatchPathSolution, SolutionSignature},
43        MatchPathSolver, MatchSolveContext,
44    },
45};
46
47pub struct ContinuousQuery {
48    expression_evaluator: Arc<ExpressionEvaluator>,
49    part_evaluator: Arc<QueryPartEvaluator>,
50    element_index: Arc<dyn ElementIndex>,
51    path_solver: Arc<MatchPathSolver>,
52    match_path: Arc<MatchPath>,
53    query: Arc<Query>,
54    future_consumer_shutdown_request: Arc<Notify>,
55    future_queue: Arc<dyn FutureQueue>,
56    future_queue_task: Mutex<Option<JoinHandle<()>>>,
57    change_lock: Mutex<()>,
58    source_pipelines: SourceMiddlewarePipelineCollection,
59}
60
61impl ContinuousQuery {
62    #[allow(clippy::too_many_arguments)]
63    pub fn new(
64        query: Arc<Query>,
65        match_path: Arc<MatchPath>,
66        expression_evaluator: Arc<ExpressionEvaluator>,
67        element_index: Arc<dyn ElementIndex>,
68        path_solver: Arc<MatchPathSolver>,
69        part_evaluator: Arc<QueryPartEvaluator>,
70        future_queue: Arc<dyn FutureQueue>,
71        source_pipelines: SourceMiddlewarePipelineCollection,
72    ) -> Self {
73        Self {
74            expression_evaluator,
75            element_index,
76            path_solver,
77            match_path,
78            part_evaluator,
79            query,
80            future_consumer_shutdown_request: Arc::new(Notify::new()),
81            future_queue,
82            future_queue_task: Mutex::new(None),
83            change_lock: Mutex::new(()),
84            source_pipelines,
85        }
86    }
87
88    #[tracing::instrument(skip_all, err)]
89    pub async fn process_source_change(
90        &self,
91        change: SourceChange,
92    ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
93        //println!("-> process_source_change {:?}", change);
94        let _lock = self.change_lock.lock().await;
95        let mut result = Vec::new();
96
97        let changes = self.execute_source_middleware(change).await?;
98
99        for change in changes {
100            let base_variables = QueryVariables::new(); //todo: get query parameters
101            let after_clock = Arc::new(InstantQueryClock::from_source_change(&change));
102
103            let solution_changes = self
104                .build_solution_changes(&base_variables, change, after_clock.clone())
105                .await?;
106            let before_clock = match solution_changes.before_clock {
107                Some(before_clock) => before_clock,
108                None => after_clock.clone(),
109            };
110
111            let mut aggregation_results = CollapsedAggregationResults::new();
112
113            for (solution_signature, part_context) in solution_changes.changes {
114                let change_results = self
115                    .project_solution(
116                        part_context,
117                        &ChangeContext {
118                            solution_signature,
119                            before_clock: before_clock.clone(),
120                            after_clock: after_clock.clone(),
121                            before_anchor_element: solution_changes.before_anchor_element.clone(),
122                            after_anchor_element: solution_changes.anchor_element.clone(),
123                            is_future_reprocess: solution_changes.is_future_reprocess,
124                            before_grouping_hash: solution_signature,
125                            after_grouping_hash: solution_signature,
126                        },
127                    )
128                    .await?;
129                change_results.into_iter().for_each(|ctx| {
130                    match &ctx {
131                        QueryPartEvaluationContext::Aggregation { before, after, .. } => {
132                            if let Some(before) = before {
133                                if before == after {
134                                    return;
135                                }
136                            }
137
138                            aggregation_results.insert(ctx);
139                        }
140                        QueryPartEvaluationContext::Updating { before, after } => {
141                            if before == after {
142                                return;
143                            }
144                            result.push(ctx);
145                        }
146                        _ => result.push(ctx),
147                    };
148                });
149            }
150
151            for ctx in aggregation_results.into_result_vec() {
152                result.push(ctx);
153            }
154        }
155        //println!("--> process_source_change result {:?}", result);
156        Ok(result)
157    }
158
159    #[tracing::instrument(skip_all, err)]
160    async fn build_solution_changes(
161        &self,
162        base_variables: &QueryVariables,
163        change: SourceChange,
164        clock: Arc<dyn QueryClock>,
165    ) -> Result<SolutionChangesResult, EvaluationError> {
166        let mut result = SolutionChangesResult::new();
167        let mut before_change_solutions = HashMap::new();
168        let mut after_change_solutions = HashMap::new();
169
170        match change {
171            SourceChange::Insert { element } => {
172                let element = Arc::new(element);
173                let solutions = self
174                    .resolve_solutions(base_variables, element.clone(), clock.clone(), true)
175                    .await?;
176
177                for (signature, solution) in solutions {
178                    after_change_solutions.insert(signature, solution);
179                }
180
181                result.anchor_element = Some(element);
182            }
183            SourceChange::Update { mut element } => {
184                if let Some(prev_version) = self
185                    .element_index
186                    .get_element(element.get_reference())
187                    .await?
188                {
189                    let prev_timestamp = prev_version.get_effective_from();
190                    let before_clock =
191                        Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
192                    let solutions = self
193                        .resolve_solutions(
194                            base_variables,
195                            prev_version.clone(),
196                            before_clock.clone(),
197                            false,
198                        )
199                        .await?;
200                    for (signature, solution) in solutions {
201                        before_change_solutions.insert(signature, solution);
202                    }
203                    element.merge_missing_properties(prev_version.as_ref());
204                    result.before_clock = Some(before_clock);
205                    result.before_anchor_element = Some(prev_version);
206                }
207
208                let element = Arc::new(element);
209                let solutions = self
210                    .resolve_solutions(base_variables, element.clone(), clock.clone(), true)
211                    .await?;
212
213                for (signature, solution) in solutions {
214                    after_change_solutions.insert(signature, solution);
215                }
216
217                result.anchor_element = Some(element);
218            }
219            SourceChange::Delete { metadata } => {
220                if let Some(element) = self.element_index.get_element(&metadata.reference).await? {
221                    let prev_timestamp = element.get_effective_from();
222                    let before_clock =
223                        Arc::new(InstantQueryClock::new(prev_timestamp, clock.get_realtime()));
224                    let solutions = self
225                        .resolve_solutions(
226                            base_variables,
227                            element.clone(),
228                            before_clock.clone(),
229                            false,
230                        )
231                        .await?;
232                    for (signature, solution) in solutions {
233                        before_change_solutions.insert(signature, solution);
234                    }
235                    result.before_clock = Some(before_clock);
236                    result.before_anchor_element = Some(element);
237
238                    match self.element_index.delete_element(&metadata.reference).await {
239                        Ok(_) => {}
240                        Err(e) => return Err(EvaluationError::from(e)),
241                    }
242                }
243            }
244            SourceChange::Future { future_ref } => {
245                result.is_future_reprocess = true;
246                if let Some(element) = self
247                    .element_index
248                    .get_element(&future_ref.element_ref)
249                    .await?
250                {
251                    let prev_timestamp = element.get_effective_from();
252                    if prev_timestamp >= future_ref.due_time {
253                        // element already processed with due time expired, don't duplicate
254                        return Ok(result);
255                    }
256
257                    let before_clock =
258                        Arc::new(InstantQueryClock::new(prev_timestamp, prev_timestamp));
259
260                    let before_solutions = self
261                        .resolve_solutions(
262                            base_variables,
263                            element.clone(),
264                            before_clock.clone(),
265                            false,
266                        )
267                        .await?;
268                    for (signature, solution) in before_solutions {
269                        before_change_solutions.insert(signature, solution);
270                    }
271
272                    result.before_clock = Some(before_clock);
273                    result.before_anchor_element = Some(element.clone());
274
275                    let after_solutions = self
276                        .resolve_solutions(base_variables, element.clone(), clock.clone(), false)
277                        .await?;
278                    for (signature, solution) in after_solutions {
279                        after_change_solutions.insert(signature, solution);
280                    }
281
282                    result.anchor_element = Some(element);
283                }
284            }
285        }
286
287        for (sig, before_sol) in &before_change_solutions {
288            match after_change_solutions.get(sig) {
289                Some(after_sol) => result.changes.push((
290                    *sig,
291                    QueryPartEvaluationContext::Updating {
292                        before: before_sol.into_query_variables(&self.match_path, base_variables),
293                        after: after_sol.into_query_variables(&self.match_path, base_variables),
294                    },
295                )),
296                None => result.changes.push((
297                    *sig,
298                    QueryPartEvaluationContext::Removing {
299                        before: before_sol.into_query_variables(&self.match_path, base_variables),
300                    },
301                )),
302            }
303        }
304
305        for (sig, after_sol) in &after_change_solutions {
306            if !before_change_solutions.contains_key(sig) {
307                result.changes.push((
308                    *sig,
309                    QueryPartEvaluationContext::Adding {
310                        after: after_sol.into_query_variables(&self.match_path, base_variables),
311                    },
312                ))
313            }
314        }
315
316        Ok(result)
317    }
318
319    async fn resolve_solutions(
320        &self,
321        variables: &QueryVariables,
322        anchor_element: Arc<Element>,
323        clock: Arc<dyn QueryClock>,
324        update_index: bool,
325    ) -> Result<HashMap<u64, MatchPathSolution>, EvaluationError> {
326        let context = MatchSolveContext::new(variables, clock);
327
328        let mut affinity_slots = Vec::new();
329
330        for (slot_num, slot) in self.match_path.slots.iter().enumerate() {
331            if self
332                .match_element_to_slot(&context, &slot.spec, anchor_element.clone())
333                .await?
334            {
335                affinity_slots.push(slot_num);
336            }
337        }
338
339        if update_index {
340            self.element_index
341                .set_element(anchor_element.as_ref(), &affinity_slots)
342                .await?;
343        }
344
345        let mut result = HashMap::new();
346
347        for slot_num in affinity_slots {
348            let solution = self
349                .path_solver
350                .solve(self.match_path.clone(), anchor_element.clone(), slot_num)
351                .await?;
352            result.extend(solution.into_iter());
353        }
354
355        Ok(result)
356    }
357
358    async fn match_element_to_slot(
359        &self,
360        context: &MatchSolveContext<'_>,
361        element_spec: &SlotElementSpec,
362        element: Arc<Element>,
363    ) -> Result<bool, EvaluationError> {
364        let metadata = element.get_metadata();
365        let mut label_match = element_spec.labels.is_empty();
366
367        for label in &element_spec.labels {
368            if metadata.labels.contains(label) {
369                label_match = true;
370                break;
371            }
372        }
373
374        if !label_match {
375            return Ok(false);
376        }
377
378        let mut variables = context.variables.clone();
379
380        let element_variable = element.to_expression_variable();
381
382        if element_spec.annotation.is_some() {
383            variables.insert(
384                element_spec
385                    .annotation
386                    .clone()
387                    .unwrap()
388                    .to_string()
389                    .into_boxed_str(),
390                element_variable.clone(),
391            );
392        }
393
394        variables.insert("".into(), element_variable);
395
396        let eval_context = ExpressionEvaluationContext::new(&variables, context.clock.clone());
397
398        for predicate in &element_spec.predicates {
399            let result = self
400                .expression_evaluator
401                .evaluate_predicate(&eval_context, predicate)
402                .await?;
403            if !result {
404                return Ok(false);
405            }
406        }
407
408        Ok(true)
409    }
410
411    #[tracing::instrument(skip_all, err)]
412    async fn project_solution(
413        &self,
414        part_context: QueryPartEvaluationContext,
415        change_context: &ChangeContext,
416    ) -> Result<Vec<QueryPartEvaluationContext>, EvaluationError> {
417        let mut result = Vec::new();
418        let mut contexts = vec![(part_context, change_context.clone())];
419
420        let mut part_num = 0;
421
422        for part in &self.query.parts {
423            part_num += 1;
424            result.clear();
425
426            for (part_context, change_context) in &contexts {
427                let new_contexts = self
428                    .part_evaluator
429                    .evaluate(part_context.clone(), part_num, part, change_context)
430                    .await?;
431
432                let mut aggregation_results = CollapsedAggregationResults::new();
433
434                new_contexts.into_iter().for_each(|ctx| match &ctx {
435                    QueryPartEvaluationContext::Aggregation { .. } => {
436                        aggregation_results.insert(ctx)
437                    }
438                    QueryPartEvaluationContext::Noop => (),
439                    _ => result.push((ctx, change_context.clone())),
440                });
441
442                for actx in aggregation_results.into_vec_with_context(change_context) {
443                    result.push(actx);
444                }
445            }
446            contexts = result.clone();
447        }
448
449        Ok(result.into_iter().map(|(ctx, _)| ctx).collect())
450    }
451
452    #[tracing::instrument(skip_all, err)]
453    async fn execute_source_middleware(
454        &self,
455        change: SourceChange,
456    ) -> Result<Vec<SourceChange>, MiddlewareError> {
457        let source_id = change.get_reference().source_id.clone();
458        let mut source_changes = vec![change];
459
460        let pipeline = match self.source_pipelines.get(source_id) {
461            Some(pipeline) => pipeline,
462            None => return Ok(source_changes),
463        };
464
465        let mut new_source_changes = Vec::new();
466        for source_change in source_changes {
467            new_source_changes.append(&mut pipeline.process(source_change).await?);
468        }
469
470        source_changes = new_source_changes;
471
472        Ok(source_changes)
473    }
474
475    pub async fn set_future_consumer(&self, consumer: Arc<dyn FutureQueueConsumer>) {
476        let mut future_queue_task = self.future_queue_task.lock().await;
477        if let Some(c) = future_queue_task.take() {
478            c.abort();
479        }
480
481        let queue = self.future_queue.clone();
482        let shutdown_request = self.future_consumer_shutdown_request.clone();
483
484        let task = tokio::spawn(async move {
485            let idle_interval = Duration::from_secs(1);
486            let error_interval = Duration::from_secs(5);
487            loop {
488                select! {
489                    _ = shutdown_request.notified() => {
490                        log::info!("Future queue consumer shutting down");
491                        break;
492                    }
493                    peek = queue.peek_due_time() => {
494                        let fut_ref = match peek {
495                            Ok(Some(due_time)) => {
496                                if due_time > consumer.now() {
497                                    tokio::time::sleep(idle_interval).await;
498                                    continue;
499                                }
500                                match queue.pop().await {
501                                    Ok(Some(element)) => element,
502                                    Ok(None) => {
503                                        tokio::time::sleep(idle_interval).await;
504                                        continue;
505                                    }
506                                    Err(e) => {
507                                        log::error!("Future queue consumer error: {:?}", e);
508                                        tokio::time::sleep(error_interval).await;
509                                        continue;
510                                    }
511                                }
512                            }
513                            Ok(None) => {
514                                tokio::time::sleep(idle_interval).await;
515                                continue;
516                            }
517                            Err(e) => {
518                                log::error!("Future queue consumer error: {:?}", e);
519                                tokio::time::sleep(error_interval).await;
520                                continue;
521                            }
522                        };
523
524                        log::info!("Future queue consumer processing {}", &fut_ref.element_ref);
525
526                        match consumer.on_due(&fut_ref).await {
527                            Ok(_) => log::info!("Future queue consumer processed {}", &fut_ref.element_ref),
528                            Err(e) => {
529                                log::error!("Future queue consumer error: {:?}", e);
530                                consumer.on_error(&fut_ref, e).await;
531                            }
532                        }
533                    }
534                }
535            }
536        });
537
538        _ = future_queue_task.insert(task);
539    }
540
541    pub async fn terminate_future_consumer(&self) {
542        let mut future_queue_task = self.future_queue_task.lock().await;
543        if let Some(task) = future_queue_task.take() {
544            self.future_consumer_shutdown_request.notify_one();
545            select! {
546                _ = task => {
547                    log::info!("Future queue consumer terminated");
548                }
549                _ = tokio::time::sleep(Duration::from_secs(10)) => {
550                    log::error!("Future queue consumer termination timeout");
551                }
552            }
553        }
554    }
555
556    pub fn get_query(&self) -> Arc<Query> {
557        self.query.clone()
558    }
559}
560
561impl Drop for ContinuousQuery {
562    fn drop(&mut self) {
563        self.future_consumer_shutdown_request.notify_one();
564    }
565}
566
567impl Debug for ContinuousQuery {
568    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
569        f.debug_struct("ContinuousQuery")
570            .field("query", &self.query)
571            .finish()
572    }
573}
574
575struct SolutionChangesResult {
576    pub changes: Vec<(SolutionSignature, QueryPartEvaluationContext)>,
577    pub anchor_element: Option<Arc<Element>>,
578    pub before_clock: Option<Arc<dyn QueryClock>>,
579    pub before_anchor_element: Option<Arc<Element>>,
580    pub is_future_reprocess: bool,
581}
582
583impl SolutionChangesResult {
584    fn new() -> Self {
585        Self {
586            changes: Vec::new(),
587            before_clock: None,
588            anchor_element: None,
589            before_anchor_element: None,
590            is_future_reprocess: false,
591        }
592    }
593}
594
595struct CollapsedAggregationResults {
596    // [hash of after change grouping keys] -> (context, hash of before change grouping keys)
597    data: HashMap<u64, (QueryPartEvaluationContext, u64)>,
598}
599
600impl CollapsedAggregationResults {
601    fn new() -> Self {
602        Self {
603            data: HashMap::new(),
604        }
605    }
606
607    fn insert(&mut self, context: QueryPartEvaluationContext) {
608        if let QueryPartEvaluationContext::Aggregation {
609            before,
610            after,
611            grouping_keys,
612            default_before,
613            default_after,
614        } = context
615        {
616            let after_key = extract_grouping_value_hash(&grouping_keys, &after);
617            let before_key = match &before {
618                Some(before) => extract_grouping_value_hash(&grouping_keys, before),
619                None => after_key,
620            };
621
622            match self.data.remove(&after_key) {
623                Some((existing, before_key)) => {
624                    if let QueryPartEvaluationContext::Aggregation {
625                        before: existing_before,
626                        ..
627                    } = existing
628                    {
629                        self.data.insert(
630                            after_key,
631                            (
632                                QueryPartEvaluationContext::Aggregation {
633                                    before: existing_before,
634                                    default_before,
635                                    default_after,
636                                    after,
637                                    grouping_keys,
638                                },
639                                before_key,
640                            ),
641                        );
642                    }
643                }
644                None => {
645                    self.data.insert(
646                        after_key,
647                        (
648                            QueryPartEvaluationContext::Aggregation {
649                                before,
650                                after,
651                                grouping_keys,
652                                default_before,
653                                default_after,
654                            },
655                            before_key,
656                        ),
657                    );
658                }
659            }
660        }
661    }
662
663    fn into_vec_with_context(
664        self,
665        change_context: &ChangeContext,
666    ) -> Vec<(QueryPartEvaluationContext, ChangeContext)> {
667        self.data
668            .into_iter()
669            .map(|(after_key, (v, before_key))| {
670                let mut change_context = change_context.clone();
671                change_context.before_grouping_hash = before_key;
672                change_context.after_grouping_hash = after_key;
673                (v, change_context)
674            })
675            .collect()
676    }
677
678    fn into_result_vec(self) -> Vec<QueryPartEvaluationContext> {
679        self.data.into_iter().map(|(_, (v, _))| v).collect()
680    }
681}
682
683fn extract_grouping_value_hash(grouping_keys: &Vec<String>, variables: &QueryVariables) -> u64 {
684    let mut hasher = SpookyHasher::default();
685
686    for key in grouping_keys {
687        match variables.get(key.as_str()) {
688            Some(v) => v.hash_for_groupby(&mut hasher),
689            None => 0.hash(&mut hasher),
690        };
691    }
692    hasher.finish()
693}