1use std::collections::HashSet;
4use std::fmt;
5use std::iter::FromIterator;
6
7use super::expression::{Expression, BExpression};
8use super::misc::{Span, Subquery};
9use super::return_value::{LabelSetOp, ReturnKind, ReturnValue, strs_to_set};
10
/// Grouping modifier that can accompany a function call.
///
/// Its `Display` implementation renders the variants as the lowercase
/// keywords `by` and `without`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum AggregationOp {
    /// The `by` modifier.
    By,
    /// The `without` modifier.
    Without,
}
17
18impl fmt::Display for AggregationOp {
19 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
20 match self {
21 AggregationOp::By => write!(f, "by"),
22 AggregationOp::Without => write!(f, "without")
23 }
24 }
25}
26
27#[derive(Debug, PartialEq, Eq, Clone)]
32pub struct Aggregation {
33 pub op: AggregationOp,
34 pub labels: Vec<String>,
35
36 pub span: Option<Span>
37}
38
39impl Aggregation {
40 pub fn new(op: AggregationOp) -> Self {
41 Aggregation {
42 op, labels: vec![], span: None
43 }
44 }
45
46 pub fn by() -> Self {
47 Aggregation::new(AggregationOp::By)
48 }
49
50 pub fn without() -> Self {
51 Aggregation::new(AggregationOp::Without)
52 }
53
54 pub fn op(mut self, op: AggregationOp) -> Self {
56 self.op = op;
57 self
58 }
59
60 pub fn label<S: Into<String>>(mut self, label: S) -> Self {
62 self.labels.push(label.into());
63 self
64 }
65
66 pub fn labels(mut self, labels: &[&str]) -> Self {
68 self.labels = labels.iter().map(|l| (*l).to_string()).collect();
69 self
70 }
71
72 pub fn clear_labels(mut self) -> Self {
74 self.labels.clear();
75 self
76 }
77
78 pub fn span<S: Into<Span>>(mut self, span: S) -> Self {
79 self.span = Some(span.into());
80 self
81 }
82}
83
84impl fmt::Display for Aggregation {
85 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
86 write!(f, "{}", self.op)?;
87
88 if !self.labels.is_empty() {
89 write!(f, " (")?;
90
91 for (i, label) in self.labels.iter().enumerate() {
92 if i > 0 {
93 write!(f, ", ")?;
94 }
95
96 write!(f, "{}", label)?;
97 }
98
99 write!(f, ")")?;
100 }
101
102 Ok(())
103 }
104}
105
106#[derive(Debug, PartialEq, Clone)]
111pub struct Function {
112 pub name: String,
113 pub args: Vec<BExpression>,
114
115 pub aggregation: Option<Aggregation>,
116
117 pub subquery: Option<Subquery>,
118
119 pub span: Option<Span>
120}
121
/// Returns the index of the argument whose return value determines the
/// result of calling the function `name`, or `None` when the function takes
/// no such argument.
///
/// The name is matched case-insensitively.
///
/// * `time` and `pi` take no arguments at all, so there is nothing to inspect.
/// * `histogram_quantile` and `quantile_over_time` take a scalar first, so
///   the vector is the second argument.
/// * The parameterised aggregation operators `topk`, `bottomk`, `quantile`
///   and `count_values` are written `<op>(<parameter>, <vector>)`, so their
///   vector is also the second argument. Treating the parameter as the
///   vector would make e.g. `topk(5, foo)` look like a scalar expression.
/// * Everything else uses its first argument.
fn arg_index_for_function(name: &str) -> Option<usize> {
    match name.to_lowercase().as_str() {
        // zero-argument functions
        "time" | "pi" => None,

        // scalar parameter first, vector second
        "histogram_quantile" | "quantile_over_time" => Some(1),

        // aggregation operators with a leading parameter
        "topk" | "bottomk" | "quantile" | "count_values" => Some(1),

        // almost everything else operates on its first argument
        _ => Some(0),
    }
}
135
/// Reports whether the function `name` consumes a range vector and yields an
/// instant vector.
///
/// The name is matched case-insensitively. `Function::return_value` uses
/// this to turn a range-vector argument into an instant-vector result, and
/// to flag any other argument kind as invalid.
///
/// Besides the `*_over_time` family this covers the rate/delta style
/// functions, plus `changes`, `holt_winters`, `absent_over_time`,
/// `last_over_time` and `present_over_time`, which likewise take a range
/// vector and would otherwise be left typed as range vectors.
fn is_aggregation_over_time(name: &str) -> bool {
    match name.to_lowercase().as_str() {
        // the `_over_time` family
        "avg_over_time" | "min_over_time" | "max_over_time" => true,
        "sum_over_time" | "count_over_time" | "quantile_over_time" => true,
        "stddev_over_time" | "stdvar_over_time" => true,
        "last_over_time" | "present_over_time" | "absent_over_time" => true,

        // other functions that take a range vector
        "delta" | "deriv" | "idelta" | "increase" | "predict_linear" => true,
        "irate" | "rate" | "resets" => true,
        "changes" | "holt_winters" => true,

        _ => false,
    }
}
154
/// Reports whether `name` is an aggregation operator.
///
/// The name is matched case-insensitively. `Function::return_value` uses
/// this to clear the label set of a call that has no explicit
/// `by` / `without` clause.
///
/// `group` is included alongside the other operators so that a bare
/// `group(...)` clears labels the same way `sum(...)` or `count(...)` do.
fn is_aggregation(name: &str) -> bool {
    match name.to_lowercase().as_str() {
        "sum" | "min" | "max" | "avg" | "group" => true,
        "stddev" | "stdvar" | "count" => true,
        "count_values" | "bottomk" | "topk" | "quantile" => true,
        _ => false,
    }
}
163
164impl Function {
165 pub fn new<S: Into<String>>(name: S) -> Function {
166 Function {
167 name: name.into(),
168 args: vec![],
169 aggregation: None,
170 subquery: None,
171 span: None
172 }
173 }
174
175 pub fn name<S: Into<String>>(mut self, name: S) -> Self {
179 self.name = name.into();
180 self
181 }
182
183 pub fn arg(mut self, arg: Expression) -> Self {
185 self.args.push(Box::new(arg));
186 self
187 }
188
189 pub fn aggregation(mut self, aggregation: Aggregation) -> Self {
191 self.aggregation = Some(aggregation);
192 self
193 }
194
195 pub fn clear_aggregation(mut self) -> Self {
197 self.aggregation = None;
198 self
199 }
200
201 pub fn subquery(mut self, subquery: Subquery) -> Self {
202 self.subquery = Some(subquery);
203 self
204 }
205
206 pub fn clear_subquery(mut self) -> Self {
207 self.subquery = None;
208 self
209 }
210
211 pub fn span<S: Into<Span>>(mut self, span: S) -> Self {
212 self.span = Some(span.into());
213 self
214 }
215
216 pub fn wrap(self) -> Expression {
218 Expression::Function(self)
219 }
220
221 pub fn return_value(&self) -> ReturnValue {
222 let labels_arg_index = match arg_index_for_function(&self.name) {
227 Some(index) => index,
228
229 None => return ReturnValue {
231 kind: ReturnKind::Scalar,
232 label_ops: vec![LabelSetOp::clear(self.clone().wrap(), self.span)]
233 }
234 };
235
236 let labels_arg = match self.args.get(labels_arg_index) {
238 Some(arg) => arg,
239 None => return ReturnValue {
240 kind: ReturnKind::unknown(
241 format!(
242 "function call {}(...) is missing a required argument at index {}",
243 self.name, labels_arg_index
244 ),
245 self.clone().wrap()
246 ),
247
248 label_ops: vec![]
251 }
252 };
253
254 let arg_return = labels_arg.return_value();
255
256 let mut kind = arg_return.kind;
257 let mut label_ops = arg_return.label_ops;
258
259 if is_aggregation_over_time(&self.name) {
260 if let ReturnKind::RangeVector = kind {
261 kind = ReturnKind::InstantVector;
262 } else {
263 kind = ReturnKind::unknown(
265 format!("aggregation over time is not valid with expression returning {:?}", kind),
266
267 *labels_arg.clone()
271 );
272 }
273 }
274
275 let is_agg = is_aggregation(&self.name);
278
279 if let Some(agg) = &self.aggregation {
281 match agg.op {
282 AggregationOp::By => {
283 label_ops.push(LabelSetOp::clear(self.clone().wrap(), agg.span));
285 label_ops.push(LabelSetOp::append(
286 self.clone().wrap(),
287 agg.span,
288 HashSet::from_iter(agg.labels.iter().cloned())
289 ));
290 },
291 AggregationOp::Without => label_ops.push(LabelSetOp::remove(
292 self.clone().wrap(),
293 agg.span,
294 HashSet::from_iter(agg.labels.iter().cloned()),
295 ))
296 }
297 } else if is_agg {
298 label_ops.push(LabelSetOp::clear(self.clone().wrap(), self.span));
300 }
301
302 match self.name.to_lowercase().as_str() {
304 "label_join" => if let Some(expr) = self.args.get(1) {
306 if let Some(s) = expr.as_str() {
307 label_ops.push(LabelSetOp::append(
308 self.clone().wrap(),
309 self.span,
310 strs_to_set(&[s])
311 ));
312 }
313 },
314
315 "label_replace" => if let Some(expr) = self.args.get(1) {
317 if let Some(s) = expr.as_str() {
318 label_ops.push(LabelSetOp::append(
319 self.clone().wrap(),
320 self.span,
321 strs_to_set(&[s])
322 ));
323 }
324 },
325
326 "histogram_quantile" => label_ops.push(LabelSetOp::append(
328 self.clone().wrap(),
329 self.span,
330 strs_to_set(&["le"])
331 )),
332
333 _ => ()
336 }
337
338 if self.subquery.is_some() {
340 kind = match kind {
341 ReturnKind::InstantVector => ReturnKind::RangeVector,
342 _ => ReturnKind::unknown(
343 format!("subquery on inner expression returning {:?} is invalid", kind),
344 self.clone().wrap()
345 )
346 };
347 }
348
349 ReturnValue { kind, label_ops }
350 }
351}
352
353impl fmt::Display for Function {
354 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
355 write!(f, "{}(", self.name)?;
356
357 for (i, arg) in self.args.iter().enumerate() {
358 if i > 0 {
359 write!(f, ", ")?;
360 }
361
362 write!(f, "{}", arg)?;
363 }
364
365 write!(f, ")")?;
366
367 if let Some(agg) = &self.aggregation {
368 write!(f, " {}", agg)?;
369 }
370
371 if let Some(subquery) = &self.subquery {
372 write!(f, "{}", subquery)?;
373 }
374
375 Ok(())
376 }
377}