fluvio_smartengine/engine/
metrics.rs

1use std::cmp::max;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6
/// Memory ordering passed to every atomic `load`, `store` and `fetch_add`
/// performed on the metric counters in this file.
// NOTE(review): `Relaxed` presumably suffices because each counter is an
// independent statistic that does not guard other memory — confirm.
const DEFAULT_ORDERING: Ordering = Ordering::Relaxed;
8
9#[derive(Serialize, Deserialize, Debug)]
10pub struct SmartModuleChainMetrics {
11    bytes_in: AtomicU64,
12    records_out: AtomicU64,
13    records_err: AtomicU64,
14    invocation_count: AtomicU64,
15    fuel_used: AtomicU64,
16    // CPU time in milliseconds
17    // allow this to be missing for deserialization for legacy use
18    #[serde(default)]
19    cpu_ms: AtomicU64,
20    // Names of the SmartModules in the chain
21    #[serde(default)]
22    smartmodule_names: Vec<String>,
23}
24
25impl Clone for SmartModuleChainMetrics {
26    fn clone(&self) -> Self {
27        Self {
28            bytes_in: AtomicU64::new(self.bytes_in.load(DEFAULT_ORDERING)),
29            records_out: AtomicU64::new(self.records_out.load(DEFAULT_ORDERING)),
30            records_err: AtomicU64::new(self.records_err.load(DEFAULT_ORDERING)),
31            invocation_count: AtomicU64::new(self.invocation_count.load(DEFAULT_ORDERING)),
32            fuel_used: AtomicU64::new(self.fuel_used.load(DEFAULT_ORDERING)),
33            cpu_ms: AtomicU64::new(self.cpu_ms.load(DEFAULT_ORDERING)),
34            smartmodule_names: self.smartmodule_names.clone(),
35        }
36    }
37}
38
39impl Default for SmartModuleChainMetrics {
40    fn default() -> Self {
41        Self::new(&[])
42    }
43}
44
45impl SmartModuleChainMetrics {
46    /// Create a new instance of SmartModuleChainMetrics
47    /// with the given names of SmartModules.
48    pub fn new(names: &[String]) -> Self {
49        Self {
50            bytes_in: AtomicU64::new(0),
51            records_out: AtomicU64::new(0),
52            records_err: AtomicU64::new(0),
53            invocation_count: AtomicU64::new(0),
54            fuel_used: AtomicU64::new(0),
55            cpu_ms: AtomicU64::new(0),
56            smartmodule_names: names.to_vec(),
57        }
58    }
59
60    pub fn add_bytes_in(&self, value: u64) {
61        self.bytes_in.fetch_add(value, DEFAULT_ORDERING);
62    }
63
64    pub fn add_invocation_count(&self, value: u64) {
65        self.invocation_count.fetch_add(value, DEFAULT_ORDERING);
66    }
67
68    pub fn add_records_out(&self, value: u64) {
69        self.records_out.fetch_add(value, DEFAULT_ORDERING);
70    }
71
72    pub fn add_records_err(&self, value: u64) {
73        self.records_err.fetch_add(value, DEFAULT_ORDERING);
74    }
75
76    pub fn add_fuel_used(&self, fuel: u64, cpu_elapsed: Duration) {
77        let cpu_ms = cpu_elapsed.as_millis() as u64;
78        self.cpu_ms.fetch_add(max(cpu_ms, 1), DEFAULT_ORDERING);
79        self.fuel_used.fetch_add(fuel, DEFAULT_ORDERING);
80    }
81
82    pub fn bytes_in(&self) -> u64 {
83        self.bytes_in.load(DEFAULT_ORDERING)
84    }
85
86    pub fn records_out(&self) -> u64 {
87        self.records_out.load(DEFAULT_ORDERING)
88    }
89
90    pub fn fuel_used(&self) -> u64 {
91        self.fuel_used.load(DEFAULT_ORDERING)
92    }
93
94    pub fn invocation_count(&self) -> u64 {
95        self.invocation_count.load(DEFAULT_ORDERING)
96    }
97
98    pub fn cpu_ms(&self) -> u64 {
99        self.cpu_ms.load(DEFAULT_ORDERING)
100    }
101
102    pub fn records_err(&self) -> u64 {
103        self.records_err.load(DEFAULT_ORDERING)
104    }
105
106    pub fn smartmodule_names(&self) -> &Vec<String> {
107        &self.smartmodule_names
108    }
109
110    pub fn append(&self, other: &SmartModuleChainMetrics) {
111        self.bytes_in
112            .fetch_add(other.bytes_in.load(DEFAULT_ORDERING), DEFAULT_ORDERING);
113        self.records_out
114            .fetch_add(other.records_out.load(DEFAULT_ORDERING), DEFAULT_ORDERING);
115        self.fuel_used
116            .fetch_add(other.fuel_used.load(DEFAULT_ORDERING), DEFAULT_ORDERING);
117        self.cpu_ms
118            .fetch_add(other.cpu_ms.load(DEFAULT_ORDERING), DEFAULT_ORDERING);
119        self.invocation_count.fetch_add(
120            other.invocation_count.load(DEFAULT_ORDERING),
121            DEFAULT_ORDERING,
122        );
123        self.records_err
124            .fetch_add(other.records_err.load(DEFAULT_ORDERING), DEFAULT_ORDERING);
125    }
126    pub fn reset(&self) {
127        self.bytes_in.store(0, DEFAULT_ORDERING);
128        self.records_out.store(0, DEFAULT_ORDERING);
129        self.fuel_used.store(0, DEFAULT_ORDERING);
130        self.cpu_ms.store(0, DEFAULT_ORDERING);
131        self.invocation_count.store(0, DEFAULT_ORDERING);
132        self.records_err.store(0, DEFAULT_ORDERING);
133    }
134}
135
#[cfg(test)]
mod t_smartmodule_metrics {
    use std::time::Duration;

    use super::SmartModuleChainMetrics;

    /// Records one fuel/CPU measurement, checks the CPU counter and the
    /// stored names, and makes sure the metrics can be serialized to JSON.
    #[test]
    fn test_metrics() {
        let sm_names = vec![String::from("module1"), String::from("module2")];
        let metrics = SmartModuleChainMetrics::new(&sm_names);

        let fuel = 100;
        let elapsed = Duration::from_millis(100);
        metrics.add_fuel_used(fuel, elapsed);

        assert_eq!(metrics.cpu_ms(), 100);

        let expected_names = vec![String::from("module1"), String::from("module2")];
        assert_eq!(metrics.smartmodule_names(), &expected_names);

        let out = serde_json::to_string(&metrics).expect("serialize");
        println!("metrics2: {:?}", out);
    }

    /// Payloads written without `cpu_ms` must still deserialize, with the
    /// counter reading zero, whether or not an unknown property is present.
    #[test]
    fn last_version() {
        // (JSON payload, message used if deserialization fails)
        let cases = [
            // previous version, no cpu_ms
            (
                r#"{"bytes_in":0,"records_out":0,"invocation_count":0,"fuel_used":0,"records_err":0}"#,
                "deserialize",
            ),
            // check behavior w/ extra property
            (
                r#"{"bytes_in":0,"records_out":0,"invocation_count":0,"fuel_used":0, "extra": 1, "records_err": 0}"#,
                "deserialize with extra",
            ),
        ];

        for (input, failure_msg) in cases {
            let metrics: SmartModuleChainMetrics =
                serde_json::from_str(input).expect(failure_msg);
            assert_eq!(metrics.cpu_ms(), 0);
        }
    }
}