Skip to main content

crabtalk_core/protocol/api/
server.rs

1//! Server trait — one async method per protocol operation.
2
3use crate::protocol::message::{
4    ActiveConversationInfo, ActiveConversationList, AgentEventMsg, AgentInfo, AgentList,
5    ClientMessage, CompactResponse, ConversationHistory, ConversationInfo, ConversationList,
6    CreateAgentMsg, CreateCronMsg, CronInfo, CronList, DaemonStats, ErrorMsg, InstallPluginMsg,
7    McpInfo, McpList, ModelInfo, ModelList, PluginEvent, PluginInfo, PluginList, PluginSearchList,
8    Pong, ProviderInfo, ProviderList, ProviderPresetInfo, ProviderPresetList, PublishEventMsg,
9    SendMsg, SendResponse, ServerMessage, ServiceLogOutput, SkillInfo, SkillList, SteerSessionMsg,
10    StreamEvent, StreamMsg, SubscribeEventMsg, SubscriptionInfo, SubscriptionList, UpdateAgentMsg,
11    UpsertMcpMsg, client_message, server_message,
12};
13use anyhow::Result;
14use futures_core::Stream;
15use futures_util::StreamExt;
16
17/// Construct an error `ServerMessage`.
18fn server_error(code: u32, message: String) -> ServerMessage {
19    ServerMessage {
20        msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
21    }
22}
23
24/// Construct a pong `ServerMessage`.
25fn server_pong() -> ServerMessage {
26    ServerMessage {
27        msg: Some(server_message::Msg::Pong(Pong {})),
28    }
29}
30
31/// Convert a typed `Result` into a `ServerMessage`.
32fn result_to_msg<T: Into<ServerMessage>>(result: Result<T>) -> ServerMessage {
33    match result {
34        Ok(resp) => resp.into(),
35        Err(e) => server_error(500, e.to_string()),
36    }
37}
38
39/// Server-side protocol handler.
40///
41/// Each method corresponds to one `ClientMessage` variant. Implementations
42/// receive typed request structs and return typed responses — no enum matching
43/// required. Streaming operations return `impl Stream`.
44///
45/// The provided [`dispatch`](Server::dispatch) method routes a raw
46/// `ClientMessage` to the appropriate handler, returning a stream of
47/// `ServerMessage`s.
48pub trait Server: Sync {
49    /// Handle `Send` — run agent and return complete response.
50    fn send(&self, req: SendMsg) -> impl std::future::Future<Output = Result<SendResponse>> + Send;
51
52    /// Handle `Stream` — run agent and stream response events.
53    fn stream(&self, req: StreamMsg) -> impl Stream<Item = Result<StreamEvent>> + Send;
54
55    /// Handle `Ping` — keepalive.
56    fn ping(&self) -> impl std::future::Future<Output = Result<()>> + Send;
57
58    /// Handle `ListActiveConversations` — list active conversations.
59    fn list_conversations_active(
60        &self,
61    ) -> impl std::future::Future<Output = Result<Vec<ActiveConversationInfo>>> + Send;
62
63    /// Handle `Kill` — close a conversation by (agent, sender).
64    fn kill_conversation(
65        &self,
66        agent: String,
67        sender: String,
68    ) -> impl std::future::Future<Output = Result<bool>> + Send;
69
70    /// Handle `SubscribeEvents` — stream agent events.
71    fn subscribe_events(&self) -> impl Stream<Item = Result<AgentEventMsg>> + Send;
72
73    /// Handle `Reload` — hot-reload runtime from disk.
74    fn reload(&self) -> impl std::future::Future<Output = Result<()>> + Send;
75
76    /// Handle `GetStats` — return daemon-level stats.
77    fn get_stats(&self) -> impl std::future::Future<Output = Result<DaemonStats>> + Send;
78
79    /// Handle `CreateCron` — create a new cron entry and start its timer.
80    fn create_cron(
81        &self,
82        req: CreateCronMsg,
83    ) -> impl std::future::Future<Output = Result<CronInfo>> + Send;
84
85    /// Handle `DeleteCron` — remove a cron entry and stop its timer.
86    fn delete_cron(&self, id: u64) -> impl std::future::Future<Output = Result<bool>> + Send;
87
88    /// Handle `ListCrons` — return all cron entries.
89    fn list_crons(&self) -> impl std::future::Future<Output = Result<CronList>> + Send;
90
91    /// Handle `SubscribeEvent` — create an event bus subscription.
92    fn subscribe_event(
93        &self,
94        req: SubscribeEventMsg,
95    ) -> impl std::future::Future<Output = Result<SubscriptionInfo>> + Send;
96
97    /// Handle `UnsubscribeEvent` — remove an event bus subscription.
98    fn unsubscribe_event(&self, id: u64) -> impl std::future::Future<Output = Result<bool>> + Send;
99
100    /// Handle `ListSubscriptions` — return all event bus subscriptions.
101    fn list_subscriptions(
102        &self,
103    ) -> impl std::future::Future<Output = Result<SubscriptionList>> + Send;
104
105    /// Handle `PublishEvent` — publish an event to the bus.
106    fn publish_event(
107        &self,
108        req: PublishEventMsg,
109    ) -> impl std::future::Future<Output = Result<()>> + Send;
110
111    /// Handle `Compact` — compact a conversation's history into a summary.
112    fn compact_conversation(
113        &self,
114        agent: String,
115        sender: String,
116    ) -> impl std::future::Future<Output = Result<String>> + Send;
117
118    /// Handle `ReplyToAsk` — deliver a user reply to a pending `ask_user` tool call.
119    fn reply_to_ask(
120        &self,
121        agent: String,
122        sender: String,
123        content: String,
124    ) -> impl std::future::Future<Output = Result<()>> + Send;
125
126    /// Handle `SteerSession` — inject a user message into an active stream.
127    fn steer_session(
128        &self,
129        req: SteerSessionMsg,
130    ) -> impl std::future::Future<Output = Result<()>> + Send;
131
132    /// Handle `ListAgents` — return all registered agents.
133    fn list_agents(&self) -> impl std::future::Future<Output = Result<Vec<AgentInfo>>> + Send;
134
135    /// Handle `GetAgent` — return a single agent by name.
136    fn get_agent(
137        &self,
138        name: String,
139    ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send;
140
141    /// Handle `CreateAgent` — create a new agent from JSON config.
142    fn create_agent(
143        &self,
144        req: CreateAgentMsg,
145    ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send;
146
147    /// Handle `UpdateAgent` — update an existing agent from JSON config.
148    fn update_agent(
149        &self,
150        req: UpdateAgentMsg,
151    ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send;
152
153    /// Handle `DeleteAgent` — remove an agent by name.
154    fn delete_agent(&self, name: String) -> impl std::future::Future<Output = Result<bool>> + Send;
155
156    /// Handle `RenameAgent` — rename an agent in place.
157    fn rename_agent(
158        &self,
159        old_name: String,
160        new_name: String,
161    ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send;
162
163    /// Handle `ListProviders` — return all registered LLM providers.
164    fn list_providers(&self)
165    -> impl std::future::Future<Output = Result<Vec<ProviderInfo>>> + Send;
166
167    /// Handle `InstallPlugin` — install a plugin, stream progress.
168    fn install_plugin(
169        &self,
170        req: InstallPluginMsg,
171    ) -> impl Stream<Item = Result<PluginEvent>> + Send;
172
173    /// Handle `UninstallPlugin` — uninstall a plugin, stream progress.
174    fn uninstall_plugin(&self, plugin: String) -> impl Stream<Item = Result<PluginEvent>> + Send;
175
176    /// Handle `ListPlugins` — return all installed plugins.
177    fn list_plugins(&self) -> impl std::future::Future<Output = Result<Vec<PluginInfo>>> + Send;
178
179    /// Handle `SearchPlugins` — search registry for available plugins.
180    fn search_plugins(
181        &self,
182        query: String,
183    ) -> impl std::future::Future<Output = Result<Vec<PluginInfo>>> + Send;
184
185    /// Handle `ListSkills` — return all available skills with enabled state.
186    fn list_skills(&self) -> impl std::future::Future<Output = Result<Vec<SkillInfo>>> + Send;
187
188    /// Handle `ListModels` — return all resolved models with provider and active state.
189    fn list_models(&self) -> impl std::future::Future<Output = Result<Vec<ModelInfo>>> + Send;
190
191    /// Handle `ListConversations` — return historical conversations from disk.
192    fn list_conversations(
193        &self,
194        agent: String,
195        sender: String,
196    ) -> impl std::future::Future<Output = Result<Vec<ConversationInfo>>> + Send;
197
198    /// Handle `GetConversationHistory` — load messages from a session file.
199    fn get_conversation_history(
200        &self,
201        file_path: String,
202    ) -> impl std::future::Future<Output = Result<ConversationHistory>> + Send;
203
204    /// Handle `DeleteConversation` — delete a conversation file from disk.
205    fn delete_conversation(
206        &self,
207        file_path: String,
208    ) -> impl std::future::Future<Output = Result<()>> + Send;
209
210    /// Handle `ListMcps` — return all MCP server configs with source info.
211    fn list_mcps(&self) -> impl std::future::Future<Output = Result<Vec<McpInfo>>> + Send;
212
213    /// Handle `UpsertMcp` — create or replace an MCP server in Storage.
214    fn upsert_mcp(
215        &self,
216        req: UpsertMcpMsg,
217    ) -> impl std::future::Future<Output = Result<McpInfo>> + Send;
218
219    /// Handle `DeleteMcp` — remove an MCP server from Storage.
220    fn delete_mcp(&self, name: String) -> impl std::future::Future<Output = Result<bool>> + Send;
221
222    /// Handle `SetProvider` — create or update a provider in config.toml.
223    fn set_provider(
224        &self,
225        name: String,
226        config: String,
227    ) -> impl std::future::Future<Output = Result<ProviderInfo>> + Send;
228
229    /// Handle `DeleteProvider` — remove a provider from config.toml.
230    fn delete_provider(&self, name: String)
231    -> impl std::future::Future<Output = Result<()>> + Send;
232
233    /// Handle `SetActiveModel` — update the active model in config.toml.
234    fn set_active_model(
235        &self,
236        model: String,
237    ) -> impl std::future::Future<Output = Result<()>> + Send;
238
239    /// Handle `ListProviderPresets` — return provider preset templates.
240    fn list_provider_presets(
241        &self,
242    ) -> impl std::future::Future<Output = Result<Vec<ProviderPresetInfo>>> + Send;
243
244    /// Handle `StartService` — install and start a command service.
245    fn start_service(
246        &self,
247        name: String,
248        force: bool,
249    ) -> impl std::future::Future<Output = Result<()>> + Send;
250
251    /// Handle `StopService` — stop and uninstall a command service.
252    fn stop_service(&self, name: String) -> impl std::future::Future<Output = Result<()>> + Send;
253
254    /// Handle `ServiceLogs` — return recent log lines for a service.
255    fn service_logs(
256        &self,
257        name: String,
258        lines: u32,
259    ) -> impl std::future::Future<Output = Result<String>> + Send;
260
261    /// Handle `Extension` — opaque bytes for downstream product protocols.
262    ///
263    /// Default: returns "not supported". Downstream products override this
264    /// to handle their own message formats (local proto, JSON, bincode, etc.).
265    fn dispatch_extension(
266        &self,
267        _payload: Vec<u8>,
268    ) -> impl std::future::Future<Output = Result<Vec<u8>>> + Send {
269        async { anyhow::bail!("extension not supported") }
270    }
271
272    /// Dispatch a `ClientMessage` to the appropriate handler method.
273    ///
274    /// Returns a stream of `ServerMessage`s. Request-response operations
275    /// yield exactly one message; streaming operations yield many.
276    fn dispatch(&self, msg: ClientMessage) -> impl Stream<Item = ServerMessage> + Send + '_ {
277        async_stream::stream! {
278            let Some(inner) = msg.msg else {
279                yield server_error(400, "empty client message".to_string());
280                return;
281            };
282
283            match inner {
284                client_message::Msg::Send(send_msg) => {
285                    yield result_to_msg(self.send(send_msg).await);
286                }
287                client_message::Msg::Stream(stream_msg) => {
288                    let s = self.stream(stream_msg);
289                    tokio::pin!(s);
290                    while let Some(result) = s.next().await {
291                        yield result_to_msg(result);
292                    }
293                }
294                client_message::Msg::Ping(_) => {
295                    yield match self.ping().await {
296                        Ok(()) => server_pong(),
297                        Err(e) => server_error(500, e.to_string()),
298                    };
299                }
300                client_message::Msg::ListActiveConversations(_req) => {
301                    yield match self.list_conversations_active().await {
302                        Ok(conversations) => ServerMessage {
303                            msg: Some(server_message::Msg::ActiveConversations(ActiveConversationList { conversations })),
304                        },
305                        Err(e) => server_error(500, e.to_string()),
306                    };
307                }
308                client_message::Msg::Kill(kill_msg) => {
309                    yield match self.kill_conversation(kill_msg.agent.clone(), kill_msg.sender.clone()).await {
310                        Ok(true) => server_pong(),
311                        Ok(false) => server_error(
312                            404,
313                            format!("conversation not found for agent='{}' sender='{}'", kill_msg.agent, kill_msg.sender),
314                        ),
315                        Err(e) => server_error(500, e.to_string()),
316                    };
317                }
318                client_message::Msg::GetConfig(_) => {
319                    yield server_error(410, "GetConfig is deprecated — use GetStats and granular APIs".into());
320                }
321                client_message::Msg::SubscribeEvents(_) => {
322                    let s = self.subscribe_events();
323                    tokio::pin!(s);
324                    while let Some(result) = s.next().await {
325                        yield result_to_msg(result);
326                    }
327                }
328                client_message::Msg::Reload(_) => {
329                    yield match self.reload().await {
330                        Ok(()) => server_pong(),
331                        Err(e) => server_error(500, e.to_string()),
332                    };
333                }
334                client_message::Msg::ReplyToAsk(msg) => {
335                    yield match self.reply_to_ask(msg.agent, msg.sender, msg.content).await {
336                        Ok(()) => server_pong(),
337                        Err(e) => server_error(404, e.to_string()),
338                    };
339                }
340                client_message::Msg::SteerSession(req) => {
341                    yield match self.steer_session(req).await {
342                        Ok(()) => server_pong(),
343                        Err(e) => server_error(500, e.to_string()),
344                    };
345                }
346                client_message::Msg::GetStats(_) => {
347                    yield match self.get_stats().await {
348                        Ok(stats) => ServerMessage {
349                            msg: Some(server_message::Msg::Stats(stats)),
350                        },
351                        Err(e) => server_error(500, e.to_string()),
352                    };
353                }
354                client_message::Msg::CreateCron(req) => {
355                    yield match self.create_cron(req).await {
356                        Ok(info) => ServerMessage {
357                            msg: Some(server_message::Msg::CronInfo(info)),
358                        },
359                        Err(e) => server_error(500, e.to_string()),
360                    };
361                }
362                client_message::Msg::DeleteCron(req) => {
363                    yield match self.delete_cron(req.id).await {
364                        Ok(true) => server_pong(),
365                        Ok(false) => server_error(404, format!("cron {} not found", req.id)),
366                        Err(e) => server_error(500, e.to_string()),
367                    };
368                }
369                client_message::Msg::ListCrons(_) => {
370                    yield match self.list_crons().await {
371                        Ok(list) => ServerMessage {
372                            msg: Some(server_message::Msg::CronList(list)),
373                        },
374                        Err(e) => server_error(500, e.to_string()),
375                    };
376                }
377                client_message::Msg::SubscribeEvent(req) => {
378                    yield match self.subscribe_event(req).await {
379                        Ok(info) => ServerMessage {
380                            msg: Some(server_message::Msg::SubscriptionInfo(info)),
381                        },
382                        Err(e) => server_error(500, e.to_string()),
383                    };
384                }
385                client_message::Msg::UnsubscribeEvent(req) => {
386                    yield match self.unsubscribe_event(req.id).await {
387                        Ok(true) => server_pong(),
388                        Ok(false) => server_error(404, format!("subscription {} not found", req.id)),
389                        Err(e) => server_error(500, e.to_string()),
390                    };
391                }
392                client_message::Msg::ListSubscriptions(_) => {
393                    yield match self.list_subscriptions().await {
394                        Ok(list) => ServerMessage {
395                            msg: Some(server_message::Msg::SubscriptionList(list)),
396                        },
397                        Err(e) => server_error(500, e.to_string()),
398                    };
399                }
400                client_message::Msg::PublishEvent(req) => {
401                    yield match self.publish_event(req).await {
402                        Ok(()) => server_pong(),
403                        Err(e) => server_error(500, e.to_string()),
404                    };
405                }
406                client_message::Msg::Compact(req) => {
407                    yield match self.compact_conversation(req.agent, req.sender).await {
408                        Ok(summary) => ServerMessage {
409                            msg: Some(server_message::Msg::Compact(CompactResponse { summary })),
410                        },
411                        Err(e) => server_error(500, e.to_string()),
412                    };
413                }
414                client_message::Msg::ListAgents(_) => {
415                    yield match self.list_agents().await {
416                        Ok(agents) => ServerMessage {
417                            msg: Some(server_message::Msg::AgentList(AgentList { agents })),
418                        },
419                        Err(e) => server_error(500, e.to_string()),
420                    };
421                }
422                client_message::Msg::GetAgent(req) => {
423                    yield match self.get_agent(req.name).await {
424                        Ok(info) => ServerMessage {
425                            msg: Some(server_message::Msg::AgentInfo(info)),
426                        },
427                        Err(e) => server_error(404, e.to_string()),
428                    };
429                }
430                client_message::Msg::CreateAgent(req) => {
431                    yield match self.create_agent(req).await {
432                        Ok(info) => ServerMessage {
433                            msg: Some(server_message::Msg::AgentInfo(info)),
434                        },
435                        Err(e) => server_error(500, e.to_string()),
436                    };
437                }
438                client_message::Msg::UpdateAgent(req) => {
439                    yield match self.update_agent(req).await {
440                        Ok(info) => ServerMessage {
441                            msg: Some(server_message::Msg::AgentInfo(info)),
442                        },
443                        Err(e) => server_error(500, e.to_string()),
444                    };
445                }
446                client_message::Msg::DeleteAgent(req) => {
447                    yield match self.delete_agent(req.name.clone()).await {
448                        Ok(true) => server_pong(),
449                        Ok(false) => server_error(
450                            404,
451                            format!("agent '{}' not found in local manifest", req.name),
452                        ),
453                        Err(e) => server_error(500, e.to_string()),
454                    };
455                }
456                client_message::Msg::RenameAgent(req) => {
457                    yield match self.rename_agent(req.old_name, req.new_name).await {
458                        Ok(info) => ServerMessage {
459                            msg: Some(server_message::Msg::AgentInfo(info)),
460                        },
461                        Err(e) => server_error(500, e.to_string()),
462                    };
463                }
464                client_message::Msg::ListProviders(_) => {
465                    yield match self.list_providers().await {
466                        Ok(providers) => ServerMessage {
467                            msg: Some(server_message::Msg::ProviderList(ProviderList {
468                                providers,
469                            })),
470                        },
471                        Err(e) => server_error(500, e.to_string()),
472                    };
473                }
474                client_message::Msg::InstallPlugin(req) => {
475                    let s = self.install_plugin(req);
476                    tokio::pin!(s);
477                    while let Some(result) = s.next().await {
478                        yield result_to_msg(result);
479                    }
480                }
481                client_message::Msg::UninstallPlugin(req) => {
482                    let s = self.uninstall_plugin(req.plugin);
483                    tokio::pin!(s);
484                    while let Some(result) = s.next().await {
485                        yield result_to_msg(result);
486                    }
487                }
488                client_message::Msg::ListPlugins(_) => {
489                    yield match self.list_plugins().await {
490                        Ok(plugins) => ServerMessage {
491                            msg: Some(server_message::Msg::PluginList(PluginList {
492                                plugins,
493                            })),
494                        },
495                        Err(e) => server_error(500, e.to_string()),
496                    };
497                }
498                client_message::Msg::SearchPlugins(req) => {
499                    yield match self.search_plugins(req.query).await {
500                        Ok(plugins) => ServerMessage {
501                            msg: Some(server_message::Msg::PluginSearchList(PluginSearchList {
502                                plugins,
503                            })),
504                        },
505                        Err(e) => server_error(500, e.to_string()),
506                    };
507                }
508                client_message::Msg::StartService(req) => {
509                    yield match self.start_service(req.name, req.force).await {
510                        Ok(()) => server_pong(),
511                        Err(e) => server_error(500, e.to_string()),
512                    };
513                }
514                client_message::Msg::StopService(req) => {
515                    yield match self.stop_service(req.name).await {
516                        Ok(()) => server_pong(),
517                        Err(e) => server_error(500, e.to_string()),
518                    };
519                }
520                client_message::Msg::ServiceLogs(req) => {
521                    yield match self.service_logs(req.name, req.lines).await {
522                        Ok(content) => ServerMessage {
523                            msg: Some(server_message::Msg::ServiceLogOutput(
524                                ServiceLogOutput { content },
525                            )),
526                        },
527                        Err(e) => server_error(500, e.to_string()),
528                    };
529                }
530                client_message::Msg::ListSkills(_) => {
531                    yield match self.list_skills().await {
532                        Ok(skills) => ServerMessage {
533                            msg: Some(server_message::Msg::SkillList(SkillList { skills })),
534                        },
535                        Err(e) => server_error(500, e.to_string()),
536                    };
537                }
538                client_message::Msg::ListModels(_) => {
539                    yield match self.list_models().await {
540                        Ok(models) => ServerMessage {
541                            msg: Some(server_message::Msg::ModelList(ModelList { models })),
542                        },
543                        Err(e) => server_error(500, e.to_string()),
544                    };
545                }
546                client_message::Msg::ListConversations(req) => {
547                    yield match self.list_conversations(req.agent, req.sender).await {
548                        Ok(conversations) => ServerMessage {
549                            msg: Some(server_message::Msg::ConversationList(ConversationList {
550                                conversations,
551                            })),
552                        },
553                        Err(e) => server_error(500, e.to_string()),
554                    };
555                }
556                client_message::Msg::GetConversationHistory(req) => {
557                    yield result_to_msg(self.get_conversation_history(req.file_path).await);
558                }
559                client_message::Msg::DeleteConversation(req) => {
560                    yield match self.delete_conversation(req.file_path).await {
561                        Ok(()) => server_pong(),
562                        Err(e) => server_error(500, e.to_string()),
563                    };
564                }
565                client_message::Msg::ListMcps(_) => {
566                    yield match self.list_mcps().await {
567                        Ok(mcps) => ServerMessage {
568                            msg: Some(server_message::Msg::McpList(McpList { mcps })),
569                        },
570                        Err(e) => server_error(500, e.to_string()),
571                    };
572                }
573                client_message::Msg::UpsertMcp(req) => {
574                    yield match self.upsert_mcp(req).await {
575                        Ok(info) => ServerMessage {
576                            msg: Some(server_message::Msg::McpInfo(info)),
577                        },
578                        Err(e) => server_error(500, e.to_string()),
579                    };
580                }
581                client_message::Msg::DeleteMcp(req) => {
582                    yield match self.delete_mcp(req.name.clone()).await {
583                        Ok(true) => server_pong(),
584                        Ok(false) => server_error(404, format!("mcp '{}' not found", req.name)),
585                        Err(e) => server_error(500, e.to_string()),
586                    };
587                }
588                client_message::Msg::SetProvider(req) => {
589                    yield match self.set_provider(req.name, req.config).await {
590                        Ok(info) => ServerMessage {
591                            msg: Some(server_message::Msg::ProviderList(ProviderList {
592                                providers: vec![info],
593                            })),
594                        },
595                        Err(e) => server_error(500, e.to_string()),
596                    };
597                }
598                client_message::Msg::DeleteProvider(req) => {
599                    yield match self.delete_provider(req.name.clone()).await {
600                        Ok(()) => server_pong(),
601                        Err(e) => server_error(500, e.to_string()),
602                    };
603                }
604                client_message::Msg::SetActiveModel(req) => {
605                    yield match self.set_active_model(req.model).await {
606                        Ok(()) => server_pong(),
607                        Err(e) => server_error(500, e.to_string()),
608                    };
609                }
610                client_message::Msg::ListProviderPresets(_) => {
611                    yield match self.list_provider_presets().await {
612                        Ok(presets) => ServerMessage {
613                            msg: Some(server_message::Msg::ProviderPresetList(
614                                ProviderPresetList { presets },
615                            )),
616                        },
617                        Err(e) => server_error(500, e.to_string()),
618                    };
619                }
620                client_message::Msg::Extension(payload) => {
621                    yield match self.dispatch_extension(payload).await {
622                        Ok(response) => ServerMessage {
623                            msg: Some(server_message::Msg::Extension(response)),
624                        },
625                        Err(e) => server_error(500, e.to_string()),
626                    };
627                }
628            }
629        }
630    }
631}