1use std::fmt::Write;
8
9use rinfluxdb_types::{Duration, InstantOrDuration};
10
11use super::query::Query;
12
13#[derive(Debug)]
14enum Statement {
15 Range(InstantOrDuration, InstantOrDuration),
16 RangeStart(InstantOrDuration),
17 RangeStop(InstantOrDuration),
18 Filter(String),
19 Window(Duration),
20 Aggregate(String),
21 Duplicate(String, String),
22 AggregateWindow(String, Duration),
23}
24
25pub struct QueryBuilder {
52 bucket: String,
53 statements: Vec<Statement>,
54}
55
56impl QueryBuilder {
57 pub fn from<T>(bucket: T) -> Self
59 where
60 T: Into<String>,
61 {
62 Self {
63 bucket: bucket.into(),
64 statements: vec![],
65 }
66 }
67
68 fn statement(&mut self, statement: Statement) {
69 self.statements.push(statement);
70 }
71
72 pub fn range_start<T>(mut self, start: T) -> Self
74 where
75 T: Into<InstantOrDuration>,
76 {
77 self.statement(Statement::RangeStart(start.into()));
78 self
79 }
80
81 pub fn range_stop<T>(mut self, stop: T) -> Self
83 where
84 T: Into<InstantOrDuration>,
85 {
86 self.statement(Statement::RangeStop(stop.into()));
87 self
88 }
89
90 pub fn range<T, S>(mut self, start: T, stop: S) -> Self
92 where
93 T: Into<InstantOrDuration>,
94 S: Into<InstantOrDuration>,
95 {
96 self.statement(Statement::Range(start.into(), stop.into()));
97 self
98 }
99
100 pub fn filter<T>(mut self, filter: T) -> Self
102 where
103 T: Into<String>,
104 {
105 self.statement(Statement::Filter(filter.into()));
106 self
107 }
108
109 pub fn window<T>(mut self, every: T) -> Self
111 where
112 T: Into<Duration>,
113 {
114 self.statement(Statement::Window(every.into()));
115 self
116 }
117
118 pub fn aggregate<T>(mut self, fn_: T) -> Self
120 where
121 T: Into<String>,
122 {
123 self.statement(Statement::Aggregate(fn_.into()));
124 self
125 }
126
127 pub fn mean(self) -> Self {
129 self.aggregate("mean")
130 }
131
132 pub fn duplicate<T, S>(mut self, column: T, as_: S) -> Self
134 where
135 T: Into<String>,
136 S: Into<String>,
137 {
138 self.statement(Statement::Duplicate(column.into(), as_.into()));
142 self
143 }
144
145 pub fn aggregate_window<T, S>(mut self, fn_: S, every: Duration) -> Self
147 where
148 T: Into<String>,
149 S: Into<String>,
150 {
151 self.statement(Statement::AggregateWindow(fn_.into(), every));
152 self
153 }
154
155 pub fn build(self) -> Query {
157 let mut result = String::new();
158
159 writeln!(&mut result, "from(bucket: \"{}\")", self.bucket).unwrap();
160
161 for statement in self.statements {
162
163 match statement {
165 Statement::Range(start, stop) => writeln!(
166 &mut result,
167 " |> range(start: {}, stop: {})",
168 start.to_string(),
169 stop.to_string(),
170 )
171 .unwrap(),
172 Statement::RangeStart(start) => writeln!(
173 &mut result,
174 " |> range(start: {})",
175 start.to_string(),
176 )
177 .unwrap(),
178 Statement::RangeStop(stop) => writeln!(
179 &mut result,
180 " |> range(stop: {})",
181 stop.to_string(),
182 )
183 .unwrap(),
184 Statement::Filter(filter) => {
185 writeln!(&mut result, " |> filter(fn: (r) =>").unwrap();
186 for line in filter.lines() {
187 writeln!(&mut result, " {}", line.trim_start()).unwrap();
188 }
189 writeln!(&mut result, " )").unwrap();
190 }
191 Statement::Window(every) => writeln!(
192 &mut result,
193 " |> window(every: {})",
194 every.to_string(),
195 )
196 .unwrap(),
197 Statement::Aggregate(fn_) => writeln!(
198 &mut result,
199 " |> {}()",
200 fn_,
201 )
202 .unwrap(),
203 Statement::Duplicate(column, as_) => writeln!(
204 &mut result,
205 " |> duplicate(column: \"{}\", as: \"{}\")",
206 column,
207 as_,
208 )
209 .unwrap(),
210 Statement::AggregateWindow(fn_, every) => writeln!(
211 &mut result,
212 " |> aggregate_window(fn: {}, every: {})",
213 fn_,
214 every.to_string(),
215 )
216 .unwrap(),
217 }
218 }
219
220 write!(&mut result, " |> yield()").unwrap();
221
222 Query::new(result)
223 }
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Multi-line predicate shared by the filter tests. The continuation
    /// lines deliberately start with whitespace.
    const CPU_FILTER: &str = r#"r._measurement == "cpu" and
 r._field == "usage_system" and
 r.cpu == "cpu-total""#;

    /// A lone `range_start` produces the `from`, `range` and `yield` lines.
    #[test]
    fn simple_query() {
        let built = QueryBuilder::from("telegraf/autogen")
            .range_start(Duration::Minutes(-15))
            .build();

        let wanted = Query::new(
            "from(bucket: \"telegraf/autogen\")
 |> range(start: -15m)
 |> yield()",
        );

        assert_eq!(built, wanted);
    }

    /// A multi-line filter is emitted line by line between the `filter(`
    /// header and the closing parenthesis.
    #[test]
    fn simple_query_with_filter() {
        let built = QueryBuilder::from("telegraf/autogen")
            .range_start(Duration::Minutes(-15))
            .filter(CPU_FILTER)
            .build();

        let wanted = Query::new(
            r#"from(bucket: "telegraf/autogen")
 |> range(start: -15m)
 |> filter(fn: (r) =>
 r._measurement == "cpu" and
 r._field == "usage_system" and
 r.cpu == "cpu-total"
 )
 |> yield()"#,
        );

        assert_eq!(built, wanted);
    }

    /// Several stages are rendered in the order they were added.
    #[test]
    fn simple_query_with_statements() {
        let built = QueryBuilder::from("telegraf/autogen")
            .range_start(Duration::Hours(-1))
            .filter(CPU_FILTER)
            .window(Duration::Minutes(5))
            .mean()
            .duplicate("_stop", "_time")
            .window(Duration::Infinity)
            .build();

        let wanted = Query::new(
            r#"from(bucket: "telegraf/autogen")
 |> range(start: -1h)
 |> filter(fn: (r) =>
 r._measurement == "cpu" and
 r._field == "usage_system" and
 r.cpu == "cpu-total"
 )
 |> window(every: 5m)
 |> mean()
 |> duplicate(column: "_stop", as: "_time")
 |> window(every: inf)
 |> yield()"#,
        );

        assert_eq!(built, wanted);
    }
}