use std::cell::RefCell;
use std::marker::PhantomData;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::time::Duration;
use fast_time::Clock;
use num_traits::AsPrimitive;
use crate::{EventBuilder, Magnitude, Observe, PublishModel, Pull};
/// Records observations and hands them to the publishing model `P`.
///
/// Each observation is forwarded to `publish_model` as a magnitude plus a count.
/// `P` defaults to [`Pull`].
///
/// The type is deliberately neither `Send` nor `Sync` (see `_single_threaded`).
#[derive(Debug)]
pub struct Event<P = Pull>
where
P: PublishModel,
{
// Receives every observation recorded through this event.
publish_model: P,
// Time source for `observe_duration_millis`. Wrapped in a `RefCell` because the
// clock is used mutably while the event itself is only borrowed as `&self`.
clock: RefCell<Clock>,
// Raw pointers are neither `Send` nor `Sync`; this zero-sized marker passes that
// property on to `Event` without storing any data.
_single_threaded: PhantomData<*const ()>,
}
// Explicitly declare `Event` unwind-safe for every publish model. `RefCell` is not
// `RefUnwindSafe`, so the `clock` field alone would stop `RefUnwindSafe` from being
// derived automatically.
// NOTE(review): presumably justified because a panic cannot leave the clock or the
// publish model in a logically broken state - confirm against `Clock` and each
// `PublishModel` implementation.
impl<P: PublishModel> UnwindSafe for Event<P> {}
impl<P: PublishModel> RefUnwindSafe for Event<P> {}
impl Event<Pull> {
/// Starts building a new event.
///
/// Returns a fresh [`EventBuilder`] for the [`Pull`] publishing model; the event is
/// configured and created through the builder.
// Excluded from mutation testing in test builds: the body is a bare constructor call.
#[must_use]
#[cfg_attr(test, mutants::skip)] pub fn builder() -> EventBuilder<Pull> {
EventBuilder::new()
}
}
impl<P> Event<P>
where
    P: PublishModel,
{
    /// Creates an event that reports through `publish_model` and owns a fresh clock.
    #[must_use]
    pub(crate) fn new(publish_model: P) -> Self {
        let clock = RefCell::new(Clock::new());

        Self {
            publish_model,
            clock,
            _single_threaded: PhantomData,
        }
    }

    /// Records a single occurrence with a magnitude of 1.
    #[inline]
    pub fn observe_once(&self) {
        let single = self.batch(1);
        single.observe_once();
    }

    /// Records a single occurrence with the given magnitude.
    ///
    /// Any primitive numeric type convertible to [`Magnitude`] is accepted.
    #[inline]
    pub fn observe(&self, magnitude: impl AsPrimitive<Magnitude>) {
        let single = self.batch(1);
        single.observe(magnitude);
    }

    /// Records a single occurrence whose magnitude is `duration` in whole milliseconds.
    #[inline]
    pub fn observe_millis(&self, duration: Duration) {
        let single = self.batch(1);
        single.observe_millis(duration);
    }

    /// Runs `f`, records how long it took (in milliseconds) as a single occurrence,
    /// and returns whatever `f` returned.
    #[inline]
    pub fn observe_duration_millis<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        let single = self.batch(1);
        single.observe_duration_millis(f)
    }

    /// Prepares a batch of `count` occurrences.
    ///
    /// Nothing is recorded until one of the `observe*` methods is called on the
    /// returned [`ObservationBatch`].
    #[must_use]
    #[inline]
    pub fn batch(&self, count: usize) -> ObservationBatch<'_, P> {
        let event = self;
        ObservationBatch { event, count }
    }

    /// Test-only view of what the publish model has accumulated so far.
    #[cfg(test)]
    pub(crate) fn snapshot(&self) -> crate::ObservationBagSnapshot {
        let model = &self.publish_model;
        model.snapshot()
    }
}
/// A pending batch of `count` occurrences for one [`Event`].
///
/// Created by `Event::batch`. Each `observe*` call forwards one magnitude together
/// with `count` to the event's publish model.
#[derive(Debug)]
pub struct ObservationBatch<'a, P>
where
P: PublishModel,
{
// The event whose publish model and clock this batch uses.
event: &'a Event<P>,
// Number of occurrences passed along with every observation made through this batch.
count: usize,
}
impl<P> ObservationBatch<'_, P>
where
    P: PublishModel,
{
    /// Records the batch with a magnitude of 1 per occurrence.
    #[inline]
    pub fn observe_once(&self) {
        self.event.publish_model.insert(1, self.count);
    }

    /// Records the batch with the given magnitude per occurrence.
    ///
    /// Any primitive numeric type convertible to [`Magnitude`] is accepted; the
    /// conversion follows `as`-cast semantics via [`AsPrimitive`].
    #[inline]
    pub fn observe(&self, magnitude: impl AsPrimitive<Magnitude>) {
        self.event.publish_model.insert(magnitude.as_(), self.count);
    }

    /// Records the batch using `duration`, in whole milliseconds, as the magnitude.
    ///
    /// `Duration::as_millis` yields a `u128`; the cast keeps only the low 64 bits, so
    /// durations above `i64::MAX` milliseconds wrap. This is accepted deliberately.
    #[inline]
    pub fn observe_millis(&self, duration: Duration) {
        #[expect(
            clippy::cast_possible_truncation,
            reason = "intentional - nothing we can do about it; typical values are in safe range"
        )]
        let millis = duration.as_millis() as i64;
        self.event.publish_model.insert(millis, self.count);
    }

    /// Runs `f`, records its elapsed time in milliseconds for the batch, and returns
    /// the value `f` produced.
    ///
    /// `f` may itself record observations on the same event, including nested
    /// `observe_duration_millis` calls.
    #[inline]
    pub fn observe_duration_millis<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        // The clock lives in a `RefCell`. Borrow it only for the instant each
        // timestamp is taken: holding the borrow while `f` runs would make any
        // nested timing on the same event panic with "already borrowed".
        let start = self.event.clock.borrow_mut().now();

        let result = f();

        let elapsed = start.elapsed(&mut self.event.clock.borrow_mut());
        self.observe_millis(elapsed);

        result
    }
}
// `Observe` for `Event`: every trait method forwards to the inherent method of the
// same name. Method-call syntax prefers inherent methods over trait methods, so the
// calls below are plain delegation, not recursion.
// The impl is excluded from coverage on `coverage_nightly` builds, and each method is
// skipped by mutation testing in test builds.
#[cfg_attr(coverage_nightly, coverage(off))] impl<P> Observe for Event<P>
where
P: PublishModel,
{
// Delegates to the inherent `Event::observe_once`.
#[cfg_attr(test, mutants::skip)] #[inline]
fn observe_once(&self) {
self.observe_once();
}
// Delegates to the inherent `Event::observe`.
#[cfg_attr(test, mutants::skip)] #[inline]
fn observe(&self, magnitude: impl AsPrimitive<Magnitude>) {
self.observe(magnitude);
}
// Delegates to the inherent `Event::observe_millis`.
#[cfg_attr(test, mutants::skip)] #[inline]
fn observe_millis(&self, duration: Duration) {
self.observe_millis(duration);
}
// Delegates to the inherent `Event::observe_duration_millis` and returns its result.
#[cfg_attr(test, mutants::skip)] #[inline]
fn observe_duration_millis<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
self.observe_duration_millis(f)
}
}
// `Observe` for `ObservationBatch`: every trait method forwards to the inherent
// method of the same name. Method-call syntax prefers inherent methods over trait
// methods, so the calls below are plain delegation, not recursion.
// The impl is excluded from coverage on `coverage_nightly` builds, and each method is
// skipped by mutation testing in test builds.
#[cfg_attr(coverage_nightly, coverage(off))] impl<P> Observe for ObservationBatch<'_, P>
where
P: PublishModel,
{
// Delegates to the inherent `ObservationBatch::observe_once`.
#[cfg_attr(test, mutants::skip)] #[inline]
fn observe_once(&self) {
self.observe_once();
}
// Delegates to the inherent `ObservationBatch::observe`.
#[cfg_attr(test, mutants::skip)] #[inline]
fn observe(&self, magnitude: impl AsPrimitive<Magnitude>) {
self.observe(magnitude);
}
// Delegates to the inherent `ObservationBatch::observe_millis`.
#[cfg_attr(test, mutants::skip)] #[inline]
fn observe_millis(&self, duration: Duration) {
self.observe_millis(duration);
}
// Delegates to the inherent `ObservationBatch::observe_duration_millis` and returns
// its result.
#[cfg_attr(test, mutants::skip)] #[inline]
fn observe_duration_millis<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
self.observe_duration_millis(f)
}
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use std::panic::{RefUnwindSafe, UnwindSafe};
    use std::rc::Rc;
    use std::sync::Arc;

    use static_assertions::{assert_impl_all, assert_not_impl_any};

    use super::*;
    use crate::{ObservationBag, ObservationBagSync, Push};

    // Events must stay usable across `catch_unwind` boundaries for both publish models.
    assert_impl_all!(Event<Pull>: UnwindSafe, RefUnwindSafe);
    assert_impl_all!(Event<Push>: UnwindSafe, RefUnwindSafe);

    /// Drives a fresh event through the same sequence of observations regardless of
    /// publish model, checking the running count and sum after every step.
    fn assert_observations_accumulate<P>(event: &Event<P>)
    where
        P: PublishModel,
    {
        // Nothing recorded yet.
        let initial = event.snapshot();
        assert_eq!(initial.count, 0);
        assert_eq!(initial.sum, 0);

        // One occurrence of magnitude 1.
        event.observe_once();
        let after_single = event.snapshot();
        assert_eq!(after_single.count, 1);
        assert_eq!(after_single.sum, 1);

        // Three occurrences of magnitude 1 each.
        event.batch(3).observe_once();
        let after_batch_once = event.snapshot();
        assert_eq!(after_batch_once.count, 4);
        assert_eq!(after_batch_once.sum, 4);

        // One occurrence of magnitude 5.
        event.observe(5);
        let after_magnitude = event.snapshot();
        assert_eq!(after_magnitude.count, 5);
        assert_eq!(after_magnitude.sum, 9);

        // One occurrence of 100 milliseconds.
        event.observe_millis(Duration::from_millis(100));
        let after_millis = event.snapshot();
        assert_eq!(after_millis.count, 6);
        assert_eq!(after_millis.sum, 109);

        // Two occurrences of magnitude 10 each.
        event.batch(2).observe(10);
        let after_batch_magnitude = event.snapshot();
        assert_eq!(after_batch_magnitude.count, 8);
        assert_eq!(after_batch_magnitude.sum, 129);
    }

    #[test]
    fn pull_event_observations_are_recorded() {
        let pull_event = Event::new(Pull {
            observations: Arc::new(ObservationBagSync::new(&[])),
        });

        assert_observations_accumulate(&pull_event);
    }

    #[test]
    fn push_event_observations_are_recorded() {
        let push_event = Event::new(Push {
            observations: Rc::new(ObservationBag::new(&[])),
        });

        assert_observations_accumulate(&push_event);
    }

    #[test]
    fn event_accepts_different_numeric_types_without_casting() {
        let numeric_event = Event::builder().name("test_event").build();

        // Compile-time check in practice: each call must type-check without an `as`.
        numeric_event.observe(1_u8);
        numeric_event.observe(2_u16);
        numeric_event.observe(3_u32);
        numeric_event.observe(4_u64);
        numeric_event.observe(5_usize);
        numeric_event.observe(6.66);
        numeric_event.observe(7_i32);
        numeric_event.observe(8_i128);
    }

    #[test]
    fn single_threaded_type() {
        assert_not_impl_any!(Event: Send, Sync);
    }
}