walrus_core/protocol/api/
client.rs1use crate::protocol::message::{
4 ClientMessage, ConfigMsg, DownloadEvent, ErrorMsg, GetConfig, HubMsg, Ping, SendMsg,
5 SendResponse, ServerMessage, ServiceQueryMsg, ServiceQueryResultMsg, SetConfigMsg, StreamEvent,
6 StreamMsg, SubscribeDownloads, SubscribeTasks, client_message, download_event, server_message,
7 stream_event, task_event,
8};
9use anyhow::Result;
10use futures_core::Stream;
11use futures_util::StreamExt;
12
13pub trait Client: Send {
20 fn request(
22 &mut self,
23 msg: ClientMessage,
24 ) -> impl std::future::Future<Output = Result<ServerMessage>> + Send;
25
26 fn request_stream(
33 &mut self,
34 msg: ClientMessage,
35 ) -> impl Stream<Item = Result<ServerMessage>> + Send + '_;
36
37 fn send(
39 &mut self,
40 req: SendMsg,
41 ) -> impl std::future::Future<Output = Result<SendResponse>> + Send {
42 async move { SendResponse::try_from(self.request(req.into()).await?) }
43 }
44
45 fn stream(
47 &mut self,
48 req: StreamMsg,
49 ) -> impl Stream<Item = Result<stream_event::Event>> + Send + '_ {
50 self.request_stream(req.into())
51 .take_while(|r| {
52 std::future::ready(!matches!(
53 r,
54 Ok(ServerMessage {
55 msg: Some(server_message::Msg::Stream(StreamEvent {
56 event: Some(stream_event::Event::End(_))
57 }))
58 })
59 ))
60 })
61 .map(|r| r.and_then(stream_event::Event::try_from))
62 }
63
64 fn hub(
66 &mut self,
67 req: HubMsg,
68 ) -> impl Stream<Item = Result<download_event::Event>> + Send + '_ {
69 self.request_stream(ClientMessage {
70 msg: Some(client_message::Msg::Hub(req)),
71 })
72 .take_while(|r| {
73 std::future::ready(!matches!(
74 r,
75 Ok(ServerMessage {
76 msg: Some(server_message::Msg::Download(DownloadEvent {
77 event: Some(download_event::Event::Completed(_))
78 }))
79 })
80 ))
81 })
82 .map(|r| r.and_then(download_event::Event::try_from))
83 }
84
85 fn ping(&mut self) -> impl std::future::Future<Output = Result<()>> + Send {
87 async move {
88 match self
89 .request(ClientMessage {
90 msg: Some(client_message::Msg::Ping(Ping {})),
91 })
92 .await?
93 {
94 ServerMessage {
95 msg: Some(server_message::Msg::Pong(_)),
96 } => Ok(()),
97 ServerMessage {
98 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
99 } => {
100 anyhow::bail!("server error ({code}): {message}")
101 }
102 other => anyhow::bail!("unexpected response: {other:?}"),
103 }
104 }
105 }
106
107 fn subscribe_tasks(&mut self) -> impl Stream<Item = Result<task_event::Event>> + Send + '_ {
111 self.request_stream(ClientMessage {
112 msg: Some(client_message::Msg::SubscribeTasks(SubscribeTasks {})),
113 })
114 .map(|r| r.and_then(task_event::Event::try_from))
115 }
116
117 fn subscribe_downloads(
121 &mut self,
122 ) -> impl Stream<Item = Result<download_event::Event>> + Send + '_ {
123 self.request_stream(ClientMessage {
124 msg: Some(client_message::Msg::SubscribeDownloads(
125 SubscribeDownloads {},
126 )),
127 })
128 .map(|r| r.and_then(download_event::Event::try_from))
129 }
130
131 fn get_config(&mut self) -> impl std::future::Future<Output = Result<String>> + Send {
133 async move {
134 match self
135 .request(ClientMessage {
136 msg: Some(client_message::Msg::GetConfig(GetConfig {})),
137 })
138 .await?
139 {
140 ServerMessage {
141 msg: Some(server_message::Msg::Config(ConfigMsg { config })),
142 } => Ok(config),
143 ServerMessage {
144 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
145 } => {
146 anyhow::bail!("server error ({code}): {message}")
147 }
148 other => anyhow::bail!("unexpected response: {other:?}"),
149 }
150 }
151 }
152
153 fn set_config(
155 &mut self,
156 config: String,
157 ) -> impl std::future::Future<Output = Result<()>> + Send {
158 async move {
159 match self
160 .request(ClientMessage {
161 msg: Some(client_message::Msg::SetConfig(SetConfigMsg { config })),
162 })
163 .await?
164 {
165 ServerMessage {
166 msg: Some(server_message::Msg::Pong(_)),
167 } => Ok(()),
168 ServerMessage {
169 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
170 } => {
171 anyhow::bail!("server error ({code}): {message}")
172 }
173 other => anyhow::bail!("unexpected response: {other:?}"),
174 }
175 }
176 }
177
178 fn service_query(
180 &mut self,
181 service: String,
182 query: String,
183 ) -> impl std::future::Future<Output = Result<String>> + Send {
184 async move {
185 match self
186 .request(ClientMessage {
187 msg: Some(client_message::Msg::ServiceQuery(ServiceQueryMsg {
188 service,
189 query,
190 })),
191 })
192 .await?
193 {
194 ServerMessage {
195 msg:
196 Some(server_message::Msg::ServiceQueryResult(ServiceQueryResultMsg { result })),
197 } => Ok(result),
198 ServerMessage {
199 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
200 } => {
201 anyhow::bail!("server error ({code}): {message}")
202 }
203 other => anyhow::bail!("unexpected response: {other:?}"),
204 }
205 }
206 }
207}