Skip to main content

drasi_core/evaluation/
context.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use drasi_query_ast::ast::{self, ProjectionClause, QueryPart};
16use hashers::jenkins::spooky_hash::SpookyHasher;
17use std::collections::BTreeMap;
18use std::hash::{Hash, Hasher};
19use std::sync::Arc;
20
21use crate::evaluation::variable_value::VariableValue;
22use crate::interface::QueryClock;
23use crate::models::{Element, ElementReference, ElementTimestamp};
24use crate::path_solver::solution::SolutionSignature;
25
26pub type QueryVariables = BTreeMap<Box<str>, VariableValue>;
27
28#[derive(Debug, Clone)]
29pub enum SideEffects {
30    Apply,
31    RevertForUpdate,
32    RevertForDelete,
33    Snapshot,
34}
35
36#[derive(Debug, Clone, PartialEq)]
37pub enum QueryPartEvaluationContext {
38    Adding {
39        after: QueryVariables,
40        row_signature: u64,
41    },
42    Updating {
43        before: QueryVariables,
44        after: QueryVariables,
45        row_signature: u64,
46    },
47    Removing {
48        before: QueryVariables,
49        row_signature: u64,
50    },
51    Aggregation {
52        before: Option<QueryVariables>,
53        after: QueryVariables,
54        grouping_keys: Vec<String>,
55        default_before: bool,
56        default_after: bool,
57        row_signature: u64,
58    },
59    Noop,
60}
61
62impl QueryPartEvaluationContext {
63    pub fn row_signature(&self) -> u64 {
64        match self {
65            Self::Adding { row_signature, .. }
66            | Self::Updating { row_signature, .. }
67            | Self::Removing { row_signature, .. }
68            | Self::Aggregation { row_signature, .. } => *row_signature,
69            Self::Noop => 0,
70        }
71    }
72
73    /// Compares all fields except `row_signature`. Used by test helpers to assert
74    /// on result data without needing to predict internal hash values.
75    pub fn data_eq(&self, other: &Self) -> bool {
76        match (self, other) {
77            (Self::Adding { after: a, .. }, Self::Adding { after: b, .. }) => a == b,
78            (
79                Self::Updating {
80                    before: ab,
81                    after: aa,
82                    ..
83                },
84                Self::Updating {
85                    before: bb,
86                    after: ba,
87                    ..
88                },
89            ) => ab == bb && aa == ba,
90            (Self::Removing { before: a, .. }, Self::Removing { before: b, .. }) => a == b,
91            (
92                Self::Aggregation {
93                    before: ab,
94                    after: aa,
95                    grouping_keys: ag,
96                    default_before: adb,
97                    default_after: ada,
98                    ..
99                },
100                Self::Aggregation {
101                    before: bb,
102                    after: ba,
103                    grouping_keys: bg,
104                    default_before: bdb,
105                    default_after: bda,
106                    ..
107                },
108            ) => ab == bb && aa == ba && ag == bg && adb == bdb && ada == bda,
109            (Self::Noop, Self::Noop) => true,
110            _ => false,
111        }
112    }
113}
114
115#[derive(Debug, Clone)]
116pub struct ExpressionEvaluationContext<'a> {
117    variables: &'a QueryVariables,
118    side_effects: SideEffects,
119    output_grouping_key: Option<&'a Vec<ast::Expression>>,
120    input_grouping_hash: u64,
121    clock: Arc<dyn QueryClock>,
122    solution_signature: Option<SolutionSignature>,
123    anchor_element: Option<Arc<Element>>,
124}
125
126impl<'a> ExpressionEvaluationContext<'a> {
127    pub fn new(
128        variables: &'a QueryVariables,
129        clock: Arc<dyn QueryClock>,
130    ) -> ExpressionEvaluationContext<'a> {
131        ExpressionEvaluationContext {
132            variables,
133            side_effects: SideEffects::Apply,
134            output_grouping_key: None,
135            input_grouping_hash: u64::default(),
136            clock,
137            solution_signature: None,
138            anchor_element: None,
139        }
140    }
141
142    pub fn from_slot(
143        variables: &'a QueryVariables,
144        clock: Arc<dyn QueryClock>,
145        element_reference: &ElementReference,
146    ) -> ExpressionEvaluationContext<'a> {
147        ExpressionEvaluationContext {
148            variables,
149            side_effects: SideEffects::Apply,
150            output_grouping_key: None,
151            input_grouping_hash: extract_element_reference_hash(element_reference),
152            clock,
153            solution_signature: None,
154            anchor_element: None,
155        }
156    }
157
158    pub fn from_before_change(
159        variables: &'a QueryVariables,
160        side_effect_directive: SideEffects,
161        change_context: &ChangeContext,
162        query_part: &'a QueryPart,
163    ) -> ExpressionEvaluationContext<'a> {
164        ExpressionEvaluationContext {
165            variables,
166            side_effects: side_effect_directive,
167            input_grouping_hash: change_context.before_grouping_hash,
168            clock: change_context.before_clock.clone(),
169            solution_signature: Some(change_context.solution_signature),
170            anchor_element: change_context.before_anchor_element.clone(),
171            output_grouping_key: match &query_part.return_clause {
172                ProjectionClause::GroupBy { grouping, .. } => Some(grouping),
173                _ => None,
174            },
175        }
176    }
177
178    pub fn from_after_change(
179        variables: &'a QueryVariables,
180        change_context: &ChangeContext,
181        query_part: &'a QueryPart,
182    ) -> ExpressionEvaluationContext<'a> {
183        ExpressionEvaluationContext {
184            variables,
185            side_effects: SideEffects::Apply,
186            input_grouping_hash: change_context.after_grouping_hash,
187            clock: change_context.after_clock.clone(),
188            solution_signature: Some(change_context.solution_signature),
189            anchor_element: change_context.after_anchor_element.clone(),
190            output_grouping_key: match &query_part.return_clause {
191                ProjectionClause::GroupBy { grouping, .. } => Some(grouping),
192                _ => None,
193            },
194        }
195    }
196
197    pub fn replace_variables(&mut self, new_data: &'a QueryVariables) {
198        self.variables = new_data;
199    }
200
201    pub fn get_variable(&self, name: Arc<str>) -> Option<&VariableValue> {
202        self.variables.get(&name.to_string().into_boxed_str())
203    }
204
205    pub fn clone_variables(&self) -> QueryVariables {
206        self.variables.clone()
207    }
208
209    pub fn set_side_effects(&mut self, directive: SideEffects) {
210        self.side_effects = directive;
211    }
212
213    pub fn get_side_effects(&self) -> &SideEffects {
214        &self.side_effects
215    }
216
217    pub fn set_output_grouping_key(&mut self, grouping_key: &'a Vec<ast::Expression>) {
218        self.output_grouping_key = Some(grouping_key);
219    }
220
221    pub fn get_output_grouping_key(&self) -> Option<&Vec<ast::Expression>> {
222        self.output_grouping_key
223    }
224
225    pub fn get_transaction_time(&self) -> ElementTimestamp {
226        self.clock.get_transaction_time()
227    }
228
229    pub fn get_realtime(&self) -> ElementTimestamp {
230        self.clock.get_realtime()
231    }
232
233    pub fn get_clock(&self) -> Arc<dyn QueryClock> {
234        self.clock.clone()
235    }
236
237    pub fn get_solution_signature(&self) -> Option<SolutionSignature> {
238        self.solution_signature
239    }
240
241    pub fn get_anchor_element(&self) -> Option<Arc<Element>> {
242        self.anchor_element.clone()
243    }
244
245    pub fn get_input_grouping_hash(&self) -> u64 {
246        self.input_grouping_hash
247    }
248}
249
250#[derive(Debug, Clone)]
251pub struct ChangeContext {
252    pub solution_signature: SolutionSignature,
253    pub before_anchor_element: Option<Arc<Element>>,
254    pub after_anchor_element: Option<Arc<Element>>,
255    pub before_clock: Arc<dyn QueryClock>,
256    pub after_clock: Arc<dyn QueryClock>,
257    pub is_future_reprocess: bool,
258    pub before_grouping_hash: u64,
259    pub after_grouping_hash: u64,
260}
261
262fn extract_element_reference_hash(element_reference: &ElementReference) -> u64 {
263    let mut hasher = SpookyHasher::default();
264    element_reference.source_id.hash(&mut hasher);
265    element_reference.element_id.hash(&mut hasher);
266    hasher.finish()
267}