// liquid_cache_parquet/cache/stats.rs

use super::LiquidCache;
use crate::{cache::id::ParquetArrayID, sync::Arc};
use arrow::array::{ArrayBuilder, RecordBatch, StringBuilder, UInt64Builder};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use liquid_cache_storage::cache::CachedBatch;
use parquet::{
    arrow::ArrowWriter, basic::Compression, errors::ParquetError,
    file::properties::WriterProperties,
};
use std::{fs::File, path::Path};
11
12struct StatsWriter {
13    writer: ArrowWriter<File>,
14    schema: SchemaRef,
15    file_path_builder: StringBuilder,
16    row_group_id_builder: UInt64Builder,
17    column_id_builder: UInt64Builder,
18    row_start_id_builder: UInt64Builder,
19    row_count_builder: UInt64Builder,
20    memory_size_builder: UInt64Builder,
21    cache_type_builder: StringBuilder,
22    reference_count_builder: UInt64Builder,
23}
24
25impl StatsWriter {
26    fn new(file_path: impl AsRef<Path>) -> Result<Self, ParquetError> {
27        let schema = Arc::new(Schema::new(vec![
28            Field::new("row_group_id", DataType::UInt64, false),
29            Field::new("column_id", DataType::UInt64, false),
30            Field::new("row_start_id", DataType::UInt64, false),
31            Field::new("row_count", DataType::UInt64, true),
32            Field::new("memory_size", DataType::UInt64, false),
33            Field::new("cache_type", DataType::Utf8, false),
34            Field::new("file_path", DataType::Utf8, false),
35            Field::new("reference_count", DataType::UInt64, false),
36        ]));
37
38        let file = File::create(file_path)?;
39        let write_props = WriterProperties::builder()
40            .set_compression(Compression::LZ4)
41            .set_created_by("liquid-cache-stats".to_string())
42            .build();
43        let writer = ArrowWriter::try_new(file, schema.clone(), Some(write_props))?;
44        Ok(Self {
45            writer,
46            schema,
47            file_path_builder: StringBuilder::with_capacity(8192, 8192),
48            row_group_id_builder: UInt64Builder::new(),
49            column_id_builder: UInt64Builder::new(),
50            row_start_id_builder: UInt64Builder::new(),
51            row_count_builder: UInt64Builder::new(),
52            memory_size_builder: UInt64Builder::new(),
53            cache_type_builder: StringBuilder::with_capacity(8192, 8192),
54            reference_count_builder: UInt64Builder::new(),
55        })
56    }
57
58    fn build_batch(&mut self) -> Result<RecordBatch, ParquetError> {
59        let row_group_id_array = self.row_group_id_builder.finish();
60        let column_id_array = self.column_id_builder.finish();
61        let row_start_id_array = self.row_start_id_builder.finish();
62        let row_count_array = self.row_count_builder.finish();
63        let memory_size_array = self.memory_size_builder.finish();
64        let cache_type_array = self.cache_type_builder.finish();
65        let file_path_array = self.file_path_builder.finish();
66        let reference_count_array = self.reference_count_builder.finish();
67        Ok(RecordBatch::try_new(
68            self.schema.clone(),
69            vec![
70                Arc::new(row_group_id_array),
71                Arc::new(column_id_array),
72                Arc::new(row_start_id_array),
73                Arc::new(row_count_array),
74                Arc::new(memory_size_array),
75                Arc::new(cache_type_array),
76                Arc::new(file_path_array),
77                Arc::new(reference_count_array),
78            ],
79        )?)
80    }
81
82    #[allow(clippy::too_many_arguments)]
83    fn append_entry(
84        &mut self,
85        file_path: &str,
86        row_group_id: u64,
87        column_id: u64,
88        row_start_id: u64,
89        row_count: Option<u64>,
90        memory_size: u64,
91        cache_type: &str,
92        reference_count: u64,
93    ) -> Result<(), ParquetError> {
94        self.row_group_id_builder.append_value(row_group_id);
95        self.column_id_builder.append_value(column_id);
96        self.row_start_id_builder.append_value(row_start_id);
97        self.row_count_builder.append_option(row_count);
98        self.memory_size_builder.append_value(memory_size);
99        self.cache_type_builder.append_value(cache_type);
100        self.reference_count_builder.append_value(reference_count);
101        self.file_path_builder.append_value(file_path);
102        if self.row_start_id_builder.len() >= 8192 {
103            let batch = self.build_batch()?;
104            self.writer.write(&batch)?;
105        }
106        Ok(())
107    }
108
109    fn finish(mut self) -> Result<(), ParquetError> {
110        let batch = self.build_batch()?;
111        self.writer.write(&batch)?;
112        self.writer.close()?;
113        Ok(())
114    }
115}
116
117impl LiquidCache {
118    /// Get the memory usage of the cache in bytes.
119    pub fn compute_memory_usage_bytes(&self) -> u64 {
120        self.cache_store.budget().memory_usage_bytes() as u64
121    }
122
123    /// Write the stats of the cache to a parquet file.
124    pub fn write_stats(&self, parquet_file_path: impl AsRef<Path>) -> Result<(), ParquetError> {
125        let mut writer = StatsWriter::new(parquet_file_path)?;
126        self.cache_store.for_each_entry(|entry_id, cached_batch| {
127            let memory_size = cached_batch.memory_usage_bytes();
128            let row_count = match cached_batch {
129                CachedBatch::MemoryArrow(array) => Some(array.len() as u64),
130                CachedBatch::MemoryLiquid(array) => Some(array.len() as u64),
131                CachedBatch::MemoryHybridLiquid(array) => Some(array.len() as u64),
132                CachedBatch::DiskLiquid(_) => None,
133                CachedBatch::DiskArrow(_) => None, // We'd need to read it to get the count
134            };
135            let cache_type = match cached_batch {
136                CachedBatch::MemoryArrow(_) => "InMemory",
137                CachedBatch::MemoryLiquid(_) => "LiquidMemory",
138                CachedBatch::MemoryHybridLiquid(_) => "LiquidHybrid",
139                CachedBatch::DiskLiquid(_) => "OnDiskLiquid",
140                CachedBatch::DiskArrow(_) => "OnDiskArrow",
141            };
142            let reference_count = cached_batch.reference_count();
143            let entry_id = ParquetArrayID::from(*entry_id);
144            writer
145                .append_entry(
146                    entry_id
147                        .on_disk_path(self.cache_store.config().cache_root_dir())
148                        .to_str()
149                        .unwrap(),
150                    entry_id.row_group_id_inner(),
151                    entry_id.column_id_inner(),
152                    entry_id.batch_id_inner() * self.batch_size() as u64,
153                    row_count,
154                    memory_size as u64,
155                    cache_type,
156                    reference_count as u64,
157                )
158                .unwrap();
159        });
160
161        writer.finish()?;
162        Ok(())
163    }
164}
165
166#[cfg(test)]
167mod tests {
168    use std::io::Read;
169
170    use crate::cache::{IoMode, id::BatchID};
171
172    use super::*;
173    use arrow::{
174        array::{Array, AsArray},
175        datatypes::UInt64Type,
176    };
177    use bytes::Bytes;
178    use liquid_cache_storage::{cache::squeeze_policies::Evict, cache_policies::LiquidPolicy};
179    use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
180    use tempfile::NamedTempFile;
181
182    #[tokio::test]
183    async fn test_stats_writer() -> Result<(), ParquetError> {
184        let tmp_dir = tempfile::tempdir().unwrap();
185        let cache = LiquidCache::new(
186            1024,
187            usize::MAX,
188            tmp_dir.path().to_path_buf(),
189            Box::new(LiquidPolicy::new()),
190            Box::new(Evict),
191            IoMode::Uring,
192        );
193        let array = Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3]));
194        let num_rows = 8 * 8 * 8 * 8;
195
196        let mut row_group_id_sum = 0;
197        let mut column_id_sum = 0;
198        let mut row_start_id_sum = 0;
199        let mut row_count_sum = 0;
200        let mut memory_size_sum = 0;
201        for file_no in 0..8 {
202            let file_name = format!("test_{file_no}.parquet");
203            let file = cache.register_or_get_file(file_name);
204            for rg in 0..8 {
205                let row_group = file.row_group(rg);
206                for col in 0..8 {
207                    let column = row_group.create_column(
208                        col,
209                        Arc::new(Field::new(format!("test_{col}"), DataType::Int32, false)),
210                    );
211                    for batch in 0..8 {
212                        let batch_id = BatchID::from_raw(batch);
213                        assert!(column.insert(batch_id, array.clone()).await.is_ok());
214                        row_group_id_sum += rg;
215                        column_id_sum += col;
216                        row_start_id_sum += *batch_id as u64 * cache.batch_size() as u64;
217                        row_count_sum += array.len() as u64;
218                        memory_size_sum += array.get_array_memory_size();
219
220                        if batch.is_multiple_of(2) {
221                            _ = column.get_arrow_array_test_only(batch_id).await.unwrap();
222                        }
223                    }
224                }
225            }
226        }
227
228        let mut tmp_file = NamedTempFile::new()?;
229        cache.write_stats(tmp_file.path())?;
230
231        // Read and verify stats
232        let mut bytes = Vec::new();
233        tmp_file.read_to_end(&mut bytes)?;
234        let bytes = Bytes::from(bytes);
235        let reader = ParquetRecordBatchReader::try_new(bytes, 8192)?;
236
237        let batch = reader.into_iter().next().unwrap()?;
238        assert_eq!(batch.num_rows(), num_rows);
239
240        macro_rules! uint64_col {
241            ($batch:expr, $col_idx:expr) => {
242                $batch
243                    .column_by_name($col_idx)
244                    .unwrap()
245                    .as_primitive::<UInt64Type>()
246            };
247        }
248
249        let row_group_id_array = uint64_col!(batch, "row_group_id");
250        let column_id_array = uint64_col!(batch, "column_id");
251        let row_start_id_array = uint64_col!(batch, "row_start_id");
252        let row_count_array = uint64_col!(batch, "row_count");
253        let memory_size_array = uint64_col!(batch, "memory_size");
254
255        assert_eq!(
256            row_group_id_array.iter().map(|v| v.unwrap()).sum::<u64>(),
257            row_group_id_sum
258        );
259        assert_eq!(
260            column_id_array.iter().map(|v| v.unwrap()).sum::<u64>(),
261            column_id_sum
262        );
263        assert_eq!(
264            row_start_id_array.iter().map(|v| v.unwrap()).sum::<u64>(),
265            row_start_id_sum
266        );
267        assert_eq!(
268            row_count_array.iter().map(|v| v.unwrap()).sum::<u64>(),
269            row_count_sum
270        );
271        assert_eq!(
272            memory_size_array.iter().map(|v| v.unwrap()).sum::<u64>(),
273            memory_size_sum as u64
274        );
275
276        Ok(())
277    }
278}