liquid_cache_datafusion/cache/
stats.rs1use super::LiquidCacheParquet;
2use crate::{cache::id::ParquetArrayID, sync::Arc};
3use arrow::array::{ArrayBuilder, RecordBatch, StringBuilder, UInt64Builder};
4use arrow_schema::{DataType, Field, Schema, SchemaRef};
5use liquid_cache::cache::CacheEntry;
6use parquet::{
7 arrow::ArrowWriter, basic::Compression, errors::ParquetError,
8 file::properties::WriterProperties,
9};
10use std::{fs::File, path::Path};
11
12struct StatsWriter {
13 writer: ArrowWriter<File>,
14 schema: SchemaRef,
15 file_path_builder: StringBuilder,
16 row_group_id_builder: UInt64Builder,
17 column_id_builder: UInt64Builder,
18 row_start_id_builder: UInt64Builder,
19 row_count_builder: UInt64Builder,
20 memory_size_builder: UInt64Builder,
21 cache_type_builder: StringBuilder,
22 reference_count_builder: UInt64Builder,
23}
24
25impl StatsWriter {
26 fn new(file_path: impl AsRef<Path>) -> Result<Self, ParquetError> {
27 let schema = Arc::new(Schema::new(vec![
28 Field::new("row_group_id", DataType::UInt64, false),
29 Field::new("column_id", DataType::UInt64, false),
30 Field::new("row_start_id", DataType::UInt64, false),
31 Field::new("row_count", DataType::UInt64, true),
32 Field::new("memory_size", DataType::UInt64, false),
33 Field::new("cache_type", DataType::Utf8, false),
34 Field::new("file_path", DataType::Utf8, false),
35 Field::new("reference_count", DataType::UInt64, false),
36 ]));
37
38 let file = File::create(file_path)?;
39 let write_props = WriterProperties::builder()
40 .set_compression(Compression::LZ4)
41 .set_created_by("liquid-cache-stats".to_string())
42 .build();
43 let writer = ArrowWriter::try_new(file, schema.clone(), Some(write_props))?;
44 Ok(Self {
45 writer,
46 schema,
47 file_path_builder: StringBuilder::with_capacity(8192, 8192),
48 row_group_id_builder: UInt64Builder::new(),
49 column_id_builder: UInt64Builder::new(),
50 row_start_id_builder: UInt64Builder::new(),
51 row_count_builder: UInt64Builder::new(),
52 memory_size_builder: UInt64Builder::new(),
53 cache_type_builder: StringBuilder::with_capacity(8192, 8192),
54 reference_count_builder: UInt64Builder::new(),
55 })
56 }
57
58 fn build_batch(&mut self) -> Result<RecordBatch, ParquetError> {
59 let row_group_id_array = self.row_group_id_builder.finish();
60 let column_id_array = self.column_id_builder.finish();
61 let row_start_id_array = self.row_start_id_builder.finish();
62 let row_count_array = self.row_count_builder.finish();
63 let memory_size_array = self.memory_size_builder.finish();
64 let cache_type_array = self.cache_type_builder.finish();
65 let file_path_array = self.file_path_builder.finish();
66 let reference_count_array = self.reference_count_builder.finish();
67 Ok(RecordBatch::try_new(
68 self.schema.clone(),
69 vec![
70 Arc::new(row_group_id_array),
71 Arc::new(column_id_array),
72 Arc::new(row_start_id_array),
73 Arc::new(row_count_array),
74 Arc::new(memory_size_array),
75 Arc::new(cache_type_array),
76 Arc::new(file_path_array),
77 Arc::new(reference_count_array),
78 ],
79 )?)
80 }
81
82 #[allow(clippy::too_many_arguments)]
83 fn append_entry(
84 &mut self,
85 file_path: &str,
86 row_group_id: u64,
87 column_id: u64,
88 row_start_id: u64,
89 row_count: Option<u64>,
90 memory_size: u64,
91 cache_type: &str,
92 reference_count: u64,
93 ) -> Result<(), ParquetError> {
94 self.row_group_id_builder.append_value(row_group_id);
95 self.column_id_builder.append_value(column_id);
96 self.row_start_id_builder.append_value(row_start_id);
97 self.row_count_builder.append_option(row_count);
98 self.memory_size_builder.append_value(memory_size);
99 self.cache_type_builder.append_value(cache_type);
100 self.reference_count_builder.append_value(reference_count);
101 self.file_path_builder.append_value(file_path);
102 if self.row_start_id_builder.len() >= 8192 {
103 let batch = self.build_batch()?;
104 self.writer.write(&batch)?;
105 }
106 Ok(())
107 }
108
109 fn finish(mut self) -> Result<(), ParquetError> {
110 let batch = self.build_batch()?;
111 self.writer.write(&batch)?;
112 self.writer.close()?;
113 Ok(())
114 }
115}
116
117impl LiquidCacheParquet {
118 pub fn compute_memory_usage_bytes(&self) -> u64 {
120 self.cache_store.budget().memory_usage_bytes() as u64
121 }
122
123 pub fn write_stats(&self, parquet_file_path: impl AsRef<Path>) -> Result<(), ParquetError> {
125 let mut writer = StatsWriter::new(parquet_file_path)?;
126 self.cache_store.for_each_entry(|entry_id, cached_batch| {
127 let memory_size = cached_batch.memory_usage_bytes();
128 let row_count = match cached_batch {
129 CacheEntry::MemoryArrow(array) => Some(array.len() as u64),
130 CacheEntry::MemoryLiquid(array) => Some(array.len() as u64),
131 CacheEntry::MemorySqueezedLiquid(array) => Some(array.len() as u64),
132 CacheEntry::DiskLiquid(_) => None,
133 CacheEntry::DiskArrow(_) => None, };
135 let cache_type = match cached_batch {
136 CacheEntry::MemoryArrow(_) => "InMemory",
137 CacheEntry::MemoryLiquid(_) => "LiquidMemory",
138 CacheEntry::MemorySqueezedLiquid(_) => "LiquidSqueezed",
139 CacheEntry::DiskLiquid(_) => "OnDiskLiquid",
140 CacheEntry::DiskArrow(_) => "OnDiskArrow",
141 };
142 let reference_count = cached_batch.reference_count();
143 let entry_id = ParquetArrayID::from(*entry_id);
144 writer
145 .append_entry(
146 entry_id
147 .on_disk_liquid_path(self.cache_store.config().cache_root_dir())
148 .to_str()
149 .unwrap(),
150 entry_id.row_group_id_inner(),
151 entry_id.column_id_inner(),
152 entry_id.batch_id_inner() * self.batch_size() as u64,
153 row_count,
154 memory_size as u64,
155 cache_type,
156 reference_count as u64,
157 )
158 .unwrap();
159 });
160
161 writer.finish()?;
162 Ok(())
163 }
164}
165
#[cfg(test)]
mod tests {
    use std::io::Read;

    use crate::cache::{IoMode, id::BatchID};

    use super::*;
    use arrow::{
        array::{Array, AsArray},
        datatypes::UInt64Type,
    };
    use bytes::Bytes;
    use liquid_cache::{
        cache::{AlwaysHydrate, squeeze_policies::Evict},
        cache_policies::LiquidPolicy,
    };
    use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
    use tempfile::NamedTempFile;

    /// Fetches a named UInt64 column from `batch`, panicking if it is absent.
    fn uint64_col<'a>(
        batch: &'a RecordBatch,
        name: &str,
    ) -> &'a arrow::array::PrimitiveArray<UInt64Type> {
        batch.column_by_name(name).unwrap().as_primitive::<UInt64Type>()
    }

    #[tokio::test]
    async fn test_stats_writer() -> Result<(), ParquetError> {
        let tmp_dir = tempfile::tempdir().unwrap();
        let cache = LiquidCacheParquet::new(
            1024,
            usize::MAX,
            tmp_dir.path().to_path_buf(),
            Box::new(LiquidPolicy::new()),
            Box::new(Evict),
            Box::new(AlwaysHydrate::new()),
            IoMode::Uring,
        );
        let fields: Vec<Field> = (0..8)
            .map(|i| Field::new(format!("test_{i}"), DataType::Int32, false))
            .collect();
        let schema = Arc::new(Schema::new(fields));
        let array = Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3]));
        // 8 files x 8 row groups x 8 columns x 8 batches.
        let num_rows = 8 * 8 * 8 * 8;

        // Expected per-column sums, accumulated while populating the cache.
        let mut row_group_id_sum = 0;
        let mut column_id_sum = 0;
        let mut row_start_id_sum = 0;
        let mut row_count_sum = 0;
        let mut memory_size_sum = 0;
        for file_no in 0..8 {
            let file_name = format!("test_{file_no}.parquet");
            let file = cache.register_or_get_file(file_name, schema.clone());
            for rg in 0..8 {
                let row_group = file.create_row_group(rg, vec![]);
                for col in 0..8 {
                    let column = row_group.get_column(col).unwrap();
                    for batch in 0..8 {
                        let batch_id = BatchID::from_raw(batch);
                        assert!(column.insert(batch_id, array.clone()).await.is_ok());
                        row_group_id_sum += rg;
                        column_id_sum += col;
                        row_start_id_sum += *batch_id as u64 * cache.batch_size() as u64;
                        row_count_sum += array.len() as u64;
                        memory_size_sum += array.get_array_memory_size();

                        // Read back every other batch so reference counts vary.
                        if batch % 2 == 0 {
                            _ = column.get_arrow_array_test_only(batch_id).await.unwrap();
                        }
                    }
                }
            }
        }

        let mut stats_file = NamedTempFile::new()?;
        cache.write_stats(stats_file.path())?;

        // Read the stats file back in and decode it as a single batch.
        let mut contents = Vec::new();
        stats_file.read_to_end(&mut contents)?;
        let reader = ParquetRecordBatchReader::try_new(Bytes::from(contents), 8192)?;

        let batch = reader.into_iter().next().unwrap()?;
        assert_eq!(batch.num_rows(), num_rows);

        assert_eq!(
            uint64_col(&batch, "row_group_id")
                .iter()
                .map(Option::unwrap)
                .sum::<u64>(),
            row_group_id_sum
        );
        assert_eq!(
            uint64_col(&batch, "column_id")
                .iter()
                .map(Option::unwrap)
                .sum::<u64>(),
            column_id_sum
        );
        assert_eq!(
            uint64_col(&batch, "row_start_id")
                .iter()
                .map(Option::unwrap)
                .sum::<u64>(),
            row_start_id_sum
        );
        assert_eq!(
            uint64_col(&batch, "row_count")
                .iter()
                .map(Option::unwrap)
                .sum::<u64>(),
            row_count_sum
        );
        assert_eq!(
            uint64_col(&batch, "memory_size")
                .iter()
                .map(Option::unwrap)
                .sum::<u64>(),
            memory_size_sum as u64
        );

        Ok(())
    }
}