oauth2_broker/flows/
refresh.rs1mod metrics;
12
13pub use metrics::RefreshMetrics;
14
15use crate::{
17 _prelude::*,
18 auth::{TokenFamily, TokenRecord},
19 error::ConfigError,
20 flows::{Broker, CachedTokenRequest, common},
21 http::TokenHttpClient,
22 oauth::{BasicFacade, OAuth2Facade, TransportErrorMapper},
23 obs::{self, FlowKind, FlowOutcome, FlowSpan},
24 provider::GrantType,
25 store::{BrokerStore, CompareAndSwapOutcome, StoreKey},
26};
27
28impl<C, M> Broker<C, M>
29where
30 C: ?Sized + TokenHttpClient,
31 M: ?Sized + TransportErrorMapper<C::TransportError>,
32{
33 pub async fn refresh_access_token(&self, request: CachedTokenRequest) -> Result<TokenRecord> {
35 const KIND: FlowKind = FlowKind::Refresh;
36
37 let span = FlowSpan::new(KIND, "refresh_access_token");
38
39 obs::record_flow_outcome(KIND, FlowOutcome::Attempt);
40
41 let result = span
42 .instrument(async move {
43 self.ensure_refresh_supported()?;
44 self.refresh_metrics.record_attempt();
45
46 let tenant = request.tenant.clone();
47 let principal = request.principal.clone();
48 let store_scope = request.scope.clone();
49 let requested_scope = store_scope.clone();
50 let mut family = TokenFamily::new(tenant, principal);
51
52 family.provider = Some(self.descriptor.id.clone());
53
54 let key = StoreKey::new(&family, &store_scope);
55 let guard = common::flow_guard(self, &key);
56 let _singleflight = guard.lock().await;
57 let now = OffsetDateTime::now_utc();
58 let current = <dyn BrokerStore>::fetch(self.store.as_ref(), &family, &store_scope)
59 .await
60 .map_err(|err| {
61 self.refresh_metrics.record_failure();
62 Error::from(err)
63 })?
64 .ok_or_else(|| {
65 self.refresh_metrics.record_failure();
66
67 Error::InvalidGrant {
68 reason: "No cached token record is available for refresh operations."
69 .into(),
70 }
71 })?;
72
73 if !request.should_refresh(¤t, now) {
74 self.refresh_metrics.record_success();
75
76 return Ok(current);
77 }
78
79 let expected_refresh = current
80 .refresh_token
81 .as_ref()
82 .map(|secret| secret.expose().to_string())
83 .ok_or_else(|| {
84 self.refresh_metrics.record_failure();
85
86 Error::from(ConfigError::MissingRefreshToken)
87 })?;
88 let facade = <BasicFacade<C, M>>::from_descriptor(
89 &self.descriptor,
90 &self.client_id,
91 self.client_secret.as_deref(),
92 None,
93 self.http_client.clone(),
94 self.transport_mapper.clone(),
95 )
96 .inspect_err(|_| {
97 self.refresh_metrics.record_failure();
98 })?;
99 let (facade_record, new_refresh) = match facade
100 .refresh_token(
101 self.strategy.as_ref(),
102 family.clone(),
103 &expected_refresh,
104 &requested_scope,
105 )
106 .await
107 {
108 Ok(result) => result,
109 Err(err) => {
110 if matches!(err, Error::InvalidGrant { .. } | Error::Revoked) {
111 let _ = <dyn BrokerStore>::revoke(
112 self.store.as_ref(),
113 &family,
114 &store_scope,
115 now,
116 )
117 .await;
118 }
119
120 self.refresh_metrics.record_failure();
121
122 return Err(err);
123 },
124 };
125 let updated = if new_refresh.is_some() {
126 facade_record
127 } else {
128 let mut builder = TokenRecord::builder(
129 facade_record.family.clone(),
130 facade_record.scope.clone(),
131 )
132 .access_token(facade_record.access_token.expose())
133 .issued_at(facade_record.issued_at)
134 .expires_at(facade_record.expires_at);
135
136 builder = builder.refresh_token(expected_refresh.clone());
137
138 builder.build().map_err(|err| {
139 self.refresh_metrics.record_failure();
140
141 common::map_token_builder_error(err)
142 })?
143 };
144 let outcome = <dyn BrokerStore>::compare_and_swap_refresh(
145 self.store.as_ref(),
146 &family,
147 &store_scope,
148 Some(expected_refresh.as_str()),
149 updated.clone(),
150 )
151 .await
152 .map_err(|err| {
153 self.refresh_metrics.record_failure();
154
155 Error::from(err)
156 })?;
157 let result = match outcome {
158 CompareAndSwapOutcome::Updated => updated,
159 CompareAndSwapOutcome::Missing => {
160 <dyn BrokerStore>::save(self.store.as_ref(), updated.clone())
161 .await
162 .map_err(|err| {
163 self.refresh_metrics.record_failure();
164 Error::from(err)
165 })?;
166
167 updated
168 },
169 CompareAndSwapOutcome::RefreshMismatch => {
170 match <dyn BrokerStore>::fetch(self.store.as_ref(), &family, &store_scope)
171 .await
172 .map_err(|err| {
173 self.refresh_metrics.record_failure();
174 Error::from(err)
175 })? {
176 Some(existing) => existing,
177 None => {
178 <dyn BrokerStore>::save(self.store.as_ref(), updated.clone())
179 .await
180 .map_err(|err| {
181 self.refresh_metrics.record_failure();
182 Error::from(err)
183 })?;
184
185 updated
186 },
187 }
188 },
189 };
190
191 self.refresh_metrics.record_success();
192 Ok(result)
193 })
194 .await;
195
196 match &result {
197 Ok(_) => obs::record_flow_outcome(KIND, FlowOutcome::Success),
198 Err(_) => obs::record_flow_outcome(KIND, FlowOutcome::Failure),
199 }
200
201 result
202 }
203
204 fn ensure_refresh_supported(&self) -> Result<()> {
205 if self.descriptor.supports(GrantType::RefreshToken) {
206 Ok(())
207 } else {
208 Err(ConfigError::UnsupportedGrant {
209 descriptor: self.descriptor.id.to_string(),
210 grant: "refresh_token",
211 }
212 .into())
213 }
214 }
215}