Skip to main content

walrus_socket/
client.rs

1//! Unix domain socket client for connecting to a walrus daemon.
2
3use crate::codec;
4use anyhow::Result;
5use futures_core::Stream;
6use protocol::{
7    api::Client,
8    message::{client::ClientMessage, server::ServerMessage},
9};
10use std::path::{Path, PathBuf};
11use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
12
/// Client configuration for connecting to a walrus daemon.
///
/// Besides `Debug` and `Clone`, the type derives `PartialEq` and `Eq` so two
/// configurations can be compared directly (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientConfig {
    /// Daemon Unix domain socket path.
    pub socket_path: PathBuf,
}
19
20impl Default for ClientConfig {
21    fn default() -> Self {
22        Self {
23            socket_path: default_socket_path(),
24        }
25    }
26}
27
28/// Default socket path: `~/.walrus/walrus.sock`.
29fn default_socket_path() -> PathBuf {
30    dirs::home_dir()
31        .expect("no home directory")
32        .join(".walrus")
33        .join("walrus.sock")
34}
35
36/// Unix domain socket client for the walrus daemon.
37///
38/// Holds configuration. Call [`WalrusClient::connect`] to establish a
39/// connection.
40pub struct WalrusClient {
41    config: ClientConfig,
42}
43
44impl WalrusClient {
45    /// Create a new client with the given configuration.
46    pub fn new(config: ClientConfig) -> Self {
47        Self { config }
48    }
49
50    /// Access the client configuration.
51    pub fn config(&self) -> &ClientConfig {
52        &self.config
53    }
54
55    /// Set the daemon socket path.
56    pub fn socket_path(mut self, path: impl Into<PathBuf>) -> Self {
57        self.config.socket_path = path.into();
58        self
59    }
60
61    /// Connect to the daemon and return a [`Connection`].
62    pub async fn connect(&self) -> Result<Connection> {
63        Connection::connect(&self.config.socket_path).await
64    }
65}
66
67/// An established Unix domain socket connection to a walrus daemon.
68///
69/// Not Clone — one connection per session. Use [`WalrusClient::connect`]
70/// to create a connection.
71pub struct Connection {
72    reader: OwnedReadHalf,
73    writer: OwnedWriteHalf,
74}
75
76impl Connection {
77    /// Connect to a daemon at the given socket path.
78    pub async fn connect(socket_path: &Path) -> Result<Self> {
79        let stream = tokio::net::UnixStream::connect(socket_path).await?;
80        tracing::debug!("connected to {}", socket_path.display());
81        let (reader, writer) = stream.into_split();
82        Ok(Self { reader, writer })
83    }
84}
85
86impl Client for Connection {
87    async fn request(&mut self, msg: ClientMessage) -> Result<ServerMessage> {
88        codec::write_message(&mut self.writer, &msg).await?;
89        Ok(codec::read_message(&mut self.reader).await?)
90    }
91
92    fn request_stream(
93        &mut self,
94        msg: ClientMessage,
95    ) -> impl Stream<Item = Result<ServerMessage>> + Send + '_ {
96        async_stream::try_stream! {
97            codec::write_message(&mut self.writer, &msg).await?;
98
99            loop {
100                let server_msg: ServerMessage = codec::read_message(&mut self.reader).await?;
101
102                match &server_msg {
103                    ServerMessage::Error { code, message } => {
104                        Err(anyhow::anyhow!("server error ({code}): {message}"))?;
105                    }
106                    _ => yield server_msg,
107                }
108            }
109        }
110    }
111}