Skip to main content

osproxy_observe/
directive.rs

//! Runtime diagnostics directives: verbosity as **data**, not a code path
//! (`docs/05` §3-4).
//!
//! A [`DiagnosticsDirective`] says "record this much detail for requests matching
//! this target, at this sample rate, until this time." The [`DirectiveSet`]
//! evaluator turns the active directives plus a request's attributes into the
//! effective [`DiagLevel`], the single decision the pipeline reads to decide how
//! much to record/export. It is on the hot path, so evaluation is allocation-free
//! and the default (no directive matches) is [`DiagLevel::Off`] at near-zero cost.
//!
//! This module is the **spine** that both delivery channels feed: the signed
//! `X-Debug-Directive` request header (surgical) and the control-plane store
//! (fleet-wide). Targeting is the cost lever and the TTL is the safety net: a
//! forgotten "on" expires instead of silently burning cost.
15
16use osproxy_core::{EndpointKind, IndexName, Instant, PartitionId, PrincipalId, RequestId};
17
/// The amount of diagnostic detail recorded for one request.
///
/// Variants are declared from least to most verbose and the derived `Ord`
/// follows that declaration order, so taking the `max` of several levels
/// yields the most verbose one.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum DiagLevel {
    /// Nothing beyond the always-on minimum is recorded or exported; the
    /// near-zero-cost default.
    #[default]
    Off,
    /// The standard causal trace: shape-only spans carrying ids, names, and
    /// sizes.
    Shape,
    /// Everything in [`DiagLevel::Shape`] plus per-stage timing.
    ShapeTiming,
    /// Shapes and timing plus the before/after *shape* diff of the rewrite;
    /// values are never included.
    ShapeRewriteDiff,
}
31
32impl DiagLevel {
33    /// The level's stable wire name, the inverse of the publish/​header parser,
34    /// so an introspected directive's `level` re-publishes verbatim.
35    #[must_use]
36    pub fn as_str(self) -> &'static str {
37        match self {
38            Self::Off => "Off",
39            Self::Shape => "Shape",
40            Self::ShapeTiming => "ShapeTiming",
41            Self::ShapeRewriteDiff => "ShapeRewriteDiff",
42        }
43    }
44
45    /// Parses a level's wire name back to the level, the inverse of
46    /// [`DiagLevel::as_str`], so the publish/header/etcd vocabularies share one
47    /// source of truth. `None` if the name is not a known level (fail-closed).
48    #[must_use]
49    pub fn from_name(name: &str) -> Option<Self> {
50        match name {
51            "Off" => Some(Self::Off),
52            "Shape" => Some(Self::Shape),
53            "ShapeTiming" => Some(Self::ShapeTiming),
54            "ShapeRewriteDiff" => Some(Self::ShapeRewriteDiff),
55            _ => None,
56        }
57    }
58}
59
60/// What a directive targets. A request matches when **every set field** equals
61/// the request's corresponding attribute; an unset field is a wildcard, so an
62/// all-unset match targets every request.
63#[derive(Clone, PartialEq, Eq, Debug, Default)]
64pub struct DirectiveMatch {
65    /// Match only this partition/tenant (once resolved).
66    pub tenant: Option<PartitionId>,
67    /// Match only this logical index.
68    pub index: Option<IndexName>,
69    /// Match only this principal.
70    pub principal: Option<PrincipalId>,
71    /// Match only this endpoint class.
72    pub endpoint: Option<EndpointKind>,
73}
74
75impl DirectiveMatch {
76    /// A match targeting every request (all fields wildcard).
77    #[must_use]
78    pub fn all() -> Self {
79        Self::default()
80    }
81
82    /// Narrows the match to `tenant` (builder style).
83    #[must_use]
84    pub fn for_tenant(mut self, tenant: PartitionId) -> Self {
85        self.tenant = Some(tenant);
86        self
87    }
88
89    /// Narrows the match to `index` (builder style).
90    #[must_use]
91    pub fn for_index(mut self, index: IndexName) -> Self {
92        self.index = Some(index);
93        self
94    }
95
96    /// Narrows the match to `principal` (builder style).
97    #[must_use]
98    pub fn for_principal(mut self, principal: PrincipalId) -> Self {
99        self.principal = Some(principal);
100        self
101    }
102
103    /// Narrows the match to `endpoint` (builder style).
104    #[must_use]
105    pub fn for_endpoint(mut self, endpoint: EndpointKind) -> Self {
106        self.endpoint = Some(endpoint);
107        self
108    }
109
110    /// Whether `attrs` satisfies every set field of this match.
111    #[must_use]
112    pub fn matches(&self, attrs: &RequestAttrs<'_>) -> bool {
113        self.tenant.as_ref().is_none_or(|t| attrs.tenant == Some(t))
114            && self
115                .index
116                .as_ref()
117                .is_none_or(|i| i.as_str() == attrs.index)
118            && self.principal.as_ref().is_none_or(|p| p == attrs.principal)
119            && self.endpoint.is_none_or(|e| e == attrs.endpoint)
120    }
121}
122
123/// The attributes of a request, evaluated against directive matches. The tenant
124/// is optional because it is only known after partition resolution.
125#[derive(Clone, Copy, Debug)]
126pub struct RequestAttrs<'a> {
127    /// The resolved partition, if resolution has happened.
128    pub tenant: Option<&'a PartitionId>,
129    /// The logical index from the request path.
130    pub index: &'a str,
131    /// The authenticated principal.
132    pub principal: &'a PrincipalId,
133    /// The endpoint classification.
134    pub endpoint: EndpointKind,
135}
136
137/// One diagnostics directive: target, verbosity, sampling, and expiry.
138#[derive(Clone, PartialEq, Eq, Debug)]
139pub struct DiagnosticsDirective {
140    /// A stable id for management/correlation (never a tenant value).
141    pub id: String,
142    /// What this directive targets.
143    pub match_: DirectiveMatch,
144    /// The verbosity to apply to matching requests.
145    pub level: DiagLevel,
146    /// Fraction of matching requests to record, in `0..=1` (scaled to per-mille
147    /// so the directive stays `Eq`/`Hash`-friendly; `1000` = always).
148    pub sample_per_mille: u16,
149    /// Absolute expiry: a request at or after this instant does not match (the
150    /// TTL was applied when the directive was created).
151    pub expires_at: Instant,
152    /// Single-instance break-glass: capture into the local ring buffer.
153    pub ring_buffer: bool,
154    /// Fleet traffic capture: tee the matching exchanges to the configured
155    /// capture sink (e.g. Kafka). The runtime on/off switch for capture, off in
156    /// the baseline, flipped on by publishing a directive, so capture is on demand
157    /// and fleet-wide with no restart. Distinct from [`Self::ring_buffer`], which
158    /// is the single-instance forensic tape.
159    pub capture: bool,
160}
161
162impl DiagnosticsDirective {
163    /// This directive's [`DiagLevel`] if it applies to `attrs` at `now` for
164    /// `request` (target matches, not expired, in sample), else `None`. Used to
165    /// fold a single-request (signed-header) directive into the evaluation.
166    #[must_use]
167    pub fn level_if_applies(
168        &self,
169        attrs: &RequestAttrs<'_>,
170        now: Instant,
171        request: &RequestId,
172    ) -> Option<DiagLevel> {
173        self.applies(attrs, now, request).then_some(self.level)
174    }
175
176    /// Whether this directive applies to `attrs` at `now` for `request`: not
177    /// expired, target matches, and the request falls within the sample.
178    #[must_use]
179    fn applies(&self, attrs: &RequestAttrs<'_>, now: Instant, request: &RequestId) -> bool {
180        now < self.expires_at && self.match_.matches(attrs) && self.is_sampled(request)
181    }
182
183    /// Whether `request` is in this directive's sample. Deterministic per request
184    /// id (no RNG): the same request always decides the same way, so a retry is
185    /// recorded consistently.
186    #[must_use]
187    fn is_sampled(&self, request: &RequestId) -> bool {
188        if self.sample_per_mille >= 1000 {
189            return true;
190        }
191        if self.sample_per_mille == 0 {
192            return false;
193        }
194        // Map the request id into 0..1000 and keep it under the threshold.
195        let bucket = fnv1a(request.as_str().as_bytes()) % 1000;
196        u16::try_from(bucket).unwrap_or(u16::MAX) < self.sample_per_mille
197    }
198}
199
200/// The set of active directives, evaluated per request. Cheap to evaluate (a
201/// filtered scan); typically a handful of directives are active at once.
202#[derive(Clone, Debug, Default)]
203pub struct DirectiveSet {
204    directives: Vec<DiagnosticsDirective>,
205}
206
207impl DirectiveSet {
208    /// An empty set, every request evaluates to [`DiagLevel::Off`].
209    #[must_use]
210    pub fn new() -> Self {
211        Self::default()
212    }
213
214    /// Builds a set from active directives.
215    #[must_use]
216    pub fn from_directives(directives: Vec<DiagnosticsDirective>) -> Self {
217        Self { directives }
218    }
219
220    /// How many directives the set holds.
221    #[must_use]
222    pub fn len(&self) -> usize {
223        self.directives.len()
224    }
225
226    /// Whether the set is empty (every request evaluates to `Off`).
227    #[must_use]
228    pub fn is_empty(&self) -> bool {
229        self.directives.is_empty()
230    }
231
232    /// The effective level for a request: the **highest** level among the
233    /// directives that apply (target matches, not expired, in sample), or
234    /// [`DiagLevel::Off`] if none do.
235    #[must_use]
236    pub fn evaluate(
237        &self,
238        attrs: &RequestAttrs<'_>,
239        now: Instant,
240        request: &RequestId,
241    ) -> DiagLevel {
242        self.directives
243            .iter()
244            .filter(|d| d.applies(attrs, now, request))
245            .map(|d| d.level)
246            .max()
247            .unwrap_or(DiagLevel::Off)
248    }
249
250    /// Whether any applying directive requests local ring-buffer capture.
251    #[must_use]
252    pub fn wants_ring_buffer(
253        &self,
254        attrs: &RequestAttrs<'_>,
255        now: Instant,
256        request: &RequestId,
257    ) -> bool {
258        self.directives
259            .iter()
260            .any(|d| d.ring_buffer && d.applies(attrs, now, request))
261    }
262
263    /// Whether any applying directive turns on fleet traffic capture for this
264    /// request. The runtime gate for capture-on-demand: with no matching
265    /// directive (the baseline), this is `false` and nothing is teed.
266    #[must_use]
267    pub fn wants_capture(
268        &self,
269        attrs: &RequestAttrs<'_>,
270        now: Instant,
271        request: &RequestId,
272    ) -> bool {
273        self.directives
274            .iter()
275            .any(|d| d.capture && d.applies(attrs, now, request))
276    }
277
278    /// A well-defined, shape-only introspection of the active settings: for each
279    /// directive, what it targets, at what verbosity and sample, whether it
280    /// captures to the ring buffer, and whether it has expired at `now`.
281    ///
282    /// This is the **read** side of the control-plane store, an agent fetches it
283    /// to see exactly what an instance is applying. The schema mirrors the publish
284    /// body ([`crate::DirectiveSet`] decoding), except the relative `ttl_secs` is
285    /// reported as a computed `expired` flag, since expiry is held as an absolute
286    /// monotonic instant that has no portable numeric form. Value-free throughout:
287    /// the only strings are operator-authored ids and targeting selectors.
288    #[must_use]
289    pub fn introspect(&self, now: Instant) -> serde_json::Value {
290        let directives: Vec<serde_json::Value> = self
291            .directives
292            .iter()
293            .map(|d| {
294                let mut obj = serde_json::Map::new();
295                obj.insert("id".into(), d.id.clone().into());
296                obj.insert("level".into(), d.level.as_str().into());
297                if let Some(t) = &d.match_.tenant {
298                    obj.insert("tenant".into(), t.as_str().into());
299                }
300                if let Some(i) = &d.match_.index {
301                    obj.insert("index".into(), i.as_str().into());
302                }
303                if let Some(p) = &d.match_.principal {
304                    obj.insert("principal".into(), p.as_str().into());
305                }
306                if let Some(e) = d.match_.endpoint {
307                    obj.insert("endpoint".into(), e.as_str().into());
308                }
309                obj.insert("sample_per_mille".into(), d.sample_per_mille.into());
310                obj.insert("ring_buffer".into(), d.ring_buffer.into());
311                obj.insert("capture".into(), d.capture.into());
312                obj.insert("expired".into(), (now >= d.expires_at).into());
313                serde_json::Value::Object(obj)
314            })
315            .collect();
316        serde_json::json!({ "directives": directives })
317    }
318}
319
320/// Verifies a signed `X-Debug-Directive` request header into the directive it
321/// authorizes, the **surgical, single-request** delivery channel (`docs/05`
322/// §3). The signature means a client cannot self-enable verbose diagnostics
323/// without the operator's key (NFR-S3); the directive follows the request to
324/// whatever instance handles it. The concrete implementation (HMAC + the token
325/// format) lives in a crypto-capable crate behind this seam.
326pub trait DirectiveVerifier: Send + Sync {
327    /// The directive a valid header authorizes, or `None` if the header is
328    /// absent, malformed, incorrectly signed, or already expired.
329    fn verify(&self, header_value: &str) -> Option<DiagnosticsDirective>;
330}
331
332/// The default: no header channel is configured, so every header is rejected.
333#[derive(Clone, Copy, Debug, Default)]
334pub struct NoVerifier;
335
336impl DirectiveVerifier for NoVerifier {
337    fn verify(&self, _header_value: &str) -> Option<DiagnosticsDirective> {
338        None
339    }
340}
341
/// 64-bit FNV-1a over `bytes`.
///
/// A tiny, deterministic, dependency-free hash used to bucket request ids for
/// sampling (no RNG involved).
fn fnv1a(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    // FNV-1a order: XOR the byte in first, then multiply (mod 2^64).
    bytes
        .iter()
        .fold(OFFSET_BASIS, |hash, &byte| (hash ^ u64::from(byte)).wrapping_mul(PRIME))
}
351
// Unit tests live in `directive_tests.rs`, next to this file, and are compiled
// only in test builds.
#[cfg(test)]
#[path = "directive_tests.rs"]
mod tests;