use std::sync::{Arc, atomic::Ordering};
use anyhow::Result;
use serde_json::Value;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::sync::mpsc;
use tokio::time::sleep;
use tracing::warn;
use super::handshake::parse_app_server_stderr_line;
use super::{AppServerInbound, RpcErrorPayload, RunningAppServer};
use crate::bridge_protocol::{json_string, now_millis};
pub(super) fn spawn_stdout_task(
running: Arc<RunningAppServer>,
runtime_id: String,
stdout: tokio::process::ChildStdout,
inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) {
tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
loop {
match reader.next_line().await {
Ok(Some(line)) => {
if line.trim().is_empty() {
continue;
}
if let Err(error) =
handle_stdout_line(&running, &runtime_id, &inbound_tx, &line).await
{
warn!("解析 app-server 输出失败: {error}");
}
}
Ok(None) => break,
Err(error) => {
warn!("读取 app-server stdout 失败: {error}");
break;
}
}
}
});
}
pub(super) fn spawn_stderr_task(
runtime_id: String,
stderr: tokio::process::ChildStderr,
inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) {
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
loop {
match reader.next_line().await {
Ok(Some(line)) => {
let trimmed = line.trim();
if !trimmed.is_empty() {
let (level, message) = parse_app_server_stderr_line(trimmed);
warn!("app-server stderr [{runtime_id}]: {trimmed}");
let _ = inbound_tx.send(AppServerInbound::LogChunk {
runtime_id: runtime_id.clone(),
stream: "stderr".to_string(),
level,
source: "app-server".to_string(),
message,
detail: None,
occurred_at_ms: now_millis(),
});
}
}
Ok(None) => break,
Err(error) => {
warn!("读取 app-server stderr 失败: {error}");
break;
}
}
}
});
}
pub(super) fn spawn_wait_task(
running: Arc<RunningAppServer>,
inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) {
tokio::spawn(async move {
let message = loop {
let maybe_exit = {
let mut guard = running.child.lock().await;
let Some(child) = guard.as_mut() else {
break format!("app-server 已退出: runtime {}", running.runtime_id);
};
match child.try_wait() {
Ok(Some(status)) => {
*guard = None;
Some(Ok(status))
}
Ok(None) => None,
Err(error) => {
*guard = None;
Some(Err(error))
}
}
};
match maybe_exit {
Some(Ok(status)) => break format!("app-server 已退出: {status}"),
Some(Err(error)) => break format!("等待 app-server 退出失败: {error}"),
None => sleep(tokio::time::Duration::from_millis(300)).await,
}
};
running.alive.store(false, Ordering::SeqCst);
fail_pending_requests(&running, &message).await;
let _ = inbound_tx.send(AppServerInbound::ProcessChanged {
runtime_id: running.runtime_id.clone(),
pid: None,
running: false,
});
let _ = inbound_tx.send(AppServerInbound::Exited {
runtime_id: running.runtime_id.clone(),
message,
expected: running.stopping.load(Ordering::SeqCst),
});
});
}
async fn handle_stdout_line(
running: &Arc<RunningAppServer>,
runtime_id: &str,
inbound_tx: &mpsc::UnboundedSender<AppServerInbound>,
line: &str,
) -> Result<()> {
let message: Value = serde_json::from_str(line)?;
let method = message.get("method").and_then(Value::as_str);
let id = message.get("id").cloned();
let result = message.get("result").cloned();
let error = message.get("error").cloned();
match (method, id, result, error) {
(Some(method), Some(id), None, None) => {
let params = message.get("params").cloned().unwrap_or(Value::Null);
let _ = inbound_tx.send(AppServerInbound::ServerRequest {
runtime_id: runtime_id.to_string(),
id,
method: method.to_string(),
params,
});
}
(Some(method), None, None, None) => {
let params = message.get("params").cloned().unwrap_or(Value::Null);
let _ = inbound_tx.send(AppServerInbound::Notification {
runtime_id: runtime_id.to_string(),
method: method.to_string(),
params,
});
}
(None, Some(id), Some(result), _) => {
let key = json_string(&id);
if let Some(sender) = running.pending.lock().await.remove(&key) {
let _ = sender.send(Ok(result));
}
}
(None, Some(id), _, Some(error_value)) => {
let key = json_string(&id);
if let Some(sender) = running.pending.lock().await.remove(&key) {
let payload = serde_json::from_value::<RpcErrorPayload>(error_value)?;
let _ = sender.send(Err(payload));
}
}
_ => {
warn!("收到未知 app-server 消息: {line}");
}
}
Ok(())
}
async fn fail_pending_requests(running: &RunningAppServer, message: &str) {
let mut pending = running.pending.lock().await;
for (_, sender) in pending.drain() {
let _ = sender.send(Err(RpcErrorPayload {
code: -32001,
message: message.to_string(),
data: None,
}));
}
}