1#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
7
8use std::{fmt, sync::atomic::Ordering};
9
10use crate::{
11 auth,
12 engine::{self, Engine, EngineError},
13 engine_ops::{log_storage_error, EngineOps},
14 engine_types::{AccessTier, ValidatedWorldPath},
15 store, world, AuthGate,
16};
17
18#[derive(Clone, Debug, PartialEq, Eq)]
24pub struct ValidatedProcPath {
25 endpoint: ProcEndpoint,
26 audit_world: Option<ValidatedWorldPath>,
27}
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
31pub struct InvalidProcPath;
32
33#[derive(Clone, Copy, Debug, PartialEq, Eq)]
35#[non_exhaustive]
36pub enum ProcEndpoint {
37 Version,
39 Worlds,
41 Du,
43 Df,
45 Pool,
47 AuditVerify,
49}
50
51pub struct WorldUsage {
53 pub world: ValidatedWorldPath,
55 pub bytes: usize,
57}
58
59pub struct DfSnapshot {
64 pub storage_used: usize,
66 pub storage_quota: Option<usize>,
68 pub memory_used: usize,
70 pub memory_quota: usize,
72 pub worlds: usize,
74}
75
76pub struct PoolSnapshot {
81 pub read_cache_entries: usize,
83 pub read_cache_tombstones: usize,
85 pub read_cache_hits: usize,
87 pub read_cache_misses: usize,
89 pub read_cache_capped: usize,
91 pub read_cache_open_fails: usize,
93 pub read_cache_max_entries: usize,
95 pub ledger_writer_inits: usize,
97}
98
99pub struct AuditValid {
101 pub events: usize,
103 pub genesis: String,
105 pub latest: String,
107}
108
109pub struct AuditBroken {
111 pub break_at: usize,
113 pub expected: String,
115 pub actual: String,
117}
118
119#[non_exhaustive]
121pub enum AuditVerify {
122 Valid(AuditValid),
124 Broken(AuditBroken),
126 NotApplicable,
128}
129
130struct IntrospectionPermit {
131 path: ValidatedProcPath,
132}
133
134impl ValidatedProcPath {
135 pub fn new(raw: impl AsRef<str>) -> Result<Self, InvalidProcPath> {
146 let raw = raw.as_ref().trim_matches('/');
147 match raw {
148 "proc/version" => Ok(Self::version()),
149 "proc/worlds" => Ok(Self::worlds()),
150 "proc/du" => Ok(Self::du()),
151 "proc/df" => Ok(Self::df()),
152 "proc/pool" => Ok(Self::pool()),
153 _ => {
154 let Some(world) = raw
155 .strip_prefix("proc/audit/")
156 .and_then(|value| value.strip_suffix("/verify"))
157 else {
158 return Err(InvalidProcPath);
159 };
160 if world.trim_matches('/').is_empty() {
161 return Err(InvalidProcPath);
162 }
163 let world = world.trim_end_matches('/').to_owned();
164 let world =
165 ValidatedWorldPath::from_canonical(world).map_err(|_| InvalidProcPath)?;
166 Ok(Self::audit_verify(world))
167 }
168 }
169 }
170
171 pub fn version() -> Self {
173 Self {
174 endpoint: ProcEndpoint::Version,
175 audit_world: None,
176 }
177 }
178
179 pub fn worlds() -> Self {
181 Self {
182 endpoint: ProcEndpoint::Worlds,
183 audit_world: None,
184 }
185 }
186
187 pub fn du() -> Self {
189 Self {
190 endpoint: ProcEndpoint::Du,
191 audit_world: None,
192 }
193 }
194
195 pub fn df() -> Self {
197 Self {
198 endpoint: ProcEndpoint::Df,
199 audit_world: None,
200 }
201 }
202
203 pub fn pool() -> Self {
205 Self {
206 endpoint: ProcEndpoint::Pool,
207 audit_world: None,
208 }
209 }
210
211 pub fn audit_verify(world: ValidatedWorldPath) -> Self {
214 Self {
215 endpoint: ProcEndpoint::AuditVerify,
216 audit_world: Some(world),
217 }
218 }
219
220 pub(crate) fn endpoint(&self) -> ProcEndpoint {
221 self.endpoint
222 }
223
224 pub(crate) fn audit_world(&self) -> Option<&ValidatedWorldPath> {
225 self.audit_world.as_ref()
226 }
227}
228
229impl fmt::Display for InvalidProcPath {
230 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
231 f.write_str("invalid proc path")
232 }
233}
234
235impl std::error::Error for InvalidProcPath {}
236
237impl EngineOps<'_> {
238 pub(crate) fn list_worlds(
239 &self,
240 path: &ValidatedProcPath,
241 tier: auth::Tier,
242 ) -> Result<Vec<ValidatedWorldPath>, EngineError> {
243 let permit = self.authorize_introspection(path, tier)?;
244 ensure_proc_endpoint(&permit, ProcEndpoint::Worlds)?;
245 let mut names = world::list(&self.core().data)
246 .map_err(|err| storage_error_to_engine("proc worlds", err, "list_worlds", None))?;
247 names.extend(self.core().mem.list());
248 names.sort();
249 names.dedup();
250 names
251 .into_iter()
252 .map(validated_world_from_storage)
253 .collect()
254 }
255
256 pub(crate) fn du(
257 &self,
258 path: &ValidatedProcPath,
259 tier: auth::Tier,
260 ) -> Result<Vec<WorldUsage>, EngineError> {
261 let permit = self.authorize_introspection(path, tier)?;
262 ensure_proc_endpoint(&permit, ProcEndpoint::Du)?;
263 let mut sizes = world::sizes(&self.core().data)
264 .map_err(|err| storage_error_to_engine("proc du", err, "du", None))?;
265 sizes.extend(self.core().mem.sizes());
266 sizes.sort_by(|a, b| a.0.cmp(&b.0));
267 sizes.dedup_by(|a, b| a.0 == b.0);
268 sizes
269 .into_iter()
270 .map(|(world, bytes)| {
271 Ok(WorldUsage {
272 world: validated_world_from_storage(world)?,
273 bytes,
274 })
275 })
276 .collect()
277 }
278
279 pub(crate) fn df(
280 &self,
281 path: &ValidatedProcPath,
282 tier: auth::Tier,
283 ) -> Result<DfSnapshot, EngineError> {
284 let permit = self.authorize_introspection(path, tier)?;
285 ensure_proc_endpoint(&permit, ProcEndpoint::Df)?;
286 let memory_used = self.core().mem.total_bytes();
287 let memory_worlds = self.core().mem.list().len();
288 let storage_used = self.core().storage_body_bytes.load(Ordering::Relaxed);
289 let durable_worlds = self
290 .core()
291 .durable_world_count
292 .load(Ordering::Relaxed)
293 .saturating_sub(usize::from(
294 self.core().delete_ledger_created.load(Ordering::Relaxed),
295 ));
296 Ok(DfSnapshot {
297 storage_used,
298 storage_quota: self.core().max_storage_bytes,
299 memory_used,
300 memory_quota: self.core().max_memory_bytes,
301 worlds: durable_worlds + memory_worlds,
302 })
303 }
304
305 pub(crate) fn pool(
306 &self,
307 path: &ValidatedProcPath,
308 tier: auth::Tier,
309 ) -> Result<PoolSnapshot, EngineError> {
310 let permit = self.authorize_introspection(path, tier)?;
311 ensure_proc_endpoint(&permit, ProcEndpoint::Pool)?;
312 Ok(PoolSnapshot {
313 read_cache_entries: self.core().read_cache.snapshot_entries(),
314 read_cache_tombstones: self.core().read_cache.snapshot_tombstones(),
315 read_cache_hits: self
316 .core()
317 .read_cache
318 .metrics
319 .read_cache_hits
320 .load(Ordering::Relaxed),
321 read_cache_misses: self
322 .core()
323 .read_cache
324 .metrics
325 .read_cache_misses
326 .load(Ordering::Relaxed),
327 read_cache_capped: self
328 .core()
329 .read_cache
330 .metrics
331 .read_cache_capped
332 .load(Ordering::Relaxed),
333 read_cache_open_fails: self
334 .core()
335 .read_cache
336 .metrics
337 .read_cache_open_fails
338 .load(Ordering::Relaxed),
339 read_cache_max_entries: self.core().read_cache.max_entries,
340 ledger_writer_inits: self.core().ledger.inits.load(Ordering::Relaxed),
341 })
342 }
343
344 pub(crate) fn verify_audit(
345 &self,
346 path: &ValidatedProcPath,
347 tier: auth::Tier,
348 ) -> Result<AuditVerify, EngineError> {
349 let permit = self.authorize_introspection(path, tier)?;
350 ensure_proc_endpoint(&permit, ProcEndpoint::AuditVerify)?;
351 let world = permit
352 .path
353 .audit_world()
354 .ok_or(EngineError::InternalInvariant("audit verify missing world"))?;
355 if store::is_memory_world(world.as_str()) {
356 if !self.core().mem.contains(world.as_str()) {
357 return Err(EngineError::NotFound);
358 }
359 return Ok(AuditVerify::NotApplicable);
360 }
361 match self.core().cached_verify_chain(world.as_str()) {
362 Ok(Some(crate::audit::VerifyReport::Valid(report))) => {
363 Ok(AuditVerify::Valid(report.into()))
364 }
365 Ok(Some(crate::audit::VerifyReport::Broken(report))) => {
366 Ok(AuditVerify::Broken(report.into()))
367 }
368 Ok(None) => Err(EngineError::NotFound),
369 Err(err) => Err(storage_error_to_engine(
370 "audit verify",
371 err,
372 "verify_audit",
373 Some(world.as_str()),
374 )),
375 }
376 }
377
378 fn authorize_introspection(
379 &self,
380 path: &ValidatedProcPath,
381 tier: auth::Tier,
382 ) -> Result<IntrospectionPermit, EngineError> {
383 if !crate::can_read(self.core(), tier) {
384 return Err(EngineError::Auth(AuthGate::Read));
385 }
386 Ok(IntrospectionPermit { path: path.clone() })
387 }
388}
389
390impl Engine {
391 pub fn list_worlds(&self, tier: AccessTier) -> Result<Vec<ValidatedWorldPath>, EngineError> {
398 EngineOps::new(self.core()).list_worlds(&ValidatedProcPath::worlds(), tier.into())
399 }
400
401 pub fn du(&self, tier: AccessTier) -> Result<Vec<WorldUsage>, EngineError> {
407 EngineOps::new(self.core()).du(&ValidatedProcPath::du(), tier.into())
408 }
409
410 pub fn df(&self, tier: AccessTier) -> Result<DfSnapshot, EngineError> {
415 EngineOps::new(self.core()).df(&ValidatedProcPath::df(), tier.into())
416 }
417
418 pub fn pool(&self, tier: AccessTier) -> Result<PoolSnapshot, EngineError> {
423 EngineOps::new(self.core()).pool(&ValidatedProcPath::pool(), tier.into())
424 }
425
426 pub fn verify_audit(
439 &self,
440 world: &ValidatedWorldPath,
441 tier: AccessTier,
442 ) -> Result<AuditVerify, EngineError> {
443 let path = ValidatedProcPath::audit_verify(world.clone());
444 EngineOps::new(self.core()).verify_audit(&path, tier.into())
445 }
446}
447
448fn storage_error_to_engine(
449 scope: &'static str,
450 err: rusqlite::Error,
451 operation: &'static str,
452 world: Option<&str>,
453) -> EngineError {
454 if crate::is_insufficient_storage_error(&err) {
455 log_storage_error(scope, &err, operation, world);
456 EngineError::InsufficientStorage {
457 sqlite_code: engine::sqlite_code(&err),
458 }
459 } else if crate::is_transient_storage_error(&err) {
460 log_storage_error(scope, &err, operation, world);
461 EngineError::TransientStorage {
462 sqlite_code: engine::sqlite_code(&err),
463 }
464 } else {
465 log_storage_error(scope, &err, operation, world);
466 EngineError::Storage {
467 sqlite_code: engine::sqlite_code(&err),
468 }
469 }
470}
471
472fn validated_world_from_storage(world: String) -> Result<ValidatedWorldPath, EngineError> {
473 ValidatedWorldPath::from_canonical(world)
474 .map_err(|_| EngineError::InternalInvariant("storage returned invalid world path"))
475}
476
477fn ensure_proc_endpoint(
478 permit: &IntrospectionPermit,
479 expected: ProcEndpoint,
480) -> Result<(), EngineError> {
481 if permit.path.endpoint() == expected {
482 Ok(())
483 } else {
484 Err(EngineError::InternalInvariant(
485 "proc permit endpoint mismatch",
486 ))
487 }
488}
489
490impl From<crate::audit::VerifyOk> for AuditValid {
491 fn from(value: crate::audit::VerifyOk) -> Self {
492 Self {
493 events: value.events,
494 genesis: value.genesis,
495 latest: value.latest,
496 }
497 }
498}
499
500impl From<crate::audit::VerifyBreak> for AuditBroken {
501 fn from(value: crate::audit::VerifyBreak) -> Self {
502 Self {
503 break_at: value.break_at,
504 expected: value.expected,
505 actual: value.actual,
506 }
507 }
508}
509
510#[cfg(test)]
511mod tests {
512 use std::path::PathBuf;
513 use std::time::{SystemTime, UNIX_EPOCH};
514
515 use bytes::Bytes;
516
517 use super::{AuditVerify, ProcEndpoint, ValidatedProcPath};
518 use crate::{
519 engine::{Engine, EngineError},
520 engine_ops::EngineOps,
521 engine_types::{
522 AccessTier, Preconditions, Representation, SecretBytes, ValidatedWorldPath,
523 },
524 AuthGate,
525 };
526
527 fn temp_root(name: &str) -> PathBuf {
528 let nonce = SystemTime::now()
529 .duration_since(UNIX_EPOCH)
530 .unwrap()
531 .as_nanos();
532 let root = std::env::temp_dir().join(format!(
533 "elastik-engine-introspection-{name}-{}-{nonce}",
534 std::process::id()
535 ));
536 let _ = std::fs::remove_dir_all(&root);
537 root
538 }
539
540 #[test]
541 fn validated_proc_path_accepts_declared_endpoints_only() {
542 assert_eq!(
543 ValidatedProcPath::new("/proc/worlds").unwrap().endpoint(),
544 ProcEndpoint::Worlds
545 );
546 assert_eq!(
547 ValidatedProcPath::new("proc/du").unwrap().endpoint(),
548 ProcEndpoint::Du
549 );
550 assert_eq!(
551 ValidatedProcPath::new("proc/audit/home/foo/verify")
552 .unwrap()
553 .audit_world()
554 .unwrap()
555 .as_str(),
556 "home/foo"
557 );
558
559 assert!(ValidatedProcPath::new("proc").is_err());
560 assert!(ValidatedProcPath::new("proc/nope").is_err());
561 assert!(ValidatedProcPath::new("proc/audit/foo/verify").is_err());
562 assert!(ValidatedProcPath::new("proc/audit//verify").is_err());
563 assert!(ValidatedProcPath::new("proc/audit/proc/version/verify").is_err());
564 }
565
566 #[tokio::test]
567 async fn engine_introspection_returns_typed_snapshots() {
568 let root = temp_root("snapshots");
569 let engine = Engine::builder()
570 .data_root(root.clone())
571 .key(SecretBytes::try_from_slice(b"key").unwrap())
572 .read_token(b"reader".to_vec())
573 .max_storage_bytes(Some(64))
574 .build()
575 .unwrap();
576 let disk = ValidatedWorldPath::new("home/inspect").unwrap();
577 let memory = ValidatedWorldPath::new("tmp/inspect").unwrap();
578
579 assert!(matches!(
580 engine.list_worlds(AccessTier::Anon),
581 Err(EngineError::Auth(AuthGate::Read))
582 ));
583
584 for world in [&disk, &memory] {
585 engine
586 .replace(
587 world,
588 Representation {
589 body: Bytes::from_static(b"hello"),
590 content_type: "text/plain".to_owned(),
591 headers: Vec::new(),
592 },
593 Preconditions::none(),
594 AccessTier::Write,
595 )
596 .await
597 .unwrap();
598 }
599
600 let worlds = engine.list_worlds(AccessTier::Read).unwrap();
601 assert!(worlds.iter().any(|world| world == &disk));
602 assert!(worlds.iter().any(|world| world == &memory));
603
604 let usage = engine.du(AccessTier::Read).unwrap();
605 assert!(usage
606 .iter()
607 .any(|row| row.world == disk && row.bytes == b"hello".len()));
608 assert!(usage
609 .iter()
610 .any(|row| row.world == memory && row.bytes == b"hello".len()));
611
612 let df = engine.df(AccessTier::Read).unwrap();
613 assert_eq!(df.storage_used, b"hello".len());
614 assert_eq!(df.storage_quota, Some(64));
615 assert_eq!(df.memory_used, b"hello".len());
616 assert_eq!(df.worlds, 2);
617
618 let pool = engine.pool(AccessTier::Read).unwrap();
619 assert_eq!(
620 pool.read_cache_max_entries,
621 crate::read_cache::DEFAULT_READ_CACHE_MAX_ENTRIES
622 );
623 assert!(pool.read_cache_entries <= pool.read_cache_max_entries);
624
625 assert!(matches!(
626 engine.verify_audit(&disk, AccessTier::Read).unwrap(),
627 AuditVerify::Valid(_)
628 ));
629 assert!(matches!(
630 engine.verify_audit(&memory, AccessTier::Read).unwrap(),
631 AuditVerify::NotApplicable
632 ));
633
634 assert!(matches!(
635 EngineOps::new(engine.core())
636 .list_worlds(&ValidatedProcPath::du(), AccessTier::Read.into()),
637 Err(EngineError::InternalInvariant(
638 "proc permit endpoint mismatch"
639 ))
640 ));
641
642 drop(engine);
643 let _ = std::fs::remove_dir_all(root);
644 }
645}