walrus_core/protocol/api/
client.rs1use crate::protocol::message::{
4 DownloadEvent, DownloadRequest, HubEvent, HubRequest, SendRequest, SendResponse, StreamEvent,
5 StreamRequest, client::ClientMessage, server::ServerMessage,
6};
7use anyhow::Result;
8use futures_core::Stream;
9use futures_util::StreamExt;
10
11pub trait Client: Send {
18 fn request(
20 &mut self,
21 msg: ClientMessage,
22 ) -> impl std::future::Future<Output = Result<ServerMessage>> + Send;
23
24 fn request_stream(
31 &mut self,
32 msg: ClientMessage,
33 ) -> impl Stream<Item = Result<ServerMessage>> + Send + '_;
34
35 fn send(
37 &mut self,
38 req: SendRequest,
39 ) -> impl std::future::Future<Output = Result<SendResponse>> + Send {
40 async move { SendResponse::try_from(self.request(req.into()).await?) }
41 }
42
43 fn stream(
45 &mut self,
46 req: StreamRequest,
47 ) -> impl Stream<Item = Result<StreamEvent>> + Send + '_ {
48 self.request_stream(req.into())
49 .take_while(|r| {
50 std::future::ready(!matches!(
51 r,
52 Ok(ServerMessage::Stream(StreamEvent::End { .. }))
53 ))
54 })
55 .map(|r| r.and_then(StreamEvent::try_from))
56 }
57
58 fn download(
60 &mut self,
61 req: DownloadRequest,
62 ) -> impl Stream<Item = Result<DownloadEvent>> + Send + '_ {
63 self.request_stream(req.into())
64 .take_while(|r| {
65 std::future::ready(!matches!(
66 r,
67 Ok(ServerMessage::Download(DownloadEvent::End { .. }))
68 ))
69 })
70 .map(|r| r.and_then(DownloadEvent::try_from))
71 }
72
73 fn hub(&mut self, req: HubRequest) -> impl Stream<Item = Result<HubEvent>> + Send + '_ {
75 self.request_stream(req.into())
76 .take_while(|r| {
77 std::future::ready(!matches!(r, Ok(ServerMessage::Hub(HubEvent::End { .. }))))
78 })
79 .map(|r| r.and_then(HubEvent::try_from))
80 }
81
82 fn ping(&mut self) -> impl std::future::Future<Output = Result<()>> + Send {
84 async move {
85 match self.request(ClientMessage::Ping).await? {
86 ServerMessage::Pong => Ok(()),
87 ServerMessage::Error { code, message } => {
88 anyhow::bail!("server error ({code}): {message}")
89 }
90 other => anyhow::bail!("unexpected response: {other:?}"),
91 }
92 }
93 }
94}