Skip to main content

cypherlite_query/executor/operators/
temporal_scan.rs

// Temporal scan operators: AsOfScan and TemporalRangeScan
//
// X-T6: AsOfScan -- find the node version that was current at a specific point in time
// Y-T3: TemporalRangeScan -- find all node versions whose timestamps fall within a time range

6use crate::executor::eval::eval;
7use crate::executor::operators::create::{SYSTEM_PROP_CREATED_AT, SYSTEM_PROP_UPDATED_AT};
8use crate::executor::{ExecutionError, Params, Record, ScalarFnLookup, Value};
9use crate::parser::ast::Expression;
10use cypherlite_core::{LabelRegistry, NodeId, PropertyValue};
11use cypherlite_storage::version::VersionRecord;
12use cypherlite_storage::StorageEngine;
13
14/// Execute an AsOfScan: for each node in source records, find the version
15/// that was current at the specified timestamp.
16///
17/// Algorithm:
18/// 1. Evaluate timestamp expression to get target time T
19/// 2. For each record, find node variables and look up their state at time T
20/// 3. A node's state at time T is determined by:
21///    - Collect all states: version snapshots + current state
22///    - Each state has an _updated_at timestamp (or _created_at if never updated)
23///    - Find the state with the largest _updated_at <= T
24///    - If no state has _updated_at <= T, the node is excluded
25pub fn execute_as_of_scan(
26    source_records: Vec<Record>,
27    timestamp_expr: &Expression,
28    engine: &mut StorageEngine,
29    params: &Params,
30    scalar_fns: &dyn ScalarFnLookup,
31) -> Result<Vec<Record>, ExecutionError> {
32    // Evaluate the timestamp expression using an empty record for context
33    let empty_record = Record::new();
34    let target_val = eval(timestamp_expr, &empty_record, engine, params, scalar_fns)?;
35    let target_ms = match target_val {
36        Value::DateTime(ms) => ms,
37        Value::Int64(ms) => ms,
38        _ => {
39            return Err(ExecutionError {
40                message: "AT TIME expression must evaluate to a DateTime or integer timestamp"
41                    .to_string(),
42            });
43        }
44    };
45
46    let updated_key = engine.get_or_create_prop_key(SYSTEM_PROP_UPDATED_AT);
47    let created_key = engine.get_or_create_prop_key(SYSTEM_PROP_CREATED_AT);
48
49    let mut results = Vec::new();
50
51    for record in &source_records {
52        // Find node variables in the record and apply temporal lookup
53        let mut temporal_record = record.clone();
54        let mut include = true;
55
56        for (var_name, value) in record {
57            if let Value::Node(node_id) = value {
58                match find_node_state_at(*node_id, target_ms, updated_key, created_key, engine) {
59                    Some(TemporalNodeState::Current) => {
60                        // Current state is valid at target time; keep the record as-is
61                    }
62                    Some(TemporalNodeState::Version(props)) => {
63                        // Replace the node's record with versioned properties
64                        // We keep the node ID but mark it with versioned properties
65                        // by injecting a special marker. For property access to work,
66                        // we store versioned properties so eval can pick them up.
67                        temporal_record.insert(
68                            format!("__temporal_props__{}", var_name),
69                            Value::List(
70                                props
71                                    .iter()
72                                    .map(|(k, v)| {
73                                        Value::List(vec![
74                                            Value::Int64(*k as i64),
75                                            Value::from(v.clone()),
76                                        ])
77                                    })
78                                    .collect(),
79                            ),
80                        );
81                    }
82                    None => {
83                        // Node did not exist at target time; exclude from results
84                        include = false;
85                        break;
86                    }
87                }
88            }
89        }
90
91        if include {
92            results.push(temporal_record);
93        }
94    }
95
96    Ok(results)
97}
98
99/// Execute a TemporalRangeScan: for each node in source records, find all
100/// versions within the specified time range.
101///
102/// Each version becomes a separate row in the result set.
103pub fn execute_temporal_range_scan(
104    source_records: Vec<Record>,
105    start_expr: &Expression,
106    end_expr: &Expression,
107    engine: &mut StorageEngine,
108    params: &Params,
109    scalar_fns: &dyn ScalarFnLookup,
110) -> Result<Vec<Record>, ExecutionError> {
111    let empty_record = Record::new();
112    let start_val = eval(start_expr, &empty_record, engine, params, scalar_fns)?;
113    let end_val = eval(end_expr, &empty_record, engine, params, scalar_fns)?;
114
115    let start_ms = match start_val {
116        Value::DateTime(ms) => ms,
117        Value::Int64(ms) => ms,
118        _ => {
119            return Err(ExecutionError {
120                message: "BETWEEN TIME start must evaluate to a DateTime or integer".to_string(),
121            });
122        }
123    };
124
125    let end_ms = match end_val {
126        Value::DateTime(ms) => ms,
127        Value::Int64(ms) => ms,
128        _ => {
129            return Err(ExecutionError {
130                message: "BETWEEN TIME end must evaluate to a DateTime or integer".to_string(),
131            });
132        }
133    };
134
135    let updated_key = engine.get_or_create_prop_key(SYSTEM_PROP_UPDATED_AT);
136    let created_key = engine.get_or_create_prop_key(SYSTEM_PROP_CREATED_AT);
137
138    let mut results = Vec::new();
139
140    for record in &source_records {
141        // Find node variables in the record and collect all versions in range
142        for (var_name, value) in record {
143            if let Value::Node(node_id) = value {
144                let versions_in_range = find_node_versions_in_range(
145                    *node_id,
146                    start_ms,
147                    end_ms,
148                    updated_key,
149                    created_key,
150                    engine,
151                );
152
153                for props in versions_in_range {
154                    let mut versioned_record = record.clone();
155                    versioned_record.insert(
156                        format!("__temporal_props__{}", var_name),
157                        Value::List(
158                            props
159                                .iter()
160                                .map(|(k, v)| {
161                                    Value::List(vec![
162                                        Value::Int64(*k as i64),
163                                        Value::from(v.clone()),
164                                    ])
165                                })
166                                .collect(),
167                        ),
168                    );
169                    results.push(versioned_record);
170                }
171                // Only handle first node variable for simplicity
172                break;
173            }
174        }
175    }
176
177    Ok(results)
178}
179
180/// Result of looking up a node's state at a specific time.
181enum TemporalNodeState {
182    /// The current (live) state is valid at the target time.
183    Current,
184    /// A historical version's properties should be used.
185    Version(Vec<(u32, PropertyValue)>),
186}
187
188/// Find the node state at a specific timestamp.
189///
190/// Returns None if the node did not exist at that time.
191fn find_node_state_at(
192    node_id: NodeId,
193    target_ms: i64,
194    updated_key: u32,
195    created_key: u32,
196    engine: &StorageEngine,
197) -> Option<TemporalNodeState> {
198    let current = engine.get_node(node_id)?;
199
200    // Get current node's _created_at
201    let current_created = get_timestamp_prop(&current.properties, created_key)?;
202
203    // If node was created after target time, it didn't exist then
204    if current_created > target_ms {
205        return None;
206    }
207
208    // Get current node's _updated_at
209    let current_updated =
210        get_timestamp_prop(&current.properties, updated_key).unwrap_or(current_created);
211
212    // If current state's _updated_at <= target_ms, current state is valid
213    if current_updated <= target_ms {
214        return Some(TemporalNodeState::Current);
215    }
216
217    // Current state was updated after target time; check version chain
218    let version_chain = engine.version_store().get_version_chain(node_id.0);
219
220    // Iterate from newest to oldest to find the latest version valid at target time
221    for (_seq, version) in version_chain.iter().rev() {
222        if let VersionRecord::Node(node_record) = version {
223            let version_updated = get_timestamp_prop(&node_record.properties, updated_key)
224                .or_else(|| get_timestamp_prop(&node_record.properties, created_key));
225
226            if let Some(ts) = version_updated {
227                if ts <= target_ms {
228                    return Some(TemporalNodeState::Version(node_record.properties.clone()));
229                }
230            }
231        }
232    }
233
234    // No version found at target time; check if the earliest version's _created_at <= target
235    // This handles the case where the node's first state is in the version chain
236    if let Some((_seq, VersionRecord::Node(node_record))) = version_chain.first() {
237        let version_created = get_timestamp_prop(&node_record.properties, created_key);
238        if let Some(ts) = version_created {
239            if ts <= target_ms {
240                return Some(TemporalNodeState::Version(node_record.properties.clone()));
241            }
242        }
243    }
244
245    None
246}
247
248/// Find all node versions within a time range [start_ms, end_ms].
249fn find_node_versions_in_range(
250    node_id: NodeId,
251    start_ms: i64,
252    end_ms: i64,
253    updated_key: u32,
254    created_key: u32,
255    engine: &StorageEngine,
256) -> Vec<Vec<(u32, PropertyValue)>> {
257    let mut results = Vec::new();
258
259    // Check version chain
260    let version_chain = engine.version_store().get_version_chain(node_id.0);
261
262    for (_seq, version) in &version_chain {
263        if let VersionRecord::Node(node_record) = version {
264            let ts = get_timestamp_prop(&node_record.properties, updated_key)
265                .or_else(|| get_timestamp_prop(&node_record.properties, created_key));
266
267            if let Some(ts) = ts {
268                if ts >= start_ms && ts <= end_ms {
269                    results.push(node_record.properties.clone());
270                }
271            }
272        }
273    }
274
275    // Also check current state
276    if let Some(current) = engine.get_node(node_id) {
277        let ts = get_timestamp_prop(&current.properties, updated_key)
278            .or_else(|| get_timestamp_prop(&current.properties, created_key));
279
280        if let Some(ts) = ts {
281            if ts >= start_ms && ts <= end_ms {
282                results.push(current.properties.clone());
283            }
284        }
285    }
286
287    results
288}
289
290/// Extract a DateTime timestamp from a property list by key.
291fn get_timestamp_prop(props: &[(u32, PropertyValue)], key: u32) -> Option<i64> {
292    props.iter().find(|(k, _)| *k == key).and_then(|(_, v)| {
293        if let PropertyValue::DateTime(ms) = v {
294            Some(*ms)
295        } else {
296            None
297        }
298    })
299}
300
#[cfg(test)]
mod tests {
    // Unit tests for the pure helper `get_timestamp_prop`; the scan operators
    // themselves need a `StorageEngine` and are not exercised here.
    use super::*;

    #[test]
    fn test_get_timestamp_prop_found() {
        let props = vec![
            (1, PropertyValue::String("hello".to_string())),
            (2, PropertyValue::DateTime(1000)),
        ];
        assert_eq!(get_timestamp_prop(&props, 2), Some(1000));
    }

    #[test]
    fn test_get_timestamp_prop_not_found() {
        let props = vec![(1, PropertyValue::String("hello".to_string()))];
        assert_eq!(get_timestamp_prop(&props, 2), None);
    }

    #[test]
    fn test_get_timestamp_prop_wrong_type() {
        // Timestamps must be stored as `DateTime`; a plain integer is not accepted.
        let props = vec![(2, PropertyValue::Int64(1000))];
        assert_eq!(get_timestamp_prop(&props, 2), None);
    }

    #[test]
    fn test_get_timestamp_prop_empty() {
        assert_eq!(get_timestamp_prop(&[], 2), None);
    }

    #[test]
    fn test_get_timestamp_prop_first_match_wins() {
        // With duplicate keys, only the first entry is consulted.
        let props = vec![
            (2, PropertyValue::DateTime(1000)),
            (2, PropertyValue::DateTime(2000)),
        ];
        assert_eq!(get_timestamp_prop(&props, 2), Some(1000));
    }
}