// daml_grpc/service/daml_command_service.rs
1use std::convert::TryFrom;
2use std::fmt::Debug;
3
4use tonic::transport::Channel;
5use tracing::{instrument, trace};
6
7use crate::data::DamlResult;
8use crate::data::{DamlCommands, DamlTransaction, DamlTransactionTree};
9use crate::grpc_protobuf::com::daml::ledger::api::v1::command_service_client::CommandServiceClient;
10use crate::grpc_protobuf::com::daml::ledger::api::v1::{Commands, SubmitAndWaitRequest};
11use crate::service::common::make_request;
12use crate::util::Required;
13
14/// Submit commands to a Daml ledger and await the completion.
15///
16/// The Command Service is able to correlate submitted commands with completion data, identify timeouts, and return
17/// contextual information with each tracking result. This supports the implementation of stateless clients.
18#[derive(Debug)]
19pub struct DamlCommandService<'a> {
20 channel: Channel,
21 ledger_id: &'a str,
22 auth_token: Option<&'a str>,
23}
24
25impl<'a> DamlCommandService<'a> {
26 /// Create a `DamlCommandService` for a given GRPC `channel` and `ledger_id`.
27 pub fn new(channel: Channel, ledger_id: &'a str, auth_token: Option<&'a str>) -> Self {
28 Self {
29 channel,
30 ledger_id,
31 auth_token,
32 }
33 }
34
35 /// Override the JWT token to use for this service.
36 pub fn with_token(self, auth_token: &'a str) -> Self {
37 Self {
38 auth_token: Some(auth_token),
39 ..self
40 }
41 }
42
43 /// Override the ledger id to use for this service.
44 pub fn with_ledger_id(self, ledger_id: &'a str) -> Self {
45 Self {
46 ledger_id,
47 ..self
48 }
49 }
50
51 /// Submits a composite [`DamlCommands`] and await the completion.
52 ///
53 /// This method executes `commands` _synchronously_ on the ledger server (unlike the
54 /// [`DamlCommandSubmissionService`] which is executed _asynchronously_ on the ledger server). This service only
55 /// waits for the completion of the execution of the command, not the propagation of any resulting events which
56 /// must be consumed via the [`DamlTransactionService`].
57 ///
58 /// Note that this method is executed _asynchronously_ on the _client_ side and so will immediately return a
59 /// future which must be driven to completion before a result can be observed.
60 ///
61 /// # Errors
62 ///
63 /// Propagates communication failure errors as [`GrpcTransportError`] and Daml server failures as
64 /// [`GRPCStatusError`] errors.
65 ///
66 /// # Examples
67 ///
68 /// ```no_run
69 /// # use futures::future::Future;
70 /// # use chrono::Utc;
71 /// # use daml_grpc::data::DamlCommands;
72 /// # use daml_grpc::DamlGrpcClientBuilder;
73 /// # use daml_grpc::data::DamlResult;
74 /// # use std::error::Error;
75 /// # fn main() -> DamlResult<()> {
76 /// # futures::executor::block_on(async {
77 /// let ledger_client = DamlGrpcClientBuilder::uri("http://127.0.0.1").connect().await?;
78 /// # let commands: DamlCommands = DamlCommands::new("", "", "", "", "", vec![], vec![], vec![], None, None);
79 /// let future_command = ledger_client.command_service().submit_and_wait(commands).await;
80 /// match future_command {
81 /// Ok(command_id) => assert_eq!("1234", command_id),
82 /// Err(e) => panic!("submit_and_wait failed, error was {}", e.to_string()),
83 /// }
84 /// # Ok(())
85 /// # })
86 /// # }
87 /// ```
88 /// [`DamlCommands`]: crate::data::DamlCommands
89 /// [`DamlCommandSubmissionService`]: crate::service::DamlCommandSubmissionService
90 /// [`DamlTransactionService`]: crate::service::DamlTransactionService
91 /// [`submit_and_wait`]: DamlCommandService::submit_and_wait
92 /// [`GrpcTransportError`]: crate::data::DamlError::GrpcTransportError
93 /// [`GrpcStatusError`]: crate::data::DamlError::GrpcStatusError
94 #[instrument(skip(self))]
95 pub async fn submit_and_wait(&self, commands: impl Into<DamlCommands> + Debug) -> DamlResult<String> {
96 let commands = commands.into();
97 let command_id = commands.command_id().to_owned();
98 let payload = self.make_payload(commands)?;
99 trace!(payload = ?payload, token = ?self.auth_token);
100 self.client().submit_and_wait(make_request(payload, self.auth_token)?).await?;
101 Ok(command_id)
102 }
103
104 /// Submits a composite [`DamlCommands`] and returns the resulting transaction id.
105 ///
106 /// DOCME fully document this
107 /// TODO ugly API returning a tuple as `completion_offset` was recently added, refactor
108 #[instrument(skip(self))]
109 pub async fn submit_and_wait_for_transaction_id(
110 &self,
111 commands: impl Into<DamlCommands> + Debug,
112 ) -> DamlResult<(String, String)> {
113 let payload = self.make_payload(commands)?;
114 trace!(payload = ?payload, token = ?self.auth_token);
115 let response = self
116 .client()
117 .submit_and_wait_for_transaction_id(make_request(payload, self.auth_token)?)
118 .await?
119 .into_inner();
120 trace!(?response);
121 Ok((response.transaction_id, response.completion_offset))
122 }
123
124 /// Submits a composite [`DamlCommands`] and returns the resulting [`DamlTransaction`].
125 ///
126 /// DOCME fully document this
127 /// TODO ugly API returning a tuple as `completion_offset` was recently added, refactor
128 #[instrument(skip(self))]
129 pub async fn submit_and_wait_for_transaction(
130 &self,
131 commands: impl Into<DamlCommands> + Debug,
132 ) -> DamlResult<(DamlTransaction, String)> {
133 let payload = self.make_payload(commands)?;
134 trace!(payload = ?payload, token = ?self.auth_token);
135 let response =
136 self.client().submit_and_wait_for_transaction(make_request(payload, self.auth_token)?).await?.into_inner();
137 trace!(?response);
138 response.transaction.req().and_then(DamlTransaction::try_from).map(|tree| (tree, response.completion_offset))
139 }
140
141 /// Submits a composite [`DamlCommands`] and returns the resulting [`DamlTransactionTree`].
142 /// DOCME fully document this
143 /// TODO ugly API returning a tuple as `completion_offset` was recently added, refactor
144 #[instrument(skip(self))]
145 pub async fn submit_and_wait_for_transaction_tree(
146 &self,
147 commands: impl Into<DamlCommands> + Debug,
148 ) -> DamlResult<(DamlTransactionTree, String)> {
149 let payload = self.make_payload(commands)?;
150 trace!(payload = ?payload, token = ?self.auth_token);
151 let response = self
152 .client()
153 .submit_and_wait_for_transaction_tree(make_request(payload, self.auth_token)?)
154 .await?
155 .into_inner();
156 trace!(?response);
157 response
158 .transaction
159 .req()
160 .and_then(DamlTransactionTree::try_from)
161 .map(|tree| (tree, response.completion_offset))
162 }
163
164 fn client(&self) -> CommandServiceClient<Channel> {
165 CommandServiceClient::new(self.channel.clone())
166 }
167
168 fn make_payload(&self, commands: impl Into<DamlCommands>) -> DamlResult<SubmitAndWaitRequest> {
169 let mut commands = Commands::try_from(commands.into())?;
170 commands.ledger_id = self.ledger_id.to_string();
171 Ok(SubmitAndWaitRequest {
172 commands: Some(commands),
173 })
174 }
175}