// liquid_cache/cache/observer/internal_tracing.rs

1use std::sync::Mutex;
2use std::{fmt, fmt::Write};
3
4use crate::cache::{CacheExpression, CachedBatchType, EntryID};
5
/// An internal cache event captured by [`EventTracer`].
///
/// Each variant renders (via `Display`, which `Debug` forwards to) as a single
/// line of the form `event=<tag> key=value ...`; the tag is noted on each
/// variant below. The code that emits these events lives outside this file, so
/// the descriptions of *when* a variant fires are inferred from its name —
/// confirm against the emitting call sites before relying on them.
#[derive(Clone, PartialEq, Eq, serde::Serialize)]
pub(crate) enum InternalEvent {
    /// Tag `insert_failed`. Presumably an insert of `entry` as `kind` that did
    /// not succeed (TODO confirm at call site).
    InsertFailed {
        entry: EntryID,
        kind: CachedBatchType,
    },
    /// Tag `insert_success`. Presumably a successful insert of `entry` as `kind`.
    InsertSuccess {
        entry: EntryID,
        kind: CachedBatchType,
    },
    /// Tag `squeeze_begin`. Carries the list of entries named as victims;
    /// rendered as a bracketed, comma-separated list of ids.
    SqueezeBegin {
        victims: Vec<EntryID>,
    },
    /// Tag `squeeze_victim`. Names a single victim entry.
    SqueezeVictim {
        entry: EntryID,
    },
    /// Tag `io_write`. Presumably a write of `bytes` bytes for `entry` stored
    /// as `kind` (units assumed from the field name).
    IoWrite {
        entry: EntryID,
        kind: CachedBatchType,
        bytes: usize,
    },
    /// Tag `disk_evict`. Presumably `entry` being evicted from disk, with its
    /// size in `bytes` (TODO confirm).
    DiskEvict {
        entry: EntryID,
        bytes: usize,
    },
    /// Tag `io_read_squeezed_backing`. Presumably an I/O read of the backing
    /// data of a squeezed entry.
    IoReadSqueezedBacking {
        entry: EntryID,
        bytes: usize,
    },
    /// Tag `io_read_arrow`. Presumably an I/O read of Arrow-format data.
    IoReadArrow {
        entry: EntryID,
        bytes: usize,
    },
    /// Tag `io_read_liquid`. Presumably an I/O read of Liquid-format data.
    IoReadLiquid {
        entry: EntryID,
        bytes: usize,
    },
    /// Tag `read`. A read of `entry`; `expr` is rendered with its `Display`
    /// form, or as the literal text `None` when absent.
    /// NOTE(review): `selection` looks like "a selection was supplied" — confirm.
    Read {
        entry: EntryID,
        selection: bool,
        expr: Option<CacheExpression>,
        cached: CachedBatchType,
    },
    /// Tag `hydrate`. Records a batch type before (`cached`) and after (`new`);
    /// the exact meaning of "hydrate" is defined by the emitter.
    Hydrate {
        entry: EntryID,
        cached: CachedBatchType,
        new: CachedBatchType,
    },
    /// Tag `eval_predicate`. Presumably a predicate evaluation against `entry`
    /// while it was stored as `cached`.
    EvalPredicate {
        entry: EntryID,
        selection: bool,
        cached: CachedBatchType,
    },
    /// Tag `read_squeezed_data`. Carries the expression used for the read.
    ReadSqueezedData {
        entry: EntryID,
        expression: CacheExpression,
    },
    /// Tag `try_read_liquid`. Presumably an attempt to read `entry` in Liquid form.
    TryReadLiquid {
        entry: EntryID,
    },
    /// Tag `decompress_squeezed`. Records two counts, `decompressed` and
    /// `total`; their units are not visible from this file.
    DecompressSqueezed {
        entry: EntryID,
        decompressed: usize,
        total: usize,
    },
}
72
/// Accumulates [`InternalEvent`]s in insertion order behind a `Mutex`.
///
/// Events are appended with `record` and handed out (leaving the tracer empty)
/// with `drain`; both take `&self`, so the tracer can be used through a shared
/// reference.
#[derive(Debug)]
pub(crate) struct EventTracer {
    /// Events recorded since construction or the last `drain`.
    events: Mutex<Vec<InternalEvent>>,
}
77
78fn fmt_entry_list(buf: &mut String, victims: &[EntryID]) -> fmt::Result {
79    buf.push('[');
80    for (idx, v) in victims.iter().enumerate() {
81        if idx > 0 {
82            buf.push(',');
83        }
84        write!(buf, "{}", usize::from(*v))?;
85    }
86    buf.push(']');
87    Ok(())
88}
89
90impl fmt::Display for InternalEvent {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        match self {
93            InternalEvent::InsertFailed { entry, kind } => {
94                write!(
95                    f,
96                    "event=insert_failed entry={} kind={:?}",
97                    usize::from(*entry),
98                    kind
99                )
100            }
101            InternalEvent::InsertSuccess { entry, kind } => {
102                write!(
103                    f,
104                    "event=insert_success entry={} kind={:?}",
105                    usize::from(*entry),
106                    kind
107                )
108            }
109            InternalEvent::SqueezeBegin { victims } => {
110                let mut buf = String::new();
111                fmt_entry_list(&mut buf, victims)?;
112                write!(f, "event=squeeze_begin victims={}", buf)
113            }
114            InternalEvent::SqueezeVictim { entry } => {
115                write!(f, "event=squeeze_victim entry={}", usize::from(*entry))
116            }
117            InternalEvent::IoWrite { entry, kind, bytes } => {
118                write!(
119                    f,
120                    "event=io_write entry={} kind={:?} bytes={}",
121                    usize::from(*entry),
122                    kind,
123                    bytes
124                )
125            }
126            InternalEvent::DiskEvict { entry, bytes } => {
127                write!(
128                    f,
129                    "event=disk_evict entry={} bytes={}",
130                    usize::from(*entry),
131                    bytes
132                )
133            }
134            InternalEvent::IoReadSqueezedBacking { entry, bytes } => {
135                write!(
136                    f,
137                    "event=io_read_squeezed_backing entry={} bytes={}",
138                    usize::from(*entry),
139                    bytes
140                )
141            }
142            InternalEvent::IoReadArrow { entry, bytes } => {
143                write!(
144                    f,
145                    "event=io_read_arrow entry={} bytes={}",
146                    usize::from(*entry),
147                    bytes
148                )
149            }
150            InternalEvent::IoReadLiquid { entry, bytes } => {
151                write!(
152                    f,
153                    "event=io_read_liquid entry={} bytes={}",
154                    usize::from(*entry),
155                    bytes
156                )
157            }
158            InternalEvent::Read {
159                entry,
160                selection,
161                expr,
162                cached,
163            } => write!(
164                f,
165                "event=read entry={} selection={} expr={} cached={:?}",
166                usize::from(*entry),
167                selection,
168                expr.as_ref()
169                    .map(|e| e.to_string())
170                    .unwrap_or_else(|| "None".to_string()),
171                cached
172            ),
173            InternalEvent::Hydrate { entry, cached, new } => write!(
174                f,
175                "event=hydrate entry={} cached={:?} new={:?}",
176                usize::from(*entry),
177                cached,
178                new
179            ),
180            InternalEvent::EvalPredicate {
181                entry,
182                selection,
183                cached,
184            } => write!(
185                f,
186                "event=eval_predicate entry={} selection={} cached={:?}",
187                usize::from(*entry),
188                selection,
189                cached
190            ),
191            InternalEvent::TryReadLiquid { entry } => {
192                write!(f, "event=try_read_liquid entry={}", usize::from(*entry))
193            }
194            InternalEvent::ReadSqueezedData { entry, expression } => {
195                write!(
196                    f,
197                    "event=read_squeezed_data entry={} expression={}",
198                    usize::from(*entry),
199                    expression
200                )
201            }
202            InternalEvent::DecompressSqueezed {
203                entry,
204                decompressed,
205                total,
206            } => {
207                write!(
208                    f,
209                    "event=decompress_squeezed entry={} decompressed={} total={}",
210                    usize::from(*entry),
211                    decompressed,
212                    total
213                )
214            }
215        }
216    }
217}
218
219impl fmt::Debug for InternalEvent {
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        fmt::Display::fmt(self, f)
222    }
223}
224
225impl EventTracer {
226    pub fn new() -> Self {
227        Self {
228            events: Mutex::new(Vec::new()),
229        }
230    }
231
232    pub fn record(&self, event: InternalEvent) {
233        self.events.lock().unwrap().push(event);
234    }
235
236    pub fn drain(&self) -> EventTrace {
237        EventTrace {
238            events: std::mem::take(&mut *self.events.lock().unwrap()),
239        }
240    }
241}
242
/// A trace of events that occurred in the cache.
/// This is used for testing only.
///
/// A trace is an ordered list of `InternalEvent`s, obtained either from
/// `EventTracer::drain` or by converting a `Vec<InternalEvent>` with `From`.
/// Its `Debug` form prints `EventTrace: [`, then one event per line, then `]`.
#[derive(PartialEq, Eq, serde::Serialize)]
pub struct EventTrace {
    /// The events, in the order they were recorded or supplied.
    events: Vec<InternalEvent>,
}
249
250impl From<Vec<InternalEvent>> for EventTrace {
251    fn from(events: Vec<InternalEvent>) -> Self {
252        Self { events }
253    }
254}
255
256impl fmt::Display for EventTrace {
257    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258        writeln!(f, "{:?}", self)
259    }
260}
261
262impl fmt::Debug for EventTrace {
263    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264        writeln!(f, "EventTrace: [")?;
265        for event in &self.events {
266            writeln!(f, "{}", event)?;
267        }
268        writeln!(f, "]")?;
269        Ok(())
270    }
271}