Skip to main content

osproxy_observe/
decode.rs

//! Decoding an operator-published fleet [`DirectiveSet`] from JSON (`docs/05` §3).
//!
//! One fail-closed decoder is shared by every publish channel (the
//! `POST /admin/directives` admin endpoint and a distributed `DirectiveStore`,
//! e.g. etcd), so a directive means the same thing however it arrives, and a
//! typo can never silently widen its blast radius.
//!
//! **Fail-closed**: any malformed or out-of-range field rejects the *whole* set
//! rather than publishing a partial or surprising directive. The vocabulary
//! matches the signed `X-Debug-Directive` token (`level`, optional
//! `tenant`/`index`/`principal`/`endpoint` targeting, `sample_per_mille`,
//! `ring_buffer`, `capture`) but with a relative `ttl_secs` resolved against the
//! injected clock, so a forgotten "on" still self-expires.
//!
//! Body shape: `{"directives": [ {"id","level","ttl_secs", ...}, ... ]}`.
16
17use std::time::Duration;
18
19use osproxy_core::{Clock, EndpointKind, IndexName, PartitionId, PrincipalId};
20use serde_json::Value;
21
22use crate::directive::{DiagLevel, DiagnosticsDirective, DirectiveMatch, DirectiveSet};
23
24/// Decodes a publish body into a [`DirectiveSet`], or a stable value-free reason
25/// slug on the first malformed field.
26///
27/// # Errors
28/// A `&'static str` slug (e.g. `"unknown_field"`, `"zero_ttl"`) naming the first
29/// rejection, suitable for a log or an HTTP reason, never echoes a value.
30pub fn decode_directive_set(body: &[u8], clock: &dyn Clock) -> Result<DirectiveSet, &'static str> {
31    let v: Value = serde_json::from_slice(body).map_err(|_| "invalid_json")?;
32    reject_unknown_keys(&v, &["directives"])?;
33    let items = v
34        .get("directives")
35        .and_then(Value::as_array)
36        .ok_or("missing_directives")?;
37    let mut directives = Vec::with_capacity(items.len());
38    for item in items {
39        directives.push(decode_one(item, clock)?);
40    }
41    Ok(DirectiveSet::from_directives(directives))
42}
43
/// Every key a single directive object is allowed to carry.
///
/// Anything outside this list is rejected instead of being dropped: a typo such
/// as `"tennant"` in place of `"tenant"` would otherwise lose its targeting and
/// quietly widen the directive to the whole fleet.
const DIRECTIVE_KEYS: &[&str] = &[
    // Required: identity, verbosity and relative lifetime.
    "id",
    "level",
    "ttl_secs",
    // Optional targeting.
    "tenant",
    "index",
    "principal",
    "endpoint",
    // Optional sampling and capture controls.
    "sample_per_mille",
    "ring_buffer",
    "capture",
];
59
60/// Rejects an object carrying any key outside `allowed` (fail-closed; an unknown
61/// key signals a typo or a mismatched client and could change the directive's
62/// meaning if accepted).
63fn reject_unknown_keys(v: &Value, allowed: &[&str]) -> Result<(), &'static str> {
64    let obj = v.as_object().ok_or("not_an_object")?;
65    if obj.keys().all(|k| allowed.contains(&k.as_str())) {
66        Ok(())
67    } else {
68        Err("unknown_field")
69    }
70}
71
72/// Decodes a single directive, resolving its relative TTL against `clock`.
73fn decode_one(v: &Value, clock: &dyn Clock) -> Result<DiagnosticsDirective, &'static str> {
74    reject_unknown_keys(v, DIRECTIVE_KEYS)?;
75    let id = v
76        .get("id")
77        .and_then(Value::as_str)
78        .ok_or("missing_id")?
79        .to_owned();
80    let level = DiagLevel::from_name(
81        v.get("level")
82            .and_then(Value::as_str)
83            .ok_or("missing_level")?,
84    )
85    .ok_or("unknown_level")?;
86
87    let ttl_secs = v
88        .get("ttl_secs")
89        .and_then(Value::as_u64)
90        .ok_or("missing_ttl_secs")?;
91    if ttl_secs == 0 {
92        return Err("zero_ttl");
93    }
94    let expires_at = clock.now().saturating_add(Duration::from_secs(ttl_secs));
95
96    // A present sampling rate must be a valid per-mille; out of range fails closed
97    // rather than widening capture to always-on.
98    let sample_per_mille = match v.get("sample_per_mille") {
99        None => 1000,
100        Some(n) => match n.as_u64() {
101            Some(n) if n <= 1000 => u16::try_from(n).unwrap_or(1000),
102            _ => return Err("bad_sample_rate"),
103        },
104    };
105
106    let mut match_ = DirectiveMatch::all();
107    if let Some(t) = v.get("tenant").and_then(Value::as_str) {
108        match_ = match_.for_tenant(PartitionId::from(t));
109    }
110    if let Some(i) = v.get("index").and_then(Value::as_str) {
111        match_ = match_.for_index(IndexName::from(i));
112    }
113    if let Some(p) = v.get("principal").and_then(Value::as_str) {
114        match_ = match_.for_principal(PrincipalId::from(p));
115    }
116    // A present `endpoint` must name a known class; an unknown one fails closed
117    // rather than silently widening the target (it round-trips with `as_str`).
118    if let Some(e) = v.get("endpoint") {
119        let name = e.as_str().ok_or("bad_endpoint")?;
120        match_ = match_.for_endpoint(EndpointKind::from_name(name).ok_or("unknown_endpoint")?);
121    }
122
123    Ok(DiagnosticsDirective {
124        id,
125        match_,
126        level,
127        sample_per_mille,
128        expires_at,
129        ring_buffer: v
130            .get("ring_buffer")
131            .and_then(Value::as_bool)
132            .unwrap_or(false),
133        capture: v.get("capture").and_then(Value::as_bool).unwrap_or(false),
134    })
135}
136
137#[cfg(test)]
138#[path = "decode_tests.rs"]
139mod tests;