Skip to main content

crabtalk_core/protocol/api/
client.rs

1//! Client trait — transport primitives plus typed provided methods.
2
3use crate::protocol::message::{
4    AgentInfo, AgentList, ClientMessage, ConversationHistory, ConversationInfo, ConversationList,
5    CreateAgentMsg, DaemonStats, DeleteAgentMsg, DeleteConversationMsg, DeleteMcpMsg,
6    DeleteProviderMsg, ErrorMsg, GetAgentMsg, GetConversationHistoryMsg, GetStats,
7    InstallPluginMsg, ListAgentsMsg, ListConversationsMsg, ListMcpsMsg, ListModelsMsg,
8    ListPluginsMsg, ListProviderPresetsMsg, ListProvidersMsg, ListSkillsMsg, ListSubscriptionsMsg,
9    McpInfo, McpList, ModelInfo, ModelList, Ping, PluginEvent, PluginInfo, PluginList,
10    PluginSearchList, ProviderInfo, ProviderList, ProviderPresetInfo, ProviderPresetList,
11    PublishEventMsg, RenameAgentMsg, SearchPluginsMsg, SendMsg, SendResponse, ServerMessage,
12    ServiceLogOutput, ServiceLogsMsg, SetActiveModelMsg, SetProviderMsg, SkillInfo, SkillList,
13    StartServiceMsg, StopServiceMsg, StreamEvent, StreamMsg, SubscribeEventMsg, SubscriptionInfo,
14    SubscriptionList, UninstallPluginMsg, UnsubscribeEventMsg, UpdateAgentMsg, UpsertMcpMsg,
15    client_message, plugin_event, server_message, stream_event,
16};
17use anyhow::Result;
18use futures_core::Stream;
19use futures_util::StreamExt;
20
21/// Client-side protocol interface.
22///
23/// Implementors provide two transport primitives — [`request`](Client::request)
24/// for request-response and [`request_stream`](Client::request_stream) for
25/// streaming operations. All typed methods are provided defaults that delegate
26/// to these primitives.
27pub trait Client: Send {
28    /// Send a `ClientMessage` and receive a single `ServerMessage`.
29    fn request(
30        &mut self,
31        msg: ClientMessage,
32    ) -> impl std::future::Future<Output = Result<ServerMessage>> + Send;
33
34    /// Send a `ClientMessage` and receive a stream of `ServerMessage`s.
35    ///
36    /// This is a raw transport primitive — the stream reads indefinitely.
37    /// Callers must detect the terminal sentinel (e.g. `StreamEnd`)
38    /// and stop consuming. The typed streaming methods handle this
39    /// automatically.
40    fn request_stream(
41        &mut self,
42        msg: ClientMessage,
43    ) -> impl Stream<Item = Result<ServerMessage>> + Send + '_;
44
45    /// Send a message to an agent and receive a complete response.
46    fn send(
47        &mut self,
48        req: SendMsg,
49    ) -> impl std::future::Future<Output = Result<SendResponse>> + Send {
50        async move { SendResponse::try_from(self.request(req.into()).await?) }
51    }
52
53    /// Send a message to an agent and receive a streamed response.
54    fn stream(
55        &mut self,
56        req: StreamMsg,
57    ) -> impl Stream<Item = Result<stream_event::Event>> + Send + '_ {
58        self.request_stream(req.into())
59            .take_while(|r| {
60                std::future::ready(!matches!(
61                    r,
62                    Ok(ServerMessage {
63                        msg: Some(server_message::Msg::Stream(StreamEvent {
64                            event: Some(stream_event::Event::End(_))
65                        }))
66                    })
67                ))
68            })
69            .map(|r| r.and_then(stream_event::Event::try_from))
70    }
71
72    /// Ping the server (keepalive).
73    fn ping(&mut self) -> impl std::future::Future<Output = Result<()>> + Send {
74        async move {
75            match self
76                .request(ClientMessage {
77                    msg: Some(client_message::Msg::Ping(Ping {})),
78                })
79                .await?
80            {
81                ServerMessage {
82                    msg: Some(server_message::Msg::Pong(_)),
83                } => Ok(()),
84                ServerMessage {
85                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
86                } => {
87                    anyhow::bail!("server error ({code}): {message}")
88                }
89                other => anyhow::bail!("unexpected response: {other:?}"),
90            }
91        }
92    }
93
94    /// Get daemon stats including the active model name.
95    fn get_stats(&mut self) -> impl std::future::Future<Output = Result<DaemonStats>> + Send {
96        async move {
97            match self
98                .request(ClientMessage {
99                    msg: Some(client_message::Msg::GetStats(GetStats {})),
100                })
101                .await?
102            {
103                ServerMessage {
104                    msg: Some(server_message::Msg::Stats(stats)),
105                } => Ok(stats),
106                ServerMessage {
107                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
108                } => {
109                    anyhow::bail!("server error ({code}): {message}")
110                }
111                other => anyhow::bail!("unexpected response: {other:?}"),
112            }
113        }
114    }
115
116    /// List all registered agents.
117    fn list_agents(&mut self) -> impl std::future::Future<Output = Result<Vec<AgentInfo>>> + Send {
118        async move {
119            match self
120                .request(ClientMessage {
121                    msg: Some(client_message::Msg::ListAgents(ListAgentsMsg {})),
122                })
123                .await?
124            {
125                ServerMessage {
126                    msg: Some(server_message::Msg::AgentList(AgentList { agents })),
127                } => Ok(agents),
128                ServerMessage {
129                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
130                } => {
131                    anyhow::bail!("server error ({code}): {message}")
132                }
133                other => anyhow::bail!("unexpected response: {other:?}"),
134            }
135        }
136    }
137
138    /// Get a single agent by name.
139    fn get_agent(
140        &mut self,
141        name: String,
142    ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send {
143        async move {
144            match self
145                .request(ClientMessage {
146                    msg: Some(client_message::Msg::GetAgent(GetAgentMsg { name })),
147                })
148                .await?
149            {
150                ServerMessage {
151                    msg: Some(server_message::Msg::AgentInfo(info)),
152                } => Ok(info),
153                ServerMessage {
154                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
155                } => {
156                    anyhow::bail!("server error ({code}): {message}")
157                }
158                other => anyhow::bail!("unexpected response: {other:?}"),
159            }
160        }
161    }
162
163    /// Create an agent from JSON config and system prompt.
164    fn create_agent(
165        &mut self,
166        name: String,
167        config: String,
168        prompt: String,
169    ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send {
170        async move {
171            match self
172                .request(ClientMessage {
173                    msg: Some(client_message::Msg::CreateAgent(CreateAgentMsg {
174                        name,
175                        config,
176                        prompt,
177                    })),
178                })
179                .await?
180            {
181                ServerMessage {
182                    msg: Some(server_message::Msg::AgentInfo(info)),
183                } => Ok(info),
184                ServerMessage {
185                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
186                } => {
187                    anyhow::bail!("server error ({code}): {message}")
188                }
189                other => anyhow::bail!("unexpected response: {other:?}"),
190            }
191        }
192    }
193
194    /// Update an agent from JSON config and optional system prompt.
195    fn update_agent(
196        &mut self,
197        name: String,
198        config: String,
199        prompt: String,
200    ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send {
201        async move {
202            match self
203                .request(ClientMessage {
204                    msg: Some(client_message::Msg::UpdateAgent(UpdateAgentMsg {
205                        name,
206                        config,
207                        prompt,
208                    })),
209                })
210                .await?
211            {
212                ServerMessage {
213                    msg: Some(server_message::Msg::AgentInfo(info)),
214                } => Ok(info),
215                ServerMessage {
216                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
217                } => {
218                    anyhow::bail!("server error ({code}): {message}")
219                }
220                other => anyhow::bail!("unexpected response: {other:?}"),
221            }
222        }
223    }
224
225    /// Delete an agent by name.
226    fn delete_agent(
227        &mut self,
228        name: String,
229    ) -> impl std::future::Future<Output = Result<()>> + Send {
230        async move {
231            match self
232                .request(ClientMessage {
233                    msg: Some(client_message::Msg::DeleteAgent(DeleteAgentMsg { name })),
234                })
235                .await?
236            {
237                ServerMessage {
238                    msg: Some(server_message::Msg::Pong(_)),
239                } => Ok(()),
240                ServerMessage {
241                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
242                } => {
243                    anyhow::bail!("server error ({code}): {message}")
244                }
245                other => anyhow::bail!("unexpected response: {other:?}"),
246            }
247        }
248    }
249
250    /// Rename an agent. The agent's stored ULID stays stable.
251    fn rename_agent(
252        &mut self,
253        old_name: String,
254        new_name: String,
255    ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send {
256        async move {
257            match self
258                .request(ClientMessage {
259                    msg: Some(client_message::Msg::RenameAgent(RenameAgentMsg {
260                        old_name,
261                        new_name,
262                    })),
263                })
264                .await?
265            {
266                ServerMessage {
267                    msg: Some(server_message::Msg::AgentInfo(info)),
268                } => Ok(info),
269                ServerMessage {
270                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
271                } => {
272                    anyhow::bail!("server error ({code}): {message}")
273                }
274                other => anyhow::bail!("unexpected response: {other:?}"),
275            }
276        }
277    }
278
279    /// List all registered LLM providers.
280    fn list_providers(
281        &mut self,
282    ) -> impl std::future::Future<Output = Result<Vec<ProviderInfo>>> + Send {
283        async move {
284            match self
285                .request(ClientMessage {
286                    msg: Some(client_message::Msg::ListProviders(ListProvidersMsg {})),
287                })
288                .await?
289            {
290                ServerMessage {
291                    msg: Some(server_message::Msg::ProviderList(ProviderList { providers })),
292                } => Ok(providers),
293                ServerMessage {
294                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
295                } => {
296                    anyhow::bail!("server error ({code}): {message}")
297                }
298                other => anyhow::bail!("unexpected response: {other:?}"),
299            }
300        }
301    }
302
303    /// Install a plugin, streaming progress events.
304    fn install_plugin(
305        &mut self,
306        plugin: String,
307        branch: String,
308        path: String,
309        force: bool,
310    ) -> impl Stream<Item = Result<plugin_event::Event>> + Send + '_ {
311        self.request_stream(ClientMessage {
312            msg: Some(client_message::Msg::InstallPlugin(InstallPluginMsg {
313                plugin,
314                branch,
315                path,
316                force,
317            })),
318        })
319        .take_while(|r| {
320            std::future::ready(!matches!(
321                r,
322                Ok(ServerMessage {
323                    msg: Some(server_message::Msg::PluginEvent(PluginEvent {
324                        event: Some(plugin_event::Event::Done(d))
325                    }))
326                }) if d.error.is_empty()
327            ))
328        })
329        .map(|r| r.and_then(plugin_event::Event::try_from))
330    }
331
332    /// Uninstall a plugin, streaming progress events.
333    fn uninstall_plugin(
334        &mut self,
335        plugin: String,
336    ) -> impl Stream<Item = Result<plugin_event::Event>> + Send + '_ {
337        self.request_stream(ClientMessage {
338            msg: Some(client_message::Msg::UninstallPlugin(UninstallPluginMsg {
339                plugin,
340            })),
341        })
342        .take_while(|r| {
343            std::future::ready(!matches!(
344                r,
345                Ok(ServerMessage {
346                    msg: Some(server_message::Msg::PluginEvent(PluginEvent {
347                        event: Some(plugin_event::Event::Done(d))
348                    }))
349                }) if d.error.is_empty()
350            ))
351        })
352        .map(|r| r.and_then(plugin_event::Event::try_from))
353    }
354
355    /// List installed plugins.
356    fn list_plugins(
357        &mut self,
358    ) -> impl std::future::Future<Output = Result<Vec<PluginInfo>>> + Send {
359        async move {
360            match self
361                .request(ClientMessage {
362                    msg: Some(client_message::Msg::ListPlugins(ListPluginsMsg {})),
363                })
364                .await?
365            {
366                ServerMessage {
367                    msg: Some(server_message::Msg::PluginList(PluginList { plugins })),
368                } => Ok(plugins),
369                ServerMessage {
370                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
371                } => {
372                    anyhow::bail!("server error ({code}): {message}")
373                }
374                other => anyhow::bail!("unexpected response: {other:?}"),
375            }
376        }
377    }
378
379    /// Search plugin registry for available plugins.
380    fn search_plugins(
381        &mut self,
382        query: String,
383    ) -> impl std::future::Future<Output = Result<Vec<PluginInfo>>> + Send {
384        async move {
385            match self
386                .request(ClientMessage {
387                    msg: Some(client_message::Msg::SearchPlugins(SearchPluginsMsg {
388                        query,
389                    })),
390                })
391                .await?
392            {
393                ServerMessage {
394                    msg: Some(server_message::Msg::PluginSearchList(PluginSearchList { plugins })),
395                } => Ok(plugins),
396                ServerMessage {
397                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
398                } => {
399                    anyhow::bail!("server error ({code}): {message}")
400                }
401                other => anyhow::bail!("unexpected response: {other:?}"),
402            }
403        }
404    }
405
406    /// List historical conversations from disk.
407    fn list_conversations(
408        &mut self,
409        agent: String,
410        sender: String,
411    ) -> impl std::future::Future<Output = Result<Vec<ConversationInfo>>> + Send {
412        async move {
413            match self
414                .request(ClientMessage {
415                    msg: Some(client_message::Msg::ListConversations(
416                        ListConversationsMsg { agent, sender },
417                    )),
418                })
419                .await?
420            {
421                ServerMessage {
422                    msg:
423                        Some(server_message::Msg::ConversationList(ConversationList { conversations })),
424                } => Ok(conversations),
425                ServerMessage {
426                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
427                } => {
428                    anyhow::bail!("server error ({code}): {message}")
429                }
430                other => anyhow::bail!("unexpected response: {other:?}"),
431            }
432        }
433    }
434
435    /// Load conversation history from a session file.
436    fn get_conversation_history(
437        &mut self,
438        file_path: String,
439    ) -> impl std::future::Future<Output = Result<ConversationHistory>> + Send {
440        async move {
441            match self
442                .request(ClientMessage {
443                    msg: Some(client_message::Msg::GetConversationHistory(
444                        GetConversationHistoryMsg { file_path },
445                    )),
446                })
447                .await?
448            {
449                ServerMessage {
450                    msg: Some(server_message::Msg::ConversationHistory(history)),
451                } => Ok(history),
452                ServerMessage {
453                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
454                } => {
455                    anyhow::bail!("server error ({code}): {message}")
456                }
457                other => anyhow::bail!("unexpected response: {other:?}"),
458            }
459        }
460    }
461
462    /// Delete a conversation file from disk.
463    fn delete_conversation(
464        &mut self,
465        file_path: String,
466    ) -> impl std::future::Future<Output = Result<()>> + Send {
467        async move {
468            match self
469                .request(ClientMessage {
470                    msg: Some(client_message::Msg::DeleteConversation(
471                        DeleteConversationMsg { file_path },
472                    )),
473                })
474                .await?
475            {
476                ServerMessage {
477                    msg: Some(server_message::Msg::Pong(_)),
478                } => Ok(()),
479                ServerMessage {
480                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
481                } => {
482                    anyhow::bail!("server error ({code}): {message}")
483                }
484                other => anyhow::bail!("unexpected response: {other:?}"),
485            }
486        }
487    }
488
489    /// List all MCP server configs.
490    fn list_mcps(&mut self) -> impl std::future::Future<Output = Result<Vec<McpInfo>>> + Send {
491        async move {
492            match self
493                .request(ClientMessage {
494                    msg: Some(client_message::Msg::ListMcps(ListMcpsMsg {})),
495                })
496                .await?
497            {
498                ServerMessage {
499                    msg: Some(server_message::Msg::McpList(McpList { mcps })),
500                } => Ok(mcps),
501                ServerMessage {
502                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
503                } => anyhow::bail!("server error ({code}): {message}"),
504                other => anyhow::bail!("unexpected response: {other:?}"),
505            }
506        }
507    }
508
509    /// Create or replace an MCP server (Storage-backed).
510    fn upsert_mcp(
511        &mut self,
512        config: String,
513    ) -> impl std::future::Future<Output = Result<McpInfo>> + Send {
514        async move {
515            match self
516                .request(ClientMessage {
517                    msg: Some(client_message::Msg::UpsertMcp(UpsertMcpMsg { config })),
518                })
519                .await?
520            {
521                ServerMessage {
522                    msg: Some(server_message::Msg::McpInfo(info)),
523                } => Ok(info),
524                ServerMessage {
525                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
526                } => anyhow::bail!("server error ({code}): {message}"),
527                other => anyhow::bail!("unexpected response: {other:?}"),
528            }
529        }
530    }
531
532    /// Delete an MCP server by name.
533    fn delete_mcp(&mut self, name: String) -> impl std::future::Future<Output = Result<()>> + Send {
534        async move {
535            match self
536                .request(ClientMessage {
537                    msg: Some(client_message::Msg::DeleteMcp(DeleteMcpMsg { name })),
538                })
539                .await?
540            {
541                ServerMessage {
542                    msg: Some(server_message::Msg::Pong(_)),
543                } => Ok(()),
544                ServerMessage {
545                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
546                } => anyhow::bail!("server error ({code}): {message}"),
547                other => anyhow::bail!("unexpected response: {other:?}"),
548            }
549        }
550    }
551
552    /// Create or update a provider.
553    fn set_provider(
554        &mut self,
555        name: String,
556        config: String,
557    ) -> impl std::future::Future<Output = Result<ProviderInfo>> + Send {
558        async move {
559            match self
560                .request(ClientMessage {
561                    msg: Some(client_message::Msg::SetProvider(SetProviderMsg {
562                        name,
563                        config,
564                    })),
565                })
566                .await?
567            {
568                ServerMessage {
569                    msg: Some(server_message::Msg::ProviderList(ProviderList { providers })),
570                } => providers
571                    .into_iter()
572                    .next()
573                    .ok_or_else(|| anyhow::anyhow!("empty provider list in response")),
574                ServerMessage {
575                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
576                } => anyhow::bail!("server error ({code}): {message}"),
577                other => anyhow::bail!("unexpected response: {other:?}"),
578            }
579        }
580    }
581
582    /// Delete a provider by name.
583    fn delete_provider(
584        &mut self,
585        name: String,
586    ) -> impl std::future::Future<Output = Result<()>> + Send {
587        async move {
588            match self
589                .request(ClientMessage {
590                    msg: Some(client_message::Msg::DeleteProvider(DeleteProviderMsg {
591                        name,
592                    })),
593                })
594                .await?
595            {
596                ServerMessage {
597                    msg: Some(server_message::Msg::Pong(_)),
598                } => Ok(()),
599                ServerMessage {
600                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
601                } => anyhow::bail!("server error ({code}): {message}"),
602                other => anyhow::bail!("unexpected response: {other:?}"),
603            }
604        }
605    }
606
607    /// Set the active model.
608    fn set_active_model(
609        &mut self,
610        model: String,
611    ) -> impl std::future::Future<Output = Result<()>> + Send {
612        async move {
613            match self
614                .request(ClientMessage {
615                    msg: Some(client_message::Msg::SetActiveModel(SetActiveModelMsg {
616                        model,
617                    })),
618                })
619                .await?
620            {
621                ServerMessage {
622                    msg: Some(server_message::Msg::Pong(_)),
623                } => Ok(()),
624                ServerMessage {
625                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
626                } => anyhow::bail!("server error ({code}): {message}"),
627                other => anyhow::bail!("unexpected response: {other:?}"),
628            }
629        }
630    }
631
632    /// List provider presets.
633    fn list_provider_presets(
634        &mut self,
635    ) -> impl std::future::Future<Output = Result<Vec<ProviderPresetInfo>>> + Send {
636        async move {
637            match self
638                .request(ClientMessage {
639                    msg: Some(client_message::Msg::ListProviderPresets(
640                        ListProviderPresetsMsg {},
641                    )),
642                })
643                .await?
644            {
645                ServerMessage {
646                    msg:
647                        Some(server_message::Msg::ProviderPresetList(ProviderPresetList { presets })),
648                } => Ok(presets),
649                ServerMessage {
650                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
651                } => anyhow::bail!("server error ({code}): {message}"),
652                other => anyhow::bail!("unexpected response: {other:?}"),
653            }
654        }
655    }
656
657    /// Start a command service.
658    fn start_service(
659        &mut self,
660        name: String,
661        force: bool,
662    ) -> impl std::future::Future<Output = Result<()>> + Send {
663        async move {
664            match self
665                .request(ClientMessage {
666                    msg: Some(client_message::Msg::StartService(StartServiceMsg {
667                        name,
668                        force,
669                    })),
670                })
671                .await?
672            {
673                ServerMessage {
674                    msg: Some(server_message::Msg::Pong(_)),
675                } => Ok(()),
676                ServerMessage {
677                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
678                } => {
679                    anyhow::bail!("server error ({code}): {message}")
680                }
681                other => anyhow::bail!("unexpected response: {other:?}"),
682            }
683        }
684    }
685
686    /// Stop a command service.
687    fn stop_service(
688        &mut self,
689        name: String,
690    ) -> impl std::future::Future<Output = Result<()>> + Send {
691        async move {
692            match self
693                .request(ClientMessage {
694                    msg: Some(client_message::Msg::StopService(StopServiceMsg { name })),
695                })
696                .await?
697            {
698                ServerMessage {
699                    msg: Some(server_message::Msg::Pong(_)),
700                } => Ok(()),
701                ServerMessage {
702                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
703                } => {
704                    anyhow::bail!("server error ({code}): {message}")
705                }
706                other => anyhow::bail!("unexpected response: {other:?}"),
707            }
708        }
709    }
710
711    /// List all available skills with enabled state.
712    fn list_skills(&mut self) -> impl std::future::Future<Output = Result<Vec<SkillInfo>>> + Send {
713        async move {
714            match self
715                .request(ClientMessage {
716                    msg: Some(client_message::Msg::ListSkills(ListSkillsMsg {})),
717                })
718                .await?
719            {
720                ServerMessage {
721                    msg: Some(server_message::Msg::SkillList(SkillList { skills })),
722                } => Ok(skills),
723                ServerMessage {
724                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
725                } => {
726                    anyhow::bail!("server error ({code}): {message}")
727                }
728                other => anyhow::bail!("unexpected response: {other:?}"),
729            }
730        }
731    }
732
733    /// List all resolved models with provider and active state.
734    fn list_models(&mut self) -> impl std::future::Future<Output = Result<Vec<ModelInfo>>> + Send {
735        async move {
736            match self
737                .request(ClientMessage {
738                    msg: Some(client_message::Msg::ListModels(ListModelsMsg {})),
739                })
740                .await?
741            {
742                ServerMessage {
743                    msg: Some(server_message::Msg::ModelList(ModelList { models })),
744                } => Ok(models),
745                ServerMessage {
746                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
747                } => {
748                    anyhow::bail!("server error ({code}): {message}")
749                }
750                other => anyhow::bail!("unexpected response: {other:?}"),
751            }
752        }
753    }
754
755    /// Get recent log lines for a service.
756    fn service_logs(
757        &mut self,
758        name: String,
759        lines: u32,
760    ) -> impl std::future::Future<Output = Result<String>> + Send {
761        async move {
762            match self
763                .request(ClientMessage {
764                    msg: Some(client_message::Msg::ServiceLogs(ServiceLogsMsg {
765                        name,
766                        lines,
767                    })),
768                })
769                .await?
770            {
771                ServerMessage {
772                    msg: Some(server_message::Msg::ServiceLogOutput(ServiceLogOutput { content })),
773                } => Ok(content),
774                ServerMessage {
775                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
776                } => {
777                    anyhow::bail!("server error ({code}): {message}")
778                }
779                other => anyhow::bail!("unexpected response: {other:?}"),
780            }
781        }
782    }
783
784    /// Create an event bus subscription.
785    fn subscribe_event(
786        &mut self,
787        source: String,
788        target_agent: String,
789        once: bool,
790    ) -> impl std::future::Future<Output = Result<SubscriptionInfo>> + Send {
791        async move {
792            match self
793                .request(ClientMessage {
794                    msg: Some(client_message::Msg::SubscribeEvent(SubscribeEventMsg {
795                        source,
796                        target_agent,
797                        once,
798                    })),
799                })
800                .await?
801            {
802                ServerMessage {
803                    msg: Some(server_message::Msg::SubscriptionInfo(info)),
804                } => Ok(info),
805                ServerMessage {
806                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
807                } => {
808                    anyhow::bail!("server error ({code}): {message}")
809                }
810                other => anyhow::bail!("unexpected response: {other:?}"),
811            }
812        }
813    }
814
815    /// Remove an event bus subscription.
816    fn unsubscribe_event(
817        &mut self,
818        id: u64,
819    ) -> impl std::future::Future<Output = Result<()>> + Send {
820        async move {
821            match self
822                .request(ClientMessage {
823                    msg: Some(client_message::Msg::UnsubscribeEvent(UnsubscribeEventMsg {
824                        id,
825                    })),
826                })
827                .await?
828            {
829                ServerMessage {
830                    msg: Some(server_message::Msg::Pong(_)),
831                } => Ok(()),
832                ServerMessage {
833                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
834                } => {
835                    anyhow::bail!("server error ({code}): {message}")
836                }
837                other => anyhow::bail!("unexpected response: {other:?}"),
838            }
839        }
840    }
841
842    /// List all event bus subscriptions.
843    fn list_subscriptions(
844        &mut self,
845    ) -> impl std::future::Future<Output = Result<Vec<SubscriptionInfo>>> + Send {
846        async move {
847            match self
848                .request(ClientMessage {
849                    msg: Some(client_message::Msg::ListSubscriptions(
850                        ListSubscriptionsMsg {},
851                    )),
852                })
853                .await?
854            {
855                ServerMessage {
856                    msg:
857                        Some(server_message::Msg::SubscriptionList(SubscriptionList { subscriptions })),
858                } => Ok(subscriptions),
859                ServerMessage {
860                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
861                } => {
862                    anyhow::bail!("server error ({code}): {message}")
863                }
864                other => anyhow::bail!("unexpected response: {other:?}"),
865            }
866        }
867    }
868
869    /// Publish an event to the bus.
870    fn publish_event(
871        &mut self,
872        source: String,
873        payload: String,
874    ) -> impl std::future::Future<Output = Result<()>> + Send {
875        async move {
876            match self
877                .request(ClientMessage {
878                    msg: Some(client_message::Msg::PublishEvent(PublishEventMsg {
879                        source,
880                        payload,
881                    })),
882                })
883                .await?
884            {
885                ServerMessage {
886                    msg: Some(server_message::Msg::Pong(_)),
887                } => Ok(()),
888                ServerMessage {
889                    msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
890                } => {
891                    anyhow::bail!("server error ({code}): {message}")
892                }
893                other => anyhow::bail!("unexpected response: {other:?}"),
894            }
895        }
896    }
897}