liquid_cache_datafusion/cache/
id.rs1use std::{
2 ops::Deref,
3 path::{Path, PathBuf},
4};
5
6use liquid_cache::cache::EntryID;
7
8#[repr(C, align(8))]
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
11pub struct ParquetArrayID {
12 file_id: u16,
13 rg_id: u16,
14 col_id: u16,
15 batch_id: BatchID,
16}
17
18impl From<ParquetArrayID> for usize {
19 fn from(id: ParquetArrayID) -> Self {
20 (id.file_id as usize) << 48
21 | (id.rg_id as usize) << 32
22 | (id.col_id as usize) << 16
23 | (id.batch_id.v as usize)
24 }
25}
26
27impl From<usize> for ParquetArrayID {
28 fn from(value: usize) -> Self {
29 Self {
30 file_id: (value >> 48) as u16,
31 rg_id: ((value >> 32) & 0xFFFF) as u16,
32 col_id: ((value >> 16) & 0xFFFF) as u16,
33 batch_id: BatchID::from_raw((value & 0xFFFF) as u16),
34 }
35 }
36}
37
38impl ParquetArrayID {}
39
40impl From<ParquetArrayID> for EntryID {
41 fn from(id: ParquetArrayID) -> Self {
42 EntryID::from(usize::from(id))
43 }
44}
45
46impl From<EntryID> for ParquetArrayID {
47 fn from(id: EntryID) -> Self {
48 ParquetArrayID::from(usize::from(id))
49 }
50}
51
52const _: () = assert!(std::mem::size_of::<ParquetArrayID>() == 8);
53const _: () = assert!(std::mem::align_of::<ParquetArrayID>() == 8);
54
55impl ParquetArrayID {
56 pub fn new(file_id: u64, row_group_id: u64, column_id: u64, batch_id: BatchID) -> Self {
58 debug_assert!(file_id <= u16::MAX as u64);
59 debug_assert!(row_group_id <= u16::MAX as u64);
60 debug_assert!(column_id <= u16::MAX as u64);
61 Self {
62 file_id: file_id as u16,
63 rg_id: row_group_id as u16,
64 col_id: column_id as u16,
65 batch_id,
66 }
67 }
68
69 pub fn batch_id_inner(&self) -> u64 {
71 self.batch_id.v as u64
72 }
73
74 pub fn file_id_inner(&self) -> u64 {
76 self.file_id as u64
77 }
78
79 pub fn row_group_id_inner(&self) -> u64 {
81 self.rg_id as u64
82 }
83
84 pub fn column_id_inner(&self) -> u64 {
86 self.col_id as u64
87 }
88
89 pub fn on_disk_liquid_path(&self, cache_root_dir: &Path) -> PathBuf {
91 let batch_id = self.batch_id_inner();
92 cache_root_dir
93 .join(format!("file_{}", self.file_id_inner()))
94 .join(format!("rg_{}", self.row_group_id_inner()))
95 .join(format!("col_{}", self.column_id_inner()))
96 .join(format!("batch_{batch_id}.liquid"))
97 }
98}
99
/// Index of a batch of rows, i.e. `row_id / batch_size`, stored as a `u16`.
///
/// Constructors and [`BatchID::inc`] panic rather than silently wrap when the
/// index would not fit in 16 bits.
#[repr(C, align(2))]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct BatchID {
    v: u16,
}

impl BatchID {
    /// Returns the batch containing row `row_id` when rows are grouped into
    /// batches of `batch_size` rows.
    ///
    /// # Panics
    ///
    /// Panics if `batch_size` is zero, or if `row_id / batch_size` exceeds
    /// `u16::MAX` (in release builds too, instead of truncating).
    #[track_caller]
    pub fn from_row_id(row_id: usize, batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be non-zero");
        let index = row_id / batch_size;
        assert!(
            index <= usize::from(u16::MAX),
            "batch index {index} (row {row_id}, batch size {batch_size}) exceeds u16::MAX"
        );
        Self { v: index as u16 }
    }

    /// Wraps a raw batch index.
    pub fn from_raw(v: u16) -> Self {
        Self { v }
    }

    /// Advances to the next batch index.
    ///
    /// # Panics
    ///
    /// Panics if the index is already `u16::MAX`. This applies in release
    /// builds too, so the index never wraps back to 0.
    #[track_caller]
    pub fn inc(&mut self) {
        self.v = self
            .v
            .checked_add(1)
            .expect("BatchID overflow: cannot increment past u16::MAX");
    }
}

impl Deref for BatchID {
    type Target = u16;

    /// Exposes the raw batch index.
    fn deref(&self) -> &Self::Target {
        &self.v
    }
}

140#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
142pub struct ColumnAccessPath {
143 file_id: u16,
144 rg_id: u16,
145 col_id: u16,
146}
147
148impl ColumnAccessPath {
149 pub fn new(file_id: u64, row_group_id: u64, column_id: u64) -> Self {
151 debug_assert!(file_id <= u16::MAX as u64);
152 debug_assert!(row_group_id <= u16::MAX as u64);
153 debug_assert!(column_id <= u16::MAX as u64);
154 Self {
155 file_id: file_id as u16,
156 rg_id: row_group_id as u16,
157 col_id: column_id as u16,
158 }
159 }
160
161 pub fn initialize_dir(&self, cache_root_dir: &Path) {
163 let path = cache_root_dir
164 .join(format!("file_{}", self.file_id_inner()))
165 .join(format!("rg_{}", self.row_group_id_inner()))
166 .join(format!("col_{}", self.column_id_inner()));
167 std::fs::create_dir_all(&path).expect("Failed to create cache directory");
168 }
169
170 fn file_id_inner(&self) -> u64 {
172 self.file_id as u64
173 }
174
175 fn row_group_id_inner(&self) -> u64 {
177 self.rg_id as u64
178 }
179
180 fn column_id_inner(&self) -> u64 {
182 self.col_id as u64
183 }
184
185 pub fn entry_id(&self, batch_id: BatchID) -> ParquetArrayID {
187 ParquetArrayID::new(
188 self.file_id_inner(),
189 self.row_group_id_inner(),
190 self.column_id_inner(),
191 batch_id,
192 )
193 }
194}
195
196impl From<ParquetArrayID> for ColumnAccessPath {
197 fn from(value: ParquetArrayID) -> Self {
198 Self {
199 file_id: value.file_id_inner() as u16,
200 rg_id: value.row_group_id_inner() as u16,
201 col_id: value.column_id_inner() as u16,
202 }
203 }
204}
205
#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use super::*;

    #[test]
    fn test_cache_entry_id_new_and_getters() {
        let file_id = 10u64;
        let row_group_id = 20u64;
        let column_id = 30u64;
        let batch_id = BatchID::from_raw(40);
        let entry_id = ParquetArrayID::new(file_id, row_group_id, column_id, batch_id);

        assert_eq!(entry_id.file_id_inner(), file_id);
        assert_eq!(entry_id.row_group_id_inner(), row_group_id);
        assert_eq!(entry_id.column_id_inner(), column_id);
        assert_eq!(entry_id.batch_id_inner(), *batch_id as u64);
    }

    #[test]
    fn test_cache_entry_id_boundaries() {
        let file_id = u16::MAX as u64;
        let row_group_id = 0u64;
        let column_id = u16::MAX as u64;
        let batch_id = BatchID::from_raw(0);
        let entry_id = ParquetArrayID::new(file_id, row_group_id, column_id, batch_id);

        assert_eq!(entry_id.file_id_inner(), file_id);
        assert_eq!(entry_id.row_group_id_inner(), row_group_id);
        assert_eq!(entry_id.column_id_inner(), column_id);
        assert_eq!(entry_id.batch_id_inner(), *batch_id as u64);
    }

    #[test]
    #[should_panic]
    fn test_cache_entry_id_new_panic_file_id() {
        ParquetArrayID::new((u16::MAX as u64) + 1, 0, 0, BatchID::from_raw(0));
    }

    #[test]
    #[should_panic]
    fn test_cache_entry_id_new_panic_row_group_id() {
        ParquetArrayID::new(0, (u16::MAX as u64) + 1, 0, BatchID::from_raw(0));
    }

    #[test]
    #[should_panic]
    fn test_cache_entry_id_new_panic_column_id() {
        ParquetArrayID::new(0, 0, (u16::MAX as u64) + 1, BatchID::from_raw(0));
    }

    #[test]
    fn test_cache_entry_id_on_disk_path() {
        let temp_dir = tempdir().unwrap();
        let cache_root = temp_dir.path();
        let entry_id = ParquetArrayID::new(1, 2, 3, BatchID::from_raw(4));
        let expected_path = cache_root
            .join("file_1")
            .join("rg_2")
            .join("col_3")
            .join("batch_4.liquid");
        assert_eq!(entry_id.on_disk_liquid_path(cache_root), expected_path);
    }

    /// The usize packing is the cache key; it must round-trip exactly,
    /// including at the all-zero and all-max extremes.
    #[test]
    fn test_cache_entry_id_usize_and_entry_id_round_trip() {
        let max = u16::MAX as u64;
        let ids = [
            ParquetArrayID::new(0, 0, 0, BatchID::from_raw(0)),
            ParquetArrayID::new(1, 2, 3, BatchID::from_raw(4)),
            ParquetArrayID::new(max, 0, max, BatchID::from_raw(7)),
            ParquetArrayID::new(max, max, max, BatchID::from_raw(u16::MAX)),
        ];
        for id in ids {
            assert_eq!(ParquetArrayID::from(usize::from(id)), id);
            assert_eq!(ParquetArrayID::from(EntryID::from(id)), id);
        }

        // Pin the lane layout: file | rg | col | batch, 16 bits each.
        let packed = usize::from(ParquetArrayID::new(1, 2, 3, BatchID::from_raw(4)));
        assert_eq!(packed, 0x0001_0002_0003_0004);
    }

    /// Derived `Ord` (field order) must agree with numeric order of the packed key.
    #[test]
    fn test_cache_entry_id_packed_order_matches_derived_order() {
        let a = ParquetArrayID::new(0, 9, 9, BatchID::from_raw(9));
        let b = ParquetArrayID::new(1, 0, 0, BatchID::from_raw(0));
        let c = ParquetArrayID::new(1, 0, 0, BatchID::from_raw(1));
        assert!(a < b && b < c);
        assert!(usize::from(a) < usize::from(b));
        assert!(usize::from(b) < usize::from(c));
    }

    #[test]
    fn test_batch_id_from_row_id() {
        let batch_id = BatchID::from_row_id(256, 128);
        assert_eq!(batch_id.v, 2);
    }

    #[test]
    fn test_batch_id_from_raw() {
        let batch_id = BatchID::from_raw(5);
        assert_eq!(batch_id.v, 5);
    }

    #[test]
    fn test_batch_id_inc() {
        let mut batch_id = BatchID::from_raw(10);
        batch_id.inc();
        assert_eq!(batch_id.v, 11);
    }

    #[test]
    #[should_panic]
    fn test_batch_id_inc_overflow() {
        let mut batch_id = BatchID::from_raw(u16::MAX);
        batch_id.inc();
    }

    #[test]
    fn test_batch_id_deref() {
        let batch_id = BatchID::from_raw(15);
        assert_eq!(*batch_id, 15);
    }

    #[test]
    fn test_column_path_from_cache_entry_id() {
        let entry_id = ParquetArrayID::new(1, 2, 3, BatchID::from_raw(4));
        let column_path: ColumnAccessPath = entry_id.into();

        assert_eq!(column_path.file_id, 1);
        assert_eq!(column_path.rg_id, 2);
        assert_eq!(column_path.col_id, 3);
    }

    #[test]
    fn test_column_path_entry_id_round_trip() {
        let column_path = ColumnAccessPath::new(11, 12, 13);
        let entry_id = column_path.entry_id(BatchID::from_raw(14));

        assert_eq!(
            entry_id,
            ParquetArrayID::new(11, 12, 13, BatchID::from_raw(14))
        );
        assert_eq!(ColumnAccessPath::from(entry_id), column_path);
    }

    #[test]
    fn test_column_path_directory_hosts_cache_entry_path() {
        let temp_dir = tempdir().unwrap();
        let cache_root = temp_dir.path();

        let file_id = 5u64;
        let row_group_id = 6u64;
        let column_id = 7u64;
        let column_path = ColumnAccessPath::new(file_id, row_group_id, column_id);

        column_path.initialize_dir(cache_root);

        let batch_id = BatchID::from_raw(8);
        let entry_id = column_path.entry_id(batch_id);

        let entry_path = entry_id.on_disk_liquid_path(cache_root);

        assert!(entry_path.parent().unwrap().exists());

        let expected_dir = cache_root
            .join(format!("file_{file_id}"))
            .join(format!("rg_{row_group_id}"))
            .join(format!("col_{column_id}"));

        assert_eq!(entry_path.parent().unwrap(), &expected_dir);

        std::fs::write(&entry_path, b"test data").unwrap();
        assert!(entry_path.exists());
    }
}