drasi_core/evaluation/functions/past/
mod.rs1use std::sync::Arc;
16
17use crate::evaluation::variable_value::VariableValue;
18use async_trait::async_trait;
19use chrono::NaiveTime;
20use drasi_query_ast::ast;
21use futures::StreamExt;
22
23use crate::{
24 evaluation::{ExpressionEvaluationContext, FunctionError, FunctionEvaluationError},
25 interface::ElementArchiveIndex,
26};
27
28use super::{Function, FunctionRegistry, ScalarFunction};
29use crate::models::{Element, TimestampBound, TimestampRange};
30
31pub trait RegisterPastFunctions {
32 fn register_past_functions(&self, archive_index: Arc<dyn ElementArchiveIndex>);
33}
34
35impl RegisterPastFunctions for FunctionRegistry {
36 fn register_past_functions(&self, archive_index: Arc<dyn ElementArchiveIndex>) {
37 self.register_function(
38 "drasi.getVersionByTimestamp",
39 Function::Scalar(Arc::new(GetVersionByTimestamp {
40 archive_index: archive_index.clone(),
41 })),
42 );
43
44 self.register_function(
45 "drasi.getVersionsByTimeRange",
46 Function::Scalar(Arc::new(GetVersionsByTimeRange {
47 archive_index: archive_index.clone(),
48 })),
49 );
50 }
51}
52
53pub struct GetVersionByTimestamp {
54 archive_index: Arc<dyn ElementArchiveIndex>,
55}
56
57#[async_trait]
58impl ScalarFunction for GetVersionByTimestamp {
59 async fn call(
60 &self,
61 _context: &ExpressionEvaluationContext,
62 expression: &ast::FunctionExpression,
63 args: Vec<VariableValue>,
64 ) -> Result<VariableValue, FunctionError> {
65 if args.len() != 2 {
66 return Err(FunctionError {
67 function_name: expression.name.to_string(),
68 error: FunctionEvaluationError::InvalidArgumentCount,
69 });
70 }
71
72 let metadata = match &args[0] {
73 VariableValue::Element(e) => e.get_metadata(),
74 _ => {
75 return Err(FunctionError {
76 function_name: expression.name.to_string(),
77 error: FunctionEvaluationError::InvalidArgument(0),
78 })
79 }
80 };
81
82 let timestamp = match &args[1] {
83 VariableValue::Date(d) => {
84 d.and_time(NaiveTime::MIN).and_utc().timestamp_millis() as u64
85 }
86 VariableValue::LocalDateTime(d) => d.and_utc().timestamp_millis() as u64,
87 VariableValue::ZonedDateTime(d) => d.datetime().timestamp_millis() as u64,
88 VariableValue::Integer(n) => match n.as_u64() {
89 Some(u) => u,
90 None => {
91 return Err(FunctionError {
92 function_name: expression.name.to_string(),
93 error: FunctionEvaluationError::OverflowError,
94 })
95 }
96 },
97 _ => {
98 return Err(FunctionError {
99 function_name: expression.name.to_string(),
100 error: FunctionEvaluationError::InvalidArgument(1),
101 })
102 }
103 };
104
105 let element = match self
106 .archive_index
107 .get_element_as_at(&metadata.reference, timestamp)
108 .await
109 {
110 Ok(e) => e,
111 Err(e) => {
112 return Err(FunctionError {
113 function_name: expression.name.to_string(),
114 error: FunctionEvaluationError::IndexError(e),
115 })
116 }
117 };
118
119 match element {
120 Some(e) => Ok(e.to_expression_variable()),
121 None => Ok(VariableValue::Null),
122 }
123 }
124}
125
126pub struct GetVersionsByTimeRange {
127 archive_index: Arc<dyn ElementArchiveIndex>,
128}
129
130#[async_trait]
131impl ScalarFunction for GetVersionsByTimeRange {
132 async fn call(
133 &self,
134 _context: &ExpressionEvaluationContext,
135 expression: &ast::FunctionExpression,
136 args: Vec<VariableValue>,
137 ) -> Result<VariableValue, FunctionError> {
138 if args.len() < 3 || args.len() > 4 {
139 return Err(FunctionError {
140 function_name: expression.name.to_string(),
141 error: FunctionEvaluationError::InvalidArgumentCount,
142 });
143 }
144 let metadata = match &args[0] {
145 VariableValue::Element(e) => e.get_metadata(),
146 _ => {
147 return Err(FunctionError {
148 function_name: expression.name.to_string(),
149 error: FunctionEvaluationError::InvalidArgument(0),
150 })
151 }
152 };
153
154 let from = match &args[1] {
155 VariableValue::Date(d) => {
156 d.and_time(NaiveTime::MIN).and_utc().timestamp_millis() as u64
157 }
158 VariableValue::LocalDateTime(d) => d.and_utc().timestamp_millis() as u64,
159 VariableValue::ZonedDateTime(d) => d.datetime().timestamp_millis() as u64,
160 VariableValue::Integer(n) => match n.as_u64() {
161 Some(u) => u,
162 None => {
163 return Err(FunctionError {
164 function_name: expression.name.to_string(),
165 error: FunctionEvaluationError::OverflowError,
166 })
167 }
168 },
169 _ => {
170 return Err(FunctionError {
171 function_name: expression.name.to_string(),
172 error: FunctionEvaluationError::InvalidArgument(1),
173 })
174 }
175 };
176
177 let to = match &args[2] {
178 VariableValue::Date(d) => {
179 d.and_time(NaiveTime::MIN).and_utc().timestamp_millis() as u64
180 }
181 VariableValue::LocalDateTime(d) => d.and_utc().timestamp_millis() as u64,
182 VariableValue::ZonedDateTime(d) => d.datetime().timestamp_millis() as u64,
183 VariableValue::Integer(n) => match n.as_u64() {
184 Some(u) => u,
185 None => {
186 return Err(FunctionError {
187 function_name: expression.name.to_string(),
188 error: FunctionEvaluationError::OverflowError,
189 })
190 }
191 },
192 _ => {
193 return Err(FunctionError {
194 function_name: expression.name.to_string(),
195 error: FunctionEvaluationError::InvalidArgument(2),
196 })
197 }
198 };
199
200 let retrieve_initial_value = match args.get(3) {
201 Some(VariableValue::Bool(b)) => *b,
202 None => false,
203 _ => {
204 return Err(FunctionError {
205 function_name: expression.name.to_string(),
206 error: FunctionEvaluationError::InvalidArgument(3),
207 })
208 }
209 };
210
211 let range = match retrieve_initial_value {
212 true => TimestampRange {
213 from: TimestampBound::StartFromPrevious(from),
214 to,
215 },
216 false => TimestampRange {
217 from: TimestampBound::Included(from),
218 to,
219 },
220 };
221
222 let mut stream = match self
223 .archive_index
224 .get_element_versions(&metadata.reference, range)
225 .await
226 {
227 Ok(s) => s,
228 Err(e) => {
229 return Err(FunctionError {
230 function_name: expression.name.to_string(),
231 error: FunctionEvaluationError::IndexError(e),
232 })
233 }
234 };
235
236 let mut result = Vec::new();
237 while let Some(item) = stream.next().await {
238 match item {
239 Ok(e) => {
240 if retrieve_initial_value && e.get_effective_from() < from {
243 let mut metadata = e.get_metadata().clone();
244 metadata.effective_from = from;
245 let _result_element = Element::Node {
246 metadata,
247 properties: e.get_properties().clone(),
248 };
249 result.push(e.to_expression_variable())
250 } else {
251 result.push(e.to_expression_variable())
252 }
253 }
254 Err(e) => {
255 return Err(FunctionError {
256 function_name: expression.name.to_string(),
257 error: FunctionEvaluationError::IndexError(e),
258 })
259 }
260 }
261 }
262
263 Ok(VariableValue::List(result))
264 }
265}