liquid_cache/cache/observer/
internal_tracing.rs1use std::sync::Mutex;
2use std::{fmt, fmt::Write};
3
4use crate::cache::{CacheExpression, CachedBatchType, EntryID};
5
6#[derive(Clone, PartialEq, Eq, serde::Serialize)]
7pub(crate) enum InternalEvent {
8 InsertFailed {
9 entry: EntryID,
10 kind: CachedBatchType,
11 },
12 InsertSuccess {
13 entry: EntryID,
14 kind: CachedBatchType,
15 },
16 SqueezeBegin {
17 victims: Vec<EntryID>,
18 },
19 SqueezeVictim {
20 entry: EntryID,
21 },
22 IoWrite {
23 entry: EntryID,
24 kind: CachedBatchType,
25 bytes: usize,
26 },
27 DiskEvict {
28 entry: EntryID,
29 bytes: usize,
30 },
31 IoReadSqueezedBacking {
32 entry: EntryID,
33 bytes: usize,
34 },
35 IoReadArrow {
36 entry: EntryID,
37 bytes: usize,
38 },
39 IoReadLiquid {
40 entry: EntryID,
41 bytes: usize,
42 },
43 Read {
44 entry: EntryID,
45 selection: bool,
46 expr: Option<CacheExpression>,
47 cached: CachedBatchType,
48 },
49 Hydrate {
50 entry: EntryID,
51 cached: CachedBatchType,
52 new: CachedBatchType,
53 },
54 EvalPredicate {
55 entry: EntryID,
56 selection: bool,
57 cached: CachedBatchType,
58 },
59 ReadSqueezedData {
60 entry: EntryID,
61 expression: CacheExpression,
62 },
63 TryReadLiquid {
64 entry: EntryID,
65 },
66 DecompressSqueezed {
67 entry: EntryID,
68 decompressed: usize,
69 total: usize,
70 },
71}
72
73#[derive(Debug)]
74pub(crate) struct EventTracer {
75 events: Mutex<Vec<InternalEvent>>,
76}
77
78fn fmt_entry_list(buf: &mut String, victims: &[EntryID]) -> fmt::Result {
79 buf.push('[');
80 for (idx, v) in victims.iter().enumerate() {
81 if idx > 0 {
82 buf.push(',');
83 }
84 write!(buf, "{}", usize::from(*v))?;
85 }
86 buf.push(']');
87 Ok(())
88}
89
90impl fmt::Display for InternalEvent {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 match self {
93 InternalEvent::InsertFailed { entry, kind } => {
94 write!(
95 f,
96 "event=insert_failed entry={} kind={:?}",
97 usize::from(*entry),
98 kind
99 )
100 }
101 InternalEvent::InsertSuccess { entry, kind } => {
102 write!(
103 f,
104 "event=insert_success entry={} kind={:?}",
105 usize::from(*entry),
106 kind
107 )
108 }
109 InternalEvent::SqueezeBegin { victims } => {
110 let mut buf = String::new();
111 fmt_entry_list(&mut buf, victims)?;
112 write!(f, "event=squeeze_begin victims={}", buf)
113 }
114 InternalEvent::SqueezeVictim { entry } => {
115 write!(f, "event=squeeze_victim entry={}", usize::from(*entry))
116 }
117 InternalEvent::IoWrite { entry, kind, bytes } => {
118 write!(
119 f,
120 "event=io_write entry={} kind={:?} bytes={}",
121 usize::from(*entry),
122 kind,
123 bytes
124 )
125 }
126 InternalEvent::DiskEvict { entry, bytes } => {
127 write!(
128 f,
129 "event=disk_evict entry={} bytes={}",
130 usize::from(*entry),
131 bytes
132 )
133 }
134 InternalEvent::IoReadSqueezedBacking { entry, bytes } => {
135 write!(
136 f,
137 "event=io_read_squeezed_backing entry={} bytes={}",
138 usize::from(*entry),
139 bytes
140 )
141 }
142 InternalEvent::IoReadArrow { entry, bytes } => {
143 write!(
144 f,
145 "event=io_read_arrow entry={} bytes={}",
146 usize::from(*entry),
147 bytes
148 )
149 }
150 InternalEvent::IoReadLiquid { entry, bytes } => {
151 write!(
152 f,
153 "event=io_read_liquid entry={} bytes={}",
154 usize::from(*entry),
155 bytes
156 )
157 }
158 InternalEvent::Read {
159 entry,
160 selection,
161 expr,
162 cached,
163 } => write!(
164 f,
165 "event=read entry={} selection={} expr={} cached={:?}",
166 usize::from(*entry),
167 selection,
168 expr.as_ref()
169 .map(|e| e.to_string())
170 .unwrap_or_else(|| "None".to_string()),
171 cached
172 ),
173 InternalEvent::Hydrate { entry, cached, new } => write!(
174 f,
175 "event=hydrate entry={} cached={:?} new={:?}",
176 usize::from(*entry),
177 cached,
178 new
179 ),
180 InternalEvent::EvalPredicate {
181 entry,
182 selection,
183 cached,
184 } => write!(
185 f,
186 "event=eval_predicate entry={} selection={} cached={:?}",
187 usize::from(*entry),
188 selection,
189 cached
190 ),
191 InternalEvent::TryReadLiquid { entry } => {
192 write!(f, "event=try_read_liquid entry={}", usize::from(*entry))
193 }
194 InternalEvent::ReadSqueezedData { entry, expression } => {
195 write!(
196 f,
197 "event=read_squeezed_data entry={} expression={}",
198 usize::from(*entry),
199 expression
200 )
201 }
202 InternalEvent::DecompressSqueezed {
203 entry,
204 decompressed,
205 total,
206 } => {
207 write!(
208 f,
209 "event=decompress_squeezed entry={} decompressed={} total={}",
210 usize::from(*entry),
211 decompressed,
212 total
213 )
214 }
215 }
216 }
217}
218
219impl fmt::Debug for InternalEvent {
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221 fmt::Display::fmt(self, f)
222 }
223}
224
225impl EventTracer {
226 pub fn new() -> Self {
227 Self {
228 events: Mutex::new(Vec::new()),
229 }
230 }
231
232 pub fn record(&self, event: InternalEvent) {
233 self.events.lock().unwrap().push(event);
234 }
235
236 pub fn drain(&self) -> EventTrace {
237 EventTrace {
238 events: std::mem::take(&mut *self.events.lock().unwrap()),
239 }
240 }
241}
242
243#[derive(PartialEq, Eq, serde::Serialize)]
246pub struct EventTrace {
247 events: Vec<InternalEvent>,
248}
249
250impl From<Vec<InternalEvent>> for EventTrace {
251 fn from(events: Vec<InternalEvent>) -> Self {
252 Self { events }
253 }
254}
255
256impl fmt::Display for EventTrace {
257 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258 writeln!(f, "{:?}", self)
259 }
260}
261
262impl fmt::Debug for EventTrace {
263 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264 writeln!(f, "EventTrace: [")?;
265 for event in &self.events {
266 writeln!(f, "{}", event)?;
267 }
268 writeln!(f, "]")?;
269 Ok(())
270 }
271}