use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::sync::Notify;
/// Inhibit window applied by [`BeaconAnomaly::new`]: five minutes.
const DEFAULT_INHIBIT: Duration = Duration::from_secs(5 * 60);
/// A rate-limited request gate.
///
/// A request is honored only if no other request was honored within the
/// preceding `inhibit` window. Each honored request optionally signals an
/// installed [`Notify`].
///
/// All state sits behind `std::sync::Mutex`, so every method takes `&self`.
#[derive(Debug)]
pub struct BeaconAnomaly {
    /// Minimum time that must pass after an honored request before the next
    /// one is honored.
    inhibit: Duration,
    /// Instant of the most recently honored request; `None` if nothing has
    /// been honored yet or the gate was reset.
    last: Mutex<Option<Instant>>,
    /// Optional notifier signalled on every honored request.
    pulse: Mutex<Option<Arc<Notify>>>,
}
impl BeaconAnomaly {
    /// Creates a gate that uses the default inhibit window (`DEFAULT_INHIBIT`).
    pub fn new() -> Self {
        Self::with_inhibit(DEFAULT_INHIBIT)
    }

    /// Creates a gate that suppresses repeat requests for `inhibit` after each
    /// honored one.
    ///
    /// The new gate has no honored request recorded and no pulse installed.
    pub fn with_inhibit(inhibit: Duration) -> Self {
        Self {
            inhibit,
            last: Mutex::new(None),
            pulse: Mutex::new(None),
        }
    }

    /// Installs the [`Notify`] to signal on each honored request, replacing
    /// any previously installed one.
    ///
    /// # Panics
    ///
    /// Panics if the internal pulse mutex is poisoned.
    pub fn install_pulse(&self, pulse: Arc<Notify>) {
        *self.pulse.lock().unwrap() = Some(pulse);
    }

    /// Attempts to raise a request.
    ///
    /// Returns `true` (honored) when no request has been honored yet, the gate
    /// was reset, or at least `inhibit` has passed since the last honored
    /// request. In that case the current instant is recorded and the installed
    /// pulse, if any, receives `notify_one`. Returns `false` (inhibited)
    /// otherwise, leaving all state untouched.
    ///
    /// # Panics
    ///
    /// Panics if either internal mutex is poisoned.
    pub fn request(&self) -> bool {
        let mut last = self.last.lock().unwrap();
        // Sample the clock only once the lock is held: timestamps are then
        // taken in the same order in which callers are serialized, so a
        // concurrent caller can never have stored an instant later than `now`.
        let now = Instant::now();
        if let Some(prev) = *last {
            // Saturating form: never panics even if the platform clock
            // misbehaves and `prev` ends up later than `now`.
            if now.saturating_duration_since(prev) < self.inhibit {
                return false;
            }
        }
        *last = Some(now);
        // Release the timestamp lock before touching the pulse lock so the two
        // are never held at the same time.
        drop(last);
        if let Some(notify) = self.pulse.lock().unwrap().as_ref() {
            notify.notify_one();
        }
        true
    }

    /// Forgets the last honored request so the next [`request`](Self::request)
    /// is honored immediately.
    ///
    /// # Panics
    ///
    /// Panics if the internal timestamp mutex is poisoned.
    pub fn reset(&self) {
        *self.last.lock().unwrap() = None;
    }

    /// Returns the time elapsed since the last honored request, or `None` if
    /// no request has been honored since construction or the last reset.
    ///
    /// # Panics
    ///
    /// Panics if the internal timestamp mutex is poisoned.
    pub fn elapsed(&self) -> Option<Duration> {
        self.last.lock().unwrap().map(|t| t.elapsed())
    }
}
impl Default for BeaconAnomaly {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh gate honors its very first request.
    #[test]
    fn first_request_honored() {
        let b = BeaconAnomaly::with_inhibit(Duration::from_secs(60));
        assert!(b.request());
    }

    /// A request made right after an honored one falls inside the window.
    #[test]
    fn second_immediate_request_inhibited() {
        let b = BeaconAnomaly::with_inhibit(Duration::from_secs(60));
        assert!(b.request());
        assert!(!b.request());
    }

    /// Once the window has passed, requests are honored again.
    #[test]
    fn request_after_inhibit_expires() {
        let b = BeaconAnomaly::with_inhibit(Duration::from_millis(10));
        assert!(b.request());
        std::thread::sleep(Duration::from_millis(20));
        assert!(b.request());
    }

    /// `reset` lifts an active inhibit.
    #[test]
    fn reset_clears_inhibit() {
        let b = BeaconAnomaly::with_inhibit(Duration::from_secs(60));
        assert!(b.request());
        assert!(!b.request());
        b.reset();
        assert!(b.request());
    }

    /// Boundary: with a zero window the elapsed time is never strictly less
    /// than the window, so nothing is ever inhibited.
    #[test]
    fn zero_inhibit_never_suppresses() {
        let b = BeaconAnomaly::with_inhibit(Duration::from_secs(0));
        assert!(b.request());
        assert!(b.request());
        assert!(b.request());
    }

    /// `elapsed` is `None` until a request is honored and again after `reset`;
    /// an inhibited request does not clear it.
    #[test]
    fn elapsed_tracks_last_honored_request() {
        let b = BeaconAnomaly::with_inhibit(Duration::from_secs(60));
        assert!(b.elapsed().is_none());
        assert!(b.request());
        assert!(b.elapsed().is_some());
        assert!(!b.request());
        assert!(b.elapsed().is_some());
        b.reset();
        assert!(b.elapsed().is_none());
    }

    /// The `Default` gate has a non-zero window: an immediate repeat is
    /// inhibited.
    #[test]
    fn default_inhibits_immediate_repeat() {
        let b = BeaconAnomaly::default();
        assert!(b.request());
        assert!(!b.request());
    }

    /// An honored request signals the installed `Notify`.
    #[tokio::test(flavor = "current_thread")]
    async fn install_pulse_fires_on_honored_request() {
        let b = BeaconAnomaly::with_inhibit(Duration::from_millis(10));
        let pulse = Arc::new(Notify::new());
        b.install_pulse(pulse.clone());
        assert!(b.request());
        let woken = tokio::time::timeout(Duration::from_millis(100), pulse.notified())
            .await
            .is_ok();
        assert!(woken, "honored request must pulse the installed Notify");
    }

    /// An inhibited request must leave the installed `Notify` untouched.
    #[tokio::test(flavor = "current_thread")]
    async fn install_pulse_skips_on_inhibited_request() {
        let b = BeaconAnomaly::with_inhibit(Duration::from_secs(60));
        let pulse = Arc::new(Notify::new());
        b.install_pulse(pulse.clone());
        assert!(b.request());
        // Consume the notification produced by the honored request so that it
        // cannot satisfy the wait below.
        let _ = tokio::time::timeout(Duration::from_millis(50), pulse.notified()).await;
        assert!(!b.request());
        let woken = tokio::time::timeout(Duration::from_millis(100), pulse.notified())
            .await
            .is_ok();
        assert!(!woken, "inhibited request must NOT pulse the Notify");
    }
}