osproxy_observe/directive.rs
//! Runtime diagnostics directives: verbosity as **data**, not a code path
//! (`docs/05` §3-4).
//!
//! A [`DiagnosticsDirective`] says "record this much detail for requests matching
//! this target, at this sample rate, until this time." The [`DirectiveSet`]
//! evaluator turns the active directives plus a request's attributes into the
//! effective [`DiagLevel`]: the single decision the pipeline reads to decide how
//! much to record and export. Because this is the hot path, evaluation is
//! allocation-free, and the default (no directive matches) is [`DiagLevel::Off`]
//! at near-zero cost.
//!
//! This module is the **spine** that both delivery channels feed: the signed
//! `X-Debug-Directive` request header (surgical) and the control-plane store
//! (fleet-wide). Targeting is the cost lever and the TTL is the safety net: a
//! forgotten "on" expires instead of silently burning cost.
15
16use osproxy_core::{EndpointKind, IndexName, Instant, PartitionId, PrincipalId, RequestId};
17
/// Recording verbosity for a single request.
///
/// Variants are declared from least to most verbose. The derived ordering
/// follows the discriminants below, so taking the `max` of several levels
/// yields the most detailed one.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum DiagLevel {
    /// Nothing recorded or exported beyond the always-on minimum; near-zero cost.
    #[default]
    Off = 0,
    /// Shape-only spans (ids, names, sizes): the standard causal trace.
    Shape = 1,
    /// Everything in `Shape`, plus per-stage timing.
    ShapeTiming = 2,
    /// Everything in `ShapeTiming`, plus the before/after *shape* diff of the
    /// rewrite (never values).
    ShapeRewriteDiff = 3,
}

impl DiagLevel {
    /// Returns this level's stable wire name.
    ///
    /// It is the exact inverse of [`DiagLevel::from_name`], so the `level` of an
    /// introspected directive can be re-published verbatim.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            DiagLevel::Off => "Off",
            DiagLevel::Shape => "Shape",
            DiagLevel::ShapeTiming => "ShapeTiming",
            DiagLevel::ShapeRewriteDiff => "ShapeRewriteDiff",
        }
    }

    /// Looks up a level by its wire name (the inverse of [`DiagLevel::as_str`]).
    ///
    /// The publish, header and etcd vocabularies all go through this one
    /// mapping. Matching is exact and case-sensitive. Any unknown name yields
    /// `None`, so callers fail closed.
    #[must_use]
    pub fn from_name(name: &str) -> Option<Self> {
        let level = match name {
            "Off" => DiagLevel::Off,
            "Shape" => DiagLevel::Shape,
            "ShapeTiming" => DiagLevel::ShapeTiming,
            "ShapeRewriteDiff" => DiagLevel::ShapeRewriteDiff,
            _ => return None,
        };
        Some(level)
    }
}
59
60/// What a directive targets. A request matches when **every set field** equals
61/// the request's corresponding attribute; an unset field is a wildcard, so an
62/// all-unset match targets every request.
63#[derive(Clone, PartialEq, Eq, Debug, Default)]
64pub struct DirectiveMatch {
65 /// Match only this partition/tenant (once resolved).
66 pub tenant: Option<PartitionId>,
67 /// Match only this logical index.
68 pub index: Option<IndexName>,
69 /// Match only this principal.
70 pub principal: Option<PrincipalId>,
71 /// Match only this endpoint class.
72 pub endpoint: Option<EndpointKind>,
73}
74
75impl DirectiveMatch {
76 /// A match targeting every request (all fields wildcard).
77 #[must_use]
78 pub fn all() -> Self {
79 Self::default()
80 }
81
82 /// Narrows the match to `tenant` (builder style).
83 #[must_use]
84 pub fn for_tenant(mut self, tenant: PartitionId) -> Self {
85 self.tenant = Some(tenant);
86 self
87 }
88
89 /// Narrows the match to `index` (builder style).
90 #[must_use]
91 pub fn for_index(mut self, index: IndexName) -> Self {
92 self.index = Some(index);
93 self
94 }
95
96 /// Narrows the match to `principal` (builder style).
97 #[must_use]
98 pub fn for_principal(mut self, principal: PrincipalId) -> Self {
99 self.principal = Some(principal);
100 self
101 }
102
103 /// Narrows the match to `endpoint` (builder style).
104 #[must_use]
105 pub fn for_endpoint(mut self, endpoint: EndpointKind) -> Self {
106 self.endpoint = Some(endpoint);
107 self
108 }
109
110 /// Whether `attrs` satisfies every set field of this match.
111 #[must_use]
112 pub fn matches(&self, attrs: &RequestAttrs<'_>) -> bool {
113 self.tenant.as_ref().is_none_or(|t| attrs.tenant == Some(t))
114 && self
115 .index
116 .as_ref()
117 .is_none_or(|i| i.as_str() == attrs.index)
118 && self.principal.as_ref().is_none_or(|p| p == attrs.principal)
119 && self.endpoint.is_none_or(|e| e == attrs.endpoint)
120 }
121}
122
123/// The attributes of a request, evaluated against directive matches. The tenant
124/// is optional because it is only known after partition resolution.
125#[derive(Clone, Copy, Debug)]
126pub struct RequestAttrs<'a> {
127 /// The resolved partition, if resolution has happened.
128 pub tenant: Option<&'a PartitionId>,
129 /// The logical index from the request path.
130 pub index: &'a str,
131 /// The authenticated principal.
132 pub principal: &'a PrincipalId,
133 /// The endpoint classification.
134 pub endpoint: EndpointKind,
135}
136
137/// One diagnostics directive: target, verbosity, sampling, and expiry.
138#[derive(Clone, PartialEq, Eq, Debug)]
139pub struct DiagnosticsDirective {
140 /// A stable id for management/correlation (never a tenant value).
141 pub id: String,
142 /// What this directive targets.
143 pub match_: DirectiveMatch,
144 /// The verbosity to apply to matching requests.
145 pub level: DiagLevel,
146 /// Fraction of matching requests to record, in `0..=1` (scaled to per-mille
147 /// so the directive stays `Eq`/`Hash`-friendly; `1000` = always).
148 pub sample_per_mille: u16,
149 /// Absolute expiry: a request at or after this instant does not match (the
150 /// TTL was applied when the directive was created).
151 pub expires_at: Instant,
152 /// Single-instance break-glass: capture into the local ring buffer.
153 pub ring_buffer: bool,
154 /// Fleet traffic capture: tee the matching exchanges to the configured
155 /// capture sink (e.g. Kafka). The runtime on/off switch for capture, off in
156 /// the baseline, flipped on by publishing a directive, so capture is on demand
157 /// and fleet-wide with no restart. Distinct from [`Self::ring_buffer`], which
158 /// is the single-instance forensic tape.
159 pub capture: bool,
160}
161
162impl DiagnosticsDirective {
163 /// This directive's [`DiagLevel`] if it applies to `attrs` at `now` for
164 /// `request` (target matches, not expired, in sample), else `None`. Used to
165 /// fold a single-request (signed-header) directive into the evaluation.
166 #[must_use]
167 pub fn level_if_applies(
168 &self,
169 attrs: &RequestAttrs<'_>,
170 now: Instant,
171 request: &RequestId,
172 ) -> Option<DiagLevel> {
173 self.applies(attrs, now, request).then_some(self.level)
174 }
175
176 /// Whether this directive applies to `attrs` at `now` for `request`: not
177 /// expired, target matches, and the request falls within the sample.
178 #[must_use]
179 fn applies(&self, attrs: &RequestAttrs<'_>, now: Instant, request: &RequestId) -> bool {
180 now < self.expires_at && self.match_.matches(attrs) && self.is_sampled(request)
181 }
182
183 /// Whether `request` is in this directive's sample. Deterministic per request
184 /// id (no RNG): the same request always decides the same way, so a retry is
185 /// recorded consistently.
186 #[must_use]
187 fn is_sampled(&self, request: &RequestId) -> bool {
188 if self.sample_per_mille >= 1000 {
189 return true;
190 }
191 if self.sample_per_mille == 0 {
192 return false;
193 }
194 // Map the request id into 0..1000 and keep it under the threshold.
195 let bucket = fnv1a(request.as_str().as_bytes()) % 1000;
196 u16::try_from(bucket).unwrap_or(u16::MAX) < self.sample_per_mille
197 }
198}
199
200/// The set of active directives, evaluated per request. Cheap to evaluate (a
201/// filtered scan); typically a handful of directives are active at once.
202#[derive(Clone, Debug, Default)]
203pub struct DirectiveSet {
204 directives: Vec<DiagnosticsDirective>,
205}
206
207impl DirectiveSet {
208 /// An empty set, every request evaluates to [`DiagLevel::Off`].
209 #[must_use]
210 pub fn new() -> Self {
211 Self::default()
212 }
213
214 /// Builds a set from active directives.
215 #[must_use]
216 pub fn from_directives(directives: Vec<DiagnosticsDirective>) -> Self {
217 Self { directives }
218 }
219
220 /// How many directives the set holds.
221 #[must_use]
222 pub fn len(&self) -> usize {
223 self.directives.len()
224 }
225
226 /// Whether the set is empty (every request evaluates to `Off`).
227 #[must_use]
228 pub fn is_empty(&self) -> bool {
229 self.directives.is_empty()
230 }
231
232 /// The effective level for a request: the **highest** level among the
233 /// directives that apply (target matches, not expired, in sample), or
234 /// [`DiagLevel::Off`] if none do.
235 #[must_use]
236 pub fn evaluate(
237 &self,
238 attrs: &RequestAttrs<'_>,
239 now: Instant,
240 request: &RequestId,
241 ) -> DiagLevel {
242 self.directives
243 .iter()
244 .filter(|d| d.applies(attrs, now, request))
245 .map(|d| d.level)
246 .max()
247 .unwrap_or(DiagLevel::Off)
248 }
249
250 /// Whether any applying directive requests local ring-buffer capture.
251 #[must_use]
252 pub fn wants_ring_buffer(
253 &self,
254 attrs: &RequestAttrs<'_>,
255 now: Instant,
256 request: &RequestId,
257 ) -> bool {
258 self.directives
259 .iter()
260 .any(|d| d.ring_buffer && d.applies(attrs, now, request))
261 }
262
263 /// Whether any applying directive turns on fleet traffic capture for this
264 /// request. The runtime gate for capture-on-demand: with no matching
265 /// directive (the baseline), this is `false` and nothing is teed.
266 #[must_use]
267 pub fn wants_capture(
268 &self,
269 attrs: &RequestAttrs<'_>,
270 now: Instant,
271 request: &RequestId,
272 ) -> bool {
273 self.directives
274 .iter()
275 .any(|d| d.capture && d.applies(attrs, now, request))
276 }
277
278 /// A well-defined, shape-only introspection of the active settings: for each
279 /// directive, what it targets, at what verbosity and sample, whether it
280 /// captures to the ring buffer, and whether it has expired at `now`.
281 ///
282 /// This is the **read** side of the control-plane store, an agent fetches it
283 /// to see exactly what an instance is applying. The schema mirrors the publish
284 /// body ([`crate::DirectiveSet`] decoding), except the relative `ttl_secs` is
285 /// reported as a computed `expired` flag, since expiry is held as an absolute
286 /// monotonic instant that has no portable numeric form. Value-free throughout:
287 /// the only strings are operator-authored ids and targeting selectors.
288 #[must_use]
289 pub fn introspect(&self, now: Instant) -> serde_json::Value {
290 let directives: Vec<serde_json::Value> = self
291 .directives
292 .iter()
293 .map(|d| {
294 let mut obj = serde_json::Map::new();
295 obj.insert("id".into(), d.id.clone().into());
296 obj.insert("level".into(), d.level.as_str().into());
297 if let Some(t) = &d.match_.tenant {
298 obj.insert("tenant".into(), t.as_str().into());
299 }
300 if let Some(i) = &d.match_.index {
301 obj.insert("index".into(), i.as_str().into());
302 }
303 if let Some(p) = &d.match_.principal {
304 obj.insert("principal".into(), p.as_str().into());
305 }
306 if let Some(e) = d.match_.endpoint {
307 obj.insert("endpoint".into(), e.as_str().into());
308 }
309 obj.insert("sample_per_mille".into(), d.sample_per_mille.into());
310 obj.insert("ring_buffer".into(), d.ring_buffer.into());
311 obj.insert("capture".into(), d.capture.into());
312 obj.insert("expired".into(), (now >= d.expires_at).into());
313 serde_json::Value::Object(obj)
314 })
315 .collect();
316 serde_json::json!({ "directives": directives })
317 }
318}
319
320/// Verifies a signed `X-Debug-Directive` request header into the directive it
321/// authorizes, the **surgical, single-request** delivery channel (`docs/05`
322/// §3). The signature means a client cannot self-enable verbose diagnostics
323/// without the operator's key (NFR-S3); the directive follows the request to
324/// whatever instance handles it. The concrete implementation (HMAC + the token
325/// format) lives in a crypto-capable crate behind this seam.
326pub trait DirectiveVerifier: Send + Sync {
327 /// The directive a valid header authorizes, or `None` if the header is
328 /// absent, malformed, incorrectly signed, or already expired.
329 fn verify(&self, header_value: &str) -> Option<DiagnosticsDirective>;
330}
331
332/// The default: no header channel is configured, so every header is rejected.
333#[derive(Clone, Copy, Debug, Default)]
334pub struct NoVerifier;
335
336impl DirectiveVerifier for NoVerifier {
337 fn verify(&self, _header_value: &str) -> Option<DiagnosticsDirective> {
338 None
339 }
340}
341
/// 64-bit FNV-1a hash of `bytes`, used for deterministic sampling (no RNG,
/// no external dependency).
///
/// Each byte is XORed into the running hash, which is then multiplied by the
/// FNV prime with wrapping arithmetic.
fn fnv1a(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    bytes
        .iter()
        .fold(OFFSET_BASIS, |hash, &byte| (hash ^ u64::from(byte)).wrapping_mul(PRIME))
}
351
/// Unit tests for this module. They live in the sibling file
/// `directive_tests.rs` and are compiled only under `cfg(test)`.
#[cfg(test)]
#[path = "directive_tests.rs"]
mod tests;