hyveos_sdk/services/
debug.rs

1use futures::{Stream, StreamExt as _, TryStreamExt as _};
2pub use hyveos_core::debug::{
3    MeshTopologyEvent, MessageDebugEvent, MessageDebugEventType, RequestDebugEvent,
4    ResponseDebugEvent,
5};
6#[cfg(docsrs)]
7use hyveos_core::discovery::NeighbourEvent;
8use hyveos_core::grpc::{debug_client::DebugClient, Empty};
9use tonic::transport::Channel;
10
11use crate::{connection::Connection, error::Result};
12
13/// A handle to the debug service.
14///
15/// Exposes methods to interact with the debug service. Currently, the debug service only provides
16/// a stream of mesh topology events, which are emitted whenever the mesh topology changes.
17///
18/// # Example
19///
20/// ```no_run
21/// use futures::StreamExt as _;
22/// use hyveos_sdk::Connection;
23///
24/// # #[tokio::main]
25/// # async fn main() {
26/// let connection = Connection::new().await.unwrap();
27/// let mut debug_service = connection.debug();
28/// let mut events = debug_service.subscribe_mesh_topology().await.unwrap();
29///
30/// while let Some(event) = events.next().await {
31///     println!("{event:?}");
32/// }
33/// # }
34/// ```
35#[derive(Debug, Clone)]
36pub struct Service {
37    client: DebugClient<Channel>,
38}
39
40impl Service {
41    pub(crate) fn new(connection: &Connection) -> Self {
42        let client = DebugClient::new(connection.channel.clone());
43
44        Self { client }
45    }
46
47    /// Subscribes to mesh topology events.
48    ///
49    /// Returns a stream of mesh topology events. The stream will emit an event whenever the mesh
50    /// topology changes.
51    ///
52    /// For each peer in the mesh, it is guaranteed that the stream will first emit an event with a
53    /// [`NeighbourEvent::Init`] when it enters the mesh, followed by only events with
54    /// [`NeighbourEvent::Discovered`] or [`NeighbourEvent::Lost`], until the peer leaves the mesh.
55    ///
56    /// # Errors
57    ///
58    /// Returns an error if the RPC call fails. The stream emits errors that occur in the runtime
59    /// while processing the events, as well as data conversion errors.
60    ///
61    /// # Example
62    ///
63    /// ```no_run
64    /// use futures::TryStreamExt as _;
65    /// use hyveos_sdk::Connection;
66    ///
67    /// # #[tokio::main]
68    /// # async fn main() {
69    /// let connection = Connection::new().await.unwrap();
70    /// let mut debug_service = connection.debug();
71    /// let mut events = debug_service.subscribe_mesh_topology().await.unwrap();
72    ///
73    /// while let Some(event) = events.try_next().await.unwrap() {
74    ///     println!("{event:?}");
75    /// }
76    /// # }
77    /// ```
78    #[tracing::instrument(skip(self))]
79    pub async fn subscribe_mesh_topology(
80        &mut self,
81    ) -> Result<impl Stream<Item = Result<MeshTopologyEvent>>> {
82        self.client
83            .subscribe_mesh_topology(Empty {})
84            .await
85            .map(|response| {
86                response
87                    .into_inner()
88                    .map_ok(TryInto::try_into)
89                    .map(|res| res?.map_err(Into::into))
90            })
91            .map_err(Into::into)
92    }
93
94    /// Subscribes to message debug events.
95    ///
96    /// Returns a stream of mesh debug events. The stream will emit an event whenever a request,
97    /// response, or gossipsub message is sent by a peer in the mesh.
98    ///
99    /// # Errors
100    ///
101    /// Returns an error if the RPC call fails. The stream emits errors that occur in the runtime
102    /// while processing the events, as well as data conversion errors.
103    ///
104    /// # Example
105    ///
106    /// ```no_run
107    /// use futures::TryStreamExt as _;
108    /// use hyveos_sdk::Connection;
109    ///
110    /// # #[tokio::main]
111    /// # async fn main() {
112    /// let connection = Connection::new().await.unwrap();
113    /// let mut debug_service = connection.debug();
114    /// let mut events = debug_service.subscribe_messages().await.unwrap();
115    ///
116    /// while let Some(event) = events.try_next().await.unwrap() {
117    ///     println!("{event:?}");
118    /// }
119    /// # }
120    /// ```
121    #[tracing::instrument(skip(self))]
122    pub async fn subscribe_messages(
123        &mut self,
124    ) -> Result<impl Stream<Item = Result<MessageDebugEvent>>> {
125        self.client
126            .subscribe_messages(Empty {})
127            .await
128            .map(|response| {
129                response
130                    .into_inner()
131                    .map_ok(TryInto::try_into)
132                    .map(|res| res?.map_err(Into::into))
133            })
134            .map_err(Into::into)
135    }
136}