// jarq 0.7.5
//
// An interactive jq-like JSON query tool with a TUI.
// Documentation
use crossbeam_channel::{Receiver, Sender, unbounded};
use simd_json::OwnedValue as Value;
use std::sync::Arc;
use std::thread::{self, JoinHandle};

use crate::error::FilterError;
use crate::filter;

/// A filter evaluation job submitted to the [`Worker`] thread.
pub struct EvalRequest {
    /// Caller-chosen identifier; copied unchanged into the matching [`EvalResult`].
    pub id: u64,
    /// Source text of the filter to evaluate.
    pub filter_text: String,
    /// Input values the filter runs against (shared, not copied per request).
    pub inputs: Arc<Vec<Value>>,
    /// When `true`, all inputs are wrapped into one array before evaluation.
    pub slurp_mode: bool,
}

/// Outcome of one [`EvalRequest`], sent back by the [`Worker`] thread.
pub struct EvalResult {
    /// The `id` of the request this result belongs to.
    pub id: u64,
    /// Output values on success, or the filter error on failure.
    pub result: Result<Vec<Value>, FilterError>,
}

/// Cache for intermediate pipe results.
///
/// Holds the output of a filter that evaluated successfully so that a later
/// filter extending it with `| ...` can be evaluated on that output instead of
/// starting again from the raw inputs.
struct PipeCache {
    /// Full text of the filter that produced `values`.
    filter_text: String,
    /// Output values of `filter_text`.
    values: Arc<Vec<Value>>,
    /// Slurp setting that was in effect when `values` was computed.
    slurp_mode: bool,
}

impl PipeCache {
    /// Check if `new_filter` extends the cached filter at a pipe boundary.
    ///
    /// Both filters are compared with surrounding whitespace trimmed.
    ///
    /// Returns:
    /// - `Some("")` when `new_filter` is the cached filter itself, so the
    ///   cached values can be returned as-is;
    /// - `Some(suffix)` when `new_filter` is `<cached> | <suffix>`; `suffix` is
    ///   a non-empty sub-slice of `new_filter` that ends where the trimmed
    ///   `new_filter` ends;
    /// - `None` when the cache cannot be used: the slurp mode differs, the
    ///   cached text is not a prefix, the continuation is not a pipe (e.g.
    ///   `.foo` -> `.foobar`, or the update operator `.foo |= 1`), or nothing
    ///   follows the pipe (`.foo |`). In the last case the full filter must be
    ///   evaluated so that the incomplete expression is reported instead of
    ///   being mistaken for an exact match.
    ///
    /// NOTE(review): a suffix is evaluated on its own, so a name bound inside
    /// the cached prefix would not be visible to it — confirm whether the
    /// filter language has such bindings.
    fn get_suffix<'a>(&self, new_filter: &'a str, new_slurp: bool) -> Option<&'a str> {
        // Input semantics changed - can't use cache
        if new_slurp != self.slurp_mode {
            return None;
        }

        // Normalize whitespace, then require the cached text as a prefix
        let cached = self.filter_text.trim();
        let remainder = new_filter.trim().strip_prefix(cached)?.trim_start();

        if remainder.is_empty() {
            // Exact match - use cache as-is
            return Some("");
        }

        // Not at a pipe boundary (e.g., ".foo" -> ".foobar") => None
        let after_pipe = remainder.strip_prefix('|')?;

        // "|=" is the update-assignment operator, not a pipe
        if after_pipe.starts_with('=') {
            return None;
        }

        // A dangling pipe has nothing to evaluate; returning "" here would be
        // indistinguishable from an exact match, so fall back to full evaluation
        let suffix = after_pipe.trim_start();
        if suffix.is_empty() {
            return None;
        }

        Some(suffix)
    }
}

/// Handle to a background thread that evaluates filters.
///
/// Requests are submitted with [`Worker::send`] and results are collected with
/// [`Worker::try_recv`].
pub struct Worker {
    /// Sending half of the request channel read by the worker thread.
    request_tx: Sender<EvalRequest>,
    /// Receiving half of the result channel written by the worker thread.
    result_rx: Receiver<EvalResult>,
    /// Join handle of the worker thread; stored but never joined in this file.
    _handle: JoinHandle<()>,
}

impl Worker {
    pub fn spawn() -> Self {
        let (request_tx, request_rx) = unbounded::<EvalRequest>();
        let (result_tx, result_rx) = unbounded::<EvalResult>();

        let handle = thread::spawn(move || {
            let mut cache: Option<PipeCache> = None;

            while let Ok(req) = request_rx.recv() {
                // Skip if newer request waiting (implicit cancellation)
                if !request_rx.is_empty() {
                    continue;
                }

                // Check for cache hit
                // error_offset tracks where suffix starts in full filter (for error position adjustment)
                let (base_values, filter_to_eval, error_offset) = match &cache {
                    Some(c) => match c.get_suffix(&req.filter_text, req.slurp_mode) {
                        Some("") => {
                            // Exact match - return cached values directly
                            let _ = result_tx.send(EvalResult {
                                id: req.id,
                                result: Ok((*c.values).clone()),
                            });
                            continue;
                        }
                        Some(suffix) => {
                            // Pipe extension - evaluate only the suffix on cached values
                            // Calculate offset: find where suffix starts in full filter
                            let offset = req.filter_text.len() - suffix.len();
                            (Arc::clone(&c.values), suffix.to_string(), offset)
                        }
                        None => {
                            // No cache hit - evaluate full filter
                            (prepare_inputs(&req), req.filter_text.clone(), 0)
                        }
                    },
                    None => {
                        // No cache - evaluate full filter
                        (prepare_inputs(&req), req.filter_text.clone(), 0)
                    }
                };

                // Evaluate the filter (either full or just the suffix)
                let result = filter::evaluate_all(&filter_to_eval, &base_values).map_err(|e| {
                    // Adjust error position to account for suffix offset
                    if error_offset > 0 {
                        match e {
                            FilterError::Eval(mut eval_err) => {
                                let pos = eval_err.position();
                                eval_err.set_position(pos + error_offset);
                                FilterError::Eval(eval_err)
                            }
                            FilterError::Parse(mut parse_err) => {
                                parse_err.position += error_offset;
                                FilterError::Parse(parse_err)
                            }
                        }
                    } else {
                        e
                    }
                });

                // Update cache on success
                if let Ok(ref values) = result {
                    cache = Some(PipeCache {
                        filter_text: req.filter_text.clone(),
                        values: Arc::new(values.clone()),
                        slurp_mode: req.slurp_mode,
                    });
                }

                let _ = result_tx.send(EvalResult { id: req.id, result });
            }
        });

        Worker {
            request_tx,
            result_rx,
            _handle: handle,
        }
    }

    pub fn send(&self, req: EvalRequest) {
        let _ = self.request_tx.send(req);
    }

    pub fn try_recv(&self) -> Option<EvalResult> {
        self.result_rx.try_recv().ok()
    }
}

/// Builds the value list a full filter evaluation starts from.
///
/// Without slurp mode the request's inputs are shared as-is. With slurp mode
/// the inputs are copied into one array, which becomes the single input value.
fn prepare_inputs(req: &EvalRequest) -> Arc<Vec<Value>> {
    if !req.slurp_mode {
        return Arc::clone(&req.inputs);
    }

    let slurped = Value::Array(Box::new(req.inputs.to_vec()));
    Arc::new(vec![slurped])
}

#[cfg(test)]
mod tests {
    use super::*;
    use simd_json::json;
    use std::time::Duration;

    /// Polls `worker` for its next result: up to 100 attempts, sleeping 10 ms
    /// after every attempt that finds nothing.
    fn poll_result(worker: &Worker) -> Option<EvalResult> {
        (0..100).find_map(|_| {
            worker.try_recv().or_else(|| {
                thread::sleep(Duration::from_millis(10));
                None
            })
        })
    }

    /// Error positions must refer to the full filter text even when the pipe
    /// cache is used and only the new suffix is actually evaluated.
    #[test]
    fn test_pipe_cache_error_position_adjustment() {
        let worker = Worker::spawn();

        // Two objects, each with a "name" field
        let inputs = Arc::new(vec![json!({"name": "alice"}), json!({"name": "bob"})]);

        // Step 1: a prefix that succeeds (collects the names into one array)
        // and therefore ends up in the cache.
        worker.send(EvalRequest {
            id: 1,
            filter_text: "[.[].name]".to_string(),
            inputs: Arc::clone(&inputs),
            slurp_mode: false,
        });

        let prefix_outcome = poll_result(&worker).expect("Should receive result for prefix");
        assert!(prefix_outcome.result.is_ok(), "Prefix should succeed");

        // Step 2: extend the prefix with a suffix that fails. `.[]` iterates the
        // array, `{name: .}` builds objects, and `sort_by(.name)` then receives
        // those objects one at a time instead of an array.
        let full_filter = "[.[].name] | .[] | {name: .} | sort_by(.name)".to_string();
        worker.send(EvalRequest {
            id: 2,
            filter_text: full_filter.clone(),
            inputs: Arc::clone(&inputs),
            slurp_mode: false,
        });

        let err = poll_result(&worker)
            .expect("Should receive result for full filter")
            .result
            .expect_err("Full filter should error");

        let pos = match &err {
            FilterError::Eval(e) => e.position(),
            FilterError::Parse(e) => e.position,
        };

        // `sort_by(.name)` begins at byte 31 of the full filter; the reported
        // position has to land there rather than at the start of the suffix.
        let error_text = &full_filter[pos..];
        assert!(
            error_text.starts_with("sort_by"),
            "Error position {} should point to 'sort_by', got '{}'",
            pos,
            error_text
        );
    }
}