1use super::*;
25
26#[async_trait::async_trait]
27impl RuntimePersistence for Store {
28 fn durability_tier(&self) -> DurabilityTier {
29 DurabilityTier::Durable
30 }
31
32 async fn load_session(
33 &self,
34 scope: SessionReadScope,
35 ) -> Result<Option<PersistedSessionRead>, StoreError> {
36 self.conn
37 .call(move |conn| {
38 let outcome: Result<Option<PersistedSessionRead>, StoreError> = (|| {
39 let Some(meta) = try_load_session_head_meta_from_conn(conn)? else {
40 return Ok(None);
41 };
42 let leaf_node_id = match &scope {
43 SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
44 SessionReadScope::ActivePath { leaf_node_id } => {
45 leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
46 }
47 };
48 let mut graph = match scope {
49 SessionReadScope::FullGraph => {
50 Self::load_session_graph_from_conn(conn, meta.leaf_node_id.clone())
51 }
52 SessionReadScope::ActivePath { .. } => {
53 Self::load_active_path_session_graph_from_conn(
54 conn,
55 leaf_node_id.clone(),
56 )
57 .map_err(sqlite_error)?
58 }
59 };
60 graph.set_leaf_node_id(leaf_node_id);
61 let checkpoint = meta
62 .checkpoint_ref
63 .as_ref()
64 .and_then(|blob_ref| Self::get_checkpoint_conn(conn, blob_ref));
65 Ok(Some(PersistedSessionRead {
66 session_id: meta.session_id,
67 head_revision: meta.head_revision,
68 config: meta.config,
69 agent_frames: meta.agent_frames,
70 current_agent_frame_id: meta.current_agent_frame_id,
71 graph,
72 checkpoint_ref: meta.checkpoint_ref,
73 checkpoint,
74 token_ledger: merge_token_ledger_entries(Self::load_usage_deltas_conn(
75 conn,
76 )),
77 }))
78 })(
79 );
80 Ok(outcome)
81 })
82 .await
83 .map_err(sqlite_error)?
84 }
85
86 async fn load_node(
87 &self,
88 node_id: &str,
89 ) -> Result<Option<lash_core::SessionNodeRecord>, StoreError> {
90 let node_id = node_id.to_string();
91 let row: Option<String> = self
92 .conn
93 .call(move |conn| {
94 conn.query_row(
95 "SELECT node_json FROM graph_nodes WHERE node_id = ?1 AND tombstoned = 0",
96 params![node_id],
97 |row| row.get(0),
98 )
99 .optional()
100 })
101 .await
102 .map_err(sqlite_error)?;
103 Ok(row.and_then(|json| serde_json::from_str(&json).ok()))
104 }
105
106 async fn commit_runtime_state(
107 &self,
108 commit: RuntimeCommit,
109 ) -> Result<RuntimeCommitResult, StoreError> {
110 let blob_profile = self.options.blob_profile;
111 let result = self
112 .conn
113 .write_flow(move |tx| {
114 let outcome: Result<RuntimeCommitResult, StoreError> = (|| {
115 let existing = try_load_session_head_meta_from_conn(tx)?;
116 if let Some(bound_session_id) =
117 existing.as_ref().map(|meta| meta.session_id.as_str())
118 && bound_session_id != commit.session_id
119 {
120 return Err(StoreError::SessionBindingMismatch {
121 bound_session_id: bound_session_id.to_string(),
122 attempted_session_id: commit.session_id.clone(),
123 });
124 }
125 if let Some(completed) = &commit.turn_commit {
126 if completed.session_id != commit.session_id {
127 return Err(StoreError::RuntimeTurnCommitConflict {
128 session_id: completed.session_id.clone(),
129 turn_id: completed.turn_id.clone(),
130 });
131 }
132 let prior: Option<(String, String)> = tx
133 .query_row(
134 "SELECT turn_commit_hash, result_json FROM runtime_turn_commits
135 WHERE session_id = ?1 AND turn_id = ?2",
136 params![completed.session_id, completed.turn_id],
137 |row| Ok((row.get(0)?, row.get(1)?)),
138 )
139 .optional()
140 .map_err(sqlite_error)?;
141 if let Some((turn_commit_hash, result_json)) = prior {
142 if turn_commit_hash == completed.turn_commit_hash {
143 let result: RuntimeCommitResult =
144 serde_json::from_str(&result_json).map_err(|err| {
145 StoreError::Backend(format!(
146 "failed to decode runtime turn commit result: {err}"
147 ))
148 })?;
149 if let Some(completion) =
150 commit.release_session_execution_lease.as_ref()
151 {
152 release_session_execution_lease_conn(tx, completion)?;
153 }
154 return Ok(result);
155 }
156 return Err(StoreError::RuntimeTurnCommitConflict {
157 session_id: completed.session_id.clone(),
158 turn_id: completed.turn_id.clone(),
159 });
160 }
161 }
162 let Some(session_execution_lease) = commit.session_execution_lease.as_ref()
163 else {
164 return Err(StoreError::SessionExecutionLeaseExpired {
165 session_id: commit.session_id.clone(),
166 });
167 };
168 ensure_session_execution_lease_conn(
169 tx,
170 &commit.session_id,
171 session_execution_lease,
172 )?;
173 let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
174 if commit.expected_head_revision.is_some()
175 && commit.expected_head_revision != Some(actual_revision)
176 {
177 return Err(StoreError::HeadRevisionConflict {
178 expected: commit.expected_head_revision,
179 actual: actual_revision,
180 });
181 }
182 for completed in &commit.completed_queue_claims {
183 if completed.session_id != commit.session_id {
184 return Err(StoreError::QueuedWorkClaimExpired {
185 session_id: completed.session_id.clone(),
186 claim_id: completed.claim_id.clone(),
187 });
188 }
189 ensure_queued_work_completion_conn(tx, completed)?;
190 }
191
192 let stored_checkpoint =
193 Self::put_checkpoint_conn(tx, &commit.checkpoint, blob_profile)
194 .map_err(sqlite_error)?;
195
196 if !commit.usage_deltas.is_empty() {
197 let mut stmt = tx
198 .prepare(
199 "INSERT INTO usage_deltas (
200 source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
201 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
202 )
203 .map_err(sqlite_error)?;
204 for entry in &commit.usage_deltas {
205 stmt.execute(params![
206 entry.source,
207 entry.model,
208 entry.usage.input_tokens,
209 entry.usage.output_tokens,
210 entry.usage.cached_input_tokens,
211 entry.usage.reasoning_tokens,
212 ])
213 .map_err(sqlite_error)?;
214 }
215 }
216
217 let leaf_node_id = match &commit.graph {
218 GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
219 GraphCommitDelta::Append {
220 nodes,
221 leaf_node_id,
222 } => {
223 for node in nodes {
224 let node_json = encode_json(node);
225 tx.execute(
226 "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
227 params![node.node_id, node_json],
228 )
229 .map_err(sqlite_error)?;
230 }
231 leaf_node_id.clone()
232 }
233 GraphCommitDelta::ReplaceFull(graph) => {
234 tx.execute("DELETE FROM graph_nodes", [])
235 .map_err(sqlite_error)?;
236 for node in &graph.nodes {
237 let node_json = encode_json(node);
238 tx.execute(
239 "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
240 params![node.node_id, node_json],
241 )
242 .map_err(sqlite_error)?;
243 }
244 graph.leaf_node_id.clone()
245 }
246 };
247 let graph_node_count: usize = tx
248 .query_row(
249 "SELECT COUNT(*) FROM graph_nodes WHERE tombstoned = 0",
250 [],
251 |row| row.get::<_, i64>(0),
252 )
253 .map_err(sqlite_error)? as usize;
254 let next_revision = actual_revision + 1;
255 let meta = SessionHeadMeta {
256 session_id: commit.session_id.clone(),
257 head_revision: next_revision,
258 config: commit.config.clone(),
259 agent_frames: commit.agent_frames.clone(),
260 current_agent_frame_id: commit.current_agent_frame_id.clone(),
261 checkpoint_ref: Some(stored_checkpoint.checkpoint_ref.clone()),
262 leaf_node_id,
263 graph_node_count,
264 token_ledger: Vec::new(),
265 };
266 tx.execute(
267 "INSERT OR REPLACE INTO session_head (singleton, session_id, head_json, head_revision)
268 VALUES (1, ?1, ?2, ?3)",
269 params![
270 meta.session_id,
271 encode_json(&meta),
272 meta.head_revision as i64
273 ],
274 )
275 .map_err(sqlite_error)?;
276 for completed in &commit.completed_queue_claims {
277 for batch_id in &completed.batch_ids {
278 tx.execute(
279 "DELETE FROM queued_work_batches
280 WHERE session_id = ?1
281 AND batch_id = ?2
282 AND claim_id = ?3
283 AND claim_token = ?4",
284 params![
285 completed.session_id,
286 batch_id,
287 completed.claim_id,
288 completed.lease_token
289 ],
290 )
291 .map_err(sqlite_error)?;
292 }
293 }
294 if !commit.committed_attachment_ids.is_empty() {
295 let now = current_epoch_ms() as i64;
296 let mut stmt = tx
297 .prepare(
298 "UPDATE attachment_manifest
299 SET committed_at_ms = COALESCE(committed_at_ms, ?1)
300 WHERE attachment_id = ?2 AND session_id = ?3",
301 )
302 .map_err(sqlite_error)?;
303 for id in &commit.committed_attachment_ids {
304 stmt.execute(params![now, id.as_str(), commit.session_id])
305 .map_err(sqlite_error)?;
306 }
307 }
308 let result = RuntimeCommitResult {
309 head_revision: next_revision,
310 checkpoint_ref: stored_checkpoint.checkpoint_ref,
311 manifest: stored_checkpoint.manifest,
312 };
313 if let Some(completed) = &commit.turn_commit {
314 tx.execute(
315 "INSERT INTO runtime_turn_commits (
316 session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
317 )
318 VALUES (?1, ?2, ?3, ?4, ?5)",
319 params![
320 completed.session_id,
321 completed.turn_id,
322 completed.turn_commit_hash,
323 encode_json(&result),
324 current_epoch_ms() as i64
325 ],
326 )
327 .map_err(sqlite_error)?;
328 }
329 if let Some(completion) = commit.release_session_execution_lease.as_ref() {
330 release_session_execution_lease_conn(tx, completion)?;
331 }
332 Ok(result)
333 })();
334 match outcome {
339 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
340 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
341 }
342 })
343 .await
344 .map_err(sqlite_error)??;
345 self.maybe_auto_gc().await;
346 Ok(result)
347 }
348
349 async fn try_claim_session_execution_lease(
350 &self,
351 session_id: &str,
352 owner_id: &str,
353 lease_ttl_ms: u64,
354 ) -> Result<Option<SessionExecutionLease>, StoreError> {
355 let session_id = session_id.to_string();
356 let owner_id = owner_id.to_string();
357 self.conn
358 .write_flow(move |tx| {
359 let outcome: Result<Option<SessionExecutionLease>, StoreError> = (|| {
360 let now = current_epoch_ms();
361 let current = load_session_execution_lease_row_conn(tx, &session_id)?;
362 if current.as_ref().is_some_and(|lease| {
363 lease.lease_token.is_some() && lease.expires_at_ms > now
364 }) {
365 let current = current.expect("checked current lease is present");
366 if current.owner_id.as_deref() == Some(owner_id.as_str()) {
367 let expires_at = now.saturating_add(lease_ttl_ms);
368 tx.execute(
369 "UPDATE session_execution_leases
370 SET lease_expires_at_ms = ?2
371 WHERE session_id = ?1",
372 params![session_id, expires_at as i64],
373 )
374 .map_err(sqlite_error)?;
375 return Ok(Some(SessionExecutionLease {
376 session_id,
377 owner_id,
378 lease_token: current.lease_token.expect("live lease token set"),
379 fencing_token: current.fencing_token,
380 claimed_at_epoch_ms: current.claimed_at_ms,
381 expires_at_epoch_ms: expires_at,
382 }));
383 }
384 return Ok(None);
385 }
386 let fencing_token = current
387 .as_ref()
388 .map_or(0, |lease| lease.fencing_token)
389 .saturating_add(1);
390 let lease_token = format!("{session_id}:{owner_id}:{now}:{fencing_token}");
391 let expires_at = now.saturating_add(lease_ttl_ms);
392 tx.execute(
393 "INSERT INTO session_execution_leases (
394 session_id, lease_owner_id, lease_token, lease_fencing_token,
395 lease_claimed_at_ms, lease_expires_at_ms
396 )
397 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
398 ON CONFLICT(session_id) DO UPDATE SET
399 lease_owner_id = excluded.lease_owner_id,
400 lease_token = excluded.lease_token,
401 lease_fencing_token = excluded.lease_fencing_token,
402 lease_claimed_at_ms = excluded.lease_claimed_at_ms,
403 lease_expires_at_ms = excluded.lease_expires_at_ms",
404 params![
405 session_id,
406 owner_id,
407 lease_token,
408 fencing_token as i64,
409 now as i64,
410 expires_at as i64
411 ],
412 )
413 .map_err(sqlite_error)?;
414 Ok(Some(SessionExecutionLease {
415 session_id,
416 owner_id,
417 lease_token,
418 fencing_token,
419 claimed_at_epoch_ms: now,
420 expires_at_epoch_ms: expires_at,
421 }))
422 })(
423 );
424 match outcome {
425 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
426 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
427 }
428 })
429 .await
430 .map_err(sqlite_error)?
431 }
432
433 async fn renew_session_execution_lease(
434 &self,
435 fence: &SessionExecutionLeaseFence,
436 lease_ttl_ms: u64,
437 ) -> Result<SessionExecutionLease, StoreError> {
438 let fence = fence.clone();
439 self.conn
440 .write_flow(move |tx| {
441 let outcome: Result<SessionExecutionLease, StoreError> = (|| {
442 let now = current_epoch_ms();
443 let current = load_session_execution_lease_row_conn(tx, &fence.session_id)?;
444 let Some(current) = current else {
445 return Err(StoreError::SessionExecutionLeaseExpired {
446 session_id: fence.session_id.clone(),
447 });
448 };
449 if current.owner_id.as_deref() != Some(fence.owner_id.as_str())
450 || current.lease_token.as_deref() != Some(fence.lease_token.as_str())
451 || current.fencing_token != fence.fencing_token
452 || current.expires_at_ms <= now
453 {
454 return Err(StoreError::SessionExecutionLeaseExpired {
455 session_id: fence.session_id.clone(),
456 });
457 }
458 let expires_at = now.saturating_add(lease_ttl_ms);
459 tx.execute(
460 "UPDATE session_execution_leases
461 SET lease_expires_at_ms = ?5
462 WHERE session_id = ?1
463 AND lease_owner_id = ?2
464 AND lease_token = ?3
465 AND lease_fencing_token = ?4",
466 params![
467 fence.session_id,
468 fence.owner_id,
469 fence.lease_token,
470 fence.fencing_token as i64,
471 expires_at as i64
472 ],
473 )
474 .map_err(sqlite_error)?;
475 Ok(SessionExecutionLease {
476 session_id: fence.session_id,
477 owner_id: fence.owner_id,
478 lease_token: fence.lease_token,
479 fencing_token: fence.fencing_token,
480 claimed_at_epoch_ms: current.claimed_at_ms,
481 expires_at_epoch_ms: expires_at,
482 })
483 })();
484 match outcome {
485 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
486 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
487 }
488 })
489 .await
490 .map_err(sqlite_error)?
491 }
492
493 async fn release_session_execution_lease(
494 &self,
495 completion: &SessionExecutionLeaseCompletion,
496 ) -> Result<(), StoreError> {
497 let completion = completion.clone();
498 self.conn
499 .write_flow(move |tx| {
500 let outcome = release_session_execution_lease_conn(tx, &completion);
501 match outcome {
502 Ok(()) => Ok(TxOutcome::Commit(Ok(()))),
503 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
504 }
505 })
506 .await
507 .map_err(sqlite_error)?
508 }
509
510 async fn enqueue_queued_work(
511 &self,
512 batch: QueuedWorkBatchDraft,
513 ) -> Result<QueuedWorkBatch, StoreError> {
514 let nonce = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed);
515 self.conn
516 .write_flow(move |tx| {
517 let outcome: Result<QueuedWorkBatch, StoreError> = (|| {
518 if let Some(source_key) = batch.source_key.as_deref() {
519 let existing_id: Option<String> = tx
520 .query_row(
521 "SELECT batch_id
522 FROM queued_work_batches
523 WHERE session_id = ?1 AND source_key = ?2",
524 params![batch.session_id, source_key],
525 |row| row.get(0),
526 )
527 .optional()
528 .map_err(sqlite_error)?;
529 if let Some(batch_id) = existing_id {
530 let existing = load_queued_batch_by_id_conn(tx, &batch_id)?
531 .ok_or_else(|| {
532 StoreError::Backend(
533 "queued work source row disappeared".to_string(),
534 )
535 })?;
536 return Ok(existing);
537 }
538 }
539 let now = current_epoch_ms();
540 let batch_id =
541 derive_batch_id(&batch.session_id, batch.source_key.as_deref(), now, Some(nonce));
542 tx.execute(
543 "INSERT INTO queued_work_batches (
544 batch_id, session_id, source_key, delivery_policy, slot_policy,
545 merge_key_json, available_at_ms, enqueued_at_ms
546 )
547 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
548 params![
549 batch_id,
550 batch.session_id,
551 batch.source_key.as_deref(),
552 batch.delivery_policy.as_str(),
553 batch.slot_policy.as_str(),
554 encode_json(&batch.merge_key),
555 batch.available_at_ms as i64,
556 now as i64,
557 ],
558 )
559 .map_err(sqlite_error)?;
560 for (index, payload) in batch.payloads.iter().enumerate() {
561 let item_id = format!("{batch_id}:item:{index}");
562 tx.execute(
563 "INSERT INTO queued_work_items (batch_id, item_index, item_id, payload_json)
564 VALUES (?1, ?2, ?3, ?4)",
565 params![batch_id, index as i64, item_id, encode_json(payload)],
566 )
567 .map_err(sqlite_error)?;
568 }
569 load_queued_batch_by_id_conn(tx, &batch_id)?.ok_or_else(|| {
570 StoreError::Backend("queued work insert disappeared".to_string())
571 })
572 })();
573 match outcome {
576 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
577 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
578 }
579 })
580 .await
581 .map_err(sqlite_error)?
582 }
583
584 async fn claim_ready_queued_work(
585 &self,
586 session_id: &str,
587 session_execution_lease: &SessionExecutionLeaseFence,
588 owner_id: &str,
589 boundary: QueuedWorkClaimBoundary,
590 lease_ttl_ms: u64,
591 max_batches: usize,
592 ) -> Result<Option<QueuedWorkClaim>, StoreError> {
593 if max_batches == 0 {
594 return Ok(None);
595 }
596 let session_id = session_id.to_string();
597 let session_execution_lease = session_execution_lease.clone();
598 let owner_id = owner_id.to_string();
599 self.conn
600 .write_flow(move |tx| {
601 let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
602 ensure_session_execution_lease_conn(
603 tx,
604 &session_id,
605 &session_execution_lease,
606 )?;
607 let now = current_epoch_ms();
608 let candidate_rows = {
609 let mut stmt = tx
610 .prepare(
611 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
612 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
613 claim_fencing_token
614 FROM queued_work_batches
615 WHERE session_id = ?1
616 AND available_at_ms <= ?2
617 AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
618 ORDER BY enqueue_seq ASC
619 LIMIT ?3",
620 )
621 .map_err(sqlite_error)?;
622 let rows = stmt
623 .query_map(
624 params![session_id, now as i64, claim_scan_limit(max_batches)],
625 queued_batch_row_from_sql,
626 )
627 .map_err(sqlite_error)?;
628 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
629 };
630 let candidates = candidate_rows
631 .iter()
632 .map(|row| {
633 Ok(ClaimCandidate {
634 enqueue_seq: row.enqueue_seq,
635 claim_fencing_token: row.claim_fencing_token,
636 delivery_policy: decode_delivery_policy(
637 row.delivery_policy.clone(),
638 )?,
639 slot_policy: decode_slot_policy(row.slot_policy.clone())?,
640 merge_key: decode_merge_key(row.merge_key_json.clone())?,
641 })
642 })
643 .collect::<Result<Vec<_>, StoreError>>()?;
644 let selected_len = select_claim_prefix(&candidates, boundary, max_batches);
645 if selected_len == 0 {
646 return Ok(TxOutcome::Commit(None));
647 }
648 let mut selected = candidate_rows;
649 selected.truncate(selected_len);
650 let lease = QueuedWorkClaimLease::derive(
651 &candidates[0],
652 &session_id,
653 &owner_id,
654 now,
655 lease_ttl_ms,
656 );
657 for row in &selected {
658 let claimed = tx
667 .execute(
668 "UPDATE queued_work_batches
669 SET claim_id = ?3,
670 claim_owner_id = ?4,
671 claim_token = ?5,
672 claim_fencing_token = claim_fencing_token + 1,
673 claim_claimed_at_ms = ?6,
674 claim_expires_at_ms = ?7
675 WHERE session_id = ?1
676 AND batch_id = ?2
677 AND (claim_token IS NULL OR claim_expires_at_ms <= ?6)",
678 params![
679 session_id,
680 row.batch_id,
681 lease.claim_id,
682 owner_id,
683 lease.lease_token,
684 now as i64,
685 lease.expires_at_epoch_ms as i64
686 ],
687 )
688 .map_err(sqlite_error)?;
689 if claimed == 0 {
690 return Ok(TxOutcome::Rollback(None));
694 }
695 }
696 let mut batches = Vec::new();
697 for row in selected {
698 batches.push(queued_work_batch_from_conn(tx, row)?);
699 }
700 Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
701 session_id: session_id.clone(),
702 claim_id: lease.claim_id,
703 owner_id: owner_id.clone(),
704 lease_token: lease.lease_token,
705 fencing_token: lease.fencing_token,
706 claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
707 expires_at_epoch_ms: lease.expires_at_epoch_ms,
708 batches,
709 })))
710 })();
711 match outcome {
715 Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
716 Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
717 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
718 }
719 })
720 .await
721 .map_err(sqlite_error)?
722 }
723
724 async fn renew_queued_work_claim(
725 &self,
726 claim: &QueuedWorkClaim,
727 lease_ttl_ms: u64,
728 ) -> Result<QueuedWorkClaim, StoreError> {
729 let now = current_epoch_ms();
730 let expires_at = now.saturating_add(lease_ttl_ms);
731 let session_id = claim.session_id.clone();
732 let claim_id = claim.claim_id.clone();
733 let lease_token = claim.lease_token.clone();
734 let changed = self
735 .conn
736 .write(move |tx| {
737 tx.execute(
738 "UPDATE queued_work_batches
739 SET claim_expires_at_ms = ?4
740 WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
741 params![session_id, claim_id, lease_token, expires_at as i64],
742 )
743 })
744 .await
745 .map_err(sqlite_error)?;
746 renewed_claim(claim, changed, expires_at)
747 }
748
749 async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
750 let session_id = claim.session_id.clone();
751 let claim_id = claim.claim_id.clone();
752 let lease_token = claim.lease_token.clone();
753 self.conn
754 .write(move |tx| {
755 tx.execute(
756 "UPDATE queued_work_batches
757 SET claim_id = NULL,
758 claim_owner_id = NULL,
759 claim_token = NULL,
760 claim_claimed_at_ms = 0,
761 claim_expires_at_ms = 0
762 WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
763 params![session_id, claim_id, lease_token],
764 )
765 })
766 .await
767 .map_err(sqlite_error)?;
768 Ok(())
769 }
770
771 async fn cancel_queued_work_batch(
772 &self,
773 session_id: &str,
774 batch_id: &str,
775 ) -> Result<Option<QueuedWorkBatch>, StoreError> {
776 let session_id = session_id.to_string();
777 let batch_id = batch_id.to_string();
778 self.conn
779 .write_flow(move |tx| {
780 let outcome: Result<Option<QueuedWorkBatch>, StoreError> = (|| {
781 let now = current_epoch_ms() as i64;
782 let row = tx
783 .query_row(
784 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
785 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
786 claim_fencing_token
787 FROM queued_work_batches
788 WHERE session_id = ?1
789 AND batch_id = ?2
790 AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
791 params![session_id, batch_id, now],
792 queued_batch_row_from_sql,
793 )
794 .optional()
795 .map_err(sqlite_error)?;
796 let Some(row) = row else {
797 return Ok(None);
798 };
799 let batch = queued_work_batch_from_conn(tx, row)?;
800 tx.execute(
801 "DELETE FROM queued_work_batches
802 WHERE session_id = ?1
803 AND batch_id = ?2
804 AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
805 params![session_id, batch_id, now],
806 )
807 .map_err(sqlite_error)?;
808 Ok(Some(batch))
809 })();
810 match outcome {
811 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
812 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
813 }
814 })
815 .await
816 .map_err(sqlite_error)?
817 }
818
819 async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
820 let session_id = session_id.to_string();
821 self.conn
822 .call(move |conn| {
823 let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
824 let rows = {
825 let mut stmt = conn
826 .prepare(
827 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
828 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
829 claim_fencing_token
830 FROM queued_work_batches
831 WHERE session_id = ?1
832 ORDER BY enqueue_seq ASC",
833 )
834 .map_err(sqlite_error)?;
835 let rows = stmt
836 .query_map(params![session_id], queued_batch_row_from_sql)
837 .map_err(sqlite_error)?;
838 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
839 };
840 rows.into_iter()
841 .map(|row| queued_work_batch_from_conn(conn, row))
842 .collect()
843 })();
844 Ok(outcome)
845 })
846 .await
847 .map_err(sqlite_error)?
848 }
849
850 async fn list_pending_queued_work(
851 &self,
852 session_id: &str,
853 ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
854 let session_id = session_id.to_string();
855 self.conn
856 .call(move |conn| {
857 let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
858 let now = current_epoch_ms();
859 let rows = {
860 let mut stmt = conn
861 .prepare(
862 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
863 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
864 claim_fencing_token
865 FROM queued_work_batches
866 WHERE session_id = ?1
867 AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
868 ORDER BY enqueue_seq ASC",
869 )
870 .map_err(sqlite_error)?;
871 let rows = stmt
872 .query_map(
873 params![session_id, now as i64],
874 queued_batch_row_from_sql,
875 )
876 .map_err(sqlite_error)?;
877 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
878 };
879 rows.into_iter()
880 .map(|row| queued_work_batch_from_conn(conn, row))
881 .collect()
882 })();
883 Ok(outcome)
884 })
885 .await
886 .map_err(sqlite_error)?
887 }
888
889 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
890 Store::save_session_meta(self, meta).await;
891 Ok(())
892 }
893
894 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
895 Ok(Store::load_session_meta(self).await)
896 }
897
898 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
899 if ids.is_empty() {
900 return Ok(());
901 }
902 let ids = ids.to_vec();
903 self.conn
904 .write(move |tx| {
905 let mut stmt =
906 tx.prepare("UPDATE graph_nodes SET tombstoned = 1 WHERE node_id = ?1")?;
907 for id in &ids {
908 stmt.execute(params![id])?;
909 }
910 Ok(())
911 })
912 .await
913 .map_err(sqlite_error)
914 }
915
916 async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
917 let removed = self
918 .conn
919 .write(move |tx| tx.execute("DELETE FROM graph_nodes WHERE tombstoned = 1", []))
920 .await
921 .map_err(sqlite_error)?;
922 Ok(VacuumReport {
923 removed_node_count: removed,
924 })
925 }
926
927 async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
928 Ok(Store::gc_unreachable(self).await)
929 }
930}
931
/// One row of the `session_execution_leases` table, as read by
/// `load_session_execution_lease_row_conn`.
struct SessionExecutionLeaseRow {
    /// `lease_owner_id` column; `None` when the column is NULL (a release sets it to NULL).
    owner_id: Option<String>,
    /// `lease_token` column; `None` when the column is NULL (a release sets it to NULL).
    lease_token: Option<String>,
    /// `lease_fencing_token` column, cast from the stored `i64`.
    fencing_token: u64,
    /// `lease_claimed_at_ms` column, cast from the stored `i64`.
    claimed_at_ms: u64,
    /// `lease_expires_at_ms` column, cast from the stored `i64`.
    expires_at_ms: u64,
}
939
940fn load_session_execution_lease_row_conn(
941 conn: &Connection,
942 session_id: &str,
943) -> Result<Option<SessionExecutionLeaseRow>, StoreError> {
944 let row = conn
945 .query_row(
946 "SELECT lease_owner_id, lease_token, lease_fencing_token,
947 lease_claimed_at_ms, lease_expires_at_ms
948 FROM session_execution_leases
949 WHERE session_id = ?1",
950 params![session_id],
951 |row| {
952 Ok(SessionExecutionLeaseRow {
953 owner_id: row.get(0)?,
954 lease_token: row.get(1)?,
955 fencing_token: row.get::<_, i64>(2)? as u64,
956 claimed_at_ms: row.get::<_, i64>(3)? as u64,
957 expires_at_ms: row.get::<_, i64>(4)? as u64,
958 })
959 },
960 )
961 .optional()
962 .map_err(sqlite_error)?;
963 Ok(row)
964}
965
966fn ensure_session_execution_lease_conn(
967 conn: &Connection,
968 session_id: &str,
969 fence: &SessionExecutionLeaseFence,
970) -> Result<(), StoreError> {
971 if fence.session_id != session_id {
972 return Err(StoreError::SessionExecutionLeaseExpired {
973 session_id: session_id.to_string(),
974 });
975 }
976 let now = current_epoch_ms();
977 let current = load_session_execution_lease_row_conn(conn, session_id)?;
978 let Some(current) = current else {
979 return Err(StoreError::SessionExecutionLeaseExpired {
980 session_id: session_id.to_string(),
981 });
982 };
983 if current.owner_id.as_deref() == Some(fence.owner_id.as_str())
984 && current.lease_token.as_deref() == Some(fence.lease_token.as_str())
985 && current.fencing_token == fence.fencing_token
986 && current.expires_at_ms > now
987 {
988 Ok(())
989 } else {
990 Err(StoreError::SessionExecutionLeaseExpired {
991 session_id: session_id.to_string(),
992 })
993 }
994}
995
996fn release_session_execution_lease_conn(
997 conn: &Connection,
998 completion: &SessionExecutionLeaseCompletion,
999) -> Result<(), StoreError> {
1000 conn.execute(
1001 "UPDATE session_execution_leases
1002 SET lease_owner_id = NULL,
1003 lease_token = NULL,
1004 lease_claimed_at_ms = 0,
1005 lease_expires_at_ms = 0
1006 WHERE session_id = ?1
1007 AND lease_owner_id = ?2
1008 AND lease_token = ?3
1009 AND lease_fencing_token = ?4",
1010 params![
1011 completion.session_id,
1012 completion.owner_id,
1013 completion.lease_token,
1014 completion.fencing_token as i64
1015 ],
1016 )
1017 .map_err(sqlite_error)?;
1018 Ok(())
1019}