fluvio_smartengine/engine/
metrics.rs

1use std::cmp::max;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6
/// Memory ordering shared by every atomic `load` and `fetch_add` performed on
/// the metrics counters in this file.
const DEFAULT_ORDERING: Ordering = Ordering::Relaxed;
8
9#[derive(Serialize, Deserialize, Default, Debug)]
10pub struct SmartModuleChainMetrics {
11    bytes_in: AtomicU64,
12    records_out: AtomicU64,
13    invocation_count: AtomicU64,
14    fuel_used: AtomicU64,
15    // CPU time in milliseconds
16    // allow this to be missing for deserialization for legacy use
17    #[serde(default)]
18    cpu_ms: AtomicU64,
19}
20
21impl SmartModuleChainMetrics {
22    pub fn add_bytes_in(&self, value: u64) {
23        self.bytes_in.fetch_add(value, DEFAULT_ORDERING);
24        self.invocation_count.fetch_add(1, DEFAULT_ORDERING);
25    }
26
27    pub fn add_records_out(&self, value: u64) {
28        self.records_out.fetch_add(value, DEFAULT_ORDERING);
29    }
30
31    pub fn add_fuel_used(&self, fuel: u64, cpu_elapsed: Duration) {
32        let cpu_ms = cpu_elapsed.as_millis() as u64;
33        self.cpu_ms.fetch_add(max(cpu_ms, 1), DEFAULT_ORDERING);
34        self.fuel_used.fetch_add(fuel, DEFAULT_ORDERING);
35    }
36
37    pub fn bytes_in(&self) -> u64 {
38        self.bytes_in.load(DEFAULT_ORDERING)
39    }
40
41    pub fn records_out(&self) -> u64 {
42        self.records_out.load(DEFAULT_ORDERING)
43    }
44
45    pub fn fuel_used(&self) -> u64 {
46        self.fuel_used.load(DEFAULT_ORDERING)
47    }
48
49    pub fn invocation_count(&self) -> u64 {
50        self.invocation_count.load(DEFAULT_ORDERING)
51    }
52
53    pub fn cpu_ms(&self) -> u64 {
54        self.cpu_ms.load(DEFAULT_ORDERING)
55    }
56}
57
#[cfg(test)]
mod t_smartmodule_metrics {
    use std::time::Duration;

    use super::SmartModuleChainMetrics;

    /// Recording fuel with a 100 ms duration is reflected in `cpu_ms`, and the
    /// resulting metrics can be serialized to JSON.
    #[test]
    fn test_metrics() {
        let chain_metrics = SmartModuleChainMetrics::default();
        chain_metrics.add_fuel_used(100, Duration::from_millis(100));

        assert_eq!(chain_metrics.cpu_ms(), 100);

        let _json = serde_json::to_string(&chain_metrics).expect("serialize");
    }

    /// Payloads from the previous version (no `cpu_ms`) still deserialize,
    /// with `cpu_ms` starting at zero.
    #[test]
    fn last_version() {
        // Shape written by the previous version: `cpu_ms` is absent.
        let legacy = r#"{"bytes_in":0,"records_out":0,"invocation_count":0,"fuel_used":0}"#;
        let parsed: SmartModuleChainMetrics = serde_json::from_str(legacy).expect("deserialize");
        assert_eq!(parsed.cpu_ms(), 0);

        // Same payload carrying a property the struct does not know about.
        let with_extra =
            r#"{"bytes_in":0,"records_out":0,"invocation_count":0,"fuel_used":0, "extra": 1}"#;
        let parsed: SmartModuleChainMetrics =
            serde_json::from_str(with_extra).expect("deserialize with extra");
        assert_eq!(parsed.cpu_ms(), 0);
    }
}