Skip to main content

lash_sqlite_store/
lib.rs

1//! # lash-sqlite-store
2//!
3//! The high-performance local **durable** persistence backend for the lash
4//! agent runtime. One SQLite database per session, opened in WAL journal mode
5//! with a 15-second busy timeout, satisfying the full [`RuntimePersistence`] +
6//! [`AttachmentManifest`] contract from `lash-core`.
7//!
8//! This crate is a drop-in replacement for `lash-sqlite-store`: it exposes the
9//! same public surface (`Store`, `SqliteProcessRegistry`,
10//! `SqliteSessionStoreFactory`, `SqliteEffectHost`, the option/descriptor types)
11//! with identical async signatures, so a consumer swaps backends by renaming
12//! the crate path only. The difference is the engine underneath: tokio-rusqlite
13//! over a statically-linked SQLite with real WAL (`-wal`/`-shm` sidecars,
14//! multi-process readers + single writer) instead of the prior store's experimental mvcc.
15//!
16//! ## Why this is "the durable backend" not just "an option"
17//!
18//! Lash's runtime layer treats persistence as a first-class boundary, not a
19//! debug-only convenience. Every primitive that lets the runtime survive a
20//! crash — head-revision CAS, final turn-commit idempotency, attachment
21//! write-ahead manifests, blob content-addressing with optional compression —
22//! is implemented in this crate against SQLite for one reason: SQLite is the
23//! simplest backend that gives us *atomic multi-statement transactions on a
24//! single file* with durability guarantees we can reason about.
25//!
26//! ## Schema cutover, not migrations
27//!
28//! There is exactly one supported schema (see [`schema::SCHEMA`]). Older
29//! databases must be deleted before opening — we do not carry migration code.
30//!
31//! [`RuntimePersistence`]: lash_core::RuntimePersistence
32//! [`AttachmentManifest`]: lash_core::AttachmentManifest
33
34use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46    QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47    prepare_process_registration,
48};
49use lash_core::store::queued_work::{
50    ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id,
51    ensure_completion_owns_all_batches, renewed_claim, select_leading_session_command,
52    select_turn_work_claim_prefix,
53};
54use lash_core::store::{
55    GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
56    RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
57};
58use lash_core::{
59    AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
60    DeliveryPolicy, DurabilityTier, GcReport, LeaseOwnerIdentity, LeaseOwnerLiveness, MergeKey,
61    PROCESS_LEASE_SCHEMA_VERSION, ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest,
62    ProcessEventAppendResult, ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant,
63    ProcessLease, ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
64    RuntimePersistence, SessionExecutionLease, SessionExecutionLeaseClaimOutcome,
65    SessionExecutionLeaseCompletion, SessionExecutionLeaseFence, SessionMeta, SessionPickerInfo,
66    SessionReadScope, SessionScope, SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy,
67    StoreError, VacuumReport,
68};
69use rusqlite::{Connection, OptionalExtension, Transaction, params};
70use sha2::{Digest, Sha256};
71
72use conn::SqliteConnection;
73
74mod attachments;
75mod blobs;
76mod conn;
77mod effect_replay;
78mod graph;
79mod leases;
80mod lifecycle;
81mod pending_turn_inputs;
82mod persistence;
83mod process_registry;
84mod queued_work;
85mod schema;
86mod triggers;
87
88use conn::TxOutcome;
89pub use effect_replay::{
90    SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
91};
92use leases::*;
93use pending_turn_inputs::*;
94use queued_work::*;
95use schema::{
96    StoreBacking, apply_pragmas, ensure_effect_schema, ensure_process_schema, ensure_schema,
97    ensure_trigger_schema,
98};
99pub use triggers::SqliteTriggerStore;
100
101/// SQLite-backed store for checkpoint blobs, runtime session state, and
102/// Lashlang artifacts.
103///
104/// This is the first-party local implementation of the runtime store traits.
105/// Internally it holds a single cloneable [`SqliteConnection`] (a
106/// tokio-rusqlite handle to one database thread).
107pub struct Store {
108    conn: SqliteConnection,
109    artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
110    options: StoreOptions,
111    commit_count: AtomicU64,
112}
113
114/// SQLite-backed process registry for one configured runtime deployment.
115///
116/// It is intentionally separate from [`Store`]: session databases persist one
117/// conversation, while this registry persists background process state and
118/// handle visibility across all sessions sharing the registry.
119pub struct SqliteProcessRegistry {
120    conn: SqliteConnection,
121    notify: tokio::sync::Notify,
122}
123
124fn sqlite_error(err: rusqlite::Error) -> StoreError {
125    StoreError::Backend(err.to_string())
126}
127
128fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
129    lash_core::PluginError::Session(err.to_string())
130}
131
132fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
133    lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
134}
135
136fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
137    serde_json::to_string(value).map_err(|err| {
138        lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
139    })
140}
141
142fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
143    futures_executor::block_on(future)
144}
145
146#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
147pub enum PersistedArtifactKind {
148    GenericBlob,
149    CheckpointManifest,
150    ToolState,
151    PluginSessionSnapshot,
152    ExecutionStateSnapshot,
153    LashlangModule,
154    ProcessExecutionEnv,
155}
156
157#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
158pub enum BlobStorageHint {
159    Compressible,
160    InlinePreferred,
161    LargePayload,
162}
163
164#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
165enum BlobCompression {
166    None,
167    Zlib,
168}
169
170#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
171pub struct BlobArtifactDescriptor {
172    pub kind: PersistedArtifactKind,
173    #[serde(default, skip_serializing_if = "Vec::is_empty")]
174    pub hints: Vec<BlobStorageHint>,
175}
176
177impl BlobArtifactDescriptor {
178    pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
179        Self {
180            kind,
181            hints: hints.into(),
182        }
183    }
184
185    pub fn checkpoint_manifest() -> Self {
186        Self::new(
187            PersistedArtifactKind::CheckpointManifest,
188            vec![BlobStorageHint::Compressible],
189        )
190    }
191
192    pub fn tool_state_snapshot() -> Self {
193        Self::new(
194            PersistedArtifactKind::ToolState,
195            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
196        )
197    }
198
199    pub fn plugin_session_snapshot() -> Self {
200        Self::new(
201            PersistedArtifactKind::PluginSessionSnapshot,
202            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
203        )
204    }
205
206    pub fn execution_state_snapshot() -> Self {
207        Self::new(
208            PersistedArtifactKind::ExecutionStateSnapshot,
209            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
210        )
211    }
212
213    pub fn lashlang_module() -> Self {
214        Self::new(
215            PersistedArtifactKind::LashlangModule,
216            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
217        )
218    }
219
220    pub fn process_execution_env() -> Self {
221        Self::new(
222            PersistedArtifactKind::ProcessExecutionEnv,
223            vec![BlobStorageHint::Compressible],
224        )
225    }
226}
227
228#[derive(Clone, Debug, PartialEq, Eq, Hash)]
229struct RetainedArtifactRef {
230    pub blob_ref: BlobRef,
231    pub kind: PersistedArtifactKind,
232}
233
234#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
235pub enum BuiltinBlobProfile {
236    LowLatency,
237    #[default]
238    Balanced,
239    Compact,
240}
241
242#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
243pub struct StoreGcPolicy {
244    pub auto_run_every_commits: Option<u64>,
245}
246
247#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
248pub struct StoreOptions {
249    pub blob_profile: BuiltinBlobProfile,
250    pub gc_policy: StoreGcPolicy,
251}
252
253#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
254struct StoredBlobEnvelope {
255    descriptor: BlobArtifactDescriptor,
256    compression: BlobCompression,
257    content: Vec<u8>,
258}
259
260#[derive(Clone, Debug)]
261pub struct StoredSessionCheckpoint {
262    pub checkpoint_ref: BlobRef,
263    pub manifest: SessionCheckpoint,
264}
265
266/// Explicit first-party factory for one SQLite session database per Lash
267/// session.
268///
269/// Hosts opt into this by passing it to `lash::LashCoreBuilder::store_factory`.
270/// The factory never becomes a default: app storage and runtime storage remain
271/// host-owned decisions.
272#[derive(Clone, Debug)]
273pub struct SqliteSessionStoreFactory {
274    root: PathBuf,
275    options: StoreOptions,
276}
277
278impl SqliteSessionStoreFactory {
279    pub fn new(root: impl Into<PathBuf>) -> Self {
280        Self {
281            root: root.into(),
282            options: StoreOptions::default(),
283        }
284    }
285
286    pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
287        Self {
288            root: root.into(),
289            options,
290        }
291    }
292
293    pub fn path_for_session(&self, session_id: &str) -> PathBuf {
294        self.root.join(safe_session_db_file_name(session_id))
295    }
296}
297
298#[async_trait::async_trait]
299impl SessionStoreFactory for SqliteSessionStoreFactory {
300    fn durability_tier(&self) -> DurabilityTier {
301        DurabilityTier::Durable
302    }
303
304    async fn create_store(
305        &self,
306        request: &SessionStoreCreateRequest,
307    ) -> Result<Arc<dyn RuntimePersistence>, String> {
308        std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
309        let path = self.path_for_session(&request.session_id);
310        let store = Arc::new(
311            Store::open_with_options(&path, self.options)
312                .await
313                .map_err(|err| err.to_string())?,
314        );
315        if store.load_session_meta().await.is_none() {
316            store
317                .save_session_meta(SessionMeta {
318                    session_id: request.session_id.clone(),
319                    session_name: request.session_id.clone(),
320                    created_at: current_timestamp_string(),
321                    model: request.policy.model.id.clone(),
322                    cwd: std::env::current_dir()
323                        .ok()
324                        .and_then(|path| path.to_str().map(str::to_string)),
325                    relation: request.relation.clone(),
326                })
327                .await;
328        }
329        Ok(store as Arc<dyn RuntimePersistence>)
330    }
331
332    async fn open_existing_store(
333        &self,
334        request: &SessionStoreCreateRequest,
335    ) -> Result<Option<Arc<dyn RuntimePersistence>>, String> {
336        let path = self.path_for_session(&request.session_id);
337        if !path.exists() {
338            return Ok(None);
339        }
340        self.create_store(request).await.map(Some)
341    }
342
343    async fn delete_session(&self, session_id: &str) -> Result<(), String> {
344        let db_path = self.path_for_session(session_id);
345        for path in [
346            db_path.clone(),
347            sqlite_sidecar_path(&db_path, "-wal"),
348            sqlite_sidecar_path(&db_path, "-shm"),
349        ] {
350            match std::fs::remove_file(&path) {
351                Ok(()) => {}
352                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
353                Err(err) => {
354                    return Err(format!("remove session store {}: {err}", path.display()));
355                }
356            }
357        }
358        Ok(())
359    }
360}
361
362fn safe_session_db_file_name(session_id: &str) -> String {
363    let mut safe = session_id
364        .chars()
365        .map(|ch| match ch {
366            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
367            _ => '_',
368        })
369        .collect::<String>();
370    safe = safe.trim_matches('_').to_string();
371    if safe.is_empty() {
372        safe.push_str("session");
373    }
374    safe.truncate(80);
375    let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
376    format!("{safe}-{}.db", &hash[..16])
377}
378
379fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
380    let mut sidecar = path.as_os_str().to_os_string();
381    sidecar.push(suffix);
382    PathBuf::from(sidecar)
383}
384
385fn current_timestamp_string() -> String {
386    let now = SystemTime::now()
387        .duration_since(UNIX_EPOCH)
388        .unwrap_or_default();
389    format!("unix:{}", now.as_secs())
390}
391
392fn current_epoch_ms() -> u64 {
393    SystemTime::now()
394        .duration_since(UNIX_EPOCH)
395        .unwrap_or_default()
396        .as_millis() as u64
397}
398
399fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
400    let mut refs = Vec::new();
401    if let Some(blob_ref) = &checkpoint.tool_state_ref {
402        refs.push(RetainedArtifactRef {
403            blob_ref: blob_ref.clone(),
404            kind: PersistedArtifactKind::ToolState,
405        });
406    }
407    if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
408        refs.push(RetainedArtifactRef {
409            blob_ref: blob_ref.clone(),
410            kind: PersistedArtifactKind::PluginSessionSnapshot,
411        });
412    }
413    if let Some(blob_ref) = &checkpoint.execution_state_ref {
414        refs.push(RetainedArtifactRef {
415            blob_ref: blob_ref.clone(),
416            kind: PersistedArtifactKind::ExecutionStateSnapshot,
417        });
418    }
419    refs
420}
421
422fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
423    SessionHeadMeta {
424        schema_version: lash_core::store::SESSION_HEAD_META_SCHEMA_VERSION,
425        session_id: head.session_id.clone(),
426        head_revision: 0,
427        config: head.config.clone(),
428        agent_frames: head.agent_frames.clone(),
429        current_agent_frame_id: head.current_agent_frame_id.clone(),
430        checkpoint_ref: head.checkpoint_ref.clone(),
431        leaf_node_id: head.graph.leaf_node_id.clone(),
432        graph_node_count: head.graph.nodes.len(),
433        token_ledger: Vec::new(),
434    }
435}
436
437fn encode_json<T: serde::Serialize>(value: &T) -> String {
438    serde_json::to_string(value).expect("persisted state should serialize")
439}
440
441fn should_compress_blob(
442    profile: BuiltinBlobProfile,
443    descriptor: &BlobArtifactDescriptor,
444    len: usize,
445) -> bool {
446    if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
447        return false;
448    }
449    match profile {
450        BuiltinBlobProfile::LowLatency => false,
451        BuiltinBlobProfile::Balanced => len >= 4 * 1024,
452        BuiltinBlobProfile::Compact => len >= 1024,
453    }
454}
455
456fn compress_blob(content: &[u8]) -> Vec<u8> {
457    let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
458    std::io::Write::write_all(&mut encoder, content).expect("compress blob");
459    encoder.finish().expect("submit blob compression")
460}
461
462fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
463    let mut decoder = ZlibDecoder::new(content);
464    let mut out = Vec::new();
465    std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
466    Some(out)
467}
468
469fn encode_artifact_blob(
470    descriptor: &BlobArtifactDescriptor,
471    profile: BuiltinBlobProfile,
472    content: &[u8],
473) -> Vec<u8> {
474    let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
475    {
476        (BlobCompression::Zlib, compress_blob(content))
477    } else {
478        (BlobCompression::None, content.to_vec())
479    };
480    encode_msgpack(&StoredBlobEnvelope {
481        descriptor: descriptor.clone(),
482        compression,
483        content: stored_content,
484    })
485}
486
487fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
488    let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
489    match envelope.compression {
490        BlobCompression::None => Some(envelope.content),
491        BlobCompression::Zlib => decompress_blob(&envelope.content),
492    }
493}
494
495/// Read the session head meta off a raw connection. Synchronous because it runs
496/// inside a `conn.call`/`conn.write` closure on the connection thread.
497fn try_load_session_head_meta_from_conn(
498    conn: &Connection,
499) -> Result<Option<SessionHeadMeta>, StoreError> {
500    let row = conn
501        .query_row(
502            "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
503            [],
504            |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
505        )
506        .optional()
507        .map_err(sqlite_error)?;
508    let Some((head_json, head_revision)) = row else {
509        return Ok(None);
510    };
511    let mut meta: SessionHeadMeta = lash_core::store::decode_versioned_json_record(
512        &head_json,
513        "SessionHeadMeta",
514        lash_core::store::SESSION_HEAD_META_SCHEMA_VERSION,
515    )?;
516    meta.head_revision = head_revision as u64;
517    Ok(Some(meta))
518}
519
520fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
521    try_load_session_head_meta_from_conn(conn).ok().flatten()
522}
523
524fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
525    conn.query_row(
526        "SELECT session_id, session_name, created_at, model, cwd, relation_json
527         FROM session_meta WHERE singleton = 1",
528        [],
529        |row| {
530            let relation_json: Option<String> = row.get(5)?;
531            let relation = relation_json
532                .and_then(|json| serde_json::from_str(&json).ok())
533                .unwrap_or_default();
534            Ok(SessionMeta {
535                session_id: row.get(0)?,
536                session_name: row.get(1)?,
537                created_at: row.get(2)?,
538                model: row.get(3)?,
539                cwd: row.get(4)?,
540                relation,
541            })
542        },
543    )
544    .optional()
545    .ok()
546    .flatten()
547}
548
549fn decode_checkpoint(bytes: &[u8]) -> Result<SessionCheckpoint, StoreError> {
550    let value: serde_json::Value = rmp_serde::from_slice(bytes)
551        .map_err(|err| StoreError::Backend(format!("failed to decode SessionCheckpoint: {err}")))?;
552    lash_core::store::ensure_supported_record_schema_version(
553        "SessionCheckpoint",
554        &value,
555        lash_core::store::SESSION_CHECKPOINT_SCHEMA_VERSION,
556    )?;
557    rmp_serde::from_slice(bytes)
558        .map_err(|err| StoreError::Backend(format!("failed to decode SessionCheckpoint: {err}")))
559}
560
561fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
562    // Pre-size the buffer so the per-byte writes inside rmp_serde don't
563    // walk the Vec through 0→4→8→16→32… reallocations on every call.
564    let mut buf = Vec::with_capacity(1024);
565    rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
566    buf
567}
568
569fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
570    rmp_serde::from_slice(bytes).ok()
571}
572
573fn merge_token_ledger_entries(
574    entries: Vec<lash_core::TokenLedgerEntry>,
575) -> Vec<lash_core::TokenLedgerEntry> {
576    let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
577    for entry in entries {
578        if entry.usage.total() == 0 {
579            continue;
580        }
581        if let Some(existing) = merged
582            .iter_mut()
583            .find(|existing| existing.source == entry.source && existing.model == entry.model)
584        {
585            existing.usage.input_tokens += entry.usage.input_tokens;
586            existing.usage.output_tokens += entry.usage.output_tokens;
587            existing.usage.cache_read_input_tokens += entry.usage.cache_read_input_tokens;
588            existing.usage.cache_write_input_tokens += entry.usage.cache_write_input_tokens;
589            existing.usage.reasoning_output_tokens += entry.usage.reasoning_output_tokens;
590        } else {
591            merged.push(entry);
592        }
593    }
594    merged
595}
596
597#[cfg(test)]
598mod tests {
599    use super::*;
600    use lash_core::ProcessInput;
601    use lashlang::LashlangArtifactStore;
602
603    fn registration(id: &str) -> ProcessRegistration {
604        ProcessRegistration::new(
605            id,
606            ProcessInput::External {
607                metadata: serde_json::Value::Null,
608            },
609            lash_core::ProcessProvenance::session(lash_core::SessionScope::new("session")),
610        )
611    }
612
613    #[tokio::test]
614    async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
615        let store = Store::memory().await.expect("memory store");
616        let module =
617            lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
618        let linked = lashlang::LinkedModule::link(
619            module,
620            lashlang::LashlangHostEnvironment::new(
621                lashlang::LashlangHostCatalog::new(),
622                lashlang::LashlangAbilities::all(),
623            ),
624        )
625        .expect("link module");
626
627        store
628            .put_module_artifact(&linked.artifact)
629            .await
630            .expect("put artifact");
631        let restored = store
632            .get_module_artifact(&linked.module_ref)
633            .await
634            .expect("get artifact")
635            .expect("artifact exists");
636
637        assert_eq!(restored.module_ref, linked.module_ref);
638        assert_eq!(
639            restored.process_ref("scan"),
640            linked.artifact.process_ref("scan")
641        );
642    }
643
644    #[tokio::test]
645    async fn sqlite_process_registry_persists_rows_after_reopen() {
646        let dir = tempfile::tempdir().expect("tempdir");
647        let path = dir.path().join("processes.db");
648        {
649            let registry = SqliteProcessRegistry::open(&path)
650                .await
651                .expect("open registry");
652            let session_scope = lash_core::SessionScope::new("session");
653            registry
654                .register_process(registration("proc-persist"))
655                .await
656                .expect("register");
657            registry
658                .grant_handle(
659                    &session_scope,
660                    "proc-persist",
661                    ProcessHandleDescriptor::new(Some("tool"), Some("demo")),
662                )
663                .await
664                .expect("grant");
665            registry
666                .complete_process(
667                    "proc-persist",
668                    ProcessAwaitOutput::Success {
669                        value: serde_json::json!({"ok": true}),
670                        control: None,
671                    },
672                )
673                .await
674                .expect("complete");
675        }
676
677        let registry = SqliteProcessRegistry::open(&path)
678            .await
679            .expect("reopen registry");
680        let session_scope = lash_core::SessionScope::new("session");
681        let record = registry
682            .get_process("proc-persist")
683            .await
684            .expect("persisted process");
685
686        assert_eq!(record.originator_scope_id(), session_scope.id().as_str());
687        assert_eq!(
688            record.provenance.originator,
689            lash_core::ProcessOriginator::session(session_scope.clone())
690        );
691        assert_eq!(
692            registry
693                .await_process("proc-persist")
694                .await
695                .expect("await persisted"),
696            ProcessAwaitOutput::Success {
697                value: serde_json::json!({"ok": true}),
698                control: None,
699            }
700        );
701        assert_eq!(
702            registry
703                .list_handle_grants(&session_scope)
704                .await
705                .expect("grants")
706                .len(),
707            1
708        );
709    }
710}