1#![allow(dead_code)]
2
3use std::{
4 net::SocketAddr,
5 sync::{
6 OnceLock,
7 atomic::{AtomicU64, Ordering},
8 },
9 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
10};
11
12use dashmap::DashMap;
13use serde::{Deserialize, Serialize};
14
15use crate::nat_traversal_api::PeerId;
16
/// Four-byte prefix that `encode_coordinator_control` writes at the start of every frame and
/// that `decode_coordinator_control` requires before it attempts to parse a payload.
pub(crate) const COORDINATOR_CONTROL_MAGIC: &[u8; 4] = b"AQCC";
/// Version byte that directly follows the magic prefix; frames carrying any other value decode
/// to `None`.
pub(crate) const COORDINATOR_CONTROL_VERSION: u8 = 1;
/// Span during which a repeated call to `note_rate_limit_and_check` for the same
/// `(local, requester)` pair is refused.
pub(crate) const COORDINATOR_RATE_LIMIT_WINDOW: Duration = Duration::from_secs(2);
/// Minimum wall-clock spacing between two opportunistic sweeps of the global state tables.
const COORDINATOR_STATE_SWEEP_INTERVAL: Duration = Duration::from_secs(30);
21
/// Outer record of a coordinator control frame.
///
/// `encode_coordinator_control` serializes this as JSON behind the magic/version header, so the
/// field names and their order are part of the wire format.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct CoordinatorControlEnvelope {
    /// Identifier of the exchange this message belongs to.
    /// NOTE(review): presumably allocated with `next_request_id` — confirm against callers.
    pub request_id: u64,
    /// Expiry carried on the wire, in milliseconds since the Unix epoch (going by the name;
    /// nothing in the codec itself enforces it).
    pub expires_at_unix_ms: u64,
    /// The protocol message proper.
    pub message: CoordinatorControlMessage,
}
28
/// Message variants exchanged over the coordinator control channel.
///
/// Every variant names the `initiator`, the `target` and the `round` it refers to. The
/// per-variant roles described below are inferred from the variant and field names only —
/// TODO confirm against the code that sends and handles these messages.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum CoordinatorControlMessage {
    /// Presumably sent by the initiator to ask for coordination with `target`.
    CoordinationRequest {
        initiator: PeerId,
        target: PeerId,
        round: u32,
        /// Socket addresses advertised for the initiator.
        initiator_addrs: Vec<SocketAddr>,
    },
    /// Presumably the request as relayed towards the target.
    CoordinationOffer {
        initiator: PeerId,
        target: PeerId,
        round: u32,
        /// Socket addresses advertised for the initiator.
        initiator_addrs: Vec<SocketAddr>,
    },
    /// Presumably the target's positive answer.
    CoordinationReady {
        initiator: PeerId,
        target: PeerId,
        round: u32,
        /// Socket addresses advertised for the target.
        target_addrs: Vec<SocketAddr>,
    },
    /// Presumably the final confirmation; the only variant carrying both address lists.
    CoordinationAccepted {
        initiator: PeerId,
        target: PeerId,
        round: u32,
        /// Socket addresses advertised for the initiator.
        initiator_addrs: Vec<SocketAddr>,
        /// Socket addresses advertised for the target.
        target_addrs: Vec<SocketAddr>,
    },
    /// Negative outcome for the given round.
    CoordinationRejected {
        initiator: PeerId,
        target: PeerId,
        round: u32,
        /// Why the exchange was refused.
        reason: RejectionReason,
    },
}
63
/// Reason attached to a `CoordinationRejected` message and to a `RecordedRejection`.
///
/// The `Display` implementation renders each variant as a short lower-case phrase.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum RejectionReason {
    /// Displayed as "self-target request".
    SelfTarget,
    /// Displayed as "unknown target".
    UnknownTarget,
    /// Displayed as "request expired".
    Expired,
    /// Displayed as "rate limited".
    RateLimited,
    /// Displayed as "unauthenticated requester".
    Unauthenticated,
    /// Displayed as "internal coordinator error".
    InternalError,
}
80
81impl std::fmt::Display for RejectionReason {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 match self {
84 Self::SelfTarget => write!(f, "self-target request"),
85 Self::UnknownTarget => write!(f, "unknown target"),
86 Self::Expired => write!(f, "request expired"),
87 Self::RateLimited => write!(f, "rate limited"),
88 Self::Unauthenticated => write!(f, "unauthenticated requester"),
89 Self::InternalError => write!(f, "internal coordinator error"),
90 }
91 }
92}
93
/// Value stored in the pending-request table, keyed there by request id.
#[derive(Debug, Clone)]
pub(crate) struct PendingRequest {
    pub initiator: PeerId,
    pub target: PeerId,
    pub round: u32,
    /// Socket addresses recorded for the initiator.
    pub initiator_addrs: Vec<SocketAddr>,
    /// Wall-clock expiry (Unix milliseconds by name); stored but not consulted by the lookups
    /// or the sweeper in this module.
    pub expires_at_unix_ms: u64,
    /// Monotonic deadline; `get_pending_request` and the sweeper drop the entry once it passes.
    pub local_expires_at: Instant,
}
103
/// Value stored in the live-request table, keyed there by `(local peer, target peer)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LiveRequest {
    /// Request id; compared against `RecordedRejection::request_id` when matching rejections.
    pub request_id: u64,
    /// Round; compared against `RecordedRejection::round` when matching rejections.
    pub round: u32,
    /// Wall-clock expiry (Unix milliseconds by name); not consulted by this module's lookups.
    pub expires_at_unix_ms: u64,
    /// Monotonic deadline; `live_request` and the sweeper drop the entry once it passes.
    pub local_expires_at: Instant,
    /// NOTE(review): presumably the coordinator a reply is expected from — nothing in this
    /// module reads it; confirm against callers.
    pub expected_coordinator: Option<PeerId>,
}
112
/// Rejection stored by `record_rejection` and handed out by `take_live_rejection`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct RecordedRejection {
    /// Must equal the live request's id for the rejection to be returned or kept by the sweeper.
    pub request_id: u64,
    /// Must equal the live request's round for the rejection to be returned or kept.
    pub round: u32,
    pub reason: RejectionReason,
    /// Peer the rejection was attributed to by the caller of `record_rejection`, if any.
    pub from_peer: Option<PeerId>,
}
120
/// Value stored in the inbound-offer table, keyed there by `(local target peer, request id)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct InboundOffer {
    pub coordinator: PeerId,
    pub initiator: PeerId,
    pub target: PeerId,
    pub request_id: u64,
    pub round: u32,
    /// Socket addresses recorded for the initiator.
    pub initiator_addrs: Vec<SocketAddr>,
    /// Wall-clock expiry (Unix milliseconds by name); not consulted by this module's lookups.
    pub expires_at_unix_ms: u64,
    /// Monotonic deadline; `inbound_offer` and the sweeper drop the entry once it passes.
    pub local_expires_at: Instant,
}
132
/// Source of request ids for `next_request_id`; starts at 1.
static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(1);
/// Wall-clock time (Unix ms) of the last sweep claimed by `maybe_scavenge_expired_state`.
static LAST_SCAVENGE_MS: AtomicU64 = AtomicU64::new(0);
/// Pending requests keyed by request id. Lazily created through `pending_requests()`.
static PENDING_REQUESTS: OnceLock<DashMap<u64, PendingRequest>> = OnceLock::new();
/// Inbound offers keyed by `(local target peer bytes, request id)`.
static INBOUND_OFFERS: OnceLock<DashMap<([u8; 32], u64), InboundOffer>> = OnceLock::new();
/// Live requests keyed by `(local peer bytes, target peer bytes)`.
static LIVE_REQUESTS: OnceLock<DashMap<([u8; 32], [u8; 32]), LiveRequest>> = OnceLock::new();
/// Last-seen wall-clock stamp (Unix ms) keyed by `(local peer bytes, requester peer bytes)`.
static RATE_LIMITS: OnceLock<DashMap<([u8; 32], [u8; 32]), u64>> = OnceLock::new();
/// Most recent rejection keyed by `(local peer bytes, target peer bytes)`.
static REJECTIONS: OnceLock<DashMap<([u8; 32], [u8; 32]), RecordedRejection>> = OnceLock::new();
140
141fn pending_requests() -> &'static DashMap<u64, PendingRequest> {
142 PENDING_REQUESTS.get_or_init(DashMap::new)
143}
144
145fn inbound_offers() -> &'static DashMap<([u8; 32], u64), InboundOffer> {
146 INBOUND_OFFERS.get_or_init(DashMap::new)
147}
148
149fn live_requests() -> &'static DashMap<([u8; 32], [u8; 32]), LiveRequest> {
150 LIVE_REQUESTS.get_or_init(DashMap::new)
151}
152
153fn rate_limits() -> &'static DashMap<([u8; 32], [u8; 32]), u64> {
154 RATE_LIMITS.get_or_init(DashMap::new)
155}
156
157fn rejections() -> &'static DashMap<([u8; 32], [u8; 32]), RecordedRejection> {
158 REJECTIONS.get_or_init(DashMap::new)
159}
160
161fn maybe_scavenge_expired_state(now_ms: u64) {
162 let sweep_interval_ms =
163 u64::try_from(COORDINATOR_STATE_SWEEP_INTERVAL.as_millis()).unwrap_or(u64::MAX);
164 let last_scavenge = LAST_SCAVENGE_MS.load(Ordering::Relaxed);
165 if now_ms.saturating_sub(last_scavenge) < sweep_interval_ms {
166 return;
167 }
168
169 if LAST_SCAVENGE_MS
170 .compare_exchange(last_scavenge, now_ms, Ordering::Relaxed, Ordering::Relaxed)
171 .is_ok()
172 {
173 scavenge_expired_state(now_ms);
174 }
175}
176
177fn scavenge_expired_state(now_ms: u64) {
178 let now = Instant::now();
179 pending_requests().retain(|_, pending| pending.local_expires_at > now);
180 inbound_offers().retain(|_, offer| offer.local_expires_at > now);
181 live_requests().retain(|_, live| live.local_expires_at > now);
182 rejections().retain(|key, recorded| {
183 live_requests().get(key).is_some_and(|live| {
186 live.local_expires_at > now
187 && live.request_id == recorded.request_id
188 && live.round == recorded.round
189 })
190 });
191
192 let rate_limit_window_ms =
193 u64::try_from(COORDINATOR_RATE_LIMIT_WINDOW.as_millis()).unwrap_or(u64::MAX);
194 rate_limits().retain(|_, last_seen_at_ms| {
195 now_ms.saturating_sub(*last_seen_at_ms) < rate_limit_window_ms
196 });
197}
198
199pub(crate) fn encode_coordinator_control(
200 envelope: &CoordinatorControlEnvelope,
201) -> Result<Vec<u8>, serde_json::Error> {
202 let mut out = Vec::new();
203 out.extend_from_slice(COORDINATOR_CONTROL_MAGIC);
204 out.push(COORDINATOR_CONTROL_VERSION);
205 out.extend_from_slice(&serde_json::to_vec(envelope)?);
206 Ok(out)
207}
208
209pub(crate) fn decode_coordinator_control(
210 bytes: &[u8],
211) -> Result<Option<CoordinatorControlEnvelope>, serde_json::Error> {
212 if bytes.len() < COORDINATOR_CONTROL_MAGIC.len() + 1 {
213 return Ok(None);
214 }
215 if &bytes[..COORDINATOR_CONTROL_MAGIC.len()] != COORDINATOR_CONTROL_MAGIC {
216 return Ok(None);
217 }
218 if bytes[COORDINATOR_CONTROL_MAGIC.len()] != COORDINATOR_CONTROL_VERSION {
219 return Ok(None);
220 }
221 let payload = &bytes[COORDINATOR_CONTROL_MAGIC.len() + 1..];
222 serde_json::from_slice(payload).map(Some)
223}
224
225pub(crate) fn next_request_id() -> u64 {
226 REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed)
227}
228
229pub(crate) fn remember_pending_request(request_id: u64, pending: PendingRequest) {
230 maybe_scavenge_expired_state(now_unix_ms());
231 pending_requests().insert(request_id, pending);
232}
233
234pub(crate) fn get_pending_request(request_id: u64) -> Option<PendingRequest> {
235 maybe_scavenge_expired_state(now_unix_ms());
236 let pending = pending_requests()
237 .get(&request_id)
238 .map(|entry| entry.value().clone())?;
239 if pending.local_expires_at <= Instant::now() {
240 let _ = pending_requests().remove(&request_id);
241 return None;
242 }
243 Some(pending)
244}
245
246pub(crate) fn remove_pending_request(request_id: u64) -> Option<PendingRequest> {
247 maybe_scavenge_expired_state(now_unix_ms());
248 pending_requests()
249 .remove(&request_id)
250 .map(|(_, pending)| pending)
251}
252
253pub(crate) fn remember_inbound_offer(
254 local_target_peer: PeerId,
255 request_id: u64,
256 offer: InboundOffer,
257) {
258 maybe_scavenge_expired_state(now_unix_ms());
259 inbound_offers().insert((local_target_peer.0, request_id), offer);
260}
261
262pub(crate) fn inbound_offer(local_target_peer: PeerId, request_id: u64) -> Option<InboundOffer> {
263 maybe_scavenge_expired_state(now_unix_ms());
264 let offer = inbound_offers()
265 .get(&(local_target_peer.0, request_id))
266 .map(|entry| entry.value().clone())?;
267 if offer.local_expires_at <= Instant::now() {
268 let _ = inbound_offers().remove(&(local_target_peer.0, request_id));
269 return None;
270 }
271 Some(offer)
272}
273
274pub(crate) fn remove_inbound_offer(
275 local_target_peer: PeerId,
276 request_id: u64,
277) -> Option<InboundOffer> {
278 maybe_scavenge_expired_state(now_unix_ms());
279 inbound_offers()
280 .remove(&(local_target_peer.0, request_id))
281 .map(|(_, offer)| offer)
282}
283
284pub(crate) fn note_rate_limit_and_check(local_peer: PeerId, requester_peer: PeerId) -> bool {
285 let now_ms = now_unix_ms();
286 maybe_scavenge_expired_state(now_ms);
287 let window_ms = u64::try_from(COORDINATOR_RATE_LIMIT_WINDOW.as_millis()).unwrap_or(u64::MAX);
288 let key = (local_peer.0, requester_peer.0);
289
290 let previous = rate_limits().get(&key).map(|entry| *entry);
291
292 if previous.is_some_and(|previous| now_ms.saturating_sub(previous) < window_ms) {
293 rate_limits().insert(key, now_ms);
294 return false;
295 }
296
297 rate_limits().insert(key, now_ms);
298 true
299}
300
301pub(crate) fn remember_live_request(local_peer: PeerId, target_peer: PeerId, live: LiveRequest) {
302 maybe_scavenge_expired_state(now_unix_ms());
303 live_requests().insert((local_peer.0, target_peer.0), live);
304}
305
306pub(crate) fn live_request(local_peer: PeerId, target_peer: PeerId) -> Option<LiveRequest> {
307 maybe_scavenge_expired_state(now_unix_ms());
308 let live = live_requests()
309 .get(&(local_peer.0, target_peer.0))
310 .map(|entry| entry.value().clone())?;
311 if live.local_expires_at <= Instant::now() {
312 let _ = live_requests().remove(&(local_peer.0, target_peer.0));
313 return None;
314 }
315 Some(live)
316}
317
318pub(crate) fn clear_live_request(local_peer: PeerId, target_peer: PeerId) -> Option<LiveRequest> {
319 maybe_scavenge_expired_state(now_unix_ms());
320 live_requests()
321 .remove(&(local_peer.0, target_peer.0))
322 .map(|(_, live)| live)
323}
324
325pub(crate) fn record_rejection(
326 local_peer: PeerId,
327 target_peer: PeerId,
328 request_id: u64,
329 round: u32,
330 from_peer: Option<PeerId>,
331 reason: RejectionReason,
332) {
333 maybe_scavenge_expired_state(now_unix_ms());
334 rejections().insert(
335 (local_peer.0, target_peer.0),
336 RecordedRejection {
337 request_id,
338 round,
339 reason,
340 from_peer,
341 },
342 );
343}
344
345pub(crate) fn take_live_rejection(
346 local_peer: PeerId,
347 target_peer: PeerId,
348) -> Option<RecordedRejection> {
349 maybe_scavenge_expired_state(now_unix_ms());
350 let key = (local_peer.0, target_peer.0);
351 let live = live_request(local_peer, target_peer)?;
352 let recorded = rejections().get(&key).map(|entry| entry.value().clone())?;
353
354 if recorded.request_id != live.request_id || recorded.round != live.round {
355 return None;
356 }
357
358 rejections().remove(&key).map(|(_, rejection)| rejection)
359}
360
/// Converts a wall-clock expiry (milliseconds since the Unix epoch) into a monotonic deadline.
///
/// An expiry that already lies in the past yields a deadline of "now". An expiry too far in the
/// future to be represented as an `Instant` is clamped instead of panicking, which matters
/// because the value may originate from a remote peer.
pub(crate) fn monotonic_deadline_from_unix_ms(expires_at_unix_ms: u64) -> Instant {
    let remaining_ms = expires_at_unix_ms.saturating_sub(now_unix_ms());
    saturating_deadline_after(Duration::from_millis(remaining_ms))
}

/// Computes both representations of "now + `duration`".
///
/// Returns the wall-clock expiry in Unix milliseconds (saturating at `u64::MAX`) and the
/// matching monotonic deadline (clamped rather than panicking when `duration` is too large to
/// add to an `Instant`).
pub(crate) fn wire_and_monotonic_expiry_after(duration: Duration) -> (u64, Instant) {
    let expiry_ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX);
    (
        now_unix_ms().saturating_add(expiry_ms),
        saturating_deadline_after(duration),
    )
}

/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch and saturates at `u64::MAX`
/// instead of silently truncating.
pub(crate) fn now_unix_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}

/// Returns `Instant::now() + duration` without the panic that `+` raises on overflow.
///
/// When the sum is not representable the deadline is clamped to roughly thirty years ahead;
/// should even that fail, the current instant is returned so the entry expires immediately.
fn saturating_deadline_after(duration: Duration) -> Instant {
    const FAR_FUTURE: Duration = Duration::from_secs(60 * 60 * 24 * 365 * 30);

    let now = Instant::now();
    now.checked_add(duration)
        .or_else(|| now.checked_add(FAR_FUTURE))
        .unwrap_or(now)
}
380
381#[cfg(test)]
382mod tests {
383 use super::*;
384
385 fn peer(byte: u8) -> PeerId {
386 PeerId([byte; 32])
387 }
388
389 #[test]
390 fn coordination_request_round_trips_through_codec() {
391 let initiator = peer(0x11);
392 let target = peer(0x22);
393 let request_id = 42;
394 let expires_at_unix_ms = 1_700_000_000_123;
395 let initiator_addrs = vec![
396 "203.0.113.10:9000".parse().expect("valid ipv4 address"),
397 "[2001:db8::10]:9000".parse().expect("valid ipv6 address"),
398 ];
399 let envelope = CoordinatorControlEnvelope {
400 request_id,
401 expires_at_unix_ms,
402 message: CoordinatorControlMessage::CoordinationRequest {
403 initiator,
404 target,
405 round: 3,
406 initiator_addrs: initiator_addrs.clone(),
407 },
408 };
409
410 let encoded = encode_coordinator_control(&envelope).expect("encode should succeed");
411 let decoded = decode_coordinator_control(&encoded)
412 .expect("decode should succeed")
413 .expect("control payload should be recognized");
414
415 assert_eq!(decoded.request_id, request_id);
416 assert_eq!(decoded.expires_at_unix_ms, expires_at_unix_ms);
417 match decoded.message {
418 CoordinatorControlMessage::CoordinationRequest {
419 initiator: decoded_initiator,
420 target: decoded_target,
421 round,
422 initiator_addrs: decoded_addrs,
423 } => {
424 assert_eq!(decoded_initiator, initiator);
425 assert_eq!(decoded_target, target);
426 assert_eq!(round, 3);
427 assert_eq!(decoded_addrs, initiator_addrs);
428 }
429 other => panic!("unexpected decoded message: {:?}", other),
430 }
431 }
432
433 #[test]
434 fn coordination_accepted_round_trips_through_codec() {
435 let initiator = peer(0x11);
436 let target = peer(0x22);
437 let request_id = 43;
438 let expires_at_unix_ms = 1_700_000_000_456;
439 let initiator_addrs = vec![
440 "203.0.113.11:9001".parse().expect("valid ipv4 address"),
441 "[2001:db8::11]:9001".parse().expect("valid ipv6 address"),
442 ];
443 let target_addrs = vec![
444 "198.51.100.22:9443".parse().expect("valid ipv4 address"),
445 "[2001:db8::22]:9443".parse().expect("valid ipv6 address"),
446 ];
447 let envelope = CoordinatorControlEnvelope {
448 request_id,
449 expires_at_unix_ms,
450 message: CoordinatorControlMessage::CoordinationAccepted {
451 initiator,
452 target,
453 round: 4,
454 initiator_addrs: initiator_addrs.clone(),
455 target_addrs: target_addrs.clone(),
456 },
457 };
458
459 let encoded = encode_coordinator_control(&envelope).expect("encode should succeed");
460 let decoded = decode_coordinator_control(&encoded)
461 .expect("decode should succeed")
462 .expect("control payload should be recognized");
463
464 match decoded.message {
465 CoordinatorControlMessage::CoordinationAccepted {
466 initiator: decoded_initiator,
467 target: decoded_target,
468 round,
469 initiator_addrs: decoded_initiator_addrs,
470 target_addrs: decoded_target_addrs,
471 } => {
472 assert_eq!(decoded_initiator, initiator);
473 assert_eq!(decoded_target, target);
474 assert_eq!(round, 4);
475 assert_eq!(decoded_initiator_addrs, initiator_addrs);
476 assert_eq!(decoded_target_addrs, target_addrs);
477 }
478 other => panic!("unexpected decoded message: {:?}", other),
479 }
480 }
481
482 #[test]
483 fn non_control_payload_decodes_to_none() {
484 let decoded = decode_coordinator_control(b"not coordinator control")
485 .expect("non-control payload should not error");
486 assert!(decoded.is_none());
487 }
488
    /// Rejections are stored per `(local, target)` pair and are only handed out when they match
    /// the pair's current live request (same request id and round).
    #[test]
    fn rejections_are_namespaced_by_local_and_target_peer() {
        let local_a = peer(0x01);
        let local_b = peer(0x02);
        let target = peer(0x33);
        let now_ms = now_unix_ms();
        let fresh_deadline = Instant::now() + Duration::from_secs(60);

        // The tables are process-global: clear anything left over for these keys first.
        let _ = clear_live_request(local_a, target);
        let _ = clear_live_request(local_b, target);
        let _ = take_live_rejection(local_a, target);
        let _ = take_live_rejection(local_b, target);

        // Two local peers each have their own live request towards the same target.
        remember_live_request(
            local_a,
            target,
            LiveRequest {
                request_id: 100,
                round: 1,
                expires_at_unix_ms: now_ms + 60_000,
                local_expires_at: fresh_deadline,
                expected_coordinator: None,
            },
        );
        remember_live_request(
            local_b,
            target,
            LiveRequest {
                request_id: 200,
                round: 1,
                expires_at_unix_ms: now_ms + 61_000,
                local_expires_at: fresh_deadline,
                expected_coordinator: None,
            },
        );

        // One matching rejection per local peer, with distinct reasons and senders.
        record_rejection(
            local_a,
            target,
            100,
            1,
            Some(peer(0x77)),
            RejectionReason::Expired,
        );
        record_rejection(
            local_b,
            target,
            200,
            1,
            Some(peer(0x88)),
            RejectionReason::RateLimited,
        );

        // Each local peer gets exactly its own rejection back ...
        assert_eq!(
            take_live_rejection(local_a, target),
            Some(RecordedRejection {
                request_id: 100,
                round: 1,
                reason: RejectionReason::Expired,
                from_peer: Some(peer(0x77)),
            })
        );
        assert_eq!(
            take_live_rejection(local_b, target),
            Some(RecordedRejection {
                request_id: 200,
                round: 1,
                reason: RejectionReason::RateLimited,
                from_peer: Some(peer(0x88)),
            })
        );
        // ... and taking is destructive: a second take finds nothing.
        assert_eq!(take_live_rejection(local_a, target), None);
        assert_eq!(take_live_rejection(local_b, target), None);

        // A rejection whose id/round differ from the live request (300/1 vs 301/2) is ignored.
        remember_live_request(
            local_a,
            target,
            LiveRequest {
                request_id: 301,
                round: 2,
                expires_at_unix_ms: now_ms + 62_000,
                local_expires_at: fresh_deadline,
                expected_coordinator: None,
            },
        );
        record_rejection(
            local_a,
            target,
            300,
            1,
            Some(peer(0x99)),
            RejectionReason::InternalError,
        );
        assert_eq!(take_live_rejection(local_a, target), None);

        // Overwriting it with a matching rejection makes it retrievable.
        record_rejection(
            local_a,
            target,
            301,
            2,
            Some(peer(0xAA)),
            RejectionReason::UnknownTarget,
        );
        assert_eq!(
            take_live_rejection(local_a, target),
            Some(RecordedRejection {
                request_id: 301,
                round: 2,
                reason: RejectionReason::UnknownTarget,
                from_peer: Some(peer(0xAA)),
            })
        );
    }
602
603 #[test]
604 fn rate_limit_is_namespaced_by_local_peer() {
605 let requester = peer(0x44);
606 let local_a = peer(0x55);
607 let local_b = peer(0x66);
608
609 assert!(note_rate_limit_and_check(local_a, requester));
610 assert!(!note_rate_limit_and_check(local_a, requester));
611
612 assert!(note_rate_limit_and_check(local_b, requester));
613 assert!(!note_rate_limit_and_check(local_b, requester));
614 }
615
    /// A direct sweep removes the expired entry from every table and leaves its fresh twin.
    #[test]
    fn scavenger_removes_expired_abandoned_state() {
        let now_ms = now_unix_ms();
        // "Now" is already expired by the time the sweep runs: the sweep keeps only deadlines
        // strictly later than its own `Instant::now()`.
        let expired_deadline = Instant::now();
        let fresh_deadline = Instant::now() + Duration::from_secs(10);
        let expired_request_id = 9_101;
        let fresh_request_id = 9_102;
        let expired_target = peer(0x71);
        let fresh_target = peer(0x72);
        let expired_local = peer(0x73);
        let fresh_local = peer(0x74);
        let expired_requester = peer(0x75);
        let fresh_requester = peer(0x76);

        // The tables are process-global: clear anything left over for these keys first.
        let _ = remove_pending_request(expired_request_id);
        let _ = remove_pending_request(fresh_request_id);
        let _ = remove_inbound_offer(expired_target, expired_request_id);
        let _ = remove_inbound_offer(fresh_target, fresh_request_id);
        let _ = clear_live_request(expired_local, expired_target);
        let _ = clear_live_request(fresh_local, fresh_target);
        let _ = take_live_rejection(expired_local, expired_target);
        let _ = take_live_rejection(fresh_local, fresh_target);
        rate_limits().remove(&(expired_local.0, expired_requester.0));
        rate_limits().remove(&(fresh_local.0, fresh_requester.0));

        // Pending requests: one expired, one fresh.
        remember_pending_request(
            expired_request_id,
            PendingRequest {
                initiator: expired_local,
                target: expired_target,
                round: 1,
                initiator_addrs: Vec::new(),
                expires_at_unix_ms: now_ms - 1,
                local_expires_at: expired_deadline,
            },
        );
        remember_pending_request(
            fresh_request_id,
            PendingRequest {
                initiator: fresh_local,
                target: fresh_target,
                round: 2,
                initiator_addrs: Vec::new(),
                expires_at_unix_ms: now_ms + 10_000,
                local_expires_at: fresh_deadline,
            },
        );

        // Inbound offers: one expired, one fresh.
        remember_inbound_offer(
            expired_target,
            expired_request_id,
            InboundOffer {
                coordinator: peer(0x77),
                initiator: expired_local,
                target: expired_target,
                request_id: expired_request_id,
                round: 1,
                initiator_addrs: Vec::new(),
                expires_at_unix_ms: now_ms - 1,
                local_expires_at: expired_deadline,
            },
        );
        remember_inbound_offer(
            fresh_target,
            fresh_request_id,
            InboundOffer {
                coordinator: peer(0x78),
                initiator: fresh_local,
                target: fresh_target,
                request_id: fresh_request_id,
                round: 2,
                initiator_addrs: Vec::new(),
                expires_at_unix_ms: now_ms + 10_000,
                local_expires_at: fresh_deadline,
            },
        );

        // Live requests: one expired, one fresh.
        remember_live_request(
            expired_local,
            expired_target,
            LiveRequest {
                request_id: expired_request_id,
                round: 1,
                expires_at_unix_ms: now_ms - 1,
                local_expires_at: expired_deadline,
                expected_coordinator: None,
            },
        );
        remember_live_request(
            fresh_local,
            fresh_target,
            LiveRequest {
                request_id: fresh_request_id,
                round: 2,
                expires_at_unix_ms: now_ms + 10_000,
                local_expires_at: fresh_deadline,
                expected_coordinator: None,
            },
        );

        // Rejections matching each live request; the sweep ties their fate to that request.
        record_rejection(
            expired_local,
            expired_target,
            expired_request_id,
            1,
            Some(peer(0x79)),
            RejectionReason::Expired,
        );
        record_rejection(
            fresh_local,
            fresh_target,
            fresh_request_id,
            2,
            Some(peer(0x7A)),
            RejectionReason::UnknownTarget,
        );

        // Rate-limit stamps are written directly so one can be placed just outside the window.
        let rate_limit_window_ms =
            u64::try_from(COORDINATOR_RATE_LIMIT_WINDOW.as_millis()).unwrap_or(u64::MAX);
        rate_limits().insert(
            (expired_local.0, expired_requester.0),
            now_ms.saturating_sub(rate_limit_window_ms + 1),
        );
        rate_limits().insert((fresh_local.0, fresh_requester.0), now_ms);

        // Call the sweep itself, bypassing the interval gate in `maybe_scavenge_expired_state`.
        scavenge_expired_state(now_ms);

        assert!(get_pending_request(expired_request_id).is_none());
        assert!(get_pending_request(fresh_request_id).is_some());
        assert!(inbound_offer(expired_target, expired_request_id).is_none());
        assert!(inbound_offer(fresh_target, fresh_request_id).is_some());
        assert!(live_request(expired_local, expired_target).is_none());
        assert!(live_request(fresh_local, fresh_target).is_some());
        assert!(take_live_rejection(expired_local, expired_target).is_none());
        assert!(take_live_rejection(fresh_local, fresh_target).is_some());
        assert!(!rate_limits().contains_key(&(expired_local.0, expired_requester.0)));
        assert!(rate_limits().contains_key(&(fresh_local.0, fresh_requester.0)));

        // Leave the global tables clean for other tests.
        let _ = remove_pending_request(fresh_request_id);
        let _ = remove_inbound_offer(fresh_target, fresh_request_id);
        let _ = clear_live_request(fresh_local, fresh_target);
        let _ = take_live_rejection(fresh_local, fresh_target);
        rate_limits().remove(&(fresh_local.0, fresh_requester.0));
    }
760
    /// The getters judge expiry by the monotonic `local_expires_at` alone: an entry whose wire
    /// expiry is still a minute away is hidden once its monotonic deadline has passed.
    #[test]
    fn getters_prefer_monotonic_deadlines_over_future_wire_expiry() {
        let now_ms = now_unix_ms();
        let request_id = 9_201;
        let local_peer = peer(0x81);
        let target_peer = peer(0x82);
        // Already reached by the time any getter compares it against `Instant::now()`.
        let expired_deadline = Instant::now();

        // The tables are process-global: clear anything left over for these keys first.
        let _ = remove_pending_request(request_id);
        let _ = remove_inbound_offer(target_peer, request_id);
        let _ = clear_live_request(local_peer, target_peer);

        // Every entry pairs a future wire expiry with an already-passed monotonic deadline.
        remember_pending_request(
            request_id,
            PendingRequest {
                initiator: local_peer,
                target: target_peer,
                round: 1,
                initiator_addrs: Vec::new(),
                expires_at_unix_ms: now_ms + 60_000,
                local_expires_at: expired_deadline,
            },
        );
        remember_inbound_offer(
            target_peer,
            request_id,
            InboundOffer {
                coordinator: peer(0x83),
                initiator: local_peer,
                target: target_peer,
                request_id,
                round: 1,
                initiator_addrs: Vec::new(),
                expires_at_unix_ms: now_ms + 60_000,
                local_expires_at: expired_deadline,
            },
        );
        remember_live_request(
            local_peer,
            target_peer,
            LiveRequest {
                request_id,
                round: 1,
                expires_at_unix_ms: now_ms + 60_000,
                local_expires_at: expired_deadline,
                expected_coordinator: None,
            },
        );

        assert!(get_pending_request(request_id).is_none());
        assert!(inbound_offer(target_peer, request_id).is_none());
        assert!(live_request(local_peer, target_peer).is_none());
    }
814}