use anyhow::{bail, Result};
use crate::config::{QueryConfig, SourceSubscriptionConfig, SourceSubscriptionSettings};
use crate::queries::QueryLabels;
/// Stateless builder that turns a query's source configuration and the labels
/// used by the query into per-source subscription settings.
///
/// The type has no fields; it is used only through its associated functions.
pub struct SubscriptionSettingsBuilder;
impl SubscriptionSettingsBuilder {
    /// Builds one [`SourceSubscriptionSettings`] entry per source listed in
    /// `query_config.sources`, in the same order.
    ///
    /// Each entry starts out with the node and relation labels configured for its
    /// source. Labels from `query_labels` that no source is configured for are then
    /// added to the first entry, except relation labels that match the id of a
    /// configured join: those are validated and not added to any source.
    ///
    /// # Errors
    ///
    /// Returns an error when
    /// - a label is configured on more than one source,
    /// - a label has to be assigned to a source but no source is configured, or
    /// - a join key references a node label that is not in `query_labels.node_labels`.
    pub fn build_subscription_settings(
        query_config: &QueryConfig,
        query_labels: &QueryLabels,
    ) -> Result<Vec<SourceSubscriptionSettings>> {
        let mut settings_vec: Vec<SourceSubscriptionSettings> =
            Vec::with_capacity(query_config.sources.len());

        // Seed every source with exactly the labels it was explicitly configured with.
        for source_config in &query_config.sources {
            settings_vec.push(SourceSubscriptionSettings {
                source_id: source_config.source_id.clone(),
                enable_bootstrap: query_config.enable_bootstrap,
                query_id: query_config.id.clone(),
                nodes: source_config.nodes.iter().cloned().collect(),
                relations: source_config.relations.iter().cloned().collect(),
                resume_from: None,
                request_position_handle: false,
            });
        }

        // Node labels are handled before relation labels; either step may fail.
        Self::allocate_node_labels(&mut settings_vec, &query_config.sources, query_labels)?;
        Self::allocate_relation_labels(
            &mut settings_vec,
            &query_config.sources,
            query_labels,
            &query_config.joins,
        )?;

        Ok(settings_vec)
    }

    /// Makes sure every node label used by the query belongs to a source.
    ///
    /// A label configured on exactly one source is left untouched, a label
    /// configured on no source is inserted into the first entry of `settings_vec`,
    /// and a label configured on several sources is rejected.
    ///
    /// # Errors
    ///
    /// Fails when a label is configured on multiple sources, or when a label needs
    /// a fallback source but `settings_vec` is empty.
    fn allocate_node_labels(
        settings_vec: &mut [SourceSubscriptionSettings],
        source_configs: &[SourceSubscriptionConfig],
        query_labels: &QueryLabels,
    ) -> Result<()> {
        for node_label in &query_labels.node_labels {
            // Only the number of sources that list this label matters here.
            let owner_count = source_configs
                .iter()
                .filter(|config| config.nodes.contains(node_label))
                .count();

            if owner_count > 1 {
                bail!(
                    "Node label '{node_label}' is configured in multiple sources. Each node label must be assigned to exactly one source."
                );
            }
            if owner_count == 1 {
                // Already present in that source's settings from the initial seeding.
                continue;
            }

            // No source lists the label: fall back to the first source.
            match settings_vec.first_mut() {
                Some(fallback) => {
                    fallback.nodes.insert(node_label.clone());
                }
                None => bail!("No sources configured for query"),
            }
        }

        Ok(())
    }

    /// Makes sure every relation label used by the query is either owned by a
    /// source or backed by a join.
    ///
    /// A label configured on exactly one source is left untouched and a label
    /// configured on several sources is rejected. A label configured on no source
    /// is looked up by id in `joins`: if a join matches, each of its keys must name
    /// a label present in `query_labels.node_labels` and the relation is not added
    /// to any source; otherwise the label is inserted into the first entry of
    /// `settings_vec`.
    ///
    /// # Errors
    ///
    /// Fails when a label is configured on multiple sources, when a join key
    /// references a node label missing from the query, or when a label needs a
    /// fallback source but `settings_vec` is empty.
    fn allocate_relation_labels(
        settings_vec: &mut [SourceSubscriptionSettings],
        source_configs: &[SourceSubscriptionConfig],
        query_labels: &QueryLabels,
        joins: &Option<Vec<crate::config::QueryJoinConfig>>,
    ) -> Result<()> {
        for relation_label in &query_labels.relation_labels {
            let owner_count = source_configs
                .iter()
                .filter(|config| config.relations.contains(relation_label))
                .count();

            if owner_count > 1 {
                bail!(
                    "Relation label '{relation_label}' is configured in multiple sources. Each relation label must be assigned to exactly one source."
                );
            }
            if owner_count == 1 {
                // Already present in that source's settings from the initial seeding.
                continue;
            }

            // Unowned label: check whether it is the id of a configured join.
            let matching_join = joins
                .as_ref()
                .and_then(|join_configs| join_configs.iter().find(|j| j.id == *relation_label));

            if let Some(join_config) = matching_join {
                // Report the first join key whose label the query does not use.
                let missing_key = join_config
                    .keys
                    .iter()
                    .find(|key| !query_labels.node_labels.contains(&key.label));
                if let Some(key) = missing_key {
                    bail!(
                        "Join relation '{}' references node label '{}' which is not found in the query",
                        relation_label,
                        key.label
                    );
                }
                // A valid join relation is not subscribed to on any source.
                continue;
            }

            // Neither owned nor a join: fall back to the first source.
            match settings_vec.first_mut() {
                Some(fallback) => {
                    fallback.relations.insert(relation_label.clone());
                }
                None => bail!("No sources configured for query"),
            }
        }

        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        QueryConfig, QueryJoinConfig, QueryJoinKeyConfig, QueryLanguage, SourceSubscriptionConfig,
    };

    /// Turns a slice of string literals into owned label strings.
    fn owned(names: &[&str]) -> Vec<String> {
        names.iter().map(|&name| name.to_string()).collect()
    }

    /// Source config with the given id and labels and an empty pipeline.
    fn source(source_id: &str, nodes: &[&str], relations: &[&str]) -> SourceSubscriptionConfig {
        SourceSubscriptionConfig {
            source_id: source_id.to_string(),
            nodes: owned(nodes),
            relations: owned(relations),
            pipeline: vec![],
        }
    }

    /// Query labels made of the given node and relation label names.
    fn labels(nodes: &[&str], relations: &[&str]) -> QueryLabels {
        QueryLabels {
            node_labels: owned(nodes),
            relation_labels: owned(relations),
        }
    }

    /// Join with the given id linking `Order.customer_id` to `Customer.id`.
    fn order_customer_join(join_id: &str) -> QueryJoinConfig {
        QueryJoinConfig {
            id: join_id.to_string(),
            keys: vec![
                QueryJoinKeyConfig {
                    label: "Order".to_string(),
                    property: "customer_id".to_string(),
                },
                QueryJoinKeyConfig {
                    label: "Customer".to_string(),
                    property: "id".to_string(),
                },
            ],
        }
    }

    /// Minimal Cypher query config over `sources`, without joins.
    fn create_test_query_config(sources: Vec<SourceSubscriptionConfig>) -> QueryConfig {
        QueryConfig {
            id: "test-query".to_string(),
            query: "MATCH (n:Person) RETURN n".to_string(),
            query_language: QueryLanguage::Cypher,
            middleware: vec![],
            sources,
            auto_start: true,
            joins: None,
            enable_bootstrap: true,
            bootstrap_buffer_size: 10000,
            priority_queue_capacity: None,
            dispatch_buffer_capacity: None,
            dispatch_mode: None,
            storage_backend: None,
            recovery_policy: None,
        }
    }

    /// Runs the builder and returns the settings, failing the test on error.
    fn build_ok(
        query_config: &QueryConfig,
        query_labels: &QueryLabels,
    ) -> Vec<SourceSubscriptionSettings> {
        let result =
            SubscriptionSettingsBuilder::build_subscription_settings(query_config, query_labels);
        assert!(result.is_ok());
        result.unwrap()
    }

    /// Runs the builder and returns the error text, failing the test on success.
    fn build_err_message(query_config: &QueryConfig, query_labels: &QueryLabels) -> String {
        let result =
            SubscriptionSettingsBuilder::build_subscription_settings(query_config, query_labels);
        assert!(result.is_err());
        result.unwrap_err().to_string()
    }

    #[test]
    fn test_node_label_in_one_source() {
        let query_config = create_test_query_config(vec![source("source1", &["Person"], &[])]);
        let query_labels = labels(&["Person"], &[]);

        let settings = build_ok(&query_config, &query_labels);

        assert_eq!(settings.len(), 1);
        assert!(settings[0].nodes.contains("Person"));
    }

    #[test]
    fn test_node_label_not_in_any_source_goes_to_first() {
        let query_config = create_test_query_config(vec![
            source("source1", &[], &[]),
            source("source2", &[], &[]),
        ]);
        let query_labels = labels(&["Person"], &[]);

        let settings = build_ok(&query_config, &query_labels);

        assert_eq!(settings.len(), 2);
        // The unassigned label lands on the first source only.
        assert!(settings[0].nodes.contains("Person"));
        assert!(!settings[1].nodes.contains("Person"));
    }

    #[test]
    fn test_node_label_in_multiple_sources_error() {
        let query_config = create_test_query_config(vec![
            source("source1", &["Person"], &[]),
            source("source2", &["Person"], &[]),
        ]);
        let query_labels = labels(&["Person"], &[]);

        let message = build_err_message(&query_config, &query_labels);

        assert!(message.contains("multiple sources"));
    }

    #[test]
    fn test_relation_label_in_one_source() {
        let query_config = create_test_query_config(vec![source("source1", &[], &["KNOWS"])]);
        let query_labels = labels(&[], &["KNOWS"]);

        let settings = build_ok(&query_config, &query_labels);

        assert_eq!(settings.len(), 1);
        assert!(settings[0].relations.contains("KNOWS"));
    }

    #[test]
    fn test_relation_label_not_in_any_source_goes_to_first() {
        let query_config = create_test_query_config(vec![source("source1", &[], &[])]);
        let query_labels = labels(&[], &["KNOWS"]);

        let settings = build_ok(&query_config, &query_labels);

        assert_eq!(settings.len(), 1);
        assert!(settings[0].relations.contains("KNOWS"));
    }

    #[test]
    fn test_relation_label_in_multiple_sources_error() {
        let query_config = create_test_query_config(vec![
            source("source1", &[], &["KNOWS"]),
            source("source2", &[], &["KNOWS"]),
        ]);
        let query_labels = labels(&[], &["KNOWS"]);

        let message = build_err_message(&query_config, &query_labels);

        assert!(message.contains("multiple sources"));
    }

    #[test]
    fn test_join_relation_not_added_to_source() {
        let mut query_config = create_test_query_config(vec![source("source1", &[], &[])]);
        query_config.joins = Some(vec![order_customer_join("CUSTOMER")]);
        let query_labels = labels(&["Order", "Customer"], &["CUSTOMER"]);

        let settings = build_ok(&query_config, &query_labels);

        assert_eq!(settings.len(), 1);
        // The join relation is not subscribed to; its node labels fall back to the source.
        assert!(!settings[0].relations.contains("CUSTOMER"));
        assert!(settings[0].nodes.contains("Order"));
        assert!(settings[0].nodes.contains("Customer"));
    }

    #[test]
    fn test_join_relation_with_missing_node_label_error() {
        let mut query_config = create_test_query_config(vec![source("source1", &[], &[])]);
        query_config.joins = Some(vec![order_customer_join("CUSTOMER")]);
        // "Customer" is referenced by the join keys but absent from the query labels.
        let query_labels = labels(&["Order"], &["CUSTOMER"]);

        let message = build_err_message(&query_config, &query_labels);

        assert!(message.contains("not found in the query"));
    }

    #[test]
    fn test_complex_multi_source_scenario() {
        let mut query_config = create_test_query_config(vec![
            source("orders_db", &["Order"], &[]),
            source("customers_db", &["Customer"], &[]),
        ]);
        query_config.joins = Some(vec![order_customer_join("PLACED_BY")]);
        let query_labels = labels(
            &["Order", "Customer", "Product"],
            &["PLACED_BY", "CONTAINS"],
        );

        let settings = build_ok(&query_config, &query_labels);

        assert_eq!(settings.len(), 2);
        // Explicitly configured labels stay with their own source.
        assert!(settings[0].nodes.contains("Order"));
        assert!(settings[1].nodes.contains("Customer"));
        // The unconfigured node label falls back to the first source.
        assert!(settings[0].nodes.contains("Product"));
        // The join relation is assigned to no source.
        assert!(!settings[0].relations.contains("PLACED_BY"));
        assert!(!settings[1].relations.contains("PLACED_BY"));
        // The unconfigured, non-join relation falls back to the first source.
        assert!(settings[0].relations.contains("CONTAINS"));
    }
}