Skip to main content

crabtalk_core/protocol/api/
server.rs

1//! Server trait — one async method per protocol operation.
2
3use crate::protocol::message::{
4    AgentEventMsg, ClientMessage, CompactResponse, ConfigMsg, CreateCronMsg, CronInfo, CronList,
5    DaemonStats, ErrorMsg, Pong, SendMsg, SendResponse, ServerMessage, SessionInfo, SessionList,
6    StreamEvent, StreamMsg, client_message, server_message,
7};
8use anyhow::Result;
9use futures_core::Stream;
10use futures_util::StreamExt;
11
12/// Construct an error `ServerMessage`.
13fn server_error(code: u32, message: String) -> ServerMessage {
14    ServerMessage {
15        msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
16    }
17}
18
19/// Construct a pong `ServerMessage`.
20fn server_pong() -> ServerMessage {
21    ServerMessage {
22        msg: Some(server_message::Msg::Pong(Pong {})),
23    }
24}
25
26/// Convert a typed `Result` into a `ServerMessage`.
27fn result_to_msg<T: Into<ServerMessage>>(result: Result<T>) -> ServerMessage {
28    match result {
29        Ok(resp) => resp.into(),
30        Err(e) => server_error(500, e.to_string()),
31    }
32}
33
34/// Server-side protocol handler.
35///
36/// Each method corresponds to one `ClientMessage` variant. Implementations
37/// receive typed request structs and return typed responses — no enum matching
38/// required. Streaming operations return `impl Stream`.
39///
40/// The provided [`dispatch`](Server::dispatch) method routes a raw
41/// `ClientMessage` to the appropriate handler, returning a stream of
42/// `ServerMessage`s.
43pub trait Server: Sync {
44    /// Handle `Send` — run agent and return complete response.
45    fn send(&self, req: SendMsg) -> impl std::future::Future<Output = Result<SendResponse>> + Send;
46
47    /// Handle `Stream` — run agent and stream response events.
48    fn stream(&self, req: StreamMsg) -> impl Stream<Item = Result<StreamEvent>> + Send;
49
50    /// Handle `Ping` — keepalive.
51    fn ping(&self) -> impl std::future::Future<Output = Result<()>> + Send;
52
53    /// Handle `Sessions` — list active sessions.
54    fn list_sessions(&self) -> impl std::future::Future<Output = Result<Vec<SessionInfo>>> + Send;
55
56    /// Handle `Kill` — close a session by ID.
57    fn kill_session(&self, session: u64) -> impl std::future::Future<Output = Result<bool>> + Send;
58
59    /// Handle `SubscribeEvents` — stream agent events.
60    fn subscribe_events(&self) -> impl Stream<Item = Result<AgentEventMsg>> + Send;
61
62    /// Handle `GetConfig` — return the full daemon config as JSON.
63    fn get_config(&self) -> impl std::future::Future<Output = Result<String>> + Send;
64
65    /// Handle `SetConfig` — replace the daemon config from JSON.
66    fn set_config(&self, config: String) -> impl std::future::Future<Output = Result<()>> + Send;
67
68    /// Handle `Reload` — hot-reload runtime from disk.
69    fn reload(&self) -> impl std::future::Future<Output = Result<()>> + Send;
70
71    /// Handle `GetStats` — return daemon-level stats.
72    fn get_stats(&self) -> impl std::future::Future<Output = Result<DaemonStats>> + Send;
73
74    /// Handle `CreateCron` — create a new cron entry and start its timer.
75    fn create_cron(
76        &self,
77        req: CreateCronMsg,
78    ) -> impl std::future::Future<Output = Result<CronInfo>> + Send;
79
80    /// Handle `DeleteCron` — remove a cron entry and stop its timer.
81    fn delete_cron(&self, id: u64) -> impl std::future::Future<Output = Result<bool>> + Send;
82
83    /// Handle `ListCrons` — return all cron entries.
84    fn list_crons(&self) -> impl std::future::Future<Output = Result<CronList>> + Send;
85
86    /// Handle `Compact` — compact a session's history into a summary.
87    fn compact_session(
88        &self,
89        session: u64,
90    ) -> impl std::future::Future<Output = Result<String>> + Send;
91
92    /// Handle `ReplyToAsk` — deliver a user reply to a pending `ask_user` tool call.
93    fn reply_to_ask(
94        &self,
95        session: u64,
96        content: String,
97    ) -> impl std::future::Future<Output = Result<()>> + Send;
98
99    /// Dispatch a `ClientMessage` to the appropriate handler method.
100    ///
101    /// Returns a stream of `ServerMessage`s. Request-response operations
102    /// yield exactly one message; streaming operations yield many.
103    fn dispatch(&self, msg: ClientMessage) -> impl Stream<Item = ServerMessage> + Send + '_ {
104        async_stream::stream! {
105            let Some(inner) = msg.msg else {
106                yield server_error(400, "empty client message".to_string());
107                return;
108            };
109
110            match inner {
111                client_message::Msg::Send(send_msg) => {
112                    yield result_to_msg(self.send(send_msg).await);
113                }
114                client_message::Msg::Stream(stream_msg) => {
115                    let s = self.stream(stream_msg);
116                    tokio::pin!(s);
117                    while let Some(result) = s.next().await {
118                        yield result_to_msg(result);
119                    }
120                }
121                client_message::Msg::Ping(_) => {
122                    yield match self.ping().await {
123                        Ok(()) => server_pong(),
124                        Err(e) => server_error(500, e.to_string()),
125                    };
126                }
127                client_message::Msg::Sessions(_) => {
128                    yield match self.list_sessions().await {
129                        Ok(sessions) => ServerMessage {
130                            msg: Some(server_message::Msg::Sessions(SessionList { sessions })),
131                        },
132                        Err(e) => server_error(500, e.to_string()),
133                    };
134                }
135                client_message::Msg::Kill(kill_msg) => {
136                    yield match self.kill_session(kill_msg.session).await {
137                        Ok(true) => server_pong(),
138                        Ok(false) => server_error(
139                            404,
140                            format!("session {} not found", kill_msg.session),
141                        ),
142                        Err(e) => server_error(500, e.to_string()),
143                    };
144                }
145                client_message::Msg::GetConfig(_) => {
146                    yield match self.get_config().await {
147                        Ok(config) => ServerMessage {
148                            msg: Some(server_message::Msg::Config(ConfigMsg { config })),
149                        },
150                        Err(e) => server_error(500, e.to_string()),
151                    };
152                }
153                client_message::Msg::SetConfig(set_config_msg) => {
154                    yield match self.set_config(set_config_msg.config).await {
155                        Ok(()) => server_pong(),
156                        Err(e) => server_error(500, e.to_string()),
157                    };
158                }
159                client_message::Msg::SubscribeEvents(_) => {
160                    let s = self.subscribe_events();
161                    tokio::pin!(s);
162                    while let Some(result) = s.next().await {
163                        yield result_to_msg(result);
164                    }
165                }
166                client_message::Msg::Reload(_) => {
167                    yield match self.reload().await {
168                        Ok(()) => server_pong(),
169                        Err(e) => server_error(500, e.to_string()),
170                    };
171                }
172                client_message::Msg::ReplyToAsk(msg) => {
173                    yield match self.reply_to_ask(msg.session, msg.content).await {
174                        Ok(()) => server_pong(),
175                        Err(e) => server_error(404, e.to_string()),
176                    };
177                }
178                client_message::Msg::GetStats(_) => {
179                    yield match self.get_stats().await {
180                        Ok(stats) => ServerMessage {
181                            msg: Some(server_message::Msg::Stats(stats)),
182                        },
183                        Err(e) => server_error(500, e.to_string()),
184                    };
185                }
186                client_message::Msg::CreateCron(req) => {
187                    yield match self.create_cron(req).await {
188                        Ok(info) => ServerMessage {
189                            msg: Some(server_message::Msg::CronInfo(info)),
190                        },
191                        Err(e) => server_error(500, e.to_string()),
192                    };
193                }
194                client_message::Msg::DeleteCron(req) => {
195                    yield match self.delete_cron(req.id).await {
196                        Ok(true) => server_pong(),
197                        Ok(false) => server_error(404, format!("cron {} not found", req.id)),
198                        Err(e) => server_error(500, e.to_string()),
199                    };
200                }
201                client_message::Msg::ListCrons(_) => {
202                    yield match self.list_crons().await {
203                        Ok(list) => ServerMessage {
204                            msg: Some(server_message::Msg::CronList(list)),
205                        },
206                        Err(e) => server_error(500, e.to_string()),
207                    };
208                }
209                client_message::Msg::Compact(req) => {
210                    yield match self.compact_session(req.session).await {
211                        Ok(summary) => ServerMessage {
212                            msg: Some(server_message::Msg::Compact(CompactResponse { summary })),
213                        },
214                        Err(e) => server_error(500, e.to_string()),
215                    };
216                }
217            }
218        }
219    }
220}