use std::collections::HashSet;
use std::fmt;
use std::iter::FromIterator;
use super::expression::{Expression, BExpression};
use super::misc::{Span, Subquery};
use super::return_value::{LabelSetOp, ReturnKind, ReturnValue, strs_to_set};
/// The keyword that introduces an aggregation clause's label list.
///
/// `Function::return_value` interprets `By` as "clear the label set, then add
/// the listed labels" and `Without` as "remove the listed labels".
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AggregationOp {
    /// Rendered as `by`.
    By,
    /// Rendered as `without`.
    Without,
}

impl fmt::Display for AggregationOp {
    /// Writes the lowercase keyword for this operator.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let keyword = match *self {
            AggregationOp::By => "by",
            AggregationOp::Without => "without",
        };
        // Written verbatim: width/alignment flags of the outer formatter are
        // intentionally not applied.
        f.write_str(keyword)
    }
}
/// An aggregation clause attached to a function call: an operator keyword
/// (`by` / `without`) followed by an optional list of label names.
///
/// Its `Display` impl renders it as e.g. `by (a, b)`, or as just the keyword
/// when `labels` is empty.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Aggregation {
/// Whether the clause is `by` or `without`.
pub op: AggregationOp,
/// Label names listed in the clause, in the order they are rendered.
pub labels: Vec<String>,
/// Optional span for this clause; passed along to the label operations built
/// in `Function::return_value`.
/// NOTE(review): presumably the clause's position in the query text —
/// confirm against `Span`.
pub span: Option<Span>
}
impl Aggregation {
    /// Creates a clause with the given operator, no labels and no span.
    pub fn new(op: AggregationOp) -> Self {
        Aggregation {
            op,
            labels: Vec::new(),
            span: None,
        }
    }

    /// Shorthand for an empty `by` clause.
    pub fn by() -> Self {
        Self::new(AggregationOp::By)
    }

    /// Shorthand for an empty `without` clause.
    pub fn without() -> Self {
        Self::new(AggregationOp::Without)
    }

    /// Replaces the operator, keeping labels and span.
    pub fn op(self, op: AggregationOp) -> Self {
        Aggregation { op, ..self }
    }

    /// Appends a single label name to the clause.
    pub fn label<S: Into<String>>(mut self, label: S) -> Self {
        let owned: String = label.into();
        self.labels.push(owned);
        self
    }

    /// Replaces the whole label list with owned copies of `labels`.
    pub fn labels(self, labels: &[&str]) -> Self {
        let replaced: Vec<String> = labels.iter().map(|name| String::from(*name)).collect();
        Aggregation {
            labels: replaced,
            ..self
        }
    }

    /// Removes every label from the clause.
    pub fn clear_labels(mut self) -> Self {
        self.labels.truncate(0);
        self
    }

    /// Sets the span of the clause.
    pub fn span<S: Into<Span>>(self, span: S) -> Self {
        Aggregation {
            span: Some(span.into()),
            ..self
        }
    }
}
impl fmt::Display for Aggregation {
    /// Renders the clause as `<op>` when there are no labels, or as
    /// `<op> (l1, l2, ...)` otherwise.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}", self.op)?;

        // Without labels only the keyword is emitted.
        let (first, rest) = match self.labels.split_first() {
            Some(parts) => parts,
            None => return Ok(()),
        };

        f.write_str(" (")?;
        f.write_str(first)?;
        for name in rest {
            f.write_str(", ")?;
            f.write_str(name)?;
        }
        f.write_str(")")
    }
}
/// A function call expression: a name, its arguments, and optional
/// aggregation and subquery clauses.
///
/// Its `Display` impl renders it as `name(arg, ...)`, followed by the
/// aggregation clause and then the subquery, when present.
#[derive(Debug, PartialEq, Clone)]
pub struct Function {
/// Function name as written; lookups in `return_value` compare it
/// case-insensitively.
pub name: String,
/// Boxed argument expressions, in call order.
pub args: Vec<BExpression>,
/// Optional `by` / `without` clause attached to the call.
pub aggregation: Option<Aggregation>,
/// Optional subquery applied to the call's result.
pub subquery: Option<Subquery>,
/// Optional span for the call; passed along to the label operations built in
/// `return_value`.
/// NOTE(review): presumably the call's position in the query text — confirm
/// against `Span`.
pub span: Option<Span>
}
/// Returns the index of the argument whose return value determines the kind
/// and labels of a call to the function `name`.
///
/// The name is compared case-insensitively.
///
/// * `None` for `time`, whose result does not depend on any argument.
/// * `Some(1)` for functions whose first argument is a parameter and whose
///   second argument is the vector: `histogram_quantile`,
///   `quantile_over_time`, and the parameterised aggregation operators
///   `topk`, `bottomk`, `quantile` and `count_values`.
/// * `Some(0)` for everything else.
fn arg_index_for_function(name: &str) -> Option<usize> {
    match name.to_lowercase().as_str() {
        "time" => None,

        // (quantile, vector)
        "histogram_quantile" | "quantile_over_time" => Some(1),

        // Aggregation operators that take a parameter before the vector,
        // e.g. `topk(5, v)` or `count_values("label", v)`.
        "topk" | "bottomk" | "quantile" | "count_values" => Some(1),

        // Every other function is driven by its first argument.
        _ => Some(0),
    }
}
/// Returns `true` when `name` (compared case-insensitively) is a function that
/// consumes a range vector and produces an instant vector.
///
/// `Function::return_value` uses this to turn a `RangeVector` argument into an
/// `InstantVector` result, and to flag any other argument kind as invalid.
fn is_aggregation_over_time(name: &str) -> bool {
    match name.to_lowercase().as_str() {
        // `<aggregation>_over_time` family
        "avg_over_time" | "min_over_time" | "max_over_time" => true,
        "sum_over_time" | "count_over_time" | "quantile_over_time" => true,
        "stddev_over_time" | "stdvar_over_time" => true,
        "absent_over_time" | "last_over_time" | "present_over_time" => true,

        // other functions that take a range vector as their first argument
        "delta" | "deriv" | "idelta" | "increase" | "predict_linear" => true,
        "irate" | "rate" | "resets" => true,
        "changes" | "holt_winters" => true,

        _ => false,
    }
}
/// Returns `true` when `name` (compared case-insensitively) is an aggregation
/// operator.
///
/// `Function::return_value` clears the label set for these when the call has
/// no explicit `by` / `without` clause.
fn is_aggregation(name: &str) -> bool {
    match name.to_lowercase().as_str() {
        "sum" | "min" | "max" | "avg" | "stddev" | "stdvar" | "count" => true,
        "count_values" | "bottomk" | "topk" | "quantile" => true,
        // `group` aggregates like the others; only its sample values differ.
        "group" => true,
        _ => false,
    }
}
impl Function {
    /// Creates a call to the function `name` with no arguments, aggregation
    /// clause, subquery or span.
    pub fn new<S: Into<String>>(name: S) -> Function {
        let name = name.into();
        Function {
            name,
            args: Vec::new(),
            aggregation: None,
            subquery: None,
            span: None,
        }
    }

    /// Replaces the function name.
    pub fn name<S: Into<String>>(self, name: S) -> Self {
        Function {
            name: name.into(),
            ..self
        }
    }

    /// Appends `arg` (boxed) to the end of the argument list.
    pub fn arg(mut self, arg: Expression) -> Self {
        let boxed = Box::new(arg);
        self.args.push(boxed);
        self
    }

    /// Attaches an aggregation clause, replacing any existing one.
    pub fn aggregation(self, aggregation: Aggregation) -> Self {
        Function {
            aggregation: Some(aggregation),
            ..self
        }
    }

    /// Removes the aggregation clause, if any.
    pub fn clear_aggregation(self) -> Self {
        Function {
            aggregation: None,
            ..self
        }
    }

    /// Attaches a subquery, replacing any existing one.
    pub fn subquery(self, subquery: Subquery) -> Self {
        Function {
            subquery: Some(subquery),
            ..self
        }
    }

    /// Removes the subquery, if any.
    pub fn clear_subquery(self) -> Self {
        Function {
            subquery: None,
            ..self
        }
    }

    /// Sets the span of the call.
    pub fn span<S: Into<Span>>(self, span: S) -> Self {
        Function {
            span: Some(span.into()),
            ..self
        }
    }

    /// Wraps this call in `Expression::Function`.
    pub fn wrap(self) -> Expression {
        Expression::Function(self)
    }

    /// Computes the kind of value this call returns and the sequence of label
    /// operations it applies.
    ///
    /// The result starts from the return value of the argument selected by
    /// `arg_index_for_function` and is then adjusted, in this order:
    ///
    /// 1. range-vector functions turn a `RangeVector` into an `InstantVector`
    ///    (any other input kind becomes an unknown kind with a message);
    /// 2. a `by` clause clears the labels and appends the listed ones, a
    ///    `without` clause removes the listed ones, and an aggregation with no
    ///    clause clears the labels;
    /// 3. `label_join` / `label_replace` append the label named by their
    ///    second argument when it is a string, and `histogram_quantile`
    ///    appends `le`;
    /// 4. a subquery turns an `InstantVector` into a `RangeVector` (any other
    ///    kind becomes an unknown kind with a message).
    ///
    /// Two cases return early: functions without a label-determining argument
    /// yield a `Scalar` with a single "clear" operation, and a missing
    /// required argument yields an unknown kind with no label operations.
    pub fn return_value(&self) -> ReturnValue {
        let index = match arg_index_for_function(&self.name) {
            Some(index) => index,
            None => {
                let clear_all = LabelSetOp::clear(self.clone().wrap(), self.span);
                return ReturnValue {
                    kind: ReturnKind::Scalar,
                    label_ops: vec![clear_all],
                };
            }
        };

        let source = match self.args.get(index) {
            Some(arg) => arg,
            None => {
                let message = format!(
                    "function call {}(...) is missing a required argument at index {}",
                    self.name, index
                );
                return ReturnValue {
                    kind: ReturnKind::unknown(message, self.clone().wrap()),
                    label_ops: Vec::new(),
                };
            }
        };

        // Everything below refines what the selected argument returns.
        let ReturnValue {
            mut kind,
            mut label_ops,
        } = source.return_value();

        if is_aggregation_over_time(&self.name) {
            kind = if let ReturnKind::RangeVector = kind {
                ReturnKind::InstantVector
            } else {
                ReturnKind::unknown(
                    format!("aggregation over time is not valid with expression returning {:?}", kind),
                    *source.clone(),
                )
            };
        }

        let clears_by_default = is_aggregation(&self.name);
        match &self.aggregation {
            Some(agg) => match agg.op {
                AggregationOp::By => {
                    // `by (...)` keeps only the listed labels: reset, then add.
                    let reset = LabelSetOp::clear(self.clone().wrap(), agg.span);
                    let keep = LabelSetOp::append(
                        self.clone().wrap(),
                        agg.span,
                        HashSet::from_iter(agg.labels.iter().cloned()),
                    );
                    label_ops.push(reset);
                    label_ops.push(keep);
                }
                AggregationOp::Without => {
                    let drop_listed = LabelSetOp::remove(
                        self.clone().wrap(),
                        agg.span,
                        HashSet::from_iter(agg.labels.iter().cloned()),
                    );
                    label_ops.push(drop_listed);
                }
            },
            None => {
                if clears_by_default {
                    label_ops.push(LabelSetOp::clear(self.clone().wrap(), self.span));
                }
            }
        }

        let lowered = self.name.to_lowercase();
        match lowered.as_str() {
            // Both functions name their destination label in the second argument.
            "label_join" | "label_replace" => {
                let destination = self.args.get(1).and_then(|expr| expr.as_str());
                if let Some(label) = destination {
                    label_ops.push(LabelSetOp::append(
                        self.clone().wrap(),
                        self.span,
                        strs_to_set(&[label]),
                    ));
                }
            }
            "histogram_quantile" => {
                label_ops.push(LabelSetOp::append(
                    self.clone().wrap(),
                    self.span,
                    strs_to_set(&["le"]),
                ));
            }
            _ => {}
        }

        if self.subquery.is_some() {
            kind = if let ReturnKind::InstantVector = kind {
                ReturnKind::RangeVector
            } else {
                ReturnKind::unknown(
                    format!("subquery on inner expression returning {:?} is invalid", kind),
                    self.clone().wrap(),
                )
            };
        }

        ReturnValue { kind, label_ops }
    }
}
impl fmt::Display for Function {
    /// Renders the call as `name(arg1, arg2, ...)`, followed by
    /// ` <aggregation>` and then `<subquery>` when those are present.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}(", self.name)?;

        // First argument bare, every later one prefixed by the separator.
        let mut remaining = self.args.iter();
        if let Some(first) = remaining.next() {
            write!(f, "{}", first)?;
            for arg in remaining {
                write!(f, ", {}", arg)?;
            }
        }
        write!(f, ")")?;

        if let Some(ref clause) = self.aggregation {
            write!(f, " {}", clause)?;
        }

        match self.subquery {
            Some(ref subquery) => write!(f, "{}", subquery),
            None => Ok(()),
        }
    }
}