use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use fred::types::config::ReplicaFilter;
use fred::types::config::Server;
use parking_lot::RwLock;
use tokio::time::Instant;
use tracing::debug;
#[derive(Default, Debug)]
pub(crate) struct RouteableReplicaFilter {
replicas: Arc<RwLock<HashMap<String, Replica>>>,
}
#[derive(Debug)]
struct Replica {
expires: Instant,
routeable: bool,
}
#[async_trait::async_trait]
impl ReplicaFilter for RouteableReplicaFilter {
#[tracing::instrument(level = "trace")]
async fn filter(&self, _primary: &Server, replica: &Server) -> bool {
let addr = format!("{}:{}", replica.host, replica.port);
let cached = {
let replicas = self.replicas.read();
replicas.get(&addr).map(|rep| (rep.expires, rep.routeable))
};
if let Some((expires, routeable)) = cached {
if expires > Instant::now() {
return routeable;
}
debug!("redis replica filter cache: entry for {addr} expired");
}
let routeable = tokio::time::timeout(
Duration::from_millis(250),
tokio::net::TcpStream::connect(&addr),
)
.await
.map(|res| res.is_ok())
.inspect_err(|_e| debug!("{addr} is being broadcast as part of redis but is not currently routeable, which may be intentional if using centralized or high-availabitliy setups with internal IPs for certain nodes or might represent a misconfiguration or infrastructure failure"))
.unwrap_or(false);
let mut replicas = self.replicas.write();
replicas.insert(
addr,
Replica {
expires: Instant::now() + Duration::from_secs(300),
routeable,
},
);
routeable
}
}
#[cfg(test)]
mod tests {
use tokio::net::TcpListener;
use super::*;
fn dummy_primary() -> Server {
Server::new("127.0.0.1", 6379)
}
fn server(port: u16) -> Server {
Server::new("127.0.0.1", port)
}
fn seed_cache(filter: &RouteableReplicaFilter, port: u16, routeable: bool, expires: Instant) {
let addr = format!("127.0.0.1:{port}");
filter
.replicas
.write()
.insert(addr, Replica { expires, routeable });
}
#[tokio::test]
async fn reachable_replica_returns_true() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
let filter = RouteableReplicaFilter::default();
assert!(filter.filter(&dummy_primary(), &server(port)).await);
}
#[tokio::test]
async fn unreachable_replica_returns_false() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let filter = RouteableReplicaFilter::default();
assert!(!filter.filter(&dummy_primary(), &server(port)).await);
}
#[tokio::test]
async fn cached_result_is_returned_without_reconnect() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
let filter = RouteableReplicaFilter::default();
assert!(filter.filter(&dummy_primary(), &server(port)).await);
drop(listener);
assert!(filter.filter(&dummy_primary(), &server(port)).await);
}
#[tokio::test]
async fn result_is_cached_after_filter() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
let filter = RouteableReplicaFilter::default();
filter.filter(&dummy_primary(), &server(port)).await;
let replicas = filter.replicas.read();
let addr = format!("127.0.0.1:{port}");
let entry = replicas.get(&addr).expect("entry should be cached");
assert!(entry.routeable);
assert!(entry.expires > Instant::now());
}
#[tokio::test]
async fn expired_cache_triggers_fresh_connect() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let filter = RouteableReplicaFilter::default();
let expired = Instant::now() - Duration::from_secs(1);
seed_cache(&filter, port, true, expired);
assert!(!filter.filter(&dummy_primary(), &server(port)).await);
}
#[tokio::test]
async fn unexpired_cache_is_used() {
let filter = RouteableReplicaFilter::default();
let port = 1; let future = Instant::now() + Duration::from_secs(300);
seed_cache(&filter, port, true, future);
assert!(filter.filter(&dummy_primary(), &server(port)).await);
}
#[tokio::test]
async fn unexpired_false_cache_is_used() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
let filter = RouteableReplicaFilter::default();
let future = Instant::now() + Duration::from_secs(300);
seed_cache(&filter, port, false, future);
assert!(!filter.filter(&dummy_primary(), &server(port)).await);
}
#[tokio::test]
async fn separate_replicas_cached_independently() {
let listener_a = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port_a = listener_a.local_addr().unwrap().port();
let listener_b = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port_b = listener_b.local_addr().unwrap().port();
drop(listener_b);
let filter = RouteableReplicaFilter::default();
assert!(filter.filter(&dummy_primary(), &server(port_a)).await);
assert!(!filter.filter(&dummy_primary(), &server(port_b)).await);
let replicas = filter.replicas.read();
assert_eq!(replicas.len(), 2);
}
#[tokio::test]
async fn expired_entry_gets_replaced() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
let filter = RouteableReplicaFilter::default();
let expired = Instant::now() - Duration::from_secs(1);
seed_cache(&filter, port, false, expired);
assert!(filter.filter(&dummy_primary(), &server(port)).await);
let replicas = filter.replicas.read();
let addr = format!("127.0.0.1:{port}");
let entry = replicas.get(&addr).unwrap();
assert!(entry.routeable);
assert!(entry.expires > Instant::now());
}
#[tokio::test]
async fn connect_timeout_returns_false() {
let filter = RouteableReplicaFilter::default();
let primary = dummy_primary();
let replica = Server::new("192.0.2.1", 1);
let start = Instant::now();
let result = filter.filter(&primary, &replica).await;
let elapsed = start.elapsed();
assert!(!result);
assert!(
elapsed >= Duration::from_millis(200),
"expected timeout (~250ms), but returned in {elapsed:?}"
);
}
#[tokio::test]
async fn connect_timeout_result_is_cached() {
let filter = RouteableReplicaFilter::default();
let primary = dummy_primary();
let replica = Server::new("192.0.2.1", 1);
filter.filter(&primary, &replica).await;
let start = Instant::now();
let result = filter.filter(&primary, &replica).await;
let elapsed = start.elapsed();
assert!(!result);
assert!(
elapsed < Duration::from_millis(50),
"expected instant cache hit, but took {elapsed:?}"
);
}
}