Skip to main content

influxlp_tools/
parser.rs

1//! The parser methods consist of three main methods used for parsing line(s)
2//! 1. [LineProtocol::parse_line]
3//!     - Parse a single line protocol line into the [LineProtocol] struct
4//! 2. [LineProtocol::parse_lines]
//!     - Parse multiple lines separated by a newline into a vector of
6//!       [LineProtocol] structs
7//! 3. [LineProtocol::parse_vec]
8//!     - Parse multiple lines stored in a vector into a vector of
9//!       [LineProtocol] structs
10
11use std::{collections::HashMap, hash::Hash};
12
13use crate::error::{ParseError, Result};
14
15use crate::{
16    element::{FieldKey, FieldValue, Measurement, TagKey, TagValue},
17    traits::{Convert, Format},
18    LineProtocol,
19};
20
21impl LineProtocol {
22    /// Split a line protocol part from the rest of the line protocol
23    fn parse_part<P>(chars: &mut P) -> String
24    where
25        P: Iterator<Item = char>,
26    {
27        let mut in_quote = false;
28        let mut is_escaped = false;
29
30        // Parse the measurement name
31        let mut part = String::new();
32        while let Some(char) = chars.next() {
33            // If the current character is a \ (slash) then we know the next character must
34            // be escaped
35            if char == '\\' {
36                is_escaped = true;
37            }
38            // Toggle the `in_quote` flag if the current character is a double quote and the
39            // previous character was not an escape character
40            else if char == '"' && !is_escaped {
41                in_quote = !in_quote;
42            // If the current character is a ' ' (space) and we are not in a
43            // quote or its not escaped we've finished a part
44            } else if char == ' ' && (!is_escaped && !in_quote) {
45                break;
46            } else {
47                // We've gone past the escaped character
48                is_escaped = false;
49            }
50
51            part.push(char);
52        }
53
54        part.trim().to_string()
55    }
56
57    /// Parses a set (tag- or field set) into a hashmap of the defined key-value
58    /// types
59    fn parse_set<K, V>(set: &str) -> Result<HashMap<K, V>>
60    where
61        K: Format + Convert + Hash + PartialEq + Eq,
62        V: Format + Convert,
63    {
64        let mut in_quote = false;
65        let mut is_escaped = false;
66
67        let mut word = String::new();
68        let mut words = Vec::new();
69        for char in set.chars() {
70            // If the current character is a \ (slash) then we know the next character must
71            // be escaped
72            if char == '\\' {
73                is_escaped = true;
74                word.push(char);
75            }
76            // We toggle the `in_quote` flag if the current character is a double quote and the
77            // previous character was not an escape character
78            else if char == '"' && !is_escaped {
79                in_quote = !in_quote;
80                word.push(char);
81            }
82            // If the current character is a `=` (equals sign) and its not escaped we've finished a
83            // word or if the current character is a `,` (comma) and we are not in a quote we've
84            // finished a word
85            else if (char == '=' && !is_escaped) || (char == ',' && !in_quote) {
86                words.push(word.clone());
87                word.clear();
88                continue;
89            } else {
90                // We've gone past the escaped character
91                is_escaped = false;
92                word.push(char);
93            }
94        }
95
96        // Push whatever is left
97        if word.is_empty() {
98            return Err(
99                ParseError::InvalidSet("set contains uneven amount of values".into()).into(),
100            );
101        }
102        words.push(word);
103
104        // If we don't have an even number of words the given set is invalid
105        if words.len() % 2 != 0 {
106            return Err(
107                ParseError::InvalidSet("set contains uneven amount of values".into()).into(),
108            );
109        }
110
111        // Transform to a hashmap and unescape words
112        let mut set = HashMap::new();
113        for word in words.chunks_exact(2) {
114            // Only FieldValue can actually return an error
115            let key = K::parse_from(&word[0]).map_err(|e| ParseError::InvalidSet(e.into()))?;
116            let value = V::parse_from(&word[1]).map_err(|e| ParseError::InvalidSet(e.into()))?;
117
118            set.insert(key.unescape(), value.unescape());
119        }
120
121        Ok(set)
122    }
123
124    /// Parses the identifier (measurement and tag set)
125    fn parse_identifiers(
126        input: String,
127    ) -> Result<(Measurement, Option<HashMap<TagKey, TagValue>>)> {
128        let mut chars = input.chars();
129        let mut is_escaped = false;
130
131        let mut measurement = String::new();
132        while let Some(char) = chars.next() {
133            // If the current character is a \ (slash) then we know the next character must
134            // be escaped
135            if char == '\\' {
136                is_escaped = true;
137            } else if char == ',' && !is_escaped {
138                break;
139            } else {
140                is_escaped = false;
141            }
142
143            measurement.push(char);
144        }
145
146        if measurement.is_empty() {
147            return Err(ParseError::MissingMeasurement.into());
148        }
149        let measurement = Measurement::from(measurement).unescape();
150
151        let tag_set = chars.collect::<String>();
152        let tags = match !tag_set.is_empty() {
153            true => Some(LineProtocol::parse_set::<TagKey, TagValue>(&tag_set)?),
154            false => None,
155        };
156
157        Ok((measurement, tags))
158    }
159
160    /// Parse a single line protocol line into the [LineProtocol] struct
161    ///
162    /// Allows for modifying the line protocol by adding or removing fields/tags
163    /// and rebuilding
164    ///
165    /// # Example
166    /// ```rust
167    /// let line = "measurement,tag=value field=true 1729270461612452700"
168    /// let parsed_line = LineProtocol::parse_line(line).unwrap();
169    ///
170    /// parsed_line.delete_tag("tag");
171    /// parsed_line.add_field("field2", "hello");
172    /// parsed_line.with_timestamp(1729270461612452800i64)
173    ///
174    /// let line = parsed_line.build().unwrap();
175    /// // Output: measurement field=true,field2="hello" 1729270461612452800
176    /// ```
177    ///
178    /// # Args
179    /// * `line` - A InfluxDB line protocol line
180    pub fn parse_line(line: &str) -> Result<Self> {
181        // Trim away leading and trailing whitespace
182        let line = line.trim();
183
184        // Comment line
185        if line.starts_with("#") {
186            return Err(ParseError::CommentLine.into());
187        }
188
189        // Can't parse empty lines
190        if line.is_empty() {
191            return Err(ParseError::EmptyLine.into());
192        }
193
194        let mut chars = line.chars();
195
196        // Parse measurement and tags
197        let identifiers = LineProtocol::parse_part(&mut chars);
198        let (measurement, tags) = LineProtocol::parse_identifiers(identifiers)?;
199
200        // Parse field set
201        let field_set = LineProtocol::parse_part(&mut chars);
202        if field_set.is_empty() {
203            return Err(ParseError::MissingFields.into());
204        }
205
206        let fields = LineProtocol::parse_set::<FieldKey, FieldValue>(&field_set)?;
207
208        // Timestamp is the only part remaining
209        let timestamp = chars.collect::<String>();
210        let timestamp = match !timestamp.is_empty() {
211            true => {
212                let timestamp = match timestamp.parse::<i64>() {
213                    Ok(timestamp) => timestamp,
214                    Err(_) => return Err(ParseError::InvalidTimestamp.into()),
215                };
216                Some(timestamp)
217            }
218            false => None,
219        };
220
221        let line_protocol = Self {
222            measurement: Measurement::from(measurement),
223            tags,
224            fields,
225            timestamp,
226        };
227        Ok(line_protocol)
228    }
229
230    /// Parse a vector of lines
231    ///
232    /// Empty lines and comment lines are silently ignored
233    ///
234    /// # Example
235    /// ```rust
236    /// let lines = vec![
237    ///     "measurement,tag=value field=\"value\"",
238    ///     "measurement,tag=value field=true 1729270461612452700",
239    ///     ...
240    /// ];
241    ///
242    /// let parsed = LineProtocol::parse_vec(lines).unwrap();
243    /// ```
244    ///
245    /// # Args
246    /// * `lines` - An array of InfluxDB line protocol lines
247    pub fn parse_vec(lines: Vec<&str>) -> Result<Vec<Self>> {
248        let mut parsed_lines: Vec<LineProtocol> = Vec::new();
249        for line in lines {
250            // Ignore comment lines
251            if line.starts_with("#") {
252                continue;
253            }
254
255            // Ignore empty lines
256            if line.is_empty() {
257                continue;
258            }
259
260            // If the line protocol has been parsed earlier but is a duplicate we just add
261            // the fields value to the original but favor the latter
262            let parsed_line = LineProtocol::parse_line(line)?;
263            match parsed_lines.iter_mut().find(|l| **l == parsed_line) {
264                Some(lp) => lp.fields.extend(parsed_line.fields),
265                None => parsed_lines.push(parsed_line),
266            }
267        }
268
269        Ok(parsed_lines)
270    }
271
272    /// Parse multiple lines seprated by a newline (\n)
273    ///
274    /// Empty lines and comment lines are silently ignored
275    ///
276    /// # Example
277    /// ```rust
278    /// let lines = vec![
279    ///     "measurement,tag=value field=\"value\"",
280    ///     "measurement,tag=value field=true 1729270461612452700",
281    ///     ...
282    /// ];
283    ///
284    /// let parsed = LineProtocol::parse_lines(lines.join("\n")).unwrap();
285    /// ```
286    ///
287    /// # Args
288    /// * `lines` - Multiple InfluxDB line protocol lines seperated by a newline
289    pub fn parse_lines(lines: &str) -> Result<Vec<Self>> {
290        let parsed_lines = LineProtocol::parse_vec(lines.lines().collect())?;
291        Ok(parsed_lines)
292    }
293}
294
/// Unit tests for [LineProtocol::parse_line] covering valid lines as well as
/// the different ways a line can be rejected
#[cfg(test)]
mod test {
    use super::*;

    /// A line without a tag set is valid
    #[test]
    fn test_parser_valid_missing_tags() {
        let line = "measurement field=\"value\" 1729270461612452700";
        let result = LineProtocol::parse_line(line);
        assert!(result.is_ok());

        let parsed = result.unwrap();
        let expected = LineProtocol::new("measurement")
            .add_field("field", "value")
            .with_timestamp(1729270461612452700i64);
        assert_eq!(parsed, expected)
    }

    /// A line without a timestamp is valid
    #[test]
    fn test_parser_valid_missing_timestamp() {
        let line = "measurement,tag=value field=\"value\"";
        let result = LineProtocol::parse_line(line);
        assert!(result.is_ok());

        let parsed = result.unwrap();
        let expected = LineProtocol::new("measurement")
            .add_tag("tag", "value")
            .add_field("field", "value");
        assert_eq!(parsed, expected)
    }

    /// A line with several tags and one field of every value kind is valid
    #[test]
    fn test_parser_valid() {
        let line = "measurement,tag1=value,tag2=value field1=\"value\",field2=\"{\\\"foo\\\": \
                    \\\"bar\\\"}\",field3=\"[\\\"hello\\\", \
                    \\\"world\\\"]\",field4=true,field5=10,field6=10i,field7=0.5 \
                    1729270461612452700";
        let result = LineProtocol::parse_line(line);
        assert!(result.is_ok());

        let parsed = result.unwrap();
        // The expected field keys must mirror the keys used in `line`
        let expected = LineProtocol::new("measurement")
            .add_tag("tag1", "value")
            .add_tag("tag2", "value")
            .add_field("field1", "value")
            .add_field("field2", "{\"foo\": \"bar\"}")
            .add_field("field3", "[\"hello\", \"world\"]")
            .add_field("field4", true)
            .add_field("field5", 10.0)
            .add_field("field6", 10)
            .add_field("field7", 0.5)
            .with_timestamp(1729270461612452700i64);
        assert_eq!(parsed, expected)
    }

    /// Comment lines cannot be parsed into a data point
    #[test]
    fn test_parser_comment_line_is_err() {
        let line = "# this is a comment line";
        let result = LineProtocol::parse_line(line);
        assert!(result.is_err())
    }

    /// Empty lines cannot be parsed into a data point
    #[test]
    fn test_parser_empty_line_is_err() {
        let line = "";
        let result = LineProtocol::parse_line(line);
        assert!(result.is_err())
    }

    /// The measurement is mandatory
    #[test]
    fn test_parser_missing_measurement_is_err() {
        let line = ",tag=value field=\"value\"";
        let result = LineProtocol::parse_line(line);
        assert!(result.is_err())
    }

    /// The field set is mandatory
    #[test]
    fn test_parser_missing_field_set_is_err() {
        let line = "measurement,tag=value 1729270461612452800";
        let result = LineProtocol::parse_line(line);
        assert!(result.is_err())
    }

    /// A tag key without a value is rejected
    #[test]
    fn test_parser_missing_uneven_tag_set_is_err() {
        let line = "measurement,tag= 1729270461612452800";
        let result = LineProtocol::parse_line(line);
        assert!(result.is_err())
    }

    /// A field key without a value is rejected
    #[test]
    fn test_parser_missing_uneven_field_set_is_err() {
        let line = "measurement field= 1729270461612452800";
        let result = LineProtocol::parse_line(line);
        assert!(result.is_err())
    }

    /// A timestamp that is not an integer is rejected
    #[test]
    fn test_parser_missing_invalid_timestamp_is_err() {
        let line = "measurement field=\"value\" timestamp";
        let result = LineProtocol::parse_line(line);
        assert!(result.is_err())
    }
}
397}