daml_json/
service.rs

1use crate::data::{DamlJsonCreatedEvent, DamlJsonExerciseResult, DamlJsonParty, DamlJsonQuery};
2use crate::error::{DamlJsonError, DamlJsonResult};
3use crate::request::{
4    DamlJsonAllocatePartyRequest, DamlJsonAllocatePartyResponse, DamlJsonCreateAndExerciseRequest,
5    DamlJsonCreateAndExerciseResponse, DamlJsonCreateRequest, DamlJsonCreateResponse, DamlJsonErrorResponse,
6    DamlJsonExerciseByKeyRequest, DamlJsonExerciseByKeyResponse, DamlJsonExerciseRequest, DamlJsonExerciseResponse,
7    DamlJsonFetchByKeyRequest, DamlJsonFetchPartiesRequest, DamlJsonFetchPartiesResponse, DamlJsonFetchRequest,
8    DamlJsonFetchResponse, DamlJsonListPackagesResponse, DamlJsonQueryResponse, DamlJsonRequestMeta,
9    DamlJsonUploadDarResponse,
10};
11use crate::util::Required;
12use bytes::Bytes;
13use reqwest::{Certificate, Client, ClientBuilder, RequestBuilder, Response};
14use serde::de::DeserializeOwned;
15use serde::Serialize;
16use serde_json::Value;
17use std::fmt::Debug;
18use std::time::{Duration, Instant};
19use tracing::{instrument, trace};
20use url::Url;
21
22static CREATE_REST: &str = "/v1/create";
23static EXERCISE_REST: &str = "/v1/exercise";
24static CREATE_AND_EXERCISE_REST: &str = "/v1/create-and-exercise";
25static FETCH_REST: &str = "/v1/fetch";
26static QUERY_REST: &str = "/v1/query";
27static PARTIES_REST: &str = "/v1/parties";
28static ALLOCATE_PARTY_REST: &str = "/v1/parties/allocate";
29static PACKAGES_REST: &str = "/v1/packages";
30
31const DEFAULT_TIMEOUT_SECS: u64 = 5;
32
33/// Daml JSON client configuration options.
34#[derive(Debug, Default)]
35pub struct DamlJsonClientConfig {
36    url: String,
37    connect_timeout: Duration,
38    timeout: Duration,
39    keepalive: Option<Duration>,
40    nodelay: bool,
41    max_idle_connection_per_host: usize,
42    tls_config: Option<DamlJsonTlsConfig>,
43    auth_token: Option<String>,
44}
45
46/// Daml JSON client TLS configuration.
47#[derive(Debug)]
48pub struct DamlJsonTlsConfig {
49    ca_cert: Vec<u8>,
50}
51
52/// Daml JSON client builder.
53pub struct DamlJsonClientBuilder {
54    config: DamlJsonClientConfig,
55}
56
57impl DamlJsonClientBuilder {
58    /// Create a new [`DamlJsonClientBuilder`] for a given `url`.
59    pub fn url(url: impl Into<String>) -> Self {
60        Self {
61            config: DamlJsonClientConfig {
62                url: url.into(),
63                connect_timeout: Duration::from_secs(DEFAULT_TIMEOUT_SECS),
64                timeout: Duration::from_secs(DEFAULT_TIMEOUT_SECS),
65                ..DamlJsonClientConfig::default()
66            },
67        }
68    }
69
70    /// Set the connection timeout.
71    pub fn connect_timeout(self, connect_timeout: Duration) -> Self {
72        Self {
73            config: DamlJsonClientConfig {
74                connect_timeout,
75                ..self.config
76            },
77        }
78    }
79
80    /// Set the timeout.
81    pub fn timeout(self, timeout: Duration) -> Self {
82        Self {
83            config: DamlJsonClientConfig {
84                timeout,
85                ..self.config
86            },
87        }
88    }
89
90    /// Enable TCP keepalive.
91    pub fn keepalive(self, keepalive: Duration) -> Self {
92        Self {
93            config: DamlJsonClientConfig {
94                keepalive: Some(keepalive),
95                ..self.config
96            },
97        }
98    }
99
100    /// Enable TCP nodelay.
101    pub fn nodelay(self) -> Self {
102        Self {
103            config: DamlJsonClientConfig {
104                nodelay: true,
105                ..self.config
106            },
107        }
108    }
109
110    /// Set the maximum number of idle connections allowed per host.
111    pub fn max_idle_connection_per_host(self, max: usize) -> Self {
112        Self {
113            config: DamlJsonClientConfig {
114                max_idle_connection_per_host: max,
115                ..self.config
116            },
117        }
118    }
119
120    /// Set the TLS root CA.
121    pub fn with_tls(self, ca_cert: impl Into<Vec<u8>>) -> Self {
122        Self {
123            config: DamlJsonClientConfig {
124                tls_config: Some(DamlJsonTlsConfig {
125                    ca_cert: ca_cert.into(),
126                }),
127                ..self.config
128            },
129        }
130    }
131
132    /// Set the Bearer auth token.
133    pub fn with_auth(self, auth_token: String) -> Self {
134        Self {
135            config: DamlJsonClientConfig {
136                auth_token: Some(auth_token),
137                ..self.config
138            },
139        }
140    }
141
142    /// Build the [`DamlJsonClient`] from the configuration.
143    pub fn build(self) -> DamlJsonResult<DamlJsonClient> {
144        DamlJsonClient::new_from_config(self.config)
145    }
146}
147
148/// Daml JSON API client.
149///
150/// See [here](https://docs.daml.com/json-api) for full details of the Daml JSON API.
151///
152/// ## Examples
153///
154/// The following example connects to a Daml ledger via the JSON API and creates a contract:
155///
156/// ```no_run
157/// use serde_json::json;
158/// use daml_json::service::DamlJsonClientBuilder;
159/// use daml_json::error::DamlJsonResult;
160/// #[tokio::main]
161/// async fn main() -> DamlJsonResult<()> {
162///     let payload = json!({ "sender": "Alice", "receiver": "Bob", "count": "0" });
163///     let client = DamlJsonClientBuilder::url("https://api.myledger.org")
164///         .with_auth("...token...".into())
165///         .build()?;
166///     let create_response = client.create("Fuji.PingPong:Ping", payload.clone()).await?;
167///     assert_eq!(create_response.payload, payload);
168///     Ok(())
169/// }
170/// ```
171pub struct DamlJsonClient {
172    client: Client,
173    config: DamlJsonClientConfig,
174}
175
176impl DamlJsonClient {
177    /// Create a new [`DamlJsonClient`].
178    pub fn new(url: impl Into<String>, token: Option<String>) -> DamlJsonResult<Self> {
179        Ok(Self {
180            client: Client::new(),
181            config: DamlJsonClientConfig {
182                url: url.into(),
183                auth_token: token,
184                ..DamlJsonClientConfig::default()
185            },
186        })
187    }
188
189    /// Create a new [`DamlJsonClient`] from a [`DamlJsonClientConfig`].
190    pub fn new_from_config(config: DamlJsonClientConfig) -> DamlJsonResult<Self> {
191        let mut builder = ClientBuilder::default()
192            .connect_timeout(config.connect_timeout)
193            .timeout(config.timeout)
194            .pool_idle_timeout(config.keepalive)
195            .tcp_nodelay(config.nodelay)
196            .pool_max_idle_per_host(config.max_idle_connection_per_host)
197            .use_rustls_tls();
198        if let Some(cc) = &config.tls_config {
199            builder = builder.add_root_certificate(Certificate::from_pem(&cc.ca_cert)?);
200        }
201        Ok(Self {
202            client: builder.build()?,
203            config,
204        })
205    }
206
207    /// Return the current configuration.
208    pub const fn config(&self) -> &DamlJsonClientConfig {
209        &self.config
210    }
211
212    /// Create a new `Daml` contract.
213    #[instrument(skip(self))]
214    pub async fn create(&self, template_id: &str, payload: Value) -> DamlJsonResult<DamlJsonCreatedEvent> {
215        Ok(self.create_request(&DamlJsonCreateRequest::new(template_id, payload)).await?.result)
216    }
217
218    /// Create a new `Daml` Contract with optional meta field.
219    #[instrument(skip(self))]
220    pub async fn create_with_meta(
221        &self,
222        template_id: &str,
223        payload: Value,
224        meta: DamlJsonRequestMeta,
225    ) -> DamlJsonResult<DamlJsonCreatedEvent> {
226        Ok(self.create_request(&DamlJsonCreateRequest::new_with_meta(template_id, payload, meta)).await?.result)
227    }
228
229    /// Exercise a `Daml` choice by contract id.
230    #[instrument(skip(self))]
231    pub async fn exercise(
232        &self,
233        template_id: &str,
234        contract_id: &str,
235        choice: &str,
236        argument: Value,
237    ) -> DamlJsonResult<DamlJsonExerciseResult> {
238        Ok(self
239            .exercise_request(&DamlJsonExerciseRequest::new(template_id, contract_id, choice, argument))
240            .await?
241            .result)
242    }
243
244    /// Exercise a `Daml` choice by contract key.
245    #[instrument(skip(self))]
246    pub async fn exercise_by_key(
247        &self,
248        template_id: &str,
249        key: Value,
250        choice: &str,
251        argument: Value,
252    ) -> DamlJsonResult<DamlJsonExerciseResult> {
253        Ok(self
254            .exercise_by_key_request(&DamlJsonExerciseByKeyRequest::new(template_id, key, choice, argument))
255            .await?
256            .result)
257    }
258
259    /// Create and exercise a `Daml` choice.
260    #[instrument(skip(self))]
261    pub async fn create_and_exercise(
262        &self,
263        template_id: &str,
264        payload: Value,
265        choice: &str,
266        argument: Value,
267    ) -> DamlJsonResult<DamlJsonExerciseResult> {
268        Ok(self
269            .create_and_exercise_request(&DamlJsonCreateAndExerciseRequest::new(template_id, payload, choice, argument))
270            .await?
271            .result)
272    }
273
274    /// Fetch a `Daml` contract by id.
275    #[instrument(skip(self))]
276    pub async fn fetch(&self, contract_id: &str) -> DamlJsonResult<DamlJsonCreatedEvent> {
277        Ok(self.fetch_request(&DamlJsonFetchRequest::new(contract_id)).await?.result)
278    }
279
280    /// Fetch a `Daml` contract by key.
281    #[instrument(skip(self))]
282    pub async fn fetch_by_key(&self, template_id: &str, key: Value) -> DamlJsonResult<DamlJsonCreatedEvent> {
283        Ok(self.fetch_by_key_request(&DamlJsonFetchByKeyRequest::new(template_id, key)).await?.result)
284    }
285
286    /// List all currently active contracts for all known templates.
287    #[instrument(skip(self))]
288    pub async fn query_all(&self) -> DamlJsonResult<Vec<DamlJsonCreatedEvent>> {
289        Ok(self.query_all_request().await?.result)
290    }
291
292    /// List currently active contracts that match a given query.
293    #[instrument(skip(self))]
294    pub async fn query<S: Into<String> + Debug>(
295        &self,
296        template_ids: Vec<S>,
297        query: Value,
298    ) -> DamlJsonResult<Vec<DamlJsonCreatedEvent>> {
299        let templates: Vec<_> = template_ids.into_iter().map(Into::into).collect::<Vec<_>>();
300        Ok(self.query_request(&DamlJsonQuery::new(templates, query)).await?.result)
301    }
302
303    /// Fetch `Daml` Parties by identifiers.
304    ///
305    /// Retrieve the [`DamlJsonParty`] entries for the given `parties` identifiers.  Unknown parties are silently
306    /// discarded.
307    #[instrument(skip(self))]
308    pub async fn fetch_parties<S: Into<String> + Debug>(&self, parties: Vec<S>) -> DamlJsonResult<Vec<DamlJsonParty>> {
309        Ok(self
310            .fetch_parties_request(&DamlJsonFetchPartiesRequest::new(parties.into_iter().map(Into::into).collect()))
311            .await?
312            .result)
313    }
314
315    /// Fetch `Daml` Parties and unknown `Daml` Parties by identifiers.
316    ///
317    /// Retrieve the [`DamlJsonParty`] entries for the given `parties` identifiers and unknown party identifiers.
318    #[instrument(skip(self))]
319    pub async fn fetch_parties_with_unknown<S: Into<String> + Debug>(
320        &self,
321        parties: Vec<S>,
322    ) -> DamlJsonResult<(Vec<DamlJsonParty>, Vec<String>)> {
323        let response = self
324            .fetch_parties_request(&DamlJsonFetchPartiesRequest::new(parties.into_iter().map(Into::into).collect()))
325            .await?;
326        let unknown_parties =
327            response.warnings.and_then(|mut warnings| warnings.remove("unknownParties")).unwrap_or_default();
328        Ok((response.result, unknown_parties))
329    }
330
331    /// Fetch all known Parties.
332    #[instrument(skip(self))]
333    pub async fn fetch_all_parties(&self) -> DamlJsonResult<Vec<DamlJsonParty>> {
334        Ok(self.fetch_all_parties_request().await?.result)
335    }
336
337    /// Allocate Party.
338    #[instrument(skip(self))]
339    pub async fn allocate_party(
340        &self,
341        identifier_hint: Option<&str>,
342        display_name: Option<&str>,
343    ) -> DamlJsonResult<DamlJsonParty> {
344        Ok(self.allocate_party_request(&DamlJsonAllocatePartyRequest::new(identifier_hint, display_name)).await?.result)
345    }
346
347    /// List All `DALF` packages
348    #[instrument(skip(self))]
349    pub async fn list_packages(&self) -> DamlJsonResult<Vec<String>> {
350        Ok(self.list_packages_request().await?.result)
351    }
352
353    /// Download a `DALF` package.
354    #[instrument(skip(self))]
355    pub async fn download_package(&self, package_id: &str) -> DamlJsonResult<Vec<u8>> {
356        self.download_package_request(package_id).await
357    }
358
359    /// Upload a `DAR` file.
360    #[instrument(skip(self))]
361    pub async fn upload_dar(&self, content: Vec<u8>) -> DamlJsonResult<()> {
362        self.upload_dar_request(content).await?;
363        Ok(())
364    }
365
366    #[instrument(skip(self))]
367    async fn create_request(&self, request: &DamlJsonCreateRequest) -> DamlJsonResult<DamlJsonCreateResponse> {
368        self.post_json(Self::url(&self.config.url, CREATE_REST)?, request).await
369    }
370
371    #[instrument(skip(self))]
372    async fn exercise_request(&self, request: &DamlJsonExerciseRequest) -> DamlJsonResult<DamlJsonExerciseResponse> {
373        self.post_json(Self::url(&self.config.url, EXERCISE_REST)?, request).await
374    }
375
376    #[instrument(skip(self))]
377    async fn exercise_by_key_request(
378        &self,
379        request: &DamlJsonExerciseByKeyRequest,
380    ) -> DamlJsonResult<DamlJsonExerciseByKeyResponse> {
381        self.post_json(Self::url(&self.config.url, EXERCISE_REST)?, request).await
382    }
383
384    #[instrument(skip(self))]
385    async fn create_and_exercise_request(
386        &self,
387        request: &DamlJsonCreateAndExerciseRequest,
388    ) -> DamlJsonResult<DamlJsonCreateAndExerciseResponse> {
389        self.post_json(Self::url(&self.config.url, CREATE_AND_EXERCISE_REST)?, request).await
390    }
391
392    #[instrument(skip(self))]
393    async fn fetch_request(&self, request: &DamlJsonFetchRequest) -> DamlJsonResult<DamlJsonFetchResponse> {
394        self.post_json(Self::url(&self.config.url, FETCH_REST)?, request).await
395    }
396
397    #[instrument(skip(self))]
398    async fn fetch_by_key_request(&self, request: &DamlJsonFetchByKeyRequest) -> DamlJsonResult<DamlJsonFetchResponse> {
399        self.post_json(Self::url(&self.config.url, FETCH_REST)?, request).await
400    }
401
402    #[instrument(skip(self))]
403    async fn query_all_request(&self) -> DamlJsonResult<DamlJsonQueryResponse> {
404        self.get_json(Self::url(&self.config.url, QUERY_REST)?).await
405    }
406
407    #[instrument(skip(self))]
408    async fn query_request(&self, request: &DamlJsonQuery) -> DamlJsonResult<DamlJsonQueryResponse> {
409        self.post_json(Self::url(&self.config.url, QUERY_REST)?, request).await
410    }
411
412    #[instrument(skip(self))]
413    async fn fetch_parties_request(
414        &self,
415        request: &DamlJsonFetchPartiesRequest,
416    ) -> DamlJsonResult<DamlJsonFetchPartiesResponse> {
417        self.post_json(Self::url(&self.config.url, PARTIES_REST)?, request).await
418    }
419
420    #[instrument(skip(self))]
421    async fn fetch_all_parties_request(&self) -> DamlJsonResult<DamlJsonFetchPartiesResponse> {
422        self.get_json(Self::url(&self.config.url, PARTIES_REST)?).await
423    }
424
425    #[instrument(skip(self))]
426    async fn allocate_party_request(
427        &self,
428        request: &DamlJsonAllocatePartyRequest,
429    ) -> DamlJsonResult<DamlJsonAllocatePartyResponse> {
430        self.post_json(Self::url(&self.config.url, ALLOCATE_PARTY_REST)?, request).await
431    }
432
433    #[instrument(skip(self))]
434    async fn list_packages_request(&self) -> DamlJsonResult<DamlJsonListPackagesResponse> {
435        self.get_json(Self::url(&self.config.url, PACKAGES_REST)?).await
436    }
437
438    #[instrument(skip(self))]
439    async fn download_package_request(&self, package_id: &str) -> DamlJsonResult<Vec<u8>> {
440        Ok(self.get_bytes(Self::url(&self.config.url, &format!("{}/{}", PACKAGES_REST, package_id))?).await?.to_vec())
441    }
442
443    #[instrument(skip(self))]
444    async fn upload_dar_request(&self, bytes: Vec<u8>) -> DamlJsonResult<DamlJsonUploadDarResponse> {
445        self.post_bytes(Self::url(&self.config.url, PACKAGES_REST)?, bytes).await
446    }
447
448    #[instrument(skip(self))]
449    async fn get_json<R: DeserializeOwned>(&self, url: Url) -> DamlJsonResult<R> {
450        let request = self.make_get_request(&url);
451        trace!(?request);
452        let response = self.execute_with_retry(request).await?;
453        trace!(?response);
454        self.process_json_response(response).await
455    }
456
457    #[instrument(skip(self))]
458    async fn post_json<T: Serialize + Debug, R: DeserializeOwned>(&self, url: Url, json: T) -> DamlJsonResult<R> {
459        let request = self.make_post_request(&url).json(&json);
460        trace!(?request);
461        let response = self.execute_with_retry(request).await?;
462        trace!(?response);
463        self.process_json_response(response).await
464    }
465
466    #[instrument(skip(self))]
467    async fn get_bytes(&self, url: Url) -> DamlJsonResult<Bytes> {
468        let request = self.make_get_request(&url);
469        trace!(?request);
470        let response = self.execute_with_retry(request).await?;
471        trace!(?response);
472        self.process_bytes_response(response).await
473    }
474
475    #[instrument(skip(self))]
476    async fn post_bytes<R: DeserializeOwned>(&self, url: Url, bytes: impl Into<Bytes> + Debug) -> DamlJsonResult<R> {
477        let request =
478            self.make_post_request(&url).header("Content-Type", "application/octet-stream").body(bytes.into());
479        trace!(?request);
480        let response = self.execute_with_retry(request).await?;
481        trace!(?response);
482        self.process_json_response(response).await
483    }
484
485    fn make_post_request(&self, url: &Url) -> RequestBuilder {
486        match self.config.auth_token.as_deref() {
487            Some(token) => self.client.post(url.clone()).bearer_auth(token),
488            None => self.client.post(url.clone()),
489        }
490    }
491
492    fn make_get_request(&self, url: &Url) -> RequestBuilder {
493        match self.config.auth_token.as_deref() {
494            Some(token) => self.client.get(url.clone()).bearer_auth(token),
495            None => self.client.get(url.clone()),
496        }
497    }
498
499    // TODO need backoff on retries, but in an executor agnostic way...
500    async fn execute_with_retry(&self, request: RequestBuilder) -> DamlJsonResult<Response> {
501        let mut res = request.try_clone().req()?.send().await;
502        let start = Instant::now();
503        while let Err(e) = &res {
504            if start.elapsed() > self.config.connect_timeout {
505                return Ok(res?);
506            } else if Self::is_retryable_error(e) {
507                res = request.try_clone().req()?.send().await;
508            } else {
509                return Ok(res?);
510            }
511        }
512        Ok(res?)
513    }
514
515    async fn process_json_response<R: DeserializeOwned>(&self, res: Response) -> DamlJsonResult<R> {
516        if res.status().is_success() {
517            Ok(res.json::<R>().await?)
518        } else {
519            Err(self.process_error_response(res).await?)
520        }
521    }
522
523    async fn process_bytes_response(&self, res: Response) -> DamlJsonResult<Bytes> {
524        if res.status().is_success() {
525            Ok(res.bytes().await?)
526        } else {
527            Err(self.process_error_response(res).await?)
528        }
529    }
530
531    async fn process_error_response(&self, error_response: Response) -> DamlJsonResult<DamlJsonError> {
532        if error_response.status().is_client_error() || error_response.status().is_server_error() {
533            match error_response.content_length() {
534                Some(length) if length > 0 => {
535                    let error_body = error_response.json::<DamlJsonErrorResponse>().await?;
536                    Ok(DamlJsonError::ErrorResponse(error_body.status, error_body.errors.join(",")))
537                },
538                _ => Ok(DamlJsonError::UnhandledHttpResponse(error_response.status().to_string())),
539            }
540        } else {
541            Ok(DamlJsonError::UnhandledHttpResponse(error_response.status().to_string()))
542        }
543    }
544
545    /// TODO, which errors are retryable?
546    fn is_retryable_error(error: &reqwest::Error) -> bool {
547        error.is_request()
548    }
549
550    fn url(base: &str, path: &str) -> DamlJsonResult<Url> {
551        Ok(Url::parse(base)?.join(path)?)
552    }
553}