Skip to main content

lash_sqlite_store/
persistence.rs

1//! The [`RuntimePersistence`] trait implementation for [`Store`].
2//!
3//! This is the tokio-rusqlite port of the prior store's `persistence.rs`. The
4//! public surface is byte-for-byte the prior store async trait: identical method
5//! names and signatures, so consumers swap backends with a path rename only.
6//!
7//! The translation rules (see `conn.rs`, `lifecycle.rs`, `blobs.rs`):
8//!
9//! * Pure reads run through `self.conn.call(move |conn| { ... })`.
10//! * Read-then-write paths run through `self.conn.write(move |tx| { ... })`
11//!   (`BEGIN IMMEDIATE`, commit on `Ok`, rollback on `Err`) — this is the
12//!   cross-process write-lock guard.
13//! * Paths that may abandon partially-applied writes (the queued-work claim)
14//!   run through `self.conn.write_flow`, deciding commit vs rollback via
15//!   [`TxOutcome`].
16//! * The shared `*_conn` helpers (`try_load_session_head_meta_from_conn`,
17//!   `Self::put_checkpoint_conn`, `Self::load_usage_deltas_conn`,
18//!   `Self::load_session_graph_from_conn`, the queued-work helpers, …) are
19//!   synchronous and take a `&rusqlite::Connection`, so they are reused from
20//!   inside these closures (a `&Transaction` derefs to `&Connection`).
21//! * Closures must be `'static` + `Send`: every borrow of `self`/caller data is
22//!   cloned into an owned value before being moved in.
23
24use super::*;
25
26#[async_trait::async_trait]
27impl RuntimePersistence for Store {
28    fn durability_tier(&self) -> DurabilityTier {
29        DurabilityTier::Durable
30    }
31
32    async fn load_session(
33        &self,
34        scope: SessionReadScope,
35    ) -> Result<Option<PersistedSessionRead>, StoreError> {
36        self.conn
37            .call(move |conn| {
38                let outcome: Result<Option<PersistedSessionRead>, StoreError> = (|| {
39                    let Some(meta) = try_load_session_head_meta_from_conn(conn)? else {
40                        return Ok(None);
41                    };
42                    let leaf_node_id = match &scope {
43                        SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
44                        SessionReadScope::ActivePath { leaf_node_id } => {
45                            leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
46                        }
47                    };
48                    let mut graph = match scope {
49                        SessionReadScope::FullGraph => {
50                            Self::load_session_graph_from_conn(conn, meta.leaf_node_id.clone())
51                        }
52                        SessionReadScope::ActivePath { .. } => {
53                            Self::load_active_path_session_graph_from_conn(
54                                conn,
55                                leaf_node_id.clone(),
56                            )
57                            .map_err(sqlite_error)?
58                        }
59                    };
60                    graph.set_leaf_node_id(leaf_node_id);
61                    let checkpoint = meta
62                        .checkpoint_ref
63                        .as_ref()
64                        .and_then(|blob_ref| Self::get_checkpoint_conn(conn, blob_ref));
65                    Ok(Some(PersistedSessionRead {
66                        session_id: meta.session_id,
67                        head_revision: meta.head_revision,
68                        config: meta.config,
69                        agent_frames: meta.agent_frames,
70                        current_agent_frame_id: meta.current_agent_frame_id,
71                        graph,
72                        checkpoint_ref: meta.checkpoint_ref,
73                        checkpoint,
74                        token_ledger: merge_token_ledger_entries(Self::load_usage_deltas_conn(
75                            conn,
76                        )),
77                    }))
78                })(
79                );
80                Ok(outcome)
81            })
82            .await
83            .map_err(sqlite_error)?
84    }
85
86    async fn load_node(
87        &self,
88        node_id: &str,
89    ) -> Result<Option<lash_core::SessionNodeRecord>, StoreError> {
90        let node_id = node_id.to_string();
91        let row: Option<String> = self
92            .conn
93            .call(move |conn| {
94                conn.query_row(
95                    "SELECT node_json FROM graph_nodes WHERE node_id = ?1 AND tombstoned = 0",
96                    params![node_id],
97                    |row| row.get(0),
98                )
99                .optional()
100            })
101            .await
102            .map_err(sqlite_error)?;
103        Ok(row.and_then(|json| serde_json::from_str(&json).ok()))
104    }
105
106    async fn commit_runtime_state(
107        &self,
108        commit: RuntimeCommit,
109    ) -> Result<RuntimeCommitResult, StoreError> {
110        let blob_profile = self.options.blob_profile;
111        let result = self
112            .conn
113            .write_flow(move |tx| {
114                let outcome: Result<RuntimeCommitResult, StoreError> = (|| {
115                    let existing = try_load_session_head_meta_from_conn(tx)?;
116                    if let Some(bound_session_id) =
117                        existing.as_ref().map(|meta| meta.session_id.as_str())
118                        && bound_session_id != commit.session_id
119                    {
120                        return Err(StoreError::SessionBindingMismatch {
121                            bound_session_id: bound_session_id.to_string(),
122                            attempted_session_id: commit.session_id.clone(),
123                        });
124                    }
125                    if let Some(completed) = &commit.turn_commit {
126                        if completed.session_id != commit.session_id {
127                            return Err(StoreError::RuntimeTurnCommitConflict {
128                                session_id: completed.session_id.clone(),
129                                turn_id: completed.turn_id.clone(),
130                            });
131                        }
132                        let prior: Option<(String, String)> = tx
133                            .query_row(
134                                "SELECT turn_commit_hash, result_json FROM runtime_turn_commits
135                                 WHERE session_id = ?1 AND turn_id = ?2",
136                                params![completed.session_id, completed.turn_id],
137                                |row| Ok((row.get(0)?, row.get(1)?)),
138                            )
139                            .optional()
140                            .map_err(sqlite_error)?;
141                        if let Some((turn_commit_hash, result_json)) = prior {
142                            if turn_commit_hash == completed.turn_commit_hash {
143                                let result: RuntimeCommitResult =
144                                    serde_json::from_str(&result_json).map_err(|err| {
145                                        StoreError::Backend(format!(
146                                            "failed to decode runtime turn commit result: {err}"
147                                        ))
148                                    })?;
149                                if let Some(completion) =
150                                    commit.release_session_execution_lease.as_ref()
151                                {
152                                    release_session_execution_lease_conn(tx, completion)?;
153                                }
154                                return Ok(result);
155                            }
156                            return Err(StoreError::RuntimeTurnCommitConflict {
157                                session_id: completed.session_id.clone(),
158                                turn_id: completed.turn_id.clone(),
159                            });
160                        }
161                    }
162                    let Some(session_execution_lease) = commit.session_execution_lease.as_ref()
163                    else {
164                        return Err(StoreError::SessionExecutionLeaseExpired {
165                            session_id: commit.session_id.clone(),
166                        });
167                    };
168                    ensure_session_execution_lease_conn(
169                        tx,
170                        &commit.session_id,
171                        session_execution_lease,
172                    )?;
173                    let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
174                    if commit.expected_head_revision.is_some()
175                        && commit.expected_head_revision != Some(actual_revision)
176                    {
177                        return Err(StoreError::HeadRevisionConflict {
178                            expected: commit.expected_head_revision,
179                            actual: actual_revision,
180                        });
181                    }
182                    for completed in &commit.completed_queue_claims {
183                        if completed.session_id != commit.session_id {
184                            return Err(StoreError::QueuedWorkClaimExpired {
185                                session_id: completed.session_id.clone(),
186                                claim_id: completed.claim_id.clone(),
187                            });
188                        }
189                        ensure_queued_work_completion_conn(tx, completed)?;
190                    }
191
192                    let stored_checkpoint =
193                        Self::put_checkpoint_conn(tx, &commit.checkpoint, blob_profile)
194                            .map_err(sqlite_error)?;
195
196                    if !commit.usage_deltas.is_empty() {
197                        let mut stmt = tx
198                            .prepare(
199                                "INSERT INTO usage_deltas (
200                                    source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
201                                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
202                            )
203                            .map_err(sqlite_error)?;
204                        for entry in &commit.usage_deltas {
205                            stmt.execute(params![
206                                entry.source,
207                                entry.model,
208                                entry.usage.input_tokens,
209                                entry.usage.output_tokens,
210                                entry.usage.cached_input_tokens,
211                                entry.usage.reasoning_tokens,
212                            ])
213                            .map_err(sqlite_error)?;
214                        }
215                    }
216
217                    let leaf_node_id = match &commit.graph {
218                        GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
219                        GraphCommitDelta::Append {
220                            nodes,
221                            leaf_node_id,
222                        } => {
223                            for node in nodes {
224                                let node_json = encode_json(node);
225                                tx.execute(
226                                    "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
227                                    params![node.node_id, node_json],
228                                )
229                                .map_err(sqlite_error)?;
230                            }
231                            leaf_node_id.clone()
232                        }
233                        GraphCommitDelta::ReplaceFull(graph) => {
234                            tx.execute("DELETE FROM graph_nodes", [])
235                                .map_err(sqlite_error)?;
236                            for node in &graph.nodes {
237                                let node_json = encode_json(node);
238                                tx.execute(
239                                    "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
240                                    params![node.node_id, node_json],
241                                )
242                                .map_err(sqlite_error)?;
243                            }
244                            graph.leaf_node_id.clone()
245                        }
246                    };
247                    let graph_node_count: usize = tx
248                        .query_row(
249                            "SELECT COUNT(*) FROM graph_nodes WHERE tombstoned = 0",
250                            [],
251                            |row| row.get::<_, i64>(0),
252                        )
253                        .map_err(sqlite_error)? as usize;
254                    let next_revision = actual_revision + 1;
255                    let meta = SessionHeadMeta {
256                        session_id: commit.session_id.clone(),
257                        head_revision: next_revision,
258                        config: commit.config.clone(),
259                        agent_frames: commit.agent_frames.clone(),
260                        current_agent_frame_id: commit.current_agent_frame_id.clone(),
261                        checkpoint_ref: Some(stored_checkpoint.checkpoint_ref.clone()),
262                        leaf_node_id,
263                        graph_node_count,
264                        token_ledger: Vec::new(),
265                    };
266                    tx.execute(
267                        "INSERT OR REPLACE INTO session_head (singleton, session_id, head_json, head_revision)
268                         VALUES (1, ?1, ?2, ?3)",
269                        params![
270                            meta.session_id,
271                            encode_json(&meta),
272                            meta.head_revision as i64
273                        ],
274                    )
275                    .map_err(sqlite_error)?;
276                    for completed in &commit.completed_queue_claims {
277                        for batch_id in &completed.batch_ids {
278                            tx.execute(
279                                "DELETE FROM queued_work_batches
280                                 WHERE session_id = ?1
281                                   AND batch_id = ?2
282                                   AND claim_id = ?3
283                                   AND claim_token = ?4",
284                                params![
285                                    completed.session_id,
286                                    batch_id,
287                                    completed.claim_id,
288                                    completed.lease_token
289                                ],
290                            )
291                            .map_err(sqlite_error)?;
292                        }
293                    }
294                    if !commit.committed_attachment_ids.is_empty() {
295                        let now = current_epoch_ms() as i64;
296                        let mut stmt = tx
297                            .prepare(
298                                "UPDATE attachment_manifest
299                                 SET committed_at_ms = COALESCE(committed_at_ms, ?1)
300                                 WHERE attachment_id = ?2 AND session_id = ?3",
301                            )
302                            .map_err(sqlite_error)?;
303                        for id in &commit.committed_attachment_ids {
304                            stmt.execute(params![now, id.as_str(), commit.session_id])
305                                .map_err(sqlite_error)?;
306                        }
307                    }
308                    let result = RuntimeCommitResult {
309                        head_revision: next_revision,
310                        checkpoint_ref: stored_checkpoint.checkpoint_ref,
311                        manifest: stored_checkpoint.manifest,
312                    };
313                    if let Some(completed) = &commit.turn_commit {
314                        tx.execute(
315                            "INSERT INTO runtime_turn_commits (
316                                session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
317                             )
318                             VALUES (?1, ?2, ?3, ?4, ?5)",
319                            params![
320                                completed.session_id,
321                                completed.turn_id,
322                                completed.turn_commit_hash,
323                                encode_json(&result),
324                                current_epoch_ms() as i64
325                            ],
326                        )
327                        .map_err(sqlite_error)?;
328                    }
329                    if let Some(completion) = commit.release_session_execution_lease.as_ref() {
330                        release_session_execution_lease_conn(tx, completion)?;
331                    }
332                    Ok(result)
333                })();
334                // Roll back on a `StoreError` so a failure after the first
335                // write (e.g. a head-revision conflict surfaced mid-commit, or a
336                // backend write error) does not leave the partial transaction
337                // committed, while still carrying the typed error to the caller.
338                match outcome {
339                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
340                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
341                }
342            })
343            .await
344            .map_err(sqlite_error)??;
345        self.maybe_auto_gc().await;
346        Ok(result)
347    }
348
349    async fn try_claim_session_execution_lease(
350        &self,
351        session_id: &str,
352        owner_id: &str,
353        lease_ttl_ms: u64,
354    ) -> Result<Option<SessionExecutionLease>, StoreError> {
355        let session_id = session_id.to_string();
356        let owner_id = owner_id.to_string();
357        self.conn
358            .write_flow(move |tx| {
359                let outcome: Result<Option<SessionExecutionLease>, StoreError> = (|| {
360                    let now = current_epoch_ms();
361                    let current = load_session_execution_lease_row_conn(tx, &session_id)?;
362                    if current.as_ref().is_some_and(|lease| {
363                        lease.lease_token.is_some() && lease.expires_at_ms > now
364                    }) {
365                        let current = current.expect("checked current lease is present");
366                        if current.owner_id.as_deref() == Some(owner_id.as_str()) {
367                            let expires_at = now.saturating_add(lease_ttl_ms);
368                            tx.execute(
369                                "UPDATE session_execution_leases
370                                 SET lease_expires_at_ms = ?2
371                                 WHERE session_id = ?1",
372                                params![session_id, expires_at as i64],
373                            )
374                            .map_err(sqlite_error)?;
375                            return Ok(Some(SessionExecutionLease {
376                                session_id,
377                                owner_id,
378                                lease_token: current.lease_token.expect("live lease token set"),
379                                fencing_token: current.fencing_token,
380                                claimed_at_epoch_ms: current.claimed_at_ms,
381                                expires_at_epoch_ms: expires_at,
382                            }));
383                        }
384                        return Ok(None);
385                    }
386                    let fencing_token = current
387                        .as_ref()
388                        .map_or(0, |lease| lease.fencing_token)
389                        .saturating_add(1);
390                    let lease_token = format!("{session_id}:{owner_id}:{now}:{fencing_token}");
391                    let expires_at = now.saturating_add(lease_ttl_ms);
392                    tx.execute(
393                        "INSERT INTO session_execution_leases (
394                            session_id, lease_owner_id, lease_token, lease_fencing_token,
395                            lease_claimed_at_ms, lease_expires_at_ms
396                         )
397                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
398                         ON CONFLICT(session_id) DO UPDATE SET
399                            lease_owner_id = excluded.lease_owner_id,
400                            lease_token = excluded.lease_token,
401                            lease_fencing_token = excluded.lease_fencing_token,
402                            lease_claimed_at_ms = excluded.lease_claimed_at_ms,
403                            lease_expires_at_ms = excluded.lease_expires_at_ms",
404                        params![
405                            session_id,
406                            owner_id,
407                            lease_token,
408                            fencing_token as i64,
409                            now as i64,
410                            expires_at as i64
411                        ],
412                    )
413                    .map_err(sqlite_error)?;
414                    Ok(Some(SessionExecutionLease {
415                        session_id,
416                        owner_id,
417                        lease_token,
418                        fencing_token,
419                        claimed_at_epoch_ms: now,
420                        expires_at_epoch_ms: expires_at,
421                    }))
422                })(
423                );
424                match outcome {
425                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
426                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
427                }
428            })
429            .await
430            .map_err(sqlite_error)?
431    }
432
433    async fn renew_session_execution_lease(
434        &self,
435        fence: &SessionExecutionLeaseFence,
436        lease_ttl_ms: u64,
437    ) -> Result<SessionExecutionLease, StoreError> {
438        let fence = fence.clone();
439        self.conn
440            .write_flow(move |tx| {
441                let outcome: Result<SessionExecutionLease, StoreError> = (|| {
442                    let now = current_epoch_ms();
443                    let current = load_session_execution_lease_row_conn(tx, &fence.session_id)?;
444                    let Some(current) = current else {
445                        return Err(StoreError::SessionExecutionLeaseExpired {
446                            session_id: fence.session_id.clone(),
447                        });
448                    };
449                    if current.owner_id.as_deref() != Some(fence.owner_id.as_str())
450                        || current.lease_token.as_deref() != Some(fence.lease_token.as_str())
451                        || current.fencing_token != fence.fencing_token
452                        || current.expires_at_ms <= now
453                    {
454                        return Err(StoreError::SessionExecutionLeaseExpired {
455                            session_id: fence.session_id.clone(),
456                        });
457                    }
458                    let expires_at = now.saturating_add(lease_ttl_ms);
459                    tx.execute(
460                        "UPDATE session_execution_leases
461                         SET lease_expires_at_ms = ?5
462                         WHERE session_id = ?1
463                           AND lease_owner_id = ?2
464                           AND lease_token = ?3
465                           AND lease_fencing_token = ?4",
466                        params![
467                            fence.session_id,
468                            fence.owner_id,
469                            fence.lease_token,
470                            fence.fencing_token as i64,
471                            expires_at as i64
472                        ],
473                    )
474                    .map_err(sqlite_error)?;
475                    Ok(SessionExecutionLease {
476                        session_id: fence.session_id,
477                        owner_id: fence.owner_id,
478                        lease_token: fence.lease_token,
479                        fencing_token: fence.fencing_token,
480                        claimed_at_epoch_ms: current.claimed_at_ms,
481                        expires_at_epoch_ms: expires_at,
482                    })
483                })();
484                match outcome {
485                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
486                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
487                }
488            })
489            .await
490            .map_err(sqlite_error)?
491    }
492
493    async fn release_session_execution_lease(
494        &self,
495        completion: &SessionExecutionLeaseCompletion,
496    ) -> Result<(), StoreError> {
497        let completion = completion.clone();
498        self.conn
499            .write_flow(move |tx| {
500                let outcome = release_session_execution_lease_conn(tx, &completion);
501                match outcome {
502                    Ok(()) => Ok(TxOutcome::Commit(Ok(()))),
503                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
504                }
505            })
506            .await
507            .map_err(sqlite_error)?
508    }
509
510    async fn enqueue_queued_work(
511        &self,
512        batch: QueuedWorkBatchDraft,
513    ) -> Result<QueuedWorkBatch, StoreError> {
514        let nonce = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed);
515        self.conn
516            .write_flow(move |tx| {
517                let outcome: Result<QueuedWorkBatch, StoreError> = (|| {
518                    if let Some(source_key) = batch.source_key.as_deref() {
519                        let existing_id: Option<String> = tx
520                            .query_row(
521                                "SELECT batch_id
522                                 FROM queued_work_batches
523                                 WHERE session_id = ?1 AND source_key = ?2",
524                                params![batch.session_id, source_key],
525                                |row| row.get(0),
526                            )
527                            .optional()
528                            .map_err(sqlite_error)?;
529                        if let Some(batch_id) = existing_id {
530                            let existing = load_queued_batch_by_id_conn(tx, &batch_id)?
531                                .ok_or_else(|| {
532                                    StoreError::Backend(
533                                        "queued work source row disappeared".to_string(),
534                                    )
535                                })?;
536                            return Ok(existing);
537                        }
538                    }
539                    let now = current_epoch_ms();
540                    let batch_id =
541                        derive_batch_id(&batch.session_id, batch.source_key.as_deref(), now, Some(nonce));
542                    tx.execute(
543                        "INSERT INTO queued_work_batches (
544                            batch_id, session_id, source_key, delivery_policy, slot_policy,
545                            merge_key_json, available_at_ms, enqueued_at_ms
546                         )
547                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
548                        params![
549                            batch_id,
550                            batch.session_id,
551                            batch.source_key.as_deref(),
552                            batch.delivery_policy.as_str(),
553                            batch.slot_policy.as_str(),
554                            encode_json(&batch.merge_key),
555                            batch.available_at_ms as i64,
556                            now as i64,
557                        ],
558                    )
559                    .map_err(sqlite_error)?;
560                    for (index, payload) in batch.payloads.iter().enumerate() {
561                        let item_id = format!("{batch_id}:item:{index}");
562                        tx.execute(
563                            "INSERT INTO queued_work_items (batch_id, item_index, item_id, payload_json)
564                             VALUES (?1, ?2, ?3, ?4)",
565                            params![batch_id, index as i64, item_id, encode_json(payload)],
566                        )
567                        .map_err(sqlite_error)?;
568                    }
569                    load_queued_batch_by_id_conn(tx, &batch_id)?.ok_or_else(|| {
570                        StoreError::Backend("queued work insert disappeared".to_string())
571                    })
572                })();
573                // Roll back the partially-inserted batch/items on a
574                // `StoreError` while still returning the typed error.
575                match outcome {
576                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
577                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
578                }
579            })
580            .await
581            .map_err(sqlite_error)?
582    }
583
584    async fn claim_ready_queued_work(
585        &self,
586        session_id: &str,
587        session_execution_lease: &SessionExecutionLeaseFence,
588        owner_id: &str,
589        boundary: QueuedWorkClaimBoundary,
590        lease_ttl_ms: u64,
591        max_batches: usize,
592    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
593        if max_batches == 0 {
594            return Ok(None);
595        }
596        let session_id = session_id.to_string();
597        let session_execution_lease = session_execution_lease.clone();
598        let owner_id = owner_id.to_string();
599        self.conn
600            .write_flow(move |tx| {
601                let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
602                    ensure_session_execution_lease_conn(
603                        tx,
604                        &session_id,
605                        &session_execution_lease,
606                    )?;
607                    let now = current_epoch_ms();
608                    let candidate_rows = {
609                        let mut stmt = tx
610                            .prepare(
611                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
612                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
613                                        claim_fencing_token
614                                 FROM queued_work_batches
615                                 WHERE session_id = ?1
616                                   AND available_at_ms <= ?2
617                                   AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
618                                 ORDER BY enqueue_seq ASC
619                                 LIMIT ?3",
620                            )
621                            .map_err(sqlite_error)?;
622                        let rows = stmt
623                            .query_map(
624                                params![session_id, now as i64, claim_scan_limit(max_batches)],
625                                queued_batch_row_from_sql,
626                            )
627                            .map_err(sqlite_error)?;
628                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
629                    };
630                    let candidates = candidate_rows
631                        .iter()
632                        .map(|row| {
633                            Ok(ClaimCandidate {
634                                enqueue_seq: row.enqueue_seq,
635                                claim_fencing_token: row.claim_fencing_token,
636                                delivery_policy: decode_delivery_policy(
637                                    row.delivery_policy.clone(),
638                                )?,
639                                slot_policy: decode_slot_policy(row.slot_policy.clone())?,
640                                merge_key: decode_merge_key(row.merge_key_json.clone())?,
641                            })
642                        })
643                        .collect::<Result<Vec<_>, StoreError>>()?;
644                    let selected_len = select_claim_prefix(&candidates, boundary, max_batches);
645                    if selected_len == 0 {
646                        return Ok(TxOutcome::Commit(None));
647                    }
648                    let mut selected = candidate_rows;
649                    selected.truncate(selected_len);
650                    let lease = QueuedWorkClaimLease::derive(
651                        &candidates[0],
652                        &session_id,
653                        &owner_id,
654                        now,
655                        lease_ttl_ms,
656                    );
657                    for row in &selected {
658                        // Under `BEGIN IMMEDIATE` this connection already holds
659                        // the write lock, but the row could still have been
660                        // claimed by an earlier committed writer (its
661                        // `claim_token` set and not yet expired). The `WHERE`
662                        // clause filters those out, so a 0-row update means we
663                        // lost the race for this batch: treat the whole claim as
664                        // not-won rather than returning a claim that doesn't
665                        // actually own the row.
666                        let claimed = tx
667                            .execute(
668                                "UPDATE queued_work_batches
669                                 SET claim_id = ?3,
670                                     claim_owner_id = ?4,
671                                     claim_token = ?5,
672                                     claim_fencing_token = claim_fencing_token + 1,
673                                     claim_claimed_at_ms = ?6,
674                                     claim_expires_at_ms = ?7
675                                 WHERE session_id = ?1
676                                   AND batch_id = ?2
677                                   AND (claim_token IS NULL OR claim_expires_at_ms <= ?6)",
678                                params![
679                                    session_id,
680                                    row.batch_id,
681                                    lease.claim_id,
682                                    owner_id,
683                                    lease.lease_token,
684                                    now as i64,
685                                    lease.expires_at_epoch_ms as i64
686                                ],
687                            )
688                            .map_err(sqlite_error)?;
689                        if claimed == 0 {
690                            // Lost the race for this batch. Roll back any sibling
691                            // rows we already claimed in this transaction so we
692                            // never return a half-owned claim.
693                            return Ok(TxOutcome::Rollback(None));
694                        }
695                    }
696                    let mut batches = Vec::new();
697                    for row in selected {
698                        batches.push(queued_work_batch_from_conn(tx, row)?);
699                    }
700                    Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
701                        session_id: session_id.clone(),
702                        claim_id: lease.claim_id,
703                        owner_id: owner_id.clone(),
704                        lease_token: lease.lease_token,
705                        fencing_token: lease.fencing_token,
706                        claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
707                        expires_at_epoch_ms: lease.expires_at_epoch_ms,
708                        batches,
709                    })))
710                })();
711                // Lower a `StoreError` into the rollback arm so the closure body
712                // can keep using `?` while still propagating the error to the
713                // caller. Encode it as a `Result` carried out of the flow.
714                match outcome {
715                    Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
716                    Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
717                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
718                }
719            })
720            .await
721            .map_err(sqlite_error)?
722    }
723
724    async fn renew_queued_work_claim(
725        &self,
726        claim: &QueuedWorkClaim,
727        lease_ttl_ms: u64,
728    ) -> Result<QueuedWorkClaim, StoreError> {
729        let now = current_epoch_ms();
730        let expires_at = now.saturating_add(lease_ttl_ms);
731        let session_id = claim.session_id.clone();
732        let claim_id = claim.claim_id.clone();
733        let lease_token = claim.lease_token.clone();
734        let changed = self
735            .conn
736            .write(move |tx| {
737                tx.execute(
738                    "UPDATE queued_work_batches
739                     SET claim_expires_at_ms = ?4
740                     WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
741                    params![session_id, claim_id, lease_token, expires_at as i64],
742                )
743            })
744            .await
745            .map_err(sqlite_error)?;
746        renewed_claim(claim, changed, expires_at)
747    }
748
749    async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
750        let session_id = claim.session_id.clone();
751        let claim_id = claim.claim_id.clone();
752        let lease_token = claim.lease_token.clone();
753        self.conn
754            .write(move |tx| {
755                tx.execute(
756                    "UPDATE queued_work_batches
757                     SET claim_id = NULL,
758                         claim_owner_id = NULL,
759                         claim_token = NULL,
760                         claim_claimed_at_ms = 0,
761                         claim_expires_at_ms = 0
762                     WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
763                    params![session_id, claim_id, lease_token],
764                )
765            })
766            .await
767            .map_err(sqlite_error)?;
768        Ok(())
769    }
770
771    async fn cancel_queued_work_batch(
772        &self,
773        session_id: &str,
774        batch_id: &str,
775    ) -> Result<Option<QueuedWorkBatch>, StoreError> {
776        let session_id = session_id.to_string();
777        let batch_id = batch_id.to_string();
778        self.conn
779            .write_flow(move |tx| {
780                let outcome: Result<Option<QueuedWorkBatch>, StoreError> = (|| {
781                    let now = current_epoch_ms() as i64;
782                    let row = tx
783                        .query_row(
784                            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
785                                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
786                                    claim_fencing_token
787                             FROM queued_work_batches
788                             WHERE session_id = ?1
789                               AND batch_id = ?2
790                               AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
791                            params![session_id, batch_id, now],
792                            queued_batch_row_from_sql,
793                        )
794                        .optional()
795                        .map_err(sqlite_error)?;
796                    let Some(row) = row else {
797                        return Ok(None);
798                    };
799                    let batch = queued_work_batch_from_conn(tx, row)?;
800                    tx.execute(
801                        "DELETE FROM queued_work_batches
802                         WHERE session_id = ?1
803                           AND batch_id = ?2
804                           AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
805                        params![session_id, batch_id, now],
806                    )
807                    .map_err(sqlite_error)?;
808                    Ok(Some(batch))
809                })();
810                match outcome {
811                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
812                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
813                }
814            })
815            .await
816            .map_err(sqlite_error)?
817    }
818
819    async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
820        let session_id = session_id.to_string();
821        self.conn
822            .call(move |conn| {
823                let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
824                    let rows = {
825                        let mut stmt = conn
826                            .prepare(
827                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
828                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
829                                        claim_fencing_token
830                                 FROM queued_work_batches
831                                 WHERE session_id = ?1
832                                 ORDER BY enqueue_seq ASC",
833                            )
834                            .map_err(sqlite_error)?;
835                        let rows = stmt
836                            .query_map(params![session_id], queued_batch_row_from_sql)
837                            .map_err(sqlite_error)?;
838                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
839                    };
840                    rows.into_iter()
841                        .map(|row| queued_work_batch_from_conn(conn, row))
842                        .collect()
843                })();
844                Ok(outcome)
845            })
846            .await
847            .map_err(sqlite_error)?
848    }
849
850    async fn list_pending_queued_work(
851        &self,
852        session_id: &str,
853    ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
854        let session_id = session_id.to_string();
855        self.conn
856            .call(move |conn| {
857                let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
858                    let now = current_epoch_ms();
859                    let rows = {
860                        let mut stmt = conn
861                            .prepare(
862                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
863                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
864                                        claim_fencing_token
865                                 FROM queued_work_batches
866                                 WHERE session_id = ?1
867                                   AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
868                                 ORDER BY enqueue_seq ASC",
869                            )
870                            .map_err(sqlite_error)?;
871                        let rows = stmt
872                            .query_map(
873                                params![session_id, now as i64],
874                                queued_batch_row_from_sql,
875                            )
876                            .map_err(sqlite_error)?;
877                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
878                    };
879                    rows.into_iter()
880                        .map(|row| queued_work_batch_from_conn(conn, row))
881                        .collect()
882                })();
883                Ok(outcome)
884            })
885            .await
886            .map_err(sqlite_error)?
887    }
888
889    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
890        Store::save_session_meta(self, meta).await;
891        Ok(())
892    }
893
894    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
895        Ok(Store::load_session_meta(self).await)
896    }
897
898    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
899        if ids.is_empty() {
900            return Ok(());
901        }
902        let ids = ids.to_vec();
903        self.conn
904            .write(move |tx| {
905                let mut stmt =
906                    tx.prepare("UPDATE graph_nodes SET tombstoned = 1 WHERE node_id = ?1")?;
907                for id in &ids {
908                    stmt.execute(params![id])?;
909                }
910                Ok(())
911            })
912            .await
913            .map_err(sqlite_error)
914    }
915
916    async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
917        let removed = self
918            .conn
919            .write(move |tx| tx.execute("DELETE FROM graph_nodes WHERE tombstoned = 1", []))
920            .await
921            .map_err(sqlite_error)?;
922        Ok(VacuumReport {
923            removed_node_count: removed,
924        })
925    }
926
927    async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
928        Ok(Store::gc_unreachable(self).await)
929    }
930}
931
/// One row of `session_execution_leases`, as read by
/// `load_session_execution_lease_row_conn`.
///
/// Derives `Debug` and `Clone` so a lease snapshot can be logged or copied
/// while diagnosing fencing failures.
#[derive(Debug, Clone)]
struct SessionExecutionLeaseRow {
    /// `lease_owner_id` column; `None` when the column is NULL (a released
    /// lease has it set to NULL).
    owner_id: Option<String>,
    /// `lease_token` column; `None` when the column is NULL.
    lease_token: Option<String>,
    /// `lease_fencing_token` column, reinterpreted from SQLite's `i64`.
    fencing_token: u64,
    /// `lease_claimed_at_ms` column (epoch milliseconds; 0 after release).
    claimed_at_ms: u64,
    /// `lease_expires_at_ms` column (epoch milliseconds; 0 after release).
    expires_at_ms: u64,
}
939
940fn load_session_execution_lease_row_conn(
941    conn: &Connection,
942    session_id: &str,
943) -> Result<Option<SessionExecutionLeaseRow>, StoreError> {
944    let row = conn
945        .query_row(
946            "SELECT lease_owner_id, lease_token, lease_fencing_token,
947                    lease_claimed_at_ms, lease_expires_at_ms
948             FROM session_execution_leases
949             WHERE session_id = ?1",
950            params![session_id],
951            |row| {
952                Ok(SessionExecutionLeaseRow {
953                    owner_id: row.get(0)?,
954                    lease_token: row.get(1)?,
955                    fencing_token: row.get::<_, i64>(2)? as u64,
956                    claimed_at_ms: row.get::<_, i64>(3)? as u64,
957                    expires_at_ms: row.get::<_, i64>(4)? as u64,
958                })
959            },
960        )
961        .optional()
962        .map_err(sqlite_error)?;
963    Ok(row)
964}
965
966fn ensure_session_execution_lease_conn(
967    conn: &Connection,
968    session_id: &str,
969    fence: &SessionExecutionLeaseFence,
970) -> Result<(), StoreError> {
971    if fence.session_id != session_id {
972        return Err(StoreError::SessionExecutionLeaseExpired {
973            session_id: session_id.to_string(),
974        });
975    }
976    let now = current_epoch_ms();
977    let current = load_session_execution_lease_row_conn(conn, session_id)?;
978    let Some(current) = current else {
979        return Err(StoreError::SessionExecutionLeaseExpired {
980            session_id: session_id.to_string(),
981        });
982    };
983    if current.owner_id.as_deref() == Some(fence.owner_id.as_str())
984        && current.lease_token.as_deref() == Some(fence.lease_token.as_str())
985        && current.fencing_token == fence.fencing_token
986        && current.expires_at_ms > now
987    {
988        Ok(())
989    } else {
990        Err(StoreError::SessionExecutionLeaseExpired {
991            session_id: session_id.to_string(),
992        })
993    }
994}
995
996fn release_session_execution_lease_conn(
997    conn: &Connection,
998    completion: &SessionExecutionLeaseCompletion,
999) -> Result<(), StoreError> {
1000    conn.execute(
1001        "UPDATE session_execution_leases
1002         SET lease_owner_id = NULL,
1003             lease_token = NULL,
1004             lease_claimed_at_ms = 0,
1005             lease_expires_at_ms = 0
1006         WHERE session_id = ?1
1007           AND lease_owner_id = ?2
1008           AND lease_token = ?3
1009           AND lease_fencing_token = ?4",
1010        params![
1011            completion.session_id,
1012            completion.owner_id,
1013            completion.lease_token,
1014            completion.fencing_token as i64
1015        ],
1016    )
1017    .map_err(sqlite_error)?;
1018    Ok(())
1019}