use std::collections::{BTreeMap, HashMap};
use std::io::Write;
use std::sync::Arc;
use arrow::array::*;
use arrow::datatypes::*;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::errors::ParquetError;
use parquet::file::properties::WriterProperties;
use parquet::format::{FileMetaData, KeyValue};
use crate::snapshot::{HashedSnapshot, Snapshot};
const DEFAULT_MAX_BATCH_SIZE: usize = 50_000;
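/// Compression applied to the Parquet output. Only `UNCOMPRESSED` and zstd
/// are currently exposed; the default is zstd at level 3.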
#[derive(Clone, Debug)]
pub struct ParquetCompression {
inner: Compression,
}
impl ParquetCompression {
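    /// Disables compression entirely.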
pub fn none() -> Self {
Self {
inner: Compression::UNCOMPRESSED,
}
}
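    /// Enables zstd compression at the given level. Fails if the level is
    /// outside the range zstd supports.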
pub fn zstd(level: i32) -> Result<Self, ParquetError> {
Ok(Self {
inner: Compression::ZSTD(ZstdLevel::try_new(level)?),
})
}
}
impl Default for ParquetCompression {
fn default() -> Self {
        Self::zstd(3).expect("level 3 is a valid zstd compression level")
}
}
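/// Controls how histogram buckets are laid out in the output file.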
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ParquetHistogramType {
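    /// A single list column per histogram holding every bucket count.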
Standard,
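    /// Two list columns per histogram: the indices of non-empty buckets and
    /// the counts in those buckets.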
Sparse,
}
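/// Options for writing snapshots to a Parquet file.
///
/// Options are chained builder-style; a minimal usage sketch:
///
/// ```ignore
/// let options = ParquetOptions::new()
///     .compression(ParquetCompression::zstd(5)?)
///     .max_batch_size(10_000)
///     .histogram_type(ParquetHistogramType::Sparse);
/// ```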
#[derive(Clone, Debug)]
pub struct ParquetOptions {
compression: ParquetCompression,
max_batch_size: usize,
histogram_type: ParquetHistogramType,
}
impl ParquetOptions {
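    /// Creates options with the default settings.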
pub fn new() -> Self {
Self::default()
}
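    /// Sets the compression for the output file. Defaults to zstd level 3.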
pub fn compression(mut self, compression: ParquetCompression) -> Self {
self.compression = compression;
self
}
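    /// Sets the maximum number of rows per row group. Each snapshot becomes
    /// one row, so this bounds how many snapshots share a row group. Defaults
    /// to 50,000.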
pub fn max_batch_size(mut self, batch_size: usize) -> Self {
self.max_batch_size = batch_size;
self
}
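    /// Sets how histograms are stored. Defaults to
    /// [`ParquetHistogramType::Standard`].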
pub fn histogram_type(mut self, histogram: ParquetHistogramType) -> Self {
self.histogram_type = histogram;
self
}
}
impl Default for ParquetOptions {
fn default() -> Self {
Self {
compression: Default::default(),
max_batch_size: DEFAULT_MAX_BATCH_SIZE,
histogram_type: ParquetHistogramType::Standard,
}
}
}
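/// Accumulates the union of metric names and metadata across [`Snapshot`]s so
/// that a stable Parquet schema can be derived before any rows are written.
///
/// Writing is a two-pass process; a minimal sketch, assuming `snapshots` is a
/// `Vec<Snapshot>` you already hold and `file` implements `Write + Send`:
///
/// ```ignore
/// let mut schema = ParquetSchema::new();
/// for s in &snapshots {
///     schema.push(s.clone());
/// }
/// let mut writer = schema.finalize(file, ParquetOptions::default())?;
/// for s in snapshots {
///     writer.push(s)?;
/// }
/// writer.finalize()?;
/// ```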
#[derive(Default)]
pub struct ParquetSchema {
counters: BTreeMap<String, HashMap<String, String>>,
gauges: BTreeMap<String, HashMap<String, String>>,
histograms: BTreeMap<String, HashMap<String, String>>,
metadata: HashMap<String, String>,
rows: usize,
}
impl ParquetSchema {
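    /// Creates an empty schema with no metrics registered.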
    pub fn new() -> Self {
        Self::default()
    }
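    /// Registers the metric names and metadata in `snapshot` (the first pass).
    /// File-level metadata is captured from the first snapshot that carries
    /// any.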
pub fn push(&mut self, snapshot: Snapshot) {
let (counters, gauges, histograms) =
(snapshot.counters, snapshot.gauges, snapshot.histograms);
for counter in counters {
self.counters
.entry(counter.name)
.or_insert(counter.metadata);
}
for gauge in gauges {
self.gauges.entry(gauge.name).or_insert(gauge.metadata);
}
for histogram in histograms {
self.histograms
.entry(histogram.name)
.or_insert(histogram.metadata);
}
if self.metadata.is_empty() && !snapshot.metadata.is_empty() {
self.metadata = snapshot.metadata;
}
self.rows += 1;
}
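    /// Consumes the accumulated schema and returns a [`ParquetWriter`] over
    /// `writer`. The schema has a leading `timestamp` column, one column per
    /// counter and gauge, and one (standard) or two (sparse) list columns per
    /// histogram.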
pub fn finalize(
self,
writer: impl Write + Send,
options: ParquetOptions,
) -> Result<ParquetWriter<impl Write + Send>, ParquetError> {
        // One timestamp column, one column per counter/gauge, and one or two
        // columns per histogram depending on the histogram type.
        let hist_columns = match options.histogram_type {
            ParquetHistogramType::Standard => 1,
            ParquetHistogramType::Sparse => 2,
        };
        let mut fields: Vec<Field> = Vec::with_capacity(
            1 + self.counters.len() + self.gauges.len() + (self.histograms.len() * hist_columns),
        );
fields.push(
Field::new("timestamp", DataType::UInt64, false).with_metadata(HashMap::from([(
"metric_type".to_owned(),
"timestamp".to_owned(),
)])),
);
let mut counters = Vec::with_capacity(self.counters.len());
for (counter, mut metadata) in self.counters.into_iter() {
metadata.insert("metric_type".to_string(), "counter".to_string());
fields
.push(Field::new(counter.clone(), DataType::UInt64, true).with_metadata(metadata));
counters.push(counter);
}
let mut gauges = Vec::with_capacity(self.gauges.len());
for (gauge, mut metadata) in self.gauges.into_iter() {
metadata.insert("metric_type".to_string(), "gauge".to_string());
fields.push(Field::new(gauge.clone(), DataType::Int64, true).with_metadata(metadata));
gauges.push(gauge);
}
let mut histograms = Vec::with_capacity(self.histograms.len());
for (histogram, mut metadata) in self.histograms.into_iter() {
match options.histogram_type {
ParquetHistogramType::Standard => {
metadata.insert("metric_type".to_string(), "histogram".to_string());
fields.push(
Field::new(
format!("{histogram}:buckets"),
DataType::new_list(DataType::UInt64, true),
true,
)
.with_metadata(metadata.clone()),
);
}
ParquetHistogramType::Sparse => {
metadata.insert("metric_type".to_string(), "sparse_histogram".to_string());
fields.push(
Field::new(
format!("{histogram}:bucket_indices"),
DataType::new_list(DataType::UInt64, true),
true,
)
.with_metadata(metadata.clone()),
);
fields.push(
Field::new(
format!("{histogram}:bucket_counts"),
DataType::new_list(DataType::UInt64, true),
true,
)
.with_metadata(metadata.clone()),
);
}
};
histograms.push(histogram);
}
let metadata: Option<Vec<KeyValue>> = if self.metadata.is_empty() {
None
} else {
Some(
self.metadata
.into_iter()
.map(|(key, value)| KeyValue {
key,
value: Some(value),
})
.collect(),
)
};
let schema = Arc::new(Schema::new(fields));
let props = WriterProperties::builder()
.set_compression(options.compression.inner)
.set_key_value_metadata(metadata)
.set_max_row_group_size(options.max_batch_size)
.build();
let arrow_writer = ArrowWriter::try_new(writer, schema.clone(), Some(props))?;
Ok(ParquetWriter {
writer: arrow_writer,
options,
schema,
counters,
gauges,
histograms,
})
}
}
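/// Writes [`Snapshot`]s as rows of a Parquet file whose columns were fixed by
/// [`ParquetSchema::finalize`].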
pub struct ParquetWriter<W: Write + Send> {
writer: ArrowWriter<W>,
options: ParquetOptions,
schema: Arc<Schema>,
counters: Vec<String>,
gauges: Vec<String>,
histograms: Vec<String>,
}
impl<W: Write + Send> ParquetWriter<W> {
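    /// Appends one snapshot as a single row. Metrics that are in the schema
    /// but missing from this snapshot are written as nulls.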
pub fn push(&mut self, snapshot: Snapshot) -> Result<(), ParquetError> {
let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(self.schema.fields().len());
let mut hs: HashedSnapshot = HashedSnapshot::from(snapshot);
columns.push(Arc::new(UInt64Array::from(vec![hs.ts])));
        for counter in &self.counters {
            let value = hs.counters.remove(counter).map(|v| v.value);
            columns.push(Arc::new(UInt64Array::from(vec![value])));
        }
        for gauge in &self.gauges {
            let value = hs.gauges.remove(gauge).map(|v| v.value);
            columns.push(Arc::new(Int64Array::from(vec![value])));
        }
        for h in &self.histograms {
let histogram = hs.histograms.remove(h).map(|v| v.value);
if let Some(hist) = histogram {
match self.options.histogram_type {
ParquetHistogramType::Standard => {
columns.push(Self::listu64_entry_from_slice(hist.as_slice()))
}
ParquetHistogramType::Sparse => {
let sparse = histogram::SparseHistogram::from(&hist);
columns.push(Self::listu64_entry_from_vec(sparse.index));
columns.push(Self::listu64_entry_from_slice(sparse.count.as_slice()));
}
};
} else {
match self.options.histogram_type {
ParquetHistogramType::Standard => columns.push(Self::listu64_entry_null()),
                    ParquetHistogramType::Sparse => {
                        columns.push(Self::listu64_entry_null());
                        columns.push(Self::listu64_entry_null());
                    }
};
}
}
let batch = RecordBatch::try_new(self.schema.clone(), columns)?;
self.writer.write(&batch)
}
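    /// Flushes any buffered rows, writes the Parquet footer, and returns the
    /// file metadata.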
pub fn finalize(self) -> Result<FileMetaData, ParquetError> {
self.writer.close()
}
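    /// Builds a single-row list column from a slice of bucket counts.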
fn listu64_entry_from_slice(v: &[u64]) -> Arc<ListArray> {
Arc::new(ListArray::from_iter_primitive::<UInt64Type, _, _>([Some(
v.iter().map(|x| Some(*x)).collect::<Vec<Option<u64>>>(),
)]))
}
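    /// Builds a single-row list column from bucket indices, widening `usize`
    /// values to `u64`.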
fn listu64_entry_from_vec(v: Vec<usize>) -> Arc<ListArray> {
Arc::new(ListArray::from_iter_primitive::<UInt64Type, _, _>([Some(
v.into_iter()
.map(|x| Some(x as u64))
.collect::<Vec<Option<u64>>>(),
)]))
}
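    /// Builds a single-row list column whose only entry is null.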
fn listu64_entry_null() -> Arc<ListArray> {
Arc::new(ListArray::from_iter_primitive::<
UInt64Type,
Vec<Option<u64>>,
_,
>([None]))
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::fs::File;
use std::io::Seek;
use std::time::{Duration, SystemTime};
use ::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use arrow::array::*;
use metriken::histogram::Histogram as H2Histogram;
use crate::*;
fn build_snapshots() -> Vec<Snapshot> {
let h1 = H2Histogram::from_buckets(1, 3, vec![0, 1, 1, 0, 0, 0]).unwrap();
let s1 = Snapshot {
systemtime: SystemTime::now(),
metadata: HashMap::new(),
counters: vec![Counter {
name: "counter".to_string(),
value: 100,
metadata: HashMap::new(),
}],
gauges: vec![Gauge {
name: "gauge".to_string(),
value: 16,
metadata: HashMap::new(),
}],
histograms: vec![Histogram {
name: "histogram".to_string(),
value: h1,
metadata: HashMap::new(),
}],
};
let h2 = H2Histogram::from_buckets(1, 3, vec![0, 1, 1, 0, 1, 0]).unwrap();
let s2 = Snapshot {
systemtime: SystemTime::now()
.checked_add(Duration::from_secs(600))
.unwrap(),
metadata: HashMap::new(),
counters: vec![Counter {
name: "counter".to_string(),
value: 121,
metadata: HashMap::new(),
}],
gauges: vec![Gauge {
name: "gauge".to_string(),
value: 6,
metadata: HashMap::new(),
}],
histograms: vec![Histogram {
name: "histogram".to_string(),
value: h2,
metadata: HashMap::new(),
}],
};
vec![s1, s2]
}
fn write_parquet(snapshots: Vec<Snapshot>, options: ParquetOptions) -> File {
let mut schema = ParquetSchema::new();
for s in &snapshots {
schema.push(s.clone());
}
let mut tmpfile = tempfile::tempfile().unwrap();
let mut writer = schema
.finalize(tmpfile.try_clone().unwrap(), options)
.unwrap();
for s in &snapshots {
            writer.push(s.clone()).unwrap();
}
        writer.finalize().unwrap();
        tmpfile.rewind().unwrap();
tmpfile
}
fn validate_i64_array(col: ArrayRef, vals: &[i64]) {
let v = col.as_any().downcast_ref::<array::Int64Array>().unwrap();
assert_eq!(v.values(), vals);
}
fn validate_u64_array(col: ArrayRef, vals: &[u64]) {
let v = col.as_any().downcast_ref::<array::UInt64Array>().unwrap();
assert_eq!(v.values(), vals);
}
#[test]
fn test_row_groups() {
let snapshots = build_snapshots();
let tmpfile = write_parquet(snapshots, ParquetOptions::new().max_batch_size(1));
let builder = ParquetRecordBatchReaderBuilder::try_new(tmpfile).unwrap();
assert_eq!(builder.metadata().row_groups().len(), 2);
assert_eq!(builder.metadata().row_group(0).num_rows(), 1);
assert_eq!(builder.metadata().row_group(1).num_rows(), 1);
}
#[test]
fn test_default() {
let snapshots = build_snapshots();
let tmpfile = write_parquet(snapshots, ParquetOptions::new());
let builder = ParquetRecordBatchReaderBuilder::try_new(tmpfile).unwrap();
assert_eq!(builder.metadata().row_groups().len(), 1);
assert_eq!(builder.metadata().row_group(0).num_rows(), 2);
let fields: Vec<&String> = builder.schema().fields().iter().map(|x| x.name()).collect();
let expected = vec!["timestamp", "counter", "gauge", "histogram:buckets"];
assert_eq!(fields.len(), expected.len());
assert_eq!(fields, expected);
let batch = builder.build().unwrap().next().unwrap().unwrap();
assert_eq!(batch.num_columns(), 4);
assert_eq!(batch.num_rows(), 2);
validate_u64_array(batch.column(1).clone(), &[100, 121]);
validate_i64_array(batch.column(2).clone(), &[16, 6]);
let histograms = batch
.column(3)
.as_any()
.downcast_ref::<array::ListArray>()
.unwrap();
validate_u64_array(histograms.value(0), &[0, 1, 1, 0, 0, 0]);
validate_u64_array(histograms.value(1), &[0, 1, 1, 0, 1, 0]);
}
#[test]
fn test_sparse() {
let snapshots = build_snapshots();
let tmpfile = write_parquet(
snapshots,
ParquetOptions::new().histogram_type(ParquetHistogramType::Sparse),
);
let builder = ParquetRecordBatchReaderBuilder::try_new(tmpfile).unwrap();
assert_eq!(builder.metadata().row_groups().len(), 1);
assert_eq!(builder.metadata().row_group(0).num_rows(), 2);
let fields: Vec<&String> = builder.schema().fields().iter().map(|x| x.name()).collect();
let expected = vec![
"timestamp",
"counter",
"gauge",
"histogram:bucket_indices",
"histogram:bucket_counts",
];
assert_eq!(fields.len(), expected.len());
assert_eq!(fields, expected);
let batch = builder.build().unwrap().next().unwrap().unwrap();
assert_eq!(batch.num_columns(), 5);
assert_eq!(batch.num_rows(), 2);
validate_u64_array(batch.column(1).clone(), &[100, 121]);
validate_i64_array(batch.column(2).clone(), &[16, 6]);
let indices = batch
.column(3)
.as_any()
.downcast_ref::<array::ListArray>()
.unwrap();
validate_u64_array(indices.value(0), &[1, 2]);
validate_u64_array(indices.value(1), &[1, 2, 4]);
let counts = batch
.column(4)
.as_any()
.downcast_ref::<array::ListArray>()
.unwrap();
validate_u64_array(counts.value(0), &[1, 1]);
validate_u64_array(counts.value(1), &[1, 1, 1]);
}
}