Skip to main content

elastik_core/
engine_introspection.rs

1//! Protocol-neutral Engine introspection surface.
2//!
3//! This module owns the typed snapshots behind `/proc/*`. The HTTP adapter
4//! still renders text/plain bodies; Engine returns structured data.
5
6#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
7
8use std::{fmt, sync::atomic::Ordering};
9
10use crate::{
11    auth,
12    engine::{self, Engine, EngineError},
13    engine_ops::{log_storage_error, EngineOps},
14    engine_types::{AccessTier, ValidatedWorldPath},
15    store, world, AuthGate,
16};
17
18/// Validated `/proc/*` introspection endpoint.
19///
20/// This proves the adapter selected one of the declared proc surfaces before
21/// asking Engine for metadata. Reserved or misspelled proc paths stay in the
22/// adapter as 404/405 rendering concerns.
23#[derive(Clone, Debug, PartialEq, Eq)]
24pub struct ValidatedProcPath {
25    endpoint: ProcEndpoint,
26    audit_world: Option<ValidatedWorldPath>,
27}
28
/// Error produced when a string does not name one of Engine's known proc
/// endpoints, or names the audit-verify endpoint with a non-canonical world.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct InvalidProcPath;
32
/// Identity of a proc surface, carried inside [`ValidatedProcPath`].
///
/// Marked `#[non_exhaustive]` so new surfaces can be added without breaking
/// downstream matches.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ProcEndpoint {
    /// `proc/version`: the engine build version string.
    Version,
    /// `proc/worlds`: every world, durable and in-memory.
    Worlds,
    /// `proc/du`: body byte size per world.
    Du,
    /// `proc/df`: aggregate storage and memory usage with quotas.
    Df,
    /// `proc/pool`: read-cache and ledger-writer counters.
    Pool,
    /// `proc/audit/<world>/verify`: HMAC audit-chain check for one world.
    AuditVerify,
}
50
51/// One world-size row for engine introspection.
52#[non_exhaustive]
53pub struct WorldUsage {
54    /// Canonical world path.
55    pub world: ValidatedWorldPath,
56    /// Body byte size for this world.
57    pub bytes: usize,
58}
59
/// Aggregate storage/memory snapshot.
///
/// Returned by [`crate::Engine::df`]. An unlimited quota is encoded as `None`
/// in `storage_quota` and as `0` in `memory_quota`; adapters should render
/// either form as "unlimited" in operator-facing output.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct DfSnapshot {
    /// Bytes used by durable bodies (SQLite-backed worlds).
    pub storage_used: usize,
    /// Configured durable quota, or `None` if unlimited.
    pub storage_quota: Option<usize>,
    /// Bytes used by in-memory bodies.
    pub memory_used: usize,
    /// Configured memory cap. `0` means unlimited.
    pub memory_quota: usize,
    /// Total live worlds (durable + in-memory).
    pub worlds: usize,
}
77
/// Read-cache + ledger-writer snapshot.
///
/// Returned by [`crate::Engine::pool`]. The fields have three different kinds:
/// - `read_cache_hits`, `read_cache_misses`, `read_cache_capped`,
///   `read_cache_evictions`, `read_cache_open_fails` and `ledger_writer_inits`
///   are monotonic counters since process start;
/// - `read_cache_entries` and `read_cache_tombstones` are point-in-time
///   gauges;
/// - `read_cache_max_entries` is configuration.
///
/// Each counter is loaded separately with relaxed ordering. The snapshot is
/// therefore not a single consistent cut across fields.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct PoolSnapshot {
    /// Active read-cache entries (open SQLite connections currently parked).
    pub read_cache_entries: usize,
    /// Tombstoned entries waiting on in-flight reads to drain.
    pub read_cache_tombstones: usize,
    /// Cache hits since process start.
    ///
    /// A hit means a Phase 1 cached slot returned a definitive answer:
    /// `Some` from a Ready slot, or `None` from Tombstone. Opening and
    /// Evicted retry signals are not hits.
    pub read_cache_hits: usize,
    /// Cache misses since process start.
    ///
    /// Counted at most once per external read, even if internal retry loops run
    /// more than once. Under races, one external read may independently count a
    /// miss and later a hit if another reader installs the target slot first.
    pub read_cache_misses: usize,
    /// Times a read saw the cache at cap after a miss.
    ///
    /// Counted at most once per external read. The read may still evict a
    /// sampled cold slot instead of falling back to a transient slot.
    pub read_cache_capped: usize,
    /// Successful cap-full read-cache evictions since process start. Failed
    /// eviction attempts fall back to transient reads and are not counted.
    pub read_cache_evictions: usize,
    /// Read-cache slot open failures (SQLite open errors).
    pub read_cache_open_fails: usize,
    /// Configured maximum cache entries.
    pub read_cache_max_entries: usize,
    /// Ledger writer (re)initializations since process start.
    pub ledger_writer_inits: usize,
}
115
/// Successful audit-chain verification details.
// Standard derives let callers log, compare and clone verification results.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct AuditValid {
    /// Number of events on the chain.
    pub events: usize,
    /// HMAC of the genesis event.
    pub genesis: String,
    /// HMAC of the most recent event.
    pub latest: String,
}
126
/// Audit-chain break details.
// Standard derives let callers log, compare and clone verification results.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct AuditBroken {
    /// Event id at which verification failed.
    pub break_at: usize,
    /// HMAC the chain expected at that position.
    pub expected: String,
    /// HMAC actually stored at that position.
    pub actual: String,
}
137
138/// Result of [`crate::Engine::verify_audit`].
139#[non_exhaustive]
140pub enum AuditVerify {
141    /// Chain verified end-to-end.
142    Valid(AuditValid),
143    /// Chain failed verification; see [`AuditBroken`] for the break point.
144    Broken(AuditBroken),
145    /// World does not have an audit chain (e.g. an in-memory `tmp/` world).
146    NotApplicable,
147}
148
149struct IntrospectionPermit {
150    path: ValidatedProcPath,
151}
152
153impl ValidatedProcPath {
154    /// Parses a wire-shaped proc path into a sealed endpoint identity.
155    ///
156    /// Accepts `proc/version`, `proc/worlds`, `proc/du`, `proc/df`,
157    /// `proc/pool`, or `proc/audit/<world>/verify`. Leading and trailing
158    /// slashes are tolerated. The audit world segment must already be a
159    /// canonical Engine path such as `home/foo`; bare names like `foo`, wire
160    /// paths like `/home/foo`, and generated `proc/*` worlds are rejected.
161    ///
162    /// # Errors
163    /// Returns [`InvalidProcPath`] if the input is not one of the declared
164    /// endpoints, or if the audit-verify world fails canonical-path
165    /// validation.
166    pub fn new(raw: impl AsRef<str>) -> Result<Self, InvalidProcPath> {
167        let raw = raw.as_ref().trim_matches('/');
168        match raw {
169            "proc/version" => Ok(Self::version()),
170            "proc/worlds" => Ok(Self::worlds()),
171            "proc/du" => Ok(Self::du()),
172            "proc/df" => Ok(Self::df()),
173            "proc/pool" => Ok(Self::pool()),
174            _ => {
175                let Some(world) = raw
176                    .strip_prefix("proc/audit/")
177                    .and_then(|value| value.strip_suffix("/verify"))
178                else {
179                    return Err(InvalidProcPath);
180                };
181                if world.trim_matches('/').is_empty() {
182                    return Err(InvalidProcPath);
183                }
184                let world = world.trim_end_matches('/').to_owned();
185                let world =
186                    ValidatedWorldPath::from_canonical(world).map_err(|_| InvalidProcPath)?;
187                Ok(Self::audit_verify(world))
188            }
189        }
190    }
191
192    /// Returns the proof token for the `version` endpoint.
193    pub fn version() -> Self {
194        Self {
195            endpoint: ProcEndpoint::Version,
196            audit_world: None,
197        }
198    }
199
200    /// Returns the proof token for the `worlds` endpoint.
201    pub fn worlds() -> Self {
202        Self {
203            endpoint: ProcEndpoint::Worlds,
204            audit_world: None,
205        }
206    }
207
208    /// Returns the proof token for the `du` endpoint.
209    pub fn du() -> Self {
210        Self {
211            endpoint: ProcEndpoint::Du,
212            audit_world: None,
213        }
214    }
215
216    /// Returns the proof token for the `df` endpoint.
217    pub fn df() -> Self {
218        Self {
219            endpoint: ProcEndpoint::Df,
220            audit_world: None,
221        }
222    }
223
224    /// Returns the proof token for the `pool` endpoint.
225    pub fn pool() -> Self {
226        Self {
227            endpoint: ProcEndpoint::Pool,
228            audit_world: None,
229        }
230    }
231
232    /// Returns the proof token for an `audit verify` request against a
233    /// specific world.
234    pub fn audit_verify(world: ValidatedWorldPath) -> Self {
235        Self {
236            endpoint: ProcEndpoint::AuditVerify,
237            audit_world: Some(world),
238        }
239    }
240
241    pub(crate) fn endpoint(&self) -> ProcEndpoint {
242        self.endpoint
243    }
244
245    pub(crate) fn audit_world(&self) -> Option<&ValidatedWorldPath> {
246        self.audit_world.as_ref()
247    }
248}
249
250impl fmt::Display for InvalidProcPath {
251    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252        f.write_str("invalid proc path")
253    }
254}
255
256impl std::error::Error for InvalidProcPath {}
257
258impl EngineOps<'_> {
259    pub(crate) fn list_worlds(
260        &self,
261        path: &ValidatedProcPath,
262        tier: auth::Tier,
263    ) -> Result<Vec<ValidatedWorldPath>, EngineError> {
264        let permit = self.authorize_introspection(path, tier)?;
265        ensure_proc_endpoint(&permit, ProcEndpoint::Worlds)?;
266        let mut names = world::list(&self.core().data)
267            .map_err(|err| storage_error_to_engine("proc worlds", err, "list_worlds", None))?;
268        names.extend(self.core().mem.list());
269        names.sort();
270        names.dedup();
271        names
272            .into_iter()
273            .map(validated_world_from_storage)
274            .collect()
275    }
276
277    pub(crate) fn du(
278        &self,
279        path: &ValidatedProcPath,
280        tier: auth::Tier,
281    ) -> Result<Vec<WorldUsage>, EngineError> {
282        let permit = self.authorize_introspection(path, tier)?;
283        ensure_proc_endpoint(&permit, ProcEndpoint::Du)?;
284        let mut sizes = world::sizes(&self.core().data)
285            .map_err(|err| storage_error_to_engine("proc du", err, "du", None))?;
286        sizes.extend(self.core().mem.sizes());
287        sizes.sort_by(|a, b| a.0.cmp(&b.0));
288        sizes.dedup_by(|a, b| a.0 == b.0);
289        sizes
290            .into_iter()
291            .map(|(world, bytes)| {
292                Ok(WorldUsage {
293                    world: validated_world_from_storage(world)?,
294                    bytes,
295                })
296            })
297            .collect()
298    }
299
300    pub(crate) fn df(
301        &self,
302        path: &ValidatedProcPath,
303        tier: auth::Tier,
304    ) -> Result<DfSnapshot, EngineError> {
305        let permit = self.authorize_introspection(path, tier)?;
306        ensure_proc_endpoint(&permit, ProcEndpoint::Df)?;
307        let memory_used = self.core().mem.total_bytes();
308        let memory_worlds = self.core().mem.list().len();
309        let storage_used = self.core().storage_body_bytes.load(Ordering::Relaxed);
310        let durable_worlds = self
311            .core()
312            .durable_world_count
313            .load(Ordering::Relaxed)
314            .saturating_sub(usize::from(
315                self.core().delete_ledger_created.load(Ordering::Relaxed),
316            ));
317        Ok(DfSnapshot {
318            storage_used,
319            storage_quota: self.core().max_storage_bytes,
320            memory_used,
321            memory_quota: self.core().max_memory_bytes,
322            worlds: durable_worlds + memory_worlds,
323        })
324    }
325
326    pub(crate) fn pool(
327        &self,
328        path: &ValidatedProcPath,
329        tier: auth::Tier,
330    ) -> Result<PoolSnapshot, EngineError> {
331        let permit = self.authorize_introspection(path, tier)?;
332        ensure_proc_endpoint(&permit, ProcEndpoint::Pool)?;
333        Ok(PoolSnapshot {
334            read_cache_entries: self.core().read_cache.snapshot_entries(),
335            read_cache_tombstones: self.core().read_cache.snapshot_tombstones(),
336            read_cache_hits: self
337                .core()
338                .read_cache
339                .metrics
340                .read_cache_hits
341                .load(Ordering::Relaxed),
342            read_cache_misses: self
343                .core()
344                .read_cache
345                .metrics
346                .read_cache_misses
347                .load(Ordering::Relaxed),
348            read_cache_capped: self
349                .core()
350                .read_cache
351                .metrics
352                .read_cache_capped
353                .load(Ordering::Relaxed),
354            read_cache_evictions: self
355                .core()
356                .read_cache
357                .metrics
358                .read_cache_evictions
359                .load(Ordering::Relaxed),
360            read_cache_open_fails: self
361                .core()
362                .read_cache
363                .metrics
364                .read_cache_open_fails
365                .load(Ordering::Relaxed),
366            read_cache_max_entries: self.core().read_cache.max_entries,
367            ledger_writer_inits: self.core().ledger.inits.load(Ordering::Relaxed),
368        })
369    }
370
371    pub(crate) fn verify_audit(
372        &self,
373        path: &ValidatedProcPath,
374        tier: auth::Tier,
375    ) -> Result<AuditVerify, EngineError> {
376        let permit = self.authorize_introspection(path, tier)?;
377        ensure_proc_endpoint(&permit, ProcEndpoint::AuditVerify)?;
378        let world = permit
379            .path
380            .audit_world()
381            .ok_or(EngineError::InternalInvariant("audit verify missing world"))?;
382        if store::is_memory_world(world.as_str()) {
383            if !self.core().mem.contains(world.as_str()) {
384                return Err(EngineError::NotFound);
385            }
386            return Ok(AuditVerify::NotApplicable);
387        }
388        match self.core().cached_verify_chain(world.as_str()) {
389            Ok(Some(crate::audit::VerifyReport::Valid(report))) => {
390                Ok(AuditVerify::Valid(report.into()))
391            }
392            Ok(Some(crate::audit::VerifyReport::Broken(report))) => {
393                Ok(AuditVerify::Broken(report.into()))
394            }
395            Ok(None) => Err(EngineError::NotFound),
396            Err(err) => Err(storage_error_to_engine(
397                "audit verify",
398                err,
399                "verify_audit",
400                Some(world.as_str()),
401            )),
402        }
403    }
404
405    fn authorize_introspection(
406        &self,
407        path: &ValidatedProcPath,
408        tier: auth::Tier,
409    ) -> Result<IntrospectionPermit, EngineError> {
410        if !crate::can_read(self.core(), tier) {
411            return Err(EngineError::Auth(AuthGate::Read));
412        }
413        Ok(IntrospectionPermit { path: path.clone() })
414    }
415}
416
417impl Engine {
418    /// Lists every canonical world (durable + in-memory) in sorted order.
419    ///
420    /// # Errors
421    /// - [`EngineError::Auth`] if `tier` is below `Read`.
422    /// - [`EngineError::TransientStorage`] / [`EngineError::Storage`] /
423    ///   [`EngineError::InsufficientStorage`] for storage failures.
424    pub fn list_worlds(&self, tier: AccessTier) -> Result<Vec<ValidatedWorldPath>, EngineError> {
425        EngineOps::new(self.core()).list_worlds(&ValidatedProcPath::worlds(), tier.into())
426    }
427
428    /// Returns per-world body byte size, `du`-style.
429    ///
430    /// # Errors
431    /// See [`Engine::list_worlds`] for the storage-failure variants. Same
432    /// `Read`-tier requirement.
433    pub fn du(&self, tier: AccessTier) -> Result<Vec<WorldUsage>, EngineError> {
434        EngineOps::new(self.core()).du(&ValidatedProcPath::du(), tier.into())
435    }
436
437    /// Returns aggregate storage + memory usage, `df`-style.
438    ///
439    /// # Errors
440    /// - [`EngineError::Auth`] if `tier` is below `Read`.
441    pub fn df(&self, tier: AccessTier) -> Result<DfSnapshot, EngineError> {
442        EngineOps::new(self.core()).df(&ValidatedProcPath::df(), tier.into())
443    }
444
445    /// Returns the read-cache + ledger-writer counter snapshot.
446    ///
447    /// # Errors
448    /// - [`EngineError::Auth`] if `tier` is below `Read`.
449    pub fn pool(&self, tier: AccessTier) -> Result<PoolSnapshot, EngineError> {
450        EngineOps::new(self.core()).pool(&ValidatedProcPath::pool(), tier.into())
451    }
452
453    /// Verifies a single world's HMAC audit chain.
454    ///
455    /// Returns [`AuditVerify::Valid`] / [`AuditVerify::Broken`] /
456    /// [`AuditVerify::NotApplicable`] (the latter for in-memory worlds with
457    /// no chain).
458    ///
459    /// # Errors
460    /// - [`EngineError::Auth`] if `tier` is below `Read`.
461    /// - [`EngineError::NotFound`] if `world` does not exist.
462    /// - [`EngineError::TransientStorage`] / [`EngineError::Storage`] /
463    ///   [`EngineError::InsufficientStorage`] for storage failures during
464    ///   verification.
465    pub fn verify_audit(
466        &self,
467        world: &ValidatedWorldPath,
468        tier: AccessTier,
469    ) -> Result<AuditVerify, EngineError> {
470        let path = ValidatedProcPath::audit_verify(world.clone());
471        EngineOps::new(self.core()).verify_audit(&path, tier.into())
472    }
473}
474
475fn storage_error_to_engine(
476    scope: &'static str,
477    err: rusqlite::Error,
478    operation: &'static str,
479    world: Option<&str>,
480) -> EngineError {
481    if crate::is_insufficient_storage_error(&err) {
482        log_storage_error(scope, &err, operation, world);
483        EngineError::InsufficientStorage {
484            sqlite_code: engine::sqlite_code(&err),
485        }
486    } else if crate::is_transient_storage_error(&err) {
487        log_storage_error(scope, &err, operation, world);
488        EngineError::TransientStorage {
489            sqlite_code: engine::sqlite_code(&err),
490        }
491    } else {
492        log_storage_error(scope, &err, operation, world);
493        EngineError::Storage {
494            sqlite_code: engine::sqlite_code(&err),
495        }
496    }
497}
498
499fn validated_world_from_storage(world: String) -> Result<ValidatedWorldPath, EngineError> {
500    ValidatedWorldPath::from_canonical(world)
501        .map_err(|_| EngineError::InternalInvariant("storage returned invalid world path"))
502}
503
504fn ensure_proc_endpoint(
505    permit: &IntrospectionPermit,
506    expected: ProcEndpoint,
507) -> Result<(), EngineError> {
508    if permit.path.endpoint() == expected {
509        Ok(())
510    } else {
511        Err(EngineError::InternalInvariant(
512            "proc permit endpoint mismatch",
513        ))
514    }
515}
516
517impl From<crate::audit::VerifyOk> for AuditValid {
518    fn from(value: crate::audit::VerifyOk) -> Self {
519        Self {
520            events: value.events,
521            genesis: value.genesis,
522            latest: value.latest,
523        }
524    }
525}
526
527impl From<crate::audit::VerifyBreak> for AuditBroken {
528    fn from(value: crate::audit::VerifyBreak) -> Self {
529        Self {
530            break_at: value.break_at,
531            expected: value.expected,
532            actual: value.actual,
533        }
534    }
535}
536
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};
    use std::time::{SystemTime, UNIX_EPOCH};

    use bytes::Bytes;

    use super::{AuditVerify, ProcEndpoint, ValidatedProcPath};
    use crate::{
        engine::{Engine, EngineError},
        engine_ops::EngineOps,
        engine_types::{
            AccessTier, Preconditions, Representation, SecretBytes, ValidatedWorldPath,
        },
        AuthGate,
    };

    /// Per-test scratch directory that is deleted on drop, including when an
    /// assertion panics partway through a test.
    ///
    /// Bind it before the [`Engine`] that uses it. Locals drop in reverse
    /// declaration order, so the engine releases its files before the
    /// directory is removed.
    struct TempRoot(PathBuf);

    impl TempRoot {
        /// Picks a fresh directory under the system temp dir, unique per
        /// process id and nanosecond nonce, and clears any leftover there.
        fn new(name: &str) -> Self {
            let nonce = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_nanos();
            let root = std::env::temp_dir().join(format!(
                "elastik-engine-introspection-{name}-{}-{nonce}",
                std::process::id()
            ));
            let _ = std::fs::remove_dir_all(&root);
            Self(root)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempRoot {
        fn drop(&mut self) {
            // Best-effort cleanup: a leftover temp dir must not fail a test.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[test]
    fn validated_proc_path_accepts_declared_endpoints_only() {
        for (raw, expected) in [
            ("proc/version", ProcEndpoint::Version),
            ("/proc/worlds", ProcEndpoint::Worlds),
            ("proc/du", ProcEndpoint::Du),
            ("proc/df/", ProcEndpoint::Df),
            ("proc/pool", ProcEndpoint::Pool),
        ] {
            let path = ValidatedProcPath::new(raw).unwrap();
            assert_eq!(path.endpoint(), expected, "{raw}");
            assert!(path.audit_world().is_none(), "{raw}");
        }

        for raw in ["proc/audit/home/foo/verify", "/proc/audit/home/foo/verify/"] {
            let path = ValidatedProcPath::new(raw).unwrap();
            assert_eq!(path.endpoint(), ProcEndpoint::AuditVerify, "{raw}");
            assert_eq!(path.audit_world().unwrap().as_str(), "home/foo", "{raw}");
        }

        for raw in [
            "",
            "proc",
            "proc/nope",
            "proc/du/extra",
            "proc/audit/verify",
            "proc/audit/foo/verify",
            "proc/audit//verify",
            "proc/audit/proc/version/verify",
        ] {
            assert!(ValidatedProcPath::new(raw).is_err(), "{raw}");
        }
    }

    #[tokio::test]
    async fn engine_introspection_returns_typed_snapshots() {
        let root = TempRoot::new("snapshots");
        let engine = Engine::builder()
            .data_root(root.path().to_path_buf())
            .key(SecretBytes::try_from_slice(b"key").unwrap())
            .read_token(b"reader".to_vec())
            .max_storage_bytes(Some(64))
            .build()
            .unwrap();
        let disk = ValidatedWorldPath::new("home/inspect").unwrap();
        let memory = ValidatedWorldPath::new("tmp/inspect").unwrap();

        assert!(matches!(
            engine.list_worlds(AccessTier::Anon),
            Err(EngineError::Auth(AuthGate::Read))
        ));
        assert!(matches!(
            engine.df(AccessTier::Anon),
            Err(EngineError::Auth(AuthGate::Read))
        ));
        assert!(matches!(
            engine.verify_audit(&disk, AccessTier::Anon),
            Err(EngineError::Auth(AuthGate::Read))
        ));

        for world in [&disk, &memory] {
            engine
                .replace(
                    world,
                    Representation::new(Bytes::from_static(b"hello"), "text/plain", Vec::new()),
                    Preconditions::none(),
                    AccessTier::Write,
                )
                .await
                .unwrap();
        }

        let worlds = engine.list_worlds(AccessTier::Read).unwrap();
        assert!(worlds.iter().any(|world| world == &disk));
        assert!(worlds.iter().any(|world| world == &memory));

        let usage = engine.du(AccessTier::Read).unwrap();
        assert!(usage
            .iter()
            .any(|row| row.world == disk && row.bytes == b"hello".len()));
        assert!(usage
            .iter()
            .any(|row| row.world == memory && row.bytes == b"hello".len()));

        let df = engine.df(AccessTier::Read).unwrap();
        assert_eq!(df.storage_used, b"hello".len());
        assert_eq!(df.storage_quota, Some(64));
        assert_eq!(df.memory_used, b"hello".len());
        assert_eq!(df.worlds, 2);

        let pool = engine.pool(AccessTier::Read).unwrap();
        assert_eq!(
            pool.read_cache_max_entries,
            crate::read_cache::DEFAULT_READ_CACHE_MAX_ENTRIES
        );
        assert!(pool.read_cache_entries <= pool.read_cache_max_entries);

        assert!(matches!(
            engine.verify_audit(&disk, AccessTier::Read).unwrap(),
            AuditVerify::Valid(_)
        ));
        assert!(matches!(
            engine.verify_audit(&memory, AccessTier::Read).unwrap(),
            AuditVerify::NotApplicable
        ));
        let missing = ValidatedWorldPath::new("tmp/missing").unwrap();
        assert!(matches!(
            engine.verify_audit(&missing, AccessTier::Read),
            Err(EngineError::NotFound)
        ));

        assert!(matches!(
            EngineOps::new(engine.core())
                .list_worlds(&ValidatedProcPath::du(), AccessTier::Read.into()),
            Err(EngineError::InternalInvariant(
                "proc permit endpoint mismatch"
            ))
        ));

        // Explicit for readability; reverse declaration order would drop the
        // engine before the temp root anyway.
        drop(engine);
        drop(root);
    }
}