use std::collections::HashMap;
use std::future::Future;
use std::panic::{self, AssertUnwindSafe};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
use futures::{FutureExt, Stream, StreamExt};
use serde_json::{Map, Value, json};
use tokio::sync::{Mutex, mpsc, oneshot};
use tokio::task::JoinHandle;
use tracing::{debug, warn};
use crate::errors::{Error, Result};
use crate::message_parser::parse_message;
use crate::sdk_mcp::McpSdkServer;
use crate::transport::{TransportCloseHandle, TransportReader, TransportWriter};
use crate::types::{
AgentDefinition, CanUseToolCallback, HookCallback, HookMatcher, McpStatusResponse, Message,
PermissionResult, ToolPermissionContext,
};
/// Capacity of the bounded channel that carries parsed messages from the
/// background reader task to the `Query` consumer.
const MESSAGE_CHANNEL_BUFFER: usize = 100;
/// Rewrites a hook callback's output into the key names sent to the CLI.
///
/// The top-level keys `async_` and `continue_` are emitted as `async` and
/// `continue`; every other key is copied unchanged. A value that is not a
/// JSON object is returned as-is.
fn convert_hook_output_for_cli(output: Value) -> Value {
    let Some(fields) = output.as_object() else {
        return output;
    };
    let mut renamed = Map::new();
    for (name, field) in fields {
        // Only the two trailing-underscore names are translated.
        let cli_name = match name.as_str() {
            "async_" => "async",
            "continue_" => "continue",
            other => other,
        };
        renamed.insert(cli_name.to_string(), field.clone());
    }
    Value::Object(renamed)
}
/// Extracts a human-readable message from a caught panic payload.
///
/// Payloads that are a `&str` or a `String` yield their text; anything else
/// yields the fixed text `"unknown panic payload"`.
fn panic_payload_to_string(payload: Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&str>()
        .map(|text| (*text).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "unknown panic payload".to_string())
}
fn callback_panic_error(callback_type: &str, payload: Box<dyn std::any::Any + Send>) -> Error {
let panic_message = panic_payload_to_string(payload);
warn!(
callback_type,
panic_message, "Caught panic in callback invocation"
);
Error::Other(format!(
"{callback_type} callback panicked: {panic_message}"
))
}
/// Awaits `callback_future`, turning a panic raised while polling it into an
/// error built by `callback_panic_error` instead of letting it unwind further.
///
/// A future that completes normally has its own `Result` returned unchanged.
async fn await_callback_with_panic_isolation<T, F>(
    callback_type: &str,
    callback_future: F,
) -> Result<T>
where
    F: Future<Output = Result<T>>,
{
    let guarded = AssertUnwindSafe(callback_future).catch_unwind();
    guarded
        .await
        .unwrap_or_else(|payload| Err(callback_panic_error(callback_type, payload)))
}
/// Bookkeeping for outbound control requests that are awaiting a response.
struct PendingControlsState {
    /// One-shot senders keyed by request id; the control response (or a
    /// reader-termination error) is delivered through them.
    senders: HashMap<String, oneshot::Sender<Result<Value>>>,
    /// Responses that arrived when no sender was registered under their
    /// request id; a later request with that id picks them up.
    buffered: HashMap<String, Result<Value>>,
}
/// State shared between a `Query` and its background reader task.
struct QuerySharedState {
    /// Optional callback consulted for `can_use_tool` control requests.
    can_use_tool: Option<CanUseToolCallback>,
    /// Hook callbacks keyed by callback id, looked up for `hook_callback` requests.
    hook_callbacks: Mutex<HashMap<String, HookCallback>>,
    /// SDK MCP servers keyed by server name, used for `mcp_message` requests.
    sdk_mcp_servers: HashMap<String, Arc<McpSdkServer>>,
    /// Outbound control requests awaiting responses, plus early responses.
    pending_controls: Mutex<PendingControlsState>,
    /// Transport writer; locked for every outgoing line and for ending input.
    writer: Arc<Mutex<Box<dyn TransportWriter>>>,
    /// True while ending the input stream has been deferred; cleared by
    /// `try_close_deferred_stdin` when the close is actually performed.
    pending_stdin_close: AtomicBool,
    /// How long the reader waits for the next message while a deferred close
    /// is pending before closing input anyway.
    stream_close_timeout: Duration,
    /// Set by `mark_reader_terminated` once the reader task has stopped.
    reader_terminated: AtomicBool,
    /// First reason recorded by `mark_reader_terminated`, if any.
    reader_termination_reason: Mutex<Option<String>>,
}
/// Handle to a running query: owns the background reader task, the message
/// receiver and the transport close handle, and issues control requests.
pub struct Query {
    /// Shared state; `None` once `shutdown` has run.
    state: Option<Arc<QuerySharedState>>,
    /// Receiving half of the message channel; `None` after it was taken via
    /// `take_message_receiver` or after `shutdown`.
    message_rx: Option<mpsc::Receiver<Result<Message>>>,
    /// Handle of the spawned reader task; aborted on shutdown and on drop.
    reader_task: Option<JoinHandle<()>>,
    /// Handle used to close the transport on shutdown or drop.
    close_handle: Option<Box<dyn TransportCloseHandle>>,
    /// Counter from which `req_N` control request ids are derived.
    request_counter: Arc<AtomicUsize>,
    /// Control requests and `initialize` only take effect when this is true.
    is_streaming_mode: bool,
    /// Agent definitions included in the `initialize` request when present.
    agents: Option<HashMap<String, AgentDefinition>>,
    /// Set to true after `initialize` received a successful response.
    initialized: bool,
    /// Response payload returned by the `initialize` control request.
    initialization_result: Option<Value>,
    /// Timeout applied to the `initialize` control request.
    initialize_timeout: Duration,
    /// True when any hook callbacks or SDK MCP servers were supplied to
    /// `start`; makes `finalize_stream_input` defer closing the input stream.
    has_hooks_or_mcp: bool,
}
impl Query {
/// Builds the shared state, spawns the background reader task on the current
/// tokio runtime and returns the `Query` handle wrapping both.
///
/// The stream-close timeout is read (in milliseconds) from the
/// `CLAUDE_CODE_STREAM_CLOSE_TIMEOUT` environment variable, defaulting to
/// 60 000 ms when unset or unparsable, and is never shorter than 60 seconds.
// The constructor wires together every collaborator of the query, hence the
// long parameter list.
#[allow(clippy::too_many_arguments)]
pub(crate) fn start(
    reader: Box<dyn TransportReader>,
    writer: Box<dyn TransportWriter>,
    close_handle: Box<dyn TransportCloseHandle>,
    is_streaming_mode: bool,
    can_use_tool: Option<CanUseToolCallback>,
    hook_callbacks: HashMap<String, HookCallback>,
    sdk_mcp_servers: HashMap<String, Arc<McpSdkServer>>,
    agents: Option<HashMap<String, AgentDefinition>>,
    initialize_timeout: Duration,
) -> Self {
    // Must be computed before the maps are moved into the shared state.
    let has_hooks_or_mcp = !(hook_callbacks.is_empty() && sdk_mcp_servers.is_empty());

    let configured_ms: u64 = std::env::var("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT")
        .ok()
        .and_then(|raw| raw.parse().ok())
        .unwrap_or(60_000);
    let stream_close_timeout = std::cmp::max(
        Duration::from_millis(configured_ms),
        Duration::from_secs(60),
    );

    let state = Arc::new(QuerySharedState {
        can_use_tool,
        hook_callbacks: Mutex::new(hook_callbacks),
        sdk_mcp_servers,
        pending_controls: Mutex::new(PendingControlsState {
            senders: HashMap::new(),
            buffered: HashMap::new(),
        }),
        writer: Arc::new(Mutex::new(writer)),
        pending_stdin_close: AtomicBool::new(false),
        stream_close_timeout,
        reader_terminated: AtomicBool::new(false),
        reader_termination_reason: Mutex::new(None),
    });

    let (message_tx, message_rx) = mpsc::channel(MESSAGE_CHANNEL_BUFFER);
    let reader_task = tokio::spawn(background_reader_task(
        reader,
        Arc::clone(&state),
        message_tx,
    ));

    Self {
        state: Some(state),
        message_rx: Some(message_rx),
        reader_task: Some(reader_task),
        close_handle: Some(close_handle),
        request_counter: Arc::new(AtomicUsize::new(0)),
        is_streaming_mode,
        agents,
        initialized: false,
        initialization_result: None,
        initialize_timeout,
        has_hooks_or_mcp,
    }
}
/// Sends the `initialize` control request and records its response.
///
/// Returns `Ok(None)` without sending anything when the query is not in
/// streaming mode. Otherwise the request carries `hooks` (JSON `null` when
/// `hooks_config` is empty) and, if configured, `agents`; the response is
/// stored and returned as `Ok(Some(response))`.
///
/// # Errors
/// Propagates any error from the underlying control request.
pub async fn initialize(&mut self, hooks_config: Map<String, Value>) -> Result<Option<Value>> {
    if !self.is_streaming_mode {
        return Ok(None);
    }

    let hooks = if hooks_config.is_empty() {
        Value::Null
    } else {
        Value::Object(hooks_config)
    };

    let mut payload = Map::new();
    payload.insert(
        "subtype".to_string(),
        Value::String("initialize".to_string()),
    );
    payload.insert("hooks".to_string(), hooks);
    if let Some(agents) = self.agents.as_ref() {
        // A serialization failure degrades to `null` rather than failing.
        let encoded = serde_json::to_value(agents).unwrap_or(Value::Null);
        payload.insert("agents".to_string(), encoded);
    }

    let response = self
        .send_control_request(Value::Object(payload), self.initialize_timeout)
        .await?;
    self.initialization_result = Some(response.clone());
    self.initialized = true;
    Ok(Some(response))
}
/// Returns a clone of the response stored by a successful `initialize`
/// call, or `None` if no response has been stored.
pub fn initialization_result(&self) -> Option<Value> {
    self.initialization_result.clone()
}
async fn send_control_request(&self, request: Value, timeout: Duration) -> Result<Value> {
if !self.is_streaming_mode {
return Err(Error::Other(
"Control requests require streaming mode".to_string(),
));
}
let state = self
.state
.as_ref()
.ok_or_else(|| Error::Other("Query not started or already closed.".to_string()))?;
let request_id = format!(
"req_{}",
self.request_counter.fetch_add(1, Ordering::SeqCst) + 1
);
let control_request = json!({
"type": "control_request",
"request_id": request_id,
"request": request,
});
state
.writer
.lock()
.await
.write(&(control_request.to_string() + "\n"))
.await?;
let (tx, rx) = oneshot::channel();
{
let mut controls = state.pending_controls.lock().await;
if let Some(result) = controls.buffered.remove(&request_id) {
return result;
}
controls.senders.insert(request_id.clone(), tx);
}
if state.reader_terminated.load(Ordering::SeqCst) {
state
.pending_controls
.lock()
.await
.senders
.remove(&request_id);
let reason = reader_termination_reason(state).await;
return Err(Error::Other(format!(
"Background reader task terminated: {reason}"
)));
}
let result = tokio::time::timeout(timeout, rx).await;
match result {
Ok(Ok(value)) => value,
Ok(Err(_)) => {
Err(Error::Other(
"Background reader task terminated while waiting for control response"
.to_string(),
))
}
Err(_) => {
let subtype = request
.get("subtype")
.and_then(Value::as_str)
.unwrap_or("unknown");
state
.pending_controls
.lock()
.await
.senders
.remove(&request_id);
Err(Error::Other(format!("Control request timeout: {subtype}")))
}
}
}
/// Sends a `user` message with the given prompt text and session id.
///
/// # Errors
/// Fails when the query is closed or the transport write fails.
pub async fn send_user_message(&self, prompt: &str, session_id: &str) -> Result<()> {
    let body = json!({"role": "user", "content": prompt});
    let envelope = json!({
        "type": "user",
        "message": body,
        "parent_tool_use_id": Value::Null,
        "session_id": session_id
    });
    self.write_message(&envelope).await
}
/// Writes an arbitrary JSON message to the transport as a single line.
///
/// # Errors
/// Fails when the query is closed or the transport write fails.
pub async fn send_raw_message(&self, message: Value) -> Result<()> {
    self.write_message(&message).await
}
async fn write_message(&self, message: &Value) -> Result<()> {
let state = self
.state
.as_ref()
.ok_or_else(|| Error::Other("Query not started or already closed.".to_string()))?;
state
.writer
.lock()
.await
.write(&(message.to_string() + "\n"))
.await
}
pub async fn send_input_messages(&self, messages: Vec<Value>) -> Result<()> {
for message in messages {
self.send_raw_message(message).await?;
}
Ok(())
}
/// Forwards every item of `messages` to the transport in order, stopping at
/// the first write error.
///
/// The input stream is left open afterwards.
pub async fn send_input_from_stream<S>(&self, mut messages: S) -> Result<()>
where
    S: Stream<Item = Value> + Unpin,
{
    loop {
        match messages.next().await {
            Some(message) => self.send_raw_message(message).await?,
            None => return Ok(()),
        }
    }
}
pub fn spawn_input_from_stream<S>(&self, mut messages: S) -> Result<JoinHandle<Result<()>>>
where
S: Stream<Item = Value> + Send + Unpin + 'static,
{
let state = self
.state
.as_ref()
.cloned()
.ok_or_else(|| Error::Other("Query not started or already closed.".to_string()))?;
Ok(tokio::spawn(async move {
while let Some(message) = messages.next().await {
state
.writer
.lock()
.await
.write(&(message.to_string() + "\n"))
.await?;
}
Ok(())
}))
}
/// Sends all `messages` and then finalizes the input stream, which either
/// closes it right away or defers the close.
pub async fn stream_input(&self, messages: Vec<Value>) -> Result<()> {
    match self.send_input_messages(messages).await {
        Ok(()) => self.finalize_stream_input().await,
        Err(err) => Err(err),
    }
}
/// Forwards every item of `messages` and then finalizes the input stream,
/// which either closes it right away or defers the close.
pub async fn stream_input_from_stream<S>(&self, mut messages: S) -> Result<()>
where
    S: Stream<Item = Value> + Unpin,
{
    match self.send_input_from_stream(&mut messages).await {
        Ok(()) => self.finalize_stream_input().await,
        Err(err) => Err(err),
    }
}
async fn finalize_stream_input(&self) -> Result<()> {
let state = self
.state
.as_ref()
.ok_or_else(|| Error::Other("Query not started or already closed.".to_string()))?;
if self.has_hooks_or_mcp {
debug!(
has_hooks_or_mcp = self.has_hooks_or_mcp,
"Deferring stdin close until first result"
);
state.pending_stdin_close.store(true, Ordering::SeqCst);
} else {
state.writer.lock().await.end_input().await?;
}
Ok(())
}
pub async fn end_input(&self) -> Result<()> {
let state = self
.state
.as_ref()
.ok_or_else(|| Error::Other("Query not started or already closed.".to_string()))?;
state.writer.lock().await.end_input().await
}
pub async fn receive_next_message(&mut self) -> Result<Option<Message>> {
let rx = self
.message_rx
.as_mut()
.ok_or_else(|| Error::Other("Query not started or already closed.".to_string()))?;
match rx.recv().await {
Some(Ok(message)) => Ok(Some(message)),
Some(Err(err)) => Err(err),
None => Ok(None),
}
}
pub async fn get_mcp_status(&self) -> Result<McpStatusResponse> {
let raw = self
.send_control_request(json!({ "subtype": "mcp_status" }), Duration::from_secs(60))
.await?;
serde_json::from_value(raw).map_err(|err| {
Error::Other(format!("Failed to decode typed MCP status response: {err}"))
})
}
/// Sends an `interrupt` control request (60 second timeout) and discards the
/// response payload.
pub async fn interrupt(&self) -> Result<()> {
    let request = json!({ "subtype": "interrupt" });
    self.send_control_request(request, Duration::from_secs(60))
        .await
        .map(|_| ())
}
/// Sends a `set_permission_mode` control request carrying `mode` (60 second
/// timeout) and discards the response payload.
pub async fn set_permission_mode(&self, mode: &str) -> Result<()> {
    let request = json!({ "subtype": "set_permission_mode", "mode": mode });
    self.send_control_request(request, Duration::from_secs(60))
        .await
        .map(|_| ())
}
/// Sends a `set_model` control request (60 second timeout); a `None` model
/// is serialized as JSON `null`. The response payload is discarded.
pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
    let request = json!({ "subtype": "set_model", "model": model });
    self.send_control_request(request, Duration::from_secs(60))
        .await
        .map(|_| ())
}
/// Sends a `rewind_files` control request for `user_message_id` (60 second
/// timeout) and discards the response payload.
pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
    let request = json!({ "subtype": "rewind_files", "user_message_id": user_message_id });
    self.send_control_request(request, Duration::from_secs(60))
        .await
        .map(|_| ())
}
/// Sends an `mcp_reconnect` control request for `server_name` (60 second
/// timeout) and discards the response payload.
pub async fn reconnect_mcp_server(&self, server_name: &str) -> Result<()> {
    let request = json!({ "subtype": "mcp_reconnect", "serverName": server_name });
    self.send_control_request(request, Duration::from_secs(60))
        .await
        .map(|_| ())
}
/// Sends an `mcp_toggle` control request for `server_name` with the desired
/// `enabled` flag (60 second timeout) and discards the response payload.
pub async fn toggle_mcp_server(&self, server_name: &str, enabled: bool) -> Result<()> {
    let request = json!({ "subtype": "mcp_toggle", "serverName": server_name, "enabled": enabled });
    self.send_control_request(request, Duration::from_secs(60))
        .await
        .map(|_| ())
}
/// Sends a `stop_task` control request for `task_id` (60 second timeout) and
/// discards the response payload.
pub async fn stop_task(&self, task_id: &str) -> Result<()> {
    let request = json!({ "subtype": "stop_task", "task_id": task_id });
    self.send_control_request(request, Duration::from_secs(60))
        .await
        .map(|_| ())
}
/// Consumes the query and shuts it down: the reader task is aborted and the
/// transport is closed.
///
/// # Errors
/// Returns the error reported while closing the transport, if any.
pub async fn close(mut self) -> Result<()> {
    self.shutdown().await
}
/// Releases the receiver and shared state, aborts the reader task and waits
/// for it to finish, then closes the transport.
///
/// Every resource is taken out of its `Option`, so a second call (or the
/// later `Drop`) finds nothing left to do.
async fn shutdown(&mut self) -> Result<()> {
    drop(self.message_rx.take());
    drop(self.state.take());
    if let Some(reader) = self.reader_task.take() {
        reader.abort();
        // The join result (normally a cancellation error) is irrelevant.
        let _ = reader.await;
    }
    let Some(transport) = self.close_handle.take() else {
        return Ok(());
    };
    transport.close().await?;
    Ok(())
}
/// Moves the message receiver out of the query, leaving `None` behind;
/// returns `None` if it was already taken.
pub(crate) fn take_message_receiver(&mut self) -> Option<mpsc::Receiver<Result<Message>>> {
    self.message_rx.take()
}
}
/// Best-effort cleanup for a `Query` that was dropped without `close`.
impl Drop for Query {
    /// Aborts the reader task and closes the transport.
    ///
    /// Inside a tokio runtime the close is spawned onto it; otherwise a
    /// temporary current-thread runtime is built to run the close to
    /// completion. Errors from the close are ignored.
    fn drop(&mut self) {
        if let Some(reader) = self.reader_task.take() {
            reader.abort();
        }
        let Some(close_handle) = self.close_handle.take() else {
            return;
        };
        match tokio::runtime::Handle::try_current() {
            Ok(runtime) => {
                runtime.spawn(async move {
                    let _ = close_handle.close().await;
                });
            }
            Err(_) => {
                let fallback = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build();
                if let Ok(runtime) = fallback {
                    let _ = runtime.block_on(async move { close_handle.close().await });
                }
            }
        }
    }
}
/// Reads raw messages from the transport until it ends, routing each one.
///
/// * `control_response` messages complete (or are buffered for) pending
///   control requests.
/// * `control_request` messages are handled inline and answered on the
///   writer; handler errors are only logged.
/// * `control_cancel_request` messages are ignored.
/// * Everything else is parsed and forwarded on `message_tx`; parse errors
///   are forwarded as `Error::MessageParse`.
///
/// While a deferred stdin close is pending, each read is bounded by
/// `stream_close_timeout`, and the close is performed on timeout, on the
/// first `Result` message, or on end of stream.
///
/// Whenever the loop exits — read error, end of stream, or the message
/// receiver having been dropped — the reader is marked as terminated so that
/// control requests still waiting for a response fail immediately instead of
/// hanging until their timeout.
async fn background_reader_task(
    mut reader: Box<dyn TransportReader>,
    state: Arc<QuerySharedState>,
    message_tx: mpsc::Sender<Result<Message>>,
) {
    let exit_reason = loop {
        let read_result = if state.pending_stdin_close.load(Ordering::SeqCst) {
            let timeout_dur = state.stream_close_timeout;
            match tokio::time::timeout(timeout_dur, reader.read_next_message()).await {
                Ok(result) => result,
                Err(_) => {
                    debug!("Timed out waiting for first result, closing input stream");
                    try_close_deferred_stdin(&state).await;
                    continue;
                }
            }
        } else {
            reader.read_next_message().await
        };

        let raw = match read_result {
            Ok(Some(raw)) => raw,
            Ok(None) => {
                try_close_deferred_stdin(&state).await;
                break "transport stream ended".to_string();
            }
            Err(err) => {
                // Release control waiters before the (possibly blocking)
                // send on the bounded message channel.
                mark_reader_terminated(&state, err.to_string()).await;
                let _ = message_tx.send(Err(err)).await;
                return;
            }
        };

        let msg_type = raw.get("type").and_then(Value::as_str).unwrap_or_default();
        if msg_type == "control_response" {
            handle_control_response(&state, &raw).await;
            continue;
        }
        if msg_type == "control_request" {
            if let Err(err) = handle_control_request(&state, raw).await {
                debug!("Error handling control request: {err}");
            }
            continue;
        }
        if msg_type == "control_cancel_request" {
            continue;
        }

        match parse_message(&raw) {
            Ok(Some(msg)) => {
                if matches!(msg, Message::Result(_))
                    && state.pending_stdin_close.load(Ordering::SeqCst)
                {
                    debug!("Received first result, closing input stream");
                    try_close_deferred_stdin(&state).await;
                }
                if message_tx.send(Ok(msg)).await.is_err() {
                    break "message receiver dropped".to_string();
                }
            }
            Ok(None) => {}
            Err(err) => {
                if message_tx
                    .send(Err(Error::MessageParse(err)))
                    .await
                    .is_err()
                {
                    break "message receiver dropped".to_string();
                }
            }
        }
    };

    // No further control responses can arrive once reading has stopped.
    mark_reader_terminated(&state, exit_reason).await;
}
async fn mark_reader_terminated(state: &QuerySharedState, reason: String) {
state.reader_terminated.store(true, Ordering::SeqCst);
let stored_reason = {
let mut termination_reason = state.reader_termination_reason.lock().await;
if termination_reason.is_none() {
*termination_reason = Some(reason);
}
termination_reason
.clone()
.unwrap_or_else(|| "Unknown reason".to_string())
};
let mut controls = state.pending_controls.lock().await;
for (_, sender) in controls.senders.drain() {
let _ = sender.send(Err(Error::Other(format!(
"Background reader task terminated: {stored_reason}"
))));
}
}
/// Returns the recorded reader termination reason, or `"Unknown reason"`
/// when none has been stored.
async fn reader_termination_reason(state: &QuerySharedState) -> String {
    let recorded = state.reader_termination_reason.lock().await;
    recorded.as_deref().unwrap_or("Unknown reason").to_string()
}
/// Performs the deferred input close if one is pending.
///
/// The pending flag is cleared atomically, so only the caller that observed
/// it set ends the input; a failure to do so is logged and otherwise ignored.
async fn try_close_deferred_stdin(state: &QuerySharedState) {
    let was_pending = state.pending_stdin_close.swap(false, Ordering::SeqCst);
    if !was_pending {
        return;
    }
    if let Err(e) = state.writer.lock().await.end_input().await {
        debug!("Error closing deferred stdin: {e}");
    }
}
async fn handle_control_response(state: &QuerySharedState, raw: &Value) {
let Some(response) = raw.get("response").and_then(Value::as_object) else {
return;
};
let response_request_id = response
.get("request_id")
.and_then(Value::as_str)
.unwrap_or_default();
let subtype = response
.get("subtype")
.and_then(Value::as_str)
.unwrap_or_default();
let result: Result<Value> = if subtype == "error" {
let error = response
.get("error")
.and_then(Value::as_str)
.unwrap_or("Unknown error");
Err(Error::Other(error.to_string()))
} else {
Ok(response
.get("response")
.cloned()
.unwrap_or_else(|| json!({})))
};
let mut controls = state.pending_controls.lock().await;
if let Some(sender) = controls.senders.remove(response_request_id) {
let _ = sender.send(result);
} else {
controls
.buffered
.insert(response_request_id.to_string(), result);
}
}
async fn handle_can_use_tool_request(
state: &QuerySharedState,
request_data: &Map<String, Value>,
) -> Result<Value> {
let callback = state
.can_use_tool
.clone()
.ok_or_else(|| Error::Other("canUseTool callback is not provided".to_string()))?;
let tool_name = request_data
.get("tool_name")
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
let input = request_data
.get("input")
.cloned()
.unwrap_or_else(|| json!({}));
let suggestions = request_data
.get("permission_suggestions")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default()
.into_iter()
.filter_map(|value| serde_json::from_value(value).ok())
.collect();
let blocked_path = request_data
.get("blocked_path")
.and_then(Value::as_str)
.map(ToString::to_string);
let context = ToolPermissionContext {
suggestions,
blocked_path,
signal: None,
};
let callback_future = panic::catch_unwind(AssertUnwindSafe(|| {
callback(tool_name, input.clone(), context)
}))
.map_err(|payload| callback_panic_error("can_use_tool", payload))?;
let callback_result =
await_callback_with_panic_isolation("can_use_tool", callback_future).await?;
let output = match callback_result {
PermissionResult::Allow(allow) => {
let mut obj = Map::new();
obj.insert("behavior".to_string(), Value::String("allow".to_string()));
obj.insert(
"updatedInput".to_string(),
allow.updated_input.unwrap_or(input),
);
if let Some(updated_permissions) = allow.updated_permissions {
let permissions_json: Vec<Value> = updated_permissions
.into_iter()
.map(|permission| permission.to_cli_dict())
.collect();
obj.insert(
"updatedPermissions".to_string(),
Value::Array(permissions_json),
);
}
Value::Object(obj)
}
PermissionResult::Deny(deny) => {
let mut obj = Map::new();
obj.insert("behavior".to_string(), Value::String("deny".to_string()));
obj.insert("message".to_string(), Value::String(deny.message));
if deny.interrupt {
obj.insert("interrupt".to_string(), Value::Bool(true));
}
Value::Object(obj)
}
};
Ok(output)
}
async fn handle_hook_callback_request(
state: &QuerySharedState,
request_data: &Map<String, Value>,
) -> Result<Value> {
let callback_id = request_data
.get("callback_id")
.and_then(Value::as_str)
.ok_or_else(|| Error::Other("Missing callback_id in hook_callback".to_string()))?;
let callback = state
.hook_callbacks
.lock()
.await
.get(callback_id)
.cloned()
.ok_or_else(|| Error::Other(format!("No hook callback found for ID: {callback_id}")))?;
let input = request_data.get("input").cloned().unwrap_or(Value::Null);
let tool_use_id = request_data
.get("tool_use_id")
.and_then(Value::as_str)
.map(ToString::to_string);
let callback_future = panic::catch_unwind(AssertUnwindSafe(|| {
callback(input, tool_use_id, Default::default())
}))
.map_err(|payload| callback_panic_error("hook", payload))?;
let output = await_callback_with_panic_isolation("hook", callback_future).await?;
Ok(convert_hook_output_for_cli(output))
}
async fn handle_mcp_message_request(
state: &QuerySharedState,
request_data: &Map<String, Value>,
) -> Result<Value> {
let server_name = request_data
.get("server_name")
.and_then(Value::as_str)
.ok_or_else(|| Error::Other("Missing server_name in mcp_message".to_string()))?;
let message = request_data
.get("message")
.cloned()
.ok_or_else(|| Error::Other("Missing message in mcp_message".to_string()))?;
let response = handle_sdk_mcp_request(&state.sdk_mcp_servers, server_name, &message).await;
Ok(json!({ "mcp_response": response }))
}
async fn handle_control_request(state: &QuerySharedState, request: Value) -> Result<()> {
let Some(request_obj) = request.as_object() else {
return Err(Error::Other("Invalid control request format".to_string()));
};
let request_id = request_obj
.get("request_id")
.and_then(Value::as_str)
.ok_or_else(|| Error::Other("Missing request_id in control request".to_string()))?
.to_string();
let request_data = request_obj
.get("request")
.and_then(Value::as_object)
.ok_or_else(|| Error::Other("Missing request payload".to_string()))?;
let subtype = request_data
.get("subtype")
.and_then(Value::as_str)
.ok_or_else(|| Error::Other("Missing request subtype".to_string()))?;
let result: Result<Value> = match subtype {
"can_use_tool" => handle_can_use_tool_request(state, request_data).await,
"hook_callback" => handle_hook_callback_request(state, request_data).await,
"mcp_message" => handle_mcp_message_request(state, request_data).await,
_ => Err(Error::Other(format!(
"Unsupported control request subtype: {subtype}"
))),
};
let response_json = match result {
Ok(payload) => json!({
"type": "control_response",
"response": {
"subtype": "success",
"request_id": request_id,
"response": payload
}
}),
Err(err) => json!({
"type": "control_response",
"response": {
"subtype": "error",
"request_id": request_id,
"error": err.to_string()
}
}),
};
state
.writer
.lock()
.await
.write(&(response_json.to_string() + "\n"))
.await
}
/// Answers a JSON-RPC message addressed to an in-process SDK MCP server.
///
/// Supported methods are `initialize`, `tools/list`, `tools/call` and
/// `notifications/initialized`. An unknown server name or method yields a
/// JSON-RPC error object with code `-32601`. The request's `id` (JSON `null`
/// when absent) is echoed in every reply except the one for
/// `notifications/initialized`.
pub async fn handle_sdk_mcp_request(
    sdk_mcp_servers: &HashMap<String, Arc<McpSdkServer>>,
    server_name: &str,
    message: &Value,
) -> Value {
    let id = message.get("id").cloned().unwrap_or(Value::Null);
    // Shared shape for both "not found" replies.
    let not_found = |id: Value, text: String| {
        json!({
            "jsonrpc": "2.0",
            "id": id,
            "error": {
                "code": -32601,
                "message": text
            }
        })
    };

    let Some(server) = sdk_mcp_servers.get(server_name) else {
        return not_found(id, format!("Server '{server_name}' not found"));
    };

    let method = message
        .get("method")
        .and_then(Value::as_str)
        .unwrap_or_default();

    match method {
        "initialize" => json!({
            "jsonrpc": "2.0",
            "id": id,
            "result": {
                "protocolVersion": "2024-11-05",
                "capabilities": {"tools": {}},
                "serverInfo": {
                    "name": server.name,
                    "version": server.version
                }
            }
        }),
        "tools/list" => json!({
            "jsonrpc": "2.0",
            "id": id,
            "result": {
                "tools": server.list_tools_json()
            }
        }),
        "tools/call" => {
            let params = message.get("params");
            let tool_name = params
                .and_then(|p| p.get("name"))
                .and_then(Value::as_str)
                .unwrap_or_default();
            let arguments = params
                .and_then(|p| p.get("arguments"))
                .cloned()
                .unwrap_or_else(|| json!({}));
            let result = server.call_tool_json(tool_name, arguments).await;
            json!({
                "jsonrpc": "2.0",
                "id": id,
                "result": result
            })
        }
        "notifications/initialized" => json!({
            "jsonrpc": "2.0",
            "result": {}
        }),
        _ => not_found(id, format!("Method '{method}' not found")),
    }
}
/// Builds the hooks section of the `initialize` request together with the
/// callback registry it refers to.
///
/// Every hook callback is registered under a fresh id of the form `hook_N`.
/// For each event with at least one matcher, the returned config holds an
/// array of matcher objects with `matcher` (string or `null`),
/// `hookCallbackIds`, and `timeout` when one is set. Events without matchers
/// are omitted.
pub(crate) fn build_hooks_config(
    hooks: &HashMap<String, Vec<HookMatcher>>,
) -> (Map<String, Value>, HashMap<String, HookCallback>) {
    let mut registry = HashMap::new();
    let mut config = Map::new();
    let mut issued: usize = 0;

    for (event, matchers) in hooks {
        if matchers.is_empty() {
            continue;
        }
        let mut entries = Vec::new();
        for matcher in matchers {
            let mut callback_ids = Vec::new();
            for callback in &matcher.hooks {
                let callback_id = format!("hook_{issued}");
                issued += 1;
                registry.insert(callback_id.clone(), callback.clone());
                callback_ids.push(callback_id);
            }

            let pattern = match &matcher.matcher {
                Some(text) => Value::String(text.clone()),
                None => Value::Null,
            };
            let mut entry = Map::new();
            entry.insert("matcher".to_string(), pattern);
            entry.insert("hookCallbackIds".to_string(), json!(callback_ids));
            if let Some(timeout) = matcher.timeout {
                entry.insert("timeout".to_string(), json!(timeout));
            }
            entries.push(Value::Object(entry));
        }
        config.insert(event.clone(), Value::Array(entries));
    }

    (config, registry)
}