use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use opentelemetry::KeyValue;
use opentelemetry::metrics::{Counter, Histogram, MeterProvider as _, ObservableGauge};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use prometheus::{Encoder, TextEncoder};
/// Soft ceiling on distinct label-value combinations a single metric should
/// produce. Underscore-prefixed: nothing in this file enforces it yet.
/// NOTE(review): appears to be a documented budget for reviewers adding new
/// labelled metrics — confirm intended use before wiring enforcement to it.
const _CARDINALITY_ENVELOPE_MAX: usize = 1_000;
/// Canonical metric names as scraped by Prometheus.
///
/// Centralised so the `ff_` prefix stays uniform and renames are caught in
/// review: every string here is external API for dashboards and alerting
/// rules, so changing one is a breaking change for consumers of `/metrics`.
mod name {
    pub const HTTP_REQUESTS: &str = "ff_http_requests";
    pub const HTTP_REQUEST_DURATION: &str = "ff_http_request_duration";
    pub const SCANNER_CYCLE_DURATION: &str = "ff_scanner_cycle_duration";
    pub const SCANNER_CYCLE: &str = "ff_scanner_cycle";
    pub const CANCEL_BACKLOG_DEPTH: &str = "ff_cancel_backlog_depth";
    pub const CLAIM_FROM_GRANT_DURATION: &str = "ff_claim_from_grant_duration";
    pub const LEASE_RENEWAL: &str = "ff_lease_renewal";
    pub const ATTEMPT_OUTCOME: &str = "ff_attempt_outcome";
    pub const WORKER_AT_CAPACITY: &str = "ff_worker_at_capacity";
    pub const BUDGET_HIT: &str = "ff_budget_hit";
    pub const QUOTA_HIT: &str = "ff_quota_hit";
    pub const EDGE_GROUP_POLICY: &str = "ff_edge_group_policy";
    pub const SIBLING_CANCEL_DISPATCHED: &str = "ff_sibling_cancel_dispatched";
    pub const SIBLING_CANCEL_DISPOSITION: &str = "ff_sibling_cancel_disposition";
    pub const SIBLING_CANCEL_RECONCILE: &str = "ff_sibling_cancel_reconcile";
    pub const SHUTDOWN_TIMEOUT: &str = "ff_shutdown_timeout";
    // Note: the exported name carries a `_served` suffix absent from the
    // constant's identifier.
    pub const PENDING_WAITPOINT_LEGACY_TOKEN: &str =
        "ff_pending_waitpoint_legacy_token_served";
    pub const BACKEND_UNREADY_BOOT: &str = "ff_backend_unready_boot";
}
/// Shared state behind [`Metrics`]. Built once by `Metrics::new` and shared
/// via `Arc`, so cloning a `Metrics` handle is cheap.
struct Inner {
    /// Registry the OTel exporter feeds; `render` gathers from it.
    registry: prometheus::Registry,
    /// Held only to keep the meter provider (and its exporter/reader) alive
    /// for the lifetime of the metrics handle; never read directly.
    _provider: SdkMeterProvider,
    http_requests: Counter<u64>,
    http_duration: Histogram<f64>,
    scanner_duration: Histogram<f64>,
    scanner_total: Counter<u64>,
    /// Latest backlog depth written by `set_cancel_backlog_depth`; the
    /// observable gauge's callback reads it on each collection.
    cancel_backlog_depth: Arc<AtomicU64>,
    /// Held to keep the gauge (and its callback) registered; never read.
    _cancel_backlog_gauge: ObservableGauge<u64>,
    claim_duration: Histogram<f64>,
    lease_renewal: Counter<u64>,
    attempt_outcome: Counter<u64>,
    worker_at_capacity: Counter<u64>,
    budget_hit: Counter<u64>,
    quota_hit: Counter<u64>,
    edge_group_policy: Counter<u64>,
    sibling_cancel_dispatched: Counter<u64>,
    sibling_cancel_disposition: Counter<u64>,
    sibling_cancel_reconcile: Counter<u64>,
    shutdown_timeout: Counter<u64>,
    pending_waitpoint_legacy_token: Counter<u64>,
    backend_unready_boot: Counter<u64>,
}
/// Cheaply cloneable handle to the process-wide metric instruments.
///
/// Clone freely and pass by value: all clones share one [`Inner`] via `Arc`.
#[derive(Clone)]
pub struct Metrics(Arc<Inner>);
impl Metrics {
    /// Wires up the full metrics pipeline: a fresh Prometheus registry, an
    /// OpenTelemetry meter provider exporting into it, and one instrument
    /// per name in [`name`].
    ///
    /// # Panics
    /// Panics if the Prometheus exporter cannot be built. This runs once at
    /// boot, so a failure here is a configuration/programming bug and the
    /// `expect` is deliberate.
    pub fn new() -> Self {
        let registry = prometheus::Registry::new();
        let exporter = opentelemetry_prometheus::exporter()
            .with_registry(registry.clone())
            .build()
            .expect("prometheus exporter builds");
        let provider = SdkMeterProvider::builder().with_reader(exporter).build();
        let meter = provider.meter("ff");
        let http_requests = meter
            .u64_counter(name::HTTP_REQUESTS)
            .with_description("HTTP requests handled, labelled by method/path/status.")
            .build();
        let http_duration = meter
            .f64_histogram(name::HTTP_REQUEST_DURATION)
            .with_description("HTTP request duration in seconds.")
            .with_unit("s")
            .build();
        let scanner_duration = meter
            .f64_histogram(name::SCANNER_CYCLE_DURATION)
            .with_description("Scanner cycle duration in seconds, labelled by scanner.")
            .with_unit("s")
            .build();
        let scanner_total = meter
            .u64_counter(name::SCANNER_CYCLE)
            .with_description("Scanner cycle count, labelled by scanner.")
            .build();
        let claim_duration = meter
            .f64_histogram(name::CLAIM_FROM_GRANT_DURATION)
            .with_description("claim_from_grant latency in seconds, labelled by lane.")
            .with_unit("s")
            .build();
        let lease_renewal = meter
            .u64_counter(name::LEASE_RENEWAL)
            .with_description("Lease renewal attempts, labelled by outcome (ok|err).")
            .build();
        let attempt_outcome = meter
            .u64_counter(name::ATTEMPT_OUTCOME)
            .with_description(
                "Terminal attempt outcomes, labelled by lane + outcome \
                 (ok|error|timeout|cancelled|retry).",
            )
            .build();
        let worker_at_capacity = meter
            .u64_counter(name::WORKER_AT_CAPACITY)
            .with_description("Count of claim attempts rejected with WorkerAtCapacity.")
            .build();
        let budget_hit = meter
            .u64_counter(name::BUDGET_HIT)
            .with_description("Budget hard-breach count, labelled by dimension.")
            .build();
        let quota_hit = meter
            .u64_counter(name::QUOTA_HIT)
            .with_description("Quota admission block count, labelled by reason.")
            .build();
        let edge_group_policy = meter
            .u64_counter(name::EDGE_GROUP_POLICY)
            .with_description(
                "RFC-016 edge-group policy applications, labelled by `policy`. \
                 Stage A emits only `policy=\"all_of\"`.",
            )
            .build();
        let sibling_cancel_dispatched = meter
            .u64_counter(name::SIBLING_CANCEL_DISPATCHED)
            .with_description(
                "RFC-016 Stage C sibling cancels dispatched, labelled by \
                 `reason` (`sibling_quorum_satisfied` | \
                 `sibling_quorum_impossible`).",
            )
            .build();
        let sibling_cancel_disposition = meter
            .u64_counter(name::SIBLING_CANCEL_DISPOSITION)
            .with_description(
                "RFC-016 Stage C sibling-cancel dispositions, labelled by \
                 `disposition` (`cancelled` | `already_terminal` | \
                 `not_found`).",
            )
            .build();
        let sibling_cancel_reconcile = meter
            .u64_counter(name::SIBLING_CANCEL_RECONCILE)
            .with_description(
                "RFC-016 Stage D sibling-cancel reconcile actions, labelled \
                 by `action` (`sremmed_stale` | `completed_drain` | \
                 `no_op`). Invariant Q6 crash-recovery safety net.",
            )
            .build();
        let shutdown_timeout = meter
            .u64_counter(name::SHUTDOWN_TIMEOUT)
            .with_description(
                "RFC-017 Stage B: count of `backend.shutdown_prepare` \
                 calls that exceeded their grace budget. Increments once \
                 per timed-out shutdown.",
            )
            .build();
        let pending_waitpoint_legacy_token = meter
            .u64_counter(name::PENDING_WAITPOINT_LEGACY_TOKEN)
            .with_description(
                "RFC-017 Stage D1 (§8): count of pending-waitpoint \
                 entries served with the legacy v0.7.x `waitpoint_token` \
                 wire field. Emitted once per entry served; zero at \
                 v0.8.0 (field removed).",
            )
            .build();
        let backend_unready_boot = meter
            .u64_counter(name::BACKEND_UNREADY_BOOT)
            .with_description(
                "RFC-017 §9.0 dev-override: boots that bypassed \
                 BACKEND_STAGE_READY via FF_BACKEND_ACCEPT_UNREADY=1 + \
                 FF_ENV=development. Labelled by backend + stage.",
            )
            .build();
        // The gauge reads the latest stored depth on every collection; the
        // Arc is shared with `set_cancel_backlog_depth`.
        let cancel_backlog_depth = Arc::new(AtomicU64::new(0));
        let depth_cb = Arc::clone(&cancel_backlog_depth);
        let cancel_backlog_gauge = meter
            .u64_observable_gauge(name::CANCEL_BACKLOG_DEPTH)
            .with_description("Current cancel-reconciler backlog depth.")
            .with_callback(move |o| {
                o.observe(depth_cb.load(Ordering::Relaxed), &[]);
            })
            .build();
        Self(Arc::new(Inner {
            registry,
            _provider: provider,
            http_requests,
            http_duration,
            scanner_duration,
            scanner_total,
            cancel_backlog_depth,
            _cancel_backlog_gauge: cancel_backlog_gauge,
            claim_duration,
            lease_renewal,
            attempt_outcome,
            worker_at_capacity,
            budget_hit,
            quota_hit,
            edge_group_policy,
            sibling_cancel_dispatched,
            sibling_cancel_disposition,
            sibling_cancel_reconcile,
            shutdown_timeout,
            pending_waitpoint_legacy_token,
            backend_unready_boot,
        }))
    }
    /// Renders the registry's current contents in the Prometheus text
    /// exposition format (the body served at `/metrics`).
    ///
    /// # Panics
    /// Panics only if text encoding fails or yields non-UTF-8 bytes;
    /// `TextEncoder` produces UTF-8 text by construction, so both `expect`s
    /// guard invariants rather than runtime conditions.
    pub fn render(&self) -> String {
        let metric_families = self.0.registry.gather();
        let encoder = TextEncoder::new();
        let mut buf = Vec::with_capacity(4096);
        encoder
            .encode(&metric_families, &mut buf)
            .expect("prometheus text encode");
        String::from_utf8(buf).expect("prometheus text is utf-8")
    }
    /// Records one handled HTTP request: bumps the request counter and
    /// observes its duration, both labelled by method/path/status.
    ///
    /// NOTE(review): `path` is used verbatim as a label value — callers are
    /// presumably passing route templates, not raw URLs, to stay within the
    /// cardinality envelope. Confirm at call sites.
    pub fn record_http_request(&self, method: &str, path: &str, status: u16, elapsed: Duration) {
        let attrs = [
            KeyValue::new("method", method.to_owned()),
            KeyValue::new("path", path.to_owned()),
            // Lossless widening; status becomes a numeric label value.
            KeyValue::new("status", i64::from(status)),
        ];
        self.0.http_requests.add(1, &attrs);
        self.0.http_duration.record(elapsed.as_secs_f64(), &attrs);
    }
    /// Records one scanner cycle: counter + duration, labelled by `scanner`.
    pub fn record_scanner_cycle(&self, scanner: &'static str, elapsed: Duration) {
        let attrs = [KeyValue::new("scanner", scanner)];
        self.0.scanner_total.add(1, &attrs);
        self.0
            .scanner_duration
            .record(elapsed.as_secs_f64(), &attrs);
    }
    /// Publishes the current cancel-reconciler backlog depth; the observable
    /// gauge reports whatever was stored most recently.
    /// Relaxed ordering suffices: this is a best-effort snapshot with no
    /// ordering relationship to other data.
    pub fn set_cancel_backlog_depth(&self, depth: u64) {
        self.0.cancel_backlog_depth.store(depth, Ordering::Relaxed);
    }
    /// Observes one `claim_from_grant` latency sample, labelled by `lane`.
    pub fn record_claim_from_grant(&self, lane: &str, elapsed: Duration) {
        let attrs = [KeyValue::new("lane", lane.to_owned())];
        self.0.claim_duration.record(elapsed.as_secs_f64(), &attrs);
    }
    /// Counts one lease-renewal attempt; `outcome` should be `"ok"` or
    /// `"err"` per the instrument's description.
    pub fn inc_lease_renewal(&self, outcome: &'static str) {
        self.0
            .lease_renewal
            .add(1, &[KeyValue::new("outcome", outcome)]);
    }
    /// Counts one terminal attempt outcome, labelled by lane and by the
    /// outcome's stable string form (see `AttemptOutcome::as_stable_str`).
    pub fn inc_attempt_outcome(&self, lane: &str, outcome: super::AttemptOutcome) {
        self.0.attempt_outcome.add(
            1,
            &[
                KeyValue::new("lane", lane.to_owned()),
                KeyValue::new("outcome", outcome.as_stable_str()),
            ],
        );
    }
    /// Counts one claim rejection due to `WorkerAtCapacity` (unlabelled).
    pub fn inc_worker_at_capacity(&self) {
        self.0.worker_at_capacity.add(1, &[]);
    }
    /// Counts one budget hard-breach, labelled by `dimension`.
    pub fn inc_budget_hit(&self, dimension: &str) {
        self.0
            .budget_hit
            .add(1, &[KeyValue::new("dimension", dimension.to_owned())]);
    }
    /// Counts one quota admission block, labelled by `reason`.
    pub fn inc_quota_hit(&self, reason: &'static str) {
        self.0.quota_hit.add(1, &[KeyValue::new("reason", reason)]);
    }
    /// Counts one RFC-016 edge-group policy application, labelled by `policy`.
    pub fn inc_edge_group_policy(&self, policy: &'static str) {
        self.0
            .edge_group_policy
            .add(1, &[KeyValue::new("policy", policy)]);
    }
    /// Counts one RFC-016 Stage C sibling-cancel dispatch, labelled by `reason`.
    pub fn inc_sibling_cancel_dispatched(&self, reason: &'static str) {
        self.0
            .sibling_cancel_dispatched
            .add(1, &[KeyValue::new("reason", reason)]);
    }
    /// Counts one RFC-016 Stage C sibling-cancel disposition, labelled by
    /// `disposition`.
    pub fn inc_sibling_cancel_disposition(&self, disposition: &'static str) {
        self.0
            .sibling_cancel_disposition
            .add(1, &[KeyValue::new("disposition", disposition)]);
    }
    /// Counts one RFC-016 Stage D sibling-cancel reconcile action, labelled
    /// by `action`.
    pub fn inc_sibling_cancel_reconcile(&self, action: &'static str) {
        self.0
            .sibling_cancel_reconcile
            .add(1, &[KeyValue::new("action", action)]);
    }
    /// Counts one shutdown that exceeded its grace budget (RFC-017 Stage B).
    pub fn inc_shutdown_timeout(&self) {
        self.0.shutdown_timeout.add(1, &[]);
    }
    /// Counts one pending-waitpoint entry served with the legacy
    /// `waitpoint_token` wire field (RFC-017 Stage D1).
    pub fn inc_pending_waitpoint_legacy_token(&self) {
        self.0.pending_waitpoint_legacy_token.add(1, &[]);
    }
    /// Counts one boot that bypassed readiness via the RFC-017 §9.0
    /// dev-override, labelled by backend + stage.
    pub fn inc_backend_unready_boot(&self, backend: &'static str, stage: &'static str) {
        self.0.backend_unready_boot.add(
            1,
            &[
                KeyValue::new("backend", backend),
                KeyValue::new("stage", stage),
            ],
        );
    }
}
impl Default for Metrics {
fn default() -> Self {
Self::new()
}
}