1use crate::data::{DamlJsonCreatedEvent, DamlJsonExerciseResult, DamlJsonParty, DamlJsonQuery};
2use crate::error::{DamlJsonError, DamlJsonResult};
3use crate::request::{
4 DamlJsonAllocatePartyRequest, DamlJsonAllocatePartyResponse, DamlJsonCreateAndExerciseRequest,
5 DamlJsonCreateAndExerciseResponse, DamlJsonCreateRequest, DamlJsonCreateResponse, DamlJsonErrorResponse,
6 DamlJsonExerciseByKeyRequest, DamlJsonExerciseByKeyResponse, DamlJsonExerciseRequest, DamlJsonExerciseResponse,
7 DamlJsonFetchByKeyRequest, DamlJsonFetchPartiesRequest, DamlJsonFetchPartiesResponse, DamlJsonFetchRequest,
8 DamlJsonFetchResponse, DamlJsonListPackagesResponse, DamlJsonQueryResponse, DamlJsonRequestMeta,
9 DamlJsonUploadDarResponse,
10};
11use crate::util::Required;
12use bytes::Bytes;
13use reqwest::{Certificate, Client, ClientBuilder, RequestBuilder, Response};
14use serde::de::DeserializeOwned;
15use serde::Serialize;
16use serde_json::Value;
17use std::fmt::Debug;
18use std::time::{Duration, Instant};
19use tracing::{instrument, trace};
20use url::Url;
21
/// Path of the endpoint used to create a contract.
const CREATE_REST: &str = "/v1/create";
/// Path of the endpoint used to exercise a choice (by contract id or by key).
const EXERCISE_REST: &str = "/v1/exercise";
/// Path of the endpoint used to create a contract and exercise a choice on it.
const CREATE_AND_EXERCISE_REST: &str = "/v1/create-and-exercise";
/// Path of the endpoint used to fetch a contract (by contract id or by key).
const FETCH_REST: &str = "/v1/fetch";
/// Path of the endpoint used to query active contracts.
const QUERY_REST: &str = "/v1/query";
/// Path of the endpoint used to look up parties.
const PARTIES_REST: &str = "/v1/parties";
/// Path of the endpoint used to allocate a new party.
const ALLOCATE_PARTY_REST: &str = "/v1/parties/allocate";
/// Path of the endpoint used to list, download and upload packages.
const PACKAGES_REST: &str = "/v1/packages";

/// Connect and request timeout, in seconds, applied by `DamlJsonClientBuilder::url`.
const DEFAULT_TIMEOUT_SECS: u64 = 5;
32
33#[derive(Debug, Default)]
35pub struct DamlJsonClientConfig {
36 url: String,
37 connect_timeout: Duration,
38 timeout: Duration,
39 keepalive: Option<Duration>,
40 nodelay: bool,
41 max_idle_connection_per_host: usize,
42 tls_config: Option<DamlJsonTlsConfig>,
43 auth_token: Option<String>,
44}
45
/// TLS settings for a `DamlJsonClient`.
#[derive(Debug)]
pub struct DamlJsonTlsConfig {
    /// Raw bytes of a CA certificate; parsed as PEM and added as a root certificate
    /// when the client is built.
    ca_cert: Vec<u8>,
}
51
52pub struct DamlJsonClientBuilder {
54 config: DamlJsonClientConfig,
55}
56
57impl DamlJsonClientBuilder {
58 pub fn url(url: impl Into<String>) -> Self {
60 Self {
61 config: DamlJsonClientConfig {
62 url: url.into(),
63 connect_timeout: Duration::from_secs(DEFAULT_TIMEOUT_SECS),
64 timeout: Duration::from_secs(DEFAULT_TIMEOUT_SECS),
65 ..DamlJsonClientConfig::default()
66 },
67 }
68 }
69
70 pub fn connect_timeout(self, connect_timeout: Duration) -> Self {
72 Self {
73 config: DamlJsonClientConfig {
74 connect_timeout,
75 ..self.config
76 },
77 }
78 }
79
80 pub fn timeout(self, timeout: Duration) -> Self {
82 Self {
83 config: DamlJsonClientConfig {
84 timeout,
85 ..self.config
86 },
87 }
88 }
89
90 pub fn keepalive(self, keepalive: Duration) -> Self {
92 Self {
93 config: DamlJsonClientConfig {
94 keepalive: Some(keepalive),
95 ..self.config
96 },
97 }
98 }
99
100 pub fn nodelay(self) -> Self {
102 Self {
103 config: DamlJsonClientConfig {
104 nodelay: true,
105 ..self.config
106 },
107 }
108 }
109
110 pub fn max_idle_connection_per_host(self, max: usize) -> Self {
112 Self {
113 config: DamlJsonClientConfig {
114 max_idle_connection_per_host: max,
115 ..self.config
116 },
117 }
118 }
119
120 pub fn with_tls(self, ca_cert: impl Into<Vec<u8>>) -> Self {
122 Self {
123 config: DamlJsonClientConfig {
124 tls_config: Some(DamlJsonTlsConfig {
125 ca_cert: ca_cert.into(),
126 }),
127 ..self.config
128 },
129 }
130 }
131
132 pub fn with_auth(self, auth_token: String) -> Self {
134 Self {
135 config: DamlJsonClientConfig {
136 auth_token: Some(auth_token),
137 ..self.config
138 },
139 }
140 }
141
142 pub fn build(self) -> DamlJsonResult<DamlJsonClient> {
144 DamlJsonClient::new_from_config(self.config)
145 }
146}
147
148pub struct DamlJsonClient {
172 client: Client,
173 config: DamlJsonClientConfig,
174}
175
176impl DamlJsonClient {
177 pub fn new(url: impl Into<String>, token: Option<String>) -> DamlJsonResult<Self> {
179 Ok(Self {
180 client: Client::new(),
181 config: DamlJsonClientConfig {
182 url: url.into(),
183 auth_token: token,
184 ..DamlJsonClientConfig::default()
185 },
186 })
187 }
188
189 pub fn new_from_config(config: DamlJsonClientConfig) -> DamlJsonResult<Self> {
191 let mut builder = ClientBuilder::default()
192 .connect_timeout(config.connect_timeout)
193 .timeout(config.timeout)
194 .pool_idle_timeout(config.keepalive)
195 .tcp_nodelay(config.nodelay)
196 .pool_max_idle_per_host(config.max_idle_connection_per_host)
197 .use_rustls_tls();
198 if let Some(cc) = &config.tls_config {
199 builder = builder.add_root_certificate(Certificate::from_pem(&cc.ca_cert)?);
200 }
201 Ok(Self {
202 client: builder.build()?,
203 config,
204 })
205 }
206
207 pub const fn config(&self) -> &DamlJsonClientConfig {
209 &self.config
210 }
211
212 #[instrument(skip(self))]
214 pub async fn create(&self, template_id: &str, payload: Value) -> DamlJsonResult<DamlJsonCreatedEvent> {
215 Ok(self.create_request(&DamlJsonCreateRequest::new(template_id, payload)).await?.result)
216 }
217
218 #[instrument(skip(self))]
220 pub async fn create_with_meta(
221 &self,
222 template_id: &str,
223 payload: Value,
224 meta: DamlJsonRequestMeta,
225 ) -> DamlJsonResult<DamlJsonCreatedEvent> {
226 Ok(self.create_request(&DamlJsonCreateRequest::new_with_meta(template_id, payload, meta)).await?.result)
227 }
228
229 #[instrument(skip(self))]
231 pub async fn exercise(
232 &self,
233 template_id: &str,
234 contract_id: &str,
235 choice: &str,
236 argument: Value,
237 ) -> DamlJsonResult<DamlJsonExerciseResult> {
238 Ok(self
239 .exercise_request(&DamlJsonExerciseRequest::new(template_id, contract_id, choice, argument))
240 .await?
241 .result)
242 }
243
244 #[instrument(skip(self))]
246 pub async fn exercise_by_key(
247 &self,
248 template_id: &str,
249 key: Value,
250 choice: &str,
251 argument: Value,
252 ) -> DamlJsonResult<DamlJsonExerciseResult> {
253 Ok(self
254 .exercise_by_key_request(&DamlJsonExerciseByKeyRequest::new(template_id, key, choice, argument))
255 .await?
256 .result)
257 }
258
259 #[instrument(skip(self))]
261 pub async fn create_and_exercise(
262 &self,
263 template_id: &str,
264 payload: Value,
265 choice: &str,
266 argument: Value,
267 ) -> DamlJsonResult<DamlJsonExerciseResult> {
268 Ok(self
269 .create_and_exercise_request(&DamlJsonCreateAndExerciseRequest::new(template_id, payload, choice, argument))
270 .await?
271 .result)
272 }
273
274 #[instrument(skip(self))]
276 pub async fn fetch(&self, contract_id: &str) -> DamlJsonResult<DamlJsonCreatedEvent> {
277 Ok(self.fetch_request(&DamlJsonFetchRequest::new(contract_id)).await?.result)
278 }
279
280 #[instrument(skip(self))]
282 pub async fn fetch_by_key(&self, template_id: &str, key: Value) -> DamlJsonResult<DamlJsonCreatedEvent> {
283 Ok(self.fetch_by_key_request(&DamlJsonFetchByKeyRequest::new(template_id, key)).await?.result)
284 }
285
286 #[instrument(skip(self))]
288 pub async fn query_all(&self) -> DamlJsonResult<Vec<DamlJsonCreatedEvent>> {
289 Ok(self.query_all_request().await?.result)
290 }
291
292 #[instrument(skip(self))]
294 pub async fn query<S: Into<String> + Debug>(
295 &self,
296 template_ids: Vec<S>,
297 query: Value,
298 ) -> DamlJsonResult<Vec<DamlJsonCreatedEvent>> {
299 let templates: Vec<_> = template_ids.into_iter().map(Into::into).collect::<Vec<_>>();
300 Ok(self.query_request(&DamlJsonQuery::new(templates, query)).await?.result)
301 }
302
303 #[instrument(skip(self))]
308 pub async fn fetch_parties<S: Into<String> + Debug>(&self, parties: Vec<S>) -> DamlJsonResult<Vec<DamlJsonParty>> {
309 Ok(self
310 .fetch_parties_request(&DamlJsonFetchPartiesRequest::new(parties.into_iter().map(Into::into).collect()))
311 .await?
312 .result)
313 }
314
315 #[instrument(skip(self))]
319 pub async fn fetch_parties_with_unknown<S: Into<String> + Debug>(
320 &self,
321 parties: Vec<S>,
322 ) -> DamlJsonResult<(Vec<DamlJsonParty>, Vec<String>)> {
323 let response = self
324 .fetch_parties_request(&DamlJsonFetchPartiesRequest::new(parties.into_iter().map(Into::into).collect()))
325 .await?;
326 let unknown_parties =
327 response.warnings.and_then(|mut warnings| warnings.remove("unknownParties")).unwrap_or_default();
328 Ok((response.result, unknown_parties))
329 }
330
331 #[instrument(skip(self))]
333 pub async fn fetch_all_parties(&self) -> DamlJsonResult<Vec<DamlJsonParty>> {
334 Ok(self.fetch_all_parties_request().await?.result)
335 }
336
337 #[instrument(skip(self))]
339 pub async fn allocate_party(
340 &self,
341 identifier_hint: Option<&str>,
342 display_name: Option<&str>,
343 ) -> DamlJsonResult<DamlJsonParty> {
344 Ok(self.allocate_party_request(&DamlJsonAllocatePartyRequest::new(identifier_hint, display_name)).await?.result)
345 }
346
347 #[instrument(skip(self))]
349 pub async fn list_packages(&self) -> DamlJsonResult<Vec<String>> {
350 Ok(self.list_packages_request().await?.result)
351 }
352
353 #[instrument(skip(self))]
355 pub async fn download_package(&self, package_id: &str) -> DamlJsonResult<Vec<u8>> {
356 self.download_package_request(package_id).await
357 }
358
359 #[instrument(skip(self))]
361 pub async fn upload_dar(&self, content: Vec<u8>) -> DamlJsonResult<()> {
362 self.upload_dar_request(content).await?;
363 Ok(())
364 }
365
366 #[instrument(skip(self))]
367 async fn create_request(&self, request: &DamlJsonCreateRequest) -> DamlJsonResult<DamlJsonCreateResponse> {
368 self.post_json(Self::url(&self.config.url, CREATE_REST)?, request).await
369 }
370
371 #[instrument(skip(self))]
372 async fn exercise_request(&self, request: &DamlJsonExerciseRequest) -> DamlJsonResult<DamlJsonExerciseResponse> {
373 self.post_json(Self::url(&self.config.url, EXERCISE_REST)?, request).await
374 }
375
376 #[instrument(skip(self))]
377 async fn exercise_by_key_request(
378 &self,
379 request: &DamlJsonExerciseByKeyRequest,
380 ) -> DamlJsonResult<DamlJsonExerciseByKeyResponse> {
381 self.post_json(Self::url(&self.config.url, EXERCISE_REST)?, request).await
382 }
383
384 #[instrument(skip(self))]
385 async fn create_and_exercise_request(
386 &self,
387 request: &DamlJsonCreateAndExerciseRequest,
388 ) -> DamlJsonResult<DamlJsonCreateAndExerciseResponse> {
389 self.post_json(Self::url(&self.config.url, CREATE_AND_EXERCISE_REST)?, request).await
390 }
391
392 #[instrument(skip(self))]
393 async fn fetch_request(&self, request: &DamlJsonFetchRequest) -> DamlJsonResult<DamlJsonFetchResponse> {
394 self.post_json(Self::url(&self.config.url, FETCH_REST)?, request).await
395 }
396
397 #[instrument(skip(self))]
398 async fn fetch_by_key_request(&self, request: &DamlJsonFetchByKeyRequest) -> DamlJsonResult<DamlJsonFetchResponse> {
399 self.post_json(Self::url(&self.config.url, FETCH_REST)?, request).await
400 }
401
402 #[instrument(skip(self))]
403 async fn query_all_request(&self) -> DamlJsonResult<DamlJsonQueryResponse> {
404 self.get_json(Self::url(&self.config.url, QUERY_REST)?).await
405 }
406
407 #[instrument(skip(self))]
408 async fn query_request(&self, request: &DamlJsonQuery) -> DamlJsonResult<DamlJsonQueryResponse> {
409 self.post_json(Self::url(&self.config.url, QUERY_REST)?, request).await
410 }
411
412 #[instrument(skip(self))]
413 async fn fetch_parties_request(
414 &self,
415 request: &DamlJsonFetchPartiesRequest,
416 ) -> DamlJsonResult<DamlJsonFetchPartiesResponse> {
417 self.post_json(Self::url(&self.config.url, PARTIES_REST)?, request).await
418 }
419
420 #[instrument(skip(self))]
421 async fn fetch_all_parties_request(&self) -> DamlJsonResult<DamlJsonFetchPartiesResponse> {
422 self.get_json(Self::url(&self.config.url, PARTIES_REST)?).await
423 }
424
425 #[instrument(skip(self))]
426 async fn allocate_party_request(
427 &self,
428 request: &DamlJsonAllocatePartyRequest,
429 ) -> DamlJsonResult<DamlJsonAllocatePartyResponse> {
430 self.post_json(Self::url(&self.config.url, ALLOCATE_PARTY_REST)?, request).await
431 }
432
433 #[instrument(skip(self))]
434 async fn list_packages_request(&self) -> DamlJsonResult<DamlJsonListPackagesResponse> {
435 self.get_json(Self::url(&self.config.url, PACKAGES_REST)?).await
436 }
437
438 #[instrument(skip(self))]
439 async fn download_package_request(&self, package_id: &str) -> DamlJsonResult<Vec<u8>> {
440 Ok(self.get_bytes(Self::url(&self.config.url, &format!("{}/{}", PACKAGES_REST, package_id))?).await?.to_vec())
441 }
442
443 #[instrument(skip(self))]
444 async fn upload_dar_request(&self, bytes: Vec<u8>) -> DamlJsonResult<DamlJsonUploadDarResponse> {
445 self.post_bytes(Self::url(&self.config.url, PACKAGES_REST)?, bytes).await
446 }
447
448 #[instrument(skip(self))]
449 async fn get_json<R: DeserializeOwned>(&self, url: Url) -> DamlJsonResult<R> {
450 let request = self.make_get_request(&url);
451 trace!(?request);
452 let response = self.execute_with_retry(request).await?;
453 trace!(?response);
454 self.process_json_response(response).await
455 }
456
457 #[instrument(skip(self))]
458 async fn post_json<T: Serialize + Debug, R: DeserializeOwned>(&self, url: Url, json: T) -> DamlJsonResult<R> {
459 let request = self.make_post_request(&url).json(&json);
460 trace!(?request);
461 let response = self.execute_with_retry(request).await?;
462 trace!(?response);
463 self.process_json_response(response).await
464 }
465
466 #[instrument(skip(self))]
467 async fn get_bytes(&self, url: Url) -> DamlJsonResult<Bytes> {
468 let request = self.make_get_request(&url);
469 trace!(?request);
470 let response = self.execute_with_retry(request).await?;
471 trace!(?response);
472 self.process_bytes_response(response).await
473 }
474
475 #[instrument(skip(self))]
476 async fn post_bytes<R: DeserializeOwned>(&self, url: Url, bytes: impl Into<Bytes> + Debug) -> DamlJsonResult<R> {
477 let request =
478 self.make_post_request(&url).header("Content-Type", "application/octet-stream").body(bytes.into());
479 trace!(?request);
480 let response = self.execute_with_retry(request).await?;
481 trace!(?response);
482 self.process_json_response(response).await
483 }
484
485 fn make_post_request(&self, url: &Url) -> RequestBuilder {
486 match self.config.auth_token.as_deref() {
487 Some(token) => self.client.post(url.clone()).bearer_auth(token),
488 None => self.client.post(url.clone()),
489 }
490 }
491
492 fn make_get_request(&self, url: &Url) -> RequestBuilder {
493 match self.config.auth_token.as_deref() {
494 Some(token) => self.client.get(url.clone()).bearer_auth(token),
495 None => self.client.get(url.clone()),
496 }
497 }
498
499 async fn execute_with_retry(&self, request: RequestBuilder) -> DamlJsonResult<Response> {
501 let mut res = request.try_clone().req()?.send().await;
502 let start = Instant::now();
503 while let Err(e) = &res {
504 if start.elapsed() > self.config.connect_timeout {
505 return Ok(res?);
506 } else if Self::is_retryable_error(e) {
507 res = request.try_clone().req()?.send().await;
508 } else {
509 return Ok(res?);
510 }
511 }
512 Ok(res?)
513 }
514
515 async fn process_json_response<R: DeserializeOwned>(&self, res: Response) -> DamlJsonResult<R> {
516 if res.status().is_success() {
517 Ok(res.json::<R>().await?)
518 } else {
519 Err(self.process_error_response(res).await?)
520 }
521 }
522
523 async fn process_bytes_response(&self, res: Response) -> DamlJsonResult<Bytes> {
524 if res.status().is_success() {
525 Ok(res.bytes().await?)
526 } else {
527 Err(self.process_error_response(res).await?)
528 }
529 }
530
531 async fn process_error_response(&self, error_response: Response) -> DamlJsonResult<DamlJsonError> {
532 if error_response.status().is_client_error() || error_response.status().is_server_error() {
533 match error_response.content_length() {
534 Some(length) if length > 0 => {
535 let error_body = error_response.json::<DamlJsonErrorResponse>().await?;
536 Ok(DamlJsonError::ErrorResponse(error_body.status, error_body.errors.join(",")))
537 },
538 _ => Ok(DamlJsonError::UnhandledHttpResponse(error_response.status().to_string())),
539 }
540 } else {
541 Ok(DamlJsonError::UnhandledHttpResponse(error_response.status().to_string()))
542 }
543 }
544
545 fn is_retryable_error(error: &reqwest::Error) -> bool {
547 error.is_request()
548 }
549
550 fn url(base: &str, path: &str) -> DamlJsonResult<Url> {
551 Ok(Url::parse(base)?.join(path)?)
552 }
553}