1#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
7
8use std::{fmt, sync::atomic::Ordering};
9
10use crate::{
11 auth,
12 engine::{self, Engine, EngineError},
13 engine_ops::{log_storage_error, EngineOps},
14 engine_types::{AccessTier, ValidatedWorldPath},
15 store, world, AuthGate,
16};
17
/// A proc introspection path that has already been parsed and validated.
///
/// Pairs the recognised [`ProcEndpoint`] with the world it targets. Values are
/// built either by parsing a raw string with `ValidatedProcPath::new` or
/// through the per-endpoint constructors on this type.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ValidatedProcPath {
    /// Endpoint this path resolved to.
    endpoint: ProcEndpoint,
    /// World to verify. Only the `audit_verify` constructor sets this to
    /// `Some`; every other constructor leaves it `None`.
    audit_world: Option<ValidatedWorldPath>,
}
28
/// Error returned by `ValidatedProcPath::new` when the raw path does not name
/// a recognised proc endpoint or its world segment fails validation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InvalidProcPath;
32
/// The introspection endpoints a [`ValidatedProcPath`] can resolve to.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ProcEndpoint {
    /// Parsed from `proc/version`.
    Version,
    /// Parsed from `proc/worlds`.
    Worlds,
    /// Parsed from `proc/du`.
    Du,
    /// Parsed from `proc/df`.
    Df,
    /// Parsed from `proc/pool`.
    Pool,
    /// Parsed from `proc/audit/<world>/verify`.
    AuditVerify,
}
50
51#[non_exhaustive]
53pub struct WorldUsage {
54 pub world: ValidatedWorldPath,
56 pub bytes: usize,
58}
59
/// Point-in-time usage and quota figures returned by the `df` introspection
/// call.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so snapshots can be logged
/// and compared directly.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct DfSnapshot {
    /// Value of the engine's storage body byte counter when the snapshot was
    /// taken.
    pub storage_used: usize,
    /// Configured storage limit in bytes.
    /// NOTE(review): `None` presumably means "no limit" — confirm.
    pub storage_quota: Option<usize>,
    /// Total bytes reported by the in-memory store.
    pub memory_used: usize,
    /// Configured memory limit in bytes.
    pub memory_quota: usize,
    /// Number of durable worlds plus number of in-memory worlds.
    pub worlds: usize,
}
77
/// Point-in-time read-cache and ledger counters returned by the `pool`
/// introspection call.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so snapshots can be logged
/// and compared directly.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct PoolSnapshot {
    /// Entry count reported by the read cache.
    pub read_cache_entries: usize,
    /// Tombstone count reported by the read cache.
    pub read_cache_tombstones: usize,
    /// Value of the read cache's hit counter.
    pub read_cache_hits: usize,
    /// Value of the read cache's miss counter.
    pub read_cache_misses: usize,
    /// Value of the read cache's `read_cache_capped` counter.
    /// NOTE(review): presumably counts inserts refused at capacity — confirm.
    pub read_cache_capped: usize,
    /// Value of the read cache's eviction counter.
    pub read_cache_evictions: usize,
    /// Value of the read cache's open-failure counter.
    pub read_cache_open_fails: usize,
    /// Configured maximum number of read-cache entries.
    pub read_cache_max_entries: usize,
    /// Value of the ledger's `inits` counter.
    pub ledger_writer_inits: usize,
}
115
/// Payload of a successful audit verification, copied field-for-field from
/// `crate::audit::VerifyOk`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct AuditValid {
    /// NOTE(review): presumably the number of audit events checked — confirm.
    pub events: usize,
    /// NOTE(review): presumably identifies the first chain entry — confirm.
    pub genesis: String,
    /// NOTE(review): presumably identifies the newest chain entry — confirm.
    pub latest: String,
}

/// Payload of a failed audit verification, copied field-for-field from
/// `crate::audit::VerifyBreak`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct AuditBroken {
    /// NOTE(review): presumably the index of the first bad entry — confirm.
    pub break_at: usize,
    /// Value the verifier expected at the break.
    pub expected: String,
    /// Value the verifier actually found at the break.
    pub actual: String,
}

/// Outcome of verifying a world's audit chain.
///
/// All three audit result types derive `Debug`, `Clone`, `PartialEq` and `Eq`
/// so results can be logged and compared in assertions.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum AuditVerify {
    /// The chain verified successfully.
    Valid(AuditValid),
    /// The chain is broken; the payload describes where.
    Broken(AuditBroken),
    /// The world is an existing in-memory world, for which no chain
    /// verification is performed.
    NotApplicable,
}
148
/// Token produced by `EngineOps::authorize_introspection` once the caller's
/// tier has passed the read check.
///
/// Holds a clone of the authorised path so `ensure_proc_endpoint` can confirm
/// the permit matches the endpoint being served.
struct IntrospectionPermit {
    /// The proc path the permit was issued for.
    path: ValidatedProcPath,
}
152
153impl ValidatedProcPath {
154 pub fn new(raw: impl AsRef<str>) -> Result<Self, InvalidProcPath> {
167 let raw = raw.as_ref().trim_matches('/');
168 match raw {
169 "proc/version" => Ok(Self::version()),
170 "proc/worlds" => Ok(Self::worlds()),
171 "proc/du" => Ok(Self::du()),
172 "proc/df" => Ok(Self::df()),
173 "proc/pool" => Ok(Self::pool()),
174 _ => {
175 let Some(world) = raw
176 .strip_prefix("proc/audit/")
177 .and_then(|value| value.strip_suffix("/verify"))
178 else {
179 return Err(InvalidProcPath);
180 };
181 if world.trim_matches('/').is_empty() {
182 return Err(InvalidProcPath);
183 }
184 let world = world.trim_end_matches('/').to_owned();
185 let world =
186 ValidatedWorldPath::from_canonical(world).map_err(|_| InvalidProcPath)?;
187 Ok(Self::audit_verify(world))
188 }
189 }
190 }
191
192 pub fn version() -> Self {
194 Self {
195 endpoint: ProcEndpoint::Version,
196 audit_world: None,
197 }
198 }
199
200 pub fn worlds() -> Self {
202 Self {
203 endpoint: ProcEndpoint::Worlds,
204 audit_world: None,
205 }
206 }
207
208 pub fn du() -> Self {
210 Self {
211 endpoint: ProcEndpoint::Du,
212 audit_world: None,
213 }
214 }
215
216 pub fn df() -> Self {
218 Self {
219 endpoint: ProcEndpoint::Df,
220 audit_world: None,
221 }
222 }
223
224 pub fn pool() -> Self {
226 Self {
227 endpoint: ProcEndpoint::Pool,
228 audit_world: None,
229 }
230 }
231
232 pub fn audit_verify(world: ValidatedWorldPath) -> Self {
235 Self {
236 endpoint: ProcEndpoint::AuditVerify,
237 audit_world: Some(world),
238 }
239 }
240
241 pub(crate) fn endpoint(&self) -> ProcEndpoint {
242 self.endpoint
243 }
244
245 pub(crate) fn audit_world(&self) -> Option<&ValidatedWorldPath> {
246 self.audit_world.as_ref()
247 }
248}
249
250impl fmt::Display for InvalidProcPath {
251 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252 f.write_str("invalid proc path")
253 }
254}
255
// Marks `InvalidProcPath` as a standard error type. The empty impl relies on
// the `Debug` derive and the `Display` impl defined in this file.
impl std::error::Error for InvalidProcPath {}
257
258impl EngineOps<'_> {
259 pub(crate) fn list_worlds(
260 &self,
261 path: &ValidatedProcPath,
262 tier: auth::Tier,
263 ) -> Result<Vec<ValidatedWorldPath>, EngineError> {
264 let permit = self.authorize_introspection(path, tier)?;
265 ensure_proc_endpoint(&permit, ProcEndpoint::Worlds)?;
266 let mut names = world::list(&self.core().data)
267 .map_err(|err| storage_error_to_engine("proc worlds", err, "list_worlds", None))?;
268 names.extend(self.core().mem.list());
269 names.sort();
270 names.dedup();
271 names
272 .into_iter()
273 .map(validated_world_from_storage)
274 .collect()
275 }
276
277 pub(crate) fn du(
278 &self,
279 path: &ValidatedProcPath,
280 tier: auth::Tier,
281 ) -> Result<Vec<WorldUsage>, EngineError> {
282 let permit = self.authorize_introspection(path, tier)?;
283 ensure_proc_endpoint(&permit, ProcEndpoint::Du)?;
284 let mut sizes = world::sizes(&self.core().data)
285 .map_err(|err| storage_error_to_engine("proc du", err, "du", None))?;
286 sizes.extend(self.core().mem.sizes());
287 sizes.sort_by(|a, b| a.0.cmp(&b.0));
288 sizes.dedup_by(|a, b| a.0 == b.0);
289 sizes
290 .into_iter()
291 .map(|(world, bytes)| {
292 Ok(WorldUsage {
293 world: validated_world_from_storage(world)?,
294 bytes,
295 })
296 })
297 .collect()
298 }
299
300 pub(crate) fn df(
301 &self,
302 path: &ValidatedProcPath,
303 tier: auth::Tier,
304 ) -> Result<DfSnapshot, EngineError> {
305 let permit = self.authorize_introspection(path, tier)?;
306 ensure_proc_endpoint(&permit, ProcEndpoint::Df)?;
307 let memory_used = self.core().mem.total_bytes();
308 let memory_worlds = self.core().mem.list().len();
309 let storage_used = self.core().storage_body_bytes.load(Ordering::Relaxed);
310 let durable_worlds = self
311 .core()
312 .durable_world_count
313 .load(Ordering::Relaxed)
314 .saturating_sub(usize::from(
315 self.core().delete_ledger_created.load(Ordering::Relaxed),
316 ));
317 Ok(DfSnapshot {
318 storage_used,
319 storage_quota: self.core().max_storage_bytes,
320 memory_used,
321 memory_quota: self.core().max_memory_bytes,
322 worlds: durable_worlds + memory_worlds,
323 })
324 }
325
326 pub(crate) fn pool(
327 &self,
328 path: &ValidatedProcPath,
329 tier: auth::Tier,
330 ) -> Result<PoolSnapshot, EngineError> {
331 let permit = self.authorize_introspection(path, tier)?;
332 ensure_proc_endpoint(&permit, ProcEndpoint::Pool)?;
333 Ok(PoolSnapshot {
334 read_cache_entries: self.core().read_cache.snapshot_entries(),
335 read_cache_tombstones: self.core().read_cache.snapshot_tombstones(),
336 read_cache_hits: self
337 .core()
338 .read_cache
339 .metrics
340 .read_cache_hits
341 .load(Ordering::Relaxed),
342 read_cache_misses: self
343 .core()
344 .read_cache
345 .metrics
346 .read_cache_misses
347 .load(Ordering::Relaxed),
348 read_cache_capped: self
349 .core()
350 .read_cache
351 .metrics
352 .read_cache_capped
353 .load(Ordering::Relaxed),
354 read_cache_evictions: self
355 .core()
356 .read_cache
357 .metrics
358 .read_cache_evictions
359 .load(Ordering::Relaxed),
360 read_cache_open_fails: self
361 .core()
362 .read_cache
363 .metrics
364 .read_cache_open_fails
365 .load(Ordering::Relaxed),
366 read_cache_max_entries: self.core().read_cache.max_entries,
367 ledger_writer_inits: self.core().ledger.inits.load(Ordering::Relaxed),
368 })
369 }
370
371 pub(crate) fn verify_audit(
372 &self,
373 path: &ValidatedProcPath,
374 tier: auth::Tier,
375 ) -> Result<AuditVerify, EngineError> {
376 let permit = self.authorize_introspection(path, tier)?;
377 ensure_proc_endpoint(&permit, ProcEndpoint::AuditVerify)?;
378 let world = permit
379 .path
380 .audit_world()
381 .ok_or(EngineError::InternalInvariant("audit verify missing world"))?;
382 if store::is_memory_world(world.as_str()) {
383 if !self.core().mem.contains(world.as_str()) {
384 return Err(EngineError::NotFound);
385 }
386 return Ok(AuditVerify::NotApplicable);
387 }
388 match self.core().cached_verify_chain(world.as_str()) {
389 Ok(Some(crate::audit::VerifyReport::Valid(report))) => {
390 Ok(AuditVerify::Valid(report.into()))
391 }
392 Ok(Some(crate::audit::VerifyReport::Broken(report))) => {
393 Ok(AuditVerify::Broken(report.into()))
394 }
395 Ok(None) => Err(EngineError::NotFound),
396 Err(err) => Err(storage_error_to_engine(
397 "audit verify",
398 err,
399 "verify_audit",
400 Some(world.as_str()),
401 )),
402 }
403 }
404
405 fn authorize_introspection(
406 &self,
407 path: &ValidatedProcPath,
408 tier: auth::Tier,
409 ) -> Result<IntrospectionPermit, EngineError> {
410 if !crate::can_read(self.core(), tier) {
411 return Err(EngineError::Auth(AuthGate::Read));
412 }
413 Ok(IntrospectionPermit { path: path.clone() })
414 }
415}
416
417impl Engine {
418 pub fn list_worlds(&self, tier: AccessTier) -> Result<Vec<ValidatedWorldPath>, EngineError> {
425 EngineOps::new(self.core()).list_worlds(&ValidatedProcPath::worlds(), tier.into())
426 }
427
428 pub fn du(&self, tier: AccessTier) -> Result<Vec<WorldUsage>, EngineError> {
434 EngineOps::new(self.core()).du(&ValidatedProcPath::du(), tier.into())
435 }
436
437 pub fn df(&self, tier: AccessTier) -> Result<DfSnapshot, EngineError> {
442 EngineOps::new(self.core()).df(&ValidatedProcPath::df(), tier.into())
443 }
444
445 pub fn pool(&self, tier: AccessTier) -> Result<PoolSnapshot, EngineError> {
450 EngineOps::new(self.core()).pool(&ValidatedProcPath::pool(), tier.into())
451 }
452
453 pub fn verify_audit(
466 &self,
467 world: &ValidatedWorldPath,
468 tier: AccessTier,
469 ) -> Result<AuditVerify, EngineError> {
470 let path = ValidatedProcPath::audit_verify(world.clone());
471 EngineOps::new(self.core()).verify_audit(&path, tier.into())
472 }
473}
474
475fn storage_error_to_engine(
476 scope: &'static str,
477 err: rusqlite::Error,
478 operation: &'static str,
479 world: Option<&str>,
480) -> EngineError {
481 if crate::is_insufficient_storage_error(&err) {
482 log_storage_error(scope, &err, operation, world);
483 EngineError::InsufficientStorage {
484 sqlite_code: engine::sqlite_code(&err),
485 }
486 } else if crate::is_transient_storage_error(&err) {
487 log_storage_error(scope, &err, operation, world);
488 EngineError::TransientStorage {
489 sqlite_code: engine::sqlite_code(&err),
490 }
491 } else {
492 log_storage_error(scope, &err, operation, world);
493 EngineError::Storage {
494 sqlite_code: engine::sqlite_code(&err),
495 }
496 }
497}
498
499fn validated_world_from_storage(world: String) -> Result<ValidatedWorldPath, EngineError> {
500 ValidatedWorldPath::from_canonical(world)
501 .map_err(|_| EngineError::InternalInvariant("storage returned invalid world path"))
502}
503
504fn ensure_proc_endpoint(
505 permit: &IntrospectionPermit,
506 expected: ProcEndpoint,
507) -> Result<(), EngineError> {
508 if permit.path.endpoint() == expected {
509 Ok(())
510 } else {
511 Err(EngineError::InternalInvariant(
512 "proc permit endpoint mismatch",
513 ))
514 }
515}
516
517impl From<crate::audit::VerifyOk> for AuditValid {
518 fn from(value: crate::audit::VerifyOk) -> Self {
519 Self {
520 events: value.events,
521 genesis: value.genesis,
522 latest: value.latest,
523 }
524 }
525}
526
527impl From<crate::audit::VerifyBreak> for AuditBroken {
528 fn from(value: crate::audit::VerifyBreak) -> Self {
529 Self {
530 break_at: value.break_at,
531 expected: value.expected,
532 actual: value.actual,
533 }
534 }
535}
536
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    use bytes::Bytes;

    use super::{AuditVerify, ProcEndpoint, ValidatedProcPath};
    use crate::{
        engine::{Engine, EngineError},
        engine_ops::EngineOps,
        engine_types::{
            AccessTier, Preconditions, Representation, SecretBytes, ValidatedWorldPath,
        },
        AuthGate,
    };

    /// Builds a unique scratch directory path for one test and removes any
    /// leftover directory at that path. The directory itself is not created.
    fn temp_root(name: &str) -> PathBuf {
        let nonce = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let root = std::env::temp_dir().join(format!(
            "elastik-engine-introspection-{name}-{}-{nonce}",
            std::process::id()
        ));
        let _ = std::fs::remove_dir_all(&root);
        root
    }

    /// Every declared endpoint parses; everything else is rejected.
    #[test]
    fn validated_proc_path_accepts_declared_endpoints_only() {
        // Fixed endpoints, including tolerance for surrounding slashes.
        assert_eq!(
            ValidatedProcPath::new("proc/version").unwrap().endpoint(),
            ProcEndpoint::Version
        );
        assert_eq!(
            ValidatedProcPath::new("/proc/worlds").unwrap().endpoint(),
            ProcEndpoint::Worlds
        );
        assert_eq!(
            ValidatedProcPath::new("proc/du").unwrap().endpoint(),
            ProcEndpoint::Du
        );
        assert_eq!(
            ValidatedProcPath::new("proc/df").unwrap().endpoint(),
            ProcEndpoint::Df
        );
        assert_eq!(
            ValidatedProcPath::new("proc/pool/").unwrap().endpoint(),
            ProcEndpoint::Pool
        );
        // Fixed endpoints never carry a world.
        assert!(ValidatedProcPath::new("proc/worlds")
            .unwrap()
            .audit_world()
            .is_none());

        // The audit endpoint carries the embedded world.
        let audit = ValidatedProcPath::new("proc/audit/home/foo/verify").unwrap();
        assert_eq!(audit.endpoint(), ProcEndpoint::AuditVerify);
        assert_eq!(audit.audit_world().unwrap().as_str(), "home/foo");

        // Rejections: unknown paths, missing or invalid worlds.
        assert!(ValidatedProcPath::new("").is_err());
        assert!(ValidatedProcPath::new("proc").is_err());
        assert!(ValidatedProcPath::new("proc/nope").is_err());
        assert!(ValidatedProcPath::new("proc/audit/verify").is_err());
        assert!(ValidatedProcPath::new("proc/audit/foo/verify").is_err());
        assert!(ValidatedProcPath::new("proc/audit//verify").is_err());
        assert!(ValidatedProcPath::new("proc/audit/proc/version/verify").is_err());
    }

    /// End-to-end check of each introspection call against one durable and
    /// one in-memory world.
    #[tokio::test]
    async fn engine_introspection_returns_typed_snapshots() {
        let root = temp_root("snapshots");
        let engine = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::try_from_slice(b"key").unwrap())
            .read_token(b"reader".to_vec())
            .max_storage_bytes(Some(64))
            .build()
            .unwrap();
        let disk = ValidatedWorldPath::new("home/inspect").unwrap();
        let memory = ValidatedWorldPath::new("tmp/inspect").unwrap();

        // Anonymous callers are refused at the read gate.
        assert!(matches!(
            engine.list_worlds(AccessTier::Anon),
            Err(EngineError::Auth(AuthGate::Read))
        ));

        for world in [&disk, &memory] {
            engine
                .replace(
                    world,
                    Representation::new(Bytes::from_static(b"hello"), "text/plain", Vec::new()),
                    Preconditions::none(),
                    AccessTier::Write,
                )
                .await
                .unwrap();
        }

        let worlds = engine.list_worlds(AccessTier::Read).unwrap();
        assert!(worlds.iter().any(|world| world == &disk));
        assert!(worlds.iter().any(|world| world == &memory));

        let usage = engine.du(AccessTier::Read).unwrap();
        assert!(usage
            .iter()
            .any(|row| row.world == disk && row.bytes == b"hello".len()));
        assert!(usage
            .iter()
            .any(|row| row.world == memory && row.bytes == b"hello".len()));

        let df = engine.df(AccessTier::Read).unwrap();
        assert_eq!(df.storage_used, b"hello".len());
        assert_eq!(df.storage_quota, Some(64));
        assert_eq!(df.memory_used, b"hello".len());
        assert_eq!(df.worlds, 2);

        let pool = engine.pool(AccessTier::Read).unwrap();
        assert_eq!(
            pool.read_cache_max_entries,
            crate::read_cache::DEFAULT_READ_CACHE_MAX_ENTRIES
        );
        assert!(pool.read_cache_entries <= pool.read_cache_max_entries);

        assert!(matches!(
            engine.verify_audit(&disk, AccessTier::Read).unwrap(),
            AuditVerify::Valid(_)
        ));
        assert!(matches!(
            engine.verify_audit(&memory, AccessTier::Read).unwrap(),
            AuditVerify::NotApplicable
        ));

        // A path for the wrong endpoint is caught by the permit check.
        assert!(matches!(
            EngineOps::new(engine.core())
                .list_worlds(&ValidatedProcPath::du(), AccessTier::Read.into()),
            Err(EngineError::InternalInvariant(
                "proc permit endpoint mismatch"
            ))
        ));

        drop(engine);
        let _ = std::fs::remove_dir_all(root);
    }
}