// ff-observability 0.8.1
// FlowFabric observability — OTEL metrics registry + typed handles + no-op shim
//! OTEL-backed metrics registry.
//!
//! Wires an `opentelemetry_sdk::metrics::SdkMeterProvider` to an
//! `opentelemetry_prometheus` exporter, which reads from a
//! `prometheus::Registry` for text-exposition output.
//!
//! Typed instrument handles are constructed once at startup and cloned
//! into the hot path — OTEL's `Counter<u64>` / `Histogram<f64>` are
//! internally `Arc`-based, so cloning is cheap.

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

use opentelemetry::KeyValue;
use opentelemetry::metrics::{Counter, Histogram, MeterProvider as _, ObservableGauge};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use prometheus::{Encoder, TextEncoder};

/// Cardinality envelope — see PR-94 plan §3. Each instrument's label
/// set is bounded well below this; the aggregate at steady state stays
/// under 1k series. These consts exist so CI can assert and PR body
/// reviewers can cross-check without reading the whole file.
///
/// Budget (worst case, typical deployment):
///
/// * `ff_http_requests_total` — 20 routes × 4 methods × ~8 statuses = 640
/// * `ff_http_request_duration_seconds` — same key set, histograms
///   generate bucket+sum+count ~= 13 per series but share the same
///   label set. Count the label-set cardinality only (~640 keys).
/// * `ff_scanner_cycle_duration_seconds` / `_total` — 15 scanners ≈ 15
/// * `ff_cancel_backlog_depth` — 1 (no labels)
/// * `ff_claim_from_grant_duration_seconds` — ≤ 16 lanes
/// * `ff_lease_renewal_total` — 2 outcomes (ok, err) = 2
/// * `ff_attempt_outcome_total` — 5 outcomes × ≤ 16 lanes = 80 (prereq #4)
/// * `ff_worker_at_capacity_total` — 1 (no labels)
/// * `ff_budget_hit_total` — per-dimension, typically ≤ 8
/// * `ff_quota_hit_total` — 2 reasons (rate, concurrency) = 2
///
/// Total: ~690 label-sets, 5-10k underlying series with histogram
/// buckets. OK for a per-ff-server /metrics scrape.
///
/// Leading underscore: this const is referenced by CI/docs, not by
/// runtime code, so the underscore suppresses the unused warning.
const _CARDINALITY_ENVELOPE_MAX: usize = 1_000;

/// Metric names — defined once so tests and docs stay in sync with
/// the call sites.
///
/// OTEL's Prometheus exporter normalizes per Prometheus conventions:
///
///   * counter instruments get `_total` **appended** at exposition;
///   * instruments with `unit="s"` get `_seconds` **appended**.
///
/// We therefore DO NOT include `_total` / `_seconds` in the OTEL
/// instrument name — they appear on the wire automatically. This
/// keeps the exported names matching the plan inventory
/// (`ff_http_requests_total`, `ff_http_request_duration_seconds`).
mod name {
    // Core instruments from the PR-94 plan inventory. These are wire
    // names — changing any value is a breaking dashboard change.
    pub const HTTP_REQUESTS: &str = "ff_http_requests";
    pub const HTTP_REQUEST_DURATION: &str = "ff_http_request_duration";
    pub const SCANNER_CYCLE_DURATION: &str = "ff_scanner_cycle_duration";
    pub const SCANNER_CYCLE: &str = "ff_scanner_cycle";
    pub const CANCEL_BACKLOG_DEPTH: &str = "ff_cancel_backlog_depth";
    pub const CLAIM_FROM_GRANT_DURATION: &str = "ff_claim_from_grant_duration";
    pub const LEASE_RENEWAL: &str = "ff_lease_renewal";
    pub const ATTEMPT_OUTCOME: &str = "ff_attempt_outcome";
    pub const WORKER_AT_CAPACITY: &str = "ff_worker_at_capacity";
    pub const BUDGET_HIT: &str = "ff_budget_hit";
    pub const QUOTA_HIT: &str = "ff_quota_hit";
    /// RFC-016 Stage A: gauge-style counter of edge-group policy
    /// applications, labelled `policy`. Stage A emits only
    /// `policy="all_of"`; Stage B adds `any_of` / `quorum`.
    pub const EDGE_GROUP_POLICY: &str = "ff_edge_group_policy";
    /// RFC-016 Stage C: count of sibling cancels dispatched by the
    /// edge_cancel_dispatcher scanner, labelled by `reason`
    /// (`sibling_quorum_satisfied` | `sibling_quorum_impossible`).
    pub const SIBLING_CANCEL_DISPATCHED: &str = "ff_sibling_cancel_dispatched";
    /// RFC-016 Stage C: count of sibling-cancel dispositions observed by
    /// the dispatcher, labelled by `disposition` (`cancelled` |
    /// `already_terminal` | `not_found`). Cardinality = 3, no per-
    /// flow/exec labels.
    pub const SIBLING_CANCEL_DISPOSITION: &str = "ff_sibling_cancel_disposition";
    /// RFC-016 Stage D: count of reconcile actions by the
    /// edge_cancel_reconciler scanner (Invariant Q6 safety net).
    /// Labelled by `action` ∈ `sremmed_stale` | `completed_drain` |
    /// `no_op`. Cardinality = 3.
    pub const SIBLING_CANCEL_RECONCILE: &str = "ff_sibling_cancel_reconcile";
    /// RFC-017 Stage B: count of `shutdown_prepare` calls that
    /// exceeded their `grace` budget on the backend.
    pub const SHUTDOWN_TIMEOUT: &str = "ff_shutdown_timeout";
    /// RFC-017 Stage D1 (§8): count of pending-waitpoint responses
    /// that still carried the legacy raw `waitpoint_token` wire field
    /// under the v0.7.x deprecation window. No labels.
    pub const PENDING_WAITPOINT_LEGACY_TOKEN: &str =
        "ff_pending_waitpoint_legacy_token_served";
    /// RFC-017 §9.0 dev-mode override: count of boots that bypassed
    /// `BACKEND_STAGE_READY` via `FF_BACKEND_ACCEPT_UNREADY=1 +
    /// FF_ENV=development`. Labelled by `backend` and `stage`.
    pub const BACKEND_UNREADY_BOOT: &str = "ff_backend_unready_boot";
}

/// Shared state behind the cloneable [`Metrics`] handle: the
/// Prometheus registry (scrape source), the OTEL provider (held
/// alive for the registry lifetime), and one pre-built handle per
/// instrument so the hot path never re-creates instruments.
struct Inner {
    /// The exporter writes into this registry; `render` gathers it.
    registry: prometheus::Registry,
    /// Never read after construction; held so the reader/exporter
    /// pipeline is not torn down while `Metrics` is alive.
    _provider: SdkMeterProvider,

    http_requests: Counter<u64>,
    http_duration: Histogram<f64>,
    scanner_duration: Histogram<f64>,
    scanner_total: Counter<u64>,
    /// Shared with the observable-gauge callback registered below;
    /// `set_cancel_backlog_depth` stores here, OTEL reads at scrape.
    cancel_backlog_depth: Arc<AtomicU64>,
    /// Must be held for the registry lifetime: OTEL deregisters the
    /// callback when the `ObservableGauge` handle is dropped. Without
    /// this field, `ff_cancel_backlog_depth` would never appear on
    /// `/metrics` (the build-and-drop in `Metrics::new` would release
    /// it before the first scrape).
    _cancel_backlog_gauge: ObservableGauge<u64>,
    claim_duration: Histogram<f64>,
    lease_renewal: Counter<u64>,
    attempt_outcome: Counter<u64>,
    worker_at_capacity: Counter<u64>,
    budget_hit: Counter<u64>,
    quota_hit: Counter<u64>,
    edge_group_policy: Counter<u64>,
    sibling_cancel_dispatched: Counter<u64>,
    sibling_cancel_disposition: Counter<u64>,
    sibling_cancel_reconcile: Counter<u64>,
    shutdown_timeout: Counter<u64>,
    pending_waitpoint_legacy_token: Counter<u64>,
    backend_unready_boot: Counter<u64>,
}

/// Cheaply-cloneable handle to the process-wide metrics registry.
/// All instrument handles live behind a single `Arc`, so a clone is
/// just a refcount bump — clone freely into the hot path.
#[derive(Clone)]
pub struct Metrics(Arc<Inner>);

impl Metrics {
    /// Construct the metrics registry, register the Prometheus exporter
    /// with the OTEL `MeterProvider`, and pre-create every instrument.
    ///
    /// Panics on construction failure — observability init happens once
    /// at startup and a bad registry is a deploy-blocker, not a
    /// runtime-handled condition. (Callers gate construction behind the
    /// `observability` feature, so disabling the feature bypasses this
    /// path entirely.)
    pub fn new() -> Self {
        // Exporter bridges OTEL to the prometheus `registry`; `render`
        // later gathers from this same registry.
        let registry = prometheus::Registry::new();
        let exporter = opentelemetry_prometheus::exporter()
            .with_registry(registry.clone())
            .build()
            .expect("prometheus exporter builds");
        let provider = SdkMeterProvider::builder().with_reader(exporter).build();
        let meter = provider.meter("ff");

        // Instrument names come from `mod name` and intentionally omit
        // `_total` / `_seconds` — the exporter appends those suffixes
        // at exposition (see the `mod name` docs).
        let http_requests = meter
            .u64_counter(name::HTTP_REQUESTS)
            .with_description("HTTP requests handled, labelled by method/path/status.")
            .build();
        let http_duration = meter
            .f64_histogram(name::HTTP_REQUEST_DURATION)
            .with_description("HTTP request duration in seconds.")
            .with_unit("s")
            .build();
        let scanner_duration = meter
            .f64_histogram(name::SCANNER_CYCLE_DURATION)
            .with_description("Scanner cycle duration in seconds, labelled by scanner.")
            .with_unit("s")
            .build();
        let scanner_total = meter
            .u64_counter(name::SCANNER_CYCLE)
            .with_description("Scanner cycle count, labelled by scanner.")
            .build();
        let claim_duration = meter
            .f64_histogram(name::CLAIM_FROM_GRANT_DURATION)
            .with_description("claim_from_grant latency in seconds, labelled by lane.")
            .with_unit("s")
            .build();
        let lease_renewal = meter
            .u64_counter(name::LEASE_RENEWAL)
            .with_description("Lease renewal attempts, labelled by outcome (ok|err).")
            .build();
        let attempt_outcome = meter
            .u64_counter(name::ATTEMPT_OUTCOME)
            .with_description(
                "Terminal attempt outcomes, labelled by lane + outcome \
                 (ok|error|timeout|cancelled|retry).",
            )
            .build();
        let worker_at_capacity = meter
            .u64_counter(name::WORKER_AT_CAPACITY)
            .with_description("Count of claim attempts rejected with WorkerAtCapacity.")
            .build();
        let budget_hit = meter
            .u64_counter(name::BUDGET_HIT)
            .with_description("Budget hard-breach count, labelled by dimension.")
            .build();
        let quota_hit = meter
            .u64_counter(name::QUOTA_HIT)
            .with_description("Quota admission block count, labelled by reason.")
            .build();
        let edge_group_policy = meter
            .u64_counter(name::EDGE_GROUP_POLICY)
            .with_description(
                "RFC-016 edge-group policy applications, labelled by `policy`. \
                 Stage A emits only `policy=\"all_of\"`.",
            )
            .build();
        let sibling_cancel_dispatched = meter
            .u64_counter(name::SIBLING_CANCEL_DISPATCHED)
            .with_description(
                "RFC-016 Stage C sibling cancels dispatched, labelled by \
                 `reason` (`sibling_quorum_satisfied` | \
                 `sibling_quorum_impossible`).",
            )
            .build();
        let sibling_cancel_disposition = meter
            .u64_counter(name::SIBLING_CANCEL_DISPOSITION)
            .with_description(
                "RFC-016 Stage C sibling-cancel dispositions, labelled by \
                 `disposition` (`cancelled` | `already_terminal` | \
                 `not_found`).",
            )
            .build();
        let sibling_cancel_reconcile = meter
            .u64_counter(name::SIBLING_CANCEL_RECONCILE)
            .with_description(
                "RFC-016 Stage D sibling-cancel reconcile actions, labelled \
                 by `action` (`sremmed_stale` | `completed_drain` | \
                 `no_op`). Invariant Q6 crash-recovery safety net.",
            )
            .build();
        let shutdown_timeout = meter
            .u64_counter(name::SHUTDOWN_TIMEOUT)
            .with_description(
                "RFC-017 Stage B: count of `backend.shutdown_prepare` \
                 calls that exceeded their grace budget. Increments once \
                 per timed-out shutdown.",
            )
            .build();
        let pending_waitpoint_legacy_token = meter
            .u64_counter(name::PENDING_WAITPOINT_LEGACY_TOKEN)
            .with_description(
                "RFC-017 Stage D1 (§8): count of pending-waitpoint \
                 entries served with the legacy v0.7.x `waitpoint_token` \
                 wire field. Emitted once per entry served; zero at \
                 v0.8.0 (field removed).",
            )
            .build();
        let backend_unready_boot = meter
            .u64_counter(name::BACKEND_UNREADY_BOOT)
            .with_description(
                "RFC-017 §9.0 dev-override: boots that bypassed \
                 BACKEND_STAGE_READY via FF_BACKEND_ACCEPT_UNREADY=1 + \
                 FF_ENV=development. Labelled by backend + stage.",
            )
            .build();

        // Cancel backlog depth — gauge backed by an AtomicU64 so set()
        // from any thread is lock-free. OTEL observable-gauge callback
        // reads the same atomic each scrape. The gauge handle itself is
        // stored in Inner so the callback stays registered (see the
        // `_cancel_backlog_gauge` field comment).
        let cancel_backlog_depth = Arc::new(AtomicU64::new(0));
        let depth_cb = Arc::clone(&cancel_backlog_depth);
        let cancel_backlog_gauge = meter
            .u64_observable_gauge(name::CANCEL_BACKLOG_DEPTH)
            .with_description("Current cancel-reconciler backlog depth.")
            .with_callback(move |o| {
                o.observe(depth_cb.load(Ordering::Relaxed), &[]);
            })
            .build();

        Self(Arc::new(Inner {
            registry,
            _provider: provider,
            http_requests,
            http_duration,
            scanner_duration,
            scanner_total,
            cancel_backlog_depth,
            _cancel_backlog_gauge: cancel_backlog_gauge,
            claim_duration,
            lease_renewal,
            attempt_outcome,
            worker_at_capacity,
            budget_hit,
            quota_hit,
            edge_group_policy,
            sibling_cancel_dispatched,
            sibling_cancel_disposition,
            sibling_cancel_reconcile,
            shutdown_timeout,
            pending_waitpoint_legacy_token,
            backend_unready_boot,
        }))
    }

    /// Render the current registry contents in the Prometheus text
    /// exposition format (the body served on `/metrics`).
    pub fn render(&self) -> String {
        let families = self.0.registry.gather();
        let mut out = Vec::with_capacity(4096);
        // Encoding into a Vec cannot fail and always produces valid
        // UTF-8 — documented contract of prometheus::TextEncoder.
        TextEncoder::new()
            .encode(&families, &mut out)
            .expect("prometheus text encode");
        String::from_utf8(out).expect("prometheus text is utf-8")
    }

    // ── HTTP ──

    /// Record one handled HTTP request: bumps the request counter and
    /// observes the latency histogram under the same
    /// method/path/status label set.
    pub fn record_http_request(&self, method: &str, path: &str, status: u16, elapsed: Duration) {
        let labels = [
            KeyValue::new("method", method.to_owned()),
            KeyValue::new("path", path.to_owned()),
            KeyValue::new("status", i64::from(status)),
        ];
        let inner = &self.0;
        inner.http_requests.add(1, &labels);
        inner.http_duration.record(elapsed.as_secs_f64(), &labels);
    }

    // ── Scanner ──

    /// Record one completed scanner cycle: increments the cycle
    /// counter and records its duration, both labelled by `scanner`.
    pub fn record_scanner_cycle(&self, scanner: &'static str, elapsed: Duration) {
        let labels = [KeyValue::new("scanner", scanner)];
        let inner = &self.0;
        inner.scanner_total.add(1, &labels);
        inner.scanner_duration.record(elapsed.as_secs_f64(), &labels);
    }

    // ── Cancel backlog ──

    /// Publish the current cancel-reconciler backlog depth. Lock-free
    /// from any thread: the value lands in an atomic that the
    /// observable-gauge callback reads at scrape time.
    pub fn set_cancel_backlog_depth(&self, depth: u64) {
        let inner = &self.0;
        inner.cancel_backlog_depth.store(depth, Ordering::Relaxed);
    }

    // ── Claim ──

    /// Observe one `claim_from_grant` latency sample, labelled by lane.
    pub fn record_claim_from_grant(&self, lane: &str, elapsed: Duration) {
        let labels = [KeyValue::new("lane", lane.to_owned())];
        self.0.claim_duration.record(elapsed.as_secs_f64(), &labels);
    }

    // ── Lease ──

    /// Count one lease-renewal attempt, labelled by `outcome`
    /// (callers pass `"ok"` or `"err"` per the instrument description).
    pub fn inc_lease_renewal(&self, outcome: &'static str) {
        let labels = [KeyValue::new("outcome", outcome)];
        self.0.lease_renewal.add(1, &labels);
    }

    // ── Attempt terminal outcome ──

    /// Count one terminal attempt outcome
    /// (`ff_attempt_outcome_total{lane,outcome}`) — fired at the trait
    /// boundary when `complete` / `fail` / `cancel` succeed on the
    /// backend. Cardinality is bounded at 5 outcomes × N lanes
    /// (accepted at 5×16=80 per Observability RFC prereq #4).
    pub fn inc_attempt_outcome(&self, lane: &str, outcome: super::AttemptOutcome) {
        let labels = [
            KeyValue::new("lane", lane.to_owned()),
            KeyValue::new("outcome", outcome.as_stable_str()),
        ];
        self.0.attempt_outcome.add(1, &labels);
    }

    // ── Worker-at-capacity ──

    /// Count one claim attempt rejected with `WorkerAtCapacity`.
    /// Unlabelled — a single fixed series.
    pub fn inc_worker_at_capacity(&self) {
        let inner = &self.0;
        inner.worker_at_capacity.add(1, &[]);
    }

    // ── Budget / quota ──

    /// Count one budget hard-breach, labelled by `dimension`.
    pub fn inc_budget_hit(&self, dimension: &str) {
        let labels = [KeyValue::new("dimension", dimension.to_owned())];
        self.0.budget_hit.add(1, &labels);
    }
    /// Count one quota admission block, labelled by `reason`
    /// (callers pass `"rate"` or `"concurrency"` per the envelope doc).
    pub fn inc_quota_hit(&self, reason: &'static str) {
        let labels = [KeyValue::new("reason", reason)];
        self.0.quota_hit.add(1, &labels);
    }

    // ── RFC-016 Stage A: edge-group policy ──

    /// Increment `ff_edge_group_policy_total{policy}`. Stage A call
    /// sites emit `policy="all_of"`; Stage B adds the other variants.
    pub fn inc_edge_group_policy(&self, policy: &'static str) {
        self.0
            .edge_group_policy
            .add(1, &[KeyValue::new("policy", policy)]);
    }

    // ── RFC-016 Stage C: sibling-cancel dispatcher ──

    /// Increment `ff_sibling_cancel_dispatched_total{reason}`. Reason is
    /// `sibling_quorum_satisfied` or `sibling_quorum_impossible`.
    pub fn inc_sibling_cancel_dispatched(&self, reason: &'static str) {
        self.0
            .sibling_cancel_dispatched
            .add(1, &[KeyValue::new("reason", reason)]);
    }

    /// Increment `ff_sibling_cancel_disposition_total{disposition}`.
    /// Disposition is one of `cancelled` / `already_terminal` /
    /// `not_found`. Fixed-cardinality (3) — no per-flow/exec labels.
    pub fn inc_sibling_cancel_disposition(&self, disposition: &'static str) {
        self.0
            .sibling_cancel_disposition
            .add(1, &[KeyValue::new("disposition", disposition)]);
    }

    /// Increment `ff_sibling_cancel_reconcile_total{action}`. Action is
    /// one of `sremmed_stale` / `completed_drain` / `no_op`. Fixed-
    /// cardinality (3) — no per-flow/exec labels.
    pub fn inc_sibling_cancel_reconcile(&self, action: &'static str) {
        self.0
            .sibling_cancel_reconcile
            .add(1, &[KeyValue::new("action", action)]);
    }

    /// RFC-017 Stage B: count one `shutdown_prepare` call that ran past
    /// its grace budget (`ff_shutdown_timeout_total`, unlabelled).
    pub fn inc_shutdown_timeout(&self) {
        let inner = &self.0;
        inner.shutdown_timeout.add(1, &[]);
    }

    /// RFC-017 Stage D1 (§8): count one pending-waitpoint entry served
    /// with the legacy raw `waitpoint_token` wire field
    /// (`ff_pending_waitpoint_legacy_token_served_total`, unlabelled).
    pub fn inc_pending_waitpoint_legacy_token(&self) {
        let inner = &self.0;
        inner.pending_waitpoint_legacy_token.add(1, &[]);
    }

    /// RFC-017 §9.0 dev-override: count one boot that bypassed
    /// `BACKEND_STAGE_READY`
    /// (`ff_backend_unready_boot_total{backend,stage}`), i.e. via
    /// `FF_BACKEND_ACCEPT_UNREADY=1 + FF_ENV=development`.
    pub fn inc_backend_unready_boot(&self, backend: &'static str, stage: &'static str) {
        let labels = [
            KeyValue::new("backend", backend),
            KeyValue::new("stage", stage),
        ];
        self.0.backend_unready_boot.add(1, &labels);
    }
}

impl Default for Metrics {
    fn default() -> Self {
        Self::new()
    }
}