Skip to main content

lash_sqlite_store/
lib.rs

1//! # lash-sqlite-store
2//!
3//! The high-performance local **durable** persistence backend for the lash
4//! agent runtime. One SQLite database per session, opened in WAL journal mode
5//! with a 15-second busy timeout, satisfying the full [`RuntimePersistence`] +
6//! [`AttachmentManifest`] contract from `lash-core`.
7//!
8//! This crate is a drop-in replacement for `lash-sqlite-store`: it exposes the
9//! same public surface (`Store`, `SqliteProcessRegistry`,
10//! `SqliteSessionStoreFactory`, `SqliteEffectHost`, the option/descriptor types)
11//! with identical async signatures, so a consumer swaps backends by renaming
12//! the crate path only. The difference is the engine underneath: tokio-rusqlite
13//! over a statically-linked SQLite with real WAL (`-wal`/`-shm` sidecars,
14//! multi-process readers + single writer) instead of the prior store's experimental mvcc.
15//!
16//! ## Why this is "the durable backend" not just "an option"
17//!
18//! Lash's runtime layer treats persistence as a first-class boundary, not a
19//! debug-only convenience. Every primitive that lets the runtime survive a
20//! crash — head-revision CAS, final turn-commit idempotency, attachment
21//! write-ahead manifests, blob content-addressing with optional compression —
22//! is implemented in this crate against SQLite for one reason: SQLite is the
23//! simplest backend that gives us *atomic multi-statement transactions on a
24//! single file* with durability guarantees we can reason about.
25//!
26//! ## Schema cutover, not migrations
27//!
28//! There is exactly one supported schema (see [`schema::SCHEMA`]). Older
29//! databases must be deleted before opening — we do not carry migration code.
30//!
31//! [`RuntimePersistence`]: lash_core::RuntimePersistence
32//! [`AttachmentManifest`]: lash_core::AttachmentManifest
33
34use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46    QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47    prepare_process_registration,
48};
49use lash_core::store::queued_work::{
50    ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id,
51    ensure_completion_owns_all_batches, renewed_claim, select_claim_prefix,
52};
53use lash_core::store::{
54    GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
55    RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
56};
57use lash_core::{
58    AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
59    DeliveryPolicy, DurabilityTier, GcReport, MergeKey, PROCESS_LEASE_SCHEMA_VERSION,
60    ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
61    ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease,
62    ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
63    RuntimePersistence, SessionMeta, SessionPickerInfo, SessionReadScope, SessionScope,
64    SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy, StoreError, VacuumReport,
65};
66use rusqlite::{Connection, OptionalExtension, Transaction, params};
67use sha2::{Digest, Sha256};
68
69use conn::SqliteConnection;
70
71mod attachments;
72mod blobs;
73mod conn;
74mod effect_replay;
75mod graph;
76mod leases;
77mod lifecycle;
78mod persistence;
79mod process_registry;
80mod queued_work;
81mod schema;
82mod triggers;
83
84use conn::TxOutcome;
85pub use effect_replay::{
86    SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
87};
88use leases::*;
89use queued_work::*;
90use schema::{
91    StoreBacking, apply_pragmas, ensure_effect_schema, ensure_process_schema, ensure_schema,
92    ensure_trigger_schema,
93};
94pub use triggers::SqliteTriggerStore;
95
96/// SQLite-backed store for checkpoint blobs, runtime session state, and
97/// Lashlang artifacts.
98///
99/// This is the first-party local implementation of the runtime store traits.
100/// Internally it holds a single cloneable [`SqliteConnection`] (a
101/// tokio-rusqlite handle to one database thread).
102pub struct Store {
103    conn: SqliteConnection,
104    artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
105    options: StoreOptions,
106    commit_count: AtomicU64,
107}
108
109/// SQLite-backed process registry for one configured runtime deployment.
110///
111/// It is intentionally separate from [`Store`]: session databases persist one
112/// conversation, while this registry persists background process state and
113/// handle visibility across all sessions sharing the registry.
114pub struct SqliteProcessRegistry {
115    conn: SqliteConnection,
116    notify: tokio::sync::Notify,
117}
118
119fn sqlite_error(err: rusqlite::Error) -> StoreError {
120    StoreError::Backend(err.to_string())
121}
122
123fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
124    lash_core::PluginError::Session(err.to_string())
125}
126
127fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
128    lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
129}
130
131fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
132    serde_json::to_string(value).map_err(|err| {
133        lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
134    })
135}
136
137fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
138    futures_executor::block_on(future)
139}
140
141#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
142pub enum PersistedArtifactKind {
143    GenericBlob,
144    CheckpointManifest,
145    ToolState,
146    PluginSessionSnapshot,
147    ExecutionStateSnapshot,
148    LashlangModule,
149    ProcessExecutionEnv,
150}
151
152#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
153pub enum BlobStorageHint {
154    Compressible,
155    InlinePreferred,
156    LargePayload,
157}
158
159#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
160enum BlobCompression {
161    None,
162    Zlib,
163}
164
165#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
166pub struct BlobArtifactDescriptor {
167    pub kind: PersistedArtifactKind,
168    #[serde(default, skip_serializing_if = "Vec::is_empty")]
169    pub hints: Vec<BlobStorageHint>,
170}
171
172impl BlobArtifactDescriptor {
173    pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
174        Self {
175            kind,
176            hints: hints.into(),
177        }
178    }
179
180    pub fn checkpoint_manifest() -> Self {
181        Self::new(
182            PersistedArtifactKind::CheckpointManifest,
183            vec![BlobStorageHint::Compressible],
184        )
185    }
186
187    pub fn tool_state_snapshot() -> Self {
188        Self::new(
189            PersistedArtifactKind::ToolState,
190            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
191        )
192    }
193
194    pub fn plugin_session_snapshot() -> Self {
195        Self::new(
196            PersistedArtifactKind::PluginSessionSnapshot,
197            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
198        )
199    }
200
201    pub fn execution_state_snapshot() -> Self {
202        Self::new(
203            PersistedArtifactKind::ExecutionStateSnapshot,
204            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
205        )
206    }
207
208    pub fn lashlang_module() -> Self {
209        Self::new(
210            PersistedArtifactKind::LashlangModule,
211            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
212        )
213    }
214
215    pub fn process_execution_env() -> Self {
216        Self::new(
217            PersistedArtifactKind::ProcessExecutionEnv,
218            vec![BlobStorageHint::Compressible],
219        )
220    }
221}
222
223#[derive(Clone, Debug, PartialEq, Eq, Hash)]
224struct RetainedArtifactRef {
225    pub blob_ref: BlobRef,
226    pub kind: PersistedArtifactKind,
227}
228
229#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
230pub enum BuiltinBlobProfile {
231    LowLatency,
232    #[default]
233    Balanced,
234    Compact,
235}
236
237#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
238pub struct StoreGcPolicy {
239    pub auto_run_every_commits: Option<u64>,
240}
241
242#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
243pub struct StoreOptions {
244    pub blob_profile: BuiltinBlobProfile,
245    pub gc_policy: StoreGcPolicy,
246}
247
248#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
249struct StoredBlobEnvelope {
250    descriptor: BlobArtifactDescriptor,
251    compression: BlobCompression,
252    content: Vec<u8>,
253}
254
255#[derive(Clone, Debug)]
256pub struct StoredSessionCheckpoint {
257    pub checkpoint_ref: BlobRef,
258    pub manifest: SessionCheckpoint,
259}
260
261/// Explicit first-party factory for one SQLite session database per Lash
262/// session.
263///
264/// Hosts opt into this by passing it to `lash::LashCoreBuilder::store_factory`.
265/// The factory never becomes a default: app storage and runtime storage remain
266/// host-owned decisions.
267#[derive(Clone, Debug)]
268pub struct SqliteSessionStoreFactory {
269    root: PathBuf,
270    options: StoreOptions,
271}
272
273impl SqliteSessionStoreFactory {
274    pub fn new(root: impl Into<PathBuf>) -> Self {
275        Self {
276            root: root.into(),
277            options: StoreOptions::default(),
278        }
279    }
280
281    pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
282        Self {
283            root: root.into(),
284            options,
285        }
286    }
287
288    pub fn path_for_session(&self, session_id: &str) -> PathBuf {
289        self.root.join(safe_session_db_file_name(session_id))
290    }
291}
292
293#[async_trait::async_trait]
294impl SessionStoreFactory for SqliteSessionStoreFactory {
295    fn durability_tier(&self) -> DurabilityTier {
296        DurabilityTier::Durable
297    }
298
299    async fn create_store(
300        &self,
301        request: &SessionStoreCreateRequest,
302    ) -> Result<Arc<dyn RuntimePersistence>, String> {
303        std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
304        let path = self.path_for_session(&request.session_id);
305        let store = Arc::new(
306            Store::open_with_options(&path, self.options)
307                .await
308                .map_err(|err| err.to_string())?,
309        );
310        if store.load_session_meta().await.is_none() {
311            store
312                .save_session_meta(SessionMeta {
313                    session_id: request.session_id.clone(),
314                    session_name: request.session_id.clone(),
315                    created_at: current_timestamp_string(),
316                    model: request.policy.model.id.clone(),
317                    cwd: std::env::current_dir()
318                        .ok()
319                        .and_then(|path| path.to_str().map(str::to_string)),
320                    relation: request.relation.clone(),
321                })
322                .await;
323        }
324        Ok(store as Arc<dyn RuntimePersistence>)
325    }
326
327    async fn open_existing_store(
328        &self,
329        request: &SessionStoreCreateRequest,
330    ) -> Result<Option<Arc<dyn RuntimePersistence>>, String> {
331        let path = self.path_for_session(&request.session_id);
332        if !path.exists() {
333            return Ok(None);
334        }
335        self.create_store(request).await.map(Some)
336    }
337
338    async fn delete_session(&self, session_id: &str) -> Result<(), String> {
339        let db_path = self.path_for_session(session_id);
340        for path in [
341            db_path.clone(),
342            sqlite_sidecar_path(&db_path, "-wal"),
343            sqlite_sidecar_path(&db_path, "-shm"),
344        ] {
345            match std::fs::remove_file(&path) {
346                Ok(()) => {}
347                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
348                Err(err) => {
349                    return Err(format!("remove session store {}: {err}", path.display()));
350                }
351            }
352        }
353        Ok(())
354    }
355}
356
357fn safe_session_db_file_name(session_id: &str) -> String {
358    let mut safe = session_id
359        .chars()
360        .map(|ch| match ch {
361            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
362            _ => '_',
363        })
364        .collect::<String>();
365    safe = safe.trim_matches('_').to_string();
366    if safe.is_empty() {
367        safe.push_str("session");
368    }
369    safe.truncate(80);
370    let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
371    format!("{safe}-{}.db", &hash[..16])
372}
373
374fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
375    let mut sidecar = path.as_os_str().to_os_string();
376    sidecar.push(suffix);
377    PathBuf::from(sidecar)
378}
379
380fn current_timestamp_string() -> String {
381    let now = SystemTime::now()
382        .duration_since(UNIX_EPOCH)
383        .unwrap_or_default();
384    format!("unix:{}", now.as_secs())
385}
386
387fn current_epoch_ms() -> u64 {
388    SystemTime::now()
389        .duration_since(UNIX_EPOCH)
390        .unwrap_or_default()
391        .as_millis() as u64
392}
393
394fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
395    let mut refs = Vec::new();
396    if let Some(blob_ref) = &checkpoint.tool_state_ref {
397        refs.push(RetainedArtifactRef {
398            blob_ref: blob_ref.clone(),
399            kind: PersistedArtifactKind::ToolState,
400        });
401    }
402    if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
403        refs.push(RetainedArtifactRef {
404            blob_ref: blob_ref.clone(),
405            kind: PersistedArtifactKind::PluginSessionSnapshot,
406        });
407    }
408    if let Some(blob_ref) = &checkpoint.execution_state_ref {
409        refs.push(RetainedArtifactRef {
410            blob_ref: blob_ref.clone(),
411            kind: PersistedArtifactKind::ExecutionStateSnapshot,
412        });
413    }
414    refs
415}
416
417fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
418    SessionHeadMeta {
419        session_id: head.session_id.clone(),
420        head_revision: 0,
421        config: head.config.clone(),
422        agent_frames: head.agent_frames.clone(),
423        current_agent_frame_id: head.current_agent_frame_id.clone(),
424        checkpoint_ref: head.checkpoint_ref.clone(),
425        leaf_node_id: head.graph.leaf_node_id.clone(),
426        graph_node_count: head.graph.nodes.len(),
427        token_ledger: Vec::new(),
428    }
429}
430
431fn encode_json<T: serde::Serialize>(value: &T) -> String {
432    serde_json::to_string(value).expect("persisted state should serialize")
433}
434
435fn should_compress_blob(
436    profile: BuiltinBlobProfile,
437    descriptor: &BlobArtifactDescriptor,
438    len: usize,
439) -> bool {
440    if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
441        return false;
442    }
443    match profile {
444        BuiltinBlobProfile::LowLatency => false,
445        BuiltinBlobProfile::Balanced => len >= 4 * 1024,
446        BuiltinBlobProfile::Compact => len >= 1024,
447    }
448}
449
450fn compress_blob(content: &[u8]) -> Vec<u8> {
451    let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
452    std::io::Write::write_all(&mut encoder, content).expect("compress blob");
453    encoder.finish().expect("submit blob compression")
454}
455
456fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
457    let mut decoder = ZlibDecoder::new(content);
458    let mut out = Vec::new();
459    std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
460    Some(out)
461}
462
463fn encode_artifact_blob(
464    descriptor: &BlobArtifactDescriptor,
465    profile: BuiltinBlobProfile,
466    content: &[u8],
467) -> Vec<u8> {
468    let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
469    {
470        (BlobCompression::Zlib, compress_blob(content))
471    } else {
472        (BlobCompression::None, content.to_vec())
473    };
474    encode_msgpack(&StoredBlobEnvelope {
475        descriptor: descriptor.clone(),
476        compression,
477        content: stored_content,
478    })
479}
480
481fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
482    let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
483    match envelope.compression {
484        BlobCompression::None => Some(envelope.content),
485        BlobCompression::Zlib => decompress_blob(&envelope.content),
486    }
487}
488
489/// Read the session head meta off a raw connection. Synchronous because it runs
490/// inside a `conn.call`/`conn.write` closure on the connection thread.
491fn try_load_session_head_meta_from_conn(
492    conn: &Connection,
493) -> Result<Option<SessionHeadMeta>, StoreError> {
494    let row = conn
495        .query_row(
496            "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
497            [],
498            |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
499        )
500        .optional()
501        .map_err(sqlite_error)?;
502    let Some((head_json, head_revision)) = row else {
503        return Ok(None);
504    };
505    let mut meta: SessionHeadMeta = serde_json::from_str(&head_json)
506        .map_err(|err| StoreError::Backend(format!("decode session head: {err}")))?;
507    meta.head_revision = head_revision as u64;
508    Ok(Some(meta))
509}
510
511fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
512    try_load_session_head_meta_from_conn(conn).ok().flatten()
513}
514
515fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
516    conn.query_row(
517        "SELECT session_id, session_name, created_at, model, cwd, relation_json
518         FROM session_meta WHERE singleton = 1",
519        [],
520        |row| {
521            let relation_json: Option<String> = row.get(5)?;
522            let relation = relation_json
523                .and_then(|json| serde_json::from_str(&json).ok())
524                .unwrap_or_default();
525            Ok(SessionMeta {
526                session_id: row.get(0)?,
527                session_name: row.get(1)?,
528                created_at: row.get(2)?,
529                model: row.get(3)?,
530                cwd: row.get(4)?,
531                relation,
532            })
533        },
534    )
535    .optional()
536    .ok()
537    .flatten()
538}
539
540fn decode_checkpoint(bytes: &[u8]) -> Option<SessionCheckpoint> {
541    rmp_serde::from_slice(bytes).ok()
542}
543
544fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
545    // Pre-size the buffer so the per-byte writes inside rmp_serde don't
546    // walk the Vec through 0→4→8→16→32… reallocations on every call.
547    let mut buf = Vec::with_capacity(1024);
548    rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
549    buf
550}
551
552fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
553    rmp_serde::from_slice(bytes).ok()
554}
555
556fn merge_token_ledger_entries(
557    entries: Vec<lash_core::TokenLedgerEntry>,
558) -> Vec<lash_core::TokenLedgerEntry> {
559    let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
560    for entry in entries {
561        if entry.usage.total() == 0 {
562            continue;
563        }
564        if let Some(existing) = merged
565            .iter_mut()
566            .find(|existing| existing.source == entry.source && existing.model == entry.model)
567        {
568            existing.usage.input_tokens += entry.usage.input_tokens;
569            existing.usage.output_tokens += entry.usage.output_tokens;
570            existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
571            existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
572        } else {
573            merged.push(entry);
574        }
575    }
576    merged
577}
578
579#[cfg(test)]
580mod tests {
581    use super::*;
582    use lash_core::ProcessInput;
583    use lashlang::LashlangArtifactStore;
584
585    fn registration(id: &str) -> ProcessRegistration {
586        ProcessRegistration::new(
587            id,
588            ProcessInput::External {
589                metadata: serde_json::Value::Null,
590            },
591            lash_core::ProcessProvenance::session(lash_core::SessionScope::new("session")),
592        )
593    }
594
595    #[tokio::test]
596    async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
597        let store = Store::memory().await.expect("memory store");
598        let module =
599            lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
600        let linked = lashlang::LinkedModule::link(
601            module,
602            lashlang::LashlangHostEnvironment::new(
603                lashlang::LashlangHostCatalog::new(),
604                lashlang::LashlangAbilities::all(),
605            ),
606        )
607        .expect("link module");
608
609        store
610            .put_module_artifact(&linked.artifact)
611            .await
612            .expect("put artifact");
613        let restored = store
614            .get_module_artifact(&linked.module_ref)
615            .await
616            .expect("get artifact")
617            .expect("artifact exists");
618
619        assert_eq!(restored.module_ref, linked.module_ref);
620        assert_eq!(
621            restored.process_ref("scan"),
622            linked.artifact.process_ref("scan")
623        );
624    }
625
626    #[tokio::test]
627    async fn sqlite_process_registry_persists_rows_after_reopen() {
628        let dir = tempfile::tempdir().expect("tempdir");
629        let path = dir.path().join("processes.db");
630        {
631            let registry = SqliteProcessRegistry::open(&path)
632                .await
633                .expect("open registry");
634            let session_scope = lash_core::SessionScope::new("session");
635            registry
636                .register_process(registration("proc-persist"))
637                .await
638                .expect("register");
639            registry
640                .grant_handle(
641                    &session_scope,
642                    "proc-persist",
643                    ProcessHandleDescriptor::new(Some("tool"), Some("demo")),
644                )
645                .await
646                .expect("grant");
647            registry
648                .complete_process(
649                    "proc-persist",
650                    ProcessAwaitOutput::Success {
651                        value: serde_json::json!({"ok": true}),
652                        control: None,
653                    },
654                )
655                .await
656                .expect("complete");
657        }
658
659        let registry = SqliteProcessRegistry::open(&path)
660            .await
661            .expect("reopen registry");
662        let session_scope = lash_core::SessionScope::new("session");
663        let record = registry
664            .get_process("proc-persist")
665            .await
666            .expect("persisted process");
667
668        assert_eq!(record.originator_scope_id(), session_scope.id().as_str());
669        assert_eq!(
670            record.provenance.originator,
671            lash_core::ProcessOriginator::session(session_scope.clone())
672        );
673        assert_eq!(
674            registry
675                .await_process("proc-persist")
676                .await
677                .expect("await persisted"),
678            ProcessAwaitOutput::Success {
679                value: serde_json::json!({"ok": true}),
680                control: None,
681            }
682        );
683        assert_eq!(
684            registry
685                .list_handle_grants(&session_scope)
686                .await
687                .expect("grants")
688                .len(),
689            1
690        );
691    }
692}