// osproxy_observe/decode.rs
use std::time::Duration;
18
19use osproxy_core::{Clock, EndpointKind, IndexName, PartitionId, PrincipalId};
20use serde_json::Value;
21
22use crate::directive::{DiagLevel, DiagnosticsDirective, DirectiveMatch, DirectiveSet};
23
24pub fn decode_directive_set(body: &[u8], clock: &dyn Clock) -> Result<DirectiveSet, &'static str> {
31 let v: Value = serde_json::from_slice(body).map_err(|_| "invalid_json")?;
32 reject_unknown_keys(&v, &["directives"])?;
33 let items = v
34 .get("directives")
35 .and_then(Value::as_array)
36 .ok_or("missing_directives")?;
37 let mut directives = Vec::with_capacity(items.len());
38 for item in items {
39 directives.push(decode_one(item, clock)?);
40 }
41 Ok(DirectiveSet::from_directives(directives))
42}
43
/// Every key that may appear on a single directive object.
///
/// `decode_one` passes this list to `reject_unknown_keys`, so a directive carrying
/// any key not listed here is rejected with `"unknown_field"`.
const DIRECTIVE_KEYS: &[&str] = &[
    // Required: identity, verbosity, lifetime.
    "id",
    "level",
    "ttl_secs",
    // Optional match scope.
    "tenant",
    "index",
    "principal",
    "endpoint",
    // Optional sampling and output switches.
    "sample_per_mille",
    "ring_buffer",
    "capture",
];
59
60fn reject_unknown_keys(v: &Value, allowed: &[&str]) -> Result<(), &'static str> {
64 let obj = v.as_object().ok_or("not_an_object")?;
65 if obj.keys().all(|k| allowed.contains(&k.as_str())) {
66 Ok(())
67 } else {
68 Err("unknown_field")
69 }
70}
71
72fn decode_one(v: &Value, clock: &dyn Clock) -> Result<DiagnosticsDirective, &'static str> {
74 reject_unknown_keys(v, DIRECTIVE_KEYS)?;
75 let id = v
76 .get("id")
77 .and_then(Value::as_str)
78 .ok_or("missing_id")?
79 .to_owned();
80 let level = DiagLevel::from_name(
81 v.get("level")
82 .and_then(Value::as_str)
83 .ok_or("missing_level")?,
84 )
85 .ok_or("unknown_level")?;
86
87 let ttl_secs = v
88 .get("ttl_secs")
89 .and_then(Value::as_u64)
90 .ok_or("missing_ttl_secs")?;
91 if ttl_secs == 0 {
92 return Err("zero_ttl");
93 }
94 let expires_at = clock.now().saturating_add(Duration::from_secs(ttl_secs));
95
96 let sample_per_mille = match v.get("sample_per_mille") {
99 None => 1000,
100 Some(n) => match n.as_u64() {
101 Some(n) if n <= 1000 => u16::try_from(n).unwrap_or(1000),
102 _ => return Err("bad_sample_rate"),
103 },
104 };
105
106 let mut match_ = DirectiveMatch::all();
107 if let Some(t) = v.get("tenant").and_then(Value::as_str) {
108 match_ = match_.for_tenant(PartitionId::from(t));
109 }
110 if let Some(i) = v.get("index").and_then(Value::as_str) {
111 match_ = match_.for_index(IndexName::from(i));
112 }
113 if let Some(p) = v.get("principal").and_then(Value::as_str) {
114 match_ = match_.for_principal(PrincipalId::from(p));
115 }
116 if let Some(e) = v.get("endpoint") {
119 let name = e.as_str().ok_or("bad_endpoint")?;
120 match_ = match_.for_endpoint(EndpointKind::from_name(name).ok_or("unknown_endpoint")?);
121 }
122
123 Ok(DiagnosticsDirective {
124 id,
125 match_,
126 level,
127 sample_per_mille,
128 expires_at,
129 ring_buffer: v
130 .get("ring_buffer")
131 .and_then(Value::as_bool)
132 .unwrap_or(false),
133 capture: v.get("capture").and_then(Value::as_bool).unwrap_or(false),
134 })
135}
136
137#[cfg(test)]
138#[path = "decode_tests.rs"]
139mod tests;