Skip to main content

liquid_cache_datafusion/cache/
id.rs

1use std::{
2    ops::Deref,
3    path::{Path, PathBuf},
4};
5
6use liquid_cache::cache::EntryID;
7
8/// This is a unique identifier for a row in a parquet file.
9#[repr(C, align(8))]
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
11pub struct ParquetArrayID {
12    file_id: u16,
13    rg_id: u16,
14    col_id: u16,
15    batch_id: BatchID,
16}
17
18impl From<ParquetArrayID> for usize {
19    fn from(id: ParquetArrayID) -> Self {
20        (id.file_id as usize) << 48
21            | (id.rg_id as usize) << 32
22            | (id.col_id as usize) << 16
23            | (id.batch_id.v as usize)
24    }
25}
26
27impl From<usize> for ParquetArrayID {
28    fn from(value: usize) -> Self {
29        Self {
30            file_id: (value >> 48) as u16,
31            rg_id: ((value >> 32) & 0xFFFF) as u16,
32            col_id: ((value >> 16) & 0xFFFF) as u16,
33            batch_id: BatchID::from_raw((value & 0xFFFF) as u16),
34        }
35    }
36}
37
38impl ParquetArrayID {}
39
40impl From<ParquetArrayID> for EntryID {
41    fn from(id: ParquetArrayID) -> Self {
42        EntryID::from(usize::from(id))
43    }
44}
45
46impl From<EntryID> for ParquetArrayID {
47    fn from(id: EntryID) -> Self {
48        ParquetArrayID::from(usize::from(id))
49    }
50}
51
52const _: () = assert!(std::mem::size_of::<ParquetArrayID>() == 8);
53const _: () = assert!(std::mem::align_of::<ParquetArrayID>() == 8);
54
55impl ParquetArrayID {
56    /// Creates a new CacheEntryID.
57    pub fn new(file_id: u64, row_group_id: u64, column_id: u64, batch_id: BatchID) -> Self {
58        debug_assert!(file_id <= u16::MAX as u64);
59        debug_assert!(row_group_id <= u16::MAX as u64);
60        debug_assert!(column_id <= u16::MAX as u64);
61        Self {
62            file_id: file_id as u16,
63            rg_id: row_group_id as u16,
64            col_id: column_id as u16,
65            batch_id,
66        }
67    }
68
69    /// Get the batch id.
70    pub fn batch_id_inner(&self) -> u64 {
71        self.batch_id.v as u64
72    }
73
74    /// Get the file id.
75    pub fn file_id_inner(&self) -> u64 {
76        self.file_id as u64
77    }
78
79    /// Get the row group id.
80    pub fn row_group_id_inner(&self) -> u64 {
81        self.rg_id as u64
82    }
83
84    /// Get the column id.
85    pub fn column_id_inner(&self) -> u64 {
86        self.col_id as u64
87    }
88
89    /// Get the on-disk path.
90    pub fn on_disk_liquid_path(&self, cache_root_dir: &Path) -> PathBuf {
91        let batch_id = self.batch_id_inner();
92        cache_root_dir
93            .join(format!("file_{}", self.file_id_inner()))
94            .join(format!("rg_{}", self.row_group_id_inner()))
95            .join(format!("col_{}", self.column_id_inner()))
96            .join(format!("batch_{batch_id}.liquid"))
97    }
98}
99
/// BatchID is a unique identifier for a batch of rows:
/// it is the row id divided by the batch size.
///
// It is very easy to mistake this value for a row id, so the new type idiom
// is used to keep the two apart:
// https://doc.rust-lang.org/rust-by-example/generics/new_types.html
#[repr(C, align(2))]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct BatchID {
    /// Zero-based batch index.
    v: u16,
}

impl BatchID {
    /// Creates a new BatchID from a row id and a batch size.
    ///
    /// The row id is expected to be at the boundary of a batch; the batch
    /// index is `row_id / batch_size` (integer division).
    ///
    /// # Panics
    ///
    /// Panics if `batch_size` is zero, or if the resulting batch index is
    /// larger than `u16::MAX`. The range check runs in every build profile,
    /// because a truncated index would silently refer to a different batch.
    pub fn from_row_id(row_id: usize, batch_size: usize) -> Self {
        assert!(batch_size != 0, "batch_size must be non-zero");
        let index = row_id / batch_size;
        assert!(
            index <= usize::from(u16::MAX),
            "batch index {} does not fit in a 16-bit BatchID",
            index
        );
        // Lossless: the range was checked just above.
        Self { v: index as u16 }
    }

    /// Creates a new BatchID from a raw value.
    pub fn from_raw(v: u16) -> Self {
        Self { v }
    }

    /// Increment the batch id by one.
    ///
    /// # Panics
    ///
    /// Panics if the id is already `u16::MAX`. Unlike a plain `+= 1`, which
    /// wraps to zero when overflow checks are disabled, this is detected in
    /// every build profile.
    pub fn inc(&mut self) {
        self.v = self
            .v
            .checked_add(1)
            .expect("BatchID overflow: cannot increment past u16::MAX");
    }
}

/// Gives read-only access to the raw batch index via `*batch_id`.
impl Deref for BatchID {
    type Target = u16;

    fn deref(&self) -> &Self::Target {
        &self.v
    }
}
139
/// Column access path: the (file, row group, column) triple that locates a
/// column, without any batch component.
///
/// The derived ordering compares `file_id`, then `rg_id`, then `col_id`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ColumnAccessPath {
    /// Index of the parquet file.
    file_id: u16,
    /// Index of the row group within the file.
    rg_id: u16,
    /// Index of the column within the row group.
    col_id: u16,
}
147
148impl ColumnAccessPath {
149    /// Create a new instance of ColumnAccessPath.
150    pub fn new(file_id: u64, row_group_id: u64, column_id: u64) -> Self {
151        debug_assert!(file_id <= u16::MAX as u64);
152        debug_assert!(row_group_id <= u16::MAX as u64);
153        debug_assert!(column_id <= u16::MAX as u64);
154        Self {
155            file_id: file_id as u16,
156            rg_id: row_group_id as u16,
157            col_id: column_id as u16,
158        }
159    }
160
161    /// Initialize the directory for the column access path.
162    pub fn initialize_dir(&self, cache_root_dir: &Path) {
163        let path = cache_root_dir
164            .join(format!("file_{}", self.file_id_inner()))
165            .join(format!("rg_{}", self.row_group_id_inner()))
166            .join(format!("col_{}", self.column_id_inner()));
167        std::fs::create_dir_all(&path).expect("Failed to create cache directory");
168    }
169
170    /// Get the file id.
171    fn file_id_inner(&self) -> u64 {
172        self.file_id as u64
173    }
174
175    /// Get the row group id.
176    fn row_group_id_inner(&self) -> u64 {
177        self.rg_id as u64
178    }
179
180    /// Get the column id.
181    fn column_id_inner(&self) -> u64 {
182        self.col_id as u64
183    }
184
185    /// Get the entry id.
186    pub fn entry_id(&self, batch_id: BatchID) -> ParquetArrayID {
187        ParquetArrayID::new(
188            self.file_id_inner(),
189            self.row_group_id_inner(),
190            self.column_id_inner(),
191            batch_id,
192        )
193    }
194}
195
196impl From<ParquetArrayID> for ColumnAccessPath {
197    fn from(value: ParquetArrayID) -> Self {
198        Self {
199            file_id: value.file_id_inner() as u16,
200            rg_id: value.row_group_id_inner() as u16,
201            col_id: value.column_id_inner() as u16,
202        }
203    }
204}
205
#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use super::*;

    /// The getters return the components that were passed to `new`.
    #[test]
    fn test_cache_entry_id_new_and_getters() {
        let (file, row_group, column) = (10u64, 20u64, 30u64);
        let batch = BatchID::from_raw(40);

        let id = ParquetArrayID::new(file, row_group, column, batch);

        assert_eq!(id.file_id_inner(), file);
        assert_eq!(id.row_group_id_inner(), row_group);
        assert_eq!(id.column_id_inner(), column);
        assert_eq!(id.batch_id_inner(), u64::from(*batch));
    }

    /// The smallest and largest representable components survive unchanged.
    #[test]
    fn test_cache_entry_id_boundaries() {
        let largest = u64::from(u16::MAX);
        let (file, row_group, column) = (largest, 0u64, largest);
        let batch = BatchID::from_raw(0);

        let id = ParquetArrayID::new(file, row_group, column, batch);

        assert_eq!(id.file_id_inner(), file);
        assert_eq!(id.row_group_id_inner(), row_group);
        assert_eq!(id.column_id_inner(), column);
        assert_eq!(id.batch_id_inner(), u64::from(*batch));
    }

    /// A file id one past `u16::MAX` is rejected.
    #[test]
    #[should_panic]
    fn test_cache_entry_id_new_panic_file_id() {
        let too_large = u64::from(u16::MAX) + 1;
        ParquetArrayID::new(too_large, 0, 0, BatchID::from_raw(0));
    }

    /// A row group id one past `u16::MAX` is rejected.
    #[test]
    #[should_panic]
    fn test_cache_entry_id_new_panic_row_group_id() {
        let too_large = u64::from(u16::MAX) + 1;
        ParquetArrayID::new(0, too_large, 0, BatchID::from_raw(0));
    }

    /// A column id one past `u16::MAX` is rejected.
    #[test]
    #[should_panic]
    fn test_cache_entry_id_new_panic_column_id() {
        let too_large = u64::from(u16::MAX) + 1;
        ParquetArrayID::new(0, 0, too_large, BatchID::from_raw(0));
    }

    /// The on-disk path nests file, row group, column and batch in that order.
    #[test]
    fn test_cache_entry_id_on_disk_path() {
        let scratch = tempdir().unwrap();
        let root = scratch.path();
        let id = ParquetArrayID::new(1, 2, 3, BatchID::from_raw(4));

        let mut expected = root.to_path_buf();
        expected.extend(["file_1", "rg_2", "col_3", "batch_4.liquid"]);

        assert_eq!(id.on_disk_liquid_path(root), expected);
    }

    /// Row 256 with a batch size of 128 lands in batch 2.
    #[test]
    fn test_batch_id_from_row_id() {
        let BatchID { v } = BatchID::from_row_id(256, 128);
        assert_eq!(v, 2);
    }

    /// A raw value is stored unchanged.
    #[test]
    fn test_batch_id_from_raw() {
        let BatchID { v } = BatchID::from_raw(5);
        assert_eq!(v, 5);
    }

    /// `inc` advances the id by exactly one.
    #[test]
    fn test_batch_id_inc() {
        let mut id = BatchID::from_raw(10);
        id.inc();
        assert_eq!(id.v, 11);
    }

    /// Incrementing the largest id must panic rather than wrap around.
    #[test]
    #[should_panic]
    fn test_batch_id_inc_overflow() {
        let mut id = BatchID::from_raw(u16::MAX);
        id.inc();
    }

    /// Dereferencing yields the raw value.
    #[test]
    fn test_batch_id_deref() {
        let id = BatchID::from_raw(15);
        assert_eq!(*id, 15);
    }

    /// Converting an entry id keeps file, row group and column ids.
    #[test]
    fn test_column_path_from_cache_entry_id() {
        let id = ParquetArrayID::new(1, 2, 3, BatchID::from_raw(4));
        let path = ColumnAccessPath::from(id);

        assert_eq!(path.file_id, 1);
        assert_eq!(path.rg_id, 2);
        assert_eq!(path.col_id, 3);
    }

    /// The directory created by `initialize_dir` is the parent of every entry
    /// path derived from the same column path.
    #[test]
    fn test_column_path_directory_hosts_cache_entry_path() {
        let scratch = tempdir().unwrap();
        let root = scratch.path();

        // Build the column path and create its directory.
        let (file, row_group, column) = (5u64, 6u64, 7u64);
        let column_path = ColumnAccessPath::new(file, row_group, column);
        column_path.initialize_dir(root);

        // Derive an entry of that column and locate it on disk.
        let entry_path = column_path
            .entry_id(BatchID::from_raw(8))
            .on_disk_liquid_path(root);
        let parent = entry_path.parent().unwrap();

        // The parent directory exists already.
        assert!(parent.exists());

        // The parent directory is the expected file/rg/col nesting.
        let expected_dir = root
            .join(format!("file_{file}"))
            .join(format!("rg_{row_group}"))
            .join(format!("col_{column}"));
        assert_eq!(parent, &expected_dir);

        // A file can be written at the entry path.
        std::fs::write(&entry_path, b"test data").unwrap();
        assert!(entry_path.exists());
    }
}