Skip to main content

drasi_core/interface/
result_index.rs

// Copyright 2024 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use crate::{
18    evaluation::{context::QueryVariables, variable_value::VariableValue},
19    models::ElementReference,
20};
21use async_trait::async_trait;
22use ordered_float::OrderedFloat;
23
24use crate::evaluation::functions::aggregation::ValueAccumulator;
25
26use super::IndexError;
27
28pub trait ResultIndex: AccumulatorIndex + ResultSequenceCounter {}
29
30#[async_trait]
31pub trait AccumulatorIndex: LazySortedSetStore {
32    async fn clear(&self) -> Result<(), IndexError>;
33    async fn get(
34        &self,
35        key: &ResultKey,
36        owner: &ResultOwner,
37    ) -> Result<Option<ValueAccumulator>, IndexError>;
38    async fn set(
39        &self,
40        key: ResultKey,
41        owner: ResultOwner,
42        value: Option<ValueAccumulator>,
43    ) -> Result<(), IndexError>;
44}
45
46#[async_trait]
47pub trait LazySortedSetStore: Send + Sync {
48    async fn get_next(
49        &self,
50        set_id: u64,
51        value: Option<OrderedFloat<f64>>,
52    ) -> Result<Option<(OrderedFloat<f64>, isize)>, IndexError>;
53    async fn get_value_count(
54        &self,
55        set_id: u64,
56        value: OrderedFloat<f64>,
57    ) -> Result<isize, IndexError>;
58    async fn increment_value_count(
59        &self,
60        set_id: u64,
61        value: OrderedFloat<f64>,
62        delta: isize,
63    ) -> Result<(), IndexError>;
64}
65
/// A sequence number paired with the id of a source change.
///
/// NOTE(review): presumably `source_change_id` names the source change that
/// produced result number `sequence` — confirm against callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResultSequence {
    /// Numeric position in the result stream.
    pub sequence: u64,
    /// Identifier of the associated source change; shared, immutable text.
    pub source_change_id: Arc<str>,
}

impl Default for ResultSequence {
    /// Returns the starting value: sequence `0` with an empty change id.
    fn default() -> Self {
        let source_change_id: Arc<str> = "".into();
        Self {
            sequence: 0,
            source_change_id,
        }
    }
}
80
81/// Tracks the monotonic output sequence number for a query's result stream.
82///
83/// Each time a query emits a result batch, the sequence is incremented and
84/// persisted so that downstream consumers (reactions) can detect ordering
85/// and gaps. This is separate from the source checkpoint tracking in
86/// [`CheckpointStore`](super::CheckpointStore).
87#[async_trait]
88pub trait ResultSequenceCounter: Send + Sync {
89    async fn apply_sequence(&self, sequence: u64, source_change_id: &str)
90        -> Result<(), IndexError>;
91    async fn get_sequence(&self) -> Result<ResultSequence, IndexError>;
92}
93
/// Identifies the owner of a stored result; every variant carries a `usize`
/// index.
///
/// NOTE(review): the variant names suggest an aggregation function versus the
/// current/default state of a query part, with the index being a position in
/// the query — confirm against the code that constructs these values.
///
/// `Eq` is derived alongside `PartialEq` and `Hash` so the type can be used
/// directly as a `HashMap`/`HashSet` key; equality over `usize` payloads is
/// total, so the stronger guarantee always holds.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ResultOwner {
    Function(usize),
    PartCurrent(usize),
    PartDefault(usize),
}
100
101#[derive(Debug, Clone, PartialEq)]
102pub enum ResultKey {
103    GroupBy(Arc<Vec<VariableValue>>),
104    InputHash(u64),
105    Element(ElementReference),
106}
107
108impl ResultKey {
109    pub fn groupby_from_variables(keys: &[String], variables: &QueryVariables) -> ResultKey {
110        let mut grouping_keys = Vec::new();
111        for key in keys.iter() {
112            grouping_keys.push(
113                variables
114                    .get(key.as_str())
115                    .unwrap_or(&VariableValue::Null)
116                    .clone(),
117            );
118        }
119        ResultKey::GroupBy(Arc::new(grouping_keys))
120    }
121}
122
123impl std::hash::Hash for ResultKey {
124    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125        match self {
126            ResultKey::GroupBy(grouping_keys) => {
127                for key in grouping_keys.iter() {
128                    key.hash_for_groupby(state);
129                }
130            }
131            ResultKey::InputHash(hash) => {
132                hash.hash(state);
133            }
134            ResultKey::Element(element_reference) => {
135                element_reference.hash(state);
136            }
137        }
138    }
139}