rinfluxdb_flux/
querybuilder.rs

1// Copyright Claudio Mattera 2021.
2// Distributed under the MIT License or Apache 2.0 License at your option.
3// See accompanying files License-MIT.txt and License-Apache-2.0, or online at
4// https://opensource.org/licenses/MIT
5// https://opensource.org/licenses/Apache-2.0
6
7use std::fmt::Write;
8
9use rinfluxdb_types::{Duration, InstantOrDuration};
10
11use super::query::Query;
12
/// A single pipeline stage recorded by the query builder.
///
/// Stages are kept in the order they were added and are turned into Flux
/// text, one after the other, when the query is built.
#[derive(Debug)]
enum Statement {
    /// `range` stage with both bounds: start first, then stop.
    Range(InstantOrDuration, InstantOrDuration),
    /// `range` stage with only a `start` bound.
    RangeStart(InstantOrDuration),
    /// `range` stage with only a `stop` bound.
    RangeStop(InstantOrDuration),
    /// Body of a `filter` predicate, written after `(r) =>`; may span
    /// several lines.
    Filter(String),
    /// `window` stage; the duration is its `every` argument.
    Window(Duration),
    /// Name of a function that is called without arguments (e.g. `mean`).
    Aggregate(String),
    /// `duplicate` stage: the value of `column`, then the value of `as`.
    Duplicate(String, String),
    /// Windowed aggregation: the aggregate function name, then the window
    /// duration (`every`).
    AggregateWindow(String, Duration),
}
24
25/// A builder for Flux queries
26///
27/// ```
28/// # use rinfluxdb_types::Duration;
29/// # use rinfluxdb_flux::QueryBuilder;
30/// let query = QueryBuilder::from("telegraf/autogen")
31///     .range_start(Duration::Minutes(-15))
32///     .filter(
33///         r#"r._measurement == "cpu" and
34///         r._field == "usage_system" and
35///         r.cpu == "cpu-total""#
36///     )
37///     .build();
38///
39/// assert_eq!(
40///     query.as_ref(),
41///     r#"from(bucket: "telegraf/autogen")
42///   |> range(start: -15m)
43///   |> filter(fn: (r) =>
44///     r._measurement == "cpu" and
45///     r._field == "usage_system" and
46///     r.cpu == "cpu-total"
47///   )
48///   |> yield()"#,
49/// );
50/// ```
51pub struct QueryBuilder {
52    bucket: String,
53    statements: Vec<Statement>,
54}
55
56impl QueryBuilder {
57    /// Create a query selecting from a bucket.
58    pub fn from<T>(bucket: T) -> Self
59    where
60        T: Into<String>,
61    {
62        Self {
63            bucket: bucket.into(),
64            statements: vec![],
65        }
66    }
67
68    fn statement(&mut self, statement: Statement) {
69        self.statements.push(statement);
70    }
71
72    /// Restrict query results to a start time
73    pub fn range_start<T>(mut self, start: T) -> Self
74    where
75        T: Into<InstantOrDuration>,
76    {
77        self.statement(Statement::RangeStart(start.into()));
78        self
79    }
80
81    /// Restrict query results to a stop time
82    pub fn range_stop<T>(mut self, stop: T) -> Self
83    where
84        T: Into<InstantOrDuration>,
85    {
86        self.statement(Statement::RangeStop(stop.into()));
87        self
88    }
89
90    /// Restrict query results to a between two instants
91    pub fn range<T, S>(mut self, start: T, stop: S) -> Self
92    where
93        T: Into<InstantOrDuration>,
94        S: Into<InstantOrDuration>,
95    {
96        self.statement(Statement::Range(start.into(), stop.into()));
97        self
98    }
99
100    /// Add a filter to the query
101    pub fn filter<T>(mut self, filter: T) -> Self
102    where
103        T: Into<String>,
104    {
105        self.statement(Statement::Filter(filter.into()));
106        self
107    }
108
109    /// Add a window to the query
110    pub fn window<T>(mut self, every: T) -> Self
111    where
112        T: Into<Duration>,
113    {
114        self.statement(Statement::Window(every.into()));
115        self
116    }
117
118    /// Aggregate results
119    pub fn aggregate<T>(mut self, fn_: T) -> Self
120    where
121        T: Into<String>,
122    {
123        self.statement(Statement::Aggregate(fn_.into()));
124        self
125    }
126
127    /// Aggregate results using the `mean` function
128    pub fn mean(self) -> Self {
129        self.aggregate("mean")
130    }
131
132    /// Duplicate fields
133    pub fn duplicate<T, S>(mut self, column: T, as_: S) -> Self
134    where
135        T: Into<String>,
136        S: Into<String>,
137    {
138        // let statement = Statement::new("duplicate")
139        //     .argument("column", "\"".to_owned() + column.as_ref() + "\"")
140        //     .argument("as", "\"".to_owned() + as_.as_ref() + "\"");
141        self.statement(Statement::Duplicate(column.into(), as_.into()));
142        self
143    }
144
145    /// Aggregate results over a window
146    pub fn aggregate_window<T, S>(mut self, fn_: S, every: Duration) -> Self
147    where
148        T: Into<String>,
149        S: Into<String>,
150    {
151        self.statement(Statement::AggregateWindow(fn_.into(), every));
152        self
153    }
154
155    /// Create the Flux query
156    pub fn build(self) -> Query {
157        let mut result = String::new();
158
159        writeln!(&mut result, "from(bucket: \"{}\")", self.bucket).unwrap();
160
161        for statement in self.statements {
162
163            // TODO: Return error if vecs have not expected number of arguments
164            match statement {
165                Statement::Range(start, stop) => writeln!(
166                    &mut result,
167                    "  |> range(start: {}, stop: {})",
168                    start.to_string(),
169                    stop.to_string(),
170                )
171                .unwrap(),
172                Statement::RangeStart(start) => writeln!(
173                    &mut result,
174                    "  |> range(start: {})",
175                    start.to_string(),
176                )
177                .unwrap(),
178                Statement::RangeStop(stop) => writeln!(
179                    &mut result,
180                    "  |> range(stop: {})",
181                    stop.to_string(),
182                )
183                .unwrap(),
184                Statement::Filter(filter) => {
185                    writeln!(&mut result, "  |> filter(fn: (r) =>").unwrap();
186                    for line in filter.lines() {
187                        writeln!(&mut result, "    {}", line.trim_start()).unwrap();
188                    }
189                    writeln!(&mut result, "  )").unwrap();
190                }
191                Statement::Window(every) => writeln!(
192                    &mut result,
193                    "  |> window(every: {})",
194                    every.to_string(),
195                )
196                .unwrap(),
197                Statement::Aggregate(fn_) => writeln!(
198                    &mut result,
199                    "  |> {}()",
200                    fn_,
201                )
202                .unwrap(),
203                Statement::Duplicate(column, as_) => writeln!(
204                    &mut result,
205                    "  |> duplicate(column: \"{}\", as: \"{}\")",
206                    column,
207                    as_,
208                )
209                .unwrap(),
210                Statement::AggregateWindow(fn_, every) => writeln!(
211                    &mut result,
212                    "  |> aggregate_window(fn: {}, every: {})",
213                    fn_,
214                    every.to_string(),
215                )
216                .unwrap(),
217            }
218        }
219
220        write!(&mut result, "  |> yield()").unwrap();
221
222        Query::new(result)
223    }
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Multi-line predicate shared by the filter tests. The continuation
    /// lines are indented on purpose: the builder is expected to strip that
    /// indentation.
    const CPU_TOTAL_FILTER: &str = r#"r._measurement == "cpu" and
                r._field == "usage_system" and
                r.cpu == "cpu-total""#;

    /// A start-only range followed by the closing `yield()`.
    #[test]
    fn simple_query() {
        let built = QueryBuilder::from("telegraf/autogen")
            .range_start(Duration::Minutes(-15))
            .build();

        let wanted = Query::new(
            "from(bucket: \"telegraf/autogen\")
  |> range(start: -15m)
  |> yield()",
        );

        assert_eq!(built, wanted);
    }

    /// A range plus a multi-line filter whose lines get re-indented.
    #[test]
    fn simple_query_with_filter() {
        let built = QueryBuilder::from("telegraf/autogen")
            .range_start(Duration::Minutes(-15))
            .filter(CPU_TOTAL_FILTER)
            .build();

        let wanted = Query::new(
            r#"from(bucket: "telegraf/autogen")
  |> range(start: -15m)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )
  |> yield()"#,
        );

        assert_eq!(built, wanted);
    }

    /// Several stages chained together are emitted in call order.
    #[test]
    fn simple_query_with_statements() {
        let built = QueryBuilder::from("telegraf/autogen")
            .range_start(Duration::Hours(-1))
            .filter(CPU_TOTAL_FILTER)
            .window(Duration::Minutes(5))
            .mean()
            .duplicate("_stop", "_time")
            .window(Duration::Infinity)
            .build();

        let wanted = Query::new(
            r#"from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )
  |> window(every: 5m)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")
  |> window(every: inf)
  |> yield()"#,
        );

        assert_eq!(built, wanted);
    }
}