1#![deny(clippy::all)]
2mod error;
3pub mod resources;
4pub mod retry;
5
6use chrono::{DateTime, Utc};
7use http::{header::ACCEPT, Method};
8use log::debug;
9use once_cell::sync::Lazy;
10use reqwest::{
11 blocking::{
12 multipart::{Form, Part},
13 Client as HttpClient, Response as HttpResponse,
14 },
15 header::{self, HeaderMap, HeaderValue},
16 IntoUrl, Proxy, Result as ReqwestResult,
17};
18use resources::{
19 attachments::UploadAttachmentResponse,
20 auth::{RefreshUserPermissionsRequest, RefreshUserPermissionsResponse},
21 bucket::{
22 GetKeyedSyncStateIdsRequest, GetKeyedSyncStateIdsResponse, GetKeyedSyncStatesResponse,
23 KeyedSyncState, KeyedSyncStateId,
24 },
25 bucket_statistics::GetBucketStatisticsResponse,
26 comment::{AttachmentReference, CommentTimestampFilter},
27 dataset::{
28 CreateIxpDatasetRequest, CreateIxpDatasetResponse, GetAllModelsInDatasetRequest,
29 GetAllModelsInDatasetRespone, IxpDatasetNew, QueryRequestParams, QueryResponse,
30 StatisticsRequestParams as DatasetStatisticsRequestParams, SummaryRequestParams,
31 SummaryResponse, UploadIxpDocumentResponse, UserModelMetadata,
32 },
33 documents::{Document, SyncRawEmailsRequest, SyncRawEmailsResponse},
34 email::{Email, GetEmailResponse},
35 integration::{
36 GetIntegrationResponse, GetIntegrationsResponse, Integration, NewIntegration,
37 PostIntegrationRequest, PostIntegrationResponse, PutIntegrationRequest,
38 PutIntegrationResponse,
39 },
40 label_def::{CreateOrUpdateLabelDefsBulkRequest, CreateOrUpdateLabelDefsBulkResponse},
41 project::ForceDeleteProject,
42 quota::{GetQuotasResponse, Quota},
43 source::StatisticsRequestParams as SourceStatisticsRequestParams,
44 stream::{GetStreamResponse, NewStream, PutStreamRequest, PutStreamResponse},
45 tenant_id::UiPathTenantId,
46 validation::{
47 LabelValidation, LabelValidationRequest, LabelValidationResponse, ValidationResponse,
48 },
49};
50use serde::{Deserialize, Serialize};
51use serde_json::json;
52use std::{
53 cell::Cell,
54 fmt::{Debug, Display},
55 io::Read,
56 path::{Path, PathBuf},
57 time::Duration,
58};
59use url::Url;
60
61use crate::resources::{
62 audit::{AuditQueryFilter, AuditQueryRequest, AuditQueryResponse},
63 bucket::{
64 CreateRequest as CreateBucketRequest, CreateResponse as CreateBucketResponse,
65 GetAvailableResponse as GetAvailableBucketsResponse, GetResponse as GetBucketResponse,
66 },
67 bucket_statistics::Statistics as BucketStatistics,
68 comment::{
69 GetAnnotationsResponse, GetCommentResponse, GetLabellingsAfter, GetPredictionsResponse,
70 GetRecentRequest, PutCommentsRequest, PutCommentsResponse, RecentCommentsPage,
71 SyncCommentsRequest, UpdateAnnotationsRequest,
72 },
73 dataset::{
74 CreateRequest as CreateDatasetRequest, CreateResponse as CreateDatasetResponse,
75 GetAvailableResponse as GetAvailableDatasetsResponse, GetResponse as GetDatasetResponse,
76 UpdateRequest as UpdateDatasetRequest, UpdateResponse as UpdateDatasetResponse,
77 },
78 email::{PutEmailsRequest, PutEmailsResponse},
79 project::{
80 CreateProjectRequest, CreateProjectResponse, GetProjectResponse, GetProjectsResponse,
81 UpdateProjectRequest, UpdateProjectResponse,
82 },
83 quota::{CreateQuota, TenantQuotaKind},
84 source::{
85 CreateRequest as CreateSourceRequest, CreateResponse as CreateSourceResponse,
86 GetAvailableResponse as GetAvailableSourcesResponse, GetResponse as GetSourceResponse,
87 UpdateRequest as UpdateSourceRequest, UpdateResponse as UpdateSourceResponse,
88 },
89 statistics::GetResponse as GetStatisticsResponse,
90 stream::{
91 AdvanceRequest as StreamAdvanceRequest, FetchRequest as StreamFetchRequest,
92 GetStreamsResponse, ResetRequest as StreamResetRequest,
93 TagExceptionsRequest as TagStreamExceptionsRequest,
94 },
95 tenant_id::TenantId,
96 user::{
97 CreateRequest as CreateUserRequest, CreateResponse as CreateUserResponse,
98 GetAvailableResponse as GetAvailableUsersResponse,
99 GetCurrentResponse as GetCurrentUserResponse, GetResponse as GetUserResponse,
100 PostUserRequest, PostUserResponse, WelcomeEmailResponse,
101 },
102 EmptySuccess, Response,
103};
104
105use crate::retry::{Retrier, RetryConfig};
106
107pub use crate::{
108 error::{Error, Result},
109 resources::{
110 bucket::{
111 Bucket, BucketType, FullName as BucketFullName, Id as BucketId,
112 Identifier as BucketIdentifier, Name as BucketName, NewBucket,
113 },
114 comment::{
115 AnnotatedComment, Comment, CommentFilter, CommentPredictionsThreshold,
116 CommentsIterPage, Continuation, EitherLabelling, Entities, Entity,
117 GetCommentPredictionsRequest, HasAnnotations, Id as CommentId, Label, Labelling,
118 Message, MessageBody, MessageSignature, MessageSubject, NewAnnotatedComment,
119 NewComment, NewEntities, NewLabelling, NewMoonForm, PredictedLabel, Prediction,
120 PropertyMap, PropertyValue, Sentiment, SyncCommentsResponse, TriggerLabelThreshold,
121 Uid as CommentUid,
122 },
123 dataset::{
124 Dataset, FullName as DatasetFullName, Id as DatasetId, Identifier as DatasetIdentifier,
125 ModelVersion, Name as DatasetName, NewDataset, UpdateDataset,
126 },
127 email::{
128 Continuation as EmailContinuation, EmailsIterPage, Id as EmailId, Mailbox, MimeContent,
129 NewEmail,
130 },
131 entity_def::{EntityDef, Id as EntityDefId, Name as EntityName, NewEntityDef},
132 integration::FullName as IntegrationFullName,
133 label_def::{
134 LabelDef, LabelDefPretrained, MoonFormFieldDef, Name as LabelName, NewLabelDef,
135 NewLabelDefPretrained, PretrainedId as LabelDefPretrainedId,
136 },
137 label_group::{
138 LabelGroup, Name as LabelGroupName, NewLabelGroup, DEFAULT_LABEL_GROUP_NAME,
139 },
140 project::{NewProject, Project, ProjectName, UpdateProject},
141 source::{
142 FullName as SourceFullName, Id as SourceId, Identifier as SourceIdentifier,
143 Name as SourceName, NewSource, Source, SourceKind, TransformTag, UpdateSource,
144 },
145 statistics::Statistics as CommentStatistics,
146 stream::{
147 Batch as StreamBatch, FullName as StreamFullName, SequenceId as StreamSequenceId,
148 Stream, StreamException, StreamExceptionMetadata,
149 },
150 user::{
151 Email as UserEmail, GlobalPermission, Id as UserId, Identifier as UserIdentifier,
152 ModifiedPermissions, NewUser, ProjectPermission, UpdateUser, User, Username,
153 },
154 },
155};
156
157#[derive(Clone, Debug, PartialEq, Eq)]
158pub struct Token(pub String);
159
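/// A request body that can be split into several smaller requests when the
/// API rejects or times out on the batch as a whole (see
/// `Client::put_comments_split_on_failure` and friends).
///
/// A minimal sketch of an implementation for a hypothetical batch type; the
/// `MyBatch` type below is illustrative and not part of this crate:
///
/// ```ignore
/// struct MyBatch {
///     items: Vec<String>,
/// }
///
/// impl SplittableRequest for MyBatch {
///     fn split(self) -> impl Iterator<Item = Self> {
///         // Retry each item as its own single-item request.
///         self.items.into_iter().map(|item| MyBatch { items: vec![item] })
///     }
///
///     fn count(&self) -> usize {
///         self.items.len()
///     }
/// }
/// ```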
160pub trait SplittableRequest {
161 fn split(self) -> impl Iterator<Item = Self>
162 where
163 Self: Sized;
164
165 fn count(&self) -> usize;
166}
167
168pub struct SplitableRequestResponse<ResponseT>
169where
170 for<'de> ResponseT: Deserialize<'de> + ReducibleResponse,
171{
172 pub response: ResponseT,
173 pub num_failed: usize,
174}
175
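/// A response type whose per-chunk results can be merged back into a single
/// response after a request has been split (see `SplittableRequest`).
///
/// Note that the default `merge` implementation discards both inputs and
/// simply returns `Default::default()`; implementors whose responses carry
/// data should override it.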
176pub trait ReducibleResponse {
177 fn merge(self, _b: Self) -> Self
178 where
179 Self: std::default::Default,
180 {
181 Default::default()
182 }
183
184 fn empty() -> Self
185 where
186 Self: std::default::Default,
187 {
188 Default::default()
189 }
190}
191
192pub struct Config {
193 pub endpoint: Url,
194 pub token: Token,
195 pub accept_invalid_certificates: bool,
196 pub proxy: Option<Url>,
197 pub retry_config: Option<RetryConfig>,
200}
201
202impl Default for Config {
203 fn default() -> Self {
204 Config {
205 endpoint: DEFAULT_ENDPOINT.clone(),
206 token: Token("".to_owned()),
207 accept_invalid_certificates: false,
208 proxy: None,
209 retry_config: None,
210 }
211 }
212}
213
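/// Blocking HTTP client for the API.
///
/// A minimal construction sketch. The crate name `reinfer_client`, the
/// endpoint and the token below are illustrative assumptions; the remaining
/// `Config` fields come from `Default`:
///
/// ```no_run
/// use reinfer_client::{Client, Config, Token};
/// use url::Url;
///
/// # fn main() -> reinfer_client::Result<()> {
/// let client = Client::new(Config {
///     endpoint: Url::parse("https://example.reinfer.io").unwrap(),
///     token: Token("my-api-token".to_owned()),
///     ..Default::default()
/// })?;
/// let sources = client.get_sources()?;
/// println!("{} sources accessible", sources.len());
/// # Ok(())
/// # }
/// ```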
214#[derive(Debug)]
215pub struct Client {
216 endpoints: Endpoints,
217 http_client: HttpClient,
218 headers: HeaderMap,
219 retrier: Option<Retrier>,
220}
221
222#[derive(Serialize)]
223pub struct GetLabellingsInBulk<'a> {
224 pub source_id: &'a SourceId,
225 pub return_predictions: &'a bool,
226
227 #[serde(skip_serializing_if = "Option::is_none")]
228 pub after: &'a Option<GetLabellingsAfter>,
229
230 #[serde(skip_serializing_if = "Option::is_none")]
231 pub limit: &'a Option<usize>,
232}
233
234#[derive(Serialize)]
235pub struct GetCommentsIterPageQuery<'a> {
236 #[serde(skip_serializing_if = "Option::is_none")]
237 pub from_timestamp: Option<DateTime<Utc>>,
238 #[serde(skip_serializing_if = "Option::is_none")]
239 pub to_timestamp: Option<DateTime<Utc>>,
240 #[serde(skip_serializing_if = "Option::is_none")]
241 pub after: Option<&'a Continuation>,
242 pub limit: usize,
243 pub include_markup: bool,
244}
245
246#[derive(Serialize)]
247pub struct GetEmailsIterPageQuery<'a> {
248 #[serde(skip_serializing_if = "Option::is_none")]
249 pub continuation: Option<&'a EmailContinuation>,
250 pub limit: usize,
251}
252
253#[derive(Serialize)]
254pub struct GetCommentQuery {
255 pub include_markup: bool,
256}
257
258#[derive(Serialize)]
259pub struct GetEmailQuery {
260 pub id: String,
261}
262
263impl Client {
264 pub fn new(config: Config) -> Result<Client> {
266 let http_client = build_http_client(&config)?;
267 let headers = build_headers(&config)?;
268 let endpoints = Endpoints::new(config.endpoint)?;
269 let retrier = config.retry_config.map(Retrier::new);
270 Ok(Client {
271 endpoints,
272 http_client,
273 headers,
274 retrier,
275 })
276 }
277
278 pub fn base_url(&self) -> &Url {
280 &self.endpoints.base
281 }
282
283 pub fn get_sources(&self) -> Result<Vec<Source>> {
285 Ok(self
286 .get::<_, GetAvailableSourcesResponse>(self.endpoints.sources.clone())?
287 .sources)
288 }
289
290 pub fn get_user(&self, user: impl Into<UserIdentifier>) -> Result<User> {
292 Ok(match user.into() {
293 UserIdentifier::Id(user_id) => {
294 self.get::<_, GetUserResponse>(self.endpoints.user_by_id(&user_id)?)?
295 .user
296 }
297 })
298 }
299
300 pub fn get_source(&self, source: impl Into<SourceIdentifier>) -> Result<Source> {
302 Ok(match source.into() {
303 SourceIdentifier::Id(source_id) => {
304 self.get::<_, GetSourceResponse>(self.endpoints.source_by_id(&source_id)?)?
305 .source
306 }
307 SourceIdentifier::FullName(source_name) => {
308 self.get::<_, GetSourceResponse>(self.endpoints.source_by_name(&source_name)?)?
309 .source
310 }
311 })
312 }
313
314 pub fn create_label_defs_bulk(
315 &self,
316 dataset_name: &DatasetFullName,
317 label_group: LabelGroupName,
318 label_defs: Vec<NewLabelDef>,
319 ) -> Result<()> {
320 self.put::<_, _, CreateOrUpdateLabelDefsBulkResponse>(
321 self.endpoints.label_group(dataset_name, label_group)?,
322 CreateOrUpdateLabelDefsBulkRequest { label_defs },
323 )?;
324 Ok(())
325 }
326
327 pub fn create_source(
329 &self,
330 source_name: &SourceFullName,
331 options: NewSource<'_>,
332 ) -> Result<Source> {
333 Ok(self
334 .put::<_, _, CreateSourceResponse>(
335 self.endpoints.source_by_name(source_name)?,
336 CreateSourceRequest { source: options },
337 )?
338 .source)
339 }
340
341 pub fn update_source(
343 &self,
344 source_name: &SourceFullName,
345 options: UpdateSource<'_>,
346 ) -> Result<Source> {
347 Ok(self
348 .post::<_, _, UpdateSourceResponse>(
349 self.endpoints.source_by_name(source_name)?,
350 UpdateSourceRequest { source: options },
351 Retry::Yes,
352 )?
353 .source)
354 }
355
356 pub fn delete_source(&self, source: impl Into<SourceIdentifier>) -> Result<()> {
358 let source_id = match source.into() {
359 SourceIdentifier::Id(source_id) => source_id,
360 source @ SourceIdentifier::FullName(_) => self.get_source(source)?.id,
361 };
362 self.delete(self.endpoints.source_by_id(&source_id)?)
363 }
364
365 pub fn create_quota(
367 &self,
368 target_tenant_id: &TenantId,
369 tenant_quota_kind: TenantQuotaKind,
370 options: CreateQuota,
371 ) -> Result<()> {
372 self.post(
373 self.endpoints.quota(target_tenant_id, tenant_quota_kind)?,
374 options,
375 Retry::Yes,
376 )
377 }
378
379 pub fn get_quotas(&self, tenant_id: &Option<UiPathTenantId>) -> Result<Vec<Quota>> {
381 Ok(self
382 .get::<_, GetQuotasResponse>(self.endpoints.quotas(tenant_id)?)?
383 .quotas)
384 }
385
386 pub fn delete_user(&self, user: impl Into<UserIdentifier>) -> Result<()> {
388 let UserIdentifier::Id(user_id) = user.into();
389 self.delete(self.endpoints.user_by_id(&user_id)?)
390 }
391
392 pub fn delete_comments(
394 &self,
395 source: impl Into<SourceIdentifier>,
396 comments: &[CommentId],
397 ) -> Result<()> {
398 let source_full_name = match source.into() {
399 source @ SourceIdentifier::Id(_) => self.get_source(source)?.full_name(),
400 SourceIdentifier::FullName(source_full_name) => source_full_name,
401 };
402 self.delete_query(
403 self.endpoints.comments_v1(&source_full_name)?,
404 Some(&id_list_query(comments.iter().map(|uid| &uid.0))),
405 )
406 }
407
408 pub fn get_comments_iter_page(
410 &self,
411 source_name: &SourceFullName,
412 continuation: Option<&ContinuationKind>,
413 to_timestamp: Option<DateTime<Utc>>,
414 limit: usize,
415 ) -> Result<CommentsIterPage> {
416 let (from_timestamp, after) = match continuation {
419 Some(ContinuationKind::Timestamp(from_timestamp)) => (Some(*from_timestamp), None),
422 Some(ContinuationKind::Continuation(after)) => (None, Some(after)),
425 None => (None, None),
428 };
429 let query_params = GetCommentsIterPageQuery {
430 from_timestamp,
431 to_timestamp,
432 after,
433 limit,
434 include_markup: true,
435 };
436 self.get_query(self.endpoints.comments(source_name)?, Some(&query_params))
437 }
438
439 pub fn get_dataset_query_iter<'a>(
441 &'a self,
442 dataset_name: &'a DatasetFullName,
443 params: &'a mut QueryRequestParams,
444 ) -> DatasetQueryIter<'a> {
445 DatasetQueryIter::new(self, dataset_name, params)
446 }
447
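    /// Returns an iterator over pages of comments in the given source.
    ///
    /// A usage sketch (the crate name `reinfer_client` is assumed):
    ///
    /// ```no_run
    /// # use reinfer_client::{Client, CommentsIterTimerange, SourceFullName};
    /// # fn run(client: &Client, source_name: &SourceFullName) -> reinfer_client::Result<()> {
    /// for page in client.get_comments_iter(source_name, Some(64), CommentsIterTimerange::default()) {
    ///     let comments = page?;
    ///     println!("fetched {} comments", comments.len());
    /// }
    /// # Ok(())
    /// # }
    /// ```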
448 pub fn get_comments_iter<'a>(
450 &'a self,
451 source_name: &'a SourceFullName,
452 page_size: Option<usize>,
453 timerange: CommentsIterTimerange,
454 ) -> CommentsIter<'a> {
455 CommentsIter::new(self, source_name, page_size, timerange)
456 }
457
458 pub fn get_keyed_sync_state_ids(
459 &self,
460 bucket_id: &BucketId,
461 request: &GetKeyedSyncStateIdsRequest,
462 ) -> Result<Vec<KeyedSyncStateId>> {
463 Ok(self
464 .post::<_, _, GetKeyedSyncStateIdsResponse>(
465 self.endpoints.query_keyed_sync_state_ids(bucket_id)?,
466 Some(&request),
467 Retry::Yes,
468 )?
469 .keyed_sync_state_ids)
470 }
471
472 pub fn delete_keyed_sync_state(
473 &self,
474 bucket_id: &BucketId,
475 id: &KeyedSyncStateId,
476 ) -> Result<()> {
477 self.delete(self.endpoints.keyed_sync_state(bucket_id, id)?)
478 }
479
480 pub fn get_keyed_sync_states(&self, bucket_id: &BucketId) -> Result<Vec<KeyedSyncState>> {
481 Ok(self
482 .get::<_, GetKeyedSyncStatesResponse>(self.endpoints.keyed_sync_states(bucket_id)?)?
483 .keyed_sync_states)
484 }
485
486 pub fn get_email(&self, bucket_name: &BucketFullName, id: EmailId) -> Result<Vec<Email>> {
488 let query_params = GetEmailQuery { id: id.0 };
489 Ok(self
490 .get_query::<_, _, GetEmailResponse>(
491 self.endpoints.get_emails(bucket_name)?,
492 Some(&query_params),
493 )?
494 .emails)
495 }
496
497 pub fn get_emails_iter_page(
499 &self,
500 bucket_name: &BucketFullName,
501 continuation: Option<&EmailContinuation>,
502 limit: usize,
503 ) -> Result<EmailsIterPage> {
504 let query_params = GetEmailsIterPageQuery {
505 continuation,
506 limit,
507 };
508 self.post(
509 self.endpoints.get_emails(bucket_name)?,
510 Some(&query_params),
511 Retry::Yes,
512 )
513 }
514
515 pub fn get_emails_iter<'a>(
517 &'a self,
518 bucket_name: &'a BucketFullName,
519 page_size: Option<usize>,
520 ) -> EmailsIter<'a> {
521 EmailsIter::new(self, bucket_name, page_size)
522 }
523
524 pub fn get_comment<'a>(
526 &'a self,
527 source_name: &'a SourceFullName,
528 comment_id: &'a CommentId,
529 ) -> Result<Comment> {
530 let query_params = GetCommentQuery {
531 include_markup: true,
532 };
533 Ok(self
534 .get_query::<_, _, GetCommentResponse>(
535 self.endpoints.comment_by_id(source_name, comment_id)?,
536 Some(&query_params),
537 )?
538 .comment)
539 }
540 pub fn post_integration(
541 &self,
542 name: &IntegrationFullName,
543 integration: &NewIntegration,
544 ) -> Result<PostIntegrationResponse> {
545 self.request(
546 &Method::POST,
547 &self.endpoints.integration(name)?,
548 &Some(PostIntegrationRequest {
549 integration: integration.clone(),
550 }),
551 &None::<()>,
552 &Retry::No,
553 )
554 }
555
556 pub fn put_integration(
557 &self,
558 name: &IntegrationFullName,
559 integration: &NewIntegration,
560 ) -> Result<PutIntegrationResponse> {
561 self.request(
562 &Method::PUT,
563 &self.endpoints.integration(name)?,
564 &Some(PutIntegrationRequest {
565 integration: integration.clone(),
566 }),
567 &None::<()>,
568 &Retry::No,
569 )
570 }
571
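    /// Uploads comments, splitting the batch into smaller requests if the API
    /// rejects it as a whole and reporting how many of the split requests
    /// still failed.
    ///
    /// A usage sketch (crate name `reinfer_client` assumed; `new_comments` is
    /// any `Vec<NewComment>`):
    ///
    /// ```no_run
    /// # use reinfer_client::{Client, NewComment, SourceFullName};
    /// # fn run(client: &Client, source_name: &SourceFullName, new_comments: Vec<NewComment>) -> reinfer_client::Result<()> {
    /// let result = client.put_comments_split_on_failure(source_name, new_comments, false)?;
    /// if result.num_failed > 0 {
    ///     eprintln!("{} comment(s) could not be uploaded", result.num_failed);
    /// }
    /// # Ok(())
    /// # }
    /// ```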
572 pub fn put_comments_split_on_failure(
573 &self,
574 source_name: &SourceFullName,
575 comments: Vec<NewComment>,
576 no_charge: bool,
577 ) -> Result<SplitableRequestResponse<PutCommentsResponse>> {
578 self.splitable_request(
582 Method::PUT,
583 self.endpoints.put_comments(source_name)?,
584 PutCommentsRequest { comments },
585 Some(NoChargeQuery { no_charge }),
586 Retry::Yes,
587 )
588 }
589
590 pub fn put_comments(
591 &self,
592 source_name: &SourceFullName,
593 comments: Vec<NewComment>,
594 no_charge: bool,
595 ) -> Result<PutCommentsResponse> {
596 self.request(
599 &Method::PUT,
600 &self.endpoints.put_comments(source_name)?,
601 &Some(PutCommentsRequest { comments }),
602 &Some(NoChargeQuery { no_charge }),
603 &Retry::Yes,
604 )
605 }
606
607 pub fn put_stream(
608 &self,
609 dataset_name: &DatasetFullName,
610 stream: &NewStream,
611 ) -> Result<PutStreamResponse> {
612 self.put(
613 self.endpoints.streams(dataset_name)?,
614 Some(PutStreamRequest { stream }),
615 )
616 }
617
618 pub fn get_audit_events(
619 &self,
620 minimum_timestamp: Option<DateTime<Utc>>,
621 maximum_timestamp: Option<DateTime<Utc>>,
622 continuation: Option<Continuation>,
623 ) -> Result<AuditQueryResponse> {
624 self.post::<_, _, AuditQueryResponse>(
625 self.endpoints.audit_events_query()?,
626 AuditQueryRequest {
627 continuation,
628 filter: AuditQueryFilter {
629 timestamp: CommentTimestampFilter {
630 minimum: minimum_timestamp,
631 maximum: maximum_timestamp,
632 },
633 },
634 },
635 Retry::Yes,
636 )
637 }
638 pub fn get_latest_validation(
639 &self,
640 dataset_name: &DatasetFullName,
641 ) -> Result<ValidationResponse> {
642 self.get::<_, ValidationResponse>(self.endpoints.latest_validation(dataset_name)?)
643 }
644
645 pub fn get_validation(
646 &self,
647 dataset_name: &DatasetFullName,
648 model_version: &ModelVersion,
649 ) -> Result<ValidationResponse> {
650 self.get::<_, ValidationResponse>(self.endpoints.validation(dataset_name, model_version)?)
651 }
652
653 pub fn get_labellers(&self, dataset_name: &DatasetFullName) -> Result<Vec<UserModelMetadata>> {
654 Ok(self
655 .post::<_, _, GetAllModelsInDatasetRespone>(
656 self.endpoints.labellers(dataset_name)?,
657 GetAllModelsInDatasetRequest {},
658 Retry::Yes,
659 )?
660 .labellers)
661 }
662
663 pub fn get_label_validation(
664 &self,
665 label: &LabelName,
666 dataset_name: &DatasetFullName,
667 model_version: &ModelVersion,
668 ) -> Result<LabelValidation> {
669 Ok(self
670 .post::<_, _, LabelValidationResponse>(
671 self.endpoints
672 .label_validation(dataset_name, model_version)?,
673 LabelValidationRequest {
674 label: label.clone(),
675 },
676 Retry::Yes,
677 )?
678 .label_validation)
679 }
680
681 pub fn sync_comments(
682 &self,
683 source_name: &SourceFullName,
684 comments: Vec<NewComment>,
685 no_charge: bool,
686 ) -> Result<SyncCommentsResponse> {
687 self.request(
688 &Method::POST,
689 &self.endpoints.sync_comments(source_name)?,
690 &Some(SyncCommentsRequest { comments }),
691 &Some(NoChargeQuery { no_charge }),
692 &Retry::Yes,
693 )
694 }
695
696 pub fn sync_comments_split_on_failure(
697 &self,
698 source_name: &SourceFullName,
699 comments: Vec<NewComment>,
700 no_charge: bool,
701 ) -> Result<SplitableRequestResponse<SyncCommentsResponse>> {
702 self.splitable_request(
703 Method::POST,
704 self.endpoints.sync_comments(source_name)?,
705 SyncCommentsRequest { comments },
706 Some(NoChargeQuery { no_charge }),
707 Retry::Yes,
708 )
709 }
710
711 pub fn sync_raw_emails(
712 &self,
713 source_name: &SourceFullName,
714 documents: &[Document],
715 transform_tag: &TransformTag,
716 include_comments: bool,
717 no_charge: bool,
718 ) -> Result<SyncRawEmailsResponse> {
719 self.request(
720 &Method::POST,
721 &self.endpoints.sync_comments_raw_emails(source_name)?,
722 &Some(SyncRawEmailsRequest {
723 documents,
724 transform_tag,
725 include_comments,
726 }),
727 &Some(NoChargeQuery { no_charge }),
728 &Retry::Yes,
729 )
730 }
731
732 pub fn put_emails_split_on_failure(
733 &self,
734 bucket_name: &BucketFullName,
735 emails: Vec<NewEmail>,
736 no_charge: bool,
737 ) -> Result<SplitableRequestResponse<PutEmailsResponse>> {
738 self.splitable_request(
739 Method::PUT,
740 self.endpoints.put_emails(bucket_name)?,
741 PutEmailsRequest { emails },
742 Some(NoChargeQuery { no_charge }),
743 Retry::Yes,
744 )
745 }
746
747 pub fn put_emails(
748 &self,
749 bucket_name: &BucketFullName,
750 emails: Vec<NewEmail>,
751 no_charge: bool,
752 ) -> Result<PutEmailsResponse> {
753 self.request(
754 &Method::PUT,
755 &self.endpoints.put_emails(bucket_name)?,
756 &Some(PutEmailsRequest { emails }),
757 &Some(NoChargeQuery { no_charge }),
758 &Retry::Yes,
759 )
760 }
761
762 pub fn post_user(&self, user_id: &UserId, user: UpdateUser) -> Result<PostUserResponse> {
763 self.post(
764 self.endpoints.post_user(user_id)?,
765 PostUserRequest { user: &user },
766 Retry::Yes,
767 )
768 }
769
770 pub fn put_comment_audio(
771 &self,
772 source_id: &SourceId,
773 comment_id: &CommentId,
774 audio_path: impl AsRef<Path>,
775 ) -> Result<()> {
776 let form = Form::new()
777 .file("file", audio_path)
778 .map_err(|source| Error::Unknown {
779 message: "PUT comment audio operation failed".to_owned(),
780 source: source.into(),
781 })?;
782 let http_response = self
783 .http_client
784 .put(self.endpoints.comment_audio(source_id, comment_id)?)
785 .headers(self.headers.clone())
786 .multipart(form)
787 .send()
788 .map_err(|source| Error::ReqwestError {
789 message: "PUT comment audio operation failed".to_owned(),
790 source,
791 })?;
792 let status = http_response.status();
793 http_response
794 .json::<Response<EmptySuccess>>()
795 .map_err(Error::BadJsonResponse)?
796 .into_result(status)?;
797 Ok(())
798 }
799
800 pub fn upload_ixp_document(
801 &self,
802 source_id: &SourceId,
803 filename: String,
804 bytes: Vec<u8>,
805 ) -> Result<CommentId> {
806 let endpoint = self.endpoints.ixp_documents(source_id)?;
807
808 let do_request = || {
809 let form = Form::new().part(
810 "file",
811 Part::bytes(bytes.clone()).file_name(filename.clone()),
812 );
813 let request = self
814 .http_client
815 .request(Method::PUT, endpoint.clone())
816 .multipart(form)
817 .headers(self.headers.clone());
818
819 request.send()
820 };
821
822 let result = self.with_retries(do_request);
823
824 let http_response = result.map_err(|source| Error::ReqwestError {
825 source,
826 message: "Operation failed.".to_string(),
827 })?;
828
829 let status = http_response.status();
830
831 Ok(http_response
832 .json::<Response<UploadIxpDocumentResponse>>()
833 .map_err(Error::BadJsonResponse)?
834 .into_result(status)?
835 .comment_id)
836 }
837
838 pub fn upload_comment_attachment(
839 &self,
840 source_id: &SourceId,
841 comment_id: &CommentId,
842 attachment_index: usize,
843 attachment: &PathBuf,
844 ) -> Result<UploadAttachmentResponse> {
845 let url = self
846 .endpoints
847 .attachment_upload(source_id, comment_id, attachment_index)?;
848
        if !attachment.is_file() {
850 return Err(Error::FileDoesNotExist {
851 path: attachment.clone(),
852 });
853 }
854
855 let do_request = || {
            // `Form::file` only fails on I/O errors; the path was validated
            // above, so a failure here is unexpected and treated as fatal.
            let form = Form::new()
                .file("file", attachment)
                .expect("failed to build multipart form for comment attachment");
863 let request = self
864 .http_client
865 .request(Method::PUT, url.clone())
866 .multipart(form)
867 .headers(self.headers.clone());
868
869 request.send()
870 };
871
872 let result = self.with_retries(do_request);
873
874 let http_response = result.map_err(|source| Error::ReqwestError {
875 source,
876 message: "Operation failed.".to_string(),
877 })?;
878
879 let status = http_response.status();
880
881 http_response
882 .json::<Response<UploadAttachmentResponse>>()
883 .map_err(Error::BadJsonResponse)?
884 .into_result(status)
885 }
886
887 pub fn get_ixp_document(
888 &self,
889 source_id: &SourceId,
890 comment_id: &CommentId,
891 ) -> Result<Vec<u8>> {
892 self.get_octet_stream(&self.endpoints.ixp_document(source_id, comment_id)?)
893 }
894
895 fn get_octet_stream(&self, endpoint: &Url) -> Result<Vec<u8>> {
896 let mut response = self.raw_request(
897 &Method::GET,
898 endpoint,
899 &None::<()>,
900 &None::<()>,
901 &Retry::Yes,
902 None,
903 )?;
904
905 let status = response.status();
906
907 if !status.is_success() {
908 return Err(Error::Api {
909 status_code: status,
910 message: "Bad status code when getting octet stream".to_string(),
911 });
912 }
913
914 let mut buffer = Vec::new();
915
916 response
917 .read_to_end(&mut buffer)
918 .map_err(|source| Error::Unknown {
919 message: "Failed to read buffer".to_string(),
920 source: Box::new(source),
921 })?;
922 Ok(buffer)
923 }
924
925 pub fn get_attachment(&self, reference: &AttachmentReference) -> Result<Vec<u8>> {
926 self.get_octet_stream(&self.endpoints.attachment_reference(reference)?)
927 }
928
929 pub fn get_integrations(&self) -> Result<Vec<Integration>> {
930 Ok(self
931 .get::<_, GetIntegrationsResponse>(self.endpoints.integrations()?)?
932 .integrations)
933 }
934
935 pub fn get_integration(&self, name: &IntegrationFullName) -> Result<Integration> {
936 Ok(self
937 .get::<_, GetIntegrationResponse>(self.endpoints.integration(name)?)?
938 .integration)
939 }
940
941 pub fn get_datasets(&self) -> Result<Vec<Dataset>> {
942 Ok(self
943 .get::<_, GetAvailableDatasetsResponse>(self.endpoints.datasets.clone())?
944 .datasets)
945 }
946
947 pub fn get_dataset<IdentifierT>(&self, dataset: IdentifierT) -> Result<Dataset>
948 where
949 IdentifierT: Into<DatasetIdentifier>,
950 {
951 Ok(match dataset.into() {
952 DatasetIdentifier::Id(dataset_id) => {
953 self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_id(&dataset_id)?)?
954 .dataset
955 }
956 DatasetIdentifier::FullName(dataset_name) => {
957 self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_name(&dataset_name)?)?
958 .dataset
959 }
960 })
961 }
962
963 pub fn create_ixp_dataset(&self, dataset: IxpDatasetNew) -> Result<Dataset> {
965 Ok(self
966 .put::<_, _, CreateIxpDatasetResponse>(
967 self.endpoints.ixp_datasets()?,
968 CreateIxpDatasetRequest { dataset },
969 )?
970 .dataset)
971 }
972
973 pub fn create_dataset(
975 &self,
976 dataset_name: &DatasetFullName,
977 options: NewDataset<'_>,
978 ) -> Result<Dataset> {
979 Ok(self
980 .put::<_, _, CreateDatasetResponse>(
981 self.endpoints.dataset_by_name(dataset_name)?,
982 CreateDatasetRequest { dataset: options },
983 )?
984 .dataset)
985 }
986
987 pub fn update_dataset(
989 &self,
990 dataset_name: &DatasetFullName,
991 options: UpdateDataset<'_>,
992 ) -> Result<Dataset> {
993 Ok(self
994 .post::<_, _, UpdateDatasetResponse>(
995 self.endpoints.dataset_by_name(dataset_name)?,
996 UpdateDatasetRequest { dataset: options },
997 Retry::Yes,
998 )?
999 .dataset)
1000 }
1001
1002 pub fn delete_dataset<IdentifierT>(&self, dataset: IdentifierT) -> Result<()>
1003 where
1004 IdentifierT: Into<DatasetIdentifier>,
1005 {
1006 let dataset_id = match dataset.into() {
1007 DatasetIdentifier::Id(dataset_id) => dataset_id,
1008 dataset @ DatasetIdentifier::FullName(_) => self.get_dataset(dataset)?.id,
1009 };
1010 self.delete(self.endpoints.dataset_by_id(&dataset_id)?)
1011 }
1012
1013 pub fn get_labellings<'a>(
1015 &self,
1016 dataset_name: &DatasetFullName,
1017 comment_uids: impl Iterator<Item = &'a CommentUid>,
1018 ) -> Result<Vec<AnnotatedComment>> {
1019 Ok(self
1020 .get_query::<_, _, GetAnnotationsResponse>(
1021 self.endpoints.get_labellings(dataset_name)?,
1022 Some(&id_list_query(comment_uids.into_iter().map(|id| &id.0))),
1023 )?
1024 .results)
1025 }
1026
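    /// Returns an iterator over pages of annotated comments for a source
    /// within a dataset.
    ///
    /// A usage sketch (crate name `reinfer_client` assumed):
    ///
    /// ```no_run
    /// # use reinfer_client::{Client, DatasetFullName, SourceId};
    /// # fn run(client: &Client, dataset: &DatasetFullName, source_id: &SourceId) -> reinfer_client::Result<()> {
    /// for page in client.get_labellings_iter(dataset, source_id, false, None) {
    ///     let annotated = page?;
    ///     println!("fetched {} annotated comments", annotated.len());
    /// }
    /// # Ok(())
    /// # }
    /// ```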
1027 pub fn get_labellings_iter<'a>(
1029 &'a self,
1030 dataset_name: &'a DatasetFullName,
1031 source_id: &'a SourceId,
1032 return_predictions: bool,
1033 limit: Option<usize>,
1034 ) -> LabellingsIter<'a> {
1035 LabellingsIter::new(self, dataset_name, source_id, return_predictions, limit)
1036 }
1037
1038 pub fn get_labellings_in_bulk(
1040 &self,
1041 dataset_name: &DatasetFullName,
1042 query_parameters: GetLabellingsInBulk<'_>,
1043 ) -> Result<GetAnnotationsResponse> {
1044 self.get_query::<_, _, GetAnnotationsResponse>(
1045 self.endpoints.get_labellings(dataset_name)?,
1046 Some(&query_parameters),
1047 )
1048 }
1049
1050 pub fn update_labelling(
1052 &self,
1053 dataset_name: &DatasetFullName,
1054 comment_uid: &CommentUid,
1055 labelling: Option<&[NewLabelling]>,
1056 entities: Option<&NewEntities>,
1057 moon_forms: Option<&[NewMoonForm]>,
1058 ) -> Result<AnnotatedComment> {
1059 self.post::<_, _, AnnotatedComment>(
1060 self.endpoints.post_labelling(dataset_name, comment_uid)?,
1061 UpdateAnnotationsRequest {
1062 labelling,
1063 entities,
1064 moon_forms,
1065 },
1066 Retry::Yes,
1067 )
1068 }
1069
1070 pub fn get_comment_predictions<'a>(
1072 &self,
1073 dataset_name: &DatasetFullName,
1074 model_version: &ModelVersion,
1075 comment_uids: impl Iterator<Item = &'a CommentUid>,
1076 threshold: Option<CommentPredictionsThreshold>,
1077 labels: Option<Vec<TriggerLabelThreshold>>,
1078 ) -> Result<Vec<Prediction>> {
1079 Ok(self
1080 .post::<_, _, GetPredictionsResponse>(
1081 self.endpoints
1082 .get_comment_predictions(dataset_name, model_version)?,
1083 GetCommentPredictionsRequest {
1084 uids: comment_uids
1085 .into_iter()
1086 .map(|id| id.0.clone())
1087 .collect::<Vec<_>>(),
1088
1089 threshold,
1090 labels,
1091 },
1092 Retry::Yes,
1093 )?
1094 .predictions)
1095 }
1096
1097 pub fn get_streams(&self, dataset_name: &DatasetFullName) -> Result<Vec<Stream>> {
1098 Ok(self
1099 .get::<_, GetStreamsResponse>(self.endpoints.streams(dataset_name)?)?
1100 .streams)
1101 }
1102
1103 pub fn get_recent_comments(
1104 &self,
1105 dataset_name: &DatasetFullName,
1106 filter: &CommentFilter,
1107 limit: usize,
1108 continuation: Option<&Continuation>,
1109 ) -> Result<RecentCommentsPage> {
1110 self.post::<_, _, RecentCommentsPage>(
1111 self.endpoints.recent_comments(dataset_name)?,
1112 GetRecentRequest {
1113 limit,
1114 filter,
1115 continuation,
1116 },
1117 Retry::No,
1118 )
1119 }
1120
1121 pub fn refresh_user_permissions(&self) -> Result<RefreshUserPermissionsResponse> {
1122 self.post::<_, _, RefreshUserPermissionsResponse>(
1123 self.endpoints.refresh_user_permissions()?,
1124 RefreshUserPermissionsRequest {},
1125 Retry::Yes,
1126 )
1127 }
1128
1129 pub fn get_current_user(&self) -> Result<User> {
1130 Ok(self
1131 .get::<_, GetCurrentUserResponse>(self.endpoints.current_user.clone())?
1132 .user)
1133 }
1134
1135 pub fn get_users(&self) -> Result<Vec<User>> {
1136 Ok(self
1137 .get::<_, GetAvailableUsersResponse>(self.endpoints.users.clone())?
1138 .users)
1139 }
1140
1141 pub fn create_user(&self, user: NewUser<'_>) -> Result<User> {
1142 Ok(self
1143 .put::<_, _, CreateUserResponse>(
1144 self.endpoints.users.clone(),
1145 CreateUserRequest { user },
1146 )?
1147 .user)
1148 }
1149
1150 pub fn dataset_summary(
1151 &self,
1152 dataset_name: &DatasetFullName,
1153 params: &SummaryRequestParams,
1154 ) -> Result<SummaryResponse> {
1155 self.post::<_, _, SummaryResponse>(
1156 self.endpoints.dataset_summary(dataset_name)?,
1157 serde_json::to_value(params).expect("summary params serialization error"),
1158 Retry::Yes,
1159 )
1160 }
1161
1162 pub fn query_dataset_csv(
1163 &self,
1164 dataset_name: &DatasetFullName,
1165 params: &QueryRequestParams,
1166 ) -> Result<String> {
1167 let response = self
1168 .raw_request(
1169 &Method::POST,
1170 &self.endpoints.query_dataset(dataset_name)?,
1171 &Some(serde_json::to_value(params).expect("query params serialization error")),
1172 &None::<()>,
1173 &Retry::Yes,
1174 Some(HeaderValue::from_str("text/csv").expect("Could not parse csv header")),
1175 )?
1176 .text()
1177 .expect("Could not get csv text");
1178
1179 Ok(response)
1180 }
1181
1182 pub fn query_dataset(
1183 &self,
1184 dataset_name: &DatasetFullName,
1185 params: &QueryRequestParams,
1186 ) -> Result<QueryResponse> {
1187 self.post::<_, _, QueryResponse>(
1188 self.endpoints.query_dataset(dataset_name)?,
1189 serde_json::to_value(params).expect("query params serialization error"),
1190 Retry::Yes,
1191 )
1192 }
1193
1194 pub fn send_welcome_email(&self, user_id: UserId) -> Result<()> {
1195 self.post::<_, _, WelcomeEmailResponse>(
1196 self.endpoints.welcome_email(&user_id)?,
1197 json!({}),
1198 Retry::No,
1199 )?;
1200 Ok(())
1201 }
1202
1203 pub fn get_bucket_statistics(&self, bucket_name: &BucketFullName) -> Result<BucketStatistics> {
1204 Ok(self
1205 .get::<_, GetBucketStatisticsResponse>(self.endpoints.bucket_statistics(bucket_name)?)?
1206 .statistics)
1207 }
1208
1209 pub fn get_dataset_statistics(
1210 &self,
1211 dataset_name: &DatasetFullName,
1212 params: &DatasetStatisticsRequestParams,
1213 ) -> Result<CommentStatistics> {
1214 Ok(self
1215 .post::<_, _, GetStatisticsResponse>(
1216 self.endpoints.dataset_statistics(dataset_name)?,
1217 serde_json::to_value(params)
1218 .expect("dataset statistics params serialization error"),
1219 Retry::No,
1220 )?
1221 .statistics)
1222 }
1223
1224 pub fn get_source_statistics(
1225 &self,
1226 source_name: &SourceFullName,
1227 params: &SourceStatisticsRequestParams,
1228 ) -> Result<CommentStatistics> {
1229 Ok(self
1230 .post::<_, _, GetStatisticsResponse>(
1231 self.endpoints.source_statistics(source_name)?,
1232 serde_json::to_value(params).expect("source statistics params serialization error"),
1233 Retry::No,
1234 )?
1235 .statistics)
1236 }
1237
1238 pub fn create_bucket(
1240 &self,
1241 bucket_name: &BucketFullName,
1242 options: NewBucket<'_>,
1243 ) -> Result<Bucket> {
1244 Ok(self
1245 .put::<_, _, CreateBucketResponse>(
1246 self.endpoints.bucket_by_name(bucket_name)?,
1247 CreateBucketRequest { bucket: options },
1248 )?
1249 .bucket)
1250 }
1251
1252 pub fn get_buckets(&self) -> Result<Vec<Bucket>> {
1253 Ok(self
1254 .get::<_, GetAvailableBucketsResponse>(self.endpoints.buckets.clone())?
1255 .buckets)
1256 }
1257
1258 pub fn get_bucket<IdentifierT>(&self, bucket: IdentifierT) -> Result<Bucket>
1259 where
1260 IdentifierT: Into<BucketIdentifier>,
1261 {
1262 Ok(match bucket.into() {
1263 BucketIdentifier::Id(bucket_id) => {
1264 self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_id(&bucket_id)?)?
1265 .bucket
1266 }
1267 BucketIdentifier::FullName(bucket_name) => {
1268 self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_name(&bucket_name)?)?
1269 .bucket
1270 }
1271 })
1272 }
1273
1274 pub fn delete_bucket<IdentifierT>(&self, bucket: IdentifierT) -> Result<()>
1275 where
1276 IdentifierT: Into<BucketIdentifier>,
1277 {
1278 let bucket_id = match bucket.into() {
1279 BucketIdentifier::Id(bucket_id) => bucket_id,
1280 bucket @ BucketIdentifier::FullName(_) => self.get_bucket(bucket)?.id,
1281 };
1282 self.delete(self.endpoints.bucket_by_id(&bucket_id)?)
1283 }
1284
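    /// Fetches a batch of comments from a stream without advancing it.
    ///
    /// A polling sketch, assuming `StreamBatch` exposes `results` and
    /// `sequence_id` fields (an assumption about the resource types, which
    /// are not defined in this file):
    ///
    /// ```ignore
    /// loop {
    ///     let batch = client.fetch_stream_comments(&stream_name, 16)?;
    ///     if batch.results.is_empty() {
    ///         break;
    ///     }
    ///     // ... process batch.results ...
    ///     client.advance_stream(&stream_name, batch.sequence_id)?;
    /// }
    /// ```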
1285 pub fn fetch_stream_comments(
1286 &self,
1287 stream_name: &StreamFullName,
1288 size: u32,
1289 ) -> Result<StreamBatch> {
1290 self.post(
1291 self.endpoints.stream_fetch(stream_name)?,
1292 StreamFetchRequest { size },
1293 Retry::No,
1294 )
1295 }
1296
1297 pub fn get_stream(&self, stream_name: &StreamFullName) -> Result<Stream> {
1298 Ok(self
1299 .get::<_, GetStreamResponse>(self.endpoints.stream(stream_name)?)?
1300 .stream)
1301 }
1302
1303 pub fn advance_stream(
1304 &self,
1305 stream_name: &StreamFullName,
1306 sequence_id: StreamSequenceId,
1307 ) -> Result<()> {
1308 self.post::<_, _, serde::de::IgnoredAny>(
1309 self.endpoints.stream_advance(stream_name)?,
1310 StreamAdvanceRequest { sequence_id },
1311 Retry::No,
1312 )?;
1313 Ok(())
1314 }
1315
1316 pub fn reset_stream(
1317 &self,
1318 stream_name: &StreamFullName,
1319 to_comment_created_at: DateTime<Utc>,
1320 ) -> Result<()> {
1321 self.post::<_, _, serde::de::IgnoredAny>(
1322 self.endpoints.stream_reset(stream_name)?,
1323 StreamResetRequest {
1324 to_comment_created_at,
1325 },
1326 Retry::No,
1327 )?;
1328 Ok(())
1329 }
1330
1331 pub fn tag_stream_exceptions(
1332 &self,
1333 stream_name: &StreamFullName,
1334 exceptions: &[StreamException],
1335 ) -> Result<()> {
1336 self.put::<_, _, serde::de::IgnoredAny>(
1337 self.endpoints.stream_exceptions(stream_name)?,
1338 TagStreamExceptionsRequest { exceptions },
1339 )?;
1340 Ok(())
1341 }
1342
1343 pub fn get_project(&self, project_name: &ProjectName) -> Result<Project> {
1345 let response =
1346 self.get::<_, GetProjectResponse>(self.endpoints.project_by_name(project_name)?)?;
1347 Ok(response.project)
1348 }
1349
1350 pub fn get_projects(&self) -> Result<Vec<Project>> {
1352 let response = self.get::<_, GetProjectsResponse>(self.endpoints.projects.clone())?;
1353 Ok(response.projects)
1354 }
1355
1356 pub fn create_project(
1358 &self,
1359 project_name: &ProjectName,
1360 options: NewProject,
1361 user_ids: &[UserId],
1362 ) -> Result<Project> {
1363 Ok(self
1364 .put::<_, _, CreateProjectResponse>(
1365 self.endpoints.project_by_name(project_name)?,
1366 CreateProjectRequest {
1367 project: options,
1368 user_ids,
1369 },
1370 )?
1371 .project)
1372 }
1373
1374 pub fn update_project(
1376 &self,
1377 project_name: &ProjectName,
1378 options: UpdateProject,
1379 ) -> Result<Project> {
1380 Ok(self
1381 .post::<_, _, UpdateProjectResponse>(
1382 self.endpoints.project_by_name(project_name)?,
1383 UpdateProjectRequest { project: options },
1384 Retry::Yes,
1385 )?
1386 .project)
1387 }
1388
1389 pub fn delete_project(
1391 &self,
1392 project_name: &ProjectName,
1393 force_delete: ForceDeleteProject,
1394 ) -> Result<()> {
1395 let endpoint = self.endpoints.project_by_name(project_name)?;
1396 match force_delete {
1397 ForceDeleteProject::No => self.delete(endpoint)?,
1398 ForceDeleteProject::Yes => {
1399 self.delete_query(endpoint, Some(&json!({ "force": true })))?
1400 }
1401 };
1402 Ok(())
1403 }
1404
1405 fn get<LocationT, SuccessT>(&self, url: LocationT) -> Result<SuccessT>
1406 where
1407 LocationT: IntoUrl + Display + Clone,
1408 for<'de> SuccessT: Deserialize<'de>,
1409 {
1410 self.request(&Method::GET, &url, &None::<()>, &None::<()>, &Retry::Yes)
1411 }
1412
1413 fn get_query<LocationT, QueryT, SuccessT>(
1414 &self,
1415 url: LocationT,
1416 query: Option<&QueryT>,
1417 ) -> Result<SuccessT>
1418 where
1419 LocationT: IntoUrl + Display + Clone,
1420 QueryT: Serialize,
1421 for<'de> SuccessT: Deserialize<'de>,
1422 {
1423 self.request(&Method::GET, &url, &None::<()>, &Some(query), &Retry::Yes)
1424 }
1425
1426 fn delete<LocationT>(&self, url: LocationT) -> Result<()>
1427 where
1428 LocationT: IntoUrl + Display + Clone,
1429 {
1430 self.delete_query::<LocationT, ()>(url, None)
1431 }
1432
1433 fn delete_query<LocationT, QueryT>(&self, url: LocationT, query: Option<&QueryT>) -> Result<()>
1434 where
1435 LocationT: IntoUrl + Display + Clone,
1436 QueryT: Serialize,
1437 {
1438 debug!("Attempting DELETE `{url}`");
1439
1440 let attempts = Cell::new(0);
1441 let http_response = self
1442 .with_retries(|| {
1443 attempts.set(attempts.get() + 1);
1444
1445 let mut request = self
1446 .http_client
1447 .delete(url.clone())
1448 .headers(self.headers.clone());
1449 if let Some(query) = query {
1450 request = request.query(query);
1451 }
1452 request.send()
1453 })
1454 .map_err(|source| Error::ReqwestError {
1455 source,
1456 message: "DELETE operation failed.".to_owned(),
1457 })?;
1458 let status = http_response.status();
1459 http_response
1460 .json::<Response<EmptySuccess>>()
1461 .map_err(Error::BadJsonResponse)?
1462 .into_result(status)
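            // A NOT_FOUND on a retried DELETE most likely means that an
            // earlier attempt already deleted the resource, so treat it as
            // success rather than surfacing the error.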
1463 .map_or_else(
1464 |error| {
1467 if attempts.get() > 1 && status == reqwest::StatusCode::NOT_FOUND {
1468 Ok(())
1469 } else {
1470 Err(error)
1471 }
1472 },
1473 |_| Ok(()),
1474 )
1475 }
1476
1477 fn post<LocationT, RequestT, SuccessT>(
1478 &self,
1479 url: LocationT,
1480 request: RequestT,
1481 retry: Retry,
1482 ) -> Result<SuccessT>
1483 where
1484 LocationT: IntoUrl + Display + Clone,
1485 RequestT: Serialize,
1486 for<'de> SuccessT: Deserialize<'de>,
1487 {
1488 self.request(&Method::POST, &url, &Some(request), &None::<()>, &retry)
1489 }
1490
1491 fn put<LocationT, RequestT, SuccessT>(
1492 &self,
1493 url: LocationT,
1494 request: RequestT,
1495 ) -> Result<SuccessT>
1496 where
1497 LocationT: IntoUrl + Display + Clone,
1498 RequestT: Serialize,
1499 for<'de> SuccessT: Deserialize<'de>,
1500 {
1501 self.request(&Method::PUT, &url, &Some(request), &None::<()>, &Retry::Yes)
1502 }
1503
1504 fn raw_request<LocationT, RequestT, QueryT>(
1505 &self,
1506 method: &Method,
1507 url: &LocationT,
1508 body: &Option<RequestT>,
1509 query: &Option<QueryT>,
1510 retry: &Retry,
1511 accept_header: Option<HeaderValue>,
1512 ) -> Result<reqwest::blocking::Response>
1513 where
1514 LocationT: IntoUrl + Display + Clone,
1515 RequestT: Serialize,
1516 QueryT: Serialize,
1517 {
1518 let mut headers = self.headers.clone();
1519
1520 if let Some(accept_header) = accept_header {
1521 headers.insert(ACCEPT, accept_header);
1522 }
1523
1524 let do_request = || {
1525 let request = self
1526 .http_client
1527 .request(method.clone(), url.clone())
1528 .headers(headers.clone());
1529
1530 let request = match &query {
1531 Some(query) => request.query(query),
1532 None => request,
1533 };
1534 let request = match &body {
1535 Some(body) => request.json(body),
1536 None => request,
1537 };
1538 request.send()
1539 };
1540
1541 let result = match retry {
1542 Retry::Yes => self.with_retries(do_request),
1543 Retry::No => do_request(),
1544 };
1545 let http_response = result.map_err(|source| Error::ReqwestError {
1546 source,
1547 message: format!("{method} operation failed."),
1548 })?;
1549
1550 Ok(http_response)
1551 }
1552
1553 fn splitable_request<LocationT, RequestT, SuccessT, QueryT>(
1554 &self,
1555 method: Method,
1556 url: LocationT,
1557 body: RequestT,
1558 query: Option<QueryT>,
1559 retry: Retry,
1560 ) -> Result<SplitableRequestResponse<SuccessT>>
1561 where
1562 LocationT: IntoUrl + Display + Clone,
1563 RequestT: Serialize + SplittableRequest + Clone,
1564 QueryT: Serialize + Clone,
1565 for<'de> SuccessT: Deserialize<'de> + ReducibleResponse + Clone + Default,
1566 {
1567 debug!("Attempting {method} `{url}`");
1568 let result: Result<SuccessT> =
1569 self.request(&method, &url, &Some(body.clone()), &query, &retry);
1570
        fn should_split(error: &Error) -> bool {
            match error {
                Error::Api { status_code, .. } => {
                    *status_code == reqwest::StatusCode::UNPROCESSABLE_ENTITY
                        || *status_code == reqwest::StatusCode::BAD_REQUEST
                }
                Error::BadJsonResponse(_) => true,
                Error::ReqwestError { source, .. } => source.is_timeout(),
                _ => false,
            }
        }
1586
1587 match result {
1588 Ok(response) => Ok(SplitableRequestResponse {
1589 response,
1590 num_failed: 0,
1591 }),
1592 Err(error) if should_split(&error) => {
1593 let mut num_failed = 0;
1594 let response = body
1595 .split()
1596 .filter_map(|request| {
1597 match self.request(&method, &url, &Some(request), &query, &retry) {
1598 Ok(response) => Some(response),
1599 Err(_) => {
1600 num_failed += 1;
1601 None
1602 }
1603 }
1604 })
1605 .fold(SuccessT::empty(), |merged, next: SuccessT| {
1606 merged.merge(next)
1607 });
1608
1609 Ok(SplitableRequestResponse {
1610 num_failed,
1611 response,
1612 })
1613 }
1614 Err(error) => Err(error),
1615 }
1616 }
1617
1618 fn request<LocationT, RequestT, SuccessT, QueryT>(
1619 &self,
1620 method: &Method,
1621 url: &LocationT,
1622 body: &Option<RequestT>,
1623 query: &Option<QueryT>,
1624 retry: &Retry,
1625 ) -> Result<SuccessT>
1626 where
1627 LocationT: IntoUrl + Display + Clone,
1628 RequestT: Serialize,
1629 QueryT: Serialize + Clone,
1630 for<'de> SuccessT: Deserialize<'de>,
1631 {
1632 debug!("Attempting {method} `{url}`");
1633 let http_response = self.raw_request(method, url, body, query, retry, None)?;
1634
1635 let status = http_response.status();
1636 http_response
1637 .json::<Response<SuccessT>>()
1638 .map_err(Error::BadJsonResponse)?
1639 .into_result(status)
1640 }
1641
1642 fn with_retries(
1643 &self,
1644 send_request: impl Fn() -> ReqwestResult<HttpResponse>,
1645 ) -> ReqwestResult<HttpResponse> {
1646 match &self.retrier {
1647 Some(retrier) => retrier.with_retries(send_request),
1648 None => send_request(),
1649 }
1650 }
1651}
1652
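/// Whether a failed request should be retried according to the client's
/// `RetryConfig` (requests marked `Retry::No` are sent exactly once).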
1653#[derive(Copy, Clone)]
1654enum Retry {
1655 Yes,
1656 No,
1657}
1658
1659pub struct DatasetQueryIter<'a> {
1660 client: &'a Client,
1661 dataset_name: &'a DatasetFullName,
1662 done: bool,
1663 params: &'a mut QueryRequestParams,
1664}
1665
1666impl<'a> DatasetQueryIter<'a> {
1667 fn new(
1668 client: &'a Client,
1669 dataset_name: &'a DatasetFullName,
1670 params: &'a mut QueryRequestParams,
1671 ) -> Self {
1672 Self {
1673 client,
1674 dataset_name,
1675 done: false,
1676 params,
1677 }
1678 }
1679}
1680
1681impl Iterator for DatasetQueryIter<'_> {
1682 type Item = Result<Vec<AnnotatedComment>>;
1683
1684 fn next(&mut self) -> Option<Self::Item> {
1685 if self.done {
1686 return None;
1687 }
1688
1689 let response = self.client.query_dataset(self.dataset_name, self.params);
1690 Some(response.map(|page| {
1691 self.params.continuation = page.continuation;
1692 self.done = self.params.continuation.is_none();
1693 page.results
1694 }))
1695 }
1696}
1697
1698pub enum ContinuationKind {
1699 Timestamp(DateTime<Utc>),
1700 Continuation(Continuation),
1701}
1702
1703pub struct EmailsIter<'a> {
1704 client: &'a Client,
1705 bucket_name: &'a BucketFullName,
1706 continuation: Option<EmailContinuation>,
1707 done: bool,
1708 page_size: usize,
1709}
1710
1711impl<'a> EmailsIter<'a> {
1712 pub const DEFAULT_PAGE_SIZE: usize = 64;
1714 pub const MAX_PAGE_SIZE: usize = 256;
1716
1717 fn new(client: &'a Client, bucket_name: &'a BucketFullName, page_size: Option<usize>) -> Self {
1718 Self {
1719 client,
1720 bucket_name,
1721 continuation: None,
1722 done: false,
1723 page_size: page_size.unwrap_or(Self::DEFAULT_PAGE_SIZE),
1724 }
1725 }
1726}
1727
1728impl Iterator for EmailsIter<'_> {
1729 type Item = Result<Vec<Email>>;
1730
1731 fn next(&mut self) -> Option<Self::Item> {
1732 if self.done {
1733 return None;
1734 }
1735 let response = self.client.get_emails_iter_page(
1736 self.bucket_name,
1737 self.continuation.as_ref(),
1738 self.page_size,
1739 );
1740 Some(response.map(|page| {
1741 self.continuation = page.continuation;
1742 self.done = self.continuation.is_none();
1743 page.emails
1744 }))
1745 }
1746}
1747
1748pub struct CommentsIter<'a> {
1749 client: &'a Client,
1750 source_name: &'a SourceFullName,
1751 continuation: Option<ContinuationKind>,
1752 done: bool,
1753 page_size: usize,
1754 to_timestamp: Option<DateTime<Utc>>,
1755}
1756
1757#[derive(Debug, Default)]
1758pub struct CommentsIterTimerange {
1759 pub from: Option<DateTime<Utc>>,
1760 pub to: Option<DateTime<Utc>>,
1761}
1762impl<'a> CommentsIter<'a> {
1763 pub const DEFAULT_PAGE_SIZE: usize = 64;
1765 pub const MAX_PAGE_SIZE: usize = 256;
1767
1768 fn new(
1769 client: &'a Client,
1770 source_name: &'a SourceFullName,
1771 page_size: Option<usize>,
1772 timerange: CommentsIterTimerange,
1773 ) -> Self {
1774 let (from_timestamp, to_timestamp) = (timerange.from, timerange.to);
1775 Self {
1776 client,
1777 source_name,
1778 to_timestamp,
1779 continuation: from_timestamp.map(ContinuationKind::Timestamp),
1780 done: false,
1781 page_size: page_size.unwrap_or(Self::DEFAULT_PAGE_SIZE),
1782 }
1783 }
1784}
1785
1786impl Iterator for CommentsIter<'_> {
1787 type Item = Result<Vec<Comment>>;
1788
1789 fn next(&mut self) -> Option<Self::Item> {
1790 if self.done {
1791 return None;
1792 }
1793 let response = self.client.get_comments_iter_page(
1794 self.source_name,
1795 self.continuation.as_ref(),
1796 self.to_timestamp,
1797 self.page_size,
1798 );
1799 Some(response.map(|page| {
1800 self.continuation = page.continuation.map(ContinuationKind::Continuation);
1801 self.done = self.continuation.is_none();
1802 page.comments
1803 }))
1804 }
1805}
1806
1807pub struct LabellingsIter<'a> {
1808 client: &'a Client,
1809 dataset_name: &'a DatasetFullName,
1810 source_id: &'a SourceId,
1811 return_predictions: bool,
1812 after: Option<GetLabellingsAfter>,
1813 limit: Option<usize>,
1814 done: bool,
1815}
1816
1817impl<'a> LabellingsIter<'a> {
1818 fn new(
1819 client: &'a Client,
1820 dataset_name: &'a DatasetFullName,
1821 source_id: &'a SourceId,
1822 return_predictions: bool,
1823 limit: Option<usize>,
1824 ) -> Self {
1825 Self {
1826 client,
1827 dataset_name,
1828 source_id,
1829 return_predictions,
1830 after: None,
1831 limit,
1832 done: false,
1833 }
1834 }
1835}
1836
1837impl Iterator for LabellingsIter<'_> {
1838 type Item = Result<Vec<AnnotatedComment>>;
1839
1840 fn next(&mut self) -> Option<Self::Item> {
1841 if self.done {
1842 return None;
1843 }
1844 let response = self.client.get_labellings_in_bulk(
1845 self.dataset_name,
1846 GetLabellingsInBulk {
1847 source_id: self.source_id,
1848 return_predictions: &self.return_predictions,
1849 after: &self.after,
1850 limit: &self.limit,
1851 },
1852 );
1853 Some(response.map(|page| {
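            // Guard against looping forever if the API returns the same
            // continuation token alongside a non-empty page.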
1854 if self.after == page.after && !page.results.is_empty() {
1855 panic!("Labellings API did not increment pagination continuation");
1856 }
1857 self.after = page.after;
1858 if page.results.is_empty() {
1859 self.done = true;
1860 }
1861 page.results
1862 }))
1863 }
1864}
1865
1866#[derive(Debug)]
1867struct Endpoints {
1868 base: Url,
1869 datasets: Url,
1870 sources: Url,
1871 buckets: Url,
1872 users: Url,
1873 current_user: Url,
1874 projects: Url,
1875}
1876
1877#[derive(Debug, Serialize, Clone, Copy)]
1878struct NoChargeQuery {
1879 no_charge: bool,
1880}
1881
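/// Appends the given segments to the base URL's path, e.g. segments
/// `["api", "v1", "sources"]` on `https://example.com` yield
/// `https://example.com/api/v1/sources`. Each segment is pushed as a single
/// path segment (and percent-encoded by the `url` crate where necessary).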
1882fn construct_endpoint(base: &Url, segments: &[&str]) -> Result<Url> {
1883 let mut endpoint = base.clone();
1884
1885 let mut endpoint_segments = endpoint
1886 .path_segments_mut()
1887 .map_err(|_| Error::BadEndpoint {
1888 endpoint: base.clone(),
1889 })?;
1890
1891 for segment in segments {
1892 endpoint_segments.push(segment);
1893 }
1894
1895 drop(endpoint_segments);
1896
1897 Ok(endpoint)
1898}
1899
1900impl Endpoints {
1901 pub fn new(base: Url) -> Result<Self> {
1902 let datasets = construct_endpoint(&base, &["api", "v1", "datasets"])?;
1903 let sources = construct_endpoint(&base, &["api", "v1", "sources"])?;
1904 let buckets = construct_endpoint(&base, &["api", "_private", "buckets"])?;
1905 let users = construct_endpoint(&base, &["api", "_private", "users"])?;
1906 let current_user = construct_endpoint(&base, &["auth", "user"])?;
1907 let projects = construct_endpoint(&base, &["api", "_private", "projects"])?;
1908
1909 Ok(Endpoints {
1910 base,
1911 datasets,
1912 sources,
1913 buckets,
1914 users,
1915 current_user,
1916 projects,
1917 })
1918 }
1919
1920 fn refresh_user_permissions(&self) -> Result<Url> {
1921 construct_endpoint(&self.base, &["auth", "refresh-user-permissions"])
1922 }
1923
1924 fn label_group(
1925 &self,
1926 dataset_name: &DatasetFullName,
1927 label_group: LabelGroupName,
1928 ) -> Result<Url> {
1929 construct_endpoint(
1930 &self.base,
1931 &[
1932 "api",
1933 "_private",
1934 "datasets",
1935 &dataset_name.0,
1936 "labels",
1937 &label_group.0,
1938 ],
1939 )
1940 }
1941
1942 fn ixp_datasets(&self) -> Result<Url> {
1943 construct_endpoint(&self.base, &["api", "_private", "ixp", "datasets"])
1944 }
1945
1946 fn ixp_documents(&self, source_id: &SourceId) -> Result<Url> {
1947 construct_endpoint(
1948 &self.base,
1949 &[
1950 "api",
1951 "_private",
1952 "sources",
                &format!("id:{}", source_id.0),
1954 "documents",
1955 ],
1956 )
1957 }
1958
1959 fn ixp_document(&self, source_id: &SourceId, comment_id: &CommentId) -> Result<Url> {
1960 construct_endpoint(
1961 &self.base,
1962 &[
1963 "api",
1964 "_private",
1965 "sources",
                &format!("id:{}", source_id.0),
1967 "documents",
1968 &comment_id.0,
1969 ],
1970 )
1971 }
1972
1973 fn keyed_sync_states(&self, bucket_id: &BucketId) -> Result<Url> {
1974 construct_endpoint(
1975 &self.base,
1976 &[
1977 "api",
1978 "_private",
1979 "buckets",
1980 &format!("id:{}", bucket_id.0),
1981 "keyed-sync-states/",
1982 ],
1983 )
1984 }
1985
1986 fn keyed_sync_state(&self, bucket_id: &BucketId, id: &KeyedSyncStateId) -> Result<Url> {
1987 construct_endpoint(
1988 &self.base,
1989 &[
1990 "api",
1991 "_private",
1992 "buckets",
1993 &format!("id:{}", bucket_id.0),
1994 "keyed-sync-state",
1995 &id.0,
1996 ],
1997 )
1998 }
1999
2000 fn query_keyed_sync_state_ids(&self, bucket_id: &BucketId) -> Result<Url> {
2001 construct_endpoint(
2002 &self.base,
2003 &[
2004 "api",
2005 "_private",
2006 "buckets",
2007 &format!("id:{}", bucket_id.0),
2008 "keyed-sync-state-ids",
2009 ],
2010 )
2011 }
2012
2013 fn audit_events_query(&self) -> Result<Url> {
2014 construct_endpoint(&self.base, &["api", "v1", "audit_events", "query"])
2015 }
2016
2017 fn integrations(&self) -> Result<Url> {
2018 construct_endpoint(&self.base, &["api", "_private", "integrations"])
2019 }
2020
2021 fn integration(&self, name: &IntegrationFullName) -> Result<Url> {
2022 construct_endpoint(&self.base, &["api", "_private", "integrations", &name.0])
2023 }
2024
2025 fn attachment_reference(&self, reference: &AttachmentReference) -> Result<Url> {
2026 construct_endpoint(&self.base, &["api", "v1", "attachments", &reference.0])
2027 }
2028
2029 fn attachment_upload(
2030 &self,
2031 source_id: &SourceId,
2032 comment_id: &CommentId,
2033 attachment_index: usize,
2034 ) -> Result<Url> {
2035 construct_endpoint(
2036 &self.base,
2037 &[
2038 "api",
2039 "_private",
2040 "sources",
2041 &format!("id:{}", source_id.0),
2042 "comments",
2043 &comment_id.0,
2044 "attachments",
2045 &attachment_index.to_string(),
2046 ],
2047 )
2048 }
2049 fn latest_validation(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2050 construct_endpoint(
2051 &self.base,
2052 &[
2053 "api",
2054 "_private",
2055 "datasets",
2056 &dataset_name.0,
2057 "labellers",
2058 "latest",
2059 "validation",
2060 ],
2061 )
2062 }
2063
2064 fn validation(
2065 &self,
2066 dataset_name: &DatasetFullName,
2067 model_version: &ModelVersion,
2068 ) -> Result<Url> {
2069 construct_endpoint(
2070 &self.base,
2071 &[
2072 "api",
2073 "_private",
2074 "datasets",
2075 &dataset_name.0,
2076 "labellers",
2077 &model_version.0.to_string(),
2078 "validation",
2079 ],
2080 )
2081 }
2082
2083 fn label_validation(
2084 &self,
2085 dataset_name: &DatasetFullName,
2086 model_version: &ModelVersion,
2087 ) -> Result<Url> {
2088 construct_endpoint(
2089 &self.base,
2090 &[
2091 "api",
2092 "_private",
2093 "datasets",
2094 &dataset_name.0,
2095 "labellers",
2096 &model_version.0.to_string(),
2097 "label-validation",
2098 ],
2099 )
2100 }
2101 fn bucket_statistics(&self, bucket_name: &BucketFullName) -> Result<Url> {
2102 construct_endpoint(
2103 &self.base,
2104 &["api", "_private", "buckets", &bucket_name.0, "statistics"],
2105 )
2106 }
2107
2108 fn dataset_summary(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2109 construct_endpoint(
2110 &self.base,
2111 &["api", "_private", "datasets", &dataset_name.0, "summary"],
2112 )
2113 }
2114
2115 fn query_dataset(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2116 construct_endpoint(
2117 &self.base,
2118 &["api", "_private", "datasets", &dataset_name.0, "query"],
2119 )
2120 }
2121
2122 fn streams(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2123 construct_endpoint(
2124 &self.base,
2125 &["api", "v1", "datasets", &dataset_name.0, "streams"],
2126 )
2127 }
2128
2129 fn stream(&self, stream_name: &StreamFullName) -> Result<Url> {
2130 construct_endpoint(
2131 &self.base,
2132 &[
2133 "api",
2134 "v1",
2135 "datasets",
2136 &stream_name.dataset.0,
2137 "streams",
2138 &stream_name.stream.0,
2139 ],
2140 )
2141 }
2142
2143 fn stream_fetch(&self, stream_name: &StreamFullName) -> Result<Url> {
2144 construct_endpoint(
2145 &self.base,
2146 &[
2147 "api",
2148 "v1",
2149 "datasets",
2150 &stream_name.dataset.0,
2151 "streams",
2152 &stream_name.stream.0,
2153 "fetch",
2154 ],
2155 )
2156 }
2157
2158 fn stream_advance(&self, stream_name: &StreamFullName) -> Result<Url> {
2159 construct_endpoint(
2160 &self.base,
2161 &[
2162 "api",
2163 "v1",
2164 "datasets",
2165 &stream_name.dataset.0,
2166 "streams",
2167 &stream_name.stream.0,
2168 "advance",
2169 ],
2170 )
2171 }
2172
    fn stream_reset(&self, stream_name: &StreamFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "datasets",
                &stream_name.dataset.0,
                "streams",
                &stream_name.stream.0,
                "reset",
            ],
        )
    }

    fn stream_exceptions(&self, stream_name: &StreamFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "datasets",
                &stream_name.dataset.0,
                "streams",
                &stream_name.stream.0,
                "exceptions",
            ],
        )
    }

    fn recent_comments(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "recent"],
        )
    }

    fn dataset_statistics(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "statistics"],
        )
    }

    fn source_statistics(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "sources", &source_name.0, "statistics"],
        )
    }

    fn user_by_id(&self, user_id: &UserId) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "users", &user_id.0])
    }

    fn source_by_id(&self, source_id: &SourceId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "sources", &format!("id:{}", source_id.0)],
        )
    }

    fn source_by_name(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "v1", "sources", &source_name.0])
    }

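    /// Endpoint for quotas, optionally scoped to a specific UiPath tenant id.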
    fn quotas(&self, tenant_id: &Option<UiPathTenantId>) -> Result<Url> {
        if let Some(tenant_id) = tenant_id {
            construct_endpoint(&self.base, &["api", "_private", "quotas", &tenant_id.0])
        } else {
            construct_endpoint(&self.base, &["api", "_private", "quotas"])
        }
    }

    fn quota(&self, tenant_id: &TenantId, tenant_quota_kind: TenantQuotaKind) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "quotas",
                &tenant_id.to_string(),
                &tenant_quota_kind.to_string(),
            ],
        )
    }

    fn put_comments(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "sources", &source_name.0, "comments"],
        )
    }

    fn comments(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "sources", &source_name.0, "comments"],
        )
    }

    fn comment_by_id(&self, source_name: &SourceFullName, comment_id: &CommentId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "sources",
                &source_name.0,
                "comments",
                &comment_id.0,
            ],
        )
    }

    fn comments_v1(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "sources", &source_name.0, "comments"],
        )
    }

    fn sync_comments(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "sources", &source_name.0, "sync"],
        )
    }

    fn sync_comments_raw_emails(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "sources", &source_name.0, "sync-raw-emails"],
        )
    }

    fn comment_audio(&self, source_id: &SourceId, comment_id: &CommentId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "sources",
                &format!("id:{}", source_id.0),
                "comments",
                &comment_id.0,
                "audio",
            ],
        )
    }

    fn get_emails(&self, bucket_name: &BucketFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "buckets", &bucket_name.0, "emails"],
        )
    }

    fn put_emails(&self, bucket_name: &BucketFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "buckets", &bucket_name.0, "emails"],
        )
    }

    fn post_user(&self, user_id: &UserId) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "users", &user_id.0])
    }

    fn dataset_by_id(&self, dataset_id: &DatasetId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "datasets", &format!("id:{}", dataset_id.0)],
        )
    }

    fn dataset_by_name(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "v1", "datasets", &dataset_name.0])
    }

    fn get_labellings(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "labellings"],
        )
    }

    fn labellers(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "labellers"],
        )
    }

    fn get_comment_predictions(
        &self,
        dataset_name: &DatasetFullName,
        model_version: &ModelVersion,
    ) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "datasets",
                &dataset_name.0,
                "labellers",
                &model_version.0.to_string(),
                "predict-comments",
            ],
        )
    }

    fn post_labelling(
        &self,
        dataset_name: &DatasetFullName,
        comment_uid: &CommentUid,
    ) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "datasets",
                &dataset_name.0,
                "labellings",
                &comment_uid.0,
            ],
        )
    }

    fn bucket_by_id(&self, bucket_id: &BucketId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "buckets", &format!("id:{}", bucket_id.0)],
        )
    }

    fn bucket_by_name(&self, bucket_name: &BucketFullName) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "buckets", &bucket_name.0])
    }

    fn project_by_name(&self, project_name: &ProjectName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "projects", &project_name.0],
        )
    }

    fn welcome_email(&self, user_id: &UserId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "users", &user_id.0, "welcome-email"],
        )
    }
}

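/// Default timeout, in seconds, applied to HTTP requests made by the client.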
const DEFAULT_HTTP_TIMEOUT_SECONDS: u64 = 240;

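/// Builds the blocking `reqwest` client: gzip is enabled, invalid TLS
/// certificates are accepted only when the configuration asks for it, the
/// default timeout is applied, and an optional proxy is configured.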
fn build_http_client(config: &Config) -> Result<HttpClient> {
    let mut builder = HttpClient::builder()
        .gzip(true)
        .danger_accept_invalid_certs(config.accept_invalid_certificates)
        .timeout(Some(Duration::from_secs(DEFAULT_HTTP_TIMEOUT_SECONDS)));

    if let Some(proxy) = config.proxy.clone() {
        builder = builder.proxy(Proxy::all(proxy).map_err(Error::BuildHttpClient)?);
    }
    builder.build().map_err(Error::BuildHttpClient)
}

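/// Builds the default header map, containing the `Authorization: Bearer <token>`
/// header. Fails with `Error::BadToken` if the token is not a valid header value.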
fn build_headers(config: &Config) -> Result<HeaderMap> {
    let mut headers = HeaderMap::new();
    headers.insert(
        header::AUTHORIZATION,
        HeaderValue::from_str(&format!("Bearer {}", &config.token.0)).map_err(|_| {
            Error::BadToken {
                token: config.token.0.clone(),
            }
        })?,
    );
    Ok(headers)
}

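/// Expands a list of ids into repeated `id` query parameters, e.g. `["a", "b"]`
/// becomes `[("id", "a"), ("id", "b")]`.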
fn id_list_query<'a>(ids: impl Iterator<Item = &'a String>) -> Vec<(&'static str, &'a str)> {
    ids.map(|id| ("id", id.as_str())).collect()
}

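/// The default API endpoint (`https://reinfer.dev`).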
pub static DEFAULT_ENDPOINT: Lazy<Url> =
    Lazy::new(|| Url::parse("https://reinfer.dev").expect("Default URL is well-formed"));

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_construct_endpoint() {
        let url = construct_endpoint(
            &Url::parse("https://cloud.uipath.com/org/tenant/reinfer_").unwrap(),
            &["api", "v1", "sources", "project", "source", "sync"],
        )
        .unwrap();

        assert_eq!(
            url.to_string(),
            "https://cloud.uipath.com/org/tenant/reinfer_/api/v1/sources/project/source/sync"
        )
    }

    #[test]
    fn test_id_list_query() {
        assert_eq!(id_list_query(Vec::new().iter()), Vec::new());
        assert_eq!(
            id_list_query(["foo".to_owned()].iter()),
            vec![("id", "foo")]
        );
        assert_eq!(
            id_list_query(
                [
                    "Stream".to_owned(),
                    "River".to_owned(),
                    "Waterfall".to_owned()
                ]
                .iter()
            ),
            [("id", "Stream"), ("id", "River"), ("id", "Waterfall"),]
        );
    }
}