Skip to main content

ursula_runtime/
metrics.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::Instant;
4
5use ursula_shard::{BucketStreamId, CoreId, RaftGroupId, ShardPlacement};
6
7use crate::engine::{GroupEngine, GroupEngineError};
8use crate::error::RuntimeError;
9use crate::request::{AppendBatchRequest, ColdWriteAdmission};
10
/// Cap on the size of a single write batch assembled by a group actor.
///
/// NOTE(review): not referenced in this part of the file; the meaning is inferred from the
/// name — confirm at the use site.
pub(crate) const GROUP_ACTOR_MAX_WRITE_BATCH: usize = 64;

/// Cap on the number of chunks grouped into one cold-flush batch.
///
/// NOTE(review): not referenced in this part of the file; the meaning is inferred from the
/// name — confirm at the use site.
pub(crate) const COLD_FLUSH_GROUP_BATCH_MAX_CHUNKS: usize = 64;
13
14#[derive(Debug, Clone)]
15pub struct RuntimeMetrics {
16    pub(crate) inner: Arc<RuntimeMetricsInner>,
17}
18
19impl RuntimeMetrics {
20    pub fn snapshot(&self) -> RuntimeMetricsSnapshot {
21        let per_core_appends = self
22            .inner
23            .per_core_appends
24            .iter()
25            .map(PaddedAtomicU64::load_relaxed)
26            .collect::<Vec<_>>();
27        let accepted_appends = per_core_appends.iter().sum();
28        let per_group_appends = self
29            .inner
30            .per_group_appends
31            .iter()
32            .map(PaddedAtomicU64::load_relaxed)
33            .collect();
34        let per_core_applied_mutations = self
35            .inner
36            .per_core_applied_mutations
37            .iter()
38            .map(PaddedAtomicU64::load_relaxed)
39            .collect::<Vec<_>>();
40        let applied_mutations = per_core_applied_mutations.iter().sum();
41        let per_group_applied_mutations = self
42            .inner
43            .per_group_applied_mutations
44            .iter()
45            .map(PaddedAtomicU64::load_relaxed)
46            .collect();
47        let per_core_mutation_apply_ns = self
48            .inner
49            .per_core_mutation_apply_ns
50            .iter()
51            .map(PaddedAtomicU64::load_relaxed)
52            .collect::<Vec<_>>();
53        let mutation_apply_ns = per_core_mutation_apply_ns.iter().sum();
54        let per_group_mutation_apply_ns = self
55            .inner
56            .per_group_mutation_apply_ns
57            .iter()
58            .map(PaddedAtomicU64::load_relaxed)
59            .collect();
60        let per_core_group_lock_wait_ns = self
61            .inner
62            .per_core_group_lock_wait_ns
63            .iter()
64            .map(PaddedAtomicU64::load_relaxed)
65            .collect::<Vec<_>>();
66        let group_lock_wait_ns = per_core_group_lock_wait_ns.iter().sum();
67        let per_group_group_lock_wait_ns = self
68            .inner
69            .per_group_group_lock_wait_ns
70            .iter()
71            .map(PaddedAtomicU64::load_relaxed)
72            .collect();
73        let per_core_group_engine_exec_ns = self
74            .inner
75            .per_core_group_engine_exec_ns
76            .iter()
77            .map(PaddedAtomicU64::load_relaxed)
78            .collect::<Vec<_>>();
79        let group_engine_exec_ns = per_core_group_engine_exec_ns.iter().sum();
80        let per_group_group_engine_exec_ns = self
81            .inner
82            .per_group_group_engine_exec_ns
83            .iter()
84            .map(PaddedAtomicU64::load_relaxed)
85            .collect();
86        let per_group_group_mailbox_depth = self
87            .inner
88            .per_group_group_mailbox_depth
89            .iter()
90            .map(PaddedAtomicU64::load_relaxed)
91            .collect::<Vec<_>>();
92        let group_mailbox_depth = per_group_group_mailbox_depth.iter().sum();
93        let per_group_group_mailbox_max_depth = self
94            .inner
95            .per_group_group_mailbox_max_depth
96            .iter()
97            .map(PaddedAtomicU64::load_relaxed)
98            .collect::<Vec<_>>();
99        let group_mailbox_max_depth = per_group_group_mailbox_max_depth
100            .iter()
101            .copied()
102            .max()
103            .unwrap_or(0);
104        let per_group_group_mailbox_full_events = self
105            .inner
106            .per_group_group_mailbox_full_events
107            .iter()
108            .map(PaddedAtomicU64::load_relaxed)
109            .collect::<Vec<_>>();
110        let group_mailbox_full_events = per_group_group_mailbox_full_events.iter().sum();
111        let per_core_raft_write_many_batches = self
112            .inner
113            .per_core_raft_write_many_batches
114            .iter()
115            .map(PaddedAtomicU64::load_relaxed)
116            .collect::<Vec<_>>();
117        let raft_write_many_batches = per_core_raft_write_many_batches.iter().sum();
118        let per_group_raft_write_many_batches = self
119            .inner
120            .per_group_raft_write_many_batches
121            .iter()
122            .map(PaddedAtomicU64::load_relaxed)
123            .collect();
124        let per_core_raft_write_many_commands = self
125            .inner
126            .per_core_raft_write_many_commands
127            .iter()
128            .map(PaddedAtomicU64::load_relaxed)
129            .collect::<Vec<_>>();
130        let raft_write_many_commands = per_core_raft_write_many_commands.iter().sum();
131        let per_group_raft_write_many_commands = self
132            .inner
133            .per_group_raft_write_many_commands
134            .iter()
135            .map(PaddedAtomicU64::load_relaxed)
136            .collect();
137        let per_core_raft_write_many_logical_commands = self
138            .inner
139            .per_core_raft_write_many_logical_commands
140            .iter()
141            .map(PaddedAtomicU64::load_relaxed)
142            .collect::<Vec<_>>();
143        let raft_write_many_logical_commands =
144            per_core_raft_write_many_logical_commands.iter().sum();
145        let per_group_raft_write_many_logical_commands = self
146            .inner
147            .per_group_raft_write_many_logical_commands
148            .iter()
149            .map(PaddedAtomicU64::load_relaxed)
150            .collect();
151        let per_core_raft_write_many_responses = self
152            .inner
153            .per_core_raft_write_many_responses
154            .iter()
155            .map(PaddedAtomicU64::load_relaxed)
156            .collect::<Vec<_>>();
157        let raft_write_many_responses = per_core_raft_write_many_responses.iter().sum();
158        let per_group_raft_write_many_responses = self
159            .inner
160            .per_group_raft_write_many_responses
161            .iter()
162            .map(PaddedAtomicU64::load_relaxed)
163            .collect();
164        let per_core_raft_write_many_submit_ns = self
165            .inner
166            .per_core_raft_write_many_submit_ns
167            .iter()
168            .map(PaddedAtomicU64::load_relaxed)
169            .collect::<Vec<_>>();
170        let raft_write_many_submit_ns = per_core_raft_write_many_submit_ns.iter().sum();
171        let per_group_raft_write_many_submit_ns = self
172            .inner
173            .per_group_raft_write_many_submit_ns
174            .iter()
175            .map(PaddedAtomicU64::load_relaxed)
176            .collect();
177        let per_core_raft_write_many_response_ns = self
178            .inner
179            .per_core_raft_write_many_response_ns
180            .iter()
181            .map(PaddedAtomicU64::load_relaxed)
182            .collect::<Vec<_>>();
183        let raft_write_many_response_ns = per_core_raft_write_many_response_ns.iter().sum();
184        let per_group_raft_write_many_response_ns = self
185            .inner
186            .per_group_raft_write_many_response_ns
187            .iter()
188            .map(PaddedAtomicU64::load_relaxed)
189            .collect();
190        let per_core_raft_apply_entries = self
191            .inner
192            .per_core_raft_apply_entries
193            .iter()
194            .map(PaddedAtomicU64::load_relaxed)
195            .collect::<Vec<_>>();
196        let raft_apply_entries = per_core_raft_apply_entries.iter().sum();
197        let per_group_raft_apply_entries = self
198            .inner
199            .per_group_raft_apply_entries
200            .iter()
201            .map(PaddedAtomicU64::load_relaxed)
202            .collect();
203        let per_core_raft_apply_ns = self
204            .inner
205            .per_core_raft_apply_ns
206            .iter()
207            .map(PaddedAtomicU64::load_relaxed)
208            .collect::<Vec<_>>();
209        let raft_apply_ns = per_core_raft_apply_ns.iter().sum();
210        let per_group_raft_apply_ns = self
211            .inner
212            .per_group_raft_apply_ns
213            .iter()
214            .map(PaddedAtomicU64::load_relaxed)
215            .collect();
216        let per_core_live_read_waiters = self
217            .inner
218            .per_core_live_read_waiters
219            .iter()
220            .map(PaddedAtomicU64::load_relaxed)
221            .collect::<Vec<_>>();
222        let live_read_waiters = per_core_live_read_waiters.iter().sum();
223        let per_core_live_read_backpressure_events = self
224            .inner
225            .per_core_live_read_backpressure_events
226            .iter()
227            .map(PaddedAtomicU64::load_relaxed)
228            .collect::<Vec<_>>();
229        let live_read_backpressure_events = per_core_live_read_backpressure_events.iter().sum();
230        let per_core_routed_requests = self
231            .inner
232            .per_core_routed_requests
233            .iter()
234            .map(PaddedAtomicU64::load_relaxed)
235            .collect::<Vec<_>>();
236        let routed_requests = per_core_routed_requests.iter().sum();
237        let per_core_mailbox_send_wait_ns = self
238            .inner
239            .per_core_mailbox_send_wait_ns
240            .iter()
241            .map(PaddedAtomicU64::load_relaxed)
242            .collect::<Vec<_>>();
243        let mailbox_send_wait_ns = per_core_mailbox_send_wait_ns.iter().sum();
244        let per_core_mailbox_full_events = self
245            .inner
246            .per_core_mailbox_full_events
247            .iter()
248            .map(PaddedAtomicU64::load_relaxed)
249            .collect::<Vec<_>>();
250        let mailbox_full_events = per_core_mailbox_full_events.iter().sum();
251        let per_core_wal_batches = self
252            .inner
253            .per_core_wal_batches
254            .iter()
255            .map(PaddedAtomicU64::load_relaxed)
256            .collect::<Vec<_>>();
257        let wal_batches = per_core_wal_batches.iter().sum();
258        let per_group_wal_batches = self
259            .inner
260            .per_group_wal_batches
261            .iter()
262            .map(PaddedAtomicU64::load_relaxed)
263            .collect();
264        let per_core_wal_records = self
265            .inner
266            .per_core_wal_records
267            .iter()
268            .map(PaddedAtomicU64::load_relaxed)
269            .collect::<Vec<_>>();
270        let wal_records = per_core_wal_records.iter().sum();
271        let per_group_wal_records = self
272            .inner
273            .per_group_wal_records
274            .iter()
275            .map(PaddedAtomicU64::load_relaxed)
276            .collect();
277        let per_core_wal_write_ns = self
278            .inner
279            .per_core_wal_write_ns
280            .iter()
281            .map(PaddedAtomicU64::load_relaxed)
282            .collect::<Vec<_>>();
283        let wal_write_ns = per_core_wal_write_ns.iter().sum();
284        let per_group_wal_write_ns = self
285            .inner
286            .per_group_wal_write_ns
287            .iter()
288            .map(PaddedAtomicU64::load_relaxed)
289            .collect();
290        let per_core_wal_sync_ns = self
291            .inner
292            .per_core_wal_sync_ns
293            .iter()
294            .map(PaddedAtomicU64::load_relaxed)
295            .collect::<Vec<_>>();
296        let wal_sync_ns = per_core_wal_sync_ns.iter().sum();
297        let per_group_wal_sync_ns = self
298            .inner
299            .per_group_wal_sync_ns
300            .iter()
301            .map(PaddedAtomicU64::load_relaxed)
302            .collect();
303        let cold_flush_uploads = self.inner.cold_flush_uploads.load_relaxed();
304        let cold_flush_upload_bytes = self.inner.cold_flush_upload_bytes.load_relaxed();
305        let cold_flush_upload_ns = self.inner.cold_flush_upload_ns.load_relaxed();
306        let cold_flush_publishes = self.inner.cold_flush_publishes.load_relaxed();
307        let cold_flush_publish_bytes = self.inner.cold_flush_publish_bytes.load_relaxed();
308        let cold_flush_publish_ns = self.inner.cold_flush_publish_ns.load_relaxed();
309        let cold_orphan_cleanup_attempts = self.inner.cold_orphan_cleanup_attempts.load_relaxed();
310        let cold_orphan_cleanup_errors = self.inner.cold_orphan_cleanup_errors.load_relaxed();
311        let cold_orphan_bytes = self.inner.cold_orphan_bytes.load_relaxed();
312        let per_group_cold_hot_bytes = self
313            .inner
314            .per_group_cold_hot_bytes
315            .iter()
316            .map(PaddedAtomicU64::load_relaxed)
317            .collect::<Vec<_>>();
318        let cold_hot_bytes = per_group_cold_hot_bytes.iter().sum();
319        let per_group_cold_hot_bytes_max = self
320            .inner
321            .per_group_cold_hot_bytes_max
322            .iter()
323            .map(PaddedAtomicU64::load_relaxed)
324            .collect::<Vec<_>>();
325        let cold_hot_group_bytes_max = per_group_cold_hot_bytes_max
326            .iter()
327            .copied()
328            .max()
329            .unwrap_or(0);
330        let cold_hot_stream_bytes_max = self.inner.cold_hot_stream_bytes_max.load_relaxed();
331        let per_core_cold_backpressure_events = self
332            .inner
333            .per_core_cold_backpressure_events
334            .iter()
335            .map(PaddedAtomicU64::load_relaxed)
336            .collect::<Vec<_>>();
337        let cold_backpressure_events = per_core_cold_backpressure_events.iter().sum();
338        let per_group_cold_backpressure_events = self
339            .inner
340            .per_group_cold_backpressure_events
341            .iter()
342            .map(PaddedAtomicU64::load_relaxed)
343            .collect();
344        let cold_backpressure_bytes = self.inner.cold_backpressure_bytes.load_relaxed();
345
346        RuntimeMetricsSnapshot {
347            accepted_appends,
348            per_core_appends,
349            per_group_appends,
350            applied_mutations,
351            per_core_applied_mutations,
352            per_group_applied_mutations,
353            mutation_apply_ns,
354            per_core_mutation_apply_ns,
355            per_group_mutation_apply_ns,
356            group_lock_wait_ns,
357            per_core_group_lock_wait_ns,
358            per_group_group_lock_wait_ns,
359            group_engine_exec_ns,
360            per_core_group_engine_exec_ns,
361            per_group_group_engine_exec_ns,
362            group_mailbox_depth,
363            per_group_group_mailbox_depth,
364            group_mailbox_max_depth,
365            per_group_group_mailbox_max_depth,
366            group_mailbox_full_events,
367            per_group_group_mailbox_full_events,
368            raft_write_many_batches,
369            per_core_raft_write_many_batches,
370            per_group_raft_write_many_batches,
371            raft_write_many_commands,
372            per_core_raft_write_many_commands,
373            per_group_raft_write_many_commands,
374            raft_write_many_logical_commands,
375            per_core_raft_write_many_logical_commands,
376            per_group_raft_write_many_logical_commands,
377            raft_write_many_responses,
378            per_core_raft_write_many_responses,
379            per_group_raft_write_many_responses,
380            raft_write_many_submit_ns,
381            per_core_raft_write_many_submit_ns,
382            per_group_raft_write_many_submit_ns,
383            raft_write_many_response_ns,
384            per_core_raft_write_many_response_ns,
385            per_group_raft_write_many_response_ns,
386            raft_apply_entries,
387            per_core_raft_apply_entries,
388            per_group_raft_apply_entries,
389            raft_apply_ns,
390            per_core_raft_apply_ns,
391            per_group_raft_apply_ns,
392            live_read_waiters,
393            per_core_live_read_waiters,
394            live_read_backpressure_events,
395            per_core_live_read_backpressure_events,
396            routed_requests,
397            per_core_routed_requests,
398            mailbox_send_wait_ns,
399            per_core_mailbox_send_wait_ns,
400            mailbox_full_events,
401            per_core_mailbox_full_events,
402            wal_batches,
403            per_core_wal_batches,
404            per_group_wal_batches,
405            wal_records,
406            per_core_wal_records,
407            per_group_wal_records,
408            wal_write_ns,
409            per_core_wal_write_ns,
410            per_group_wal_write_ns,
411            wal_sync_ns,
412            per_core_wal_sync_ns,
413            per_group_wal_sync_ns,
414            cold_flush_uploads,
415            cold_flush_upload_bytes,
416            cold_flush_upload_ns,
417            cold_flush_publishes,
418            cold_flush_publish_bytes,
419            cold_flush_publish_ns,
420            cold_orphan_cleanup_attempts,
421            cold_orphan_cleanup_errors,
422            cold_orphan_bytes,
423            cold_hot_bytes,
424            per_group_cold_hot_bytes,
425            cold_hot_group_bytes_max,
426            per_group_cold_hot_bytes_max,
427            cold_hot_stream_bytes_max,
428            cold_backpressure_events,
429            per_core_cold_backpressure_events,
430            per_group_cold_backpressure_events,
431            cold_backpressure_bytes,
432        }
433    }
434}
435
/// Owned, point-in-time copy of the runtime counters, produced by
/// [`RuntimeMetrics::snapshot`].
///
/// Most metrics appear as a family of fields:
///
/// * an unprefixed aggregate `u64`;
/// * `per_core_*`: one entry per core, in core-id order;
/// * `per_group_*`: one entry per raft group (presumably in raft-group-id order).
///
/// Each aggregate's derivation is stated on the field itself. When a family has both
/// breakdowns, the aggregate comes from the per-core vector. The per-group vector is read
/// separately, so it is not guaranteed to sum to the aggregate while counters are being
/// updated. Fields ending in `_ns` are accumulated durations (nanoseconds, judging by the
/// suffix).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RuntimeMetricsSnapshot {
    /// Sum of `per_core_appends`.
    pub accepted_appends: u64,
    /// Append count per core.
    pub per_core_appends: Vec<u64>,
    /// Append count per raft group.
    pub per_group_appends: Vec<u64>,
    /// Sum of `per_core_applied_mutations`.
    pub applied_mutations: u64,
    /// Applied-mutation count per core.
    pub per_core_applied_mutations: Vec<u64>,
    /// Applied-mutation count per raft group.
    pub per_group_applied_mutations: Vec<u64>,
    /// Sum of `per_core_mutation_apply_ns`.
    pub mutation_apply_ns: u64,
    /// Mutation-apply time per core.
    pub per_core_mutation_apply_ns: Vec<u64>,
    /// Mutation-apply time per raft group.
    pub per_group_mutation_apply_ns: Vec<u64>,
    /// Sum of `per_core_group_lock_wait_ns`.
    pub group_lock_wait_ns: u64,
    /// Group-lock wait time per core.
    pub per_core_group_lock_wait_ns: Vec<u64>,
    /// Group-lock wait time per raft group.
    pub per_group_group_lock_wait_ns: Vec<u64>,
    /// Sum of `per_core_group_engine_exec_ns`.
    pub group_engine_exec_ns: u64,
    /// Group-engine execution time per core.
    pub per_core_group_engine_exec_ns: Vec<u64>,
    /// Group-engine execution time per raft group.
    pub per_group_group_engine_exec_ns: Vec<u64>,
    /// Sum of `per_group_group_mailbox_depth` (this family has no per-core breakdown).
    pub group_mailbox_depth: u64,
    /// Group mailbox depth per raft group.
    pub per_group_group_mailbox_depth: Vec<u64>,
    /// Maximum (not sum) of `per_group_group_mailbox_max_depth`; 0 when there are no groups.
    pub group_mailbox_max_depth: u64,
    /// Group mailbox max-depth reading per raft group.
    pub per_group_group_mailbox_max_depth: Vec<u64>,
    /// Sum of `per_group_group_mailbox_full_events` (this family has no per-core breakdown).
    pub group_mailbox_full_events: u64,
    /// Group mailbox full-event count per raft group.
    pub per_group_group_mailbox_full_events: Vec<u64>,
    /// Sum of `per_core_raft_write_many_batches`.
    pub raft_write_many_batches: u64,
    /// Raft `write_many` batch count per core.
    pub per_core_raft_write_many_batches: Vec<u64>,
    /// Raft `write_many` batch count per raft group.
    pub per_group_raft_write_many_batches: Vec<u64>,
    /// Sum of `per_core_raft_write_many_commands`.
    pub raft_write_many_commands: u64,
    /// Raft `write_many` command count per core.
    pub per_core_raft_write_many_commands: Vec<u64>,
    /// Raft `write_many` command count per raft group.
    pub per_group_raft_write_many_commands: Vec<u64>,
    /// Sum of `per_core_raft_write_many_logical_commands`.
    pub raft_write_many_logical_commands: u64,
    /// Raft `write_many` logical-command count per core.
    pub per_core_raft_write_many_logical_commands: Vec<u64>,
    /// Raft `write_many` logical-command count per raft group.
    pub per_group_raft_write_many_logical_commands: Vec<u64>,
    /// Sum of `per_core_raft_write_many_responses`.
    pub raft_write_many_responses: u64,
    /// Raft `write_many` response count per core.
    pub per_core_raft_write_many_responses: Vec<u64>,
    /// Raft `write_many` response count per raft group.
    pub per_group_raft_write_many_responses: Vec<u64>,
    /// Sum of `per_core_raft_write_many_submit_ns`.
    pub raft_write_many_submit_ns: u64,
    /// Raft `write_many` submit time per core.
    pub per_core_raft_write_many_submit_ns: Vec<u64>,
    /// Raft `write_many` submit time per raft group.
    pub per_group_raft_write_many_submit_ns: Vec<u64>,
    /// Sum of `per_core_raft_write_many_response_ns`.
    pub raft_write_many_response_ns: u64,
    /// Raft `write_many` response time per core.
    pub per_core_raft_write_many_response_ns: Vec<u64>,
    /// Raft `write_many` response time per raft group.
    pub per_group_raft_write_many_response_ns: Vec<u64>,
    /// Sum of `per_core_raft_apply_entries`.
    pub raft_apply_entries: u64,
    /// Raft applied-entry count per core.
    pub per_core_raft_apply_entries: Vec<u64>,
    /// Raft applied-entry count per raft group.
    pub per_group_raft_apply_entries: Vec<u64>,
    /// Sum of `per_core_raft_apply_ns`.
    pub raft_apply_ns: u64,
    /// Raft apply time per core.
    pub per_core_raft_apply_ns: Vec<u64>,
    /// Raft apply time per raft group.
    pub per_group_raft_apply_ns: Vec<u64>,
    /// Sum of `per_core_live_read_waiters` (per-core only).
    pub live_read_waiters: u64,
    /// Live-read waiter reading per core.
    pub per_core_live_read_waiters: Vec<u64>,
    /// Sum of `per_core_live_read_backpressure_events` (per-core only).
    pub live_read_backpressure_events: u64,
    /// Live-read backpressure event count per core.
    pub per_core_live_read_backpressure_events: Vec<u64>,
    /// Sum of `per_core_routed_requests` (per-core only).
    pub routed_requests: u64,
    /// Routed-request count per core.
    pub per_core_routed_requests: Vec<u64>,
    /// Sum of `per_core_mailbox_send_wait_ns` (per-core only).
    pub mailbox_send_wait_ns: u64,
    /// Core mailbox send-wait time per core.
    pub per_core_mailbox_send_wait_ns: Vec<u64>,
    /// Sum of `per_core_mailbox_full_events` (per-core only).
    pub mailbox_full_events: u64,
    /// Core mailbox full-event count per core.
    pub per_core_mailbox_full_events: Vec<u64>,
    /// Sum of `per_core_wal_batches`.
    pub wal_batches: u64,
    /// WAL batch count per core.
    pub per_core_wal_batches: Vec<u64>,
    /// WAL batch count per raft group.
    pub per_group_wal_batches: Vec<u64>,
    /// Sum of `per_core_wal_records`.
    pub wal_records: u64,
    /// WAL record count per core.
    pub per_core_wal_records: Vec<u64>,
    /// WAL record count per raft group.
    pub per_group_wal_records: Vec<u64>,
    /// Sum of `per_core_wal_write_ns`.
    pub wal_write_ns: u64,
    /// WAL write time per core.
    pub per_core_wal_write_ns: Vec<u64>,
    /// WAL write time per raft group.
    pub per_group_wal_write_ns: Vec<u64>,
    /// Sum of `per_core_wal_sync_ns`.
    pub wal_sync_ns: u64,
    /// WAL sync time per core.
    pub per_core_wal_sync_ns: Vec<u64>,
    /// WAL sync time per raft group.
    pub per_group_wal_sync_ns: Vec<u64>,
    /// Single counter (no per-core or per-group breakdown).
    pub cold_flush_uploads: u64,
    /// Single counter (no per-core or per-group breakdown).
    pub cold_flush_upload_bytes: u64,
    /// Single counter (no per-core or per-group breakdown).
    pub cold_flush_upload_ns: u64,
    /// Single counter (no per-core or per-group breakdown).
    pub cold_flush_publishes: u64,
    /// Single counter (no per-core or per-group breakdown).
    pub cold_flush_publish_bytes: u64,
    /// Single counter (no per-core or per-group breakdown).
    pub cold_flush_publish_ns: u64,
    /// Single counter (no per-core or per-group breakdown).
    pub cold_orphan_cleanup_attempts: u64,
    /// Single counter (no per-core or per-group breakdown).
    pub cold_orphan_cleanup_errors: u64,
    /// Single counter (no per-core or per-group breakdown).
    pub cold_orphan_bytes: u64,
    /// Sum of `per_group_cold_hot_bytes` (this family has no per-core breakdown).
    pub cold_hot_bytes: u64,
    /// Cold-tier hot-byte reading per raft group.
    pub per_group_cold_hot_bytes: Vec<u64>,
    /// Maximum (not sum) of `per_group_cold_hot_bytes_max`; 0 when there are no groups.
    pub cold_hot_group_bytes_max: u64,
    /// Cold-tier hot-byte max reading per raft group.
    pub per_group_cold_hot_bytes_max: Vec<u64>,
    /// Single counter, read directly rather than derived from a vector.
    pub cold_hot_stream_bytes_max: u64,
    /// Sum of `per_core_cold_backpressure_events`.
    pub cold_backpressure_events: u64,
    /// Cold backpressure event count per core.
    pub per_core_cold_backpressure_events: Vec<u64>,
    /// Cold backpressure event count per raft group.
    pub per_group_cold_backpressure_events: Vec<u64>,
    /// Single counter (no per-core or per-group breakdown).
    pub cold_backpressure_bytes: u64,
}
524
/// Mailbox occupancy readings.
///
/// NOTE(review): nothing in this part of the file builds this type. Presumably `depths[i]`
/// and `capacities[i]` describe the same mailbox (e.g. core `i`'s) — confirm against the
/// producer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RuntimeMailboxSnapshot {
    /// Depth reading for each mailbox.
    pub depths: Vec<usize>,
    /// Capacity for each mailbox.
    pub capacities: Vec<usize>,
}
530
531#[derive(Debug)]
532pub(crate) struct RuntimeMetricsInner {
533    pub(crate) per_core_appends: Vec<PaddedAtomicU64>,
534    pub(crate) per_group_appends: Vec<PaddedAtomicU64>,
535    pub(crate) per_core_applied_mutations: Vec<PaddedAtomicU64>,
536    pub(crate) per_group_applied_mutations: Vec<PaddedAtomicU64>,
537    pub(crate) per_core_mutation_apply_ns: Vec<PaddedAtomicU64>,
538    pub(crate) per_group_mutation_apply_ns: Vec<PaddedAtomicU64>,
539    pub(crate) per_core_group_lock_wait_ns: Vec<PaddedAtomicU64>,
540    pub(crate) per_group_group_lock_wait_ns: Vec<PaddedAtomicU64>,
541    pub(crate) per_core_group_engine_exec_ns: Vec<PaddedAtomicU64>,
542    pub(crate) per_group_group_engine_exec_ns: Vec<PaddedAtomicU64>,
543    pub(crate) per_group_group_mailbox_depth: Vec<PaddedAtomicU64>,
544    pub(crate) per_group_group_mailbox_max_depth: Vec<PaddedAtomicU64>,
545    pub(crate) per_group_group_mailbox_full_events: Vec<PaddedAtomicU64>,
546    pub(crate) per_core_raft_write_many_batches: Vec<PaddedAtomicU64>,
547    pub(crate) per_group_raft_write_many_batches: Vec<PaddedAtomicU64>,
548    pub(crate) per_core_raft_write_many_commands: Vec<PaddedAtomicU64>,
549    pub(crate) per_group_raft_write_many_commands: Vec<PaddedAtomicU64>,
550    pub(crate) per_core_raft_write_many_logical_commands: Vec<PaddedAtomicU64>,
551    pub(crate) per_group_raft_write_many_logical_commands: Vec<PaddedAtomicU64>,
552    pub(crate) per_core_raft_write_many_responses: Vec<PaddedAtomicU64>,
553    pub(crate) per_group_raft_write_many_responses: Vec<PaddedAtomicU64>,
554    pub(crate) per_core_raft_write_many_submit_ns: Vec<PaddedAtomicU64>,
555    pub(crate) per_group_raft_write_many_submit_ns: Vec<PaddedAtomicU64>,
556    pub(crate) per_core_raft_write_many_response_ns: Vec<PaddedAtomicU64>,
557    pub(crate) per_group_raft_write_many_response_ns: Vec<PaddedAtomicU64>,
558    pub(crate) per_core_raft_apply_entries: Vec<PaddedAtomicU64>,
559    pub(crate) per_group_raft_apply_entries: Vec<PaddedAtomicU64>,
560    pub(crate) per_core_raft_apply_ns: Vec<PaddedAtomicU64>,
561    pub(crate) per_group_raft_apply_ns: Vec<PaddedAtomicU64>,
562    pub(crate) per_core_live_read_waiters: Vec<PaddedAtomicU64>,
563    pub(crate) per_core_live_read_backpressure_events: Vec<PaddedAtomicU64>,
564    pub(crate) per_core_routed_requests: Vec<PaddedAtomicU64>,
565    pub(crate) per_core_mailbox_send_wait_ns: Vec<PaddedAtomicU64>,
566    pub(crate) per_core_mailbox_full_events: Vec<PaddedAtomicU64>,
567    pub(crate) per_core_wal_batches: Vec<PaddedAtomicU64>,
568    pub(crate) per_group_wal_batches: Vec<PaddedAtomicU64>,
569    pub(crate) per_core_wal_records: Vec<PaddedAtomicU64>,
570    pub(crate) per_group_wal_records: Vec<PaddedAtomicU64>,
571    pub(crate) per_core_wal_write_ns: Vec<PaddedAtomicU64>,
572    pub(crate) per_group_wal_write_ns: Vec<PaddedAtomicU64>,
573    pub(crate) per_core_wal_sync_ns: Vec<PaddedAtomicU64>,
574    pub(crate) per_group_wal_sync_ns: Vec<PaddedAtomicU64>,
575    pub(crate) cold_flush_uploads: PaddedAtomicU64,
576    pub(crate) cold_flush_upload_bytes: PaddedAtomicU64,
577    pub(crate) cold_flush_upload_ns: PaddedAtomicU64,
578    pub(crate) cold_flush_publishes: PaddedAtomicU64,
579    pub(crate) cold_flush_publish_bytes: PaddedAtomicU64,
580    pub(crate) cold_flush_publish_ns: PaddedAtomicU64,
581    pub(crate) cold_orphan_cleanup_attempts: PaddedAtomicU64,
582    pub(crate) cold_orphan_cleanup_errors: PaddedAtomicU64,
583    pub(crate) cold_orphan_bytes: PaddedAtomicU64,
584    pub(crate) per_group_cold_hot_bytes: Vec<PaddedAtomicU64>,
585    pub(crate) per_group_cold_hot_bytes_max: Vec<PaddedAtomicU64>,
586    pub(crate) cold_hot_stream_bytes_max: PaddedAtomicU64,
587    pub(crate) per_core_cold_backpressure_events: Vec<PaddedAtomicU64>,
588    pub(crate) per_group_cold_backpressure_events: Vec<PaddedAtomicU64>,
589    pub(crate) cold_backpressure_bytes: PaddedAtomicU64,
590}
591
/// One observation of a Raft `write_many` call.
///
/// NOTE(review): no producer or consumer is visible in this part of the file. The field names
/// mirror the `*_raft_write_many_*` counters in `RuntimeMetricsInner`, which suggests a sample
/// is folded into those counters — confirm at the call site.
#[derive(Debug, Clone, Copy)]
pub(crate) struct RaftWriteManySample {
    /// Command count (presumably feeds `*_raft_write_many_commands`).
    pub(crate) command_count: u64,
    /// Logical command count (presumably feeds `*_raft_write_many_logical_commands`).
    pub(crate) logical_command_count: u64,
    /// Response count (presumably feeds `*_raft_write_many_responses`).
    pub(crate) response_count: u64,
    /// Submit duration (presumably feeds `*_raft_write_many_submit_ns`).
    pub(crate) submit_ns: u64,
    /// Response duration (presumably feeds `*_raft_write_many_response_ns`).
    pub(crate) response_ns: u64,
}
600
601impl RuntimeMetricsInner {
602    pub(crate) fn new(core_count: usize, raft_group_count: usize) -> Self {
603        Self {
604            per_core_appends: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
605            per_group_appends: (0..raft_group_count)
606                .map(|_| PaddedAtomicU64::new(0))
607                .collect(),
608            per_core_applied_mutations: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
609            per_group_applied_mutations: (0..raft_group_count)
610                .map(|_| PaddedAtomicU64::new(0))
611                .collect(),
612            per_core_mutation_apply_ns: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
613            per_group_mutation_apply_ns: (0..raft_group_count)
614                .map(|_| PaddedAtomicU64::new(0))
615                .collect(),
616            per_core_group_lock_wait_ns: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
617            per_group_group_lock_wait_ns: (0..raft_group_count)
618                .map(|_| PaddedAtomicU64::new(0))
619                .collect(),
620            per_core_group_engine_exec_ns: (0..core_count)
621                .map(|_| PaddedAtomicU64::new(0))
622                .collect(),
623            per_group_group_engine_exec_ns: (0..raft_group_count)
624                .map(|_| PaddedAtomicU64::new(0))
625                .collect(),
626            per_group_group_mailbox_depth: (0..raft_group_count)
627                .map(|_| PaddedAtomicU64::new(0))
628                .collect(),
629            per_group_group_mailbox_max_depth: (0..raft_group_count)
630                .map(|_| PaddedAtomicU64::new(0))
631                .collect(),
632            per_group_group_mailbox_full_events: (0..raft_group_count)
633                .map(|_| PaddedAtomicU64::new(0))
634                .collect(),
635            per_core_raft_write_many_batches: (0..core_count)
636                .map(|_| PaddedAtomicU64::new(0))
637                .collect(),
638            per_group_raft_write_many_batches: (0..raft_group_count)
639                .map(|_| PaddedAtomicU64::new(0))
640                .collect(),
641            per_core_raft_write_many_commands: (0..core_count)
642                .map(|_| PaddedAtomicU64::new(0))
643                .collect(),
644            per_group_raft_write_many_commands: (0..raft_group_count)
645                .map(|_| PaddedAtomicU64::new(0))
646                .collect(),
647            per_core_raft_write_many_logical_commands: (0..core_count)
648                .map(|_| PaddedAtomicU64::new(0))
649                .collect(),
650            per_group_raft_write_many_logical_commands: (0..raft_group_count)
651                .map(|_| PaddedAtomicU64::new(0))
652                .collect(),
653            per_core_raft_write_many_responses: (0..core_count)
654                .map(|_| PaddedAtomicU64::new(0))
655                .collect(),
656            per_group_raft_write_many_responses: (0..raft_group_count)
657                .map(|_| PaddedAtomicU64::new(0))
658                .collect(),
659            per_core_raft_write_many_submit_ns: (0..core_count)
660                .map(|_| PaddedAtomicU64::new(0))
661                .collect(),
662            per_group_raft_write_many_submit_ns: (0..raft_group_count)
663                .map(|_| PaddedAtomicU64::new(0))
664                .collect(),
665            per_core_raft_write_many_response_ns: (0..core_count)
666                .map(|_| PaddedAtomicU64::new(0))
667                .collect(),
668            per_group_raft_write_many_response_ns: (0..raft_group_count)
669                .map(|_| PaddedAtomicU64::new(0))
670                .collect(),
671            per_core_raft_apply_entries: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
672            per_group_raft_apply_entries: (0..raft_group_count)
673                .map(|_| PaddedAtomicU64::new(0))
674                .collect(),
675            per_core_raft_apply_ns: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
676            per_group_raft_apply_ns: (0..raft_group_count)
677                .map(|_| PaddedAtomicU64::new(0))
678                .collect(),
679            per_core_live_read_waiters: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
680            per_core_live_read_backpressure_events: (0..core_count)
681                .map(|_| PaddedAtomicU64::new(0))
682                .collect(),
683            per_core_routed_requests: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
684            per_core_mailbox_send_wait_ns: (0..core_count)
685                .map(|_| PaddedAtomicU64::new(0))
686                .collect(),
687            per_core_mailbox_full_events: (0..core_count)
688                .map(|_| PaddedAtomicU64::new(0))
689                .collect(),
690            per_core_wal_batches: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
691            per_group_wal_batches: (0..raft_group_count)
692                .map(|_| PaddedAtomicU64::new(0))
693                .collect(),
694            per_core_wal_records: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
695            per_group_wal_records: (0..raft_group_count)
696                .map(|_| PaddedAtomicU64::new(0))
697                .collect(),
698            per_core_wal_write_ns: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
699            per_group_wal_write_ns: (0..raft_group_count)
700                .map(|_| PaddedAtomicU64::new(0))
701                .collect(),
702            per_core_wal_sync_ns: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
703            per_group_wal_sync_ns: (0..raft_group_count)
704                .map(|_| PaddedAtomicU64::new(0))
705                .collect(),
706            cold_flush_uploads: PaddedAtomicU64::new(0),
707            cold_flush_upload_bytes: PaddedAtomicU64::new(0),
708            cold_flush_upload_ns: PaddedAtomicU64::new(0),
709            cold_flush_publishes: PaddedAtomicU64::new(0),
710            cold_flush_publish_bytes: PaddedAtomicU64::new(0),
711            cold_flush_publish_ns: PaddedAtomicU64::new(0),
712            cold_orphan_cleanup_attempts: PaddedAtomicU64::new(0),
713            cold_orphan_cleanup_errors: PaddedAtomicU64::new(0),
714            cold_orphan_bytes: PaddedAtomicU64::new(0),
715            per_group_cold_hot_bytes: (0..raft_group_count)
716                .map(|_| PaddedAtomicU64::new(0))
717                .collect(),
718            per_group_cold_hot_bytes_max: (0..raft_group_count)
719                .map(|_| PaddedAtomicU64::new(0))
720                .collect(),
721            cold_hot_stream_bytes_max: PaddedAtomicU64::new(0),
722            per_core_cold_backpressure_events: (0..core_count)
723                .map(|_| PaddedAtomicU64::new(0))
724                .collect(),
725            per_group_cold_backpressure_events: (0..raft_group_count)
726                .map(|_| PaddedAtomicU64::new(0))
727                .collect(),
728            cold_backpressure_bytes: PaddedAtomicU64::new(0),
729        }
730    }
731
732    pub(crate) fn record_routed_request(&self, core_id: CoreId, mailbox_send_wait_ns: u64) {
733        let index = usize::from(core_id.0);
734        self.per_core_routed_requests[index].fetch_add_relaxed(1);
735        self.per_core_mailbox_send_wait_ns[index].fetch_add_relaxed(mailbox_send_wait_ns);
736    }
737
738    pub(crate) fn record_mailbox_full(&self, core_id: CoreId) {
739        self.per_core_mailbox_full_events[usize::from(core_id.0)].fetch_add_relaxed(1);
740    }
741
742    pub(crate) fn record_append(&self, core_id: CoreId, group_id: RaftGroupId) {
743        self.record_append_batch(core_id, group_id, 1);
744    }
745
746    pub(crate) fn record_append_batch(&self, core_id: CoreId, group_id: RaftGroupId, count: u64) {
747        self.per_core_appends[usize::from(core_id.0)].fetch_add_relaxed(count);
748        self.per_group_appends[usize::try_from(group_id.0).expect("u32 fits usize")]
749            .fetch_add_relaxed(count);
750    }
751
752    pub(crate) fn record_applied_mutation(
753        &self,
754        core_id: CoreId,
755        group_id: RaftGroupId,
756        apply_ns: u64,
757    ) {
758        self.record_applied_mutation_batch(core_id, group_id, 1, apply_ns);
759    }
760
761    pub(crate) fn record_applied_mutation_batch(
762        &self,
763        core_id: CoreId,
764        group_id: RaftGroupId,
765        count: u64,
766        apply_ns: u64,
767    ) {
768        let core_index = usize::from(core_id.0);
769        let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
770        self.per_core_applied_mutations[core_index].fetch_add_relaxed(count);
771        self.per_group_applied_mutations[group_index].fetch_add_relaxed(count);
772        self.per_core_mutation_apply_ns[core_index].fetch_add_relaxed(apply_ns);
773        self.per_group_mutation_apply_ns[group_index].fetch_add_relaxed(apply_ns);
774    }
775
776    pub(crate) fn record_group_engine_exec(
777        &self,
778        core_id: CoreId,
779        group_id: RaftGroupId,
780        exec_ns: u64,
781    ) {
782        let core_index = usize::from(core_id.0);
783        let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
784        self.per_core_group_engine_exec_ns[core_index].fetch_add_relaxed(exec_ns);
785        self.per_group_group_engine_exec_ns[group_index].fetch_add_relaxed(exec_ns);
786    }
787
788    pub(crate) fn record_group_mailbox_enqueued(&self, group_id: RaftGroupId) {
789        let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
790        let depth = self.per_group_group_mailbox_depth[group_index].fetch_add_relaxed(1) + 1;
791        self.per_group_group_mailbox_max_depth[group_index].fetch_max_relaxed(depth);
792    }
793
794    pub(crate) fn record_group_mailbox_dequeued(&self, group_id: RaftGroupId) {
795        let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
796        self.per_group_group_mailbox_depth[group_index].fetch_sub_relaxed(1);
797    }
798
799    pub(crate) fn record_group_mailbox_full(&self, group_id: RaftGroupId) {
800        let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
801        self.per_group_group_mailbox_full_events[group_index].fetch_add_relaxed(1);
802    }
803
804    pub(crate) fn record_raft_write_many(
805        &self,
806        core_id: CoreId,
807        group_id: RaftGroupId,
808        sample: RaftWriteManySample,
809    ) {
810        let core_index = usize::from(core_id.0);
811        let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
812        self.per_core_raft_write_many_batches[core_index].fetch_add_relaxed(1);
813        self.per_group_raft_write_many_batches[group_index].fetch_add_relaxed(1);
814        self.per_core_raft_write_many_commands[core_index].fetch_add_relaxed(sample.command_count);
815        self.per_group_raft_write_many_commands[group_index]
816            .fetch_add_relaxed(sample.command_count);
817        self.per_core_raft_write_many_logical_commands[core_index]
818            .fetch_add_relaxed(sample.logical_command_count);
819        self.per_group_raft_write_many_logical_commands[group_index]
820            .fetch_add_relaxed(sample.logical_command_count);
821        self.per_core_raft_write_many_responses[core_index]
822            .fetch_add_relaxed(sample.response_count);
823        self.per_group_raft_write_many_responses[group_index]
824            .fetch_add_relaxed(sample.response_count);
825        self.per_core_raft_write_many_submit_ns[core_index].fetch_add_relaxed(sample.submit_ns);
826        self.per_group_raft_write_many_submit_ns[group_index].fetch_add_relaxed(sample.submit_ns);
827        self.per_core_raft_write_many_response_ns[core_index].fetch_add_relaxed(sample.response_ns);
828        self.per_group_raft_write_many_response_ns[group_index]
829            .fetch_add_relaxed(sample.response_ns);
830    }
831
832    pub(crate) fn record_raft_apply_batch(
833        &self,
834        core_id: CoreId,
835        group_id: RaftGroupId,
836        entry_count: u64,
837        apply_ns: u64,
838    ) {
839        let core_index = usize::from(core_id.0);
840        let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
841        self.per_core_raft_apply_entries[core_index].fetch_add_relaxed(entry_count);
842        self.per_group_raft_apply_entries[group_index].fetch_add_relaxed(entry_count);
843        self.per_core_raft_apply_ns[core_index].fetch_add_relaxed(apply_ns);
844        self.per_group_raft_apply_ns[group_index].fetch_add_relaxed(apply_ns);
845    }
846
847    pub(crate) fn record_wal_batch(
848        &self,
849        core_id: CoreId,
850        group_id: RaftGroupId,
851        record_count: u64,
852        write_ns: u64,
853        sync_ns: u64,
854    ) {
855        let core_index = usize::from(core_id.0);
856        let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
857        self.per_core_wal_batches[core_index].fetch_add_relaxed(1);
858        self.per_group_wal_batches[group_index].fetch_add_relaxed(1);
859        self.per_core_wal_records[core_index].fetch_add_relaxed(record_count);
860        self.per_group_wal_records[group_index].fetch_add_relaxed(record_count);
861        self.per_core_wal_write_ns[core_index].fetch_add_relaxed(write_ns);
862        self.per_group_wal_write_ns[group_index].fetch_add_relaxed(write_ns);
863        self.per_core_wal_sync_ns[core_index].fetch_add_relaxed(sync_ns);
864        self.per_group_wal_sync_ns[group_index].fetch_add_relaxed(sync_ns);
865    }
866
867    pub(crate) fn record_cold_upload(&self, bytes: u64, upload_ns: u64) {
868        self.cold_flush_uploads.fetch_add_relaxed(1);
869        self.cold_flush_upload_bytes.fetch_add_relaxed(bytes);
870        self.cold_flush_upload_ns.fetch_add_relaxed(upload_ns);
871    }
872
873    pub(crate) fn record_cold_publish(&self, bytes: u64, publish_ns: u64) {
874        self.cold_flush_publishes.fetch_add_relaxed(1);
875        self.cold_flush_publish_bytes.fetch_add_relaxed(bytes);
876        self.cold_flush_publish_ns.fetch_add_relaxed(publish_ns);
877    }
878
879    pub(crate) fn record_cold_orphan_cleanup(&self, bytes: u64, cleanup_failed: bool) {
880        self.cold_orphan_cleanup_attempts.fetch_add_relaxed(1);
881        if cleanup_failed {
882            self.cold_orphan_cleanup_errors.fetch_add_relaxed(1);
883            self.cold_orphan_bytes.fetch_add_relaxed(bytes);
884        }
885    }
886
887    pub(crate) fn record_cold_hot_backlog(
888        &self,
889        group_id: RaftGroupId,
890        stream_hot_bytes: u64,
891        group_hot_bytes: u64,
892    ) {
893        let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
894        self.per_group_cold_hot_bytes[group_index].store_relaxed(group_hot_bytes);
895        self.per_group_cold_hot_bytes_max[group_index].fetch_max_relaxed(group_hot_bytes);
896        self.cold_hot_stream_bytes_max
897            .fetch_max_relaxed(stream_hot_bytes);
898    }
899
900    pub(crate) fn record_cold_backpressure(
901        &self,
902        core_id: CoreId,
903        group_id: RaftGroupId,
904        incoming_bytes: u64,
905        _limit: u64,
906    ) {
907        let core_index = usize::from(core_id.0);
908        let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
909        self.per_core_cold_backpressure_events[core_index].fetch_add_relaxed(1);
910        self.per_group_cold_backpressure_events[group_index].fetch_add_relaxed(1);
911        self.cold_backpressure_bytes
912            .fetch_add_relaxed(incoming_bytes);
913    }
914
915    pub(crate) fn record_read_watcher_added(&self, core_id: CoreId) {
916        self.record_read_watchers_added(core_id, 1);
917    }
918
919    pub(crate) fn record_read_watchers_added(&self, core_id: CoreId, count: usize) {
920        self.per_core_live_read_waiters[usize::from(core_id.0)]
921            .fetch_add_relaxed(u64::try_from(count).expect("watcher count fits u64"));
922    }
923
924    pub(crate) fn record_read_watchers_removed(&self, core_id: CoreId, count: usize) {
925        self.per_core_live_read_waiters[usize::from(core_id.0)]
926            .fetch_sub_relaxed(u64::try_from(count).expect("watcher count fits u64"));
927    }
928
929    pub(crate) fn record_live_read_backpressure(&self, core_id: CoreId) {
930        self.per_core_live_read_backpressure_events[usize::from(core_id.0)].fetch_add_relaxed(1);
931    }
932}
933
/// Returns the nanoseconds elapsed since `started_at`.
///
/// Saturates at `u64::MAX` (roughly 584 years) instead of truncating when the
/// `u128` nanosecond count does not fit in a `u64`.
pub(crate) fn elapsed_ns(started_at: Instant) -> u64 {
    let nanos: u128 = started_at.elapsed().as_nanos();
    match u64::try_from(nanos) {
        Ok(fits) => fits,
        Err(_) => u64::MAX,
    }
}
937
938pub(crate) fn append_batch_payload_bytes(request: &AppendBatchRequest) -> u64 {
939    request
940        .payloads
941        .iter()
942        .map(|payload| u64::try_from(payload.len()).expect("payload len fits u64"))
943        .sum()
944}
945
946pub(crate) fn record_cold_backpressure_error(
947    metrics: &RuntimeMetricsInner,
948    placement: ShardPlacement,
949    incoming_bytes: u64,
950    admission: ColdWriteAdmission,
951    err: &GroupEngineError,
952) {
953    if !err.message().contains("ColdBackpressure") {
954        return;
955    }
956    metrics.record_cold_backpressure(
957        placement.core_id,
958        placement.raft_group_id,
959        incoming_bytes,
960        admission.max_hot_bytes_per_group.unwrap_or(0),
961    );
962}
963
964pub(crate) fn is_stale_cold_flush_candidate_error(err: &RuntimeError) -> bool {
965    let RuntimeError::GroupEngine { message, .. } = err else {
966        return false;
967    };
968    message.contains("StreamGone")
969        || message.contains("StreamNotFound")
970        || (message.contains("InvalidColdFlush")
971            && (message.contains("beyond stream")
972                || message.contains("does not match the start of a hot payload segment")
973                || message.contains("does not cover contiguous hot payload segments")
974                || message.contains("exceeds stream")
975                || message.contains("non-contiguous hot payload metadata")))
976}
977
978pub(crate) async fn record_cold_hot_backlog(
979    group: &mut Box<dyn GroupEngine>,
980    metrics: &RuntimeMetricsInner,
981    stream_id: BucketStreamId,
982    placement: ShardPlacement,
983) {
984    if let Ok(backlog) = group.cold_hot_backlog(stream_id, placement).await {
985        metrics.record_cold_hot_backlog(
986            placement.raft_group_id,
987            backlog.stream_hot_bytes,
988            backlog.group_hot_bytes,
989        );
990    }
991}
992
/// A `u64` counter/gauge backed by an [`AtomicU64`] and over-aligned to 128 bytes.
///
/// Because of the alignment, adjacent slots in the per-core / per-group
/// vectors never share a 128-byte region. This is presumably to avoid false
/// sharing between cores updating neighbouring slots.
///
/// Every accessor uses [`Ordering::Relaxed`]. Each counter is updated
/// atomically, but no ordering is implied relative to other memory operations.
/// Add and subtract wrap on overflow/underflow, as `AtomicU64` does.
#[derive(Debug)]
#[repr(align(128))]
pub(crate) struct PaddedAtomicU64 {
    value: AtomicU64,
}

impl PaddedAtomicU64 {
    /// Memory ordering shared by every accessor below.
    const ORDER: Ordering = Ordering::Relaxed;

    /// Creates a counter initialised to `value`.
    pub(crate) fn new(value: u64) -> Self {
        let value = AtomicU64::from(value);
        Self { value }
    }

    /// Reads the current value.
    pub(crate) fn load_relaxed(&self) -> u64 {
        self.value.load(Self::ORDER)
    }

    /// Adds `value` (wrapping on overflow) and returns the previous value.
    pub(crate) fn fetch_add_relaxed(&self, value: u64) -> u64 {
        self.value.fetch_add(value, Self::ORDER)
    }

    /// Subtracts `value` (wrapping on underflow); the previous value is discarded.
    pub(crate) fn fetch_sub_relaxed(&self, value: u64) {
        let _previous = self.value.fetch_sub(value, Self::ORDER);
    }

    /// Raises the stored value to `value` if `value` is larger.
    pub(crate) fn fetch_max_relaxed(&self, value: u64) {
        let _previous = self.value.fetch_max(value, Self::ORDER);
    }

    /// Overwrites the stored value with `value`.
    pub(crate) fn store_relaxed(&self, value: u64) {
        self.value.store(value, Self::ORDER);
    }
}