use std::{
marker::PhantomData,
sync::{Arc, RwLock, RwLockReadGuard},
time::{Duration, Instant},
};
#[derive(Debug)]
pub enum ProbeError<E> {
WrongState,
AwaitingNextProbe,
CompletedUnusable,
CallbackFailed(E),
}
#[derive(Debug)]
pub struct StatefulProbe<C, E, S> {
name: String,
status: RwLock<ProbeStatus<C, E, S>>,
probe_interval: Duration,
}
pub enum ProbeStatus<C, E, S> {
Probing {
config: Arc<C>,
last_probe_time: Option<Instant>,
phantom: PhantomData<E>,
},
Unusable,
Usable(S),
}
impl<C, E, S> std::fmt::Debug for ProbeStatus<C, E, S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Probing { .. } => write!(f, "Probing"),
Self::Unusable => write!(f, "Unusable"),
Self::Usable(_) => write!(f, "Usable"),
}
}
}
impl<C, E, S> ProbeStatus<C, E, S> {
pub fn mark(&mut self) -> Result<(), ProbeError<E>> {
match self {
#[rustfmt::skip]
ProbeStatus::Probing { last_probe_time, .. } => {
last_probe_time.replace(Instant::now());
Ok(())
}
_ => Err(ProbeError::WrongState),
}
}
pub fn config(&self) -> Result<Arc<C>, ProbeError<E>> {
match self {
ProbeStatus::Probing { config, .. } => Ok(config.clone()),
_ => Err(ProbeError::WrongState),
}
}
#[cfg(test)]
pub fn last_probe_time(&self) -> Result<Option<Instant>, ProbeError<E>> {
match self {
ProbeStatus::Probing { last_probe_time, .. } => Ok(*last_probe_time),
_ => Err(ProbeError::WrongState),
}
}
pub fn state(&self) -> Result<&S, ProbeError<E>> {
match self {
ProbeStatus::Usable(state) => Ok(state),
_ => Err(ProbeError::WrongState),
}
}
}
impl<C, E, S> StatefulProbe<C, E, S> {
pub fn new(name: String, config: Arc<C>, probe_interval: Duration) -> Self {
let status = RwLock::new(ProbeStatus::Probing {
config,
last_probe_time: None,
phantom: PhantomData,
});
StatefulProbe {
name,
status,
probe_interval,
}
}
#[cfg(test)]
pub fn last_probe_time(&self) -> Result<Option<Instant>, ProbeError<E>> {
self.status.read().unwrap().last_probe_time()
}
pub fn status<F>(&self, probe: F) -> Result<RwLockReadGuard<ProbeStatus<C, E, S>>, ProbeError<E>>
where
F: Fn(String, &ProbeStatus<C, E, S>) -> Result<S, ProbeError<E>>,
{
fn is_time_to_check(time_between_probes: Duration, last_probe_time: Option<Instant>) -> bool {
match last_probe_time {
None => true,
Some(instant) => Instant::now().saturating_duration_since(instant) > time_between_probes,
}
}
#[allow(clippy::type_complexity)]
fn get_if_usable<C, E, S>(
name: String,
status: RwLockReadGuard<ProbeStatus<C, E, S>>,
retry_interval: Duration,
) -> Option<Result<RwLockReadGuard<ProbeStatus<C, E, S>>, ProbeError<E>>> {
match &*status {
ProbeStatus::Usable(_) => {
Some(Ok(status))
}
ProbeStatus::Unusable => {
Some(Err(ProbeError::CompletedUnusable))
}
ProbeStatus::Probing { last_probe_time, .. } => {
if !is_time_to_check(retry_interval, *last_probe_time) {
if let Some(instant) = last_probe_time {
let until = retry_interval.checked_sub(instant.elapsed()).unwrap(); info!(
"Signer '{}' is currently unavailable. {:?}s until next check.",
name,
until.as_secs()
);
} else {
info!(
"Signer '{}' is currently unavailable. Checking every {:?}s",
name,
retry_interval.as_secs()
);
}
Some(Err(ProbeError::AwaitingNextProbe))
} else {
None
}
}
}
}
fn send_probe<C, E, F, S>(probe: &StatefulProbe<C, E, S>, probe_cb: F) -> Result<(), ProbeError<E>>
where
F: Fn(String, &ProbeStatus<C, E, S>) -> Result<S, ProbeError<E>>,
{
let mut status = probe.status.try_write().map_err(|_| ProbeError::AwaitingNextProbe)?;
status.mark()?;
match (probe_cb)(probe.name.clone(), &*status) {
Ok(usable_state) => {
*status = ProbeStatus::Usable(usable_state);
Ok(())
}
Err(err) => {
if matches!(err, ProbeError::CompletedUnusable) {
*status = ProbeStatus::Unusable;
}
Err(err)
}
}
}
let status = self.status.read().unwrap();
get_if_usable(self.name.clone(), status, self.probe_interval).unwrap_or_else(|| {
send_probe(self, probe)
.map(|_| self.status.read().unwrap())
.map_err(|err| match err {
ProbeError::CompletedUnusable => err,
_ => ProbeError::AwaitingNextProbe,
})
})
}
}
#[cfg(test)]
pub mod tests {
use std::time::Duration;
use super::*;
#[derive(Debug, Default)]
struct Config {
_hostname: String,
_port: u64,
}
#[derive(Copy, Clone, Debug, Default)]
struct State {
some_state: u8,
}
impl State {
fn some_func(&self) -> u8 {
self.some_state
}
}
#[derive(Debug)]
enum SomeError {
SomeErrorCode,
}
fn probe_func(
_name: String,
_status: &ProbeStatus<Config, SomeError, State>,
) -> Result<State, ProbeError<SomeError>> {
Err(ProbeError::CompletedUnusable)
}
#[test]
fn probe_should_be_permanently_unavailable_with_closure() {
let config = Arc::new(Config::default());
let conn = StatefulProbe::<_, SomeError, State>::new("dummy".to_string(), config, Duration::from_secs(0));
let res = conn.status(|_, _| Err(ProbeError::CompletedUnusable));
match res {
Err(ProbeError::CompletedUnusable) => {}
other => panic!("Expected Err(ProbeError::PermanentlyUnusable) but got {:?}", other),
}
}
#[test]
fn probe_should_be_permanently_unavailable_with_fn() {
let config = Arc::new(Config::default());
let conn = StatefulProbe::<_, SomeError, State>::new("dummy".to_string(), config, Duration::from_secs(0));
let res = conn.status(probe_func);
match res {
Err(ProbeError::CompletedUnusable) => {}
other => panic!("Expected Err(ProbeError::PermanentlyUnusable) but got {:?}", other),
}
}
#[test]
fn probe_should_be_permanently_unavailable() {
let config = Arc::new(Config::default());
let conn = StatefulProbe::<_, SomeError, State>::new("dummy".to_string(), config, Duration::from_secs(0));
let res = conn.status(|_, _| Err(ProbeError::CompletedUnusable));
match res {
Err(ProbeError::CompletedUnusable) => {}
other => panic!("Expected Err(ProbeError::PermanentlyUnusable) but got {:?}", other),
}
}
#[test]
fn probe_should_be_temporarily_unavailable() {
let config = Arc::new(Config::default());
let conn = StatefulProbe::<_, SomeError, State>::new("dummy".to_string(), config, Duration::from_secs(0));
let res = conn.status(|_, _| Err(ProbeError::AwaitingNextProbe));
match res {
Err(ProbeError::AwaitingNextProbe) => {}
other => panic!("Expected Err(ProbeError::AwaitingNextProbe) but got {:?}", other),
}
}
#[test]
fn probe_should_be_temporarily_unavailable_on_custom_error() {
let config = Arc::new(Config::default());
let conn = StatefulProbe::<_, SomeError, State>::new("dummy".to_string(), config, Duration::from_secs(0));
let res = conn.status(|_, _| Err(ProbeError::CallbackFailed(SomeError::SomeErrorCode)));
match res {
Err(ProbeError::AwaitingNextProbe) => {}
other => panic!("Expected Err(ProbeError::AwaitingNextProbe) but got {:?}", other),
}
}
#[test]
fn last_probe_time_should_advance() -> Result<(), ProbeError<SomeError>> {
let config = Arc::new(Config::default());
let conn = StatefulProbe::<_, SomeError, State>::new("dummy".to_string(), config, Duration::from_millis(100));
assert_eq!(None, conn.last_probe_time()?);
let _probe1 = conn.status(|_, _| Err(ProbeError::AwaitingNextProbe));
let t1 = conn.last_probe_time()?;
assert!(t1.is_some());
std::thread::sleep(Duration::from_millis(10));
let _probe2 = conn.status(|_, _| Err(ProbeError::AwaitingNextProbe));
let t2 = conn.last_probe_time()?;
assert!(t2 == t1);
std::thread::sleep(Duration::from_millis(200));
let _probe3 = conn.status(|_, _| Err(ProbeError::AwaitingNextProbe));
let t3 = conn.last_probe_time()?;
assert!(t3 > t1);
Ok(())
}
#[test]
fn probe_should_change_state_when_usable() -> Result<(), ProbeError<SomeError>> {
let config = Arc::new(Config::default());
let new_state = State { some_state: 1 };
let conn = StatefulProbe::<_, SomeError, State>::new("dummy".to_string(), config, Duration::from_millis(0));
let new_status = conn.status(|_, _| Ok(new_state))?;
assert_eq!(1, new_status.state()?.some_state);
assert_eq!(1, new_status.state()?.some_func());
Ok(())
}
}