Skip to main content

lash_sqlite_store/
lib.rs

1//! # lash-sqlite-store
2//!
3//! The high-performance local **durable** persistence backend for the lash
4//! agent runtime. One SQLite database per session, opened in WAL journal mode
5//! with a 15-second busy timeout, satisfying the full [`RuntimePersistence`] +
6//! [`AttachmentManifest`] contract from `lash-core`.
7//!
8//! This crate is a drop-in replacement for `lash-sqlite-store`: it exposes the
9//! same public surface (`Store`, `SqliteProcessRegistry`,
10//! `SqliteSessionStoreFactory`, `SqliteEffectHost`, the option/descriptor types)
11//! with identical async signatures, so a consumer swaps backends by renaming
12//! the crate path only. The difference is the engine underneath: tokio-rusqlite
13//! over a statically-linked SQLite with real WAL (`-wal`/`-shm` sidecars,
14//! multi-process readers + single writer) instead of the prior store's experimental mvcc.
15//!
16//! ## Why this is "the durable backend" not just "an option"
17//!
18//! Lash's runtime layer treats persistence as a first-class boundary, not a
19//! debug-only convenience. Every primitive that lets the runtime survive a
20//! crash — head-revision CAS, final turn-commit idempotency, attachment
21//! write-ahead manifests, blob content-addressing with optional compression —
22//! is implemented in this crate against SQLite for one reason: SQLite is the
23//! simplest backend that gives us *atomic multi-statement transactions on a
24//! single file* with durability guarantees we can reason about.
25//!
26//! ## Schema cutover, not migrations
27//!
28//! There is exactly one supported schema (see [`schema::SCHEMA`]). Older
29//! databases must be deleted before opening — we do not carry migration code.
30//!
31//! [`RuntimePersistence`]: lash_core::RuntimePersistence
32//! [`AttachmentManifest`]: lash_core::AttachmentManifest
33
34use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46    QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47    prepare_process_registration,
48};
49use lash_core::store::queued_work::{
50    ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id,
51    ensure_completion_owns_all_batches, renewed_claim, select_claim_prefix,
52};
53use lash_core::store::{
54    GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
55    RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
56};
57use lash_core::{
58    AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
59    DeliveryPolicy, DurabilityTier, GcReport, MergeKey, PROCESS_LEASE_SCHEMA_VERSION,
60    ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
61    ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease,
62    ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry, ProcessScope,
63    RuntimePersistence, SessionMeta, SessionPickerInfo, SessionReadScope,
64    SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy, StoreError, VacuumReport,
65};
66use rusqlite::{Connection, OptionalExtension, Transaction, params};
67use sha2::{Digest, Sha256};
68
69use conn::SqliteConnection;
70
71mod attachments;
72mod blobs;
73mod conn;
74mod effect_replay;
75mod graph;
76mod host_events;
77mod leases;
78mod lifecycle;
79mod persistence;
80mod process_registry;
81mod queued_work;
82mod schema;
83
84use conn::TxOutcome;
85pub use effect_replay::{
86    SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
87};
88pub use host_events::SqliteHostEventStore;
89use leases::*;
90use queued_work::*;
91use schema::{
92    StoreBacking, apply_pragmas, ensure_effect_schema, ensure_host_event_schema,
93    ensure_process_schema, ensure_schema,
94};
95
96/// SQLite-backed store for checkpoint blobs and the canonical session head.
97///
98/// The struct name and every public method match `lash_sqlite_store::Store`
99/// exactly so consumers swap backends with a path rename. Internally it holds a
100/// single cloneable [`SqliteConnection`] (a tokio-rusqlite handle to one
101/// database thread) rather than the prior store's `tokio::sync::Mutex<rusqlite::Connection>`.
102pub struct Store {
103    conn: SqliteConnection,
104    artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
105    options: StoreOptions,
106    commit_count: AtomicU64,
107}
108
109/// SQLite-backed process registry for one configured runtime deployment.
110///
111/// Named `SqliteProcessRegistry` so the path-rename swap keeps compiling; this is
112/// the SQLite implementation. It is intentionally separate from [`Store`]:
113/// session databases persist one conversation, while this registry persists
114/// background process state and handle visibility across all sessions in the
115/// same host profile.
116pub struct SqliteProcessRegistry {
117    conn: SqliteConnection,
118    notify: tokio::sync::Notify,
119}
120
121fn sqlite_error(err: rusqlite::Error) -> StoreError {
122    StoreError::Backend(err.to_string())
123}
124
125fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
126    lash_core::PluginError::Session(err.to_string())
127}
128
129fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
130    lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
131}
132
133fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
134    serde_json::to_string(value).map_err(|err| {
135        lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
136    })
137}
138
139fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
140    futures_executor::block_on(future)
141}
142
143#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
144pub enum PersistedArtifactKind {
145    GenericBlob,
146    CheckpointManifest,
147    ToolState,
148    PluginSessionSnapshot,
149    ExecutionStateSnapshot,
150    LashlangModule,
151}
152
153#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
154pub enum BlobStorageHint {
155    Compressible,
156    InlinePreferred,
157    LargePayload,
158}
159
160#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
161enum BlobCompression {
162    None,
163    Zlib,
164}
165
166#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
167pub struct BlobArtifactDescriptor {
168    pub kind: PersistedArtifactKind,
169    #[serde(default, skip_serializing_if = "Vec::is_empty")]
170    pub hints: Vec<BlobStorageHint>,
171}
172
173impl BlobArtifactDescriptor {
174    pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
175        Self {
176            kind,
177            hints: hints.into(),
178        }
179    }
180
181    pub fn checkpoint_manifest() -> Self {
182        Self::new(
183            PersistedArtifactKind::CheckpointManifest,
184            vec![BlobStorageHint::Compressible],
185        )
186    }
187
188    pub fn tool_state_snapshot() -> Self {
189        Self::new(
190            PersistedArtifactKind::ToolState,
191            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
192        )
193    }
194
195    pub fn plugin_session_snapshot() -> Self {
196        Self::new(
197            PersistedArtifactKind::PluginSessionSnapshot,
198            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
199        )
200    }
201
202    pub fn execution_state_snapshot() -> Self {
203        Self::new(
204            PersistedArtifactKind::ExecutionStateSnapshot,
205            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
206        )
207    }
208
209    pub fn lashlang_module() -> Self {
210        Self::new(
211            PersistedArtifactKind::LashlangModule,
212            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
213        )
214    }
215}
216
217#[derive(Clone, Debug, PartialEq, Eq, Hash)]
218struct RetainedArtifactRef {
219    pub blob_ref: BlobRef,
220    pub kind: PersistedArtifactKind,
221}
222
223#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
224pub enum BuiltinBlobProfile {
225    LowLatency,
226    #[default]
227    Balanced,
228    Compact,
229}
230
231#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
232pub struct StoreGcPolicy {
233    pub auto_run_every_commits: Option<u64>,
234}
235
236#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
237pub struct StoreOptions {
238    pub blob_profile: BuiltinBlobProfile,
239    pub gc_policy: StoreGcPolicy,
240}
241
242#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
243struct StoredBlobEnvelope {
244    descriptor: BlobArtifactDescriptor,
245    compression: BlobCompression,
246    content: Vec<u8>,
247}
248
249#[derive(Clone, Debug)]
250pub struct StoredSessionCheckpoint {
251    pub checkpoint_ref: BlobRef,
252    pub manifest: SessionCheckpoint,
253}
254
255/// Explicit first-party factory for one SQLite session database per Lash
256/// session.
257///
258/// Named `SqliteSessionStoreFactory` so the path-rename swap keeps compiling.
259/// Hosts opt into this by passing it to `lash::LashCoreBuilder::store_factory`.
260/// The factory never becomes a default: app storage and runtime storage remain
261/// host-owned decisions.
262#[derive(Clone, Debug)]
263pub struct SqliteSessionStoreFactory {
264    root: PathBuf,
265    options: StoreOptions,
266}
267
268impl SqliteSessionStoreFactory {
269    pub fn new(root: impl Into<PathBuf>) -> Self {
270        Self {
271            root: root.into(),
272            options: StoreOptions::default(),
273        }
274    }
275
276    pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
277        Self {
278            root: root.into(),
279            options,
280        }
281    }
282
283    pub fn path_for_session(&self, session_id: &str) -> PathBuf {
284        self.root.join(safe_session_db_file_name(session_id))
285    }
286}
287
288#[async_trait::async_trait]
289impl SessionStoreFactory for SqliteSessionStoreFactory {
290    fn durability_tier(&self) -> DurabilityTier {
291        DurabilityTier::Durable
292    }
293
294    async fn create_store(
295        &self,
296        request: &SessionStoreCreateRequest,
297    ) -> Result<Arc<dyn RuntimePersistence>, String> {
298        std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
299        let path = self.path_for_session(&request.session_id);
300        let store = Arc::new(
301            Store::open_with_options(&path, self.options)
302                .await
303                .map_err(|err| err.to_string())?,
304        );
305        if store.load_session_meta().await.is_none() {
306            store
307                .save_session_meta(SessionMeta {
308                    session_id: request.session_id.clone(),
309                    session_name: request.session_id.clone(),
310                    created_at: current_timestamp_string(),
311                    model: request.policy.model.id.clone(),
312                    cwd: std::env::current_dir()
313                        .ok()
314                        .and_then(|path| path.to_str().map(str::to_string)),
315                    relation: request.relation.clone(),
316                })
317                .await;
318        }
319        Ok(store as Arc<dyn RuntimePersistence>)
320    }
321
322    async fn delete_session(&self, session_id: &str) -> Result<(), String> {
323        let db_path = self.path_for_session(session_id);
324        for path in [
325            db_path.clone(),
326            sqlite_sidecar_path(&db_path, "-wal"),
327            sqlite_sidecar_path(&db_path, "-shm"),
328        ] {
329            match std::fs::remove_file(&path) {
330                Ok(()) => {}
331                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
332                Err(err) => {
333                    return Err(format!("remove session store {}: {err}", path.display()));
334                }
335            }
336        }
337        Ok(())
338    }
339}
340
341fn safe_session_db_file_name(session_id: &str) -> String {
342    let mut safe = session_id
343        .chars()
344        .map(|ch| match ch {
345            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
346            _ => '_',
347        })
348        .collect::<String>();
349    safe = safe.trim_matches('_').to_string();
350    if safe.is_empty() {
351        safe.push_str("session");
352    }
353    safe.truncate(80);
354    let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
355    format!("{safe}-{}.db", &hash[..16])
356}
357
358fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
359    let mut sidecar = path.as_os_str().to_os_string();
360    sidecar.push(suffix);
361    PathBuf::from(sidecar)
362}
363
364fn current_timestamp_string() -> String {
365    let now = SystemTime::now()
366        .duration_since(UNIX_EPOCH)
367        .unwrap_or_default();
368    format!("unix:{}", now.as_secs())
369}
370
371fn current_epoch_ms() -> u64 {
372    SystemTime::now()
373        .duration_since(UNIX_EPOCH)
374        .unwrap_or_default()
375        .as_millis() as u64
376}
377
378fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
379    let mut refs = Vec::new();
380    if let Some(blob_ref) = &checkpoint.tool_state_ref {
381        refs.push(RetainedArtifactRef {
382            blob_ref: blob_ref.clone(),
383            kind: PersistedArtifactKind::ToolState,
384        });
385    }
386    if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
387        refs.push(RetainedArtifactRef {
388            blob_ref: blob_ref.clone(),
389            kind: PersistedArtifactKind::PluginSessionSnapshot,
390        });
391    }
392    if let Some(blob_ref) = &checkpoint.execution_state_ref {
393        refs.push(RetainedArtifactRef {
394            blob_ref: blob_ref.clone(),
395            kind: PersistedArtifactKind::ExecutionStateSnapshot,
396        });
397    }
398    refs
399}
400
401fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
402    SessionHeadMeta {
403        session_id: head.session_id.clone(),
404        head_revision: 0,
405        config: head.config.clone(),
406        agent_frames: head.agent_frames.clone(),
407        current_agent_frame_id: head.current_agent_frame_id.clone(),
408        checkpoint_ref: head.checkpoint_ref.clone(),
409        leaf_node_id: head.graph.leaf_node_id.clone(),
410        graph_node_count: head.graph.nodes.len(),
411        token_ledger: Vec::new(),
412    }
413}
414
415fn encode_json<T: serde::Serialize>(value: &T) -> String {
416    serde_json::to_string(value).expect("persisted state should serialize")
417}
418
419fn should_compress_blob(
420    profile: BuiltinBlobProfile,
421    descriptor: &BlobArtifactDescriptor,
422    len: usize,
423) -> bool {
424    if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
425        return false;
426    }
427    match profile {
428        BuiltinBlobProfile::LowLatency => false,
429        BuiltinBlobProfile::Balanced => len >= 4 * 1024,
430        BuiltinBlobProfile::Compact => len >= 1024,
431    }
432}
433
434fn compress_blob(content: &[u8]) -> Vec<u8> {
435    let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
436    std::io::Write::write_all(&mut encoder, content).expect("compress blob");
437    encoder.finish().expect("submit blob compression")
438}
439
440fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
441    let mut decoder = ZlibDecoder::new(content);
442    let mut out = Vec::new();
443    std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
444    Some(out)
445}
446
447fn encode_artifact_blob(
448    descriptor: &BlobArtifactDescriptor,
449    profile: BuiltinBlobProfile,
450    content: &[u8],
451) -> Vec<u8> {
452    let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
453    {
454        (BlobCompression::Zlib, compress_blob(content))
455    } else {
456        (BlobCompression::None, content.to_vec())
457    };
458    encode_msgpack(&StoredBlobEnvelope {
459        descriptor: descriptor.clone(),
460        compression,
461        content: stored_content,
462    })
463}
464
465fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
466    let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
467    match envelope.compression {
468        BlobCompression::None => Some(envelope.content),
469        BlobCompression::Zlib => decompress_blob(&envelope.content),
470    }
471}
472
473/// Read the session head meta off a raw connection. Synchronous because it runs
474/// inside a `conn.call`/`conn.write` closure on the connection thread.
475fn try_load_session_head_meta_from_conn(
476    conn: &Connection,
477) -> Result<Option<SessionHeadMeta>, StoreError> {
478    let row = conn
479        .query_row(
480            "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
481            [],
482            |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
483        )
484        .optional()
485        .map_err(sqlite_error)?;
486    let Some((head_json, head_revision)) = row else {
487        return Ok(None);
488    };
489    let mut meta: SessionHeadMeta = serde_json::from_str(&head_json)
490        .map_err(|err| StoreError::Backend(format!("decode session head: {err}")))?;
491    meta.head_revision = head_revision as u64;
492    Ok(Some(meta))
493}
494
495fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
496    try_load_session_head_meta_from_conn(conn).ok().flatten()
497}
498
499fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
500    conn.query_row(
501        "SELECT session_id, session_name, created_at, model, cwd, relation_json
502         FROM session_meta WHERE singleton = 1",
503        [],
504        |row| {
505            let relation_json: Option<String> = row.get(5)?;
506            let relation = relation_json
507                .and_then(|json| serde_json::from_str(&json).ok())
508                .unwrap_or_default();
509            Ok(SessionMeta {
510                session_id: row.get(0)?,
511                session_name: row.get(1)?,
512                created_at: row.get(2)?,
513                model: row.get(3)?,
514                cwd: row.get(4)?,
515                relation,
516            })
517        },
518    )
519    .optional()
520    .ok()
521    .flatten()
522}
523
524fn decode_checkpoint(bytes: &[u8]) -> Option<SessionCheckpoint> {
525    rmp_serde::from_slice(bytes).ok()
526}
527
528fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
529    // Pre-size the buffer so the per-byte writes inside rmp_serde don't
530    // walk the Vec through 0→4→8→16→32… reallocations on every call.
531    let mut buf = Vec::with_capacity(1024);
532    rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
533    buf
534}
535
536fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
537    rmp_serde::from_slice(bytes).ok()
538}
539
540fn merge_token_ledger_entries(
541    entries: Vec<lash_core::TokenLedgerEntry>,
542) -> Vec<lash_core::TokenLedgerEntry> {
543    let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
544    for entry in entries {
545        if entry.usage.total() == 0 {
546            continue;
547        }
548        if let Some(existing) = merged
549            .iter_mut()
550            .find(|existing| existing.source == entry.source && existing.model == entry.model)
551        {
552            existing.usage.input_tokens += entry.usage.input_tokens;
553            existing.usage.output_tokens += entry.usage.output_tokens;
554            existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
555            existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
556        } else {
557            merged.push(entry);
558        }
559    }
560    merged
561}
562
563#[cfg(test)]
564mod tests {
565    use super::*;
566    use lash_core::ProcessInput;
567    use lashlang::LashlangArtifactStore;
568
569    fn registration(id: &str) -> ProcessRegistration {
570        ProcessRegistration::new(
571            id,
572            ProcessInput::External {
573                metadata: serde_json::Value::Null,
574            },
575        )
576        .with_process_provenance(lash_core::ProcessProvenance::new(
577            lash_core::ProcessScope::new("session"),
578            "test-host",
579        ))
580    }
581
582    #[tokio::test]
583    async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
584        let store = Store::memory().await.expect("memory store");
585        let module =
586            lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
587        let linked = lashlang::LinkedModule::link(
588            module,
589            lashlang::LashlangSurface::new(
590                lashlang::ResourceCatalog::new(),
591                lashlang::LashlangAbilities::all(),
592            ),
593        )
594        .expect("link module");
595
596        store
597            .put_module_artifact(&linked.artifact)
598            .await
599            .expect("put artifact");
600        let restored = store
601            .get_module_artifact(&linked.module_ref)
602            .await
603            .expect("get artifact")
604            .expect("artifact exists");
605
606        assert_eq!(restored.module_ref, linked.module_ref);
607        assert_eq!(
608            restored.process_ref("scan"),
609            linked.artifact.process_ref("scan")
610        );
611    }
612
613    #[tokio::test]
614    async fn sqlite_process_registry_persists_rows_after_reopen() {
615        let dir = tempfile::tempdir().expect("tempdir");
616        let path = dir.path().join("processes.db");
617        {
618            let registry = SqliteProcessRegistry::open(&path)
619                .await
620                .expect("open registry");
621            let owner_scope = lash_core::ProcessScope::new("session");
622            registry
623                .register_process(registration("proc-persist"))
624                .await
625                .expect("register");
626            registry
627                .grant_handle(
628                    &owner_scope,
629                    "proc-persist",
630                    ProcessHandleDescriptor::new(Some("tool"), Some("demo")),
631                )
632                .await
633                .expect("grant");
634            registry
635                .complete_process(
636                    "proc-persist",
637                    ProcessAwaitOutput::Success {
638                        value: serde_json::json!({"ok": true}),
639                        control: None,
640                    },
641                )
642                .await
643                .expect("complete");
644        }
645
646        let registry = SqliteProcessRegistry::open(&path)
647            .await
648            .expect("reopen registry");
649        let owner_scope = lash_core::ProcessScope::new("session");
650        let record = registry
651            .get_process("proc-persist")
652            .await
653            .expect("persisted process");
654
655        assert_eq!(record.owner_scope_id(), owner_scope.id());
656        assert_eq!(record.provenance.owner_scope.session_id.as_str(), "session");
657        assert_eq!(
658            registry
659                .await_process("proc-persist")
660                .await
661                .expect("await persisted"),
662            ProcessAwaitOutput::Success {
663                value: serde_json::json!({"ok": true}),
664                control: None,
665            }
666        );
667        assert_eq!(
668            registry
669                .list_handle_grants(&owner_scope)
670                .await
671                .expect("grants")
672                .len(),
673            1
674        );
675    }
676}