kvbm-logical 1.3.0-dev.1

Logical layer for KVBM (Key-Value Buffer Manager), managing block metadata, allocation, and eviction policies.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Raw atomic counters and gauges for a single block pool type.
//!
//! All increment/decrement methods use `Ordering::Relaxed` to keep overhead on the hot path
//! minimal; as a consequence no ordering is guaranteed between updates to different metrics.
//! The [`MetricsAggregator`] reads these atomics at scrape time and builds Prometheus protos.

use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};

/// Raw atomic metrics for a single block pool (one per `BlockManager<T>`).
///
/// Counters are monotonically increasing `AtomicU64`.
/// Gauges are bidirectional `AtomicI64`.
///
/// `Debug` is derived so the metrics can be printed with `{:?}` and so
/// that types owning a `BlockPoolMetrics` can derive `Debug` themselves.
#[derive(Debug)]
pub struct BlockPoolMetrics {
    /// Pool type label supplied to `new` (e.g. `"G1"`).
    type_label: String,

    // Counters (monotonic)
    /// Running total of the amounts passed to `inc_allocations`.
    allocations: AtomicU64,
    /// Running total of the amounts passed to `inc_allocations_from_reset`.
    allocations_from_reset: AtomicU64,
    /// Running total of the amounts passed to `inc_evictions`.
    evictions: AtomicU64,
    /// Number of `inc_registrations` calls.
    registrations: AtomicU64,
    /// Number of `inc_duplicate_blocks` calls.
    duplicate_blocks: AtomicU64,
    /// Number of `inc_registration_dedup` calls.
    registration_dedup: AtomicU64,
    /// Number of `inc_stagings` calls.
    stagings: AtomicU64,
    /// Running total of the amounts passed to `inc_match_hashes_requested`.
    match_hashes_requested: AtomicU64,
    /// Running total of the amounts passed to `inc_match_blocks_returned`.
    match_blocks_returned: AtomicU64,
    /// Running total of the amounts passed to `inc_scan_hashes_requested`.
    scan_hashes_requested: AtomicU64,
    /// Running total of the amounts passed to `inc_scan_blocks_returned`.
    scan_blocks_returned: AtomicU64,

    // Audit counters for normally-rare branches. These exist primarily
    // so tests can assert "this code path actually fired" rather than
    // inferring it from emergent behaviour, and so production
    // dashboards detect regressions if these spike.
    /// Lookup-driven `Primary → Inactive` transitions taken because the
    /// active-pool `Weak` was dead.
    eager_primary_to_inactive_total: AtomicU64,
    /// `allocate_atomic` rollbacks caused by an inactive backend returning
    /// fewer pairs than `len()` advertised.
    allocate_atomic_rollback_total: AtomicU64,
    /// `release_primary` calls that were no-ops because the slot was no
    /// longer `Primary` for the releasing Inner.
    release_primary_noop_total: AtomicU64,
    /// `release_duplicate` calls that were no-ops because the slot's
    /// `Duplicate` weak no longer matched the releasing Inner.
    release_duplicate_noop_total: AtomicU64,

    // Gauges (bidirectional)
    /// Moved up and down by the `inc_/dec_inflight_mutable*` methods.
    inflight_mutable: AtomicI64,
    /// Moved up and down by the `inc_/dec_inflight_immutable*` methods.
    inflight_immutable: AtomicI64,
    /// Set by `set_reset_pool_size`, adjusted by `inc_/dec_reset_pool_size*`.
    reset_pool_size: AtomicI64,
    /// Set by `set_inactive_pool_size`, adjusted by `inc_/dec_inactive_pool_size*`.
    inactive_pool_size: AtomicI64,
}

impl BlockPoolMetrics {
    /// Builds the metrics for one pool, tagged with `type_label` (e.g. `"G1"`).
    ///
    /// Every counter and gauge starts at zero.
    pub fn new(type_label: String) -> Self {
        Self {
            type_label,
            allocations: AtomicU64::default(),
            allocations_from_reset: AtomicU64::default(),
            evictions: AtomicU64::default(),
            registrations: AtomicU64::default(),
            duplicate_blocks: AtomicU64::default(),
            registration_dedup: AtomicU64::default(),
            stagings: AtomicU64::default(),
            match_hashes_requested: AtomicU64::default(),
            match_blocks_returned: AtomicU64::default(),
            scan_hashes_requested: AtomicU64::default(),
            scan_blocks_returned: AtomicU64::default(),
            eager_primary_to_inactive_total: AtomicU64::default(),
            allocate_atomic_rollback_total: AtomicU64::default(),
            release_primary_noop_total: AtomicU64::default(),
            release_duplicate_noop_total: AtomicU64::default(),
            inflight_mutable: AtomicI64::default(),
            inflight_immutable: AtomicI64::default(),
            reset_pool_size: AtomicI64::default(),
            inactive_pool_size: AtomicI64::default(),
        }
    }

    /// Returns the label this pool was created with (e.g. `"G1"`, `"G2"`).
    #[inline(always)]
    pub fn type_label(&self) -> &str {
        self.type_label.as_str()
    }

    // ---- Shared update primitives ----

    /// Adds `amount` to a monotonic counter using relaxed ordering.
    #[inline(always)]
    fn bump_counter(counter: &AtomicU64, amount: u64) {
        counter.fetch_add(amount, Ordering::Relaxed);
    }

    /// Adds `delta` to a gauge using relaxed ordering.
    #[inline(always)]
    fn raise_gauge(gauge: &AtomicI64, delta: i64) {
        gauge.fetch_add(delta, Ordering::Relaxed);
    }

    /// Subtracts `delta` from a gauge using relaxed ordering.
    #[inline(always)]
    fn lower_gauge(gauge: &AtomicI64, delta: i64) {
        gauge.fetch_sub(delta, Ordering::Relaxed);
    }

    // ---- Counter increments ----

    /// Adds `n` to the `allocations` counter.
    #[inline(always)]
    pub fn inc_allocations(&self, n: u64) {
        Self::bump_counter(&self.allocations, n);
    }

    /// Adds `n` to the `allocations_from_reset` counter.
    #[inline(always)]
    pub fn inc_allocations_from_reset(&self, n: u64) {
        Self::bump_counter(&self.allocations_from_reset, n);
    }

    /// Adds `n` to the `evictions` counter.
    #[inline(always)]
    pub fn inc_evictions(&self, n: u64) {
        Self::bump_counter(&self.evictions, n);
    }

    /// Adds one to the `registrations` counter.
    #[inline(always)]
    pub fn inc_registrations(&self) {
        Self::bump_counter(&self.registrations, 1);
    }

    /// Adds one to the `duplicate_blocks` counter.
    #[inline(always)]
    pub fn inc_duplicate_blocks(&self) {
        Self::bump_counter(&self.duplicate_blocks, 1);
    }

    /// Adds one to the `registration_dedup` counter.
    #[inline(always)]
    pub fn inc_registration_dedup(&self) {
        Self::bump_counter(&self.registration_dedup, 1);
    }

    /// Adds one to the `stagings` counter.
    #[inline(always)]
    pub fn inc_stagings(&self) {
        Self::bump_counter(&self.stagings, 1);
    }

    /// Adds `n` to the `match_hashes_requested` counter.
    #[inline(always)]
    pub fn inc_match_hashes_requested(&self, n: u64) {
        Self::bump_counter(&self.match_hashes_requested, n);
    }

    /// Adds `n` to the `match_blocks_returned` counter.
    #[inline(always)]
    pub fn inc_match_blocks_returned(&self, n: u64) {
        Self::bump_counter(&self.match_blocks_returned, n);
    }

    /// Adds `n` to the `scan_hashes_requested` counter.
    #[inline(always)]
    pub fn inc_scan_hashes_requested(&self, n: u64) {
        Self::bump_counter(&self.scan_hashes_requested, n);
    }

    /// Adds `n` to the `scan_blocks_returned` counter.
    #[inline(always)]
    pub fn inc_scan_blocks_returned(&self, n: u64) {
        Self::bump_counter(&self.scan_blocks_returned, n);
    }

    // ---- Gauge operations ----

    /// Raises the `inflight_mutable` gauge by one.
    #[inline(always)]
    pub fn inc_inflight_mutable(&self) {
        self.inc_inflight_mutable_by(1);
    }

    /// Raises the `inflight_mutable` gauge by `n`.
    #[inline(always)]
    pub fn inc_inflight_mutable_by(&self, n: i64) {
        Self::raise_gauge(&self.inflight_mutable, n);
    }

    /// Lowers the `inflight_mutable` gauge by one.
    #[inline(always)]
    pub fn dec_inflight_mutable(&self) {
        self.dec_inflight_mutable_by(1);
    }

    /// Lowers the `inflight_mutable` gauge by `n`.
    #[inline(always)]
    pub fn dec_inflight_mutable_by(&self, n: i64) {
        Self::lower_gauge(&self.inflight_mutable, n);
    }

    /// Raises the `inflight_immutable` gauge by one.
    #[inline(always)]
    pub fn inc_inflight_immutable(&self) {
        self.inc_inflight_immutable_by(1);
    }

    /// Raises the `inflight_immutable` gauge by `n`.
    #[inline(always)]
    pub fn inc_inflight_immutable_by(&self, n: i64) {
        Self::raise_gauge(&self.inflight_immutable, n);
    }

    /// Lowers the `inflight_immutable` gauge by one.
    #[inline(always)]
    pub fn dec_inflight_immutable(&self) {
        self.dec_inflight_immutable_by(1);
    }

    /// Lowers the `inflight_immutable` gauge by `n`.
    #[inline(always)]
    pub fn dec_inflight_immutable_by(&self, n: i64) {
        Self::lower_gauge(&self.inflight_immutable, n);
    }

    /// Overwrites the `reset_pool_size` gauge with `size`.
    #[inline(always)]
    pub fn set_reset_pool_size(&self, size: i64) {
        let gauge = &self.reset_pool_size;
        gauge.store(size, Ordering::Relaxed);
    }

    /// Raises the `reset_pool_size` gauge by one.
    #[inline(always)]
    pub fn inc_reset_pool_size(&self) {
        self.inc_reset_pool_size_by(1);
    }

    /// Raises the `reset_pool_size` gauge by `n`.
    #[inline(always)]
    pub fn inc_reset_pool_size_by(&self, n: i64) {
        Self::raise_gauge(&self.reset_pool_size, n);
    }

    /// Lowers the `reset_pool_size` gauge by one.
    #[inline(always)]
    pub fn dec_reset_pool_size(&self) {
        self.dec_reset_pool_size_by(1);
    }

    /// Lowers the `reset_pool_size` gauge by `n`.
    #[inline(always)]
    pub fn dec_reset_pool_size_by(&self, n: i64) {
        Self::lower_gauge(&self.reset_pool_size, n);
    }

    /// Overwrites the `inactive_pool_size` gauge with `size`.
    #[inline(always)]
    pub fn set_inactive_pool_size(&self, size: i64) {
        let gauge = &self.inactive_pool_size;
        gauge.store(size, Ordering::Relaxed);
    }

    /// Raises the `inactive_pool_size` gauge by one.
    #[inline(always)]
    pub fn inc_inactive_pool_size(&self) {
        self.inc_inactive_pool_size_by(1);
    }

    /// Raises the `inactive_pool_size` gauge by `n`.
    #[inline(always)]
    pub fn inc_inactive_pool_size_by(&self, n: i64) {
        Self::raise_gauge(&self.inactive_pool_size, n);
    }

    /// Lowers the `inactive_pool_size` gauge by one.
    #[inline(always)]
    pub fn dec_inactive_pool_size(&self) {
        self.dec_inactive_pool_size_by(1);
    }

    /// Lowers the `inactive_pool_size` gauge by `n`.
    #[inline(always)]
    pub fn dec_inactive_pool_size_by(&self, n: i64) {
        Self::lower_gauge(&self.inactive_pool_size, n);
    }

    // ---- Audit counters ----

    /// Records one lookup-driven `Primary → Inactive` transition, taken
    /// when the active-pool `Weak` was dead. This only happens inside a
    /// race window; tests assert it ticks under stress.
    #[inline(always)]
    pub fn inc_eager_primary_to_inactive(&self) {
        Self::bump_counter(&self.eager_primary_to_inactive_total, 1);
    }

    /// Records one `allocate_atomic` rollback, caused by an inactive
    /// backend returning fewer pairs than `len()` advertised. Should
    /// never happen with shipped backends; tests assert it fires when
    /// wired against an under-allocating fake.
    #[inline(always)]
    pub fn inc_allocate_atomic_rollback(&self) {
        Self::bump_counter(&self.allocate_atomic_rollback_total, 1);
    }

    /// Records one `release_primary` no-op: the slot was no longer
    /// `Primary` for this Inner (a concurrent lookup eagerly
    /// transitioned it, or it was resurrected to a different Inner).
    #[inline(always)]
    pub fn inc_release_primary_noop(&self) {
        Self::bump_counter(&self.release_primary_noop_total, 1);
    }

    /// Records one `release_duplicate` no-op: the slot's `Duplicate`
    /// weak no longer matches this Inner.
    #[inline(always)]
    pub fn inc_release_duplicate_noop(&self) {
        Self::bump_counter(&self.release_duplicate_noop_total, 1);
    }

    // ---- Snapshot for stats collector ----

    /// Copies the current value of every counter and gauge into a
    /// [`MetricsSnapshot`].
    ///
    /// Each field is read with its own relaxed load, so under concurrent
    /// updates the fields are not guaranteed to be mutually consistent.
    pub fn snapshot(&self) -> MetricsSnapshot {
        let counter = |c: &AtomicU64| c.load(Ordering::Relaxed);
        let gauge = |g: &AtomicI64| g.load(Ordering::Relaxed);

        MetricsSnapshot {
            allocations: counter(&self.allocations),
            allocations_from_reset: counter(&self.allocations_from_reset),
            evictions: counter(&self.evictions),
            registrations: counter(&self.registrations),
            duplicate_blocks: counter(&self.duplicate_blocks),
            registration_dedup: counter(&self.registration_dedup),
            stagings: counter(&self.stagings),
            match_hashes_requested: counter(&self.match_hashes_requested),
            match_blocks_returned: counter(&self.match_blocks_returned),
            scan_hashes_requested: counter(&self.scan_hashes_requested),
            scan_blocks_returned: counter(&self.scan_blocks_returned),
            eager_primary_to_inactive_total: counter(&self.eager_primary_to_inactive_total),
            allocate_atomic_rollback_total: counter(&self.allocate_atomic_rollback_total),
            release_primary_noop_total: counter(&self.release_primary_noop_total),
            release_duplicate_noop_total: counter(&self.release_duplicate_noop_total),
            inflight_mutable: gauge(&self.inflight_mutable),
            inflight_immutable: gauge(&self.inflight_immutable),
            reset_pool_size: gauge(&self.reset_pool_size),
            inactive_pool_size: gauge(&self.inactive_pool_size),
        }
    }
}

/// Point-in-time snapshot of all atomic metrics, used by the stats collector and prometheus collector.
///
/// Every field is a plain integer copy of the same-named atomic in
/// `BlockPoolMetrics`: counters are `u64`, gauges are `i64`.
///
/// `Default` yields an all-zero snapshot, and `PartialEq`/`Eq` allow two
/// snapshots to be compared as a whole (e.g. in assertions or to detect
/// that nothing changed between two reads).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct MetricsSnapshot {
    // Counters (monotonic in the source atomics)
    pub allocations: u64,
    pub allocations_from_reset: u64,
    pub evictions: u64,
    pub registrations: u64,
    pub duplicate_blocks: u64,
    pub registration_dedup: u64,
    pub stagings: u64,
    pub match_hashes_requested: u64,
    pub match_blocks_returned: u64,
    pub scan_hashes_requested: u64,
    pub scan_blocks_returned: u64,

    // Audit counters for normally-rare branches
    pub eager_primary_to_inactive_total: u64,
    pub allocate_atomic_rollback_total: u64,
    pub release_primary_noop_total: u64,
    pub release_duplicate_noop_total: u64,

    // Gauges (may be negative)
    pub inflight_mutable: i64,
    pub inflight_immutable: i64,
    pub reset_pool_size: i64,
    pub inactive_pool_size: i64,
}

#[cfg(test)]
mod tests {
    //! Unit tests for `BlockPoolMetrics`: every public mutator is exercised
    //! at least once and checked through `snapshot()`.

    use super::*;

    /// A freshly constructed instance reports zero for every metric.
    #[test]
    fn test_new_starts_at_zero() {
        let snap = BlockPoolMetrics::new("G1".to_string()).snapshot();

        assert_eq!(snap.allocations, 0);
        assert_eq!(snap.allocations_from_reset, 0);
        assert_eq!(snap.evictions, 0);
        assert_eq!(snap.registrations, 0);
        assert_eq!(snap.duplicate_blocks, 0);
        assert_eq!(snap.registration_dedup, 0);
        assert_eq!(snap.stagings, 0);
        assert_eq!(snap.match_hashes_requested, 0);
        assert_eq!(snap.match_blocks_returned, 0);
        assert_eq!(snap.scan_hashes_requested, 0);
        assert_eq!(snap.scan_blocks_returned, 0);
        assert_eq!(snap.eager_primary_to_inactive_total, 0);
        assert_eq!(snap.allocate_atomic_rollback_total, 0);
        assert_eq!(snap.release_primary_noop_total, 0);
        assert_eq!(snap.release_duplicate_noop_total, 0);
        assert_eq!(snap.inflight_mutable, 0);
        assert_eq!(snap.inflight_immutable, 0);
        assert_eq!(snap.reset_pool_size, 0);
        assert_eq!(snap.inactive_pool_size, 0);
    }

    #[test]
    fn test_counter_increments() {
        let m = BlockPoolMetrics::new("G1".to_string());

        m.inc_allocations(5);
        m.inc_allocations(3);
        m.inc_evictions(2);
        m.inc_registrations();
        m.inc_duplicate_blocks();
        m.inc_registration_dedup();
        m.inc_stagings();

        let snap = m.snapshot();
        assert_eq!(snap.allocations, 8);
        assert_eq!(snap.evictions, 2);
        assert_eq!(snap.registrations, 1);
        assert_eq!(snap.duplicate_blocks, 1);
        assert_eq!(snap.registration_dedup, 1);
        assert_eq!(snap.stagings, 1);
    }

    /// `allocations_from_reset` accumulates independently of `allocations`.
    #[test]
    fn test_allocations_from_reset_counter() {
        let m = BlockPoolMetrics::new("G1".to_string());

        m.inc_allocations_from_reset(4);
        m.inc_allocations_from_reset(6);

        let snap = m.snapshot();
        assert_eq!(snap.allocations_from_reset, 10);
        assert_eq!(snap.allocations, 0);
    }

    /// Each audit counter ticks by one per call and lands in its own
    /// snapshot field.
    #[test]
    fn test_audit_counters() {
        let m = BlockPoolMetrics::new("G1".to_string());

        m.inc_eager_primary_to_inactive();
        for _ in 0..2 {
            m.inc_allocate_atomic_rollback();
        }
        for _ in 0..3 {
            m.inc_release_primary_noop();
        }
        for _ in 0..4 {
            m.inc_release_duplicate_noop();
        }

        let snap = m.snapshot();
        assert_eq!(snap.eager_primary_to_inactive_total, 1);
        assert_eq!(snap.allocate_atomic_rollback_total, 2);
        assert_eq!(snap.release_primary_noop_total, 3);
        assert_eq!(snap.release_duplicate_noop_total, 4);
    }

    #[test]
    fn test_gauge_bidirectional() {
        let m = BlockPoolMetrics::new("G2".to_string());

        m.inc_inflight_mutable();
        m.inc_inflight_mutable();
        m.dec_inflight_mutable();

        m.inc_inflight_immutable();
        m.inc_inflight_immutable();
        m.inc_inflight_immutable();
        m.dec_inflight_immutable();

        let snap = m.snapshot();
        assert_eq!(snap.inflight_mutable, 1);
        assert_eq!(snap.inflight_immutable, 2);
    }

    /// The `*_by` variants move gauges by an arbitrary amount, and gauges
    /// are allowed to go below zero.
    #[test]
    fn test_gauge_by_variants() {
        let m = BlockPoolMetrics::new("G2".to_string());

        m.inc_inflight_mutable_by(5);
        m.dec_inflight_mutable_by(2);

        m.inc_inflight_immutable_by(4);
        m.dec_inflight_immutable_by(4);

        m.inc_reset_pool_size_by(10);
        m.dec_reset_pool_size_by(3);

        m.inc_inactive_pool_size_by(2);
        m.dec_inactive_pool_size_by(5);

        let snap = m.snapshot();
        assert_eq!(snap.inflight_mutable, 3);
        assert_eq!(snap.inflight_immutable, 0);
        assert_eq!(snap.reset_pool_size, 7);
        assert_eq!(snap.inactive_pool_size, -3);
    }

    #[test]
    fn test_pool_size_gauges() {
        let m = BlockPoolMetrics::new("G1".to_string());

        m.set_reset_pool_size(100);
        m.set_inactive_pool_size(50);

        let snap = m.snapshot();
        assert_eq!(snap.reset_pool_size, 100);
        assert_eq!(snap.inactive_pool_size, 50);

        m.set_reset_pool_size(80);
        let snap = m.snapshot();
        assert_eq!(snap.reset_pool_size, 80);

        // Test inc/dec for reset pool size
        m.inc_reset_pool_size();
        m.inc_reset_pool_size();
        m.dec_reset_pool_size();
        let snap = m.snapshot();
        assert_eq!(snap.reset_pool_size, 81);

        // Test inc/dec for inactive pool size
        m.inc_inactive_pool_size();
        m.inc_inactive_pool_size();
        m.inc_inactive_pool_size();
        m.dec_inactive_pool_size();
        let snap = m.snapshot();
        assert_eq!(snap.inactive_pool_size, 52);
    }

    #[test]
    fn test_type_label() {
        let m = BlockPoolMetrics::new("MyPool".to_string());
        assert_eq!(m.type_label(), "MyPool");
    }

    #[test]
    fn test_match_scan_counters() {
        let m = BlockPoolMetrics::new("G1".to_string());

        m.inc_match_hashes_requested(10);
        m.inc_match_blocks_returned(7);
        m.inc_scan_hashes_requested(20);
        m.inc_scan_blocks_returned(15);

        let snap = m.snapshot();
        assert_eq!(snap.match_hashes_requested, 10);
        assert_eq!(snap.match_blocks_returned, 7);
        assert_eq!(snap.scan_hashes_requested, 20);
        assert_eq!(snap.scan_blocks_returned, 15);
    }
}