Skip to main content

walrus_socket/
client.rs

1//! Unix domain socket client for connecting to a walrus daemon.
2
3use crate::codec;
4use anyhow::Result;
5use futures_core::Stream;
6use std::path::{Path, PathBuf};
7use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
8use wcore::protocol::{
9    api::Client,
10    message::{client::ClientMessage, server::ServerMessage},
11};
12
/// Client configuration for connecting to a walrus daemon.
///
/// Besides `Debug` and `Clone`, the type is `PartialEq`/`Eq` so two
/// configurations can be compared directly (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientConfig {
    /// Filesystem path of the daemon's Unix domain socket.
    pub socket_path: PathBuf,
}
19
20/// Unix domain socket client for the walrus daemon.
21///
22/// Holds configuration. Call [`WalrusClient::connect`] to establish a
23/// connection.
24pub struct WalrusClient {
25    config: ClientConfig,
26}
27
28impl WalrusClient {
29    /// Create a new client with the given configuration.
30    pub fn new(config: ClientConfig) -> Self {
31        Self { config }
32    }
33
34    /// Access the client configuration.
35    pub fn config(&self) -> &ClientConfig {
36        &self.config
37    }
38
39    /// Set the daemon socket path.
40    pub fn socket_path(mut self, path: impl Into<PathBuf>) -> Self {
41        self.config.socket_path = path.into();
42        self
43    }
44
45    /// Connect to the daemon and return a [`Connection`].
46    pub async fn connect(&self) -> Result<Connection> {
47        Connection::connect(&self.config.socket_path).await
48    }
49}
50
51/// An established Unix domain socket connection to a walrus daemon.
52///
53/// Not Clone — one connection per session. Use [`WalrusClient::connect`]
54/// to create a connection.
55pub struct Connection {
56    reader: OwnedReadHalf,
57    writer: OwnedWriteHalf,
58}
59
60impl Connection {
61    /// Connect to a daemon at the given socket path.
62    pub async fn connect(socket_path: &Path) -> Result<Self> {
63        let stream = tokio::net::UnixStream::connect(socket_path).await?;
64        tracing::debug!("connected to {}", socket_path.display());
65        let (reader, writer) = stream.into_split();
66        Ok(Self { reader, writer })
67    }
68}
69
70impl Client for Connection {
71    async fn request(&mut self, msg: ClientMessage) -> Result<ServerMessage> {
72        codec::write_message(&mut self.writer, &msg).await?;
73        Ok(codec::read_message(&mut self.reader).await?)
74    }
75
76    fn request_stream(
77        &mut self,
78        msg: ClientMessage,
79    ) -> impl Stream<Item = Result<ServerMessage>> + Send + '_ {
80        async_stream::try_stream! {
81            codec::write_message(&mut self.writer, &msg).await?;
82
83            loop {
84                let server_msg: ServerMessage = codec::read_message(&mut self.reader).await?;
85
86                match &server_msg {
87                    ServerMessage::Error { code, message } => {
88                        Err(anyhow::anyhow!("server error ({code}): {message}"))?;
89                    }
90                    _ => yield server_msg,
91                }
92            }
93        }
94    }
95}