prometheus_parser/types/
function.rs

1// (C) Copyright 2019-2020 Hewlett Packard Enterprise Development LP
2
3use std::collections::HashSet;
4use std::fmt;
5use std::iter::FromIterator;
6
7use super::expression::{Expression, BExpression};
8use super::misc::{Span, Subquery};
9use super::return_value::{LabelSetOp, ReturnKind, ReturnValue, strs_to_set};
10
/// The operator of an aggregation clause: either `by` or `without`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum AggregationOp {
  /// Displayed as `by`
  By,
  /// Displayed as `without`
  Without
}

impl fmt::Display for AggregationOp {
  /// Writes the lowercase keyword for this operator (`by` or `without`).
  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    let keyword = match *self {
      AggregationOp::By => "by",
      AggregationOp::Without => "without"
    };

    f.write_str(keyword)
  }
}
26
27/// A function aggregation clause.
28///
29/// Not all functions can be aggregated, but this library currently does not
30/// attempt to validate this sort of usage.
31#[derive(Debug, PartialEq, Eq, Clone)]
32pub struct Aggregation {
33  pub op: AggregationOp,
34  pub labels: Vec<String>,
35
36  pub span: Option<Span>
37}
38
39impl Aggregation {
40  pub fn new(op: AggregationOp) -> Self {
41    Aggregation {
42      op, labels: vec![], span: None
43    }
44  }
45
46  pub fn by() -> Self {
47    Aggregation::new(AggregationOp::By)
48  }
49
50  pub fn without() -> Self {
51    Aggregation::new(AggregationOp::Without)
52  }
53
54  /// Replaces this Aggregation's operator
55  pub fn op(mut self, op: AggregationOp) -> Self {
56    self.op = op;
57    self
58  }
59
60  /// Adds a label key to this Aggregation
61  pub fn label<S: Into<String>>(mut self, label: S) -> Self {
62    self.labels.push(label.into());
63    self
64  }
65
66  /// Replaces this Aggregation's labels with the given set
67  pub fn labels(mut self, labels: &[&str]) -> Self {
68    self.labels = labels.iter().map(|l| (*l).to_string()).collect();
69    self
70  }
71
72  /// Clear this Matching's set of labels
73  pub fn clear_labels(mut self) -> Self {
74    self.labels.clear();
75    self
76  }
77
78  pub fn span<S: Into<Span>>(mut self, span: S) -> Self {
79    self.span = Some(span.into());
80    self
81  }
82}
83
84impl fmt::Display for Aggregation {
85  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
86    write!(f, "{}", self.op)?;
87
88    if !self.labels.is_empty() {
89      write!(f, " (")?;
90
91      for (i, label) in self.labels.iter().enumerate() {
92        if i > 0 {
93          write!(f, ", ")?;
94        }
95
96        write!(f, "{}", label)?;
97      }
98
99      write!(f, ")")?;
100    }
101
102    Ok(())
103  }
104}
105
106/// A function call.
107///
108/// Note that function names, arguments, argument types, and return types are
109/// not validated.
110#[derive(Debug, PartialEq, Clone)]
111pub struct Function {
112  pub name: String,
113  pub args: Vec<BExpression>,
114
115  pub aggregation: Option<Aggregation>,
116
117  pub subquery: Option<Subquery>,
118
119  pub span: Option<Span>
120}
121
/// Given a Prometheus function name, returns the index of the argument that
/// determines the set of resulting labels.
///
/// The name is compared case-insensitively (it is lowercased first).
///
/// Returns `None` for `time`, which has no such argument. For every other
/// name, including unknown ones, an index is returned:
///
/// * `1` for functions and aggregation operators whose vector argument comes
///   second, after a scalar or string parameter: `histogram_quantile`,
///   `quantile_over_time`, `topk`, `bottomk`, `quantile` and `count_values`
/// * `0` otherwise
fn arg_index_for_function(name: &str) -> Option<usize> {
  match name.to_lowercase().as_str() {
    "time" => None,

    // functions taking a scalar first and the vector second
    "histogram_quantile" | "quantile_over_time" => Some(1),

    // parameterized aggregation operators: `<op>(parameter, vector)`; using
    // index 0 here would derive the result from the parameter, not the vector
    "topk" | "bottomk" | "quantile" | "count_values" => Some(1),

    // almost all functions use the first arg (often by only accepting 1 arg)
    _ => Some(0)
  }
}
135
/// Determines if a given function converts a range vector to an instant
/// vector.
///
/// The name is compared case-insensitively (it is lowercased first). Unknown
/// names yield `false`.
///
/// This only answers the question about the kind of value produced; it says
/// nothing about what happens to labels.
fn is_aggregation_over_time(name: &str) -> bool {
  match name.to_lowercase().as_str() {
    // all the _over_time functions ...
    "avg_over_time" | "min_over_time" | "max_over_time" => true,
    "sum_over_time" | "count_over_time" | "quantile_over_time" => true,
    "stddev_over_time" | "stdvar_over_time" => true,
    "last_over_time" | "present_over_time" | "absent_over_time" => true,

    // and also the other functions that take a range vector
    "delta" | "deriv" | "idelta" | "increase" | "predict_linear" => true,
    "irate" | "rate" | "resets" => true,
    "changes" | "holt_winters" => true,

    _ => false
  }
}
154
/// Determines if a given function is an aggregation function (that is, one of
/// the aggregation operators such as `sum` or `topk`).
///
/// The name is compared case-insensitively (it is lowercased first). Unknown
/// names yield `false`.
fn is_aggregation(name: &str) -> bool {
  match name.to_lowercase().as_str() {
    "sum" | "min" | "max" | "avg" | "stddev" | "stdvar" | "count" => true,
    "count_values" | "bottomk" | "topk" | "quantile" | "group" => true,
    _ => false
  }
}
163
164impl Function {
165  pub fn new<S: Into<String>>(name: S) -> Function {
166    Function {
167      name: name.into(),
168      args: vec![],
169      aggregation: None,
170      subquery: None,
171      span: None
172    }
173  }
174
175  /// Replaces this Function's name with the given string
176  ///
177  /// Note that the new name is not validated.
178  pub fn name<S: Into<String>>(mut self, name: S) -> Self {
179    self.name = name.into();
180    self
181  }
182
183  /// Adds the given expression as new argument to this function 
184  pub fn arg(mut self, arg: Expression) -> Self {
185    self.args.push(Box::new(arg));
186    self
187  }
188
189  /// Sets this Function's aggregation clause
190  pub fn aggregation(mut self, aggregation: Aggregation) -> Self {
191    self.aggregation = Some(aggregation);
192    self
193  }
194
195  /// Clears this Function's aggregation clause, if any 
196  pub fn clear_aggregation(mut self) -> Self {
197    self.aggregation = None;
198    self
199  }
200
201  pub fn subquery(mut self, subquery: Subquery) -> Self {
202    self.subquery = Some(subquery);
203    self
204  }
205
206  pub fn clear_subquery(mut self) -> Self {
207    self.subquery = None;
208    self
209  }
210
211  pub fn span<S: Into<Span>>(mut self, span: S) -> Self {
212    self.span = Some(span.into());
213    self
214  }
215
216  /// Wraps this function in an Expression
217  pub fn wrap(self) -> Expression {
218    Expression::Function(self)
219  }
220
221  pub fn return_value(&self) -> ReturnValue {
222    // functions generally pass through labels of one of their arguments, with
223    // some exceptions
224
225    // determine the arg to pass through
226    let labels_arg_index = match arg_index_for_function(&self.name) {
227      Some(index) => index,
228
229      // only `time(...)` accepts no arguments and always returns a scalar
230      None => return ReturnValue {
231        kind: ReturnKind::Scalar,
232        label_ops: vec![LabelSetOp::clear(self.clone().wrap(), self.span)]
233      }
234    };
235
236    // if the function call is invalid, bail with unknowns
237    let labels_arg = match self.args.get(labels_arg_index) {
238      Some(arg) => arg,
239      None => return ReturnValue {
240        kind: ReturnKind::unknown(
241          format!(
242            "function call {}(...) is missing a required argument at index {}",
243            self.name, labels_arg_index
244          ),
245          self.clone().wrap()
246        ),
247
248        // we can't predict what will happen with labels, but we can at least
249        // try to output _something_
250        label_ops: vec![]
251      }
252    };
253
254    let arg_return = labels_arg.return_value();
255    
256    let mut kind = arg_return.kind;
257    let mut label_ops = arg_return.label_ops;
258
259    if is_aggregation_over_time(&self.name) {
260      if let ReturnKind::RangeVector = kind {
261        kind = ReturnKind::InstantVector;
262      } else {
263        // invalid arg
264        kind = ReturnKind::unknown(
265          format!("aggregation over time is not valid with expression returning {:?}", kind),
266
267          // show the label arg as the cause
268          // doesn't follow the usual pattern of showing the parent, but would
269          // otherwise be ambiguous with multiple args
270          *labels_arg.clone()
271        );
272      }
273    }
274
275    // aggregation functions reset labels unless they have an aggregation clause
276    // (handled below)
277    let is_agg = is_aggregation(&self.name);
278
279    // aggregations turn range vectors into instant vectors
280    if let Some(agg) = &self.aggregation {
281      match agg.op {
282        AggregationOp::By => {
283          // by (...) resets labels to only those in the by clause
284          label_ops.push(LabelSetOp::clear(self.clone().wrap(), agg.span));
285          label_ops.push(LabelSetOp::append(
286            self.clone().wrap(),
287            agg.span,
288            HashSet::from_iter(agg.labels.iter().cloned())
289          ));
290        },
291        AggregationOp::Without => label_ops.push(LabelSetOp::remove(
292          self.clone().wrap(),
293          agg.span,
294          HashSet::from_iter(agg.labels.iter().cloned()),
295        ))
296      }
297    } else if is_agg {
298      // an aggregation function with no aggregation clause clears all labels
299      label_ops.push(LabelSetOp::clear(self.clone().wrap(), self.span));
300    }
301    
302    // handle special cases with functions that mutate labels
303    match self.name.to_lowercase().as_str() {
304      // creats a new label with contents of others, leaves them intact
305      "label_join" => if let Some(expr) = self.args.get(1) {
306        if let Some(s) = expr.as_str() {
307          label_ops.push(LabelSetOp::append(
308            self.clone().wrap(),
309            self.span,
310            strs_to_set(&[s])
311          ));
312        }
313      },
314
315      // note: does not remove src_label
316      "label_replace" => if let Some(expr) = self.args.get(1) {
317        if let Some(s) = expr.as_str() {
318          label_ops.push(LabelSetOp::append(
319            self.clone().wrap(),
320            self.span,
321            strs_to_set(&[s])
322          ));
323        }
324      },
325
326      // per docs, input vector must have an `le` label
327      "histogram_quantile" => label_ops.push(LabelSetOp::append(
328        self.clone().wrap(),
329        self.span,
330        strs_to_set(&["le"])
331      )),
332
333      // "vector" => // no-op, its input expr is a scalar anyway
334
335      _ => ()
336    }
337
338    // subqueries turn instant vectors into ranges
339    if self.subquery.is_some() {
340      kind = match kind {
341        ReturnKind::InstantVector => ReturnKind::RangeVector,
342        _ => ReturnKind::unknown(
343          format!("subquery on inner expression returning {:?} is invalid", kind),
344          self.clone().wrap()
345        )
346      };
347    }
348
349    ReturnValue { kind, label_ops }
350  }
351}
352
353impl fmt::Display for Function {
354  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
355    write!(f, "{}(", self.name)?;
356    
357    for (i, arg) in self.args.iter().enumerate() {
358      if i > 0 {
359        write!(f, ", ")?;
360      }
361
362      write!(f, "{}", arg)?;
363    }
364
365    write!(f, ")")?;
366
367    if let Some(agg) = &self.aggregation {
368      write!(f, " {}", agg)?;
369    }
370
371    if let Some(subquery) = &self.subquery {
372      write!(f, "{}", subquery)?;
373    }
374
375    Ok(())
376  }
377}