// Source path: cypherlite_query/executor/operators/temporal_scan.rs
use crate::executor::eval::eval;
use crate::executor::operators::create::{SYSTEM_PROP_CREATED_AT, SYSTEM_PROP_UPDATED_AT};
use crate::executor::{ExecutionError, Params, Record, ScalarFnLookup, Value};
use crate::parser::ast::Expression;
use cypherlite_core::{LabelRegistry, NodeId, PropertyValue};
use cypherlite_storage::version::VersionRecord;
use cypherlite_storage::StorageEngine;
13
14pub fn execute_as_of_scan(
26 source_records: Vec<Record>,
27 timestamp_expr: &Expression,
28 engine: &mut StorageEngine,
29 params: &Params,
30 scalar_fns: &dyn ScalarFnLookup,
31) -> Result<Vec<Record>, ExecutionError> {
32 let empty_record = Record::new();
34 let target_val = eval(timestamp_expr, &empty_record, engine, params, scalar_fns)?;
35 let target_ms = match target_val {
36 Value::DateTime(ms) => ms,
37 Value::Int64(ms) => ms,
38 _ => {
39 return Err(ExecutionError {
40 message: "AT TIME expression must evaluate to a DateTime or integer timestamp"
41 .to_string(),
42 });
43 }
44 };
45
46 let updated_key = engine.get_or_create_prop_key(SYSTEM_PROP_UPDATED_AT);
47 let created_key = engine.get_or_create_prop_key(SYSTEM_PROP_CREATED_AT);
48
49 let mut results = Vec::new();
50
51 for record in &source_records {
52 let mut temporal_record = record.clone();
54 let mut include = true;
55
56 for (var_name, value) in record {
57 if let Value::Node(node_id) = value {
58 match find_node_state_at(*node_id, target_ms, updated_key, created_key, engine) {
59 Some(TemporalNodeState::Current) => {
60 }
62 Some(TemporalNodeState::Version(props)) => {
63 temporal_record.insert(
68 format!("__temporal_props__{}", var_name),
69 Value::List(
70 props
71 .iter()
72 .map(|(k, v)| {
73 Value::List(vec![
74 Value::Int64(*k as i64),
75 Value::from(v.clone()),
76 ])
77 })
78 .collect(),
79 ),
80 );
81 }
82 None => {
83 include = false;
85 break;
86 }
87 }
88 }
89 }
90
91 if include {
92 results.push(temporal_record);
93 }
94 }
95
96 Ok(results)
97}
98
99pub fn execute_temporal_range_scan(
104 source_records: Vec<Record>,
105 start_expr: &Expression,
106 end_expr: &Expression,
107 engine: &mut StorageEngine,
108 params: &Params,
109 scalar_fns: &dyn ScalarFnLookup,
110) -> Result<Vec<Record>, ExecutionError> {
111 let empty_record = Record::new();
112 let start_val = eval(start_expr, &empty_record, engine, params, scalar_fns)?;
113 let end_val = eval(end_expr, &empty_record, engine, params, scalar_fns)?;
114
115 let start_ms = match start_val {
116 Value::DateTime(ms) => ms,
117 Value::Int64(ms) => ms,
118 _ => {
119 return Err(ExecutionError {
120 message: "BETWEEN TIME start must evaluate to a DateTime or integer".to_string(),
121 });
122 }
123 };
124
125 let end_ms = match end_val {
126 Value::DateTime(ms) => ms,
127 Value::Int64(ms) => ms,
128 _ => {
129 return Err(ExecutionError {
130 message: "BETWEEN TIME end must evaluate to a DateTime or integer".to_string(),
131 });
132 }
133 };
134
135 let updated_key = engine.get_or_create_prop_key(SYSTEM_PROP_UPDATED_AT);
136 let created_key = engine.get_or_create_prop_key(SYSTEM_PROP_CREATED_AT);
137
138 let mut results = Vec::new();
139
140 for record in &source_records {
141 for (var_name, value) in record {
143 if let Value::Node(node_id) = value {
144 let versions_in_range = find_node_versions_in_range(
145 *node_id,
146 start_ms,
147 end_ms,
148 updated_key,
149 created_key,
150 engine,
151 );
152
153 for props in versions_in_range {
154 let mut versioned_record = record.clone();
155 versioned_record.insert(
156 format!("__temporal_props__{}", var_name),
157 Value::List(
158 props
159 .iter()
160 .map(|(k, v)| {
161 Value::List(vec![
162 Value::Int64(*k as i64),
163 Value::from(v.clone()),
164 ])
165 })
166 .collect(),
167 ),
168 );
169 results.push(versioned_record);
170 }
171 break;
173 }
174 }
175 }
176
177 Ok(results)
178}
179
180enum TemporalNodeState {
182 Current,
184 Version(Vec<(u32, PropertyValue)>),
186}
187
188fn find_node_state_at(
192 node_id: NodeId,
193 target_ms: i64,
194 updated_key: u32,
195 created_key: u32,
196 engine: &StorageEngine,
197) -> Option<TemporalNodeState> {
198 let current = engine.get_node(node_id)?;
199
200 let current_created = get_timestamp_prop(¤t.properties, created_key)?;
202
203 if current_created > target_ms {
205 return None;
206 }
207
208 let current_updated =
210 get_timestamp_prop(¤t.properties, updated_key).unwrap_or(current_created);
211
212 if current_updated <= target_ms {
214 return Some(TemporalNodeState::Current);
215 }
216
217 let version_chain = engine.version_store().get_version_chain(node_id.0);
219
220 for (_seq, version) in version_chain.iter().rev() {
222 if let VersionRecord::Node(node_record) = version {
223 let version_updated = get_timestamp_prop(&node_record.properties, updated_key)
224 .or_else(|| get_timestamp_prop(&node_record.properties, created_key));
225
226 if let Some(ts) = version_updated {
227 if ts <= target_ms {
228 return Some(TemporalNodeState::Version(node_record.properties.clone()));
229 }
230 }
231 }
232 }
233
234 if let Some((_seq, VersionRecord::Node(node_record))) = version_chain.first() {
237 let version_created = get_timestamp_prop(&node_record.properties, created_key);
238 if let Some(ts) = version_created {
239 if ts <= target_ms {
240 return Some(TemporalNodeState::Version(node_record.properties.clone()));
241 }
242 }
243 }
244
245 None
246}
247
248fn find_node_versions_in_range(
250 node_id: NodeId,
251 start_ms: i64,
252 end_ms: i64,
253 updated_key: u32,
254 created_key: u32,
255 engine: &StorageEngine,
256) -> Vec<Vec<(u32, PropertyValue)>> {
257 let mut results = Vec::new();
258
259 let version_chain = engine.version_store().get_version_chain(node_id.0);
261
262 for (_seq, version) in &version_chain {
263 if let VersionRecord::Node(node_record) = version {
264 let ts = get_timestamp_prop(&node_record.properties, updated_key)
265 .or_else(|| get_timestamp_prop(&node_record.properties, created_key));
266
267 if let Some(ts) = ts {
268 if ts >= start_ms && ts <= end_ms {
269 results.push(node_record.properties.clone());
270 }
271 }
272 }
273 }
274
275 if let Some(current) = engine.get_node(node_id) {
277 let ts = get_timestamp_prop(¤t.properties, updated_key)
278 .or_else(|| get_timestamp_prop(¤t.properties, created_key));
279
280 if let Some(ts) = ts {
281 if ts >= start_ms && ts <= end_ms {
282 results.push(current.properties.clone());
283 }
284 }
285 }
286
287 results
288}
289
290fn get_timestamp_prop(props: &[(u32, PropertyValue)], key: u32) -> Option<i64> {
292 props.iter().find(|(k, _)| *k == key).and_then(|(_, v)| {
293 if let PropertyValue::DateTime(ms) = v {
294 Some(*ms)
295 } else {
296 None
297 }
298 })
299}
300
/// Unit tests for the storage-independent timestamp lookup helper.
#[cfg(test)]
mod tests {
    use super::*;

    // A DateTime stored under the requested key is returned.
    #[test]
    fn test_get_timestamp_prop_found() {
        let props = vec![
            (1, PropertyValue::String("hello".to_string())),
            (2, PropertyValue::DateTime(1000)),
        ];
        assert_eq!(get_timestamp_prop(&props, 2), Some(1000));
    }

    // A missing key yields `None`.
    #[test]
    fn test_get_timestamp_prop_not_found() {
        let props = vec![(1, PropertyValue::String("hello".to_string()))];
        assert_eq!(get_timestamp_prop(&props, 2), None);
    }

    // A key holding a non-DateTime value is not treated as a timestamp.
    #[test]
    fn test_get_timestamp_prop_wrong_type() {
        let props = vec![(2, PropertyValue::Int64(1000))];
        assert_eq!(get_timestamp_prop(&props, 2), None);
    }

    // An empty property list never yields a timestamp.
    #[test]
    fn test_get_timestamp_prop_empty() {
        assert_eq!(get_timestamp_prop(&[], 2), None);
    }
}