oauth2_broker/flows/
refresh.rs

//! Refresh token orchestration with singleflight guards, CAS rotation, and metrics.
//!
//! The broker exposes [`Broker::refresh_access_token`] so callers can request a fresh
//! access token for a tenant/principal/scope triple without worrying about
//! concurrent rotations. Each request acquires a per-`StoreKey` guard, evaluates
//! a jittered preemptive window, and either reuses the cached record or performs a
//! `grant_type=refresh_token` call. Successful refreshes rotate secrets via
//! `BrokerStore::compare_and_swap_refresh`, while `invalid_grant`/revoked responses
//! revoke the cached record.
10
11mod metrics;
12
13pub use metrics::RefreshMetrics;
14
15// self
16use crate::{
17	_prelude::*,
18	auth::{TokenFamily, TokenRecord},
19	error::ConfigError,
20	flows::{Broker, CachedTokenRequest, common},
21	http::TokenHttpClient,
22	oauth::{BasicFacade, OAuth2Facade, TransportErrorMapper},
23	obs::{self, FlowKind, FlowOutcome, FlowSpan},
24	provider::GrantType,
25	store::{BrokerStore, CompareAndSwapOutcome, StoreKey},
26};
27
28impl<C, M> Broker<C, M>
29where
30	C: ?Sized + TokenHttpClient,
31	M: ?Sized + TransportErrorMapper<C::TransportError>,
32{
33	/// Refreshes the cached token family, performing CAS rotation + singleflight guards.
34	pub async fn refresh_access_token(&self, request: CachedTokenRequest) -> Result<TokenRecord> {
35		const KIND: FlowKind = FlowKind::Refresh;
36
37		let span = FlowSpan::new(KIND, "refresh_access_token");
38
39		obs::record_flow_outcome(KIND, FlowOutcome::Attempt);
40
41		let result = span
42			.instrument(async move {
43				self.ensure_refresh_supported()?;
44				self.refresh_metrics.record_attempt();
45
46				let tenant = request.tenant.clone();
47				let principal = request.principal.clone();
48				let store_scope = request.scope.clone();
49				let requested_scope = store_scope.clone();
50				let mut family = TokenFamily::new(tenant, principal);
51
52				family.provider = Some(self.descriptor.id.clone());
53
54				let key = StoreKey::new(&family, &store_scope);
55				let guard = common::flow_guard(self, &key);
56				let _singleflight = guard.lock().await;
57				let now = OffsetDateTime::now_utc();
58				let current = <dyn BrokerStore>::fetch(self.store.as_ref(), &family, &store_scope)
59					.await
60					.map_err(|err| {
61						self.refresh_metrics.record_failure();
62						Error::from(err)
63					})?
64					.ok_or_else(|| {
65						self.refresh_metrics.record_failure();
66
67						Error::InvalidGrant {
68							reason: "No cached token record is available for refresh operations."
69								.into(),
70						}
71					})?;
72
73				if !request.should_refresh(&current, now) {
74					self.refresh_metrics.record_success();
75
76					return Ok(current);
77				}
78
79				let expected_refresh = current
80					.refresh_token
81					.as_ref()
82					.map(|secret| secret.expose().to_string())
83					.ok_or_else(|| {
84						self.refresh_metrics.record_failure();
85
86						Error::from(ConfigError::MissingRefreshToken)
87					})?;
88				let facade = <BasicFacade<C, M>>::from_descriptor(
89					&self.descriptor,
90					&self.client_id,
91					self.client_secret.as_deref(),
92					None,
93					self.http_client.clone(),
94					self.transport_mapper.clone(),
95				)
96				.inspect_err(|_| {
97					self.refresh_metrics.record_failure();
98				})?;
99				let (facade_record, new_refresh) = match facade
100					.refresh_token(
101						self.strategy.as_ref(),
102						family.clone(),
103						&expected_refresh,
104						&requested_scope,
105					)
106					.await
107				{
108					Ok(result) => result,
109					Err(err) => {
110						if matches!(err, Error::InvalidGrant { .. } | Error::Revoked) {
111							let _ = <dyn BrokerStore>::revoke(
112								self.store.as_ref(),
113								&family,
114								&store_scope,
115								now,
116							)
117							.await;
118						}
119
120						self.refresh_metrics.record_failure();
121
122						return Err(err);
123					},
124				};
125				let updated = if new_refresh.is_some() {
126					facade_record
127				} else {
128					let mut builder = TokenRecord::builder(
129						facade_record.family.clone(),
130						facade_record.scope.clone(),
131					)
132					.access_token(facade_record.access_token.expose())
133					.issued_at(facade_record.issued_at)
134					.expires_at(facade_record.expires_at);
135
136					builder = builder.refresh_token(expected_refresh.clone());
137
138					builder.build().map_err(|err| {
139						self.refresh_metrics.record_failure();
140
141						common::map_token_builder_error(err)
142					})?
143				};
144				let outcome = <dyn BrokerStore>::compare_and_swap_refresh(
145					self.store.as_ref(),
146					&family,
147					&store_scope,
148					Some(expected_refresh.as_str()),
149					updated.clone(),
150				)
151				.await
152				.map_err(|err| {
153					self.refresh_metrics.record_failure();
154
155					Error::from(err)
156				})?;
157				let result = match outcome {
158					CompareAndSwapOutcome::Updated => updated,
159					CompareAndSwapOutcome::Missing => {
160						<dyn BrokerStore>::save(self.store.as_ref(), updated.clone())
161							.await
162							.map_err(|err| {
163								self.refresh_metrics.record_failure();
164								Error::from(err)
165							})?;
166
167						updated
168					},
169					CompareAndSwapOutcome::RefreshMismatch => {
170						match <dyn BrokerStore>::fetch(self.store.as_ref(), &family, &store_scope)
171							.await
172							.map_err(|err| {
173								self.refresh_metrics.record_failure();
174								Error::from(err)
175							})? {
176							Some(existing) => existing,
177							None => {
178								<dyn BrokerStore>::save(self.store.as_ref(), updated.clone())
179									.await
180									.map_err(|err| {
181										self.refresh_metrics.record_failure();
182										Error::from(err)
183									})?;
184
185								updated
186							},
187						}
188					},
189				};
190
191				self.refresh_metrics.record_success();
192				Ok(result)
193			})
194			.await;
195
196		match &result {
197			Ok(_) => obs::record_flow_outcome(KIND, FlowOutcome::Success),
198			Err(_) => obs::record_flow_outcome(KIND, FlowOutcome::Failure),
199		}
200
201		result
202	}
203
204	fn ensure_refresh_supported(&self) -> Result<()> {
205		if self.descriptor.supports(GrantType::RefreshToken) {
206			Ok(())
207		} else {
208			Err(ConfigError::UnsupportedGrant {
209				descriptor: self.descriptor.id.to_string(),
210				grant: "refresh_token",
211			}
212			.into())
213		}
214	}
215}