1#[cfg(feature = "retry")]
2use std::time::Duration;
3use std::{
4 error::Error as StdError,
5 ffi::{OsStr, OsString},
6 path::{Path, PathBuf},
7 sync::Arc,
8 time::{SystemTime, UNIX_EPOCH},
9};
10
11use bytes::Bytes;
12use futures_util::{Stream, StreamExt, TryStream};
13use reqwest::{
14 Body, Method, Response,
15 header::{HeaderMap, LOCATION},
16 multipart,
17};
18use serde::{Serialize, de::DeserializeOwned};
19use serde_json::Value;
20use tokio::io::AsyncWriteExt;
21use url::Url;
22
23use crate::{
24 Result,
25 config::{ApiKey, ClientBuilder, CloudConvertConfig, OAuthAccessToken},
26 error::{ApiErrorBody, Error},
27 jobs::{
28 ApiResponse, DataEnvelope, Job, JobCreateRequest, JobGetQuery, JobListQuery, Page,
29 RateLimit, Task, TaskGetQuery, TaskListQuery, UploadForm,
30 },
31 operations::{Operation, OperationListQuery},
32 resources::{User, Webhook, WebhookCreateRequest, WebhookListQuery},
33 socket::{SocketChannel, SocketSubscription},
34 tasks::TaskRequest,
35};
36
37#[cfg(feature = "socket")]
38use crate::socket::CloudConvertSocket;
39
40#[derive(Clone, Debug)]
41pub struct CloudConvertClient {
42 config: Arc<CloudConvertConfig>,
43 http: reqwest::Client,
44 redirectless_http: reqwest::Client,
45}
46
47impl CloudConvertClient {
48 pub fn builder(api_key: ApiKey) -> ClientBuilder {
49 ClientBuilder::new(api_key)
50 }
51
52 pub fn builder_with_access_token(access_token: OAuthAccessToken) -> ClientBuilder {
53 ClientBuilder::new_with_access_token(access_token)
54 }
55
56 pub(crate) fn from_parts(
57 config: CloudConvertConfig,
58 http: reqwest::Client,
59 redirectless_http: reqwest::Client,
60 ) -> Result<Self> {
61 Ok(Self {
62 config: Arc::new(config),
63 http,
64 redirectless_http,
65 })
66 }
67
68 pub fn config(&self) -> &CloudConvertConfig {
69 &self.config
70 }
71
72 pub fn jobs(&self) -> JobsResource {
73 JobsResource {
74 client: self.clone(),
75 }
76 }
77
78 pub fn tasks(&self) -> TasksResource {
79 TasksResource {
80 client: self.clone(),
81 }
82 }
83
84 pub fn users(&self) -> UsersResource {
85 UsersResource {
86 client: self.clone(),
87 }
88 }
89
90 pub fn webhooks(&self) -> WebhooksResource {
91 WebhooksResource {
92 client: self.clone(),
93 }
94 }
95
96 pub fn operations(&self) -> OperationsResource {
97 OperationsResource {
98 client: self.clone(),
99 }
100 }
101
102 pub fn socket_base_url(&self) -> &'static str {
103 crate::socket::socket_base_url(self.config.sandbox())
104 }
105
106 pub fn socket_subscription(&self, channel: SocketChannel) -> SocketSubscription {
107 SocketSubscription::new(channel.name().into_owned(), self.config.credential.expose())
108 }
109
110 #[cfg(feature = "socket")]
111 pub async fn socket(
112 &self,
113 channels: impl IntoIterator<Item = SocketChannel>,
114 ) -> Result<CloudConvertSocket> {
115 let subscriptions = channels
116 .into_iter()
117 .map(|channel| self.socket_subscription(channel));
118 CloudConvertSocket::connect(self.socket_base_url(), subscriptions).await
119 }
120
121 #[cfg(feature = "socket")]
122 pub async fn socket_with_buffer(
123 &self,
124 channels: impl IntoIterator<Item = SocketChannel>,
125 buffer: usize,
126 ) -> Result<CloudConvertSocket> {
127 let subscriptions = channels
128 .into_iter()
129 .map(|channel| self.socket_subscription(channel));
130 CloudConvertSocket::connect_with_buffer(self.socket_base_url(), subscriptions, buffer).await
131 }
132
133 pub async fn download(&self, url: impl AsRef<str>) -> Result<Bytes> {
134 let response = self.http.get(url.as_ref()).send().await?;
135 Self::ensure_success(response)
136 .await?
137 .bytes()
138 .await
139 .map_err(Error::Http)
140 }
141
142 pub async fn download_stream(
143 &self,
144 url: impl AsRef<str>,
145 ) -> Result<impl Stream<Item = Result<Bytes>>> {
146 let response = self.http.get(url.as_ref()).send().await?;
147 Ok(Self::ensure_success(response)
148 .await?
149 .bytes_stream()
150 .map(|chunk| chunk.map_err(Error::Http)))
151 }
152
153 pub async fn download_to_path(
154 &self,
155 url: impl AsRef<str>,
156 destination: impl AsRef<Path>,
157 ) -> Result<()> {
158 let destination = destination.as_ref();
159 let temp_path = temporary_download_path(destination);
160 self.download_to_temporary_path(url.as_ref(), destination, temp_path)
161 .await
162 }
163
164 pub async fn upload_bytes(
165 &self,
166 task: &Task,
167 filename: impl Into<String>,
168 bytes: impl Into<Bytes>,
169 ) -> Result<()> {
170 self.upload_part(
171 task,
172 "file",
173 multipart::Part::stream(Body::from(bytes.into())).file_name(filename.into()),
174 )
175 .await
176 }
177
178 pub async fn upload_body(
179 &self,
180 task: &Task,
181 filename: impl Into<String>,
182 body: Body,
183 ) -> Result<()> {
184 self.upload_part(
185 task,
186 "file",
187 multipart::Part::stream(body).file_name(filename.into()),
188 )
189 .await
190 }
191
192 pub async fn upload_stream<S, B, E>(
193 &self,
194 task: &Task,
195 filename: impl Into<String>,
196 stream: S,
197 ) -> Result<()>
198 where
199 S: TryStream<Ok = B, Error = E> + Send + 'static,
200 Bytes: From<B>,
201 E: Into<Box<dyn StdError + Send + Sync>>,
202 {
203 self.upload_body(task, filename, Body::wrap_stream(stream))
204 .await
205 }
206
207 pub async fn upload_path(&self, task: &Task, path: impl AsRef<Path>) -> Result<()> {
208 self.upload_part(task, "file", multipart::Part::file(path).await?)
209 .await
210 }
211
212 async fn download_to_temporary_path(
213 &self,
214 url: &str,
215 destination: &Path,
216 temp_path: PathBuf,
217 ) -> Result<()> {
218 let mut temp_file = TemporaryDownload::new(temp_path);
219 let mut stream = self.download_stream(url).await?;
220 let mut file = tokio::fs::File::create(temp_file.path()).await?;
221
222 while let Some(chunk) = stream.next().await {
223 file.write_all(&chunk?).await?;
224 }
225
226 file.flush().await?;
227 drop(file);
228 tokio::fs::rename(temp_file.path(), destination).await?;
229 temp_file.commit();
230 Ok(())
231 }
232
233 async fn upload_part(
234 &self,
235 task: &Task,
236 field_name: &str,
237 part: multipart::Part,
238 ) -> Result<()> {
239 let form = Self::upload_form(task)?;
240 let mut multipart = multipart::Form::new();
241 for (key, value) in &form.parameters {
242 multipart = multipart.text(key.clone(), form_value(value));
243 }
244 multipart = multipart.part(field_name.to_string(), part);
245
246 let response = self
247 .http
248 .post(&form.url)
249 .multipart(multipart)
250 .send()
251 .await?;
252 Self::ensure_success(response).await?;
253 Ok(())
254 }
255
256 fn upload_form(task: &Task) -> Result<&UploadForm> {
257 task.upload_form().ok_or(Error::UploadTaskNotReady)
258 }
259
260 async fn get_response<T>(&self, base: &Url, path: &str) -> Result<ApiResponse<T>>
261 where
262 T: DeserializeOwned,
263 {
264 let url = api_url(base, path)?;
265 let response = self
266 .send_api(|| {
267 self.http
268 .request(Method::GET, url.clone())
269 .bearer_auth(self.config.credential.expose())
270 })
271 .await?;
272 Self::decode_response(response).await
273 }
274
275 async fn get_with_query_response<T, Q>(
276 &self,
277 base: &Url,
278 path: &str,
279 query: &Q,
280 ) -> Result<ApiResponse<T>>
281 where
282 T: DeserializeOwned,
283 Q: Serialize + ?Sized,
284 {
285 let url = api_url(base, path)?;
286 let response = self
287 .send_api(|| {
288 self.http
289 .request(Method::GET, url.clone())
290 .bearer_auth(self.config.credential.expose())
291 .query(query)
292 })
293 .await?;
294 Self::decode_response(response).await
295 }
296
297 async fn post_response<T, B>(&self, base: &Url, path: &str, body: &B) -> Result<ApiResponse<T>>
298 where
299 T: DeserializeOwned,
300 B: Serialize + ?Sized,
301 {
302 let url = api_url(base, path)?;
303 let response = self
304 .send_api(|| {
305 self.http
306 .request(Method::POST, url.clone())
307 .bearer_auth(self.config.credential.expose())
308 .json(body)
309 })
310 .await?;
311 Self::decode_response(response).await
312 }
313
314 async fn delete(&self, base: &Url, path: &str) -> Result<()> {
315 let url = api_url(base, path)?;
316 let response = self
317 .send_api(|| {
318 self.http
319 .request(Method::DELETE, url.clone())
320 .bearer_auth(self.config.credential.expose())
321 })
322 .await?;
323 Self::ensure_success(response).await?;
324 Ok(())
325 }
326
327 async fn get_redirect_location<Q>(&self, base: &Url, path: &str, query: &Q) -> Result<Url>
328 where
329 Q: Serialize + ?Sized,
330 {
331 let url = api_url(base, path)?;
332 let response = self
333 .send_api(|| {
334 self.redirectless_http
335 .request(Method::GET, url.clone())
336 .bearer_auth(self.config.credential.expose())
337 .query(query)
338 })
339 .await?;
340 Self::decode_redirect_location(response).await
341 }
342
343 async fn post_redirect_location<B>(&self, base: &Url, path: &str, body: &B) -> Result<Url>
344 where
345 B: Serialize + ?Sized,
346 {
347 let url = api_url(base, path)?;
348 let response = self
349 .send_api(|| {
350 self.redirectless_http
351 .request(Method::POST, url.clone())
352 .bearer_auth(self.config.credential.expose())
353 .json(body)
354 })
355 .await?;
356 Self::decode_redirect_location(response).await
357 }
358
359 async fn send_api<F>(&self, build: F) -> Result<Response>
360 where
361 F: Fn() -> reqwest::RequestBuilder,
362 {
363 #[cfg(feature = "retry")]
364 if let Some(policy) = &self.config.retry_policy {
365 return self.send_api_with_retry(build, policy).await;
366 }
367
368 Ok(build().send().await?)
369 }
370
371 #[cfg(feature = "retry")]
372 async fn send_api_with_retry<F>(
373 &self,
374 build: F,
375 policy: &crate::RetryPolicy,
376 ) -> Result<Response>
377 where
378 F: Fn() -> reqwest::RequestBuilder,
379 {
380 let attempts = policy.max_attempts_value().max(1);
381 let mut delay = policy.initial_delay_value();
382
383 for attempt in 1..=attempts {
384 match build().send().await {
385 Ok(response) => {
386 if attempt == attempts || !Self::is_retryable_status(response.status().as_u16())
387 {
388 return Ok(response);
389 }
390
391 let retry_after = policy.respect_retry_after_value().then(|| {
392 response
393 .headers()
394 .get("retry-after")
395 .and_then(|value| value.to_str().ok())
396 .and_then(|value| value.parse::<u64>().ok())
397 .map(Duration::from_secs)
398 });
399 let sleep_for = retry_after
400 .flatten()
401 .map(|duration| duration.min(policy.max_delay_value()))
402 .unwrap_or(delay);
403 tokio::time::sleep(sleep_for).await;
404 }
405 Err(error) => {
406 if attempt == attempts || !Self::is_retryable_error(&error) {
407 return Err(Error::Http(error));
408 }
409
410 tokio::time::sleep(delay).await;
411 }
412 }
413
414 delay = next_retry_delay(delay, policy);
415 }
416
417 unreachable!("retry loop always returns on the final attempt")
418 }
419
420 #[cfg(feature = "retry")]
421 fn is_retryable_status(status: u16) -> bool {
422 matches!(status, 429 | 500 | 502 | 503 | 504)
423 }
424
425 #[cfg(feature = "retry")]
426 fn is_retryable_error(error: &reqwest::Error) -> bool {
427 error.is_connect() || error.is_timeout()
428 }
429
430 async fn decode_response<T>(response: Response) -> Result<ApiResponse<T>>
431 where
432 T: DeserializeOwned,
433 {
434 let rate_limit = Self::rate_limit_from_headers(response.headers());
435 let response = Self::ensure_success(response).await?;
436 let envelope = response.json::<DataEnvelope<T>>().await?;
437 Ok(ApiResponse {
438 data: envelope.data,
439 links: envelope.links,
440 meta: envelope.meta,
441 rate_limit,
442 })
443 }
444
445 fn into_page<T>(response: ApiResponse<Vec<T>>) -> Page<T> {
446 Page {
447 data: response.data,
448 links: response.links,
449 meta: response.meta,
450 rate_limit: response.rate_limit,
451 }
452 }
453
454 async fn decode_redirect_location(response: Response) -> Result<Url> {
455 let response_url = response.url().clone();
456 if response.status().is_redirection() {
457 let location = response
458 .headers()
459 .get(LOCATION)
460 .ok_or(Error::MissingRedirectLocation)?;
461 let location = location
462 .to_str()
463 .map_err(|_| Error::MissingRedirectLocation)?;
464 return response_url.join(location).map_err(Error::Url);
465 }
466
467 let _ = Self::ensure_success(response).await?;
468 Err(Error::MissingRedirectLocation)
469 }
470
471 async fn ensure_success(response: Response) -> Result<Response> {
472 if response.status().is_success() {
473 return Ok(response);
474 }
475
476 let status = response.status().as_u16();
477 let rate_limit = Self::rate_limit_from_headers(response.headers());
478 let body = response.text().await.unwrap_or_default();
479 let parsed = serde_json::from_str::<ApiErrorBody>(&body).ok();
480 let message = parsed
481 .as_ref()
482 .and_then(|body| body.message.clone())
483 .filter(|message| !message.is_empty())
484 .unwrap_or_else(|| "request failed".to_string());
485 let code = parsed.as_ref().and_then(|body| body.code.clone());
486 let errors = parsed.and_then(|body| body.errors);
487
488 Err(Error::Api {
489 status,
490 message,
491 code,
492 errors: errors.map(Box::new),
493 rate_limit: rate_limit.map(Box::new),
494 })
495 }
496
497 fn rate_limit_from_headers(headers: &HeaderMap) -> Option<RateLimit> {
498 let rate_limit = RateLimit {
499 limit: header_u64(headers, "x-ratelimit-limit"),
500 remaining: header_u64(headers, "x-ratelimit-remaining"),
501 reset: header_u64(headers, "x-ratelimit-reset"),
502 retry_after: header_u64(headers, "retry-after"),
503 };
504
505 if rate_limit.limit.is_some()
506 || rate_limit.remaining.is_some()
507 || rate_limit.reset.is_some()
508 || rate_limit.retry_after.is_some()
509 {
510 Some(rate_limit)
511 } else {
512 None
513 }
514 }
515}
516
517fn header_u64(headers: &HeaderMap, name: &'static str) -> Option<u64> {
518 headers.get(name)?.to_str().ok()?.parse().ok()
519}
520
521fn api_url(base: &Url, path: &str) -> Result<Url> {
522 validate_api_path(path)?;
523 base.join(path).map_err(Error::Url)
524}
525
526fn validate_api_path(path: &str) -> Result<()> {
527 let valid = !path.is_empty()
528 && path.split('/').all(|segment| {
529 !segment.is_empty()
530 && segment
531 .bytes()
532 .all(|byte| byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_'))
533 });
534
535 if valid {
536 Ok(())
537 } else {
538 Err(Error::InvalidApiPath)
539 }
540}
541
542fn resource_id(id: &str) -> Result<&str> {
543 let valid = !id.is_empty()
544 && id
545 .bytes()
546 .all(|byte| byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_'));
547
548 if valid {
549 Ok(id)
550 } else {
551 Err(Error::InvalidResourceId)
552 }
553}
554
/// Computes the next back-off delay: `current` scaled by the policy's back-off
/// factor and capped at the policy's maximum delay.
///
/// The scaling uses the fallible `Duration::try_from_secs_f64`, so a product
/// that overflows `Duration`, or a negative or non-finite factor, yields the
/// maximum delay instead of panicking as `Duration::mul_f64` would.
#[cfg(feature = "retry")]
fn next_retry_delay(current: Duration, policy: &crate::RetryPolicy) -> Duration {
    let ceiling = policy.max_delay_value();
    let scaled_secs = current.as_secs_f64() * policy.backoff_factor_value();

    Duration::try_from_secs_f64(scaled_secs).map_or(ceiling, |next| next.min(ceiling))
}
560
561#[derive(Clone, Debug)]
562pub struct JobsResource {
563 client: CloudConvertClient,
564}
565
566impl JobsResource {
567 pub async fn list(&self, query: &JobListQuery) -> Result<Vec<Job>> {
568 Ok(self.list_page(query).await?.data)
569 }
570
571 pub async fn list_page(&self, query: &JobListQuery) -> Result<Page<Job>> {
572 let response = self
573 .client
574 .get_with_query_response(&self.client.config.api_base_url, "jobs", query)
575 .await?;
576 Ok(CloudConvertClient::into_page(response))
577 }
578
579 pub async fn create(&self, request: impl Into<JobCreateRequest>) -> Result<Job> {
580 Ok(self.create_response(request).await?.data)
581 }
582
583 pub async fn create_response(
584 &self,
585 request: impl Into<JobCreateRequest>,
586 ) -> Result<ApiResponse<Job>> {
587 self.client
588 .post_response(&self.client.config.api_base_url, "jobs", &request.into())
589 .await
590 }
591
592 pub async fn create_and_wait(&self, request: impl Into<JobCreateRequest>) -> Result<Job> {
593 Ok(self.create_and_wait_response(request).await?.data)
594 }
595
596 #[cfg(feature = "socket")]
597 pub async fn create_and_wait_socket(
598 &self,
599 request: impl Into<JobCreateRequest>,
600 ) -> Result<Job> {
601 let job = self.create(request).await?;
602 if job.is_terminal() {
603 return Ok(job);
604 }
605
606 self.wait_socket(&job.id).await
607 }
608
609 pub async fn create_and_wait_response(
610 &self,
611 request: impl Into<JobCreateRequest>,
612 ) -> Result<ApiResponse<Job>> {
613 self.client
614 .post_response(&self.client.config.sync_base_url, "jobs", &request.into())
615 .await
616 }
617
618 pub async fn create_and_wait_redirect_url(
619 &self,
620 request: impl Into<JobCreateRequest>,
621 ) -> Result<Url> {
622 let mut request = request.into();
623 request.redirect = Some(true);
624 self.client
625 .post_redirect_location(&self.client.config.sync_base_url, "jobs", &request)
626 .await
627 }
628
629 pub async fn get(&self, id: impl AsRef<str>) -> Result<Job> {
630 Ok(self.get_response(id).await?.data)
631 }
632
633 pub async fn get_response(&self, id: impl AsRef<str>) -> Result<ApiResponse<Job>> {
634 let id = resource_id(id.as_ref())?;
635 self.client
636 .get_response(&self.client.config.api_base_url, &format!("jobs/{id}"))
637 .await
638 }
639
640 pub async fn get_with_query(&self, id: impl AsRef<str>, query: &JobGetQuery) -> Result<Job> {
641 Ok(self.get_with_query_response(id, query).await?.data)
642 }
643
644 pub async fn get_with_query_response(
645 &self,
646 id: impl AsRef<str>,
647 query: &JobGetQuery,
648 ) -> Result<ApiResponse<Job>> {
649 let id = resource_id(id.as_ref())?;
650 self.client
651 .get_with_query_response(
652 &self.client.config.api_base_url,
653 &format!("jobs/{id}"),
654 query,
655 )
656 .await
657 }
658
659 pub async fn get_redirect_url(&self, id: impl AsRef<str>) -> Result<Url> {
660 let id = resource_id(id.as_ref())?;
661 let query = JobGetQuery::default().redirect(true);
662 self.client
663 .get_redirect_location(
664 &self.client.config.api_base_url,
665 &format!("jobs/{id}"),
666 &query,
667 )
668 .await
669 }
670
671 pub async fn wait(&self, id: impl AsRef<str>) -> Result<Job> {
672 Ok(self.wait_response(id).await?.data)
673 }
674
675 #[cfg(feature = "socket")]
676 pub async fn wait_socket(&self, id: impl AsRef<str>) -> Result<Job> {
677 let id = resource_id(id.as_ref())?.to_string();
678 let mut socket = self.client.socket([SocketChannel::job(id.clone())]).await?;
679 let current = self.get(&id).await?;
680 if current.is_terminal() {
681 let _ = socket.disconnect().await;
682 return Ok(current);
683 }
684
685 loop {
686 let event = socket
687 .next_event()
688 .await
689 .ok_or_else(|| Error::Socket(format!("socket closed before job {id} completed")))?;
690
691 if !event.is_job_event() || !event.is_terminal() {
692 continue;
693 }
694
695 let job = match event.job()? {
696 Some(job) if job.id == id => job,
697 Some(_) => continue,
698 None => self.get(&id).await?,
699 };
700
701 let _ = socket.disconnect().await;
702 return Ok(job);
703 }
704 }
705
706 #[cfg(feature = "socket")]
707 pub async fn task_events_socket(&self, id: impl AsRef<str>) -> Result<CloudConvertSocket> {
708 self.task_events_socket_with_buffer(id, 64).await
709 }
710
711 #[cfg(feature = "socket")]
712 pub async fn task_events_socket_with_buffer(
713 &self,
714 id: impl AsRef<str>,
715 buffer: usize,
716 ) -> Result<CloudConvertSocket> {
717 let id = resource_id(id.as_ref())?.to_string();
718 self.client
719 .socket_with_buffer([SocketChannel::job_tasks(id)], buffer)
720 .await
721 }
722
723 pub async fn wait_response(&self, id: impl AsRef<str>) -> Result<ApiResponse<Job>> {
724 let id = resource_id(id.as_ref())?;
725 self.client
726 .get_response(&self.client.config.sync_base_url, &format!("jobs/{id}"))
727 .await
728 }
729
730 pub async fn wait_with_query(&self, id: impl AsRef<str>, query: &JobGetQuery) -> Result<Job> {
731 Ok(self.wait_with_query_response(id, query).await?.data)
732 }
733
734 pub async fn wait_with_query_response(
735 &self,
736 id: impl AsRef<str>,
737 query: &JobGetQuery,
738 ) -> Result<ApiResponse<Job>> {
739 let id = resource_id(id.as_ref())?;
740 self.client
741 .get_with_query_response(
742 &self.client.config.sync_base_url,
743 &format!("jobs/{id}"),
744 query,
745 )
746 .await
747 }
748
749 pub async fn wait_redirect_url(&self, id: impl AsRef<str>) -> Result<Url> {
750 let id = resource_id(id.as_ref())?;
751 let query = JobGetQuery::default().redirect(true);
752 self.client
753 .get_redirect_location(
754 &self.client.config.sync_base_url,
755 &format!("jobs/{id}"),
756 &query,
757 )
758 .await
759 }
760
761 pub async fn delete(&self, id: impl AsRef<str>) -> Result<()> {
762 let id = resource_id(id.as_ref())?;
763 self.client
764 .delete(&self.client.config.api_base_url, &format!("jobs/{id}"))
765 .await
766 }
767}
768
769#[derive(Clone, Debug)]
770pub struct TasksResource {
771 client: CloudConvertClient,
772}
773
774impl TasksResource {
775 pub async fn list(&self, query: &TaskListQuery) -> Result<Vec<Task>> {
776 Ok(self.list_page(query).await?.data)
777 }
778
779 pub async fn list_page(&self, query: &TaskListQuery) -> Result<Page<Task>> {
780 let response = self
781 .client
782 .get_with_query_response(&self.client.config.api_base_url, "tasks", query)
783 .await?;
784 Ok(CloudConvertClient::into_page(response))
785 }
786
787 pub async fn create(&self, request: impl Into<TaskRequest>) -> Result<Task> {
788 Ok(self.create_response(request).await?.data)
789 }
790
791 pub async fn create_response(
792 &self,
793 request: impl Into<TaskRequest>,
794 ) -> Result<ApiResponse<Task>> {
795 let request = request.into();
796 let path = request.operation().to_string();
797 self.client
798 .post_response(&self.client.config.api_base_url, &path, &request)
799 .await
800 }
801
802 pub async fn get(&self, id: impl AsRef<str>) -> Result<Task> {
803 Ok(self.get_response(id).await?.data)
804 }
805
806 pub async fn get_response(&self, id: impl AsRef<str>) -> Result<ApiResponse<Task>> {
807 let id = resource_id(id.as_ref())?;
808 self.client
809 .get_response(&self.client.config.api_base_url, &format!("tasks/{id}"))
810 .await
811 }
812
813 pub async fn get_with_query(&self, id: impl AsRef<str>, query: &TaskGetQuery) -> Result<Task> {
814 Ok(self.get_with_query_response(id, query).await?.data)
815 }
816
817 pub async fn get_with_query_response(
818 &self,
819 id: impl AsRef<str>,
820 query: &TaskGetQuery,
821 ) -> Result<ApiResponse<Task>> {
822 let id = resource_id(id.as_ref())?;
823 self.client
824 .get_with_query_response(
825 &self.client.config.api_base_url,
826 &format!("tasks/{id}"),
827 query,
828 )
829 .await
830 }
831
832 pub async fn wait(&self, id: impl AsRef<str>) -> Result<Task> {
833 Ok(self.wait_response(id).await?.data)
834 }
835
836 #[cfg(feature = "socket")]
837 pub async fn wait_socket(&self, id: impl AsRef<str>) -> Result<Task> {
838 let id = resource_id(id.as_ref())?.to_string();
839 let mut socket = self
840 .client
841 .socket([SocketChannel::task(id.clone())])
842 .await?;
843 let current = self.get(&id).await?;
844 if current.is_terminal() {
845 let _ = socket.disconnect().await;
846 return Ok(current);
847 }
848
849 loop {
850 let event = socket.next_event().await.ok_or_else(|| {
851 Error::Socket(format!("socket closed before task {id} completed"))
852 })?;
853
854 if !event.is_task_event() || !event.is_terminal() {
855 continue;
856 }
857
858 let task = match event.task()? {
859 Some(task) if task.id == id => task,
860 Some(_) => continue,
861 None => self.get(&id).await?,
862 };
863
864 let _ = socket.disconnect().await;
865 return Ok(task);
866 }
867 }
868
869 pub async fn wait_response(&self, id: impl AsRef<str>) -> Result<ApiResponse<Task>> {
870 let id = resource_id(id.as_ref())?;
871 self.client
872 .get_response(&self.client.config.sync_base_url, &format!("tasks/{id}"))
873 .await
874 }
875
876 pub async fn wait_with_query(&self, id: impl AsRef<str>, query: &TaskGetQuery) -> Result<Task> {
877 Ok(self.wait_with_query_response(id, query).await?.data)
878 }
879
880 pub async fn wait_with_query_response(
881 &self,
882 id: impl AsRef<str>,
883 query: &TaskGetQuery,
884 ) -> Result<ApiResponse<Task>> {
885 let id = resource_id(id.as_ref())?;
886 self.client
887 .get_with_query_response(
888 &self.client.config.sync_base_url,
889 &format!("tasks/{id}"),
890 query,
891 )
892 .await
893 }
894
895 pub async fn cancel(&self, id: impl AsRef<str>) -> Result<Task> {
896 Ok(self.cancel_response(id).await?.data)
897 }
898
899 pub async fn cancel_response(&self, id: impl AsRef<str>) -> Result<ApiResponse<Task>> {
900 let id = resource_id(id.as_ref())?;
901 let empty = serde_json::json!({});
902 self.client
903 .post_response(
904 &self.client.config.api_base_url,
905 &format!("tasks/{id}/cancel"),
906 &empty,
907 )
908 .await
909 }
910
911 pub async fn retry(&self, id: impl AsRef<str>) -> Result<Task> {
912 Ok(self.retry_response(id).await?.data)
913 }
914
915 pub async fn retry_response(&self, id: impl AsRef<str>) -> Result<ApiResponse<Task>> {
916 let id = resource_id(id.as_ref())?;
917 let empty = serde_json::json!({});
918 self.client
919 .post_response(
920 &self.client.config.api_base_url,
921 &format!("tasks/{id}/retry"),
922 &empty,
923 )
924 .await
925 }
926
927 pub async fn delete(&self, id: impl AsRef<str>) -> Result<()> {
928 let id = resource_id(id.as_ref())?;
929 self.client
930 .delete(&self.client.config.api_base_url, &format!("tasks/{id}"))
931 .await
932 }
933}
934
935#[derive(Clone, Debug)]
936pub struct UsersResource {
937 client: CloudConvertClient,
938}
939
940impl UsersResource {
941 pub async fn me(&self) -> Result<User> {
942 Ok(self.me_response().await?.data)
943 }
944
945 pub async fn me_response(&self) -> Result<ApiResponse<User>> {
946 self.client
947 .get_response(&self.client.config.api_base_url, "users/me")
948 .await
949 }
950
951 #[cfg(feature = "socket")]
952 pub async fn job_events_socket(&self) -> Result<CloudConvertSocket> {
953 self.job_events_socket_with_buffer(64).await
954 }
955
956 #[cfg(feature = "socket")]
957 pub async fn job_events_socket_with_buffer(&self, buffer: usize) -> Result<CloudConvertSocket> {
958 let user = self.me().await?;
959 self.client
960 .socket_with_buffer([SocketChannel::user_jobs(user.id)], buffer)
961 .await
962 }
963
964 #[cfg(feature = "socket")]
965 pub async fn task_events_socket(&self) -> Result<CloudConvertSocket> {
966 self.task_events_socket_with_buffer(64).await
967 }
968
969 #[cfg(feature = "socket")]
970 pub async fn task_events_socket_with_buffer(
971 &self,
972 buffer: usize,
973 ) -> Result<CloudConvertSocket> {
974 let user = self.me().await?;
975 self.client
976 .socket_with_buffer([SocketChannel::user_tasks(user.id)], buffer)
977 .await
978 }
979
980 #[cfg(feature = "socket")]
981 pub async fn events_socket(&self) -> Result<CloudConvertSocket> {
982 self.events_socket_with_buffer(64).await
983 }
984
985 #[cfg(feature = "socket")]
986 pub async fn events_socket_with_buffer(&self, buffer: usize) -> Result<CloudConvertSocket> {
987 let user = self.me().await?;
988 self.client
989 .socket_with_buffer(
990 [
991 SocketChannel::user_jobs(user.id.clone()),
992 SocketChannel::user_tasks(user.id),
993 ],
994 buffer,
995 )
996 .await
997 }
998}
999
1000#[derive(Clone, Debug)]
1001pub struct WebhooksResource {
1002 client: CloudConvertClient,
1003}
1004
1005impl WebhooksResource {
1006 pub async fn create(&self, request: &WebhookCreateRequest) -> Result<Webhook> {
1007 Ok(self.create_response(request).await?.data)
1008 }
1009
1010 pub async fn create_response(
1011 &self,
1012 request: &WebhookCreateRequest,
1013 ) -> Result<ApiResponse<Webhook>> {
1014 self.client
1015 .post_response(&self.client.config.api_base_url, "webhooks", request)
1016 .await
1017 }
1018
1019 pub async fn list(&self, query: &WebhookListQuery) -> Result<Vec<Webhook>> {
1020 Ok(self.list_page(query).await?.data)
1021 }
1022
1023 pub async fn list_page(&self, query: &WebhookListQuery) -> Result<Page<Webhook>> {
1024 let response = self
1025 .client
1026 .get_with_query_response(&self.client.config.api_base_url, "users/me/webhooks", query)
1027 .await?;
1028 Ok(CloudConvertClient::into_page(response))
1029 }
1030
1031 pub async fn delete(&self, id: impl AsRef<str>) -> Result<()> {
1032 let id = resource_id(id.as_ref())?;
1033 self.client
1034 .delete(&self.client.config.api_base_url, &format!("webhooks/{id}"))
1035 .await
1036 }
1037}
1038
1039#[derive(Clone, Debug)]
1040pub struct OperationsResource {
1041 client: CloudConvertClient,
1042}
1043
1044impl OperationsResource {
1045 pub async fn list(&self, query: &OperationListQuery) -> Result<Vec<Operation>> {
1046 Ok(self.list_page(query).await?.data)
1047 }
1048
1049 pub async fn list_page(&self, query: &OperationListQuery) -> Result<Page<Operation>> {
1050 let response = self
1051 .client
1052 .get_with_query_response(&self.client.config.api_base_url, "operations", query)
1053 .await?;
1054 Ok(CloudConvertClient::into_page(response))
1055 }
1056}
1057
1058fn form_value(value: &Value) -> String {
1059 match value {
1060 Value::String(value) => value.clone(),
1061 Value::Number(value) => value.to_string(),
1062 Value::Bool(value) => value.to_string(),
1063 Value::Null => String::new(),
1064 other => other.to_string(),
1065 }
1066}
1067
1068fn temporary_download_path(destination: &Path) -> PathBuf {
1069 let mut name = OsString::from(".");
1070 name.push(
1071 destination
1072 .file_name()
1073 .unwrap_or_else(|| OsStr::new("download")),
1074 );
1075 name.push(format!(".cloudconvert-{}.tmp", unique_suffix()));
1076
1077 match destination.parent() {
1078 Some(parent) => parent.join(name),
1079 None => PathBuf::from(name),
1080 }
1081}
1082
/// Guard for a partially written download.
///
/// Unless [`TemporaryDownload::commit`] has been called, dropping the guard
/// removes the file at `path`.
struct TemporaryDownload {
    /// Location of the temporary file.
    path: PathBuf,
    /// Set once the file has been handed over and must be kept.
    committed: bool,
}

impl TemporaryDownload {
    /// Starts guarding `path`; the guard begins uncommitted.
    fn new(path: PathBuf) -> Self {
        let committed = false;
        Self { path, committed }
    }

    /// Returns the guarded path.
    fn path(&self) -> &Path {
        self.path.as_path()
    }

    /// Marks the download as complete so that drop leaves the file alone.
    fn commit(&mut self) {
        self.committed = true;
    }
}

impl Drop for TemporaryDownload {
    fn drop(&mut self) {
        if self.committed {
            return;
        }

        // Best effort: the file may never have been created, and drop cannot
        // report a failure anyway.
        let _ = std::fs::remove_file(&self.path);
    }
}
1112
/// Produces a suffix for temporary file names that is unique within this
/// process and very unlikely to clash across processes.
///
/// The suffix has the form `<process id>-<nanoseconds since epoch>-<sequence>`.
/// The timestamp alone cannot guarantee uniqueness: clock resolution may be
/// coarser than the gap between two calls, and it falls back to `0` when the
/// system clock is before the Unix epoch. The process-wide sequence number
/// therefore makes every call within one process distinct.
fn unique_suffix() -> String {
    static SEQUENCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_nanos())
        .unwrap_or_default();
    // Only distinctness matters here, so relaxed ordering is sufficient.
    let sequence = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    format!("{}-{nanos}-{sequence}", std::process::id())
}