use std::sync::Arc;
use anyhow::{Context, Result};
use serde_json::{Value, json};
use tokio::sync::mpsc;
use super::BridgeState;
use super::helpers::{normalize_name, optional_string, request_key, required_string};
use super::timeline::{
normalize_delta_payload, normalize_thread, timeline_entry_from_plan_update,
timeline_entry_from_thread_item,
};
use crate::app_server::AppServerInbound;
use crate::bridge_protocol::{
PendingServerRequestOption, PendingServerRequestQuestion, PendingServerRequestRecord,
RuntimeStatusSnapshot, ThreadStatusInfo, ThreadTokenUsage, now_millis,
};
/// Drains `inbound_rx`, passing every message to [`handle_app_server_message`].
///
/// A handler failure does not stop the loop: it is reported as an `"error"` event whose
/// payload carries the error text. The loop ends once the receiver yields `None`.
pub(super) async fn run_app_server_event_loop(
    state: Arc<BridgeState>,
    mut inbound_rx: mpsc::UnboundedReceiver<AppServerInbound>,
) {
    loop {
        let inbound = match inbound_rx.recv().await {
            Some(inbound) => inbound,
            None => break,
        };

        let outcome = handle_app_server_message(&state, inbound).await;
        if let Err(failure) = outcome {
            let payload = json!({
                "message": failure.to_string(),
            });
            // Best effort: a failure to report the error is itself ignored.
            let _ = state.emit_event("error", None, None, payload);
        }
    }
}
/// Dispatches one inbound app-server message.
///
/// * `Starting`, `Initialized` and `Exited` publish a new [`RuntimeStatusSnapshot`] through
///   `emit_runtime_status`, reusing fields from the runtime's current status where the
///   message does not supply them.
/// * `ProcessChanged` is forwarded to `emit_runtime_process_changed`.
/// * `Notification` and `ServerRequest` are delegated to [`handle_notification`] and
///   [`handle_server_request`].
/// * `LogChunk` is re-emitted as an `"app_server_log_chunk"` event.
///
/// # Errors
///
/// Propagates any error returned by the runtime lookup, the emit calls, or the delegated
/// handlers.
pub(super) async fn handle_app_server_message(
    state: &BridgeState,
    message: AppServerInbound,
) -> Result<()> {
    match message {
        AppServerInbound::Starting { runtime_id } => {
            let runtime = state.require_runtime(Some(&runtime_id)).await?;
            let previous = runtime.status.read().await.clone();
            // Everything except the status label, error and timestamp is carried over.
            let snapshot = RuntimeStatusSnapshot {
                runtime_id: runtime_id.clone(),
                status: "starting".to_string(),
                codex_home: previous.codex_home,
                user_agent: previous.user_agent,
                platform_family: previous.platform_family,
                platform_os: previous.platform_os,
                last_error: None,
                pid: previous.pid,
                updated_at_ms: now_millis(),
            };
            state.emit_runtime_status(&runtime_id, snapshot).await
        }
        AppServerInbound::ProcessChanged {
            runtime_id,
            pid,
            running,
        } => {
            state
                .emit_runtime_process_changed(&runtime_id, pid, running)
                .await
        }
        AppServerInbound::Initialized { runtime_id, info } => {
            let runtime = state.require_runtime(Some(&runtime_id)).await?;
            let previous = runtime.status.read().await.clone();
            // Environment details come from the initialization info; only the pid is reused.
            let snapshot = RuntimeStatusSnapshot {
                runtime_id: runtime_id.clone(),
                status: "running".to_string(),
                codex_home: Some(info.codex_home),
                user_agent: Some(info.user_agent),
                platform_family: Some(info.platform_family),
                platform_os: Some(info.platform_os),
                last_error: None,
                pid: previous.pid,
                updated_at_ms: now_millis(),
            };
            state.emit_runtime_status(&runtime_id, snapshot).await
        }
        AppServerInbound::Exited {
            runtime_id,
            message,
            expected,
        } => {
            let runtime = state.require_runtime(Some(&runtime_id)).await?;
            let previous = runtime.status.read().await.clone();
            // An expected exit is a clean stop; anything else is an error carrying the message.
            let (status_label, last_error) = if expected {
                ("stopped", None)
            } else {
                ("error", Some(message.clone()))
            };
            let snapshot = RuntimeStatusSnapshot {
                runtime_id: runtime_id.clone(),
                status: status_label.to_string(),
                codex_home: previous.codex_home,
                user_agent: previous.user_agent,
                platform_family: previous.platform_family,
                platform_os: previous.platform_os,
                last_error,
                pid: None,
                updated_at_ms: now_millis(),
            };
            state.emit_runtime_status(&runtime_id, snapshot).await?;
            if !expected {
                state.emit_runtime_degraded(&runtime_id, message).await?;
            }
            Ok(())
        }
        AppServerInbound::Notification {
            runtime_id,
            method,
            params,
        } => handle_notification(state, &runtime_id, &method, params).await,
        AppServerInbound::ServerRequest {
            runtime_id,
            id,
            method,
            params,
        } => handle_server_request(state, &runtime_id, id, &method, params).await,
        AppServerInbound::LogChunk {
            runtime_id,
            stream,
            level,
            source,
            message,
            detail,
            occurred_at_ms,
        } => {
            let payload = json!({
                "runtimeId": runtime_id,
                "stream": stream,
                "level": level,
                "source": source,
                "message": message,
                "detail": detail,
                "occurredAtMs": occurred_at_ms,
            });
            state.emit_event("app_server_log_chunk", Some(&runtime_id), None, payload)
        }
    }
}
async fn handle_notification(
state: &BridgeState,
runtime_id: &str,
method: &str,
params: Value,
) -> Result<()> {
match method {
"thread/started" => {
let thread_summary = params
.get("thread")
.and_then(|thread| normalize_thread_summary(state, runtime_id, thread, false));
if let Some(thread) = thread_summary.as_ref() {
state.storage.upsert_thread_index(thread)?;
}
state.emit_event(
method,
Some(runtime_id),
params
.get("thread")
.and_then(|thread| thread.get("id"))
.and_then(Value::as_str),
json!({
"runtimeId": runtime_id,
"thread": params.get("thread"),
"threadSummary": thread_summary,
}),
)?;
}
"thread/status/changed" => {
let thread_id = required_string(¶ms, "threadId")?;
let status_value = params.get("status").cloned().unwrap_or(Value::Null);
let status_info = thread_status_info(&status_value);
if let Some(mut thread) = state.storage.get_thread_index(thread_id)? {
thread.is_loaded = status_info.kind != "notLoaded";
thread.is_active = status_info.kind == "active";
thread.status = status_info.kind.clone();
thread.status_info = status_info.clone();
state.storage.upsert_thread_index(&thread)?;
}
state.emit_event(
method,
Some(runtime_id),
Some(thread_id),
json!({
"runtimeId": runtime_id,
"threadId": thread_id,
"status": status_value,
"statusInfo": status_info,
}),
)?;
}
"thread/name/updated" => {
let thread_id = required_string(¶ms, "threadId")?;
let thread_name = normalize_name(optional_string(¶ms, "threadName"));
if let Some(mut thread) = state.storage.get_thread_index(thread_id)? {
thread.name = thread_name.clone();
state.storage.upsert_thread_index(&thread)?;
}
state.emit_event(
method,
Some(runtime_id),
Some(thread_id),
json!({
"runtimeId": runtime_id,
"threadId": thread_id,
"threadName": thread_name,
}),
)?;
}
"thread/archived" => {
let thread_id = required_string(¶ms, "threadId")?;
state.storage.set_thread_archived(thread_id, true)?;
state.emit_event(
method,
Some(runtime_id),
Some(thread_id),
json!({
"runtimeId": runtime_id,
"threadId": thread_id,
}),
)?;
}
"thread/unarchived" => {
let thread_id = required_string(¶ms, "threadId")?;
state.storage.set_thread_archived(thread_id, false)?;
state.emit_event(
method,
Some(runtime_id),
Some(thread_id),
json!({
"runtimeId": runtime_id,
"threadId": thread_id,
}),
)?;
}
"thread/closed" => {
let thread_id = required_string(¶ms, "threadId")?;
if let Some(mut thread) = state.storage.get_thread_index(thread_id)? {
thread.is_loaded = false;
thread.is_active = false;
thread.status = "closed".to_string();
thread.status_info = ThreadStatusInfo {
kind: "closed".to_string(),
reason: optional_string(¶ms, "reason"),
raw: params
.get("status")
.cloned()
.unwrap_or_else(|| json!({ "type": "closed" })),
};
state.storage.upsert_thread_index(&thread)?;
}
state.emit_event(
method,
Some(runtime_id),
Some(thread_id),
json!({
"runtimeId": runtime_id,
"threadId": thread_id,
"reason": params.get("reason"),
}),
)?;
}
"thread/tokenUsage/updated" => {
let thread_id = required_string(¶ms, "threadId")?;
let token_usage = thread_token_usage(
params
.get("tokenUsage")
.or_else(|| params.get("usage"))
.unwrap_or(&Value::Null),
);
if let Some(mut thread) = state.storage.get_thread_index(thread_id)? {
thread.token_usage = Some(token_usage.clone());
state.storage.upsert_thread_index(&thread)?;
}
state.emit_event(
method,
Some(runtime_id),
Some(thread_id),
json!({
"runtimeId": runtime_id,
"threadId": thread_id,
"tokenUsage": token_usage,
}),
)?;
}
"turn/started" => {
let thread_id = required_string(¶ms, "threadId")?;
let turn_id = params
.get("turn")
.and_then(|turn| turn.get("id"))
.and_then(Value::as_str)
.unwrap_or_default();
state.emit_event(
method,
Some(runtime_id),
Some(thread_id),
json!({
"runtimeId": runtime_id,
"threadId": thread_id,
"turnId": turn_id,
"turn": params.get("turn"),
}),
)?;
}
"item/agentMessage/delta" => {
let thread_id = params
.get("threadId")
.and_then(Value::as_str)
.map(ToOwned::to_owned);
state.emit_event(
method,
Some(runtime_id),
thread_id.as_deref(),
normalize_delta_payload(
runtime_id,
params,
"agentMessage",
Some("Codex".to_string()),
None,
"item/agentMessage/delta",
json!({}),
None,
None,
),
)?;
}
"item/commandExecution/outputDelta" => {
let thread_id = params
.get("threadId")
.and_then(Value::as_str)
.map(ToOwned::to_owned);
state.emit_event(
method,
Some(runtime_id),
thread_id.as_deref(),
normalize_delta_payload(
runtime_id,
params,
"commandExecution",
Some("命令输出".to_string()),
Some("inProgress".to_string()),
"item/commandExecution/outputDelta",
json!({}),
None,
None,
),
)?;
}
"item/fileChange/outputDelta" => {
let thread_id = params
.get("threadId")
.and_then(Value::as_str)
.map(ToOwned::to_owned);
state.emit_event(
method,
Some(runtime_id),
thread_id.as_deref(),
normalize_delta_payload(
runtime_id,
params,
"fileChange",
Some("文件改动".to_string()),
Some("inProgress".to_string()),
"item/fileChange/outputDelta",
json!({}),
None,
None,
),
)?;
}
"command/exec/outputDelta" => {
state.emit_event(
method,
Some(runtime_id),
None,
json!({
"runtimeId": runtime_id,
"processId": params.get("processId"),
"stream": params.get("stream"),
"command": params.get("command"),
"cwd": params.get("cwd"),
"status": params.get("status"),
"exitCode": params.get("exitCode"),
"delta": params.get("delta"),
"raw": params,
}),
)?;
}
"turn/diff/updated" => {
state.emit_event(
method,
Some(runtime_id),
params.get("threadId").and_then(Value::as_str),
json!({
"runtimeId": runtime_id,
"threadId": params.get("threadId"),
"turnId": params.get("turnId"),
"diff": params.get("diff"),
"raw": params,
}),
)?;
}
"turn/plan/updated" => {
let thread_id = required_string(¶ms, "threadId")?;
let turn_id = required_string(¶ms, "turnId")?;
let entry = timeline_entry_from_plan_update(
runtime_id,
thread_id,
turn_id,
optional_string(¶ms, "explanation"),
params.get("plan").cloned().unwrap_or_else(|| json!([])),
);
state.emit_event(
method,
Some(runtime_id),
Some(thread_id),
serde_json::to_value(entry)?,
)?;
}
"item/started" => {
let thread_id = required_string(¶ms, "threadId")?;
let turn_id = required_string(¶ms, "turnId")?;
if let Some(item) = params.get("item").and_then(Value::as_object) {
if let Some(entry) = timeline_entry_from_thread_item(
runtime_id,
thread_id,
Some(turn_id),
&Value::Object(item.clone()),
"stream_event",
true,
false,
) {
state.emit_event(
method,
Some(runtime_id),
Some(thread_id),
serde_json::to_value(entry)?,
)?;
}
}
}
"item/completed" => {
let thread_id = required_string(¶ms, "threadId")?;
let turn_id = required_string(¶ms, "turnId")?;
if let Some(item) = params.get("item").and_then(Value::as_object) {
if let Some(entry) = timeline_entry_from_thread_item(
runtime_id,
thread_id,
Some(turn_id),
&Value::Object(item.clone()),
"stream_event",
false,
true,
) {
state.emit_event(
method,
Some(runtime_id),
Some(thread_id),
serde_json::to_value(entry)?,
)?;
}
}
}
"item/plan/delta" => {
let thread_id = params
.get("threadId")
.and_then(Value::as_str)
.map(ToOwned::to_owned);
state.emit_event(
method,
Some(runtime_id),
thread_id.as_deref(),
normalize_delta_payload(
runtime_id,
params,
"plan",
Some("执行计划".to_string()),
Some("inProgress".to_string()),
"item/plan/delta",
json!({}),
None,
None,
),
)?;
}
"item/mcpToolCall/progress" => {
let thread_id = params
.get("threadId")
.and_then(Value::as_str)
.map(ToOwned::to_owned);
state.emit_event(
method,
Some(runtime_id),
thread_id.as_deref(),
normalize_delta_payload(
runtime_id,
json!({
"threadId": params.get("threadId"),
"turnId": params.get("turnId"),
"itemId": params.get("itemId"),
"delta": params.get("message"),
}),
"mcpToolCall",
Some("MCP 工具".to_string()),
Some("inProgress".to_string()),
"item/mcpToolCall/progress",
json!({}),
None,
None,
),
)?;
}
"item/reasoning/textDelta" => {
let thread_id = params
.get("threadId")
.and_then(Value::as_str)
.map(ToOwned::to_owned);
let content_index = params.get("contentIndex").and_then(Value::as_i64);
state.emit_event(
method,
Some(runtime_id),
thread_id.as_deref(),
normalize_delta_payload(
runtime_id,
params,
"reasoning",
Some("思考过程".to_string()),
Some("inProgress".to_string()),
"item/reasoning/textDelta",
json!({
"summary": [],
"content": [],
}),
None,
content_index,
),
)?;
}
"item/reasoning/summaryPartAdded" => {
let thread_id = params
.get("threadId")
.and_then(Value::as_str)
.map(ToOwned::to_owned);
let summary_index = params.get("summaryIndex").and_then(Value::as_i64);
state.emit_event(
method,
Some(runtime_id),
thread_id.as_deref(),
normalize_delta_payload(
runtime_id,
json!({
"threadId": params.get("threadId"),
"turnId": params.get("turnId"),
"itemId": params.get("itemId"),
"delta": "",
}),
"reasoning",
Some("思考摘要".to_string()),
Some("inProgress".to_string()),
"item/reasoning/summaryPartAdded",
json!({
"summary": [],
"content": [],
}),
summary_index,
None,
),
)?;
}
"item/reasoning/summaryTextDelta" => {
let thread_id = params
.get("threadId")
.and_then(Value::as_str)
.map(ToOwned::to_owned);
let summary_index = params.get("summaryIndex").and_then(Value::as_i64);
state.emit_event(
method,
Some(runtime_id),
thread_id.as_deref(),
normalize_delta_payload(
runtime_id,
params,
"reasoning",
Some("思考摘要".to_string()),
Some("inProgress".to_string()),
"item/reasoning/summaryTextDelta",
json!({
"summary": [],
"content": [],
}),
summary_index,
None,
),
)?;
}
"serverRequest/resolved" => {
let request_id = params
.get("requestId")
.map(request_key)
.context("serverRequest/resolved 缺少 requestId")?;
state.storage.remove_pending_request(&request_id)?;
state.emit_event(
method,
Some(runtime_id),
params.get("threadId").and_then(Value::as_str),
json!({
"runtimeId": runtime_id,
"threadId": params.get("threadId"),
"requestId": request_id,
"raw": params,
}),
)?;
}
"turn/completed" => {
let thread_id = required_string(¶ms, "threadId")?;
let turn = params.get("turn").context("turn/completed 缺少 turn")?;
if let Some(turn_id) = turn.get("id").and_then(Value::as_str) {
state.cleanup_staged_turn_inputs(turn_id)?;
}
state.emit_event(
method,
Some(runtime_id),
Some(thread_id),
json!({
"runtimeId": runtime_id,
"threadId": thread_id,
"turnId": turn.get("id"),
"status": turn.get("status"),
"error": turn.get("error"),
"turn": turn,
}),
)?;
}
"error" => {
state.emit_event(
"error",
Some(runtime_id),
params.get("threadId").and_then(Value::as_str),
json!({
"runtimeId": runtime_id,
"message": params,
}),
)?;
}
_ => {}
}
Ok(())
}
/// Handles a request sent by the app server to the bridge.
///
/// The five supported request methods are converted into a pending-request record, stored,
/// and emitted as an event named after `method` with payload `{ "request": ... }`. Every
/// other method is answered on the runtime's app server with error code `-32000`.
///
/// # Errors
///
/// Propagates errors from record construction, storage, event emission, the runtime lookup,
/// and the error response.
async fn handle_server_request(
    state: &BridgeState,
    runtime_id: &str,
    id: Value,
    method: &str,
    params: Value,
) -> Result<()> {
    let is_supported = matches!(
        method,
        "item/commandExecution/requestApproval"
            | "item/fileChange/requestApproval"
            | "item/tool/requestUserInput"
            | "mcpServer/elicitation/request"
            | "item/tool/call"
    );

    if !is_supported {
        // Unsupported methods are rejected immediately.
        let runtime = state.require_runtime(Some(runtime_id)).await?;
        runtime
            .app_server
            .respond_error(id, -32000, "bridge 不支持的 server request")
            .await?;
        return Ok(());
    }

    let request = pending_server_request(runtime_id, id, method, params)?;
    state.storage.put_pending_request(&request)?;

    let thread_id = request.thread_id.as_deref();
    let payload = json!({ "request": request });
    state.emit_event(method, Some(runtime_id), thread_id, payload)?;
    Ok(())
}
/// Builds a thread summary from a raw thread value, or `None` if that is not possible.
///
/// Both the workspace lookup and `normalize_thread` are fallible; an error from either one
/// is discarded and reported as `None`.
fn normalize_thread_summary(
    state: &BridgeState,
    runtime_id: &str,
    thread_value: &Value,
    archived: bool,
) -> Option<crate::bridge_protocol::ThreadSummary> {
    let workspace = match state.find_workspace_for_thread(thread_value) {
        Ok(found) => found,
        Err(_) => return None,
    };
    let normalized = normalize_thread(runtime_id, thread_value, workspace.as_ref(), archived);
    normalized.ok()
}
/// Derives a [`ThreadStatusInfo`] from a raw status value.
///
/// The kind is taken from the string field `type` when present; otherwise from the value
/// itself when it is a plain string; otherwise it is `"unknown"`. The original value is kept
/// in `raw`.
fn thread_status_info(status_value: &Value) -> ThreadStatusInfo {
    let tagged_kind = status_value.get("type").and_then(Value::as_str);
    let kind = match tagged_kind {
        Some(tagged) => tagged,
        None => status_value.as_str().unwrap_or("unknown"),
    };

    ThreadStatusInfo {
        kind: kind.to_string(),
        reason: optional_string(status_value, "reason"),
        raw: status_value.clone(),
    }
}
/// Extracts token counters from a raw usage value.
///
/// Each counter is looked up under its camelCase key first and its snake_case key second;
/// counters that are missing or not integers are `None`. The raw value is retained and the
/// record is stamped with the current time.
fn thread_token_usage(value: &Value) -> ThreadTokenUsage {
    let counter = |camel: &str, snake: &str| first_i64(value, &[camel, snake]);

    let input_tokens = counter("inputTokens", "input_tokens");
    let cached_input_tokens = counter("cachedInputTokens", "cached_input_tokens");
    let output_tokens = counter("outputTokens", "output_tokens");
    let reasoning_tokens = counter("reasoningTokens", "reasoning_tokens");
    let total_tokens = counter("totalTokens", "total_tokens");

    ThreadTokenUsage {
        input_tokens,
        cached_input_tokens,
        output_tokens,
        reasoning_tokens,
        total_tokens,
        raw: value.clone(),
        updated_at_ms: now_millis(),
    }
}
/// Returns the first integer found in `value` under any of `keys`, trying them in order.
///
/// Keys that are absent, or whose value is not representable as `i64`, are skipped.
fn first_i64(value: &Value, keys: &[&str]) -> Option<i64> {
    for key in keys {
        if let Some(number) = value.get(*key).and_then(Value::as_i64) {
            return Some(number);
        }
    }
    None
}
fn pending_server_request(
runtime_id: &str,
rpc_request_id: Value,
request_type: &str,
params: Value,
) -> Result<PendingServerRequestRecord> {
let available_decisions = params
.get("availableDecisions")
.and_then(Value::as_array)
.map(|items| {
items
.iter()
.filter_map(Value::as_str)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
})
.unwrap_or_else(|| default_available_decisions(request_type));
Ok(PendingServerRequestRecord {
request_id: request_key(&rpc_request_id),
runtime_id: runtime_id.to_string(),
rpc_request_id,
request_type: request_type.to_string(),
thread_id: optional_string(¶ms, "threadId"),
turn_id: optional_string(¶ms, "turnId"),
item_id: optional_string(¶ms, "itemId"),
title: pending_request_title(request_type),
reason: optional_string(¶ms, "reason")
.or_else(|| optional_string(¶ms, "message"))
.or_else(|| optional_string(¶ms, "title")),
command: optional_string(¶ms, "command"),
cwd: optional_string(¶ms, "cwd"),
grant_root: optional_string(¶ms, "grantRoot"),
tool_name: optional_string(¶ms, "tool").or_else(|| optional_string(¶ms, "name")),
arguments: params.get("arguments").cloned(),
questions: pending_request_questions(¶ms),
proposed_execpolicy_amendment: params.get("proposedExecpolicyAmendment").cloned(),
network_approval_context: params.get("networkApprovalContext").cloned(),
schema: params
.get("schema")
.cloned()
.or_else(|| params.get("requestedSchema").cloned())
.or_else(|| params.get("inputSchema").cloned()),
available_decisions,
raw_payload: params,
created_at_ms: now_millis(),
})
}
/// Maps a server request type to its display title, or `None` for an unrecognized type.
fn pending_request_title(request_type: &str) -> Option<String> {
    let title = match request_type {
        "item/commandExecution/requestApproval" => "命令执行确认",
        "item/fileChange/requestApproval" => "文件改动确认",
        "item/tool/requestUserInput" => "需要补充输入",
        "item/tool/call" => "动态工具调用",
        "mcpServer/elicitation/request" => "MCP 交互请求",
        _ => return None,
    };
    Some(title.to_string())
}
/// Returns the decisions offered when a request does not list its own.
///
/// The two approval request types get `accept`, `acceptForSession`, `decline` and `cancel`,
/// in that order; every other request type gets an empty list.
fn default_available_decisions(request_type: &str) -> Vec<String> {
    let is_approval = matches!(
        request_type,
        "item/commandExecution/requestApproval" | "item/fileChange/requestApproval"
    );
    if !is_approval {
        return Vec::new();
    }

    ["accept", "acceptForSession", "decline", "cancel"]
        .iter()
        .map(|decision| (*decision).to_owned())
        .collect()
}
fn pending_request_questions(params: &Value) -> Vec<PendingServerRequestQuestion> {
params
.get("questions")
.and_then(Value::as_array)
.into_iter()
.flatten()
.map(|question| PendingServerRequestQuestion {
id: optional_string(question, "id").unwrap_or_default(),
header: optional_string(question, "header"),
question: optional_string(question, "question")
.or_else(|| optional_string(question, "prompt"))
.or_else(|| optional_string(question, "label")),
required: question
.get("required")
.and_then(Value::as_bool)
.unwrap_or(false),
options: question
.get("options")
.and_then(Value::as_array)
.into_iter()
.flatten()
.map(|option| PendingServerRequestOption {
label: optional_string(option, "label")
.or_else(|| optional_string(option, "value"))
.unwrap_or_default(),
description: optional_string(option, "description"),
value: option.get("value").cloned(),
is_other: option
.get("isOther")
.and_then(Value::as_bool)
.unwrap_or(false),
raw: option.clone(),
})
.collect(),
raw: question.clone(),
})
.collect()
}