1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::Instant;
4
5use ursula_shard::{BucketStreamId, CoreId, RaftGroupId, ShardPlacement};
6
7use crate::engine::{GroupEngine, GroupEngineError};
8use crate::error::RuntimeError;
9use crate::request::{AppendBatchRequest, ColdWriteAdmission};
10
/// Upper bound on a group actor's write batch size.
///
/// NOTE(review): the consumer is not in this section; presumably this caps how
/// many queued writes are drained into one batch — confirm at the call site.
pub(crate) const GROUP_ACTOR_MAX_WRITE_BATCH: usize = 64;
/// Upper bound on the number of chunks in one cold-flush group batch.
///
/// NOTE(review): the consumer is not in this section — confirm the unit
/// ("chunks") at the call site.
pub(crate) const COLD_FLUSH_GROUP_BATCH_MAX_CHUNKS: usize = 64;
13
/// Clonable handle to the runtime's metrics counters.
///
/// Cloning copies only the inner `Arc`, so every clone shares the same
/// counters. Call [`RuntimeMetrics::snapshot`] to read their current values.
#[derive(Debug, Clone)]
pub struct RuntimeMetrics {
    /// Shared counter storage; crate code records into it through the
    /// `RuntimeMetricsInner::record_*` methods.
    pub(crate) inner: Arc<RuntimeMetricsInner>,
}
18
19impl RuntimeMetrics {
20 pub fn snapshot(&self) -> RuntimeMetricsSnapshot {
21 let per_core_appends = self
22 .inner
23 .per_core_appends
24 .iter()
25 .map(PaddedAtomicU64::load_relaxed)
26 .collect::<Vec<_>>();
27 let accepted_appends = per_core_appends.iter().sum();
28 let per_group_appends = self
29 .inner
30 .per_group_appends
31 .iter()
32 .map(PaddedAtomicU64::load_relaxed)
33 .collect();
34 let per_core_applied_mutations = self
35 .inner
36 .per_core_applied_mutations
37 .iter()
38 .map(PaddedAtomicU64::load_relaxed)
39 .collect::<Vec<_>>();
40 let applied_mutations = per_core_applied_mutations.iter().sum();
41 let per_group_applied_mutations = self
42 .inner
43 .per_group_applied_mutations
44 .iter()
45 .map(PaddedAtomicU64::load_relaxed)
46 .collect();
47 let per_core_mutation_apply_ns = self
48 .inner
49 .per_core_mutation_apply_ns
50 .iter()
51 .map(PaddedAtomicU64::load_relaxed)
52 .collect::<Vec<_>>();
53 let mutation_apply_ns = per_core_mutation_apply_ns.iter().sum();
54 let per_group_mutation_apply_ns = self
55 .inner
56 .per_group_mutation_apply_ns
57 .iter()
58 .map(PaddedAtomicU64::load_relaxed)
59 .collect();
60 let per_core_group_lock_wait_ns = self
61 .inner
62 .per_core_group_lock_wait_ns
63 .iter()
64 .map(PaddedAtomicU64::load_relaxed)
65 .collect::<Vec<_>>();
66 let group_lock_wait_ns = per_core_group_lock_wait_ns.iter().sum();
67 let per_group_group_lock_wait_ns = self
68 .inner
69 .per_group_group_lock_wait_ns
70 .iter()
71 .map(PaddedAtomicU64::load_relaxed)
72 .collect();
73 let per_core_group_engine_exec_ns = self
74 .inner
75 .per_core_group_engine_exec_ns
76 .iter()
77 .map(PaddedAtomicU64::load_relaxed)
78 .collect::<Vec<_>>();
79 let group_engine_exec_ns = per_core_group_engine_exec_ns.iter().sum();
80 let per_group_group_engine_exec_ns = self
81 .inner
82 .per_group_group_engine_exec_ns
83 .iter()
84 .map(PaddedAtomicU64::load_relaxed)
85 .collect();
86 let per_group_group_mailbox_depth = self
87 .inner
88 .per_group_group_mailbox_depth
89 .iter()
90 .map(PaddedAtomicU64::load_relaxed)
91 .collect::<Vec<_>>();
92 let group_mailbox_depth = per_group_group_mailbox_depth.iter().sum();
93 let per_group_group_mailbox_max_depth = self
94 .inner
95 .per_group_group_mailbox_max_depth
96 .iter()
97 .map(PaddedAtomicU64::load_relaxed)
98 .collect::<Vec<_>>();
99 let group_mailbox_max_depth = per_group_group_mailbox_max_depth
100 .iter()
101 .copied()
102 .max()
103 .unwrap_or(0);
104 let per_group_group_mailbox_full_events = self
105 .inner
106 .per_group_group_mailbox_full_events
107 .iter()
108 .map(PaddedAtomicU64::load_relaxed)
109 .collect::<Vec<_>>();
110 let group_mailbox_full_events = per_group_group_mailbox_full_events.iter().sum();
111 let per_core_raft_write_many_batches = self
112 .inner
113 .per_core_raft_write_many_batches
114 .iter()
115 .map(PaddedAtomicU64::load_relaxed)
116 .collect::<Vec<_>>();
117 let raft_write_many_batches = per_core_raft_write_many_batches.iter().sum();
118 let per_group_raft_write_many_batches = self
119 .inner
120 .per_group_raft_write_many_batches
121 .iter()
122 .map(PaddedAtomicU64::load_relaxed)
123 .collect();
124 let per_core_raft_write_many_commands = self
125 .inner
126 .per_core_raft_write_many_commands
127 .iter()
128 .map(PaddedAtomicU64::load_relaxed)
129 .collect::<Vec<_>>();
130 let raft_write_many_commands = per_core_raft_write_many_commands.iter().sum();
131 let per_group_raft_write_many_commands = self
132 .inner
133 .per_group_raft_write_many_commands
134 .iter()
135 .map(PaddedAtomicU64::load_relaxed)
136 .collect();
137 let per_core_raft_write_many_logical_commands = self
138 .inner
139 .per_core_raft_write_many_logical_commands
140 .iter()
141 .map(PaddedAtomicU64::load_relaxed)
142 .collect::<Vec<_>>();
143 let raft_write_many_logical_commands =
144 per_core_raft_write_many_logical_commands.iter().sum();
145 let per_group_raft_write_many_logical_commands = self
146 .inner
147 .per_group_raft_write_many_logical_commands
148 .iter()
149 .map(PaddedAtomicU64::load_relaxed)
150 .collect();
151 let per_core_raft_write_many_responses = self
152 .inner
153 .per_core_raft_write_many_responses
154 .iter()
155 .map(PaddedAtomicU64::load_relaxed)
156 .collect::<Vec<_>>();
157 let raft_write_many_responses = per_core_raft_write_many_responses.iter().sum();
158 let per_group_raft_write_many_responses = self
159 .inner
160 .per_group_raft_write_many_responses
161 .iter()
162 .map(PaddedAtomicU64::load_relaxed)
163 .collect();
164 let per_core_raft_write_many_submit_ns = self
165 .inner
166 .per_core_raft_write_many_submit_ns
167 .iter()
168 .map(PaddedAtomicU64::load_relaxed)
169 .collect::<Vec<_>>();
170 let raft_write_many_submit_ns = per_core_raft_write_many_submit_ns.iter().sum();
171 let per_group_raft_write_many_submit_ns = self
172 .inner
173 .per_group_raft_write_many_submit_ns
174 .iter()
175 .map(PaddedAtomicU64::load_relaxed)
176 .collect();
177 let per_core_raft_write_many_response_ns = self
178 .inner
179 .per_core_raft_write_many_response_ns
180 .iter()
181 .map(PaddedAtomicU64::load_relaxed)
182 .collect::<Vec<_>>();
183 let raft_write_many_response_ns = per_core_raft_write_many_response_ns.iter().sum();
184 let per_group_raft_write_many_response_ns = self
185 .inner
186 .per_group_raft_write_many_response_ns
187 .iter()
188 .map(PaddedAtomicU64::load_relaxed)
189 .collect();
190 let per_core_raft_apply_entries = self
191 .inner
192 .per_core_raft_apply_entries
193 .iter()
194 .map(PaddedAtomicU64::load_relaxed)
195 .collect::<Vec<_>>();
196 let raft_apply_entries = per_core_raft_apply_entries.iter().sum();
197 let per_group_raft_apply_entries = self
198 .inner
199 .per_group_raft_apply_entries
200 .iter()
201 .map(PaddedAtomicU64::load_relaxed)
202 .collect();
203 let per_core_raft_apply_ns = self
204 .inner
205 .per_core_raft_apply_ns
206 .iter()
207 .map(PaddedAtomicU64::load_relaxed)
208 .collect::<Vec<_>>();
209 let raft_apply_ns = per_core_raft_apply_ns.iter().sum();
210 let per_group_raft_apply_ns = self
211 .inner
212 .per_group_raft_apply_ns
213 .iter()
214 .map(PaddedAtomicU64::load_relaxed)
215 .collect();
216 let per_core_live_read_waiters = self
217 .inner
218 .per_core_live_read_waiters
219 .iter()
220 .map(PaddedAtomicU64::load_relaxed)
221 .collect::<Vec<_>>();
222 let live_read_waiters = per_core_live_read_waiters.iter().sum();
223 let per_core_live_read_backpressure_events = self
224 .inner
225 .per_core_live_read_backpressure_events
226 .iter()
227 .map(PaddedAtomicU64::load_relaxed)
228 .collect::<Vec<_>>();
229 let live_read_backpressure_events = per_core_live_read_backpressure_events.iter().sum();
230 let per_core_routed_requests = self
231 .inner
232 .per_core_routed_requests
233 .iter()
234 .map(PaddedAtomicU64::load_relaxed)
235 .collect::<Vec<_>>();
236 let routed_requests = per_core_routed_requests.iter().sum();
237 let per_core_mailbox_send_wait_ns = self
238 .inner
239 .per_core_mailbox_send_wait_ns
240 .iter()
241 .map(PaddedAtomicU64::load_relaxed)
242 .collect::<Vec<_>>();
243 let mailbox_send_wait_ns = per_core_mailbox_send_wait_ns.iter().sum();
244 let per_core_mailbox_full_events = self
245 .inner
246 .per_core_mailbox_full_events
247 .iter()
248 .map(PaddedAtomicU64::load_relaxed)
249 .collect::<Vec<_>>();
250 let mailbox_full_events = per_core_mailbox_full_events.iter().sum();
251 let per_core_wal_batches = self
252 .inner
253 .per_core_wal_batches
254 .iter()
255 .map(PaddedAtomicU64::load_relaxed)
256 .collect::<Vec<_>>();
257 let wal_batches = per_core_wal_batches.iter().sum();
258 let per_group_wal_batches = self
259 .inner
260 .per_group_wal_batches
261 .iter()
262 .map(PaddedAtomicU64::load_relaxed)
263 .collect();
264 let per_core_wal_records = self
265 .inner
266 .per_core_wal_records
267 .iter()
268 .map(PaddedAtomicU64::load_relaxed)
269 .collect::<Vec<_>>();
270 let wal_records = per_core_wal_records.iter().sum();
271 let per_group_wal_records = self
272 .inner
273 .per_group_wal_records
274 .iter()
275 .map(PaddedAtomicU64::load_relaxed)
276 .collect();
277 let per_core_wal_write_ns = self
278 .inner
279 .per_core_wal_write_ns
280 .iter()
281 .map(PaddedAtomicU64::load_relaxed)
282 .collect::<Vec<_>>();
283 let wal_write_ns = per_core_wal_write_ns.iter().sum();
284 let per_group_wal_write_ns = self
285 .inner
286 .per_group_wal_write_ns
287 .iter()
288 .map(PaddedAtomicU64::load_relaxed)
289 .collect();
290 let per_core_wal_sync_ns = self
291 .inner
292 .per_core_wal_sync_ns
293 .iter()
294 .map(PaddedAtomicU64::load_relaxed)
295 .collect::<Vec<_>>();
296 let wal_sync_ns = per_core_wal_sync_ns.iter().sum();
297 let per_group_wal_sync_ns = self
298 .inner
299 .per_group_wal_sync_ns
300 .iter()
301 .map(PaddedAtomicU64::load_relaxed)
302 .collect();
303 let cold_flush_uploads = self.inner.cold_flush_uploads.load_relaxed();
304 let cold_flush_upload_bytes = self.inner.cold_flush_upload_bytes.load_relaxed();
305 let cold_flush_upload_ns = self.inner.cold_flush_upload_ns.load_relaxed();
306 let cold_flush_publishes = self.inner.cold_flush_publishes.load_relaxed();
307 let cold_flush_publish_bytes = self.inner.cold_flush_publish_bytes.load_relaxed();
308 let cold_flush_publish_ns = self.inner.cold_flush_publish_ns.load_relaxed();
309 let cold_orphan_cleanup_attempts = self.inner.cold_orphan_cleanup_attempts.load_relaxed();
310 let cold_orphan_cleanup_errors = self.inner.cold_orphan_cleanup_errors.load_relaxed();
311 let cold_orphan_bytes = self.inner.cold_orphan_bytes.load_relaxed();
312 let per_group_cold_hot_bytes = self
313 .inner
314 .per_group_cold_hot_bytes
315 .iter()
316 .map(PaddedAtomicU64::load_relaxed)
317 .collect::<Vec<_>>();
318 let cold_hot_bytes = per_group_cold_hot_bytes.iter().sum();
319 let per_group_cold_hot_bytes_max = self
320 .inner
321 .per_group_cold_hot_bytes_max
322 .iter()
323 .map(PaddedAtomicU64::load_relaxed)
324 .collect::<Vec<_>>();
325 let cold_hot_group_bytes_max = per_group_cold_hot_bytes_max
326 .iter()
327 .copied()
328 .max()
329 .unwrap_or(0);
330 let cold_hot_stream_bytes_max = self.inner.cold_hot_stream_bytes_max.load_relaxed();
331 let per_core_cold_backpressure_events = self
332 .inner
333 .per_core_cold_backpressure_events
334 .iter()
335 .map(PaddedAtomicU64::load_relaxed)
336 .collect::<Vec<_>>();
337 let cold_backpressure_events = per_core_cold_backpressure_events.iter().sum();
338 let per_group_cold_backpressure_events = self
339 .inner
340 .per_group_cold_backpressure_events
341 .iter()
342 .map(PaddedAtomicU64::load_relaxed)
343 .collect();
344 let cold_backpressure_bytes = self.inner.cold_backpressure_bytes.load_relaxed();
345
346 RuntimeMetricsSnapshot {
347 accepted_appends,
348 per_core_appends,
349 per_group_appends,
350 applied_mutations,
351 per_core_applied_mutations,
352 per_group_applied_mutations,
353 mutation_apply_ns,
354 per_core_mutation_apply_ns,
355 per_group_mutation_apply_ns,
356 group_lock_wait_ns,
357 per_core_group_lock_wait_ns,
358 per_group_group_lock_wait_ns,
359 group_engine_exec_ns,
360 per_core_group_engine_exec_ns,
361 per_group_group_engine_exec_ns,
362 group_mailbox_depth,
363 per_group_group_mailbox_depth,
364 group_mailbox_max_depth,
365 per_group_group_mailbox_max_depth,
366 group_mailbox_full_events,
367 per_group_group_mailbox_full_events,
368 raft_write_many_batches,
369 per_core_raft_write_many_batches,
370 per_group_raft_write_many_batches,
371 raft_write_many_commands,
372 per_core_raft_write_many_commands,
373 per_group_raft_write_many_commands,
374 raft_write_many_logical_commands,
375 per_core_raft_write_many_logical_commands,
376 per_group_raft_write_many_logical_commands,
377 raft_write_many_responses,
378 per_core_raft_write_many_responses,
379 per_group_raft_write_many_responses,
380 raft_write_many_submit_ns,
381 per_core_raft_write_many_submit_ns,
382 per_group_raft_write_many_submit_ns,
383 raft_write_many_response_ns,
384 per_core_raft_write_many_response_ns,
385 per_group_raft_write_many_response_ns,
386 raft_apply_entries,
387 per_core_raft_apply_entries,
388 per_group_raft_apply_entries,
389 raft_apply_ns,
390 per_core_raft_apply_ns,
391 per_group_raft_apply_ns,
392 live_read_waiters,
393 per_core_live_read_waiters,
394 live_read_backpressure_events,
395 per_core_live_read_backpressure_events,
396 routed_requests,
397 per_core_routed_requests,
398 mailbox_send_wait_ns,
399 per_core_mailbox_send_wait_ns,
400 mailbox_full_events,
401 per_core_mailbox_full_events,
402 wal_batches,
403 per_core_wal_batches,
404 per_group_wal_batches,
405 wal_records,
406 per_core_wal_records,
407 per_group_wal_records,
408 wal_write_ns,
409 per_core_wal_write_ns,
410 per_group_wal_write_ns,
411 wal_sync_ns,
412 per_core_wal_sync_ns,
413 per_group_wal_sync_ns,
414 cold_flush_uploads,
415 cold_flush_upload_bytes,
416 cold_flush_upload_ns,
417 cold_flush_publishes,
418 cold_flush_publish_bytes,
419 cold_flush_publish_ns,
420 cold_orphan_cleanup_attempts,
421 cold_orphan_cleanup_errors,
422 cold_orphan_bytes,
423 cold_hot_bytes,
424 per_group_cold_hot_bytes,
425 cold_hot_group_bytes_max,
426 per_group_cold_hot_bytes_max,
427 cold_hot_stream_bytes_max,
428 cold_backpressure_events,
429 per_core_cold_backpressure_events,
430 per_group_cold_backpressure_events,
431 cold_backpressure_bytes,
432 }
433 }
434}
435
/// Plain-value copy of the runtime counters, produced by `RuntimeMetrics::snapshot`.
///
/// Conventions:
/// - `per_core_*` vectors are indexed by `CoreId.0`; `per_group_*` vectors are
///   indexed by `RaftGroupId.0` (this is how the `record_*` methods index them).
/// - An unprefixed count or duration is the total of the vector it sits next
///   to. That is the `per_core_*` vector, except for `group_mailbox_depth`,
///   `group_mailbox_full_events` and `cold_hot_bytes`, which total their
///   `per_group_*` vector.
/// - `group_mailbox_max_depth` and `cold_hot_group_bytes_max` are the maxima of
///   their per-group vectors (0 when there are no groups).
/// - `_ns` fields accumulate the durations callers pass to the `record_*`
///   methods; nanoseconds per the suffix, which the code does not enforce.
/// - Counters are read one by one, so fields are not an atomic cut.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RuntimeMetricsSnapshot {
    // Accepted appends (`record_append`, `record_append_batch`).
    pub accepted_appends: u64,
    pub per_core_appends: Vec<u64>,
    pub per_group_appends: Vec<u64>,
    // Applied mutations and the time spent applying them
    // (`record_applied_mutation`, `record_applied_mutation_batch`).
    pub applied_mutations: u64,
    pub per_core_applied_mutations: Vec<u64>,
    pub per_group_applied_mutations: Vec<u64>,
    pub mutation_apply_ns: u64,
    pub per_core_mutation_apply_ns: Vec<u64>,
    pub per_group_mutation_apply_ns: Vec<u64>,
    // Group lock wait time.
    pub group_lock_wait_ns: u64,
    pub per_core_group_lock_wait_ns: Vec<u64>,
    pub per_group_group_lock_wait_ns: Vec<u64>,
    // Group engine execution time (`record_group_engine_exec`).
    pub group_engine_exec_ns: u64,
    pub per_core_group_engine_exec_ns: Vec<u64>,
    pub per_group_group_engine_exec_ns: Vec<u64>,
    // Group mailbox gauge, high-water mark and full events
    // (`record_group_mailbox_enqueued` / `_dequeued` / `_full`).
    pub group_mailbox_depth: u64,
    pub per_group_group_mailbox_depth: Vec<u64>,
    pub group_mailbox_max_depth: u64,
    pub per_group_group_mailbox_max_depth: Vec<u64>,
    pub group_mailbox_full_events: u64,
    pub per_group_group_mailbox_full_events: Vec<u64>,
    // Raft `write_many` calls (`record_raft_write_many`): each call adds one
    // batch plus the sample's command, logical-command and response counts
    // and its submit/response durations.
    pub raft_write_many_batches: u64,
    pub per_core_raft_write_many_batches: Vec<u64>,
    pub per_group_raft_write_many_batches: Vec<u64>,
    pub raft_write_many_commands: u64,
    pub per_core_raft_write_many_commands: Vec<u64>,
    pub per_group_raft_write_many_commands: Vec<u64>,
    pub raft_write_many_logical_commands: u64,
    pub per_core_raft_write_many_logical_commands: Vec<u64>,
    pub per_group_raft_write_many_logical_commands: Vec<u64>,
    pub raft_write_many_responses: u64,
    pub per_core_raft_write_many_responses: Vec<u64>,
    pub per_group_raft_write_many_responses: Vec<u64>,
    pub raft_write_many_submit_ns: u64,
    pub per_core_raft_write_many_submit_ns: Vec<u64>,
    pub per_group_raft_write_many_submit_ns: Vec<u64>,
    pub raft_write_many_response_ns: u64,
    pub per_core_raft_write_many_response_ns: Vec<u64>,
    pub per_group_raft_write_many_response_ns: Vec<u64>,
    // Raft apply entries and time (`record_raft_apply_batch`).
    pub raft_apply_entries: u64,
    pub per_core_raft_apply_entries: Vec<u64>,
    pub per_group_raft_apply_entries: Vec<u64>,
    pub raft_apply_ns: u64,
    pub per_core_raft_apply_ns: Vec<u64>,
    pub per_group_raft_apply_ns: Vec<u64>,
    // Live-read waiters and live-read backpressure events (per core only).
    pub live_read_waiters: u64,
    pub per_core_live_read_waiters: Vec<u64>,
    pub live_read_backpressure_events: u64,
    pub per_core_live_read_backpressure_events: Vec<u64>,
    // Requests routed to core mailboxes (`record_routed_request`) and
    // core-mailbox full events (`record_mailbox_full`).
    pub routed_requests: u64,
    pub per_core_routed_requests: Vec<u64>,
    pub mailbox_send_wait_ns: u64,
    pub per_core_mailbox_send_wait_ns: Vec<u64>,
    pub mailbox_full_events: u64,
    pub per_core_mailbox_full_events: Vec<u64>,
    // WAL activity (`record_wal_batch`): each call adds one batch plus its
    // record count and write/sync durations.
    pub wal_batches: u64,
    pub per_core_wal_batches: Vec<u64>,
    pub per_group_wal_batches: Vec<u64>,
    pub wal_records: u64,
    pub per_core_wal_records: Vec<u64>,
    pub per_group_wal_records: Vec<u64>,
    pub wal_write_ns: u64,
    pub per_core_wal_write_ns: Vec<u64>,
    pub per_group_wal_write_ns: Vec<u64>,
    pub wal_sync_ns: u64,
    pub per_core_wal_sync_ns: Vec<u64>,
    pub per_group_wal_sync_ns: Vec<u64>,
    // Cold-flush uploads and publishes (`record_cold_upload`,
    // `record_cold_publish`); process-wide, not split by core or group.
    pub cold_flush_uploads: u64,
    pub cold_flush_upload_bytes: u64,
    pub cold_flush_upload_ns: u64,
    pub cold_flush_publishes: u64,
    pub cold_flush_publish_bytes: u64,
    pub cold_flush_publish_ns: u64,
    // Orphan cleanup (`record_cold_orphan_cleanup`): every call is an attempt;
    // `cold_orphan_cleanup_errors` and `cold_orphan_bytes` grow only when the
    // cleanup failed.
    pub cold_orphan_cleanup_attempts: u64,
    pub cold_orphan_cleanup_errors: u64,
    pub cold_orphan_bytes: u64,
    // Cold-tier hot backlog bytes and high-water marks.
    pub cold_hot_bytes: u64,
    pub per_group_cold_hot_bytes: Vec<u64>,
    pub cold_hot_group_bytes_max: u64,
    pub per_group_cold_hot_bytes_max: Vec<u64>,
    pub cold_hot_stream_bytes_max: u64,
    // Cold write backpressure.
    pub cold_backpressure_events: u64,
    pub per_core_cold_backpressure_events: Vec<u64>,
    pub per_group_cold_backpressure_events: Vec<u64>,
    pub cold_backpressure_bytes: u64,
}
524
/// Current depths and capacities of a set of runtime mailboxes.
///
/// NOTE(review): the producer of this type is not in this section; presumably
/// `depths[i]` and `capacities[i]` describe the same mailbox (e.g. one entry
/// per core) — confirm against the code that builds it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RuntimeMailboxSnapshot {
    /// Number of queued items per mailbox.
    pub depths: Vec<usize>,
    /// Configured capacity per mailbox.
    pub capacities: Vec<usize>,
}
530
/// Shared counter storage behind `RuntimeMetrics`.
///
/// Every counter is a `PaddedAtomicU64` (defined elsewhere; presumably padded
/// to its own cache line to avoid false sharing — TODO confirm). `new` sizes
/// each `per_core_*` vector to `core_count` and each `per_group_*` vector to
/// `raft_group_count`. The `record_*` methods index them with `CoreId.0` and
/// `RaftGroupId.0` respectively, so an out-of-range id panics at the index.
#[derive(Debug)]
pub(crate) struct RuntimeMetricsInner {
    // Accepted appends.
    pub(crate) per_core_appends: Vec<PaddedAtomicU64>,
    pub(crate) per_group_appends: Vec<PaddedAtomicU64>,
    // Applied mutations and their apply durations.
    pub(crate) per_core_applied_mutations: Vec<PaddedAtomicU64>,
    pub(crate) per_group_applied_mutations: Vec<PaddedAtomicU64>,
    pub(crate) per_core_mutation_apply_ns: Vec<PaddedAtomicU64>,
    pub(crate) per_group_mutation_apply_ns: Vec<PaddedAtomicU64>,
    // Group lock wait and group engine execution durations.
    pub(crate) per_core_group_lock_wait_ns: Vec<PaddedAtomicU64>,
    pub(crate) per_group_group_lock_wait_ns: Vec<PaddedAtomicU64>,
    pub(crate) per_core_group_engine_exec_ns: Vec<PaddedAtomicU64>,
    pub(crate) per_group_group_engine_exec_ns: Vec<PaddedAtomicU64>,
    // Group mailbox: depth gauge (incremented on enqueue, decremented on
    // dequeue), high-water mark, and full events.
    pub(crate) per_group_group_mailbox_depth: Vec<PaddedAtomicU64>,
    pub(crate) per_group_group_mailbox_max_depth: Vec<PaddedAtomicU64>,
    pub(crate) per_group_group_mailbox_full_events: Vec<PaddedAtomicU64>,
    // Raft `write_many` batches and the per-sample counts/durations.
    pub(crate) per_core_raft_write_many_batches: Vec<PaddedAtomicU64>,
    pub(crate) per_group_raft_write_many_batches: Vec<PaddedAtomicU64>,
    pub(crate) per_core_raft_write_many_commands: Vec<PaddedAtomicU64>,
    pub(crate) per_group_raft_write_many_commands: Vec<PaddedAtomicU64>,
    pub(crate) per_core_raft_write_many_logical_commands: Vec<PaddedAtomicU64>,
    pub(crate) per_group_raft_write_many_logical_commands: Vec<PaddedAtomicU64>,
    pub(crate) per_core_raft_write_many_responses: Vec<PaddedAtomicU64>,
    pub(crate) per_group_raft_write_many_responses: Vec<PaddedAtomicU64>,
    pub(crate) per_core_raft_write_many_submit_ns: Vec<PaddedAtomicU64>,
    pub(crate) per_group_raft_write_many_submit_ns: Vec<PaddedAtomicU64>,
    pub(crate) per_core_raft_write_many_response_ns: Vec<PaddedAtomicU64>,
    pub(crate) per_group_raft_write_many_response_ns: Vec<PaddedAtomicU64>,
    // Raft apply entries and durations.
    pub(crate) per_core_raft_apply_entries: Vec<PaddedAtomicU64>,
    pub(crate) per_group_raft_apply_entries: Vec<PaddedAtomicU64>,
    pub(crate) per_core_raft_apply_ns: Vec<PaddedAtomicU64>,
    pub(crate) per_group_raft_apply_ns: Vec<PaddedAtomicU64>,
    // Live reads (per core only).
    pub(crate) per_core_live_read_waiters: Vec<PaddedAtomicU64>,
    pub(crate) per_core_live_read_backpressure_events: Vec<PaddedAtomicU64>,
    // Request routing into core mailboxes (per core only).
    pub(crate) per_core_routed_requests: Vec<PaddedAtomicU64>,
    pub(crate) per_core_mailbox_send_wait_ns: Vec<PaddedAtomicU64>,
    pub(crate) per_core_mailbox_full_events: Vec<PaddedAtomicU64>,
    // WAL batches, records and write/sync durations.
    pub(crate) per_core_wal_batches: Vec<PaddedAtomicU64>,
    pub(crate) per_group_wal_batches: Vec<PaddedAtomicU64>,
    pub(crate) per_core_wal_records: Vec<PaddedAtomicU64>,
    pub(crate) per_group_wal_records: Vec<PaddedAtomicU64>,
    pub(crate) per_core_wal_write_ns: Vec<PaddedAtomicU64>,
    pub(crate) per_group_wal_write_ns: Vec<PaddedAtomicU64>,
    pub(crate) per_core_wal_sync_ns: Vec<PaddedAtomicU64>,
    pub(crate) per_group_wal_sync_ns: Vec<PaddedAtomicU64>,
    // Process-wide cold-flush upload/publish totals.
    pub(crate) cold_flush_uploads: PaddedAtomicU64,
    pub(crate) cold_flush_upload_bytes: PaddedAtomicU64,
    pub(crate) cold_flush_upload_ns: PaddedAtomicU64,
    pub(crate) cold_flush_publishes: PaddedAtomicU64,
    pub(crate) cold_flush_publish_bytes: PaddedAtomicU64,
    pub(crate) cold_flush_publish_ns: PaddedAtomicU64,
    // Orphan cleanup: errors and bytes only grow on failed cleanups.
    pub(crate) cold_orphan_cleanup_attempts: PaddedAtomicU64,
    pub(crate) cold_orphan_cleanup_errors: PaddedAtomicU64,
    pub(crate) cold_orphan_bytes: PaddedAtomicU64,
    // Cold-tier hot backlog and backpressure.
    pub(crate) per_group_cold_hot_bytes: Vec<PaddedAtomicU64>,
    pub(crate) per_group_cold_hot_bytes_max: Vec<PaddedAtomicU64>,
    pub(crate) cold_hot_stream_bytes_max: PaddedAtomicU64,
    pub(crate) per_core_cold_backpressure_events: Vec<PaddedAtomicU64>,
    pub(crate) per_group_cold_backpressure_events: Vec<PaddedAtomicU64>,
    pub(crate) cold_backpressure_bytes: PaddedAtomicU64,
}
591
/// Measurements for one Raft `write_many` call (per the name), consumed by
/// `RuntimeMetricsInner::record_raft_write_many`. That method counts the call
/// as one batch and adds each field to the matching per-core and per-group
/// counter.
#[derive(Debug, Clone, Copy)]
pub(crate) struct RaftWriteManySample {
    /// Added to the `*_raft_write_many_commands` counters.
    pub(crate) command_count: u64,
    /// Added to the `*_raft_write_many_logical_commands` counters.
    pub(crate) logical_command_count: u64,
    /// Added to the `*_raft_write_many_responses` counters.
    pub(crate) response_count: u64,
    /// Added to the `*_raft_write_many_submit_ns` counters.
    pub(crate) submit_ns: u64,
    /// Added to the `*_raft_write_many_response_ns` counters.
    pub(crate) response_ns: u64,
}
600
601impl RuntimeMetricsInner {
602 pub(crate) fn new(core_count: usize, raft_group_count: usize) -> Self {
603 Self {
604 per_core_appends: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
605 per_group_appends: (0..raft_group_count)
606 .map(|_| PaddedAtomicU64::new(0))
607 .collect(),
608 per_core_applied_mutations: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
609 per_group_applied_mutations: (0..raft_group_count)
610 .map(|_| PaddedAtomicU64::new(0))
611 .collect(),
612 per_core_mutation_apply_ns: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
613 per_group_mutation_apply_ns: (0..raft_group_count)
614 .map(|_| PaddedAtomicU64::new(0))
615 .collect(),
616 per_core_group_lock_wait_ns: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
617 per_group_group_lock_wait_ns: (0..raft_group_count)
618 .map(|_| PaddedAtomicU64::new(0))
619 .collect(),
620 per_core_group_engine_exec_ns: (0..core_count)
621 .map(|_| PaddedAtomicU64::new(0))
622 .collect(),
623 per_group_group_engine_exec_ns: (0..raft_group_count)
624 .map(|_| PaddedAtomicU64::new(0))
625 .collect(),
626 per_group_group_mailbox_depth: (0..raft_group_count)
627 .map(|_| PaddedAtomicU64::new(0))
628 .collect(),
629 per_group_group_mailbox_max_depth: (0..raft_group_count)
630 .map(|_| PaddedAtomicU64::new(0))
631 .collect(),
632 per_group_group_mailbox_full_events: (0..raft_group_count)
633 .map(|_| PaddedAtomicU64::new(0))
634 .collect(),
635 per_core_raft_write_many_batches: (0..core_count)
636 .map(|_| PaddedAtomicU64::new(0))
637 .collect(),
638 per_group_raft_write_many_batches: (0..raft_group_count)
639 .map(|_| PaddedAtomicU64::new(0))
640 .collect(),
641 per_core_raft_write_many_commands: (0..core_count)
642 .map(|_| PaddedAtomicU64::new(0))
643 .collect(),
644 per_group_raft_write_many_commands: (0..raft_group_count)
645 .map(|_| PaddedAtomicU64::new(0))
646 .collect(),
647 per_core_raft_write_many_logical_commands: (0..core_count)
648 .map(|_| PaddedAtomicU64::new(0))
649 .collect(),
650 per_group_raft_write_many_logical_commands: (0..raft_group_count)
651 .map(|_| PaddedAtomicU64::new(0))
652 .collect(),
653 per_core_raft_write_many_responses: (0..core_count)
654 .map(|_| PaddedAtomicU64::new(0))
655 .collect(),
656 per_group_raft_write_many_responses: (0..raft_group_count)
657 .map(|_| PaddedAtomicU64::new(0))
658 .collect(),
659 per_core_raft_write_many_submit_ns: (0..core_count)
660 .map(|_| PaddedAtomicU64::new(0))
661 .collect(),
662 per_group_raft_write_many_submit_ns: (0..raft_group_count)
663 .map(|_| PaddedAtomicU64::new(0))
664 .collect(),
665 per_core_raft_write_many_response_ns: (0..core_count)
666 .map(|_| PaddedAtomicU64::new(0))
667 .collect(),
668 per_group_raft_write_many_response_ns: (0..raft_group_count)
669 .map(|_| PaddedAtomicU64::new(0))
670 .collect(),
671 per_core_raft_apply_entries: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
672 per_group_raft_apply_entries: (0..raft_group_count)
673 .map(|_| PaddedAtomicU64::new(0))
674 .collect(),
675 per_core_raft_apply_ns: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
676 per_group_raft_apply_ns: (0..raft_group_count)
677 .map(|_| PaddedAtomicU64::new(0))
678 .collect(),
679 per_core_live_read_waiters: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
680 per_core_live_read_backpressure_events: (0..core_count)
681 .map(|_| PaddedAtomicU64::new(0))
682 .collect(),
683 per_core_routed_requests: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
684 per_core_mailbox_send_wait_ns: (0..core_count)
685 .map(|_| PaddedAtomicU64::new(0))
686 .collect(),
687 per_core_mailbox_full_events: (0..core_count)
688 .map(|_| PaddedAtomicU64::new(0))
689 .collect(),
690 per_core_wal_batches: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
691 per_group_wal_batches: (0..raft_group_count)
692 .map(|_| PaddedAtomicU64::new(0))
693 .collect(),
694 per_core_wal_records: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
695 per_group_wal_records: (0..raft_group_count)
696 .map(|_| PaddedAtomicU64::new(0))
697 .collect(),
698 per_core_wal_write_ns: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
699 per_group_wal_write_ns: (0..raft_group_count)
700 .map(|_| PaddedAtomicU64::new(0))
701 .collect(),
702 per_core_wal_sync_ns: (0..core_count).map(|_| PaddedAtomicU64::new(0)).collect(),
703 per_group_wal_sync_ns: (0..raft_group_count)
704 .map(|_| PaddedAtomicU64::new(0))
705 .collect(),
706 cold_flush_uploads: PaddedAtomicU64::new(0),
707 cold_flush_upload_bytes: PaddedAtomicU64::new(0),
708 cold_flush_upload_ns: PaddedAtomicU64::new(0),
709 cold_flush_publishes: PaddedAtomicU64::new(0),
710 cold_flush_publish_bytes: PaddedAtomicU64::new(0),
711 cold_flush_publish_ns: PaddedAtomicU64::new(0),
712 cold_orphan_cleanup_attempts: PaddedAtomicU64::new(0),
713 cold_orphan_cleanup_errors: PaddedAtomicU64::new(0),
714 cold_orphan_bytes: PaddedAtomicU64::new(0),
715 per_group_cold_hot_bytes: (0..raft_group_count)
716 .map(|_| PaddedAtomicU64::new(0))
717 .collect(),
718 per_group_cold_hot_bytes_max: (0..raft_group_count)
719 .map(|_| PaddedAtomicU64::new(0))
720 .collect(),
721 cold_hot_stream_bytes_max: PaddedAtomicU64::new(0),
722 per_core_cold_backpressure_events: (0..core_count)
723 .map(|_| PaddedAtomicU64::new(0))
724 .collect(),
725 per_group_cold_backpressure_events: (0..raft_group_count)
726 .map(|_| PaddedAtomicU64::new(0))
727 .collect(),
728 cold_backpressure_bytes: PaddedAtomicU64::new(0),
729 }
730 }
731
732 pub(crate) fn record_routed_request(&self, core_id: CoreId, mailbox_send_wait_ns: u64) {
733 let index = usize::from(core_id.0);
734 self.per_core_routed_requests[index].fetch_add_relaxed(1);
735 self.per_core_mailbox_send_wait_ns[index].fetch_add_relaxed(mailbox_send_wait_ns);
736 }
737
738 pub(crate) fn record_mailbox_full(&self, core_id: CoreId) {
739 self.per_core_mailbox_full_events[usize::from(core_id.0)].fetch_add_relaxed(1);
740 }
741
742 pub(crate) fn record_append(&self, core_id: CoreId, group_id: RaftGroupId) {
743 self.record_append_batch(core_id, group_id, 1);
744 }
745
746 pub(crate) fn record_append_batch(&self, core_id: CoreId, group_id: RaftGroupId, count: u64) {
747 self.per_core_appends[usize::from(core_id.0)].fetch_add_relaxed(count);
748 self.per_group_appends[usize::try_from(group_id.0).expect("u32 fits usize")]
749 .fetch_add_relaxed(count);
750 }
751
752 pub(crate) fn record_applied_mutation(
753 &self,
754 core_id: CoreId,
755 group_id: RaftGroupId,
756 apply_ns: u64,
757 ) {
758 self.record_applied_mutation_batch(core_id, group_id, 1, apply_ns);
759 }
760
761 pub(crate) fn record_applied_mutation_batch(
762 &self,
763 core_id: CoreId,
764 group_id: RaftGroupId,
765 count: u64,
766 apply_ns: u64,
767 ) {
768 let core_index = usize::from(core_id.0);
769 let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
770 self.per_core_applied_mutations[core_index].fetch_add_relaxed(count);
771 self.per_group_applied_mutations[group_index].fetch_add_relaxed(count);
772 self.per_core_mutation_apply_ns[core_index].fetch_add_relaxed(apply_ns);
773 self.per_group_mutation_apply_ns[group_index].fetch_add_relaxed(apply_ns);
774 }
775
776 pub(crate) fn record_group_engine_exec(
777 &self,
778 core_id: CoreId,
779 group_id: RaftGroupId,
780 exec_ns: u64,
781 ) {
782 let core_index = usize::from(core_id.0);
783 let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
784 self.per_core_group_engine_exec_ns[core_index].fetch_add_relaxed(exec_ns);
785 self.per_group_group_engine_exec_ns[group_index].fetch_add_relaxed(exec_ns);
786 }
787
788 pub(crate) fn record_group_mailbox_enqueued(&self, group_id: RaftGroupId) {
789 let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
790 let depth = self.per_group_group_mailbox_depth[group_index].fetch_add_relaxed(1) + 1;
791 self.per_group_group_mailbox_max_depth[group_index].fetch_max_relaxed(depth);
792 }
793
794 pub(crate) fn record_group_mailbox_dequeued(&self, group_id: RaftGroupId) {
795 let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
796 self.per_group_group_mailbox_depth[group_index].fetch_sub_relaxed(1);
797 }
798
799 pub(crate) fn record_group_mailbox_full(&self, group_id: RaftGroupId) {
800 let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
801 self.per_group_group_mailbox_full_events[group_index].fetch_add_relaxed(1);
802 }
803
804 pub(crate) fn record_raft_write_many(
805 &self,
806 core_id: CoreId,
807 group_id: RaftGroupId,
808 sample: RaftWriteManySample,
809 ) {
810 let core_index = usize::from(core_id.0);
811 let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
812 self.per_core_raft_write_many_batches[core_index].fetch_add_relaxed(1);
813 self.per_group_raft_write_many_batches[group_index].fetch_add_relaxed(1);
814 self.per_core_raft_write_many_commands[core_index].fetch_add_relaxed(sample.command_count);
815 self.per_group_raft_write_many_commands[group_index]
816 .fetch_add_relaxed(sample.command_count);
817 self.per_core_raft_write_many_logical_commands[core_index]
818 .fetch_add_relaxed(sample.logical_command_count);
819 self.per_group_raft_write_many_logical_commands[group_index]
820 .fetch_add_relaxed(sample.logical_command_count);
821 self.per_core_raft_write_many_responses[core_index]
822 .fetch_add_relaxed(sample.response_count);
823 self.per_group_raft_write_many_responses[group_index]
824 .fetch_add_relaxed(sample.response_count);
825 self.per_core_raft_write_many_submit_ns[core_index].fetch_add_relaxed(sample.submit_ns);
826 self.per_group_raft_write_many_submit_ns[group_index].fetch_add_relaxed(sample.submit_ns);
827 self.per_core_raft_write_many_response_ns[core_index].fetch_add_relaxed(sample.response_ns);
828 self.per_group_raft_write_many_response_ns[group_index]
829 .fetch_add_relaxed(sample.response_ns);
830 }
831
832 pub(crate) fn record_raft_apply_batch(
833 &self,
834 core_id: CoreId,
835 group_id: RaftGroupId,
836 entry_count: u64,
837 apply_ns: u64,
838 ) {
839 let core_index = usize::from(core_id.0);
840 let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
841 self.per_core_raft_apply_entries[core_index].fetch_add_relaxed(entry_count);
842 self.per_group_raft_apply_entries[group_index].fetch_add_relaxed(entry_count);
843 self.per_core_raft_apply_ns[core_index].fetch_add_relaxed(apply_ns);
844 self.per_group_raft_apply_ns[group_index].fetch_add_relaxed(apply_ns);
845 }
846
847 pub(crate) fn record_wal_batch(
848 &self,
849 core_id: CoreId,
850 group_id: RaftGroupId,
851 record_count: u64,
852 write_ns: u64,
853 sync_ns: u64,
854 ) {
855 let core_index = usize::from(core_id.0);
856 let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
857 self.per_core_wal_batches[core_index].fetch_add_relaxed(1);
858 self.per_group_wal_batches[group_index].fetch_add_relaxed(1);
859 self.per_core_wal_records[core_index].fetch_add_relaxed(record_count);
860 self.per_group_wal_records[group_index].fetch_add_relaxed(record_count);
861 self.per_core_wal_write_ns[core_index].fetch_add_relaxed(write_ns);
862 self.per_group_wal_write_ns[group_index].fetch_add_relaxed(write_ns);
863 self.per_core_wal_sync_ns[core_index].fetch_add_relaxed(sync_ns);
864 self.per_group_wal_sync_ns[group_index].fetch_add_relaxed(sync_ns);
865 }
866
867 pub(crate) fn record_cold_upload(&self, bytes: u64, upload_ns: u64) {
868 self.cold_flush_uploads.fetch_add_relaxed(1);
869 self.cold_flush_upload_bytes.fetch_add_relaxed(bytes);
870 self.cold_flush_upload_ns.fetch_add_relaxed(upload_ns);
871 }
872
873 pub(crate) fn record_cold_publish(&self, bytes: u64, publish_ns: u64) {
874 self.cold_flush_publishes.fetch_add_relaxed(1);
875 self.cold_flush_publish_bytes.fetch_add_relaxed(bytes);
876 self.cold_flush_publish_ns.fetch_add_relaxed(publish_ns);
877 }
878
879 pub(crate) fn record_cold_orphan_cleanup(&self, bytes: u64, cleanup_failed: bool) {
880 self.cold_orphan_cleanup_attempts.fetch_add_relaxed(1);
881 if cleanup_failed {
882 self.cold_orphan_cleanup_errors.fetch_add_relaxed(1);
883 self.cold_orphan_bytes.fetch_add_relaxed(bytes);
884 }
885 }
886
887 pub(crate) fn record_cold_hot_backlog(
888 &self,
889 group_id: RaftGroupId,
890 stream_hot_bytes: u64,
891 group_hot_bytes: u64,
892 ) {
893 let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
894 self.per_group_cold_hot_bytes[group_index].store_relaxed(group_hot_bytes);
895 self.per_group_cold_hot_bytes_max[group_index].fetch_max_relaxed(group_hot_bytes);
896 self.cold_hot_stream_bytes_max
897 .fetch_max_relaxed(stream_hot_bytes);
898 }
899
900 pub(crate) fn record_cold_backpressure(
901 &self,
902 core_id: CoreId,
903 group_id: RaftGroupId,
904 incoming_bytes: u64,
905 _limit: u64,
906 ) {
907 let core_index = usize::from(core_id.0);
908 let group_index = usize::try_from(group_id.0).expect("u32 fits usize");
909 self.per_core_cold_backpressure_events[core_index].fetch_add_relaxed(1);
910 self.per_group_cold_backpressure_events[group_index].fetch_add_relaxed(1);
911 self.cold_backpressure_bytes
912 .fetch_add_relaxed(incoming_bytes);
913 }
914
915 pub(crate) fn record_read_watcher_added(&self, core_id: CoreId) {
916 self.record_read_watchers_added(core_id, 1);
917 }
918
919 pub(crate) fn record_read_watchers_added(&self, core_id: CoreId, count: usize) {
920 self.per_core_live_read_waiters[usize::from(core_id.0)]
921 .fetch_add_relaxed(u64::try_from(count).expect("watcher count fits u64"));
922 }
923
924 pub(crate) fn record_read_watchers_removed(&self, core_id: CoreId, count: usize) {
925 self.per_core_live_read_waiters[usize::from(core_id.0)]
926 .fetch_sub_relaxed(u64::try_from(count).expect("watcher count fits u64"));
927 }
928
929 pub(crate) fn record_live_read_backpressure(&self, core_id: CoreId) {
930 self.per_core_live_read_backpressure_events[usize::from(core_id.0)].fetch_add_relaxed(1);
931 }
932}
933
/// Returns the nanoseconds elapsed since `started_at`.
///
/// `Duration::as_nanos` yields a `u128`. Values that do not fit in `u64` are
/// clamped to `u64::MAX` instead of truncated.
pub(crate) fn elapsed_ns(started_at: Instant) -> u64 {
    let nanos: u128 = started_at.elapsed().as_nanos();
    u64::try_from(nanos).unwrap_or(u64::MAX)
}
937
938pub(crate) fn append_batch_payload_bytes(request: &AppendBatchRequest) -> u64 {
939 request
940 .payloads
941 .iter()
942 .map(|payload| u64::try_from(payload.len()).expect("payload len fits u64"))
943 .sum()
944}
945
946pub(crate) fn record_cold_backpressure_error(
947 metrics: &RuntimeMetricsInner,
948 placement: ShardPlacement,
949 incoming_bytes: u64,
950 admission: ColdWriteAdmission,
951 err: &GroupEngineError,
952) {
953 if !err.message().contains("ColdBackpressure") {
954 return;
955 }
956 metrics.record_cold_backpressure(
957 placement.core_id,
958 placement.raft_group_id,
959 incoming_bytes,
960 admission.max_hot_bytes_per_group.unwrap_or(0),
961 );
962}
963
964pub(crate) fn is_stale_cold_flush_candidate_error(err: &RuntimeError) -> bool {
965 let RuntimeError::GroupEngine { message, .. } = err else {
966 return false;
967 };
968 message.contains("StreamGone")
969 || message.contains("StreamNotFound")
970 || (message.contains("InvalidColdFlush")
971 && (message.contains("beyond stream")
972 || message.contains("does not match the start of a hot payload segment")
973 || message.contains("does not cover contiguous hot payload segments")
974 || message.contains("exceeds stream")
975 || message.contains("non-contiguous hot payload metadata")))
976}
977
978pub(crate) async fn record_cold_hot_backlog(
979 group: &mut Box<dyn GroupEngine>,
980 metrics: &RuntimeMetricsInner,
981 stream_id: BucketStreamId,
982 placement: ShardPlacement,
983) {
984 if let Ok(backlog) = group.cold_hot_backlog(stream_id, placement).await {
985 metrics.record_cold_hot_backlog(
986 placement.raft_group_id,
987 backlog.stream_hot_bytes,
988 backlog.group_hot_bytes,
989 );
990 }
991}
992
/// A `u64` counter stored in an `AtomicU64` that is over-aligned to 128 bytes.
///
/// `#[repr(align(128))]` gives every instance its own 128-byte-aligned slot.
/// NOTE(review): presumably this keeps neighbouring per-core / per-group
/// counters off a shared cache line (false sharing); confirm.
///
/// Every accessor uses `Ordering::Relaxed`, so these values impose no ordering
/// on other memory operations.
#[derive(Debug)]
#[repr(align(128))]
pub(crate) struct PaddedAtomicU64 {
    value: AtomicU64,
}

impl PaddedAtomicU64 {
    /// Memory ordering shared by every accessor below.
    const ORDERING: Ordering = Ordering::Relaxed;

    /// Creates a counter initialised to `value`.
    pub(crate) fn new(value: u64) -> Self {
        let value = AtomicU64::new(value);
        Self { value }
    }

    /// Reads the current value.
    pub(crate) fn load_relaxed(&self) -> u64 {
        AtomicU64::load(&self.value, Self::ORDERING)
    }

    /// Adds `value`, wrapping on overflow, and returns the previous value.
    pub(crate) fn fetch_add_relaxed(&self, value: u64) -> u64 {
        AtomicU64::fetch_add(&self.value, value, Self::ORDERING)
    }

    /// Subtracts `value`, wrapping on underflow. The previous value is discarded.
    pub(crate) fn fetch_sub_relaxed(&self, value: u64) {
        AtomicU64::fetch_sub(&self.value, value, Self::ORDERING);
    }

    /// Raises the stored value to `value` if `value` is larger; never lowers it.
    pub(crate) fn fetch_max_relaxed(&self, value: u64) {
        AtomicU64::fetch_max(&self.value, value, Self::ORDERING);
    }

    /// Overwrites the stored value with `value`.
    pub(crate) fn store_relaxed(&self, value: u64) {
        AtomicU64::store(&self.value, value, Self::ORDERING);
    }
}