1use super::*;
25
26#[async_trait::async_trait]
27impl RuntimePersistence for Store {
28 fn durability_tier(&self) -> DurabilityTier {
29 DurabilityTier::Durable
30 }
31
32 async fn load_session(
33 &self,
34 scope: SessionReadScope,
35 ) -> Result<Option<PersistedSessionRead>, StoreError> {
36 self.conn
37 .call(move |conn| {
38 let outcome: Result<Option<PersistedSessionRead>, StoreError> = (|| {
39 let Some(meta) = try_load_session_head_meta_from_conn(conn)? else {
40 return Ok(None);
41 };
42 let leaf_node_id = match &scope {
43 SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
44 SessionReadScope::ActivePath { leaf_node_id } => {
45 leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
46 }
47 };
48 let mut graph = match scope {
49 SessionReadScope::FullGraph => {
50 Self::load_session_graph_from_conn(conn, meta.leaf_node_id.clone())
51 }
52 SessionReadScope::ActivePath { .. } => {
53 Self::load_active_path_session_graph_from_conn(
54 conn,
55 leaf_node_id.clone(),
56 )
57 .map_err(sqlite_error)?
58 }
59 };
60 graph.set_leaf_node_id(leaf_node_id);
61 let checkpoint = meta
62 .checkpoint_ref
63 .as_ref()
64 .map(|blob_ref| Self::get_checkpoint_conn(conn, blob_ref))
65 .transpose()?
66 .flatten();
67 Ok(Some(PersistedSessionRead {
68 session_id: meta.session_id,
69 head_revision: meta.head_revision,
70 config: meta.config,
71 agent_frames: meta.agent_frames,
72 current_agent_frame_id: meta.current_agent_frame_id,
73 graph,
74 checkpoint_ref: meta.checkpoint_ref,
75 checkpoint,
76 token_ledger: merge_token_ledger_entries(Self::load_usage_deltas_conn(
77 conn,
78 )),
79 }))
80 })(
81 );
82 Ok(outcome)
83 })
84 .await
85 .map_err(sqlite_error)?
86 }
87
88 async fn load_node(
89 &self,
90 node_id: &str,
91 ) -> Result<Option<lash_core::SessionNodeRecord>, StoreError> {
92 let node_id = node_id.to_string();
93 let row: Option<String> = self
94 .conn
95 .call(move |conn| {
96 conn.query_row(
97 "SELECT node_json FROM graph_nodes WHERE node_id = ?1 AND tombstoned = 0",
98 params![node_id],
99 |row| row.get(0),
100 )
101 .optional()
102 })
103 .await
104 .map_err(sqlite_error)?;
105 Ok(row.and_then(|json| serde_json::from_str(&json).ok()))
106 }
107
108 async fn commit_runtime_state(
109 &self,
110 commit: RuntimeCommit,
111 ) -> Result<RuntimeCommitResult, StoreError> {
112 let blob_profile = self.options.blob_profile;
113 let result = self
114 .conn
115 .write_flow(move |tx| {
116 let outcome: Result<RuntimeCommitResult, StoreError> = (|| {
117 let existing = try_load_session_head_meta_from_conn(tx)?;
118 if let Some(bound_session_id) =
119 existing.as_ref().map(|meta| meta.session_id.as_str())
120 && bound_session_id != commit.session_id
121 {
122 return Err(StoreError::SessionBindingMismatch {
123 bound_session_id: bound_session_id.to_string(),
124 attempted_session_id: commit.session_id.clone(),
125 });
126 }
127 if let Some(completed) = &commit.turn_commit {
128 if completed.session_id != commit.session_id {
129 return Err(StoreError::RuntimeTurnCommitConflict {
130 session_id: completed.session_id.clone(),
131 turn_id: completed.turn_id.clone(),
132 });
133 }
134 let prior: Option<(String, String)> = tx
135 .query_row(
136 "SELECT turn_commit_hash, result_json FROM runtime_turn_commits
137 WHERE session_id = ?1 AND turn_id = ?2",
138 params![completed.session_id, completed.turn_id],
139 |row| Ok((row.get(0)?, row.get(1)?)),
140 )
141 .optional()
142 .map_err(sqlite_error)?;
143 if let Some((turn_commit_hash, result_json)) = prior {
144 if turn_commit_hash == completed.turn_commit_hash {
145 let result: RuntimeCommitResult =
146 serde_json::from_str(&result_json).map_err(|err| {
147 StoreError::Backend(format!(
148 "failed to decode runtime turn commit result: {err}"
149 ))
150 })?;
151 if let Some(completion) =
152 commit.release_session_execution_lease.as_ref()
153 {
154 release_session_execution_lease_conn(tx, completion)?;
155 }
156 return Ok(result);
157 }
158 return Err(StoreError::RuntimeTurnCommitConflict {
159 session_id: completed.session_id.clone(),
160 turn_id: completed.turn_id.clone(),
161 });
162 }
163 }
164 let Some(session_execution_lease) = commit.session_execution_lease.as_ref()
165 else {
166 return Err(StoreError::SessionExecutionLeaseExpired {
167 session_id: commit.session_id.clone(),
168 });
169 };
170 ensure_session_execution_lease_conn(
171 tx,
172 &commit.session_id,
173 session_execution_lease,
174 )?;
175 let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
176 if commit.expected_head_revision.is_some()
177 && commit.expected_head_revision != Some(actual_revision)
178 {
179 return Err(StoreError::HeadRevisionConflict {
180 expected: commit.expected_head_revision,
181 actual: actual_revision,
182 });
183 }
184 for completed in &commit.completed_queue_claims {
185 if completed.session_id != commit.session_id {
186 return Err(StoreError::QueuedWorkClaimExpired {
187 session_id: completed.session_id.clone(),
188 claim_id: completed.claim_id.clone(),
189 });
190 }
191 ensure_queued_work_completion_conn(tx, completed)?;
192 }
193 for completed in &commit.completed_turn_input_claims {
194 if completed.session_id != commit.session_id {
195 return Err(StoreError::TurnInputClaimExpired {
196 session_id: completed.session_id.clone(),
197 claim_id: completed.claim_id.clone(),
198 });
199 }
200 let owned_rows: usize = tx
201 .query_row(
202 "SELECT COUNT(*)
203 FROM pending_turn_inputs
204 WHERE session_id = ?1
205 AND claim_id = ?2
206 AND claim_token = ?3",
207 params![
208 completed.session_id,
209 completed.claim_id,
210 completed.lease_token
211 ],
212 |row| row.get::<_, i64>(0),
213 )
214 .map_err(sqlite_error)? as usize;
215 ensure_turn_input_completion_owns_all_inputs(completed, owned_rows)?;
216 }
217
218 let stored_checkpoint =
219 Self::put_checkpoint_conn(tx, &commit.checkpoint, blob_profile)
220 .map_err(sqlite_error)?;
221
222 if !commit.usage_deltas.is_empty() {
223 let mut stmt = tx
224 .prepare(
225 "INSERT INTO usage_deltas (
226 source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
227 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
228 )
229 .map_err(sqlite_error)?;
230 for entry in &commit.usage_deltas {
231 stmt.execute(params![
232 entry.source,
233 entry.model,
234 entry.usage.input_tokens,
235 entry.usage.output_tokens,
236 entry.usage.cached_input_tokens,
237 entry.usage.reasoning_tokens,
238 ])
239 .map_err(sqlite_error)?;
240 }
241 }
242
243 let leaf_node_id = match &commit.graph {
244 GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
245 GraphCommitDelta::Append {
246 nodes,
247 leaf_node_id,
248 } => {
249 for node in nodes {
250 let node_json = encode_json(node);
251 tx.execute(
252 "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
253 params![node.node_id, node_json],
254 )
255 .map_err(sqlite_error)?;
256 }
257 leaf_node_id.clone()
258 }
259 GraphCommitDelta::ReplaceFull(graph) => {
260 tx.execute("DELETE FROM graph_nodes", [])
261 .map_err(sqlite_error)?;
262 for node in &graph.nodes {
263 let node_json = encode_json(node);
264 tx.execute(
265 "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
266 params![node.node_id, node_json],
267 )
268 .map_err(sqlite_error)?;
269 }
270 graph.leaf_node_id.clone()
271 }
272 };
273 let graph_node_count: usize = tx
274 .query_row(
275 "SELECT COUNT(*) FROM graph_nodes WHERE tombstoned = 0",
276 [],
277 |row| row.get::<_, i64>(0),
278 )
279 .map_err(sqlite_error)? as usize;
280 let next_revision = actual_revision + 1;
281 let meta = SessionHeadMeta {
282 schema_version: lash_core::store::SESSION_HEAD_META_SCHEMA_VERSION,
283 session_id: commit.session_id.clone(),
284 head_revision: next_revision,
285 config: commit.config.clone(),
286 agent_frames: commit.agent_frames.clone(),
287 current_agent_frame_id: commit.current_agent_frame_id.clone(),
288 checkpoint_ref: Some(stored_checkpoint.checkpoint_ref.clone()),
289 leaf_node_id,
290 graph_node_count,
291 token_ledger: Vec::new(),
292 };
293 tx.execute(
294 "INSERT OR REPLACE INTO session_head (singleton, session_id, head_json, head_revision)
295 VALUES (1, ?1, ?2, ?3)",
296 params![
297 meta.session_id,
298 encode_json(&meta),
299 meta.head_revision as i64
300 ],
301 )
302 .map_err(sqlite_error)?;
303 for completed in &commit.completed_queue_claims {
304 for batch_id in &completed.batch_ids {
305 tx.execute(
306 "DELETE FROM queued_work_batches
307 WHERE session_id = ?1
308 AND batch_id = ?2
309 AND claim_id = ?3
310 AND claim_token = ?4",
311 params![
312 completed.session_id,
313 batch_id,
314 completed.claim_id,
315 completed.lease_token
316 ],
317 )
318 .map_err(sqlite_error)?;
319 }
320 }
321 for completed in &commit.completed_turn_input_claims {
322 for input_id in &completed.input_ids {
323 tx.execute(
324 "UPDATE pending_turn_inputs
325 SET state = ?5,
326 claim_id = NULL,
327 claim_owner_id = NULL,
328 claim_owner_incarnation_id = NULL,
329 claim_owner_liveness_json = NULL,
330 claim_token = NULL,
331 claim_claimed_at_ms = 0,
332 claim_expires_at_ms = 0
333 WHERE session_id = ?1
334 AND input_id = ?2
335 AND claim_id = ?3
336 AND claim_token = ?4",
337 params![
338 completed.session_id,
339 input_id,
340 completed.claim_id,
341 completed.lease_token,
342 lash_core::TurnInputState::Completed.as_str(),
343 ],
344 )
345 .map_err(sqlite_error)?;
346 }
347 }
348 if let Some(turn_id) = commit.interrupted_turn_input_turn_id.as_deref() {
349 let input_ids = {
350 let mut stmt = tx
351 .prepare(
352 "SELECT input_id, ingress_json
353 FROM pending_turn_inputs
354 WHERE session_id = ?1 AND state = ?2",
355 )
356 .map_err(sqlite_error)?;
357 let rows = stmt
358 .query_map(
359 params![
360 commit.session_id,
361 lash_core::TurnInputState::PendingActive.as_str()
362 ],
363 |row| {
364 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
365 },
366 )
367 .map_err(sqlite_error)?;
368 let mut input_ids = Vec::new();
369 for row in rows {
370 let (input_id, ingress_json) = row.map_err(sqlite_error)?;
371 let ingress = decode_turn_input_ingress(ingress_json)?;
372 if ingress
373 .active_turn_id()
374 .is_some_and(|active| active == turn_id)
375 {
376 input_ids.push(input_id);
377 }
378 }
379 input_ids
380 };
381 let next_turn_ingress = encode_json(&lash_core::TurnInputIngress::NextTurn);
382 let mut stmt = tx
383 .prepare(
384 "UPDATE pending_turn_inputs
385 SET state = ?3,
386 ingress_json = ?4,
387 claim_id = NULL,
388 claim_owner_id = NULL,
389 claim_owner_incarnation_id = NULL,
390 claim_owner_liveness_json = NULL,
391 claim_token = NULL,
392 claim_claimed_at_ms = 0,
393 claim_expires_at_ms = 0
394 WHERE session_id = ?1 AND input_id = ?2",
395 )
396 .map_err(sqlite_error)?;
397 for input_id in input_ids {
398 stmt.execute(params![
399 commit.session_id,
400 input_id,
401 lash_core::TurnInputState::DeferredNextTurn.as_str(),
402 next_turn_ingress
403 ])
404 .map_err(sqlite_error)?;
405 }
406 }
407 if !commit.committed_attachment_ids.is_empty() {
408 let now = current_epoch_ms() as i64;
409 let mut stmt = tx
410 .prepare(
411 "UPDATE attachment_manifest
412 SET committed_at_ms = COALESCE(committed_at_ms, ?1)
413 WHERE attachment_id = ?2 AND session_id = ?3",
414 )
415 .map_err(sqlite_error)?;
416 for id in &commit.committed_attachment_ids {
417 stmt.execute(params![now, id.as_str(), commit.session_id])
418 .map_err(sqlite_error)?;
419 }
420 }
421 let result = RuntimeCommitResult {
422 head_revision: next_revision,
423 checkpoint_ref: stored_checkpoint.checkpoint_ref,
424 manifest: stored_checkpoint.manifest,
425 };
426 if let Some(completed) = &commit.turn_commit {
427 tx.execute(
428 "INSERT INTO runtime_turn_commits (
429 session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
430 )
431 VALUES (?1, ?2, ?3, ?4, ?5)",
432 params![
433 completed.session_id,
434 completed.turn_id,
435 completed.turn_commit_hash,
436 encode_json(&result),
437 current_epoch_ms() as i64
438 ],
439 )
440 .map_err(sqlite_error)?;
441 }
442 if let Some(completion) = commit.release_session_execution_lease.as_ref() {
443 release_session_execution_lease_conn(tx, completion)?;
444 }
445 Ok(result)
446 })();
447 match outcome {
452 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
453 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
454 }
455 })
456 .await
457 .map_err(sqlite_error)??;
458 self.maybe_auto_gc().await;
459 Ok(result)
460 }
461
462 async fn try_claim_session_execution_lease(
463 &self,
464 session_id: &str,
465 owner: &LeaseOwnerIdentity,
466 lease_ttl_ms: u64,
467 ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError> {
468 let session_id = session_id.to_string();
469 let owner = owner.clone();
470 self.conn
471 .write_flow(move |tx| {
472 let outcome: Result<SessionExecutionLeaseClaimOutcome, StoreError> = (|| {
473 let now = current_epoch_ms();
474 let current = load_session_execution_lease_row_conn(tx, &session_id)?;
475 if current.as_ref().is_some_and(|lease| {
476 lease.lease_token.is_some() && lease.expires_at_ms > now
477 }) {
478 let current = current.expect("checked current lease is present");
479 if current
480 .owner
481 .as_ref()
482 .is_some_and(|current_owner| current_owner.same_incarnation(&owner))
483 {
484 let expires_at = now.saturating_add(lease_ttl_ms);
485 tx.execute(
486 "UPDATE session_execution_leases
487 SET lease_expires_at_ms = ?2
488 WHERE session_id = ?1",
489 params![session_id, expires_at as i64],
490 )
491 .map_err(sqlite_error)?;
492 return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
493 SessionExecutionLease {
494 session_id,
495 owner,
496 lease_token: current.lease_token.expect("live lease token set"),
497 fencing_token: current.fencing_token,
498 claimed_at_epoch_ms: current.claimed_at_ms,
499 expires_at_epoch_ms: expires_at,
500 },
501 ));
502 }
503 return Ok(SessionExecutionLeaseClaimOutcome::Busy {
504 holder: row_to_session_execution_lease(&session_id, current)?,
505 });
506 }
507 Ok(SessionExecutionLeaseClaimOutcome::Acquired(
508 acquire_session_execution_lease_conn(
509 tx,
510 &session_id,
511 &owner,
512 current.as_ref().map_or(0, |lease| lease.fencing_token),
513 now,
514 lease_ttl_ms,
515 )?,
516 ))
517 })(
518 );
519 match outcome {
520 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
521 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
522 }
523 })
524 .await
525 .map_err(sqlite_error)?
526 }
527
528 async fn reclaim_session_execution_lease(
529 &self,
530 session_id: &str,
531 owner: &LeaseOwnerIdentity,
532 observed_holder: &SessionExecutionLeaseFence,
533 lease_ttl_ms: u64,
534 ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError> {
535 let session_id = session_id.to_string();
536 let owner = owner.clone();
537 let observed_holder = observed_holder.clone();
538 self.conn
539 .write_flow(move |tx| {
540 let outcome: Result<SessionExecutionLeaseClaimOutcome, StoreError> = (|| {
541 let now = current_epoch_ms();
542 let current = load_session_execution_lease_row_conn(tx, &session_id)?;
543 let Some(current) = current else {
544 return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
545 acquire_session_execution_lease_conn(
546 tx,
547 &session_id,
548 &owner,
549 0,
550 now,
551 lease_ttl_ms,
552 )?,
553 ));
554 };
555 if current.lease_token.is_none() || current.expires_at_ms <= now {
556 return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
557 acquire_session_execution_lease_conn(
558 tx,
559 &session_id,
560 &owner,
561 current.fencing_token,
562 now,
563 lease_ttl_ms,
564 )?,
565 ));
566 }
567 let holder = row_to_session_execution_lease(&session_id, current)?;
568 if observed_holder.session_id == session_id
569 && holder.owner.same_incarnation(&observed_holder.owner)
570 && holder.lease_token == observed_holder.lease_token
571 && holder.fencing_token == observed_holder.fencing_token
572 && holder.owner.is_definitely_dead_for_claimant(&owner)
573 {
574 let fencing_token = holder.fencing_token.saturating_add(1);
575 let lease_token = format!(
576 "{}:{}:{}:{now}:{fencing_token}",
577 session_id, owner.owner_id, owner.incarnation_id
578 );
579 let expires_at = now.saturating_add(lease_ttl_ms);
580 let liveness_json = encode_liveness(&owner.liveness)?;
581 let changed = tx
582 .execute(
583 "UPDATE session_execution_leases
584 SET lease_owner_id = ?1,
585 lease_owner_incarnation_id = ?2,
586 lease_owner_liveness_json = ?3,
587 lease_token = ?4,
588 lease_fencing_token = ?5,
589 lease_claimed_at_ms = ?6,
590 lease_expires_at_ms = ?7
591 WHERE session_id = ?8
592 AND lease_owner_id = ?9
593 AND lease_owner_incarnation_id = ?10
594 AND lease_token = ?11
595 AND lease_fencing_token = ?12",
596 params![
597 owner.owner_id,
598 owner.incarnation_id,
599 liveness_json,
600 lease_token,
601 fencing_token as i64,
602 now as i64,
603 expires_at as i64,
604 session_id,
605 observed_holder.owner.owner_id,
606 observed_holder.owner.incarnation_id,
607 observed_holder.lease_token,
608 observed_holder.fencing_token as i64,
609 ],
610 )
611 .map_err(sqlite_error)?;
612 if changed == 1 {
613 return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
614 SessionExecutionLease {
615 session_id,
616 owner,
617 lease_token,
618 fencing_token,
619 claimed_at_epoch_ms: now,
620 expires_at_epoch_ms: expires_at,
621 },
622 ));
623 }
624 let current = load_session_execution_lease_row_conn(tx, &session_id)?;
625 if current.as_ref().is_some_and(|lease| {
626 lease.lease_token.is_some() && lease.expires_at_ms > now
627 }) {
628 let current = current.expect("checked current lease is present");
629 return Ok(SessionExecutionLeaseClaimOutcome::Busy {
630 holder: row_to_session_execution_lease(&session_id, current)?,
631 });
632 }
633 let previous_fencing_token =
634 current.as_ref().map_or(0, |lease| lease.fencing_token);
635 return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
636 acquire_session_execution_lease_conn(
637 tx,
638 &session_id,
639 &owner,
640 previous_fencing_token,
641 now,
642 lease_ttl_ms,
643 )?,
644 ));
645 }
646 Ok(SessionExecutionLeaseClaimOutcome::Busy { holder })
647 })(
648 );
649 match outcome {
650 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
651 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
652 }
653 })
654 .await
655 .map_err(sqlite_error)?
656 }
657
658 async fn renew_session_execution_lease(
659 &self,
660 fence: &SessionExecutionLeaseFence,
661 lease_ttl_ms: u64,
662 ) -> Result<SessionExecutionLease, StoreError> {
663 let fence = fence.clone();
664 self.conn
665 .write_flow(move |tx| {
666 let outcome: Result<SessionExecutionLease, StoreError> = (|| {
667 let now = current_epoch_ms();
668 let current = load_session_execution_lease_row_conn(tx, &fence.session_id)?;
669 let Some(current) = current else {
670 return Err(StoreError::SessionExecutionLeaseExpired {
671 session_id: fence.session_id.clone(),
672 });
673 };
674 if !current
675 .owner
676 .as_ref()
677 .is_some_and(|owner| owner.same_incarnation(&fence.owner))
678 || current.lease_token.as_deref() != Some(fence.lease_token.as_str())
679 || current.fencing_token != fence.fencing_token
680 || current.expires_at_ms <= now
681 {
682 return Err(StoreError::SessionExecutionLeaseExpired {
683 session_id: fence.session_id.clone(),
684 });
685 }
686 let expires_at = now.saturating_add(lease_ttl_ms);
687 tx.execute(
688 "UPDATE session_execution_leases
689 SET lease_expires_at_ms = ?5
690 WHERE session_id = ?1
691 AND lease_owner_id = ?2
692 AND lease_owner_incarnation_id = ?3
693 AND lease_token = ?4
694 AND lease_fencing_token = ?6",
695 params![
696 fence.session_id,
697 fence.owner.owner_id,
698 fence.owner.incarnation_id,
699 fence.lease_token,
700 expires_at as i64,
701 fence.fencing_token as i64
702 ],
703 )
704 .map_err(sqlite_error)?;
705 Ok(SessionExecutionLease {
706 session_id: fence.session_id,
707 owner: fence.owner,
708 lease_token: fence.lease_token,
709 fencing_token: fence.fencing_token,
710 claimed_at_epoch_ms: current.claimed_at_ms,
711 expires_at_epoch_ms: expires_at,
712 })
713 })();
714 match outcome {
715 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
716 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
717 }
718 })
719 .await
720 .map_err(sqlite_error)?
721 }
722
723 async fn release_session_execution_lease(
724 &self,
725 completion: &SessionExecutionLeaseCompletion,
726 ) -> Result<(), StoreError> {
727 let completion = completion.clone();
728 self.conn
729 .write_flow(move |tx| {
730 let outcome = release_session_execution_lease_conn(tx, &completion);
731 match outcome {
732 Ok(()) => Ok(TxOutcome::Commit(Ok(()))),
733 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
734 }
735 })
736 .await
737 .map_err(sqlite_error)?
738 }
739
740 async fn enqueue_queued_work(
741 &self,
742 batch: QueuedWorkBatchDraft,
743 ) -> Result<QueuedWorkBatch, StoreError> {
744 let nonce = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed);
745 self.conn
746 .write_flow(move |tx| {
747 let outcome: Result<QueuedWorkBatch, StoreError> = (|| {
748 if let Some(source_key) = batch.source_key.as_deref() {
749 let existing_id: Option<String> = tx
750 .query_row(
751 "SELECT batch_id
752 FROM queued_work_batches
753 WHERE session_id = ?1 AND source_key = ?2",
754 params![batch.session_id, source_key],
755 |row| row.get(0),
756 )
757 .optional()
758 .map_err(sqlite_error)?;
759 if let Some(batch_id) = existing_id {
760 let existing = load_queued_batch_by_id_conn(tx, &batch_id)?
761 .ok_or_else(|| {
762 StoreError::Backend(
763 "queued work source row disappeared".to_string(),
764 )
765 })?;
766 return Ok(existing);
767 }
768 }
769 let now = current_epoch_ms();
770 let batch_id =
771 derive_batch_id(&batch.session_id, batch.source_key.as_deref(), now, Some(nonce));
772 tx.execute(
773 "INSERT INTO queued_work_batches (
774 batch_id, session_id, source_key, delivery_policy, slot_policy,
775 merge_key_json, available_at_ms, enqueued_at_ms
776 )
777 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
778 params![
779 batch_id,
780 batch.session_id,
781 batch.source_key.as_deref(),
782 batch.delivery_policy.as_str(),
783 batch.slot_policy.as_str(),
784 encode_json(&batch.merge_key),
785 batch.available_at_ms as i64,
786 now as i64,
787 ],
788 )
789 .map_err(sqlite_error)?;
790 for (index, payload) in batch.payloads.iter().enumerate() {
791 let item_id = format!("{batch_id}:item:{index}");
792 tx.execute(
793 "INSERT INTO queued_work_items (batch_id, item_index, item_id, payload_json)
794 VALUES (?1, ?2, ?3, ?4)",
795 params![batch_id, index as i64, item_id, encode_json(payload)],
796 )
797 .map_err(sqlite_error)?;
798 }
799 load_queued_batch_by_id_conn(tx, &batch_id)?.ok_or_else(|| {
800 StoreError::Backend("queued work insert disappeared".to_string())
801 })
802 })();
803 match outcome {
806 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
807 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
808 }
809 })
810 .await
811 .map_err(sqlite_error)?
812 }
813
814 async fn claim_leading_ready_session_command(
815 &self,
816 session_id: &str,
817 session_execution_lease: &SessionExecutionLeaseFence,
818 owner: &LeaseOwnerIdentity,
819 lease_ttl_ms: u64,
820 ) -> Result<Option<QueuedWorkClaim>, StoreError> {
821 let session_id = session_id.to_string();
822 let session_execution_lease = session_execution_lease.clone();
823 let owner = owner.clone();
824 self.conn
825 .write_flow(move |tx| {
826 let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
827 ensure_session_execution_lease_conn(
828 tx,
829 &session_id,
830 &session_execution_lease,
831 )?;
832 let now = current_epoch_ms();
833 let candidate_rows = {
834 let mut stmt = tx
835 .prepare(
836 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
837 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
838 claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
839 claim_owner_liveness_json, claim_token, claim_expires_at_ms
840 FROM queued_work_batches
841 WHERE session_id = ?1
842 AND available_at_ms <= ?2
843 ORDER BY enqueue_seq ASC
844 LIMIT ?3",
845 )
846 .map_err(sqlite_error)?;
847 let rows = stmt
848 .query_map(
849 params![session_id, now as i64, claim_scan_limit(1)],
850 queued_batch_row_from_sql,
851 )
852 .map_err(sqlite_error)?;
853 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
854 };
855 let candidate_rows = candidate_rows
856 .into_iter()
857 .filter(|row| {
858 row.claim_token.is_none()
859 || row.claim_expires_at_ms <= now
860 || row
861 .claim_owner
862 .as_ref()
863 .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner))
864 })
865 .collect::<Vec<_>>();
866 let candidate_batches = candidate_rows
867 .iter()
868 .map(|row| queued_work_batch_from_conn(tx, row.clone()))
869 .collect::<Result<Vec<_>, StoreError>>()?;
870 let candidates = candidate_rows
871 .iter()
872 .zip(candidate_batches.iter())
873 .map(|(row, batch)| {
874 Ok(ClaimCandidate {
875 enqueue_seq: row.enqueue_seq,
876 claim_fencing_token: row.claim_fencing_token,
877 work_class: batch.work_class().ok_or_else(|| {
878 StoreError::Backend(format!(
879 "queued-work batch `{}` has mixed or empty payload classes",
880 batch.batch_id
881 ))
882 })?,
883 delivery_policy: decode_delivery_policy(
884 row.delivery_policy.clone(),
885 )?,
886 slot_policy: decode_slot_policy(row.slot_policy.clone())?,
887 merge_key: decode_merge_key(row.merge_key_json.clone())?,
888 })
889 })
890 .collect::<Result<Vec<_>, StoreError>>()?;
891 let selected_len = select_leading_session_command(&candidates);
892 if selected_len == 0 {
893 return Ok(TxOutcome::Commit(None));
894 }
895 let mut selected = candidate_rows;
896 selected.truncate(selected_len);
897 let mut selected_batches = candidate_batches;
898 selected_batches.truncate(selected_len);
899 let lease = QueuedWorkClaimLease::derive(
900 &candidates[0],
901 &session_id,
902 &owner,
903 now,
904 lease_ttl_ms,
905 );
906 let liveness_json = encode_liveness(&owner.liveness)?;
907 for row in &selected {
908 let claimed = tx
909 .execute(
910 "UPDATE queued_work_batches
911 SET claim_id = ?3,
912 claim_owner_id = ?4,
913 claim_owner_incarnation_id = ?5,
914 claim_owner_liveness_json = ?6,
915 claim_token = ?7,
916 claim_fencing_token = claim_fencing_token + 1,
917 claim_claimed_at_ms = ?8,
918 claim_expires_at_ms = ?9
919 WHERE session_id = ?1
920 AND batch_id = ?2
921 AND (
922 claim_token IS NULL
923 OR claim_expires_at_ms <= ?8
924 OR (
925 claim_token = ?10
926 AND claim_owner_id = ?11
927 AND claim_owner_incarnation_id = ?12
928 )
929 )",
930 params![
931 session_id,
932 row.batch_id,
933 lease.claim_id,
934 owner.owner_id.as_str(),
935 owner.incarnation_id.as_str(),
936 liveness_json.as_str(),
937 lease.lease_token,
938 now as i64,
939 lease.expires_at_epoch_ms as i64,
940 row.claim_token,
941 row.claim_owner.as_ref().map(|owner| owner.owner_id.as_str()),
942 row.claim_owner
943 .as_ref()
944 .map(|owner| owner.incarnation_id.as_str())
945 ],
946 )
947 .map_err(sqlite_error)?;
948 if claimed == 0 {
949 return Ok(TxOutcome::Rollback(None));
950 }
951 }
952 Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
953 session_id: session_id.clone(),
954 claim_id: lease.claim_id,
955 owner: owner.clone(),
956 lease_token: lease.lease_token,
957 fencing_token: lease.fencing_token,
958 claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
959 expires_at_epoch_ms: lease.expires_at_epoch_ms,
960 batches: selected_batches,
961 })))
962 })();
963 match outcome {
964 Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
965 Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
966 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
967 }
968 })
969 .await
970 .map_err(sqlite_error)?
971 }
972
973 async fn claim_ready_queued_work(
974 &self,
975 session_id: &str,
976 session_execution_lease: &SessionExecutionLeaseFence,
977 owner: &LeaseOwnerIdentity,
978 boundary: QueuedWorkClaimBoundary,
979 lease_ttl_ms: u64,
980 max_batches: usize,
981 ) -> Result<Option<QueuedWorkClaim>, StoreError> {
982 if max_batches == 0 {
983 return Ok(None);
984 }
985 let session_id = session_id.to_string();
986 let session_execution_lease = session_execution_lease.clone();
987 let owner = owner.clone();
988 self.conn
989 .write_flow(move |tx| {
990 let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
991 ensure_session_execution_lease_conn(
992 tx,
993 &session_id,
994 &session_execution_lease,
995 )?;
996 let now = current_epoch_ms();
997 let candidate_rows = {
998 let mut stmt = tx
999 .prepare(
1000 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1001 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1002 claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1003 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1004 FROM queued_work_batches
1005 WHERE session_id = ?1
1006 AND available_at_ms <= ?2
1007 ORDER BY enqueue_seq ASC
1008 LIMIT ?3",
1009 )
1010 .map_err(sqlite_error)?;
1011 let rows = stmt
1012 .query_map(
1013 params![session_id, now as i64, claim_scan_limit(max_batches)],
1014 queued_batch_row_from_sql,
1015 )
1016 .map_err(sqlite_error)?;
1017 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1018 };
1019 let candidate_rows = candidate_rows
1020 .into_iter()
1021 .filter(|row| {
1022 row.claim_token.is_none()
1023 || row.claim_expires_at_ms <= now
1024 || row
1025 .claim_owner
1026 .as_ref()
1027 .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner))
1028 })
1029 .collect::<Vec<_>>();
1030 let candidate_batches = candidate_rows
1031 .iter()
1032 .map(|row| queued_work_batch_from_conn(tx, row.clone()))
1033 .collect::<Result<Vec<_>, StoreError>>()?;
1034 let candidates = candidate_rows
1035 .iter()
1036 .zip(candidate_batches.iter())
1037 .map(|(row, batch)| {
1038 Ok(ClaimCandidate {
1039 enqueue_seq: row.enqueue_seq,
1040 claim_fencing_token: row.claim_fencing_token,
1041 work_class: batch.work_class().ok_or_else(|| {
1042 StoreError::Backend(format!(
1043 "queued-work batch `{}` has mixed or empty payload classes",
1044 batch.batch_id
1045 ))
1046 })?,
1047 delivery_policy: decode_delivery_policy(
1048 row.delivery_policy.clone(),
1049 )?,
1050 slot_policy: decode_slot_policy(row.slot_policy.clone())?,
1051 merge_key: decode_merge_key(row.merge_key_json.clone())?,
1052 })
1053 })
1054 .collect::<Result<Vec<_>, StoreError>>()?;
1055 let selected_len =
1056 select_turn_work_claim_prefix(&candidates, boundary, max_batches);
1057 if selected_len == 0 {
1058 return Ok(TxOutcome::Commit(None));
1059 }
1060 let mut selected = candidate_rows;
1061 selected.truncate(selected_len);
1062 let mut selected_batches = candidate_batches;
1063 selected_batches.truncate(selected_len);
1064 let lease = QueuedWorkClaimLease::derive(
1065 &candidates[0],
1066 &session_id,
1067 &owner,
1068 now,
1069 lease_ttl_ms,
1070 );
1071 let liveness_json = encode_liveness(&owner.liveness)?;
1072 for row in &selected {
1073 let claimed = tx
1082 .execute(
1083 "UPDATE queued_work_batches
1084 SET claim_id = ?3,
1085 claim_owner_id = ?4,
1086 claim_owner_incarnation_id = ?5,
1087 claim_owner_liveness_json = ?6,
1088 claim_token = ?7,
1089 claim_fencing_token = claim_fencing_token + 1,
1090 claim_claimed_at_ms = ?8,
1091 claim_expires_at_ms = ?9
1092 WHERE session_id = ?1
1093 AND batch_id = ?2
1094 AND (
1095 claim_token IS NULL
1096 OR claim_expires_at_ms <= ?8
1097 OR (
1098 claim_token = ?10
1099 AND claim_owner_id = ?11
1100 AND claim_owner_incarnation_id = ?12
1101 )
1102 )",
1103 params![
1104 session_id,
1105 row.batch_id,
1106 lease.claim_id,
1107 owner.owner_id.as_str(),
1108 owner.incarnation_id.as_str(),
1109 liveness_json.as_str(),
1110 lease.lease_token,
1111 now as i64,
1112 lease.expires_at_epoch_ms as i64,
1113 row.claim_token,
1114 row.claim_owner.as_ref().map(|owner| owner.owner_id.as_str()),
1115 row.claim_owner
1116 .as_ref()
1117 .map(|owner| owner.incarnation_id.as_str())
1118 ],
1119 )
1120 .map_err(sqlite_error)?;
1121 if claimed == 0 {
1122 return Ok(TxOutcome::Rollback(None));
1126 }
1127 }
1128 Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
1129 session_id: session_id.clone(),
1130 claim_id: lease.claim_id,
1131 owner: owner.clone(),
1132 lease_token: lease.lease_token,
1133 fencing_token: lease.fencing_token,
1134 claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
1135 expires_at_epoch_ms: lease.expires_at_epoch_ms,
1136 batches: selected_batches,
1137 })))
1138 })();
1139 match outcome {
1143 Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
1144 Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
1145 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1146 }
1147 })
1148 .await
1149 .map_err(sqlite_error)?
1150 }
1151
1152 async fn renew_queued_work_claim(
1153 &self,
1154 claim: &QueuedWorkClaim,
1155 lease_ttl_ms: u64,
1156 ) -> Result<QueuedWorkClaim, StoreError> {
1157 let now = current_epoch_ms();
1158 let expires_at = now.saturating_add(lease_ttl_ms);
1159 let session_id = claim.session_id.clone();
1160 let claim_id = claim.claim_id.clone();
1161 let lease_token = claim.lease_token.clone();
1162 let changed = self
1163 .conn
1164 .write(move |tx| {
1165 tx.execute(
1166 "UPDATE queued_work_batches
1167 SET claim_expires_at_ms = ?4
1168 WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1169 params![session_id, claim_id, lease_token, expires_at as i64],
1170 )
1171 })
1172 .await
1173 .map_err(sqlite_error)?;
1174 renewed_claim(claim, changed, expires_at)
1175 }
1176
1177 async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1178 let session_id = claim.session_id.clone();
1179 let claim_id = claim.claim_id.clone();
1180 let lease_token = claim.lease_token.clone();
1181 self.conn
1182 .write(move |tx| {
1183 tx.execute(
1184 "UPDATE queued_work_batches
1185 SET claim_id = NULL,
1186 claim_owner_id = NULL,
1187 claim_owner_incarnation_id = NULL,
1188 claim_owner_liveness_json = NULL,
1189 claim_token = NULL,
1190 claim_claimed_at_ms = 0,
1191 claim_expires_at_ms = 0
1192 WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1193 params![session_id, claim_id, lease_token],
1194 )
1195 })
1196 .await
1197 .map_err(sqlite_error)?;
1198 Ok(())
1199 }
1200
1201 async fn cancel_queued_work_batch(
1202 &self,
1203 session_id: &str,
1204 batch_id: &str,
1205 ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1206 let session_id = session_id.to_string();
1207 let batch_id = batch_id.to_string();
1208 self.conn
1209 .write_flow(move |tx| {
1210 let outcome: Result<Option<QueuedWorkBatch>, StoreError> = (|| {
1211 let now = current_epoch_ms() as i64;
1212 let row = tx
1213 .query_row(
1214 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1215 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1216 claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1217 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1218 FROM queued_work_batches
1219 WHERE session_id = ?1
1220 AND batch_id = ?2
1221 AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
1222 params![session_id, batch_id, now],
1223 queued_batch_row_from_sql,
1224 )
1225 .optional()
1226 .map_err(sqlite_error)?;
1227 let Some(row) = row else {
1228 return Ok(None);
1229 };
1230 let batch = queued_work_batch_from_conn(tx, row)?;
1231 tx.execute(
1232 "DELETE FROM queued_work_batches
1233 WHERE session_id = ?1
1234 AND batch_id = ?2
1235 AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
1236 params![session_id, batch_id, now],
1237 )
1238 .map_err(sqlite_error)?;
1239 Ok(Some(batch))
1240 })();
1241 match outcome {
1242 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1243 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1244 }
1245 })
1246 .await
1247 .map_err(sqlite_error)?
1248 }
1249
1250 async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1251 let session_id = session_id.to_string();
1252 self.conn
1253 .call(move |conn| {
1254 let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
1255 let rows = {
1256 let mut stmt = conn
1257 .prepare(
1258 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1259 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1260 claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1261 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1262 FROM queued_work_batches
1263 WHERE session_id = ?1
1264 ORDER BY enqueue_seq ASC",
1265 )
1266 .map_err(sqlite_error)?;
1267 let rows = stmt
1268 .query_map(params![session_id], queued_batch_row_from_sql)
1269 .map_err(sqlite_error)?;
1270 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1271 };
1272 rows.into_iter()
1273 .map(|row| queued_work_batch_from_conn(conn, row))
1274 .collect()
1275 })();
1276 Ok(outcome)
1277 })
1278 .await
1279 .map_err(sqlite_error)?
1280 }
1281
1282 async fn list_pending_queued_work(
1283 &self,
1284 session_id: &str,
1285 ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1286 let session_id = session_id.to_string();
1287 self.conn
1288 .call(move |conn| {
1289 let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
1290 let now = current_epoch_ms();
1291 let rows = {
1292 let mut stmt = conn
1293 .prepare(
1294 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1295 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1296 claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1297 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1298 FROM queued_work_batches
1299 WHERE session_id = ?1
1300 AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
1301 ORDER BY enqueue_seq ASC",
1302 )
1303 .map_err(sqlite_error)?;
1304 let rows = stmt
1305 .query_map(
1306 params![session_id, now as i64],
1307 queued_batch_row_from_sql,
1308 )
1309 .map_err(sqlite_error)?;
1310 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1311 };
1312 rows.into_iter()
1313 .map(|row| queued_work_batch_from_conn(conn, row))
1314 .collect()
1315 })();
1316 Ok(outcome)
1317 })
1318 .await
1319 .map_err(sqlite_error)?
1320 }
1321
1322 async fn enqueue_pending_turn_input(
1323 &self,
1324 draft: lash_core::PendingTurnInputDraft,
1325 ) -> Result<lash_core::PendingTurnInput, StoreError> {
1326 let nonce = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed);
1327 self.conn
1328 .write_flow(move |tx| {
1329 let outcome: Result<lash_core::PendingTurnInput, StoreError> = (|| {
1330 if let Some(source_key) = draft.source_key.as_deref() {
1331 let existing_id: Option<String> = tx
1332 .query_row(
1333 "SELECT input_id
1334 FROM pending_turn_inputs
1335 WHERE session_id = ?1 AND source_key = ?2",
1336 params![draft.session_id, source_key],
1337 |row| row.get(0),
1338 )
1339 .optional()
1340 .map_err(sqlite_error)?;
1341 if let Some(input_id) = existing_id {
1342 let existing = load_pending_turn_input_by_id_conn(
1343 tx,
1344 &draft.session_id,
1345 &input_id,
1346 )?
1347 .ok_or_else(|| {
1348 StoreError::Backend(
1349 "pending turn input source row disappeared".to_string(),
1350 )
1351 })?;
1352 if !draft.submitted_content_matches(&existing).map_err(|err| {
1353 StoreError::Backend(format!(
1354 "failed to compare pending turn input submission: {err}"
1355 ))
1356 })? {
1357 return Err(StoreError::PendingTurnInputSourceKeyConflict {
1358 session_id: draft.session_id.clone(),
1359 source_key: source_key.to_string(),
1360 existing_input_id: existing.input_id.clone(),
1361 });
1362 }
1363 return Ok(existing);
1364 }
1365 }
1366 let now = current_epoch_ms();
1367 let input_id = draft.input_id.clone().unwrap_or_else(|| {
1368 derive_pending_turn_input_id(
1369 &draft.session_id,
1370 draft.source_key.as_deref(),
1371 now,
1372 nonce,
1373 )
1374 });
1375 let state = match draft.ingress {
1376 lash_core::TurnInputIngress::ActiveTurn { .. } => {
1377 lash_core::TurnInputState::PendingActive
1378 }
1379 lash_core::TurnInputIngress::NextTurn => {
1380 lash_core::TurnInputState::DeferredNextTurn
1381 }
1382 };
1383 tx.execute(
1384 "INSERT INTO pending_turn_inputs (
1385 input_id, session_id, source_key, ingress_json, state,
1386 input_json, enqueued_at_ms
1387 )
1388 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1389 params![
1390 input_id,
1391 draft.session_id,
1392 draft.source_key.as_deref(),
1393 encode_json(&draft.ingress),
1394 state.as_str(),
1395 encode_json(&draft.input),
1396 now as i64,
1397 ],
1398 )
1399 .map_err(sqlite_error)?;
1400 load_pending_turn_input_by_id_conn(tx, &draft.session_id, &input_id)?
1401 .ok_or_else(|| {
1402 StoreError::Backend("pending turn input insert disappeared".to_string())
1403 })
1404 })();
1405 match outcome {
1406 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1407 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1408 }
1409 })
1410 .await
1411 .map_err(sqlite_error)?
1412 }
1413
1414 async fn list_pending_turn_inputs(
1415 &self,
1416 session_id: &str,
1417 ) -> Result<Vec<lash_core::PendingTurnInput>, StoreError> {
1418 let session_id = session_id.to_string();
1419 self.conn
1420 .call(move |conn| {
1421 let outcome: Result<Vec<lash_core::PendingTurnInput>, StoreError> = (|| {
1422 let now = current_epoch_ms();
1423 let rows = {
1424 let mut stmt = conn
1425 .prepare(
1426 "SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
1427 state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
1428 claim_owner_id, claim_owner_incarnation_id,
1429 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1430 FROM pending_turn_inputs
1431 WHERE session_id = ?1
1432 AND state IN (?2, ?3)
1433 AND (claim_token IS NULL OR claim_expires_at_ms <= ?4)
1434 ORDER BY enqueue_seq ASC",
1435 )
1436 .map_err(sqlite_error)?;
1437 let rows = stmt
1438 .query_map(
1439 params![
1440 session_id,
1441 lash_core::TurnInputState::PendingActive.as_str(),
1442 lash_core::TurnInputState::DeferredNextTurn.as_str(),
1443 now as i64
1444 ],
1445 pending_turn_input_row_from_sql,
1446 )
1447 .map_err(sqlite_error)?;
1448 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1449 };
1450 rows.into_iter().map(pending_turn_input_from_row).collect()
1451 })(
1452 );
1453 Ok(outcome)
1454 })
1455 .await
1456 .map_err(sqlite_error)?
1457 }
1458
1459 async fn cancel_pending_turn_input(
1460 &self,
1461 session_id: &str,
1462 input_id: &str,
1463 ) -> Result<lash_core::PendingTurnInputCancelOutcome, StoreError> {
1464 let target = lash_core::PendingTurnInputCancelTarget::input_id(input_id);
1465 let targets = vec![target];
1466 let mut outcomes = self
1467 .cancel_pending_turn_inputs(session_id, &targets)
1468 .await?;
1469 Ok(outcomes
1470 .pop()
1471 .map(|result| result.outcome)
1472 .unwrap_or(lash_core::PendingTurnInputCancelOutcome::NotFound))
1473 }
1474
1475 async fn cancel_pending_turn_inputs(
1476 &self,
1477 session_id: &str,
1478 targets: &[lash_core::PendingTurnInputCancelTarget],
1479 ) -> Result<Vec<lash_core::PendingTurnInputCancelResult>, StoreError> {
1480 let session_id = session_id.to_string();
1481 let targets = targets.to_vec();
1482 self.conn
1483 .write_flow(move |tx| {
1484 let outcome: Result<Vec<lash_core::PendingTurnInputCancelResult>, StoreError> =
1485 (|| {
1486 let now = current_epoch_ms();
1487 let mut results = Vec::with_capacity(targets.len());
1488 for target in targets {
1489 let outcome = match load_pending_turn_input_row_by_target_conn(
1490 tx,
1491 &session_id,
1492 &target,
1493 )? {
1494 Some(row) => cancel_pending_turn_input_row_conn(tx, row, now)?,
1495 None => lash_core::PendingTurnInputCancelOutcome::NotFound,
1496 };
1497 results
1498 .push(lash_core::PendingTurnInputCancelResult { target, outcome });
1499 }
1500 Ok(results)
1501 })();
1502 match outcome {
1503 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1504 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1505 }
1506 })
1507 .await
1508 .map_err(sqlite_error)?
1509 }
1510
1511 async fn cancel_pending_turn_input_suffix(
1512 &self,
1513 session_id: &str,
1514 anchor: &lash_core::PendingTurnInputCancelTarget,
1515 ) -> Result<lash_core::PendingTurnInputSuffixCancelOutcome, StoreError> {
1516 let session_id = session_id.to_string();
1517 let anchor = anchor.clone();
1518 self.conn
1519 .write_flow(move |tx| {
1520 let outcome: Result<lash_core::PendingTurnInputSuffixCancelOutcome, StoreError> =
1521 (|| {
1522 let now = current_epoch_ms();
1523 let Some(anchor_row) =
1524 load_pending_turn_input_row_by_target_conn(tx, &session_id, &anchor)?
1525 else {
1526 return Ok(
1527 lash_core::PendingTurnInputSuffixCancelOutcome::AnchorNotFound {
1528 anchor,
1529 },
1530 );
1531 };
1532 let rows = {
1533 let mut stmt = tx
1534 .prepare(
1535 "SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
1536 state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
1537 claim_owner_id, claim_owner_incarnation_id,
1538 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1539 FROM pending_turn_inputs
1540 WHERE session_id = ?1 AND enqueue_seq >= ?2
1541 ORDER BY enqueue_seq ASC",
1542 )
1543 .map_err(sqlite_error)?;
1544 let rows = stmt
1545 .query_map(
1546 params![session_id, anchor_row.enqueue_seq as i64],
1547 pending_turn_input_row_from_sql,
1548 )
1549 .map_err(sqlite_error)?;
1550 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1551 };
1552 let mut outcomes = Vec::with_capacity(rows.len());
1553 for row in rows {
1554 outcomes.push(cancel_pending_turn_input_row_conn(tx, row, now)?);
1555 }
1556 Ok(lash_core::PendingTurnInputSuffixCancelOutcome::Outcomes {
1557 anchor,
1558 outcomes,
1559 })
1560 })();
1561 match outcome {
1562 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1563 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1564 }
1565 })
1566 .await
1567 .map_err(sqlite_error)?
1568 }
1569
1570 async fn claim_active_turn_inputs(
1571 &self,
1572 session_id: &str,
1573 session_execution_lease: &SessionExecutionLeaseFence,
1574 owner: &LeaseOwnerIdentity,
1575 turn_id: &str,
1576 checkpoint: lash_core::CheckpointKind,
1577 lease_ttl_ms: u64,
1578 max_inputs: usize,
1579 ) -> Result<Option<lash_core::TurnInputClaim>, StoreError> {
1580 claim_pending_turn_inputs_sqlite(
1581 &self.conn,
1582 session_id,
1583 session_execution_lease,
1584 owner,
1585 lease_ttl_ms,
1586 max_inputs,
1587 lash_core::TurnInputClaimMode::ActiveTurn {
1588 turn_id: turn_id.to_string(),
1589 checkpoint,
1590 },
1591 )
1592 .await
1593 }
1594
1595 async fn claim_next_turn_inputs(
1596 &self,
1597 session_id: &str,
1598 session_execution_lease: &SessionExecutionLeaseFence,
1599 owner: &LeaseOwnerIdentity,
1600 lease_ttl_ms: u64,
1601 max_inputs: usize,
1602 ) -> Result<Option<lash_core::TurnInputClaim>, StoreError> {
1603 claim_pending_turn_inputs_sqlite(
1604 &self.conn,
1605 session_id,
1606 session_execution_lease,
1607 owner,
1608 lease_ttl_ms,
1609 max_inputs,
1610 lash_core::TurnInputClaimMode::NextTurn,
1611 )
1612 .await
1613 }
1614
1615 async fn abandon_turn_input_claim(
1616 &self,
1617 claim: &lash_core::TurnInputClaim,
1618 ) -> Result<(), StoreError> {
1619 let session_id = claim.session_id.clone();
1620 let claim_id = claim.claim_id.clone();
1621 let lease_token = claim.lease_token.clone();
1622 let restored_state = match claim.mode {
1623 lash_core::TurnInputClaimMode::ActiveTurn { .. } => {
1624 lash_core::TurnInputState::PendingActive
1625 }
1626 lash_core::TurnInputClaimMode::NextTurn => lash_core::TurnInputState::DeferredNextTurn,
1627 };
1628 self.conn
1629 .write(move |tx| {
1630 tx.execute(
1631 "UPDATE pending_turn_inputs
1632 SET state = CASE
1633 WHEN state = ?4 THEN ?5
1634 ELSE state
1635 END,
1636 claim_id = NULL,
1637 claim_owner_id = NULL,
1638 claim_owner_incarnation_id = NULL,
1639 claim_owner_liveness_json = NULL,
1640 claim_token = NULL,
1641 claim_claimed_at_ms = 0,
1642 claim_expires_at_ms = 0
1643 WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1644 params![
1645 session_id,
1646 claim_id,
1647 lease_token,
1648 lash_core::TurnInputState::Accepted.as_str(),
1649 restored_state.as_str(),
1650 ],
1651 )
1652 })
1653 .await
1654 .map_err(sqlite_error)?;
1655 Ok(())
1656 }
1657
1658 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
1659 Store::save_session_meta(self, meta).await;
1660 Ok(())
1661 }
1662
1663 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
1664 Ok(Store::load_session_meta(self).await)
1665 }
1666
1667 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1668 if ids.is_empty() {
1669 return Ok(());
1670 }
1671 let ids = ids.to_vec();
1672 self.conn
1673 .write(move |tx| {
1674 let mut stmt =
1675 tx.prepare("UPDATE graph_nodes SET tombstoned = 1 WHERE node_id = ?1")?;
1676 for id in &ids {
1677 stmt.execute(params![id])?;
1678 }
1679 Ok(())
1680 })
1681 .await
1682 .map_err(sqlite_error)
1683 }
1684
1685 async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1686 let (removed_node_count, removed_pending_turn_input_tombstone_count) = self
1687 .conn
1688 .write(move |tx| {
1689 let removed_node_count =
1690 tx.execute("DELETE FROM graph_nodes WHERE tombstoned = 1", [])?;
1691 let removed_pending_turn_input_tombstone_count = tx.execute(
1692 "DELETE FROM pending_turn_inputs
1693 WHERE state IN (?1, ?2)",
1694 params![
1695 lash_core::TurnInputState::Cancelled.as_str(),
1696 lash_core::TurnInputState::Completed.as_str()
1697 ],
1698 )?;
1699 Ok((
1700 removed_node_count,
1701 removed_pending_turn_input_tombstone_count,
1702 ))
1703 })
1704 .await
1705 .map_err(sqlite_error)?;
1706 Ok(VacuumReport {
1707 removed_node_count,
1708 removed_pending_turn_input_tombstone_count,
1709 })
1710 }
1711
1712 async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
1713 Ok(Store::gc_unreachable(self).await)
1714 }
1715}
1716
1717fn derive_pending_turn_input_id(
1718 session_id: &str,
1719 source_key: Option<&str>,
1720 now_epoch_ms: u64,
1721 nonce: u64,
1722) -> String {
1723 format!(
1724 "ti:{:x}",
1725 Sha256::digest(format!("{session_id}:{source_key:?}:{now_epoch_ms}:{nonce}").as_bytes())
1726 )
1727}
1728
1729fn cancel_pending_turn_input_row_conn(
1730 conn: &Connection,
1731 row: PendingTurnInputRow,
1732 now_epoch_ms: u64,
1733) -> Result<lash_core::PendingTurnInputCancelOutcome, StoreError> {
1734 let mut input = pending_turn_input_from_row(row.clone())?;
1735 match input.state {
1736 lash_core::TurnInputState::Cancelled => Ok(
1737 lash_core::PendingTurnInputCancelOutcome::AlreadyCancelled(input),
1738 ),
1739 lash_core::TurnInputState::Completed => Ok(
1740 lash_core::PendingTurnInputCancelOutcome::AlreadyCompleted(input),
1741 ),
1742 lash_core::TurnInputState::Accepted => {
1743 Ok(lash_core::PendingTurnInputCancelOutcome::AlreadyClaimed {
1744 claim: pending_turn_input_claim_diagnostics_from_row(&row, input.state),
1745 input,
1746 })
1747 }
1748 lash_core::TurnInputState::PendingActive | lash_core::TurnInputState::DeferredNextTurn => {
1749 let live_claim = row.claim_token.is_some() && row.claim_expires_at_ms > now_epoch_ms;
1750 if live_claim {
1751 return Ok(lash_core::PendingTurnInputCancelOutcome::AlreadyClaimed {
1752 claim: pending_turn_input_claim_diagnostics_from_row(&row, input.state),
1753 input,
1754 });
1755 }
1756 conn.execute(
1757 "UPDATE pending_turn_inputs
1758 SET state = ?3,
1759 claim_id = NULL,
1760 claim_owner_id = NULL,
1761 claim_owner_incarnation_id = NULL,
1762 claim_owner_liveness_json = NULL,
1763 claim_token = NULL,
1764 claim_claimed_at_ms = 0,
1765 claim_expires_at_ms = 0
1766 WHERE session_id = ?1 AND input_id = ?2",
1767 params![
1768 row.session_id,
1769 row.input_id,
1770 lash_core::TurnInputState::Cancelled.as_str(),
1771 ],
1772 )
1773 .map_err(sqlite_error)?;
1774 input.state = lash_core::TurnInputState::Cancelled;
1775 Ok(lash_core::PendingTurnInputCancelOutcome::Cancelled(input))
1776 }
1777 }
1778}
1779
1780async fn claim_pending_turn_inputs_sqlite(
1781 conn: &SqliteConnection,
1782 session_id: &str,
1783 session_execution_lease: &SessionExecutionLeaseFence,
1784 owner: &LeaseOwnerIdentity,
1785 lease_ttl_ms: u64,
1786 max_inputs: usize,
1787 mode: lash_core::TurnInputClaimMode,
1788) -> Result<Option<lash_core::TurnInputClaim>, StoreError> {
1789 if max_inputs == 0 {
1790 return Ok(None);
1791 }
1792 let session_id = session_id.to_string();
1793 let session_execution_lease = session_execution_lease.clone();
1794 let owner = owner.clone();
1795 conn.write_flow(move |tx| {
1796 let outcome: Result<TxOutcome<Option<lash_core::TurnInputClaim>>, StoreError> = (|| {
1797 ensure_session_execution_lease_conn(tx, &session_id, &session_execution_lease)?;
1798 let now = current_epoch_ms();
1799 let wanted_state = match &mode {
1800 lash_core::TurnInputClaimMode::ActiveTurn { .. } => {
1801 lash_core::TurnInputState::PendingActive
1802 }
1803 lash_core::TurnInputClaimMode::NextTurn => {
1804 lash_core::TurnInputState::DeferredNextTurn
1805 }
1806 };
1807 let candidate_rows = {
1808 let mut stmt = tx
1809 .prepare(
1810 "SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
1811 state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
1812 claim_owner_id, claim_owner_incarnation_id,
1813 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1814 FROM pending_turn_inputs
1815 WHERE session_id = ?1 AND state = ?2
1816 ORDER BY enqueue_seq ASC
1817 LIMIT ?3",
1818 )
1819 .map_err(sqlite_error)?;
1820 let rows = stmt
1821 .query_map(
1822 params![
1823 session_id,
1824 wanted_state.as_str(),
1825 (max_inputs as i64).saturating_add(32)
1826 ],
1827 pending_turn_input_row_from_sql,
1828 )
1829 .map_err(sqlite_error)?;
1830 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1831 };
1832 let mut selected = Vec::new();
1833 for row in candidate_rows {
1834 let claim_available = row.claim_token.is_none()
1835 || row.claim_expires_at_ms <= now
1836 || row
1837 .claim_owner
1838 .as_ref()
1839 .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner));
1840 if !claim_available {
1841 continue;
1842 }
1843 let input = pending_turn_input_from_row(row.clone())?;
1844 let matches_mode = match &mode {
1845 lash_core::TurnInputClaimMode::ActiveTurn {
1846 turn_id,
1847 checkpoint,
1848 } => {
1849 input
1850 .ingress
1851 .active_turn_id()
1852 .is_some_and(|active| active == turn_id)
1853 && input.ingress.admits_checkpoint(*checkpoint)
1854 }
1855 lash_core::TurnInputClaimMode::NextTurn => input.state.is_next_turn_pending(),
1856 };
1857 if matches_mode {
1858 selected.push((row, input));
1859 if selected.len() >= max_inputs {
1860 break;
1861 }
1862 }
1863 }
1864 let Some((head, _)) = selected.first() else {
1865 return Ok(TxOutcome::Commit(None));
1866 };
1867 let lease = TurnInputClaimLease::derive(head, &session_id, &owner, now, lease_ttl_ms);
1868 let liveness_json = encode_liveness(&owner.liveness)?;
1869 let state_after_claim = match &mode {
1870 lash_core::TurnInputClaimMode::ActiveTurn { .. } => {
1871 lash_core::TurnInputState::Accepted
1872 }
1873 lash_core::TurnInputClaimMode::NextTurn => {
1874 lash_core::TurnInputState::DeferredNextTurn
1875 }
1876 };
1877 let mut inputs = Vec::new();
1878 for (row, mut input) in selected {
1879 let claimed = tx
1880 .execute(
1881 "UPDATE pending_turn_inputs
1882 SET state = ?3,
1883 claim_id = ?4,
1884 claim_owner_id = ?5,
1885 claim_owner_incarnation_id = ?6,
1886 claim_owner_liveness_json = ?7,
1887 claim_token = ?8,
1888 claim_fencing_token = claim_fencing_token + 1,
1889 claim_claimed_at_ms = ?9,
1890 claim_expires_at_ms = ?10
1891 WHERE session_id = ?1
1892 AND input_id = ?2
1893 AND (
1894 claim_token IS NULL
1895 OR claim_expires_at_ms <= ?9
1896 OR (
1897 claim_token = ?11
1898 AND claim_owner_id = ?12
1899 AND claim_owner_incarnation_id = ?13
1900 )
1901 )",
1902 params![
1903 session_id,
1904 row.input_id,
1905 state_after_claim.as_str(),
1906 lease.claim_id,
1907 owner.owner_id.as_str(),
1908 owner.incarnation_id.as_str(),
1909 liveness_json.as_str(),
1910 lease.lease_token,
1911 now as i64,
1912 lease.expires_at_epoch_ms as i64,
1913 row.claim_token,
1914 row.claim_owner
1915 .as_ref()
1916 .map(|owner| owner.owner_id.as_str()),
1917 row.claim_owner
1918 .as_ref()
1919 .map(|owner| owner.incarnation_id.as_str())
1920 ],
1921 )
1922 .map_err(sqlite_error)?;
1923 if claimed == 0 {
1924 return Ok(TxOutcome::Rollback(None));
1925 }
1926 input.state = state_after_claim;
1927 inputs.push(input);
1928 }
1929 Ok(TxOutcome::Commit(Some(lash_core::TurnInputClaim {
1930 session_id: session_id.clone(),
1931 claim_id: lease.claim_id,
1932 owner: owner.clone(),
1933 lease_token: lease.lease_token,
1934 fencing_token: lease.fencing_token,
1935 claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
1936 expires_at_epoch_ms: lease.expires_at_epoch_ms,
1937 mode,
1938 inputs,
1939 })))
1940 })(
1941 );
1942 match outcome {
1943 Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
1944 Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
1945 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1946 }
1947 })
1948 .await
1949 .map_err(sqlite_error)?
1950}
1951
1952struct SessionExecutionLeaseRow {
1953 owner: Option<LeaseOwnerIdentity>,
1954 lease_token: Option<String>,
1955 fencing_token: u64,
1956 claimed_at_ms: u64,
1957 expires_at_ms: u64,
1958}
1959
1960fn load_session_execution_lease_row_conn(
1961 conn: &Connection,
1962 session_id: &str,
1963) -> Result<Option<SessionExecutionLeaseRow>, StoreError> {
1964 let row = conn
1965 .query_row(
1966 "SELECT lease_owner_id, lease_token, lease_fencing_token,
1967 lease_claimed_at_ms, lease_expires_at_ms,
1968 lease_owner_incarnation_id, lease_owner_liveness_json
1969 FROM session_execution_leases
1970 WHERE session_id = ?1",
1971 params![session_id],
1972 |row| {
1973 let owner_id: Option<String> = row.get(0)?;
1974 let incarnation_id: Option<String> = row.get(5)?;
1975 let liveness_json: Option<String> = row.get(6)?;
1976 Ok(SessionExecutionLeaseRow {
1977 owner: lease_owner_from_columns(owner_id, incarnation_id, liveness_json),
1978 lease_token: row.get(1)?,
1979 fencing_token: row.get::<_, i64>(2)? as u64,
1980 claimed_at_ms: row.get::<_, i64>(3)? as u64,
1981 expires_at_ms: row.get::<_, i64>(4)? as u64,
1982 })
1983 },
1984 )
1985 .optional()
1986 .map_err(sqlite_error)?;
1987 Ok(row)
1988}
1989
1990fn lease_owner_from_columns(
1991 owner_id: Option<String>,
1992 incarnation_id: Option<String>,
1993 liveness_json: Option<String>,
1994) -> Option<LeaseOwnerIdentity> {
1995 owner_id.map(|owner_id| LeaseOwnerIdentity {
1996 incarnation_id: incarnation_id.unwrap_or_else(|| owner_id.clone()),
1997 owner_id,
1998 liveness: liveness_json
1999 .as_deref()
2000 .and_then(|json| serde_json::from_str(json).ok())
2001 .unwrap_or(LeaseOwnerLiveness::Opaque),
2002 })
2003}
2004
2005fn encode_liveness(liveness: &LeaseOwnerLiveness) -> Result<String, StoreError> {
2006 serde_json::to_string(liveness)
2007 .map_err(|err| StoreError::Backend(format!("failed to encode lease liveness: {err}")))
2008}
2009
2010fn row_to_session_execution_lease(
2011 session_id: &str,
2012 row: SessionExecutionLeaseRow,
2013) -> Result<SessionExecutionLease, StoreError> {
2014 Ok(SessionExecutionLease {
2015 session_id: session_id.to_string(),
2016 owner: row
2017 .owner
2018 .ok_or_else(|| StoreError::Backend("live session lease missing owner".to_string()))?,
2019 lease_token: row.lease_token.ok_or_else(|| {
2020 StoreError::Backend("live session lease missing lease token".to_string())
2021 })?,
2022 fencing_token: row.fencing_token,
2023 claimed_at_epoch_ms: row.claimed_at_ms,
2024 expires_at_epoch_ms: row.expires_at_ms,
2025 })
2026}
2027
2028fn acquire_session_execution_lease_conn(
2029 conn: &Connection,
2030 session_id: &str,
2031 owner: &LeaseOwnerIdentity,
2032 previous_fencing_token: u64,
2033 now: u64,
2034 lease_ttl_ms: u64,
2035) -> Result<SessionExecutionLease, StoreError> {
2036 let fencing_token = previous_fencing_token.saturating_add(1);
2037 let lease_token = format!(
2038 "{}:{}:{}:{now}:{fencing_token}",
2039 session_id, owner.owner_id, owner.incarnation_id
2040 );
2041 let expires_at = now.saturating_add(lease_ttl_ms);
2042 let liveness_json = encode_liveness(&owner.liveness)?;
2043 conn.execute(
2044 "INSERT INTO session_execution_leases (
2045 session_id, lease_owner_id, lease_owner_incarnation_id, lease_owner_liveness_json,
2046 lease_token, lease_fencing_token, lease_claimed_at_ms, lease_expires_at_ms
2047 )
2048 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
2049 ON CONFLICT(session_id) DO UPDATE SET
2050 lease_owner_id = excluded.lease_owner_id,
2051 lease_owner_incarnation_id = excluded.lease_owner_incarnation_id,
2052 lease_owner_liveness_json = excluded.lease_owner_liveness_json,
2053 lease_token = excluded.lease_token,
2054 lease_fencing_token = excluded.lease_fencing_token,
2055 lease_claimed_at_ms = excluded.lease_claimed_at_ms,
2056 lease_expires_at_ms = excluded.lease_expires_at_ms",
2057 params![
2058 session_id,
2059 owner.owner_id,
2060 owner.incarnation_id,
2061 liveness_json,
2062 lease_token,
2063 fencing_token as i64,
2064 now as i64,
2065 expires_at as i64
2066 ],
2067 )
2068 .map_err(sqlite_error)?;
2069 Ok(SessionExecutionLease {
2070 session_id: session_id.to_string(),
2071 owner: owner.clone(),
2072 lease_token,
2073 fencing_token,
2074 claimed_at_epoch_ms: now,
2075 expires_at_epoch_ms: expires_at,
2076 })
2077}
2078
2079fn ensure_session_execution_lease_conn(
2080 conn: &Connection,
2081 session_id: &str,
2082 fence: &SessionExecutionLeaseFence,
2083) -> Result<(), StoreError> {
2084 if fence.session_id != session_id {
2085 return Err(StoreError::SessionExecutionLeaseExpired {
2086 session_id: session_id.to_string(),
2087 });
2088 }
2089 let now = current_epoch_ms();
2090 let current = load_session_execution_lease_row_conn(conn, session_id)?;
2091 let Some(current) = current else {
2092 return Err(StoreError::SessionExecutionLeaseExpired {
2093 session_id: session_id.to_string(),
2094 });
2095 };
2096 if current
2097 .owner
2098 .as_ref()
2099 .is_some_and(|owner| owner.same_incarnation(&fence.owner))
2100 && current.lease_token.as_deref() == Some(fence.lease_token.as_str())
2101 && current.fencing_token == fence.fencing_token
2102 && current.expires_at_ms > now
2103 {
2104 Ok(())
2105 } else {
2106 Err(StoreError::SessionExecutionLeaseExpired {
2107 session_id: session_id.to_string(),
2108 })
2109 }
2110}
2111
2112fn release_session_execution_lease_conn(
2113 conn: &Connection,
2114 completion: &SessionExecutionLeaseCompletion,
2115) -> Result<(), StoreError> {
2116 conn.execute(
2117 "UPDATE session_execution_leases
2118 SET lease_owner_id = NULL,
2119 lease_owner_incarnation_id = NULL,
2120 lease_owner_liveness_json = NULL,
2121 lease_token = NULL,
2122 lease_claimed_at_ms = 0,
2123 lease_expires_at_ms = 0
2124 WHERE session_id = ?1
2125 AND lease_owner_id = ?2
2126 AND lease_owner_incarnation_id = ?3
2127 AND lease_token = ?4
2128 AND lease_fencing_token = ?5",
2129 params![
2130 completion.session_id,
2131 completion.owner.owner_id,
2132 completion.owner.incarnation_id,
2133 completion.lease_token,
2134 completion.fencing_token as i64
2135 ],
2136 )
2137 .map_err(sqlite_error)?;
2138 Ok(())
2139}