use crate::executor::eval::eval;
use crate::executor::operators::create::{SYSTEM_PROP_CREATED_AT, SYSTEM_PROP_UPDATED_AT};
use crate::executor::{ExecutionError, Params, Record, ScalarFnLookup, Value};
use crate::parser::ast::Expression;
use cypherlite_core::{LabelRegistry, NodeId, PropertyValue};
use cypherlite_storage::version::VersionRecord;
use cypherlite_storage::StorageEngine;
pub fn execute_as_of_scan(
source_records: Vec<Record>,
timestamp_expr: &Expression,
engine: &mut StorageEngine,
params: &Params,
scalar_fns: &dyn ScalarFnLookup,
) -> Result<Vec<Record>, ExecutionError> {
let empty_record = Record::new();
let target_val = eval(timestamp_expr, &empty_record, engine, params, scalar_fns)?;
let target_ms = match target_val {
Value::DateTime(ms) => ms,
Value::Int64(ms) => ms,
_ => {
return Err(ExecutionError {
message: "AT TIME expression must evaluate to a DateTime or integer timestamp"
.to_string(),
});
}
};
let updated_key = engine.get_or_create_prop_key(SYSTEM_PROP_UPDATED_AT);
let created_key = engine.get_or_create_prop_key(SYSTEM_PROP_CREATED_AT);
let mut results = Vec::new();
for record in &source_records {
let mut temporal_record = record.clone();
let mut include = true;
for (var_name, value) in record {
if let Value::Node(node_id) = value {
match find_node_state_at(*node_id, target_ms, updated_key, created_key, engine) {
Some(TemporalNodeState::Current) => {
}
Some(TemporalNodeState::Version(props)) => {
temporal_record.insert(
format!("__temporal_props__{}", var_name),
Value::List(
props
.iter()
.map(|(k, v)| {
Value::List(vec![
Value::Int64(*k as i64),
Value::from(v.clone()),
])
})
.collect(),
),
);
}
None => {
include = false;
break;
}
}
}
}
if include {
results.push(temporal_record);
}
}
Ok(results)
}
pub fn execute_temporal_range_scan(
source_records: Vec<Record>,
start_expr: &Expression,
end_expr: &Expression,
engine: &mut StorageEngine,
params: &Params,
scalar_fns: &dyn ScalarFnLookup,
) -> Result<Vec<Record>, ExecutionError> {
let empty_record = Record::new();
let start_val = eval(start_expr, &empty_record, engine, params, scalar_fns)?;
let end_val = eval(end_expr, &empty_record, engine, params, scalar_fns)?;
let start_ms = match start_val {
Value::DateTime(ms) => ms,
Value::Int64(ms) => ms,
_ => {
return Err(ExecutionError {
message: "BETWEEN TIME start must evaluate to a DateTime or integer".to_string(),
});
}
};
let end_ms = match end_val {
Value::DateTime(ms) => ms,
Value::Int64(ms) => ms,
_ => {
return Err(ExecutionError {
message: "BETWEEN TIME end must evaluate to a DateTime or integer".to_string(),
});
}
};
let updated_key = engine.get_or_create_prop_key(SYSTEM_PROP_UPDATED_AT);
let created_key = engine.get_or_create_prop_key(SYSTEM_PROP_CREATED_AT);
let mut results = Vec::new();
for record in &source_records {
for (var_name, value) in record {
if let Value::Node(node_id) = value {
let versions_in_range = find_node_versions_in_range(
*node_id,
start_ms,
end_ms,
updated_key,
created_key,
engine,
);
for props in versions_in_range {
let mut versioned_record = record.clone();
versioned_record.insert(
format!("__temporal_props__{}", var_name),
Value::List(
props
.iter()
.map(|(k, v)| {
Value::List(vec![
Value::Int64(*k as i64),
Value::from(v.clone()),
])
})
.collect(),
),
);
results.push(versioned_record);
}
break;
}
}
}
Ok(results)
}
enum TemporalNodeState {
Current,
Version(Vec<(u32, PropertyValue)>),
}
fn find_node_state_at(
node_id: NodeId,
target_ms: i64,
updated_key: u32,
created_key: u32,
engine: &StorageEngine,
) -> Option<TemporalNodeState> {
let current = engine.get_node(node_id)?;
let current_created = get_timestamp_prop(¤t.properties, created_key)?;
if current_created > target_ms {
return None;
}
let current_updated =
get_timestamp_prop(¤t.properties, updated_key).unwrap_or(current_created);
if current_updated <= target_ms {
return Some(TemporalNodeState::Current);
}
let version_chain = engine.version_store().get_version_chain(node_id.0);
for (_seq, version) in version_chain.iter().rev() {
if let VersionRecord::Node(node_record) = version {
let version_updated = get_timestamp_prop(&node_record.properties, updated_key)
.or_else(|| get_timestamp_prop(&node_record.properties, created_key));
if let Some(ts) = version_updated {
if ts <= target_ms {
return Some(TemporalNodeState::Version(node_record.properties.clone()));
}
}
}
}
if let Some((_seq, VersionRecord::Node(node_record))) = version_chain.first() {
let version_created = get_timestamp_prop(&node_record.properties, created_key);
if let Some(ts) = version_created {
if ts <= target_ms {
return Some(TemporalNodeState::Version(node_record.properties.clone()));
}
}
}
None
}
fn find_node_versions_in_range(
node_id: NodeId,
start_ms: i64,
end_ms: i64,
updated_key: u32,
created_key: u32,
engine: &StorageEngine,
) -> Vec<Vec<(u32, PropertyValue)>> {
let mut results = Vec::new();
let version_chain = engine.version_store().get_version_chain(node_id.0);
for (_seq, version) in &version_chain {
if let VersionRecord::Node(node_record) = version {
let ts = get_timestamp_prop(&node_record.properties, updated_key)
.or_else(|| get_timestamp_prop(&node_record.properties, created_key));
if let Some(ts) = ts {
if ts >= start_ms && ts <= end_ms {
results.push(node_record.properties.clone());
}
}
}
}
if let Some(current) = engine.get_node(node_id) {
let ts = get_timestamp_prop(¤t.properties, updated_key)
.or_else(|| get_timestamp_prop(¤t.properties, created_key));
if let Some(ts) = ts {
if ts >= start_ms && ts <= end_ms {
results.push(current.properties.clone());
}
}
}
results
}
fn get_timestamp_prop(props: &[(u32, PropertyValue)], key: u32) -> Option<i64> {
props.iter().find(|(k, _)| *k == key).and_then(|(_, v)| {
if let PropertyValue::DateTime(ms) = v {
Some(*ms)
} else {
None
}
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_timestamp_prop_found() {
let props = vec![
(1, PropertyValue::String("hello".to_string())),
(2, PropertyValue::DateTime(1000)),
];
assert_eq!(get_timestamp_prop(&props, 2), Some(1000));
}
#[test]
fn test_get_timestamp_prop_not_found() {
let props = vec![(1, PropertyValue::String("hello".to_string()))];
assert_eq!(get_timestamp_prop(&props, 2), None);
}
#[test]
fn test_get_timestamp_prop_wrong_type() {
let props = vec![(2, PropertyValue::Int64(1000))];
assert_eq!(get_timestamp_prop(&props, 2), None);
}
}