Skip to main content

securitydept_token_set_context/access_token_substrate/
service.rs

1use securitydept_creds::{CoreJwtClaims, parse_bearer_auth_header_opt};
2use securitydept_oauth_resource_server::{
3    OAuthResourceServerError, OAuthResourceServerVerifier, ResourceTokenPrincipal,
4};
5use securitydept_utils::{
6    error::{ErrorPresentation, ToErrorPresentation, UserRecovery},
7    http::ToHttpStatus,
8    observability::{
9        AuthFlowDiagnosis, AuthFlowDiagnosisField, AuthFlowDiagnosisOutcome, AuthFlowOperation,
10        DiagnosedResult,
11    },
12};
13use snafu::Snafu;
14
15use super::{
16    forwarder::{PropagationForwarder, PropagationForwarderError},
17    propagation::{
18        DEFAULT_PROPAGATION_HEADER_NAME, PropagatedBearer, PropagationDirective,
19        PropagationRequestTarget, TokenPropagatorError,
20    },
21    runtime::AccessTokenSubstrateRuntime,
22};
23
24// ---------------------------------------------------------------------------
25// Error
26// ---------------------------------------------------------------------------
27
28/// Errors produced by [`AccessTokenSubstrateResourceService`].
29#[derive(Debug, Snafu)]
30pub enum AccessTokenSubstrateResourceServiceError {
31    #[snafu(transparent)]
32    OAuthResourceServer { source: OAuthResourceServerError },
33
34    #[snafu(transparent)]
35    Propagation { source: PropagationForwarderError },
36
37    #[snafu(display("token propagation is not enabled on this server"))]
38    PropagationNotEnabled,
39
40    #[snafu(display("propagation request requires a bearer token in the Authorization header"))]
41    BearerTokenRequired,
42
43    #[snafu(display("propagation request requires the {DEFAULT_PROPAGATION_HEADER_NAME} header"))]
44    PropagationDirectiveRequired,
45
46    #[snafu(display("invalid propagation directive: {source}"))]
47    PropagationDirectiveInvalid { source: TokenPropagatorError },
48}
49
50impl ToHttpStatus for AccessTokenSubstrateResourceServiceError {
51    fn to_http_status(&self) -> http::StatusCode {
52        match self {
53            Self::OAuthResourceServer { source } => source.to_http_status(),
54            Self::BearerTokenRequired
55            | Self::PropagationDirectiveRequired
56            | Self::PropagationDirectiveInvalid { .. } => http::StatusCode::BAD_REQUEST,
57            Self::Propagation { .. } | Self::PropagationNotEnabled => {
58                http::StatusCode::INTERNAL_SERVER_ERROR
59            }
60        }
61    }
62}
63
64impl ToErrorPresentation for AccessTokenSubstrateResourceServiceError {
65    fn to_error_presentation(&self) -> ErrorPresentation {
66        match self {
67            Self::OAuthResourceServer { source } => source.to_error_presentation(),
68            Self::Propagation { source } => source.to_error_presentation(),
69            Self::PropagationNotEnabled => ErrorPresentation::new(
70                "propagation_not_enabled",
71                "Token propagation is not enabled on this server.",
72                UserRecovery::ContactSupport,
73            ),
74            Self::BearerTokenRequired => ErrorPresentation::new(
75                "bearer_token_required",
76                "A bearer token in the Authorization header is required for propagation.",
77                UserRecovery::Reauthenticate,
78            ),
79            Self::PropagationDirectiveRequired => ErrorPresentation::new(
80                "propagation_directive_required",
81                "The propagation directive header is required.",
82                UserRecovery::ContactSupport,
83            ),
84            Self::PropagationDirectiveInvalid { .. } => ErrorPresentation::new(
85                "propagation_directive_invalid",
86                "The propagation directive header is malformed.",
87                UserRecovery::ContactSupport,
88            ),
89        }
90    }
91}
92
93// ---------------------------------------------------------------------------
94// Service
95// ---------------------------------------------------------------------------
96
97/// Cross-mode resource service for verifying bearer tokens and forwarding
98/// propagation requests.
99///
100/// # Capabilities
101///
102/// | Method | Description |
103/// |---|---|
104/// | [`authenticate_authorization_header`](Self::authenticate_authorization_header) | Verify a bearer token from an `Authorization` header |
105/// | [`parse_propagation_directive`](Self::parse_propagation_directive) | Extract and parse the propagation directive from request headers |
106/// | [`propagate_request`](Self::propagate_request) | End-to-end: extract bearer + directive from request, verify, and forward |
107/// | [`propagate_bearer`](Self::propagate_bearer) | Low-level: forward a pre-extracted bearer to a downstream target |
108///
109/// # Service pattern
110///
111/// Constructed from `ServerState` via `resource_service()` when both
112/// `substrate_runtime` and `oauth_resource_server_verifier` are present,
113/// mirroring [`BackendOidcModeAuthService`](crate::backend_oidc_mode::BackendOidcModeAuthService).
114#[derive(Clone, Copy)]
115pub struct AccessTokenSubstrateResourceService<'a> {
116    runtime: &'a AccessTokenSubstrateRuntime,
117    verifier: &'a OAuthResourceServerVerifier,
118}
119
120impl<'a> AccessTokenSubstrateResourceService<'a> {
121    pub fn new(
122        runtime: &'a AccessTokenSubstrateRuntime,
123        verifier: &'a OAuthResourceServerVerifier,
124    ) -> Self {
125        Self { runtime, verifier }
126    }
127
128    // -----------------------------------------------------------------------
129    // Token verification
130    // -----------------------------------------------------------------------
131
132    /// Verify a bearer token extracted from an `Authorization` header.
133    ///
134    /// Returns `None` when the header is absent or not a bearer token.
135    pub async fn authenticate_authorization_header(
136        &self,
137        authorization_header: Option<&str>,
138    ) -> Result<Option<ResourceTokenPrincipal>, AccessTokenSubstrateResourceServiceError> {
139        let Some(authorization_header) = authorization_header else {
140            return Ok(None);
141        };
142        let Some(token) = parse_bearer_auth_header_opt(authorization_header) else {
143            return Ok(None);
144        };
145
146        let verified = self
147            .verifier
148            .verify_token::<CoreJwtClaims>(&token)
149            .await
150            .map_err(
151                |source| AccessTokenSubstrateResourceServiceError::OAuthResourceServer { source },
152            )?;
153
154        Ok(Some(verified.to_resource_token_principal()))
155    }
156
157    // -----------------------------------------------------------------------
158    // Propagation directive parsing
159    // -----------------------------------------------------------------------
160
161    /// Extract and parse a [`PropagationDirective`] from request headers.
162    ///
163    /// Returns `Ok(None)` when the `x-securitydept-propagation` header is
164    /// absent, `Ok(Some(directive))` when present and valid, or
165    /// [`PropagationDirectiveInvalid`](AccessTokenSubstrateResourceServiceError::PropagationDirectiveInvalid)
166    /// when the header value is malformed.
167    pub fn parse_propagation_directive(
168        headers: &http::HeaderMap,
169    ) -> Result<Option<PropagationDirective>, AccessTokenSubstrateResourceServiceError> {
170        let Some(value) = headers.get(DEFAULT_PROPAGATION_HEADER_NAME) else {
171            return Ok(None);
172        };
173
174        PropagationDirective::from_header_value(value)
175            .map(Some)
176            .map_err(|source| {
177                AccessTokenSubstrateResourceServiceError::PropagationDirectiveInvalid { source }
178            })
179    }
180
181    // -----------------------------------------------------------------------
182    // Propagation forwarding
183    // -----------------------------------------------------------------------
184
185    /// End-to-end propagation: extract, verify, and forward.
186    ///
187    /// 1. Extracts the bearer token from the `Authorization` header and
188    ///    verifies it via the configured [`OAuthResourceServerVerifier`].
189    /// 2. Parses the [`PropagationDirective`] from the
190    ///    `x-securitydept-propagation` header.
191    /// 3. Delegates to [`propagate_bearer`](Self::propagate_bearer).
192    ///
193    /// This is the recommended entry-point for propagation route handlers.
194    pub async fn propagate_request<F: PropagationForwarder>(
195        &self,
196        forwarder: &F,
197        request: http::Request<F::Body>,
198    ) -> Result<http::Response<F::Body>, AccessTokenSubstrateResourceServiceError> {
199        self.propagate_request_with_diagnosis(forwarder, request)
200            .await
201            .into_result()
202    }
203
204    /// End-to-end propagation with a machine-readable diagnosis surface.
205    pub async fn propagate_request_with_diagnosis<F: PropagationForwarder>(
206        &self,
207        forwarder: &F,
208        request: http::Request<F::Body>,
209    ) -> DiagnosedResult<http::Response<F::Body>, AccessTokenSubstrateResourceServiceError> {
210        let mut diagnosis = AuthFlowDiagnosis::started(AuthFlowOperation::PROPAGATION_FORWARD)
211            .field(AuthFlowDiagnosisField::TRANSPORT, "authorization_header")
212            .field(
213                AuthFlowDiagnosisField::DIRECTIVE_HEADER,
214                DEFAULT_PROPAGATION_HEADER_NAME,
215            );
216
217        // 1. Extract and verify bearer token.
218        let authorization_header = request
219            .headers()
220            .get(http::header::AUTHORIZATION)
221            .and_then(|v| v.to_str().ok());
222
223        let Some(authorization_str) = authorization_header else {
224            return DiagnosedResult::failure(
225                diagnosis
226                    .with_outcome(AuthFlowDiagnosisOutcome::Rejected)
227                    .field(
228                        AuthFlowDiagnosisField::FAILURE_STAGE,
229                        "authorization_header",
230                    )
231                    .field(AuthFlowDiagnosisField::REASON, "missing_bearer_token"),
232                AccessTokenSubstrateResourceServiceError::BearerTokenRequired,
233            );
234        };
235
236        let Some(access_token) = parse_bearer_auth_header_opt(authorization_str) else {
237            return DiagnosedResult::failure(
238                diagnosis
239                    .with_outcome(AuthFlowDiagnosisOutcome::Rejected)
240                    .field(
241                        AuthFlowDiagnosisField::FAILURE_STAGE,
242                        "authorization_header",
243                    )
244                    .field(AuthFlowDiagnosisField::REASON, "invalid_bearer_token"),
245                AccessTokenSubstrateResourceServiceError::BearerTokenRequired,
246            );
247        };
248
249        let resource_token_principal = match self
250            .verifier
251            .verify_token::<CoreJwtClaims>(&access_token)
252            .await
253        {
254            Ok(verified) => verified.to_resource_token_principal(),
255            Err(source) => {
256                return DiagnosedResult::failure(
257                    diagnosis
258                        .with_outcome(AuthFlowDiagnosisOutcome::Failed)
259                        .field(AuthFlowDiagnosisField::FAILURE_STAGE, "token_verification"),
260                    AccessTokenSubstrateResourceServiceError::OAuthResourceServer { source },
261                );
262            }
263        };
264
265        // 2. Parse propagation directive.
266        let Some(directive_header) = request.headers().get(DEFAULT_PROPAGATION_HEADER_NAME) else {
267            return DiagnosedResult::failure(
268                diagnosis
269                    .with_outcome(AuthFlowDiagnosisOutcome::Rejected)
270                    .field(
271                        AuthFlowDiagnosisField::FAILURE_STAGE,
272                        "propagation_directive",
273                    )
274                    .field(
275                        AuthFlowDiagnosisField::REASON,
276                        "missing_propagation_directive",
277                    ),
278                AccessTokenSubstrateResourceServiceError::PropagationDirectiveRequired,
279            );
280        };
281
282        let directive = match PropagationDirective::from_header_value(directive_header) {
283            Ok(directive) => directive,
284            Err(source) => {
285                return DiagnosedResult::failure(
286                    diagnosis
287                        .with_outcome(AuthFlowDiagnosisOutcome::Rejected)
288                        .field(
289                            AuthFlowDiagnosisField::FAILURE_STAGE,
290                            "propagation_directive",
291                        )
292                        .field(
293                            AuthFlowDiagnosisField::REASON,
294                            "invalid_propagation_directive",
295                        ),
296                    AccessTokenSubstrateResourceServiceError::PropagationDirectiveInvalid {
297                        source,
298                    },
299                );
300            }
301        };
302
303        let bearer = PropagatedBearer {
304            access_token: &access_token,
305            resource_token_principal: Some(&resource_token_principal),
306        };
307        let target = directive.to_request_target();
308        diagnosis = diagnosis
309            .field("target_node_id", target.node_id.clone())
310            .field(
311                "target_scheme",
312                target.scheme.as_ref().map(|scheme| scheme.as_str()),
313            )
314            .field("target_hostname", target.hostname.clone())
315            .field("target_port", target.port);
316
317        // 3. Forward.
318        match self
319            .propagate_bearer(forwarder, &bearer, &target, request)
320            .await
321        {
322            Ok(response) => DiagnosedResult::success(
323                diagnosis
324                    .with_outcome(AuthFlowDiagnosisOutcome::Succeeded)
325                    .field(
326                        "principal_subject",
327                        resource_token_principal.subject.clone(),
328                    ),
329                response,
330            ),
331            Err(error) => DiagnosedResult::failure(
332                diagnosis
333                    .with_outcome(AuthFlowDiagnosisOutcome::Failed)
334                    .field(AuthFlowDiagnosisField::FAILURE_STAGE, "forward"),
335                error,
336            ),
337        }
338    }
339
340    /// Validate and forward a bearer token to a downstream propagation target.
341    ///
342    /// This is the low-level building block used by
343    /// [`propagate_request`](Self::propagate_request). Use it directly when
344    /// the bearer and target have already been extracted and verified by the
345    /// caller.
346    pub async fn propagate_bearer<F: PropagationForwarder>(
347        &self,
348        forwarder: &F,
349        bearer: &PropagatedBearer<'_>,
350        target: &PropagationRequestTarget,
351        request: http::Request<F::Body>,
352    ) -> Result<http::Response<F::Body>, AccessTokenSubstrateResourceServiceError> {
353        let propagator = self
354            .runtime
355            .token_propagator()
356            .ok_or(AccessTokenSubstrateResourceServiceError::PropagationNotEnabled)?;
357
358        forwarder
359            .forward(propagator, bearer, target, request)
360            .await
361            .map_err(|source| AccessTokenSubstrateResourceServiceError::Propagation { source })
362    }
363}
364
365// ---------------------------------------------------------------------------
366// Tests
367// ---------------------------------------------------------------------------
368
369#[cfg(all(test, feature = "axum-reverse-proxy-propagation-forwarder"))]
370mod tests {
371    use axum::{
372        Json, Router,
373        body::{Body, to_bytes},
374        http::{Request, StatusCode, header::AUTHORIZATION},
375        routing::get,
376    };
377    use securitydept_oauth_resource_server::ResourceTokenPrincipal;
378    use tokio::net::TcpListener;
379
380    use super::*;
381    use crate::access_token_substrate::{
382        AllowedPropagationTarget, AxumReverseProxyPropagationForwarder,
383        AxumReverseProxyPropagationForwarderConfig, DEFAULT_PROPAGATION_HEADER_NAME,
384        PropagationDestinationPolicy, PropagationDirective, PropagationScheme, TokenPropagation,
385        TokenPropagatorConfig,
386    };
387
388    fn make_runtime_and_forwarder(
389        upstream_port: u16,
390    ) -> (
391        AccessTokenSubstrateRuntime,
392        AxumReverseProxyPropagationForwarder,
393    ) {
394        let token_propagation = TokenPropagation::Enabled {
395            config: TokenPropagatorConfig {
396                destination_policy: PropagationDestinationPolicy {
397                    allowed_targets: vec![AllowedPropagationTarget::ExactOrigin {
398                        scheme: PropagationScheme::Http,
399                        hostname: "localhost".to_string(),
400                        port: upstream_port,
401                    }],
402                    ..Default::default()
403                },
404                ..Default::default()
405            },
406        };
407
408        let runtime = AccessTokenSubstrateRuntime::new(&token_propagation)
409            .expect("substrate runtime should build");
410
411        let forwarder =
412            AxumReverseProxyPropagationForwarder::new(AxumReverseProxyPropagationForwarderConfig {
413                proxy_path: "/api/propagation".to_string(),
414            })
415            .expect("forwarder should build");
416
417        (runtime, forwarder)
418    }
419
420    #[tokio::test]
421    async fn propagate_bearer_proxies_request_to_upstream() {
422        let _ = rustls::crypto::ring::default_provider().install_default();
423
424        // Spin up a minimal upstream that echoes request details and serves
425        // a stub JWKS endpoint (needed to construct a valid verifier).
426        let upstream = Router::new()
427            .route(
428                "/api/health",
429                get(|request: Request<Body>| async move {
430                    let authorization = request
431                        .headers()
432                        .get(AUTHORIZATION)
433                        .and_then(|v| v.to_str().ok())
434                        .map(str::to_string);
435                    let propagation_header = request
436                        .headers()
437                        .get(DEFAULT_PROPAGATION_HEADER_NAME)
438                        .and_then(|v| v.to_str().ok())
439                        .map(str::to_string);
440
441                    Json(serde_json::json!({
442                        "status": "ok",
443                        "path":   request.uri().path(),
444                        "query":  request.uri().query(),
445                        "authorization": authorization,
446                        "propagation_header": propagation_header,
447                    }))
448                }),
449            )
450            .route(
451                "/jwks",
452                get(|| async { Json(serde_json::json!({ "keys": [] })) }),
453            );
454        let listener = TcpListener::bind("127.0.0.1:0")
455            .await
456            .expect("upstream listener should bind");
457        let upstream_port = listener.local_addr().expect("should have addr").port();
458        let upstream_task = tokio::spawn(async move {
459            axum::serve(listener, upstream)
460                .await
461                .expect("upstream server should run");
462        });
463
464        let (runtime, forwarder) = make_runtime_and_forwarder(upstream_port);
465
466        // Build a verifier backed by the mock JWKS endpoint.
467        // propagate_bearer never calls the verifier — we only need to satisfy
468        // the type-level requirement that the service holds a valid reference.
469        let verifier_config = securitydept_oauth_resource_server::OAuthResourceServerConfig {
470            remote: securitydept_oauth_resource_server::OAuthProviderRemoteConfig {
471                issuer_url: Some(format!("http://localhost:{upstream_port}")),
472                jwks_uri: Some(format!("http://localhost:{upstream_port}/jwks")),
473                ..Default::default()
474            },
475            ..Default::default()
476        };
477        let verifier = OAuthResourceServerVerifier::from_config(verifier_config)
478            .await
479            .expect("verifier should build");
480        let service = AccessTokenSubstrateResourceService::new(&runtime, &verifier);
481
482        let directive = PropagationDirective::parse(&format!(
483            "by=dashboard;for=local-health;host=localhost:{upstream_port};proto=http"
484        ))
485        .expect("directive should parse");
486
487        let bearer = PropagatedBearer {
488            access_token: "dashboard-at",
489            resource_token_principal: Some(&ResourceTokenPrincipal {
490                subject: Some("user-1".to_string()),
491                issuer: None,
492                audiences: Vec::new(),
493                scopes: Vec::new(),
494                authorized_party: None,
495                claims: Default::default(),
496            }),
497        };
498        let target = directive.to_request_target();
499
500        let request = Request::builder()
501            .uri("/api/propagation/api/health?via=token-set")
502            .header(
503                DEFAULT_PROPAGATION_HEADER_NAME,
504                directive
505                    .to_header_value()
506                    .expect("directive should serialize"),
507            )
508            .body(Body::empty())
509            .expect("request should build");
510
511        let response = service
512            .propagate_bearer(&forwarder, &bearer, &target, request)
513            .await
514            .expect("propagation should succeed");
515
516        assert_eq!(response.status(), StatusCode::OK);
517
518        let body = to_bytes(response.into_body(), usize::MAX)
519            .await
520            .expect("body should read");
521        let payload: serde_json::Value =
522            serde_json::from_slice(&body).expect("response body should be json");
523
524        assert_eq!(payload["status"], "ok");
525        assert_eq!(payload["path"], "/api/health");
526        assert_eq!(payload["query"], "via=token-set");
527        assert_eq!(payload["authorization"], "Bearer dashboard-at");
528        // The propagation header must be stripped before forwarding.
529        assert_eq!(payload["propagation_header"], serde_json::Value::Null);
530
531        upstream_task.abort();
532    }
533}