1use securitydept_creds::{CoreJwtClaims, parse_bearer_auth_header_opt};
2use securitydept_oauth_resource_server::{
3 OAuthResourceServerError, OAuthResourceServerVerifier, ResourceTokenPrincipal,
4};
5use securitydept_utils::{
6 error::{ErrorPresentation, ToErrorPresentation, UserRecovery},
7 http::ToHttpStatus,
8 observability::{
9 AuthFlowDiagnosis, AuthFlowDiagnosisField, AuthFlowDiagnosisOutcome, AuthFlowOperation,
10 DiagnosedResult,
11 },
12};
13use snafu::Snafu;
14
15use super::{
16 forwarder::{PropagationForwarder, PropagationForwarderError},
17 propagation::{
18 DEFAULT_PROPAGATION_HEADER_NAME, PropagatedBearer, PropagationDirective,
19 PropagationRequestTarget, TokenPropagatorError,
20 },
21 runtime::AccessTokenSubstrateRuntime,
22};
23
24#[derive(Debug, Snafu)]
30pub enum AccessTokenSubstrateResourceServiceError {
31 #[snafu(transparent)]
32 OAuthResourceServer { source: OAuthResourceServerError },
33
34 #[snafu(transparent)]
35 Propagation { source: PropagationForwarderError },
36
37 #[snafu(display("token propagation is not enabled on this server"))]
38 PropagationNotEnabled,
39
40 #[snafu(display("propagation request requires a bearer token in the Authorization header"))]
41 BearerTokenRequired,
42
43 #[snafu(display("propagation request requires the {DEFAULT_PROPAGATION_HEADER_NAME} header"))]
44 PropagationDirectiveRequired,
45
46 #[snafu(display("invalid propagation directive: {source}"))]
47 PropagationDirectiveInvalid { source: TokenPropagatorError },
48}
49
50impl ToHttpStatus for AccessTokenSubstrateResourceServiceError {
51 fn to_http_status(&self) -> http::StatusCode {
52 match self {
53 Self::OAuthResourceServer { source } => source.to_http_status(),
54 Self::BearerTokenRequired
55 | Self::PropagationDirectiveRequired
56 | Self::PropagationDirectiveInvalid { .. } => http::StatusCode::BAD_REQUEST,
57 Self::Propagation { .. } | Self::PropagationNotEnabled => {
58 http::StatusCode::INTERNAL_SERVER_ERROR
59 }
60 }
61 }
62}
63
64impl ToErrorPresentation for AccessTokenSubstrateResourceServiceError {
65 fn to_error_presentation(&self) -> ErrorPresentation {
66 match self {
67 Self::OAuthResourceServer { source } => source.to_error_presentation(),
68 Self::Propagation { source } => source.to_error_presentation(),
69 Self::PropagationNotEnabled => ErrorPresentation::new(
70 "propagation_not_enabled",
71 "Token propagation is not enabled on this server.",
72 UserRecovery::ContactSupport,
73 ),
74 Self::BearerTokenRequired => ErrorPresentation::new(
75 "bearer_token_required",
76 "A bearer token in the Authorization header is required for propagation.",
77 UserRecovery::Reauthenticate,
78 ),
79 Self::PropagationDirectiveRequired => ErrorPresentation::new(
80 "propagation_directive_required",
81 "The propagation directive header is required.",
82 UserRecovery::ContactSupport,
83 ),
84 Self::PropagationDirectiveInvalid { .. } => ErrorPresentation::new(
85 "propagation_directive_invalid",
86 "The propagation directive header is malformed.",
87 UserRecovery::ContactSupport,
88 ),
89 }
90 }
91}
92
93#[derive(Clone, Copy)]
115pub struct AccessTokenSubstrateResourceService<'a> {
116 runtime: &'a AccessTokenSubstrateRuntime,
117 verifier: &'a OAuthResourceServerVerifier,
118}
119
120impl<'a> AccessTokenSubstrateResourceService<'a> {
121 pub fn new(
122 runtime: &'a AccessTokenSubstrateRuntime,
123 verifier: &'a OAuthResourceServerVerifier,
124 ) -> Self {
125 Self { runtime, verifier }
126 }
127
128 pub async fn authenticate_authorization_header(
136 &self,
137 authorization_header: Option<&str>,
138 ) -> Result<Option<ResourceTokenPrincipal>, AccessTokenSubstrateResourceServiceError> {
139 let Some(authorization_header) = authorization_header else {
140 return Ok(None);
141 };
142 let Some(token) = parse_bearer_auth_header_opt(authorization_header) else {
143 return Ok(None);
144 };
145
146 let verified = self
147 .verifier
148 .verify_token::<CoreJwtClaims>(&token)
149 .await
150 .map_err(
151 |source| AccessTokenSubstrateResourceServiceError::OAuthResourceServer { source },
152 )?;
153
154 Ok(Some(verified.to_resource_token_principal()))
155 }
156
157 pub fn parse_propagation_directive(
168 headers: &http::HeaderMap,
169 ) -> Result<Option<PropagationDirective>, AccessTokenSubstrateResourceServiceError> {
170 let Some(value) = headers.get(DEFAULT_PROPAGATION_HEADER_NAME) else {
171 return Ok(None);
172 };
173
174 PropagationDirective::from_header_value(value)
175 .map(Some)
176 .map_err(|source| {
177 AccessTokenSubstrateResourceServiceError::PropagationDirectiveInvalid { source }
178 })
179 }
180
181 pub async fn propagate_request<F: PropagationForwarder>(
195 &self,
196 forwarder: &F,
197 request: http::Request<F::Body>,
198 ) -> Result<http::Response<F::Body>, AccessTokenSubstrateResourceServiceError> {
199 self.propagate_request_with_diagnosis(forwarder, request)
200 .await
201 .into_result()
202 }
203
204 pub async fn propagate_request_with_diagnosis<F: PropagationForwarder>(
206 &self,
207 forwarder: &F,
208 request: http::Request<F::Body>,
209 ) -> DiagnosedResult<http::Response<F::Body>, AccessTokenSubstrateResourceServiceError> {
210 let mut diagnosis = AuthFlowDiagnosis::started(AuthFlowOperation::PROPAGATION_FORWARD)
211 .field(AuthFlowDiagnosisField::TRANSPORT, "authorization_header")
212 .field(
213 AuthFlowDiagnosisField::DIRECTIVE_HEADER,
214 DEFAULT_PROPAGATION_HEADER_NAME,
215 );
216
217 let authorization_header = request
219 .headers()
220 .get(http::header::AUTHORIZATION)
221 .and_then(|v| v.to_str().ok());
222
223 let Some(authorization_str) = authorization_header else {
224 return DiagnosedResult::failure(
225 diagnosis
226 .with_outcome(AuthFlowDiagnosisOutcome::Rejected)
227 .field(
228 AuthFlowDiagnosisField::FAILURE_STAGE,
229 "authorization_header",
230 )
231 .field(AuthFlowDiagnosisField::REASON, "missing_bearer_token"),
232 AccessTokenSubstrateResourceServiceError::BearerTokenRequired,
233 );
234 };
235
236 let Some(access_token) = parse_bearer_auth_header_opt(authorization_str) else {
237 return DiagnosedResult::failure(
238 diagnosis
239 .with_outcome(AuthFlowDiagnosisOutcome::Rejected)
240 .field(
241 AuthFlowDiagnosisField::FAILURE_STAGE,
242 "authorization_header",
243 )
244 .field(AuthFlowDiagnosisField::REASON, "invalid_bearer_token"),
245 AccessTokenSubstrateResourceServiceError::BearerTokenRequired,
246 );
247 };
248
249 let resource_token_principal = match self
250 .verifier
251 .verify_token::<CoreJwtClaims>(&access_token)
252 .await
253 {
254 Ok(verified) => verified.to_resource_token_principal(),
255 Err(source) => {
256 return DiagnosedResult::failure(
257 diagnosis
258 .with_outcome(AuthFlowDiagnosisOutcome::Failed)
259 .field(AuthFlowDiagnosisField::FAILURE_STAGE, "token_verification"),
260 AccessTokenSubstrateResourceServiceError::OAuthResourceServer { source },
261 );
262 }
263 };
264
265 let Some(directive_header) = request.headers().get(DEFAULT_PROPAGATION_HEADER_NAME) else {
267 return DiagnosedResult::failure(
268 diagnosis
269 .with_outcome(AuthFlowDiagnosisOutcome::Rejected)
270 .field(
271 AuthFlowDiagnosisField::FAILURE_STAGE,
272 "propagation_directive",
273 )
274 .field(
275 AuthFlowDiagnosisField::REASON,
276 "missing_propagation_directive",
277 ),
278 AccessTokenSubstrateResourceServiceError::PropagationDirectiveRequired,
279 );
280 };
281
282 let directive = match PropagationDirective::from_header_value(directive_header) {
283 Ok(directive) => directive,
284 Err(source) => {
285 return DiagnosedResult::failure(
286 diagnosis
287 .with_outcome(AuthFlowDiagnosisOutcome::Rejected)
288 .field(
289 AuthFlowDiagnosisField::FAILURE_STAGE,
290 "propagation_directive",
291 )
292 .field(
293 AuthFlowDiagnosisField::REASON,
294 "invalid_propagation_directive",
295 ),
296 AccessTokenSubstrateResourceServiceError::PropagationDirectiveInvalid {
297 source,
298 },
299 );
300 }
301 };
302
303 let bearer = PropagatedBearer {
304 access_token: &access_token,
305 resource_token_principal: Some(&resource_token_principal),
306 };
307 let target = directive.to_request_target();
308 diagnosis = diagnosis
309 .field("target_node_id", target.node_id.clone())
310 .field(
311 "target_scheme",
312 target.scheme.as_ref().map(|scheme| scheme.as_str()),
313 )
314 .field("target_hostname", target.hostname.clone())
315 .field("target_port", target.port);
316
317 match self
319 .propagate_bearer(forwarder, &bearer, &target, request)
320 .await
321 {
322 Ok(response) => DiagnosedResult::success(
323 diagnosis
324 .with_outcome(AuthFlowDiagnosisOutcome::Succeeded)
325 .field(
326 "principal_subject",
327 resource_token_principal.subject.clone(),
328 ),
329 response,
330 ),
331 Err(error) => DiagnosedResult::failure(
332 diagnosis
333 .with_outcome(AuthFlowDiagnosisOutcome::Failed)
334 .field(AuthFlowDiagnosisField::FAILURE_STAGE, "forward"),
335 error,
336 ),
337 }
338 }
339
340 pub async fn propagate_bearer<F: PropagationForwarder>(
347 &self,
348 forwarder: &F,
349 bearer: &PropagatedBearer<'_>,
350 target: &PropagationRequestTarget,
351 request: http::Request<F::Body>,
352 ) -> Result<http::Response<F::Body>, AccessTokenSubstrateResourceServiceError> {
353 let propagator = self
354 .runtime
355 .token_propagator()
356 .ok_or(AccessTokenSubstrateResourceServiceError::PropagationNotEnabled)?;
357
358 forwarder
359 .forward(propagator, bearer, target, request)
360 .await
361 .map_err(|source| AccessTokenSubstrateResourceServiceError::Propagation { source })
362 }
363}
364
365#[cfg(all(test, feature = "axum-reverse-proxy-propagation-forwarder"))]
370mod tests {
371 use axum::{
372 Json, Router,
373 body::{Body, to_bytes},
374 http::{Request, StatusCode, header::AUTHORIZATION},
375 routing::get,
376 };
377 use securitydept_oauth_resource_server::ResourceTokenPrincipal;
378 use tokio::net::TcpListener;
379
380 use super::*;
381 use crate::access_token_substrate::{
382 AllowedPropagationTarget, AxumReverseProxyPropagationForwarder,
383 AxumReverseProxyPropagationForwarderConfig, DEFAULT_PROPAGATION_HEADER_NAME,
384 PropagationDestinationPolicy, PropagationDirective, PropagationScheme, TokenPropagation,
385 TokenPropagatorConfig,
386 };
387
388 fn make_runtime_and_forwarder(
389 upstream_port: u16,
390 ) -> (
391 AccessTokenSubstrateRuntime,
392 AxumReverseProxyPropagationForwarder,
393 ) {
394 let token_propagation = TokenPropagation::Enabled {
395 config: TokenPropagatorConfig {
396 destination_policy: PropagationDestinationPolicy {
397 allowed_targets: vec![AllowedPropagationTarget::ExactOrigin {
398 scheme: PropagationScheme::Http,
399 hostname: "localhost".to_string(),
400 port: upstream_port,
401 }],
402 ..Default::default()
403 },
404 ..Default::default()
405 },
406 };
407
408 let runtime = AccessTokenSubstrateRuntime::new(&token_propagation)
409 .expect("substrate runtime should build");
410
411 let forwarder =
412 AxumReverseProxyPropagationForwarder::new(AxumReverseProxyPropagationForwarderConfig {
413 proxy_path: "/api/propagation".to_string(),
414 })
415 .expect("forwarder should build");
416
417 (runtime, forwarder)
418 }
419
420 #[tokio::test]
421 async fn propagate_bearer_proxies_request_to_upstream() {
422 let _ = rustls::crypto::ring::default_provider().install_default();
423
424 let upstream = Router::new()
427 .route(
428 "/api/health",
429 get(|request: Request<Body>| async move {
430 let authorization = request
431 .headers()
432 .get(AUTHORIZATION)
433 .and_then(|v| v.to_str().ok())
434 .map(str::to_string);
435 let propagation_header = request
436 .headers()
437 .get(DEFAULT_PROPAGATION_HEADER_NAME)
438 .and_then(|v| v.to_str().ok())
439 .map(str::to_string);
440
441 Json(serde_json::json!({
442 "status": "ok",
443 "path": request.uri().path(),
444 "query": request.uri().query(),
445 "authorization": authorization,
446 "propagation_header": propagation_header,
447 }))
448 }),
449 )
450 .route(
451 "/jwks",
452 get(|| async { Json(serde_json::json!({ "keys": [] })) }),
453 );
454 let listener = TcpListener::bind("127.0.0.1:0")
455 .await
456 .expect("upstream listener should bind");
457 let upstream_port = listener.local_addr().expect("should have addr").port();
458 let upstream_task = tokio::spawn(async move {
459 axum::serve(listener, upstream)
460 .await
461 .expect("upstream server should run");
462 });
463
464 let (runtime, forwarder) = make_runtime_and_forwarder(upstream_port);
465
466 let verifier_config = securitydept_oauth_resource_server::OAuthResourceServerConfig {
470 remote: securitydept_oauth_resource_server::OAuthProviderRemoteConfig {
471 issuer_url: Some(format!("http://localhost:{upstream_port}")),
472 jwks_uri: Some(format!("http://localhost:{upstream_port}/jwks")),
473 ..Default::default()
474 },
475 ..Default::default()
476 };
477 let verifier = OAuthResourceServerVerifier::from_config(verifier_config)
478 .await
479 .expect("verifier should build");
480 let service = AccessTokenSubstrateResourceService::new(&runtime, &verifier);
481
482 let directive = PropagationDirective::parse(&format!(
483 "by=dashboard;for=local-health;host=localhost:{upstream_port};proto=http"
484 ))
485 .expect("directive should parse");
486
487 let bearer = PropagatedBearer {
488 access_token: "dashboard-at",
489 resource_token_principal: Some(&ResourceTokenPrincipal {
490 subject: Some("user-1".to_string()),
491 issuer: None,
492 audiences: Vec::new(),
493 scopes: Vec::new(),
494 authorized_party: None,
495 claims: Default::default(),
496 }),
497 };
498 let target = directive.to_request_target();
499
500 let request = Request::builder()
501 .uri("/api/propagation/api/health?via=token-set")
502 .header(
503 DEFAULT_PROPAGATION_HEADER_NAME,
504 directive
505 .to_header_value()
506 .expect("directive should serialize"),
507 )
508 .body(Body::empty())
509 .expect("request should build");
510
511 let response = service
512 .propagate_bearer(&forwarder, &bearer, &target, request)
513 .await
514 .expect("propagation should succeed");
515
516 assert_eq!(response.status(), StatusCode::OK);
517
518 let body = to_bytes(response.into_body(), usize::MAX)
519 .await
520 .expect("body should read");
521 let payload: serde_json::Value =
522 serde_json::from_slice(&body).expect("response body should be json");
523
524 assert_eq!(payload["status"], "ok");
525 assert_eq!(payload["path"], "/api/health");
526 assert_eq!(payload["query"], "via=token-set");
527 assert_eq!(payload["authorization"], "Bearer dashboard-at");
528 assert_eq!(payload["propagation_header"], serde_json::Value::Null);
530
531 upstream_task.abort();
532 }
533}