use anyhow::Result;
use futures_core::Stream;
use std::path::{Path, PathBuf};
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
use wcore::protocol::{
api::Client,
codec,
message::{ClientMessage, ErrorMsg, ServerMessage, server_message},
};
#[derive(Debug, Clone)]
pub struct ClientConfig {
pub socket_path: PathBuf,
}
pub struct WalrusClient {
config: ClientConfig,
}
impl WalrusClient {
pub fn new(config: ClientConfig) -> Self {
Self { config }
}
pub fn config(&self) -> &ClientConfig {
&self.config
}
pub fn socket_path(mut self, path: impl Into<PathBuf>) -> Self {
self.config.socket_path = path.into();
self
}
pub async fn connect(&self) -> Result<Connection> {
Connection::connect(&self.config.socket_path).await
}
}
pub struct Connection {
reader: OwnedReadHalf,
writer: OwnedWriteHalf,
}
impl Connection {
pub async fn connect(socket_path: &Path) -> Result<Self> {
let stream = tokio::net::UnixStream::connect(socket_path).await?;
tracing::debug!("connected to {}", socket_path.display());
let (reader, writer) = stream.into_split();
Ok(Self { reader, writer })
}
}
impl Client for Connection {
async fn request(&mut self, msg: ClientMessage) -> Result<ServerMessage> {
codec::write_message(&mut self.writer, &msg).await?;
Ok(codec::read_message(&mut self.reader).await?)
}
fn request_stream(
&mut self,
msg: ClientMessage,
) -> impl Stream<Item = Result<ServerMessage>> + Send + '_ {
async_stream::try_stream! {
codec::write_message(&mut self.writer, &msg).await?;
loop {
let server_msg: ServerMessage = codec::read_message(&mut self.reader).await?;
match &server_msg {
ServerMessage { msg: Some(server_message::Msg::Error(ErrorMsg { code, message })) } => {
Err(anyhow::anyhow!("server error ({code}): {message}"))?;
}
_ => yield server_msg,
}
}
}
}
}