Skip to main content

lash_sqlite_store/
persistence.rs

1//! The [`RuntimePersistence`] trait implementation for [`Store`].
2//!
3//! This is the tokio-rusqlite port of the prior store's `persistence.rs`. The
4//! public surface is byte-for-byte the prior store async trait: identical method
5//! names and signatures, so consumers swap backends with a path rename only.
6//!
7//! The translation rules (see `conn.rs`, `lifecycle.rs`, `blobs.rs`):
8//!
9//! * Pure reads run through `self.conn.call(move |conn| { ... })`.
10//! * Read-then-write paths run through `self.conn.write(move |tx| { ... })`
11//!   (`BEGIN IMMEDIATE`, commit on `Ok`, rollback on `Err`) — this is the
12//!   cross-process write-lock guard.
13//! * Paths that may abandon partially-applied writes (the queued-work claim)
14//!   run through `self.conn.write_flow`, deciding commit vs rollback via
15//!   [`TxOutcome`].
16//! * The shared `*_conn` helpers (`try_load_session_head_meta_from_conn`,
17//!   `Self::put_checkpoint_conn`, `Self::load_usage_deltas_conn`,
18//!   `Self::load_session_graph_from_conn`, the queued-work helpers, …) are
19//!   synchronous and take a `&rusqlite::Connection`, so they are reused from
20//!   inside these closures (a `&Transaction` derefs to `&Connection`).
21//! * Closures must be `'static` + `Send`: every borrow of `self`/caller data is
22//!   cloned into an owned value before being moved in.
23
24use super::*;
25
26#[async_trait::async_trait]
27impl RuntimePersistence for Store {
28    fn durability_tier(&self) -> DurabilityTier {
29        DurabilityTier::Durable
30    }
31
32    async fn load_session(
33        &self,
34        scope: SessionReadScope,
35    ) -> Result<Option<PersistedSessionRead>, StoreError> {
36        self.conn
37            .call(move |conn| {
38                let outcome: Result<Option<PersistedSessionRead>, StoreError> = (|| {
39                    let Some(meta) = try_load_session_head_meta_from_conn(conn)? else {
40                        return Ok(None);
41                    };
42                    let leaf_node_id = match &scope {
43                        SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
44                        SessionReadScope::ActivePath { leaf_node_id } => {
45                            leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
46                        }
47                    };
48                    let mut graph = match scope {
49                        SessionReadScope::FullGraph => {
50                            Self::load_session_graph_from_conn(conn, meta.leaf_node_id.clone())
51                        }
52                        SessionReadScope::ActivePath { .. } => {
53                            Self::load_active_path_session_graph_from_conn(
54                                conn,
55                                leaf_node_id.clone(),
56                            )
57                            .map_err(sqlite_error)?
58                        }
59                    };
60                    graph.set_leaf_node_id(leaf_node_id);
61                    let checkpoint = meta
62                        .checkpoint_ref
63                        .as_ref()
64                        .and_then(|blob_ref| Self::get_checkpoint_conn(conn, blob_ref));
65                    Ok(Some(PersistedSessionRead {
66                        session_id: meta.session_id,
67                        head_revision: meta.head_revision,
68                        config: meta.config,
69                        agent_frames: meta.agent_frames,
70                        current_agent_frame_id: meta.current_agent_frame_id,
71                        graph,
72                        checkpoint_ref: meta.checkpoint_ref,
73                        checkpoint,
74                        token_ledger: merge_token_ledger_entries(Self::load_usage_deltas_conn(
75                            conn,
76                        )),
77                    }))
78                })(
79                );
80                Ok(outcome)
81            })
82            .await
83            .map_err(sqlite_error)?
84    }
85
86    async fn load_node(
87        &self,
88        node_id: &str,
89    ) -> Result<Option<lash_core::SessionNodeRecord>, StoreError> {
90        let node_id = node_id.to_string();
91        let row: Option<String> = self
92            .conn
93            .call(move |conn| {
94                conn.query_row(
95                    "SELECT node_json FROM graph_nodes WHERE node_id = ?1 AND tombstoned = 0",
96                    params![node_id],
97                    |row| row.get(0),
98                )
99                .optional()
100            })
101            .await
102            .map_err(sqlite_error)?;
103        Ok(row.and_then(|json| serde_json::from_str(&json).ok()))
104    }
105
106    async fn commit_runtime_state(
107        &self,
108        commit: RuntimeCommit,
109    ) -> Result<RuntimeCommitResult, StoreError> {
110        let blob_profile = self.options.blob_profile;
111        let result = self
112            .conn
113            .write_flow(move |tx| {
114                let outcome: Result<RuntimeCommitResult, StoreError> = (|| {
115                    let existing = try_load_session_head_meta_from_conn(tx)?;
116                    if let Some(bound_session_id) =
117                        existing.as_ref().map(|meta| meta.session_id.as_str())
118                        && bound_session_id != commit.session_id
119                    {
120                        return Err(StoreError::SessionBindingMismatch {
121                            bound_session_id: bound_session_id.to_string(),
122                            attempted_session_id: commit.session_id.clone(),
123                        });
124                    }
125                    if let Some(completed) = &commit.turn_commit {
126                        if completed.session_id != commit.session_id {
127                            return Err(StoreError::RuntimeTurnCommitConflict {
128                                session_id: completed.session_id.clone(),
129                                turn_id: completed.turn_id.clone(),
130                            });
131                        }
132                        let prior: Option<(String, String)> = tx
133                            .query_row(
134                                "SELECT turn_commit_hash, result_json FROM runtime_turn_commits
135                                 WHERE session_id = ?1 AND turn_id = ?2",
136                                params![completed.session_id, completed.turn_id],
137                                |row| Ok((row.get(0)?, row.get(1)?)),
138                            )
139                            .optional()
140                            .map_err(sqlite_error)?;
141                        if let Some((turn_commit_hash, result_json)) = prior {
142                            if turn_commit_hash == completed.turn_commit_hash {
143                                let result: RuntimeCommitResult =
144                                    serde_json::from_str(&result_json).map_err(|err| {
145                                        StoreError::Backend(format!(
146                                            "failed to decode runtime turn commit result: {err}"
147                                        ))
148                                    })?;
149                                return Ok(result);
150                            }
151                            return Err(StoreError::RuntimeTurnCommitConflict {
152                                session_id: completed.session_id.clone(),
153                                turn_id: completed.turn_id.clone(),
154                            });
155                        }
156                    }
157                    let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
158                    if commit.expected_head_revision.is_some()
159                        && commit.expected_head_revision != Some(actual_revision)
160                    {
161                        return Err(StoreError::HeadRevisionConflict {
162                            expected: commit.expected_head_revision,
163                            actual: actual_revision,
164                        });
165                    }
166                    for completed in &commit.completed_queue_claims {
167                        if completed.session_id != commit.session_id {
168                            return Err(StoreError::QueuedWorkClaimExpired {
169                                session_id: completed.session_id.clone(),
170                                claim_id: completed.claim_id.clone(),
171                            });
172                        }
173                        ensure_queued_work_completion_conn(tx, completed)?;
174                    }
175
176                    let stored_checkpoint =
177                        Self::put_checkpoint_conn(tx, &commit.checkpoint, blob_profile)
178                            .map_err(sqlite_error)?;
179
180                    if !commit.usage_deltas.is_empty() {
181                        let mut stmt = tx
182                            .prepare(
183                                "INSERT INTO usage_deltas (
184                                    source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
185                                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
186                            )
187                            .map_err(sqlite_error)?;
188                        for entry in &commit.usage_deltas {
189                            stmt.execute(params![
190                                entry.source,
191                                entry.model,
192                                entry.usage.input_tokens,
193                                entry.usage.output_tokens,
194                                entry.usage.cached_input_tokens,
195                                entry.usage.reasoning_tokens,
196                            ])
197                            .map_err(sqlite_error)?;
198                        }
199                    }
200
201                    let leaf_node_id = match &commit.graph {
202                        GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
203                        GraphCommitDelta::Append {
204                            nodes,
205                            leaf_node_id,
206                        } => {
207                            for node in nodes {
208                                let node_json = encode_json(node);
209                                tx.execute(
210                                    "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
211                                    params![node.node_id, node_json],
212                                )
213                                .map_err(sqlite_error)?;
214                            }
215                            leaf_node_id.clone()
216                        }
217                        GraphCommitDelta::ReplaceFull(graph) => {
218                            tx.execute("DELETE FROM graph_nodes", [])
219                                .map_err(sqlite_error)?;
220                            for node in &graph.nodes {
221                                let node_json = encode_json(node);
222                                tx.execute(
223                                    "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
224                                    params![node.node_id, node_json],
225                                )
226                                .map_err(sqlite_error)?;
227                            }
228                            graph.leaf_node_id.clone()
229                        }
230                    };
231                    let graph_node_count: usize = tx
232                        .query_row(
233                            "SELECT COUNT(*) FROM graph_nodes WHERE tombstoned = 0",
234                            [],
235                            |row| row.get::<_, i64>(0),
236                        )
237                        .map_err(sqlite_error)? as usize;
238                    let next_revision = actual_revision + 1;
239                    let meta = SessionHeadMeta {
240                        session_id: commit.session_id.clone(),
241                        head_revision: next_revision,
242                        config: commit.config.clone(),
243                        agent_frames: commit.agent_frames.clone(),
244                        current_agent_frame_id: commit.current_agent_frame_id.clone(),
245                        checkpoint_ref: Some(stored_checkpoint.checkpoint_ref.clone()),
246                        leaf_node_id,
247                        graph_node_count,
248                        token_ledger: Vec::new(),
249                    };
250                    tx.execute(
251                        "INSERT OR REPLACE INTO session_head (singleton, session_id, head_json, head_revision)
252                         VALUES (1, ?1, ?2, ?3)",
253                        params![
254                            meta.session_id,
255                            encode_json(&meta),
256                            meta.head_revision as i64
257                        ],
258                    )
259                    .map_err(sqlite_error)?;
260                    for completed in &commit.completed_queue_claims {
261                        for batch_id in &completed.batch_ids {
262                            tx.execute(
263                                "DELETE FROM queued_work_batches
264                                 WHERE session_id = ?1
265                                   AND batch_id = ?2
266                                   AND claim_id = ?3
267                                   AND claim_token = ?4",
268                                params![
269                                    completed.session_id,
270                                    batch_id,
271                                    completed.claim_id,
272                                    completed.lease_token
273                                ],
274                            )
275                            .map_err(sqlite_error)?;
276                        }
277                    }
278                    if !commit.committed_attachment_ids.is_empty() {
279                        let now = current_epoch_ms() as i64;
280                        let mut stmt = tx
281                            .prepare(
282                                "UPDATE attachment_manifest
283                                 SET committed_at_ms = COALESCE(committed_at_ms, ?1)
284                                 WHERE attachment_id = ?2 AND session_id = ?3",
285                            )
286                            .map_err(sqlite_error)?;
287                        for id in &commit.committed_attachment_ids {
288                            stmt.execute(params![now, id.as_str(), commit.session_id])
289                                .map_err(sqlite_error)?;
290                        }
291                    }
292                    let result = RuntimeCommitResult {
293                        head_revision: next_revision,
294                        checkpoint_ref: stored_checkpoint.checkpoint_ref,
295                        manifest: stored_checkpoint.manifest,
296                    };
297                    if let Some(completed) = &commit.turn_commit {
298                        tx.execute(
299                            "INSERT INTO runtime_turn_commits (
300                                session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
301                             )
302                             VALUES (?1, ?2, ?3, ?4, ?5)",
303                            params![
304                                completed.session_id,
305                                completed.turn_id,
306                                completed.turn_commit_hash,
307                                encode_json(&result),
308                                current_epoch_ms() as i64
309                            ],
310                        )
311                        .map_err(sqlite_error)?;
312                    }
313                    Ok(result)
314                })();
315                // Roll back on a `StoreError` so a failure after the first
316                // write (e.g. a head-revision conflict surfaced mid-commit, or a
317                // backend write error) does not leave the partial transaction
318                // committed, while still carrying the typed error to the caller.
319                match outcome {
320                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
321                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
322                }
323            })
324            .await
325            .map_err(sqlite_error)??;
326        self.maybe_auto_gc().await;
327        Ok(result)
328    }
329
330    async fn enqueue_queued_work(
331        &self,
332        batch: QueuedWorkBatchDraft,
333    ) -> Result<QueuedWorkBatch, StoreError> {
334        let nonce = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed);
335        self.conn
336            .write_flow(move |tx| {
337                let outcome: Result<QueuedWorkBatch, StoreError> = (|| {
338                    if let Some(source_key) = batch.source_key.as_deref() {
339                        let existing_id: Option<String> = tx
340                            .query_row(
341                                "SELECT batch_id
342                                 FROM queued_work_batches
343                                 WHERE session_id = ?1 AND source_key = ?2",
344                                params![batch.session_id, source_key],
345                                |row| row.get(0),
346                            )
347                            .optional()
348                            .map_err(sqlite_error)?;
349                        if let Some(batch_id) = existing_id {
350                            let existing = load_queued_batch_by_id_conn(tx, &batch_id)?
351                                .ok_or_else(|| {
352                                    StoreError::Backend(
353                                        "queued work source row disappeared".to_string(),
354                                    )
355                                })?;
356                            return Ok(existing);
357                        }
358                    }
359                    let now = current_epoch_ms();
360                    let batch_id = format!(
361                        "qwb:{:x}",
362                        Sha256::digest(
363                            format!("{}:{:?}:{now}:{nonce}", batch.session_id, batch.source_key)
364                                .as_bytes()
365                        )
366                    );
367                    tx.execute(
368                        "INSERT INTO queued_work_batches (
369                            batch_id, session_id, source_key, delivery_policy, slot_policy,
370                            merge_key_json, available_at_ms, enqueued_at_ms
371                         )
372                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
373                        params![
374                            batch_id,
375                            batch.session_id,
376                            batch.source_key.as_deref(),
377                            batch.delivery_policy.as_str(),
378                            batch.slot_policy.as_str(),
379                            encode_json(&batch.merge_key),
380                            batch.available_at_ms as i64,
381                            now as i64,
382                        ],
383                    )
384                    .map_err(sqlite_error)?;
385                    for (index, payload) in batch.payloads.iter().enumerate() {
386                        let item_id = format!("{batch_id}:item:{index}");
387                        tx.execute(
388                            "INSERT INTO queued_work_items (batch_id, item_index, item_id, payload_json)
389                             VALUES (?1, ?2, ?3, ?4)",
390                            params![batch_id, index as i64, item_id, encode_json(payload)],
391                        )
392                        .map_err(sqlite_error)?;
393                    }
394                    load_queued_batch_by_id_conn(tx, &batch_id)?.ok_or_else(|| {
395                        StoreError::Backend("queued work insert disappeared".to_string())
396                    })
397                })();
398                // Roll back the partially-inserted batch/items on a
399                // `StoreError` while still returning the typed error.
400                match outcome {
401                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
402                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
403                }
404            })
405            .await
406            .map_err(sqlite_error)?
407    }
408
409    async fn claim_ready_queued_work(
410        &self,
411        session_id: &str,
412        owner_id: &str,
413        boundary: QueuedWorkClaimBoundary,
414        lease_ttl_ms: u64,
415        max_batches: usize,
416    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
417        if max_batches == 0 {
418            return Ok(None);
419        }
420        let session_id = session_id.to_string();
421        let owner_id = owner_id.to_string();
422        self.conn
423            .write_flow(move |tx| {
424                let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
425                    let now = current_epoch_ms();
426                    let candidate_rows = {
427                        let mut stmt = tx
428                            .prepare(
429                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
430                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
431                                        claim_fencing_token
432                                 FROM queued_work_batches
433                                 WHERE session_id = ?1
434                                   AND available_at_ms <= ?2
435                                   AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
436                                 ORDER BY enqueue_seq ASC
437                                 LIMIT ?3",
438                            )
439                            .map_err(sqlite_error)?;
440                        let rows = stmt
441                            .query_map(
442                                params![
443                                    session_id,
444                                    now as i64,
445                                    (max_batches as i64).saturating_add(32)
446                                ],
447                                queued_batch_row_from_sql,
448                            )
449                            .map_err(sqlite_error)?;
450                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
451                    };
452                    let Some(first_row) = candidate_rows.first() else {
453                        return Ok(TxOutcome::Commit(None));
454                    };
455                    let first_delivery = decode_delivery_policy(first_row.delivery_policy.clone())?;
456                    if boundary == QueuedWorkClaimBoundary::ActiveTurnCheckpoint
457                        && first_delivery != DeliveryPolicy::EarliestSafeBoundary
458                    {
459                        return Ok(TxOutcome::Commit(None));
460                    }
461                    let first_slot = decode_slot_policy(first_row.slot_policy.clone())?;
462                    let first_merge_key = decode_merge_key(first_row.merge_key_json.clone())?;
463                    let mut selected = Vec::new();
464                    for row in candidate_rows {
465                        if selected.len() >= max_batches {
466                            break;
467                        }
468                        let delivery = decode_delivery_policy(row.delivery_policy.clone())?;
469                        let slot = decode_slot_policy(row.slot_policy.clone())?;
470                        let merge_key = decode_merge_key(row.merge_key_json.clone())?;
471                        if selected.is_empty() {
472                            selected.push(row);
473                            if first_slot == SlotPolicy::Exclusive {
474                                break;
475                            }
476                            continue;
477                        }
478                        if first_slot != SlotPolicy::Join
479                            || slot != SlotPolicy::Join
480                            || delivery != first_delivery
481                            || merge_key != first_merge_key
482                        {
483                            break;
484                        }
485                        selected.push(row);
486                    }
487                    let Some(first) = selected.first() else {
488                        return Ok(TxOutcome::Commit(None));
489                    };
490                    let fencing_token = first.claim_fencing_token.saturating_add(1);
491                    let claim_id = format!("qwc:{}:{fencing_token}", first.enqueue_seq);
492                    let lease_token = format!(
493                        "{:x}",
494                        Sha256::digest(
495                            format!("{session_id}:{owner_id}:{claim_id}:{now}").as_bytes()
496                        )
497                    );
498                    let expires_at = now.saturating_add(lease_ttl_ms);
499                    for row in &selected {
500                        // Under `BEGIN IMMEDIATE` this connection already holds
501                        // the write lock, but the row could still have been
502                        // claimed by an earlier committed writer (its
503                        // `claim_token` set and not yet expired). The `WHERE`
504                        // clause filters those out, so a 0-row update means we
505                        // lost the race for this batch: treat the whole claim as
506                        // not-won rather than returning a claim that doesn't
507                        // actually own the row.
508                        let claimed = tx
509                            .execute(
510                                "UPDATE queued_work_batches
511                                 SET claim_id = ?3,
512                                     claim_owner_id = ?4,
513                                     claim_token = ?5,
514                                     claim_fencing_token = claim_fencing_token + 1,
515                                     claim_claimed_at_ms = ?6,
516                                     claim_expires_at_ms = ?7
517                                 WHERE session_id = ?1
518                                   AND batch_id = ?2
519                                   AND (claim_token IS NULL OR claim_expires_at_ms <= ?6)",
520                                params![
521                                    session_id,
522                                    row.batch_id,
523                                    claim_id,
524                                    owner_id,
525                                    lease_token,
526                                    now as i64,
527                                    expires_at as i64
528                                ],
529                            )
530                            .map_err(sqlite_error)?;
531                        if claimed == 0 {
532                            // Lost the race for this batch. Roll back any sibling
533                            // rows we already claimed in this transaction so we
534                            // never return a half-owned claim.
535                            return Ok(TxOutcome::Rollback(None));
536                        }
537                    }
538                    let mut batches = Vec::new();
539                    for row in selected {
540                        batches.push(queued_work_batch_from_conn(tx, row)?);
541                    }
542                    Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
543                        session_id: session_id.clone(),
544                        claim_id,
545                        owner_id: owner_id.clone(),
546                        lease_token,
547                        fencing_token,
548                        claimed_at_epoch_ms: now,
549                        expires_at_epoch_ms: expires_at,
550                        batches,
551                    })))
552                })();
553                // Lower a `StoreError` into the rollback arm so the closure body
554                // can keep using `?` while still propagating the error to the
555                // caller. Encode it as a `Result` carried out of the flow.
556                match outcome {
557                    Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
558                    Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
559                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
560                }
561            })
562            .await
563            .map_err(sqlite_error)?
564    }
565
566    async fn renew_queued_work_claim(
567        &self,
568        claim: &QueuedWorkClaim,
569        lease_ttl_ms: u64,
570    ) -> Result<QueuedWorkClaim, StoreError> {
571        let now = current_epoch_ms();
572        let expires_at = now.saturating_add(lease_ttl_ms);
573        let session_id = claim.session_id.clone();
574        let claim_id = claim.claim_id.clone();
575        let lease_token = claim.lease_token.clone();
576        let batch_count = claim.batches.len();
577        let changed = self
578            .conn
579            .write(move |tx| {
580                tx.execute(
581                    "UPDATE queued_work_batches
582                     SET claim_expires_at_ms = ?4
583                     WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
584                    params![session_id, claim_id, lease_token, expires_at as i64],
585                )
586            })
587            .await
588            .map_err(sqlite_error)?;
589        if changed != batch_count {
590            return Err(StoreError::QueuedWorkClaimExpired {
591                session_id: claim.session_id.clone(),
592                claim_id: claim.claim_id.clone(),
593            });
594        }
595        Ok(QueuedWorkClaim {
596            expires_at_epoch_ms: expires_at,
597            ..claim.clone()
598        })
599    }
600
601    async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
602        let session_id = claim.session_id.clone();
603        let claim_id = claim.claim_id.clone();
604        let lease_token = claim.lease_token.clone();
605        self.conn
606            .write(move |tx| {
607                tx.execute(
608                    "UPDATE queued_work_batches
609                     SET claim_id = NULL,
610                         claim_owner_id = NULL,
611                         claim_token = NULL,
612                         claim_claimed_at_ms = 0,
613                         claim_expires_at_ms = 0
614                     WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
615                    params![session_id, claim_id, lease_token],
616                )
617            })
618            .await
619            .map_err(sqlite_error)?;
620        Ok(())
621    }
622
623    async fn cancel_queued_work_batch(
624        &self,
625        session_id: &str,
626        batch_id: &str,
627    ) -> Result<Option<QueuedWorkBatch>, StoreError> {
628        let session_id = session_id.to_string();
629        let batch_id = batch_id.to_string();
630        self.conn
631            .write_flow(move |tx| {
632                let outcome: Result<Option<QueuedWorkBatch>, StoreError> = (|| {
633                    let now = current_epoch_ms() as i64;
634                    let row = tx
635                        .query_row(
636                            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
637                                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
638                                    claim_fencing_token
639                             FROM queued_work_batches
640                             WHERE session_id = ?1
641                               AND batch_id = ?2
642                               AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
643                            params![session_id, batch_id, now],
644                            queued_batch_row_from_sql,
645                        )
646                        .optional()
647                        .map_err(sqlite_error)?;
648                    let Some(row) = row else {
649                        return Ok(None);
650                    };
651                    let batch = queued_work_batch_from_conn(tx, row)?;
652                    tx.execute(
653                        "DELETE FROM queued_work_batches
654                         WHERE session_id = ?1
655                           AND batch_id = ?2
656                           AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
657                        params![session_id, batch_id, now],
658                    )
659                    .map_err(sqlite_error)?;
660                    Ok(Some(batch))
661                })();
662                match outcome {
663                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
664                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
665                }
666            })
667            .await
668            .map_err(sqlite_error)?
669    }
670
671    async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
672        let session_id = session_id.to_string();
673        self.conn
674            .call(move |conn| {
675                let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
676                    let rows = {
677                        let mut stmt = conn
678                            .prepare(
679                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
680                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
681                                        claim_fencing_token
682                                 FROM queued_work_batches
683                                 WHERE session_id = ?1
684                                 ORDER BY enqueue_seq ASC",
685                            )
686                            .map_err(sqlite_error)?;
687                        let rows = stmt
688                            .query_map(params![session_id], queued_batch_row_from_sql)
689                            .map_err(sqlite_error)?;
690                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
691                    };
692                    rows.into_iter()
693                        .map(|row| queued_work_batch_from_conn(conn, row))
694                        .collect()
695                })();
696                Ok(outcome)
697            })
698            .await
699            .map_err(sqlite_error)?
700    }
701
702    async fn list_pending_queued_work(
703        &self,
704        session_id: &str,
705    ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
706        let session_id = session_id.to_string();
707        self.conn
708            .call(move |conn| {
709                let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
710                    let now = current_epoch_ms();
711                    let rows = {
712                        let mut stmt = conn
713                            .prepare(
714                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
715                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
716                                        claim_fencing_token
717                                 FROM queued_work_batches
718                                 WHERE session_id = ?1
719                                   AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
720                                 ORDER BY enqueue_seq ASC",
721                            )
722                            .map_err(sqlite_error)?;
723                        let rows = stmt
724                            .query_map(
725                                params![session_id, now as i64],
726                                queued_batch_row_from_sql,
727                            )
728                            .map_err(sqlite_error)?;
729                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
730                    };
731                    rows.into_iter()
732                        .map(|row| queued_work_batch_from_conn(conn, row))
733                        .collect()
734                })();
735                Ok(outcome)
736            })
737            .await
738            .map_err(sqlite_error)?
739    }
740
741    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
742        Store::save_session_meta(self, meta).await;
743        Ok(())
744    }
745
746    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
747        Ok(Store::load_session_meta(self).await)
748    }
749
750    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
751        if ids.is_empty() {
752            return Ok(());
753        }
754        let ids = ids.to_vec();
755        self.conn
756            .write(move |tx| {
757                let mut stmt =
758                    tx.prepare("UPDATE graph_nodes SET tombstoned = 1 WHERE node_id = ?1")?;
759                for id in &ids {
760                    stmt.execute(params![id])?;
761                }
762                Ok(())
763            })
764            .await
765            .map_err(sqlite_error)
766    }
767
768    async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
769        let removed = self
770            .conn
771            .write(move |tx| tx.execute("DELETE FROM graph_nodes WHERE tombstoned = 1", []))
772            .await
773            .map_err(sqlite_error)?;
774        Ok(VacuumReport {
775            removed_node_count: removed,
776        })
777    }
778
779    async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
780        Ok(Store::gc_unreachable(self).await)
781    }
782}