Skip to main content

crabtalk_core/protocol/api/
server.rs

1//! Server trait — one async method per protocol operation.
2
3use crate::protocol::message::{
4    AgentEventMsg, ClientMessage, ConfigMsg, ErrorMsg, Pong, SendMsg, SendResponse, ServerMessage,
5    SessionInfo, SessionList, StreamEvent, StreamMsg, client_message, server_message,
6};
7use anyhow::Result;
8use futures_core::Stream;
9use futures_util::StreamExt;
10
11/// Construct an error `ServerMessage`.
12fn server_error(code: u32, message: String) -> ServerMessage {
13    ServerMessage {
14        msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
15    }
16}
17
18/// Construct a pong `ServerMessage`.
19fn server_pong() -> ServerMessage {
20    ServerMessage {
21        msg: Some(server_message::Msg::Pong(Pong {})),
22    }
23}
24
25/// Convert a typed `Result` into a `ServerMessage`.
26fn result_to_msg<T: Into<ServerMessage>>(result: Result<T>) -> ServerMessage {
27    match result {
28        Ok(resp) => resp.into(),
29        Err(e) => server_error(500, e.to_string()),
30    }
31}
32
33/// Server-side protocol handler.
34///
35/// Each method corresponds to one `ClientMessage` variant. Implementations
36/// receive typed request structs and return typed responses — no enum matching
37/// required. Streaming operations return `impl Stream`.
38///
39/// The provided [`dispatch`](Server::dispatch) method routes a raw
40/// `ClientMessage` to the appropriate handler, returning a stream of
41/// `ServerMessage`s.
42pub trait Server: Sync {
43    /// Handle `Send` — run agent and return complete response.
44    fn send(&self, req: SendMsg) -> impl std::future::Future<Output = Result<SendResponse>> + Send;
45
46    /// Handle `Stream` — run agent and stream response events.
47    fn stream(&self, req: StreamMsg) -> impl Stream<Item = Result<StreamEvent>> + Send;
48
49    /// Handle `Ping` — keepalive.
50    fn ping(&self) -> impl std::future::Future<Output = Result<()>> + Send;
51
52    /// Handle `Sessions` — list active sessions.
53    fn list_sessions(&self) -> impl std::future::Future<Output = Result<Vec<SessionInfo>>> + Send;
54
55    /// Handle `Kill` — close a session by ID.
56    fn kill_session(&self, session: u64) -> impl std::future::Future<Output = Result<bool>> + Send;
57
58    /// Handle `SubscribeEvents` — stream agent events.
59    fn subscribe_events(&self) -> impl Stream<Item = Result<AgentEventMsg>> + Send;
60
61    /// Handle `GetConfig` — return the full daemon config as JSON.
62    fn get_config(&self) -> impl std::future::Future<Output = Result<String>> + Send;
63
64    /// Handle `SetConfig` — replace the daemon config from JSON.
65    fn set_config(&self, config: String) -> impl std::future::Future<Output = Result<()>> + Send;
66
67    /// Handle `Reload` — hot-reload runtime from disk.
68    fn reload(&self) -> impl std::future::Future<Output = Result<()>> + Send;
69
70    /// Handle `ReplyToAsk` — deliver a user reply to a pending `ask_user` tool call.
71    fn reply_to_ask(
72        &self,
73        session: u64,
74        content: String,
75    ) -> impl std::future::Future<Output = Result<()>> + Send;
76
77    /// Dispatch a `ClientMessage` to the appropriate handler method.
78    ///
79    /// Returns a stream of `ServerMessage`s. Request-response operations
80    /// yield exactly one message; streaming operations yield many.
81    fn dispatch(&self, msg: ClientMessage) -> impl Stream<Item = ServerMessage> + Send + '_ {
82        async_stream::stream! {
83            let Some(inner) = msg.msg else {
84                yield server_error(400, "empty client message".to_string());
85                return;
86            };
87
88            match inner {
89                client_message::Msg::Send(send_msg) => {
90                    yield result_to_msg(self.send(send_msg).await);
91                }
92                client_message::Msg::Stream(stream_msg) => {
93                    let s = self.stream(stream_msg);
94                    tokio::pin!(s);
95                    while let Some(result) = s.next().await {
96                        yield result_to_msg(result);
97                    }
98                }
99                client_message::Msg::Ping(_) => {
100                    yield match self.ping().await {
101                        Ok(()) => server_pong(),
102                        Err(e) => server_error(500, e.to_string()),
103                    };
104                }
105                client_message::Msg::Sessions(_) => {
106                    yield match self.list_sessions().await {
107                        Ok(sessions) => ServerMessage {
108                            msg: Some(server_message::Msg::Sessions(SessionList { sessions })),
109                        },
110                        Err(e) => server_error(500, e.to_string()),
111                    };
112                }
113                client_message::Msg::Kill(kill_msg) => {
114                    yield match self.kill_session(kill_msg.session).await {
115                        Ok(true) => server_pong(),
116                        Ok(false) => server_error(
117                            404,
118                            format!("session {} not found", kill_msg.session),
119                        ),
120                        Err(e) => server_error(500, e.to_string()),
121                    };
122                }
123                client_message::Msg::GetConfig(_) => {
124                    yield match self.get_config().await {
125                        Ok(config) => ServerMessage {
126                            msg: Some(server_message::Msg::Config(ConfigMsg { config })),
127                        },
128                        Err(e) => server_error(500, e.to_string()),
129                    };
130                }
131                client_message::Msg::SetConfig(set_config_msg) => {
132                    yield match self.set_config(set_config_msg.config).await {
133                        Ok(()) => server_pong(),
134                        Err(e) => server_error(500, e.to_string()),
135                    };
136                }
137                client_message::Msg::SubscribeEvents(_) => {
138                    let s = self.subscribe_events();
139                    tokio::pin!(s);
140                    while let Some(result) = s.next().await {
141                        yield result_to_msg(result);
142                    }
143                }
144                client_message::Msg::Reload(_) => {
145                    yield match self.reload().await {
146                        Ok(()) => server_pong(),
147                        Err(e) => server_error(500, e.to_string()),
148                    };
149                }
150                client_message::Msg::ReplyToAsk(msg) => {
151                    yield match self.reply_to_ask(msg.session, msg.content).await {
152                        Ok(()) => server_pong(),
153                        Err(e) => server_error(404, e.to_string()),
154                    };
155                }
156            }
157        }
158    }
159}