1#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
7
8use std::{fmt, sync::atomic::Ordering};
9
10use crate::{
11 auth,
12 engine::{self, Engine, EngineError},
13 engine_ops::{log_storage_error, EngineOps},
14 engine_types::{AccessTier, ValidatedWorldPath},
15 store, world, AuthGate,
16};
17
18#[derive(Clone, Debug, PartialEq, Eq)]
24pub struct ValidatedProcPath {
25 endpoint: ProcEndpoint,
26 audit_world: Option<ValidatedWorldPath>,
27}
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
31pub struct InvalidProcPath;
32
33#[derive(Clone, Copy, Debug, PartialEq, Eq)]
35#[non_exhaustive]
36pub enum ProcEndpoint {
37 Version,
39 Worlds,
41 Du,
43 Df,
45 Pool,
47 AuditVerify,
49}
50
51#[non_exhaustive]
53pub struct WorldUsage {
54 pub world: ValidatedWorldPath,
56 pub bytes: usize,
58}
59
60#[non_exhaustive]
65pub struct DfSnapshot {
66 pub storage_used: usize,
68 pub storage_quota: Option<usize>,
70 pub memory_used: usize,
72 pub memory_quota: usize,
74 pub worlds: usize,
76}
77
78#[non_exhaustive]
83pub struct PoolSnapshot {
84 pub read_cache_entries: usize,
86 pub read_cache_tombstones: usize,
88 pub read_cache_hits: usize,
94 pub read_cache_misses: usize,
100 pub read_cache_capped: usize,
105 pub read_cache_evictions: usize,
108 pub read_cache_open_fails: usize,
110 pub read_cache_max_entries: usize,
112 pub ledger_writer_inits: usize,
114}
115
116#[non_exhaustive]
118pub struct AuditValid {
119 pub events: usize,
121 pub genesis: String,
123 pub latest: String,
125}
126
127#[non_exhaustive]
129pub struct AuditBroken {
130 pub break_at: usize,
132 pub expected: String,
134 pub actual: String,
136}
137
138#[non_exhaustive]
140pub enum AuditVerify {
141 Valid(AuditValid),
143 Broken(AuditBroken),
145 NotApplicable,
147}
148
149struct IntrospectionPermit {
150 path: ValidatedProcPath,
151}
152
153impl ValidatedProcPath {
154 pub fn new(raw: impl AsRef<str>) -> Result<Self, InvalidProcPath> {
167 let raw = raw.as_ref().trim_matches('/');
168 match raw {
169 "proc/version" => Ok(Self::version()),
170 "proc/worlds" => Ok(Self::worlds()),
171 "proc/du" => Ok(Self::du()),
172 "proc/df" => Ok(Self::df()),
173 "proc/pool" => Ok(Self::pool()),
174 _ => {
175 let Some(world) = raw
176 .strip_prefix("proc/audit/")
177 .and_then(|value| value.strip_suffix("/verify"))
178 else {
179 return Err(InvalidProcPath);
180 };
181 if world.trim_matches('/').is_empty() {
182 return Err(InvalidProcPath);
183 }
184 let world = world.trim_end_matches('/').to_owned();
185 let world =
186 ValidatedWorldPath::from_canonical(world).map_err(|_| InvalidProcPath)?;
187 Ok(Self::audit_verify(world))
188 }
189 }
190 }
191
192 pub fn version() -> Self {
194 Self {
195 endpoint: ProcEndpoint::Version,
196 audit_world: None,
197 }
198 }
199
200 pub fn worlds() -> Self {
202 Self {
203 endpoint: ProcEndpoint::Worlds,
204 audit_world: None,
205 }
206 }
207
208 pub fn du() -> Self {
210 Self {
211 endpoint: ProcEndpoint::Du,
212 audit_world: None,
213 }
214 }
215
216 pub fn df() -> Self {
218 Self {
219 endpoint: ProcEndpoint::Df,
220 audit_world: None,
221 }
222 }
223
224 pub fn pool() -> Self {
226 Self {
227 endpoint: ProcEndpoint::Pool,
228 audit_world: None,
229 }
230 }
231
232 pub fn audit_verify(world: ValidatedWorldPath) -> Self {
235 Self {
236 endpoint: ProcEndpoint::AuditVerify,
237 audit_world: Some(world),
238 }
239 }
240
241 pub(crate) fn endpoint(&self) -> ProcEndpoint {
242 self.endpoint
243 }
244
245 pub(crate) fn audit_world(&self) -> Option<&ValidatedWorldPath> {
246 self.audit_world.as_ref()
247 }
248}
249
250impl fmt::Display for InvalidProcPath {
251 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252 f.write_str("invalid proc path")
253 }
254}
255
256impl std::error::Error for InvalidProcPath {}
257
258impl EngineOps<'_> {
259 pub(crate) fn list_worlds(
260 &self,
261 path: &ValidatedProcPath,
262 tier: auth::Tier,
263 ) -> Result<Vec<ValidatedWorldPath>, EngineError> {
264 let permit = self.authorize_introspection(path, tier)?;
265 ensure_proc_endpoint(&permit, ProcEndpoint::Worlds)?;
266 let mut names = world::list(&self.core().data)
267 .map_err(|err| storage_error_to_engine("proc worlds", err, "list_worlds", None))?;
268 names.extend(self.core().mem.list());
269 names.sort();
270 names.dedup();
271 names
272 .into_iter()
273 .map(validated_world_from_storage)
274 .collect()
275 }
276
277 pub(crate) fn list_worlds_with_prefix(
278 &self,
279 prefix: &str,
280 tier: auth::Tier,
281 ) -> Result<Vec<ValidatedWorldPath>, EngineError> {
282 if !crate::can_read(self.core(), tier) {
283 return Err(EngineError::Auth(AuthGate::Read));
284 }
285 let mut names = world::list_with_prefix(&self.core().data, prefix).map_err(|err| {
286 storage_error_to_engine("worlds prefix", err, "list_worlds_with_prefix", None)
287 })?;
288 names.extend(self.core().mem.list_with_prefix(prefix));
289 names.sort();
290 names.dedup();
291 names
292 .into_iter()
293 .map(validated_world_from_storage)
294 .collect()
295 }
296
297 pub(crate) fn list_worlds_with_prefix_bounded(
298 &self,
299 prefix: &str,
300 tier: auth::Tier,
301 max: usize,
302 ) -> Result<Option<Vec<ValidatedWorldPath>>, EngineError> {
303 if !crate::can_read(self.core(), tier) {
304 return Err(EngineError::Auth(AuthGate::Read));
305 }
306 let limit = max.saturating_add(1);
307 let Some(mut names) = world::list_with_prefix_bounded(&self.core().data, prefix, limit)
308 .map_err(|err| {
309 storage_error_to_engine("worlds prefix", err, "list_worlds_with_prefix", None)
310 })?
311 else {
312 return Ok(None);
313 };
314 let Some(mem_names) = self.core().mem.list_with_prefix_bounded(prefix, limit) else {
315 return Ok(None);
316 };
317 names.extend(mem_names);
318 names.sort();
319 names.dedup();
320 if names.len() > max {
321 return Ok(None);
322 }
323 names
324 .into_iter()
325 .map(validated_world_from_storage)
326 .collect::<Result<Vec<_>, _>>()
327 .map(Some)
328 }
329
330 pub(crate) fn du(
331 &self,
332 path: &ValidatedProcPath,
333 tier: auth::Tier,
334 ) -> Result<Vec<WorldUsage>, EngineError> {
335 let permit = self.authorize_introspection(path, tier)?;
336 ensure_proc_endpoint(&permit, ProcEndpoint::Du)?;
337 let mut sizes = world::sizes(&self.core().data)
338 .map_err(|err| storage_error_to_engine("proc du", err, "du", None))?;
339 sizes.extend(self.core().mem.sizes());
340 sizes.sort_by(|a, b| a.0.cmp(&b.0));
341 sizes.dedup_by(|a, b| a.0 == b.0);
342 sizes
343 .into_iter()
344 .map(|(world, bytes)| {
345 Ok(WorldUsage {
346 world: validated_world_from_storage(world)?,
347 bytes,
348 })
349 })
350 .collect()
351 }
352
353 pub(crate) fn df(
354 &self,
355 path: &ValidatedProcPath,
356 tier: auth::Tier,
357 ) -> Result<DfSnapshot, EngineError> {
358 let permit = self.authorize_introspection(path, tier)?;
359 ensure_proc_endpoint(&permit, ProcEndpoint::Df)?;
360 let memory_used = self.core().mem.total_bytes();
361 let memory_worlds = self.core().mem.list().len();
362 let storage_used = self.core().storage_body_bytes.load(Ordering::Relaxed);
363 let durable_worlds = self
364 .core()
365 .durable_world_count
366 .load(Ordering::Relaxed)
367 .saturating_sub(usize::from(
368 self.core().delete_ledger_created.load(Ordering::Relaxed),
369 ));
370 Ok(DfSnapshot {
371 storage_used,
372 storage_quota: self.core().max_storage_bytes,
373 memory_used,
374 memory_quota: self.core().max_memory_bytes,
375 worlds: durable_worlds + memory_worlds,
376 })
377 }
378
379 pub(crate) fn pool(
380 &self,
381 path: &ValidatedProcPath,
382 tier: auth::Tier,
383 ) -> Result<PoolSnapshot, EngineError> {
384 let permit = self.authorize_introspection(path, tier)?;
385 ensure_proc_endpoint(&permit, ProcEndpoint::Pool)?;
386 Ok(PoolSnapshot {
387 read_cache_entries: self.core().read_cache.snapshot_entries(),
388 read_cache_tombstones: self.core().read_cache.snapshot_tombstones(),
389 read_cache_hits: self
390 .core()
391 .read_cache
392 .metrics
393 .read_cache_hits
394 .load(Ordering::Relaxed),
395 read_cache_misses: self
396 .core()
397 .read_cache
398 .metrics
399 .read_cache_misses
400 .load(Ordering::Relaxed),
401 read_cache_capped: self
402 .core()
403 .read_cache
404 .metrics
405 .read_cache_capped
406 .load(Ordering::Relaxed),
407 read_cache_evictions: self
408 .core()
409 .read_cache
410 .metrics
411 .read_cache_evictions
412 .load(Ordering::Relaxed),
413 read_cache_open_fails: self
414 .core()
415 .read_cache
416 .metrics
417 .read_cache_open_fails
418 .load(Ordering::Relaxed),
419 read_cache_max_entries: self.core().read_cache.max_entries,
420 ledger_writer_inits: self.core().ledger.inits.load(Ordering::Relaxed),
421 })
422 }
423
424 pub(crate) fn verify_audit(
425 &self,
426 path: &ValidatedProcPath,
427 tier: auth::Tier,
428 ) -> Result<AuditVerify, EngineError> {
429 let permit = self.authorize_introspection(path, tier)?;
430 ensure_proc_endpoint(&permit, ProcEndpoint::AuditVerify)?;
431 let world = permit
432 .path
433 .audit_world()
434 .ok_or(EngineError::InternalInvariant("audit verify missing world"))?;
435 if store::is_memory_world(world.as_str()) {
436 if !self.core().mem.contains(world.as_str()) {
437 return Err(EngineError::NotFound);
438 }
439 return Ok(AuditVerify::NotApplicable);
440 }
441 match self.core().cached_verify_chain(world.as_str()) {
442 Ok(Some(crate::audit::VerifyReport::Valid(report))) => {
443 Ok(AuditVerify::Valid(report.into()))
444 }
445 Ok(Some(crate::audit::VerifyReport::Broken(report))) => {
446 Ok(AuditVerify::Broken(report.into()))
447 }
448 Ok(None) => Err(EngineError::NotFound),
449 Err(err) => Err(storage_error_to_engine(
450 "audit verify",
451 err,
452 "verify_audit",
453 Some(world.as_str()),
454 )),
455 }
456 }
457
458 fn authorize_introspection(
459 &self,
460 path: &ValidatedProcPath,
461 tier: auth::Tier,
462 ) -> Result<IntrospectionPermit, EngineError> {
463 if !crate::can_read(self.core(), tier) {
464 return Err(EngineError::Auth(AuthGate::Read));
465 }
466 Ok(IntrospectionPermit { path: path.clone() })
467 }
468}
469
470impl Engine {
471 pub fn list_worlds(&self, tier: AccessTier) -> Result<Vec<ValidatedWorldPath>, EngineError> {
478 EngineOps::new(self.core()).list_worlds(&ValidatedProcPath::worlds(), tier.into())
479 }
480
481 pub fn list_worlds_with_prefix(
491 &self,
492 prefix: &str,
493 tier: AccessTier,
494 ) -> Result<Vec<ValidatedWorldPath>, EngineError> {
495 EngineOps::new(self.core()).list_worlds_with_prefix(prefix, tier.into())
496 }
497
498 pub fn list_worlds_with_prefix_bounded(
504 &self,
505 prefix: &str,
506 tier: AccessTier,
507 max: usize,
508 ) -> Result<Option<Vec<ValidatedWorldPath>>, EngineError> {
509 EngineOps::new(self.core()).list_worlds_with_prefix_bounded(prefix, tier.into(), max)
510 }
511
512 pub fn du(&self, tier: AccessTier) -> Result<Vec<WorldUsage>, EngineError> {
518 EngineOps::new(self.core()).du(&ValidatedProcPath::du(), tier.into())
519 }
520
521 pub fn df(&self, tier: AccessTier) -> Result<DfSnapshot, EngineError> {
526 EngineOps::new(self.core()).df(&ValidatedProcPath::df(), tier.into())
527 }
528
529 pub fn pool(&self, tier: AccessTier) -> Result<PoolSnapshot, EngineError> {
534 EngineOps::new(self.core()).pool(&ValidatedProcPath::pool(), tier.into())
535 }
536
537 pub fn verify_audit(
550 &self,
551 world: &ValidatedWorldPath,
552 tier: AccessTier,
553 ) -> Result<AuditVerify, EngineError> {
554 let path = ValidatedProcPath::audit_verify(world.clone());
555 EngineOps::new(self.core()).verify_audit(&path, tier.into())
556 }
557}
558
559fn storage_error_to_engine(
560 scope: &'static str,
561 err: rusqlite::Error,
562 operation: &'static str,
563 world: Option<&str>,
564) -> EngineError {
565 if crate::is_insufficient_storage_error(&err) {
566 log_storage_error(scope, &err, operation, world);
567 EngineError::InsufficientStorage {
568 sqlite_code: engine::sqlite_code(&err),
569 }
570 } else if crate::is_transient_storage_error(&err) {
571 log_storage_error(scope, &err, operation, world);
572 EngineError::TransientStorage {
573 sqlite_code: engine::sqlite_code(&err),
574 }
575 } else {
576 log_storage_error(scope, &err, operation, world);
577 EngineError::Storage {
578 sqlite_code: engine::sqlite_code(&err),
579 }
580 }
581}
582
583fn validated_world_from_storage(world: String) -> Result<ValidatedWorldPath, EngineError> {
584 ValidatedWorldPath::from_canonical(world)
585 .map_err(|_| EngineError::InternalInvariant("storage returned invalid world path"))
586}
587
588fn ensure_proc_endpoint(
589 permit: &IntrospectionPermit,
590 expected: ProcEndpoint,
591) -> Result<(), EngineError> {
592 if permit.path.endpoint() == expected {
593 Ok(())
594 } else {
595 Err(EngineError::InternalInvariant(
596 "proc permit endpoint mismatch",
597 ))
598 }
599}
600
601impl From<crate::audit::VerifyOk> for AuditValid {
602 fn from(value: crate::audit::VerifyOk) -> Self {
603 Self {
604 events: value.events,
605 genesis: value.genesis,
606 latest: value.latest,
607 }
608 }
609}
610
611impl From<crate::audit::VerifyBreak> for AuditBroken {
612 fn from(value: crate::audit::VerifyBreak) -> Self {
613 Self {
614 break_at: value.break_at,
615 expected: value.expected,
616 actual: value.actual,
617 }
618 }
619}
620
621#[cfg(test)]
622mod tests {
623 use std::path::PathBuf;
624 use std::time::{SystemTime, UNIX_EPOCH};
625
626 use bytes::Bytes;
627
628 use super::{AuditVerify, ProcEndpoint, ValidatedProcPath};
629 use crate::{
630 engine::{Engine, EngineError},
631 engine_ops::EngineOps,
632 engine_types::{
633 AccessTier, Preconditions, Representation, SecretBytes, ValidatedWorldPath,
634 },
635 AuthGate,
636 };
637
638 fn temp_root(name: &str) -> PathBuf {
639 let nonce = SystemTime::now()
640 .duration_since(UNIX_EPOCH)
641 .unwrap()
642 .as_nanos();
643 let root = std::env::temp_dir().join(format!(
644 "elastik-engine-introspection-{name}-{}-{nonce}",
645 std::process::id()
646 ));
647 let _ = std::fs::remove_dir_all(&root);
648 root
649 }
650
651 #[test]
652 fn validated_proc_path_accepts_declared_endpoints_only() {
653 assert_eq!(
654 ValidatedProcPath::new("/proc/worlds").unwrap().endpoint(),
655 ProcEndpoint::Worlds
656 );
657 assert_eq!(
658 ValidatedProcPath::new("proc/du").unwrap().endpoint(),
659 ProcEndpoint::Du
660 );
661 assert_eq!(
662 ValidatedProcPath::new("proc/audit/home/foo/verify")
663 .unwrap()
664 .audit_world()
665 .unwrap()
666 .as_str(),
667 "home/foo"
668 );
669
670 assert!(ValidatedProcPath::new("proc").is_err());
671 assert!(ValidatedProcPath::new("proc/nope").is_err());
672 assert!(ValidatedProcPath::new("proc/audit/foo/verify").is_err());
673 assert!(ValidatedProcPath::new("proc/audit//verify").is_err());
674 assert!(ValidatedProcPath::new("proc/audit/proc/version/verify").is_err());
675 }
676
677 #[tokio::test]
678 async fn engine_introspection_returns_typed_snapshots() {
679 let root = temp_root("snapshots");
680 let engine = Engine::builder()
681 .data_root(root.clone())
682 .key(SecretBytes::try_from_slice(b"key").unwrap())
683 .read_token(b"reader".to_vec())
684 .max_storage_bytes(Some(64))
685 .build()
686 .unwrap();
687 let disk = ValidatedWorldPath::new("home/inspect").unwrap();
688 let memory = ValidatedWorldPath::new("tmp/inspect").unwrap();
689
690 assert!(matches!(
691 engine.list_worlds(AccessTier::Anon),
692 Err(EngineError::Auth(AuthGate::Read))
693 ));
694
695 for world in [&disk, &memory] {
696 engine
697 .replace(
698 world,
699 Representation::new(Bytes::from_static(b"hello"), "text/plain", Vec::new()),
700 Preconditions::none(),
701 AccessTier::Write,
702 )
703 .await
704 .unwrap();
705 }
706
707 let worlds = engine.list_worlds(AccessTier::Read).unwrap();
708 assert!(worlds.iter().any(|world| world == &disk));
709 assert!(worlds.iter().any(|world| world == &memory));
710
711 let usage = engine.du(AccessTier::Read).unwrap();
712 assert!(usage
713 .iter()
714 .any(|row| row.world == disk && row.bytes == b"hello".len()));
715 assert!(usage
716 .iter()
717 .any(|row| row.world == memory && row.bytes == b"hello".len()));
718
719 let df = engine.df(AccessTier::Read).unwrap();
720 assert_eq!(df.storage_used, b"hello".len());
721 assert_eq!(df.storage_quota, Some(64));
722 assert_eq!(df.memory_used, b"hello".len());
723 assert_eq!(df.worlds, 2);
724
725 let pool = engine.pool(AccessTier::Read).unwrap();
726 assert_eq!(
727 pool.read_cache_max_entries,
728 crate::read_cache::DEFAULT_READ_CACHE_MAX_ENTRIES
729 );
730 assert!(pool.read_cache_entries <= pool.read_cache_max_entries);
731
732 assert!(matches!(
733 engine.verify_audit(&disk, AccessTier::Read).unwrap(),
734 AuditVerify::Valid(_)
735 ));
736 assert!(matches!(
737 engine.verify_audit(&memory, AccessTier::Read).unwrap(),
738 AuditVerify::NotApplicable
739 ));
740
741 assert!(matches!(
742 EngineOps::new(engine.core())
743 .list_worlds(&ValidatedProcPath::du(), AccessTier::Read.into()),
744 Err(EngineError::InternalInvariant(
745 "proc permit endpoint mismatch"
746 ))
747 ));
748
749 drop(engine);
750 let _ = std::fs::remove_dir_all(root);
751 }
752}