drasi_core/evaluation/functions/past/
mod.rs

// Copyright 2024 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use crate::evaluation::variable_value::VariableValue;
18use async_trait::async_trait;
19use chrono::NaiveTime;
20use drasi_query_ast::ast;
21use futures::StreamExt;
22
23use crate::{
24    evaluation::{ExpressionEvaluationContext, FunctionError, FunctionEvaluationError},
25    interface::ElementArchiveIndex,
26};
27
28use super::{Function, FunctionRegistry, ScalarFunction};
29use crate::models::{Element, TimestampBound, TimestampRange};
30
31pub trait RegisterPastFunctions {
32    fn register_past_functions(&self, archive_index: Arc<dyn ElementArchiveIndex>);
33}
34
35impl RegisterPastFunctions for FunctionRegistry {
36    fn register_past_functions(&self, archive_index: Arc<dyn ElementArchiveIndex>) {
37        self.register_function(
38            "drasi.getVersionByTimestamp",
39            Function::Scalar(Arc::new(GetVersionByTimestamp {
40                archive_index: archive_index.clone(),
41            })),
42        );
43
44        self.register_function(
45            "drasi.getVersionsByTimeRange",
46            Function::Scalar(Arc::new(GetVersionsByTimeRange {
47                archive_index: archive_index.clone(),
48            })),
49        );
50    }
51}
52
53pub struct GetVersionByTimestamp {
54    archive_index: Arc<dyn ElementArchiveIndex>,
55}
56
57#[async_trait]
58impl ScalarFunction for GetVersionByTimestamp {
59    async fn call(
60        &self,
61        _context: &ExpressionEvaluationContext,
62        expression: &ast::FunctionExpression,
63        args: Vec<VariableValue>,
64    ) -> Result<VariableValue, FunctionError> {
65        if args.len() != 2 {
66            return Err(FunctionError {
67                function_name: expression.name.to_string(),
68                error: FunctionEvaluationError::InvalidArgumentCount,
69            });
70        }
71
72        let metadata = match &args[0] {
73            VariableValue::Element(e) => e.get_metadata(),
74            _ => {
75                return Err(FunctionError {
76                    function_name: expression.name.to_string(),
77                    error: FunctionEvaluationError::InvalidArgument(0),
78                })
79            }
80        };
81
82        let timestamp = match &args[1] {
83            VariableValue::Date(d) => {
84                d.and_time(NaiveTime::MIN).and_utc().timestamp_millis() as u64
85            }
86            VariableValue::LocalDateTime(d) => d.and_utc().timestamp_millis() as u64,
87            VariableValue::ZonedDateTime(d) => d.datetime().timestamp_millis() as u64,
88            VariableValue::Integer(n) => match n.as_u64() {
89                Some(u) => u,
90                None => {
91                    return Err(FunctionError {
92                        function_name: expression.name.to_string(),
93                        error: FunctionEvaluationError::OverflowError,
94                    })
95                }
96            },
97            _ => {
98                return Err(FunctionError {
99                    function_name: expression.name.to_string(),
100                    error: FunctionEvaluationError::InvalidArgument(1),
101                })
102            }
103        };
104
105        let element = match self
106            .archive_index
107            .get_element_as_at(&metadata.reference, timestamp)
108            .await
109        {
110            Ok(e) => e,
111            Err(e) => {
112                return Err(FunctionError {
113                    function_name: expression.name.to_string(),
114                    error: FunctionEvaluationError::IndexError(e),
115                })
116            }
117        };
118
119        match element {
120            Some(e) => Ok(e.to_expression_variable()),
121            None => Ok(VariableValue::Null),
122        }
123    }
124}
125
126pub struct GetVersionsByTimeRange {
127    archive_index: Arc<dyn ElementArchiveIndex>,
128}
129
130#[async_trait]
131impl ScalarFunction for GetVersionsByTimeRange {
132    async fn call(
133        &self,
134        _context: &ExpressionEvaluationContext,
135        expression: &ast::FunctionExpression,
136        args: Vec<VariableValue>,
137    ) -> Result<VariableValue, FunctionError> {
138        if args.len() < 3 || args.len() > 4 {
139            return Err(FunctionError {
140                function_name: expression.name.to_string(),
141                error: FunctionEvaluationError::InvalidArgumentCount,
142            });
143        }
144        let metadata = match &args[0] {
145            VariableValue::Element(e) => e.get_metadata(),
146            _ => {
147                return Err(FunctionError {
148                    function_name: expression.name.to_string(),
149                    error: FunctionEvaluationError::InvalidArgument(0),
150                })
151            }
152        };
153
154        let from = match &args[1] {
155            VariableValue::Date(d) => {
156                d.and_time(NaiveTime::MIN).and_utc().timestamp_millis() as u64
157            }
158            VariableValue::LocalDateTime(d) => d.and_utc().timestamp_millis() as u64,
159            VariableValue::ZonedDateTime(d) => d.datetime().timestamp_millis() as u64,
160            VariableValue::Integer(n) => match n.as_u64() {
161                Some(u) => u,
162                None => {
163                    return Err(FunctionError {
164                        function_name: expression.name.to_string(),
165                        error: FunctionEvaluationError::OverflowError,
166                    })
167                }
168            },
169            _ => {
170                return Err(FunctionError {
171                    function_name: expression.name.to_string(),
172                    error: FunctionEvaluationError::InvalidArgument(1),
173                })
174            }
175        };
176
177        let to = match &args[2] {
178            VariableValue::Date(d) => {
179                d.and_time(NaiveTime::MIN).and_utc().timestamp_millis() as u64
180            }
181            VariableValue::LocalDateTime(d) => d.and_utc().timestamp_millis() as u64,
182            VariableValue::ZonedDateTime(d) => d.datetime().timestamp_millis() as u64,
183            VariableValue::Integer(n) => match n.as_u64() {
184                Some(u) => u,
185                None => {
186                    return Err(FunctionError {
187                        function_name: expression.name.to_string(),
188                        error: FunctionEvaluationError::OverflowError,
189                    })
190                }
191            },
192            _ => {
193                return Err(FunctionError {
194                    function_name: expression.name.to_string(),
195                    error: FunctionEvaluationError::InvalidArgument(2),
196                })
197            }
198        };
199
200        let retrieve_initial_value = match args.get(3) {
201            Some(VariableValue::Bool(b)) => *b,
202            None => false,
203            _ => {
204                return Err(FunctionError {
205                    function_name: expression.name.to_string(),
206                    error: FunctionEvaluationError::InvalidArgument(3),
207                })
208            }
209        };
210
211        let range = match retrieve_initial_value {
212            true => TimestampRange {
213                from: TimestampBound::StartFromPrevious(from),
214                to,
215            },
216            false => TimestampRange {
217                from: TimestampBound::Included(from),
218                to,
219            },
220        };
221
222        let mut stream = match self
223            .archive_index
224            .get_element_versions(&metadata.reference, range)
225            .await
226        {
227            Ok(s) => s,
228            Err(e) => {
229                return Err(FunctionError {
230                    function_name: expression.name.to_string(),
231                    error: FunctionEvaluationError::IndexError(e),
232                })
233            }
234        };
235
236        let mut result = Vec::new();
237        while let Some(item) = stream.next().await {
238            match item {
239                Ok(e) => {
240                    // if effective time is less than from and if retrieve_initial_value is true
241                    //  then we need to update the effective time to be from
242                    if retrieve_initial_value && e.get_effective_from() < from {
243                        let mut metadata = e.get_metadata().clone();
244                        metadata.effective_from = from;
245                        let _result_element = Element::Node {
246                            metadata,
247                            properties: e.get_properties().clone(),
248                        };
249                        result.push(e.to_expression_variable())
250                    } else {
251                        result.push(e.to_expression_variable())
252                    }
253                }
254                Err(e) => {
255                    return Err(FunctionError {
256                        function_name: expression.name.to_string(),
257                        error: FunctionEvaluationError::IndexError(e),
258                    })
259                }
260            }
261        }
262
263        Ok(VariableValue::List(result))
264    }
265}