1use std::convert::TryFrom;
2use std::fmt::Debug;
3
4use futures::Stream;
5use futures::StreamExt;
6use tonic::transport::Channel;
7use tracing::{instrument, trace};
8
9use crate::data::filter::DamlTransactionFilter;
10use crate::data::offset::{DamlLedgerOffset, DamlLedgerOffsetType};
11use crate::data::DamlError;
12use crate::data::DamlResult;
13use crate::data::DamlTransaction;
14use crate::data::DamlTransactionTree;
15use crate::grpc_protobuf::com::daml::ledger::api::v1::transaction_service_client::TransactionServiceClient;
16use crate::grpc_protobuf::com::daml::ledger::api::v1::{
17 GetLedgerEndRequest, GetTransactionByEventIdRequest, GetTransactionByIdRequest, GetTransactionTreesResponse,
18 GetTransactionsRequest, GetTransactionsResponse, LedgerOffset, TransactionFilter,
19};
20use crate::service::common::make_request;
21use crate::service::DamlVerbosity;
22use crate::util::Required;
23
24#[derive(Debug)]
26pub struct DamlTransactionService<'a> {
27 channel: Channel,
28 ledger_id: &'a str,
29 auth_token: Option<&'a str>,
30}
31
32impl<'a> DamlTransactionService<'a> {
33 pub fn new(channel: Channel, ledger_id: &'a str, auth_token: Option<&'a str>) -> Self {
34 Self {
35 channel,
36 ledger_id,
37 auth_token,
38 }
39 }
40
41 pub fn with_token(self, auth_token: &'a str) -> Self {
43 Self {
44 auth_token: Some(auth_token),
45 ..self
46 }
47 }
48
49 pub fn with_ledger_id(self, ledger_id: &'a str) -> Self {
51 Self {
52 ledger_id,
53 ..self
54 }
55 }
56
57 #[instrument(skip(self))]
59 pub async fn get_transactions(
60 &self,
61 begin: impl Into<DamlLedgerOffset> + Debug,
62 end: impl Into<DamlLedgerOffsetType> + Debug,
63 filter: impl Into<DamlTransactionFilter> + Debug,
64 verbose: impl Into<DamlVerbosity> + Debug,
65 ) -> DamlResult<impl Stream<Item = DamlResult<Vec<DamlTransaction>>>> {
66 let payload = self.make_transactions_payload(begin, end, filter, verbose);
67 trace!(payload = ?payload, token = ?self.auth_token);
68 let transaction_stream =
69 self.client().get_transactions(make_request(payload, self.auth_token)?).await?.into_inner();
70 Ok(transaction_stream.inspect(|response| trace!(?response)).map(|item: Result<GetTransactionsResponse, _>| {
71 match item {
72 Ok(r) => Ok(r
73 .transactions
74 .into_iter()
75 .map(DamlTransaction::try_from)
76 .collect::<DamlResult<Vec<DamlTransaction>>>()?),
77 Err(e) => Err(DamlError::from(e)),
78 }
79 }))
80 }
81
82 #[instrument(skip(self))]
84 pub async fn get_transaction_trees(
85 &self,
86 begin: impl Into<DamlLedgerOffset> + Debug,
87 end: impl Into<DamlLedgerOffsetType> + Debug,
88 filter: impl Into<DamlTransactionFilter> + Debug,
89 verbose: impl Into<DamlVerbosity> + Debug,
90 ) -> DamlResult<impl Stream<Item = DamlResult<Vec<DamlTransactionTree>>>> {
91 let payload = self.make_transactions_payload(begin, end, filter, verbose);
92 trace!(payload = ?payload, token = ?self.auth_token);
93 let transaction_stream =
94 self.client().get_transaction_trees(make_request(payload, self.auth_token)?).await?.into_inner();
95 Ok(transaction_stream.inspect(|response| trace!(?response)).map(
96 |item: Result<GetTransactionTreesResponse, _>| match item {
97 Ok(r) => Ok(r
98 .transactions
99 .into_iter()
100 .map(DamlTransactionTree::try_from)
101 .collect::<DamlResult<Vec<DamlTransactionTree>>>()?),
102 Err(e) => Err(DamlError::from(e)),
103 },
104 ))
105 }
106
107 #[instrument(skip(self))]
109 pub async fn get_transaction_by_event_id(
110 &self,
111 event_id: impl Into<String> + Debug,
112 parties: impl Into<Vec<String>> + Debug,
113 ) -> DamlResult<DamlTransactionTree> {
114 let payload = self.make_by_event_id_payload(event_id, parties);
115 trace!(payload = ?payload, token = ?self.auth_token);
116 let response =
117 self.client().get_transaction_by_event_id(make_request(payload, self.auth_token)?).await?.into_inner();
118 trace!(?response);
119 DamlTransactionTree::try_from(response.transaction.req()?)
120 }
121
122 #[instrument(skip(self))]
124 pub async fn get_transaction_by_id(
125 &self,
126 transaction_id: impl Into<String> + Debug,
127 parties: impl Into<Vec<String>> + Debug,
128 ) -> DamlResult<DamlTransactionTree> {
129 let payload = self.make_by_id_payload(transaction_id, parties);
130 trace!(payload = ?payload, token = ?self.auth_token);
131 let response = self.client().get_transaction_by_id(make_request(payload, self.auth_token)?).await?.into_inner();
132 trace!(?response);
133 DamlTransactionTree::try_from(response.transaction.req()?)
134 }
135
136 #[instrument(skip(self))]
138 pub async fn get_flat_transaction_by_event_id(
139 &self,
140 event_id: impl Into<String> + Debug,
141 parties: impl Into<Vec<String>> + Debug,
142 ) -> DamlResult<DamlTransaction> {
143 let payload = self.make_by_event_id_payload(event_id, parties);
144 trace!(payload = ?payload, token = ?self.auth_token);
145 let response =
146 self.client().get_flat_transaction_by_event_id(make_request(payload, self.auth_token)?).await?.into_inner();
147 trace!(?response);
148 DamlTransaction::try_from(response.transaction.req()?)
149 }
150
151 #[instrument(skip(self))]
153 pub async fn get_flat_transaction_by_id(
154 &self,
155 transaction_id: impl Into<String> + Debug,
156 parties: impl Into<Vec<String>> + Debug,
157 ) -> DamlResult<DamlTransaction> {
158 let payload = self.make_by_id_payload(transaction_id, parties);
159 trace!(payload = ?payload, token = ?self.auth_token);
160 let response =
161 self.client().get_flat_transaction_by_id(make_request(payload, self.auth_token)?).await?.into_inner();
162 trace!(?response);
163 DamlTransaction::try_from(response.transaction.req()?)
164 }
165
166 #[instrument(skip(self))]
168 pub async fn get_ledger_end(&self) -> DamlResult<DamlLedgerOffset> {
169 let payload = GetLedgerEndRequest {
170 ledger_id: self.ledger_id.to_string(),
171 };
172 trace!(payload = ?payload, token = ?self.auth_token);
173 let response = self.client().get_ledger_end(make_request(payload, self.auth_token)?).await?.into_inner();
174 trace!(?response);
175 DamlLedgerOffset::try_from(response.offset.req()?)
176 }
177
178 fn client(&self) -> TransactionServiceClient<Channel> {
179 TransactionServiceClient::new(self.channel.clone())
180 }
181
182 fn make_transactions_payload(
183 &self,
184 begin: impl Into<DamlLedgerOffset>,
185 end: impl Into<DamlLedgerOffsetType>,
186 filter: impl Into<DamlTransactionFilter>,
187 verbose: impl Into<DamlVerbosity>,
188 ) -> GetTransactionsRequest {
189 GetTransactionsRequest {
190 ledger_id: self.ledger_id.to_string(),
191 begin: Some(LedgerOffset::from(begin.into())),
192 end: match end.into() {
193 DamlLedgerOffsetType::Unbounded => None,
194 DamlLedgerOffsetType::Bounded(b) => Some(LedgerOffset::from(b)),
195 },
196 filter: Some(TransactionFilter::from(filter.into())),
197 verbose: bool::from(verbose.into()),
198 }
199 }
200
201 fn make_by_event_id_payload(
202 &self,
203 event_id: impl Into<String>,
204 parties: impl Into<Vec<String>>,
205 ) -> GetTransactionByEventIdRequest {
206 GetTransactionByEventIdRequest {
207 ledger_id: self.ledger_id.to_string(),
208 event_id: event_id.into(),
209 requesting_parties: parties.into(),
210 }
211 }
212
213 fn make_by_id_payload(
214 &self,
215 transaction_id: impl Into<String>,
216 parties: impl Into<Vec<String>>,
217 ) -> GetTransactionByIdRequest {
218 GetTransactionByIdRequest {
219 ledger_id: self.ledger_id.to_string(),
220 transaction_id: transaction_id.into(),
221 requesting_parties: parties.into(),
222 }
223 }
224}