jarq 0.8.2

An interactive jq-like JSON query tool with a TUI
Documentation
use crossbeam_channel::{Receiver, Sender, unbounded};
use simd_json::OwnedValue as Value;
use std::sync::Arc;
use std::thread::{self, JoinHandle};

use crate::error::FilterError;
use crate::filter;

/// Maximum number of values to cache. Caching large intermediate results
/// doubles memory usage, so we skip caching when results exceed this threshold.
const CACHE_SIZE_LIMIT: usize = 1_000_000;

pub struct EvalRequest {
    pub id: u64,
    pub filter_text: String,
    pub inputs: Arc<Vec<Value>>,
    pub slurp_mode: bool,
}

pub struct EvalResult {
    pub id: u64,
    pub result: Result<Vec<Value>, FilterError>,
}

/// Cache for intermediate pipe results
struct PipeCache {
    filter_text: String,
    values: Arc<Vec<Value>>,
    slurp_mode: bool,
}

impl PipeCache {
    /// Check if new_filter extends cached filter at a pipe boundary.
    /// Returns the suffix to evaluate if cache can be used, None otherwise.
    fn get_suffix<'a>(&self, new_filter: &'a str, new_slurp: bool) -> Option<&'a str> {
        // Input semantics changed - can't use cache
        if new_slurp != self.slurp_mode {
            return None;
        }

        // Normalize whitespace for comparison
        let cached_trimmed = self.filter_text.trim();
        let new_trimmed = new_filter.trim();

        // Not a prefix match
        if !new_trimmed.starts_with(cached_trimmed) {
            return None;
        }

        let remainder = &new_trimmed[cached_trimmed.len()..];
        let trimmed = remainder.trim_start();

        if trimmed.is_empty() {
            // Exact match - use cache as-is
            return Some("");
        }

        if trimmed.starts_with('|') {
            // Valid pipe extension - return the part after the pipe
            return Some(trimmed.strip_prefix('|').unwrap().trim_start());
        }

        // Not at pipe boundary (e.g., ".foo" -> ".foobar")
        None
    }
}

pub struct Worker {
    request_tx: Option<Sender<EvalRequest>>,
    result_rx: Receiver<EvalResult>,
    handle: Option<JoinHandle<()>>,
}

impl Worker {
    pub fn spawn() -> Self {
        let (request_tx, request_rx) = unbounded::<EvalRequest>();
        let (result_tx, result_rx) = unbounded::<EvalResult>();

        let handle = thread::spawn(move || {
            let mut cache: Option<PipeCache> = None;

            while let Ok(mut req) = request_rx.recv() {
                // Drain any newer requests, keeping only the latest.
                // This replaces the old TOCTOU-prone is_empty() check.
                while let Ok(newer) = request_rx.try_recv() {
                    req = newer;
                }

                // Check for cache hit
                // error_offset tracks where suffix starts in full filter (for error position adjustment)
                let (base_values, filter_to_eval, error_offset) = match &cache {
                    Some(c) => match c.get_suffix(&req.filter_text, req.slurp_mode) {
                        Some("") => {
                            // Exact match - return cached values directly
                            let _ = result_tx.send(EvalResult {
                                id: req.id,
                                result: Ok((*c.values).clone()),
                            });
                            continue;
                        }
                        Some(suffix) => {
                            // Pipe extension - evaluate only the suffix on cached values
                            // Calculate offset: find where suffix starts in full filter
                            let offset = req.filter_text.len() - suffix.len();
                            (Arc::clone(&c.values), suffix.to_string(), offset)
                        }
                        None => {
                            // No cache hit - evaluate full filter
                            (prepare_inputs(&req), req.filter_text.clone(), 0)
                        }
                    },
                    None => {
                        // No cache - evaluate full filter
                        (prepare_inputs(&req), req.filter_text.clone(), 0)
                    }
                };

                // Evaluate the filter (either full or just the suffix)
                let result = filter::evaluate_all(&filter_to_eval, &base_values).map_err(|e| {
                    // Adjust error position to account for suffix offset
                    if error_offset > 0 {
                        match e {
                            FilterError::Eval(mut eval_err) => {
                                let pos = eval_err.position();
                                eval_err.set_position(pos + error_offset);
                                FilterError::Eval(eval_err)
                            }
                            FilterError::Parse(mut parse_err) => {
                                parse_err.position += error_offset;
                                FilterError::Parse(parse_err)
                            }
                        }
                    } else {
                        e
                    }
                });

                // Update cache on success (skip if too large to avoid OOM)
                if let Ok(ref values) = result {
                    if values.len() <= CACHE_SIZE_LIMIT {
                        cache = Some(PipeCache {
                            filter_text: req.filter_text.clone(),
                            values: Arc::new(values.clone()),
                            slurp_mode: req.slurp_mode,
                        });
                    } else {
                        cache = None;
                    }
                }

                // Skip sending if newer requests arrived during evaluation.
                // The App side also rejects stale results, but this avoids
                // unnecessary channel traffic.
                if !request_rx.is_empty() {
                    continue;
                }

                let _ = result_tx.send(EvalResult { id: req.id, result });
            }
        });

        Worker {
            request_tx: Some(request_tx),
            result_rx,
            handle: Some(handle),
        }
    }

    pub fn send(&self, req: EvalRequest) {
        if let Some(tx) = &self.request_tx {
            let _ = tx.send(req);
        }
    }

    pub fn try_recv(&self) -> Option<EvalResult> {
        self.result_rx.try_recv().ok()
    }
}

impl Drop for Worker {
    fn drop(&mut self) {
        // Drop the sender to signal the worker to exit
        self.request_tx.take();
        // Wait for the worker thread to finish
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}

fn prepare_inputs(req: &EvalRequest) -> Arc<Vec<Value>> {
    if req.slurp_mode {
        Arc::new(vec![Value::Array(Box::new((*req.inputs).clone()))])
    } else {
        Arc::clone(&req.inputs)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use simd_json::json;
    use std::time::Duration;

    fn make_cache(filter_text: &str, slurp_mode: bool) -> PipeCache {
        PipeCache {
            filter_text: filter_text.to_string(),
            values: Arc::new(vec![]),
            slurp_mode,
        }
    }

    // =========================================================================
    // PipeCache::get_suffix tests
    // =========================================================================

    #[test]
    fn test_pipe_cache_exact_match() {
        let cache = make_cache(".foo", false);
        assert_eq!(cache.get_suffix(".foo", false), Some(""));
    }

    #[test]
    fn test_pipe_cache_pipe_extension() {
        let cache = make_cache(".foo", false);
        assert_eq!(cache.get_suffix(".foo | .bar", false), Some(".bar"));
    }

    #[test]
    fn test_pipe_cache_multiple_pipes() {
        let cache = make_cache(".foo | .bar", false);
        assert_eq!(cache.get_suffix(".foo | .bar | .baz", false), Some(".baz"));
    }

    #[test]
    fn test_pipe_cache_not_at_pipe_boundary() {
        let cache = make_cache(".foo", false);
        // ".foobar" is not a pipe extension of ".foo"
        assert_eq!(cache.get_suffix(".foobar", false), None);
    }

    #[test]
    fn test_pipe_cache_slurp_mode_mismatch() {
        let cache = make_cache(".foo", false);
        // Cache was created without slurp, but new request has slurp
        assert_eq!(cache.get_suffix(".foo | .bar", true), None);
    }

    #[test]
    fn test_pipe_cache_slurp_mode_match() {
        let cache = make_cache(".foo", true);
        assert_eq!(cache.get_suffix(".foo | .bar", true), Some(".bar"));
    }

    #[test]
    fn test_pipe_cache_whitespace_normalization() {
        let cache = make_cache("  .foo  ", false);
        // Extra whitespace in new filter should still match
        assert_eq!(cache.get_suffix(".foo | .bar", false), Some(".bar"));
    }

    #[test]
    fn test_pipe_cache_whitespace_around_pipe() {
        let cache = make_cache(".foo", false);
        // Various whitespace around the pipe
        assert_eq!(cache.get_suffix(".foo|.bar", false), Some(".bar"));
        assert_eq!(cache.get_suffix(".foo |.bar", false), Some(".bar"));
        assert_eq!(cache.get_suffix(".foo| .bar", false), Some(".bar"));
        assert_eq!(cache.get_suffix(".foo  |  .bar", false), Some(".bar"));
    }

    #[test]
    fn test_pipe_cache_no_prefix_match() {
        let cache = make_cache(".foo", false);
        assert_eq!(cache.get_suffix(".bar", false), None);
    }

    #[test]
    fn test_pipe_cache_shorter_filter() {
        let cache = make_cache(".foo | .bar", false);
        // New filter is shorter - not a valid extension
        assert_eq!(cache.get_suffix(".foo", false), None);
    }

    // =========================================================================
    // Integration test
    // =========================================================================

    /// Test that error positions are correctly adjusted when using pipe cache.
    /// When a cached prefix is reused and only the suffix is evaluated,
    /// error positions must be offset to point to the correct location
    /// in the full filter text.
    #[test]
    fn test_pipe_cache_error_position_adjustment() {
        let worker = Worker::spawn();

        // Input data: array of objects with "name" field
        let inputs = Arc::new(vec![json!({"name": "alice"}), json!({"name": "bob"})]);

        // First request: evaluate a prefix that succeeds and gets cached
        // This collects names into an array: ["alice", "bob"]
        let prefix_filter = "[.[].name]".to_string();
        worker.send(EvalRequest {
            id: 1,
            filter_text: prefix_filter.clone(),
            inputs: Arc::clone(&inputs),
            slurp_mode: false,
        });

        // Wait for result
        let mut result = None;
        for _ in 0..100 {
            if let Some(r) = worker.try_recv() {
                result = Some(r);
                break;
            }
            thread::sleep(Duration::from_millis(10));
        }
        assert!(result.is_some(), "Should receive result for prefix");
        assert!(result.unwrap().result.is_ok(), "Prefix should succeed");

        // Second request: extend with a suffix that will error
        // .[] iterates the array, {name: .} creates objects,
        // sort_by(.name) fails because it receives objects one at a time, not an array
        let full_filter = "[.[].name] | .[] | {name: .} | sort_by(.name)".to_string();
        worker.send(EvalRequest {
            id: 2,
            filter_text: full_filter.clone(),
            inputs: Arc::clone(&inputs),
            slurp_mode: false,
        });

        // Wait for result
        let mut result = None;
        for _ in 0..100 {
            if let Some(r) = worker.try_recv() {
                result = Some(r);
                break;
            }
            thread::sleep(Duration::from_millis(10));
        }
        assert!(result.is_some(), "Should receive result for full filter");

        let err = result
            .unwrap()
            .result
            .expect_err("Full filter should error");
        let pos = match &err {
            FilterError::Eval(e) => e.position(),
            FilterError::Parse(e) => e.position,
        };

        // sort_by(.name) starts at position 31 in the full filter
        // The error position should point there, not at 0
        let error_text = &full_filter[pos..];
        assert!(
            error_text.starts_with("sort_by"),
            "Error position {} should point to 'sort_by', got '{}'",
            pos,
            error_text
        );
    }
}