use std::sync::{Arc, Mutex};
use std::time::Duration;
use serde_json::Value;
use tokio::sync::broadcast;
use crate::core::types::AgentEvent;
use crate::rpc::handler::HostHandler;
use crate::rpc::message::{
classify_line, IncomingMessage, RpcResponse,
};
use crate::rpc::pending::PendingRequests;
use super::protocol::{update_to_event, SessionUpdateParams};
use super::spawn::AcpProcess;
/// Blocking reader loop for an ACP child process.
///
/// Repeatedly polls `process` for an output line, classifies it with `classify_line`, and
/// dispatches it:
///
/// * **Request** – answered synchronously through `handler`; the serialized response is written
///   back to the process and an `AgentEvent::RpcIncomingRequest` is sent on `tx`.
/// * **Response** – resolves the matching entry in `pending` (an `error` takes precedence over
///   `result`; a missing `result` becomes `Value::Null`).
/// * **Notification** – a `session/update` whose params deserialize into `SessionUpdateParams`
///   is translated by `update_to_event` and each resulting event is sent on `tx`; when the
///   translation yields nothing, or for any other notification, an
///   `AgentEvent::RpcNotification` is sent instead.
/// * **Legacy** – ignored.
///
/// When no line is available and the process is no longer running, the loop emits a synthetic
/// `AgentEvent::SessionEnd` (only if none was seen earlier) followed by `AgentEvent::Exited`,
/// then stops. On every way out of the loop all outstanding `pending` requests are cancelled.
///
/// Errors from `tx.send` and from writing to the process are deliberately ignored.
pub(crate) fn acp_reader_loop(
    process: Arc<Mutex<AcpProcess>>,
    tx: broadcast::Sender<AgentEvent>,
    pending: PendingRequests,
    handler: Arc<dyn HostHandler>,
) {
    // Set once a translated `SessionEnd` has been forwarded, so the exit path does not emit a
    // second, synthetic one.
    let mut received_session_end = false;

    loop {
        // Hold the lock only for the poll itself.
        // NOTE(review): a poisoned lock ends the loop without an `Exited` event — confirm that
        // subscribers do not rely on always receiving one.
        let polled = match process.lock() {
            Ok(guard) => guard.try_recv(),
            Err(_) => break,
        };

        let line = match polled {
            Some(l) => l,
            None => {
                let still_running = process
                    .lock()
                    .ok()
                    .map(|mut g| g.is_running())
                    .unwrap_or(false);
                if still_running {
                    // Nothing to read yet; back off briefly before polling again.
                    std::thread::sleep(Duration::from_millis(10));
                    continue;
                }

                // The lock was released between the empty poll and the liveness check, so a
                // line may have arrived in that window. Poll once more before declaring the
                // process finished; otherwise that line would be dropped.
                let leftover = process.lock().ok().and_then(|g| g.try_recv());
                match leftover {
                    Some(l) => l,
                    None => {
                        let exit_code = collect_exit_code(&process);
                        if !received_session_end {
                            let _ = tx.send(AgentEvent::SessionEnd {
                                result: format!("exit_code={}", exit_code),
                                cost_usd: None,
                                is_error: exit_code != 0,
                            });
                        }
                        let _ = tx.send(AgentEvent::Exited { code: exit_code });
                        break;
                    }
                }
            }
        };

        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }

        match classify_line(trimmed) {
            IncomingMessage::Request { id, method, params } => {
                let response = match handler.handle(&method, params.clone()) {
                    Ok(val) => RpcResponse::success(id.clone(), val),
                    Err(err) => RpcResponse::error_response(id.clone(), err),
                };
                // NOTE(review): if serialization fails the request is never answered.
                if let Ok(json) = serde_json::to_string(&response) {
                    // `write_line_to_process` strips trailing newlines itself, so the JSON is
                    // passed as-is rather than being re-allocated with a terminator.
                    write_line_to_process(&process, &json);
                }
                let _ = tx.send(AgentEvent::RpcIncomingRequest { id, method, params });
            }
            IncomingMessage::Response { id, result, error } => {
                let rpc_result = match error {
                    Some(e) => Err(e),
                    None => Ok(result.unwrap_or(Value::Null)),
                };
                let _ = pending.resolve(id, rpc_result);
            }
            IncomingMessage::Notification { method, params } => {
                if method == "session/update" {
                    if let Some(ref p) = params {
                        if let Ok(sup) = serde_json::from_value::<SessionUpdateParams>(p.clone()) {
                            let events = update_to_event(&sup);
                            if events.is_empty() {
                                // Nothing translatable: forward the raw notification.
                                let _ = tx.send(AgentEvent::RpcNotification {
                                    method,
                                    params: p.clone(),
                                });
                            } else {
                                for ev in events {
                                    if matches!(ev, AgentEvent::SessionEnd { .. }) {
                                        received_session_end = true;
                                    }
                                    let _ = tx.send(ev);
                                }
                            }
                            continue;
                        }
                    }
                }
                // Any other notification, or a `session/update` whose params are missing or do
                // not deserialize, is forwarded untouched.
                let _ = tx.send(AgentEvent::RpcNotification {
                    method,
                    params: params.unwrap_or(Value::Null),
                });
            }
            IncomingMessage::Legacy(_) => {}
        }
    }

    pending.cancel_all("acp session closed");
}
/// Best-effort write of a single line to the child process.
///
/// Trailing `'\n'` characters are removed from `line` before it is handed to
/// `AcpProcess::write_line`. A poisoned lock and any error returned by the write are both
/// silently ignored.
fn write_line_to_process(process: &Arc<Mutex<AcpProcess>>, line: &str) {
    let payload = line.trim_end_matches('\n');
    if let Ok(mut child) = process.lock() {
        let _ = child.write_line(payload);
    }
}
/// Returns the exit code reported by `AcpProcess::exit_code`.
///
/// Falls back to `0` when the process mutex is poisoned.
// NOTE(review): callers treat `0` as success, so a poisoned lock is reported as a clean exit —
// confirm this is intended.
fn collect_exit_code(process: &Arc<Mutex<AcpProcess>>) -> i32 {
    match process.lock() {
        Ok(mut child) => child.exit_code(),
        Err(_) => 0,
    }
}