use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use arc_swap::{ArcSwap, ArcSwapOption};
use crate::error::ShikumiError;
use crate::reload::ReloadFailure;
pub(crate) struct ReloadObservatory {
generation: Arc<AtomicU64>,
failure_count: Arc<AtomicU64>,
last_reload_error: Arc<ArcSwapOption<ReloadFailure>>,
last_publish_at: Arc<ArcSwap<Instant>>,
last_failure_at: Arc<ArcSwapOption<Instant>>,
}
impl ReloadObservatory {
pub(crate) fn new() -> Self {
Self {
generation: Arc::new(AtomicU64::new(0)),
failure_count: Arc::new(AtomicU64::new(0)),
last_reload_error: Arc::new(ArcSwapOption::empty()),
last_publish_at: Arc::new(ArcSwap::from_pointee(Instant::now())),
last_failure_at: Arc::new(ArcSwapOption::empty()),
}
}
pub(crate) fn record_success<T>(&self, inner: &ArcSwap<T>, new: T) {
inner.store(Arc::new(new));
self.last_reload_error.store(None);
self.last_failure_at.store(None);
self.last_publish_at.store(Arc::new(Instant::now()));
self.generation.fetch_add(1, Ordering::Release);
}
pub(crate) fn record_failure(&self, err: &ShikumiError) {
self.last_failure_at.store(Some(Arc::new(Instant::now())));
self.failure_count.fetch_add(1, Ordering::Release);
self.last_reload_error
.store(Some(Arc::new(ReloadFailure::from_error(err))));
}
pub(crate) fn generation(&self) -> u64 {
self.generation.load(Ordering::Acquire)
}
pub(crate) fn failure_count(&self) -> u64 {
self.failure_count.load(Ordering::Acquire)
}
pub(crate) fn last_reload_error(&self) -> Option<Arc<ReloadFailure>> {
self.last_reload_error.load_full()
}
pub(crate) fn last_publish_at(&self) -> Instant {
**self.last_publish_at.load()
}
pub(crate) fn time_since_publish(&self) -> Duration {
self.last_publish_at().elapsed()
}
pub(crate) fn last_failure_at(&self) -> Option<Instant> {
self.last_failure_at.load_full().map(|arc| *arc)
}
pub(crate) fn time_since_failure(&self) -> Option<Duration> {
self.last_failure_at().map(|t| t.elapsed())
}
pub(crate) fn shared_generation(&self) -> Arc<AtomicU64> {
self.generation.clone()
}
pub(crate) fn shared_failure_count(&self) -> Arc<AtomicU64> {
self.failure_count.clone()
}
pub(crate) fn shared_last_reload_error(&self) -> Arc<ArcSwapOption<ReloadFailure>> {
self.last_reload_error.clone()
}
pub(crate) fn shared_last_publish_at(&self) -> Arc<ArcSwap<Instant>> {
self.last_publish_at.clone()
}
pub(crate) fn shared_last_failure_at(&self) -> Arc<ArcSwapOption<Instant>> {
self.last_failure_at.clone()
}
}
impl Clone for ReloadObservatory {
fn clone(&self) -> Self {
Self {
generation: self.generation.clone(),
failure_count: self.failure_count.clone(),
last_reload_error: self.last_reload_error.clone(),
last_publish_at: self.last_publish_at.clone(),
last_failure_at: self.last_failure_at.clone(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::source::ConfigSource;
use std::path::PathBuf;
use std::thread;
#[test]
fn new_initializes_all_slots_to_zero_or_empty() {
let obs = ReloadObservatory::new();
assert_eq!(obs.generation(), 0);
assert_eq!(obs.failure_count(), 0);
assert!(obs.last_reload_error().is_none());
assert!(obs.last_failure_at().is_none());
assert!(obs.time_since_failure().is_none());
assert!(obs.time_since_publish() < Duration::from_secs(1));
}
#[test]
fn new_stamps_publish_at_within_construction_window() {
let before = Instant::now();
let obs = ReloadObservatory::new();
let after = Instant::now();
let stamped = obs.last_publish_at();
assert!(stamped >= before && stamped <= after);
}
#[test]
fn record_success_publishes_value_and_bumps_generation() {
let obs = ReloadObservatory::new();
let inner = ArcSwap::from_pointee(0u32);
obs.record_success(&inner, 42);
assert_eq!(**inner.load(), 42);
assert_eq!(obs.generation(), 1);
assert_eq!(obs.failure_count(), 0);
}
#[test]
fn record_success_clears_failure_slots() {
let obs = ReloadObservatory::new();
let inner = ArcSwap::from_pointee(0u32);
let err = ShikumiError::Parse("bad".to_owned());
obs.record_failure(&err);
assert!(obs.last_reload_error().is_some());
assert!(obs.last_failure_at().is_some());
obs.record_success(&inner, 1);
assert!(obs.last_reload_error().is_none());
assert!(obs.last_failure_at().is_none());
}
#[test]
fn record_success_advances_publish_at() {
let obs = ReloadObservatory::new();
let inner = ArcSwap::from_pointee(0u32);
let t0 = obs.last_publish_at();
thread::sleep(Duration::from_millis(20));
obs.record_success(&inner, 1);
let t1 = obs.last_publish_at();
assert!(t1 > t0, "successful record must advance publish-at");
}
#[test]
fn record_failure_stamps_and_publishes() {
let obs = ReloadObservatory::new();
let err = ShikumiError::Parse("oops".to_owned());
let before = Instant::now();
obs.record_failure(&err);
let after = Instant::now();
let stamp = obs.last_failure_at().expect("stamp populated");
assert!(stamp >= before && stamp <= after);
let captured = obs.last_reload_error().expect("error populated");
assert_eq!(captured.message, err.to_string());
assert_eq!(obs.failure_count(), 1);
}
#[test]
fn record_failure_does_not_touch_value_or_generation() {
let obs = ReloadObservatory::new();
let inner = ArcSwap::from_pointee(99u32);
let err = ShikumiError::Parse("oops".to_owned());
obs.record_failure(&err);
assert_eq!(**inner.load(), 99);
assert_eq!(obs.generation(), 0);
}
#[test]
fn record_failure_preserves_publish_at() {
let obs = ReloadObservatory::new();
let t0 = obs.last_publish_at();
thread::sleep(Duration::from_millis(15));
obs.record_failure(&ShikumiError::Parse("oops".to_owned()));
let t1 = obs.last_publish_at();
assert_eq!(
t0, t1,
"failed record must preserve publish-at byte-for-byte"
);
}
#[test]
fn record_failure_is_monotonic() {
let obs = ReloadObservatory::new();
let err = ShikumiError::Parse("x".to_owned());
for i in 1..=5 {
obs.record_failure(&err);
assert_eq!(obs.failure_count(), i);
}
}
#[test]
fn record_success_does_not_clear_failure_count() {
let obs = ReloadObservatory::new();
let inner = ArcSwap::from_pointee(0u32);
obs.record_failure(&ShikumiError::Parse("bad".to_owned()));
obs.record_failure(&ShikumiError::Parse("bad2".to_owned()));
assert_eq!(obs.failure_count(), 2);
obs.record_success(&inner, 1);
assert_eq!(
obs.failure_count(),
2,
"recovery must not erase the lifetime failure-count"
);
assert_eq!(obs.generation(), 1);
}
#[test]
fn ordering_contract_failure_state_pinned_when_error_visible() {
let obs = ReloadObservatory::new();
let err = ShikumiError::Parse("oops".to_owned());
obs.record_failure(&err);
if obs.last_reload_error().is_some() {
assert!(obs.failure_count() >= 1);
assert!(obs.last_failure_at().is_some());
}
}
#[test]
fn clone_shares_underlying_atomics_with_original() {
let obs = ReloadObservatory::new();
let cloned = obs.clone();
let inner = ArcSwap::from_pointee(0u32);
obs.record_success(&inner, 1);
assert_eq!(cloned.generation(), 1);
assert_eq!(obs.generation(), 1);
cloned.record_failure(&ShikumiError::Parse("x".to_owned()));
assert_eq!(obs.failure_count(), 1);
assert!(obs.last_reload_error().is_some());
}
#[test]
fn shared_handles_point_at_same_atomics() {
let obs = ReloadObservatory::new();
let g = obs.shared_generation();
let fc = obs.shared_failure_count();
let err_handle = obs.shared_last_reload_error();
let pub_handle = obs.shared_last_publish_at();
let fail_handle = obs.shared_last_failure_at();
let inner = ArcSwap::from_pointee(0u32);
obs.record_success(&inner, 5);
assert_eq!(g.load(Ordering::Acquire), 1);
assert_eq!(**pub_handle.load(), obs.last_publish_at());
obs.record_failure(&ShikumiError::Parse("x".to_owned()));
assert_eq!(fc.load(Ordering::Acquire), 1);
assert!(err_handle.load_full().is_some());
assert!(fail_handle.load_full().is_some());
}
#[test]
fn shared_handles_outlive_observatory() {
let inner = ArcSwap::from_pointee(0u32);
let (g, fc, err_h, pub_h, fail_h, gen_observed) = {
let obs = ReloadObservatory::new();
obs.record_failure(&ShikumiError::Parse("x".to_owned()));
obs.record_success(&inner, 1);
(
obs.shared_generation(),
obs.shared_failure_count(),
obs.shared_last_reload_error(),
obs.shared_last_publish_at(),
obs.shared_last_failure_at(),
obs.generation(),
)
};
assert_eq!(g.load(Ordering::Acquire), gen_observed);
assert_eq!(fc.load(Ordering::Acquire), 1);
assert!(err_h.load_full().is_none());
assert!(fail_h.load_full().is_none());
let _ = **pub_h.load();
}
#[test]
fn record_failure_captures_full_failure_content() {
let obs = ReloadObservatory::new();
let chain = vec![
ConfigSource::Env("OBS_".to_owned()),
ConfigSource::File(PathBuf::from("/etc/x.yaml")),
];
let figment_err = {
let figment = figment::Figment::new();
figment.extract::<String>().unwrap_err()
};
let err = ShikumiError::Extract {
sources: chain.clone(),
error: Box::new(figment_err),
};
obs.record_failure(&err);
let captured = obs.last_reload_error().expect("captured");
assert_eq!(captured.sources, chain);
assert_eq!(captured.message, err.to_string());
}
#[test]
fn time_since_publish_grows_without_record() {
let obs = ReloadObservatory::new();
let e0 = obs.time_since_publish();
thread::sleep(Duration::from_millis(20));
let e1 = obs.time_since_publish();
assert!(e1 > e0);
}
#[test]
fn time_since_failure_none_until_first_failure() {
let obs = ReloadObservatory::new();
assert!(obs.time_since_failure().is_none());
obs.record_failure(&ShikumiError::Parse("x".to_owned()));
assert!(obs.time_since_failure().is_some());
}
#[test]
fn cross_thread_funnel_via_clone() {
let obs = ReloadObservatory::new();
let inner = Arc::new(ArcSwap::from_pointee(0u32));
let cloned = obs.clone();
let inner_clone = inner.clone();
let handle = thread::spawn(move || {
cloned.record_success(&inner_clone, 7);
cloned.record_failure(&ShikumiError::Parse("from-thread".to_owned()));
});
handle.join().expect("worker thread");
assert_eq!(**inner.load(), 7);
assert_eq!(obs.generation(), 1);
assert_eq!(obs.failure_count(), 1);
let captured = obs.last_reload_error().expect("captured from thread");
assert!(captured.message.contains("from-thread"));
}
}