Skip to main content

lash_sqlite_store/
lib.rs

1//! # lash-sqlite-store
2//!
3//! The high-performance local **durable** persistence backend for the lash
4//! agent runtime. One SQLite database per session, opened in WAL journal mode
5//! with a 15-second busy timeout, satisfying the full [`RuntimePersistence`] +
6//! [`AttachmentManifest`] contract from `lash-core`.
7//!
//! This crate is a drop-in replacement for the prior lash store crate: it exposes the
9//! same public surface (`Store`, `SqliteProcessRegistry`,
10//! `SqliteSessionStoreFactory`, `SqliteEffectHost`, the option/descriptor types)
11//! with identical async signatures, so a consumer swaps backends by renaming
12//! the crate path only. The difference is the engine underneath: tokio-rusqlite
13//! over a statically-linked SQLite with real WAL (`-wal`/`-shm` sidecars,
14//! multi-process readers + single writer) instead of the prior store's experimental mvcc.
15//!
16//! ## Why this is "the durable backend" not just "an option"
17//!
18//! Lash's runtime layer treats persistence as a first-class boundary, not a
19//! debug-only convenience. Every primitive that lets the runtime survive a
20//! crash — head-revision CAS, final turn-commit idempotency, attachment
21//! write-ahead manifests, blob content-addressing with optional compression —
22//! is implemented in this crate against SQLite for one reason: SQLite is the
23//! simplest backend that gives us *atomic multi-statement transactions on a
24//! single file* with durability guarantees we can reason about.
25//!
26//! ## Schema cutover, not migrations
27//!
28//! There is exactly one supported schema (see [`schema::SCHEMA`]). Older
29//! databases must be deleted before opening — we do not carry migration code.
30//!
31//! [`RuntimePersistence`]: lash_core::RuntimePersistence
32//! [`AttachmentManifest`]: lash_core::AttachmentManifest
33
34use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46    QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47    prepare_process_registration,
48};
49use lash_core::store::queued_work::{
50    ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id,
51    ensure_completion_owns_all_batches, renewed_claim, select_claim_prefix,
52};
53use lash_core::store::{
54    GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
55    RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
56};
57use lash_core::{
58    AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
59    DeliveryPolicy, DurabilityTier, GcReport, MergeKey, PROCESS_LEASE_SCHEMA_VERSION,
60    ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
61    ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease,
62    ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
63    RuntimePersistence, SessionExecutionLease, SessionExecutionLeaseCompletion,
64    SessionExecutionLeaseFence, SessionMeta, SessionPickerInfo, SessionReadScope, SessionScope,
65    SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy, StoreError, VacuumReport,
66};
67use rusqlite::{Connection, OptionalExtension, Transaction, params};
68use sha2::{Digest, Sha256};
69
70use conn::SqliteConnection;
71
72mod attachments;
73mod blobs;
74mod conn;
75mod effect_replay;
76mod graph;
77mod leases;
78mod lifecycle;
79mod persistence;
80mod process_registry;
81mod queued_work;
82mod schema;
83mod triggers;
84
85use conn::TxOutcome;
86pub use effect_replay::{
87    SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
88};
89use leases::*;
90use queued_work::*;
91use schema::{
92    StoreBacking, apply_pragmas, ensure_effect_schema, ensure_process_schema, ensure_schema,
93    ensure_trigger_schema,
94};
95pub use triggers::SqliteTriggerStore;
96
/// SQLite-backed store for checkpoint blobs, runtime session state, and
/// Lashlang artifacts.
///
/// This is the first-party local implementation of the runtime store traits.
/// Internally it holds a single cloneable [`SqliteConnection`] (a
/// tokio-rusqlite handle to one database thread).
pub struct Store {
    /// Connection handle to the session database.
    conn: SqliteConnection,
    /// In-process cache of module artifacts, keyed by module reference and
    /// guarded by a standard (blocking) mutex.
    artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
    /// Blob profile and GC policy configuration for this store.
    options: StoreOptions,
    /// Atomic commit counter.
    /// NOTE(review): presumably drives `StoreGcPolicy::auto_run_every_commits`
    /// — confirm in the commit path, which is not visible here.
    commit_count: AtomicU64,
}
109
/// SQLite-backed process registry for one configured runtime deployment.
///
/// It is intentionally separate from [`Store`]: session databases persist one
/// conversation, while this registry persists background process state and
/// handle visibility across all sessions sharing the registry.
pub struct SqliteProcessRegistry {
    /// Connection handle to the registry database.
    conn: SqliteConnection,
    /// In-process notification primitive.
    /// NOTE(review): presumably wakes tasks waiting on process state changes
    /// — confirm in the `process_registry` module.
    notify: tokio::sync::Notify,
}
119
120fn sqlite_error(err: rusqlite::Error) -> StoreError {
121    StoreError::Backend(err.to_string())
122}
123
124fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
125    lash_core::PluginError::Session(err.to_string())
126}
127
128fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
129    lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
130}
131
132fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
133    serde_json::to_string(value).map_err(|err| {
134        lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
135    })
136}
137
/// Drives `future` to completion on the calling thread via
/// `futures_executor::block_on` and returns its output.
///
/// NOTE(review): this blocks the calling thread until the future resolves;
/// confirm callers never invoke it from an async executor worker thread.
fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
    futures_executor::block_on(future)
}
141
/// Category tag recorded in a [`BlobArtifactDescriptor`] for a persisted blob.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum PersistedArtifactKind {
    /// Blob with no more specific category.
    GenericBlob,
    /// Kind set by [`BlobArtifactDescriptor::checkpoint_manifest`].
    CheckpointManifest,
    /// Kind set by [`BlobArtifactDescriptor::tool_state_snapshot`].
    ToolState,
    /// Kind set by [`BlobArtifactDescriptor::plugin_session_snapshot`].
    PluginSessionSnapshot,
    /// Kind set by [`BlobArtifactDescriptor::execution_state_snapshot`].
    ExecutionStateSnapshot,
    /// Kind set by [`BlobArtifactDescriptor::lashlang_module`].
    LashlangModule,
    /// Kind set by [`BlobArtifactDescriptor::process_execution_env`].
    ProcessExecutionEnv,
}
152
/// Storage hints attached to a [`BlobArtifactDescriptor`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum BlobStorageHint {
    /// The blob may be compressed; `should_compress_blob` never compresses a
    /// blob whose descriptor lacks this hint.
    Compressible,
    /// NOTE(review): not consulted by any code visible in this file —
    /// confirm where (if anywhere) it is honoured.
    InlinePreferred,
    /// NOTE(review): not consulted by any code visible in this file —
    /// confirm where (if anywhere) it is honoured.
    LargePayload,
}
159
/// Compression applied to the `content` field of a [`StoredBlobEnvelope`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
enum BlobCompression {
    /// Content is stored as-is.
    None,
    /// Content is zlib-compressed and must be inflated on read.
    Zlib,
}
165
/// Describes a persisted blob: what kind of artifact it is plus storage hints.
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct BlobArtifactDescriptor {
    /// Artifact category of the blob.
    pub kind: PersistedArtifactKind,
    /// Storage hints; omitted from the serialized form when empty and
    /// defaulted to an empty list when absent on read.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub hints: Vec<BlobStorageHint>,
}
172
173impl BlobArtifactDescriptor {
174    pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
175        Self {
176            kind,
177            hints: hints.into(),
178        }
179    }
180
181    pub fn checkpoint_manifest() -> Self {
182        Self::new(
183            PersistedArtifactKind::CheckpointManifest,
184            vec![BlobStorageHint::Compressible],
185        )
186    }
187
188    pub fn tool_state_snapshot() -> Self {
189        Self::new(
190            PersistedArtifactKind::ToolState,
191            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
192        )
193    }
194
195    pub fn plugin_session_snapshot() -> Self {
196        Self::new(
197            PersistedArtifactKind::PluginSessionSnapshot,
198            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
199        )
200    }
201
202    pub fn execution_state_snapshot() -> Self {
203        Self::new(
204            PersistedArtifactKind::ExecutionStateSnapshot,
205            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
206        )
207    }
208
209    pub fn lashlang_module() -> Self {
210        Self::new(
211            PersistedArtifactKind::LashlangModule,
212            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
213        )
214    }
215
216    pub fn process_execution_env() -> Self {
217        Self::new(
218            PersistedArtifactKind::ProcessExecutionEnv,
219            vec![BlobStorageHint::Compressible],
220        )
221    }
222}
223
/// A blob reference held by a session checkpoint, tagged with the kind of
/// artifact it points at. Produced by `retained_artifact_refs`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct RetainedArtifactRef {
    /// Reference to the retained blob.
    pub blob_ref: BlobRef,
    /// Artifact category of the retained blob.
    pub kind: PersistedArtifactKind,
}
229
/// Built-in compression profiles consulted by `should_compress_blob`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum BuiltinBlobProfile {
    /// Never compresses blobs.
    LowLatency,
    /// Default profile: compresses compressible blobs of at least 4 KiB.
    #[default]
    Balanced,
    /// Compresses compressible blobs of at least 1 KiB.
    Compact,
}
237
/// Garbage-collection policy for a [`Store`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreGcPolicy {
    /// Defaults to `None`.
    /// NOTE(review): presumably "run GC automatically every N commits", with
    /// `None` disabling automatic runs — confirm against the commit path.
    pub auto_run_every_commits: Option<u64>,
}
242
/// Tunables passed when opening a [`Store`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreOptions {
    /// Compression profile for persisted blobs (defaults to `Balanced`).
    pub blob_profile: BuiltinBlobProfile,
    /// Garbage-collection policy.
    pub gc_policy: StoreGcPolicy,
}
248
/// On-disk wrapper for an artifact blob; `encode_artifact_blob` serializes it
/// as MessagePack and `decode_artifact_blob` reads it back.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct StoredBlobEnvelope {
    /// Descriptor the blob was stored with.
    descriptor: BlobArtifactDescriptor,
    /// How `content` is encoded.
    compression: BlobCompression,
    /// Payload bytes, compressed according to `compression`.
    content: Vec<u8>,
}
255
/// A session checkpoint manifest paired with the blob reference that
/// identifies it.
#[derive(Clone, Debug)]
pub struct StoredSessionCheckpoint {
    /// Blob reference of the checkpoint.
    pub checkpoint_ref: BlobRef,
    /// The checkpoint manifest itself.
    pub manifest: SessionCheckpoint,
}
261
/// Explicit first-party factory for one SQLite session database per Lash
/// session.
///
/// Hosts opt into this by passing it to `lash::LashCoreBuilder::store_factory`.
/// The factory never becomes a default: app storage and runtime storage remain
/// host-owned decisions.
#[derive(Clone, Debug)]
pub struct SqliteSessionStoreFactory {
    /// Directory holding the per-session database files; created on demand by
    /// `create_store`.
    root: PathBuf,
    /// Options applied to every store this factory opens.
    options: StoreOptions,
}
273
274impl SqliteSessionStoreFactory {
275    pub fn new(root: impl Into<PathBuf>) -> Self {
276        Self {
277            root: root.into(),
278            options: StoreOptions::default(),
279        }
280    }
281
282    pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
283        Self {
284            root: root.into(),
285            options,
286        }
287    }
288
289    pub fn path_for_session(&self, session_id: &str) -> PathBuf {
290        self.root.join(safe_session_db_file_name(session_id))
291    }
292}
293
294#[async_trait::async_trait]
295impl SessionStoreFactory for SqliteSessionStoreFactory {
296    fn durability_tier(&self) -> DurabilityTier {
297        DurabilityTier::Durable
298    }
299
300    async fn create_store(
301        &self,
302        request: &SessionStoreCreateRequest,
303    ) -> Result<Arc<dyn RuntimePersistence>, String> {
304        std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
305        let path = self.path_for_session(&request.session_id);
306        let store = Arc::new(
307            Store::open_with_options(&path, self.options)
308                .await
309                .map_err(|err| err.to_string())?,
310        );
311        if store.load_session_meta().await.is_none() {
312            store
313                .save_session_meta(SessionMeta {
314                    session_id: request.session_id.clone(),
315                    session_name: request.session_id.clone(),
316                    created_at: current_timestamp_string(),
317                    model: request.policy.model.id.clone(),
318                    cwd: std::env::current_dir()
319                        .ok()
320                        .and_then(|path| path.to_str().map(str::to_string)),
321                    relation: request.relation.clone(),
322                })
323                .await;
324        }
325        Ok(store as Arc<dyn RuntimePersistence>)
326    }
327
328    async fn open_existing_store(
329        &self,
330        request: &SessionStoreCreateRequest,
331    ) -> Result<Option<Arc<dyn RuntimePersistence>>, String> {
332        let path = self.path_for_session(&request.session_id);
333        if !path.exists() {
334            return Ok(None);
335        }
336        self.create_store(request).await.map(Some)
337    }
338
339    async fn delete_session(&self, session_id: &str) -> Result<(), String> {
340        let db_path = self.path_for_session(session_id);
341        for path in [
342            db_path.clone(),
343            sqlite_sidecar_path(&db_path, "-wal"),
344            sqlite_sidecar_path(&db_path, "-shm"),
345        ] {
346            match std::fs::remove_file(&path) {
347                Ok(()) => {}
348                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
349                Err(err) => {
350                    return Err(format!("remove session store {}: {err}", path.display()));
351                }
352            }
353        }
354        Ok(())
355    }
356}
357
358fn safe_session_db_file_name(session_id: &str) -> String {
359    let mut safe = session_id
360        .chars()
361        .map(|ch| match ch {
362            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
363            _ => '_',
364        })
365        .collect::<String>();
366    safe = safe.trim_matches('_').to_string();
367    if safe.is_empty() {
368        safe.push_str("session");
369    }
370    safe.truncate(80);
371    let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
372    format!("{safe}-{}.db", &hash[..16])
373}
374
/// Builds the path of a SQLite sidecar file by appending `suffix` (for
/// example `-wal`) directly to the full database path, extension included.
fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
    let mut raw = path.as_os_str().to_owned();
    raw.push(suffix);
    raw.into()
}
380
/// Formats the current wall-clock time as `unix:<seconds since the epoch>`.
/// A clock set before the Unix epoch yields `unix:0`.
fn current_timestamp_string() -> String {
    let secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    format!("unix:{secs}")
}
387
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`. The `u128` millisecond count is
/// converted with a checked conversion that saturates at `u64::MAX`, rather
/// than an `as` cast that would silently drop the high bits.
fn current_epoch_ms() -> u64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
}
394
395fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
396    let mut refs = Vec::new();
397    if let Some(blob_ref) = &checkpoint.tool_state_ref {
398        refs.push(RetainedArtifactRef {
399            blob_ref: blob_ref.clone(),
400            kind: PersistedArtifactKind::ToolState,
401        });
402    }
403    if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
404        refs.push(RetainedArtifactRef {
405            blob_ref: blob_ref.clone(),
406            kind: PersistedArtifactKind::PluginSessionSnapshot,
407        });
408    }
409    if let Some(blob_ref) = &checkpoint.execution_state_ref {
410        refs.push(RetainedArtifactRef {
411            blob_ref: blob_ref.clone(),
412            kind: PersistedArtifactKind::ExecutionStateSnapshot,
413        });
414    }
415    refs
416}
417
418fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
419    SessionHeadMeta {
420        session_id: head.session_id.clone(),
421        head_revision: 0,
422        config: head.config.clone(),
423        agent_frames: head.agent_frames.clone(),
424        current_agent_frame_id: head.current_agent_frame_id.clone(),
425        checkpoint_ref: head.checkpoint_ref.clone(),
426        leaf_node_id: head.graph.leaf_node_id.clone(),
427        graph_node_count: head.graph.nodes.len(),
428        token_ledger: Vec::new(),
429    }
430}
431
432fn encode_json<T: serde::Serialize>(value: &T) -> String {
433    serde_json::to_string(value).expect("persisted state should serialize")
434}
435
436fn should_compress_blob(
437    profile: BuiltinBlobProfile,
438    descriptor: &BlobArtifactDescriptor,
439    len: usize,
440) -> bool {
441    if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
442        return false;
443    }
444    match profile {
445        BuiltinBlobProfile::LowLatency => false,
446        BuiltinBlobProfile::Balanced => len >= 4 * 1024,
447        BuiltinBlobProfile::Compact => len >= 1024,
448    }
449}
450
451fn compress_blob(content: &[u8]) -> Vec<u8> {
452    let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
453    std::io::Write::write_all(&mut encoder, content).expect("compress blob");
454    encoder.finish().expect("submit blob compression")
455}
456
457fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
458    let mut decoder = ZlibDecoder::new(content);
459    let mut out = Vec::new();
460    std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
461    Some(out)
462}
463
464fn encode_artifact_blob(
465    descriptor: &BlobArtifactDescriptor,
466    profile: BuiltinBlobProfile,
467    content: &[u8],
468) -> Vec<u8> {
469    let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
470    {
471        (BlobCompression::Zlib, compress_blob(content))
472    } else {
473        (BlobCompression::None, content.to_vec())
474    };
475    encode_msgpack(&StoredBlobEnvelope {
476        descriptor: descriptor.clone(),
477        compression,
478        content: stored_content,
479    })
480}
481
482fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
483    let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
484    match envelope.compression {
485        BlobCompression::None => Some(envelope.content),
486        BlobCompression::Zlib => decompress_blob(&envelope.content),
487    }
488}
489
490/// Read the session head meta off a raw connection. Synchronous because it runs
491/// inside a `conn.call`/`conn.write` closure on the connection thread.
492fn try_load_session_head_meta_from_conn(
493    conn: &Connection,
494) -> Result<Option<SessionHeadMeta>, StoreError> {
495    let row = conn
496        .query_row(
497            "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
498            [],
499            |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
500        )
501        .optional()
502        .map_err(sqlite_error)?;
503    let Some((head_json, head_revision)) = row else {
504        return Ok(None);
505    };
506    let mut meta: SessionHeadMeta = serde_json::from_str(&head_json)
507        .map_err(|err| StoreError::Backend(format!("decode session head: {err}")))?;
508    meta.head_revision = head_revision as u64;
509    Ok(Some(meta))
510}
511
512fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
513    try_load_session_head_meta_from_conn(conn).ok().flatten()
514}
515
516fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
517    conn.query_row(
518        "SELECT session_id, session_name, created_at, model, cwd, relation_json
519         FROM session_meta WHERE singleton = 1",
520        [],
521        |row| {
522            let relation_json: Option<String> = row.get(5)?;
523            let relation = relation_json
524                .and_then(|json| serde_json::from_str(&json).ok())
525                .unwrap_or_default();
526            Ok(SessionMeta {
527                session_id: row.get(0)?,
528                session_name: row.get(1)?,
529                created_at: row.get(2)?,
530                model: row.get(3)?,
531                cwd: row.get(4)?,
532                relation,
533            })
534        },
535    )
536    .optional()
537    .ok()
538    .flatten()
539}
540
541fn decode_checkpoint(bytes: &[u8]) -> Option<SessionCheckpoint> {
542    rmp_serde::from_slice(bytes).ok()
543}
544
545fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
546    // Pre-size the buffer so the per-byte writes inside rmp_serde don't
547    // walk the Vec through 0→4→8→16→32… reallocations on every call.
548    let mut buf = Vec::with_capacity(1024);
549    rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
550    buf
551}
552
553fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
554    rmp_serde::from_slice(bytes).ok()
555}
556
557fn merge_token_ledger_entries(
558    entries: Vec<lash_core::TokenLedgerEntry>,
559) -> Vec<lash_core::TokenLedgerEntry> {
560    let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
561    for entry in entries {
562        if entry.usage.total() == 0 {
563            continue;
564        }
565        if let Some(existing) = merged
566            .iter_mut()
567            .find(|existing| existing.source == entry.source && existing.model == entry.model)
568        {
569            existing.usage.input_tokens += entry.usage.input_tokens;
570            existing.usage.output_tokens += entry.usage.output_tokens;
571            existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
572            existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
573        } else {
574            merged.push(entry);
575        }
576    }
577    merged
578}
579
#[cfg(test)]
mod tests {
    use super::*;
    use lash_core::ProcessInput;
    use lashlang::LashlangArtifactStore;

    /// Builds a registration for an externally-driven process with null
    /// metadata, originating from the session scope named `"session"`.
    fn registration(id: &str) -> ProcessRegistration {
        ProcessRegistration::new(
            id,
            ProcessInput::External {
                metadata: serde_json::Value::Null,
            },
            lash_core::ProcessProvenance::session(lash_core::SessionScope::new("session")),
        )
    }

    /// A linked module artifact written to an in-memory store can be read
    /// back by its module ref with the same process ref for `scan`.
    #[tokio::test]
    async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
        let store = Store::memory().await.expect("memory store");
        let module =
            lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
        let linked = lashlang::LinkedModule::link(
            module,
            lashlang::LashlangHostEnvironment::new(
                lashlang::LashlangHostCatalog::new(),
                lashlang::LashlangAbilities::all(),
            ),
        )
        .expect("link module");

        store
            .put_module_artifact(&linked.artifact)
            .await
            .expect("put artifact");
        let restored = store
            .get_module_artifact(&linked.module_ref)
            .await
            .expect("get artifact")
            .expect("artifact exists");

        assert_eq!(restored.module_ref, linked.module_ref);
        assert_eq!(
            restored.process_ref("scan"),
            linked.artifact.process_ref("scan")
        );
    }

    /// A process that is registered, granted a handle and completed must
    /// still be visible after the registry is dropped and reopened from the
    /// same database file.
    #[tokio::test]
    async fn sqlite_process_registry_persists_rows_after_reopen() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("processes.db");
        // First registry instance: write the rows, then drop it at the end
        // of this scope.
        {
            let registry = SqliteProcessRegistry::open(&path)
                .await
                .expect("open registry");
            let session_scope = lash_core::SessionScope::new("session");
            registry
                .register_process(registration("proc-persist"))
                .await
                .expect("register");
            registry
                .grant_handle(
                    &session_scope,
                    "proc-persist",
                    ProcessHandleDescriptor::new(Some("tool"), Some("demo")),
                )
                .await
                .expect("grant");
            registry
                .complete_process(
                    "proc-persist",
                    ProcessAwaitOutput::Success {
                        value: serde_json::json!({"ok": true}),
                        control: None,
                    },
                )
                .await
                .expect("complete");
        }

        // Second registry instance over the same file: everything written
        // above must be readable.
        let registry = SqliteProcessRegistry::open(&path)
            .await
            .expect("reopen registry");
        let session_scope = lash_core::SessionScope::new("session");
        let record = registry
            .get_process("proc-persist")
            .await
            .expect("persisted process");

        assert_eq!(record.originator_scope_id(), session_scope.id().as_str());
        assert_eq!(
            record.provenance.originator,
            lash_core::ProcessOriginator::session(session_scope.clone())
        );
        // The stored completion output is returned as-is.
        assert_eq!(
            registry
                .await_process("proc-persist")
                .await
                .expect("await persisted"),
            ProcessAwaitOutput::Success {
                value: serde_json::json!({"ok": true}),
                control: None,
            }
        );
        // Exactly one handle grant was recorded for the session scope.
        assert_eq!(
            registry
                .list_handle_grants(&session_scope)
                .await
                .expect("grants")
                .len(),
            1
        );
    }
}