1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
//! # High Roller
//!
//! This `no_std` library includes tools for tracking rolling-window
//! statistics in latency-sensitive systems. The motivating case was
//! reporting downsampled performance telemetry in embedded applications,
//! but it's hopefully useful for other domains as well.
//!
//! This crate contains three members:
//! - Rolling Max: tracks the greatest value in a fixed-size window.
//! - Rolling Sum: tracks the sum of entries in a fixed-size window.
//! - Decimal32: a 32-bit fixed-precision decimal type. Pairs with `RollingSum`
//! when float precision is needed. Native floating point types are incompatible
//! with `RollingSum`'s overflow recovery mechanism.
//!
//! This crate has the following design motivations:
//! - Algorithmic optimality: Max and overflow-resilient Sum expose asymptotically
//! optimal operations.
//! - Performance orientation: no_std, no heap allocs, and a performance-aware
//! approach to implementation. Demonstrated performance improvement contributions
//! are also very welcome.
//! - Code simplicity: this crate has more lines of docs than actual code. When feature
//! availability and simplicity collide, the latter is chosen.
//!
//! # Example
//!
//! The example below shows how `high_roller` could be used to track
//! and publish request latency telemetry in an application with
//! scheduled ticks and structured I/O patterns.
//!
//! The example expects a request every 1/3 ticks, publishes telemetry
//! every 100 ticks, and tracks a window of 1000 ticks.
//!
//! ```rust
//! use core::cmp::Reverse;
//!
//! use high_roller::decimal::D5;
//! use high_roller::rolling_max::RollingMax;
//! use high_roller::rolling_sum::RollingSum;
//!
//! /// Track a rolling window of this many ticks.
//! const WINDOW: usize = 1000;
//!
//! /// Expect a request every 1 / 3 ticks.
//! const EXPECTED_INTERVAL: u32 = 3;
//!
//! /// Emits telemetry on the rolling window of 1000 ticks.
//! /// Emitted every 100 ticks.
//! #[allow(unused)]
//! struct TelemetryMsg {
//! /// The maximum number of ticks between requests.
//! max_latency: Option<u32>,
//!
//! /// The minimum number of ticks between requests.
//! min_latency: Option<u32>,
//!
//! /// Root Mean Square Error of request latency from what is expected.
//! rmse: Option<D5>,
//! }
//!
//! let mut io = IoLayer::new();
//! let mut telemetry = Telemetry::default();
//!
//! while io.tick() {
//! let req = io.next_request();
//! if let Some(req) = &req {
//! process_request(req);
//! }
//! telemetry.log_tick(req.is_some());
//!
//! if io.count % 100 == 0 {
//! let max_latency = telemetry.max_latency_ticks.max().copied();
//! let min_latency = telemetry.min_latency_ticks.max().map(|m| m.0);
//! let rmse = {
//! let sum_sq = telemetry.rmse_acc.total().copied().map(D5::get);
//! let sample_ct = telemetry.rmse_samples.total().copied().unwrap_or(0);
//! sum_sq.and_then(|sum_sq| {
//! (sample_ct != 0)
//! .then(|| (sum_sq / sample_ct as f64).sqrt())
//! .map(D5::cast)
//! })
//! };
//!
//! io.log_telemetry(TelemetryMsg {
//! max_latency,
//! min_latency,
//! rmse,
//! });
//! }
//! }
//!
//! /// An accumulator for dynamic system telemetry.
//! #[derive(Default)]
//! struct Telemetry {
//! tick: u32,
//! last_req_tick: u32,
//! rmse_acc: RollingSum<D5, WINDOW>,
//! rmse_samples: RollingSum<u32, WINDOW>,
//! max_latency_ticks: RollingMax<u32, WINDOW>,
//! min_latency_ticks: RollingMax<Reverse<u32>, WINDOW>,
//! }
//!
//! impl Telemetry {
//! /// Call this once every tick to log statistics based on
//! /// whether a request was received.
//! fn log_tick(&mut self, received_req: bool) {
//! self.tick = self.tick.wrapping_add(1);
//!
//! if !received_req {
//! self.max_latency_ticks.push(0);
//! self.min_latency_ticks.push(Reverse(u32::MAX));
//! self.rmse_acc.add(D5::ZERO);
//! self.rmse_samples.add(0);
//! return;
//! }
//!
//! let interval = self
//! .tick
//! .checked_sub(self.last_req_tick)
//! .expect("irrational last_req");
//! self.last_req_tick = self.tick;
//!
//! // RMSE = sqrt(mean(sq_err)).
//! // Saturate worst-case error at `D5::MAX`.
//! let sq_err = D5::checked((interval as f64 - EXPECTED_INTERVAL as f64).powf(2.))
//! .unwrap_or(D5::MAX);
//!
//! self.max_latency_ticks.push(interval);
//! self.min_latency_ticks.push(Reverse(interval));
//! self.rmse_acc.add(sq_err);
//! self.rmse_samples.add(1);
//! }
//! }
//!
//! /// A dummy I/O layer.
//! struct IoLayer {
//! rng: rand::rngs::ThreadRng,
//! dist: rand::distr::Bernoulli,
//! // How many ticks this contrived IO stack will sustain. Otherwise the
//! // example would run forever.
//! count: usize,
//! }
//!
//! impl IoLayer {
//! /// Creates a new IoLayer instance. A real app presumably loops forever,
//! /// but this dummy stack self-destructs after a certain number of ticks.
//! fn new() -> Self {
//! Self {
//! rng: rand::rng(),
//! dist: rand::distr::Bernoulli::from_ratio(1, 3).expect("good range"),
//! count: 10_000,
//! }
//! }
//!
//! /// Returns Some(Request) if one was received and None if not.
//! fn next_request(&mut self) -> Option<Request> {
//! use rand::distr::Distribution;
//! self.dist.sample(&mut self.rng).then_some(Request)
//! }
//!
//! /// In a real system, this would implement some timed tick mechanism.
//! /// Here it's just a pass through. Returns false if the example should exit.
//! fn tick(&mut self) -> bool {
//! let prev = self.count;
//! self.count = prev.saturating_sub(1);
//! prev != 0
//! }
//!
//! /// Pushes a telemetry message into some structured logging pipeline.
//! fn log_telemetry(&self, msg: TelemetryMsg) {
//! core::hint::black_box((&self, msg));
//! }
//! }
//!
//! struct Request;
//!
//! fn process_request(req: &Request) {
//! core::hint::black_box(req);
//! }
//!
//! ```