1use super::*;
25
26#[async_trait::async_trait]
27impl RuntimePersistence for Store {
28 fn durability_tier(&self) -> DurabilityTier {
29 DurabilityTier::Durable
30 }
31
32 async fn load_session(
33 &self,
34 scope: SessionReadScope,
35 ) -> Result<Option<PersistedSessionRead>, StoreError> {
36 self.conn
37 .call(move |conn| {
38 let outcome: Result<Option<PersistedSessionRead>, StoreError> = (|| {
39 let Some(meta) = try_load_session_head_meta_from_conn(conn)? else {
40 return Ok(None);
41 };
42 let leaf_node_id = match &scope {
43 SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
44 SessionReadScope::ActivePath { leaf_node_id } => {
45 leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
46 }
47 };
48 let mut graph = match scope {
49 SessionReadScope::FullGraph => {
50 Self::load_session_graph_from_conn(conn, meta.leaf_node_id.clone())
51 }
52 SessionReadScope::ActivePath { .. } => {
53 Self::load_active_path_session_graph_from_conn(
54 conn,
55 leaf_node_id.clone(),
56 )
57 .map_err(sqlite_error)?
58 }
59 };
60 graph.set_leaf_node_id(leaf_node_id);
61 let checkpoint = meta
62 .checkpoint_ref
63 .as_ref()
64 .and_then(|blob_ref| Self::get_checkpoint_conn(conn, blob_ref));
65 Ok(Some(PersistedSessionRead {
66 session_id: meta.session_id,
67 head_revision: meta.head_revision,
68 config: meta.config,
69 agent_frames: meta.agent_frames,
70 current_agent_frame_id: meta.current_agent_frame_id,
71 graph,
72 checkpoint_ref: meta.checkpoint_ref,
73 checkpoint,
74 token_ledger: merge_token_ledger_entries(Self::load_usage_deltas_conn(
75 conn,
76 )),
77 }))
78 })(
79 );
80 Ok(outcome)
81 })
82 .await
83 .map_err(sqlite_error)?
84 }
85
86 async fn load_node(
87 &self,
88 node_id: &str,
89 ) -> Result<Option<lash_core::SessionNodeRecord>, StoreError> {
90 let node_id = node_id.to_string();
91 let row: Option<String> = self
92 .conn
93 .call(move |conn| {
94 conn.query_row(
95 "SELECT node_json FROM graph_nodes WHERE node_id = ?1 AND tombstoned = 0",
96 params![node_id],
97 |row| row.get(0),
98 )
99 .optional()
100 })
101 .await
102 .map_err(sqlite_error)?;
103 Ok(row.and_then(|json| serde_json::from_str(&json).ok()))
104 }
105
    /// Applies a [`RuntimeCommit`] inside a single `write_flow` transaction.
    ///
    /// The transaction commits only when every step succeeds; any error makes
    /// the flow return `TxOutcome::Rollback`. After the transaction has
    /// finished successfully, `maybe_auto_gc` is awaited.
    ///
    /// Steps, in order:
    /// 1. Reject the commit if the store already holds a different session.
    /// 2. If a turn commit is attached and one with the same turn id was already
    ///    recorded, either replay the stored result (same hash) or fail.
    /// 3. Require and verify the session execution lease.
    /// 4. Check the expected head revision, when one is supplied.
    /// 5. Validate every completed queue claim.
    /// 6. Store the checkpoint, usage deltas and graph delta.
    /// 7. Write the new session head with `head_revision + 1`.
    /// 8. Delete completed queued-work batches and stamp committed attachments.
    /// 9. Record the turn commit result and optionally release the lease.
    ///
    /// # Errors
    ///
    /// * [`StoreError::SessionBindingMismatch`] – store is bound to another session.
    /// * [`StoreError::RuntimeTurnCommitConflict`] – turn commit belongs to another
    ///   session, or the turn id was already committed with a different hash.
    /// * [`StoreError::SessionExecutionLeaseExpired`] – no lease was supplied
    ///   (lease validation itself may return its own error).
    /// * [`StoreError::HeadRevisionConflict`] – expected revision does not match.
    /// * [`StoreError::QueuedWorkClaimExpired`] – a completed claim names another session.
    /// * [`StoreError::Backend`] – a stored turn-commit result could not be decoded.
    async fn commit_runtime_state(
        &self,
        commit: RuntimeCommit,
    ) -> Result<RuntimeCommitResult, StoreError> {
        let blob_profile = self.options.blob_profile;
        let result = self
            .conn
            .write_flow(move |tx| {
                // The immediately-invoked closure lets `?` short-circuit into
                // `outcome` so the commit/rollback decision is made in one place.
                let outcome: Result<RuntimeCommitResult, StoreError> = (|| {
                    let existing = try_load_session_head_meta_from_conn(tx)?;
                    // A store that already has a head only accepts commits for
                    // that same session id.
                    if let Some(bound_session_id) =
                        existing.as_ref().map(|meta| meta.session_id.as_str())
                        && bound_session_id != commit.session_id
                    {
                        return Err(StoreError::SessionBindingMismatch {
                            bound_session_id: bound_session_id.to_string(),
                            attempted_session_id: commit.session_id.clone(),
                        });
                    }
                    if let Some(completed) = &commit.turn_commit {
                        if completed.session_id != commit.session_id {
                            return Err(StoreError::RuntimeTurnCommitConflict {
                                session_id: completed.session_id.clone(),
                                turn_id: completed.turn_id.clone(),
                            });
                        }
                        let prior: Option<(String, String)> = tx
                            .query_row(
                                "SELECT turn_commit_hash, result_json FROM runtime_turn_commits
                                 WHERE session_id = ?1 AND turn_id = ?2",
                                params![completed.session_id, completed.turn_id],
                                |row| Ok((row.get(0)?, row.get(1)?)),
                            )
                            .optional()
                            .map_err(sqlite_error)?;
                        if let Some((turn_commit_hash, result_json)) = prior {
                            // Same turn id and same hash: replay the stored
                            // result instead of applying the commit again. This
                            // path returns before the lease and revision checks
                            // below are reached.
                            if turn_commit_hash == completed.turn_commit_hash {
                                let result: RuntimeCommitResult =
                                    serde_json::from_str(&result_json).map_err(|err| {
                                        StoreError::Backend(format!(
                                            "failed to decode runtime turn commit result: {err}"
                                        ))
                                    })?;
                                if let Some(completion) =
                                    commit.release_session_execution_lease.as_ref()
                                {
                                    release_session_execution_lease_conn(tx, completion)?;
                                }
                                return Ok(result);
                            }
                            // Same turn id but a different hash is a conflict.
                            return Err(StoreError::RuntimeTurnCommitConflict {
                                session_id: completed.session_id.clone(),
                                turn_id: completed.turn_id.clone(),
                            });
                        }
                    }
                    // A fresh commit must carry a session execution lease.
                    let Some(session_execution_lease) = commit.session_execution_lease.as_ref()
                    else {
                        return Err(StoreError::SessionExecutionLeaseExpired {
                            session_id: commit.session_id.clone(),
                        });
                    };
                    ensure_session_execution_lease_conn(
                        tx,
                        &commit.session_id,
                        session_execution_lease,
                    )?;
                    // With no stored head the current revision counts as 0.
                    let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
                    if commit.expected_head_revision.is_some()
                        && commit.expected_head_revision != Some(actual_revision)
                    {
                        return Err(StoreError::HeadRevisionConflict {
                            expected: commit.expected_head_revision,
                            actual: actual_revision,
                        });
                    }
                    // Validate all completed claims up front; their batches are
                    // deleted only after the new head has been written.
                    for completed in &commit.completed_queue_claims {
                        if completed.session_id != commit.session_id {
                            return Err(StoreError::QueuedWorkClaimExpired {
                                session_id: completed.session_id.clone(),
                                claim_id: completed.claim_id.clone(),
                            });
                        }
                        ensure_queued_work_completion_conn(tx, completed)?;
                    }

                    let stored_checkpoint =
                        Self::put_checkpoint_conn(tx, &commit.checkpoint, blob_profile)
                            .map_err(sqlite_error)?;

                    if !commit.usage_deltas.is_empty() {
                        let mut stmt = tx
                            .prepare(
                                "INSERT INTO usage_deltas (
                                    source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
                                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
                            )
                            .map_err(sqlite_error)?;
                        for entry in &commit.usage_deltas {
                            stmt.execute(params![
                                entry.source,
                                entry.model,
                                entry.usage.input_tokens,
                                entry.usage.output_tokens,
                                entry.usage.cached_input_tokens,
                                entry.usage.reasoning_tokens,
                            ])
                            .map_err(sqlite_error)?;
                        }
                    }

                    // Apply the graph delta and determine the resulting leaf.
                    let leaf_node_id = match &commit.graph {
                        GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
                        GraphCommitDelta::Append {
                            nodes,
                            leaf_node_id,
                        } => {
                            for node in nodes {
                                let node_json = encode_json(node);
                                tx.execute(
                                    "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
                                    params![node.node_id, node_json],
                                )
                                .map_err(sqlite_error)?;
                            }
                            leaf_node_id.clone()
                        }
                        GraphCommitDelta::ReplaceFull(graph) => {
                            // Full replacement wipes every stored node first.
                            tx.execute("DELETE FROM graph_nodes", [])
                                .map_err(sqlite_error)?;
                            for node in &graph.nodes {
                                let node_json = encode_json(node);
                                tx.execute(
                                    "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
                                    params![node.node_id, node_json],
                                )
                                .map_err(sqlite_error)?;
                            }
                            graph.leaf_node_id.clone()
                        }
                    };
                    // Only non-tombstoned nodes are counted.
                    let graph_node_count: usize = tx
                        .query_row(
                            "SELECT COUNT(*) FROM graph_nodes WHERE tombstoned = 0",
                            [],
                            |row| row.get::<_, i64>(0),
                        )
                        .map_err(sqlite_error)? as usize;
                    let next_revision = actual_revision + 1;
                    let meta = SessionHeadMeta {
                        session_id: commit.session_id.clone(),
                        head_revision: next_revision,
                        config: commit.config.clone(),
                        agent_frames: commit.agent_frames.clone(),
                        current_agent_frame_id: commit.current_agent_frame_id.clone(),
                        checkpoint_ref: Some(stored_checkpoint.checkpoint_ref.clone()),
                        leaf_node_id,
                        graph_node_count,
                        // The head JSON is written with an empty ledger; usage
                        // is stored row-by-row in `usage_deltas` above.
                        token_ledger: Vec::new(),
                    };
                    // `singleton` is always 1, so this replaces the one head row.
                    tx.execute(
                        "INSERT OR REPLACE INTO session_head (singleton, session_id, head_json, head_revision)
                         VALUES (1, ?1, ?2, ?3)",
                        params![
                            meta.session_id,
                            encode_json(&meta),
                            meta.head_revision as i64
                        ],
                    )
                    .map_err(sqlite_error)?;
                    // Remove the batches of each completed claim; the delete is
                    // keyed on claim id and token as well as batch id.
                    for completed in &commit.completed_queue_claims {
                        for batch_id in &completed.batch_ids {
                            tx.execute(
                                "DELETE FROM queued_work_batches
                                 WHERE session_id = ?1
                                   AND batch_id = ?2
                                   AND claim_id = ?3
                                   AND claim_token = ?4",
                                params![
                                    completed.session_id,
                                    batch_id,
                                    completed.claim_id,
                                    completed.lease_token
                                ],
                            )
                            .map_err(sqlite_error)?;
                        }
                    }
                    if !commit.committed_attachment_ids.is_empty() {
                        let now = current_epoch_ms() as i64;
                        // COALESCE keeps an already-set commit timestamp.
                        let mut stmt = tx
                            .prepare(
                                "UPDATE attachment_manifest
                                 SET committed_at_ms = COALESCE(committed_at_ms, ?1)
                                 WHERE attachment_id = ?2 AND session_id = ?3",
                            )
                            .map_err(sqlite_error)?;
                        for id in &commit.committed_attachment_ids {
                            stmt.execute(params![now, id.as_str(), commit.session_id])
                                .map_err(sqlite_error)?;
                        }
                    }
                    let result = RuntimeCommitResult {
                        head_revision: next_revision,
                        checkpoint_ref: stored_checkpoint.checkpoint_ref,
                        manifest: stored_checkpoint.manifest,
                    };
                    // Store the result so a retry of the same turn can replay it.
                    if let Some(completed) = &commit.turn_commit {
                        tx.execute(
                            "INSERT INTO runtime_turn_commits (
                                session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
                             )
                             VALUES (?1, ?2, ?3, ?4, ?5)",
                            params![
                                completed.session_id,
                                completed.turn_id,
                                completed.turn_commit_hash,
                                encode_json(&result),
                                current_epoch_ms() as i64
                            ],
                        )
                        .map_err(sqlite_error)?;
                    }
                    if let Some(completion) = commit.release_session_execution_lease.as_ref() {
                        release_session_execution_lease_conn(tx, completion)?;
                    }
                    Ok(result)
                })();
                // Domain errors roll the transaction back but are still handed
                // to the caller through the inner `Result`.
                match outcome {
                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
                }
            })
            .await
            .map_err(sqlite_error)??;
        self.maybe_auto_gc().await;
        Ok(result)
    }
348
349 async fn try_claim_session_execution_lease(
350 &self,
351 session_id: &str,
352 owner: &LeaseOwnerIdentity,
353 lease_ttl_ms: u64,
354 ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError> {
355 let session_id = session_id.to_string();
356 let owner = owner.clone();
357 self.conn
358 .write_flow(move |tx| {
359 let outcome: Result<SessionExecutionLeaseClaimOutcome, StoreError> = (|| {
360 let now = current_epoch_ms();
361 let current = load_session_execution_lease_row_conn(tx, &session_id)?;
362 if current.as_ref().is_some_and(|lease| {
363 lease.lease_token.is_some() && lease.expires_at_ms > now
364 }) {
365 let current = current.expect("checked current lease is present");
366 if current
367 .owner
368 .as_ref()
369 .is_some_and(|current_owner| current_owner.same_incarnation(&owner))
370 {
371 let expires_at = now.saturating_add(lease_ttl_ms);
372 tx.execute(
373 "UPDATE session_execution_leases
374 SET lease_expires_at_ms = ?2
375 WHERE session_id = ?1",
376 params![session_id, expires_at as i64],
377 )
378 .map_err(sqlite_error)?;
379 return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
380 SessionExecutionLease {
381 session_id,
382 owner,
383 lease_token: current.lease_token.expect("live lease token set"),
384 fencing_token: current.fencing_token,
385 claimed_at_epoch_ms: current.claimed_at_ms,
386 expires_at_epoch_ms: expires_at,
387 },
388 ));
389 }
390 return Ok(SessionExecutionLeaseClaimOutcome::Busy {
391 holder: row_to_session_execution_lease(&session_id, current)?,
392 });
393 }
394 Ok(SessionExecutionLeaseClaimOutcome::Acquired(
395 acquire_session_execution_lease_conn(
396 tx,
397 &session_id,
398 &owner,
399 current.as_ref().map_or(0, |lease| lease.fencing_token),
400 now,
401 lease_ttl_ms,
402 )?,
403 ))
404 })(
405 );
406 match outcome {
407 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
408 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
409 }
410 })
411 .await
412 .map_err(sqlite_error)?
413 }
414
    /// Tries to take over the execution lease of `session_id` from a holder the
    /// caller has previously observed.
    ///
    /// Outcomes:
    /// * No lease row, a released lease (no token) or an expired lease: a new
    ///   lease is acquired directly.
    /// * A live lease that matches `observed_holder` exactly (session, owner
    ///   incarnation, lease token, fencing token) and whose owner
    ///   `is_definitely_dead_for_claimant(owner)`: the row is rewritten for the
    ///   new owner with the fencing token incremented.
    /// * Anything else: `Busy` with the current holder.
    ///
    /// # Errors
    ///
    /// Returns a [`StoreError`] when reading, encoding or updating the lease row
    /// fails; the transaction is rolled back in that case.
    async fn reclaim_session_execution_lease(
        &self,
        session_id: &str,
        owner: &LeaseOwnerIdentity,
        observed_holder: &SessionExecutionLeaseFence,
        lease_ttl_ms: u64,
    ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError> {
        let session_id = session_id.to_string();
        let owner = owner.clone();
        let observed_holder = observed_holder.clone();
        self.conn
            .write_flow(move |tx| {
                let outcome: Result<SessionExecutionLeaseClaimOutcome, StoreError> = (|| {
                    let now = current_epoch_ms();
                    let current = load_session_execution_lease_row_conn(tx, &session_id)?;
                    // No row at all: acquire with a previous fencing token of 0.
                    let Some(current) = current else {
                        return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
                            acquire_session_execution_lease_conn(
                                tx,
                                &session_id,
                                &owner,
                                0,
                                now,
                                lease_ttl_ms,
                            )?,
                        ));
                    };
                    // Released or expired lease: acquire, passing on the stored
                    // fencing token.
                    if current.lease_token.is_none() || current.expires_at_ms <= now {
                        return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
                            acquire_session_execution_lease_conn(
                                tx,
                                &session_id,
                                &owner,
                                current.fencing_token,
                                now,
                                lease_ttl_ms,
                            )?,
                        ));
                    }
                    let holder = row_to_session_execution_lease(&session_id, current)?;
                    // Take over only when the stored holder is exactly the one
                    // the caller observed AND that holder is judged dead.
                    if observed_holder.session_id == session_id
                        && holder.owner.same_incarnation(&observed_holder.owner)
                        && holder.lease_token == observed_holder.lease_token
                        && holder.fencing_token == observed_holder.fencing_token
                        && holder.owner.is_definitely_dead_for_claimant(&owner)
                    {
                        let fencing_token = holder.fencing_token.saturating_add(1);
                        let lease_token = format!(
                            "{}:{}:{}:{now}:{fencing_token}",
                            session_id, owner.owner_id, owner.incarnation_id
                        );
                        let expires_at = now.saturating_add(lease_ttl_ms);
                        let liveness_json = encode_liveness(&owner.liveness)?;
                        // The WHERE clause repeats the observed holder identity,
                        // so the update only applies to that exact lease.
                        let changed = tx
                            .execute(
                                "UPDATE session_execution_leases
                                 SET lease_owner_id = ?1,
                                     lease_owner_incarnation_id = ?2,
                                     lease_owner_liveness_json = ?3,
                                     lease_token = ?4,
                                     lease_fencing_token = ?5,
                                     lease_claimed_at_ms = ?6,
                                     lease_expires_at_ms = ?7
                                 WHERE session_id = ?8
                                   AND lease_owner_id = ?9
                                   AND lease_owner_incarnation_id = ?10
                                   AND lease_token = ?11
                                   AND lease_fencing_token = ?12",
                                params![
                                    owner.owner_id,
                                    owner.incarnation_id,
                                    liveness_json,
                                    lease_token,
                                    fencing_token as i64,
                                    now as i64,
                                    expires_at as i64,
                                    session_id,
                                    observed_holder.owner.owner_id,
                                    observed_holder.owner.incarnation_id,
                                    observed_holder.lease_token,
                                    observed_holder.fencing_token as i64,
                                ],
                            )
                            .map_err(sqlite_error)?;
                        if changed == 1 {
                            return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
                                SessionExecutionLease {
                                    session_id,
                                    owner,
                                    lease_token,
                                    fencing_token,
                                    claimed_at_epoch_ms: now,
                                    expires_at_epoch_ms: expires_at,
                                },
                            ));
                        }
                        // The guarded update did not hit exactly one row:
                        // re-read the lease and decide from its current state.
                        let current = load_session_execution_lease_row_conn(tx, &session_id)?;
                        if current.as_ref().is_some_and(|lease| {
                            lease.lease_token.is_some() && lease.expires_at_ms > now
                        }) {
                            let current = current.expect("checked current lease is present");
                            return Ok(SessionExecutionLeaseClaimOutcome::Busy {
                                holder: row_to_session_execution_lease(&session_id, current)?,
                            });
                        }
                        let previous_fencing_token =
                            current.as_ref().map_or(0, |lease| lease.fencing_token);
                        return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
                            acquire_session_execution_lease_conn(
                                tx,
                                &session_id,
                                &owner,
                                previous_fencing_token,
                                now,
                                lease_ttl_ms,
                            )?,
                        ));
                    }
                    // Live lease that does not qualify for takeover.
                    Ok(SessionExecutionLeaseClaimOutcome::Busy { holder })
                })();
                match outcome {
                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
                }
            })
            .await
            .map_err(sqlite_error)?
    }
544
545 async fn renew_session_execution_lease(
546 &self,
547 fence: &SessionExecutionLeaseFence,
548 lease_ttl_ms: u64,
549 ) -> Result<SessionExecutionLease, StoreError> {
550 let fence = fence.clone();
551 self.conn
552 .write_flow(move |tx| {
553 let outcome: Result<SessionExecutionLease, StoreError> = (|| {
554 let now = current_epoch_ms();
555 let current = load_session_execution_lease_row_conn(tx, &fence.session_id)?;
556 let Some(current) = current else {
557 return Err(StoreError::SessionExecutionLeaseExpired {
558 session_id: fence.session_id.clone(),
559 });
560 };
561 if !current
562 .owner
563 .as_ref()
564 .is_some_and(|owner| owner.same_incarnation(&fence.owner))
565 || current.lease_token.as_deref() != Some(fence.lease_token.as_str())
566 || current.fencing_token != fence.fencing_token
567 || current.expires_at_ms <= now
568 {
569 return Err(StoreError::SessionExecutionLeaseExpired {
570 session_id: fence.session_id.clone(),
571 });
572 }
573 let expires_at = now.saturating_add(lease_ttl_ms);
574 tx.execute(
575 "UPDATE session_execution_leases
576 SET lease_expires_at_ms = ?5
577 WHERE session_id = ?1
578 AND lease_owner_id = ?2
579 AND lease_owner_incarnation_id = ?3
580 AND lease_token = ?4
581 AND lease_fencing_token = ?6",
582 params![
583 fence.session_id,
584 fence.owner.owner_id,
585 fence.owner.incarnation_id,
586 fence.lease_token,
587 expires_at as i64,
588 fence.fencing_token as i64
589 ],
590 )
591 .map_err(sqlite_error)?;
592 Ok(SessionExecutionLease {
593 session_id: fence.session_id,
594 owner: fence.owner,
595 lease_token: fence.lease_token,
596 fencing_token: fence.fencing_token,
597 claimed_at_epoch_ms: current.claimed_at_ms,
598 expires_at_epoch_ms: expires_at,
599 })
600 })();
601 match outcome {
602 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
603 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
604 }
605 })
606 .await
607 .map_err(sqlite_error)?
608 }
609
610 async fn release_session_execution_lease(
611 &self,
612 completion: &SessionExecutionLeaseCompletion,
613 ) -> Result<(), StoreError> {
614 let completion = completion.clone();
615 self.conn
616 .write_flow(move |tx| {
617 let outcome = release_session_execution_lease_conn(tx, &completion);
618 match outcome {
619 Ok(()) => Ok(TxOutcome::Commit(Ok(()))),
620 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
621 }
622 })
623 .await
624 .map_err(sqlite_error)?
625 }
626
627 async fn enqueue_queued_work(
628 &self,
629 batch: QueuedWorkBatchDraft,
630 ) -> Result<QueuedWorkBatch, StoreError> {
631 let nonce = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed);
632 self.conn
633 .write_flow(move |tx| {
634 let outcome: Result<QueuedWorkBatch, StoreError> = (|| {
635 if let Some(source_key) = batch.source_key.as_deref() {
636 let existing_id: Option<String> = tx
637 .query_row(
638 "SELECT batch_id
639 FROM queued_work_batches
640 WHERE session_id = ?1 AND source_key = ?2",
641 params![batch.session_id, source_key],
642 |row| row.get(0),
643 )
644 .optional()
645 .map_err(sqlite_error)?;
646 if let Some(batch_id) = existing_id {
647 let existing = load_queued_batch_by_id_conn(tx, &batch_id)?
648 .ok_or_else(|| {
649 StoreError::Backend(
650 "queued work source row disappeared".to_string(),
651 )
652 })?;
653 return Ok(existing);
654 }
655 }
656 let now = current_epoch_ms();
657 let batch_id =
658 derive_batch_id(&batch.session_id, batch.source_key.as_deref(), now, Some(nonce));
659 tx.execute(
660 "INSERT INTO queued_work_batches (
661 batch_id, session_id, source_key, delivery_policy, slot_policy,
662 merge_key_json, available_at_ms, enqueued_at_ms
663 )
664 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
665 params![
666 batch_id,
667 batch.session_id,
668 batch.source_key.as_deref(),
669 batch.delivery_policy.as_str(),
670 batch.slot_policy.as_str(),
671 encode_json(&batch.merge_key),
672 batch.available_at_ms as i64,
673 now as i64,
674 ],
675 )
676 .map_err(sqlite_error)?;
677 for (index, payload) in batch.payloads.iter().enumerate() {
678 let item_id = format!("{batch_id}:item:{index}");
679 tx.execute(
680 "INSERT INTO queued_work_items (batch_id, item_index, item_id, payload_json)
681 VALUES (?1, ?2, ?3, ?4)",
682 params![batch_id, index as i64, item_id, encode_json(payload)],
683 )
684 .map_err(sqlite_error)?;
685 }
686 load_queued_batch_by_id_conn(tx, &batch_id)?.ok_or_else(|| {
687 StoreError::Backend("queued work insert disappeared".to_string())
688 })
689 })();
690 match outcome {
693 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
694 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
695 }
696 })
697 .await
698 .map_err(sqlite_error)?
699 }
700
    /// Claims the leading ready session command for `session_id`, if there is one.
    ///
    /// The caller's session execution lease is verified first. Batches that are
    /// currently available are scanned in `enqueue_seq` order, reduced to those
    /// that are claimable, and `select_leading_session_command` decides how many
    /// leading candidates to claim.
    ///
    /// Returns `Ok(None)` when nothing is selected (transaction committed) or
    /// when a selected row could not be claimed (transaction rolled back).
    ///
    /// # Errors
    ///
    /// Returns a [`StoreError`] when the lease check fails, when a row cannot be
    /// decoded into a candidate, or when a statement fails.
    async fn claim_leading_ready_session_command(
        &self,
        session_id: &str,
        session_execution_lease: &SessionExecutionLeaseFence,
        owner: &LeaseOwnerIdentity,
        lease_ttl_ms: u64,
    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
        let session_id = session_id.to_string();
        let session_execution_lease = session_execution_lease.clone();
        let owner = owner.clone();
        self.conn
            .write_flow(move |tx| {
                // The inner closure yields its own `TxOutcome` so the "lost the
                // race" path can roll back while still returning `Ok(None)`.
                let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
                    ensure_session_execution_lease_conn(
                        tx,
                        &session_id,
                        &session_execution_lease,
                    )?;
                    let now = current_epoch_ms();
                    // The statement lives in its own block so it is dropped
                    // before the transaction is used again.
                    let candidate_rows = {
                        let mut stmt = tx
                            .prepare(
                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
                                        claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
                                 FROM queued_work_batches
                                 WHERE session_id = ?1
                                   AND available_at_ms <= ?2
                                 ORDER BY enqueue_seq ASC
                                 LIMIT ?3",
                            )
                            .map_err(sqlite_error)?;
                        let rows = stmt
                            .query_map(
                                params![session_id, now as i64, claim_scan_limit(1)],
                                queued_batch_row_from_sql,
                            )
                            .map_err(sqlite_error)?;
                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
                    };
                    // Keep rows that are unclaimed, whose claim has expired, or
                    // whose claim holder is definitely dead for this claimant.
                    let candidate_rows = candidate_rows
                        .into_iter()
                        .filter(|row| {
                            row.claim_token.is_none()
                                || row.claim_expires_at_ms <= now
                                || row
                                    .claim_owner
                                    .as_ref()
                                    .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner))
                        })
                        .collect::<Vec<_>>();
                    let candidate_batches = candidate_rows
                        .iter()
                        .map(|row| queued_work_batch_from_conn(tx, row.clone()))
                        .collect::<Result<Vec<_>, StoreError>>()?;
                    // Pair each row with its materialised batch to build the
                    // candidate view handed to the selection function.
                    let candidates = candidate_rows
                        .iter()
                        .zip(candidate_batches.iter())
                        .map(|(row, batch)| {
                            Ok(ClaimCandidate {
                                enqueue_seq: row.enqueue_seq,
                                claim_fencing_token: row.claim_fencing_token,
                                work_class: batch.work_class().ok_or_else(|| {
                                    StoreError::Backend(format!(
                                        "queued-work batch `{}` has mixed or empty payload classes",
                                        batch.batch_id
                                    ))
                                })?,
                                delivery_policy: decode_delivery_policy(
                                    row.delivery_policy.clone(),
                                )?,
                                slot_policy: decode_slot_policy(row.slot_policy.clone())?,
                                merge_key: decode_merge_key(row.merge_key_json.clone())?,
                            })
                        })
                        .collect::<Result<Vec<_>, StoreError>>()?;
                    let selected_len = select_leading_session_command(&candidates);
                    if selected_len == 0 {
                        return Ok(TxOutcome::Commit(None));
                    }
                    // The selection is a prefix of the candidate list.
                    let mut selected = candidate_rows;
                    selected.truncate(selected_len);
                    let mut selected_batches = candidate_batches;
                    selected_batches.truncate(selected_len);
                    // One lease, derived from the first candidate, covers every
                    // selected batch.
                    let lease = QueuedWorkClaimLease::derive(
                        &candidates[0],
                        &session_id,
                        &owner,
                        now,
                        lease_ttl_ms,
                    );
                    let liveness_json = encode_liveness(&owner.liveness)?;
                    for row in &selected {
                        // The row is updated only if it is unclaimed, its claim
                        // has expired, or it still carries the claim read above.
                        let claimed = tx
                            .execute(
                                "UPDATE queued_work_batches
                                 SET claim_id = ?3,
                                     claim_owner_id = ?4,
                                     claim_owner_incarnation_id = ?5,
                                     claim_owner_liveness_json = ?6,
                                     claim_token = ?7,
                                     claim_fencing_token = claim_fencing_token + 1,
                                     claim_claimed_at_ms = ?8,
                                     claim_expires_at_ms = ?9
                                 WHERE session_id = ?1
                                   AND batch_id = ?2
                                   AND (
                                        claim_token IS NULL
                                        OR claim_expires_at_ms <= ?8
                                        OR (
                                            claim_token = ?10
                                            AND claim_owner_id = ?11
                                            AND claim_owner_incarnation_id = ?12
                                        )
                                   )",
                                params![
                                    session_id,
                                    row.batch_id,
                                    lease.claim_id,
                                    owner.owner_id.as_str(),
                                    owner.incarnation_id.as_str(),
                                    liveness_json.as_str(),
                                    lease.lease_token,
                                    now as i64,
                                    lease.expires_at_epoch_ms as i64,
                                    row.claim_token,
                                    row.claim_owner.as_ref().map(|owner| owner.owner_id.as_str()),
                                    row.claim_owner
                                        .as_ref()
                                        .map(|owner| owner.incarnation_id.as_str())
                                ],
                            )
                            .map_err(sqlite_error)?;
                        // A row that could not be claimed undoes the whole
                        // claim: roll back and report that nothing was claimed.
                        if claimed == 0 {
                            return Ok(TxOutcome::Rollback(None));
                        }
                    }
                    Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
                        session_id: session_id.clone(),
                        claim_id: lease.claim_id,
                        owner: owner.clone(),
                        lease_token: lease.lease_token,
                        fencing_token: lease.fencing_token,
                        claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
                        expires_at_epoch_ms: lease.expires_at_epoch_ms,
                        batches: selected_batches,
                    })))
                })();
                match outcome {
                    Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
                    Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
                }
            })
            .await
            .map_err(sqlite_error)?
    }
859
    /// Claims a prefix of ready queued-work batches for `session_id`.
    ///
    /// Returns `Ok(None)` immediately when `max_batches` is 0. Otherwise the
    /// caller's session execution lease is verified, available batches are
    /// scanned in `enqueue_seq` order, reduced to those that are claimable, and
    /// `select_turn_work_claim_prefix` decides how many leading candidates to
    /// claim for the given `boundary` and `max_batches`.
    ///
    /// Returns `Ok(None)` when nothing is selected (transaction committed) or
    /// when a selected row could not be claimed (transaction rolled back).
    ///
    /// # Errors
    ///
    /// Returns a [`StoreError`] when the lease check fails, when a row cannot be
    /// decoded into a candidate, or when a statement fails.
    async fn claim_ready_queued_work(
        &self,
        session_id: &str,
        session_execution_lease: &SessionExecutionLeaseFence,
        owner: &LeaseOwnerIdentity,
        boundary: QueuedWorkClaimBoundary,
        lease_ttl_ms: u64,
        max_batches: usize,
    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
        // Nothing can be claimed with a zero budget; skip the transaction.
        if max_batches == 0 {
            return Ok(None);
        }
        let session_id = session_id.to_string();
        let session_execution_lease = session_execution_lease.clone();
        let owner = owner.clone();
        self.conn
            .write_flow(move |tx| {
                // The inner closure yields its own `TxOutcome` so the "lost the
                // race" path can roll back while still returning `Ok(None)`.
                let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
                    ensure_session_execution_lease_conn(
                        tx,
                        &session_id,
                        &session_execution_lease,
                    )?;
                    let now = current_epoch_ms();
                    // The statement lives in its own block so it is dropped
                    // before the transaction is used again.
                    let candidate_rows = {
                        let mut stmt = tx
                            .prepare(
                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
                                        claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
                                 FROM queued_work_batches
                                 WHERE session_id = ?1
                                   AND available_at_ms <= ?2
                                 ORDER BY enqueue_seq ASC
                                 LIMIT ?3",
                            )
                            .map_err(sqlite_error)?;
                        let rows = stmt
                            .query_map(
                                params![session_id, now as i64, claim_scan_limit(max_batches)],
                                queued_batch_row_from_sql,
                            )
                            .map_err(sqlite_error)?;
                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
                    };
                    // Keep rows that are unclaimed, whose claim has expired, or
                    // whose claim holder is definitely dead for this claimant.
                    let candidate_rows = candidate_rows
                        .into_iter()
                        .filter(|row| {
                            row.claim_token.is_none()
                                || row.claim_expires_at_ms <= now
                                || row
                                    .claim_owner
                                    .as_ref()
                                    .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner))
                        })
                        .collect::<Vec<_>>();
                    let candidate_batches = candidate_rows
                        .iter()
                        .map(|row| queued_work_batch_from_conn(tx, row.clone()))
                        .collect::<Result<Vec<_>, StoreError>>()?;
                    // Pair each row with its materialised batch to build the
                    // candidate view handed to the selection function.
                    let candidates = candidate_rows
                        .iter()
                        .zip(candidate_batches.iter())
                        .map(|(row, batch)| {
                            Ok(ClaimCandidate {
                                enqueue_seq: row.enqueue_seq,
                                claim_fencing_token: row.claim_fencing_token,
                                work_class: batch.work_class().ok_or_else(|| {
                                    StoreError::Backend(format!(
                                        "queued-work batch `{}` has mixed or empty payload classes",
                                        batch.batch_id
                                    ))
                                })?,
                                delivery_policy: decode_delivery_policy(
                                    row.delivery_policy.clone(),
                                )?,
                                slot_policy: decode_slot_policy(row.slot_policy.clone())?,
                                merge_key: decode_merge_key(row.merge_key_json.clone())?,
                            })
                        })
                        .collect::<Result<Vec<_>, StoreError>>()?;
                    let selected_len =
                        select_turn_work_claim_prefix(&candidates, boundary, max_batches);
                    if selected_len == 0 {
                        return Ok(TxOutcome::Commit(None));
                    }
                    // The selection is a prefix of the candidate list.
                    let mut selected = candidate_rows;
                    selected.truncate(selected_len);
                    let mut selected_batches = candidate_batches;
                    selected_batches.truncate(selected_len);
                    // One lease, derived from the first candidate, covers every
                    // selected batch.
                    let lease = QueuedWorkClaimLease::derive(
                        &candidates[0],
                        &session_id,
                        &owner,
                        now,
                        lease_ttl_ms,
                    );
                    let liveness_json = encode_liveness(&owner.liveness)?;
                    for row in &selected {
                        // The row is updated only if it is unclaimed, its claim
                        // has expired, or it still carries the claim read above.
                        let claimed = tx
                            .execute(
                                "UPDATE queued_work_batches
                                 SET claim_id = ?3,
                                     claim_owner_id = ?4,
                                     claim_owner_incarnation_id = ?5,
                                     claim_owner_liveness_json = ?6,
                                     claim_token = ?7,
                                     claim_fencing_token = claim_fencing_token + 1,
                                     claim_claimed_at_ms = ?8,
                                     claim_expires_at_ms = ?9
                                 WHERE session_id = ?1
                                   AND batch_id = ?2
                                   AND (
                                        claim_token IS NULL
                                        OR claim_expires_at_ms <= ?8
                                        OR (
                                            claim_token = ?10
                                            AND claim_owner_id = ?11
                                            AND claim_owner_incarnation_id = ?12
                                        )
                                   )",
                                params![
                                    session_id,
                                    row.batch_id,
                                    lease.claim_id,
                                    owner.owner_id.as_str(),
                                    owner.incarnation_id.as_str(),
                                    liveness_json.as_str(),
                                    lease.lease_token,
                                    now as i64,
                                    lease.expires_at_epoch_ms as i64,
                                    row.claim_token,
                                    row.claim_owner.as_ref().map(|owner| owner.owner_id.as_str()),
                                    row.claim_owner
                                        .as_ref()
                                        .map(|owner| owner.incarnation_id.as_str())
                                ],
                            )
                            .map_err(sqlite_error)?;
                        // A row that could not be claimed undoes the whole
                        // claim: roll back and report that nothing was claimed.
                        if claimed == 0 {
                            return Ok(TxOutcome::Rollback(None));
                        }
                    }
                    Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
                        session_id: session_id.clone(),
                        claim_id: lease.claim_id,
                        owner: owner.clone(),
                        lease_token: lease.lease_token,
                        fencing_token: lease.fencing_token,
                        claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
                        expires_at_epoch_ms: lease.expires_at_epoch_ms,
                        batches: selected_batches,
                    })))
                })();
                match outcome {
                    Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
                    Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
                }
            })
            .await
            .map_err(sqlite_error)?
    }
1038
1039 async fn renew_queued_work_claim(
1040 &self,
1041 claim: &QueuedWorkClaim,
1042 lease_ttl_ms: u64,
1043 ) -> Result<QueuedWorkClaim, StoreError> {
1044 let now = current_epoch_ms();
1045 let expires_at = now.saturating_add(lease_ttl_ms);
1046 let session_id = claim.session_id.clone();
1047 let claim_id = claim.claim_id.clone();
1048 let lease_token = claim.lease_token.clone();
1049 let changed = self
1050 .conn
1051 .write(move |tx| {
1052 tx.execute(
1053 "UPDATE queued_work_batches
1054 SET claim_expires_at_ms = ?4
1055 WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1056 params![session_id, claim_id, lease_token, expires_at as i64],
1057 )
1058 })
1059 .await
1060 .map_err(sqlite_error)?;
1061 renewed_claim(claim, changed, expires_at)
1062 }
1063
1064 async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1065 let session_id = claim.session_id.clone();
1066 let claim_id = claim.claim_id.clone();
1067 let lease_token = claim.lease_token.clone();
1068 self.conn
1069 .write(move |tx| {
1070 tx.execute(
1071 "UPDATE queued_work_batches
1072 SET claim_id = NULL,
1073 claim_owner_id = NULL,
1074 claim_owner_incarnation_id = NULL,
1075 claim_owner_liveness_json = NULL,
1076 claim_token = NULL,
1077 claim_claimed_at_ms = 0,
1078 claim_expires_at_ms = 0
1079 WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1080 params![session_id, claim_id, lease_token],
1081 )
1082 })
1083 .await
1084 .map_err(sqlite_error)?;
1085 Ok(())
1086 }
1087
1088 async fn cancel_queued_work_batch(
1089 &self,
1090 session_id: &str,
1091 batch_id: &str,
1092 ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1093 let session_id = session_id.to_string();
1094 let batch_id = batch_id.to_string();
1095 self.conn
1096 .write_flow(move |tx| {
1097 let outcome: Result<Option<QueuedWorkBatch>, StoreError> = (|| {
1098 let now = current_epoch_ms() as i64;
1099 let row = tx
1100 .query_row(
1101 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1102 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1103 claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1104 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1105 FROM queued_work_batches
1106 WHERE session_id = ?1
1107 AND batch_id = ?2
1108 AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
1109 params![session_id, batch_id, now],
1110 queued_batch_row_from_sql,
1111 )
1112 .optional()
1113 .map_err(sqlite_error)?;
1114 let Some(row) = row else {
1115 return Ok(None);
1116 };
1117 let batch = queued_work_batch_from_conn(tx, row)?;
1118 tx.execute(
1119 "DELETE FROM queued_work_batches
1120 WHERE session_id = ?1
1121 AND batch_id = ?2
1122 AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
1123 params![session_id, batch_id, now],
1124 )
1125 .map_err(sqlite_error)?;
1126 Ok(Some(batch))
1127 })();
1128 match outcome {
1129 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1130 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1131 }
1132 })
1133 .await
1134 .map_err(sqlite_error)?
1135 }
1136
1137 async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1138 let session_id = session_id.to_string();
1139 self.conn
1140 .call(move |conn| {
1141 let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
1142 let rows = {
1143 let mut stmt = conn
1144 .prepare(
1145 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1146 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1147 claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1148 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1149 FROM queued_work_batches
1150 WHERE session_id = ?1
1151 ORDER BY enqueue_seq ASC",
1152 )
1153 .map_err(sqlite_error)?;
1154 let rows = stmt
1155 .query_map(params![session_id], queued_batch_row_from_sql)
1156 .map_err(sqlite_error)?;
1157 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1158 };
1159 rows.into_iter()
1160 .map(|row| queued_work_batch_from_conn(conn, row))
1161 .collect()
1162 })();
1163 Ok(outcome)
1164 })
1165 .await
1166 .map_err(sqlite_error)?
1167 }
1168
1169 async fn list_pending_queued_work(
1170 &self,
1171 session_id: &str,
1172 ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1173 let session_id = session_id.to_string();
1174 self.conn
1175 .call(move |conn| {
1176 let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
1177 let now = current_epoch_ms();
1178 let rows = {
1179 let mut stmt = conn
1180 .prepare(
1181 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1182 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1183 claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1184 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1185 FROM queued_work_batches
1186 WHERE session_id = ?1
1187 AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
1188 ORDER BY enqueue_seq ASC",
1189 )
1190 .map_err(sqlite_error)?;
1191 let rows = stmt
1192 .query_map(
1193 params![session_id, now as i64],
1194 queued_batch_row_from_sql,
1195 )
1196 .map_err(sqlite_error)?;
1197 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1198 };
1199 rows.into_iter()
1200 .map(|row| queued_work_batch_from_conn(conn, row))
1201 .collect()
1202 })();
1203 Ok(outcome)
1204 })
1205 .await
1206 .map_err(sqlite_error)?
1207 }
1208
1209 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
1210 Store::save_session_meta(self, meta).await;
1211 Ok(())
1212 }
1213
1214 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
1215 Ok(Store::load_session_meta(self).await)
1216 }
1217
1218 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1219 if ids.is_empty() {
1220 return Ok(());
1221 }
1222 let ids = ids.to_vec();
1223 self.conn
1224 .write(move |tx| {
1225 let mut stmt =
1226 tx.prepare("UPDATE graph_nodes SET tombstoned = 1 WHERE node_id = ?1")?;
1227 for id in &ids {
1228 stmt.execute(params![id])?;
1229 }
1230 Ok(())
1231 })
1232 .await
1233 .map_err(sqlite_error)
1234 }
1235
1236 async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1237 let removed = self
1238 .conn
1239 .write(move |tx| tx.execute("DELETE FROM graph_nodes WHERE tombstoned = 1", []))
1240 .await
1241 .map_err(sqlite_error)?;
1242 Ok(VacuumReport {
1243 removed_node_count: removed,
1244 })
1245 }
1246
1247 async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
1248 Ok(Store::gc_unreachable(self).await)
1249 }
1250}
1251
/// Lease columns of one `session_execution_leases` row, as read by
/// `load_session_execution_lease_row_conn`.
///
/// Owner and token are optional because the corresponding columns are
/// nullable; releasing a lease sets them to NULL.
struct SessionExecutionLeaseRow {
    /// Owner identity rebuilt from the owner id, incarnation id and liveness
    /// columns; `None` when `lease_owner_id` is NULL.
    owner: Option<LeaseOwnerIdentity>,
    /// Value of the `lease_token` column, if any.
    lease_token: Option<String>,
    /// Value of the `lease_fencing_token` column.
    fencing_token: u64,
    /// Value of the `lease_claimed_at_ms` column.
    claimed_at_ms: u64,
    /// Value of the `lease_expires_at_ms` column.
    expires_at_ms: u64,
}
1259
1260fn load_session_execution_lease_row_conn(
1261 conn: &Connection,
1262 session_id: &str,
1263) -> Result<Option<SessionExecutionLeaseRow>, StoreError> {
1264 let row = conn
1265 .query_row(
1266 "SELECT lease_owner_id, lease_token, lease_fencing_token,
1267 lease_claimed_at_ms, lease_expires_at_ms,
1268 lease_owner_incarnation_id, lease_owner_liveness_json
1269 FROM session_execution_leases
1270 WHERE session_id = ?1",
1271 params![session_id],
1272 |row| {
1273 let owner_id: Option<String> = row.get(0)?;
1274 let incarnation_id: Option<String> = row.get(5)?;
1275 let liveness_json: Option<String> = row.get(6)?;
1276 Ok(SessionExecutionLeaseRow {
1277 owner: lease_owner_from_columns(owner_id, incarnation_id, liveness_json),
1278 lease_token: row.get(1)?,
1279 fencing_token: row.get::<_, i64>(2)? as u64,
1280 claimed_at_ms: row.get::<_, i64>(3)? as u64,
1281 expires_at_ms: row.get::<_, i64>(4)? as u64,
1282 })
1283 },
1284 )
1285 .optional()
1286 .map_err(sqlite_error)?;
1287 Ok(row)
1288}
1289
1290fn lease_owner_from_columns(
1291 owner_id: Option<String>,
1292 incarnation_id: Option<String>,
1293 liveness_json: Option<String>,
1294) -> Option<LeaseOwnerIdentity> {
1295 owner_id.map(|owner_id| LeaseOwnerIdentity {
1296 incarnation_id: incarnation_id.unwrap_or_else(|| owner_id.clone()),
1297 owner_id,
1298 liveness: liveness_json
1299 .as_deref()
1300 .and_then(|json| serde_json::from_str(json).ok())
1301 .unwrap_or(LeaseOwnerLiveness::Opaque),
1302 })
1303}
1304
1305fn encode_liveness(liveness: &LeaseOwnerLiveness) -> Result<String, StoreError> {
1306 serde_json::to_string(liveness)
1307 .map_err(|err| StoreError::Backend(format!("failed to encode lease liveness: {err}")))
1308}
1309
1310fn row_to_session_execution_lease(
1311 session_id: &str,
1312 row: SessionExecutionLeaseRow,
1313) -> Result<SessionExecutionLease, StoreError> {
1314 Ok(SessionExecutionLease {
1315 session_id: session_id.to_string(),
1316 owner: row
1317 .owner
1318 .ok_or_else(|| StoreError::Backend("live session lease missing owner".to_string()))?,
1319 lease_token: row.lease_token.ok_or_else(|| {
1320 StoreError::Backend("live session lease missing lease token".to_string())
1321 })?,
1322 fencing_token: row.fencing_token,
1323 claimed_at_epoch_ms: row.claimed_at_ms,
1324 expires_at_epoch_ms: row.expires_at_ms,
1325 })
1326}
1327
1328fn acquire_session_execution_lease_conn(
1329 conn: &Connection,
1330 session_id: &str,
1331 owner: &LeaseOwnerIdentity,
1332 previous_fencing_token: u64,
1333 now: u64,
1334 lease_ttl_ms: u64,
1335) -> Result<SessionExecutionLease, StoreError> {
1336 let fencing_token = previous_fencing_token.saturating_add(1);
1337 let lease_token = format!(
1338 "{}:{}:{}:{now}:{fencing_token}",
1339 session_id, owner.owner_id, owner.incarnation_id
1340 );
1341 let expires_at = now.saturating_add(lease_ttl_ms);
1342 let liveness_json = encode_liveness(&owner.liveness)?;
1343 conn.execute(
1344 "INSERT INTO session_execution_leases (
1345 session_id, lease_owner_id, lease_owner_incarnation_id, lease_owner_liveness_json,
1346 lease_token, lease_fencing_token, lease_claimed_at_ms, lease_expires_at_ms
1347 )
1348 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1349 ON CONFLICT(session_id) DO UPDATE SET
1350 lease_owner_id = excluded.lease_owner_id,
1351 lease_owner_incarnation_id = excluded.lease_owner_incarnation_id,
1352 lease_owner_liveness_json = excluded.lease_owner_liveness_json,
1353 lease_token = excluded.lease_token,
1354 lease_fencing_token = excluded.lease_fencing_token,
1355 lease_claimed_at_ms = excluded.lease_claimed_at_ms,
1356 lease_expires_at_ms = excluded.lease_expires_at_ms",
1357 params![
1358 session_id,
1359 owner.owner_id,
1360 owner.incarnation_id,
1361 liveness_json,
1362 lease_token,
1363 fencing_token as i64,
1364 now as i64,
1365 expires_at as i64
1366 ],
1367 )
1368 .map_err(sqlite_error)?;
1369 Ok(SessionExecutionLease {
1370 session_id: session_id.to_string(),
1371 owner: owner.clone(),
1372 lease_token,
1373 fencing_token,
1374 claimed_at_epoch_ms: now,
1375 expires_at_epoch_ms: expires_at,
1376 })
1377}
1378
1379fn ensure_session_execution_lease_conn(
1380 conn: &Connection,
1381 session_id: &str,
1382 fence: &SessionExecutionLeaseFence,
1383) -> Result<(), StoreError> {
1384 if fence.session_id != session_id {
1385 return Err(StoreError::SessionExecutionLeaseExpired {
1386 session_id: session_id.to_string(),
1387 });
1388 }
1389 let now = current_epoch_ms();
1390 let current = load_session_execution_lease_row_conn(conn, session_id)?;
1391 let Some(current) = current else {
1392 return Err(StoreError::SessionExecutionLeaseExpired {
1393 session_id: session_id.to_string(),
1394 });
1395 };
1396 if current
1397 .owner
1398 .as_ref()
1399 .is_some_and(|owner| owner.same_incarnation(&fence.owner))
1400 && current.lease_token.as_deref() == Some(fence.lease_token.as_str())
1401 && current.fencing_token == fence.fencing_token
1402 && current.expires_at_ms > now
1403 {
1404 Ok(())
1405 } else {
1406 Err(StoreError::SessionExecutionLeaseExpired {
1407 session_id: session_id.to_string(),
1408 })
1409 }
1410}
1411
1412fn release_session_execution_lease_conn(
1413 conn: &Connection,
1414 completion: &SessionExecutionLeaseCompletion,
1415) -> Result<(), StoreError> {
1416 conn.execute(
1417 "UPDATE session_execution_leases
1418 SET lease_owner_id = NULL,
1419 lease_owner_incarnation_id = NULL,
1420 lease_owner_liveness_json = NULL,
1421 lease_token = NULL,
1422 lease_claimed_at_ms = 0,
1423 lease_expires_at_ms = 0
1424 WHERE session_id = ?1
1425 AND lease_owner_id = ?2
1426 AND lease_owner_incarnation_id = ?3
1427 AND lease_token = ?4
1428 AND lease_fencing_token = ?5",
1429 params![
1430 completion.session_id,
1431 completion.owner.owner_id,
1432 completion.owner.incarnation_id,
1433 completion.lease_token,
1434 completion.fencing_token as i64
1435 ],
1436 )
1437 .map_err(sqlite_error)?;
1438 Ok(())
1439}