use crate::store::StoreError;
use std::any::Any;
use std::fmt::Formatter;
use std::ops::Sub;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::{fmt, sync::atomic::AtomicUsize, time::Duration};
#[allow(dead_code)]
#[allow(unused_variables)]
pub(crate) trait Metrics: Send + Sync {
fn action_received(&self, data: Option<&dyn Any>) {}
fn action_dropped(&self, data: Option<&dyn Any>) {}
fn action_executed(&self, data: Option<&dyn Any>, duration: Duration) {}
fn middleware_executed(
&self,
data: Option<&dyn Any>,
middleware_name: &str,
count: usize,
duration: Duration,
) {
}
fn action_reduced(
&self,
data: Option<&dyn Any>,
duration: Duration,
duration_from_received: Duration,
) {
}
fn effect_issued(&self, count: usize) {}
fn effect_executed(&self, count: usize, duration: Duration) {}
fn state_notified(&self, data: Option<&dyn Any>) {}
fn subscriber_notified(&self, data: Option<&dyn Any>, count: usize, duration: Duration) {}
fn queue_size(&self, current_size: usize) {}
fn error_occurred(&self, error: &StoreError) {}
}
pub(crate) struct CountMetrics {
pub action_received: AtomicUsize,
pub action_dropped: AtomicUsize,
pub action_execution_time: AtomicUsize,
pub action_reduced: AtomicUsize,
pub reducer_time_max: AtomicUsize,
pub reducer_time_min: AtomicUsize,
pub reducer_execution_time: AtomicUsize,
pub action_received_and_reduced_execution_time: AtomicUsize,
pub effect_issued: AtomicUsize,
pub effect_executed: AtomicUsize,
pub middleware_executed: AtomicUsize,
pub middleware_time_max: AtomicUsize,
pub middleware_time_min: AtomicUsize,
pub middleware_execution_time: AtomicUsize,
pub state_notified: AtomicUsize,
pub subscriber_notified: AtomicUsize,
pub subscriber_time_max: AtomicUsize,
pub subscriber_time_min: AtomicUsize,
pub subscriber_execution_time: AtomicUsize,
pub remaining_queue: AtomicUsize,
pub remaining_queue_max: AtomicUsize,
pub error_occurred: AtomicUsize,
}
impl Default for CountMetrics {
fn default() -> Self {
Self {
action_received: AtomicUsize::new(0),
action_dropped: AtomicUsize::new(0),
action_execution_time: AtomicUsize::new(0),
action_reduced: AtomicUsize::new(0),
effect_issued: AtomicUsize::new(0),
effect_executed: AtomicUsize::new(0),
reducer_time_max: AtomicUsize::new(0),
reducer_time_min: AtomicUsize::new(0),
reducer_execution_time: AtomicUsize::new(0),
action_received_and_reduced_execution_time: AtomicUsize::new(0),
middleware_executed: AtomicUsize::new(0),
middleware_time_max: AtomicUsize::new(0),
middleware_time_min: AtomicUsize::new(0),
middleware_execution_time: AtomicUsize::new(0),
state_notified: Default::default(),
subscriber_notified: AtomicUsize::new(0),
subscriber_time_max: AtomicUsize::new(0),
subscriber_time_min: AtomicUsize::new(0),
subscriber_execution_time: AtomicUsize::new(0),
remaining_queue: AtomicUsize::new(0),
remaining_queue_max: AtomicUsize::new(0),
error_occurred: AtomicUsize::new(0),
}
}
}
impl fmt::Display for CountMetrics {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"action_received: {:?}",
self.action_received.load(Ordering::SeqCst)
)?;
write!(
f,
", action_dropped: {:?}",
self.action_dropped.load(Ordering::SeqCst)
)?;
write!(
f,
", action_execution_time: {:?}",
self.action_execution_time.load(Ordering::SeqCst)
)?;
write!(
f,
", action_reduced: {:?}",
self.action_reduced.load(Ordering::SeqCst)
)?;
write!(
f,
", action_received_and_reduced_execution_time: {:?}",
self.action_received_and_reduced_execution_time.load(Ordering::SeqCst)
)?;
write!(
f,
", reducer_time_max: {:?}",
self.reducer_time_max.load(Ordering::SeqCst)
)?;
write!(
f,
", reducer_time_min: {:?}",
self.reducer_time_min.load(Ordering::SeqCst)
)?;
write!(
f,
", reducer_execution_time: {:?}",
self.reducer_execution_time.load(Ordering::SeqCst)
)?;
write!(
f,
", middleware_executed: {:?}",
self.middleware_executed.load(Ordering::SeqCst)
)?;
write!(
f,
", middleware_time_max: {:?}",
self.middleware_time_max.load(Ordering::SeqCst)
)?;
write!(
f,
", middleware_time_min: {:?}",
self.middleware_time_min.load(Ordering::SeqCst)
)?;
write!(
f,
", middleware_execution_time: {:?}",
self.middleware_execution_time.load(Ordering::SeqCst)
)?;
write!(
f,
", state_notified: {:?}",
self.state_notified.load(Ordering::SeqCst)
)?;
write!(
f,
", subscriber_notified: {:?}",
self.subscriber_notified.load(Ordering::SeqCst)
)?;
write!(
f,
", subscriber_time_max: {:?}",
self.subscriber_time_max.load(Ordering::SeqCst)
)?;
write!(
f,
", subscriber_time_min: {:?}",
self.subscriber_time_min.load(Ordering::SeqCst)
)?;
write!(
f,
", subscriber_execution_time: {:?}",
self.subscriber_execution_time.load(Ordering::SeqCst)
)?;
write!(
f,
", remaining_queue: {:?}",
self.remaining_queue.load(Ordering::SeqCst)
)?;
write!(
f,
", remaining_queue_max: {:?}",
self.remaining_queue_max.load(Ordering::SeqCst)
)?;
write!(
f,
", error_occurred: {:?}",
self.error_occurred.load(Ordering::SeqCst)
)?;
Ok(())
}
}
#[allow(unused_variables)]
impl Metrics for CountMetrics {
fn action_received(&self, data: Option<&dyn Any>) {
self.action_received.fetch_add(1, Ordering::SeqCst);
}
fn action_dropped(&self, data: Option<&dyn Any>) {
self.action_dropped.fetch_add(1, Ordering::SeqCst);
}
fn action_executed(&self, data: Option<&dyn Any>, duration: Duration) {
let duration_ms = duration.as_millis() as usize;
self.action_execution_time.fetch_add(duration_ms, Ordering::SeqCst);
}
fn middleware_executed(
&self,
data: Option<&dyn Any>,
_middleware_name: &str,
count: usize,
duration: Duration,
) {
self.middleware_executed.fetch_add(count, Ordering::SeqCst);
let duration_ms = duration.as_millis() as usize;
if duration_ms > self.middleware_time_max.load(Ordering::SeqCst) {
self.middleware_time_max.store(duration_ms, Ordering::SeqCst);
}
if self.middleware_time_min.load(Ordering::SeqCst) == 0
|| duration_ms < self.middleware_time_min.load(Ordering::SeqCst)
{
self.middleware_time_min.store(duration_ms, Ordering::SeqCst);
}
self.middleware_execution_time.fetch_add(duration_ms, Ordering::SeqCst);
}
fn action_reduced(
&self,
data: Option<&dyn Any>,
duration: Duration,
duration_from_received: Duration,
) {
self.action_reduced.fetch_add(1, Ordering::SeqCst);
let duration_ms = duration.as_millis() as usize;
if duration_ms > self.reducer_time_max.load(Ordering::SeqCst) {
self.reducer_time_max.store(duration_ms, Ordering::SeqCst);
}
if self.reducer_time_min.load(Ordering::SeqCst) == 0
|| duration_ms < self.reducer_time_min.load(Ordering::SeqCst)
{
self.reducer_time_min.store(duration_ms, Ordering::SeqCst);
}
self.reducer_execution_time.fetch_add(duration_ms, Ordering::SeqCst);
self.action_received_and_reduced_execution_time.fetch_add(
duration_from_received.as_millis() as usize,
Ordering::SeqCst,
);
}
fn effect_issued(&self, count: usize) {
self.effect_issued.fetch_add(count, Ordering::SeqCst);
}
fn effect_executed(&self, count: usize, _duration: Duration) {
self.effect_executed.fetch_add(count, Ordering::SeqCst);
}
fn state_notified(&self, data: Option<&dyn Any>) {
self.state_notified.fetch_add(1, Ordering::SeqCst);
}
fn subscriber_notified(&self, data: Option<&dyn Any>, count: usize, duration: Duration) {
self.subscriber_notified.fetch_add(count, Ordering::SeqCst);
let duration_ms = duration.as_millis() as usize;
if duration_ms > self.subscriber_time_max.load(Ordering::SeqCst) {
self.subscriber_time_max.store(duration_ms, Ordering::SeqCst);
}
if self.subscriber_time_min.load(Ordering::SeqCst) == 0
|| duration_ms < self.subscriber_time_min.load(Ordering::SeqCst)
{
self.subscriber_time_min.store(duration_ms, Ordering::SeqCst);
}
self.subscriber_execution_time.fetch_add(duration_ms, Ordering::SeqCst);
}
fn queue_size(&self, current_size: usize) {
self.remaining_queue.store(current_size, Ordering::SeqCst);
if current_size > self.remaining_queue_max.load(Ordering::SeqCst) {
self.remaining_queue_max.store(current_size, Ordering::SeqCst);
}
}
fn error_occurred(&self, error: &StoreError) {
self.error_occurred.fetch_add(1, Ordering::SeqCst);
}
}
#[allow(dead_code)]
impl CountMetrics {
pub fn new() -> Arc<Self> {
Arc::new(Self::default())
}
pub fn reset(&self) {
self.action_received.store(0, Ordering::SeqCst);
self.action_dropped.store(0, Ordering::SeqCst);
self.action_reduced.store(0, Ordering::SeqCst);
self.action_received_and_reduced_execution_time.store(0, Ordering::SeqCst);
self.effect_issued.store(0, Ordering::SeqCst);
self.effect_executed.store(0, Ordering::SeqCst);
self.reducer_time_max.store(0, Ordering::SeqCst);
self.reducer_time_min.store(0, Ordering::SeqCst);
self.reducer_execution_time.store(0, Ordering::SeqCst);
self.middleware_executed.store(0, Ordering::SeqCst);
self.middleware_time_max.store(0, Ordering::SeqCst);
self.middleware_time_min.store(0, Ordering::SeqCst);
self.middleware_execution_time.store(0, Ordering::SeqCst);
self.state_notified.store(0, Ordering::SeqCst);
self.subscriber_notified.store(0, Ordering::SeqCst);
self.subscriber_time_max.store(0, Ordering::SeqCst);
self.subscriber_time_min.store(0, Ordering::SeqCst);
self.subscriber_execution_time.store(0, Ordering::SeqCst);
self.remaining_queue.store(0, Ordering::SeqCst);
self.remaining_queue_max.store(0, Ordering::SeqCst);
self.error_occurred.store(0, Ordering::SeqCst);
}
}
#[allow(dead_code)]
#[derive(Default)]
pub struct MetricsSnapshot {
pub action_received: usize,
pub action_dropped: usize,
pub action_received_and_reduced_execution_time: usize,
pub action_reduced: usize,
pub effect_issued: usize,
pub(crate) effect_executed: usize,
pub reducer_time_max: usize,
pub reducer_time_min: usize,
pub reducer_execution_time: usize,
pub middleware_executed: usize,
pub middleware_time_max: usize,
pub middleware_time_min: usize,
pub middleware_execution_time: usize,
pub state_notified: usize,
pub subscriber_notified: usize,
pub subscriber_time_max: usize,
pub subscriber_time_min: usize,
pub subscriber_execution_time: usize,
pub(crate) remaining_queue: usize,
pub(crate) remaining_queue_max: usize,
pub error_occurred: usize,
}
impl From<&CountMetrics> for MetricsSnapshot {
fn from(value: &CountMetrics) -> Self {
Self {
action_received: value.action_received.load(Ordering::SeqCst),
action_dropped: value.action_dropped.load(Ordering::SeqCst),
action_reduced: value.action_reduced.load(Ordering::SeqCst),
action_received_and_reduced_execution_time: value
.action_received_and_reduced_execution_time
.load(Ordering::SeqCst),
effect_issued: value.effect_issued.load(Ordering::SeqCst),
effect_executed: value.effect_executed.load(Ordering::SeqCst),
reducer_time_max: value.reducer_time_max.load(Ordering::SeqCst),
reducer_time_min: value.reducer_time_min.load(Ordering::SeqCst),
reducer_execution_time: value.reducer_execution_time.load(Ordering::SeqCst),
middleware_executed: value.middleware_executed.load(Ordering::SeqCst),
middleware_time_max: value.middleware_time_max.load(Ordering::SeqCst),
middleware_time_min: value.middleware_time_min.load(Ordering::SeqCst),
middleware_execution_time: value.middleware_execution_time.load(Ordering::SeqCst),
state_notified: value.state_notified.load(Ordering::SeqCst),
subscriber_notified: value.subscriber_notified.load(Ordering::SeqCst),
subscriber_time_max: value.subscriber_time_max.load(Ordering::SeqCst),
subscriber_time_min: value.subscriber_time_min.load(Ordering::SeqCst),
subscriber_execution_time: value.subscriber_execution_time.load(Ordering::SeqCst),
remaining_queue: value.remaining_queue.load(Ordering::SeqCst),
remaining_queue_max: value.remaining_queue_max.load(Ordering::SeqCst),
error_occurred: value.error_occurred.load(Ordering::SeqCst),
}
}
}
impl Sub<MetricsSnapshot> for MetricsSnapshot {
type Output = MetricsSnapshot;
fn sub(self, rhs: MetricsSnapshot) -> Self::Output {
Self::Output {
action_received: self.action_received - rhs.action_received,
action_dropped: self.action_dropped - rhs.action_dropped,
action_reduced: self.action_reduced - rhs.action_reduced,
action_received_and_reduced_execution_time: self
.action_received_and_reduced_execution_time
- rhs.action_received_and_reduced_execution_time,
effect_issued: self.effect_issued - rhs.effect_issued,
effect_executed: self.effect_executed - rhs.effect_executed,
reducer_time_max: self.reducer_time_max - rhs.reducer_time_max,
reducer_time_min: self.reducer_time_min - rhs.reducer_time_min,
reducer_execution_time: self.reducer_execution_time - rhs.reducer_execution_time,
middleware_executed: self.middleware_executed - rhs.middleware_executed,
middleware_time_max: self.middleware_time_max - rhs.middleware_time_max,
middleware_time_min: self.middleware_time_min - rhs.middleware_time_min,
middleware_execution_time: self.middleware_execution_time
- rhs.middleware_execution_time,
state_notified: self.state_notified - rhs.state_notified,
subscriber_notified: self.subscriber_notified - rhs.subscriber_notified,
subscriber_time_max: self.subscriber_time_max - rhs.subscriber_time_max,
subscriber_time_min: self.subscriber_time_min - rhs.subscriber_time_min,
subscriber_execution_time: self.subscriber_execution_time
- rhs.subscriber_execution_time,
remaining_queue: self.remaining_queue - rhs.remaining_queue,
remaining_queue_max: self.remaining_queue_max - rhs.remaining_queue_max,
error_occurred: self.error_occurred - rhs.error_occurred,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
BackpressurePolicy, Dispatcher, MiddlewareFn, MiddlewareFnFactory, Reducer, StoreImpl,
};
use std::sync::Arc;
use std::thread;
struct TestReducer;
impl Reducer<i32, i32> for TestReducer {
fn reduce(&self, state: &i32, action: &i32) -> crate::DispatchOp<i32, i32> {
let new_state = state + action;
thread::sleep(Duration::from_millis(10)); crate::DispatchOp::Dispatch(new_state, vec![])
}
}
struct TestMiddleware {
#[allow(dead_code)]
name: String,
}
impl TestMiddleware {
fn new(name: &str) -> Self {
Self {
name: name.to_string(),
}
}
}
impl<State, Action> MiddlewareFnFactory<State, Action> for TestMiddleware
where
State: Send + Sync + Clone + 'static,
Action: Send + Sync + Clone + 'static,
{
fn create(&self, inner: MiddlewareFn<State, Action>) -> MiddlewareFn<State, Action> {
Arc::new(move |state: &State, action: &Action| inner(state, action))
}
}
#[test]
fn test_count_metrics_basic() {
let store = StoreImpl::new_with(
0,
vec![Box::new(TestReducer)],
"test".to_string(),
5,
BackpressurePolicy::DropOldestIf(None),
vec![],
)
.unwrap();
let _ = store.dispatch(1);
let _ = store.dispatch(2);
let _ = store.dispatch(3);
match store.stop() {
Ok(_) => println!("store stopped"),
Err(e) => {
panic!("store stop failed : {:?}", e);
}
}
let metrics = store.get_metrics();
assert_eq!(metrics.action_received, 3 + 1);
assert_eq!(metrics.action_reduced, 3);
assert!(metrics.reducer_execution_time > 0);
}
#[test]
fn test_count_metrics_with_dropped_actions() {
let store = StoreImpl::new_with(
0,
vec![Box::new(TestReducer)],
"test".to_string(),
2,
BackpressurePolicy::DropOldestIf(None),
vec![],
)
.unwrap();
for i in 0..5 {
let _ = store.dispatch(i);
}
match store.stop() {
Ok(_) => println!("store stopped"),
Err(e) => {
panic!("store stop failed : {:?}", e);
}
}
let metrics = store.get_metrics();
assert!(metrics.action_dropped > 0);
assert!(metrics.remaining_queue_max <= 2);
}
#[test]
fn test_count_metrics_with_middleware() {
let middleware = Arc::new(TestMiddleware::new("test"));
#[allow(deprecated)]
let store = StoreImpl::new_with(
0,
vec![Box::new(TestReducer)],
"test".to_string(),
5,
BackpressurePolicy::DropOldestIf(None),
vec![middleware],
)
.unwrap();
store.dispatch(1).expect("no error");
match store.stop() {
Ok(_) => println!("store stopped"),
Err(e) => {
panic!("store stop failed : {:?}", e);
}
}
let metrics = store.get_metrics();
assert_eq!(metrics.action_received, 1 + 1);
assert_eq!(metrics.action_reduced, 1);
assert_eq!(metrics.state_notified, 1);
assert_eq!(metrics.subscriber_notified, 0);
assert!(metrics.middleware_execution_time > 0);
assert!(metrics.reducer_execution_time > 0);
assert!(
metrics.middleware_execution_time >= metrics.reducer_execution_time,
"middleware time should be greater than reducer time"
);
}
#[test]
fn test_count_metrics_reset() {
let metrics: CountMetrics = CountMetrics::default();
metrics.action_received.fetch_add(5, Ordering::SeqCst);
metrics.action_reduced.fetch_add(3, Ordering::SeqCst);
metrics.subscriber_notified.fetch_add(2, Ordering::SeqCst);
metrics.reset();
assert_eq!(metrics.action_received.load(Ordering::SeqCst), 0);
assert_eq!(metrics.action_reduced.load(Ordering::SeqCst), 0);
assert_eq!(metrics.subscriber_notified.load(Ordering::SeqCst), 0);
assert_eq!(metrics.middleware_execution_time.load(Ordering::SeqCst), 0);
assert_eq!(metrics.reducer_execution_time.load(Ordering::SeqCst), 0);
}
#[test]
fn test_metrics_snapshot_sub() {
let metrics1 = MetricsSnapshot {
action_received: 1,
action_dropped: 2,
action_reduced: 3,
..Default::default()
};
let metrics2 = MetricsSnapshot {
action_received: 11,
action_dropped: 12,
action_reduced: 13,
..Default::default()
};
let diff = metrics2 - metrics1;
assert_eq!(diff.action_received, 10);
assert_eq!(diff.action_dropped, 10);
assert_eq!(diff.action_reduced, 10);
}
}