1use std::collections::BTreeSet;
10use std::sync::{Arc, Mutex, Weak};
11use std::time::{Duration, Instant};
12
13use meerkat_auth_core::oauth_flow::{
14 OAuthDeviceFlowRecord, OAuthDevicePollLease, OAuthDevicePollLifecycle, OAuthFlowAuthority,
15 OAuthFlowError, OAuthFlowRecord, OAuthFlowRegistry, OAuthFlowRegistrySnapshot,
16 OAuthProviderIdentity, OAuthPrunedFlows, PersistedOAuthBrowserFlow, PersistedOAuthDeviceFlow,
17};
18use meerkat_core::AuthBindingRef;
19use meerkat_core::handles::{DslTransitionError, LeaseKey};
20use meerkat_core::time_compat::{SystemTime, UNIX_EPOCH};
21
22use crate::auth_machine::dsl as auth_dsl;
23use crate::store::RuntimeStore;
24
25use super::RuntimeAuthLeaseHandle;
26use super::auth_lease::{AuthLeaseReleaseObserver, ReleasedOAuthFlows};
27
/// Shared, lockable slot holding an optional weak reference to the runtime store.
/// `None` means no persistent store has been bound.
type StoreSlot = Arc<Mutex<Option<Weak<dyn RuntimeStore>>>>;
/// Shared mutex with no payload; the release observer holds it while it reads or
/// rewrites OAuth flow payloads.
type PayloadLock = Arc<Mutex<()>>;
/// Snapshot keys of browser flows that should be dropped from the persisted snapshot.
type RemovedBrowserSnapshotKeys = Vec<BrowserSnapshotKey>;
/// Snapshot keys of device flows that should be dropped from the persisted snapshot.
type RemovedDeviceSnapshotKeys = Vec<DeviceSnapshotKey>;
32
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Saturates at `u64::MAX` when the millisecond count does not fit in a `u64`
/// and yields `0` when the clock reports a time before the epoch.
fn current_time_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
39
40fn expires_at_millis(duration: Duration) -> Result<u64, OAuthFlowError> {
41 let duration_millis =
42 u64::try_from(duration.as_millis()).map_err(|_| OAuthFlowError::DeviceExpiryOutOfRange)?;
43 current_time_millis()
44 .checked_add(duration_millis)
45 .ok_or(OAuthFlowError::DeviceExpiryOutOfRange)
46}
47
48fn load_oauth_snapshot_for_release(
49 store: &StoreSlot,
50 operation: &'static str,
51) -> Result<Option<OAuthFlowRegistrySnapshot>, DslTransitionError> {
52 let store = store
53 .lock()
54 .unwrap_or_else(std::sync::PoisonError::into_inner)
55 .clone();
56 let Some(store) = store else {
57 return Ok(None);
58 };
59 let store = store.upgrade().ok_or_else(|| {
60 DslTransitionError::new(operation, "runtime store is no longer available")
61 })?;
62 let Some(bytes) = store
63 .load_auth_oauth_flow_snapshot()
64 .map_err(|err| DslTransitionError::new(operation, err.to_string()))?
65 else {
66 return Ok(None);
67 };
68 serde_json::from_slice::<OAuthFlowRegistrySnapshot>(&bytes)
69 .map(Some)
70 .map_err(|err| DslTransitionError::new(operation, err.to_string()))
71}
72
/// Runtime handle that pairs the in-memory OAuth flow registry with the auth
/// lifecycle machine and an optional persistent store.
#[derive(Debug)]
pub struct RuntimeOAuthFlowHandle {
    /// In-memory registry of outstanding OAuth flows.
    registry: Arc<OAuthFlowRegistry>,
    /// Auth-lease handle through which OAuth lifecycle inputs are applied.
    lifecycle: Arc<RuntimeAuthLeaseHandle>,
    /// Slot for the weakly referenced persistent store; `None` until one is bound.
    store: StoreSlot,
    /// Lock shared with the release observer.
    /// NOTE(review): presumably serializes payload updates with lease release — confirm.
    payload_lock: PayloadLock,
    /// Strong reference to the release observer; the lifecycle is only given a
    /// weak reference to it at construction.
    _release_observer: Option<Arc<OAuthPayloadReleaseObserver>>,
}
81
/// Observer that removes OAuth flow payloads for a target when its auth lease is
/// released. It shares the registry, store slot, and payload lock with the handle
/// that created it.
#[derive(Debug)]
struct OAuthPayloadReleaseObserver {
    /// In-memory registry whose released flows are dropped.
    registry: Arc<OAuthFlowRegistry>,
    /// Slot for the weakly referenced persistent store.
    store: StoreSlot,
    /// Lock held while release payloads are collected or removed.
    payload_lock: PayloadLock,
}
88
89impl AuthLeaseReleaseObserver for OAuthPayloadReleaseObserver {
90 fn oauth_flows_for_release(
91 &self,
92 lease_key: &LeaseKey,
93 ) -> Result<ReleasedOAuthFlows, DslTransitionError> {
94 let _payload_guard = self
95 .payload_lock
96 .lock()
97 .unwrap_or_else(std::sync::PoisonError::into_inner);
98 let target = AuthBindingRef {
99 realm: lease_key.realm.clone(),
100 binding: lease_key.binding.clone(),
101 profile: lease_key.profile.clone(),
102 };
103 let Some(snapshot) =
104 load_oauth_snapshot_for_release(&self.store, "collect_oauth_flow_payloads")?
105 else {
106 return Ok(ReleasedOAuthFlows {
107 lease_key: lease_key.clone(),
108 browser_flow_ids: Vec::new(),
109 device_flow_ids: Vec::new(),
110 });
111 };
112 let now_millis = current_time_millis();
113 Ok(ReleasedOAuthFlows {
114 lease_key: lease_key.clone(),
115 browser_flow_ids: snapshot
116 .browser
117 .iter()
118 .filter(|flow| flow.target == target && flow.expires_at_millis > now_millis)
119 .map(|flow| flow.state.clone())
120 .collect(),
121 device_flow_ids: snapshot
122 .device
123 .iter()
124 .filter(|flow| flow.target == target && flow.expires_at_millis > now_millis)
125 .map(|flow| flow.device_code.clone())
126 .collect(),
127 })
128 }
129
130 fn auth_lease_released(&self, released: &ReleasedOAuthFlows) -> Result<(), DslTransitionError> {
131 let _payload_guard = self
132 .payload_lock
133 .lock()
134 .unwrap_or_else(std::sync::PoisonError::into_inner);
135 let target = AuthBindingRef {
136 realm: released.lease_key.realm.clone(),
137 binding: released.lease_key.binding.clone(),
138 profile: released.lease_key.profile.clone(),
139 };
140 let browser_flow_ids = released
141 .browser_flow_ids
142 .iter()
143 .map(String::as_str)
144 .collect::<BTreeSet<_>>();
145 let device_flow_ids = released
146 .device_flow_ids
147 .iter()
148 .map(String::as_str)
149 .collect::<BTreeSet<_>>();
150 let now_millis = current_time_millis();
151 let mut snapshot = self.registry.snapshot_for_persistence(now_millis);
152 let mut removed_browser = snapshot
153 .browser
154 .iter()
155 .filter(|flow| flow.target == target && browser_flow_ids.contains(flow.state.as_str()))
156 .map(persisted_browser_snapshot_key)
157 .collect::<BTreeSet<_>>();
158 let mut removed_device = snapshot
159 .device
160 .iter()
161 .filter(|flow| {
162 flow.target == target && device_flow_ids.contains(flow.device_code.as_str())
163 })
164 .map(persisted_device_snapshot_key)
165 .collect::<BTreeSet<_>>();
166 if let Some(durable) =
167 load_oauth_snapshot_for_release(&self.store, "release_oauth_flow_payloads")?
168 {
169 removed_browser.extend(
170 durable
171 .browser
172 .iter()
173 .filter(|flow| {
174 flow.target == target
175 && browser_flow_ids.contains(flow.state.as_str())
176 && flow.expires_at_millis > now_millis
177 })
178 .map(persisted_browser_snapshot_key),
179 );
180 removed_device.extend(
181 durable
182 .device
183 .iter()
184 .filter(|flow| {
185 flow.target == target
186 && device_flow_ids.contains(flow.device_code.as_str())
187 && flow.expires_at_millis > now_millis
188 })
189 .map(persisted_device_snapshot_key),
190 );
191 }
192 let removed_browser = removed_browser.into_iter().collect::<Vec<_>>();
193 let removed_device = removed_device.into_iter().collect::<Vec<_>>();
194 snapshot.browser.retain(|flow| {
195 !(flow.target == target && browser_flow_ids.contains(flow.state.as_str()))
196 });
197 snapshot.device.retain(|flow| {
198 !(flow.target == target && device_flow_ids.contains(flow.device_code.as_str()))
199 });
200 persist_registry_snapshot(
201 &snapshot,
202 &self.store,
203 "release_oauth_flow_payloads",
204 &removed_browser,
205 &removed_device,
206 now_millis,
207 SnapshotPersistPolicy::merge(),
208 )
209 .map_err(|err| {
210 DslTransitionError::new(
211 "AuthLeaseReleaseObserver::release_oauth_flow_payloads",
212 err.to_string(),
213 )
214 })?;
215 let _ = self.registry.retain_flows_with_lifecycle(
216 |record_target, flow_id| {
217 !(record_target == &target && browser_flow_ids.contains(flow_id))
218 },
219 |record_target, device_code| {
220 !(record_target == &target && device_flow_ids.contains(device_code))
221 },
222 );
223 Ok(())
224 }
225}
226
227impl RuntimeOAuthFlowHandle {
228 pub fn new(ttl: Duration) -> Self {
229 Self::new_with_auth_lease(ttl, Arc::new(RuntimeAuthLeaseHandle::new()))
230 }
231
232 pub fn new_with_auth_lease(ttl: Duration, lifecycle: Arc<RuntimeAuthLeaseHandle>) -> Self {
233 Self::new_with_capacity_auth_lease_and_store(ttl, 1024, lifecycle, None)
234 }
235
236 pub fn new_with_capacity(ttl: Duration, max_outstanding: usize) -> Self {
237 Self::new_with_capacity_and_auth_lease(
238 ttl,
239 max_outstanding,
240 Arc::new(RuntimeAuthLeaseHandle::new()),
241 )
242 }
243
244 pub fn new_with_capacity_and_auth_lease(
245 ttl: Duration,
246 max_outstanding: usize,
247 lifecycle: Arc<RuntimeAuthLeaseHandle>,
248 ) -> Self {
249 Self::new_with_capacity_auth_lease_and_store(ttl, max_outstanding, lifecycle, None)
250 }
251
252 pub fn new_with_persistent_store_and_auth_lease(
253 ttl: Duration,
254 lifecycle: Arc<RuntimeAuthLeaseHandle>,
255 store: &Arc<dyn RuntimeStore>,
256 ) -> Self {
257 Self::new_with_capacity_auth_lease_and_store(
258 ttl,
259 1024,
260 lifecycle,
261 Some(Arc::downgrade(store)),
262 )
263 }
264
265 fn new_with_capacity_auth_lease_and_store(
266 ttl: Duration,
267 max_outstanding: usize,
268 lifecycle: Arc<RuntimeAuthLeaseHandle>,
269 store: Option<Weak<dyn RuntimeStore>>,
270 ) -> Self {
271 let registry = Arc::new(OAuthFlowRegistry::new_with_capacity(ttl, max_outstanding));
272 let store = Arc::new(Mutex::new(store));
273 let payload_lock = Arc::new(Mutex::new(()));
274 let release_observer = Arc::new(OAuthPayloadReleaseObserver {
275 registry: Arc::clone(®istry),
276 store: Arc::clone(&store),
277 payload_lock: Arc::clone(&payload_lock),
278 });
279 let release_observer_dyn: Arc<dyn AuthLeaseReleaseObserver> = release_observer.clone();
280 lifecycle.add_release_observer(Arc::downgrade(&release_observer_dyn));
281 let handle = Self {
282 registry,
283 lifecycle,
284 store,
285 payload_lock,
286 _release_observer: Some(release_observer),
287 };
288 handle.rehydrate_persisted_payloads();
289 handle
290 }
291
292 pub(crate) fn bind_persistent_store(&self, store: &Arc<dyn RuntimeStore>) {
293 *self
294 .store
295 .lock()
296 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::downgrade(store));
297 }
298
299 fn apply(
300 &self,
301 target: &AuthBindingRef,
302 input: auth_dsl::AuthMachineInput,
303 operation: &'static str,
304 create_if_missing: bool,
305 ) -> Result<(), OAuthFlowError> {
306 self.lifecycle
307 .apply_oauth_input(target, input, operation, create_if_missing)
308 .map_err(|err| OAuthFlowError::LifecycleRejected {
309 operation,
310 detail: err.to_string(),
311 })
312 }
313
314 fn admit_browser(
315 &self,
316 target: &AuthBindingRef,
317 flow_id: &str,
318 provider: OAuthProviderIdentity,
319 redirect_uri: &str,
320 expires_at_millis: u64,
321 ) -> Result<(), OAuthFlowError> {
322 self.apply(
323 target,
324 auth_dsl::AuthMachineInput::AdmitOAuthBrowserFlow {
325 flow_id: flow_id.to_string(),
326 provider: provider.canonical_alias().to_string(),
327 redirect_uri: redirect_uri.to_string(),
328 expires_at_millis,
329 max_outstanding_flows: self.registry.max_outstanding() as u64,
330 },
331 "admit_oauth_browser_flow",
332 true,
333 )
334 }
335
336 fn verify_browser(
337 &self,
338 target: &AuthBindingRef,
339 flow_id: &str,
340 provider: OAuthProviderIdentity,
341 redirect_uri: &str,
342 ) -> Result<(), OAuthFlowError> {
343 self.apply(
344 target,
345 auth_dsl::AuthMachineInput::VerifyOAuthBrowserFlow {
346 flow_id: flow_id.to_string(),
347 provider: provider.canonical_alias().to_string(),
348 redirect_uri: redirect_uri.to_string(),
349 now_millis: current_time_millis(),
350 },
351 "verify_oauth_browser_flow",
352 false,
353 )
354 }
355
356 fn consume_browser(
357 &self,
358 target: &AuthBindingRef,
359 flow_id: &str,
360 provider: OAuthProviderIdentity,
361 redirect_uri: &str,
362 ) -> Result<(), OAuthFlowError> {
363 self.apply(
364 target,
365 auth_dsl::AuthMachineInput::ConsumeOAuthBrowserFlow {
366 flow_id: flow_id.to_string(),
367 provider: provider.canonical_alias().to_string(),
368 redirect_uri: redirect_uri.to_string(),
369 now_millis: current_time_millis(),
370 },
371 "consume_oauth_browser_flow",
372 false,
373 )
374 }
375
376 fn expire_browser(&self, target: &AuthBindingRef, flow_id: &str) -> Result<(), OAuthFlowError> {
377 self.apply(
378 target,
379 auth_dsl::AuthMachineInput::ExpireOAuthBrowserFlow {
380 flow_id: flow_id.to_string(),
381 },
382 "expire_oauth_browser_flow",
383 false,
384 )
385 }
386
387 fn admit_device(
388 &self,
389 target: &AuthBindingRef,
390 flow_id: &str,
391 provider: OAuthProviderIdentity,
392 expires_at_millis: u64,
393 ) -> Result<(), OAuthFlowError> {
394 self.apply(
395 target,
396 auth_dsl::AuthMachineInput::AdmitOAuthDeviceFlow {
397 flow_id: flow_id.to_string(),
398 provider: provider.canonical_alias().to_string(),
399 expires_at_millis,
400 max_outstanding_flows: self.registry.max_outstanding() as u64,
401 },
402 "admit_oauth_device_flow",
403 true,
404 )
405 }
406
407 fn verify_device(
408 &self,
409 target: &AuthBindingRef,
410 flow_id: &str,
411 provider: OAuthProviderIdentity,
412 ) -> Result<(), OAuthFlowError> {
413 self.apply(
414 target,
415 auth_dsl::AuthMachineInput::VerifyOAuthDeviceFlow {
416 flow_id: flow_id.to_string(),
417 provider: provider.canonical_alias().to_string(),
418 now_millis: current_time_millis(),
419 },
420 "verify_oauth_device_flow",
421 false,
422 )
423 }
424
425 fn begin_device_poll(
426 &self,
427 target: &AuthBindingRef,
428 flow_id: &str,
429 provider: OAuthProviderIdentity,
430 ) -> Result<(), OAuthFlowError> {
431 self.apply(
432 target,
433 auth_dsl::AuthMachineInput::BeginOAuthDevicePoll {
434 flow_id: flow_id.to_string(),
435 provider: provider.canonical_alias().to_string(),
436 now_millis: current_time_millis(),
437 },
438 "begin_oauth_device_poll",
439 false,
440 )
441 }
442
443 fn expire_pruned_flows(&self) {
444 self.expire_collected_flows(OAuthPrunedFlows {
445 browser: self.registry.prune_expired_browser_flows(),
446 device: self.registry.prune_expired_device_flows(),
447 });
448 }
449
450 fn retain_registry_payloads_with_lifecycle(
451 &self,
452 ) -> (OAuthPrunedFlows, OAuthFlowRegistrySnapshot) {
453 let before = self
454 .registry
455 .snapshot_for_persistence(current_time_millis());
456 let pruned = self.registry.retain_flows_with_lifecycle(
457 |target, flow_id| self.lifecycle.has_oauth_browser_flow(target, flow_id),
458 |target, flow_id| self.lifecycle.has_oauth_device_flow(target, flow_id),
459 );
460 (pruned, before)
461 }
462
463 fn expire_collected_flows(&self, pruned: OAuthPrunedFlows) {
464 for (flow_id, target) in pruned.browser {
465 let _ = self.expire_browser(&target, &flow_id);
466 }
467 for (device_code, target) in pruned.device {
468 let _ = self.lifecycle.expire_device_flow(&target, &device_code);
469 }
470 }
471
472 fn removed_snapshot_keys_from_pruned(
473 snapshot: &OAuthFlowRegistrySnapshot,
474 pruned: &OAuthPrunedFlows,
475 ) -> (RemovedBrowserSnapshotKeys, RemovedDeviceSnapshotKeys) {
476 let pruned_browser = pruned
477 .browser
478 .iter()
479 .map(|(flow_id, target)| browser_snapshot_key(target, flow_id))
480 .collect::<BTreeSet<_>>();
481 let pruned_device = pruned
482 .device
483 .iter()
484 .map(|(device_code, target)| device_snapshot_key(target, device_code))
485 .collect::<BTreeSet<_>>();
486 let browser = snapshot
487 .browser
488 .iter()
489 .filter(|flow| pruned_browser.contains(&persisted_browser_snapshot_key(flow)))
490 .map(persisted_browser_snapshot_key)
491 .collect();
492 let device = snapshot
493 .device
494 .iter()
495 .filter(|flow| pruned_device.contains(&persisted_device_snapshot_key(flow)))
496 .map(persisted_device_snapshot_key)
497 .collect();
498 (browser, device)
499 }
500
501 fn store(&self) -> Option<Arc<dyn RuntimeStore>> {
502 self.store
503 .lock()
504 .unwrap_or_else(std::sync::PoisonError::into_inner)
505 .as_ref()
506 .and_then(Weak::upgrade)
507 }
508
509 fn persist_registry_payloads_removing(
510 &self,
511 operation: &'static str,
512 removed_browser: &[BrowserSnapshotKey],
513 removed_device: &[DeviceSnapshotKey],
514 ) -> Result<(), OAuthFlowError> {
515 persist_registry_payloads(
516 &self.registry,
517 &self.store,
518 operation,
519 removed_browser,
520 removed_device,
521 )
522 }
523
524 fn persist_registry_payloads_claiming_removal(
525 &self,
526 operation: &'static str,
527 removed_browser: &[BrowserSnapshotKey],
528 removed_device: &[DeviceSnapshotKey],
529 ) -> Result<(), OAuthFlowError> {
530 persist_registry_payloads_claiming_removal(
531 &self.registry,
532 &self.store,
533 operation,
534 removed_browser,
535 removed_device,
536 )
537 }
538
539 fn persist_registry_payloads_claiming_admission(
540 &self,
541 operation: &'static str,
542 removed_browser: &[BrowserSnapshotKey],
543 removed_device: &[DeviceSnapshotKey],
544 admitted_browser: &[BrowserSnapshotKey],
545 admitted_device: &[DeviceSnapshotKey],
546 ) -> Result<(), OAuthFlowError> {
547 persist_registry_payloads_claiming_admission(
548 &self.registry,
549 &self.store,
550 operation,
551 removed_browser,
552 removed_device,
553 admitted_browser,
554 admitted_device,
555 )
556 }
557
558 fn browser_record_expires_at_millis(
559 &self,
560 record: &OAuthFlowRecord,
561 ) -> Result<u64, OAuthFlowError> {
562 let remaining = self
563 .registry
564 .ttl()
565 .checked_sub(record.created_at.elapsed())
566 .ok_or(OAuthFlowError::Missing)?;
567 expires_at_millis(remaining)
568 }
569
570 fn restore_browser_flow(
571 &self,
572 state: &str,
573 record: &OAuthFlowRecord,
574 ) -> Result<(), OAuthFlowError> {
575 let expires_at_millis = self.browser_record_expires_at_millis(record)?;
576 self.admit_browser(
577 &record.target,
578 state,
579 record.provider,
580 &record.redirect_uri,
581 expires_at_millis,
582 )?;
583 self.registry.insert_restored_browser_flow(
584 state.to_string(),
585 record.target.clone(),
586 record.provider,
587 record.redirect_uri.clone(),
588 record.pkce_verifier.clone(),
589 record.created_at,
590 )
591 }
592
593 fn rehydrate_persisted_payloads(&self) {
594 let Some(store) = self.store() else {
595 return;
596 };
597 let Ok(Some(bytes)) = store.load_auth_oauth_flow_snapshot() else {
598 return;
599 };
600 let Ok(snapshot) = serde_json::from_slice::<OAuthFlowRegistrySnapshot>(&bytes) else {
601 return;
602 };
603 let now_millis = current_time_millis();
604 let now_instant = Instant::now();
605
606 for persisted in snapshot.browser.iter().cloned() {
607 self.restore_browser_payload(persisted, now_millis, now_instant);
608 }
609 for persisted in snapshot.device.iter().cloned() {
610 self.restore_device_payload(persisted, now_millis, now_instant);
611 }
612 let current = self.registry.snapshot_for_persistence(now_millis);
613 let current_browser = current
614 .browser
615 .iter()
616 .map(persisted_browser_snapshot_key)
617 .collect::<BTreeSet<_>>();
618 let current_device = current
619 .device
620 .iter()
621 .map(persisted_device_snapshot_key)
622 .collect::<BTreeSet<_>>();
623 let removed_browser = snapshot
624 .browser
625 .iter()
626 .filter(|flow| !current_browser.contains(&persisted_browser_snapshot_key(flow)))
627 .map(persisted_browser_snapshot_key)
628 .collect::<Vec<_>>();
629 let removed_device = snapshot
630 .device
631 .iter()
632 .filter(|flow| !current_device.contains(&persisted_device_snapshot_key(flow)))
633 .map(persisted_device_snapshot_key)
634 .collect::<Vec<_>>();
635 let _ = self.persist_registry_payloads_removing(
636 "rehydrate_oauth_flows",
637 &removed_browser,
638 &removed_device,
639 );
640 }
641
642 fn sync_persisted_payloads(&self, operation: &'static str) -> Result<(), OAuthFlowError> {
643 let Some(store) = self.store() else {
644 return Ok(());
645 };
646 let snapshot =
647 match store.load_auth_oauth_flow_snapshot().map_err(|err| {
648 OAuthFlowError::PersistenceFailed {
649 operation,
650 detail: err.to_string(),
651 }
652 })? {
653 Some(bytes) => serde_json::from_slice::<OAuthFlowRegistrySnapshot>(&bytes)
654 .map_err(|err| OAuthFlowError::PersistenceFailed {
655 operation,
656 detail: err.to_string(),
657 })?,
658 None => OAuthFlowRegistrySnapshot::default(),
659 };
660 let now_millis = current_time_millis();
661 let now_instant = Instant::now();
662 let durable_browser = snapshot
663 .browser
664 .iter()
665 .filter(|flow| flow.expires_at_millis > now_millis)
666 .map(persisted_browser_snapshot_key)
667 .collect::<BTreeSet<_>>();
668 let durable_device = snapshot
669 .device
670 .iter()
671 .filter(|flow| flow.expires_at_millis > now_millis)
672 .map(persisted_device_snapshot_key)
673 .collect::<BTreeSet<_>>();
674 let current = self.registry.snapshot_for_persistence(now_millis);
675 for flow in current
676 .browser
677 .iter()
678 .filter(|flow| !durable_browser.contains(&persisted_browser_snapshot_key(flow)))
679 {
680 if let Some(provider) = OAuthProviderIdentity::from_alias(&flow.provider) {
681 let _ =
682 self.registry
683 .consume(&flow.state, &flow.target, provider, &flow.redirect_uri);
684 }
685 let _ = self.expire_browser(&flow.target, &flow.state);
686 }
687 for flow in current
688 .device
689 .iter()
690 .filter(|flow| !durable_device.contains(&persisted_device_snapshot_key(flow)))
691 {
692 if let Some(provider) = OAuthProviderIdentity::from_alias(&flow.provider) {
693 let _ = self
694 .registry
695 .expire_device_code(&flow.device_code, &flow.target, provider);
696 }
697 let _ = self
698 .lifecycle
699 .expire_device_flow(&flow.target, &flow.device_code);
700 }
701 let current = self.registry.snapshot_for_persistence(now_millis);
702 let current_browser = current
703 .browser
704 .iter()
705 .map(persisted_browser_snapshot_key)
706 .collect::<BTreeSet<_>>();
707 let current_device = current
708 .device
709 .iter()
710 .map(persisted_device_snapshot_key)
711 .collect::<BTreeSet<_>>();
712 for persisted in snapshot
713 .browser
714 .iter()
715 .filter(|flow| {
716 flow.expires_at_millis > now_millis
717 && !current_browser.contains(&persisted_browser_snapshot_key(flow))
718 })
719 .cloned()
720 {
721 self.restore_browser_payload(persisted, now_millis, now_instant);
722 }
723 for persisted in snapshot
724 .device
725 .iter()
726 .filter(|flow| {
727 flow.expires_at_millis > now_millis
728 && !current_device.contains(&persisted_device_snapshot_key(flow))
729 })
730 .cloned()
731 {
732 self.restore_device_payload(persisted, now_millis, now_instant);
733 }
734 Ok(())
735 }
736
737 fn restore_browser_payload(
738 &self,
739 persisted: PersistedOAuthBrowserFlow,
740 now_millis: u64,
741 now_instant: Instant,
742 ) {
743 if persisted.expires_at_millis <= now_millis {
744 return;
745 }
746 let Some(provider) = OAuthProviderIdentity::from_alias(&persisted.provider) else {
747 return;
748 };
749 let remaining = Duration::from_millis(persisted.expires_at_millis - now_millis);
750 let elapsed = self.registry.ttl().saturating_sub(remaining);
751 let created_at = now_instant.checked_sub(elapsed).unwrap_or(now_instant);
752 if self
753 .admit_browser(
754 &persisted.target,
755 &persisted.state,
756 provider,
757 &persisted.redirect_uri,
758 persisted.expires_at_millis,
759 )
760 .is_err()
761 {
762 return;
763 }
764 if self
765 .registry
766 .insert_restored_browser_flow(
767 persisted.state.clone(),
768 persisted.target.clone(),
769 provider,
770 persisted.redirect_uri.clone(),
771 persisted.pkce_verifier.clone(),
772 created_at,
773 )
774 .is_err()
775 {
776 let _ = self.expire_browser(&persisted.target, &persisted.state);
777 }
778 }
779
780 fn restore_device_payload(
781 &self,
782 persisted: PersistedOAuthDeviceFlow,
783 now_millis: u64,
784 now_instant: Instant,
785 ) {
786 if persisted.expires_at_millis <= now_millis {
787 return;
788 }
789 let Some(provider) = OAuthProviderIdentity::from_alias(&persisted.provider) else {
790 return;
791 };
792 let remaining = Duration::from_millis(persisted.expires_at_millis - now_millis);
793 let expires_at = now_instant.checked_add(remaining).unwrap_or(now_instant);
794 let elapsed = Duration::from_millis(now_millis.saturating_sub(persisted.created_at_millis));
795 let created_at = now_instant.checked_sub(elapsed).unwrap_or(now_instant);
796 if self
797 .admit_device(
798 &persisted.target,
799 &persisted.device_code,
800 provider,
801 persisted.expires_at_millis,
802 )
803 .is_err()
804 {
805 return;
806 }
807 if self
808 .registry
809 .insert_restored_device_flow(
810 persisted.target.clone(),
811 provider,
812 persisted.device_code.clone(),
813 created_at,
814 expires_at,
815 )
816 .is_err()
817 {
818 let _ = self
819 .lifecycle
820 .expire_device_flow(&persisted.target, &persisted.device_code);
821 }
822 }
823}
824
825fn persist_registry_payloads(
826 registry: &OAuthFlowRegistry,
827 store: &StoreSlot,
828 operation: &'static str,
829 removed_browser: &[BrowserSnapshotKey],
830 removed_device: &[DeviceSnapshotKey],
831) -> Result<(), OAuthFlowError> {
832 let now_millis = current_time_millis();
833 let snapshot = registry.snapshot_for_persistence(now_millis);
834 persist_registry_snapshot(
835 &snapshot,
836 store,
837 operation,
838 removed_browser,
839 removed_device,
840 now_millis,
841 SnapshotPersistPolicy::merge(),
842 )
843}
844
845fn persist_existing_registry_payloads(
846 registry: &OAuthFlowRegistry,
847 store: &StoreSlot,
848 operation: &'static str,
849) -> Result<(), OAuthFlowError> {
850 let now_millis = current_time_millis();
851 let snapshot = registry.snapshot_for_persistence(now_millis);
852 persist_registry_snapshot(
853 &snapshot,
854 store,
855 operation,
856 &[],
857 &[],
858 now_millis,
859 SnapshotPersistPolicy::merge_existing(),
860 )
861}
862
863fn persist_registry_payloads_claiming_removal(
864 registry: &OAuthFlowRegistry,
865 store: &StoreSlot,
866 operation: &'static str,
867 removed_browser: &[BrowserSnapshotKey],
868 removed_device: &[DeviceSnapshotKey],
869) -> Result<(), OAuthFlowError> {
870 let now_millis = current_time_millis();
871 let snapshot = registry.snapshot_for_persistence(now_millis);
872 persist_registry_snapshot(
873 &snapshot,
874 store,
875 operation,
876 removed_browser,
877 removed_device,
878 now_millis,
879 SnapshotPersistPolicy::claim_removal(),
880 )
881}
882
883fn persist_registry_payloads_claiming_admission(
884 registry: &OAuthFlowRegistry,
885 store: &StoreSlot,
886 operation: &'static str,
887 removed_browser: &[BrowserSnapshotKey],
888 removed_device: &[DeviceSnapshotKey],
889 admitted_browser: &[BrowserSnapshotKey],
890 admitted_device: &[DeviceSnapshotKey],
891) -> Result<(), OAuthFlowError> {
892 let now_millis = current_time_millis();
893 let snapshot = registry.snapshot_for_persistence(now_millis);
894 persist_registry_snapshot(
895 &snapshot,
896 store,
897 operation,
898 removed_browser,
899 removed_device,
900 now_millis,
901 SnapshotPersistPolicy::claim_admission(
902 registry.max_outstanding(),
903 admitted_browser,
904 admitted_device,
905 ),
906 )
907}
908
/// How removed keys are treated when a snapshot is merged into the durable copy.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SnapshotRemovalMode {
    /// Removed keys are dropped from the durable snapshot without further checks.
    Merge,
    /// Removed keys must still be active in the durable snapshot; otherwise the
    /// update is rejected.
    Claim,
}
914
/// Options that control how a local snapshot is merged into the durable one.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct SnapshotPersistPolicy<'a> {
    /// Whether removed keys are merely dropped or must be claimed.
    removal_mode: SnapshotRemovalMode,
    /// Capacity limit associated with an admission; `None` when the write is not
    /// an admission.
    admission_capacity: Option<usize>,
    /// Browser flows newly admitted by this write.
    admitted_browser: &'a [BrowserSnapshotKey],
    /// Device flows newly admitted by this write.
    admitted_device: &'a [DeviceSnapshotKey],
}
922
923impl<'a> SnapshotPersistPolicy<'a> {
924 fn merge() -> Self {
925 Self {
926 removal_mode: SnapshotRemovalMode::Merge,
927 admission_capacity: None,
928 admitted_browser: &[],
929 admitted_device: &[],
930 }
931 }
932
933 fn merge_existing() -> Self {
934 Self::merge()
935 }
936
937 fn claim_removal() -> Self {
938 Self {
939 removal_mode: SnapshotRemovalMode::Claim,
940 admission_capacity: None,
941 admitted_browser: &[],
942 admitted_device: &[],
943 }
944 }
945
946 fn claim_admission(
947 max_outstanding: usize,
948 admitted_browser: &'a [BrowserSnapshotKey],
949 admitted_device: &'a [DeviceSnapshotKey],
950 ) -> Self {
951 Self {
952 removal_mode: SnapshotRemovalMode::Merge,
953 admission_capacity: Some(max_outstanding),
954 admitted_browser,
955 admitted_device,
956 }
957 }
958}
959
960fn persist_registry_snapshot(
961 snapshot: &OAuthFlowRegistrySnapshot,
962 store: &StoreSlot,
963 operation: &'static str,
964 removed_browser: &[BrowserSnapshotKey],
965 removed_device: &[DeviceSnapshotKey],
966 now_millis: u64,
967 policy: SnapshotPersistPolicy<'_>,
968) -> Result<(), OAuthFlowError> {
969 let store = store
970 .lock()
971 .unwrap_or_else(std::sync::PoisonError::into_inner)
972 .clone();
973 let Some(store) = store else {
974 return Ok(());
975 };
976 let Some(store) = store.upgrade() else {
977 return Err(OAuthFlowError::PersistenceFailed {
978 operation,
979 detail: "runtime store is no longer available".to_string(),
980 });
981 };
982 let mut update = |current: Option<&[u8]>| -> Result<Vec<u8>, crate::store::RuntimeStoreError> {
983 let merged = merge_oauth_registry_snapshot(
984 current,
985 snapshot,
986 removed_browser,
987 removed_device,
988 now_millis,
989 policy,
990 )?;
991 serde_json::to_vec(&merged)
992 .map_err(|err| crate::store::RuntimeStoreError::WriteFailed(err.to_string()))
993 };
994 match store.update_auth_oauth_flow_snapshot(&mut update) {
995 Ok(()) => Ok(()),
996 Err(crate::store::RuntimeStoreError::NotFound(_))
997 if policy.removal_mode == SnapshotRemovalMode::Claim =>
998 {
999 Err(OAuthFlowError::RegistryProjectionMissing { operation })
1000 }
1001 Err(crate::store::RuntimeStoreError::Internal(detail))
1002 if policy.admission_capacity.is_some() && detail == DURABLE_OAUTH_CAPACITY_EXCEEDED =>
1003 {
1004 Err(OAuthFlowError::CapacityExceeded {
1005 max_outstanding: policy.admission_capacity.unwrap_or(0),
1006 })
1007 }
1008 Err(err) => Err(OAuthFlowError::PersistenceFailed {
1009 operation,
1010 detail: err.to_string(),
1011 }),
1012 }
1013}
1014
/// Identity of a persisted browser flow: `(realm, binding, profile, state)`.
type BrowserSnapshotKey = (String, String, Option<String>, String);
/// Identity of a persisted device flow: `(realm, binding, profile, device_code)`.
type DeviceSnapshotKey = (String, String, Option<String>, String);
/// Marker text carried by a store `Internal` error when the merged snapshot
/// exceeds capacity; it is matched by exact string comparison when persisting.
const DURABLE_OAUTH_CAPACITY_EXCEEDED: &str = "oauth durable capacity exceeded";
1018
1019fn target_snapshot_key(target: &AuthBindingRef) -> (String, String, Option<String>) {
1020 (
1021 target.realm.to_string(),
1022 target.binding.to_string(),
1023 target.profile.as_ref().map(ToString::to_string),
1024 )
1025}
1026
1027fn browser_snapshot_key(target: &AuthBindingRef, state: &str) -> BrowserSnapshotKey {
1028 let (realm, binding, profile) = target_snapshot_key(target);
1029 (realm, binding, profile, state.to_string())
1030}
1031
1032fn device_snapshot_key(target: &AuthBindingRef, device_code: &str) -> DeviceSnapshotKey {
1033 let (realm, binding, profile) = target_snapshot_key(target);
1034 (realm, binding, profile, device_code.to_string())
1035}
1036
1037fn persisted_browser_snapshot_key(flow: &PersistedOAuthBrowserFlow) -> BrowserSnapshotKey {
1038 browser_snapshot_key(&flow.target, &flow.state)
1039}
1040
1041fn persisted_device_snapshot_key(flow: &PersistedOAuthDeviceFlow) -> DeviceSnapshotKey {
1042 device_snapshot_key(&flow.target, &flow.device_code)
1043}
1044
1045fn ensure_removed_flows_are_active(
1046 current: &OAuthFlowRegistrySnapshot,
1047 removed_browser: &BTreeSet<BrowserSnapshotKey>,
1048 removed_device: &BTreeSet<DeviceSnapshotKey>,
1049 now_millis: u64,
1050) -> Result<(), crate::store::RuntimeStoreError> {
1051 let active_browser = current
1052 .browser
1053 .iter()
1054 .filter(|flow| flow.expires_at_millis > now_millis)
1055 .map(persisted_browser_snapshot_key)
1056 .collect::<BTreeSet<_>>();
1057 for key in removed_browser {
1058 if !active_browser.contains(key) {
1059 return Err(crate::store::RuntimeStoreError::NotFound(
1060 "oauth browser flow was already consumed".to_string(),
1061 ));
1062 }
1063 }
1064
1065 let active_device = current
1066 .device
1067 .iter()
1068 .filter(|flow| flow.expires_at_millis > now_millis)
1069 .map(persisted_device_snapshot_key)
1070 .collect::<BTreeSet<_>>();
1071 for key in removed_device {
1072 if !active_device.contains(key) {
1073 return Err(crate::store::RuntimeStoreError::NotFound(
1074 "oauth device flow was already consumed".to_string(),
1075 ));
1076 }
1077 }
1078 Ok(())
1079}
1080
1081fn ensure_merged_snapshot_within_capacity(
1082 merged: &OAuthFlowRegistrySnapshot,
1083 max_outstanding: usize,
1084) -> Result<(), crate::store::RuntimeStoreError> {
1085 if merged.browser.len().saturating_add(merged.device.len()) > max_outstanding {
1086 return Err(crate::store::RuntimeStoreError::Internal(
1087 DURABLE_OAUTH_CAPACITY_EXCEEDED.to_string(),
1088 ));
1089 }
1090 Ok(())
1091}
1092
1093fn merge_oauth_registry_snapshot(
1094 current: Option<&[u8]>,
1095 local: &OAuthFlowRegistrySnapshot,
1096 removed_browser: &[BrowserSnapshotKey],
1097 removed_device: &[DeviceSnapshotKey],
1098 now_millis: u64,
1099 policy: SnapshotPersistPolicy<'_>,
1100) -> Result<OAuthFlowRegistrySnapshot, crate::store::RuntimeStoreError> {
1101 let mut merged = match current {
1102 Some(bytes) => serde_json::from_slice::<OAuthFlowRegistrySnapshot>(bytes)
1103 .map_err(|err| crate::store::RuntimeStoreError::WriteFailed(err.to_string()))?,
1104 None => OAuthFlowRegistrySnapshot::default(),
1105 };
1106 let removed_browser = removed_browser.iter().cloned().collect::<BTreeSet<_>>();
1107 let removed_device = removed_device.iter().cloned().collect::<BTreeSet<_>>();
1108 let admitted_browser = policy
1109 .admitted_browser
1110 .iter()
1111 .cloned()
1112 .collect::<BTreeSet<_>>();
1113 let admitted_device = policy
1114 .admitted_device
1115 .iter()
1116 .cloned()
1117 .collect::<BTreeSet<_>>();
1118 if policy.removal_mode == SnapshotRemovalMode::Claim {
1119 ensure_removed_flows_are_active(&merged, &removed_browser, &removed_device, now_millis)?;
1120 }
1121 let current_browser = merged
1122 .browser
1123 .iter()
1124 .filter(|flow| flow.expires_at_millis > now_millis)
1125 .map(persisted_browser_snapshot_key)
1126 .collect::<BTreeSet<_>>();
1127 let current_device = merged
1128 .device
1129 .iter()
1130 .filter(|flow| flow.expires_at_millis > now_millis)
1131 .map(persisted_device_snapshot_key)
1132 .collect::<BTreeSet<_>>();
1133 let local_browser = local
1134 .browser
1135 .iter()
1136 .map(persisted_browser_snapshot_key)
1137 .collect::<BTreeSet<_>>();
1138 let local_device = local
1139 .device
1140 .iter()
1141 .map(persisted_device_snapshot_key)
1142 .collect::<BTreeSet<_>>();
1143
1144 merged.browser.retain(|flow| {
1145 flow.expires_at_millis > now_millis
1146 && !removed_browser.contains(&persisted_browser_snapshot_key(flow))
1147 && !local_browser.contains(&persisted_browser_snapshot_key(flow))
1148 });
1149 merged.device.retain(|flow| {
1150 flow.expires_at_millis > now_millis
1151 && !removed_device.contains(&persisted_device_snapshot_key(flow))
1152 && !local_device.contains(&persisted_device_snapshot_key(flow))
1153 });
1154 merged.browser.extend(
1155 local
1156 .browser
1157 .iter()
1158 .filter(|flow| {
1159 let key = persisted_browser_snapshot_key(flow);
1160 flow.expires_at_millis > now_millis
1161 && !removed_browser.contains(&key)
1162 && (current_browser.contains(&key) || admitted_browser.contains(&key))
1163 })
1164 .cloned(),
1165 );
1166 merged.device.extend(
1167 local
1168 .device
1169 .iter()
1170 .filter(|flow| {
1171 let key = persisted_device_snapshot_key(flow);
1172 flow.expires_at_millis > now_millis
1173 && !removed_device.contains(&key)
1174 && (current_device.contains(&key) || admitted_device.contains(&key))
1175 })
1176 .cloned(),
1177 );
1178 merged.browser.sort_by_key(persisted_browser_snapshot_key);
1179 merged.device.sort_by_key(persisted_device_snapshot_key);
1180 if let Some(max_outstanding) = policy.admission_capacity {
1181 ensure_merged_snapshot_within_capacity(&merged, max_outstanding)?;
1182 }
1183 Ok(merged)
1184}
1185
1186struct RuntimeOAuthDevicePollLifecycle {
1187 lifecycle: Arc<RuntimeAuthLeaseHandle>,
1188 registry: Arc<OAuthFlowRegistry>,
1189 store: StoreSlot,
1190}
1191
1192impl Default for RuntimeOAuthFlowHandle {
1193 fn default() -> Self {
1194 Self::new(Duration::from_secs(10 * 60))
1195 }
1196}
1197
1198impl OAuthDevicePollLifecycle for RuntimeAuthLeaseHandle {
1199 fn device_flow_state_is_authmachine_owned(&self) -> bool {
1200 true
1201 }
1202
1203 fn finish_device_poll(
1204 &self,
1205 target: &AuthBindingRef,
1206 device_code: &str,
1207 ) -> Result<(), OAuthFlowError> {
1208 self.apply_oauth_input(
1209 target,
1210 auth_dsl::AuthMachineInput::FinishOAuthDevicePoll {
1211 flow_id: device_code.to_string(),
1212 },
1213 "finish_oauth_device_poll",
1214 false,
1215 )
1216 .map_err(|err| OAuthFlowError::LifecycleRejected {
1217 operation: "finish_oauth_device_poll",
1218 detail: err.to_string(),
1219 })
1220 }
1221
1222 fn consume_device_flow(
1223 &self,
1224 target: &AuthBindingRef,
1225 device_code: &str,
1226 provider: OAuthProviderIdentity,
1227 ) -> Result<(), OAuthFlowError> {
1228 self.apply_oauth_input(
1229 target,
1230 auth_dsl::AuthMachineInput::ConsumeOAuthDeviceFlow {
1231 flow_id: device_code.to_string(),
1232 provider: provider.canonical_alias().to_string(),
1233 now_millis: current_time_millis(),
1234 },
1235 "consume_oauth_device_flow",
1236 false,
1237 )
1238 .map_err(|err| OAuthFlowError::LifecycleRejected {
1239 operation: "consume_oauth_device_flow",
1240 detail: err.to_string(),
1241 })
1242 }
1243
1244 fn expire_device_flow(
1245 &self,
1246 target: &AuthBindingRef,
1247 device_code: &str,
1248 ) -> Result<(), OAuthFlowError> {
1249 self.apply_oauth_input(
1250 target,
1251 auth_dsl::AuthMachineInput::ExpireOAuthDeviceFlow {
1252 flow_id: device_code.to_string(),
1253 },
1254 "expire_oauth_device_flow",
1255 false,
1256 )
1257 .map_err(|err| OAuthFlowError::LifecycleRejected {
1258 operation: "expire_oauth_device_flow",
1259 detail: err.to_string(),
1260 })
1261 }
1262
1263 fn restore_device_flow(&self, record: &OAuthDeviceFlowRecord) -> Result<(), OAuthFlowError> {
1264 let remaining = record
1265 .expires_at
1266 .checked_duration_since(Instant::now())
1267 .ok_or(OAuthFlowError::Missing)?;
1268 let expires_at_millis = expires_at_millis(remaining)?;
1269 self.apply_oauth_input(
1270 &record.target,
1271 auth_dsl::AuthMachineInput::AdmitOAuthDeviceFlow {
1272 flow_id: record.device_code.clone(),
1273 provider: record.provider.canonical_alias().to_string(),
1274 expires_at_millis,
1275 max_outstanding_flows: u64::MAX,
1276 },
1277 "restore_oauth_device_flow",
1278 true,
1279 )
1280 .map_err(|err| OAuthFlowError::LifecycleRejected {
1281 operation: "restore_oauth_device_flow",
1282 detail: err.to_string(),
1283 })
1284 }
1285}
1286
1287impl OAuthDevicePollLifecycle for RuntimeOAuthDevicePollLifecycle {
1288 fn device_flow_state_is_authmachine_owned(&self) -> bool {
1289 true
1290 }
1291
1292 fn finish_device_poll(
1293 &self,
1294 target: &AuthBindingRef,
1295 device_code: &str,
1296 ) -> Result<(), OAuthFlowError> {
1297 self.lifecycle.finish_device_poll(target, device_code)
1298 }
1299
1300 fn consume_device_flow(
1301 &self,
1302 target: &AuthBindingRef,
1303 device_code: &str,
1304 provider: OAuthProviderIdentity,
1305 ) -> Result<(), OAuthFlowError> {
1306 self.lifecycle
1307 .consume_device_flow(target, device_code, provider)
1308 }
1309
1310 fn expire_device_flow(
1311 &self,
1312 target: &AuthBindingRef,
1313 device_code: &str,
1314 ) -> Result<(), OAuthFlowError> {
1315 self.lifecycle.expire_device_flow(target, device_code)
1316 }
1317
1318 fn restore_device_flow(&self, record: &OAuthDeviceFlowRecord) -> Result<(), OAuthFlowError> {
1319 let remaining = record
1320 .expires_at
1321 .checked_duration_since(Instant::now())
1322 .ok_or(OAuthFlowError::Missing)?;
1323 let expires_at_millis = expires_at_millis(remaining)?;
1324 self.lifecycle
1325 .apply_oauth_input(
1326 &record.target,
1327 auth_dsl::AuthMachineInput::AdmitOAuthDeviceFlow {
1328 flow_id: record.device_code.clone(),
1329 provider: record.provider.canonical_alias().to_string(),
1330 expires_at_millis,
1331 max_outstanding_flows: self.registry.max_outstanding() as u64,
1332 },
1333 "restore_oauth_device_flow",
1334 true,
1335 )
1336 .map_err(|err| OAuthFlowError::LifecycleRejected {
1337 operation: "restore_oauth_device_flow",
1338 detail: err.to_string(),
1339 })
1340 }
1341
1342 fn device_flow_payloads_changed(&self) -> Result<(), OAuthFlowError> {
1343 persist_existing_registry_payloads(
1344 &self.registry,
1345 &self.store,
1346 "persist_oauth_device_flow_payloads",
1347 )
1348 }
1349
1350 fn device_flow_payload_removed(
1351 &self,
1352 record: &OAuthDeviceFlowRecord,
1353 ) -> Result<(), OAuthFlowError> {
1354 let removed = [device_snapshot_key(&record.target, &record.device_code)];
1355 persist_registry_payloads_claiming_removal(
1356 &self.registry,
1357 &self.store,
1358 "consume_oauth_device_flow",
1359 &[],
1360 &removed,
1361 )
1362 }
1363}
1364
1365impl OAuthFlowAuthority for RuntimeOAuthFlowHandle {
1366 fn terminal_flow_state_is_authmachine_owned(&self) -> bool {
1367 true
1368 }
1369
1370 fn start(
1371 &self,
1372 target: AuthBindingRef,
1373 provider: OAuthProviderIdentity,
1374 redirect_uri: String,
1375 pkce_verifier: String,
1376 ) -> Result<String, OAuthFlowError> {
1377 let _payload_guard = self
1378 .payload_lock
1379 .lock()
1380 .unwrap_or_else(std::sync::PoisonError::into_inner);
1381 self.sync_persisted_payloads("admit_oauth_browser_flow")?;
1382 self.expire_pruned_flows();
1383 let state = OAuthFlowRegistry::new_state()?;
1384 let expires_at = expires_at_millis(self.registry.ttl())?;
1385 self.admit_browser(&target, &state, provider, &redirect_uri, expires_at)?;
1386 let mut inserted = self.registry.insert_browser_flow_with_pruned(
1387 state.clone(),
1388 target.clone(),
1389 provider,
1390 redirect_uri.clone(),
1391 pkce_verifier.clone(),
1392 );
1393 let mut lifecycle_pruned = OAuthPrunedFlows::default();
1394 let mut lifecycle_pruned_snapshot = None;
1395 if matches!(inserted, Err(OAuthFlowError::CapacityExceeded { .. })) {
1396 let (pruned, snapshot) = self.retain_registry_payloads_with_lifecycle();
1397 lifecycle_pruned = pruned;
1398 lifecycle_pruned_snapshot = Some(snapshot);
1399 inserted = self.registry.insert_browser_flow_with_pruned(
1400 state.clone(),
1401 target.clone(),
1402 provider,
1403 redirect_uri.clone(),
1404 pkce_verifier,
1405 );
1406 }
1407 let pruned = match inserted {
1408 Ok(pruned) => pruned,
1409 Err(err) => {
1410 let _ = self.expire_browser(&target, &state);
1411 return Err(err);
1412 }
1413 };
1414 let (removed_browser, removed_device) =
1415 if let Some(snapshot) = lifecycle_pruned_snapshot.as_ref() {
1416 Self::removed_snapshot_keys_from_pruned(snapshot, &lifecycle_pruned)
1417 } else {
1418 (Vec::new(), Vec::new())
1419 };
1420 self.expire_collected_flows(pruned);
1421 let admitted_browser = [browser_snapshot_key(&target, &state)];
1422 if let Err(err) = self.persist_registry_payloads_claiming_admission(
1423 "admit_oauth_browser_flow",
1424 &removed_browser,
1425 &removed_device,
1426 &admitted_browser,
1427 &[],
1428 ) {
1429 let _ = self
1430 .registry
1431 .consume(&state, &target, provider, &redirect_uri);
1432 let _ = self.expire_browser(&target, &state);
1433 return Err(err);
1434 }
1435 Ok(state)
1436 }
1437
1438 fn verify(
1439 &self,
1440 state: &str,
1441 target: &AuthBindingRef,
1442 provider: OAuthProviderIdentity,
1443 redirect_uri: &str,
1444 ) -> Result<OAuthFlowRecord, OAuthFlowError> {
1445 let _payload_guard = self
1446 .payload_lock
1447 .lock()
1448 .unwrap_or_else(std::sync::PoisonError::into_inner);
1449 self.sync_persisted_payloads("verify_oauth_browser_flow")?;
1450 self.expire_pruned_flows();
1451 self.verify_browser(target, state, provider, redirect_uri)?;
1452 match self.registry.verify(state, target, provider, redirect_uri) {
1453 Ok(record) => Ok(record),
1454 Err(OAuthFlowError::Missing) => Err(OAuthFlowError::RegistryProjectionMissing {
1455 operation: "verify_oauth_browser_flow",
1456 }),
1457 Err(err) => Err(err),
1458 }
1459 }
1460
1461 fn consume(
1462 &self,
1463 state: &str,
1464 target: &AuthBindingRef,
1465 provider: OAuthProviderIdentity,
1466 redirect_uri: &str,
1467 ) -> Result<OAuthFlowRecord, OAuthFlowError> {
1468 let _payload_guard = self
1469 .payload_lock
1470 .lock()
1471 .unwrap_or_else(std::sync::PoisonError::into_inner);
1472 self.sync_persisted_payloads("consume_oauth_browser_flow")?;
1473 self.expire_pruned_flows();
1474 self.verify_browser(target, state, provider, redirect_uri)?;
1475 let record = match self.registry.verify(state, target, provider, redirect_uri) {
1476 Ok(record) => record,
1477 Err(OAuthFlowError::Missing) => {
1478 return Err(OAuthFlowError::RegistryProjectionMissing {
1479 operation: "consume_oauth_browser_flow",
1480 });
1481 }
1482 Err(err) => return Err(err),
1483 };
1484 self.consume_browser(target, state, provider, redirect_uri)?;
1485 if let Err(err) = self.registry.consume(state, target, provider, redirect_uri) {
1486 let _ = self.restore_browser_flow(state, &record);
1487 return Err(match err {
1488 OAuthFlowError::Missing => OAuthFlowError::RegistryProjectionMissing {
1489 operation: "consume_oauth_browser_flow",
1490 },
1491 other => other,
1492 });
1493 }
1494 let removed_browser = [browser_snapshot_key(target, state)];
1495 if let Err(err) = self.persist_registry_payloads_claiming_removal(
1496 "consume_oauth_browser_flow",
1497 &removed_browser,
1498 &[],
1499 ) {
1500 if matches!(
1501 err,
1502 OAuthFlowError::Missing | OAuthFlowError::RegistryProjectionMissing { .. }
1503 ) {
1504 return Err(err);
1505 }
1506 let _ = self.restore_browser_flow(state, &record);
1507 return Err(err);
1508 }
1509 Ok(record)
1510 }
1511
1512 fn admit_device_code(
1513 &self,
1514 target: AuthBindingRef,
1515 provider: OAuthProviderIdentity,
1516 device_code: String,
1517 expires_in: Duration,
1518 ) -> Result<(), OAuthFlowError> {
1519 let _payload_guard = self
1520 .payload_lock
1521 .lock()
1522 .unwrap_or_else(std::sync::PoisonError::into_inner);
1523 self.sync_persisted_payloads("admit_oauth_device_flow")?;
1524 self.expire_pruned_flows();
1525 let machine_expires_at = expires_at_millis(expires_in)?;
1526 self.admit_device(&target, &device_code, provider, machine_expires_at)?;
1527 let mut inserted = self.registry.admit_device_code_with_pruned(
1528 target.clone(),
1529 provider,
1530 device_code.clone(),
1531 expires_in,
1532 );
1533 let mut lifecycle_pruned = OAuthPrunedFlows::default();
1534 let mut lifecycle_pruned_snapshot = None;
1535 if matches!(inserted, Err(OAuthFlowError::CapacityExceeded { .. })) {
1536 let (pruned, snapshot) = self.retain_registry_payloads_with_lifecycle();
1537 lifecycle_pruned = pruned;
1538 lifecycle_pruned_snapshot = Some(snapshot);
1539 inserted = self.registry.admit_device_code_with_pruned(
1540 target.clone(),
1541 provider,
1542 device_code.clone(),
1543 expires_in,
1544 );
1545 }
1546 let pruned = match inserted {
1547 Ok(pruned) => pruned,
1548 Err(err) => {
1549 let _ = self.lifecycle.expire_device_flow(&target, &device_code);
1550 return Err(err);
1551 }
1552 };
1553 let (removed_browser, removed_device) =
1554 if let Some(snapshot) = lifecycle_pruned_snapshot.as_ref() {
1555 Self::removed_snapshot_keys_from_pruned(snapshot, &lifecycle_pruned)
1556 } else {
1557 (Vec::new(), Vec::new())
1558 };
1559 self.expire_collected_flows(pruned);
1560 let admitted_device = [device_snapshot_key(&target, &device_code)];
1561 if let Err(err) = self.persist_registry_payloads_claiming_admission(
1562 "admit_oauth_device_flow",
1563 &removed_browser,
1564 &removed_device,
1565 &[],
1566 &admitted_device,
1567 ) {
1568 let _ = self
1569 .registry
1570 .expire_device_code(&device_code, &target, provider);
1571 let _ = self.lifecycle.expire_device_flow(&target, &device_code);
1572 return Err(err);
1573 }
1574 Ok(())
1575 }
1576
1577 fn verify_device_code(
1578 &self,
1579 device_code: &str,
1580 target: &AuthBindingRef,
1581 provider: OAuthProviderIdentity,
1582 ) -> Result<OAuthDeviceFlowRecord, OAuthFlowError> {
1583 let _payload_guard = self
1584 .payload_lock
1585 .lock()
1586 .unwrap_or_else(std::sync::PoisonError::into_inner);
1587 self.sync_persisted_payloads("verify_oauth_device_flow")?;
1588 self.expire_pruned_flows();
1589 self.verify_device(target, device_code, provider)?;
1590 match self
1591 .registry
1592 .verify_device_code(device_code, target, provider)
1593 {
1594 Ok(record) => Ok(record),
1595 Err(OAuthFlowError::Missing) => Err(OAuthFlowError::RegistryProjectionMissing {
1596 operation: "verify_oauth_device_flow",
1597 }),
1598 Err(err) => Err(err),
1599 }
1600 }
1601
1602 fn begin_device_code_poll(
1603 &self,
1604 device_code: &str,
1605 target: &AuthBindingRef,
1606 provider: OAuthProviderIdentity,
1607 ) -> Result<OAuthDevicePollLease, OAuthFlowError> {
1608 let _payload_guard = self
1609 .payload_lock
1610 .lock()
1611 .unwrap_or_else(std::sync::PoisonError::into_inner);
1612 self.sync_persisted_payloads("begin_oauth_device_poll")?;
1613 self.expire_pruned_flows();
1614 self.begin_device_poll(target, device_code, provider)?;
1615 let poll = match self
1616 .registry
1617 .begin_device_code_poll(device_code, target, provider)
1618 {
1619 Ok(poll) => poll,
1620 Err(OAuthFlowError::Missing) => {
1621 let _ = self.lifecycle.finish_device_poll(target, device_code);
1622 return Err(OAuthFlowError::RegistryProjectionMissing {
1623 operation: "begin_oauth_device_poll",
1624 });
1625 }
1626 Err(err) => {
1627 let _ = self.lifecycle.finish_device_poll(target, device_code);
1628 return Err(err);
1629 }
1630 };
1631 let lifecycle: Arc<dyn OAuthDevicePollLifecycle> =
1632 Arc::new(RuntimeOAuthDevicePollLifecycle {
1633 lifecycle: Arc::clone(&self.lifecycle),
1634 registry: Arc::clone(&self.registry),
1635 store: Arc::clone(&self.store),
1636 });
1637 Ok(poll
1638 .with_lifecycle(lifecycle)
1639 .with_operation_lock(Arc::clone(&self.payload_lock)))
1640 }
1641}
1642
1643#[cfg(test)]
1644mod tests {
1645 use std::sync::{
1646 Arc, Condvar, Mutex as StdMutex,
1647 atomic::{AtomicBool, Ordering},
1648 };
1649
1650 use meerkat_core::handles::{AuthLeaseHandle, AuthLeasePhase, LeaseKey};
1651 use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
1652 use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
1653 use meerkat_core::types::SessionId;
1654
1655 use super::*;
1656 use crate::identifiers::LogicalRuntimeId;
1657 use crate::input_state::StoredInputState;
1658 use crate::runtime_state::RuntimeState;
1659 use crate::store::{RuntimeStore, RuntimeStoreError, SessionDelta};
1660
1661 fn target() -> AuthBindingRef {
1662 AuthBindingRef {
1663 realm: meerkat_core::RealmId::parse("dev").expect("valid realm"),
1664 binding: meerkat_core::BindingId::parse("default_openai").expect("valid binding"),
1665 profile: None,
1666 }
1667 }
1668
1669 fn alternate_target() -> AuthBindingRef {
1670 AuthBindingRef {
1671 realm: meerkat_core::RealmId::parse("dev").expect("valid realm"),
1672 binding: meerkat_core::BindingId::parse("secondary_openai").expect("valid binding"),
1673 profile: None,
1674 }
1675 }
1676
/// Handshake state for pausing one OAuth snapshot persist in tests.
///
/// All flags start out `false`.
#[derive(Debug, Default)]
struct BlockingOAuthPersistState {
    /// Set by the test to request that the next persist call pauses.
    armed: bool,
    /// Set by the persisting thread once it has paused.
    blocked: bool,
    /// Set by the test to let the paused persist continue.
    released: bool,
}
1683
1684 #[derive(Debug, Default)]
1685 struct FailingOAuthSnapshotStore {
1686 snapshot: StdMutex<Option<Vec<u8>>>,
1687 fail_oauth_persist: AtomicBool,
1688 blocking_oauth_persist: StdMutex<BlockingOAuthPersistState>,
1689 blocking_oauth_persist_cv: Condvar,
1690 }
1691
1692 impl FailingOAuthSnapshotStore {
1693 fn block_next_oauth_persist(&self) {
1694 let mut state = self
1695 .blocking_oauth_persist
1696 .lock()
1697 .expect("blocking persist state lock");
1698 state.armed = true;
1699 state.blocked = false;
1700 state.released = false;
1701 }
1702
1703 fn wait_for_blocked_oauth_persist(&self) {
1704 let mut state = self
1705 .blocking_oauth_persist
1706 .lock()
1707 .expect("blocking persist state lock");
1708 while !state.blocked {
1709 let (next, timeout) = self
1710 .blocking_oauth_persist_cv
1711 .wait_timeout(state, Duration::from_secs(1))
1712 .expect("blocking persist state wait");
1713 assert!(
1714 !timeout.timed_out(),
1715 "expected OAuth snapshot persist to block"
1716 );
1717 state = next;
1718 }
1719 }
1720
1721 fn release_blocked_oauth_persist(&self) {
1722 let mut state = self
1723 .blocking_oauth_persist
1724 .lock()
1725 .expect("blocking persist state lock");
1726 state.released = true;
1727 self.blocking_oauth_persist_cv.notify_all();
1728 }
1729
1730 fn wait_if_oauth_persist_blocked(&self) {
1731 let mut state = self
1732 .blocking_oauth_persist
1733 .lock()
1734 .expect("blocking persist state lock");
1735 if !state.armed {
1736 return;
1737 }
1738 state.armed = false;
1739 state.blocked = true;
1740 self.blocking_oauth_persist_cv.notify_all();
1741 while !state.released {
1742 state = self
1743 .blocking_oauth_persist_cv
1744 .wait(state)
1745 .expect("blocking persist state wait");
1746 }
1747 }
1748
1749 fn fail_oauth_persist(&self) {
1750 self.fail_oauth_persist.store(true, Ordering::SeqCst);
1751 }
1752
1753 fn allow_oauth_persist(&self) {
1754 self.fail_oauth_persist.store(false, Ordering::SeqCst);
1755 }
1756 }
1757
1758 #[async_trait::async_trait]
1759 impl RuntimeStore for FailingOAuthSnapshotStore {
1760 fn persist_auth_oauth_flow_snapshot(
1761 &self,
1762 snapshot_json: &[u8],
1763 ) -> Result<(), RuntimeStoreError> {
1764 self.wait_if_oauth_persist_blocked();
1765 if self.fail_oauth_persist.load(Ordering::SeqCst) {
1766 return Err(RuntimeStoreError::WriteFailed(
1767 "injected oauth snapshot failure".to_string(),
1768 ));
1769 }
1770 *self
1771 .snapshot
1772 .lock()
1773 .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))? =
1774 Some(snapshot_json.to_vec());
1775 Ok(())
1776 }
1777
1778 fn load_auth_oauth_flow_snapshot(&self) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
1779 self.snapshot
1780 .lock()
1781 .map(|snapshot| snapshot.clone())
1782 .map_err(|err| RuntimeStoreError::ReadFailed(err.to_string()))
1783 }
1784
1785 fn update_auth_oauth_flow_snapshot(
1786 &self,
1787 update: &mut crate::store::AuthOAuthFlowSnapshotUpdate<'_>,
1788 ) -> Result<(), RuntimeStoreError> {
1789 self.wait_if_oauth_persist_blocked();
1790 if self.fail_oauth_persist.load(Ordering::SeqCst) {
1791 return Err(RuntimeStoreError::WriteFailed(
1792 "injected oauth snapshot failure".to_string(),
1793 ));
1794 }
1795 let mut snapshot = self
1796 .snapshot
1797 .lock()
1798 .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
1799 let next = update(snapshot.as_deref())?;
1800 *snapshot = Some(next);
1801 Ok(())
1802 }
1803
1804 async fn commit_session_snapshot(
1805 &self,
1806 _runtime_id: &LogicalRuntimeId,
1807 _session_delta: SessionDelta,
1808 ) -> Result<(), RuntimeStoreError> {
1809 Err(RuntimeStoreError::Unsupported(
1810 "commit_session_snapshot".to_string(),
1811 ))
1812 }
1813
1814 async fn atomic_apply(
1815 &self,
1816 _runtime_id: &LogicalRuntimeId,
1817 _session_delta: Option<SessionDelta>,
1818 _receipt: RunBoundaryReceipt,
1819 _input_updates: Vec<StoredInputState>,
1820 _session_store_key: Option<SessionId>,
1821 ) -> Result<(), RuntimeStoreError> {
1822 Err(RuntimeStoreError::Unsupported("atomic_apply".to_string()))
1823 }
1824
1825 async fn load_input_states(
1826 &self,
1827 _runtime_id: &LogicalRuntimeId,
1828 ) -> Result<Vec<StoredInputState>, RuntimeStoreError> {
1829 Err(RuntimeStoreError::Unsupported(
1830 "load_input_states".to_string(),
1831 ))
1832 }
1833
1834 async fn load_boundary_receipt(
1835 &self,
1836 _runtime_id: &LogicalRuntimeId,
1837 _run_id: &RunId,
1838 _sequence: u64,
1839 ) -> Result<Option<RunBoundaryReceipt>, RuntimeStoreError> {
1840 Err(RuntimeStoreError::Unsupported(
1841 "load_boundary_receipt".to_string(),
1842 ))
1843 }
1844
1845 async fn load_session_snapshot(
1846 &self,
1847 _runtime_id: &LogicalRuntimeId,
1848 ) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
1849 Err(RuntimeStoreError::Unsupported(
1850 "load_session_snapshot".to_string(),
1851 ))
1852 }
1853
1854 async fn persist_input_state(
1855 &self,
1856 _runtime_id: &LogicalRuntimeId,
1857 _state: &StoredInputState,
1858 ) -> Result<(), RuntimeStoreError> {
1859 Err(RuntimeStoreError::Unsupported(
1860 "persist_input_state".to_string(),
1861 ))
1862 }
1863
1864 async fn load_input_state(
1865 &self,
1866 _runtime_id: &LogicalRuntimeId,
1867 _input_id: &InputId,
1868 ) -> Result<Option<StoredInputState>, RuntimeStoreError> {
1869 Err(RuntimeStoreError::Unsupported(
1870 "load_input_state".to_string(),
1871 ))
1872 }
1873
1874 async fn load_runtime_state(
1875 &self,
1876 _runtime_id: &LogicalRuntimeId,
1877 ) -> Result<Option<RuntimeState>, RuntimeStoreError> {
1878 Err(RuntimeStoreError::Unsupported(
1879 "load_runtime_state".to_string(),
1880 ))
1881 }
1882
1883 async fn commit_machine_lifecycle(
1884 &self,
1885 _runtime_id: &LogicalRuntimeId,
1886 _commit: crate::store::MachineLifecycleCommit,
1887 _input_states: &[StoredInputState],
1888 ) -> Result<(), RuntimeStoreError> {
1889 Err(RuntimeStoreError::Unsupported(
1890 "commit_machine_lifecycle".to_string(),
1891 ))
1892 }
1893 }
1894
1895 fn snapshot_phase(
1896 lifecycle: &RuntimeAuthLeaseHandle,
1897 target: &AuthBindingRef,
1898 ) -> Option<AuthLeasePhase> {
1899 lifecycle
1900 .snapshot(&LeaseKey::from_auth_binding(target))
1901 .phase
1902 }
1903
/// Starting, verifying, and consuming a browser flow each leave the lease
/// phase at `ReauthRequired`.
#[test]
fn browser_flow_only_machine_stays_reauth_required_until_credentials_commit() {
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";
    let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let authority =
        RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_secs(60), lifecycle.clone());
    let binding = target();
    let assert_reauth_required = || {
        assert_eq!(
            snapshot_phase(&lifecycle, &binding),
            Some(AuthLeasePhase::ReauthRequired)
        );
    };

    let state = authority
        .start(
            binding.clone(),
            provider,
            redirect_uri.to_string(),
            "verifier".to_string(),
        )
        .expect("browser flow admitted");
    assert_reauth_required();

    authority
        .verify(&state, &binding, provider, redirect_uri)
        .expect("browser flow verifies");
    assert_reauth_required();

    authority
        .consume(&state, &binding, provider, redirect_uri)
        .expect("browser flow consumes");
    assert_reauth_required();
}
1942
/// When only the registry payload of a browser flow is lost, verify and
/// consume report `RegistryProjectionMissing` and the lifecycle keeps the flow.
#[test]
fn missing_browser_projection_cannot_overwrite_authmachine_flow() {
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";
    let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let authority =
        RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_secs(60), lifecycle.clone());
    let binding = target();
    let state = authority
        .start(
            binding.clone(),
            provider,
            redirect_uri.to_string(),
            "verifier".to_string(),
        )
        .expect("browser flow admitted");

    // Drop the registry payload behind the authority's back.
    authority
        .registry
        .consume(&state, &binding, provider, redirect_uri)
        .expect("test removes only the local registry payload");

    let verify_outcome = authority.verify(&state, &binding, provider, redirect_uri);
    assert!(matches!(
        verify_outcome,
        Err(OAuthFlowError::RegistryProjectionMissing {
            operation: "verify_oauth_browser_flow"
        })
    ));
    assert!(
        lifecycle.has_oauth_browser_flow_for_test(&binding, &state),
        "missing process-local registry payload must not expire canonical AuthMachine membership"
    );

    let consume_outcome = authority.consume(&state, &binding, provider, redirect_uri);
    assert!(matches!(
        consume_outcome,
        Err(OAuthFlowError::RegistryProjectionMissing {
            operation: "consume_oauth_browser_flow"
        })
    ));
    assert!(
        lifecycle.has_oauth_browser_flow_for_test(&binding, &state),
        "terminal consume must fail closed instead of converting payload loss to not-found"
    );
    assert_eq!(
        snapshot_phase(&lifecycle, &binding),
        Some(AuthLeasePhase::ReauthRequired)
    );
}
1991
/// When the registry payload of a device flow is lost mid-poll, consuming the
/// poll reports `RegistryProjectionMissing` and the lifecycle keeps the flow.
#[test]
fn missing_device_poll_projection_cannot_overwrite_authmachine_flow() {
    let provider = OAuthProviderIdentity::GoogleCodeAssist;
    let device_code = "provider-device-code";
    let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let authority =
        RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_secs(60), lifecycle.clone());
    let binding = target();

    authority
        .admit_device_code(
            binding.clone(),
            provider,
            device_code.to_string(),
            Duration::from_secs(60),
        )
        .expect("device flow admitted");
    let poll = authority
        .begin_device_code_poll(device_code, &binding, provider)
        .expect("device poll begins");

    // Drop the registry payload behind the authority's back.
    authority
        .registry
        .expire_device_code(device_code, &binding, provider)
        .expect("test removes only the local registry payload");

    let consume_outcome = poll.consume();
    assert!(matches!(
        consume_outcome,
        Err(OAuthFlowError::RegistryProjectionMissing {
            operation: "consume_oauth_device_flow"
        })
    ));
    assert!(
        lifecycle.has_oauth_device_flow_for_test(&binding, device_code),
        "missing process-local poll payload must not expire canonical AuthMachine membership"
    );
}
2028
/// A browser admission whose persist fails leaves no registry payload behind,
/// and a later successful admission persists only its own flow.
#[test]
fn browser_admit_persistence_failure_rolls_back_unreturned_flow() {
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";
    let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn = Arc::clone(&store) as Arc<dyn RuntimeStore>;
    let authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        lifecycle.clone(),
        &store_dyn,
    );
    let failed_target = target();
    let successful_target = alternate_target();

    // First attempt: the store rejects the write.
    store.fail_oauth_persist();
    let failed_admit = authority.start(
        failed_target.clone(),
        provider,
        redirect_uri.to_string(),
        "failed-verifier".to_string(),
    );
    assert!(matches!(
        failed_admit,
        Err(OAuthFlowError::PersistenceFailed { .. })
    ));
    let registry_snapshot = authority
        .registry
        .snapshot_for_persistence(current_time_millis());
    assert!(
        registry_snapshot.browser.is_empty(),
        "failed browser admission must not leave an unreturned registry payload"
    );

    // Second attempt: the store accepts writes again.
    store.allow_oauth_persist();
    let successful_state = authority
        .start(
            successful_target,
            provider,
            redirect_uri.to_string(),
            "successful-verifier".to_string(),
        )
        .expect("subsequent browser admit persists after store recovers");

    let snapshot_json = store
        .load_auth_oauth_flow_snapshot()
        .expect("durable OAuth snapshot loads")
        .expect("durable OAuth snapshot exists");
    let snapshot = serde_json::from_slice::<OAuthFlowRegistrySnapshot>(&snapshot_json)
        .expect("durable OAuth snapshot decodes");
    let persisted_states: Vec<_> = snapshot
        .browser
        .iter()
        .map(|flow| flow.state.as_str())
        .collect();
    assert_eq!(
        persisted_states,
        vec![successful_state.as_str()],
        "a later successful admit must not persist a previously failed unreturned flow"
    );
}
2088
/// A device admission whose persist fails is removed from both the registry
/// and the lifecycle, and a later successful admission persists only its own
/// flow.
#[test]
fn device_admit_persistence_failure_rolls_back_unreturned_flow() {
    let provider = OAuthProviderIdentity::GoogleCodeAssist;
    let failed_device_code = "failed-device-code";
    let successful_device_code = "successful-device-code";
    let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn = Arc::clone(&store) as Arc<dyn RuntimeStore>;
    let authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        lifecycle.clone(),
        &store_dyn,
    );
    let binding = target();

    // First attempt: the store rejects the write.
    store.fail_oauth_persist();
    let failed_admit = authority.admit_device_code(
        binding.clone(),
        provider,
        failed_device_code.to_string(),
        Duration::from_secs(60),
    );
    assert!(matches!(
        failed_admit,
        Err(OAuthFlowError::PersistenceFailed { .. })
    ));
    let registry_lookup = authority
        .registry
        .verify_device_code(failed_device_code, &binding, provider);
    assert!(matches!(registry_lookup, Err(OAuthFlowError::Missing)));
    assert!(!lifecycle.has_oauth_device_flow_for_test(&binding, failed_device_code));

    // Second attempt: the store accepts writes again.
    store.allow_oauth_persist();
    authority
        .admit_device_code(
            binding,
            provider,
            successful_device_code.to_string(),
            Duration::from_secs(60),
        )
        .expect("subsequent device admit persists after store recovers");

    let snapshot_json = store
        .load_auth_oauth_flow_snapshot()
        .expect("durable OAuth snapshot loads")
        .expect("durable OAuth snapshot exists");
    let snapshot = serde_json::from_slice::<OAuthFlowRegistrySnapshot>(&snapshot_json)
        .expect("durable OAuth snapshot decodes");
    let persisted_codes: Vec<_> = snapshot
        .device
        .iter()
        .map(|flow| flow.device_code.as_str())
        .collect();
    assert_eq!(
        persisted_codes,
        vec![successful_device_code],
        "a later successful admit must not persist a previously failed unreturned device flow"
    );
}
2147
/// Two authorities, each with its own SQLite handle onto the same database
/// file, admit one browser flow apiece. Both flows must appear in the durable
/// snapshot and both must be consumable by a third, freshly built authority.
#[cfg(feature = "sqlite-store")]
#[test]
fn persistent_oauth_snapshot_merges_independent_authority_writes() {
    let temp_dir = tempfile::tempdir().expect("tempdir");
    let store_path = temp_dir.path().join("runtime.sqlite");
    // Each call opens another handle onto the same database file.
    let open_store = || -> Arc<dyn RuntimeStore> {
        Arc::new(crate::store::sqlite::SqliteRuntimeStore::new(&store_path).unwrap())
    };
    // Each call builds an authority with its own, unshared lease handle.
    let authority_over = |backing: &Arc<dyn RuntimeStore>| {
        RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
            Duration::from_secs(60),
            Arc::new(RuntimeAuthLeaseHandle::new()),
            backing,
        )
    };

    let store_one = open_store();
    let store_two = open_store();
    let first_authority = authority_over(&store_one);
    let second_authority = authority_over(&store_two);
    let first_target = target();
    let second_target = alternate_target();
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let first_redirect = "http://127.0.0.1/callback";
    let second_redirect = "http://127.0.0.1/other-callback";

    let first_state = first_authority
        .start(
            first_target.clone(),
            provider,
            first_redirect.to_string(),
            "verifier-1".to_string(),
        )
        .expect("first process admits browser flow");
    let second_state = second_authority
        .start(
            second_target.clone(),
            provider,
            second_redirect.to_string(),
            "verifier-2".to_string(),
        )
        .expect("second process admits browser flow");

    // Read the snapshot back through a third handle.
    let store_three = open_store();
    let persisted_bytes = store_three
        .load_auth_oauth_flow_snapshot()
        .expect("durable OAuth snapshot loads")
        .expect("durable OAuth snapshot exists");
    let persisted: OAuthFlowRegistrySnapshot =
        serde_json::from_slice(&persisted_bytes).expect("durable OAuth snapshot decodes");
    let first_survived = persisted
        .browser
        .iter()
        .any(|flow| flow.state == first_state);
    assert!(
        first_survived,
        "the first independent authority write must survive the second write"
    );
    let second_survived = persisted
        .browser
        .iter()
        .any(|flow| flow.state == second_state);
    assert!(
        second_survived,
        "the second independent authority write must be persisted"
    );

    let restarted = authority_over(&store_three);
    restarted
        .consume(&first_state, &first_target, provider, first_redirect)
        .expect("first independent flow rehydrates");
    restarted
        .consume(&second_state, &second_target, provider, second_redirect)
        .expect("second independent flow rehydrates after first consume");
}
2233
/// While one authority's browser admission is parked inside its durable
/// write, another authority consumes a previously admitted flow. After the
/// parked write is released, the snapshot must hold the newly admitted flow
/// but not the consumed one, and a fresh authority must see the same thing.
#[test]
fn persistent_oauth_browser_admit_does_not_resurrect_consumed_between_sync_and_persist() {
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn: Arc<dyn RuntimeStore> = store.clone();
    // Every authority in this test shares the store but owns its lease handle.
    let fresh_authority = || {
        RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
            Duration::from_secs(60),
            Arc::new(RuntimeAuthLeaseHandle::new()),
            &store_dyn,
        )
    };

    let creator = fresh_authority();
    let original_target = target();
    let replacement_target = alternate_target();
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";
    let replacement_redirect_uri = "http://127.0.0.1/replacement-callback";
    let consumed_state = creator
        .start(
            original_target.clone(),
            provider,
            redirect_uri.to_string(),
            "consumed-verifier".to_string(),
        )
        .expect("creator admits browser flow");

    let stale_authority = Arc::new(fresh_authority());
    let consumer = fresh_authority();

    // Park the stale authority's admission inside its persist call.
    store.block_next_oauth_persist();
    let stale_admit = {
        let admitting = Arc::clone(&stale_authority);
        let admit_target = replacement_target.clone();
        std::thread::spawn(move || {
            admitting.start(
                admit_target,
                provider,
                replacement_redirect_uri.to_string(),
                "replacement-verifier".to_string(),
            )
        })
    };
    store.wait_for_blocked_oauth_persist();

    // Consume the original flow while the admission is still parked.
    consumer
        .consume(&consumed_state, &original_target, provider, redirect_uri)
        .expect("independent authority consumes browser flow between sync and persist");
    store.release_blocked_oauth_persist();
    let replacement_state = stale_admit
        .join()
        .expect("stale browser admit thread should not panic")
        .expect("stale authority admits replacement browser flow");

    let persisted_bytes = store
        .load_auth_oauth_flow_snapshot()
        .expect("durable OAuth snapshot loads")
        .expect("durable OAuth snapshot exists");
    let persisted: OAuthFlowRegistrySnapshot =
        serde_json::from_slice(&persisted_bytes).expect("durable OAuth snapshot decodes");
    let consumed_resurrected = persisted
        .browser
        .iter()
        .any(|flow| flow.state == consumed_state);
    assert!(
        !consumed_resurrected,
        "a stale admission must not resurrect a browser flow consumed after pre-sync"
    );
    let replacement_persisted = persisted
        .browser
        .iter()
        .any(|flow| flow.state == replacement_state);
    assert!(
        replacement_persisted,
        "the stale authority's newly admitted browser flow should still persist"
    );

    let restarted = fresh_authority();
    assert!(matches!(
        restarted.consume(&consumed_state, &original_target, provider, redirect_uri),
        Err(OAuthFlowError::LifecycleRejected {
            operation: "verify_oauth_browser_flow",
            ..
        })
    ));
    restarted
        .consume(
            &replacement_state,
            &replacement_target,
            provider,
            replacement_redirect_uri,
        )
        .expect("new stale-authority flow survives restart");
}
2335
/// Device-code variant of the resurrect check: while one authority's device
/// admission is parked inside its durable write, another authority polls and
/// consumes an earlier device flow. The released write must persist only the
/// replacement code, and a fresh authority must reject the consumed one.
#[test]
fn persistent_oauth_device_admit_does_not_resurrect_consumed_between_sync_and_persist() {
    let ttl = Duration::from_secs(60);
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn: Arc<dyn RuntimeStore> = store.clone();
    // Every authority in this test shares the store but owns its lease handle.
    let fresh_authority = || {
        RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
            ttl,
            Arc::new(RuntimeAuthLeaseHandle::new()),
            &store_dyn,
        )
    };

    let creator = fresh_authority();
    let original_target = target();
    let replacement_target = alternate_target();
    let provider = OAuthProviderIdentity::GoogleCodeAssist;
    let consumed_device_code = "consumed-device-code";
    let replacement_device_code = "replacement-device-code";
    creator
        .admit_device_code(
            original_target.clone(),
            provider,
            consumed_device_code.to_string(),
            ttl,
        )
        .expect("creator admits device flow");

    let stale_authority = Arc::new(fresh_authority());
    let consumer = fresh_authority();

    // Park the stale authority's admission inside its persist call.
    store.block_next_oauth_persist();
    let stale_admit = {
        let admitting = Arc::clone(&stale_authority);
        let admit_target = replacement_target.clone();
        std::thread::spawn(move || {
            admitting.admit_device_code(
                admit_target,
                provider,
                replacement_device_code.to_string(),
                Duration::from_secs(60),
            )
        })
    };
    store.wait_for_blocked_oauth_persist();

    // Consume the original device flow while the admission is still parked.
    consumer
        .begin_device_code_poll(consumed_device_code, &original_target, provider)
        .expect("independent authority begins device poll")
        .consume()
        .expect("independent authority consumes device flow between sync and persist");
    store.release_blocked_oauth_persist();
    stale_admit
        .join()
        .expect("stale device admit thread should not panic")
        .expect("stale authority admits replacement device flow");

    let persisted_bytes = store
        .load_auth_oauth_flow_snapshot()
        .expect("durable OAuth snapshot loads")
        .expect("durable OAuth snapshot exists");
    let persisted: OAuthFlowRegistrySnapshot =
        serde_json::from_slice(&persisted_bytes).expect("durable OAuth snapshot decodes");
    let consumed_resurrected = persisted
        .device
        .iter()
        .any(|flow| flow.device_code == consumed_device_code);
    assert!(
        !consumed_resurrected,
        "a stale admission must not resurrect a device flow consumed after pre-sync"
    );
    let replacement_persisted = persisted
        .device
        .iter()
        .any(|flow| flow.device_code == replacement_device_code);
    assert!(
        replacement_persisted,
        "the stale authority's newly admitted device flow should still persist"
    );

    let restarted = fresh_authority();
    assert!(matches!(
        restarted.verify_device_code(consumed_device_code, &original_target, provider),
        Err(OAuthFlowError::LifecycleRejected {
            operation: "verify_oauth_device_flow",
            ..
        })
    ));
    restarted
        .verify_device_code(replacement_device_code, &replacement_target, provider)
        .expect("new stale-authority device flow survives restart");
}
2434
/// An authority begins a device poll, a second authority then polls and
/// consumes the same code, and only afterwards does the first poll finish.
/// The finish must not put the device flow back into the durable snapshot,
/// and a freshly built authority must reject the code.
#[cfg(feature = "sqlite-store")]
#[test]
fn persistent_oauth_device_poll_finish_does_not_resurrect_consumed_payload() {
    let ttl = Duration::from_secs(60);
    let temp_dir = tempfile::tempdir().expect("tempdir");
    let store_path = temp_dir.path().join("runtime.sqlite");
    // Each call opens another handle onto the same database file.
    let open_store = || -> Arc<dyn RuntimeStore> {
        Arc::new(crate::store::sqlite::SqliteRuntimeStore::new(&store_path).unwrap())
    };
    // Each call builds an authority with its own, unshared lease handle.
    let authority_over = |backing: &Arc<dyn RuntimeStore>| {
        RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
            ttl,
            Arc::new(RuntimeAuthLeaseHandle::new()),
            backing,
        )
    };

    let creator_store = open_store();
    let stale_store = open_store();
    let consumer_store = open_store();
    let creator = authority_over(&creator_store);
    let binding = target();
    let provider = OAuthProviderIdentity::GoogleCodeAssist;
    let device_code = "pending-finish-device-code";
    creator
        .admit_device_code(binding.clone(), provider, device_code.to_string(), ttl)
        .expect("creator admits device flow");

    // The stale authority opens its poll before the consumer touches the flow.
    let stale_authority = authority_over(&stale_store);
    let stale_poll = stale_authority
        .begin_device_code_poll(device_code, &binding, provider)
        .expect("stale authority begins pending poll");
    let consumer = authority_over(&consumer_store);
    consumer
        .begin_device_code_poll(device_code, &binding, provider)
        .expect("consumer begins independent poll")
        .consume()
        .expect("consumer consumes durable payload");

    stale_poll
        .finish()
        .expect("stale pending poll finish is local cleanup only");

    let persisted_bytes = stale_store
        .load_auth_oauth_flow_snapshot()
        .expect("durable OAuth snapshot loads")
        .expect("durable OAuth snapshot exists");
    let persisted: OAuthFlowRegistrySnapshot =
        serde_json::from_slice(&persisted_bytes).expect("durable OAuth snapshot decodes");
    let resurrected = persisted
        .device
        .iter()
        .any(|flow| flow.device_code == device_code);
    assert!(
        !resurrected,
        "a stale pending poll finish must not resurrect a consumed device flow"
    );

    let restarted_store = open_store();
    let restarted = authority_over(&restarted_store);
    assert!(matches!(
        restarted.verify_device_code(device_code, &binding, provider),
        Err(OAuthFlowError::LifecycleRejected {
            operation: "verify_oauth_device_flow",
            ..
        })
    ));
}
2515
/// With every authority limited to one outstanding flow, an authority built
/// while a flow was still outstanding must be able to admit a new browser
/// flow once a different authority has consumed the old one.
#[cfg(feature = "sqlite-store")]
#[test]
fn persistent_oauth_browser_sync_prunes_stale_capacity_before_admit() {
    let temp_dir = tempfile::tempdir().expect("tempdir");
    let store_path = temp_dir.path().join("runtime.sqlite");
    // Each call opens another handle onto the same database file.
    let open_store = || -> Arc<dyn RuntimeStore> {
        Arc::new(crate::store::sqlite::SqliteRuntimeStore::new(&store_path).unwrap())
    };
    // Each call builds a capacity-1 authority holding only a weak reference to its store.
    let single_slot_authority = |backing: &Arc<dyn RuntimeStore>| {
        RuntimeOAuthFlowHandle::new_with_capacity_auth_lease_and_store(
            Duration::from_secs(60),
            1,
            Arc::new(RuntimeAuthLeaseHandle::new()),
            Some(Arc::downgrade(backing)),
        )
    };

    let creator_store = open_store();
    let stale_store = open_store();
    let consumer_store = open_store();
    let creator = single_slot_authority(&creator_store);
    let original_target = target();
    let replacement_target = alternate_target();
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";
    let replacement_redirect_uri = "http://127.0.0.1/replacement-callback";
    let consumed_state = creator
        .start(
            original_target.clone(),
            provider,
            redirect_uri.to_string(),
            "consumed-verifier".to_string(),
        )
        .expect("creator admits browser flow");

    let stale_authority = single_slot_authority(&stale_store);
    let consumer = single_slot_authority(&consumer_store);
    consumer
        .consume(&consumed_state, &original_target, provider, redirect_uri)
        .expect("independent authority consumes browser flow");

    // The single slot is free again durably, so this admission must succeed.
    let replacement_state = stale_authority
        .start(
            replacement_target.clone(),
            provider,
            replacement_redirect_uri.to_string(),
            "replacement-verifier".to_string(),
        )
        .expect("stale capacity is pruned before browser admit");
    stale_authority
        .consume(
            &replacement_state,
            &replacement_target,
            provider,
            replacement_redirect_uri,
        )
        .expect("replacement browser flow remains usable");
}
2580
/// With every authority limited to one outstanding flow, an authority built
/// while a device flow was still outstanding must be able to admit a new
/// device code once a different authority has consumed the old one.
#[cfg(feature = "sqlite-store")]
#[test]
fn persistent_oauth_device_sync_prunes_stale_capacity_before_admit() {
    let ttl = Duration::from_secs(60);
    let temp_dir = tempfile::tempdir().expect("tempdir");
    let store_path = temp_dir.path().join("runtime.sqlite");
    // Each call opens another handle onto the same database file.
    let open_store = || -> Arc<dyn RuntimeStore> {
        Arc::new(crate::store::sqlite::SqliteRuntimeStore::new(&store_path).unwrap())
    };
    // Each call builds a capacity-1 authority holding only a weak reference to its store.
    let single_slot_authority = |backing: &Arc<dyn RuntimeStore>| {
        RuntimeOAuthFlowHandle::new_with_capacity_auth_lease_and_store(
            ttl,
            1,
            Arc::new(RuntimeAuthLeaseHandle::new()),
            Some(Arc::downgrade(backing)),
        )
    };

    let creator_store = open_store();
    let stale_store = open_store();
    let consumer_store = open_store();
    let creator = single_slot_authority(&creator_store);
    let original_target = target();
    let replacement_target = alternate_target();
    let provider = OAuthProviderIdentity::GoogleCodeAssist;
    let consumed_device_code = "consumed-capacity-device-code";
    let replacement_device_code = "replacement-device-code";
    creator
        .admit_device_code(
            original_target.clone(),
            provider,
            consumed_device_code.to_string(),
            ttl,
        )
        .expect("creator admits device flow");

    let stale_authority = single_slot_authority(&stale_store);
    let consumer = single_slot_authority(&consumer_store);
    consumer
        .begin_device_code_poll(consumed_device_code, &original_target, provider)
        .expect("independent authority begins device poll")
        .consume()
        .expect("independent authority consumes device flow");

    // The single slot is free again durably, so this admission must succeed.
    stale_authority
        .admit_device_code(
            replacement_target.clone(),
            provider,
            replacement_device_code.to_string(),
            ttl,
        )
        .expect("stale capacity is pruned before device admit");
    stale_authority
        .verify_device_code(replacement_device_code, &replacement_target, provider)
        .expect("replacement device flow remains usable");
}
2642
/// Two authorities race to consume the same browser flow. The first is parked
/// inside its durable write while the second consumes successfully; once
/// released, the first must fail with `RegistryProjectionMissing`.
#[test]
fn concurrent_persistent_browser_consumes_require_fresh_durable_claim() {
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn: Arc<dyn RuntimeStore> = store.clone();
    // Every authority in this test shares the store but owns its lease handle.
    let fresh_authority = || {
        RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
            Duration::from_secs(60),
            Arc::new(RuntimeAuthLeaseHandle::new()),
            &store_dyn,
        )
    };

    let creator = fresh_authority();
    let binding = target();
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";
    let state = creator
        .start(
            binding.clone(),
            provider,
            redirect_uri.to_string(),
            "verifier".to_string(),
        )
        .expect("browser flow admitted");
    let first = Arc::new(fresh_authority());
    let second = fresh_authority();

    // Park the first consume inside its persist call.
    store.block_next_oauth_persist();
    let first_consume = {
        let racer = Arc::clone(&first);
        let racer_state = state.clone();
        let racer_target = binding.clone();
        std::thread::spawn(move || {
            racer.consume(&racer_state, &racer_target, provider, redirect_uri)
        })
    };
    store.wait_for_blocked_oauth_persist();

    second
        .consume(&state, &binding, provider, redirect_uri)
        .expect("second authority wins durable consume race");
    store.release_blocked_oauth_persist();
    assert!(matches!(
        first_consume
            .join()
            .expect("first consume thread should not panic"),
        Err(OAuthFlowError::RegistryProjectionMissing {
            operation: "consume_oauth_browser_flow"
        })
    ));
}
2698
/// Two authorities each hold a poll on the same device code and race to
/// consume it. The first consume is parked inside its durable write while the
/// second succeeds; once released, the first must fail with
/// `RegistryProjectionMissing`.
#[test]
fn concurrent_persistent_device_consumes_require_fresh_durable_claim() {
    let ttl = Duration::from_secs(60);
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn: Arc<dyn RuntimeStore> = store.clone();
    // Every authority in this test shares the store but owns its lease handle.
    let fresh_authority = || {
        RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
            ttl,
            Arc::new(RuntimeAuthLeaseHandle::new()),
            &store_dyn,
        )
    };

    let creator = fresh_authority();
    let binding = target();
    let provider = OAuthProviderIdentity::GoogleCodeAssist;
    let device_code = "race-device-code";
    creator
        .admit_device_code(binding.clone(), provider, device_code.to_string(), ttl)
        .expect("device flow admitted");
    let first = fresh_authority();
    let second = fresh_authority();
    let first_poll = first
        .begin_device_code_poll(device_code, &binding, provider)
        .expect("first authority begins poll");
    let second_poll = second
        .begin_device_code_poll(device_code, &binding, provider)
        .expect("second authority begins poll");

    // Park the first consume inside its persist call.
    store.block_next_oauth_persist();
    let first_consume = std::thread::spawn(move || first_poll.consume());
    store.wait_for_blocked_oauth_persist();

    second_poll
        .consume()
        .expect("second authority wins durable device consume race");
    store.release_blocked_oauth_persist();
    assert!(matches!(
        first_consume
            .join()
            .expect("first device consume thread should not panic"),
        Err(OAuthFlowError::RegistryProjectionMissing {
            operation: "consume_oauth_device_flow"
        })
    ));
}
2753
/// Two capacity-1 authorities over the same store race to admit a browser
/// flow. The first is parked inside its durable write while the second admits
/// successfully; once released, the first must fail with `CapacityExceeded`.
#[test]
fn concurrent_persistent_browser_admits_require_fresh_durable_capacity() {
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let first_store: Arc<dyn RuntimeStore> = store.clone();
    let second_store: Arc<dyn RuntimeStore> = store.clone();
    // Each call builds a capacity-1 authority holding only a weak reference to its store.
    let single_slot_authority = |backing: &Arc<dyn RuntimeStore>| {
        RuntimeOAuthFlowHandle::new_with_capacity_auth_lease_and_store(
            Duration::from_secs(60),
            1,
            Arc::new(RuntimeAuthLeaseHandle::new()),
            Some(Arc::downgrade(backing)),
        )
    };

    let first = Arc::new(single_slot_authority(&first_store));
    let second = single_slot_authority(&second_store);
    let first_target = target();
    let second_target = alternate_target();
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let first_redirect_uri = "http://127.0.0.1/first-callback";
    let second_redirect_uri = "http://127.0.0.1/second-callback";

    // Park the first admission inside its persist call.
    store.block_next_oauth_persist();
    let first_admit = {
        let racer = Arc::clone(&first);
        let racer_target = first_target.clone();
        std::thread::spawn(move || {
            racer.start(
                racer_target,
                provider,
                first_redirect_uri.to_string(),
                "first-verifier".to_string(),
            )
        })
    };
    store.wait_for_blocked_oauth_persist();

    second
        .start(
            second_target,
            provider,
            second_redirect_uri.to_string(),
            "second-verifier".to_string(),
        )
        .expect("second authority wins durable browser admission race");
    store.release_blocked_oauth_persist();
    assert!(matches!(
        first_admit
            .join()
            .expect("first browser admit thread should not panic"),
        Err(OAuthFlowError::CapacityExceeded { max_outstanding: 1 })
    ));
}
2810
/// Two capacity-1 authorities over the same store race to admit a device
/// code. The first is parked inside its durable write while the second admits
/// successfully; once released, the first must fail with `CapacityExceeded`.
#[test]
fn concurrent_persistent_device_admits_require_fresh_durable_capacity() {
    let ttl = Duration::from_secs(60);
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let first_store: Arc<dyn RuntimeStore> = store.clone();
    let second_store: Arc<dyn RuntimeStore> = store.clone();
    // Each call builds a capacity-1 authority holding only a weak reference to its store.
    let single_slot_authority = |backing: &Arc<dyn RuntimeStore>| {
        RuntimeOAuthFlowHandle::new_with_capacity_auth_lease_and_store(
            ttl,
            1,
            Arc::new(RuntimeAuthLeaseHandle::new()),
            Some(Arc::downgrade(backing)),
        )
    };

    let first = Arc::new(single_slot_authority(&first_store));
    let second = single_slot_authority(&second_store);
    let first_target = target();
    let second_target = alternate_target();
    let provider = OAuthProviderIdentity::GoogleCodeAssist;

    // Park the first admission inside its persist call.
    store.block_next_oauth_persist();
    let first_admit = {
        let racer = Arc::clone(&first);
        let racer_target = first_target.clone();
        std::thread::spawn(move || {
            racer.admit_device_code(
                racer_target,
                provider,
                "first-device-code".to_string(),
                Duration::from_secs(60),
            )
        })
    };
    store.wait_for_blocked_oauth_persist();

    second
        .admit_device_code(
            second_target,
            provider,
            "second-device-code".to_string(),
            ttl,
        )
        .expect("second authority wins durable device admission race");
    store.release_blocked_oauth_persist();
    assert!(matches!(
        first_admit
            .join()
            .expect("first device admit thread should not panic"),
        Err(OAuthFlowError::CapacityExceeded { max_outstanding: 1 })
    ));
}
2865
/// Two browser admissions are issued concurrently against one shared
/// authority while the first is parked inside its durable write. The flow
/// admitted by the second call must be present in the final snapshot and must
/// be consumable, with its PKCE verifier intact, by a freshly built authority.
#[test]
fn concurrent_browser_admits_preserve_newer_durable_snapshot() {
    let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn: Arc<dyn RuntimeStore> = store.clone();
    let authority = Arc::new(
        RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
            Duration::from_secs(60),
            lifecycle,
            &store_dyn,
        ),
    );
    let first_target = target();
    let second_target = alternate_target();
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let first_redirect_uri = "http://127.0.0.1/callback";
    let second_redirect_uri = "http://127.0.0.1/other-callback";

    // Park the first admission inside its persist call.
    store.block_next_oauth_persist();
    let first_admit = {
        let shared = Arc::clone(&authority);
        let admit_target = first_target.clone();
        std::thread::spawn(move || {
            shared.start(
                admit_target,
                provider,
                first_redirect_uri.to_string(),
                "verifier-1".to_string(),
            )
        })
    };
    store.wait_for_blocked_oauth_persist();

    // The second admission runs detached and reports its result over a channel.
    let (second_done_tx, second_done_rx) = std::sync::mpsc::channel();
    {
        let shared = Arc::clone(&authority);
        let admit_target = second_target.clone();
        std::thread::spawn(move || {
            let outcome = shared.start(
                admit_target,
                provider,
                second_redirect_uri.to_string(),
                "verifier-2".to_string(),
            );
            let _ = second_done_tx.send(outcome);
        });
    }
    // Give the second admission a short window to finish before releasing the first.
    let second_before_release = second_done_rx
        .recv_timeout(Duration::from_millis(100))
        .ok();
    store.release_blocked_oauth_persist();
    first_admit
        .join()
        .expect("first admit thread should not panic")
        .expect("first browser flow admitted");
    let second_outcome = match second_before_release {
        Some(outcome) => outcome,
        None => second_done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("second admit should finish after first durable write is released"),
    };
    let second_state = second_outcome.expect("second browser flow admitted");

    let persisted_bytes = store
        .load_auth_oauth_flow_snapshot()
        .expect("durable OAuth snapshot loads")
        .expect("durable OAuth snapshot exists");
    let persisted: OAuthFlowRegistrySnapshot =
        serde_json::from_slice(&persisted_bytes).expect("durable OAuth snapshot decodes");
    let second_persisted = persisted
        .browser
        .iter()
        .any(|flow| flow.state == second_state);
    assert!(
        second_persisted,
        "durable snapshot must retain flow admitted by a concurrent newer write"
    );

    let restarted_lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let restarted = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        restarted_lifecycle,
        &store_dyn,
    );
    let record = restarted
        .consume(&second_state, &second_target, provider, second_redirect_uri)
        .expect("newer durable flow should survive restart");
    assert_eq!(record.pkce_verifier, "verifier-2");
}
2952
/// A browser consume whose durable write fails must report
/// `PersistenceFailed` and leave the flow registered with the lifecycle
/// handle, so the same consume succeeds once the store accepts writes again.
#[test]
fn browser_consume_persistence_failure_keeps_flow_retryable() {
    let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn: Arc<dyn RuntimeStore> = store.clone();
    let authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        Arc::clone(&lifecycle),
        &store_dyn,
    );
    let binding = target();
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";
    let state = authority
        .start(
            binding.clone(),
            provider,
            redirect_uri.to_string(),
            "verifier".to_string(),
        )
        .expect("browser flow admitted");

    // The consume fails while the store rejects writes, and the flow stays registered.
    store.fail_oauth_persist();
    assert!(matches!(
        authority.consume(&state, &binding, provider, redirect_uri),
        Err(OAuthFlowError::PersistenceFailed { .. })
    ));
    assert!(lifecycle.has_oauth_browser_flow_for_test(&binding, &state));

    // The retry goes through once the store accepts writes again.
    store.allow_oauth_persist();
    authority
        .consume(&state, &binding, provider, redirect_uri)
        .expect("failed durable consume remains retryable");
}
2987
/// A device-poll consume whose durable write fails must report
/// `PersistenceFailed` and leave the flow registered with the lifecycle
/// handle, so a new poll can be begun and consumed once the store recovers.
#[test]
fn device_consume_persistence_failure_keeps_flow_retryable() {
    let ttl = Duration::from_secs(60);
    let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn: Arc<dyn RuntimeStore> = store.clone();
    let authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        ttl,
        Arc::clone(&lifecycle),
        &store_dyn,
    );
    let binding = target();
    let provider = OAuthProviderIdentity::GoogleCodeAssist;
    let device_code = "provider-device-code";
    authority
        .admit_device_code(binding.clone(), provider, device_code.to_string(), ttl)
        .expect("device flow admitted");
    let first_attempt = authority
        .begin_device_code_poll(device_code, &binding, provider)
        .expect("device poll begins");

    // The consume fails while the store rejects writes, and the flow stays registered.
    store.fail_oauth_persist();
    assert!(matches!(
        first_attempt.consume(),
        Err(OAuthFlowError::PersistenceFailed { .. })
    ));
    assert!(lifecycle.has_oauth_device_flow_for_test(&binding, device_code));

    // A fresh poll can be started and consumed once the store accepts writes again.
    store.allow_oauth_persist();
    let second_attempt = authority
        .begin_device_code_poll(device_code, &binding, provider)
        .expect("failed durable consume keeps device flow retryable");
    second_attempt
        .consume()
        .expect("retry consumes after durable persistence recovers");
}
3028
/// Releasing a lease while the store rejects writes must return an error and
/// leave the browser flow registered with the lifecycle handle, so the flow
/// can still be consumed after the store recovers.
#[test]
fn release_persistence_failure_keeps_released_flows_retryable() {
    let lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn: Arc<dyn RuntimeStore> = store.clone();
    let authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        Arc::clone(&lifecycle),
        &store_dyn,
    );
    let binding = target();
    let lease_key = LeaseKey::from_auth_binding(&binding);
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";
    let state = authority
        .start(
            binding.clone(),
            provider,
            redirect_uri.to_string(),
            "verifier".to_string(),
        )
        .expect("browser flow admitted");

    // The release is attempted while the store rejects writes.
    store.fail_oauth_persist();
    assert!(
        lifecycle.release_lease(&lease_key).is_err(),
        "release should fail closed when durable OAuth cleanup cannot persist"
    );
    assert!(lifecycle.has_oauth_browser_flow_for_test(&binding, &state));

    // The flow is still consumable once the store accepts writes again.
    store.allow_oauth_persist();
    authority
        .consume(&state, &binding, provider, redirect_uri)
        .expect("failed durable release leaves browser flow retryable");
}
3064
/// A lifecycle handle that never saw the flow locally attempts a release
/// while the store rejects writes. The release must error, the lease snapshot
/// must still report no phase, and the flow must remain consumable through
/// the releasing authority once the store recovers.
#[test]
fn stale_release_persistence_failure_does_not_install_released_authority() {
    let ttl = Duration::from_secs(60);
    let releasing_lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let store = Arc::new(FailingOAuthSnapshotStore::default());
    let store_dyn: Arc<dyn RuntimeStore> = store.clone();
    let releasing_authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        ttl,
        Arc::clone(&releasing_lifecycle),
        &store_dyn,
    );
    let admitting_authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        ttl,
        Arc::new(RuntimeAuthLeaseHandle::new()),
        &store_dyn,
    );
    let binding = target();
    let lease_key = LeaseKey::from_auth_binding(&binding);
    let provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";

    // The flow is admitted by the other authority only.
    let state = admitting_authority
        .start(
            binding.clone(),
            provider,
            redirect_uri.to_string(),
            "verifier".to_string(),
        )
        .expect("other authority admits browser flow");
    assert!(
        !releasing_lifecycle.has_oauth_browser_flow_for_test(&binding, &state),
        "releasing authority starts stale and has no local machine membership"
    );

    // The release is attempted while the store rejects writes.
    store.fail_oauth_persist();
    assert!(
        releasing_lifecycle.release_lease(&lease_key).is_err(),
        "release should fail closed when stale durable OAuth cleanup cannot persist"
    );
    assert_eq!(
        releasing_lifecycle.snapshot(&lease_key).phase,
        None,
        "failed stale release must not synthesize a local AuthMachine authority"
    );

    // The flow is still consumable once the store accepts writes again.
    store.allow_oauth_persist();
    releasing_authority
        .consume(&state, &binding, provider, redirect_uri)
        .expect("failed stale durable release must leave browser flow retryable");
}
3113
/// Two authorities share one SQLite file through separate store handles. One
/// admits a browser flow and a device flow; the other, which has no local
/// knowledge of either, releases the lease. A third authority opened afterwards
/// on the same file must see both flows rejected by the lifecycle.
#[cfg(feature = "sqlite-store")]
#[test]
fn persistent_release_prunes_durable_flows_from_stale_authority() {
    let temp_dir = tempfile::tempdir().expect("tempdir");
    let store_path = temp_dir.path().join("runtime.sqlite");
    // Every handle opens the same database file; `role` only labels the panic
    // message so a failed open says which handle could not be created.
    let open_store = |role: &str| -> Arc<dyn RuntimeStore> {
        let opened = crate::store::sqlite::SqliteRuntimeStore::new(&store_path)
            .unwrap_or_else(|err| panic!("{role} sqlite runtime store should open: {err:?}"));
        Arc::new(opened)
    };

    let releasing_store = open_store("releasing");
    let admitting_store = open_store("admitting");
    let releasing_lifecycle = Arc::new(RuntimeAuthLeaseHandle::new());
    let releasing_authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        releasing_lifecycle.clone(),
        &releasing_store,
    );
    let admitting_authority = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        Arc::new(RuntimeAuthLeaseHandle::new()),
        &admitting_store,
    );
    let target = target();
    let lease_key = LeaseKey::from_auth_binding(&target);
    let browser_provider = OAuthProviderIdentity::OpenAiChatGpt;
    let redirect_uri = "http://127.0.0.1/callback";
    let browser_state = admitting_authority
        .start(
            target.clone(),
            browser_provider,
            redirect_uri.to_string(),
            "browser-verifier".to_string(),
        )
        .expect("other authority admits browser flow");
    let device_provider = OAuthProviderIdentity::GoogleCodeAssist;
    let device_code = "released-device-code";
    admitting_authority
        .admit_device_code(
            target.clone(),
            device_provider,
            device_code.to_string(),
            Duration::from_secs(60),
        )
        .expect("other authority admits device flow");
    assert!(
        !releasing_lifecycle.has_oauth_browser_flow_for_test(&target, &browser_state),
        "releasing authority starts stale and does not know the browser flow locally"
    );
    assert!(
        !releasing_lifecycle.has_oauth_device_flow_for_test(&target, device_code),
        "releasing authority starts stale and does not know the device flow locally"
    );

    releasing_lifecycle
        .release_lease(&lease_key)
        .expect("stale release succeeds");

    // A fresh handle plus a fresh lifecycle stand in for a restarted process:
    // nothing is carried over in memory, only what the SQLite file holds.
    let restarted_store = open_store("restarted");
    let restarted = RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
        Duration::from_secs(60),
        Arc::new(RuntimeAuthLeaseHandle::new()),
        &restarted_store,
    );
    let browser_after_release =
        restarted.consume(&browser_state, &target, browser_provider, redirect_uri);
    let device_after_release =
        restarted.verify_device_code(device_code, &target, device_provider);
    assert!(
        matches!(
            browser_after_release,
            Err(OAuthFlowError::LifecycleRejected {
                operation: "verify_oauth_browser_flow",
                ..
            })
        ),
        "release from a stale authority must prune durable browser flow, got {browser_after_release:?}"
    );
    assert!(
        matches!(
            device_after_release,
            Err(OAuthFlowError::LifecycleRejected {
                operation: "verify_oauth_device_flow",
                ..
            })
        ),
        "release from a stale authority must prune durable device flow, got {device_after_release:?}"
    );
    // The releasing authority is never called directly; the explicit drop keeps
    // it alive until every assertion above has run.
    drop(releasing_authority);
}
3202
/// Admitting, verifying and consuming a browser flow must leave the lease
/// snapshot's `generation` and `credential_published_at_millis` equal to the
/// values returned when the lease was acquired.
#[test]
fn oauth_flow_membership_does_not_advance_credential_generation() {
    let callback = "http://127.0.0.1/callback";
    let provider_id = OAuthProviderIdentity::OpenAiChatGpt;

    let lease_handle = Arc::new(RuntimeAuthLeaseHandle::new());
    let flows =
        RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_secs(60), lease_handle.clone());
    let binding = target();
    let key = LeaseKey::from_auth_binding(&binding);
    let acquired = lease_handle
        .acquire_lease(&key, 4_200)
        .expect("credential lifecycle acquired");

    // Drive one browser flow through its whole admit -> verify -> consume cycle.
    let flow_state = flows
        .start(binding.clone(), provider_id, callback.to_owned(), String::from("verifier"))
        .expect("browser flow admitted");
    flows
        .verify(&flow_state, &binding, provider_id, callback)
        .expect("browser flow verifies");
    flows
        .consume(&flow_state, &binding, provider_id, callback)
        .expect("browser flow consumes");

    let after_flow = lease_handle.snapshot(&key);
    assert_eq!(after_flow.generation, acquired.generation);
    assert_eq!(
        after_flow.credential_published_at_millis,
        acquired.credential_published_at_millis
    );
}
3238
/// With a 1 ms flow TTL, a flow for one target is left to expire and a flow for a
/// different target is then admitted. The expired flow must no longer be a
/// lifecycle member, while the first target's phase stays `ReauthRequired`.
#[test]
fn global_browser_expiry_preserves_reauth_required_phase() {
    let callback = "http://127.0.0.1/callback";
    let provider_id = OAuthProviderIdentity::OpenAiChatGpt;

    let lease_handle = Arc::new(RuntimeAuthLeaseHandle::new());
    let flows =
        RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_millis(1), lease_handle.clone());
    let binding = target();
    let second_binding = alternate_target();

    let stale_state = flows
        .start(binding.clone(), provider_id, callback.to_owned(), String::from("verifier-old"))
        .expect("browser flow admitted");
    let phase_after_admit = snapshot_phase(&lease_handle, &binding);
    assert_eq!(phase_after_admit, Some(AuthLeasePhase::ReauthRequired));

    // Sleep well past the 1 ms TTL so the first flow is expired by the next admit.
    std::thread::sleep(Duration::from_millis(10));

    flows
        .start(second_binding, provider_id, callback.to_owned(), String::from("verifier-new"))
        .expect("new browser flow admitted after pruning expired flow");

    let stale_still_member = lease_handle.has_oauth_browser_flow_for_test(&binding, &stale_state);
    assert!(
        !stale_still_member,
        "passive registry expiry must remove stale AuthMachine browser membership"
    );
    let phase_after_expiry = snapshot_phase(&lease_handle, &binding);
    assert_eq!(
        phase_after_expiry,
        Some(AuthLeasePhase::ReauthRequired),
        "global OAuth expiry cleanup must not change credential lifecycle truth"
    );
}
3284
/// With a 1 ms flow TTL, admitting a second flow for the same target after the
/// first one has expired must drop the first flow's lifecycle membership.
#[test]
fn browser_passive_expiry_clears_lifecycle_membership_on_next_admit() {
    let callback = "http://127.0.0.1/callback";
    let provider_id = OAuthProviderIdentity::OpenAiChatGpt;

    let lease_handle = Arc::new(RuntimeAuthLeaseHandle::new());
    let flows =
        RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_millis(1), lease_handle.clone());
    let binding = target();

    let stale_state = flows
        .start(binding.clone(), provider_id, callback.to_owned(), String::from("verifier-old"))
        .expect("browser flow admitted");
    let member_after_admit = lease_handle.has_oauth_browser_flow_for_test(&binding, &stale_state);
    assert!(member_after_admit);

    // Sleep well past the 1 ms TTL before the next admission.
    std::thread::sleep(Duration::from_millis(10));

    flows
        .start(binding.clone(), provider_id, callback.to_owned(), String::from("verifier-new"))
        .expect("new browser flow admitted after pruning expired flow");

    let member_after_expiry = lease_handle.has_oauth_browser_flow_for_test(&binding, &stale_state);
    assert!(
        !member_after_expiry,
        "passive registry expiry must remove stale AuthMachine browser membership"
    );
}
3321
/// Removes a device flow from the registry only, leaving lifecycle membership in
/// place, and checks that re-admitting the same code is rejected by the
/// lifecycle, that verify and poll report a missing registry projection, and
/// that lifecycle membership survives all of these calls.
#[test]
fn device_admit_rejects_registry_pruned_canonical_membership() {
    let code = "provider-device-code";
    let provider_id = OAuthProviderIdentity::GoogleCodeAssist;

    let lease_handle = Arc::new(RuntimeAuthLeaseHandle::new());
    let flows =
        RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_secs(60), lease_handle.clone());
    let binding = target();

    flows
        .admit_device_code(binding.clone(), provider_id, code.to_owned(), Duration::from_secs(60))
        .expect("device flow admitted");
    let member_after_admit = lease_handle.has_oauth_device_flow_for_test(&binding, code);
    assert!(member_after_admit);

    // Reach into the registry directly so only the payload record disappears.
    flows
        .registry
        .expire_device_code(code, &binding, provider_id)
        .expect("test removes registry record without lifecycle cleanup");
    let member_after_registry_loss = lease_handle.has_oauth_device_flow_for_test(&binding, code);
    assert!(member_after_registry_loss);

    let readmit = flows.admit_device_code(
        binding.clone(),
        provider_id,
        code.to_owned(),
        Duration::from_secs(60),
    );
    assert!(matches!(
        readmit,
        Err(OAuthFlowError::LifecycleRejected {
            operation: "admit_oauth_device_flow",
            ..
        })
    ));

    let member_after_readmit = lease_handle.has_oauth_device_flow_for_test(&binding, code);
    assert!(
        member_after_readmit,
        "registry-only loss must not expire canonical AuthMachine device membership"
    );

    let verify_outcome = flows.verify_device_code(code, &binding, provider_id);
    assert!(matches!(
        verify_outcome,
        Err(OAuthFlowError::RegistryProjectionMissing {
            operation: "verify_oauth_device_flow"
        })
    ));
    let poll_outcome = flows.begin_device_code_poll(code, &binding, provider_id);
    assert!(matches!(
        poll_outcome,
        Err(OAuthFlowError::RegistryProjectionMissing {
            operation: "begin_oauth_device_poll"
        })
    ));

    let member_at_end = lease_handle.has_oauth_device_flow_for_test(&binding, code);
    assert!(
        member_at_end,
        "missing process-local device payload must fail closed without removing the flow"
    );
}
3380
/// Admitting the same device code twice must reject the second attempt through
/// the lifecycle while the first admission stays a member and remains pollable.
#[test]
fn duplicate_device_admit_preserves_active_lifecycle_membership() {
    let code = "provider-device-code";
    let provider_id = OAuthProviderIdentity::GoogleCodeAssist;

    let lease_handle = Arc::new(RuntimeAuthLeaseHandle::new());
    let flows =
        RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_secs(60), lease_handle.clone());
    let binding = target();

    flows
        .admit_device_code(binding.clone(), provider_id, code.to_owned(), Duration::from_secs(60))
        .expect("device flow admitted");
    let second_attempt = flows.admit_device_code(
        binding.clone(),
        provider_id,
        code.to_owned(),
        Duration::from_secs(60),
    );

    assert!(matches!(
        second_attempt,
        Err(OAuthFlowError::LifecycleRejected {
            operation: "admit_oauth_device_flow",
            ..
        })
    ));
    let still_member = lease_handle.has_oauth_device_flow_for_test(&binding, code);
    assert!(still_member);
    // The original admission must still be usable after the rejected duplicate.
    flows
        .begin_device_code_poll(code, &binding, provider_id)
        .expect("duplicate admit must not orphan active lifecycle membership");
}
3417
/// With capacity 1, a flow is admitted and its lease is then released. A flow
/// for a different target must be admissible afterwards, i.e. the release must
/// have freed the single capacity slot.
#[test]
fn registry_capacity_still_bounds_payloads_after_lifecycle_release() {
    let provider_id = OAuthProviderIdentity::OpenAiChatGpt;

    let lease_handle = Arc::new(RuntimeAuthLeaseHandle::new());
    let flows = RuntimeOAuthFlowHandle::new_with_capacity_and_auth_lease(
        Duration::from_secs(60),
        1,
        lease_handle.clone(),
    );
    let binding = target();

    let first_callback = String::from("http://127.0.0.1/callback");
    flows
        .start(binding.clone(), provider_id, first_callback, String::from("verifier-1"))
        .expect("first browser flow admitted");

    let key = LeaseKey::from_auth_binding(&binding);
    lease_handle
        .release_lease(&key)
        .expect("credential lifecycle release succeeds");

    // The only slot was taken by the first flow; this admission can succeed
    // only if the release above cleared it.
    let second_callback = String::from("http://127.0.0.1/other-callback");
    flows
        .start(alternate_target(), provider_id, second_callback, String::from("verifier-2"))
        .expect("AuthMachine release must clear stale registry payload capacity");
}
3450
/// Installs a test hook that admits a replacement browser flow for the same
/// target while `release_lease` is running. After the release returns, the
/// replacement flow must still be consumable and carry its own PKCE verifier,
/// while the flow admitted before the release must be rejected.
#[test]
fn release_observer_does_not_prune_flow_admitted_after_release_acceptance() {
    let callback = "http://127.0.0.1/callback";
    let provider_id = OAuthProviderIdentity::OpenAiChatGpt;

    let lease_handle = Arc::new(RuntimeAuthLeaseHandle::new());
    let flows = Arc::new(RuntimeOAuthFlowHandle::new_with_auth_lease(
        Duration::from_secs(60),
        lease_handle.clone(),
    ));
    let binding = target();
    let key = LeaseKey::from_auth_binding(&binding);
    let stale_state = flows
        .start(binding.clone(), provider_id, callback.to_owned(), String::from("old-verifier"))
        .expect("old browser flow admitted");

    // The hook hands the replacement flow's state back through this shared slot.
    let replacement_slot = Arc::new(std::sync::Mutex::new(None));
    let hook_slot = Arc::clone(&replacement_slot);
    let hook_flows = Arc::clone(&flows);
    let hook_binding = binding.clone();
    let hook_key = key.clone();
    let _hook_guard = crate::handles::auth_lease::install_release_after_accept_hook_for_test(
        Arc::new(move |released_key| {
            // Only react to the release of the lease under test.
            if released_key == &hook_key {
                let admitted = hook_flows
                    .start(
                        hook_binding.clone(),
                        provider_id,
                        callback.to_owned(),
                        String::from("new-verifier"),
                    )
                    .expect("new browser flow admitted after release acceptance");
                let mut slot = hook_slot
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                *slot = Some(admitted);
            }
        }),
    );

    lease_handle
        .release_lease(&key)
        .expect("credential lifecycle release succeeds");

    let replacement_state = replacement_slot
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone()
        .expect("hook admitted replacement flow");
    let replacement_flow = flows
        .consume(&replacement_state, &binding, provider_id, callback)
        .expect("release observer must not prune newly admitted flow");
    assert_eq!(replacement_flow.pkce_verifier, "new-verifier");

    let stale_outcome = flows.consume(&stale_state, &binding, provider_id, callback);
    assert!(matches!(
        stale_outcome,
        Err(OAuthFlowError::LifecycleRejected {
            operation: "verify_oauth_browser_flow",
            ..
        })
    ));
}
3515
/// With capacity 1 and one browser flow already admitted, starting a second
/// flow for another target must fail with `LifecycleRejected` for the
/// `admit_oauth_browser_flow` operation.
#[test]
fn browser_capacity_rejection_comes_from_authmachine_lifecycle() {
    let callback = "http://127.0.0.1/callback";
    let provider_id = OAuthProviderIdentity::OpenAiChatGpt;

    let lease_handle = Arc::new(RuntimeAuthLeaseHandle::new());
    let flows = RuntimeOAuthFlowHandle::new_with_capacity_and_auth_lease(
        Duration::from_secs(60),
        1,
        lease_handle,
    );
    let binding = target();

    flows
        .start(binding.clone(), provider_id, callback.to_owned(), String::from("verifier-1"))
        .expect("first browser flow admitted");

    // The single slot is occupied, so this admission is expected to be refused.
    let over_capacity = flows.start(
        alternate_target(),
        provider_id,
        String::from("http://127.0.0.1/other-callback"),
        String::from("verifier-2"),
    );
    assert!(matches!(
        over_capacity,
        Err(OAuthFlowError::LifecycleRejected {
            operation: "admit_oauth_browser_flow",
            ..
        })
    ));
}
3550
/// Verifying a browser flow with a provider other than the one it was admitted
/// under must fail with `LifecycleRejected` for the `verify_oauth_browser_flow`
/// operation.
#[test]
fn browser_provider_mismatch_rejection_comes_from_authmachine_lifecycle() {
    let callback = "http://127.0.0.1/callback";

    let lease_handle = Arc::new(RuntimeAuthLeaseHandle::new());
    let flows = RuntimeOAuthFlowHandle::new_with_auth_lease(Duration::from_secs(60), lease_handle);
    let binding = target();
    let flow_state = flows
        .start(
            binding.clone(),
            OAuthProviderIdentity::OpenAiChatGpt,
            callback.to_owned(),
            String::from("verifier"),
        )
        .expect("browser flow admitted");

    // Admitted as OpenAiChatGpt, verified as GoogleCodeAssist.
    let mismatched = flows.verify(
        &flow_state,
        &binding,
        OAuthProviderIdentity::GoogleCodeAssist,
        callback,
    );
    assert!(matches!(
        mismatched,
        Err(OAuthFlowError::LifecycleRejected {
            operation: "verify_oauth_browser_flow",
            ..
        })
    ));
}
3580}