liquid_cache/cache/observer/
internal_tracing.rs1use std::sync::Mutex;
2use std::{fmt, fmt::Write};
3
4use crate::cache::{CacheExpression, CachedBatchType, EntryID};
5
/// A single event stored by [`EventTracer`].
///
/// Every variant names the `EntryID` it concerns (`SqueezeBegin` carries a
/// list of them) plus variant-specific details. The `Display` impl in this
/// file renders each variant as one `event=<snake_case name> key=value ...`
/// line, and `Debug` forwards to `Display`.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant and field names only; the code that records these events is not in
/// this file, so confirm them against the call sites.
#[derive(Clone, PartialEq, Eq, serde::Serialize)]
pub(crate) enum InternalEvent {
    /// Presumably: inserting `entry` as a batch of type `kind` did not succeed.
    InsertFailed {
        entry: EntryID,
        kind: CachedBatchType,
    },
    /// Presumably: `entry` was inserted as a batch of type `kind`.
    InsertSuccess {
        entry: EntryID,
        kind: CachedBatchType,
    },
    /// Presumably: a squeeze pass started with `victims` as its candidates.
    SqueezeBegin {
        victims: Vec<EntryID>,
    },
    /// Presumably: `entry` was processed as a squeeze victim.
    SqueezeVictim {
        entry: EntryID,
    },
    /// Presumably: `bytes` bytes were written for `entry` (batch type `kind`).
    IoWrite {
        entry: EntryID,
        kind: CachedBatchType,
        bytes: usize,
    },
    /// Presumably: `bytes` bytes of a squeezed entry's backing data were read.
    IoReadSqueezedBacking {
        entry: EntryID,
        bytes: usize,
    },
    /// Presumably: `bytes` bytes of Arrow-format data were read for `entry`.
    IoReadArrow {
        entry: EntryID,
        bytes: usize,
    },
    /// Presumably: `bytes` bytes of Liquid-format data were read for `entry`.
    IoReadLiquid {
        entry: EntryID,
        bytes: usize,
    },
    /// Presumably: `entry` was read while stored as `cached`.
    ///
    /// `expr` is optional; `Display` prints the literal `None` when it is
    /// absent. NOTE(review): `selection` looks like "a selection was supplied"
    /// — confirm.
    Read {
        entry: EntryID,
        selection: bool,
        expr: Option<CacheExpression>,
        cached: CachedBatchType,
    },
    /// Presumably: `entry` changed representation from `cached` to `new`.
    Hydrate {
        entry: EntryID,
        cached: CachedBatchType,
        new: CachedBatchType,
    },
    /// Presumably: a predicate was evaluated against `entry` while stored as
    /// `cached`.
    EvalPredicate {
        entry: EntryID,
        selection: bool,
        cached: CachedBatchType,
    },
    /// Presumably: squeezed data of `entry` was read for `expression`.
    ReadSqueezedData {
        entry: EntryID,
        expression: CacheExpression,
    },
    /// Presumably: a Liquid-format read of `entry` was attempted.
    TryReadLiquid {
        entry: EntryID,
    },
    /// Presumably: part of a squeezed entry was decompressed.
    ///
    /// NOTE(review): the unit of `decompressed` / `total` (bytes, rows, ...) is
    /// not evident from this file — confirm.
    DecompressSqueezed {
        entry: EntryID,
        decompressed: usize,
        total: usize,
    },
}
68
/// Accumulates [`InternalEvent`]s behind a `Mutex`, so events can be recorded
/// through a shared reference and later taken out in one batch via `drain`.
#[derive(Debug)]
pub(crate) struct EventTracer {
    /// Events recorded so far, in the order `record` pushed them; emptied by
    /// `drain`.
    events: Mutex<Vec<InternalEvent>>,
}
73
74fn fmt_entry_list(buf: &mut String, victims: &[EntryID]) -> fmt::Result {
75 buf.push('[');
76 for (idx, v) in victims.iter().enumerate() {
77 if idx > 0 {
78 buf.push(',');
79 }
80 write!(buf, "{}", usize::from(*v))?;
81 }
82 buf.push(']');
83 Ok(())
84}
85
86impl fmt::Display for InternalEvent {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 match self {
89 InternalEvent::InsertFailed { entry, kind } => {
90 write!(
91 f,
92 "event=insert_failed entry={} kind={:?}",
93 usize::from(*entry),
94 kind
95 )
96 }
97 InternalEvent::InsertSuccess { entry, kind } => {
98 write!(
99 f,
100 "event=insert_success entry={} kind={:?}",
101 usize::from(*entry),
102 kind
103 )
104 }
105 InternalEvent::SqueezeBegin { victims } => {
106 let mut buf = String::new();
107 fmt_entry_list(&mut buf, victims)?;
108 write!(f, "event=squeeze_begin victims={}", buf)
109 }
110 InternalEvent::SqueezeVictim { entry } => {
111 write!(f, "event=squeeze_victim entry={}", usize::from(*entry))
112 }
113 InternalEvent::IoWrite { entry, kind, bytes } => {
114 write!(
115 f,
116 "event=io_write entry={} kind={:?} bytes={}",
117 usize::from(*entry),
118 kind,
119 bytes
120 )
121 }
122 InternalEvent::IoReadSqueezedBacking { entry, bytes } => {
123 write!(
124 f,
125 "event=io_read_squeezed_backing entry={} bytes={}",
126 usize::from(*entry),
127 bytes
128 )
129 }
130 InternalEvent::IoReadArrow { entry, bytes } => {
131 write!(
132 f,
133 "event=io_read_arrow entry={} bytes={}",
134 usize::from(*entry),
135 bytes
136 )
137 }
138 InternalEvent::IoReadLiquid { entry, bytes } => {
139 write!(
140 f,
141 "event=io_read_liquid entry={} bytes={}",
142 usize::from(*entry),
143 bytes
144 )
145 }
146 InternalEvent::Read {
147 entry,
148 selection,
149 expr,
150 cached,
151 } => write!(
152 f,
153 "event=read entry={} selection={} expr={} cached={:?}",
154 usize::from(*entry),
155 selection,
156 expr.as_ref()
157 .map(|e| e.to_string())
158 .unwrap_or_else(|| "None".to_string()),
159 cached
160 ),
161 InternalEvent::Hydrate { entry, cached, new } => write!(
162 f,
163 "event=hydrate entry={} cached={:?} new={:?}",
164 usize::from(*entry),
165 cached,
166 new
167 ),
168 InternalEvent::EvalPredicate {
169 entry,
170 selection,
171 cached,
172 } => write!(
173 f,
174 "event=eval_predicate entry={} selection={} cached={:?}",
175 usize::from(*entry),
176 selection,
177 cached
178 ),
179 InternalEvent::TryReadLiquid { entry } => {
180 write!(f, "event=try_read_liquid entry={}", usize::from(*entry))
181 }
182 InternalEvent::ReadSqueezedData { entry, expression } => {
183 write!(
184 f,
185 "event=read_squeezed_data entry={} expression={}",
186 usize::from(*entry),
187 expression
188 )
189 }
190 InternalEvent::DecompressSqueezed {
191 entry,
192 decompressed,
193 total,
194 } => {
195 write!(
196 f,
197 "event=decompress_squeezed entry={} decompressed={} total={}",
198 usize::from(*entry),
199 decompressed,
200 total
201 )
202 }
203 }
204 }
205}
206
207impl fmt::Debug for InternalEvent {
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209 fmt::Display::fmt(self, f)
210 }
211}
212
213impl EventTracer {
214 pub fn new() -> Self {
215 Self {
216 events: Mutex::new(Vec::new()),
217 }
218 }
219
220 pub fn record(&self, event: InternalEvent) {
221 self.events.lock().unwrap().push(event);
222 }
223
224 pub fn drain(&self) -> EventTrace {
225 EventTrace {
226 events: std::mem::take(&mut *self.events.lock().unwrap()),
227 }
228 }
229}
230
/// An owned sequence of [`InternalEvent`]s, as returned by
/// `EventTracer::drain` or built from a `Vec<InternalEvent>` via `From`.
///
/// Its `Debug` output lists one event per line between `EventTrace: [` and
/// `]`; `Display` prints that same text followed by one more newline.
#[derive(PartialEq, Eq, serde::Serialize)]
pub struct EventTrace {
    /// The events, in the order they were recorded or supplied.
    events: Vec<InternalEvent>,
}
237
238impl From<Vec<InternalEvent>> for EventTrace {
239 fn from(events: Vec<InternalEvent>) -> Self {
240 Self { events }
241 }
242}
243
244impl fmt::Display for EventTrace {
245 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246 writeln!(f, "{:?}", self)
247 }
248}
249
250impl fmt::Debug for EventTrace {
251 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252 writeln!(f, "EventTrace: [")?;
253 for event in &self.events {
254 writeln!(f, "{}", event)?;
255 }
256 writeln!(f, "]")?;
257 Ok(())
258 }
259}