Skip to main content

walrus_core/protocol/api/
client.rs

//! Client trait — transport primitives plus typed provided methods.
2
3use crate::protocol::message::{
4    ClientMessage, ConfigMsg, DownloadEvent, ErrorMsg, GetConfig, HubMsg, Ping, SendMsg,
5    SendResponse, ServerMessage, ServiceQueryMsg, ServiceQueryResultMsg, SetConfigMsg, StreamEvent,
6    StreamMsg, SubscribeDownloads, SubscribeTasks, client_message, download_event, server_message,
7    stream_event, task_event,
8};
9use anyhow::Result;
10use futures_core::Stream;
11use futures_util::StreamExt;
12
13/// Client-side protocol interface.
14///
15/// Implementors provide two transport primitives — [`request`](Client::request)
16/// for request-response and [`request_stream`](Client::request_stream) for
17/// streaming operations. All typed methods are provided defaults that delegate
18/// to these primitives.
19pub trait Client: Send {
20    /// Send a `ClientMessage` and receive a single `ServerMessage`.
21    fn request(
22        &mut self,
23        msg: ClientMessage,
24    ) -> impl std::future::Future<Output = Result<ServerMessage>> + Send;
25
26    /// Send a `ClientMessage` and receive a stream of `ServerMessage`s.
27    ///
28    /// This is a raw transport primitive — the stream reads indefinitely.
29    /// Callers must detect the terminal sentinel (e.g. `StreamEnd`,
30    /// `DownloadEnd`) and stop consuming. The typed streaming methods
31    /// handle this automatically.
32    fn request_stream(
33        &mut self,
34        msg: ClientMessage,
35    ) -> impl Stream<Item = Result<ServerMessage>> + Send + '_;
36
37    /// Send a message to an agent and receive a complete response.
38    fn send(
39        &mut self,
40        req: SendMsg,
41    ) -> impl std::future::Future<Output = Result<SendResponse>> + Send {
42        async move { SendResponse::try_from(self.request(req.into()).await?) }
43    }
44
45    /// Send a message to an agent and receive a streamed response.
46    fn stream(
47        &mut self,
48        req: StreamMsg,
49    ) -> impl Stream<Item = Result<stream_event::Event>> + Send + '_ {
50        self.request_stream(req.into())
51            .take_while(|r| {
52                std::future::ready(!matches!(
53                    r,
54                    Ok(ServerMessage {
55                        msg: Some(server_message::Msg::Stream(StreamEvent {
56                            event: Some(stream_event::Event::End(_))
57                        }))
58                    })
59                ))
60            })
61            .map(|r| r.and_then(stream_event::Event::try_from))
62    }
63
64    /// Install or uninstall a hub package, streaming download events.
65    fn hub(
66        &mut self,
67        req: HubMsg,
68    ) -> impl Stream<Item = Result<download_event::Event>> + Send + '_ {
69        self.request_stream(ClientMessage {
70            msg: Some(client_message::Msg::Hub(req)),
71        })
72        .take_while(|r| {
73            std::future::ready(!matches!(
74                r,
75                Ok(ServerMessage {
76                    msg: Some(server_message::Msg::Download(DownloadEvent {
77                        event: Some(download_event::Event::Completed(_))
78                    }))
79                })
80            ))
81        })
82        .map(|r| r.and_then(download_event::Event::try_from))
83    }
84
85    /// Ping the server (keepalive).
86    fn ping(&mut self) -> impl std::future::Future<Output = Result<()>> + Send {
87        async move {
88            match self
89                .request(ClientMessage {
90                    msg: Some(client_message::Msg::Ping(Ping {})),
91                })
92                .await?
93            {
94                ServerMessage {
95                    msg: Some(server_message::Msg::Pong(_)),
96                } => Ok(()),
97                ServerMessage {
98                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
99                } => {
100                    anyhow::bail!("server error ({code}): {message}")
101                }
102                other => anyhow::bail!("unexpected response: {other:?}"),
103            }
104        }
105    }
106
107    /// Subscribe to task lifecycle events.
108    ///
109    /// Streams `task_event::Event`s indefinitely until the connection closes.
110    fn subscribe_tasks(&mut self) -> impl Stream<Item = Result<task_event::Event>> + Send + '_ {
111        self.request_stream(ClientMessage {
112            msg: Some(client_message::Msg::SubscribeTasks(SubscribeTasks {})),
113        })
114        .map(|r| r.and_then(task_event::Event::try_from))
115    }
116
117    /// Subscribe to download lifecycle events.
118    ///
119    /// Streams `download_event::Event`s indefinitely until the connection closes.
120    fn subscribe_downloads(
121        &mut self,
122    ) -> impl Stream<Item = Result<download_event::Event>> + Send + '_ {
123        self.request_stream(ClientMessage {
124            msg: Some(client_message::Msg::SubscribeDownloads(
125                SubscribeDownloads {},
126            )),
127        })
128        .map(|r| r.and_then(download_event::Event::try_from))
129    }
130
131    /// Get the full daemon config as JSON.
132    fn get_config(&mut self) -> impl std::future::Future<Output = Result<String>> + Send {
133        async move {
134            match self
135                .request(ClientMessage {
136                    msg: Some(client_message::Msg::GetConfig(GetConfig {})),
137                })
138                .await?
139            {
140                ServerMessage {
141                    msg: Some(server_message::Msg::Config(ConfigMsg { config })),
142                } => Ok(config),
143                ServerMessage {
144                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
145                } => {
146                    anyhow::bail!("server error ({code}): {message}")
147                }
148                other => anyhow::bail!("unexpected response: {other:?}"),
149            }
150        }
151    }
152
153    /// Replace the full daemon config from JSON.
154    fn set_config(
155        &mut self,
156        config: String,
157    ) -> impl std::future::Future<Output = Result<()>> + Send {
158        async move {
159            match self
160                .request(ClientMessage {
161                    msg: Some(client_message::Msg::SetConfig(SetConfigMsg { config })),
162                })
163                .await?
164            {
165                ServerMessage {
166                    msg: Some(server_message::Msg::Pong(_)),
167                } => Ok(()),
168                ServerMessage {
169                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
170                } => {
171                    anyhow::bail!("server error ({code}): {message}")
172                }
173                other => anyhow::bail!("unexpected response: {other:?}"),
174            }
175        }
176    }
177
178    /// Query a named service.
179    fn service_query(
180        &mut self,
181        service: String,
182        query: String,
183    ) -> impl std::future::Future<Output = Result<String>> + Send {
184        async move {
185            match self
186                .request(ClientMessage {
187                    msg: Some(client_message::Msg::ServiceQuery(ServiceQueryMsg {
188                        service,
189                        query,
190                    })),
191                })
192                .await?
193            {
194                ServerMessage {
195                    msg:
196                        Some(server_message::Msg::ServiceQueryResult(ServiceQueryResultMsg { result })),
197                } => Ok(result),
198                ServerMessage {
199                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
200                } => {
201                    anyhow::bail!("server error ({code}): {message}")
202                }
203                other => anyhow::bail!("unexpected response: {other:?}"),
204            }
205        }
206    }
207}