use crate::common::ErrorHandling;
use async_trait::async_trait;
use drasi_core::{
interface::{
ElementIndex, MiddlewareError, MiddlewareSetupError, SourceMiddleware,
SourceMiddlewareFactory,
},
models::{Element, ElementValue, SourceChange, SourceMiddlewareConfig},
};
use serde::Deserialize;
use serde_json::Value;
use std::sync::Arc;
#[cfg(test)]
mod tests;
/// Category of problem encountered while parsing the configured JSON property.
///
/// The variant is embedded (via its `Debug` form) in the `MiddlewareError` message
/// produced when `on_error` is `ErrorHandling::Fail`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParseJsonErrorType {
    /// The target property is not present on the element.
    MissingProperty,
    /// The target property exists but does not hold a String value.
    InvalidType,
    /// The JSON text is longer than `max_json_size` bytes.
    SizeExceeded,
    /// The JSON text contains control characters that JSON never allows unescaped.
    InvalidInput,
    /// The parsed value is nested deeper than `max_nesting_depth`.
    DeepNesting,
    /// `serde_json` rejected the text as malformed JSON.
    ParseError,
    /// Reserved for failures converting parsed JSON into an `ElementValue`
    /// (not currently produced by `ParseJson`).
    ConversionError,
}
/// Configuration for the `parse_json` source middleware, deserialized from the
/// middleware's `config` object. Unknown keys are rejected (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ParseJsonConfig {
    /// Name of the element property whose String value is parsed as JSON.
    pub target_property: String,
    /// Property that receives the parsed value. When absent, the parsed value
    /// overwrites `target_property` in place.
    pub output_property: Option<String>,
    /// Error policy: `ErrorHandling::Fail` turns a parse problem into a
    /// `MiddlewareError`; any other setting only logs a warning and leaves the
    /// element unchanged. Falls back to `ErrorHandling::default()` when omitted.
    #[serde(default)]
    pub on_error: ErrorHandling,
    /// Maximum length of the JSON text, in bytes (defaults to 1 MiB).
    #[serde(default = "default_max_json_size")]
    pub max_json_size: usize,
    /// Maximum nesting depth of the parsed value (defaults to 20). Scalars count
    /// as depth 1, and each enclosing object/array adds 1.
    #[serde(default = "default_max_nesting_depth")]
    pub max_nesting_depth: usize,
}
/// Serde default for `ParseJsonConfig::max_json_size`: 1 MiB of JSON text.
fn default_max_json_size() -> usize {
    const ONE_MIB: usize = 1 << 20;
    ONE_MIB
}

/// Serde default for `ParseJsonConfig::max_nesting_depth`.
fn default_max_nesting_depth() -> usize {
    const DEFAULT_DEPTH: usize = 20;
    DEFAULT_DEPTH
}
/// Source middleware that parses a JSON string property of inserted and updated
/// elements into a structured `ElementValue`.
pub struct ParseJson {
    /// Middleware instance name; used as the `[name]` prefix in logs and errors.
    name: String,
    /// Configuration validated by `ParseJsonFactory::create`.
    config: ParseJsonConfig,
}
#[async_trait]
impl SourceMiddleware for ParseJson {
    /// Parses the configured JSON property on inserted and updated elements.
    ///
    /// Inserts and updates go through `parse_property` and are re-emitted as a
    /// single change of the same kind. Deletes and future changes are forwarded
    /// untouched.
    ///
    /// # Errors
    /// Propagates the `MiddlewareError` from `parse_property`. That only happens
    /// when `on_error` is `ErrorHandling::Fail`.
    async fn process(
        &self,
        source_change: SourceChange,
        _element_index: &dyn ElementIndex,
    ) -> Result<Vec<SourceChange>, MiddlewareError> {
        match source_change {
            SourceChange::Insert { mut element } => {
                self.parse_property(&mut element)?;
                Ok(vec![SourceChange::Insert { element }])
            }
            SourceChange::Update { mut element } => {
                self.parse_property(&mut element)?;
                Ok(vec![SourceChange::Update { element }])
            }
            SourceChange::Delete { .. } | SourceChange::Future { .. } => Ok(vec![source_change]),
        }
    }
}
impl ParseJson {
    /// Returns a short name for the variant of `value`, used in error messages.
    fn get_element_value_type_name(value: &ElementValue) -> &'static str {
        match value {
            ElementValue::Integer(_) => "Integer",
            ElementValue::Float(_) => "Float",
            ElementValue::String(_) => "String",
            ElementValue::Bool(_) => "Bool",
            ElementValue::List(_) => "List",
            ElementValue::Object(_) => "Object",
            ElementValue::Null => "Null",
        }
    }

    /// Reports a parse problem according to the configured error policy.
    ///
    /// The message is always logged at `warn` level, prefixed with the middleware
    /// name. When `on_error` is `ErrorHandling::Fail`, a
    /// `MiddlewareError::SourceChangeError` carrying the name, `error_type` and
    /// message is returned. Otherwise this returns `Ok(())`, so the element
    /// continues unchanged.
    fn handle_error(
        &self,
        error_type: ParseJsonErrorType,
        message: String,
    ) -> Result<(), MiddlewareError> {
        log::warn!("[{}] {}", self.name, message);
        if self.config.on_error == ErrorHandling::Fail {
            Err(MiddlewareError::SourceChangeError(format!(
                "[{}] [Error: {:?}] {}",
                self.name, error_type, message
            )))
        } else {
            Ok(())
        }
    }

    /// Computes the nesting depth of a parsed JSON value.
    ///
    /// Scalars have depth 1. An object or array is one deeper than its deepest
    /// child, and an empty object or array has depth 1. Recursion depth is
    /// bounded by how deeply `serde_json::from_str` allows input to nest.
    // NOTE(review): this relies on serde_json's default recursion limit (128)
    // not being disabled via the `unbounded_depth` feature — confirm.
    fn calculate_nesting_depth(value: &Value) -> usize {
        match value {
            Value::Object(map) => {
                let max_child_depth = map
                    .values()
                    .map(Self::calculate_nesting_depth)
                    .max()
                    .unwrap_or(0);
                1 + max_child_depth
            }
            Value::Array(arr) => {
                let max_child_depth = arr
                    .iter()
                    .map(Self::calculate_nesting_depth)
                    .max()
                    .unwrap_or(0);
                1 + max_child_depth
            }
            _ => 1,
        }
    }

    /// Rejects raw control characters that JSON never permits unescaped.
    ///
    /// RFC 8259 requires escaping only U+0000..=U+001F. Tab, line feed and
    /// carriage return are legal whitespace between tokens, so they pass here
    /// and are left to the parser. Other code points, including DEL (U+007F) and
    /// the C1 range (U+0080..=U+009F), are valid inside JSON strings and must not
    /// be rejected.
    ///
    /// A byte scan is exact: in UTF-8 every byte below 0x20 encodes exactly that
    /// ASCII character, because multi-byte sequences only use bytes >= 0x80.
    fn check_for_invalid_characters(
        &self,
        json_string: &str,
    ) -> Result<(), (ParseJsonErrorType, String)> {
        let has_forbidden_control = json_string
            .bytes()
            .any(|b| b < 0x20 && !matches!(b, b'\t' | b'\n' | b'\r'));
        if has_forbidden_control {
            return Err((
                ParseJsonErrorType::InvalidInput,
                "JSON string contains invalid control characters".to_string(),
            ));
        }
        Ok(())
    }

    /// Parses `target_property` as JSON and stores the result on `element`.
    ///
    /// The result is written to `output_property` when configured, otherwise it
    /// overwrites `target_property`. The following checks run in order, and each
    /// failure is routed through `handle_error`:
    /// 1. missing property
    /// 2. non-String value
    /// 3. size limit
    /// 4. forbidden control characters
    /// 5. malformed JSON
    /// 6. nesting-depth limit
    ///
    /// On any failure the element is left unmodified.
    ///
    /// # Errors
    /// Returns a `MiddlewareError` only when `on_error` is `ErrorHandling::Fail`.
    fn parse_property(&self, element: &mut Element) -> Result<(), MiddlewareError> {
        let target_prop_name = &self.config.target_property;
        let output_prop_name = self
            .config
            .output_property
            .as_deref()
            .unwrap_or(target_prop_name);

        let json_string = match element.get_properties().get(target_prop_name) {
            Some(ElementValue::String(s)) => s.as_ref(),
            Some(other_value) => {
                let type_name = Self::get_element_value_type_name(other_value);
                return self.handle_error(
                    ParseJsonErrorType::InvalidType,
                    format!(
                        "Target property '{target_prop_name}' is not a String (Type: {type_name}). Cannot parse JSON."
                    ),
                );
            }
            None => {
                return self.handle_error(
                    ParseJsonErrorType::MissingProperty,
                    format!(
                        "Target property '{target_prop_name}' not found in element. Cannot parse JSON."
                    ),
                );
            }
        };

        // Size is measured in bytes, matching the unit of `max_json_size`.
        if json_string.len() > self.config.max_json_size {
            return self.handle_error(
                ParseJsonErrorType::SizeExceeded,
                format!(
                    "JSON string in property '{}' exceeds maximum allowed size ({} > {} bytes)",
                    target_prop_name,
                    json_string.len(),
                    self.config.max_json_size
                ),
            );
        }

        if let Err((error_type, message)) = self.check_for_invalid_characters(json_string) {
            return self.handle_error(error_type, message);
        }

        match serde_json::from_str::<Value>(json_string) {
            Ok(parsed_value) => {
                let depth = Self::calculate_nesting_depth(&parsed_value);
                if depth > self.config.max_nesting_depth {
                    return self.handle_error(
                        ParseJsonErrorType::DeepNesting,
                        format!(
                            "JSON nesting depth ({}) exceeds maximum allowed depth ({})",
                            depth, self.config.max_nesting_depth
                        ),
                    );
                }
                let element_value = ElementValue::from(&parsed_value);
                match element {
                    Element::Node { properties, .. } | Element::Relation { properties, .. } => {
                        // Overwriting the source property in place is the documented
                        // default, so only warn when a *different* property is clobbered.
                        if output_prop_name != target_prop_name
                            && properties.get(output_prop_name).is_some()
                        {
                            log::warn!(
                                "[{}] Output property '{}' specified in config already exists and will be overwritten.",
                                self.name,
                                output_prop_name
                            );
                        }
                        properties.insert(output_prop_name, element_value);
                    }
                }
                Ok(())
            }
            Err(e) => self.handle_error(
                ParseJsonErrorType::ParseError,
                format!("Failed to parse JSON string in property '{target_prop_name}': {e}"),
            ),
        }
    }
}
/// Stateless factory that builds `ParseJson` middleware instances from configuration.
pub struct ParseJsonFactory {}

impl ParseJsonFactory {
    /// Creates a factory; equivalent to `ParseJsonFactory::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for ParseJsonFactory {
    /// The factory carries no state, so the default is simply the empty struct.
    fn default() -> Self {
        Self {}
    }
}
impl SourceMiddlewareFactory for ParseJsonFactory {
    /// Registry name under which this middleware kind is looked up.
    fn name(&self) -> String {
        "parse_json".to_string()
    }

    /// Validates `config.config` and builds a `ParseJson` middleware from it.
    ///
    /// # Errors
    /// Returns `MiddlewareSetupError::InvalidConfiguration` when any of these hold:
    /// - the config does not deserialize into `ParseJsonConfig` (including unknown keys);
    /// - `target_property` is empty;
    /// - `output_property` is present but empty;
    /// - `max_json_size` is 0;
    /// - `max_nesting_depth` is 0.
    fn create(
        &self,
        config: &SourceMiddlewareConfig,
    ) -> Result<Arc<dyn SourceMiddleware>, MiddlewareSetupError> {
        let parse_json_config: ParseJsonConfig =
            serde_json::from_value(Value::Object(config.config.clone())).map_err(|e| {
                MiddlewareSetupError::InvalidConfiguration(format!(
                    "[{}] Invalid parse_json configuration: {}",
                    config.name, e
                ))
            })?;

        if parse_json_config.target_property.is_empty() {
            return Err(MiddlewareSetupError::InvalidConfiguration(format!(
                "[{}] Missing or empty 'target_property' field in parse_json configuration",
                config.name
            )));
        }

        if let Some(output_prop) = &parse_json_config.output_property {
            if output_prop.is_empty() {
                return Err(MiddlewareSetupError::InvalidConfiguration(format!(
                    "[{}] 'output_property' cannot be empty if provided in parse_json configuration",
                    config.name
                )));
            }
        }

        if parse_json_config.max_json_size == 0 {
            return Err(MiddlewareSetupError::InvalidConfiguration(format!(
                "[{}] 'max_json_size' must be greater than 0",
                config.name
            )));
        }

        // `ParseJson::calculate_nesting_depth` returns at least 1 for every value,
        // so a limit of 0 would reject all input. Treat it as a configuration
        // mistake, just like `max_json_size == 0`.
        if parse_json_config.max_nesting_depth == 0 {
            return Err(MiddlewareSetupError::InvalidConfiguration(format!(
                "[{}] 'max_nesting_depth' must be greater than 0",
                config.name
            )));
        }

        if parse_json_config.max_json_size > 10 * 1024 * 1024 {
            log::warn!(
                "[{}] Large 'max_json_size' configured ({}MB). This might cause memory issues with large JSON inputs.",
                config.name,
                parse_json_config.max_json_size / (1024 * 1024)
            );
        }

        log::debug!(
            "[{}] Creating ParseJson middleware with config: {:?}",
            config.name,
            parse_json_config
        );

        Ok(Arc::new(ParseJson {
            name: config.name.to_string(),
            config: parse_json_config,
        }))
    }
}