Skip to main content

influxlp_tools/
builder.rs

1//! A line is built by using the builder methods
2//!
3//! To build a line protocol string start by calling [LineProtocol::new] with
4//! the measurement name. Afterwards you can use the different methods, e.g,
5//! [LineProtocol::add_tag] or [LineProtocol::add_field] to populate the
6//! datapoint. When you are finished call [LineProtocol::build] to convert the
7//! struct into a valid line protocol string
8
9use std::collections::HashMap;
10
11use crate::{
12    element::{FieldKey, FieldValue, Measurement, TagKey, TagValue},
13    error::BuilderError,
14    traits::Format,
15    LineProtocol,
16};
17
18use crate::error::Result;
19
20impl LineProtocol {
21    /// Create a new [LineProtocol] for building a single data point
22    ///
23    /// # Args
24    /// * `measurement` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#measurement)
25    ///   measurement name
26    pub fn new<T>(measurement: T) -> Self
27    where
28        T: Into<Measurement>,
29    {
30        Self {
31            measurement: measurement.into(),
32            tags: None,
33            fields: HashMap::new(),
34            timestamp: None,
35        }
36    }
37
38    /// Overwrite the measurement name with a new name
39    ///
40    /// # Example
41    /// ```rust
42    /// let mut line_protocol = LineProtocol::new("measurement").add_field("key", "value");
43    ///
44    /// line_protocol = line_protocol.measurement("new_measurement");
45    /// ```
46    ///
47    /// # Args
48    /// * `measurement` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#measurement)
49    ///   measurement name
50    pub fn measurement<T>(mut self, measurement: T) -> Self
51    where
52        T: Into<Measurement>,
53    {
54        self.measurement = measurement.into();
55        self
56    }
57
58    /// Overwrite the measurement name with a new name
59    ///
60    /// # Example
61    /// ```rust
62    /// let mut line_protocol = LineProtocol::new("measurement").add_field("key", "value");
63    ///
64    /// line_protocol.measurement_ref("new_measurement");
65    /// ```
66    ///
67    /// # Args
68    /// * `measurement` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#measurement)
69    ///   measurement name
70    pub fn measurement_ref<T>(&mut self, measurement: T)
71    where
72        T: Into<Measurement>,
73    {
74        self.measurement = measurement.into();
75    }
76
77    /// Add or update a [tag key-value pair](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#tag-set) to the data point
78    ///
79    /// This function is useful if you want to follow a builder pattern
80    ///
81    /// # Example
82    /// ```rust
83    /// let line_protocol = LineProtocol::new("measurement").add_tag("key", "value");
84    /// ```
85    ///
86    /// # Args
87    /// * `key` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
88    ///   tag key
89    /// * `value` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
90    ///   tag value
91    pub fn add_tag<K, V>(mut self, key: K, value: V) -> Self
92    where
93        K: Into<TagKey>,
94        V: Into<TagValue>,
95    {
96        self.tags
97            .get_or_insert(HashMap::new())
98            .insert(key.into(), value.into());
99        self
100    }
101
102    /// Add or update a [tag key-value pair](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#tag-set) to the data point
103    ///
104    /// This function is useful if you want to build a data point dynamically
105    ///
106    /// # Example
107    /// ```rust
108    /// let line_protocol = LineProtocol::new("measurement");
109    ///
110    /// for (key, value) in tags {
111    ///     line_protocol.add_tag_ref(key, value);
112    /// }
113    /// ```
114    ///
115    /// # Args
116    /// * `key` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
117    ///   tag key
118    /// * `value` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
119    ///   tag value
120    pub fn add_tag_ref<K, V>(&mut self, key: K, value: V)
121    where
122        K: Into<TagKey>,
123        V: Into<TagValue>,
124    {
125        self.tags
126            .get_or_insert(HashMap::new())
127            .insert(key.into(), value.into());
128    }
129
130    /// Delete a tag from the data point
131    ///
132    /// # Args
133    /// * `key` - An existing [TagKey]
134    pub fn delete_tag<K>(mut self, key: K) -> Self
135    where
136        K: Into<TagKey>,
137    {
138        self.tags.get_or_insert(HashMap::new()).remove(&key.into());
139        self
140    }
141
142    /// Delete a tag from the data point
143    ///
144    /// # Args
145    /// * `key` - An existing [TagKey]
146    pub fn delete_tag_ref<K>(&mut self, key: K)
147    where
148        K: Into<TagKey>,
149    {
150        self.tags.get_or_insert(HashMap::new()).remove(&key.into());
151    }
152
153    /// Add or update a [field key-value pair](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#field-set) to the data point
154    ///
155    /// This function is useful if you want to follow a builder pattern
156    ///
157    /// # Example
158    /// ```rust
159    /// let line_protocol = LineProtocol::new("measurement").add_field("key", "value");
160    /// ```
161    ///
162    /// # Args
163    /// * `key` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
164    ///   field key
165    /// * `value` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
166    ///   field value
167    pub fn add_field<K, V>(mut self, key: K, value: V) -> Self
168    where
169        K: Into<FieldKey>,
170        V: Into<FieldValue>,
171    {
172        self.fields.insert(key.into(), value.into());
173        self
174    }
175
176    /// Add or update a [field key-value pair](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#field-set) to the data point
177    ///
178    /// This function is useful if you want to build a data point dynamically
179    ///
180    /// # Example
181    /// ```rust
182    /// let line_protocol = LineProtocol::new("measurement");
183    ///
184    /// for (key, value) in fields {
185    ///     line_protocol.add_field_ref(key, value);
186    /// }
187    /// ```
188    ///
189    /// # Args
190    /// * `key` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
191    ///   field key
192    /// * `value` - A [valid](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters)
193    ///   field value
194    pub fn add_field_ref<K, V>(&mut self, key: K, value: V)
195    where
196        K: Into<FieldKey>,
197        V: Into<FieldValue>,
198    {
199        self.fields.insert(key.into(), value.into());
200    }
201
202    /// Delete a field from the data point
203    ///
204    /// # Args
205    /// * `key` - An existing [FieldKey]
206    pub fn delete_field<K>(mut self, key: K) -> Self
207    where
208        K: Into<FieldKey>,
209    {
210        self.fields.remove(&key.into());
211        self
212    }
213
214    /// Delete a field from the data point
215    ///
216    /// # Args
217    /// * `key` - An existing [FieldKey]
218    pub fn delete_field_ref<K>(&mut self, key: K)
219    where
220        K: Into<FieldKey>,
221    {
222        self.fields.remove(&key.into());
223    }
224
225    /// Set the timestamp for the data point
226    ///
227    /// It is [recommend](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#timestamp)
228    /// to set a timestamp. By default InfluxDB v2 expects the timestamp to be
229    /// in nanosecond precision. If you are using any other form of
230    /// precision it needs to be explicitly set when making the query
231    ///
232    /// # Example
233    /// ```rust
234    /// let line_protocol = LineProtocol::new("measurement");
235    ///     .with_timestamp(1729270461612452700i64);
236    /// ```
237    ///
238    /// # Args
239    /// * `timestamp` - A unix timestamp
240    pub fn with_timestamp<T>(mut self, timestamp: T) -> Self
241    where
242        T: Into<i64>,
243    {
244        self.timestamp = Some(timestamp.into());
245        self
246    }
247
248    /// Set the timestamp for the data point
249    ///
250    /// It is [recommend](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#timestamp)
251    /// to set a timestamp. By default InfluxDB v2 expects the timestamp to be
252    /// in nanosecond precision. If you are using any other form of
253    /// precision it needs to be explicitly set when making the query
254    ///
255    /// # Example
256    /// ```rust
257    /// let line_protocol = LineProtocol::new("measurement");
258    /// line_protocol.with_timestamp_ref(1729270461612452700i64);
259    /// ```
260    ///
261    /// # Args
262    /// * `timestamp` - A unix timestamp
263    pub fn with_timestamp_ref<T>(&mut self, timestamp: T)
264    where
265        T: Into<i64>,
266    {
267        self.timestamp = Some(timestamp.into());
268    }
269
270    /// Delete the set timestamp
271    ///
272    /// # Example
273    /// ```rust
274    /// let mut line_protocol = LineProtocol::new("measurement")
275    ///     .add_field("key", "value")
276    ///     .with_timestamp_ref(1729270461612452700i64);
277    ///
278    /// line_protocol = line_protocol.delete_timestamp();
279    /// ```
280    pub fn delete_timestamp(mut self) -> Self {
281        self.timestamp = None;
282        self
283    }
284
285    /// Delete the set timestamp
286    ///
287    /// # Example
288    /// ```rust
289    /// let mut line_protocol = LineProtocol::new("measurement")
290    ///     .add_field("key", "value")
291    ///     .with_timestamp_ref(1729270461612452700i64);
292    ///
293    /// line_protocol.delete_timestamp();
294    /// ```
295    pub fn delete_timestamp_ref(&mut self) {
296        self.timestamp = None;
297    }
298
299    /// Builds an InfluxDB v2 data point using the previously defined
300    /// measurement name, optional tags, fields, and an optional timestamp
301    ///
302    /// In addition validation checks are performed on the individual parts
303    pub fn build(&self) -> Result<String> {
304        if self.measurement.0.is_empty() {
305            return Err(BuilderError::EmptyMeasurement.into());
306        }
307
308        if self.measurement.0.starts_with("_") {
309            return Err(BuilderError::InvalidMeasurement.into());
310        }
311
312        let mut line_protocol = format!("{}", self.measurement.escape());
313
314        if let Some(tags) = &self.tags {
315            let mut formatted_tags = Vec::new();
316            for (key, value) in tags {
317                // Influx naming restriction
318                // https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#naming-restrictions
319                if key.0.is_empty() {
320                    return Err(BuilderError::EmptyTagKey.into());
321                }
322
323                if key.0.starts_with("_") {
324                    return Err(BuilderError::InvalidTagKey.into());
325                }
326
327                if value.0.is_empty() {
328                    return Err(BuilderError::EmptyTagValue.into());
329                }
330
331                formatted_tags.push(format!("{}={}", key.escape(), value.escape()));
332            }
333
334            // Influx best practices
335            // https://docs.influxdata.com/influxdb/v2/write-data/best-practices/optimize-writes/#sort-tags-by-key
336            formatted_tags.sort();
337            line_protocol = format!("{line_protocol},{}", formatted_tags.join(","))
338        }
339
340        let mut formatted_fields = Vec::new();
341        for (key, value) in &self.fields {
342            // Influx naming restriction
343            // https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#naming-restrictions
344            if key.0.is_empty() {
345                return Err(BuilderError::EmptyFieldKey.into());
346            }
347
348            if key.0.starts_with("_") {
349                return Err(BuilderError::InvalidFieldKey.into());
350            }
351
352            if let FieldValue::String(string) = value {
353                if string.is_empty() {
354                    return Err(BuilderError::EmptyFieldValue.into());
355                }
356            }
357
358            formatted_fields.push(format!("{}={}", key.escape(), value.escape()));
359        }
360
361        if formatted_fields.is_empty() {
362            return Err(BuilderError::MissingFields.into());
363        }
364
365        formatted_fields.sort();
366        line_protocol = format!("{line_protocol} {}", formatted_fields.join(","));
367
368        if let Some(timestamp) = self.timestamp {
369            line_protocol = format!("{line_protocol} {timestamp}");
370        }
371
372        Ok(line_protocol)
373    }
374}
375
376#[cfg(test)]
377mod test {
378    use super::*;
379
380    #[test]
381    fn test_builder_valid_missing_tags() {
382        let result = LineProtocol::new("measurement")
383            .add_field("field", "value")
384            .with_timestamp(1729270461612452700i64)
385            .build();
386        assert!(result.is_ok());
387
388        let line = result.unwrap();
389        assert_eq!(line, "measurement field=\"value\" 1729270461612452700")
390    }
391
392    #[test]
393    fn test_builder_valid() {
394        let result = LineProtocol::new("measurement")
395            .add_tag("tag1", "value")
396            .add_tag("tag2", "value")
397            .add_field("field1", "value")
398            .add_field("field2", "{\"foo\": \"bar\"}")
399            .add_field("field3", "[\"hello\", \"world\"]")
400            .add_field("field4", true)
401            .add_field("field5", 10.0)
402            .add_field("field6", 10)
403            .add_field("field7", 0.5)
404            .with_timestamp(1729270461612452700i64)
405            .build();
406        assert!(result.is_ok());
407
408        let line = result.unwrap();
409        assert_eq!(
410            line,
411            "measurement,tag1=value,tag2=value field1=\"value\",field2=\"{\\\"foo\\\": \
412             \\\"bar\\\"}\",field3=\"[\\\"hello\\\", \
413             \\\"world\\\"]\",field4=true,field5=10,field6=10i,field7=0.5 1729270461612452700"
414        )
415    }
416
417    #[test]
418    fn test_builder_missing_field_is_err() {
419        let result = LineProtocol::new("measurement").build();
420        assert!(result.is_err());
421    }
422
423    #[test]
424    fn test_builder_empty_measurement_is_err() {
425        let result = LineProtocol::new("").add_field("field", "value").build();
426        assert!(result.is_err());
427    }
428
429    #[test]
430    fn test_builder_invalid_measurement_is_err() {
431        let result = LineProtocol::new("_measurement")
432            .add_field("field", "value")
433            .build();
434        assert!(result.is_err());
435    }
436
437    #[test]
438    fn test_builder_empty_tag_key_is_err() {
439        let result = LineProtocol::new("measurement")
440            .add_tag("", "value")
441            .add_field("field", "value")
442            .build();
443        assert!(result.is_err());
444    }
445
446    #[test]
447    fn test_builder_invalid_tag_key_is_err() {
448        let result = LineProtocol::new("measurement")
449            .add_tag("_tag", "value")
450            .add_field("field", "value")
451            .build();
452        assert!(result.is_err());
453    }
454
455    #[test]
456    fn test_builder_empty_tag_value_is_err() {
457        let result = LineProtocol::new("measurement")
458            .add_tag("key", "")
459            .add_field("field", "value")
460            .build();
461        assert!(result.is_err());
462    }
463
464    #[test]
465    fn test_builder_empty_field_key_is_err() {
466        let result = LineProtocol::new("measurement")
467            .add_field("", "value")
468            .build();
469        assert!(result.is_err());
470    }
471
472    #[test]
473    fn test_builder_invalid_field_key_is_err() {
474        let result = LineProtocol::new("measurement")
475            .add_tag("tag", "value")
476            .add_field("_field", "value")
477            .build();
478        assert!(result.is_err());
479    }
480
481    #[test]
482    fn test_builder_empty_field_value_is_err() {
483        let result = LineProtocol::new("measurement")
484            .add_field("field", "")
485            .build();
486        assert!(result.is_err());
487    }
488}