fluvio_smartengine/engine/
metrics.rs1use std::cmp::max;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6
/// Memory ordering passed to the atomic loads and `fetch_add` calls on the
/// metrics counters in this file.
const DEFAULT_ORDERING: Ordering = Ordering::Relaxed;
8
9#[derive(Serialize, Deserialize, Default, Debug)]
10pub struct SmartModuleChainMetrics {
11 bytes_in: AtomicU64,
12 records_out: AtomicU64,
13 invocation_count: AtomicU64,
14 fuel_used: AtomicU64,
15 #[serde(default)]
18 cpu_ms: AtomicU64,
19}
20
21impl SmartModuleChainMetrics {
22 pub fn add_bytes_in(&self, value: u64) {
23 self.bytes_in.fetch_add(value, DEFAULT_ORDERING);
24 self.invocation_count.fetch_add(1, DEFAULT_ORDERING);
25 }
26
27 pub fn add_records_out(&self, value: u64) {
28 self.records_out.fetch_add(value, DEFAULT_ORDERING);
29 }
30
31 pub fn add_fuel_used(&self, fuel: u64, cpu_elapsed: Duration) {
32 let cpu_ms = cpu_elapsed.as_millis() as u64;
33 self.cpu_ms.fetch_add(max(cpu_ms, 1), DEFAULT_ORDERING);
34 self.fuel_used.fetch_add(fuel, DEFAULT_ORDERING);
35 }
36
37 pub fn bytes_in(&self) -> u64 {
38 self.bytes_in.load(DEFAULT_ORDERING)
39 }
40
41 pub fn records_out(&self) -> u64 {
42 self.records_out.load(DEFAULT_ORDERING)
43 }
44
45 pub fn fuel_used(&self) -> u64 {
46 self.fuel_used.load(DEFAULT_ORDERING)
47 }
48
49 pub fn invocation_count(&self) -> u64 {
50 self.invocation_count.load(DEFAULT_ORDERING)
51 }
52
53 pub fn cpu_ms(&self) -> u64 {
54 self.cpu_ms.load(DEFAULT_ORDERING)
55 }
56}
57
#[cfg(test)]
mod t_smartmodule_metrics {
    use std::time::Duration;

    use super::SmartModuleChainMetrics;

    /// Recording 100 ms of CPU time is reflected by `cpu_ms`, and the metrics
    /// serialize to JSON without error.
    #[test]
    fn test_metrics() {
        let chain_metrics = SmartModuleChainMetrics::default();
        chain_metrics.add_fuel_used(100, Duration::from_millis(100));

        assert_eq!(chain_metrics.cpu_ms(), 100);

        // Serialization only has to succeed; the output itself is not inspected.
        let _json: String = serde_json::to_string(&chain_metrics).expect("serialize");
    }

    /// Payloads that lack `cpu_ms` (with or without an unknown extra key)
    /// still deserialize, and `cpu_ms` reads back as zero.
    #[test]
    fn last_version() {
        // Each case pairs a JSON payload with the panic message used if parsing fails.
        let cases = [
            (
                r#"{"bytes_in":0,"records_out":0,"invocation_count":0,"fuel_used":0}"#,
                "deserialize",
            ),
            (
                r#"{"bytes_in":0,"records_out":0,"invocation_count":0,"fuel_used":0, "extra": 1}"#,
                "deserialize with extra",
            ),
        ];

        for &(payload, failure_msg) in &cases {
            let parsed: SmartModuleChainMetrics =
                serde_json::from_str(payload).expect(failure_msg);
            assert_eq!(parsed.cpu_ms(), 0);
        }
    }
}