#[cfg(all(feature = "replicas", any(feature = "enable-native-tls", feature = "enable-rustls")))]
use crate::types::TlsHostMapping;
#[cfg(feature = "replicas")]
use crate::{
error::{RedisError, RedisErrorKind},
modules::inner::RedisClientInner,
protocol::{
command::RedisCommand,
connection,
connection::{CommandBuffer, RedisWriter},
},
router::{centralized, clustered, utils, Written},
types::Server,
};
#[cfg(feature = "replicas")]
use std::{
collections::{HashMap, VecDeque},
convert::identity,
fmt,
fmt::Formatter,
sync::Arc,
};
/// A user-supplied predicate evaluated on `(primary, replica)` server pairs.
///
/// Implementations must be `Send + Sync + 'static`, so they can be shared behind an `Arc` (see
/// `ReplicaConfig::filter`).
#[cfg(feature = "replicas")]
#[cfg_attr(docsrs, doc(cfg(feature = "replicas")))]
#[async_trait]
pub trait ReplicaFilter: Send + Sync + 'static {
/// Evaluate the filter for `replica`, which is associated with `primary`.
///
/// The default implementation ignores both arguments and returns `true`.
///
/// NOTE(review): presumably `true` means the replica may be used for routing; the call site is
/// not visible in this section, so confirm before relying on that.
#[allow(unused_variables)]
async fn filter(&self, primary: &Server, replica: &Server) -> bool {
true
}
}
/// Configuration options for replica connections.
///
/// The manual `Debug` and `PartialEq` implementations for this type only look at the plain
/// `bool`/`u32` fields; the `filter` trait object is skipped by both.
#[cfg(feature = "replicas")]
#[cfg_attr(docsrs, doc(cfg(feature = "replicas")))]
#[derive(Clone)]
pub struct ReplicaConfig {
/// Whether replica connections are opened lazily.
///
/// When `false`, `Replicas::add_connection` connects as soon as a replica is added. When `true`,
/// `Replicas::write` opens the connection the first time a command is routed to a replica that
/// has no writer yet.
///
/// Default: `true`
pub lazy_connections: bool,
/// An optional [`ReplicaFilter`] shared behind an `Arc`.
///
/// NOTE(review): this field is not read anywhere in this section; presumably it is applied when
/// replicas are discovered. Confirm against the caller.
///
/// Default: `None`
pub filter: Option<Arc<dyn ReplicaFilter>>,
/// NOTE(review): not read in this section. The name suggests it controls whether errors raised
/// while reconnecting to replicas are ignored; confirm against the reconnect logic.
///
/// Default: `true`
pub ignore_reconnection_errors: bool,
/// NOTE(review): not read in this section. Presumably a threshold on replica connection errors;
/// confirm the exact semantics (including the meaning of `0`) where it is consumed.
///
/// Default: `0`
pub connection_error_count: u32,
/// What `Replicas::write` does when no replica is routable for the requested primary.
///
/// When `true` the command is handed back to the caller as `Written::Fallback`. When `false` the
/// command is finished with a `RedisErrorKind::Replica` error ("Missing replica node.").
///
/// Default: `true`
pub primary_fallback: bool,
}
#[cfg(feature = "replicas")]
impl fmt::Debug for ReplicaConfig {
  /// Format the plain configuration flags as a `ReplicaConfig { .. }` struct.
  ///
  /// The `filter` trait object is intentionally left out of the output.
  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
    let mut out = f.debug_struct("ReplicaConfig");
    out.field("lazy_connections", &self.lazy_connections);
    out.field("ignore_reconnection_errors", &self.ignore_reconnection_errors);
    out.field("connection_error_count", &self.connection_error_count);
    out.field("primary_fallback", &self.primary_fallback);
    out.finish()
  }
}
#[cfg(feature = "replicas")]
impl PartialEq for ReplicaConfig {
  /// Compare two configurations by their plain flags and counters.
  ///
  /// The `filter` trait object does not take part in the comparison.
  fn eq(&self, other: &Self) -> bool {
    let lhs = (
      self.lazy_connections,
      self.ignore_reconnection_errors,
      self.connection_error_count,
      self.primary_fallback,
    );
    let rhs = (
      other.lazy_connections,
      other.ignore_reconnection_errors,
      other.connection_error_count,
      other.primary_fallback,
    );

    lhs == rhs
  }
}
// Marker impl: the `PartialEq` implementation for `ReplicaConfig` only compares `bool` and `u32`
// fields, so the relation it defines is reflexive and `Eq` is sound.
#[cfg(feature = "replicas")]
impl Eq for ReplicaConfig {}
#[cfg(feature = "replicas")]
impl Default for ReplicaConfig {
fn default() -> Self {
ReplicaConfig {
lazy_connections: true,
filter: None,
ignore_reconnection_errors: true,
connection_error_count: 0,
primary_fallback: true,
}
}
}
/// A list of servers with a cursor that cycles through them in round-robin order.
///
/// `ReplicaSet` keeps one of these per primary node to hold that primary's replicas.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[cfg(feature = "replicas")]
#[cfg_attr(docsrs, doc(cfg(feature = "replicas")))]
pub struct ReplicaRouter {
// Index into `servers` of the entry returned by the most recent `next` call (0 before any call).
counter: usize,
// Servers in insertion order; `add` skips servers that are already present.
servers: Vec<Server>,
}
#[cfg(feature = "replicas")]
impl ReplicaRouter {
  /// Read the server that should receive the next command.
  ///
  /// Advances the round-robin cursor by one position, wrapping at the end of the list, and returns
  /// the server at the new position. Returns `None` when the router holds no servers.
  pub fn next(&mut self) -> Option<&Server> {
    if self.servers.is_empty() {
      // `% 0` panics, so an empty router must return before the cursor is advanced.
      return None;
    }

    self.counter = (self.counter + 1) % self.servers.len();
    self.servers.get(self.counter)
  }

  /// Append `server` to the routing list unless it is already present.
  pub fn add(&mut self, server: Server) {
    if !self.servers.contains(&server) {
      self.servers.push(server);
    }
  }

  /// Remove every entry equal to `server`, keeping the relative order of the remaining servers.
  ///
  /// The cursor is not adjusted here; `next` re-wraps it against the new length.
  pub fn remove(&mut self, server: &Server) {
    self.servers.retain(|existing| existing != server);
  }

  /// The number of servers in the routing list.
  pub fn len(&self) -> usize {
    self.servers.len()
  }

  /// Whether the routing list holds no servers.
  pub fn is_empty(&self) -> bool {
    self.servers.is_empty()
  }

  /// Iterate over the servers in insertion order, without moving the cursor.
  pub fn iter(&self) -> impl Iterator<Item = &Server> {
    self.servers.iter()
  }
}
/// A routing table that maps each primary node to a [`ReplicaRouter`] over its replicas.
#[cfg(feature = "replicas")]
#[cfg_attr(docsrs, doc(cfg(feature = "replicas")))]
#[derive(Clone, Debug, Eq, PartialEq, Default)]
pub struct ReplicaSet {
// Keyed by primary server. `remove` and `remove_replica` drop an entry once its router is empty.
servers: HashMap<Server, ReplicaRouter>,
}
#[cfg(feature = "replicas")]
#[allow(dead_code)]
impl ReplicaSet {
  /// Create an empty routing table.
  pub fn new() -> ReplicaSet {
    ReplicaSet {
      servers: HashMap::new(),
    }
  }

  /// Register `replica` under `primary`, creating the primary's router on first use.
  pub fn add(&mut self, primary: Server, replica: Server) {
    self.servers.entry(primary).or_default().add(replica);
  }

  /// Remove `replica` from the router of `primary`, dropping the primary's entry once it has no
  /// replicas left. Does nothing when `primary` is unknown.
  pub fn remove(&mut self, primary: &Server, replica: &Server) {
    let now_empty = match self.servers.get_mut(primary) {
      Some(router) => {
        router.remove(replica);
        router.len() == 0
      },
      None => false,
    };

    if now_empty {
      self.servers.remove(primary);
    }
  }

  /// Remove `replica` from every primary, dropping any primary left without replicas.
  pub fn remove_replica(&mut self, replica: &Server) {
    self.servers.retain(|_, router| {
      router.remove(replica);
      router.len() > 0
    });
  }

  /// Advance the round-robin cursor for `primary` and return the selected replica, if any.
  pub fn next_replica(&mut self, primary: &Server) -> Option<&Server> {
    self.servers.get_mut(primary)?.next()
  }

  /// Iterate over the replicas registered under `primary`; empty when `primary` is unknown.
  pub fn replicas(&self, primary: &Server) -> impl Iterator<Item = &Server> {
    self
      .servers
      .get(primary)
      .into_iter()
      .map(|router| router.iter())
      .flat_map(identity)
  }

  /// Build the inverted table, mapping each replica to its primary.
  ///
  /// If a replica is registered under more than one primary, whichever entry is visited last wins.
  pub fn to_map(&self) -> HashMap<Server, Server> {
    let mut out = HashMap::with_capacity(self.servers.len());
    for (primary, router) in &self.servers {
      out.extend(router.iter().map(|replica| (replica.clone(), primary.clone())));
    }
    out
  }

  /// Collect a copy of every replica across all primaries.
  pub fn all_replicas(&self) -> Vec<Server> {
    let mut out = Vec::with_capacity(self.servers.len());
    for router in self.servers.values() {
      out.extend(router.iter().cloned());
    }
    out
  }

  /// Remove every primary and replica from the table.
  pub fn clear(&mut self) {
    self.servers.clear();
  }
}
/// The open replica connections, the primary -> replica routing table, and a buffer of commands
/// recovered from closed connections.
#[cfg(feature = "replicas")]
pub struct Replicas {
// Writer half of each open connection, keyed by the replica server it is connected to.
pub(crate) writers: HashMap<Server, RedisWriter>,
// Routing table used to pick the next replica for a primary.
routing: ReplicaSet,
// Commands returned by `graceful_close` when a writer is dropped; drained by `take_retry_buffer`.
buffer: VecDeque<RedisCommand>,
}
#[cfg(feature = "replicas")]
#[allow(dead_code)]
impl Replicas {
  /// Create an instance with no connections, no routing entries and an empty retry buffer.
  pub fn new() -> Replicas {
    Replicas {
      buffer: VecDeque::new(),
      routing: ReplicaSet::new(),
      writers: HashMap::new(),
    }
  }

  /// Close every open replica connection, then re-add a connection for each routing table entry.
  ///
  /// Commands returned while closing the old writers are kept in the retry buffer. Entries are
  /// re-added with `force = false`, so whether a connection is actually opened here depends on the
  /// `lazy_connections` setting.
  ///
  /// # Errors
  ///
  /// Returns the first error produced by `add_connection`; later entries are not processed.
  pub async fn sync_connections(&mut self, inner: &Arc<RedisClientInner>) -> Result<(), RedisError> {
    for (_, old_writer) in self.writers.drain() {
      let unsent = old_writer.graceful_close().await;
      self.buffer.extend(unsent);
    }

    let table = self.routing.to_map();
    for (replica, primary) in table {
      self.add_connection(inner, primary, replica, false).await?;
    }
    Ok(())
  }

  /// Clear the routing table and then close all connections via `sync_connections`.
  pub async fn clear_connections(&mut self, inner: &Arc<RedisClientInner>) -> Result<(), RedisError> {
    self.routing.clear();
    self.sync_connections(inner).await
  }

  /// Clear the routing table without touching any open connection.
  pub fn clear_routing(&mut self) {
    self.routing.clear();
  }

  /// Register `replica` under `primary`, connecting to it right away when `force` is set or lazy
  /// connections are disabled.
  ///
  /// When a connection is opened on a clustered deployment, `readonly` is called on the transport
  /// before it is split. The routing entry is only added after any connection attempt succeeded.
  ///
  /// # Errors
  ///
  /// Returns any error raised while creating, setting up or splitting the connection.
  pub async fn add_connection(
    &mut self,
    inner: &Arc<RedisClientInner>,
    primary: Server,
    replica: Server,
    force: bool,
  ) -> Result<(), RedisError> {
    _debug!(
      inner,
      "Adding replica connection {} (replica) -> {} (primary)",
      replica,
      primary
    );

    let connect_now = force || !inner.connection.replica.lazy_connections;
    if connect_now {
      let mut transport = connection::create(inner, &replica, None).await?;
      let _ = transport.setup(inner, None).await?;

      let clustered_deployment = inner.config.server.is_clustered();
      let (_, writer) = if clustered_deployment {
        let _ = transport.readonly(inner, None).await?;
        connection::split_and_initialize(inner, transport, true, clustered::spawn_reader_task)?
      } else {
        connection::split_and_initialize(inner, transport, true, centralized::spawn_reader_task)?
      };

      self.writers.insert(replica.clone(), writer);
    }

    self.routing.add(primary, replica);
    Ok(())
  }

  /// Close the writer for `replica`, if one exists, moving any commands it returns into the retry
  /// buffer. The routing table is left unchanged.
  pub async fn drop_writer(&mut self, replica: &Server) {
    let writer = match self.writers.remove(replica) {
      Some(writer) => writer,
      None => return,
    };

    let unsent = writer.graceful_close().await;
    self.buffer.extend(unsent);
  }

  /// Remove `replica` from the routing table under every primary. Open connections are untouched.
  pub fn remove_replica(&mut self, replica: &Server) {
    self.routing.remove_replica(replica);
  }

  /// Close the connection to `replica` and, unless `keep_routable` is set, remove its routing entry
  /// under `primary`.
  ///
  /// This implementation always returns `Ok(())`.
  pub async fn remove_connection(
    &mut self,
    inner: &Arc<RedisClientInner>,
    primary: &Server,
    replica: &Server,
    keep_routable: bool,
  ) -> Result<(), RedisError> {
    _debug!(
      inner,
      "Removing replica connection {} (replica) -> {} (primary)",
      replica,
      primary
    );
    self.drop_writer(replica).await;

    if keep_routable {
      return Ok(());
    }
    self.routing.remove(primary, replica);
    Ok(())
  }

  /// Flush every replica writer, stopping at the first error.
  pub async fn check_and_flush(&mut self) -> Result<(), RedisError> {
    for writer in self.writers.values_mut() {
      let _ = writer.flush().await?;
    }
    Ok(())
  }

  /// Whether at least one replica routed under `primary` has a working connection.
  pub fn has_replica_connection(&self, primary: &Server) -> bool {
    self
      .routing
      .replicas(primary)
      .any(|replica| self.has_connection(replica))
  }

  /// Whether a writer exists for `replica` and reports itself as working.
  pub fn has_connection(&self, replica: &Server) -> bool {
    match self.writers.get(replica) {
      Some(writer) => writer.is_working(),
      None => false,
    }
  }

  /// Return the routing table as a replica -> primary map.
  pub fn routing_table(&self) -> HashMap<Server, Server> {
    self.routing.to_map()
  }

  /// Close every writer that is no longer working and remove its replica from the routing table.
  ///
  /// Commands returned by the closed writers are kept in the retry buffer; working writers are
  /// retained as they are.
  pub async fn drop_broken_connections(&mut self) {
    let mut healthy = HashMap::with_capacity(self.writers.len());

    for (server, writer) in self.writers.drain() {
      if !writer.is_working() {
        let unsent = writer.graceful_close().await;
        self.buffer.extend(unsent);
        self.routing.remove_replica(&server);
        continue;
      }
      healthy.insert(server, writer);
    }

    self.writers = healthy;
  }

  /// List the replica servers whose writers currently report as working.
  pub fn active_connections(&self) -> Vec<Server> {
    self
      .writers
      .iter()
      .filter(|(_, writer)| writer.is_working())
      .map(|(server, _)| server.clone())
      .collect()
  }

  /// Route `command` to the next replica of `primary` and write it to that connection.
  ///
  /// Outcomes:
  /// * `Written::Fallback(command)` - no replica is routable and `primary_fallback` is enabled.
  /// * `Written::Ignore` - no replica is routable and fallback is disabled (the command is finished
  ///   with a `Replica` error), or the command could not be encoded (finished with that error).
  /// * `Written::NotFound(command)` - the replica has no connection and lazy connections are off.
  /// * `Written::Disconnected(..)` - a lazy connection attempt failed, the writer is not working, or
  ///   writing the frame failed. In the last case the command has already been handed to the writer,
  ///   so the command slot is `None`.
  /// * `Written::Sent((server, flushed))` - the frame was written; `flushed` reports whether the
  ///   write was flushed, which `force_flush` forces.
  ///
  /// `command.write_attempts` is incremented before the writer health check, so it also counts
  /// attempts that end in `Disconnected` because the connection was already closed.
  pub async fn write(
    &mut self,
    inner: &Arc<RedisClientInner>,
    primary: &Server,
    mut command: RedisCommand,
    force_flush: bool,
  ) -> Written {
    let replica = match self.routing.next_replica(primary).cloned() {
      Some(replica) => replica,
      None => {
        if inner.connection.replica.primary_fallback {
          return Written::Fallback(command);
        }

        command.finish(
          inner,
          Err(RedisError::new(RedisErrorKind::Replica, "Missing replica node.")),
        );
        return Written::Ignore;
      },
    };

    _trace!(
      inner,
      "Found replica {} (primary: {}) for {} ({})",
      replica,
      primary,
      command.kind.to_str_debug(),
      command.debug_id()
    );

    // Make sure a writer exists before borrowing it, connecting on demand when allowed.
    if !self.writers.contains_key(&replica) {
      if !inner.connection.replica.lazy_connections {
        return Written::NotFound(command);
      }

      _debug!(inner, "Lazily adding {} replica connection", replica);
      if let Err(e) = self.add_connection(inner, primary.clone(), replica.clone(), true).await {
        return Written::Disconnected((Some(replica.clone()), Some(command), e));
      }
    }

    let writer = match self.writers.get_mut(&replica) {
      Some(writer) => writer,
      None => {
        return Written::Disconnected((
          Some(replica.clone()),
          Some(command),
          RedisError::new(RedisErrorKind::Replica, "Missing connection."),
        ));
      },
    };

    let (frame, should_flush) = match utils::prepare_command(inner, &writer.counters, &mut command) {
      Ok((frame, wants_flush)) => (frame, wants_flush || force_flush),
      Err(e) => {
        _warn!(inner, "Frame encoding error for {}", command.kind.to_str_debug());
        command.finish(inner, Err(e));
        return Written::Ignore;
      },
    };

    // Read this before the command is moved into the writer's queue below.
    let blocks_connection = command.blocks_connection();
    _debug!(
      inner,
      "Sending {} ({}) to replica {}",
      command.kind.to_str_debug(),
      command.debug_id(),
      replica
    );
    command.write_attempts += 1;

    if !writer.is_working() {
      let error = RedisError::new(RedisErrorKind::IO, "Connection closed.");
      _debug!(
        inner,
        "Error sending replica command {}: {:?}",
        command.kind.to_str_debug(),
        error
      );
      return Written::Disconnected((Some(writer.server.clone()), Some(command), error));
    }

    writer.push_command(inner, command);
    match writer.write_frame(frame, should_flush, false).await {
      Err(err) => Written::Disconnected((Some(writer.server.clone()), None, err)),
      Ok(_) => {
        if blocks_connection {
          inner.backchannel.write().await.set_blocked(&writer.server);
        }
        Written::Sent((writer.server.clone(), should_flush))
      },
    }
  }

  /// Take every command currently held in the retry buffer, leaving the buffer empty.
  pub fn take_retry_buffer(&mut self) -> CommandBuffer {
    let pending = self.buffer.drain(..);
    pending.collect()
  }
}
/// Apply the configured TLS hostname policy to `replica`, using the host of `primary`.
///
/// Nothing is changed when the client has no TLS configuration or when the configured policy is
/// `TlsHostMapping::None`; both cases only emit a trace log.
#[cfg(all(feature = "replicas", any(feature = "enable-native-tls", feature = "enable-rustls")))]
pub fn map_replica_tls_names(inner: &Arc<RedisClientInner>, primary: &Server, replica: &mut Server) {
  let policy = if let Some(ref tls) = inner.config.tls {
    &tls.hostnames
  } else {
    _trace!(inner, "Skip modifying TLS hostname for replicas.");
    return;
  };

  if *policy != TlsHostMapping::None {
    replica.set_tls_server_name(policy, &primary.host);
  } else {
    _trace!(inner, "Skip modifying TLS hostnames for replicas.");
  }
}
/// No-op variant of `map_replica_tls_names`, compiled when the `replicas` feature is enabled but
/// neither `enable-native-tls` nor `enable-rustls` is. All arguments are ignored.
#[cfg(all(
feature = "replicas",
not(any(feature = "enable-native-tls", feature = "enable-rustls"))
))]
pub fn map_replica_tls_names(_: &Arc<RedisClientInner>, _: &Server, _: &mut Server) {}