use std::cell::RefCell;
use std::marker::PhantomData;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::rc::Rc;
use std::sync::Arc;
use crate::{EventName, LOCAL_REGISTRY, ObservationBag, ObservationBagSync, Observations};
/// Copies locally collected observations into their globally shared counterparts.
///
/// Each registered event contributes one local/global bag pair; calling
/// [`MetricsPusher::push`] copies every local bag into its paired global bag.
#[derive(Debug)]
pub struct MetricsPusher {
/// Registered local/global pairs, shared with any outstanding pre-registration handles.
push_registry: Rc<RefCell<Vec<LocalGlobalPair>>>,
/// Raw-pointer marker: raw pointers are neither `Send` nor `Sync`, so this keeps the
/// type confined to a single thread.
_single_threaded: PhantomData<*const ()>,
}
// The unwind-safety markers are asserted by hand: the `RefCell` inside the registry field
// is not `RefUnwindSafe`, so the compiler does not derive these auto traits for the type.
// NOTE(review): presumably sound because a panic cannot leave the registry in a state
// that breaks later use — confirm.
impl UnwindSafe for MetricsPusher {}
impl RefUnwindSafe for MetricsPusher {}
impl MetricsPusher {
#[must_use]
pub fn new() -> Self {
Self {
push_registry: Rc::new(RefCell::new(Vec::new())),
_single_threaded: PhantomData,
}
}
pub fn push(&self) {
for pair in self.push_registry.borrow().iter() {
pair.global.copy_from(&pair.local);
}
}
pub(crate) fn pre_register(&self) -> PusherPreRegistration {
PusherPreRegistration {
push_registry: Rc::clone(&self.push_registry),
}
}
#[cfg(test)]
pub(crate) fn event_count(&self) -> usize {
self.push_registry.borrow().len()
}
}
impl Default for MetricsPusher {
fn default() -> Self {
Self::new()
}
}
/// One registered event: the local bag that is read from and the global bag that is
/// written to when the owning pusher pushes.
#[derive(Debug)]
struct LocalGlobalPair {
/// Source of the copy performed on push.
local: Rc<ObservationBag>,
/// Destination of the copy performed on push; a clone of this `Arc` is also handed to
/// `LOCAL_REGISTRY` at registration time.
global: Arc<ObservationBagSync>,
}
/// Handle obtained from `MetricsPusher::pre_register` that shares the pusher's registry.
///
/// `register` takes the handle by value, so each handle adds at most one event.
#[derive(Debug)]
pub(crate) struct PusherPreRegistration {
/// The same registry the originating pusher iterates over on push.
push_registry: Rc<RefCell<Vec<LocalGlobalPair>>>,
}
impl PusherPreRegistration {
    /// Completes the registration of an event, consuming this handle.
    ///
    /// Creates a fresh global bag using the bucket magnitudes of `source`, registers
    /// that bag under `name` in `LOCAL_REGISTRY`, and finally records the
    /// (`source`, global bag) pair in the pusher's registry so later pushes copy it.
    pub(crate) fn register(self, name: EventName, source: Rc<ObservationBag>) {
        let magnitudes = source.bucket_magnitudes();
        let global = Arc::new(ObservationBagSync::new(magnitudes));

        // The registry receives its own handle to the global bag; the pusher keeps the other.
        LOCAL_REGISTRY.with_borrow(|registry| registry.register(name, Arc::clone(&global)));

        let pair = LocalGlobalPair {
            local: source,
            global,
        };
        self.push_registry.borrow_mut().push(pair);
    }
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use std::panic::{RefUnwindSafe, UnwindSafe};

    use static_assertions::assert_impl_all;

    use super::*;

    // Compile-time check that the manual marker impls stay in place.
    assert_impl_all!(MetricsPusher: UnwindSafe, RefUnwindSafe);

    /// A default-constructed pusher starts empty and accepts a registration.
    #[test]
    fn default_creates_valid_pusher() {
        let pusher = MetricsPusher::default();
        assert_eq!(pusher.event_count(), 0);

        let bag = Rc::new(ObservationBag::new(&[]));
        pusher
            .pre_register()
            .register("default_test_event".into(), bag);

        assert_eq!(pusher.event_count(), 1);
    }

    /// The global bag only changes when `push` is called, never on local inserts alone.
    #[test]
    fn data_updated_only_on_push() {
        let local = Rc::new(ObservationBag::new(&[]));
        local.insert(1, 1);

        let pusher = MetricsPusher::new();
        let handle = pusher.pre_register();
        handle.register("test_event".into(), Rc::clone(&local));

        let global = Arc::clone(&pusher.push_registry.borrow().first().unwrap().global);

        // Registered but not yet pushed: the global side is still empty.
        assert_eq!(0, global.snapshot().count);

        pusher.push();
        assert_eq!(1, global.snapshot().count);

        // A local insert alone must not be visible globally.
        local.insert(1, 1);
        assert_eq!(1, global.snapshot().count);

        pusher.push();
        assert_eq!(2, global.snapshot().count);

        // Pushing again without new local data leaves the global count unchanged.
        pusher.push();
        assert_eq!(2, global.snapshot().count);
    }
}