Skip to main content

walrus_core/protocol/api/
server.rs

1//! Server trait — one async method per protocol operation.
2
3use crate::protocol::message::{
4    AgentDetail, AgentInfoRequest, AgentList, ClearSessionRequest, DownloadEvent, DownloadRequest,
5    GetMemoryRequest, McpAddRequest, McpAdded, McpReloaded, McpRemoveRequest, McpRemoved,
6    McpServerList, MemoryEntry, MemoryList, SendRequest, SendResponse, SessionCleared,
7    SkillsReloaded, StreamEvent, StreamRequest, client::ClientMessage, server::ServerMessage,
8};
9use anyhow::Result;
10use futures_core::Stream;
11use futures_util::StreamExt;
12
13/// Server-side protocol handler.
14///
15/// Each method corresponds to one `ClientMessage` variant. Implementations
16/// receive typed request structs and return typed responses — no enum matching
17/// required. Streaming operations return `impl Stream`.
18///
19/// The provided [`dispatch`](Server::dispatch) method routes a raw
20/// `ClientMessage` to the appropriate handler, returning a stream of
21/// `ServerMessage`s.
22pub trait Server: Sync {
23    /// Handle `Send` — run agent and return complete response.
24    fn send(
25        &self,
26        req: SendRequest,
27    ) -> impl std::future::Future<Output = Result<SendResponse>> + Send;
28
29    /// Handle `Stream` — run agent and stream response events.
30    fn stream(&self, req: StreamRequest) -> impl Stream<Item = Result<StreamEvent>> + Send;
31
32    /// Handle `ClearSession` — clear agent history.
33    fn clear_session(
34        &self,
35        req: ClearSessionRequest,
36    ) -> impl std::future::Future<Output = Result<SessionCleared>> + Send;
37
38    /// Handle `ListAgents` — list all registered agents.
39    fn list_agents(&self) -> impl std::future::Future<Output = Result<AgentList>> + Send;
40
41    /// Handle `AgentInfo` — get agent details.
42    fn agent_info(
43        &self,
44        req: AgentInfoRequest,
45    ) -> impl std::future::Future<Output = Result<AgentDetail>> + Send;
46
47    /// Handle `ListMemory` — list all memory entries.
48    fn list_memory(&self) -> impl std::future::Future<Output = Result<MemoryList>> + Send;
49
50    /// Handle `GetMemory` — get a memory entry by key.
51    fn get_memory(
52        &self,
53        req: GetMemoryRequest,
54    ) -> impl std::future::Future<Output = Result<MemoryEntry>> + Send;
55
56    /// Handle `Download` — download model files with progress.
57    fn download(&self, req: DownloadRequest) -> impl Stream<Item = Result<DownloadEvent>> + Send;
58
59    /// Handle `ReloadSkills` — reload skills from disk.
60    fn reload_skills(&self) -> impl std::future::Future<Output = Result<SkillsReloaded>> + Send;
61
62    /// Handle `McpAdd` — add an MCP server.
63    fn mcp_add(
64        &self,
65        req: McpAddRequest,
66    ) -> impl std::future::Future<Output = Result<McpAdded>> + Send;
67
68    /// Handle `McpRemove` — remove an MCP server.
69    fn mcp_remove(
70        &self,
71        req: McpRemoveRequest,
72    ) -> impl std::future::Future<Output = Result<McpRemoved>> + Send;
73
74    /// Handle `McpReload` — reload MCP servers from config.
75    fn mcp_reload(&self) -> impl std::future::Future<Output = Result<McpReloaded>> + Send;
76
77    /// Handle `McpList` — list connected MCP servers.
78    fn mcp_list(&self) -> impl std::future::Future<Output = Result<McpServerList>> + Send;
79
80    /// Handle `Ping` — keepalive.
81    fn ping(&self) -> impl std::future::Future<Output = Result<()>> + Send;
82
83    /// Dispatch a `ClientMessage` to the appropriate handler method.
84    ///
85    /// Returns a stream of `ServerMessage`s. Request-response operations
86    /// yield exactly one message; streaming operations yield many.
87    fn dispatch(&self, msg: ClientMessage) -> impl Stream<Item = ServerMessage> + Send + '_ {
88        async_stream::stream! {
89            match msg {
90                ClientMessage::Send { agent, content } => {
91                    yield result_to_msg(self.send(SendRequest { agent, content }).await);
92                }
93                ClientMessage::Stream { agent, content } => {
94                    let s = self.stream(StreamRequest { agent, content });
95                    tokio::pin!(s);
96                    while let Some(result) = s.next().await {
97                        yield result_to_msg(result);
98                    }
99                }
100                ClientMessage::ClearSession { agent } => {
101                    yield result_to_msg(
102                        self.clear_session(ClearSessionRequest { agent }).await,
103                    );
104                }
105                ClientMessage::ListAgents => {
106                    yield result_to_msg(self.list_agents().await);
107                }
108                ClientMessage::AgentInfo { agent } => {
109                    yield result_to_msg(self.agent_info(AgentInfoRequest { agent }).await);
110                }
111                ClientMessage::ListMemory => {
112                    yield result_to_msg(self.list_memory().await);
113                }
114                ClientMessage::GetMemory { key } => {
115                    yield result_to_msg(self.get_memory(GetMemoryRequest { key }).await);
116                }
117                ClientMessage::Download { model } => {
118                    let s = self.download(DownloadRequest { model });
119                    tokio::pin!(s);
120                    while let Some(result) = s.next().await {
121                        yield result_to_msg(result);
122                    }
123                }
124                ClientMessage::ReloadSkills => {
125                    yield result_to_msg(self.reload_skills().await);
126                }
127                ClientMessage::McpAdd { name, command, args, env } => {
128                    yield result_to_msg(
129                        self.mcp_add(McpAddRequest { name, command, args, env }).await,
130                    );
131                }
132                ClientMessage::McpRemove { name } => {
133                    yield result_to_msg(
134                        self.mcp_remove(McpRemoveRequest { name }).await,
135                    );
136                }
137                ClientMessage::McpReload => {
138                    yield result_to_msg(self.mcp_reload().await);
139                }
140                ClientMessage::McpList => {
141                    yield result_to_msg(self.mcp_list().await);
142                }
143                ClientMessage::Ping => {
144                    yield match self.ping().await {
145                        Ok(()) => ServerMessage::Pong,
146                        Err(e) => ServerMessage::Error {
147                            code: 500,
148                            message: e.to_string(),
149                        },
150                    };
151                }
152            }
153        }
154    }
155}
156
157/// Convert a typed `Result` into a `ServerMessage`.
158fn result_to_msg<T: Into<ServerMessage>>(result: Result<T>) -> ServerMessage {
159    match result {
160        Ok(resp) => resp.into(),
161        Err(e) => ServerMessage::Error {
162            code: 500,
163            message: e.to_string(),
164        },
165    }
166}