Skip to main content

lash_sqlite_store/
persistence.rs

1//! The [`RuntimePersistence`] capability-segment implementations for
2//! [`Store`]: [`SessionCommitStore`], [`SessionExecutionLeaseStore`],
3//! [`QueuedWorkStore`], [`TurnInputStore`], and [`StoreMaintenance`].
4//!
5//! This is the tokio-rusqlite port of the prior store's `persistence.rs`. The
6//! public surface is byte-for-byte the prior store async trait: identical method
7//! names and signatures, so consumers swap backends with a path rename only.
8//!
9//! The translation rules (see `conn.rs`, `lifecycle.rs`, `blobs.rs`):
10//!
11//! * Pure reads run through `self.conn.call(move |conn| { ... })`.
12//! * Read-then-write paths run through `self.conn.write(move |tx| { ... })`
13//!   (`BEGIN IMMEDIATE`, commit on `Ok`, rollback on `Err`) — this is the
14//!   cross-process write-lock guard.
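//! * Paths that must roll back partially-applied writes while still returning
//!   a typed `StoreError` to the caller (the runtime commit, the
//!   session-execution-lease claims, the queued-work claim) run through
//!   `self.conn.write_flow`, deciding commit vs rollback via [`TxOutcome`].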
18//! * The shared `*_conn` helpers (`try_load_session_head_meta_from_conn`,
19//!   `Self::put_checkpoint_conn`, `Self::load_usage_deltas_conn`,
20//!   `Self::load_session_graph_from_conn`, the queued-work helpers, …) are
21//!   synchronous and take a `&rusqlite::Connection`, so they are reused from
22//!   inside these closures (a `&Transaction` derefs to `&Connection`).
23//! * Closures must be `'static` + `Send`: every borrow of `self`/caller data is
24//!   cloned into an owned value before being moved in.
25
26use super::*;
27
28#[async_trait::async_trait]
29impl SessionCommitStore for Store {
    /// Reports this store's durability tier, which is always
    /// [`DurabilityTier::Durable`].
    fn durability_tier(&self) -> DurabilityTier {
        DurabilityTier::Durable
    }
33
34    async fn load_session(
35        &self,
36        scope: SessionReadScope,
37    ) -> Result<Option<PersistedSessionRead>, StoreError> {
38        self.conn
39            .call(move |conn| {
40                let outcome: Result<Option<PersistedSessionRead>, StoreError> = (|| {
41                    let Some(meta) = try_load_session_head_meta_from_conn(conn)? else {
42                        return Ok(None);
43                    };
44                    let leaf_node_id = match &scope {
45                        SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
46                        SessionReadScope::ActivePath { leaf_node_id } => {
47                            leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
48                        }
49                    };
50                    let mut graph = match scope {
51                        SessionReadScope::FullGraph => {
52                            Self::load_session_graph_from_conn(conn, meta.leaf_node_id.clone())
53                        }
54                        SessionReadScope::ActivePath { .. } => {
55                            Self::load_active_path_session_graph_from_conn(
56                                conn,
57                                leaf_node_id.clone(),
58                            )
59                            .map_err(sqlite_error)?
60                        }
61                    };
62                    graph.set_leaf_node_id(leaf_node_id);
63                    let checkpoint = meta
64                        .checkpoint_ref
65                        .as_ref()
66                        .map(|blob_ref| Self::get_checkpoint_conn(conn, blob_ref))
67                        .transpose()?
68                        .flatten();
69                    Ok(Some(PersistedSessionRead {
70                        session_id: meta.session_id,
71                        head_revision: meta.head_revision,
72                        config: meta.config,
73                        agent_frames: meta.agent_frames,
74                        current_agent_frame_id: meta.current_agent_frame_id,
75                        graph,
76                        checkpoint_ref: meta.checkpoint_ref,
77                        checkpoint,
78                        token_ledger: merge_token_ledger_entries(Self::load_usage_deltas_conn(
79                            conn,
80                        )),
81                    }))
82                })(
83                );
84                Ok(outcome)
85            })
86            .await
87            .map_err(sqlite_error)?
88    }
89
90    async fn load_node(
91        &self,
92        node_id: &str,
93    ) -> Result<Option<lash_core::SessionNodeRecord>, StoreError> {
94        let node_id = node_id.to_string();
95        let row: Option<String> = self
96            .conn
97            .call(move |conn| {
98                conn.query_row(
99                    "SELECT node_json FROM graph_nodes WHERE node_id = ?1 AND tombstoned = 0",
100                    params![node_id],
101                    |row| row.get(0),
102                )
103                .optional()
104            })
105            .await
106            .map_err(sqlite_error)?;
107        Ok(row.and_then(|json| serde_json::from_str(&json).ok()))
108    }
109
110    async fn commit_runtime_state(
111        &self,
112        commit: RuntimeCommit,
113    ) -> Result<RuntimeCommitResult, StoreError> {
114        let blob_profile = self.options.blob_profile;
115        let result = self
116            .conn
117            .write_flow(move |tx| {
118                let outcome: Result<RuntimeCommitResult, StoreError> = (|| {
119                    let existing = try_load_session_head_meta_from_conn(tx)?;
120                    if let Some(bound_session_id) =
121                        existing.as_ref().map(|meta| meta.session_id.as_str())
122                        && bound_session_id != commit.session_id
123                    {
124                        return Err(StoreError::SessionBindingMismatch {
125                            bound_session_id: bound_session_id.to_string(),
126                            attempted_session_id: commit.session_id.clone(),
127                        });
128                    }
129                    if let Some(completed) = &commit.turn_commit {
130                        if completed.session_id != commit.session_id {
131                            return Err(StoreError::RuntimeTurnCommitConflict {
132                                session_id: completed.session_id.clone(),
133                                turn_id: completed.turn_id.clone(),
134                            });
135                        }
136                        let prior: Option<(String, String)> = tx
137                            .query_row(
138                                "SELECT turn_commit_hash, result_json FROM runtime_turn_commits
139                                 WHERE session_id = ?1 AND turn_id = ?2",
140                                params![completed.session_id, completed.turn_id],
141                                |row| Ok((row.get(0)?, row.get(1)?)),
142                            )
143                            .optional()
144                            .map_err(sqlite_error)?;
145                        if let Some((turn_commit_hash, result_json)) = prior {
146                            if turn_commit_hash == completed.turn_commit_hash {
147                                let result: RuntimeCommitResult =
148                                    serde_json::from_str(&result_json).map_err(|err| {
149                                        StoreError::Backend(format!(
150                                            "failed to decode runtime turn commit result: {err}"
151                                        ))
152                                    })?;
153                                if let Some(completion) =
154                                    commit.release_session_execution_lease.as_ref()
155                                {
156                                    release_session_execution_lease_conn(tx, completion)?;
157                                }
158                                return Ok(result);
159                            }
160                            return Err(StoreError::RuntimeTurnCommitConflict {
161                                session_id: completed.session_id.clone(),
162                                turn_id: completed.turn_id.clone(),
163                            });
164                        }
165                    }
166                    let Some(session_execution_lease) = commit.session_execution_lease.as_ref()
167                    else {
168                        return Err(StoreError::SessionExecutionLeaseExpired {
169                            session_id: commit.session_id.clone(),
170                        });
171                    };
172                    ensure_session_execution_lease_conn(
173                        tx,
174                        &commit.session_id,
175                        session_execution_lease,
176                    )?;
177                    let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
178                    if commit.expected_head_revision.is_some()
179                        && commit.expected_head_revision != Some(actual_revision)
180                    {
181                        return Err(StoreError::HeadRevisionConflict {
182                            expected: commit.expected_head_revision,
183                            actual: actual_revision,
184                        });
185                    }
186                    for completed in &commit.completed_queue_claims {
187                        if completed.session_id != commit.session_id {
188                            return Err(StoreError::QueuedWorkClaimExpired {
189                                session_id: completed.session_id.clone(),
190                                claim_id: completed.claim_id.clone(),
191                            });
192                        }
193                        ensure_queued_work_completion_conn(tx, completed)?;
194                    }
195                    for completed in &commit.completed_turn_input_claims {
196                        if completed.session_id != commit.session_id {
197                            return Err(StoreError::TurnInputClaimExpired {
198                                session_id: completed.session_id.clone(),
199                                claim_id: completed.claim_id.clone(),
200                            });
201                        }
202                        let owned_rows: usize = tx
203                            .query_row(
204                                "SELECT COUNT(*)
205                                 FROM pending_turn_inputs
206                                 WHERE session_id = ?1
207                                   AND claim_id = ?2
208                                   AND claim_token = ?3",
209                                params![
210                                    completed.session_id,
211                                    completed.claim_id,
212                                    completed.lease_token
213                                ],
214                                |row| row.get::<_, i64>(0),
215                            )
216                            .map_err(sqlite_error)? as usize;
217                        ensure_turn_input_completion_owns_all_inputs(completed, owned_rows)?;
218                    }
219
220                    let stored_checkpoint =
221                        Self::put_checkpoint_conn(tx, &commit.checkpoint, blob_profile)
222                            .map_err(sqlite_error)?;
223
224                    if !commit.usage_deltas.is_empty() {
225                        let mut stmt = tx
226                            .prepare(
227                                "INSERT INTO usage_deltas (
228                                    source, model, input_tokens, output_tokens, cache_read_input_tokens, cache_write_input_tokens, reasoning_output_tokens
229                                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
230                            )
231                            .map_err(sqlite_error)?;
232                        for entry in &commit.usage_deltas {
233                            stmt.execute(params![
234                                entry.source,
235                                entry.model,
236                                entry.usage.input_tokens,
237                                entry.usage.output_tokens,
238                                entry.usage.cache_read_input_tokens,
239                                entry.usage.cache_write_input_tokens,
240                                entry.usage.reasoning_output_tokens,
241                            ])
242                            .map_err(sqlite_error)?;
243                        }
244                    }
245
246                    let leaf_node_id = match &commit.graph {
247                        GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
248                        GraphCommitDelta::Append {
249                            nodes,
250                            leaf_node_id,
251                        } => {
252                            for node in nodes {
253                                let node_json = encode_json(node);
254                                tx.execute(
255                                    "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
256                                    params![node.node_id, node_json],
257                                )
258                                .map_err(sqlite_error)?;
259                            }
260                            leaf_node_id.clone()
261                        }
262                        GraphCommitDelta::ReplaceFull(graph) => {
263                            tx.execute("DELETE FROM graph_nodes", [])
264                                .map_err(sqlite_error)?;
265                            for node in &graph.nodes {
266                                let node_json = encode_json(node);
267                                tx.execute(
268                                    "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
269                                    params![node.node_id, node_json],
270                                )
271                                .map_err(sqlite_error)?;
272                            }
273                            graph.leaf_node_id.clone()
274                        }
275                    };
276                    let graph_node_count: usize = tx
277                        .query_row(
278                            "SELECT COUNT(*) FROM graph_nodes WHERE tombstoned = 0",
279                            [],
280                            |row| row.get::<_, i64>(0),
281                        )
282                        .map_err(sqlite_error)? as usize;
283                    let next_revision = actual_revision + 1;
284                    let meta = SessionHeadMeta {
285                        schema_version: lash_core::store::SESSION_HEAD_META_SCHEMA_VERSION,
286                        session_id: commit.session_id.clone(),
287                        head_revision: next_revision,
288                        config: commit.config.clone(),
289                        agent_frames: commit.agent_frames.clone(),
290                        current_agent_frame_id: commit.current_agent_frame_id.clone(),
291                        checkpoint_ref: Some(stored_checkpoint.checkpoint_ref.clone()),
292                        leaf_node_id,
293                        graph_node_count,
294                        token_ledger: Vec::new(),
295                    };
296                    tx.execute(
297                        "INSERT OR REPLACE INTO session_head (singleton, session_id, head_json, head_revision)
298                         VALUES (1, ?1, ?2, ?3)",
299                        params![
300                            meta.session_id,
301                            encode_json(&meta),
302                            meta.head_revision as i64
303                        ],
304                    )
305                    .map_err(sqlite_error)?;
306                    for completed in &commit.completed_queue_claims {
307                        for batch_id in &completed.batch_ids {
308                            tx.execute(
309                                "DELETE FROM queued_work_batches
310                                 WHERE session_id = ?1
311                                   AND batch_id = ?2
312                                   AND claim_id = ?3
313                                   AND claim_token = ?4",
314                                params![
315                                    completed.session_id,
316                                    batch_id,
317                                    completed.claim_id,
318                                    completed.lease_token
319                                ],
320                            )
321                            .map_err(sqlite_error)?;
322                        }
323                    }
324                    for completed in &commit.completed_turn_input_claims {
325                        for input_id in &completed.input_ids {
326                            tx.execute(
327                                "UPDATE pending_turn_inputs
328                                 SET state = ?5,
329                                     claim_id = NULL,
330                                     claim_owner_id = NULL,
331                                     claim_owner_incarnation_id = NULL,
332                                     claim_owner_liveness_json = NULL,
333                                     claim_token = NULL,
334                                     claim_claimed_at_ms = 0,
335                                     claim_expires_at_ms = 0
336                                 WHERE session_id = ?1
337                                   AND input_id = ?2
338                                   AND claim_id = ?3
339                                   AND claim_token = ?4",
340                                params![
341                                    completed.session_id,
342                                    input_id,
343                                    completed.claim_id,
344                                    completed.lease_token,
345                                    lash_core::TurnInputState::Completed.as_str(),
346                                ],
347                            )
348                            .map_err(sqlite_error)?;
349                        }
350                    }
351                    if let Some(turn_id) = commit.interrupted_turn_input_turn_id.as_deref() {
352                        let input_ids = {
353                            let mut stmt = tx
354                                .prepare(
355                                    "SELECT input_id, ingress_json
356                                     FROM pending_turn_inputs
357                                     WHERE session_id = ?1 AND state = ?2",
358                                )
359                                .map_err(sqlite_error)?;
360                            let rows = stmt
361                                .query_map(
362                                    params![
363                                        commit.session_id,
364                                        lash_core::TurnInputState::PendingActive.as_str()
365                                    ],
366                                    |row| {
367                                        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
368                                    },
369                                )
370                                .map_err(sqlite_error)?;
371                            let mut input_ids = Vec::new();
372                            for row in rows {
373                                let (input_id, ingress_json) = row.map_err(sqlite_error)?;
374                                let ingress = decode_turn_input_ingress(ingress_json)?;
375                                if ingress
376                                    .active_turn_id()
377                                    .is_some_and(|active| active == turn_id)
378                                {
379                                    input_ids.push(input_id);
380                                }
381                            }
382                            input_ids
383                        };
384                        let next_turn_ingress = encode_json(&lash_core::TurnInputIngress::NextTurn);
385                        let mut stmt = tx
386                            .prepare(
387                                "UPDATE pending_turn_inputs
388                                 SET state = ?3,
389                                     ingress_json = ?4,
390                                     claim_id = NULL,
391                                     claim_owner_id = NULL,
392                                     claim_owner_incarnation_id = NULL,
393                                     claim_owner_liveness_json = NULL,
394                                     claim_token = NULL,
395                                     claim_claimed_at_ms = 0,
396                                     claim_expires_at_ms = 0
397                                 WHERE session_id = ?1 AND input_id = ?2",
398                            )
399                            .map_err(sqlite_error)?;
400                        for input_id in input_ids {
401                            stmt.execute(params![
402                                commit.session_id,
403                                input_id,
404                                lash_core::TurnInputState::DeferredNextTurn.as_str(),
405                                next_turn_ingress
406                            ])
407                            .map_err(sqlite_error)?;
408                        }
409                    }
410                    if !commit.committed_attachment_ids.is_empty() {
411                        let now = current_epoch_ms() as i64;
412                        let mut stmt = tx
413                            .prepare(
414                                "UPDATE attachment_manifest
415                                 SET committed_at_ms = COALESCE(committed_at_ms, ?1)
416                                 WHERE attachment_id = ?2 AND session_id = ?3",
417                            )
418                            .map_err(sqlite_error)?;
419                        for id in &commit.committed_attachment_ids {
420                            stmt.execute(params![now, id.as_str(), commit.session_id])
421                                .map_err(sqlite_error)?;
422                        }
423                    }
424                    let result = RuntimeCommitResult {
425                        head_revision: next_revision,
426                        checkpoint_ref: stored_checkpoint.checkpoint_ref,
427                        manifest: stored_checkpoint.manifest,
428                    };
429                    if let Some(completed) = &commit.turn_commit {
430                        tx.execute(
431                            "INSERT INTO runtime_turn_commits (
432                                session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
433                             )
434                             VALUES (?1, ?2, ?3, ?4, ?5)",
435                            params![
436                                completed.session_id,
437                                completed.turn_id,
438                                completed.turn_commit_hash,
439                                encode_json(&result),
440                                current_epoch_ms() as i64
441                            ],
442                        )
443                        .map_err(sqlite_error)?;
444                    }
445                    if let Some(completion) = commit.release_session_execution_lease.as_ref() {
446                        release_session_execution_lease_conn(tx, completion)?;
447                    }
448                    Ok(result)
449                })();
450                // Roll back on a `StoreError` so a failure after the first
451                // write (e.g. a head-revision conflict surfaced mid-commit, or a
452                // backend write error) does not leave the partial transaction
453                // committed, while still carrying the typed error to the caller.
454                match outcome {
455                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
456                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
457                }
458            })
459            .await
460            .map_err(sqlite_error)??;
461        self.maybe_auto_gc().await;
462        Ok(result)
463    }
464
465    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
466        Store::save_session_meta(self, meta).await;
467        Ok(())
468    }
469
470    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
471        Ok(Store::load_session_meta(self).await)
472    }
473}
474
475#[async_trait::async_trait]
476impl SessionExecutionLeaseStore for Store {
477    async fn try_claim_session_execution_lease(
478        &self,
479        session_id: &str,
480        owner: &LeaseOwnerIdentity,
481        lease_ttl_ms: u64,
482    ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError> {
483        let session_id = session_id.to_string();
484        let owner = owner.clone();
485        self.conn
486            .write_flow(move |tx| {
487                let outcome: Result<SessionExecutionLeaseClaimOutcome, StoreError> = (|| {
488                    let now = current_epoch_ms();
489                    let current = load_session_execution_lease_row_conn(tx, &session_id)?;
490                    if current.as_ref().is_some_and(|lease| {
491                        lease.lease_token.is_some() && lease.expires_at_ms > now
492                    }) {
493                        let current = current.expect("checked current lease is present");
494                        if current
495                            .owner
496                            .as_ref()
497                            .is_some_and(|current_owner| current_owner.same_incarnation(&owner))
498                        {
499                            let expires_at = now.saturating_add(lease_ttl_ms);
500                            tx.execute(
501                                "UPDATE session_execution_leases
502                                 SET lease_expires_at_ms = ?2
503                                 WHERE session_id = ?1",
504                                params![session_id, expires_at as i64],
505                            )
506                            .map_err(sqlite_error)?;
507                            return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
508                                SessionExecutionLease {
509                                    session_id,
510                                    owner,
511                                    lease_token: current.lease_token.expect("live lease token set"),
512                                    fencing_token: current.fencing_token,
513                                    claimed_at_epoch_ms: current.claimed_at_ms,
514                                    expires_at_epoch_ms: expires_at,
515                                },
516                            ));
517                        }
518                        return Ok(SessionExecutionLeaseClaimOutcome::Busy {
519                            holder: row_to_session_execution_lease(&session_id, current)?,
520                        });
521                    }
522                    Ok(SessionExecutionLeaseClaimOutcome::Acquired(
523                        acquire_session_execution_lease_conn(
524                            tx,
525                            &session_id,
526                            &owner,
527                            current.as_ref().map_or(0, |lease| lease.fencing_token),
528                            now,
529                            lease_ttl_ms,
530                        )?,
531                    ))
532                })(
533                );
534                match outcome {
535                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
536                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
537                }
538            })
539            .await
540            .map_err(sqlite_error)?
541    }
542
543    async fn reclaim_session_execution_lease(
544        &self,
545        session_id: &str,
546        owner: &LeaseOwnerIdentity,
547        observed_holder: &SessionExecutionLeaseFence,
548        lease_ttl_ms: u64,
549    ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError> {
550        let session_id = session_id.to_string();
551        let owner = owner.clone();
552        let observed_holder = observed_holder.clone();
553        self.conn
554            .write_flow(move |tx| {
555                let outcome: Result<SessionExecutionLeaseClaimOutcome, StoreError> = (|| {
556                    let now = current_epoch_ms();
557                    let current = load_session_execution_lease_row_conn(tx, &session_id)?;
558                    let Some(current) = current else {
559                        return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
560                            acquire_session_execution_lease_conn(
561                                tx,
562                                &session_id,
563                                &owner,
564                                0,
565                                now,
566                                lease_ttl_ms,
567                            )?,
568                        ));
569                    };
570                    if current.lease_token.is_none() || current.expires_at_ms <= now {
571                        return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
572                            acquire_session_execution_lease_conn(
573                                tx,
574                                &session_id,
575                                &owner,
576                                current.fencing_token,
577                                now,
578                                lease_ttl_ms,
579                            )?,
580                        ));
581                    }
582                    let holder = row_to_session_execution_lease(&session_id, current)?;
583                    if observed_holder.session_id == session_id
584                        && holder.owner.same_incarnation(&observed_holder.owner)
585                        && holder.lease_token == observed_holder.lease_token
586                        && holder.fencing_token == observed_holder.fencing_token
587                        && holder.owner.is_definitely_dead_for_claimant(&owner)
588                    {
589                        let fencing_token = holder.fencing_token.saturating_add(1);
590                        let lease_token = format!(
591                            "{}:{}:{}:{now}:{fencing_token}",
592                            session_id, owner.owner_id, owner.incarnation_id
593                        );
594                        let expires_at = now.saturating_add(lease_ttl_ms);
595                        let liveness_json = encode_liveness(&owner.liveness)?;
596                        let changed = tx
597                            .execute(
598                                "UPDATE session_execution_leases
599                                 SET lease_owner_id = ?1,
600                                     lease_owner_incarnation_id = ?2,
601                                     lease_owner_liveness_json = ?3,
602                                     lease_token = ?4,
603                                     lease_fencing_token = ?5,
604                                     lease_claimed_at_ms = ?6,
605                                     lease_expires_at_ms = ?7
606                                 WHERE session_id = ?8
607                                   AND lease_owner_id = ?9
608                                   AND lease_owner_incarnation_id = ?10
609                                   AND lease_token = ?11
610                                   AND lease_fencing_token = ?12",
611                                params![
612                                    owner.owner_id,
613                                    owner.incarnation_id,
614                                    liveness_json,
615                                    lease_token,
616                                    fencing_token as i64,
617                                    now as i64,
618                                    expires_at as i64,
619                                    session_id,
620                                    observed_holder.owner.owner_id,
621                                    observed_holder.owner.incarnation_id,
622                                    observed_holder.lease_token,
623                                    observed_holder.fencing_token as i64,
624                                ],
625                            )
626                            .map_err(sqlite_error)?;
627                        if changed == 1 {
628                            return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
629                                SessionExecutionLease {
630                                    session_id,
631                                    owner,
632                                    lease_token,
633                                    fencing_token,
634                                    claimed_at_epoch_ms: now,
635                                    expires_at_epoch_ms: expires_at,
636                                },
637                            ));
638                        }
639                        let current = load_session_execution_lease_row_conn(tx, &session_id)?;
640                        if current.as_ref().is_some_and(|lease| {
641                            lease.lease_token.is_some() && lease.expires_at_ms > now
642                        }) {
643                            let current = current.expect("checked current lease is present");
644                            return Ok(SessionExecutionLeaseClaimOutcome::Busy {
645                                holder: row_to_session_execution_lease(&session_id, current)?,
646                            });
647                        }
648                        let previous_fencing_token =
649                            current.as_ref().map_or(0, |lease| lease.fencing_token);
650                        return Ok(SessionExecutionLeaseClaimOutcome::Acquired(
651                            acquire_session_execution_lease_conn(
652                                tx,
653                                &session_id,
654                                &owner,
655                                previous_fencing_token,
656                                now,
657                                lease_ttl_ms,
658                            )?,
659                        ));
660                    }
661                    Ok(SessionExecutionLeaseClaimOutcome::Busy { holder })
662                })(
663                );
664                match outcome {
665                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
666                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
667                }
668            })
669            .await
670            .map_err(sqlite_error)?
671    }
672
673    async fn renew_session_execution_lease(
674        &self,
675        fence: &SessionExecutionLeaseFence,
676        lease_ttl_ms: u64,
677    ) -> Result<SessionExecutionLease, StoreError> {
678        let fence = fence.clone();
679        self.conn
680            .write_flow(move |tx| {
681                let outcome: Result<SessionExecutionLease, StoreError> = (|| {
682                    let now = current_epoch_ms();
683                    let current = load_session_execution_lease_row_conn(tx, &fence.session_id)?;
684                    let Some(current) = current else {
685                        return Err(StoreError::SessionExecutionLeaseExpired {
686                            session_id: fence.session_id.clone(),
687                        });
688                    };
689                    if !current
690                        .owner
691                        .as_ref()
692                        .is_some_and(|owner| owner.same_incarnation(&fence.owner))
693                        || current.lease_token.as_deref() != Some(fence.lease_token.as_str())
694                        || current.fencing_token != fence.fencing_token
695                        || current.expires_at_ms <= now
696                    {
697                        return Err(StoreError::SessionExecutionLeaseExpired {
698                            session_id: fence.session_id.clone(),
699                        });
700                    }
701                    let expires_at = now.saturating_add(lease_ttl_ms);
702                    tx.execute(
703                        "UPDATE session_execution_leases
704                         SET lease_expires_at_ms = ?5
705                         WHERE session_id = ?1
706                           AND lease_owner_id = ?2
707                           AND lease_owner_incarnation_id = ?3
708                           AND lease_token = ?4
709                           AND lease_fencing_token = ?6",
710                        params![
711                            fence.session_id,
712                            fence.owner.owner_id,
713                            fence.owner.incarnation_id,
714                            fence.lease_token,
715                            expires_at as i64,
716                            fence.fencing_token as i64
717                        ],
718                    )
719                    .map_err(sqlite_error)?;
720                    Ok(SessionExecutionLease {
721                        session_id: fence.session_id,
722                        owner: fence.owner,
723                        lease_token: fence.lease_token,
724                        fencing_token: fence.fencing_token,
725                        claimed_at_epoch_ms: current.claimed_at_ms,
726                        expires_at_epoch_ms: expires_at,
727                    })
728                })();
729                match outcome {
730                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
731                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
732                }
733            })
734            .await
735            .map_err(sqlite_error)?
736    }
737
738    async fn release_session_execution_lease(
739        &self,
740        completion: &SessionExecutionLeaseCompletion,
741    ) -> Result<(), StoreError> {
742        let completion = completion.clone();
743        self.conn
744            .write_flow(move |tx| {
745                let outcome = release_session_execution_lease_conn(tx, &completion);
746                match outcome {
747                    Ok(()) => Ok(TxOutcome::Commit(Ok(()))),
748                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
749                }
750            })
751            .await
752            .map_err(sqlite_error)?
753    }
754}
755
756#[async_trait::async_trait]
757impl QueuedWorkStore for Store {
758    async fn enqueue_queued_work(
759        &self,
760        batch: QueuedWorkBatchDraft,
761    ) -> Result<QueuedWorkBatch, StoreError> {
762        let nonce = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed);
763        self.conn
764            .write_flow(move |tx| {
765                let outcome: Result<QueuedWorkBatch, StoreError> = (|| {
766                    if let Some(source_key) = batch.source_key.as_deref() {
767                        let existing_id: Option<String> = tx
768                            .query_row(
769                                "SELECT batch_id
770                                 FROM queued_work_batches
771                                 WHERE session_id = ?1 AND source_key = ?2",
772                                params![batch.session_id, source_key],
773                                |row| row.get(0),
774                            )
775                            .optional()
776                            .map_err(sqlite_error)?;
777                        if let Some(batch_id) = existing_id {
778                            let existing = load_queued_batch_by_id_conn(tx, &batch_id)?
779                                .ok_or_else(|| {
780                                    StoreError::Backend(
781                                        "queued work source row disappeared".to_string(),
782                                    )
783                                })?;
784                            return Ok(existing);
785                        }
786                    }
787                    let now = current_epoch_ms();
788                    let batch_id =
789                        derive_batch_id(&batch.session_id, batch.source_key.as_deref(), now, Some(nonce));
790                    tx.execute(
791                        "INSERT INTO queued_work_batches (
792                            batch_id, session_id, source_key, delivery_policy, slot_policy,
793                            merge_key_json, available_at_ms, enqueued_at_ms
794                         )
795                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
796                        params![
797                            batch_id,
798                            batch.session_id,
799                            batch.source_key.as_deref(),
800                            batch.delivery_policy.as_str(),
801                            batch.slot_policy.as_str(),
802                            encode_json(&batch.merge_key),
803                            batch.available_at_ms as i64,
804                            now as i64,
805                        ],
806                    )
807                    .map_err(sqlite_error)?;
808                    for (index, payload) in batch.payloads.iter().enumerate() {
809                        let item_id = format!("{batch_id}:item:{index}");
810                        tx.execute(
811                            "INSERT INTO queued_work_items (batch_id, item_index, item_id, payload_json)
812                             VALUES (?1, ?2, ?3, ?4)",
813                            params![batch_id, index as i64, item_id, encode_json(payload)],
814                        )
815                        .map_err(sqlite_error)?;
816                    }
817                    load_queued_batch_by_id_conn(tx, &batch_id)?.ok_or_else(|| {
818                        StoreError::Backend("queued work insert disappeared".to_string())
819                    })
820                })();
821                // Roll back the partially-inserted batch/items on a
822                // `StoreError` while still returning the typed error.
823                match outcome {
824                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
825                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
826                }
827            })
828            .await
829            .map_err(sqlite_error)?
830    }
831
832    async fn claim_leading_ready_session_command(
833        &self,
834        session_id: &str,
835        session_execution_lease: &SessionExecutionLeaseFence,
836        owner: &LeaseOwnerIdentity,
837        lease_ttl_ms: u64,
838    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
839        let session_id = session_id.to_string();
840        let session_execution_lease = session_execution_lease.clone();
841        let owner = owner.clone();
842        self.conn
843            .write_flow(move |tx| {
844                let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
845                    ensure_session_execution_lease_conn(
846                        tx,
847                        &session_id,
848                        &session_execution_lease,
849                    )?;
850                    let now = current_epoch_ms();
851                    let candidate_rows = {
852                        let mut stmt = tx
853                            .prepare(
854                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
855                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
856                                        claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
857                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
858                                 FROM queued_work_batches
859                                 WHERE session_id = ?1
860                                   AND available_at_ms <= ?2
861                                 ORDER BY enqueue_seq ASC
862                                 LIMIT ?3",
863                            )
864                            .map_err(sqlite_error)?;
865                        let rows = stmt
866                            .query_map(
867                                params![session_id, now as i64, claim_scan_limit(1)],
868                                queued_batch_row_from_sql,
869                            )
870                            .map_err(sqlite_error)?;
871                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
872                    };
873                    let candidate_rows = candidate_rows
874                        .into_iter()
875                        .filter(|row| {
876                            row.claim_token.is_none()
877                                || row.claim_expires_at_ms <= now
878                                || row
879                                    .claim_owner
880                                    .as_ref()
881                                    .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner))
882                        })
883                        .collect::<Vec<_>>();
884                    let candidate_batches = candidate_rows
885                        .iter()
886                        .map(|row| queued_work_batch_from_conn(tx, row.clone()))
887                        .collect::<Result<Vec<_>, StoreError>>()?;
888                    let candidates = candidate_rows
889                        .iter()
890                        .zip(candidate_batches.iter())
891                        .map(|(row, batch)| {
892                            Ok(ClaimCandidate {
893                                enqueue_seq: row.enqueue_seq,
894                                claim_fencing_token: row.claim_fencing_token,
895                                work_class: batch.work_class().ok_or_else(|| {
896                                    StoreError::Backend(format!(
897                                        "queued-work batch `{}` has mixed or empty payload classes",
898                                        batch.batch_id
899                                    ))
900                                })?,
901                                delivery_policy: decode_delivery_policy(
902                                    row.delivery_policy.clone(),
903                                )?,
904                                slot_policy: decode_slot_policy(row.slot_policy.clone())?,
905                                merge_key: decode_merge_key(row.merge_key_json.clone())?,
906                            })
907                        })
908                        .collect::<Result<Vec<_>, StoreError>>()?;
909                    let selected_len = select_leading_session_command(&candidates);
910                    if selected_len == 0 {
911                        return Ok(TxOutcome::Commit(None));
912                    }
913                    let mut selected = candidate_rows;
914                    selected.truncate(selected_len);
915                    let mut selected_batches = candidate_batches;
916                    selected_batches.truncate(selected_len);
917                    let lease = QueuedWorkClaimLease::derive(
918                        &candidates[0],
919                        &session_id,
920                        &owner,
921                        now,
922                        lease_ttl_ms,
923                    );
924                    let liveness_json = encode_liveness(&owner.liveness)?;
925                    for row in &selected {
926                        let claimed = tx
927                            .execute(
928                                "UPDATE queued_work_batches
929                                 SET claim_id = ?3,
930                                     claim_owner_id = ?4,
931                                     claim_owner_incarnation_id = ?5,
932                                     claim_owner_liveness_json = ?6,
933                                     claim_token = ?7,
934                                     claim_fencing_token = claim_fencing_token + 1,
935                                     claim_claimed_at_ms = ?8,
936                                     claim_expires_at_ms = ?9
937                                 WHERE session_id = ?1
938                                   AND batch_id = ?2
939                                   AND (
940                                        claim_token IS NULL
941                                        OR claim_expires_at_ms <= ?8
942                                        OR (
943                                            claim_token = ?10
944                                            AND claim_owner_id = ?11
945                                            AND claim_owner_incarnation_id = ?12
946                                        )
947                                   )",
948                                params![
949                                    session_id,
950                                    row.batch_id,
951                                    lease.claim_id,
952                                    owner.owner_id.as_str(),
953                                    owner.incarnation_id.as_str(),
954                                    liveness_json.as_str(),
955                                    lease.lease_token,
956                                    now as i64,
957                                    lease.expires_at_epoch_ms as i64,
958                                    row.claim_token,
959                                    row.claim_owner.as_ref().map(|owner| owner.owner_id.as_str()),
960                                    row.claim_owner
961                                        .as_ref()
962                                        .map(|owner| owner.incarnation_id.as_str())
963                                ],
964                            )
965                            .map_err(sqlite_error)?;
966                        if claimed == 0 {
967                            return Ok(TxOutcome::Rollback(None));
968                        }
969                    }
970                    Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
971                        session_id: session_id.clone(),
972                        claim_id: lease.claim_id,
973                        owner: owner.clone(),
974                        lease_token: lease.lease_token,
975                        fencing_token: lease.fencing_token,
976                        claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
977                        expires_at_epoch_ms: lease.expires_at_epoch_ms,
978                        batches: selected_batches,
979                    })))
980                })();
981                match outcome {
982                    Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
983                    Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
984                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
985                }
986            })
987            .await
988            .map_err(sqlite_error)?
989    }
990
991    async fn claim_ready_queued_work(
992        &self,
993        session_id: &str,
994        session_execution_lease: &SessionExecutionLeaseFence,
995        owner: &LeaseOwnerIdentity,
996        boundary: QueuedWorkClaimBoundary,
997        lease_ttl_ms: u64,
998        max_batches: usize,
999    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
1000        if max_batches == 0 {
1001            return Ok(None);
1002        }
1003        let session_id = session_id.to_string();
1004        let session_execution_lease = session_execution_lease.clone();
1005        let owner = owner.clone();
1006        self.conn
1007            .write_flow(move |tx| {
1008                let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
1009                    ensure_session_execution_lease_conn(
1010                        tx,
1011                        &session_id,
1012                        &session_execution_lease,
1013                    )?;
1014                    let now = current_epoch_ms();
1015                    let candidate_rows = {
1016                        let mut stmt = tx
1017                            .prepare(
1018                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1019                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1020                                        claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1021                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
1022                                 FROM queued_work_batches
1023                                 WHERE session_id = ?1
1024                                   AND available_at_ms <= ?2
1025                                 ORDER BY enqueue_seq ASC
1026                                 LIMIT ?3",
1027                            )
1028                            .map_err(sqlite_error)?;
1029                        let rows = stmt
1030                            .query_map(
1031                                params![session_id, now as i64, claim_scan_limit(max_batches)],
1032                                queued_batch_row_from_sql,
1033                            )
1034                            .map_err(sqlite_error)?;
1035                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1036                    };
1037                    let candidate_rows = candidate_rows
1038                        .into_iter()
1039                        .filter(|row| {
1040                            row.claim_token.is_none()
1041                                || row.claim_expires_at_ms <= now
1042                                || row
1043                                    .claim_owner
1044                                    .as_ref()
1045                                    .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner))
1046                        })
1047                        .collect::<Vec<_>>();
1048                    let candidate_batches = candidate_rows
1049                        .iter()
1050                        .map(|row| queued_work_batch_from_conn(tx, row.clone()))
1051                        .collect::<Result<Vec<_>, StoreError>>()?;
1052                    let candidates = candidate_rows
1053                        .iter()
1054                        .zip(candidate_batches.iter())
1055                        .map(|(row, batch)| {
1056                            Ok(ClaimCandidate {
1057                                enqueue_seq: row.enqueue_seq,
1058                                claim_fencing_token: row.claim_fencing_token,
1059                                work_class: batch.work_class().ok_or_else(|| {
1060                                    StoreError::Backend(format!(
1061                                        "queued-work batch `{}` has mixed or empty payload classes",
1062                                        batch.batch_id
1063                                    ))
1064                                })?,
1065                                delivery_policy: decode_delivery_policy(
1066                                    row.delivery_policy.clone(),
1067                                )?,
1068                                slot_policy: decode_slot_policy(row.slot_policy.clone())?,
1069                                merge_key: decode_merge_key(row.merge_key_json.clone())?,
1070                            })
1071                        })
1072                        .collect::<Result<Vec<_>, StoreError>>()?;
1073                    let selected_len =
1074                        select_turn_work_claim_prefix(&candidates, boundary, max_batches);
1075                    if selected_len == 0 {
1076                        return Ok(TxOutcome::Commit(None));
1077                    }
1078                    let mut selected = candidate_rows;
1079                    selected.truncate(selected_len);
1080                    let mut selected_batches = candidate_batches;
1081                    selected_batches.truncate(selected_len);
1082                    let lease = QueuedWorkClaimLease::derive(
1083                        &candidates[0],
1084                        &session_id,
1085                        &owner,
1086                        now,
1087                        lease_ttl_ms,
1088                    );
1089                    let liveness_json = encode_liveness(&owner.liveness)?;
1090                    for row in &selected {
1091                        // Under `BEGIN IMMEDIATE` this connection already holds
1092                        // the write lock, but the row could still have been
1093                        // claimed by an earlier committed writer (its
1094                        // `claim_token` set and not yet expired). The `WHERE`
1095                        // clause filters those out, so a 0-row update means we
1096                        // lost the race for this batch: treat the whole claim as
1097                        // not-won rather than returning a claim that doesn't
1098                        // actually own the row.
1099                        let claimed = tx
1100                            .execute(
1101                                "UPDATE queued_work_batches
1102                                 SET claim_id = ?3,
1103                                     claim_owner_id = ?4,
1104                                     claim_owner_incarnation_id = ?5,
1105                                     claim_owner_liveness_json = ?6,
1106                                     claim_token = ?7,
1107                                     claim_fencing_token = claim_fencing_token + 1,
1108                                     claim_claimed_at_ms = ?8,
1109                                     claim_expires_at_ms = ?9
1110                                 WHERE session_id = ?1
1111                                   AND batch_id = ?2
1112                                   AND (
1113                                        claim_token IS NULL
1114                                        OR claim_expires_at_ms <= ?8
1115                                        OR (
1116                                            claim_token = ?10
1117                                            AND claim_owner_id = ?11
1118                                            AND claim_owner_incarnation_id = ?12
1119                                        )
1120                                   )",
1121                                params![
1122                                    session_id,
1123                                    row.batch_id,
1124                                    lease.claim_id,
1125                                    owner.owner_id.as_str(),
1126                                    owner.incarnation_id.as_str(),
1127                                    liveness_json.as_str(),
1128                                    lease.lease_token,
1129                                    now as i64,
1130                                    lease.expires_at_epoch_ms as i64,
1131                                    row.claim_token,
1132                                    row.claim_owner.as_ref().map(|owner| owner.owner_id.as_str()),
1133                                    row.claim_owner
1134                                        .as_ref()
1135                                        .map(|owner| owner.incarnation_id.as_str())
1136                                ],
1137                            )
1138                            .map_err(sqlite_error)?;
1139                        if claimed == 0 {
1140                            // Lost the race for this batch. Roll back any sibling
1141                            // rows we already claimed in this transaction so we
1142                            // never return a half-owned claim.
1143                            return Ok(TxOutcome::Rollback(None));
1144                        }
1145                    }
1146                    Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
1147                        session_id: session_id.clone(),
1148                        claim_id: lease.claim_id,
1149                        owner: owner.clone(),
1150                        lease_token: lease.lease_token,
1151                        fencing_token: lease.fencing_token,
1152                        claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
1153                        expires_at_epoch_ms: lease.expires_at_epoch_ms,
1154                        batches: selected_batches,
1155                    })))
1156                })();
1157                // Lower a `StoreError` into the rollback arm so the closure body
1158                // can keep using `?` while still propagating the error to the
1159                // caller. Encode it as a `Result` carried out of the flow.
1160                match outcome {
1161                    Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
1162                    Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
1163                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1164                }
1165            })
1166            .await
1167            .map_err(sqlite_error)?
1168    }
1169
1170    async fn renew_queued_work_claim(
1171        &self,
1172        claim: &QueuedWorkClaim,
1173        lease_ttl_ms: u64,
1174    ) -> Result<QueuedWorkClaim, StoreError> {
1175        let now = current_epoch_ms();
1176        let expires_at = now.saturating_add(lease_ttl_ms);
1177        let session_id = claim.session_id.clone();
1178        let claim_id = claim.claim_id.clone();
1179        let lease_token = claim.lease_token.clone();
1180        let changed = self
1181            .conn
1182            .write(move |tx| {
1183                tx.execute(
1184                    "UPDATE queued_work_batches
1185                     SET claim_expires_at_ms = ?4
1186                     WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1187                    params![session_id, claim_id, lease_token, expires_at as i64],
1188                )
1189            })
1190            .await
1191            .map_err(sqlite_error)?;
1192        renewed_claim(claim, changed, expires_at)
1193    }
1194
1195    async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1196        let session_id = claim.session_id.clone();
1197        let claim_id = claim.claim_id.clone();
1198        let lease_token = claim.lease_token.clone();
1199        self.conn
1200            .write(move |tx| {
1201                tx.execute(
1202                    "UPDATE queued_work_batches
1203                     SET claim_id = NULL,
1204                         claim_owner_id = NULL,
1205                         claim_owner_incarnation_id = NULL,
1206                         claim_owner_liveness_json = NULL,
1207                         claim_token = NULL,
1208                         claim_claimed_at_ms = 0,
1209                         claim_expires_at_ms = 0
1210                     WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1211                    params![session_id, claim_id, lease_token],
1212                )
1213            })
1214            .await
1215            .map_err(sqlite_error)?;
1216        Ok(())
1217    }
1218
1219    async fn cancel_queued_work_batch(
1220        &self,
1221        session_id: &str,
1222        batch_id: &str,
1223    ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1224        let session_id = session_id.to_string();
1225        let batch_id = batch_id.to_string();
1226        self.conn
1227            .write_flow(move |tx| {
1228                let outcome: Result<Option<QueuedWorkBatch>, StoreError> = (|| {
1229                    let now = current_epoch_ms() as i64;
1230                    let row = tx
1231                        .query_row(
1232                            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1233                                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1234                                    claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1235                                    claim_owner_liveness_json, claim_token, claim_expires_at_ms
1236                             FROM queued_work_batches
1237                             WHERE session_id = ?1
1238                               AND batch_id = ?2
1239                               AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
1240                            params![session_id, batch_id, now],
1241                            queued_batch_row_from_sql,
1242                        )
1243                        .optional()
1244                        .map_err(sqlite_error)?;
1245                    let Some(row) = row else {
1246                        return Ok(None);
1247                    };
1248                    let batch = queued_work_batch_from_conn(tx, row)?;
1249                    tx.execute(
1250                        "DELETE FROM queued_work_batches
1251                         WHERE session_id = ?1
1252                           AND batch_id = ?2
1253                           AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
1254                        params![session_id, batch_id, now],
1255                    )
1256                    .map_err(sqlite_error)?;
1257                    Ok(Some(batch))
1258                })();
1259                match outcome {
1260                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1261                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1262                }
1263            })
1264            .await
1265            .map_err(sqlite_error)?
1266    }
1267
1268    async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1269        let session_id = session_id.to_string();
1270        self.conn
1271            .call(move |conn| {
1272                let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
1273                    let rows = {
1274                        let mut stmt = conn
1275                            .prepare(
1276                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1277                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1278                                        claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1279                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
1280                                 FROM queued_work_batches
1281                                 WHERE session_id = ?1
1282                                 ORDER BY enqueue_seq ASC",
1283                            )
1284                            .map_err(sqlite_error)?;
1285                        let rows = stmt
1286                            .query_map(params![session_id], queued_batch_row_from_sql)
1287                            .map_err(sqlite_error)?;
1288                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1289                    };
1290                    rows.into_iter()
1291                        .map(|row| queued_work_batch_from_conn(conn, row))
1292                        .collect()
1293                })();
1294                Ok(outcome)
1295            })
1296            .await
1297            .map_err(sqlite_error)?
1298    }
1299
1300    async fn list_pending_queued_work(
1301        &self,
1302        session_id: &str,
1303    ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1304        let session_id = session_id.to_string();
1305        self.conn
1306            .call(move |conn| {
1307                let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
1308                    let now = current_epoch_ms();
1309                    let rows = {
1310                        let mut stmt = conn
1311                            .prepare(
1312                                "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1313                                        slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1314                                        claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
1315                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
1316                                 FROM queued_work_batches
1317                                 WHERE session_id = ?1
1318                                   AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
1319                                 ORDER BY enqueue_seq ASC",
1320                            )
1321                            .map_err(sqlite_error)?;
1322                        let rows = stmt
1323                            .query_map(
1324                                params![session_id, now as i64],
1325                                queued_batch_row_from_sql,
1326                            )
1327                            .map_err(sqlite_error)?;
1328                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1329                    };
1330                    rows.into_iter()
1331                        .map(|row| queued_work_batch_from_conn(conn, row))
1332                        .collect()
1333                })();
1334                Ok(outcome)
1335            })
1336            .await
1337            .map_err(sqlite_error)?
1338    }
1339}
1340
1341#[async_trait::async_trait]
1342impl TurnInputStore for Store {
1343    async fn enqueue_pending_turn_input(
1344        &self,
1345        draft: lash_core::PendingTurnInputDraft,
1346    ) -> Result<lash_core::PendingTurnInput, StoreError> {
1347        let nonce = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed);
1348        self.conn
1349            .write_flow(move |tx| {
1350                let outcome: Result<lash_core::PendingTurnInput, StoreError> = (|| {
1351                    if let Some(source_key) = draft.source_key.as_deref() {
1352                        let existing_id: Option<String> = tx
1353                            .query_row(
1354                                "SELECT input_id
1355                                 FROM pending_turn_inputs
1356                                 WHERE session_id = ?1 AND source_key = ?2",
1357                                params![draft.session_id, source_key],
1358                                |row| row.get(0),
1359                            )
1360                            .optional()
1361                            .map_err(sqlite_error)?;
1362                        if let Some(input_id) = existing_id {
1363                            let existing = load_pending_turn_input_by_id_conn(
1364                                tx,
1365                                &draft.session_id,
1366                                &input_id,
1367                            )?
1368                            .ok_or_else(|| {
1369                                StoreError::Backend(
1370                                    "pending turn input source row disappeared".to_string(),
1371                                )
1372                            })?;
1373                            if !draft.submitted_content_matches(&existing).map_err(|err| {
1374                                StoreError::Backend(format!(
1375                                    "failed to compare pending turn input submission: {err}"
1376                                ))
1377                            })? {
1378                                return Err(StoreError::PendingTurnInputSourceKeyConflict {
1379                                    session_id: draft.session_id.clone(),
1380                                    source_key: source_key.to_string(),
1381                                    existing_input_id: existing.input_id.clone(),
1382                                });
1383                            }
1384                            return Ok(existing);
1385                        }
1386                    }
1387                    let now = current_epoch_ms();
1388                    let input_id = draft.input_id.clone().unwrap_or_else(|| {
1389                        derive_pending_turn_input_id(
1390                            &draft.session_id,
1391                            draft.source_key.as_deref(),
1392                            now,
1393                            nonce,
1394                        )
1395                    });
1396                    let state = match draft.ingress {
1397                        lash_core::TurnInputIngress::ActiveTurn { .. } => {
1398                            lash_core::TurnInputState::PendingActive
1399                        }
1400                        lash_core::TurnInputIngress::NextTurn => {
1401                            lash_core::TurnInputState::DeferredNextTurn
1402                        }
1403                    };
1404                    tx.execute(
1405                        "INSERT INTO pending_turn_inputs (
1406                            input_id, session_id, source_key, ingress_json, state,
1407                            input_json, enqueued_at_ms
1408                         )
1409                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1410                        params![
1411                            input_id,
1412                            draft.session_id,
1413                            draft.source_key.as_deref(),
1414                            encode_json(&draft.ingress),
1415                            state.as_str(),
1416                            encode_json(&draft.input),
1417                            now as i64,
1418                        ],
1419                    )
1420                    .map_err(sqlite_error)?;
1421                    load_pending_turn_input_by_id_conn(tx, &draft.session_id, &input_id)?
1422                        .ok_or_else(|| {
1423                            StoreError::Backend("pending turn input insert disappeared".to_string())
1424                        })
1425                })();
1426                match outcome {
1427                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1428                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1429                }
1430            })
1431            .await
1432            .map_err(sqlite_error)?
1433    }
1434
1435    async fn list_pending_turn_inputs(
1436        &self,
1437        session_id: &str,
1438    ) -> Result<Vec<lash_core::PendingTurnInput>, StoreError> {
1439        let session_id = session_id.to_string();
1440        self.conn
1441            .call(move |conn| {
1442                let outcome: Result<Vec<lash_core::PendingTurnInput>, StoreError> = (|| {
1443                    let now = current_epoch_ms();
1444                    let rows = {
1445                        let mut stmt = conn
1446                            .prepare(
1447                                "SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
1448                                        state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
1449                                        claim_owner_id, claim_owner_incarnation_id,
1450                                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
1451                                 FROM pending_turn_inputs
1452                                 WHERE session_id = ?1
1453                                   AND state IN (?2, ?3)
1454                                   AND (claim_token IS NULL OR claim_expires_at_ms <= ?4)
1455                                 ORDER BY enqueue_seq ASC",
1456                            )
1457                            .map_err(sqlite_error)?;
1458                        let rows = stmt
1459                            .query_map(
1460                                params![
1461                                    session_id,
1462                                    lash_core::TurnInputState::PendingActive.as_str(),
1463                                    lash_core::TurnInputState::DeferredNextTurn.as_str(),
1464                                    now as i64
1465                                ],
1466                                pending_turn_input_row_from_sql,
1467                            )
1468                            .map_err(sqlite_error)?;
1469                        rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1470                    };
1471                    rows.into_iter().map(pending_turn_input_from_row).collect()
1472                })(
1473                );
1474                Ok(outcome)
1475            })
1476            .await
1477            .map_err(sqlite_error)?
1478    }
1479
1480    async fn cancel_pending_turn_inputs(
1481        &self,
1482        session_id: &str,
1483        targets: &[lash_core::PendingTurnInputCancelTarget],
1484    ) -> Result<Vec<lash_core::PendingTurnInputCancelResult>, StoreError> {
1485        let session_id = session_id.to_string();
1486        let targets = targets.to_vec();
1487        self.conn
1488            .write_flow(move |tx| {
1489                let outcome: Result<Vec<lash_core::PendingTurnInputCancelResult>, StoreError> =
1490                    (|| {
1491                        let now = current_epoch_ms();
1492                        let mut results = Vec::with_capacity(targets.len());
1493                        for target in targets {
1494                            let outcome = match load_pending_turn_input_row_by_target_conn(
1495                                tx,
1496                                &session_id,
1497                                &target,
1498                            )? {
1499                                Some(row) => cancel_pending_turn_input_row_conn(tx, row, now)?,
1500                                None => lash_core::PendingTurnInputCancelOutcome::NotFound,
1501                            };
1502                            results
1503                                .push(lash_core::PendingTurnInputCancelResult { target, outcome });
1504                        }
1505                        Ok(results)
1506                    })();
1507                match outcome {
1508                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1509                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1510                }
1511            })
1512            .await
1513            .map_err(sqlite_error)?
1514    }
1515
1516    async fn cancel_pending_turn_input_suffix(
1517        &self,
1518        session_id: &str,
1519        anchor: &lash_core::PendingTurnInputCancelTarget,
1520    ) -> Result<lash_core::PendingTurnInputSuffixCancelOutcome, StoreError> {
1521        let session_id = session_id.to_string();
1522        let anchor = anchor.clone();
1523        self.conn
1524            .write_flow(move |tx| {
1525                let outcome: Result<lash_core::PendingTurnInputSuffixCancelOutcome, StoreError> =
1526                    (|| {
1527                        let now = current_epoch_ms();
1528                        let Some(anchor_row) =
1529                            load_pending_turn_input_row_by_target_conn(tx, &session_id, &anchor)?
1530                        else {
1531                            return Ok(
1532                                lash_core::PendingTurnInputSuffixCancelOutcome::AnchorNotFound {
1533                                    anchor,
1534                                },
1535                            );
1536                        };
1537                        let rows = {
1538                            let mut stmt = tx
1539                                .prepare(
1540                                    "SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
1541                                            state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
1542                                            claim_owner_id, claim_owner_incarnation_id,
1543                                            claim_owner_liveness_json, claim_token, claim_expires_at_ms
1544                                     FROM pending_turn_inputs
1545                                     WHERE session_id = ?1 AND enqueue_seq >= ?2
1546                                     ORDER BY enqueue_seq ASC",
1547                                )
1548                                .map_err(sqlite_error)?;
1549                            let rows = stmt
1550                                .query_map(
1551                                    params![session_id, anchor_row.enqueue_seq as i64],
1552                                    pending_turn_input_row_from_sql,
1553                                )
1554                                .map_err(sqlite_error)?;
1555                            rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1556                        };
1557                        let mut outcomes = Vec::with_capacity(rows.len());
1558                        for row in rows {
1559                            outcomes.push(cancel_pending_turn_input_row_conn(tx, row, now)?);
1560                        }
1561                        Ok(lash_core::PendingTurnInputSuffixCancelOutcome::Outcomes {
1562                            anchor,
1563                            outcomes,
1564                        })
1565                    })();
1566                match outcome {
1567                    Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
1568                    Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1569                }
1570            })
1571            .await
1572            .map_err(sqlite_error)?
1573    }
1574
1575    async fn claim_active_turn_inputs(
1576        &self,
1577        session_id: &str,
1578        session_execution_lease: &SessionExecutionLeaseFence,
1579        owner: &LeaseOwnerIdentity,
1580        turn_id: &str,
1581        checkpoint: lash_core::CheckpointKind,
1582        lease_ttl_ms: u64,
1583        max_inputs: usize,
1584    ) -> Result<Option<lash_core::TurnInputClaim>, StoreError> {
1585        claim_pending_turn_inputs_sqlite(
1586            &self.conn,
1587            session_id,
1588            session_execution_lease,
1589            owner,
1590            lease_ttl_ms,
1591            max_inputs,
1592            lash_core::TurnInputClaimMode::ActiveTurn {
1593                turn_id: turn_id.to_string(),
1594                checkpoint,
1595            },
1596        )
1597        .await
1598    }
1599
1600    async fn claim_next_turn_inputs(
1601        &self,
1602        session_id: &str,
1603        session_execution_lease: &SessionExecutionLeaseFence,
1604        owner: &LeaseOwnerIdentity,
1605        lease_ttl_ms: u64,
1606        max_inputs: usize,
1607    ) -> Result<Option<lash_core::TurnInputClaim>, StoreError> {
1608        claim_pending_turn_inputs_sqlite(
1609            &self.conn,
1610            session_id,
1611            session_execution_lease,
1612            owner,
1613            lease_ttl_ms,
1614            max_inputs,
1615            lash_core::TurnInputClaimMode::NextTurn,
1616        )
1617        .await
1618    }
1619
1620    async fn abandon_turn_input_claim(
1621        &self,
1622        claim: &lash_core::TurnInputClaim,
1623    ) -> Result<(), StoreError> {
1624        let session_id = claim.session_id.clone();
1625        let claim_id = claim.claim_id.clone();
1626        let lease_token = claim.lease_token.clone();
1627        let restored_state = match claim.mode {
1628            lash_core::TurnInputClaimMode::ActiveTurn { .. } => {
1629                lash_core::TurnInputState::PendingActive
1630            }
1631            lash_core::TurnInputClaimMode::NextTurn => lash_core::TurnInputState::DeferredNextTurn,
1632        };
1633        self.conn
1634            .write(move |tx| {
1635                tx.execute(
1636                    "UPDATE pending_turn_inputs
1637                     SET state = CASE
1638                             WHEN state = ?4 THEN ?5
1639                             ELSE state
1640                         END,
1641                         claim_id = NULL,
1642                         claim_owner_id = NULL,
1643                         claim_owner_incarnation_id = NULL,
1644                         claim_owner_liveness_json = NULL,
1645                         claim_token = NULL,
1646                         claim_claimed_at_ms = 0,
1647                         claim_expires_at_ms = 0
1648                     WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
1649                    params![
1650                        session_id,
1651                        claim_id,
1652                        lease_token,
1653                        lash_core::TurnInputState::Accepted.as_str(),
1654                        restored_state.as_str(),
1655                    ],
1656                )
1657            })
1658            .await
1659            .map_err(sqlite_error)?;
1660        Ok(())
1661    }
1662}
1663
1664#[async_trait::async_trait]
1665impl StoreMaintenance for Store {
1666    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1667        if ids.is_empty() {
1668            return Ok(());
1669        }
1670        let ids = ids.to_vec();
1671        self.conn
1672            .write(move |tx| {
1673                let mut stmt =
1674                    tx.prepare("UPDATE graph_nodes SET tombstoned = 1 WHERE node_id = ?1")?;
1675                for id in &ids {
1676                    stmt.execute(params![id])?;
1677                }
1678                Ok(())
1679            })
1680            .await
1681            .map_err(sqlite_error)
1682    }
1683
1684    async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1685        let (removed_node_count, removed_pending_turn_input_tombstone_count) = self
1686            .conn
1687            .write(move |tx| {
1688                let removed_node_count =
1689                    tx.execute("DELETE FROM graph_nodes WHERE tombstoned = 1", [])?;
1690                let removed_pending_turn_input_tombstone_count = tx.execute(
1691                    "DELETE FROM pending_turn_inputs
1692                     WHERE state IN (?1, ?2)",
1693                    params![
1694                        lash_core::TurnInputState::Cancelled.as_str(),
1695                        lash_core::TurnInputState::Completed.as_str()
1696                    ],
1697                )?;
1698                Ok((
1699                    removed_node_count,
1700                    removed_pending_turn_input_tombstone_count,
1701                ))
1702            })
1703            .await
1704            .map_err(sqlite_error)?;
1705        Ok(VacuumReport {
1706            removed_node_count,
1707            removed_pending_turn_input_tombstone_count,
1708        })
1709    }
1710
1711    async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
1712        Ok(Store::gc_unreachable(self).await)
1713    }
1714}
1715
1716fn derive_pending_turn_input_id(
1717    session_id: &str,
1718    source_key: Option<&str>,
1719    now_epoch_ms: u64,
1720    nonce: u64,
1721) -> String {
1722    format!(
1723        "ti:{:x}",
1724        Sha256::digest(format!("{session_id}:{source_key:?}:{now_epoch_ms}:{nonce}").as_bytes())
1725    )
1726}
1727
1728fn cancel_pending_turn_input_row_conn(
1729    conn: &Connection,
1730    row: PendingTurnInputRow,
1731    now_epoch_ms: u64,
1732) -> Result<lash_core::PendingTurnInputCancelOutcome, StoreError> {
1733    let mut input = pending_turn_input_from_row(row.clone())?;
1734    match input.state {
1735        lash_core::TurnInputState::Cancelled => Ok(
1736            lash_core::PendingTurnInputCancelOutcome::AlreadyCancelled(input),
1737        ),
1738        lash_core::TurnInputState::Completed => Ok(
1739            lash_core::PendingTurnInputCancelOutcome::AlreadyCompleted(input),
1740        ),
1741        lash_core::TurnInputState::Accepted => {
1742            Ok(lash_core::PendingTurnInputCancelOutcome::AlreadyClaimed {
1743                claim: pending_turn_input_claim_diagnostics_from_row(&row, input.state),
1744                input,
1745            })
1746        }
1747        lash_core::TurnInputState::PendingActive | lash_core::TurnInputState::DeferredNextTurn => {
1748            let live_claim = row.claim_token.is_some() && row.claim_expires_at_ms > now_epoch_ms;
1749            if live_claim {
1750                return Ok(lash_core::PendingTurnInputCancelOutcome::AlreadyClaimed {
1751                    claim: pending_turn_input_claim_diagnostics_from_row(&row, input.state),
1752                    input,
1753                });
1754            }
1755            conn.execute(
1756                "UPDATE pending_turn_inputs
1757                 SET state = ?3,
1758                     claim_id = NULL,
1759                     claim_owner_id = NULL,
1760                     claim_owner_incarnation_id = NULL,
1761                     claim_owner_liveness_json = NULL,
1762                     claim_token = NULL,
1763                     claim_claimed_at_ms = 0,
1764                     claim_expires_at_ms = 0
1765                 WHERE session_id = ?1 AND input_id = ?2",
1766                params![
1767                    row.session_id,
1768                    row.input_id,
1769                    lash_core::TurnInputState::Cancelled.as_str(),
1770                ],
1771            )
1772            .map_err(sqlite_error)?;
1773            input.state = lash_core::TurnInputState::Cancelled;
1774            Ok(lash_core::PendingTurnInputCancelOutcome::Cancelled(input))
1775        }
1776    }
1777}
1778
1779async fn claim_pending_turn_inputs_sqlite(
1780    conn: &SqliteConnection,
1781    session_id: &str,
1782    session_execution_lease: &SessionExecutionLeaseFence,
1783    owner: &LeaseOwnerIdentity,
1784    lease_ttl_ms: u64,
1785    max_inputs: usize,
1786    mode: lash_core::TurnInputClaimMode,
1787) -> Result<Option<lash_core::TurnInputClaim>, StoreError> {
1788    if max_inputs == 0 {
1789        return Ok(None);
1790    }
1791    let session_id = session_id.to_string();
1792    let session_execution_lease = session_execution_lease.clone();
1793    let owner = owner.clone();
1794    conn.write_flow(move |tx| {
1795        let outcome: Result<TxOutcome<Option<lash_core::TurnInputClaim>>, StoreError> = (|| {
1796            ensure_session_execution_lease_conn(tx, &session_id, &session_execution_lease)?;
1797            let now = current_epoch_ms();
1798            let wanted_state = match &mode {
1799                lash_core::TurnInputClaimMode::ActiveTurn { .. } => {
1800                    lash_core::TurnInputState::PendingActive
1801                }
1802                lash_core::TurnInputClaimMode::NextTurn => {
1803                    lash_core::TurnInputState::DeferredNextTurn
1804                }
1805            };
1806            let candidate_rows = {
1807                let mut stmt = tx
1808                    .prepare(
1809                        "SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
1810                                state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
1811                                claim_owner_id, claim_owner_incarnation_id,
1812                                claim_owner_liveness_json, claim_token, claim_expires_at_ms
1813                         FROM pending_turn_inputs
1814                         WHERE session_id = ?1 AND state = ?2
1815                         ORDER BY enqueue_seq ASC
1816                         LIMIT ?3",
1817                    )
1818                    .map_err(sqlite_error)?;
1819                let rows = stmt
1820                    .query_map(
1821                        params![
1822                            session_id,
1823                            wanted_state.as_str(),
1824                            (max_inputs as i64).saturating_add(32)
1825                        ],
1826                        pending_turn_input_row_from_sql,
1827                    )
1828                    .map_err(sqlite_error)?;
1829                rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
1830            };
1831            let mut selected = Vec::new();
1832            for row in candidate_rows {
1833                let claim_available = row.claim_token.is_none()
1834                    || row.claim_expires_at_ms <= now
1835                    || row
1836                        .claim_owner
1837                        .as_ref()
1838                        .is_some_and(|holder| holder.is_definitely_dead_for_claimant(&owner));
1839                if !claim_available {
1840                    continue;
1841                }
1842                let input = pending_turn_input_from_row(row.clone())?;
1843                let matches_mode = match &mode {
1844                    lash_core::TurnInputClaimMode::ActiveTurn {
1845                        turn_id,
1846                        checkpoint,
1847                    } => {
1848                        input
1849                            .ingress
1850                            .active_turn_id()
1851                            .is_some_and(|active| active == turn_id)
1852                            && input.ingress.admits_checkpoint(*checkpoint)
1853                    }
1854                    lash_core::TurnInputClaimMode::NextTurn => input.state.is_next_turn_pending(),
1855                };
1856                if matches_mode {
1857                    selected.push((row, input));
1858                    if selected.len() >= max_inputs {
1859                        break;
1860                    }
1861                }
1862            }
1863            let Some((head, _)) = selected.first() else {
1864                return Ok(TxOutcome::Commit(None));
1865            };
1866            let lease = TurnInputClaimLease::derive(head, &session_id, &owner, now, lease_ttl_ms);
1867            let liveness_json = encode_liveness(&owner.liveness)?;
1868            let state_after_claim = match &mode {
1869                lash_core::TurnInputClaimMode::ActiveTurn { .. } => {
1870                    lash_core::TurnInputState::Accepted
1871                }
1872                lash_core::TurnInputClaimMode::NextTurn => {
1873                    lash_core::TurnInputState::DeferredNextTurn
1874                }
1875            };
1876            let mut inputs = Vec::new();
1877            for (row, mut input) in selected {
1878                let claimed = tx
1879                    .execute(
1880                        "UPDATE pending_turn_inputs
1881                         SET state = ?3,
1882                             claim_id = ?4,
1883                             claim_owner_id = ?5,
1884                             claim_owner_incarnation_id = ?6,
1885                             claim_owner_liveness_json = ?7,
1886                             claim_token = ?8,
1887                             claim_fencing_token = claim_fencing_token + 1,
1888                             claim_claimed_at_ms = ?9,
1889                             claim_expires_at_ms = ?10
1890                         WHERE session_id = ?1
1891                           AND input_id = ?2
1892                           AND (
1893                                claim_token IS NULL
1894                                OR claim_expires_at_ms <= ?9
1895                                OR (
1896                                    claim_token = ?11
1897                                    AND claim_owner_id = ?12
1898                                    AND claim_owner_incarnation_id = ?13
1899                                )
1900                           )",
1901                        params![
1902                            session_id,
1903                            row.input_id,
1904                            state_after_claim.as_str(),
1905                            lease.claim_id,
1906                            owner.owner_id.as_str(),
1907                            owner.incarnation_id.as_str(),
1908                            liveness_json.as_str(),
1909                            lease.lease_token,
1910                            now as i64,
1911                            lease.expires_at_epoch_ms as i64,
1912                            row.claim_token,
1913                            row.claim_owner
1914                                .as_ref()
1915                                .map(|owner| owner.owner_id.as_str()),
1916                            row.claim_owner
1917                                .as_ref()
1918                                .map(|owner| owner.incarnation_id.as_str())
1919                        ],
1920                    )
1921                    .map_err(sqlite_error)?;
1922                if claimed == 0 {
1923                    return Ok(TxOutcome::Rollback(None));
1924                }
1925                input.state = state_after_claim;
1926                inputs.push(input);
1927            }
1928            Ok(TxOutcome::Commit(Some(lash_core::TurnInputClaim {
1929                session_id: session_id.clone(),
1930                claim_id: lease.claim_id,
1931                owner: owner.clone(),
1932                lease_token: lease.lease_token,
1933                fencing_token: lease.fencing_token,
1934                claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
1935                expires_at_epoch_ms: lease.expires_at_epoch_ms,
1936                mode,
1937                inputs,
1938            })))
1939        })(
1940        );
1941        match outcome {
1942            Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
1943            Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
1944            Err(err) => Ok(TxOutcome::Rollback(Err(err))),
1945        }
1946    })
1947    .await
1948    .map_err(sqlite_error)?
1949}
1950
1951struct SessionExecutionLeaseRow {
1952    owner: Option<LeaseOwnerIdentity>,
1953    lease_token: Option<String>,
1954    fencing_token: u64,
1955    claimed_at_ms: u64,
1956    expires_at_ms: u64,
1957}
1958
1959fn load_session_execution_lease_row_conn(
1960    conn: &Connection,
1961    session_id: &str,
1962) -> Result<Option<SessionExecutionLeaseRow>, StoreError> {
1963    let row = conn
1964        .query_row(
1965            "SELECT lease_owner_id, lease_token, lease_fencing_token,
1966                    lease_claimed_at_ms, lease_expires_at_ms,
1967                    lease_owner_incarnation_id, lease_owner_liveness_json
1968             FROM session_execution_leases
1969             WHERE session_id = ?1",
1970            params![session_id],
1971            |row| {
1972                let owner_id: Option<String> = row.get(0)?;
1973                let incarnation_id: Option<String> = row.get(5)?;
1974                let liveness_json: Option<String> = row.get(6)?;
1975                Ok(SessionExecutionLeaseRow {
1976                    owner: lease_owner_from_columns(owner_id, incarnation_id, liveness_json),
1977                    lease_token: row.get(1)?,
1978                    fencing_token: row.get::<_, i64>(2)? as u64,
1979                    claimed_at_ms: row.get::<_, i64>(3)? as u64,
1980                    expires_at_ms: row.get::<_, i64>(4)? as u64,
1981                })
1982            },
1983        )
1984        .optional()
1985        .map_err(sqlite_error)?;
1986    Ok(row)
1987}
1988
1989fn lease_owner_from_columns(
1990    owner_id: Option<String>,
1991    incarnation_id: Option<String>,
1992    liveness_json: Option<String>,
1993) -> Option<LeaseOwnerIdentity> {
1994    owner_id.map(|owner_id| LeaseOwnerIdentity {
1995        incarnation_id: incarnation_id.unwrap_or_else(|| owner_id.clone()),
1996        owner_id,
1997        liveness: liveness_json
1998            .as_deref()
1999            .and_then(|json| serde_json::from_str(json).ok())
2000            .unwrap_or(LeaseOwnerLiveness::Opaque),
2001    })
2002}
2003
2004fn encode_liveness(liveness: &LeaseOwnerLiveness) -> Result<String, StoreError> {
2005    serde_json::to_string(liveness)
2006        .map_err(|err| StoreError::Backend(format!("failed to encode lease liveness: {err}")))
2007}
2008
2009fn row_to_session_execution_lease(
2010    session_id: &str,
2011    row: SessionExecutionLeaseRow,
2012) -> Result<SessionExecutionLease, StoreError> {
2013    Ok(SessionExecutionLease {
2014        session_id: session_id.to_string(),
2015        owner: row
2016            .owner
2017            .ok_or_else(|| StoreError::Backend("live session lease missing owner".to_string()))?,
2018        lease_token: row.lease_token.ok_or_else(|| {
2019            StoreError::Backend("live session lease missing lease token".to_string())
2020        })?,
2021        fencing_token: row.fencing_token,
2022        claimed_at_epoch_ms: row.claimed_at_ms,
2023        expires_at_epoch_ms: row.expires_at_ms,
2024    })
2025}
2026
2027fn acquire_session_execution_lease_conn(
2028    conn: &Connection,
2029    session_id: &str,
2030    owner: &LeaseOwnerIdentity,
2031    previous_fencing_token: u64,
2032    now: u64,
2033    lease_ttl_ms: u64,
2034) -> Result<SessionExecutionLease, StoreError> {
2035    let fencing_token = previous_fencing_token.saturating_add(1);
2036    let lease_token = format!(
2037        "{}:{}:{}:{now}:{fencing_token}",
2038        session_id, owner.owner_id, owner.incarnation_id
2039    );
2040    let expires_at = now.saturating_add(lease_ttl_ms);
2041    let liveness_json = encode_liveness(&owner.liveness)?;
2042    conn.execute(
2043        "INSERT INTO session_execution_leases (
2044            session_id, lease_owner_id, lease_owner_incarnation_id, lease_owner_liveness_json,
2045            lease_token, lease_fencing_token, lease_claimed_at_ms, lease_expires_at_ms
2046         )
2047         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
2048         ON CONFLICT(session_id) DO UPDATE SET
2049            lease_owner_id = excluded.lease_owner_id,
2050            lease_owner_incarnation_id = excluded.lease_owner_incarnation_id,
2051            lease_owner_liveness_json = excluded.lease_owner_liveness_json,
2052            lease_token = excluded.lease_token,
2053            lease_fencing_token = excluded.lease_fencing_token,
2054            lease_claimed_at_ms = excluded.lease_claimed_at_ms,
2055            lease_expires_at_ms = excluded.lease_expires_at_ms",
2056        params![
2057            session_id,
2058            owner.owner_id,
2059            owner.incarnation_id,
2060            liveness_json,
2061            lease_token,
2062            fencing_token as i64,
2063            now as i64,
2064            expires_at as i64
2065        ],
2066    )
2067    .map_err(sqlite_error)?;
2068    Ok(SessionExecutionLease {
2069        session_id: session_id.to_string(),
2070        owner: owner.clone(),
2071        lease_token,
2072        fencing_token,
2073        claimed_at_epoch_ms: now,
2074        expires_at_epoch_ms: expires_at,
2075    })
2076}
2077
2078fn ensure_session_execution_lease_conn(
2079    conn: &Connection,
2080    session_id: &str,
2081    fence: &SessionExecutionLeaseFence,
2082) -> Result<(), StoreError> {
2083    if fence.session_id != session_id {
2084        return Err(StoreError::SessionExecutionLeaseExpired {
2085            session_id: session_id.to_string(),
2086        });
2087    }
2088    let now = current_epoch_ms();
2089    let current = load_session_execution_lease_row_conn(conn, session_id)?;
2090    let Some(current) = current else {
2091        return Err(StoreError::SessionExecutionLeaseExpired {
2092            session_id: session_id.to_string(),
2093        });
2094    };
2095    if current
2096        .owner
2097        .as_ref()
2098        .is_some_and(|owner| owner.same_incarnation(&fence.owner))
2099        && current.lease_token.as_deref() == Some(fence.lease_token.as_str())
2100        && current.fencing_token == fence.fencing_token
2101        && current.expires_at_ms > now
2102    {
2103        Ok(())
2104    } else {
2105        Err(StoreError::SessionExecutionLeaseExpired {
2106            session_id: session_id.to_string(),
2107        })
2108    }
2109}
2110
2111fn release_session_execution_lease_conn(
2112    conn: &Connection,
2113    completion: &SessionExecutionLeaseCompletion,
2114) -> Result<(), StoreError> {
2115    conn.execute(
2116        "UPDATE session_execution_leases
2117         SET lease_owner_id = NULL,
2118             lease_owner_incarnation_id = NULL,
2119             lease_owner_liveness_json = NULL,
2120             lease_token = NULL,
2121             lease_claimed_at_ms = 0,
2122             lease_expires_at_ms = 0
2123         WHERE session_id = ?1
2124           AND lease_owner_id = ?2
2125           AND lease_owner_incarnation_id = ?3
2126           AND lease_token = ?4
2127           AND lease_fencing_token = ?5",
2128        params![
2129            completion.session_id,
2130            completion.owner.owner_id,
2131            completion.owner.incarnation_id,
2132            completion.lease_token,
2133            completion.fencing_token as i64
2134        ],
2135    )
2136    .map_err(sqlite_error)?;
2137    Ok(())
2138}