1use std::{collections::HashMap, hash::Hash};
12
13use crate::error::{ParseError, Result};
14
15use crate::{
16 element::{FieldKey, FieldValue, Measurement, TagKey, TagValue},
17 traits::{Convert, Format},
18 LineProtocol,
19};
20
21impl LineProtocol {
22 fn parse_part<P>(chars: &mut P) -> String
24 where
25 P: Iterator<Item = char>,
26 {
27 let mut in_quote = false;
28 let mut is_escaped = false;
29
30 let mut part = String::new();
32 while let Some(char) = chars.next() {
33 if char == '\\' {
36 is_escaped = true;
37 }
38 else if char == '"' && !is_escaped {
41 in_quote = !in_quote;
42 } else if char == ' ' && (!is_escaped && !in_quote) {
45 break;
46 } else {
47 is_escaped = false;
49 }
50
51 part.push(char);
52 }
53
54 part.trim().to_string()
55 }
56
57 fn parse_set<K, V>(set: &str) -> Result<HashMap<K, V>>
60 where
61 K: Format + Convert + Hash + PartialEq + Eq,
62 V: Format + Convert,
63 {
64 let mut in_quote = false;
65 let mut is_escaped = false;
66
67 let mut word = String::new();
68 let mut words = Vec::new();
69 for char in set.chars() {
70 if char == '\\' {
73 is_escaped = true;
74 word.push(char);
75 }
76 else if char == '"' && !is_escaped {
79 in_quote = !in_quote;
80 word.push(char);
81 }
82 else if (char == '=' && !is_escaped) || (char == ',' && !in_quote) {
86 words.push(word.clone());
87 word.clear();
88 continue;
89 } else {
90 is_escaped = false;
92 word.push(char);
93 }
94 }
95
96 if word.is_empty() {
98 return Err(
99 ParseError::InvalidSet("set contains uneven amount of values".into()).into(),
100 );
101 }
102 words.push(word);
103
104 if words.len() % 2 != 0 {
106 return Err(
107 ParseError::InvalidSet("set contains uneven amount of values".into()).into(),
108 );
109 }
110
111 let mut set = HashMap::new();
113 for word in words.chunks_exact(2) {
114 let key = K::parse_from(&word[0]).map_err(|e| ParseError::InvalidSet(e.into()))?;
116 let value = V::parse_from(&word[1]).map_err(|e| ParseError::InvalidSet(e.into()))?;
117
118 set.insert(key.unescape(), value.unescape());
119 }
120
121 Ok(set)
122 }
123
124 fn parse_identifiers(
126 input: String,
127 ) -> Result<(Measurement, Option<HashMap<TagKey, TagValue>>)> {
128 let mut chars = input.chars();
129 let mut is_escaped = false;
130
131 let mut measurement = String::new();
132 while let Some(char) = chars.next() {
133 if char == '\\' {
136 is_escaped = true;
137 } else if char == ',' && !is_escaped {
138 break;
139 } else {
140 is_escaped = false;
141 }
142
143 measurement.push(char);
144 }
145
146 if measurement.is_empty() {
147 return Err(ParseError::MissingMeasurement.into());
148 }
149 let measurement = Measurement::from(measurement).unescape();
150
151 let tag_set = chars.collect::<String>();
152 let tags = match !tag_set.is_empty() {
153 true => Some(LineProtocol::parse_set::<TagKey, TagValue>(&tag_set)?),
154 false => None,
155 };
156
157 Ok((measurement, tags))
158 }
159
160 pub fn parse_line(line: &str) -> Result<Self> {
181 let line = line.trim();
183
184 if line.starts_with("#") {
186 return Err(ParseError::CommentLine.into());
187 }
188
189 if line.is_empty() {
191 return Err(ParseError::EmptyLine.into());
192 }
193
194 let mut chars = line.chars();
195
196 let identifiers = LineProtocol::parse_part(&mut chars);
198 let (measurement, tags) = LineProtocol::parse_identifiers(identifiers)?;
199
200 let field_set = LineProtocol::parse_part(&mut chars);
202 if field_set.is_empty() {
203 return Err(ParseError::MissingFields.into());
204 }
205
206 let fields = LineProtocol::parse_set::<FieldKey, FieldValue>(&field_set)?;
207
208 let timestamp = chars.collect::<String>();
210 let timestamp = match !timestamp.is_empty() {
211 true => {
212 let timestamp = match timestamp.parse::<i64>() {
213 Ok(timestamp) => timestamp,
214 Err(_) => return Err(ParseError::InvalidTimestamp.into()),
215 };
216 Some(timestamp)
217 }
218 false => None,
219 };
220
221 let line_protocol = Self {
222 measurement: Measurement::from(measurement),
223 tags,
224 fields,
225 timestamp,
226 };
227 Ok(line_protocol)
228 }
229
230 pub fn parse_vec(lines: Vec<&str>) -> Result<Vec<Self>> {
248 let mut parsed_lines: Vec<LineProtocol> = Vec::new();
249 for line in lines {
250 if line.starts_with("#") {
252 continue;
253 }
254
255 if line.is_empty() {
257 continue;
258 }
259
260 let parsed_line = LineProtocol::parse_line(line)?;
263 match parsed_lines.iter_mut().find(|l| **l == parsed_line) {
264 Some(lp) => lp.fields.extend(parsed_line.fields),
265 None => parsed_lines.push(parsed_line),
266 }
267 }
268
269 Ok(parsed_lines)
270 }
271
272 pub fn parse_lines(lines: &str) -> Result<Vec<Self>> {
290 let parsed_lines = LineProtocol::parse_vec(lines.lines().collect())?;
291 Ok(parsed_lines)
292 }
293}
294
295#[cfg(test)]
296mod test {
297 use super::*;
298
299 #[test]
300 fn test_parser_valid_missing_tags() {
301 let line = "measurement field=\"value\" 1729270461612452700";
302 let result = LineProtocol::parse_line(&line);
303 assert!(result.is_ok());
304
305 let parsed = result.unwrap();
306 let expected = LineProtocol::new("measurement")
307 .add_field("field", "value")
308 .with_timestamp(1729270461612452700i64);
309 assert_eq!(parsed, expected)
310 }
311
312 #[test]
313 fn test_parser_valid_missing_timestamp() {
314 let line = "measurement,tag=value field=\"value\"";
315 let result = LineProtocol::parse_line(&line);
316 assert!(result.is_ok());
317
318 let parsed = result.unwrap();
319 let expected = LineProtocol::new("measurement")
320 .add_tag("tag", "value")
321 .add_field("field", "value");
322 assert_eq!(parsed, expected)
323 }
324
325 #[test]
326 fn test_parser_valid() {
327 let line = "measurement,tag1=value,tag2=value field1=\"value\",field2=\"{\\\"foo\\\": \
328 \\\"bar\\\"}\",field3=\"[\\\"hello\\\", \
329 \\\"world\\\"]\",field4=true,field5=10,field6=10i,field7=0.5 \
330 1729270461612452700";
331 let result = LineProtocol::parse_line(&line);
332 assert!(result.is_ok());
333
334 let parsed = result.unwrap();
335 let expected = LineProtocol::new("measurement")
336 .add_tag("tag1", "value")
337 .add_tag("tag2", "value")
338 .add_field("field", "value")
339 .add_field("field2", "{\"foo\": \"bar\"}")
340 .add_field("field3", "[\"hello\", \"world\"]")
341 .add_field("field4", true)
342 .add_field("field5", 10.0)
343 .add_field("field6", 10)
344 .add_field("field7", 0.5)
345 .with_timestamp(1729270461612452700i64);
346 assert_eq!(parsed, expected)
347 }
348
349 #[test]
350 fn test_parser_comment_line_is_err() {
351 let line = "# this is a comment line";
352 let result = LineProtocol::parse_line(&line);
353 assert!(result.is_err())
354 }
355
356 #[test]
357 fn test_parser_empty_line_is_err() {
358 let line = "";
359 let result = LineProtocol::parse_line(&line);
360 assert!(result.is_err())
361 }
362
363 #[test]
364 fn test_parser_missing_measurement_is_err() {
365 let line = ",tag=value field=\"value\"";
366 let result = LineProtocol::parse_line(&line);
367 assert!(result.is_err())
368 }
369
370 #[test]
371 fn test_parser_missing_field_set_is_err() {
372 let line = "measurement,tag=value 1729270461612452800";
373 let result = LineProtocol::parse_line(&line);
374 assert!(result.is_err())
375 }
376
377 #[test]
378 fn test_parser_missing_uneven_tag_set_is_err() {
379 let line = "measurement,tag= 1729270461612452800";
380 let result = LineProtocol::parse_line(&line);
381 assert!(result.is_err())
382 }
383
384 #[test]
385 fn test_parser_missing_uneven_field_set_is_err() {
386 let line = "measurement field= 1729270461612452800";
387 let result = LineProtocol::parse_line(&line);
388 assert!(result.is_err())
389 }
390
391 #[test]
392 fn test_parser_missing_invalid_timestamp_is_err() {
393 let line = "measurement field=\"value\" timestamp";
394 let result = LineProtocol::parse_line(&line);
395 assert!(result.is_err())
396 }
397}