use std::panic::{self, AssertUnwindSafe};
use std::sync::mpsc::{Receiver, Sender};
use std::time::Instant;
use super::preprocess::preprocess_result;
use super::types::{QueryError, QueryRequest, QueryResponse};
use crate::query::executor::JqExecutor;
pub fn spawn_worker(
json_input: String,
request_rx: Receiver<QueryRequest>,
response_tx: Sender<QueryResponse>,
array_sample_size: usize,
) {
std::thread::spawn(move || {
let response_tx_clone = response_tx.clone();
let prev_hook = panic::take_hook();
panic::set_hook(Box::new(move |panic_info| {
let panic_msg = if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
s.to_string()
} else if let Some(s) = panic_info.payload().downcast_ref::<String>() {
s.clone()
} else {
"Unknown panic in query worker".to_string()
};
log::error!(
"Query worker panic: {} at {:?}",
panic_msg,
panic_info.location()
);
let _ = response_tx_clone.send(QueryResponse::Error {
message: format!("Query worker crashed: {}", panic_msg),
query: String::new(), request_id: 0,
});
}));
let result = panic::catch_unwind(AssertUnwindSafe(|| {
worker_loop(&json_input, request_rx, response_tx, array_sample_size);
}));
panic::set_hook(prev_hook);
if let Err(e) = result {
let panic_msg = if let Some(s) = e.downcast_ref::<&str>() {
s.to_string()
} else if let Some(s) = e.downcast_ref::<String>() {
s.clone()
} else {
"Unknown panic".to_string()
};
log::error!("Query worker thread panicked: {}", panic_msg);
}
});
}
/// Serves query requests until the request channel is closed.
///
/// A single [`JqExecutor`] is built from an owned copy of `json_input` and
/// reused for every request. Each received request is passed to
/// [`handle_request`], which sends its response on `response_tx`. The
/// function returns once `request_rx` reports that all senders are gone.
fn worker_loop(
    json_input: &str,
    request_rx: Receiver<QueryRequest>,
    response_tx: Sender<QueryResponse>,
    array_sample_size: usize,
) {
    let executor = JqExecutor::new_with_sample_size(json_input.to_owned(), array_sample_size);

    // `iter()` blocks waiting for the next request and ends when the channel hangs up.
    for request in request_rx.iter() {
        handle_request(&executor, request, &response_tx, array_sample_size);
    }
}
fn handle_request(
executor: &JqExecutor,
request: QueryRequest,
response_tx: &Sender<QueryResponse>,
array_sample_size: usize,
) {
if request.cancel_token.is_cancelled() {
let _ = response_tx.send(QueryResponse::Cancelled {
request_id: request.request_id,
});
return;
}
let query = request.query.clone();
let start = Instant::now();
match executor.execute_with_cancel(&request.query, &request.cancel_token) {
Ok(output) => {
match preprocess_result(output, &query, &request.cancel_token, array_sample_size) {
Ok(mut processed) => {
processed.execution_time_ms = Some(start.elapsed().as_millis() as u64);
let _ = response_tx.send(QueryResponse::ProcessedSuccess {
processed,
request_id: request.request_id,
});
}
Err(QueryError::Cancelled) => {
let _ = response_tx.send(QueryResponse::Cancelled {
request_id: request.request_id,
});
}
Err(e) => {
let _ = response_tx.send(QueryResponse::Error {
message: e.to_string(),
query: query.clone(),
request_id: request.request_id,
});
}
}
}
Err(QueryError::Cancelled) => {
let _ = response_tx.send(QueryResponse::Cancelled {
request_id: request.request_id,
});
}
Err(e) => {
let _ = response_tx.send(QueryResponse::Error {
message: e.to_string(),
query: request.query,
request_id: request.request_id,
});
}
}
}
// Unit tests for this module are kept in the sibling file `thread_tests.rs`
// and are compiled only for test builds.
#[cfg(test)]
#[path = "thread_tests.rs"]
mod thread_tests;