drasi_core/interface/
result_index.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use crate::evaluation::{context::QueryVariables, variable_value::VariableValue};
18use async_trait::async_trait;
19use ordered_float::OrderedFloat;
20
21use crate::evaluation::functions::aggregation::ValueAccumulator;
22
23use super::IndexError;
24
25pub trait ResultIndex: AccumulatorIndex + ResultSequenceCounter {}
26
27#[async_trait]
28pub trait AccumulatorIndex: LazySortedSetStore {
29    async fn clear(&self) -> Result<(), IndexError>;
30    async fn get(
31        &self,
32        key: &ResultKey,
33        owner: &ResultOwner,
34    ) -> Result<Option<ValueAccumulator>, IndexError>;
35    async fn set(
36        &self,
37        key: ResultKey,
38        owner: ResultOwner,
39        value: Option<ValueAccumulator>,
40    ) -> Result<(), IndexError>;
41}
42
43#[async_trait]
44pub trait LazySortedSetStore: Send + Sync {
45    async fn get_next(
46        &self,
47        set_id: u64,
48        value: Option<OrderedFloat<f64>>,
49    ) -> Result<Option<(OrderedFloat<f64>, isize)>, IndexError>;
50    async fn get_value_count(
51        &self,
52        set_id: u64,
53        value: OrderedFloat<f64>,
54    ) -> Result<isize, IndexError>;
55    async fn increment_value_count(
56        &self,
57        set_id: u64,
58        value: OrderedFloat<f64>,
59        delta: isize,
60    ) -> Result<(), IndexError>;
61}
62
63#[derive(Debug, Clone, PartialEq, Eq)]
64pub struct ResultSequence {
65    pub sequence: u64,
66    pub source_change_id: Arc<str>,
67}
68
69impl Default for ResultSequence {
70    fn default() -> Self {
71        ResultSequence {
72            sequence: 0,
73            source_change_id: Arc::from(""),
74        }
75    }
76}
77
78#[async_trait]
79pub trait ResultSequenceCounter: Send + Sync {
80    async fn apply_sequence(&self, sequence: u64, source_change_id: &str)
81        -> Result<(), IndexError>;
82    async fn get_sequence(&self) -> Result<ResultSequence, IndexError>;
83}
84
85#[derive(Debug, Clone, PartialEq, Hash)]
86pub enum ResultOwner {
87    Function(usize),
88    PartCurrent(usize),
89    PartDefault(usize),
90}
91
92#[derive(Debug, Clone, PartialEq)]
93pub enum ResultKey {
94    GroupBy(Arc<Vec<VariableValue>>),
95    InputHash(u64),
96}
97
98impl ResultKey {
99    pub fn groupby_from_variables(keys: &[String], variables: &QueryVariables) -> ResultKey {
100        let mut grouping_keys = Vec::new();
101        for key in keys.iter() {
102            grouping_keys.push(
103                variables
104                    .get(key.as_str())
105                    .unwrap_or(&VariableValue::Null)
106                    .clone(),
107            );
108        }
109        ResultKey::GroupBy(Arc::new(grouping_keys))
110    }
111}
112
113impl std::hash::Hash for ResultKey {
114    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
115        match self {
116            ResultKey::GroupBy(grouping_keys) => {
117                for key in grouping_keys.iter() {
118                    key.hash_for_groupby(state);
119                }
120            }
121            ResultKey::InputHash(hash) => {
122                hash.hash(state);
123            }
124        }
125    }
126}