daml_grpc/service/
daml_transaction_service.rs

1use std::convert::TryFrom;
2use std::fmt::Debug;
3
4use futures::Stream;
5use futures::StreamExt;
6use tonic::transport::Channel;
7use tracing::{instrument, trace};
8
9use crate::data::filter::DamlTransactionFilter;
10use crate::data::offset::{DamlLedgerOffset, DamlLedgerOffsetType};
11use crate::data::DamlError;
12use crate::data::DamlResult;
13use crate::data::DamlTransaction;
14use crate::data::DamlTransactionTree;
15use crate::grpc_protobuf::com::daml::ledger::api::v1::transaction_service_client::TransactionServiceClient;
16use crate::grpc_protobuf::com::daml::ledger::api::v1::{
17    GetLedgerEndRequest, GetTransactionByEventIdRequest, GetTransactionByIdRequest, GetTransactionTreesResponse,
18    GetTransactionsRequest, GetTransactionsResponse, LedgerOffset, TransactionFilter,
19};
20use crate::service::common::make_request;
21use crate::service::DamlVerbosity;
22use crate::util::Required;
23
24/// Read transactions from a Daml ledger.
25#[derive(Debug)]
26pub struct DamlTransactionService<'a> {
27    channel: Channel,
28    ledger_id: &'a str,
29    auth_token: Option<&'a str>,
30}
31
32impl<'a> DamlTransactionService<'a> {
33    pub fn new(channel: Channel, ledger_id: &'a str, auth_token: Option<&'a str>) -> Self {
34        Self {
35            channel,
36            ledger_id,
37            auth_token,
38        }
39    }
40
41    /// Override the JWT token to use for this service.
42    pub fn with_token(self, auth_token: &'a str) -> Self {
43        Self {
44            auth_token: Some(auth_token),
45            ..self
46        }
47    }
48
49    /// Override the ledger id to use for this service.
50    pub fn with_ledger_id(self, ledger_id: &'a str) -> Self {
51        Self {
52            ledger_id,
53            ..self
54        }
55    }
56
57    /// DOCME fully document this
58    #[instrument(skip(self))]
59    pub async fn get_transactions(
60        &self,
61        begin: impl Into<DamlLedgerOffset> + Debug,
62        end: impl Into<DamlLedgerOffsetType> + Debug,
63        filter: impl Into<DamlTransactionFilter> + Debug,
64        verbose: impl Into<DamlVerbosity> + Debug,
65    ) -> DamlResult<impl Stream<Item = DamlResult<Vec<DamlTransaction>>>> {
66        let payload = self.make_transactions_payload(begin, end, filter, verbose);
67        trace!(payload = ?payload, token = ?self.auth_token);
68        let transaction_stream =
69            self.client().get_transactions(make_request(payload, self.auth_token)?).await?.into_inner();
70        Ok(transaction_stream.inspect(|response| trace!(?response)).map(|item: Result<GetTransactionsResponse, _>| {
71            match item {
72                Ok(r) => Ok(r
73                    .transactions
74                    .into_iter()
75                    .map(DamlTransaction::try_from)
76                    .collect::<DamlResult<Vec<DamlTransaction>>>()?),
77                Err(e) => Err(DamlError::from(e)),
78            }
79        }))
80    }
81
82    /// DOCME fully document this
83    #[instrument(skip(self))]
84    pub async fn get_transaction_trees(
85        &self,
86        begin: impl Into<DamlLedgerOffset> + Debug,
87        end: impl Into<DamlLedgerOffsetType> + Debug,
88        filter: impl Into<DamlTransactionFilter> + Debug,
89        verbose: impl Into<DamlVerbosity> + Debug,
90    ) -> DamlResult<impl Stream<Item = DamlResult<Vec<DamlTransactionTree>>>> {
91        let payload = self.make_transactions_payload(begin, end, filter, verbose);
92        trace!(payload = ?payload, token = ?self.auth_token);
93        let transaction_stream =
94            self.client().get_transaction_trees(make_request(payload, self.auth_token)?).await?.into_inner();
95        Ok(transaction_stream.inspect(|response| trace!(?response)).map(
96            |item: Result<GetTransactionTreesResponse, _>| match item {
97                Ok(r) => Ok(r
98                    .transactions
99                    .into_iter()
100                    .map(DamlTransactionTree::try_from)
101                    .collect::<DamlResult<Vec<DamlTransactionTree>>>()?),
102                Err(e) => Err(DamlError::from(e)),
103            },
104        ))
105    }
106
107    /// DOCME fully document this
108    #[instrument(skip(self))]
109    pub async fn get_transaction_by_event_id(
110        &self,
111        event_id: impl Into<String> + Debug,
112        parties: impl Into<Vec<String>> + Debug,
113    ) -> DamlResult<DamlTransactionTree> {
114        let payload = self.make_by_event_id_payload(event_id, parties);
115        trace!(payload = ?payload, token = ?self.auth_token);
116        let response =
117            self.client().get_transaction_by_event_id(make_request(payload, self.auth_token)?).await?.into_inner();
118        trace!(?response);
119        DamlTransactionTree::try_from(response.transaction.req()?)
120    }
121
122    /// DOCME fully document this
123    #[instrument(skip(self))]
124    pub async fn get_transaction_by_id(
125        &self,
126        transaction_id: impl Into<String> + Debug,
127        parties: impl Into<Vec<String>> + Debug,
128    ) -> DamlResult<DamlTransactionTree> {
129        let payload = self.make_by_id_payload(transaction_id, parties);
130        trace!(payload = ?payload, token = ?self.auth_token);
131        let response = self.client().get_transaction_by_id(make_request(payload, self.auth_token)?).await?.into_inner();
132        trace!(?response);
133        DamlTransactionTree::try_from(response.transaction.req()?)
134    }
135
136    /// DOCME fully document this
137    #[instrument(skip(self))]
138    pub async fn get_flat_transaction_by_event_id(
139        &self,
140        event_id: impl Into<String> + Debug,
141        parties: impl Into<Vec<String>> + Debug,
142    ) -> DamlResult<DamlTransaction> {
143        let payload = self.make_by_event_id_payload(event_id, parties);
144        trace!(payload = ?payload, token = ?self.auth_token);
145        let response =
146            self.client().get_flat_transaction_by_event_id(make_request(payload, self.auth_token)?).await?.into_inner();
147        trace!(?response);
148        DamlTransaction::try_from(response.transaction.req()?)
149    }
150
151    /// DOCME fully document this
152    #[instrument(skip(self))]
153    pub async fn get_flat_transaction_by_id(
154        &self,
155        transaction_id: impl Into<String> + Debug,
156        parties: impl Into<Vec<String>> + Debug,
157    ) -> DamlResult<DamlTransaction> {
158        let payload = self.make_by_id_payload(transaction_id, parties);
159        trace!(payload = ?payload, token = ?self.auth_token);
160        let response =
161            self.client().get_flat_transaction_by_id(make_request(payload, self.auth_token)?).await?.into_inner();
162        trace!(?response);
163        DamlTransaction::try_from(response.transaction.req()?)
164    }
165
166    /// DOCME fully document this
167    #[instrument(skip(self))]
168    pub async fn get_ledger_end(&self) -> DamlResult<DamlLedgerOffset> {
169        let payload = GetLedgerEndRequest {
170            ledger_id: self.ledger_id.to_string(),
171        };
172        trace!(payload = ?payload, token = ?self.auth_token);
173        let response = self.client().get_ledger_end(make_request(payload, self.auth_token)?).await?.into_inner();
174        trace!(?response);
175        DamlLedgerOffset::try_from(response.offset.req()?)
176    }
177
178    fn client(&self) -> TransactionServiceClient<Channel> {
179        TransactionServiceClient::new(self.channel.clone())
180    }
181
182    fn make_transactions_payload(
183        &self,
184        begin: impl Into<DamlLedgerOffset>,
185        end: impl Into<DamlLedgerOffsetType>,
186        filter: impl Into<DamlTransactionFilter>,
187        verbose: impl Into<DamlVerbosity>,
188    ) -> GetTransactionsRequest {
189        GetTransactionsRequest {
190            ledger_id: self.ledger_id.to_string(),
191            begin: Some(LedgerOffset::from(begin.into())),
192            end: match end.into() {
193                DamlLedgerOffsetType::Unbounded => None,
194                DamlLedgerOffsetType::Bounded(b) => Some(LedgerOffset::from(b)),
195            },
196            filter: Some(TransactionFilter::from(filter.into())),
197            verbose: bool::from(verbose.into()),
198        }
199    }
200
201    fn make_by_event_id_payload(
202        &self,
203        event_id: impl Into<String>,
204        parties: impl Into<Vec<String>>,
205    ) -> GetTransactionByEventIdRequest {
206        GetTransactionByEventIdRequest {
207            ledger_id: self.ledger_id.to_string(),
208            event_id: event_id.into(),
209            requesting_parties: parties.into(),
210        }
211    }
212
213    fn make_by_id_payload(
214        &self,
215        transaction_id: impl Into<String>,
216        parties: impl Into<Vec<String>>,
217    ) -> GetTransactionByIdRequest {
218        GetTransactionByIdRequest {
219            ledger_id: self.ledger_id.to_string(),
220            transaction_id: transaction_id.into(),
221            requesting_parties: parties.into(),
222        }
223    }
224}