use crate::client::{cache_clients, RpcRequest};
use crate::quorum::{quorum_handler, QuorumReq};
pub use crate::quorum::{AckLevel, QuorumHealth, QuorumHealthState, QuorumState};
use crate::server::{CacheMap, RpcCacheService};
use bincode::ErrorKind;
use cached::Cached;
pub use cached::SizedCache;
pub use cached::TimedCache;
pub use cached::TimedSizedCache;
use flume::{RecvError, SendError};
use gethostname::gethostname;
use lazy_static::lazy_static;
use nanoid::nanoid;
use rand::Rng;
use std::collections::HashMap;
use std::env;
use std::fmt::{Debug, Display, Formatter};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time;
use tracing::{debug, error, info, warn};
mod client;
pub mod quorum;
#[allow(clippy::enum_variant_names)]
mod rpc;
mod server;
lazy_static! {
    /// `true` only when the `HA_MODE` environment variable is exactly `"true"`.
    /// An unset (or unreadable) variable and every other value select standalone mode.
    pub(crate) static ref HA_MODE: bool = {
        let is_ha = matches!(env::var("HA_MODE").as_deref(), Ok("true"));
        if is_ha {
            info!("redhac is starting in HA mode");
        } else {
            info!("redhac is starting in standalone mode");
        }
        is_ha
    };

    /// Half (integer division) of the number of comma separated entries in `HA_HOSTS`.
    ///
    /// Panics on first access when `HA_HOSTS` is not set.
    pub(crate) static ref QUORUM: u8 = {
        let host_count = env::var("HA_HOSTS")
            .expect("HA_HOSTS is not set")
            .split(',')
            .count();
        (host_count / 2) as u8
    };

    /// Value of `CACHE_TLS` parsed as `bool`, defaulting to `true` when unset.
    ///
    /// Panics on first access when the value cannot be parsed.
    pub(crate) static ref TLS: bool = {
        let raw = env::var("CACHE_TLS").unwrap_or_else(|_| "true".to_string());
        raw.parse::<bool>().expect("Cannot parse CACHE_TLS to bool")
    };

    /// Value of `CACHE_MTLS` parsed as `bool`, defaulting to `true` when unset.
    ///
    /// Panics on first access when the value cannot be parsed.
    pub(crate) static ref MTLS: bool = {
        let raw = env::var("CACHE_MTLS").unwrap_or_else(|_| "true".to_string());
        raw.parse::<bool>().expect("Cannot parse CACHE_MTLS to bool")
    };
}
/// Fetches an entry via `cache_get_value` and deserializes it into `$type` with
/// `cache_get_from`.
///
/// Expands to an `async` block that must be awaited and yields
/// `Result<Option<$type>, CacheError>`.
///
/// Arguments, in order: target type, cache name, entry key, `&CacheConfig`, and the
/// `remote_lookup` flag forwarded to `cache_get_value`.
///
/// An error returned by the lookup itself is swallowed and reported as `Ok(None)`;
/// only a deserialization failure surfaces as `Err`.
///
/// The helper functions are referenced through `$crate`, so callers do not need to
/// import them separately.
#[macro_export]
macro_rules! cache_get {
    ($type:ty, $name:expr, $entry:expr, $config:expr, $lookup:expr) => {
        async {
            if let Some(v) = $crate::cache_get_value($name, $entry, $config, $lookup)
                .await
                .unwrap_or(None)
            {
                $crate::cache_get_from::<$type>(&v).await
            } else {
                Ok(None)
            }
        }
    };
}
/// Message type of the optional `tx_notify` channel that can be handed to
/// `start_cluster`. It names a cache entry together with an operation kind.
// NOTE(review): the code that produces these messages is not in this file;
// presumably the RPC server emits one per remotely triggered modification — confirm.
pub struct CacheNotify {
/// Name of the cache the entry belongs to.
pub cache_name: String,
/// Key of the entry inside that cache.
pub entry: String,
/// Operation kind associated with the entry.
pub method: CacheMethod,
}
/// Operation kind carried inside a `CacheNotify`.
// NOTE(review): the variant names mirror `cache_put`, `cache_insert`, `cache_del`
// and `cache_remove`; presumably each variant identifies the matching operation,
// with the `AckLevel` being the one requested by the caller — confirm.
pub enum CacheMethod {
Put,
Insert(AckLevel),
Del,
Remove(AckLevel),
}
/// Handle to all spawned caches plus the channels used for the HA layer.
///
/// Created with `CacheConfig::new()`. The optional channel fields are only populated
/// when `HA_MODE` is active; in standalone mode they are all `None`.
#[derive(Debug, Clone)]
pub struct CacheConfig {
// Maps a cache name to the sender of the task spawned by `spawn_cache`.
cache_map: CacheMap,
/// Latest quorum health snapshot. Starts as `None` in standalone mode and as
/// `Some(QuorumHealthState::default())` in HA mode.
pub rx_health_state: watch::Receiver<Option<QuorumHealthState>>,
// Both ends of the channel that carries requests for remote cache members.
tx_remote: Option<flume::Sender<RpcRequest>>,
rx_remote: Option<flume::Receiver<RpcRequest>>,
// Both ends of the channel that feeds the quorum handler.
tx_quorum: Option<flume::Sender<QuorumReq>>,
rx_quorum: Option<flume::Receiver<QuorumReq>>,
// Set by `start_cluster` in HA mode; used by `shutdown` to request a shutdown
// and to hand over the acknowledgement channel.
tx_exit: Option<flume::Sender<oneshot::Sender<()>>>,
}
impl CacheConfig {
    /// Builds an empty configuration together with the sender side of the health
    /// state watch channel.
    ///
    /// In HA mode the watch channel is seeded with `QuorumHealthState::default()` and
    /// the remote and quorum channels are created; in standalone mode the health
    /// state stays `None` and no additional channels exist.
    pub fn new() -> (watch::Sender<Option<QuorumHealthState>>, Self) {
        let (tx_watch, rx_health_state) = watch::channel::<Option<QuorumHealthState>>(None);

        let (tx_remote, rx_remote, tx_quorum, rx_quorum) = if *HA_MODE {
            tx_watch.send(Some(QuorumHealthState::default())).unwrap();
            let (remote_tx, remote_rx) = flume::unbounded::<RpcRequest>();
            let (quorum_tx, quorum_rx) = flume::unbounded::<QuorumReq>();
            (
                Some(remote_tx),
                Some(remote_rx),
                Some(quorum_tx),
                Some(quorum_rx),
            )
        } else {
            (None, None, None, None)
        };

        let cfg = Self {
            cache_map: HashMap::new(),
            rx_health_state,
            tx_remote,
            rx_remote,
            tx_quorum,
            rx_quorum,
            tx_exit: None,
        };
        (tx_watch, cfg)
    }

    /// Spawns a background task that owns `cache` and registers its request sender
    /// under `cache_name`.
    ///
    /// `buffer` selects a bounded request channel of that capacity; `None` means
    /// an unbounded channel.
    pub fn spawn_cache<C: Cached<String, Vec<u8>> + Send + 'static>(
        &mut self,
        cache_name: String,
        cache: C,
        buffer: Option<usize>,
    ) {
        let (req_tx, req_rx) = match buffer {
            Some(capacity) => flume::bounded::<CacheReq>(capacity),
            None => flume::unbounded(),
        };
        self.cache_map.insert(cache_name.clone(), req_tx);
        tokio::spawn(cache_recv(cache, cache_name, req_rx));
    }

    /// Requests a shutdown of the HA tasks and waits for the acknowledgement.
    ///
    /// Does nothing when no exit channel has been set up (`start_cluster` installs it
    /// in HA mode).
    ///
    /// # Panics
    ///
    /// Panics when the shutdown task is gone before the request or the
    /// acknowledgement could be exchanged.
    pub async fn shutdown(&self) -> Result<(), CacheError> {
        let exit_tx = match &self.tx_exit {
            Some(tx) => tx,
            None => return Ok(()),
        };

        let (ack_tx, ack_rx) = oneshot::channel();
        exit_tx
            .send_async(ack_tx)
            .await
            .expect("cache shutdown receiver to not be closed");
        ack_rx
            .await
            .expect("cache shutdown receiver to not be closed");
        Ok(())
    }
}
/// Error type returned by all cache operations of this crate.
///
/// The human readable description lives in `error`; the `From` implementations in
/// this file convert channel and serialization failures into it.
#[derive(Debug)]
pub struct CacheError {
    /// Human readable description of what went wrong.
    pub error: String,
}

impl Display for CacheError {
    /// Writes the plain error text without a trailing newline, so the value can be
    /// embedded into other messages and log lines.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.error)
    }
}

impl std::error::Error for CacheError {}
/// Wraps a bincode failure, keeping its `Display` text in the message.
impl From<std::boxed::Box<bincode::ErrorKind>> for CacheError {
    fn from(value: Box<ErrorKind>) -> Self {
        let error = format!("bincode serializing error: {}", value);
        Self { error }
    }
}
/// Wraps a failed flume send of an `RpcRequest`; the unsent request is rendered
/// with `Debug` into the message.
impl From<flume::SendError<RpcRequest>> for CacheError {
    fn from(value: SendError<RpcRequest>) -> Self {
        let SendError(unsent) = value;
        let error = format!("Flume Cache Error: {:?}", unsent);
        Self { error }
    }
}
/// Same as the owned conversion, but for a borrowed flume send error of an
/// `RpcRequest`.
impl From<&flume::SendError<RpcRequest>> for CacheError {
    fn from(value: &SendError<RpcRequest>) -> Self {
        let SendError(unsent) = value;
        let error = format!("Flume Cache Error: {:?}", unsent);
        Self { error }
    }
}
impl From<flume::RecvError> for CacheError {
fn from(value: RecvError) -> Self {
match value {
RecvError::Disconnected => Self {
error: "Flume Cache Error: RecvError::Disconnected".to_string(),
},
}
}
}
/// Wraps a failed flume send of a `QuorumReq`; the unsent request is rendered with
/// `Debug` into the message.
impl From<flume::SendError<QuorumReq>> for CacheError {
    fn from(value: SendError<QuorumReq>) -> Self {
        let SendError(unsent) = value;
        let error = format!("Flume Cache Error: {:?}", unsent);
        Self { error }
    }
}
/// Same as the owned conversion, but for a borrowed flume send error of a
/// `QuorumReq`.
impl From<&flume::SendError<QuorumReq>> for CacheError {
    fn from(value: &SendError<QuorumReq>) -> Self {
        let SendError(unsent) = value;
        let error = format!("Flume Cache Error: {:?}", unsent);
        Self { error }
    }
}
impl From<tokio::sync::mpsc::error::SendError<CacheReq>> for CacheError {
fn from(value: mpsc::error::SendError<CacheReq>) -> Self {
match value {
mpsc::error::SendError(err) => Self {
error: format!("Flume Cache Error: {:?}", err),
},
}
}
}
/// Wraps a failed flume send of a `CacheReq`; the unsent request is rendered with
/// `Debug` into the message.
impl From<flume::SendError<CacheReq>> for CacheError {
    fn from(err: SendError<CacheReq>) -> Self {
        let SendError(unsent) = err;
        let error = format!("Flume Cache Error: {:?}", unsent);
        Self { error }
    }
}
/// Request understood by the per-cache task started through
/// `CacheConfig::spawn_cache` (see `cache_recv`).
#[derive(Debug)]
pub enum CacheReq {
/// Look up `entry`; a clone of the stored bytes (or `None`) is sent back over `resp`.
Get {
entry: String,
resp: flume::Sender<Option<Vec<u8>>>,
},
/// Store `value` under `entry`.
Put {
entry: String,
value: Vec<u8>,
},
/// Remove `entry` from the cache.
Del {
entry: String,
},
/// Reset the whole cache.
Reset,
}
/// Reads the raw bytes stored under `entry` in the cache `cache_name`.
///
/// The local cache is always asked first. When it has no value, a health state is
/// present (HA mode) and `remote_lookup` is `true`, the request is forwarded to the
/// remote cache members and their answers are awaited until one of them returns a
/// value or every connected host has answered.
///
/// # Errors
///
/// Returns a `CacheError` when the quorum check fails, when `cache_name` is not
/// registered, when a channel is closed, or when a remote lookup is requested
/// although no remote channel is configured.
pub async fn cache_get_value(
    cache_name: String,
    entry: String,
    cache_config: &CacheConfig,
    remote_lookup: bool,
) -> Result<Option<Vec<u8>>, CacheError> {
    // `None` means standalone mode; in HA mode the quorum must be good to continue.
    let health_state = if let Some(s) = cache_config.rx_health_state.borrow().clone() {
        s.is_quorum_good()?;
        Some(s)
    } else {
        None
    };

    let (tx_resp, rx_resp) = flume::unbounded::<Option<Vec<u8>>>();
    let req = CacheReq::Get {
        entry: entry.clone(),
        resp: tx_resp,
    };

    let tx = cache_config.cache_map.get(&cache_name).ok_or_else(|| {
        let err = format!(
            "CacheMap misconfiguration - could not find expected cache_name '{}'",
            &cache_name
        );
        error!("{err}");
        CacheError { error: err }
    })?;
    tx.send_async(req).await.map_err(|e| {
        let err = format!("Error sending local cache value over the channel: {:?}", e);
        error!("{err}");
        CacheError { error: err }
    })?;
    let local_res = rx_resp.recv_async().await.map_err(|e| {
        let err = format!("Error receiving cache value through channel: {:?}", e);
        error!("{err}");
        CacheError { error: err }
    })?;

    // Only fall through to the remote lookup on a local miss in HA mode.
    let health_state = match health_state {
        Some(state) if local_res.is_none() && remote_lookup => state,
        _ => return Ok(local_res),
    };

    // Return an error instead of panicking if the remote channel is missing.
    let tx_remote = cache_config.tx_remote.as_ref().ok_or_else(|| {
        let err = "Remote lookup requested but no channel to the RPC Clients exists".to_string();
        error!("{err}");
        CacheError { error: err }
    })?;

    let (resp_tx, resp_rx) = flume::unbounded();
    let req = RpcRequest::Get {
        cache_name,
        entry,
        resp: resp_tx,
    };
    tx_remote.send_async(req).await.map_err(|e| {
        let err = format!("Error sending request to RPC Clients: {:?}", e);
        error!("{err}");
        CacheError { error: err }
    })?;

    // One answer is awaited per connected host. `<= 1` (instead of `== 1`) keeps
    // the counter from underflowing when no host is connected at all.
    let mut remaining_hosts = health_state.connected_hosts;
    loop {
        let res = resp_rx.recv_async().await?;
        if res.is_some() || remaining_hosts <= 1 {
            return Ok(res);
        }
        remaining_hosts -= 1;
    }
}
/// Deserializes bincode encoded bytes into `T`.
///
/// Always yields `Ok(Some(_))` on success; the `Option` exists so the result type
/// lines up with the `cache_get!` macro.
///
/// # Errors
///
/// Returns a `CacheError` when `value` cannot be decoded as `T`.
pub async fn cache_get_from<'a, T>(value: &'a [u8]) -> Result<Option<T>, CacheError>
where
    T: Debug + serde::Deserialize<'a>,
{
    match bincode::deserialize::<T>(value) {
        Ok(decoded) => Ok(Some(decoded)),
        Err(e) => Err(CacheError {
            error: format!("Error deserializing cache result: {:?}", e),
        }),
    }
}
pub async fn cache_put<T>(
cache_name: String,
entry: String,
cache_config: &CacheConfig,
value: &T,
) -> Result<(), CacheError>
where
T: Debug + serde::Serialize,
{
let health_state = if let Some(s) = cache_config.rx_health_state.borrow().clone() {
s.is_quorum_good()?;
Some(s)
} else {
None
};
let val = bincode::serialize(value)?;
let req = CacheReq::Put {
entry: entry.clone(),
value: val.clone(),
};
let tx = cache_config.cache_map.get(&cache_name).ok_or_else(|| {
let err = format!(
"CacheMap misconfiguration - could not find expected cache_name '{}'",
&cache_name
);
error!("{err}");
CacheError { error: err }
})?;
tx.send_async(req).await.map_err(|e| {
let err = format!("Error sending local cache value over the channel: {:?}", e);
error!("{err}");
CacheError { error: err }
})?;
if health_state.is_some() {
let remote_req = RpcRequest::Put {
cache_name,
entry,
value: val,
resp: None,
};
cache_config
.tx_remote
.as_ref()
.unwrap()
.send_async(remote_req)
.await?;
}
Ok(())
}
/// Inserts `value` under `entry` with the requested `ack_level`.
///
/// In standalone mode this is just `cache_put`. In HA mode the insert is executed
/// by the leader: when this host is the leader (`Leader` / `LeaderSwitch`) it runs
/// `insert_from_leader` directly, as a follower it forwards the request to the
/// leader and waits for its answer.
///
/// # Errors
///
/// Returns a `CacheError` when the quorum check fails, serialization fails, the
/// cluster currently has no usable leader, a channel is closed, or the insert
/// could not be carried out with the requested `ack_level` — regardless of whether
/// it was executed locally as leader or remotely on behalf of a follower.
///
/// # Panics
///
/// Panics when the health state claims `Follower` but carries no leader channel.
pub async fn cache_insert<T>(
    cache_name: String,
    entry: String,
    cache_config: &CacheConfig,
    value: &T,
    ack_level: AckLevel,
) -> Result<(), CacheError>
where
    T: Debug + serde::Serialize,
{
    if !*HA_MODE {
        return cache_put(cache_name, entry, cache_config, value).await;
    }

    let health_state = if let Some(s) = cache_config.rx_health_state.borrow().clone() {
        s.is_quorum_good()?;
        s
    } else {
        return Err(CacheError {
            error: "'cache_insert' does only work with an active HA_MODE".to_string(),
        });
    };

    let val = bincode::serialize(value)?;

    // `true` when the insert reached the requested ack level.
    let success = match health_state.state {
        QuorumState::Leader | QuorumState::LeaderSwitch => {
            insert_from_leader(
                cache_name,
                entry,
                val,
                cache_config,
                ack_level,
                Some(health_state),
            )
            .await?
        }
        QuorumState::LeaderDead => {
            return Err(CacheError {
                error: "HA Cache is in QuorumState::LeaderDead - cache insert not possible"
                    .to_string(),
            });
        }
        QuorumState::LeaderTxAwait(_) | QuorumState::LeadershipRequested(_) => {
            return Err(CacheError {
                error: "HA Cache has no leader yet - cache insert not possible".to_string(),
            });
        }
        QuorumState::Follower => {
            let (tx, rx) = flume::unbounded();
            let req = RpcRequest::Insert {
                cache_name,
                entry,
                value: val,
                ack_level,
                resp: tx,
            };
            health_state
                .tx_leader
                .as_ref()
                .expect(
                    "'health_state.tx_leader' is None in 'cache_insert' when it should never be",
                )
                .send_async(req)
                .await?;
            // The leader answers with the outcome of the insert.
            rx.recv_async().await?
        }
        QuorumState::Undefined | QuorumState::Retry => {
            return Err(CacheError {
                error: "The HA cache layer is not ready".to_string(),
            })
        }
    };

    if success {
        Ok(())
    } else {
        Err(CacheError {
            error: "Could not execute the 'cache_insert'".to_string(),
        })
    }
}
/// Executes an insert from the leader's point of view: the value is sent to the
/// remote members and written to the local cache.
///
/// For `AckLevel::Leader` nothing is awaited. For `AckLevel::Quorum` (`*QUORUM`
/// acks) and `AckLevel::Once` (one ack) the remote answers are collected, at most
/// one per connected host.
///
/// Returns `Ok(true)` when the requested number of acks arrived (or none were
/// requested) and `Ok(false)` when the connected hosts were exhausted first.
/// When `health_state` is `None`, the current state is read from `cache_config`
/// and the quorum is checked.
pub(crate) async fn insert_from_leader(
    cache_name: String,
    entry: String,
    value: Vec<u8>,
    cache_config: &CacheConfig,
    ack_level: AckLevel,
    health_state: Option<QuorumHealthState>,
) -> Result<bool, CacheError> {
    debug!("'insert_from_leader' for {}/{}", cache_name, entry);

    if !*HA_MODE {
        let error = "'insert_from_leader' is only available with an active HA_MODE".to_string();
        error!("{error}");
        return Err(CacheError { error });
    }

    let health_state = match health_state {
        Some(given) => given,
        None => match cache_config.rx_health_state.borrow().clone() {
            Some(current) => {
                current.is_quorum_good()?;
                current
            }
            None => {
                return Err(CacheError {
                    error: "'insert_from_leader' does only work with an active HA_MODE"
                        .to_string(),
                });
            }
        },
    };

    let tx_cache = match cache_config.cache_map.get(&cache_name) {
        Some(tx) => tx,
        None => {
            let error = format!("'cache_map' misconfiguration in 'insert_from_leader': The tx for the given cache_name '{}' does not exist", cache_name);
            error!("{error}");
            return Err(CacheError { error });
        }
    };

    let mut pending_acks = match ack_level {
        AckLevel::Leader => {
            // No acks requested: hand the value to the remote channel first, then
            // to the local cache, and report success.
            cache_config
                .tx_remote
                .as_ref()
                .unwrap()
                .send_async(RpcRequest::Put {
                    cache_name,
                    entry: entry.clone(),
                    value: value.clone(),
                    resp: None,
                })
                .await?;
            tx_cache.send_async(CacheReq::Put { entry, value }).await?;
            return Ok(true);
        }
        AckLevel::Quorum => *QUORUM,
        AckLevel::Once => 1,
    };

    let (ack_tx, ack_rx) = flume::unbounded();
    cache_config
        .tx_remote
        .as_ref()
        .unwrap()
        .send_async(RpcRequest::Put {
            cache_name,
            entry: entry.clone(),
            value: value.clone(),
            resp: Some(ack_tx),
        })
        .await?;
    tx_cache.send_async(CacheReq::Put { entry, value }).await?;

    debug!("health_state in 'insert_from_leader': {:?}", health_state);

    // Expect at most one answer per connected host.
    let mut remaining_clients = health_state.connected_hosts;
    while pending_acks > 0 && remaining_clients > 0 {
        match ack_rx.recv_async().await {
            Ok(true) => pending_acks -= 1,
            Ok(false) => {}
            Err(err) => {
                error!("Error in 'insert_from_leader': {}", err);
            }
        }

        // The last host has answered and acks are still missing.
        if remaining_clients == 1 && pending_acks > 0 {
            return Ok(false);
        }
        remaining_clients -= 1;
    }

    Ok(true)
}
pub async fn cache_del(
cache_name: String,
entry: String,
cache_config: &CacheConfig,
) -> Result<(), CacheError> {
let tx = cache_config.cache_map.get(&cache_name).ok_or_else(|| {
let err = format!(
"CacheMap misconfiguration - could not find expected cache_name '{}'",
&cache_name
);
error!("{err}");
CacheError { error: err }
})?;
let req = CacheReq::Del {
entry: entry.clone(),
};
tx.send_async(req).await.map_err(|e| {
let err = format!("Error sending local cache value over the channel: {:?}", e);
error!("{err}");
CacheError { error: err }
})?;
if *HA_MODE {
let remote_req = RpcRequest::Del {
cache_name,
entry,
resp: None,
};
cache_config
.tx_remote
.as_ref()
.unwrap()
.send_async(remote_req)
.await?;
}
Ok(())
}
pub async fn cache_remove(
cache_name: String,
entry: String,
cache_config: &CacheConfig,
ack_level: AckLevel,
) -> Result<(), CacheError> {
if !*HA_MODE {
return cache_del(cache_name, entry, cache_config).await;
}
let health_state = if let Some(s) = cache_config.rx_health_state.borrow().clone() {
s.is_quorum_good()?;
s
} else {
return Err(CacheError {
error: "'cache_remove' does only work with an active HA_MODE".to_string(),
});
};
let mut callback_rx = None;
match health_state.state {
QuorumState::Leader => {
remove_from_leader(
cache_name,
entry,
cache_config,
ack_level,
Some(health_state),
)
.await?;
}
QuorumState::LeaderDead => {
return Err(CacheError {
error: "HA Cache is in QuorumState::LeaderDead - cache insert not possible"
.to_string(),
});
}
QuorumState::LeaderSwitch => {
remove_from_leader(
cache_name,
entry,
cache_config,
ack_level,
Some(health_state),
)
.await?;
}
QuorumState::LeaderTxAwait(_) | QuorumState::LeadershipRequested(_) => {
return Err(CacheError {
error: "HA Cache has no leader yet - cache insert not possible".to_string(),
});
}
QuorumState::Follower => {
let (tx, rx) = flume::unbounded();
callback_rx = Some(rx);
let req = RpcRequest::Remove {
cache_name,
entry,
ack_level,
resp: tx,
};
health_state
.tx_leader
.as_ref()
.expect(
"'health_state.tx_leader' is None in 'cache_insert' when it should never be",
)
.send_async(req)
.await?;
}
QuorumState::Undefined => unreachable!(),
QuorumState::Retry => unreachable!(),
}
if let Some(rx) = callback_rx {
let res = rx.recv_async().await?;
if res {
Ok(())
} else {
Err(CacheError {
error: "Could not execute the 'cache_insert'".to_string(),
})
}
} else {
Ok(())
}
}
/// Executes a removal from the leader's point of view: the delete is sent to the
/// remote members and applied to the local cache.
///
/// For `AckLevel::Leader` nothing is awaited. For `AckLevel::Quorum` (`*QUORUM`
/// acks) and `AckLevel::Once` (one ack) the remote answers are collected, at most
/// one per connected host.
///
/// Returns `Ok(true)` when the requested number of acks arrived (or none were
/// requested) and `Ok(false)` when the connected hosts were exhausted first.
/// When `health_state` is `None`, the current state is read from `cache_config`
/// and the quorum is checked.
///
/// # Errors
///
/// Returns a `CacheError` outside of HA mode, when this host is neither in
/// `Leader` nor `LeaderSwitch` state, when `cache_name` is not registered, or when
/// a channel is closed.
pub(crate) async fn remove_from_leader(
    cache_name: String,
    entry: String,
    cache_config: &CacheConfig,
    ack_level: AckLevel,
    health_state: Option<QuorumHealthState>,
) -> Result<bool, CacheError> {
    debug!("'remove_from_leader' for {}/{}", cache_name, entry);

    if !*HA_MODE {
        let error = "'remove_from_leader' is only available with an active HA_MODE".to_string();
        error!("{error}");
        return Err(CacheError { error });
    }

    let health_state = match health_state {
        Some(given) => given,
        None => match cache_config.rx_health_state.borrow().clone() {
            Some(current) => {
                current.is_quorum_good()?;
                current
            }
            None => {
                return Err(CacheError {
                    error: "'remove_from_leader' does only work with an active HA_MODE"
                        .to_string(),
                });
            }
        },
    };

    // The state can change between request and execution, so this is reported as
    // an error to the caller instead of a panic.
    if health_state.state != QuorumState::Leader && health_state.state != QuorumState::LeaderSwitch
    {
        let error = "Execution of 'remove_from_leader' is not allowed on a non-leader".to_string();
        warn!("is_leader state: {:?}", health_state.state);
        return Err(CacheError { error });
    }

    let tx_cache = match cache_config.cache_map.get(&cache_name) {
        Some(tx) => tx,
        None => {
            let error = format!("'cache_map' misconfiguration in 'remove_from_leader': The tx for the given cache_name '{}' does not exist", cache_name);
            error!("{error}");
            return Err(CacheError { error });
        }
    };

    let await_acks = match ack_level {
        AckLevel::Leader => None,
        AckLevel::Quorum => Some(*QUORUM),
        AckLevel::Once => Some(1),
    };

    if let Some(mut acks) = await_acks {
        let (tx, rx) = flume::unbounded();
        cache_config
            .tx_remote
            .as_ref()
            .unwrap()
            .send_async(RpcRequest::Del {
                cache_name,
                entry: entry.clone(),
                resp: Some(tx),
            })
            .await?;
        tx_cache.send_async(CacheReq::Del { entry }).await?;

        debug!("health_state in 'remove_from_leader': {:?}", health_state);

        // Expect at most one answer per connected host.
        let mut clients = health_state.connected_hosts;
        while acks > 0 && clients > 0 {
            match rx.recv_async().await {
                Ok(val) => {
                    if val {
                        acks -= 1;
                    }
                }
                Err(err) => {
                    error!("Error in 'remove_from_leader': {}", err);
                }
            }

            // The last host has answered and acks are still missing.
            if clients == 1 && acks > 0 {
                return Ok(false);
            }
            clients -= 1;
        }
    } else {
        // No acks requested: remote channel first, then the local cache.
        cache_config
            .tx_remote
            .as_ref()
            .unwrap()
            .send_async(RpcRequest::Del {
                cache_name,
                entry: entry.clone(),
                resp: None,
            })
            .await?;
        tx_cache.send_async(CacheReq::Del { entry }).await?;
    }

    Ok(true)
}
/// Returns the value of `HOSTNAME_OVERWRITE` when that variable can be read,
/// otherwise the hostname reported by the operating system.
///
/// # Panics
///
/// Panics when the OS hostname is not valid UTF-8.
fn get_local_hostname() -> String {
    env::var("HOSTNAME_OVERWRITE").unwrap_or_else(|_| {
        gethostname()
            .to_str()
            .expect("Error getting the hostname from the OS")
            .to_string()
    })
}
/// Returns a freshly generated id produced by `nanoid!(10)`.
// NOTE(review): no caller is visible in this file; presumably used by the RPC
// layer to tag requests — confirm.
pub(crate) fn get_cache_req_id() -> String {
nanoid!(10)
}
/// Returns a random value from the half-open range `start..end`, drawn from the
/// thread-local generator.
// NOTE(review): `gen_range` is expected to panic on an empty range
// (`start >= end`) — confirm against the `rand` version in use.
pub(crate) fn get_rand_between(start: u64, end: u64) -> u64 {
    rand::thread_rng().gen_range(start..end)
}
/// Sends a `CacheReq::Reset` to every cache registered in `cache_config`.
///
/// Only the caches reachable through this config's senders are addressed. Stops at
/// the first cache whose channel is closed and returns that error.
pub async fn clear_caches(cache_config: &CacheConfig) -> Result<(), CacheError> {
    for (cache_name, sender) in cache_config.cache_map.iter() {
        debug!("Clearing cache {}", cache_name);
        if let Err(err) = sender.send_async(CacheReq::Reset).await {
            return Err(err.into());
        }
    }
    Ok(())
}
pub async fn start_cluster(
tx_watch: watch::Sender<Option<QuorumHealthState>>,
cache_config: &mut CacheConfig,
tx_notify: Option<mpsc::Sender<CacheNotify>>,
hostname_overwrite: Option<String>,
) -> anyhow::Result<()> {
dotenvy::dotenv().ok();
if !*HA_MODE {
info!("HA_MODE is not set, starting in standalone mode");
return Ok(());
}
let mut ha_clients = vec![];
let mut host_srv_addr = String::default();
let hostname = if let Some(name) = hostname_overwrite {
name
} else {
get_local_hostname()
};
let ha_hosts_csv = env::var("HA_HOSTS").expect("HA_HOSTS is not set");
if !ha_hosts_csv.contains(&hostname) {
panic!(
"HA_HOSTS is not set up correctly. Current hostname '{}' does not appear in HA_HOSTS",
hostname
);
}
let ha_hosts = ha_hosts_csv
.split(',')
.map(|h| h.trim().to_string())
.collect::<Vec<String>>();
ha_hosts.iter().for_each(|h| {
if !h.contains(&hostname) {
ha_clients.push(h.trim().to_owned())
} else {
h.trim().clone_into(&mut host_srv_addr);
}
});
info!(
"Starting HA cache for hostname '{}' and cache members: {:?}",
hostname, ha_clients
);
let tx_quorum = cache_config.tx_quorum.as_ref().unwrap().to_owned();
let cache_map = cache_config.cache_map.clone();
let quorum_handle = tokio::spawn(quorum_handler(
tx_quorum.clone(),
tx_watch,
cache_config.rx_quorum.as_ref().unwrap().clone(),
cache_config.tx_remote.as_ref().unwrap().clone(),
ha_clients.len(),
cache_map,
host_srv_addr.clone(),
));
let srv_handle = tokio::spawn(RpcCacheService::serve(
host_srv_addr,
cache_config.clone(),
tx_quorum.clone(),
tx_notify,
));
let rx_remote = cache_config.rx_remote.as_ref().unwrap().clone();
let clients_handle = tokio::spawn(cache_clients(ha_clients, tx_quorum, rx_remote));
let tx_remote = cache_config.tx_remote.as_ref().unwrap().clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
if let Err(err) = tx_remote.send_async(RpcRequest::Ping).await {
debug!("cannot ping remote caches: {:?}", err);
}
}
});
let (tx_exit, rx_exit) = flume::unbounded();
cache_config.tx_exit = Some(tx_exit);
let tx_quorum = cache_config
.tx_quorum
.clone()
.expect("cache_config.tx_quorum to never be empty here");
tokio::spawn(async move {
let tx_ack = rx_exit.recv_async().await;
tx_quorum.send_async(QuorumReq::HostShutdown).await.unwrap();
time::sleep(Duration::from_millis(2000)).await;
srv_handle.abort();
clients_handle.abort();
quorum_handle.abort();
if let Ok(tx) = tx_ack {
tx.send(()).unwrap();
}
time::sleep(Duration::from_millis(10)).await;
});
Ok(())
}
/// Event loop that owns one cache instance and serves `CacheReq` messages until
/// every sender of `rx` has been dropped.
pub(crate) async fn cache_recv<C>(mut cache: C, name: String, rx: flume::Receiver<CacheReq>)
where
    C: Cached<String, Vec<u8>>,
{
    info!("Started cache {}", name);

    loop {
        let request = match rx.recv_async().await {
            Ok(request) => request,
            Err(_) => {
                warn!(
                    "Received None in cache_recv {} - Cache Sender has been dropped - exiting cache",
                    name
                );
                break;
            }
        };

        match request {
            CacheReq::Get { entry, resp } => {
                // A failed answer only means the requester is gone already.
                let found = cache.cache_get(&entry).cloned();
                if let Err(err) = resp.send_async(found).await {
                    debug!("Error sending cache entry '{}' back: {:?}", entry, err);
                }
            }
            CacheReq::Put { entry, value } => {
                cache.cache_set(entry, value);
            }
            CacheReq::Del { entry } => {
                cache.cache_remove(&entry);
            }
            CacheReq::Reset => {
                debug!("Received a full cache reset request");
                cache.cache_reset();
            }
        }
    }
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use pretty_assertions::assert_eq;
use super::*;
// Test fixture: a struct with two strings and an integer, used to check that
// values of a custom type survive the serialize / cache / deserialize round trip.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct Cert {
key: String,
crt: String,
id: i64,
}
// Test fixture mixing plain, optional and vector fields to exercise the bincode
// encoding of a more complex type.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct BincodeTester {
id: i64,
name: String,
opt_name: Option<String>,
some_vec: Vec<u32>,
some_opt_vec: Option<Vec<String>>,
}
/// Installs a process wide `tracing` subscriber at INFO level for the tests.
///
/// Every test calls this function, so only the first call actually installs the
/// subscriber; the error returned by later calls ("already set") is ignored on
/// purpose.
pub fn setup_logging() {
    use tracing::Level;

    let log_level = Level::INFO;
    let filter = format!("{},async_nats=info,hyper=info", log_level.as_str());
    env::set_var("RUST_LOG", &filter);

    let subscriber = tracing_subscriber::FmtSubscriber::builder()
        .with_max_level(log_level)
        .with_env_filter(filter)
        .finish();

    // `set_default` returns a guard that un-registers the subscriber as soon as it
    // is dropped, so `let _ = set_default(..)` never enabled any logging. The
    // global default has no guard and stays active for the whole test process.
    let _ = tracing::subscriber::set_global_default(subscriber);
}
// Round trip of a single struct through a standalone cache: put, get, compare
// every field.
#[tokio::test]
async fn test_cache_simple() -> Result<(), Box<dyn std::error::Error>> {
    setup_logging();

    let (_rx_quorum, mut cache_config) = CacheConfig::new();
    let cache_name = "one".to_string();
    cache_config.spawn_cache(cache_name.clone(), SizedCache::with_size(5), None);

    let key = "e1".to_string();
    let original = BincodeTester {
        id: 123,
        name: "SuperTester".to_string(),
        opt_name: None,
        some_vec: vec![1, 33, 20098],
        some_opt_vec: None,
    };

    cache_put(cache_name.clone(), key.clone(), &cache_config, &original)
        .await
        .unwrap();

    let lookup = cache_get!(
        BincodeTester,
        cache_name.clone(),
        key.clone(),
        &cache_config,
        false
    )
    .await
    .expect("Cache error");
    assert!(lookup.is_some());

    let restored = lookup.unwrap();
    assert_eq!(restored.id, original.id);
    assert_eq!(restored.name, original.name);
    assert_eq!(restored.opt_name, original.opt_name);
    assert_eq!(restored.some_vec, original.some_vec);
    assert_eq!(restored.some_opt_vec, original.some_opt_vec);

    Ok(())
}
// Exercises a size limited cache (5 entries) behind a bounded request channel:
// overwriting a key, eviction once more than 5 distinct keys were written, and
// deletion of single entries.
#[tokio::test]
async fn test_cache_recv_sized() -> Result<(), Box<dyn std::error::Error>> {
setup_logging();
let (_rx_quorum, mut cache_config) = CacheConfig::new();
let cache_name_1 = "two".to_string();
cache_config.spawn_cache(cache_name_1.clone(), SizedCache::with_size(5), Some(16));
cache_put(cache_name_1.clone(), "1".to_string(), &cache_config, &1i32)
.await
.unwrap();
// Key "2" is written twice; the second value is the one expected later on.
cache_put(
cache_name_1.clone(),
"2".to_string(),
&cache_config,
&999i32,
)
.await
.unwrap();
cache_put(cache_name_1.clone(), "2".to_string(), &cache_config, &17i32)
.await
.unwrap();
cache_put(
cache_name_1.clone(),
"3".to_string(),
&cache_config,
&1337i64,
)
.await
.unwrap();
cache_put(
cache_name_1.clone(),
"4".to_string(),
&cache_config,
&2887398i64,
)
.await
.unwrap();
let crt1 = Cert {
key: "SomeKey1".to_string(),
crt: "SomeVerySecretCert1".to_string(),
id: 1,
};
let crt2 = Cert {
key: "SomeKey2WhichIsVeeeeeeeeeeryyyyyyLooooooong".to_string(),
crt: "SomeVerySecretCert2".to_string(),
id: 2,
};
// With "Cert1" and "Cert2" six distinct keys have been written into a cache
// that holds five entries.
cache_put(
cache_name_1.clone(),
"Cert1".to_string(),
&cache_config,
&crt1,
)
.await
.unwrap();
cache_put(
cache_name_1.clone(),
"Cert2".to_string(),
&cache_config,
&crt2,
)
.await
.unwrap();
let one = cache_get!(
i32,
cache_name_1.clone(),
"1".to_string(),
&cache_config,
false
)
.await
.unwrap();
let two = cache_get!(
i32,
cache_name_1.clone(),
"2".to_string(),
&cache_config,
false
)
.await
.unwrap();
let three = cache_get!(
i64,
cache_name_1.clone(),
"3".to_string(),
&cache_config,
false
)
.await
.unwrap();
let four = cache_get!(
i64,
cache_name_1.clone(),
"4".to_string(),
&cache_config,
false
)
.await
.unwrap();
let crt_recv1 = cache_get!(
Cert,
cache_name_1.clone(),
"Cert1".to_string(),
&cache_config,
false
)
.await
.unwrap();
let crt_recv2 = cache_get!(
Cert,
cache_name_1.clone(),
"Cert2".to_string(),
&cache_config,
false
)
.await
.unwrap();
// The first written key is expected to have been evicted.
assert!(one.is_none());
assert_eq!(two.unwrap(), 17);
assert_eq!(three.unwrap(), 1337);
assert_eq!(four.unwrap(), 2887398);
assert_eq!(crt_recv1.unwrap().crt, crt1.crt);
assert_eq!(crt_recv2.unwrap().crt, crt2.crt);
// Deleted entries must not be returned any more.
cache_del(cache_name_1.clone(), "2".to_string(), &cache_config)
.await
.unwrap();
cache_del(cache_name_1.clone(), "Cert2".to_string(), &cache_config)
.await
.unwrap();
let two = cache_get!(
i32,
cache_name_1.clone(),
"2".to_string(),
&cache_config,
false
)
.await
.unwrap();
let crt_recv2 = cache_get!(
Cert,
cache_name_1.clone(),
"Cert2".to_string(),
&cache_config,
false
)
.await
.unwrap();
assert!(two.is_none());
assert!(crt_recv2.is_none());
Ok(())
}
// Exercises a cache created with `with_size_and_lifespan(2, 1)`: the oldest of
// three written keys is expected to be gone right away, the remaining two after
// sleeping for one second.
#[tokio::test]
async fn test_cache_recv_sized_timed() -> Result<(), Box<dyn std::error::Error>> {
setup_logging();
let (_rx_quorum, mut cache_config) = CacheConfig::new();
let cache_name_1 = "three".to_string();
cache_config.spawn_cache(
cache_name_1.clone(),
TimedSizedCache::with_size_and_lifespan(2, 1),
None,
);
cache_put(cache_name_1.clone(), "1".to_string(), &cache_config, &1i64)
.await
.unwrap();
cache_put(cache_name_1.clone(), "2".to_string(), &cache_config, &2i64)
.await
.unwrap();
cache_put(cache_name_1.clone(), "3".to_string(), &cache_config, &3i64)
.await
.unwrap();
let one = cache_get!(
i64,
cache_name_1.clone(),
"1".to_string(),
&cache_config,
false
)
.await
.unwrap();
let two = cache_get!(
i64,
cache_name_1.clone(),
"2".to_string(),
&cache_config,
false
)
.await
.unwrap()
.unwrap();
let three = cache_get!(
i64,
cache_name_1.clone(),
"3".to_string(),
&cache_config,
false
)
.await
.unwrap()
.unwrap();
// Three keys were written into a cache of size 2.
assert!(one.is_none());
assert_eq!(two, 2);
assert_eq!(three, 3);
// Wait until the lifespan has passed.
time::sleep(Duration::from_secs(1)).await;
// The target type does not matter below: a missing entry yields `None` before
// any deserialization is attempted.
let one = cache_get!(
Cert,
cache_name_1.clone(),
"1".to_string(),
&cache_config,
false
)
.await
.unwrap();
let two = cache_get!(
Cert,
cache_name_1.clone(),
"2".to_string(),
&cache_config,
false
)
.await
.unwrap();
let three = cache_get!(
Cert,
cache_name_1.clone(),
"3".to_string(),
&cache_config,
false
)
.await
.unwrap();
assert!(one.is_none());
assert!(two.is_none());
assert!(three.is_none());
Ok(())
}
// Exercises a cache created with `TimedCache::with_lifespan(1)`: all entries are
// readable right after writing and expected to be gone after sleeping for one
// second.
#[tokio::test]
async fn test_cache_recv_timed() -> Result<(), Box<dyn std::error::Error>> {
setup_logging();
let (_rx_quorum, mut cache_config) = CacheConfig::new();
let cache_name_1 = "four".to_string();
cache_config.spawn_cache(cache_name_1.clone(), TimedCache::with_lifespan(1), None);
cache_put(cache_name_1.clone(), "1".to_string(), &cache_config, &1i32)
.await
.unwrap();
cache_put(cache_name_1.clone(), "2".to_string(), &cache_config, &2i32)
.await
.unwrap();
cache_put(cache_name_1.clone(), "3".to_string(), &cache_config, &3i32)
.await
.unwrap();
let one = cache_get!(
i32,
cache_name_1.clone(),
"1".to_string(),
&cache_config,
false
)
.await
.unwrap();
let two = cache_get!(
i32,
cache_name_1.clone(),
"2".to_string(),
&cache_config,
false
)
.await
.unwrap();
let three = cache_get!(
i32,
cache_name_1.clone(),
"3".to_string(),
&cache_config,
false
)
.await
.unwrap();
assert_eq!(one, Some(1));
assert_eq!(two, Some(2));
assert_eq!(three, Some(3));
// Wait until the lifespan has passed.
time::sleep(Duration::from_secs(1)).await;
let one = cache_get!(
i32,
cache_name_1.clone(),
"1".to_string(),
&cache_config,
false
)
.await
.unwrap();
let two = cache_get!(
i32,
cache_name_1.clone(),
"2".to_string(),
&cache_config,
false
)
.await
.unwrap();
let three = cache_get!(
i32,
cache_name_1.clone(),
"3".to_string(),
&cache_config,
false
)
.await
.unwrap();
assert!(one.is_none());
assert!(two.is_none());
assert!(three.is_none());
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn test_ha_cache() -> anyhow::Result<()> {
setup_logging();
env::set_var("HA_MODE", "true");
env::set_var(
"HA_HOSTS",
"http://127.0.0.1:7001, http://127.0.0.1:7002, http://127.0.0.1:7003",
);
env::set_var("CACHE_AUTH_TOKEN", "SuperSecretToken1337");
env::set_var("CACHE_TLS", "false");
let (tx_health_1, mut cache_config_1) = CacheConfig::new();
let (tx_health_2, mut cache_config_2) = CacheConfig::new();
let (tx_health_3, mut cache_config_3) = CacheConfig::new();
let cache_name = "c_one".to_string();
let cache = SizedCache::with_size(16);
cache_config_1.spawn_cache(cache_name.clone(), cache.clone(), Some(16));
cache_config_2.spawn_cache(cache_name.clone(), cache.clone(), None);
cache_config_3.spawn_cache(cache_name.clone(), cache, None);
start_cluster(
tx_health_1,
&mut cache_config_1,
None,
Some("127.0.0.1:7001".to_string()),
)
.await?;
time::sleep(Duration::from_millis(100)).await;
let entry = "one".to_string();
let res = cache_put(cache_name.clone(), entry.clone(), &cache_config_1, &1i32).await;
match res {
Ok(_) => panic!("This should not be Ok"),
Err(err) => {
assert!(err.error.contains("QuorumHealth::Bad"));
}
}
let one = cache_get!(
i32,
cache_name.clone(),
entry.clone(),
&cache_config_1,
false
)
.await
.unwrap();
assert_eq!(one, None);
start_cluster(
tx_health_2,
&mut cache_config_2,
None,
Some("127.0.0.1:7002".to_string()),
)
.await?;
let mut loops = 0;
while cache_config_1
.rx_health_state
.borrow()
.as_ref()
.unwrap()
.health
== QuorumHealth::Bad
{
loops += 1;
time::sleep(Duration::from_secs(1)).await;
if loops > 20 {
panic!("QuorumHealth did no reach Good state when it should have");
}
}
let entry = "one".to_string();
cache_put(cache_name.clone(), entry.clone(), &cache_config_1, &1337i32)
.await
.unwrap();
let one = cache_get!(
i32,
cache_name.clone(),
entry.clone(),
&cache_config_1,
false
)
.await
.unwrap();
assert_eq!(one, Some(1337));
time::sleep(Duration::from_millis(5)).await;
let two = cache_get!(
i32,
cache_name.clone(),
entry.clone(),
&cache_config_2,
false
)
.await
.unwrap();
assert_eq!(two, Some(1337));
start_cluster(
tx_health_3,
&mut cache_config_3,
None,
Some("127.0.0.1:7003".to_string()),
)
.await?;
let mut loops = 0;
while cache_config_3
.rx_health_state
.borrow()
.as_ref()
.unwrap()
.health
== QuorumHealth::Bad
{
loops += 1;
time::sleep(Duration::from_secs(1)).await;
if loops > 20 {
panic!("QuorumHealth did no reach Good state when it should have");
}
}
assert_eq!(
cache_config_3
.rx_health_state
.borrow()
.as_ref()
.unwrap()
.state,
QuorumState::Follower
);
while cache_config_1
.rx_health_state
.borrow()
.as_ref()
.unwrap()
.connected_hosts
< 2
|| cache_config_2
.rx_health_state
.borrow()
.as_ref()
.unwrap()
.connected_hosts
< 2
|| cache_config_3
.rx_health_state
.borrow()
.as_ref()
.unwrap()
.connected_hosts
< 2
{
time::sleep(Duration::from_secs(1)).await;
}
let three = cache_get!(
i32,
cache_name.clone(),
entry.clone(),
&cache_config_3,
false
)
.await
.unwrap();
assert_eq!(three, None);
let three = cache_get!(
i32,
cache_name.clone(),
entry.clone(),
&cache_config_3,
true
)
.await
.unwrap();
assert_eq!(three, Some(1337));
cache_del(cache_name.clone(), entry.clone(), &cache_config_1)
.await
.unwrap();
let one = cache_get!(
i32,
cache_name.clone(),
entry.clone(),
&cache_config_1,
false
)
.await
.unwrap();
assert_eq!(one, None);
time::sleep(Duration::from_millis(1)).await;
let two = cache_get!(
i32,
cache_name.clone(),
entry.clone(),
&cache_config_2,
false
)
.await
.unwrap();
assert_eq!(two, None);
let three = cache_get!(
i32,
cache_name.clone(),
entry.clone(),
&cache_config_3,
false
)
.await
.unwrap();
assert_eq!(three, None);
let ha_entry = "ha_entry".to_string();
let ha_val = "HaVal1337".to_string();
cache_insert(
cache_name.clone(),
ha_entry.clone(),
&cache_config_1,
&ha_val,
AckLevel::Quorum,
)
.await
.unwrap();
time::sleep(Duration::from_millis(20)).await;
let one = cache_get!(
String,
cache_name.clone(),
ha_entry.clone(),
&cache_config_1,
false
)
.await
.unwrap()
.unwrap();
assert_eq!(one, ha_val);
let two = cache_get!(
String,
cache_name.clone(),
ha_entry.clone(),
&cache_config_2,
false
)
.await
.unwrap()
.unwrap();
assert_eq!(two, ha_val);
let three = cache_get!(
String,
cache_name.clone(),
ha_entry.clone(),
&cache_config_3,
false
)
.await
.unwrap()
.unwrap();
assert_eq!(three, ha_val);
cache_remove(
cache_name.clone(),
entry.clone(),
&cache_config_1,
AckLevel::Quorum,
)
.await
.unwrap();
time::sleep(Duration::from_millis(20)).await;
let one = cache_get!(
String,
cache_name.clone(),
entry.clone(),
&cache_config_1,
false
)
.await
.unwrap();
assert_eq!(one, None);
let two = cache_get!(
String,
cache_name.clone(),
entry.clone(),
&cache_config_2,
false
)
.await
.unwrap();
assert_eq!(two, None);
let three = cache_get!(
String,
cache_name.clone(),
entry.clone(),
&cache_config_3,
false
)
.await
.unwrap();
assert_eq!(three, None);
Ok(())
}
}