use anyhow::Result;
use async_trait::async_trait;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
use super::jsonrpc::JsonRpcMessage;
#[async_trait]
pub trait Transport: Send + Sync {
async fn send(&mut self, message: &JsonRpcMessage) -> Result<()>;
async fn receive(&mut self) -> Result<Option<JsonRpcMessage>>;
async fn close(&mut self) -> Result<()>;
}
pub struct HttpTransport {
base_url: String,
client: reqwest::Client,
timeout: Duration,
}
impl HttpTransport {
pub fn new(base_url: String) -> Result<Self> {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?;
Ok(Self {
base_url,
client,
timeout: Duration::from_secs(30),
})
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
}
#[async_trait]
impl Transport for HttpTransport {
async fn send(&mut self, message: &JsonRpcMessage) -> Result<()> {
let json_str = message.to_string()?;
let response = self
.client
.post(&self.base_url)
.header("Content-Type", "application/json")
.body(json_str)
.timeout(self.timeout)
.send()
.await?;
if !response.status().is_success() {
return Err(anyhow::anyhow!(
"HTTP request failed with status: {}",
response.status()
));
}
Ok(())
}
async fn receive(&mut self) -> Result<Option<JsonRpcMessage>> {
Ok(None)
}
async fn close(&mut self) -> Result<()> {
Ok(())
}
}
pub struct UnixSocketTransport {
stream: Option<UnixStream>,
reader: Option<BufReader<tokio::net::unix::OwnedReadHalf>>,
writer: Option<tokio::net::unix::OwnedWriteHalf>,
socket_path: String,
}
impl UnixSocketTransport {
pub fn new(socket_path: String) -> Self {
Self {
stream: None,
reader: None,
writer: None,
socket_path,
}
}
pub async fn connect(&mut self) -> Result<()> {
let stream = UnixStream::connect(&self.socket_path).await?;
let (read_half, write_half) = stream.into_split();
self.reader = Some(BufReader::new(read_half));
self.writer = Some(write_half);
Ok(())
}
}
#[async_trait]
impl Transport for UnixSocketTransport {
async fn send(&mut self, message: &JsonRpcMessage) -> Result<()> {
let writer = self
.writer
.as_mut()
.ok_or_else(|| anyhow::anyhow!("Unix socket not connected"))?;
let json_str = message.to_string()?;
let message_with_delimiter = format!("{}\n", json_str);
writer.write_all(message_with_delimiter.as_bytes()).await?;
writer.flush().await?;
Ok(())
}
async fn receive(&mut self) -> Result<Option<JsonRpcMessage>> {
let reader = self
.reader
.as_mut()
.ok_or_else(|| anyhow::anyhow!("Unix socket not connected"))?;
let mut line = String::new();
let bytes_read = reader.read_line(&mut line).await?;
if bytes_read == 0 {
return Ok(None);
}
let message = line.trim().parse::<JsonRpcMessage>()?;
Ok(Some(message))
}
async fn close(&mut self) -> Result<()> {
if let Some(writer) = self.writer.take() {
drop(writer);
}
if let Some(reader) = self.reader.take() {
drop(reader);
}
self.stream = None;
Ok(())
}
}
pub struct InMemoryTransport {
send_queue: tokio::sync::mpsc::UnboundedSender<JsonRpcMessage>,
recv_queue: tokio::sync::mpsc::UnboundedReceiver<JsonRpcMessage>,
}
impl InMemoryTransport {
pub fn pair() -> (Self, Self) {
let (tx1, rx1) = tokio::sync::mpsc::unbounded_channel();
let (tx2, rx2) = tokio::sync::mpsc::unbounded_channel();
let transport1 = Self {
send_queue: tx2,
recv_queue: rx1,
};
let transport2 = Self {
send_queue: tx1,
recv_queue: rx2,
};
(transport1, transport2)
}
}
#[async_trait]
impl Transport for InMemoryTransport {
async fn send(&mut self, message: &JsonRpcMessage) -> Result<()> {
self.send_queue
.send(message.clone())
.map_err(|e| anyhow::anyhow!("Failed to send message: {}", e))?;
Ok(())
}
async fn receive(&mut self) -> Result<Option<JsonRpcMessage>> {
match self.recv_queue.recv().await {
Some(message) => Ok(Some(message)),
None => Ok(None),
}
}
async fn close(&mut self) -> Result<()> {
self.recv_queue.close();
Ok(())
}
}