use super::types::PayloadFormat;
use crate::error::{StreamError, StreamResult};
use std::collections::HashMap;
pub struct PayloadParser;
impl PayloadParser {
pub fn new() -> Self {
Self
}
pub fn parse(
&self,
payload: &[u8],
format: &PayloadFormat,
) -> StreamResult<HashMap<String, serde_json::Value>> {
match format {
PayloadFormat::Json {
schema: _,
root_path,
} => self.parse_json(payload, root_path.as_deref()),
PayloadFormat::SparkplugB { namespace } => self.parse_sparkplug(payload, namespace),
PayloadFormat::PlainText { datatype } => self.parse_plain_text(payload, datatype),
PayloadFormat::Csv {
delimiter,
headers,
skip_header,
} => self.parse_csv(payload, *delimiter, headers, *skip_header),
PayloadFormat::Raw => self.parse_raw(payload),
_ => Err(StreamError::NotSupported(
"Format not yet implemented".to_string(),
)),
}
}
fn parse_json(
&self,
payload: &[u8],
root_path: Option<&str>,
) -> StreamResult<HashMap<String, serde_json::Value>> {
let value: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| StreamError::Deserialization(format!("JSON parse error: {}", e)))?;
let value = if let Some(path) = root_path {
self.extract_json_path(&value, path)?
} else {
value
};
if let serde_json::Value::Object(obj) = value {
Ok(obj.into_iter().collect())
} else {
Err(StreamError::Deserialization(
"JSON root is not an object".to_string(),
))
}
}
fn parse_sparkplug(
&self,
payload: &[u8],
namespace: &str,
) -> StreamResult<HashMap<String, serde_json::Value>> {
#[cfg(feature = "sparkplug")]
{
self.parse_sparkplug_impl(payload, namespace)
}
#[cfg(not(feature = "sparkplug"))]
{
Err(StreamError::NotSupported(
"Sparkplug B support not enabled. Enable 'sparkplug' feature".to_string(),
))
}
}
#[cfg(feature = "sparkplug")]
fn parse_sparkplug_impl(
&self,
payload: &[u8],
_namespace: &str,
) -> StreamResult<HashMap<String, serde_json::Value>> {
use super::sparkplug_b;
use prost::Message;
let sparkplug_payload = sparkplug_b::Payload::decode(payload).map_err(|e| {
StreamError::Deserialization(format!("Sparkplug B decode error: {}", e))
})?;
let mut result = HashMap::new();
if let Some(timestamp) = sparkplug_payload.timestamp {
result.insert("timestamp".to_string(), serde_json::json!(timestamp));
}
if let Some(seq) = sparkplug_payload.seq {
result.insert("seq".to_string(), serde_json::json!(seq));
}
for metric in sparkplug_payload.metrics {
if let Some(ref name) = metric.name {
let value = self.extract_sparkplug_metric_value(&metric);
result.insert(name.clone(), value);
}
}
Ok(result)
}
#[cfg(feature = "sparkplug")]
fn extract_sparkplug_metric_value(
&self,
metric: &super::sparkplug_b::Metric,
) -> serde_json::Value {
use super::sparkplug_b::metric::Value;
if let Some(value) = &metric.value {
match value {
Value::IntValue(v) => serde_json::json!(v),
Value::LongValue(v) => serde_json::json!(v),
Value::FloatValue(v) => serde_json::json!(v),
Value::DoubleValue(v) => serde_json::json!(v),
Value::BooleanValue(v) => serde_json::json!(v),
Value::StringValue(v) => serde_json::json!(v),
_ => serde_json::Value::Null,
}
} else {
serde_json::Value::Null
}
}
fn parse_plain_text(
&self,
payload: &[u8],
_datatype: &str,
) -> StreamResult<HashMap<String, serde_json::Value>> {
let text = String::from_utf8(payload.to_vec())
.map_err(|e| StreamError::Deserialization(format!("UTF-8 decode error: {}", e)))?;
let mut result = HashMap::new();
result.insert("value".to_string(), serde_json::json!(text));
Ok(result)
}
fn parse_csv(
&self,
payload: &[u8],
delimiter: char,
headers: &[String],
skip_header: bool,
) -> StreamResult<HashMap<String, serde_json::Value>> {
let text = String::from_utf8(payload.to_vec())
.map_err(|e| StreamError::Deserialization(format!("UTF-8 decode error: {}", e)))?;
let lines: Vec<&str> = text.lines().collect();
if lines.is_empty() {
return Ok(HashMap::new());
}
let start_line = if skip_header { 1 } else { 0 };
if start_line >= lines.len() {
return Ok(HashMap::new());
}
let values: Vec<&str> = lines[start_line].split(delimiter).collect();
let mut result = HashMap::new();
for (i, value) in values.iter().enumerate() {
let key = headers
.get(i)
.map(|s: &String| s.as_str())
.unwrap_or("field");
let trimmed: &str = value.trim();
result.insert(key.to_string(), serde_json::json!(trimmed));
}
Ok(result)
}
fn parse_raw(&self, payload: &[u8]) -> StreamResult<HashMap<String, serde_json::Value>> {
use base64::{engine::general_purpose, Engine as _};
let encoded = general_purpose::STANDARD.encode(payload);
let mut result = HashMap::new();
result.insert("data".to_string(), serde_json::json!(encoded));
result.insert("size".to_string(), serde_json::json!(payload.len()));
Ok(result)
}
fn extract_json_path(
&self,
value: &serde_json::Value,
path: &str,
) -> StreamResult<serde_json::Value> {
let parts: Vec<&str> = path.split('.').collect();
let mut current = value;
for part in parts {
current = current.get(part).ok_or_else(|| {
StreamError::Deserialization(format!("Path {} not found in JSON", path))
})?;
}
Ok(current.clone())
}
}
impl Default for PayloadParser {
fn default() -> Self {
Self::new()
}
}
pub struct ParsedPayload {
pub fields: HashMap<String, serde_json::Value>,
pub size: usize,
pub parsed_at: chrono::DateTime<chrono::Utc>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_json() {
let parser = PayloadParser::new();
let payload = br#"{"temperature": 25.5, "humidity": 60}"#;
let result = parser
.parse(
payload,
&PayloadFormat::Json {
schema: None,
root_path: None,
},
)
.unwrap();
assert_eq!(result.get("temperature").unwrap().as_f64().unwrap(), 25.5);
assert_eq!(result.get("humidity").unwrap().as_i64().unwrap(), 60);
}
#[test]
fn test_parse_plain_text() {
let parser = PayloadParser::new();
let payload = b"Hello, MQTT!";
let result = parser
.parse(
payload,
&PayloadFormat::PlainText {
datatype: "xsd:string".to_string(),
},
)
.unwrap();
assert_eq!(
result.get("value").unwrap().as_str().unwrap(),
"Hello, MQTT!"
);
}
#[test]
fn test_parse_csv() {
let parser = PayloadParser::new();
let payload = b"sensor1,25.5,60";
let result = parser
.parse(
payload,
&PayloadFormat::Csv {
delimiter: ',',
headers: vec![
"device".to_string(),
"temp".to_string(),
"humidity".to_string(),
],
skip_header: false,
},
)
.unwrap();
assert_eq!(result.get("device").unwrap().as_str().unwrap(), "sensor1");
assert_eq!(result.get("temp").unwrap().as_str().unwrap(), "25.5");
}
#[test]
fn test_extract_json_path() {
let parser = PayloadParser::new();
let json = serde_json::json!({
"device": {
"sensor": {
"temperature": 25.5
}
}
});
let result = parser
.extract_json_path(&json, "device.sensor.temperature")
.unwrap();
assert_eq!(result.as_f64().unwrap(), 25.5);
}
}