Skip to main content

walrus_protocol/api/
client.rs

1//! Client trait — transport primitives plus typed provided methods.
2
3use crate::message::{
4    AgentDetail, AgentInfoRequest, AgentList, ClearSessionRequest, DownloadEvent, DownloadRequest,
5    GetMemoryRequest, McpAddRequest, McpAdded, McpReloaded, McpRemoveRequest, McpRemoved,
6    McpServerList, MemoryEntry, MemoryList, SendRequest, SendResponse, SessionCleared,
7    SkillsReloaded, StreamEvent, StreamRequest, client::ClientMessage, server::ServerMessage,
8};
9use anyhow::Result;
10use futures_core::Stream;
11use futures_util::StreamExt;
12
13/// Client-side protocol interface.
14///
15/// Implementors provide two transport primitives — [`request`](Client::request)
16/// for request-response and [`request_stream`](Client::request_stream) for
17/// streaming operations. All typed methods are provided defaults that delegate
18/// to these primitives.
19pub trait Client: Send {
20    /// Send a `ClientMessage` and receive a single `ServerMessage`.
21    fn request(
22        &mut self,
23        msg: ClientMessage,
24    ) -> impl std::future::Future<Output = Result<ServerMessage>> + Send;
25
26    /// Send a `ClientMessage` and receive a stream of `ServerMessage`s.
27    ///
28    /// This is a raw transport primitive — the stream reads indefinitely.
29    /// Callers must detect the terminal sentinel (e.g. `StreamEnd`,
30    /// `DownloadEnd`) and stop consuming. The typed streaming methods
31    /// handle this automatically.
32    fn request_stream(
33        &mut self,
34        msg: ClientMessage,
35    ) -> impl Stream<Item = Result<ServerMessage>> + Send + '_;
36
37    /// Send a message to an agent and receive a complete response.
38    fn send(
39        &mut self,
40        req: SendRequest,
41    ) -> impl std::future::Future<Output = Result<SendResponse>> + Send {
42        async move { SendResponse::try_from(self.request(req.into()).await?) }
43    }
44
45    /// Send a message to an agent and receive a streamed response.
46    fn stream(
47        &mut self,
48        req: StreamRequest,
49    ) -> impl Stream<Item = Result<StreamEvent>> + Send + '_ {
50        self.request_stream(req.into())
51            .scan(false, |done, r| {
52                if *done {
53                    return std::future::ready(None);
54                }
55                if matches!(&r, Ok(ServerMessage::StreamEnd { .. })) {
56                    *done = true;
57                }
58                std::future::ready(Some(r))
59            })
60            .map(|r| r.and_then(StreamEvent::try_from))
61    }
62
63    /// Clear the session history for an agent.
64    fn clear_session(
65        &mut self,
66        req: ClearSessionRequest,
67    ) -> impl std::future::Future<Output = Result<SessionCleared>> + Send {
68        async move { SessionCleared::try_from(self.request(req.into()).await?) }
69    }
70
71    /// List all registered agents.
72    fn list_agents(&mut self) -> impl std::future::Future<Output = Result<AgentList>> + Send {
73        async move { AgentList::try_from(self.request(ClientMessage::ListAgents).await?) }
74    }
75
76    /// Get detailed info for a specific agent.
77    fn agent_info(
78        &mut self,
79        req: AgentInfoRequest,
80    ) -> impl std::future::Future<Output = Result<AgentDetail>> + Send {
81        async move { AgentDetail::try_from(self.request(req.into()).await?) }
82    }
83
84    /// List all memory entries.
85    fn list_memory(&mut self) -> impl std::future::Future<Output = Result<MemoryList>> + Send {
86        async move { MemoryList::try_from(self.request(ClientMessage::ListMemory).await?) }
87    }
88
89    /// Get a specific memory entry by key.
90    fn get_memory(
91        &mut self,
92        req: GetMemoryRequest,
93    ) -> impl std::future::Future<Output = Result<MemoryEntry>> + Send {
94        async move { MemoryEntry::try_from(self.request(req.into()).await?) }
95    }
96
97    /// Download a model's files with progress reporting.
98    fn download(
99        &mut self,
100        req: DownloadRequest,
101    ) -> impl Stream<Item = Result<DownloadEvent>> + Send + '_ {
102        self.request_stream(req.into())
103            .scan(false, |done, r| {
104                if *done {
105                    return std::future::ready(None);
106                }
107                if matches!(&r, Ok(ServerMessage::DownloadEnd { .. })) {
108                    *done = true;
109                }
110                std::future::ready(Some(r))
111            })
112            .map(|r| r.and_then(DownloadEvent::try_from))
113    }
114
115    /// Reload skills from disk.
116    fn reload_skills(
117        &mut self,
118    ) -> impl std::future::Future<Output = Result<SkillsReloaded>> + Send {
119        async move { SkillsReloaded::try_from(self.request(ClientMessage::ReloadSkills).await?) }
120    }
121
122    /// Add an MCP server.
123    fn mcp_add(
124        &mut self,
125        req: McpAddRequest,
126    ) -> impl std::future::Future<Output = Result<McpAdded>> + Send {
127        async move { McpAdded::try_from(self.request(req.into()).await?) }
128    }
129
130    /// Remove an MCP server.
131    fn mcp_remove(
132        &mut self,
133        req: McpRemoveRequest,
134    ) -> impl std::future::Future<Output = Result<McpRemoved>> + Send {
135        async move { McpRemoved::try_from(self.request(req.into()).await?) }
136    }
137
138    /// Reload MCP servers from config.
139    fn mcp_reload(&mut self) -> impl std::future::Future<Output = Result<McpReloaded>> + Send {
140        async move { McpReloaded::try_from(self.request(ClientMessage::McpReload).await?) }
141    }
142
143    /// List connected MCP servers.
144    fn mcp_list(&mut self) -> impl std::future::Future<Output = Result<McpServerList>> + Send {
145        async move { McpServerList::try_from(self.request(ClientMessage::McpList).await?) }
146    }
147
148    /// Ping the server (keepalive).
149    fn ping(&mut self) -> impl std::future::Future<Output = Result<()>> + Send {
150        async move {
151            match self.request(ClientMessage::Ping).await? {
152                ServerMessage::Pong => Ok(()),
153                ServerMessage::Error { code, message } => {
154                    anyhow::bail!("server error ({code}): {message}")
155                }
156                other => anyhow::bail!("unexpected response: {other:?}"),
157            }
158        }
159    }
160}