hyveos_sdk/services/debug.rs
1use futures::{Stream, StreamExt as _, TryStreamExt as _};
2pub use hyveos_core::debug::{
3 MeshTopologyEvent, MessageDebugEvent, MessageDebugEventType, RequestDebugEvent,
4 ResponseDebugEvent,
5};
6#[cfg(docsrs)]
7use hyveos_core::discovery::NeighbourEvent;
8use hyveos_core::grpc::{debug_client::DebugClient, Empty};
9use tonic::transport::Channel;
10
11use crate::{connection::Connection, error::Result};
12
13/// A handle to the debug service.
14///
15/// Exposes methods to interact with the debug service. Currently, the debug service only provides
16/// a stream of mesh topology events, which are emitted whenever the mesh topology changes.
17///
18/// # Example
19///
20/// ```no_run
21/// use futures::StreamExt as _;
22/// use hyveos_sdk::Connection;
23///
24/// # #[tokio::main]
25/// # async fn main() {
26/// let connection = Connection::new().await.unwrap();
27/// let mut debug_service = connection.debug();
28/// let mut events = debug_service.subscribe_mesh_topology().await.unwrap();
29///
30/// while let Some(event) = events.next().await {
31/// println!("{event:?}");
32/// }
33/// # }
34/// ```
35#[derive(Debug, Clone)]
36pub struct Service {
37 client: DebugClient<Channel>,
38}
39
40impl Service {
41 pub(crate) fn new(connection: &Connection) -> Self {
42 let client = DebugClient::new(connection.channel.clone());
43
44 Self { client }
45 }
46
47 /// Subscribes to mesh topology events.
48 ///
49 /// Returns a stream of mesh topology events. The stream will emit an event whenever the mesh
50 /// topology changes.
51 ///
52 /// For each peer in the mesh, it is guaranteed that the stream will first emit an event with a
53 /// [`NeighbourEvent::Init`] when it enters the mesh, followed by only events with
54 /// [`NeighbourEvent::Discovered`] or [`NeighbourEvent::Lost`], until the peer leaves the mesh.
55 ///
56 /// # Errors
57 ///
58 /// Returns an error if the RPC call fails. The stream emits errors that occur in the runtime
59 /// while processing the events, as well as data conversion errors.
60 ///
61 /// # Example
62 ///
63 /// ```no_run
64 /// use futures::TryStreamExt as _;
65 /// use hyveos_sdk::Connection;
66 ///
67 /// # #[tokio::main]
68 /// # async fn main() {
69 /// let connection = Connection::new().await.unwrap();
70 /// let mut debug_service = connection.debug();
71 /// let mut events = debug_service.subscribe_mesh_topology().await.unwrap();
72 ///
73 /// while let Some(event) = events.try_next().await.unwrap() {
74 /// println!("{event:?}");
75 /// }
76 /// # }
77 /// ```
78 #[tracing::instrument(skip(self))]
79 pub async fn subscribe_mesh_topology(
80 &mut self,
81 ) -> Result<impl Stream<Item = Result<MeshTopologyEvent>>> {
82 self.client
83 .subscribe_mesh_topology(Empty {})
84 .await
85 .map(|response| {
86 response
87 .into_inner()
88 .map_ok(TryInto::try_into)
89 .map(|res| res?.map_err(Into::into))
90 })
91 .map_err(Into::into)
92 }
93
94 /// Subscribes to message debug events.
95 ///
96 /// Returns a stream of mesh debug events. The stream will emit an event whenever a request,
97 /// response, or gossipsub message is sent by a peer in the mesh.
98 ///
99 /// # Errors
100 ///
101 /// Returns an error if the RPC call fails. The stream emits errors that occur in the runtime
102 /// while processing the events, as well as data conversion errors.
103 ///
104 /// # Example
105 ///
106 /// ```no_run
107 /// use futures::TryStreamExt as _;
108 /// use hyveos_sdk::Connection;
109 ///
110 /// # #[tokio::main]
111 /// # async fn main() {
112 /// let connection = Connection::new().await.unwrap();
113 /// let mut debug_service = connection.debug();
114 /// let mut events = debug_service.subscribe_messages().await.unwrap();
115 ///
116 /// while let Some(event) = events.try_next().await.unwrap() {
117 /// println!("{event:?}");
118 /// }
119 /// # }
120 /// ```
121 #[tracing::instrument(skip(self))]
122 pub async fn subscribe_messages(
123 &mut self,
124 ) -> Result<impl Stream<Item = Result<MessageDebugEvent>>> {
125 self.client
126 .subscribe_messages(Empty {})
127 .await
128 .map(|response| {
129 response
130 .into_inner()
131 .map_ok(TryInto::try_into)
132 .map(|res| res?.map_err(Into::into))
133 })
134 .map_err(Into::into)
135 }
136}