1use std::collections::BTreeMap;
9use std::sync::Arc;
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use bytes::{BufMut, Bytes, BytesMut};
13
14use crate::error::{Error, Result};
15use crate::fleet::Fleet;
16use crate::id::NetId64;
17use crate::typed::OrbitTyped;
18
/// Upper bound, in bytes, on an encoded contest frame payload.
///
/// The claim and release encoders in this module refuse to build a payload
/// larger than this. On unix targets the value is taken from
/// `crate::ring::shm::PAYLOAD_MAX`.
#[cfg(unix)]
pub const CONTEST_PAYLOAD_MAX: usize = crate::ring::shm::PAYLOAD_MAX;
/// Upper bound, in bytes, on an encoded contest frame payload.
///
/// On non-unix targets the limit is a fixed 256 bytes.
#[cfg(not(unix))]
pub const CONTEST_PAYLOAD_MAX: usize = 256;
25
/// Ring kind tag used by [`ContestRecord`] for the contest ring.
pub const CONTEST_RING_KIND: u8 = 222;
/// Frame kind of a claim frame published on the contest ring.
pub const CONTEST_FRAME_KIND_CLAIM: u8 = 1;
/// Frame kind of a release frame published on the contest ring.
pub const CONTEST_FRAME_KIND_RELEASE: u8 = 2;

/// Fixed claim header: subject kind (`u8`), subject length (`u16`),
/// owner length (`u16`), claimed-at (`u64`), expires-at (`u64`).
const CLAIM_HEADER_LEN: usize = std::mem::size_of::<u8>()
    + 2 * std::mem::size_of::<u16>()
    + 2 * std::mem::size_of::<u64>();
/// Fixed release header: claim id (`u64`), subject kind (`u8`),
/// subject length (`u16`).
const RELEASE_HEADER_LEN: usize =
    std::mem::size_of::<u64>() + std::mem::size_of::<u8>() + std::mem::size_of::<u16>();
32
/// Marker type identifying the contest ring.
///
/// It carries no data; its only role here is to supply the ring kind through
/// [`OrbitTyped`].
#[derive(Clone, Debug)]
pub struct ContestRecord;

impl OrbitTyped for ContestRecord {
    /// Contest frames live on the ring tagged with [`CONTEST_RING_KIND`].
    const KIND: u8 = CONTEST_RING_KIND;
}
42
/// Tags a category of contest subjects with a one-byte kind.
///
/// The kind is stored in every [`ContestSubject`] built for the type and is
/// compared together with the label when looking for an active claim, so two
/// types sharing a label only compete if they also share a `KIND`.
pub trait ContestType {
    /// Kind byte written into claim and release frames for this subject type.
    const KIND: u8;
}
51
52#[derive(Clone, Debug, PartialEq, Eq, Hash)]
54pub struct ContestSubject {
55 kind: u8,
56 label: String,
57}
58
59impl ContestSubject {
60 pub fn new<T: ContestType>(label: impl Into<String>) -> Self {
61 Self {
62 kind: T::KIND,
63 label: label.into(),
64 }
65 }
66
67 pub const fn kind(&self) -> u8 {
68 self.kind
69 }
70
71 pub fn label(&self) -> &str {
72 &self.label
73 }
74
75 fn as_bytes(&self) -> &[u8] {
76 self.label.as_bytes()
77 }
78
79 fn from_parts(kind: u8, label: &[u8]) -> Self {
80 Self {
81 kind,
82 label: String::from_utf8_lossy(label).into_owned(),
83 }
84 }
85}
86
/// Handle for claiming and releasing subjects over a fleet's contest ring.
///
/// Cloning is cheap: the handle only holds a shared reference to the fleet.
#[derive(Clone, Debug)]
pub struct Contest {
    // Fleet used to publish contest frames and to poll the contest ring.
    fleet: Arc<Fleet>,
}
92
93impl Contest {
94 pub fn new(fleet: Arc<Fleet>) -> Self {
95 Self { fleet }
96 }
97
98 pub fn reset_ring(&self) -> Result<()> {
103 self.fleet.reset_ring::<ContestRecord>().map_err(Error::Io)
104 }
105
106 pub fn try_claim<T: ContestType>(
111 &self,
112 subject: impl Into<String>,
113 owner: impl Into<ContestOwner>,
114 ttl: Duration,
115 ) -> Result<Claim> {
116 self.try_claim_at::<T>(subject, owner, ttl, now_ms())
117 }
118
119 pub fn try_claim_at<T: ContestType>(
123 &self,
124 subject: impl Into<String>,
125 owner: impl Into<ContestOwner>,
126 ttl: Duration,
127 now_ms: u64,
128 ) -> Result<Claim> {
129 self.try_claim_subject_at(ContestSubject::new::<T>(subject), owner, ttl, now_ms)
130 }
131
132 pub fn try_claim_subject(
134 &self,
135 subject: ContestSubject,
136 owner: impl Into<ContestOwner>,
137 ttl: Duration,
138 ) -> Result<Claim> {
139 self.try_claim_subject_at(subject, owner, ttl, now_ms())
140 }
141
142 pub fn try_claim_subject_at(
144 &self,
145 subject: ContestSubject,
146 owner: impl Into<ContestOwner>,
147 ttl: Duration,
148 now_ms: u64,
149 ) -> Result<Claim> {
150 let owner = owner.into();
151 let expires_at_ms = expires_at(now_ms, ttl);
152 let payload = encode_claim(
153 subject.kind,
154 subject.as_bytes(),
155 owner.as_bytes(),
156 now_ms,
157 expires_at_ms,
158 )?;
159 let claim_id =
160 self.fleet
161 .publish::<ContestRecord>(CONTEST_FRAME_KIND_CLAIM, now_ms, payload);
162
163 let Some(holder) = self.active_holder(&subject, now_ms) else {
164 return Ok(Claim::YieldTo(Holder {
165 claim_id,
166 subject,
167 owner,
168 claimed_at_ms: now_ms,
169 expires_at_ms,
170 }));
171 };
172
173 if holder.claim_id.counter() == claim_id.counter() {
174 Ok(Claim::Claimed(Guard::new(self.clone(), holder)))
175 } else {
176 let _ = self.release_id(&subject, claim_id, now_ms);
177 Ok(Claim::YieldTo(holder))
178 }
179 }
180
181 pub fn guard_holder(&self, holder: Holder) -> Guard {
188 Guard::new(self.clone(), holder)
189 }
190
191 pub fn release_holder(&self, holder: &Holder) -> Result<NetId64> {
197 self.release_id(&holder.subject, holder.claim_id, now_ms())
198 }
199
200 fn release_id(
201 &self,
202 subject: &ContestSubject,
203 claim_id: NetId64,
204 now_ms: u64,
205 ) -> Result<NetId64> {
206 let payload = encode_release(subject.kind, subject.as_bytes(), claim_id)?;
207 Ok(self
208 .fleet
209 .publish::<ContestRecord>(CONTEST_FRAME_KIND_RELEASE, now_ms, payload))
210 }
211
212 fn active_holder(&self, subject: &ContestSubject, now_ms: u64) -> Option<Holder> {
213 let mut cursor = self.fleet.cursor_from_start::<ContestRecord>();
214 let poll = self.fleet.poll_ring::<ContestRecord>(&mut cursor);
215 let mut active = BTreeMap::<u64, Holder>::new();
216
217 for frame in poll.frames {
218 match decode_frame(frame.kind, &frame.payload) {
219 Some(DecodedContestFrame::Claim(decoded))
220 if decoded.subject_kind == subject.kind
221 && decoded.subject == subject.as_bytes()
222 && decoded.expires_at_ms > now_ms =>
223 {
224 active.insert(
225 frame.id.counter(),
226 Holder {
227 claim_id: frame.id,
228 subject: ContestSubject::from_parts(
229 decoded.subject_kind,
230 decoded.subject,
231 ),
232 owner: ContestOwner::from_bytes(decoded.owner),
233 claimed_at_ms: decoded.claimed_at_ms,
234 expires_at_ms: decoded.expires_at_ms,
235 },
236 );
237 }
238 Some(DecodedContestFrame::Release(decoded))
239 if decoded.subject_kind == subject.kind
240 && decoded.subject == subject.as_bytes() =>
241 {
242 active.remove(&decoded.claim_id.counter());
243 }
244 _ => {}
245 }
246 }
247
248 active.into_values().next()
249 }
250}
251
/// Name of the party making (or holding) a claim.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ContestOwner(String);

impl ContestOwner {
    /// Builds an owner from anything convertible into a `String`.
    pub fn new(value: impl Into<String>) -> Self {
        let name: String = value.into();
        Self(name)
    }

    /// The owner name as text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// UTF-8 bytes of the owner name, as written into claim frames.
    fn as_bytes(&self) -> &[u8] {
        self.as_str().as_bytes()
    }

    /// Rebuilds an owner from decoded frame bytes.
    ///
    /// Bytes that are not valid UTF-8 are replaced with U+FFFD rather than
    /// rejected.
    fn from_bytes(bytes: &[u8]) -> Self {
        let name = String::from_utf8_lossy(bytes);
        Self(name.into_owned())
    }
}

impl From<&str> for ContestOwner {
    fn from(value: &str) -> Self {
        Self(value.to_owned())
    }
}

impl From<String> for ContestOwner {
    fn from(value: String) -> Self {
        Self::new(value)
    }
}

impl From<&String> for ContestOwner {
    fn from(value: &String) -> Self {
        Self(value.to_owned())
    }
}
291
/// Description of one claim on a subject.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Holder {
    /// Id of the ring frame that carried the claim.
    pub claim_id: NetId64,
    /// Subject the claim is for.
    pub subject: ContestSubject,
    /// Owner named in the claim.
    pub owner: ContestOwner,
    /// Timestamp, in milliseconds, recorded when the claim was made.
    pub claimed_at_ms: u64,
    /// Timestamp, in milliseconds, at or after which the claim no longer
    /// counts as active.
    pub expires_at_ms: u64,
}
301
302#[derive(Debug)]
309pub struct Guard {
310 contest: Contest,
311 holder: Holder,
312 release_on_drop: bool,
313}
314
315impl Guard {
316 fn new(contest: Contest, holder: Holder) -> Self {
317 Self {
318 contest,
319 holder,
320 release_on_drop: true,
321 }
322 }
323
324 pub fn holder(&self) -> &Holder {
325 &self.holder
326 }
327
328 pub fn claim_id(&self) -> NetId64 {
329 self.holder.claim_id
330 }
331
332 pub fn subject(&self) -> &ContestSubject {
333 &self.holder.subject
334 }
335
336 pub fn owner(&self) -> &ContestOwner {
337 &self.holder.owner
338 }
339
340 pub fn expires_at_ms(&self) -> u64 {
341 self.holder.expires_at_ms
342 }
343
344 pub fn release(mut self) -> Result<NetId64> {
346 let released =
347 self.contest
348 .release_id(&self.holder.subject, self.holder.claim_id, now_ms());
349 if released.is_ok() {
350 self.release_on_drop = false;
351 }
352 released
353 }
354}
355
356impl Drop for Guard {
357 fn drop(&mut self) {
358 if !self.release_on_drop {
359 return;
360 }
361 if let Err(err) =
362 self.contest
363 .release_id(&self.holder.subject, self.holder.claim_id, now_ms())
364 {
365 tracing::debug!(
366 claim_id = %self.holder.claim_id,
367 subject_kind = self.holder.subject.kind(),
368 subject = self.holder.subject.label(),
369 error = %err,
370 "contest guard release failed"
371 );
372 }
373 }
374}
375
376#[derive(Debug)]
377pub enum Claim {
378 Claimed(Guard),
379 YieldTo(Holder),
380}
381
382impl Claim {
383 pub fn is_claimed(&self) -> bool {
384 matches!(self, Self::Claimed(_))
385 }
386}
387
/// Borrowed view of a decoded claim payload.
///
/// `subject` and `owner` are slices into the payload buffer the claim was
/// decoded from.
struct DecodedClaim<'a> {
    // Kind byte of the claimed subject.
    subject_kind: u8,
    // Raw label bytes of the claimed subject.
    subject: &'a [u8],
    // Raw owner-name bytes.
    owner: &'a [u8],
    // Claim timestamp in milliseconds.
    claimed_at_ms: u64,
    // Expiry timestamp in milliseconds.
    expires_at_ms: u64,
}
395
/// Borrowed view of a decoded release payload.
///
/// `subject` is a slice into the payload buffer the release was decoded from.
struct DecodedRelease<'a> {
    // Kind byte of the released subject.
    subject_kind: u8,
    // Raw label bytes of the released subject.
    subject: &'a [u8],
    // Id of the claim being released.
    claim_id: NetId64,
}
401
/// A successfully decoded contest frame, one variant per frame kind.
enum DecodedContestFrame<'a> {
    /// Payload of a claim frame.
    Claim(DecodedClaim<'a>),
    /// Payload of a release frame.
    Release(DecodedRelease<'a>),
}
406
407fn encode_claim(
408 subject_kind: u8,
409 subject: &[u8],
410 owner: &[u8],
411 claimed_at_ms: u64,
412 expires_at_ms: u64,
413) -> Result<Bytes> {
414 let total = CLAIM_HEADER_LEN + subject.len() + owner.len();
415 if subject.len() > u16::MAX as usize
416 || owner.len() > u16::MAX as usize
417 || total > CONTEST_PAYLOAD_MAX
418 {
419 return Err(Error::ContestFrameTooLarge {
420 subject_len: subject.len(),
421 owner_len: owner.len(),
422 max_payload: CONTEST_PAYLOAD_MAX,
423 });
424 }
425
426 let mut buf = BytesMut::with_capacity(total);
427 buf.put_u8(subject_kind);
428 buf.put_u16_le(subject.len() as u16);
429 buf.put_u16_le(owner.len() as u16);
430 buf.put_u64_le(claimed_at_ms);
431 buf.put_u64_le(expires_at_ms);
432 buf.put_slice(subject);
433 buf.put_slice(owner);
434 Ok(buf.freeze())
435}
436
437fn encode_release(subject_kind: u8, subject: &[u8], claim_id: NetId64) -> Result<Bytes> {
438 let total = RELEASE_HEADER_LEN + subject.len();
439 if subject.len() > u16::MAX as usize || total > CONTEST_PAYLOAD_MAX {
440 return Err(Error::ContestFrameTooLarge {
441 subject_len: subject.len(),
442 owner_len: 0,
443 max_payload: CONTEST_PAYLOAD_MAX,
444 });
445 }
446
447 let mut buf = BytesMut::with_capacity(total);
448 buf.put_u64_le(claim_id.raw());
449 buf.put_u8(subject_kind);
450 buf.put_u16_le(subject.len() as u16);
451 buf.put_slice(subject);
452 Ok(buf.freeze())
453}
454
455fn decode_frame(frame_kind: u8, payload: &Bytes) -> Option<DecodedContestFrame<'_>> {
456 match frame_kind {
457 CONTEST_FRAME_KIND_CLAIM => decode_claim(payload).map(DecodedContestFrame::Claim),
458 CONTEST_FRAME_KIND_RELEASE => decode_release(payload).map(DecodedContestFrame::Release),
459 _ => None,
460 }
461}
462
463fn decode_claim(payload: &Bytes) -> Option<DecodedClaim<'_>> {
464 if payload.len() < CLAIM_HEADER_LEN {
465 return None;
466 }
467
468 let subject_kind = payload[0];
469 let subject_len = u16::from_le_bytes(payload[1..3].try_into().ok()?) as usize;
470 let owner_len = u16::from_le_bytes(payload[3..5].try_into().ok()?) as usize;
471 let claimed_at_ms = u64::from_le_bytes(payload[5..13].try_into().ok()?);
472 let expires_at_ms = u64::from_le_bytes(payload[13..21].try_into().ok()?);
473 let subject_start = CLAIM_HEADER_LEN;
474 let subject_end = subject_start.checked_add(subject_len)?;
475 let owner_end = subject_end.checked_add(owner_len)?;
476 if payload.len() < owner_end {
477 return None;
478 }
479
480 Some(DecodedClaim {
481 subject_kind,
482 subject: &payload[subject_start..subject_end],
483 owner: &payload[subject_end..owner_end],
484 claimed_at_ms,
485 expires_at_ms,
486 })
487}
488
489fn decode_release(payload: &Bytes) -> Option<DecodedRelease<'_>> {
490 if payload.len() < RELEASE_HEADER_LEN {
491 return None;
492 }
493
494 let claim_id = NetId64::from_raw(u64::from_le_bytes(payload[0..8].try_into().ok()?));
495 let subject_kind = payload[8];
496 let subject_len = u16::from_le_bytes(payload[9..11].try_into().ok()?) as usize;
497 let subject_start = RELEASE_HEADER_LEN;
498 let subject_end = subject_start.checked_add(subject_len)?;
499 if payload.len() < subject_end {
500 return None;
501 }
502
503 Some(DecodedRelease {
504 subject_kind,
505 subject: &payload[subject_start..subject_end],
506 claim_id,
507 })
508}
509
/// Computes the expiry timestamp `now_ms + ttl` in milliseconds.
///
/// Both the conversion of `ttl` to milliseconds and the addition saturate at
/// `u64::MAX` instead of wrapping.
fn expires_at(now_ms: u64, ttl: Duration) -> u64 {
    let lifetime_ms = u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX);
    now_ms.saturating_add(lifetime_ms)
}
514
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch, and
/// saturates at `u64::MAX` if the millisecond count does not fit in a `u64`.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
521
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use super::{Claim, Contest, ContestType};
    use crate::Fleet;

    // Two subject types with distinct kind bytes, used to show that the kind
    // takes part in subject identity.
    struct OriginProbe;

    impl ContestType for OriginProbe {
        const KIND: u8 = 1;
    }

    struct OtherProbe;

    impl ContestType for OtherProbe {
        const KIND: u8 = 2;
    }

    // The first claimant wins; a later claimant on the same subject is told
    // to yield to it. `first` stays alive to the end of the test because
    // dropping its guard would publish a release.
    #[test]
    fn first_claim_is_claimed_for_subject() {
        let fleet = Arc::new(Fleet::join("first_claim_is_claimed", 2).expect("fleet"));
        let claims = Contest::new(fleet);

        let first = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:1", Duration::from_secs(30), 1_000)
            .expect("contest");
        let second = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:2", Duration::from_secs(30), 1_001)
            .expect("second claim");

        assert!(matches!(first, Claim::Claimed(_)));
        let Claim::YieldTo(holder) = second else {
            panic!("second claimant should yield");
        };
        assert_eq!(holder.owner.as_str(), "worker:1");
    }

    // Same subject type, different labels: both claims succeed.
    #[test]
    fn different_subject_labels_do_not_compete() {
        let fleet = Arc::new(Fleet::join("first_claim_subjects", 2).expect("fleet"));
        let claims = Contest::new(fleet);

        let first = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:1", Duration::from_secs(30), 1_000)
            .expect("first subject");
        let second = claims
            .try_claim_at::<OriginProbe>("origin:tcp_2", "worker:2", Duration::from_secs(30), 1_001)
            .expect("second subject");

        assert!(first.is_claimed());
        assert!(second.is_claimed());
    }

    // Same label, different subject types: both claims succeed.
    #[test]
    fn different_subject_types_do_not_compete() {
        let fleet = Arc::new(Fleet::join("first_claim_subject_types", 2).expect("fleet"));
        let claims = Contest::new(fleet);

        let first = claims
            .try_claim_at::<OriginProbe>("same-label", "worker:1", Duration::from_secs(30), 1_000)
            .expect("first type");
        let second = claims
            .try_claim_at::<OtherProbe>("same-label", "worker:2", Duration::from_secs(30), 1_001)
            .expect("second type");

        assert!(first.is_claimed());
        assert!(second.is_claimed());
    }

    // An explicit release frees the subject for the next claimant even though
    // the original TTL has not run out.
    #[test]
    fn releasing_claim_guard_allows_next_claim() {
        let fleet = Arc::new(Fleet::join("first_claim_release", 2).expect("fleet"));
        let claims = Contest::new(fleet);

        let Claim::Claimed(guard) = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:1", Duration::from_secs(30), 1_000)
            .expect("claim")
        else {
            panic!("expected claim");
        };
        guard.release().expect("release");

        let next = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:2", Duration::from_secs(30), 1_100)
            .expect("next claim");

        let Claim::Claimed(guard) = next else {
            panic!("released claim should not block");
        };
        assert_eq!(guard.owner().as_str(), "worker:2");
    }

    // A claim made at 1_000 with a 5 ms TTL expires at 1_005, so a claim
    // attempted at 1_006 is not blocked by it.
    #[test]
    fn expired_claim_does_not_block_next_claim() {
        let fleet = Arc::new(Fleet::join("first_claim_expiry", 2).expect("fleet"));
        let claims = Contest::new(fleet);

        let first = claims
            .try_claim_at::<OriginProbe>(
                "origin:tcp_1",
                "worker:1",
                Duration::from_millis(5),
                1_000,
            )
            .expect("contest");
        let second = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:2", Duration::from_secs(30), 1_006)
            .expect("second claim");

        assert!(first.is_claimed());
        let Claim::Claimed(guard) = second else {
            panic!("expired claim should not block");
        };
        assert_eq!(guard.owner().as_str(), "worker:2");
    }

    // A claimant that yields must withdraw its own claim frame; otherwise the
    // losing claim from worker:2 would still block worker:3 after the winner
    // releases.
    #[test]
    fn yielding_claim_releases_itself() {
        let fleet = Arc::new(Fleet::join("yielding_claim_release", 2).expect("fleet"));
        let claims = Contest::new(fleet);

        let Claim::Claimed(first_guard) = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:1", Duration::from_secs(30), 1_000)
            .expect("contest")
        else {
            panic!("expected initial claim");
        };
        let second = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:2", Duration::from_secs(30), 1_001)
            .expect("second claim");
        assert!(matches!(second, Claim::YieldTo(_)));

        first_guard.release().expect("release first");
        let third = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:3", Duration::from_secs(30), 1_100)
            .expect("third claim");

        let Claim::Claimed(guard) = third else {
            panic!("released yielding claim should not block");
        };
        assert_eq!(guard.owner().as_str(), "worker:3");
    }
}