Skip to main content

elastik_core/
engine_introspection.rs

1//! Protocol-neutral Engine introspection surface.
2//!
3//! This module owns the typed snapshots behind `/proc/*`. The HTTP adapter
4//! still renders text/plain bodies; Engine returns structured data.
5
6#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
7
8use std::{fmt, sync::atomic::Ordering};
9
10use crate::{
11    auth,
12    engine::{self, Engine, EngineError},
13    engine_ops::{log_storage_error, EngineOps},
14    engine_types::{AccessTier, ValidatedWorldPath},
15    store, world, AuthGate,
16};
17
18/// Validated `/proc/*` introspection endpoint.
19///
20/// This proves the adapter selected one of the declared proc surfaces before
21/// asking Engine for metadata. Reserved or misspelled proc paths stay in the
22/// adapter as 404/405 rendering concerns.
23#[derive(Clone, Debug, PartialEq, Eq)]
24pub struct ValidatedProcPath {
25    endpoint: ProcEndpoint,
26    audit_world: Option<ValidatedWorldPath>,
27}
28
/// Error produced when a raw string does not name one of Engine's declared
/// proc endpoints (including an audit-verify path whose world is invalid).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InvalidProcPath;
32
/// Identity of a declared `/proc/*` surface, carried by [`ValidatedProcPath`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ProcEndpoint {
    /// `proc/version`: the Engine build version string.
    Version,
    /// `proc/worlds`: every world, durable and in-memory.
    Worlds,
    /// `proc/du`: body byte size per world.
    Du,
    /// `proc/df`: aggregate storage/memory usage and quotas.
    Df,
    /// `proc/pool`: read-cache and ledger-writer pool counters.
    Pool,
    /// `proc/audit/<world>/verify`: verify one world's HMAC audit chain.
    AuditVerify,
}
50
51/// One world-size row for engine introspection.
52pub struct WorldUsage {
53    /// Canonical world path.
54    pub world: ValidatedWorldPath,
55    /// Body byte size for this world.
56    pub bytes: usize,
57}
58
/// Aggregate storage/memory snapshot, `df`-style.
///
/// Returned by [`crate::Engine::df`]. "Unlimited" is encoded differently per
/// quota: `None` for [`storage_quota`](Self::storage_quota) and `0` for
/// [`memory_quota`](Self::memory_quota). Adapters should render both as
/// "unlimited" in operator-facing output.
///
/// Each field is sampled independently, so under concurrent writes the
/// values are not guaranteed to be mutually consistent.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DfSnapshot {
    /// Bytes used by durable bodies (SQLite-backed worlds).
    pub storage_used: usize,
    /// Configured durable quota, or `None` if unlimited.
    pub storage_quota: Option<usize>,
    /// Bytes used by in-memory bodies.
    pub memory_used: usize,
    /// Configured memory cap; `0` means unlimited.
    pub memory_quota: usize,
    /// Total live worlds (durable + in-memory).
    pub worlds: usize,
}
75
/// Read-cache + ledger-writer snapshot.
///
/// Returned by [`crate::Engine::pool`]. The fields fall into three groups:
/// - point-in-time gauges: `read_cache_entries`, `read_cache_tombstones`;
/// - cumulative counters since process start: `read_cache_hits`,
///   `read_cache_misses`, `read_cache_capped`, `read_cache_open_fails`,
///   `ledger_writer_inits`;
/// - configuration: `read_cache_max_entries`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PoolSnapshot {
    /// Gauge: active read-cache entries (open SQLite connections currently parked).
    pub read_cache_entries: usize,
    /// Gauge: tombstoned entries waiting on in-flight reads to drain.
    pub read_cache_tombstones: usize,
    /// Counter: cache hits since process start.
    pub read_cache_hits: usize,
    /// Counter: cache misses since process start.
    pub read_cache_misses: usize,
    /// Counter: times a read was admitted with a transient slot (cache was at cap).
    pub read_cache_capped: usize,
    /// Counter: read-cache slot open failures (SQLite open errors).
    pub read_cache_open_fails: usize,
    /// Configuration: maximum cache entries.
    pub read_cache_max_entries: usize,
    /// Counter: ledger writer (re)initializations since process start.
    pub ledger_writer_inits: usize,
}
98
/// Successful audit-chain verification details.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuditValid {
    /// Number of events on the chain.
    pub events: usize,
    /// HMAC of the genesis event.
    pub genesis: String,
    /// HMAC of the most recent event.
    pub latest: String,
}

/// Audit-chain break details.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuditBroken {
    /// Event id at which verification failed.
    pub break_at: usize,
    /// HMAC the chain expected at that position.
    pub expected: String,
    /// HMAC actually stored at that position.
    pub actual: String,
}

/// Result of [`crate::Engine::verify_audit`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum AuditVerify {
    /// Chain verified end-to-end.
    Valid(AuditValid),
    /// Chain failed verification; see [`AuditBroken`] for the break point.
    Broken(AuditBroken),
    /// World does not have an audit chain (e.g. an in-memory `tmp/` world).
    NotApplicable,
}
129
130struct IntrospectionPermit {
131    path: ValidatedProcPath,
132}
133
134impl ValidatedProcPath {
135    /// Parses a wire-shaped proc path into a sealed endpoint identity.
136    ///
137    /// Accepts `proc/version`, `proc/worlds`, `proc/du`, `proc/df`,
138    /// `proc/pool`, or `proc/audit/<world>/verify`. Leading and trailing
139    /// slashes are tolerated.
140    ///
141    /// # Errors
142    /// Returns [`InvalidProcPath`] if the input is not one of the declared
143    /// endpoints, or if the audit-verify world fails canonical-path
144    /// validation.
145    pub fn new(raw: impl AsRef<str>) -> Result<Self, InvalidProcPath> {
146        let raw = raw.as_ref().trim_matches('/');
147        match raw {
148            "proc/version" => Ok(Self::version()),
149            "proc/worlds" => Ok(Self::worlds()),
150            "proc/du" => Ok(Self::du()),
151            "proc/df" => Ok(Self::df()),
152            "proc/pool" => Ok(Self::pool()),
153            _ => {
154                let Some(world) = raw
155                    .strip_prefix("proc/audit/")
156                    .and_then(|value| value.strip_suffix("/verify"))
157                else {
158                    return Err(InvalidProcPath);
159                };
160                if world.trim_matches('/').is_empty() {
161                    return Err(InvalidProcPath);
162                }
163                let world = world.trim_end_matches('/').to_owned();
164                let world =
165                    ValidatedWorldPath::from_canonical(world).map_err(|_| InvalidProcPath)?;
166                Ok(Self::audit_verify(world))
167            }
168        }
169    }
170
171    /// Returns the proof token for the `version` endpoint.
172    pub fn version() -> Self {
173        Self {
174            endpoint: ProcEndpoint::Version,
175            audit_world: None,
176        }
177    }
178
179    /// Returns the proof token for the `worlds` endpoint.
180    pub fn worlds() -> Self {
181        Self {
182            endpoint: ProcEndpoint::Worlds,
183            audit_world: None,
184        }
185    }
186
187    /// Returns the proof token for the `du` endpoint.
188    pub fn du() -> Self {
189        Self {
190            endpoint: ProcEndpoint::Du,
191            audit_world: None,
192        }
193    }
194
195    /// Returns the proof token for the `df` endpoint.
196    pub fn df() -> Self {
197        Self {
198            endpoint: ProcEndpoint::Df,
199            audit_world: None,
200        }
201    }
202
203    /// Returns the proof token for the `pool` endpoint.
204    pub fn pool() -> Self {
205        Self {
206            endpoint: ProcEndpoint::Pool,
207            audit_world: None,
208        }
209    }
210
211    /// Returns the proof token for an `audit verify` request against a
212    /// specific world.
213    pub fn audit_verify(world: ValidatedWorldPath) -> Self {
214        Self {
215            endpoint: ProcEndpoint::AuditVerify,
216            audit_world: Some(world),
217        }
218    }
219
220    pub(crate) fn endpoint(&self) -> ProcEndpoint {
221        self.endpoint
222    }
223
224    pub(crate) fn audit_world(&self) -> Option<&ValidatedWorldPath> {
225        self.audit_world.as_ref()
226    }
227}
228
229impl fmt::Display for InvalidProcPath {
230    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
231        f.write_str("invalid proc path")
232    }
233}
234
235impl std::error::Error for InvalidProcPath {}
236
237impl EngineOps<'_> {
238    pub(crate) fn list_worlds(
239        &self,
240        path: &ValidatedProcPath,
241        tier: auth::Tier,
242    ) -> Result<Vec<ValidatedWorldPath>, EngineError> {
243        let permit = self.authorize_introspection(path, tier)?;
244        ensure_proc_endpoint(&permit, ProcEndpoint::Worlds)?;
245        let mut names = world::list(&self.core().data)
246            .map_err(|err| storage_error_to_engine("proc worlds", err, "list_worlds", None))?;
247        names.extend(self.core().mem.list());
248        names.sort();
249        names.dedup();
250        names
251            .into_iter()
252            .map(validated_world_from_storage)
253            .collect()
254    }
255
256    pub(crate) fn du(
257        &self,
258        path: &ValidatedProcPath,
259        tier: auth::Tier,
260    ) -> Result<Vec<WorldUsage>, EngineError> {
261        let permit = self.authorize_introspection(path, tier)?;
262        ensure_proc_endpoint(&permit, ProcEndpoint::Du)?;
263        let mut sizes = world::sizes(&self.core().data)
264            .map_err(|err| storage_error_to_engine("proc du", err, "du", None))?;
265        sizes.extend(self.core().mem.sizes());
266        sizes.sort_by(|a, b| a.0.cmp(&b.0));
267        sizes.dedup_by(|a, b| a.0 == b.0);
268        sizes
269            .into_iter()
270            .map(|(world, bytes)| {
271                Ok(WorldUsage {
272                    world: validated_world_from_storage(world)?,
273                    bytes,
274                })
275            })
276            .collect()
277    }
278
279    pub(crate) fn df(
280        &self,
281        path: &ValidatedProcPath,
282        tier: auth::Tier,
283    ) -> Result<DfSnapshot, EngineError> {
284        let permit = self.authorize_introspection(path, tier)?;
285        ensure_proc_endpoint(&permit, ProcEndpoint::Df)?;
286        let memory_used = self.core().mem.total_bytes();
287        let memory_worlds = self.core().mem.list().len();
288        let storage_used = self.core().storage_body_bytes.load(Ordering::Relaxed);
289        let durable_worlds = self
290            .core()
291            .durable_world_count
292            .load(Ordering::Relaxed)
293            .saturating_sub(usize::from(
294                self.core().delete_ledger_created.load(Ordering::Relaxed),
295            ));
296        Ok(DfSnapshot {
297            storage_used,
298            storage_quota: self.core().max_storage_bytes,
299            memory_used,
300            memory_quota: self.core().max_memory_bytes,
301            worlds: durable_worlds + memory_worlds,
302        })
303    }
304
305    pub(crate) fn pool(
306        &self,
307        path: &ValidatedProcPath,
308        tier: auth::Tier,
309    ) -> Result<PoolSnapshot, EngineError> {
310        let permit = self.authorize_introspection(path, tier)?;
311        ensure_proc_endpoint(&permit, ProcEndpoint::Pool)?;
312        Ok(PoolSnapshot {
313            read_cache_entries: self.core().read_cache.snapshot_entries(),
314            read_cache_tombstones: self.core().read_cache.snapshot_tombstones(),
315            read_cache_hits: self
316                .core()
317                .read_cache
318                .metrics
319                .read_cache_hits
320                .load(Ordering::Relaxed),
321            read_cache_misses: self
322                .core()
323                .read_cache
324                .metrics
325                .read_cache_misses
326                .load(Ordering::Relaxed),
327            read_cache_capped: self
328                .core()
329                .read_cache
330                .metrics
331                .read_cache_capped
332                .load(Ordering::Relaxed),
333            read_cache_open_fails: self
334                .core()
335                .read_cache
336                .metrics
337                .read_cache_open_fails
338                .load(Ordering::Relaxed),
339            read_cache_max_entries: self.core().read_cache.max_entries,
340            ledger_writer_inits: self.core().ledger.inits.load(Ordering::Relaxed),
341        })
342    }
343
344    pub(crate) fn verify_audit(
345        &self,
346        path: &ValidatedProcPath,
347        tier: auth::Tier,
348    ) -> Result<AuditVerify, EngineError> {
349        let permit = self.authorize_introspection(path, tier)?;
350        ensure_proc_endpoint(&permit, ProcEndpoint::AuditVerify)?;
351        let world = permit
352            .path
353            .audit_world()
354            .ok_or(EngineError::InternalInvariant("audit verify missing world"))?;
355        if store::is_memory_world(world.as_str()) {
356            if !self.core().mem.contains(world.as_str()) {
357                return Err(EngineError::NotFound);
358            }
359            return Ok(AuditVerify::NotApplicable);
360        }
361        match self.core().cached_verify_chain(world.as_str()) {
362            Ok(Some(crate::audit::VerifyReport::Valid(report))) => {
363                Ok(AuditVerify::Valid(report.into()))
364            }
365            Ok(Some(crate::audit::VerifyReport::Broken(report))) => {
366                Ok(AuditVerify::Broken(report.into()))
367            }
368            Ok(None) => Err(EngineError::NotFound),
369            Err(err) => Err(storage_error_to_engine(
370                "audit verify",
371                err,
372                "verify_audit",
373                Some(world.as_str()),
374            )),
375        }
376    }
377
378    fn authorize_introspection(
379        &self,
380        path: &ValidatedProcPath,
381        tier: auth::Tier,
382    ) -> Result<IntrospectionPermit, EngineError> {
383        if !crate::can_read(self.core(), tier) {
384            return Err(EngineError::Auth(AuthGate::Read));
385        }
386        Ok(IntrospectionPermit { path: path.clone() })
387    }
388}
389
390impl Engine {
391    /// Lists every canonical world (durable + in-memory) in sorted order.
392    ///
393    /// # Errors
394    /// - [`EngineError::Auth`] if `tier` is below `Read`.
395    /// - [`EngineError::TransientStorage`] / [`EngineError::Storage`] /
396    ///   [`EngineError::InsufficientStorage`] for storage failures.
397    pub fn list_worlds(&self, tier: AccessTier) -> Result<Vec<ValidatedWorldPath>, EngineError> {
398        EngineOps::new(self.core()).list_worlds(&ValidatedProcPath::worlds(), tier.into())
399    }
400
401    /// Returns per-world body byte size, `du`-style.
402    ///
403    /// # Errors
404    /// See [`Engine::list_worlds`] for the storage-failure variants. Same
405    /// `Read`-tier requirement.
406    pub fn du(&self, tier: AccessTier) -> Result<Vec<WorldUsage>, EngineError> {
407        EngineOps::new(self.core()).du(&ValidatedProcPath::du(), tier.into())
408    }
409
410    /// Returns aggregate storage + memory usage, `df`-style.
411    ///
412    /// # Errors
413    /// - [`EngineError::Auth`] if `tier` is below `Read`.
414    pub fn df(&self, tier: AccessTier) -> Result<DfSnapshot, EngineError> {
415        EngineOps::new(self.core()).df(&ValidatedProcPath::df(), tier.into())
416    }
417
418    /// Returns the read-cache + ledger-writer counter snapshot.
419    ///
420    /// # Errors
421    /// - [`EngineError::Auth`] if `tier` is below `Read`.
422    pub fn pool(&self, tier: AccessTier) -> Result<PoolSnapshot, EngineError> {
423        EngineOps::new(self.core()).pool(&ValidatedProcPath::pool(), tier.into())
424    }
425
426    /// Verifies a single world's HMAC audit chain.
427    ///
428    /// Returns [`AuditVerify::Valid`] / [`AuditVerify::Broken`] /
429    /// [`AuditVerify::NotApplicable`] (the latter for in-memory worlds with
430    /// no chain).
431    ///
432    /// # Errors
433    /// - [`EngineError::Auth`] if `tier` is below `Read`.
434    /// - [`EngineError::NotFound`] if `world` does not exist.
435    /// - [`EngineError::TransientStorage`] / [`EngineError::Storage`] /
436    ///   [`EngineError::InsufficientStorage`] for storage failures during
437    ///   verification.
438    pub fn verify_audit(
439        &self,
440        world: &ValidatedWorldPath,
441        tier: AccessTier,
442    ) -> Result<AuditVerify, EngineError> {
443        let path = ValidatedProcPath::audit_verify(world.clone());
444        EngineOps::new(self.core()).verify_audit(&path, tier.into())
445    }
446}
447
448fn storage_error_to_engine(
449    scope: &'static str,
450    err: rusqlite::Error,
451    operation: &'static str,
452    world: Option<&str>,
453) -> EngineError {
454    if crate::is_insufficient_storage_error(&err) {
455        log_storage_error(scope, &err, operation, world);
456        EngineError::InsufficientStorage {
457            sqlite_code: engine::sqlite_code(&err),
458        }
459    } else if crate::is_transient_storage_error(&err) {
460        log_storage_error(scope, &err, operation, world);
461        EngineError::TransientStorage {
462            sqlite_code: engine::sqlite_code(&err),
463        }
464    } else {
465        log_storage_error(scope, &err, operation, world);
466        EngineError::Storage {
467            sqlite_code: engine::sqlite_code(&err),
468        }
469    }
470}
471
472fn validated_world_from_storage(world: String) -> Result<ValidatedWorldPath, EngineError> {
473    ValidatedWorldPath::from_canonical(world)
474        .map_err(|_| EngineError::InternalInvariant("storage returned invalid world path"))
475}
476
477fn ensure_proc_endpoint(
478    permit: &IntrospectionPermit,
479    expected: ProcEndpoint,
480) -> Result<(), EngineError> {
481    if permit.path.endpoint() == expected {
482        Ok(())
483    } else {
484        Err(EngineError::InternalInvariant(
485            "proc permit endpoint mismatch",
486        ))
487    }
488}
489
490impl From<crate::audit::VerifyOk> for AuditValid {
491    fn from(value: crate::audit::VerifyOk) -> Self {
492        Self {
493            events: value.events,
494            genesis: value.genesis,
495            latest: value.latest,
496        }
497    }
498}
499
500impl From<crate::audit::VerifyBreak> for AuditBroken {
501    fn from(value: crate::audit::VerifyBreak) -> Self {
502        Self {
503            break_at: value.break_at,
504            expected: value.expected,
505            actual: value.actual,
506        }
507    }
508}
509
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};
    use std::time::{SystemTime, UNIX_EPOCH};

    use bytes::Bytes;

    use super::{AuditVerify, ProcEndpoint, ValidatedProcPath};
    use crate::{
        engine::{Engine, EngineError},
        engine_ops::EngineOps,
        engine_types::{
            AccessTier, Preconditions, Representation, SecretBytes, ValidatedWorldPath,
        },
        AuthGate,
    };

    /// Per-test scratch directory that is removed on drop, so a failing
    /// assertion (which unwinds past any trailing cleanup) does not leak it.
    struct TempRoot(PathBuf);

    impl TempRoot {
        /// Picks a unique path under the system temp dir and clears any stale
        /// directory at that path.
        fn new(name: &str) -> Self {
            let nonce = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_nanos();
            let root = std::env::temp_dir().join(format!(
                "elastik-engine-introspection-{name}-{}-{nonce}",
                std::process::id()
            ));
            let _ = std::fs::remove_dir_all(&root);
            Self(root)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempRoot {
        fn drop(&mut self) {
            // Best effort: the directory may never have been created.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[test]
    fn validated_proc_path_accepts_declared_endpoints_only() {
        let fixed = [
            ("proc/version", ProcEndpoint::Version),
            ("/proc/worlds", ProcEndpoint::Worlds),
            ("proc/du", ProcEndpoint::Du),
            ("proc/df/", ProcEndpoint::Df),
            ("//proc/pool//", ProcEndpoint::Pool),
        ];
        for (raw, expected) in fixed {
            let path = ValidatedProcPath::new(raw).unwrap();
            assert_eq!(path.endpoint(), expected, "{raw}");
            assert!(path.audit_world().is_none(), "{raw}");
        }

        // Trailing slashes after the world segment are tolerated.
        for raw in ["proc/audit/home/foo/verify", "proc/audit/home/foo//verify"] {
            let path = ValidatedProcPath::new(raw).unwrap();
            assert_eq!(path.endpoint(), ProcEndpoint::AuditVerify, "{raw}");
            assert_eq!(path.audit_world().unwrap().as_str(), "home/foo", "{raw}");
        }

        for raw in [
            "proc",
            "proc/nope",
            "proc/audit/foo/verify",
            "proc/audit//verify",
            "proc/audit/proc/version/verify",
        ] {
            assert!(ValidatedProcPath::new(raw).is_err(), "{raw}");
        }
    }

    #[tokio::test]
    async fn engine_introspection_returns_typed_snapshots() {
        // Declared before `engine` so it is dropped after it: the engine
        // releases its files before the directory is removed, even on panic.
        let root = TempRoot::new("snapshots");
        let engine = Engine::builder()
            .data_root(root.path().to_path_buf())
            .key(SecretBytes::try_from_slice(b"key").unwrap())
            .read_token(b"reader".to_vec())
            .max_storage_bytes(Some(64))
            .build()
            .unwrap();
        let disk = ValidatedWorldPath::new("home/inspect").unwrap();
        let memory = ValidatedWorldPath::new("tmp/inspect").unwrap();

        assert!(matches!(
            engine.list_worlds(AccessTier::Anon),
            Err(EngineError::Auth(AuthGate::Read))
        ));

        for world in [&disk, &memory] {
            engine
                .replace(
                    world,
                    Representation {
                        body: Bytes::from_static(b"hello"),
                        content_type: "text/plain".to_owned(),
                        headers: Vec::new(),
                    },
                    Preconditions::none(),
                    AccessTier::Write,
                )
                .await
                .unwrap();
        }

        let worlds = engine.list_worlds(AccessTier::Read).unwrap();
        assert!(worlds.iter().any(|world| world == &disk));
        assert!(worlds.iter().any(|world| world == &memory));

        let usage = engine.du(AccessTier::Read).unwrap();
        assert!(usage
            .iter()
            .any(|row| row.world == disk && row.bytes == b"hello".len()));
        assert!(usage
            .iter()
            .any(|row| row.world == memory && row.bytes == b"hello".len()));

        let df = engine.df(AccessTier::Read).unwrap();
        assert_eq!(df.storage_used, b"hello".len());
        assert_eq!(df.storage_quota, Some(64));
        assert_eq!(df.memory_used, b"hello".len());
        assert_eq!(df.worlds, 2);

        let pool = engine.pool(AccessTier::Read).unwrap();
        assert_eq!(
            pool.read_cache_max_entries,
            crate::read_cache::DEFAULT_READ_CACHE_MAX_ENTRIES
        );
        assert!(pool.read_cache_entries <= pool.read_cache_max_entries);

        assert!(matches!(
            engine.verify_audit(&disk, AccessTier::Read).unwrap(),
            AuditVerify::Valid(_)
        ));
        assert!(matches!(
            engine.verify_audit(&memory, AccessTier::Read).unwrap(),
            AuditVerify::NotApplicable
        ));

        assert!(matches!(
            EngineOps::new(engine.core())
                .list_worlds(&ValidatedProcPath::du(), AccessTier::Read.into()),
            Err(EngineError::InternalInvariant(
                "proc permit endpoint mismatch"
            ))
        ));
    }
}