mod row_signature_tests;
use std::sync::Arc;
use async_trait::async_trait;
use drasi_query_cypher::CypherParser;
use crate::{
evaluation::functions::FunctionRegistry,
in_memory_index::{
in_memory_element_index::InMemoryElementIndex, in_memory_future_queue::InMemoryFutureQueue,
in_memory_result_index::InMemoryResultIndex,
},
interface::{FutureQueueConsumer, IndexError},
models::{Element, ElementMetadata, ElementPropertyMap, ElementReference, SourceChange},
query::QueryBuilder,
};
/// Checks that a query does not keep its indexes alive after it is dropped.
///
/// A query is built over in-memory element, result and future-queue indexes,
/// a future consumer is attached and then terminated, and the query is
/// dropped. Afterwards each index `Arc` must have a strong count of exactly 1,
/// i.e. only the handle held by this test remains.
#[tokio::test]
async fn dependency_leaks() {
    let function_registry = Arc::new(FunctionRegistry::new());
    let parser = Arc::new(CypherParser::new(function_registry.clone()));

    let element_index = Arc::new(InMemoryElementIndex::new());
    let result_index = Arc::new(InMemoryResultIndex::new());
    let future_queue = Arc::new(InMemoryFutureQueue::new());

    // The element index doubles as the archive index, so it is handed out twice.
    let query = QueryBuilder::new("MATCH (n:Person) RETURN n", parser)
        .with_element_index(element_index.clone())
        .with_archive_index(element_index.clone())
        .with_result_index(result_index.clone())
        .with_future_queue(future_queue.clone())
        .build()
        .await;

    let consumer = Arc::new(TestFutureConsumer {});
    query.set_future_consumer(consumer).await;
    query.terminate_future_consumer().await;
    drop(query);

    // Only the local handles should be left once the query is gone.
    assert_eq!(Arc::strong_count(&element_index), 1);
    assert_eq!(Arc::strong_count(&result_index), 1);
    assert_eq!(Arc::strong_count(&future_queue), 1);
}
/// Exercises `process_source_change_with_hook` with a failing hook followed by
/// a succeeding one.
///
/// Steps:
/// 1. A plain insert of a `Person` node succeeds and yields one result.
/// 2. A second insert whose hook returns an error must surface that error.
/// 3. A third insert with a succeeding hook must still complete and yield one
///    result, showing the query remains usable after the failed hook.
#[tokio::test]
async fn hook_failure_rolls_back_session() {
    let function_registry = Arc::new(FunctionRegistry::new());
    let parser = Arc::new(CypherParser::new(function_registry.clone()));

    let element_index = Arc::new(InMemoryElementIndex::new());
    let result_index = Arc::new(InMemoryResultIndex::new());
    let future_queue = Arc::new(InMemoryFutureQueue::new());

    let query = QueryBuilder::new("MATCH (n:Person) RETURN n.name", parser)
        .with_element_index(element_index.clone())
        .with_archive_index(element_index.clone())
        .with_result_index(result_index.clone())
        .with_future_queue(future_queue)
        .build()
        .await;

    // Builds an insert of a `Person` node in source "test" carrying a single
    // `name` property; only the id, name and timestamp vary between steps.
    let person_insert = |id: &str, name: &str, effective_from| SourceChange::Insert {
        element: Element::Node {
            metadata: ElementMetadata {
                reference: ElementReference::new("test", id),
                labels: Arc::new([Arc::from("Person")]),
                effective_from,
            },
            properties: ElementPropertyMap::from(std::collections::BTreeMap::from([(
                "name".to_string(),
                crate::evaluation::variable_value::VariableValue::String(name.to_string()),
            )])),
        },
    };

    // Step 1: baseline insert without any hook.
    let result = query
        .process_source_change(person_insert("n1", "Alice", 1000))
        .await;
    assert!(result.is_ok(), "Initial insert should succeed");
    assert_eq!(result.unwrap().len(), 1, "Should produce 1 result");

    // Step 2: the hook reports a failure, which must come back to the caller.
    let result = query
        .process_source_change_with_hook(person_insert("n2", "Bob", 2000), || async {
            Err(IndexError::other(std::io::Error::other(
                "simulated checkpoint failure",
            )))
        })
        .await;
    assert!(result.is_err(), "Hook failure should propagate as error");

    // Step 3: a later change with a succeeding hook must go through normally.
    let result = query
        .process_source_change_with_hook(person_insert("n3", "Charlie", 3000), || async {
            Ok(())
        })
        .await;
    assert!(
        result.is_ok(),
        "Successful hook should complete: {:?}",
        result.err()
    );
    assert_eq!(
        result.unwrap().len(),
        1,
        "Should produce 1 result after successful hook"
    );
}
/// Stateless [`FutureQueueConsumer`] stand-in for tests: every callback is a
/// no-op and the clock is pinned at zero.
struct TestFutureConsumer {}

#[async_trait]
impl FutureQueueConsumer for TestFutureConsumer {
    /// Does nothing and reports success.
    async fn on_items_due(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        Ok(())
    }

    /// Ignores the supplied error.
    async fn on_error(&self, _err: Box<dyn std::error::Error + Send + Sync>) {}

    /// Always returns `0`.
    fn now(&self) -> u64 {
        0
    }
}