use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::env;
use std::sync::OnceLock;
use crate::common::message::Message;
use crate::error::{Error, Result};
use super::json_path::CompiledPath;
/// Built-in variables that [`BuiltinVar::parse`] recognises by their `$`-prefixed name
/// and that [`BuiltinVar::evaluate`] computes for a given message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BuiltinVar {
/// `$NOW`: current UTC date-time, formatted with `%Y-%m-%dT%H:%M:%S%:z`.
Now,
/// `$DATE`: current UTC date, formatted with `%Y-%m-%d`.
Date,
/// `$UUID`: a new UUID from `uuid::Uuid::now_v7()`, rendered as a string.
Uuid,
/// `$TIMESTAMP`: `chrono::Utc::now().timestamp_millis()` as a JSON number.
Timestamp,
/// `$SOURCE_ID`: the message's `meta.source_node`.
SourceId,
/// `$MSG_ID`: the message's `meta.id`, rendered as a string.
MsgId,
}
impl BuiltinVar {
pub fn parse(s: &str) -> Option<Self> {
match s {
"$NOW" => Some(Self::Now),
"$DATE" => Some(Self::Date),
"$UUID" => Some(Self::Uuid),
"$TIMESTAMP" => Some(Self::Timestamp),
"$SOURCE_ID" => Some(Self::SourceId),
"$MSG_ID" => Some(Self::MsgId),
_ => None,
}
}
pub fn evaluate(&self, msg: &Message) -> Value {
match self {
Self::Now => Value::String(
chrono::Utc::now()
.format("%Y-%m-%dT%H:%M:%S%:z")
.to_string(),
),
Self::Date => Value::String(chrono::Utc::now().format("%Y-%m-%d").to_string()),
Self::Uuid => Value::String(uuid::Uuid::now_v7().to_string()),
Self::Timestamp => Value::Number(chrono::Utc::now().timestamp_millis().into()),
Self::SourceId => Value::String(msg.meta.source_node.clone()),
Self::MsgId => Value::String(msg.meta.id.to_string()),
}
}
}
/// A message metadata field, addressed by the path that follows the `$META.` prefix
/// in a source expression.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetaField {
/// `id`: `msg.meta.id`, rendered as a string.
Id,
/// `timestamp`: `msg.meta.timestamp` as a JSON number.
Timestamp,
/// `source_node`: `msg.meta.source_node`.
SourceNode,
/// `correlation_id`: `msg.meta.correlation_id` as a string, or JSON null when absent.
CorrelationId,
/// `chain_depth`: `msg.meta.chain_depth` as a JSON number.
ChainDepth,
/// `tags.<key>`: the tag stored under `<key>`, or JSON null when the tag is missing.
Tag(String),
}
impl MetaField {
    /// Parses the part of a metadata reference that follows `$META.`.
    ///
    /// Accepts the fixed names `id`, `timestamp`, `source_node`, `correlation_id` and
    /// `chain_depth`, plus `tags.<key>` with a non-empty key. Anything else is `None`.
    pub fn parse(path: &str) -> Option<Self> {
        let field = match path {
            "id" => Self::Id,
            "timestamp" => Self::Timestamp,
            "source_node" => Self::SourceNode,
            "correlation_id" => Self::CorrelationId,
            "chain_depth" => Self::ChainDepth,
            other => {
                // `tags.` with nothing after it is rejected, as is a bare `tags`.
                let key = other.strip_prefix("tags.").filter(|key| !key.is_empty())?;
                Self::Tag(key.to_string())
            }
        };
        Some(field)
    }

    /// Reads this field from `msg.meta` as a JSON value.
    ///
    /// An absent correlation id or tag yields `Value::Null`.
    pub fn evaluate(&self, msg: &Message) -> Value {
        let meta = &msg.meta;
        match self {
            Self::Id => Value::String(meta.id.to_string()),
            Self::Timestamp => Value::Number(meta.timestamp.into()),
            Self::SourceNode => Value::String(meta.source_node.clone()),
            Self::CorrelationId => meta
                .correlation_id
                .map_or(Value::Null, |id| Value::String(id.to_string())),
            Self::ChainDepth => Value::Number(meta.chain_depth.into()),
            Self::Tag(key) => meta
                .tags
                .get(key)
                .cloned()
                .map_or(Value::Null, Value::String),
        }
    }

    /// Returns `true` for the fields that may legitimately be missing from a message
    /// (the correlation id and any tag).
    pub fn is_optional(&self) -> bool {
        match self {
            Self::CorrelationId | Self::Tag(_) => true,
            Self::Id | Self::Timestamp | Self::SourceNode | Self::ChainDepth => false,
        }
    }
}
/// One segment of a compiled `{{ ... }}` template, in the order it appears in the text.
#[derive(Debug, Clone)]
pub enum TemplatePart {
/// Text outside any placeholder, copied to the output unchanged.
Literal(String),
/// A path extracted from the message payload when the template is resolved.
Path(CompiledPath),
/// A built-in variable, evaluated when the template is resolved.
Variable(BuiltinVar),
/// The value of a `$ENV:` reference, already looked up when the template was compiled.
EnvVar(Value),
/// A `$META.` reference, read from the message metadata when the template is resolved.
MetaField(MetaField),
}
/// A compiled description of where a mapped value comes from.
///
/// Built by `ValueSource::compile` and turned into a JSON value by `ValueSource::resolve`.
#[derive(Debug, Clone)]
pub enum ValueSource {
/// A fixed JSON value returned as-is (cloned) on every resolve.
Static(Value),
/// A built-in variable such as `$NOW` or `$UUID`, evaluated on every resolve.
Variable(BuiltinVar),
/// A path extracted from the message payload; resolves to JSON null when nothing is found.
JsonPath(CompiledPath),
/// A `{{ ... }}` template made of literal text and placeholders.
Template(Vec<TemplatePart>),
/// A `$ENV:` reference whose value was looked up when the source was compiled.
EnvVar {
/// Name of the environment variable (without the `$ENV:` prefix or default).
name: String,
/// The variable's value, or the `:-` default when the lookup failed.
value: Value,
},
/// A `$META.` reference read from the message metadata.
MetaField(MetaField),
}
impl ValueSource {
/// Reports whether a JSON null produced by this source means "nothing found".
///
/// True for payload paths and templates, and for metadata fields that are optional;
/// false for static values, built-in variables and environment variables.
pub fn should_skip_null(&self) -> bool {
    match self {
        Self::MetaField(field) => field.is_optional(),
        Self::JsonPath(_) | Self::Template(_) => true,
        Self::Static(_) | Self::Variable(_) | Self::EnvVar { .. } => false,
    }
}
pub fn compile(from: Option<&str>, value: Option<&Value>) -> Result<Self> {
if from.is_none() && value.is_none() {
return Err(Error::config("Must specify either 'from' or 'value'"));
}
if let Some(val) = value {
if let Some(var_str) = val.as_str()
&& let Some(var) = BuiltinVar::parse(var_str)
{
return Ok(Self::Variable(var));
}
return Ok(Self::Static(val.clone()));
}
let from_str = from.unwrap().trim();
if let Some(var) = BuiltinVar::parse(from_str) {
return Ok(Self::Variable(var));
}
if from_str.starts_with("$ENV:") {
let (name, value) = Self::parse_env_var(from_str)?;
return Ok(Self::EnvVar { name, value });
}
if from_str.starts_with("$META.") {
let path = from_str.strip_prefix("$META.").unwrap();
let field = MetaField::parse(path).ok_or_else(|| {
Error::config(format!(
"Invalid metadata field: '{}'. Valid fields: id, timestamp, source_node, \
correlation_id, chain_depth, tags.<key>",
path
))
})?;
return Ok(Self::MetaField(field));
}
if from_str.starts_with('$') && !from_str.contains("{{") {
return Ok(Self::JsonPath(CompiledPath::compile(from_str)?));
}
if from_str.contains("{{") {
return Ok(Self::Template(Self::compile_template(from_str)?));
}
Ok(Self::Static(Value::String(from_str.to_string())))
}
fn parse_env_var(s: &str) -> Result<(String, Value)> {
let rest = s.strip_prefix("$ENV:").unwrap();
let (var_name, default) = if let Some(idx) = rest.find(":-") {
let name = &rest[..idx];
let default = &rest[idx + 2..];
(name, Some(default))
} else {
(rest, None)
};
if var_name.is_empty() {
return Err(Error::config(
"Environment variable name cannot be empty (e.g., '$ENV:MY_VAR')",
));
}
match env::var(var_name) {
Ok(val) => Ok((var_name.to_string(), Value::String(val))),
Err(_) => {
if let Some(default_val) = default {
Ok((var_name.to_string(), Value::String(default_val.to_string())))
} else {
Err(Error::config(format!(
"Environment variable '{}' is not set and no default provided. \
Use '$ENV:{}:-default' syntax to provide a default value.",
var_name, var_name
)))
}
}
}
}
/// Splits `template` into literal text and compiled `{{ ... }}` placeholders.
///
/// Parts are returned in the order they appear. Empty literals are not emitted, and
/// whitespace just inside the braces is ignored. Text that contains no complete
/// placeholder comes back as a single literal.
///
/// # Errors
///
/// Propagates the error of the first placeholder that fails to compile.
fn compile_template(template: &str) -> Result<Vec<TemplatePart>> {
    static TEMPLATE_RE: OnceLock<Regex> = OnceLock::new();
    let placeholder = TEMPLATE_RE
        .get_or_init(|| Regex::new(r"\{\{\s*(.*?)\s*\}\}").expect("template regex is valid"));

    let mut compiled = Vec::new();
    // Byte offset just past the previous placeholder.
    let mut cursor = 0;

    for caps in placeholder.captures_iter(template) {
        let span = caps.get(0).unwrap();

        let literal = &template[cursor..span.start()];
        if !literal.is_empty() {
            compiled.push(TemplatePart::Literal(literal.to_string()));
        }

        compiled.push(Self::compile_template_part(caps[1].trim())?);
        cursor = span.end();
    }

    let tail = &template[cursor..];
    if !tail.is_empty() {
        compiled.push(TemplatePart::Literal(tail.to_string()));
    }

    Ok(compiled)
}
/// Compiles the text found inside one `{{ ... }}` placeholder.
///
/// Tried in order: built-in variable name, `$ENV:` reference, `$META.` reference;
/// anything else is compiled as a payload path.
///
/// # Errors
///
/// Returns a configuration error for an unknown metadata field, and propagates errors
/// from the environment lookup or the path compiler.
fn compile_template_part(inner: &str) -> Result<TemplatePart> {
    if let Some(builtin) = BuiltinVar::parse(inner) {
        return Ok(TemplatePart::Variable(builtin));
    }

    if inner.starts_with("$ENV:") {
        return Self::parse_env_var(inner).map(|(_name, value)| TemplatePart::EnvVar(value));
    }

    match inner.strip_prefix("$META.") {
        Some(field_path) => MetaField::parse(field_path)
            .map(TemplatePart::MetaField)
            .ok_or_else(|| {
                Error::config(format!(
                    "Invalid metadata field in template: '{}'. Valid fields: id, timestamp, \
                     source_node, correlation_id, chain_depth, tags.<key>",
                    field_path
                ))
            }),
        None => Ok(TemplatePart::Path(CompiledPath::compile(inner)?)),
    }
}
pub fn resolve(&self, msg: &Message) -> Value {
match self {
Self::Static(val) => val.clone(),
Self::Variable(var) => var.evaluate(msg),
Self::JsonPath(path) => path.extract(&msg.payload).cloned().unwrap_or(Value::Null),
Self::EnvVar { value, .. } => value.clone(),
Self::MetaField(field) => field.evaluate(msg),
Self::Template(parts) => {
let mut result = String::new();
let mut all_found = true;
for part in parts {
match part {
TemplatePart::Literal(s) => result.push_str(s),
TemplatePart::Variable(var) => {
append_value_as_string(&mut result, &var.evaluate(msg));
}
TemplatePart::EnvVar(val) => {
append_value_as_string(&mut result, val);
}
TemplatePart::MetaField(field) => {
let val = field.evaluate(msg);
if val.is_null() && field.is_optional() {
all_found = false;
} else {
append_value_as_string(&mut result, &val);
}
}
TemplatePart::Path(path) => {
if let Some(val) = path.extract(&msg.payload) {
append_value_as_string(&mut result, val);
} else {
all_found = false;
}
}
}
}
if all_found {
Value::String(result)
} else {
Value::Null
}
}
}
}
/// Resolves this source for `msg` and renders the result as a string.
///
/// `fallback` is called only when the result is JSON null and
/// [`should_skip_null`](Self::should_skip_null) is true for this source. A JSON string
/// yields its contents; any other value yields its `to_string()` rendering.
pub fn resolve_to_string(&self, msg: &Message, fallback: impl FnOnce() -> String) -> String {
    match self.resolve(msg) {
        Value::Null if self.should_skip_null() => fallback(),
        Value::String(text) => text,
        other => other.to_string(),
    }
}
/// Compiles `template` as a `from` expression when one is given.
///
/// `None` passes through as `Ok(None)`; compilation errors are propagated.
pub fn compile_optional(template: Option<&str>) -> Result<Option<Self>> {
    template
        .map(|expr| Self::compile(Some(expr), None))
        .transpose()
}
}
fn append_value_as_string(buf: &mut String, val: &Value) {
if let Some(s) = val.as_str() {
buf.push_str(s);
} else {
buf.push_str(&val.to_string());
}
}
/// Serializable description of one field mapping: where a value comes from and the
/// path it is written to. Turned into a `CompiledMapping` by `FieldMapping::compile`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldMapping {
/// Source expression: a built-in variable, `$ENV:` or `$META.` reference, payload
/// path, `{{ ... }}` template, or plain text. Omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub from: Option<String>,
/// Literal value; when present it takes precedence over `from` during compilation.
/// Omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub value: Option<Value>,
/// Destination path expression, compiled with `CompiledPath::compile`.
pub to: String,
}
/// A [`FieldMapping`] after compilation: a resolved value source plus the compiled
/// destination path.
///
/// Derives `Debug` and `Clone` like the other public types in this module; both
/// field types already implement them.
#[derive(Debug, Clone)]
pub struct CompiledMapping {
    /// Where the mapped value comes from.
    pub source: ValueSource,
    /// Compiled form of the mapping's `to` path.
    pub to: CompiledPath,
}
impl FieldMapping {
    /// Compiles this mapping's source (`from` / `value`) and its `to` path.
    ///
    /// # Errors
    ///
    /// Propagates the first error from compiling the source, then from compiling `to`.
    pub fn compile(&self) -> Result<CompiledMapping> {
        Ok(CompiledMapping {
            source: ValueSource::compile(self.from.as_deref(), self.value.as_ref())?,
            to: CompiledPath::compile(&self.to)?,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a message from the fixed `"test_source"` node carrying `payload`.
    fn make_msg(payload: Value) -> Message {
        Message::new("test_source", payload)
    }

    /// Message with an empty JSON object as payload.
    fn empty_msg() -> Message {
        make_msg(json!({}))
    }

    #[test]
    fn test_builtin_var_parse() {
        let cases = [
            ("$NOW", Some(BuiltinVar::Now)),
            ("$DATE", Some(BuiltinVar::Date)),
            ("$UUID", Some(BuiltinVar::Uuid)),
            ("$TIMESTAMP", Some(BuiltinVar::Timestamp)),
            ("$SOURCE_ID", Some(BuiltinVar::SourceId)),
            ("$MSG_ID", Some(BuiltinVar::MsgId)),
            ("$UNKNOWN", None),
            ("NOW", None),
        ];
        for (input, want) in cases {
            assert_eq!(BuiltinVar::parse(input), want);
        }
    }

    #[test]
    fn test_builtin_var_evaluate() {
        let msg = empty_msg();
        let text_of = |var: BuiltinVar| var.evaluate(&msg).as_str().unwrap().to_string();

        assert!(text_of(BuiltinVar::Now).contains("T"));

        let date = text_of(BuiltinVar::Date);
        assert_eq!(date.len(), 10);
        assert!(date.contains("-"));

        assert_eq!(text_of(BuiltinVar::Uuid).len(), 36);

        let millis = BuiltinVar::Timestamp.evaluate(&msg);
        assert!(millis.as_i64().unwrap() > 0);

        assert_eq!(text_of(BuiltinVar::SourceId), "test_source");
    }

    #[test]
    fn test_value_source_static() {
        let compiled = ValueSource::compile(None, Some(&json!("hello"))).unwrap();
        assert_eq!(compiled.resolve(&empty_msg()), json!("hello"));
    }

    #[test]
    fn test_value_source_variable() {
        let compiled = ValueSource::compile(None, Some(&json!("$UUID"))).unwrap();
        let resolved = compiled.resolve(&empty_msg());
        assert_eq!(resolved.as_str().unwrap().len(), 36);
    }

    #[test]
    fn test_value_source_jsonpath() {
        let compiled = ValueSource::compile(Some("$.user.name"), None).unwrap();
        let msg = make_msg(json!({"user": {"name": "Alice"}}));
        assert_eq!(compiled.resolve(&msg), json!("Alice"));
    }

    #[test]
    fn test_value_source_template() {
        let compiled = ValueSource::compile(Some("Hello {{ $.name }}!"), None).unwrap();
        let msg = make_msg(json!({"name": "World"}));
        assert_eq!(compiled.resolve(&msg), json!("Hello World!"));
    }

    #[test]
    fn test_value_source_template_with_variable() {
        let compiled = ValueSource::compile(Some("ID: {{ $UUID }}"), None).unwrap();
        let resolved = compiled.resolve(&empty_msg());
        let text = resolved.as_str().unwrap();
        assert!(text.starts_with("ID: "));
        assert!(text.len() >= 40);
    }

    #[test]
    fn test_value_source_from_variable() {
        let compiled = ValueSource::compile(Some("$NOW"), None).unwrap();
        let resolved = compiled.resolve(&empty_msg());
        assert!(resolved.as_str().unwrap().contains("T"));
    }

    #[test]
    fn test_meta_field_parse() {
        let cases = [
            ("id", Some(MetaField::Id)),
            ("timestamp", Some(MetaField::Timestamp)),
            ("source_node", Some(MetaField::SourceNode)),
            ("correlation_id", Some(MetaField::CorrelationId)),
            ("chain_depth", Some(MetaField::ChainDepth)),
            ("tags.my_key", Some(MetaField::Tag("my_key".to_string()))),
            ("invalid", None),
            ("tags.", None),
            ("tags", None),
        ];
        for (input, want) in cases {
            assert_eq!(MetaField::parse(input), want);
        }
    }

    #[test]
    fn test_meta_field_evaluate() {
        let msg = empty_msg().with_tag("my_tag", "tag_value");

        let id = MetaField::Id.evaluate(&msg);
        assert_eq!(id.as_str().unwrap().len(), 36);

        let timestamp = MetaField::Timestamp.evaluate(&msg);
        assert!(timestamp.as_i64().unwrap() > 0);

        let node = MetaField::SourceNode.evaluate(&msg);
        assert_eq!(node.as_str().unwrap(), "test_source");

        assert_eq!(MetaField::CorrelationId.evaluate(&msg), Value::Null);
        assert_eq!(MetaField::ChainDepth.evaluate(&msg), json!(0));

        let present = MetaField::Tag("my_tag".to_string()).evaluate(&msg);
        assert_eq!(present, json!("tag_value"));

        let absent = MetaField::Tag("missing".to_string()).evaluate(&msg);
        assert_eq!(absent, Value::Null);
    }

    #[test]
    fn test_value_source_meta_field() {
        let node_source = ValueSource::compile(Some("$META.source_node"), None).unwrap();
        assert_eq!(node_source.resolve(&empty_msg()), json!("test_source"));

        let tag_source = ValueSource::compile(Some("$META.tags.my_tag"), None).unwrap();
        let tagged = empty_msg().with_tag("my_tag", "tag_value");
        assert_eq!(tag_source.resolve(&tagged), json!("tag_value"));

        let untagged = empty_msg();
        assert_eq!(tag_source.resolve(&untagged), Value::Null);
    }

    #[test]
    fn test_meta_field_in_template() {
        let compiled =
            ValueSource::compile(Some("Source: {{ $META.source_node }}"), None).unwrap();
        assert_eq!(compiled.resolve(&empty_msg()), json!("Source: test_source"));
    }

    #[test]
    fn test_env_var_with_default() {
        let compiled =
            ValueSource::compile(Some("$ENV:PIPEFLOW_TEST_MISSING:-default_val"), None).unwrap();
        assert_eq!(compiled.resolve(&empty_msg()), json!("default_val"));
    }

    #[test]
    fn test_env_var_existing() {
        let compiled = ValueSource::compile(Some("$ENV:PATH"), None).unwrap();
        let resolved = compiled.resolve(&empty_msg());
        assert!(!resolved.as_str().unwrap().is_empty());
    }

    #[test]
    fn test_env_var_missing_no_default() {
        let outcome = ValueSource::compile(Some("$ENV:PIPEFLOW_DEFINITELY_NOT_SET_12345"), None);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("PIPEFLOW_DEFINITELY_NOT_SET_12345"));
    }

    #[test]
    fn test_env_var_in_template() {
        let compiled =
            ValueSource::compile(Some("Env: {{ $ENV:PIPEFLOW_TEST_TPL:-fallback }}"), None)
                .unwrap();
        assert_eq!(compiled.resolve(&empty_msg()), json!("Env: fallback"));
    }

    #[test]
    fn test_should_skip_null() {
        let literal = ValueSource::compile(None, Some(&json!("hello"))).unwrap();
        assert!(!literal.should_skip_null());

        let cases = [
            ("$UUID", false),
            ("$ENV:TEST_SKIP:-default", false),
            ("$.missing", true),
            ("Hello {{ $.name }}", true),
            ("$META.source_node", false),
            ("$META.correlation_id", true),
            ("$META.tags.key", true),
        ];
        for (expr, want) in cases {
            let compiled = ValueSource::compile(Some(expr), None).unwrap();
            assert_eq!(compiled.should_skip_null(), want, "expression: {expr}");
        }
    }
}