Skip to main content

lash_core/store/
queued_work.rs

//! Dialect-independent queued-work claim logic shared by durable backends.
//!
//! The SQL backends (sqlite, postgres) load candidate batch rows ordered by
//! `enqueue_seq` and pre-filtered to ready batches that are not held by a
//! live claim. They then apply the same pure state machine: a delivery-policy
//! boundary gate, slot-policy/merge-key prefix grouping, and fencing-token /
//! lease derivation. That state machine lives here so that the backends own
//! only their SQL reads and writes, while the claim contract has a single
//! implementation, exercised against every backend by the shared
//! `runtime_persistence` conformance suite.
11
12use sha2::{Digest, Sha256};
13
14use super::LeaseOwnerIdentity;
15use super::StoreError;
16use crate::runtime::QueuedWorkClass;
17use crate::{
18    DeliveryPolicy, MergeKey, QueuedWorkClaim, QueuedWorkClaimBoundary, QueuedWorkCompletion,
19    SlotPolicy,
20};
21
22/// Decoded claim-relevant fields of one ready queued-work batch row.
23///
24/// Backends build these from their candidate rows, presented in
25/// `enqueue_seq` ascending order and already filtered to
26/// `available_at_ms <= now` with no live claim.
27#[derive(Clone, Debug)]
28pub struct ClaimCandidate {
29    pub enqueue_seq: u64,
30    pub claim_fencing_token: u64,
31    pub work_class: QueuedWorkClass,
32    pub delivery_policy: DeliveryPolicy,
33    pub slot_policy: SlotPolicy,
34    pub merge_key: MergeKey,
35}
36
/// How many candidate rows a backend should scan when selecting up to
/// `max_batches` claimable batches.
///
/// Joinable groups are matched as a prefix, so scanning a bounded surplus
/// beyond `max_batches` keeps one round trip sufficient. The value is returned
/// as `i64` (presumably so it can be bound directly as a SQL row limit); a
/// `max_batches` too large for `i64` saturates to `i64::MAX` rather than
/// wrapping to a negative number.
pub fn claim_scan_limit(max_batches: usize) -> i64 {
    // Extra rows scanned past the requested batch count.
    const SCAN_SURPLUS: i64 = 32;
    // A plain `usize as i64` cast wraps values above `i64::MAX` (e.g.
    // `usize::MAX` becomes -1), which would yield a nonsensically small limit.
    i64::try_from(max_batches)
        .unwrap_or(i64::MAX)
        .saturating_add(SCAN_SURPLUS)
}
43
44/// Select a leading session-command batch.
45///
46/// Returns `1` only when the earliest ready claimable batch is a
47/// [`QueuedWorkClass::SessionCommand`]. Session commands are intentionally
48/// claimed one batch at a time so the runtime applies each mutation and
49/// completion through its normal fenced commit path before moving to the next.
50pub fn select_leading_session_command(candidates: &[ClaimCandidate]) -> usize {
51    if candidates
52        .first()
53        .is_some_and(|candidate| candidate.work_class == QueuedWorkClass::SessionCommand)
54    {
55        1
56    } else {
57        0
58    }
59}
60
61/// Select the prefix of turn-work `candidates` that a single claim may take.
62///
63/// Returns the number of leading candidates to claim (`0` means no claim):
64///
65/// * The queue head must be [`QueuedWorkClass::TurnWork`]. Earlier ready
66///   session commands are never skipped or materialized as turn input.
67/// * An [`QueuedWorkClaimBoundary::ActiveTurnCheckpoint`] boundary only
68///   admits work whose head batch is
69///   [`DeliveryPolicy::EarliestSafeBoundary`].
70/// * An [`SlotPolicy::Exclusive`] head claims exactly one batch.
71/// * A [`SlotPolicy::Join`] head extends through immediately following
72///   `Join` batches with the same delivery policy and merge key, up to
73///   `max_batches`.
74pub fn select_turn_work_claim_prefix(
75    candidates: &[ClaimCandidate],
76    boundary: QueuedWorkClaimBoundary,
77    max_batches: usize,
78) -> usize {
79    if max_batches == 0 {
80        return 0;
81    }
82    let Some(first) = candidates.first() else {
83        return 0;
84    };
85    if first.work_class != QueuedWorkClass::TurnWork {
86        return 0;
87    }
88    if boundary == QueuedWorkClaimBoundary::ActiveTurnCheckpoint
89        && first.delivery_policy != DeliveryPolicy::EarliestSafeBoundary
90    {
91        return 0;
92    }
93    if first.slot_policy != SlotPolicy::Join {
94        return 1;
95    }
96    let mut selected = 1;
97    for candidate in &candidates[1..] {
98        if selected >= max_batches
99            || candidate.work_class != QueuedWorkClass::TurnWork
100            || candidate.slot_policy != SlotPolicy::Join
101            || candidate.delivery_policy != first.delivery_policy
102            || candidate.merge_key != first.merge_key
103        {
104            break;
105        }
106        selected += 1;
107    }
108    selected
109}
110
/// Lease derived for a freshly selected claim prefix.
///
/// The fencing token moves past the one last observed on the head batch. The
/// claim id is fixed by (head batch, fencing token). The lease token is an
/// opaque proof-of-ownership digest that the backend stamps on every claimed
/// row.
#[derive(Clone, Debug)]
pub struct QueuedWorkClaimLease {
    /// Claim identifier; [`QueuedWorkClaimLease::derive`] produces
    /// `qwc:<head enqueue_seq>:<fencing_token>`.
    pub claim_id: String,
    /// Opaque hex digest proving ownership of the claimed rows.
    pub lease_token: String,
    /// Fencing token for this claim, one past the head batch's previous token.
    pub fencing_token: u64,
    /// Epoch-millisecond timestamp at which the claim was taken.
    pub claimed_at_epoch_ms: u64,
    /// Epoch-millisecond timestamp at which the lease lapses.
    pub expires_at_epoch_ms: u64,
}
125
126impl QueuedWorkClaimLease {
127    pub fn derive(
128        head: &ClaimCandidate,
129        session_id: &str,
130        owner: &LeaseOwnerIdentity,
131        now_epoch_ms: u64,
132        lease_ttl_ms: u64,
133    ) -> Self {
134        let fencing_token = head.claim_fencing_token.saturating_add(1);
135        let claim_id = format!("qwc:{}:{fencing_token}", head.enqueue_seq);
136        let lease_token = format!(
137            "{:x}",
138            Sha256::digest(
139                format!(
140                    "{}:{}:{}:{}:{}",
141                    session_id, owner.owner_id, owner.incarnation_id, claim_id, now_epoch_ms
142                )
143                .as_bytes(),
144            )
145        );
146        Self {
147            claim_id,
148            lease_token,
149            fencing_token,
150            claimed_at_epoch_ms: now_epoch_ms,
151            expires_at_epoch_ms: now_epoch_ms.saturating_add(lease_ttl_ms),
152        }
153    }
154}
155
156/// Derive the durable id for a newly enqueued batch.
157///
158/// `nonce` disambiguates batches enqueued within the same millisecond;
159/// backends whose id uniqueness already comes from elsewhere pass `None`.
160pub fn derive_batch_id(
161    session_id: &str,
162    source_key: Option<&str>,
163    now_epoch_ms: u64,
164    nonce: Option<u64>,
165) -> String {
166    let mut seed = format!("{session_id}:{source_key:?}:{now_epoch_ms}");
167    if let Some(nonce) = nonce {
168        seed.push_str(&format!(":{nonce}"));
169    }
170    format!("qwb:{:x}", Sha256::digest(seed.as_bytes()))
171}
172
173/// Apply the shared lease-renewal decision: the lease holds only when every
174/// batch row in the claim accepted the new expiry stamp.
175pub fn renewed_claim(
176    claim: &QueuedWorkClaim,
177    renewed_rows: usize,
178    expires_at_epoch_ms: u64,
179) -> Result<QueuedWorkClaim, StoreError> {
180    if renewed_rows != claim.batches.len() {
181        return Err(StoreError::QueuedWorkClaimExpired {
182            session_id: claim.session_id.clone(),
183            claim_id: claim.claim_id.clone(),
184        });
185    }
186    Ok(QueuedWorkClaim {
187        expires_at_epoch_ms,
188        ..claim.clone()
189    })
190}
191
192/// Apply the shared completion-fencing decision: a completion may delete its
193/// batches only when the live store still shows the claim owning every one
194/// of them (`owned_rows` rows matched the claim id + lease token).
195pub fn ensure_completion_owns_all_batches(
196    completed: &QueuedWorkCompletion,
197    owned_rows: usize,
198) -> Result<(), StoreError> {
199    if owned_rows != completed.batch_ids.len() {
200        return Err(StoreError::QueuedWorkClaimExpired {
201            session_id: completed.session_id.clone(),
202            claim_id: completed.claim_id.clone(),
203        });
204    }
205    Ok(())
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a candidate with a zero fencing token.
    fn candidate(
        enqueue_seq: u64,
        work_class: QueuedWorkClass,
        delivery_policy: DeliveryPolicy,
        slot_policy: SlotPolicy,
        merge_key: MergeKey,
    ) -> ClaimCandidate {
        ClaimCandidate {
            enqueue_seq,
            claim_fencing_token: 0,
            work_class,
            delivery_policy,
            slot_policy,
            merge_key,
        }
    }

    /// Build a `TurnWork` + `Join` candidate with the given policy and key.
    fn turn_join(
        enqueue_seq: u64,
        delivery_policy: DeliveryPolicy,
        merge_key: MergeKey,
    ) -> ClaimCandidate {
        candidate(
            enqueue_seq,
            QueuedWorkClass::TurnWork,
            delivery_policy,
            SlotPolicy::Join,
            merge_key,
        )
    }

    #[test]
    fn exclusive_head_claims_exactly_one() {
        let candidates = vec![
            candidate(
                1,
                QueuedWorkClass::TurnWork,
                DeliveryPolicy::EarliestSafeBoundary,
                SlotPolicy::Exclusive,
                MergeKey::Never,
            ),
            candidate(
                2,
                QueuedWorkClass::TurnWork,
                DeliveryPolicy::EarliestSafeBoundary,
                SlotPolicy::Exclusive,
                MergeKey::Never,
            ),
        ];
        assert_eq!(
            select_turn_work_claim_prefix(&candidates, QueuedWorkClaimBoundary::Idle, 8),
            1
        );
    }

    #[test]
    fn join_head_groups_matching_prefix_up_to_max() {
        let join = |seq| {
            turn_join(
                seq,
                DeliveryPolicy::EarliestSafeBoundary,
                MergeKey::PayloadDefault,
            )
        };
        let candidates = vec![join(1), join(2), join(3), join(4)];
        assert_eq!(
            select_turn_work_claim_prefix(&candidates, QueuedWorkClaimBoundary::Idle, 3),
            3
        );
    }

    #[test]
    fn join_group_breaks_on_policy_or_merge_key_mismatch() {
        // Same delivery policy, different merge key.
        let key_mismatch = vec![
            turn_join(
                1,
                DeliveryPolicy::EarliestSafeBoundary,
                MergeKey::Group("a".to_string()),
            ),
            turn_join(
                2,
                DeliveryPolicy::EarliestSafeBoundary,
                MergeKey::Group("b".to_string()),
            ),
        ];
        assert_eq!(
            select_turn_work_claim_prefix(&key_mismatch, QueuedWorkClaimBoundary::Idle, 8),
            1
        );

        // Same merge key, different delivery policy.
        let policy_mismatch = vec![
            turn_join(
                1,
                DeliveryPolicy::EarliestSafeBoundary,
                MergeKey::Group("a".to_string()),
            ),
            turn_join(
                2,
                DeliveryPolicy::AfterCurrentTurnCommit,
                MergeKey::Group("a".to_string()),
            ),
        ];
        assert_eq!(
            select_turn_work_claim_prefix(&policy_mismatch, QueuedWorkClaimBoundary::Idle, 8),
            1
        );
    }

    #[test]
    fn zero_max_batches_or_empty_queue_claims_nothing() {
        let candidates = vec![turn_join(
            1,
            DeliveryPolicy::EarliestSafeBoundary,
            MergeKey::PayloadDefault,
        )];
        assert_eq!(
            select_turn_work_claim_prefix(&candidates, QueuedWorkClaimBoundary::Idle, 0),
            0
        );
        assert_eq!(
            select_turn_work_claim_prefix(&[], QueuedWorkClaimBoundary::Idle, 8),
            0
        );
        assert_eq!(select_leading_session_command(&[]), 0);
    }

    #[test]
    fn active_turn_checkpoint_boundary_gates_on_delivery_policy() {
        let candidates = vec![candidate(
            1,
            QueuedWorkClass::TurnWork,
            DeliveryPolicy::AfterCurrentTurnCommit,
            SlotPolicy::Exclusive,
            MergeKey::Never,
        )];
        assert_eq!(
            select_turn_work_claim_prefix(
                &candidates,
                QueuedWorkClaimBoundary::ActiveTurnCheckpoint,
                8
            ),
            0
        );
    }

    #[test]
    fn active_turn_checkpoint_boundary_admits_earliest_safe_boundary_head() {
        let candidates = vec![candidate(
            1,
            QueuedWorkClass::TurnWork,
            DeliveryPolicy::EarliestSafeBoundary,
            SlotPolicy::Exclusive,
            MergeKey::Never,
        )];
        assert_eq!(
            select_turn_work_claim_prefix(
                &candidates,
                QueuedWorkClaimBoundary::ActiveTurnCheckpoint,
                8
            ),
            1
        );
    }

    #[test]
    fn leading_session_command_blocks_turn_work_claim() {
        let candidates = vec![
            candidate(
                1,
                QueuedWorkClass::SessionCommand,
                DeliveryPolicy::EarliestSafeBoundary,
                SlotPolicy::Exclusive,
                MergeKey::Never,
            ),
            candidate(
                2,
                QueuedWorkClass::TurnWork,
                DeliveryPolicy::EarliestSafeBoundary,
                SlotPolicy::Exclusive,
                MergeKey::Never,
            ),
        ];
        assert_eq!(select_leading_session_command(&candidates), 1);
        assert_eq!(
            select_turn_work_claim_prefix(&candidates, QueuedWorkClaimBoundary::Idle, 8),
            0
        );
    }

    #[test]
    fn later_session_command_does_not_join_turn_work_claim() {
        let candidates = vec![
            candidate(
                1,
                QueuedWorkClass::TurnWork,
                DeliveryPolicy::EarliestSafeBoundary,
                SlotPolicy::Join,
                MergeKey::PayloadDefault,
            ),
            candidate(
                2,
                QueuedWorkClass::SessionCommand,
                DeliveryPolicy::EarliestSafeBoundary,
                SlotPolicy::Join,
                MergeKey::PayloadDefault,
            ),
        ];
        assert_eq!(select_leading_session_command(&candidates), 0);
        assert_eq!(
            select_turn_work_claim_prefix(&candidates, QueuedWorkClaimBoundary::Idle, 8),
            1
        );
    }

    #[test]
    fn claim_scan_limit_adds_fixed_surplus() {
        assert_eq!(claim_scan_limit(0), 32);
        assert_eq!(claim_scan_limit(8), 40);
    }

    #[test]
    fn lease_derivation_is_deterministic_and_advances_fencing() {
        let head = ClaimCandidate {
            enqueue_seq: 7,
            claim_fencing_token: 2,
            work_class: QueuedWorkClass::TurnWork,
            delivery_policy: DeliveryPolicy::EarliestSafeBoundary,
            slot_policy: SlotPolicy::Exclusive,
            merge_key: MergeKey::Never,
        };
        let owner = LeaseOwnerIdentity::opaque("owner", "owner:incarnation");
        let lease = QueuedWorkClaimLease::derive(&head, "session", &owner, 1_000, 250);
        assert_eq!(lease.fencing_token, 3);
        assert_eq!(lease.claim_id, "qwc:7:3");
        assert_eq!(lease.claimed_at_epoch_ms, 1_000);
        assert_eq!(lease.expires_at_epoch_ms, 1_250);
        let again = QueuedWorkClaimLease::derive(&head, "session", &owner, 1_000, 250);
        assert_eq!(lease.lease_token, again.lease_token);
    }

    #[test]
    fn lease_token_depends_on_owner() {
        let head = candidate(
            7,
            QueuedWorkClass::TurnWork,
            DeliveryPolicy::EarliestSafeBoundary,
            SlotPolicy::Exclusive,
            MergeKey::Never,
        );
        let owner = LeaseOwnerIdentity::opaque("owner", "owner:incarnation");
        let other = LeaseOwnerIdentity::opaque("other", "other:incarnation");
        let mine = QueuedWorkClaimLease::derive(&head, "session", &owner, 1_000, 250);
        let theirs = QueuedWorkClaimLease::derive(&head, "session", &other, 1_000, 250);
        assert_eq!(mine.claim_id, theirs.claim_id);
        assert_ne!(mine.lease_token, theirs.lease_token);
    }

    #[test]
    fn batch_id_includes_optional_nonce() {
        let plain = derive_batch_id("session", Some("key"), 1_000, None);
        let nonced = derive_batch_id("session", Some("key"), 1_000, Some(1));
        assert_ne!(plain, nonced);
        assert!(plain.starts_with("qwb:"));
    }

    #[test]
    fn batch_id_is_deterministic_and_keyed_on_source() {
        let first = derive_batch_id("session", Some("key"), 1_000, Some(1));
        let again = derive_batch_id("session", Some("key"), 1_000, Some(1));
        let keyless = derive_batch_id("session", None, 1_000, Some(1));
        assert_eq!(first, again);
        assert_ne!(first, keyless);
    }
}