#![allow(clippy::unwrap_used)]
use std::sync::{Arc, Mutex};
use std::time::Duration;
use osproxy_core::{
Clock, ClusterId, EndpointKind, FieldName, IndexName, Instant, ManualClock, PartitionId,
PrincipalId, RequestId,
};
use osproxy_engine::Pipeline;
use osproxy_observe::{
BreakGlassBuffer, DiagLevel, DiagnosticSink, DiagnosticsDirective, DirectiveMatch,
DirectiveSet, DirectiveVerifier, InMemoryDirectiveStore, SpanExporter,
};
use osproxy_sink::MemorySink;
use osproxy_spi::{
BodyDoc, DocIdRule, HeaderView, HttpMethod, IdTemplate, InjectedField, InjectedValue, JsonPath,
PartitionKeySpec, Placement, PlacementAt, Principal, Protocol, RequestCtx, SensitivitySpec,
SpiError, TenancySpi,
};
use osproxy_tenancy::{PlacementTable, TenancyRouter};
use serde_json::Value;
/// Test double that keeps every exported span payload for later inspection.
///
/// The buffer lives behind an `Arc`, so clones share it: a test can keep one
/// handle and hand another to the pipeline.
#[derive(Clone, Default)]
struct RecordingExporter(Arc<Mutex<Vec<Value>>>);

impl SpanExporter for RecordingExporter {
    /// Appends `payload` to the shared buffer.
    fn export(&self, payload: Value) {
        let mut recorded = self.0.lock().unwrap();
        recorded.push(payload);
    }
}
struct SharedTenancy {
table: Arc<PlacementTable>,
}
impl TenancySpi for SharedTenancy {
fn resolve_partition(
&self,
ctx: &osproxy_spi::RequestCtx<'_>,
body: BodyDoc<'_>,
) -> Result<osproxy_core::PartitionId, osproxy_spi::SpiError> {
osproxy_tenancy::resolve_partition_spec(
&PartitionKeySpec::BodyField(JsonPath::new("tenant_id")),
ctx,
body,
)
}
fn doc_id_rule(&self) -> Option<DocIdRule> {
Some(DocIdRule::new(IdTemplate::new("{partition}:{body.id}")).with_routing(true))
}
fn injected_fields(&self) -> Vec<InjectedField> {
vec![InjectedField::new(
FieldName::from("_tenant"),
InjectedValue::PartitionId,
)]
}
fn sensitive_fields(&self) -> SensitivitySpec {
SensitivitySpec::none()
}
async fn placement_for(&self, p: &PartitionId) -> Result<PlacementAt, SpiError> {
self.table.get(p).ok_or_else(|| SpiError::PlacementMissing {
partition: p.clone(),
})
}
}
/// Builds the pipeline under test: a [`TenancyRouter`] over [`SharedTenancy`]
/// writing into a fresh [`MemorySink`].
///
/// The placement table holds one entry, mapping partition `acme` to the
/// shared index `shared` on cluster `eu-1` with the `_tenant` field injected.
fn pipeline() -> Pipeline<TenancyRouter<SharedTenancy>, MemorySink> {
    let table = Arc::new(PlacementTable::new());

    let partition = PartitionId::from("acme");
    let placement = Placement::SharedIndex {
        cluster: ClusterId::from("eu-1"),
        index: IndexName::from("shared"),
        inject: vec![InjectedField::new(
            FieldName::from("_tenant"),
            InjectedValue::PartitionId,
        )],
    };
    table.set(partition, placement);

    let router = TenancyRouter::new(SharedTenancy { table });
    Pipeline::new(router, MemorySink::new())
}
/// Sends one header-less `PUT` ingest request for tenant `acme` through `p`,
/// using `rid` as the request id.
///
/// Panics (via `unwrap`) if the pipeline returns an error.
async fn ingest(p: &Pipeline<TenancyRouter<SharedTenancy>, MemorySink>, rid: &RequestId) {
    let caller = Principal::new(PrincipalId::from("svc"));
    let no_headers: Vec<(String, String)> = Vec::new();
    let payload = br#"{"tenant_id":"acme","id":7}"#;

    let ctx = RequestCtx::new(
        &caller,
        rid,
        HttpMethod::Put,
        EndpointKind::IngestDoc,
        Protocol::Http1,
        "logical",
        HeaderView::new(&no_headers),
        payload,
    );

    let outcome = p.handle(&ctx).await;
    outcome.unwrap();
}
/// One handled request must export exactly one payload whose span `traceId`
/// equals the `trace_id` returned by `explain`, and whose first resource
/// attribute carries the configured service name.
#[tokio::test]
async fn a_handled_request_exports_one_span_with_the_explain_trace_id() {
    let exporter = RecordingExporter::default();
    let rid = RequestId::from("r");
    let p = pipeline()
        .with_exporter(Arc::new(exporter.clone()))
        .with_clock(Arc::new(ManualClock::new()))
        .with_service_name("osproxy-test");

    ingest(&p, &rid).await;

    let payloads = exporter.0.lock().unwrap();
    assert_eq!(payloads.len(), 1, "exactly one span exported per request");

    // Walk the OTLP-style nesting once and reuse the shared prefix.
    let resource_span = &payloads[0]["resourceSpans"][0];
    let span = &resource_span["scopeSpans"][0]["spans"][0];
    let explain_trace_id = p.explain(&rid).unwrap()["trace_id"].clone();
    assert_eq!(span["traceId"], explain_trace_id);

    let service_name = &resource_span["resource"]["attributes"][0]["value"]["stringValue"];
    assert_eq!(*service_name, "osproxy-test");
}
/// A pipeline built without any exporter configuration handles a request and
/// leaves the recorder untouched.
///
/// NOTE(review): the recorder is never attached to the pipeline, so the final
/// assertion can only pass; the meaningful check is that `ingest` completes
/// without panicking on an unconfigured pipeline. Confirm that is the intent.
#[tokio::test]
async fn the_default_pipeline_exports_nothing() {
    let exporter = RecordingExporter::default();
    let p = pipeline();
    let rid = RequestId::from("r");

    ingest(&p, &rid).await;

    let nothing_exported = exporter.0.lock().unwrap().is_empty();
    assert!(nothing_exported, "an unconfigured pipeline exports nothing");
}
/// With the baseline level at `Off`, nothing is exported unless a directive
/// matching the request's tenant is installed.
#[tokio::test]
async fn baseline_off_suppresses_export_until_a_directive_selects_the_request() {
    let rid = RequestId::from("r");

    // Phase 1: baseline Off, no directives configured.
    let silent = RecordingExporter::default();
    let silent_pipeline = pipeline()
        .with_exporter(Arc::new(silent.clone()))
        .with_clock(Arc::new(ManualClock::new()))
        .with_baseline_level(DiagLevel::Off);
    ingest(&silent_pipeline, &rid).await;
    let silent_is_empty = silent.0.lock().unwrap().is_empty();
    assert!(silent_is_empty, "baseline Off + no directive exports nothing");

    // Phase 2: same baseline, plus a directive for tenant `acme` that expires
    // one hour after the manual clock's current instant.
    let watched = RecordingExporter::default();
    let clock = Arc::new(ManualClock::new());
    let one_hour_from_now = clock.now().saturating_add(Duration::from_secs(3600));
    let watch_acme = DiagnosticsDirective {
        id: "watch-acme".to_owned(),
        match_: DirectiveMatch::all().for_tenant(PartitionId::from("acme")),
        level: DiagLevel::Shape,
        sample_per_mille: 1000,
        expires_at: one_hour_from_now,
        ring_buffer: false,
        capture: false,
    };
    let watched_pipeline = pipeline()
        .with_exporter(Arc::new(watched.clone()))
        .with_clock(clock)
        .with_baseline_level(DiagLevel::Off)
        .with_directives(Arc::new(DirectiveSet::from_directives(vec![watch_acme])));
    ingest(&watched_pipeline, &rid).await;
    let exported = watched.0.lock().unwrap().len();
    assert_eq!(
        exported, 1,
        "a directive targeting the tenant re-enables export"
    );
}
/// Publishing to the directive store changes export behavior of an already
/// built pipeline: empty store → no export, match-all directive → export,
/// cleared store → no further export.
#[tokio::test]
async fn publishing_to_the_fleet_store_flips_export_without_rebuilding_the_pipeline() {
    let exporter = RecordingExporter::default();
    let clock = Arc::new(ManualClock::new());
    let store = Arc::new(InMemoryDirectiveStore::new());
    let rid = RequestId::from("r");
    let p = pipeline()
        .with_exporter(Arc::new(exporter.clone()))
        .with_clock(clock.clone())
        .with_baseline_level(DiagLevel::Off)
        .with_directive_store(store.clone());

    // Step 1: nothing published yet.
    ingest(&p, &rid).await;
    let empty_before_publish = exporter.0.lock().unwrap().is_empty();
    assert!(empty_before_publish, "empty fleet store exports nothing");

    // Step 2: publish a match-all directive valid for one hour.
    let fleet_on = DiagnosticsDirective {
        id: "fleet-on".to_owned(),
        match_: DirectiveMatch::all(),
        level: DiagLevel::Shape,
        sample_per_mille: 1000,
        expires_at: clock.now().saturating_add(Duration::from_secs(3600)),
        ring_buffer: false,
        capture: false,
    };
    store.publish(DirectiveSet::from_directives(vec![fleet_on]));
    ingest(&p, &rid).await;
    let after_publish = exporter.0.lock().unwrap().len();
    assert_eq!(
        after_publish, 1,
        "a freshly published fleet directive flips export on with no restart"
    );

    // Step 3: replace the published set with an empty one.
    store.publish(DirectiveSet::new());
    ingest(&p, &rid).await;
    let after_clear = exporter.0.lock().unwrap().len();
    assert_eq!(
        after_clear, 1,
        "clearing the fleet store stops further export (count unchanged)"
    );
}
/// A directive with `ring_buffer: true` makes matching requests land in the
/// break-glass buffer, in arrival order, even though its level is `Off`.
#[tokio::test]
async fn a_ring_buffer_directive_captures_into_the_break_glass_tape() {
    let clock = Arc::new(ManualClock::new());
    let tape = Arc::new(BreakGlassBuffer::new(8));
    let store = Arc::new(InMemoryDirectiveStore::new());
    let p = pipeline()
        .with_clock(clock.clone())
        .with_baseline_level(DiagLevel::Off)
        .with_directive_store(store.clone())
        .with_break_glass(tape.clone());

    // Before any directive is published the tape must stay empty.
    let warmup = RequestId::from("r");
    ingest(&p, &warmup).await;
    assert!(tape.is_empty(), "no ring_buffer directive → empty tape");

    let break_glass = DiagnosticsDirective {
        id: "break-glass".to_owned(),
        match_: DirectiveMatch::all(),
        level: DiagLevel::Off,
        sample_per_mille: 1000,
        expires_at: clock.now().saturating_add(Duration::from_secs(3600)),
        ring_buffer: true,
        capture: false,
    };
    store.publish(DirectiveSet::from_directives(vec![break_glass]));

    let first = RequestId::from("r1");
    let second = RequestId::from("r2");
    ingest(&p, &first).await;
    ingest(&p, &second).await;

    let captured = tape.snapshot();
    assert_eq!(
        captured.len(),
        2,
        "each matching request is captured in order"
    );
    assert_eq!(captured[0]["request_id"], "r1");
    assert_eq!(captured[1]["request_id"], "r2");
}
/// A directive whose `expires_at` equals the clock's current instant is
/// expected to be treated as expired, so baseline `Off` still suppresses
/// export.
#[tokio::test]
async fn an_expired_directive_does_not_re_enable_export() {
    let exporter = RecordingExporter::default();
    let clock = Arc::new(ManualClock::new());

    // Expiry is "now" on the manual clock, which is never advanced here.
    let already_expired = clock.now();
    let stale = DiagnosticsDirective {
        id: "stale".to_owned(),
        match_: DirectiveMatch::all(),
        level: DiagLevel::Shape,
        sample_per_mille: 1000,
        expires_at: already_expired,
        ring_buffer: false,
        capture: false,
    };

    let p = pipeline()
        .with_exporter(Arc::new(exporter.clone()))
        .with_clock(clock)
        .with_baseline_level(DiagLevel::Off)
        .with_directives(Arc::new(DirectiveSet::from_directives(vec![stale])));

    let rid = RequestId::from("r");
    ingest(&p, &rid).await;

    let nothing_exported = exporter.0.lock().unwrap().is_empty();
    assert!(nothing_exported, "an expired directive does not export");
}
/// Test double that stores every document emitted to the diagnostic sink.
///
/// Clones share the same `Arc`-held buffer, so the test can read what the
/// pipeline's handle received.
#[derive(Clone, Default)]
struct RecordingDiagnosticSink(Arc<Mutex<Vec<Value>>>);

impl DiagnosticSink for RecordingDiagnosticSink {
    /// Appends `doc` to the shared buffer.
    fn emit(&self, doc: Value) {
        let mut received = self.0.lock().unwrap();
        received.push(doc);
    }
}
/// A directive with `capture: true` makes the pipeline push the request's
/// explain document to the configured diagnostic sink, keyed by the same
/// `trace_id` that `explain` reports.
#[tokio::test]
async fn a_capture_directive_pushes_the_explain_doc_to_the_diagnostic_sink() {
    let clock = Arc::new(ManualClock::new());
    let sink = RecordingDiagnosticSink::default();
    let store = Arc::new(InMemoryDirectiveStore::new());
    let p = pipeline()
        .with_clock(clock.clone())
        .with_baseline_level(DiagLevel::Off)
        .with_directive_store(store.clone())
        .with_diagnostic_sink(Arc::new(sink.clone()));

    // Before any directive is published nothing may reach the sink.
    ingest(&p, &RequestId::from("r0")).await;
    assert!(
        sink.0.lock().unwrap().is_empty(),
        "no capture directive → nothing pushed to the sink"
    );

    // The capture flag is what this test exercises. The ring buffer is left
    // off because no break-glass buffer is attached to this pipeline.
    store.publish(DirectiveSet::from_directives(vec![DiagnosticsDirective {
        id: "capture".to_owned(),
        match_: DirectiveMatch::all(),
        level: DiagLevel::Off,
        sample_per_mille: 1000,
        expires_at: clock.now().saturating_add(Duration::from_secs(3600)),
        ring_buffer: false,
        capture: true,
    }]));

    let rid = RequestId::from("r1");
    ingest(&p, &rid).await;

    // Read the explain doc before taking the sink lock so the guard is held
    // only for the assertions.
    let explain_trace_id = p.explain(&rid).unwrap()["trace_id"].clone();
    let pushed = sink.0.lock().unwrap();
    assert_eq!(pushed.len(), 1, "the selected capture is pushed once");
    assert_eq!(pushed[0]["request_id"], "r1");
    assert_eq!(
        pushed[0]["trace_id"], explain_trace_id,
        "the pushed doc is keyed by the same trace_id as /debug/explain"
    );
}
/// Stand-in directive verifier: accepts exactly the header value `"go"` and
/// rejects everything else.
struct FakeVerifier {
    /// Expiry stamped onto every directive this verifier produces.
    expires_at: Instant,
}

impl DirectiveVerifier for FakeVerifier {
    /// Returns a match-all `Shape` directive for `"go"`, `None` otherwise.
    fn verify(&self, header_value: &str) -> Option<DiagnosticsDirective> {
        if header_value == "go" {
            Some(DiagnosticsDirective {
                id: "header".to_owned(),
                match_: DirectiveMatch::all(),
                level: DiagLevel::Shape,
                sample_per_mille: 1000,
                expires_at: self.expires_at,
                ring_buffer: false,
                capture: false,
            })
        } else {
            None
        }
    }
}
/// Sends the same `PUT` ingest request as `ingest` (fixed request id `r`),
/// optionally carrying an `x-debug-directive` header with the given value.
///
/// Panics (via `unwrap`) if the pipeline returns an error.
async fn ingest_with_directive(
    p: &Pipeline<TenancyRouter<SharedTenancy>, MemorySink>,
    header: Option<&str>,
) {
    let caller = Principal::new(PrincipalId::from("svc"));
    // Zero or one header, depending on whether a token was supplied.
    let headers: Vec<(String, String)> = match header {
        Some(token) => vec![("x-debug-directive".to_owned(), token.to_owned())],
        None => Vec::new(),
    };
    let rid = RequestId::from("r");
    let payload = br#"{"tenant_id":"acme","id":7}"#;

    let ctx = RequestCtx::new(
        &caller,
        &rid,
        HttpMethod::Put,
        EndpointKind::IngestDoc,
        Protocol::Http1,
        "logical",
        HeaderView::new(&headers),
        payload,
    );

    let outcome = p.handle(&ctx).await;
    outcome.unwrap();
}
/// With baseline `Off` and a directive verifier installed, only a request
/// whose header value the verifier accepts is exported; a missing header or a
/// rejected value exports nothing.
#[tokio::test]
async fn a_validly_signed_header_enables_export_for_its_request_only() {
    let clock = Arc::new(ManualClock::new());
    let ten_minutes_from_now = clock.now().saturating_add(Duration::from_secs(600));
    let verifier = FakeVerifier {
        expires_at: ten_minutes_from_now,
    };
    let exporter = RecordingExporter::default();
    let p = pipeline()
        .with_exporter(Arc::new(exporter.clone()))
        .with_clock(clock)
        .with_baseline_level(DiagLevel::Off)
        .with_directive_verifier(Arc::new(verifier));

    // No header at all.
    ingest_with_directive(&p, None).await;
    let empty_without_header = exporter.0.lock().unwrap().is_empty();
    assert!(empty_without_header, "no header → no export");

    // A value the verifier rejects.
    ingest_with_directive(&p, Some("forged")).await;
    let empty_after_forgery = exporter.0.lock().unwrap().is_empty();
    assert!(empty_after_forgery, "bad token → no export");

    // The one value the verifier accepts.
    ingest_with_directive(&p, Some("go")).await;
    let exported = exporter.0.lock().unwrap().len();
    assert_eq!(
        exported, 1,
        "a validly signed X-Debug-Directive enables export"
    );
}