1use crate::codec;
4use anyhow::Result;
5use futures_core::Stream;
6use std::path::{Path, PathBuf};
7use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
8use wcore::protocol::{
9 api::Client,
10 message::{client::ClientMessage, server::ServerMessage},
11};
12
/// Configuration consumed by `WalrusClient`.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Filesystem path of the Unix domain socket that connections are opened against.
    pub socket_path: PathBuf,
}
19
20pub struct WalrusClient {
25 config: ClientConfig,
26}
27
28impl WalrusClient {
29 pub fn new(config: ClientConfig) -> Self {
31 Self { config }
32 }
33
34 pub fn config(&self) -> &ClientConfig {
36 &self.config
37 }
38
39 pub fn socket_path(mut self, path: impl Into<PathBuf>) -> Self {
41 self.config.socket_path = path.into();
42 self
43 }
44
45 pub async fn connect(&self) -> Result<Connection> {
47 Connection::connect(&self.config.socket_path).await
48 }
49}
50
51pub struct Connection {
56 reader: OwnedReadHalf,
57 writer: OwnedWriteHalf,
58}
59
60impl Connection {
61 pub async fn connect(socket_path: &Path) -> Result<Self> {
63 let stream = tokio::net::UnixStream::connect(socket_path).await?;
64 tracing::debug!("connected to {}", socket_path.display());
65 let (reader, writer) = stream.into_split();
66 Ok(Self { reader, writer })
67 }
68}
69
70impl Client for Connection {
71 async fn request(&mut self, msg: ClientMessage) -> Result<ServerMessage> {
72 codec::write_message(&mut self.writer, &msg).await?;
73 Ok(codec::read_message(&mut self.reader).await?)
74 }
75
76 fn request_stream(
77 &mut self,
78 msg: ClientMessage,
79 ) -> impl Stream<Item = Result<ServerMessage>> + Send + '_ {
80 async_stream::try_stream! {
81 codec::write_message(&mut self.writer, &msg).await?;
82
83 loop {
84 let server_msg: ServerMessage = codec::read_message(&mut self.reader).await?;
85
86 match &server_msg {
87 ServerMessage::Error { code, message } => {
88 Err(anyhow::anyhow!("server error ({code}): {message}"))?;
89 }
90 _ => yield server_msg,
91 }
92 }
93 }
94 }
95}