pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
//! Hash step for generating deterministic identifiers.
//!
//! This step computes cryptographic hashes (md5/sha1/sha256) from a value source
//! (JSONPath, template, static value, or built-in variable) and writes the
//! resulting lower-case hex string to the configured target path.
//!
//! Notes:
//! - Hash input is treated as UTF-8 text. If the resolved value is not a string,
//!   its JSON representation (`Value::to_string()`) is hashed.
//! - If the resolved value is `null`, the mapping is skipped (no field is set).

use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::common::message::Message;
use crate::error::{Error, Result};
use crate::transform::json_path::CompiledPath;
use crate::transform::step::Step;
use crate::transform::value::{FieldMapping, ValueSource};

/// Hash algorithm applied by a [`HashMapping`].
///
/// In configuration the variants are written in lowercase: `"md5"`, `"sha1"`,
/// `"sha256"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum HashAlgo {
    /// MD5 — not collision-resistant; offered only for interoperability.
    Md5,
    /// SHA-1 — not collision-resistant; offered only for legacy compatibility.
    Sha1,
    /// SHA-256 — the recommended choice.
    Sha256,
}

/// One hash mapping: the algorithm to use, plus where the input is read from
/// and where the hex digest is written.
///
/// The [`FieldMapping`] fields are flattened, so in configuration they appear
/// next to `algo` rather than under a nested key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HashMapping {
    /// Algorithm used to digest the resolved input.
    pub algo: HashAlgo,
    /// Input source and output path (flattened when (de)serialized).
    #[serde(flatten)]
    pub mapping: FieldMapping,
}

/// Configuration for the `hash` step.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HashStepConfig {
    /// Hash mappings, evaluated in order; [`HashStep::new`] rejects an empty list.
    pub mappings: Vec<HashMapping>,
}

/// A [`HashMapping`] whose value source and target path have been compiled.
#[derive(Debug, Clone)]
struct CompiledHashMapping {
    /// Algorithm applied to the resolved input.
    algo: HashAlgo,
    /// Compiled source the hash input is resolved from.
    source: ValueSource,
    /// Compiled path the hex digest is written to.
    to: CompiledPath,
}

/// Pipeline step that hashes resolved values and writes the lowercase hex
/// digests into the message payload.
///
/// Build it with [`HashStep::new`], which compiles the configuration once.
pub struct HashStep {
    /// Compiled mappings, evaluated in configuration order.
    mappings: Vec<CompiledHashMapping>,
}

impl HashStep {
    /// Create a new hash step from configuration.
    ///
    /// Compiles value sources and target paths once for efficient per-message execution.
    pub fn new(config: HashStepConfig) -> Result<Self> {
        if config.mappings.is_empty() {
            return Err(Error::config("Hash step requires at least one mapping"));
        }

        let mut mappings = Vec::with_capacity(config.mappings.len());
        for m in &config.mappings {
            let compiled = m
                .mapping
                .compile()
                .map_err(|e| Error::config(format!("Hash mapping to '{}': {}", m.mapping.to, e)))?;

            mappings.push(CompiledHashMapping {
                algo: m.algo,
                source: compiled.source,
                to: compiled.to,
            });
        }

        Ok(Self { mappings })
    }

    fn hash_hex(algo: HashAlgo, bytes: &[u8]) -> String {
        use digest::Digest;

        match algo {
            HashAlgo::Md5 => hex::encode(md5::Md5::digest(bytes)),
            HashAlgo::Sha1 => hex::encode(sha1::Sha1::digest(bytes)),
            HashAlgo::Sha256 => hex::encode(sha2::Sha256::digest(bytes)),
        }
    }
}

impl Step for HashStep {
    fn step_type(&self) -> &'static str {
        "hash"
    }

    /// Compute every configured hash and write the hex digests into the payload.
    ///
    /// All inputs are resolved against the message as it arrived (snapshot
    /// semantics). A mapping therefore never sees a digest written by another
    /// mapping in the same step. A mapping whose input resolves to `null` is
    /// skipped and leaves its target untouched.
    ///
    /// Always returns `Ok(Some(msg))`; this step never drops messages.
    fn process(&self, mut msg: Message) -> Result<Option<Message>> {
        // Snapshot semantics: resolve all inputs from the original message
        // before writing any output.
        let mut updates: Vec<(&CompiledPath, Value)> = Vec::with_capacity(self.mappings.len());

        for mapping in &self.mappings {
            let value = mapping.source.resolve(&msg);

            if value.is_null() {
                tracing::debug!(to = %mapping.to, "Hash input resolved to null, skipping mapping");
                continue;
            }

            // Strings are hashed from their UTF-8 bytes directly (borrowed, no
            // copy). Any other JSON value is hashed via its serialized text.
            // NOTE(review): for objects, key order in that text depends on how
            // serde_json's map is configured — confirm this is acceptable for
            // deterministic IDs.
            let digest = match &value {
                Value::String(s) => Self::hash_hex(mapping.algo, s.as_bytes()),
                other => Self::hash_hex(mapping.algo, other.to_string().as_bytes()),
            };
            updates.push((&mapping.to, Value::String(digest)));
        }

        for (to, value) in updates {
            to.set(&mut msg.payload, value);
        }

        Ok(Some(msg))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    fn make_msg(payload: Value) -> Message {
        Message::new("test", payload)
    }

    /// Build a `HashMapping` that reads from `from` and writes to `to`.
    fn mapping(algo: HashAlgo, from: &str, to: &str) -> HashMapping {
        HashMapping {
            algo,
            mapping: FieldMapping {
                from: Some(from.into()),
                value: None,
                to: to.into(),
            },
        }
    }

    /// Build a step with one mapping from `from` to `$.id`.
    fn single_mapping_step(algo: HashAlgo, from: &str) -> HashStep {
        let cfg = HashStepConfig {
            mappings: vec![mapping(algo, from, "$.id")],
        };
        HashStep::new(cfg).unwrap()
    }

    #[test]
    fn test_hash_md5() {
        let step = single_mapping_step(HashAlgo::Md5, "$.value");

        let msg = make_msg(json!({"value": "hello"}));
        let out = step.process(msg).unwrap().unwrap();
        assert_eq!(out.payload["id"], json!("5d41402abc4b2a76b9719d911017c592"));
    }

    #[test]
    fn test_hash_sha1() {
        let step = single_mapping_step(HashAlgo::Sha1, "$.value");

        let msg = make_msg(json!({"value": "hello"}));
        let out = step.process(msg).unwrap().unwrap();
        assert_eq!(
            out.payload["id"],
            json!("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d")
        );
    }

    #[test]
    fn test_hash_sha256() {
        let step = single_mapping_step(HashAlgo::Sha256, "$.value");

        let msg = make_msg(json!({"value": "hello"}));
        let out = step.process(msg).unwrap().unwrap();
        assert_eq!(
            out.payload["id"],
            json!("2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824")
        );
    }

    #[test]
    fn test_hash_template_input() {
        let step = single_mapping_step(HashAlgo::Sha256, "{{ $.a }}|{{ $.b }}");

        let msg = make_msg(json!({"a": "x", "b": "y"}));
        let out = step.process(msg).unwrap().unwrap();
        assert_eq!(
            out.payload["id"],
            json!("791a886d455a84781e210fdcef62b8be992cd8d73a79e8d25ce62656a0e41c15")
        );
    }

    #[test]
    fn test_hash_null_input_skips_mapping() {
        let step = single_mapping_step(HashAlgo::Sha256, "$.missing");

        let msg = make_msg(json!({"value": "hello"}));
        let out = step.process(msg).unwrap().unwrap();
        // A skipped mapping must not create the target field at all.
        assert!(out.payload.get("id").is_none());
    }

    #[test]
    fn test_hash_non_string_input_hashes_json_text() {
        let step = single_mapping_step(HashAlgo::Sha256, "$.value");

        let from_number = step
            .process(make_msg(json!({"value": 42})))
            .unwrap()
            .unwrap();
        let from_string = step
            .process(make_msg(json!({"value": "42"})))
            .unwrap()
            .unwrap();
        assert!(from_number.payload["id"].is_string());
        assert_eq!(from_number.payload["id"], from_string.payload["id"]);
    }

    #[test]
    fn test_hash_inputs_resolved_from_original_message() {
        let cfg = HashStepConfig {
            mappings: vec![
                mapping(HashAlgo::Md5, "$.value", "$.id"),
                // Reads the first mapping's target; under snapshot semantics
                // it is still absent, so this mapping must be skipped.
                mapping(HashAlgo::Md5, "$.id", "$.id2"),
            ],
        };
        let step = HashStep::new(cfg).unwrap();

        let out = step
            .process(make_msg(json!({"value": "hello"})))
            .unwrap()
            .unwrap();
        assert_eq!(out.payload["id"], json!("5d41402abc4b2a76b9719d911017c592"));
        assert!(out.payload.get("id2").is_none());
    }

    #[test]
    fn test_hash_empty_config_is_rejected() {
        let cfg = HashStepConfig { mappings: vec![] };
        assert!(HashStep::new(cfg).is_err());
    }
}