Skip to main content

lash_sqlite_store/
lib.rs

1//! # lash-sqlite-store
2//!
3//! The high-performance local **durable** persistence backend for the lash
4//! agent runtime. One SQLite database per session, opened in WAL journal mode
5//! with a 15-second busy timeout, satisfying the full [`RuntimePersistence`] +
6//! [`AttachmentManifest`] contract from `lash-core`.
7//!
8//! This crate is a drop-in replacement for `lash-sqlite-store`: it exposes the
9//! same public surface (`Store`, `SqliteProcessRegistry`,
10//! `SqliteSessionStoreFactory`, `SqliteEffectHost`, the option/descriptor types)
11//! with identical async signatures, so a consumer swaps backends by renaming
12//! the crate path only. The difference is the engine underneath: tokio-rusqlite
13//! over a statically-linked SQLite with real WAL (`-wal`/`-shm` sidecars,
14//! multi-process readers + single writer) instead of the prior store's experimental mvcc.
15//!
16//! ## Why this is "the durable backend" not just "an option"
17//!
18//! Lash's runtime layer treats persistence as a first-class boundary, not a
19//! debug-only convenience. Every primitive that lets the runtime survive a
20//! crash — head-revision CAS, final turn-commit idempotency, attachment
21//! write-ahead manifests, blob content-addressing with optional compression —
22//! is implemented in this crate against SQLite for one reason: SQLite is the
23//! simplest backend that gives us *atomic multi-statement transactions on a
24//! single file* with durability guarantees we can reason about.
25//!
26//! ## Schema cutover, not migrations
27//!
28//! There is exactly one supported schema (see [`schema::SCHEMA`]). Older
29//! databases must be deleted before opening — we do not carry migration code.
30//!
31//! [`RuntimePersistence`]: lash_core::RuntimePersistence
32//! [`AttachmentManifest`]: lash_core::AttachmentManifest
33
34use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46    QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47    prepare_process_registration,
48};
49use lash_core::store::queued_work::{
50    ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id,
51    ensure_completion_owns_all_batches, renewed_claim, select_leading_session_command,
52    select_turn_work_claim_prefix,
53};
54use lash_core::store::{
55    GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
56    RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
57};
58use lash_core::{
59    AbandonRequest, AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry,
60    BlobRef, DeliveryPolicy, DurabilityTier, GcReport, LeaseOwnerIdentity, LeaseOwnerLiveness,
61    MergeKey, PROCESS_LEASE_SCHEMA_VERSION, ProcessAwaitOutput, ProcessEvent,
62    ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
63    ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseClaimOutcome,
64    ProcessLeaseCompletion, ProcessPruneReport, ProcessRecord, ProcessRegistration,
65    ProcessRegistry, ProcessStarted, QueuedWorkStore, RuntimePersistence, SessionCommitStore,
66    SessionExecutionLease, SessionExecutionLeaseClaimOutcome, SessionExecutionLeaseCompletion,
67    SessionExecutionLeaseFence, SessionExecutionLeaseStore, SessionMeta, SessionPickerInfo,
68    SessionReadScope, SessionScope, SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy,
69    StoreError, StoreMaintenance, TurnInputStore, VacuumReport,
70};
71use rusqlite::{Connection, OptionalExtension, Transaction, params};
72use sha2::{Digest, Sha256};
73
74use conn::SqliteConnection;
75
76mod attachments;
77mod blobs;
78mod conn;
79mod effect_replay;
80mod graph;
81mod leases;
82mod lifecycle;
83mod pending_turn_inputs;
84mod persistence;
85mod process_registry;
86mod queued_work;
87mod schema;
88mod triggers;
89
90use conn::TxOutcome;
91pub use effect_replay::{
92    SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
93};
94use leases::*;
95use pending_turn_inputs::*;
96use queued_work::*;
97use schema::{
98    StoreBacking, apply_pragmas, ensure_effect_schema, ensure_process_schema, ensure_schema,
99    ensure_trigger_schema,
100};
101pub use triggers::SqliteTriggerStore;
102
103/// SQLite-backed store for checkpoint blobs, runtime session state, and
104/// Lashlang artifacts.
105///
106/// This is the first-party local implementation of the runtime store traits.
107/// Internally it holds a single cloneable [`SqliteConnection`] (a
108/// tokio-rusqlite handle to one database thread).
109pub struct Store {
110    conn: SqliteConnection,
111    artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
112    options: StoreOptions,
113    commit_count: AtomicU64,
114}
115
116/// SQLite-backed process registry for one configured runtime deployment.
117///
118/// It is intentionally separate from [`Store`]: session databases persist one
119/// conversation, while this registry persists background process state and
120/// handle visibility across all sessions sharing the registry.
121pub struct SqliteProcessRegistry {
122    conn: SqliteConnection,
123}
124
125fn sqlite_error(err: rusqlite::Error) -> StoreError {
126    StoreError::Backend(err.to_string())
127}
128
129fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
130    lash_core::PluginError::Session(err.to_string())
131}
132
133fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
134    lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
135}
136
137fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
138    serde_json::to_string(value).map_err(|err| {
139        lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
140    })
141}
142
143fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
144    futures_executor::block_on(future)
145}
146
147#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
148pub enum PersistedArtifactKind {
149    GenericBlob,
150    CheckpointManifest,
151    ToolState,
152    PluginSessionSnapshot,
153    ExecutionStateSnapshot,
154    LashlangModule,
155    ProcessExecutionEnv,
156}
157
158#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
159pub enum BlobStorageHint {
160    Compressible,
161    InlinePreferred,
162    LargePayload,
163}
164
165#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
166enum BlobCompression {
167    None,
168    Zlib,
169}
170
171#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
172pub struct BlobArtifactDescriptor {
173    pub kind: PersistedArtifactKind,
174    #[serde(default, skip_serializing_if = "Vec::is_empty")]
175    pub hints: Vec<BlobStorageHint>,
176}
177
178impl BlobArtifactDescriptor {
179    pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
180        Self {
181            kind,
182            hints: hints.into(),
183        }
184    }
185
186    pub fn checkpoint_manifest() -> Self {
187        Self::new(
188            PersistedArtifactKind::CheckpointManifest,
189            vec![BlobStorageHint::Compressible],
190        )
191    }
192
193    pub fn tool_state_snapshot() -> Self {
194        Self::new(
195            PersistedArtifactKind::ToolState,
196            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
197        )
198    }
199
200    pub fn plugin_session_snapshot() -> Self {
201        Self::new(
202            PersistedArtifactKind::PluginSessionSnapshot,
203            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
204        )
205    }
206
207    pub fn execution_state_snapshot() -> Self {
208        Self::new(
209            PersistedArtifactKind::ExecutionStateSnapshot,
210            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
211        )
212    }
213
214    pub fn lashlang_module() -> Self {
215        Self::new(
216            PersistedArtifactKind::LashlangModule,
217            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
218        )
219    }
220
221    pub fn process_execution_env() -> Self {
222        Self::new(
223            PersistedArtifactKind::ProcessExecutionEnv,
224            vec![BlobStorageHint::Compressible],
225        )
226    }
227}
228
229#[derive(Clone, Debug, PartialEq, Eq, Hash)]
230struct RetainedArtifactRef {
231    pub blob_ref: BlobRef,
232    pub kind: PersistedArtifactKind,
233}
234
235#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
236pub enum BuiltinBlobProfile {
237    LowLatency,
238    #[default]
239    Balanced,
240    Compact,
241}
242
243#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
244pub struct StoreGcPolicy {
245    pub auto_run_every_commits: Option<u64>,
246}
247
248#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
249pub struct StoreOptions {
250    pub blob_profile: BuiltinBlobProfile,
251    pub gc_policy: StoreGcPolicy,
252}
253
254#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
255struct StoredBlobEnvelope {
256    descriptor: BlobArtifactDescriptor,
257    compression: BlobCompression,
258    content: Vec<u8>,
259}
260
261#[derive(Clone, Debug)]
262pub struct StoredSessionCheckpoint {
263    pub checkpoint_ref: BlobRef,
264    pub manifest: SessionCheckpoint,
265}
266
267/// Explicit first-party factory for one SQLite session database per Lash
268/// session.
269///
270/// Hosts opt into this by passing it to `lash::LashCoreBuilder::store_factory`.
271/// The factory never becomes a default: app storage and runtime storage remain
272/// host-owned decisions.
273#[derive(Clone, Debug)]
274pub struct SqliteSessionStoreFactory {
275    root: PathBuf,
276    options: StoreOptions,
277}
278
279impl SqliteSessionStoreFactory {
280    pub fn new(root: impl Into<PathBuf>) -> Self {
281        Self {
282            root: root.into(),
283            options: StoreOptions::default(),
284        }
285    }
286
287    pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
288        Self {
289            root: root.into(),
290            options,
291        }
292    }
293
294    pub fn path_for_session(&self, session_id: &str) -> PathBuf {
295        self.root.join(safe_session_db_file_name(session_id))
296    }
297}
298
299#[async_trait::async_trait]
300impl SessionStoreFactory for SqliteSessionStoreFactory {
301    fn durability_tier(&self) -> DurabilityTier {
302        DurabilityTier::Durable
303    }
304
305    async fn create_store(
306        &self,
307        request: &SessionStoreCreateRequest,
308    ) -> Result<Arc<dyn RuntimePersistence>, String> {
309        std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
310        let path = self.path_for_session(&request.session_id);
311        let store = Arc::new(
312            Store::open_with_options(&path, self.options)
313                .await
314                .map_err(|err| err.to_string())?,
315        );
316        if store.load_session_meta().await.is_none() {
317            store
318                .save_session_meta(SessionMeta {
319                    session_id: request.session_id.clone(),
320                    session_name: request.session_id.clone(),
321                    created_at: current_timestamp_string(),
322                    model: request.policy.model.id.clone(),
323                    cwd: std::env::current_dir()
324                        .ok()
325                        .and_then(|path| path.to_str().map(str::to_string)),
326                    relation: request.relation.clone(),
327                })
328                .await;
329        }
330        Ok(store as Arc<dyn RuntimePersistence>)
331    }
332
333    async fn open_existing_store(
334        &self,
335        request: &SessionStoreCreateRequest,
336    ) -> Result<Option<Arc<dyn RuntimePersistence>>, String> {
337        let path = self.path_for_session(&request.session_id);
338        if !path.exists() {
339            return Ok(None);
340        }
341        self.create_store(request).await.map(Some)
342    }
343
344    async fn delete_session(&self, session_id: &str) -> Result<(), String> {
345        let db_path = self.path_for_session(session_id);
346        for path in [
347            db_path.clone(),
348            sqlite_sidecar_path(&db_path, "-wal"),
349            sqlite_sidecar_path(&db_path, "-shm"),
350        ] {
351            match std::fs::remove_file(&path) {
352                Ok(()) => {}
353                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
354                Err(err) => {
355                    return Err(format!("remove session store {}: {err}", path.display()));
356                }
357            }
358        }
359        Ok(())
360    }
361}
362
363fn safe_session_db_file_name(session_id: &str) -> String {
364    let mut safe = session_id
365        .chars()
366        .map(|ch| match ch {
367            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
368            _ => '_',
369        })
370        .collect::<String>();
371    safe = safe.trim_matches('_').to_string();
372    if safe.is_empty() {
373        safe.push_str("session");
374    }
375    safe.truncate(80);
376    let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
377    format!("{safe}-{}.db", &hash[..16])
378}
379
380fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
381    let mut sidecar = path.as_os_str().to_os_string();
382    sidecar.push(suffix);
383    PathBuf::from(sidecar)
384}
385
386fn current_timestamp_string() -> String {
387    let now = SystemTime::now()
388        .duration_since(UNIX_EPOCH)
389        .unwrap_or_default();
390    format!("unix:{}", now.as_secs())
391}
392
393fn current_epoch_ms() -> u64 {
394    SystemTime::now()
395        .duration_since(UNIX_EPOCH)
396        .unwrap_or_default()
397        .as_millis() as u64
398}
399
400fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
401    let mut refs = Vec::new();
402    if let Some(blob_ref) = &checkpoint.tool_state_ref {
403        refs.push(RetainedArtifactRef {
404            blob_ref: blob_ref.clone(),
405            kind: PersistedArtifactKind::ToolState,
406        });
407    }
408    if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
409        refs.push(RetainedArtifactRef {
410            blob_ref: blob_ref.clone(),
411            kind: PersistedArtifactKind::PluginSessionSnapshot,
412        });
413    }
414    if let Some(blob_ref) = &checkpoint.execution_state_ref {
415        refs.push(RetainedArtifactRef {
416            blob_ref: blob_ref.clone(),
417            kind: PersistedArtifactKind::ExecutionStateSnapshot,
418        });
419    }
420    refs
421}
422
423fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
424    SessionHeadMeta {
425        schema_version: lash_core::store::SESSION_HEAD_META_SCHEMA_VERSION,
426        session_id: head.session_id.clone(),
427        head_revision: 0,
428        config: head.config.clone(),
429        agent_frames: head.agent_frames.clone(),
430        current_agent_frame_id: head.current_agent_frame_id.clone(),
431        checkpoint_ref: head.checkpoint_ref.clone(),
432        leaf_node_id: head.graph.leaf_node_id.clone(),
433        graph_node_count: head.graph.nodes.len(),
434        token_ledger: Vec::new(),
435    }
436}
437
438fn encode_json<T: serde::Serialize>(value: &T) -> String {
439    serde_json::to_string(value).expect("persisted state should serialize")
440}
441
442fn should_compress_blob(
443    profile: BuiltinBlobProfile,
444    descriptor: &BlobArtifactDescriptor,
445    len: usize,
446) -> bool {
447    if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
448        return false;
449    }
450    match profile {
451        BuiltinBlobProfile::LowLatency => false,
452        BuiltinBlobProfile::Balanced => len >= 4 * 1024,
453        BuiltinBlobProfile::Compact => len >= 1024,
454    }
455}
456
457fn compress_blob(content: &[u8]) -> Vec<u8> {
458    let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
459    std::io::Write::write_all(&mut encoder, content).expect("compress blob");
460    encoder.finish().expect("submit blob compression")
461}
462
463fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
464    let mut decoder = ZlibDecoder::new(content);
465    let mut out = Vec::new();
466    std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
467    Some(out)
468}
469
470fn encode_artifact_blob(
471    descriptor: &BlobArtifactDescriptor,
472    profile: BuiltinBlobProfile,
473    content: &[u8],
474) -> Vec<u8> {
475    let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
476    {
477        (BlobCompression::Zlib, compress_blob(content))
478    } else {
479        (BlobCompression::None, content.to_vec())
480    };
481    encode_msgpack(&StoredBlobEnvelope {
482        descriptor: descriptor.clone(),
483        compression,
484        content: stored_content,
485    })
486}
487
488fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
489    let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
490    match envelope.compression {
491        BlobCompression::None => Some(envelope.content),
492        BlobCompression::Zlib => decompress_blob(&envelope.content),
493    }
494}
495
496/// Read the session head meta off a raw connection. Synchronous because it runs
497/// inside a `conn.call`/`conn.write` closure on the connection thread.
498fn try_load_session_head_meta_from_conn(
499    conn: &Connection,
500) -> Result<Option<SessionHeadMeta>, StoreError> {
501    let row = conn
502        .query_row(
503            "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
504            [],
505            |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
506        )
507        .optional()
508        .map_err(sqlite_error)?;
509    let Some((head_json, head_revision)) = row else {
510        return Ok(None);
511    };
512    let mut meta: SessionHeadMeta = lash_core::store::decode_versioned_json_record(
513        &head_json,
514        "SessionHeadMeta",
515        lash_core::store::SESSION_HEAD_META_SCHEMA_VERSION,
516    )?;
517    meta.head_revision = head_revision as u64;
518    Ok(Some(meta))
519}
520
521fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
522    try_load_session_head_meta_from_conn(conn).ok().flatten()
523}
524
525fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
526    conn.query_row(
527        "SELECT session_id, session_name, created_at, model, cwd, relation_json
528         FROM session_meta WHERE singleton = 1",
529        [],
530        |row| {
531            let relation_json: Option<String> = row.get(5)?;
532            let relation = relation_json
533                .and_then(|json| serde_json::from_str(&json).ok())
534                .unwrap_or_default();
535            Ok(SessionMeta {
536                session_id: row.get(0)?,
537                session_name: row.get(1)?,
538                created_at: row.get(2)?,
539                model: row.get(3)?,
540                cwd: row.get(4)?,
541                relation,
542            })
543        },
544    )
545    .optional()
546    .ok()
547    .flatten()
548}
549
550fn decode_checkpoint(bytes: &[u8]) -> Result<SessionCheckpoint, StoreError> {
551    let value: serde_json::Value = rmp_serde::from_slice(bytes)
552        .map_err(|err| StoreError::Backend(format!("failed to decode SessionCheckpoint: {err}")))?;
553    lash_core::store::ensure_supported_record_schema_version(
554        "SessionCheckpoint",
555        &value,
556        lash_core::store::SESSION_CHECKPOINT_SCHEMA_VERSION,
557    )?;
558    rmp_serde::from_slice(bytes)
559        .map_err(|err| StoreError::Backend(format!("failed to decode SessionCheckpoint: {err}")))
560}
561
562fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
563    // Pre-size the buffer so the per-byte writes inside rmp_serde don't
564    // walk the Vec through 0→4→8→16→32… reallocations on every call.
565    let mut buf = Vec::with_capacity(1024);
566    rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
567    buf
568}
569
570fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
571    rmp_serde::from_slice(bytes).ok()
572}
573
574fn merge_token_ledger_entries(
575    entries: Vec<lash_core::TokenLedgerEntry>,
576) -> Vec<lash_core::TokenLedgerEntry> {
577    let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
578    for entry in entries {
579        if entry.usage.total() == 0 {
580            continue;
581        }
582        if let Some(existing) = merged
583            .iter_mut()
584            .find(|existing| existing.source == entry.source && existing.model == entry.model)
585        {
586            existing.usage.input_tokens += entry.usage.input_tokens;
587            existing.usage.output_tokens += entry.usage.output_tokens;
588            existing.usage.cache_read_input_tokens += entry.usage.cache_read_input_tokens;
589            existing.usage.cache_write_input_tokens += entry.usage.cache_write_input_tokens;
590            existing.usage.reasoning_output_tokens += entry.usage.reasoning_output_tokens;
591        } else {
592            merged.push(entry);
593        }
594    }
595    merged
596}
597
598#[cfg(test)]
599mod tests {
600    use super::*;
601    use lash_core::ProcessInput;
602    use lashlang::LashlangArtifactStore;
603
604    fn registration(id: &str) -> ProcessRegistration {
605        ProcessRegistration::new(
606            id,
607            ProcessInput::External {
608                metadata: serde_json::Value::Null,
609            },
610            lash_core::RecoveryDisposition::ExternallyOwned,
611            lash_core::ProcessProvenance::session(lash_core::SessionScope::new("session")),
612        )
613    }
614
615    #[tokio::test]
616    async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
617        let store = Store::memory().await.expect("memory store");
618        let module =
619            lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
620        let linked = lashlang::LinkedModule::link(
621            module,
622            lashlang::LashlangHostEnvironment::new(
623                lashlang::LashlangHostCatalog::new(),
624                lashlang::LashlangAbilities::all(),
625            ),
626        )
627        .expect("link module");
628
629        store
630            .put_module_artifact(&linked.artifact)
631            .await
632            .expect("put artifact");
633        let restored = store
634            .get_module_artifact(&linked.module_ref)
635            .await
636            .expect("get artifact")
637            .expect("artifact exists");
638
639        assert_eq!(restored.module_ref, linked.module_ref);
640        assert_eq!(
641            restored.process_ref("scan"),
642            linked.artifact.process_ref("scan")
643        );
644    }
645
646    #[tokio::test]
647    async fn sqlite_process_registry_persists_rows_after_reopen() {
648        let dir = tempfile::tempdir().expect("tempdir");
649        let path = dir.path().join("processes.db");
650        {
651            let registry = SqliteProcessRegistry::open(&path)
652                .await
653                .expect("open registry");
654            let session_scope = lash_core::SessionScope::new("session");
655            registry
656                .register_process(registration("proc-persist"))
657                .await
658                .expect("register");
659            registry
660                .grant_handle(
661                    &session_scope,
662                    "proc-persist",
663                    ProcessHandleDescriptor::new(Some("tool"), Some("demo")),
664                )
665                .await
666                .expect("grant");
667            registry
668                .complete_process(
669                    "proc-persist",
670                    ProcessAwaitOutput::Success {
671                        value: serde_json::json!({"ok": true}),
672                        control: None,
673                    },
674                )
675                .await
676                .expect("complete");
677        }
678
679        let registry = Arc::new(
680            SqliteProcessRegistry::open(&path)
681                .await
682                .expect("reopen registry"),
683        ) as Arc<dyn lash_core::ProcessRegistry>;
684        let session_scope = lash_core::SessionScope::new("session");
685        let record = registry
686            .get_process("proc-persist")
687            .await
688            .expect("persisted process");
689
690        assert_eq!(record.originator_scope_id(), session_scope.id().as_str());
691        assert_eq!(
692            record.provenance.originator,
693            lash_core::ProcessOriginator::session(session_scope.clone())
694        );
695        assert_eq!(
696            lash_core::ProcessAwaiter::polling(Arc::clone(&registry))
697                .await_terminal("proc-persist")
698                .await
699                .expect("await persisted"),
700            ProcessAwaitOutput::Success {
701                value: serde_json::json!({"ok": true}),
702                control: None,
703            }
704        );
705        assert_eq!(
706            registry
707                .list_handle_grants(&session_scope)
708                .await
709                .expect("grants")
710                .len(),
711            1
712        );
713    }
714}