daml_grpc/service/
daml_active_contracts_service.rs

1use std::convert::TryFrom;
2use std::fmt::Debug;
3
4use futures::Stream;
5use futures::StreamExt;
6use tonic::transport::Channel;
7use tracing::{instrument, trace};
8
9use crate::data::filter::DamlTransactionFilter;
10use crate::data::DamlActiveContracts;
11use crate::data::DamlError;
12use crate::data::DamlResult;
13use crate::grpc_protobuf::com::daml::ledger::api::v1::active_contracts_service_client::ActiveContractsServiceClient;
14use crate::grpc_protobuf::com::daml::ledger::api::v1::{GetActiveContractsRequest, TransactionFilter};
15use crate::service::common::make_request;
16use crate::service::DamlVerbosity;
17
18/// Returns a stream of the active contracts on a Daml ledger.
19///
20/// Allows clients to initialize themselves according to a fairly recent state of the ledger without reading through
21/// all transactions that were committed since the ledger’s creation.
22///
23/// Getting an empty stream means that the active contracts set is empty and the client should listen to transactions
24/// using [`DamlLedgerOffsetBoundary::Begin`].  Clients SHOULD NOT assume that the set of active contracts they receive
25/// reflects the state at the ledger end.
26///
27/// [`DamlLedgerOffsetBoundary::Begin`]: crate::data::offset::DamlLedgerOffsetBoundary::Begin
28#[derive(Debug)]
29pub struct DamlActiveContractsService<'a> {
30    channel: Channel,
31    ledger_id: &'a str,
32    auth_token: Option<&'a str>,
33}
34
35impl<'a> DamlActiveContractsService<'a> {
36    pub fn new(channel: Channel, ledger_id: &'a str, auth_token: Option<&'a str>) -> Self {
37        Self {
38            channel,
39            ledger_id,
40            auth_token,
41        }
42    }
43
44    /// Override the JWT token to use for this service.
45    pub fn with_token(self, auth_token: &'a str) -> Self {
46        Self {
47            auth_token: Some(auth_token),
48            ..self
49        }
50    }
51
52    /// Override the ledger id to use for this service.
53    pub fn with_ledger_id(self, ledger_id: &'a str) -> Self {
54        Self {
55            ledger_id,
56            ..self
57        }
58    }
59
60    #[instrument(skip(self))]
61    pub async fn get_active_contracts(
62        &self,
63        filter: impl Into<DamlTransactionFilter> + Debug,
64        verbose: impl Into<DamlVerbosity> + Debug,
65    ) -> DamlResult<impl Stream<Item = DamlResult<DamlActiveContracts>>> {
66        let payload = GetActiveContractsRequest {
67            ledger_id: self.ledger_id.to_string(),
68            filter: Some(TransactionFilter::from(filter.into())),
69            verbose: bool::from(verbose.into()),
70        };
71        trace!(payload = ?payload, token = ?self.auth_token);
72        let active_contract_stream =
73            self.client().get_active_contracts(make_request(payload, self.auth_token)?).await?.into_inner();
74        Ok(active_contract_stream.inspect(|response| trace!(?response)).map(|response| match response {
75            Ok(c) => DamlActiveContracts::try_from(c),
76            Err(e) => Err(DamlError::from(e)),
77        }))
78    }
79
80    fn client(&self) -> ActiveContractsServiceClient<Channel> {
81        ActiveContractsServiceClient::new(self.channel.clone())
82    }
83}