1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Copyright 2022 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use futures::stream::{Stream, StreamExt};
use inx::{proto, proto::inx_client::InxClient, tonic};

use crate::{Error, Milestone, MilestoneRangeRequest, MilestoneRequest, NodeConfiguration, NodeStatus};

/// An INX client connection.
#[derive(Clone, Debug)]
pub struct Inx {
    inx: InxClient<inx::tonic::Channel>,
}

fn unpack_proto_msg<Proto, Bee>(msg: Result<Proto, tonic::Status>) -> Result<Bee, Error>
where
    Bee: TryFrom<Proto, Error = bee_block::InxError>,
{
    let inner = msg.map_err(Error::StatusCode)?;
    Bee::try_from(inner).map_err(Error::InxError)
}

impl Inx {
    /// Connect to the INX interface of a node.
    pub async fn connect(address: String) -> Result<Self, Error> {
        Ok(Self {
            inx: InxClient::connect(address).await?,
        })
    }

    /// Listens to confirmed milestones in the range of
    pub async fn listen_to_confirmed_milestones(
        &mut self,
        request: MilestoneRangeRequest,
    ) -> Result<impl Stream<Item = Result<crate::MilestoneAndProtocolParameters, Error>>, Error> {
        Ok(self
            .inx
            .listen_to_confirmed_milestones(proto::MilestoneRangeRequest::from(request))
            .await?
            .into_inner()
            .map(unpack_proto_msg))
    }

    pub async fn listen_to_ledger_updates(
        &mut self,
        request: MilestoneRangeRequest,
    ) -> Result<impl Stream<Item = Result<crate::LedgerUpdate, Error>>, Error> {
        Ok(self
            .inx
            .listen_to_ledger_updates(proto::MilestoneRangeRequest::from(request))
            .await?
            .into_inner()
            .map(unpack_proto_msg))
    }

    pub async fn read_node_status(&mut self) -> Result<NodeStatus, Error> {
        NodeStatus::try_from(self.inx.read_node_status(proto::NoParams {}).await?.into_inner()).map_err(Error::InxError)
    }

    pub async fn read_node_configuration(&mut self) -> Result<NodeConfiguration, Error> {
        NodeConfiguration::try_from(self.inx.read_node_configuration(proto::NoParams {}).await?.into_inner())
            .map_err(Error::InxError)
    }

    pub async fn read_unspent_outputs(
        &mut self,
    ) -> Result<impl Stream<Item = Result<crate::UnspentOutput, Error>>, Error> {
        Ok(self
            .inx
            .read_unspent_outputs(proto::NoParams {})
            .await?
            .into_inner()
            .map(unpack_proto_msg))
    }

    pub async fn read_protocol_parameters(
        &mut self,
        request: MilestoneRequest,
    ) -> Result<crate::RawProtocolParameters, Error> {
        Ok(self
            .inx
            .read_protocol_parameters(proto::MilestoneRequest::from(request))
            .await?
            .into_inner()
            .into())
    }

    /// Reads the past cone of a milestone specified by a [`MilestoneRequest`].
    pub async fn read_milestone_cone(
        &mut self,
        request: MilestoneRequest,
    ) -> Result<impl Stream<Item = Result<crate::BlockWithMetadata, Error>>, Error> {
        Ok(self
            .inx
            .read_milestone_cone(proto::MilestoneRequest::from(request))
            .await?
            .into_inner()
            .map(unpack_proto_msg))
    }

    pub async fn read_milestone(&mut self, request: MilestoneRequest) -> Result<Milestone, Error> {
        Milestone::try_from(
            self.inx
                .read_milestone(proto::MilestoneRequest::from(request))
                .await?
                .into_inner(),
        )
        .map_err(Error::InxError)
    }
}