Skip to main content

lash_sqlite_store/
persistence.rs

1//! The [`RuntimePersistence`] trait implementation for [`Store`].
2//!
3//! This is the tokio-rusqlite port of the prior store's `persistence.rs`. The
4//! public surface is byte-for-byte the prior store async trait: identical method
5//! names and signatures, so consumers swap backends with a path rename only.
6//!
7//! The translation rules (see `conn.rs`, `lifecycle.rs`, `blobs.rs`):
8//!
9//! * Pure reads run through `self.conn.call(move |conn| { ... })`.
10//! * Read-then-write paths run through `self.conn.write(move |tx| { ... })`
11//!   (`BEGIN IMMEDIATE`, commit on `Ok`, rollback on `Err`) — this is the
12//!   cross-process write-lock guard.
13//! * Paths that may abandon partially-applied writes (the queued-work claim)
14//!   run through `self.conn.write_flow`, deciding commit vs rollback via
15//!   [`TxOutcome`].
16//! * The shared `*_conn` helpers (`try_load_session_head_meta_from_conn`,
17//!   `Self::put_checkpoint_conn`, `Self::load_usage_deltas_conn`,
18//!   `Self::load_session_graph_from_conn`, the queued-work helpers, …) are
19//!   synchronous and take a `&rusqlite::Connection`, so they are reused from
20//!   inside these closures (a `&Transaction` derefs to `&Connection`).
21//! * Closures must be `'static` + `Send`: every borrow of `self`/caller data is
22//!   cloned into an owned value before being moved in.
23
24use super::*;
25
26#[async_trait::async_trait]
27impl RuntimePersistence for Store {
28    fn durability_tier(&self) -> DurabilityTier {
29        DurabilityTier::Durable
30    }
31
32    async fn load_session(
33        &self,
34        scope: SessionReadScope,
35    ) -> Result<Option<PersistedSessionRead>, StoreError> {
36        self.conn
37            .call(move |conn| {
38                let outcome: Result<Option<PersistedSessionRead>, StoreError> = (|| {
39                    let Some(meta) = try_load_session_head_meta_from_conn(conn)? else {
40                        return Ok(None);
41                    };
42                    let leaf_node_id = match &scope {
43                        SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
44                        SessionReadScope::ActivePath { leaf_node_id } => {
45                            leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
46                        }
47                    };
48                    let mut graph = match scope {
49                        SessionReadScope::FullGraph => {
50                            Self::load_session_graph_from_conn(conn, meta.leaf_node_id.clone())
51                        }
52                        SessionReadScope::ActivePath { .. } => {
53                            Self::load_active_path_session_graph_from_conn(
54                                conn,
55                                leaf_node_id.clone(),
56                            )
57                            .map_err(sqlite_error)?
58                        }
59                    };
60                    graph.set_leaf_node_id(leaf_node_id);
61                    let checkpoint = meta
62                        .checkpoint_ref
63                        .as_ref()
64                        .and_then(|blob_ref| Self::get_checkpoint_conn(conn, blob_ref));
65                    Ok(Some(PersistedSessionRead {
66                        session_id: meta.session_id,
67                        head_revision: meta.head_revision,
68                        config: meta.config,
69                        agent_frames: meta.agent_frames,
70                        current_agent_frame_id: meta.current_agent_frame_id,
71                        graph,
72                        checkpoint_ref: meta.checkpoint_ref,
73                        checkpoint,
74                        token_ledger: merge_token_ledger_entries(Self::load_usage_deltas_conn(
75                            conn,
76                        )),
77                    }))
78                })(
79                );
80                Ok(outcome)
81            })
82            .await
83            .map_err(sqlite_error)?
84    }
85
86    async fn load_node(
87        &self,
88        node_id: &str,
89    ) -> Result<Option<lash_core::SessionNodeRecord>, StoreError> {
90        let node_id = node_id.to_string();
91        let row: Option<String> = self
92            .conn
93            .call(move |conn| {
94                conn.query_row(
95                    "SELECT node_json FROM graph_nodes WHERE node_id = ?1 AND tombstoned = 0",
96                    params![node_id],
97                    |row| row.get(0),
98                )
99                .optional()
100            })
101            .await
102            .map_err(sqlite_error)?;
103        Ok(row.and_then(|json| serde_json::from_str(&json).ok()))
104    }
105
106    async fn commit_runtime_state(
107        &self,
108        commit: RuntimeCommit,
109    ) -> Result<RuntimeCommitResult, StoreError> {
110        let blob_profile = self.options.blob_profile;
111        let result = self
112            .conn
113            .write_flow(move |tx| {
114                let outcome: Result<RuntimeCommitResult, StoreError> = (|| {
115                    let existing = try_load_session_head_meta_from_conn(tx)?;
116                    if let Some(bound_session_id) =
117                        existing.as_ref().map(|meta| meta.session_id.as_str())
118                        && bound_session_id != commit.session_id
119                    {
120                        return Err(StoreError::SessionBindingMismatch {
121                            bound_session_id: bound_session_id.to_string(),
122                            attempted_session_id: commit.session_id.clone(),
123                        });
124                    }
125                    if let Some(completed) = &commit.turn_commit {
126                        if completed.session_id != commit.session_id {
127                            return Err(StoreError::RuntimeTurnCommitConflict {
128                                session_id: completed.session_id.clone(),
129                                turn_id: completed.turn_id.clone(),
130                            });
131                        }
132                        let prior: Option<(String, String)> = tx
133                            .query_row(
134                                "SELECT turn_commit_hash, result_json FROM runtime_turn_commits
135                                 WHERE session_id = ?1 AND turn_id = ?2",
136                                params![completed.session_id, completed.turn_id],
137                                |row| Ok((row.get(0)?, row.get(1)?)),
138                            )
139                            .optional()
140                            .map_err(sqlite_error)?;
141                        if let Some((turn_commit_hash, result_json)) = prior {
142                            if turn_commit_hash == completed.turn_commit_hash {
143                                let result: RuntimeCommitResult =
144                                    serde_json::from_str(&result_json).map_err(|err| {
145                                        StoreError::Backend(format!(
146                                            "failed to decode runtime turn commit result: {err}"
147                                        ))
148                                    })?;
149                                return Ok(result);
150                            }
151                            return Err(StoreError::RuntimeTurnCommitConflict {
152                                session_id: completed.session_id.clone(),
153                                turn_id: completed.turn_id.clone(),
154                            });
155                        }
156                    }
157                    let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
158                    if commit.expected_head_revision.is_some()
159                        && commit.expected_head_revision != Some(actual_revision)
160                    {
161                        return Err(StoreError::HeadRevisionConflict {
162                            expected: commit.expected_head_revision,
163                            actual: actual_revision,
164                        });
165                    }
166                    for completed in &commit.completed_queue_claims {
167                        if completed.session_id != commit.session_id {
168                            return Err(StoreError::QueuedWorkClaimExpired {
169                                session_id: completed.session_id.clone(),
170                                claim_id: completed.claim_id.clone(),
171                            });
172                        }
173                        ensure_queued_work_completion_conn(tx, completed)?;
174                    }
175
176                    let stored_checkpoint =
177                        Self::put_checkpoint_conn(tx, &commit.checkpoint, blob_profile)
178                            .map_err(sqlite_error)?;
179
180                    if !commit.usage_deltas.is_empty() {
181                        let mut stmt = tx
182                            .prepare(
183                                "INSERT INTO usage_deltas (
184                                    source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
185                                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
186                            )
187                            .map_err(sqlite_error)?;
188                        for entry in &commit.usage_deltas {
189                            stmt.execute(params![
190                                entry.source,
191                                entry.model,
192                                entry.usage.input_tokens,
193                                entry.usage.output_tokens,
194                                entry.usage.cached_input_tokens,
195                                entry.usage.reasoning_tokens,
196                            ])
197                            .map_err(sqlite_error)?;
198                        }
199                    }
200
201                    let leaf_node_id = match &commit.graph {
202                        GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
203                        GraphCommitDelta::Append {
204                            nodes,
205                            leaf_node_id,
206                        } => {
207                            for node in nodes {
208                                let node_json = encode_json(node);
209                                tx.execute(
210                                    "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
211                                    params![node.node_id, node_json],
212                                )
213                                .map_err(sqlite_error)?;
214                            }
215                            leaf_node_id.clone()
216                        }
217                        GraphCommitDelta::ReplaceFull(graph) => {
218                            tx.execute("DELETE FROM graph_nodes", [])
219                                .map_err(sqlite_error)?;
220                            for node in &graph.nodes {
221                                let node_json = encode_json(node);
222                                tx.execute(
223                                    "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
224                                    params![node.node_id, node_json],
225                                )
226                                .map_err(sqlite_error)?;
227                            }
228                            graph.leaf_node_id.clone()
229                        }
230                    };
231                    let graph_node_count: usize = tx
232                        .query_row(
233                            "SELECT COUNT(*) FROM graph_nodes WHERE tombstoned = 0",
234                            [],
235                            |row| row.get::<_, i64>(0),
236                        )
237                        .map_err(sqlite_error)? as usize;
238                    let next_revision = actual_revision + 1;
239                    let meta = SessionHeadMeta {
240                        session_id: commit.session_id.clone(),
241                        head_revision: next_revision,
242                        config: commit.config.clone(),
243                        agent_frames: commit.agent_frames.clone(),
244                        current_agent_frame_id: commit.current_agent_frame_id.clone(),
245                        checkpoint_ref: Some(stored_checkpoint.checkpoint_ref.clone()),
246                        leaf_node_id,
247                        graph_node_count,
248                        token_ledger: Vec::new(),
249                    };
250                    tx.execute(
251                        "INSERT OR REPLACE INTO session_head (singleton, session_id, head_json, head_revision)
252                         VALUES (1, ?1, ?2, ?3)",
253                        params![
254                            meta.session_id,
255                            encode_json(&meta),
256                            meta.head_revision as i64
257                        ],
258                    )
259                    .map_err(sqlite_error)?;
260                    for completed in &commit.completed_queue_claims {
261                        for batch_id in &completed.batch_ids {
262                            tx.execute(
263                                "DELETE FROM queued_work_batches
264                                 WHERE session_id = ?1
265                                   AND batch_id = ?2
266                                   AND claim_id = ?3
267                                   AND claim_token = ?4",
268                                params![
269                                    completed.session_id,
270                                    batch_id,
271                                    completed.claim_id,
272                                    completed.lease_token
273                                ],
274                            )
275                            .map_err(sqlite_error)?;
276                        }
277                    }
278                    if !commit.committed_attachment_ids.is_empty() {
279                        let now = current_epoch_ms() as i64;
280                        let mut stmt = tx
281                            .prepare(
282                                "UPDATE attachment_manifest
283                                 SET committed_at_ms = COALESCE(committed_at_ms, ?1)
284                                 WHERE attachment_id = ?2 AND session_id = ?3",
285                            )
286                            .map_err(sqlite_error)?;
287                        for id in &commit.committed_attachment_ids {
288                            stmt.execute(params![now, id.as_str(), commit.session_id])
289                                .map_err(sqlite_error)?;
290                        }
291                    }
292                    let result = RuntimeCommitResult {
293                        head_revision: next_revision,
294                        checkpoint_ref: stored_checkpoint.checkpoint_ref,
295                        manifest: stored_checkpoint.manifest,
296                    };
297                    if let Some(completed) = &commit.turn_commit {
298                        tx.execute(
299                            "INSERT INTO runtime_turn_commits (
300                                session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
301                             )
302                             VALUES (?1, ?2, ?3, ?4, ?5)",
303                            params![
304                                completed.session_id,
305                                completed.turn_id,
306                                completed.turn_commit_hash,
307                                encode_json(&result),
308                                current_epoch_ms() as i64
309                            ],
310                        )
311                        .map_err(sqlite_error)?;
312                    }
313                    Ok(result)
314                })();
315                // Roll back on a `StoreError` so a failure after the first
316                // write (e.g. a head-revision conflict surfaced mid-commit, or a
317                // backend write error) does not leave the partial transaction
318                // committed, while still carrying the typed error to the caller.
319                match outcome {
320                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
321                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
322                }
323            })
324            .await
325            .map_err(sqlite_error)??;
326        self.maybe_auto_gc().await;
327        Ok(result)
328    }
329
330    async fn enqueue_queued_work(
331        &self,
332        batch: QueuedWorkBatchDraft,
333    ) -> Result<QueuedWorkBatch, StoreError> {
334        let nonce = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed);
335        self.conn
336            .write_flow(move |tx| {
337                let outcome: Result<QueuedWorkBatch, StoreError> = (|| {
338                    if let Some(source_key) = batch.source_key.as_deref() {
339                        let existing_id: Option<String> = tx
340                            .query_row(
341                                "SELECT batch_id
342                                 FROM queued_work_batches
343                                 WHERE session_id = ?1 AND source_key = ?2",
344                                params![batch.session_id, source_key],
345                                |row| row.get(0),
346                            )
347                            .optional()
348                            .map_err(sqlite_error)?;
349                        if let Some(batch_id) = existing_id {
350                            let existing = load_queued_batch_by_id_conn(tx, &batch_id)?
351                                .ok_or_else(|| {
352                                    StoreError::Backend(
353                                        "queued work source row disappeared".to_string(),
354                                    )
355                                })?;
356                            return Ok(existing);
357                        }
358                    }
359                    let now = current_epoch_ms();
360                    let batch_id =
361                        derive_batch_id(&batch.session_id, batch.source_key.as_deref(), now, Some(nonce));
362                    tx.execute(
363                        "INSERT INTO queued_work_batches (
364                            batch_id, session_id, source_key, delivery_policy, slot_policy,
365                            merge_key_json, available_at_ms, enqueued_at_ms
366                         )
367                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
368                        params![
369                            batch_id,
370                            batch.session_id,
371                            batch.source_key.as_deref(),
372                            batch.delivery_policy.as_str(),
373                            batch.slot_policy.as_str(),
374                            encode_json(&batch.merge_key),
375                            batch.available_at_ms as i64,
376                            now as i64,
377                        ],
378                    )
379                    .map_err(sqlite_error)?;
380                    for (index, payload) in batch.payloads.iter().enumerate() {
381                        let item_id = format!("{batch_id}:item:{index}");
382                        tx.execute(
383                            "INSERT INTO queued_work_items (batch_id, item_index, item_id, payload_json)
384                             VALUES (?1, ?2, ?3, ?4)",
385                            params![batch_id, index as i64, item_id, encode_json(payload)],
386                        )
387                        .map_err(sqlite_error)?;
388                    }
389                    load_queued_batch_by_id_conn(tx, &batch_id)?.ok_or_else(|| {
390                        StoreError::Backend("queued work insert disappeared".to_string())
391                    })
392                })();
393                // Roll back the partially-inserted batch/items on a
394                // `StoreError` while still returning the typed error.
395                match outcome {
396                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
397                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
398                }
399            })
400            .await
401            .map_err(sqlite_error)?
402    }
403
404    async fn claim_ready_queued_work(
405        &self,
406        session_id: &str,
407        owner_id: &str,
408        boundary: QueuedWorkClaimBoundary,
409        lease_ttl_ms: u64,
410        max_batches: usize,
411    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
412        if max_batches == 0 {
413            return Ok(None);
414        }
415        let session_id = session_id.to_string();
416        let owner_id = owner_id.to_string();
417        self.conn
418            .write_flow(move |tx| {
419                let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
420                    let now = current_epoch_ms();
421                    let candidate_rows = {
422                        let mut stmt = tx
423                            .prepare(
424                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
425                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
426                                        claim_fencing_token
427                                 FROM queued_work_batches
428                                 WHERE session_id = ?1
429                                   AND available_at_ms <= ?2
430                                   AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
431                                 ORDER BY enqueue_seq ASC
432                                 LIMIT ?3",
433                            )
434                            .map_err(sqlite_error)?;
435                        let rows = stmt
436                            .query_map(
437                                params![session_id, now as i64, claim_scan_limit(max_batches)],
438                                queued_batch_row_from_sql,
439                            )
440                            .map_err(sqlite_error)?;
441                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
442                    };
443                    let candidates = candidate_rows
444                        .iter()
445                        .map(|row| {
446                            Ok(ClaimCandidate {
447                                enqueue_seq: row.enqueue_seq,
448                                claim_fencing_token: row.claim_fencing_token,
449                                delivery_policy: decode_delivery_policy(
450                                    row.delivery_policy.clone(),
451                                )?,
452                                slot_policy: decode_slot_policy(row.slot_policy.clone())?,
453                                merge_key: decode_merge_key(row.merge_key_json.clone())?,
454                            })
455                        })
456                        .collect::<Result<Vec<_>, StoreError>>()?;
457                    let selected_len = select_claim_prefix(&candidates, boundary, max_batches);
458                    if selected_len == 0 {
459                        return Ok(TxOutcome::Commit(None));
460                    }
461                    let mut selected = candidate_rows;
462                    selected.truncate(selected_len);
463                    let lease = QueuedWorkClaimLease::derive(
464                        &candidates[0],
465                        &session_id,
466                        &owner_id,
467                        now,
468                        lease_ttl_ms,
469                    );
470                    for row in &selected {
471                        // Under `BEGIN IMMEDIATE` this connection already holds
472                        // the write lock, but the row could still have been
473                        // claimed by an earlier committed writer (its
474                        // `claim_token` set and not yet expired). The `WHERE`
475                        // clause filters those out, so a 0-row update means we
476                        // lost the race for this batch: treat the whole claim as
477                        // not-won rather than returning a claim that doesn't
478                        // actually own the row.
479                        let claimed = tx
480                            .execute(
481                                "UPDATE queued_work_batches
482                                 SET claim_id = ?3,
483                                     claim_owner_id = ?4,
484                                     claim_token = ?5,
485                                     claim_fencing_token = claim_fencing_token + 1,
486                                     claim_claimed_at_ms = ?6,
487                                     claim_expires_at_ms = ?7
488                                 WHERE session_id = ?1
489                                   AND batch_id = ?2
490                                   AND (claim_token IS NULL OR claim_expires_at_ms <= ?6)",
491                                params![
492                                    session_id,
493                                    row.batch_id,
494                                    lease.claim_id,
495                                    owner_id,
496                                    lease.lease_token,
497                                    now as i64,
498                                    lease.expires_at_epoch_ms as i64
499                                ],
500                            )
501                            .map_err(sqlite_error)?;
502                        if claimed == 0 {
503                            // Lost the race for this batch. Roll back any sibling
504                            // rows we already claimed in this transaction so we
505                            // never return a half-owned claim.
506                            return Ok(TxOutcome::Rollback(None));
507                        }
508                    }
509                    let mut batches = Vec::new();
510                    for row in selected {
511                        batches.push(queued_work_batch_from_conn(tx, row)?);
512                    }
513                    Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
514                        session_id: session_id.clone(),
515                        claim_id: lease.claim_id,
516                        owner_id: owner_id.clone(),
517                        lease_token: lease.lease_token,
518                        fencing_token: lease.fencing_token,
519                        claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
520                        expires_at_epoch_ms: lease.expires_at_epoch_ms,
521                        batches,
522                    })))
523                })();
524                // Lower a `StoreError` into the rollback arm so the closure body
525                // can keep using `?` while still propagating the error to the
526                // caller. Encode it as a `Result` carried out of the flow.
527                match outcome {
528                    Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
529                    Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
530                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
531                }
532            })
533            .await
534            .map_err(sqlite_error)?
535    }
536
537    async fn renew_queued_work_claim(
538        &self,
539        claim: &QueuedWorkClaim,
540        lease_ttl_ms: u64,
541    ) -> Result<QueuedWorkClaim, StoreError> {
542        let now = current_epoch_ms();
543        let expires_at = now.saturating_add(lease_ttl_ms);
544        let session_id = claim.session_id.clone();
545        let claim_id = claim.claim_id.clone();
546        let lease_token = claim.lease_token.clone();
547        let changed = self
548            .conn
549            .write(move |tx| {
550                tx.execute(
551                    "UPDATE queued_work_batches
552                     SET claim_expires_at_ms = ?4
553                     WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
554                    params![session_id, claim_id, lease_token, expires_at as i64],
555                )
556            })
557            .await
558            .map_err(sqlite_error)?;
559        renewed_claim(claim, changed, expires_at)
560    }
561
562    async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
563        let session_id = claim.session_id.clone();
564        let claim_id = claim.claim_id.clone();
565        let lease_token = claim.lease_token.clone();
566        self.conn
567            .write(move |tx| {
568                tx.execute(
569                    "UPDATE queued_work_batches
570                     SET claim_id = NULL,
571                         claim_owner_id = NULL,
572                         claim_token = NULL,
573                         claim_claimed_at_ms = 0,
574                         claim_expires_at_ms = 0
575                     WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
576                    params![session_id, claim_id, lease_token],
577                )
578            })
579            .await
580            .map_err(sqlite_error)?;
581        Ok(())
582    }
583
584    async fn cancel_queued_work_batch(
585        &self,
586        session_id: &str,
587        batch_id: &str,
588    ) -> Result<Option<QueuedWorkBatch>, StoreError> {
589        let session_id = session_id.to_string();
590        let batch_id = batch_id.to_string();
591        self.conn
592            .write_flow(move |tx| {
593                let outcome: Result<Option<QueuedWorkBatch>, StoreError> = (|| {
594                    let now = current_epoch_ms() as i64;
595                    let row = tx
596                        .query_row(
597                            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
598                                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
599                                    claim_fencing_token
600                             FROM queued_work_batches
601                             WHERE session_id = ?1
602                               AND batch_id = ?2
603                               AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
604                            params![session_id, batch_id, now],
605                            queued_batch_row_from_sql,
606                        )
607                        .optional()
608                        .map_err(sqlite_error)?;
609                    let Some(row) = row else {
610                        return Ok(None);
611                    };
612                    let batch = queued_work_batch_from_conn(tx, row)?;
613                    tx.execute(
614                        "DELETE FROM queued_work_batches
615                         WHERE session_id = ?1
616                           AND batch_id = ?2
617                           AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
618                        params![session_id, batch_id, now],
619                    )
620                    .map_err(sqlite_error)?;
621                    Ok(Some(batch))
622                })();
623                match outcome {
624                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
625                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
626                }
627            })
628            .await
629            .map_err(sqlite_error)?
630    }
631
632    async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
633        let session_id = session_id.to_string();
634        self.conn
635            .call(move |conn| {
636                let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
637                    let rows = {
638                        let mut stmt = conn
639                            .prepare(
640                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
641                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
642                                        claim_fencing_token
643                                 FROM queued_work_batches
644                                 WHERE session_id = ?1
645                                 ORDER BY enqueue_seq ASC",
646                            )
647                            .map_err(sqlite_error)?;
648                        let rows = stmt
649                            .query_map(params![session_id], queued_batch_row_from_sql)
650                            .map_err(sqlite_error)?;
651                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
652                    };
653                    rows.into_iter()
654                        .map(|row| queued_work_batch_from_conn(conn, row))
655                        .collect()
656                })();
657                Ok(outcome)
658            })
659            .await
660            .map_err(sqlite_error)?
661    }
662
663    async fn list_pending_queued_work(
664        &self,
665        session_id: &str,
666    ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
667        let session_id = session_id.to_string();
668        self.conn
669            .call(move |conn| {
670                let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
671                    let now = current_epoch_ms();
672                    let rows = {
673                        let mut stmt = conn
674                            .prepare(
675                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
676                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
677                                        claim_fencing_token
678                                 FROM queued_work_batches
679                                 WHERE session_id = ?1
680                                   AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
681                                 ORDER BY enqueue_seq ASC",
682                            )
683                            .map_err(sqlite_error)?;
684                        let rows = stmt
685                            .query_map(
686                                params![session_id, now as i64],
687                                queued_batch_row_from_sql,
688                            )
689                            .map_err(sqlite_error)?;
690                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
691                    };
692                    rows.into_iter()
693                        .map(|row| queued_work_batch_from_conn(conn, row))
694                        .collect()
695                })();
696                Ok(outcome)
697            })
698            .await
699            .map_err(sqlite_error)?
700    }
701
702    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
703        Store::save_session_meta(self, meta).await;
704        Ok(())
705    }
706
707    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
708        Ok(Store::load_session_meta(self).await)
709    }
710
711    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
712        if ids.is_empty() {
713            return Ok(());
714        }
715        let ids = ids.to_vec();
716        self.conn
717            .write(move |tx| {
718                let mut stmt =
719                    tx.prepare("UPDATE graph_nodes SET tombstoned = 1 WHERE node_id = ?1")?;
720                for id in &ids {
721                    stmt.execute(params![id])?;
722                }
723                Ok(())
724            })
725            .await
726            .map_err(sqlite_error)
727    }
728
729    async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
730        let removed = self
731            .conn
732            .write(move |tx| tx.execute("DELETE FROM graph_nodes WHERE tombstoned = 1", []))
733            .await
734            .map_err(sqlite_error)?;
735        Ok(VacuumReport {
736            removed_node_count: removed,
737        })
738    }
739
740    async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
741        Ok(Store::gc_unreachable(self).await)
742    }
743}