1use std::{
2 sync::{
3 atomic::{AtomicBool, Ordering},
4 Arc,
5 },
6 time::{Duration, Instant},
7};
8
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use rand::{rngs::StdRng, Rng, SeedableRng};
12use tokio::{
13 sync::{Mutex, Notify},
14 task::JoinHandle,
15 time::sleep,
16};
17use tracing::{info, warn};
18
19use super::siwe::{AccessBundle, SiweError};
20
21const DEFAULT_RATIO: f64 = 0.75;
22const MIN_DURATION: Duration = Duration::from_millis(1);
23
24#[async_trait]
25pub trait AccessAuthenticator: Send + Sync {
26 async fn login(&self) -> Result<AccessBundle, SiweError>;
27}
28
29pub trait Clock: Send + Sync {
30 fn now_instant(&self) -> Instant;
31 fn now_utc(&self) -> DateTime<Utc>;
32}
33
34#[derive(Clone)]
35pub struct SystemClock;
36
37impl Clock for SystemClock {
38 fn now_instant(&self) -> Instant {
39 Instant::now()
40 }
41
42 fn now_utc(&self) -> DateTime<Utc> {
43 Utc::now()
44 }
45}
46
47#[derive(Debug, Clone)]
48pub struct TokenManagerConfig {
49 pub safety_ratio: f64,
50 pub max_retries: u32,
51 pub jitter: Duration,
52}
53
54impl Default for TokenManagerConfig {
55 fn default() -> Self {
56 Self {
57 safety_ratio: DEFAULT_RATIO,
58 max_retries: 3,
59 jitter: Duration::from_millis(500),
60 }
61 }
62}
63
64impl TokenManagerConfig {
65 fn clamped_ratio(&self) -> f64 {
66 self.safety_ratio.clamp(0.0, 1.0)
67 }
68}
69
70#[derive(Debug, thiserror::Error)]
71pub enum TokenManagerError {
72 #[error("token manager stopped")]
73 Stopped,
74 #[error("token response already expired")]
75 Expired,
76 #[error("authentication failed after {attempts} attempts: {last_error}")]
77 Authentication {
78 attempts: u32,
79 #[source]
80 last_error: SiweError,
81 },
82}
83
84#[derive(Debug, thiserror::Error)]
85pub enum TokenProviderError {
86 #[error(transparent)]
87 TokenManager(#[from] TokenManagerError),
88 #[error("{0}")]
89 Message(String),
90}
91
92pub type TokenProviderResult<T> = std::result::Result<T, TokenProviderError>;
93
94#[async_trait]
95pub trait TokenProvider: Send + Sync {
96 async fn bearer(&self) -> TokenProviderResult<String>;
97 async fn on_unauthorized(&self);
98}
99
100pub struct TokenManager<A: AccessAuthenticator, C: Clock> {
101 auth: Arc<A>,
102 clock: Arc<C>,
103 config: TokenManagerConfig,
104 rng: Arc<Mutex<StdRng>>,
105 state: Arc<Mutex<State>>,
106 stopped: Arc<AtomicBool>,
107 stop_notify: Arc<Notify>,
108 state_notify: Arc<Notify>,
109 bg_task: Arc<Mutex<Option<JoinHandle<()>>>>,
110}
111
112impl<A: AccessAuthenticator, C: Clock> Clone for TokenManager<A, C> {
113 fn clone(&self) -> Self {
114 Self {
115 auth: Arc::clone(&self.auth),
116 clock: Arc::clone(&self.clock),
117 config: self.config.clone(),
118 rng: Arc::clone(&self.rng),
119 state: Arc::clone(&self.state),
120 stopped: Arc::clone(&self.stopped),
121 stop_notify: Arc::clone(&self.stop_notify),
122 state_notify: Arc::clone(&self.state_notify),
123 bg_task: Arc::clone(&self.bg_task),
124 }
125 }
126}
127
128#[async_trait]
129impl<A, C> TokenProvider for TokenManager<A, C>
130where
131 A: AccessAuthenticator + 'static,
132 C: Clock + 'static,
133{
134 async fn bearer(&self) -> TokenProviderResult<String> {
135 let now = self.clock.now_instant();
136 self.get_access(now)
137 .await
138 .map_err(TokenProviderError::TokenManager)
139 }
140
141 async fn on_unauthorized(&self) {
142 self.on_unauthorized_retry().await;
143 }
144}
145
146struct State {
147 token: Option<TokenEntry>,
148 inflight: Option<Arc<Notify>>,
149}
150
151#[derive(Clone)]
152struct TokenEntry {
153 value: String,
154 refresh_at: Instant,
155}
156
157impl TokenEntry {
158 fn is_valid(&self, now: Instant) -> bool {
159 now <= self.refresh_at
160 }
161}
162
163impl<A, C> TokenManager<A, C>
164where
165 A: AccessAuthenticator + 'static,
166 C: Clock + 'static,
167{
168 pub fn new(auth: Arc<A>, clock: Arc<C>, config: TokenManagerConfig) -> Self {
169 let rng = StdRng::from_entropy();
170 Self::with_rng(auth, clock, config, rng)
171 }
172
173 pub fn with_rng(auth: Arc<A>, clock: Arc<C>, config: TokenManagerConfig, rng: StdRng) -> Self {
174 Self {
175 auth,
176 clock,
177 config,
178 rng: Arc::new(Mutex::new(rng)),
179 state: Arc::new(Mutex::new(State {
180 token: None,
181 inflight: None,
182 })),
183 stopped: Arc::new(AtomicBool::new(false)),
184 stop_notify: Arc::new(Notify::new()),
185 state_notify: Arc::new(Notify::new()),
186 bg_task: Arc::new(Mutex::new(None)),
187 }
188 }
189
190 pub async fn get_access(&self, now: Instant) -> Result<String, TokenManagerError> {
191 loop {
192 let (notify, is_owner) = {
193 let mut state = self.state.lock().await;
194 if self.stopped.load(Ordering::SeqCst) {
195 return Err(TokenManagerError::Stopped);
196 }
197 if let Some(entry) = &state.token {
198 if entry.is_valid(now) {
199 return Ok(entry.value.clone());
200 }
201 }
202 match state.inflight.clone() {
203 Some(existing) => (existing, false),
204 None => {
205 let notify = Arc::new(Notify::new());
206 state.inflight = Some(notify.clone());
207 (notify, true)
208 }
209 }
210 };
211
212 if is_owner {
213 let result = self.reauth(now).await;
214 let mut refreshed = false;
215 let (notify_to_signal, outcome) = {
216 let mut state = self.state.lock().await;
217 let notify_opt = state.inflight.take();
218 let outcome = match result {
219 Ok(entry) => {
220 let token = entry.value.clone();
221 state.token = Some(entry);
222 refreshed = true;
223 Ok(token)
224 }
225 Err(err) => Err(err),
226 };
227 (notify_opt, outcome)
228 };
229 if let Some(n) = notify_to_signal {
230 n.notify_waiters();
231 }
232 if refreshed {
233 self.state_notify.notify_waiters();
234 }
235 return outcome;
236 } else {
237 notify.notified().await;
238 }
239 }
240 }
241
242 pub async fn clear(&self) {
243 let mut state = self.state.lock().await;
244 state.token = None;
245 drop(state);
246 self.state_notify.notify_waiters();
247 }
248
249 pub async fn on_unauthorized_retry(&self) {
250 let mut state = self.state.lock().await;
251 if let Some(entry) = &mut state.token {
252 let now = self.clock.now_instant();
253 entry.refresh_at = now.checked_sub(MIN_DURATION).unwrap_or(now);
254 }
255 drop(state);
256 self.state_notify.notify_waiters();
257 }
258
259 pub async fn stop(&self) {
260 self.stopped.store(true, Ordering::SeqCst);
261 self.stop_notify.notify_waiters();
262 let notify = {
263 let mut state = self.state.lock().await;
264 state.token = None;
265 let inflight = state.inflight.take();
266 drop(state);
267 self.state_notify.notify_waiters();
268 inflight
269 };
270 if let Some(waiters) = notify {
271 waiters.notify_waiters();
272 }
273 if let Some(handle) = self.take_bg_handle().await {
274 let _ = handle.await;
275 }
276 }
277
278 pub async fn start_bg(&self) {
279 let mut guard = self.bg_task.lock().await;
280 if guard.is_some() {
281 return;
282 }
283 if self.stopped.load(Ordering::SeqCst) {
284 self.stopped.store(false, Ordering::SeqCst);
285 }
286 let manager = self.clone();
287 let handle = tokio::spawn(async move {
288 manager.background_loop().await;
289 });
290 *guard = Some(handle);
291 }
292
293 pub async fn stop_bg(&self) {
294 self.stop().await;
295 }
296
297 async fn reauth(&self, now: Instant) -> Result<TokenEntry, TokenManagerError> {
298 if self.stopped.load(Ordering::SeqCst) {
299 return Err(TokenManagerError::Stopped);
300 }
301
302 let attempts = self.config.max_retries.saturating_add(1);
303 let mut last_error: Option<SiweError> = None;
304
305 for attempt in 1..=attempts {
306 if self.stopped.load(Ordering::SeqCst) {
307 return Err(TokenManagerError::Stopped);
308 }
309
310 let login_fut = self.auth.login();
311 tokio::pin!(login_fut);
312 let stop_fut = self.stop_notify.notified();
313 tokio::pin!(stop_fut);
314
315 let bundle = tokio::select! {
316 res = &mut login_fut => res,
317 _ = &mut stop_fut => return Err(TokenManagerError::Stopped),
318 };
319
320 match bundle {
321 Ok(bundle) => match self.bundle_to_entry(&bundle, now).await {
322 Ok(entry) => {
323 let ttl = entry.refresh_at.saturating_duration_since(now);
324 info!(
325 expires_in_ms = ttl.as_millis(),
326 "Refreshed DDS access token"
327 );
328 return Ok(entry);
329 }
330 Err(err) => {
331 warn!(attempt, error = %err, "DDS SIWE response invalid");
332 last_error = Some(err);
333 }
334 },
335 Err(err) => {
336 warn!(attempt, error = %err, "DDS SIWE login failed");
337 last_error = Some(err);
338 }
339 }
340
341 if attempt < attempts {
342 let delay = self.next_delay().await;
343 if !delay.is_zero() {
344 tokio::time::sleep(delay).await;
345 }
346 }
347 }
348
349 Err(TokenManagerError::Authentication {
350 attempts,
351 last_error: last_error.unwrap_or_else(|| SiweError::MissingField("access_token")),
352 })
353 }
354
355 async fn bundle_to_entry(
356 &self,
357 bundle: &AccessBundle,
358 now: Instant,
359 ) -> Result<TokenEntry, SiweError> {
360 let now_utc = self.clock.now_utc();
361 let expires_at = bundle.expires_at();
362 let ttl = expires_at
363 .signed_duration_since(now_utc)
364 .to_std()
365 .map_err(|_| SiweError::MissingField("expires_at"))?;
366 if ttl.is_zero() {
367 return Err(SiweError::MissingField("expires_at"));
368 }
369
370 let ratio = self.config.clamped_ratio();
371 let safe_secs = ttl.as_secs_f64() * ratio;
372 let safe_duration = if safe_secs <= 0.0 {
373 MIN_DURATION.min(ttl)
374 } else {
375 Duration::from_secs_f64(safe_secs).min(ttl)
376 };
377
378 let actual_expiry = now + ttl;
379 let base_refresh = now + safe_duration;
380
381 let jitter_ms = self.sample_jitter_ms().await;
382 let refresh_at = if jitter_ms >= 0 {
383 let add = Duration::from_millis(jitter_ms as u64);
384 base_refresh + add
385 } else {
386 let sub = Duration::from_millis((-jitter_ms) as u64);
387 base_refresh.checked_sub(sub).unwrap_or(now)
388 };
389
390 let min_refresh = now + MIN_DURATION.min(ttl);
391 let mut refresh_at = refresh_at.max(min_refresh);
392 refresh_at = refresh_at.min(actual_expiry);
393
394 Ok(TokenEntry {
395 value: bundle.token().to_string(),
396 refresh_at,
397 })
398 }
399
400 async fn next_delay(&self) -> Duration {
401 if self.config.jitter.is_zero() {
402 return Duration::ZERO;
403 }
404 let mut rng = self.rng.lock().await;
405 let max_ms = self.config.jitter.as_millis() as u64;
406 if max_ms == 0 {
407 Duration::ZERO
408 } else {
409 let jitter_ms = rng.gen_range(0..=max_ms);
410 Duration::from_millis(jitter_ms)
411 }
412 }
413
414 async fn sample_jitter_ms(&self) -> i64 {
415 if self.config.jitter.is_zero() {
416 return 0;
417 }
418 let max_ms_u128 = self.config.jitter.as_millis();
419 let max_ms = max_ms_u128.min(i64::MAX as u128) as i64;
420 if max_ms == 0 {
421 return 0;
422 }
423 let mut rng = self.rng.lock().await;
424 rng.gen_range(-max_ms..=max_ms)
425 }
426
427 async fn take_bg_handle(&self) -> Option<JoinHandle<()>> {
428 let mut guard = self.bg_task.lock().await;
429 guard.take()
430 }
431
432 async fn background_loop(self) {
433 loop {
434 if self.stopped.load(Ordering::SeqCst) {
435 break;
436 }
437
438 match self.next_refresh_target().await {
439 Some(target) => {
440 let now = self.clock.now_instant();
441 if target <= now {
442 if let Err(err) = self.get_access(now).await {
443 warn!(error = %err, "Background reauth attempt failed");
444 }
445 continue;
446 }
447
448 let delay = target.saturating_duration_since(now);
449
450 tokio::select! {
451 _ = self.stop_notify.notified() => break,
452 _ = self.state_notify.notified() => continue,
453 _ = sleep(delay) => {}
454 }
455
456 if self.stopped.load(Ordering::SeqCst) {
457 break;
458 }
459
460 let now = self.clock.now_instant();
461 if let Err(err) = self.get_access(now).await {
462 warn!(error = %err, "Background reauth attempt failed");
463 }
464 }
465 None => {
466 tokio::select! {
467 _ = self.stop_notify.notified() => break,
468 _ = self.state_notify.notified() => {}
469 }
470 }
471 }
472 }
473 }
474
475 async fn next_refresh_target(&self) -> Option<Instant> {
476 let state = self.state.lock().await;
477 state.token.as_ref().map(|entry| entry.refresh_at)
478 }
479}
480
481#[cfg(test)]
482mod tests {
483 use super::*;
484 use std::{
485 collections::VecDeque,
486 sync::{
487 atomic::{AtomicUsize, Ordering as AtomicOrdering},
488 Arc,
489 },
490 };
491 use tokio::task::yield_now;
492 use tokio::time::advance;
493
494 struct TestClock {
495 start_instant: Instant,
496 instant_offset: std::sync::Mutex<Duration>,
497 start_utc: DateTime<Utc>,
498 utc_offset: std::sync::Mutex<chrono::Duration>,
499 }
500
501 impl TestClock {
502 fn new() -> Self {
503 Self {
504 start_instant: Instant::now(),
505 instant_offset: std::sync::Mutex::new(Duration::ZERO),
506 start_utc: Utc::now(),
507 utc_offset: std::sync::Mutex::new(chrono::Duration::zero()),
508 }
509 }
510 }
511
512 impl Clock for TestClock {
513 fn now_instant(&self) -> Instant {
514 let offset = self.instant_offset.lock().unwrap();
515 self.start_instant + *offset
516 }
517
518 fn now_utc(&self) -> DateTime<Utc> {
519 let offset = self.utc_offset.lock().unwrap();
520 self.start_utc + *offset
521 }
522 }
523
524 struct QueueAuthenticator {
525 calls: AtomicUsize,
526 responses: Mutex<VecDeque<Result<AccessBundle, SiweError>>>,
527 }
528
529 impl QueueAuthenticator {
530 fn new(responses: VecDeque<Result<AccessBundle, SiweError>>) -> Self {
531 Self {
532 calls: AtomicUsize::new(0),
533 responses: Mutex::new(responses),
534 }
535 }
536
537 fn calls(&self) -> usize {
538 self.calls.load(AtomicOrdering::SeqCst)
539 }
540 }
541
542 #[async_trait]
543 impl AccessAuthenticator for QueueAuthenticator {
544 async fn login(&self) -> Result<AccessBundle, SiweError> {
545 self.calls.fetch_add(1, AtomicOrdering::SeqCst);
546 let mut guard = self.responses.lock().await;
547 guard
548 .pop_front()
549 .unwrap_or_else(|| Err(SiweError::MissingField("access_token")))
550 }
551 }
552
553 fn access_bundle(token: &str, ttl_secs: i64) -> AccessBundle {
554 let expires_at = Utc::now() + chrono::Duration::seconds(ttl_secs);
555 AccessBundle::new(token.to_string(), expires_at)
556 }
557
558 struct TokioTestClock {
559 start_tokio: tokio::time::Instant,
560 start_std: Instant,
561 start_utc: DateTime<Utc>,
562 }
563
564 impl TokioTestClock {
565 fn new() -> Self {
566 let start_tokio = tokio::time::Instant::now();
567 Self {
568 start_std: start_tokio.into_std(),
569 start_utc: Utc::now(),
570 start_tokio,
571 }
572 }
573 }
574
575 impl Clock for TokioTestClock {
576 fn now_instant(&self) -> Instant {
577 let now_tokio = tokio::time::Instant::now();
578 let offset = now_tokio.duration_since(self.start_tokio);
579 self.start_std + offset
580 }
581
582 fn now_utc(&self) -> DateTime<Utc> {
583 let now_tokio = tokio::time::Instant::now();
584 let offset = now_tokio.duration_since(self.start_tokio);
585 self.start_utc + chrono::Duration::from_std(offset).unwrap()
586 }
587 }
588
589 struct RecordingAuthenticator {
590 clock: Arc<TokioTestClock>,
591 ttl: chrono::Duration,
592 calls: Mutex<Vec<tokio::time::Instant>>,
593 counter: AtomicUsize,
594 }
595
596 impl RecordingAuthenticator {
597 fn new(clock: Arc<TokioTestClock>, ttl: chrono::Duration) -> Self {
598 Self {
599 clock,
600 ttl,
601 calls: Mutex::new(Vec::new()),
602 counter: AtomicUsize::new(0),
603 }
604 }
605
606 fn call_count(&self) -> usize {
607 self.counter.load(AtomicOrdering::SeqCst)
608 }
609
610 async fn call_history(&self) -> Vec<tokio::time::Instant> {
611 self.calls.lock().await.clone()
612 }
613 }
614
615 #[async_trait]
616 impl AccessAuthenticator for RecordingAuthenticator {
617 async fn login(&self) -> Result<AccessBundle, SiweError> {
618 let order = self.counter.fetch_add(1, AtomicOrdering::SeqCst) + 1;
619 self.calls.lock().await.push(tokio::time::Instant::now());
620 let expires_at = self.clock.now_utc() + self.ttl;
621 Ok(AccessBundle::new(format!("token-{order}"), expires_at))
622 }
623 }
624
625 #[tokio::test]
626 async fn concurrent_access_triggers_single_login() {
627 let responses = VecDeque::from([Ok(access_bundle("token-1", 3600))]);
628 let auth = Arc::new(QueueAuthenticator::new(responses));
629 let clock = Arc::new(TestClock::new());
630 let manager = TokenManager::with_rng(
631 auth.clone(),
632 clock.clone(),
633 TokenManagerConfig {
634 safety_ratio: 1.0,
635 max_retries: 0,
636 jitter: Duration::ZERO,
637 },
638 StdRng::seed_from_u64(42),
639 );
640
641 let now = clock.now_instant();
642 let tasks = (0..5).map(|_| {
643 let manager = manager.clone();
644 async move { manager.get_access(now).await.unwrap() }
645 });
646
647 let tokens: Vec<_> = futures::future::join_all(tasks).await;
648 assert!(tokens.iter().all(|t| t == "token-1"));
649 assert_eq!(auth.calls(), 1);
650 }
651
652 #[tokio::test]
653 async fn unauthorized_forces_refresh_on_next_access() {
654 let responses = VecDeque::from([
655 Ok(access_bundle("token-1", 3600)),
656 Ok(access_bundle("token-2", 3600)),
657 ]);
658 let auth = Arc::new(QueueAuthenticator::new(responses));
659 let clock = Arc::new(TestClock::new());
660 let manager = TokenManager::with_rng(
661 auth.clone(),
662 clock.clone(),
663 TokenManagerConfig {
664 safety_ratio: 1.0,
665 max_retries: 0,
666 jitter: Duration::ZERO,
667 },
668 StdRng::seed_from_u64(1),
669 );
670
671 let now = clock.now_instant();
672 assert_eq!(manager.get_access(now).await.unwrap(), "token-1");
673 manager.on_unauthorized_retry().await;
674 let later = clock.now_instant();
675 assert_eq!(manager.get_access(later).await.unwrap(), "token-2");
676 assert_eq!(auth.calls(), 2);
677 }
678
679 #[tokio::test]
680 async fn retries_until_success() {
681 let responses = VecDeque::from([
682 Err(SiweError::MissingField("nonce")),
683 Ok(access_bundle("token-3", 3600)),
684 ]);
685 let auth = Arc::new(QueueAuthenticator::new(responses));
686 let clock = Arc::new(TestClock::new());
687 let manager = TokenManager::with_rng(
688 auth.clone(),
689 clock.clone(),
690 TokenManagerConfig {
691 safety_ratio: 1.0,
692 max_retries: 1,
693 jitter: Duration::ZERO,
694 },
695 StdRng::seed_from_u64(7),
696 );
697
698 let now = clock.now_instant();
699 assert_eq!(manager.get_access(now).await.unwrap(), "token-3");
700 assert_eq!(auth.calls(), 2);
701 }
702
703 struct HangingAuthenticator {
704 calls: AtomicUsize,
705 notify: Notify,
706 }
707
708 impl HangingAuthenticator {
709 fn new() -> Self {
710 Self {
711 calls: AtomicUsize::new(0),
712 notify: Notify::new(),
713 }
714 }
715
716 fn calls(&self) -> usize {
717 self.calls.load(AtomicOrdering::SeqCst)
718 }
719
720 fn release(&self) {
721 self.notify.notify_waiters();
722 }
723 }
724
725 #[async_trait]
726 impl AccessAuthenticator for HangingAuthenticator {
727 async fn login(&self) -> Result<AccessBundle, SiweError> {
728 self.calls.fetch_add(1, AtomicOrdering::SeqCst);
729 self.notify.notified().await;
730 Ok(access_bundle("never", 60))
731 }
732 }
733
734 #[tokio::test]
735 async fn stop_cancels_inflight_refresh() {
736 let auth = Arc::new(HangingAuthenticator::new());
737 let clock = Arc::new(TestClock::new());
738 let manager = TokenManager::with_rng(
739 auth.clone(),
740 clock.clone(),
741 TokenManagerConfig {
742 safety_ratio: 1.0,
743 max_retries: 0,
744 jitter: Duration::ZERO,
745 },
746 StdRng::seed_from_u64(99),
747 );
748
749 let now = clock.now_instant();
750 let mgr_clone = manager.clone();
751 let handle = tokio::spawn(async move { mgr_clone.get_access(now).await });
752
753 tokio::time::sleep(Duration::from_millis(50)).await;
754 assert_eq!(auth.calls(), 1);
755
756 manager.stop().await;
757 let result = handle.await.unwrap();
758 assert!(matches!(result, Err(TokenManagerError::Stopped)));
759
760 auth.release();
761 }
762
763 #[tokio::test(start_paused = true)]
764 async fn background_refresh_occurs_within_jitter() {
765 let clock = Arc::new(TokioTestClock::new());
766 let ttl = chrono::Duration::seconds(100);
767 let auth = Arc::new(RecordingAuthenticator::new(clock.clone(), ttl));
768 let config = TokenManagerConfig {
769 safety_ratio: 0.5,
770 max_retries: 0,
771 jitter: Duration::from_secs(10),
772 };
773
774 let seed = 123_u64;
775 let manager = TokenManager::with_rng(
776 auth.clone(),
777 clock.clone(),
778 config.clone(),
779 StdRng::seed_from_u64(seed),
780 );
781
782 manager.start_bg().await;
783 yield_now().await;
784
785 let now = clock.now_instant();
786 assert_eq!(manager.get_access(now).await.unwrap(), "token-1");
787
788 yield_now().await;
789
790 let ttl_std = Duration::from_secs(100);
792 let safe_duration = Duration::from_secs(50);
793 let mut rng = StdRng::seed_from_u64(seed);
794 let jitter_ms_range = config.jitter.as_millis() as i64;
795 let jitter_ms = if jitter_ms_range == 0 {
796 0
797 } else {
798 rng.gen_range(-jitter_ms_range..=jitter_ms_range)
799 };
800
801 let mut expected = if jitter_ms >= 0 {
802 safe_duration + Duration::from_millis(jitter_ms as u64)
803 } else {
804 safe_duration
805 .checked_sub(Duration::from_millis((-jitter_ms) as u64))
806 .unwrap_or(Duration::ZERO)
807 };
808 let min_refresh = MIN_DURATION.min(ttl_std);
809 if expected < min_refresh {
810 expected = min_refresh;
811 }
812 if expected > ttl_std {
813 expected = ttl_std;
814 }
815
816 let before = expected.saturating_sub(Duration::from_millis(1));
817 advance(before).await;
818 yield_now().await;
819 assert_eq!(auth.call_count(), 1);
820
821 advance(Duration::from_millis(1)).await;
822 yield_now().await;
823
824 for _ in 0..5 {
825 if auth.call_count() >= 2 {
826 break;
827 }
828 advance(Duration::from_millis(1)).await;
829 yield_now().await;
830 }
831
832 assert_eq!(auth.call_count(), 2);
833 let calls = auth.call_history().await;
834 let elapsed = calls[1].duration_since(calls[0]);
835 let elapsed_std = Duration::from_secs_f64(elapsed.as_secs_f64());
836 assert!(elapsed_std >= expected);
837 assert!(elapsed_std <= expected + Duration::from_millis(10));
838 }
839
840 #[tokio::test(start_paused = true)]
841 async fn background_stop_prevents_refresh() {
842 let clock = Arc::new(TokioTestClock::new());
843 let ttl = chrono::Duration::seconds(100);
844 let auth = Arc::new(RecordingAuthenticator::new(clock.clone(), ttl));
845 let config = TokenManagerConfig {
846 safety_ratio: 0.5,
847 max_retries: 0,
848 jitter: Duration::from_secs(5),
849 };
850
851 let manager = TokenManager::with_rng(
852 auth.clone(),
853 clock.clone(),
854 config,
855 StdRng::seed_from_u64(321),
856 );
857
858 manager.start_bg().await;
859 yield_now().await;
860
861 let now = clock.now_instant();
862 assert_eq!(manager.get_access(now).await.unwrap(), "token-1");
863
864 manager.stop_bg().await;
865
866 advance(Duration::from_secs(200)).await;
867 yield_now().await;
868
869 assert_eq!(auth.call_count(), 1);
870 }
871}