posemesh_compute_node/auth/
token_manager.rs

1use std::{
2    sync::{
3        atomic::{AtomicBool, Ordering},
4        Arc,
5    },
6    time::{Duration, Instant},
7};
8
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use rand::{rngs::StdRng, Rng, SeedableRng};
12use tokio::{
13    sync::{Mutex, Notify},
14    task::JoinHandle,
15    time::sleep,
16};
17use tracing::{info, warn};
18
19use super::siwe::{AccessBundle, SiweError};
20
21const DEFAULT_RATIO: f64 = 0.75;
22const MIN_DURATION: Duration = Duration::from_millis(1);
23
24#[async_trait]
25pub trait AccessAuthenticator: Send + Sync {
26    async fn login(&self) -> Result<AccessBundle, SiweError>;
27}
28
29pub trait Clock: Send + Sync {
30    fn now_instant(&self) -> Instant;
31    fn now_utc(&self) -> DateTime<Utc>;
32}
33
34#[derive(Clone)]
35pub struct SystemClock;
36
37impl Clock for SystemClock {
38    fn now_instant(&self) -> Instant {
39        Instant::now()
40    }
41
42    fn now_utc(&self) -> DateTime<Utc> {
43        Utc::now()
44    }
45}
46
47#[derive(Debug, Clone)]
48pub struct TokenManagerConfig {
49    pub safety_ratio: f64,
50    pub max_retries: u32,
51    pub jitter: Duration,
52}
53
54impl Default for TokenManagerConfig {
55    fn default() -> Self {
56        Self {
57            safety_ratio: DEFAULT_RATIO,
58            max_retries: 3,
59            jitter: Duration::from_millis(500),
60        }
61    }
62}
63
64impl TokenManagerConfig {
65    fn clamped_ratio(&self) -> f64 {
66        self.safety_ratio.clamp(0.0, 1.0)
67    }
68}
69
70#[derive(Debug, thiserror::Error)]
71pub enum TokenManagerError {
72    #[error("token manager stopped")]
73    Stopped,
74    #[error("token response already expired")]
75    Expired,
76    #[error("authentication failed after {attempts} attempts: {last_error}")]
77    Authentication {
78        attempts: u32,
79        #[source]
80        last_error: SiweError,
81    },
82}
83
84#[derive(Debug, thiserror::Error)]
85pub enum TokenProviderError {
86    #[error(transparent)]
87    TokenManager(#[from] TokenManagerError),
88    #[error("{0}")]
89    Message(String),
90}
91
92pub type TokenProviderResult<T> = std::result::Result<T, TokenProviderError>;
93
94#[async_trait]
95pub trait TokenProvider: Send + Sync {
96    async fn bearer(&self) -> TokenProviderResult<String>;
97    async fn on_unauthorized(&self);
98}
99
100pub struct TokenManager<A: AccessAuthenticator, C: Clock> {
101    auth: Arc<A>,
102    clock: Arc<C>,
103    config: TokenManagerConfig,
104    rng: Arc<Mutex<StdRng>>,
105    state: Arc<Mutex<State>>,
106    stopped: Arc<AtomicBool>,
107    stop_notify: Arc<Notify>,
108    state_notify: Arc<Notify>,
109    bg_task: Arc<Mutex<Option<JoinHandle<()>>>>,
110}
111
112impl<A: AccessAuthenticator, C: Clock> Clone for TokenManager<A, C> {
113    fn clone(&self) -> Self {
114        Self {
115            auth: Arc::clone(&self.auth),
116            clock: Arc::clone(&self.clock),
117            config: self.config.clone(),
118            rng: Arc::clone(&self.rng),
119            state: Arc::clone(&self.state),
120            stopped: Arc::clone(&self.stopped),
121            stop_notify: Arc::clone(&self.stop_notify),
122            state_notify: Arc::clone(&self.state_notify),
123            bg_task: Arc::clone(&self.bg_task),
124        }
125    }
126}
127
128#[async_trait]
129impl<A, C> TokenProvider for TokenManager<A, C>
130where
131    A: AccessAuthenticator + 'static,
132    C: Clock + 'static,
133{
134    async fn bearer(&self) -> TokenProviderResult<String> {
135        let now = self.clock.now_instant();
136        self.get_access(now)
137            .await
138            .map_err(TokenProviderError::TokenManager)
139    }
140
141    async fn on_unauthorized(&self) {
142        self.on_unauthorized_retry().await;
143    }
144}
145
146struct State {
147    token: Option<TokenEntry>,
148    inflight: Option<Arc<Notify>>,
149}
150
151#[derive(Clone)]
152struct TokenEntry {
153    value: String,
154    refresh_at: Instant,
155}
156
157impl TokenEntry {
158    fn is_valid(&self, now: Instant) -> bool {
159        now <= self.refresh_at
160    }
161}
162
163impl<A, C> TokenManager<A, C>
164where
165    A: AccessAuthenticator + 'static,
166    C: Clock + 'static,
167{
168    pub fn new(auth: Arc<A>, clock: Arc<C>, config: TokenManagerConfig) -> Self {
169        let rng = StdRng::from_entropy();
170        Self::with_rng(auth, clock, config, rng)
171    }
172
173    pub fn with_rng(auth: Arc<A>, clock: Arc<C>, config: TokenManagerConfig, rng: StdRng) -> Self {
174        Self {
175            auth,
176            clock,
177            config,
178            rng: Arc::new(Mutex::new(rng)),
179            state: Arc::new(Mutex::new(State {
180                token: None,
181                inflight: None,
182            })),
183            stopped: Arc::new(AtomicBool::new(false)),
184            stop_notify: Arc::new(Notify::new()),
185            state_notify: Arc::new(Notify::new()),
186            bg_task: Arc::new(Mutex::new(None)),
187        }
188    }
189
190    pub async fn get_access(&self, now: Instant) -> Result<String, TokenManagerError> {
191        loop {
192            let (notify, is_owner) = {
193                let mut state = self.state.lock().await;
194                if self.stopped.load(Ordering::SeqCst) {
195                    return Err(TokenManagerError::Stopped);
196                }
197                if let Some(entry) = &state.token {
198                    if entry.is_valid(now) {
199                        return Ok(entry.value.clone());
200                    }
201                }
202                match state.inflight.clone() {
203                    Some(existing) => (existing, false),
204                    None => {
205                        let notify = Arc::new(Notify::new());
206                        state.inflight = Some(notify.clone());
207                        (notify, true)
208                    }
209                }
210            };
211
212            if is_owner {
213                let result = self.reauth(now).await;
214                let mut refreshed = false;
215                let (notify_to_signal, outcome) = {
216                    let mut state = self.state.lock().await;
217                    let notify_opt = state.inflight.take();
218                    let outcome = match result {
219                        Ok(entry) => {
220                            let token = entry.value.clone();
221                            state.token = Some(entry);
222                            refreshed = true;
223                            Ok(token)
224                        }
225                        Err(err) => Err(err),
226                    };
227                    (notify_opt, outcome)
228                };
229                if let Some(n) = notify_to_signal {
230                    n.notify_waiters();
231                }
232                if refreshed {
233                    self.state_notify.notify_waiters();
234                }
235                return outcome;
236            } else {
237                notify.notified().await;
238            }
239        }
240    }
241
242    pub async fn clear(&self) {
243        let mut state = self.state.lock().await;
244        state.token = None;
245        drop(state);
246        self.state_notify.notify_waiters();
247    }
248
249    pub async fn on_unauthorized_retry(&self) {
250        let mut state = self.state.lock().await;
251        if let Some(entry) = &mut state.token {
252            let now = self.clock.now_instant();
253            entry.refresh_at = now.checked_sub(MIN_DURATION).unwrap_or(now);
254        }
255        drop(state);
256        self.state_notify.notify_waiters();
257    }
258
259    pub async fn stop(&self) {
260        self.stopped.store(true, Ordering::SeqCst);
261        self.stop_notify.notify_waiters();
262        let notify = {
263            let mut state = self.state.lock().await;
264            state.token = None;
265            let inflight = state.inflight.take();
266            drop(state);
267            self.state_notify.notify_waiters();
268            inflight
269        };
270        if let Some(waiters) = notify {
271            waiters.notify_waiters();
272        }
273        if let Some(handle) = self.take_bg_handle().await {
274            let _ = handle.await;
275        }
276    }
277
278    pub async fn start_bg(&self) {
279        let mut guard = self.bg_task.lock().await;
280        if guard.is_some() {
281            return;
282        }
283        if self.stopped.load(Ordering::SeqCst) {
284            self.stopped.store(false, Ordering::SeqCst);
285        }
286        let manager = self.clone();
287        let handle = tokio::spawn(async move {
288            manager.background_loop().await;
289        });
290        *guard = Some(handle);
291    }
292
293    pub async fn stop_bg(&self) {
294        self.stop().await;
295    }
296
297    async fn reauth(&self, now: Instant) -> Result<TokenEntry, TokenManagerError> {
298        if self.stopped.load(Ordering::SeqCst) {
299            return Err(TokenManagerError::Stopped);
300        }
301
302        let attempts = self.config.max_retries.saturating_add(1);
303        let mut last_error: Option<SiweError> = None;
304
305        for attempt in 1..=attempts {
306            if self.stopped.load(Ordering::SeqCst) {
307                return Err(TokenManagerError::Stopped);
308            }
309
310            let login_fut = self.auth.login();
311            tokio::pin!(login_fut);
312            let stop_fut = self.stop_notify.notified();
313            tokio::pin!(stop_fut);
314
315            let bundle = tokio::select! {
316                res = &mut login_fut => res,
317                _ = &mut stop_fut => return Err(TokenManagerError::Stopped),
318            };
319
320            match bundle {
321                Ok(bundle) => match self.bundle_to_entry(&bundle, now).await {
322                    Ok(entry) => {
323                        let ttl = entry.refresh_at.saturating_duration_since(now);
324                        info!(
325                            expires_in_ms = ttl.as_millis(),
326                            "Refreshed DDS access token"
327                        );
328                        return Ok(entry);
329                    }
330                    Err(err) => {
331                        warn!(attempt, error = %err, "DDS SIWE response invalid");
332                        last_error = Some(err);
333                    }
334                },
335                Err(err) => {
336                    warn!(attempt, error = %err, "DDS SIWE login failed");
337                    last_error = Some(err);
338                }
339            }
340
341            if attempt < attempts {
342                let delay = self.next_delay().await;
343                if !delay.is_zero() {
344                    tokio::time::sleep(delay).await;
345                }
346            }
347        }
348
349        Err(TokenManagerError::Authentication {
350            attempts,
351            last_error: last_error.unwrap_or_else(|| SiweError::MissingField("access_token")),
352        })
353    }
354
355    async fn bundle_to_entry(
356        &self,
357        bundle: &AccessBundle,
358        now: Instant,
359    ) -> Result<TokenEntry, SiweError> {
360        let now_utc = self.clock.now_utc();
361        let expires_at = bundle.expires_at();
362        let ttl = expires_at
363            .signed_duration_since(now_utc)
364            .to_std()
365            .map_err(|_| SiweError::MissingField("expires_at"))?;
366        if ttl.is_zero() {
367            return Err(SiweError::MissingField("expires_at"));
368        }
369
370        let ratio = self.config.clamped_ratio();
371        let safe_secs = ttl.as_secs_f64() * ratio;
372        let safe_duration = if safe_secs <= 0.0 {
373            MIN_DURATION.min(ttl)
374        } else {
375            Duration::from_secs_f64(safe_secs).min(ttl)
376        };
377
378        let actual_expiry = now + ttl;
379        let base_refresh = now + safe_duration;
380
381        let jitter_ms = self.sample_jitter_ms().await;
382        let refresh_at = if jitter_ms >= 0 {
383            let add = Duration::from_millis(jitter_ms as u64);
384            base_refresh + add
385        } else {
386            let sub = Duration::from_millis((-jitter_ms) as u64);
387            base_refresh.checked_sub(sub).unwrap_or(now)
388        };
389
390        let min_refresh = now + MIN_DURATION.min(ttl);
391        let mut refresh_at = refresh_at.max(min_refresh);
392        refresh_at = refresh_at.min(actual_expiry);
393
394        Ok(TokenEntry {
395            value: bundle.token().to_string(),
396            refresh_at,
397        })
398    }
399
400    async fn next_delay(&self) -> Duration {
401        if self.config.jitter.is_zero() {
402            return Duration::ZERO;
403        }
404        let mut rng = self.rng.lock().await;
405        let max_ms = self.config.jitter.as_millis() as u64;
406        if max_ms == 0 {
407            Duration::ZERO
408        } else {
409            let jitter_ms = rng.gen_range(0..=max_ms);
410            Duration::from_millis(jitter_ms)
411        }
412    }
413
414    async fn sample_jitter_ms(&self) -> i64 {
415        if self.config.jitter.is_zero() {
416            return 0;
417        }
418        let max_ms_u128 = self.config.jitter.as_millis();
419        let max_ms = max_ms_u128.min(i64::MAX as u128) as i64;
420        if max_ms == 0 {
421            return 0;
422        }
423        let mut rng = self.rng.lock().await;
424        rng.gen_range(-max_ms..=max_ms)
425    }
426
427    async fn take_bg_handle(&self) -> Option<JoinHandle<()>> {
428        let mut guard = self.bg_task.lock().await;
429        guard.take()
430    }
431
432    async fn background_loop(self) {
433        loop {
434            if self.stopped.load(Ordering::SeqCst) {
435                break;
436            }
437
438            match self.next_refresh_target().await {
439                Some(target) => {
440                    let now = self.clock.now_instant();
441                    if target <= now {
442                        if let Err(err) = self.get_access(now).await {
443                            warn!(error = %err, "Background reauth attempt failed");
444                        }
445                        continue;
446                    }
447
448                    let delay = target.saturating_duration_since(now);
449
450                    tokio::select! {
451                        _ = self.stop_notify.notified() => break,
452                        _ = self.state_notify.notified() => continue,
453                        _ = sleep(delay) => {}
454                    }
455
456                    if self.stopped.load(Ordering::SeqCst) {
457                        break;
458                    }
459
460                    let now = self.clock.now_instant();
461                    if let Err(err) = self.get_access(now).await {
462                        warn!(error = %err, "Background reauth attempt failed");
463                    }
464                }
465                None => {
466                    tokio::select! {
467                        _ = self.stop_notify.notified() => break,
468                        _ = self.state_notify.notified() => {}
469                    }
470                }
471            }
472        }
473    }
474
475    async fn next_refresh_target(&self) -> Option<Instant> {
476        let state = self.state.lock().await;
477        state.token.as_ref().map(|entry| entry.refresh_at)
478    }
479}
480
481#[cfg(test)]
482mod tests {
483    use super::*;
484    use std::{
485        collections::VecDeque,
486        sync::{
487            atomic::{AtomicUsize, Ordering as AtomicOrdering},
488            Arc,
489        },
490    };
491    use tokio::task::yield_now;
492    use tokio::time::advance;
493
494    struct TestClock {
495        start_instant: Instant,
496        instant_offset: std::sync::Mutex<Duration>,
497        start_utc: DateTime<Utc>,
498        utc_offset: std::sync::Mutex<chrono::Duration>,
499    }
500
501    impl TestClock {
502        fn new() -> Self {
503            Self {
504                start_instant: Instant::now(),
505                instant_offset: std::sync::Mutex::new(Duration::ZERO),
506                start_utc: Utc::now(),
507                utc_offset: std::sync::Mutex::new(chrono::Duration::zero()),
508            }
509        }
510    }
511
512    impl Clock for TestClock {
513        fn now_instant(&self) -> Instant {
514            let offset = self.instant_offset.lock().unwrap();
515            self.start_instant + *offset
516        }
517
518        fn now_utc(&self) -> DateTime<Utc> {
519            let offset = self.utc_offset.lock().unwrap();
520            self.start_utc + *offset
521        }
522    }
523
524    struct QueueAuthenticator {
525        calls: AtomicUsize,
526        responses: Mutex<VecDeque<Result<AccessBundle, SiweError>>>,
527    }
528
529    impl QueueAuthenticator {
530        fn new(responses: VecDeque<Result<AccessBundle, SiweError>>) -> Self {
531            Self {
532                calls: AtomicUsize::new(0),
533                responses: Mutex::new(responses),
534            }
535        }
536
537        fn calls(&self) -> usize {
538            self.calls.load(AtomicOrdering::SeqCst)
539        }
540    }
541
542    #[async_trait]
543    impl AccessAuthenticator for QueueAuthenticator {
544        async fn login(&self) -> Result<AccessBundle, SiweError> {
545            self.calls.fetch_add(1, AtomicOrdering::SeqCst);
546            let mut guard = self.responses.lock().await;
547            guard
548                .pop_front()
549                .unwrap_or_else(|| Err(SiweError::MissingField("access_token")))
550        }
551    }
552
553    fn access_bundle(token: &str, ttl_secs: i64) -> AccessBundle {
554        let expires_at = Utc::now() + chrono::Duration::seconds(ttl_secs);
555        AccessBundle::new(token.to_string(), expires_at)
556    }
557
558    struct TokioTestClock {
559        start_tokio: tokio::time::Instant,
560        start_std: Instant,
561        start_utc: DateTime<Utc>,
562    }
563
564    impl TokioTestClock {
565        fn new() -> Self {
566            let start_tokio = tokio::time::Instant::now();
567            Self {
568                start_std: start_tokio.into_std(),
569                start_utc: Utc::now(),
570                start_tokio,
571            }
572        }
573    }
574
575    impl Clock for TokioTestClock {
576        fn now_instant(&self) -> Instant {
577            let now_tokio = tokio::time::Instant::now();
578            let offset = now_tokio.duration_since(self.start_tokio);
579            self.start_std + offset
580        }
581
582        fn now_utc(&self) -> DateTime<Utc> {
583            let now_tokio = tokio::time::Instant::now();
584            let offset = now_tokio.duration_since(self.start_tokio);
585            self.start_utc + chrono::Duration::from_std(offset).unwrap()
586        }
587    }
588
589    struct RecordingAuthenticator {
590        clock: Arc<TokioTestClock>,
591        ttl: chrono::Duration,
592        calls: Mutex<Vec<tokio::time::Instant>>,
593        counter: AtomicUsize,
594    }
595
596    impl RecordingAuthenticator {
597        fn new(clock: Arc<TokioTestClock>, ttl: chrono::Duration) -> Self {
598            Self {
599                clock,
600                ttl,
601                calls: Mutex::new(Vec::new()),
602                counter: AtomicUsize::new(0),
603            }
604        }
605
606        fn call_count(&self) -> usize {
607            self.counter.load(AtomicOrdering::SeqCst)
608        }
609
610        async fn call_history(&self) -> Vec<tokio::time::Instant> {
611            self.calls.lock().await.clone()
612        }
613    }
614
615    #[async_trait]
616    impl AccessAuthenticator for RecordingAuthenticator {
617        async fn login(&self) -> Result<AccessBundle, SiweError> {
618            let order = self.counter.fetch_add(1, AtomicOrdering::SeqCst) + 1;
619            self.calls.lock().await.push(tokio::time::Instant::now());
620            let expires_at = self.clock.now_utc() + self.ttl;
621            Ok(AccessBundle::new(format!("token-{order}"), expires_at))
622        }
623    }
624
625    #[tokio::test]
626    async fn concurrent_access_triggers_single_login() {
627        let responses = VecDeque::from([Ok(access_bundle("token-1", 3600))]);
628        let auth = Arc::new(QueueAuthenticator::new(responses));
629        let clock = Arc::new(TestClock::new());
630        let manager = TokenManager::with_rng(
631            auth.clone(),
632            clock.clone(),
633            TokenManagerConfig {
634                safety_ratio: 1.0,
635                max_retries: 0,
636                jitter: Duration::ZERO,
637            },
638            StdRng::seed_from_u64(42),
639        );
640
641        let now = clock.now_instant();
642        let tasks = (0..5).map(|_| {
643            let manager = manager.clone();
644            async move { manager.get_access(now).await.unwrap() }
645        });
646
647        let tokens: Vec<_> = futures::future::join_all(tasks).await;
648        assert!(tokens.iter().all(|t| t == "token-1"));
649        assert_eq!(auth.calls(), 1);
650    }
651
652    #[tokio::test]
653    async fn unauthorized_forces_refresh_on_next_access() {
654        let responses = VecDeque::from([
655            Ok(access_bundle("token-1", 3600)),
656            Ok(access_bundle("token-2", 3600)),
657        ]);
658        let auth = Arc::new(QueueAuthenticator::new(responses));
659        let clock = Arc::new(TestClock::new());
660        let manager = TokenManager::with_rng(
661            auth.clone(),
662            clock.clone(),
663            TokenManagerConfig {
664                safety_ratio: 1.0,
665                max_retries: 0,
666                jitter: Duration::ZERO,
667            },
668            StdRng::seed_from_u64(1),
669        );
670
671        let now = clock.now_instant();
672        assert_eq!(manager.get_access(now).await.unwrap(), "token-1");
673        manager.on_unauthorized_retry().await;
674        let later = clock.now_instant();
675        assert_eq!(manager.get_access(later).await.unwrap(), "token-2");
676        assert_eq!(auth.calls(), 2);
677    }
678
679    #[tokio::test]
680    async fn retries_until_success() {
681        let responses = VecDeque::from([
682            Err(SiweError::MissingField("nonce")),
683            Ok(access_bundle("token-3", 3600)),
684        ]);
685        let auth = Arc::new(QueueAuthenticator::new(responses));
686        let clock = Arc::new(TestClock::new());
687        let manager = TokenManager::with_rng(
688            auth.clone(),
689            clock.clone(),
690            TokenManagerConfig {
691                safety_ratio: 1.0,
692                max_retries: 1,
693                jitter: Duration::ZERO,
694            },
695            StdRng::seed_from_u64(7),
696        );
697
698        let now = clock.now_instant();
699        assert_eq!(manager.get_access(now).await.unwrap(), "token-3");
700        assert_eq!(auth.calls(), 2);
701    }
702
703    struct HangingAuthenticator {
704        calls: AtomicUsize,
705        notify: Notify,
706    }
707
708    impl HangingAuthenticator {
709        fn new() -> Self {
710            Self {
711                calls: AtomicUsize::new(0),
712                notify: Notify::new(),
713            }
714        }
715
716        fn calls(&self) -> usize {
717            self.calls.load(AtomicOrdering::SeqCst)
718        }
719
720        fn release(&self) {
721            self.notify.notify_waiters();
722        }
723    }
724
725    #[async_trait]
726    impl AccessAuthenticator for HangingAuthenticator {
727        async fn login(&self) -> Result<AccessBundle, SiweError> {
728            self.calls.fetch_add(1, AtomicOrdering::SeqCst);
729            self.notify.notified().await;
730            Ok(access_bundle("never", 60))
731        }
732    }
733
734    #[tokio::test]
735    async fn stop_cancels_inflight_refresh() {
736        let auth = Arc::new(HangingAuthenticator::new());
737        let clock = Arc::new(TestClock::new());
738        let manager = TokenManager::with_rng(
739            auth.clone(),
740            clock.clone(),
741            TokenManagerConfig {
742                safety_ratio: 1.0,
743                max_retries: 0,
744                jitter: Duration::ZERO,
745            },
746            StdRng::seed_from_u64(99),
747        );
748
749        let now = clock.now_instant();
750        let mgr_clone = manager.clone();
751        let handle = tokio::spawn(async move { mgr_clone.get_access(now).await });
752
753        tokio::time::sleep(Duration::from_millis(50)).await;
754        assert_eq!(auth.calls(), 1);
755
756        manager.stop().await;
757        let result = handle.await.unwrap();
758        assert!(matches!(result, Err(TokenManagerError::Stopped)));
759
760        auth.release();
761    }
762
763    #[tokio::test(start_paused = true)]
764    async fn background_refresh_occurs_within_jitter() {
765        let clock = Arc::new(TokioTestClock::new());
766        let ttl = chrono::Duration::seconds(100);
767        let auth = Arc::new(RecordingAuthenticator::new(clock.clone(), ttl));
768        let config = TokenManagerConfig {
769            safety_ratio: 0.5,
770            max_retries: 0,
771            jitter: Duration::from_secs(10),
772        };
773
774        let seed = 123_u64;
775        let manager = TokenManager::with_rng(
776            auth.clone(),
777            clock.clone(),
778            config.clone(),
779            StdRng::seed_from_u64(seed),
780        );
781
782        manager.start_bg().await;
783        yield_now().await;
784
785        let now = clock.now_instant();
786        assert_eq!(manager.get_access(now).await.unwrap(), "token-1");
787
788        yield_now().await;
789
790        // Compute expected refresh timing based on deterministic RNG
791        let ttl_std = Duration::from_secs(100);
792        let safe_duration = Duration::from_secs(50);
793        let mut rng = StdRng::seed_from_u64(seed);
794        let jitter_ms_range = config.jitter.as_millis() as i64;
795        let jitter_ms = if jitter_ms_range == 0 {
796            0
797        } else {
798            rng.gen_range(-jitter_ms_range..=jitter_ms_range)
799        };
800
801        let mut expected = if jitter_ms >= 0 {
802            safe_duration + Duration::from_millis(jitter_ms as u64)
803        } else {
804            safe_duration
805                .checked_sub(Duration::from_millis((-jitter_ms) as u64))
806                .unwrap_or(Duration::ZERO)
807        };
808        let min_refresh = MIN_DURATION.min(ttl_std);
809        if expected < min_refresh {
810            expected = min_refresh;
811        }
812        if expected > ttl_std {
813            expected = ttl_std;
814        }
815
816        let before = expected.saturating_sub(Duration::from_millis(1));
817        advance(before).await;
818        yield_now().await;
819        assert_eq!(auth.call_count(), 1);
820
821        advance(Duration::from_millis(1)).await;
822        yield_now().await;
823
824        for _ in 0..5 {
825            if auth.call_count() >= 2 {
826                break;
827            }
828            advance(Duration::from_millis(1)).await;
829            yield_now().await;
830        }
831
832        assert_eq!(auth.call_count(), 2);
833        let calls = auth.call_history().await;
834        let elapsed = calls[1].duration_since(calls[0]);
835        let elapsed_std = Duration::from_secs_f64(elapsed.as_secs_f64());
836        assert!(elapsed_std >= expected);
837        assert!(elapsed_std <= expected + Duration::from_millis(10));
838    }
839
840    #[tokio::test(start_paused = true)]
841    async fn background_stop_prevents_refresh() {
842        let clock = Arc::new(TokioTestClock::new());
843        let ttl = chrono::Duration::seconds(100);
844        let auth = Arc::new(RecordingAuthenticator::new(clock.clone(), ttl));
845        let config = TokenManagerConfig {
846            safety_ratio: 0.5,
847            max_retries: 0,
848            jitter: Duration::from_secs(5),
849        };
850
851        let manager = TokenManager::with_rng(
852            auth.clone(),
853            clock.clone(),
854            config,
855            StdRng::seed_from_u64(321),
856        );
857
858        manager.start_bg().await;
859        yield_now().await;
860
861        let now = clock.now_instant();
862        assert_eq!(manager.get_access(now).await.unwrap(), "token-1");
863
864        manager.stop_bg().await;
865
866        advance(Duration::from_secs(200)).await;
867        yield_now().await;
868
869        assert_eq!(auth.call_count(), 1);
870    }
871}