use super::{
ConsistencyLevel, ParsedHints, RouteTarget, RoutingConfig, RoutingError, Result,
};
use std::time::Duration;
/// Description of a single node as seen by the routing layer.
///
/// `NodeFilter::filter` reads these fields to decide whether a node is
/// eligible to serve a request.
#[derive(Debug, Clone)]
pub struct NodeInfo {
/// Node name. Compared with `NodeCriteria::node_name` and with the member
/// names of a resolved alias.
pub name: String,
/// Topology role; `RouteTarget::Primary` and `RouteTarget::Standby` select on it.
pub role: NodeRole,
/// Sync mode; `RouteTarget::Sync`, `SemiSync` and `Async` select on it.
pub sync_mode: SyncMode,
/// Lag in milliseconds, compared against max-lag limits during filtering.
/// NOTE(review): presumably replication lag behind the primary — confirm
/// with whatever populates this struct.
pub lag_ms: u64,
/// Nodes with `healthy == false` are dropped before any other filter runs.
pub healthy: bool,
/// Nodes with `enabled == false` are dropped before any other filter runs.
pub enabled: bool,
/// Both constructors set this to 100. Not read anywhere in this file;
/// presumably a load-balancing weight used elsewhere — TODO confirm.
pub weight: u32,
/// Free-form labels. The `"vector"` tag marks nodes selectable by
/// `RouteTarget::Vector`; other tags are matched via `NodeCriteria::required_tags`.
pub tags: Vec<String>,
/// Optional zone label, compared with the filter's local zone when the
/// route target is `RouteTarget::Local`.
pub zone: Option<String>,
}
impl NodeInfo {
    /// Builds a healthy, enabled primary node called `name`.
    ///
    /// Role and sync mode are both `Primary`; lag is 0, weight is 100, and the
    /// node starts with no tags and no zone.
    pub fn primary(name: &str) -> Self {
        // Everything except the role is shared with the standby defaults.
        Self {
            role: NodeRole::Primary,
            ..Self::standby(name, SyncMode::Primary)
        }
    }

    /// Builds a healthy, enabled standby node called `name` with the given
    /// `sync_mode`.
    ///
    /// Lag is 0, weight is 100, and the node starts with no tags and no zone.
    pub fn standby(name: &str, sync_mode: SyncMode) -> Self {
        let name = name.to_owned();
        Self {
            name,
            role: NodeRole::Standby,
            sync_mode,
            lag_ms: 0,
            healthy: true,
            enabled: true,
            weight: 100,
            tags: Vec::new(),
            zone: None,
        }
    }

    /// Returns the node with its lag replaced by `lag_ms` (milliseconds).
    pub fn with_lag(self, lag_ms: u64) -> Self {
        Self { lag_ms, ..self }
    }

    /// Returns the node with its tag list replaced (not extended) by `tags`.
    pub fn with_tags(self, tags: Vec<String>) -> Self {
        Self { tags, ..self }
    }

    /// Returns the node with its zone set to `zone`.
    pub fn with_zone(self, zone: &str) -> Self {
        Self {
            zone: Some(zone.to_owned()),
            ..self
        }
    }

    /// Reports whether `tag` is present in the node's tag list (exact,
    /// case-sensitive comparison).
    pub fn has_tag(&self, tag: &str) -> bool {
        self.tags
            .iter()
            .map(String::as_str)
            .any(|candidate| candidate == tag)
    }
}
/// Role a node plays in the cluster topology.
///
/// Derives `Hash` in addition to the equality traits so the role can be used
/// directly as a `HashMap`/`HashSet` key (e.g. for per-role bookkeeping).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NodeRole {
    /// Selected by `RouteTarget::Primary`; also the role set by `NodeInfo::primary`.
    Primary,
    /// Selected by `RouteTarget::Standby`; also the role set by `NodeInfo::standby`.
    Standby,
    /// Not matched by any role-based route target in this file; such nodes are
    /// only reachable through sync-mode, `Any`, `Local` or `Vector` targets.
    ReadReplica,
}
/// How a node is kept in sync, as consulted by route-target matching.
///
/// Derives `Hash` in addition to the equality traits so the mode can be used
/// directly as a `HashMap`/`HashSet` key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SyncMode {
    /// The mode assigned to the primary node itself (see `NodeInfo::primary`).
    Primary,
    /// Matches `RouteTarget::Sync` and `RouteTarget::Standby`.
    Sync,
    /// Matches `RouteTarget::SemiSync` and `RouteTarget::Standby`.
    SemiSync,
    /// Matches `RouteTarget::Async` and `RouteTarget::Standby`.
    Async,
}
impl SyncMode {
    /// Reports whether a node in this sync mode satisfies the route `target`.
    ///
    /// * `Primary`, `Sync`, `SemiSync`, `Async` require the identical mode.
    /// * `Standby` accepts `Sync`, `SemiSync` and `Async`.
    /// * `Any`, `Local` and `Vector` accept every mode (they place no
    ///   constraint on the sync mode).
    pub fn matches_target(&self, target: RouteTarget) -> bool {
        // Resolve the single mode an exact-match target demands; the remaining
        // targets are answered directly.
        let required = match target {
            RouteTarget::Any | RouteTarget::Local | RouteTarget::Vector => return true,
            RouteTarget::Standby => {
                return matches!(
                    self,
                    SyncMode::Sync | SyncMode::SemiSync | SyncMode::Async
                );
            }
            RouteTarget::Primary => SyncMode::Primary,
            RouteTarget::Sync => SyncMode::Sync,
            RouteTarget::SemiSync => SyncMode::SemiSync,
            RouteTarget::Async => SyncMode::Async,
        };
        *self == required
    }
}
/// Narrows a list of nodes down to those eligible for a request.
///
/// Built with `NodeFilter::new` and optionally `with_local_zone`; the work is
/// done by `NodeFilter::filter`.
#[derive(Debug)]
pub struct NodeFilter {
/// Routing configuration; consulted for consistency-level settings, alias
/// resolution and the default read/write targets.
config: RoutingConfig,
/// Zone preferred when the route target is `RouteTarget::Local`; `None`
/// disables the local-zone preference.
local_zone: Option<String>,
}
impl NodeFilter {
    /// Creates a filter backed by `config`, with no local zone configured.
    pub fn new(config: RoutingConfig) -> Self {
        Self {
            config,
            local_zone: None,
        }
    }

    /// Sets the zone that `RouteTarget::Local` requests should prefer.
    pub fn with_local_zone(mut self, zone: &str) -> Self {
        self.local_zone = Some(zone.to_string());
        self
    }

    /// Selects the nodes in `nodes` that satisfy `criteria`.
    ///
    /// Unhealthy or disabled nodes are dropped first (no reason is recorded for
    /// that step). The remaining filters then run in this order, each one only
    /// if the corresponding criterion is set:
    ///
    /// 1. exact node name,
    /// 2. route target,
    /// 3. consistency level (using `criteria.max_lag` as a lag override),
    /// 4. maximum lag,
    /// 5. required tags (a node must carry all of them),
    /// 6. local-zone preference for `RouteTarget::Local` — applied only when a
    ///    local zone is configured and at least one eligible node is in it,
    ///    otherwise the eligible set is left untouched,
    /// 7. `"vector"` tag preference for `RouteTarget::Vector`,
    /// 8. alias membership.
    ///
    /// A human-readable entry is appended to `FilterResult::reasons` for every
    /// step that actually removed nodes (steps 6 and 7 record one whenever
    /// they apply). `FilterResult::fallback_used` is always `false` here.
    ///
    /// An alias that the configuration cannot resolve is ignored: no nodes are
    /// removed and no reason is recorded.
    /// NOTE(review): confirm that silently ignoring an unknown alias is intended.
    pub fn filter<'a>(
        &self,
        nodes: &'a [NodeInfo],
        criteria: &NodeCriteria,
    ) -> FilterResult<'a> {
        let mut eligible: Vec<&NodeInfo> = nodes
            .iter()
            .filter(|n| n.healthy && n.enabled)
            .collect();
        let mut reasons = Vec::new();

        if let Some(ref name) = criteria.node_name {
            Self::retain_with_reason(
                &mut eligible,
                &mut reasons,
                |n| n.name == *name,
                || format!("Filtered to node: {}", name),
            );
        }

        if let Some(target) = criteria.route {
            Self::retain_with_reason(
                &mut eligible,
                &mut reasons,
                |n| self.matches_route_target(n, target),
                || format!("Filtered by route target: {:?}", target),
            );
        }

        if let Some(consistency) = criteria.consistency {
            Self::retain_with_reason(
                &mut eligible,
                &mut reasons,
                |n| self.meets_consistency(n, consistency, criteria.max_lag),
                || format!("Filtered by consistency: {:?}", consistency),
            );
        }

        if let Some(max_lag) = criteria.max_lag {
            let max_lag_ms = Self::saturating_millis(max_lag);
            Self::retain_with_reason(
                &mut eligible,
                &mut reasons,
                |n| n.lag_ms <= max_lag_ms,
                || format!("Filtered by max lag: {}ms", max_lag_ms),
            );
        }

        if !criteria.required_tags.is_empty() {
            Self::retain_with_reason(
                &mut eligible,
                &mut reasons,
                |n| criteria.required_tags.iter().all(|tag| n.has_tag(tag)),
                || format!("Filtered by tags: {:?}", criteria.required_tags),
            );
        }

        // Local routing is a preference, not a requirement: only narrow the set
        // when doing so leaves at least one node.
        if criteria.route == Some(RouteTarget::Local) {
            if let Some(ref local_zone) = self.local_zone {
                let local_nodes: Vec<_> = eligible
                    .iter()
                    .filter(|n| n.zone.as_ref() == Some(local_zone))
                    .copied()
                    .collect();
                if !local_nodes.is_empty() {
                    eligible = local_nodes;
                    reasons.push(format!("Preferred local zone: {}", local_zone));
                }
            }
        }

        // The route-target step above already requires the "vector" tag, so this
        // pass does not remove anything further; it records the reason whenever
        // vector-capable nodes remain.
        if criteria.route == Some(RouteTarget::Vector) {
            let vector_nodes: Vec<_> = eligible
                .iter()
                .filter(|n| n.has_tag("vector"))
                .copied()
                .collect();
            if !vector_nodes.is_empty() {
                eligible = vector_nodes;
                reasons.push("Filtered to vector-capable nodes".to_string());
            }
        }

        if let Some(ref alias) = criteria.alias {
            if let Some(alias_nodes) = self.config.resolve_alias(alias) {
                Self::retain_with_reason(
                    &mut eligible,
                    &mut reasons,
                    |n| alias_nodes.contains(&n.name),
                    || format!("Resolved alias: {}", alias),
                );
            }
        }

        FilterResult {
            eligible,
            reasons,
            fallback_used: false,
        }
    }

    /// Keeps only the nodes for which `keep` returns `true`; if that removed at
    /// least one node, appends the message produced by `reason` to `reasons`.
    fn retain_with_reason(
        eligible: &mut Vec<&NodeInfo>,
        reasons: &mut Vec<String>,
        mut keep: impl FnMut(&NodeInfo) -> bool,
        reason: impl FnOnce() -> String,
    ) {
        let count_before = eligible.len();
        eligible.retain(|node| keep(*node));
        if eligible.len() < count_before {
            reasons.push(reason());
        }
    }

    /// Converts `duration` to whole milliseconds, saturating at `u64::MAX`.
    ///
    /// `Duration::as_millis` returns a `u128`; a plain `as u64` cast would
    /// silently wrap for very large durations and turn a huge lag allowance
    /// into a tiny one.
    fn saturating_millis(duration: Duration) -> u64 {
        // The value is clamped to the u64 range first, so the cast is lossless.
        duration.as_millis().min(u128::from(u64::MAX)) as u64
    }

    /// Reports whether `node` satisfies the route `target`.
    ///
    /// `Primary`/`Standby` compare the node's role, `Sync`/`SemiSync`/`Async`
    /// compare its sync mode, `Vector` requires the `"vector"` tag, and
    /// `Any`/`Local` accept every node (the local-zone preference is applied
    /// separately in `filter`).
    fn matches_route_target(&self, node: &NodeInfo, target: RouteTarget) -> bool {
        match target {
            RouteTarget::Primary => node.role == NodeRole::Primary,
            RouteTarget::Standby => node.role == NodeRole::Standby,
            RouteTarget::Sync => node.sync_mode == SyncMode::Sync,
            RouteTarget::SemiSync => node.sync_mode == SyncMode::SemiSync,
            RouteTarget::Async => node.sync_mode == SyncMode::Async,
            RouteTarget::Any => true,
            RouteTarget::Local => true,
            RouteTarget::Vector => node.has_tag("vector"),
        }
    }

    /// Reports whether `node` is acceptable for consistency `level`.
    ///
    /// Returns `true` when the configuration has no entry for `level`.
    /// Otherwise the node must be allowed by that entry — either by its name or
    /// by its lower-cased role name (e.g. `"primary"`, `"standby"`,
    /// `"readreplica"`) — and its lag must not exceed the limit. The limit is
    /// `max_lag` when given, else the entry's `max_lag_ms`; a limit of
    /// `u64::MAX` never rejects a node.
    fn meets_consistency(
        &self,
        node: &NodeInfo,
        level: ConsistencyLevel,
        max_lag: Option<Duration>,
    ) -> bool {
        let config = match self.config.get_consistency_config(level) {
            Some(c) => c,
            None => return true,
        };

        // The role string is only built when the name itself is not allowed.
        if !config.allows_node(&node.name)
            && !config.allows_node(&format!("{:?}", node.role).to_lowercase())
        {
            return false;
        }

        let max_lag_ms = max_lag
            .map(Self::saturating_millis)
            .unwrap_or(config.max_lag_ms);

        // `lag_ms <= u64::MAX` always holds, so the "unlimited" case needs no
        // separate branch.
        node.lag_ms <= max_lag_ms
    }

    /// Criteria for a read with no hints: the configured default read target
    /// and default consistency level; every other criterion is unset.
    pub fn default_criteria_for_read(&self) -> NodeCriteria {
        NodeCriteria {
            route: Some(self.config.default.read_target),
            consistency: Some(self.config.default.consistency),
            ..Default::default()
        }
    }

    /// Criteria for a write with no hints: the configured default write target
    /// with `ConsistencyLevel::Strong`; every other criterion is unset.
    pub fn default_criteria_for_write(&self) -> NodeCriteria {
        NodeCriteria {
            route: Some(self.config.default.write_target),
            consistency: Some(ConsistencyLevel::Strong),
            ..Default::default()
        }
    }
}
/// Constraints a node must satisfy to be selected by `NodeFilter::filter`.
///
/// Every criterion is optional; the `Default` value imposes no constraints.
#[derive(Debug, Clone, Default)]
pub struct NodeCriteria {
/// When set, only the node with exactly this name is kept.
pub node_name: Option<String>,
/// When set, only nodes matching this route target are kept.
pub route: Option<RouteTarget>,
/// When set, nodes must satisfy the configuration for this consistency level.
pub consistency: Option<ConsistencyLevel>,
/// When set, nodes whose `lag_ms` exceeds this duration are dropped; it also
/// overrides the configured lag limit during the consistency check.
pub max_lag: Option<Duration>,
/// Tags a node must carry — all of them — to be kept. Empty means no tag filter.
pub required_tags: Vec<String>,
/// When set and resolvable by the routing configuration, only nodes named in
/// the alias are kept.
pub alias: Option<String>,
/// Copied from `ParsedHints::branch` by `from_hints`; not consulted by
/// `NodeFilter::filter` in this file.
pub branch: Option<String>,
}
impl NodeCriteria {
    /// Builds criteria from parsed query hints.
    ///
    /// Copies the node name, route, consistency level, maximum lag and branch
    /// from `hints`; required tags start empty and no alias is set.
    pub fn from_hints(hints: &ParsedHints) -> Self {
        Self {
            node_name: hints.node.clone(),
            route: hints.route,
            consistency: hints.consistency,
            max_lag: hints.max_lag,
            branch: hints.branch.clone(),
            // `required_tags` and `alias` keep their empty defaults.
            ..Self::default()
        }
    }

    /// Returns the criteria with `tag` appended to the required tags.
    pub fn with_tag(mut self, tag: &str) -> Self {
        self.required_tags.push(String::from(tag));
        self
    }

    /// Returns the criteria with the alias set to `alias`, replacing any
    /// previous value.
    pub fn with_alias(self, alias: &str) -> Self {
        Self {
            alias: Some(String::from(alias)),
            ..self
        }
    }
}
/// Outcome of `NodeFilter::filter`, borrowing from the node slice that was filtered.
#[derive(Debug)]
pub struct FilterResult<'a> {
/// Nodes that passed every applied filter, in their original slice order.
pub eligible: Vec<&'a NodeInfo>,
/// Human-readable notes describing the filter steps that narrowed the set.
pub reasons: Vec<String>,
/// Always `false` when produced by `NodeFilter::filter`.
/// NOTE(review): presumably set by callers that retry with relaxed criteria — confirm.
pub fallback_used: bool,
}
impl<'a> FilterResult<'a> {
    /// Reports whether at least one node is eligible.
    pub fn has_matches(&self) -> bool {
        self.count() > 0
    }

    /// Number of eligible nodes.
    pub fn count(&self) -> usize {
        self.eligible.len()
    }

    /// The first eligible node, or `None` when nothing matched.
    pub fn first(&self) -> Option<&'a NodeInfo> {
        match self.eligible.as_slice() {
            [head, ..] => Some(*head),
            [] => None,
        }
    }

    /// Returns the first eligible node.
    ///
    /// # Errors
    ///
    /// Returns `RoutingError::NoMatchingNodes` when no node is eligible. The
    /// message has the form `"<context>: reasons: <r1>, <r2>, ..."`, listing
    /// the recorded filter reasons.
    pub fn require_match(&self, context: &str) -> Result<&'a NodeInfo> {
        match self.first() {
            Some(node) => Ok(node),
            None => {
                let joined_reasons = self.reasons.join(", ");
                Err(RoutingError::NoMatchingNodes(format!(
                    "{}: reasons: {}",
                    context, joined_reasons
                )))
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: one primary plus four standbys (one sync, two async with
    /// 500 ms / 5000 ms lag, one async tagged "vector"). All healthy and enabled.
    fn test_nodes() -> Vec<NodeInfo> {
        vec![
            NodeInfo::primary("primary"),
            NodeInfo::standby("standby-sync-1", SyncMode::Sync),
            NodeInfo::standby("standby-async-1", SyncMode::Async).with_lag(500),
            NodeInfo::standby("standby-async-2", SyncMode::Async).with_lag(5000),
            NodeInfo::standby("standby-vector-1", SyncMode::Async)
                .with_tags(vec!["vector".to_string()]),
        ]
    }

    #[test]
    fn test_filter_by_route_target() {
        let filter = NodeFilter::new(RoutingConfig::default());
        let nodes = test_nodes();

        let criteria = NodeCriteria {
            route: Some(RouteTarget::Primary),
            ..Default::default()
        };
        let result = filter.filter(&nodes, &criteria);
        assert_eq!(result.count(), 1);
        assert_eq!(result.first().unwrap().name, "primary");

        let criteria = NodeCriteria {
            route: Some(RouteTarget::Standby),
            ..Default::default()
        };
        let result = filter.filter(&nodes, &criteria);
        assert_eq!(result.count(), 4);
    }

    #[test]
    fn test_filter_by_sync_mode() {
        let filter = NodeFilter::new(RoutingConfig::default());
        let nodes = test_nodes();
        let criteria = NodeCriteria {
            route: Some(RouteTarget::Sync),
            ..Default::default()
        };
        let result = filter.filter(&nodes, &criteria);
        assert_eq!(result.count(), 1);
        assert_eq!(result.first().unwrap().name, "standby-sync-1");
    }

    #[test]
    fn test_filter_by_max_lag() {
        let filter = NodeFilter::new(RoutingConfig::default());
        let nodes = test_nodes();
        let criteria = NodeCriteria {
            max_lag: Some(Duration::from_millis(1000)),
            ..Default::default()
        };
        let result = filter.filter(&nodes, &criteria);
        // Pin the exact outcome: an `all(..)` check on its own would also pass
        // on an empty result. Only the 5000 ms node exceeds the limit.
        assert_eq!(result.count(), 4);
        assert!(result.eligible.iter().all(|n| n.lag_ms <= 1000));
        assert!(result.eligible.iter().all(|n| n.name != "standby-async-2"));
    }

    #[test]
    fn test_filter_by_node_name() {
        let filter = NodeFilter::new(RoutingConfig::default());
        let nodes = test_nodes();
        let criteria = NodeCriteria {
            node_name: Some("standby-sync-1".to_string()),
            ..Default::default()
        };
        let result = filter.filter(&nodes, &criteria);
        assert_eq!(result.count(), 1);
        assert_eq!(result.first().unwrap().name, "standby-sync-1");
    }

    /// Exercises the `RouteTarget::Vector` route, which selects on the "vector" tag.
    #[test]
    fn test_filter_by_tag() {
        let filter = NodeFilter::new(RoutingConfig::default());
        let nodes = test_nodes();
        let criteria = NodeCriteria {
            route: Some(RouteTarget::Vector),
            ..Default::default()
        };
        let result = filter.filter(&nodes, &criteria);
        assert_eq!(result.count(), 1);
        assert_eq!(result.first().unwrap().name, "standby-vector-1");
    }

    /// Exercises `required_tags` directly, independent of any route target.
    #[test]
    fn test_filter_by_required_tags() {
        let filter = NodeFilter::new(RoutingConfig::default());
        let nodes = test_nodes();
        let criteria = NodeCriteria::default().with_tag("vector");
        let result = filter.filter(&nodes, &criteria);
        assert_eq!(result.count(), 1);
        assert_eq!(result.first().unwrap().name, "standby-vector-1");
    }

    /// Unhealthy or disabled nodes must never be eligible, even with no criteria.
    #[test]
    fn test_unhealthy_or_disabled_nodes_are_excluded() {
        let filter = NodeFilter::new(RoutingConfig::default());
        let mut nodes = test_nodes();
        nodes[0].healthy = false;
        nodes[1].enabled = false;
        let result = filter.filter(&nodes, &NodeCriteria::default());
        assert_eq!(result.count(), 3);
        assert!(result.eligible.iter().all(|n| n.healthy && n.enabled));
    }

    #[test]
    fn test_filter_with_alias() {
        let mut config = RoutingConfig::default();
        config.add_alias(
            "analytics",
            vec![
                "standby-async-1".to_string(),
                "standby-async-2".to_string(),
            ],
        );
        let filter = NodeFilter::new(config);
        let nodes = test_nodes();
        let criteria = NodeCriteria {
            alias: Some("analytics".to_string()),
            ..Default::default()
        };
        let result = filter.filter(&nodes, &criteria);
        assert_eq!(result.count(), 2);
    }

    #[test]
    fn test_local_zone_preference() {
        let filter = NodeFilter::new(RoutingConfig::default()).with_local_zone("us-west-1");
        let nodes = vec![
            NodeInfo::standby("standby-1", SyncMode::Async).with_zone("us-east-1"),
            NodeInfo::standby("standby-2", SyncMode::Async).with_zone("us-west-1"),
        ];
        let criteria = NodeCriteria {
            route: Some(RouteTarget::Local),
            ..Default::default()
        };
        let result = filter.filter(&nodes, &criteria);
        assert_eq!(result.count(), 1);
        assert_eq!(result.first().unwrap().name, "standby-2");
    }

    #[test]
    fn test_no_match_error() {
        let filter = NodeFilter::new(RoutingConfig::default());
        let nodes = test_nodes();
        let criteria = NodeCriteria {
            node_name: Some("nonexistent".to_string()),
            ..Default::default()
        };
        let result = filter.filter(&nodes, &criteria);
        assert!(!result.has_matches());
        let err = result.require_match("test context");
        assert!(err.is_err());
    }

    #[test]
    fn test_from_hints() {
        let parser = super::super::HintParser::new();
        let hints = parser.parse("/*helios:route=sync,lag=100ms*/ SELECT 1");
        let criteria = NodeCriteria::from_hints(&hints);
        assert_eq!(criteria.route, Some(RouteTarget::Sync));
        assert_eq!(criteria.max_lag, Some(Duration::from_millis(100)));
    }
}