use std::path::{Path, PathBuf};
use futures_util::{SinkExt, StreamExt};
use jsonrpsee::core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT};
use tokio::net::UnixStream;
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec};
use crate::rpc::{SOCKET_ENV_VAR, default_socket_path};
/// Upper bound (1 MiB) passed to `LinesCodec::new_with_max_length` for both
/// the outgoing and the incoming side of the socket.
const MAX_FRAME_BYTES: usize = 1024 * 1024;
/// Shorthand for the jsonrpsee core client type returned by `connect` and
/// `connect_at`.
type Client = jsonrpsee::core::client::Client;
/// Connects to the daemon using the socket path taken from the environment.
///
/// The path is read from the environment variable named by `SOCKET_ENV_VAR`.
/// When that variable is not set, the path returned by `default_socket_path`
/// is used instead. The connection itself is delegated to [`connect_at`].
///
/// # Errors
///
/// Returns an error if `default_socket_path` fails or if [`connect_at`]
/// cannot establish the connection.
pub async fn connect() -> anyhow::Result<Client> {
    // `var_os` instead of `var`: filesystem paths on Unix are arbitrary bytes,
    // and `var` reports a non-UTF-8 value as an error, which would silently
    // discard a valid override and fall back to the default path.
    let path = match std::env::var_os(SOCKET_ENV_VAR) {
        Some(p) => PathBuf::from(p),
        None => default_socket_path()?,
    };
    connect_at(&path).await
}
/// Opens a Unix stream socket at `path` and wraps it in a jsonrpsee client.
///
/// The stream is split into owned read and write halves. Each half is framed
/// with a `LinesCodec` limited to `MAX_FRAME_BYTES`, and the resulting
/// sender/receiver pair is handed to the jsonrpsee client builder.
///
/// # Errors
///
/// Returns an error naming `path` and the underlying cause when the socket
/// connection cannot be established.
pub async fn connect_at(path: &Path) -> anyhow::Result<Client> {
    let stream = match UnixStream::connect(path).await {
        Ok(stream) => stream,
        Err(e) => {
            return Err(anyhow::anyhow!(
                "could not connect to enwiro-daemon at {}: {}",
                path.display(),
                e
            ));
        }
    };

    let (read_half, write_half) = stream.into_split();

    // Both directions use the same line length limit.
    let outgoing = FramedWrite::new(write_half, LinesCodec::new_with_max_length(MAX_FRAME_BYTES));
    let incoming = FramedRead::new(read_half, LinesCodec::new_with_max_length(MAX_FRAME_BYTES));

    let sender = UdsSender { writer: outgoing };
    let receiver = UdsReceiver { reader: incoming };

    let builder = jsonrpsee::core::client::ClientBuilder::default()
        .max_buffer_capacity_per_subscription(64);
    Ok(builder.build_with_tokio(sender, receiver))
}
/// Sending half of the socket transport handed to the jsonrpsee client.
struct UdsSender {
// Line-framed writer over the owned write half of the Unix stream.
writer: FramedWrite<OwnedWriteHalf, LinesCodec>,
}
/// Lets jsonrpsee push outgoing messages through the line-framed writer.
impl TransportSenderT for UdsSender {
    type Error = TransportError;

    /// Writes `msg` to the socket as one frame.
    ///
    /// Any failure from the framed writer is converted to a
    /// [`TransportError`] carrying the error's display text.
    async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
        if let Err(e) = self.writer.send(msg).await {
            return Err(TransportError(e.to_string()));
        }
        Ok(())
    }
}
/// Receiving half of the socket transport handed to the jsonrpsee client.
struct UdsReceiver {
// Line-framed reader over the owned read half of the Unix stream.
reader: FramedRead<OwnedReadHalf, LinesCodec>,
}
/// Lets jsonrpsee pull incoming messages from the line-framed reader.
impl TransportReceiverT for UdsReceiver {
    type Error = TransportError;

    /// Waits for the next frame and returns it as a text message.
    ///
    /// # Errors
    ///
    /// Returns a [`TransportError`] when the framed reader yields an error,
    /// or when the stream ends (reported as "daemon closed the connection").
    async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
        // End of stream: no further frames will arrive.
        let Some(frame) = self.reader.next().await else {
            return Err(TransportError("daemon closed the connection".into()));
        };
        frame
            .map(ReceivedMessage::Text)
            .map_err(|e| TransportError(e.to_string()))
    }
}
/// Error type used by both the sending and the receiving side of the socket
/// transport.
///
/// Holds a message string; the `Display` output is that string prefixed with
/// "uds transport error: ".
#[derive(Debug, thiserror::Error)]
#[error("uds transport error: {0}")]
pub struct TransportError(String);