daml_grpc/service/
daml_command_service.rs

1use std::convert::TryFrom;
2use std::fmt::Debug;
3
4use tonic::transport::Channel;
5use tracing::{instrument, trace};
6
7use crate::data::DamlResult;
8use crate::data::{DamlCommands, DamlTransaction, DamlTransactionTree};
9use crate::grpc_protobuf::com::daml::ledger::api::v1::command_service_client::CommandServiceClient;
10use crate::grpc_protobuf::com::daml::ledger::api::v1::{Commands, SubmitAndWaitRequest};
11use crate::service::common::make_request;
12use crate::util::Required;
13
14/// Submit commands to a Daml ledger and await the completion.
15///
16/// The Command Service is able to correlate submitted commands with completion data, identify timeouts, and return
17/// contextual information with each tracking result. This supports the implementation of stateless clients.
18#[derive(Debug)]
19pub struct DamlCommandService<'a> {
20    channel: Channel,
21    ledger_id: &'a str,
22    auth_token: Option<&'a str>,
23}
24
25impl<'a> DamlCommandService<'a> {
26    /// Create a `DamlCommandService` for a given GRPC `channel` and `ledger_id`.
27    pub fn new(channel: Channel, ledger_id: &'a str, auth_token: Option<&'a str>) -> Self {
28        Self {
29            channel,
30            ledger_id,
31            auth_token,
32        }
33    }
34
35    /// Override the JWT token to use for this service.
36    pub fn with_token(self, auth_token: &'a str) -> Self {
37        Self {
38            auth_token: Some(auth_token),
39            ..self
40        }
41    }
42
43    /// Override the ledger id to use for this service.
44    pub fn with_ledger_id(self, ledger_id: &'a str) -> Self {
45        Self {
46            ledger_id,
47            ..self
48        }
49    }
50
51    /// Submits a composite [`DamlCommands`] and await the completion.
52    ///
53    /// This method executes `commands` _synchronously_ on the ledger server (unlike the
54    /// [`DamlCommandSubmissionService`] which is executed _asynchronously_ on the ledger server).  This service only
55    /// waits for the completion of the execution of the command, not the propagation of any resulting events which
56    /// must be consumed via the [`DamlTransactionService`].
57    ///
58    /// Note that this method is executed _asynchronously_ on the _client_ side and so will immediately return a
59    /// future which must be driven to completion before a result can be observed.
60    ///
61    /// # Errors
62    ///
63    /// Propagates communication failure errors as [`GrpcTransportError`] and Daml server failures as
64    /// [`GRPCStatusError`] errors.
65    ///
66    /// # Examples
67    ///
68    /// ```no_run
69    /// # use futures::future::Future;
70    /// # use chrono::Utc;
71    /// # use daml_grpc::data::DamlCommands;
72    /// # use daml_grpc::DamlGrpcClientBuilder;
73    /// # use daml_grpc::data::DamlResult;
74    /// # use std::error::Error;
75    /// # fn main() -> DamlResult<()> {
76    /// # futures::executor::block_on(async {
77    /// let ledger_client = DamlGrpcClientBuilder::uri("http://127.0.0.1").connect().await?;
78    /// # let commands: DamlCommands = DamlCommands::new("", "", "", "", "", vec![], vec![], vec![], None, None);
79    /// let future_command = ledger_client.command_service().submit_and_wait(commands).await;
80    /// match future_command {
81    ///     Ok(command_id) => assert_eq!("1234", command_id),
82    ///     Err(e) => panic!("submit_and_wait failed, error was {}", e.to_string()),
83    /// }
84    /// # Ok(())
85    /// # })
86    /// # }
87    /// ```
88    /// [`DamlCommands`]: crate::data::DamlCommands
89    /// [`DamlCommandSubmissionService`]: crate::service::DamlCommandSubmissionService
90    /// [`DamlTransactionService`]: crate::service::DamlTransactionService
91    /// [`submit_and_wait`]: DamlCommandService::submit_and_wait
92    /// [`GrpcTransportError`]: crate::data::DamlError::GrpcTransportError
93    /// [`GrpcStatusError`]: crate::data::DamlError::GrpcStatusError
94    #[instrument(skip(self))]
95    pub async fn submit_and_wait(&self, commands: impl Into<DamlCommands> + Debug) -> DamlResult<String> {
96        let commands = commands.into();
97        let command_id = commands.command_id().to_owned();
98        let payload = self.make_payload(commands)?;
99        trace!(payload = ?payload, token = ?self.auth_token);
100        self.client().submit_and_wait(make_request(payload, self.auth_token)?).await?;
101        Ok(command_id)
102    }
103
104    /// Submits a composite [`DamlCommands`] and returns the resulting transaction id.
105    ///
106    /// DOCME fully document this
107    /// TODO ugly API returning a tuple as `completion_offset` was recently added, refactor
108    #[instrument(skip(self))]
109    pub async fn submit_and_wait_for_transaction_id(
110        &self,
111        commands: impl Into<DamlCommands> + Debug,
112    ) -> DamlResult<(String, String)> {
113        let payload = self.make_payload(commands)?;
114        trace!(payload = ?payload, token = ?self.auth_token);
115        let response = self
116            .client()
117            .submit_and_wait_for_transaction_id(make_request(payload, self.auth_token)?)
118            .await?
119            .into_inner();
120        trace!(?response);
121        Ok((response.transaction_id, response.completion_offset))
122    }
123
124    /// Submits a composite [`DamlCommands`] and returns the resulting [`DamlTransaction`].
125    ///
126    /// DOCME fully document this
127    /// TODO ugly API returning a tuple as `completion_offset` was recently added, refactor
128    #[instrument(skip(self))]
129    pub async fn submit_and_wait_for_transaction(
130        &self,
131        commands: impl Into<DamlCommands> + Debug,
132    ) -> DamlResult<(DamlTransaction, String)> {
133        let payload = self.make_payload(commands)?;
134        trace!(payload = ?payload, token = ?self.auth_token);
135        let response =
136            self.client().submit_and_wait_for_transaction(make_request(payload, self.auth_token)?).await?.into_inner();
137        trace!(?response);
138        response.transaction.req().and_then(DamlTransaction::try_from).map(|tree| (tree, response.completion_offset))
139    }
140
141    /// Submits a composite [`DamlCommands`] and returns the resulting [`DamlTransactionTree`].
142    /// DOCME fully document this
143    /// TODO ugly API returning a tuple as `completion_offset` was recently added, refactor
144    #[instrument(skip(self))]
145    pub async fn submit_and_wait_for_transaction_tree(
146        &self,
147        commands: impl Into<DamlCommands> + Debug,
148    ) -> DamlResult<(DamlTransactionTree, String)> {
149        let payload = self.make_payload(commands)?;
150        trace!(payload = ?payload, token = ?self.auth_token);
151        let response = self
152            .client()
153            .submit_and_wait_for_transaction_tree(make_request(payload, self.auth_token)?)
154            .await?
155            .into_inner();
156        trace!(?response);
157        response
158            .transaction
159            .req()
160            .and_then(DamlTransactionTree::try_from)
161            .map(|tree| (tree, response.completion_offset))
162    }
163
164    fn client(&self) -> CommandServiceClient<Channel> {
165        CommandServiceClient::new(self.channel.clone())
166    }
167
168    fn make_payload(&self, commands: impl Into<DamlCommands>) -> DamlResult<SubmitAndWaitRequest> {
169        let mut commands = Commands::try_from(commands.into())?;
170        commands.ledger_id = self.ledger_id.to_string();
171        Ok(SubmitAndWaitRequest {
172            commands: Some(commands),
173        })
174    }
175}