use osproxy_core::{RequestId, TraceContext};
use serde_json::{json, Value};
use crate::trace::RequestTrace;
/// Numeric value written to a span's `kind` field; `2` is `SPAN_KIND_SERVER`
/// in the OTLP trace protocol.
const SPAN_KIND_SERVER: i32 = 2;
/// Numeric status `code` written for a span whose trace recorded no error
/// (`STATUS_CODE_OK` in the OTLP trace protocol).
const STATUS_OK: i32 = 1;
/// Numeric status `code` written for a span whose trace recorded an error
/// (`STATUS_CODE_ERROR` in the OTLP trace protocol).
const STATUS_ERROR: i32 = 2;
/// Builds the `resourceSpans` JSON document describing one traced request.
///
/// The document contains exactly one resource (tagged with `service.name`),
/// one scope named `osproxy`, and one span covering the request from
/// `start_unix_nano` to `end_unix_nano`.
///
/// Returns `None` when `trace` has no trace context recorded, because there
/// are then no trace or span ids to export.
#[must_use]
pub fn resource_spans(
    service_name: &str,
    request_id: &RequestId,
    trace: &RequestTrace,
    start_unix_nano: u64,
    end_unix_nano: u64,
) -> Option<Value> {
    trace.context().map(|context| {
        let request_span = span_json(request_id, trace, context, start_unix_nano, end_unix_nano);
        let resource = json!({
            "attributes": [attr_str("service.name", service_name)],
        });
        let scope_spans = json!({
            "scope": { "name": "osproxy" },
            "spans": [request_span],
        });
        json!({
            "resourceSpans": [{
                "resource": resource,
                "scopeSpans": [scope_spans],
            }],
        })
    })
}
fn span_json(
request_id: &RequestId,
trace: &RequestTrace,
context: &TraceContext,
start_unix_nano: u64,
end_unix_nano: u64,
) -> Value {
let status = if let Some(err) = &trace.error {
json!({ "code": STATUS_ERROR, "message": err.code.as_slug() })
} else {
json!({ "code": STATUS_OK })
};
let mut span = json!({
"traceId": context.trace_id_hex(),
"spanId": context.span_id_hex(),
"name": span_name(trace),
"kind": SPAN_KIND_SERVER,
"startTimeUnixNano": start_unix_nano.to_string(),
"endTimeUnixNano": end_unix_nano.to_string(),
"attributes": attributes(request_id, trace),
"status": status,
});
if let Some(parent) = context.parent_span_id_hex() {
span["parentSpanId"] = json!(parent);
}
span
}
/// Picks the span name: the `Debug` rendering of the classified endpoint, or
/// the literal `"request"` when the request was never classified.
fn span_name(trace: &RequestTrace) -> String {
    match &trace.classify {
        Some(classified) => format!("{:?}", classified.endpoint),
        None => String::from("request"),
    }
}
fn attributes(request_id: &RequestId, trace: &RequestTrace) -> Vec<Value> {
let mut a = vec![attr_str("osproxy.request.id", request_id.as_str())];
if let Some(i) = &trace.ingress {
a.push(attr_str("osproxy.protocol", i.protocol));
if let Some(reused) = i.tls_reused {
a.push(attr_bool("tls.session_reused", reused));
}
}
if let Some(c) = &trace.classify {
a.push(attr_str("osproxy.endpoint", &format!("{:?}", c.endpoint)));
a.push(attr_bool("osproxy.request.is_write", c.endpoint.is_write()));
}
if let Some(r) = &trace.resolve {
let names: Vec<&str> = r
.inject_fields
.iter()
.map(osproxy_core::FieldName::as_str)
.collect();
a.push(attr_str("osproxy.partition.id", r.partition.as_str()));
a.push(attr_str("osproxy.placement.kind", r.placement_kind));
a.push(attr_str("osproxy.target.cluster", r.cluster.as_str()));
a.push(attr_str("osproxy.target.index", r.index.as_str()));
a.push(attr_int("osproxy.epoch", r.epoch.get()));
a.push(attr_strs("osproxy.inject.field_names", &names));
a.push(attr_bool("osproxy.routing", r.routing));
a.push(attr_str("osproxy.migration.phase", r.migration));
}
if let Some(r) = &trace.rewrite {
a.push(attr_str("osproxy.rewrite.kind", r.transform_kind));
a.push(attr_int("osproxy.rewrite.body_bytes", r.body_bytes as u64));
}
if let Some(d) = &trace.dispatch {
a.push(attr_int(
"osproxy.upstream.status",
u64::from(d.upstream_status),
));
a.push(attr_bool("osproxy.pool.reuse", d.pool_reuse));
}
if let Some(e) = &trace.egress {
a.push(attr_int("http.response.status_code", u64::from(e.status)));
a.push(attr_int("osproxy.response.bytes", e.response_bytes as u64));
}
if let Some(err) = &trace.error {
a.push(attr_str("osproxy.error.code", err.code.as_slug()));
a.push(attr_bool("osproxy.error.retryable", err.retryable));
}
a
}
/// Builds a key/value attribute object whose value is a `stringValue`.
fn attr_str(key: &str, value: &str) -> Value {
    let any_value = json!({ "stringValue": value });
    json!({ "key": key, "value": any_value })
}
/// Builds a key/value attribute object whose value is an `intValue`.
///
/// The integer is written as a decimal string. OTLP defines `intValue` as a
/// signed 64-bit integer, so inputs above `i64::MAX` are saturated to
/// `i64::MAX` rather than emitted as an out-of-range number that a receiver
/// may refuse to parse. All in-range inputs are encoded unchanged.
fn attr_int(key: &str, value: u64) -> Value {
    // Largest value representable by the int64 `intValue` field.
    const INT64_MAX: u64 = i64::MAX.unsigned_abs();
    let in_range = value.min(INT64_MAX);
    json!({ "key": key, "value": { "intValue": in_range.to_string() } })
}
/// Builds a key/value attribute object whose value is a `boolValue`.
fn attr_bool(key: &str, value: bool) -> Value {
    let any_value = json!({ "boolValue": value });
    json!({ "key": key, "value": any_value })
}
/// Builds a key/value attribute object whose value is an `arrayValue` of
/// `stringValue` entries, one per input string and in the same order.
fn attr_strs(key: &str, values: &[&str]) -> Value {
    let mut items = Vec::with_capacity(values.len());
    for value in values {
        items.push(json!({ "stringValue": value }));
    }
    json!({ "key": key, "value": { "arrayValue": { "values": items } } })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::trace::{ClassifyInfo, ResolveInfo};
    use osproxy_core::{ClusterId, EndpointKind, Epoch, FieldName, IndexName, PartitionId};

    /// Caller-supplied header value handed to `TraceContext::propagate`; the
    /// tests expect its second and third dash-separated fields back as the
    /// trace id and the parent span id.
    const CALLER_TRACEPARENT: &str = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";

    /// Returns the single span inside an exported document.
    fn only_span(doc: &Value) -> &Value {
        &doc["resourceSpans"][0]["scopeSpans"][0]["spans"][0]
    }

    /// Builds a trace that continues the caller's trace and has classify and
    /// resolve information recorded.
    fn traced() -> RequestTrace {
        let request_id = RequestId::from("req-1");
        let context = TraceContext::propagate(Some(CALLER_TRACEPARENT), None, &request_id);
        let classify = ClassifyInfo {
            endpoint: EndpointKind::IngestDoc,
            logical_index: IndexName::from("orders"),
        };
        let resolve = ResolveInfo {
            partition: PartitionId::from("acme"),
            placement_kind: "shared_index",
            cluster: ClusterId::from("eu-1"),
            index: IndexName::from("orders-shared"),
            epoch: Epoch::new(7),
            inject_fields: vec![FieldName::from("_tenant")],
            routing: true,
            migration: "settled",
        };

        let mut trace = RequestTrace::new();
        trace.record_context(context);
        trace.record_classify(classify);
        trace.record_resolve(resolve);
        trace
    }

    #[test]
    fn encodes_a_resource_span_with_the_proxy_trace_and_span_ids() {
        let trace = traced();
        let request_id = RequestId::from("req-1");
        let doc = resource_spans("osproxy", &request_id, &trace, 1_000, 2_000).unwrap();
        let span = only_span(&doc);
        let propagated_span_id = trace.context().unwrap().span_id_hex();

        assert_eq!(span["traceId"], "4bf92f3577b34da6a3ce929d0e0e4736");
        assert_eq!(
            span["spanId"], propagated_span_id,
            "emitted span id must equal the id propagated downstream"
        );
        assert_eq!(span["kind"], SPAN_KIND_SERVER);
        assert_eq!(span["startTimeUnixNano"], "1000");
        assert_eq!(span["endTimeUnixNano"], "2000");

        let service_name =
            &doc["resourceSpans"][0]["resource"]["attributes"][0]["value"]["stringValue"];
        assert_eq!(*service_name, "osproxy");
    }

    #[test]
    fn a_continued_trace_nests_the_span_under_the_callers_parent() {
        let request_id = RequestId::from("req-1");
        let doc = resource_spans("svc", &request_id, &traced(), 0, 1).unwrap();
        assert_eq!(only_span(&doc)["parentSpanId"], "00f067aa0ba902b7");
    }

    #[test]
    fn a_root_request_emits_no_parent_span_id() {
        let request_id = RequestId::from("req-1");
        let mut root = RequestTrace::new();
        root.record_context(TraceContext::propagate(None, None, &request_id));

        let doc = resource_spans("svc", &request_id, &root, 0, 1).unwrap();
        assert!(
            only_span(&doc).get("parentSpanId").is_none(),
            "root span has no parent"
        );
    }

    #[test]
    fn attributes_are_shape_only_ids_and_names_never_values() {
        let request_id = RequestId::from("req-1");
        let doc = resource_spans("svc", &request_id, &traced(), 0, 1).unwrap();
        let text = serde_json::to_string(&only_span(&doc)["attributes"]).unwrap();

        assert!(text.contains("osproxy.inject.field_names"));
        assert!(text.contains("_tenant"));
        assert!(text.contains("\"osproxy.partition.id\""));
        assert!(
            text.contains(r#"{"intValue":"7"}"#),
            "epoch as string int: {text}"
        );
    }

    #[test]
    fn a_failed_request_maps_to_otlp_error_status() {
        let failure = osproxy_core::ErrorContext::new(
            osproxy_core::ErrorCode::StaleEpoch,
            true,
            "retry the request",
        );
        let mut trace = traced();
        trace.record_error(failure);

        let request_id = RequestId::from("req-1");
        let doc = resource_spans("svc", &request_id, &trace, 0, 1).unwrap();
        let status = &only_span(&doc)["status"];
        assert_eq!(status["code"], STATUS_ERROR);
        assert_eq!(status["message"], "stale_epoch");
    }

    #[test]
    fn no_trace_context_means_nothing_to_export() {
        let empty = RequestTrace::new();
        let exported = resource_spans("svc", &RequestId::from("r"), &empty, 0, 1);
        assert!(exported.is_none());
    }
}