walrus_core/protocol/api/
client.rs1use crate::protocol::message::{
4 DownloadEvent, DownloadRequest, HubEvent, HubRequest, SendRequest, SendResponse, StreamEvent,
5 StreamRequest, client::ClientMessage, server::ServerMessage,
6};
7use anyhow::Result;
8use futures_core::Stream;
9use futures_util::StreamExt;
10
11pub trait Client: Send {
18 fn request(
20 &mut self,
21 msg: ClientMessage,
22 ) -> impl std::future::Future<Output = Result<ServerMessage>> + Send;
23
24 fn request_stream(
31 &mut self,
32 msg: ClientMessage,
33 ) -> impl Stream<Item = Result<ServerMessage>> + Send + '_;
34
35 fn send(
37 &mut self,
38 req: SendRequest,
39 ) -> impl std::future::Future<Output = Result<SendResponse>> + Send {
40 async move { SendResponse::try_from(self.request(req.into()).await?) }
41 }
42
43 fn stream(
45 &mut self,
46 req: StreamRequest,
47 ) -> impl Stream<Item = Result<StreamEvent>> + Send + '_ {
48 self.request_stream(req.into())
49 .scan(false, |done, r| {
50 if *done {
51 return std::future::ready(None);
52 }
53 if matches!(&r, Ok(ServerMessage::Stream(StreamEvent::End { .. }))) {
54 *done = true;
55 }
56 std::future::ready(Some(r))
57 })
58 .map(|r| r.and_then(StreamEvent::try_from))
59 }
60
61 fn download(
63 &mut self,
64 req: DownloadRequest,
65 ) -> impl Stream<Item = Result<DownloadEvent>> + Send + '_ {
66 self.request_stream(req.into())
67 .scan(false, |done, r| {
68 if *done {
69 return std::future::ready(None);
70 }
71 if matches!(&r, Ok(ServerMessage::Download(DownloadEvent::End { .. }))) {
72 *done = true;
73 }
74 std::future::ready(Some(r))
75 })
76 .map(|r| r.and_then(DownloadEvent::try_from))
77 }
78
79 fn hub(&mut self, req: HubRequest) -> impl Stream<Item = Result<HubEvent>> + Send + '_ {
81 self.request_stream(req.into())
82 .scan(false, |done, r| {
83 if *done {
84 return std::future::ready(None);
85 }
86 if matches!(&r, Ok(ServerMessage::Hub(HubEvent::End { .. }))) {
87 *done = true;
88 }
89 std::future::ready(Some(r))
90 })
91 .map(|r| r.and_then(HubEvent::try_from))
92 }
93
94 fn ping(&mut self) -> impl std::future::Future<Output = Result<()>> + Send {
96 async move {
97 match self.request(ClientMessage::Ping).await? {
98 ServerMessage::Pong => Ok(()),
99 ServerMessage::Error { code, message } => {
100 anyhow::bail!("server error ({code}): {message}")
101 }
102 other => anyhow::bail!("unexpected response: {other:?}"),
103 }
104 }
105 }
106}