use std::sync::Arc;
use drasi_query_cypher::CypherParser;
use serde_json::json;
use crate::{
evaluation::{
context::QueryPartEvaluationContext,
functions::{Function, FunctionRegistry, Sum},
},
in_memory_index::{
in_memory_element_index::InMemoryElementIndex, in_memory_future_queue::InMemoryFutureQueue,
in_memory_result_index::InMemoryResultIndex,
},
models::{Element, ElementMetadata, ElementPropertyMap, ElementReference, SourceChange},
query::QueryBuilder,
};
fn create_registry_with_sum() -> Arc<FunctionRegistry> {
let registry = Arc::new(FunctionRegistry::new());
registry.register_function("sum", Function::Aggregating(Arc::new(Sum {})));
registry
}
async fn build_simple_query(query_str: &str) -> crate::query::ContinuousQuery {
let function_registry = Arc::new(FunctionRegistry::new());
let parser = Arc::new(CypherParser::new(function_registry.clone()));
let builder = QueryBuilder::new(query_str, parser);
builder.build().await
}
async fn build_aggregating_query(query_str: &str) -> crate::query::ContinuousQuery {
let function_registry = create_registry_with_sum();
let parser = Arc::new(CypherParser::new(function_registry.clone()));
let builder =
QueryBuilder::new(query_str, parser).with_function_registry(function_registry.clone());
builder.build().await
}
async fn build_query_with_indexes(query_str: &str) -> crate::query::ContinuousQuery {
let function_registry = Arc::new(FunctionRegistry::new());
let parser = Arc::new(CypherParser::new(function_registry.clone()));
let element_index = Arc::new(InMemoryElementIndex::new());
let result_index = Arc::new(InMemoryResultIndex::new());
let future_queue = Arc::new(InMemoryFutureQueue::new());
let builder = QueryBuilder::new(query_str, parser)
.with_element_index(element_index.clone())
.with_archive_index(element_index)
.with_result_index(result_index)
.with_future_queue(future_queue);
builder.build().await
}
async fn build_aggregating_query_with_indexes(query_str: &str) -> crate::query::ContinuousQuery {
let function_registry = create_registry_with_sum();
let parser = Arc::new(CypherParser::new(function_registry.clone()));
let element_index = Arc::new(InMemoryElementIndex::new());
let result_index = Arc::new(InMemoryResultIndex::new());
let future_queue = Arc::new(InMemoryFutureQueue::new());
let builder = QueryBuilder::new(query_str, parser)
.with_function_registry(function_registry.clone())
.with_element_index(element_index.clone())
.with_archive_index(element_index)
.with_result_index(result_index)
.with_future_queue(future_queue);
builder.build().await
}
fn make_node(source: &str, id: &str, label: &str, props: serde_json::Value) -> SourceChange {
SourceChange::Insert {
element: Element::Node {
metadata: ElementMetadata {
reference: ElementReference::new(source, id),
labels: Arc::new([Arc::from(label)]),
effective_from: 1000,
},
properties: ElementPropertyMap::from(props),
},
}
}
fn make_update(source: &str, id: &str, label: &str, props: serde_json::Value) -> SourceChange {
SourceChange::Update {
element: Element::Node {
metadata: ElementMetadata {
reference: ElementReference::new(source, id),
labels: Arc::new([Arc::from(label)]),
effective_from: 2000,
},
properties: ElementPropertyMap::from(props),
},
}
}
fn make_delete(source: &str, id: &str) -> SourceChange {
SourceChange::Delete {
metadata: ElementMetadata {
reference: ElementReference::new(source, id),
labels: Arc::new([]),
effective_from: 3000,
},
}
}
#[tokio::test]
async fn non_aggregating_insert_has_nonzero_row_signature() {
let query = build_simple_query("MATCH (n:Sensor) RETURN n.name").await;
let change = make_node("test", "s1", "Sensor", json!({"name": "temp_1"}));
let result = query.process_source_change(change).await.unwrap();
assert_eq!(result.len(), 1);
assert!(matches!(
&result[0],
QueryPartEvaluationContext::Adding { .. }
));
assert_ne!(
result[0].row_signature(),
0,
"Adding result should have a non-zero row_signature"
);
}
#[tokio::test]
async fn non_aggregating_update_preserves_row_signature() {
let query = build_query_with_indexes("MATCH (n:Sensor) RETURN n.name").await;
let insert = make_node("test", "s1", "Sensor", json!({"name": "temp_1"}));
let insert_result = query.process_source_change(insert).await.unwrap();
assert_eq!(insert_result.len(), 1);
let insert_row_signature = insert_result[0].row_signature();
let update = make_update("test", "s1", "Sensor", json!({"name": "temp_1_updated"}));
let update_result = query.process_source_change(update).await.unwrap();
assert_eq!(update_result.len(), 1);
assert!(matches!(
&update_result[0],
QueryPartEvaluationContext::Updating { .. }
));
assert_eq!(
update_result[0].row_signature(),
insert_row_signature,
"Update of same node should preserve row_signature"
);
}
#[tokio::test]
async fn non_aggregating_delete_preserves_row_signature() {
let query = build_query_with_indexes("MATCH (n:Sensor) RETURN n.name").await;
let insert = make_node("test", "s1", "Sensor", json!({"name": "temp_1"}));
let insert_result = query.process_source_change(insert).await.unwrap();
assert_eq!(insert_result.len(), 1);
let insert_row_signature = insert_result[0].row_signature();
let delete = make_delete("test", "s1");
let delete_result = query.process_source_change(delete).await.unwrap();
assert_eq!(delete_result.len(), 1);
assert!(matches!(
&delete_result[0],
QueryPartEvaluationContext::Removing { .. }
));
assert_eq!(
delete_result[0].row_signature(),
insert_row_signature,
"Delete of same node should preserve row_signature"
);
}
#[tokio::test]
async fn different_solutions_get_different_row_signatures() {
let query = build_simple_query("MATCH (n:Sensor) RETURN n.name").await;
let change1 = make_node("test", "s1", "Sensor", json!({"name": "temp_1"}));
let result1 = query.process_source_change(change1).await.unwrap();
assert_eq!(result1.len(), 1);
let change2 = make_node("test", "s2", "Sensor", json!({"name": "temp_1"}));
let result2 = query.process_source_change(change2).await.unwrap();
assert_eq!(result2.len(), 1);
assert_ne!(
result1[0].row_signature(),
result2[0].row_signature(),
"Different graph solutions should produce different row_signatures even with identical projected values"
);
}
#[tokio::test]
async fn aggregating_query_has_nonzero_row_signature() {
let query = build_aggregating_query(
"MATCH (n:Sensor) RETURN n.region AS region, sum(n.value) AS total",
)
.await;
let change = make_node(
"test",
"s1",
"Sensor",
json!({"region": "west", "value": 10}),
);
let result = query.process_source_change(change).await.unwrap();
assert_eq!(result.len(), 1);
assert!(matches!(
&result[0],
QueryPartEvaluationContext::Aggregation { .. }
));
assert_ne!(
result[0].row_signature(),
0,
"Aggregation result should have a non-zero row_signature"
);
}
#[tokio::test]
async fn aggregating_same_group_preserves_row_signature() {
let query = build_aggregating_query_with_indexes(
"MATCH (n:Sensor) RETURN n.region AS region, sum(n.value) AS total",
)
.await;
let change1 = make_node(
"test",
"s1",
"Sensor",
json!({"region": "west", "value": 10}),
);
let result1 = query.process_source_change(change1).await.unwrap();
assert_eq!(result1.len(), 1);
let first_row_signature = result1[0].row_signature();
let change2 = make_node(
"test",
"s2",
"Sensor",
json!({"region": "west", "value": 20}),
);
let result2 = query.process_source_change(change2).await.unwrap();
assert_eq!(result2.len(), 1);
assert_eq!(
result2[0].row_signature(),
first_row_signature,
"Adding to same aggregation group should preserve row_signature"
);
}
#[tokio::test]
async fn aggregating_different_groups_get_different_row_signatures() {
let query = build_aggregating_query(
"MATCH (n:Sensor) RETURN n.region AS region, sum(n.value) AS total",
)
.await;
let change1 = make_node(
"test",
"s1",
"Sensor",
json!({"region": "west", "value": 10}),
);
let result1 = query.process_source_change(change1).await.unwrap();
assert_eq!(result1.len(), 1);
let change2 = make_node(
"test",
"s2",
"Sensor",
json!({"region": "east", "value": 20}),
);
let result2 = query.process_source_change(change2).await.unwrap();
assert_eq!(result2.len(), 1);
assert_ne!(
result1[0].row_signature(),
result2[0].row_signature(),
"Different GROUP BY values should produce different row_signatures"
);
}