1use crate::protocol::message::{
4 AgentInfo, AgentList, ClientMessage, ConversationHistory, ConversationInfo, ConversationList,
5 CreateAgentMsg, DaemonStats, DeleteAgentMsg, DeleteConversationMsg, DeleteMcpMsg,
6 DeleteProviderMsg, ErrorMsg, GetAgentMsg, GetConversationHistoryMsg, GetStats,
7 InstallPluginMsg, ListAgentsMsg, ListConversationsMsg, ListMcpsMsg, ListModelsMsg,
8 ListPluginsMsg, ListProviderPresetsMsg, ListProvidersMsg, ListSkillsMsg, ListSubscriptionsMsg,
9 McpInfo, McpList, ModelInfo, ModelList, Ping, PluginEvent, PluginInfo, PluginList,
10 PluginSearchList, ProviderInfo, ProviderList, ProviderPresetInfo, ProviderPresetList,
11 PublishEventMsg, RenameAgentMsg, SearchPluginsMsg, SendMsg, SendResponse, ServerMessage,
12 ServiceLogOutput, ServiceLogsMsg, SetActiveModelMsg, SetProviderMsg, SkillInfo, SkillList,
13 StartServiceMsg, StopServiceMsg, StreamEvent, StreamMsg, SubscribeEventMsg, SubscriptionInfo,
14 SubscriptionList, UninstallPluginMsg, UnsubscribeEventMsg, UpdateAgentMsg, UpsertMcpMsg,
15 client_message, plugin_event, server_message, stream_event,
16};
17use anyhow::Result;
18use futures_core::Stream;
19use futures_util::StreamExt;
20
21pub trait Client: Send {
28 fn request(
30 &mut self,
31 msg: ClientMessage,
32 ) -> impl std::future::Future<Output = Result<ServerMessage>> + Send;
33
34 fn request_stream(
41 &mut self,
42 msg: ClientMessage,
43 ) -> impl Stream<Item = Result<ServerMessage>> + Send + '_;
44
45 fn send(
47 &mut self,
48 req: SendMsg,
49 ) -> impl std::future::Future<Output = Result<SendResponse>> + Send {
50 async move { SendResponse::try_from(self.request(req.into()).await?) }
51 }
52
53 fn stream(
55 &mut self,
56 req: StreamMsg,
57 ) -> impl Stream<Item = Result<stream_event::Event>> + Send + '_ {
58 self.request_stream(req.into())
59 .take_while(|r| {
60 std::future::ready(!matches!(
61 r,
62 Ok(ServerMessage {
63 msg: Some(server_message::Msg::Stream(StreamEvent {
64 event: Some(stream_event::Event::End(_))
65 }))
66 })
67 ))
68 })
69 .map(|r| r.and_then(stream_event::Event::try_from))
70 }
71
72 fn ping(&mut self) -> impl std::future::Future<Output = Result<()>> + Send {
74 async move {
75 match self
76 .request(ClientMessage {
77 msg: Some(client_message::Msg::Ping(Ping {})),
78 })
79 .await?
80 {
81 ServerMessage {
82 msg: Some(server_message::Msg::Pong(_)),
83 } => Ok(()),
84 ServerMessage {
85 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
86 } => {
87 anyhow::bail!("server error ({code}): {message}")
88 }
89 other => anyhow::bail!("unexpected response: {other:?}"),
90 }
91 }
92 }
93
94 fn get_stats(&mut self) -> impl std::future::Future<Output = Result<DaemonStats>> + Send {
96 async move {
97 match self
98 .request(ClientMessage {
99 msg: Some(client_message::Msg::GetStats(GetStats {})),
100 })
101 .await?
102 {
103 ServerMessage {
104 msg: Some(server_message::Msg::Stats(stats)),
105 } => Ok(stats),
106 ServerMessage {
107 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
108 } => {
109 anyhow::bail!("server error ({code}): {message}")
110 }
111 other => anyhow::bail!("unexpected response: {other:?}"),
112 }
113 }
114 }
115
116 fn list_agents(&mut self) -> impl std::future::Future<Output = Result<Vec<AgentInfo>>> + Send {
118 async move {
119 match self
120 .request(ClientMessage {
121 msg: Some(client_message::Msg::ListAgents(ListAgentsMsg {})),
122 })
123 .await?
124 {
125 ServerMessage {
126 msg: Some(server_message::Msg::AgentList(AgentList { agents })),
127 } => Ok(agents),
128 ServerMessage {
129 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
130 } => {
131 anyhow::bail!("server error ({code}): {message}")
132 }
133 other => anyhow::bail!("unexpected response: {other:?}"),
134 }
135 }
136 }
137
138 fn get_agent(
140 &mut self,
141 name: String,
142 ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send {
143 async move {
144 match self
145 .request(ClientMessage {
146 msg: Some(client_message::Msg::GetAgent(GetAgentMsg { name })),
147 })
148 .await?
149 {
150 ServerMessage {
151 msg: Some(server_message::Msg::AgentInfo(info)),
152 } => Ok(info),
153 ServerMessage {
154 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
155 } => {
156 anyhow::bail!("server error ({code}): {message}")
157 }
158 other => anyhow::bail!("unexpected response: {other:?}"),
159 }
160 }
161 }
162
163 fn create_agent(
165 &mut self,
166 name: String,
167 config: String,
168 prompt: String,
169 ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send {
170 async move {
171 match self
172 .request(ClientMessage {
173 msg: Some(client_message::Msg::CreateAgent(CreateAgentMsg {
174 name,
175 config,
176 prompt,
177 })),
178 })
179 .await?
180 {
181 ServerMessage {
182 msg: Some(server_message::Msg::AgentInfo(info)),
183 } => Ok(info),
184 ServerMessage {
185 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
186 } => {
187 anyhow::bail!("server error ({code}): {message}")
188 }
189 other => anyhow::bail!("unexpected response: {other:?}"),
190 }
191 }
192 }
193
194 fn update_agent(
196 &mut self,
197 name: String,
198 config: String,
199 prompt: String,
200 ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send {
201 async move {
202 match self
203 .request(ClientMessage {
204 msg: Some(client_message::Msg::UpdateAgent(UpdateAgentMsg {
205 name,
206 config,
207 prompt,
208 })),
209 })
210 .await?
211 {
212 ServerMessage {
213 msg: Some(server_message::Msg::AgentInfo(info)),
214 } => Ok(info),
215 ServerMessage {
216 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
217 } => {
218 anyhow::bail!("server error ({code}): {message}")
219 }
220 other => anyhow::bail!("unexpected response: {other:?}"),
221 }
222 }
223 }
224
225 fn delete_agent(
227 &mut self,
228 name: String,
229 ) -> impl std::future::Future<Output = Result<()>> + Send {
230 async move {
231 match self
232 .request(ClientMessage {
233 msg: Some(client_message::Msg::DeleteAgent(DeleteAgentMsg { name })),
234 })
235 .await?
236 {
237 ServerMessage {
238 msg: Some(server_message::Msg::Pong(_)),
239 } => Ok(()),
240 ServerMessage {
241 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
242 } => {
243 anyhow::bail!("server error ({code}): {message}")
244 }
245 other => anyhow::bail!("unexpected response: {other:?}"),
246 }
247 }
248 }
249
250 fn rename_agent(
252 &mut self,
253 old_name: String,
254 new_name: String,
255 ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send {
256 async move {
257 match self
258 .request(ClientMessage {
259 msg: Some(client_message::Msg::RenameAgent(RenameAgentMsg {
260 old_name,
261 new_name,
262 })),
263 })
264 .await?
265 {
266 ServerMessage {
267 msg: Some(server_message::Msg::AgentInfo(info)),
268 } => Ok(info),
269 ServerMessage {
270 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
271 } => {
272 anyhow::bail!("server error ({code}): {message}")
273 }
274 other => anyhow::bail!("unexpected response: {other:?}"),
275 }
276 }
277 }
278
279 fn list_providers(
281 &mut self,
282 ) -> impl std::future::Future<Output = Result<Vec<ProviderInfo>>> + Send {
283 async move {
284 match self
285 .request(ClientMessage {
286 msg: Some(client_message::Msg::ListProviders(ListProvidersMsg {})),
287 })
288 .await?
289 {
290 ServerMessage {
291 msg: Some(server_message::Msg::ProviderList(ProviderList { providers })),
292 } => Ok(providers),
293 ServerMessage {
294 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
295 } => {
296 anyhow::bail!("server error ({code}): {message}")
297 }
298 other => anyhow::bail!("unexpected response: {other:?}"),
299 }
300 }
301 }
302
303 fn install_plugin(
305 &mut self,
306 plugin: String,
307 branch: String,
308 path: String,
309 force: bool,
310 ) -> impl Stream<Item = Result<plugin_event::Event>> + Send + '_ {
311 self.request_stream(ClientMessage {
312 msg: Some(client_message::Msg::InstallPlugin(InstallPluginMsg {
313 plugin,
314 branch,
315 path,
316 force,
317 })),
318 })
319 .take_while(|r| {
320 std::future::ready(!matches!(
321 r,
322 Ok(ServerMessage {
323 msg: Some(server_message::Msg::PluginEvent(PluginEvent {
324 event: Some(plugin_event::Event::Done(d))
325 }))
326 }) if d.error.is_empty()
327 ))
328 })
329 .map(|r| r.and_then(plugin_event::Event::try_from))
330 }
331
332 fn uninstall_plugin(
334 &mut self,
335 plugin: String,
336 ) -> impl Stream<Item = Result<plugin_event::Event>> + Send + '_ {
337 self.request_stream(ClientMessage {
338 msg: Some(client_message::Msg::UninstallPlugin(UninstallPluginMsg {
339 plugin,
340 })),
341 })
342 .take_while(|r| {
343 std::future::ready(!matches!(
344 r,
345 Ok(ServerMessage {
346 msg: Some(server_message::Msg::PluginEvent(PluginEvent {
347 event: Some(plugin_event::Event::Done(d))
348 }))
349 }) if d.error.is_empty()
350 ))
351 })
352 .map(|r| r.and_then(plugin_event::Event::try_from))
353 }
354
355 fn list_plugins(
357 &mut self,
358 ) -> impl std::future::Future<Output = Result<Vec<PluginInfo>>> + Send {
359 async move {
360 match self
361 .request(ClientMessage {
362 msg: Some(client_message::Msg::ListPlugins(ListPluginsMsg {})),
363 })
364 .await?
365 {
366 ServerMessage {
367 msg: Some(server_message::Msg::PluginList(PluginList { plugins })),
368 } => Ok(plugins),
369 ServerMessage {
370 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
371 } => {
372 anyhow::bail!("server error ({code}): {message}")
373 }
374 other => anyhow::bail!("unexpected response: {other:?}"),
375 }
376 }
377 }
378
379 fn search_plugins(
381 &mut self,
382 query: String,
383 ) -> impl std::future::Future<Output = Result<Vec<PluginInfo>>> + Send {
384 async move {
385 match self
386 .request(ClientMessage {
387 msg: Some(client_message::Msg::SearchPlugins(SearchPluginsMsg {
388 query,
389 })),
390 })
391 .await?
392 {
393 ServerMessage {
394 msg: Some(server_message::Msg::PluginSearchList(PluginSearchList { plugins })),
395 } => Ok(plugins),
396 ServerMessage {
397 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
398 } => {
399 anyhow::bail!("server error ({code}): {message}")
400 }
401 other => anyhow::bail!("unexpected response: {other:?}"),
402 }
403 }
404 }
405
406 fn list_conversations(
408 &mut self,
409 agent: String,
410 sender: String,
411 ) -> impl std::future::Future<Output = Result<Vec<ConversationInfo>>> + Send {
412 async move {
413 match self
414 .request(ClientMessage {
415 msg: Some(client_message::Msg::ListConversations(
416 ListConversationsMsg { agent, sender },
417 )),
418 })
419 .await?
420 {
421 ServerMessage {
422 msg:
423 Some(server_message::Msg::ConversationList(ConversationList { conversations })),
424 } => Ok(conversations),
425 ServerMessage {
426 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
427 } => {
428 anyhow::bail!("server error ({code}): {message}")
429 }
430 other => anyhow::bail!("unexpected response: {other:?}"),
431 }
432 }
433 }
434
435 fn get_conversation_history(
437 &mut self,
438 file_path: String,
439 ) -> impl std::future::Future<Output = Result<ConversationHistory>> + Send {
440 async move {
441 match self
442 .request(ClientMessage {
443 msg: Some(client_message::Msg::GetConversationHistory(
444 GetConversationHistoryMsg { file_path },
445 )),
446 })
447 .await?
448 {
449 ServerMessage {
450 msg: Some(server_message::Msg::ConversationHistory(history)),
451 } => Ok(history),
452 ServerMessage {
453 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
454 } => {
455 anyhow::bail!("server error ({code}): {message}")
456 }
457 other => anyhow::bail!("unexpected response: {other:?}"),
458 }
459 }
460 }
461
462 fn delete_conversation(
464 &mut self,
465 file_path: String,
466 ) -> impl std::future::Future<Output = Result<()>> + Send {
467 async move {
468 match self
469 .request(ClientMessage {
470 msg: Some(client_message::Msg::DeleteConversation(
471 DeleteConversationMsg { file_path },
472 )),
473 })
474 .await?
475 {
476 ServerMessage {
477 msg: Some(server_message::Msg::Pong(_)),
478 } => Ok(()),
479 ServerMessage {
480 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
481 } => {
482 anyhow::bail!("server error ({code}): {message}")
483 }
484 other => anyhow::bail!("unexpected response: {other:?}"),
485 }
486 }
487 }
488
489 fn list_mcps(&mut self) -> impl std::future::Future<Output = Result<Vec<McpInfo>>> + Send {
491 async move {
492 match self
493 .request(ClientMessage {
494 msg: Some(client_message::Msg::ListMcps(ListMcpsMsg {})),
495 })
496 .await?
497 {
498 ServerMessage {
499 msg: Some(server_message::Msg::McpList(McpList { mcps })),
500 } => Ok(mcps),
501 ServerMessage {
502 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
503 } => anyhow::bail!("server error ({code}): {message}"),
504 other => anyhow::bail!("unexpected response: {other:?}"),
505 }
506 }
507 }
508
509 fn upsert_mcp(
511 &mut self,
512 config: String,
513 ) -> impl std::future::Future<Output = Result<McpInfo>> + Send {
514 async move {
515 match self
516 .request(ClientMessage {
517 msg: Some(client_message::Msg::UpsertMcp(UpsertMcpMsg { config })),
518 })
519 .await?
520 {
521 ServerMessage {
522 msg: Some(server_message::Msg::McpInfo(info)),
523 } => Ok(info),
524 ServerMessage {
525 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
526 } => anyhow::bail!("server error ({code}): {message}"),
527 other => anyhow::bail!("unexpected response: {other:?}"),
528 }
529 }
530 }
531
532 fn delete_mcp(&mut self, name: String) -> impl std::future::Future<Output = Result<()>> + Send {
534 async move {
535 match self
536 .request(ClientMessage {
537 msg: Some(client_message::Msg::DeleteMcp(DeleteMcpMsg { name })),
538 })
539 .await?
540 {
541 ServerMessage {
542 msg: Some(server_message::Msg::Pong(_)),
543 } => Ok(()),
544 ServerMessage {
545 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
546 } => anyhow::bail!("server error ({code}): {message}"),
547 other => anyhow::bail!("unexpected response: {other:?}"),
548 }
549 }
550 }
551
552 fn set_provider(
554 &mut self,
555 name: String,
556 config: String,
557 ) -> impl std::future::Future<Output = Result<ProviderInfo>> + Send {
558 async move {
559 match self
560 .request(ClientMessage {
561 msg: Some(client_message::Msg::SetProvider(SetProviderMsg {
562 name,
563 config,
564 })),
565 })
566 .await?
567 {
568 ServerMessage {
569 msg: Some(server_message::Msg::ProviderList(ProviderList { providers })),
570 } => providers
571 .into_iter()
572 .next()
573 .ok_or_else(|| anyhow::anyhow!("empty provider list in response")),
574 ServerMessage {
575 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
576 } => anyhow::bail!("server error ({code}): {message}"),
577 other => anyhow::bail!("unexpected response: {other:?}"),
578 }
579 }
580 }
581
582 fn delete_provider(
584 &mut self,
585 name: String,
586 ) -> impl std::future::Future<Output = Result<()>> + Send {
587 async move {
588 match self
589 .request(ClientMessage {
590 msg: Some(client_message::Msg::DeleteProvider(DeleteProviderMsg {
591 name,
592 })),
593 })
594 .await?
595 {
596 ServerMessage {
597 msg: Some(server_message::Msg::Pong(_)),
598 } => Ok(()),
599 ServerMessage {
600 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
601 } => anyhow::bail!("server error ({code}): {message}"),
602 other => anyhow::bail!("unexpected response: {other:?}"),
603 }
604 }
605 }
606
607 fn set_active_model(
609 &mut self,
610 model: String,
611 ) -> impl std::future::Future<Output = Result<()>> + Send {
612 async move {
613 match self
614 .request(ClientMessage {
615 msg: Some(client_message::Msg::SetActiveModel(SetActiveModelMsg {
616 model,
617 })),
618 })
619 .await?
620 {
621 ServerMessage {
622 msg: Some(server_message::Msg::Pong(_)),
623 } => Ok(()),
624 ServerMessage {
625 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
626 } => anyhow::bail!("server error ({code}): {message}"),
627 other => anyhow::bail!("unexpected response: {other:?}"),
628 }
629 }
630 }
631
632 fn list_provider_presets(
634 &mut self,
635 ) -> impl std::future::Future<Output = Result<Vec<ProviderPresetInfo>>> + Send {
636 async move {
637 match self
638 .request(ClientMessage {
639 msg: Some(client_message::Msg::ListProviderPresets(
640 ListProviderPresetsMsg {},
641 )),
642 })
643 .await?
644 {
645 ServerMessage {
646 msg:
647 Some(server_message::Msg::ProviderPresetList(ProviderPresetList { presets })),
648 } => Ok(presets),
649 ServerMessage {
650 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
651 } => anyhow::bail!("server error ({code}): {message}"),
652 other => anyhow::bail!("unexpected response: {other:?}"),
653 }
654 }
655 }
656
657 fn start_service(
659 &mut self,
660 name: String,
661 force: bool,
662 ) -> impl std::future::Future<Output = Result<()>> + Send {
663 async move {
664 match self
665 .request(ClientMessage {
666 msg: Some(client_message::Msg::StartService(StartServiceMsg {
667 name,
668 force,
669 })),
670 })
671 .await?
672 {
673 ServerMessage {
674 msg: Some(server_message::Msg::Pong(_)),
675 } => Ok(()),
676 ServerMessage {
677 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
678 } => {
679 anyhow::bail!("server error ({code}): {message}")
680 }
681 other => anyhow::bail!("unexpected response: {other:?}"),
682 }
683 }
684 }
685
686 fn stop_service(
688 &mut self,
689 name: String,
690 ) -> impl std::future::Future<Output = Result<()>> + Send {
691 async move {
692 match self
693 .request(ClientMessage {
694 msg: Some(client_message::Msg::StopService(StopServiceMsg { name })),
695 })
696 .await?
697 {
698 ServerMessage {
699 msg: Some(server_message::Msg::Pong(_)),
700 } => Ok(()),
701 ServerMessage {
702 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
703 } => {
704 anyhow::bail!("server error ({code}): {message}")
705 }
706 other => anyhow::bail!("unexpected response: {other:?}"),
707 }
708 }
709 }
710
711 fn list_skills(&mut self) -> impl std::future::Future<Output = Result<Vec<SkillInfo>>> + Send {
713 async move {
714 match self
715 .request(ClientMessage {
716 msg: Some(client_message::Msg::ListSkills(ListSkillsMsg {})),
717 })
718 .await?
719 {
720 ServerMessage {
721 msg: Some(server_message::Msg::SkillList(SkillList { skills })),
722 } => Ok(skills),
723 ServerMessage {
724 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
725 } => {
726 anyhow::bail!("server error ({code}): {message}")
727 }
728 other => anyhow::bail!("unexpected response: {other:?}"),
729 }
730 }
731 }
732
733 fn list_models(&mut self) -> impl std::future::Future<Output = Result<Vec<ModelInfo>>> + Send {
735 async move {
736 match self
737 .request(ClientMessage {
738 msg: Some(client_message::Msg::ListModels(ListModelsMsg {})),
739 })
740 .await?
741 {
742 ServerMessage {
743 msg: Some(server_message::Msg::ModelList(ModelList { models })),
744 } => Ok(models),
745 ServerMessage {
746 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
747 } => {
748 anyhow::bail!("server error ({code}): {message}")
749 }
750 other => anyhow::bail!("unexpected response: {other:?}"),
751 }
752 }
753 }
754
755 fn service_logs(
757 &mut self,
758 name: String,
759 lines: u32,
760 ) -> impl std::future::Future<Output = Result<String>> + Send {
761 async move {
762 match self
763 .request(ClientMessage {
764 msg: Some(client_message::Msg::ServiceLogs(ServiceLogsMsg {
765 name,
766 lines,
767 })),
768 })
769 .await?
770 {
771 ServerMessage {
772 msg: Some(server_message::Msg::ServiceLogOutput(ServiceLogOutput { content })),
773 } => Ok(content),
774 ServerMessage {
775 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
776 } => {
777 anyhow::bail!("server error ({code}): {message}")
778 }
779 other => anyhow::bail!("unexpected response: {other:?}"),
780 }
781 }
782 }
783
784 fn subscribe_event(
786 &mut self,
787 source: String,
788 target_agent: String,
789 once: bool,
790 ) -> impl std::future::Future<Output = Result<SubscriptionInfo>> + Send {
791 async move {
792 match self
793 .request(ClientMessage {
794 msg: Some(client_message::Msg::SubscribeEvent(SubscribeEventMsg {
795 source,
796 target_agent,
797 once,
798 })),
799 })
800 .await?
801 {
802 ServerMessage {
803 msg: Some(server_message::Msg::SubscriptionInfo(info)),
804 } => Ok(info),
805 ServerMessage {
806 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
807 } => {
808 anyhow::bail!("server error ({code}): {message}")
809 }
810 other => anyhow::bail!("unexpected response: {other:?}"),
811 }
812 }
813 }
814
815 fn unsubscribe_event(
817 &mut self,
818 id: u64,
819 ) -> impl std::future::Future<Output = Result<()>> + Send {
820 async move {
821 match self
822 .request(ClientMessage {
823 msg: Some(client_message::Msg::UnsubscribeEvent(UnsubscribeEventMsg {
824 id,
825 })),
826 })
827 .await?
828 {
829 ServerMessage {
830 msg: Some(server_message::Msg::Pong(_)),
831 } => Ok(()),
832 ServerMessage {
833 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
834 } => {
835 anyhow::bail!("server error ({code}): {message}")
836 }
837 other => anyhow::bail!("unexpected response: {other:?}"),
838 }
839 }
840 }
841
842 fn list_subscriptions(
844 &mut self,
845 ) -> impl std::future::Future<Output = Result<Vec<SubscriptionInfo>>> + Send {
846 async move {
847 match self
848 .request(ClientMessage {
849 msg: Some(client_message::Msg::ListSubscriptions(
850 ListSubscriptionsMsg {},
851 )),
852 })
853 .await?
854 {
855 ServerMessage {
856 msg:
857 Some(server_message::Msg::SubscriptionList(SubscriptionList { subscriptions })),
858 } => Ok(subscriptions),
859 ServerMessage {
860 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
861 } => {
862 anyhow::bail!("server error ({code}): {message}")
863 }
864 other => anyhow::bail!("unexpected response: {other:?}"),
865 }
866 }
867 }
868
869 fn publish_event(
871 &mut self,
872 source: String,
873 payload: String,
874 ) -> impl std::future::Future<Output = Result<()>> + Send {
875 async move {
876 match self
877 .request(ClientMessage {
878 msg: Some(client_message::Msg::PublishEvent(PublishEventMsg {
879 source,
880 payload,
881 })),
882 })
883 .await?
884 {
885 ServerMessage {
886 msg: Some(server_message::Msg::Pong(_)),
887 } => Ok(()),
888 ServerMessage {
889 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
890 } => {
891 anyhow::bail!("server error ({code}): {message}")
892 }
893 other => anyhow::bail!("unexpected response: {other:?}"),
894 }
895 }
896 }
897}