use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::common::message::Message;
use crate::error::{Error, Result};
use crate::transform::json_path::CompiledPath;
use crate::transform::step::Step;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComputeStepConfig {
pub expression: String,
pub output: String,
#[serde(default)]
pub precision: Option<u32>,
}
#[derive(Debug, Clone, PartialEq)]
enum Token {
Number(f64),
JsonPath(String),
Plus,
Minus,
Multiply,
Divide,
LParen,
RParen,
}
#[derive(Debug, Clone)]
enum ExprNode {
Number(f64),
Field(CompiledPath),
BinaryOp {
left: Box<ExprNode>,
op: BinaryOp,
right: Box<ExprNode>,
},
Negate(Box<ExprNode>),
}
#[derive(Debug, Clone, Copy)]
enum BinaryOp {
Add,
Subtract,
Multiply,
Divide,
}
fn tokenize(expr: &str) -> Result<Vec<Token>> {
let mut tokens = Vec::new();
let mut chars = expr.chars().peekable();
while let Some(&ch) = chars.peek() {
match ch {
' ' | '\t' | '\n' | '\r' => {
chars.next();
}
'+' => {
tokens.push(Token::Plus);
chars.next();
}
'-' => {
tokens.push(Token::Minus);
chars.next();
}
'*' => {
tokens.push(Token::Multiply);
chars.next();
}
'/' => {
tokens.push(Token::Divide);
chars.next();
}
'(' => {
tokens.push(Token::LParen);
chars.next();
}
')' => {
tokens.push(Token::RParen);
chars.next();
}
'$' => {
let mut path = String::new();
while let Some(&c) = chars.peek() {
if c.is_alphanumeric()
|| c == '$'
|| c == '.'
|| c == '_'
|| c == '['
|| c == ']'
|| c == '\''
|| c == '"'
|| c == ':'
|| c == '-'
{
path.push(c);
chars.next();
} else {
break;
}
}
if path.is_empty() {
return Err(Error::config("Empty JSONPath reference in expression"));
}
tokens.push(Token::JsonPath(path));
}
'0'..='9' | '.' => {
let mut num_str = String::new();
let mut has_dot = false;
while let Some(&c) = chars.peek() {
if c.is_ascii_digit() {
num_str.push(c);
chars.next();
} else if c == '.' && !has_dot {
has_dot = true;
num_str.push(c);
chars.next();
} else {
break;
}
}
let num: f64 = num_str.parse().map_err(|_| {
Error::config(format!("Invalid number in expression: {}", num_str))
})?;
tokens.push(Token::Number(num));
}
_ => {
return Err(Error::config(format!(
"Unexpected character in expression: '{}'",
ch
)));
}
}
}
Ok(tokens)
}
struct ExprParser {
tokens: Vec<Token>,
pos: usize,
}
impl ExprParser {
fn new(tokens: Vec<Token>) -> Self {
Self { tokens, pos: 0 }
}
fn peek(&self) -> Option<&Token> {
self.tokens.get(self.pos)
}
fn advance(&mut self) -> Option<&Token> {
let token = self.tokens.get(self.pos);
if token.is_some() {
self.pos += 1;
}
token
}
fn parse_expr(&mut self) -> Result<ExprNode> {
let mut left = self.parse_term()?;
loop {
match self.peek() {
Some(Token::Plus) => {
self.advance();
let right = self.parse_term()?;
left = ExprNode::BinaryOp {
left: Box::new(left),
op: BinaryOp::Add,
right: Box::new(right),
};
}
Some(Token::Minus) => {
self.advance();
let right = self.parse_term()?;
left = ExprNode::BinaryOp {
left: Box::new(left),
op: BinaryOp::Subtract,
right: Box::new(right),
};
}
_ => break,
}
}
Ok(left)
}
fn parse_term(&mut self) -> Result<ExprNode> {
let mut left = self.parse_unary()?;
loop {
match self.peek() {
Some(Token::Multiply) => {
self.advance();
let right = self.parse_unary()?;
left = ExprNode::BinaryOp {
left: Box::new(left),
op: BinaryOp::Multiply,
right: Box::new(right),
};
}
Some(Token::Divide) => {
self.advance();
let right = self.parse_unary()?;
left = ExprNode::BinaryOp {
left: Box::new(left),
op: BinaryOp::Divide,
right: Box::new(right),
};
}
_ => break,
}
}
Ok(left)
}
fn parse_unary(&mut self) -> Result<ExprNode> {
if let Some(Token::Minus) = self.peek() {
self.advance();
let operand = self.parse_unary()?;
return Ok(ExprNode::Negate(Box::new(operand)));
}
self.parse_primary()
}
fn parse_primary(&mut self) -> Result<ExprNode> {
match self.advance() {
Some(Token::Number(n)) => Ok(ExprNode::Number(*n)),
Some(Token::JsonPath(path)) => {
let compiled = CompiledPath::compile(path)?;
Ok(ExprNode::Field(compiled))
}
Some(Token::LParen) => {
let expr = self.parse_expr()?;
match self.advance() {
Some(Token::RParen) => Ok(expr),
_ => Err(Error::config("Missing closing parenthesis in expression")),
}
}
Some(token) => Err(Error::config(format!(
"Unexpected token in expression: {:?}",
token
))),
None => Err(Error::config("Unexpected end of expression")),
}
}
}
impl ExprNode {
fn evaluate(&self, payload: &Value) -> Result<f64> {
match self {
ExprNode::Number(n) => Ok(*n),
ExprNode::Field(path) => {
let value = path
.extract(payload)
.ok_or_else(|| Error::transform(format!("Field not found: {}", path)))?;
match value {
Value::Number(n) => n.as_f64().ok_or_else(|| {
Error::transform(format!("Field {} is not a valid number", path))
}),
Value::String(s) => s.parse::<f64>().map_err(|_| {
Error::transform(format!(
"Field {} cannot be parsed as number: {}",
path, s
))
}),
_ => Err(Error::transform(format!(
"Field {} is not a number: {:?}",
path, value
))),
}
}
ExprNode::BinaryOp { left, op, right } => {
let l = left.evaluate(payload)?;
let r = right.evaluate(payload)?;
match op {
BinaryOp::Add => Ok(l + r),
BinaryOp::Subtract => Ok(l - r),
BinaryOp::Multiply => Ok(l * r),
BinaryOp::Divide => {
if r == 0.0 {
Err(Error::transform("Division by zero"))
} else {
Ok(l / r)
}
}
}
}
ExprNode::Negate(operand) => Ok(-operand.evaluate(payload)?),
}
}
}
pub struct ComputeStep {
expr: ExprNode,
output: CompiledPath,
precision: Option<u32>,
}
impl ComputeStep {
pub fn new(config: ComputeStepConfig) -> Result<Self> {
let tokens = tokenize(&config.expression)?;
if tokens.is_empty() {
return Err(Error::config("Empty expression"));
}
let mut parser = ExprParser::new(tokens);
let expr = parser.parse_expr()?;
if parser.peek().is_some() {
return Err(Error::config(format!(
"Unexpected tokens after expression: {:?}",
parser.peek()
)));
}
let output = CompiledPath::compile(&config.output)?;
Ok(Self {
expr,
output,
precision: config.precision,
})
}
}
impl Step for ComputeStep {
fn step_type(&self) -> &'static str {
"compute"
}
fn process(&self, mut msg: Message) -> Result<Option<Message>> {
let result = self.expr.evaluate(&msg.payload)?;
let result = if let Some(precision) = self.precision {
let factor = 10_f64.powi(precision as i32);
(result * factor).round() / factor
} else {
result
};
let value = serde_json::Number::from_f64(result)
.map(Value::Number)
.unwrap_or(Value::Null);
self.output.set(&mut msg.payload, value);
tracing::debug!(
output = %self.output,
result = result,
"Compute step evaluated expression"
);
Ok(Some(msg))
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
fn make_msg(payload: Value) -> Message {
Message::new("test", payload)
}
#[test]
fn test_compute_simple_division() {
let config = ComputeStepConfig {
expression: "$.eth / $.btc".into(),
output: "$.ratio".into(),
precision: Some(6),
};
let step = ComputeStep::new(config).unwrap();
let msg = make_msg(json!({
"eth": 3000.0,
"btc": 100000.0
}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["ratio"], 0.03);
}
#[test]
fn test_compute_nested_fields() {
let config = ComputeStepConfig {
expression: "$.data.a + $.data.b".into(),
output: "$.sum".into(),
precision: None,
};
let step = ComputeStep::new(config).unwrap();
let msg = make_msg(json!({
"data": {"a": 10, "b": 20}
}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["sum"], 30.0);
}
#[test]
fn test_compute_complex_expression() {
let config = ComputeStepConfig {
expression: "($.a + $.b) * $.c".into(),
output: "$.result".into(),
precision: None,
};
let step = ComputeStep::new(config).unwrap();
let msg = make_msg(json!({"a": 2, "b": 3, "c": 4}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["result"], 20.0);
}
#[test]
fn test_compute_with_literals() {
let config = ComputeStepConfig {
expression: "$.value * 100".into(),
output: "$.percentage".into(),
precision: None,
};
let step = ComputeStep::new(config).unwrap();
let msg = make_msg(json!({"value": 0.5}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["percentage"], 50.0);
}
#[test]
fn test_compute_unary_minus() {
let config = ComputeStepConfig {
expression: "-$.value".into(),
output: "$.negated".into(),
precision: None,
};
let step = ComputeStep::new(config).unwrap();
let msg = make_msg(json!({"value": 42}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["negated"], -42.0);
}
#[test]
fn test_compute_precision() {
let config = ComputeStepConfig {
expression: "1 / 3".into(),
output: "$.result".into(),
precision: Some(4),
};
let step = ComputeStep::new(config).unwrap();
let msg = make_msg(json!({}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["result"], 0.3333);
}
#[test]
fn test_compute_division_by_zero() {
let config = ComputeStepConfig {
expression: "$.a / $.b".into(),
output: "$.result".into(),
precision: None,
};
let step = ComputeStep::new(config).unwrap();
let msg = make_msg(json!({"a": 10, "b": 0}));
let result = step.process(msg);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Division by zero"));
}
#[test]
fn test_compute_missing_field() {
let config = ComputeStepConfig {
expression: "$.a + $.missing".into(),
output: "$.result".into(),
precision: None,
};
let step = ComputeStep::new(config).unwrap();
let msg = make_msg(json!({"a": 10}));
let result = step.process(msg);
assert!(result.is_err());
}
#[test]
fn test_compute_string_to_number() {
let config = ComputeStepConfig {
expression: "$.price * 2".into(),
output: "$.double".into(),
precision: None,
};
let step = ComputeStep::new(config).unwrap();
let msg = make_msg(json!({"price": "50"}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["double"], 100.0);
}
#[test]
fn test_tokenize() {
let tokens = tokenize("$.a + $.b * 2").unwrap();
assert_eq!(tokens.len(), 5);
assert!(matches!(tokens[0], Token::JsonPath(_)));
assert!(matches!(tokens[1], Token::Plus));
assert!(matches!(tokens[2], Token::JsonPath(_)));
assert!(matches!(tokens[3], Token::Multiply));
assert!(matches!(tokens[4], Token::Number(2.0)));
}
#[test]
fn test_tokenize_jsonpath_with_quoted_brackets() {
let expr = "$['crypto:price:ETH'].price_usd / $['crypto:price:BTC'].price_usd";
let tokens = tokenize(expr).unwrap();
assert_eq!(tokens.len(), 3);
assert!(matches!(tokens[0], Token::JsonPath(_)));
assert!(matches!(tokens[1], Token::Divide));
assert!(matches!(tokens[2], Token::JsonPath(_)));
}
#[test]
fn test_invalid_expression() {
let config = ComputeStepConfig {
expression: "$.a +".into(),
output: "$.result".into(),
precision: None,
};
let result = ComputeStep::new(config);
assert!(result.is_err());
}
}