1use sha2::{Digest, Sha256};
13
14use super::LeaseOwnerIdentity;
15use super::StoreError;
16use crate::runtime::QueuedWorkClass;
17use crate::{
18 DeliveryPolicy, MergeKey, QueuedWorkClaim, QueuedWorkClaimBoundary, QueuedWorkCompletion,
19 SlotPolicy,
20};
21
/// One queued-work row as seen by the claim-selection helpers in this module.
///
/// The selection functions only compare `work_class`, `delivery_policy`,
/// `slot_policy` and `merge_key`; [`QueuedWorkClaimLease::derive`] only reads
/// `enqueue_seq` and `claim_fencing_token`.
#[derive(Clone, Debug)]
pub struct ClaimCandidate {
    /// Sequence number embedded in the derived claim id (`qwc:{enqueue_seq}:{fencing}`).
    /// Presumably the row's enqueue order — confirm against the store.
    pub enqueue_seq: u64,
    /// Fencing token currently recorded for the row; a derived lease uses this
    /// value plus one (saturating).
    pub claim_fencing_token: u64,
    /// Work class of the row; selection distinguishes `SessionCommand` from `TurnWork`.
    pub work_class: QueuedWorkClass,
    /// Delivery policy; gates claiming at the `ActiveTurnCheckpoint` boundary and
    /// must match the head for a row to join the head's group.
    pub delivery_policy: DeliveryPolicy,
    /// Slot policy; only `Join` rows are grouped with following rows.
    pub slot_policy: SlotPolicy,
    /// Merge key; must equal the head's key for a row to join the head's group.
    pub merge_key: MergeKey,
}
36
/// Returns how many candidate rows to scan when claiming up to `max_batches`.
///
/// The result is `max_batches` plus a fixed headroom of 32 rows, expressed as
/// an `i64` (presumably so it can be bound as a SQL `LIMIT` — confirm against
/// the callers).
///
/// A `max_batches` that does not fit in `i64` is clamped to `i64::MAX` instead
/// of wrapping to a negative number through an `as` cast, so the limit never
/// shrinks as `max_batches` grows.
pub fn claim_scan_limit(max_batches: usize) -> i64 {
    /// Extra rows scanned beyond the requested batch count.
    const SCAN_HEADROOM: i64 = 32;

    i64::try_from(max_batches)
        .unwrap_or(i64::MAX)
        .saturating_add(SCAN_HEADROOM)
}
43
44pub fn select_leading_session_command(candidates: &[ClaimCandidate]) -> usize {
51 if candidates
52 .first()
53 .is_some_and(|candidate| candidate.work_class == QueuedWorkClass::SessionCommand)
54 {
55 1
56 } else {
57 0
58 }
59}
60
61pub fn select_turn_work_claim_prefix(
75 candidates: &[ClaimCandidate],
76 boundary: QueuedWorkClaimBoundary,
77 max_batches: usize,
78) -> usize {
79 if max_batches == 0 {
80 return 0;
81 }
82 let Some(first) = candidates.first() else {
83 return 0;
84 };
85 if first.work_class != QueuedWorkClass::TurnWork {
86 return 0;
87 }
88 if boundary == QueuedWorkClaimBoundary::ActiveTurnCheckpoint
89 && first.delivery_policy != DeliveryPolicy::EarliestSafeBoundary
90 {
91 return 0;
92 }
93 if first.slot_policy != SlotPolicy::Join {
94 return 1;
95 }
96 let mut selected = 1;
97 for candidate in &candidates[1..] {
98 if selected >= max_batches
99 || candidate.work_class != QueuedWorkClass::TurnWork
100 || candidate.slot_policy != SlotPolicy::Join
101 || candidate.delivery_policy != first.delivery_policy
102 || candidate.merge_key != first.merge_key
103 {
104 break;
105 }
106 selected += 1;
107 }
108 selected
109}
110
/// Identifiers and timing for one claim lease, as produced by
/// [`QueuedWorkClaimLease::derive`].
#[derive(Clone, Debug)]
pub struct QueuedWorkClaimLease {
    /// Claim identifier of the form `qwc:{enqueue_seq}:{fencing_token}`.
    pub claim_id: String,
    /// Lowercase-hex digest derived from the session, owner identity, claim id
    /// and claim time.
    pub lease_token: String,
    /// The head candidate's fencing token plus one (saturating).
    pub fencing_token: u64,
    /// Clock value (epoch milliseconds) passed to `derive` as the claim time.
    pub claimed_at_epoch_ms: u64,
    /// `claimed_at_epoch_ms` plus the lease TTL (saturating).
    pub expires_at_epoch_ms: u64,
}
125
126impl QueuedWorkClaimLease {
127 pub fn derive(
128 head: &ClaimCandidate,
129 session_id: &str,
130 owner: &LeaseOwnerIdentity,
131 now_epoch_ms: u64,
132 lease_ttl_ms: u64,
133 ) -> Self {
134 let fencing_token = head.claim_fencing_token.saturating_add(1);
135 let claim_id = format!("qwc:{}:{fencing_token}", head.enqueue_seq);
136 let lease_token = format!(
137 "{:x}",
138 Sha256::digest(
139 format!(
140 "{}:{}:{}:{}:{}",
141 session_id, owner.owner_id, owner.incarnation_id, claim_id, now_epoch_ms
142 )
143 .as_bytes(),
144 )
145 );
146 Self {
147 claim_id,
148 lease_token,
149 fencing_token,
150 claimed_at_epoch_ms: now_epoch_ms,
151 expires_at_epoch_ms: now_epoch_ms.saturating_add(lease_ttl_ms),
152 }
153 }
154}
155
156pub fn derive_batch_id(
161 session_id: &str,
162 source_key: Option<&str>,
163 now_epoch_ms: u64,
164 nonce: Option<u64>,
165) -> String {
166 let mut seed = format!("{session_id}:{source_key:?}:{now_epoch_ms}");
167 if let Some(nonce) = nonce {
168 seed.push_str(&format!(":{nonce}"));
169 }
170 format!("qwb:{:x}", Sha256::digest(seed.as_bytes()))
171}
172
173pub fn renewed_claim(
176 claim: &QueuedWorkClaim,
177 renewed_rows: usize,
178 expires_at_epoch_ms: u64,
179) -> Result<QueuedWorkClaim, StoreError> {
180 if renewed_rows != claim.batches.len() {
181 return Err(StoreError::QueuedWorkClaimExpired {
182 session_id: claim.session_id.clone(),
183 claim_id: claim.claim_id.clone(),
184 });
185 }
186 Ok(QueuedWorkClaim {
187 expires_at_epoch_ms,
188 ..claim.clone()
189 })
190}
191
192pub fn ensure_completion_owns_all_batches(
196 completed: &QueuedWorkCompletion,
197 owned_rows: usize,
198) -> Result<(), StoreError> {
199 if owned_rows != completed.batch_ids.len() {
200 return Err(StoreError::QueuedWorkClaimExpired {
201 session_id: completed.session_id.clone(),
202 claim_id: completed.claim_id.clone(),
203 });
204 }
205 Ok(())
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a candidate with a zero fencing token.
    fn queued(
        work_class: QueuedWorkClass,
        enqueue_seq: u64,
        delivery_policy: DeliveryPolicy,
        slot_policy: SlotPolicy,
        merge_key: MergeKey,
    ) -> ClaimCandidate {
        ClaimCandidate {
            enqueue_seq,
            claim_fencing_token: 0,
            work_class,
            delivery_policy,
            slot_policy,
            merge_key,
        }
    }

    /// Turn work with `SlotPolicy::Exclusive` and `MergeKey::Never`.
    fn exclusive_turn_work(enqueue_seq: u64, delivery_policy: DeliveryPolicy) -> ClaimCandidate {
        queued(
            QueuedWorkClass::TurnWork,
            enqueue_seq,
            delivery_policy,
            SlotPolicy::Exclusive,
            MergeKey::Never,
        )
    }

    /// Turn work with `SlotPolicy::Join` delivered at the earliest safe boundary.
    fn joinable_turn_work(enqueue_seq: u64, merge_key: MergeKey) -> ClaimCandidate {
        queued(
            QueuedWorkClass::TurnWork,
            enqueue_seq,
            DeliveryPolicy::EarliestSafeBoundary,
            SlotPolicy::Join,
            merge_key,
        )
    }

    /// Prefix length selected at the `Idle` boundary.
    fn idle_prefix(candidates: &[ClaimCandidate], max_batches: usize) -> usize {
        select_turn_work_claim_prefix(candidates, QueuedWorkClaimBoundary::Idle, max_batches)
    }

    #[test]
    fn exclusive_head_claims_exactly_one() {
        let queue = [
            exclusive_turn_work(1, DeliveryPolicy::EarliestSafeBoundary),
            exclusive_turn_work(2, DeliveryPolicy::EarliestSafeBoundary),
        ];
        assert_eq!(idle_prefix(&queue, 8), 1);
    }

    #[test]
    fn join_head_groups_matching_prefix_up_to_max() {
        let queue: Vec<ClaimCandidate> = (1..=4)
            .map(|seq| joinable_turn_work(seq, MergeKey::PayloadDefault))
            .collect();
        assert_eq!(idle_prefix(&queue, 3), 3);
    }

    #[test]
    fn join_group_breaks_on_policy_or_merge_key_mismatch() {
        let queue = [
            joinable_turn_work(1, MergeKey::Group("a".to_string())),
            joinable_turn_work(2, MergeKey::Group("b".to_string())),
        ];
        assert_eq!(idle_prefix(&queue, 8), 1);
    }

    #[test]
    fn active_turn_checkpoint_boundary_gates_on_delivery_policy() {
        let queue = [exclusive_turn_work(
            1,
            DeliveryPolicy::AfterCurrentTurnCommit,
        )];
        let selected = select_turn_work_claim_prefix(
            &queue,
            QueuedWorkClaimBoundary::ActiveTurnCheckpoint,
            8,
        );
        assert_eq!(selected, 0);
    }

    #[test]
    fn leading_session_command_blocks_turn_work_claim() {
        let queue = [
            queued(
                QueuedWorkClass::SessionCommand,
                1,
                DeliveryPolicy::EarliestSafeBoundary,
                SlotPolicy::Exclusive,
                MergeKey::Never,
            ),
            exclusive_turn_work(2, DeliveryPolicy::EarliestSafeBoundary),
        ];
        assert_eq!(select_leading_session_command(&queue), 1);
        assert_eq!(idle_prefix(&queue, 8), 0);
    }

    #[test]
    fn later_session_command_does_not_join_turn_work_claim() {
        let queue = [
            joinable_turn_work(1, MergeKey::PayloadDefault),
            queued(
                QueuedWorkClass::SessionCommand,
                2,
                DeliveryPolicy::EarliestSafeBoundary,
                SlotPolicy::Join,
                MergeKey::PayloadDefault,
            ),
        ];
        assert_eq!(select_leading_session_command(&queue), 0);
        assert_eq!(idle_prefix(&queue, 8), 1);
    }

    #[test]
    fn lease_derivation_is_deterministic_and_advances_fencing() {
        let head = ClaimCandidate {
            enqueue_seq: 7,
            claim_fencing_token: 2,
            ..exclusive_turn_work(0, DeliveryPolicy::EarliestSafeBoundary)
        };
        let owner = LeaseOwnerIdentity::opaque("owner", "owner:incarnation");
        let derive = || QueuedWorkClaimLease::derive(&head, "session", &owner, 1_000, 250);

        let first = derive();
        assert_eq!(first.fencing_token, 3);
        assert_eq!(first.claim_id, "qwc:7:3");
        assert_eq!(first.claimed_at_epoch_ms, 1_000);
        assert_eq!(first.expires_at_epoch_ms, 1_250);

        // Same inputs must yield the same token.
        let second = derive();
        assert_eq!(first.lease_token, second.lease_token);
    }

    #[test]
    fn batch_id_includes_optional_nonce() {
        let without_nonce = derive_batch_id("session", Some("key"), 1_000, None);
        let with_nonce = derive_batch_id("session", Some("key"), 1_000, Some(1));
        assert_ne!(without_nonce, with_nonce);
        assert!(without_nonce.starts_with("qwb:"));
    }
}