Skip to main content

lash_sqlite_store/
persistence.rs

1//! The [`RuntimePersistence`] trait implementation for [`Store`].
2//!
3//! This is the tokio-rusqlite port of the prior store's `persistence.rs`. The
4//! public surface matches the prior store's async trait exactly: identical
5//! method names and signatures, so consumers swap backends by renaming a path.
6//!
7//! The translation rules (see `conn.rs`, `lifecycle.rs`, `blobs.rs`):
8//!
9//! * Pure reads run through `self.conn.call(move |conn| { ... })`.
10//! * Read-then-write paths run through `self.conn.write(move |tx| { ... })`
11//!   (`BEGIN IMMEDIATE`, commit on `Ok`, rollback on `Err`) — this is the
12//!   cross-process write-lock guard.
13//! * Paths that may abandon partially-applied writes (the queued-work claim)
14//!   run through `self.conn.write_flow`, deciding commit vs rollback via
15//!   [`TxOutcome`].
16//! * The shared `*_conn` helpers (`try_load_session_head_meta_from_conn`,
17//!   `Self::put_checkpoint_conn`, `Self::load_usage_deltas_conn`,
18//!   `Self::load_session_graph_from_conn`, the queued-work helpers, …) are
19//!   synchronous and take a `&rusqlite::Connection`, so they are reused from
20//!   inside these closures (a `&Transaction` derefs to `&Connection`).
21//! * Closures must be `'static` + `Send`: every borrow of `self`/caller data is
22//!   cloned into an owned value before being moved in.
23
24use super::*;
25
26#[async_trait::async_trait]
27impl RuntimePersistence for Store {
28    fn durability_tier(&self) -> DurabilityTier {
29        DurabilityTier::Durable
30    }
31
32    async fn load_session(
33        &self,
34        scope: SessionReadScope,
35    ) -> Result<Option<PersistedSessionRead>, StoreError> {
36        self.conn
37            .call(move |conn| {
38                let outcome: Result<Option<PersistedSessionRead>, StoreError> = (|| {
39                    let Some(meta) = try_load_session_head_meta_from_conn(conn)? else {
40                        return Ok(None);
41                    };
42                    let leaf_node_id = match &scope {
43                        SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
44                        SessionReadScope::ActivePath { leaf_node_id } => {
45                            leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
46                        }
47                    };
48                    let mut graph = match scope {
49                        SessionReadScope::FullGraph => {
50                            Self::load_session_graph_from_conn(conn, meta.leaf_node_id.clone())
51                        }
52                        SessionReadScope::ActivePath { .. } => {
53                            Self::load_active_path_session_graph_from_conn(
54                                conn,
55                                leaf_node_id.clone(),
56                            )
57                            .map_err(sqlite_error)?
58                        }
59                    };
60                    graph.set_leaf_node_id(leaf_node_id);
61                    let checkpoint = meta
62                        .checkpoint_ref
63                        .as_ref()
64                        .map(|blob_ref| Self::get_checkpoint_conn(conn, blob_ref))
65                        .transpose()?
66                        .flatten();
67                    Ok(Some(PersistedSessionRead {
68                        session_id: meta.session_id,
69                        head_revision: meta.head_revision,
70                        config: meta.config,
71                        agent_frames: meta.agent_frames,
72                        current_agent_frame_id: meta.current_agent_frame_id,
73                        graph,
74                        checkpoint_ref: meta.checkpoint_ref,
75                        checkpoint,
76                        token_ledger: merge_token_ledger_entries(Self::load_usage_deltas_conn(
77                            conn,
78                        )),
79                    }))
80                })(
81                );
82                Ok(outcome)
83            })
84            .await
85            .map_err(sqlite_error)?
86    }
87
88    async fn load_node(
89        &self,
90        node_id: &str,
91    ) -> Result<Option<lash_core::SessionNodeRecord>, StoreError> {
92        let node_id = node_id.to_string();
93        let row: Option<String> = self
94            .conn
95            .call(move |conn| {
96                conn.query_row(
97                    "SELECT node_json FROM graph_nodes WHERE node_id = ?1 AND tombstoned = 0",
98                    params![node_id],
99                    |row| row.get(0),
100                )
101                .optional()
102            })
103            .await
104            .map_err(sqlite_error)?;
105        Ok(row.and_then(|json| serde_json::from_str(&json).ok()))
106    }
107
    /// Atomically applies a [`RuntimeCommit`] and advances the session head.
    ///
    /// Everything runs inside one `write_flow` transaction, in this order:
    /// 1. Session binding: an existing head bound to a different session id fails
    ///    with `SessionBindingMismatch`.
    /// 2. Turn-commit replay: if `commit.turn_commit` names a turn already recorded
    ///    in `runtime_turn_commits`, a matching `turn_commit_hash` returns the stored
    ///    result without re-applying anything. The execution lease is still released
    ///    if the commit requests it. A different hash fails with
    ///    `RuntimeTurnCommitConflict`.
    /// 3. Validation: the execution lease, the optional `expected_head_revision`,
    ///    and ownership of every completed queued-work and turn-input claim.
    /// 4. Writes, in order:
    ///    - the checkpoint and the usage deltas;
    ///    - the graph delta and the new session head (`head_revision + 1`);
    ///    - claim completions and interrupted-input deferral;
    ///    - attachment commit stamps and the turn-commit record;
    ///    - the optional lease release.
    ///
    /// `maybe_auto_gc` is awaited only after the transaction returns `Ok`. This
    /// includes the replay case.
    ///
    /// # Errors
    /// - `SessionBindingMismatch`.
    /// - `RuntimeTurnCommitConflict`.
    /// - `SessionExecutionLeaseExpired`, when no lease is supplied.
    /// - `HeadRevisionConflict`.
    /// - `QueuedWorkClaimExpired` / `TurnInputClaimExpired`, for a claim belonging
    ///   to another session.
    /// - Errors from the `ensure_*` helpers, and backend/SQLite errors.
    ///
    /// Any error rolls back the whole transaction.
    async fn commit_runtime_state(
        &self,
        commit: RuntimeCommit,
    ) -> Result<RuntimeCommitResult, StoreError> {
        let blob_profile = self.options.blob_profile;
        let result = self
            .conn
            .write_flow(move |tx| {
                let outcome: Result<RuntimeCommitResult, StoreError> = (|| {
                    // The store holds a single session head (`singleton = 1` in the
                    // upsert below); refuse to commit a different session into it.
                    let existing = try_load_session_head_meta_from_conn(tx)?;
                    if let Some(bound_session_id) =
                        existing.as_ref().map(|meta| meta.session_id.as_str())
                        && bound_session_id != commit.session_id
                    {
                        return Err(StoreError::SessionBindingMismatch {
                            bound_session_id: bound_session_id.to_string(),
                            attempted_session_id: commit.session_id.clone(),
                        });
                    }
                    // Turn-commit replay: a turn already recorded with the same hash
                    // returns its stored result without re-applying anything. The
                    // same turn with a different hash is a conflict. This runs before
                    // the lease check, so a replay does not require a live lease.
                    if let Some(completed) = &commit.turn_commit {
                        if completed.session_id != commit.session_id {
                            return Err(StoreError::RuntimeTurnCommitConflict {
                                session_id: completed.session_id.clone(),
                                turn_id: completed.turn_id.clone(),
                            });
                        }
                        let prior: Option<(String, String)> = tx
                            .query_row(
                                "SELECT turn_commit_hash, result_json FROM runtime_turn_commits
                                 WHERE session_id = ?1 AND turn_id = ?2",
                                params![completed.session_id, completed.turn_id],
                                |row| Ok((row.get(0)?, row.get(1)?)),
                            )
                            .optional()
                            .map_err(sqlite_error)?;
                        if let Some((turn_commit_hash, result_json)) = prior {
                            if turn_commit_hash == completed.turn_commit_hash {
                                let result: RuntimeCommitResult =
                                    serde_json::from_str(&result_json).map_err(|err| {
                                        StoreError::Backend(format!(
                                            "failed to decode runtime turn commit result: {err}"
                                        ))
                                    })?;
                                if let Some(completion) =
                                    commit.release_session_execution_lease.as_ref()
                                {
                                    release_session_execution_lease_conn(tx, completion)?;
                                }
                                return Ok(result);
                            }
                            return Err(StoreError::RuntimeTurnCommitConflict {
                                session_id: completed.session_id.clone(),
                                turn_id: completed.turn_id.clone(),
                            });
                        }
                    }
                    // Every non-replayed commit must present an execution lease that
                    // `ensure_session_execution_lease_conn` accepts.
                    let Some(session_execution_lease) = commit.session_execution_lease.as_ref()
                    else {
                        return Err(StoreError::SessionExecutionLeaseExpired {
                            session_id: commit.session_id.clone(),
                        });
                    };
                    ensure_session_execution_lease_conn(
                        tx,
                        &commit.session_id,
                        session_execution_lease,
                    )?;
                    // Optimistic concurrency: checked only when the caller supplied an
                    // expected revision; a store with no head counts as revision 0.
                    let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
                    if commit.expected_head_revision.is_some()
                        && commit.expected_head_revision != Some(actual_revision)
                    {
                        return Err(StoreError::HeadRevisionConflict {
                            expected: commit.expected_head_revision,
                            actual: actual_revision,
                        });
                    }
                    // Validate claim ownership up front; the matching deletes and
                    // updates run after the new head is written.
                    for completed in &commit.completed_queue_claims {
                        if completed.session_id != commit.session_id {
                            return Err(StoreError::QueuedWorkClaimExpired {
                                session_id: completed.session_id.clone(),
                                claim_id: completed.claim_id.clone(),
                            });
                        }
                        ensure_queued_work_completion_conn(tx, completed)?;
                    }
                    for completed in &commit.completed_turn_input_claims {
                        if completed.session_id != commit.session_id {
                            return Err(StoreError::TurnInputClaimExpired {
                                session_id: completed.session_id.clone(),
                                claim_id: completed.claim_id.clone(),
                            });
                        }
                        // Count inputs still held under this claim id and token.
                        // `ensure_turn_input_completion_owns_all_inputs` then checks
                        // that count against the completion.
                        let owned_rows: usize = tx
                            .query_row(
                                "SELECT COUNT(*)
                                 FROM pending_turn_inputs
                                 WHERE session_id = ?1
                                   AND claim_id = ?2
                                   AND claim_token = ?3",
                                params![
                                    completed.session_id,
                                    completed.claim_id,
                                    completed.lease_token
                                ],
                                |row| row.get::<_, i64>(0),
                            )
                            .map_err(sqlite_error)? as usize;
                        ensure_turn_input_completion_owns_all_inputs(completed, owned_rows)?;
                    }

                    // First write of this block: store the checkpoint blob.
                    let stored_checkpoint =
                        Self::put_checkpoint_conn(tx, &commit.checkpoint, blob_profile)
                            .map_err(sqlite_error)?;

                    // Append this commit's usage rows. Reads rebuild the token ledger
                    // from `usage_deltas` rather than from the head.
                    if !commit.usage_deltas.is_empty() {
                        let mut stmt = tx
                            .prepare(
                                "INSERT INTO usage_deltas (
                                    source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
                                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
                            )
                            .map_err(sqlite_error)?;
                        for entry in &commit.usage_deltas {
                            stmt.execute(params![
                                entry.source,
                                entry.model,
                                entry.usage.input_tokens,
                                entry.usage.output_tokens,
                                entry.usage.cached_input_tokens,
                                entry.usage.reasoning_tokens,
                            ])
                            .map_err(sqlite_error)?;
                        }
                    }

                    // Apply the graph delta and resolve the leaf for the new head.
                    let leaf_node_id = match &commit.graph {
                        GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
                        GraphCommitDelta::Append {
                            nodes,
                            leaf_node_id,
                        } => {
                            for node in nodes {
                                let node_json = encode_json(node);
                                tx.execute(
                                    "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
                                    params![node.node_id, node_json],
                                )
                                .map_err(sqlite_error)?;
                            }
                            leaf_node_id.clone()
                        }
                        GraphCommitDelta::ReplaceFull(graph) => {
                            // Full replacement deletes every existing row, tombstoned
                            // ones included, before re-inserting.
                            tx.execute("DELETE FROM graph_nodes", [])
                                .map_err(sqlite_error)?;
                            for node in &graph.nodes {
                                let node_json = encode_json(node);
                                tx.execute(
                                    "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
                                    params![node.node_id, node_json],
                                )
                                .map_err(sqlite_error)?;
                            }
                            graph.leaf_node_id.clone()
                        }
                    };
                    // The node count recorded on the head excludes tombstoned rows.
                    let graph_node_count: usize = tx
                        .query_row(
                            "SELECT COUNT(*) FROM graph_nodes WHERE tombstoned = 0",
                            [],
                            |row| row.get::<_, i64>(0),
                        )
                        .map_err(sqlite_error)? as usize;
                    let next_revision = actual_revision + 1;
                    let meta = SessionHeadMeta {
                        schema_version: lash_core::store::SESSION_HEAD_META_SCHEMA_VERSION,
                        session_id: commit.session_id.clone(),
                        head_revision: next_revision,
                        config: commit.config.clone(),
                        agent_frames: commit.agent_frames.clone(),
                        current_agent_frame_id: commit.current_agent_frame_id.clone(),
                        checkpoint_ref: Some(stored_checkpoint.checkpoint_ref.clone()),
                        leaf_node_id,
                        graph_node_count,
                        // Usage lives in `usage_deltas`, so the head stores no ledger.
                        token_ledger: Vec::new(),
                    };
                    // Upsert the single head row at the next revision.
                    tx.execute(
                        "INSERT OR REPLACE INTO session_head (singleton, session_id, head_json, head_revision)
                         VALUES (1, ?1, ?2, ?3)",
                        params![
                            meta.session_id,
                            encode_json(&meta),
                            meta.head_revision as i64
                        ],
                    )
                    .map_err(sqlite_error)?;
                    // Retire completed queued-work batches, still guarded by claim id
                    // and token.
                    for completed in &commit.completed_queue_claims {
                        for batch_id in &completed.batch_ids {
                            tx.execute(
                                "DELETE FROM queued_work_batches
                                 WHERE session_id = ?1
                                   AND batch_id = ?2
                                   AND claim_id = ?3
                                   AND claim_token = ?4",
                                params![
                                    completed.session_id,
                                    batch_id,
                                    completed.claim_id,
                                    completed.lease_token
                                ],
                            )
                            .map_err(sqlite_error)?;
                        }
                    }
                    // Mark completed turn inputs as `Completed` and clear their claim
                    // columns, still guarded by claim id and token.
                    for completed in &commit.completed_turn_input_claims {
                        for input_id in &completed.input_ids {
                            tx.execute(
                                "UPDATE pending_turn_inputs
                                 SET state = ?5,
                                     claim_id = NULL,
                                     claim_owner_id = NULL,
                                     claim_owner_incarnation_id = NULL,
                                     claim_owner_liveness_json = NULL,
                                     claim_token = NULL,
                                     claim_claimed_at_ms = 0,
                                     claim_expires_at_ms = 0
                                 WHERE session_id = ?1
                                   AND input_id = ?2
                                   AND claim_id = ?3
                                   AND claim_token = ?4",
                                params![
                                    completed.session_id,
                                    input_id,
                                    completed.claim_id,
                                    completed.lease_token,
                                    lash_core::TurnInputState::Completed.as_str(),
                                ],
                            )
                            .map_err(sqlite_error)?;
                        }
                    }
                    // Interrupted turn: inputs still `PendingActive` for that turn
                    // move to `DeferredNextTurn`. Their ingress is rewritten to
                    // `NextTurn` and their claim columns are cleared.
                    if let Some(turn_id) = commit.interrupted_turn_input_turn_id.as_deref() {
                        // Collected in an inner scope so the SELECT statement and its
                        // row iterator are released before the UPDATE is prepared.
                        let input_ids = {
                            let mut stmt = tx
                                .prepare(
                                    "SELECT input_id, ingress_json
                                     FROM pending_turn_inputs
                                     WHERE session_id = ?1 AND state = ?2",
                                )
                                .map_err(sqlite_error)?;
                            let rows = stmt
                                .query_map(
                                    params![
                                        commit.session_id,
                                        lash_core::TurnInputState::PendingActive.as_str()
                                    ],
                                    |row| {
                                        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
                                    },
                                )
                                .map_err(sqlite_error)?;
                            let mut input_ids = Vec::new();
                            for row in rows {
                                let (input_id, ingress_json) = row.map_err(sqlite_error)?;
                                let ingress = decode_turn_input_ingress(ingress_json)?;
                                if ingress
                                    .active_turn_id()
                                    .is_some_and(|active| active == turn_id)
                                {
                                    input_ids.push(input_id);
                                }
                            }
                            input_ids
                        };
                        let next_turn_ingress = encode_json(&lash_core::TurnInputIngress::NextTurn);
                        let mut stmt = tx
                            .prepare(
                                "UPDATE pending_turn_inputs
                                 SET state = ?3,
                                     ingress_json = ?4,
                                     claim_id = NULL,
                                     claim_owner_id = NULL,
                                     claim_owner_incarnation_id = NULL,
                                     claim_owner_liveness_json = NULL,
                                     claim_token = NULL,
                                     claim_claimed_at_ms = 0,
                                     claim_expires_at_ms = 0
                                 WHERE session_id = ?1 AND input_id = ?2",
                            )
                            .map_err(sqlite_error)?;
                        for input_id in input_ids {
                            stmt.execute(params![
                                commit.session_id,
                                input_id,
                                lash_core::TurnInputState::DeferredNextTurn.as_str(),
                                next_turn_ingress
                            ])
                            .map_err(sqlite_error)?;
                        }
                    }
                    // Stamp attachments as committed. COALESCE keeps any timestamp an
                    // earlier commit already wrote.
                    if !commit.committed_attachment_ids.is_empty() {
                        let now = current_epoch_ms() as i64;
                        let mut stmt = tx
                            .prepare(
                                "UPDATE attachment_manifest
                                 SET committed_at_ms = COALESCE(committed_at_ms, ?1)
                                 WHERE attachment_id = ?2 AND session_id = ?3",
                            )
                            .map_err(sqlite_error)?;
                        for id in &commit.committed_attachment_ids {
                            stmt.execute(params![now, id.as_str(), commit.session_id])
                                .map_err(sqlite_error)?;
                        }
                    }
                    let result = RuntimeCommitResult {
                        head_revision: next_revision,
                        checkpoint_ref: stored_checkpoint.checkpoint_ref,
                        manifest: stored_checkpoint.manifest,
                    };
                    // Record the turn commit so a retry carrying the same hash takes
                    // the replay path at the top of this closure.
                    if let Some(completed) = &commit.turn_commit {
                        tx.execute(
                            "INSERT INTO runtime_turn_commits (
                                session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
                             )
                             VALUES (?1, ?2, ?3, ?4, ?5)",
                            params![
                                completed.session_id,
                                completed.turn_id,
                                completed.turn_commit_hash,
                                encode_json(&result),
                                current_epoch_ms() as i64
                            ],
                        )
                        .map_err(sqlite_error)?;
                    }
                    if let Some(completion) = commit.release_session_execution_lease.as_ref() {
                        release_session_execution_lease_conn(tx, completion)?;
                    }
                    Ok(result)
                })();
                // Roll back on any `StoreError`. Most validation errors fire before
                // this closure issues its own writes. Later failures can still occur
                // after the checkpoint, usage deltas, or head were written: a rejected
                // INSERT, an undecodable pending-input ingress, or a backend write
                // error. Rolling back keeps that partial work from being committed,
                // while the typed error is still carried to the caller.
                match outcome {
                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
                }
            })
            .await
            .map_err(sqlite_error)??;
        // Only reached when the transaction (or a replay) returned `Ok`.
        self.maybe_auto_gc().await;
        Ok(result)
    }
461
462    async fn try_claim_session_execution_lease(
463        &self,
464        session_id: &str,
465        owner: &LeaseOwnerIdentity,
466        lease_ttl_ms: u64,
467    ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError> {
468        let session_id = session_id.to_string();
469        let owner = owner.clone();
470        self.conn
471            .write_flow(move |tx| {
472                let outcome: Result<SessionExecutionLeaseClaimOutcome, StoreError> = (|| {
473                    let now = current_epoch_ms();
474                    let current = load_session_execution_lease_row_conn(tx, &session_id)?;
475                    if current.as_ref().is_some_and(|lease| {
476                        lease.lease_token.is_some() && lease.expires_at_ms > now
477                    }) {
478                        let current = current.expect("checked current lease is present");
479                        if current
480                            .owner
481                            .as_ref()
482                            .is_some_and(|current_owner| current_owner.same_incarnation(&owner))
483                        {
484                            let expires_at = now.saturating_add(lease_ttl_ms);
485                            tx.execute(
486                                "UPDATE session_execution_leases
487                                 SET lease_expires_at_ms = ?2
488                                 WHERE session_id = ?1",
489                                params![session_id, expires_at as i64],
490                            )
491                            .map_err(sqlite_error)?;
492                            return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
493                                SessionExecutionLease {
494                                    session_id,
495                                    owner,
496                                    lease_token: current.lease_token.expect("live lease token set"),
497                                    fencing_token: current.fencing_token,
498                                    claimed_at_epoch_ms: current.claimed_at_ms,
499                                    expires_at_epoch_ms: expires_at,
500                                },
501                            ));
502                        }
503                        return Ok(SessionExecutionLeaseClaimOutcome::Busy {
504                            holder: row_to_session_execution_lease(&session_id, current)?,
505                        });
506                    }
507                    Ok(SessionExecutionLeaseClaimOutcome::Acquired(
508                        acquire_session_execution_lease_conn(
509                            tx,
510                            &session_id,
511                            &owner,
512                            current.as_ref().map_or(0, |lease| lease.fencing_token),
513                            now,
514                            lease_ttl_ms,
515                        )?,
516                    ))
517                })(
518                );
519                match outcome {
520                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
521                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
522                }
523            })
524            .await
525            .map_err(sqlite_error)?
526    }
527
528    async fn reclaim_session_execution_lease(
529        &self,
530        session_id: &str,
531        owner: &LeaseOwnerIdentity,
532        observed_holder: &SessionExecutionLeaseFence,
533        lease_ttl_ms: u64,
534    ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError> {
535        let session_id = session_id.to_string();
536        let owner = owner.clone();
537        let observed_holder = observed_holder.clone();
538        self.conn
539            .write_flow(move |tx| {
540                let outcome: Result<SessionExecutionLeaseClaimOutcome, StoreError> = (|| {
541                    let now = current_epoch_ms();
542                    let current = load_session_execution_lease_row_conn(tx, &session_id)?;
543                    let Some(current) = current else {
544                        return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
545                            acquire_session_execution_lease_conn(
546                                tx,
547                                &session_id,
548                                &owner,
549                                0,
550                                now,
551                                lease_ttl_ms,
552                            )?,
553                        ));
554                    };
555                    if current.lease_token.is_none() || current.expires_at_ms <= now {
556                        return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
557                            acquire_session_execution_lease_conn(
558                                tx,
559                                &session_id,
560                                &owner,
561                                current.fencing_token,
562                                now,
563                                lease_ttl_ms,
564                            )?,
565                        ));
566                    }
567                    let holder = row_to_session_execution_lease(&session_id, current)?;
568                    if observed_holder.session_id == session_id
569                        && holder.owner.same_incarnation(&observed_holder.owner)
570                        && holder.lease_token == observed_holder.lease_token
571                        && holder.fencing_token == observed_holder.fencing_token
572                        && holder.owner.is_definitely_dead_for_claimant(&owner)
573                    {
574                        let fencing_token = holder.fencing_token.saturating_add(1);
575                        let lease_token = format!(
576                            "{}:{}:{}:{now}:{fencing_token}",
577                            session_id, owner.owner_id, owner.incarnation_id
578                        );
579                        let expires_at = now.saturating_add(lease_ttl_ms);
580                        let liveness_json = encode_liveness(&owner.liveness)?;
581                        let changed = tx
582                            .execute(
583                                "UPDATE session_execution_leases
584                                 SET lease_owner_id = ?1,
585                                     lease_owner_incarnation_id = ?2,
586                                     lease_owner_liveness_json = ?3,
587                                     lease_token = ?4,
588                                     lease_fencing_token = ?5,
589                                     lease_claimed_at_ms = ?6,
590                                     lease_expires_at_ms = ?7
591                                 WHERE session_id = ?8
592                                   AND lease_owner_id = ?9
593                                   AND lease_owner_incarnation_id = ?10
594                                   AND lease_token = ?11
595                                   AND lease_fencing_token = ?12",
596                                params![
597                                    owner.owner_id,
598                                    owner.incarnation_id,
599                                    liveness_json,
600                                    lease_token,
601                                    fencing_token as i64,
602                                    now as i64,
603                                    expires_at as i64,
604                                    session_id,
605                                    observed_holder.owner.owner_id,
606                                    observed_holder.owner.incarnation_id,
607                                    observed_holder.lease_token,
608                                    observed_holder.fencing_token as i64,
609                                ],
610                            )
611                            .map_err(sqlite_error)?;
612                        if changed == 1 {
613                            return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
614                                SessionExecutionLease {
615                                    session_id,
616                                    owner,
617                                    lease_token,
618                                    fencing_token,
619                                    claimed_at_epoch_ms: now,
620                                    expires_at_epoch_ms: expires_at,
621                                },
622                            ));
623                        }
624                        let current = load_session_execution_lease_row_conn(tx, &session_id)?;
625                        if current.as_ref().is_some_and(|lease| {
626                            lease.lease_token.is_some() && lease.expires_at_ms > now
627                        }) {
628                            let current = current.expect("checked current lease is present");
629                            return Ok(SessionExecutionLeaseClaimOutcome::Busy {
630                                holder: row_to_session_execution_lease(&session_id, current)?,
631                            });
632                        }
633                        let previous_fencing_token =
634                            current.as_ref().map_or(0, |lease| lease.fencing_token);
635                        return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
636                            acquire_session_execution_lease_conn(
637                                tx,
638                                &session_id,
639                                &owner,
640                                previous_fencing_token,
641                                now,
642                                lease_ttl_ms,
643                            )?,
644                        ));
645                    }
646                    Ok(SessionExecutionLeaseClaimOutcome::Busy { holder })
647                })(
648                );
649                match outcome {
650                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
651                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
652                }
653            })
654            .await
655            .map_err(sqlite_error)?
656    }
657
658    async fn renew_session_execution_lease(
659        &self,
660        fence: &SessionExecutionLeaseFence,
661        lease_ttl_ms: u64,
662    ) -> Result<SessionExecutionLease, StoreError> {
663        let fence = fence.clone();
664        self.conn
665            .write_flow(move |tx| {
666                let outcome: Result<SessionExecutionLease, StoreError> = (|| {
667                    let now = current_epoch_ms();
668                    let current = load_session_execution_lease_row_conn(tx, &fence.session_id)?;
669                    let Some(current) = current else {
670                        return Err(StoreError::SessionExecutionLeaseExpired {
671                            session_id: fence.session_id.clone(),
672                        });
673                    };
674                    if !current
675                        .owner
676                        .as_ref()
677                        .is_some_and(|owner| owner.same_incarnation(&fence.owner))
678                        || current.lease_token.as_deref() != Some(fence.lease_token.as_str())
679                        || current.fencing_token != fence.fencing_token
680                        || current.expires_at_ms <= now
681                    {
682                        return Err(StoreError::SessionExecutionLeaseExpired {
683                            session_id: fence.session_id.clone(),
684                        });
685                    }
686                    let expires_at = now.saturating_add(lease_ttl_ms);
687                    tx.execute(
688                        "UPDATE session_execution_leases
689                         SET lease_expires_at_ms = ?5
690                         WHERE session_id = ?1
691                           AND lease_owner_id = ?2
692                           AND lease_owner_incarnation_id = ?3
693                           AND lease_token = ?4
694                           AND lease_fencing_token = ?6",
695                        params![
696                            fence.session_id,
697                            fence.owner.owner_id,
698                            fence.owner.incarnation_id,
699                            fence.lease_token,
700                            expires_at as i64,
701                            fence.fencing_token as i64
702                        ],
703                    )
704                    .map_err(sqlite_error)?;
705                    Ok(SessionExecutionLease {
706                        session_id: fence.session_id,
707                        owner: fence.owner,
708                        lease_token: fence.lease_token,
709                        fencing_token: fence.fencing_token,
710                        claimed_at_epoch_ms: current.claimed_at_ms,
711                        expires_at_epoch_ms: expires_at,
712                    })
713                })();
714                match outcome {
715                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
716                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
717                }
718            })
719            .await
720            .map_err(sqlite_error)?
721    }
722
723    async fn release_session_execution_lease(
724        &self,
725        completion: &SessionExecutionLeaseCompletion,
726    ) -> Result<(), StoreError> {
727        let completion = completion.clone();
728        self.conn
729            .write_flow(move |tx| {
730                let outcome = release_session_execution_lease_conn(tx, &completion);
731                match outcome {
732                    Ok(()) => Ok(TxOutcome::Commit(Ok(()))),
733                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
734                }
735            })
736            .await
737            .map_err(sqlite_error)?
738    }
739
740    async fn enqueue_queued_work(
741        &self,
742        batch: QueuedWorkBatchDraft,
743    ) -> Result<QueuedWorkBatch, StoreError> {
744        let nonce = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed);
745        self.conn
746            .write_flow(move |tx| {
747                let outcome: Result<QueuedWorkBatch, StoreError> = (|| {
748                    if let Some(source_key) = batch.source_key.as_deref() {
749                        let existing_id: Option<String> = tx
750                            .query_row(
751                                "SELECT batch_id
752                                 FROM queued_work_batches
753                                 WHERE session_id = ?1 AND source_key = ?2",
754                                params![batch.session_id, source_key],
755                                |row| row.get(0),
756                            )
757                            .optional()
758                            .map_err(sqlite_error)?;
759                        if let Some(batch_id) = existing_id {
760                            let existing = load_queued_batch_by_id_conn(tx, &batch_id)?
761                                .ok_or_else(|| {
762                                    StoreError::Backend(
763                                        "queued work source row disappeared".to_string(),
764                                    )
765                                })?;
766                            return Ok(existing);
767                        }
768                    }
769                    let now = current_epoch_ms();
770                    let batch_id =
771                        derive_batch_id(&batch.session_id, batch.source_key.as_deref(), now, Some(nonce));
772                    tx.execute(
773                        "INSERT INTO queued_work_batches (
774                            batch_id, session_id, source_key, delivery_policy, slot_policy,
775                            merge_key_json, available_at_ms, enqueued_at_ms
776                         )
777                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
778                        params![
779                            batch_id,
780                            batch.session_id,
781                            batch.source_key.as_deref(),
782                            batch.delivery_policy.as_str(),
783                            batch.slot_policy.as_str(),
784                            encode_json(&batch.merge_key),
785                            batch.available_at_ms as i64,
786                            now as i64,
787                        ],
788                    )
789                    .map_err(sqlite_error)?;
790                    for (index, payload) in batch.payloads.iter().enumerate() {
791                        let item_id = format!("{batch_id}:item:{index}");
792                        tx.execute(
793                            "INSERT INTO queued_work_items (batch_id, item_index, item_id, payload_json)
794                             VALUES (?1, ?2, ?3, ?4)",
795                            params![batch_id, index as i64, item_id, encode_json(payload)],
796                        )
797                        .map_err(sqlite_error)?;
798                    }
799                    load_queued_batch_by_id_conn(tx, &batch_id)?.ok_or_else(|| {
800                        StoreError::Backend("queued work insert disappeared".to_string())
801                    })
802                })();
803                // Roll back the partially-inserted batch/items on a
804                // `StoreError` while still returning the typed error.
805                match outcome {
806                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
807                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
808                }
809            })
810            .await
811            .map_err(sqlite_error)?
812    }
813
    /// Claims the leading ready session command(s) of `session_id` for `owner`.
    ///
    /// Everything runs in one `write_flow` transaction:
    ///
    /// 1. `session_execution_lease` is checked with
    ///    `ensure_session_execution_lease_conn`. Any error it reports aborts
    ///    the claim.
    /// 2. Up to `claim_scan_limit(1)` batches of the session whose
    ///    `available_at_ms` is not in the future are read in `enqueue_seq`
    ///    order. Only rows that are unclaimed, whose claim has expired, or
    ///    whose holder is definitely dead from `owner`'s point of view are kept.
    /// 3. `select_leading_session_command` returns how many of those (a prefix)
    ///    to claim. Zero yields `Ok(None)`.
    /// 4. Each selected row is stamped with the same claim id, lease token and
    ///    expiry, all derived from the first candidate. Each row's
    ///    `claim_fencing_token` is incremented.
    ///
    /// If any selected row fails its guarded `UPDATE`, the transaction is
    /// rolled back and `Ok(None)` is returned, so a partially-owned claim is
    /// never handed out.
    ///
    /// # Errors
    ///
    /// Any `StoreError` rolls the transaction back and is returned. This
    /// covers SQLite failures, whatever `ensure_session_execution_lease_conn`
    /// reports, and policy/merge-key decode failures. A candidate batch whose
    /// `work_class()` is `None` yields `StoreError::Backend`. While such a
    /// batch stays in the scan window, every claim for the session fails the
    /// same way.
    ///
    /// NOTE(review): rows held by a live claimant are dropped *before*
    /// selection. A held head-of-queue batch is therefore skipped rather than
    /// blocking the batches behind it. Confirm that is intended for
    /// "leading command" semantics.
    ///
    /// NOTE(review): the returned `fencing_token` is derived from
    /// `candidates[0]` only, while each row's stored `claim_fencing_token` is
    /// incremented independently. Confirm against `QueuedWorkClaimLease::derive`
    /// that selected rows cannot carry differing tokens.
    ///
    /// TODO: this body duplicates `claim_ready_queued_work` except for the
    /// scan limit and the prefix selector. Consider a shared helper.
    async fn claim_leading_ready_session_command(
        &self,
        session_id: &str,
        session_execution_lease: &SessionExecutionLeaseFence,
        owner: &LeaseOwnerIdentity,
        lease_ttl_ms: u64,
    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
        // Owned copies so the write closure is `'static + Send`.
        let session_id = session_id.to_string();
        let session_execution_lease = session_execution_lease.clone();
        let owner = owner.clone();
        self.conn
            .write_flow(move |tx| {
                let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
                    // Verify the caller's session execution lease before
                    // touching the queue. An error here aborts and rolls back.
                    ensure_session_execution_lease_conn(
                        tx,
                        &session_id,
                        &session_execution_lease,
                    )?;
                    let now = current_epoch_ms();
                    // Oldest-first scan of this session's batches that are due.
                    let candidate_rows = {
                        let mut stmt = tx
                            .prepare(
                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
                                        claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
                                 FROM queued_work_batches
                                 WHERE session_id = ?1
                                   AND available_at_ms <= ?2
                                 ORDER BY enqueue_seq ASC
                                 LIMIT ?3",
                            )
                            .map_err(sqlite_error)?;
                        let rows = stmt
                            .query_map(
                                params![session_id, now as i64, claim_scan_limit(1)],
                                queued_batch_row_from_sql,
                            )
                            .map_err(sqlite_error)?;
                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
                    };
                    // Claimable = never claimed, claim lapsed, or holder known
                    // dead. The guarded UPDATE below re-checks the same rule in
                    // SQL, so the two must stay in agreement.
                    let candidate_rows = candidate_rows
                        .into_iter()
                        .filter(|row| {
                            row.claim_token.is_none()
                                || row.claim_expires_at_ms <= now
                                || row
                                    .claim_owner
                                    .as_ref()
                                    .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner))
                        })
                        .collect::<Vec<_>>();
                    // Load full batches (with items) for every surviving row.
                    let candidate_batches = candidate_rows
                        .iter()
                        .map(|row| queued_work_batch_from_conn(tx, row.clone()))
                        .collect::<Result<Vec<_>, StoreError>>()?;
                    // Project each row/batch pair into the policy view the
                    // selector works on. Undecodable or mixed-class batches
                    // fail the whole claim here.
                    let candidates = candidate_rows
                        .iter()
                        .zip(candidate_batches.iter())
                        .map(|(row, batch)| {
                            Ok(ClaimCandidate {
                                enqueue_seq: row.enqueue_seq,
                                claim_fencing_token: row.claim_fencing_token,
                                work_class: batch.work_class().ok_or_else(|| {
                                    StoreError::Backend(format!(
                                        "queued-work batch `{}` has mixed or empty payload classes",
                                        batch.batch_id
                                    ))
                                })?,
                                delivery_policy: decode_delivery_policy(
                                    row.delivery_policy.clone(),
                                )?,
                                slot_policy: decode_slot_policy(row.slot_policy.clone())?,
                                merge_key: decode_merge_key(row.merge_key_json.clone())?,
                            })
                        })
                        .collect::<Result<Vec<_>, StoreError>>()?;
                    let selected_len = select_leading_session_command(&candidates);
                    if selected_len == 0 {
                        // Nothing to claim.
                        return Ok(TxOutcome::Commit(None));
                    }
                    // The selector returns a prefix length. Keep rows and
                    // batches aligned by truncating both to it.
                    let mut selected = candidate_rows;
                    selected.truncate(selected_len);
                    let mut selected_batches = candidate_batches;
                    selected_batches.truncate(selected_len);
                    // One lease (claim id, token, fencing token, expiry)
                    // shared by every selected row, derived from the first.
                    let lease = QueuedWorkClaimLease::derive(
                        &candidates[0],
                        &session_id,
                        &owner,
                        now,
                        lease_ttl_ms,
                    );
                    let liveness_json = encode_liveness(&owner.liveness)?;
                    for row in &selected {
                        // Guarded claim: matches only while the row is still
                        // unclaimed, lapsed, or held by exactly the holder
                        // observed in the scan (?10-?12 are that row's values).
                        let claimed = tx
                            .execute(
                                "UPDATE queued_work_batches
                                 SET claim_id = ?3,
                                     claim_owner_id = ?4,
                                     claim_owner_incarnation_id = ?5,
                                     claim_owner_liveness_json = ?6,
                                     claim_token = ?7,
                                     claim_fencing_token = claim_fencing_token + 1,
                                     claim_claimed_at_ms = ?8,
                                     claim_expires_at_ms = ?9
                                 WHERE session_id = ?1
                                   AND batch_id = ?2
                                   AND (
                                        claim_token IS NULL
                                        OR claim_expires_at_ms <= ?8
                                        OR (
                                            claim_token = ?10
                                            AND claim_owner_id = ?11
                                            AND claim_owner_incarnation_id = ?12
                                        )
                                   )",
                                params![
                                    session_id,
                                    row.batch_id,
                                    lease.claim_id,
                                    owner.owner_id.as_str(),
                                    owner.incarnation_id.as_str(),
                                    liveness_json.as_str(),
                                    lease.lease_token,
                                    now as i64,
                                    lease.expires_at_epoch_ms as i64,
                                    row.claim_token,
                                    row.claim_owner.as_ref().map(|owner| owner.owner_id.as_str()),
                                    row.claim_owner
                                        .as_ref()
                                        .map(|owner| owner.incarnation_id.as_str())
                                ],
                            )
                            .map_err(sqlite_error)?;
                        if claimed == 0 {
                            // Row no longer claimable: undo any sibling rows
                            // already stamped and report "no claim" rather
                            // than a partially-owned one.
                            return Ok(TxOutcome::Rollback(None));
                        }
                    }
                    Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
                        session_id: session_id.clone(),
                        claim_id: lease.claim_id,
                        owner: owner.clone(),
                        lease_token: lease.lease_token,
                        fencing_token: lease.fencing_token,
                        claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
                        expires_at_epoch_ms: lease.expires_at_epoch_ms,
                        batches: selected_batches,
                    })))
                })();
                // Lift the inner commit/rollback decision into `write_flow`'s
                // shape. A `StoreError` always rolls back but is still
                // returned to the caller.
                match outcome {
                    Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
                    Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
                }
            })
            .await
            .map_err(sqlite_error)?
    }
972
    /// Claims a prefix of the ready queued-work batches of `session_id` for
    /// `owner`, at most `max_batches` long and shaped by `boundary`.
    ///
    /// `max_batches == 0` returns `Ok(None)` without touching the database.
    /// Otherwise everything runs in one `write_flow` transaction:
    ///
    /// 1. `session_execution_lease` is checked with
    ///    `ensure_session_execution_lease_conn`. Any error it reports aborts
    ///    the claim.
    /// 2. Up to `claim_scan_limit(max_batches)` batches of the session whose
    ///    `available_at_ms` is not in the future are read in `enqueue_seq`
    ///    order. Only rows that are unclaimed, whose claim has expired, or
    ///    whose holder is definitely dead from `owner`'s point of view are kept.
    /// 3. `select_turn_work_claim_prefix` returns how many of those (a prefix)
    ///    to claim. Zero yields `Ok(None)`.
    /// 4. Each selected row is stamped with the same claim id, lease token and
    ///    expiry, all derived from the first candidate. Each row's
    ///    `claim_fencing_token` is incremented.
    ///
    /// If any selected row fails its guarded `UPDATE`, the transaction is
    /// rolled back and `Ok(None)` is returned.
    ///
    /// # Errors
    ///
    /// Any `StoreError` rolls the transaction back and is returned. This
    /// covers SQLite failures, whatever `ensure_session_execution_lease_conn`
    /// reports, and policy/merge-key decode failures. A candidate batch whose
    /// `work_class()` is `None` yields `StoreError::Backend`.
    ///
    /// NOTE(review): the returned `fencing_token` is derived from
    /// `candidates[0]` only, while each row's stored `claim_fencing_token` is
    /// incremented independently. Confirm against `QueuedWorkClaimLease::derive`
    /// that selected rows cannot carry differing tokens.
    async fn claim_ready_queued_work(
        &self,
        session_id: &str,
        session_execution_lease: &SessionExecutionLeaseFence,
        owner: &LeaseOwnerIdentity,
        boundary: QueuedWorkClaimBoundary,
        lease_ttl_ms: u64,
        max_batches: usize,
    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
        // A zero-sized claim is trivially empty; skip the transaction.
        if max_batches == 0 {
            return Ok(None);
        }
        // Owned copies so the write closure is `'static + Send`.
        let session_id = session_id.to_string();
        let session_execution_lease = session_execution_lease.clone();
        let owner = owner.clone();
        self.conn
            .write_flow(move |tx| {
                let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
                    // Verify the caller's session execution lease before
                    // touching the queue. An error here aborts and rolls back.
                    ensure_session_execution_lease_conn(
                        tx,
                        &session_id,
                        &session_execution_lease,
                    )?;
                    let now = current_epoch_ms();
                    // Oldest-first scan of this session's batches that are due.
                    let candidate_rows = {
                        let mut stmt = tx
                            .prepare(
                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
                                        claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
                                 FROM queued_work_batches
                                 WHERE session_id = ?1
                                   AND available_at_ms <= ?2
                                 ORDER BY enqueue_seq ASC
                                 LIMIT ?3",
                            )
                            .map_err(sqlite_error)?;
                        let rows = stmt
                            .query_map(
                                params![session_id, now as i64, claim_scan_limit(max_batches)],
                                queued_batch_row_from_sql,
                            )
                            .map_err(sqlite_error)?;
                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
                    };
                    // Claimable = never claimed, claim lapsed, or holder known
                    // dead. The guarded UPDATE below re-checks the same rule in
                    // SQL, so the two must stay in agreement.
                    let candidate_rows = candidate_rows
                        .into_iter()
                        .filter(|row| {
                            row.claim_token.is_none()
                                || row.claim_expires_at_ms <= now
                                || row
                                    .claim_owner
                                    .as_ref()
                                    .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner))
                        })
                        .collect::<Vec<_>>();
                    // Load full batches (with items) for every surviving row.
                    let candidate_batches = candidate_rows
                        .iter()
                        .map(|row| queued_work_batch_from_conn(tx, row.clone()))
                        .collect::<Result<Vec<_>, StoreError>>()?;
                    // Project each row/batch pair into the policy view the
                    // selector works on. Undecodable or mixed-class batches
                    // fail the whole claim here.
                    let candidates = candidate_rows
                        .iter()
                        .zip(candidate_batches.iter())
                        .map(|(row, batch)| {
                            Ok(ClaimCandidate {
                                enqueue_seq: row.enqueue_seq,
                                claim_fencing_token: row.claim_fencing_token,
                                work_class: batch.work_class().ok_or_else(|| {
                                    StoreError::Backend(format!(
                                        "queued-work batch `{}` has mixed or empty payload classes",
                                        batch.batch_id
                                    ))
                                })?,
                                delivery_policy: decode_delivery_policy(
                                    row.delivery_policy.clone(),
                                )?,
                                slot_policy: decode_slot_policy(row.slot_policy.clone())?,
                                merge_key: decode_merge_key(row.merge_key_json.clone())?,
                            })
                        })
                        .collect::<Result<Vec<_>, StoreError>>()?;
                    let selected_len =
                        select_turn_work_claim_prefix(&candidates, boundary, max_batches);
                    if selected_len == 0 {
                        // Nothing to claim.
                        return Ok(TxOutcome::Commit(None));
                    }
                    // The selector returns a prefix length. Keep rows and
                    // batches aligned by truncating both to it.
                    let mut selected = candidate_rows;
                    selected.truncate(selected_len);
                    let mut selected_batches = candidate_batches;
                    selected_batches.truncate(selected_len);
                    // One lease (claim id, token, fencing token, expiry)
                    // shared by every selected row, derived from the first.
                    let lease = QueuedWorkClaimLease::derive(
                        &candidates[0],
                        &session_id,
                        &owner,
                        now,
                        lease_ttl_ms,
                    );
                    let liveness_json = encode_liveness(&owner.liveness)?;
                    for row in &selected {
                        // Guarded claim: the `WHERE` clause re-checks in SQL
                        // the claimability rule the Rust filter applied above.
                        // A row qualifies if it is unclaimed, lapsed, or still
                        // held by exactly the holder observed in the scan
                        // (?10-?12 are that row's values). The candidates were
                        // read under this same `BEGIN IMMEDIATE` write lock, so
                        // a 0-row update should only happen if the two checks
                        // disagree. Either way, treat the whole claim as
                        // not-won rather than return a claim that doesn't
                        // actually own the row.
                        let claimed = tx
                            .execute(
                                "UPDATE queued_work_batches
                                 SET claim_id = ?3,
                                     claim_owner_id = ?4,
                                     claim_owner_incarnation_id = ?5,
                                     claim_owner_liveness_json = ?6,
                                     claim_token = ?7,
                                     claim_fencing_token = claim_fencing_token + 1,
                                     claim_claimed_at_ms = ?8,
                                     claim_expires_at_ms = ?9
                                 WHERE session_id = ?1
                                   AND batch_id = ?2
                                   AND (
                                        claim_token IS NULL
                                        OR claim_expires_at_ms <= ?8
                                        OR (
                                            claim_token = ?10
                                            AND claim_owner_id = ?11
                                            AND claim_owner_incarnation_id = ?12
                                        )
                                   )",
                                params![
                                    session_id,
                                    row.batch_id,
                                    lease.claim_id,
                                    owner.owner_id.as_str(),
                                    owner.incarnation_id.as_str(),
                                    liveness_json.as_str(),
                                    lease.lease_token,
                                    now as i64,
                                    lease.expires_at_epoch_ms as i64,
                                    row.claim_token,
                                    row.claim_owner.as_ref().map(|owner| owner.owner_id.as_str()),
                                    row.claim_owner
                                        .as_ref()
                                        .map(|owner| owner.incarnation_id.as_str())
                                ],
                            )
                            .map_err(sqlite_error)?;
                        if claimed == 0 {
                            // Lost this batch. Roll back any sibling rows we
                            // already claimed in this transaction so we never
                            // return a half-owned claim.
                            return Ok(TxOutcome::Rollback(None));
                        }
                    }
                    Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
                        session_id: session_id.clone(),
                        claim_id: lease.claim_id,
                        owner: owner.clone(),
                        lease_token: lease.lease_token,
                        fencing_token: lease.fencing_token,
                        claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
                        expires_at_epoch_ms: lease.expires_at_epoch_ms,
                        batches: selected_batches,
                    })))
                })();
                // Lift the inner commit/rollback decision into `write_flow`'s
                // shape. This lets the closure body keep using `?`: a
                // `StoreError` always rolls back but is still propagated to
                // the caller as the inner `Err`.
                match outcome {
                    Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
                    Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
                }
            })
            .await
            .map_err(sqlite_error)?
    }
1151
1152    async fn renew_queued_work_claim(
1153        &self,
1154        claim: &QueuedWorkClaim,
1155        lease_ttl_ms: u64,
1156    ) -> Result<QueuedWorkClaim, StoreError> {
1157        let now = current_epoch_ms();
1158        let expires_at = now.saturating_add(lease_ttl_ms);
1159        let session_id = claim.session_id.clone();
1160        let claim_id = claim.claim_id.clone();
1161        let lease_token = claim.lease_token.clone();
1162        let changed = self
1163            .conn
1164            .write(move |tx| {
1165                tx.execute(
1166                    "UPDATE queued_work_batches
1167                     SET claim_expires_at_ms = ?4
1168                     WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1169                    params![session_id, claim_id, lease_token, expires_at as i64],
1170                )
1171            })
1172            .await
1173            .map_err(sqlite_error)?;
1174        renewed_claim(claim, changed, expires_at)
1175    }
1176
1177    async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1178        let session_id = claim.session_id.clone();
1179        let claim_id = claim.claim_id.clone();
1180        let lease_token = claim.lease_token.clone();
1181        self.conn
1182            .write(move |tx| {
1183                tx.execute(
1184                    "UPDATE queued_work_batches
1185                     SET claim_id = NULL,
1186                         claim_owner_id = NULL,
1187                         claim_owner_incarnation_id = NULL,
1188                         claim_owner_liveness_json = NULL,
1189                         claim_token = NULL,
1190                         claim_claimed_at_ms = 0,
1191                         claim_expires_at_ms = 0
1192                     WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1193                    params![session_id, claim_id, lease_token],
1194                )
1195            })
1196            .await
1197            .map_err(sqlite_error)?;
1198        Ok(())
1199    }
1200
1201    async fn cancel_queued_work_batch(
1202        &self,
1203        session_id: &str,
1204        batch_id: &str,
1205    ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1206        let session_id = session_id.to_string();
1207        let batch_id = batch_id.to_string();
1208        self.conn
1209            .write_flow(move |tx| {
1210                let outcome: Result<Option<QueuedWorkBatch>, StoreError> = (|| {
1211                    let now = current_epoch_ms() as i64;
1212                    let row = tx
1213                        .query_row(
1214                            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1215                                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1216                                    claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1217                                    claim_owner_liveness_json, claim_token, claim_expires_at_ms
1218                             FROM queued_work_batches
1219                             WHERE session_id = ?1
1220                               AND batch_id = ?2
1221                               AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
1222                            params![session_id, batch_id, now],
1223                            queued_batch_row_from_sql,
1224                        )
1225                        .optional()
1226                        .map_err(sqlite_error)?;
1227                    let Some(row) = row else {
1228                        return Ok(None);
1229                    };
1230                    let batch = queued_work_batch_from_conn(tx, row)?;
1231                    tx.execute(
1232                        "DELETE FROM queued_work_batches
1233                         WHERE session_id = ?1
1234                           AND batch_id = ?2
1235                           AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
1236                        params![session_id, batch_id, now],
1237                    )
1238                    .map_err(sqlite_error)?;
1239                    Ok(Some(batch))
1240                })();
1241                match outcome {
1242                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1243                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1244                }
1245            })
1246            .await
1247            .map_err(sqlite_error)?
1248    }
1249
1250    async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1251        let session_id = session_id.to_string();
1252        self.conn
1253            .call(move |conn| {
1254                let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
1255                    let rows = {
1256                        let mut stmt = conn
1257                            .prepare(
1258                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1259                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1260                                        claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1261                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
1262                                 FROM queued_work_batches
1263                                 WHERE session_id = ?1
1264                                 ORDER BY enqueue_seq ASC",
1265                            )
1266                            .map_err(sqlite_error)?;
1267                        let rows = stmt
1268                            .query_map(params![session_id], queued_batch_row_from_sql)
1269                            .map_err(sqlite_error)?;
1270                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1271                    };
1272                    rows.into_iter()
1273                        .map(|row| queued_work_batch_from_conn(conn, row))
1274                        .collect()
1275                })();
1276                Ok(outcome)
1277            })
1278            .await
1279            .map_err(sqlite_error)?
1280    }
1281
1282    async fn list_pending_queued_work(
1283        &self,
1284        session_id: &str,
1285    ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1286        let session_id = session_id.to_string();
1287        self.conn
1288            .call(move |conn| {
1289                let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
1290                    let now = current_epoch_ms();
1291                    let rows = {
1292                        let mut stmt = conn
1293                            .prepare(
1294                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1295                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1296                                        claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1297                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
1298                                 FROM queued_work_batches
1299                                 WHERE session_id = ?1
1300                                   AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
1301                                 ORDER BY enqueue_seq ASC",
1302                            )
1303                            .map_err(sqlite_error)?;
1304                        let rows = stmt
1305                            .query_map(
1306                                params![session_id, now as i64],
1307                                queued_batch_row_from_sql,
1308                            )
1309                            .map_err(sqlite_error)?;
1310                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1311                    };
1312                    rows.into_iter()
1313                        .map(|row| queued_work_batch_from_conn(conn, row))
1314                        .collect()
1315                })();
1316                Ok(outcome)
1317            })
1318            .await
1319            .map_err(sqlite_error)?
1320    }
1321
1322    async fn enqueue_pending_turn_input(
1323        &self,
1324        draft: lash_core::PendingTurnInputDraft,
1325    ) -> Result<lash_core::PendingTurnInput, StoreError> {
1326        let nonce = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed);
1327        self.conn
1328            .write_flow(move |tx| {
1329                let outcome: Result<lash_core::PendingTurnInput, StoreError> = (|| {
1330                    if let Some(source_key) = draft.source_key.as_deref() {
1331                        let existing_id: Option<String> = tx
1332                            .query_row(
1333                                "SELECT input_id
1334                                 FROM pending_turn_inputs
1335                                 WHERE session_id = ?1 AND source_key = ?2",
1336                                params![draft.session_id, source_key],
1337                                |row| row.get(0),
1338                            )
1339                            .optional()
1340                            .map_err(sqlite_error)?;
1341                        if let Some(input_id) = existing_id {
1342                            let existing = load_pending_turn_input_by_id_conn(
1343                                tx,
1344                                &draft.session_id,
1345                                &input_id,
1346                            )?
1347                            .ok_or_else(|| {
1348                                StoreError::Backend(
1349                                    "pending turn input source row disappeared".to_string(),
1350                                )
1351                            })?;
1352                            if !draft.submitted_content_matches(&existing).map_err(|err| {
1353                                StoreError::Backend(format!(
1354                                    "failed to compare pending turn input submission: {err}"
1355                                ))
1356                            })? {
1357                                return Err(StoreError::PendingTurnInputSourceKeyConflict {
1358                                    session_id: draft.session_id.clone(),
1359                                    source_key: source_key.to_string(),
1360                                    existing_input_id: existing.input_id.clone(),
1361                                });
1362                            }
1363                            return Ok(existing);
1364                        }
1365                    }
1366                    let now = current_epoch_ms();
1367                    let input_id = draft.input_id.clone().unwrap_or_else(|| {
1368                        derive_pending_turn_input_id(
1369                            &draft.session_id,
1370                            draft.source_key.as_deref(),
1371                            now,
1372                            nonce,
1373                        )
1374                    });
1375                    let state = match draft.ingress {
1376                        lash_core::TurnInputIngress::ActiveTurn { .. } => {
1377                            lash_core::TurnInputState::PendingActive
1378                        }
1379                        lash_core::TurnInputIngress::NextTurn => {
1380                            lash_core::TurnInputState::DeferredNextTurn
1381                        }
1382                    };
1383                    tx.execute(
1384                        "INSERT INTO pending_turn_inputs (
1385                            input_id, session_id, source_key, ingress_json, state,
1386                            input_json, enqueued_at_ms
1387                         )
1388                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1389                        params![
1390                            input_id,
1391                            draft.session_id,
1392                            draft.source_key.as_deref(),
1393                            encode_json(&draft.ingress),
1394                            state.as_str(),
1395                            encode_json(&draft.input),
1396                            now as i64,
1397                        ],
1398                    )
1399                    .map_err(sqlite_error)?;
1400                    load_pending_turn_input_by_id_conn(tx, &draft.session_id, &input_id)?
1401                        .ok_or_else(|| {
1402                            StoreError::Backend("pending turn input insert disappeared".to_string())
1403                        })
1404                })();
1405                match outcome {
1406                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1407                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1408                }
1409            })
1410            .await
1411            .map_err(sqlite_error)?
1412    }
1413
1414    async fn list_pending_turn_inputs(
1415        &self,
1416        session_id: &str,
1417    ) -> Result<Vec<lash_core::PendingTurnInput>, StoreError> {
1418        let session_id = session_id.to_string();
1419        self.conn
1420            .call(move |conn| {
1421                let outcome: Result<Vec<lash_core::PendingTurnInput>, StoreError> = (|| {
1422                    let now = current_epoch_ms();
1423                    let rows = {
1424                        let mut stmt = conn
1425                            .prepare(
1426                                "SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
1427                                        state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
1428                                        claim_owner_id, claim_owner_incarnation_id,
1429                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
1430                                 FROM pending_turn_inputs
1431                                 WHERE session_id = ?1
1432                                   AND state IN (?2, ?3)
1433                                   AND (claim_token IS NULL OR claim_expires_at_ms <= ?4)
1434                                 ORDER BY enqueue_seq ASC",
1435                            )
1436                            .map_err(sqlite_error)?;
1437                        let rows = stmt
1438                            .query_map(
1439                                params![
1440                                    session_id,
1441                                    lash_core::TurnInputState::PendingActive.as_str(),
1442                                    lash_core::TurnInputState::DeferredNextTurn.as_str(),
1443                                    now as i64
1444                                ],
1445                                pending_turn_input_row_from_sql,
1446                            )
1447                            .map_err(sqlite_error)?;
1448                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1449                    };
1450                    rows.into_iter().map(pending_turn_input_from_row).collect()
1451                })(
1452                );
1453                Ok(outcome)
1454            })
1455            .await
1456            .map_err(sqlite_error)?
1457    }
1458
1459    async fn cancel_pending_turn_input(
1460        &self,
1461        session_id: &str,
1462        input_id: &str,
1463    ) -> Result<lash_core::PendingTurnInputCancelOutcome, StoreError> {
1464        let target = lash_core::PendingTurnInputCancelTarget::input_id(input_id);
1465        let targets = vec![target];
1466        let mut outcomes = self
1467            .cancel_pending_turn_inputs(session_id, &targets)
1468            .await?;
1469        Ok(outcomes
1470            .pop()
1471            .map(|result| result.outcome)
1472            .unwrap_or(lash_core::PendingTurnInputCancelOutcome::NotFound))
1473    }
1474
1475    async fn cancel_pending_turn_inputs(
1476        &self,
1477        session_id: &str,
1478        targets: &[lash_core::PendingTurnInputCancelTarget],
1479    ) -> Result<Vec<lash_core::PendingTurnInputCancelResult>, StoreError> {
1480        let session_id = session_id.to_string();
1481        let targets = targets.to_vec();
1482        self.conn
1483            .write_flow(move |tx| {
1484                let outcome: Result<Vec<lash_core::PendingTurnInputCancelResult>, StoreError> =
1485                    (|| {
1486                        let now = current_epoch_ms();
1487                        let mut results = Vec::with_capacity(targets.len());
1488                        for target in targets {
1489                            let outcome = match load_pending_turn_input_row_by_target_conn(
1490                                tx,
1491                                &session_id,
1492                                &target,
1493                            )? {
1494                                Some(row) => cancel_pending_turn_input_row_conn(tx, row, now)?,
1495                                None => lash_core::PendingTurnInputCancelOutcome::NotFound,
1496                            };
1497                            results
1498                                .push(lash_core::PendingTurnInputCancelResult { target, outcome });
1499                        }
1500                        Ok(results)
1501                    })();
1502                match outcome {
1503                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1504                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1505                }
1506            })
1507            .await
1508            .map_err(sqlite_error)?
1509    }
1510
1511    async fn cancel_pending_turn_input_suffix(
1512        &self,
1513        session_id: &str,
1514        anchor: &lash_core::PendingTurnInputCancelTarget,
1515    ) -> Result<lash_core::PendingTurnInputSuffixCancelOutcome, StoreError> {
1516        let session_id = session_id.to_string();
1517        let anchor = anchor.clone();
1518        self.conn
1519            .write_flow(move |tx| {
1520                let outcome: Result<lash_core::PendingTurnInputSuffixCancelOutcome, StoreError> =
1521                    (|| {
1522                        let now = current_epoch_ms();
1523                        let Some(anchor_row) =
1524                            load_pending_turn_input_row_by_target_conn(tx, &session_id, &anchor)?
1525                        else {
1526                            return Ok(
1527                                lash_core::PendingTurnInputSuffixCancelOutcome::AnchorNotFound {
1528                                    anchor,
1529                                },
1530                            );
1531                        };
1532                        let rows = {
1533                            let mut stmt = tx
1534                                .prepare(
1535                                    "SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
1536                                            state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
1537                                            claim_owner_id, claim_owner_incarnation_id,
1538                                            claim_owner_liveness_json, claim_token, claim_expires_at_ms
1539                                     FROM pending_turn_inputs
1540                                     WHERE session_id = ?1 AND enqueue_seq >= ?2
1541                                     ORDER BY enqueue_seq ASC",
1542                                )
1543                                .map_err(sqlite_error)?;
1544                            let rows = stmt
1545                                .query_map(
1546                                    params![session_id, anchor_row.enqueue_seq as i64],
1547                                    pending_turn_input_row_from_sql,
1548                                )
1549                                .map_err(sqlite_error)?;
1550                            rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1551                        };
1552                        let mut outcomes = Vec::with_capacity(rows.len());
1553                        for row in rows {
1554                            outcomes.push(cancel_pending_turn_input_row_conn(tx, row, now)?);
1555                        }
1556                        Ok(lash_core::PendingTurnInputSuffixCancelOutcome::Outcomes {
1557                            anchor,
1558                            outcomes,
1559                        })
1560                    })();
1561                match outcome {
1562                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1563                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1564                }
1565            })
1566            .await
1567            .map_err(sqlite_error)?
1568    }
1569
1570    async fn claim_active_turn_inputs(
1571        &self,
1572        session_id: &str,
1573        session_execution_lease: &SessionExecutionLeaseFence,
1574        owner: &LeaseOwnerIdentity,
1575        turn_id: &str,
1576        checkpoint: lash_core::CheckpointKind,
1577        lease_ttl_ms: u64,
1578        max_inputs: usize,
1579    ) -> Result<Option<lash_core::TurnInputClaim>, StoreError> {
1580        claim_pending_turn_inputs_sqlite(
1581            &self.conn,
1582            session_id,
1583            session_execution_lease,
1584            owner,
1585            lease_ttl_ms,
1586            max_inputs,
1587            lash_core::TurnInputClaimMode::ActiveTurn {
1588                turn_id: turn_id.to_string(),
1589                checkpoint,
1590            },
1591        )
1592        .await
1593    }
1594
1595    async fn claim_next_turn_inputs(
1596        &self,
1597        session_id: &str,
1598        session_execution_lease: &SessionExecutionLeaseFence,
1599        owner: &LeaseOwnerIdentity,
1600        lease_ttl_ms: u64,
1601        max_inputs: usize,
1602    ) -> Result<Option<lash_core::TurnInputClaim>, StoreError> {
1603        claim_pending_turn_inputs_sqlite(
1604            &self.conn,
1605            session_id,
1606            session_execution_lease,
1607            owner,
1608            lease_ttl_ms,
1609            max_inputs,
1610            lash_core::TurnInputClaimMode::NextTurn,
1611        )
1612        .await
1613    }
1614
1615    async fn abandon_turn_input_claim(
1616        &self,
1617        claim: &lash_core::TurnInputClaim,
1618    ) -> Result<(), StoreError> {
1619        let session_id = claim.session_id.clone();
1620        let claim_id = claim.claim_id.clone();
1621        let lease_token = claim.lease_token.clone();
1622        let restored_state = match claim.mode {
1623            lash_core::TurnInputClaimMode::ActiveTurn { .. } => {
1624                lash_core::TurnInputState::PendingActive
1625            }
1626            lash_core::TurnInputClaimMode::NextTurn => lash_core::TurnInputState::DeferredNextTurn,
1627        };
1628        self.conn
1629            .write(move |tx| {
1630                tx.execute(
1631                    "UPDATE pending_turn_inputs
1632                     SET state = CASE
1633                             WHEN state = ?4 THEN ?5
1634                             ELSE state
1635                         END,
1636                         claim_id = NULL,
1637                         claim_owner_id = NULL,
1638                         claim_owner_incarnation_id = NULL,
1639                         claim_owner_liveness_json = NULL,
1640                         claim_token = NULL,
1641                         claim_claimed_at_ms = 0,
1642                         claim_expires_at_ms = 0
1643                     WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1644                    params![
1645                        session_id,
1646                        claim_id,
1647                        lease_token,
1648                        lash_core::TurnInputState::Accepted.as_str(),
1649                        restored_state.as_str(),
1650                    ],
1651                )
1652            })
1653            .await
1654            .map_err(sqlite_error)?;
1655        Ok(())
1656    }
1657
    /// Persists `meta` by delegating to the inherent `Store::save_session_meta`.
    ///
    /// The `Store::` path resolves to the inherent method, because inherent items
    /// take precedence over trait items. It therefore does not recurse into this
    /// trait method. Whatever the inherent call returns is discarded, so this
    /// adapter always reports `Ok(())`.
    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
        Store::save_session_meta(self, meta).await;
        Ok(())
    }
1662
1663    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
1664        Ok(Store::load_session_meta(self).await)
1665    }
1666
1667    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1668        if ids.is_empty() {
1669            return Ok(());
1670        }
1671        let ids = ids.to_vec();
1672        self.conn
1673            .write(move |tx| {
1674                let mut stmt =
1675                    tx.prepare("UPDATE graph_nodes SET tombstoned = 1 WHERE node_id = ?1")?;
1676                for id in &ids {
1677                    stmt.execute(params![id])?;
1678                }
1679                Ok(())
1680            })
1681            .await
1682            .map_err(sqlite_error)
1683    }
1684
1685    async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1686        let (removed_node_count, removed_pending_turn_input_tombstone_count) = self
1687            .conn
1688            .write(move |tx| {
1689                let removed_node_count =
1690                    tx.execute("DELETE FROM graph_nodes WHERE tombstoned = 1", [])?;
1691                let removed_pending_turn_input_tombstone_count = tx.execute(
1692                    "DELETE FROM pending_turn_inputs
1693                     WHERE state IN (?1, ?2)",
1694                    params![
1695                        lash_core::TurnInputState::Cancelled.as_str(),
1696                        lash_core::TurnInputState::Completed.as_str()
1697                    ],
1698                )?;
1699                Ok((
1700                    removed_node_count,
1701                    removed_pending_turn_input_tombstone_count,
1702                ))
1703            })
1704            .await
1705            .map_err(sqlite_error)?;
1706        Ok(VacuumReport {
1707            removed_node_count,
1708            removed_pending_turn_input_tombstone_count,
1709        })
1710    }
1711
1712    async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
1713        Ok(Store::gc_unreachable(self).await)
1714    }
1715}
1716
1717fn derive_pending_turn_input_id(
1718    session_id: &str,
1719    source_key: Option<&str>,
1720    now_epoch_ms: u64,
1721    nonce: u64,
1722) -> String {
1723    format!(
1724        "ti:{:x}",
1725        Sha256::digest(format!("{session_id}:{source_key:?}:{now_epoch_ms}:{nonce}").as_bytes())
1726    )
1727}
1728
1729fn cancel_pending_turn_input_row_conn(
1730    conn: &Connection,
1731    row: PendingTurnInputRow,
1732    now_epoch_ms: u64,
1733) -> Result<lash_core::PendingTurnInputCancelOutcome, StoreError> {
1734    let mut input = pending_turn_input_from_row(row.clone())?;
1735    match input.state {
1736        lash_core::TurnInputState::Cancelled => Ok(
1737            lash_core::PendingTurnInputCancelOutcome::AlreadyCancelled(input),
1738        ),
1739        lash_core::TurnInputState::Completed => Ok(
1740            lash_core::PendingTurnInputCancelOutcome::AlreadyCompleted(input),
1741        ),
1742        lash_core::TurnInputState::Accepted => {
1743            Ok(lash_core::PendingTurnInputCancelOutcome::AlreadyClaimed {
1744                claim: pending_turn_input_claim_diagnostics_from_row(&row, input.state),
1745                input,
1746            })
1747        }
1748        lash_core::TurnInputState::PendingActive | lash_core::TurnInputState::DeferredNextTurn => {
1749            let live_claim = row.claim_token.is_some() && row.claim_expires_at_ms > now_epoch_ms;
1750            if live_claim {
1751                return Ok(lash_core::PendingTurnInputCancelOutcome::AlreadyClaimed {
1752                    claim: pending_turn_input_claim_diagnostics_from_row(&row, input.state),
1753                    input,
1754                });
1755            }
1756            conn.execute(
1757                "UPDATE pending_turn_inputs
1758                 SET state = ?3,
1759                     claim_id = NULL,
1760                     claim_owner_id = NULL,
1761                     claim_owner_incarnation_id = NULL,
1762                     claim_owner_liveness_json = NULL,
1763                     claim_token = NULL,
1764                     claim_claimed_at_ms = 0,
1765                     claim_expires_at_ms = 0
1766                 WHERE session_id = ?1 AND input_id = ?2",
1767                params![
1768                    row.session_id,
1769                    row.input_id,
1770                    lash_core::TurnInputState::Cancelled.as_str(),
1771                ],
1772            )
1773            .map_err(sqlite_error)?;
1774            input.state = lash_core::TurnInputState::Cancelled;
1775            Ok(lash_core::PendingTurnInputCancelOutcome::Cancelled(input))
1776        }
1777    }
1778}
1779
1780async fn claim_pending_turn_inputs_sqlite(
1781    conn: &SqliteConnection,
1782    session_id: &str,
1783    session_execution_lease: &SessionExecutionLeaseFence,
1784    owner: &LeaseOwnerIdentity,
1785    lease_ttl_ms: u64,
1786    max_inputs: usize,
1787    mode: lash_core::TurnInputClaimMode,
1788) -> Result<Option<lash_core::TurnInputClaim>, StoreError> {
1789    if max_inputs == 0 {
1790        return Ok(None);
1791    }
1792    let session_id = session_id.to_string();
1793    let session_execution_lease = session_execution_lease.clone();
1794    let owner = owner.clone();
1795    conn.write_flow(move |tx| {
1796        let outcome: Result<TxOutcome<Option<lash_core::TurnInputClaim>>, StoreError> = (|| {
1797            ensure_session_execution_lease_conn(tx, &session_id, &session_execution_lease)?;
1798            let now = current_epoch_ms();
1799            let wanted_state = match &mode {
1800                lash_core::TurnInputClaimMode::ActiveTurn { .. } => {
1801                    lash_core::TurnInputState::PendingActive
1802                }
1803                lash_core::TurnInputClaimMode::NextTurn => {
1804                    lash_core::TurnInputState::DeferredNextTurn
1805                }
1806            };
1807            let candidate_rows = {
1808                let mut stmt = tx
1809                    .prepare(
1810                        "SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
1811                                state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
1812                                claim_owner_id, claim_owner_incarnation_id,
1813                                claim_owner_liveness_json, claim_token, claim_expires_at_ms
1814                         FROM pending_turn_inputs
1815                         WHERE session_id = ?1 AND state = ?2
1816                         ORDER BY enqueue_seq ASC
1817                         LIMIT ?3",
1818                    )
1819                    .map_err(sqlite_error)?;
1820                let rows = stmt
1821                    .query_map(
1822                        params![
1823                            session_id,
1824                            wanted_state.as_str(),
1825                            (max_inputs as i64).saturating_add(32)
1826                        ],
1827                        pending_turn_input_row_from_sql,
1828                    )
1829                    .map_err(sqlite_error)?;
1830                rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1831            };
1832            let mut selected = Vec::new();
1833            for row in candidate_rows {
1834                let claim_available = row.claim_token.is_none()
1835                    || row.claim_expires_at_ms <= now
1836                    || row
1837                        .claim_owner
1838                        .as_ref()
1839                        .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner));
1840                if !claim_available {
1841                    continue;
1842                }
1843                let input = pending_turn_input_from_row(row.clone())?;
1844                let matches_mode = match &mode {
1845                    lash_core::TurnInputClaimMode::ActiveTurn {
1846                        turn_id,
1847                        checkpoint,
1848                    } => {
1849                        input
1850                            .ingress
1851                            .active_turn_id()
1852                            .is_some_and(|active| active == turn_id)
1853                            && input.ingress.admits_checkpoint(*checkpoint)
1854                    }
1855                    lash_core::TurnInputClaimMode::NextTurn => input.state.is_next_turn_pending(),
1856                };
1857                if matches_mode {
1858                    selected.push((row, input));
1859                    if selected.len() >= max_inputs {
1860                        break;
1861                    }
1862                }
1863            }
1864            let Some((head, _)) = selected.first() else {
1865                return Ok(TxOutcome::Commit(None));
1866            };
1867            let lease = TurnInputClaimLease::derive(head, &session_id, &owner, now, lease_ttl_ms);
1868            let liveness_json = encode_liveness(&owner.liveness)?;
1869            let state_after_claim = match &mode {
1870                lash_core::TurnInputClaimMode::ActiveTurn { .. } => {
1871                    lash_core::TurnInputState::Accepted
1872                }
1873                lash_core::TurnInputClaimMode::NextTurn => {
1874                    lash_core::TurnInputState::DeferredNextTurn
1875                }
1876            };
1877            let mut inputs = Vec::new();
1878            for (row, mut input) in selected {
1879                let claimed = tx
1880                    .execute(
1881                        "UPDATE pending_turn_inputs
1882                         SET state = ?3,
1883                             claim_id = ?4,
1884                             claim_owner_id = ?5,
1885                             claim_owner_incarnation_id = ?6,
1886                             claim_owner_liveness_json = ?7,
1887                             claim_token = ?8,
1888                             claim_fencing_token = claim_fencing_token + 1,
1889                             claim_claimed_at_ms = ?9,
1890                             claim_expires_at_ms = ?10
1891                         WHERE session_id = ?1
1892                           AND input_id = ?2
1893                           AND (
1894                                claim_token IS NULL
1895                                OR claim_expires_at_ms <= ?9
1896                                OR (
1897                                    claim_token = ?11
1898                                    AND claim_owner_id = ?12
1899                                    AND claim_owner_incarnation_id = ?13
1900                                )
1901                           )",
1902                        params![
1903                            session_id,
1904                            row.input_id,
1905                            state_after_claim.as_str(),
1906                            lease.claim_id,
1907                            owner.owner_id.as_str(),
1908                            owner.incarnation_id.as_str(),
1909                            liveness_json.as_str(),
1910                            lease.lease_token,
1911                            now as i64,
1912                            lease.expires_at_epoch_ms as i64,
1913                            row.claim_token,
1914                            row.claim_owner
1915                                .as_ref()
1916                                .map(|owner| owner.owner_id.as_str()),
1917                            row.claim_owner
1918                                .as_ref()
1919                                .map(|owner| owner.incarnation_id.as_str())
1920                        ],
1921                    )
1922                    .map_err(sqlite_error)?;
1923                if claimed == 0 {
1924                    return Ok(TxOutcome::Rollback(None));
1925                }
1926                input.state = state_after_claim;
1927                inputs.push(input);
1928            }
1929            Ok(TxOutcome::Commit(Some(lash_core::TurnInputClaim {
1930                session_id: session_id.clone(),
1931                claim_id: lease.claim_id,
1932                owner: owner.clone(),
1933                lease_token: lease.lease_token,
1934                fencing_token: lease.fencing_token,
1935                claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
1936                expires_at_epoch_ms: lease.expires_at_epoch_ms,
1937                mode,
1938                inputs,
1939            })))
1940        })(
1941        );
1942        match outcome {
1943            Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
1944            Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
1945            Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1946        }
1947    })
1948    .await
1949    .map_err(sqlite_error)?
1950}
1951
1952struct SessionExecutionLeaseRow {
1953    owner: Option<LeaseOwnerIdentity>,
1954    lease_token: Option<String>,
1955    fencing_token: u64,
1956    claimed_at_ms: u64,
1957    expires_at_ms: u64,
1958}
1959
1960fn load_session_execution_lease_row_conn(
1961    conn: &Connection,
1962    session_id: &str,
1963) -> Result<Option<SessionExecutionLeaseRow>, StoreError> {
1964    let row = conn
1965        .query_row(
1966            "SELECT lease_owner_id, lease_token, lease_fencing_token,
1967                    lease_claimed_at_ms, lease_expires_at_ms,
1968                    lease_owner_incarnation_id, lease_owner_liveness_json
1969             FROM session_execution_leases
1970             WHERE session_id = ?1",
1971            params![session_id],
1972            |row| {
1973                let owner_id: Option<String> = row.get(0)?;
1974                let incarnation_id: Option<String> = row.get(5)?;
1975                let liveness_json: Option<String> = row.get(6)?;
1976                Ok(SessionExecutionLeaseRow {
1977                    owner: lease_owner_from_columns(owner_id, incarnation_id, liveness_json),
1978                    lease_token: row.get(1)?,
1979                    fencing_token: row.get::<_, i64>(2)? as u64,
1980                    claimed_at_ms: row.get::<_, i64>(3)? as u64,
1981                    expires_at_ms: row.get::<_, i64>(4)? as u64,
1982                })
1983            },
1984        )
1985        .optional()
1986        .map_err(sqlite_error)?;
1987    Ok(row)
1988}
1989
1990fn lease_owner_from_columns(
1991    owner_id: Option<String>,
1992    incarnation_id: Option<String>,
1993    liveness_json: Option<String>,
1994) -> Option<LeaseOwnerIdentity> {
1995    owner_id.map(|owner_id| LeaseOwnerIdentity {
1996        incarnation_id: incarnation_id.unwrap_or_else(|| owner_id.clone()),
1997        owner_id,
1998        liveness: liveness_json
1999            .as_deref()
2000            .and_then(|json| serde_json::from_str(json).ok())
2001            .unwrap_or(LeaseOwnerLiveness::Opaque),
2002    })
2003}
2004
2005fn encode_liveness(liveness: &LeaseOwnerLiveness) -> Result<String, StoreError> {
2006    serde_json::to_string(liveness)
2007        .map_err(|err| StoreError::Backend(format!("failed to encode lease liveness: {err}")))
2008}
2009
2010fn row_to_session_execution_lease(
2011    session_id: &str,
2012    row: SessionExecutionLeaseRow,
2013) -> Result<SessionExecutionLease, StoreError> {
2014    Ok(SessionExecutionLease {
2015        session_id: session_id.to_string(),
2016        owner: row
2017            .owner
2018            .ok_or_else(|| StoreError::Backend("live session lease missing owner".to_string()))?,
2019        lease_token: row.lease_token.ok_or_else(|| {
2020            StoreError::Backend("live session lease missing lease token".to_string())
2021        })?,
2022        fencing_token: row.fencing_token,
2023        claimed_at_epoch_ms: row.claimed_at_ms,
2024        expires_at_epoch_ms: row.expires_at_ms,
2025    })
2026}
2027
2028fn acquire_session_execution_lease_conn(
2029    conn: &Connection,
2030    session_id: &str,
2031    owner: &LeaseOwnerIdentity,
2032    previous_fencing_token: u64,
2033    now: u64,
2034    lease_ttl_ms: u64,
2035) -> Result<SessionExecutionLease, StoreError> {
2036    let fencing_token = previous_fencing_token.saturating_add(1);
2037    let lease_token = format!(
2038        "{}:{}:{}:{now}:{fencing_token}",
2039        session_id, owner.owner_id, owner.incarnation_id
2040    );
2041    let expires_at = now.saturating_add(lease_ttl_ms);
2042    let liveness_json = encode_liveness(&owner.liveness)?;
2043    conn.execute(
2044        "INSERT INTO session_execution_leases (
2045            session_id, lease_owner_id, lease_owner_incarnation_id, lease_owner_liveness_json,
2046            lease_token, lease_fencing_token, lease_claimed_at_ms, lease_expires_at_ms
2047         )
2048         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
2049         ON CONFLICT(session_id) DO UPDATE SET
2050            lease_owner_id = excluded.lease_owner_id,
2051            lease_owner_incarnation_id = excluded.lease_owner_incarnation_id,
2052            lease_owner_liveness_json = excluded.lease_owner_liveness_json,
2053            lease_token = excluded.lease_token,
2054            lease_fencing_token = excluded.lease_fencing_token,
2055            lease_claimed_at_ms = excluded.lease_claimed_at_ms,
2056            lease_expires_at_ms = excluded.lease_expires_at_ms",
2057        params![
2058            session_id,
2059            owner.owner_id,
2060            owner.incarnation_id,
2061            liveness_json,
2062            lease_token,
2063            fencing_token as i64,
2064            now as i64,
2065            expires_at as i64
2066        ],
2067    )
2068    .map_err(sqlite_error)?;
2069    Ok(SessionExecutionLease {
2070        session_id: session_id.to_string(),
2071        owner: owner.clone(),
2072        lease_token,
2073        fencing_token,
2074        claimed_at_epoch_ms: now,
2075        expires_at_epoch_ms: expires_at,
2076    })
2077}
2078
2079fn ensure_session_execution_lease_conn(
2080    conn: &Connection,
2081    session_id: &str,
2082    fence: &SessionExecutionLeaseFence,
2083) -> Result<(), StoreError> {
2084    if fence.session_id != session_id {
2085        return Err(StoreError::SessionExecutionLeaseExpired {
2086            session_id: session_id.to_string(),
2087        });
2088    }
2089    let now = current_epoch_ms();
2090    let current = load_session_execution_lease_row_conn(conn, session_id)?;
2091    let Some(current) = current else {
2092        return Err(StoreError::SessionExecutionLeaseExpired {
2093            session_id: session_id.to_string(),
2094        });
2095    };
2096    if current
2097        .owner
2098        .as_ref()
2099        .is_some_and(|owner| owner.same_incarnation(&fence.owner))
2100        && current.lease_token.as_deref() == Some(fence.lease_token.as_str())
2101        && current.fencing_token == fence.fencing_token
2102        && current.expires_at_ms > now
2103    {
2104        Ok(())
2105    } else {
2106        Err(StoreError::SessionExecutionLeaseExpired {
2107            session_id: session_id.to_string(),
2108        })
2109    }
2110}
2111
2112fn release_session_execution_lease_conn(
2113    conn: &Connection,
2114    completion: &SessionExecutionLeaseCompletion,
2115) -> Result<(), StoreError> {
2116    conn.execute(
2117        "UPDATE session_execution_leases
2118         SET lease_owner_id = NULL,
2119             lease_owner_incarnation_id = NULL,
2120             lease_owner_liveness_json = NULL,
2121             lease_token = NULL,
2122             lease_claimed_at_ms = 0,
2123             lease_expires_at_ms = 0
2124         WHERE session_id = ?1
2125           AND lease_owner_id = ?2
2126           AND lease_owner_incarnation_id = ?3
2127           AND lease_token = ?4
2128           AND lease_fencing_token = ?5",
2129        params![
2130            completion.session_id,
2131            completion.owner.owner_id,
2132            completion.owner.incarnation_id,
2133            completion.lease_token,
2134            completion.fencing_token as i64
2135        ],
2136    )
2137    .map_err(sqlite_error)?;
2138    Ok(())
2139}