Skip to main content

elastik_core/
engine_introspection.rs

1//! Protocol-neutral Engine introspection surface.
2//!
3//! This module owns the typed snapshots behind generated `proc/*` surfaces.
4//! Engine returns structured data; adapters choose their own rendering.
5
6#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
7
8use std::{fmt, sync::atomic::Ordering};
9
10use crate::{
11    auth,
12    engine::{self, Engine, EngineError},
13    engine_ops::{log_storage_error, EngineOps},
14    engine_types::{AccessTier, ValidatedWorldPath},
15    store, world, AuthGate,
16};
17
18/// Validated `/proc/*` introspection endpoint.
19///
20/// This proves the adapter selected one of the declared proc surfaces before
21/// asking Engine for metadata. Reserved or misspelled proc paths stay outside
22/// the Engine as adapter rendering concerns.
23#[derive(Clone, Debug, PartialEq, Eq)]
24pub struct ValidatedProcPath {
25    endpoint: ProcEndpoint,
26    audit_world: Option<ValidatedWorldPath>,
27}
28
29/// Returned when a string is not one of Engine's known proc endpoints.
30#[derive(Debug, Clone, Copy, PartialEq, Eq)]
31pub struct InvalidProcPath;
32
33/// Stable proc endpoint identity carried by [`ValidatedProcPath`].
34#[derive(Clone, Copy, Debug, PartialEq, Eq)]
35#[non_exhaustive]
36pub enum ProcEndpoint {
37    /// Engine build version string.
38    Version,
39    /// Enumerate every world (durable + in-memory).
40    Worlds,
41    /// Per-world body byte size (`du`-like).
42    Du,
43    /// Aggregate storage + memory usage and quotas (`df`-like).
44    Df,
45    /// Read cache + ledger writer pool counters.
46    Pool,
47    /// Verify a single world's HMAC audit chain.
48    AuditVerify,
49}
50
51/// One world-size row for engine introspection.
52#[non_exhaustive]
53pub struct WorldUsage {
54    /// Canonical world path.
55    pub world: ValidatedWorldPath,
56    /// Body byte size for this world.
57    pub bytes: usize,
58}
59
60/// Aggregate storage/memory snapshot.
61///
62/// Returned by [`crate::Engine::df`]. Quotas of `0`/`None` mean "unlimited";
63/// adapters should render that string for operator-facing output.
64#[non_exhaustive]
65pub struct DfSnapshot {
66    /// Bytes used by durable bodies (SQLite-backed worlds).
67    pub storage_used: usize,
68    /// Configured durable quota, or `None` if unlimited.
69    pub storage_quota: Option<usize>,
70    /// Bytes used by in-memory bodies.
71    pub memory_used: usize,
72    /// Configured memory cap. `0` means unlimited.
73    pub memory_quota: usize,
74    /// Total live worlds (durable + in-memory).
75    pub worlds: usize,
76}
77
78/// Read-cache + ledger-writer snapshot.
79///
80/// Returned by [`crate::Engine::pool`]. All counters are monotonic since
81/// process start.
82#[non_exhaustive]
83pub struct PoolSnapshot {
84    /// Active read-cache entries (open SQLite connections currently parked).
85    pub read_cache_entries: usize,
86    /// Tombstoned entries waiting on in-flight reads to drain.
87    pub read_cache_tombstones: usize,
88    /// Cache hits since process start.
89    ///
90    /// A hit means a Phase 1 cached slot returned a definitive answer:
91    /// `Some` from a Ready slot, or `None` from Tombstone. Opening and
92    /// Evicted retry signals are not hits.
93    pub read_cache_hits: usize,
94    /// Cache misses since process start.
95    ///
96    /// Counted at most once per external read, even if internal retry loops run
97    /// more than once. Under races, one external read may independently count a
98    /// miss and later a hit if another reader installs the target slot first.
99    pub read_cache_misses: usize,
100    /// Times a read saw the cache at cap after a miss.
101    ///
102    /// Counted at most once per external read. The read may still evict a
103    /// sampled cold slot instead of falling back to a transient slot.
104    pub read_cache_capped: usize,
105    /// Successful cap-full read-cache evictions since process start. Failed
106    /// eviction attempts fall back to transient reads and are not counted.
107    pub read_cache_evictions: usize,
108    /// Read-cache slot open failures (SQLite open errors).
109    pub read_cache_open_fails: usize,
110    /// Configured maximum cache entries.
111    pub read_cache_max_entries: usize,
112    /// Ledger writer (re)initializations since process start.
113    pub ledger_writer_inits: usize,
114}
115
116/// Successful audit-chain verification details.
117#[non_exhaustive]
118pub struct AuditValid {
119    /// Number of events on the chain.
120    pub events: usize,
121    /// HMAC of the genesis event.
122    pub genesis: String,
123    /// HMAC of the most recent event.
124    pub latest: String,
125}
126
127/// Audit-chain break details.
128#[non_exhaustive]
129pub struct AuditBroken {
130    /// Event id at which verification failed.
131    pub break_at: usize,
132    /// HMAC the chain expected at that position.
133    pub expected: String,
134    /// HMAC actually stored at that position.
135    pub actual: String,
136}
137
138/// Result of [`crate::Engine::verify_audit`].
139#[non_exhaustive]
140pub enum AuditVerify {
141    /// Chain verified end-to-end.
142    Valid(AuditValid),
143    /// Chain failed verification; see [`AuditBroken`] for the break point.
144    Broken(AuditBroken),
145    /// World does not have an audit chain (e.g. an in-memory `tmp/` world).
146    NotApplicable,
147}
148
149struct IntrospectionPermit {
150    path: ValidatedProcPath,
151}
152
153impl ValidatedProcPath {
154    /// Parses a wire-shaped proc path into a sealed endpoint identity.
155    ///
156    /// Accepts `proc/version`, `proc/worlds`, `proc/du`, `proc/df`,
157    /// `proc/pool`, or `proc/audit/<world>/verify`. Leading and trailing
158    /// slashes are tolerated. The audit world segment must already be a
159    /// canonical Engine path such as `home/foo`; bare names like `foo`, wire
160    /// paths like `/home/foo`, and generated `proc/*` worlds are rejected.
161    ///
162    /// # Errors
163    /// Returns [`InvalidProcPath`] if the input is not one of the declared
164    /// endpoints, or if the audit-verify world fails canonical-path
165    /// validation.
166    pub fn new(raw: impl AsRef<str>) -> Result<Self, InvalidProcPath> {
167        let raw = raw.as_ref().trim_matches('/');
168        match raw {
169            "proc/version" => Ok(Self::version()),
170            "proc/worlds" => Ok(Self::worlds()),
171            "proc/du" => Ok(Self::du()),
172            "proc/df" => Ok(Self::df()),
173            "proc/pool" => Ok(Self::pool()),
174            _ => {
175                let Some(world) = raw
176                    .strip_prefix("proc/audit/")
177                    .and_then(|value| value.strip_suffix("/verify"))
178                else {
179                    return Err(InvalidProcPath);
180                };
181                if world.trim_matches('/').is_empty() {
182                    return Err(InvalidProcPath);
183                }
184                let world = world.trim_end_matches('/').to_owned();
185                let world =
186                    ValidatedWorldPath::from_canonical(world).map_err(|_| InvalidProcPath)?;
187                Ok(Self::audit_verify(world))
188            }
189        }
190    }
191
192    /// Returns the proof token for the `version` endpoint.
193    pub fn version() -> Self {
194        Self {
195            endpoint: ProcEndpoint::Version,
196            audit_world: None,
197        }
198    }
199
200    /// Returns the proof token for the `worlds` endpoint.
201    pub fn worlds() -> Self {
202        Self {
203            endpoint: ProcEndpoint::Worlds,
204            audit_world: None,
205        }
206    }
207
208    /// Returns the proof token for the `du` endpoint.
209    pub fn du() -> Self {
210        Self {
211            endpoint: ProcEndpoint::Du,
212            audit_world: None,
213        }
214    }
215
216    /// Returns the proof token for the `df` endpoint.
217    pub fn df() -> Self {
218        Self {
219            endpoint: ProcEndpoint::Df,
220            audit_world: None,
221        }
222    }
223
224    /// Returns the proof token for the `pool` endpoint.
225    pub fn pool() -> Self {
226        Self {
227            endpoint: ProcEndpoint::Pool,
228            audit_world: None,
229        }
230    }
231
232    /// Returns the proof token for an `audit verify` operation against a
233    /// specific world.
234    pub fn audit_verify(world: ValidatedWorldPath) -> Self {
235        Self {
236            endpoint: ProcEndpoint::AuditVerify,
237            audit_world: Some(world),
238        }
239    }
240
241    pub(crate) fn endpoint(&self) -> ProcEndpoint {
242        self.endpoint
243    }
244
245    pub(crate) fn audit_world(&self) -> Option<&ValidatedWorldPath> {
246        self.audit_world.as_ref()
247    }
248}
249
250impl fmt::Display for InvalidProcPath {
251    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252        f.write_str("invalid proc path")
253    }
254}
255
256impl std::error::Error for InvalidProcPath {}
257
258impl EngineOps<'_> {
259    pub(crate) fn list_worlds(
260        &self,
261        path: &ValidatedProcPath,
262        tier: auth::Tier,
263    ) -> Result<Vec<ValidatedWorldPath>, EngineError> {
264        let permit = self.authorize_introspection(path, tier)?;
265        ensure_proc_endpoint(&permit, ProcEndpoint::Worlds)?;
266        let mut names = world::list(&self.core().data)
267            .map_err(|err| storage_error_to_engine("proc worlds", err, "list_worlds", None))?;
268        names.extend(self.core().mem.list());
269        names.sort();
270        names.dedup();
271        names
272            .into_iter()
273            .map(validated_world_from_storage)
274            .collect()
275    }
276
277    pub(crate) fn list_worlds_with_prefix(
278        &self,
279        prefix: &str,
280        tier: auth::Tier,
281    ) -> Result<Vec<ValidatedWorldPath>, EngineError> {
282        if !crate::can_read(self.core(), tier) {
283            return Err(EngineError::Auth(AuthGate::Read));
284        }
285        let mut names = world::list_with_prefix(&self.core().data, prefix).map_err(|err| {
286            storage_error_to_engine("worlds prefix", err, "list_worlds_with_prefix", None)
287        })?;
288        names.extend(self.core().mem.list_with_prefix(prefix));
289        names.sort();
290        names.dedup();
291        names
292            .into_iter()
293            .map(validated_world_from_storage)
294            .collect()
295    }
296
297    pub(crate) fn list_worlds_with_prefix_bounded(
298        &self,
299        prefix: &str,
300        tier: auth::Tier,
301        max: usize,
302    ) -> Result<Option<Vec<ValidatedWorldPath>>, EngineError> {
303        if !crate::can_read(self.core(), tier) {
304            return Err(EngineError::Auth(AuthGate::Read));
305        }
306        let limit = max.saturating_add(1);
307        let Some(mut names) = world::list_with_prefix_bounded(&self.core().data, prefix, limit)
308            .map_err(|err| {
309                storage_error_to_engine("worlds prefix", err, "list_worlds_with_prefix", None)
310            })?
311        else {
312            return Ok(None);
313        };
314        let Some(mem_names) = self.core().mem.list_with_prefix_bounded(prefix, limit) else {
315            return Ok(None);
316        };
317        names.extend(mem_names);
318        names.sort();
319        names.dedup();
320        if names.len() > max {
321            return Ok(None);
322        }
323        names
324            .into_iter()
325            .map(validated_world_from_storage)
326            .collect::<Result<Vec<_>, _>>()
327            .map(Some)
328    }
329
330    pub(crate) fn du(
331        &self,
332        path: &ValidatedProcPath,
333        tier: auth::Tier,
334    ) -> Result<Vec<WorldUsage>, EngineError> {
335        let permit = self.authorize_introspection(path, tier)?;
336        ensure_proc_endpoint(&permit, ProcEndpoint::Du)?;
337        let mut sizes = world::sizes(&self.core().data)
338            .map_err(|err| storage_error_to_engine("proc du", err, "du", None))?;
339        sizes.extend(self.core().mem.sizes());
340        sizes.sort_by(|a, b| a.0.cmp(&b.0));
341        sizes.dedup_by(|a, b| a.0 == b.0);
342        sizes
343            .into_iter()
344            .map(|(world, bytes)| {
345                Ok(WorldUsage {
346                    world: validated_world_from_storage(world)?,
347                    bytes,
348                })
349            })
350            .collect()
351    }
352
353    pub(crate) fn df(
354        &self,
355        path: &ValidatedProcPath,
356        tier: auth::Tier,
357    ) -> Result<DfSnapshot, EngineError> {
358        let permit = self.authorize_introspection(path, tier)?;
359        ensure_proc_endpoint(&permit, ProcEndpoint::Df)?;
360        let memory_used = self.core().mem.total_bytes();
361        let memory_worlds = self.core().mem.list().len();
362        let storage_used = self.core().storage_body_bytes.load(Ordering::Relaxed);
363        let durable_worlds = self
364            .core()
365            .durable_world_count
366            .load(Ordering::Relaxed)
367            .saturating_sub(usize::from(
368                self.core().delete_ledger_created.load(Ordering::Relaxed),
369            ));
370        Ok(DfSnapshot {
371            storage_used,
372            storage_quota: self.core().max_storage_bytes,
373            memory_used,
374            memory_quota: self.core().max_memory_bytes,
375            worlds: durable_worlds + memory_worlds,
376        })
377    }
378
379    pub(crate) fn pool(
380        &self,
381        path: &ValidatedProcPath,
382        tier: auth::Tier,
383    ) -> Result<PoolSnapshot, EngineError> {
384        let permit = self.authorize_introspection(path, tier)?;
385        ensure_proc_endpoint(&permit, ProcEndpoint::Pool)?;
386        Ok(PoolSnapshot {
387            read_cache_entries: self.core().read_cache.snapshot_entries(),
388            read_cache_tombstones: self.core().read_cache.snapshot_tombstones(),
389            read_cache_hits: self
390                .core()
391                .read_cache
392                .metrics
393                .read_cache_hits
394                .load(Ordering::Relaxed),
395            read_cache_misses: self
396                .core()
397                .read_cache
398                .metrics
399                .read_cache_misses
400                .load(Ordering::Relaxed),
401            read_cache_capped: self
402                .core()
403                .read_cache
404                .metrics
405                .read_cache_capped
406                .load(Ordering::Relaxed),
407            read_cache_evictions: self
408                .core()
409                .read_cache
410                .metrics
411                .read_cache_evictions
412                .load(Ordering::Relaxed),
413            read_cache_open_fails: self
414                .core()
415                .read_cache
416                .metrics
417                .read_cache_open_fails
418                .load(Ordering::Relaxed),
419            read_cache_max_entries: self.core().read_cache.max_entries,
420            ledger_writer_inits: self.core().ledger.inits.load(Ordering::Relaxed),
421        })
422    }
423
424    pub(crate) fn verify_audit(
425        &self,
426        path: &ValidatedProcPath,
427        tier: auth::Tier,
428    ) -> Result<AuditVerify, EngineError> {
429        let permit = self.authorize_introspection(path, tier)?;
430        ensure_proc_endpoint(&permit, ProcEndpoint::AuditVerify)?;
431        let world = permit
432            .path
433            .audit_world()
434            .ok_or(EngineError::InternalInvariant("audit verify missing world"))?;
435        if store::is_memory_world(world.as_str()) {
436            if !self.core().mem.contains(world.as_str()) {
437                return Err(EngineError::NotFound);
438            }
439            return Ok(AuditVerify::NotApplicable);
440        }
441        match self.core().cached_verify_chain(world.as_str()) {
442            Ok(Some(crate::audit::VerifyReport::Valid(report))) => {
443                Ok(AuditVerify::Valid(report.into()))
444            }
445            Ok(Some(crate::audit::VerifyReport::Broken(report))) => {
446                Ok(AuditVerify::Broken(report.into()))
447            }
448            Ok(None) => Err(EngineError::NotFound),
449            Err(err) => Err(storage_error_to_engine(
450                "audit verify",
451                err,
452                "verify_audit",
453                Some(world.as_str()),
454            )),
455        }
456    }
457
458    fn authorize_introspection(
459        &self,
460        path: &ValidatedProcPath,
461        tier: auth::Tier,
462    ) -> Result<IntrospectionPermit, EngineError> {
463        if !crate::can_read(self.core(), tier) {
464            return Err(EngineError::Auth(AuthGate::Read));
465        }
466        Ok(IntrospectionPermit { path: path.clone() })
467    }
468}
469
470impl Engine {
471    /// Lists every canonical world (durable + in-memory) in sorted order.
472    ///
473    /// # Errors
474    /// - [`EngineError::Auth`] if `tier` is below `Read`.
475    /// - [`EngineError::TransientStorage`] / [`EngineError::Storage`] /
476    ///   [`EngineError::InsufficientStorage`] for storage failures.
477    pub fn list_worlds(&self, tier: AccessTier) -> Result<Vec<ValidatedWorldPath>, EngineError> {
478        EngineOps::new(self.core()).list_worlds(&ValidatedProcPath::worlds(), tier.into())
479    }
480
481    /// Lists canonical worlds with the supplied canonical prefix.
482    ///
483    /// This is intended for adapters that need a bounded namespace view (for
484    /// example retained replay) without materializing the full proc-worlds
485    /// set first. It applies the read gate directly and intentionally bypasses
486    /// proc-path authorization; do not expose it directly as a network endpoint.
487    ///
488    /// # Errors
489    /// Same authorization and storage failures as [`Engine::list_worlds`].
490    pub fn list_worlds_with_prefix(
491        &self,
492        prefix: &str,
493        tier: AccessTier,
494    ) -> Result<Vec<ValidatedWorldPath>, EngineError> {
495        EngineOps::new(self.core()).list_worlds_with_prefix(prefix, tier.into())
496    }
497
498    /// Lists canonical worlds with the supplied canonical prefix, returning
499    /// `Ok(None)` if more than `max` distinct worlds match.
500    ///
501    /// This is intended for adapter-internal bounded scans. It uses the same
502    /// read-tier gate as [`Engine::list_worlds_with_prefix`].
503    pub fn list_worlds_with_prefix_bounded(
504        &self,
505        prefix: &str,
506        tier: AccessTier,
507        max: usize,
508    ) -> Result<Option<Vec<ValidatedWorldPath>>, EngineError> {
509        EngineOps::new(self.core()).list_worlds_with_prefix_bounded(prefix, tier.into(), max)
510    }
511
512    /// Returns per-world body byte size, `du`-style.
513    ///
514    /// # Errors
515    /// See [`Engine::list_worlds`] for the storage-failure variants. Same
516    /// `Read`-tier requirement.
517    pub fn du(&self, tier: AccessTier) -> Result<Vec<WorldUsage>, EngineError> {
518        EngineOps::new(self.core()).du(&ValidatedProcPath::du(), tier.into())
519    }
520
521    /// Returns aggregate storage + memory usage, `df`-style.
522    ///
523    /// # Errors
524    /// - [`EngineError::Auth`] if `tier` is below `Read`.
525    pub fn df(&self, tier: AccessTier) -> Result<DfSnapshot, EngineError> {
526        EngineOps::new(self.core()).df(&ValidatedProcPath::df(), tier.into())
527    }
528
529    /// Returns the read-cache + ledger-writer counter snapshot.
530    ///
531    /// # Errors
532    /// - [`EngineError::Auth`] if `tier` is below `Read`.
533    pub fn pool(&self, tier: AccessTier) -> Result<PoolSnapshot, EngineError> {
534        EngineOps::new(self.core()).pool(&ValidatedProcPath::pool(), tier.into())
535    }
536
537    /// Verifies a single world's HMAC audit chain.
538    ///
539    /// Returns [`AuditVerify::Valid`] / [`AuditVerify::Broken`] /
540    /// [`AuditVerify::NotApplicable`] (the latter for in-memory worlds with
541    /// no chain).
542    ///
543    /// # Errors
544    /// - [`EngineError::Auth`] if `tier` is below `Read`.
545    /// - [`EngineError::NotFound`] if `world` does not exist.
546    /// - [`EngineError::TransientStorage`] / [`EngineError::Storage`] /
547    ///   [`EngineError::InsufficientStorage`] for storage failures during
548    ///   verification.
549    pub fn verify_audit(
550        &self,
551        world: &ValidatedWorldPath,
552        tier: AccessTier,
553    ) -> Result<AuditVerify, EngineError> {
554        let path = ValidatedProcPath::audit_verify(world.clone());
555        EngineOps::new(self.core()).verify_audit(&path, tier.into())
556    }
557}
558
559fn storage_error_to_engine(
560    scope: &'static str,
561    err: rusqlite::Error,
562    operation: &'static str,
563    world: Option<&str>,
564) -> EngineError {
565    if crate::is_insufficient_storage_error(&err) {
566        log_storage_error(scope, &err, operation, world);
567        EngineError::InsufficientStorage {
568            sqlite_code: engine::sqlite_code(&err),
569        }
570    } else if crate::is_transient_storage_error(&err) {
571        log_storage_error(scope, &err, operation, world);
572        EngineError::TransientStorage {
573            sqlite_code: engine::sqlite_code(&err),
574        }
575    } else {
576        log_storage_error(scope, &err, operation, world);
577        EngineError::Storage {
578            sqlite_code: engine::sqlite_code(&err),
579        }
580    }
581}
582
583fn validated_world_from_storage(world: String) -> Result<ValidatedWorldPath, EngineError> {
584    ValidatedWorldPath::from_canonical(world)
585        .map_err(|_| EngineError::InternalInvariant("storage returned invalid world path"))
586}
587
588fn ensure_proc_endpoint(
589    permit: &IntrospectionPermit,
590    expected: ProcEndpoint,
591) -> Result<(), EngineError> {
592    if permit.path.endpoint() == expected {
593        Ok(())
594    } else {
595        Err(EngineError::InternalInvariant(
596            "proc permit endpoint mismatch",
597        ))
598    }
599}
600
601impl From<crate::audit::VerifyOk> for AuditValid {
602    fn from(value: crate::audit::VerifyOk) -> Self {
603        Self {
604            events: value.events,
605            genesis: value.genesis,
606            latest: value.latest,
607        }
608    }
609}
610
611impl From<crate::audit::VerifyBreak> for AuditBroken {
612    fn from(value: crate::audit::VerifyBreak) -> Self {
613        Self {
614            break_at: value.break_at,
615            expected: value.expected,
616            actual: value.actual,
617        }
618    }
619}
620
621#[cfg(test)]
622mod tests {
623    use std::path::PathBuf;
624    use std::time::{SystemTime, UNIX_EPOCH};
625
626    use bytes::Bytes;
627
628    use super::{AuditVerify, ProcEndpoint, ValidatedProcPath};
629    use crate::{
630        engine::{Engine, EngineError},
631        engine_ops::EngineOps,
632        engine_types::{
633            AccessTier, Preconditions, Representation, SecretBytes, ValidatedWorldPath,
634        },
635        AuthGate,
636    };
637
638    fn temp_root(name: &str) -> PathBuf {
639        let nonce = SystemTime::now()
640            .duration_since(UNIX_EPOCH)
641            .unwrap()
642            .as_nanos();
643        let root = std::env::temp_dir().join(format!(
644            "elastik-engine-introspection-{name}-{}-{nonce}",
645            std::process::id()
646        ));
647        let _ = std::fs::remove_dir_all(&root);
648        root
649    }
650
651    #[test]
652    fn validated_proc_path_accepts_declared_endpoints_only() {
653        assert_eq!(
654            ValidatedProcPath::new("/proc/worlds").unwrap().endpoint(),
655            ProcEndpoint::Worlds
656        );
657        assert_eq!(
658            ValidatedProcPath::new("proc/du").unwrap().endpoint(),
659            ProcEndpoint::Du
660        );
661        assert_eq!(
662            ValidatedProcPath::new("proc/audit/home/foo/verify")
663                .unwrap()
664                .audit_world()
665                .unwrap()
666                .as_str(),
667            "home/foo"
668        );
669
670        assert!(ValidatedProcPath::new("proc").is_err());
671        assert!(ValidatedProcPath::new("proc/nope").is_err());
672        assert!(ValidatedProcPath::new("proc/audit/foo/verify").is_err());
673        assert!(ValidatedProcPath::new("proc/audit//verify").is_err());
674        assert!(ValidatedProcPath::new("proc/audit/proc/version/verify").is_err());
675    }
676
677    #[tokio::test]
678    async fn engine_introspection_returns_typed_snapshots() {
679        let root = temp_root("snapshots");
680        let engine = Engine::builder()
681            .data_root(root.clone())
682            .key(SecretBytes::try_from_slice(b"key").unwrap())
683            .read_token(b"reader".to_vec())
684            .max_storage_bytes(Some(64))
685            .build()
686            .unwrap();
687        let disk = ValidatedWorldPath::new("home/inspect").unwrap();
688        let memory = ValidatedWorldPath::new("tmp/inspect").unwrap();
689
690        assert!(matches!(
691            engine.list_worlds(AccessTier::Anon),
692            Err(EngineError::Auth(AuthGate::Read))
693        ));
694
695        for world in [&disk, &memory] {
696            engine
697                .replace(
698                    world,
699                    Representation::new(Bytes::from_static(b"hello"), "text/plain", Vec::new()),
700                    Preconditions::none(),
701                    AccessTier::Write,
702                )
703                .await
704                .unwrap();
705        }
706
707        let worlds = engine.list_worlds(AccessTier::Read).unwrap();
708        assert!(worlds.iter().any(|world| world == &disk));
709        assert!(worlds.iter().any(|world| world == &memory));
710
711        let usage = engine.du(AccessTier::Read).unwrap();
712        assert!(usage
713            .iter()
714            .any(|row| row.world == disk && row.bytes == b"hello".len()));
715        assert!(usage
716            .iter()
717            .any(|row| row.world == memory && row.bytes == b"hello".len()));
718
719        let df = engine.df(AccessTier::Read).unwrap();
720        assert_eq!(df.storage_used, b"hello".len());
721        assert_eq!(df.storage_quota, Some(64));
722        assert_eq!(df.memory_used, b"hello".len());
723        assert_eq!(df.worlds, 2);
724
725        let pool = engine.pool(AccessTier::Read).unwrap();
726        assert_eq!(
727            pool.read_cache_max_entries,
728            crate::read_cache::DEFAULT_READ_CACHE_MAX_ENTRIES
729        );
730        assert!(pool.read_cache_entries <= pool.read_cache_max_entries);
731
732        assert!(matches!(
733            engine.verify_audit(&disk, AccessTier::Read).unwrap(),
734            AuditVerify::Valid(_)
735        ));
736        assert!(matches!(
737            engine.verify_audit(&memory, AccessTier::Read).unwrap(),
738            AuditVerify::NotApplicable
739        ));
740
741        assert!(matches!(
742            EngineOps::new(engine.core())
743                .list_worlds(&ValidatedProcPath::du(), AccessTier::Read.into()),
744            Err(EngineError::InternalInvariant(
745                "proc permit endpoint mismatch"
746            ))
747        ));
748
749        drop(engine);
750        let _ = std::fs::remove_dir_all(root);
751    }
752}