logo
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
use std::convert::TryFrom;
use std::fmt::Debug;

use tonic::transport::Channel;
use tracing::{instrument, trace};

use crate::data::DamlResult;
use crate::data::{DamlCommands, DamlTransaction, DamlTransactionTree};
use crate::grpc_protobuf::com::daml::ledger::api::v1::command_service_client::CommandServiceClient;
use crate::grpc_protobuf::com::daml::ledger::api::v1::{Commands, SubmitAndWaitRequest};
use crate::service::common::make_request;
use crate::util::Required;

/// Submit commands to a Daml ledger and await the completion.
///
/// The Command Service is able to correlate submitted commands with completion data, identify timeouts, and return
/// contextual information with each tracking result. This supports the implementation of stateless clients.
///
/// The service borrows its `ledger_id` and optional `auth_token` for the lifetime `'a`; it does not own either
/// string.
///
/// NOTE(review): the derived `Debug` implementation prints every field, including `auth_token` — confirm this is
/// acceptable wherever the service itself is logged.
#[derive(Debug)]
pub struct DamlCommandService<'a> {
    /// GRPC channel; cloned each time a `CommandServiceClient` is built for a call.
    channel: Channel,
    /// Ledger id which is written into every outgoing `Commands` message.
    ledger_id: &'a str,
    /// Optional token which is handed to `make_request` together with each request payload.
    auth_token: Option<&'a str>,
}

impl<'a> DamlCommandService<'a> {
    /// Create a `DamlCommandService` for a given GRPC `channel` and `ledger_id`.
    pub fn new(channel: Channel, ledger_id: &'a str, auth_token: Option<&'a str>) -> Self {
        Self {
            channel,
            ledger_id,
            auth_token,
        }
    }

    /// Override the JWT token to use for this service.
    ///
    /// Consumes the service and returns it with `auth_token` set to `Some(auth_token)`; the channel and ledger id
    /// are carried over unchanged.
    pub fn with_token(self, auth_token: &'a str) -> Self {
        let mut service = self;
        service.auth_token = Some(auth_token);
        service
    }

    /// Override the ledger id to use for this service.
    ///
    /// Consumes the service and returns it with `ledger_id` replaced; the channel and token are carried over
    /// unchanged.
    pub fn with_ledger_id(self, ledger_id: &'a str) -> Self {
        let mut service = self;
        service.ledger_id = ledger_id;
        service
    }

    /// Submits a composite [`DamlCommands`] and awaits the completion.
    ///
    /// This method executes `commands` _synchronously_ on the ledger server (unlike the
    /// [`DamlCommandSubmissionService`] which is executed _asynchronously_ on the ledger server).  This service only
    /// waits for the completion of the execution of the command, not the propagation of any resulting events which
    /// must be consumed via the [`DamlTransactionService`].
    ///
    /// Note that this method is executed _asynchronously_ on the _client_ side and so will immediately return a
    /// future which must be driven to completion before a result can be observed.
    ///
    /// On success the returned `String` is the command id which was read from `commands` before they were
    /// submitted.
    ///
    /// # Errors
    ///
    /// Propagates communication failure errors as [`GrpcTransportError`] and Daml server failures as
    /// [`GrpcStatusError`] errors.  Errors raised while building the request payload are returned as well.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use futures::future::Future;
    /// # use chrono::Utc;
    /// # use daml_grpc::data::DamlCommands;
    /// # use daml_grpc::DamlGrpcClientBuilder;
    /// # use daml_grpc::data::DamlResult;
    /// # use std::error::Error;
    /// # fn main() -> DamlResult<()> {
    /// futures::executor::block_on(async {
    /// let ledger_client = DamlGrpcClientBuilder::uri("http://127.0.0.1").connect().await?;
    /// # let commands: DamlCommands = DamlCommands::new("", "", "", "", "", vec![], vec![], vec![], None, None);
    /// let future_command = ledger_client.command_service().submit_and_wait(commands).await;
    /// match future_command {
    ///     Ok(command_id) => assert_eq!("1234", command_id),
    ///     Err(e) => panic!("submit_and_wait failed, error was {}", e.to_string()),
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
    /// [`DamlCommands`]: crate::data::DamlCommands
    /// [`DamlCommandSubmissionService`]: crate::service::DamlCommandSubmissionService
    /// [`DamlTransactionService`]: crate::service::DamlTransactionService
    /// [`submit_and_wait`]: DamlCommandService::submit_and_wait
    /// [`GrpcTransportError`]: crate::data::DamlError::GrpcTransportError
    /// [`GrpcStatusError`]: crate::data::DamlError::GrpcStatusError
    #[instrument(skip(self))]
    pub async fn submit_and_wait(&self, commands: impl Into<DamlCommands> + Debug) -> DamlResult<String> {
        let daml_commands: DamlCommands = commands.into();
        // The id must be copied out now as the commands are consumed when the payload is built.
        let submitted_id = daml_commands.command_id().to_owned();
        let request_body = self.make_payload(daml_commands)?;
        // NOTE(review): the auth token is written to the trace event — confirm this is acceptable for all sinks.
        trace!(payload = ?request_body, token = ?self.auth_token);
        let mut client = self.client();
        let request = make_request(request_body, self.auth_token)?;
        // The response body is not used; only success or failure of the call matters here.
        client.submit_and_wait(request).await?;
        Ok(submitted_id)
    }

    /// Submits a composite [`DamlCommands`] and returns the resulting transaction id.
    ///
    /// On success the returned tuple holds the `transaction_id` and the `completion_offset` fields of the server
    /// response, in that order.
    ///
    /// # Errors
    ///
    /// Returns any error raised while converting `commands` into a request payload, while building the request
    /// or by the GRPC call itself.
    ///
    /// TODO ugly API returning a tuple as `completion_offset` was recently added, refactor
    #[instrument(skip(self))]
    pub async fn submit_and_wait_for_transaction_id(
        &self,
        commands: impl Into<DamlCommands> + Debug,
    ) -> DamlResult<(String, String)> {
        let request_body = self.make_payload(commands)?;
        // NOTE(review): the auth token is written to the trace event — confirm this is acceptable for all sinks.
        trace!(payload = ?request_body, token = ?self.auth_token);
        let mut client = self.client();
        let request = make_request(request_body, self.auth_token)?;
        let reply = client.submit_and_wait_for_transaction_id(request).await?.into_inner();
        trace!(response = ?reply);
        Ok((reply.transaction_id, reply.completion_offset))
    }

    /// Submits a composite [`DamlCommands`] and returns the resulting [`DamlTransaction`].
    ///
    /// On success the returned tuple holds the converted transaction followed by the `completion_offset` field of
    /// the server response.
    ///
    /// # Errors
    ///
    /// Returns any error raised while converting `commands` into a request payload, while building the request
    /// or by the GRPC call itself.  An error is also returned if `req()` rejects the `transaction` field of the
    /// response or if that transaction cannot be converted to a [`DamlTransaction`].
    ///
    /// TODO ugly API returning a tuple as `completion_offset` was recently added, refactor
    #[instrument(skip(self))]
    pub async fn submit_and_wait_for_transaction(
        &self,
        commands: impl Into<DamlCommands> + Debug,
    ) -> DamlResult<(DamlTransaction, String)> {
        let request_body = self.make_payload(commands)?;
        // NOTE(review): the auth token is written to the trace event — confirm this is acceptable for all sinks.
        trace!(payload = ?request_body, token = ?self.auth_token);
        let mut client = self.client();
        let request = make_request(request_body, self.auth_token)?;
        let reply = client.submit_and_wait_for_transaction(request).await?.into_inner();
        trace!(response = ?reply);
        let raw_transaction = reply.transaction.req()?;
        let transaction = DamlTransaction::try_from(raw_transaction)?;
        Ok((transaction, reply.completion_offset))
    }

    /// Submits a composite [`DamlCommands`] and returns the resulting [`DamlTransactionTree`].
    ///
    /// On success the returned tuple holds the converted transaction tree followed by the `completion_offset`
    /// field of the server response.
    ///
    /// # Errors
    ///
    /// Returns any error raised while converting `commands` into a request payload, while building the request
    /// or by the GRPC call itself.  An error is also returned if `req()` rejects the `transaction` field of the
    /// response or if that transaction cannot be converted to a [`DamlTransactionTree`].
    ///
    /// TODO ugly API returning a tuple as `completion_offset` was recently added, refactor
    #[instrument(skip(self))]
    pub async fn submit_and_wait_for_transaction_tree(
        &self,
        commands: impl Into<DamlCommands> + Debug,
    ) -> DamlResult<(DamlTransactionTree, String)> {
        let request_body = self.make_payload(commands)?;
        // NOTE(review): the auth token is written to the trace event — confirm this is acceptable for all sinks.
        trace!(payload = ?request_body, token = ?self.auth_token);
        let mut client = self.client();
        let request = make_request(request_body, self.auth_token)?;
        let reply = client.submit_and_wait_for_transaction_tree(request).await?.into_inner();
        trace!(response = ?reply);
        let raw_tree = reply.transaction.req()?;
        let tree = DamlTransactionTree::try_from(raw_tree)?;
        Ok((tree, reply.completion_offset))
    }

    /// Build a fresh `CommandServiceClient` over a clone of this service's channel.
    fn client(&self) -> CommandServiceClient<Channel> {
        let channel = self.channel.clone();
        CommandServiceClient::new(channel)
    }

    fn make_payload(&self, commands: impl Into<DamlCommands>) -> DamlResult<SubmitAndWaitRequest> {
        let mut commands = Commands::try_from(commands.into())?;
        commands.ledger_id = self.ledger_id.to_string();
        Ok(SubmitAndWaitRequest {
            commands: Some(commands),
        })
    }
}