// liquid_cache_parquet/cache/stats.rs

use super::LiquidCache;
use crate::{cache::id::ParquetArrayID, sync::Arc};
use arrow::array::{ArrayBuilder, RecordBatch, StringBuilder, UInt64Builder};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use liquid_cache_storage::cache::CachedBatch;
use parquet::{
    arrow::ArrowWriter, basic::Compression, errors::ParquetError,
    file::properties::WriterProperties,
};
use std::{fs::File, path::Path};

12struct StatsWriter {
13 writer: ArrowWriter<File>,
14 schema: SchemaRef,
15 file_path_builder: StringBuilder,
16 row_group_id_builder: UInt64Builder,
17 column_id_builder: UInt64Builder,
18 row_start_id_builder: UInt64Builder,
19 row_count_builder: UInt64Builder,
20 memory_size_builder: UInt64Builder,
21 cache_type_builder: StringBuilder,
22 reference_count_builder: UInt64Builder,
23}
24
25impl StatsWriter {
26 fn new(file_path: impl AsRef<Path>) -> Result<Self, ParquetError> {
27 let schema = Arc::new(Schema::new(vec![
28 Field::new("row_group_id", DataType::UInt64, false),
29 Field::new("column_id", DataType::UInt64, false),
30 Field::new("row_start_id", DataType::UInt64, false),
31 Field::new("row_count", DataType::UInt64, true),
32 Field::new("memory_size", DataType::UInt64, false),
33 Field::new("cache_type", DataType::Utf8, false),
34 Field::new("file_path", DataType::Utf8, false),
35 Field::new("reference_count", DataType::UInt64, false),
36 ]));
37
38 let file = File::create(file_path)?;
39 let write_props = WriterProperties::builder()
40 .set_compression(Compression::LZ4)
41 .set_created_by("liquid-cache-stats".to_string())
42 .build();
43 let writer = ArrowWriter::try_new(file, schema.clone(), Some(write_props))?;
44 Ok(Self {
45 writer,
46 schema,
47 file_path_builder: StringBuilder::with_capacity(8192, 8192),
48 row_group_id_builder: UInt64Builder::new(),
49 column_id_builder: UInt64Builder::new(),
50 row_start_id_builder: UInt64Builder::new(),
51 row_count_builder: UInt64Builder::new(),
52 memory_size_builder: UInt64Builder::new(),
53 cache_type_builder: StringBuilder::with_capacity(8192, 8192),
54 reference_count_builder: UInt64Builder::new(),
55 })
56 }
57
58 fn build_batch(&mut self) -> Result<RecordBatch, ParquetError> {
59 let row_group_id_array = self.row_group_id_builder.finish();
60 let column_id_array = self.column_id_builder.finish();
61 let row_start_id_array = self.row_start_id_builder.finish();
62 let row_count_array = self.row_count_builder.finish();
63 let memory_size_array = self.memory_size_builder.finish();
64 let cache_type_array = self.cache_type_builder.finish();
65 let file_path_array = self.file_path_builder.finish();
66 let reference_count_array = self.reference_count_builder.finish();
67 Ok(RecordBatch::try_new(
68 self.schema.clone(),
69 vec![
70 Arc::new(row_group_id_array),
71 Arc::new(column_id_array),
72 Arc::new(row_start_id_array),
73 Arc::new(row_count_array),
74 Arc::new(memory_size_array),
75 Arc::new(cache_type_array),
76 Arc::new(file_path_array),
77 Arc::new(reference_count_array),
78 ],
79 )?)
80 }
81
82 #[allow(clippy::too_many_arguments)]
83 fn append_entry(
84 &mut self,
85 file_path: &str,
86 row_group_id: u64,
87 column_id: u64,
88 row_start_id: u64,
89 row_count: Option<u64>,
90 memory_size: u64,
91 cache_type: &str,
92 reference_count: u64,
93 ) -> Result<(), ParquetError> {
94 self.row_group_id_builder.append_value(row_group_id);
95 self.column_id_builder.append_value(column_id);
96 self.row_start_id_builder.append_value(row_start_id);
97 self.row_count_builder.append_option(row_count);
98 self.memory_size_builder.append_value(memory_size);
99 self.cache_type_builder.append_value(cache_type);
100 self.reference_count_builder.append_value(reference_count);
101 self.file_path_builder.append_value(file_path);
102 if self.row_start_id_builder.len() >= 8192 {
103 let batch = self.build_batch()?;
104 self.writer.write(&batch)?;
105 }
106 Ok(())
107 }
108
109 fn finish(mut self) -> Result<(), ParquetError> {
110 let batch = self.build_batch()?;
111 self.writer.write(&batch)?;
112 self.writer.close()?;
113 Ok(())
114 }
115}
116
117impl LiquidCache {
118 pub fn compute_memory_usage_bytes(&self) -> u64 {
120 self.cache_store.budget().memory_usage_bytes() as u64
121 }
122
123 pub fn write_stats(&self, parquet_file_path: impl AsRef<Path>) -> Result<(), ParquetError> {
125 let mut writer = StatsWriter::new(parquet_file_path)?;
126 self.cache_store.for_each_entry(|entry_id, cached_batch| {
127 let memory_size = cached_batch.memory_usage_bytes();
128 let row_count = match cached_batch {
129 CachedBatch::MemoryArrow(array) => Some(array.len() as u64),
130 CachedBatch::MemoryLiquid(array) => Some(array.len() as u64),
131 CachedBatch::MemoryHybridLiquid(array) => Some(array.len() as u64),
132 CachedBatch::DiskLiquid(_) => None,
133 CachedBatch::DiskArrow(_) => None, };
135 let cache_type = match cached_batch {
136 CachedBatch::MemoryArrow(_) => "InMemory",
137 CachedBatch::MemoryLiquid(_) => "LiquidMemory",
138 CachedBatch::MemoryHybridLiquid(_) => "LiquidHybrid",
139 CachedBatch::DiskLiquid(_) => "OnDiskLiquid",
140 CachedBatch::DiskArrow(_) => "OnDiskArrow",
141 };
142 let reference_count = cached_batch.reference_count();
143 let entry_id = ParquetArrayID::from(*entry_id);
144 writer
145 .append_entry(
146 entry_id
147 .on_disk_path(self.cache_store.config().cache_root_dir())
148 .to_str()
149 .unwrap(),
150 entry_id.row_group_id_inner(),
151 entry_id.column_id_inner(),
152 entry_id.batch_id_inner() * self.batch_size() as u64,
153 row_count,
154 memory_size as u64,
155 cache_type,
156 reference_count as u64,
157 )
158 .unwrap();
159 });
160
161 writer.finish()?;
162 Ok(())
163 }
164}
165
166#[cfg(test)]
167mod tests {
168 use std::io::Read;
169
170 use crate::cache::{IoMode, id::BatchID};
171
172 use super::*;
173 use arrow::{
174 array::{Array, AsArray},
175 datatypes::UInt64Type,
176 };
177 use bytes::Bytes;
178 use liquid_cache_storage::{cache::squeeze_policies::Evict, cache_policies::LiquidPolicy};
179 use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
180 use tempfile::NamedTempFile;
181
182 #[tokio::test]
183 async fn test_stats_writer() -> Result<(), ParquetError> {
184 let tmp_dir = tempfile::tempdir().unwrap();
185 let cache = LiquidCache::new(
186 1024,
187 usize::MAX,
188 tmp_dir.path().to_path_buf(),
189 Box::new(LiquidPolicy::new()),
190 Box::new(Evict),
191 IoMode::Uring,
192 );
193 let array = Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3]));
194 let num_rows = 8 * 8 * 8 * 8;
195
196 let mut row_group_id_sum = 0;
197 let mut column_id_sum = 0;
198 let mut row_start_id_sum = 0;
199 let mut row_count_sum = 0;
200 let mut memory_size_sum = 0;
201 for file_no in 0..8 {
202 let file_name = format!("test_{file_no}.parquet");
203 let file = cache.register_or_get_file(file_name);
204 for rg in 0..8 {
205 let row_group = file.row_group(rg);
206 for col in 0..8 {
207 let column = row_group.create_column(
208 col,
209 Arc::new(Field::new(format!("test_{col}"), DataType::Int32, false)),
210 );
211 for batch in 0..8 {
212 let batch_id = BatchID::from_raw(batch);
213 assert!(column.insert(batch_id, array.clone()).await.is_ok());
214 row_group_id_sum += rg;
215 column_id_sum += col;
216 row_start_id_sum += *batch_id as u64 * cache.batch_size() as u64;
217 row_count_sum += array.len() as u64;
218 memory_size_sum += array.get_array_memory_size();
219
220 if batch.is_multiple_of(2) {
221 _ = column.get_arrow_array_test_only(batch_id).await.unwrap();
222 }
223 }
224 }
225 }
226 }
227
228 let mut tmp_file = NamedTempFile::new()?;
229 cache.write_stats(tmp_file.path())?;
230
231 let mut bytes = Vec::new();
233 tmp_file.read_to_end(&mut bytes)?;
234 let bytes = Bytes::from(bytes);
235 let reader = ParquetRecordBatchReader::try_new(bytes, 8192)?;
236
237 let batch = reader.into_iter().next().unwrap()?;
238 assert_eq!(batch.num_rows(), num_rows);
239
240 macro_rules! uint64_col {
241 ($batch:expr, $col_idx:expr) => {
242 $batch
243 .column_by_name($col_idx)
244 .unwrap()
245 .as_primitive::<UInt64Type>()
246 };
247 }
248
249 let row_group_id_array = uint64_col!(batch, "row_group_id");
250 let column_id_array = uint64_col!(batch, "column_id");
251 let row_start_id_array = uint64_col!(batch, "row_start_id");
252 let row_count_array = uint64_col!(batch, "row_count");
253 let memory_size_array = uint64_col!(batch, "memory_size");
254
255 assert_eq!(
256 row_group_id_array.iter().map(|v| v.unwrap()).sum::<u64>(),
257 row_group_id_sum
258 );
259 assert_eq!(
260 column_id_array.iter().map(|v| v.unwrap()).sum::<u64>(),
261 column_id_sum
262 );
263 assert_eq!(
264 row_start_id_array.iter().map(|v| v.unwrap()).sum::<u64>(),
265 row_start_id_sum
266 );
267 assert_eq!(
268 row_count_array.iter().map(|v| v.unwrap()).sum::<u64>(),
269 row_count_sum
270 );
271 assert_eq!(
272 memory_size_array.iter().map(|v| v.unwrap()).sum::<u64>(),
273 memory_size_sum as u64
274 );
275
276 Ok(())
277 }
278}