use std::collections::BTreeMap;
use std::io::Write;
use std::sync::Arc;
use arrow::array::*;
use arrow::datatypes::*;
use arrow::error::ArrowError;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::errors::ParquetError;
use parquet::file::properties::WriterProperties;
use parquet::format::FileMetaData;
use crate::snapshot::{HashedSnapshot, Snapshot};
use crate::HistogramSnapshot;
/// Options controlling how snapshot data is serialized to Parquet.
#[derive(Clone, Debug)]
pub struct ParquetOptions {
    /// Compression codec applied to the Parquet output (default: UNCOMPRESSED).
    compression: Compression,
    /// Number of buffered snapshots (rows) before a record batch is flushed.
    max_batch_size: usize,
}
impl ParquetOptions {
    /// Returns options with default settings: no compression and a
    /// batch size of 1,048,576 snapshots.
    pub fn new() -> Self {
        Self::default()
    }

    /// Enables ZSTD compression at the given level, consuming and
    /// returning `self` builder-style.
    ///
    /// # Errors
    ///
    /// Returns a `ParquetError` if `level` is not a valid ZSTD level.
    pub fn compression_level(mut self, level: i32) -> Result<Self, ParquetError> {
        self.compression = Compression::ZSTD(ZstdLevel::try_new(level)?);
        Ok(self)
    }

    /// Sets how many snapshots are buffered before a record batch is
    /// written, consuming and returning `self` builder-style.
    pub fn max_batch_size(mut self, batch_size: usize) -> Self {
        self.max_batch_size = batch_size;
        self
    }
}
impl Default for ParquetOptions {
fn default() -> Self {
Self {
compression: Compression::UNCOMPRESSED,
max_batch_size: 1024 * 1024,
}
}
}
/// Collects the union of metric names seen across snapshots so that a
/// complete Arrow schema can be constructed before any rows are written.
///
/// The per-metric `Vec`s stay empty during schema collection; they are
/// handed off to `ParquetWriter` (via `finalize`) which fills them with
/// one entry per snapshot.
#[derive(Default)]
pub struct ParquetSchema {
    /// Counter metric names -> (eventual) per-row values.
    counters: BTreeMap<String, Vec<Option<u64>>>,
    /// Gauge metric names -> (eventual) per-row values.
    gauges: BTreeMap<String, Vec<Option<i64>>>,
    /// Histogram metric names -> (eventual) per-row snapshots.
    histograms: BTreeMap<String, Vec<Option<HistogramSnapshot>>>,
    /// If set, one extra `name:pX` summary column is emitted per
    /// percentile for every histogram.
    summary_percentiles: Option<Vec<f64>>,
}
impl ParquetSchema {
    /// Creates an empty schema collector.
    ///
    /// When `percentiles` is `Some`, each histogram additionally gets one
    /// `name:pX` summary column per percentile in the final schema.
    pub fn new(percentiles: Option<Vec<f64>>) -> Self {
        ParquetSchema {
            counters: BTreeMap::new(),
            gauges: BTreeMap::new(),
            histograms: BTreeMap::new(),
            summary_percentiles: percentiles,
        }
    }

    /// Records the metric names present in `snapshot` so they become
    /// columns. Values are discarded here — only names are collected.
    pub fn push(&mut self, snapshot: Snapshot) {
        for counter in snapshot.counters {
            self.counters.entry(counter.0).or_default();
        }
        for gauge in snapshot.gauges {
            self.gauges.entry(gauge.0).or_default();
        }
        for h in snapshot.histograms {
            self.histograms.entry(h.0).or_default();
        }
    }

    /// Consumes the collected names, builds the Arrow schema, and opens an
    /// `ArrowWriter` over `writer`, returning a `ParquetWriter` ready to
    /// accept snapshots.
    ///
    /// Column layout: a non-nullable `timestamp` column, one nullable
    /// column per counter/gauge, and per histogram: `name:grouping_power`,
    /// `name:max_config_power`, `name:buckets` (list of u64), plus one
    /// `name:pX` column per configured summary percentile.
    ///
    /// # Errors
    ///
    /// Returns a `ParquetError` if the underlying writer cannot be created.
    pub fn finalize(
        self,
        writer: impl Write + Send,
        options: ParquetOptions,
    ) -> Result<ParquetWriter<impl Write + Send>, ParquetError> {
        // Each histogram contributes three base columns plus one summary
        // column per percentile; account for both so the Vec never regrows.
        let fields_per_histogram = 3 + self.summary_percentiles.as_ref().map_or(0, Vec::len);
        let mut fields: Vec<Field> = Vec::with_capacity(
            1 + self.counters.len()
                + self.gauges.len()
                + (self.histograms.len() * fields_per_histogram),
        );
        fields.push(Field::new("timestamp", DataType::UInt64, false));
        for counter in self.counters.keys() {
            fields.push(Field::new(counter.clone(), DataType::UInt64, true));
        }
        for gauge in self.gauges.keys() {
            fields.push(Field::new(gauge.clone(), DataType::Int64, true));
        }
        for h in self.histograms.keys() {
            fields.push(Field::new(
                format!("{}:grouping_power", h),
                DataType::UInt8,
                true,
            ));
            fields.push(Field::new(
                format!("{}:max_config_power", h),
                DataType::UInt8,
                true,
            ));
            fields.push(Field::new(
                format!("{}:buckets", h),
                DataType::List(Arc::new(Field::new("item", DataType::UInt64, true))),
                true,
            ));
            if let Some(ref x) = self.summary_percentiles {
                for percentile in x {
                    fields.push(Field::new(
                        format!("{}:p{}", h, percentile),
                        DataType::UInt64,
                        true,
                    ));
                }
            }
        }
        let schema = Arc::new(Schema::new(fields));
        let props = WriterProperties::builder()
            .set_compression(options.compression)
            .build();
        let arrow_writer = ArrowWriter::try_new(writer, schema.clone(), Some(props))?;
        Ok(ParquetWriter {
            writer: arrow_writer,
            options,
            schema,
            timestamps: Vec::new(),
            counters: self.counters,
            gauges: self.gauges,
            histograms: self.histograms,
            summary_percentiles: self.summary_percentiles,
        })
    }
}
/// Streams metric snapshots into a Parquet file, buffering columns in
/// memory and flushing them as Arrow record batches.
///
/// Constructed by `ParquetSchema::finalize`, which fixes the column set;
/// metrics not present in the schema are dropped on `push`.
pub struct ParquetWriter<W: Write + Send> {
    /// Underlying Arrow-to-Parquet writer.
    writer: ArrowWriter<W>,
    /// Options (batch size, compression) chosen at finalize time.
    options: ParquetOptions,
    /// The fixed schema every emitted record batch must match.
    schema: Arc<Schema>,
    /// One timestamp per buffered snapshot (row).
    timestamps: Vec<u64>,
    /// Per-counter buffered values; `None` marks a snapshot that lacked the metric.
    counters: BTreeMap<String, Vec<Option<u64>>>,
    /// Per-gauge buffered values; `None` marks a snapshot that lacked the metric.
    gauges: BTreeMap<String, Vec<Option<i64>>>,
    /// Per-histogram buffered snapshots; `None` marks a snapshot that lacked the metric.
    histograms: BTreeMap<String, Vec<Option<HistogramSnapshot>>>,
    /// Percentiles for the optional `name:pX` summary columns.
    summary_percentiles: Option<Vec<f64>>,
}
impl<W: Write + Send> ParquetWriter<W> {
    /// Buffers one snapshot as a row across all column vectors.
    ///
    /// Every schema column receives exactly one entry: `Some(value)` if
    /// the metric appeared in this snapshot, `None` otherwise, keeping all
    /// columns aligned with `timestamps`. Once `max_batch_size` rows are
    /// buffered, they are flushed to the underlying writer as one record
    /// batch.
    ///
    /// # Errors
    ///
    /// Returns a `ParquetError` if flushing a full batch fails.
    pub fn push(&mut self, snapshot: Snapshot) -> Result<(), ParquetError> {
        // HashedSnapshot gives keyed lookup so each schema column can pull
        // its own value out of the snapshot.
        let mut hs: HashedSnapshot = HashedSnapshot::from(snapshot);
        self.timestamps.push(hs.ts);
        // `remove` returns None for metrics absent from this snapshot;
        // pushing the Option either way keeps row alignment.
        for (key, v) in self.counters.iter_mut() {
            v.push(hs.counters.remove(key));
        }
        for (key, v) in self.gauges.iter_mut() {
            v.push(hs.gauges.remove(key));
        }
        for (key, v) in self.histograms.iter_mut() {
            v.push(hs.histograms.remove(key));
        }
        // Exactly one row is added per call, so `==` is sufficient to
        // detect a full buffer; the conversion drains all column vectors.
        if self.timestamps.len() == self.options.max_batch_size {
            let batch = self.snapshots_to_recordbatch()?;
            self.writer.write(&batch)?;
        }
        Ok(())
    }

    /// Flushes any remaining buffered rows and closes the writer,
    /// returning the Parquet file metadata.
    ///
    /// NOTE(review): if the buffer was flushed exactly on the last `push`,
    /// this writes a zero-row batch before closing — presumably harmless,
    /// but worth confirming against the arrow writer's behavior.
    ///
    /// # Errors
    ///
    /// Returns a `ParquetError` if the final write or close fails.
    pub fn finalize(mut self) -> Result<FileMetaData, ParquetError> {
        let batch = self.snapshots_to_recordbatch()?;
        self.writer.write(&batch)?;
        self.writer.close()
    }

    /// Drains all buffered column vectors into a `RecordBatch` matching
    /// `self.schema`, leaving the buffers empty (via `mem::take`) so the
    /// writer can keep accepting rows.
    fn snapshots_to_recordbatch(&mut self) -> Result<RecordBatch, ArrowError> {
        let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(self.schema.fields().len());
        // Column order must mirror the field order built in
        // ParquetSchema::finalize: timestamp, counters, gauges, histograms.
        columns.push(Arc::new(UInt64Array::from(std::mem::take(
            &mut self.timestamps,
        ))));
        // BTreeMap iteration is key-ordered, matching the schema's
        // key-ordered field construction.
        for (_, val) in self.counters.iter_mut() {
            let col = std::mem::take(val);
            columns.push(Arc::new(UInt64Array::from(col)));
        }
        for (_, val) in self.gauges.iter_mut() {
            let col = std::mem::take(val);
            columns.push(Arc::new(Int64Array::from(col)));
        }
        for (_, val) in self.histograms.iter_mut() {
            let hists = std::mem::take(val);
            // Each histogram expands into parallel columns: grouping
            // power, max value power, bucket counts, and optional
            // percentile summaries.
            let mut gps: Vec<Option<u8>> = Vec::with_capacity(hists.len());
            let mut maxes: Vec<Option<u8>> = Vec::with_capacity(hists.len());
            let mut buckets = ListBuilder::new(UInt64Builder::new());
            // summaries[i] is the column for the i-th configured percentile.
            let mut summaries: Vec<Vec<Option<u64>>> = match self.summary_percentiles {
                None => Vec::new(),
                Some(ref x) => {
                    let mut outer = Vec::with_capacity(x.len());
                    for _ in 0..x.len() {
                        outer.push(Vec::with_capacity(hists.len()));
                    }
                    outer
                }
            };
            for h in hists {
                if let Some(x) = h {
                    gps.push(Some(x.config().grouping_power()));
                    maxes.push(Some(x.config().max_value_power()));
                    // NOTE(review): `x` is used again below for
                    // percentiles, so this presumably resolves to an
                    // IntoIterator impl on `&HistogramSnapshot` — confirm.
                    buckets.append_value(
                        x.into_iter()
                            .map(|x| Some(x.count()))
                            .collect::<Vec<Option<u64>>>(),
                    );
                    if let Some(ref percentiles) = self.summary_percentiles {
                        for (idx, percentile) in percentiles.iter().enumerate() {
                            // A failed percentile lookup becomes a null
                            // cell rather than an error.
                            let v = x.percentile(*percentile).map(|x| x.end()).ok();
                            summaries[idx].push(v);
                        }
                    }
                } else {
                    // Histogram absent from this row: null every derived column.
                    gps.push(None);
                    maxes.push(None);
                    buckets.append_null();
                    if let Some(ref percentiles) = self.summary_percentiles {
                        for (idx, _) in percentiles.iter().enumerate() {
                            summaries[idx].push(None);
                        }
                    }
                }
            }
            columns.push(Arc::new(UInt8Array::from(gps)));
            columns.push(Arc::new(UInt8Array::from(maxes)));
            columns.push(Arc::new(buckets.finish()));
            if !summaries.is_empty() {
                for mut col in summaries {
                    let v = std::mem::take(&mut col);
                    columns.push(Arc::new(UInt64Array::from(v)));
                }
            }
        }
        RecordBatch::try_new(self.schema.clone(), columns)
    }
}