arcly_http/pipeline/mod.rs
1//! Unified request provenance — one answer to "who is this unit of work,
2//! which trace does it continue, which tenant does it belong to?" shared by
3//! every transport: HTTP requests, WebSocket handshakes, and consumer-mesh
4//! messages.
5//!
6//! ## Why this module exists
7//!
8//! Before it, the three transports drifted: HTTP extracted trace + tenant +
9//! credentials in `assemble_context`, WebSocket handshakes extracted *only*
10//! credentials (no tenant enforcement, orphan-root spans), and the consumer
11//! mesh carried tenant ids as raw strings that never met the
12//! `TenantRegistry` (a suspended tenant's queued events kept processing).
13//! A new identity dimension had to be wired three times — or, in practice,
14//! once, with the other transports silently missing it.
15//!
16//! [`Provenance`] is now the single extraction point. Adding a dimension
17//! here reaches every transport at once; a transport that skips it can be
18//! spotted in review by the absence of one call.
19//!
20//! ## Zero-lock guarantee
21//!
22//! Construction is pure parsing plus frozen-map probes: `traceparent`
23//! parsing, the existing credential pipeline (`auth::extract`), and one
24//! `ArcSwap` snapshot read in the tenant registry. No locks, no I/O beyond
25//! what the credential pipeline already contracted.
26
27use std::sync::Arc;
28
29use axum::http::HeaderMap;
30
31use crate::auth::extract::extract_auth;
32use crate::core::engine::FrozenDiContainer;
33use crate::messaging::InboundMessage;
34use crate::observability::propagation::{extract_trace_context, TraceContext};
35use crate::session::Session;
36use crate::web::context::Claims;
37use crate::web::tenant::{TenantConfig, TenantRegistry};
38
39/// Who/where/why for one unit of work — identical shape across transports.
40pub struct Provenance {
41 /// W3C trace identity: continued from the caller/producer, or a fresh root.
42 pub trace: TraceContext,
43 /// Resolved + validated tenant (`None` = unknown without fallback, or
44 /// suspended — callers must treat suspended as a hard cut-off).
45 pub tenant: Option<Arc<TenantConfig>>,
46 /// Decoded principal claims (JWT / signed cookie), when presented.
47 pub claims: Option<Arc<Claims>>,
48 /// Server-side session, when the session pipeline matched a cookie.
49 pub session: Option<Arc<Session>>,
50}
51
52impl Provenance {
53 /// HTTP requests and WebSocket handshakes: headers carry everything.
54 ///
55 /// This is the ONE place trace + tenant + credentials meet a header map.
56 pub async fn from_headers(headers: &HeaderMap, container: &'static FrozenDiContainer) -> Self {
57 let trace = extract_trace_context(headers);
58 let auth = extract_auth(headers, container).await;
59 let tenant = container
60 .try_get::<TenantRegistry>()
61 .and_then(|tr| tr.resolve(headers));
62 Self {
63 trace,
64 tenant,
65 claims: auth.claims,
66 session: auth.session,
67 }
68 }
69
70 /// Consumer-mesh messages: the producing request's identity rides the
71 /// envelope. The tenant id is validated against the SAME registry as
72 /// HTTP — a suspended tenant's queued events resolve to `None` and stop
73 /// being processed — and the producer's `traceparent` continues the
74 /// distributed trace instead of starting an orphan root.
75 ///
76 /// Messages authenticate at the transport (broker credentials), not per
77 /// event, so `claims`/`session` are absent by design.
78 pub fn from_message(msg: &InboundMessage, container: &'static FrozenDiContainer) -> Self {
79 let trace = msg
80 .traceparent
81 .as_deref()
82 .and_then(TraceContext::from_traceparent)
83 .unwrap_or_else(TraceContext::new_root);
84 let tenant = msg.tenant.as_deref().and_then(|id| {
85 container
86 .try_get::<TenantRegistry>()
87 .and_then(|tr| tr.resolve_by_id(id))
88 });
89 Self {
90 trace,
91 tenant,
92 claims: None,
93 session: None,
94 }
95 }
96
97 /// `traceparent` for stamping outbound hops (HTTP calls, outbox rows,
98 /// queue envelopes).
99 pub fn traceparent(&self) -> String {
100 self.trace.to_traceparent()
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use super::*;
107 use crate::core::engine::DiContainerBuilder;
108 use crate::web::tenant::{TenantId, TenantStrategy};
109
110 fn container_with_registry() -> &'static FrozenDiContainer {
111 let registry = TenantRegistry::new(
112 TenantStrategy::header("x-tenant-id"),
113 vec![TenantConfig {
114 id: TenantId::new("acme"),
115 display_name: "Acme".into(),
116 datasource: "acme".into(),
117 }],
118 None,
119 );
120 registry.suspend(&TenantId::new("globex")); // suspended, never known
121 let mut b = DiContainerBuilder::new();
122 b.register(registry);
123 b.freeze()
124 }
125
126 fn msg(tenant: Option<&str>, traceparent: Option<&str>) -> InboundMessage {
127 InboundMessage {
128 topic: "t".into(),
129 payload: serde_json::Value::Null,
130 idempotency_key: "k".into(),
131 tenant: tenant.map(str::to_owned),
132 traceparent: traceparent.map(str::to_owned),
133 }
134 }
135
136 #[test]
137 fn message_provenance_validates_tenant_against_registry() {
138 let c = container_with_registry();
139
140 // Known tenant resolves to the same config HTTP would see.
141 let p = Provenance::from_message(&msg(Some("acme"), None), c);
142 assert_eq!(p.tenant.as_deref().map(|t| t.id.as_str()), Some("acme"));
143
144 // Unknown and suspended ids resolve to None — handlers never see them.
145 assert!(Provenance::from_message(&msg(Some("nope"), None), c)
146 .tenant
147 .is_none());
148 assert!(Provenance::from_message(&msg(Some("globex"), None), c)
149 .tenant
150 .is_none());
151 // No tenant on the envelope: simply absent.
152 assert!(Provenance::from_message(&msg(None, None), c)
153 .tenant
154 .is_none());
155 }
156
157 #[test]
158 fn message_provenance_continues_producer_trace() {
159 let c = container_with_registry();
160 let carried = "00-0123456789abcdef0123456789abcdef-00f067aa0ba902b7-01";
161
162 let p = Provenance::from_message(&msg(None, Some(carried)), c);
163 let hex = crate::observability::lean_telemetry::hex_encode(&p.trace.trace_id);
164 assert_eq!(
165 hex, "0123456789abcdef0123456789abcdef",
166 "trace id continues"
167 );
168 // Producer's span is this hop's parent; a fresh consumer span is minted.
169 let parent = crate::observability::lean_telemetry::hex_encode(&p.trace.parent_span_id);
170 assert_eq!(parent, "00f067aa0ba902b7");
171
172 // No traceparent → fresh root (all-zero parent).
173 let root = Provenance::from_message(&msg(None, None), c);
174 assert_eq!(root.trace.parent_span_id, [0u8; 8]);
175 }
176}