use tokio::io::BufReader;
use tokio::net::UnixStream;
use crate::bridge::events::PromptResult;
use crate::client::permissions::PermissionMode;
use crate::error::{AcpCliError, Result};
use crate::output::OutputRenderer;
use super::ipc::{connect_ipc, recv_message, send_message};
use super::messages::{QueueRequest, QueueResponse};
/// Client end of the queue IPC connection.
///
/// Requests are written through `stream` and responses are read through
/// `reader`; `connect` builds both handles from a single connection by
/// duplicating it.
#[derive(Debug)]
pub struct QueueClient {
    /// Handle that outgoing `QueueRequest`s are written to.
    stream: UnixStream,
    /// Buffered handle that incoming `QueueResponse`s are read from.
    reader: BufReader<UnixStream>,
}
impl QueueClient {
pub async fn connect(session_key: &str) -> std::io::Result<Self> {
let stream = connect_ipc(session_key).await?;
let std_stream = stream.into_std()?;
let reader_std = std_stream.try_clone()?;
let write_stream = UnixStream::from_std(std_stream)?;
let read_stream = UnixStream::from_std(reader_std)?;
Ok(Self {
stream: write_stream,
reader: BufReader::new(read_stream),
})
}
pub async fn prompt(
&mut self,
messages: Vec<String>,
renderer: &mut dyn OutputRenderer,
_permission_mode: &PermissionMode,
) -> Result<PromptResult> {
let reply_id = generate_reply_id();
let request = QueueRequest::Prompt {
messages,
reply_id: reply_id.clone(),
};
send_message(&mut self.stream, &request)
.await
.map_err(|e| AcpCliError::Connection(format!("failed to send prompt: {e}")))?;
loop {
let response: Option<QueueResponse> = recv_message(&mut self.reader)
.await
.map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
match response {
Some(QueueResponse::Queued {
position,
reply_id: _,
}) => {
renderer.session_info(&format!("Queued at position {position}"));
}
Some(QueueResponse::Event { kind, data }) => match kind.as_str() {
"text_chunk" => renderer.text_chunk(&data),
"tool_use" => renderer.tool_status(&data),
_ => {
renderer.session_info(&format!("event({kind}): {data}"));
}
},
Some(QueueResponse::PromptResult {
content,
stop_reason,
reply_id: _,
}) => {
return Ok(PromptResult {
content,
stop_reason,
});
}
Some(QueueResponse::StatusResponse { state, queue_depth }) => {
renderer.session_info(&format!("status: {state}, depth: {queue_depth}"));
}
Some(QueueResponse::Error { message }) => {
return Err(AcpCliError::Agent(message));
}
Some(QueueResponse::Ok) => {
}
None => {
return Err(AcpCliError::Connection(
"queue owner disconnected".to_string(),
));
}
}
}
}
pub async fn enqueue_only(&mut self, messages: Vec<String>) -> Result<usize> {
let reply_id = generate_reply_id();
let request = QueueRequest::Prompt {
messages,
reply_id: reply_id.clone(),
};
send_message(&mut self.stream, &request)
.await
.map_err(|e| AcpCliError::Connection(format!("failed to send prompt: {e}")))?;
loop {
let response: Option<QueueResponse> = recv_message(&mut self.reader)
.await
.map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
match response {
Some(QueueResponse::Queued { position, .. }) => {
return Ok(position);
}
Some(QueueResponse::Error { message }) => {
return Err(AcpCliError::Agent(message));
}
None => {
return Err(AcpCliError::Connection(
"queue owner disconnected before acknowledging prompt".to_string(),
));
}
_ => continue,
}
}
}
pub async fn set_mode(&mut self, mode: &str) -> Result<()> {
let request = QueueRequest::SetMode {
mode: mode.to_string(),
};
send_message(&mut self.stream, &request)
.await
.map_err(|e| AcpCliError::Connection(format!("failed to send set-mode: {e}")))?;
let response: Option<QueueResponse> = recv_message(&mut self.reader)
.await
.map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
match response {
Some(QueueResponse::Ok) => {
println!("Mode set to: {mode}");
Ok(())
}
Some(QueueResponse::Error { message }) => Err(AcpCliError::Agent(message)),
_ => Err(AcpCliError::Connection(
"unexpected response to set-mode request".to_string(),
)),
}
}
pub async fn set_config(&mut self, key: &str, value: &str) -> Result<()> {
let request = QueueRequest::SetConfig {
key: key.to_string(),
value: value.to_string(),
};
send_message(&mut self.stream, &request)
.await
.map_err(|e| AcpCliError::Connection(format!("failed to send set-config: {e}")))?;
let response: Option<QueueResponse> = recv_message(&mut self.reader)
.await
.map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
match response {
Some(QueueResponse::Ok) => {
println!("Config set: {key} = {value}");
Ok(())
}
Some(QueueResponse::Error { message }) => Err(AcpCliError::Agent(message)),
_ => Err(AcpCliError::Connection(
"unexpected response to set-config request".to_string(),
)),
}
}
pub async fn cancel(&mut self) -> std::io::Result<()> {
send_message(&mut self.stream, &QueueRequest::Cancel).await
}
pub async fn status(&mut self) -> std::io::Result<String> {
send_message(&mut self.stream, &QueueRequest::Status).await?;
let response: Option<QueueResponse> = recv_message(&mut self.reader).await?;
match response {
Some(QueueResponse::StatusResponse { state, queue_depth }) => {
Ok(format!("state: {state}, queue_depth: {queue_depth}"))
}
Some(QueueResponse::Error { message }) => {
Err(std::io::Error::other(format!("status error: {message}")))
}
_ => Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"unexpected response to status request",
)),
}
}
}
/// Builds a reply identifier that is unique within this process.
///
/// The id has the form `<timestamp>-<pid>-<sequence>`, each part in lower-case
/// hexadecimal: nanoseconds since the Unix epoch (zero if the system clock is
/// before the epoch), the process id, and a per-process call counter. The
/// counter keeps two ids apart even when both calls observe the same clock
/// reading.
fn generate_reply_id() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static NEXT_SEQUENCE: AtomicUsize = AtomicUsize::new(0);

    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let pid = std::process::id();
    // Only the atomicity of the increment matters here, so relaxed ordering is enough.
    let sequence = NEXT_SEQUENCE.fetch_add(1, Ordering::Relaxed);
    format!("{timestamp:x}-{pid:x}-{sequence:x}")
}