daml_grpc/
ledger_client.rs

1use crate::data::{DamlError, DamlResult};
2use crate::service::{
3    DamlActiveContractsService, DamlCommandCompletionService, DamlCommandService, DamlCommandSubmissionService,
4    DamlLedgerConfigurationService, DamlLedgerIdentityService, DamlPackageService, DamlParticipantPruningService,
5    DamlTransactionService, DamlVersionService,
6};
7#[cfg(feature = "admin")]
8use crate::service::{DamlConfigManagementService, DamlPackageManagementService, DamlPartyManagementService};
9#[cfg(feature = "sandbox")]
10use crate::service::{DamlResetService, DamlTimeService};
11use std::time::Duration;
12#[cfg(feature = "sandbox")]
13use std::time::Instant;
14use tonic::transport::{Certificate, Channel, ClientTlsConfig};
15use tracing::{debug, instrument};
16
17use hyper::client::HttpConnector;
18#[cfg(test)]
19use tonic::transport::Uri;
20
/// Timeout, in seconds, that `DamlGrpcClientBuilder::uri` assigns to the `timeout` setting.
const DEFAULT_TIMEOUT_SECS: u64 = 5;
/// Timeout, in seconds, that `DamlGrpcClientBuilder::uri` assigns to the `connect_timeout` setting.
const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 5;
/// Timeout, in seconds, that `DamlGrpcClientBuilder::uri` assigns to the `reset_timeout` setting
/// (only present with the `sandbox` feature).
#[cfg(feature = "sandbox")]
const DEFAULT_RESET_TIMEOUT_SECS: u64 = 5;
25
/// Connection settings used to open the gRPC channel of a [`DamlGrpcClient`].
///
/// Values are normally assembled with [`DamlGrpcClientBuilder`]. The derived `Default` leaves
/// `uri` empty and `timeout` at zero, so it is only a base for the builder to override.
///
/// NOTE(review): the derived `Debug` prints every field, including `auth_token` and the CA
/// certificate bytes held in `tls_config`; avoid logging this struct with `{:?}`.
#[derive(Debug, Default)]
pub struct DamlGrpcClientConfig {
    /// URI of the ledger endpoint the channel is created from.
    uri: String,
    /// Timeout applied to the channel endpoint.
    timeout: Duration,
    /// Connect timeout handed to the HTTP connector; `None` is passed through unchanged.
    connect_timeout: Option<Duration>,
    /// Maximum time spent retrying the reconnect, and then the ledger identity query, after a
    /// sandbox reset.
    #[cfg(feature = "sandbox")]
    reset_timeout: Duration,
    /// Optional concurrency limit applied to the channel endpoint.
    concurrency_limit: Option<usize>,
    /// Optional rate limit applied to the channel endpoint, as `(limit, duration)`.
    rate_limit: Option<(u64, Duration)>,
    /// Optional initial stream window size applied to the channel endpoint.
    initial_stream_window_size: Option<u32>,
    /// Optional initial connection window size applied to the channel endpoint.
    initial_connection_window_size: Option<u32>,
    /// Optional TCP keepalive duration, applied to both the endpoint and the HTTP connector.
    tcp_keepalive: Option<Duration>,
    /// Whether `TCP_NODELAY` is requested, applied to both the endpoint and the HTTP connector.
    tcp_nodelay: bool,
    /// TLS settings; when `None` no TLS configuration is applied to the endpoint.
    tls_config: Option<DamlGrpcTlsConfig>,
    /// Optional authentication token handed to every service created by the client.
    auth_token: Option<String>,
}
43
/// TLS settings for the gRPC channel.
#[derive(Debug)]
pub struct DamlGrpcTlsConfig {
    /// PEM-encoded CA certificate to trust. When `None`, a default TLS client configuration is
    /// used without an explicit CA certificate.
    ca_cert: Option<Vec<u8>>,
}
49
/// Construct a [`DamlGrpcClient`].
///
/// Start with [`DamlGrpcClientBuilder::uri`], chain any of the optional setters and finish with
/// [`DamlGrpcClientBuilder::connect`].
pub struct DamlGrpcClientBuilder {
    /// The configuration accumulated so far; handed to the client on `connect`.
    config: DamlGrpcClientConfig,
}
54
55impl DamlGrpcClientBuilder {
56    /// DOCME
57    pub fn uri(uri: impl Into<String>) -> Self {
58        Self {
59            config: DamlGrpcClientConfig {
60                uri: uri.into(),
61                timeout: Duration::from_secs(DEFAULT_TIMEOUT_SECS),
62                connect_timeout: Some(Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS)),
63                #[cfg(feature = "sandbox")]
64                reset_timeout: Duration::from_secs(DEFAULT_RESET_TIMEOUT_SECS),
65                ..DamlGrpcClientConfig::default()
66            },
67        }
68    }
69
70    /// The network timeout.
71    pub fn timeout(self, timeout: Duration) -> Self {
72        Self {
73            config: DamlGrpcClientConfig {
74                timeout,
75                ..self.config
76            },
77        }
78    }
79
80    /// The connection timeout.
81    pub fn connect_timeout(self, connect_timeout: Option<Duration>) -> Self {
82        Self {
83            config: DamlGrpcClientConfig {
84                connect_timeout,
85                ..self.config
86            },
87        }
88    }
89
90    #[cfg(feature = "sandbox")]
91    /// The sandbox reset timeout.
92    pub fn reset_timeout(self, reset_timeout: Duration) -> Self {
93        Self {
94            config: DamlGrpcClientConfig {
95                reset_timeout,
96                ..self.config
97            },
98        }
99    }
100
101    /// DOCME
102    pub fn concurrency_limit(self, concurrency_limit: usize) -> Self {
103        Self {
104            config: DamlGrpcClientConfig {
105                concurrency_limit: Some(concurrency_limit),
106                ..self.config
107            },
108        }
109    }
110
111    /// DOCME
112    pub fn rate_limit(self, rate_limit: (u64, Duration)) -> Self {
113        Self {
114            config: DamlGrpcClientConfig {
115                rate_limit: Some(rate_limit),
116                ..self.config
117            },
118        }
119    }
120
121    /// DOCME
122    pub fn initial_stream_window_size(self, initial_stream_window_size: u32) -> Self {
123        Self {
124            config: DamlGrpcClientConfig {
125                initial_stream_window_size: Some(initial_stream_window_size),
126                ..self.config
127            },
128        }
129    }
130
131    /// DOCME
132    pub fn initial_connection_window_size(self, initial_connection_window_size: u32) -> Self {
133        Self {
134            config: DamlGrpcClientConfig {
135                initial_connection_window_size: Some(initial_connection_window_size),
136                ..self.config
137            },
138        }
139    }
140
141    /// DOCME
142    pub fn tcp_keepalive(self, tcp_keepalive: Duration) -> Self {
143        Self {
144            config: DamlGrpcClientConfig {
145                tcp_keepalive: Some(tcp_keepalive),
146                ..self.config
147            },
148        }
149    }
150
151    /// DOCME
152    pub fn tcp_nodelay(self, tcp_nodelay: bool) -> Self {
153        Self {
154            config: DamlGrpcClientConfig {
155                tcp_nodelay,
156                ..self.config
157            },
158        }
159    }
160
161    /// DOCME
162    pub fn with_tls(self, ca_cert: impl Into<Vec<u8>>) -> Self {
163        Self {
164            config: DamlGrpcClientConfig {
165                tls_config: Some(DamlGrpcTlsConfig {
166                    ca_cert: Some(ca_cert.into()),
167                }),
168                ..self.config
169            },
170        }
171    }
172
173    /// DOCME
174    pub fn with_auth(self, auth_token: String) -> Self {
175        Self {
176            config: DamlGrpcClientConfig {
177                auth_token: Some(auth_token),
178                ..self.config
179            },
180        }
181    }
182
183    /// DOCME
184    pub async fn connect(self) -> DamlResult<DamlGrpcClient> {
185        DamlGrpcClient::connect(self.config).await
186    }
187}
188
/// Daml ledger client connection.
///
/// Holds the open gRPC channel together with the configuration it was created from and the
/// ledger identity fetched while connecting. The `*_service` methods hand out service wrappers
/// that each receive a clone of the channel.
#[derive(Debug)]
pub struct DamlGrpcClient {
    /// The configuration the channel was opened with.
    config: DamlGrpcClientConfig,
    /// The gRPC channel; cloned into every service.
    channel: Channel,
    /// The ledger identity returned by the ledger identity service at connect time.
    ledger_identity: String,
}
196
197impl DamlGrpcClient {
198    /// Create a channel and connect.
199    #[instrument]
200    pub async fn connect(config: DamlGrpcClientConfig) -> DamlResult<Self> {
201        debug!("connecting to {}", config.uri);
202        let channel = Self::open_channel(&config).await?;
203        Self::make_client_from_channel(channel, config).await
204    }
205
206    /// Reset the ledger and reconnect.
207    #[cfg(feature = "sandbox")]
208    #[instrument(skip(self))]
209    pub async fn reset_and_wait(self) -> DamlResult<Self> {
210        debug!("resetting Sandbox");
211        self.reset_service().reset().await?;
212        let channel = Self::open_channel_and_wait(&self.config).await?;
213        Self::make_client_from_channel_and_wait(channel, self.config).await
214    }
215
216    /// Return the current configuration.
217    pub const fn config(&self) -> &DamlGrpcClientConfig {
218        &self.config
219    }
220
221    /// DOCME
222    pub fn ledger_identity(&self) -> &str {
223        &self.ledger_identity
224    }
225
226    /// DOCME
227    pub fn ledger_identity_service(&self) -> DamlLedgerIdentityService<'_> {
228        DamlLedgerIdentityService::new(self.channel.clone(), self.config.auth_token.as_deref())
229    }
230
231    /// DOCME
232    pub fn ledger_configuration_service(&self) -> DamlLedgerConfigurationService<'_> {
233        DamlLedgerConfigurationService::new(
234            self.channel.clone(),
235            &self.ledger_identity,
236            self.config.auth_token.as_deref(),
237        )
238    }
239
240    /// DOCME
241    pub fn package_service(&self) -> DamlPackageService<'_> {
242        DamlPackageService::new(self.channel.clone(), &self.ledger_identity, self.config.auth_token.as_deref())
243    }
244
245    /// DOCME
246    pub fn command_submission_service(&self) -> DamlCommandSubmissionService<'_> {
247        DamlCommandSubmissionService::new(
248            self.channel.clone(),
249            &self.ledger_identity,
250            self.config.auth_token.as_deref(),
251        )
252    }
253
254    /// DOCME
255    pub fn command_completion_service(&self) -> DamlCommandCompletionService<'_> {
256        DamlCommandCompletionService::new(
257            self.channel.clone(),
258            &self.ledger_identity,
259            self.config.auth_token.as_deref(),
260        )
261    }
262
263    /// DOCME
264    pub fn command_service(&self) -> DamlCommandService<'_> {
265        DamlCommandService::new(self.channel.clone(), &self.ledger_identity, self.config.auth_token.as_deref())
266    }
267
268    /// DOCME
269    pub fn transaction_service(&self) -> DamlTransactionService<'_> {
270        DamlTransactionService::new(self.channel.clone(), &self.ledger_identity, self.config.auth_token.as_deref())
271    }
272
273    /// DOCME
274    pub fn active_contract_service(&self) -> DamlActiveContractsService<'_> {
275        DamlActiveContractsService::new(self.channel.clone(), &self.ledger_identity, self.config.auth_token.as_deref())
276    }
277
278    /// DOCME
279    pub fn version_service(&self) -> DamlVersionService<'_> {
280        DamlVersionService::new(self.channel.clone(), &self.ledger_identity, self.config.auth_token.as_deref())
281    }
282
283    /// DOCME
284    #[cfg(feature = "admin")]
285    pub fn package_management_service(&self) -> DamlPackageManagementService<'_> {
286        DamlPackageManagementService::new(self.channel.clone(), self.config.auth_token.as_deref())
287    }
288
289    /// DOCME
290    #[cfg(feature = "admin")]
291    pub fn party_management_service(&self) -> DamlPartyManagementService<'_> {
292        DamlPartyManagementService::new(self.channel.clone(), self.config.auth_token.as_deref())
293    }
294
295    /// DOCME
296    #[cfg(feature = "admin")]
297    pub fn config_management_service(&self) -> DamlConfigManagementService<'_> {
298        DamlConfigManagementService::new(self.channel.clone(), self.config.auth_token.as_deref())
299    }
300
301    /// DOCME
302    #[cfg(feature = "admin")]
303    pub fn participant_pruning_service(&self) -> DamlParticipantPruningService<'_> {
304        DamlParticipantPruningService::new(self.channel.clone(), self.config.auth_token.as_deref())
305    }
306
307    /// DOCME
308    #[cfg(feature = "sandbox")]
309    pub fn reset_service(&self) -> DamlResetService<'_> {
310        DamlResetService::new(self.channel.clone(), &self.ledger_identity, self.config.auth_token.as_deref())
311    }
312
313    /// DOCME
314    #[cfg(feature = "sandbox")]
315    pub fn time_service(&self) -> DamlTimeService<'_> {
316        DamlTimeService::new(self.channel.clone(), &self.ledger_identity, self.config.auth_token.as_deref())
317    }
318
319    async fn open_channel(config: &DamlGrpcClientConfig) -> DamlResult<Channel> {
320        Self::make_channel(config).await
321    }
322
323    async fn make_channel(config: &DamlGrpcClientConfig) -> DamlResult<Channel> {
324        let mut channel = Channel::from_shared(config.uri.clone())?;
325        if let Some(limit) = config.concurrency_limit {
326            channel = channel.concurrency_limit(limit);
327        }
328        if let Some((limit, duration)) = config.rate_limit {
329            channel = channel.rate_limit(limit, duration);
330        }
331        if let Some(size) = config.initial_stream_window_size {
332            channel = channel.initial_stream_window_size(size);
333        }
334        if let Some(size) = config.initial_connection_window_size {
335            channel = channel.initial_connection_window_size(size);
336        }
337        if let Some(duration) = config.tcp_keepalive {
338            channel = channel.tcp_keepalive(Some(duration));
339        }
340        channel = channel.tcp_nodelay(config.tcp_nodelay);
341        channel = channel.timeout(config.timeout);
342        match &config.tls_config {
343            Some(DamlGrpcTlsConfig {
344                ca_cert: Some(cert),
345            }) => {
346                channel = channel.tls_config(ClientTlsConfig::new().ca_certificate(Certificate::from_pem(cert)))?;
347            },
348            Some(DamlGrpcTlsConfig {
349                ca_cert: None,
350            }) => {
351                channel = channel.tls_config(ClientTlsConfig::new())?;
352            },
353            _ => {},
354        }
355
356        // Tonic does not current allow us to set a connect timeout (see https://github.com/hyperium/tonic/issues/498)
357        // directly and so we workaround this by creating the Hyper HttpConnector directly.
358        let mut http = HttpConnector::new();
359        http.enforce_http(false);
360        http.set_nodelay(config.tcp_nodelay);
361        http.set_keepalive(config.tcp_keepalive);
362        http.set_connect_timeout(config.connect_timeout);
363        channel.connect_with_connector(http).await.map_err(DamlError::from)
364    }
365
366    async fn make_client_from_channel(channel: Channel, config: DamlGrpcClientConfig) -> DamlResult<Self> {
367        let ledger_identity_service = DamlLedgerIdentityService::new(channel.clone(), config.auth_token.as_deref());
368        let ledger_identity = ledger_identity_service.get_ledger_identity().await?;
369        Ok(Self {
370            config,
371            channel: channel.clone(),
372            ledger_identity,
373        })
374    }
375
376    #[cfg(feature = "sandbox")]
377    async fn open_channel_and_wait(config: &DamlGrpcClientConfig) -> DamlResult<Channel> {
378        let mut channel = Self::make_channel(config).await;
379        let start = Instant::now();
380        while let Err(e) = channel {
381            if start.elapsed() > config.reset_timeout {
382                return Err(DamlError::new_timeout_error(e));
383            }
384            channel = Self::make_channel(config).await;
385        }
386        channel
387    }
388
389    #[cfg(feature = "sandbox")]
390    async fn make_client_from_channel_and_wait(channel: Channel, config: DamlGrpcClientConfig) -> DamlResult<Self> {
391        let ledger_identity_service = DamlLedgerIdentityService::new(channel.clone(), config.auth_token.as_deref());
392        let ledger_identity =
393            Self::query_ledger_identity_and_wait(&config.reset_timeout, &ledger_identity_service).await?;
394        Ok(Self {
395            config,
396            channel: channel.clone(),
397            ledger_identity,
398        })
399    }
400
401    #[cfg(feature = "sandbox")]
402    async fn query_ledger_identity_and_wait(
403        reset_timeout: &Duration,
404        ledger_identity_service: &DamlLedgerIdentityService<'_>,
405    ) -> DamlResult<String> {
406        let start = Instant::now();
407        let mut ledger_identity: DamlResult<String> = ledger_identity_service.get_ledger_identity().await;
408        while let Err(e) = ledger_identity {
409            if let DamlError::GrpcPermissionError(_) = e {
410                return Err(e);
411            }
412            if start.elapsed() > *reset_timeout {
413                return Err(DamlError::new_timeout_error(e));
414            }
415            ledger_identity = ledger_identity_service.get_ledger_identity().await;
416        }
417        ledger_identity
418    }
419
420    #[cfg(test)]
421    pub(crate) async fn dummy_for_testing() -> Self {
422        DamlGrpcClient {
423            config: DamlGrpcClientConfig::default(),
424            channel: Channel::builder(Uri::from_static("http://dummy.for.testing")).connect_lazy(),
425            ledger_identity: String::default(),
426        }
427    }
428}