Skip to main content

lash_sqlite_store/
lib.rs

1//! # lash-sqlite-store
2//!
3//! The high-performance local **durable** persistence backend for the lash
4//! agent runtime. One SQLite database per session, opened in WAL journal mode
5//! with a 15-second busy timeout, satisfying the full [`RuntimePersistence`] +
6//! [`AttachmentManifest`] contract from `lash-core`.
7//!
8//! This crate is a drop-in replacement for `lash-sqlite-store`: it exposes the
9//! same public surface (`Store`, `SqliteProcessRegistry`,
10//! `SqliteSessionStoreFactory`, `SqliteEffectHost`, the option/descriptor types)
11//! with identical async signatures, so a consumer swaps backends by renaming
12//! the crate path only. The difference is the engine underneath: tokio-rusqlite
13//! over a statically-linked SQLite with real WAL (`-wal`/`-shm` sidecars,
//! multi-process readers + single writer) instead of the prior store's experimental MVCC.
15//!
16//! ## Why this is "the durable backend" not just "an option"
17//!
18//! Lash's runtime layer treats persistence as a first-class boundary, not a
19//! debug-only convenience. Every primitive that lets the runtime survive a
20//! crash — head-revision CAS, final turn-commit idempotency, attachment
21//! write-ahead manifests, blob content-addressing with optional compression —
22//! is implemented in this crate against SQLite for one reason: SQLite is the
23//! simplest backend that gives us *atomic multi-statement transactions on a
24//! single file* with durability guarantees we can reason about.
25//!
26//! ## Schema cutover, not migrations
27//!
28//! There is exactly one supported schema (see [`schema::SCHEMA`]). Older
29//! databases must be deleted before opening — we do not carry migration code.
30//!
31//! [`RuntimePersistence`]: lash_core::RuntimePersistence
32//! [`AttachmentManifest`]: lash_core::AttachmentManifest
33
34use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46    QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47    prepare_process_registration,
48};
49use lash_core::store::queued_work::{
50    ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id,
51    ensure_completion_owns_all_batches, renewed_claim, select_claim_prefix,
52};
53use lash_core::store::{
54    GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
55    RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
56};
57use lash_core::{
58    AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
59    DeliveryPolicy, DurabilityTier, GcReport, MergeKey, PROCESS_LEASE_SCHEMA_VERSION,
60    ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
61    ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease,
62    ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
63    RuntimePersistence, SessionMeta, SessionPickerInfo, SessionReadScope, SessionScope,
64    SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy, StoreError, VacuumReport,
65};
66use rusqlite::{Connection, OptionalExtension, Transaction, params};
67use sha2::{Digest, Sha256};
68
69use conn::SqliteConnection;
70
71mod attachments;
72mod blobs;
73mod conn;
74mod effect_replay;
75mod graph;
76mod host_events;
77mod leases;
78mod lifecycle;
79mod persistence;
80mod process_registry;
81mod queued_work;
82mod schema;
83
84use conn::TxOutcome;
85pub use effect_replay::{
86    SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
87};
88pub use host_events::SqliteHostEventStore;
89use leases::*;
90use queued_work::*;
91use schema::{
92    StoreBacking, apply_pragmas, ensure_effect_schema, ensure_host_event_schema,
93    ensure_process_schema, ensure_schema,
94};
95
/// SQLite-backed store for checkpoint blobs and the canonical session head.
///
/// The struct name and every public method match `lash_sqlite_store::Store`
/// exactly so consumers swap backends with a path rename. Internally it holds a
/// single cloneable [`SqliteConnection`] (a tokio-rusqlite handle to one
/// database thread) rather than the prior store's `tokio::sync::Mutex<rusqlite::Connection>`.
pub struct Store {
    /// Handle to this session's database connection.
    conn: SqliteConnection,
    /// In-process cache of lashlang module artifacts, keyed by module ref.
    artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
    /// Blob profile and GC policy supplied when the store was opened.
    options: StoreOptions,
    /// Commit counter. NOTE(review): presumably compared against
    /// `StoreGcPolicy::auto_run_every_commits` — confirm in the commit path.
    commit_count: AtomicU64,
}
108
/// SQLite-backed process registry for one configured runtime deployment.
///
/// Named `SqliteProcessRegistry` so the path-rename swap keeps compiling; this is
/// the SQLite implementation. It is intentionally separate from [`Store`]:
/// session databases persist one conversation, while this registry persists
/// background process state and handle visibility across all sessions in the
/// same host profile.
pub struct SqliteProcessRegistry {
    /// Handle to the registry database connection.
    conn: SqliteConnection,
    /// In-process notifier. NOTE(review): presumably wakes tasks awaiting
    /// process completion — confirm in the `process_registry` module.
    notify: tokio::sync::Notify,
}
120
/// Converts a `rusqlite` error into [`StoreError::Backend`], keeping only its
/// display text.
fn sqlite_error(err: rusqlite::Error) -> StoreError {
    StoreError::Backend(err.to_string())
}
124
/// Converts a `rusqlite` error into [`lash_core::PluginError::Session`],
/// keeping only its display text.
fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
    lash_core::PluginError::Session(err.to_string())
}
128
/// Wraps a JSON decode failure as [`lash_core::PluginError::Session`], with a
/// message saying a process registry row could not be decoded.
fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
    lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
}
132
133fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
134    serde_json::to_string(value).map_err(|err| {
135        lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
136    })
137}
138
/// Drives `future` to completion on the calling thread via
/// `futures_executor::block_on` and returns its output.
fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
    futures_executor::block_on(future)
}
142
/// Category tag recorded alongside every persisted blob (see
/// [`BlobArtifactDescriptor`]).
///
/// The enum is serde-serializable, so renaming or removing variants affects
/// previously stored data.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum PersistedArtifactKind {
    /// Blob with no more specific category.
    GenericBlob,
    /// Session checkpoint manifest.
    CheckpointManifest,
    /// Tool state snapshot referenced from a checkpoint.
    ToolState,
    /// Plugin session snapshot referenced from a checkpoint.
    PluginSessionSnapshot,
    /// Execution state snapshot referenced from a checkpoint.
    ExecutionStateSnapshot,
    /// Lashlang module artifact.
    LashlangModule,
    /// Process execution environment.
    ProcessExecutionEnv,
}
153
/// Storage hints attached to a [`BlobArtifactDescriptor`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum BlobStorageHint {
    /// The blob may be zlib-compressed; `should_compress_blob` never
    /// compresses a blob that lacks this hint.
    Compressible,
    /// NOTE(review): not consulted by the code in this file — meaning to be
    /// confirmed against the blob storage module.
    InlinePreferred,
    /// NOTE(review): not consulted by the code in this file — meaning to be
    /// confirmed against the blob storage module.
    LargePayload,
}
160
/// Compression applied to the `content` of a `StoredBlobEnvelope`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
enum BlobCompression {
    /// Content is stored exactly as supplied.
    None,
    /// Content is a zlib stream and must be inflated on read.
    Zlib,
}
166
/// Describes a persisted blob: what kind of artifact it is and how it prefers
/// to be stored.
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct BlobArtifactDescriptor {
    /// Artifact category.
    pub kind: PersistedArtifactKind,
    /// Storage hints. Omitted from the serialized form when empty and
    /// defaulted to empty when absent on read.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub hints: Vec<BlobStorageHint>,
}
173
174impl BlobArtifactDescriptor {
175    pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
176        Self {
177            kind,
178            hints: hints.into(),
179        }
180    }
181
182    pub fn checkpoint_manifest() -> Self {
183        Self::new(
184            PersistedArtifactKind::CheckpointManifest,
185            vec![BlobStorageHint::Compressible],
186        )
187    }
188
189    pub fn tool_state_snapshot() -> Self {
190        Self::new(
191            PersistedArtifactKind::ToolState,
192            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
193        )
194    }
195
196    pub fn plugin_session_snapshot() -> Self {
197        Self::new(
198            PersistedArtifactKind::PluginSessionSnapshot,
199            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
200        )
201    }
202
203    pub fn execution_state_snapshot() -> Self {
204        Self::new(
205            PersistedArtifactKind::ExecutionStateSnapshot,
206            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
207        )
208    }
209
210    pub fn lashlang_module() -> Self {
211        Self::new(
212            PersistedArtifactKind::LashlangModule,
213            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
214        )
215    }
216
217    pub fn process_execution_env() -> Self {
218        Self::new(
219            PersistedArtifactKind::ProcessExecutionEnv,
220            vec![BlobStorageHint::Compressible],
221        )
222    }
223}
224
/// A blob reference that a checkpoint keeps alive, paired with the artifact
/// kind it refers to (produced by `retained_artifact_refs`).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct RetainedArtifactRef {
    /// Reference to the retained blob.
    pub blob_ref: BlobRef,
    /// Kind of artifact the blob holds.
    pub kind: PersistedArtifactKind,
}
230
/// Built-in compression profiles, consulted by `should_compress_blob` for
/// blobs that carry [`BlobStorageHint::Compressible`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum BuiltinBlobProfile {
    /// Never compress.
    LowLatency,
    /// Compress blobs of at least 4 KiB. This is the default profile.
    #[default]
    Balanced,
    /// Compress blobs of at least 1 KiB.
    Compact,
}
238
/// Garbage-collection policy for a [`Store`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreGcPolicy {
    /// NOTE(review): presumably runs GC automatically every N commits, with
    /// `None` (the default) disabling automatic runs — confirm against the
    /// commit path, which is not visible here.
    pub auto_run_every_commits: Option<u64>,
}
243
/// Options supplied when opening a [`Store`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreOptions {
    /// Compression profile used when encoding blobs.
    pub blob_profile: BuiltinBlobProfile,
    /// Garbage-collection policy.
    pub gc_policy: StoreGcPolicy,
}
249
/// Serialized wrapper around a blob's bytes. `encode_artifact_blob` writes it
/// as MessagePack and `decode_artifact_blob` reads it back.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct StoredBlobEnvelope {
    /// Descriptor the blob was stored with.
    descriptor: BlobArtifactDescriptor,
    /// How `content` is encoded.
    compression: BlobCompression,
    /// Blob bytes, compressed according to `compression`.
    content: Vec<u8>,
}
256
/// A session checkpoint manifest paired with the blob reference that names it.
#[derive(Clone, Debug)]
pub struct StoredSessionCheckpoint {
    /// Blob reference for the checkpoint.
    pub checkpoint_ref: BlobRef,
    /// The checkpoint manifest itself.
    pub manifest: SessionCheckpoint,
}
262
/// Explicit first-party factory for one SQLite session database per Lash
/// session.
///
/// Named `SqliteSessionStoreFactory` so the path-rename swap keeps compiling.
/// Hosts opt into this by passing it to `lash::LashCoreBuilder::store_factory`.
/// The factory never becomes a default: app storage and runtime storage remain
/// host-owned decisions.
#[derive(Clone, Debug)]
pub struct SqliteSessionStoreFactory {
    /// Directory that holds the per-session database files.
    root: PathBuf,
    /// Options applied to every store this factory opens.
    options: StoreOptions,
}
275
276impl SqliteSessionStoreFactory {
277    pub fn new(root: impl Into<PathBuf>) -> Self {
278        Self {
279            root: root.into(),
280            options: StoreOptions::default(),
281        }
282    }
283
284    pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
285        Self {
286            root: root.into(),
287            options,
288        }
289    }
290
291    pub fn path_for_session(&self, session_id: &str) -> PathBuf {
292        self.root.join(safe_session_db_file_name(session_id))
293    }
294}
295
#[async_trait::async_trait]
impl SessionStoreFactory for SqliteSessionStoreFactory {
    /// Stores produced by this factory are reported as
    /// [`DurabilityTier::Durable`].
    fn durability_tier(&self) -> DurabilityTier {
        DurabilityTier::Durable
    }

    /// Opens the database for `request.session_id`, first making sure the
    /// root directory exists.
    ///
    /// If the store has no session meta yet, an initial [`SessionMeta`] is
    /// saved: the session name defaults to the session id, the model comes
    /// from the request policy, and `cwd` is the process working directory
    /// when it can be represented as UTF-8.
    ///
    /// # Errors
    ///
    /// Directory-creation and store-open failures are returned as their
    /// display strings.
    async fn create_store(
        &self,
        request: &SessionStoreCreateRequest,
    ) -> Result<Arc<dyn RuntimePersistence>, String> {
        std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
        let path = self.path_for_session(&request.session_id);
        let store = Arc::new(
            Store::open_with_options(&path, self.options)
                .await
                .map_err(|err| err.to_string())?,
        );
        // Only seed meta for a store that has none, so reopening an existing
        // session does not overwrite what is already persisted.
        if store.load_session_meta().await.is_none() {
            store
                .save_session_meta(SessionMeta {
                    session_id: request.session_id.clone(),
                    session_name: request.session_id.clone(),
                    created_at: current_timestamp_string(),
                    model: request.policy.model.id.clone(),
                    cwd: std::env::current_dir()
                        .ok()
                        .and_then(|path| path.to_str().map(str::to_string)),
                    relation: request.relation.clone(),
                })
                .await;
        }
        Ok(store as Arc<dyn RuntimePersistence>)
    }

    /// Opens the session's store only if its database file already exists.
    ///
    /// Returns `Ok(None)` when the file is missing; otherwise delegates to
    /// [`Self::create_store`].
    async fn open_existing_store(
        &self,
        request: &SessionStoreCreateRequest,
    ) -> Result<Option<Arc<dyn RuntimePersistence>>, String> {
        let path = self.path_for_session(&request.session_id);
        if !path.exists() {
            return Ok(None);
        }
        self.create_store(request).await.map(Some)
    }

    /// Removes the session's database file and its `-wal` / `-shm` sidecars.
    ///
    /// Files that are already missing are ignored. Any other I/O failure
    /// aborts the deletion and is returned with the offending path.
    async fn delete_session(&self, session_id: &str) -> Result<(), String> {
        let db_path = self.path_for_session(session_id);
        for path in [
            db_path.clone(),
            sqlite_sidecar_path(&db_path, "-wal"),
            sqlite_sidecar_path(&db_path, "-shm"),
        ] {
            match std::fs::remove_file(&path) {
                Ok(()) => {}
                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
                Err(err) => {
                    return Err(format!("remove session store {}: {err}", path.display()));
                }
            }
        }
        Ok(())
    }
}
359
360fn safe_session_db_file_name(session_id: &str) -> String {
361    let mut safe = session_id
362        .chars()
363        .map(|ch| match ch {
364            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
365            _ => '_',
366        })
367        .collect::<String>();
368    safe = safe.trim_matches('_').to_string();
369    if safe.is_empty() {
370        safe.push_str("session");
371    }
372    safe.truncate(80);
373    let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
374    format!("{safe}-{}.db", &hash[..16])
375}
376
/// Returns `path` with `suffix` appended verbatim to its final component
/// (for example `foo.db` + `-wal` gives `foo.db-wal`).
///
/// The suffix is appended to the raw OS string rather than set as an
/// extension, so the existing `.db` extension is preserved.
fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
    let mut raw = path.to_path_buf().into_os_string();
    raw.push(suffix);
    raw.into()
}
382
/// Formats the current wall-clock time as `unix:<seconds since the epoch>`.
///
/// A clock set before the Unix epoch yields `unix:0`.
fn current_timestamp_string() -> String {
    let secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    format!("unix:{secs}")
}
389
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`. A millisecond count too large for
/// `u64` saturates at `u64::MAX` instead of silently wrapping.
fn current_epoch_ms() -> u64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    // Clamp before narrowing so the cast below is always lossless.
    millis.min(u128::from(u64::MAX)) as u64
}
396
397fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
398    let mut refs = Vec::new();
399    if let Some(blob_ref) = &checkpoint.tool_state_ref {
400        refs.push(RetainedArtifactRef {
401            blob_ref: blob_ref.clone(),
402            kind: PersistedArtifactKind::ToolState,
403        });
404    }
405    if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
406        refs.push(RetainedArtifactRef {
407            blob_ref: blob_ref.clone(),
408            kind: PersistedArtifactKind::PluginSessionSnapshot,
409        });
410    }
411    if let Some(blob_ref) = &checkpoint.execution_state_ref {
412        refs.push(RetainedArtifactRef {
413            blob_ref: blob_ref.clone(),
414            kind: PersistedArtifactKind::ExecutionStateSnapshot,
415        });
416    }
417    refs
418}
419
420fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
421    SessionHeadMeta {
422        session_id: head.session_id.clone(),
423        head_revision: 0,
424        config: head.config.clone(),
425        agent_frames: head.agent_frames.clone(),
426        current_agent_frame_id: head.current_agent_frame_id.clone(),
427        checkpoint_ref: head.checkpoint_ref.clone(),
428        leaf_node_id: head.graph.leaf_node_id.clone(),
429        graph_node_count: head.graph.nodes.len(),
430        token_ledger: Vec::new(),
431    }
432}
433
/// Serializes `value` to a JSON string.
///
/// # Panics
///
/// Panics if serialization fails. Callers are expected to pass only values
/// that always serialize.
fn encode_json<T: serde::Serialize>(value: &T) -> String {
    serde_json::to_string(value).expect("persisted state should serialize")
}
437
438fn should_compress_blob(
439    profile: BuiltinBlobProfile,
440    descriptor: &BlobArtifactDescriptor,
441    len: usize,
442) -> bool {
443    if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
444        return false;
445    }
446    match profile {
447        BuiltinBlobProfile::LowLatency => false,
448        BuiltinBlobProfile::Balanced => len >= 4 * 1024,
449        BuiltinBlobProfile::Compact => len >= 1024,
450    }
451}
452
453fn compress_blob(content: &[u8]) -> Vec<u8> {
454    let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
455    std::io::Write::write_all(&mut encoder, content).expect("compress blob");
456    encoder.finish().expect("submit blob compression")
457}
458
459fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
460    let mut decoder = ZlibDecoder::new(content);
461    let mut out = Vec::new();
462    std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
463    Some(out)
464}
465
466fn encode_artifact_blob(
467    descriptor: &BlobArtifactDescriptor,
468    profile: BuiltinBlobProfile,
469    content: &[u8],
470) -> Vec<u8> {
471    let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
472    {
473        (BlobCompression::Zlib, compress_blob(content))
474    } else {
475        (BlobCompression::None, content.to_vec())
476    };
477    encode_msgpack(&StoredBlobEnvelope {
478        descriptor: descriptor.clone(),
479        compression,
480        content: stored_content,
481    })
482}
483
484fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
485    let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
486    match envelope.compression {
487        BlobCompression::None => Some(envelope.content),
488        BlobCompression::Zlib => decompress_blob(&envelope.content),
489    }
490}
491
492/// Read the session head meta off a raw connection. Synchronous because it runs
493/// inside a `conn.call`/`conn.write` closure on the connection thread.
494fn try_load_session_head_meta_from_conn(
495    conn: &Connection,
496) -> Result<Option<SessionHeadMeta>, StoreError> {
497    let row = conn
498        .query_row(
499            "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
500            [],
501            |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
502        )
503        .optional()
504        .map_err(sqlite_error)?;
505    let Some((head_json, head_revision)) = row else {
506        return Ok(None);
507    };
508    let mut meta: SessionHeadMeta = serde_json::from_str(&head_json)
509        .map_err(|err| StoreError::Backend(format!("decode session head: {err}")))?;
510    meta.head_revision = head_revision as u64;
511    Ok(Some(meta))
512}
513
514fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
515    try_load_session_head_meta_from_conn(conn).ok().flatten()
516}
517
518fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
519    conn.query_row(
520        "SELECT session_id, session_name, created_at, model, cwd, relation_json
521         FROM session_meta WHERE singleton = 1",
522        [],
523        |row| {
524            let relation_json: Option<String> = row.get(5)?;
525            let relation = relation_json
526                .and_then(|json| serde_json::from_str(&json).ok())
527                .unwrap_or_default();
528            Ok(SessionMeta {
529                session_id: row.get(0)?,
530                session_name: row.get(1)?,
531                created_at: row.get(2)?,
532                model: row.get(3)?,
533                cwd: row.get(4)?,
534                relation,
535            })
536        },
537    )
538    .optional()
539    .ok()
540    .flatten()
541}
542
543fn decode_checkpoint(bytes: &[u8]) -> Option<SessionCheckpoint> {
544    rmp_serde::from_slice(bytes).ok()
545}
546
547fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
548    // Pre-size the buffer so the per-byte writes inside rmp_serde don't
549    // walk the Vec through 0→4→8→16→32… reallocations on every call.
550    let mut buf = Vec::with_capacity(1024);
551    rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
552    buf
553}
554
555fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
556    rmp_serde::from_slice(bytes).ok()
557}
558
559fn merge_token_ledger_entries(
560    entries: Vec<lash_core::TokenLedgerEntry>,
561) -> Vec<lash_core::TokenLedgerEntry> {
562    let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
563    for entry in entries {
564        if entry.usage.total() == 0 {
565            continue;
566        }
567        if let Some(existing) = merged
568            .iter_mut()
569            .find(|existing| existing.source == entry.source && existing.model == entry.model)
570        {
571            existing.usage.input_tokens += entry.usage.input_tokens;
572            existing.usage.output_tokens += entry.usage.output_tokens;
573            existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
574            existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
575        } else {
576            merged.push(entry);
577        }
578    }
579    merged
580}
581
#[cfg(test)]
mod tests {
    use super::*;
    use lash_core::ProcessInput;
    use lashlang::LashlangArtifactStore;

    /// Builds an external-input process registration for `id`, originating
    /// from the session scope named `session` on host `test-host`.
    fn registration(id: &str) -> ProcessRegistration {
        ProcessRegistration::new(
            id,
            ProcessInput::External {
                metadata: serde_json::Value::Null,
            },
            lash_core::ProcessProvenance::session(
                lash_core::SessionScope::new("session"),
                "test-host",
            ),
        )
    }

    /// A linked lashlang module artifact stored in an in-memory store can be
    /// read back by module ref with its process refs intact.
    #[tokio::test]
    async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
        let store = Store::memory().await.expect("memory store");
        let module =
            lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
        let linked = lashlang::LinkedModule::link(
            module,
            lashlang::LashlangSurface::new(
                lashlang::ResourceCatalog::new(),
                lashlang::LashlangAbilities::all(),
            ),
        )
        .expect("link module");

        store
            .put_module_artifact(&linked.artifact)
            .await
            .expect("put artifact");
        let restored = store
            .get_module_artifact(&linked.module_ref)
            .await
            .expect("get artifact")
            .expect("artifact exists");

        assert_eq!(restored.module_ref, linked.module_ref);
        assert_eq!(
            restored.process_ref("scan"),
            linked.artifact.process_ref("scan")
        );
    }

    /// Process rows, completion output and handle grants written through one
    /// registry instance are visible after the database file is reopened.
    #[tokio::test]
    async fn sqlite_process_registry_persists_rows_after_reopen() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("processes.db");
        // Scope the first registry so it is dropped before the file is reopened.
        {
            let registry = SqliteProcessRegistry::open(&path)
                .await
                .expect("open registry");
            let session_scope = lash_core::SessionScope::new("session");
            registry
                .register_process(registration("proc-persist"))
                .await
                .expect("register");
            registry
                .grant_handle(
                    &session_scope,
                    "proc-persist",
                    ProcessHandleDescriptor::new(Some("tool"), Some("demo")),
                )
                .await
                .expect("grant");
            registry
                .complete_process(
                    "proc-persist",
                    ProcessAwaitOutput::Success {
                        value: serde_json::json!({"ok": true}),
                        control: None,
                    },
                )
                .await
                .expect("complete");
        }

        let registry = SqliteProcessRegistry::open(&path)
            .await
            .expect("reopen registry");
        let session_scope = lash_core::SessionScope::new("session");
        let record = registry
            .get_process("proc-persist")
            .await
            .expect("persisted process");

        // Provenance survives the round trip.
        assert_eq!(record.originator_scope_id(), session_scope.id().as_str());
        assert_eq!(
            record.provenance.originator,
            lash_core::ProcessOriginator::session(session_scope.clone())
        );
        // The completion output recorded before the reopen is returned.
        assert_eq!(
            registry
                .await_process("proc-persist")
                .await
                .expect("await persisted"),
            ProcessAwaitOutput::Success {
                value: serde_json::json!({"ok": true}),
                control: None,
            }
        );
        // The single handle grant is still listed for the session scope.
        assert_eq!(
            registry
                .list_handle_grants(&session_scope)
                .await
                .expect("grants")
                .len(),
            1
        );
    }
}