use iggy_common::ClusterClient;
use iggy_common::{
ClusterMetadata, ClusterNodeRole, ClusterNodeStatus, IggyError, TransportProtocol,
};
use std::net::SocketAddr;
use std::str::FromStr;
use tracing::{debug, info, warn};
/// Upper bound on recorded leader redirects: `LeaderRedirectionState::can_redirect`
/// returns `true` only while `redirect_count` is strictly below this value.
const MAX_LEADER_REDIRECTS: u8 = 3;
/// Asks the connected node for cluster metadata and decides whether the client
/// should reconnect to a different node.
///
/// # Returns
///
/// * `Ok(Some(address))` - the leader's address, when the metadata names a
///   healthy leader that differs from `current_address`.
/// * `Ok(None)` - no redirect is needed, or the metadata request failed. A
///   failed request is logged as a warning and deliberately not propagated, so
///   the connection continues on the current node.
///
/// # Errors
///
/// Only whatever `process_cluster_metadata` returns; the metadata fetch error
/// itself is swallowed.
pub async fn check_and_redirect_to_leader<C: ClusterClient>(
    client: &C,
    current_address: &str,
    transport: TransportProtocol,
) -> Result<Option<String>, IggyError> {
    debug!("Checking cluster metadata for leader detection");

    // Best-effort lookup: bail out early (without failing) when metadata is unavailable.
    let metadata = match client.get_cluster_metadata().await {
        Ok(fetched) => fetched,
        Err(error) => {
            warn!(
                "Failed to get cluster metadata: {}, connection will continue on server node {}",
                error, current_address
            );
            return Ok(None);
        }
    };

    debug!(
        "Got cluster metadata: {} nodes, cluster: {}",
        metadata.nodes.len(),
        metadata.name
    );
    process_cluster_metadata(&metadata, current_address, transport)
}
/// Inspects cluster metadata and returns the leader address to redirect to, if any.
///
/// Returns `Ok(None)` when:
/// * the cluster reports exactly one node,
/// * no node is both `Leader` and `Healthy`, or
/// * the leader's address for `transport` matches `current_address`.
///
/// Otherwise returns `Ok(Some("<ip>:<port>"))`, where the port is the leader's
/// endpoint for the given `transport`.
fn process_cluster_metadata(
    metadata: &ClusterMetadata,
    current_address: &str,
    transport: TransportProtocol,
) -> Result<Option<String>, IggyError> {
    // A lone node has nobody else to redirect to.
    if metadata.nodes.len() == 1 {
        debug!(
            "Single-node cluster detected ({}), no leader redirection needed",
            metadata.nodes[0].name
        );
        return Ok(None);
    }

    // First node that is both the leader and healthy.
    let healthy_leader = metadata.nodes.iter().find(|node| {
        node.role == ClusterNodeRole::Leader && node.status == ClusterNodeStatus::Healthy
    });

    let leader_node = match healthy_leader {
        Some(node) => node,
        None => {
            warn!(
                "No active leader found in cluster metadata, connection will continue on server node {}",
                current_address
            );
            return Ok(None);
        }
    };

    // Pick the port that corresponds to the transport the client is using.
    let leader_port = match transport {
        TransportProtocol::Tcp => leader_node.endpoints.tcp,
        TransportProtocol::Quic => leader_node.endpoints.quic,
        TransportProtocol::Http => leader_node.endpoints.http,
        TransportProtocol::WebSocket => leader_node.endpoints.websocket,
    };

    // NOTE(review): if `ip` can hold a bare IPv6 literal, this yields an
    // unbracketed "host:port" string - confirm against the metadata producer.
    let leader_address = format!("{}:{}", leader_node.ip, leader_port);
    info!(
        "Found leader node: {} at {} (using {} transport)",
        leader_node.name, leader_address, transport
    );

    if is_same_address(current_address, &leader_address) {
        debug!("Already connected to leader at {}", current_address);
        return Ok(None);
    }

    info!(
        "Current connection to {} is not the leader, will redirect to {}",
        current_address, leader_address
    );
    Ok(Some(leader_address))
}
/// Reports whether two `host:port` strings refer to the same endpoint.
///
/// When both inputs parse as socket addresses, their IP and port are compared.
/// If either one fails to parse, the comparison falls back to string equality
/// of the normalized forms.
fn is_same_address(addr1: &str, addr2: &str) -> bool {
    if let (Some(first), Some(second)) = (parse_address(addr1), parse_address(addr2)) {
        return first.ip() == second.ip() && first.port() == second.port();
    }

    // At least one side is not a parseable socket address: compare as text.
    normalize_address(addr1) == normalize_address(addr2)
}
/// Parses `addr` as a socket address, tolerating a `localhost` host name.
///
/// The input is first parsed verbatim. If that fails, it is lowercased,
/// `localhost` is rewritten to `127.0.0.1` and `[::]` to `[::1]`, and the
/// result is parsed again. Lowercasing makes the `localhost` match
/// case-insensitive (`LOCALHOST:8090`, `LocalHost:8090`, ...).
///
/// Returns `None` when neither attempt yields a valid socket address.
fn parse_address(addr: &str) -> Option<SocketAddr> {
    if let Ok(socket_addr) = SocketAddr::from_str(addr) {
        return Some(socket_addr);
    }

    // Lowercasing is safe here: IP literals parse the same regardless of hex
    // digit case, and it lets any spelling of "localhost" be recognized.
    // A well-formed `[::]:port` already parsed above, so the `[::]` rewrite
    // only affects inputs that failed the direct parse.
    let normalized = addr
        .to_lowercase()
        .replace("localhost", "127.0.0.1")
        .replace("[::]", "[::1]");
    SocketAddr::from_str(&normalized).ok()
}
/// Produces a canonical textual form of an address for string comparison.
///
/// The input is lowercased, then every `localhost` becomes `127.0.0.1` and
/// every `[::]` becomes `[::1]`.
fn normalize_address(addr: &str) -> String {
    let lowered = addr.to_lowercase();
    let with_ipv4_loopback = lowered.replace("localhost", "127.0.0.1");
    with_ipv4_loopback.replace("[::]", "[::1]")
}
/// Bookkeeping for leader redirects: how many have been recorded and where the
/// most recent one pointed.
#[derive(Debug, Clone)]
pub struct LeaderRedirectionState {
    /// Number of redirects recorded since construction or the last `reset`.
    pub redirect_count: u8,
    /// Address passed to the most recent `increment_redirect` call, if any.
    pub last_leader_address: Option<String>,
}

impl LeaderRedirectionState {
    /// Creates a state with no redirects recorded.
    pub fn new() -> Self {
        Self {
            redirect_count: 0,
            last_leader_address: None,
        }
    }

    /// Returns `true` while fewer than `MAX_LEADER_REDIRECTS` redirects have
    /// been recorded.
    pub fn can_redirect(&self) -> bool {
        self.redirect_count < MAX_LEADER_REDIRECTS
    }

    /// Records one more redirect and remembers `leader_address` as its target.
    ///
    /// The counter saturates at `u8::MAX` rather than overflowing. A plain
    /// `+= 1` would panic in debug builds and wrap to zero in release builds,
    /// and wrapping would make `can_redirect` return `true` again.
    pub fn increment_redirect(&mut self, leader_address: String) {
        self.redirect_count = self.redirect_count.saturating_add(1);
        self.last_leader_address = Some(leader_address);
    }

    /// Clears the counter and forgets the last leader address.
    pub fn reset(&mut self) {
        self.redirect_count = 0;
        self.last_leader_address = None;
    }
}

impl Default for LeaderRedirectionState {
    /// Equivalent to [`LeaderRedirectionState::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Equivalent and non-equivalent address pairs for `is_same_address`.
    #[test]
    fn test_is_same_address() {
        let same = [
            ("127.0.0.1:8090", "127.0.0.1:8090"),
            ("localhost:8090", "127.0.0.1:8090"),
        ];
        for &(left, right) in same.iter() {
            assert!(is_same_address(left, right));
        }

        let different = [
            ("127.0.0.1:8090", "127.0.0.1:8091"),
            ("192.168.1.1:8090", "127.0.0.1:8090"),
        ];
        for &(left, right) in different.iter() {
            assert!(!is_same_address(left, right));
        }
    }

    /// `normalize_address` lowercases and rewrites loopback spellings.
    #[test]
    fn test_normalize_address() {
        let cases = [
            ("localhost:8090", "127.0.0.1:8090"),
            ("LOCALHOST:8090", "127.0.0.1:8090"),
            ("[::]:8090", "[::1]:8090"),
        ];
        for &(input, expected) in cases.iter() {
            assert_eq!(normalize_address(input), expected);
        }
    }

    /// Walks the redirect counter up to the limit and back down via `reset`.
    #[test]
    fn test_leader_redirection_state() {
        let mut state = LeaderRedirectionState::new();
        assert!(state.can_redirect());
        assert_eq!(state.redirect_count, 0);

        // One redirect recorded: still under the limit.
        state.increment_redirect("127.0.0.1:8090".to_string());
        assert!(state.can_redirect());
        assert_eq!(state.redirect_count, 1);

        // Two more bring the counter to the limit.
        state.increment_redirect("127.0.0.1:8091".to_string());
        state.increment_redirect("127.0.0.1:8092".to_string());
        assert!(!state.can_redirect());
        assert_eq!(state.redirect_count, 3);

        // Resetting restores the full redirect budget.
        state.reset();
        assert!(state.can_redirect());
        assert_eq!(state.redirect_count, 0);
    }
}