Skip to main content

ant_quic/
coordinator_control.rs

1#![allow(dead_code)]
2
3use std::{
4    net::SocketAddr,
5    sync::{
6        OnceLock,
7        atomic::{AtomicU64, Ordering},
8    },
9    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
10};
11
12use dashmap::DashMap;
13use serde::{Deserialize, Serialize};
14
15use crate::nat_traversal_api::PeerId;
16
17pub(crate) const COORDINATOR_CONTROL_MAGIC: &[u8; 4] = b"AQCC";
18pub(crate) const COORDINATOR_CONTROL_VERSION: u8 = 1;
19pub(crate) const COORDINATOR_RATE_LIMIT_WINDOW: Duration = Duration::from_secs(2);
20const COORDINATOR_STATE_SWEEP_INTERVAL: Duration = Duration::from_secs(30);
21
22#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
23pub(crate) struct CoordinatorControlEnvelope {
24    pub request_id: u64,
25    pub expires_at_unix_ms: u64,
26    pub message: CoordinatorControlMessage,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
30pub(crate) enum CoordinatorControlMessage {
31    CoordinationRequest {
32        initiator: PeerId,
33        target: PeerId,
34        round: u32,
35        initiator_addrs: Vec<SocketAddr>,
36    },
37    CoordinationOffer {
38        initiator: PeerId,
39        target: PeerId,
40        round: u32,
41        initiator_addrs: Vec<SocketAddr>,
42    },
43    CoordinationReady {
44        initiator: PeerId,
45        target: PeerId,
46        round: u32,
47        target_addrs: Vec<SocketAddr>,
48    },
49    CoordinationAccepted {
50        initiator: PeerId,
51        target: PeerId,
52        round: u32,
53        initiator_addrs: Vec<SocketAddr>,
54        target_addrs: Vec<SocketAddr>,
55    },
56    CoordinationRejected {
57        initiator: PeerId,
58        target: PeerId,
59        round: u32,
60        reason: RejectionReason,
61    },
62}
63
64/// Coordinator-side reason for rejecting a traversal coordination request.
65#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
66pub enum RejectionReason {
67    /// The initiator attempted to coordinate a traversal to itself.
68    SelfTarget,
69    /// The requested target peer is unknown or currently unavailable.
70    UnknownTarget,
71    /// The request or offer expired before it could be coordinated.
72    Expired,
73    /// The coordinator refused the request due to rate limiting.
74    RateLimited,
75    /// The initiating peer was not sufficiently authenticated for coordination.
76    Unauthenticated,
77    /// The coordinator hit an internal error while processing the request.
78    InternalError,
79}
80
81impl std::fmt::Display for RejectionReason {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        match self {
84            Self::SelfTarget => write!(f, "self-target request"),
85            Self::UnknownTarget => write!(f, "unknown target"),
86            Self::Expired => write!(f, "request expired"),
87            Self::RateLimited => write!(f, "rate limited"),
88            Self::Unauthenticated => write!(f, "unauthenticated requester"),
89            Self::InternalError => write!(f, "internal coordinator error"),
90        }
91    }
92}
93
94#[derive(Debug, Clone)]
95pub(crate) struct PendingRequest {
96    pub initiator: PeerId,
97    pub target: PeerId,
98    pub round: u32,
99    pub initiator_addrs: Vec<SocketAddr>,
100    pub expires_at_unix_ms: u64,
101    pub local_expires_at: Instant,
102}
103
104#[derive(Debug, Clone, PartialEq, Eq)]
105pub(crate) struct LiveRequest {
106    pub request_id: u64,
107    pub round: u32,
108    pub expires_at_unix_ms: u64,
109    pub local_expires_at: Instant,
110    pub expected_coordinator: Option<PeerId>,
111}
112
113#[derive(Debug, Clone, PartialEq, Eq)]
114pub(crate) struct RecordedRejection {
115    pub request_id: u64,
116    pub round: u32,
117    pub reason: RejectionReason,
118    pub from_peer: Option<PeerId>,
119}
120
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub(crate) struct InboundOffer {
123    pub coordinator: PeerId,
124    pub initiator: PeerId,
125    pub target: PeerId,
126    pub request_id: u64,
127    pub round: u32,
128    pub initiator_addrs: Vec<SocketAddr>,
129    pub expires_at_unix_ms: u64,
130    pub local_expires_at: Instant,
131}
132
133static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(1);
134static LAST_SCAVENGE_MS: AtomicU64 = AtomicU64::new(0);
135static PENDING_REQUESTS: OnceLock<DashMap<u64, PendingRequest>> = OnceLock::new();
136static INBOUND_OFFERS: OnceLock<DashMap<([u8; 32], u64), InboundOffer>> = OnceLock::new();
137static LIVE_REQUESTS: OnceLock<DashMap<([u8; 32], [u8; 32]), LiveRequest>> = OnceLock::new();
138static RATE_LIMITS: OnceLock<DashMap<([u8; 32], [u8; 32]), u64>> = OnceLock::new();
139static REJECTIONS: OnceLock<DashMap<([u8; 32], [u8; 32]), RecordedRejection>> = OnceLock::new();
140
141fn pending_requests() -> &'static DashMap<u64, PendingRequest> {
142    PENDING_REQUESTS.get_or_init(DashMap::new)
143}
144
145fn inbound_offers() -> &'static DashMap<([u8; 32], u64), InboundOffer> {
146    INBOUND_OFFERS.get_or_init(DashMap::new)
147}
148
149fn live_requests() -> &'static DashMap<([u8; 32], [u8; 32]), LiveRequest> {
150    LIVE_REQUESTS.get_or_init(DashMap::new)
151}
152
153fn rate_limits() -> &'static DashMap<([u8; 32], [u8; 32]), u64> {
154    RATE_LIMITS.get_or_init(DashMap::new)
155}
156
157fn rejections() -> &'static DashMap<([u8; 32], [u8; 32]), RecordedRejection> {
158    REJECTIONS.get_or_init(DashMap::new)
159}
160
161fn maybe_scavenge_expired_state(now_ms: u64) {
162    let sweep_interval_ms =
163        u64::try_from(COORDINATOR_STATE_SWEEP_INTERVAL.as_millis()).unwrap_or(u64::MAX);
164    let last_scavenge = LAST_SCAVENGE_MS.load(Ordering::Relaxed);
165    if now_ms.saturating_sub(last_scavenge) < sweep_interval_ms {
166        return;
167    }
168
169    if LAST_SCAVENGE_MS
170        .compare_exchange(last_scavenge, now_ms, Ordering::Relaxed, Ordering::Relaxed)
171        .is_ok()
172    {
173        scavenge_expired_state(now_ms);
174    }
175}
176
177fn scavenge_expired_state(now_ms: u64) {
178    let now = Instant::now();
179    pending_requests().retain(|_, pending| pending.local_expires_at > now);
180    inbound_offers().retain(|_, offer| offer.local_expires_at > now);
181    live_requests().retain(|_, live| live.local_expires_at > now);
182    rejections().retain(|key, recorded| {
183        // This cross-map lookup stays safe because `rejections` and
184        // `live_requests` are distinct DashMaps with independent shard locks.
185        live_requests().get(key).is_some_and(|live| {
186            live.local_expires_at > now
187                && live.request_id == recorded.request_id
188                && live.round == recorded.round
189        })
190    });
191
192    let rate_limit_window_ms =
193        u64::try_from(COORDINATOR_RATE_LIMIT_WINDOW.as_millis()).unwrap_or(u64::MAX);
194    rate_limits().retain(|_, last_seen_at_ms| {
195        now_ms.saturating_sub(*last_seen_at_ms) < rate_limit_window_ms
196    });
197}
198
199pub(crate) fn encode_coordinator_control(
200    envelope: &CoordinatorControlEnvelope,
201) -> Result<Vec<u8>, serde_json::Error> {
202    let mut out = Vec::new();
203    out.extend_from_slice(COORDINATOR_CONTROL_MAGIC);
204    out.push(COORDINATOR_CONTROL_VERSION);
205    out.extend_from_slice(&serde_json::to_vec(envelope)?);
206    Ok(out)
207}
208
209pub(crate) fn decode_coordinator_control(
210    bytes: &[u8],
211) -> Result<Option<CoordinatorControlEnvelope>, serde_json::Error> {
212    if bytes.len() < COORDINATOR_CONTROL_MAGIC.len() + 1 {
213        return Ok(None);
214    }
215    if &bytes[..COORDINATOR_CONTROL_MAGIC.len()] != COORDINATOR_CONTROL_MAGIC {
216        return Ok(None);
217    }
218    if bytes[COORDINATOR_CONTROL_MAGIC.len()] != COORDINATOR_CONTROL_VERSION {
219        return Ok(None);
220    }
221    let payload = &bytes[COORDINATOR_CONTROL_MAGIC.len() + 1..];
222    serde_json::from_slice(payload).map(Some)
223}
224
225pub(crate) fn next_request_id() -> u64 {
226    REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed)
227}
228
229pub(crate) fn remember_pending_request(request_id: u64, pending: PendingRequest) {
230    maybe_scavenge_expired_state(now_unix_ms());
231    pending_requests().insert(request_id, pending);
232}
233
234pub(crate) fn get_pending_request(request_id: u64) -> Option<PendingRequest> {
235    maybe_scavenge_expired_state(now_unix_ms());
236    let pending = pending_requests()
237        .get(&request_id)
238        .map(|entry| entry.value().clone())?;
239    if pending.local_expires_at <= Instant::now() {
240        let _ = pending_requests().remove(&request_id);
241        return None;
242    }
243    Some(pending)
244}
245
246pub(crate) fn remove_pending_request(request_id: u64) -> Option<PendingRequest> {
247    maybe_scavenge_expired_state(now_unix_ms());
248    pending_requests()
249        .remove(&request_id)
250        .map(|(_, pending)| pending)
251}
252
253pub(crate) fn remember_inbound_offer(
254    local_target_peer: PeerId,
255    request_id: u64,
256    offer: InboundOffer,
257) {
258    maybe_scavenge_expired_state(now_unix_ms());
259    inbound_offers().insert((local_target_peer.0, request_id), offer);
260}
261
262pub(crate) fn inbound_offer(local_target_peer: PeerId, request_id: u64) -> Option<InboundOffer> {
263    maybe_scavenge_expired_state(now_unix_ms());
264    let offer = inbound_offers()
265        .get(&(local_target_peer.0, request_id))
266        .map(|entry| entry.value().clone())?;
267    if offer.local_expires_at <= Instant::now() {
268        let _ = inbound_offers().remove(&(local_target_peer.0, request_id));
269        return None;
270    }
271    Some(offer)
272}
273
274pub(crate) fn remove_inbound_offer(
275    local_target_peer: PeerId,
276    request_id: u64,
277) -> Option<InboundOffer> {
278    maybe_scavenge_expired_state(now_unix_ms());
279    inbound_offers()
280        .remove(&(local_target_peer.0, request_id))
281        .map(|(_, offer)| offer)
282}
283
284pub(crate) fn note_rate_limit_and_check(local_peer: PeerId, requester_peer: PeerId) -> bool {
285    let now_ms = now_unix_ms();
286    maybe_scavenge_expired_state(now_ms);
287    let window_ms = u64::try_from(COORDINATOR_RATE_LIMIT_WINDOW.as_millis()).unwrap_or(u64::MAX);
288    let key = (local_peer.0, requester_peer.0);
289
290    let previous = rate_limits().get(&key).map(|entry| *entry);
291
292    if previous.is_some_and(|previous| now_ms.saturating_sub(previous) < window_ms) {
293        rate_limits().insert(key, now_ms);
294        return false;
295    }
296
297    rate_limits().insert(key, now_ms);
298    true
299}
300
301pub(crate) fn remember_live_request(local_peer: PeerId, target_peer: PeerId, live: LiveRequest) {
302    maybe_scavenge_expired_state(now_unix_ms());
303    live_requests().insert((local_peer.0, target_peer.0), live);
304}
305
306pub(crate) fn live_request(local_peer: PeerId, target_peer: PeerId) -> Option<LiveRequest> {
307    maybe_scavenge_expired_state(now_unix_ms());
308    let live = live_requests()
309        .get(&(local_peer.0, target_peer.0))
310        .map(|entry| entry.value().clone())?;
311    if live.local_expires_at <= Instant::now() {
312        let _ = live_requests().remove(&(local_peer.0, target_peer.0));
313        return None;
314    }
315    Some(live)
316}
317
318pub(crate) fn clear_live_request(local_peer: PeerId, target_peer: PeerId) -> Option<LiveRequest> {
319    maybe_scavenge_expired_state(now_unix_ms());
320    live_requests()
321        .remove(&(local_peer.0, target_peer.0))
322        .map(|(_, live)| live)
323}
324
325pub(crate) fn record_rejection(
326    local_peer: PeerId,
327    target_peer: PeerId,
328    request_id: u64,
329    round: u32,
330    from_peer: Option<PeerId>,
331    reason: RejectionReason,
332) {
333    maybe_scavenge_expired_state(now_unix_ms());
334    rejections().insert(
335        (local_peer.0, target_peer.0),
336        RecordedRejection {
337            request_id,
338            round,
339            reason,
340            from_peer,
341        },
342    );
343}
344
345pub(crate) fn take_live_rejection(
346    local_peer: PeerId,
347    target_peer: PeerId,
348) -> Option<RecordedRejection> {
349    maybe_scavenge_expired_state(now_unix_ms());
350    let key = (local_peer.0, target_peer.0);
351    let live = live_request(local_peer, target_peer)?;
352    let recorded = rejections().get(&key).map(|entry| entry.value().clone())?;
353
354    if recorded.request_id != live.request_id || recorded.round != live.round {
355        return None;
356    }
357
358    rejections().remove(&key).map(|(_, rejection)| rejection)
359}
360
361pub(crate) fn monotonic_deadline_from_unix_ms(expires_at_unix_ms: u64) -> Instant {
362    let remaining_ms = expires_at_unix_ms.saturating_sub(now_unix_ms());
363    Instant::now() + Duration::from_millis(remaining_ms)
364}
365
366pub(crate) fn wire_and_monotonic_expiry_after(duration: Duration) -> (u64, Instant) {
367    let expiry_ms = duration.as_millis().min(u128::from(u64::MAX)) as u64;
368    (
369        now_unix_ms().saturating_add(expiry_ms),
370        Instant::now() + duration,
371    )
372}
373
374pub(crate) fn now_unix_ms() -> u64 {
375    SystemTime::now()
376        .duration_since(UNIX_EPOCH)
377        .map(|d| d.as_millis() as u64)
378        .unwrap_or(0)
379}
380
381#[cfg(test)]
382mod tests {
383    use super::*;
384
385    fn peer(byte: u8) -> PeerId {
386        PeerId([byte; 32])
387    }
388
389    #[test]
390    fn coordination_request_round_trips_through_codec() {
391        let initiator = peer(0x11);
392        let target = peer(0x22);
393        let request_id = 42;
394        let expires_at_unix_ms = 1_700_000_000_123;
395        let initiator_addrs = vec![
396            "203.0.113.10:9000".parse().expect("valid ipv4 address"),
397            "[2001:db8::10]:9000".parse().expect("valid ipv6 address"),
398        ];
399        let envelope = CoordinatorControlEnvelope {
400            request_id,
401            expires_at_unix_ms,
402            message: CoordinatorControlMessage::CoordinationRequest {
403                initiator,
404                target,
405                round: 3,
406                initiator_addrs: initiator_addrs.clone(),
407            },
408        };
409
410        let encoded = encode_coordinator_control(&envelope).expect("encode should succeed");
411        let decoded = decode_coordinator_control(&encoded)
412            .expect("decode should succeed")
413            .expect("control payload should be recognized");
414
415        assert_eq!(decoded.request_id, request_id);
416        assert_eq!(decoded.expires_at_unix_ms, expires_at_unix_ms);
417        match decoded.message {
418            CoordinatorControlMessage::CoordinationRequest {
419                initiator: decoded_initiator,
420                target: decoded_target,
421                round,
422                initiator_addrs: decoded_addrs,
423            } => {
424                assert_eq!(decoded_initiator, initiator);
425                assert_eq!(decoded_target, target);
426                assert_eq!(round, 3);
427                assert_eq!(decoded_addrs, initiator_addrs);
428            }
429            other => panic!("unexpected decoded message: {:?}", other),
430        }
431    }
432
433    #[test]
434    fn coordination_accepted_round_trips_through_codec() {
435        let initiator = peer(0x11);
436        let target = peer(0x22);
437        let request_id = 43;
438        let expires_at_unix_ms = 1_700_000_000_456;
439        let initiator_addrs = vec![
440            "203.0.113.11:9001".parse().expect("valid ipv4 address"),
441            "[2001:db8::11]:9001".parse().expect("valid ipv6 address"),
442        ];
443        let target_addrs = vec![
444            "198.51.100.22:9443".parse().expect("valid ipv4 address"),
445            "[2001:db8::22]:9443".parse().expect("valid ipv6 address"),
446        ];
447        let envelope = CoordinatorControlEnvelope {
448            request_id,
449            expires_at_unix_ms,
450            message: CoordinatorControlMessage::CoordinationAccepted {
451                initiator,
452                target,
453                round: 4,
454                initiator_addrs: initiator_addrs.clone(),
455                target_addrs: target_addrs.clone(),
456            },
457        };
458
459        let encoded = encode_coordinator_control(&envelope).expect("encode should succeed");
460        let decoded = decode_coordinator_control(&encoded)
461            .expect("decode should succeed")
462            .expect("control payload should be recognized");
463
464        match decoded.message {
465            CoordinatorControlMessage::CoordinationAccepted {
466                initiator: decoded_initiator,
467                target: decoded_target,
468                round,
469                initiator_addrs: decoded_initiator_addrs,
470                target_addrs: decoded_target_addrs,
471            } => {
472                assert_eq!(decoded_initiator, initiator);
473                assert_eq!(decoded_target, target);
474                assert_eq!(round, 4);
475                assert_eq!(decoded_initiator_addrs, initiator_addrs);
476                assert_eq!(decoded_target_addrs, target_addrs);
477            }
478            other => panic!("unexpected decoded message: {:?}", other),
479        }
480    }
481
482    #[test]
483    fn non_control_payload_decodes_to_none() {
484        let decoded = decode_coordinator_control(b"not coordinator control")
485            .expect("non-control payload should not error");
486        assert!(decoded.is_none());
487    }
488
489    #[test]
490    fn rejections_are_namespaced_by_local_and_target_peer() {
491        let local_a = peer(0x01);
492        let local_b = peer(0x02);
493        let target = peer(0x33);
494        let now_ms = now_unix_ms();
495        let fresh_deadline = Instant::now() + Duration::from_secs(60);
496
497        let _ = clear_live_request(local_a, target);
498        let _ = clear_live_request(local_b, target);
499        let _ = take_live_rejection(local_a, target);
500        let _ = take_live_rejection(local_b, target);
501
502        remember_live_request(
503            local_a,
504            target,
505            LiveRequest {
506                request_id: 100,
507                round: 1,
508                expires_at_unix_ms: now_ms + 60_000,
509                local_expires_at: fresh_deadline,
510                expected_coordinator: None,
511            },
512        );
513        remember_live_request(
514            local_b,
515            target,
516            LiveRequest {
517                request_id: 200,
518                round: 1,
519                expires_at_unix_ms: now_ms + 61_000,
520                local_expires_at: fresh_deadline,
521                expected_coordinator: None,
522            },
523        );
524
525        record_rejection(
526            local_a,
527            target,
528            100,
529            1,
530            Some(peer(0x77)),
531            RejectionReason::Expired,
532        );
533        record_rejection(
534            local_b,
535            target,
536            200,
537            1,
538            Some(peer(0x88)),
539            RejectionReason::RateLimited,
540        );
541
542        assert_eq!(
543            take_live_rejection(local_a, target),
544            Some(RecordedRejection {
545                request_id: 100,
546                round: 1,
547                reason: RejectionReason::Expired,
548                from_peer: Some(peer(0x77)),
549            })
550        );
551        assert_eq!(
552            take_live_rejection(local_b, target),
553            Some(RecordedRejection {
554                request_id: 200,
555                round: 1,
556                reason: RejectionReason::RateLimited,
557                from_peer: Some(peer(0x88)),
558            })
559        );
560        assert_eq!(take_live_rejection(local_a, target), None);
561        assert_eq!(take_live_rejection(local_b, target), None);
562
563        remember_live_request(
564            local_a,
565            target,
566            LiveRequest {
567                request_id: 301,
568                round: 2,
569                expires_at_unix_ms: now_ms + 62_000,
570                local_expires_at: fresh_deadline,
571                expected_coordinator: None,
572            },
573        );
574        record_rejection(
575            local_a,
576            target,
577            300,
578            1,
579            Some(peer(0x99)),
580            RejectionReason::InternalError,
581        );
582        assert_eq!(take_live_rejection(local_a, target), None);
583
584        record_rejection(
585            local_a,
586            target,
587            301,
588            2,
589            Some(peer(0xAA)),
590            RejectionReason::UnknownTarget,
591        );
592        assert_eq!(
593            take_live_rejection(local_a, target),
594            Some(RecordedRejection {
595                request_id: 301,
596                round: 2,
597                reason: RejectionReason::UnknownTarget,
598                from_peer: Some(peer(0xAA)),
599            })
600        );
601    }
602
603    #[test]
604    fn rate_limit_is_namespaced_by_local_peer() {
605        let requester = peer(0x44);
606        let local_a = peer(0x55);
607        let local_b = peer(0x66);
608
609        assert!(note_rate_limit_and_check(local_a, requester));
610        assert!(!note_rate_limit_and_check(local_a, requester));
611
612        assert!(note_rate_limit_and_check(local_b, requester));
613        assert!(!note_rate_limit_and_check(local_b, requester));
614    }
615
616    #[test]
617    fn scavenger_removes_expired_abandoned_state() {
618        let now_ms = now_unix_ms();
619        let expired_deadline = Instant::now();
620        let fresh_deadline = Instant::now() + Duration::from_secs(10);
621        let expired_request_id = 9_101;
622        let fresh_request_id = 9_102;
623        let expired_target = peer(0x71);
624        let fresh_target = peer(0x72);
625        let expired_local = peer(0x73);
626        let fresh_local = peer(0x74);
627        let expired_requester = peer(0x75);
628        let fresh_requester = peer(0x76);
629
630        let _ = remove_pending_request(expired_request_id);
631        let _ = remove_pending_request(fresh_request_id);
632        let _ = remove_inbound_offer(expired_target, expired_request_id);
633        let _ = remove_inbound_offer(fresh_target, fresh_request_id);
634        let _ = clear_live_request(expired_local, expired_target);
635        let _ = clear_live_request(fresh_local, fresh_target);
636        let _ = take_live_rejection(expired_local, expired_target);
637        let _ = take_live_rejection(fresh_local, fresh_target);
638        rate_limits().remove(&(expired_local.0, expired_requester.0));
639        rate_limits().remove(&(fresh_local.0, fresh_requester.0));
640
641        remember_pending_request(
642            expired_request_id,
643            PendingRequest {
644                initiator: expired_local,
645                target: expired_target,
646                round: 1,
647                initiator_addrs: Vec::new(),
648                expires_at_unix_ms: now_ms - 1,
649                local_expires_at: expired_deadline,
650            },
651        );
652        remember_pending_request(
653            fresh_request_id,
654            PendingRequest {
655                initiator: fresh_local,
656                target: fresh_target,
657                round: 2,
658                initiator_addrs: Vec::new(),
659                expires_at_unix_ms: now_ms + 10_000,
660                local_expires_at: fresh_deadline,
661            },
662        );
663
664        remember_inbound_offer(
665            expired_target,
666            expired_request_id,
667            InboundOffer {
668                coordinator: peer(0x77),
669                initiator: expired_local,
670                target: expired_target,
671                request_id: expired_request_id,
672                round: 1,
673                initiator_addrs: Vec::new(),
674                expires_at_unix_ms: now_ms - 1,
675                local_expires_at: expired_deadline,
676            },
677        );
678        remember_inbound_offer(
679            fresh_target,
680            fresh_request_id,
681            InboundOffer {
682                coordinator: peer(0x78),
683                initiator: fresh_local,
684                target: fresh_target,
685                request_id: fresh_request_id,
686                round: 2,
687                initiator_addrs: Vec::new(),
688                expires_at_unix_ms: now_ms + 10_000,
689                local_expires_at: fresh_deadline,
690            },
691        );
692
693        remember_live_request(
694            expired_local,
695            expired_target,
696            LiveRequest {
697                request_id: expired_request_id,
698                round: 1,
699                expires_at_unix_ms: now_ms - 1,
700                local_expires_at: expired_deadline,
701                expected_coordinator: None,
702            },
703        );
704        remember_live_request(
705            fresh_local,
706            fresh_target,
707            LiveRequest {
708                request_id: fresh_request_id,
709                round: 2,
710                expires_at_unix_ms: now_ms + 10_000,
711                local_expires_at: fresh_deadline,
712                expected_coordinator: None,
713            },
714        );
715
716        record_rejection(
717            expired_local,
718            expired_target,
719            expired_request_id,
720            1,
721            Some(peer(0x79)),
722            RejectionReason::Expired,
723        );
724        record_rejection(
725            fresh_local,
726            fresh_target,
727            fresh_request_id,
728            2,
729            Some(peer(0x7A)),
730            RejectionReason::UnknownTarget,
731        );
732
733        let rate_limit_window_ms =
734            u64::try_from(COORDINATOR_RATE_LIMIT_WINDOW.as_millis()).unwrap_or(u64::MAX);
735        rate_limits().insert(
736            (expired_local.0, expired_requester.0),
737            now_ms.saturating_sub(rate_limit_window_ms + 1),
738        );
739        rate_limits().insert((fresh_local.0, fresh_requester.0), now_ms);
740
741        scavenge_expired_state(now_ms);
742
743        assert!(get_pending_request(expired_request_id).is_none());
744        assert!(get_pending_request(fresh_request_id).is_some());
745        assert!(inbound_offer(expired_target, expired_request_id).is_none());
746        assert!(inbound_offer(fresh_target, fresh_request_id).is_some());
747        assert!(live_request(expired_local, expired_target).is_none());
748        assert!(live_request(fresh_local, fresh_target).is_some());
749        assert!(take_live_rejection(expired_local, expired_target).is_none());
750        assert!(take_live_rejection(fresh_local, fresh_target).is_some());
751        assert!(!rate_limits().contains_key(&(expired_local.0, expired_requester.0)));
752        assert!(rate_limits().contains_key(&(fresh_local.0, fresh_requester.0)));
753
754        let _ = remove_pending_request(fresh_request_id);
755        let _ = remove_inbound_offer(fresh_target, fresh_request_id);
756        let _ = clear_live_request(fresh_local, fresh_target);
757        let _ = take_live_rejection(fresh_local, fresh_target);
758        rate_limits().remove(&(fresh_local.0, fresh_requester.0));
759    }
760
761    #[test]
762    fn getters_prefer_monotonic_deadlines_over_future_wire_expiry() {
763        let now_ms = now_unix_ms();
764        let request_id = 9_201;
765        let local_peer = peer(0x81);
766        let target_peer = peer(0x82);
767        let expired_deadline = Instant::now();
768
769        let _ = remove_pending_request(request_id);
770        let _ = remove_inbound_offer(target_peer, request_id);
771        let _ = clear_live_request(local_peer, target_peer);
772
773        remember_pending_request(
774            request_id,
775            PendingRequest {
776                initiator: local_peer,
777                target: target_peer,
778                round: 1,
779                initiator_addrs: Vec::new(),
780                expires_at_unix_ms: now_ms + 60_000,
781                local_expires_at: expired_deadline,
782            },
783        );
784        remember_inbound_offer(
785            target_peer,
786            request_id,
787            InboundOffer {
788                coordinator: peer(0x83),
789                initiator: local_peer,
790                target: target_peer,
791                request_id,
792                round: 1,
793                initiator_addrs: Vec::new(),
794                expires_at_unix_ms: now_ms + 60_000,
795                local_expires_at: expired_deadline,
796            },
797        );
798        remember_live_request(
799            local_peer,
800            target_peer,
801            LiveRequest {
802                request_id,
803                round: 1,
804                expires_at_unix_ms: now_ms + 60_000,
805                local_expires_at: expired_deadline,
806                expected_coordinator: None,
807            },
808        );
809
810        assert!(get_pending_request(request_id).is_none());
811        assert!(inbound_offer(target_peer, request_id).is_none());
812        assert!(live_request(local_peer, target_peer).is_none());
813    }
814}