Skip to main content

osproxy_observe/
explain.rs

1//! Assembling a [`RequestTrace`] into the `/debug/explain` document and a
2//! bounded store of recent explanations.
3//!
4//! The document is purpose-built for LLM consumption (`docs/05` §6): the ordered
5//! spans as shape attributes, the final status, and, on failure, the
6//! `ErrorContext` with its decision chain and remediation. Because the trace is
7//! shape-only by construction, so is this document; it cannot reveal a tenant
8//! value because none was ever captured.
9
10use std::collections::VecDeque;
11use std::sync::Mutex;
12
13use osproxy_core::{ErrorContext, RequestId};
14use serde_json::{json, Value};
15
16use crate::trace::RequestTrace;
17
18/// Builds the explain document for `request_id` from its `trace`.
19#[must_use]
20pub fn explain_json(request_id: &RequestId, trace: &RequestTrace) -> Value {
21    let mut spans = serde_json::Map::new();
22    if let Some(i) = &trace.ingress {
23        spans.insert(
24            "ingress".into(),
25            json!({ "protocol": i.protocol, "tls_suite": i.tls_suite, "tls_reused": i.tls_reused }),
26        );
27    }
28    if let Some(c) = &trace.classify {
29        spans.insert(
30            "classify".into(),
31            json!({ "endpoint_kind": format!("{:?}", c.endpoint), "is_write": c.endpoint.is_write(), "index_logical": c.logical_index.as_str() }),
32        );
33    }
34    if let Some(r) = &trace.resolve {
35        spans.insert("spi.resolve".into(), resolve_json(r));
36    }
37    if let Some(r) = &trace.rewrite {
38        spans.insert(
39            "rewrite".into(),
40            json!({ "transform_kind": r.transform_kind, "body_bytes": r.body_bytes }),
41        );
42    }
43    if let Some(d) = &trace.dispatch {
44        spans.insert(
45            "dispatch".into(),
46            json!({ "target_cluster": d.cluster.as_str(), "upstream_status": d.upstream_status, "pool_reuse": d.pool_reuse }),
47        );
48    }
49    if let Some(e) = &trace.egress {
50        spans.insert(
51            "egress".into(),
52            json!({ "status": e.status, "response_bytes": e.response_bytes }),
53        );
54    }
55
56    json!({
57        "request_id": request_id.as_str(),
58        // The distributed-trace id (W3C), so this explanation joins the OTLP
59        // trace for the same request. An id, never a value.
60        "trace_id": trace.context.as_ref().map(osproxy_core::TraceContext::trace_id_hex),
61        "outcome": if trace.failed() { "error" } else { "ok" },
62        "spans": Value::Object(spans),
63        "error": trace.error.as_ref().map(error_json),
64    })
65}
66
67/// Serializes the `spi.resolve` span (field names only, never values).
68fn resolve_json(r: &crate::trace::ResolveInfo) -> Value {
69    let fields: Vec<&str> = r
70        .inject_fields
71        .iter()
72        .map(osproxy_core::FieldName::as_str)
73        .collect();
74    json!({
75        "partition_id": r.partition.as_str(),
76        "placement_kind": r.placement_kind,
77        "target_cluster": r.cluster.as_str(),
78        "target_index": r.index.as_str(),
79        "epoch": r.epoch.get(),
80        "inject_field_names": fields,
81        "routing": r.routing,
82        "migration": r.migration,
83    })
84}
85
86/// Serializes an [`ErrorContext`] (ids + remediation, never values).
87fn error_json(err: &ErrorContext) -> Value {
88    let chain = &err.decision_chain;
89    json!({
90        "code": err.code.as_slug(),
91        "retryable": err.retryable,
92        "remediation": err.remediation,
93        "decision_chain": {
94            "principal": chain.principal.as_ref().map(osproxy_core::PrincipalId::as_str),
95            "partition": chain.partition.as_ref().map(osproxy_core::PartitionId::as_str),
96            "cluster": chain.cluster.as_ref().map(osproxy_core::ClusterId::as_str),
97            "index": chain.index.as_ref().map(osproxy_core::IndexName::as_str),
98        },
99    })
100}
101
102/// A bounded, in-memory store of recent request explanations.
103///
104/// A single-instance affordance backing `/debug/explain/{request_id}` (`docs/05`
105/// §5 ring buffer). Oldest entries are evicted past capacity, so memory is
106/// bounded regardless of traffic.
107///
108/// It retains the **assembled trace**, not a serialized document: `/debug/explain`
109/// is read for a vanishing fraction of requests, so building the JSON eagerly on
110/// every request was pure waste (~12µs/req of allocation + serialization). The doc
111/// is built lazily in [`ExplainStore::get`] instead; `record` only clones the
112/// (small, owned) trace and pushes it under the lock.
113#[derive(Debug)]
114pub struct ExplainStore {
115    capacity: usize,
116    entries: Mutex<VecDeque<(RequestId, RequestTrace)>>,
117}
118
119impl ExplainStore {
120    /// Creates a store holding at most `capacity` recent explanations.
121    #[must_use]
122    pub fn new(capacity: usize) -> Self {
123        Self {
124            capacity: capacity.max(1),
125            entries: Mutex::new(VecDeque::new()),
126        }
127    }
128
129    /// Records the trace for `request_id`, evicting the oldest if full. Retains the
130    /// trace as-is, the explain document is assembled lazily on [`Self::get`].
131    pub fn record(&self, request_id: RequestId, trace: &RequestTrace) {
132        let mut entries = self
133            .entries
134            .lock()
135            .unwrap_or_else(std::sync::PoisonError::into_inner);
136        if entries.len() >= self.capacity {
137            entries.pop_front();
138        }
139        entries.push_back((request_id, trace.clone()));
140    }
141
142    /// Looks up `request_id` and assembles its explanation document, if retained.
143    /// The JSON is built here (read time), not at record time.
144    #[must_use]
145    pub fn get(&self, request_id: &RequestId) -> Option<Value> {
146        self.entries
147            .lock()
148            .unwrap_or_else(std::sync::PoisonError::into_inner)
149            .iter()
150            .find(|(id, _)| id == request_id)
151            .map(|(id, trace)| explain_json(id, trace))
152    }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::trace::{ClassifyInfo, DispatchInfo, EgressInfo, ResolveInfo};
    use osproxy_core::error::DecisionChain;
    use osproxy_core::{
        ClusterId, EndpointKind, Epoch, ErrorCode, FieldName, IndexName, PartitionId,
    };

    /// A successful ingest trace with classify, resolve, dispatch and egress
    /// recorded. No ingress or rewrite span is recorded.
    fn full_trace() -> RequestTrace {
        let mut t = RequestTrace::new();
        t.record_classify(ClassifyInfo {
            endpoint: EndpointKind::IngestDoc,
            logical_index: IndexName::from("orders"),
        });
        t.record_resolve(ResolveInfo {
            partition: PartitionId::from("acme"),
            placement_kind: "shared_index",
            cluster: ClusterId::from("eu-1"),
            index: IndexName::from("orders-shared"),
            epoch: Epoch::new(3),
            inject_fields: vec![FieldName::from("_tenant")],
            routing: true,
            migration: "settled",
        });
        t.record_dispatch(DispatchInfo {
            cluster: ClusterId::from("eu-1"),
            upstream_status: 201,
            pool_reuse: true,
        });
        t.record_egress(EgressInfo {
            status: 201,
            response_bytes: 42,
        });
        t
    }

    #[test]
    fn explain_document_carries_ids_and_shapes() {
        let rid = RequestId::from("req-9");
        let doc = explain_json(&rid, &full_trace());
        assert_eq!(doc["request_id"], "req-9");
        assert_eq!(doc["outcome"], "ok");
        assert_eq!(doc["spans"]["spi.resolve"]["partition_id"], "acme");
        assert_eq!(doc["spans"]["spi.resolve"]["epoch"], 3);
        assert_eq!(
            doc["spans"]["spi.resolve"]["inject_field_names"][0],
            "_tenant"
        );
        assert_eq!(doc["spans"]["dispatch"]["upstream_status"], 201);
        assert!(doc["error"].is_null());
    }

    #[test]
    fn unrecorded_stages_are_omitted_from_spans() {
        let doc = explain_json(&RequestId::from("req-partial"), &full_trace());
        assert!(doc["spans"].get("ingress").is_none(), "ingress never recorded");
        assert!(doc["spans"].get("rewrite").is_none(), "rewrite never recorded");
        assert!(doc["spans"].get("egress").is_some());
    }

    #[test]
    fn failure_attaches_error_context() {
        let rid = RequestId::from("req-err");
        let mut t = RequestTrace::new();
        let ctx = ErrorContext::new(
            ErrorCode::PlacementMissing,
            false,
            "register a placement for the partition",
        )
        .with_chain(DecisionChain {
            partition: Some(PartitionId::from("ghost")),
            ..DecisionChain::new()
        });
        t.record_error(ctx);
        let doc = explain_json(&rid, &t);
        assert_eq!(doc["outcome"], "error");
        assert_eq!(doc["error"]["code"], "placement_missing");
        assert_eq!(doc["error"]["decision_chain"]["partition"], "ghost");
        assert_eq!(doc["error"]["retryable"], false);
    }

    #[test]
    fn store_retains_recent_and_evicts_oldest() {
        let store = ExplainStore::new(2);
        store.record(RequestId::from("a"), &full_trace());
        store.record(RequestId::from("b"), &full_trace());
        store.record(RequestId::from("c"), &full_trace());
        assert!(store.get(&RequestId::from("a")).is_none(), "a evicted");
        assert!(store.get(&RequestId::from("b")).is_some());
        let doc = store.get(&RequestId::from("c")).expect("c retained");
        assert_eq!(doc["request_id"], "c");
    }

    #[test]
    fn store_get_unknown_id_is_none() {
        let store = ExplainStore::new(4);
        assert!(store.get(&RequestId::from("missing")).is_none(), "empty store");
        store.record(RequestId::from("present"), &full_trace());
        assert!(store.get(&RequestId::from("missing")).is_none());
    }

    #[test]
    fn zero_capacity_is_clamped_to_one() {
        let store = ExplainStore::new(0);
        store.record(RequestId::from("a"), &full_trace());
        assert!(store.get(&RequestId::from("a")).is_some(), "latest retained");
        store.record(RequestId::from("b"), &full_trace());
        assert!(store.get(&RequestId::from("a")).is_none(), "a evicted");
        let doc = store.get(&RequestId::from("b")).expect("b retained");
        assert_eq!(doc["request_id"], "b");
    }
}