use std::collections::HashMap;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use crate::core::{BackendCapability, BackendKind};
use super::analyzer::QueryFeature;
/// Role a backend plays inside a composite configuration.
///
/// Serde (de)serializes the variants using lowercase names (e.g. `primary`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BackendRole {
/// The primary backend; every other role is treated as secondary.
Primary,
/// Secondary role whose typical capabilities include full-text search.
Search,
/// Secondary role whose typical capabilities include chained and reverse-chained search.
Graph,
/// Secondary role whose typical capability is terminology search.
Terminology,
/// Secondary role whose typical capabilities are CRUD, versioning and instance history.
Archive,
}
impl BackendRole {
    /// Returns `true` only for [`BackendRole::Primary`].
    pub fn is_primary(&self) -> bool {
        *self == Self::Primary
    }

    /// Returns `true` for every role other than [`BackendRole::Primary`].
    pub fn is_secondary(&self) -> bool {
        !matches!(self, Self::Primary)
    }

    /// Lists the capabilities assumed for a backend in this role.
    ///
    /// `BackendEntry::effective_capabilities` falls back to this list when an
    /// entry declares no explicit capabilities.
    pub fn typical_capabilities(&self) -> Vec<BackendCapability> {
        // Pick the per-role table first, then copy it into an owned vector.
        let table: &[BackendCapability] = match self {
            Self::Primary => &[
                BackendCapability::Crud,
                BackendCapability::Versioning,
                BackendCapability::InstanceHistory,
                BackendCapability::TypeHistory,
                BackendCapability::SystemHistory,
                BackendCapability::BasicSearch,
                BackendCapability::DateSearch,
                BackendCapability::ReferenceSearch,
                BackendCapability::Transactions,
                BackendCapability::OptimisticLocking,
                BackendCapability::Include,
                BackendCapability::Revinclude,
                BackendCapability::Sorting,
                BackendCapability::OffsetPagination,
                BackendCapability::CursorPagination,
            ],
            Self::Search => &[
                BackendCapability::BasicSearch,
                BackendCapability::FullTextSearch,
                BackendCapability::Sorting,
            ],
            Self::Graph => &[
                BackendCapability::ChainedSearch,
                BackendCapability::ReverseChaining,
                BackendCapability::Include,
                BackendCapability::Revinclude,
            ],
            Self::Terminology => &[BackendCapability::TerminologySearch],
            Self::Archive => &[
                BackendCapability::Crud,
                BackendCapability::Versioning,
                BackendCapability::InstanceHistory,
            ],
        };
        table.to_vec()
    }
}
/// A single backend participating in a composite configuration.
#[derive(Debug, Clone)]
pub struct BackendEntry {
/// Identifier used to refer to this backend (failover and routing-rule references use it).
pub id: String,
/// Role this backend plays.
pub role: BackendRole,
/// Backend kind (type declared in `crate::core`).
pub kind: BackendKind,
/// Connection string; `BackendEntry::new` leaves it empty.
/// NOTE(review): the expected format is presumably backend-specific — confirm.
pub connection: String,
/// Priority value; `BackendEntry::new` sets it to 100.
/// NOTE(review): whether higher or lower wins is not visible in this file.
pub priority: u8,
/// Disabled entries are skipped by the `CompositeConfig` lookups that filter on this flag.
pub enabled: bool,
/// Explicit capabilities; when empty, `effective_capabilities` falls back to the role's typical set.
pub capabilities: Vec<BackendCapability>,
/// ID of the backend to fail over to, if any.
pub failover_to: Option<String>,
/// Free-form options keyed by name, stored as JSON values.
pub options: HashMap<String, serde_json::Value>,
}
/// Default priority (100) shared by `BackendEntry::new`, `RoutingRule::new`
/// and serde when a routing rule omits `priority`.
fn default_priority() -> u8 {
    const DEFAULT_PRIORITY: u8 = 100;
    DEFAULT_PRIORITY
}
impl BackendEntry {
    /// Creates an enabled entry with the given id, role and kind.
    ///
    /// The connection string is empty, the priority is `default_priority()`,
    /// and there are no explicit capabilities, failover target or options.
    pub fn new(id: impl Into<String>, role: BackendRole, kind: BackendKind) -> Self {
        Self {
            id: id.into(),
            role,
            kind,
            connection: String::new(),
            priority: default_priority(),
            enabled: true,
            capabilities: Vec::new(),
            failover_to: None,
            options: HashMap::new(),
        }
    }

    /// Sets the connection string.
    pub fn with_connection(mut self, connection: impl Into<String>) -> Self {
        self.connection = connection.into();
        self
    }

    /// Sets the priority.
    pub fn with_priority(mut self, priority: u8) -> Self {
        self.priority = priority;
        self
    }

    /// Sets the ID of the backend to fail over to.
    pub fn with_failover(mut self, failover_id: impl Into<String>) -> Self {
        self.failover_to = Some(failover_id.into());
        self
    }

    /// Replaces the explicit capability list.
    ///
    /// Passing an empty vector re-enables the role-based fallback used by
    /// [`BackendEntry::effective_capabilities`].
    pub fn with_capabilities(mut self, capabilities: Vec<BackendCapability>) -> Self {
        self.capabilities = capabilities;
        self
    }

    /// Returns the explicit capabilities, or the role's typical capabilities
    /// when no explicit list was provided.
    pub fn effective_capabilities(&self) -> Vec<BackendCapability> {
        if self.capabilities.is_empty() {
            self.role.typical_capabilities()
        } else {
            self.capabilities.clone()
        }
    }

    /// Returns `true` if `capability` is among the effective capabilities.
    pub fn supports(&self, capability: BackendCapability) -> bool {
        // Check membership in place instead of going through
        // `effective_capabilities()`, which would clone the explicit list on
        // every call just to scan it once.
        if self.capabilities.is_empty() {
            self.role.typical_capabilities().contains(&capability)
        } else {
            self.capabilities.contains(&capability)
        }
    }
}
/// A rule associating query features with a target backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutingRule {
/// Identifier of the rule (reported in `ConfigError::InvalidRoutingTarget`).
pub id: String,
/// Query features associated with this rule.
/// NOTE(review): whether any or all must match is decided outside this file — confirm.
pub triggers: Vec<QueryFeature>,
/// ID of the backend this rule targets; `CompositeConfig::validate` requires it to exist.
pub target_backend: String,
/// Rule priority; defaults to `default_priority()` (100) when absent from serialized input.
#[serde(default = "default_priority")]
pub priority: u8,
/// Defaults to `true` when absent from serialized input.
/// NOTE(review): presumably enables falling back to the primary backend — confirm with the router.
#[serde(default = "default_fallback")]
pub fallback_to_primary: bool,
}
/// Serde default for `RoutingRule::fallback_to_primary`: enabled.
fn default_fallback() -> bool {
    const FALLBACK_BY_DEFAULT: bool = true;
    FALLBACK_BY_DEFAULT
}
impl RoutingRule {
pub fn new(id: impl Into<String>, target_backend: impl Into<String>) -> Self {
Self {
id: id.into(),
triggers: Vec::new(),
target_backend: target_backend.into(),
priority: default_priority(),
fallback_to_primary: true,
}
}
pub fn with_trigger(mut self, feature: QueryFeature) -> Self {
self.triggers.push(feature);
self
}
pub fn with_triggers(mut self, features: Vec<QueryFeature>) -> Self {
self.triggers.extend(features);
self
}
pub fn with_priority(mut self, priority: u8) -> Self {
self.priority = priority;
self
}
pub fn with_fallback(mut self, fallback: bool) -> Self {
self.fallback_to_primary = fallback;
self
}
}
/// Synchronization mode selected in `SyncConfig`.
///
/// Serde (de)serializes the variants using lowercase names; the default is
/// `Asynchronous`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SyncMode {
/// Synchronous mode.
Synchronous,
/// Asynchronous mode (the `Default` variant).
#[default]
Asynchronous,
/// Hybrid mode.
/// NOTE(review): `sync_for_search` presumably makes search backends synchronous — confirm with the sync implementation.
Hybrid {
sync_for_search: bool,
},
}
/// Retry settings with serde defaults for every field.
///
/// Durations are (de)serialized as human-readable strings through the local
/// `humantime_serde` module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
/// Maximum number of retries; defaults to 3.
#[serde(default = "default_max_retries")]
pub max_retries: u32,
/// Initial delay; defaults to 100 ms.
#[serde(with = "humantime_serde", default = "default_initial_delay")]
pub initial_delay: Duration,
/// Maximum delay; defaults to 5 s.
#[serde(with = "humantime_serde", default = "default_max_delay")]
pub max_delay: Duration,
/// Backoff multiplier; defaults to 2.0.
/// NOTE(review): presumably applied to the delay between attempts — confirm with the retry loop.
#[serde(default = "default_backoff_multiplier")]
pub backoff_multiplier: f64,
}
/// Default for `RetryConfig::max_retries`.
fn default_max_retries() -> u32 {
    const DEFAULT_MAX_RETRIES: u32 = 3;
    DEFAULT_MAX_RETRIES
}
/// Default for `RetryConfig::initial_delay`: 100 milliseconds.
fn default_initial_delay() -> Duration {
    const DEFAULT_INITIAL_DELAY: Duration = Duration::from_millis(100);
    DEFAULT_INITIAL_DELAY
}
/// Default for `RetryConfig::max_delay`: 5 seconds.
fn default_max_delay() -> Duration {
    const DEFAULT_MAX_DELAY: Duration = Duration::from_secs(5);
    DEFAULT_MAX_DELAY
}
/// Default for `RetryConfig::backoff_multiplier`.
fn default_backoff_multiplier() -> f64 {
    const DEFAULT_BACKOFF_MULTIPLIER: f64 = 2.0;
    DEFAULT_BACKOFF_MULTIPLIER
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_retries: default_max_retries(),
initial_delay: default_initial_delay(),
max_delay: default_max_delay(),
backoff_multiplier: default_backoff_multiplier(),
}
}
}
/// Synchronization settings; every field has a serde default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncConfig {
/// Synchronization mode; defaults to `SyncMode::default()`.
#[serde(default)]
pub mode: SyncMode,
/// Defaults to 500.
/// NOTE(review): presumably the maximum tolerated read lag in milliseconds — confirm with the consumer.
#[serde(default = "default_max_read_lag")]
pub max_read_lag_ms: u64,
/// Batch size; defaults to 100.
#[serde(default = "default_batch_size")]
pub batch_size: usize,
/// Retry settings; defaults to `RetryConfig::default()`.
#[serde(default)]
pub retry: RetryConfig,
}
/// Default for `SyncConfig::max_read_lag_ms`.
fn default_max_read_lag() -> u64 {
    const DEFAULT_MAX_READ_LAG_MS: u64 = 500;
    DEFAULT_MAX_READ_LAG_MS
}
/// Default for `SyncConfig::batch_size`.
fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 100;
    DEFAULT_BATCH_SIZE
}
impl Default for SyncConfig {
fn default() -> Self {
Self {
mode: SyncMode::default(),
max_read_lag_ms: default_max_read_lag(),
batch_size: default_batch_size(),
retry: RetryConfig::default(),
}
}
}
/// Weights for the cost components; the defaults (0.5, 0.3, 0.2) sum to 1.0.
///
/// NOTE(review): nothing in this file enforces that custom weights sum to 1.0.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostWeights {
/// Latency weight; defaults to 0.5.
#[serde(default = "default_latency_weight")]
pub latency: f64,
/// Resource-usage weight; defaults to 0.3.
#[serde(default = "default_resource_weight")]
pub resource_usage: f64,
/// Quality weight; defaults to 0.2.
#[serde(default = "default_quality_weight")]
pub quality: f64,
}
/// Default for `CostWeights::latency`.
fn default_latency_weight() -> f64 {
    const DEFAULT_LATENCY_WEIGHT: f64 = 0.5;
    DEFAULT_LATENCY_WEIGHT
}
/// Default for `CostWeights::resource_usage`.
fn default_resource_weight() -> f64 {
    const DEFAULT_RESOURCE_WEIGHT: f64 = 0.3;
    DEFAULT_RESOURCE_WEIGHT
}
/// Default for `CostWeights::quality`.
fn default_quality_weight() -> f64 {
    const DEFAULT_QUALITY_WEIGHT: f64 = 0.2;
    DEFAULT_QUALITY_WEIGHT
}
impl Default for CostWeights {
fn default() -> Self {
Self {
latency: default_latency_weight(),
resource_usage: default_resource_weight(),
quality: default_quality_weight(),
}
}
}
/// Cost-model inputs used when comparing backends.
///
/// NOTE(review): how base costs, multipliers and weights are combined is
/// decided by the cost estimator, which is not part of this file.
#[derive(Debug, Clone)]
pub struct CostConfig {
/// Base cost per backend kind.
pub base_costs: HashMap<BackendKind, f64>,
/// Cost multiplier per query feature.
pub feature_multipliers: HashMap<QueryFeature, f64>,
/// Weights of the individual cost components.
pub weights: CostWeights,
}
impl Default for CostConfig {
fn default() -> Self {
let mut base_costs = HashMap::new();
base_costs.insert(BackendKind::Sqlite, 1.0);
base_costs.insert(BackendKind::Postgres, 1.2);
base_costs.insert(BackendKind::Elasticsearch, 0.8);
base_costs.insert(BackendKind::Neo4j, 1.5);
base_costs.insert(BackendKind::S3, 2.0);
let mut feature_multipliers = HashMap::new();
feature_multipliers.insert(QueryFeature::BasicSearch, 1.0);
feature_multipliers.insert(QueryFeature::ChainedSearch, 3.0);
feature_multipliers.insert(QueryFeature::ReverseChaining, 3.5);
feature_multipliers.insert(QueryFeature::FullTextSearch, 1.5);
feature_multipliers.insert(QueryFeature::TerminologySearch, 2.0);
Self {
base_costs,
feature_multipliers,
weights: CostWeights::default(),
}
}
}
/// Health-check settings; every field has a serde default.
///
/// Durations are (de)serialized as human-readable strings through the local
/// `humantime_serde` module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthConfig {
/// Check interval; defaults to 30 s.
#[serde(with = "humantime_serde", default = "default_health_interval")]
pub check_interval: Duration,
/// Check timeout; defaults to 5 s.
#[serde(with = "humantime_serde", default = "default_health_timeout")]
pub timeout: Duration,
/// Defaults to 3.
/// NOTE(review): presumably failures before a backend is marked unhealthy — confirm with the health checker.
#[serde(default = "default_failure_threshold")]
pub failure_threshold: u32,
/// Defaults to 2.
/// NOTE(review): presumably successes before a backend is marked healthy again — confirm with the health checker.
#[serde(default = "default_success_threshold")]
pub success_threshold: u32,
}
/// Default for `HealthConfig::check_interval`: 30 seconds.
fn default_health_interval() -> Duration {
    const DEFAULT_HEALTH_INTERVAL: Duration = Duration::from_secs(30);
    DEFAULT_HEALTH_INTERVAL
}
/// Default for `HealthConfig::timeout`: 5 seconds.
fn default_health_timeout() -> Duration {
    const DEFAULT_HEALTH_TIMEOUT: Duration = Duration::from_secs(5);
    DEFAULT_HEALTH_TIMEOUT
}
/// Default for `HealthConfig::failure_threshold`.
fn default_failure_threshold() -> u32 {
    const DEFAULT_FAILURE_THRESHOLD: u32 = 3;
    DEFAULT_FAILURE_THRESHOLD
}
/// Default for `HealthConfig::success_threshold`.
fn default_success_threshold() -> u32 {
    const DEFAULT_SUCCESS_THRESHOLD: u32 = 2;
    DEFAULT_SUCCESS_THRESHOLD
}
impl Default for HealthConfig {
fn default() -> Self {
Self {
check_interval: default_health_interval(),
timeout: default_health_timeout(),
failure_threshold: default_failure_threshold(),
success_threshold: default_success_threshold(),
}
}
}
/// Complete composite-storage configuration: backends, routing rules and
/// the sync, cost and health settings.
#[derive(Debug, Clone)]
pub struct CompositeConfig {
/// All configured backends, including disabled ones.
pub backends: Vec<BackendEntry>,
/// Routing rules; `validate` requires each target to name an existing backend.
pub routing_rules: Vec<RoutingRule>,
/// Synchronization settings.
pub sync_config: SyncConfig,
/// Cost-model settings.
pub cost_config: CostConfig,
/// Health-check settings.
pub health_config: HealthConfig,
}
impl CompositeConfig {
    /// Creates an empty configuration with default sync, cost and health
    /// settings. An empty configuration does not pass [`CompositeConfig::validate`]
    /// because it has no primary backend.
    pub fn new() -> Self {
        Self {
            backends: Vec::new(),
            routing_rules: Vec::new(),
            sync_config: SyncConfig::default(),
            cost_config: CostConfig::default(),
            health_config: HealthConfig::default(),
        }
    }

    /// Returns a builder for assembling and validating a configuration.
    pub fn builder() -> CompositeConfigBuilder {
        CompositeConfigBuilder::new()
    }

    /// Returns the first enabled backend with the primary role, if any.
    pub fn primary(&self) -> Option<&BackendEntry> {
        self.backends
            .iter()
            .find(|b| b.role.is_primary() && b.enabled)
    }

    /// Returns the ID of the enabled primary backend, if any.
    pub fn primary_id(&self) -> Option<&str> {
        self.primary().map(|b| b.id.as_str())
    }

    /// Iterates over the enabled backends whose role is not primary.
    pub fn secondaries(&self) -> impl Iterator<Item = &BackendEntry> {
        self.backends
            .iter()
            .filter(|b| b.role.is_secondary() && b.enabled)
    }

    /// Looks up a backend by ID, regardless of whether it is enabled.
    pub fn backend(&self, id: &str) -> Option<&BackendEntry> {
        self.backends.iter().find(|b| b.id == id)
    }

    /// Iterates over the enabled backends that have the given role.
    pub fn backends_with_role(&self, role: BackendRole) -> impl Iterator<Item = &BackendEntry> {
        self.backends
            .iter()
            .filter(move |b| b.role == role && b.enabled)
    }

    /// Iterates over the enabled backends that support `capability`.
    pub fn backends_with_capability(
        &self,
        capability: BackendCapability,
    ) -> impl Iterator<Item = &BackendEntry> {
        self.backends
            .iter()
            .filter(move |b| b.enabled && b.supports(capability))
    }

    /// Checks the configuration for structural problems.
    ///
    /// Checks run in this order and the first failure is returned:
    /// exactly one enabled primary backend, unique backend IDs, failover
    /// references naming existing backends, routing rules targeting existing
    /// backends.
    ///
    /// # Errors
    ///
    /// Returns the [`ConfigError`] describing the first failed check.
    ///
    /// # Returns
    ///
    /// On success, the non-fatal [`ConfigWarning`]s: no enabled secondary
    /// backends, and/or more than one enabled backend offering full-text
    /// search.
    pub fn validate(&self) -> Result<Vec<ConfigWarning>, ConfigError> {
        let primaries: Vec<&BackendEntry> = self
            .backends
            .iter()
            .filter(|b| b.role.is_primary() && b.enabled)
            .collect();
        if primaries.is_empty() {
            return Err(ConfigError::NoPrimaryBackend);
        }
        if primaries.len() > 1 {
            return Err(ConfigError::MultiplePrimaryBackends(
                primaries.iter().map(|b| b.id.clone()).collect(),
            ));
        }

        // Index every backend ID once. The set both detects duplicates and
        // answers the existence checks below in O(1), instead of a linear
        // scan of `backends` per reference.
        let mut known_ids: std::collections::HashSet<&str> =
            std::collections::HashSet::with_capacity(self.backends.len());
        for backend in &self.backends {
            if !known_ids.insert(backend.id.as_str()) {
                return Err(ConfigError::DuplicateBackendId(backend.id.clone()));
            }
        }

        // NOTE(review): existence is checked against all backends, enabled or
        // not, and a backend naming itself as failover target is accepted.
        for backend in &self.backends {
            if let Some(failover_id) = backend.failover_to.as_deref() {
                if !known_ids.contains(failover_id) {
                    return Err(ConfigError::InvalidFailoverReference {
                        backend_id: backend.id.clone(),
                        failover_id: failover_id.to_owned(),
                    });
                }
            }
        }

        for rule in &self.routing_rules {
            if !known_ids.contains(rule.target_backend.as_str()) {
                return Err(ConfigError::InvalidRoutingTarget {
                    rule_id: rule.id.clone(),
                    target_id: rule.target_backend.clone(),
                });
            }
        }

        let mut warnings = Vec::new();
        // `next().is_none()` stops at the first secondary instead of counting all.
        if self.secondaries().next().is_none() {
            warnings.push(ConfigWarning::NoSecondaryBackends);
        }

        let full_text_ids: Vec<String> = self
            .backends_with_capability(BackendCapability::FullTextSearch)
            .map(|b| b.id.clone())
            .collect();
        if full_text_ids.len() > 1 {
            warnings.push(ConfigWarning::RedundantCapability {
                capability: BackendCapability::FullTextSearch,
                backends: full_text_ids,
            });
        }

        Ok(warnings)
    }
}
impl Default for CompositeConfig {
fn default() -> Self {
Self::new()
}
}
/// Builder that accumulates the parts of a `CompositeConfig` and validates
/// them when `build` or `build_with_warnings` is called.
#[derive(Debug, Default)]
pub struct CompositeConfigBuilder {
// Backends in the order they were added.
backends: Vec<BackendEntry>,
// Routing rules in the order they were added.
routing_rules: Vec<RoutingRule>,
// Starts as `SyncConfig::default()`.
sync_config: SyncConfig,
// Starts as `CostConfig::default()`.
cost_config: CostConfig,
// Starts as `HealthConfig::default()`.
health_config: HealthConfig,
}
impl CompositeConfigBuilder {
    /// Creates a builder with no backends or rules and default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds a fully specified backend entry.
    pub fn with_backend(mut self, backend: BackendEntry) -> Self {
        self.backends.push(backend);
        self
    }

    /// Adds a backend with the primary role and default entry settings.
    pub fn primary(self, id: impl Into<String>, kind: BackendKind) -> Self {
        self.with_backend(BackendEntry::new(id, BackendRole::Primary, kind))
    }

    /// Adds a backend with the search role and default entry settings.
    pub fn search_backend(self, id: impl Into<String>, kind: BackendKind) -> Self {
        self.with_backend(BackendEntry::new(id, BackendRole::Search, kind))
    }

    /// Adds a backend with the graph role and default entry settings.
    pub fn graph_backend(self, id: impl Into<String>, kind: BackendKind) -> Self {
        self.with_backend(BackendEntry::new(id, BackendRole::Graph, kind))
    }

    /// Adds a backend with the terminology role and default entry settings.
    pub fn terminology_backend(self, id: impl Into<String>, kind: BackendKind) -> Self {
        self.with_backend(BackendEntry::new(id, BackendRole::Terminology, kind))
    }

    /// Adds a backend with the archive role and default entry settings.
    ///
    /// Completes the set of per-role convenience methods so every
    /// `BackendRole` variant has one.
    pub fn archive_backend(self, id: impl Into<String>, kind: BackendKind) -> Self {
        self.with_backend(BackendEntry::new(id, BackendRole::Archive, kind))
    }

    /// Adds a routing rule.
    pub fn with_routing_rule(mut self, rule: RoutingRule) -> Self {
        self.routing_rules.push(rule);
        self
    }

    /// Sets only the sync mode, leaving the other sync settings untouched.
    pub fn sync_mode(mut self, mode: SyncMode) -> Self {
        self.sync_config.mode = mode;
        self
    }

    /// Replaces the whole sync configuration (including any mode set earlier).
    pub fn with_sync_config(mut self, config: SyncConfig) -> Self {
        self.sync_config = config;
        self
    }

    /// Replaces the cost configuration.
    pub fn with_cost_config(mut self, config: CostConfig) -> Self {
        self.cost_config = config;
        self
    }

    /// Replaces the health configuration.
    pub fn with_health_config(mut self, config: HealthConfig) -> Self {
        self.health_config = config;
        self
    }

    /// Builds and validates the configuration, discarding any warnings.
    ///
    /// # Errors
    ///
    /// Returns the [`ConfigError`] reported by `CompositeConfig::validate`.
    pub fn build(self) -> Result<CompositeConfig, ConfigError> {
        // Single construction/validation path shared with `build_with_warnings`.
        self.build_with_warnings()
            .map(|(config, _warnings)| config)
    }

    /// Builds and validates the configuration, returning it together with
    /// the non-fatal warnings produced by validation.
    ///
    /// # Errors
    ///
    /// Returns the [`ConfigError`] reported by `CompositeConfig::validate`.
    pub fn build_with_warnings(self) -> Result<(CompositeConfig, Vec<ConfigWarning>), ConfigError> {
        let config = CompositeConfig {
            backends: self.backends,
            routing_rules: self.routing_rules,
            sync_config: self.sync_config,
            cost_config: self.cost_config,
            health_config: self.health_config,
        };
        let warnings = config.validate()?;
        Ok((config, warnings))
    }
}
/// Fatal configuration problems reported by `CompositeConfig::validate`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ConfigError {
/// No enabled backend has the primary role.
#[error("no primary backend configured - exactly one primary backend is required")]
NoPrimaryBackend,
/// More than one enabled backend has the primary role; carries their IDs.
#[error("multiple primary backends configured: {0:?} - only one primary is allowed")]
MultiplePrimaryBackends(Vec<String>),
/// Two backends share the same ID; carries that ID.
#[error("duplicate backend ID: {0}")]
DuplicateBackendId(String),
/// A backend's `failover_to` names an ID that no backend has.
#[error("backend '{backend_id}' references non-existent failover backend '{failover_id}'")]
InvalidFailoverReference {
backend_id: String,
failover_id: String,
},
/// A routing rule's `target_backend` names an ID that no backend has.
#[error("routing rule '{rule_id}' targets non-existent backend '{target_id}'")]
InvalidRoutingTarget {
rule_id: String,
target_id: String,
},
}
/// Non-fatal findings returned by a successful `CompositeConfig::validate`.
#[derive(Debug, Clone)]
pub enum ConfigWarning {
/// No enabled backend has a non-primary role.
NoSecondaryBackends,
/// More than one enabled backend supports the same capability
/// (`validate` currently checks full-text search only).
RedundantCapability {
// The capability offered more than once.
capability: BackendCapability,
// IDs of the enabled backends offering it.
backends: Vec<String>,
},
}
impl std::fmt::Display for ConfigWarning {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ConfigWarning::NoSecondaryBackends => {
write!(
f,
"no secondary backends configured - using primary for all operations"
)
}
ConfigWarning::RedundantCapability {
capability,
backends,
} => {
write!(
f,
"capability {:?} is provided by multiple backends: {:?}",
capability, backends
)
}
}
}
}
mod humantime_serde {
use serde::{Deserialize, Deserializer, Serializer};
use std::time::Duration;
pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&humantime::format_duration(*duration).to_string())
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
humantime::parse_duration(&s).map_err(serde::de::Error::custom)
}
}
// Unit tests for role capabilities, the builder and configuration validation.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_backend_role_capabilities() {
        let primary_caps = BackendRole::Primary.typical_capabilities();
        assert!(primary_caps.contains(&BackendCapability::Crud));
        assert!(primary_caps.contains(&BackendCapability::Versioning));
        let search_caps = BackendRole::Search.typical_capabilities();
        assert!(search_caps.contains(&BackendCapability::FullTextSearch));
        let graph_caps = BackendRole::Graph.typical_capabilities();
        assert!(graph_caps.contains(&BackendCapability::ChainedSearch));
    }

    #[test]
    fn test_config_builder_minimal() {
        let config = CompositeConfigBuilder::new()
            .primary("sqlite", BackendKind::Sqlite)
            .build()
            .unwrap();
        assert_eq!(config.backends.len(), 1);
        assert!(config.primary().is_some());
        assert_eq!(config.primary_id(), Some("sqlite"));
    }

    #[test]
    fn test_config_builder_with_secondaries() {
        let config = CompositeConfigBuilder::new()
            .primary("pg", BackendKind::Postgres)
            .search_backend("es", BackendKind::Elasticsearch)
            .graph_backend("neo4j", BackendKind::Neo4j)
            .build()
            .unwrap();
        assert_eq!(config.backends.len(), 3);
        assert_eq!(config.secondaries().count(), 2);
    }

    #[test]
    fn test_config_validation_no_primary() {
        let result = CompositeConfigBuilder::new()
            .search_backend("es", BackendKind::Elasticsearch)
            .build();
        assert!(matches!(result, Err(ConfigError::NoPrimaryBackend)));
    }

    #[test]
    fn test_config_validation_multiple_primaries() {
        let result = CompositeConfigBuilder::new()
            .primary("pg1", BackendKind::Postgres)
            .primary("pg2", BackendKind::Postgres)
            .build();
        assert!(matches!(
            result,
            Err(ConfigError::MultiplePrimaryBackends(_))
        ));
    }

    // A disabled primary must not satisfy the "exactly one primary" rule.
    #[test]
    fn test_config_validation_disabled_primary_is_ignored() {
        let mut entry = BackendEntry::new("pg", BackendRole::Primary, BackendKind::Postgres);
        entry.enabled = false;
        let result = CompositeConfigBuilder::new().with_backend(entry).build();
        assert!(matches!(result, Err(ConfigError::NoPrimaryBackend)));
    }

    #[test]
    fn test_config_validation_duplicate_backend_id() {
        let result = CompositeConfigBuilder::new()
            .primary("db", BackendKind::Postgres)
            .search_backend("db", BackendKind::Elasticsearch)
            .build();
        assert!(matches!(
            result,
            Err(ConfigError::DuplicateBackendId(ref id)) if id == "db"
        ));
    }

    #[test]
    fn test_config_validation_invalid_failover_reference() {
        let primary = BackendEntry::new("pg", BackendRole::Primary, BackendKind::Postgres)
            .with_failover("missing");
        let result = CompositeConfigBuilder::new().with_backend(primary).build();
        assert!(matches!(
            result,
            Err(ConfigError::InvalidFailoverReference { .. })
        ));
    }

    #[test]
    fn test_config_validation_invalid_routing_target() {
        let result = CompositeConfigBuilder::new()
            .primary("pg", BackendKind::Postgres)
            .with_routing_rule(RoutingRule::new("rule-1", "missing"))
            .build();
        assert!(matches!(
            result,
            Err(ConfigError::InvalidRoutingTarget { .. })
        ));
    }

    #[test]
    fn test_config_warning_no_secondaries() {
        let (_config, warnings) = CompositeConfigBuilder::new()
            .primary("sqlite", BackendKind::Sqlite)
            .build_with_warnings()
            .unwrap();
        assert_eq!(warnings.len(), 1);
        assert!(matches!(warnings[0], ConfigWarning::NoSecondaryBackends));
    }

    #[test]
    fn test_config_warning_redundant_full_text_search() {
        let (_config, warnings) = CompositeConfigBuilder::new()
            .primary("pg", BackendKind::Postgres)
            .search_backend("es1", BackendKind::Elasticsearch)
            .search_backend("es2", BackendKind::Elasticsearch)
            .build_with_warnings()
            .unwrap();
        match &warnings[..] {
            [ConfigWarning::RedundantCapability {
                capability,
                backends,
            }] => {
                assert_eq!(*capability, BackendCapability::FullTextSearch);
                assert_eq!(backends, &vec!["es1".to_string(), "es2".to_string()]);
            }
            other => panic!("unexpected warnings: {:?}", other),
        }
    }

    #[test]
    fn test_backend_entry_effective_capabilities() {
        let entry = BackendEntry::new("test", BackendRole::Search, BackendKind::Elasticsearch);
        let caps = entry.effective_capabilities();
        assert!(caps.contains(&BackendCapability::FullTextSearch));
        let entry_explicit =
            BackendEntry::new("test", BackendRole::Search, BackendKind::Elasticsearch)
                .with_capabilities(vec![BackendCapability::BasicSearch]);
        let caps_explicit = entry_explicit.effective_capabilities();
        assert!(caps_explicit.contains(&BackendCapability::BasicSearch));
        assert!(!caps_explicit.contains(&BackendCapability::FullTextSearch));
    }

    // `supports` must agree with `effective_capabilities` in both the
    // role-fallback and the explicit-list case.
    #[test]
    fn test_backend_entry_supports() {
        let by_role = BackendEntry::new("es", BackendRole::Search, BackendKind::Elasticsearch);
        assert!(by_role.supports(BackendCapability::FullTextSearch));
        assert!(!by_role.supports(BackendCapability::Crud));
        let explicit = BackendEntry::new("es", BackendRole::Search, BackendKind::Elasticsearch)
            .with_capabilities(vec![BackendCapability::BasicSearch]);
        assert!(explicit.supports(BackendCapability::BasicSearch));
        assert!(!explicit.supports(BackendCapability::FullTextSearch));
    }
}