use crate::common::ErrorHandling;
use async_trait::async_trait;
use drasi_core::{
interface::{
ElementIndex, MiddlewareError, MiddlewareSetupError, SourceMiddleware,
SourceMiddlewareFactory,
},
models::{Element, ElementValue, SourceChange, SourceMiddlewareConfig},
};
use jsonpath_rust::{path::config::JsonPathConfig, JsonPathInst};
use log::debug;
use serde::{de, Deserialize, Deserializer};
use serde_json::Value;
use std::{str::FromStr, sync::Arc};
#[cfg(test)]
mod tests;
/// Policy applied when a mapping's target property already holds a non-null value.
///
/// Deserialized from the lowercase variant name (`overwrite`, `skip`, `fail`).
/// `Overwrite` is the default.
#[derive(Default, Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum ConflictStrategy {
/// Replace the existing value with the promoted one.
#[default]
Overwrite,
/// Keep the existing value and move on to the next mapping.
Skip,
/// Stop processing the change and report an error.
Fail,
}
/// A parsed JSONPath expression kept together with the text it was parsed from.
#[derive(Debug, Clone)]
pub struct JsonPathExpression {
/// Original expression text; used in diagnostics about selection problems.
expression: String,
/// Parsed form of `expression` that is evaluated against JSON values.
path: JsonPathInst,
}
impl JsonPathExpression {
    /// Evaluates the expression against `value` and returns an owned copy of
    /// every match, in the order the JSONPath engine reported them.
    fn execute(&self, value: &Value) -> Vec<Value> {
        let matches = self.path.find_slice(value, JsonPathConfig::default());
        let mut selected = Vec::with_capacity(matches.len());
        for found in matches {
            // Each match dereferences to a `Value`; copy it so the result
            // does not borrow from `value`.
            selected.push((*found).clone());
        }
        selected
    }
}
impl FromStr for JsonPathExpression {
    type Err = String;

    /// Parses `s` as a JSONPath expression, remembering the original text.
    ///
    /// # Errors
    /// Returns a message prefixed with `Failed to parse rule:` when the
    /// JSONPath parser rejects `s`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        JsonPathInst::from_str(s)
            .map(|path| JsonPathExpression {
                expression: s.to_string(),
                path,
            })
            .map_err(|e| format!("Failed to parse rule: {e}"))
    }
}
/// One promotion rule: the value selected by `path` is written to the property
/// named `target_name`.
#[derive(Debug, Clone, Deserialize)]
pub struct PromoteMapping {
/// JSONPath selecting the value to promote. It is parsed while the mapping is
/// deserialized, and an empty string is rejected at that point.
#[serde(deserialize_with = "deserialize_jsonpath")]
pub path: JsonPathExpression,
/// Name of the element property that receives the selected value.
pub target_name: String,
}
fn deserialize_jsonpath<'de, D>(deserializer: D) -> Result<JsonPathExpression, D::Error>
where
D: Deserializer<'de>,
{
let expression = String::deserialize(deserializer)?;
if expression.is_empty() {
return Err(de::Error::custom("Empty JSONPath"));
}
JsonPathExpression::from_str(&expression).map_err(de::Error::custom)
}
/// Configuration accepted by the promote middleware. Unknown fields are rejected
/// during deserialization.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PromoteMiddlewareConfig {
/// Promotion rules; they are applied in the order listed.
pub mappings: Vec<PromoteMapping>,
/// What to do when a target property already holds a non-null value.
/// Defaults to `ConflictStrategy::Overwrite` when omitted.
#[serde(default)]
pub on_conflict: ConflictStrategy,
/// How to react when a JSONPath selects no value or more than one value.
/// Uses the `ErrorHandling` default when omitted.
#[serde(default)]
pub on_error: ErrorHandling,
}
/// Source middleware that copies values selected by JSONPath expressions into
/// named properties of the element carried by a change.
pub struct PromoteMiddleware {
/// Instance name; prefixed in square brackets to log and error messages.
name: String,
/// Promotion rules, applied in order to each processed element.
mappings: Vec<PromoteMapping>,
/// Policy for a target property that already holds a non-null value.
on_conflict: ConflictStrategy,
/// Policy for a JSONPath that selects zero or several values.
on_error: ErrorHandling,
}
impl PromoteMiddleware {
    /// Builds a middleware instance called `name` from `config`.
    ///
    /// No validation happens here; the values in `config` are stored as given.
    pub fn new(name: String, config: PromoteMiddlewareConfig) -> Self {
        PromoteMiddleware {
            name,
            mappings: config.mappings,
            on_conflict: config.on_conflict,
            on_error: config.on_error,
        }
    }

    /// Applies the configured `on_error` policy to a selection problem.
    ///
    /// With `Skip` the message is logged at debug level and `Ok(None)` is
    /// returned; with `Fail` the message becomes a `SourceChangeError`.
    fn selection_failure(&self, msg: String) -> Result<Option<Value>, MiddlewareError> {
        match self.on_error {
            ErrorHandling::Skip => {
                debug!("{msg}");
                Ok(None)
            }
            ErrorHandling::Fail => Err(MiddlewareError::SourceChangeError(msg)),
        }
    }

    /// Evaluates `path` against `json_obj` and returns the single selected value.
    ///
    /// Returns `Ok(Some(value))` when exactly one value is selected. When the
    /// path selects nothing or more than one value, the outcome depends on
    /// `on_error`: `Ok(None)` for `Skip`, an error for `Fail`.
    ///
    /// # Errors
    /// `MiddlewareError::SourceChangeError` when the selection is empty or
    /// ambiguous and `on_error` is `Fail`.
    fn extract_value(
        &self,
        json_obj: &Value,
        path: &JsonPathExpression,
    ) -> Result<Option<Value>, MiddlewareError> {
        let mut results = path.execute(json_obj);
        match results.len() {
            // `execute` already returned owned values, so move the only one
            // out instead of cloning it again.
            1 => Ok(results.pop()),
            0 => self.selection_failure(format!(
                "[{}] JSONPath '{}' selected no values",
                self.name, path.expression
            )),
            count => self.selection_failure(format!(
                "[{}] JSONPath '{}' selected multiple values ({})",
                self.name, path.expression, count
            )),
        }
    }

    /// Applies every mapping, in order, to the properties of `element`.
    ///
    /// All JSONPath expressions are evaluated against a JSON snapshot of the
    /// properties taken before the first promotion, so a mapping does not see
    /// values written by an earlier mapping in the same call. A target
    /// property whose current value is null is treated as free and is
    /// written without consulting `on_conflict`.
    ///
    /// # Errors
    /// `MiddlewareError::SourceChangeError` when a selection fails under
    /// `ErrorHandling::Fail`, or when a target property is occupied and the
    /// conflict strategy is `Fail`. Mappings applied before the failing one
    /// have already modified `element`.
    fn process_element(&self, element: &mut Element) -> Result<(), MiddlewareError> {
        let properties = match element {
            Element::Node { properties, .. } | Element::Relation { properties, .. } => properties,
        };

        // Snapshot of the properties as JSON, taken once before any promotion.
        let json_props: Value = {
            let map: serde_json::Map<String, serde_json::Value> = (&*properties).into();
            Value::Object(map)
        };

        for mapping in &self.mappings {
            let value = match self.extract_value(&json_props, &mapping.path)? {
                Some(v) => v,
                // Selection problem tolerated by the `Skip` policy.
                None => continue,
            };

            // Only a present, non-null property counts as a conflict.
            let occupied = matches!(
                properties.get(&mapping.target_name),
                Some(existing) if *existing != ElementValue::Null
            );

            if occupied {
                match self.on_conflict {
                    ConflictStrategy::Overwrite => {
                        debug!(
                            "[{}] Overwriting existing property '{}'",
                            self.name, mapping.target_name
                        );
                    }
                    ConflictStrategy::Skip => {
                        debug!(
                            "[{}] Skipping promotion to '{}' due to existing property",
                            self.name, mapping.target_name
                        );
                        continue;
                    }
                    ConflictStrategy::Fail => {
                        return Err(MiddlewareError::SourceChangeError(format!(
                            "[{}] Property '{}' already exists and conflict strategy is 'fail'",
                            self.name, mapping.target_name
                        )));
                    }
                }
            }

            properties.insert(&mapping.target_name, ElementValue::from(&value));
        }

        Ok(())
    }
}
#[async_trait]
impl SourceMiddleware for PromoteMiddleware {
    /// Runs the promotion rules on the element of an insert or update and
    /// returns the change, with its modified element, as a one-item vector.
    ///
    /// Delete and future changes are returned unchanged. The element index is
    /// not consulted.
    ///
    /// # Errors
    /// Propagates any `MiddlewareError` raised while processing the element;
    /// in that case no change is returned.
    async fn process(
        &self,
        source_change: SourceChange,
        _element_index: &dyn ElementIndex,
    ) -> Result<Vec<SourceChange>, MiddlewareError> {
        match source_change {
            SourceChange::Insert { mut element } => {
                self.process_element(&mut element)?;
                Ok(vec![SourceChange::Insert { element }])
            }
            SourceChange::Update { mut element } => {
                self.process_element(&mut element)?;
                Ok(vec![SourceChange::Update { element }])
            }
            // Nothing to promote on these variants; hand the change back as is.
            SourceChange::Delete { .. } | SourceChange::Future { .. } => Ok(vec![source_change]),
        }
    }
}
/// Stateless factory that builds promote middleware instances from configuration.
pub struct PromoteMiddlewareFactory {}

impl PromoteMiddlewareFactory {
    /// Creates a factory; it carries no state.
    pub fn new() -> Self {
        Self {}
    }
}

impl Default for PromoteMiddlewareFactory {
    /// Equivalent to [`PromoteMiddlewareFactory::new`].
    fn default() -> Self {
        PromoteMiddlewareFactory::new()
    }
}
impl SourceMiddlewareFactory for PromoteMiddlewareFactory {
    /// Returns the name of this middleware kind, `promote`.
    fn name(&self) -> String {
        String::from("promote")
    }

    /// Parses and validates `config`, then builds a [`PromoteMiddleware`].
    ///
    /// # Errors
    /// `MiddlewareSetupError::InvalidConfiguration` when the configuration
    /// cannot be deserialized, when `mappings` is empty, or when a mapping
    /// has an empty `target_name`.
    fn create(
        &self,
        config: &SourceMiddlewareConfig,
    ) -> Result<Arc<dyn SourceMiddleware>, MiddlewareSetupError> {
        let raw = serde_json::Value::Object(config.config.clone());
        let promote_config: PromoteMiddlewareConfig =
            serde_json::from_value(raw).map_err(|e| {
                // An empty JSONPath is reported without the generic
                // "Invalid configuration" prefix.
                let message = if e.to_string().contains("Empty JSONPath") {
                    format!("[{}] {}", config.name, e)
                } else {
                    format!("[{}] Invalid configuration: {}", config.name, e)
                };
                MiddlewareSetupError::InvalidConfiguration(message)
            })?;

        if promote_config.mappings.is_empty() {
            return Err(MiddlewareSetupError::InvalidConfiguration(format!(
                "[{}] At least one mapping must be specified",
                config.name
            )));
        }

        // Report the first mapping (lowest index) whose target name is empty.
        let first_unnamed = promote_config
            .mappings
            .iter()
            .position(|mapping| mapping.target_name.is_empty());
        if let Some(i) = first_unnamed {
            return Err(MiddlewareSetupError::InvalidConfiguration(format!(
                "[{}] Empty target_name in mapping at index {}",
                config.name, i
            )));
        }

        log::info!(
            "[{}] Creating Promote middleware with {} mappings",
            config.name,
            promote_config.mappings.len()
        );

        let middleware = PromoteMiddleware::new(config.name.to_string(), promote_config);
        Ok(Arc::new(middleware))
    }
}