crabtalk_core/protocol/api/
server.rs1use crate::protocol::message::{
4 AgentEventMsg, ClientMessage, CompactResponse, ConfigMsg, CreateCronMsg, CronInfo, CronList,
5 DaemonStats, ErrorMsg, Pong, SendMsg, SendResponse, ServerMessage, SessionInfo, SessionList,
6 StreamEvent, StreamMsg, client_message, server_message,
7};
8use anyhow::Result;
9use futures_core::Stream;
10use futures_util::StreamExt;
11
12fn server_error(code: u32, message: String) -> ServerMessage {
14 ServerMessage {
15 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
16 }
17}
18
19fn server_pong() -> ServerMessage {
21 ServerMessage {
22 msg: Some(server_message::Msg::Pong(Pong {})),
23 }
24}
25
26fn result_to_msg<T: Into<ServerMessage>>(result: Result<T>) -> ServerMessage {
28 match result {
29 Ok(resp) => resp.into(),
30 Err(e) => server_error(500, e.to_string()),
31 }
32}
33
34pub trait Server: Sync {
44 fn send(&self, req: SendMsg) -> impl std::future::Future<Output = Result<SendResponse>> + Send;
46
47 fn stream(&self, req: StreamMsg) -> impl Stream<Item = Result<StreamEvent>> + Send;
49
50 fn ping(&self) -> impl std::future::Future<Output = Result<()>> + Send;
52
53 fn list_sessions(&self) -> impl std::future::Future<Output = Result<Vec<SessionInfo>>> + Send;
55
56 fn kill_session(&self, session: u64) -> impl std::future::Future<Output = Result<bool>> + Send;
58
59 fn subscribe_events(&self) -> impl Stream<Item = Result<AgentEventMsg>> + Send;
61
62 fn get_config(&self) -> impl std::future::Future<Output = Result<String>> + Send;
64
65 fn set_config(&self, config: String) -> impl std::future::Future<Output = Result<()>> + Send;
67
68 fn reload(&self) -> impl std::future::Future<Output = Result<()>> + Send;
70
71 fn get_stats(&self) -> impl std::future::Future<Output = Result<DaemonStats>> + Send;
73
74 fn create_cron(
76 &self,
77 req: CreateCronMsg,
78 ) -> impl std::future::Future<Output = Result<CronInfo>> + Send;
79
80 fn delete_cron(&self, id: u64) -> impl std::future::Future<Output = Result<bool>> + Send;
82
83 fn list_crons(&self) -> impl std::future::Future<Output = Result<CronList>> + Send;
85
86 fn compact_session(
88 &self,
89 session: u64,
90 ) -> impl std::future::Future<Output = Result<String>> + Send;
91
92 fn reply_to_ask(
94 &self,
95 session: u64,
96 content: String,
97 ) -> impl std::future::Future<Output = Result<()>> + Send;
98
99 fn dispatch(&self, msg: ClientMessage) -> impl Stream<Item = ServerMessage> + Send + '_ {
104 async_stream::stream! {
105 let Some(inner) = msg.msg else {
106 yield server_error(400, "empty client message".to_string());
107 return;
108 };
109
110 match inner {
111 client_message::Msg::Send(send_msg) => {
112 yield result_to_msg(self.send(send_msg).await);
113 }
114 client_message::Msg::Stream(stream_msg) => {
115 let s = self.stream(stream_msg);
116 tokio::pin!(s);
117 while let Some(result) = s.next().await {
118 yield result_to_msg(result);
119 }
120 }
121 client_message::Msg::Ping(_) => {
122 yield match self.ping().await {
123 Ok(()) => server_pong(),
124 Err(e) => server_error(500, e.to_string()),
125 };
126 }
127 client_message::Msg::Sessions(_) => {
128 yield match self.list_sessions().await {
129 Ok(sessions) => ServerMessage {
130 msg: Some(server_message::Msg::Sessions(SessionList { sessions })),
131 },
132 Err(e) => server_error(500, e.to_string()),
133 };
134 }
135 client_message::Msg::Kill(kill_msg) => {
136 yield match self.kill_session(kill_msg.session).await {
137 Ok(true) => server_pong(),
138 Ok(false) => server_error(
139 404,
140 format!("session {} not found", kill_msg.session),
141 ),
142 Err(e) => server_error(500, e.to_string()),
143 };
144 }
145 client_message::Msg::GetConfig(_) => {
146 yield match self.get_config().await {
147 Ok(config) => ServerMessage {
148 msg: Some(server_message::Msg::Config(ConfigMsg { config })),
149 },
150 Err(e) => server_error(500, e.to_string()),
151 };
152 }
153 client_message::Msg::SetConfig(set_config_msg) => {
154 yield match self.set_config(set_config_msg.config).await {
155 Ok(()) => server_pong(),
156 Err(e) => server_error(500, e.to_string()),
157 };
158 }
159 client_message::Msg::SubscribeEvents(_) => {
160 let s = self.subscribe_events();
161 tokio::pin!(s);
162 while let Some(result) = s.next().await {
163 yield result_to_msg(result);
164 }
165 }
166 client_message::Msg::Reload(_) => {
167 yield match self.reload().await {
168 Ok(()) => server_pong(),
169 Err(e) => server_error(500, e.to_string()),
170 };
171 }
172 client_message::Msg::ReplyToAsk(msg) => {
173 yield match self.reply_to_ask(msg.session, msg.content).await {
174 Ok(()) => server_pong(),
175 Err(e) => server_error(404, e.to_string()),
176 };
177 }
178 client_message::Msg::GetStats(_) => {
179 yield match self.get_stats().await {
180 Ok(stats) => ServerMessage {
181 msg: Some(server_message::Msg::Stats(stats)),
182 },
183 Err(e) => server_error(500, e.to_string()),
184 };
185 }
186 client_message::Msg::CreateCron(req) => {
187 yield match self.create_cron(req).await {
188 Ok(info) => ServerMessage {
189 msg: Some(server_message::Msg::CronInfo(info)),
190 },
191 Err(e) => server_error(500, e.to_string()),
192 };
193 }
194 client_message::Msg::DeleteCron(req) => {
195 yield match self.delete_cron(req.id).await {
196 Ok(true) => server_pong(),
197 Ok(false) => server_error(404, format!("cron {} not found", req.id)),
198 Err(e) => server_error(500, e.to_string()),
199 };
200 }
201 client_message::Msg::ListCrons(_) => {
202 yield match self.list_crons().await {
203 Ok(list) => ServerMessage {
204 msg: Some(server_message::Msg::CronList(list)),
205 },
206 Err(e) => server_error(500, e.to_string()),
207 };
208 }
209 client_message::Msg::Compact(req) => {
210 yield match self.compact_session(req.session).await {
211 Ok(summary) => ServerMessage {
212 msg: Some(server_message::Msg::Compact(CompactResponse { summary })),
213 },
214 Err(e) => server_error(500, e.to_string()),
215 };
216 }
217 }
218 }
219 }
220}