Skip to main content

lash_sqlite_store/
persistence.rs

1//! The [`RuntimePersistence`] trait implementation for [`Store`].
2//!
3//! This is the tokio-rusqlite port of the prior store's `persistence.rs`. The
4//! public surface is byte-for-byte the prior store async trait: identical method
5//! names and signatures, so consumers swap backends with a path rename only.
6//!
7//! The translation rules (see `conn.rs`, `lifecycle.rs`, `blobs.rs`):
8//!
9//! * Pure reads run through `self.conn.call(move |conn| { ... })`.
10//! * Read-then-write paths run through `self.conn.write(move |tx| { ... })`
11//!   (`BEGIN IMMEDIATE`, commit on `Ok`, rollback on `Err`) — this is the
12//!   cross-process write-lock guard.
13//! * Paths that may abandon partially-applied writes (the queued-work claim)
14//!   run through `self.conn.write_flow`, deciding commit vs rollback via
15//!   [`TxOutcome`].
16//! * The shared `*_conn` helpers (`try_load_session_head_meta_from_conn`,
17//!   `Self::put_checkpoint_conn`, `Self::load_usage_deltas_conn`,
18//!   `Self::load_session_graph_from_conn`, the queued-work helpers, …) are
19//!   synchronous and take a `&rusqlite::Connection`, so they are reused from
20//!   inside these closures (a `&Transaction` derefs to `&Connection`).
21//! * Closures must be `'static` + `Send`: every borrow of `self`/caller data is
22//!   cloned into an owned value before being moved in.
23
24use super::*;
25
26#[async_trait::async_trait]
27impl RuntimePersistence for Store {
28    fn durability_tier(&self) -> DurabilityTier {
29        DurabilityTier::Durable
30    }
31
32    async fn load_session(
33        &self,
34        scope: SessionReadScope,
35    ) -> Result<Option<PersistedSessionRead>, StoreError> {
36        self.conn
37            .call(move |conn| {
38                let outcome: Result<Option<PersistedSessionRead>, StoreError> = (|| {
39                    let Some(meta) = try_load_session_head_meta_from_conn(conn)? else {
40                        return Ok(None);
41                    };
42                    let leaf_node_id = match &scope {
43                        SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
44                        SessionReadScope::ActivePath { leaf_node_id } => {
45                            leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
46                        }
47                    };
48                    let mut graph = match scope {
49                        SessionReadScope::FullGraph => {
50                            Self::load_session_graph_from_conn(conn, meta.leaf_node_id.clone())
51                        }
52                        SessionReadScope::ActivePath { .. } => {
53                            Self::load_active_path_session_graph_from_conn(
54                                conn,
55                                leaf_node_id.clone(),
56                            )
57                            .map_err(sqlite_error)?
58                        }
59                    };
60                    graph.set_leaf_node_id(leaf_node_id);
61                    let checkpoint = meta
62                        .checkpoint_ref
63                        .as_ref()
64                        .and_then(|blob_ref| Self::get_checkpoint_conn(conn, blob_ref));
65                    Ok(Some(PersistedSessionRead {
66                        session_id: meta.session_id,
67                        head_revision: meta.head_revision,
68                        config: meta.config,
69                        agent_frames: meta.agent_frames,
70                        current_agent_frame_id: meta.current_agent_frame_id,
71                        graph,
72                        checkpoint_ref: meta.checkpoint_ref,
73                        checkpoint,
74                        token_ledger: merge_token_ledger_entries(Self::load_usage_deltas_conn(
75                            conn,
76                        )),
77                    }))
78                })(
79                );
80                Ok(outcome)
81            })
82            .await
83            .map_err(sqlite_error)?
84    }
85
86    async fn load_node(
87        &self,
88        node_id: &str,
89    ) -> Result<Option<lash_core::SessionNodeRecord>, StoreError> {
90        let node_id = node_id.to_string();
91        let row: Option<String> = self
92            .conn
93            .call(move |conn| {
94                conn.query_row(
95                    "SELECT node_json FROM graph_nodes WHERE node_id = ?1 AND tombstoned = 0",
96                    params![node_id],
97                    |row| row.get(0),
98                )
99                .optional()
100            })
101            .await
102            .map_err(sqlite_error)?;
103        Ok(row.and_then(|json| serde_json::from_str(&json).ok()))
104    }
105
106    async fn commit_runtime_state(
107        &self,
108        commit: RuntimeCommit,
109    ) -> Result<RuntimeCommitResult, StoreError> {
110        let blob_profile = self.options.blob_profile;
111        let result = self
112            .conn
113            .write_flow(move |tx| {
114                let outcome: Result<RuntimeCommitResult, StoreError> = (|| {
115                    let existing = try_load_session_head_meta_from_conn(tx)?;
116                    if let Some(bound_session_id) =
117                        existing.as_ref().map(|meta| meta.session_id.as_str())
118                        && bound_session_id != commit.session_id
119                    {
120                        return Err(StoreError::SessionBindingMismatch {
121                            bound_session_id: bound_session_id.to_string(),
122                            attempted_session_id: commit.session_id.clone(),
123                        });
124                    }
125                    if let Some(completed) = &commit.turn_commit {
126                        if completed.session_id != commit.session_id {
127                            return Err(StoreError::RuntimeTurnCommitConflict {
128                                session_id: completed.session_id.clone(),
129                                turn_id: completed.turn_id.clone(),
130                            });
131                        }
132                        let prior: Option<(String, String)> = tx
133                            .query_row(
134                                "SELECT turn_commit_hash, result_json FROM runtime_turn_commits
135                                 WHERE session_id = ?1 AND turn_id = ?2",
136                                params![completed.session_id, completed.turn_id],
137                                |row| Ok((row.get(0)?, row.get(1)?)),
138                            )
139                            .optional()
140                            .map_err(sqlite_error)?;
141                        if let Some((turn_commit_hash, result_json)) = prior {
142                            if turn_commit_hash == completed.turn_commit_hash {
143                                let result: RuntimeCommitResult =
144                                    serde_json::from_str(&result_json).map_err(|err| {
145                                        StoreError::Backend(format!(
146                                            "failed to decode runtime turn commit result: {err}"
147                                        ))
148                                    })?;
149                                if let Some(completion) =
150                                    commit.release_session_execution_lease.as_ref()
151                                {
152                                    release_session_execution_lease_conn(tx, completion)?;
153                                }
154                                return Ok(result);
155                            }
156                            return Err(StoreError::RuntimeTurnCommitConflict {
157                                session_id: completed.session_id.clone(),
158                                turn_id: completed.turn_id.clone(),
159                            });
160                        }
161                    }
162                    let Some(session_execution_lease) = commit.session_execution_lease.as_ref()
163                    else {
164                        return Err(StoreError::SessionExecutionLeaseExpired {
165                            session_id: commit.session_id.clone(),
166                        });
167                    };
168                    ensure_session_execution_lease_conn(
169                        tx,
170                        &commit.session_id,
171                        session_execution_lease,
172                    )?;
173                    let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
174                    if commit.expected_head_revision.is_some()
175                        && commit.expected_head_revision != Some(actual_revision)
176                    {
177                        return Err(StoreError::HeadRevisionConflict {
178                            expected: commit.expected_head_revision,
179                            actual: actual_revision,
180                        });
181                    }
182                    for completed in &commit.completed_queue_claims {
183                        if completed.session_id != commit.session_id {
184                            return Err(StoreError::QueuedWorkClaimExpired {
185                                session_id: completed.session_id.clone(),
186                                claim_id: completed.claim_id.clone(),
187                            });
188                        }
189                        ensure_queued_work_completion_conn(tx, completed)?;
190                    }
191
192                    let stored_checkpoint =
193                        Self::put_checkpoint_conn(tx, &commit.checkpoint, blob_profile)
194                            .map_err(sqlite_error)?;
195
196                    if !commit.usage_deltas.is_empty() {
197                        let mut stmt = tx
198                            .prepare(
199                                "INSERT INTO usage_deltas (
200                                    source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
201                                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
202                            )
203                            .map_err(sqlite_error)?;
204                        for entry in &commit.usage_deltas {
205                            stmt.execute(params![
206                                entry.source,
207                                entry.model,
208                                entry.usage.input_tokens,
209                                entry.usage.output_tokens,
210                                entry.usage.cached_input_tokens,
211                                entry.usage.reasoning_tokens,
212                            ])
213                            .map_err(sqlite_error)?;
214                        }
215                    }
216
217                    let leaf_node_id = match &commit.graph {
218                        GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
219                        GraphCommitDelta::Append {
220                            nodes,
221                            leaf_node_id,
222                        } => {
223                            for node in nodes {
224                                let node_json = encode_json(node);
225                                tx.execute(
226                                    "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
227                                    params![node.node_id, node_json],
228                                )
229                                .map_err(sqlite_error)?;
230                            }
231                            leaf_node_id.clone()
232                        }
233                        GraphCommitDelta::ReplaceFull(graph) => {
234                            tx.execute("DELETE FROM graph_nodes", [])
235                                .map_err(sqlite_error)?;
236                            for node in &graph.nodes {
237                                let node_json = encode_json(node);
238                                tx.execute(
239                                    "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
240                                    params![node.node_id, node_json],
241                                )
242                                .map_err(sqlite_error)?;
243                            }
244                            graph.leaf_node_id.clone()
245                        }
246                    };
247                    let graph_node_count: usize = tx
248                        .query_row(
249                            "SELECT COUNT(*) FROM graph_nodes WHERE tombstoned = 0",
250                            [],
251                            |row| row.get::<_, i64>(0),
252                        )
253                        .map_err(sqlite_error)? as usize;
254                    let next_revision = actual_revision + 1;
255                    let meta = SessionHeadMeta {
256                        session_id: commit.session_id.clone(),
257                        head_revision: next_revision,
258                        config: commit.config.clone(),
259                        agent_frames: commit.agent_frames.clone(),
260                        current_agent_frame_id: commit.current_agent_frame_id.clone(),
261                        checkpoint_ref: Some(stored_checkpoint.checkpoint_ref.clone()),
262                        leaf_node_id,
263                        graph_node_count,
264                        token_ledger: Vec::new(),
265                    };
266                    tx.execute(
267                        "INSERT OR REPLACE INTO session_head (singleton, session_id, head_json, head_revision)
268                         VALUES (1, ?1, ?2, ?3)",
269                        params![
270                            meta.session_id,
271                            encode_json(&meta),
272                            meta.head_revision as i64
273                        ],
274                    )
275                    .map_err(sqlite_error)?;
276                    for completed in &commit.completed_queue_claims {
277                        for batch_id in &completed.batch_ids {
278                            tx.execute(
279                                "DELETE FROM queued_work_batches
280                                 WHERE session_id = ?1
281                                   AND batch_id = ?2
282                                   AND claim_id = ?3
283                                   AND claim_token = ?4",
284                                params![
285                                    completed.session_id,
286                                    batch_id,
287                                    completed.claim_id,
288                                    completed.lease_token
289                                ],
290                            )
291                            .map_err(sqlite_error)?;
292                        }
293                    }
294                    if !commit.committed_attachment_ids.is_empty() {
295                        let now = current_epoch_ms() as i64;
296                        let mut stmt = tx
297                            .prepare(
298                                "UPDATE attachment_manifest
299                                 SET committed_at_ms = COALESCE(committed_at_ms, ?1)
300                                 WHERE attachment_id = ?2 AND session_id = ?3",
301                            )
302                            .map_err(sqlite_error)?;
303                        for id in &commit.committed_attachment_ids {
304                            stmt.execute(params![now, id.as_str(), commit.session_id])
305                                .map_err(sqlite_error)?;
306                        }
307                    }
308                    let result = RuntimeCommitResult {
309                        head_revision: next_revision,
310                        checkpoint_ref: stored_checkpoint.checkpoint_ref,
311                        manifest: stored_checkpoint.manifest,
312                    };
313                    if let Some(completed) = &commit.turn_commit {
314                        tx.execute(
315                            "INSERT INTO runtime_turn_commits (
316                                session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
317                             )
318                             VALUES (?1, ?2, ?3, ?4, ?5)",
319                            params![
320                                completed.session_id,
321                                completed.turn_id,
322                                completed.turn_commit_hash,
323                                encode_json(&result),
324                                current_epoch_ms() as i64
325                            ],
326                        )
327                        .map_err(sqlite_error)?;
328                    }
329                    if let Some(completion) = commit.release_session_execution_lease.as_ref() {
330                        release_session_execution_lease_conn(tx, completion)?;
331                    }
332                    Ok(result)
333                })();
334                // Roll back on a `StoreError` so a failure after the first
335                // write (e.g. a head-revision conflict surfaced mid-commit, or a
336                // backend write error) does not leave the partial transaction
337                // committed, while still carrying the typed error to the caller.
338                match outcome {
339                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
340                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
341                }
342            })
343            .await
344            .map_err(sqlite_error)??;
345        self.maybe_auto_gc().await;
346        Ok(result)
347    }
348
349    async fn try_claim_session_execution_lease(
350        &self,
351        session_id: &str,
352        owner: &LeaseOwnerIdentity,
353        lease_ttl_ms: u64,
354    ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError> {
355        let session_id = session_id.to_string();
356        let owner = owner.clone();
357        self.conn
358            .write_flow(move |tx| {
359                let outcome: Result<SessionExecutionLeaseClaimOutcome, StoreError> = (|| {
360                    let now = current_epoch_ms();
361                    let current = load_session_execution_lease_row_conn(tx, &session_id)?;
362                    if current.as_ref().is_some_and(|lease| {
363                        lease.lease_token.is_some() && lease.expires_at_ms > now
364                    }) {
365                        let current = current.expect("checked current lease is present");
366                        if current
367                            .owner
368                            .as_ref()
369                            .is_some_and(|current_owner| current_owner.same_incarnation(&owner))
370                        {
371                            let expires_at = now.saturating_add(lease_ttl_ms);
372                            tx.execute(
373                                "UPDATE session_execution_leases
374                                 SET lease_expires_at_ms = ?2
375                                 WHERE session_id = ?1",
376                                params![session_id, expires_at as i64],
377                            )
378                            .map_err(sqlite_error)?;
379                            return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
380                                SessionExecutionLease {
381                                    session_id,
382                                    owner,
383                                    lease_token: current.lease_token.expect("live lease token set"),
384                                    fencing_token: current.fencing_token,
385                                    claimed_at_epoch_ms: current.claimed_at_ms,
386                                    expires_at_epoch_ms: expires_at,
387                                },
388                            ));
389                        }
390                        return Ok(SessionExecutionLeaseClaimOutcome::Busy {
391                            holder: row_to_session_execution_lease(&session_id, current)?,
392                        });
393                    }
394                    Ok(SessionExecutionLeaseClaimOutcome::Acquired(
395                        acquire_session_execution_lease_conn(
396                            tx,
397                            &session_id,
398                            &owner,
399                            current.as_ref().map_or(0, |lease| lease.fencing_token),
400                            now,
401                            lease_ttl_ms,
402                        )?,
403                    ))
404                })(
405                );
406                match outcome {
407                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
408                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
409                }
410            })
411            .await
412            .map_err(sqlite_error)?
413    }
414
415    async fn reclaim_session_execution_lease(
416        &self,
417        session_id: &str,
418        owner: &LeaseOwnerIdentity,
419        observed_holder: &SessionExecutionLeaseFence,
420        lease_ttl_ms: u64,
421    ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError> {
422        let session_id = session_id.to_string();
423        let owner = owner.clone();
424        let observed_holder = observed_holder.clone();
425        self.conn
426            .write_flow(move |tx| {
427                let outcome: Result<SessionExecutionLeaseClaimOutcome, StoreError> = (|| {
428                    let now = current_epoch_ms();
429                    let current = load_session_execution_lease_row_conn(tx, &session_id)?;
430                    let Some(current) = current else {
431                        return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
432                            acquire_session_execution_lease_conn(
433                                tx,
434                                &session_id,
435                                &owner,
436                                0,
437                                now,
438                                lease_ttl_ms,
439                            )?,
440                        ));
441                    };
442                    if current.lease_token.is_none() || current.expires_at_ms <= now {
443                        return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
444                            acquire_session_execution_lease_conn(
445                                tx,
446                                &session_id,
447                                &owner,
448                                current.fencing_token,
449                                now,
450                                lease_ttl_ms,
451                            )?,
452                        ));
453                    }
454                    let holder = row_to_session_execution_lease(&session_id, current)?;
455                    if observed_holder.session_id == session_id
456                        && holder.owner.same_incarnation(&observed_holder.owner)
457                        && holder.lease_token == observed_holder.lease_token
458                        && holder.fencing_token == observed_holder.fencing_token
459                        && holder.owner.is_definitely_dead_for_claimant(&owner)
460                    {
461                        let fencing_token = holder.fencing_token.saturating_add(1);
462                        let lease_token = format!(
463                            "{}:{}:{}:{now}:{fencing_token}",
464                            session_id, owner.owner_id, owner.incarnation_id
465                        );
466                        let expires_at = now.saturating_add(lease_ttl_ms);
467                        let liveness_json = encode_liveness(&owner.liveness)?;
468                        let changed = tx
469                            .execute(
470                                "UPDATE session_execution_leases
471                                 SET lease_owner_id = ?1,
472                                     lease_owner_incarnation_id = ?2,
473                                     lease_owner_liveness_json = ?3,
474                                     lease_token = ?4,
475                                     lease_fencing_token = ?5,
476                                     lease_claimed_at_ms = ?6,
477                                     lease_expires_at_ms = ?7
478                                 WHERE session_id = ?8
479                                   AND lease_owner_id = ?9
480                                   AND lease_owner_incarnation_id = ?10
481                                   AND lease_token = ?11
482                                   AND lease_fencing_token = ?12",
483                                params![
484                                    owner.owner_id,
485                                    owner.incarnation_id,
486                                    liveness_json,
487                                    lease_token,
488                                    fencing_token as i64,
489                                    now as i64,
490                                    expires_at as i64,
491                                    session_id,
492                                    observed_holder.owner.owner_id,
493                                    observed_holder.owner.incarnation_id,
494                                    observed_holder.lease_token,
495                                    observed_holder.fencing_token as i64,
496                                ],
497                            )
498                            .map_err(sqlite_error)?;
499                        if changed == 1 {
500                            return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
501                                SessionExecutionLease {
502                                    session_id,
503                                    owner,
504                                    lease_token,
505                                    fencing_token,
506                                    claimed_at_epoch_ms: now,
507                                    expires_at_epoch_ms: expires_at,
508                                },
509                            ));
510                        }
511                        let current = load_session_execution_lease_row_conn(tx, &session_id)?;
512                        if current.as_ref().is_some_and(|lease| {
513                            lease.lease_token.is_some() && lease.expires_at_ms > now
514                        }) {
515                            let current = current.expect("checked current lease is present");
516                            return Ok(SessionExecutionLeaseClaimOutcome::Busy {
517                                holder: row_to_session_execution_lease(&session_id, current)?,
518                            });
519                        }
520                        let previous_fencing_token =
521                            current.as_ref().map_or(0, |lease| lease.fencing_token);
522                        return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
523                            acquire_session_execution_lease_conn(
524                                tx,
525                                &session_id,
526                                &owner,
527                                previous_fencing_token,
528                                now,
529                                lease_ttl_ms,
530                            )?,
531                        ));
532                    }
533                    Ok(SessionExecutionLeaseClaimOutcome::Busy { holder })
534                })(
535                );
536                match outcome {
537                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
538                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
539                }
540            })
541            .await
542            .map_err(sqlite_error)?
543    }
544
545    async fn renew_session_execution_lease(
546        &self,
547        fence: &SessionExecutionLeaseFence,
548        lease_ttl_ms: u64,
549    ) -> Result<SessionExecutionLease, StoreError> {
550        let fence = fence.clone();
551        self.conn
552            .write_flow(move |tx| {
553                let outcome: Result<SessionExecutionLease, StoreError> = (|| {
554                    let now = current_epoch_ms();
555                    let current = load_session_execution_lease_row_conn(tx, &fence.session_id)?;
556                    let Some(current) = current else {
557                        return Err(StoreError::SessionExecutionLeaseExpired {
558                            session_id: fence.session_id.clone(),
559                        });
560                    };
561                    if !current
562                        .owner
563                        .as_ref()
564                        .is_some_and(|owner| owner.same_incarnation(&fence.owner))
565                        || current.lease_token.as_deref() != Some(fence.lease_token.as_str())
566                        || current.fencing_token != fence.fencing_token
567                        || current.expires_at_ms <= now
568                    {
569                        return Err(StoreError::SessionExecutionLeaseExpired {
570                            session_id: fence.session_id.clone(),
571                        });
572                    }
573                    let expires_at = now.saturating_add(lease_ttl_ms);
574                    tx.execute(
575                        "UPDATE session_execution_leases
576                         SET lease_expires_at_ms = ?5
577                         WHERE session_id = ?1
578                           AND lease_owner_id = ?2
579                           AND lease_owner_incarnation_id = ?3
580                           AND lease_token = ?4
581                           AND lease_fencing_token = ?6",
582                        params![
583                            fence.session_id,
584                            fence.owner.owner_id,
585                            fence.owner.incarnation_id,
586                            fence.lease_token,
587                            expires_at as i64,
588                            fence.fencing_token as i64
589                        ],
590                    )
591                    .map_err(sqlite_error)?;
592                    Ok(SessionExecutionLease {
593                        session_id: fence.session_id,
594                        owner: fence.owner,
595                        lease_token: fence.lease_token,
596                        fencing_token: fence.fencing_token,
597                        claimed_at_epoch_ms: current.claimed_at_ms,
598                        expires_at_epoch_ms: expires_at,
599                    })
600                })();
601                match outcome {
602                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
603                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
604                }
605            })
606            .await
607            .map_err(sqlite_error)?
608    }
609
610    async fn release_session_execution_lease(
611        &self,
612        completion: &SessionExecutionLeaseCompletion,
613    ) -> Result<(), StoreError> {
614        let completion = completion.clone();
615        self.conn
616            .write_flow(move |tx| {
617                let outcome = release_session_execution_lease_conn(tx, &completion);
618                match outcome {
619                    Ok(()) => Ok(TxOutcome::Commit(Ok(()))),
620                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
621                }
622            })
623            .await
624            .map_err(sqlite_error)?
625    }
626
627    async fn enqueue_queued_work(
628        &self,
629        batch: QueuedWorkBatchDraft,
630    ) -> Result<QueuedWorkBatch, StoreError> {
631        let nonce = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed);
632        self.conn
633            .write_flow(move |tx| {
634                let outcome: Result<QueuedWorkBatch, StoreError> = (|| {
635                    if let Some(source_key) = batch.source_key.as_deref() {
636                        let existing_id: Option<String> = tx
637                            .query_row(
638                                "SELECT batch_id
639                                 FROM queued_work_batches
640                                 WHERE session_id = ?1 AND source_key = ?2",
641                                params![batch.session_id, source_key],
642                                |row| row.get(0),
643                            )
644                            .optional()
645                            .map_err(sqlite_error)?;
646                        if let Some(batch_id) = existing_id {
647                            let existing = load_queued_batch_by_id_conn(tx, &batch_id)?
648                                .ok_or_else(|| {
649                                    StoreError::Backend(
650                                        "queued work source row disappeared".to_string(),
651                                    )
652                                })?;
653                            return Ok(existing);
654                        }
655                    }
656                    let now = current_epoch_ms();
657                    let batch_id =
658                        derive_batch_id(&batch.session_id, batch.source_key.as_deref(), now, Some(nonce));
659                    tx.execute(
660                        "INSERT INTO queued_work_batches (
661                            batch_id, session_id, source_key, delivery_policy, slot_policy,
662                            merge_key_json, available_at_ms, enqueued_at_ms
663                         )
664                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
665                        params![
666                            batch_id,
667                            batch.session_id,
668                            batch.source_key.as_deref(),
669                            batch.delivery_policy.as_str(),
670                            batch.slot_policy.as_str(),
671                            encode_json(&batch.merge_key),
672                            batch.available_at_ms as i64,
673                            now as i64,
674                        ],
675                    )
676                    .map_err(sqlite_error)?;
677                    for (index, payload) in batch.payloads.iter().enumerate() {
678                        let item_id = format!("{batch_id}:item:{index}");
679                        tx.execute(
680                            "INSERT INTO queued_work_items (batch_id, item_index, item_id, payload_json)
681                             VALUES (?1, ?2, ?3, ?4)",
682                            params![batch_id, index as i64, item_id, encode_json(payload)],
683                        )
684                        .map_err(sqlite_error)?;
685                    }
686                    load_queued_batch_by_id_conn(tx, &batch_id)?.ok_or_else(|| {
687                        StoreError::Backend("queued work insert disappeared".to_string())
688                    })
689                })();
690                // Roll back the partially-inserted batch/items on a
691                // `StoreError` while still returning the typed error.
692                match outcome {
693                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
694                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
695                }
696            })
697            .await
698            .map_err(sqlite_error)?
699    }
700
701    async fn claim_leading_ready_session_command(
702        &self,
703        session_id: &str,
704        session_execution_lease: &SessionExecutionLeaseFence,
705        owner: &LeaseOwnerIdentity,
706        lease_ttl_ms: u64,
707    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
708        let session_id = session_id.to_string();
709        let session_execution_lease = session_execution_lease.clone();
710        let owner = owner.clone();
711        self.conn
712            .write_flow(move |tx| {
713                let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
714                    ensure_session_execution_lease_conn(
715                        tx,
716                        &session_id,
717                        &session_execution_lease,
718                    )?;
719                    let now = current_epoch_ms();
720                    let candidate_rows = {
721                        let mut stmt = tx
722                            .prepare(
723                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
724                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
725                                        claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
726                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
727                                 FROM queued_work_batches
728                                 WHERE session_id = ?1
729                                   AND available_at_ms <= ?2
730                                 ORDER BY enqueue_seq ASC
731                                 LIMIT ?3",
732                            )
733                            .map_err(sqlite_error)?;
734                        let rows = stmt
735                            .query_map(
736                                params![session_id, now as i64, claim_scan_limit(1)],
737                                queued_batch_row_from_sql,
738                            )
739                            .map_err(sqlite_error)?;
740                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
741                    };
742                    let candidate_rows = candidate_rows
743                        .into_iter()
744                        .filter(|row| {
745                            row.claim_token.is_none()
746                                || row.claim_expires_at_ms <= now
747                                || row
748                                    .claim_owner
749                                    .as_ref()
750                                    .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner))
751                        })
752                        .collect::<Vec<_>>();
753                    let candidate_batches = candidate_rows
754                        .iter()
755                        .map(|row| queued_work_batch_from_conn(tx, row.clone()))
756                        .collect::<Result<Vec<_>, StoreError>>()?;
757                    let candidates = candidate_rows
758                        .iter()
759                        .zip(candidate_batches.iter())
760                        .map(|(row, batch)| {
761                            Ok(ClaimCandidate {
762                                enqueue_seq: row.enqueue_seq,
763                                claim_fencing_token: row.claim_fencing_token,
764                                work_class: batch.work_class().ok_or_else(|| {
765                                    StoreError::Backend(format!(
766                                        "queued-work batch `{}` has mixed or empty payload classes",
767                                        batch.batch_id
768                                    ))
769                                })?,
770                                delivery_policy: decode_delivery_policy(
771                                    row.delivery_policy.clone(),
772                                )?,
773                                slot_policy: decode_slot_policy(row.slot_policy.clone())?,
774                                merge_key: decode_merge_key(row.merge_key_json.clone())?,
775                            })
776                        })
777                        .collect::<Result<Vec<_>, StoreError>>()?;
778                    let selected_len = select_leading_session_command(&candidates);
779                    if selected_len == 0 {
780                        return Ok(TxOutcome::Commit(None));
781                    }
782                    let mut selected = candidate_rows;
783                    selected.truncate(selected_len);
784                    let mut selected_batches = candidate_batches;
785                    selected_batches.truncate(selected_len);
786                    let lease = QueuedWorkClaimLease::derive(
787                        &candidates[0],
788                        &session_id,
789                        &owner,
790                        now,
791                        lease_ttl_ms,
792                    );
793                    let liveness_json = encode_liveness(&owner.liveness)?;
794                    for row in &selected {
795                        let claimed = tx
796                            .execute(
797                                "UPDATE queued_work_batches
798                                 SET claim_id = ?3,
799                                     claim_owner_id = ?4,
800                                     claim_owner_incarnation_id = ?5,
801                                     claim_owner_liveness_json = ?6,
802                                     claim_token = ?7,
803                                     claim_fencing_token = claim_fencing_token + 1,
804                                     claim_claimed_at_ms = ?8,
805                                     claim_expires_at_ms = ?9
806                                 WHERE session_id = ?1
807                                   AND batch_id = ?2
808                                   AND (
809                                        claim_token IS NULL
810                                        OR claim_expires_at_ms <= ?8
811                                        OR (
812                                            claim_token = ?10
813                                            AND claim_owner_id = ?11
814                                            AND claim_owner_incarnation_id = ?12
815                                        )
816                                   )",
817                                params![
818                                    session_id,
819                                    row.batch_id,
820                                    lease.claim_id,
821                                    owner.owner_id.as_str(),
822                                    owner.incarnation_id.as_str(),
823                                    liveness_json.as_str(),
824                                    lease.lease_token,
825                                    now as i64,
826                                    lease.expires_at_epoch_ms as i64,
827                                    row.claim_token,
828                                    row.claim_owner.as_ref().map(|owner| owner.owner_id.as_str()),
829                                    row.claim_owner
830                                        .as_ref()
831                                        .map(|owner| owner.incarnation_id.as_str())
832                                ],
833                            )
834                            .map_err(sqlite_error)?;
835                        if claimed == 0 {
836                            return Ok(TxOutcome::Rollback(None));
837                        }
838                    }
839                    Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
840                        session_id: session_id.clone(),
841                        claim_id: lease.claim_id,
842                        owner: owner.clone(),
843                        lease_token: lease.lease_token,
844                        fencing_token: lease.fencing_token,
845                        claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
846                        expires_at_epoch_ms: lease.expires_at_epoch_ms,
847                        batches: selected_batches,
848                    })))
849                })();
850                match outcome {
851                    Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
852                    Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
853                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
854                }
855            })
856            .await
857            .map_err(sqlite_error)?
858    }
859
860    async fn claim_ready_queued_work(
861        &self,
862        session_id: &str,
863        session_execution_lease: &SessionExecutionLeaseFence,
864        owner: &LeaseOwnerIdentity,
865        boundary: QueuedWorkClaimBoundary,
866        lease_ttl_ms: u64,
867        max_batches: usize,
868    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
869        if max_batches == 0 {
870            return Ok(None);
871        }
872        let session_id = session_id.to_string();
873        let session_execution_lease = session_execution_lease.clone();
874        let owner = owner.clone();
875        self.conn
876            .write_flow(move |tx| {
877                let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
878                    ensure_session_execution_lease_conn(
879                        tx,
880                        &session_id,
881                        &session_execution_lease,
882                    )?;
883                    let now = current_epoch_ms();
884                    let candidate_rows = {
885                        let mut stmt = tx
886                            .prepare(
887                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
888                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
889                                        claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
890                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
891                                 FROM queued_work_batches
892                                 WHERE session_id = ?1
893                                   AND available_at_ms <= ?2
894                                 ORDER BY enqueue_seq ASC
895                                 LIMIT ?3",
896                            )
897                            .map_err(sqlite_error)?;
898                        let rows = stmt
899                            .query_map(
900                                params![session_id, now as i64, claim_scan_limit(max_batches)],
901                                queued_batch_row_from_sql,
902                            )
903                            .map_err(sqlite_error)?;
904                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
905                    };
906                    let candidate_rows = candidate_rows
907                        .into_iter()
908                        .filter(|row| {
909                            row.claim_token.is_none()
910                                || row.claim_expires_at_ms <= now
911                                || row
912                                    .claim_owner
913                                    .as_ref()
914                                    .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner))
915                        })
916                        .collect::<Vec<_>>();
917                    let candidate_batches = candidate_rows
918                        .iter()
919                        .map(|row| queued_work_batch_from_conn(tx, row.clone()))
920                        .collect::<Result<Vec<_>, StoreError>>()?;
921                    let candidates = candidate_rows
922                        .iter()
923                        .zip(candidate_batches.iter())
924                        .map(|(row, batch)| {
925                            Ok(ClaimCandidate {
926                                enqueue_seq: row.enqueue_seq,
927                                claim_fencing_token: row.claim_fencing_token,
928                                work_class: batch.work_class().ok_or_else(|| {
929                                    StoreError::Backend(format!(
930                                        "queued-work batch `{}` has mixed or empty payload classes",
931                                        batch.batch_id
932                                    ))
933                                })?,
934                                delivery_policy: decode_delivery_policy(
935                                    row.delivery_policy.clone(),
936                                )?,
937                                slot_policy: decode_slot_policy(row.slot_policy.clone())?,
938                                merge_key: decode_merge_key(row.merge_key_json.clone())?,
939                            })
940                        })
941                        .collect::<Result<Vec<_>, StoreError>>()?;
942                    let selected_len =
943                        select_turn_work_claim_prefix(&candidates, boundary, max_batches);
944                    if selected_len == 0 {
945                        return Ok(TxOutcome::Commit(None));
946                    }
947                    let mut selected = candidate_rows;
948                    selected.truncate(selected_len);
949                    let mut selected_batches = candidate_batches;
950                    selected_batches.truncate(selected_len);
951                    let lease = QueuedWorkClaimLease::derive(
952                        &candidates[0],
953                        &session_id,
954                        &owner,
955                        now,
956                        lease_ttl_ms,
957                    );
958                    let liveness_json = encode_liveness(&owner.liveness)?;
959                    for row in &selected {
960                        // Under `BEGIN IMMEDIATE` this connection already holds
961                        // the write lock, but the row could still have been
962                        // claimed by an earlier committed writer (its
963                        // `claim_token` set and not yet expired). The `WHERE`
964                        // clause filters those out, so a 0-row update means we
965                        // lost the race for this batch: treat the whole claim as
966                        // not-won rather than returning a claim that doesn't
967                        // actually own the row.
968                        let claimed = tx
969                            .execute(
970                                "UPDATE queued_work_batches
971                                 SET claim_id = ?3,
972                                     claim_owner_id = ?4,
973                                     claim_owner_incarnation_id = ?5,
974                                     claim_owner_liveness_json = ?6,
975                                     claim_token = ?7,
976                                     claim_fencing_token = claim_fencing_token + 1,
977                                     claim_claimed_at_ms = ?8,
978                                     claim_expires_at_ms = ?9
979                                 WHERE session_id = ?1
980                                   AND batch_id = ?2
981                                   AND (
982                                        claim_token IS NULL
983                                        OR claim_expires_at_ms <= ?8
984                                        OR (
985                                            claim_token = ?10
986                                            AND claim_owner_id = ?11
987                                            AND claim_owner_incarnation_id = ?12
988                                        )
989                                   )",
990                                params![
991                                    session_id,
992                                    row.batch_id,
993                                    lease.claim_id,
994                                    owner.owner_id.as_str(),
995                                    owner.incarnation_id.as_str(),
996                                    liveness_json.as_str(),
997                                    lease.lease_token,
998                                    now as i64,
999                                    lease.expires_at_epoch_ms as i64,
1000                                    row.claim_token,
1001                                    row.claim_owner.as_ref().map(|owner| owner.owner_id.as_str()),
1002                                    row.claim_owner
1003                                        .as_ref()
1004                                        .map(|owner| owner.incarnation_id.as_str())
1005                                ],
1006                            )
1007                            .map_err(sqlite_error)?;
1008                        if claimed == 0 {
1009                            // Lost the race for this batch. Roll back any sibling
1010                            // rows we already claimed in this transaction so we
1011                            // never return a half-owned claim.
1012                            return Ok(TxOutcome::Rollback(None));
1013                        }
1014                    }
1015                    Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
1016                        session_id: session_id.clone(),
1017                        claim_id: lease.claim_id,
1018                        owner: owner.clone(),
1019                        lease_token: lease.lease_token,
1020                        fencing_token: lease.fencing_token,
1021                        claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
1022                        expires_at_epoch_ms: lease.expires_at_epoch_ms,
1023                        batches: selected_batches,
1024                    })))
1025                })();
1026                // Lower a `StoreError` into the rollback arm so the closure body
1027                // can keep using `?` while still propagating the error to the
1028                // caller. Encode it as a `Result` carried out of the flow.
1029                match outcome {
1030                    Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
1031                    Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
1032                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1033                }
1034            })
1035            .await
1036            .map_err(sqlite_error)?
1037    }
1038
1039    async fn renew_queued_work_claim(
1040        &self,
1041        claim: &QueuedWorkClaim,
1042        lease_ttl_ms: u64,
1043    ) -> Result<QueuedWorkClaim, StoreError> {
1044        let now = current_epoch_ms();
1045        let expires_at = now.saturating_add(lease_ttl_ms);
1046        let session_id = claim.session_id.clone();
1047        let claim_id = claim.claim_id.clone();
1048        let lease_token = claim.lease_token.clone();
1049        let changed = self
1050            .conn
1051            .write(move |tx| {
1052                tx.execute(
1053                    "UPDATE queued_work_batches
1054                     SET claim_expires_at_ms = ?4
1055                     WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1056                    params![session_id, claim_id, lease_token, expires_at as i64],
1057                )
1058            })
1059            .await
1060            .map_err(sqlite_error)?;
1061        renewed_claim(claim, changed, expires_at)
1062    }
1063
1064    async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1065        let session_id = claim.session_id.clone();
1066        let claim_id = claim.claim_id.clone();
1067        let lease_token = claim.lease_token.clone();
1068        self.conn
1069            .write(move |tx| {
1070                tx.execute(
1071                    "UPDATE queued_work_batches
1072                     SET claim_id = NULL,
1073                         claim_owner_id = NULL,
1074                         claim_owner_incarnation_id = NULL,
1075                         claim_owner_liveness_json = NULL,
1076                         claim_token = NULL,
1077                         claim_claimed_at_ms = 0,
1078                         claim_expires_at_ms = 0
1079                     WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1080                    params![session_id, claim_id, lease_token],
1081                )
1082            })
1083            .await
1084            .map_err(sqlite_error)?;
1085        Ok(())
1086    }
1087
1088    async fn cancel_queued_work_batch(
1089        &self,
1090        session_id: &str,
1091        batch_id: &str,
1092    ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1093        let session_id = session_id.to_string();
1094        let batch_id = batch_id.to_string();
1095        self.conn
1096            .write_flow(move |tx| {
1097                let outcome: Result<Option<QueuedWorkBatch>, StoreError> = (|| {
1098                    let now = current_epoch_ms() as i64;
1099                    let row = tx
1100                        .query_row(
1101                            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1102                                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1103                                    claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1104                                    claim_owner_liveness_json, claim_token, claim_expires_at_ms
1105                             FROM queued_work_batches
1106                             WHERE session_id = ?1
1107                               AND batch_id = ?2
1108                               AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
1109                            params![session_id, batch_id, now],
1110                            queued_batch_row_from_sql,
1111                        )
1112                        .optional()
1113                        .map_err(sqlite_error)?;
1114                    let Some(row) = row else {
1115                        return Ok(None);
1116                    };
1117                    let batch = queued_work_batch_from_conn(tx, row)?;
1118                    tx.execute(
1119                        "DELETE FROM queued_work_batches
1120                         WHERE session_id = ?1
1121                           AND batch_id = ?2
1122                           AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
1123                        params![session_id, batch_id, now],
1124                    )
1125                    .map_err(sqlite_error)?;
1126                    Ok(Some(batch))
1127                })();
1128                match outcome {
1129                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1130                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1131                }
1132            })
1133            .await
1134            .map_err(sqlite_error)?
1135    }
1136
1137    async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1138        let session_id = session_id.to_string();
1139        self.conn
1140            .call(move |conn| {
1141                let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
1142                    let rows = {
1143                        let mut stmt = conn
1144                            .prepare(
1145                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1146                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1147                                        claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1148                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
1149                                 FROM queued_work_batches
1150                                 WHERE session_id = ?1
1151                                 ORDER BY enqueue_seq ASC",
1152                            )
1153                            .map_err(sqlite_error)?;
1154                        let rows = stmt
1155                            .query_map(params![session_id], queued_batch_row_from_sql)
1156                            .map_err(sqlite_error)?;
1157                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1158                    };
1159                    rows.into_iter()
1160                        .map(|row| queued_work_batch_from_conn(conn, row))
1161                        .collect()
1162                })();
1163                Ok(outcome)
1164            })
1165            .await
1166            .map_err(sqlite_error)?
1167    }
1168
1169    async fn list_pending_queued_work(
1170        &self,
1171        session_id: &str,
1172    ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1173        let session_id = session_id.to_string();
1174        self.conn
1175            .call(move |conn| {
1176                let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
1177                    let now = current_epoch_ms();
1178                    let rows = {
1179                        let mut stmt = conn
1180                            .prepare(
1181                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1182                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1183                                        claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1184                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
1185                                 FROM queued_work_batches
1186                                 WHERE session_id = ?1
1187                                   AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
1188                                 ORDER BY enqueue_seq ASC",
1189                            )
1190                            .map_err(sqlite_error)?;
1191                        let rows = stmt
1192                            .query_map(
1193                                params![session_id, now as i64],
1194                                queued_batch_row_from_sql,
1195                            )
1196                            .map_err(sqlite_error)?;
1197                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1198                    };
1199                    rows.into_iter()
1200                        .map(|row| queued_work_batch_from_conn(conn, row))
1201                        .collect()
1202                })();
1203                Ok(outcome)
1204            })
1205            .await
1206            .map_err(sqlite_error)?
1207    }
1208
    /// Trait adapter that forwards to `Store::save_session_meta`.
    ///
    /// Whatever the forwarded call yields is discarded and this method always
    /// returns `Ok(())`.
    // NOTE(review): presumably `Store::save_session_meta` resolves to the
    // inherent method rather than this trait method — confirm in `lifecycle.rs`.
    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
        Store::save_session_meta(self, meta).await;
        Ok(())
    }
1213
1214    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
1215        Ok(Store::load_session_meta(self).await)
1216    }
1217
1218    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1219        if ids.is_empty() {
1220            return Ok(());
1221        }
1222        let ids = ids.to_vec();
1223        self.conn
1224            .write(move |tx| {
1225                let mut stmt =
1226                    tx.prepare("UPDATE graph_nodes SET tombstoned = 1 WHERE node_id = ?1")?;
1227                for id in &ids {
1228                    stmt.execute(params![id])?;
1229                }
1230                Ok(())
1231            })
1232            .await
1233            .map_err(sqlite_error)
1234    }
1235
1236    async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1237        let removed = self
1238            .conn
1239            .write(move |tx| tx.execute("DELETE FROM graph_nodes WHERE tombstoned = 1", []))
1240            .await
1241            .map_err(sqlite_error)?;
1242        Ok(VacuumReport {
1243            removed_node_count: removed,
1244        })
1245    }
1246
1247    async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
1248        Ok(Store::gc_unreachable(self).await)
1249    }
1250}
1251
/// Raw contents of one `session_execution_leases` row, as produced by
/// `load_session_execution_lease_row_conn`.
///
/// Owner and token are optional here because the corresponding columns are
/// read as nullable; `row_to_session_execution_lease` rejects a row where
/// either is missing.
struct SessionExecutionLeaseRow {
    /// Owner identity rebuilt from the owner columns; `None` when
    /// `lease_owner_id` is NULL.
    owner: Option<LeaseOwnerIdentity>,
    /// Value of the `lease_token` column.
    lease_token: Option<String>,
    /// Value of `lease_fencing_token` (read as `i64`, cast to `u64`).
    fencing_token: u64,
    /// Value of `lease_claimed_at_ms` (read as `i64`, cast to `u64`).
    claimed_at_ms: u64,
    /// Value of `lease_expires_at_ms` (read as `i64`, cast to `u64`).
    expires_at_ms: u64,
}
1259
1260fn load_session_execution_lease_row_conn(
1261    conn: &Connection,
1262    session_id: &str,
1263) -> Result<Option<SessionExecutionLeaseRow>, StoreError> {
1264    let row = conn
1265        .query_row(
1266            "SELECT lease_owner_id, lease_token, lease_fencing_token,
1267                    lease_claimed_at_ms, lease_expires_at_ms,
1268                    lease_owner_incarnation_id, lease_owner_liveness_json
1269             FROM session_execution_leases
1270             WHERE session_id = ?1",
1271            params![session_id],
1272            |row| {
1273                let owner_id: Option<String> = row.get(0)?;
1274                let incarnation_id: Option<String> = row.get(5)?;
1275                let liveness_json: Option<String> = row.get(6)?;
1276                Ok(SessionExecutionLeaseRow {
1277                    owner: lease_owner_from_columns(owner_id, incarnation_id, liveness_json),
1278                    lease_token: row.get(1)?,
1279                    fencing_token: row.get::<_, i64>(2)? as u64,
1280                    claimed_at_ms: row.get::<_, i64>(3)? as u64,
1281                    expires_at_ms: row.get::<_, i64>(4)? as u64,
1282                })
1283            },
1284        )
1285        .optional()
1286        .map_err(sqlite_error)?;
1287    Ok(row)
1288}
1289
1290fn lease_owner_from_columns(
1291    owner_id: Option<String>,
1292    incarnation_id: Option<String>,
1293    liveness_json: Option<String>,
1294) -> Option<LeaseOwnerIdentity> {
1295    owner_id.map(|owner_id| LeaseOwnerIdentity {
1296        incarnation_id: incarnation_id.unwrap_or_else(|| owner_id.clone()),
1297        owner_id,
1298        liveness: liveness_json
1299            .as_deref()
1300            .and_then(|json| serde_json::from_str(json).ok())
1301            .unwrap_or(LeaseOwnerLiveness::Opaque),
1302    })
1303}
1304
1305fn encode_liveness(liveness: &LeaseOwnerLiveness) -> Result<String, StoreError> {
1306    serde_json::to_string(liveness)
1307        .map_err(|err| StoreError::Backend(format!("failed to encode lease liveness: {err}")))
1308}
1309
1310fn row_to_session_execution_lease(
1311    session_id: &str,
1312    row: SessionExecutionLeaseRow,
1313) -> Result<SessionExecutionLease, StoreError> {
1314    Ok(SessionExecutionLease {
1315        session_id: session_id.to_string(),
1316        owner: row
1317            .owner
1318            .ok_or_else(|| StoreError::Backend("live session lease missing owner".to_string()))?,
1319        lease_token: row.lease_token.ok_or_else(|| {
1320            StoreError::Backend("live session lease missing lease token".to_string())
1321        })?,
1322        fencing_token: row.fencing_token,
1323        claimed_at_epoch_ms: row.claimed_at_ms,
1324        expires_at_epoch_ms: row.expires_at_ms,
1325    })
1326}
1327
1328fn acquire_session_execution_lease_conn(
1329    conn: &Connection,
1330    session_id: &str,
1331    owner: &LeaseOwnerIdentity,
1332    previous_fencing_token: u64,
1333    now: u64,
1334    lease_ttl_ms: u64,
1335) -> Result<SessionExecutionLease, StoreError> {
1336    let fencing_token = previous_fencing_token.saturating_add(1);
1337    let lease_token = format!(
1338        "{}:{}:{}:{now}:{fencing_token}",
1339        session_id, owner.owner_id, owner.incarnation_id
1340    );
1341    let expires_at = now.saturating_add(lease_ttl_ms);
1342    let liveness_json = encode_liveness(&owner.liveness)?;
1343    conn.execute(
1344        "INSERT INTO session_execution_leases (
1345            session_id, lease_owner_id, lease_owner_incarnation_id, lease_owner_liveness_json,
1346            lease_token, lease_fencing_token, lease_claimed_at_ms, lease_expires_at_ms
1347         )
1348         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1349         ON CONFLICT(session_id) DO UPDATE SET
1350            lease_owner_id = excluded.lease_owner_id,
1351            lease_owner_incarnation_id = excluded.lease_owner_incarnation_id,
1352            lease_owner_liveness_json = excluded.lease_owner_liveness_json,
1353            lease_token = excluded.lease_token,
1354            lease_fencing_token = excluded.lease_fencing_token,
1355            lease_claimed_at_ms = excluded.lease_claimed_at_ms,
1356            lease_expires_at_ms = excluded.lease_expires_at_ms",
1357        params![
1358            session_id,
1359            owner.owner_id,
1360            owner.incarnation_id,
1361            liveness_json,
1362            lease_token,
1363            fencing_token as i64,
1364            now as i64,
1365            expires_at as i64
1366        ],
1367    )
1368    .map_err(sqlite_error)?;
1369    Ok(SessionExecutionLease {
1370        session_id: session_id.to_string(),
1371        owner: owner.clone(),
1372        lease_token,
1373        fencing_token,
1374        claimed_at_epoch_ms: now,
1375        expires_at_epoch_ms: expires_at,
1376    })
1377}
1378
1379fn ensure_session_execution_lease_conn(
1380    conn: &Connection,
1381    session_id: &str,
1382    fence: &SessionExecutionLeaseFence,
1383) -> Result<(), StoreError> {
1384    if fence.session_id != session_id {
1385        return Err(StoreError::SessionExecutionLeaseExpired {
1386            session_id: session_id.to_string(),
1387        });
1388    }
1389    let now = current_epoch_ms();
1390    let current = load_session_execution_lease_row_conn(conn, session_id)?;
1391    let Some(current) = current else {
1392        return Err(StoreError::SessionExecutionLeaseExpired {
1393            session_id: session_id.to_string(),
1394        });
1395    };
1396    if current
1397        .owner
1398        .as_ref()
1399        .is_some_and(|owner| owner.same_incarnation(&fence.owner))
1400        && current.lease_token.as_deref() == Some(fence.lease_token.as_str())
1401        && current.fencing_token == fence.fencing_token
1402        && current.expires_at_ms > now
1403    {
1404        Ok(())
1405    } else {
1406        Err(StoreError::SessionExecutionLeaseExpired {
1407            session_id: session_id.to_string(),
1408        })
1409    }
1410}
1411
1412fn release_session_execution_lease_conn(
1413    conn: &Connection,
1414    completion: &SessionExecutionLeaseCompletion,
1415) -> Result<(), StoreError> {
1416    conn.execute(
1417        "UPDATE session_execution_leases
1418         SET lease_owner_id = NULL,
1419             lease_owner_incarnation_id = NULL,
1420             lease_owner_liveness_json = NULL,
1421             lease_token = NULL,
1422             lease_claimed_at_ms = 0,
1423             lease_expires_at_ms = 0
1424         WHERE session_id = ?1
1425           AND lease_owner_id = ?2
1426           AND lease_owner_incarnation_id = ?3
1427           AND lease_token = ?4
1428           AND lease_fencing_token = ?5",
1429        params![
1430            completion.session_id,
1431            completion.owner.owner_id,
1432            completion.owner.incarnation_id,
1433            completion.lease_token,
1434            completion.fencing_token as i64
1435        ],
1436    )
1437    .map_err(sqlite_error)?;
1438    Ok(())
1439}