1use futures::stream::{Stream, StreamExt};
5use inx::{proto, proto::inx_client::InxClient, tonic};
6
7use crate::{Error, Milestone, MilestoneRangeRequest, MilestoneRequest, NodeConfiguration, NodeStatus};
8
9#[derive(Clone, Debug)]
11pub struct Inx {
12 inx: InxClient<inx::tonic::transport::Channel>,
13}
14
15fn unpack_proto_msg<Proto, Bee>(msg: Result<Proto, tonic::Status>) -> Result<Bee, Error>
16where
17 Bee: TryFrom<Proto, Error = bee_block::InxError>,
18{
19 let inner = msg.map_err(Error::StatusCode)?;
20 Bee::try_from(inner).map_err(Error::InxError)
21}
22
23impl Inx {
24 pub async fn connect(address: String) -> Result<Self, Error> {
26 Ok(Self {
27 inx: InxClient::connect(address).await?,
28 })
29 }
30
31 pub async fn listen_to_confirmed_milestones(
33 &mut self,
34 request: MilestoneRangeRequest,
35 ) -> Result<impl Stream<Item = Result<crate::MilestoneAndProtocolParameters, Error>>, Error> {
36 Ok(self
37 .inx
38 .listen_to_confirmed_milestones(proto::MilestoneRangeRequest::from(request))
39 .await?
40 .into_inner()
41 .map(unpack_proto_msg))
42 }
43
44 pub async fn listen_to_ledger_updates(
45 &mut self,
46 request: MilestoneRangeRequest,
47 ) -> Result<impl Stream<Item = Result<crate::LedgerUpdate, Error>>, Error> {
48 Ok(self
49 .inx
50 .listen_to_ledger_updates(proto::MilestoneRangeRequest::from(request))
51 .await?
52 .into_inner()
53 .map(unpack_proto_msg))
54 }
55
56 pub async fn read_node_status(&mut self) -> Result<NodeStatus, Error> {
57 NodeStatus::try_from(self.inx.read_node_status(proto::NoParams {}).await?.into_inner()).map_err(Error::InxError)
58 }
59
60 pub async fn read_node_configuration(&mut self) -> Result<NodeConfiguration, Error> {
61 NodeConfiguration::try_from(self.inx.read_node_configuration(proto::NoParams {}).await?.into_inner())
62 .map_err(Error::InxError)
63 }
64
65 pub async fn read_unspent_outputs(
66 &mut self,
67 ) -> Result<impl Stream<Item = Result<crate::UnspentOutput, Error>>, Error> {
68 Ok(self
69 .inx
70 .read_unspent_outputs(proto::NoParams {})
71 .await?
72 .into_inner()
73 .map(unpack_proto_msg))
74 }
75
76 pub async fn read_protocol_parameters(
77 &mut self,
78 request: MilestoneRequest,
79 ) -> Result<crate::ProtocolParameters, Error> {
80 Ok(self
81 .inx
82 .read_protocol_parameters(proto::MilestoneRequest::from(request))
83 .await?
84 .into_inner()
85 .into())
86 }
87
88 pub async fn read_milestone_cone(
90 &mut self,
91 request: MilestoneRequest,
92 ) -> Result<impl Stream<Item = Result<crate::BlockWithMetadata, Error>>, Error> {
93 Ok(self
94 .inx
95 .read_milestone_cone(proto::MilestoneRequest::from(request))
96 .await?
97 .into_inner()
98 .map(unpack_proto_msg))
99 }
100
101 pub async fn read_milestone(&mut self, request: MilestoneRequest) -> Result<Milestone, Error> {
102 Milestone::try_from(
103 self.inx
104 .read_milestone(proto::MilestoneRequest::from(request))
105 .await?
106 .into_inner(),
107 )
108 .map_err(Error::InxError)
109 }
110}