1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
use std::collections::HashSet;
use std::process::{Command, Stdio};
use std::sync::{Arc, OnceLock};
use std::thread::sleep;
use std::time::Duration;
use serde_json::Value;
use tokio_util::sync::CancellationToken;
use crate::autocomplete::json_navigator::DEFAULT_ARRAY_SAMPLE_SIZE;
use crate::query::worker::types::QueryError;
/// Execute jq queries against JSON input
///
/// Holds the raw JSON text for the session plus two lazily computed caches
/// derived from it (the parsed tree and the set of field names).
///
/// Uses Arc<String> to enable cheap cloning when spawning worker threads.
/// Without Arc, each query execution would copy the entire JSON input (O(n)),
/// causing typing lag on large files. With Arc, cloning is just a reference
/// count increment (O(1)).
pub struct JqExecutor {
/// Raw JSON text that is piped to jq's stdin on every query execution.
json_input: Arc<String>,
/// Lazily parsed JSON input, cached for autocomplete navigation.
/// Uses OnceLock for thread-safe one-time initialization.
/// The cached value is `None` when the input could not be parsed as JSON.
json_input_parsed: OnceLock<Option<Arc<Value>>>,
/// All unique field names from the JSON, collected recursively.
/// Cached for non-deterministic autocomplete fallback.
/// Empty when the input could not be parsed.
all_field_names: OnceLock<Arc<HashSet<String>>>,
/// Maximum number of leading elements inspected in each array while
/// collecting field names.
array_sample_size: usize,
}
impl JqExecutor {
/// Build an executor for `json_input` using the default array sample size.
///
/// Equivalent to calling `new_with_sample_size` with
/// `DEFAULT_ARRAY_SAMPLE_SIZE`.
pub fn new(json_input: String) -> Self {
    let sample_size = DEFAULT_ARRAY_SAMPLE_SIZE;
    Self::new_with_sample_size(json_input, sample_size)
}
/// Build an executor for `json_input` with a caller-chosen array sample size.
///
/// The input is moved into an `Arc` once here; both caches start empty and
/// are filled on first use.
pub fn new_with_sample_size(json_input: String, array_sample_size: usize) -> Self {
    let json_input = Arc::new(json_input);
    Self {
        json_input,
        array_sample_size,
        json_input_parsed: OnceLock::new(),
        all_field_names: OnceLock::new(),
    }
}
/// Borrow the raw JSON input text.
pub fn json_input(&self) -> &str {
    self.json_input.as_str()
}
/// Return the input parsed as a JSON tree, parsing it on first access.
///
/// The result (including a failed parse) is cached, so the input text is
/// parsed at most once per executor. Callers receive a cheap `Arc` clone of
/// the cached tree, which autocomplete uses to navigate nested structures.
///
/// Returns `None` if the JSON input is invalid.
pub fn json_input_parsed(&self) -> Option<Arc<Value>> {
    // A parse failure is cached as `None` rather than retried.
    let parse = || {
        serde_json::from_str::<Value>(self.json_input.as_str())
            .map(Arc::new)
            .ok()
    };
    let cached = self.json_input_parsed.get_or_init(parse);
    cached.clone()
}
/// Return every distinct object key found in the JSON input.
///
/// The set is built on first access by walking the parsed tree (looking at
/// no more than `array_sample_size` leading elements of each array) and is
/// cached afterwards. If the input cannot be parsed the set is empty.
/// Used for non-deterministic autocomplete fallback when path navigation fails.
pub fn all_field_names(&self) -> Arc<HashSet<String>> {
    let limit = self.array_sample_size;
    let build = || {
        let mut names = HashSet::new();
        if let Some(root) = self.json_input_parsed() {
            Self::collect_fields_recursive(&root, &mut names, limit);
        }
        Arc::new(names)
    };
    Arc::clone(self.all_field_names.get_or_init(build))
}
/// Add every object key reachable from `value` to `fields`.
///
/// Objects contribute all of their keys and all of their values are
/// descended into. Arrays contribute only their first `array_sample_size`
/// elements. Scalars contribute nothing.
fn collect_fields_recursive(
    value: &Value,
    fields: &mut HashSet<String>,
    array_sample_size: usize,
) {
    // Explicit work stack instead of call recursion; visiting order does not
    // matter because the output is a set.
    let mut pending: Vec<&Value> = vec![value];
    while let Some(node) = pending.pop() {
        match node {
            Value::Object(entries) => {
                for (name, child) in entries {
                    fields.insert(name.clone());
                    pending.push(child);
                }
            }
            Value::Array(items) => {
                pending.extend(items.iter().take(array_sample_size));
            }
            _ => {}
        }
    }
}
/// Execute a jq query with cancellation support
///
/// Spawns a `jq` child process, feeds it the JSON input on stdin, and polls
/// with `try_wait()` for completion while checking the cancellation token
/// between polls. Helper threads handle stdin, stdout and stderr so that no
/// pipe can fill up and stall the child.
///
/// Whenever this function gives up on the child early (cancellation or a
/// failed status poll) the child is killed *and* waited on, so no zombie
/// process is left behind.
///
/// # Arguments
/// * `query` - The jq filter expression; an empty/whitespace-only query is
///   treated as the identity filter `.`
/// * `cancel_token` - Token for cancelling execution
///
/// # Returns
/// * `Ok(String)` - Filtered JSON output with colors (lossily decoded as UTF-8)
///
/// # Errors
/// * `QueryError::SpawnFailed` - the `jq` process could not be started
/// * `QueryError::Cancelled` - `cancel_token` was cancelled before jq exited
/// * `QueryError::OutputReadFailed` - polling the child or receiving its
///   output failed
/// * `QueryError::ExecutionFailed` - jq exited unsuccessfully; carries stderr
pub fn execute_with_cancel(
    &self,
    query: &str,
    cancel_token: &CancellationToken,
) -> Result<String, QueryError> {
    use std::io::Read;
    use std::sync::mpsc::channel;

    // Delay between two consecutive cancellation/exit checks.
    const POLL_INTERVAL_MS: u64 = 10;

    // Empty query defaults to identity filter
    let query = if query.trim().is_empty() { "." } else { query };

    // Galaxy theme colors for jq output (using true color ANSI codes)
    // Format: null:false:true:numbers:strings:arrays:objects:keys
    // Each segment is an ANSI SGR code (38;2;R;G;B for true color)
    let jq_colors = [
        "38;2;130;133;158",  // null - muted gray
        "38;2;224;108;117",  // false - soft red
        "38;2;107;203;119",  // true - fresh green
        "38;2;189;147;249",  // numbers - purple
        "38;2;107;203;119",  // strings - fresh green
        "1;38;2;0;217;255",  // arrays - bold electric cyan
        "1;38;2;0;217;255",  // objects - bold electric cyan
        "1;38;2;255;217;61", // keys - bold golden yellow
    ]
    .join(":");

    // Spawn jq process with custom colors
    let mut child = Command::new("jq")
        .env("JQ_COLORS", jq_colors)
        .arg("--color-output")
        .arg(query)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .map_err(|e| QueryError::SpawnFailed(e.to_string()))?;

    // Write the JSON to stdin from a separate thread. Writing inline could
    // deadlock when the input exceeds the pipe buffer and jq is slow to read.
    // Arc::clone is O(1) - just increments reference count, no data copying
    let json_input = Arc::clone(&self.json_input);
    if let Some(mut stdin) = child.stdin.take() {
        std::thread::spawn(move || {
            use std::io::Write;
            // Best effort: jq may exit (or be killed) before reading everything.
            let _ = stdin.write_all(json_input.as_bytes());
            // stdin is dropped here, closing the pipe
        });
    }

    // Drain stdout/stderr concurrently so large outputs cannot fill a pipe
    // buffer and block the child.
    let (stdout_tx, stdout_rx) = channel();
    let (stderr_tx, stderr_rx) = channel();
    if let Some(mut stdout) = child.stdout.take() {
        std::thread::spawn(move || {
            let mut buffer = Vec::new();
            let _ = stdout.read_to_end(&mut buffer);
            let _ = stdout_tx.send(buffer);
        });
    }
    if let Some(mut stderr) = child.stderr.take() {
        std::thread::spawn(move || {
            let mut buffer = Vec::new();
            let _ = stderr.read_to_end(&mut buffer);
            let _ = stderr_tx.send(buffer);
        });
    }

    // Poll for completion or cancellation
    let status = loop {
        // Check cancellation first
        if cancel_token.is_cancelled() {
            let _ = child.kill();
            // Reap the killed child; without wait() it lingers as a zombie.
            let _ = child.wait();
            return Err(QueryError::Cancelled);
        }

        // Check if process finished
        match child.try_wait() {
            Ok(Some(exit_status)) => break exit_status,
            Ok(None) => {
                // Process still running - sleep briefly
                sleep(Duration::from_millis(POLL_INTERVAL_MS));
            }
            Err(e) => {
                // We can no longer observe the child; do not leave it running
                // or unreaped behind us.
                let _ = child.kill();
                let _ = child.wait();
                return Err(QueryError::OutputReadFailed(e.to_string()));
            }
        }
    };

    // Process has exited - collect output from reader threads
    let stdout_data = stdout_rx
        .recv()
        .map_err(|_| QueryError::OutputReadFailed("Failed to read stdout".to_string()))?;
    let stderr_data = stderr_rx
        .recv()
        .map_err(|_| QueryError::OutputReadFailed("Failed to read stderr".to_string()))?;

    if status.success() {
        Ok(String::from_utf8_lossy(&stdout_data).into_owned())
    } else {
        Err(QueryError::ExecutionFailed(
            String::from_utf8_lossy(&stderr_data).into_owned(),
        ))
    }
}
}
#[cfg(test)]
#[path = "executor_tests.rs"]
mod executor_tests;