Skip to main content

liquid_cache/cache/observer/
internal_tracing.rs

1use std::sync::Mutex;
2use std::{fmt, fmt::Write};
3
4use crate::cache::{CacheExpression, CachedBatchType, EntryID};
5
6#[derive(Clone, PartialEq, Eq, serde::Serialize)]
7pub(crate) enum InternalEvent {
8    InsertFailed {
9        entry: EntryID,
10        kind: CachedBatchType,
11    },
12    InsertSuccess {
13        entry: EntryID,
14        kind: CachedBatchType,
15    },
16    SqueezeBegin {
17        victims: Vec<EntryID>,
18    },
19    SqueezeVictim {
20        entry: EntryID,
21    },
22    IoWrite {
23        entry: EntryID,
24        kind: CachedBatchType,
25        bytes: usize,
26    },
27    IoReadSqueezedBacking {
28        entry: EntryID,
29        bytes: usize,
30    },
31    IoReadArrow {
32        entry: EntryID,
33        bytes: usize,
34    },
35    IoReadLiquid {
36        entry: EntryID,
37        bytes: usize,
38    },
39    Read {
40        entry: EntryID,
41        selection: bool,
42        expr: Option<CacheExpression>,
43        cached: CachedBatchType,
44    },
45    Hydrate {
46        entry: EntryID,
47        cached: CachedBatchType,
48        new: CachedBatchType,
49    },
50    EvalPredicate {
51        entry: EntryID,
52        selection: bool,
53        cached: CachedBatchType,
54    },
55    ReadSqueezedData {
56        entry: EntryID,
57        expression: CacheExpression,
58    },
59    TryReadLiquid {
60        entry: EntryID,
61    },
62    DecompressSqueezed {
63        entry: EntryID,
64        decompressed: usize,
65        total: usize,
66    },
67}
68
69#[derive(Debug)]
70pub(crate) struct EventTracer {
71    events: Mutex<Vec<InternalEvent>>,
72}
73
74fn fmt_entry_list(buf: &mut String, victims: &[EntryID]) -> fmt::Result {
75    buf.push('[');
76    for (idx, v) in victims.iter().enumerate() {
77        if idx > 0 {
78            buf.push(',');
79        }
80        write!(buf, "{}", usize::from(*v))?;
81    }
82    buf.push(']');
83    Ok(())
84}
85
86impl fmt::Display for InternalEvent {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        match self {
89            InternalEvent::InsertFailed { entry, kind } => {
90                write!(
91                    f,
92                    "event=insert_failed entry={} kind={:?}",
93                    usize::from(*entry),
94                    kind
95                )
96            }
97            InternalEvent::InsertSuccess { entry, kind } => {
98                write!(
99                    f,
100                    "event=insert_success entry={} kind={:?}",
101                    usize::from(*entry),
102                    kind
103                )
104            }
105            InternalEvent::SqueezeBegin { victims } => {
106                let mut buf = String::new();
107                fmt_entry_list(&mut buf, victims)?;
108                write!(f, "event=squeeze_begin victims={}", buf)
109            }
110            InternalEvent::SqueezeVictim { entry } => {
111                write!(f, "event=squeeze_victim entry={}", usize::from(*entry))
112            }
113            InternalEvent::IoWrite { entry, kind, bytes } => {
114                write!(
115                    f,
116                    "event=io_write entry={} kind={:?} bytes={}",
117                    usize::from(*entry),
118                    kind,
119                    bytes
120                )
121            }
122            InternalEvent::IoReadSqueezedBacking { entry, bytes } => {
123                write!(
124                    f,
125                    "event=io_read_squeezed_backing entry={} bytes={}",
126                    usize::from(*entry),
127                    bytes
128                )
129            }
130            InternalEvent::IoReadArrow { entry, bytes } => {
131                write!(
132                    f,
133                    "event=io_read_arrow entry={} bytes={}",
134                    usize::from(*entry),
135                    bytes
136                )
137            }
138            InternalEvent::IoReadLiquid { entry, bytes } => {
139                write!(
140                    f,
141                    "event=io_read_liquid entry={} bytes={}",
142                    usize::from(*entry),
143                    bytes
144                )
145            }
146            InternalEvent::Read {
147                entry,
148                selection,
149                expr,
150                cached,
151            } => write!(
152                f,
153                "event=read entry={} selection={} expr={} cached={:?}",
154                usize::from(*entry),
155                selection,
156                expr.as_ref()
157                    .map(|e| e.to_string())
158                    .unwrap_or_else(|| "None".to_string()),
159                cached
160            ),
161            InternalEvent::Hydrate { entry, cached, new } => write!(
162                f,
163                "event=hydrate entry={} cached={:?} new={:?}",
164                usize::from(*entry),
165                cached,
166                new
167            ),
168            InternalEvent::EvalPredicate {
169                entry,
170                selection,
171                cached,
172            } => write!(
173                f,
174                "event=eval_predicate entry={} selection={} cached={:?}",
175                usize::from(*entry),
176                selection,
177                cached
178            ),
179            InternalEvent::TryReadLiquid { entry } => {
180                write!(f, "event=try_read_liquid entry={}", usize::from(*entry))
181            }
182            InternalEvent::ReadSqueezedData { entry, expression } => {
183                write!(
184                    f,
185                    "event=read_squeezed_data entry={} expression={}",
186                    usize::from(*entry),
187                    expression
188                )
189            }
190            InternalEvent::DecompressSqueezed {
191                entry,
192                decompressed,
193                total,
194            } => {
195                write!(
196                    f,
197                    "event=decompress_squeezed entry={} decompressed={} total={}",
198                    usize::from(*entry),
199                    decompressed,
200                    total
201                )
202            }
203        }
204    }
205}
206
207impl fmt::Debug for InternalEvent {
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        fmt::Display::fmt(self, f)
210    }
211}
212
213impl EventTracer {
214    pub fn new() -> Self {
215        Self {
216            events: Mutex::new(Vec::new()),
217        }
218    }
219
220    pub fn record(&self, event: InternalEvent) {
221        self.events.lock().unwrap().push(event);
222    }
223
224    pub fn drain(&self) -> EventTrace {
225        EventTrace {
226            events: std::mem::take(&mut *self.events.lock().unwrap()),
227        }
228    }
229}
230
231/// A trace of events that occurred in the cache.
232/// This is used for testing only.
233#[derive(PartialEq, Eq, serde::Serialize)]
234pub struct EventTrace {
235    events: Vec<InternalEvent>,
236}
237
238impl From<Vec<InternalEvent>> for EventTrace {
239    fn from(events: Vec<InternalEvent>) -> Self {
240        Self { events }
241    }
242}
243
244impl fmt::Display for EventTrace {
245    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246        writeln!(f, "{:?}", self)
247    }
248}
249
250impl fmt::Debug for EventTrace {
251    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252        writeln!(f, "EventTrace: [")?;
253        for event in &self.events {
254            writeln!(f, "{}", event)?;
255        }
256        writeln!(f, "]")?;
257        Ok(())
258    }
259}