Skip to main content

liquid_cache/cache/observer/
mod.rs

1mod internal_tracing;
2mod stats;
3mod tracer;
4
5pub use internal_tracing::EventTrace;
6pub use stats::{CacheStats, RuntimeStats, RuntimeStatsSnapshot};
7pub use tracer::CacheTracer;
8
9use std::path::Path;
10
11use internal_tracing::EventTracer;
12pub(crate) use internal_tracing::InternalEvent;
13use stats::{RuntimeStats as RuntimeStatsInner, RuntimeStatsSnapshot as RuntimeStatsSnapshotInner};
14
15#[derive(Debug)]
16/// Cache-side observer for runtime stats, debug traces, and optional cache tracing.
17pub struct Observer {
18    runtime: RuntimeStatsInner,
19    event_tracer: EventTracer,
20    cache_tracer: CacheTracer,
21}
22
23impl Default for Observer {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl Observer {
30    /// Create a new observer with all counters and traces reset.
31    pub fn new() -> Self {
32        Self {
33            runtime: RuntimeStatsInner::default(),
34            event_tracer: EventTracer::new(),
35            cache_tracer: CacheTracer::new(),
36        }
37    }
38
39    /// Snapshot runtime counters and reset them to zero.
40    pub fn runtime_snapshot(&self) -> RuntimeStatsSnapshotInner {
41        self.runtime.consume_snapshot()
42    }
43
44    /// Consume and clear the in-memory debug event trace.
45    pub fn consume_event_trace(&self) -> EventTrace {
46        self.event_tracer.drain()
47    }
48
49    /// Enable recording cache trace events (for offline analysis).
50    pub fn enable_cache_trace(&self) {
51        self.cache_tracer.enable();
52    }
53
54    /// Disable recording cache trace events.
55    pub fn disable_cache_trace(&self) {
56        self.cache_tracer.disable();
57    }
58
59    /// Flush recorded cache trace events to a Parquet file.
60    pub fn flush_cache_trace(&self, to_file: impl AsRef<Path>) {
61        self.cache_tracer.flush(to_file);
62    }
63
64    /// Access the underlying cache tracer.
65    pub fn cache_tracer(&self) -> &CacheTracer {
66        &self.cache_tracer
67    }
68
69    #[inline]
70    pub(crate) fn on_get(&self, selection: bool) {
71        self.runtime.incr_get();
72        if selection {
73            self.runtime.incr_get_with_selection();
74        }
75    }
76
77    #[inline]
78    pub(crate) fn on_try_read_liquid(&self) {
79        self.runtime.incr_try_read_liquid();
80    }
81
82    #[inline]
83    pub(crate) fn on_eval_predicate(&self) {
84        self.runtime.incr_eval_predicate();
85    }
86
87    #[inline]
88    pub(crate) fn on_get_squeezed_success(&self) {
89        self.runtime.incr_get_squeezed_success();
90    }
91
92    #[inline]
93    pub(crate) fn on_get_squeezed_needs_io(&self) {
94        self.runtime.incr_get_squeezed_needs_io();
95    }
96
97    #[inline]
98    pub(crate) fn on_hit_date32_expression(&self) {
99        self.runtime.incr_hit_date32_expression();
100    }
101
102    pub(crate) fn record_internal(&self, event: InternalEvent) {
103        match event {
104            InternalEvent::IoWrite { .. } => self.runtime.incr_write_io_count(),
105            InternalEvent::IoReadArrow { .. } | InternalEvent::IoReadLiquid { .. } => {
106                self.runtime.incr_read_io_count()
107            }
108            InternalEvent::IoReadSqueezedBacking { .. } => {
109                self.runtime.incr_read_io_count();
110                self.runtime.incr_get_squeezed_needs_io();
111            }
112            InternalEvent::DecompressSqueezed {
113                decompressed,
114                total,
115                ..
116            } => {
117                self.runtime
118                    .track_decompress_squeezed_count(decompressed, total);
119            }
120            _ => {}
121        }
122
123        if cfg!(debug_assertions) {
124            self.event_tracer.record(event);
125        }
126    }
127
128    pub(crate) fn runtime_stats(&self) -> &RuntimeStats {
129        &self.runtime
130    }
131}