Skip to main content

walrus_client/
connection.rs

1//! Unix domain socket connection to the walrus-gateway.
2
3use anyhow::Result;
4use futures_core::Stream;
5use protocol::codec;
6use protocol::{ClientMessage, ServerMessage};
7use std::path::Path;
8use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
9
10/// An established Unix domain socket connection to a walrus-gateway.
11///
12/// Not Clone — one connection per session. Use [`super::WalrusClient::connect`]
13/// to create a connection.
14pub struct Connection {
15    reader: OwnedReadHalf,
16    writer: OwnedWriteHalf,
17}
18
19impl Connection {
20    /// Connect to a gateway at the given socket path.
21    pub async fn connect(socket_path: &Path) -> Result<Self> {
22        let stream = tokio::net::UnixStream::connect(socket_path).await?;
23        tracing::debug!("connected to {}", socket_path.display());
24        let (reader, writer) = stream.into_split();
25        Ok(Self { reader, writer })
26    }
27
28    /// Send a message and wait for a single response.
29    pub async fn send(&mut self, msg: ClientMessage) -> Result<ServerMessage> {
30        codec::write_message(&mut self.writer, &msg)
31            .await
32            .map_err(|e| anyhow::anyhow!("{e}"))?;
33        self.read_message().await
34    }
35
36    /// Send a message and return a stream of server responses.
37    ///
38    /// The stream yields messages until `StreamEnd` is received or the
39    /// connection closes. The `StreamEnd` message itself is not yielded.
40    pub fn stream(&mut self, msg: ClientMessage) -> impl Stream<Item = Result<ServerMessage>> + '_ {
41        async_stream::try_stream! {
42            codec::write_message(&mut self.writer, &msg)
43                .await
44                .map_err(|e| anyhow::anyhow!("{e}"))?;
45
46            loop {
47                let server_msg = self.read_message().await?;
48                match &server_msg {
49                    ServerMessage::StreamEnd { .. } => break,
50                    ServerMessage::Error { .. } => {
51                        yield server_msg;
52                        break;
53                    }
54                    _ => yield server_msg,
55                }
56            }
57        }
58    }
59
60    /// Send a download request and return a stream of progress messages.
61    ///
62    /// The stream yields messages until `DownloadEnd` or `Error` is received.
63    pub fn download_stream(
64        &mut self,
65        msg: ClientMessage,
66    ) -> impl Stream<Item = Result<ServerMessage>> + '_ {
67        async_stream::try_stream! {
68            codec::write_message(&mut self.writer, &msg)
69                .await
70                .map_err(|e| anyhow::anyhow!("{e}"))?;
71
72            loop {
73                let server_msg = self.read_message().await?;
74                match &server_msg {
75                    ServerMessage::DownloadEnd { .. } => {
76                        yield server_msg;
77                        break;
78                    }
79                    ServerMessage::Error { .. } => {
80                        yield server_msg;
81                        break;
82                    }
83                    _ => yield server_msg,
84                }
85            }
86        }
87    }
88
89    /// Close the connection by dropping both halves.
90    pub fn close(self) {
91        drop(self);
92    }
93
94    /// Read and deserialize a single ServerMessage from the socket.
95    async fn read_message(&mut self) -> Result<ServerMessage> {
96        codec::read_message(&mut self.reader)
97            .await
98            .map_err(|e| anyhow::anyhow!("{e}"))
99    }
100}