crabtalk_core/protocol/api/
server.rs1use crate::protocol::message::{
4 AgentEventMsg, ClientMessage, ConfigMsg, ErrorMsg, Pong, SendMsg, SendResponse, ServerMessage,
5 SessionInfo, SessionList, StreamEvent, StreamMsg, client_message, server_message,
6};
7use anyhow::Result;
8use futures_core::Stream;
9use futures_util::StreamExt;
10
11fn server_error(code: u32, message: String) -> ServerMessage {
13 ServerMessage {
14 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
15 }
16}
17
18fn server_pong() -> ServerMessage {
20 ServerMessage {
21 msg: Some(server_message::Msg::Pong(Pong {})),
22 }
23}
24
25fn result_to_msg<T: Into<ServerMessage>>(result: Result<T>) -> ServerMessage {
27 match result {
28 Ok(resp) => resp.into(),
29 Err(e) => server_error(500, e.to_string()),
30 }
31}
32
33pub trait Server: Sync {
43 fn send(&self, req: SendMsg) -> impl std::future::Future<Output = Result<SendResponse>> + Send;
45
46 fn stream(&self, req: StreamMsg) -> impl Stream<Item = Result<StreamEvent>> + Send;
48
49 fn ping(&self) -> impl std::future::Future<Output = Result<()>> + Send;
51
52 fn list_sessions(&self) -> impl std::future::Future<Output = Result<Vec<SessionInfo>>> + Send;
54
55 fn kill_session(&self, session: u64) -> impl std::future::Future<Output = Result<bool>> + Send;
57
58 fn subscribe_events(&self) -> impl Stream<Item = Result<AgentEventMsg>> + Send;
60
61 fn get_config(&self) -> impl std::future::Future<Output = Result<String>> + Send;
63
64 fn set_config(&self, config: String) -> impl std::future::Future<Output = Result<()>> + Send;
66
67 fn reload(&self) -> impl std::future::Future<Output = Result<()>> + Send;
69
70 fn reply_to_ask(
72 &self,
73 session: u64,
74 content: String,
75 ) -> impl std::future::Future<Output = Result<()>> + Send;
76
77 fn dispatch(&self, msg: ClientMessage) -> impl Stream<Item = ServerMessage> + Send + '_ {
82 async_stream::stream! {
83 let Some(inner) = msg.msg else {
84 yield server_error(400, "empty client message".to_string());
85 return;
86 };
87
88 match inner {
89 client_message::Msg::Send(send_msg) => {
90 yield result_to_msg(self.send(send_msg).await);
91 }
92 client_message::Msg::Stream(stream_msg) => {
93 let s = self.stream(stream_msg);
94 tokio::pin!(s);
95 while let Some(result) = s.next().await {
96 yield result_to_msg(result);
97 }
98 }
99 client_message::Msg::Ping(_) => {
100 yield match self.ping().await {
101 Ok(()) => server_pong(),
102 Err(e) => server_error(500, e.to_string()),
103 };
104 }
105 client_message::Msg::Sessions(_) => {
106 yield match self.list_sessions().await {
107 Ok(sessions) => ServerMessage {
108 msg: Some(server_message::Msg::Sessions(SessionList { sessions })),
109 },
110 Err(e) => server_error(500, e.to_string()),
111 };
112 }
113 client_message::Msg::Kill(kill_msg) => {
114 yield match self.kill_session(kill_msg.session).await {
115 Ok(true) => server_pong(),
116 Ok(false) => server_error(
117 404,
118 format!("session {} not found", kill_msg.session),
119 ),
120 Err(e) => server_error(500, e.to_string()),
121 };
122 }
123 client_message::Msg::GetConfig(_) => {
124 yield match self.get_config().await {
125 Ok(config) => ServerMessage {
126 msg: Some(server_message::Msg::Config(ConfigMsg { config })),
127 },
128 Err(e) => server_error(500, e.to_string()),
129 };
130 }
131 client_message::Msg::SetConfig(set_config_msg) => {
132 yield match self.set_config(set_config_msg.config).await {
133 Ok(()) => server_pong(),
134 Err(e) => server_error(500, e.to_string()),
135 };
136 }
137 client_message::Msg::SubscribeEvents(_) => {
138 let s = self.subscribe_events();
139 tokio::pin!(s);
140 while let Some(result) = s.next().await {
141 yield result_to_msg(result);
142 }
143 }
144 client_message::Msg::Reload(_) => {
145 yield match self.reload().await {
146 Ok(()) => server_pong(),
147 Err(e) => server_error(500, e.to_string()),
148 };
149 }
150 client_message::Msg::ReplyToAsk(msg) => {
151 yield match self.reply_to_ask(msg.session, msg.content).await {
152 Ok(()) => server_pong(),
153 Err(e) => server_error(404, e.to_string()),
154 };
155 }
156 }
157 }
158 }
159}