pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
//! JSONPath-like helpers for `serde_json::Value`.
//!
//! This is intentionally **not** full JSONPath. It supports a small, predictable subset:
//! - Root `$` or `$.` prefix
//! - Dot-separated object keys: `$.data.user`
//! - Bracket notation for keys with special chars: `$['crypto:price:BTC']`
//! - Array index per segment: `items[0]` or `[0]`
//!
//! # Pre-compiled Paths
//!
//! For performance-critical code paths (like transform pipelines), use [`CompiledPath`]
//! to parse the path once and reuse it for every message. This avoids repeated string
//! parsing and provides ~4x speedup over runtime parsing.

use serde_json::Value;

use crate::error::{Error, Result};

/// Coerce `value` into a JSON object and hand back its map.
///
/// An existing object is returned untouched. Any other value (including
/// `null`) is discarded and replaced with an empty object first.
fn ensure_object(value: &mut Value) -> &mut serde_json::Map<String, Value> {
    if !matches!(value, Value::Object(_)) {
        *value = Value::Object(serde_json::Map::new());
    }
    match value {
        Value::Object(map) => map,
        // The branch above guarantees the object variant.
        _ => unreachable!("value was replaced with an object above"),
    }
}

/// Coerce `value` into a JSON array holding at least `min_len` elements.
///
/// A non-array value is discarded and replaced with an empty array. Missing
/// slots are then filled with `null`; an array that is already long enough is
/// left as it is (it is never truncated).
fn ensure_array(value: &mut Value, min_len: usize) -> &mut Vec<Value> {
    if !matches!(value, Value::Array(_)) {
        *value = Value::Array(Vec::new());
    }
    match value {
        Value::Array(items) => {
            // Only grow: `resize` would shrink a longer array.
            if items.len() < min_len {
                items.resize(min_len, Value::Null);
            }
            items
        }
        // The branch above guarantees the array variant.
        _ => unreachable!("value was replaced with an array above"),
    }
}

// =============================================================================
// Compiled Path (Pre-parsed for efficiency)
// =============================================================================

/// A single segment in a compiled JSONPath.
///
/// Segments are applied left to right; each one steps one level deeper into
/// the JSON value (two levels for [`Segment::KeyIndex`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Segment {
    /// Object key access: `.field`, or a quoted bracket key such as `['field']`
    Key(String),
    /// Array index access written on its own: `[0]`
    Index(usize),
    /// Combined key + index: `field[0]` (look up `field`, then element `0` of it)
    KeyIndex(String, usize),
}

/// Pre-compiled JSONPath for efficient repeated access.
///
/// Holds the parsed form of a path so that reading or writing through it does
/// not have to re-parse the path string each time.
#[derive(Debug, Clone)]
pub struct CompiledPath {
    /// Original path string (for logging/debugging), stored with surrounding
    /// whitespace trimmed
    raw: String,
    /// Pre-parsed segments in traversal order; an empty list addresses the
    /// root value itself
    segments: Vec<Segment>,
}

impl CompiledPath {
    /// Compile a JSONPath-like expression into a reusable accessor.
    pub fn compile(path: &str) -> Result<Self> {
        let path = path.trim();
        let raw = path.to_string();

        if path.is_empty() || path == "$" {
            return Ok(Self {
                raw,
                segments: vec![],
            });
        }

        let mut segments = Vec::new();
        let chars: Vec<char> = path.chars().collect();
        let mut i = 0;

        // Skip leading $
        if i < chars.len() && chars[i] == '$' {
            i += 1;
        }

        while i < chars.len() {
            let c = chars[i];

            if c == '.' {
                i += 1; // Skip dot
                if i >= chars.len() {
                    break;
                }
            } else if c == '[' {
                // Start of bracket notation
                i += 1;
                let mut content = String::new();
                let mut is_quote = false;
                let mut quote_char = '\0';

                if i < chars.len() && (chars[i] == '\'' || chars[i] == '"') {
                    is_quote = true;
                    quote_char = chars[i];
                    i += 1;
                }

                while i < chars.len() {
                    let curr = chars[i];
                    if is_quote {
                        if curr == quote_char {
                            i += 1; // Skip closing quote
                            break;
                        }
                    } else if curr == ']' {
                        break;
                    }
                    content.push(curr);
                    i += 1;
                }

                if i < chars.len() && chars[i] == ']' {
                    i += 1; // Skip closing bracket
                }

                if is_quote {
                    // Key access: ['key']
                    segments.push(Segment::Key(content));
                } else {
                    // Array index: [0]
                    let index: usize = content.parse().map_err(|_| {
                        Error::config(format!("Invalid array index in path: [{}]", content))
                    })?;
                    // Check if previous segment was a Key, merge if needed?
                    // Actually, our Segment enum supports KeyIndex, but typical JSONPath
                    // usually treats `field[0]` as `field` then `[0]`.
                    // The existing KeyIndex is an optimization for `key[index]`.
                    // But here we are parsing token by token.
                    // If the PREVIOUS segment was a Key, we can upgrade it to KeyIndex?
                    // For simplicity, let's treat it as separate Index segment for now,
                    // unless we want to maintain behavior of `field[0]`.
                    // The previous impl handled `field[0]` by splitting on brackets.

                    segments.push(Segment::Index(index));
                }
            } else {
                // Regular key
                let mut key = String::new();
                while i < chars.len() {
                    let curr = chars[i];
                    if curr == '.' || curr == '[' {
                        break;
                    }
                    key.push(curr);
                    i += 1;
                }

                // Check if we immediately see a bracket for array index
                if i < chars.len() && chars[i] == '[' {
                    // Check if it's an index or quoted key (which would be weird: key['key'])
                    // Assuming standard property[index] syntax
                    let next_char = if i + 1 < chars.len() {
                        chars[i + 1]
                    } else {
                        '\0'
                    };
                    if next_char.is_ascii_digit() {
                        // Parse index
                        i += 1; // Skip [
                        let mut idx_str = String::new();
                        while i < chars.len() && chars[i] != ']' {
                            idx_str.push(chars[i]);
                            i += 1;
                        }
                        if i < chars.len() {
                            i += 1;
                        } // Skip ]

                        let index: usize = idx_str.parse().map_err(|_| {
                            Error::config(format!(
                                "Invalid array index in path: {}[{}]",
                                key, idx_str
                            ))
                        })?;
                        segments.push(Segment::KeyIndex(key, index));
                        continue;
                    }
                }

                segments.push(Segment::Key(key));
            }
        }

        Ok(Self { raw, segments })
    }

    /// Get the original path string
    #[allow(dead_code)]
    pub fn raw(&self) -> &str {
        &self.raw
    }

    /// Extract a value from JSON using this pre-compiled path.
    pub fn extract<'a>(&self, value: &'a Value) -> Option<&'a Value> {
        let mut current = value;

        for segment in &self.segments {
            current = match segment {
                Segment::Key(key) => current.get(key)?,
                Segment::Index(idx) => current.get(idx)?,
                Segment::KeyIndex(key, idx) => current.get(key)?.get(idx)?,
            };
        }

        Some(current)
    }

    /// Set a value in JSON at this path, creating intermediate objects/arrays as needed.
    pub fn set(&self, target: &mut Value, value: Value) {
        if self.segments.is_empty() {
            *target = value;
            return;
        }

        let mut current = target;
        let last_idx = self.segments.len() - 1;

        for (i, segment) in self.segments.iter().enumerate() {
            let is_last = i == last_idx;

            match segment {
                Segment::Key(key) => {
                    let obj = ensure_object(current);
                    if is_last {
                        obj.insert(key.clone(), value);
                        return;
                    }
                    obj.entry(key.clone())
                        .or_insert(Value::Object(serde_json::Map::new()));
                    current = obj.get_mut(key).unwrap();
                }
                Segment::Index(idx) => {
                    let arr = ensure_array(current, idx + 1);
                    if is_last {
                        arr[*idx] = value;
                        return;
                    }
                    current = &mut arr[*idx];
                }
                Segment::KeyIndex(key, idx) => {
                    // Navigate to key first
                    let obj = ensure_object(current);
                    obj.entry(key.clone()).or_insert(Value::Null);
                    current = obj.get_mut(key).unwrap();

                    // Then navigate to array index
                    let arr = ensure_array(current, idx + 1);
                    if is_last {
                        arr[*idx] = value;
                        return;
                    }
                    current = &mut arr[*idx];
                }
            }
        }
    }
}

impl std::fmt::Display for CompiledPath {
    /// Writes the stored path text verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.raw)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // ========== CompiledPath::compile() tests ==========

    #[test]
    fn test_compile_root() {
        let path = CompiledPath::compile("$").unwrap();
        assert!(path.segments.is_empty());
    }

    #[test]
    fn test_compile_simple_field() {
        let path = CompiledPath::compile("$.name").unwrap();
        assert_eq!(path.segments, vec![Segment::Key("name".into())]);
    }

    #[test]
    fn test_compile_bracket_field() {
        let path = CompiledPath::compile("$['name']").unwrap();
        assert_eq!(path.segments, vec![Segment::Key("name".into())]);
    }

    #[test]
    fn test_compile_bracket_special_char() {
        let path = CompiledPath::compile("$['crypto:price:BTC']").unwrap();
        assert_eq!(path.segments, vec![Segment::Key("crypto:price:BTC".into())]);
    }

    #[test]
    fn test_compile_nested_fields() {
        let path = CompiledPath::compile("$.user.profile.email").unwrap();
        assert_eq!(
            path.segments,
            vec![
                Segment::Key("user".into()),
                Segment::Key("profile".into()),
                Segment::Key("email".into()),
            ]
        );
    }

    #[test]
    fn test_compile_mixed_notation() {
        let path = CompiledPath::compile("$.data['crypto:price:BTC'].value").unwrap();
        assert_eq!(
            path.segments,
            vec![
                Segment::Key("data".into()),
                Segment::Key("crypto:price:BTC".into()),
                Segment::Key("value".into()),
            ]
        );
    }

    #[test]
    fn test_compile_array_index() {
        let path = CompiledPath::compile("$.items[0]").unwrap();
        assert_eq!(path.segments, vec![Segment::KeyIndex("items".into(), 0)]);
    }

    #[test]
    fn test_compile_nested_with_array() {
        let path = CompiledPath::compile("$.data.users[2].name").unwrap();
        assert_eq!(
            path.segments,
            vec![
                Segment::Key("data".into()),
                Segment::KeyIndex("users".into(), 2),
                Segment::Key("name".into()),
            ]
        );
    }

    #[test]
    fn test_compile_bracket_array() {
        let path = CompiledPath::compile("$[0]").unwrap();
        assert_eq!(path.segments, vec![Segment::Index(0)]);
    }

    #[test]
    fn test_compile_invalid_index_error() {
        let result = CompiledPath::compile("$.items[abc]");
        assert!(result.is_err());
    }

    #[test]
    fn test_compile_trims_and_keeps_raw() {
        // The stored text is the trimmed input, and Display echoes it back.
        let path = CompiledPath::compile("  $.name  ").unwrap();
        assert_eq!(path.raw(), "$.name");
        assert_eq!(path.to_string(), "$.name");
        assert_eq!(path.segments, vec![Segment::Key("name".into())]);
    }

    // ========== CompiledPath::extract() tests ==========

    #[test]
    fn test_compiled_extract_special() {
        let path = CompiledPath::compile("$['crypto:price:BTC']").unwrap();
        let payload = json!({"crypto:price:BTC": 100});
        assert_eq!(path.extract(&payload), Some(&json!(100)));
    }

    #[test]
    fn test_compiled_extract_mixed() {
        let path = CompiledPath::compile("$.data['complex.key'].val").unwrap();
        let payload = json!({
            "data": {
                "complex.key": {
                    "val": 42
                }
            }
        });
        assert_eq!(path.extract(&payload), Some(&json!(42)));
    }

    #[test]
    fn test_compiled_extract_root() {
        let path = CompiledPath::compile("$").unwrap();
        let payload = json!({"a": 1});
        assert_eq!(path.extract(&payload), Some(&payload));
    }

    #[test]
    fn test_compiled_extract_array_index() {
        let path = CompiledPath::compile("$.items[1].name").unwrap();
        let payload = json!({"items": [{"name": "a"}, {"name": "b"}]});
        assert_eq!(path.extract(&payload), Some(&json!("b")));
    }

    #[test]
    fn test_compiled_extract_missing_returns_none() {
        let payload = json!({"items": [1, 2]});

        // Index past the end of the array.
        let path = CompiledPath::compile("$.items[5]").unwrap();
        assert_eq!(path.extract(&payload), None);

        // Key that does not exist.
        let path = CompiledPath::compile("$.missing.field").unwrap();
        assert_eq!(path.extract(&payload), None);
    }

    // ========== CompiledPath::set() tests ==========

    #[test]
    fn test_set_root_replaces_target() {
        let path = CompiledPath::compile("$").unwrap();
        let mut target = json!({"old": true});
        path.set(&mut target, json!(7));
        assert_eq!(target, json!(7));
    }

    #[test]
    fn test_set_creates_nested_objects() {
        let path = CompiledPath::compile("$.user.profile.email").unwrap();
        let mut target = json!({});
        path.set(&mut target, json!("a@b.c"));
        assert_eq!(target, json!({"user": {"profile": {"email": "a@b.c"}}}));
    }

    #[test]
    fn test_set_converts_non_object_target() {
        // A null target is replaced by an object before the key is inserted.
        let path = CompiledPath::compile("$.a").unwrap();
        let mut target = Value::Null;
        path.set(&mut target, json!(1));
        assert_eq!(target, json!({"a": 1}));
    }

    #[test]
    fn test_set_pads_arrays_with_null() {
        let path = CompiledPath::compile("$.items[2]").unwrap();
        let mut target = json!({});
        path.set(&mut target, json!("x"));
        assert_eq!(target, json!({"items": [null, null, "x"]}));
    }

    #[test]
    fn test_set_preserves_siblings() {
        let path = CompiledPath::compile("$.data.items[0].name").unwrap();
        let mut target = json!({"data": {"keep": 1, "items": [{"id": 9}, "tail"]}});
        path.set(&mut target, json!("n"));
        assert_eq!(
            target,
            json!({"data": {"keep": 1, "items": [{"id": 9, "name": "n"}, "tail"]}})
        );
    }

    #[test]
    fn test_set_then_extract_round_trip() {
        let path = CompiledPath::compile("$.a['b.c'][1]").unwrap();
        let mut target = json!({});
        path.set(&mut target, json!(true));
        assert_eq!(path.extract(&target), Some(&json!(true)));
    }
}