// arcly_http/pipeline/mod.rs
//! Unified request provenance — one answer to "who is this unit of work,
//! which trace does it continue, and which tenant does it belong to?",
//! shared by every transport: HTTP requests, WebSocket handshakes, and
//! consumer-mesh messages.
//!
//! ## Why this module exists
//!
//! Before this module existed, the three transports drifted apart: HTTP
//! extracted trace + tenant + credentials in `assemble_context`; WebSocket
//! handshakes extracted *only* credentials (no tenant enforcement,
//! orphan-root spans); and the consumer mesh carried tenant ids as raw
//! strings that never met the `TenantRegistry` (so a suspended tenant's
//! queued events kept processing). A new identity dimension had to be wired
//! three times — or, in practice, once, with the other transports silently
//! missing it.
//!
//! [`Provenance`] is now the single extraction point. Adding a dimension
//! here reaches every transport at once, and a transport that skips it can
//! be spotted in review by the absence of a single call.
//!
//! ## Zero-lock guarantee
//!
//! Construction is pure parsing plus frozen-map probes: `traceparent`
//! parsing, the existing credential pipeline (`auth::extract`), and one
//! `ArcSwap` snapshot read in the tenant registry. No locks are taken, and no
//! I/O is performed beyond what the credential pipeline already does.
26
27use std::sync::Arc;
28
29use axum::http::HeaderMap;
30
31use crate::auth::extract::extract_auth;
32use crate::core::engine::FrozenDiContainer;
33use crate::messaging::InboundMessage;
34use crate::observability::propagation::{extract_trace_context, TraceContext};
35use crate::session::Session;
36use crate::web::context::Claims;
37use crate::web::tenant::{TenantConfig, TenantRegistry};
38
39/// Who/where/why for one unit of work — identical shape across transports.
40pub struct Provenance {
41    /// W3C trace identity: continued from the caller/producer, or a fresh root.
42    pub trace: TraceContext,
43    /// Resolved + validated tenant (`None` = unknown without fallback, or
44    /// suspended — callers must treat suspended as a hard cut-off).
45    pub tenant: Option<Arc<TenantConfig>>,
46    /// Decoded principal claims (JWT / signed cookie), when presented.
47    pub claims: Option<Arc<Claims>>,
48    /// Server-side session, when the session pipeline matched a cookie.
49    pub session: Option<Arc<Session>>,
50}
51
52impl Provenance {
53    /// HTTP requests and WebSocket handshakes: headers carry everything.
54    ///
55    /// This is the ONE place trace + tenant + credentials meet a header map.
56    pub async fn from_headers(headers: &HeaderMap, container: &'static FrozenDiContainer) -> Self {
57        let trace = extract_trace_context(headers);
58        let auth = extract_auth(headers, container).await;
59        let tenant = container
60            .try_get::<TenantRegistry>()
61            .and_then(|tr| tr.resolve(headers));
62        Self {
63            trace,
64            tenant,
65            claims: auth.claims,
66            session: auth.session,
67        }
68    }
69
70    /// Consumer-mesh messages: the producing request's identity rides the
71    /// envelope. The tenant id is validated against the SAME registry as
72    /// HTTP — a suspended tenant's queued events resolve to `None` and stop
73    /// being processed — and the producer's `traceparent` continues the
74    /// distributed trace instead of starting an orphan root.
75    ///
76    /// Messages authenticate at the transport (broker credentials), not per
77    /// event, so `claims`/`session` are absent by design.
78    pub fn from_message(msg: &InboundMessage, container: &'static FrozenDiContainer) -> Self {
79        let trace = msg
80            .traceparent
81            .as_deref()
82            .and_then(TraceContext::from_traceparent)
83            .unwrap_or_else(TraceContext::new_root);
84        let tenant = msg.tenant.as_deref().and_then(|id| {
85            container
86                .try_get::<TenantRegistry>()
87                .and_then(|tr| tr.resolve_by_id(id))
88        });
89        Self {
90            trace,
91            tenant,
92            claims: None,
93            session: None,
94        }
95    }
96
97    /// `traceparent` for stamping outbound hops (HTTP calls, outbox rows,
98    /// queue envelopes).
99    pub fn traceparent(&self) -> String {
100        self.trace.to_traceparent()
101    }
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::engine::DiContainerBuilder;
    use crate::web::tenant::{TenantId, TenantStrategy};

    /// Container whose registry knows `acme` (active) and `globex`
    /// (registered, then suspended). It also suspends `initech`, an id the
    /// registry never knew.
    fn container_with_registry() -> &'static FrozenDiContainer {
        let registry = TenantRegistry::new(
            TenantStrategy::header("x-tenant-id"),
            vec![
                TenantConfig {
                    id: TenantId::new("acme"),
                    display_name: "Acme".into(),
                    datasource: "acme".into(),
                },
                // Registered on purpose: only then is it the suspension (and
                // not mere unknown-ness) that makes this tenant resolve to
                // `None`.
                TenantConfig {
                    id: TenantId::new("globex"),
                    display_name: "Globex".into(),
                    datasource: "globex".into(),
                },
            ],
            None,
        );
        registry.suspend(&TenantId::new("globex")); // known, then suspended
        registry.suspend(&TenantId::new("initech")); // suspended, never known
        let mut b = DiContainerBuilder::new();
        b.register(registry);
        b.freeze()
    }

    /// Minimal inbound envelope carrying only the identity fields under test.
    fn msg(tenant: Option<&str>, traceparent: Option<&str>) -> InboundMessage {
        InboundMessage {
            topic: "t".into(),
            payload: serde_json::Value::Null,
            idempotency_key: "k".into(),
            tenant: tenant.map(str::to_owned),
            traceparent: traceparent.map(str::to_owned),
        }
    }

    #[test]
    fn message_provenance_validates_tenant_against_registry() {
        let c = container_with_registry();

        // Known, active tenant resolves to the same config HTTP would see.
        let p = Provenance::from_message(&msg(Some("acme"), None), c);
        assert_eq!(p.tenant.as_deref().map(|t| t.id.as_str()), Some("acme"));

        // Unknown ids resolve to None — handlers never see them.
        assert!(Provenance::from_message(&msg(Some("nope"), None), c)
            .tenant
            .is_none());
        // A registered tenant that has been suspended is cut off...
        assert!(Provenance::from_message(&msg(Some("globex"), None), c)
            .tenant
            .is_none());
        // ...and suspending an id the registry never knew does not conjure it up.
        assert!(Provenance::from_message(&msg(Some("initech"), None), c)
            .tenant
            .is_none());
        // No tenant on the envelope: simply absent.
        assert!(Provenance::from_message(&msg(None, None), c)
            .tenant
            .is_none());
    }

    #[test]
    fn message_provenance_continues_producer_trace() {
        let c = container_with_registry();
        let carried = "00-0123456789abcdef0123456789abcdef-00f067aa0ba902b7-01";

        let p = Provenance::from_message(&msg(None, Some(carried)), c);
        let hex = crate::observability::lean_telemetry::hex_encode(&p.trace.trace_id);
        assert_eq!(
            hex, "0123456789abcdef0123456789abcdef",
            "trace id continues"
        );
        // Producer's span is this hop's parent; a fresh consumer span is minted.
        let parent = crate::observability::lean_telemetry::hex_encode(&p.trace.parent_span_id);
        assert_eq!(parent, "00f067aa0ba902b7");

        // Stamping the next hop keeps the same (version 00) trace id.
        assert!(
            p.traceparent()
                .starts_with("00-0123456789abcdef0123456789abcdef-"),
            "outbound traceparent carries the continued trace id"
        );

        // No traceparent → fresh root (all-zero parent).
        let root = Provenance::from_message(&msg(None, None), c);
        assert_eq!(root.trace.parent_span_id, [0u8; 8]);
    }
}