use std::collections::{BTreeMap, HashMap};
use std::io::Write;
use std::sync::Arc;
use arrow::array::*;
use arrow::datatypes::*;
use arrow::error::ArrowError;
use histogram::Histogram;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::errors::ParquetError;
use parquet::file::properties::WriterProperties;
use parquet::format::{FileMetaData, KeyValue};
use crate::snapshot::{HashedSnapshot, Snapshot};
/// Default number of buffered snapshots (rows) before a record batch is
/// flushed to the underlying Parquet writer.
const DEFAULT_MAX_BATCH_SIZE: usize = 1024 * 1024;
/// Compression codec applied to the Parquet output.
///
/// Thin wrapper over [`parquet::basic::Compression`] so callers configure
/// compression without depending on the `parquet` crate directly.
#[derive(Clone, Debug)]
pub struct ParquetCompression {
    // Underlying codec handed to `WriterProperties`.
    inner: Compression,
}
impl ParquetCompression {
    /// Disables compression entirely.
    pub fn none() -> Self {
        Self {
            inner: Compression::UNCOMPRESSED,
        }
    }

    /// Selects Zstandard compression at `level`.
    ///
    /// # Errors
    /// Returns a `ParquetError` when `level` is outside the range the
    /// `parquet` crate accepts for zstd.
    pub fn zstd(level: i32) -> Result<Self, ParquetError> {
        let level = ZstdLevel::try_new(level)?;
        Ok(Self {
            inner: Compression::ZSTD(level),
        })
    }
}
impl Default for ParquetCompression {
    /// Defaults to zstd at level 3, a balanced ratio/speed trade-off.
    fn default() -> Self {
        // Level 3 is always within the range `ZstdLevel::try_new` accepts,
        // so this cannot fail; `expect` documents that invariant instead of
        // a bare `unwrap`.
        Self::zstd(3).expect("zstd level 3 is a valid compression level")
    }
}
/// Layout used to store histogram buckets in the Parquet file.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ParquetHistogramStorage {
    /// A single list column per histogram containing every bucket count.
    Standard,
    /// Two parallel list columns per histogram (`:bucket_index` and
    /// `:bucket_count`) containing only the occupied buckets.
    Sparse,
}
/// Options controlling how snapshots are serialized to Parquet.
#[derive(Clone, Debug)]
pub struct ParquetOptions {
    // Column compression codec.
    compression: ParquetCompression,
    // Number of buffered rows that triggers a record-batch flush.
    max_batch_size: usize,
    // Dense or sparse histogram bucket layout.
    histogram: ParquetHistogramStorage,
}
impl ParquetOptions {
    /// Creates options with the defaults: zstd-3 compression,
    /// `DEFAULT_MAX_BATCH_SIZE` rows per batch, dense histogram storage.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the compression codec (builder style; consumes `self`).
    // `#[must_use]`: dropping the returned value silently discards the
    // setting, which is always a caller bug in a consuming builder.
    #[must_use]
    pub fn compression(mut self, compression: ParquetCompression) -> Self {
        self.compression = compression;
        self
    }

    /// Sets how many snapshots are buffered before a batch is flushed.
    #[must_use]
    pub fn max_batch_size(mut self, batch_size: usize) -> Self {
        self.max_batch_size = batch_size;
        self
    }

    /// Sets the histogram storage layout.
    #[must_use]
    pub fn histogram(mut self, histogram: ParquetHistogramStorage) -> Self {
        self.histogram = histogram;
        self
    }
}
impl Default for ParquetOptions {
fn default() -> Self {
Self {
compression: Default::default(),
max_batch_size: DEFAULT_MAX_BATCH_SIZE,
histogram: ParquetHistogramStorage::Standard,
}
}
}
/// Accumulates the union of metric names and metadata across snapshots so a
/// stable Arrow schema can be built before any rows are written.
#[derive(Default)]
pub struct ParquetSchema {
    // Counter name -> metadata; BTreeMap keeps column ordering deterministic.
    counters: BTreeMap<String, HashMap<String, String>>,
    // Gauge name -> metadata.
    gauges: BTreeMap<String, HashMap<String, String>>,
    // Histogram name -> metadata.
    histograms: BTreeMap<String, HashMap<String, String>>,
    // Percentiles to materialize as extra per-histogram summary columns.
    summary_percentiles: Option<Vec<f64>>,
    // File-level key/value metadata; taken from the first snapshot that has any.
    metadata: HashMap<String, String>,
    // Snapshots seen so far; used to pre-size column buffers.
    rows: usize,
}
impl ParquetSchema {
    /// Creates an empty schema accumulator.
    ///
    /// `percentiles` — optional list (e.g. `[50.0, 99.9]`) of percentiles to
    /// materialize as one extra `u64` column per histogram.
    pub fn new(percentiles: Option<Vec<f64>>) -> Self {
        // Struct-update from `Default` instead of spelling out every empty
        // field; the struct derives `Default`.
        ParquetSchema {
            summary_percentiles: percentiles,
            ..Default::default()
        }
    }

    /// Folds one snapshot into the schema: registers any metric names not
    /// seen before (first-seen metadata wins) and counts the row.
    pub fn push(&mut self, snapshot: Snapshot) {
        let (counters, gauges, histograms) =
            (snapshot.counters, snapshot.gauges, snapshot.histograms);
        for counter in counters {
            self.counters
                .entry(counter.name)
                .or_insert(counter.metadata);
        }
        for gauge in gauges {
            self.gauges.entry(gauge.name).or_insert(gauge.metadata);
        }
        for histogram in histograms {
            self.histograms
                .entry(histogram.name)
                .or_insert(histogram.metadata);
        }
        // File-level metadata comes from the first snapshot that carries any.
        if self.metadata.is_empty() && !snapshot.metadata.is_empty() {
            self.metadata = snapshot.metadata;
        }
        self.rows += 1;
    }

    /// Freezes the accumulated schema into an Arrow [`Schema`] and opens a
    /// [`ParquetWriter`] over `writer`.
    ///
    /// Column order is: timestamp, counters, gauges, then for each histogram
    /// its bucket column(s) followed by any percentile summary columns. This
    /// ordering must match `ParquetWriter::snapshots_to_recordbatch`.
    ///
    /// # Errors
    /// Returns a `ParquetError` if the underlying Arrow writer cannot be
    /// constructed.
    pub fn finalize(
        self,
        writer: impl Write + Send,
        options: ParquetOptions,
    ) -> Result<ParquetWriter<impl Write + Send>, ParquetError> {
        // Capacity hint: per histogram there are 1 (dense) or 2 (sparse)
        // bucket columns plus one column per requested percentile. The old
        // hard-coded `* 3` over-reserved for dense and under-reserved once
        // percentiles were requested.
        let fields_per_histogram = match options.histogram {
            ParquetHistogramStorage::Standard => 1,
            ParquetHistogramStorage::Sparse => 2,
        } + self.summary_percentiles.as_ref().map_or(0, Vec::len);
        let mut fields: Vec<Field> = Vec::with_capacity(
            1 + self.counters.len()
                + self.gauges.len()
                + self.histograms.len() * fields_per_histogram,
        );
        // Leading non-nullable timestamp column.
        fields.push(
            Field::new("timestamp", DataType::UInt64, false).with_metadata(HashMap::from([(
                "metric_type".to_owned(),
                "timestamp".to_owned(),
            )])),
        );
        // One nullable u64 column per counter; buffers pre-sized to the row count.
        let mut counters = BTreeMap::new();
        for (counter, mut metadata) in self.counters.into_iter() {
            metadata.insert("metric_type".to_string(), "counter".to_string());
            fields
                .push(Field::new(counter.clone(), DataType::UInt64, true).with_metadata(metadata));
            counters.insert(counter, Vec::with_capacity(self.rows));
        }
        // One nullable i64 column per gauge.
        let mut gauges = BTreeMap::new();
        for (gauge, mut metadata) in self.gauges.into_iter() {
            metadata.insert("metric_type".to_string(), "gauge".to_string());
            fields.push(Field::new(gauge.clone(), DataType::Int64, true).with_metadata(metadata));
            gauges.insert(gauge, Vec::with_capacity(self.rows));
        }
        // Histogram columns depend on the selected storage layout.
        let mut histograms = BTreeMap::new();
        for (histogram, mut metadata) in self.histograms.into_iter() {
            match options.histogram {
                ParquetHistogramStorage::Standard => {
                    metadata.insert("metric_type".to_string(), "histogram".to_string());
                    fields.push(
                        Field::new(
                            format!("{histogram}:buckets"),
                            DataType::new_list(DataType::UInt64, true),
                            true,
                        )
                        .with_metadata(metadata.clone()),
                    );
                }
                ParquetHistogramStorage::Sparse => {
                    metadata.insert("metric_type".to_string(), "sparse histogram".to_string());
                    fields.push(
                        Field::new(
                            format!("{histogram}:bucket_index"),
                            DataType::new_list(DataType::UInt64, true),
                            true,
                        )
                        .with_metadata(metadata.clone()),
                    );
                    fields.push(
                        Field::new(
                            format!("{histogram}:bucket_count"),
                            DataType::new_list(DataType::UInt64, true),
                            true,
                        )
                        .with_metadata(metadata.clone()),
                    );
                }
            };
            // One summary column per requested percentile, e.g. "name:p99.9".
            if let Some(ref x) = self.summary_percentiles {
                for percentile in x {
                    fields.push(
                        Field::new(format!("{histogram}:p{percentile}"), DataType::UInt64, true)
                            .with_metadata(metadata.clone()),
                    );
                }
            }
            histograms.insert(histogram, Vec::with_capacity(self.rows));
        }
        // File-level key/value metadata for the Parquet footer, if any.
        let metadata: Option<Vec<KeyValue>> = if self.metadata.is_empty() {
            None
        } else {
            Some(
                self.metadata
                    .into_iter()
                    .map(|(key, value)| KeyValue {
                        key,
                        value: Some(value),
                    })
                    .collect(),
            )
        };
        let schema = Arc::new(Schema::new(fields));
        let props = WriterProperties::builder()
            .set_compression(options.compression.inner)
            .set_key_value_metadata(metadata)
            .build();
        let arrow_writer = ArrowWriter::try_new(writer, schema.clone(), Some(props))?;
        Ok(ParquetWriter {
            writer: arrow_writer,
            options,
            schema,
            timestamps: Vec::new(),
            counters,
            gauges,
            histograms,
            summary_percentiles: self.summary_percentiles,
        })
    }
}
/// Streams metric snapshots into a Parquet file using the schema frozen by
/// [`ParquetSchema::finalize`], buffering rows column-wise between flushes.
pub struct ParquetWriter<W: Write + Send> {
    // Underlying Arrow-to-Parquet writer.
    writer: ArrowWriter<W>,
    // Batch size, compression, and histogram layout settings.
    options: ParquetOptions,
    // Frozen schema; column order must match `snapshots_to_recordbatch`.
    schema: Arc<Schema>,
    // Buffered timestamp column; its length is the buffered row count.
    timestamps: Vec<u64>,
    // Buffered counter columns (None = metric absent from that snapshot).
    counters: BTreeMap<String, Vec<Option<u64>>>,
    // Buffered gauge columns.
    gauges: BTreeMap<String, Vec<Option<i64>>>,
    // Buffered histogram columns.
    histograms: BTreeMap<String, Vec<Option<Histogram>>>,
    // Percentiles to materialize as summary columns, if any.
    summary_percentiles: Option<Vec<f64>>,
}
impl<W: Write + Send> ParquetWriter<W> {
    /// Appends one snapshot to the column buffers, flushing a record batch
    /// to the underlying writer once `max_batch_size` rows accumulate.
    ///
    /// Metrics absent from this snapshot are recorded as `None` so every
    /// column stays the same length as `timestamps`.
    ///
    /// # Errors
    /// Returns a `ParquetError` if building or writing a full batch fails.
    pub fn push(&mut self, snapshot: Snapshot) -> Result<(), ParquetError> {
        let mut hs: HashedSnapshot = HashedSnapshot::from(snapshot);
        self.timestamps.push(hs.ts);
        for (key, v) in self.counters.iter_mut() {
            v.push(hs.counters.remove(key).map(|v| v.value));
        }
        for (key, v) in self.gauges.iter_mut() {
            v.push(hs.gauges.remove(key).map(|v| v.value));
        }
        for (key, v) in self.histograms.iter_mut() {
            v.push(hs.histograms.remove(key).map(|v| v.value));
        }
        // `>=` rather than `==`: a `max_batch_size` of 0 (or any overshoot)
        // would otherwise never match and the buffers would grow unbounded
        // until `finalize`.
        if self.timestamps.len() >= self.options.max_batch_size {
            let batch = self.snapshots_to_recordbatch()?;
            self.writer.write(&batch)?;
        }
        Ok(())
    }

    /// Flushes any buffered rows and closes the writer, returning the
    /// Parquet file metadata.
    ///
    /// # Errors
    /// Returns a `ParquetError` if the final write or the close fails.
    pub fn finalize(mut self) -> Result<FileMetaData, ParquetError> {
        // Skip the final write when the buffers are empty (everything was
        // already flushed on a batch boundary) to avoid emitting a zero-row
        // batch.
        if !self.timestamps.is_empty() {
            let batch = self.snapshots_to_recordbatch()?;
            self.writer.write(&batch)?;
        }
        self.writer.close()
    }

    /// Drains the column buffers into a [`RecordBatch`] whose column order
    /// matches the schema built by `ParquetSchema::finalize`: timestamp,
    /// counters, gauges, then per histogram its bucket column(s) followed by
    /// any percentile summary columns.
    fn snapshots_to_recordbatch(&mut self) -> Result<RecordBatch, ArrowError> {
        let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(self.schema.fields().len());
        // `mem::take` drains each buffer and leaves an empty one behind for
        // the next batch.
        columns.push(Arc::new(UInt64Array::from(std::mem::take(
            &mut self.timestamps,
        ))));
        for (_, val) in self.counters.iter_mut() {
            columns.push(Arc::new(UInt64Array::from(std::mem::take(val))));
        }
        for (_, val) in self.gauges.iter_mut() {
            columns.push(Arc::new(Int64Array::from(std::mem::take(val))));
        }
        for (_, val) in self.histograms.iter_mut() {
            let hists = std::mem::take(val);
            // One list builder for dense storage; two ((index, count) pair)
            // for sparse.
            let mut buckets = match self.options.histogram {
                ParquetHistogramStorage::Standard => vec![ListBuilder::new(UInt64Builder::new())],
                ParquetHistogramStorage::Sparse => vec![
                    ListBuilder::new(UInt64Builder::new()),
                    ListBuilder::new(UInt64Builder::new()),
                ],
            };
            // One value vector per requested percentile.
            let mut summaries: Vec<Vec<Option<u64>>> = match self.summary_percentiles {
                None => Vec::new(),
                Some(ref x) => {
                    let mut outer = Vec::with_capacity(x.len());
                    for _ in 0..x.len() {
                        outer.push(Vec::with_capacity(hists.len()));
                    }
                    outer
                }
            };
            for h in hists {
                if let Some(x) = h {
                    match self.options.histogram {
                        ParquetHistogramStorage::Standard => buckets[0].append_value(
                            x.into_iter()
                                .map(|x| Some(x.count()))
                                .collect::<Vec<Option<u64>>>(),
                        ),
                        ParquetHistogramStorage::Sparse => {
                            let sparse = histogram::SparseHistogram::from(&x);
                            buckets[0].append_value(
                                sparse
                                    .index
                                    .into_iter()
                                    .map(|x| Some(x as u64))
                                    .collect::<Vec<Option<u64>>>(),
                            );
                            buckets[1].append_value(
                                sparse
                                    .count
                                    .into_iter()
                                    .map(Some)
                                    .collect::<Vec<Option<u64>>>(),
                            );
                        }
                    };
                    if let Some(ref percentiles) = self.summary_percentiles {
                        for (idx, percentile) in percentiles.iter().enumerate() {
                            let v = x.percentile(*percentile).map(|x| x.end()).ok();
                            summaries[idx].push(v);
                        }
                    }
                } else {
                    // Histogram absent from this snapshot: null every column.
                    buckets[0].append_null();
                    if self.options.histogram == ParquetHistogramStorage::Sparse {
                        buckets[1].append_null();
                    }
                    if let Some(ref percentiles) = self.summary_percentiles {
                        for (idx, _) in percentiles.iter().enumerate() {
                            summaries[idx].push(None);
                        }
                    }
                }
            }
            columns.push(Arc::new(buckets[0].finish()));
            if self.options.histogram == ParquetHistogramStorage::Sparse {
                columns.push(Arc::new(buckets[1].finish()));
            }
            // Iterating an empty `summaries` is a no-op, so no emptiness
            // guard is needed, and each `col` is already owned by the loop —
            // the previous `mem::take` of the loop variable did nothing.
            for col in summaries {
                columns.push(Arc::new(UInt64Array::from(col)));
            }
        }
        RecordBatch::try_new(self.schema.clone(), columns)
    }
}