jarq 0.2.3

An interactive jq-like JSON query tool with a TUI
Documentation
use crossbeam_channel::{unbounded, Receiver, Sender};
use serde_json::Value;
use std::sync::Arc;
use std::thread::{self, JoinHandle};

use crate::error::FilterError;
use crate::filter;

/// A single filter-evaluation job submitted to the [`Worker`] thread.
pub struct EvalRequest {
    /// Caller-chosen identifier; copied verbatim into the matching `EvalResult`.
    pub id: u64,
    /// The filter expression to evaluate.
    pub filter_text: String,
    /// Input documents, shared by reference count so requests are cheap to build.
    pub inputs: Arc<Vec<Value>>,
    /// When `true`, all inputs are wrapped into one array value before
    /// evaluation instead of being passed through as-is.
    pub slurp_mode: bool,
}

/// The outcome of one [`EvalRequest`], sent back from the worker thread.
pub struct EvalResult {
    /// The `id` of the request this result answers.
    pub id: u64,
    /// Output values on success, or the error produced while evaluating the filter.
    pub result: Result<Vec<Value>, FilterError>,
}

/// Cache for intermediate pipe results.
///
/// Holds the output of the most recent successful evaluation so that a later
/// filter which merely appends `| ...` to the cached one can be evaluated
/// against these values instead of the original inputs.
struct PipeCache {
    /// Full text of the filter that produced `values`.
    filter_text: String,
    /// Output of that filter.
    values: Arc<Vec<Value>>,
    /// Slurp setting of the request that produced `values`.
    slurp_mode: bool,
}

impl PipeCache {
    /// Check whether `new_filter` extends the cached filter at a pipe boundary.
    ///
    /// Both filters are compared after trimming surrounding whitespace.
    ///
    /// # Returns
    ///
    /// * `Some("")` when `new_filter` equals the cached filter, or is the cached
    ///   filter followed only by a `|` with nothing after it; the cached values
    ///   can be used as-is.
    /// * `Some(suffix)` when `new_filter` is the cached filter followed by
    ///   `| suffix`; only `suffix` needs to be evaluated, on the cached values.
    /// * `None` when the cache cannot be used: the slurp setting differs, the
    ///   cached text is not a prefix, the prefix ends mid-token
    ///   (e.g. `.foo` -> `.foobar`), or the `|` is really the start of the
    ///   update-assignment operator `|=`.
    ///
    /// NOTE(review): the comparison is purely textual. If the filter language
    /// supports constructs that bind names across a pipe (such as jq's
    /// `... as $x | ...`), evaluating only the suffix would lose those
    /// bindings - confirm against the supported grammar.
    fn get_suffix<'a>(&self, new_filter: &'a str, new_slurp: bool) -> Option<&'a str> {
        // Input semantics changed - cached values were computed from different inputs.
        if new_slurp != self.slurp_mode {
            return None;
        }

        // Whatever follows the cached text; `None` (via `?`) if it is not a prefix.
        let cached = self.filter_text.trim();
        let remainder = new_filter.trim().strip_prefix(cached)?.trim_start();

        if remainder.is_empty() {
            // Exact match - use cache as-is.
            return Some("");
        }

        // Anything other than a pipe here means the prefix ended mid-token.
        let after_pipe = remainder.strip_prefix('|')?;

        // `|=` is the update-assignment operator, not a pipe. Treating it as a
        // pipe would hand `= ...` to the evaluator and run it on the wrong
        // inputs, so force a full evaluation instead.
        if after_pipe.starts_with('=') {
            return None;
        }

        Some(after_pipe.trim_start())
    }
}

/// Handle to a background thread that evaluates filters off the caller's thread.
pub struct Worker {
    /// Sending side of the request queue read by the worker thread.
    request_tx: Sender<EvalRequest>,
    /// Receiving side of the queue the worker thread posts results to.
    result_rx: Receiver<EvalResult>,
    /// Join handle of the spawned thread; stored but not joined in this file.
    _handle: JoinHandle<()>,
}

impl Worker {
    pub fn spawn() -> Self {
        let (request_tx, request_rx) = unbounded::<EvalRequest>();
        let (result_tx, result_rx) = unbounded::<EvalResult>();

        let handle = thread::spawn(move || {
            let mut cache: Option<PipeCache> = None;

            while let Ok(req) = request_rx.recv() {
                // Skip if newer request waiting (implicit cancellation)
                if !request_rx.is_empty() {
                    continue;
                }

                // Check for cache hit
                let (base_values, filter_to_eval) = match &cache {
                    Some(c) => match c.get_suffix(&req.filter_text, req.slurp_mode) {
                        Some("") => {
                            // Exact match - return cached values directly
                            let _ = result_tx.send(EvalResult {
                                id: req.id,
                                result: Ok((*c.values).clone()),
                            });
                            continue;
                        }
                        Some(suffix) => {
                            // Pipe extension - evaluate only the suffix on cached values
                            (Arc::clone(&c.values), suffix.to_string())
                        }
                        None => {
                            // No cache hit - evaluate full filter
                            (prepare_inputs(&req), req.filter_text.clone())
                        }
                    },
                    None => {
                        // No cache - evaluate full filter
                        (prepare_inputs(&req), req.filter_text.clone())
                    }
                };

                // Evaluate the filter (either full or just the suffix)
                let result = filter::evaluate_all(&filter_to_eval, &base_values);

                // Update cache on success
                if let Ok(ref values) = result {
                    cache = Some(PipeCache {
                        filter_text: req.filter_text.clone(),
                        values: Arc::new(values.clone()),
                        slurp_mode: req.slurp_mode,
                    });
                }

                let _ = result_tx.send(EvalResult { id: req.id, result });
            }
        });

        Worker {
            request_tx,
            result_rx,
            _handle: handle,
        }
    }

    pub fn send(&self, req: EvalRequest) {
        let _ = self.request_tx.send(req);
    }

    pub fn try_recv(&self) -> Option<EvalResult> {
        self.result_rx.try_recv().ok()
    }
}

/// Build the value list a full filter evaluation should run against.
///
/// Without slurp mode this is the request's own input list, shared via `Arc`.
/// With slurp mode it is a new single-element list whose only element is an
/// array holding a copy of every input.
fn prepare_inputs(req: &EvalRequest) -> Arc<Vec<Value>> {
    if !req.slurp_mode {
        return Arc::clone(&req.inputs);
    }

    let slurped = Value::Array(req.inputs.to_vec());
    Arc::new(vec![slurped])
}