Skip to main content

walrus_core/protocol/api/
client.rs

1//! Client trait — transport primitives plus typed provided methods.
2
3use crate::protocol::message::{
4    DownloadEvent, DownloadRequest, HubEvent, HubRequest, SendRequest, SendResponse, StreamEvent,
5    StreamRequest, client::ClientMessage, server::ServerMessage,
6};
7use anyhow::Result;
8use futures_core::Stream;
9use futures_util::StreamExt;
10
11/// Client-side protocol interface.
12///
13/// Implementors provide two transport primitives — [`request`](Client::request)
14/// for request-response and [`request_stream`](Client::request_stream) for
15/// streaming operations. All typed methods are provided defaults that delegate
16/// to these primitives.
17pub trait Client: Send {
18    /// Send a `ClientMessage` and receive a single `ServerMessage`.
19    fn request(
20        &mut self,
21        msg: ClientMessage,
22    ) -> impl std::future::Future<Output = Result<ServerMessage>> + Send;
23
24    /// Send a `ClientMessage` and receive a stream of `ServerMessage`s.
25    ///
26    /// This is a raw transport primitive — the stream reads indefinitely.
27    /// Callers must detect the terminal sentinel (e.g. `StreamEnd`,
28    /// `DownloadEnd`) and stop consuming. The typed streaming methods
29    /// handle this automatically.
30    fn request_stream(
31        &mut self,
32        msg: ClientMessage,
33    ) -> impl Stream<Item = Result<ServerMessage>> + Send + '_;
34
35    /// Send a message to an agent and receive a complete response.
36    fn send(
37        &mut self,
38        req: SendRequest,
39    ) -> impl std::future::Future<Output = Result<SendResponse>> + Send {
40        async move { SendResponse::try_from(self.request(req.into()).await?) }
41    }
42
43    /// Send a message to an agent and receive a streamed response.
44    fn stream(
45        &mut self,
46        req: StreamRequest,
47    ) -> impl Stream<Item = Result<StreamEvent>> + Send + '_ {
48        self.request_stream(req.into())
49            .scan(false, |done, r| {
50                if *done {
51                    return std::future::ready(None);
52                }
53                if matches!(&r, Ok(ServerMessage::Stream(StreamEvent::End { .. }))) {
54                    *done = true;
55                }
56                std::future::ready(Some(r))
57            })
58            .map(|r| r.and_then(StreamEvent::try_from))
59    }
60
61    /// Download a model's files with progress reporting.
62    fn download(
63        &mut self,
64        req: DownloadRequest,
65    ) -> impl Stream<Item = Result<DownloadEvent>> + Send + '_ {
66        self.request_stream(req.into())
67            .scan(false, |done, r| {
68                if *done {
69                    return std::future::ready(None);
70                }
71                if matches!(&r, Ok(ServerMessage::Download(DownloadEvent::End { .. }))) {
72                    *done = true;
73                }
74                std::future::ready(Some(r))
75            })
76            .map(|r| r.and_then(DownloadEvent::try_from))
77    }
78
79    /// Install or uninstall a hub package, streaming progress events.
80    fn hub(&mut self, req: HubRequest) -> impl Stream<Item = Result<HubEvent>> + Send + '_ {
81        self.request_stream(req.into())
82            .scan(false, |done, r| {
83                if *done {
84                    return std::future::ready(None);
85                }
86                if matches!(&r, Ok(ServerMessage::Hub(HubEvent::End { .. }))) {
87                    *done = true;
88                }
89                std::future::ready(Some(r))
90            })
91            .map(|r| r.and_then(HubEvent::try_from))
92    }
93
94    /// Ping the server (keepalive).
95    fn ping(&mut self) -> impl std::future::Future<Output = Result<()>> + Send {
96        async move {
97            match self.request(ClientMessage::Ping).await? {
98                ServerMessage::Pong => Ok(()),
99                ServerMessage::Error { code, message } => {
100                    anyhow::bail!("server error ({code}): {message}")
101                }
102                other => anyhow::bail!("unexpected response: {other:?}"),
103            }
104        }
105    }
106}