1use super::*;
27
28#[async_trait::async_trait]
29impl SessionCommitStore for Store {
30 fn durability_tier(&self) -> DurabilityTier {
31 DurabilityTier::Durable
32 }
33
34 async fn load_session(
35 &self,
36 scope: SessionReadScope,
37 ) -> Result<Option<PersistedSessionRead>, StoreError> {
38 self.conn
39 .call(move |conn| {
40 let outcome: Result<Option<PersistedSessionRead>, StoreError> = (|| {
41 let Some(meta) = try_load_session_head_meta_from_conn(conn)? else {
42 return Ok(None);
43 };
44 let leaf_node_id = match &scope {
45 SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
46 SessionReadScope::ActivePath { leaf_node_id } => {
47 leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
48 }
49 };
50 let mut graph = match scope {
51 SessionReadScope::FullGraph => {
52 Self::load_session_graph_from_conn(conn, meta.leaf_node_id.clone())
53 }
54 SessionReadScope::ActivePath { .. } => {
55 Self::load_active_path_session_graph_from_conn(
56 conn,
57 leaf_node_id.clone(),
58 )
59 .map_err(sqlite_error)?
60 }
61 };
62 graph.set_leaf_node_id(leaf_node_id);
63 let checkpoint = meta
64 .checkpoint_ref
65 .as_ref()
66 .map(|blob_ref| Self::get_checkpoint_conn(conn, blob_ref))
67 .transpose()?
68 .flatten();
69 Ok(Some(PersistedSessionRead {
70 session_id: meta.session_id,
71 head_revision: meta.head_revision,
72 config: meta.config,
73 agent_frames: meta.agent_frames,
74 current_agent_frame_id: meta.current_agent_frame_id,
75 graph,
76 checkpoint_ref: meta.checkpoint_ref,
77 checkpoint,
78 token_ledger: merge_token_ledger_entries(Self::load_usage_deltas_conn(
79 conn,
80 )),
81 }))
82 })(
83 );
84 Ok(outcome)
85 })
86 .await
87 .map_err(sqlite_error)?
88 }
89
90 async fn load_node(
91 &self,
92 node_id: &str,
93 ) -> Result<Option<lash_core::SessionNodeRecord>, StoreError> {
94 let node_id = node_id.to_string();
95 let row: Option<String> = self
96 .conn
97 .call(move |conn| {
98 conn.query_row(
99 "SELECT node_json FROM graph_nodes WHERE node_id = ?1 AND tombstoned = 0",
100 params![node_id],
101 |row| row.get(0),
102 )
103 .optional()
104 })
105 .await
106 .map_err(sqlite_error)?;
107 Ok(row.and_then(|json| serde_json::from_str(&json).ok()))
108 }
109
110 async fn commit_runtime_state(
111 &self,
112 commit: RuntimeCommit,
113 ) -> Result<RuntimeCommitResult, StoreError> {
114 let blob_profile = self.options.blob_profile;
115 let result = self
116 .conn
117 .write_flow(move |tx| {
118 let outcome: Result<RuntimeCommitResult, StoreError> = (|| {
119 let existing = try_load_session_head_meta_from_conn(tx)?;
120 if let Some(bound_session_id) =
121 existing.as_ref().map(|meta| meta.session_id.as_str())
122 && bound_session_id != commit.session_id
123 {
124 return Err(StoreError::SessionBindingMismatch {
125 bound_session_id: bound_session_id.to_string(),
126 attempted_session_id: commit.session_id.clone(),
127 });
128 }
129 if let Some(completed) = &commit.turn_commit {
130 if completed.session_id != commit.session_id {
131 return Err(StoreError::RuntimeTurnCommitConflict {
132 session_id: completed.session_id.clone(),
133 turn_id: completed.turn_id.clone(),
134 });
135 }
136 let prior: Option<(String, String)> = tx
137 .query_row(
138 "SELECT turn_commit_hash, result_json FROM runtime_turn_commits
139 WHERE session_id = ?1 AND turn_id = ?2",
140 params![completed.session_id, completed.turn_id],
141 |row| Ok((row.get(0)?, row.get(1)?)),
142 )
143 .optional()
144 .map_err(sqlite_error)?;
145 if let Some((turn_commit_hash, result_json)) = prior {
146 if turn_commit_hash == completed.turn_commit_hash {
147 let result: RuntimeCommitResult =
148 serde_json::from_str(&result_json).map_err(|err| {
149 StoreError::Backend(format!(
150 "failed to decode runtime turn commit result: {err}"
151 ))
152 })?;
153 if let Some(completion) =
154 commit.release_session_execution_lease.as_ref()
155 {
156 release_session_execution_lease_conn(tx, completion)?;
157 }
158 return Ok(result);
159 }
160 return Err(StoreError::RuntimeTurnCommitConflict {
161 session_id: completed.session_id.clone(),
162 turn_id: completed.turn_id.clone(),
163 });
164 }
165 }
166 let Some(session_execution_lease) = commit.session_execution_lease.as_ref()
167 else {
168 return Err(StoreError::SessionExecutionLeaseExpired {
169 session_id: commit.session_id.clone(),
170 });
171 };
172 ensure_session_execution_lease_conn(
173 tx,
174 &commit.session_id,
175 session_execution_lease,
176 )?;
177 let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
178 if commit.expected_head_revision.is_some()
179 && commit.expected_head_revision != Some(actual_revision)
180 {
181 return Err(StoreError::HeadRevisionConflict {
182 expected: commit.expected_head_revision,
183 actual: actual_revision,
184 });
185 }
186 for completed in &commit.completed_queue_claims {
187 if completed.session_id != commit.session_id {
188 return Err(StoreError::QueuedWorkClaimExpired {
189 session_id: completed.session_id.clone(),
190 claim_id: completed.claim_id.clone(),
191 });
192 }
193 ensure_queued_work_completion_conn(tx, completed)?;
194 }
195 for completed in &commit.completed_turn_input_claims {
196 if completed.session_id != commit.session_id {
197 return Err(StoreError::TurnInputClaimExpired {
198 session_id: completed.session_id.clone(),
199 claim_id: completed.claim_id.clone(),
200 });
201 }
202 let owned_rows: usize = tx
203 .query_row(
204 "SELECT COUNT(*)
205 FROM pending_turn_inputs
206 WHERE session_id = ?1
207 AND claim_id = ?2
208 AND claim_token = ?3",
209 params![
210 completed.session_id,
211 completed.claim_id,
212 completed.lease_token
213 ],
214 |row| row.get::<_, i64>(0),
215 )
216 .map_err(sqlite_error)? as usize;
217 ensure_turn_input_completion_owns_all_inputs(completed, owned_rows)?;
218 }
219
220 let stored_checkpoint =
221 Self::put_checkpoint_conn(tx, &commit.checkpoint, blob_profile)
222 .map_err(sqlite_error)?;
223
224 if !commit.usage_deltas.is_empty() {
225 let mut stmt = tx
226 .prepare(
227 "INSERT INTO usage_deltas (
228 source, model, input_tokens, output_tokens, cache_read_input_tokens, cache_write_input_tokens, reasoning_output_tokens
229 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
230 )
231 .map_err(sqlite_error)?;
232 for entry in &commit.usage_deltas {
233 stmt.execute(params![
234 entry.source,
235 entry.model,
236 entry.usage.input_tokens,
237 entry.usage.output_tokens,
238 entry.usage.cache_read_input_tokens,
239 entry.usage.cache_write_input_tokens,
240 entry.usage.reasoning_output_tokens,
241 ])
242 .map_err(sqlite_error)?;
243 }
244 }
245
246 let leaf_node_id = match &commit.graph {
247 GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
248 GraphCommitDelta::Append {
249 nodes,
250 leaf_node_id,
251 } => {
252 for node in nodes {
253 let node_json = encode_json(node);
254 tx.execute(
255 "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
256 params![node.node_id, node_json],
257 )
258 .map_err(sqlite_error)?;
259 }
260 leaf_node_id.clone()
261 }
262 GraphCommitDelta::ReplaceFull(graph) => {
263 tx.execute("DELETE FROM graph_nodes", [])
264 .map_err(sqlite_error)?;
265 for node in &graph.nodes {
266 let node_json = encode_json(node);
267 tx.execute(
268 "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
269 params![node.node_id, node_json],
270 )
271 .map_err(sqlite_error)?;
272 }
273 graph.leaf_node_id.clone()
274 }
275 };
276 let graph_node_count: usize = tx
277 .query_row(
278 "SELECT COUNT(*) FROM graph_nodes WHERE tombstoned = 0",
279 [],
280 |row| row.get::<_, i64>(0),
281 )
282 .map_err(sqlite_error)? as usize;
283 let next_revision = actual_revision + 1;
284 let meta = SessionHeadMeta {
285 schema_version: lash_core::store::SESSION_HEAD_META_SCHEMA_VERSION,
286 session_id: commit.session_id.clone(),
287 head_revision: next_revision,
288 config: commit.config.clone(),
289 agent_frames: commit.agent_frames.clone(),
290 current_agent_frame_id: commit.current_agent_frame_id.clone(),
291 checkpoint_ref: Some(stored_checkpoint.checkpoint_ref.clone()),
292 leaf_node_id,
293 graph_node_count,
294 token_ledger: Vec::new(),
295 };
296 tx.execute(
297 "INSERT OR REPLACE INTO session_head (singleton, session_id, head_json, head_revision)
298 VALUES (1, ?1, ?2, ?3)",
299 params![
300 meta.session_id,
301 encode_json(&meta),
302 meta.head_revision as i64
303 ],
304 )
305 .map_err(sqlite_error)?;
306 for completed in &commit.completed_queue_claims {
307 for batch_id in &completed.batch_ids {
308 tx.execute(
309 "DELETE FROM queued_work_batches
310 WHERE session_id = ?1
311 AND batch_id = ?2
312 AND claim_id = ?3
313 AND claim_token = ?4",
314 params![
315 completed.session_id,
316 batch_id,
317 completed.claim_id,
318 completed.lease_token
319 ],
320 )
321 .map_err(sqlite_error)?;
322 }
323 }
324 for completed in &commit.completed_turn_input_claims {
325 for input_id in &completed.input_ids {
326 tx.execute(
327 "UPDATE pending_turn_inputs
328 SET state = ?5,
329 claim_id = NULL,
330 claim_owner_id = NULL,
331 claim_owner_incarnation_id = NULL,
332 claim_owner_liveness_json = NULL,
333 claim_token = NULL,
334 claim_claimed_at_ms = 0,
335 claim_expires_at_ms = 0
336 WHERE session_id = ?1
337 AND input_id = ?2
338 AND claim_id = ?3
339 AND claim_token = ?4",
340 params![
341 completed.session_id,
342 input_id,
343 completed.claim_id,
344 completed.lease_token,
345 lash_core::TurnInputState::Completed.as_str(),
346 ],
347 )
348 .map_err(sqlite_error)?;
349 }
350 }
351 if let Some(turn_id) = commit.interrupted_turn_input_turn_id.as_deref() {
352 let input_ids = {
353 let mut stmt = tx
354 .prepare(
355 "SELECT input_id, ingress_json
356 FROM pending_turn_inputs
357 WHERE session_id = ?1 AND state = ?2",
358 )
359 .map_err(sqlite_error)?;
360 let rows = stmt
361 .query_map(
362 params![
363 commit.session_id,
364 lash_core::TurnInputState::PendingActive.as_str()
365 ],
366 |row| {
367 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
368 },
369 )
370 .map_err(sqlite_error)?;
371 let mut input_ids = Vec::new();
372 for row in rows {
373 let (input_id, ingress_json) = row.map_err(sqlite_error)?;
374 let ingress = decode_turn_input_ingress(ingress_json)?;
375 if ingress
376 .active_turn_id()
377 .is_some_and(|active| active == turn_id)
378 {
379 input_ids.push(input_id);
380 }
381 }
382 input_ids
383 };
384 let next_turn_ingress = encode_json(&lash_core::TurnInputIngress::NextTurn);
385 let mut stmt = tx
386 .prepare(
387 "UPDATE pending_turn_inputs
388 SET state = ?3,
389 ingress_json = ?4,
390 claim_id = NULL,
391 claim_owner_id = NULL,
392 claim_owner_incarnation_id = NULL,
393 claim_owner_liveness_json = NULL,
394 claim_token = NULL,
395 claim_claimed_at_ms = 0,
396 claim_expires_at_ms = 0
397 WHERE session_id = ?1 AND input_id = ?2",
398 )
399 .map_err(sqlite_error)?;
400 for input_id in input_ids {
401 stmt.execute(params![
402 commit.session_id,
403 input_id,
404 lash_core::TurnInputState::DeferredNextTurn.as_str(),
405 next_turn_ingress
406 ])
407 .map_err(sqlite_error)?;
408 }
409 }
410 if !commit.committed_attachment_ids.is_empty() {
411 let now = current_epoch_ms() as i64;
412 let mut stmt = tx
413 .prepare(
414 "UPDATE attachment_manifest
415 SET committed_at_ms = COALESCE(committed_at_ms, ?1)
416 WHERE attachment_id = ?2 AND session_id = ?3",
417 )
418 .map_err(sqlite_error)?;
419 for id in &commit.committed_attachment_ids {
420 stmt.execute(params![now, id.as_str(), commit.session_id])
421 .map_err(sqlite_error)?;
422 }
423 }
424 let result = RuntimeCommitResult {
425 head_revision: next_revision,
426 checkpoint_ref: stored_checkpoint.checkpoint_ref,
427 manifest: stored_checkpoint.manifest,
428 };
429 if let Some(completed) = &commit.turn_commit {
430 tx.execute(
431 "INSERT INTO runtime_turn_commits (
432 session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
433 )
434 VALUES (?1, ?2, ?3, ?4, ?5)",
435 params![
436 completed.session_id,
437 completed.turn_id,
438 completed.turn_commit_hash,
439 encode_json(&result),
440 current_epoch_ms() as i64
441 ],
442 )
443 .map_err(sqlite_error)?;
444 }
445 if let Some(completion) = commit.release_session_execution_lease.as_ref() {
446 release_session_execution_lease_conn(tx, completion)?;
447 }
448 Ok(result)
449 })();
450 match outcome {
455 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
456 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
457 }
458 })
459 .await
460 .map_err(sqlite_error)??;
461 self.maybe_auto_gc().await;
462 Ok(result)
463 }
464
465 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
466 Store::save_session_meta(self, meta).await;
467 Ok(())
468 }
469
470 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
471 Ok(Store::load_session_meta(self).await)
472 }
473}
474
475#[async_trait::async_trait]
476impl SessionExecutionLeaseStore for Store {
477 async fn try_claim_session_execution_lease(
478 &self,
479 session_id: &str,
480 owner: &LeaseOwnerIdentity,
481 lease_ttl_ms: u64,
482 ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError> {
483 let session_id = session_id.to_string();
484 let owner = owner.clone();
485 self.conn
486 .write_flow(move |tx| {
487 let outcome: Result<SessionExecutionLeaseClaimOutcome, StoreError> = (|| {
488 let now = current_epoch_ms();
489 let current = load_session_execution_lease_row_conn(tx, &session_id)?;
490 if current.as_ref().is_some_and(|lease| {
491 lease.lease_token.is_some() && lease.expires_at_ms > now
492 }) {
493 let current = current.expect("checked current lease is present");
494 if current
495 .owner
496 .as_ref()
497 .is_some_and(|current_owner| current_owner.same_incarnation(&owner))
498 {
499 let expires_at = now.saturating_add(lease_ttl_ms);
500 tx.execute(
501 "UPDATE session_execution_leases
502 SET lease_expires_at_ms = ?2
503 WHERE session_id = ?1",
504 params![session_id, expires_at as i64],
505 )
506 .map_err(sqlite_error)?;
507 return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
508 SessionExecutionLease {
509 session_id,
510 owner,
511 lease_token: current.lease_token.expect("live lease token set"),
512 fencing_token: current.fencing_token,
513 claimed_at_epoch_ms: current.claimed_at_ms,
514 expires_at_epoch_ms: expires_at,
515 },
516 ));
517 }
518 return Ok(SessionExecutionLeaseClaimOutcome::Busy {
519 holder: row_to_session_execution_lease(&session_id, current)?,
520 });
521 }
522 Ok(SessionExecutionLeaseClaimOutcome::Acquired(
523 acquire_session_execution_lease_conn(
524 tx,
525 &session_id,
526 &owner,
527 current.as_ref().map_or(0, |lease| lease.fencing_token),
528 now,
529 lease_ttl_ms,
530 )?,
531 ))
532 })(
533 );
534 match outcome {
535 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
536 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
537 }
538 })
539 .await
540 .map_err(sqlite_error)?
541 }
542
543 async fn reclaim_session_execution_lease(
544 &self,
545 session_id: &str,
546 owner: &LeaseOwnerIdentity,
547 observed_holder: &SessionExecutionLeaseFence,
548 lease_ttl_ms: u64,
549 ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError> {
550 let session_id = session_id.to_string();
551 let owner = owner.clone();
552 let observed_holder = observed_holder.clone();
553 self.conn
554 .write_flow(move |tx| {
555 let outcome: Result<SessionExecutionLeaseClaimOutcome, StoreError> = (|| {
556 let now = current_epoch_ms();
557 let current = load_session_execution_lease_row_conn(tx, &session_id)?;
558 let Some(current) = current else {
559 return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
560 acquire_session_execution_lease_conn(
561 tx,
562 &session_id,
563 &owner,
564 0,
565 now,
566 lease_ttl_ms,
567 )?,
568 ));
569 };
570 if current.lease_token.is_none() || current.expires_at_ms <= now {
571 return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
572 acquire_session_execution_lease_conn(
573 tx,
574 &session_id,
575 &owner,
576 current.fencing_token,
577 now,
578 lease_ttl_ms,
579 )?,
580 ));
581 }
582 let holder = row_to_session_execution_lease(&session_id, current)?;
583 if observed_holder.session_id == session_id
584 && holder.owner.same_incarnation(&observed_holder.owner)
585 && holder.lease_token == observed_holder.lease_token
586 && holder.fencing_token == observed_holder.fencing_token
587 && holder.owner.is_definitely_dead_for_claimant(&owner)
588 {
589 let fencing_token = holder.fencing_token.saturating_add(1);
590 let lease_token = format!(
591 "{}:{}:{}:{now}:{fencing_token}",
592 session_id, owner.owner_id, owner.incarnation_id
593 );
594 let expires_at = now.saturating_add(lease_ttl_ms);
595 let liveness_json = encode_liveness(&owner.liveness)?;
596 let changed = tx
597 .execute(
598 "UPDATE session_execution_leases
599 SET lease_owner_id = ?1,
600 lease_owner_incarnation_id = ?2,
601 lease_owner_liveness_json = ?3,
602 lease_token = ?4,
603 lease_fencing_token = ?5,
604 lease_claimed_at_ms = ?6,
605 lease_expires_at_ms = ?7
606 WHERE session_id = ?8
607 AND lease_owner_id = ?9
608 AND lease_owner_incarnation_id = ?10
609 AND lease_token = ?11
610 AND lease_fencing_token = ?12",
611 params![
612 owner.owner_id,
613 owner.incarnation_id,
614 liveness_json,
615 lease_token,
616 fencing_token as i64,
617 now as i64,
618 expires_at as i64,
619 session_id,
620 observed_holder.owner.owner_id,
621 observed_holder.owner.incarnation_id,
622 observed_holder.lease_token,
623 observed_holder.fencing_token as i64,
624 ],
625 )
626 .map_err(sqlite_error)?;
627 if changed == 1 {
628 return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
629 SessionExecutionLease {
630 session_id,
631 owner,
632 lease_token,
633 fencing_token,
634 claimed_at_epoch_ms: now,
635 expires_at_epoch_ms: expires_at,
636 },
637 ));
638 }
639 let current = load_session_execution_lease_row_conn(tx, &session_id)?;
640 if current.as_ref().is_some_and(|lease| {
641 lease.lease_token.is_some() && lease.expires_at_ms > now
642 }) {
643 let current = current.expect("checked current lease is present");
644 return Ok(SessionExecutionLeaseClaimOutcome::Busy {
645 holder: row_to_session_execution_lease(&session_id, current)?,
646 });
647 }
648 let previous_fencing_token =
649 current.as_ref().map_or(0, |lease| lease.fencing_token);
650 return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
651 acquire_session_execution_lease_conn(
652 tx,
653 &session_id,
654 &owner,
655 previous_fencing_token,
656 now,
657 lease_ttl_ms,
658 )?,
659 ));
660 }
661 Ok(SessionExecutionLeaseClaimOutcome::Busy { holder })
662 })(
663 );
664 match outcome {
665 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
666 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
667 }
668 })
669 .await
670 .map_err(sqlite_error)?
671 }
672
673 async fn renew_session_execution_lease(
674 &self,
675 fence: &SessionExecutionLeaseFence,
676 lease_ttl_ms: u64,
677 ) -> Result<SessionExecutionLease, StoreError> {
678 let fence = fence.clone();
679 self.conn
680 .write_flow(move |tx| {
681 let outcome: Result<SessionExecutionLease, StoreError> = (|| {
682 let now = current_epoch_ms();
683 let current = load_session_execution_lease_row_conn(tx, &fence.session_id)?;
684 let Some(current) = current else {
685 return Err(StoreError::SessionExecutionLeaseExpired {
686 session_id: fence.session_id.clone(),
687 });
688 };
689 if !current
690 .owner
691 .as_ref()
692 .is_some_and(|owner| owner.same_incarnation(&fence.owner))
693 || current.lease_token.as_deref() != Some(fence.lease_token.as_str())
694 || current.fencing_token != fence.fencing_token
695 || current.expires_at_ms <= now
696 {
697 return Err(StoreError::SessionExecutionLeaseExpired {
698 session_id: fence.session_id.clone(),
699 });
700 }
701 let expires_at = now.saturating_add(lease_ttl_ms);
702 tx.execute(
703 "UPDATE session_execution_leases
704 SET lease_expires_at_ms = ?5
705 WHERE session_id = ?1
706 AND lease_owner_id = ?2
707 AND lease_owner_incarnation_id = ?3
708 AND lease_token = ?4
709 AND lease_fencing_token = ?6",
710 params![
711 fence.session_id,
712 fence.owner.owner_id,
713 fence.owner.incarnation_id,
714 fence.lease_token,
715 expires_at as i64,
716 fence.fencing_token as i64
717 ],
718 )
719 .map_err(sqlite_error)?;
720 Ok(SessionExecutionLease {
721 session_id: fence.session_id,
722 owner: fence.owner,
723 lease_token: fence.lease_token,
724 fencing_token: fence.fencing_token,
725 claimed_at_epoch_ms: current.claimed_at_ms,
726 expires_at_epoch_ms: expires_at,
727 })
728 })();
729 match outcome {
730 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
731 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
732 }
733 })
734 .await
735 .map_err(sqlite_error)?
736 }
737
738 async fn release_session_execution_lease(
739 &self,
740 completion: &SessionExecutionLeaseCompletion,
741 ) -> Result<(), StoreError> {
742 let completion = completion.clone();
743 self.conn
744 .write_flow(move |tx| {
745 let outcome = release_session_execution_lease_conn(tx, &completion);
746 match outcome {
747 Ok(()) => Ok(TxOutcome::Commit(Ok(()))),
748 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
749 }
750 })
751 .await
752 .map_err(sqlite_error)?
753 }
754}
755
756#[async_trait::async_trait]
757impl QueuedWorkStore for Store {
758 async fn enqueue_queued_work(
759 &self,
760 batch: QueuedWorkBatchDraft,
761 ) -> Result<QueuedWorkBatch, StoreError> {
762 let nonce = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed);
763 self.conn
764 .write_flow(move |tx| {
765 let outcome: Result<QueuedWorkBatch, StoreError> = (|| {
766 if let Some(source_key) = batch.source_key.as_deref() {
767 let existing_id: Option<String> = tx
768 .query_row(
769 "SELECT batch_id
770 FROM queued_work_batches
771 WHERE session_id = ?1 AND source_key = ?2",
772 params![batch.session_id, source_key],
773 |row| row.get(0),
774 )
775 .optional()
776 .map_err(sqlite_error)?;
777 if let Some(batch_id) = existing_id {
778 let existing = load_queued_batch_by_id_conn(tx, &batch_id)?
779 .ok_or_else(|| {
780 StoreError::Backend(
781 "queued work source row disappeared".to_string(),
782 )
783 })?;
784 return Ok(existing);
785 }
786 }
787 let now = current_epoch_ms();
788 let batch_id =
789 derive_batch_id(&batch.session_id, batch.source_key.as_deref(), now, Some(nonce));
790 tx.execute(
791 "INSERT INTO queued_work_batches (
792 batch_id, session_id, source_key, delivery_policy, slot_policy,
793 merge_key_json, available_at_ms, enqueued_at_ms
794 )
795 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
796 params![
797 batch_id,
798 batch.session_id,
799 batch.source_key.as_deref(),
800 batch.delivery_policy.as_str(),
801 batch.slot_policy.as_str(),
802 encode_json(&batch.merge_key),
803 batch.available_at_ms as i64,
804 now as i64,
805 ],
806 )
807 .map_err(sqlite_error)?;
808 for (index, payload) in batch.payloads.iter().enumerate() {
809 let item_id = format!("{batch_id}:item:{index}");
810 tx.execute(
811 "INSERT INTO queued_work_items (batch_id, item_index, item_id, payload_json)
812 VALUES (?1, ?2, ?3, ?4)",
813 params![batch_id, index as i64, item_id, encode_json(payload)],
814 )
815 .map_err(sqlite_error)?;
816 }
817 load_queued_batch_by_id_conn(tx, &batch_id)?.ok_or_else(|| {
818 StoreError::Backend("queued work insert disappeared".to_string())
819 })
820 })();
821 match outcome {
824 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
825 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
826 }
827 })
828 .await
829 .map_err(sqlite_error)?
830 }
831
832 async fn claim_leading_ready_session_command(
833 &self,
834 session_id: &str,
835 session_execution_lease: &SessionExecutionLeaseFence,
836 owner: &LeaseOwnerIdentity,
837 lease_ttl_ms: u64,
838 ) -> Result<Option<QueuedWorkClaim>, StoreError> {
839 let session_id = session_id.to_string();
840 let session_execution_lease = session_execution_lease.clone();
841 let owner = owner.clone();
842 self.conn
843 .write_flow(move |tx| {
844 let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
845 ensure_session_execution_lease_conn(
846 tx,
847 &session_id,
848 &session_execution_lease,
849 )?;
850 let now = current_epoch_ms();
851 let candidate_rows = {
852 let mut stmt = tx
853 .prepare(
854 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
855 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
856 claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
857 claim_owner_liveness_json, claim_token, claim_expires_at_ms
858 FROM queued_work_batches
859 WHERE session_id = ?1
860 AND available_at_ms <= ?2
861 ORDER BY enqueue_seq ASC
862 LIMIT ?3",
863 )
864 .map_err(sqlite_error)?;
865 let rows = stmt
866 .query_map(
867 params![session_id, now as i64, claim_scan_limit(1)],
868 queued_batch_row_from_sql,
869 )
870 .map_err(sqlite_error)?;
871 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
872 };
873 let candidate_rows = candidate_rows
874 .into_iter()
875 .filter(|row| {
876 row.claim_token.is_none()
877 || row.claim_expires_at_ms <= now
878 || row
879 .claim_owner
880 .as_ref()
881 .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner))
882 })
883 .collect::<Vec<_>>();
884 let candidate_batches = candidate_rows
885 .iter()
886 .map(|row| queued_work_batch_from_conn(tx, row.clone()))
887 .collect::<Result<Vec<_>, StoreError>>()?;
888 let candidates = candidate_rows
889 .iter()
890 .zip(candidate_batches.iter())
891 .map(|(row, batch)| {
892 Ok(ClaimCandidate {
893 enqueue_seq: row.enqueue_seq,
894 claim_fencing_token: row.claim_fencing_token,
895 work_class: batch.work_class().ok_or_else(|| {
896 StoreError::Backend(format!(
897 "queued-work batch `{}` has mixed or empty payload classes",
898 batch.batch_id
899 ))
900 })?,
901 delivery_policy: decode_delivery_policy(
902 row.delivery_policy.clone(),
903 )?,
904 slot_policy: decode_slot_policy(row.slot_policy.clone())?,
905 merge_key: decode_merge_key(row.merge_key_json.clone())?,
906 })
907 })
908 .collect::<Result<Vec<_>, StoreError>>()?;
909 let selected_len = select_leading_session_command(&candidates);
910 if selected_len == 0 {
911 return Ok(TxOutcome::Commit(None));
912 }
913 let mut selected = candidate_rows;
914 selected.truncate(selected_len);
915 let mut selected_batches = candidate_batches;
916 selected_batches.truncate(selected_len);
917 let lease = QueuedWorkClaimLease::derive(
918 &candidates[0],
919 &session_id,
920 &owner,
921 now,
922 lease_ttl_ms,
923 );
924 let liveness_json = encode_liveness(&owner.liveness)?;
925 for row in &selected {
926 let claimed = tx
927 .execute(
928 "UPDATE queued_work_batches
929 SET claim_id = ?3,
930 claim_owner_id = ?4,
931 claim_owner_incarnation_id = ?5,
932 claim_owner_liveness_json = ?6,
933 claim_token = ?7,
934 claim_fencing_token = claim_fencing_token + 1,
935 claim_claimed_at_ms = ?8,
936 claim_expires_at_ms = ?9
937 WHERE session_id = ?1
938 AND batch_id = ?2
939 AND (
940 claim_token IS NULL
941 OR claim_expires_at_ms <= ?8
942 OR (
943 claim_token = ?10
944 AND claim_owner_id = ?11
945 AND claim_owner_incarnation_id = ?12
946 )
947 )",
948 params![
949 session_id,
950 row.batch_id,
951 lease.claim_id,
952 owner.owner_id.as_str(),
953 owner.incarnation_id.as_str(),
954 liveness_json.as_str(),
955 lease.lease_token,
956 now as i64,
957 lease.expires_at_epoch_ms as i64,
958 row.claim_token,
959 row.claim_owner.as_ref().map(|owner| owner.owner_id.as_str()),
960 row.claim_owner
961 .as_ref()
962 .map(|owner| owner.incarnation_id.as_str())
963 ],
964 )
965 .map_err(sqlite_error)?;
966 if claimed == 0 {
967 return Ok(TxOutcome::Rollback(None));
968 }
969 }
970 Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
971 session_id: session_id.clone(),
972 claim_id: lease.claim_id,
973 owner: owner.clone(),
974 lease_token: lease.lease_token,
975 fencing_token: lease.fencing_token,
976 claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
977 expires_at_epoch_ms: lease.expires_at_epoch_ms,
978 batches: selected_batches,
979 })))
980 })();
981 match outcome {
982 Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
983 Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
984 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
985 }
986 })
987 .await
988 .map_err(sqlite_error)?
989 }
990
991 async fn claim_ready_queued_work(
992 &self,
993 session_id: &str,
994 session_execution_lease: &SessionExecutionLeaseFence,
995 owner: &LeaseOwnerIdentity,
996 boundary: QueuedWorkClaimBoundary,
997 lease_ttl_ms: u64,
998 max_batches: usize,
999 ) -> Result<Option<QueuedWorkClaim>, StoreError> {
1000 if max_batches == 0 {
1001 return Ok(None);
1002 }
1003 let session_id = session_id.to_string();
1004 let session_execution_lease = session_execution_lease.clone();
1005 let owner = owner.clone();
1006 self.conn
1007 .write_flow(move |tx| {
1008 let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
1009 ensure_session_execution_lease_conn(
1010 tx,
1011 &session_id,
1012 &session_execution_lease,
1013 )?;
1014 let now = current_epoch_ms();
1015 let candidate_rows = {
1016 let mut stmt = tx
1017 .prepare(
1018 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1019 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1020 claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1021 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1022 FROM queued_work_batches
1023 WHERE session_id = ?1
1024 AND available_at_ms <= ?2
1025 ORDER BY enqueue_seq ASC
1026 LIMIT ?3",
1027 )
1028 .map_err(sqlite_error)?;
1029 let rows = stmt
1030 .query_map(
1031 params![session_id, now as i64, claim_scan_limit(max_batches)],
1032 queued_batch_row_from_sql,
1033 )
1034 .map_err(sqlite_error)?;
1035 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1036 };
1037 let candidate_rows = candidate_rows
1038 .into_iter()
1039 .filter(|row| {
1040 row.claim_token.is_none()
1041 || row.claim_expires_at_ms <= now
1042 || row
1043 .claim_owner
1044 .as_ref()
1045 .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner))
1046 })
1047 .collect::<Vec<_>>();
1048 let candidate_batches = candidate_rows
1049 .iter()
1050 .map(|row| queued_work_batch_from_conn(tx, row.clone()))
1051 .collect::<Result<Vec<_>, StoreError>>()?;
1052 let candidates = candidate_rows
1053 .iter()
1054 .zip(candidate_batches.iter())
1055 .map(|(row, batch)| {
1056 Ok(ClaimCandidate {
1057 enqueue_seq: row.enqueue_seq,
1058 claim_fencing_token: row.claim_fencing_token,
1059 work_class: batch.work_class().ok_or_else(|| {
1060 StoreError::Backend(format!(
1061 "queued-work batch `{}` has mixed or empty payload classes",
1062 batch.batch_id
1063 ))
1064 })?,
1065 delivery_policy: decode_delivery_policy(
1066 row.delivery_policy.clone(),
1067 )?,
1068 slot_policy: decode_slot_policy(row.slot_policy.clone())?,
1069 merge_key: decode_merge_key(row.merge_key_json.clone())?,
1070 })
1071 })
1072 .collect::<Result<Vec<_>, StoreError>>()?;
1073 let selected_len =
1074 select_turn_work_claim_prefix(&candidates, boundary, max_batches);
1075 if selected_len == 0 {
1076 return Ok(TxOutcome::Commit(None));
1077 }
1078 let mut selected = candidate_rows;
1079 selected.truncate(selected_len);
1080 let mut selected_batches = candidate_batches;
1081 selected_batches.truncate(selected_len);
1082 let lease = QueuedWorkClaimLease::derive(
1083 &candidates[0],
1084 &session_id,
1085 &owner,
1086 now,
1087 lease_ttl_ms,
1088 );
1089 let liveness_json = encode_liveness(&owner.liveness)?;
1090 for row in &selected {
1091 let claimed = tx
1100 .execute(
1101 "UPDATE queued_work_batches
1102 SET claim_id = ?3,
1103 claim_owner_id = ?4,
1104 claim_owner_incarnation_id = ?5,
1105 claim_owner_liveness_json = ?6,
1106 claim_token = ?7,
1107 claim_fencing_token = claim_fencing_token + 1,
1108 claim_claimed_at_ms = ?8,
1109 claim_expires_at_ms = ?9
1110 WHERE session_id = ?1
1111 AND batch_id = ?2
1112 AND (
1113 claim_token IS NULL
1114 OR claim_expires_at_ms <= ?8
1115 OR (
1116 claim_token = ?10
1117 AND claim_owner_id = ?11
1118 AND claim_owner_incarnation_id = ?12
1119 )
1120 )",
1121 params![
1122 session_id,
1123 row.batch_id,
1124 lease.claim_id,
1125 owner.owner_id.as_str(),
1126 owner.incarnation_id.as_str(),
1127 liveness_json.as_str(),
1128 lease.lease_token,
1129 now as i64,
1130 lease.expires_at_epoch_ms as i64,
1131 row.claim_token,
1132 row.claim_owner.as_ref().map(|owner| owner.owner_id.as_str()),
1133 row.claim_owner
1134 .as_ref()
1135 .map(|owner| owner.incarnation_id.as_str())
1136 ],
1137 )
1138 .map_err(sqlite_error)?;
1139 if claimed == 0 {
1140 return Ok(TxOutcome::Rollback(None));
1144 }
1145 }
1146 Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
1147 session_id: session_id.clone(),
1148 claim_id: lease.claim_id,
1149 owner: owner.clone(),
1150 lease_token: lease.lease_token,
1151 fencing_token: lease.fencing_token,
1152 claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
1153 expires_at_epoch_ms: lease.expires_at_epoch_ms,
1154 batches: selected_batches,
1155 })))
1156 })();
1157 match outcome {
1161 Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
1162 Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
1163 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1164 }
1165 })
1166 .await
1167 .map_err(sqlite_error)?
1168 }
1169
1170 async fn renew_queued_work_claim(
1171 &self,
1172 claim: &QueuedWorkClaim,
1173 lease_ttl_ms: u64,
1174 ) -> Result<QueuedWorkClaim, StoreError> {
1175 let now = current_epoch_ms();
1176 let expires_at = now.saturating_add(lease_ttl_ms);
1177 let session_id = claim.session_id.clone();
1178 let claim_id = claim.claim_id.clone();
1179 let lease_token = claim.lease_token.clone();
1180 let changed = self
1181 .conn
1182 .write(move |tx| {
1183 tx.execute(
1184 "UPDATE queued_work_batches
1185 SET claim_expires_at_ms = ?4
1186 WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1187 params![session_id, claim_id, lease_token, expires_at as i64],
1188 )
1189 })
1190 .await
1191 .map_err(sqlite_error)?;
1192 renewed_claim(claim, changed, expires_at)
1193 }
1194
1195 async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1196 let session_id = claim.session_id.clone();
1197 let claim_id = claim.claim_id.clone();
1198 let lease_token = claim.lease_token.clone();
1199 self.conn
1200 .write(move |tx| {
1201 tx.execute(
1202 "UPDATE queued_work_batches
1203 SET claim_id = NULL,
1204 claim_owner_id = NULL,
1205 claim_owner_incarnation_id = NULL,
1206 claim_owner_liveness_json = NULL,
1207 claim_token = NULL,
1208 claim_claimed_at_ms = 0,
1209 claim_expires_at_ms = 0
1210 WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1211 params![session_id, claim_id, lease_token],
1212 )
1213 })
1214 .await
1215 .map_err(sqlite_error)?;
1216 Ok(())
1217 }
1218
1219 async fn cancel_queued_work_batch(
1220 &self,
1221 session_id: &str,
1222 batch_id: &str,
1223 ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1224 let session_id = session_id.to_string();
1225 let batch_id = batch_id.to_string();
1226 self.conn
1227 .write_flow(move |tx| {
1228 let outcome: Result<Option<QueuedWorkBatch>, StoreError> = (|| {
1229 let now = current_epoch_ms() as i64;
1230 let row = tx
1231 .query_row(
1232 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1233 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1234 claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1235 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1236 FROM queued_work_batches
1237 WHERE session_id = ?1
1238 AND batch_id = ?2
1239 AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
1240 params![session_id, batch_id, now],
1241 queued_batch_row_from_sql,
1242 )
1243 .optional()
1244 .map_err(sqlite_error)?;
1245 let Some(row) = row else {
1246 return Ok(None);
1247 };
1248 let batch = queued_work_batch_from_conn(tx, row)?;
1249 tx.execute(
1250 "DELETE FROM queued_work_batches
1251 WHERE session_id = ?1
1252 AND batch_id = ?2
1253 AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
1254 params![session_id, batch_id, now],
1255 )
1256 .map_err(sqlite_error)?;
1257 Ok(Some(batch))
1258 })();
1259 match outcome {
1260 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1261 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1262 }
1263 })
1264 .await
1265 .map_err(sqlite_error)?
1266 }
1267
1268 async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1269 let session_id = session_id.to_string();
1270 self.conn
1271 .call(move |conn| {
1272 let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
1273 let rows = {
1274 let mut stmt = conn
1275 .prepare(
1276 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1277 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1278 claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1279 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1280 FROM queued_work_batches
1281 WHERE session_id = ?1
1282 ORDER BY enqueue_seq ASC",
1283 )
1284 .map_err(sqlite_error)?;
1285 let rows = stmt
1286 .query_map(params![session_id], queued_batch_row_from_sql)
1287 .map_err(sqlite_error)?;
1288 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1289 };
1290 rows.into_iter()
1291 .map(|row| queued_work_batch_from_conn(conn, row))
1292 .collect()
1293 })();
1294 Ok(outcome)
1295 })
1296 .await
1297 .map_err(sqlite_error)?
1298 }
1299
1300 async fn list_pending_queued_work(
1301 &self,
1302 session_id: &str,
1303 ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1304 let session_id = session_id.to_string();
1305 self.conn
1306 .call(move |conn| {
1307 let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
1308 let now = current_epoch_ms();
1309 let rows = {
1310 let mut stmt = conn
1311 .prepare(
1312 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1313 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1314 claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1315 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1316 FROM queued_work_batches
1317 WHERE session_id = ?1
1318 AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
1319 ORDER BY enqueue_seq ASC",
1320 )
1321 .map_err(sqlite_error)?;
1322 let rows = stmt
1323 .query_map(
1324 params![session_id, now as i64],
1325 queued_batch_row_from_sql,
1326 )
1327 .map_err(sqlite_error)?;
1328 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1329 };
1330 rows.into_iter()
1331 .map(|row| queued_work_batch_from_conn(conn, row))
1332 .collect()
1333 })();
1334 Ok(outcome)
1335 })
1336 .await
1337 .map_err(sqlite_error)?
1338 }
1339}
1340
1341#[async_trait::async_trait]
1342impl TurnInputStore for Store {
1343 async fn enqueue_pending_turn_input(
1344 &self,
1345 draft: lash_core::PendingTurnInputDraft,
1346 ) -> Result<lash_core::PendingTurnInput, StoreError> {
1347 let nonce = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed);
1348 self.conn
1349 .write_flow(move |tx| {
1350 let outcome: Result<lash_core::PendingTurnInput, StoreError> = (|| {
1351 if let Some(source_key) = draft.source_key.as_deref() {
1352 let existing_id: Option<String> = tx
1353 .query_row(
1354 "SELECT input_id
1355 FROM pending_turn_inputs
1356 WHERE session_id = ?1 AND source_key = ?2",
1357 params![draft.session_id, source_key],
1358 |row| row.get(0),
1359 )
1360 .optional()
1361 .map_err(sqlite_error)?;
1362 if let Some(input_id) = existing_id {
1363 let existing = load_pending_turn_input_by_id_conn(
1364 tx,
1365 &draft.session_id,
1366 &input_id,
1367 )?
1368 .ok_or_else(|| {
1369 StoreError::Backend(
1370 "pending turn input source row disappeared".to_string(),
1371 )
1372 })?;
1373 if !draft.submitted_content_matches(&existing).map_err(|err| {
1374 StoreError::Backend(format!(
1375 "failed to compare pending turn input submission: {err}"
1376 ))
1377 })? {
1378 return Err(StoreError::PendingTurnInputSourceKeyConflict {
1379 session_id: draft.session_id.clone(),
1380 source_key: source_key.to_string(),
1381 existing_input_id: existing.input_id.clone(),
1382 });
1383 }
1384 return Ok(existing);
1385 }
1386 }
1387 let now = current_epoch_ms();
1388 let input_id = draft.input_id.clone().unwrap_or_else(|| {
1389 derive_pending_turn_input_id(
1390 &draft.session_id,
1391 draft.source_key.as_deref(),
1392 now,
1393 nonce,
1394 )
1395 });
1396 let state = match draft.ingress {
1397 lash_core::TurnInputIngress::ActiveTurn { .. } => {
1398 lash_core::TurnInputState::PendingActive
1399 }
1400 lash_core::TurnInputIngress::NextTurn => {
1401 lash_core::TurnInputState::DeferredNextTurn
1402 }
1403 };
1404 tx.execute(
1405 "INSERT INTO pending_turn_inputs (
1406 input_id, session_id, source_key, ingress_json, state,
1407 input_json, enqueued_at_ms
1408 )
1409 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1410 params![
1411 input_id,
1412 draft.session_id,
1413 draft.source_key.as_deref(),
1414 encode_json(&draft.ingress),
1415 state.as_str(),
1416 encode_json(&draft.input),
1417 now as i64,
1418 ],
1419 )
1420 .map_err(sqlite_error)?;
1421 load_pending_turn_input_by_id_conn(tx, &draft.session_id, &input_id)?
1422 .ok_or_else(|| {
1423 StoreError::Backend("pending turn input insert disappeared".to_string())
1424 })
1425 })();
1426 match outcome {
1427 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1428 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1429 }
1430 })
1431 .await
1432 .map_err(sqlite_error)?
1433 }
1434
1435 async fn list_pending_turn_inputs(
1436 &self,
1437 session_id: &str,
1438 ) -> Result<Vec<lash_core::PendingTurnInput>, StoreError> {
1439 let session_id = session_id.to_string();
1440 self.conn
1441 .call(move |conn| {
1442 let outcome: Result<Vec<lash_core::PendingTurnInput>, StoreError> = (|| {
1443 let now = current_epoch_ms();
1444 let rows = {
1445 let mut stmt = conn
1446 .prepare(
1447 "SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
1448 state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
1449 claim_owner_id, claim_owner_incarnation_id,
1450 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1451 FROM pending_turn_inputs
1452 WHERE session_id = ?1
1453 AND state IN (?2, ?3)
1454 AND (claim_token IS NULL OR claim_expires_at_ms <= ?4)
1455 ORDER BY enqueue_seq ASC",
1456 )
1457 .map_err(sqlite_error)?;
1458 let rows = stmt
1459 .query_map(
1460 params![
1461 session_id,
1462 lash_core::TurnInputState::PendingActive.as_str(),
1463 lash_core::TurnInputState::DeferredNextTurn.as_str(),
1464 now as i64
1465 ],
1466 pending_turn_input_row_from_sql,
1467 )
1468 .map_err(sqlite_error)?;
1469 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1470 };
1471 rows.into_iter().map(pending_turn_input_from_row).collect()
1472 })(
1473 );
1474 Ok(outcome)
1475 })
1476 .await
1477 .map_err(sqlite_error)?
1478 }
1479
1480 async fn cancel_pending_turn_inputs(
1481 &self,
1482 session_id: &str,
1483 targets: &[lash_core::PendingTurnInputCancelTarget],
1484 ) -> Result<Vec<lash_core::PendingTurnInputCancelResult>, StoreError> {
1485 let session_id = session_id.to_string();
1486 let targets = targets.to_vec();
1487 self.conn
1488 .write_flow(move |tx| {
1489 let outcome: Result<Vec<lash_core::PendingTurnInputCancelResult>, StoreError> =
1490 (|| {
1491 let now = current_epoch_ms();
1492 let mut results = Vec::with_capacity(targets.len());
1493 for target in targets {
1494 let outcome = match load_pending_turn_input_row_by_target_conn(
1495 tx,
1496 &session_id,
1497 &target,
1498 )? {
1499 Some(row) => cancel_pending_turn_input_row_conn(tx, row, now)?,
1500 None => lash_core::PendingTurnInputCancelOutcome::NotFound,
1501 };
1502 results
1503 .push(lash_core::PendingTurnInputCancelResult { target, outcome });
1504 }
1505 Ok(results)
1506 })();
1507 match outcome {
1508 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1509 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1510 }
1511 })
1512 .await
1513 .map_err(sqlite_error)?
1514 }
1515
1516 async fn cancel_pending_turn_input_suffix(
1517 &self,
1518 session_id: &str,
1519 anchor: &lash_core::PendingTurnInputCancelTarget,
1520 ) -> Result<lash_core::PendingTurnInputSuffixCancelOutcome, StoreError> {
1521 let session_id = session_id.to_string();
1522 let anchor = anchor.clone();
1523 self.conn
1524 .write_flow(move |tx| {
1525 let outcome: Result<lash_core::PendingTurnInputSuffixCancelOutcome, StoreError> =
1526 (|| {
1527 let now = current_epoch_ms();
1528 let Some(anchor_row) =
1529 load_pending_turn_input_row_by_target_conn(tx, &session_id, &anchor)?
1530 else {
1531 return Ok(
1532 lash_core::PendingTurnInputSuffixCancelOutcome::AnchorNotFound {
1533 anchor,
1534 },
1535 );
1536 };
1537 let rows = {
1538 let mut stmt = tx
1539 .prepare(
1540 "SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
1541 state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
1542 claim_owner_id, claim_owner_incarnation_id,
1543 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1544 FROM pending_turn_inputs
1545 WHERE session_id = ?1 AND enqueue_seq >= ?2
1546 ORDER BY enqueue_seq ASC",
1547 )
1548 .map_err(sqlite_error)?;
1549 let rows = stmt
1550 .query_map(
1551 params![session_id, anchor_row.enqueue_seq as i64],
1552 pending_turn_input_row_from_sql,
1553 )
1554 .map_err(sqlite_error)?;
1555 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1556 };
1557 let mut outcomes = Vec::with_capacity(rows.len());
1558 for row in rows {
1559 outcomes.push(cancel_pending_turn_input_row_conn(tx, row, now)?);
1560 }
1561 Ok(lash_core::PendingTurnInputSuffixCancelOutcome::Outcomes {
1562 anchor,
1563 outcomes,
1564 })
1565 })();
1566 match outcome {
1567 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1568 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1569 }
1570 })
1571 .await
1572 .map_err(sqlite_error)?
1573 }
1574
1575 async fn claim_active_turn_inputs(
1576 &self,
1577 session_id: &str,
1578 session_execution_lease: &SessionExecutionLeaseFence,
1579 owner: &LeaseOwnerIdentity,
1580 turn_id: &str,
1581 checkpoint: lash_core::CheckpointKind,
1582 lease_ttl_ms: u64,
1583 max_inputs: usize,
1584 ) -> Result<Option<lash_core::TurnInputClaim>, StoreError> {
1585 claim_pending_turn_inputs_sqlite(
1586 &self.conn,
1587 session_id,
1588 session_execution_lease,
1589 owner,
1590 lease_ttl_ms,
1591 max_inputs,
1592 lash_core::TurnInputClaimMode::ActiveTurn {
1593 turn_id: turn_id.to_string(),
1594 checkpoint,
1595 },
1596 )
1597 .await
1598 }
1599
1600 async fn claim_next_turn_inputs(
1601 &self,
1602 session_id: &str,
1603 session_execution_lease: &SessionExecutionLeaseFence,
1604 owner: &LeaseOwnerIdentity,
1605 lease_ttl_ms: u64,
1606 max_inputs: usize,
1607 ) -> Result<Option<lash_core::TurnInputClaim>, StoreError> {
1608 claim_pending_turn_inputs_sqlite(
1609 &self.conn,
1610 session_id,
1611 session_execution_lease,
1612 owner,
1613 lease_ttl_ms,
1614 max_inputs,
1615 lash_core::TurnInputClaimMode::NextTurn,
1616 )
1617 .await
1618 }
1619
1620 async fn abandon_turn_input_claim(
1621 &self,
1622 claim: &lash_core::TurnInputClaim,
1623 ) -> Result<(), StoreError> {
1624 let session_id = claim.session_id.clone();
1625 let claim_id = claim.claim_id.clone();
1626 let lease_token = claim.lease_token.clone();
1627 let restored_state = match claim.mode {
1628 lash_core::TurnInputClaimMode::ActiveTurn { .. } => {
1629 lash_core::TurnInputState::PendingActive
1630 }
1631 lash_core::TurnInputClaimMode::NextTurn => lash_core::TurnInputState::DeferredNextTurn,
1632 };
1633 self.conn
1634 .write(move |tx| {
1635 tx.execute(
1636 "UPDATE pending_turn_inputs
1637 SET state = CASE
1638 WHEN state = ?4 THEN ?5
1639 ELSE state
1640 END,
1641 claim_id = NULL,
1642 claim_owner_id = NULL,
1643 claim_owner_incarnation_id = NULL,
1644 claim_owner_liveness_json = NULL,
1645 claim_token = NULL,
1646 claim_claimed_at_ms = 0,
1647 claim_expires_at_ms = 0
1648 WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1649 params![
1650 session_id,
1651 claim_id,
1652 lease_token,
1653 lash_core::TurnInputState::Accepted.as_str(),
1654 restored_state.as_str(),
1655 ],
1656 )
1657 })
1658 .await
1659 .map_err(sqlite_error)?;
1660 Ok(())
1661 }
1662}
1663
1664#[async_trait::async_trait]
1665impl StoreMaintenance for Store {
1666 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1667 if ids.is_empty() {
1668 return Ok(());
1669 }
1670 let ids = ids.to_vec();
1671 self.conn
1672 .write(move |tx| {
1673 let mut stmt =
1674 tx.prepare("UPDATE graph_nodes SET tombstoned = 1 WHERE node_id = ?1")?;
1675 for id in &ids {
1676 stmt.execute(params![id])?;
1677 }
1678 Ok(())
1679 })
1680 .await
1681 .map_err(sqlite_error)
1682 }
1683
1684 async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1685 let (removed_node_count, removed_pending_turn_input_tombstone_count) = self
1686 .conn
1687 .write(move |tx| {
1688 let removed_node_count =
1689 tx.execute("DELETE FROM graph_nodes WHERE tombstoned = 1", [])?;
1690 let removed_pending_turn_input_tombstone_count = tx.execute(
1691 "DELETE FROM pending_turn_inputs
1692 WHERE state IN (?1, ?2)",
1693 params![
1694 lash_core::TurnInputState::Cancelled.as_str(),
1695 lash_core::TurnInputState::Completed.as_str()
1696 ],
1697 )?;
1698 Ok((
1699 removed_node_count,
1700 removed_pending_turn_input_tombstone_count,
1701 ))
1702 })
1703 .await
1704 .map_err(sqlite_error)?;
1705 Ok(VacuumReport {
1706 removed_node_count,
1707 removed_pending_turn_input_tombstone_count,
1708 })
1709 }
1710
1711 async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
1712 Ok(Store::gc_unreachable(self).await)
1713 }
1714}
1715
1716fn derive_pending_turn_input_id(
1717 session_id: &str,
1718 source_key: Option<&str>,
1719 now_epoch_ms: u64,
1720 nonce: u64,
1721) -> String {
1722 format!(
1723 "ti:{:x}",
1724 Sha256::digest(format!("{session_id}:{source_key:?}:{now_epoch_ms}:{nonce}").as_bytes())
1725 )
1726}
1727
1728fn cancel_pending_turn_input_row_conn(
1729 conn: &Connection,
1730 row: PendingTurnInputRow,
1731 now_epoch_ms: u64,
1732) -> Result<lash_core::PendingTurnInputCancelOutcome, StoreError> {
1733 let mut input = pending_turn_input_from_row(row.clone())?;
1734 match input.state {
1735 lash_core::TurnInputState::Cancelled => Ok(
1736 lash_core::PendingTurnInputCancelOutcome::AlreadyCancelled(input),
1737 ),
1738 lash_core::TurnInputState::Completed => Ok(
1739 lash_core::PendingTurnInputCancelOutcome::AlreadyCompleted(input),
1740 ),
1741 lash_core::TurnInputState::Accepted => {
1742 Ok(lash_core::PendingTurnInputCancelOutcome::AlreadyClaimed {
1743 claim: pending_turn_input_claim_diagnostics_from_row(&row, input.state),
1744 input,
1745 })
1746 }
1747 lash_core::TurnInputState::PendingActive | lash_core::TurnInputState::DeferredNextTurn => {
1748 let live_claim = row.claim_token.is_some() && row.claim_expires_at_ms > now_epoch_ms;
1749 if live_claim {
1750 return Ok(lash_core::PendingTurnInputCancelOutcome::AlreadyClaimed {
1751 claim: pending_turn_input_claim_diagnostics_from_row(&row, input.state),
1752 input,
1753 });
1754 }
1755 conn.execute(
1756 "UPDATE pending_turn_inputs
1757 SET state = ?3,
1758 claim_id = NULL,
1759 claim_owner_id = NULL,
1760 claim_owner_incarnation_id = NULL,
1761 claim_owner_liveness_json = NULL,
1762 claim_token = NULL,
1763 claim_claimed_at_ms = 0,
1764 claim_expires_at_ms = 0
1765 WHERE session_id = ?1 AND input_id = ?2",
1766 params![
1767 row.session_id,
1768 row.input_id,
1769 lash_core::TurnInputState::Cancelled.as_str(),
1770 ],
1771 )
1772 .map_err(sqlite_error)?;
1773 input.state = lash_core::TurnInputState::Cancelled;
1774 Ok(lash_core::PendingTurnInputCancelOutcome::Cancelled(input))
1775 }
1776 }
1777}
1778
1779async fn claim_pending_turn_inputs_sqlite(
1780 conn: &SqliteConnection,
1781 session_id: &str,
1782 session_execution_lease: &SessionExecutionLeaseFence,
1783 owner: &LeaseOwnerIdentity,
1784 lease_ttl_ms: u64,
1785 max_inputs: usize,
1786 mode: lash_core::TurnInputClaimMode,
1787) -> Result<Option<lash_core::TurnInputClaim>, StoreError> {
1788 if max_inputs == 0 {
1789 return Ok(None);
1790 }
1791 let session_id = session_id.to_string();
1792 let session_execution_lease = session_execution_lease.clone();
1793 let owner = owner.clone();
1794 conn.write_flow(move |tx| {
1795 let outcome: Result<TxOutcome<Option<lash_core::TurnInputClaim>>, StoreError> = (|| {
1796 ensure_session_execution_lease_conn(tx, &session_id, &session_execution_lease)?;
1797 let now = current_epoch_ms();
1798 let wanted_state = match &mode {
1799 lash_core::TurnInputClaimMode::ActiveTurn { .. } => {
1800 lash_core::TurnInputState::PendingActive
1801 }
1802 lash_core::TurnInputClaimMode::NextTurn => {
1803 lash_core::TurnInputState::DeferredNextTurn
1804 }
1805 };
1806 let candidate_rows = {
1807 let mut stmt = tx
1808 .prepare(
1809 "SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
1810 state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
1811 claim_owner_id, claim_owner_incarnation_id,
1812 claim_owner_liveness_json, claim_token, claim_expires_at_ms
1813 FROM pending_turn_inputs
1814 WHERE session_id = ?1 AND state = ?2
1815 ORDER BY enqueue_seq ASC
1816 LIMIT ?3",
1817 )
1818 .map_err(sqlite_error)?;
1819 let rows = stmt
1820 .query_map(
1821 params![
1822 session_id,
1823 wanted_state.as_str(),
1824 (max_inputs as i64).saturating_add(32)
1825 ],
1826 pending_turn_input_row_from_sql,
1827 )
1828 .map_err(sqlite_error)?;
1829 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1830 };
1831 let mut selected = Vec::new();
1832 for row in candidate_rows {
1833 let claim_available = row.claim_token.is_none()
1834 || row.claim_expires_at_ms <= now
1835 || row
1836 .claim_owner
1837 .as_ref()
1838 .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner));
1839 if !claim_available {
1840 continue;
1841 }
1842 let input = pending_turn_input_from_row(row.clone())?;
1843 let matches_mode = match &mode {
1844 lash_core::TurnInputClaimMode::ActiveTurn {
1845 turn_id,
1846 checkpoint,
1847 } => {
1848 input
1849 .ingress
1850 .active_turn_id()
1851 .is_some_and(|active| active == turn_id)
1852 && input.ingress.admits_checkpoint(*checkpoint)
1853 }
1854 lash_core::TurnInputClaimMode::NextTurn => input.state.is_next_turn_pending(),
1855 };
1856 if matches_mode {
1857 selected.push((row, input));
1858 if selected.len() >= max_inputs {
1859 break;
1860 }
1861 }
1862 }
1863 let Some((head, _)) = selected.first() else {
1864 return Ok(TxOutcome::Commit(None));
1865 };
1866 let lease = TurnInputClaimLease::derive(head, &session_id, &owner, now, lease_ttl_ms);
1867 let liveness_json = encode_liveness(&owner.liveness)?;
1868 let state_after_claim = match &mode {
1869 lash_core::TurnInputClaimMode::ActiveTurn { .. } => {
1870 lash_core::TurnInputState::Accepted
1871 }
1872 lash_core::TurnInputClaimMode::NextTurn => {
1873 lash_core::TurnInputState::DeferredNextTurn
1874 }
1875 };
1876 let mut inputs = Vec::new();
1877 for (row, mut input) in selected {
1878 let claimed = tx
1879 .execute(
1880 "UPDATE pending_turn_inputs
1881 SET state = ?3,
1882 claim_id = ?4,
1883 claim_owner_id = ?5,
1884 claim_owner_incarnation_id = ?6,
1885 claim_owner_liveness_json = ?7,
1886 claim_token = ?8,
1887 claim_fencing_token = claim_fencing_token + 1,
1888 claim_claimed_at_ms = ?9,
1889 claim_expires_at_ms = ?10
1890 WHERE session_id = ?1
1891 AND input_id = ?2
1892 AND (
1893 claim_token IS NULL
1894 OR claim_expires_at_ms <= ?9
1895 OR (
1896 claim_token = ?11
1897 AND claim_owner_id = ?12
1898 AND claim_owner_incarnation_id = ?13
1899 )
1900 )",
1901 params![
1902 session_id,
1903 row.input_id,
1904 state_after_claim.as_str(),
1905 lease.claim_id,
1906 owner.owner_id.as_str(),
1907 owner.incarnation_id.as_str(),
1908 liveness_json.as_str(),
1909 lease.lease_token,
1910 now as i64,
1911 lease.expires_at_epoch_ms as i64,
1912 row.claim_token,
1913 row.claim_owner
1914 .as_ref()
1915 .map(|owner| owner.owner_id.as_str()),
1916 row.claim_owner
1917 .as_ref()
1918 .map(|owner| owner.incarnation_id.as_str())
1919 ],
1920 )
1921 .map_err(sqlite_error)?;
1922 if claimed == 0 {
1923 return Ok(TxOutcome::Rollback(None));
1924 }
1925 input.state = state_after_claim;
1926 inputs.push(input);
1927 }
1928 Ok(TxOutcome::Commit(Some(lash_core::TurnInputClaim {
1929 session_id: session_id.clone(),
1930 claim_id: lease.claim_id,
1931 owner: owner.clone(),
1932 lease_token: lease.lease_token,
1933 fencing_token: lease.fencing_token,
1934 claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
1935 expires_at_epoch_ms: lease.expires_at_epoch_ms,
1936 mode,
1937 inputs,
1938 })))
1939 })(
1940 );
1941 match outcome {
1942 Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
1943 Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
1944 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1945 }
1946 })
1947 .await
1948 .map_err(sqlite_error)?
1949}
1950
1951struct SessionExecutionLeaseRow {
1952 owner: Option<LeaseOwnerIdentity>,
1953 lease_token: Option<String>,
1954 fencing_token: u64,
1955 claimed_at_ms: u64,
1956 expires_at_ms: u64,
1957}
1958
1959fn load_session_execution_lease_row_conn(
1960 conn: &Connection,
1961 session_id: &str,
1962) -> Result<Option<SessionExecutionLeaseRow>, StoreError> {
1963 let row = conn
1964 .query_row(
1965 "SELECT lease_owner_id, lease_token, lease_fencing_token,
1966 lease_claimed_at_ms, lease_expires_at_ms,
1967 lease_owner_incarnation_id, lease_owner_liveness_json
1968 FROM session_execution_leases
1969 WHERE session_id = ?1",
1970 params![session_id],
1971 |row| {
1972 let owner_id: Option<String> = row.get(0)?;
1973 let incarnation_id: Option<String> = row.get(5)?;
1974 let liveness_json: Option<String> = row.get(6)?;
1975 Ok(SessionExecutionLeaseRow {
1976 owner: lease_owner_from_columns(owner_id, incarnation_id, liveness_json),
1977 lease_token: row.get(1)?,
1978 fencing_token: row.get::<_, i64>(2)? as u64,
1979 claimed_at_ms: row.get::<_, i64>(3)? as u64,
1980 expires_at_ms: row.get::<_, i64>(4)? as u64,
1981 })
1982 },
1983 )
1984 .optional()
1985 .map_err(sqlite_error)?;
1986 Ok(row)
1987}
1988
1989fn lease_owner_from_columns(
1990 owner_id: Option<String>,
1991 incarnation_id: Option<String>,
1992 liveness_json: Option<String>,
1993) -> Option<LeaseOwnerIdentity> {
1994 owner_id.map(|owner_id| LeaseOwnerIdentity {
1995 incarnation_id: incarnation_id.unwrap_or_else(|| owner_id.clone()),
1996 owner_id,
1997 liveness: liveness_json
1998 .as_deref()
1999 .and_then(|json| serde_json::from_str(json).ok())
2000 .unwrap_or(LeaseOwnerLiveness::Opaque),
2001 })
2002}
2003
2004fn encode_liveness(liveness: &LeaseOwnerLiveness) -> Result<String, StoreError> {
2005 serde_json::to_string(liveness)
2006 .map_err(|err| StoreError::Backend(format!("failed to encode lease liveness: {err}")))
2007}
2008
2009fn row_to_session_execution_lease(
2010 session_id: &str,
2011 row: SessionExecutionLeaseRow,
2012) -> Result<SessionExecutionLease, StoreError> {
2013 Ok(SessionExecutionLease {
2014 session_id: session_id.to_string(),
2015 owner: row
2016 .owner
2017 .ok_or_else(|| StoreError::Backend("live session lease missing owner".to_string()))?,
2018 lease_token: row.lease_token.ok_or_else(|| {
2019 StoreError::Backend("live session lease missing lease token".to_string())
2020 })?,
2021 fencing_token: row.fencing_token,
2022 claimed_at_epoch_ms: row.claimed_at_ms,
2023 expires_at_epoch_ms: row.expires_at_ms,
2024 })
2025}
2026
2027fn acquire_session_execution_lease_conn(
2028 conn: &Connection,
2029 session_id: &str,
2030 owner: &LeaseOwnerIdentity,
2031 previous_fencing_token: u64,
2032 now: u64,
2033 lease_ttl_ms: u64,
2034) -> Result<SessionExecutionLease, StoreError> {
2035 let fencing_token = previous_fencing_token.saturating_add(1);
2036 let lease_token = format!(
2037 "{}:{}:{}:{now}:{fencing_token}",
2038 session_id, owner.owner_id, owner.incarnation_id
2039 );
2040 let expires_at = now.saturating_add(lease_ttl_ms);
2041 let liveness_json = encode_liveness(&owner.liveness)?;
2042 conn.execute(
2043 "INSERT INTO session_execution_leases (
2044 session_id, lease_owner_id, lease_owner_incarnation_id, lease_owner_liveness_json,
2045 lease_token, lease_fencing_token, lease_claimed_at_ms, lease_expires_at_ms
2046 )
2047 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
2048 ON CONFLICT(session_id) DO UPDATE SET
2049 lease_owner_id = excluded.lease_owner_id,
2050 lease_owner_incarnation_id = excluded.lease_owner_incarnation_id,
2051 lease_owner_liveness_json = excluded.lease_owner_liveness_json,
2052 lease_token = excluded.lease_token,
2053 lease_fencing_token = excluded.lease_fencing_token,
2054 lease_claimed_at_ms = excluded.lease_claimed_at_ms,
2055 lease_expires_at_ms = excluded.lease_expires_at_ms",
2056 params![
2057 session_id,
2058 owner.owner_id,
2059 owner.incarnation_id,
2060 liveness_json,
2061 lease_token,
2062 fencing_token as i64,
2063 now as i64,
2064 expires_at as i64
2065 ],
2066 )
2067 .map_err(sqlite_error)?;
2068 Ok(SessionExecutionLease {
2069 session_id: session_id.to_string(),
2070 owner: owner.clone(),
2071 lease_token,
2072 fencing_token,
2073 claimed_at_epoch_ms: now,
2074 expires_at_epoch_ms: expires_at,
2075 })
2076}
2077
2078fn ensure_session_execution_lease_conn(
2079 conn: &Connection,
2080 session_id: &str,
2081 fence: &SessionExecutionLeaseFence,
2082) -> Result<(), StoreError> {
2083 if fence.session_id != session_id {
2084 return Err(StoreError::SessionExecutionLeaseExpired {
2085 session_id: session_id.to_string(),
2086 });
2087 }
2088 let now = current_epoch_ms();
2089 let current = load_session_execution_lease_row_conn(conn, session_id)?;
2090 let Some(current) = current else {
2091 return Err(StoreError::SessionExecutionLeaseExpired {
2092 session_id: session_id.to_string(),
2093 });
2094 };
2095 if current
2096 .owner
2097 .as_ref()
2098 .is_some_and(|owner| owner.same_incarnation(&fence.owner))
2099 && current.lease_token.as_deref() == Some(fence.lease_token.as_str())
2100 && current.fencing_token == fence.fencing_token
2101 && current.expires_at_ms > now
2102 {
2103 Ok(())
2104 } else {
2105 Err(StoreError::SessionExecutionLeaseExpired {
2106 session_id: session_id.to_string(),
2107 })
2108 }
2109}
2110
2111fn release_session_execution_lease_conn(
2112 conn: &Connection,
2113 completion: &SessionExecutionLeaseCompletion,
2114) -> Result<(), StoreError> {
2115 conn.execute(
2116 "UPDATE session_execution_leases
2117 SET lease_owner_id = NULL,
2118 lease_owner_incarnation_id = NULL,
2119 lease_owner_liveness_json = NULL,
2120 lease_token = NULL,
2121 lease_claimed_at_ms = 0,
2122 lease_expires_at_ms = 0
2123 WHERE session_id = ?1
2124 AND lease_owner_id = ?2
2125 AND lease_owner_incarnation_id = ?3
2126 AND lease_token = ?4
2127 AND lease_fencing_token = ?5",
2128 params![
2129 completion.session_id,
2130 completion.owner.owner_id,
2131 completion.owner.incarnation_id,
2132 completion.lease_token,
2133 completion.fencing_token as i64
2134 ],
2135 )
2136 .map_err(sqlite_error)?;
2137 Ok(())
2138}