use anyhow::{Context, Result};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use crate::acp::jsonrpc::JsonRpcMessage;
pub struct NdJsonCodec;
impl NdJsonCodec {
pub fn new() -> Self {
Self
}
}
impl Default for NdJsonCodec {
fn default() -> Self {
Self::new()
}
}
pub struct StdinReader {
reader: BufReader<tokio::io::Stdin>,
}
impl StdinReader {
pub fn new() -> Self {
Self {
reader: BufReader::new(tokio::io::stdin()),
}
}
pub async fn read_message(&mut self) -> Result<Option<JsonRpcMessage>> {
let mut line = String::new();
loop {
line.clear();
match self.reader.read_line(&mut line).await {
Ok(0) => return Ok(None),
Ok(_) => {
let line = line.trim();
if !line.is_empty() {
let msg =
serde_json::from_str(line).context("Failed to parse NDJSON message")?;
return Ok(Some(msg));
}
}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(e.into()),
}
}
}
}
impl Default for StdinReader {
fn default() -> Self {
Self::new()
}
}
pub struct StdioWriter {
writer: tokio::io::BufWriter<tokio::io::Stdout>,
}
impl StdioWriter {
pub fn new() -> Self {
Self {
writer: tokio::io::BufWriter::new(tokio::io::stdout()),
}
}
pub async fn write_message(&mut self, msg: &JsonRpcMessage) -> Result<()> {
let line = serde_json::to_string(msg)?;
let mut combined = line.as_bytes().to_vec();
combined.push(b'\n');
self.writer.write_all(&combined).await?;
self.writer.flush().await?;
Ok(())
}
}
impl Default for StdioWriter {
fn default() -> Self {
Self::new()
}
}
pub struct NdJsonStream {
pub reader: StdinReader,
pub writer: StdioWriter,
}
impl NdJsonStream {
pub fn new() -> Self {
Self {
reader: StdinReader::new(),
writer: StdioWriter::new(),
}
}
pub async fn next(&mut self) -> Result<Option<JsonRpcMessage>> {
self.reader.read_message().await
}
pub async fn send(&mut self, msg: &JsonRpcMessage) -> Result<()> {
self.writer.write_message(msg).await
}
}
impl Default for NdJsonStream {
fn default() -> Self {
Self::new()
}
}
pub struct ProcessReader {
reader: BufReader<tokio::process::ChildStdout>,
}
impl ProcessReader {
pub async fn from_child(child: &mut tokio::process::Child) -> Result<Self> {
let stdout = child.stdout.take().context("Child has no stdout")?;
Ok(Self {
reader: BufReader::new(stdout),
})
}
pub async fn read_message(&mut self) -> Result<Option<JsonRpcMessage>> {
let mut line = String::new();
let start = std::time::Instant::now();
let timeout = std::time::Duration::from_secs(30);
loop {
if start.elapsed() > timeout {
anyhow::bail!("Timeout reading from subprocess");
}
line.clear();
tokio::select! {
result = self.reader.read_line(&mut line) => {
match result {
Ok(0) => return Ok(None), Ok(_) => {
let line = line.trim();
if !line.is_empty() {
match serde_json::from_str(line) {
Ok(msg) => return Ok(Some(msg)),
Err(e) => return Err(e.into()),
}
}
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
Err(e) => return Err(e.into()),
}
}
_ = tokio::time::sleep(std::time::Duration::from_millis(10)) => {
}
}
}
}
}
pub struct ProcessWriter {
writer: tokio::io::BufWriter<tokio::process::ChildStdin>,
}
impl ProcessWriter {
pub async fn from_child(child: &mut tokio::process::Child) -> Result<Self> {
let stdin = child.stdin.take().context("Child has no stdin")?;
Ok(Self {
writer: tokio::io::BufWriter::new(stdin),
})
}
pub async fn write_message(&mut self, msg: &JsonRpcMessage) -> Result<()> {
let line = serde_json::to_string(msg)?;
let mut combined = line.as_bytes().to_vec();
combined.push(b'\n');
self.writer.write_all(&combined).await?;
self.writer.flush().await?;
Ok(())
}
}
pub struct SubprocessStream {
pub reader: ProcessReader,
pub writer: ProcessWriter,
}
impl SubprocessStream {
pub async fn new(child: &mut tokio::process::Child) -> Result<Self> {
Ok(Self {
reader: ProcessReader::from_child(child).await?,
writer: ProcessWriter::from_child(child).await?,
})
}
pub async fn next(&mut self) -> Result<Option<JsonRpcMessage>> {
self.reader.read_message().await
}
pub async fn send(&mut self, msg: &JsonRpcMessage) -> Result<()> {
self.writer.write_message(msg).await
}
}