use std::sync::Arc;
use anyhow::Result;
use serde_json::json;
use tokio::sync::mpsc;
use super::helpers::handshake_summary;
use super::notifications::handle_notification;
use super::server_requests::handle_server_request;
use crate::app_server::AppServerInbound;
use crate::bridge_protocol::{RuntimeStatusSnapshot, now_millis};
use crate::state::BridgeState;
/// Drains the app-server inbound channel, dispatching every message in turn.
///
/// Each received message is handed to [`handle_app_server_message`]. When that
/// call fails, the error's display text is published as an `"error"` event and
/// the loop carries on with the next message. The loop ends once `recv`
/// stops yielding messages.
pub(super) async fn run_app_server_event_loop(
    state: Arc<BridgeState>,
    mut inbound_rx: mpsc::UnboundedReceiver<AppServerInbound>,
) {
    while let Some(inbound) = inbound_rx.recv().await {
        let failure = match handle_app_server_message(&state, inbound).await {
            Ok(()) => continue,
            Err(failure) => failure,
        };
        let payload = json!({
            "message": failure.to_string(),
        });
        // Reporting is best-effort: the outcome of emitting the error event
        // is intentionally discarded so the loop keeps running.
        let _ = state.emit_event("error", None, None, payload);
    }
}
/// Applies a single inbound app-server message to the bridge state.
///
/// Per variant:
/// - `Starting` / `Initializing`: transition the runtime to `"starting"` with
///   the matching handshake summary.
/// - `ProcessChanged`: forwarded to `emit_runtime_process_changed`.
/// - `Initialized`: emits a `"running"` status snapshot built from the
///   handshake `info`, carrying over the pid from the current status.
/// - `HandshakeFailed`: transitions to `"error"`, then emits a degraded
///   notice with the failure message.
/// - `Exited`: emits `"stopped"` (expected exit) or `"error"` (unexpected
///   exit) with the pid cleared; an unexpected exit also emits a degraded
///   notice.
/// - `Notification` / `ServerRequest`: delegated to their handlers.
/// - `LogChunk`: re-emitted as an `app_server_log_chunk` event.
///
/// # Errors
///
/// Propagates the error returned by whichever `BridgeState` method or
/// delegated handler the variant invokes.
pub(super) async fn handle_app_server_message(
    state: &BridgeState,
    message: AppServerInbound,
) -> Result<()> {
    match message {
        AppServerInbound::Starting { runtime_id } => {
            let handshake = handshake_summary("starting", false, Vec::new(), None);
            state
                .transition_runtime_status(&runtime_id, String::from("starting"), None, handshake)
                .await
        }
        AppServerInbound::ProcessChanged {
            runtime_id,
            pid,
            running,
        } => {
            state
                .emit_runtime_process_changed(&runtime_id, pid, running)
                .await
        }
        AppServerInbound::Initializing {
            runtime_id,
            experimental_api_enabled,
            opt_out_notification_methods,
        } => {
            // The runtime status stays "starting"; only the handshake summary
            // moves on to "initializing".
            let handshake = handshake_summary(
                "initializing",
                experimental_api_enabled,
                opt_out_notification_methods,
                Some(String::from("已发送 initialize,等待握手完成")),
            );
            state
                .transition_runtime_status(&runtime_id, String::from("starting"), None, handshake)
                .await
        }
        AppServerInbound::Initialized {
            runtime_id,
            info,
            experimental_api_enabled,
            opt_out_notification_methods,
        } => {
            let runtime = state.require_runtime(Some(&runtime_id)).await?;
            // Clone the status so the read guard is released at the end of
            // this statement rather than being held across later awaits.
            let previous = runtime.status.read().await.clone();
            let handshake = handshake_summary(
                "ready",
                experimental_api_enabled,
                opt_out_notification_methods,
                Some(String::from("握手完成,initialized 已发送")),
            );
            let snapshot = RuntimeStatusSnapshot {
                runtime_id: runtime_id.clone(),
                status: String::from("running"),
                codex_home: Some(info.codex_home),
                user_agent: Some(info.user_agent),
                platform_family: Some(info.platform_family),
                platform_os: Some(info.platform_os),
                last_error: None,
                // The pid is the only field carried over from the old status.
                pid: previous.pid,
                app_server_handshake: handshake,
                updated_at_ms: now_millis(),
            };
            state.emit_runtime_status(&runtime_id, snapshot).await
        }
        AppServerInbound::HandshakeFailed {
            runtime_id,
            message,
            experimental_api_enabled,
            opt_out_notification_methods,
        } => {
            // The failure text is used both as the handshake detail and as
            // the runtime's last error.
            let handshake = handshake_summary(
                "failed",
                experimental_api_enabled,
                opt_out_notification_methods,
                Some(message.clone()),
            );
            state
                .transition_runtime_status(
                    &runtime_id,
                    String::from("error"),
                    Some(message.clone()),
                    handshake,
                )
                .await?;
            state.emit_runtime_degraded(&runtime_id, message).await
        }
        AppServerInbound::Exited {
            runtime_id,
            message,
            expected,
        } => {
            let runtime = state.require_runtime(Some(&runtime_id)).await?;
            let previous = runtime.status.read().await.clone();

            let handshake = if expected {
                crate::bridge_protocol::AppServerHandshakeSummary::inactive()
            } else {
                let prior = &previous.app_server_handshake;
                // If the handshake had already failed, keep its recorded
                // detail and fall back to the exit message only when none
                // was stored; otherwise the exit message is the detail.
                let detail = if prior.state == "failed" {
                    prior.detail.clone().or_else(|| Some(message.clone()))
                } else {
                    Some(message.clone())
                };
                handshake_summary(
                    "failed",
                    prior.experimental_api_enabled,
                    prior.opt_out_notification_methods.clone(),
                    detail,
                )
            };

            // An expected exit clears the error; an unexpected one keeps an
            // already-recorded error in preference to the exit message.
            let (status, last_error) = if expected {
                ("stopped", None)
            } else {
                (
                    "error",
                    previous.last_error.or_else(|| Some(message.clone())),
                )
            };

            let snapshot = RuntimeStatusSnapshot {
                runtime_id: runtime_id.clone(),
                status: status.to_string(),
                codex_home: previous.codex_home,
                user_agent: previous.user_agent,
                platform_family: previous.platform_family,
                platform_os: previous.platform_os,
                last_error,
                // The snapshot for an exited runtime carries no pid.
                pid: None,
                app_server_handshake: handshake,
                updated_at_ms: now_millis(),
            };
            state.emit_runtime_status(&runtime_id, snapshot).await?;

            if expected {
                return Ok(());
            }
            state.emit_runtime_degraded(&runtime_id, message).await
        }
        AppServerInbound::Notification {
            runtime_id,
            method,
            params,
        } => handle_notification(state, &runtime_id, &method, params).await,
        AppServerInbound::ServerRequest {
            runtime_id,
            id,
            method,
            params,
        } => handle_server_request(state, &runtime_id, id, &method, params).await,
        AppServerInbound::LogChunk {
            runtime_id,
            stream,
            level,
            source,
            message,
            detail,
            occurred_at_ms,
        } => {
            let payload = json!({
                "runtimeId": runtime_id,
                "stream": stream,
                "level": level,
                "source": source,
                "message": message,
                "detail": detail,
                "occurredAtMs": occurred_at_ms,
            });
            state.emit_event("app_server_log_chunk", Some(&runtime_id), None, payload)
        }
    }
}