1#![deny(clippy::all)]
2mod error;
3pub mod resources;
4pub mod retry;
5
6use chrono::{DateTime, Utc};
7use http::{header::ACCEPT, Method};
8use log::debug;
9use once_cell::sync::Lazy;
10use reqwest::{
11 blocking::{
12 multipart::{Form, Part},
13 Client as HttpClient, Response as HttpResponse,
14 },
15 header::{self, HeaderMap, HeaderValue},
16 IntoUrl, Proxy, Result as ReqwestResult,
17};
18use resources::{
19 attachments::UploadAttachmentResponse,
20 auth::{RefreshUserPermissionsRequest, RefreshUserPermissionsResponse},
21 bucket::{
22 GetKeyedSyncStateIdsRequest, GetKeyedSyncStateIdsResponse, GetKeyedSyncStatesResponse,
23 KeyedSyncState, KeyedSyncStateId,
24 },
25 bucket_statistics::GetBucketStatisticsResponse,
26 comment::{AttachmentReference, CommentTimestampFilter},
27 dataset::{
28 CreateIxpDatasetRequest, CreateIxpDatasetResponse, GetAllModelsInDatasetRequest,
29 GetAllModelsInDatasetRespone, IxpDatasetNew, QueryRequestParams, QueryResponse,
30 StatisticsRequestParams as DatasetStatisticsRequestParams, SummaryRequestParams,
31 SummaryResponse, UploadIxpDocumentResponse, UserModelMetadata,
32 },
33 documents::{Document, SyncRawEmailsRequest, SyncRawEmailsResponse},
34 email::{Email, GetEmailResponse},
35 integration::{
36 GetIntegrationResponse, GetIntegrationsResponse, Integration, NewIntegration,
37 PostIntegrationRequest, PostIntegrationResponse, PutIntegrationRequest,
38 PutIntegrationResponse,
39 },
40 label_def::{CreateOrUpdateLabelDefsBulkRequest, CreateOrUpdateLabelDefsBulkResponse},
41 project::ForceDeleteProject,
42 quota::{GetQuotasResponse, Quota},
43 source::StatisticsRequestParams as SourceStatisticsRequestParams,
44 stream::{GetStreamResponse, NewStream, PutStreamRequest, PutStreamResponse},
45 tenant_id::UiPathTenantId,
46 validation::{
47 LabelValidation, LabelValidationRequest, LabelValidationResponse, ValidationResponse,
48 },
49};
50use serde::{Deserialize, Serialize};
51use serde_json::json;
52use std::{
53 cell::Cell,
54 fmt::{Debug, Display},
55 io::Read,
56 path::{Path, PathBuf},
57 time::Duration,
58};
59use url::Url;
60
61use crate::resources::{
62 audit::{AuditQueryFilter, AuditQueryRequest, AuditQueryResponse},
63 bucket::{
64 CreateRequest as CreateBucketRequest, CreateResponse as CreateBucketResponse,
65 GetAvailableResponse as GetAvailableBucketsResponse, GetResponse as GetBucketResponse,
66 },
67 bucket_statistics::Statistics as BucketStatistics,
68 comment::{
69 GetAnnotationsResponse, GetCommentResponse, GetLabellingsAfter, GetPredictionsResponse,
70 GetRecentRequest, PutCommentsRequest, PutCommentsResponse, RecentCommentsPage,
71 SyncCommentsRequest, UpdateAnnotationsRequest,
72 },
73 dataset::{
74 CreateRequest as CreateDatasetRequest, CreateResponse as CreateDatasetResponse,
75 GetAvailableResponse as GetAvailableDatasetsResponse, GetResponse as GetDatasetResponse,
76 UpdateRequest as UpdateDatasetRequest, UpdateResponse as UpdateDatasetResponse,
77 },
78 email::{PutEmailsRequest, PutEmailsResponse},
79 project::{
80 CreateProjectRequest, CreateProjectResponse, GetProjectResponse, GetProjectsResponse,
81 UpdateProjectRequest, UpdateProjectResponse,
82 },
83 quota::{CreateQuota, TenantQuotaKind},
84 source::{
85 CreateRequest as CreateSourceRequest, CreateResponse as CreateSourceResponse,
86 GetAvailableResponse as GetAvailableSourcesResponse, GetResponse as GetSourceResponse,
87 UpdateRequest as UpdateSourceRequest, UpdateResponse as UpdateSourceResponse,
88 },
89 statistics::GetResponse as GetStatisticsResponse,
90 stream::{
91 AdvanceRequest as StreamAdvanceRequest, FetchRequest as StreamFetchRequest,
92 GetStreamsResponse, ResetRequest as StreamResetRequest,
93 TagExceptionsRequest as TagStreamExceptionsRequest,
94 },
95 tenant_id::TenantId,
96 user::{
97 CreateRequest as CreateUserRequest, CreateResponse as CreateUserResponse,
98 GetAvailableResponse as GetAvailableUsersResponse,
99 GetCurrentResponse as GetCurrentUserResponse, GetResponse as GetUserResponse,
100 PostUserRequest, PostUserResponse, WelcomeEmailResponse,
101 },
102 EmptySuccess, Response,
103};
104
105use crate::retry::{Retrier, RetryConfig};
106
107pub use crate::{
108 error::{Error, Result},
109 resources::{
110 bucket::{
111 Bucket, BucketType, FullName as BucketFullName, Id as BucketId,
112 Identifier as BucketIdentifier, Name as BucketName, NewBucket,
113 },
114 comment::{
115 AnnotatedComment, Comment, CommentFilter, CommentPredictionsThreshold,
116 CommentsIterPage, Continuation, EitherLabelling, Entities, Entity,
117 GetCommentPredictionsRequest, HasAnnotations, Id as CommentId, Label, Labelling,
118 Message, MessageBody, MessageSignature, MessageSubject, NewAnnotatedComment,
119 NewComment, NewEntities, NewLabelling, NewMoonForm, PredictedLabel, Prediction,
120 PropertyMap, PropertyValue, Sentiment, SyncCommentsResponse, TriggerLabelThreshold,
121 Uid as CommentUid,
122 },
123 dataset::{
124 Dataset, FullName as DatasetFullName, Id as DatasetId, Identifier as DatasetIdentifier,
125 ModelVersion, Name as DatasetName, NewDataset, UpdateDataset,
126 },
127 email::{
128 Continuation as EmailContinuation, EmailsIterPage, Id as EmailId, Mailbox, MimeContent,
129 NewEmail,
130 },
131 entity_def::{EntityDef, Id as EntityDefId, Name as EntityName, NewEntityDef},
132 integration::FullName as IntegrationFullName,
133 label_def::{
134 LabelDef, LabelDefPretrained, MoonFormFieldDef, Name as LabelName, NewLabelDef,
135 NewLabelDefPretrained, PretrainedId as LabelDefPretrainedId,
136 },
137 label_group::{
138 LabelGroup, Name as LabelGroupName, NewLabelGroup, DEFAULT_LABEL_GROUP_NAME,
139 },
140 project::{NewProject, Project, ProjectName, UpdateProject},
141 source::{
142 FullName as SourceFullName, Id as SourceId, Identifier as SourceIdentifier,
143 Name as SourceName, NewSource, Source, SourceKind, TransformTag, UpdateSource,
144 },
145 statistics::Statistics as CommentStatistics,
146 stream::{
147 Batch as StreamBatch, FullName as StreamFullName, SequenceId as StreamSequenceId,
148 Stream, StreamException, StreamExceptionMetadata,
149 },
150 user::{
151 Email as UserEmail, GlobalPermission, Id as UserId, Identifier as UserIdentifier,
152 ModifiedPermissions, NewUser, ProjectPermission, UpdateUser, User, Username,
153 },
154 },
155};
156
/// An API authentication token.
// NOTE(review): presumably attached to request headers by `build_headers` —
// confirm against that function (not visible in this chunk).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Token(pub String);
159
/// A request that can be split into several smaller requests, so that a
/// failing bulk operation can be retried in smaller chunks.
pub trait SplittableRequest {
    /// Splits this request into smaller requests.
    fn split(self) -> impl Iterator<Item = Self>
    where
        Self: Sized;

    /// The number of items carried by this request.
    fn count(&self) -> usize;
}
167
/// The aggregate outcome of a request that was split into chunks: the merged
/// response plus the number of chunks that failed.
// NOTE(review): "Splitable" (one `t`) is inconsistent with the trait
// `SplittableRequest` (two `t`s); renaming would break the public API, so it
// is only flagged here.
pub struct SplitableRequestResponse<ResponseT>
where
    for<'de> ResponseT: Deserialize<'de> + ReducibleResponse,
{
    /// Merged response from the successful chunks.
    pub response: ResponseT,
    /// Number of chunks that failed.
    pub num_failed: usize,
}
175
/// A response type whose instances can be merged, so responses from a split
/// request can be reduced back into a single value.
pub trait ReducibleResponse {
    // NOTE(review): this default implementation discards BOTH `self` and `_b`
    // and returns `Default::default()` — implementors whose responses carry
    // data must override it or merged results are silently lost. Confirm this
    // is intentional for the types that rely on the default.
    fn merge(self, _b: Self) -> Self
    where
        Self: std::default::Default,
    {
        Default::default()
    }

    /// An empty response, usable as the starting accumulator for a merge.
    fn empty() -> Self
    where
        Self: std::default::Default,
    {
        Default::default()
    }
}
191
/// Configuration for building a [`Client`].
pub struct Config {
    /// Base URL of the API.
    pub endpoint: Url,
    /// Authentication token for the API.
    pub token: Token,
    /// Whether to accept invalid TLS certificates.
    pub accept_invalid_certificates: bool,
    /// Optional proxy URL to route requests through.
    pub proxy: Option<Url>,
    /// Optional retry behaviour; `None` disables retries.
    pub retry_config: Option<RetryConfig>,
}
201
202impl Default for Config {
203 fn default() -> Self {
204 Config {
205 endpoint: DEFAULT_ENDPOINT.clone(),
206 token: Token("".to_owned()),
207 accept_invalid_certificates: false,
208 proxy: None,
209 retry_config: None,
210 }
211 }
212}
213
/// A blocking HTTP client for the API.
#[derive(Debug)]
pub struct Client {
    // Endpoint URLs derived from the configured base URL.
    endpoints: Endpoints,
    // Underlying blocking reqwest client.
    http_client: HttpClient,
    // Headers cloned onto every request.
    headers: HeaderMap,
    // Present only when a `RetryConfig` was supplied in `Config`.
    retrier: Option<Retrier>,
}
221
/// Query parameters for fetching labellings in bulk.
#[derive(Serialize)]
pub struct GetLabellingsInBulk<'a> {
    pub source_id: &'a SourceId,
    pub return_predictions: &'a bool,

    /// Pagination cursor; omitted from the query string when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub after: &'a Option<GetLabellingsAfter>,

    /// Maximum number of results; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub limit: &'a Option<usize>,
}
233
/// Query parameters for one page of comment iteration.
#[derive(Serialize)]
pub struct GetCommentsIterPageQuery<'a> {
    /// Start of the time range; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub from_timestamp: Option<DateTime<Utc>>,
    /// End of the time range; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub to_timestamp: Option<DateTime<Utc>>,
    /// Continuation token from a previous page; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub after: Option<&'a Continuation>,
    pub limit: usize,
    pub include_markup: bool,
}
245
/// Query parameters for one page of email iteration.
#[derive(Serialize)]
pub struct GetEmailsIterPageQuery<'a> {
    /// Continuation token from a previous page; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub continuation: Option<&'a EmailContinuation>,
    pub limit: usize,
}
252
/// Query parameters for fetching a single comment.
#[derive(Serialize)]
pub struct GetCommentQuery {
    pub include_markup: bool,
}
257
/// Query parameters for fetching emails by id.
#[derive(Serialize)]
pub struct GetEmailQuery {
    pub id: String,
}
262
263impl Client {
264 pub fn new(config: Config) -> Result<Client> {
266 let http_client = build_http_client(&config)?;
267 let headers = build_headers(&config)?;
268 let endpoints = Endpoints::new(config.endpoint)?;
269 let retrier = config.retry_config.map(Retrier::new);
270 Ok(Client {
271 endpoints,
272 http_client,
273 headers,
274 retrier,
275 })
276 }
277
    /// The base URL this client sends requests to.
    pub fn base_url(&self) -> &Url {
        &self.endpoints.base
    }
282
283 pub fn get_sources(&self) -> Result<Vec<Source>> {
285 Ok(self
286 .get::<_, GetAvailableSourcesResponse>(self.endpoints.sources.clone())?
287 .sources)
288 }
289
290 pub fn get_user(&self, user: impl Into<UserIdentifier>) -> Result<User> {
292 Ok(match user.into() {
293 UserIdentifier::Id(user_id) => {
294 self.get::<_, GetUserResponse>(self.endpoints.user_by_id(&user_id)?)?
295 .user
296 }
297 })
298 }
299
300 pub fn get_source(&self, source: impl Into<SourceIdentifier>) -> Result<Source> {
302 Ok(match source.into() {
303 SourceIdentifier::Id(source_id) => {
304 self.get::<_, GetSourceResponse>(self.endpoints.source_by_id(&source_id)?)?
305 .source
306 }
307 SourceIdentifier::FullName(source_name) => {
308 self.get::<_, GetSourceResponse>(self.endpoints.source_by_name(&source_name)?)?
309 .source
310 }
311 })
312 }
313
    /// Creates or updates label definitions in bulk within a label group.
    ///
    /// The response body is parsed for errors but otherwise discarded.
    pub fn create_label_defs_bulk(
        &self,
        dataset_name: &DatasetFullName,
        label_group: LabelGroupName,
        label_defs: Vec<NewLabelDef>,
    ) -> Result<()> {
        self.put::<_, _, CreateOrUpdateLabelDefsBulkResponse>(
            self.endpoints.label_group(dataset_name, label_group)?,
            CreateOrUpdateLabelDefsBulkRequest { label_defs },
        )?;
        Ok(())
    }
326
327 pub fn create_source(
329 &self,
330 source_name: &SourceFullName,
331 options: NewSource<'_>,
332 ) -> Result<Source> {
333 Ok(self
334 .put::<_, _, CreateSourceResponse>(
335 self.endpoints.source_by_name(source_name)?,
336 CreateSourceRequest { source: options },
337 )?
338 .source)
339 }
340
341 pub fn update_source(
343 &self,
344 source_name: &SourceFullName,
345 options: UpdateSource<'_>,
346 ) -> Result<Source> {
347 Ok(self
348 .post::<_, _, UpdateSourceResponse>(
349 self.endpoints.source_by_name(source_name)?,
350 UpdateSourceRequest { source: options },
351 Retry::Yes,
352 )?
353 .source)
354 }
355
356 pub fn delete_source(&self, source: impl Into<SourceIdentifier>) -> Result<()> {
358 let source_id = match source.into() {
359 SourceIdentifier::Id(source_id) => source_id,
360 source @ SourceIdentifier::FullName(_) => self.get_source(source)?.id,
361 };
362 self.delete(self.endpoints.source_by_id(&source_id)?)
363 }
364
    /// Sets a quota of the given kind for a tenant, retrying on transient
    /// failures.
    pub fn create_quota(
        &self,
        target_tenant_id: &TenantId,
        tenant_quota_kind: TenantQuotaKind,
        options: CreateQuota,
    ) -> Result<()> {
        self.post(
            self.endpoints.quota(target_tenant_id, tenant_quota_kind)?,
            options,
            Retry::Yes,
        )
    }
378
379 pub fn get_quotas(&self, tenant_id: &Option<UiPathTenantId>) -> Result<Vec<Quota>> {
381 Ok(self
382 .get::<_, GetQuotasResponse>(self.endpoints.quotas(tenant_id)?)?
383 .quotas)
384 }
385
    /// Deletes a user by identifier.
    pub fn delete_user(&self, user: impl Into<UserIdentifier>) -> Result<()> {
        // `UserIdentifier` has a single variant, so the pattern is irrefutable.
        let UserIdentifier::Id(user_id) = user.into();
        self.delete(self.endpoints.user_by_id(&user_id)?)
    }
391
    /// Deletes the given comments from a source.
    ///
    /// An id-only source identifier is first resolved to a full name, since
    /// the v1 comments endpoint is addressed by name.
    pub fn delete_comments(
        &self,
        source: impl Into<SourceIdentifier>,
        comments: &[CommentId],
    ) -> Result<()> {
        let source_full_name = match source.into() {
            source @ SourceIdentifier::Id(_) => self.get_source(source)?.full_name(),
            SourceIdentifier::FullName(source_full_name) => source_full_name,
        };
        self.delete_query(
            self.endpoints.comments_v1(&source_full_name)?,
            Some(&id_list_query(comments.iter().map(|uid| &uid.0))),
        )
    }
407
408 pub fn get_comments_iter_page(
410 &self,
411 source_name: &SourceFullName,
412 continuation: Option<&ContinuationKind>,
413 to_timestamp: Option<DateTime<Utc>>,
414 limit: usize,
415 ) -> Result<CommentsIterPage> {
416 let (from_timestamp, after) = match continuation {
419 Some(ContinuationKind::Timestamp(from_timestamp)) => (Some(*from_timestamp), None),
422 Some(ContinuationKind::Continuation(after)) => (None, Some(after)),
425 None => (None, None),
428 };
429 let query_params = GetCommentsIterPageQuery {
430 from_timestamp,
431 to_timestamp,
432 after,
433 limit,
434 include_markup: true,
435 };
436 self.get_query(self.endpoints.comments(source_name)?, Some(&query_params))
437 }
438
    /// Returns an iterator over the results of a dataset query.
    pub fn get_dataset_query_iter<'a>(
        &'a self,
        dataset_name: &'a DatasetFullName,
        params: &'a mut QueryRequestParams,
    ) -> DatasetQueryIter<'a> {
        DatasetQueryIter::new(self, dataset_name, params)
    }
447
    /// Returns an iterator over a source's comments, optionally restricted to
    /// a time range and with a custom page size.
    pub fn get_comments_iter<'a>(
        &'a self,
        source_name: &'a SourceFullName,
        page_size: Option<usize>,
        timerange: CommentsIterTimerange,
    ) -> CommentsIter<'a> {
        CommentsIter::new(self, source_name, page_size, timerange)
    }
457
458 pub fn get_keyed_sync_state_ids(
459 &self,
460 bucket_id: &BucketId,
461 request: &GetKeyedSyncStateIdsRequest,
462 ) -> Result<Vec<KeyedSyncStateId>> {
463 Ok(self
464 .post::<_, _, GetKeyedSyncStateIdsResponse>(
465 self.endpoints.query_keyed_sync_state_ids(bucket_id)?,
466 Some(&request),
467 Retry::Yes,
468 )?
469 .keyed_sync_state_ids)
470 }
471
    /// Deletes a single keyed sync state from a bucket.
    pub fn delete_keyed_sync_state(
        &self,
        bucket_id: &BucketId,
        id: &KeyedSyncStateId,
    ) -> Result<()> {
        self.delete(self.endpoints.keyed_sync_state(bucket_id, id)?)
    }
479
480 pub fn get_keyed_sync_states(&self, bucket_id: &BucketId) -> Result<Vec<KeyedSyncState>> {
481 Ok(self
482 .get::<_, GetKeyedSyncStatesResponse>(self.endpoints.keyed_sync_states(bucket_id)?)?
483 .keyed_sync_states)
484 }
485
    /// Fetches emails matching the given id from a bucket.
    pub fn get_email(&self, bucket_name: &BucketFullName, id: EmailId) -> Result<Vec<Email>> {
        let query_params = GetEmailQuery { id: id.0 };
        Ok(self
            .get_query::<_, _, GetEmailResponse>(
                self.endpoints.get_emails(bucket_name)?,
                Some(&query_params),
            )?
            .emails)
    }
496
497 pub fn get_emails_iter_page(
499 &self,
500 bucket_name: &BucketFullName,
501 continuation: Option<&EmailContinuation>,
502 limit: usize,
503 ) -> Result<EmailsIterPage> {
504 let query_params = GetEmailsIterPageQuery {
505 continuation,
506 limit,
507 };
508 self.post(
509 self.endpoints.get_emails(bucket_name)?,
510 Some(&query_params),
511 Retry::Yes,
512 )
513 }
514
    /// Returns an iterator over a bucket's emails with an optional page size.
    pub fn get_emails_iter<'a>(
        &'a self,
        bucket_name: &'a BucketFullName,
        page_size: Option<usize>,
    ) -> EmailsIter<'a> {
        EmailsIter::new(self, bucket_name, page_size)
    }
523
524 pub fn get_comment<'a>(
526 &'a self,
527 source_name: &'a SourceFullName,
528 comment_id: &'a CommentId,
529 ) -> Result<Comment> {
530 let query_params = GetCommentQuery {
531 include_markup: true,
532 };
533 Ok(self
534 .get_query::<_, _, GetCommentResponse>(
535 self.endpoints.comment_by_id(source_name, comment_id)?,
536 Some(&query_params),
537 )?
538 .comment)
539 }
    /// Creates an integration via POST. Not retried.
    pub fn post_integration(
        &self,
        name: &IntegrationFullName,
        integration: &NewIntegration,
    ) -> Result<PostIntegrationResponse> {
        self.request(
            &Method::POST,
            &self.endpoints.integration(name)?,
            &Some(PostIntegrationRequest {
                integration: integration.clone(),
            }),
            &None::<()>,
            &Retry::No,
        )
    }
555
    /// Creates or replaces an integration via PUT. Not retried.
    pub fn put_integration(
        &self,
        name: &IntegrationFullName,
        integration: &NewIntegration,
    ) -> Result<PutIntegrationResponse> {
        self.request(
            &Method::PUT,
            &self.endpoints.integration(name)?,
            &Some(PutIntegrationRequest {
                integration: integration.clone(),
            }),
            &None::<()>,
            &Retry::No,
        )
    }
571
    /// Uploads comments via `splitable_request`, which presumably splits the
    /// payload into smaller chunks on failure — confirm against that helper.
    ///
    /// # Panics
    /// Panics if the put_comments endpoint URL cannot be constructed; the
    /// return type has no `Result` to propagate that error through.
    pub fn put_comments_split_on_failure(
        &self,
        source_name: &SourceFullName,
        comments: Vec<NewComment>,
        no_charge: bool,
    ) -> SplitableRequestResponse<PutCommentsResponse> {
        self.splitable_request(
            Method::PUT,
            self.endpoints
                .put_comments(source_name)
                .expect("Could not get put_comments endpoint"),
            PutCommentsRequest { comments },
            Some(NoChargeQuery { no_charge }),
            Retry::Yes,
        )
    }
591
    /// Uploads comments to a source, retrying on transient failures.
    pub fn put_comments(
        &self,
        source_name: &SourceFullName,
        comments: Vec<NewComment>,
        no_charge: bool,
    ) -> Result<PutCommentsResponse> {
        self.request(
            &Method::PUT,
            &self.endpoints.put_comments(source_name)?,
            &Some(PutCommentsRequest { comments }),
            &Some(NoChargeQuery { no_charge }),
            &Retry::Yes,
        )
    }
608
    /// Creates or replaces a stream on a dataset.
    pub fn put_stream(
        &self,
        dataset_name: &DatasetFullName,
        stream: &NewStream,
    ) -> Result<PutStreamResponse> {
        self.put(
            self.endpoints.streams(dataset_name)?,
            Some(PutStreamRequest { stream }),
        )
    }
619
    /// Queries audit events, optionally bounded by a timestamp range and
    /// resumable via a continuation token. Retries on transient failures.
    pub fn get_audit_events(
        &self,
        minimum_timestamp: Option<DateTime<Utc>>,
        maximum_timestamp: Option<DateTime<Utc>>,
        continuation: Option<Continuation>,
    ) -> Result<AuditQueryResponse> {
        self.post::<_, _, AuditQueryResponse>(
            self.endpoints.audit_events_query()?,
            AuditQueryRequest {
                continuation,
                filter: AuditQueryFilter {
                    timestamp: CommentTimestampFilter {
                        minimum: minimum_timestamp,
                        maximum: maximum_timestamp,
                    },
                },
            },
            Retry::Yes,
        )
    }
640 pub fn get_latest_validation(
641 &self,
642 dataset_name: &DatasetFullName,
643 ) -> Result<ValidationResponse> {
644 self.get::<_, ValidationResponse>(self.endpoints.latest_validation(dataset_name)?)
645 }
646
647 pub fn get_validation(
648 &self,
649 dataset_name: &DatasetFullName,
650 model_version: &ModelVersion,
651 ) -> Result<ValidationResponse> {
652 self.get::<_, ValidationResponse>(self.endpoints.validation(dataset_name, model_version)?)
653 }
654
655 pub fn get_labellers(&self, dataset_name: &DatasetFullName) -> Result<Vec<UserModelMetadata>> {
656 Ok(self
657 .post::<_, _, GetAllModelsInDatasetRespone>(
658 self.endpoints.labellers(dataset_name)?,
659 GetAllModelsInDatasetRequest {},
660 Retry::Yes,
661 )?
662 .labellers)
663 }
664
    /// Fetches validation results for a single label under a specific model
    /// version. Retries on transient failures.
    pub fn get_label_validation(
        &self,
        label: &LabelName,
        dataset_name: &DatasetFullName,
        model_version: &ModelVersion,
    ) -> Result<LabelValidation> {
        Ok(self
            .post::<_, _, LabelValidationResponse>(
                self.endpoints
                    .label_validation(dataset_name, model_version)?,
                LabelValidationRequest {
                    label: label.clone(),
                },
                Retry::Yes,
            )?
            .label_validation)
    }
682
    /// Synchronizes comments into a source, retrying on transient failures.
    pub fn sync_comments(
        &self,
        source_name: &SourceFullName,
        comments: Vec<NewComment>,
        no_charge: bool,
    ) -> Result<SyncCommentsResponse> {
        self.request(
            &Method::POST,
            &self.endpoints.sync_comments(source_name)?,
            &Some(SyncCommentsRequest { comments }),
            &Some(NoChargeQuery { no_charge }),
            &Retry::Yes,
        )
    }
697
    /// Synchronizes comments via `splitable_request`, which presumably splits
    /// the payload into smaller chunks on failure — confirm against that
    /// helper.
    ///
    /// # Panics
    /// Panics if the sync_comments endpoint URL cannot be constructed; the
    /// return type has no `Result` to propagate that error through.
    pub fn sync_comments_split_on_failure(
        &self,
        source_name: &SourceFullName,
        comments: Vec<NewComment>,
        no_charge: bool,
    ) -> SplitableRequestResponse<SyncCommentsResponse> {
        self.splitable_request(
            Method::POST,
            self.endpoints
                .sync_comments(source_name)
                .expect("Could not get sync_comments endpoint"),
            SyncCommentsRequest { comments },
            Some(NoChargeQuery { no_charge }),
            Retry::Yes,
        )
    }
714
    /// Synchronizes raw email documents into a source, retrying on transient
    /// failures.
    pub fn sync_raw_emails(
        &self,
        source_name: &SourceFullName,
        documents: &[Document],
        transform_tag: &TransformTag,
        include_comments: bool,
        no_charge: bool,
    ) -> Result<SyncRawEmailsResponse> {
        self.request(
            &Method::POST,
            &self.endpoints.sync_comments_raw_emails(source_name)?,
            &Some(SyncRawEmailsRequest {
                documents,
                transform_tag,
                include_comments,
            }),
            &Some(NoChargeQuery { no_charge }),
            &Retry::Yes,
        )
    }
735
    /// Uploads emails via `splitable_request`, which presumably splits the
    /// payload into smaller chunks on failure — confirm against that helper.
    ///
    /// # Panics
    /// Panics if the put_emails endpoint URL cannot be constructed; the
    /// return type has no `Result` to propagate that error through.
    pub fn put_emails_split_on_failure(
        &self,
        bucket_name: &BucketFullName,
        emails: Vec<NewEmail>,
        no_charge: bool,
    ) -> SplitableRequestResponse<PutEmailsResponse> {
        self.splitable_request(
            Method::PUT,
            self.endpoints
                .put_emails(bucket_name)
                .expect("Could not get put_emails endpoint"),
            PutEmailsRequest { emails },
            Some(NoChargeQuery { no_charge }),
            Retry::Yes,
        )
    }
752
    /// Uploads emails to a bucket, retrying on transient failures.
    pub fn put_emails(
        &self,
        bucket_name: &BucketFullName,
        emails: Vec<NewEmail>,
        no_charge: bool,
    ) -> Result<PutEmailsResponse> {
        self.request(
            &Method::PUT,
            &self.endpoints.put_emails(bucket_name)?,
            &Some(PutEmailsRequest { emails }),
            &Some(NoChargeQuery { no_charge }),
            &Retry::Yes,
        )
    }
767
    /// Updates a user, retrying on transient failures.
    pub fn post_user(&self, user_id: &UserId, user: UpdateUser) -> Result<PostUserResponse> {
        self.post(
            self.endpoints.post_user(user_id)?,
            PostUserRequest { user: &user },
            Retry::Yes,
        )
    }
775
    /// Uploads an audio file for a comment as a multipart form. Not retried.
    pub fn put_comment_audio(
        &self,
        source_id: &SourceId,
        comment_id: &CommentId,
        audio_path: impl AsRef<Path>,
    ) -> Result<()> {
        // `Form::file` opens the file; failure (e.g. missing file) surfaces
        // here rather than at send time.
        let form = Form::new()
            .file("file", audio_path)
            .map_err(|source| Error::Unknown {
                message: "PUT comment audio operation failed".to_owned(),
                source: source.into(),
            })?;
        let http_response = self
            .http_client
            .put(self.endpoints.comment_audio(source_id, comment_id)?)
            .headers(self.headers.clone())
            .multipart(form)
            .send()
            .map_err(|source| Error::ReqwestError {
                message: "PUT comment audio operation failed".to_owned(),
                source,
            })?;
        // Status is captured before `json()` consumes the response.
        let status = http_response.status();
        http_response
            .json::<Response<EmptySuccess>>()
            .map_err(Error::BadJsonResponse)?
            .into_result(status)?;
        Ok(())
    }
805
    /// Uploads a document to an IXP source as a multipart form and returns the
    /// id of the comment created for it. Retries according to the client's
    /// retry configuration.
    pub fn upload_ixp_document(
        &self,
        source_id: &SourceId,
        filename: String,
        bytes: Vec<u8>,
    ) -> Result<CommentId> {
        let endpoint = self.endpoints.ixp_documents(source_id)?;

        let do_request = || {
            // The form is rebuilt (and the bytes cloned) on every attempt
            // because a multipart `Form` cannot be reused across retries.
            let form = Form::new().part(
                "file",
                Part::bytes(bytes.clone()).file_name(filename.clone()),
            );
            let request = self
                .http_client
                .request(Method::PUT, endpoint.clone())
                .multipart(form)
                .headers(self.headers.clone());

            request.send()
        };

        let result = self.with_retries(do_request);

        let http_response = result.map_err(|source| Error::ReqwestError {
            source,
            message: "Operation failed.".to_string(),
        })?;

        // Status is captured before `json()` consumes the response.
        let status = http_response.status();

        Ok(http_response
            .json::<Response<UploadIxpDocumentResponse>>()
            .map_err(Error::BadJsonResponse)?
            .into_result(status)?
            .comment_id)
    }
843
844 pub fn upload_comment_attachment(
845 &self,
846 source_id: &SourceId,
847 comment_id: &CommentId,
848 attachment_index: usize,
849 attachment: &PathBuf,
850 ) -> Result<UploadAttachmentResponse> {
851 let url = self
852 .endpoints
853 .attachment_upload(source_id, comment_id, attachment_index)?;
854
855 if !attachment.is_file() || !attachment.exists() {
856 return Err(Error::FileDoesNotExist {
857 path: attachment.clone(),
858 });
859 }
860
861 let do_request = || {
862 let form = Form::new()
863 .file("file", attachment)
864 .map_err(|source| Error::Unknown {
865 message: "PUT comment attachment operation failed".to_owned(),
866 source: source.into(),
867 })
868 .unwrap();
869 let request = self
870 .http_client
871 .request(Method::PUT, url.clone())
872 .multipart(form)
873 .headers(self.headers.clone());
874
875 request.send()
876 };
877
878 let result = self.with_retries(do_request);
879
880 let http_response = result.map_err(|source| Error::ReqwestError {
881 source,
882 message: "Operation failed.".to_string(),
883 })?;
884
885 let status = http_response.status();
886
887 http_response
888 .json::<Response<UploadAttachmentResponse>>()
889 .map_err(Error::BadJsonResponse)?
890 .into_result(status)
891 }
892
    /// Downloads the raw bytes of an IXP document.
    pub fn get_ixp_document(
        &self,
        source_id: &SourceId,
        comment_id: &CommentId,
    ) -> Result<Vec<u8>> {
        self.get_octet_stream(&self.endpoints.ixp_document(source_id, comment_id)?)
    }
900
    /// GETs `endpoint` and returns the raw response body bytes.
    ///
    /// Unlike the JSON helpers, a non-success status yields `Error::Api`
    /// directly — no attempt is made to parse an error body.
    fn get_octet_stream(&self, endpoint: &Url) -> Result<Vec<u8>> {
        let mut response = self.raw_request(
            &Method::GET,
            endpoint,
            &None::<()>,
            &None::<()>,
            &Retry::Yes,
            None,
        )?;

        let status = response.status();

        if !status.is_success() {
            return Err(Error::Api {
                status_code: status,
                message: "Bad status code when getting octet stream".to_string(),
            });
        }

        let mut buffer = Vec::new();

        // Stream the whole body into memory.
        response
            .read_to_end(&mut buffer)
            .map_err(|source| Error::Unknown {
                message: "Failed to read buffer".to_string(),
                source: Box::new(source),
            })?;
        Ok(buffer)
    }
930
    /// Downloads the raw bytes of a comment attachment.
    pub fn get_attachment(&self, reference: &AttachmentReference) -> Result<Vec<u8>> {
        self.get_octet_stream(&self.endpoints.attachment_reference(reference)?)
    }
934
935 pub fn get_integrations(&self) -> Result<Vec<Integration>> {
936 Ok(self
937 .get::<_, GetIntegrationsResponse>(self.endpoints.integrations()?)?
938 .integrations)
939 }
940
941 pub fn get_integration(&self, name: &IntegrationFullName) -> Result<Integration> {
942 Ok(self
943 .get::<_, GetIntegrationResponse>(self.endpoints.integration(name)?)?
944 .integration)
945 }
946
947 pub fn get_datasets(&self) -> Result<Vec<Dataset>> {
948 Ok(self
949 .get::<_, GetAvailableDatasetsResponse>(self.endpoints.datasets.clone())?
950 .datasets)
951 }
952
    /// Fetches a dataset, addressed either by id or by full name.
    pub fn get_dataset<IdentifierT>(&self, dataset: IdentifierT) -> Result<Dataset>
    where
        IdentifierT: Into<DatasetIdentifier>,
    {
        Ok(match dataset.into() {
            DatasetIdentifier::Id(dataset_id) => {
                self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_id(&dataset_id)?)?
                    .dataset
            }
            DatasetIdentifier::FullName(dataset_name) => {
                self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_name(&dataset_name)?)?
                    .dataset
            }
        })
    }
968
969 pub fn create_ixp_dataset(&self, dataset: IxpDatasetNew) -> Result<Dataset> {
971 Ok(self
972 .put::<_, _, CreateIxpDatasetResponse>(
973 self.endpoints.ixp_datasets()?,
974 CreateIxpDatasetRequest { dataset },
975 )?
976 .dataset)
977 }
978
    /// Creates a new dataset with the given full name and options.
    pub fn create_dataset(
        &self,
        dataset_name: &DatasetFullName,
        options: NewDataset<'_>,
    ) -> Result<Dataset> {
        Ok(self
            .put::<_, _, CreateDatasetResponse>(
                self.endpoints.dataset_by_name(dataset_name)?,
                CreateDatasetRequest { dataset: options },
            )?
            .dataset)
    }
992
    /// Updates an existing dataset, retrying on transient failures.
    pub fn update_dataset(
        &self,
        dataset_name: &DatasetFullName,
        options: UpdateDataset<'_>,
    ) -> Result<Dataset> {
        Ok(self
            .post::<_, _, UpdateDatasetResponse>(
                self.endpoints.dataset_by_name(dataset_name)?,
                UpdateDatasetRequest { dataset: options },
                Retry::Yes,
            )?
            .dataset)
    }
1007
1008 pub fn delete_dataset<IdentifierT>(&self, dataset: IdentifierT) -> Result<()>
1009 where
1010 IdentifierT: Into<DatasetIdentifier>,
1011 {
1012 let dataset_id = match dataset.into() {
1013 DatasetIdentifier::Id(dataset_id) => dataset_id,
1014 dataset @ DatasetIdentifier::FullName(_) => self.get_dataset(dataset)?.id,
1015 };
1016 self.delete(self.endpoints.dataset_by_id(&dataset_id)?)
1017 }
1018
1019 pub fn get_labellings<'a>(
1021 &self,
1022 dataset_name: &DatasetFullName,
1023 comment_uids: impl Iterator<Item = &'a CommentUid>,
1024 ) -> Result<Vec<AnnotatedComment>> {
1025 Ok(self
1026 .get_query::<_, _, GetAnnotationsResponse>(
1027 self.endpoints.get_labellings(dataset_name)?,
1028 Some(&id_list_query(comment_uids.into_iter().map(|id| &id.0))),
1029 )?
1030 .results)
1031 }
1032
    /// Returns an iterator over a source's labellings within a dataset.
    pub fn get_labellings_iter<'a>(
        &'a self,
        dataset_name: &'a DatasetFullName,
        source_id: &'a SourceId,
        return_predictions: bool,
        limit: Option<usize>,
    ) -> LabellingsIter<'a> {
        LabellingsIter::new(self, dataset_name, source_id, return_predictions, limit)
    }
1043
    /// Fetches labellings in bulk using the given query parameters.
    pub fn get_labellings_in_bulk(
        &self,
        dataset_name: &DatasetFullName,
        query_parameters: GetLabellingsInBulk<'_>,
    ) -> Result<GetAnnotationsResponse> {
        self.get_query::<_, _, GetAnnotationsResponse>(
            self.endpoints.get_labellings(dataset_name)?,
            Some(&query_parameters),
        )
    }
1055
    /// Updates the annotations (labellings, entities, moon forms) of a single
    /// comment; `None` fields are left untouched. Retries on transient
    /// failures.
    pub fn update_labelling(
        &self,
        dataset_name: &DatasetFullName,
        comment_uid: &CommentUid,
        labelling: Option<&[NewLabelling]>,
        entities: Option<&NewEntities>,
        moon_forms: Option<&[NewMoonForm]>,
    ) -> Result<AnnotatedComment> {
        self.post::<_, _, AnnotatedComment>(
            self.endpoints.post_labelling(dataset_name, comment_uid)?,
            UpdateAnnotationsRequest {
                labelling,
                entities,
                moon_forms,
            },
            Retry::Yes,
        )
    }
1075
1076 pub fn get_comment_predictions<'a>(
1078 &self,
1079 dataset_name: &DatasetFullName,
1080 model_version: &ModelVersion,
1081 comment_uids: impl Iterator<Item = &'a CommentUid>,
1082 threshold: Option<CommentPredictionsThreshold>,
1083 labels: Option<Vec<TriggerLabelThreshold>>,
1084 ) -> Result<Vec<Prediction>> {
1085 Ok(self
1086 .post::<_, _, GetPredictionsResponse>(
1087 self.endpoints
1088 .get_comment_predictions(dataset_name, model_version)?,
1089 GetCommentPredictionsRequest {
1090 uids: comment_uids
1091 .into_iter()
1092 .map(|id| id.0.clone())
1093 .collect::<Vec<_>>(),
1094
1095 threshold,
1096 labels,
1097 },
1098 Retry::Yes,
1099 )?
1100 .predictions)
1101 }
1102
1103 pub fn get_streams(&self, dataset_name: &DatasetFullName) -> Result<Vec<Stream>> {
1104 Ok(self
1105 .get::<_, GetStreamsResponse>(self.endpoints.streams(dataset_name)?)?
1106 .streams)
1107 }
1108
    /// Fetches a page of recent comments matching `filter`, resumable via a
    /// continuation token. Not retried.
    pub fn get_recent_comments(
        &self,
        dataset_name: &DatasetFullName,
        filter: &CommentFilter,
        limit: usize,
        continuation: Option<&Continuation>,
    ) -> Result<RecentCommentsPage> {
        self.post::<_, _, RecentCommentsPage>(
            self.endpoints.recent_comments(dataset_name)?,
            GetRecentRequest {
                limit,
                filter,
                continuation,
            },
            Retry::No,
        )
    }
1126
1127 pub fn refresh_user_permissions(&self) -> Result<RefreshUserPermissionsResponse> {
1128 self.post::<_, _, RefreshUserPermissionsResponse>(
1129 self.endpoints.refresh_user_permissions()?,
1130 RefreshUserPermissionsRequest {},
1131 Retry::Yes,
1132 )
1133 }
1134
1135 pub fn get_current_user(&self) -> Result<User> {
1136 Ok(self
1137 .get::<_, GetCurrentUserResponse>(self.endpoints.current_user.clone())?
1138 .user)
1139 }
1140
1141 pub fn get_users(&self) -> Result<Vec<User>> {
1142 Ok(self
1143 .get::<_, GetAvailableUsersResponse>(self.endpoints.users.clone())?
1144 .users)
1145 }
1146
1147 pub fn create_user(&self, user: NewUser<'_>) -> Result<User> {
1148 Ok(self
1149 .put::<_, _, CreateUserResponse>(
1150 self.endpoints.users.clone(),
1151 CreateUserRequest { user },
1152 )?
1153 .user)
1154 }
1155
    /// Fetches a summary of a dataset, retrying on transient failures.
    ///
    /// # Panics
    /// Panics if `params` cannot be serialized to JSON.
    pub fn dataset_summary(
        &self,
        dataset_name: &DatasetFullName,
        params: &SummaryRequestParams,
    ) -> Result<SummaryResponse> {
        self.post::<_, _, SummaryResponse>(
            self.endpoints.dataset_summary(dataset_name)?,
            serde_json::to_value(params).expect("summary params serialization error"),
            Retry::Yes,
        )
    }
1167
1168 pub fn query_dataset_csv(
1169 &self,
1170 dataset_name: &DatasetFullName,
1171 params: &QueryRequestParams,
1172 ) -> Result<String> {
1173 let response = self
1174 .raw_request(
1175 &Method::POST,
1176 &self.endpoints.query_dataset(dataset_name)?,
1177 &Some(serde_json::to_value(params).expect("query params serialization error")),
1178 &None::<()>,
1179 &Retry::Yes,
1180 Some(HeaderValue::from_str("text/csv").expect("Could not parse csv header")),
1181 )?
1182 .text()
1183 .expect("Could not get csv text");
1184
1185 Ok(response)
1186 }
1187
1188 pub fn query_dataset(
1189 &self,
1190 dataset_name: &DatasetFullName,
1191 params: &QueryRequestParams,
1192 ) -> Result<QueryResponse> {
1193 self.post::<_, _, QueryResponse>(
1194 self.endpoints.query_dataset(dataset_name)?,
1195 serde_json::to_value(params).expect("query params serialization error"),
1196 Retry::Yes,
1197 )
1198 }
1199
1200 pub fn send_welcome_email(&self, user_id: UserId) -> Result<()> {
1201 self.post::<_, _, WelcomeEmailResponse>(
1202 self.endpoints.welcome_email(&user_id)?,
1203 json!({}),
1204 Retry::No,
1205 )?;
1206 Ok(())
1207 }
1208
1209 pub fn get_bucket_statistics(&self, bucket_name: &BucketFullName) -> Result<BucketStatistics> {
1210 Ok(self
1211 .get::<_, GetBucketStatisticsResponse>(self.endpoints.bucket_statistics(bucket_name)?)?
1212 .statistics)
1213 }
1214
1215 pub fn get_dataset_statistics(
1216 &self,
1217 dataset_name: &DatasetFullName,
1218 params: &DatasetStatisticsRequestParams,
1219 ) -> Result<CommentStatistics> {
1220 Ok(self
1221 .post::<_, _, GetStatisticsResponse>(
1222 self.endpoints.dataset_statistics(dataset_name)?,
1223 serde_json::to_value(params)
1224 .expect("dataset statistics params serialization error"),
1225 Retry::No,
1226 )?
1227 .statistics)
1228 }
1229
1230 pub fn get_source_statistics(
1231 &self,
1232 source_name: &SourceFullName,
1233 params: &SourceStatisticsRequestParams,
1234 ) -> Result<CommentStatistics> {
1235 Ok(self
1236 .post::<_, _, GetStatisticsResponse>(
1237 self.endpoints.source_statistics(source_name)?,
1238 serde_json::to_value(params).expect("source statistics params serialization error"),
1239 Retry::No,
1240 )?
1241 .statistics)
1242 }
1243
1244 pub fn create_bucket(
1246 &self,
1247 bucket_name: &BucketFullName,
1248 options: NewBucket<'_>,
1249 ) -> Result<Bucket> {
1250 Ok(self
1251 .put::<_, _, CreateBucketResponse>(
1252 self.endpoints.bucket_by_name(bucket_name)?,
1253 CreateBucketRequest { bucket: options },
1254 )?
1255 .bucket)
1256 }
1257
1258 pub fn get_buckets(&self) -> Result<Vec<Bucket>> {
1259 Ok(self
1260 .get::<_, GetAvailableBucketsResponse>(self.endpoints.buckets.clone())?
1261 .buckets)
1262 }
1263
1264 pub fn get_bucket<IdentifierT>(&self, bucket: IdentifierT) -> Result<Bucket>
1265 where
1266 IdentifierT: Into<BucketIdentifier>,
1267 {
1268 Ok(match bucket.into() {
1269 BucketIdentifier::Id(bucket_id) => {
1270 self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_id(&bucket_id)?)?
1271 .bucket
1272 }
1273 BucketIdentifier::FullName(bucket_name) => {
1274 self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_name(&bucket_name)?)?
1275 .bucket
1276 }
1277 })
1278 }
1279
1280 pub fn delete_bucket<IdentifierT>(&self, bucket: IdentifierT) -> Result<()>
1281 where
1282 IdentifierT: Into<BucketIdentifier>,
1283 {
1284 let bucket_id = match bucket.into() {
1285 BucketIdentifier::Id(bucket_id) => bucket_id,
1286 bucket @ BucketIdentifier::FullName(_) => self.get_bucket(bucket)?.id,
1287 };
1288 self.delete(self.endpoints.bucket_by_id(&bucket_id)?)
1289 }
1290
1291 pub fn fetch_stream_comments(
1292 &self,
1293 stream_name: &StreamFullName,
1294 size: u32,
1295 ) -> Result<StreamBatch> {
1296 self.post(
1297 self.endpoints.stream_fetch(stream_name)?,
1298 StreamFetchRequest { size },
1299 Retry::No,
1300 )
1301 }
1302
1303 pub fn get_stream(&self, stream_name: &StreamFullName) -> Result<Stream> {
1304 Ok(self
1305 .get::<_, GetStreamResponse>(self.endpoints.stream(stream_name)?)?
1306 .stream)
1307 }
1308
1309 pub fn advance_stream(
1310 &self,
1311 stream_name: &StreamFullName,
1312 sequence_id: StreamSequenceId,
1313 ) -> Result<()> {
1314 self.post::<_, _, serde::de::IgnoredAny>(
1315 self.endpoints.stream_advance(stream_name)?,
1316 StreamAdvanceRequest { sequence_id },
1317 Retry::No,
1318 )?;
1319 Ok(())
1320 }
1321
1322 pub fn reset_stream(
1323 &self,
1324 stream_name: &StreamFullName,
1325 to_comment_created_at: DateTime<Utc>,
1326 ) -> Result<()> {
1327 self.post::<_, _, serde::de::IgnoredAny>(
1328 self.endpoints.stream_reset(stream_name)?,
1329 StreamResetRequest {
1330 to_comment_created_at,
1331 },
1332 Retry::No,
1333 )?;
1334 Ok(())
1335 }
1336
1337 pub fn tag_stream_exceptions(
1338 &self,
1339 stream_name: &StreamFullName,
1340 exceptions: &[StreamException],
1341 ) -> Result<()> {
1342 self.put::<_, _, serde::de::IgnoredAny>(
1343 self.endpoints.stream_exceptions(stream_name)?,
1344 TagStreamExceptionsRequest { exceptions },
1345 )?;
1346 Ok(())
1347 }
1348
1349 pub fn get_project(&self, project_name: &ProjectName) -> Result<Project> {
1351 let response =
1352 self.get::<_, GetProjectResponse>(self.endpoints.project_by_name(project_name)?)?;
1353 Ok(response.project)
1354 }
1355
1356 pub fn get_projects(&self) -> Result<Vec<Project>> {
1358 let response = self.get::<_, GetProjectsResponse>(self.endpoints.projects.clone())?;
1359 Ok(response.projects)
1360 }
1361
1362 pub fn create_project(
1364 &self,
1365 project_name: &ProjectName,
1366 options: NewProject,
1367 user_ids: &[UserId],
1368 ) -> Result<Project> {
1369 Ok(self
1370 .put::<_, _, CreateProjectResponse>(
1371 self.endpoints.project_by_name(project_name)?,
1372 CreateProjectRequest {
1373 project: options,
1374 user_ids,
1375 },
1376 )?
1377 .project)
1378 }
1379
1380 pub fn update_project(
1382 &self,
1383 project_name: &ProjectName,
1384 options: UpdateProject,
1385 ) -> Result<Project> {
1386 Ok(self
1387 .post::<_, _, UpdateProjectResponse>(
1388 self.endpoints.project_by_name(project_name)?,
1389 UpdateProjectRequest { project: options },
1390 Retry::Yes,
1391 )?
1392 .project)
1393 }
1394
1395 pub fn delete_project(
1397 &self,
1398 project_name: &ProjectName,
1399 force_delete: ForceDeleteProject,
1400 ) -> Result<()> {
1401 let endpoint = self.endpoints.project_by_name(project_name)?;
1402 match force_delete {
1403 ForceDeleteProject::No => self.delete(endpoint)?,
1404 ForceDeleteProject::Yes => {
1405 self.delete_query(endpoint, Some(&json!({ "force": true })))?
1406 }
1407 };
1408 Ok(())
1409 }
1410
    /// GET `url` (with retries) and deserialize the enveloped JSON response.
    fn get<LocationT, SuccessT>(&self, url: LocationT) -> Result<SuccessT>
    where
        LocationT: IntoUrl + Display + Clone,
        for<'de> SuccessT: Deserialize<'de>,
    {
        self.request(&Method::GET, &url, &None::<()>, &None::<()>, &Retry::Yes)
    }
1418
    /// GET `url` with optional query parameters (with retries) and
    /// deserialize the enveloped JSON response.
    fn get_query<LocationT, QueryT, SuccessT>(
        &self,
        url: LocationT,
        query: Option<&QueryT>,
    ) -> Result<SuccessT>
    where
        LocationT: IntoUrl + Display + Clone,
        QueryT: Serialize,
        for<'de> SuccessT: Deserialize<'de>,
    {
        self.request(&Method::GET, &url, &None::<()>, &Some(query), &Retry::Yes)
    }
1431
    /// DELETE `url` with no query parameters.
    fn delete<LocationT>(&self, url: LocationT) -> Result<()>
    where
        LocationT: IntoUrl + Display + Clone,
    {
        self.delete_query::<LocationT, ()>(url, None)
    }
1438
    /// DELETE `url`, optionally with query parameters, retrying transient
    /// failures.
    ///
    /// A `404 Not Found` is treated as success when more than one attempt was
    /// made: an earlier attempt may have deleted the resource server-side
    /// before its response was lost, so the retry legitimately finds nothing.
    fn delete_query<LocationT, QueryT>(&self, url: LocationT, query: Option<&QueryT>) -> Result<()>
    where
        LocationT: IntoUrl + Display + Clone,
        QueryT: Serialize,
    {
        debug!("Attempting DELETE `{url}`");

        // Track the attempt count across retry closures (Cell: the closure is Fn).
        let attempts = Cell::new(0);
        let http_response = self
            .with_retries(|| {
                attempts.set(attempts.get() + 1);

                let mut request = self
                    .http_client
                    .delete(url.clone())
                    .headers(self.headers.clone());
                if let Some(query) = query {
                    request = request.query(query);
                }
                request.send()
            })
            .map_err(|source| Error::ReqwestError {
                source,
                message: "DELETE operation failed.".to_owned(),
            })?;
        // Capture the status before consuming the response body.
        let status = http_response.status();
        http_response
            .json::<Response<EmptySuccess>>()
            .map_err(Error::BadJsonResponse)?
            .into_result(status)
            .map_or_else(
                |error| {
                    // Only forgive a 404 on a retry (see doc comment above).
                    if attempts.get() > 1 && status == reqwest::StatusCode::NOT_FOUND {
                        Ok(())
                    } else {
                        Err(error)
                    }
                },
                |_| Ok(()),
            )
    }
1482
    /// POST `request` as JSON to `url` and deserialize the enveloped JSON
    /// response. Retries are applied only when `retry` is `Retry::Yes`.
    fn post<LocationT, RequestT, SuccessT>(
        &self,
        url: LocationT,
        request: RequestT,
        retry: Retry,
    ) -> Result<SuccessT>
    where
        LocationT: IntoUrl + Display + Clone,
        RequestT: Serialize,
        for<'de> SuccessT: Deserialize<'de>,
    {
        self.request(&Method::POST, &url, &Some(request), &None::<()>, &retry)
    }
1496
    /// PUT `request` as JSON to `url` (always with retries) and deserialize
    /// the enveloped JSON response.
    fn put<LocationT, RequestT, SuccessT>(
        &self,
        url: LocationT,
        request: RequestT,
    ) -> Result<SuccessT>
    where
        LocationT: IntoUrl + Display + Clone,
        RequestT: Serialize,
        for<'de> SuccessT: Deserialize<'de>,
    {
        self.request(&Method::PUT, &url, &Some(request), &None::<()>, &Retry::Yes)
    }
1509
    /// Send a request and return the raw HTTP response without decoding the
    /// body.
    ///
    /// `accept_header`, when given, overrides the `Accept` header (used e.g.
    /// for CSV downloads). Retries are applied only when `retry` is
    /// `Retry::Yes`. Errors from sending are wrapped in
    /// `Error::ReqwestError`; HTTP error statuses are NOT turned into errors
    /// here — the caller decides how to interpret the response.
    fn raw_request<LocationT, RequestT, QueryT>(
        &self,
        method: &Method,
        url: &LocationT,
        body: &Option<RequestT>,
        query: &Option<QueryT>,
        retry: &Retry,
        accept_header: Option<HeaderValue>,
    ) -> Result<reqwest::blocking::Response>
    where
        LocationT: IntoUrl + Display + Clone,
        RequestT: Serialize,
        QueryT: Serialize,
    {
        let mut headers = self.headers.clone();

        if let Some(accept_header) = accept_header {
            headers.insert(ACCEPT, accept_header);
        }

        // Building the request is wrapped in a closure so it can be re-run by
        // the retrier.
        let do_request = || {
            let request = self
                .http_client
                .request(method.clone(), url.clone())
                .headers(headers.clone());

            let request = match &query {
                Some(query) => request.query(query),
                None => request,
            };
            let request = match &body {
                Some(body) => request.json(body),
                None => request,
            };
            request.send()
        };

        let result = match retry {
            Retry::Yes => self.with_retries(do_request),
            Retry::No => do_request(),
        };
        let http_response = result.map_err(|source| Error::ReqwestError {
            source,
            message: format!("{method} operation failed."),
        })?;

        Ok(http_response)
    }
1558
    /// Send a request whose body can be split into smaller requests.
    ///
    /// The full body is tried first; if that fails, the body is split via
    /// `SplittableRequest::split` and each piece is sent individually. The
    /// successful pieces' responses are merged with `ReducibleResponse` and
    /// failed pieces are counted in `num_failed` instead of aborting the
    /// whole operation.
    // NOTE(review): "splitable" (sic, vs. the `SplittableRequest` trait) is
    // kept as-is to avoid breaking callers.
    fn splitable_request<LocationT, RequestT, SuccessT, QueryT>(
        &self,
        method: Method,
        url: LocationT,
        body: RequestT,
        query: Option<QueryT>,
        retry: Retry,
    ) -> SplitableRequestResponse<SuccessT>
    where
        LocationT: IntoUrl + Display + Clone,
        RequestT: Serialize + SplittableRequest + Clone,
        QueryT: Serialize + Clone,
        for<'de> SuccessT: Deserialize<'de> + ReducibleResponse + Clone + Default,
    {
        debug!("Attempting {method} `{url}`");
        // First attempt: the whole body in a single request.
        let result: Result<SuccessT> =
            self.request(&method, &url, &Some(body.clone()), &query, &retry);

        match result {
            Ok(response) => SplitableRequestResponse {
                response,
                num_failed: 0,
            },
            Err(_) => {
                // Fallback: send each piece separately, merging what succeeds.
                let mut num_failed = 0;
                let response = body
                    .split()
                    .filter_map(|request| {
                        match self.request(&method, &url, &Some(request), &query, &retry) {
                            Ok(response) => Some(response),
                            Err(err) => {
                                debug!("{err}");
                                num_failed += 1;
                                None
                            }
                        }
                    })
                    .fold(SuccessT::empty(), |merged, next: SuccessT| {
                        merged.merge(next)
                    });

                SplitableRequestResponse {
                    num_failed,
                    response,
                }
            }
        }
    }
1607
    /// Send a request and decode the enveloped JSON response into `SuccessT`.
    ///
    /// The body is read fully as text before decoding, and serde_json's
    /// recursion limit is disabled so deeply nested JSON payloads (which the
    /// default limit would reject) can be parsed. The decoded envelope is
    /// converted to `Ok`/`Err` based on the HTTP status via `into_result`.
    fn request<LocationT, RequestT, SuccessT, QueryT>(
        &self,
        method: &Method,
        url: &LocationT,
        body: &Option<RequestT>,
        query: &Option<QueryT>,
        retry: &Retry,
    ) -> Result<SuccessT>
    where
        LocationT: IntoUrl + Display + Clone,
        RequestT: Serialize,
        QueryT: Serialize + Clone,
        for<'de> SuccessT: Deserialize<'de>,
    {
        debug!("Attempting {method} `{url}`");
        let http_response = self.raw_request(method, url, body, query, retry, None)?;

        // Capture the status before consuming the response body.
        let status = http_response.status();

        let response_text = http_response.text().map_err(|source| Error::ReqwestError {
            message: "Could not get request text".to_string(),
            source,
        })?;

        let mut deserializer = serde_json::Deserializer::from_str(&response_text);
        deserializer.disable_recursion_limit();

        Response::<SuccessT>::deserialize(&mut deserializer)
            .map_err(Error::BadSerdeJsonResponse)?
            .into_result(status)
    }
1639
1640 fn with_retries(
1641 &self,
1642 send_request: impl Fn() -> ReqwestResult<HttpResponse>,
1643 ) -> ReqwestResult<HttpResponse> {
1644 match &self.retrier {
1645 Some(retrier) => retrier.with_retries(send_request),
1646 None => send_request(),
1647 }
1648 }
1649}
1650
/// Whether a request should be retried on transient failure (only effective
/// when the client has a retrier configured; see `Client::with_retries`).
#[derive(Copy, Clone)]
enum Retry {
    Yes,
    No,
}
1656
/// Iterator over pages of dataset query results; each `next()` performs one
/// API call via `Client::query_dataset`.
pub struct DatasetQueryIter<'a> {
    client: &'a Client,
    dataset_name: &'a DatasetFullName,
    // Set once a page comes back without a continuation token.
    done: bool,
    // Mutated in place: the continuation is advanced after every page.
    params: &'a mut QueryRequestParams,
}
1663
1664impl<'a> DatasetQueryIter<'a> {
1665 fn new(
1666 client: &'a Client,
1667 dataset_name: &'a DatasetFullName,
1668 params: &'a mut QueryRequestParams,
1669 ) -> Self {
1670 Self {
1671 client,
1672 dataset_name,
1673 done: false,
1674 params,
1675 }
1676 }
1677}
1678
1679impl Iterator for DatasetQueryIter<'_> {
1680 type Item = Result<Vec<AnnotatedComment>>;
1681
1682 fn next(&mut self) -> Option<Self::Item> {
1683 if self.done {
1684 return None;
1685 }
1686
1687 let response = self.client.query_dataset(self.dataset_name, self.params);
1688 Some(response.map(|page| {
1689 self.params.continuation = page.continuation;
1690 self.done = self.params.continuation.is_none();
1691 page.results
1692 }))
1693 }
1694}
1695
/// Starting point for comment pagination: either a timestamp (for the first
/// page of a time-bounded scan) or an opaque continuation token returned by
/// a previous page.
pub enum ContinuationKind {
    Timestamp(DateTime<Utc>),
    Continuation(Continuation),
}
1700
/// Iterator over pages of emails in a bucket; each `next()` performs one API
/// call.
pub struct EmailsIter<'a> {
    client: &'a Client,
    bucket_name: &'a BucketFullName,
    // `None` until the first page has been fetched.
    continuation: Option<EmailContinuation>,
    // Set once a page comes back without a continuation token.
    done: bool,
    page_size: usize,
}
1708
1709impl<'a> EmailsIter<'a> {
1710 pub const DEFAULT_PAGE_SIZE: usize = 64;
1712 pub const MAX_PAGE_SIZE: usize = 256;
1714
1715 fn new(client: &'a Client, bucket_name: &'a BucketFullName, page_size: Option<usize>) -> Self {
1716 Self {
1717 client,
1718 bucket_name,
1719 continuation: None,
1720 done: false,
1721 page_size: page_size.unwrap_or(Self::DEFAULT_PAGE_SIZE),
1722 }
1723 }
1724}
1725
1726impl Iterator for EmailsIter<'_> {
1727 type Item = Result<Vec<Email>>;
1728
1729 fn next(&mut self) -> Option<Self::Item> {
1730 if self.done {
1731 return None;
1732 }
1733 let response = self.client.get_emails_iter_page(
1734 self.bucket_name,
1735 self.continuation.as_ref(),
1736 self.page_size,
1737 );
1738 Some(response.map(|page| {
1739 self.continuation = page.continuation;
1740 self.done = self.continuation.is_none();
1741 page.emails
1742 }))
1743 }
1744}
1745
/// Iterator over pages of comments in a source; each `next()` performs one
/// API call.
pub struct CommentsIter<'a> {
    client: &'a Client,
    source_name: &'a SourceFullName,
    // Starts as a timestamp (when a `from` bound was given), then becomes the
    // server-issued continuation token after each page.
    continuation: Option<ContinuationKind>,
    // Set once a page comes back without a continuation token.
    done: bool,
    page_size: usize,
    // Upper time bound, forwarded with every page request.
    to_timestamp: Option<DateTime<Utc>>,
}
1754
/// Optional time bounds for [`CommentsIter`]; `Default` leaves both ends
/// unbounded.
#[derive(Debug, Default)]
pub struct CommentsIterTimerange {
    pub from: Option<DateTime<Utc>>,
    pub to: Option<DateTime<Utc>>,
}
1760impl<'a> CommentsIter<'a> {
1761 pub const DEFAULT_PAGE_SIZE: usize = 64;
1763 pub const MAX_PAGE_SIZE: usize = 256;
1765
1766 fn new(
1767 client: &'a Client,
1768 source_name: &'a SourceFullName,
1769 page_size: Option<usize>,
1770 timerange: CommentsIterTimerange,
1771 ) -> Self {
1772 let (from_timestamp, to_timestamp) = (timerange.from, timerange.to);
1773 Self {
1774 client,
1775 source_name,
1776 to_timestamp,
1777 continuation: from_timestamp.map(ContinuationKind::Timestamp),
1778 done: false,
1779 page_size: page_size.unwrap_or(Self::DEFAULT_PAGE_SIZE),
1780 }
1781 }
1782}
1783
1784impl Iterator for CommentsIter<'_> {
1785 type Item = Result<Vec<Comment>>;
1786
1787 fn next(&mut self) -> Option<Self::Item> {
1788 if self.done {
1789 return None;
1790 }
1791 let response = self.client.get_comments_iter_page(
1792 self.source_name,
1793 self.continuation.as_ref(),
1794 self.to_timestamp,
1795 self.page_size,
1796 );
1797 Some(response.map(|page| {
1798 self.continuation = page.continuation.map(ContinuationKind::Continuation);
1799 self.done = self.continuation.is_none();
1800 page.comments
1801 }))
1802 }
1803}
1804
/// Iterator over pages of annotated comments (labellings) for one source of
/// a dataset; each `next()` performs one API call.
pub struct LabellingsIter<'a> {
    client: &'a Client,
    dataset_name: &'a DatasetFullName,
    source_id: &'a SourceId,
    return_predictions: bool,
    // Pagination cursor returned by the previous page; `None` for the first.
    after: Option<GetLabellingsAfter>,
    limit: Option<usize>,
    // Set once an empty page is returned.
    done: bool,
}
1814
1815impl<'a> LabellingsIter<'a> {
1816 fn new(
1817 client: &'a Client,
1818 dataset_name: &'a DatasetFullName,
1819 source_id: &'a SourceId,
1820 return_predictions: bool,
1821 limit: Option<usize>,
1822 ) -> Self {
1823 Self {
1824 client,
1825 dataset_name,
1826 source_id,
1827 return_predictions,
1828 after: None,
1829 limit,
1830 done: false,
1831 }
1832 }
1833}
1834
impl Iterator for LabellingsIter<'_> {
    type Item = Result<Vec<AnnotatedComment>>;

    /// Fetch the next page of labellings.
    ///
    /// Iteration ends after the first empty page. Panics if the API returns
    /// a non-empty page without advancing the `after` cursor, since that
    /// would otherwise loop forever on the same page.
    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        let response = self.client.get_labellings_in_bulk(
            self.dataset_name,
            GetLabellingsInBulk {
                source_id: self.source_id,
                return_predictions: &self.return_predictions,
                after: &self.after,
                limit: &self.limit,
            },
        );
        Some(response.map(|page| {
            // Guard against a stuck cursor (would repeat the same page forever).
            if self.after == page.after && !page.results.is_empty() {
                panic!("Labellings API did not increment pagination continuation");
            }
            self.after = page.after;
            if page.results.is_empty() {
                self.done = true;
            }
            page.results
        }))
    }
}
1863
/// Pre-built URLs for the most frequently used endpoint families; every
/// other endpoint is derived from `base` on demand via `construct_endpoint`.
#[derive(Debug)]
struct Endpoints {
    base: Url,
    datasets: Url,
    sources: Url,
    buckets: Url,
    users: Url,
    current_user: Url,
    projects: Url,
}
1874
/// Serializes to a `no_charge` query parameter.
// NOTE(review): presumably marks a request as not billable — confirm against
// the call sites (none visible in this part of the file).
#[derive(Debug, Serialize, Clone, Copy)]
struct NoChargeQuery {
    no_charge: bool,
}
1879
1880fn construct_endpoint(base: &Url, segments: &[&str]) -> Result<Url> {
1881 let mut endpoint = base.clone();
1882
1883 let mut endpoint_segments = endpoint
1884 .path_segments_mut()
1885 .map_err(|_| Error::BadEndpoint {
1886 endpoint: base.clone(),
1887 })?;
1888
1889 for segment in segments {
1890 endpoint_segments.push(segment);
1891 }
1892
1893 drop(endpoint_segments);
1894
1895 Ok(endpoint)
1896}
1897
impl Endpoints {
    /// Pre-compute the endpoint URLs reused across many calls.
    pub fn new(base: Url) -> Result<Self> {
        let datasets = construct_endpoint(&base, &["api", "v1", "datasets"])?;
        let sources = construct_endpoint(&base, &["api", "v1", "sources"])?;
        let buckets = construct_endpoint(&base, &["api", "_private", "buckets"])?;
        let users = construct_endpoint(&base, &["api", "_private", "users"])?;
        let current_user = construct_endpoint(&base, &["auth", "user"])?;
        let projects = construct_endpoint(&base, &["api", "_private", "projects"])?;

        Ok(Endpoints {
            base,
            datasets,
            sources,
            buckets,
            users,
            current_user,
            projects,
        })
    }

    /// Endpoint that re-evaluates the caller's permissions.
    fn refresh_user_permissions(&self) -> Result<Url> {
        construct_endpoint(&self.base, &["auth", "refresh-user-permissions"])
    }

    /// A label group within a dataset.
    fn label_group(
        &self,
        dataset_name: &DatasetFullName,
        label_group: LabelGroupName,
    ) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "datasets",
                &dataset_name.0,
                "labels",
                &label_group.0,
            ],
        )
    }

    /// Collection of IXP datasets.
    fn ixp_datasets(&self) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "ixp", "datasets"])
    }

    /// Documents collection of an IXP source, addressed by source id.
    fn ixp_documents(&self, source_id: &SourceId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "sources",
                &format!("id:{0}", source_id.0),
                "documents",
            ],
        )
    }

    /// A single IXP document within a source.
    fn ixp_document(&self, source_id: &SourceId, comment_id: &CommentId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "sources",
                &format!("id:{0}", source_id.0),
                "documents",
                &comment_id.0,
            ],
        )
    }

    /// Keyed sync states collection of a bucket.
    // NOTE(review): the trailing '/' inside this segment gets percent-encoded
    // ("keyed-sync-states%2F") by `PathSegmentsMut::push` rather than
    // producing a trailing slash — confirm the server expects this form.
    fn keyed_sync_states(&self, bucket_id: &BucketId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "buckets",
                &format!("id:{}", bucket_id.0),
                "keyed-sync-states/",
            ],
        )
    }

    /// A single keyed sync state of a bucket.
    // NOTE(review): singular "keyed-sync-state" here vs. plural above —
    // verify both spellings against the server's routes.
    fn keyed_sync_state(&self, bucket_id: &BucketId, id: &KeyedSyncStateId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "buckets",
                &format!("id:{}", bucket_id.0),
                "keyed-sync-state",
                &id.0,
            ],
        )
    }

    /// Query endpoint for a bucket's keyed sync state ids.
    fn query_keyed_sync_state_ids(&self, bucket_id: &BucketId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "buckets",
                &format!("id:{}", bucket_id.0),
                "keyed-sync-state-ids",
            ],
        )
    }

    /// Audit events query endpoint.
    fn audit_events_query(&self) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "v1", "audit_events", "query"])
    }

    /// Integrations collection.
    fn integrations(&self) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "integrations"])
    }

    /// A single integration, addressed by full name.
    fn integration(&self, name: &IntegrationFullName) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "integrations", &name.0])
    }

    /// An attachment, addressed by its reference.
    fn attachment_reference(&self, reference: &AttachmentReference) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "v1", "attachments", &reference.0])
    }

    /// Upload slot for the `attachment_index`-th attachment of a comment.
    fn attachment_upload(
        &self,
        source_id: &SourceId,
        comment_id: &CommentId,
        attachment_index: usize,
    ) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "sources",
                &format!("id:{}", source_id.0),
                "comments",
                &comment_id.0,
                "attachments",
                &attachment_index.to_string(),
            ],
        )
    }
    /// Validation results of the latest model version of a dataset.
    fn latest_validation(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "datasets",
                &dataset_name.0,
                "labellers",
                "latest",
                "validation",
            ],
        )
    }

    /// Validation results of a specific model version of a dataset.
    fn validation(
        &self,
        dataset_name: &DatasetFullName,
        model_version: &ModelVersion,
    ) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "datasets",
                &dataset_name.0,
                "labellers",
                &model_version.0.to_string(),
                "validation",
            ],
        )
    }

    /// Per-label validation results of a specific model version.
    fn label_validation(
        &self,
        dataset_name: &DatasetFullName,
        model_version: &ModelVersion,
    ) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "datasets",
                &dataset_name.0,
                "labellers",
                &model_version.0.to_string(),
                "label-validation",
            ],
        )
    }
    /// Statistics of a bucket.
    fn bucket_statistics(&self, bucket_name: &BucketFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "buckets", &bucket_name.0, "statistics"],
        )
    }

    /// Summary of a dataset.
    fn dataset_summary(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "summary"],
        )
    }

    /// Query endpoint of a dataset.
    fn query_dataset(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "query"],
        )
    }

    /// Streams collection of a dataset.
    fn streams(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "datasets", &dataset_name.0, "streams"],
        )
    }

    /// A single stream of a dataset.
    fn stream(&self, stream_name: &StreamFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "datasets",
                &stream_name.dataset.0,
                "streams",
                &stream_name.stream.0,
            ],
        )
    }

    /// Fetch endpoint of a stream (read without advancing).
    fn stream_fetch(&self, stream_name: &StreamFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "datasets",
                &stream_name.dataset.0,
                "streams",
                &stream_name.stream.0,
                "fetch",
            ],
        )
    }

    /// Advance endpoint of a stream (move the cursor forward).
    fn stream_advance(&self, stream_name: &StreamFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "datasets",
                &stream_name.dataset.0,
                "streams",
                &stream_name.stream.0,
                "advance",
            ],
        )
    }

    /// Reset endpoint of a stream (rewind the cursor).
    fn stream_reset(&self, stream_name: &StreamFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "datasets",
                &stream_name.dataset.0,
                "streams",
                &stream_name.stream.0,
                "reset",
            ],
        )
    }

    /// Exceptions endpoint of a stream.
    fn stream_exceptions(&self, stream_name: &StreamFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "datasets",
                &stream_name.dataset.0,
                "streams",
                &stream_name.stream.0,
                "exceptions",
            ],
        )
    }

    /// Recent comments endpoint of a dataset.
    fn recent_comments(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "recent"],
        )
    }

    /// Statistics of a dataset.
    fn dataset_statistics(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "statistics"],
        )
    }

    /// Statistics of a source.
    fn source_statistics(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "sources", &source_name.0, "statistics"],
        )
    }

    /// A single user, addressed by id.
    // NOTE(review): identical URL to `post_user` below; kept separate,
    // presumably for GET vs POST call sites.
    fn user_by_id(&self, user_id: &UserId) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "users", &user_id.0])
    }

    /// A single source, addressed by id (`id:<id>` path form).
    fn source_by_id(&self, source_id: &SourceId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "sources", &format!("id:{}", source_id.0)],
        )
    }

    /// A single source, addressed by full name.
    fn source_by_name(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "v1", "sources", &source_name.0])
    }

    /// Quotas collection, optionally scoped to a specific tenant.
    fn quotas(&self, tenant_id: &Option<UiPathTenantId>) -> Result<Url> {
        if let Some(tenant_id) = tenant_id {
            construct_endpoint(&self.base, &["api", "_private", "quotas", &tenant_id.0])
        } else {
            construct_endpoint(&self.base, &["api", "_private", "quotas"])
        }
    }

    /// A single quota kind of a tenant.
    fn quota(&self, tenant_id: &TenantId, tenant_quota_kind: TenantQuotaKind) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "quotas",
                &tenant_id.to_string(),
                &tenant_quota_kind.to_string(),
            ],
        )
    }

    /// Comments collection of a source (private API).
    // NOTE(review): identical URL to `comments` below; kept separate,
    // presumably for PUT vs GET call sites.
    fn put_comments(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "sources", &source_name.0, "comments"],
        )
    }

    /// Comments collection of a source (private API).
    fn comments(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "sources", &source_name.0, "comments"],
        )
    }

    /// A single comment of a source (v1 API).
    fn comment_by_id(&self, source_name: &SourceFullName, comment_id: &CommentId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "sources",
                &source_name.0,
                "comments",
                &comment_id.0,
            ],
        )
    }

    /// Comments collection of a source (v1 API).
    fn comments_v1(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "sources", &source_name.0, "comments"],
        )
    }

    /// Sync endpoint of a source.
    fn sync_comments(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "sources", &source_name.0, "sync"],
        )
    }

    /// Raw-email sync endpoint of a source.
    fn sync_comments_raw_emails(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "sources", &source_name.0, "sync-raw-emails"],
        )
    }

    /// Audio attachment of a comment.
    fn comment_audio(&self, source_id: &SourceId, comment_id: &CommentId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "sources",
                &format!("id:{}", source_id.0),
                "comments",
                &comment_id.0,
                "audio",
            ],
        )
    }

    /// Emails collection of a bucket.
    // NOTE(review): identical URL to `put_emails` below; kept separate,
    // presumably for GET vs PUT call sites.
    fn get_emails(&self, bucket_name: &BucketFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "buckets", &bucket_name.0, "emails"],
        )
    }

    /// Emails collection of a bucket.
    fn put_emails(&self, bucket_name: &BucketFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "buckets", &bucket_name.0, "emails"],
        )
    }

    /// A single user, addressed by id (POST target).
    fn post_user(&self, user_id: &UserId) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "users", &user_id.0])
    }

    /// A single dataset, addressed by id (`id:<id>` path form).
    fn dataset_by_id(&self, dataset_id: &DatasetId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "datasets", &format!("id:{}", dataset_id.0)],
        )
    }

    /// A single dataset, addressed by full name.
    fn dataset_by_name(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "v1", "datasets", &dataset_name.0])
    }

    /// Labellings collection of a dataset.
    fn get_labellings(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "labellings"],
        )
    }

    /// Labellers (model versions) collection of a dataset.
    fn labellers(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "labellers"],
        )
    }

    /// Prediction endpoint of a specific model version.
    fn get_comment_predictions(
        &self,
        dataset_name: &DatasetFullName,
        model_version: &ModelVersion,
    ) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "datasets",
                &dataset_name.0,
                "labellers",
                &model_version.0.to_string(),
                "predict-comments",
            ],
        )
    }

    /// Labelling of a single comment (by comment uid) within a dataset.
    fn post_labelling(
        &self,
        dataset_name: &DatasetFullName,
        comment_uid: &CommentUid,
    ) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "datasets",
                &dataset_name.0,
                "labellings",
                &comment_uid.0,
            ],
        )
    }

    /// A single bucket, addressed by id (`id:<id>` path form).
    fn bucket_by_id(&self, bucket_id: &BucketId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "buckets", &format!("id:{}", bucket_id.0)],
        )
    }

    /// A single bucket, addressed by full name.
    fn bucket_by_name(&self, bucket_name: &BucketFullName) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "buckets", &bucket_name.0])
    }

    /// A single project, addressed by name.
    fn project_by_name(&self, project_name: &ProjectName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "projects", &project_name.0],
        )
    }

    /// Welcome-email trigger for a user.
    fn welcome_email(&self, user_id: &UserId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "users", &user_id.0, "welcome-email"],
        )
    }
}
2427
/// Per-request timeout applied to the blocking HTTP client (see `build_http_client`).
const DEFAULT_HTTP_TIMEOUT_SECONDS: u64 = 240;
2429
2430fn build_http_client(config: &Config) -> Result<HttpClient> {
2431 let mut builder = HttpClient::builder()
2432 .gzip(true)
2433 .danger_accept_invalid_certs(config.accept_invalid_certificates)
2434 .timeout(Some(Duration::from_secs(DEFAULT_HTTP_TIMEOUT_SECONDS)));
2435
2436 if let Some(proxy) = config.proxy.clone() {
2437 builder = builder.proxy(Proxy::all(proxy).map_err(Error::BuildHttpClient)?);
2438 }
2439 builder.build().map_err(Error::BuildHttpClient)
2440}
2441
2442fn build_headers(config: &Config) -> Result<HeaderMap> {
2443 let mut headers = HeaderMap::new();
2444 headers.insert(
2445 header::AUTHORIZATION,
2446 HeaderValue::from_str(&format!("Bearer {}", &config.token.0)).map_err(|_| {
2447 Error::BadToken {
2448 token: config.token.0.clone(),
2449 }
2450 })?,
2451 );
2452 Ok(headers)
2453}
2454
/// Turn a list of ids into repeated `("id", <value>)` query-string pairs.
fn id_list_query<'a>(ids: impl Iterator<Item = &'a String>) -> Vec<(&'static str, &'a str)> {
    let mut pairs = Vec::new();
    for id in ids {
        pairs.push(("id", id.as_str()));
    }
    pairs
}
2461
/// Default API base URL used when no custom endpoint is configured.
pub static DEFAULT_ENDPOINT: Lazy<Url> =
    Lazy::new(|| Url::parse("https://reinfer.dev").expect("Default URL is well-formed"));
2464
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_construct_endpoint() {
        // A base URL with its own path: the segments must be appended, not replace it.
        let base = Url::parse("https://cloud.uipath.com/org/tenant/reinfer_").unwrap();
        let segments = ["api", "v1", "sources", "project", "source", "sync"];
        let url = construct_endpoint(&base, &segments).unwrap();

        assert_eq!(
            url.as_str(),
            "https://cloud.uipath.com/org/tenant/reinfer_/api/v1/sources/project/source/sync"
        )
    }

    #[test]
    fn test_id_list_query() {
        let no_ids: Vec<String> = Vec::new();
        assert_eq!(id_list_query(no_ids.iter()), Vec::new());

        let one_id = ["foo".to_owned()];
        assert_eq!(id_list_query(one_id.iter()), vec![("id", "foo")]);

        let many_ids = [
            "Stream".to_owned(),
            "River".to_owned(),
            "Waterfall".to_owned(),
        ];
        assert_eq!(
            id_list_query(many_ids.iter()),
            [("id", "Stream"), ("id", "River"), ("id", "Waterfall")]
        );
    }
}