use crate::event::Event;
use crate::schema::create_intermediate_schema;
use anyhow::{Context as _, Result};
use arrow::{
array::{
ArrayRef, Int32Builder, Int64Builder, StringBuilder, UInt8Builder, UInt32Builder,
UInt64Builder,
},
datatypes::Schema,
record_batch::RecordBatch,
};
use log::debug;
use parquet::{
arrow::ArrowWriter,
basic::Compression,
file::{metadata::KeyValue, properties::WriterProperties},
};
use std::fs::File;
use std::sync::Arc;
/// Number of events buffered in memory before a record batch is flushed.
pub const BATCH_SIZE: usize = 10_000;
/// Parquet footer key-value metadata key storing the sampling frequency in Hz.
pub const PARQUET_METADATA_SAMPLE_FREQ_HZ_KEY: &str = "probex.sample_freq_hz";
/// Parquet footer key-value metadata key naming the stack-trace encoding.
/// NOTE(review): not written by `ParquetBatchWriter::new` in this file —
/// presumably attached or consumed elsewhere; confirm against the reader side.
pub const PARQUET_METADATA_STACK_TRACE_FORMAT_KEY: &str = "probex.stack_trace_format";
/// Value for the stack-trace-format key: symbolized frames, version 1.
pub const STACK_TRACE_FORMAT_SYMBOLIZED_V1: &str = "symbolized_v1";
/// Buffers [`Event`]s in memory and writes them to a Parquet file in
/// batches of [`BATCH_SIZE`] rows.
pub struct ParquetBatchWriter {
    // Underlying Arrow -> Parquet writer over the output file.
    writer: ArrowWriter<File>,
    // Intermediate schema shared by every flushed RecordBatch.
    schema: Arc<Schema>,
    // Events accumulated since the last flush.
    batch: Vec<Event>,
    // Running count of events flushed to the file so far.
    total_written: usize,
}
impl ParquetBatchWriter {
pub fn new(path: &str, sample_freq_hz: u64) -> Result<Self> {
let schema = Arc::new(create_intermediate_schema());
let file =
File::create(path).with_context(|| format!("failed to create output file {}", path))?;
let key_value_metadata = vec![KeyValue::new(
PARQUET_METADATA_SAMPLE_FREQ_HZ_KEY.to_string(),
sample_freq_hz.to_string(),
)];
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_key_value_metadata(Some(key_value_metadata))
.build();
let writer = ArrowWriter::try_new(file, schema.clone(), Some(props))
.with_context(|| "failed to create Parquet writer")?;
Ok(Self {
writer,
schema,
batch: Vec::with_capacity(BATCH_SIZE),
total_written: 0,
})
}
pub fn push(&mut self, event: Event) -> Result<()> {
self.batch.push(event);
if self.batch.len() >= BATCH_SIZE {
self.flush_batch()?;
}
Ok(())
}
pub fn flush_batch(&mut self) -> Result<()> {
if self.batch.is_empty() {
return Ok(());
}
let batch_len = self.batch.len();
let mut event_type_builder = StringBuilder::with_capacity(batch_len, batch_len * 20);
let mut ts_ns_builder = UInt64Builder::with_capacity(batch_len);
let mut pid_builder = UInt32Builder::with_capacity(batch_len);
let mut tgid_builder = UInt32Builder::with_capacity(batch_len);
let mut process_name_builder = StringBuilder::with_capacity(batch_len, batch_len * 24);
let mut stack_id_builder = Int32Builder::with_capacity(batch_len);
let mut kernel_stack_id_builder = Int32Builder::with_capacity(batch_len);
let mut stack_kind_builder = StringBuilder::with_capacity(batch_len, batch_len * 8);
let mut stack_frames_builder = StringBuilder::with_capacity(batch_len, batch_len * 64);
let mut stack_trace_builder = StringBuilder::with_capacity(batch_len, batch_len * 48);
let mut cpu_builder = UInt8Builder::with_capacity(batch_len);
let mut prev_pid_builder = UInt32Builder::with_capacity(batch_len);
let mut next_pid_builder = UInt32Builder::with_capacity(batch_len);
let mut prev_state_builder = Int64Builder::with_capacity(batch_len);
let mut parent_pid_builder = UInt32Builder::with_capacity(batch_len);
let mut child_pid_builder = UInt32Builder::with_capacity(batch_len);
let mut exit_code_builder = Int32Builder::with_capacity(batch_len);
let mut address_builder = UInt64Builder::with_capacity(batch_len);
let mut error_code_builder = UInt64Builder::with_capacity(batch_len);
let mut fd_builder = Int64Builder::with_capacity(batch_len);
let mut count_builder = UInt64Builder::with_capacity(batch_len);
let mut ret_builder = Int64Builder::with_capacity(batch_len);
let mut submit_ts_ns_builder = UInt64Builder::with_capacity(batch_len);
let mut io_uring_opcode_builder = UInt8Builder::with_capacity(batch_len);
let mut io_uring_res_builder = Int32Builder::with_capacity(batch_len);
for event in self.batch.drain(..) {
event_type_builder.append_value(event.event_type);
ts_ns_builder.append_value(event.ts_ns);
pid_builder.append_value(event.pid);
tgid_builder.append_value(event.tgid);
process_name_builder.append_option(event.process_name.as_deref());
stack_id_builder.append_option(event.stack_id);
kernel_stack_id_builder.append_option(event.kernel_stack_id);
stack_kind_builder.append_option(event.stack_kind);
stack_frames_builder.append_option(event.stack_frames.as_deref());
stack_trace_builder.append_option(event.stack_trace.as_deref());
cpu_builder.append_value(event.cpu);
prev_pid_builder.append_option(event.prev_pid);
next_pid_builder.append_option(event.next_pid);
prev_state_builder.append_option(event.prev_state);
parent_pid_builder.append_option(event.parent_pid);
child_pid_builder.append_option(event.child_pid);
exit_code_builder.append_option(event.exit_code);
address_builder.append_option(event.address);
error_code_builder.append_option(event.error_code);
fd_builder.append_option(event.fd);
count_builder.append_option(event.count);
ret_builder.append_option(event.ret);
submit_ts_ns_builder.append_option(event.submit_ts_ns);
io_uring_opcode_builder.append_option(event.io_uring_opcode);
io_uring_res_builder.append_option(event.io_uring_res);
}
let columns: Vec<ArrayRef> = vec![
Arc::new(event_type_builder.finish()),
Arc::new(ts_ns_builder.finish()),
Arc::new(pid_builder.finish()),
Arc::new(tgid_builder.finish()),
Arc::new(process_name_builder.finish()),
Arc::new(stack_id_builder.finish()),
Arc::new(kernel_stack_id_builder.finish()),
Arc::new(stack_kind_builder.finish()),
Arc::new(stack_frames_builder.finish()),
Arc::new(stack_trace_builder.finish()),
Arc::new(cpu_builder.finish()),
Arc::new(prev_pid_builder.finish()),
Arc::new(next_pid_builder.finish()),
Arc::new(prev_state_builder.finish()),
Arc::new(parent_pid_builder.finish()),
Arc::new(child_pid_builder.finish()),
Arc::new(exit_code_builder.finish()),
Arc::new(address_builder.finish()),
Arc::new(error_code_builder.finish()),
Arc::new(fd_builder.finish()),
Arc::new(count_builder.finish()),
Arc::new(ret_builder.finish()),
Arc::new(submit_ts_ns_builder.finish()),
Arc::new(io_uring_opcode_builder.finish()),
Arc::new(io_uring_res_builder.finish()),
];
let record_batch = RecordBatch::try_new(self.schema.clone(), columns)
.with_context(|| "failed to create record batch")?;
self.writer
.write(&record_batch)
.with_context(|| "failed to write record batch")?;
self.total_written += batch_len;
debug!(
"Flushed {} events to Parquet (total: {})",
batch_len, self.total_written
);
Ok(())
}
pub fn finish(mut self) -> Result<usize> {
self.flush_batch()?;
self.writer
.close()
.with_context(|| "failed to close Parquet writer")?;
Ok(self.total_written)
}
}