Skip to main content

meerkat_runtime/handles/
oauth_flow.rs

1//! Runtime OAuth login-flow lifecycle authority.
2//!
3//! REST/RPC surfaces reach browser-PKCE and device-code admission/consume
4//! through [`MeerkatMachine`](crate::meerkat_machine::MeerkatMachine). The
5//! auth-core registry stores short-lived PKCE/device payloads; the lifecycle
6//! membership and terminal transitions are routed through generated
7//! AuthMachine inputs keyed by the target binding.
8
9use std::collections::BTreeSet;
10use std::sync::{Arc, Mutex, Weak};
11use std::time::{Duration, Instant};
12
13use meerkat_auth_core::oauth_flow::{
14    OAuthDeviceFlowRecord, OAuthDevicePollLease, OAuthDevicePollLifecycle, OAuthFlowAuthority,
15    OAuthFlowError, OAuthFlowRecord, OAuthFlowRegistry, OAuthFlowRegistrySnapshot,
16    OAuthProviderIdentity, OAuthPrunedFlows, PersistedOAuthBrowserFlow, PersistedOAuthDeviceFlow,
17};
18use meerkat_core::AuthBindingRef;
19use meerkat_core::handles::{DslTransitionError, LeaseKey};
20use meerkat_core::time_compat::{SystemTime, UNIX_EPOCH};
21
22use crate::auth_machine::dsl as auth_dsl;
23use crate::store::RuntimeStore;
24
25use super::RuntimeAuthLeaseHandle;
26use super::auth_lease::{AuthLeaseReleaseObserver, ReleasedOAuthFlows};
27
/// Shared slot holding an optional weak reference to the runtime store.
/// The slot starts as whatever the constructor was given and can be replaced
/// later through `RuntimeOAuthFlowHandle::bind_persistent_store`.
type StoreSlot = Arc<Mutex<Option<Weak<dyn RuntimeStore>>>>;
/// Payload-free mutex shared between the handle and its release observer;
/// the observer holds it for the duration of each release callback.
type PayloadLock = Arc<Mutex<()>>;
/// Browser snapshot keys selected for removal from a persisted snapshot.
type RemovedBrowserSnapshotKeys = Vec<BrowserSnapshotKey>;
/// Device snapshot keys selected for removal from a persisted snapshot.
type RemovedDeviceSnapshotKeys = Vec<DeviceSnapshotKey>;
32
/// Wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the clock reads earlier than the epoch and saturates at
/// `u64::MAX` when the millisecond count does not fit in a `u64`.
fn current_time_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => match u64::try_from(since_epoch.as_millis()) {
            Ok(millis) => millis,
            Err(_) => u64::MAX,
        },
        Err(_) => 0,
    }
}
39
40fn expires_at_millis(duration: Duration) -> Result<u64, OAuthFlowError> {
41    let duration_millis =
42        u64::try_from(duration.as_millis()).map_err(|_| OAuthFlowError::DeviceExpiryOutOfRange)?;
43    current_time_millis()
44        .checked_add(duration_millis)
45        .ok_or(OAuthFlowError::DeviceExpiryOutOfRange)
46}
47
48fn load_oauth_snapshot_for_release(
49    store: &StoreSlot,
50    operation: &'static str,
51) -> Result<Option<OAuthFlowRegistrySnapshot>, DslTransitionError> {
52    let store = store
53        .lock()
54        .unwrap_or_else(std::sync::PoisonError::into_inner)
55        .clone();
56    let Some(store) = store else {
57        return Ok(None);
58    };
59    let store = store.upgrade().ok_or_else(|| {
60        DslTransitionError::new(operation, "runtime store is no longer available")
61    })?;
62    let Some(bytes) = store
63        .load_auth_oauth_flow_snapshot()
64        .map_err(|err| DslTransitionError::new(operation, err.to_string()))?
65    else {
66        return Ok(None);
67    };
68    serde_json::from_slice::<OAuthFlowRegistrySnapshot>(&bytes)
69        .map(Some)
70        .map_err(|err| DslTransitionError::new(operation, err.to_string()))
71}
72
/// Runtime handle pairing the OAuth payload registry with the auth-lease
/// lifecycle and an optional durable store.
#[derive(Debug)]
pub struct RuntimeOAuthFlowHandle {
    /// Registry of outstanding browser/device flow payloads.
    registry: Arc<OAuthFlowRegistry>,
    /// Auth-lease handle through which lifecycle inputs are applied.
    lifecycle: Arc<RuntimeAuthLeaseHandle>,
    /// Late-bindable weak reference to the runtime store used for snapshots.
    store: StoreSlot,
    /// Lock shared with the release observer (same `Arc` as the observer's).
    payload_lock: PayloadLock,
    /// Strong reference to the release observer; the lifecycle is only handed
    /// a `Weak`, so this field is what keeps the observer alive.
    _release_observer: Option<Arc<OAuthPayloadReleaseObserver>>,
}
81
/// Release observer registered with the auth-lease handle; it drops OAuth
/// flow payloads that belong to a released lease.
#[derive(Debug)]
struct OAuthPayloadReleaseObserver {
    /// Registry whose payloads are pruned when a lease is released.
    registry: Arc<OAuthFlowRegistry>,
    /// Store slot used to read and rewrite the durable snapshot.
    store: StoreSlot,
    /// Lock held for the duration of each observer callback.
    payload_lock: PayloadLock,
}
88
89impl AuthLeaseReleaseObserver for OAuthPayloadReleaseObserver {
90    fn oauth_flows_for_release(
91        &self,
92        lease_key: &LeaseKey,
93    ) -> Result<ReleasedOAuthFlows, DslTransitionError> {
94        let _payload_guard = self
95            .payload_lock
96            .lock()
97            .unwrap_or_else(std::sync::PoisonError::into_inner);
98        let target = AuthBindingRef {
99            realm: lease_key.realm.clone(),
100            binding: lease_key.binding.clone(),
101            profile: lease_key.profile.clone(),
102        };
103        let Some(snapshot) =
104            load_oauth_snapshot_for_release(&self.store, "collect_oauth_flow_payloads")?
105        else {
106            return Ok(ReleasedOAuthFlows {
107                lease_key: lease_key.clone(),
108                browser_flow_ids: Vec::new(),
109                device_flow_ids: Vec::new(),
110            });
111        };
112        let now_millis = current_time_millis();
113        Ok(ReleasedOAuthFlows {
114            lease_key: lease_key.clone(),
115            browser_flow_ids: snapshot
116                .browser
117                .iter()
118                .filter(|flow| flow.target == target && flow.expires_at_millis > now_millis)
119                .map(|flow| flow.state.clone())
120                .collect(),
121            device_flow_ids: snapshot
122                .device
123                .iter()
124                .filter(|flow| flow.target == target && flow.expires_at_millis > now_millis)
125                .map(|flow| flow.device_code.clone())
126                .collect(),
127        })
128    }
129
    /// Drops the payloads of the flows listed in `released`, first from the
    /// persisted snapshot and then from the in-memory registry.
    ///
    /// # Errors
    ///
    /// Returns a [`DslTransitionError`] when the durable snapshot cannot be
    /// loaded or the updated snapshot cannot be persisted. In both cases the
    /// function returns before the in-memory registry is pruned.
    fn auth_lease_released(&self, released: &ReleasedOAuthFlows) -> Result<(), DslTransitionError> {
        // Held for the whole callback.
        let _payload_guard = self
            .payload_lock
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let target = AuthBindingRef {
            realm: released.lease_key.realm.clone(),
            binding: released.lease_key.binding.clone(),
            profile: released.lease_key.profile.clone(),
        };
        // Borrowed id sets for the membership tests below.
        let browser_flow_ids = released
            .browser_flow_ids
            .iter()
            .map(String::as_str)
            .collect::<BTreeSet<_>>();
        let device_flow_ids = released
            .device_flow_ids
            .iter()
            .map(String::as_str)
            .collect::<BTreeSet<_>>();
        let now_millis = current_time_millis();
        let mut snapshot = self.registry.snapshot_for_persistence(now_millis);
        // Removal keys from the in-memory view: matching target and released id.
        let mut removed_browser = snapshot
            .browser
            .iter()
            .filter(|flow| flow.target == target && browser_flow_ids.contains(flow.state.as_str()))
            .map(persisted_browser_snapshot_key)
            .collect::<BTreeSet<_>>();
        let mut removed_device = snapshot
            .device
            .iter()
            .filter(|flow| {
                flow.target == target && device_flow_ids.contains(flow.device_code.as_str())
            })
            .map(persisted_device_snapshot_key)
            .collect::<BTreeSet<_>>();
        // Add matching keys from the durable snapshot, restricted to flows that
        // have not expired yet; the set deduplicates keys seen in both views.
        if let Some(durable) =
            load_oauth_snapshot_for_release(&self.store, "release_oauth_flow_payloads")?
        {
            removed_browser.extend(
                durable
                    .browser
                    .iter()
                    .filter(|flow| {
                        flow.target == target
                            && browser_flow_ids.contains(flow.state.as_str())
                            && flow.expires_at_millis > now_millis
                    })
                    .map(persisted_browser_snapshot_key),
            );
            removed_device.extend(
                durable
                    .device
                    .iter()
                    .filter(|flow| {
                        flow.target == target
                            && device_flow_ids.contains(flow.device_code.as_str())
                            && flow.expires_at_millis > now_millis
                    })
                    .map(persisted_device_snapshot_key),
            );
        }
        let removed_browser = removed_browser.into_iter().collect::<Vec<_>>();
        let removed_device = removed_device.into_iter().collect::<Vec<_>>();
        // Strip the released flows from the snapshot that is about to be written.
        snapshot.browser.retain(|flow| {
            !(flow.target == target && browser_flow_ids.contains(flow.state.as_str()))
        });
        snapshot.device.retain(|flow| {
            !(flow.target == target && device_flow_ids.contains(flow.device_code.as_str()))
        });
        // Persist before touching the registry: a persistence failure returns
        // early and leaves the in-memory payloads as they were.
        persist_registry_snapshot(
            &snapshot,
            &self.store,
            "release_oauth_flow_payloads",
            &removed_browser,
            &removed_device,
            now_millis,
            SnapshotPersistPolicy::merge(),
        )
        .map_err(|err| {
            DslTransitionError::new(
                "AuthLeaseReleaseObserver::release_oauth_flow_payloads",
                err.to_string(),
            )
        })?;
        // Drop the released payloads from memory; the returned pruned set is
        // deliberately discarded.
        let _ = self.registry.retain_flows_with_lifecycle(
            |record_target, flow_id| {
                !(record_target == &target && browser_flow_ids.contains(flow_id))
            },
            |record_target, device_code| {
                !(record_target == &target && device_flow_ids.contains(device_code))
            },
        );
        Ok(())
    }
225}
226
227impl RuntimeOAuthFlowHandle {
228    pub fn new(ttl: Duration) -> Self {
229        Self::new_with_auth_lease(ttl, Arc::new(RuntimeAuthLeaseHandle::new()))
230    }
231
232    pub fn new_with_auth_lease(ttl: Duration, lifecycle: Arc<RuntimeAuthLeaseHandle>) -> Self {
233        Self::new_with_capacity_auth_lease_and_store(ttl, 1024, lifecycle, None)
234    }
235
236    pub fn new_with_capacity(ttl: Duration, max_outstanding: usize) -> Self {
237        Self::new_with_capacity_and_auth_lease(
238            ttl,
239            max_outstanding,
240            Arc::new(RuntimeAuthLeaseHandle::new()),
241        )
242    }
243
244    pub fn new_with_capacity_and_auth_lease(
245        ttl: Duration,
246        max_outstanding: usize,
247        lifecycle: Arc<RuntimeAuthLeaseHandle>,
248    ) -> Self {
249        Self::new_with_capacity_auth_lease_and_store(ttl, max_outstanding, lifecycle, None)
250    }
251
252    pub fn new_with_persistent_store_and_auth_lease(
253        ttl: Duration,
254        lifecycle: Arc<RuntimeAuthLeaseHandle>,
255        store: &Arc<dyn RuntimeStore>,
256    ) -> Self {
257        Self::new_with_capacity_auth_lease_and_store(
258            ttl,
259            1024,
260            lifecycle,
261            Some(Arc::downgrade(store)),
262        )
263    }
264
265    fn new_with_capacity_auth_lease_and_store(
266        ttl: Duration,
267        max_outstanding: usize,
268        lifecycle: Arc<RuntimeAuthLeaseHandle>,
269        store: Option<Weak<dyn RuntimeStore>>,
270    ) -> Self {
271        let registry = Arc::new(OAuthFlowRegistry::new_with_capacity(ttl, max_outstanding));
272        let store = Arc::new(Mutex::new(store));
273        let payload_lock = Arc::new(Mutex::new(()));
274        let release_observer = Arc::new(OAuthPayloadReleaseObserver {
275            registry: Arc::clone(&registry),
276            store: Arc::clone(&store),
277            payload_lock: Arc::clone(&payload_lock),
278        });
279        let release_observer_dyn: Arc<dyn AuthLeaseReleaseObserver> = release_observer.clone();
280        lifecycle.add_release_observer(Arc::downgrade(&release_observer_dyn));
281        let handle = Self {
282            registry,
283            lifecycle,
284            store,
285            payload_lock,
286            _release_observer: Some(release_observer),
287        };
288        handle.rehydrate_persisted_payloads();
289        handle
290    }
291
292    pub(crate) fn bind_persistent_store(&self, store: &Arc<dyn RuntimeStore>) {
293        *self
294            .store
295            .lock()
296            .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::downgrade(store));
297    }
298
299    fn apply(
300        &self,
301        target: &AuthBindingRef,
302        input: auth_dsl::AuthMachineInput,
303        operation: &'static str,
304        create_if_missing: bool,
305    ) -> Result<(), OAuthFlowError> {
306        self.lifecycle
307            .apply_oauth_input(target, input, operation, create_if_missing)
308            .map_err(|err| OAuthFlowError::LifecycleRejected {
309                operation,
310                detail: err.to_string(),
311            })
312    }
313
314    fn admit_browser(
315        &self,
316        target: &AuthBindingRef,
317        flow_id: &str,
318        provider: OAuthProviderIdentity,
319        redirect_uri: &str,
320        expires_at_millis: u64,
321    ) -> Result<(), OAuthFlowError> {
322        self.apply(
323            target,
324            auth_dsl::AuthMachineInput::AdmitOAuthBrowserFlow {
325                flow_id: flow_id.to_string(),
326                provider: provider.canonical_alias().to_string(),
327                redirect_uri: redirect_uri.to_string(),
328                expires_at_millis,
329                max_outstanding_flows: self.registry.max_outstanding() as u64,
330            },
331            "admit_oauth_browser_flow",
332            true,
333        )
334    }
335
336    fn verify_browser(
337        &self,
338        target: &AuthBindingRef,
339        flow_id: &str,
340        provider: OAuthProviderIdentity,
341        redirect_uri: &str,
342    ) -> Result<(), OAuthFlowError> {
343        self.apply(
344            target,
345            auth_dsl::AuthMachineInput::VerifyOAuthBrowserFlow {
346                flow_id: flow_id.to_string(),
347                provider: provider.canonical_alias().to_string(),
348                redirect_uri: redirect_uri.to_string(),
349                now_millis: current_time_millis(),
350            },
351            "verify_oauth_browser_flow",
352            false,
353        )
354    }
355
356    fn consume_browser(
357        &self,
358        target: &AuthBindingRef,
359        flow_id: &str,
360        provider: OAuthProviderIdentity,
361        redirect_uri: &str,
362    ) -> Result<(), OAuthFlowError> {
363        self.apply(
364            target,
365            auth_dsl::AuthMachineInput::ConsumeOAuthBrowserFlow {
366                flow_id: flow_id.to_string(),
367                provider: provider.canonical_alias().to_string(),
368                redirect_uri: redirect_uri.to_string(),
369                now_millis: current_time_millis(),
370            },
371            "consume_oauth_browser_flow",
372            false,
373        )
374    }
375
376    fn expire_browser(&self, target: &AuthBindingRef, flow_id: &str) -> Result<(), OAuthFlowError> {
377        self.apply(
378            target,
379            auth_dsl::AuthMachineInput::ExpireOAuthBrowserFlow {
380                flow_id: flow_id.to_string(),
381            },
382            "expire_oauth_browser_flow",
383            false,
384        )
385    }
386
387    fn admit_device(
388        &self,
389        target: &AuthBindingRef,
390        flow_id: &str,
391        provider: OAuthProviderIdentity,
392        expires_at_millis: u64,
393    ) -> Result<(), OAuthFlowError> {
394        self.apply(
395            target,
396            auth_dsl::AuthMachineInput::AdmitOAuthDeviceFlow {
397                flow_id: flow_id.to_string(),
398                provider: provider.canonical_alias().to_string(),
399                expires_at_millis,
400                max_outstanding_flows: self.registry.max_outstanding() as u64,
401            },
402            "admit_oauth_device_flow",
403            true,
404        )
405    }
406
407    fn verify_device(
408        &self,
409        target: &AuthBindingRef,
410        flow_id: &str,
411        provider: OAuthProviderIdentity,
412    ) -> Result<(), OAuthFlowError> {
413        self.apply(
414            target,
415            auth_dsl::AuthMachineInput::VerifyOAuthDeviceFlow {
416                flow_id: flow_id.to_string(),
417                provider: provider.canonical_alias().to_string(),
418                now_millis: current_time_millis(),
419            },
420            "verify_oauth_device_flow",
421            false,
422        )
423    }
424
425    fn begin_device_poll(
426        &self,
427        target: &AuthBindingRef,
428        flow_id: &str,
429        provider: OAuthProviderIdentity,
430    ) -> Result<(), OAuthFlowError> {
431        self.apply(
432            target,
433            auth_dsl::AuthMachineInput::BeginOAuthDevicePoll {
434                flow_id: flow_id.to_string(),
435                provider: provider.canonical_alias().to_string(),
436                now_millis: current_time_millis(),
437            },
438            "begin_oauth_device_poll",
439            false,
440        )
441    }
442
443    fn expire_pruned_flows(&self) {
444        self.expire_collected_flows(OAuthPrunedFlows {
445            browser: self.registry.prune_expired_browser_flows(),
446            device: self.registry.prune_expired_device_flows(),
447        });
448    }
449
450    fn retain_registry_payloads_with_lifecycle(
451        &self,
452    ) -> (OAuthPrunedFlows, OAuthFlowRegistrySnapshot) {
453        let before = self
454            .registry
455            .snapshot_for_persistence(current_time_millis());
456        let pruned = self.registry.retain_flows_with_lifecycle(
457            |target, flow_id| self.lifecycle.has_oauth_browser_flow(target, flow_id),
458            |target, flow_id| self.lifecycle.has_oauth_device_flow(target, flow_id),
459        );
460        (pruned, before)
461    }
462
463    fn expire_collected_flows(&self, pruned: OAuthPrunedFlows) {
464        for (flow_id, target) in pruned.browser {
465            let _ = self.expire_browser(&target, &flow_id);
466        }
467        for (device_code, target) in pruned.device {
468            let _ = self.lifecycle.expire_device_flow(&target, &device_code);
469        }
470    }
471
472    fn removed_snapshot_keys_from_pruned(
473        snapshot: &OAuthFlowRegistrySnapshot,
474        pruned: &OAuthPrunedFlows,
475    ) -> (RemovedBrowserSnapshotKeys, RemovedDeviceSnapshotKeys) {
476        let pruned_browser = pruned
477            .browser
478            .iter()
479            .map(|(flow_id, target)| browser_snapshot_key(target, flow_id))
480            .collect::<BTreeSet<_>>();
481        let pruned_device = pruned
482            .device
483            .iter()
484            .map(|(device_code, target)| device_snapshot_key(target, device_code))
485            .collect::<BTreeSet<_>>();
486        let browser = snapshot
487            .browser
488            .iter()
489            .filter(|flow| pruned_browser.contains(&persisted_browser_snapshot_key(flow)))
490            .map(persisted_browser_snapshot_key)
491            .collect();
492        let device = snapshot
493            .device
494            .iter()
495            .filter(|flow| pruned_device.contains(&persisted_device_snapshot_key(flow)))
496            .map(persisted_device_snapshot_key)
497            .collect();
498        (browser, device)
499    }
500
501    fn store(&self) -> Option<Arc<dyn RuntimeStore>> {
502        self.store
503            .lock()
504            .unwrap_or_else(std::sync::PoisonError::into_inner)
505            .as_ref()
506            .and_then(Weak::upgrade)
507    }
508
509    fn persist_registry_payloads_removing(
510        &self,
511        operation: &'static str,
512        removed_browser: &[BrowserSnapshotKey],
513        removed_device: &[DeviceSnapshotKey],
514    ) -> Result<(), OAuthFlowError> {
515        persist_registry_payloads(
516            &self.registry,
517            &self.store,
518            operation,
519            removed_browser,
520            removed_device,
521        )
522    }
523
524    fn persist_registry_payloads_claiming_removal(
525        &self,
526        operation: &'static str,
527        removed_browser: &[BrowserSnapshotKey],
528        removed_device: &[DeviceSnapshotKey],
529    ) -> Result<(), OAuthFlowError> {
530        persist_registry_payloads_claiming_removal(
531            &self.registry,
532            &self.store,
533            operation,
534            removed_browser,
535            removed_device,
536        )
537    }
538
539    fn persist_registry_payloads_claiming_admission(
540        &self,
541        operation: &'static str,
542        removed_browser: &[BrowserSnapshotKey],
543        removed_device: &[DeviceSnapshotKey],
544        admitted_browser: &[BrowserSnapshotKey],
545        admitted_device: &[DeviceSnapshotKey],
546    ) -> Result<(), OAuthFlowError> {
547        persist_registry_payloads_claiming_admission(
548            &self.registry,
549            &self.store,
550            operation,
551            removed_browser,
552            removed_device,
553            admitted_browser,
554            admitted_device,
555        )
556    }
557
558    fn browser_record_expires_at_millis(
559        &self,
560        record: &OAuthFlowRecord,
561    ) -> Result<u64, OAuthFlowError> {
562        let remaining = self
563            .registry
564            .ttl()
565            .checked_sub(record.created_at.elapsed())
566            .ok_or(OAuthFlowError::Missing)?;
567        expires_at_millis(remaining)
568    }
569
570    fn restore_browser_flow(
571        &self,
572        state: &str,
573        record: &OAuthFlowRecord,
574    ) -> Result<(), OAuthFlowError> {
575        let expires_at_millis = self.browser_record_expires_at_millis(record)?;
576        self.admit_browser(
577            &record.target,
578            state,
579            record.provider,
580            &record.redirect_uri,
581            expires_at_millis,
582        )?;
583        self.registry.insert_restored_browser_flow(
584            state.to_string(),
585            record.target.clone(),
586            record.provider,
587            record.redirect_uri.clone(),
588            record.pkce_verifier.clone(),
589            record.created_at,
590        )
591    }
592
593    fn rehydrate_persisted_payloads(&self) {
594        let Some(store) = self.store() else {
595            return;
596        };
597        let Ok(Some(bytes)) = store.load_auth_oauth_flow_snapshot() else {
598            return;
599        };
600        let Ok(snapshot) = serde_json::from_slice::<OAuthFlowRegistrySnapshot>(&bytes) else {
601            return;
602        };
603        let now_millis = current_time_millis();
604        let now_instant = Instant::now();
605
606        for persisted in snapshot.browser.iter().cloned() {
607            self.restore_browser_payload(persisted, now_millis, now_instant);
608        }
609        for persisted in snapshot.device.iter().cloned() {
610            self.restore_device_payload(persisted, now_millis, now_instant);
611        }
612        let current = self.registry.snapshot_for_persistence(now_millis);
613        let current_browser = current
614            .browser
615            .iter()
616            .map(persisted_browser_snapshot_key)
617            .collect::<BTreeSet<_>>();
618        let current_device = current
619            .device
620            .iter()
621            .map(persisted_device_snapshot_key)
622            .collect::<BTreeSet<_>>();
623        let removed_browser = snapshot
624            .browser
625            .iter()
626            .filter(|flow| !current_browser.contains(&persisted_browser_snapshot_key(flow)))
627            .map(persisted_browser_snapshot_key)
628            .collect::<Vec<_>>();
629        let removed_device = snapshot
630            .device
631            .iter()
632            .filter(|flow| !current_device.contains(&persisted_device_snapshot_key(flow)))
633            .map(persisted_device_snapshot_key)
634            .collect::<Vec<_>>();
635        let _ = self.persist_registry_payloads_removing(
636            "rehydrate_oauth_flows",
637            &removed_browser,
638            &removed_device,
639        );
640    }
641
    /// Reconciles the in-memory registry and lifecycle with the durable
    /// snapshot.
    ///
    /// In-memory flows that have no unexpired durable counterpart are removed
    /// from the registry and expired in the lifecycle. Unexpired durable
    /// flows that are missing from memory are then restored. Does nothing
    /// when no store is available.
    ///
    /// # Errors
    ///
    /// Returns [`OAuthFlowError::PersistenceFailed`] tagged with `operation`
    /// when the snapshot cannot be loaded or deserialized.
    fn sync_persisted_payloads(&self, operation: &'static str) -> Result<(), OAuthFlowError> {
        let Some(store) = self.store() else {
            return Ok(());
        };
        // An absent snapshot is treated as an empty one.
        let snapshot =
            match store.load_auth_oauth_flow_snapshot().map_err(|err| {
                OAuthFlowError::PersistenceFailed {
                    operation,
                    detail: err.to_string(),
                }
            })? {
                Some(bytes) => serde_json::from_slice::<OAuthFlowRegistrySnapshot>(&bytes)
                    .map_err(|err| OAuthFlowError::PersistenceFailed {
                        operation,
                        detail: err.to_string(),
                    })?,
                None => OAuthFlowRegistrySnapshot::default(),
            };
        let now_millis = current_time_millis();
        let now_instant = Instant::now();
        // Keys of durable flows that have not expired yet.
        let durable_browser = snapshot
            .browser
            .iter()
            .filter(|flow| flow.expires_at_millis > now_millis)
            .map(persisted_browser_snapshot_key)
            .collect::<BTreeSet<_>>();
        let durable_device = snapshot
            .device
            .iter()
            .filter(|flow| flow.expires_at_millis > now_millis)
            .map(persisted_device_snapshot_key)
            .collect::<BTreeSet<_>>();
        // Pass 1: drop in-memory flows that are not backed by a live durable
        // entry. Every result in this pass is deliberately ignored.
        let current = self.registry.snapshot_for_persistence(now_millis);
        for flow in current
            .browser
            .iter()
            .filter(|flow| !durable_browser.contains(&persisted_browser_snapshot_key(flow)))
        {
            // The registry payload is only removed when the provider alias
            // resolves; the lifecycle expiry below is attempted either way.
            if let Some(provider) = OAuthProviderIdentity::from_alias(&flow.provider) {
                let _ =
                    self.registry
                        .consume(&flow.state, &flow.target, provider, &flow.redirect_uri);
            }
            let _ = self.expire_browser(&flow.target, &flow.state);
        }
        for flow in current
            .device
            .iter()
            .filter(|flow| !durable_device.contains(&persisted_device_snapshot_key(flow)))
        {
            if let Some(provider) = OAuthProviderIdentity::from_alias(&flow.provider) {
                let _ = self
                    .registry
                    .expire_device_code(&flow.device_code, &flow.target, provider);
            }
            let _ = self
                .lifecycle
                .expire_device_flow(&flow.target, &flow.device_code);
        }
        // Pass 2: re-snapshot after the removals, then restore unexpired
        // durable flows that are still absent from memory.
        let current = self.registry.snapshot_for_persistence(now_millis);
        let current_browser = current
            .browser
            .iter()
            .map(persisted_browser_snapshot_key)
            .collect::<BTreeSet<_>>();
        let current_device = current
            .device
            .iter()
            .map(persisted_device_snapshot_key)
            .collect::<BTreeSet<_>>();
        for persisted in snapshot
            .browser
            .iter()
            .filter(|flow| {
                flow.expires_at_millis > now_millis
                    && !current_browser.contains(&persisted_browser_snapshot_key(flow))
            })
            .cloned()
        {
            self.restore_browser_payload(persisted, now_millis, now_instant);
        }
        for persisted in snapshot
            .device
            .iter()
            .filter(|flow| {
                flow.expires_at_millis > now_millis
                    && !current_device.contains(&persisted_device_snapshot_key(flow))
            })
            .cloned()
        {
            self.restore_device_payload(persisted, now_millis, now_instant);
        }
        Ok(())
    }
736
737    fn restore_browser_payload(
738        &self,
739        persisted: PersistedOAuthBrowserFlow,
740        now_millis: u64,
741        now_instant: Instant,
742    ) {
743        if persisted.expires_at_millis <= now_millis {
744            return;
745        }
746        let Some(provider) = OAuthProviderIdentity::from_alias(&persisted.provider) else {
747            return;
748        };
749        let remaining = Duration::from_millis(persisted.expires_at_millis - now_millis);
750        let elapsed = self.registry.ttl().saturating_sub(remaining);
751        let created_at = now_instant.checked_sub(elapsed).unwrap_or(now_instant);
752        if self
753            .admit_browser(
754                &persisted.target,
755                &persisted.state,
756                provider,
757                &persisted.redirect_uri,
758                persisted.expires_at_millis,
759            )
760            .is_err()
761        {
762            return;
763        }
764        if self
765            .registry
766            .insert_restored_browser_flow(
767                persisted.state.clone(),
768                persisted.target.clone(),
769                provider,
770                persisted.redirect_uri.clone(),
771                persisted.pkce_verifier.clone(),
772                created_at,
773            )
774            .is_err()
775        {
776            let _ = self.expire_browser(&persisted.target, &persisted.state);
777        }
778    }
779
780    fn restore_device_payload(
781        &self,
782        persisted: PersistedOAuthDeviceFlow,
783        now_millis: u64,
784        now_instant: Instant,
785    ) {
786        if persisted.expires_at_millis <= now_millis {
787            return;
788        }
789        let Some(provider) = OAuthProviderIdentity::from_alias(&persisted.provider) else {
790            return;
791        };
792        let remaining = Duration::from_millis(persisted.expires_at_millis - now_millis);
793        let expires_at = now_instant.checked_add(remaining).unwrap_or(now_instant);
794        let elapsed = Duration::from_millis(now_millis.saturating_sub(persisted.created_at_millis));
795        let created_at = now_instant.checked_sub(elapsed).unwrap_or(now_instant);
796        if self
797            .admit_device(
798                &persisted.target,
799                &persisted.device_code,
800                provider,
801                persisted.expires_at_millis,
802            )
803            .is_err()
804        {
805            return;
806        }
807        if self
808            .registry
809            .insert_restored_device_flow(
810                persisted.target.clone(),
811                provider,
812                persisted.device_code.clone(),
813                created_at,
814                expires_at,
815            )
816            .is_err()
817        {
818            let _ = self
819                .lifecycle
820                .expire_device_flow(&persisted.target, &persisted.device_code);
821        }
822    }
823}
824
825fn persist_registry_payloads(
826    registry: &OAuthFlowRegistry,
827    store: &StoreSlot,
828    operation: &'static str,
829    removed_browser: &[BrowserSnapshotKey],
830    removed_device: &[DeviceSnapshotKey],
831) -> Result<(), OAuthFlowError> {
832    let now_millis = current_time_millis();
833    let snapshot = registry.snapshot_for_persistence(now_millis);
834    persist_registry_snapshot(
835        &snapshot,
836        store,
837        operation,
838        removed_browser,
839        removed_device,
840        now_millis,
841        SnapshotPersistPolicy::merge(),
842    )
843}
844
845fn persist_existing_registry_payloads(
846    registry: &OAuthFlowRegistry,
847    store: &StoreSlot,
848    operation: &'static str,
849) -> Result<(), OAuthFlowError> {
850    let now_millis = current_time_millis();
851    let snapshot = registry.snapshot_for_persistence(now_millis);
852    persist_registry_snapshot(
853        &snapshot,
854        store,
855        operation,
856        &[],
857        &[],
858        now_millis,
859        SnapshotPersistPolicy::merge_existing(),
860    )
861}
862
863fn persist_registry_payloads_claiming_removal(
864    registry: &OAuthFlowRegistry,
865    store: &StoreSlot,
866    operation: &'static str,
867    removed_browser: &[BrowserSnapshotKey],
868    removed_device: &[DeviceSnapshotKey],
869) -> Result<(), OAuthFlowError> {
870    let now_millis = current_time_millis();
871    let snapshot = registry.snapshot_for_persistence(now_millis);
872    persist_registry_snapshot(
873        &snapshot,
874        store,
875        operation,
876        removed_browser,
877        removed_device,
878        now_millis,
879        SnapshotPersistPolicy::claim_removal(),
880    )
881}
882
883fn persist_registry_payloads_claiming_admission(
884    registry: &OAuthFlowRegistry,
885    store: &StoreSlot,
886    operation: &'static str,
887    removed_browser: &[BrowserSnapshotKey],
888    removed_device: &[DeviceSnapshotKey],
889    admitted_browser: &[BrowserSnapshotKey],
890    admitted_device: &[DeviceSnapshotKey],
891) -> Result<(), OAuthFlowError> {
892    let now_millis = current_time_millis();
893    let snapshot = registry.snapshot_for_persistence(now_millis);
894    persist_registry_snapshot(
895        &snapshot,
896        store,
897        operation,
898        removed_browser,
899        removed_device,
900        now_millis,
901        SnapshotPersistPolicy::claim_admission(
902            registry.max_outstanding(),
903            admitted_browser,
904            admitted_device,
905        ),
906    )
907}
908
909#[derive(Clone, Copy, Debug, Eq, PartialEq)]
910enum SnapshotRemovalMode {
911    Merge,
912    Claim,
913}
914
915#[derive(Clone, Copy, Debug, Eq, PartialEq)]
916struct SnapshotPersistPolicy<'a> {
917    removal_mode: SnapshotRemovalMode,
918    admission_capacity: Option<usize>,
919    admitted_browser: &'a [BrowserSnapshotKey],
920    admitted_device: &'a [DeviceSnapshotKey],
921}
922
923impl<'a> SnapshotPersistPolicy<'a> {
924    fn merge() -> Self {
925        Self {
926            removal_mode: SnapshotRemovalMode::Merge,
927            admission_capacity: None,
928            admitted_browser: &[],
929            admitted_device: &[],
930        }
931    }
932
933    fn merge_existing() -> Self {
934        Self::merge()
935    }
936
937    fn claim_removal() -> Self {
938        Self {
939            removal_mode: SnapshotRemovalMode::Claim,
940            admission_capacity: None,
941            admitted_browser: &[],
942            admitted_device: &[],
943        }
944    }
945
946    fn claim_admission(
947        max_outstanding: usize,
948        admitted_browser: &'a [BrowserSnapshotKey],
949        admitted_device: &'a [DeviceSnapshotKey],
950    ) -> Self {
951        Self {
952            removal_mode: SnapshotRemovalMode::Merge,
953            admission_capacity: Some(max_outstanding),
954            admitted_browser,
955            admitted_device,
956        }
957    }
958}
959
960fn persist_registry_snapshot(
961    snapshot: &OAuthFlowRegistrySnapshot,
962    store: &StoreSlot,
963    operation: &'static str,
964    removed_browser: &[BrowserSnapshotKey],
965    removed_device: &[DeviceSnapshotKey],
966    now_millis: u64,
967    policy: SnapshotPersistPolicy<'_>,
968) -> Result<(), OAuthFlowError> {
969    let store = store
970        .lock()
971        .unwrap_or_else(std::sync::PoisonError::into_inner)
972        .clone();
973    let Some(store) = store else {
974        return Ok(());
975    };
976    let Some(store) = store.upgrade() else {
977        return Err(OAuthFlowError::PersistenceFailed {
978            operation,
979            detail: "runtime store is no longer available".to_string(),
980        });
981    };
982    let mut update = |current: Option<&[u8]>| -> Result<Vec<u8>, crate::store::RuntimeStoreError> {
983        let merged = merge_oauth_registry_snapshot(
984            current,
985            snapshot,
986            removed_browser,
987            removed_device,
988            now_millis,
989            policy,
990        )?;
991        serde_json::to_vec(&merged)
992            .map_err(|err| crate::store::RuntimeStoreError::WriteFailed(err.to_string()))
993    };
994    match store.update_auth_oauth_flow_snapshot(&mut update) {
995        Ok(()) => Ok(()),
996        Err(crate::store::RuntimeStoreError::NotFound(_))
997            if policy.removal_mode == SnapshotRemovalMode::Claim =>
998        {
999            Err(OAuthFlowError::RegistryProjectionMissing { operation })
1000        }
1001        Err(crate::store::RuntimeStoreError::Internal(detail))
1002            if policy.admission_capacity.is_some() && detail == DURABLE_OAUTH_CAPACITY_EXCEEDED =>
1003        {
1004            Err(OAuthFlowError::CapacityExceeded {
1005                max_outstanding: policy.admission_capacity.unwrap_or(0),
1006            })
1007        }
1008        Err(err) => Err(OAuthFlowError::PersistenceFailed {
1009            operation,
1010            detail: err.to_string(),
1011        }),
1012    }
1013}
1014
/// Identity of a browser flow inside a persisted snapshot:
/// `(realm, binding, profile, state)`.
type BrowserSnapshotKey = (String, String, Option<String>, String);
/// Identity of a device flow inside a persisted snapshot:
/// `(realm, binding, profile, device_code)`.
type DeviceSnapshotKey = (String, String, Option<String>, String);
/// Detail text used to signal that a merged snapshot exceeds the allowed
/// number of outstanding flows.
const DURABLE_OAUTH_CAPACITY_EXCEEDED: &str = "oauth durable capacity exceeded";
1018
1019fn target_snapshot_key(target: &AuthBindingRef) -> (String, String, Option<String>) {
1020    (
1021        target.realm.to_string(),
1022        target.binding.to_string(),
1023        target.profile.as_ref().map(ToString::to_string),
1024    )
1025}
1026
1027fn browser_snapshot_key(target: &AuthBindingRef, state: &str) -> BrowserSnapshotKey {
1028    let (realm, binding, profile) = target_snapshot_key(target);
1029    (realm, binding, profile, state.to_string())
1030}
1031
1032fn device_snapshot_key(target: &AuthBindingRef, device_code: &str) -> DeviceSnapshotKey {
1033    let (realm, binding, profile) = target_snapshot_key(target);
1034    (realm, binding, profile, device_code.to_string())
1035}
1036
1037fn persisted_browser_snapshot_key(flow: &PersistedOAuthBrowserFlow) -> BrowserSnapshotKey {
1038    browser_snapshot_key(&flow.target, &flow.state)
1039}
1040
1041fn persisted_device_snapshot_key(flow: &PersistedOAuthDeviceFlow) -> DeviceSnapshotKey {
1042    device_snapshot_key(&flow.target, &flow.device_code)
1043}
1044
1045fn ensure_removed_flows_are_active(
1046    current: &OAuthFlowRegistrySnapshot,
1047    removed_browser: &BTreeSet<BrowserSnapshotKey>,
1048    removed_device: &BTreeSet<DeviceSnapshotKey>,
1049    now_millis: u64,
1050) -> Result<(), crate::store::RuntimeStoreError> {
1051    let active_browser = current
1052        .browser
1053        .iter()
1054        .filter(|flow| flow.expires_at_millis > now_millis)
1055        .map(persisted_browser_snapshot_key)
1056        .collect::<BTreeSet<_>>();
1057    for key in removed_browser {
1058        if !active_browser.contains(key) {
1059            return Err(crate::store::RuntimeStoreError::NotFound(
1060                "oauth browser flow was already consumed".to_string(),
1061            ));
1062        }
1063    }
1064
1065    let active_device = current
1066        .device
1067        .iter()
1068        .filter(|flow| flow.expires_at_millis > now_millis)
1069        .map(persisted_device_snapshot_key)
1070        .collect::<BTreeSet<_>>();
1071    for key in removed_device {
1072        if !active_device.contains(key) {
1073            return Err(crate::store::RuntimeStoreError::NotFound(
1074                "oauth device flow was already consumed".to_string(),
1075            ));
1076        }
1077    }
1078    Ok(())
1079}
1080
1081fn ensure_merged_snapshot_within_capacity(
1082    merged: &OAuthFlowRegistrySnapshot,
1083    max_outstanding: usize,
1084) -> Result<(), crate::store::RuntimeStoreError> {
1085    if merged.browser.len().saturating_add(merged.device.len()) > max_outstanding {
1086        return Err(crate::store::RuntimeStoreError::Internal(
1087            DURABLE_OAUTH_CAPACITY_EXCEEDED.to_string(),
1088        ));
1089    }
1090    Ok(())
1091}
1092
1093fn merge_oauth_registry_snapshot(
1094    current: Option<&[u8]>,
1095    local: &OAuthFlowRegistrySnapshot,
1096    removed_browser: &[BrowserSnapshotKey],
1097    removed_device: &[DeviceSnapshotKey],
1098    now_millis: u64,
1099    policy: SnapshotPersistPolicy<'_>,
1100) -> Result<OAuthFlowRegistrySnapshot, crate::store::RuntimeStoreError> {
1101    let mut merged = match current {
1102        Some(bytes) => serde_json::from_slice::<OAuthFlowRegistrySnapshot>(bytes)
1103            .map_err(|err| crate::store::RuntimeStoreError::WriteFailed(err.to_string()))?,
1104        None => OAuthFlowRegistrySnapshot::default(),
1105    };
1106    let removed_browser = removed_browser.iter().cloned().collect::<BTreeSet<_>>();
1107    let removed_device = removed_device.iter().cloned().collect::<BTreeSet<_>>();
1108    let admitted_browser = policy
1109        .admitted_browser
1110        .iter()
1111        .cloned()
1112        .collect::<BTreeSet<_>>();
1113    let admitted_device = policy
1114        .admitted_device
1115        .iter()
1116        .cloned()
1117        .collect::<BTreeSet<_>>();
1118    if policy.removal_mode == SnapshotRemovalMode::Claim {
1119        ensure_removed_flows_are_active(&merged, &removed_browser, &removed_device, now_millis)?;
1120    }
1121    let current_browser = merged
1122        .browser
1123        .iter()
1124        .filter(|flow| flow.expires_at_millis > now_millis)
1125        .map(persisted_browser_snapshot_key)
1126        .collect::<BTreeSet<_>>();
1127    let current_device = merged
1128        .device
1129        .iter()
1130        .filter(|flow| flow.expires_at_millis > now_millis)
1131        .map(persisted_device_snapshot_key)
1132        .collect::<BTreeSet<_>>();
1133    let local_browser = local
1134        .browser
1135        .iter()
1136        .map(persisted_browser_snapshot_key)
1137        .collect::<BTreeSet<_>>();
1138    let local_device = local
1139        .device
1140        .iter()
1141        .map(persisted_device_snapshot_key)
1142        .collect::<BTreeSet<_>>();
1143
1144    merged.browser.retain(|flow| {
1145        flow.expires_at_millis > now_millis
1146            && !removed_browser.contains(&persisted_browser_snapshot_key(flow))
1147            && !local_browser.contains(&persisted_browser_snapshot_key(flow))
1148    });
1149    merged.device.retain(|flow| {
1150        flow.expires_at_millis > now_millis
1151            && !removed_device.contains(&persisted_device_snapshot_key(flow))
1152            && !local_device.contains(&persisted_device_snapshot_key(flow))
1153    });
1154    merged.browser.extend(
1155        local
1156            .browser
1157            .iter()
1158            .filter(|flow| {
1159                let key = persisted_browser_snapshot_key(flow);
1160                flow.expires_at_millis > now_millis
1161                    && !removed_browser.contains(&key)
1162                    && (current_browser.contains(&key) || admitted_browser.contains(&key))
1163            })
1164            .cloned(),
1165    );
1166    merged.device.extend(
1167        local
1168            .device
1169            .iter()
1170            .filter(|flow| {
1171                let key = persisted_device_snapshot_key(flow);
1172                flow.expires_at_millis > now_millis
1173                    && !removed_device.contains(&key)
1174                    && (current_device.contains(&key) || admitted_device.contains(&key))
1175            })
1176            .cloned(),
1177    );
1178    merged.browser.sort_by_key(persisted_browser_snapshot_key);
1179    merged.device.sort_by_key(persisted_device_snapshot_key);
1180    if let Some(max_outstanding) = policy.admission_capacity {
1181        ensure_merged_snapshot_within_capacity(&merged, max_outstanding)?;
1182    }
1183    Ok(merged)
1184}
1185
1186struct RuntimeOAuthDevicePollLifecycle {
1187    lifecycle: Arc<RuntimeAuthLeaseHandle>,
1188    registry: Arc<OAuthFlowRegistry>,
1189    store: StoreSlot,
1190}
1191
1192impl Default for RuntimeOAuthFlowHandle {
1193    fn default() -> Self {
1194        Self::new(Duration::from_secs(10 * 60))
1195    }
1196}
1197
1198impl OAuthDevicePollLifecycle for RuntimeAuthLeaseHandle {
1199    fn device_flow_state_is_authmachine_owned(&self) -> bool {
1200        true
1201    }
1202
1203    fn finish_device_poll(
1204        &self,
1205        target: &AuthBindingRef,
1206        device_code: &str,
1207    ) -> Result<(), OAuthFlowError> {
1208        self.apply_oauth_input(
1209            target,
1210            auth_dsl::AuthMachineInput::FinishOAuthDevicePoll {
1211                flow_id: device_code.to_string(),
1212            },
1213            "finish_oauth_device_poll",
1214            false,
1215        )
1216        .map_err(|err| OAuthFlowError::LifecycleRejected {
1217            operation: "finish_oauth_device_poll",
1218            detail: err.to_string(),
1219        })
1220    }
1221
1222    fn consume_device_flow(
1223        &self,
1224        target: &AuthBindingRef,
1225        device_code: &str,
1226        provider: OAuthProviderIdentity,
1227    ) -> Result<(), OAuthFlowError> {
1228        self.apply_oauth_input(
1229            target,
1230            auth_dsl::AuthMachineInput::ConsumeOAuthDeviceFlow {
1231                flow_id: device_code.to_string(),
1232                provider: provider.canonical_alias().to_string(),
1233                now_millis: current_time_millis(),
1234            },
1235            "consume_oauth_device_flow",
1236            false,
1237        )
1238        .map_err(|err| OAuthFlowError::LifecycleRejected {
1239            operation: "consume_oauth_device_flow",
1240            detail: err.to_string(),
1241        })
1242    }
1243
1244    fn expire_device_flow(
1245        &self,
1246        target: &AuthBindingRef,
1247        device_code: &str,
1248    ) -> Result<(), OAuthFlowError> {
1249        self.apply_oauth_input(
1250            target,
1251            auth_dsl::AuthMachineInput::ExpireOAuthDeviceFlow {
1252                flow_id: device_code.to_string(),
1253            },
1254            "expire_oauth_device_flow",
1255            false,
1256        )
1257        .map_err(|err| OAuthFlowError::LifecycleRejected {
1258            operation: "expire_oauth_device_flow",
1259            detail: err.to_string(),
1260        })
1261    }
1262
1263    fn restore_device_flow(&self, record: &OAuthDeviceFlowRecord) -> Result<(), OAuthFlowError> {
1264        let remaining = record
1265            .expires_at
1266            .checked_duration_since(Instant::now())
1267            .ok_or(OAuthFlowError::Missing)?;
1268        let expires_at_millis = expires_at_millis(remaining)?;
1269        self.apply_oauth_input(
1270            &record.target,
1271            auth_dsl::AuthMachineInput::AdmitOAuthDeviceFlow {
1272                flow_id: record.device_code.clone(),
1273                provider: record.provider.canonical_alias().to_string(),
1274                expires_at_millis,
1275                max_outstanding_flows: u64::MAX,
1276            },
1277            "restore_oauth_device_flow",
1278            true,
1279        )
1280        .map_err(|err| OAuthFlowError::LifecycleRejected {
1281            operation: "restore_oauth_device_flow",
1282            detail: err.to_string(),
1283        })
1284    }
1285}
1286
1287impl OAuthDevicePollLifecycle for RuntimeOAuthDevicePollLifecycle {
1288    fn device_flow_state_is_authmachine_owned(&self) -> bool {
1289        true
1290    }
1291
1292    fn finish_device_poll(
1293        &self,
1294        target: &AuthBindingRef,
1295        device_code: &str,
1296    ) -> Result<(), OAuthFlowError> {
1297        self.lifecycle.finish_device_poll(target, device_code)
1298    }
1299
1300    fn consume_device_flow(
1301        &self,
1302        target: &AuthBindingRef,
1303        device_code: &str,
1304        provider: OAuthProviderIdentity,
1305    ) -> Result<(), OAuthFlowError> {
1306        self.lifecycle
1307            .consume_device_flow(target, device_code, provider)
1308    }
1309
1310    fn expire_device_flow(
1311        &self,
1312        target: &AuthBindingRef,
1313        device_code: &str,
1314    ) -> Result<(), OAuthFlowError> {
1315        self.lifecycle.expire_device_flow(target, device_code)
1316    }
1317
1318    fn restore_device_flow(&self, record: &OAuthDeviceFlowRecord) -> Result<(), OAuthFlowError> {
1319        let remaining = record
1320            .expires_at
1321            .checked_duration_since(Instant::now())
1322            .ok_or(OAuthFlowError::Missing)?;
1323        let expires_at_millis = expires_at_millis(remaining)?;
1324        self.lifecycle
1325            .apply_oauth_input(
1326                &record.target,
1327                auth_dsl::AuthMachineInput::AdmitOAuthDeviceFlow {
1328                    flow_id: record.device_code.clone(),
1329                    provider: record.provider.canonical_alias().to_string(),
1330                    expires_at_millis,
1331                    max_outstanding_flows: self.registry.max_outstanding() as u64,
1332                },
1333                "restore_oauth_device_flow",
1334                true,
1335            )
1336            .map_err(|err| OAuthFlowError::LifecycleRejected {
1337                operation: "restore_oauth_device_flow",
1338                detail: err.to_string(),
1339            })
1340    }
1341
1342    fn device_flow_payloads_changed(&self) -> Result<(), OAuthFlowError> {
1343        persist_existing_registry_payloads(
1344            &self.registry,
1345            &self.store,
1346            "persist_oauth_device_flow_payloads",
1347        )
1348    }
1349
1350    fn device_flow_payload_removed(
1351        &self,
1352        record: &OAuthDeviceFlowRecord,
1353    ) -> Result<(), OAuthFlowError> {
1354        let removed = [device_snapshot_key(&record.target, &record.device_code)];
1355        persist_registry_payloads_claiming_removal(
1356            &self.registry,
1357            &self.store,
1358            "consume_oauth_device_flow",
1359            &[],
1360            &removed,
1361        )
1362    }
1363}
1364
1365impl OAuthFlowAuthority for RuntimeOAuthFlowHandle {
1366    fn terminal_flow_state_is_authmachine_owned(&self) -> bool {
1367        true
1368    }
1369
1370    fn start(
1371        &self,
1372        target: AuthBindingRef,
1373        provider: OAuthProviderIdentity,
1374        redirect_uri: String,
1375        pkce_verifier: String,
1376    ) -> Result<String, OAuthFlowError> {
1377        let _payload_guard = self
1378            .payload_lock
1379            .lock()
1380            .unwrap_or_else(std::sync::PoisonError::into_inner);
1381        self.sync_persisted_payloads("admit_oauth_browser_flow")?;
1382        self.expire_pruned_flows();
1383        let state = OAuthFlowRegistry::new_state()?;
1384        let expires_at = expires_at_millis(self.registry.ttl())?;
1385        self.admit_browser(&target, &state, provider, &redirect_uri, expires_at)?;
1386        let mut inserted = self.registry.insert_browser_flow_with_pruned(
1387            state.clone(),
1388            target.clone(),
1389            provider,
1390            redirect_uri.clone(),
1391            pkce_verifier.clone(),
1392        );
1393        let mut lifecycle_pruned = OAuthPrunedFlows::default();
1394        let mut lifecycle_pruned_snapshot = None;
1395        if matches!(inserted, Err(OAuthFlowError::CapacityExceeded { .. })) {
1396            let (pruned, snapshot) = self.retain_registry_payloads_with_lifecycle();
1397            lifecycle_pruned = pruned;
1398            lifecycle_pruned_snapshot = Some(snapshot);
1399            inserted = self.registry.insert_browser_flow_with_pruned(
1400                state.clone(),
1401                target.clone(),
1402                provider,
1403                redirect_uri.clone(),
1404                pkce_verifier,
1405            );
1406        }
1407        let pruned = match inserted {
1408            Ok(pruned) => pruned,
1409            Err(err) => {
1410                let _ = self.expire_browser(&target, &state);
1411                return Err(err);
1412            }
1413        };
1414        let (removed_browser, removed_device) =
1415            if let Some(snapshot) = lifecycle_pruned_snapshot.as_ref() {
1416                Self::removed_snapshot_keys_from_pruned(snapshot, &lifecycle_pruned)
1417            } else {
1418                (Vec::new(), Vec::new())
1419            };
1420        self.expire_collected_flows(pruned);
1421        let admitted_browser = [browser_snapshot_key(&target, &state)];
1422        if let Err(err) = self.persist_registry_payloads_claiming_admission(
1423            "admit_oauth_browser_flow",
1424            &removed_browser,
1425            &removed_device,
1426            &admitted_browser,
1427            &[],
1428        ) {
1429            let _ = self
1430                .registry
1431                .consume(&state, &target, provider, &redirect_uri);
1432            let _ = self.expire_browser(&target, &state);
1433            return Err(err);
1434        }
1435        Ok(state)
1436    }
1437
1438    fn verify(
1439        &self,
1440        state: &str,
1441        target: &AuthBindingRef,
1442        provider: OAuthProviderIdentity,
1443        redirect_uri: &str,
1444    ) -> Result<OAuthFlowRecord, OAuthFlowError> {
1445        let _payload_guard = self
1446            .payload_lock
1447            .lock()
1448            .unwrap_or_else(std::sync::PoisonError::into_inner);
1449        self.sync_persisted_payloads("verify_oauth_browser_flow")?;
1450        self.expire_pruned_flows();
1451        self.verify_browser(target, state, provider, redirect_uri)?;
1452        match self.registry.verify(state, target, provider, redirect_uri) {
1453            Ok(record) => Ok(record),
1454            Err(OAuthFlowError::Missing) => Err(OAuthFlowError::RegistryProjectionMissing {
1455                operation: "verify_oauth_browser_flow",
1456            }),
1457            Err(err) => Err(err),
1458        }
1459    }
1460
1461    fn consume(
1462        &self,
1463        state: &str,
1464        target: &AuthBindingRef,
1465        provider: OAuthProviderIdentity,
1466        redirect_uri: &str,
1467    ) -> Result<OAuthFlowRecord, OAuthFlowError> {
1468        let _payload_guard = self
1469            .payload_lock
1470            .lock()
1471            .unwrap_or_else(std::sync::PoisonError::into_inner);
1472        self.sync_persisted_payloads("consume_oauth_browser_flow")?;
1473        self.expire_pruned_flows();
1474        self.verify_browser(target, state, provider, redirect_uri)?;
1475        let record = match self.registry.verify(state, target, provider, redirect_uri) {
1476            Ok(record) => record,
1477            Err(OAuthFlowError::Missing) => {
1478                return Err(OAuthFlowError::RegistryProjectionMissing {
1479                    operation: "consume_oauth_browser_flow",
1480                });
1481            }
1482            Err(err) => return Err(err),
1483        };
1484        self.consume_browser(target, state, provider, redirect_uri)?;
1485        if let Err(err) = self.registry.consume(state, target, provider, redirect_uri) {
1486            let _ = self.restore_browser_flow(state, &record);
1487            return Err(match err {
1488                OAuthFlowError::Missing => OAuthFlowError::RegistryProjectionMissing {
1489                    operation: "consume_oauth_browser_flow",
1490                },
1491                other => other,
1492            });
1493        }
1494        let removed_browser = [browser_snapshot_key(target, state)];
1495        if let Err(err) = self.persist_registry_payloads_claiming_removal(
1496            "consume_oauth_browser_flow",
1497            &removed_browser,
1498            &[],
1499        ) {
1500            if matches!(
1501                err,
1502                OAuthFlowError::Missing | OAuthFlowError::RegistryProjectionMissing { .. }
1503            ) {
1504                return Err(err);
1505            }
1506            let _ = self.restore_browser_flow(state, &record);
1507            return Err(err);
1508        }
1509        Ok(record)
1510    }
1511
1512    fn admit_device_code(
1513        &self,
1514        target: AuthBindingRef,
1515        provider: OAuthProviderIdentity,
1516        device_code: String,
1517        expires_in: Duration,
1518    ) -> Result<(), OAuthFlowError> {
1519        let _payload_guard = self
1520            .payload_lock
1521            .lock()
1522            .unwrap_or_else(std::sync::PoisonError::into_inner);
1523        self.sync_persisted_payloads("admit_oauth_device_flow")?;
1524        self.expire_pruned_flows();
1525        let machine_expires_at = expires_at_millis(expires_in)?;
1526        self.admit_device(&target, &device_code, provider, machine_expires_at)?;
1527        let mut inserted = self.registry.admit_device_code_with_pruned(
1528            target.clone(),
1529            provider,
1530            device_code.clone(),
1531            expires_in,
1532        );
1533        let mut lifecycle_pruned = OAuthPrunedFlows::default();
1534        let mut lifecycle_pruned_snapshot = None;
1535        if matches!(inserted, Err(OAuthFlowError::CapacityExceeded { .. })) {
1536            let (pruned, snapshot) = self.retain_registry_payloads_with_lifecycle();
1537            lifecycle_pruned = pruned;
1538            lifecycle_pruned_snapshot = Some(snapshot);
1539            inserted = self.registry.admit_device_code_with_pruned(
1540                target.clone(),
1541                provider,
1542                device_code.clone(),
1543                expires_in,
1544            );
1545        }
1546        let pruned = match inserted {
1547            Ok(pruned) => pruned,
1548            Err(err) => {
1549                let _ = self.lifecycle.expire_device_flow(&target, &device_code);
1550                return Err(err);
1551            }
1552        };
1553        let (removed_browser, removed_device) =
1554            if let Some(snapshot) = lifecycle_pruned_snapshot.as_ref() {
1555                Self::removed_snapshot_keys_from_pruned(snapshot, &lifecycle_pruned)
1556            } else {
1557                (Vec::new(), Vec::new())
1558            };
1559        self.expire_collected_flows(pruned);
1560        let admitted_device = [device_snapshot_key(&target, &device_code)];
1561        if let Err(err) = self.persist_registry_payloads_claiming_admission(
1562            "admit_oauth_device_flow",
1563            &removed_browser,
1564            &removed_device,
1565            &[],
1566            &admitted_device,
1567        ) {
1568            let _ = self
1569                .registry
1570                .expire_device_code(&device_code, &target, provider);
1571            let _ = self.lifecycle.expire_device_flow(&target, &device_code);
1572            return Err(err);
1573        }
1574        Ok(())
1575    }
1576
1577    fn verify_device_code(
1578        &self,
1579        device_code: &str,
1580        target: &AuthBindingRef,
1581        provider: OAuthProviderIdentity,
1582    ) -> Result<OAuthDeviceFlowRecord, OAuthFlowError> {
1583        let _payload_guard = self
1584            .payload_lock
1585            .lock()
1586            .unwrap_or_else(std::sync::PoisonError::into_inner);
1587        self.sync_persisted_payloads("verify_oauth_device_flow")?;
1588        self.expire_pruned_flows();
1589        self.verify_device(target, device_code, provider)?;
1590        match self
1591            .registry
1592            .verify_device_code(device_code, target, provider)
1593        {
1594            Ok(record) => Ok(record),
1595            Err(OAuthFlowError::Missing) => Err(OAuthFlowError::RegistryProjectionMissing {
1596                operation: "verify_oauth_device_flow",
1597            }),
1598            Err(err) => Err(err),
1599        }
1600    }
1601
1602    fn begin_device_code_poll(
1603        &self,
1604        device_code: &str,
1605        target: &AuthBindingRef,
1606        provider: OAuthProviderIdentity,
1607    ) -> Result<OAuthDevicePollLease, OAuthFlowError> {
1608        let _payload_guard = self
1609            .payload_lock
1610            .lock()
1611            .unwrap_or_else(std::sync::PoisonError::into_inner);
1612        self.sync_persisted_payloads("begin_oauth_device_poll")?;
1613        self.expire_pruned_flows();
1614        self.begin_device_poll(target, device_code, provider)?;
1615        let poll = match self
1616            .registry
1617            .begin_device_code_poll(device_code, target, provider)
1618        {
1619            Ok(poll) => poll,
1620            Err(OAuthFlowError::Missing) => {
1621                let _ = self.lifecycle.finish_device_poll(target, device_code);
1622                return Err(OAuthFlowError::RegistryProjectionMissing {
1623                    operation: "begin_oauth_device_poll",
1624                });
1625            }
1626            Err(err) => {
1627                let _ = self.lifecycle.finish_device_poll(target, device_code);
1628                return Err(err);
1629            }
1630        };
1631        let lifecycle: Arc<dyn OAuthDevicePollLifecycle> =
1632            Arc::new(RuntimeOAuthDevicePollLifecycle {
1633                lifecycle: Arc::clone(&self.lifecycle),
1634                registry: Arc::clone(&self.registry),
1635                store: Arc::clone(&self.store),
1636            });
1637        Ok(poll
1638            .with_lifecycle(lifecycle)
1639            .with_operation_lock(Arc::clone(&self.payload_lock)))
1640    }
1641}
1642
1643#[cfg(test)]
1644mod tests {
1645    use std::sync::{
1646        Arc, Condvar, Mutex as StdMutex,
1647        atomic::{AtomicBool, Ordering},
1648    };
1649
1650    use meerkat_core::handles::{AuthLeaseHandle, AuthLeasePhase, LeaseKey};
1651    use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
1652    use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
1653    use meerkat_core::types::SessionId;
1654
1655    use super::*;
1656    use crate::identifiers::LogicalRuntimeId;
1657    use crate::input_state::StoredInputState;
1658    use crate::runtime_state::RuntimeState;
1659    use crate::store::{RuntimeStore, RuntimeStoreError, SessionDelta};
1660
1661    fn target() -> AuthBindingRef {
1662        AuthBindingRef {
1663            realm: meerkat_core::RealmId::parse("dev").expect("valid realm"),
1664            binding: meerkat_core::BindingId::parse("default_openai").expect("valid binding"),
1665            profile: None,
1666        }
1667    }
1668
1669    fn alternate_target() -> AuthBindingRef {
1670        AuthBindingRef {
1671            realm: meerkat_core::RealmId::parse("dev").expect("valid realm"),
1672            binding: meerkat_core::BindingId::parse("secondary_openai").expect("valid binding"),
1673            profile: None,
1674        }
1675    }
1676
1677    #[derive(Debug, Default)]
1678    struct BlockingOAuthPersistState {
1679        armed: bool,
1680        blocked: bool,
1681        released: bool,
1682    }
1683
1684    #[derive(Debug, Default)]
1685    struct FailingOAuthSnapshotStore {
1686        snapshot: StdMutex<Option<Vec<u8>>>,
1687        fail_oauth_persist: AtomicBool,
1688        blocking_oauth_persist: StdMutex<BlockingOAuthPersistState>,
1689        blocking_oauth_persist_cv: Condvar,
1690    }
1691
1692    impl FailingOAuthSnapshotStore {
1693        fn block_next_oauth_persist(&self) {
1694            let mut state = self
1695                .blocking_oauth_persist
1696                .lock()
1697                .expect("blocking persist state lock");
1698            state.armed = true;
1699            state.blocked = false;
1700            state.released = false;
1701        }
1702
1703        fn wait_for_blocked_oauth_persist(&self) {
1704            let mut state = self
1705                .blocking_oauth_persist
1706                .lock()
1707                .expect("blocking persist state lock");
1708            while !state.blocked {
1709                let (next, timeout) = self
1710                    .blocking_oauth_persist_cv
1711                    .wait_timeout(state, Duration::from_secs(1))
1712                    .expect("blocking persist state wait");
1713                assert!(
1714                    !timeout.timed_out(),
1715                    "expected OAuth snapshot persist to block"
1716                );
1717                state = next;
1718            }
1719        }
1720
1721        fn release_blocked_oauth_persist(&self) {
1722            let mut state = self
1723                .blocking_oauth_persist
1724                .lock()
1725                .expect("blocking persist state lock");
1726            state.released = true;
1727            self.blocking_oauth_persist_cv.notify_all();
1728        }
1729
1730        fn wait_if_oauth_persist_blocked(&self) {
1731            let mut state = self
1732                .blocking_oauth_persist
1733                .lock()
1734                .expect("blocking persist state lock");
1735            if !state.armed {
1736                return;
1737            }
1738            state.armed = false;
1739            state.blocked = true;
1740            self.blocking_oauth_persist_cv.notify_all();
1741            while !state.released {
1742                state = self
1743                    .blocking_oauth_persist_cv
1744                    .wait(state)
1745                    .expect("blocking persist state wait");
1746            }
1747        }
1748
1749        fn fail_oauth_persist(&self) {
1750            self.fail_oauth_persist.store(true, Ordering::SeqCst);
1751        }
1752
1753        fn allow_oauth_persist(&self) {
1754            self.fail_oauth_persist.store(false, Ordering::SeqCst);
1755        }
1756    }
1757
1758    #[async_trait::async_trait]
1759    impl RuntimeStore for FailingOAuthSnapshotStore {
1760        fn persist_auth_oauth_flow_snapshot(
1761            &self,
1762            snapshot_json: &[u8],
1763        ) -> Result<(), RuntimeStoreError> {
1764            self.wait_if_oauth_persist_blocked();
1765            if self.fail_oauth_persist.load(Ordering::SeqCst) {
1766                return Err(RuntimeStoreError::WriteFailed(
1767                    "injected oauth snapshot failure".to_string(),
1768                ));
1769            }
1770            *self
1771                .snapshot
1772                .lock()
1773                .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))? =
1774                Some(snapshot_json.to_vec());
1775            Ok(())
1776        }
1777
1778        fn load_auth_oauth_flow_snapshot(&self) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
1779            self.snapshot
1780                .lock()
1781                .map(|snapshot| snapshot.clone())
1782                .map_err(|err| RuntimeStoreError::ReadFailed(err.to_string()))
1783        }
1784
1785        fn update_auth_oauth_flow_snapshot(
1786            &self,
1787            update: &mut crate::store::AuthOAuthFlowSnapshotUpdate<'_>,
1788        ) -> Result<(), RuntimeStoreError> {
1789            self.wait_if_oauth_persist_blocked();
1790            if self.fail_oauth_persist.load(Ordering::SeqCst) {
1791                return Err(RuntimeStoreError::WriteFailed(
1792                    "injected oauth snapshot failure".to_string(),
1793                ));
1794            }
1795            let mut snapshot = self
1796                .snapshot
1797                .lock()
1798                .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
1799            let next = update(snapshot.as_deref())?;
1800            *snapshot = Some(next);
1801            Ok(())
1802        }
1803
1804        async fn commit_session_snapshot(
1805            &self,
1806            _runtime_id: &LogicalRuntimeId,
1807            _session_delta: SessionDelta,
1808        ) -> Result<(), RuntimeStoreError> {
1809            Err(RuntimeStoreError::Unsupported(
1810                "commit_session_snapshot".to_string(),
1811            ))
1812        }
1813
1814        async fn atomic_apply(
1815            &self,
1816            _runtime_id: &LogicalRuntimeId,
1817            _session_delta: Option<SessionDelta>,
1818            _receipt: RunBoundaryReceipt,
1819            _input_updates: Vec<StoredInputState>,
1820            _session_store_key: Option<SessionId>,
1821        ) -> Result<(), RuntimeStoreError> {
1822            Err(RuntimeStoreError::Unsupported("atomic_apply".to_string()))
1823        }
1824
1825        async fn load_input_states(
1826            &self,
1827            _runtime_id: &LogicalRuntimeId,
1828        ) -> Result<Vec<StoredInputState>, RuntimeStoreError> {
1829            Err(RuntimeStoreError::Unsupported(
1830                "load_input_states".to_string(),
1831            ))
1832        }
1833
1834        async fn load_boundary_receipt(
1835            &self,
1836            _runtime_id: &LogicalRuntimeId,
1837            _run_id: &RunId,
1838            _sequence: u64,
1839        ) -> Result<Option<RunBoundaryReceipt>, RuntimeStoreError> {
1840            Err(RuntimeStoreError::Unsupported(
1841                "load_boundary_receipt".to_string(),
1842            ))
1843        }
1844
1845        async fn load_session_snapshot(
1846            &self,
1847            _runtime_id: &LogicalRuntimeId,
1848        ) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
1849            Err(RuntimeStoreError::Unsupported(
1850                "load_session_snapshot".to_string(),
1851            ))
1852        }
1853
1854        async fn persist_input_state(
1855            &self,
1856            _runtime_id: &LogicalRuntimeId,
1857            _state: &StoredInputState,
1858        ) -> Result<(), RuntimeStoreError> {
1859            Err(RuntimeStoreError::Unsupported(
1860                "persist_input_state".to_string(),
1861            ))
1862        }
1863
1864        async fn load_input_state(
1865            &self,
1866            _runtime_id: &LogicalRuntimeId,
1867            _input_id: &InputId,
1868        ) -> Result<Option<StoredInputState>, RuntimeStoreError> {
1869            Err(RuntimeStoreError::Unsupported(
1870                "load_input_state".to_string(),
1871            ))
1872        }
1873
1874        async fn load_runtime_state(
1875            &self,
1876            _runtime_id: &LogicalRuntimeId,
1877        ) -> Result<Option<RuntimeState>, RuntimeStoreError> {
1878            Err(RuntimeStoreError::Unsupported(
1879                "load_runtime_state".to_string(),
1880            ))
1881        }
1882
1883        async fn commit_machine_lifecycle(
1884            &self,
1885            _runtime_id: &LogicalRuntimeId,
1886            _commit: crate::store::MachineLifecycleCommit,
1887            _input_states: &[StoredInputState],
1888        ) -> Result<(), RuntimeStoreError> {
1889            Err(RuntimeStoreError::Unsupported(
1890                "commit_machine_lifecycle".to_string(),
1891            ))
1892        }
1893    }
1894
1895    fn snapshot_phase(
1896        lifecycle: &RuntimeAuthLeaseHandle,
1897        target: &AuthBindingRef,
1898    ) -> Option<AuthLeasePhase> {
1899        lifecycle
1900            .snapshot(&LeaseKey::from_auth_binding(target))
1901            .phase
1902    }
1903
1904    #[test]
1905    fn browser_flow_only_machine_stays_reauth_required_until_credentials_commit() {
1906        let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
1907        let authority =
1908            RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_secs(60), lifecycle.clone());
1909        let target = target();
1910        let provider = OAuthProviderIdentity::OpenAiChatGpt;
1911        let redirect_uri = "http://127.0.0.1/callback";
1912
1913        let state = authority
1914            .start(
1915                target.clone(),
1916                provider,
1917                redirect_uri.to_string(),
1918                "verifier".to_string(),
1919            )
1920            .expect("browser flow admitted");
1921        assert_eq!(
1922            snapshot_phase(&lifecycle, &target),
1923            Some(AuthLeasePhase::ReauthRequired)
1924        );
1925
1926        authority
1927            .verify(&state, &target, provider, redirect_uri)
1928            .expect("browser flow verifies");
1929        assert_eq!(
1930            snapshot_phase(&lifecycle, &target),
1931            Some(AuthLeasePhase::ReauthRequired)
1932        );
1933
1934        authority
1935            .consume(&state, &target, provider, redirect_uri)
1936            .expect("browser flow consumes");
1937        assert_eq!(
1938            snapshot_phase(&lifecycle, &target),
1939            Some(AuthLeasePhase::ReauthRequired)
1940        );
1941    }
1942
1943    #[test]
1944    fn missing_browser_projection_cannot_overwrite_authmachine_flow() {
1945        let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
1946        let authority =
1947            RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_secs(60), lifecycle.clone());
1948        let target = target();
1949        let provider = OAuthProviderIdentity::OpenAiChatGpt;
1950        let redirect_uri = "http://127.0.0.1/callback";
1951        let state = authority
1952            .start(
1953                target.clone(),
1954                provider,
1955                redirect_uri.to_string(),
1956                "verifier".to_string(),
1957            )
1958            .expect("browser flow admitted");
1959
1960        authority
1961            .registry
1962            .consume(&state, &target, provider, redirect_uri)
1963            .expect("test removes only the local registry payload");
1964
1965        assert!(matches!(
1966            authority.verify(&state, &target, provider, redirect_uri),
1967            Err(OAuthFlowError::RegistryProjectionMissing {
1968                operation: "verify_oauth_browser_flow"
1969            })
1970        ));
1971        assert!(
1972            lifecycle.has_oauth_browser_flow_for_test(&target, &state),
1973            "missing process-local registry payload must not expire canonical AuthMachine membership"
1974        );
1975
1976        assert!(matches!(
1977            authority.consume(&state, &target, provider, redirect_uri),
1978            Err(OAuthFlowError::RegistryProjectionMissing {
1979                operation: "consume_oauth_browser_flow"
1980            })
1981        ));
1982        assert!(
1983            lifecycle.has_oauth_browser_flow_for_test(&target, &state),
1984            "terminal consume must fail closed instead of converting payload loss to not-found"
1985        );
1986        assert_eq!(
1987            snapshot_phase(&lifecycle, &target),
1988            Some(AuthLeasePhase::ReauthRequired)
1989        );
1990    }
1991
1992    #[test]
1993    fn missing_device_poll_projection_cannot_overwrite_authmachine_flow() {
1994        let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
1995        let authority =
1996            RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_secs(60), lifecycle.clone());
1997        let target = target();
1998        let provider = OAuthProviderIdentity::GoogleCodeAssist;
1999        let device_code = "provider-device-code";
2000
2001        authority
2002            .admit_device_code(
2003                target.clone(),
2004                provider,
2005                device_code.to_string(),
2006                Duration::from_secs(60),
2007            )
2008            .expect("device flow admitted");
2009        let poll = authority
2010            .begin_device_code_poll(device_code, &target, provider)
2011            .expect("device poll begins");
2012        authority
2013            .registry
2014            .expire_device_code(device_code, &target, provider)
2015            .expect("test removes only the local registry payload");
2016
2017        assert!(matches!(
2018            poll.consume(),
2019            Err(OAuthFlowError::RegistryProjectionMissing {
2020                operation: "consume_oauth_device_flow"
2021            })
2022        ));
2023        assert!(
2024            lifecycle.has_oauth_device_flow_for_test(&target, device_code),
2025            "missing process-local poll payload must not expire canonical AuthMachine membership"
2026        );
2027    }
2028
2029    #[test]
2030    fn browser_admit_persistence_failure_rolls_back_unreturned_flow() {
2031        let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
2032        let store = Arc::new(FailingOAuthSnapshotStore::default());
2033        let store_dyn = Arc::clone(&store) as Arc<dyn RuntimeStore>;
2034        let authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
2035            Duration::from_secs(60),
2036            lifecycle.clone(),
2037            &store_dyn,
2038        );
2039        let failed_target = target();
2040        let successful_target = alternate_target();
2041        let provider = OAuthProviderIdentity::OpenAiChatGpt;
2042        let redirect_uri = "http://127.0.0.1/callback";
2043
2044        store.fail_oauth_persist();
2045        assert!(matches!(
2046            authority.start(
2047                failed_target.clone(),
2048                provider,
2049                redirect_uri.to_string(),
2050                "failed-verifier".to_string(),
2051            ),
2052            Err(OAuthFlowError::PersistenceFailed { .. })
2053        ));
2054        assert!(
2055            authority
2056                .registry
2057                .snapshot_for_persistence(current_time_millis())
2058                .browser
2059                .is_empty(),
2060            "failed browser admission must not leave an unreturned registry payload"
2061        );
2062
2063        store.allow_oauth_persist();
2064        let successful_state = authority
2065            .start(
2066                successful_target,
2067                provider,
2068                redirect_uri.to_string(),
2069                "successful-verifier".to_string(),
2070            )
2071            .expect("subsequent browser admit persists after store recovers");
2072        let snapshot_json = store
2073            .load_auth_oauth_flow_snapshot()
2074            .expect("durable OAuth snapshot loads")
2075            .expect("durable OAuth snapshot exists");
2076        let snapshot = serde_json::from_slice::<OAuthFlowRegistrySnapshot>(&snapshot_json)
2077            .expect("durable OAuth snapshot decodes");
2078        assert_eq!(
2079            snapshot
2080                .browser
2081                .iter()
2082                .map(|flow| flow.state.as_str())
2083                .collect::<Vec<_>>(),
2084            vec![successful_state.as_str()],
2085            "a later successful admit must not persist a previously failed unreturned flow"
2086        );
2087    }
2088
2089    #[test]
2090    fn device_admit_persistence_failure_rolls_back_unreturned_flow() {
2091        let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
2092        let store = Arc::new(FailingOAuthSnapshotStore::default());
2093        let store_dyn = Arc::clone(&store) as Arc<dyn RuntimeStore>;
2094        let authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
2095            Duration::from_secs(60),
2096            lifecycle.clone(),
2097            &store_dyn,
2098        );
2099        let target = target();
2100        let provider = OAuthProviderIdentity::GoogleCodeAssist;
2101        let failed_device_code = "failed-device-code";
2102        let successful_device_code = "successful-device-code";
2103
2104        store.fail_oauth_persist();
2105        assert!(matches!(
2106            authority.admit_device_code(
2107                target.clone(),
2108                provider,
2109                failed_device_code.to_string(),
2110                Duration::from_secs(60),
2111            ),
2112            Err(OAuthFlowError::PersistenceFailed { .. })
2113        ));
2114        assert!(matches!(
2115            authority
2116                .registry
2117                .verify_device_code(failed_device_code, &target, provider),
2118            Err(OAuthFlowError::Missing)
2119        ));
2120        assert!(!lifecycle.has_oauth_device_flow_for_test(&target, failed_device_code));
2121
2122        store.allow_oauth_persist();
2123        authority
2124            .admit_device_code(
2125                target,
2126                provider,
2127                successful_device_code.to_string(),
2128                Duration::from_secs(60),
2129            )
2130            .expect("subsequent device admit persists after store recovers");
2131        let snapshot_json = store
2132            .load_auth_oauth_flow_snapshot()
2133            .expect("durable OAuth snapshot loads")
2134            .expect("durable OAuth snapshot exists");
2135        let snapshot = serde_json::from_slice::<OAuthFlowRegistrySnapshot>(&snapshot_json)
2136            .expect("durable OAuth snapshot decodes");
2137        assert_eq!(
2138            snapshot
2139                .device
2140                .iter()
2141                .map(|flow| flow.device_code.as_str())
2142                .collect::<Vec<_>>(),
2143            vec![successful_device_code],
2144            "a later successful admit must not persist a previously failed unreturned device flow"
2145        );
2146    }
2147
2148    #[cfg(feature = "sqlite-store")]
2149    #[test]
2150    fn persistent_oauth_snapshot_merges_independent_authority_writes() {
2151        let temp_dir = tempfile::tempdir().expect("tempdir");
2152        let store_path = temp_dir.path().join("runtime.sqlite");
2153        let store_one: Arc<dyn RuntimeStore> =
2154            Arc::new(crate::store::sqlite::SqliteRuntimeStore::new(&store_path).unwrap());
2155        let store_two: Arc<dyn RuntimeStore> =
2156            Arc::new(crate::store::sqlite::SqliteRuntimeStore::new(&store_path).unwrap());
2157        let first_authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
2158            Duration::from_secs(60),
2159            Arc::new(RuntimeAuthLeaseHandle::new()),
2160            &store_one,
2161        );
2162        let second_authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
2163            Duration::from_secs(60),
2164            Arc::new(RuntimeAuthLeaseHandle::new()),
2165            &store_two,
2166        );
2167        let first_target = target();
2168        let second_target = alternate_target();
2169        let provider = OAuthProviderIdentity::OpenAiChatGpt;
2170
2171        let first_state = first_authority
2172            .start(
2173                first_target.clone(),
2174                provider,
2175                "http://127.0.0.1/callback".to_string(),
2176                "verifier-1".to_string(),
2177            )
2178            .expect("first process admits browser flow");
2179        let second_state = second_authority
2180            .start(
2181                second_target.clone(),
2182                provider,
2183                "http://127.0.0.1/other-callback".to_string(),
2184                "verifier-2".to_string(),
2185            )
2186            .expect("second process admits browser flow");
2187
2188        let store_three: Arc<dyn RuntimeStore> =
2189            Arc::new(crate::store::sqlite::SqliteRuntimeStore::new(&store_path).unwrap());
2190        let snapshot_json = store_three
2191            .load_auth_oauth_flow_snapshot()
2192            .expect("durable OAuth snapshot loads")
2193            .expect("durable OAuth snapshot exists");
2194        let snapshot = serde_json::from_slice::<OAuthFlowRegistrySnapshot>(&snapshot_json)
2195            .expect("durable OAuth snapshot decodes");
2196        assert!(
2197            snapshot
2198                .browser
2199                .iter()
2200                .any(|flow| flow.state == first_state),
2201            "the first independent authority write must survive the second write"
2202        );
2203        assert!(
2204            snapshot
2205                .browser
2206                .iter()
2207                .any(|flow| flow.state == second_state),
2208            "the second independent authority write must be persisted"
2209        );
2210
2211        let restarted = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
2212            Duration::from_secs(60),
2213            Arc::new(RuntimeAuthLeaseHandle::new()),
2214            &store_three,
2215        );
2216        restarted
2217            .consume(
2218                &first_state,
2219                &first_target,
2220                provider,
2221                "http://127.0.0.1/callback",
2222            )
2223            .expect("first independent flow rehydrates");
2224        restarted
2225            .consume(
2226                &second_state,
2227                &second_target,
2228                provider,
2229                "http://127.0.0.1/other-callback",
2230            )
2231            .expect("second independent flow rehydrates after first consume");
2232    }
2233
2234    #[test]
2235    fn persistent_oauth_browser_admit_does_not_resurrect_consumed_between_sync_and_persist() {
2236        let store = Arc::new(FailingOAuthSnapshotStore::default());
2237        let store_dyn = Arc::clone(&store) as Arc<dyn RuntimeStore>;
2238        let creator = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
2239            Duration::from_secs(60),
2240            Arc::new(RuntimeAuthLeaseHandle::new()),
2241            &store_dyn,
2242        );
2243        let target = target();
2244        let replacement_target = alternate_target();
2245        let provider = OAuthProviderIdentity::OpenAiChatGpt;
2246        let redirect_uri = "http://127.0.0.1/callback";
2247        let replacement_redirect_uri = "http://127.0.0.1/replacement-callback";
2248        let consumed_state = creator
2249            .start(
2250                target.clone(),
2251                provider,
2252                redirect_uri.to_string(),
2253                "consumed-verifier".to_string(),
2254            )
2255            .expect("creator admits browser flow");
2256
2257        let stale_authority = Arc::new(
2258            RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
2259                Duration::from_secs(60),
2260                Arc::new(RuntimeAuthLeaseHandle::new()),
2261                &store_dyn,
2262            ),
2263        );
2264        let consumer = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
2265            Duration::from_secs(60),
2266            Arc::new(RuntimeAuthLeaseHandle::new()),
2267            &store_dyn,
2268        );
2269
2270        store.block_next_oauth_persist();
2271        let stale_admit = std::thread::spawn({
2272            let stale_authority = Arc::clone(&stale_authority);
2273            let replacement_target = replacement_target.clone();
2274            move || {
2275                stale_authority.start(
2276                    replacement_target,
2277                    provider,
2278                    replacement_redirect_uri.to_string(),
2279                    "replacement-verifier".to_string(),
2280                )
2281            }
2282        });
2283        store.wait_for_blocked_oauth_persist();
2284        consumer
2285            .consume(&consumed_state, &target, provider, redirect_uri)
2286            .expect("independent authority consumes browser flow between sync and persist");
2287        store.release_blocked_oauth_persist();
2288        let replacement_state = stale_admit
2289            .join()
2290            .expect("stale browser admit thread should not panic")
2291            .expect("stale authority admits replacement browser flow");
2292
2293        let snapshot_json = store
2294            .load_auth_oauth_flow_snapshot()
2295            .expect("durable OAuth snapshot loads")
2296            .expect("durable OAuth snapshot exists");
2297        let snapshot = serde_json::from_slice::<OAuthFlowRegistrySnapshot>(&snapshot_json)
2298            .expect("durable OAuth snapshot decodes");
2299        assert!(
2300            !snapshot
2301                .browser
2302                .iter()
2303                .any(|flow| flow.state == consumed_state),
2304            "a stale admission must not resurrect a browser flow consumed after pre-sync"
2305        );
2306        assert!(
2307            snapshot
2308                .browser
2309                .iter()
2310                .any(|flow| flow.state == replacement_state),
2311            "the stale authority's newly admitted browser flow should still persist"
2312        );
2313
2314        let restarted = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
2315            Duration::from_secs(60),
2316            Arc::new(RuntimeAuthLeaseHandle::new()),
2317            &store_dyn,
2318        );
2319        assert!(matches!(
2320            restarted.consume(&consumed_state, &target, provider, redirect_uri),
2321            Err(OAuthFlowError::LifecycleRejected {
2322                operation: "verify_oauth_browser_flow",
2323                ..
2324            })
2325        ));
2326        restarted
2327            .consume(
2328                &replacement_state,
2329                &replacement_target,
2330                provider,
2331                replacement_redirect_uri,
2332            )
2333            .expect("new stale-authority flow survives restart");
2334    }
2335
2336    #[test]
2337    fn persistent_oauth_device_admit_does_not_resurrect_consumed_between_sync_and_persist() {
2338        let store = Arc::new(FailingOAuthSnapshotStore::default());
2339        let store_dyn = Arc::clone(&store) as Arc<dyn RuntimeStore>;
2340        let creator = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
2341            Duration::from_secs(60),
2342            Arc::new(RuntimeAuthLeaseHandle::new()),
2343            &store_dyn,
2344        );
2345        let target = target();
2346        let replacement_target = alternate_target();
2347        let provider = OAuthProviderIdentity::GoogleCodeAssist;
2348        let consumed_device_code = "consumed-device-code";
2349        let replacement_device_code = "replacement-device-code";
2350        creator
2351            .admit_device_code(
2352                target.clone(),
2353                provider,
2354                consumed_device_code.to_string(),
2355                Duration::from_secs(60),
2356            )
2357            .expect("creator admits device flow");
2358
2359        let stale_authority = Arc::new(
2360            RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
2361                Duration::from_secs(60),
2362                Arc::new(RuntimeAuthLeaseHandle::new()),
2363                &store_dyn,
2364            ),
2365        );
2366        let consumer = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
2367            Duration::from_secs(60),
2368            Arc::new(RuntimeAuthLeaseHandle::new()),
2369            &store_dyn,
2370        );
2371
2372        store.block_next_oauth_persist();
2373        let stale_admit = std::thread::spawn({
2374            let stale_authority = Arc::clone(&stale_authority);
2375            let replacement_target = replacement_target.clone();
2376            move || {
2377                stale_authority.admit_device_code(
2378                    replacement_target,
2379                    provider,
2380                    replacement_device_code.to_string(),
2381                    Duration::from_secs(60),
2382                )
2383            }
2384        });
2385        store.wait_for_blocked_oauth_persist();
2386        consumer
2387            .begin_device_code_poll(consumed_device_code, &target, provider)
2388            .expect("independent authority begins device poll")
2389            .consume()
2390            .expect("independent authority consumes device flow between sync and persist");
2391        store.release_blocked_oauth_persist();
2392        stale_admit
2393            .join()
2394            .expect("stale device admit thread should not panic")
2395            .expect("stale authority admits replacement device flow");
2396
2397        let snapshot_json = store
2398            .load_auth_oauth_flow_snapshot()
2399            .expect("durable OAuth snapshot loads")
2400            .expect("durable OAuth snapshot exists");
2401        let snapshot = serde_json::from_slice::<OAuthFlowRegistrySnapshot>(&snapshot_json)
2402            .expect("durable OAuth snapshot decodes");
2403        assert!(
2404            !snapshot
2405                .device
2406                .iter()
2407                .any(|flow| flow.device_code == consumed_device_code),
2408            "a stale admission must not resurrect a device flow consumed after pre-sync"
2409        );
2410        assert!(
2411            snapshot
2412                .device
2413                .iter()
2414                .any(|flow| flow.device_code == replacement_device_code),
2415            "the stale authority's newly admitted device flow should still persist"
2416        );
2417
2418        let restarted = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
2419            Duration::from_secs(60),
2420            Arc::new(RuntimeAuthLeaseHandle::new()),
2421            &store_dyn,
2422        );
2423        assert!(matches!(
2424            restarted.verify_device_code(consumed_device_code, &target, provider),
2425            Err(OAuthFlowError::LifecycleRejected {
2426                operation: "verify_oauth_device_flow",
2427                ..
2428            })
2429        ));
2430        restarted
2431            .verify_device_code(replacement_device_code, &replacement_target, provider)
2432            .expect("new stale-authority device flow survives restart");
2433    }
2434
/// Finishing a stale pending device poll must not write an already-consumed
/// device flow back into the durable snapshot.
///
/// Three authorities take part, each with its own auth-lease handle and its
/// own store handle opened on the same SQLite file: `creator` admits the flow,
/// `stale_authority` begins a poll, and `consumer` begins and consumes an
/// independent poll. The stale poll is then finished, after which both the
/// persisted snapshot and a freshly constructed authority are inspected.
#[cfg(feature = "sqlite-store")]
#[test]
fn persistent_oauth_device_poll_finish_does_not_resurrect_consumed_payload() {
    let temp_dir = tempfile::tempdir().expect("tempdir");
    let store_path = temp_dir.path().join("runtime.sqlite");
    // Each call opens another store handle over the same database file.
    let open_store = || -> Arc<dyn RuntimeStore> {
        Arc::new(crate::store::sqlite::SqliteRuntimeStore::new(&store_path).unwrap())
    };
    // Builds an authority with a fresh auth-lease handle on top of `store`.
    let new_authority = |store: &Arc<dyn RuntimeStore>| {
        RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
            Duration::from_secs(60),
            Arc::new(RuntimeAuthLeaseHandle::new()),
            store,
        )
    };
    let provider = OAuthProviderIdentity::GoogleCodeAssist;
    let device_code = "pending-finish-device-code";

    let creator_store = open_store();
    let stale_store = open_store();
    let consumer_store = open_store();
    let creator = new_authority(&creator_store);
    let binding = target();
    creator
        .admit_device_code(
            binding.clone(),
            provider,
            device_code.to_string(),
            Duration::from_secs(60),
        )
        .expect("creator admits device flow");

    // The stale authority starts its poll before the flow is consumed elsewhere.
    let stale_authority = new_authority(&stale_store);
    let stale_poll = stale_authority
        .begin_device_code_poll(device_code, &binding, provider)
        .expect("stale authority begins pending poll");
    let consumer = new_authority(&consumer_store);
    consumer
        .begin_device_code_poll(device_code, &binding, provider)
        .expect("consumer begins independent poll")
        .consume()
        .expect("consumer consumes durable payload");

    stale_poll
        .finish()
        .expect("stale pending poll finish is local cleanup only");

    // The durable snapshot must no longer list the consumed device code.
    let snapshot_bytes = stale_store
        .load_auth_oauth_flow_snapshot()
        .expect("durable OAuth snapshot loads")
        .expect("durable OAuth snapshot exists");
    let snapshot: OAuthFlowRegistrySnapshot =
        serde_json::from_slice(&snapshot_bytes).expect("durable OAuth snapshot decodes");
    assert!(
        snapshot
            .device
            .iter()
            .all(|flow| flow.device_code != device_code),
        "a stale pending poll finish must not resurrect a consumed device flow"
    );

    // An authority built afterwards on a new store handle must reject the code.
    let restarted_store = open_store();
    let restarted = new_authority(&restarted_store);
    let verify_after_restart = restarted.verify_device_code(device_code, &binding, provider);
    assert!(matches!(
        verify_after_restart,
        Err(OAuthFlowError::LifecycleRejected {
            operation: "verify_oauth_device_flow",
            ..
        })
    ));
}
2515
/// A stale authority must be able to admit a new browser flow after another
/// authority has consumed the only outstanding one.
///
/// All three authorities are built with `1` as the capacity argument and use
/// separate store handles opened on the same SQLite file. `creator` admits a
/// flow, `consumer` consumes it, and `stale_authority` (constructed before the
/// consume) then starts and consumes a replacement flow for another target.
#[cfg(feature = "sqlite-store")]
#[test]
fn persistent_oauth_browser_sync_prunes_stale_capacity_before_admit() {
    let temp_dir = tempfile::tempdir().expect("tempdir");
    let store_path = temp_dir.path().join("runtime.sqlite");
    // Each call opens another store handle over the same database file.
    let open_store = || -> Arc<dyn RuntimeStore> {
        Arc::new(crate::store::sqlite::SqliteRuntimeStore::new(&store_path).unwrap())
    };
    // Builds a capacity-1 authority with a fresh auth-lease handle.
    let new_authority = |store: &Arc<dyn RuntimeStore>| {
        RuntimeOAuthFlowHandle::new_with_capacity_auth_lease_and_store(
            Duration::from_secs(60),
            1,
            Arc::new(RuntimeAuthLeaseHandle::new()),
            Some(Arc::downgrade(store)),
        )
    };
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";
    let replacement_redirect_uri = "http://127.0.0.1/replacement-callback";

    let creator_store = open_store();
    let stale_store = open_store();
    let consumer_store = open_store();
    let creator = new_authority(&creator_store);
    let binding = target();
    let replacement_binding = alternate_target();
    let consumed_state = creator
        .start(
            binding.clone(),
            provider,
            redirect_uri.to_string(),
            "consumed-verifier".to_string(),
        )
        .expect("creator admits browser flow");

    // Both authorities are constructed while the first flow is still pending.
    let stale_authority = new_authority(&stale_store);
    let consumer = new_authority(&consumer_store);
    consumer
        .consume(&consumed_state, &binding, provider, redirect_uri)
        .expect("independent authority consumes browser flow");

    let replacement_state = stale_authority
        .start(
            replacement_binding.clone(),
            provider,
            replacement_redirect_uri.to_string(),
            "replacement-verifier".to_string(),
        )
        .expect("stale capacity is pruned before browser admit");
    stale_authority
        .consume(
            &replacement_state,
            &replacement_binding,
            provider,
            replacement_redirect_uri,
        )
        .expect("replacement browser flow remains usable");
}
2580
/// Device-code counterpart of the stale-capacity test: after another authority
/// consumes the only outstanding device flow, a stale authority must still be
/// able to admit and verify a replacement device flow.
///
/// All three authorities are built with `1` as the capacity argument and use
/// separate store handles opened on the same SQLite file.
#[cfg(feature = "sqlite-store")]
#[test]
fn persistent_oauth_device_sync_prunes_stale_capacity_before_admit() {
    let temp_dir = tempfile::tempdir().expect("tempdir");
    let store_path = temp_dir.path().join("runtime.sqlite");
    // Each call opens another store handle over the same database file.
    let open_store = || -> Arc<dyn RuntimeStore> {
        Arc::new(crate::store::sqlite::SqliteRuntimeStore::new(&store_path).unwrap())
    };
    // Builds a capacity-1 authority with a fresh auth-lease handle.
    let new_authority = |store: &Arc<dyn RuntimeStore>| {
        RuntimeOAuthFlowHandle::new_with_capacity_auth_lease_and_store(
            Duration::from_secs(60),
            1,
            Arc::new(RuntimeAuthLeaseHandle::new()),
            Some(Arc::downgrade(store)),
        )
    };
    let provider = OAuthProviderIdentity::GoogleCodeAssist;
    let consumed_device_code = "consumed-capacity-device-code";
    let replacement_device_code = "replacement-device-code";

    let creator_store = open_store();
    let stale_store = open_store();
    let consumer_store = open_store();
    let creator = new_authority(&creator_store);
    let binding = target();
    let replacement_binding = alternate_target();
    creator
        .admit_device_code(
            binding.clone(),
            provider,
            consumed_device_code.to_string(),
            Duration::from_secs(60),
        )
        .expect("creator admits device flow");

    // Both authorities are constructed while the first flow is still pending.
    let stale_authority = new_authority(&stale_store);
    let consumer = new_authority(&consumer_store);
    consumer
        .begin_device_code_poll(consumed_device_code, &binding, provider)
        .expect("independent authority begins device poll")
        .consume()
        .expect("independent authority consumes device flow");

    stale_authority
        .admit_device_code(
            replacement_binding.clone(),
            provider,
            replacement_device_code.to_string(),
            Duration::from_secs(60),
        )
        .expect("stale capacity is pruned before device admit");
    stale_authority
        .verify_device_code(replacement_device_code, &replacement_binding, provider)
        .expect("replacement device flow remains usable");
}
2642
/// Two authorities race to consume the same browser flow through one shared
/// store; only one may succeed.
///
/// The store is told to block the next OAuth persist, so the first consume
/// parks on a background thread. The second authority then consumes the flow
/// successfully. Once the blocked persist is released, the first consume must
/// report `RegistryProjectionMissing` for `consume_oauth_browser_flow`.
#[test]
fn concurrent_persistent_browser_consumes_require_fresh_durable_claim() {
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn = Arc::clone(&store) as Arc<dyn RuntimeStore>;
    // Every authority gets its own auth-lease handle but shares `store_dyn`.
    let new_authority = || {
        RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
            Duration::from_secs(60),
            Arc::new(RuntimeAuthLeaseHandle::new()),
            &store_dyn,
        )
    };
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";

    let creator = new_authority();
    let binding = target();
    let state = creator
        .start(
            binding.clone(),
            provider,
            redirect_uri.to_string(),
            "verifier".to_string(),
        )
        .expect("browser flow admitted");
    let first = Arc::new(new_authority());
    let second = new_authority();

    store.block_next_oauth_persist();
    // Owned copies for the worker thread.
    let racing_authority = Arc::clone(&first);
    let racing_state = state.clone();
    let racing_binding = binding.clone();
    let first_consume = std::thread::spawn(move || {
        racing_authority.consume(&racing_state, &racing_binding, provider, redirect_uri)
    });
    store.wait_for_blocked_oauth_persist();

    second
        .consume(&state, &binding, provider, redirect_uri)
        .expect("second authority wins durable consume race");
    store.release_blocked_oauth_persist();

    let first_outcome = first_consume
        .join()
        .expect("first consume thread should not panic");
    assert!(matches!(
        first_outcome,
        Err(OAuthFlowError::RegistryProjectionMissing {
            operation: "consume_oauth_browser_flow"
        })
    ));
}
2698
/// Two authorities race to consume the same device flow through one shared
/// store; only one may succeed.
///
/// Both authorities begin a poll first. The store is then told to block the
/// next OAuth persist, so the first poll's consume parks on a background
/// thread while the second poll consumes successfully. Once the blocked
/// persist is released, the first consume must report
/// `RegistryProjectionMissing` for `consume_oauth_device_flow`.
#[test]
fn concurrent_persistent_device_consumes_require_fresh_durable_claim() {
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn = Arc::clone(&store) as Arc<dyn RuntimeStore>;
    // Every authority gets its own auth-lease handle but shares `store_dyn`.
    let new_authority = || {
        RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
            Duration::from_secs(60),
            Arc::new(RuntimeAuthLeaseHandle::new()),
            &store_dyn,
        )
    };
    let provider = OAuthProviderIdentity::GoogleCodeAssist;
    let device_code = "race-device-code";

    let creator = new_authority();
    let binding = target();
    creator
        .admit_device_code(
            binding.clone(),
            provider,
            device_code.to_string(),
            Duration::from_secs(60),
        )
        .expect("device flow admitted");
    let first = new_authority();
    let second = new_authority();
    let first_poll = first
        .begin_device_code_poll(device_code, &binding, provider)
        .expect("first authority begins poll");
    let second_poll = second
        .begin_device_code_poll(device_code, &binding, provider)
        .expect("second authority begins poll");

    store.block_next_oauth_persist();
    let first_consume = std::thread::spawn(move || first_poll.consume());
    store.wait_for_blocked_oauth_persist();

    second_poll
        .consume()
        .expect("second authority wins durable device consume race");
    store.release_blocked_oauth_persist();

    let first_outcome = first_consume
        .join()
        .expect("first device consume thread should not panic");
    assert!(matches!(
        first_outcome,
        Err(OAuthFlowError::RegistryProjectionMissing {
            operation: "consume_oauth_device_flow"
        })
    ));
}
2753
/// Two capacity-1 authorities race to admit a browser flow through one shared
/// store; the loser must see the capacity already taken.
///
/// The store is told to block the next OAuth persist, so the first `start`
/// parks on a background thread while the second authority admits its flow.
/// Once the blocked persist is released, the first admit must fail with
/// `CapacityExceeded { max_outstanding: 1 }`.
#[test]
fn concurrent_persistent_browser_admits_require_fresh_durable_capacity() {
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let first_store = Arc::clone(&store) as Arc<dyn RuntimeStore>;
    let second_store = Arc::clone(&store) as Arc<dyn RuntimeStore>;
    // Builds a capacity-1 authority with a fresh auth-lease handle.
    let new_authority = |backing: &Arc<dyn RuntimeStore>| {
        RuntimeOAuthFlowHandle::new_with_capacity_auth_lease_and_store(
            Duration::from_secs(60),
            1,
            Arc::new(RuntimeAuthLeaseHandle::new()),
            Some(Arc::downgrade(backing)),
        )
    };
    let first = Arc::new(new_authority(&first_store));
    let second = new_authority(&second_store);
    let first_target = target();
    let second_target = alternate_target();
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let first_redirect_uri = "http://127.0.0.1/first-callback";
    let second_redirect_uri = "http://127.0.0.1/second-callback";

    store.block_next_oauth_persist();
    // Owned copies for the worker thread.
    let racing_authority = Arc::clone(&first);
    let racing_target = first_target.clone();
    let first_admit = std::thread::spawn(move || {
        racing_authority.start(
            racing_target,
            provider,
            first_redirect_uri.to_string(),
            "first-verifier".to_string(),
        )
    });
    store.wait_for_blocked_oauth_persist();

    second
        .start(
            second_target,
            provider,
            second_redirect_uri.to_string(),
            "second-verifier".to_string(),
        )
        .expect("second authority wins durable browser admission race");
    store.release_blocked_oauth_persist();

    let first_outcome = first_admit
        .join()
        .expect("first browser admit thread should not panic");
    assert!(matches!(
        first_outcome,
        Err(OAuthFlowError::CapacityExceeded { max_outstanding: 1 })
    ));
}
2810
/// Two capacity-1 authorities race to admit a device flow through one shared
/// store; the loser must see the capacity already taken.
///
/// The store is told to block the next OAuth persist, so the first
/// `admit_device_code` parks on a background thread while the second authority
/// admits its flow. Once the blocked persist is released, the first admit must
/// fail with `CapacityExceeded { max_outstanding: 1 }`.
#[test]
fn concurrent_persistent_device_admits_require_fresh_durable_capacity() {
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let first_store = Arc::clone(&store) as Arc<dyn RuntimeStore>;
    let second_store = Arc::clone(&store) as Arc<dyn RuntimeStore>;
    // Builds a capacity-1 authority with a fresh auth-lease handle.
    let new_authority = |backing: &Arc<dyn RuntimeStore>| {
        RuntimeOAuthFlowHandle::new_with_capacity_auth_lease_and_store(
            Duration::from_secs(60),
            1,
            Arc::new(RuntimeAuthLeaseHandle::new()),
            Some(Arc::downgrade(backing)),
        )
    };
    let first = Arc::new(new_authority(&first_store));
    let second = new_authority(&second_store);
    let first_target = target();
    let second_target = alternate_target();
    let provider = OAuthProviderIdentity::GoogleCodeAssist;

    store.block_next_oauth_persist();
    // Owned copies for the worker thread.
    let racing_authority = Arc::clone(&first);
    let racing_target = first_target.clone();
    let first_admit = std::thread::spawn(move || {
        racing_authority.admit_device_code(
            racing_target,
            provider,
            "first-device-code".to_string(),
            Duration::from_secs(60),
        )
    });
    store.wait_for_blocked_oauth_persist();

    second
        .admit_device_code(
            second_target,
            provider,
            "second-device-code".to_string(),
            Duration::from_secs(60),
        )
        .expect("second authority wins durable device admission race");
    store.release_blocked_oauth_persist();

    let first_outcome = first_admit
        .join()
        .expect("first device admit thread should not panic");
    assert!(matches!(
        first_outcome,
        Err(OAuthFlowError::CapacityExceeded { max_outstanding: 1 })
    ));
}
2865
/// Two concurrent browser admits on the same authority must both end up in the
/// durable snapshot.
///
/// The first admit runs on a worker thread while the store blocks its persist.
/// A second admit is started on another thread and given up to 100 ms to
/// finish before the blocked persist is released; if it has not finished by
/// then, the test waits up to one more second for it afterwards. The persisted
/// snapshot must contain the second flow, and an authority constructed
/// afterwards on the same store must be able to consume it.
#[test]
fn concurrent_browser_admits_preserve_newer_durable_snapshot() {
    let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn = Arc::clone(&store) as Arc<dyn RuntimeStore>;
    let authority = Arc::new(
        RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
            Duration::from_secs(60),
            lifecycle,
            &store_dyn,
        ),
    );
    let first_target = target();
    let second_target = alternate_target();
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let first_redirect_uri = "http://127.0.0.1/callback";
    let second_redirect_uri = "http://127.0.0.1/other-callback";

    store.block_next_oauth_persist();
    let first_admit = {
        let shared = Arc::clone(&authority);
        let binding = first_target.clone();
        std::thread::spawn(move || {
            shared.start(
                binding,
                provider,
                first_redirect_uri.to_string(),
                "verifier-1".to_string(),
            )
        })
    };
    store.wait_for_blocked_oauth_persist();

    // The second admit reports its outcome over a channel; its thread is not joined.
    let (second_done_tx, second_done_rx) = std::sync::mpsc::channel();
    {
        let shared = Arc::clone(&authority);
        let binding = second_target.clone();
        std::thread::spawn(move || {
            let outcome = shared.start(
                binding,
                provider,
                second_redirect_uri.to_string(),
                "verifier-2".to_string(),
            );
            let _ = second_done_tx.send(outcome);
        });
    }
    let second_before_release = second_done_rx
        .recv_timeout(Duration::from_millis(100))
        .ok();
    store.release_blocked_oauth_persist();
    first_admit
        .join()
        .expect("first admit thread should not panic")
        .expect("first browser flow admitted");
    let second_outcome = match second_before_release {
        Some(outcome) => outcome,
        None => second_done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("second admit should finish after first durable write is released"),
    };
    let second_state = second_outcome.expect("second browser flow admitted");

    let snapshot_bytes = store
        .load_auth_oauth_flow_snapshot()
        .expect("durable OAuth snapshot loads")
        .expect("durable OAuth snapshot exists");
    let snapshot: OAuthFlowRegistrySnapshot =
        serde_json::from_slice(&snapshot_bytes).expect("durable OAuth snapshot decodes");
    let second_flow_persisted = snapshot
        .browser
        .iter()
        .any(|flow| flow.state == second_state);
    assert!(
        second_flow_persisted,
        "durable snapshot must retain flow admitted by a concurrent newer write"
    );

    // A new authority with a fresh auth-lease handle reads the same store.
    let restarted_lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let restarted = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        restarted_lifecycle,
        &store_dyn,
    );
    let record = restarted
        .consume(&second_state, &second_target, provider, second_redirect_uri)
        .expect("newer durable flow should survive restart");
    assert_eq!(record.pkce_verifier, "verifier-2");
}
2952
/// A browser consume that fails to persist must leave the flow in place so the
/// same consume can be retried.
///
/// With store persistence switched to failing, `consume` must return
/// `PersistenceFailed` and the lifecycle must still report the browser flow.
/// After persistence is allowed again, the same consume must succeed.
#[test]
fn browser_consume_persistence_failure_keeps_flow_retryable() {
    let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn = Arc::clone(&store) as Arc<dyn RuntimeStore>;
    let authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        Arc::clone(&lifecycle),
        &store_dyn,
    );
    let binding = target();
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";
    let state = authority
        .start(
            binding.clone(),
            provider,
            redirect_uri.to_string(),
            "verifier".to_string(),
        )
        .expect("browser flow admitted");

    store.fail_oauth_persist();
    let failed_consume = authority.consume(&state, &binding, provider, redirect_uri);
    assert!(matches!(
        failed_consume,
        Err(OAuthFlowError::PersistenceFailed { .. })
    ));
    assert!(lifecycle.has_oauth_browser_flow_for_test(&binding, &state));

    store.allow_oauth_persist();
    authority
        .consume(&state, &binding, provider, redirect_uri)
        .expect("failed durable consume remains retryable");
}
2987
/// A device-flow consume that fails to persist must leave the flow in place so
/// a new poll can consume it later.
///
/// With store persistence switched to failing, the poll's `consume` must
/// return `PersistenceFailed` and the lifecycle must still report the device
/// flow. After persistence is allowed again, a new poll must begin and consume
/// successfully.
#[test]
fn device_consume_persistence_failure_keeps_flow_retryable() {
    let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn = Arc::clone(&store) as Arc<dyn RuntimeStore>;
    let authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        Arc::clone(&lifecycle),
        &store_dyn,
    );
    let binding = target();
    let provider = OAuthProviderIdentity::GoogleCodeAssist;
    let device_code = "provider-device-code";
    authority
        .admit_device_code(
            binding.clone(),
            provider,
            device_code.to_string(),
            Duration::from_secs(60),
        )
        .expect("device flow admitted");
    let poll = authority
        .begin_device_code_poll(device_code, &binding, provider)
        .expect("device poll begins");

    store.fail_oauth_persist();
    let failed_consume = poll.consume();
    assert!(matches!(
        failed_consume,
        Err(OAuthFlowError::PersistenceFailed { .. })
    ));
    assert!(lifecycle.has_oauth_device_flow_for_test(&binding, device_code));

    store.allow_oauth_persist();
    let retry = authority
        .begin_device_code_poll(device_code, &binding, provider)
        .expect("failed durable consume keeps device flow retryable");
    retry
        .consume()
        .expect("retry consumes after durable persistence recovers");
}
3028
/// Releasing a lease while OAuth persistence is failing must return an error
/// and leave the admitted browser flow usable.
///
/// After the failed release the lifecycle must still report the browser flow,
/// and once persistence is allowed again the flow must be consumable.
#[test]
fn release_persistence_failure_keeps_released_flows_retryable() {
    let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn = Arc::clone(&store) as Arc<dyn RuntimeStore>;
    let authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        Arc::clone(&lifecycle),
        &store_dyn,
    );
    let binding = target();
    let lease_key = LeaseKey::from_auth_binding(&binding);
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";
    let state = authority
        .start(
            binding.clone(),
            provider,
            redirect_uri.to_string(),
            "verifier".to_string(),
        )
        .expect("browser flow admitted");

    store.fail_oauth_persist();
    let release_outcome = lifecycle.release_lease(&lease_key);
    assert!(
        release_outcome.is_err(),
        "release should fail closed when durable OAuth cleanup cannot persist"
    );
    assert!(lifecycle.has_oauth_browser_flow_for_test(&binding, &state));

    store.allow_oauth_persist();
    authority
        .consume(&state, &binding, provider, redirect_uri)
        .expect("failed durable release leaves browser flow retryable");
}
3064
/// A failed release on a stale lifecycle must not create local AuthMachine
/// state, and must leave the durable browser flow consumable.
///
/// The browser flow is admitted through a different authority on the same
/// store, so the releasing lifecycle has no local membership for it. With
/// persistence failing, `release_lease` must return an error and the
/// lifecycle snapshot's `phase` must stay `None`. After persistence is allowed
/// again, the releasing authority must be able to consume the flow.
#[test]
fn stale_release_persistence_failure_does_not_install_released_authority() {
    let releasing_lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn = Arc::clone(&store) as Arc<dyn RuntimeStore>;
    let releasing_authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        Arc::clone(&releasing_lifecycle),
        &store_dyn,
    );
    let admitting_authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        Arc::new(RuntimeAuthLeaseHandle::new()),
        &store_dyn,
    );
    let binding = target();
    let lease_key = LeaseKey::from_auth_binding(&binding);
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";
    let state = admitting_authority
        .start(
            binding.clone(),
            provider,
            redirect_uri.to_string(),
            "verifier".to_string(),
        )
        .expect("other authority admits browser flow");
    let locally_known = releasing_lifecycle.has_oauth_browser_flow_for_test(&binding, &state);
    assert!(
        !locally_known,
        "releasing authority starts stale and has no local machine membership"
    );

    store.fail_oauth_persist();
    let release_outcome = releasing_lifecycle.release_lease(&lease_key);
    assert!(
        release_outcome.is_err(),
        "release should fail closed when stale durable OAuth cleanup cannot persist"
    );
    assert_eq!(
        releasing_lifecycle.snapshot(&lease_key).phase,
        None,
        "failed stale release must not synthesize a local AuthMachine authority"
    );

    store.allow_oauth_persist();
    releasing_authority
        .consume(&state, &binding, provider, redirect_uri)
        .expect("failed stale durable release must leave browser flow retryable");
}
3113
/// A lease release issued by a stale lifecycle must prune browser and device
/// flows that another authority admitted into the shared SQLite store.
///
/// The releasing lifecycle never saw either flow locally. After its
/// `release_lease` succeeds, an authority built on a new store handle must
/// reject both the browser consume and the device verify with
/// `LifecycleRejected`.
#[cfg(feature = "sqlite-store")]
#[test]
fn persistent_release_prunes_durable_flows_from_stale_authority() {
    let temp_dir = tempfile::tempdir().expect("tempdir");
    let store_path = temp_dir.path().join("runtime.sqlite");
    // Each call opens another store handle over the same database file.
    let open_store = || -> Arc<dyn RuntimeStore> {
        Arc::new(crate::store::sqlite::SqliteRuntimeStore::new(&store_path).unwrap())
    };
    let releasing_store = open_store();
    let admitting_store = open_store();
    let releasing_lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let releasing_authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        Arc::clone(&releasing_lifecycle),
        &releasing_store,
    );
    let admitting_authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        Arc::new(RuntimeAuthLeaseHandle::new()),
        &admitting_store,
    );
    let binding = target();
    let lease_key = LeaseKey::from_auth_binding(&binding);

    // Admit one browser flow and one device flow through the other authority.
    let browser_provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";
    let browser_state = admitting_authority
        .start(
            binding.clone(),
            browser_provider,
            redirect_uri.to_string(),
            "browser-verifier".to_string(),
        )
        .expect("other authority admits browser flow");
    let device_provider = OAuthProviderIdentity::GoogleCodeAssist;
    let device_code = "released-device-code";
    admitting_authority
        .admit_device_code(
            binding.clone(),
            device_provider,
            device_code.to_string(),
            Duration::from_secs(60),
        )
        .expect("other authority admits device flow");

    let browser_known_locally =
        releasing_lifecycle.has_oauth_browser_flow_for_test(&binding, &browser_state);
    assert!(
        !browser_known_locally,
        "releasing authority starts stale and does not know the browser flow locally"
    );
    let device_known_locally =
        releasing_lifecycle.has_oauth_device_flow_for_test(&binding, device_code);
    assert!(
        !device_known_locally,
        "releasing authority starts stale and does not know the device flow locally"
    );

    releasing_lifecycle
        .release_lease(&lease_key)
        .expect("stale release succeeds");

    let restarted_store = open_store();
    let restarted = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        Arc::new(RuntimeAuthLeaseHandle::new()),
        &restarted_store,
    );
    let browser_after_release =
        restarted.consume(&browser_state, &binding, browser_provider, redirect_uri);
    let device_after_release =
        restarted.verify_device_code(device_code, &binding, device_provider);
    let browser_rejected = matches!(
        browser_after_release,
        Err(OAuthFlowError::LifecycleRejected {
            operation: "verify_oauth_browser_flow",
            ..
        })
    );
    assert!(
        browser_rejected,
        "release from a stale authority must prune durable browser flow, got {browser_after_release:?}"
    );
    let device_rejected = matches!(
        device_after_release,
        Err(OAuthFlowError::LifecycleRejected {
            operation: "verify_oauth_device_flow",
            ..
        })
    );
    assert!(
        device_rejected,
        "release from a stale authority must prune durable device flow, got {device_after_release:?}"
    );
    // Explicit drop keeps the releasing authority alive for the whole test.
    drop(releasing_authority);
}
3202
3203    #[test]
3204    fn oauth_flow_membership_does_not_advance_credential_generation() {
3205        let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
3206        let authority =
3207            RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_secs(60), lifecycle.clone());
3208        let target = target();
3209        let lease_key = LeaseKey::from_auth_binding(&target);
3210        let provider = OAuthProviderIdentity::OpenAiChatGpt;
3211        let redirect_uri = "http://127.0.0.1/callback";
3212        let transition = lifecycle
3213            .acquire_lease(&lease_key, 4_200)
3214            .expect("credential lifecycle acquired");
3215
3216        let state = authority
3217            .start(
3218                target.clone(),
3219                provider,
3220                redirect_uri.to_string(),
3221                "verifier".to_string(),
3222            )
3223            .expect("browser flow admitted");
3224        authority
3225            .verify(&state, &target, provider, redirect_uri)
3226            .expect("browser flow verifies");
3227        authority
3228            .consume(&state, &target, provider, redirect_uri)
3229            .expect("browser flow consumes");
3230
3231        let snapshot = lifecycle.snapshot(&lease_key);
3232        assert_eq!(snapshot.generation, transition.generation);
3233        assert_eq!(
3234            snapshot.credential_published_at_millis,
3235            transition.credential_published_at_millis
3236        );
3237    }
3238
3239    #[test]
3240    fn global_browser_expiry_preserves_reauth_required_phase() {
3241        let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
3242        let authority = RuntimeOAuthFlowHandle::new_with_auth_lease(
3243            Duration::from_millis(1),
3244            lifecycle.clone(),
3245        );
3246        let target = target();
3247        let other_target = alternate_target();
3248        let provider = OAuthProviderIdentity::OpenAiChatGpt;
3249        let redirect_uri = "http://127.0.0.1/callback";
3250
3251        let expired_state = authority
3252            .start(
3253                target.clone(),
3254                provider,
3255                redirect_uri.to_string(),
3256                "verifier-old".to_string(),
3257            )
3258            .expect("browser flow admitted");
3259        assert_eq!(
3260            snapshot_phase(&lifecycle, &target),
3261            Some(AuthLeasePhase::ReauthRequired)
3262        );
3263        std::thread::sleep(Duration::from_millis(10));
3264
3265        authority
3266            .start(
3267                other_target,
3268                provider,
3269                redirect_uri.to_string(),
3270                "verifier-new".to_string(),
3271            )
3272            .expect("new browser flow admitted after pruning expired flow");
3273
3274        assert!(
3275            !lifecycle.has_oauth_browser_flow_for_test(&target, &expired_state),
3276            "passive registry expiry must remove stale AuthMachine browser membership"
3277        );
3278        assert_eq!(
3279            snapshot_phase(&lifecycle, &target),
3280            Some(AuthLeasePhase::ReauthRequired),
3281            "global OAuth expiry cleanup must not change credential lifecycle truth"
3282        );
3283    }
3284
3285    #[test]
3286    fn browser_passive_expiry_clears_lifecycle_membership_on_next_admit() {
3287        let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
3288        let authority = RuntimeOAuthFlowHandle::new_with_auth_lease(
3289            Duration::from_millis(1),
3290            lifecycle.clone(),
3291        );
3292        let target = target();
3293        let provider = OAuthProviderIdentity::OpenAiChatGpt;
3294        let redirect_uri = "http://127.0.0.1/callback";
3295
3296        let expired_state = authority
3297            .start(
3298                target.clone(),
3299                provider,
3300                redirect_uri.to_string(),
3301                "verifier-old".to_string(),
3302            )
3303            .expect("browser flow admitted");
3304        assert!(lifecycle.has_oauth_browser_flow_for_test(&target, &expired_state));
3305        std::thread::sleep(Duration::from_millis(10));
3306
3307        authority
3308            .start(
3309                target.clone(),
3310                provider,
3311                redirect_uri.to_string(),
3312                "verifier-new".to_string(),
3313            )
3314            .expect("new browser flow admitted after pruning expired flow");
3315
3316        assert!(
3317            !lifecycle.has_oauth_browser_flow_for_test(&target, &expired_state),
3318            "passive registry expiry must remove stale AuthMachine browser membership"
3319        );
3320    }
3321
3322    #[test]
3323    fn device_admit_rejects_registry_pruned_canonical_membership() {
3324        let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
3325        let authority =
3326            RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_secs(60), lifecycle.clone());
3327        let target = target();
3328        let provider = OAuthProviderIdentity::GoogleCodeAssist;
3329        let device_code = "provider-device-code";
3330
3331        authority
3332            .admit_device_code(
3333                target.clone(),
3334                provider,
3335                device_code.to_string(),
3336                Duration::from_secs(60),
3337            )
3338            .expect("device flow admitted");
3339        assert!(lifecycle.has_oauth_device_flow_for_test(&target, device_code));
3340        authority
3341            .registry
3342            .expire_device_code(device_code, &target, provider)
3343            .expect("test removes registry record without lifecycle cleanup");
3344        assert!(lifecycle.has_oauth_device_flow_for_test(&target, device_code));
3345
3346        assert!(matches!(
3347            authority.admit_device_code(
3348                target.clone(),
3349                provider,
3350                device_code.to_string(),
3351                Duration::from_secs(60),
3352            ),
3353            Err(OAuthFlowError::LifecycleRejected {
3354                operation: "admit_oauth_device_flow",
3355                ..
3356            })
3357        ));
3358
3359        assert!(
3360            lifecycle.has_oauth_device_flow_for_test(&target, device_code),
3361            "registry-only loss must not expire canonical AuthMachine device membership"
3362        );
3363        assert!(matches!(
3364            authority.verify_device_code(device_code, &target, provider),
3365            Err(OAuthFlowError::RegistryProjectionMissing {
3366                operation: "verify_oauth_device_flow"
3367            })
3368        ));
3369        assert!(matches!(
3370            authority.begin_device_code_poll(device_code, &target, provider),
3371            Err(OAuthFlowError::RegistryProjectionMissing {
3372                operation: "begin_oauth_device_poll"
3373            })
3374        ));
3375        assert!(
3376            lifecycle.has_oauth_device_flow_for_test(&target, device_code),
3377            "missing process-local device payload must fail closed without removing the flow"
3378        );
3379    }
3380
3381    #[test]
3382    fn duplicate_device_admit_preserves_active_lifecycle_membership() {
3383        let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
3384        let authority =
3385            RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_secs(60), lifecycle.clone());
3386        let target = target();
3387        let provider = OAuthProviderIdentity::GoogleCodeAssist;
3388        let device_code = "provider-device-code";
3389
3390        authority
3391            .admit_device_code(
3392                target.clone(),
3393                provider,
3394                device_code.to_string(),
3395                Duration::from_secs(60),
3396            )
3397            .expect("device flow admitted");
3398        let duplicate = authority.admit_device_code(
3399            target.clone(),
3400            provider,
3401            device_code.to_string(),
3402            Duration::from_secs(60),
3403        );
3404
3405        assert!(matches!(
3406            duplicate,
3407            Err(OAuthFlowError::LifecycleRejected {
3408                operation: "admit_oauth_device_flow",
3409                ..
3410            })
3411        ));
3412        assert!(lifecycle.has_oauth_device_flow_for_test(&target, device_code));
3413        authority
3414            .begin_device_code_poll(device_code, &target, provider)
3415            .expect("duplicate admit must not orphan active lifecycle membership");
3416    }
3417
3418    #[test]
3419    fn registry_capacity_still_bounds_payloads_after_lifecycle_release() {
3420        let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
3421        let authority = RuntimeOAuthFlowHandle::new_with_capacity_and_auth_lease(
3422            Duration::from_secs(60),
3423            1,
3424            lifecycle.clone(),
3425        );
3426        let target = target();
3427        let provider = OAuthProviderIdentity::OpenAiChatGpt;
3428
3429        authority
3430            .start(
3431                target.clone(),
3432                provider,
3433                "http://127.0.0.1/callback".to_string(),
3434                "verifier-1".to_string(),
3435            )
3436            .expect("first browser flow admitted");
3437        lifecycle
3438            .release_lease(&LeaseKey::from_auth_binding(&target))
3439            .expect("credential lifecycle release succeeds");
3440
3441        authority
3442            .start(
3443                alternate_target(),
3444                provider,
3445                "http://127.0.0.1/other-callback".to_string(),
3446                "verifier-2".to_string(),
3447            )
3448            .expect("AuthMachine release must clear stale registry payload capacity");
3449    }
3450
3451    #[test]
3452    fn release_observer_does_not_prune_flow_admitted_after_release_acceptance() {
3453        let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
3454        let authority = Arc::new(RuntimeOAuthFlowHandle::new_with_auth_lease(
3455            Duration::from_secs(60),
3456            lifecycle.clone(),
3457        ));
3458        let target = target();
3459        let lease_key = LeaseKey::from_auth_binding(&target);
3460        let provider = OAuthProviderIdentity::OpenAiChatGpt;
3461        let redirect_uri = "http://127.0.0.1/callback";
3462        let old_state = authority
3463            .start(
3464                target.clone(),
3465                provider,
3466                redirect_uri.to_string(),
3467                "old-verifier".to_string(),
3468            )
3469            .expect("old browser flow admitted");
3470        let new_state = Arc::new(std::sync::Mutex::new(None));
3471        let new_state_for_hook = Arc::clone(&new_state);
3472        let authority_for_hook = Arc::clone(&authority);
3473        let target_for_hook = target.clone();
3474        let lease_key_for_hook = lease_key.clone();
3475        let _hook_guard = crate::handles::auth_lease::install_release_after_accept_hook_for_test(
3476            Arc::new(move |released_key| {
3477                if released_key != &lease_key_for_hook {
3478                    return;
3479                }
3480                let admitted = authority_for_hook
3481                    .start(
3482                        target_for_hook.clone(),
3483                        provider,
3484                        redirect_uri.to_string(),
3485                        "new-verifier".to_string(),
3486                    )
3487                    .expect("new browser flow admitted after release acceptance");
3488                *new_state_for_hook
3489                    .lock()
3490                    .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(admitted);
3491            }),
3492        );
3493
3494        lifecycle
3495            .release_lease(&lease_key)
3496            .expect("credential lifecycle release succeeds");
3497
3498        let new_state = new_state
3499            .lock()
3500            .unwrap_or_else(std::sync::PoisonError::into_inner)
3501            .clone()
3502            .expect("hook admitted replacement flow");
3503        let flow = authority
3504            .consume(&new_state, &target, provider, redirect_uri)
3505            .expect("release observer must not prune newly admitted flow");
3506        assert_eq!(flow.pkce_verifier, "new-verifier");
3507        assert!(matches!(
3508            authority.consume(&old_state, &target, provider, redirect_uri),
3509            Err(OAuthFlowError::LifecycleRejected {
3510                operation: "verify_oauth_browser_flow",
3511                ..
3512            })
3513        ));
3514    }
3515
3516    #[test]
3517    fn browser_capacity_rejection_comes_from_authmachine_lifecycle() {
3518        let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
3519        let authority = RuntimeOAuthFlowHandle::new_with_capacity_and_auth_lease(
3520            Duration::from_secs(60),
3521            1,
3522            lifecycle,
3523        );
3524        let target = target();
3525        let provider = OAuthProviderIdentity::OpenAiChatGpt;
3526        let redirect_uri = "http://127.0.0.1/callback";
3527
3528        authority
3529            .start(
3530                target.clone(),
3531                provider,
3532                redirect_uri.to_string(),
3533                "verifier-1".to_string(),
3534            )
3535            .expect("first browser flow admitted");
3536
3537        assert!(matches!(
3538            authority.start(
3539                alternate_target(),
3540                provider,
3541                "http://127.0.0.1/other-callback".to_string(),
3542                "verifier-2".to_string(),
3543            ),
3544            Err(OAuthFlowError::LifecycleRejected {
3545                operation: "admit_oauth_browser_flow",
3546                ..
3547            })
3548        ));
3549    }
3550
3551    #[test]
3552    fn browser_provider_mismatch_rejection_comes_from_authmachine_lifecycle() {
3553        let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
3554        let authority =
3555            RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_secs(60), lifecycle);
3556        let target = target();
3557        let redirect_uri = "http://127.0.0.1/callback";
3558        let state = authority
3559            .start(
3560                target.clone(),
3561                OAuthProviderIdentity::OpenAiChatGpt,
3562                redirect_uri.to_string(),
3563                "verifier".to_string(),
3564            )
3565            .expect("browser flow admitted");
3566
3567        assert!(matches!(
3568            authority.verify(
3569                &state,
3570                &target,
3571                OAuthProviderIdentity::GoogleCodeAssist,
3572                redirect_uri,
3573            ),
3574            Err(OAuthFlowError::LifecycleRejected {
3575                operation: "verify_oauth_browser_flow",
3576                ..
3577            })
3578        ));
3579    }
3580}