walrus_client/
connection.rs1use anyhow::Result;
4use futures_core::Stream;
5use protocol::codec;
6use protocol::{ClientMessage, ServerMessage};
7use std::path::Path;
8use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
9
10pub struct Connection {
15 reader: OwnedReadHalf,
16 writer: OwnedWriteHalf,
17}
18
19impl Connection {
20 pub async fn connect(socket_path: &Path) -> Result<Self> {
22 let stream = tokio::net::UnixStream::connect(socket_path).await?;
23 tracing::debug!("connected to {}", socket_path.display());
24 let (reader, writer) = stream.into_split();
25 Ok(Self { reader, writer })
26 }
27
28 pub async fn send(&mut self, msg: ClientMessage) -> Result<ServerMessage> {
30 codec::write_message(&mut self.writer, &msg)
31 .await
32 .map_err(|e| anyhow::anyhow!("{e}"))?;
33 self.read_message().await
34 }
35
36 pub fn stream(&mut self, msg: ClientMessage) -> impl Stream<Item = Result<ServerMessage>> + '_ {
41 async_stream::try_stream! {
42 codec::write_message(&mut self.writer, &msg)
43 .await
44 .map_err(|e| anyhow::anyhow!("{e}"))?;
45
46 loop {
47 let server_msg = self.read_message().await?;
48 match &server_msg {
49 ServerMessage::StreamEnd { .. } => break,
50 ServerMessage::Error { .. } => {
51 yield server_msg;
52 break;
53 }
54 _ => yield server_msg,
55 }
56 }
57 }
58 }
59
60 pub fn download_stream(
64 &mut self,
65 msg: ClientMessage,
66 ) -> impl Stream<Item = Result<ServerMessage>> + '_ {
67 async_stream::try_stream! {
68 codec::write_message(&mut self.writer, &msg)
69 .await
70 .map_err(|e| anyhow::anyhow!("{e}"))?;
71
72 loop {
73 let server_msg = self.read_message().await?;
74 match &server_msg {
75 ServerMessage::DownloadEnd { .. } => {
76 yield server_msg;
77 break;
78 }
79 ServerMessage::Error { .. } => {
80 yield server_msg;
81 break;
82 }
83 _ => yield server_msg,
84 }
85 }
86 }
87 }
88
89 pub fn close(self) {
91 drop(self);
92 }
93
94 async fn read_message(&mut self) -> Result<ServerMessage> {
96 codec::read_message(&mut self.reader)
97 .await
98 .map_err(|e| anyhow::anyhow!("{e}"))
99 }
100}