fluvio_smartengine/engine/
metrics.rs1use std::cmp::max;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6
/// Memory ordering applied to every atomic load, store and add in this module.
///
/// Kept as one constant so all counter accesses agree on the ordering and it can be
/// changed in a single place.
const DEFAULT_ORDERING: Ordering = Ordering::Relaxed;
8
9#[derive(Serialize, Deserialize, Debug)]
10pub struct SmartModuleChainMetrics {
11 bytes_in: AtomicU64,
12 records_out: AtomicU64,
13 records_err: AtomicU64,
14 invocation_count: AtomicU64,
15 fuel_used: AtomicU64,
16 #[serde(default)]
19 cpu_ms: AtomicU64,
20 #[serde(default)]
22 smartmodule_names: Vec<String>,
23}
24
25impl Clone for SmartModuleChainMetrics {
26 fn clone(&self) -> Self {
27 Self {
28 bytes_in: AtomicU64::new(self.bytes_in.load(DEFAULT_ORDERING)),
29 records_out: AtomicU64::new(self.records_out.load(DEFAULT_ORDERING)),
30 records_err: AtomicU64::new(self.records_err.load(DEFAULT_ORDERING)),
31 invocation_count: AtomicU64::new(self.invocation_count.load(DEFAULT_ORDERING)),
32 fuel_used: AtomicU64::new(self.fuel_used.load(DEFAULT_ORDERING)),
33 cpu_ms: AtomicU64::new(self.cpu_ms.load(DEFAULT_ORDERING)),
34 smartmodule_names: self.smartmodule_names.clone(),
35 }
36 }
37}
38
39impl Default for SmartModuleChainMetrics {
40 fn default() -> Self {
41 Self::new(&[])
42 }
43}
44
45impl SmartModuleChainMetrics {
46 pub fn new(names: &[String]) -> Self {
49 Self {
50 bytes_in: AtomicU64::new(0),
51 records_out: AtomicU64::new(0),
52 records_err: AtomicU64::new(0),
53 invocation_count: AtomicU64::new(0),
54 fuel_used: AtomicU64::new(0),
55 cpu_ms: AtomicU64::new(0),
56 smartmodule_names: names.to_vec(),
57 }
58 }
59
60 pub fn add_bytes_in(&self, value: u64) {
61 self.bytes_in.fetch_add(value, DEFAULT_ORDERING);
62 }
63
64 pub fn add_invocation_count(&self, value: u64) {
65 self.invocation_count.fetch_add(value, DEFAULT_ORDERING);
66 }
67
68 pub fn add_records_out(&self, value: u64) {
69 self.records_out.fetch_add(value, DEFAULT_ORDERING);
70 }
71
72 pub fn add_records_err(&self, value: u64) {
73 self.records_err.fetch_add(value, DEFAULT_ORDERING);
74 }
75
76 pub fn add_fuel_used(&self, fuel: u64, cpu_elapsed: Duration) {
77 let cpu_ms = cpu_elapsed.as_millis() as u64;
78 self.cpu_ms.fetch_add(max(cpu_ms, 1), DEFAULT_ORDERING);
79 self.fuel_used.fetch_add(fuel, DEFAULT_ORDERING);
80 }
81
82 pub fn bytes_in(&self) -> u64 {
83 self.bytes_in.load(DEFAULT_ORDERING)
84 }
85
86 pub fn records_out(&self) -> u64 {
87 self.records_out.load(DEFAULT_ORDERING)
88 }
89
90 pub fn fuel_used(&self) -> u64 {
91 self.fuel_used.load(DEFAULT_ORDERING)
92 }
93
94 pub fn invocation_count(&self) -> u64 {
95 self.invocation_count.load(DEFAULT_ORDERING)
96 }
97
98 pub fn cpu_ms(&self) -> u64 {
99 self.cpu_ms.load(DEFAULT_ORDERING)
100 }
101
102 pub fn records_err(&self) -> u64 {
103 self.records_err.load(DEFAULT_ORDERING)
104 }
105
106 pub fn smartmodule_names(&self) -> &Vec<String> {
107 &self.smartmodule_names
108 }
109
110 pub fn append(&self, other: &SmartModuleChainMetrics) {
111 self.bytes_in
112 .fetch_add(other.bytes_in.load(DEFAULT_ORDERING), DEFAULT_ORDERING);
113 self.records_out
114 .fetch_add(other.records_out.load(DEFAULT_ORDERING), DEFAULT_ORDERING);
115 self.fuel_used
116 .fetch_add(other.fuel_used.load(DEFAULT_ORDERING), DEFAULT_ORDERING);
117 self.cpu_ms
118 .fetch_add(other.cpu_ms.load(DEFAULT_ORDERING), DEFAULT_ORDERING);
119 self.invocation_count.fetch_add(
120 other.invocation_count.load(DEFAULT_ORDERING),
121 DEFAULT_ORDERING,
122 );
123 self.records_err
124 .fetch_add(other.records_err.load(DEFAULT_ORDERING), DEFAULT_ORDERING);
125 }
126 pub fn reset(&self) {
127 self.bytes_in.store(0, DEFAULT_ORDERING);
128 self.records_out.store(0, DEFAULT_ORDERING);
129 self.fuel_used.store(0, DEFAULT_ORDERING);
130 self.cpu_ms.store(0, DEFAULT_ORDERING);
131 self.invocation_count.store(0, DEFAULT_ORDERING);
132 self.records_err.store(0, DEFAULT_ORDERING);
133 }
134}
135
#[cfg(test)]
mod t_smartmodule_metrics {
    use std::time::Duration;

    use super::SmartModuleChainMetrics;

    /// Recording fuel updates the fuel and CPU counters, and the whole record survives
    /// a JSON round trip.
    #[test]
    fn test_metrics() {
        let sm_names = vec!["module1".to_string(), "module2".to_string()];
        let metrics = SmartModuleChainMetrics::new(&sm_names);
        metrics.add_fuel_used(100, Duration::from_millis(100));

        assert_eq!(metrics.cpu_ms(), 100);
        assert_eq!(metrics.fuel_used(), 100);
        assert_eq!(metrics.smartmodule_names(), &sm_names);

        // Check the serialized form by reading it back instead of only printing it.
        let out = serde_json::to_string(&metrics).expect("serialize");
        let restored: SmartModuleChainMetrics = serde_json::from_str(&out).expect("round trip");
        assert_eq!(restored.cpu_ms(), 100);
        assert_eq!(restored.fuel_used(), 100);
        assert_eq!(restored.smartmodule_names(), &sm_names);
    }

    /// Payloads without `cpu_ms` and `smartmodule_names` must still deserialize, with
    /// those fields taking their defaults; unknown fields are ignored.
    #[test]
    fn last_version() {
        let input =
            r#"{"bytes_in":0,"records_out":0,"invocation_count":0,"fuel_used":0,"records_err":0}"#;
        let metrics: SmartModuleChainMetrics = serde_json::from_str(input).expect("deserialize");
        assert_eq!(metrics.cpu_ms(), 0);
        assert!(metrics.smartmodule_names().is_empty());

        let input = r#"{"bytes_in":0,"records_out":0,"invocation_count":0,"fuel_used":0, "extra": 1, "records_err": 0}"#;
        let metrics: SmartModuleChainMetrics =
            serde_json::from_str(input).expect("deserialize with extra");
        assert_eq!(metrics.cpu_ms(), 0);
        assert!(metrics.smartmodule_names().is_empty());
    }
}