use crate::engine::error::{DataflowError, Result};
use crate::engine::message::{Change, Message};
use crate::engine::utils::get_nested_value;
use log::debug;
use serde::Deserialize;
use serde_json::Value;
use std::sync::Arc;
#[derive(Debug, Clone, Deserialize)]
pub struct ParseConfig {
pub source: String,
pub target: String,
}
impl ParseConfig {
pub fn from_json(input: &Value) -> Result<Self> {
let source = input
.get("source")
.and_then(Value::as_str)
.ok_or_else(|| {
DataflowError::Validation("Missing 'source' in parse config".to_string())
})?
.to_string();
let target = input
.get("target")
.and_then(Value::as_str)
.ok_or_else(|| {
DataflowError::Validation("Missing 'target' in parse config".to_string())
})?
.to_string();
Ok(ParseConfig { source, target })
}
fn extract_source(&self, message: &Message) -> Value {
if self.source == "payload" {
(*message.payload).clone()
} else if let Some(path) = self.source.strip_prefix("payload.") {
get_nested_value(&message.payload, path)
.cloned()
.unwrap_or(Value::Null)
} else if let Some(path) = self.source.strip_prefix("data.") {
get_nested_value(message.data(), path)
.cloned()
.unwrap_or(Value::Null)
} else {
get_nested_value(&message.context, &self.source)
.cloned()
.unwrap_or(Value::Null)
}
}
}
pub fn execute_parse_json(
message: &mut Message,
config: &ParseConfig,
) -> Result<(usize, Vec<Change>)> {
debug!(
"ParseJson: Extracting from '{}' to 'data.{}'",
config.source, config.target
);
let source_data = config.extract_source(message);
let source_data = match &source_data {
Value::String(s) => serde_json::from_str(s).unwrap_or(source_data),
_ => source_data,
};
let old_value = message
.data()
.get(&config.target)
.cloned()
.unwrap_or(Value::Null);
if let Some(data_obj) = message.data_mut().as_object_mut() {
data_obj.insert(config.target.clone(), source_data.clone());
}
message.invalidate_context_cache();
debug!(
"ParseJson: Successfully stored data to 'data.{}'",
config.target
);
Ok((
200,
vec![Change {
path: Arc::from(format!("data.{}", config.target)),
old_value: Arc::new(old_value),
new_value: Arc::new(source_data),
}],
))
}
pub fn execute_parse_xml(
message: &mut Message,
config: &ParseConfig,
) -> Result<(usize, Vec<Change>)> {
debug!(
"ParseXml: Extracting from '{}' to 'data.{}'",
config.source, config.target
);
let source_data = config.extract_source(message);
let xml_string = match &source_data {
Value::String(s) => s.clone(),
_ => {
return Err(DataflowError::Validation(format!(
"ParseXml: Source '{}' is not a string",
config.source
)));
}
};
let parsed_json = xml_to_json(&xml_string)?;
let old_value = message
.data()
.get(&config.target)
.cloned()
.unwrap_or(Value::Null);
if let Some(data_obj) = message.data_mut().as_object_mut() {
data_obj.insert(config.target.clone(), parsed_json.clone());
}
message.invalidate_context_cache();
debug!(
"ParseXml: Successfully parsed and stored XML to 'data.{}'",
config.target
);
Ok((
200,
vec![Change {
path: Arc::from(format!("data.{}", config.target)),
old_value: Arc::new(old_value),
new_value: Arc::new(parsed_json),
}],
))
}
fn xml_to_json(xml: &str) -> Result<Value> {
use quick_xml::de::from_str;
let parsed: Value = from_str(xml)
.map_err(|e| DataflowError::Validation(format!("Failed to parse XML: {}", e)))?;
Ok(parsed)
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_parse_config_from_json() {
let input = json!({
"source": "payload",
"target": "input_data"
});
let config = ParseConfig::from_json(&input).unwrap();
assert_eq!(config.source, "payload");
assert_eq!(config.target, "input_data");
}
#[test]
fn test_parse_config_missing_source() {
let input = json!({
"target": "input_data"
});
let result = ParseConfig::from_json(&input);
assert!(result.is_err());
}
#[test]
fn test_parse_config_missing_target() {
let input = json!({
"source": "payload"
});
let result = ParseConfig::from_json(&input);
assert!(result.is_err());
}
#[test]
fn test_execute_parse_json_from_payload() {
let payload = json!({
"name": "John",
"age": 30
});
let mut message = Message::new(Arc::new(payload));
let config = ParseConfig {
source: "payload".to_string(),
target: "input".to_string(),
};
let result = execute_parse_json(&mut message, &config);
assert!(result.is_ok());
let (status, changes) = result.unwrap();
assert_eq!(status, 200);
assert_eq!(changes.len(), 1);
assert_eq!(changes[0].path.as_ref(), "data.input");
assert_eq!(message.data()["input"]["name"], json!("John"));
assert_eq!(message.data()["input"]["age"], json!(30));
}
#[test]
fn test_execute_parse_json_from_nested_payload() {
let payload = json!({
"body": {
"user": {
"name": "Alice"
}
}
});
let mut message = Message::new(Arc::new(payload));
let config = ParseConfig {
source: "payload.body.user".to_string(),
target: "user_data".to_string(),
};
let result = execute_parse_json(&mut message, &config);
assert!(result.is_ok());
let (status, _) = result.unwrap();
assert_eq!(status, 200);
assert_eq!(message.data()["user_data"]["name"], json!("Alice"));
}
#[test]
fn test_execute_parse_json_from_data() {
let mut message = Message::new(Arc::new(json!({})));
message.context["data"] = json!({
"existing": {
"value": 42
}
});
let config = ParseConfig {
source: "data.existing".to_string(),
target: "copied".to_string(),
};
let result = execute_parse_json(&mut message, &config);
assert!(result.is_ok());
assert_eq!(message.data()["copied"]["value"], json!(42));
}
#[test]
fn test_execute_parse_xml_simple() {
let xml_payload = json!("<root><name>John</name><age>30</age></root>");
let mut message = Message::new(Arc::new(xml_payload));
let config = ParseConfig {
source: "payload".to_string(),
target: "parsed".to_string(),
};
let result = execute_parse_xml(&mut message, &config);
assert!(result.is_ok());
let (status, _) = result.unwrap();
assert_eq!(status, 200);
let parsed = &message.data()["parsed"];
assert!(parsed.is_object());
}
#[test]
fn test_execute_parse_xml_not_string() {
let payload = json!({"not": "a string"});
let mut message = Message::new(Arc::new(payload));
let config = ParseConfig {
source: "payload".to_string(),
target: "parsed".to_string(),
};
let result = execute_parse_xml(&mut message, &config);
assert!(result.is_err());
}
#[test]
fn test_xml_to_json_simple() {
let xml = "<root><name>Test</name></root>";
let result = xml_to_json(xml);
assert!(result.is_ok());
let json = result.unwrap();
assert!(json.is_object());
}
#[test]
fn test_xml_to_json_invalid() {
let xml = "<root><unclosed>";
let result = xml_to_json(xml);
assert!(result.is_err());
}
#[test]
fn test_xml_to_json_with_attributes() {
let xml = r#"<person id="123"><name>John</name></person>"#;
let result = xml_to_json(xml);
assert!(result.is_ok());
}
#[test]
fn test_xml_to_json_nested() {
let xml = r#"<root><user><name>Alice</name><email>alice@example.com</email></user></root>"#;
let result = xml_to_json(xml);
assert!(result.is_ok());
let json = result.unwrap();
assert!(json.is_object());
}
#[test]
fn test_execute_parse_json_from_string_payload() {
let payload = Value::String(r#"{"name":"John","age":30}"#.to_string());
let mut message = Message::new(Arc::new(payload));
let config = ParseConfig {
source: "payload".to_string(),
target: "input".to_string(),
};
let result = execute_parse_json(&mut message, &config);
assert!(result.is_ok());
let (status, _) = result.unwrap();
assert_eq!(status, 200);
assert_eq!(message.data()["input"]["name"], json!("John"));
assert_eq!(message.data()["input"]["age"], json!(30));
}
}