Skip to main content

lash_sqlite_store/
lib.rs

1//! # lash-sqlite-store
2//!
3//! The high-performance local **durable** persistence backend for the lash
4//! agent runtime. One SQLite database per session, opened in WAL journal mode
5//! with a 15-second busy timeout, satisfying the full [`RuntimePersistence`] +
6//! [`AttachmentManifest`] contract from `lash-core`.
7//!
//! This crate is a drop-in replacement for the prior store crate: it exposes the
9//! same public surface (`Store`, `SqliteProcessRegistry`,
10//! `SqliteSessionStoreFactory`, `SqliteEffectHost`, the option/descriptor types)
11//! with identical async signatures, so a consumer swaps backends by renaming
12//! the crate path only. The difference is the engine underneath: tokio-rusqlite
13//! over a statically-linked SQLite with real WAL (`-wal`/`-shm` sidecars,
//! multi-process readers + single writer) instead of the prior store's experimental MVCC.
15//!
16//! ## Why this is "the durable backend" not just "an option"
17//!
18//! Lash's runtime layer treats persistence as a first-class boundary, not a
19//! debug-only convenience. Every primitive that lets the runtime survive a
20//! crash — head-revision CAS, final turn-commit idempotency, attachment
21//! write-ahead manifests, blob content-addressing with optional compression —
22//! is implemented in this crate against SQLite for one reason: SQLite is the
23//! simplest backend that gives us *atomic multi-statement transactions on a
24//! single file* with durability guarantees we can reason about.
25//!
26//! ## Schema cutover, not migrations
27//!
28//! There is exactly one supported schema (see [`schema::SCHEMA`]). Older
29//! databases must be deleted before opening — we do not carry migration code.
30//!
31//! [`RuntimePersistence`]: lash_core::RuntimePersistence
32//! [`AttachmentManifest`]: lash_core::AttachmentManifest
33
34use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46    QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47    prepare_process_registration,
48};
49use lash_core::store::queued_work::{
50    ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id,
51    ensure_completion_owns_all_batches, renewed_claim, select_claim_prefix,
52};
53use lash_core::store::{
54    GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
55    RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
56};
57use lash_core::{
58    AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
59    DeliveryPolicy, DurabilityTier, GcReport, LeaseOwnerIdentity, LeaseOwnerLiveness, MergeKey,
60    PROCESS_LEASE_SCHEMA_VERSION, ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest,
61    ProcessEventAppendResult, ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant,
62    ProcessLease, ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
63    RuntimePersistence, SessionExecutionLease, SessionExecutionLeaseClaimOutcome,
64    SessionExecutionLeaseCompletion, SessionExecutionLeaseFence, SessionMeta, SessionPickerInfo,
65    SessionReadScope, SessionScope, SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy,
66    StoreError, VacuumReport,
67};
68use rusqlite::{Connection, OptionalExtension, Transaction, params};
69use sha2::{Digest, Sha256};
70
71use conn::SqliteConnection;
72
73mod attachments;
74mod blobs;
75mod conn;
76mod effect_replay;
77mod graph;
78mod leases;
79mod lifecycle;
80mod persistence;
81mod process_registry;
82mod queued_work;
83mod schema;
84mod triggers;
85
86use conn::TxOutcome;
87pub use effect_replay::{
88    SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
89};
90use leases::*;
91use queued_work::*;
92use schema::{
93    StoreBacking, apply_pragmas, ensure_effect_schema, ensure_process_schema, ensure_schema,
94    ensure_trigger_schema,
95};
96pub use triggers::SqliteTriggerStore;
97
/// SQLite-backed store for checkpoint blobs, runtime session state, and
/// Lashlang artifacts.
///
/// This is the first-party local implementation of the runtime store traits.
/// Internally it holds a single cloneable [`SqliteConnection`] (a
/// tokio-rusqlite handle to one database thread).
pub struct Store {
    /// Handle to the database backing this store.
    conn: SqliteConnection,
    /// Mutex-guarded in-process map from module ref to shared module artifact.
    /// NOTE(review): presumably lets repeated artifact lookups skip SQLite —
    /// confirm against the artifact store implementation.
    artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
    /// Blob profile and GC policy this store was opened with.
    options: StoreOptions,
    /// NOTE(review): presumably counts commits to drive
    /// `StoreGcPolicy::auto_run_every_commits` — confirm against the commit path.
    commit_count: AtomicU64,
}
110
/// SQLite-backed process registry for one configured runtime deployment.
///
/// It is intentionally separate from [`Store`]: session databases persist one
/// conversation, while this registry persists background process state and
/// handle visibility across all sessions sharing the registry.
pub struct SqliteProcessRegistry {
    /// Handle to the registry database.
    conn: SqliteConnection,
    /// In-process notification primitive.
    /// NOTE(review): presumably wakes tasks waiting on process state changes —
    /// confirm against the registry implementation.
    notify: tokio::sync::Notify,
}
120
121fn sqlite_error(err: rusqlite::Error) -> StoreError {
122    StoreError::Backend(err.to_string())
123}
124
125fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
126    lash_core::PluginError::Session(err.to_string())
127}
128
129fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
130    lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
131}
132
133fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
134    serde_json::to_string(value).map_err(|err| {
135        lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
136    })
137}
138
139fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
140    futures_executor::block_on(future)
141}
142
/// Category tag carried by a [`BlobArtifactDescriptor`] for a persisted blob.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum PersistedArtifactKind {
    /// No dedicated constructor exists for this kind in this file.
    /// NOTE(review): presumably the fallback for uncategorised blobs — confirm.
    GenericBlob,
    /// Kind used by [`BlobArtifactDescriptor::checkpoint_manifest`].
    CheckpointManifest,
    /// Kind used by [`BlobArtifactDescriptor::tool_state_snapshot`].
    ToolState,
    /// Kind used by [`BlobArtifactDescriptor::plugin_session_snapshot`].
    PluginSessionSnapshot,
    /// Kind used by [`BlobArtifactDescriptor::execution_state_snapshot`].
    ExecutionStateSnapshot,
    /// Kind used by [`BlobArtifactDescriptor::lashlang_module`].
    LashlangModule,
    /// Kind used by [`BlobArtifactDescriptor::process_execution_env`].
    ProcessExecutionEnv,
}
153
/// Storage hints attached to a [`BlobArtifactDescriptor`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum BlobStorageHint {
    /// Marks the blob as eligible for compression; `should_compress_blob`
    /// never compresses a blob whose descriptor lacks this hint.
    Compressible,
    /// Not consulted by the code visible in this file.
    /// NOTE(review): confirm where (if anywhere) this hint is honoured.
    InlinePreferred,
    /// Not consulted by the code visible in this file.
    /// NOTE(review): confirm where (if anywhere) this hint is honoured.
    LargePayload,
}
160
/// Compression scheme recorded in a `StoredBlobEnvelope`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
enum BlobCompression {
    /// Envelope content is the original bytes; decoding returns it unchanged.
    None,
    /// Envelope content is zlib-compressed; decoding inflates it.
    Zlib,
}
166
167#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
168pub struct BlobArtifactDescriptor {
169    pub kind: PersistedArtifactKind,
170    #[serde(default, skip_serializing_if = "Vec::is_empty")]
171    pub hints: Vec<BlobStorageHint>,
172}
173
174impl BlobArtifactDescriptor {
175    pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
176        Self {
177            kind,
178            hints: hints.into(),
179        }
180    }
181
182    pub fn checkpoint_manifest() -> Self {
183        Self::new(
184            PersistedArtifactKind::CheckpointManifest,
185            vec![BlobStorageHint::Compressible],
186        )
187    }
188
189    pub fn tool_state_snapshot() -> Self {
190        Self::new(
191            PersistedArtifactKind::ToolState,
192            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
193        )
194    }
195
196    pub fn plugin_session_snapshot() -> Self {
197        Self::new(
198            PersistedArtifactKind::PluginSessionSnapshot,
199            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
200        )
201    }
202
203    pub fn execution_state_snapshot() -> Self {
204        Self::new(
205            PersistedArtifactKind::ExecutionStateSnapshot,
206            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
207        )
208    }
209
210    pub fn lashlang_module() -> Self {
211        Self::new(
212            PersistedArtifactKind::LashlangModule,
213            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
214        )
215    }
216
217    pub fn process_execution_env() -> Self {
218        Self::new(
219            PersistedArtifactKind::ProcessExecutionEnv,
220            vec![BlobStorageHint::Compressible],
221        )
222    }
223}
224
/// A blob reference held by a session checkpoint, tagged with the kind of
/// artifact it points at. Produced by `retained_artifact_refs`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct RetainedArtifactRef {
    /// Reference copied from the checkpoint.
    pub blob_ref: BlobRef,
    /// Artifact category implied by the checkpoint field the ref came from.
    pub kind: PersistedArtifactKind,
}
230
/// Built-in compression profiles consulted by `should_compress_blob` for blobs
/// hinted as compressible.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum BuiltinBlobProfile {
    /// Never compresses.
    LowLatency,
    /// Compresses compressible blobs of at least 4 KiB. This is the default.
    #[default]
    Balanced,
    /// Compresses compressible blobs of at least 1 KiB.
    Compact,
}
238
/// Garbage-collection policy for a [`Store`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreGcPolicy {
    /// NOTE(review): presumably the number of commits between automatic GC
    /// runs, with `None` (the default) disabling automatic GC — confirm
    /// against the commit path, which is not visible here.
    pub auto_run_every_commits: Option<u64>,
}
243
/// Options a [`Store`] is opened with. The default is the `Balanced` blob
/// profile and the default GC policy.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreOptions {
    /// Compression profile applied when encoding artifact blobs.
    pub blob_profile: BuiltinBlobProfile,
    /// Garbage-collection policy.
    pub gc_policy: StoreGcPolicy,
}
249
/// Wrapper that `encode_artifact_blob` serializes with MessagePack and
/// `decode_artifact_blob` reads back.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct StoredBlobEnvelope {
    /// Descriptor the blob was encoded with.
    descriptor: BlobArtifactDescriptor,
    /// How `content` is encoded.
    compression: BlobCompression,
    /// Blob bytes, compressed or verbatim according to `compression`.
    content: Vec<u8>,
}
256
/// A session checkpoint manifest paired with a blob reference.
#[derive(Clone, Debug)]
pub struct StoredSessionCheckpoint {
    /// NOTE(review): presumably the blob ref under which `manifest` is stored —
    /// confirm against the code that constructs this type.
    pub checkpoint_ref: BlobRef,
    /// The checkpoint manifest itself.
    pub manifest: SessionCheckpoint,
}
262
263/// Explicit first-party factory for one SQLite session database per Lash
264/// session.
265///
266/// Hosts opt into this by passing it to `lash::LashCoreBuilder::store_factory`.
267/// The factory never becomes a default: app storage and runtime storage remain
268/// host-owned decisions.
269#[derive(Clone, Debug)]
270pub struct SqliteSessionStoreFactory {
271    root: PathBuf,
272    options: StoreOptions,
273}
274
275impl SqliteSessionStoreFactory {
276    pub fn new(root: impl Into<PathBuf>) -> Self {
277        Self {
278            root: root.into(),
279            options: StoreOptions::default(),
280        }
281    }
282
283    pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
284        Self {
285            root: root.into(),
286            options,
287        }
288    }
289
290    pub fn path_for_session(&self, session_id: &str) -> PathBuf {
291        self.root.join(safe_session_db_file_name(session_id))
292    }
293}
294
295#[async_trait::async_trait]
296impl SessionStoreFactory for SqliteSessionStoreFactory {
297    fn durability_tier(&self) -> DurabilityTier {
298        DurabilityTier::Durable
299    }
300
301    async fn create_store(
302        &self,
303        request: &SessionStoreCreateRequest,
304    ) -> Result<Arc<dyn RuntimePersistence>, String> {
305        std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
306        let path = self.path_for_session(&request.session_id);
307        let store = Arc::new(
308            Store::open_with_options(&path, self.options)
309                .await
310                .map_err(|err| err.to_string())?,
311        );
312        if store.load_session_meta().await.is_none() {
313            store
314                .save_session_meta(SessionMeta {
315                    session_id: request.session_id.clone(),
316                    session_name: request.session_id.clone(),
317                    created_at: current_timestamp_string(),
318                    model: request.policy.model.id.clone(),
319                    cwd: std::env::current_dir()
320                        .ok()
321                        .and_then(|path| path.to_str().map(str::to_string)),
322                    relation: request.relation.clone(),
323                })
324                .await;
325        }
326        Ok(store as Arc<dyn RuntimePersistence>)
327    }
328
329    async fn open_existing_store(
330        &self,
331        request: &SessionStoreCreateRequest,
332    ) -> Result<Option<Arc<dyn RuntimePersistence>>, String> {
333        let path = self.path_for_session(&request.session_id);
334        if !path.exists() {
335            return Ok(None);
336        }
337        self.create_store(request).await.map(Some)
338    }
339
340    async fn delete_session(&self, session_id: &str) -> Result<(), String> {
341        let db_path = self.path_for_session(session_id);
342        for path in [
343            db_path.clone(),
344            sqlite_sidecar_path(&db_path, "-wal"),
345            sqlite_sidecar_path(&db_path, "-shm"),
346        ] {
347            match std::fs::remove_file(&path) {
348                Ok(()) => {}
349                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
350                Err(err) => {
351                    return Err(format!("remove session store {}: {err}", path.display()));
352                }
353            }
354        }
355        Ok(())
356    }
357}
358
359fn safe_session_db_file_name(session_id: &str) -> String {
360    let mut safe = session_id
361        .chars()
362        .map(|ch| match ch {
363            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
364            _ => '_',
365        })
366        .collect::<String>();
367    safe = safe.trim_matches('_').to_string();
368    if safe.is_empty() {
369        safe.push_str("session");
370    }
371    safe.truncate(80);
372    let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
373    format!("{safe}-{}.db", &hash[..16])
374}
375
/// Builds the path of a SQLite sidecar file by appending `suffix` (for
/// example `-wal`) directly to the full database path, without touching the
/// extension.
fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
    let mut raw = path.as_os_str().to_owned();
    raw.push(suffix);
    raw.into()
}
381
/// Returns the current wall-clock time as `unix:<seconds since the epoch>`.
/// A clock set before the epoch yields `unix:0`.
fn current_timestamp_string() -> String {
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or_default();
    format!("unix:{secs}")
}
388
/// Returns the current wall-clock time in milliseconds since the Unix epoch,
/// or 0 when the clock is set before the epoch.
fn current_epoch_ms() -> u64 {
    let elapsed = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Default::default(),
    };
    elapsed.as_millis() as u64
}
395
396fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
397    let mut refs = Vec::new();
398    if let Some(blob_ref) = &checkpoint.tool_state_ref {
399        refs.push(RetainedArtifactRef {
400            blob_ref: blob_ref.clone(),
401            kind: PersistedArtifactKind::ToolState,
402        });
403    }
404    if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
405        refs.push(RetainedArtifactRef {
406            blob_ref: blob_ref.clone(),
407            kind: PersistedArtifactKind::PluginSessionSnapshot,
408        });
409    }
410    if let Some(blob_ref) = &checkpoint.execution_state_ref {
411        refs.push(RetainedArtifactRef {
412            blob_ref: blob_ref.clone(),
413            kind: PersistedArtifactKind::ExecutionStateSnapshot,
414        });
415    }
416    refs
417}
418
419fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
420    SessionHeadMeta {
421        session_id: head.session_id.clone(),
422        head_revision: 0,
423        config: head.config.clone(),
424        agent_frames: head.agent_frames.clone(),
425        current_agent_frame_id: head.current_agent_frame_id.clone(),
426        checkpoint_ref: head.checkpoint_ref.clone(),
427        leaf_node_id: head.graph.leaf_node_id.clone(),
428        graph_node_count: head.graph.nodes.len(),
429        token_ledger: Vec::new(),
430    }
431}
432
433fn encode_json<T: serde::Serialize>(value: &T) -> String {
434    serde_json::to_string(value).expect("persisted state should serialize")
435}
436
437fn should_compress_blob(
438    profile: BuiltinBlobProfile,
439    descriptor: &BlobArtifactDescriptor,
440    len: usize,
441) -> bool {
442    if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
443        return false;
444    }
445    match profile {
446        BuiltinBlobProfile::LowLatency => false,
447        BuiltinBlobProfile::Balanced => len >= 4 * 1024,
448        BuiltinBlobProfile::Compact => len >= 1024,
449    }
450}
451
452fn compress_blob(content: &[u8]) -> Vec<u8> {
453    let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
454    std::io::Write::write_all(&mut encoder, content).expect("compress blob");
455    encoder.finish().expect("submit blob compression")
456}
457
458fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
459    let mut decoder = ZlibDecoder::new(content);
460    let mut out = Vec::new();
461    std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
462    Some(out)
463}
464
465fn encode_artifact_blob(
466    descriptor: &BlobArtifactDescriptor,
467    profile: BuiltinBlobProfile,
468    content: &[u8],
469) -> Vec<u8> {
470    let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
471    {
472        (BlobCompression::Zlib, compress_blob(content))
473    } else {
474        (BlobCompression::None, content.to_vec())
475    };
476    encode_msgpack(&StoredBlobEnvelope {
477        descriptor: descriptor.clone(),
478        compression,
479        content: stored_content,
480    })
481}
482
483fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
484    let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
485    match envelope.compression {
486        BlobCompression::None => Some(envelope.content),
487        BlobCompression::Zlib => decompress_blob(&envelope.content),
488    }
489}
490
491/// Read the session head meta off a raw connection. Synchronous because it runs
492/// inside a `conn.call`/`conn.write` closure on the connection thread.
493fn try_load_session_head_meta_from_conn(
494    conn: &Connection,
495) -> Result<Option<SessionHeadMeta>, StoreError> {
496    let row = conn
497        .query_row(
498            "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
499            [],
500            |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
501        )
502        .optional()
503        .map_err(sqlite_error)?;
504    let Some((head_json, head_revision)) = row else {
505        return Ok(None);
506    };
507    let mut meta: SessionHeadMeta = serde_json::from_str(&head_json)
508        .map_err(|err| StoreError::Backend(format!("decode session head: {err}")))?;
509    meta.head_revision = head_revision as u64;
510    Ok(Some(meta))
511}
512
513fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
514    try_load_session_head_meta_from_conn(conn).ok().flatten()
515}
516
517fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
518    conn.query_row(
519        "SELECT session_id, session_name, created_at, model, cwd, relation_json
520         FROM session_meta WHERE singleton = 1",
521        [],
522        |row| {
523            let relation_json: Option<String> = row.get(5)?;
524            let relation = relation_json
525                .and_then(|json| serde_json::from_str(&json).ok())
526                .unwrap_or_default();
527            Ok(SessionMeta {
528                session_id: row.get(0)?,
529                session_name: row.get(1)?,
530                created_at: row.get(2)?,
531                model: row.get(3)?,
532                cwd: row.get(4)?,
533                relation,
534            })
535        },
536    )
537    .optional()
538    .ok()
539    .flatten()
540}
541
542fn decode_checkpoint(bytes: &[u8]) -> Option<SessionCheckpoint> {
543    rmp_serde::from_slice(bytes).ok()
544}
545
546fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
547    // Pre-size the buffer so the per-byte writes inside rmp_serde don't
548    // walk the Vec through 0→4→8→16→32… reallocations on every call.
549    let mut buf = Vec::with_capacity(1024);
550    rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
551    buf
552}
553
554fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
555    rmp_serde::from_slice(bytes).ok()
556}
557
558fn merge_token_ledger_entries(
559    entries: Vec<lash_core::TokenLedgerEntry>,
560) -> Vec<lash_core::TokenLedgerEntry> {
561    let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
562    for entry in entries {
563        if entry.usage.total() == 0 {
564            continue;
565        }
566        if let Some(existing) = merged
567            .iter_mut()
568            .find(|existing| existing.source == entry.source && existing.model == entry.model)
569        {
570            existing.usage.input_tokens += entry.usage.input_tokens;
571            existing.usage.output_tokens += entry.usage.output_tokens;
572            existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
573            existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
574        } else {
575            merged.push(entry);
576        }
577    }
578    merged
579}
580
#[cfg(test)]
mod tests {
    use super::*;
    use lash_core::ProcessInput;
    use lashlang::LashlangArtifactStore;

    /// Process id used by the registry persistence test.
    const PERSISTED_PROCESS_ID: &str = "proc-persist";

    /// Builds a session-originated registration for an external process.
    fn registration(id: &str) -> ProcessRegistration {
        let input = ProcessInput::External {
            metadata: serde_json::Value::Null,
        };
        let provenance =
            lash_core::ProcessProvenance::session(lash_core::SessionScope::new("session"));
        ProcessRegistration::new(id, input, provenance)
    }

    /// The completion output the persistence test writes and expects back.
    fn success_output() -> ProcessAwaitOutput {
        ProcessAwaitOutput::Success {
            value: serde_json::json!({"ok": true}),
            control: None,
        }
    }

    /// A linked module artifact written to an in-memory store reads back with
    /// the same module ref and the same process ref.
    #[tokio::test]
    async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
        let store = Store::memory().await.expect("memory store");
        let parsed =
            lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
        let host_env = lashlang::LashlangHostEnvironment::new(
            lashlang::LashlangHostCatalog::new(),
            lashlang::LashlangAbilities::all(),
        );
        let linked = lashlang::LinkedModule::link(parsed, host_env).expect("link module");

        store
            .put_module_artifact(&linked.artifact)
            .await
            .expect("put artifact");
        let fetched = store
            .get_module_artifact(&linked.module_ref)
            .await
            .expect("get artifact");
        let restored = fetched.expect("artifact exists");

        assert_eq!(restored.module_ref, linked.module_ref);
        assert_eq!(
            restored.process_ref("scan"),
            linked.artifact.process_ref("scan")
        );
    }

    /// A process registered, granted and completed through one registry
    /// handle is still visible after the database is reopened.
    #[tokio::test]
    async fn sqlite_process_registry_persists_rows_after_reopen() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("processes.db");
        {
            // The first handle is dropped at the end of this scope, before
            // the database is reopened.
            let writer = SqliteProcessRegistry::open(&path)
                .await
                .expect("open registry");
            let scope = lash_core::SessionScope::new("session");
            writer
                .register_process(registration(PERSISTED_PROCESS_ID))
                .await
                .expect("register");
            let descriptor = ProcessHandleDescriptor::new(Some("tool"), Some("demo"));
            writer
                .grant_handle(&scope, PERSISTED_PROCESS_ID, descriptor)
                .await
                .expect("grant");
            writer
                .complete_process(PERSISTED_PROCESS_ID, success_output())
                .await
                .expect("complete");
        }

        let reader = SqliteProcessRegistry::open(&path)
            .await
            .expect("reopen registry");
        let scope = lash_core::SessionScope::new("session");
        let record = reader
            .get_process(PERSISTED_PROCESS_ID)
            .await
            .expect("persisted process");

        assert_eq!(record.originator_scope_id(), scope.id().as_str());
        assert_eq!(
            record.provenance.originator,
            lash_core::ProcessOriginator::session(scope.clone())
        );

        let awaited = reader
            .await_process(PERSISTED_PROCESS_ID)
            .await
            .expect("await persisted");
        assert_eq!(awaited, success_output());

        let grants = reader.list_handle_grants(&scope).await.expect("grants");
        assert_eq!(grants.len(), 1);
    }
}
694}