influxlp_tools/builder.rs
//! A line is built by using the builder methods.
//!
//! To build a line protocol string, start by calling [LineProtocol::new] with
//! the measurement name. Afterwards you can use the different methods, e.g.
//! [LineProtocol::add_tag] or [LineProtocol::add_field], to populate the
//! data point. When you are finished, call [LineProtocol::build] to convert the
//! struct into a valid line protocol string.
8
9use std::collections::HashMap;
10
11use crate::{
12 element::{FieldKey, FieldValue, Measurement, TagKey, TagValue},
13 error::BuilderError,
14 traits::Format,
15 LineProtocol,
16};
17
18use crate::error::Result;
19
20impl LineProtocol {
21 /// Create a new [LineProtocol] for building a single data point
22 ///
23 /// # Args
24 /// * `measurement` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#measurement)
25 /// measurement name
26 pub fn new<T>(measurement: T) -> Self
27 where
28 T: Into<Measurement>,
29 {
30 Self {
31 measurement: measurement.into(),
32 tags: None,
33 fields: HashMap::new(),
34 timestamp: None,
35 }
36 }
37
38 /// Overwrite the measurement name with a new name
39 ///
40 /// # Example
41 /// ```rust
42 /// let mut line_protocol = LineProtocol::new("measurement").add_field("key", "value");
43 ///
44 /// line_protocol = line_protocol.measurement("new_measurement");
45 /// ```
46 ///
47 /// # Args
48 /// * `measurement` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#measurement)
49 /// measurement name
50 pub fn measurement<T>(mut self, measurement: T) -> Self
51 where
52 T: Into<Measurement>,
53 {
54 self.measurement = measurement.into();
55 self
56 }
57
58 /// Overwrite the measurement name with a new name
59 ///
60 /// # Example
61 /// ```rust
62 /// let mut line_protocol = LineProtocol::new("measurement").add_field("key", "value");
63 ///
64 /// line_protocol.measurement_ref("new_measurement");
65 /// ```
66 ///
67 /// # Args
68 /// * `measurement` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#measurement)
69 /// measurement name
70 pub fn measurement_ref<T>(&mut self, measurement: T)
71 where
72 T: Into<Measurement>,
73 {
74 self.measurement = measurement.into();
75 }
76
77 /// Add or update a [tag key-value pair](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#tag-set) to the data point
78 ///
79 /// This function is useful if you want to follow a builder pattern
80 ///
81 /// # Example
82 /// ```rust
83 /// let line_protocol = LineProtocol::new("measurement").add_tag("key", "value");
84 /// ```
85 ///
86 /// # Args
87 /// * `key` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
88 /// tag key
89 /// * `value` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
90 /// tag value
91 pub fn add_tag<K, V>(mut self, key: K, value: V) -> Self
92 where
93 K: Into<TagKey>,
94 V: Into<TagValue>,
95 {
96 self.tags
97 .get_or_insert(HashMap::new())
98 .insert(key.into(), value.into());
99 self
100 }
101
102 /// Add or update a [tag key-value pair](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#tag-set) to the data point
103 ///
104 /// This function is useful if you want to build a data point dynamically
105 ///
106 /// # Example
107 /// ```rust
108 /// let line_protocol = LineProtocol::new("measurement");
109 ///
110 /// for (key, value) in tags {
111 /// line_protocol.add_tag_ref(key, value);
112 /// }
113 /// ```
114 ///
115 /// # Args
116 /// * `key` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
117 /// tag key
118 /// * `value` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
119 /// tag value
120 pub fn add_tag_ref<K, V>(&mut self, key: K, value: V)
121 where
122 K: Into<TagKey>,
123 V: Into<TagValue>,
124 {
125 self.tags
126 .get_or_insert(HashMap::new())
127 .insert(key.into(), value.into());
128 }
129
130 /// Delete a tag from the data point
131 ///
132 /// # Args
133 /// * `key` - An existing [TagKey]
134 pub fn delete_tag<K>(mut self, key: K) -> Self
135 where
136 K: Into<TagKey>,
137 {
138 self.tags.get_or_insert(HashMap::new()).remove(&key.into());
139 self
140 }
141
142 /// Delete a tag from the data point
143 ///
144 /// # Args
145 /// * `key` - An existing [TagKey]
146 pub fn delete_tag_ref<K>(&mut self, key: K)
147 where
148 K: Into<TagKey>,
149 {
150 self.tags.get_or_insert(HashMap::new()).remove(&key.into());
151 }
152
153 /// Add or update a [field key-value pair](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#field-set) to the data point
154 ///
155 /// This function is useful if you want to follow a builder pattern
156 ///
157 /// # Example
158 /// ```rust
159 /// let line_protocol = LineProtocol::new("measurement").add_field("key", "value");
160 /// ```
161 ///
162 /// # Args
163 /// * `key` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
164 /// field key
165 /// * `value` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
166 /// field value
167 pub fn add_field<K, V>(mut self, key: K, value: V) -> Self
168 where
169 K: Into<FieldKey>,
170 V: Into<FieldValue>,
171 {
172 self.fields.insert(key.into(), value.into());
173 self
174 }
175
176 /// Add or update a [field key-value pair](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#field-set) to the data point
177 ///
178 /// This function is useful if you want to build a data point dynamically
179 ///
180 /// # Example
181 /// ```rust
182 /// let line_protocol = LineProtocol::new("measurement");
183 ///
184 /// for (key, value) in fields {
185 /// line_protocol.add_field_ref(key, value);
186 /// }
187 /// ```
188 ///
189 /// # Args
190 /// * `key` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
191 /// field key
192 /// * `value` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
193 /// field value
194 pub fn add_field_ref<K, V>(&mut self, key: K, value: V)
195 where
196 K: Into<FieldKey>,
197 V: Into<FieldValue>,
198 {
199 self.fields.insert(key.into(), value.into());
200 }
201
202 /// Delete a field from the data point
203 ///
204 /// # Args
205 /// * `key` - An existing [FieldKey]
206 pub fn delete_field<K>(mut self, key: K) -> Self
207 where
208 K: Into<FieldKey>,
209 {
210 self.fields.remove(&key.into());
211 self
212 }
213
214 /// Delete a field from the data point
215 ///
216 /// # Args
217 /// * `key` - An existing [FieldKey]
218 pub fn delete_field_ref<K>(&mut self, key: K)
219 where
220 K: Into<FieldKey>,
221 {
222 self.fields.remove(&key.into());
223 }
224
225 /// Set the timestamp for the data point
226 ///
227 /// It is [recommend](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#timestamp)
228 /// to set a timestamp. By default InfluxDB v2 expects the timestamp to be
229 /// in nanosecond precision. If you are using any other form of
230 /// precision it needs to be explicitly set when making the query
231 ///
232 /// # Example
233 /// ```rust
234 /// let line_protocol = LineProtocol::new("measurement");
235 /// .with_timestamp(1729270461612452700i64);
236 /// ```
237 ///
238 /// # Args
239 /// * `timestamp` - A unix timestamp
240 pub fn with_timestamp<T>(mut self, timestamp: T) -> Self
241 where
242 T: Into<i64>,
243 {
244 self.timestamp = Some(timestamp.into());
245 self
246 }
247
248 /// Set the timestamp for the data point
249 ///
250 /// It is [recommend](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#timestamp)
251 /// to set a timestamp. By default InfluxDB v2 expects the timestamp to be
252 /// in nanosecond precision. If you are using any other form of
253 /// precision it needs to be explicitly set when making the query
254 ///
255 /// # Example
256 /// ```rust
257 /// let line_protocol = LineProtocol::new("measurement");
258 /// line_protocol.with_timestamp_ref(1729270461612452700i64);
259 /// ```
260 ///
261 /// # Args
262 /// * `timestamp` - A unix timestamp
263 pub fn with_timestamp_ref<T>(&mut self, timestamp: T)
264 where
265 T: Into<i64>,
266 {
267 self.timestamp = Some(timestamp.into());
268 }
269
270 /// Delete the set timestamp
271 ///
272 /// # Example
273 /// ```rust
274 /// let mut line_protocol = LineProtocol::new("measurement")
275 /// .add_field("key", "value")
276 /// .with_timestamp_ref(1729270461612452700i64);
277 ///
278 /// line_protocol = line_protocol.delete_timestamp();
279 /// ```
280 pub fn delete_timestamp(mut self) -> Self {
281 self.timestamp = None;
282 self
283 }
284
285 /// Delete the set timestamp
286 ///
287 /// # Example
288 /// ```rust
289 /// let mut line_protocol = LineProtocol::new("measurement")
290 /// .add_field("key", "value")
291 /// .with_timestamp_ref(1729270461612452700i64);
292 ///
293 /// line_protocol.delete_timestamp();
294 /// ```
295 pub fn delete_timestamp_ref(&mut self) {
296 self.timestamp = None;
297 }
298
299 /// Builds an InfluxDB v2 data point using the previously defined
300 /// measurement name, optional tags, fields, and an optional timestamp
301 ///
302 /// In addition validation checks are performed on the individual parts
303 pub fn build(&self) -> Result<String> {
304 if self.measurement.0.is_empty() {
305 return Err(BuilderError::EmptyMeasurement.into());
306 }
307
308 if self.measurement.0.starts_with("_") {
309 return Err(BuilderError::InvalidMeasurement.into());
310 }
311
312 let mut line_protocol = format!("{}", self.measurement.escape());
313
314 if let Some(tags) = &self.tags {
315 let mut formatted_tags = Vec::new();
316 for (key, value) in tags {
317 // Influx naming restriction
318 // https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#naming-restrictions
319 if key.0.is_empty() {
320 return Err(BuilderError::EmptyTagKey.into());
321 }
322
323 if key.0.starts_with("_") {
324 return Err(BuilderError::InvalidTagKey.into());
325 }
326
327 if value.0.is_empty() {
328 return Err(BuilderError::EmptyTagValue.into());
329 }
330
331 formatted_tags.push(format!("{}={}", key.escape(), value.escape()));
332 }
333
334 // Influx best practices
335 // https://docs.influxdata.com/influxdb/v2/write-data/best-practices/optimize-writes/#sort-tags-by-key
336 formatted_tags.sort();
337 line_protocol = format!("{line_protocol},{}", formatted_tags.join(","))
338 }
339
340 let mut formatted_fields = Vec::new();
341 for (key, value) in &self.fields {
342 // Influx naming restriction
343 // https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#naming-restrictions
344 if key.0.is_empty() {
345 return Err(BuilderError::EmptyFieldKey.into());
346 }
347
348 if key.0.starts_with("_") {
349 return Err(BuilderError::InvalidFieldKey.into());
350 }
351
352 if let FieldValue::String(string) = value {
353 if string.is_empty() {
354 return Err(BuilderError::EmptyFieldValue.into());
355 }
356 }
357
358 formatted_fields.push(format!("{}={}", key.escape(), value.escape()));
359 }
360
361 if formatted_fields.is_empty() {
362 return Err(BuilderError::MissingFields.into());
363 }
364
365 formatted_fields.sort();
366 line_protocol = format!("{line_protocol} {}", formatted_fields.join(","));
367
368 if let Some(timestamp) = self.timestamp {
369 line_protocol = format!("{line_protocol} {timestamp}");
370 }
371
372 Ok(line_protocol)
373 }
374}
375
#[cfg(test)]
mod test {
    use super::*;

    /// Nanosecond precision unix timestamp used by the happy-path tests
    const TIMESTAMP: i64 = 1729270461612452700;

    /// Start a data point that only carries the measurement name
    fn point() -> LineProtocol {
        LineProtocol::new("measurement")
    }

    /// A point without tags renders as `measurement fields timestamp`
    #[test]
    fn test_builder_valid_missing_tags() {
        let built = point()
            .add_field("field", "value")
            .with_timestamp(TIMESTAMP)
            .build();
        assert!(built.is_ok());

        let line = built.unwrap();
        assert_eq!(line, "measurement field=\"value\" 1729270461612452700")
    }

    /// Tags and every supported field value kind are rendered and escaped
    #[test]
    fn test_builder_valid() {
        let expected = concat!(
            r#"measurement,tag1=value,tag2=value "#,
            r#"field1="value","#,
            r#"field2="{\"foo\": \"bar\"}","#,
            r#"field3="[\"hello\", \"world\"]","#,
            r#"field4=true,field5=10,field6=10i,field7=0.5 "#,
            r#"1729270461612452700"#,
        );

        let built = point()
            .add_tag("tag1", "value")
            .add_tag("tag2", "value")
            .add_field("field1", "value")
            .add_field("field2", "{\"foo\": \"bar\"}")
            .add_field("field3", "[\"hello\", \"world\"]")
            .add_field("field4", true)
            .add_field("field5", 10.0)
            .add_field("field6", 10)
            .add_field("field7", 0.5)
            .with_timestamp(TIMESTAMP)
            .build();
        assert!(built.is_ok());

        let line = built.unwrap();
        assert_eq!(line, expected)
    }

    /// A data point needs at least one field
    #[test]
    fn test_builder_missing_field_is_err() {
        assert!(point().build().is_err());
    }

    /// The measurement name must not be empty
    #[test]
    fn test_builder_empty_measurement_is_err() {
        let built = LineProtocol::new("").add_field("field", "value").build();
        assert!(built.is_err());
    }

    /// The measurement name must not start with an underscore
    #[test]
    fn test_builder_invalid_measurement_is_err() {
        let built = LineProtocol::new("_measurement")
            .add_field("field", "value")
            .build();
        assert!(built.is_err());
    }

    /// Tag keys must not be empty
    #[test]
    fn test_builder_empty_tag_key_is_err() {
        let built = point().add_tag("", "value").add_field("field", "value").build();
        assert!(built.is_err());
    }

    /// Tag keys must not start with an underscore
    #[test]
    fn test_builder_invalid_tag_key_is_err() {
        let built = point()
            .add_tag("_tag", "value")
            .add_field("field", "value")
            .build();
        assert!(built.is_err());
    }

    /// Tag values must not be empty
    #[test]
    fn test_builder_empty_tag_value_is_err() {
        let built = point().add_tag("key", "").add_field("field", "value").build();
        assert!(built.is_err());
    }

    /// Field keys must not be empty
    #[test]
    fn test_builder_empty_field_key_is_err() {
        let built = point().add_field("", "value").build();
        assert!(built.is_err());
    }

    /// Field keys must not start with an underscore
    #[test]
    fn test_builder_invalid_field_key_is_err() {
        let built = point()
            .add_tag("tag", "value")
            .add_field("_field", "value")
            .build();
        assert!(built.is_err());
    }

    /// String field values must not be empty
    #[test]
    fn test_builder_empty_field_value_is_err() {
        let built = point().add_field("field", "").build();
        assert!(built.is_err());
    }
}
488}