use std::sync::Arc;
use axum::http::HeaderMap;
use crate::auth::extract::extract_auth;
use crate::core::engine::FrozenDiContainer;
use crate::messaging::InboundMessage;
use crate::observability::propagation::{extract_trace_context, TraceContext};
use crate::session::Session;
use crate::web::context::Claims;
use crate::web::tenant::{TenantConfig, TenantRegistry};
pub struct Provenance {
pub trace: TraceContext,
pub tenant: Option<Arc<TenantConfig>>,
pub claims: Option<Arc<Claims>>,
pub session: Option<Arc<Session>>,
}
impl Provenance {
pub async fn from_headers(headers: &HeaderMap, container: &'static FrozenDiContainer) -> Self {
let trace = extract_trace_context(headers);
let auth = extract_auth(headers, container).await;
let tenant = container
.try_get::<TenantRegistry>()
.and_then(|tr| tr.resolve(headers));
Self {
trace,
tenant,
claims: auth.claims,
session: auth.session,
}
}
pub fn from_message(msg: &InboundMessage, container: &'static FrozenDiContainer) -> Self {
let trace = msg
.traceparent
.as_deref()
.and_then(TraceContext::from_traceparent)
.unwrap_or_else(TraceContext::new_root);
let tenant = msg.tenant.as_deref().and_then(|id| {
container
.try_get::<TenantRegistry>()
.and_then(|tr| tr.resolve_by_id(id))
});
Self {
trace,
tenant,
claims: None,
session: None,
}
}
pub fn traceparent(&self) -> String {
self.trace.to_traceparent()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::engine::DiContainerBuilder;
use crate::web::tenant::{TenantId, TenantStrategy};
fn container_with_registry() -> &'static FrozenDiContainer {
let registry = TenantRegistry::new(
TenantStrategy::Header("x-tenant-id"),
vec![TenantConfig {
id: TenantId::new("acme"),
display_name: "Acme".into(),
datasource: "acme".into(),
}],
None,
);
registry.suspend(&TenantId::new("globex")); let mut b = DiContainerBuilder::new();
b.register(registry);
b.freeze()
}
fn msg(tenant: Option<&str>, traceparent: Option<&str>) -> InboundMessage {
InboundMessage {
topic: "t".into(),
payload: serde_json::Value::Null,
idempotency_key: "k".into(),
tenant: tenant.map(str::to_owned),
traceparent: traceparent.map(str::to_owned),
}
}
#[test]
fn message_provenance_validates_tenant_against_registry() {
let c = container_with_registry();
let p = Provenance::from_message(&msg(Some("acme"), None), c);
assert_eq!(p.tenant.as_deref().map(|t| t.id.as_str()), Some("acme"));
assert!(Provenance::from_message(&msg(Some("nope"), None), c)
.tenant
.is_none());
assert!(Provenance::from_message(&msg(Some("globex"), None), c)
.tenant
.is_none());
assert!(Provenance::from_message(&msg(None, None), c)
.tenant
.is_none());
}
#[test]
fn message_provenance_continues_producer_trace() {
let c = container_with_registry();
let carried = "00-0123456789abcdef0123456789abcdef-00f067aa0ba902b7-01";
let p = Provenance::from_message(&msg(None, Some(carried)), c);
let hex = crate::observability::lean_telemetry::hex_encode(&p.trace.trace_id);
assert_eq!(
hex, "0123456789abcdef0123456789abcdef",
"trace id continues"
);
let parent = crate::observability::lean_telemetry::hex_encode(&p.trace.parent_span_id);
assert_eq!(parent, "00f067aa0ba902b7");
let root = Provenance::from_message(&msg(None, None), c);
assert_eq!(root.trace.parent_span_id, [0u8; 8]);
}
}