Skip to main content

cloudconvert_sdk/
client.rs

1#[cfg(feature = "retry")]
2use std::time::Duration;
3use std::{
4    error::Error as StdError,
5    ffi::{OsStr, OsString},
6    path::{Path, PathBuf},
7    sync::Arc,
8    time::{SystemTime, UNIX_EPOCH},
9};
10
11use bytes::Bytes;
12use futures_util::{Stream, StreamExt, TryStream};
13use reqwest::{
14    Body, Method, Response,
15    header::{HeaderMap, LOCATION},
16    multipart,
17};
18use serde::{Serialize, de::DeserializeOwned};
19use serde_json::Value;
20use tokio::io::AsyncWriteExt;
21use url::Url;
22
23use crate::{
24    Result,
25    config::{ApiKey, ClientBuilder, CloudConvertConfig, OAuthAccessToken},
26    error::{ApiErrorBody, Error},
27    jobs::{
28        ApiResponse, DataEnvelope, Job, JobCreateRequest, JobGetQuery, JobListQuery, Page,
29        RateLimit, Task, TaskGetQuery, TaskListQuery, UploadForm,
30    },
31    operations::{Operation, OperationListQuery},
32    resources::{User, Webhook, WebhookCreateRequest, WebhookListQuery},
33    socket::{SocketChannel, SocketSubscription},
34    tasks::TaskRequest,
35};
36
37#[cfg(feature = "socket")]
38use crate::socket::CloudConvertSocket;
39
40#[derive(Clone, Debug)]
/// Entry point for talking to the CloudConvert API.
///
/// The resource accessors (`jobs()`, `tasks()`, ...) each receive a clone of this
/// value, so it bundles everything a request needs.
pub struct CloudConvertClient {
    /// Shared configuration: credential, base URLs, sandbox flag and (with the
    /// `retry` feature) the retry policy are read from here.
    config: Arc<CloudConvertConfig>,
    /// HTTP client used for regular API calls as well as uploads and downloads.
    http: reqwest::Client,
    /// HTTP client used by the `*_redirect_location` helpers, which read the
    /// `Location` header themselves; presumably built with redirect-following
    /// disabled — confirm in `ClientBuilder`.
    redirectless_http: reqwest::Client,
}
46
47impl CloudConvertClient {
    /// Starts a [`ClientBuilder`] seeded with the given API key.
    pub fn builder(api_key: ApiKey) -> ClientBuilder {
        ClientBuilder::new(api_key)
    }
51
    /// Starts a [`ClientBuilder`] seeded with an OAuth access token instead of an API key.
    pub fn builder_with_access_token(access_token: OAuthAccessToken) -> ClientBuilder {
        ClientBuilder::new_with_access_token(access_token)
    }
55
56    pub(crate) fn from_parts(
57        config: CloudConvertConfig,
58        http: reqwest::Client,
59        redirectless_http: reqwest::Client,
60    ) -> Result<Self> {
61        Ok(Self {
62            config: Arc::new(config),
63            http,
64            redirectless_http,
65        })
66    }
67
68    pub fn config(&self) -> &CloudConvertConfig {
69        &self.config
70    }
71
72    pub fn jobs(&self) -> JobsResource {
73        JobsResource {
74            client: self.clone(),
75        }
76    }
77
78    pub fn tasks(&self) -> TasksResource {
79        TasksResource {
80            client: self.clone(),
81        }
82    }
83
84    pub fn users(&self) -> UsersResource {
85        UsersResource {
86            client: self.clone(),
87        }
88    }
89
90    pub fn webhooks(&self) -> WebhooksResource {
91        WebhooksResource {
92            client: self.clone(),
93        }
94    }
95
96    pub fn operations(&self) -> OperationsResource {
97        OperationsResource {
98            client: self.clone(),
99        }
100    }
101
102    pub fn socket_base_url(&self) -> &'static str {
103        crate::socket::socket_base_url(self.config.sandbox())
104    }
105
106    pub fn socket_subscription(&self, channel: SocketChannel) -> SocketSubscription {
107        SocketSubscription::new(channel.name().into_owned(), self.config.credential.expose())
108    }
109
110    #[cfg(feature = "socket")]
111    pub async fn socket(
112        &self,
113        channels: impl IntoIterator<Item = SocketChannel>,
114    ) -> Result<CloudConvertSocket> {
115        let subscriptions = channels
116            .into_iter()
117            .map(|channel| self.socket_subscription(channel));
118        CloudConvertSocket::connect(self.socket_base_url(), subscriptions).await
119    }
120
121    #[cfg(feature = "socket")]
122    pub async fn socket_with_buffer(
123        &self,
124        channels: impl IntoIterator<Item = SocketChannel>,
125        buffer: usize,
126    ) -> Result<CloudConvertSocket> {
127        let subscriptions = channels
128            .into_iter()
129            .map(|channel| self.socket_subscription(channel));
130        CloudConvertSocket::connect_with_buffer(self.socket_base_url(), subscriptions, buffer).await
131    }
132
133    pub async fn download(&self, url: impl AsRef<str>) -> Result<Bytes> {
134        let response = self.http.get(url.as_ref()).send().await?;
135        Self::ensure_success(response)
136            .await?
137            .bytes()
138            .await
139            .map_err(Error::Http)
140    }
141
142    pub async fn download_stream(
143        &self,
144        url: impl AsRef<str>,
145    ) -> Result<impl Stream<Item = Result<Bytes>>> {
146        let response = self.http.get(url.as_ref()).send().await?;
147        Ok(Self::ensure_success(response)
148            .await?
149            .bytes_stream()
150            .map(|chunk| chunk.map_err(Error::Http)))
151    }
152
153    pub async fn download_to_path(
154        &self,
155        url: impl AsRef<str>,
156        destination: impl AsRef<Path>,
157    ) -> Result<()> {
158        let destination = destination.as_ref();
159        let temp_path = temporary_download_path(destination);
160        self.download_to_temporary_path(url.as_ref(), destination, temp_path)
161            .await
162    }
163
164    pub async fn upload_bytes(
165        &self,
166        task: &Task,
167        filename: impl Into<String>,
168        bytes: impl Into<Bytes>,
169    ) -> Result<()> {
170        self.upload_part(
171            task,
172            "file",
173            multipart::Part::stream(Body::from(bytes.into())).file_name(filename.into()),
174        )
175        .await
176    }
177
178    pub async fn upload_body(
179        &self,
180        task: &Task,
181        filename: impl Into<String>,
182        body: Body,
183    ) -> Result<()> {
184        self.upload_part(
185            task,
186            "file",
187            multipart::Part::stream(body).file_name(filename.into()),
188        )
189        .await
190    }
191
192    pub async fn upload_stream<S, B, E>(
193        &self,
194        task: &Task,
195        filename: impl Into<String>,
196        stream: S,
197    ) -> Result<()>
198    where
199        S: TryStream<Ok = B, Error = E> + Send + 'static,
200        Bytes: From<B>,
201        E: Into<Box<dyn StdError + Send + Sync>>,
202    {
203        self.upload_body(task, filename, Body::wrap_stream(stream))
204            .await
205    }
206
207    pub async fn upload_path(&self, task: &Task, path: impl AsRef<Path>) -> Result<()> {
208        self.upload_part(task, "file", multipart::Part::file(path).await?)
209            .await
210    }
211
212    async fn download_to_temporary_path(
213        &self,
214        url: &str,
215        destination: &Path,
216        temp_path: PathBuf,
217    ) -> Result<()> {
218        let mut temp_file = TemporaryDownload::new(temp_path);
219        let mut stream = self.download_stream(url).await?;
220        let mut file = tokio::fs::File::create(temp_file.path()).await?;
221
222        while let Some(chunk) = stream.next().await {
223            file.write_all(&chunk?).await?;
224        }
225
226        file.flush().await?;
227        drop(file);
228        tokio::fs::rename(temp_file.path(), destination).await?;
229        temp_file.commit();
230        Ok(())
231    }
232
233    async fn upload_part(
234        &self,
235        task: &Task,
236        field_name: &str,
237        part: multipart::Part,
238    ) -> Result<()> {
239        let form = Self::upload_form(task)?;
240        let mut multipart = multipart::Form::new();
241        for (key, value) in &form.parameters {
242            multipart = multipart.text(key.clone(), form_value(value));
243        }
244        multipart = multipart.part(field_name.to_string(), part);
245
246        let response = self
247            .http
248            .post(&form.url)
249            .multipart(multipart)
250            .send()
251            .await?;
252        Self::ensure_success(response).await?;
253        Ok(())
254    }
255
256    fn upload_form(task: &Task) -> Result<&UploadForm> {
257        task.upload_form().ok_or(Error::UploadTaskNotReady)
258    }
259
260    async fn get_response<T>(&self, base: &Url, path: &str) -> Result<ApiResponse<T>>
261    where
262        T: DeserializeOwned,
263    {
264        let url = api_url(base, path)?;
265        let response = self
266            .send_api(|| {
267                self.http
268                    .request(Method::GET, url.clone())
269                    .bearer_auth(self.config.credential.expose())
270            })
271            .await?;
272        Self::decode_response(response).await
273    }
274
275    async fn get_with_query_response<T, Q>(
276        &self,
277        base: &Url,
278        path: &str,
279        query: &Q,
280    ) -> Result<ApiResponse<T>>
281    where
282        T: DeserializeOwned,
283        Q: Serialize + ?Sized,
284    {
285        let url = api_url(base, path)?;
286        let response = self
287            .send_api(|| {
288                self.http
289                    .request(Method::GET, url.clone())
290                    .bearer_auth(self.config.credential.expose())
291                    .query(query)
292            })
293            .await?;
294        Self::decode_response(response).await
295    }
296
297    async fn post_response<T, B>(&self, base: &Url, path: &str, body: &B) -> Result<ApiResponse<T>>
298    where
299        T: DeserializeOwned,
300        B: Serialize + ?Sized,
301    {
302        let url = api_url(base, path)?;
303        let response = self
304            .send_api(|| {
305                self.http
306                    .request(Method::POST, url.clone())
307                    .bearer_auth(self.config.credential.expose())
308                    .json(body)
309            })
310            .await?;
311        Self::decode_response(response).await
312    }
313
314    async fn delete(&self, base: &Url, path: &str) -> Result<()> {
315        let url = api_url(base, path)?;
316        let response = self
317            .send_api(|| {
318                self.http
319                    .request(Method::DELETE, url.clone())
320                    .bearer_auth(self.config.credential.expose())
321            })
322            .await?;
323        Self::ensure_success(response).await?;
324        Ok(())
325    }
326
327    async fn get_redirect_location<Q>(&self, base: &Url, path: &str, query: &Q) -> Result<Url>
328    where
329        Q: Serialize + ?Sized,
330    {
331        let url = api_url(base, path)?;
332        let response = self
333            .send_api(|| {
334                self.redirectless_http
335                    .request(Method::GET, url.clone())
336                    .bearer_auth(self.config.credential.expose())
337                    .query(query)
338            })
339            .await?;
340        Self::decode_redirect_location(response).await
341    }
342
343    async fn post_redirect_location<B>(&self, base: &Url, path: &str, body: &B) -> Result<Url>
344    where
345        B: Serialize + ?Sized,
346    {
347        let url = api_url(base, path)?;
348        let response = self
349            .send_api(|| {
350                self.redirectless_http
351                    .request(Method::POST, url.clone())
352                    .bearer_auth(self.config.credential.expose())
353                    .json(body)
354            })
355            .await?;
356        Self::decode_redirect_location(response).await
357    }
358
359    async fn send_api<F>(&self, build: F) -> Result<Response>
360    where
361        F: Fn() -> reqwest::RequestBuilder,
362    {
363        #[cfg(feature = "retry")]
364        if let Some(policy) = &self.config.retry_policy {
365            return self.send_api_with_retry(build, policy).await;
366        }
367
368        Ok(build().send().await?)
369    }
370
371    #[cfg(feature = "retry")]
372    async fn send_api_with_retry<F>(
373        &self,
374        build: F,
375        policy: &crate::RetryPolicy,
376    ) -> Result<Response>
377    where
378        F: Fn() -> reqwest::RequestBuilder,
379    {
380        let attempts = policy.max_attempts_value().max(1);
381        let mut delay = policy.initial_delay_value();
382
383        for attempt in 1..=attempts {
384            match build().send().await {
385                Ok(response) => {
386                    if attempt == attempts || !Self::is_retryable_status(response.status().as_u16())
387                    {
388                        return Ok(response);
389                    }
390
391                    let retry_after = policy.respect_retry_after_value().then(|| {
392                        response
393                            .headers()
394                            .get("retry-after")
395                            .and_then(|value| value.to_str().ok())
396                            .and_then(|value| value.parse::<u64>().ok())
397                            .map(Duration::from_secs)
398                    });
399                    let sleep_for = retry_after
400                        .flatten()
401                        .map(|duration| duration.min(policy.max_delay_value()))
402                        .unwrap_or(delay);
403                    tokio::time::sleep(sleep_for).await;
404                }
405                Err(error) => {
406                    if attempt == attempts || !Self::is_retryable_error(&error) {
407                        return Err(Error::Http(error));
408                    }
409
410                    tokio::time::sleep(delay).await;
411                }
412            }
413
414            delay = next_retry_delay(delay, policy);
415        }
416
417        unreachable!("retry loop always returns on the final attempt")
418    }
419
420    #[cfg(feature = "retry")]
421    fn is_retryable_status(status: u16) -> bool {
422        matches!(status, 429 | 500 | 502 | 503 | 504)
423    }
424
425    #[cfg(feature = "retry")]
426    fn is_retryable_error(error: &reqwest::Error) -> bool {
427        error.is_connect() || error.is_timeout()
428    }
429
430    async fn decode_response<T>(response: Response) -> Result<ApiResponse<T>>
431    where
432        T: DeserializeOwned,
433    {
434        let rate_limit = Self::rate_limit_from_headers(response.headers());
435        let response = Self::ensure_success(response).await?;
436        let envelope = response.json::<DataEnvelope<T>>().await?;
437        Ok(ApiResponse {
438            data: envelope.data,
439            links: envelope.links,
440            meta: envelope.meta,
441            rate_limit,
442        })
443    }
444
445    fn into_page<T>(response: ApiResponse<Vec<T>>) -> Page<T> {
446        Page {
447            data: response.data,
448            links: response.links,
449            meta: response.meta,
450            rate_limit: response.rate_limit,
451        }
452    }
453
454    async fn decode_redirect_location(response: Response) -> Result<Url> {
455        let response_url = response.url().clone();
456        if response.status().is_redirection() {
457            let location = response
458                .headers()
459                .get(LOCATION)
460                .ok_or(Error::MissingRedirectLocation)?;
461            let location = location
462                .to_str()
463                .map_err(|_| Error::MissingRedirectLocation)?;
464            return response_url.join(location).map_err(Error::Url);
465        }
466
467        let _ = Self::ensure_success(response).await?;
468        Err(Error::MissingRedirectLocation)
469    }
470
471    async fn ensure_success(response: Response) -> Result<Response> {
472        if response.status().is_success() {
473            return Ok(response);
474        }
475
476        let status = response.status().as_u16();
477        let rate_limit = Self::rate_limit_from_headers(response.headers());
478        let body = response.text().await.unwrap_or_default();
479        let parsed = serde_json::from_str::<ApiErrorBody>(&body).ok();
480        let message = parsed
481            .as_ref()
482            .and_then(|body| body.message.clone())
483            .filter(|message| !message.is_empty())
484            .unwrap_or_else(|| "request failed".to_string());
485        let code = parsed.as_ref().and_then(|body| body.code.clone());
486        let errors = parsed.and_then(|body| body.errors);
487
488        Err(Error::Api {
489            status,
490            message,
491            code,
492            errors: errors.map(Box::new),
493            rate_limit: rate_limit.map(Box::new),
494        })
495    }
496
497    fn rate_limit_from_headers(headers: &HeaderMap) -> Option<RateLimit> {
498        let rate_limit = RateLimit {
499            limit: header_u64(headers, "x-ratelimit-limit"),
500            remaining: header_u64(headers, "x-ratelimit-remaining"),
501            reset: header_u64(headers, "x-ratelimit-reset"),
502            retry_after: header_u64(headers, "retry-after"),
503        };
504
505        if rate_limit.limit.is_some()
506            || rate_limit.remaining.is_some()
507            || rate_limit.reset.is_some()
508            || rate_limit.retry_after.is_some()
509        {
510            Some(rate_limit)
511        } else {
512            None
513        }
514    }
515}
516
517fn header_u64(headers: &HeaderMap, name: &'static str) -> Option<u64> {
518    headers.get(name)?.to_str().ok()?.parse().ok()
519}
520
521fn api_url(base: &Url, path: &str) -> Result<Url> {
522    validate_api_path(path)?;
523    base.join(path).map_err(Error::Url)
524}
525
526fn validate_api_path(path: &str) -> Result<()> {
527    let valid = !path.is_empty()
528        && path.split('/').all(|segment| {
529            !segment.is_empty()
530                && segment
531                    .bytes()
532                    .all(|byte| byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_'))
533        });
534
535    if valid {
536        Ok(())
537    } else {
538        Err(Error::InvalidApiPath)
539    }
540}
541
542fn resource_id(id: &str) -> Result<&str> {
543    let valid = !id.is_empty()
544        && id
545            .bytes()
546            .all(|byte| byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_'));
547
548    if valid {
549        Ok(id)
550    } else {
551        Err(Error::InvalidResourceId)
552    }
553}
554
555#[cfg(feature = "retry")]
556fn next_retry_delay(current: Duration, policy: &crate::RetryPolicy) -> Duration {
557    let next = current.mul_f64(policy.backoff_factor_value());
558    next.min(policy.max_delay_value())
559}
560
561#[derive(Clone, Debug)]
/// Accessor for the job endpoints, created by `CloudConvertClient::jobs`.
pub struct JobsResource {
    /// Clone of the client all requests are sent through.
    client: CloudConvertClient,
}
565
566impl JobsResource {
567    pub async fn list(&self, query: &JobListQuery) -> Result<Vec<Job>> {
568        Ok(self.list_page(query).await?.data)
569    }
570
571    pub async fn list_page(&self, query: &JobListQuery) -> Result<Page<Job>> {
572        let response = self
573            .client
574            .get_with_query_response(&self.client.config.api_base_url, "jobs", query)
575            .await?;
576        Ok(CloudConvertClient::into_page(response))
577    }
578
579    pub async fn create(&self, request: impl Into<JobCreateRequest>) -> Result<Job> {
580        Ok(self.create_response(request).await?.data)
581    }
582
583    pub async fn create_response(
584        &self,
585        request: impl Into<JobCreateRequest>,
586    ) -> Result<ApiResponse<Job>> {
587        self.client
588            .post_response(&self.client.config.api_base_url, "jobs", &request.into())
589            .await
590    }
591
592    pub async fn create_and_wait(&self, request: impl Into<JobCreateRequest>) -> Result<Job> {
593        Ok(self.create_and_wait_response(request).await?.data)
594    }
595
596    #[cfg(feature = "socket")]
597    pub async fn create_and_wait_socket(
598        &self,
599        request: impl Into<JobCreateRequest>,
600    ) -> Result<Job> {
601        let job = self.create(request).await?;
602        if job.is_terminal() {
603            return Ok(job);
604        }
605
606        self.wait_socket(&job.id).await
607    }
608
609    pub async fn create_and_wait_response(
610        &self,
611        request: impl Into<JobCreateRequest>,
612    ) -> Result<ApiResponse<Job>> {
613        self.client
614            .post_response(&self.client.config.sync_base_url, "jobs", &request.into())
615            .await
616    }
617
618    pub async fn create_and_wait_redirect_url(
619        &self,
620        request: impl Into<JobCreateRequest>,
621    ) -> Result<Url> {
622        let mut request = request.into();
623        request.redirect = Some(true);
624        self.client
625            .post_redirect_location(&self.client.config.sync_base_url, "jobs", &request)
626            .await
627    }
628
629    pub async fn get(&self, id: impl AsRef<str>) -> Result<Job> {
630        Ok(self.get_response(id).await?.data)
631    }
632
633    pub async fn get_response(&self, id: impl AsRef<str>) -> Result<ApiResponse<Job>> {
634        let id = resource_id(id.as_ref())?;
635        self.client
636            .get_response(&self.client.config.api_base_url, &format!("jobs/{id}"))
637            .await
638    }
639
640    pub async fn get_with_query(&self, id: impl AsRef<str>, query: &JobGetQuery) -> Result<Job> {
641        Ok(self.get_with_query_response(id, query).await?.data)
642    }
643
644    pub async fn get_with_query_response(
645        &self,
646        id: impl AsRef<str>,
647        query: &JobGetQuery,
648    ) -> Result<ApiResponse<Job>> {
649        let id = resource_id(id.as_ref())?;
650        self.client
651            .get_with_query_response(
652                &self.client.config.api_base_url,
653                &format!("jobs/{id}"),
654                query,
655            )
656            .await
657    }
658
659    pub async fn get_redirect_url(&self, id: impl AsRef<str>) -> Result<Url> {
660        let id = resource_id(id.as_ref())?;
661        let query = JobGetQuery::default().redirect(true);
662        self.client
663            .get_redirect_location(
664                &self.client.config.api_base_url,
665                &format!("jobs/{id}"),
666                &query,
667            )
668            .await
669    }
670
671    pub async fn wait(&self, id: impl AsRef<str>) -> Result<Job> {
672        Ok(self.wait_response(id).await?.data)
673    }
674
675    #[cfg(feature = "socket")]
676    pub async fn wait_socket(&self, id: impl AsRef<str>) -> Result<Job> {
677        let id = resource_id(id.as_ref())?.to_string();
678        let mut socket = self.client.socket([SocketChannel::job(id.clone())]).await?;
679        let current = self.get(&id).await?;
680        if current.is_terminal() {
681            let _ = socket.disconnect().await;
682            return Ok(current);
683        }
684
685        loop {
686            let event = socket
687                .next_event()
688                .await
689                .ok_or_else(|| Error::Socket(format!("socket closed before job {id} completed")))?;
690
691            if !event.is_job_event() || !event.is_terminal() {
692                continue;
693            }
694
695            let job = match event.job()? {
696                Some(job) if job.id == id => job,
697                Some(_) => continue,
698                None => self.get(&id).await?,
699            };
700
701            let _ = socket.disconnect().await;
702            return Ok(job);
703        }
704    }
705
706    #[cfg(feature = "socket")]
707    pub async fn task_events_socket(&self, id: impl AsRef<str>) -> Result<CloudConvertSocket> {
708        self.task_events_socket_with_buffer(id, 64).await
709    }
710
711    #[cfg(feature = "socket")]
712    pub async fn task_events_socket_with_buffer(
713        &self,
714        id: impl AsRef<str>,
715        buffer: usize,
716    ) -> Result<CloudConvertSocket> {
717        let id = resource_id(id.as_ref())?.to_string();
718        self.client
719            .socket_with_buffer([SocketChannel::job_tasks(id)], buffer)
720            .await
721    }
722
723    pub async fn wait_response(&self, id: impl AsRef<str>) -> Result<ApiResponse<Job>> {
724        let id = resource_id(id.as_ref())?;
725        self.client
726            .get_response(&self.client.config.sync_base_url, &format!("jobs/{id}"))
727            .await
728    }
729
730    pub async fn wait_with_query(&self, id: impl AsRef<str>, query: &JobGetQuery) -> Result<Job> {
731        Ok(self.wait_with_query_response(id, query).await?.data)
732    }
733
734    pub async fn wait_with_query_response(
735        &self,
736        id: impl AsRef<str>,
737        query: &JobGetQuery,
738    ) -> Result<ApiResponse<Job>> {
739        let id = resource_id(id.as_ref())?;
740        self.client
741            .get_with_query_response(
742                &self.client.config.sync_base_url,
743                &format!("jobs/{id}"),
744                query,
745            )
746            .await
747    }
748
749    pub async fn wait_redirect_url(&self, id: impl AsRef<str>) -> Result<Url> {
750        let id = resource_id(id.as_ref())?;
751        let query = JobGetQuery::default().redirect(true);
752        self.client
753            .get_redirect_location(
754                &self.client.config.sync_base_url,
755                &format!("jobs/{id}"),
756                &query,
757            )
758            .await
759    }
760
761    pub async fn delete(&self, id: impl AsRef<str>) -> Result<()> {
762        let id = resource_id(id.as_ref())?;
763        self.client
764            .delete(&self.client.config.api_base_url, &format!("jobs/{id}"))
765            .await
766    }
767}
768
769#[derive(Clone, Debug)]
/// Accessor for the task endpoints, created by `CloudConvertClient::tasks`.
pub struct TasksResource {
    /// Clone of the client all requests are sent through.
    client: CloudConvertClient,
}
773
774impl TasksResource {
775    pub async fn list(&self, query: &TaskListQuery) -> Result<Vec<Task>> {
776        Ok(self.list_page(query).await?.data)
777    }
778
779    pub async fn list_page(&self, query: &TaskListQuery) -> Result<Page<Task>> {
780        let response = self
781            .client
782            .get_with_query_response(&self.client.config.api_base_url, "tasks", query)
783            .await?;
784        Ok(CloudConvertClient::into_page(response))
785    }
786
787    pub async fn create(&self, request: impl Into<TaskRequest>) -> Result<Task> {
788        Ok(self.create_response(request).await?.data)
789    }
790
791    pub async fn create_response(
792        &self,
793        request: impl Into<TaskRequest>,
794    ) -> Result<ApiResponse<Task>> {
795        let request = request.into();
796        let path = request.operation().to_string();
797        self.client
798            .post_response(&self.client.config.api_base_url, &path, &request)
799            .await
800    }
801
802    pub async fn get(&self, id: impl AsRef<str>) -> Result<Task> {
803        Ok(self.get_response(id).await?.data)
804    }
805
806    pub async fn get_response(&self, id: impl AsRef<str>) -> Result<ApiResponse<Task>> {
807        let id = resource_id(id.as_ref())?;
808        self.client
809            .get_response(&self.client.config.api_base_url, &format!("tasks/{id}"))
810            .await
811    }
812
813    pub async fn get_with_query(&self, id: impl AsRef<str>, query: &TaskGetQuery) -> Result<Task> {
814        Ok(self.get_with_query_response(id, query).await?.data)
815    }
816
817    pub async fn get_with_query_response(
818        &self,
819        id: impl AsRef<str>,
820        query: &TaskGetQuery,
821    ) -> Result<ApiResponse<Task>> {
822        let id = resource_id(id.as_ref())?;
823        self.client
824            .get_with_query_response(
825                &self.client.config.api_base_url,
826                &format!("tasks/{id}"),
827                query,
828            )
829            .await
830    }
831
832    pub async fn wait(&self, id: impl AsRef<str>) -> Result<Task> {
833        Ok(self.wait_response(id).await?.data)
834    }
835
836    #[cfg(feature = "socket")]
837    pub async fn wait_socket(&self, id: impl AsRef<str>) -> Result<Task> {
838        let id = resource_id(id.as_ref())?.to_string();
839        let mut socket = self
840            .client
841            .socket([SocketChannel::task(id.clone())])
842            .await?;
843        let current = self.get(&id).await?;
844        if current.is_terminal() {
845            let _ = socket.disconnect().await;
846            return Ok(current);
847        }
848
849        loop {
850            let event = socket.next_event().await.ok_or_else(|| {
851                Error::Socket(format!("socket closed before task {id} completed"))
852            })?;
853
854            if !event.is_task_event() || !event.is_terminal() {
855                continue;
856            }
857
858            let task = match event.task()? {
859                Some(task) if task.id == id => task,
860                Some(_) => continue,
861                None => self.get(&id).await?,
862            };
863
864            let _ = socket.disconnect().await;
865            return Ok(task);
866        }
867    }
868
869    pub async fn wait_response(&self, id: impl AsRef<str>) -> Result<ApiResponse<Task>> {
870        let id = resource_id(id.as_ref())?;
871        self.client
872            .get_response(&self.client.config.sync_base_url, &format!("tasks/{id}"))
873            .await
874    }
875
876    pub async fn wait_with_query(&self, id: impl AsRef<str>, query: &TaskGetQuery) -> Result<Task> {
877        Ok(self.wait_with_query_response(id, query).await?.data)
878    }
879
880    pub async fn wait_with_query_response(
881        &self,
882        id: impl AsRef<str>,
883        query: &TaskGetQuery,
884    ) -> Result<ApiResponse<Task>> {
885        let id = resource_id(id.as_ref())?;
886        self.client
887            .get_with_query_response(
888                &self.client.config.sync_base_url,
889                &format!("tasks/{id}"),
890                query,
891            )
892            .await
893    }
894
895    pub async fn cancel(&self, id: impl AsRef<str>) -> Result<Task> {
896        Ok(self.cancel_response(id).await?.data)
897    }
898
899    pub async fn cancel_response(&self, id: impl AsRef<str>) -> Result<ApiResponse<Task>> {
900        let id = resource_id(id.as_ref())?;
901        let empty = serde_json::json!({});
902        self.client
903            .post_response(
904                &self.client.config.api_base_url,
905                &format!("tasks/{id}/cancel"),
906                &empty,
907            )
908            .await
909    }
910
911    pub async fn retry(&self, id: impl AsRef<str>) -> Result<Task> {
912        Ok(self.retry_response(id).await?.data)
913    }
914
915    pub async fn retry_response(&self, id: impl AsRef<str>) -> Result<ApiResponse<Task>> {
916        let id = resource_id(id.as_ref())?;
917        let empty = serde_json::json!({});
918        self.client
919            .post_response(
920                &self.client.config.api_base_url,
921                &format!("tasks/{id}/retry"),
922                &empty,
923            )
924            .await
925    }
926
927    pub async fn delete(&self, id: impl AsRef<str>) -> Result<()> {
928        let id = resource_id(id.as_ref())?;
929        self.client
930            .delete(&self.client.config.api_base_url, &format!("tasks/{id}"))
931            .await
932    }
933}
934
935#[derive(Clone, Debug)]
/// Accessor for the user endpoints, created by `CloudConvertClient::users`.
pub struct UsersResource {
    /// Clone of the client all requests are sent through.
    client: CloudConvertClient,
}
939
940impl UsersResource {
941    pub async fn me(&self) -> Result<User> {
942        Ok(self.me_response().await?.data)
943    }
944
945    pub async fn me_response(&self) -> Result<ApiResponse<User>> {
946        self.client
947            .get_response(&self.client.config.api_base_url, "users/me")
948            .await
949    }
950
951    #[cfg(feature = "socket")]
952    pub async fn job_events_socket(&self) -> Result<CloudConvertSocket> {
953        self.job_events_socket_with_buffer(64).await
954    }
955
956    #[cfg(feature = "socket")]
957    pub async fn job_events_socket_with_buffer(&self, buffer: usize) -> Result<CloudConvertSocket> {
958        let user = self.me().await?;
959        self.client
960            .socket_with_buffer([SocketChannel::user_jobs(user.id)], buffer)
961            .await
962    }
963
964    #[cfg(feature = "socket")]
965    pub async fn task_events_socket(&self) -> Result<CloudConvertSocket> {
966        self.task_events_socket_with_buffer(64).await
967    }
968
969    #[cfg(feature = "socket")]
970    pub async fn task_events_socket_with_buffer(
971        &self,
972        buffer: usize,
973    ) -> Result<CloudConvertSocket> {
974        let user = self.me().await?;
975        self.client
976            .socket_with_buffer([SocketChannel::user_tasks(user.id)], buffer)
977            .await
978    }
979
980    #[cfg(feature = "socket")]
981    pub async fn events_socket(&self) -> Result<CloudConvertSocket> {
982        self.events_socket_with_buffer(64).await
983    }
984
985    #[cfg(feature = "socket")]
986    pub async fn events_socket_with_buffer(&self, buffer: usize) -> Result<CloudConvertSocket> {
987        let user = self.me().await?;
988        self.client
989            .socket_with_buffer(
990                [
991                    SocketChannel::user_jobs(user.id.clone()),
992                    SocketChannel::user_tasks(user.id),
993                ],
994                buffer,
995            )
996            .await
997    }
998}
999
1000#[derive(Clone, Debug)]
1001pub struct WebhooksResource {
1002    client: CloudConvertClient,
1003}
1004
1005impl WebhooksResource {
1006    pub async fn create(&self, request: &WebhookCreateRequest) -> Result<Webhook> {
1007        Ok(self.create_response(request).await?.data)
1008    }
1009
1010    pub async fn create_response(
1011        &self,
1012        request: &WebhookCreateRequest,
1013    ) -> Result<ApiResponse<Webhook>> {
1014        self.client
1015            .post_response(&self.client.config.api_base_url, "webhooks", request)
1016            .await
1017    }
1018
1019    pub async fn list(&self, query: &WebhookListQuery) -> Result<Vec<Webhook>> {
1020        Ok(self.list_page(query).await?.data)
1021    }
1022
1023    pub async fn list_page(&self, query: &WebhookListQuery) -> Result<Page<Webhook>> {
1024        let response = self
1025            .client
1026            .get_with_query_response(&self.client.config.api_base_url, "users/me/webhooks", query)
1027            .await?;
1028        Ok(CloudConvertClient::into_page(response))
1029    }
1030
1031    pub async fn delete(&self, id: impl AsRef<str>) -> Result<()> {
1032        let id = resource_id(id.as_ref())?;
1033        self.client
1034            .delete(&self.client.config.api_base_url, &format!("webhooks/{id}"))
1035            .await
1036    }
1037}
1038
1039#[derive(Clone, Debug)]
1040pub struct OperationsResource {
1041    client: CloudConvertClient,
1042}
1043
1044impl OperationsResource {
1045    pub async fn list(&self, query: &OperationListQuery) -> Result<Vec<Operation>> {
1046        Ok(self.list_page(query).await?.data)
1047    }
1048
1049    pub async fn list_page(&self, query: &OperationListQuery) -> Result<Page<Operation>> {
1050        let response = self
1051            .client
1052            .get_with_query_response(&self.client.config.api_base_url, "operations", query)
1053            .await?;
1054        Ok(CloudConvertClient::into_page(response))
1055    }
1056}
1057
1058fn form_value(value: &Value) -> String {
1059    match value {
1060        Value::String(value) => value.clone(),
1061        Value::Number(value) => value.to_string(),
1062        Value::Bool(value) => value.to_string(),
1063        Value::Null => String::new(),
1064        other => other.to_string(),
1065    }
1066}
1067
1068fn temporary_download_path(destination: &Path) -> PathBuf {
1069    let mut name = OsString::from(".");
1070    name.push(
1071        destination
1072            .file_name()
1073            .unwrap_or_else(|| OsStr::new("download")),
1074    );
1075    name.push(format!(".cloudconvert-{}.tmp", unique_suffix()));
1076
1077    match destination.parent() {
1078        Some(parent) => parent.join(name),
1079        None => PathBuf::from(name),
1080    }
1081}
1082
/// Guard around a temporary file path: unless [`TemporaryDownload::commit`]
/// has been called, the file at `path` is removed when the guard is dropped.
struct TemporaryDownload {
    path: PathBuf,
    committed: bool,
}

impl TemporaryDownload {
    /// Starts guarding `path`; the guard begins in the uncommitted state.
    fn new(path: PathBuf) -> Self {
        let committed = false;
        Self { path, committed }
    }

    /// Returns the guarded path.
    fn path(&self) -> &Path {
        self.path.as_path()
    }

    /// Marks the download as committed so that dropping the guard leaves the
    /// file in place.
    fn commit(&mut self) {
        self.committed = true;
    }
}

impl Drop for TemporaryDownload {
    fn drop(&mut self) {
        if self.committed {
            return;
        }

        // Best effort: the file may not exist, and `drop` cannot report errors.
        let _ = std::fs::remove_file(&self.path);
    }
}
1112
/// Builds a suffix for temporary file names that never repeats within this
/// process.
///
/// The result has the form `{pid}-{nanos}-{sequence}`:
/// - `pid` is the current process id,
/// - `nanos` is the wall-clock time in nanoseconds since the Unix epoch, or
///   `0` when the clock reads earlier than the epoch,
/// - `sequence` is a process-wide counter incremented on every call.
///
/// The counter is what guarantees uniqueness inside one process: the clock
/// alone can repeat on platforms with coarse timer resolution and collapses to
/// `0` when it is set before the epoch.
fn unique_suffix() -> String {
    // Fully qualified so the block needs no extra `use` lines in the file.
    static SEQUENCE: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or_default();
    // Only distinct values are needed, not ordering with other memory, so
    // `Relaxed` is sufficient.
    let sequence = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    format!("{}-{nanos}-{sequence}", std::process::id())
}