bee_inx/
client.rs

1// Copyright 2022 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use futures::stream::{Stream, StreamExt};
5use inx::{proto, proto::inx_client::InxClient, tonic};
6
7use crate::{Error, Milestone, MilestoneRangeRequest, MilestoneRequest, NodeConfiguration, NodeStatus};
8
9/// An INX client connection.
10#[derive(Clone, Debug)]
11pub struct Inx {
12    inx: InxClient<inx::tonic::transport::Channel>,
13}
14
15fn unpack_proto_msg<Proto, Bee>(msg: Result<Proto, tonic::Status>) -> Result<Bee, Error>
16where
17    Bee: TryFrom<Proto, Error = bee_block::InxError>,
18{
19    let inner = msg.map_err(Error::StatusCode)?;
20    Bee::try_from(inner).map_err(Error::InxError)
21}
22
23impl Inx {
24    /// Connect to the INX interface of a node.
25    pub async fn connect(address: String) -> Result<Self, Error> {
26        Ok(Self {
27            inx: InxClient::connect(address).await?,
28        })
29    }
30
31    /// Listens to confirmed milestones in the range of
32    pub async fn listen_to_confirmed_milestones(
33        &mut self,
34        request: MilestoneRangeRequest,
35    ) -> Result<impl Stream<Item = Result<crate::MilestoneAndProtocolParameters, Error>>, Error> {
36        Ok(self
37            .inx
38            .listen_to_confirmed_milestones(proto::MilestoneRangeRequest::from(request))
39            .await?
40            .into_inner()
41            .map(unpack_proto_msg))
42    }
43
44    pub async fn listen_to_ledger_updates(
45        &mut self,
46        request: MilestoneRangeRequest,
47    ) -> Result<impl Stream<Item = Result<crate::LedgerUpdate, Error>>, Error> {
48        Ok(self
49            .inx
50            .listen_to_ledger_updates(proto::MilestoneRangeRequest::from(request))
51            .await?
52            .into_inner()
53            .map(unpack_proto_msg))
54    }
55
56    pub async fn read_node_status(&mut self) -> Result<NodeStatus, Error> {
57        NodeStatus::try_from(self.inx.read_node_status(proto::NoParams {}).await?.into_inner()).map_err(Error::InxError)
58    }
59
60    pub async fn read_node_configuration(&mut self) -> Result<NodeConfiguration, Error> {
61        NodeConfiguration::try_from(self.inx.read_node_configuration(proto::NoParams {}).await?.into_inner())
62            .map_err(Error::InxError)
63    }
64
65    pub async fn read_unspent_outputs(
66        &mut self,
67    ) -> Result<impl Stream<Item = Result<crate::UnspentOutput, Error>>, Error> {
68        Ok(self
69            .inx
70            .read_unspent_outputs(proto::NoParams {})
71            .await?
72            .into_inner()
73            .map(unpack_proto_msg))
74    }
75
76    pub async fn read_protocol_parameters(
77        &mut self,
78        request: MilestoneRequest,
79    ) -> Result<crate::ProtocolParameters, Error> {
80        Ok(self
81            .inx
82            .read_protocol_parameters(proto::MilestoneRequest::from(request))
83            .await?
84            .into_inner()
85            .into())
86    }
87
88    /// Reads the past cone of a milestone specified by a [`MilestoneRequest`].
89    pub async fn read_milestone_cone(
90        &mut self,
91        request: MilestoneRequest,
92    ) -> Result<impl Stream<Item = Result<crate::BlockWithMetadata, Error>>, Error> {
93        Ok(self
94            .inx
95            .read_milestone_cone(proto::MilestoneRequest::from(request))
96            .await?
97            .into_inner()
98            .map(unpack_proto_msg))
99    }
100
101    pub async fn read_milestone(&mut self, request: MilestoneRequest) -> Result<Milestone, Error> {
102        Milestone::try_from(
103            self.inx
104                .read_milestone(proto::MilestoneRequest::from(request))
105                .await?
106                .into_inner(),
107        )
108        .map_err(Error::InxError)
109    }
110}