use crate::{
cache::utils::EntryID,
sync::{Arc, Mutex, atomic::AtomicBool},
};
use std::{
fs::File,
path::Path,
time::{SystemTime, UNIX_EPOCH},
};
use arrow::{
array::{ArrayRef, RecordBatch, UInt64Array},
datatypes::{DataType, Field, Schema},
};
use parquet::{
arrow::arrow_writer::ArrowWriter, basic::Compression, file::properties::WriterProperties,
};
/// One recorded cache access, captured by `CacheTracer::trace_get`
/// and serialized to a parquet row by `CacheTracer::flush`.
struct TraceEvent {
    /// Identifier of the accessed cache entry.
    entry_id: EntryID,
    /// Snapshot of cache memory usage in bytes at access time
    /// (presumably the cache's total footprint — confirm with callers).
    cache_memory_bytes: usize,
    /// Size in bytes of the accessed entry.
    entry_size: usize,
    /// Wall-clock time of the access, nanoseconds since the UNIX epoch.
    time_stamp_nanos: u128,
}
/// Buffers cache access events in memory while enabled and writes them
/// out as a parquet file on demand (see `trace_get` / `flush`).
/// Starts disabled; recording is a no-op until `enable` is called.
pub struct CacheTracer {
    /// Tracing on/off switch, toggled via `enable`/`disable`.
    enabled: AtomicBool,
    /// Buffered events; cleared when `flush` writes them to disk.
    entries: Mutex<Vec<TraceEvent>>,
}
impl std::fmt::Debug for CacheTracer {
    // Opaque representation on purpose: the event buffer may be large and
    // `TraceEvent` does not implement `Debug`, so only the type name is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("CacheTracer")
    }
}
impl CacheTracer {
pub(super) fn new() -> Self {
Self {
enabled: AtomicBool::new(false),
entries: Mutex::new(Vec::new()),
}
}
pub fn enable(&self) {
self.enabled
.store(true, std::sync::atomic::Ordering::Relaxed);
}
pub fn disable(&self) {
self.enabled
.store(false, std::sync::atomic::Ordering::Relaxed);
}
fn enabled(&self) -> bool {
self.enabled.load(std::sync::atomic::Ordering::Relaxed)
}
#[allow(unused)]
pub(super) fn trace_get(
&self,
entry_id: EntryID,
cache_memory_bytes: usize,
entry_size: usize,
) {
if !self.enabled() {
return;
}
let mut entries = self.entries.lock().unwrap();
let time_stamp_nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();
entries.push(TraceEvent {
entry_id,
cache_memory_bytes,
entry_size,
time_stamp_nanos,
});
}
pub fn flush(&self, to_file: impl AsRef<Path>) {
let mut entries = self.entries.lock().unwrap();
if entries.is_empty() {
return; }
let schema = Arc::new(Schema::new(vec![
Field::new("entry_id", DataType::UInt64, false),
Field::new("entry_size", DataType::UInt64, false),
Field::new("cache_memory_bytes", DataType::UInt64, false),
Field::new("time_stamp_nanos", DataType::UInt64, false),
]));
let num_rows = entries.len();
let mut entry_ids = Vec::with_capacity(num_rows);
let mut entry_sizes = Vec::with_capacity(num_rows);
let mut cache_memory_bytes_vec = Vec::with_capacity(num_rows);
let mut time_stamp_nanos_vec = Vec::with_capacity(num_rows);
for event in entries.iter() {
let entry_id = event.entry_id;
entry_ids.push(usize::from(entry_id) as u64);
entry_sizes.push(event.entry_size as u64);
cache_memory_bytes_vec.push(event.cache_memory_bytes as u64);
time_stamp_nanos_vec.push(event.time_stamp_nanos as u64);
}
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(UInt64Array::from(entry_ids)) as ArrayRef,
Arc::new(UInt64Array::from(entry_sizes)) as ArrayRef,
Arc::new(UInt64Array::from(cache_memory_bytes_vec)) as ArrayRef,
Arc::new(UInt64Array::from(time_stamp_nanos_vec)) as ArrayRef,
],
)
.expect("Failed to create record batch");
let file = File::create(to_file).expect("Failed to create trace file");
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let mut writer = ArrowWriter::try_new(file, schema, Some(props))
.expect("Failed to create parquet writer");
writer
.write(&batch)
.expect("Failed to write batch to parquet file");
writer.close().expect("Failed to close parquet writer");
entries.clear(); }
}
#[cfg(test)]
mod tests {
    use super::*;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use std::fs::File;
    use tempfile::tempdir;

    /// Reads the first record batch from the parquet file at `path`.
    fn first_batch(path: &std::path::Path) -> RecordBatch {
        let file = File::open(path).unwrap();
        ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .with_batch_size(1024)
            .build()
            .unwrap()
            .into_iter()
            .next()
            .unwrap()
            .unwrap()
    }

    /// Downcasts column `idx` of `batch` to a `UInt64Array`.
    fn u64_column(batch: &RecordBatch, idx: usize) -> &UInt64Array {
        batch
            .column(idx)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap()
    }

    #[test]
    fn test_cache_tracer_enable_disable() {
        let tracer = CacheTracer::new();
        // Starts disabled; enable/disable round-trips the flag.
        assert!(!tracer.enabled());
        tracer.enable();
        assert!(tracer.enabled());
        tracer.disable();
        assert!(!tracer.enabled());
    }

    #[test]
    fn test_cache_tracer_event_recording() {
        let tracer = CacheTracer::new();
        let id = EntryID::from(1);
        // A disabled tracer must drop events on the floor.
        tracer.trace_get(id, 1000, 100);
        assert!(tracer.entries.lock().unwrap().is_empty());
        // Once enabled, each call appends exactly one event.
        tracer.enable();
        tracer.trace_get(id, 1000, 100);
        assert_eq!(tracer.entries.lock().unwrap().len(), 1);
        tracer.trace_get(id, 2000, 100);
        assert_eq!(tracer.entries.lock().unwrap().len(), 2);
        // Recorded fields match what was passed in, in order.
        let recorded = tracer.entries.lock().unwrap();
        assert_eq!(recorded[0].entry_id, id);
        assert_eq!(recorded[0].cache_memory_bytes, 1000);
        assert_eq!(recorded[0].entry_size, 100);
        assert_eq!(recorded[1].entry_id, id);
        assert_eq!(recorded[1].cache_memory_bytes, 2000);
        assert_eq!(recorded[1].entry_size, 100);
    }

    #[test]
    fn test_cache_tracer_flush_empty() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("empty_trace.parquet");
        // Flushing with no buffered events must not create a file.
        let tracer = CacheTracer::new();
        tracer.flush(&path);
        assert!(!path.exists());
    }

    #[test]
    fn test_cache_tracer_flush() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("trace.parquet");
        let tracer = CacheTracer::new();
        tracer.enable();
        tracer.trace_get(EntryID::from(1), 1000, 100);
        tracer.trace_get(EntryID::from(2), 2000, 100);
        tracer.flush(&path);
        // Flush drains the buffer and produces a file on disk.
        assert!(tracer.entries.lock().unwrap().is_empty());
        assert!(path.exists());
        // Round-trip: the parquet file holds both events with all columns.
        let batch = first_batch(&path);
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 4);
        let ids = u64_column(&batch, 0);
        assert_eq!(ids.value(0), 1);
        assert_eq!(ids.value(1), 2);
        let sizes = u64_column(&batch, 1);
        assert_eq!(sizes.value(0), 100);
        assert_eq!(sizes.value(1), 100);
        let mem = u64_column(&batch, 2);
        assert_eq!(mem.value(0), 1000);
        assert_eq!(mem.value(1), 2000);
    }

    #[test]
    fn test_cache_tracer_multiple_flush() {
        let dir = tempdir().unwrap();
        let path1 = dir.path().join("trace1.parquet");
        let path2 = dir.path().join("trace2.parquet");
        let tracer = CacheTracer::new();
        tracer.enable();
        // Each flush must contain only the events recorded since the last one.
        tracer.trace_get(EntryID::from(1), 1000, 100);
        tracer.flush(&path1);
        tracer.trace_get(EntryID::from(2), 2000, 100);
        tracer.flush(&path2);
        assert!(path1.exists());
        assert!(path2.exists());
        assert_eq!(first_batch(&path1).num_rows(), 1);
        assert_eq!(first_batch(&path2).num_rows(), 1);
    }
}