drasi_core/interface/
result_index.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use crate::{
18    evaluation::{context::QueryVariables, variable_value::VariableValue},
19    models::ElementReference,
20};
21use async_trait::async_trait;
22use ordered_float::OrderedFloat;
23
24use crate::evaluation::functions::aggregation::ValueAccumulator;
25
26use super::IndexError;
27
/// Umbrella trait that combines [`AccumulatorIndex`] and [`ResultSequenceCounter`].
///
/// It declares no methods of its own; an implementor has to provide both
/// supertraits (and, through `AccumulatorIndex`, [`LazySortedSetStore`]).
pub trait ResultIndex: AccumulatorIndex + ResultSequenceCounter {}
29
/// Asynchronous storage for [`ValueAccumulator`]s, addressed by a
/// ([`ResultKey`], [`ResultOwner`]) pair.
///
/// Every implementor must also implement [`LazySortedSetStore`]. All methods
/// report failures as [`IndexError`].
#[async_trait]
pub trait AccumulatorIndex: LazySortedSetStore {
    /// Clears the index.
    // NOTE(review): presumably this drops every stored accumulator; whether
    // the sorted-set data is cleared as well depends on the implementor — confirm.
    async fn clear(&self) -> Result<(), IndexError>;
    /// Looks up the accumulator stored under `key` and `owner`.
    ///
    /// The `Option` in the return type leaves room for "no accumulator"
    /// (presumably meaning nothing has been stored for that pair).
    async fn get(
        &self,
        key: &ResultKey,
        owner: &ResultOwner,
    ) -> Result<Option<ValueAccumulator>, IndexError>;
    /// Stores `value` under `key` and `owner`, taking ownership of all three.
    // NOTE(review): passing `None` presumably removes the entry — verify
    // against the concrete index implementations.
    async fn set(
        &self,
        key: ResultKey,
        owner: ResultOwner,
        value: Option<ValueAccumulator>,
    ) -> Result<(), IndexError>;
}
45
/// Asynchronous store of `OrderedFloat<f64>` values and their signed (`isize`)
/// counts, partitioned into sets identified by a `u64` id.
///
/// Implementors must be `Send + Sync`. All methods report failures as
/// [`IndexError`].
#[async_trait]
pub trait LazySortedSetStore: Send + Sync {
    /// Fetches a `(value, count)` entry of set `set_id` relative to `value`.
    // NOTE(review): presumably returns the smallest stored value greater than
    // `value` (or the first value of the set when `value` is `None`) and
    // `Ok(None)` when no such value exists — confirm against an implementation.
    async fn get_next(
        &self,
        set_id: u64,
        value: Option<OrderedFloat<f64>>,
    ) -> Result<Option<(OrderedFloat<f64>, isize)>, IndexError>;
    /// Returns the count recorded for `value` in set `set_id`.
    // NOTE(review): the result for a value that was never recorded is not
    // visible from this trait (presumably 0) — confirm.
    async fn get_value_count(
        &self,
        set_id: u64,
        value: OrderedFloat<f64>,
    ) -> Result<isize, IndexError>;
    /// Adjusts the count recorded for `value` in set `set_id` by `delta`.
    ///
    /// `delta` is signed, so the same call can express a decrement.
    async fn increment_value_count(
        &self,
        set_id: u64,
        value: OrderedFloat<f64>,
        delta: isize,
    ) -> Result<(), IndexError>;
}
65
/// A sequence number paired with the id of a source change.
///
/// This is the value returned by [`ResultSequenceCounter::get_sequence`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResultSequence {
    /// Numeric sequence; `0` in the default value.
    pub sequence: u64,
    /// Id of the associated source change; empty in the default value.
    pub source_change_id: Arc<str>,
}

impl Default for ResultSequence {
    /// Returns the starting value: sequence `0` and an empty source change id.
    fn default() -> Self {
        let source_change_id: Arc<str> = "".into();
        Self {
            sequence: 0,
            source_change_id,
        }
    }
}
80
/// Asynchronous accessor for a [`ResultSequence`].
///
/// Implementors must be `Send + Sync`. Both methods report failures as
/// [`IndexError`].
#[async_trait]
pub trait ResultSequenceCounter: Send + Sync {
    /// Records `sequence` together with `source_change_id`.
    // NOTE(review): presumably this becomes the value later returned by
    // `get_sequence`; ordering/monotonicity rules are not visible here — confirm.
    async fn apply_sequence(&self, sequence: u64, source_change_id: &str)
        -> Result<(), IndexError>;
    /// Returns the current [`ResultSequence`].
    async fn get_sequence(&self) -> Result<ResultSequence, IndexError>;
}
87
/// Owner half of the ([`ResultKey`], `ResultOwner`) pair used to address
/// entries in an [`AccumulatorIndex`].
///
/// Every variant carries a single `usize`, so equality is total; `Eq` is
/// derived alongside `PartialEq` and `Hash` so the type can be used directly
/// as a `HashMap`/`HashSet` key.
// NOTE(review): the `usize` payloads are presumably positional indexes (of an
// aggregating function or a query part) — confirm against the callers.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ResultOwner {
    /// Entry owned by a function, identified by a `usize`.
    Function(usize),
    /// Entry owned by the "current" side of a query part, identified by a `usize`.
    PartCurrent(usize),
    /// Entry owned by the "default" side of a query part, identified by a `usize`.
    PartDefault(usize),
}
94
/// Key half of the (`ResultKey`, [`ResultOwner`]) pair used to address
/// entries in an [`AccumulatorIndex`].
///
/// Only `Debug`, `Clone` and `PartialEq` are derived; `Hash` is implemented
/// by hand elsewhere in this file.
#[derive(Debug, Clone, PartialEq)]
pub enum ResultKey {
    /// Ordered list of grouping values, shared behind an `Arc`.
    GroupBy(Arc<Vec<VariableValue>>),
    /// A 64-bit hash value (presumably a precomputed hash of the inputs — confirm).
    InputHash(u64),
    /// A reference to a single element.
    Element(ElementReference),
}
101
102impl ResultKey {
103    pub fn groupby_from_variables(keys: &[String], variables: &QueryVariables) -> ResultKey {
104        let mut grouping_keys = Vec::new();
105        for key in keys.iter() {
106            grouping_keys.push(
107                variables
108                    .get(key.as_str())
109                    .unwrap_or(&VariableValue::Null)
110                    .clone(),
111            );
112        }
113        ResultKey::GroupBy(Arc::new(grouping_keys))
114    }
115}
116
117impl std::hash::Hash for ResultKey {
118    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
119        match self {
120            ResultKey::GroupBy(grouping_keys) => {
121                for key in grouping_keys.iter() {
122                    key.hash_for_groupby(state);
123                }
124            }
125            ResultKey::InputHash(hash) => {
126                hash.hash(state);
127            }
128            ResultKey::Element(element_reference) => {
129                element_reference.hash(state);
130            }
131        }
132    }
133}