#[cfg(test)]
mod tests {
use bytes::Bytes;
use drasi_core::in_memory_index::in_memory_checkpoint_store::InMemoryCheckpointStore;
use drasi_core::interface::CheckpointStore;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::channels::dispatcher::{ChangeDispatcher, ChannelChangeDispatcher};
use crate::channels::*;
use crate::config::{QueryConfig, QueryLanguage, SourceSubscriptionConfig};
use crate::queries::manager::DrasiQuery;
use crate::sources::base::{SourceBase, SourceBaseParams};
use crate::sources::Source;
use crate::test_helpers::wait_for_component_status;
use drasi_core::middleware::MiddlewareTypeRegistry;
struct CheckpointTestSource {
base: SourceBase,
received_resume_from: Arc<RwLock<Vec<Option<Bytes>>>>,
received_last_sequence: Arc<RwLock<Vec<Option<u64>>>>,
provide_position_handle: bool,
position_handle: Arc<std::sync::atomic::AtomicU64>,
}
impl CheckpointTestSource {
fn new(id: &str) -> anyhow::Result<Self> {
let base = SourceBase::new(SourceBaseParams::new(id))?;
Ok(Self {
base,
received_resume_from: Arc::new(RwLock::new(Vec::new())),
received_last_sequence: Arc::new(RwLock::new(Vec::new())),
provide_position_handle: false,
position_handle: Arc::new(std::sync::atomic::AtomicU64::new(0)),
})
}
fn with_position_handle(mut self) -> Self {
self.provide_position_handle = true;
self
}
async fn inject_change_with_position(
&self,
change: drasi_core::models::SourceChange,
position: Option<Bytes>,
) -> anyhow::Result<()> {
let mut wrapper = SourceEventWrapper::new(
self.base.get_id().to_string(),
SourceEvent::Change(change),
chrono::Utc::now(),
);
if let Some(pos) = position {
wrapper.set_source_position(pos);
}
self.base.dispatch_event(wrapper).await
}
async fn get_resume_from_history(&self) -> Vec<Option<Bytes>> {
self.received_resume_from.read().await.clone()
}
async fn get_last_sequence_history(&self) -> Vec<Option<u64>> {
self.received_last_sequence.read().await.clone()
}
fn get_position_handle_value(&self) -> u64 {
self.position_handle
.load(std::sync::atomic::Ordering::Acquire)
}
}
#[async_trait::async_trait]
impl Source for CheckpointTestSource {
fn id(&self) -> &str {
self.base.get_id()
}
fn type_name(&self) -> &str {
"checkpoint-test"
}
fn properties(&self) -> std::collections::HashMap<String, serde_json::Value> {
std::collections::HashMap::new()
}
fn auto_start(&self) -> bool {
self.base.get_auto_start()
}
async fn start(&self) -> anyhow::Result<()> {
self.base
.set_status(ComponentStatus::Starting, Some("Starting".to_string()))
.await;
self.base
.set_status(ComponentStatus::Running, Some("Running".to_string()))
.await;
Ok(())
}
async fn stop(&self) -> anyhow::Result<()> {
self.base
.set_status(ComponentStatus::Stopping, Some("Stopping".to_string()))
.await;
self.base
.set_status(ComponentStatus::Stopped, Some("Stopped".to_string()))
.await;
Ok(())
}
async fn status(&self) -> ComponentStatus {
self.base.status_handle().get_status().await
}
async fn subscribe(
&self,
settings: crate::config::SourceSubscriptionSettings,
) -> anyhow::Result<SubscriptionResponse> {
self.base.apply_subscription_settings(&settings);
self.received_resume_from
.write()
.await
.push(settings.resume_from.clone());
self.received_last_sequence
.write()
.await
.push(settings.last_sequence);
let receiver = self.base.create_streaming_receiver().await?;
Ok(SubscriptionResponse {
query_id: settings.query_id,
source_id: self.id().to_string(),
receiver,
bootstrap_receiver: None,
position_handle: if self.provide_position_handle {
Some(self.position_handle.clone())
} else {
None
},
bootstrap_result_receiver: None,
})
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
async fn initialize(&self, context: crate::context::SourceRuntimeContext) {
self.base.initialize(context).await;
}
}
fn create_query_config(id: &str, sources: Vec<String>) -> QueryConfig {
QueryConfig {
id: id.to_string(),
query: "MATCH (n:Person) RETURN n.name".to_string(),
query_language: QueryLanguage::Cypher,
middleware: vec![],
sources: sources
.into_iter()
.map(|source_id| SourceSubscriptionConfig {
nodes: vec![],
relations: vec![],
source_id,
pipeline: vec![],
})
.collect(),
auto_start: true,
joins: None,
enable_bootstrap: false,
bootstrap_buffer_size: 100,
priority_queue_capacity: None,
dispatch_buffer_capacity: None,
dispatch_mode: None,
storage_backend: None,
recovery_policy: None,
outbox_capacity: 1000,
bootstrap_timeout_secs: 300,
}
}
async fn create_test_env() -> (
Arc<crate::queries::QueryManager>,
Arc<crate::sources::SourceManager>,
Arc<tokio::sync::RwLock<crate::component_graph::ComponentGraph>>,
) {
let log_registry = crate::managers::get_or_init_global_registry();
let (graph, update_rx) = crate::component_graph::ComponentGraph::new("test-instance");
let update_tx = graph.update_sender();
let graph = Arc::new(tokio::sync::RwLock::new(graph));
{
let graph_clone = graph.clone();
tokio::spawn(async move {
let mut rx = update_rx;
while let Some(update) = rx.recv().await {
let mut g = graph_clone.write().await;
g.apply_update(update);
}
});
}
let source_manager = Arc::new(crate::sources::SourceManager::new(
"test-instance",
log_registry.clone(),
graph.clone(),
update_tx.clone(),
));
let index_factory = Arc::new(crate::indexes::IndexFactory::new(vec![], None));
let middleware_registry = Arc::new(MiddlewareTypeRegistry::new());
let query_manager = Arc::new(crate::queries::QueryManager::new(
"test-instance",
source_manager.clone(),
index_factory,
middleware_registry,
log_registry,
graph.clone(),
update_tx,
None,
));
(query_manager, source_manager, graph)
}
async fn add_source(
source_manager: &crate::sources::SourceManager,
graph: &tokio::sync::RwLock<crate::component_graph::ComponentGraph>,
source: impl Source + 'static,
) -> anyhow::Result<()> {
let source_id = source.id().to_string();
let source_type = source.type_name().to_string();
let auto_start = source.auto_start();
{
let mut g = graph.write().await;
let mut metadata = std::collections::HashMap::new();
metadata.insert("kind".to_string(), source_type);
metadata.insert("autoStart".to_string(), auto_start.to_string());
g.register_source(&source_id, metadata)?;
}
source_manager.provision_source(source).await
}
async fn add_query(
manager: &crate::queries::QueryManager,
graph: &tokio::sync::RwLock<crate::component_graph::ComponentGraph>,
config: QueryConfig,
) -> anyhow::Result<()> {
{
let mut g = graph.write().await;
let source_ids: Vec<String> =
config.sources.iter().map(|s| s.source_id.clone()).collect();
for sid in &source_ids {
if !g.contains(sid) {
g.register_source(sid, std::collections::HashMap::new())?;
}
}
let mut metadata = std::collections::HashMap::new();
metadata.insert("query".to_string(), config.query.clone());
g.register_query(&config.id, metadata, &source_ids)?;
}
manager.provision_query(config).await
}
#[tokio::test]
async fn test_sequence_assignment_monotonic() {
let source = CheckpointTestSource::new("seq-src").unwrap();
let (update_tx, _update_rx) = tokio::sync::mpsc::channel(16);
source
.initialize(crate::context::SourceRuntimeContext {
instance_id: "test".to_string(),
source_id: "seq-src".to_string(),
update_tx,
state_store: None,
identity_provider: None,
})
.await;
source.start().await.unwrap();
let settings = crate::config::SourceSubscriptionSettings {
source_id: "seq-src".to_string(),
enable_bootstrap: false,
query_id: "q1".to_string(),
nodes: std::collections::HashSet::new(),
relations: std::collections::HashSet::new(),
resume_from: None,
request_position_handle: false,
last_sequence: None,
};
let sub = source.subscribe(settings).await.unwrap();
let mut receiver = sub.receiver;
let positions =
vec![Some(Bytes::from_static(b"pos-A")), Some(Bytes::from_static(b"pos-B")), None];
for (i, pos) in positions.into_iter().enumerate() {
let change = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new(
"src",
&format!("n{i}"),
),
labels: Arc::new([Arc::from("Person")]),
effective_from: 1000 + i as u64,
},
properties: drasi_core::models::ElementPropertyMap::new(),
},
};
source
.inject_change_with_position(change, pos)
.await
.unwrap();
}
let mut sequences = Vec::new();
for _ in 0..3 {
let event = receiver.recv().await.unwrap();
sequences.push(event.sequence.unwrap());
}
assert_eq!(sequences, vec![1, 2, 3]);
}
#[tokio::test]
async fn test_source_position_preserved_through_dispatch() {
let source = CheckpointTestSource::new("pos-src").unwrap();
let (update_tx, _update_rx) = tokio::sync::mpsc::channel(16);
source
.initialize(crate::context::SourceRuntimeContext {
instance_id: "test".to_string(),
source_id: "pos-src".to_string(),
update_tx,
state_store: None,
identity_provider: None,
})
.await;
source.start().await.unwrap();
let settings = crate::config::SourceSubscriptionSettings {
source_id: "pos-src".to_string(),
enable_bootstrap: false,
query_id: "q1".to_string(),
nodes: std::collections::HashSet::new(),
relations: std::collections::HashSet::new(),
resume_from: None,
request_position_handle: false,
last_sequence: None,
};
let sub = source.subscribe(settings).await.unwrap();
let mut receiver = sub.receiver;
let mssql_pos = Bytes::from([0xDE, 0xAD, 0xBE, 0xEF].repeat(5));
let change = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new("src", "node1"),
labels: Arc::new([Arc::from("Item")]),
effective_from: 2000,
},
properties: drasi_core::models::ElementPropertyMap::new(),
},
};
source
.inject_change_with_position(change, Some(mssql_pos.clone()))
.await
.unwrap();
let event = receiver.recv().await.unwrap();
assert_eq!(event.source_position, Some(mssql_pos));
assert!(event.sequence.is_some());
}
#[tokio::test]
async fn test_checkpoint_persistence_end_to_end() {
let (query_manager, source_manager, graph) = create_test_env().await;
let mut event_rx = graph.read().await.subscribe();
let source = CheckpointTestSource::new("e2e-source").unwrap();
add_source(&source_manager, &graph, source).await.unwrap();
source_manager
.start_source("e2e-source".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"e2e-source",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config = create_query_config("e2e-query", vec!["e2e-source".to_string()]);
add_query(&query_manager, &graph, config).await.unwrap();
query_manager
.start_query("e2e-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"e2e-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let source_instance = source_manager
.get_source_instance("e2e-source")
.await
.unwrap();
let test_source = source_instance
.as_any()
.downcast_ref::<CheckpointTestSource>()
.unwrap();
let cosmos_position = Bytes::from(b"cosmos-resume-token-abc123xyz".to_vec());
let change = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new("e2e-source", "person1"),
labels: Arc::new([Arc::from("Person")]),
effective_from: 1000,
},
properties: drasi_core::models::ElementPropertyMap::from(
vec![(
"name".to_string(),
drasi_core::evaluation::variable_value::VariableValue::String(
"Alice".to_string(),
),
)]
.into_iter()
.collect::<std::collections::BTreeMap<_, _>>(),
),
},
};
test_source
.inject_change_with_position(change, Some(cosmos_position.clone()))
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
query_manager
.stop_query("e2e-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"e2e-query",
ComponentStatus::Stopped,
std::time::Duration::from_secs(5),
)
.await;
query_manager
.start_query("e2e-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"e2e-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let resume_history = test_source.get_resume_from_history().await;
assert!(
resume_history.len() >= 2,
"Expected at least 2 subscribe calls, got {}",
resume_history.len()
);
assert_eq!(
resume_history[0], None,
"First subscribe should have no resume_from"
);
assert_eq!(
resume_history[1], None,
"Second subscribe should have no resume_from (in-memory backend, needs re-bootstrap)"
);
let query_instance = query_manager.get_query_instance("e2e-query").await.unwrap();
let cp_store = query_instance
.as_any()
.downcast_ref::<crate::queries::DrasiQuery>()
.unwrap()
.get_checkpoint_store()
.await;
if let Some(store) = cp_store {
let checkpoints = store.read_all_checkpoints().await.unwrap();
if let Some(cp) = checkpoints.get("e2e-source") {
assert_eq!(
cp.source_position.as_ref(),
Some(&cosmos_position),
"Checkpoint store should contain the persisted position"
);
}
}
}
#[tokio::test]
async fn test_in_memory_checkpoint_various_sizes() {
let store = InMemoryCheckpointStore::new();
let pg_pos = Bytes::copy_from_slice(&42u64.to_le_bytes());
store
.stage_checkpoint("pg-source", 1, Some(&pg_pos))
.await
.unwrap();
let cp = store.read_checkpoint("pg-source").await.unwrap().unwrap();
assert_eq!(cp.sequence, 1);
assert_eq!(cp.source_position.as_ref(), Some(&pg_pos));
let mssql_pos = Bytes::from(vec![0xAA; 20]);
store
.stage_checkpoint("mssql-source", 2, Some(&mssql_pos))
.await
.unwrap();
let cp = store
.read_checkpoint("mssql-source")
.await
.unwrap()
.unwrap();
assert_eq!(cp.sequence, 2);
assert_eq!(cp.source_position.as_ref(), Some(&mssql_pos));
let all = store.read_all_checkpoints().await.unwrap();
assert_eq!(all["pg-source"].source_position.as_ref(), Some(&pg_pos));
let mongo_pos = Bytes::from(vec![0xBB; 80]);
store
.stage_checkpoint("mongo-source", 3, Some(&mongo_pos))
.await
.unwrap();
let cp = store
.read_checkpoint("mongo-source")
.await
.unwrap()
.unwrap();
assert_eq!(cp.sequence, 3);
assert_eq!(cp.source_position.as_ref(), Some(&mongo_pos));
let cosmos_pos = Bytes::from(vec![0xCC; 120]);
store
.stage_checkpoint("cosmos-source", 4, Some(&cosmos_pos))
.await
.unwrap();
let cp = store
.read_checkpoint("cosmos-source")
.await
.unwrap()
.unwrap();
assert_eq!(cp.sequence, 4);
assert_eq!(cp.source_position.as_ref(), Some(&cosmos_pos));
store
.stage_checkpoint("volatile-source", 5, None)
.await
.unwrap();
let cp = store
.read_checkpoint("volatile-source")
.await
.unwrap()
.unwrap();
assert_eq!(cp.sequence, 5);
assert_eq!(cp.source_position, None);
let all = store.read_all_checkpoints().await.unwrap();
assert_eq!(
all["cosmos-source"].source_position.as_ref(),
Some(&cosmos_pos)
);
}
#[tokio::test]
async fn test_checkpoint_and_sequence_consistency() {
let store = InMemoryCheckpointStore::new();
let pos = Bytes::from_static(b"test-position");
store
.stage_checkpoint("src-a", 42, Some(&pos))
.await
.unwrap();
let cp = store.read_checkpoint("src-a").await.unwrap().unwrap();
assert_eq!(cp.sequence, 42);
assert_eq!(cp.source_position.as_ref(), Some(&pos));
store.stage_checkpoint("src-b", 99, None).await.unwrap();
let cp = store.read_checkpoint("src-b").await.unwrap().unwrap();
assert_eq!(cp.sequence, 99);
assert_eq!(cp.source_position, None);
}
#[tokio::test]
async fn test_initial_subscribe_no_checkpoint() {
let (query_manager, source_manager, graph) = create_test_env().await;
let mut event_rx = graph.read().await.subscribe();
let source = CheckpointTestSource::new("fresh-source").unwrap();
add_source(&source_manager, &graph, source).await.unwrap();
source_manager
.start_source("fresh-source".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"fresh-source",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config = create_query_config("fresh-query", vec!["fresh-source".to_string()]);
add_query(&query_manager, &graph, config).await.unwrap();
query_manager
.start_query("fresh-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"fresh-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let source_instance = source_manager
.get_source_instance("fresh-source")
.await
.unwrap();
let test_source = source_instance
.as_any()
.downcast_ref::<CheckpointTestSource>()
.unwrap();
let history = test_source.get_resume_from_history().await;
assert!(!history.is_empty());
assert_eq!(
history[0], None,
"Initial subscribe should have no resume_from"
);
}
#[tokio::test]
async fn test_dedup_skips_duplicate_events_after_restart() {
let (query_manager, source_manager, graph) = create_test_env().await;
let mut event_rx = graph.read().await.subscribe();
let source = CheckpointTestSource::new("dedup-source").unwrap();
add_source(&source_manager, &graph, source).await.unwrap();
source_manager
.start_source("dedup-source".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"dedup-source",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config = create_query_config("dedup-query", vec!["dedup-source".to_string()]);
add_query(&query_manager, &graph, config).await.unwrap();
query_manager
.start_query("dedup-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"dedup-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let source_instance = source_manager
.get_source_instance("dedup-source")
.await
.unwrap();
let test_source = source_instance
.as_any()
.downcast_ref::<CheckpointTestSource>()
.unwrap();
for i in 0..3 {
let change = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new(
"dedup-source",
&format!("node{i}"),
),
labels: Arc::new([Arc::from("Person")]),
effective_from: 1000 + i as u64,
},
properties: drasi_core::models::ElementPropertyMap::from(
vec![(
"name".to_string(),
drasi_core::evaluation::variable_value::VariableValue::String(format!(
"person{i}"
)),
)]
.into_iter()
.collect::<std::collections::BTreeMap<_, _>>(),
),
},
};
test_source
.inject_change_with_position(change, Some(Bytes::from(format!("pos{i}"))))
.await
.unwrap();
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
query_manager
.stop_query("dedup-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"dedup-query",
ComponentStatus::Stopped,
std::time::Duration::from_secs(5),
)
.await;
query_manager
.start_query("dedup-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"dedup-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let new_change = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new("dedup-source", "node3"),
labels: Arc::new([Arc::from("Person")]),
effective_from: 2000,
},
properties: drasi_core::models::ElementPropertyMap::from(
vec![(
"name".to_string(),
drasi_core::evaluation::variable_value::VariableValue::String(
"new-person".to_string(),
),
)]
.into_iter()
.collect::<std::collections::BTreeMap<_, _>>(),
),
},
};
test_source
.inject_change_with_position(new_change, Some(Bytes::from("pos3")))
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let query_instance = query_manager
.get_query_instance("dedup-query")
.await
.unwrap();
let drasi_query = query_instance
.as_any()
.downcast_ref::<DrasiQuery>()
.unwrap();
let result_index = drasi_query.get_checkpoint_store().await.unwrap();
let all_checkpoints = result_index.read_all_checkpoints().await.unwrap();
let max_seq = all_checkpoints
.values()
.map(|cp| cp.sequence)
.max()
.unwrap_or(0);
assert_eq!(
max_seq, 4,
"After restart + new event, checkpoint should be at sequence 4"
);
}
#[tokio::test]
async fn test_source_position_size_validation() {
let source = CheckpointTestSource::new("size-src").unwrap();
let (update_tx, _update_rx) = tokio::sync::mpsc::channel(16);
source
.initialize(crate::context::SourceRuntimeContext {
instance_id: "test".to_string(),
source_id: "size-src".to_string(),
update_tx,
state_store: None,
identity_provider: None,
})
.await;
source.start().await.unwrap();
let settings = crate::config::SourceSubscriptionSettings {
source_id: "size-src".to_string(),
enable_bootstrap: false,
query_id: "q1".to_string(),
nodes: std::collections::HashSet::new(),
relations: std::collections::HashSet::new(),
resume_from: None,
request_position_handle: false,
last_sequence: None,
};
let sub = source.subscribe(settings).await.unwrap();
let mut receiver = sub.receiver;
let pos_under_limit = Bytes::from(vec![0xAA; 65_535]);
let change = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new("src", "n1"),
labels: Arc::new([Arc::from("Item")]),
effective_from: 1000,
},
properties: drasi_core::models::ElementPropertyMap::new(),
},
};
source
.inject_change_with_position(change, Some(pos_under_limit.clone()))
.await
.unwrap();
let event = receiver.recv().await.unwrap();
assert_eq!(
event.source_position,
Some(pos_under_limit),
"Position at 65,535 bytes should be preserved"
);
let pos_at_limit = Bytes::from(vec![0xBB; 65_536]);
let change2 = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new("src", "n2"),
labels: Arc::new([Arc::from("Item")]),
effective_from: 1001,
},
properties: drasi_core::models::ElementPropertyMap::new(),
},
};
source
.inject_change_with_position(change2, Some(pos_at_limit.clone()))
.await
.unwrap();
let event2 = receiver.recv().await.unwrap();
assert_eq!(
event2.source_position,
Some(pos_at_limit),
"Position at exactly 65,536 bytes should be preserved (limit is >)"
);
let pos_over_limit = Bytes::from(vec![0xCC; 65_537]);
let change3 = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new("src", "n3"),
labels: Arc::new([Arc::from("Item")]),
effective_from: 1002,
},
properties: drasi_core::models::ElementPropertyMap::new(),
},
};
source
.inject_change_with_position(change3, Some(pos_over_limit))
.await
.unwrap();
let event3 = receiver.recv().await.unwrap();
assert_eq!(
event3.source_position.as_ref().map(|p| p.len()),
Some(65_537),
"Oversized position should flow through dispatch (limit enforced at checkpoint)"
);
assert_eq!(event3.sequence.unwrap(), 3);
}
#[tokio::test]
async fn test_multi_source_checkpoint_isolation() {
let (query_manager, source_manager, graph) = create_test_env().await;
let mut event_rx = graph.read().await.subscribe();
let source_a = CheckpointTestSource::new("src-alpha").unwrap();
let source_b = CheckpointTestSource::new("src-beta").unwrap();
add_source(&source_manager, &graph, source_a).await.unwrap();
add_source(&source_manager, &graph, source_b).await.unwrap();
source_manager
.start_source("src-alpha".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"src-alpha",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
source_manager
.start_source("src-beta".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"src-beta",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config = create_query_config(
"multi-query",
vec!["src-alpha".to_string(), "src-beta".to_string()],
);
add_query(&query_manager, &graph, config).await.unwrap();
query_manager
.start_query("multi-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"multi-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let src_a = source_manager
.get_source_instance("src-alpha")
.await
.unwrap();
let test_src_a = src_a
.as_any()
.downcast_ref::<CheckpointTestSource>()
.unwrap();
let change_a = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new("src-alpha", "a1"),
labels: Arc::new([Arc::from("Person")]),
effective_from: 1000,
},
properties: drasi_core::models::ElementPropertyMap::from(
vec![(
"name".to_string(),
drasi_core::evaluation::variable_value::VariableValue::String(
"AlphaUser".to_string(),
),
)]
.into_iter()
.collect::<std::collections::BTreeMap<_, _>>(),
),
},
};
test_src_a
.inject_change_with_position(change_a, Some(Bytes::from("alpha-pos-final")))
.await
.unwrap();
let src_b = source_manager
.get_source_instance("src-beta")
.await
.unwrap();
let test_src_b = src_b
.as_any()
.downcast_ref::<CheckpointTestSource>()
.unwrap();
let change_b = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new("src-beta", "b1"),
labels: Arc::new([Arc::from("Person")]),
effective_from: 2000,
},
properties: drasi_core::models::ElementPropertyMap::from(
vec![(
"name".to_string(),
drasi_core::evaluation::variable_value::VariableValue::String(
"BetaUser".to_string(),
),
)]
.into_iter()
.collect::<std::collections::BTreeMap<_, _>>(),
),
},
};
test_src_b
.inject_change_with_position(change_b, Some(Bytes::from("beta-pos-final")))
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
query_manager
.stop_query("multi-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"multi-query",
ComponentStatus::Stopped,
std::time::Duration::from_secs(5),
)
.await;
query_manager
.start_query("multi-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"multi-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let history_a = test_src_a.get_resume_from_history().await;
let history_b = test_src_b.get_resume_from_history().await;
assert!(
history_a.len() >= 2,
"Source A should have at least 2 subscribes, got {}",
history_a.len()
);
assert!(
history_b.len() >= 2,
"Source B should have at least 2 subscribes, got {}",
history_b.len()
);
assert_eq!(history_a[0], None);
assert_eq!(history_b[0], None);
assert_eq!(
history_a[1], None,
"Source A should not have resume_from (in-memory backend)"
);
assert_eq!(
history_b[1], None,
"Source B should not have resume_from (in-memory backend)"
);
let query_instance = query_manager
.get_query_instance("multi-query")
.await
.unwrap();
let cp_store = query_instance
.as_any()
.downcast_ref::<crate::queries::DrasiQuery>()
.unwrap()
.get_checkpoint_store()
.await;
if let Some(store) = cp_store {
let checkpoints = store.read_all_checkpoints().await.unwrap();
assert!(
checkpoints.contains_key("src-alpha"),
"Source A should have a checkpoint"
);
assert!(
checkpoints.contains_key("src-beta"),
"Source B should have a checkpoint"
);
}
}
#[tokio::test]
async fn test_position_handle_updated_after_checkpoint() {
let (query_manager, source_manager, graph) = create_test_env().await;
let mut event_rx = graph.read().await.subscribe();
let source = CheckpointTestSource::new("handle-source")
.unwrap()
.with_position_handle();
add_source(&source_manager, &graph, source).await.unwrap();
source_manager
.start_source("handle-source".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"handle-source",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config = create_query_config("handle-query", vec!["handle-source".to_string()]);
add_query(&query_manager, &graph, config).await.unwrap();
query_manager
.start_query("handle-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"handle-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let source_instance = source_manager
.get_source_instance("handle-source")
.await
.unwrap();
let test_source = source_instance
.as_any()
.downcast_ref::<CheckpointTestSource>()
.unwrap();
assert_eq!(
test_source.get_position_handle_value(),
0,
"Position handle should be 0 before any events"
);
let change = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new("handle-source", "h1"),
labels: Arc::new([Arc::from("Person")]),
effective_from: 1000,
},
properties: drasi_core::models::ElementPropertyMap::from(
vec![(
"name".to_string(),
drasi_core::evaluation::variable_value::VariableValue::String(
"HandleTest".to_string(),
),
)]
.into_iter()
.collect::<std::collections::BTreeMap<_, _>>(),
),
},
};
test_source
.inject_change_with_position(change, Some(Bytes::from("handle-pos")))
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let handle_val = test_source.get_position_handle_value();
assert_eq!(
handle_val, 1,
"Position handle should be 1 after first event processed"
);
let change2 = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new("handle-source", "h2"),
labels: Arc::new([Arc::from("Person")]),
effective_from: 2000,
},
properties: drasi_core::models::ElementPropertyMap::from(
vec![(
"name".to_string(),
drasi_core::evaluation::variable_value::VariableValue::String(
"HandleTest2".to_string(),
),
)]
.into_iter()
.collect::<std::collections::BTreeMap<_, _>>(),
),
},
};
test_source
.inject_change_with_position(change2, Some(Bytes::from("handle-pos-2")))
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let handle_val2 = test_source.get_position_handle_value();
assert_eq!(
handle_val2, 2,
"Position handle should be 2 after second event processed"
);
}
struct PersistentInMemoryCheckpointStore {
inner: drasi_core::in_memory_index::in_memory_checkpoint_store::InMemoryCheckpointStore,
}
impl PersistentInMemoryCheckpointStore {
fn new() -> Self {
Self {
inner: drasi_core::in_memory_index::in_memory_checkpoint_store::InMemoryCheckpointStore::new(),
}
}
}
#[async_trait::async_trait]
impl CheckpointStore for PersistentInMemoryCheckpointStore {
fn is_persistent(&self) -> bool {
true
}
async fn stage_checkpoint(
&self,
source_id: &str,
sequence: u64,
source_position: Option<&Bytes>,
) -> Result<(), drasi_core::interface::IndexError> {
self.inner
.stage_checkpoint(source_id, sequence, source_position)
.await
}
async fn read_checkpoint(
&self,
source_id: &str,
) -> Result<
Option<drasi_core::interface::SourceCheckpoint>,
drasi_core::interface::IndexError,
> {
self.inner.read_checkpoint(source_id).await
}
async fn read_all_checkpoints(
&self,
) -> Result<
std::collections::HashMap<String, drasi_core::interface::SourceCheckpoint>,
drasi_core::interface::IndexError,
> {
self.inner.read_all_checkpoints().await
}
async fn clear_checkpoints(&self) -> Result<(), drasi_core::interface::IndexError> {
self.inner.clear_checkpoints().await
}
async fn write_config_hash(
&self,
hash: u64,
) -> Result<(), drasi_core::interface::IndexError> {
self.inner.write_config_hash(hash).await
}
async fn read_config_hash(&self) -> Result<Option<u64>, drasi_core::interface::IndexError> {
self.inner.read_config_hash().await
}
}
struct MockPersistentPlugin {
stores: RwLock<std::collections::HashMap<String, Arc<PersistentInMemoryCheckpointStore>>>,
}
impl MockPersistentPlugin {
fn new() -> Self {
Self {
stores: RwLock::new(std::collections::HashMap::new()),
}
}
}
#[async_trait::async_trait]
impl crate::indexes::IndexBackendPlugin for MockPersistentPlugin {
async fn create_indexes(
&self,
query_id: &str,
) -> Result<drasi_core::interface::CreatedIndexes, drasi_core::interface::IndexError>
{
use drasi_core::in_memory_index::in_memory_element_index::InMemoryElementIndex;
use drasi_core::in_memory_index::in_memory_future_queue::InMemoryFutureQueue;
use drasi_core::in_memory_index::in_memory_result_index::InMemoryResultIndex;
use drasi_core::interface::{IndexSet, NoOpSessionControl};
let checkpoint_store = {
let mut map = self.stores.write().await;
map.entry(query_id.to_string())
.or_insert_with(|| Arc::new(PersistentInMemoryCheckpointStore::new()))
.clone()
};
let element_index = Arc::new(InMemoryElementIndex::new());
Ok(drasi_core::interface::CreatedIndexes {
set: IndexSet {
element_index: element_index.clone(),
archive_index: element_index,
result_index: Arc::new(InMemoryResultIndex::new()),
future_queue: Arc::new(InMemoryFutureQueue::new()),
session_control: Arc::new(NoOpSessionControl),
},
checkpoint_store: Some(checkpoint_store),
live_results_writer: None,
outbox_writer: None,
})
}
fn is_volatile(&self) -> bool {
false
}
}
async fn create_test_env_with_persistent_backend() -> (
Arc<crate::queries::QueryManager>,
Arc<crate::sources::SourceManager>,
Arc<tokio::sync::RwLock<crate::component_graph::ComponentGraph>>,
) {
let log_registry = crate::managers::get_or_init_global_registry();
let (graph, update_rx) = crate::component_graph::ComponentGraph::new("test-instance");
let update_tx = graph.update_sender();
let graph = Arc::new(tokio::sync::RwLock::new(graph));
{
let graph_clone = graph.clone();
tokio::spawn(async move {
let mut rx = update_rx;
while let Some(update) = rx.recv().await {
let mut g = graph_clone.write().await;
g.apply_update(update);
}
});
}
let source_manager = Arc::new(crate::sources::SourceManager::new(
"test-instance",
log_registry.clone(),
graph.clone(),
update_tx.clone(),
));
let index_factory = Arc::new(crate::indexes::IndexFactory::new(
vec![],
Some(Arc::new(MockPersistentPlugin::new())),
));
let middleware_registry = Arc::new(MiddlewareTypeRegistry::new());
let query_manager = Arc::new(crate::queries::QueryManager::new(
"test-instance",
source_manager.clone(),
index_factory,
middleware_registry,
log_registry,
graph.clone(),
update_tx,
None,
));
(query_manager, source_manager, graph)
}
fn create_persistent_query_config(id: &str, sources: Vec<String>) -> QueryConfig {
use crate::indexes::config::{StorageBackendRef, StorageBackendSpec};
QueryConfig {
id: id.to_string(),
query: "MATCH (n:Person) RETURN n.name".to_string(),
query_language: QueryLanguage::Cypher,
middleware: vec![],
sources: sources
.into_iter()
.map(|source_id| SourceSubscriptionConfig {
nodes: vec![],
relations: vec![],
source_id,
pipeline: vec![],
})
.collect(),
auto_start: true,
joins: None,
enable_bootstrap: false,
bootstrap_buffer_size: 100,
priority_queue_capacity: None,
dispatch_buffer_capacity: None,
dispatch_mode: None,
storage_backend: Some(StorageBackendRef::Inline(StorageBackendSpec::RocksDb {
path: "/tmp/test-drasi".to_string(),
enable_archive: false,
direct_io: false,
})),
recovery_policy: None,
outbox_capacity: 1000,
bootstrap_timeout_secs: 300,
}
}
#[tokio::test]
async fn test_config_hash_match_preserves_checkpoints() {
let (query_manager, source_manager, graph) =
create_test_env_with_persistent_backend().await;
let mut event_rx = graph.read().await.subscribe();
let source = CheckpointTestSource::new("hash-src").unwrap();
add_source(&source_manager, &graph, source).await.unwrap();
source_manager
.start_source("hash-src".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"hash-src",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config = create_persistent_query_config("hash-query", vec!["hash-src".to_string()]);
add_query(&query_manager, &graph, config).await.unwrap();
query_manager
.start_query("hash-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"hash-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let source_instance = source_manager
.get_source_instance("hash-src")
.await
.unwrap();
let test_source = source_instance
.as_any()
.downcast_ref::<CheckpointTestSource>()
.unwrap();
let change = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new("hash-src", "p1"),
labels: Arc::new([Arc::from("Person")]),
effective_from: 1000,
},
properties: drasi_core::models::ElementPropertyMap::from(
vec![(
"name".to_string(),
drasi_core::evaluation::variable_value::VariableValue::String(
"Alice".to_string(),
),
)]
.into_iter()
.collect::<std::collections::BTreeMap<_, _>>(),
),
},
};
test_source
.inject_change_with_position(change, Some(Bytes::from("pos-1")))
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let query_instance = query_manager
.get_query_instance("hash-query")
.await
.unwrap();
let cp_store = query_instance
.as_any()
.downcast_ref::<DrasiQuery>()
.unwrap()
.get_checkpoint_store()
.await
.expect("Should have checkpoint store");
let cp = cp_store.read_checkpoint("hash-src").await.unwrap();
assert!(
cp.is_some(),
"Checkpoint should exist after processing event"
);
let cp_seq_before = cp.unwrap().sequence;
query_manager
.stop_query("hash-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"hash-query",
ComponentStatus::Stopped,
std::time::Duration::from_secs(5),
)
.await;
query_manager
.start_query("hash-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"hash-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let resume_history = test_source.get_resume_from_history().await;
assert!(
resume_history.len() >= 2,
"Expected at least 2 subscribe calls"
);
assert_eq!(
resume_history[1],
Some(Bytes::from("pos-1")),
"Second subscribe should have resume_from from checkpoint"
);
let query_instance = query_manager
.get_query_instance("hash-query")
.await
.unwrap();
let cp_store = query_instance
.as_any()
.downcast_ref::<DrasiQuery>()
.unwrap()
.get_checkpoint_store()
.await
.expect("Should have checkpoint store");
let cp = cp_store
.read_checkpoint("hash-src")
.await
.unwrap()
.expect("Checkpoint should still exist");
assert_eq!(
cp.sequence, cp_seq_before,
"Checkpoint sequence should be preserved"
);
}
#[tokio::test]
async fn test_config_hash_mismatch_clears_checkpoints() {
let (query_manager, source_manager, graph) =
create_test_env_with_persistent_backend().await;
let mut event_rx = graph.read().await.subscribe();
let source = CheckpointTestSource::new("mismatch-src").unwrap();
add_source(&source_manager, &graph, source).await.unwrap();
source_manager
.start_source("mismatch-src".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"mismatch-src",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config =
create_persistent_query_config("mismatch-query", vec!["mismatch-src".to_string()]);
add_query(&query_manager, &graph, config).await.unwrap();
query_manager
.start_query("mismatch-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"mismatch-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let source_instance = source_manager
.get_source_instance("mismatch-src")
.await
.unwrap();
let test_source = source_instance
.as_any()
.downcast_ref::<CheckpointTestSource>()
.unwrap();
let change = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new("mismatch-src", "p1"),
labels: Arc::new([Arc::from("Person")]),
effective_from: 1000,
},
properties: drasi_core::models::ElementPropertyMap::from(
vec![(
"name".to_string(),
drasi_core::evaluation::variable_value::VariableValue::String(
"Bob".to_string(),
),
)]
.into_iter()
.collect::<std::collections::BTreeMap<_, _>>(),
),
},
};
test_source
.inject_change_with_position(change, Some(Bytes::from("mismatch-pos")))
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let query_instance = query_manager
.get_query_instance("mismatch-query")
.await
.unwrap();
let cp_store = query_instance
.as_any()
.downcast_ref::<DrasiQuery>()
.unwrap()
.get_checkpoint_store()
.await
.expect("Should have checkpoint store");
let cp = cp_store.read_checkpoint("mismatch-src").await.unwrap();
assert!(cp.is_some(), "Checkpoint should exist before config change");
let hash_before = cp_store.read_config_hash().await.unwrap();
assert!(
hash_before.is_some(),
"Config hash should be written on first start"
);
query_manager
.stop_query("mismatch-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"mismatch-query",
ComponentStatus::Stopped,
std::time::Duration::from_secs(5),
)
.await;
query_manager
.update_query(
"mismatch-query".to_string(),
QueryConfig {
id: "mismatch-query".to_string(),
query: "MATCH (n:Person) RETURN n.name, n.age".to_string(),
query_language: QueryLanguage::Cypher,
middleware: vec![],
sources: vec![SourceSubscriptionConfig {
nodes: vec![],
relations: vec![],
source_id: "mismatch-src".to_string(),
pipeline: vec![],
}],
auto_start: true,
joins: None,
enable_bootstrap: false,
bootstrap_buffer_size: 100,
priority_queue_capacity: None,
dispatch_buffer_capacity: None,
dispatch_mode: None,
storage_backend: Some(crate::indexes::config::StorageBackendRef::Inline(
crate::indexes::config::StorageBackendSpec::RocksDb {
path: "/tmp/test-drasi".to_string(),
enable_archive: false,
direct_io: false,
},
)),
recovery_policy: None,
outbox_capacity: 1000,
bootstrap_timeout_secs: 300,
},
)
.await
.unwrap();
query_manager
.start_query("mismatch-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"mismatch-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let resume_history = test_source.get_resume_from_history().await;
assert!(
resume_history.len() >= 2,
"Expected at least 2 subscribe calls"
);
assert_eq!(
resume_history[1], None,
"After config change, subscribe should have no resume_from (checkpoints cleared)"
);
let query_instance = query_manager
.get_query_instance("mismatch-query")
.await
.unwrap();
let cp_store = query_instance
.as_any()
.downcast_ref::<DrasiQuery>()
.unwrap()
.get_checkpoint_store()
.await
.expect("Should have checkpoint store");
let all_cp = cp_store.read_all_checkpoints().await.unwrap();
assert!(
all_cp.is_empty(),
"Checkpoints should be cleared after config change"
);
let hash_after = cp_store.read_config_hash().await.unwrap();
assert!(hash_after.is_some(), "New config hash should be written");
assert_ne!(
hash_before, hash_after,
"Config hash should differ after config change"
);
}
#[tokio::test]
async fn test_sequence_recovery_continues_after_restart() {
let (query_manager, source_manager, graph) = create_test_env().await;
let mut event_rx = graph.read().await.subscribe();
let source = CheckpointTestSource::new("recov-source").unwrap();
add_source(&source_manager, &graph, source).await.unwrap();
source_manager
.start_source("recov-source".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"recov-source",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config = create_query_config("recov-query", vec!["recov-source".to_string()]);
add_query(&query_manager, &graph, config).await.unwrap();
query_manager
.start_query("recov-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"recov-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let source_instance = source_manager
.get_source_instance("recov-source")
.await
.unwrap();
let test_source = source_instance
.as_any()
.downcast_ref::<CheckpointTestSource>()
.unwrap();
for i in 0..3 {
let change = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new(
"recov-source",
&format!("r{i}"),
),
labels: Arc::new([Arc::from("Person")]),
effective_from: 1000 + i as u64,
},
properties: drasi_core::models::ElementPropertyMap::from(
vec![(
"name".to_string(),
drasi_core::evaluation::variable_value::VariableValue::String(format!(
"recov{i}"
)),
)]
.into_iter()
.collect::<std::collections::BTreeMap<_, _>>(),
),
},
};
test_source
.inject_change_with_position(change, Some(Bytes::from(format!("rpos{i}"))))
.await
.unwrap();
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let query_instance = query_manager
.get_query_instance("recov-query")
.await
.unwrap();
let drasi_query = query_instance
.as_any()
.downcast_ref::<DrasiQuery>()
.unwrap();
let checkpoint_store = drasi_query.get_checkpoint_store().await.unwrap();
let cp_before = checkpoint_store
.read_checkpoint("recov-source")
.await
.unwrap()
.unwrap();
assert_eq!(cp_before.sequence, 3);
query_manager
.stop_query("recov-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"recov-query",
ComponentStatus::Stopped,
std::time::Duration::from_secs(5),
)
.await;
query_manager
.start_query("recov-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"recov-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let last_seq_history = test_source.get_last_sequence_history().await;
assert!(
last_seq_history.len() >= 2,
"Expected at least 2 subscribe calls"
);
assert_eq!(
last_seq_history[0], None,
"First subscribe should have no last_sequence"
);
assert_eq!(
last_seq_history[1], None,
"Second subscribe should have no last_sequence (in-memory backend)"
);
let new_change = drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new("recov-source", "r-new"),
labels: Arc::new([Arc::from("Person")]),
effective_from: 3000,
},
properties: drasi_core::models::ElementPropertyMap::from(
vec![(
"name".to_string(),
drasi_core::evaluation::variable_value::VariableValue::String(
"new-after-restart".to_string(),
),
)]
.into_iter()
.collect::<std::collections::BTreeMap<_, _>>(),
),
},
};
test_source
.inject_change_with_position(new_change, Some(Bytes::from("rpos-new")))
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let cp_after = checkpoint_store
.read_checkpoint("recov-source")
.await
.unwrap();
assert!(
cp_after.is_some(),
"Checkpoint should be updated for the new event"
);
}
struct HandoverTestSource {
base: SourceBase,
bootstrap_rx: Arc<tokio::sync::Mutex<Option<BootstrapEventReceiver>>>,
bootstrap_result_rx: Arc<
tokio::sync::Mutex<
Option<
tokio::sync::oneshot::Receiver<
anyhow::Result<crate::bootstrap::BootstrapResult>,
>,
>,
>,
>,
}
impl HandoverTestSource {
fn new(
id: &str,
bootstrap_rx: BootstrapEventReceiver,
bootstrap_result_rx: tokio::sync::oneshot::Receiver<
anyhow::Result<crate::bootstrap::BootstrapResult>,
>,
) -> anyhow::Result<Self> {
let base = SourceBase::new(SourceBaseParams::new(id))?;
Ok(Self {
base,
bootstrap_rx: Arc::new(tokio::sync::Mutex::new(Some(bootstrap_rx))),
bootstrap_result_rx: Arc::new(tokio::sync::Mutex::new(Some(bootstrap_result_rx))),
})
}
async fn inject_change(
&self,
change: drasi_core::models::SourceChange,
) -> anyhow::Result<()> {
let wrapper = SourceEventWrapper::new(
self.base.get_id().to_string(),
SourceEvent::Change(change),
chrono::Utc::now(),
);
self.base.dispatch_event(wrapper).await
}
fn set_next_sequence(&self, seq: u64) {
self.base.set_next_sequence(seq);
}
}
#[async_trait::async_trait]
impl Source for HandoverTestSource {
fn id(&self) -> &str {
self.base.get_id()
}
fn type_name(&self) -> &str {
"handover-test"
}
fn properties(&self) -> std::collections::HashMap<String, serde_json::Value> {
std::collections::HashMap::new()
}
fn auto_start(&self) -> bool {
true
}
async fn start(&self) -> anyhow::Result<()> {
self.base
.set_status(ComponentStatus::Starting, Some("Starting".to_string()))
.await;
self.base
.set_status(ComponentStatus::Running, Some("Running".to_string()))
.await;
Ok(())
}
async fn stop(&self) -> anyhow::Result<()> {
self.base
.set_status(ComponentStatus::Stopping, Some("Stopping".to_string()))
.await;
self.base
.set_status(ComponentStatus::Stopped, Some("Stopped".to_string()))
.await;
Ok(())
}
async fn status(&self) -> ComponentStatus {
self.base.status_handle().get_status().await
}
async fn subscribe(
&self,
settings: crate::config::SourceSubscriptionSettings,
) -> anyhow::Result<SubscriptionResponse> {
self.base.apply_subscription_settings(&settings);
let receiver = self.base.create_streaming_receiver().await?;
let bootstrap_rx = self.bootstrap_rx.lock().await.take();
let bootstrap_result_rx = self.bootstrap_result_rx.lock().await.take();
Ok(SubscriptionResponse {
query_id: settings.query_id,
source_id: self.id().to_string(),
receiver,
bootstrap_receiver: bootstrap_rx,
position_handle: None,
bootstrap_result_receiver: bootstrap_result_rx,
})
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
async fn initialize(&self, context: crate::context::SourceRuntimeContext) {
self.base.initialize(context).await;
}
}
fn make_person_insert(
source_id: &str,
node_id: &str,
name: &str,
) -> drasi_core::models::SourceChange {
drasi_core::models::SourceChange::Insert {
element: drasi_core::models::Element::Node {
metadata: drasi_core::models::ElementMetadata {
reference: drasi_core::models::ElementReference::new(source_id, node_id),
labels: Arc::new([Arc::from("Person")]),
effective_from: 1000,
},
properties: drasi_core::models::ElementPropertyMap::from(
vec![(
"name".to_string(),
drasi_core::evaluation::variable_value::VariableValue::String(
name.to_string(),
),
)]
.into_iter()
.collect::<std::collections::BTreeMap<_, _>>(),
),
},
}
}
#[tokio::test]
async fn test_handover_dedup_filters_pre_handover_events() {
let (query_manager, source_manager, graph) =
create_test_env_with_persistent_backend().await;
let mut event_rx = graph.read().await.subscribe();
let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel::<BootstrapEvent>(100);
let (result_tx, result_rx) =
tokio::sync::oneshot::channel::<anyhow::Result<crate::bootstrap::BootstrapResult>>();
let source = HandoverTestSource::new("handover-src", bootstrap_rx, result_rx).unwrap();
add_source(&source_manager, &graph, source).await.unwrap();
source_manager
.start_source("handover-src".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"handover-src",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
bootstrap_tx
.send(BootstrapEvent {
source_id: "handover-src".to_string(),
change: make_person_insert("handover-src", "bootstrap-p1", "BootstrapAlice"),
timestamp: chrono::Utc::now(),
sequence: 1,
})
.await
.unwrap();
drop(bootstrap_tx);
result_tx
.send(Ok(crate::bootstrap::BootstrapResult {
event_count: 1,
last_sequence: Some(5),
sequences_aligned: true,
source_position: None,
}))
.unwrap();
let config =
create_persistent_query_config("handover-query", vec!["handover-src".to_string()]);
add_query(&query_manager, &graph, config).await.unwrap();
query_manager
.start_query("handover-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"handover-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let source_instance = source_manager
.get_source_instance("handover-src")
.await
.unwrap();
let test_source = source_instance
.as_any()
.downcast_ref::<HandoverTestSource>()
.unwrap();
test_source.set_next_sequence(2);
for (node_id, name) in [
("p3", "Person3"), ("p4", "Person4"), ("p5", "Person5"), ("p6", "Person6"), ("p7", "Person7"), ("p8", "Person8"), ] {
test_source
.inject_change(make_person_insert("handover-src", node_id, name))
.await
.unwrap();
}
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
let query_instance = query_manager
.get_query_instance("handover-query")
.await
.unwrap();
let cp_store = query_instance
.as_any()
.downcast_ref::<DrasiQuery>()
.unwrap()
.get_checkpoint_store()
.await
.expect("Should have checkpoint store");
let cp = cp_store.read_checkpoint("handover-src").await.unwrap();
assert!(
cp.is_some(),
"Checkpoint should exist after processing post-handover events"
);
let checkpoint = cp.unwrap();
assert_eq!(
checkpoint.sequence, 8,
"Checkpoint should be 8 (last processed sequence), \
events ≤5 should have been filtered by handover dedup"
);
}
struct FailableCheckpointStore {
inner: PersistentInMemoryCheckpointStore,
fail_read_config_hash: std::sync::atomic::AtomicBool,
fail_clear_checkpoints: std::sync::atomic::AtomicBool,
}
impl FailableCheckpointStore {
fn new() -> Self {
Self {
inner: PersistentInMemoryCheckpointStore::new(),
fail_read_config_hash: std::sync::atomic::AtomicBool::new(false),
fail_clear_checkpoints: std::sync::atomic::AtomicBool::new(false),
}
}
fn set_fail_read_config_hash(&self, fail: bool) {
self.fail_read_config_hash
.store(fail, std::sync::atomic::Ordering::Relaxed);
}
fn set_fail_clear_checkpoints(&self, fail: bool) {
self.fail_clear_checkpoints
.store(fail, std::sync::atomic::Ordering::Relaxed);
}
}
#[async_trait::async_trait]
impl CheckpointStore for FailableCheckpointStore {
fn is_persistent(&self) -> bool {
true
}
async fn stage_checkpoint(
&self,
source_id: &str,
sequence: u64,
source_position: Option<&Bytes>,
) -> Result<(), drasi_core::interface::IndexError> {
self.inner
.stage_checkpoint(source_id, sequence, source_position)
.await
}
async fn read_checkpoint(
&self,
source_id: &str,
) -> Result<
Option<drasi_core::interface::SourceCheckpoint>,
drasi_core::interface::IndexError,
> {
self.inner.read_checkpoint(source_id).await
}
async fn read_all_checkpoints(
&self,
) -> Result<
std::collections::HashMap<String, drasi_core::interface::SourceCheckpoint>,
drasi_core::interface::IndexError,
> {
self.inner.read_all_checkpoints().await
}
async fn clear_checkpoints(&self) -> Result<(), drasi_core::interface::IndexError> {
if self
.fail_clear_checkpoints
.load(std::sync::atomic::Ordering::Relaxed)
{
return Err(drasi_core::interface::IndexError::other(
std::io::Error::new(
std::io::ErrorKind::Other,
"injected clear_checkpoints failure",
),
));
}
self.inner.clear_checkpoints().await
}
async fn write_config_hash(
&self,
hash: u64,
) -> Result<(), drasi_core::interface::IndexError> {
self.inner.write_config_hash(hash).await
}
async fn read_config_hash(&self) -> Result<Option<u64>, drasi_core::interface::IndexError> {
if self
.fail_read_config_hash
.load(std::sync::atomic::Ordering::Relaxed)
{
return Err(drasi_core::interface::IndexError::other(
std::io::Error::new(
std::io::ErrorKind::Other,
"injected read_config_hash failure",
),
));
}
self.inner.read_config_hash().await
}
}
struct FailablePlugin {
stores: RwLock<std::collections::HashMap<String, Arc<FailableCheckpointStore>>>,
}
impl FailablePlugin {
fn new() -> Self {
Self {
stores: RwLock::new(std::collections::HashMap::new()),
}
}
async fn get_store(&self, query_id: &str) -> Arc<FailableCheckpointStore> {
let mut map = self.stores.write().await;
map.entry(query_id.to_string())
.or_insert_with(|| Arc::new(FailableCheckpointStore::new()))
.clone()
}
}
#[async_trait::async_trait]
impl crate::indexes::IndexBackendPlugin for FailablePlugin {
async fn create_indexes(
&self,
query_id: &str,
) -> Result<drasi_core::interface::CreatedIndexes, drasi_core::interface::IndexError>
{
use drasi_core::in_memory_index::in_memory_element_index::InMemoryElementIndex;
use drasi_core::in_memory_index::in_memory_future_queue::InMemoryFutureQueue;
use drasi_core::in_memory_index::in_memory_result_index::InMemoryResultIndex;
use drasi_core::interface::{IndexSet, NoOpSessionControl};
let checkpoint_store = self.get_store(query_id).await;
let element_index = Arc::new(InMemoryElementIndex::new());
Ok(drasi_core::interface::CreatedIndexes {
set: IndexSet {
element_index: element_index.clone(),
archive_index: element_index,
result_index: Arc::new(InMemoryResultIndex::new()),
future_queue: Arc::new(InMemoryFutureQueue::new()),
session_control: Arc::new(NoOpSessionControl),
},
checkpoint_store: Some(checkpoint_store),
live_results_writer: None,
outbox_writer: None,
})
}
fn is_volatile(&self) -> bool {
false
}
}
async fn create_test_env_with_failable_backend(
plugin: Arc<FailablePlugin>,
) -> (
Arc<crate::queries::QueryManager>,
Arc<crate::sources::SourceManager>,
Arc<tokio::sync::RwLock<crate::component_graph::ComponentGraph>>,
) {
let log_registry = crate::managers::get_or_init_global_registry();
let (graph, update_rx) = crate::component_graph::ComponentGraph::new("test-instance");
let update_tx = graph.update_sender();
let graph = Arc::new(tokio::sync::RwLock::new(graph));
{
let graph_clone = graph.clone();
tokio::spawn(async move {
let mut rx = update_rx;
while let Some(update) = rx.recv().await {
let mut g = graph_clone.write().await;
g.apply_update(update);
}
});
}
let source_manager = Arc::new(crate::sources::SourceManager::new(
"test-instance",
log_registry.clone(),
graph.clone(),
update_tx.clone(),
));
let index_factory = Arc::new(crate::indexes::IndexFactory::new(vec![], Some(plugin)));
let middleware_registry = Arc::new(MiddlewareTypeRegistry::new());
let query_manager = Arc::new(crate::queries::QueryManager::new(
"test-instance",
source_manager.clone(),
index_factory,
middleware_registry,
log_registry,
graph.clone(),
update_tx,
None,
));
(query_manager, source_manager, graph)
}
#[tokio::test]
async fn test_config_hash_read_failure_falls_through() {
let plugin = Arc::new(FailablePlugin::new());
let (query_manager, source_manager, graph) =
create_test_env_with_failable_backend(plugin.clone()).await;
let mut event_rx = graph.read().await.subscribe();
let source = CheckpointTestSource::new("fail-src").unwrap();
add_source(&source_manager, &graph, source).await.unwrap();
source_manager
.start_source("fail-src".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"fail-src",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let store = plugin.get_store("fail-query").await;
store.stage_checkpoint("fail-src", 42, None).await.unwrap();
store.write_config_hash(12345).await.unwrap();
store.set_fail_read_config_hash(true);
let config = create_persistent_query_config("fail-query", vec!["fail-src".to_string()]);
add_query(&query_manager, &graph, config).await.unwrap();
query_manager
.start_query("fail-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"fail-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let cp = store.read_checkpoint("fail-src").await.unwrap();
assert!(
cp.is_none(),
"Stale checkpoint should be cleared after config hash read failure"
);
}
#[tokio::test]
async fn test_clear_checkpoints_failure_prevents_hash_write() {
let plugin = Arc::new(FailablePlugin::new());
let (query_manager, source_manager, graph) =
create_test_env_with_failable_backend(plugin.clone()).await;
let mut event_rx = graph.read().await.subscribe();
let source = CheckpointTestSource::new("clear-fail-src").unwrap();
add_source(&source_manager, &graph, source).await.unwrap();
source_manager
.start_source("clear-fail-src".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"clear-fail-src",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let store = plugin.get_store("clear-fail-query").await;
store.write_config_hash(99999).await.unwrap();
store
.stage_checkpoint("clear-fail-src", 10, None)
.await
.unwrap();
store.set_fail_clear_checkpoints(true);
let config =
create_persistent_query_config("clear-fail-query", vec!["clear-fail-src".to_string()]);
add_query(&query_manager, &graph, config).await.unwrap();
let result = query_manager
.start_query("clear-fail-query".to_string())
.await;
assert!(
result.is_err(),
"start_query should fail when clear_checkpoints fails on config hash mismatch"
);
let stored_hash = store.read_config_hash().await.unwrap();
assert_eq!(
stored_hash,
Some(99999),
"Old config hash should remain when clear_checkpoints fails on mismatch"
);
}
}
#[cfg(test)]
mod orchestration_tests {
use bytes::Bytes;
use drasi_core::in_memory_index::in_memory_checkpoint_store::InMemoryCheckpointStore;
use drasi_core::interface::CheckpointStore;
use drasi_core::middleware::MiddlewareTypeRegistry;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::channels::*;
use crate::config::{QueryConfig, QueryLanguage, SourceSubscriptionConfig};
use crate::recovery::RecoveryPolicy;
use crate::sources::base::{SourceBase, SourceBaseParams};
use crate::sources::Source;
use crate::test_helpers::wait_for_component_status;
struct VolatileTestSource {
base: SourceBase,
}
impl VolatileTestSource {
fn new(id: &str) -> anyhow::Result<Self> {
Ok(Self {
base: SourceBase::new(SourceBaseParams::new(id))?,
})
}
}
#[async_trait::async_trait]
impl Source for VolatileTestSource {
fn id(&self) -> &str {
self.base.get_id()
}
fn type_name(&self) -> &str {
"volatile-test"
}
fn properties(&self) -> std::collections::HashMap<String, serde_json::Value> {
std::collections::HashMap::new()
}
fn supports_replay(&self) -> bool {
false
}
async fn start(&self) -> anyhow::Result<()> {
self.base
.set_status(ComponentStatus::Starting, Some("Starting".into()))
.await;
self.base
.set_status(ComponentStatus::Running, Some("Running".into()))
.await;
Ok(())
}
async fn stop(&self) -> anyhow::Result<()> {
self.base
.set_status(ComponentStatus::Stopping, Some("Stopping".into()))
.await;
self.base
.set_status(ComponentStatus::Stopped, Some("Stopped".into()))
.await;
Ok(())
}
async fn status(&self) -> ComponentStatus {
self.base.status_handle().get_status().await
}
async fn subscribe(
&self,
settings: crate::config::SourceSubscriptionSettings,
) -> anyhow::Result<SubscriptionResponse> {
self.base.apply_subscription_settings(&settings);
let receiver = self.base.create_streaming_receiver().await?;
Ok(SubscriptionResponse {
query_id: settings.query_id,
source_id: self.id().to_string(),
receiver,
bootstrap_receiver: None,
position_handle: None,
bootstrap_result_receiver: None,
})
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
async fn initialize(&self, context: crate::context::SourceRuntimeContext) {
self.base.initialize(context).await;
}
}
struct DurableTestSource {
base: SourceBase,
remaining_failures: Arc<std::sync::atomic::AtomicU32>,
subscribe_count: Arc<std::sync::atomic::AtomicU32>,
}
impl DurableTestSource {
fn new(id: &str) -> anyhow::Result<Self> {
Ok(Self {
base: SourceBase::new(SourceBaseParams::new(id))?,
remaining_failures: Arc::new(std::sync::atomic::AtomicU32::new(0)),
subscribe_count: Arc::new(std::sync::atomic::AtomicU32::new(0)),
})
}
fn remaining_failures_handle(&self) -> Arc<std::sync::atomic::AtomicU32> {
self.remaining_failures.clone()
}
fn subscribe_count_handle(&self) -> Arc<std::sync::atomic::AtomicU32> {
self.subscribe_count.clone()
}
fn set_fail_count(&self, count: u32) {
self.remaining_failures
.store(count, std::sync::atomic::Ordering::Release);
}
fn subscribe_count(&self) -> u32 {
self.subscribe_count
.load(std::sync::atomic::Ordering::Acquire)
}
}
#[async_trait::async_trait]
impl Source for DurableTestSource {
fn id(&self) -> &str {
self.base.get_id()
}
fn type_name(&self) -> &str {
"durable-test"
}
fn properties(&self) -> std::collections::HashMap<String, serde_json::Value> {
std::collections::HashMap::new()
}
async fn start(&self) -> anyhow::Result<()> {
self.base
.set_status(ComponentStatus::Starting, Some("Starting".into()))
.await;
self.base
.set_status(ComponentStatus::Running, Some("Running".into()))
.await;
Ok(())
}
async fn stop(&self) -> anyhow::Result<()> {
self.base
.set_status(ComponentStatus::Stopping, Some("Stopping".into()))
.await;
self.base
.set_status(ComponentStatus::Stopped, Some("Stopped".into()))
.await;
Ok(())
}
async fn status(&self) -> ComponentStatus {
self.base.status_handle().get_status().await
}
async fn subscribe(
&self,
settings: crate::config::SourceSubscriptionSettings,
) -> anyhow::Result<SubscriptionResponse> {
self.subscribe_count
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
if self
.remaining_failures
.fetch_update(
std::sync::atomic::Ordering::AcqRel,
std::sync::atomic::Ordering::Acquire,
|n| if n > 0 { Some(n - 1) } else { None },
)
.is_ok()
{
return Err(crate::sources::SourceError::PositionUnavailable {
source_id: self.id().to_string(),
requested: settings.resume_from.unwrap_or_default(),
earliest_available: None,
}
.into());
}
self.base.apply_subscription_settings(&settings);
let receiver = self.base.create_streaming_receiver().await?;
Ok(SubscriptionResponse {
query_id: settings.query_id,
source_id: self.id().to_string(),
receiver,
bootstrap_receiver: None,
position_handle: None,
bootstrap_result_receiver: None,
})
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
async fn initialize(&self, context: crate::context::SourceRuntimeContext) {
self.base.initialize(context).await;
}
async fn remove_position_handle(&self, query_id: &str) {
self.base.remove_position_handle(query_id).await;
}
}
struct PersistentInMemoryCheckpointStore {
inner: InMemoryCheckpointStore,
}
impl PersistentInMemoryCheckpointStore {
fn new() -> Self {
Self {
inner: InMemoryCheckpointStore::new(),
}
}
}
#[async_trait::async_trait]
impl CheckpointStore for PersistentInMemoryCheckpointStore {
fn is_persistent(&self) -> bool {
true
}
async fn stage_checkpoint(
&self,
source_id: &str,
sequence: u64,
source_position: Option<&Bytes>,
) -> Result<(), drasi_core::interface::IndexError> {
self.inner
.stage_checkpoint(source_id, sequence, source_position)
.await
}
async fn read_checkpoint(
&self,
source_id: &str,
) -> Result<
Option<drasi_core::interface::SourceCheckpoint>,
drasi_core::interface::IndexError,
> {
self.inner.read_checkpoint(source_id).await
}
async fn read_all_checkpoints(
&self,
) -> Result<
std::collections::HashMap<String, drasi_core::interface::SourceCheckpoint>,
drasi_core::interface::IndexError,
> {
self.inner.read_all_checkpoints().await
}
async fn read_config_hash(&self) -> Result<Option<u64>, drasi_core::interface::IndexError> {
self.inner.read_config_hash().await
}
async fn write_config_hash(
&self,
hash: u64,
) -> Result<(), drasi_core::interface::IndexError> {
self.inner.write_config_hash(hash).await
}
async fn clear_checkpoints(&self) -> Result<(), drasi_core::interface::IndexError> {
self.inner.clear_checkpoints().await
}
}
struct MockPersistentPlugin {
stores: RwLock<std::collections::HashMap<String, Arc<PersistentInMemoryCheckpointStore>>>,
}
impl MockPersistentPlugin {
fn new() -> Self {
Self {
stores: RwLock::new(std::collections::HashMap::new()),
}
}
}
#[async_trait::async_trait]
impl crate::indexes::IndexBackendPlugin for MockPersistentPlugin {
async fn create_indexes(
&self,
query_id: &str,
) -> Result<drasi_core::interface::CreatedIndexes, drasi_core::interface::IndexError>
{
use drasi_core::in_memory_index::in_memory_element_index::InMemoryElementIndex;
use drasi_core::in_memory_index::in_memory_future_queue::InMemoryFutureQueue;
use drasi_core::in_memory_index::in_memory_result_index::InMemoryResultIndex;
use drasi_core::interface::{IndexSet, NoOpSessionControl};
let checkpoint_store = {
let mut stores = self.stores.write().await;
stores
.entry(query_id.to_string())
.or_insert_with(|| Arc::new(PersistentInMemoryCheckpointStore::new()))
.clone()
};
let element_index = Arc::new(InMemoryElementIndex::new());
Ok(drasi_core::interface::CreatedIndexes {
set: IndexSet {
element_index: element_index.clone(),
archive_index: element_index,
result_index: Arc::new(InMemoryResultIndex::new()),
future_queue: Arc::new(InMemoryFutureQueue::new()),
session_control: Arc::new(NoOpSessionControl),
},
checkpoint_store: Some(checkpoint_store),
live_results_writer: None,
outbox_writer: None,
})
}
fn is_volatile(&self) -> bool {
false
}
}
fn create_volatile_query_config(id: &str, sources: Vec<String>) -> QueryConfig {
QueryConfig {
id: id.to_string(),
query: "MATCH (n:Person) RETURN n.name".to_string(),
query_language: QueryLanguage::Cypher,
middleware: vec![],
sources: sources
.into_iter()
.map(|source_id| SourceSubscriptionConfig {
nodes: vec![],
relations: vec![],
source_id,
pipeline: vec![],
})
.collect(),
auto_start: false,
joins: None,
enable_bootstrap: false,
bootstrap_buffer_size: 100,
priority_queue_capacity: None,
dispatch_buffer_capacity: None,
dispatch_mode: None,
storage_backend: None,
recovery_policy: None,
outbox_capacity: 1000,
bootstrap_timeout_secs: 300,
}
}
fn create_persistent_query_config(
id: &str,
sources: Vec<String>,
recovery_policy: Option<RecoveryPolicy>,
) -> QueryConfig {
use crate::indexes::config::{StorageBackendRef, StorageBackendSpec};
QueryConfig {
id: id.to_string(),
query: "MATCH (n:Person) RETURN n.name".to_string(),
query_language: QueryLanguage::Cypher,
middleware: vec![],
sources: sources
.into_iter()
.map(|source_id| SourceSubscriptionConfig {
nodes: vec![],
relations: vec![],
source_id,
pipeline: vec![],
})
.collect(),
auto_start: false,
joins: None,
enable_bootstrap: false,
bootstrap_buffer_size: 100,
priority_queue_capacity: None,
dispatch_buffer_capacity: None,
dispatch_mode: None,
storage_backend: Some(StorageBackendRef::Inline(StorageBackendSpec::RocksDb {
path: "/tmp/test-drasi".to_string(),
enable_archive: false,
direct_io: false,
})),
recovery_policy,
outbox_capacity: 1000,
bootstrap_timeout_secs: 300,
}
}
async fn add_source(
source_manager: &crate::sources::SourceManager,
graph: &tokio::sync::RwLock<crate::component_graph::ComponentGraph>,
source: impl Source + 'static,
) -> anyhow::Result<()> {
let source_id = source.id().to_string();
let source_type = source.type_name().to_string();
let auto_start = source.auto_start();
{
let mut g = graph.write().await;
let mut metadata = std::collections::HashMap::new();
metadata.insert("kind".to_string(), source_type);
metadata.insert("autoStart".to_string(), auto_start.to_string());
g.register_source(&source_id, metadata)?;
}
source_manager.provision_source(source).await?;
source_manager.start_source(source_id).await
}
async fn add_query(
manager: &crate::queries::QueryManager,
graph: &tokio::sync::RwLock<crate::component_graph::ComponentGraph>,
config: QueryConfig,
) -> anyhow::Result<()> {
{
let mut g = graph.write().await;
let source_ids: Vec<String> =
config.sources.iter().map(|s| s.source_id.clone()).collect();
for sid in &source_ids {
if !g.contains(sid) {
g.register_source(sid, std::collections::HashMap::new())?;
}
}
let mut metadata = std::collections::HashMap::new();
metadata.insert("query".to_string(), config.query.clone());
g.register_query(&config.id, metadata, &source_ids)?;
}
manager.provision_query(config).await
}
async fn create_test_env(
plugin: Option<Arc<dyn crate::indexes::IndexBackendPlugin>>,
default_recovery_policy: Option<RecoveryPolicy>,
) -> (
Arc<crate::queries::QueryManager>,
Arc<crate::sources::SourceManager>,
Arc<tokio::sync::RwLock<crate::component_graph::ComponentGraph>>,
) {
let log_registry = crate::managers::get_or_init_global_registry();
let (graph, update_rx) = crate::component_graph::ComponentGraph::new("test-instance");
let update_tx = graph.update_sender();
let graph = Arc::new(tokio::sync::RwLock::new(graph));
{
let graph_clone = graph.clone();
tokio::spawn(async move {
let mut rx = update_rx;
while let Some(update) = rx.recv().await {
let mut g = graph_clone.write().await;
g.apply_update(update);
}
});
}
let source_manager = Arc::new(crate::sources::SourceManager::new(
"test-instance",
log_registry.clone(),
graph.clone(),
update_tx.clone(),
));
let index_factory = Arc::new(crate::indexes::IndexFactory::new(vec![], plugin));
let middleware_registry = Arc::new(MiddlewareTypeRegistry::new());
let query_manager = Arc::new(crate::queries::QueryManager::new(
"test-instance",
source_manager.clone(),
index_factory,
middleware_registry,
log_registry,
graph.clone(),
update_tx,
default_recovery_policy,
));
(query_manager, source_manager, graph)
}
#[tokio::test]
async fn test_volatile_query_with_volatile_source_ok() {
let (query_manager, source_manager, graph) = create_test_env(None, None).await;
let mut event_rx = graph.read().await.subscribe();
let source = VolatileTestSource::new("vol-src").unwrap();
add_source(&source_manager, &graph, source).await.unwrap();
wait_for_component_status(
&mut event_rx,
"vol-src",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config = create_volatile_query_config("vol-query", vec!["vol-src".to_string()]);
add_query(&query_manager, &graph, config).await.unwrap();
let result = query_manager.start_query("vol-query".to_string()).await;
assert!(
result.is_ok(),
"Volatile query + volatile source should succeed: {result:?}"
);
wait_for_component_status(
&mut event_rx,
"vol-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
query_manager
.stop_query("vol-query".to_string())
.await
.unwrap();
}
#[tokio::test]
async fn test_persistent_query_with_durable_source_ok() {
let plugin = Arc::new(MockPersistentPlugin::new());
let (query_manager, source_manager, graph) = create_test_env(Some(plugin), None).await;
let mut event_rx = graph.read().await.subscribe();
let source = DurableTestSource::new("dur-src").unwrap();
add_source(&source_manager, &graph, source).await.unwrap();
wait_for_component_status(
&mut event_rx,
"dur-src",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config =
create_persistent_query_config("persist-query", vec!["dur-src".to_string()], None);
add_query(&query_manager, &graph, config).await.unwrap();
let result = query_manager.start_query("persist-query".to_string()).await;
assert!(
result.is_ok(),
"Persistent + durable should succeed: {result:?}"
);
wait_for_component_status(
&mut event_rx,
"persist-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
query_manager
.stop_query("persist-query".to_string())
.await
.unwrap();
}
#[tokio::test]
async fn test_persistent_query_with_volatile_source_fails() {
let plugin = Arc::new(MockPersistentPlugin::new());
let (query_manager, source_manager, graph) = create_test_env(Some(plugin), None).await;
let mut event_rx = graph.read().await.subscribe();
let source = VolatileTestSource::new("vol-src").unwrap();
add_source(&source_manager, &graph, source).await.unwrap();
wait_for_component_status(
&mut event_rx,
"vol-src",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config =
create_persistent_query_config("incompat-query", vec!["vol-src".to_string()], None);
add_query(&query_manager, &graph, config).await.unwrap();
let result = query_manager
.start_query("incompat-query".to_string())
.await;
assert!(result.is_err(), "Persistent + volatile should fail");
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("do not support replay") || err_msg.contains("IncompatibleSource"),
"Error should mention incompatible source: {err_msg}"
);
wait_for_component_status(
&mut event_rx,
"incompat-query",
ComponentStatus::Error,
std::time::Duration::from_secs(5),
)
.await;
}
#[tokio::test]
async fn test_position_unavailable_strict_policy_fails() {
let plugin = Arc::new(MockPersistentPlugin::new());
let (query_manager, source_manager, graph) = create_test_env(Some(plugin), None).await;
let mut event_rx = graph.read().await.subscribe();
let source = DurableTestSource::new("strict-src").unwrap();
source.set_fail_count(u32::MAX);
add_source(&source_manager, &graph, source).await.unwrap();
wait_for_component_status(
&mut event_rx,
"strict-src",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config = create_persistent_query_config(
"strict-query",
vec!["strict-src".to_string()],
Some(RecoveryPolicy::Strict),
);
add_query(&query_manager, &graph, config).await.unwrap();
let result = query_manager.start_query("strict-query".to_string()).await;
assert!(result.is_err(), "Strict + PositionUnavailable should fail");
let err_msg = format!("{:#}", result.unwrap_err());
assert!(
err_msg.contains("PositionUnavailable") || err_msg.contains("Strict"),
"Error should mention PositionUnavailable or Strict: {err_msg}"
);
wait_for_component_status(
&mut event_rx,
"strict-query",
ComponentStatus::Error,
std::time::Duration::from_secs(5),
)
.await;
}
#[tokio::test]
async fn test_position_unavailable_auto_reset_retries() {
let plugin = Arc::new(MockPersistentPlugin::new());
let (query_manager, source_manager, graph) = create_test_env(Some(plugin), None).await;
let mut event_rx = graph.read().await.subscribe();
let source = DurableTestSource::new("reset-src").unwrap();
source.set_fail_count(1); let sub_count = source.subscribe_count_handle();
add_source(&source_manager, &graph, source).await.unwrap();
wait_for_component_status(
&mut event_rx,
"reset-src",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config = create_persistent_query_config(
"reset-query",
vec!["reset-src".to_string()],
Some(RecoveryPolicy::AutoReset),
);
add_query(&query_manager, &graph, config).await.unwrap();
let result = query_manager.start_query("reset-query".to_string()).await;
assert!(
result.is_ok(),
"AutoReset should retry and succeed: {result:?}"
);
assert_eq!(
sub_count.load(std::sync::atomic::Ordering::Acquire),
2,
"Source should be subscribed twice (first fail, then retry)"
);
wait_for_component_status(
&mut event_rx,
"reset-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
query_manager
.stop_query("reset-query".to_string())
.await
.unwrap();
}
#[tokio::test]
async fn test_default_strict_policy_when_none_configured() {
let plugin = Arc::new(MockPersistentPlugin::new());
let (query_manager, source_manager, graph) = create_test_env(Some(plugin), None).await;
let mut event_rx = graph.read().await.subscribe();
let source = DurableTestSource::new("default-src").unwrap();
source.set_fail_count(u32::MAX);
add_source(&source_manager, &graph, source).await.unwrap();
wait_for_component_status(
&mut event_rx,
"default-src",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config = create_persistent_query_config(
"default-policy-query",
vec!["default-src".to_string()],
None,
);
add_query(&query_manager, &graph, config).await.unwrap();
let result = query_manager
.start_query("default-policy-query".to_string())
.await;
assert!(
result.is_err(),
"Default Strict + PositionUnavailable should fail"
);
}
#[tokio::test]
async fn test_global_default_policy_auto_reset() {
let plugin = Arc::new(MockPersistentPlugin::new());
let (query_manager, source_manager, graph) =
create_test_env(Some(plugin), Some(RecoveryPolicy::AutoReset)).await;
let mut event_rx = graph.read().await.subscribe();
let source = DurableTestSource::new("global-src").unwrap();
source.set_fail_count(1); let sub_count = source.subscribe_count_handle();
add_source(&source_manager, &graph, source).await.unwrap();
wait_for_component_status(
&mut event_rx,
"global-src",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config = create_persistent_query_config(
"global-policy-query",
vec!["global-src".to_string()],
None, );
add_query(&query_manager, &graph, config).await.unwrap();
let result = query_manager
.start_query("global-policy-query".to_string())
.await;
assert!(
result.is_ok(),
"Global AutoReset should trigger retry: {result:?}"
);
assert_eq!(sub_count.load(std::sync::atomic::Ordering::Acquire), 2);
wait_for_component_status(
&mut event_rx,
"global-policy-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
query_manager
.stop_query("global-policy-query".to_string())
.await
.unwrap();
}
#[tokio::test]
async fn test_per_query_policy_overrides_global() {
let plugin = Arc::new(MockPersistentPlugin::new());
let (query_manager, source_manager, graph) =
create_test_env(Some(plugin), Some(RecoveryPolicy::AutoReset)).await;
let mut event_rx = graph.read().await.subscribe();
let source = DurableTestSource::new("override-src").unwrap();
source.set_fail_count(u32::MAX);
add_source(&source_manager, &graph, source).await.unwrap();
wait_for_component_status(
&mut event_rx,
"override-src",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config = create_persistent_query_config(
"override-query",
vec!["override-src".to_string()],
Some(RecoveryPolicy::Strict),
);
add_query(&query_manager, &graph, config).await.unwrap();
let result = query_manager
.start_query("override-query".to_string())
.await;
assert!(
result.is_err(),
"Per-query Strict should override global AutoReset"
);
}
#[tokio::test]
async fn test_auto_reset_double_failure_errors() {
let plugin = Arc::new(MockPersistentPlugin::new());
let (query_manager, source_manager, graph) = create_test_env(Some(plugin), None).await;
let mut event_rx = graph.read().await.subscribe();
let source = DurableTestSource::new("double-fail-src").unwrap();
source.set_fail_count(u32::MAX);
let sub_count = source.subscribe_count_handle();
add_source(&source_manager, &graph, source).await.unwrap();
wait_for_component_status(
&mut event_rx,
"double-fail-src",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let config = create_persistent_query_config(
"double-fail-query",
vec!["double-fail-src".to_string()],
Some(RecoveryPolicy::AutoReset),
);
add_query(&query_manager, &graph, config).await.unwrap();
let result = query_manager
.start_query("double-fail-query".to_string())
.await;
assert!(
result.is_err(),
"Double failure should error even with AutoReset"
);
assert_eq!(sub_count.load(std::sync::atomic::Ordering::Acquire), 2);
wait_for_component_status(
&mut event_rx,
"double-fail-query",
ComponentStatus::Error,
std::time::Duration::from_secs(5),
)
.await;
}
}