Skip to main content

orbit_rs/contest/
guard.rs

1//! Contest coordination over one Orbit ring.
2//!
3//! `Contest` is not a race primitive. It turns simultaneous interest in
4//! the same typed subject into a small Claim/Yield protocol: every peer
5//! may publish a claim, the earliest active claim receives a
6//! drop-released [`Guard`], and later claims receive `YieldTo(holder)`.
7
8use std::collections::BTreeMap;
9use std::sync::Arc;
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use bytes::{BufMut, Bytes, BytesMut};
13
14use crate::error::{Error, Result};
15use crate::fleet::Fleet;
16use crate::id::NetId64;
17use crate::typed::OrbitTyped;
18
19/// Contest frame payload limit for V0. On Unix this matches the SHM ring
20/// slot payload size; non-Unix keeps the same bounded contract.
21#[cfg(unix)]
22pub const CONTEST_PAYLOAD_MAX: usize = crate::ring::shm::PAYLOAD_MAX;
23#[cfg(not(unix))]
24pub const CONTEST_PAYLOAD_MAX: usize = 256;
25
/// Ring kind used by [`ContestRecord`], i.e. the ring contest frames live in.
pub const CONTEST_RING_KIND: u8 = 222;
/// Frame kind tag marking a claim frame inside the contest ring.
pub const CONTEST_FRAME_KIND_CLAIM: u8 = 1;
/// Frame kind tag marking a release frame inside the contest ring.
pub const CONTEST_FRAME_KIND_RELEASE: u8 = 2;

/// Fixed claim header: subject kind (`u8`), subject length (`u16`),
/// owner length (`u16`), claimed-at ms (`u64`), expires-at ms (`u64`).
const CLAIM_HEADER_LEN: usize =
    std::mem::size_of::<u8>() + 2 * std::mem::size_of::<u16>() + 2 * std::mem::size_of::<u64>();
/// Fixed release header: claim id (`u64`), subject kind (`u8`),
/// subject length (`u16`).
const RELEASE_HEADER_LEN: usize =
    std::mem::size_of::<u64>() + std::mem::size_of::<u8>() + std::mem::size_of::<u16>();
32
33/// Dedicated ring record marker for contest frames.
34#[derive(Clone, Debug)]
35pub struct ContestRecord;
36
37impl OrbitTyped for ContestRecord {
38    // Hand-picked V0 kind. Build-time KIND allocation will replace
39    // these manual values later.
40    const KIND: u8 = CONTEST_RING_KIND;
41}
42
/// Type namespace for a contest subject.
///
/// Intentionally narrower than [`OrbitTyped`]: a contest subject is not a
/// ring value family, only a caller-owned namespace inside the shared
/// contest ring. Subjects with equal labels but different `KIND`s are
/// distinct subjects.
pub trait ContestType {
    /// Namespace tag written into every frame for subjects of this type.
    const KIND: u8;
}

/// The typed subject peers coordinate on: a [`ContestType`] kind plus a
/// caller-chosen label.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ContestSubject {
    kind: u8,
    label: String,
}

impl ContestSubject {
    /// Build a subject named `label` inside the namespace of `T`.
    pub fn new<T: ContestType>(label: impl Into<String>) -> Self {
        let label = label.into();
        Self { kind: T::KIND, label }
    }

    /// Namespace tag taken from the subject's [`ContestType`].
    pub const fn kind(&self) -> u8 {
        self.kind
    }

    /// Caller-supplied label.
    pub fn label(&self) -> &str {
        self.label.as_str()
    }

    /// Label bytes as they are written into contest frames.
    fn as_bytes(&self) -> &[u8] {
        self.label().as_bytes()
    }

    /// Rebuild a subject from decoded frame fields. Invalid UTF-8 in the
    /// label is replaced with U+FFFD rather than rejected.
    fn from_parts(kind: u8, label: &[u8]) -> Self {
        let label = String::from_utf8_lossy(label).into_owned();
        Self { kind, label }
    }
}
86
87/// Fleet-shared contest handle. Cheap to clone.
88#[derive(Clone, Debug)]
89pub struct Contest {
90    fleet: Arc<Fleet>,
91}
92
93impl Contest {
94    pub fn new(fleet: Arc<Fleet>) -> Self {
95        Self { fleet }
96    }
97
98    /// Clear the shared contest ring.
99    ///
100    /// Intended for owner-controlled boot-time cleanup before peer
101    /// processes publish claims. It is not a runtime coordination tool.
102    pub fn reset_ring(&self) -> Result<()> {
103        self.fleet.reset_ring::<ContestRecord>().map_err(Error::Io)
104    }
105
106    /// Try to become the first active claimant for a typed subject.
107    ///
108    /// The caller supplies an owner label only for observability. Orbit
109    /// does not interpret it.
110    pub fn try_claim<T: ContestType>(
111        &self,
112        subject: impl Into<String>,
113        owner: impl Into<ContestOwner>,
114        ttl: Duration,
115    ) -> Result<Claim> {
116        self.try_claim_at::<T>(subject, owner, ttl, now_ms())
117    }
118
119    /// Same as [`Self::try_claim`], but with an explicit clock value.
120    /// Useful for deterministic tests and embedders with their own
121    /// time source.
122    pub fn try_claim_at<T: ContestType>(
123        &self,
124        subject: impl Into<String>,
125        owner: impl Into<ContestOwner>,
126        ttl: Duration,
127        now_ms: u64,
128    ) -> Result<Claim> {
129        self.try_claim_subject_at(ContestSubject::new::<T>(subject), owner, ttl, now_ms)
130    }
131
132    /// Try to claim a pre-built subject.
133    pub fn try_claim_subject(
134        &self,
135        subject: ContestSubject,
136        owner: impl Into<ContestOwner>,
137        ttl: Duration,
138    ) -> Result<Claim> {
139        self.try_claim_subject_at(subject, owner, ttl, now_ms())
140    }
141
142    /// Same as [`Self::try_claim_subject`], but with an explicit clock.
143    pub fn try_claim_subject_at(
144        &self,
145        subject: ContestSubject,
146        owner: impl Into<ContestOwner>,
147        ttl: Duration,
148        now_ms: u64,
149    ) -> Result<Claim> {
150        let owner = owner.into();
151        let expires_at_ms = expires_at(now_ms, ttl);
152        let payload = encode_claim(
153            subject.kind,
154            subject.as_bytes(),
155            owner.as_bytes(),
156            now_ms,
157            expires_at_ms,
158        )?;
159        let claim_id =
160            self.fleet
161                .publish::<ContestRecord>(CONTEST_FRAME_KIND_CLAIM, now_ms, payload);
162
163        let Some(holder) = self.active_holder(&subject, now_ms) else {
164            return Ok(Claim::YieldTo(Holder {
165                claim_id,
166                subject,
167                owner,
168                claimed_at_ms: now_ms,
169                expires_at_ms,
170            }));
171        };
172
173        if holder.claim_id.counter() == claim_id.counter() {
174            Ok(Claim::Claimed(Guard::new(self.clone(), holder)))
175        } else {
176            let _ = self.release_id(&subject, claim_id, now_ms);
177            Ok(Claim::YieldTo(holder))
178        }
179    }
180
181    /// Turn an observed holder into a drop-released guard.
182    ///
183    /// This is useful for re-entrant owners: a process may observe that
184    /// it already owns the active claim, continue carrying the same
185    /// responsibility, and release the original claim when the guard
186    /// leaves scope.
187    pub fn guard_holder(&self, holder: Holder) -> Guard {
188        Guard::new(self.clone(), holder)
189    }
190
191    /// Release a holder observed through a yield path.
192    ///
193    /// This is useful for re-entrant owners: a process may keep probing
194    /// under its own still-active claim, then release that original claim
195    /// when the guarded work succeeds.
196    pub fn release_holder(&self, holder: &Holder) -> Result<NetId64> {
197        self.release_id(&holder.subject, holder.claim_id, now_ms())
198    }
199
200    fn release_id(
201        &self,
202        subject: &ContestSubject,
203        claim_id: NetId64,
204        now_ms: u64,
205    ) -> Result<NetId64> {
206        let payload = encode_release(subject.kind, subject.as_bytes(), claim_id)?;
207        Ok(self
208            .fleet
209            .publish::<ContestRecord>(CONTEST_FRAME_KIND_RELEASE, now_ms, payload))
210    }
211
212    fn active_holder(&self, subject: &ContestSubject, now_ms: u64) -> Option<Holder> {
213        let mut cursor = self.fleet.cursor_from_start::<ContestRecord>();
214        let poll = self.fleet.poll_ring::<ContestRecord>(&mut cursor);
215        let mut active = BTreeMap::<u64, Holder>::new();
216
217        for frame in poll.frames {
218            match decode_frame(frame.kind, &frame.payload) {
219                Some(DecodedContestFrame::Claim(decoded))
220                    if decoded.subject_kind == subject.kind
221                        && decoded.subject == subject.as_bytes()
222                        && decoded.expires_at_ms > now_ms =>
223                {
224                    active.insert(
225                        frame.id.counter(),
226                        Holder {
227                            claim_id: frame.id,
228                            subject: ContestSubject::from_parts(
229                                decoded.subject_kind,
230                                decoded.subject,
231                            ),
232                            owner: ContestOwner::from_bytes(decoded.owner),
233                            claimed_at_ms: decoded.claimed_at_ms,
234                            expires_at_ms: decoded.expires_at_ms,
235                        },
236                    );
237                }
238                Some(DecodedContestFrame::Release(decoded))
239                    if decoded.subject_kind == subject.kind
240                        && decoded.subject == subject.as_bytes() =>
241                {
242                    active.remove(&decoded.claim_id.counter());
243                }
244                _ => {}
245            }
246        }
247
248        active.into_values().next()
249    }
250}
251
/// Observer-facing label for the claimant.
///
/// The label is stored in claim frames and reported back through
/// [`Holder`]; Orbit does not interpret it.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ContestOwner(String);

impl ContestOwner {
    /// Wrap any string-like value as an owner label.
    pub fn new(value: impl Into<String>) -> Self {
        ContestOwner(value.into())
    }

    /// Borrow the label text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Label bytes as they are written into claim frames.
    fn as_bytes(&self) -> &[u8] {
        self.as_str().as_bytes()
    }

    /// Rebuild a label from frame bytes, replacing invalid UTF-8 with U+FFFD.
    fn from_bytes(bytes: &[u8]) -> Self {
        ContestOwner(String::from_utf8_lossy(bytes).into_owned())
    }
}

impl From<&str> for ContestOwner {
    fn from(value: &str) -> Self {
        ContestOwner(value.to_owned())
    }
}

impl From<String> for ContestOwner {
    fn from(value: String) -> Self {
        ContestOwner(value)
    }
}

impl From<&String> for ContestOwner {
    fn from(value: &String) -> Self {
        ContestOwner::new(value.as_str())
    }
}
291
292/// Current active holder observed for a subject.
293#[derive(Clone, Debug, PartialEq, Eq)]
294pub struct Holder {
295    pub claim_id: NetId64,
296    pub subject: ContestSubject,
297    pub owner: ContestOwner,
298    pub claimed_at_ms: u64,
299    pub expires_at_ms: u64,
300}
301
302/// RAII guard returned to the process that owns the earliest active claim.
303///
304/// The guard is the lifetime of the claim in Rust terms: while it lives,
305/// the caller is carrying the responsibility for the contest subject.
306/// Dropping it publishes a release frame. If the process is killed before
307/// `Drop` runs, the claim remains active only until its encoded expiry.
308#[derive(Debug)]
309pub struct Guard {
310    contest: Contest,
311    holder: Holder,
312    release_on_drop: bool,
313}
314
315impl Guard {
316    fn new(contest: Contest, holder: Holder) -> Self {
317        Self {
318            contest,
319            holder,
320            release_on_drop: true,
321        }
322    }
323
324    pub fn holder(&self) -> &Holder {
325        &self.holder
326    }
327
328    pub fn claim_id(&self) -> NetId64 {
329        self.holder.claim_id
330    }
331
332    pub fn subject(&self) -> &ContestSubject {
333        &self.holder.subject
334    }
335
336    pub fn owner(&self) -> &ContestOwner {
337        &self.holder.owner
338    }
339
340    pub fn expires_at_ms(&self) -> u64 {
341        self.holder.expires_at_ms
342    }
343
344    /// Explicitly release the claim before this guard leaves scope.
345    pub fn release(mut self) -> Result<NetId64> {
346        let released =
347            self.contest
348                .release_id(&self.holder.subject, self.holder.claim_id, now_ms());
349        if released.is_ok() {
350            self.release_on_drop = false;
351        }
352        released
353    }
354}
355
356impl Drop for Guard {
357    fn drop(&mut self) {
358        if !self.release_on_drop {
359            return;
360        }
361        if let Err(err) =
362            self.contest
363                .release_id(&self.holder.subject, self.holder.claim_id, now_ms())
364        {
365            tracing::debug!(
366                claim_id = %self.holder.claim_id,
367                subject_kind = self.holder.subject.kind(),
368                subject = self.holder.subject.label(),
369                error = %err,
370                "contest guard release failed"
371            );
372        }
373    }
374}
375
376#[derive(Debug)]
377pub enum Claim {
378    Claimed(Guard),
379    YieldTo(Holder),
380}
381
382impl Claim {
383    pub fn is_claimed(&self) -> bool {
384        matches!(self, Self::Claimed(_))
385    }
386}
387
388struct DecodedClaim<'a> {
389    subject_kind: u8,
390    subject: &'a [u8],
391    owner: &'a [u8],
392    claimed_at_ms: u64,
393    expires_at_ms: u64,
394}
395
396struct DecodedRelease<'a> {
397    subject_kind: u8,
398    subject: &'a [u8],
399    claim_id: NetId64,
400}
401
402enum DecodedContestFrame<'a> {
403    Claim(DecodedClaim<'a>),
404    Release(DecodedRelease<'a>),
405}
406
407fn encode_claim(
408    subject_kind: u8,
409    subject: &[u8],
410    owner: &[u8],
411    claimed_at_ms: u64,
412    expires_at_ms: u64,
413) -> Result<Bytes> {
414    let total = CLAIM_HEADER_LEN + subject.len() + owner.len();
415    if subject.len() > u16::MAX as usize
416        || owner.len() > u16::MAX as usize
417        || total > CONTEST_PAYLOAD_MAX
418    {
419        return Err(Error::ContestFrameTooLarge {
420            subject_len: subject.len(),
421            owner_len: owner.len(),
422            max_payload: CONTEST_PAYLOAD_MAX,
423        });
424    }
425
426    let mut buf = BytesMut::with_capacity(total);
427    buf.put_u8(subject_kind);
428    buf.put_u16_le(subject.len() as u16);
429    buf.put_u16_le(owner.len() as u16);
430    buf.put_u64_le(claimed_at_ms);
431    buf.put_u64_le(expires_at_ms);
432    buf.put_slice(subject);
433    buf.put_slice(owner);
434    Ok(buf.freeze())
435}
436
437fn encode_release(subject_kind: u8, subject: &[u8], claim_id: NetId64) -> Result<Bytes> {
438    let total = RELEASE_HEADER_LEN + subject.len();
439    if subject.len() > u16::MAX as usize || total > CONTEST_PAYLOAD_MAX {
440        return Err(Error::ContestFrameTooLarge {
441            subject_len: subject.len(),
442            owner_len: 0,
443            max_payload: CONTEST_PAYLOAD_MAX,
444        });
445    }
446
447    let mut buf = BytesMut::with_capacity(total);
448    buf.put_u64_le(claim_id.raw());
449    buf.put_u8(subject_kind);
450    buf.put_u16_le(subject.len() as u16);
451    buf.put_slice(subject);
452    Ok(buf.freeze())
453}
454
455fn decode_frame(frame_kind: u8, payload: &Bytes) -> Option<DecodedContestFrame<'_>> {
456    match frame_kind {
457        CONTEST_FRAME_KIND_CLAIM => decode_claim(payload).map(DecodedContestFrame::Claim),
458        CONTEST_FRAME_KIND_RELEASE => decode_release(payload).map(DecodedContestFrame::Release),
459        _ => None,
460    }
461}
462
463fn decode_claim(payload: &Bytes) -> Option<DecodedClaim<'_>> {
464    if payload.len() < CLAIM_HEADER_LEN {
465        return None;
466    }
467
468    let subject_kind = payload[0];
469    let subject_len = u16::from_le_bytes(payload[1..3].try_into().ok()?) as usize;
470    let owner_len = u16::from_le_bytes(payload[3..5].try_into().ok()?) as usize;
471    let claimed_at_ms = u64::from_le_bytes(payload[5..13].try_into().ok()?);
472    let expires_at_ms = u64::from_le_bytes(payload[13..21].try_into().ok()?);
473    let subject_start = CLAIM_HEADER_LEN;
474    let subject_end = subject_start.checked_add(subject_len)?;
475    let owner_end = subject_end.checked_add(owner_len)?;
476    if payload.len() < owner_end {
477        return None;
478    }
479
480    Some(DecodedClaim {
481        subject_kind,
482        subject: &payload[subject_start..subject_end],
483        owner: &payload[subject_end..owner_end],
484        claimed_at_ms,
485        expires_at_ms,
486    })
487}
488
489fn decode_release(payload: &Bytes) -> Option<DecodedRelease<'_>> {
490    if payload.len() < RELEASE_HEADER_LEN {
491        return None;
492    }
493
494    let claim_id = NetId64::from_raw(u64::from_le_bytes(payload[0..8].try_into().ok()?));
495    let subject_kind = payload[8];
496    let subject_len = u16::from_le_bytes(payload[9..11].try_into().ok()?) as usize;
497    let subject_start = RELEASE_HEADER_LEN;
498    let subject_end = subject_start.checked_add(subject_len)?;
499    if payload.len() < subject_end {
500        return None;
501    }
502
503    Some(DecodedRelease {
504        subject_kind,
505        subject: &payload[subject_start..subject_end],
506        claim_id,
507    })
508}
509
/// Absolute expiry in milliseconds: `now_ms + ttl`.
///
/// A `ttl` whose millisecond count exceeds `u64` is clamped to `u64::MAX`,
/// and the sum saturates instead of overflowing. Sub-millisecond parts of
/// `ttl` are truncated.
fn expires_at(now_ms: u64, ttl: Duration) -> u64 {
    let ttl_ms = u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX);
    now_ms.saturating_add(ttl_ms)
}
514
/// Wall-clock milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch, and
/// saturates at `u64::MAX`.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
521
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use super::{Claim, Contest, ContestType};
    use crate::Fleet;

    /// Primary subject namespace for these tests.
    struct OriginProbe;

    impl ContestType for OriginProbe {
        const KIND: u8 = 1;
    }

    /// Second namespace; reuses labels of `OriginProbe` to show that kinds
    /// partition subjects.
    struct OtherProbe;

    impl ContestType for OtherProbe {
        const KIND: u8 = 2;
    }

    /// TTL long enough that no claim expires within a test.
    const LONG_TTL: Duration = Duration::from_secs(30);

    /// Join a two-peer fleet called `name` and wrap it in a contest handle.
    fn contest_named(name: &'static str) -> Contest {
        let fleet = Fleet::join(name, 2).expect("fleet");
        Contest::new(Arc::new(fleet))
    }

    #[test]
    fn first_claim_is_claimed_for_subject() {
        let claims = contest_named("first_claim_is_claimed");

        let first = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:1", LONG_TTL, 1_000)
            .expect("contest");
        let second = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:2", LONG_TTL, 1_001)
            .expect("second claim");

        assert!(first.is_claimed());
        match second {
            Claim::YieldTo(holder) => assert_eq!(holder.owner.as_str(), "worker:1"),
            Claim::Claimed(_) => panic!("second claimant should yield"),
        }
    }

    #[test]
    fn different_subject_labels_do_not_compete() {
        let claims = contest_named("first_claim_subjects");

        let first = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:1", LONG_TTL, 1_000)
            .expect("first subject");
        let second = claims
            .try_claim_at::<OriginProbe>("origin:tcp_2", "worker:2", LONG_TTL, 1_001)
            .expect("second subject");

        assert!(first.is_claimed());
        assert!(second.is_claimed());
    }

    #[test]
    fn different_subject_types_do_not_compete() {
        let claims = contest_named("first_claim_subject_types");

        let first = claims
            .try_claim_at::<OriginProbe>("same-label", "worker:1", LONG_TTL, 1_000)
            .expect("first type");
        let second = claims
            .try_claim_at::<OtherProbe>("same-label", "worker:2", LONG_TTL, 1_001)
            .expect("second type");

        assert!(first.is_claimed());
        assert!(second.is_claimed());
    }

    #[test]
    fn releasing_claim_guard_allows_next_claim() {
        let claims = contest_named("first_claim_release");

        let initial = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:1", LONG_TTL, 1_000)
            .expect("claim");
        let Claim::Claimed(guard) = initial else {
            panic!("expected claim");
        };
        guard.release().expect("release");

        let next = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:2", LONG_TTL, 1_100)
            .expect("next claim");
        match next {
            Claim::Claimed(guard) => assert_eq!(guard.owner().as_str(), "worker:2"),
            Claim::YieldTo(_) => panic!("released claim should not block"),
        }
    }

    #[test]
    fn expired_claim_does_not_block_next_claim() {
        let claims = contest_named("first_claim_expiry");

        let first = claims
            .try_claim_at::<OriginProbe>(
                "origin:tcp_1",
                "worker:1",
                Duration::from_millis(5),
                1_000,
            )
            .expect("contest");
        let second = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:2", LONG_TTL, 1_006)
            .expect("second claim");

        assert!(first.is_claimed());
        match second {
            Claim::Claimed(guard) => assert_eq!(guard.owner().as_str(), "worker:2"),
            Claim::YieldTo(_) => panic!("expired claim should not block"),
        }
    }

    #[test]
    fn yielding_claim_releases_itself() {
        let claims = contest_named("yielding_claim_release");

        let initial = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:1", LONG_TTL, 1_000)
            .expect("contest");
        let Claim::Claimed(first_guard) = initial else {
            panic!("expected initial claim");
        };

        let second = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:2", LONG_TTL, 1_001)
            .expect("second claim");
        assert!(matches!(second, Claim::YieldTo(_)));

        first_guard.release().expect("release first");

        let third = claims
            .try_claim_at::<OriginProbe>("origin:tcp_1", "worker:3", LONG_TTL, 1_100)
            .expect("third claim");
        match third {
            Claim::Claimed(guard) => assert_eq!(guard.owner().as_str(), "worker:3"),
            Claim::YieldTo(_) => panic!("released yielding claim should not block"),
        }
    }
}