Skip to main content

osproxy_observe/
explain.rs

//! Assembles a [`RequestTrace`] into the `/debug/explain` document, and provides
//! a bounded store of recent explanations.
//!
//! The document is purpose-built for LLM consumption (`docs/05` §6): the ordered
//! spans as shape attributes, the final status, and, on failure, the
//! `ErrorContext` with its decision chain and remediation. Because the trace is
//! shape-only by construction, so is this document; it cannot reveal a tenant
//! value because none was ever captured.
9
10use std::collections::VecDeque;
11use std::sync::Mutex;
12
13use osproxy_core::{ErrorContext, RequestId};
14use serde_json::{json, Value};
15
16use crate::trace::RequestTrace;
17
18/// Builds the explain document for `request_id` from its `trace`.
19#[must_use]
20pub fn explain_json(request_id: &RequestId, trace: &RequestTrace) -> Value {
21    let mut spans = serde_json::Map::new();
22    if let Some(i) = &trace.ingress {
23        spans.insert(
24            "ingress".into(),
25            json!({ "protocol": i.protocol, "tls_suite": i.tls_suite, "tls_reused": i.tls_reused }),
26        );
27    }
28    if let Some(c) = &trace.classify {
29        spans.insert(
30            "classify".into(),
31            json!({ "endpoint_kind": format!("{:?}", c.endpoint), "is_write": c.endpoint.is_write(), "index_logical": c.logical_index.as_str() }),
32        );
33    }
34    if let Some(r) = &trace.resolve {
35        spans.insert("spi.resolve".into(), resolve_json(r));
36    }
37    if let Some(r) = &trace.rewrite {
38        spans.insert(
39            "rewrite".into(),
40            json!({ "transform_kind": r.transform_kind, "body_bytes": r.body_bytes }),
41        );
42    }
43    if let Some(d) = &trace.dispatch {
44        spans.insert(
45            "dispatch".into(),
46            json!({ "target_cluster": d.cluster.as_str(), "upstream_status": d.upstream_status, "pool_reuse": d.pool_reuse }),
47        );
48    }
49    if let Some(e) = &trace.egress {
50        spans.insert(
51            "egress".into(),
52            json!({ "status": e.status, "response_bytes": e.response_bytes }),
53        );
54    }
55
56    json!({
57        "request_id": request_id.as_str(),
58        // The distributed-trace id (W3C), so this explanation joins the OTLP
59        // trace for the same request. An id, never a value.
60        "trace_id": trace.context.as_ref().map(osproxy_core::TraceContext::trace_id_hex),
61        "outcome": if trace.failed() { "error" } else { "ok" },
62        "spans": Value::Object(spans),
63        "error": trace.error.as_ref().map(error_json),
64    })
65}
66
67/// Serializes the `spi.resolve` span (field names only, never values).
68fn resolve_json(r: &crate::trace::ResolveInfo) -> Value {
69    let fields: Vec<&str> = r
70        .inject_fields
71        .iter()
72        .map(osproxy_core::FieldName::as_str)
73        .collect();
74    json!({
75        "partition_id": r.partition.as_str(),
76        "placement_kind": r.placement_kind,
77        "target_cluster": r.cluster.as_str(),
78        "target_index": r.index.as_str(),
79        "epoch": r.epoch.get(),
80        "inject_field_names": fields,
81        "routing": r.routing,
82        "migration": r.migration,
83    })
84}
85
86/// Serializes an [`ErrorContext`] (ids + remediation, never values).
87fn error_json(err: &ErrorContext) -> Value {
88    let chain = &err.decision_chain;
89    json!({
90        "code": err.code.as_slug(),
91        "retryable": err.retryable,
92        "remediation": err.remediation,
93        "decision_chain": {
94            "principal": chain.principal.as_ref().map(osproxy_core::PrincipalId::as_str),
95            "partition": chain.partition.as_ref().map(osproxy_core::PartitionId::as_str),
96            "cluster": chain.cluster.as_ref().map(osproxy_core::ClusterId::as_str),
97            "index": chain.index.as_ref().map(osproxy_core::IndexName::as_str),
98        },
99    })
100}
101
102/// A bounded, in-memory store of recent request explanations.
103///
104/// A single-instance affordance backing `/debug/explain/{request_id}` (`docs/05`
105/// §5 ring buffer). Oldest entries are evicted past capacity, so memory is
106/// bounded regardless of traffic.
107///
108/// It retains the **assembled trace**, not a serialized document: `/debug/explain`
109/// is read for a vanishing fraction of requests, so building the JSON eagerly on
110/// every request was pure waste (~12µs/req of allocation + serialization). The doc
111/// is built lazily in [`ExplainStore::get`] instead; `record` only clones the
112/// (small, owned) trace and pushes it under the lock.
113///
114/// A single lock (not sharded): `record`'s per-request contention was measured to
115/// be dominated by the trace *clone*, not the lock — even one uncontended mutex
116/// under 16 threads matched a 16-way sharded one, because the allocation is the
117/// cost. Cloning the trace *before* taking the lock (below) keeps the critical
118/// section to an O(1) deque op, and a fast concurrent allocator (the binary's
119/// mimalloc) is what actually relieves the clone's cost.
120#[derive(Debug)]
121pub struct ExplainStore {
122    capacity: usize,
123    entries: Mutex<VecDeque<(RequestId, RequestTrace)>>,
124}
125
126impl ExplainStore {
127    /// Creates a store holding at most `capacity` recent explanations.
128    #[must_use]
129    pub fn new(capacity: usize) -> Self {
130        Self {
131            capacity: capacity.max(1),
132            entries: Mutex::new(VecDeque::new()),
133        }
134    }
135
136    /// Records the trace for `request_id`, evicting the oldest if full. Retains the
137    /// trace as-is, the explain document is assembled lazily on [`Self::get`].
138    pub fn record(&self, request_id: RequestId, trace: &RequestTrace) {
139        // Clone the trace *before* taking the lock: the copy is the bulk of the
140        // work, so doing it outside keeps the critical section to the O(1) deque
141        // push/pop (the lock is held for pointer moves, not a copy).
142        let entry = (request_id, trace.clone());
143        let mut entries = self
144            .entries
145            .lock()
146            .unwrap_or_else(std::sync::PoisonError::into_inner);
147        if entries.len() >= self.capacity {
148            entries.pop_front();
149        }
150        entries.push_back(entry);
151    }
152
153    /// Looks up `request_id` and assembles its explanation document, if retained.
154    /// The JSON is built here (read time), not at record time.
155    #[must_use]
156    pub fn get(&self, request_id: &RequestId) -> Option<Value> {
157        self.entries
158            .lock()
159            .unwrap_or_else(std::sync::PoisonError::into_inner)
160            .iter()
161            .find(|(id, _)| id == request_id)
162            .map(|(id, trace)| explain_json(id, trace))
163    }
164}
165
#[cfg(test)]
mod tests {
    use super::*;
    use crate::trace::{ClassifyInfo, DispatchInfo, EgressInfo, ResolveInfo};
    use osproxy_core::error::DecisionChain;
    use osproxy_core::{
        ClusterId, EndpointKind, Epoch, ErrorCode, FieldName, IndexName, PartitionId,
    };

    /// A successful trace with the classify, resolve, dispatch and egress
    /// stages recorded. Ingress and rewrite are deliberately left unrecorded so
    /// tests can check that absent stages are omitted from the document.
    fn full_trace() -> RequestTrace {
        let mut t = RequestTrace::new();
        t.record_classify(ClassifyInfo {
            endpoint: EndpointKind::IngestDoc,
            logical_index: IndexName::from("orders"),
        });
        t.record_resolve(ResolveInfo {
            partition: PartitionId::from("acme"),
            placement_kind: "shared_index",
            cluster: ClusterId::from("eu-1"),
            index: IndexName::from("orders-shared"),
            epoch: Epoch::new(3),
            inject_fields: vec![FieldName::from("_tenant")],
            routing: true,
            migration: "settled",
        });
        t.record_dispatch(DispatchInfo {
            cluster: ClusterId::from("eu-1"),
            upstream_status: 201,
            pool_reuse: true,
        });
        t.record_egress(EgressInfo {
            status: 201,
            response_bytes: 42,
        });
        t
    }

    #[test]
    fn explain_document_carries_ids_and_shapes() {
        let rid = RequestId::from("req-9");
        let doc = explain_json(&rid, &full_trace());
        assert_eq!(doc["request_id"], "req-9");
        assert_eq!(doc["outcome"], "ok");
        assert_eq!(doc["spans"]["classify"]["index_logical"], "orders");
        assert_eq!(doc["spans"]["spi.resolve"]["partition_id"], "acme");
        assert_eq!(doc["spans"]["spi.resolve"]["target_index"], "orders-shared");
        assert_eq!(doc["spans"]["spi.resolve"]["epoch"], 3);
        assert_eq!(
            doc["spans"]["spi.resolve"]["inject_field_names"][0],
            "_tenant"
        );
        assert_eq!(doc["spans"]["dispatch"]["target_cluster"], "eu-1");
        assert_eq!(doc["spans"]["dispatch"]["upstream_status"], 201);
        assert_eq!(doc["spans"]["egress"]["status"], 201);
        assert_eq!(doc["spans"]["egress"]["response_bytes"], 42);
        assert!(doc["error"].is_null());
    }

    #[test]
    fn unrecorded_spans_are_omitted() {
        let rid = RequestId::from("req-partial");
        let doc = explain_json(&rid, &full_trace());
        let spans = doc["spans"].as_object().expect("spans is a JSON object");
        // `full_trace` never records ingress or rewrite: the keys must be
        // absent, not present-with-null.
        assert!(!spans.contains_key("ingress"));
        assert!(!spans.contains_key("rewrite"));
        assert!(spans.contains_key("classify"));
        assert!(spans.contains_key("spi.resolve"));
        assert!(spans.contains_key("dispatch"));
        assert!(spans.contains_key("egress"));
    }

    #[test]
    fn failure_attaches_error_context() {
        let rid = RequestId::from("req-err");
        let mut t = RequestTrace::new();
        let ctx = ErrorContext::new(
            ErrorCode::PlacementMissing,
            false,
            "register a placement for the partition",
        )
        .with_chain(DecisionChain {
            partition: Some(PartitionId::from("ghost")),
            ..DecisionChain::new()
        });
        t.record_error(ctx);
        let doc = explain_json(&rid, &t);
        assert_eq!(doc["outcome"], "error");
        assert_eq!(doc["error"]["code"], "placement_missing");
        assert_eq!(doc["error"]["decision_chain"]["partition"], "ghost");
        assert_eq!(doc["error"]["retryable"], false);
    }

    #[test]
    fn store_retains_recent_and_evicts_oldest() {
        let store = ExplainStore::new(2);
        store.record(RequestId::from("a"), &full_trace());
        store.record(RequestId::from("b"), &full_trace());
        store.record(RequestId::from("c"), &full_trace());
        assert!(store.get(&RequestId::from("a")).is_none(), "a evicted");
        assert!(store.get(&RequestId::from("b")).is_some());
        assert!(store.get(&RequestId::from("c")).is_some());
    }

    #[test]
    fn store_lookup_of_unknown_id_is_none() {
        let store = ExplainStore::new(4);
        assert!(store.get(&RequestId::from("nope")).is_none(), "empty store");
        store.record(RequestId::from("a"), &full_trace());
        assert!(store.get(&RequestId::from("nope")).is_none(), "miss");
    }

    #[test]
    fn store_assembles_document_for_the_requested_id() {
        let store = ExplainStore::new(4);
        store.record(RequestId::from("a"), &full_trace());
        let doc = store.get(&RequestId::from("a")).expect("a is retained");
        assert_eq!(doc["request_id"], "a");
        assert_eq!(doc["outcome"], "ok");
    }

    #[test]
    fn zero_capacity_still_retains_latest() {
        // `new` clamps capacity to at least one entry.
        let store = ExplainStore::new(0);
        store.record(RequestId::from("a"), &full_trace());
        assert!(store.get(&RequestId::from("a")).is_some());
        store.record(RequestId::from("b"), &full_trace());
        assert!(store.get(&RequestId::from("a")).is_none(), "a evicted");
        assert!(store.get(&RequestId::from("b")).is_some());
    }
}