use std::{collections::HashMap, hash::Hash};
use crate::error::{ParseError, Result};
use crate::{
element::{FieldKey, FieldValue, Measurement, TagKey, TagValue},
traits::Format,
LineProtocol,
};
impl LineProtocol {
    /// Consumes one space-delimited part of a line from `chars`.
    ///
    /// Characters are taken from the iterator up to and including the first
    /// space that is neither backslash-escaped nor inside a double-quoted
    /// string. The iterator is left positioned right after that space so the
    /// caller can continue with the next part. The consumed text is returned
    /// with surrounding whitespace trimmed; escape sequences and quotes are
    /// kept verbatim.
    fn parse_part<P>(chars: &mut P) -> String
    where
        P: Iterator<Item = char>,
    {
        let mut in_quote = false;
        let mut is_escaped = false;
        let mut part = String::new();
        for char in chars.by_ref() {
            tracing::debug!("Char: '{char}', in quote: {in_quote}, in escaped: {is_escaped}");
            part.push(char);
            if is_escaped {
                // The escape covers exactly one character, including a second
                // backslash: `\\` must not escape whatever follows it.
                is_escaped = false;
            } else if char == '\\' {
                is_escaped = true;
            } else if char == '"' {
                in_quote = !in_quote;
            } else if char == ' ' && !in_quote {
                break;
            }
        }
        part.trim().to_string()
    }

    /// Splits `s` at the first comma that is not backslash-escaped.
    ///
    /// Returns the text before and after that comma, or `None` when `s`
    /// contains no unescaped comma.
    fn split_at_unescaped_comma(s: &str) -> Option<(&str, &str)> {
        let mut is_escaped = false;
        for (idx, c) in s.char_indices() {
            if is_escaped {
                is_escaped = false;
            } else if c == '\\' {
                is_escaped = true;
            } else if c == ',' {
                // ',' is a single byte, so `idx + 1` is a valid char boundary.
                return Some((&s[..idx], &s[idx + 1..]));
            }
        }
        None
    }

    /// Parses a `key=value,key=value` set into a map.
    ///
    /// The input is split on `=` and `,` characters that are neither
    /// backslash-escaped nor inside a double-quoted string. The resulting
    /// words are paired up as key/value, converted into `K`/`V` and passed
    /// through [`Format::unescape`]. When a key occurs more than once, the
    /// last value wins.
    ///
    /// # Errors
    ///
    /// Returns [`ParseError::InvalidSet`] when splitting yields an odd number
    /// of words, i.e. some key has no value (this includes an empty `set`).
    fn parse_set<K, V>(set: &str) -> Result<HashMap<K, V>>
    where
        K: Format + From<String> + Hash + PartialEq + Eq,
        V: Format + From<String>,
    {
        let mut in_quote = false;
        let mut is_escaped = false;
        let mut word = String::new();
        let mut words = Vec::new();
        for char in set.chars() {
            tracing::debug!("Char: '{char}', in quote: {in_quote}, in escaped: {is_escaped}");
            if is_escaped {
                // An escaped character is always literal; the backslash itself
                // was already pushed and is removed later by `unescape`.
                is_escaped = false;
                word.push(char);
            } else if char == '\\' {
                is_escaped = true;
                word.push(char);
            } else if char == '"' {
                in_quote = !in_quote;
                word.push(char);
            } else if (char == '=' || char == ',') && !in_quote {
                // Delimiter: finish the current word without copying it.
                words.push(std::mem::take(&mut word));
            } else {
                word.push(char);
            }
        }
        words.push(word);

        if words.len() % 2 != 0 {
            return Err(
                ParseError::InvalidSet("set contains uneven amount of values".into()).into(),
            );
        }

        let mut parsed: HashMap<K, V> = HashMap::with_capacity(words.len() / 2);
        let mut words = words.into_iter();
        while let (Some(key), Some(value)) = (words.next(), words.next()) {
            let key: K = key.into();
            let value: V = value.into();
            parsed.insert(key.unescape().into(), value.unescape().into());
        }
        Ok(parsed)
    }

    /// Parses a single line into a [`LineProtocol`].
    ///
    /// The line is read as three space-separated parts:
    /// `measurement[,tag_set] field_set [timestamp]`. The measurement is
    /// separated from the tag set at the first unescaped comma. The timestamp
    /// is optional and, when present, must parse as an `i64`; whitespace
    /// around it is ignored.
    ///
    /// # Errors
    ///
    /// * [`ParseError::CommentLine`] when the line starts with `#`.
    /// * [`ParseError::EmptyLine`] when the line is empty.
    /// * [`ParseError::MissingFields`] when no field set follows the
    ///   measurement/tag part.
    /// * [`ParseError::InvalidSet`] when the tag set or field set is malformed.
    /// * [`ParseError::InvalidTimestamp`] when the timestamp is not a valid
    ///   `i64`.
    pub fn parse_line(line: &str) -> Result<Self> {
        if line.starts_with('#') {
            return Err(ParseError::CommentLine.into());
        }
        if line.is_empty() {
            return Err(ParseError::EmptyLine.into());
        }

        let mut chars = line.chars();

        let identifiers = LineProtocol::parse_part(&mut chars);
        tracing::debug!("Identifiers: {identifiers}");
        // An escaped comma (`\,`) belongs to the measurement name, so only an
        // unescaped comma starts the tag set.
        let (measurement, tags) = match LineProtocol::split_at_unescaped_comma(&identifiers) {
            Some((measurement, tag_set)) => {
                let tags = LineProtocol::parse_set::<TagKey, TagValue>(tag_set)?;
                (measurement.to_string(), Some(tags))
            }
            None => (identifiers, None),
        };

        let field_set = LineProtocol::parse_part(&mut chars);
        tracing::debug!("Field set: {field_set}");
        if field_set.is_empty() {
            return Err(ParseError::MissingFields.into());
        }
        let fields = LineProtocol::parse_set::<FieldKey, FieldValue>(&field_set)?;

        // Whatever is left after the field set is the optional timestamp.
        // Trimming keeps stray whitespace from being reported as an invalid
        // timestamp.
        let timestamp = match chars.as_str().trim() {
            "" => None,
            raw => match raw.parse::<i64>() {
                Ok(timestamp) => Some(timestamp),
                Err(_) => return Err(ParseError::InvalidTimestamp.into()),
            },
        };
        tracing::debug!("Timestamp: {timestamp:#?}");

        Ok(Self {
            measurement: Measurement::from(measurement),
            tags,
            fields,
            timestamp,
        })
    }

    /// Parses newline-separated input into a list of [`LineProtocol`] values.
    ///
    /// Comment lines (starting with `#`) and blank lines are skipped. When a
    /// parsed line compares equal (via `LineProtocol`'s `PartialEq`) to one
    /// that was already collected, its fields are merged into the existing
    /// entry instead of adding a new one.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by [`LineProtocol::parse_line`]; no
    /// partial result is returned in that case.
    pub fn parse_lines(lines: &str) -> Result<Vec<Self>> {
        let mut parsed_lines: Vec<LineProtocol> = Vec::new();
        for line in lines.lines() {
            // Neither comments nor blank lines carry data; skipping them keeps
            // one stray empty line from failing the whole batch.
            if line.starts_with('#') || line.trim().is_empty() {
                continue;
            }
            let parsed_line = LineProtocol::parse_line(line)?;
            match parsed_lines.iter_mut().find(|l| **l == parsed_line) {
                Some(lp) => lp.fields.extend(parsed_line.fields),
                None => parsed_lines.push(parsed_line),
            }
        }
        Ok(parsed_lines)
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// A full line (tags, quoted string fields with escaped quotes, and a
    /// timestamp) is accepted by `parse_line`.
    #[test]
    fn test_line_protocol_parser_ok() {
        let line = "measurement,tag2=value,tag=value field=\"value\",field2=\"{\\\"test\\\": \
                    \\\"hello\\\"}\" 1729270461612452700";
        assert!(LineProtocol::parse_line(line).is_ok())
    }

    /// Several lines joined by newlines (with and without tags or timestamp)
    /// are accepted by `parse_lines`.
    #[test]
    fn test_line_protocol_parser_lines() {
        let input = [
            "measurement,tag=value field=\"value\",field2=true",
            "measurement field=\"{\\\"test\\\": \\\"hello\\\"}\"",
            "measurement,tag2=value,tag=value field=\"value\",field2=\"{\\\"test\\\": \
             \\\"hello\\\"}\" 1729270461612452700",
        ]
        .join("\n");
        assert!(LineProtocol::parse_lines(&input).is_ok());
    }
}