1use std::collections::VecDeque;
11use std::sync::Mutex;
12
13use osproxy_core::{ErrorContext, RequestId};
14use serde_json::{json, Value};
15
16use crate::trace::RequestTrace;
17
18#[must_use]
20pub fn explain_json(request_id: &RequestId, trace: &RequestTrace) -> Value {
21 let mut spans = serde_json::Map::new();
22 if let Some(i) = &trace.ingress {
23 spans.insert(
24 "ingress".into(),
25 json!({ "protocol": i.protocol, "tls_suite": i.tls_suite, "tls_reused": i.tls_reused }),
26 );
27 }
28 if let Some(c) = &trace.classify {
29 spans.insert(
30 "classify".into(),
31 json!({ "endpoint_kind": format!("{:?}", c.endpoint), "is_write": c.endpoint.is_write(), "index_logical": c.logical_index.as_str() }),
32 );
33 }
34 if let Some(r) = &trace.resolve {
35 spans.insert("spi.resolve".into(), resolve_json(r));
36 }
37 if let Some(r) = &trace.rewrite {
38 spans.insert(
39 "rewrite".into(),
40 json!({ "transform_kind": r.transform_kind, "body_bytes": r.body_bytes }),
41 );
42 }
43 if let Some(d) = &trace.dispatch {
44 spans.insert(
45 "dispatch".into(),
46 json!({ "target_cluster": d.cluster.as_str(), "upstream_status": d.upstream_status, "pool_reuse": d.pool_reuse }),
47 );
48 }
49 if let Some(e) = &trace.egress {
50 spans.insert(
51 "egress".into(),
52 json!({ "status": e.status, "response_bytes": e.response_bytes }),
53 );
54 }
55
56 json!({
57 "request_id": request_id.as_str(),
58 "trace_id": trace.context.as_ref().map(osproxy_core::TraceContext::trace_id_hex),
61 "outcome": if trace.failed() { "error" } else { "ok" },
62 "spans": Value::Object(spans),
63 "error": trace.error.as_ref().map(error_json),
64 })
65}
66
67fn resolve_json(r: &crate::trace::ResolveInfo) -> Value {
69 let fields: Vec<&str> = r
70 .inject_fields
71 .iter()
72 .map(osproxy_core::FieldName::as_str)
73 .collect();
74 json!({
75 "partition_id": r.partition.as_str(),
76 "placement_kind": r.placement_kind,
77 "target_cluster": r.cluster.as_str(),
78 "target_index": r.index.as_str(),
79 "epoch": r.epoch.get(),
80 "inject_field_names": fields,
81 "routing": r.routing,
82 "migration": r.migration,
83 })
84}
85
86fn error_json(err: &ErrorContext) -> Value {
88 let chain = &err.decision_chain;
89 json!({
90 "code": err.code.as_slug(),
91 "retryable": err.retryable,
92 "remediation": err.remediation,
93 "decision_chain": {
94 "principal": chain.principal.as_ref().map(osproxy_core::PrincipalId::as_str),
95 "partition": chain.partition.as_ref().map(osproxy_core::PartitionId::as_str),
96 "cluster": chain.cluster.as_ref().map(osproxy_core::ClusterId::as_str),
97 "index": chain.index.as_ref().map(osproxy_core::IndexName::as_str),
98 },
99 })
100}
101
102#[derive(Debug)]
121pub struct ExplainStore {
122 capacity: usize,
123 entries: Mutex<VecDeque<(RequestId, RequestTrace)>>,
124}
125
126impl ExplainStore {
127 #[must_use]
129 pub fn new(capacity: usize) -> Self {
130 Self {
131 capacity: capacity.max(1),
132 entries: Mutex::new(VecDeque::new()),
133 }
134 }
135
136 pub fn record(&self, request_id: RequestId, trace: &RequestTrace) {
139 let entry = (request_id, trace.clone());
143 let mut entries = self
144 .entries
145 .lock()
146 .unwrap_or_else(std::sync::PoisonError::into_inner);
147 if entries.len() >= self.capacity {
148 entries.pop_front();
149 }
150 entries.push_back(entry);
151 }
152
153 #[must_use]
156 pub fn get(&self, request_id: &RequestId) -> Option<Value> {
157 self.entries
158 .lock()
159 .unwrap_or_else(std::sync::PoisonError::into_inner)
160 .iter()
161 .find(|(id, _)| id == request_id)
162 .map(|(id, trace)| explain_json(id, trace))
163 }
164}
165
166#[cfg(test)]
167mod tests {
168 use super::*;
169 use crate::trace::{ClassifyInfo, DispatchInfo, EgressInfo, ResolveInfo};
170 use osproxy_core::error::DecisionChain;
171 use osproxy_core::{
172 ClusterId, EndpointKind, Epoch, ErrorCode, FieldName, IndexName, PartitionId,
173 };
174
175 fn full_trace() -> RequestTrace {
176 let mut t = RequestTrace::new();
177 t.record_classify(ClassifyInfo {
178 endpoint: EndpointKind::IngestDoc,
179 logical_index: IndexName::from("orders"),
180 });
181 t.record_resolve(ResolveInfo {
182 partition: PartitionId::from("acme"),
183 placement_kind: "shared_index",
184 cluster: ClusterId::from("eu-1"),
185 index: IndexName::from("orders-shared"),
186 epoch: Epoch::new(3),
187 inject_fields: vec![FieldName::from("_tenant")],
188 routing: true,
189 migration: "settled",
190 });
191 t.record_dispatch(DispatchInfo {
192 cluster: ClusterId::from("eu-1"),
193 upstream_status: 201,
194 pool_reuse: true,
195 });
196 t.record_egress(EgressInfo {
197 status: 201,
198 response_bytes: 42,
199 });
200 t
201 }
202
203 #[test]
204 fn explain_document_carries_ids_and_shapes() {
205 let rid = RequestId::from("req-9");
206 let doc = explain_json(&rid, &full_trace());
207 assert_eq!(doc["request_id"], "req-9");
208 assert_eq!(doc["outcome"], "ok");
209 assert_eq!(doc["spans"]["spi.resolve"]["partition_id"], "acme");
210 assert_eq!(doc["spans"]["spi.resolve"]["epoch"], 3);
211 assert_eq!(
212 doc["spans"]["spi.resolve"]["inject_field_names"][0],
213 "_tenant"
214 );
215 assert_eq!(doc["spans"]["dispatch"]["upstream_status"], 201);
216 assert!(doc["error"].is_null());
217 }
218
219 #[test]
220 fn failure_attaches_error_context() {
221 let rid = RequestId::from("req-err");
222 let mut t = RequestTrace::new();
223 let ctx = ErrorContext::new(
224 ErrorCode::PlacementMissing,
225 false,
226 "register a placement for the partition",
227 )
228 .with_chain(DecisionChain {
229 partition: Some(PartitionId::from("ghost")),
230 ..DecisionChain::new()
231 });
232 t.record_error(ctx);
233 let doc = explain_json(&rid, &t);
234 assert_eq!(doc["outcome"], "error");
235 assert_eq!(doc["error"]["code"], "placement_missing");
236 assert_eq!(doc["error"]["decision_chain"]["partition"], "ghost");
237 assert_eq!(doc["error"]["retryable"], false);
238 }
239
240 #[test]
241 fn store_retains_recent_and_evicts_oldest() {
242 let store = ExplainStore::new(2);
243 store.record(RequestId::from("a"), &full_trace());
244 store.record(RequestId::from("b"), &full_trace());
245 store.record(RequestId::from("c"), &full_trace());
246 assert!(store.get(&RequestId::from("a")).is_none(), "a evicted");
247 assert!(store.get(&RequestId::from("b")).is_some());
248 assert!(store.get(&RequestId::from("c")).is_some());
249 }
250}