1use crate::data::{DamlError, DamlResult};
2use crate::service::{
3 DamlActiveContractsService, DamlCommandCompletionService, DamlCommandService, DamlCommandSubmissionService,
4 DamlLedgerConfigurationService, DamlLedgerIdentityService, DamlPackageService, DamlParticipantPruningService,
5 DamlTransactionService, DamlVersionService,
6};
7#[cfg(feature = "admin")]
8use crate::service::{DamlConfigManagementService, DamlPackageManagementService, DamlPartyManagementService};
9#[cfg(feature = "sandbox")]
10use crate::service::{DamlResetService, DamlTimeService};
11use std::time::Duration;
12#[cfg(feature = "sandbox")]
13use std::time::Instant;
14use tonic::transport::{Certificate, Channel, ClientTlsConfig};
15use tracing::{debug, instrument};
16
17use hyper::client::HttpConnector;
18#[cfg(test)]
19use tonic::transport::Uri;
20
21const DEFAULT_TIMEOUT_SECS: u64 = 5;
22const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 5;
23#[cfg(feature = "sandbox")]
24const DEFAULT_RESET_TIMEOUT_SECS: u64 = 5;
25
26#[derive(Debug, Default)]
28pub struct DamlGrpcClientConfig {
29 uri: String,
30 timeout: Duration,
31 connect_timeout: Option<Duration>,
32 #[cfg(feature = "sandbox")]
33 reset_timeout: Duration,
34 concurrency_limit: Option<usize>,
35 rate_limit: Option<(u64, Duration)>,
36 initial_stream_window_size: Option<u32>,
37 initial_connection_window_size: Option<u32>,
38 tcp_keepalive: Option<Duration>,
39 tcp_nodelay: bool,
40 tls_config: Option<DamlGrpcTlsConfig>,
41 auth_token: Option<String>,
42}
43
44#[derive(Debug)]
46pub struct DamlGrpcTlsConfig {
47 ca_cert: Option<Vec<u8>>,
48}
49
50pub struct DamlGrpcClientBuilder {
52 config: DamlGrpcClientConfig,
53}
54
55impl DamlGrpcClientBuilder {
56 pub fn uri(uri: impl Into<String>) -> Self {
58 Self {
59 config: DamlGrpcClientConfig {
60 uri: uri.into(),
61 timeout: Duration::from_secs(DEFAULT_TIMEOUT_SECS),
62 connect_timeout: Some(Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS)),
63 #[cfg(feature = "sandbox")]
64 reset_timeout: Duration::from_secs(DEFAULT_RESET_TIMEOUT_SECS),
65 ..DamlGrpcClientConfig::default()
66 },
67 }
68 }
69
70 pub fn timeout(self, timeout: Duration) -> Self {
72 Self {
73 config: DamlGrpcClientConfig {
74 timeout,
75 ..self.config
76 },
77 }
78 }
79
80 pub fn connect_timeout(self, connect_timeout: Option<Duration>) -> Self {
82 Self {
83 config: DamlGrpcClientConfig {
84 connect_timeout,
85 ..self.config
86 },
87 }
88 }
89
90 #[cfg(feature = "sandbox")]
91 pub fn reset_timeout(self, reset_timeout: Duration) -> Self {
93 Self {
94 config: DamlGrpcClientConfig {
95 reset_timeout,
96 ..self.config
97 },
98 }
99 }
100
101 pub fn concurrency_limit(self, concurrency_limit: usize) -> Self {
103 Self {
104 config: DamlGrpcClientConfig {
105 concurrency_limit: Some(concurrency_limit),
106 ..self.config
107 },
108 }
109 }
110
111 pub fn rate_limit(self, rate_limit: (u64, Duration)) -> Self {
113 Self {
114 config: DamlGrpcClientConfig {
115 rate_limit: Some(rate_limit),
116 ..self.config
117 },
118 }
119 }
120
121 pub fn initial_stream_window_size(self, initial_stream_window_size: u32) -> Self {
123 Self {
124 config: DamlGrpcClientConfig {
125 initial_stream_window_size: Some(initial_stream_window_size),
126 ..self.config
127 },
128 }
129 }
130
131 pub fn initial_connection_window_size(self, initial_connection_window_size: u32) -> Self {
133 Self {
134 config: DamlGrpcClientConfig {
135 initial_connection_window_size: Some(initial_connection_window_size),
136 ..self.config
137 },
138 }
139 }
140
141 pub fn tcp_keepalive(self, tcp_keepalive: Duration) -> Self {
143 Self {
144 config: DamlGrpcClientConfig {
145 tcp_keepalive: Some(tcp_keepalive),
146 ..self.config
147 },
148 }
149 }
150
151 pub fn tcp_nodelay(self, tcp_nodelay: bool) -> Self {
153 Self {
154 config: DamlGrpcClientConfig {
155 tcp_nodelay,
156 ..self.config
157 },
158 }
159 }
160
161 pub fn with_tls(self, ca_cert: impl Into<Vec<u8>>) -> Self {
163 Self {
164 config: DamlGrpcClientConfig {
165 tls_config: Some(DamlGrpcTlsConfig {
166 ca_cert: Some(ca_cert.into()),
167 }),
168 ..self.config
169 },
170 }
171 }
172
173 pub fn with_auth(self, auth_token: String) -> Self {
175 Self {
176 config: DamlGrpcClientConfig {
177 auth_token: Some(auth_token),
178 ..self.config
179 },
180 }
181 }
182
183 pub async fn connect(self) -> DamlResult<DamlGrpcClient> {
185 DamlGrpcClient::connect(self.config).await
186 }
187}
188
189#[derive(Debug)]
191pub struct DamlGrpcClient {
192 config: DamlGrpcClientConfig,
193 channel: Channel,
194 ledger_identity: String,
195}
196
197impl DamlGrpcClient {
198 #[instrument]
200 pub async fn connect(config: DamlGrpcClientConfig) -> DamlResult<Self> {
201 debug!("connecting to {}", config.uri);
202 let channel = Self::open_channel(&config).await?;
203 Self::make_client_from_channel(channel, config).await
204 }
205
206 #[cfg(feature = "sandbox")]
208 #[instrument(skip(self))]
209 pub async fn reset_and_wait(self) -> DamlResult<Self> {
210 debug!("resetting Sandbox");
211 self.reset_service().reset().await?;
212 let channel = Self::open_channel_and_wait(&self.config).await?;
213 Self::make_client_from_channel_and_wait(channel, self.config).await
214 }
215
216 pub const fn config(&self) -> &DamlGrpcClientConfig {
218 &self.config
219 }
220
221 pub fn ledger_identity(&self) -> &str {
223 &self.ledger_identity
224 }
225
226 pub fn ledger_identity_service(&self) -> DamlLedgerIdentityService<'_> {
228 DamlLedgerIdentityService::new(self.channel.clone(), self.config.auth_token.as_deref())
229 }
230
231 pub fn ledger_configuration_service(&self) -> DamlLedgerConfigurationService<'_> {
233 DamlLedgerConfigurationService::new(
234 self.channel.clone(),
235 &self.ledger_identity,
236 self.config.auth_token.as_deref(),
237 )
238 }
239
240 pub fn package_service(&self) -> DamlPackageService<'_> {
242 DamlPackageService::new(self.channel.clone(), &self.ledger_identity, self.config.auth_token.as_deref())
243 }
244
245 pub fn command_submission_service(&self) -> DamlCommandSubmissionService<'_> {
247 DamlCommandSubmissionService::new(
248 self.channel.clone(),
249 &self.ledger_identity,
250 self.config.auth_token.as_deref(),
251 )
252 }
253
254 pub fn command_completion_service(&self) -> DamlCommandCompletionService<'_> {
256 DamlCommandCompletionService::new(
257 self.channel.clone(),
258 &self.ledger_identity,
259 self.config.auth_token.as_deref(),
260 )
261 }
262
263 pub fn command_service(&self) -> DamlCommandService<'_> {
265 DamlCommandService::new(self.channel.clone(), &self.ledger_identity, self.config.auth_token.as_deref())
266 }
267
268 pub fn transaction_service(&self) -> DamlTransactionService<'_> {
270 DamlTransactionService::new(self.channel.clone(), &self.ledger_identity, self.config.auth_token.as_deref())
271 }
272
273 pub fn active_contract_service(&self) -> DamlActiveContractsService<'_> {
275 DamlActiveContractsService::new(self.channel.clone(), &self.ledger_identity, self.config.auth_token.as_deref())
276 }
277
278 pub fn version_service(&self) -> DamlVersionService<'_> {
280 DamlVersionService::new(self.channel.clone(), &self.ledger_identity, self.config.auth_token.as_deref())
281 }
282
283 #[cfg(feature = "admin")]
285 pub fn package_management_service(&self) -> DamlPackageManagementService<'_> {
286 DamlPackageManagementService::new(self.channel.clone(), self.config.auth_token.as_deref())
287 }
288
289 #[cfg(feature = "admin")]
291 pub fn party_management_service(&self) -> DamlPartyManagementService<'_> {
292 DamlPartyManagementService::new(self.channel.clone(), self.config.auth_token.as_deref())
293 }
294
295 #[cfg(feature = "admin")]
297 pub fn config_management_service(&self) -> DamlConfigManagementService<'_> {
298 DamlConfigManagementService::new(self.channel.clone(), self.config.auth_token.as_deref())
299 }
300
301 #[cfg(feature = "admin")]
303 pub fn participant_pruning_service(&self) -> DamlParticipantPruningService<'_> {
304 DamlParticipantPruningService::new(self.channel.clone(), self.config.auth_token.as_deref())
305 }
306
307 #[cfg(feature = "sandbox")]
309 pub fn reset_service(&self) -> DamlResetService<'_> {
310 DamlResetService::new(self.channel.clone(), &self.ledger_identity, self.config.auth_token.as_deref())
311 }
312
313 #[cfg(feature = "sandbox")]
315 pub fn time_service(&self) -> DamlTimeService<'_> {
316 DamlTimeService::new(self.channel.clone(), &self.ledger_identity, self.config.auth_token.as_deref())
317 }
318
319 async fn open_channel(config: &DamlGrpcClientConfig) -> DamlResult<Channel> {
320 Self::make_channel(config).await
321 }
322
323 async fn make_channel(config: &DamlGrpcClientConfig) -> DamlResult<Channel> {
324 let mut channel = Channel::from_shared(config.uri.clone())?;
325 if let Some(limit) = config.concurrency_limit {
326 channel = channel.concurrency_limit(limit);
327 }
328 if let Some((limit, duration)) = config.rate_limit {
329 channel = channel.rate_limit(limit, duration);
330 }
331 if let Some(size) = config.initial_stream_window_size {
332 channel = channel.initial_stream_window_size(size);
333 }
334 if let Some(size) = config.initial_connection_window_size {
335 channel = channel.initial_connection_window_size(size);
336 }
337 if let Some(duration) = config.tcp_keepalive {
338 channel = channel.tcp_keepalive(Some(duration));
339 }
340 channel = channel.tcp_nodelay(config.tcp_nodelay);
341 channel = channel.timeout(config.timeout);
342 match &config.tls_config {
343 Some(DamlGrpcTlsConfig {
344 ca_cert: Some(cert),
345 }) => {
346 channel = channel.tls_config(ClientTlsConfig::new().ca_certificate(Certificate::from_pem(cert)))?;
347 },
348 Some(DamlGrpcTlsConfig {
349 ca_cert: None,
350 }) => {
351 channel = channel.tls_config(ClientTlsConfig::new())?;
352 },
353 _ => {},
354 }
355
356 let mut http = HttpConnector::new();
359 http.enforce_http(false);
360 http.set_nodelay(config.tcp_nodelay);
361 http.set_keepalive(config.tcp_keepalive);
362 http.set_connect_timeout(config.connect_timeout);
363 channel.connect_with_connector(http).await.map_err(DamlError::from)
364 }
365
366 async fn make_client_from_channel(channel: Channel, config: DamlGrpcClientConfig) -> DamlResult<Self> {
367 let ledger_identity_service = DamlLedgerIdentityService::new(channel.clone(), config.auth_token.as_deref());
368 let ledger_identity = ledger_identity_service.get_ledger_identity().await?;
369 Ok(Self {
370 config,
371 channel: channel.clone(),
372 ledger_identity,
373 })
374 }
375
376 #[cfg(feature = "sandbox")]
377 async fn open_channel_and_wait(config: &DamlGrpcClientConfig) -> DamlResult<Channel> {
378 let mut channel = Self::make_channel(config).await;
379 let start = Instant::now();
380 while let Err(e) = channel {
381 if start.elapsed() > config.reset_timeout {
382 return Err(DamlError::new_timeout_error(e));
383 }
384 channel = Self::make_channel(config).await;
385 }
386 channel
387 }
388
389 #[cfg(feature = "sandbox")]
390 async fn make_client_from_channel_and_wait(channel: Channel, config: DamlGrpcClientConfig) -> DamlResult<Self> {
391 let ledger_identity_service = DamlLedgerIdentityService::new(channel.clone(), config.auth_token.as_deref());
392 let ledger_identity =
393 Self::query_ledger_identity_and_wait(&config.reset_timeout, &ledger_identity_service).await?;
394 Ok(Self {
395 config,
396 channel: channel.clone(),
397 ledger_identity,
398 })
399 }
400
401 #[cfg(feature = "sandbox")]
402 async fn query_ledger_identity_and_wait(
403 reset_timeout: &Duration,
404 ledger_identity_service: &DamlLedgerIdentityService<'_>,
405 ) -> DamlResult<String> {
406 let start = Instant::now();
407 let mut ledger_identity: DamlResult<String> = ledger_identity_service.get_ledger_identity().await;
408 while let Err(e) = ledger_identity {
409 if let DamlError::GrpcPermissionError(_) = e {
410 return Err(e);
411 }
412 if start.elapsed() > *reset_timeout {
413 return Err(DamlError::new_timeout_error(e));
414 }
415 ledger_identity = ledger_identity_service.get_ledger_identity().await;
416 }
417 ledger_identity
418 }
419
420 #[cfg(test)]
421 pub(crate) async fn dummy_for_testing() -> Self {
422 DamlGrpcClient {
423 config: DamlGrpcClientConfig::default(),
424 channel: Channel::builder(Uri::from_static("http://dummy.for.testing")).connect_lazy(),
425 ledger_identity: String::default(),
426 }
427 }
428}