influxdb_line_protocol/
builder.rs

1//! Typestate [line protocol] builder.
2//!
3//! [line protocol]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol
4//! [special characters]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters
5use bytes::BufMut;
6use std::{
7    fmt::{self},
8    marker::PhantomData,
9};
10
// Sets of characters that must be backslash-escaped in the various parts of a line, following
// https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters
// The backslash itself is not listed: `escape` always escapes it in addition to these.

/// Escaped in tag keys, tag values and field keys.
const COMMA_EQ_SPACE: [char; 3] = [',', '=', ' '];
/// Escaped in measurement names.
const COMMA_SPACE: [char; 2] = [',', ' '];
/// Escaped inside double-quoted string field values.
const DOUBLE_QUOTE: [char; 1] = ['"'];
15
// Zero-sized typestate markers. Each one names the position of the builder within the line
// currently being written; `LineProtocolBuilder` only exposes the methods that are legal in
// that position. They are `pub` because they appear in public method signatures, but they
// are hidden from the rendered docs.

/// No line in progress: the next call must start one with a measurement (or build the batch).
#[doc(hidden)]
#[derive(Clone, Copy, Debug, Default)]
pub struct BeforeMeasurement;

/// Measurement written: tags, or the first field, may follow.
#[doc(hidden)]
#[derive(Clone, Copy, Debug)]
pub struct AfterMeasurement;

/// NOTE(review): not referenced by any builder method visible in this file; kept as public API.
#[doc(hidden)]
#[derive(Clone, Copy, Debug)]
pub struct AfterTag;

/// At least one field written: more fields, a timestamp, or the end of the line may follow.
#[doc(hidden)]
#[derive(Clone, Copy, Debug)]
pub struct AfterField;

/// Timestamp written: only closing the line is allowed.
#[doc(hidden)]
#[derive(Clone, Copy, Debug)]
pub struct AfterTimestamp;
31
32/// Implements a [line protocol] builder.
33///
34/// A [`LineProtocolBuilder`] is a statically-typed InfluxDB [line protocol] builder.
35/// It writes one or more lines of [line protocol] to a [`bytes::BufMut`].
36///
37/// ```
38/// use influxdb_line_protocol::LineProtocolBuilder;
39/// let lp = LineProtocolBuilder::new()
40///     .measurement("foo")
41///     .tag("bar", "baz")
42///     .field("qux", 42.0)
43///     .close_line();
44///
45/// assert_eq!(lp.build(), b"foo,bar=baz qux=42\n");
46/// ```
47///
48/// [`LineProtocolBuilder`] never returns runtime errors. Instead, it employs a type-level state machine
49/// to guarantee that users can't build a syntactically-malformed line protocol batch.
50///
51/// This builder does not check for semantic errors. In particular, it does not check for duplicate tag and field
52/// names, nor it does enforce [naming restrictions] on keys.
53///
54/// Attempts to consume the line protocol before closing a line yield
55/// compile-time errors:
56///
57/// ```compile_fail
58/// # use influxdb_line_protocol::LineProtocolBuilder;
59/// let lp = LineProtocolBuilder::new()
60///     .measurement("foo")
61///     .tag("bar", "baz")
62///
63/// assert_eq!(lp.build(), b"foo,bar=baz qux=42\n");
64/// ```
65///
66/// and attempts to `close_line` the line without at least one field also yield
67/// compile-time errors:
68///
69/// ```compile_fail
70/// # use influxdb_line_protocol::LineProtocolBuilder;
71/// let lp = LineProtocolBuilder::new()
72///     .measurement("foo")
73///     .tag("bar", "baz")
74///     .close_line();
75/// ```
76///
77/// Tags, if any, must be emitted before fields. This will fail to compile:
78///
79/// ```compile_fail
80/// # use influxdb_line_protocol::LineProtocolBuilder;
81/// let lp = LineProtocolBuilder::new()
82///     .measurement("foo")
83///     .field("qux", 42.0);
84///     .tag("bar", "baz")
85///     .close_line();
86/// ```
87///
88/// and timestamps, if any, must be specified last before closing the line:
89///
90/// ```compile_fail
91/// # use influxdb_line_protocol::LineProtocolBuilder;
92/// let lp = LineProtocolBuilder::new()
93///     .measurement("foo")
94///     .timestamp(1234)
95///     .field("qux", 42.0);
96///     .close_line();
97/// ```
98///
99/// (the negative examples part of the documentation is so verbose because it's the only way to test compilation failures)
100///
101/// [line protocol]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol
102/// [special characters]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters
103/// [naming restrictions]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#naming-restrictions
104#[derive(Debug, Default)]
105pub struct LineProtocolBuilder<B, S = BeforeMeasurement>
106where
107    B: BufMut,
108{
109    buf: B,
110    _marker: PhantomData<S>,
111}
112
113impl LineProtocolBuilder<Vec<u8>, BeforeMeasurement> {
114    /// Creates a new [`LineProtocolBuilder`] with an empty buffer.
115    pub fn new() -> Self {
116        Self::new_with(vec![])
117    }
118}
119
120impl<B> LineProtocolBuilder<B, BeforeMeasurement>
121where
122    B: BufMut,
123{
124    /// Like `new` but appending to an existing `BufMut`.
125    pub fn new_with(buf: B) -> Self {
126        Self {
127            buf,
128            _marker: PhantomData,
129        }
130    }
131
132    /// Provide the measurement name.
133    ///
134    /// It returns a new builder whose type allows only setting tags and fields.
135    pub fn measurement(self, measurement: &str) -> LineProtocolBuilder<B, AfterMeasurement> {
136        let measurement = escape(measurement, COMMA_SPACE);
137        self.write(format_args!("{measurement}"))
138    }
139
140    /// Finish building the line protocol and return the inner buffer.
141    pub fn build(self) -> B {
142        self.buf
143    }
144}
145
146impl<B> LineProtocolBuilder<B, AfterMeasurement>
147where
148    B: BufMut,
149{
150    /// Add a tag (key + value).
151    ///
152    /// Tag keys and tag values will be escaped according to the rules defined in [the special characters documentation].
153    ///
154    /// [special characters]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters
155    pub fn tag(self, tag_key: &str, tag_value: &str) -> Self {
156        let tag_key = escape(tag_key, COMMA_EQ_SPACE);
157        let tag_value = escape(tag_value, COMMA_EQ_SPACE);
158        self.write(format_args!(",{tag_key}={tag_value}"))
159    }
160
161    /// Add a field (key + value).
162    ///
163    /// Field keys will be escaped according to the rules defined in [the special characters documentation].
164    ///
165    /// Field values will encoded according to the rules defined in [the data types and formats documentation].
166    ///
167    /// This function is called for the first field only. It returns a new builder whose type no longer allows adding tags.
168    ///
169    /// [special characters]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#special-characters
170    /// [data types and formats]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#data-types-and-format
171    pub fn field<F>(self, field_key: &str, field_value: F) -> LineProtocolBuilder<B, AfterField>
172    where
173        F: FieldValue,
174    {
175        self.write(format_args!(" {}", format_field(field_key, &field_value)))
176    }
177}
178
179impl<B> LineProtocolBuilder<B, AfterField>
180where
181    B: BufMut,
182{
183    /// Add a field (key + value).
184    ///
185    /// This function is called for the second and subsequent fields.
186    pub fn field<F: FieldValue>(self, field_key: &str, field_value: F) -> Self {
187        self.write(format_args!(",{}", format_field(field_key, &field_value)))
188    }
189
190    /// Provide a timestamp.
191    ///
192    /// It returns a builder whose type allows only closing the line.
193    ///
194    /// The precision of the timestamp is by default nanoseconds (ns) but the unit
195    /// can be changed when performing the request that carries the line protocol body.
196    /// Setting the unit is outside of the scope of a line protocol builder.
197    pub fn timestamp(self, ts: i64) -> LineProtocolBuilder<B, AfterTimestamp> {
198        self.write(format_args!(" {ts}"))
199    }
200
201    /// Closing a line is required before starting a new one or finishing building the batch.
202    pub fn close_line(self) -> LineProtocolBuilder<B, BeforeMeasurement> {
203        self.close()
204    }
205}
206
207impl<B> LineProtocolBuilder<B, AfterTimestamp>
208where
209    B: BufMut,
210{
211    /// Closing a line is required before starting a new one or finishing building the batch.
212    pub fn close_line(self) -> LineProtocolBuilder<B, BeforeMeasurement> {
213        self.close()
214    }
215}
216
217impl<B, S> LineProtocolBuilder<B, S>
218where
219    B: BufMut,
220{
221    fn close(self) -> LineProtocolBuilder<B, BeforeMeasurement> {
222        self.write(format_args!("\n"))
223    }
224
225    fn write<S2>(self, args: fmt::Arguments<'_>) -> LineProtocolBuilder<B, S2> {
226        use std::io::Write;
227        // MutBuf's Write adapter is infallible.
228        let mut writer = self.buf.writer();
229        write!(&mut writer, "{args}").unwrap();
230        LineProtocolBuilder {
231            buf: writer.into_inner(),
232            _marker: PhantomData,
233        }
234    }
235}
236
/// Returns a [`fmt::Display`] adapter that renders `src` with a backslash inserted before every
/// backslash and before every character listed in `special_characters`.
///
/// Nothing is allocated: escaping happens lazily while the adapter is being formatted.
fn escape<const N: usize>(src: &str, special_characters: [char; N]) -> Escaped<'_, N> {
    Escaped {
        src,
        special_characters,
    }
}

/// Lazily escaped view of a string; see [`escape`].
struct Escaped<'a, const N: usize> {
    src: &'a str,
    special_characters: [char; N],
}

impl<const N: usize> fmt::Display for Escaped<'_, N> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Copy maximal runs of characters that need no escaping with a single `write_str`,
        // instead of pushing every character through the formatting machinery. When a
        // character must be escaped, the pending run is flushed, a `\` is written, and the
        // character itself becomes the first character of the next run.
        let mut run_start = 0;
        for (idx, ch) in self.src.char_indices() {
            if ch == '\\' || self.special_characters.contains(&ch) {
                f.write_str(&self.src[run_start..idx])?;
                f.write_str("\\")?;
                run_start = idx;
            }
        }
        f.write_str(&self.src[run_start..])
    }
}
262
263// This method is used by the two [`LineProtocolBuilder::field`] variants in order to render the
264// `key=value` encoding of a field.
265fn format_field<'a, F>(field_key: &'a str, field_value: &'a F) -> impl fmt::Display + 'a
266where
267    F: FieldValue,
268{
269    FormattedField {
270        field_key,
271        field_value,
272    }
273}
274
275struct FormattedField<'a, F>
276where
277    F: FieldValue,
278{
279    field_key: &'a str,
280    field_value: &'a F,
281}
282
283impl<'a, F> fmt::Display for FormattedField<'a, F>
284where
285    F: FieldValue,
286{
287    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288        write!(f, "{}=", escape(self.field_key, COMMA_EQ_SPACE))?;
289        self.field_value.fmt(f)
290    }
291}
292
293/// The [`FieldValue`] trait is implemented by the legal [line protocol types].
294///
295/// [line protocol types]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#data-types-and-format
296pub trait FieldValue {
297    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result;
298}
299
300impl FieldValue for &str {
301    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
302        write!(f, "\"{}\"", escape(self, DOUBLE_QUOTE))
303    }
304}
305
306impl FieldValue for f64 {
307    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
308        write!(f, "{self}")
309    }
310}
311
312impl FieldValue for bool {
313    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
314        write!(f, "{self}")
315    }
316}
317
318impl FieldValue for i64 {
319    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320        write!(f, "{self}i")
321    }
322}
323
324impl FieldValue for u64 {
325    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326        write!(f, "{self}u")
327    }
328}
329
#[cfg(test)]
mod tests {
    use crate::{parse_lines, FieldSet, ParsedLine};

    use super::*;

    #[test]
    fn test_string_escape() {
        assert_eq!(
            format!("\"{}\"", escape(r#"foo"#, DOUBLE_QUOTE)),
            r#""foo""#
        );
        assert_eq!(
            format!("\"{}\"", escape(r"foo \ bar", DOUBLE_QUOTE)),
            r#""foo \\ bar""#
        );
        assert_eq!(
            format!("\"{}\"", escape(r#"foo " bar"#, DOUBLE_QUOTE)),
            r#""foo \" bar""#
        );
        assert_eq!(
            format!("\"{}\"", escape(r#"foo \" bar"#, DOUBLE_QUOTE)),
            r#""foo \\\" bar""#
        );
    }

    /// Round-trips every kind of special character through the builder and the crate's parser.
    #[test]
    fn test_lp_builder() {
        const PLAIN: &str = "plain";
        const WITH_SPACE: &str = "with space";
        const WITH_COMMA: &str = "with,comma";
        const WITH_EQ: &str = "with=eq";
        const WITH_DOUBLE_QUOTE: &str = r#"with"doublequote"#;
        const WITH_SINGLE_QUOTE: &str = "with'singlequote";
        const WITH_BACKSLASH: &str = r"with\ backslash";

        let builder = LineProtocolBuilder::new()
            // line 0
            .measurement("tag_keys")
            .tag(PLAIN, "dummy")
            .tag(WITH_SPACE, "dummy")
            .tag(WITH_COMMA, "dummy")
            .tag(WITH_EQ, "dummy")
            .tag(WITH_DOUBLE_QUOTE, "dummy")
            .tag(WITH_SINGLE_QUOTE, "dummy")
            .tag(WITH_BACKSLASH, "dummy")
            .field("dummy", true)
            .close_line()
            // line 1
            .measurement("tag_values")
            .tag("plain", PLAIN)
            .tag("withspace", WITH_SPACE)
            .tag("withcomma", WITH_COMMA)
            .tag("witheq", WITH_EQ)
            .tag("withdoublequote", WITH_DOUBLE_QUOTE)
            .tag("withsinglequote", WITH_SINGLE_QUOTE)
            .tag("withbackslash", WITH_BACKSLASH)
            .field("dummy", true)
            .close_line()
            // line 2
            .measurement("field keys")
            .field(PLAIN, true)
            .field(WITH_SPACE, true)
            .field(WITH_COMMA, true)
            .field(WITH_EQ, true)
            .field(WITH_DOUBLE_QUOTE, true)
            .field(WITH_SINGLE_QUOTE, true)
            .field(WITH_BACKSLASH, true)
            .close_line()
            // line 3
            .measurement("field values")
            .field("mybool", false)
            .field("mysigned", 51_i64)
            .field("myunsigned", 51_u64)
            .field("myfloat", 51.0)
            .field("mystring", "some value")
            .field("mystringwithquotes", "some \" value")
            .close_line()
            // line 4
            .measurement(PLAIN)
            .field("dummy", true)
            .close_line()
            // line 5
            .measurement(WITH_SPACE)
            .field("dummy", true)
            .close_line()
            // line 6
            .measurement(WITH_COMMA)
            .field("dummy", true)
            .close_line()
            // line 7
            .measurement(WITH_EQ)
            .field("dummy", true)
            .close_line()
            // line 8
            .measurement(WITH_DOUBLE_QUOTE)
            .field("dummy", true)
            .close_line()
            // line 9
            .measurement(WITH_SINGLE_QUOTE)
            .field("dummy", true)
            .close_line()
            // line 10
            .measurement(WITH_BACKSLASH)
            .field("dummy", true)
            .close_line()
            // line 11
            .measurement("without timestamp")
            .field("dummy", true)
            .close_line()
            // line 12
            .measurement("with timestamp")
            .field("dummy", true)
            .timestamp(1234)
            .close_line();

        let lp = String::from_utf8(builder.build()).unwrap();
        println!("-----\n{lp}-----");

        let parsed_lines = parse_lines(&lp)
            .collect::<Result<Vec<ParsedLine<'_>>, _>>()
            .unwrap();
        assert_eq!(parsed_lines.len(), 13);

        let get_tag_key = |n: usize, f: usize| {
            format!("{}", parsed_lines[n].series.tag_set.as_ref().unwrap()[f].0)
        };
        let row = 0;
        assert_eq!(get_tag_key(row, 0), PLAIN);
        assert_eq!(get_tag_key(row, 1), WITH_SPACE);
        assert_eq!(get_tag_key(row, 2), WITH_COMMA);
        assert_eq!(get_tag_key(row, 3), WITH_EQ);
        assert_eq!(get_tag_key(row, 4), WITH_DOUBLE_QUOTE);
        assert_eq!(get_tag_key(row, 5), WITH_SINGLE_QUOTE);
        assert_eq!(get_tag_key(row, 6), WITH_BACKSLASH);

        let get_tag_value = |n: usize, f: usize| {
            format!("{}", parsed_lines[n].series.tag_set.as_ref().unwrap()[f].1)
        };
        let row = 1;
        assert_eq!(get_tag_value(row, 0), PLAIN);
        assert_eq!(get_tag_value(row, 1), WITH_SPACE);
        assert_eq!(get_tag_value(row, 2), WITH_COMMA);
        assert_eq!(get_tag_value(row, 3), WITH_EQ);
        assert_eq!(get_tag_value(row, 4), WITH_DOUBLE_QUOTE);
        assert_eq!(get_tag_value(row, 5), WITH_SINGLE_QUOTE);
        assert_eq!(get_tag_value(row, 6), WITH_BACKSLASH);

        let get_field_key = |n: usize, f: usize| format!("{}", parsed_lines[n].field_set[f].0);
        let row = 2;
        assert_eq!(get_field_key(row, 0), PLAIN);
        assert_eq!(get_field_key(row, 1), WITH_SPACE);
        assert_eq!(get_field_key(row, 2), WITH_COMMA);
        assert_eq!(get_field_key(row, 3), WITH_EQ);
        assert_eq!(get_field_key(row, 4), WITH_DOUBLE_QUOTE);
        assert_eq!(get_field_key(row, 5), WITH_SINGLE_QUOTE);
        assert_eq!(get_field_key(row, 6), WITH_BACKSLASH);

        let get_field_value = |n: usize, f: usize| format!("{}", parsed_lines[n].field_set[f].1);
        let row = 3;
        assert_eq!(get_field_value(row, 0), "false");
        assert_eq!(get_field_value(row, 1), "51i");
        assert_eq!(get_field_value(row, 2), "51u");
        assert_eq!(get_field_value(row, 3), "51");
        assert_eq!(get_field_value(row, 4), "some value");
        // TODO(mkm): file an issue for the parser: it does not decode an escaped double quote
        // in a string field value (possibly the Go implementation neither). Re-enable once fixed:
        // assert_eq!(get_field_value(row, 5), "some \" value");

        let get_measurement = |n: usize| format!("{}", parsed_lines[n].series.measurement);
        assert_eq!(get_measurement(4), PLAIN);
        assert_eq!(get_measurement(5), WITH_SPACE);
        assert_eq!(get_measurement(6), WITH_COMMA);
        assert_eq!(get_measurement(7), WITH_EQ);
        assert_eq!(get_measurement(8), WITH_DOUBLE_QUOTE);
        assert_eq!(get_measurement(9), WITH_SINGLE_QUOTE);
        assert_eq!(get_measurement(10), WITH_BACKSLASH);

        let get_timestamp = |n: usize| parsed_lines[n].timestamp;
        assert_eq!(get_timestamp(11), None);
        assert_eq!(get_timestamp(12), Some(1234));
    }

    #[test]
    fn test_float_formatting() {
        // Ensure that `my_float` is printed in a way that makes the parser read it as a float,
        // not an integer, even though `3.0` is rendered as `3`.
        let builder = LineProtocolBuilder::new()
            .measurement("float_formatting")
            .tag("foo", "bar")
            .field("my_float", 3.0)
            .close_line();

        let lp = String::from_utf8(builder.build()).unwrap();
        println!("-----\n{lp}-----");

        let parsed_lines = parse_lines(&lp)
            .collect::<Result<Vec<ParsedLine<'_>>, _>>()
            .unwrap();

        assert_eq!(parsed_lines.len(), 1);
        let parsed_line = &parsed_lines[0];

        let expected_fields = vec![("my_float".into(), crate::FieldValue::F64(3.0))]
            .into_iter()
            .collect::<FieldSet<'_>>();

        assert_eq!(parsed_line.field_set, expected_fields)
    }

    /// Pins the exact bytes produced for every escaping rule, independently of the parser.
    #[test]
    fn test_exact_output() {
        let lp = LineProtocolBuilder::new()
            .measurement("a b,c")
            .tag("k=1", "v 2")
            .field("f,x", r#"q"s\"#)
            .timestamp(-1)
            .close_line()
            .build();

        assert_eq!(
            String::from_utf8(lp).unwrap(),
            concat!(r#"a\ b\,c,k\=1=v\ 2 f\,x="q\"s\\" -1"#, "\n")
        );
    }

    #[test]
    fn test_new_with_appends_to_existing_buffer() {
        let lp = LineProtocolBuilder::new_with(b"m f=1i\n".to_vec())
            .measurement("m2")
            .field("f", 2_i64)
            .close_line()
            .build();

        assert_eq!(lp, b"m f=1i\nm2 f=2i\n");
    }
}