1#![deny(clippy::all)]
2mod error;
3pub mod resources;
4pub mod retry;
5
6use chrono::{DateTime, Utc};
7use http::{header::ACCEPT, Method};
8use log::debug;
9use once_cell::sync::Lazy;
10use reqwest::{
11 blocking::{
12 multipart::{Form, Part},
13 Client as HttpClient, Response as HttpResponse,
14 },
15 header::{self, HeaderMap, HeaderValue},
16 IntoUrl, Proxy, Result as ReqwestResult,
17};
18use resources::{
19 attachments::UploadAttachmentResponse,
20 auth::{RefreshUserPermissionsRequest, RefreshUserPermissionsResponse},
21 bucket::{
22 GetKeyedSyncStateIdsRequest, GetKeyedSyncStateIdsResponse, GetKeyedSyncStatesResponse,
23 KeyedSyncState, KeyedSyncStateId,
24 },
25 bucket_statistics::GetBucketStatisticsResponse,
26 comment::{AttachmentReference, CommentTimestampFilter},
27 dataset::{
28 CreateIxpDatasetRequest, CreateIxpDatasetResponse, GetAllModelsInDatasetRequest,
29 GetAllModelsInDatasetRespone, IxpDatasetNew, QueryRequestParams, QueryResponse,
30 StatisticsRequestParams as DatasetStatisticsRequestParams, SummaryRequestParams,
31 SummaryResponse, UploadIxpDocumentResponse, UserModelMetadata,
32 },
33 documents::{Document, SyncRawEmailsRequest, SyncRawEmailsResponse},
34 email::{Email, GetEmailResponse},
35 integration::{
36 GetIntegrationResponse, GetIntegrationsResponse, Integration, NewIntegration,
37 PostIntegrationRequest, PostIntegrationResponse, PutIntegrationRequest,
38 PutIntegrationResponse,
39 },
40 label_def::{CreateOrUpdateLabelDefsBulkRequest, CreateOrUpdateLabelDefsBulkResponse},
41 project::ForceDeleteProject,
42 quota::{GetQuotasResponse, Quota},
43 source::StatisticsRequestParams as SourceStatisticsRequestParams,
44 stream::{GetStreamResponse, NewStream, PutStreamRequest, PutStreamResponse},
45 validation::{
46 LabelValidation, LabelValidationRequest, LabelValidationResponse, ValidationResponse,
47 },
48};
49use serde::{Deserialize, Serialize};
50use serde_json::json;
51use std::{
52 cell::Cell,
53 fmt::{Debug, Display},
54 io::Read,
55 path::{Path, PathBuf},
56 time::Duration,
57};
58use url::Url;
59
60use crate::resources::{
61 audit::{AuditQueryFilter, AuditQueryRequest, AuditQueryResponse},
62 bucket::{
63 CreateRequest as CreateBucketRequest, CreateResponse as CreateBucketResponse,
64 GetAvailableResponse as GetAvailableBucketsResponse, GetResponse as GetBucketResponse,
65 },
66 bucket_statistics::Statistics as BucketStatistics,
67 comment::{
68 GetAnnotationsResponse, GetCommentResponse, GetLabellingsAfter, GetPredictionsResponse,
69 GetRecentRequest, PutCommentsRequest, PutCommentsResponse, RecentCommentsPage,
70 SyncCommentsRequest, UpdateAnnotationsRequest,
71 },
72 dataset::{
73 CreateRequest as CreateDatasetRequest, CreateResponse as CreateDatasetResponse,
74 GetAvailableResponse as GetAvailableDatasetsResponse, GetResponse as GetDatasetResponse,
75 UpdateRequest as UpdateDatasetRequest, UpdateResponse as UpdateDatasetResponse,
76 },
77 email::{PutEmailsRequest, PutEmailsResponse},
78 project::{
79 CreateProjectRequest, CreateProjectResponse, GetProjectResponse, GetProjectsResponse,
80 UpdateProjectRequest, UpdateProjectResponse,
81 },
82 quota::{CreateQuota, TenantQuotaKind},
83 source::{
84 CreateRequest as CreateSourceRequest, CreateResponse as CreateSourceResponse,
85 GetAvailableResponse as GetAvailableSourcesResponse, GetResponse as GetSourceResponse,
86 UpdateRequest as UpdateSourceRequest, UpdateResponse as UpdateSourceResponse,
87 },
88 statistics::GetResponse as GetStatisticsResponse,
89 stream::{
90 AdvanceRequest as StreamAdvanceRequest, FetchRequest as StreamFetchRequest,
91 GetStreamsResponse, ResetRequest as StreamResetRequest,
92 TagExceptionsRequest as TagStreamExceptionsRequest,
93 },
94 tenant_id::TenantId,
95 user::GetResponse as GetUserResponse,
96 user::{
97 CreateRequest as CreateUserRequest, CreateResponse as CreateUserResponse,
98 GetAvailableResponse as GetAvailableUsersResponse,
99 GetCurrentResponse as GetCurrentUserResponse, PostUserRequest, PostUserResponse,
100 WelcomeEmailResponse,
101 },
102 EmptySuccess, Response,
103};
104
105use crate::retry::{Retrier, RetryConfig};
106
107pub use crate::{
108 error::{Error, Result},
109 resources::{
110 bucket::{
111 Bucket, BucketType, FullName as BucketFullName, Id as BucketId,
112 Identifier as BucketIdentifier, Name as BucketName, NewBucket,
113 },
114 comment::{
115 AnnotatedComment, Comment, CommentFilter, CommentPredictionsThreshold,
116 CommentsIterPage, Continuation, EitherLabelling, Entities, Entity,
117 GetCommentPredictionsRequest, HasAnnotations, Id as CommentId, Label, Labelling,
118 Message, MessageBody, MessageSignature, MessageSubject, NewAnnotatedComment,
119 NewComment, NewEntities, NewLabelling, NewMoonForm, PredictedLabel, Prediction,
120 PropertyMap, PropertyValue, Sentiment, SyncCommentsResponse, TriggerLabelThreshold,
121 Uid as CommentUid,
122 },
123 dataset::{
124 Dataset, FullName as DatasetFullName, Id as DatasetId, Identifier as DatasetIdentifier,
125 ModelVersion, Name as DatasetName, NewDataset, UpdateDataset,
126 },
127 email::{
128 Continuation as EmailContinuation, EmailsIterPage, Id as EmailId, Mailbox, MimeContent,
129 NewEmail,
130 },
131 entity_def::{EntityDef, Id as EntityDefId, Name as EntityName, NewEntityDef},
132 integration::FullName as IntegrationFullName,
133 label_def::{
134 LabelDef, LabelDefPretrained, MoonFormFieldDef, Name as LabelName, NewLabelDef,
135 NewLabelDefPretrained, PretrainedId as LabelDefPretrainedId,
136 },
137 label_group::{
138 LabelGroup, Name as LabelGroupName, NewLabelGroup, DEFAULT_LABEL_GROUP_NAME,
139 },
140 project::{NewProject, Project, ProjectName, UpdateProject},
141 source::{
142 FullName as SourceFullName, Id as SourceId, Identifier as SourceIdentifier,
143 Name as SourceName, NewSource, Source, SourceKind, TransformTag, UpdateSource,
144 },
145 statistics::Statistics as CommentStatistics,
146 stream::{
147 Batch as StreamBatch, FullName as StreamFullName, SequenceId as StreamSequenceId,
148 Stream, StreamException, StreamExceptionMetadata,
149 },
150 user::{
151 Email as UserEmail, GlobalPermission, Id as UserId, Identifier as UserIdentifier,
152 ModifiedPermissions, NewUser, ProjectPermission, UpdateUser, User, Username,
153 },
154 },
155};
156
/// Bearer token used to authenticate every API request.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Token(pub String);
159
/// A bulk request that can be split into smaller sub-requests, so a failing
/// call can be retried piecewise (see the `*_split_on_failure` client methods).
pub trait SplittableRequest {
    /// Splits this request into sub-requests, each carrying part of the payload.
    fn split(self) -> impl Iterator<Item = Self>
    where
        Self: Sized;

    /// Number of items carried by this request.
    fn count(&self) -> usize;
}
167
/// Aggregated outcome of a split request: the merged response plus the number
/// of sub-requests that failed.
// NOTE(review): spelled "Splitable" while the trait is "Splittable"; kept
// as-is since renaming a public type would break callers.
pub struct SplitableRequestResponse<ResponseT>
where
    for<'de> ResponseT: Deserialize<'de> + ReducibleResponse,
{
    pub response: ResponseT,
    pub num_failed: usize,
}
175
/// A response type whose instances can be combined when a request was split
/// into several sub-requests.
pub trait ReducibleResponse {
    /// Merges two responses into one.
    // NOTE(review): the default implementation discards BOTH values and
    // returns `Default::default()` — implementors carrying data must override
    // this. Confirm that the lossy default is intentional.
    fn merge(self, _b: Self) -> Self
    where
        Self: std::default::Default,
    {
        Default::default()
    }

    /// An empty response, used as the fold seed when merging.
    fn empty() -> Self
    where
        Self: std::default::Default,
    {
        Default::default()
    }
}
191
/// Connection settings for building a [`Client`].
pub struct Config {
    /// Base URL of the API.
    pub endpoint: Url,
    /// Authentication token sent with every request.
    pub token: Token,
    /// Disable TLS certificate verification (testing only).
    pub accept_invalid_certificates: bool,
    /// Optional HTTP(S) proxy to route requests through.
    pub proxy: Option<Url>,
    /// Retry behaviour; `None` disables retries.
    pub retry_config: Option<RetryConfig>,
}
201
202impl Default for Config {
203 fn default() -> Self {
204 Config {
205 endpoint: DEFAULT_ENDPOINT.clone(),
206 token: Token("".to_owned()),
207 accept_invalid_certificates: false,
208 proxy: None,
209 retry_config: None,
210 }
211 }
212}
213
/// Blocking HTTP client for the API.
#[derive(Debug)]
pub struct Client {
    // Pre-built URL table derived from the configured base endpoint.
    endpoints: Endpoints,
    http_client: HttpClient,
    // Default headers (auth token etc.) attached to every request.
    headers: HeaderMap,
    // Retry policy; `None` means requests are attempted once.
    retrier: Option<Retrier>,
}
221
/// Query parameters for fetching annotations in bulk.
#[derive(Serialize)]
pub struct GetLabellingsInBulk<'a> {
    pub source_id: &'a SourceId,
    pub return_predictions: &'a bool,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub after: &'a Option<GetLabellingsAfter>,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub limit: &'a Option<usize>,
}
233
/// Query parameters for one page of the comments iterator.
#[derive(Serialize)]
pub struct GetCommentsIterPageQuery<'a> {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub from_timestamp: Option<DateTime<Utc>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub to_timestamp: Option<DateTime<Utc>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub after: Option<&'a Continuation>,
    pub limit: usize,
    pub include_markup: bool,
}
245
/// Request body for one page of the emails iterator.
#[derive(Serialize)]
pub struct GetEmailsIterPageQuery<'a> {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub continuation: Option<&'a EmailContinuation>,
    pub limit: usize,
}
252
/// Query parameters for fetching a single comment.
#[derive(Serialize)]
pub struct GetCommentQuery {
    pub include_markup: bool,
}
257
/// Query parameters for fetching emails by id.
#[derive(Serialize)]
pub struct GetEmailQuery {
    pub id: String,
}
262
263impl Client {
264 pub fn new(config: Config) -> Result<Client> {
266 let http_client = build_http_client(&config)?;
267 let headers = build_headers(&config)?;
268 let endpoints = Endpoints::new(config.endpoint)?;
269 let retrier = config.retry_config.map(Retrier::new);
270 Ok(Client {
271 endpoints,
272 http_client,
273 headers,
274 retrier,
275 })
276 }
277
    /// Returns the base URL this client sends requests to.
    pub fn base_url(&self) -> &Url {
        &self.endpoints.base
    }
282
283 pub fn get_sources(&self) -> Result<Vec<Source>> {
285 Ok(self
286 .get::<_, GetAvailableSourcesResponse>(self.endpoints.sources.clone())?
287 .sources)
288 }
289
290 pub fn get_user(&self, user: impl Into<UserIdentifier>) -> Result<User> {
292 Ok(match user.into() {
293 UserIdentifier::Id(user_id) => {
294 self.get::<_, GetUserResponse>(self.endpoints.user_by_id(&user_id)?)?
295 .user
296 }
297 })
298 }
299
300 pub fn get_source(&self, source: impl Into<SourceIdentifier>) -> Result<Source> {
302 Ok(match source.into() {
303 SourceIdentifier::Id(source_id) => {
304 self.get::<_, GetSourceResponse>(self.endpoints.source_by_id(&source_id)?)?
305 .source
306 }
307 SourceIdentifier::FullName(source_name) => {
308 self.get::<_, GetSourceResponse>(self.endpoints.source_by_name(&source_name)?)?
309 .source
310 }
311 })
312 }
313
314 pub fn create_label_defs_bulk(
315 &self,
316 dataset_name: &DatasetFullName,
317 label_group: LabelGroupName,
318 label_defs: Vec<NewLabelDef>,
319 ) -> Result<()> {
320 self.put::<_, _, CreateOrUpdateLabelDefsBulkResponse>(
321 self.endpoints.label_group(dataset_name, label_group)?,
322 CreateOrUpdateLabelDefsBulkRequest { label_defs },
323 )?;
324 Ok(())
325 }
326
327 pub fn create_source(
329 &self,
330 source_name: &SourceFullName,
331 options: NewSource<'_>,
332 ) -> Result<Source> {
333 Ok(self
334 .put::<_, _, CreateSourceResponse>(
335 self.endpoints.source_by_name(source_name)?,
336 CreateSourceRequest { source: options },
337 )?
338 .source)
339 }
340
341 pub fn update_source(
343 &self,
344 source_name: &SourceFullName,
345 options: UpdateSource<'_>,
346 ) -> Result<Source> {
347 Ok(self
348 .post::<_, _, UpdateSourceResponse>(
349 self.endpoints.source_by_name(source_name)?,
350 UpdateSourceRequest { source: options },
351 Retry::Yes,
352 )?
353 .source)
354 }
355
356 pub fn delete_source(&self, source: impl Into<SourceIdentifier>) -> Result<()> {
358 let source_id = match source.into() {
359 SourceIdentifier::Id(source_id) => source_id,
360 source @ SourceIdentifier::FullName(_) => self.get_source(source)?.id,
361 };
362 self.delete(self.endpoints.source_by_id(&source_id)?)
363 }
364
365 pub fn create_quota(
367 &self,
368 target_tenant_id: &TenantId,
369 tenant_quota_kind: TenantQuotaKind,
370 options: CreateQuota,
371 ) -> Result<()> {
372 self.post(
373 self.endpoints.quota(target_tenant_id, tenant_quota_kind)?,
374 options,
375 Retry::Yes,
376 )
377 }
378
379 pub fn get_quotas(&self) -> Result<Vec<Quota>> {
381 Ok(self
382 .get::<_, GetQuotasResponse>(self.endpoints.quotas()?)?
383 .quotas)
384 }
385
386 pub fn delete_user(&self, user: impl Into<UserIdentifier>) -> Result<()> {
388 let UserIdentifier::Id(user_id) = user.into();
389 self.delete(self.endpoints.user_by_id(&user_id)?)
390 }
391
392 pub fn delete_comments(
394 &self,
395 source: impl Into<SourceIdentifier>,
396 comments: &[CommentId],
397 ) -> Result<()> {
398 let source_full_name = match source.into() {
399 source @ SourceIdentifier::Id(_) => self.get_source(source)?.full_name(),
400 SourceIdentifier::FullName(source_full_name) => source_full_name,
401 };
402 self.delete_query(
403 self.endpoints.comments_v1(&source_full_name)?,
404 Some(&id_list_query(comments.iter().map(|uid| &uid.0))),
405 )
406 }
407
408 pub fn get_comments_iter_page(
410 &self,
411 source_name: &SourceFullName,
412 continuation: Option<&ContinuationKind>,
413 to_timestamp: Option<DateTime<Utc>>,
414 limit: usize,
415 ) -> Result<CommentsIterPage> {
416 let (from_timestamp, after) = match continuation {
419 Some(ContinuationKind::Timestamp(from_timestamp)) => (Some(*from_timestamp), None),
422 Some(ContinuationKind::Continuation(after)) => (None, Some(after)),
425 None => (None, None),
428 };
429 let query_params = GetCommentsIterPageQuery {
430 from_timestamp,
431 to_timestamp,
432 after,
433 limit,
434 include_markup: true,
435 };
436 self.get_query(self.endpoints.comments(source_name)?, Some(&query_params))
437 }
438
    /// Returns an iterator over the results of a dataset query.
    pub fn get_dataset_query_iter<'a>(
        &'a self,
        dataset_name: &'a DatasetFullName,
        params: &'a mut QueryRequestParams,
    ) -> DatasetQueryIter<'a> {
        DatasetQueryIter::new(self, dataset_name, params)
    }
447
    /// Returns an iterator over a source's comments, paged by `page_size`
    /// and bounded by `timerange`.
    pub fn get_comments_iter<'a>(
        &'a self,
        source_name: &'a SourceFullName,
        page_size: Option<usize>,
        timerange: CommentsIterTimerange,
    ) -> CommentsIter<'a> {
        CommentsIter::new(self, source_name, page_size, timerange)
    }
457
458 pub fn get_keyed_sync_state_ids(
459 &self,
460 bucket_id: &BucketId,
461 request: &GetKeyedSyncStateIdsRequest,
462 ) -> Result<Vec<KeyedSyncStateId>> {
463 Ok(self
464 .post::<_, _, GetKeyedSyncStateIdsResponse>(
465 self.endpoints.query_keyed_sync_state_ids(bucket_id)?,
466 Some(&request),
467 Retry::Yes,
468 )?
469 .keyed_sync_state_ids)
470 }
471
    /// Deletes a single keyed sync state from a bucket.
    pub fn delete_keyed_sync_state(
        &self,
        bucket_id: &BucketId,
        id: &KeyedSyncStateId,
    ) -> Result<()> {
        self.delete(self.endpoints.keyed_sync_state(bucket_id, id)?)
    }
479
480 pub fn get_keyed_sync_states(&self, bucket_id: &BucketId) -> Result<Vec<KeyedSyncState>> {
481 Ok(self
482 .get::<_, GetKeyedSyncStatesResponse>(self.endpoints.keyed_sync_states(bucket_id)?)?
483 .keyed_sync_states)
484 }
485
486 pub fn get_email(&self, bucket_name: &BucketFullName, id: EmailId) -> Result<Vec<Email>> {
488 let query_params = GetEmailQuery { id: id.0 };
489 Ok(self
490 .get_query::<_, _, GetEmailResponse>(
491 self.endpoints.get_emails(bucket_name)?,
492 Some(&query_params),
493 )?
494 .emails)
495 }
496
497 pub fn get_emails_iter_page(
499 &self,
500 bucket_name: &BucketFullName,
501 continuation: Option<&EmailContinuation>,
502 limit: usize,
503 ) -> Result<EmailsIterPage> {
504 let query_params = GetEmailsIterPageQuery {
505 continuation,
506 limit,
507 };
508 self.post(
509 self.endpoints.get_emails(bucket_name)?,
510 Some(&query_params),
511 Retry::Yes,
512 )
513 }
514
    /// Returns an iterator over a bucket's emails, paged by `page_size`.
    pub fn get_emails_iter<'a>(
        &'a self,
        bucket_name: &'a BucketFullName,
        page_size: Option<usize>,
    ) -> EmailsIter<'a> {
        EmailsIter::new(self, bucket_name, page_size)
    }
523
524 pub fn get_comment<'a>(
526 &'a self,
527 source_name: &'a SourceFullName,
528 comment_id: &'a CommentId,
529 ) -> Result<Comment> {
530 let query_params = GetCommentQuery {
531 include_markup: true,
532 };
533 Ok(self
534 .get_query::<_, _, GetCommentResponse>(
535 self.endpoints.comment_by_id(source_name, comment_id)?,
536 Some(&query_params),
537 )?
538 .comment)
539 }
540 pub fn post_integration(
541 &self,
542 name: &IntegrationFullName,
543 integration: &NewIntegration,
544 ) -> Result<PostIntegrationResponse> {
545 self.request(
546 &Method::POST,
547 &self.endpoints.integration(name)?,
548 &Some(PostIntegrationRequest {
549 integration: integration.clone(),
550 }),
551 &None::<()>,
552 &Retry::No,
553 )
554 }
555
556 pub fn put_integration(
557 &self,
558 name: &IntegrationFullName,
559 integration: &NewIntegration,
560 ) -> Result<PutIntegrationResponse> {
561 self.request(
562 &Method::PUT,
563 &self.endpoints.integration(name)?,
564 &Some(PutIntegrationRequest {
565 integration: integration.clone(),
566 }),
567 &None::<()>,
568 &Retry::No,
569 )
570 }
571
    /// Uploads comments like [`Client::put_comments`], but on failure splits
    /// the request into smaller chunks and retries them individually,
    /// reporting how many sub-requests still failed.
    pub fn put_comments_split_on_failure(
        &self,
        source_name: &SourceFullName,
        comments: Vec<NewComment>,
        no_charge: bool,
    ) -> Result<SplitableRequestResponse<PutCommentsResponse>> {
        self.splitable_request(
            Method::PUT,
            self.endpoints.put_comments(source_name)?,
            PutCommentsRequest { comments },
            Some(NoChargeQuery { no_charge }),
            Retry::Yes,
        )
    }
589
590 pub fn put_comments(
591 &self,
592 source_name: &SourceFullName,
593 comments: Vec<NewComment>,
594 no_charge: bool,
595 ) -> Result<PutCommentsResponse> {
596 self.request(
599 &Method::PUT,
600 &self.endpoints.put_comments(source_name)?,
601 &Some(PutCommentsRequest { comments }),
602 &Some(NoChargeQuery { no_charge }),
603 &Retry::Yes,
604 )
605 }
606
607 pub fn put_stream(
608 &self,
609 dataset_name: &DatasetFullName,
610 stream: &NewStream,
611 ) -> Result<PutStreamResponse> {
612 self.put(
613 self.endpoints.streams(dataset_name)?,
614 Some(PutStreamRequest { stream }),
615 )
616 }
617
618 pub fn get_audit_events(
619 &self,
620 minimum_timestamp: Option<DateTime<Utc>>,
621 maximum_timestamp: Option<DateTime<Utc>>,
622 continuation: Option<Continuation>,
623 ) -> Result<AuditQueryResponse> {
624 self.post::<_, _, AuditQueryResponse>(
625 self.endpoints.audit_events_query()?,
626 AuditQueryRequest {
627 continuation,
628 filter: AuditQueryFilter {
629 timestamp: CommentTimestampFilter {
630 minimum: minimum_timestamp,
631 maximum: maximum_timestamp,
632 },
633 },
634 },
635 Retry::Yes,
636 )
637 }
    /// Fetches validation results for the latest model version of a dataset.
    pub fn get_latest_validation(
        &self,
        dataset_name: &DatasetFullName,
    ) -> Result<ValidationResponse> {
        self.get::<_, ValidationResponse>(self.endpoints.latest_validation(dataset_name)?)
    }
644
    /// Fetches validation results for a specific model version of a dataset.
    pub fn get_validation(
        &self,
        dataset_name: &DatasetFullName,
        model_version: &ModelVersion,
    ) -> Result<ValidationResponse> {
        self.get::<_, ValidationResponse>(self.endpoints.validation(dataset_name, model_version)?)
    }
652
653 pub fn get_labellers(&self, dataset_name: &DatasetFullName) -> Result<Vec<UserModelMetadata>> {
654 Ok(self
655 .post::<_, _, GetAllModelsInDatasetRespone>(
656 self.endpoints.labellers(dataset_name)?,
657 GetAllModelsInDatasetRequest {},
658 Retry::Yes,
659 )?
660 .labellers)
661 }
662
663 pub fn get_label_validation(
664 &self,
665 label: &LabelName,
666 dataset_name: &DatasetFullName,
667 model_version: &ModelVersion,
668 ) -> Result<LabelValidation> {
669 Ok(self
670 .post::<_, _, LabelValidationResponse>(
671 self.endpoints
672 .label_validation(dataset_name, model_version)?,
673 LabelValidationRequest {
674 label: label.clone(),
675 },
676 Retry::Yes,
677 )?
678 .label_validation)
679 }
680
681 pub fn sync_comments(
682 &self,
683 source_name: &SourceFullName,
684 comments: Vec<NewComment>,
685 no_charge: bool,
686 ) -> Result<SyncCommentsResponse> {
687 self.request(
688 &Method::POST,
689 &self.endpoints.sync_comments(source_name)?,
690 &Some(SyncCommentsRequest { comments }),
691 &Some(NoChargeQuery { no_charge }),
692 &Retry::Yes,
693 )
694 }
695
    /// Synchronises comments like [`Client::sync_comments`], but on failure
    /// splits the request into smaller chunks and retries them individually,
    /// reporting how many sub-requests still failed.
    pub fn sync_comments_split_on_failure(
        &self,
        source_name: &SourceFullName,
        comments: Vec<NewComment>,
        no_charge: bool,
    ) -> Result<SplitableRequestResponse<SyncCommentsResponse>> {
        self.splitable_request(
            Method::POST,
            self.endpoints.sync_comments(source_name)?,
            SyncCommentsRequest { comments },
            Some(NoChargeQuery { no_charge }),
            Retry::Yes,
        )
    }
710
711 pub fn sync_raw_emails(
712 &self,
713 source_name: &SourceFullName,
714 documents: &[Document],
715 transform_tag: &TransformTag,
716 include_comments: bool,
717 no_charge: bool,
718 ) -> Result<SyncRawEmailsResponse> {
719 self.request(
720 &Method::POST,
721 &self.endpoints.sync_comments_raw_emails(source_name)?,
722 &Some(SyncRawEmailsRequest {
723 documents,
724 transform_tag,
725 include_comments,
726 }),
727 &Some(NoChargeQuery { no_charge }),
728 &Retry::Yes,
729 )
730 }
731
    /// Uploads emails like [`Client::put_emails`], but on failure splits the
    /// request into smaller chunks and retries them individually, reporting
    /// how many sub-requests still failed.
    pub fn put_emails_split_on_failure(
        &self,
        bucket_name: &BucketFullName,
        emails: Vec<NewEmail>,
        no_charge: bool,
    ) -> Result<SplitableRequestResponse<PutEmailsResponse>> {
        self.splitable_request(
            Method::PUT,
            self.endpoints.put_emails(bucket_name)?,
            PutEmailsRequest { emails },
            Some(NoChargeQuery { no_charge }),
            Retry::Yes,
        )
    }
746
747 pub fn put_emails(
748 &self,
749 bucket_name: &BucketFullName,
750 emails: Vec<NewEmail>,
751 no_charge: bool,
752 ) -> Result<PutEmailsResponse> {
753 self.request(
754 &Method::PUT,
755 &self.endpoints.put_emails(bucket_name)?,
756 &Some(PutEmailsRequest { emails }),
757 &Some(NoChargeQuery { no_charge }),
758 &Retry::Yes,
759 )
760 }
761
762 pub fn post_user(&self, user_id: &UserId, user: UpdateUser) -> Result<PostUserResponse> {
763 self.post(
764 self.endpoints.post_user(user_id)?,
765 PostUserRequest { user: &user },
766 Retry::Yes,
767 )
768 }
769
    /// Uploads an audio file as the audio attachment of a comment.
    ///
    /// Sends a multipart PUT directly through the HTTP client (no retries:
    /// the multipart form cannot be reused across attempts).
    pub fn put_comment_audio(
        &self,
        source_id: &SourceId,
        comment_id: &CommentId,
        audio_path: impl AsRef<Path>,
    ) -> Result<()> {
        // Building the form reads the file; failure here is a local I/O error.
        let form = Form::new()
            .file("file", audio_path)
            .map_err(|source| Error::Unknown {
                message: "PUT comment audio operation failed".to_owned(),
                source: source.into(),
            })?;
        let http_response = self
            .http_client
            .put(self.endpoints.comment_audio(source_id, comment_id)?)
            .headers(self.headers.clone())
            .multipart(form)
            .send()
            .map_err(|source| Error::ReqwestError {
                message: "PUT comment audio operation failed".to_owned(),
                source,
            })?;
        // Capture the status before consuming the response body as JSON.
        let status = http_response.status();
        http_response
            .json::<Response<EmptySuccess>>()
            .map_err(Error::BadJsonResponse)?
            .into_result(status)?;
        Ok(())
    }
799
    /// Uploads a document to an IXP source, returning the id of the comment
    /// created for it.
    ///
    /// Retries according to the client's retry configuration; the multipart
    /// form is rebuilt (and the bytes cloned) for every attempt.
    pub fn upload_ixp_document(
        &self,
        source_id: &SourceId,
        filename: String,
        bytes: Vec<u8>,
    ) -> Result<CommentId> {
        let endpoint = self.endpoints.ixp_documents(source_id)?;

        let do_request = || {
            // A `Form` cannot be reused once sent, so build a fresh one per attempt.
            let form = Form::new().part(
                "file",
                Part::bytes(bytes.clone()).file_name(filename.clone()),
            );
            let request = self
                .http_client
                .request(Method::PUT, endpoint.clone())
                .multipart(form)
                .headers(self.headers.clone());

            request.send()
        };

        let result = self.with_retries(do_request);

        let http_response = result.map_err(|source| Error::ReqwestError {
            source,
            message: "Operation failed.".to_string(),
        })?;

        // Capture the status before consuming the response body as JSON.
        let status = http_response.status();

        Ok(http_response
            .json::<Response<UploadIxpDocumentResponse>>()
            .map_err(Error::BadJsonResponse)?
            .into_result(status)?
            .comment_id)
    }
837
838 pub fn upload_comment_attachment(
839 &self,
840 source_id: &SourceId,
841 comment_id: &CommentId,
842 attachment_index: usize,
843 attachment: &PathBuf,
844 ) -> Result<UploadAttachmentResponse> {
845 let url = self
846 .endpoints
847 .attachment_upload(source_id, comment_id, attachment_index)?;
848
849 if !attachment.is_file() || !attachment.exists() {
850 return Err(Error::FileDoesNotExist {
851 path: attachment.clone(),
852 });
853 }
854
855 let do_request = || {
856 let form = Form::new()
857 .file("file", attachment)
858 .map_err(|source| Error::Unknown {
859 message: "PUT comment attachment operation failed".to_owned(),
860 source: source.into(),
861 })
862 .unwrap();
863 let request = self
864 .http_client
865 .request(Method::PUT, url.clone())
866 .multipart(form)
867 .headers(self.headers.clone());
868
869 request.send()
870 };
871
872 let result = self.with_retries(do_request);
873
874 let http_response = result.map_err(|source| Error::ReqwestError {
875 source,
876 message: "Operation failed.".to_string(),
877 })?;
878
879 let status = http_response.status();
880
881 http_response
882 .json::<Response<UploadAttachmentResponse>>()
883 .map_err(Error::BadJsonResponse)?
884 .into_result(status)
885 }
886
    /// Downloads the raw bytes of an IXP document.
    pub fn get_ixp_document(
        &self,
        source_id: &SourceId,
        comment_id: &CommentId,
    ) -> Result<Vec<u8>> {
        self.get_octet_stream(&self.endpoints.ixp_document(source_id, comment_id)?)
    }
894
    /// GETs `endpoint` and reads the whole response body into a byte buffer.
    fn get_octet_stream(&self, endpoint: &Url) -> Result<Vec<u8>> {
        let mut response = self.raw_request(
            &Method::GET,
            endpoint,
            &None::<()>,
            &None::<()>,
            &Retry::Yes,
            None,
        )?;

        let mut buffer = Vec::new();

        // Stream the body to the end; wrap the I/O error in our error type.
        response
            .read_to_end(&mut buffer)
            .map_err(|source| Error::Unknown {
                message: "Failed to read buffer".to_string(),
                source: Box::new(source),
            })?;
        Ok(buffer)
    }
915
    /// Downloads the raw bytes of a comment attachment.
    pub fn get_attachment(&self, reference: &AttachmentReference) -> Result<Vec<u8>> {
        self.get_octet_stream(&self.endpoints.attachment_reference(reference)?)
    }
919
920 pub fn get_integrations(&self) -> Result<Vec<Integration>> {
921 Ok(self
922 .get::<_, GetIntegrationsResponse>(self.endpoints.integrations()?)?
923 .integrations)
924 }
925
926 pub fn get_integration(&self, name: &IntegrationFullName) -> Result<Integration> {
927 Ok(self
928 .get::<_, GetIntegrationResponse>(self.endpoints.integration(name)?)?
929 .integration)
930 }
931
932 pub fn get_datasets(&self) -> Result<Vec<Dataset>> {
933 Ok(self
934 .get::<_, GetAvailableDatasetsResponse>(self.endpoints.datasets.clone())?
935 .datasets)
936 }
937
938 pub fn get_dataset<IdentifierT>(&self, dataset: IdentifierT) -> Result<Dataset>
939 where
940 IdentifierT: Into<DatasetIdentifier>,
941 {
942 Ok(match dataset.into() {
943 DatasetIdentifier::Id(dataset_id) => {
944 self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_id(&dataset_id)?)?
945 .dataset
946 }
947 DatasetIdentifier::FullName(dataset_name) => {
948 self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_name(&dataset_name)?)?
949 .dataset
950 }
951 })
952 }
953
954 pub fn create_ixp_dataset(&self, dataset: IxpDatasetNew) -> Result<Dataset> {
956 Ok(self
957 .put::<_, _, CreateIxpDatasetResponse>(
958 self.endpoints.ixp_datasets()?,
959 CreateIxpDatasetRequest { dataset },
960 )?
961 .dataset)
962 }
963
964 pub fn create_dataset(
966 &self,
967 dataset_name: &DatasetFullName,
968 options: NewDataset<'_>,
969 ) -> Result<Dataset> {
970 Ok(self
971 .put::<_, _, CreateDatasetResponse>(
972 self.endpoints.dataset_by_name(dataset_name)?,
973 CreateDatasetRequest { dataset: options },
974 )?
975 .dataset)
976 }
977
978 pub fn update_dataset(
980 &self,
981 dataset_name: &DatasetFullName,
982 options: UpdateDataset<'_>,
983 ) -> Result<Dataset> {
984 Ok(self
985 .post::<_, _, UpdateDatasetResponse>(
986 self.endpoints.dataset_by_name(dataset_name)?,
987 UpdateDatasetRequest { dataset: options },
988 Retry::Yes,
989 )?
990 .dataset)
991 }
992
993 pub fn delete_dataset<IdentifierT>(&self, dataset: IdentifierT) -> Result<()>
994 where
995 IdentifierT: Into<DatasetIdentifier>,
996 {
997 let dataset_id = match dataset.into() {
998 DatasetIdentifier::Id(dataset_id) => dataset_id,
999 dataset @ DatasetIdentifier::FullName(_) => self.get_dataset(dataset)?.id,
1000 };
1001 self.delete(self.endpoints.dataset_by_id(&dataset_id)?)
1002 }
1003
1004 pub fn get_labellings<'a>(
1006 &self,
1007 dataset_name: &DatasetFullName,
1008 comment_uids: impl Iterator<Item = &'a CommentUid>,
1009 ) -> Result<Vec<AnnotatedComment>> {
1010 Ok(self
1011 .get_query::<_, _, GetAnnotationsResponse>(
1012 self.endpoints.get_labellings(dataset_name)?,
1013 Some(&id_list_query(comment_uids.into_iter().map(|id| &id.0))),
1014 )?
1015 .results)
1016 }
1017
1018 pub fn get_labellings_iter<'a>(
1020 &'a self,
1021 dataset_name: &'a DatasetFullName,
1022 source_id: &'a SourceId,
1023 return_predictions: bool,
1024 limit: Option<usize>,
1025 ) -> LabellingsIter<'a> {
1026 LabellingsIter::new(self, dataset_name, source_id, return_predictions, limit)
1027 }
1028
1029 pub fn get_labellings_in_bulk(
1031 &self,
1032 dataset_name: &DatasetFullName,
1033 query_parameters: GetLabellingsInBulk<'_>,
1034 ) -> Result<GetAnnotationsResponse> {
1035 self.get_query::<_, _, GetAnnotationsResponse>(
1036 self.endpoints.get_labellings(dataset_name)?,
1037 Some(&query_parameters),
1038 )
1039 }
1040
1041 pub fn update_labelling(
1043 &self,
1044 dataset_name: &DatasetFullName,
1045 comment_uid: &CommentUid,
1046 labelling: Option<&[NewLabelling]>,
1047 entities: Option<&NewEntities>,
1048 moon_forms: Option<&[NewMoonForm]>,
1049 ) -> Result<AnnotatedComment> {
1050 self.post::<_, _, AnnotatedComment>(
1051 self.endpoints.post_labelling(dataset_name, comment_uid)?,
1052 UpdateAnnotationsRequest {
1053 labelling,
1054 entities,
1055 moon_forms,
1056 },
1057 Retry::Yes,
1058 )
1059 }
1060
1061 pub fn get_comment_predictions<'a>(
1063 &self,
1064 dataset_name: &DatasetFullName,
1065 model_version: &ModelVersion,
1066 comment_uids: impl Iterator<Item = &'a CommentUid>,
1067 threshold: Option<CommentPredictionsThreshold>,
1068 labels: Option<Vec<TriggerLabelThreshold>>,
1069 ) -> Result<Vec<Prediction>> {
1070 Ok(self
1071 .post::<_, _, GetPredictionsResponse>(
1072 self.endpoints
1073 .get_comment_predictions(dataset_name, model_version)?,
1074 GetCommentPredictionsRequest {
1075 uids: comment_uids
1076 .into_iter()
1077 .map(|id| id.0.clone())
1078 .collect::<Vec<_>>(),
1079
1080 threshold,
1081 labels,
1082 },
1083 Retry::Yes,
1084 )?
1085 .predictions)
1086 }
1087
1088 pub fn get_streams(&self, dataset_name: &DatasetFullName) -> Result<Vec<Stream>> {
1089 Ok(self
1090 .get::<_, GetStreamsResponse>(self.endpoints.streams(dataset_name)?)?
1091 .streams)
1092 }
1093
1094 pub fn get_recent_comments(
1095 &self,
1096 dataset_name: &DatasetFullName,
1097 filter: &CommentFilter,
1098 limit: usize,
1099 continuation: Option<&Continuation>,
1100 ) -> Result<RecentCommentsPage> {
1101 self.post::<_, _, RecentCommentsPage>(
1102 self.endpoints.recent_comments(dataset_name)?,
1103 GetRecentRequest {
1104 limit,
1105 filter,
1106 continuation,
1107 },
1108 Retry::No,
1109 )
1110 }
1111
1112 pub fn refresh_user_permissions(&self) -> Result<RefreshUserPermissionsResponse> {
1113 self.post::<_, _, RefreshUserPermissionsResponse>(
1114 self.endpoints.refresh_user_permissions()?,
1115 RefreshUserPermissionsRequest {},
1116 Retry::Yes,
1117 )
1118 }
1119
1120 pub fn get_current_user(&self) -> Result<User> {
1121 Ok(self
1122 .get::<_, GetCurrentUserResponse>(self.endpoints.current_user.clone())?
1123 .user)
1124 }
1125
1126 pub fn get_users(&self) -> Result<Vec<User>> {
1127 Ok(self
1128 .get::<_, GetAvailableUsersResponse>(self.endpoints.users.clone())?
1129 .users)
1130 }
1131
1132 pub fn create_user(&self, user: NewUser<'_>) -> Result<User> {
1133 Ok(self
1134 .put::<_, _, CreateUserResponse>(
1135 self.endpoints.users.clone(),
1136 CreateUserRequest { user },
1137 )?
1138 .user)
1139 }
1140
1141 pub fn dataset_summary(
1142 &self,
1143 dataset_name: &DatasetFullName,
1144 params: &SummaryRequestParams,
1145 ) -> Result<SummaryResponse> {
1146 self.post::<_, _, SummaryResponse>(
1147 self.endpoints.dataset_summary(dataset_name)?,
1148 serde_json::to_value(params).expect("summary params serialization error"),
1149 Retry::Yes,
1150 )
1151 }
1152
1153 pub fn query_dataset_csv(
1154 &self,
1155 dataset_name: &DatasetFullName,
1156 params: &QueryRequestParams,
1157 ) -> Result<String> {
1158 let response = self
1159 .raw_request(
1160 &Method::POST,
1161 &self.endpoints.query_dataset(dataset_name)?,
1162 &Some(serde_json::to_value(params).expect("query params serialization error")),
1163 &None::<()>,
1164 &Retry::Yes,
1165 Some(HeaderValue::from_str("text/csv").expect("Could not parse csv header")),
1166 )?
1167 .text()
1168 .expect("Could not get csv text");
1169
1170 Ok(response)
1171 }
1172
1173 pub fn query_dataset(
1174 &self,
1175 dataset_name: &DatasetFullName,
1176 params: &QueryRequestParams,
1177 ) -> Result<QueryResponse> {
1178 self.post::<_, _, QueryResponse>(
1179 self.endpoints.query_dataset(dataset_name)?,
1180 serde_json::to_value(params).expect("query params serialization error"),
1181 Retry::Yes,
1182 )
1183 }
1184
1185 pub fn send_welcome_email(&self, user_id: UserId) -> Result<()> {
1186 self.post::<_, _, WelcomeEmailResponse>(
1187 self.endpoints.welcome_email(&user_id)?,
1188 json!({}),
1189 Retry::No,
1190 )?;
1191 Ok(())
1192 }
1193
1194 pub fn get_bucket_statistics(&self, bucket_name: &BucketFullName) -> Result<BucketStatistics> {
1195 Ok(self
1196 .get::<_, GetBucketStatisticsResponse>(self.endpoints.bucket_statistics(bucket_name)?)?
1197 .statistics)
1198 }
1199
1200 pub fn get_dataset_statistics(
1201 &self,
1202 dataset_name: &DatasetFullName,
1203 params: &DatasetStatisticsRequestParams,
1204 ) -> Result<CommentStatistics> {
1205 Ok(self
1206 .post::<_, _, GetStatisticsResponse>(
1207 self.endpoints.dataset_statistics(dataset_name)?,
1208 serde_json::to_value(params)
1209 .expect("dataset statistics params serialization error"),
1210 Retry::No,
1211 )?
1212 .statistics)
1213 }
1214
1215 pub fn get_source_statistics(
1216 &self,
1217 source_name: &SourceFullName,
1218 params: &SourceStatisticsRequestParams,
1219 ) -> Result<CommentStatistics> {
1220 Ok(self
1221 .post::<_, _, GetStatisticsResponse>(
1222 self.endpoints.source_statistics(source_name)?,
1223 serde_json::to_value(params).expect("source statistics params serialization error"),
1224 Retry::No,
1225 )?
1226 .statistics)
1227 }
1228
1229 pub fn create_bucket(
1231 &self,
1232 bucket_name: &BucketFullName,
1233 options: NewBucket<'_>,
1234 ) -> Result<Bucket> {
1235 Ok(self
1236 .put::<_, _, CreateBucketResponse>(
1237 self.endpoints.bucket_by_name(bucket_name)?,
1238 CreateBucketRequest { bucket: options },
1239 )?
1240 .bucket)
1241 }
1242
1243 pub fn get_buckets(&self) -> Result<Vec<Bucket>> {
1244 Ok(self
1245 .get::<_, GetAvailableBucketsResponse>(self.endpoints.buckets.clone())?
1246 .buckets)
1247 }
1248
1249 pub fn get_bucket<IdentifierT>(&self, bucket: IdentifierT) -> Result<Bucket>
1250 where
1251 IdentifierT: Into<BucketIdentifier>,
1252 {
1253 Ok(match bucket.into() {
1254 BucketIdentifier::Id(bucket_id) => {
1255 self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_id(&bucket_id)?)?
1256 .bucket
1257 }
1258 BucketIdentifier::FullName(bucket_name) => {
1259 self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_name(&bucket_name)?)?
1260 .bucket
1261 }
1262 })
1263 }
1264
1265 pub fn delete_bucket<IdentifierT>(&self, bucket: IdentifierT) -> Result<()>
1266 where
1267 IdentifierT: Into<BucketIdentifier>,
1268 {
1269 let bucket_id = match bucket.into() {
1270 BucketIdentifier::Id(bucket_id) => bucket_id,
1271 bucket @ BucketIdentifier::FullName(_) => self.get_bucket(bucket)?.id,
1272 };
1273 self.delete(self.endpoints.bucket_by_id(&bucket_id)?)
1274 }
1275
1276 pub fn fetch_stream_comments(
1277 &self,
1278 stream_name: &StreamFullName,
1279 size: u32,
1280 ) -> Result<StreamBatch> {
1281 self.post(
1282 self.endpoints.stream_fetch(stream_name)?,
1283 StreamFetchRequest { size },
1284 Retry::No,
1285 )
1286 }
1287
1288 pub fn get_stream(&self, stream_name: &StreamFullName) -> Result<Stream> {
1289 Ok(self
1290 .get::<_, GetStreamResponse>(self.endpoints.stream(stream_name)?)?
1291 .stream)
1292 }
1293
1294 pub fn advance_stream(
1295 &self,
1296 stream_name: &StreamFullName,
1297 sequence_id: StreamSequenceId,
1298 ) -> Result<()> {
1299 self.post::<_, _, serde::de::IgnoredAny>(
1300 self.endpoints.stream_advance(stream_name)?,
1301 StreamAdvanceRequest { sequence_id },
1302 Retry::No,
1303 )?;
1304 Ok(())
1305 }
1306
1307 pub fn reset_stream(
1308 &self,
1309 stream_name: &StreamFullName,
1310 to_comment_created_at: DateTime<Utc>,
1311 ) -> Result<()> {
1312 self.post::<_, _, serde::de::IgnoredAny>(
1313 self.endpoints.stream_reset(stream_name)?,
1314 StreamResetRequest {
1315 to_comment_created_at,
1316 },
1317 Retry::No,
1318 )?;
1319 Ok(())
1320 }
1321
1322 pub fn tag_stream_exceptions(
1323 &self,
1324 stream_name: &StreamFullName,
1325 exceptions: &[StreamException],
1326 ) -> Result<()> {
1327 self.put::<_, _, serde::de::IgnoredAny>(
1328 self.endpoints.stream_exceptions(stream_name)?,
1329 TagStreamExceptionsRequest { exceptions },
1330 )?;
1331 Ok(())
1332 }
1333
1334 pub fn get_project(&self, project_name: &ProjectName) -> Result<Project> {
1336 let response =
1337 self.get::<_, GetProjectResponse>(self.endpoints.project_by_name(project_name)?)?;
1338 Ok(response.project)
1339 }
1340
1341 pub fn get_projects(&self) -> Result<Vec<Project>> {
1343 let response = self.get::<_, GetProjectsResponse>(self.endpoints.projects.clone())?;
1344 Ok(response.projects)
1345 }
1346
1347 pub fn create_project(
1349 &self,
1350 project_name: &ProjectName,
1351 options: NewProject,
1352 user_ids: &[UserId],
1353 ) -> Result<Project> {
1354 Ok(self
1355 .put::<_, _, CreateProjectResponse>(
1356 self.endpoints.project_by_name(project_name)?,
1357 CreateProjectRequest {
1358 project: options,
1359 user_ids,
1360 },
1361 )?
1362 .project)
1363 }
1364
1365 pub fn update_project(
1367 &self,
1368 project_name: &ProjectName,
1369 options: UpdateProject,
1370 ) -> Result<Project> {
1371 Ok(self
1372 .post::<_, _, UpdateProjectResponse>(
1373 self.endpoints.project_by_name(project_name)?,
1374 UpdateProjectRequest { project: options },
1375 Retry::Yes,
1376 )?
1377 .project)
1378 }
1379
1380 pub fn delete_project(
1382 &self,
1383 project_name: &ProjectName,
1384 force_delete: ForceDeleteProject,
1385 ) -> Result<()> {
1386 let endpoint = self.endpoints.project_by_name(project_name)?;
1387 match force_delete {
1388 ForceDeleteProject::No => self.delete(endpoint)?,
1389 ForceDeleteProject::Yes => {
1390 self.delete_query(endpoint, Some(&json!({ "force": true })))?
1391 }
1392 };
1393 Ok(())
1394 }
1395
    /// Issues a GET with no body or query string; retried on transient failure.
    fn get<LocationT, SuccessT>(&self, url: LocationT) -> Result<SuccessT>
    where
        LocationT: IntoUrl + Display + Clone,
        for<'de> SuccessT: Deserialize<'de>,
    {
        self.request(&Method::GET, &url, &None::<()>, &None::<()>, &Retry::Yes)
    }
1403
    /// Issues a GET with an optional query string; retried on transient failure.
    fn get_query<LocationT, QueryT, SuccessT>(
        &self,
        url: LocationT,
        query: Option<&QueryT>,
    ) -> Result<SuccessT>
    where
        LocationT: IntoUrl + Display + Clone,
        QueryT: Serialize,
        for<'de> SuccessT: Deserialize<'de>,
    {
        self.request(&Method::GET, &url, &None::<()>, &Some(query), &Retry::Yes)
    }
1416
    /// Issues a DELETE with no query string (see [`Client::delete_query`]).
    fn delete<LocationT>(&self, url: LocationT) -> Result<()>
    where
        LocationT: IntoUrl + Display + Clone,
    {
        self.delete_query::<LocationT, ()>(url, None)
    }
1423
    /// Issues a DELETE, optionally with a query string, retrying on transient
    /// failure.
    ///
    /// Special case: if a retry was performed (more than one attempt) and the
    /// final answer is 404, the delete is treated as successful — an earlier
    /// attempt may have deleted the resource before its response was lost.
    fn delete_query<LocationT, QueryT>(&self, url: LocationT, query: Option<&QueryT>) -> Result<()>
    where
        LocationT: IntoUrl + Display + Clone,
        QueryT: Serialize,
    {
        debug!("Attempting DELETE `{}`", url);

        // `Cell` lets the retry closure count attempts without a mutable borrow.
        let attempts = Cell::new(0);
        let http_response = self
            .with_retries(|| {
                attempts.set(attempts.get() + 1);

                let mut request = self
                    .http_client
                    .delete(url.clone())
                    .headers(self.headers.clone());
                if let Some(query) = query {
                    request = request.query(query);
                }
                request.send()
            })
            .map_err(|source| Error::ReqwestError {
                source,
                message: "DELETE operation failed.".to_owned(),
            })?;
        let status = http_response.status();
        http_response
            .json::<Response<EmptySuccess>>()
            .map_err(Error::BadJsonResponse)?
            .into_result(status)
            .map_or_else(
                |error| {
                    // 404 after a retry means an earlier attempt likely succeeded.
                    if attempts.get() > 1 && status == reqwest::StatusCode::NOT_FOUND {
                        Ok(())
                    } else {
                        Err(error)
                    }
                },
                |_| Ok(()),
            )
    }
1467
    /// Issues a POST with a JSON body; the caller chooses the retry policy
    /// since POSTs are not generally idempotent.
    fn post<LocationT, RequestT, SuccessT>(
        &self,
        url: LocationT,
        request: RequestT,
        retry: Retry,
    ) -> Result<SuccessT>
    where
        LocationT: IntoUrl + Display + Clone,
        RequestT: Serialize,
        for<'de> SuccessT: Deserialize<'de>,
    {
        self.request(&Method::POST, &url, &Some(request), &None::<()>, &retry)
    }
1481
    /// Issues a PUT with a JSON body; always retried (PUT is idempotent here).
    fn put<LocationT, RequestT, SuccessT>(
        &self,
        url: LocationT,
        request: RequestT,
    ) -> Result<SuccessT>
    where
        LocationT: IntoUrl + Display + Clone,
        RequestT: Serialize,
        for<'de> SuccessT: Deserialize<'de>,
    {
        self.request(&Method::PUT, &url, &Some(request), &None::<()>, &Retry::Yes)
    }
1494
    /// Central request dispatch: sends an HTTP request and returns the raw
    /// (unparsed) response.
    ///
    /// `accept_header`, when given, overrides the `Accept` header for this one
    /// request (e.g. `text/csv` for CSV exports). Retries apply only when
    /// `retry` is `Retry::Yes` (and then only if a retrier is configured —
    /// see [`Client::with_retries`]).
    fn raw_request<LocationT, RequestT, QueryT>(
        &self,
        method: &Method,
        url: &LocationT,
        body: &Option<RequestT>,
        query: &Option<QueryT>,
        retry: &Retry,
        accept_header: Option<HeaderValue>,
    ) -> Result<reqwest::blocking::Response>
    where
        LocationT: IntoUrl + Display + Clone,
        RequestT: Serialize,
        QueryT: Serialize,
    {
        // Start from the client's default headers, then layer the override.
        let mut headers = self.headers.clone();

        if let Some(accept_header) = accept_header {
            headers.insert(ACCEPT, accept_header);
        }

        // Closure so the identical request can be rebuilt for each retry.
        let do_request = || {
            let request = self
                .http_client
                .request(method.clone(), url.clone())
                .headers(headers.clone());

            let request = match &query {
                Some(query) => request.query(query),
                None => request,
            };
            let request = match &body {
                Some(body) => request.json(body),
                None => request,
            };
            request.send()
        };

        let result = match retry {
            Retry::Yes => self.with_retries(do_request),
            Retry::No => do_request(),
        };
        let http_response = result.map_err(|source| Error::ReqwestError {
            source,
            message: format!("{method} operation failed."),
        })?;

        Ok(http_response)
    }
1543
    /// Sends a batch request, falling back to per-item requests when the
    /// batch as a whole is rejected.
    ///
    /// If the combined request fails with 422/400 (some items invalid) or an
    /// unparseable JSON body, the request is split via [`SplittableRequest`]
    /// into single-item requests; successful responses are merged with
    /// [`ReducibleResponse`] and failures are only counted, not returned.
    /// Any other error is propagated unchanged.
    fn splitable_request<LocationT, RequestT, SuccessT, QueryT>(
        &self,
        method: Method,
        url: LocationT,
        body: RequestT,
        query: Option<QueryT>,
        retry: Retry,
    ) -> Result<SplitableRequestResponse<SuccessT>>
    where
        LocationT: IntoUrl + Display + Clone,
        RequestT: Serialize + SplittableRequest + Clone,
        QueryT: Serialize + Clone,
        for<'de> SuccessT: Deserialize<'de> + ReducibleResponse + Clone + Default,
    {
        debug!("Attempting {method} `{url}`");
        // First try the whole batch in one request.
        let result: Result<SuccessT> =
            self.request(&method, &url, &Some(body.clone()), &query, &retry);

        // Errors that indicate "some items are bad" rather than "the request
        // as a whole cannot be performed".
        fn should_split(error: &Error) -> bool {
            if let Error::Api { status_code, .. } = error {
                *status_code == reqwest::StatusCode::UNPROCESSABLE_ENTITY
                    || *status_code == reqwest::StatusCode::BAD_REQUEST
            } else if let Error::BadJsonResponse(_) = error {
                true
            } else {
                false
            }
        }

        match result {
            Ok(response) => Ok(SplitableRequestResponse {
                response,
                num_failed: 0,
            }),
            Err(error) if should_split(&error) => {
                let mut num_failed = 0;
                // Retry item-by-item, merging the successes and dropping
                // (but counting) the failures.
                let response = body
                    .split()
                    .filter_map(|request| {
                        match self.request(&method, &url, &Some(request), &query, &retry) {
                            Ok(response) => Some(response),
                            Err(_) => {
                                num_failed += 1;
                                None
                            }
                        }
                    })
                    .fold(SuccessT::empty(), |merged, next: SuccessT| {
                        merged.merge(next)
                    });

                Ok(SplitableRequestResponse {
                    num_failed,
                    response,
                })
            }
            Err(error) => Err(error),
        }
    }
1605
1606 fn request<LocationT, RequestT, SuccessT, QueryT>(
1607 &self,
1608 method: &Method,
1609 url: &LocationT,
1610 body: &Option<RequestT>,
1611 query: &Option<QueryT>,
1612 retry: &Retry,
1613 ) -> Result<SuccessT>
1614 where
1615 LocationT: IntoUrl + Display + Clone,
1616 RequestT: Serialize,
1617 QueryT: Serialize + Clone,
1618 for<'de> SuccessT: Deserialize<'de>,
1619 {
1620 debug!("Attempting {} `{}`", method, url);
1621 let http_response = self.raw_request(method, url, body, query, retry, None)?;
1622
1623 let status = http_response.status();
1624 http_response
1625 .json::<Response<SuccessT>>()
1626 .map_err(Error::BadJsonResponse)?
1627 .into_result(status)
1628 }
1629
1630 fn with_retries(
1631 &self,
1632 send_request: impl Fn() -> ReqwestResult<HttpResponse>,
1633 ) -> ReqwestResult<HttpResponse> {
1634 match &self.retrier {
1635 Some(retrier) => retrier.with_retries(send_request),
1636 None => send_request(),
1637 }
1638 }
1639}
1640
/// Per-request retry policy. Calls that are not idempotent (e.g. stream
/// fetch/advance) opt out with `No`; idempotent calls use `Yes`.
#[derive(Copy, Clone)]
enum Retry {
    /// Retry via the client's configured retrier (if any).
    Yes,
    /// Send the request exactly once.
    No,
}
1646
/// Iterator over pages of dataset query results.
///
/// Each `next()` issues one query request; the continuation token is threaded
/// through the shared `params`.
pub struct DatasetQueryIter<'a> {
    client: &'a Client,
    dataset_name: &'a DatasetFullName,
    /// Set once a page comes back without a continuation token.
    done: bool,
    params: &'a mut QueryRequestParams,
}
1653
1654impl<'a> DatasetQueryIter<'a> {
1655 fn new(
1656 client: &'a Client,
1657 dataset_name: &'a DatasetFullName,
1658 params: &'a mut QueryRequestParams,
1659 ) -> Self {
1660 Self {
1661 client,
1662 dataset_name,
1663 done: false,
1664 params,
1665 }
1666 }
1667}
1668
1669impl Iterator for DatasetQueryIter<'_> {
1670 type Item = Result<Vec<AnnotatedComment>>;
1671
1672 fn next(&mut self) -> Option<Self::Item> {
1673 if self.done {
1674 return None;
1675 }
1676
1677 let response = self.client.query_dataset(self.dataset_name, self.params);
1678 Some(response.map(|page| {
1679 self.params.continuation = page.continuation;
1680 self.done = self.params.continuation.is_none();
1681 page.results
1682 }))
1683 }
1684}
1685
/// Starting point for paging through comments: either an initial timestamp
/// (start of a timerange) or an opaque continuation token from a prior page.
pub enum ContinuationKind {
    Timestamp(DateTime<Utc>),
    Continuation(Continuation),
}
1690
/// Iterator over pages of emails in a bucket.
pub struct EmailsIter<'a> {
    client: &'a Client,
    bucket_name: &'a BucketFullName,
    /// Token for the next page; `None` before the first request.
    continuation: Option<EmailContinuation>,
    /// Set once a page comes back without a continuation token.
    done: bool,
    page_size: usize,
}
1698
1699impl<'a> EmailsIter<'a> {
1700 pub const DEFAULT_PAGE_SIZE: usize = 64;
1702 pub const MAX_PAGE_SIZE: usize = 256;
1704
1705 fn new(client: &'a Client, bucket_name: &'a BucketFullName, page_size: Option<usize>) -> Self {
1706 Self {
1707 client,
1708 bucket_name,
1709 continuation: None,
1710 done: false,
1711 page_size: page_size.unwrap_or(Self::DEFAULT_PAGE_SIZE),
1712 }
1713 }
1714}
1715
1716impl Iterator for EmailsIter<'_> {
1717 type Item = Result<Vec<Email>>;
1718
1719 fn next(&mut self) -> Option<Self::Item> {
1720 if self.done {
1721 return None;
1722 }
1723 let response = self.client.get_emails_iter_page(
1724 self.bucket_name,
1725 self.continuation.as_ref(),
1726 self.page_size,
1727 );
1728 Some(response.map(|page| {
1729 self.continuation = page.continuation;
1730 self.done = self.continuation.is_none();
1731 page.emails
1732 }))
1733 }
1734}
1735
/// Iterator over pages of comments in a source, optionally bounded by a
/// timerange.
pub struct CommentsIter<'a> {
    client: &'a Client,
    source_name: &'a SourceFullName,
    /// Where the next page starts: an initial timestamp or a server token.
    continuation: Option<ContinuationKind>,
    /// Set once a page comes back without a continuation token.
    done: bool,
    page_size: usize,
    /// Exclusive upper bound passed through to each page request, if any.
    to_timestamp: Option<DateTime<Utc>>,
}
1744
/// Optional time bounds for [`CommentsIter`]; `Default` is unbounded.
#[derive(Debug, Default)]
pub struct CommentsIterTimerange {
    pub from: Option<DateTime<Utc>>,
    pub to: Option<DateTime<Utc>>,
}
1750impl<'a> CommentsIter<'a> {
1751 pub const DEFAULT_PAGE_SIZE: usize = 64;
1753 pub const MAX_PAGE_SIZE: usize = 256;
1755
1756 fn new(
1757 client: &'a Client,
1758 source_name: &'a SourceFullName,
1759 page_size: Option<usize>,
1760 timerange: CommentsIterTimerange,
1761 ) -> Self {
1762 let (from_timestamp, to_timestamp) = (timerange.from, timerange.to);
1763 Self {
1764 client,
1765 source_name,
1766 to_timestamp,
1767 continuation: from_timestamp.map(ContinuationKind::Timestamp),
1768 done: false,
1769 page_size: page_size.unwrap_or(Self::DEFAULT_PAGE_SIZE),
1770 }
1771 }
1772}
1773
1774impl Iterator for CommentsIter<'_> {
1775 type Item = Result<Vec<Comment>>;
1776
1777 fn next(&mut self) -> Option<Self::Item> {
1778 if self.done {
1779 return None;
1780 }
1781 let response = self.client.get_comments_iter_page(
1782 self.source_name,
1783 self.continuation.as_ref(),
1784 self.to_timestamp,
1785 self.page_size,
1786 );
1787 Some(response.map(|page| {
1788 self.continuation = page.continuation.map(ContinuationKind::Continuation);
1789 self.done = self.continuation.is_none();
1790 page.comments
1791 }))
1792 }
1793}
1794
/// Iterator over pages of labellings for one source within a dataset.
pub struct LabellingsIter<'a> {
    client: &'a Client,
    dataset_name: &'a DatasetFullName,
    source_id: &'a SourceId,
    /// Whether each page should also include model predictions.
    return_predictions: bool,
    /// Pagination cursor; `None` before the first request.
    after: Option<GetLabellingsAfter>,
    limit: Option<usize>,
    /// Set when an empty page signals the end of results.
    done: bool,
}
1804
1805impl<'a> LabellingsIter<'a> {
1806 fn new(
1807 client: &'a Client,
1808 dataset_name: &'a DatasetFullName,
1809 source_id: &'a SourceId,
1810 return_predictions: bool,
1811 limit: Option<usize>,
1812 ) -> Self {
1813 Self {
1814 client,
1815 dataset_name,
1816 source_id,
1817 return_predictions,
1818 after: None,
1819 limit,
1820 done: false,
1821 }
1822 }
1823}
1824
impl Iterator for LabellingsIter<'_> {
    type Item = Result<Vec<AnnotatedComment>>;

    /// Fetches the next page of labellings. Unlike the other page iterators,
    /// the end is signalled by an *empty* page rather than a missing
    /// continuation token.
    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        let response = self.client.get_labellings_in_bulk(
            self.dataset_name,
            GetLabellingsInBulk {
                source_id: self.source_id,
                return_predictions: &self.return_predictions,
                after: &self.after,
                limit: &self.limit,
            },
        );
        Some(response.map(|page| {
            // Guard against a server bug: a non-empty page with an unchanged
            // cursor would make this iterator loop forever.
            if self.after == page.after && !page.results.is_empty() {
                panic!("Labellings API did not increment pagination continuation");
            }
            self.after = page.after;
            if page.results.is_empty() {
                self.done = true;
            }
            page.results
        }))
    }
}
1853
/// Pre-built and on-demand URLs for the API routes used by the client.
///
/// Frequently used collection roots are built once in [`Endpoints::new`];
/// everything else is constructed per call from `base`.
#[derive(Debug)]
struct Endpoints {
    base: Url,
    datasets: Url,
    sources: Url,
    buckets: Url,
    users: Url,
    current_user: Url,
    projects: Url,
}
1864
/// Query-string flag serialized as `no_charge=<bool>`.
// NOTE(review): not used within this chunk — presumably attached to
// quota-exempt requests elsewhere in the file; verify against callers.
#[derive(Debug, Serialize, Clone, Copy)]
struct NoChargeQuery {
    no_charge: bool,
}
1869
1870fn construct_endpoint(base: &Url, segments: &[&str]) -> Result<Url> {
1871 let mut endpoint = base.clone();
1872
1873 let mut endpoint_segments = endpoint
1874 .path_segments_mut()
1875 .map_err(|_| Error::BadEndpoint {
1876 endpoint: base.clone(),
1877 })?;
1878
1879 for segment in segments {
1880 endpoint_segments.push(segment);
1881 }
1882
1883 drop(endpoint_segments);
1884
1885 Ok(endpoint)
1886}
1887
// Route constructors: each method returns the full URL for one API endpoint,
// built by appending path segments to the configured base URL.
impl Endpoints {
    /// Pre-builds the frequently used collection roots from `base`.
    pub fn new(base: Url) -> Result<Self> {
        let datasets = construct_endpoint(&base, &["api", "v1", "datasets"])?;
        let sources = construct_endpoint(&base, &["api", "v1", "sources"])?;
        let buckets = construct_endpoint(&base, &["api", "_private", "buckets"])?;
        let users = construct_endpoint(&base, &["api", "_private", "users"])?;
        let current_user = construct_endpoint(&base, &["auth", "user"])?;
        let projects = construct_endpoint(&base, &["api", "_private", "projects"])?;

        Ok(Endpoints {
            base,
            datasets,
            sources,
            buckets,
            users,
            current_user,
            projects,
        })
    }

    fn refresh_user_permissions(&self) -> Result<Url> {
        construct_endpoint(&self.base, &["auth", "refresh-user-permissions"])
    }

    fn label_group(
        &self,
        dataset_name: &DatasetFullName,
        label_group: LabelGroupName,
    ) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "datasets",
                &dataset_name.0,
                "labels",
                &label_group.0,
            ],
        )
    }

    fn ixp_datasets(&self) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "ixp", "datasets"])
    }

    fn ixp_documents(&self, source_id: &SourceId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "sources",
                &format!("id:{0}", source_id.0),
                "documents",
            ],
        )
    }

    fn ixp_document(&self, source_id: &SourceId, comment_id: &CommentId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "sources",
                &format!("id:{0}", source_id.0),
                "documents",
                &comment_id.0,
            ],
        )
    }

    fn keyed_sync_states(&self, bucket_id: &BucketId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "buckets",
                &format!("id:{}", bucket_id.0),
                // NOTE(review): the trailing slash in this segment will be
                // percent-encoded by the URL path-segment API rather than
                // producing a trailing slash — confirm this is intended.
                "keyed-sync-states/",
            ],
        )
    }

    fn keyed_sync_state(&self, bucket_id: &BucketId, id: &KeyedSyncStateId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "buckets",
                &format!("id:{}", bucket_id.0),
                "keyed-sync-state",
                &id.0,
            ],
        )
    }

    fn query_keyed_sync_state_ids(&self, bucket_id: &BucketId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "buckets",
                &format!("id:{}", bucket_id.0),
                "keyed-sync-state-ids",
            ],
        )
    }

    fn audit_events_query(&self) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "v1", "audit_events", "query"])
    }

    fn integrations(&self) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "integrations"])
    }

    fn integration(&self, name: &IntegrationFullName) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "integrations", &name.0])
    }

    fn attachment_reference(&self, reference: &AttachmentReference) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "v1", "attachments", &reference.0])
    }

    fn attachment_upload(
        &self,
        source_id: &SourceId,
        comment_id: &CommentId,
        attachment_index: usize,
    ) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "sources",
                &format!("id:{}", source_id.0),
                "comments",
                &comment_id.0,
                "attachments",
                &attachment_index.to_string(),
            ],
        )
    }
    fn latest_validation(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "datasets",
                &dataset_name.0,
                "labellers",
                "latest",
                "validation",
            ],
        )
    }

    fn validation(
        &self,
        dataset_name: &DatasetFullName,
        model_version: &ModelVersion,
    ) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "datasets",
                &dataset_name.0,
                "labellers",
                &model_version.0.to_string(),
                "validation",
            ],
        )
    }

    fn label_validation(
        &self,
        dataset_name: &DatasetFullName,
        model_version: &ModelVersion,
    ) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "datasets",
                &dataset_name.0,
                "labellers",
                &model_version.0.to_string(),
                "label-validation",
            ],
        )
    }
    fn bucket_statistics(&self, bucket_name: &BucketFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "buckets", &bucket_name.0, "statistics"],
        )
    }

    fn dataset_summary(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "summary"],
        )
    }

    fn query_dataset(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "query"],
        )
    }

    fn streams(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "datasets", &dataset_name.0, "streams"],
        )
    }

    fn stream(&self, stream_name: &StreamFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "datasets",
                &stream_name.dataset.0,
                "streams",
                &stream_name.stream.0,
            ],
        )
    }

    fn stream_fetch(&self, stream_name: &StreamFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "datasets",
                &stream_name.dataset.0,
                "streams",
                &stream_name.stream.0,
                "fetch",
            ],
        )
    }

    fn stream_advance(&self, stream_name: &StreamFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "datasets",
                &stream_name.dataset.0,
                "streams",
                &stream_name.stream.0,
                "advance",
            ],
        )
    }

    fn stream_reset(&self, stream_name: &StreamFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "datasets",
                &stream_name.dataset.0,
                "streams",
                &stream_name.stream.0,
                "reset",
            ],
        )
    }

    fn stream_exceptions(&self, stream_name: &StreamFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "datasets",
                &stream_name.dataset.0,
                "streams",
                &stream_name.stream.0,
                "exceptions",
            ],
        )
    }

    fn recent_comments(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "recent"],
        )
    }

    fn dataset_statistics(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "statistics"],
        )
    }

    fn source_statistics(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "sources", &source_name.0, "statistics"],
        )
    }

    fn user_by_id(&self, user_id: &UserId) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "users", &user_id.0])
    }

    fn source_by_id(&self, source_id: &SourceId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "sources", &format!("id:{}", source_id.0)],
        )
    }

    fn source_by_name(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "v1", "sources", &source_name.0])
    }

    fn quotas(&self) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "quotas"])
    }

    fn quota(&self, tenant_id: &TenantId, tenant_quota_kind: TenantQuotaKind) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "quotas",
                &tenant_id.to_string(),
                &tenant_quota_kind.to_string(),
            ],
        )
    }

    // NOTE(review): `put_comments` and `comments` build identical URLs, as do
    // `get_emails` and `put_emails` below — possibly intentional (distinct
    // call sites), but worth consolidating.
    fn put_comments(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "sources", &source_name.0, "comments"],
        )
    }

    fn comments(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "sources", &source_name.0, "comments"],
        )
    }

    fn comment_by_id(&self, source_name: &SourceFullName, comment_id: &CommentId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "sources",
                &source_name.0,
                "comments",
                &comment_id.0,
            ],
        )
    }

    fn comments_v1(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "sources", &source_name.0, "comments"],
        )
    }

    fn sync_comments(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "sources", &source_name.0, "sync"],
        )
    }

    fn sync_comments_raw_emails(&self, source_name: &SourceFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "sources", &source_name.0, "sync-raw-emails"],
        )
    }

    fn comment_audio(&self, source_id: &SourceId, comment_id: &CommentId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "sources",
                &format!("id:{}", source_id.0),
                "comments",
                &comment_id.0,
                "audio",
            ],
        )
    }

    fn get_emails(&self, bucket_name: &BucketFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "buckets", &bucket_name.0, "emails"],
        )
    }

    fn put_emails(&self, bucket_name: &BucketFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "buckets", &bucket_name.0, "emails"],
        )
    }

    fn post_user(&self, user_id: &UserId) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "users", &user_id.0])
    }

    fn dataset_by_id(&self, dataset_id: &DatasetId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "v1", "datasets", &format!("id:{}", dataset_id.0)],
        )
    }

    fn dataset_by_name(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "v1", "datasets", &dataset_name.0])
    }

    fn get_labellings(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "labellings"],
        )
    }

    fn labellers(&self, dataset_name: &DatasetFullName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "datasets", &dataset_name.0, "labellers"],
        )
    }

    fn get_comment_predictions(
        &self,
        dataset_name: &DatasetFullName,
        model_version: &ModelVersion,
    ) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "v1",
                "datasets",
                &dataset_name.0,
                "labellers",
                &model_version.0.to_string(),
                "predict-comments",
            ],
        )
    }

    fn post_labelling(
        &self,
        dataset_name: &DatasetFullName,
        comment_uid: &CommentUid,
    ) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &[
                "api",
                "_private",
                "datasets",
                &dataset_name.0,
                "labellings",
                &comment_uid.0,
            ],
        )
    }

    fn bucket_by_id(&self, bucket_id: &BucketId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "buckets", &format!("id:{}", bucket_id.0)],
        )
    }

    fn bucket_by_name(&self, bucket_name: &BucketFullName) -> Result<Url> {
        construct_endpoint(&self.base, &["api", "_private", "buckets", &bucket_name.0])
    }

    fn project_by_name(&self, project_name: &ProjectName) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "projects", &project_name.0],
        )
    }

    fn welcome_email(&self, user_id: &UserId) -> Result<Url> {
        construct_endpoint(
            &self.base,
            &["api", "_private", "users", &user_id.0, "welcome-email"],
        )
    }
}
2413
/// Overall per-request timeout (4 minutes) applied to every HTTP request.
const DEFAULT_HTTP_TIMEOUT_SECONDS: u64 = 240;
2415
2416fn build_http_client(config: &Config) -> Result<HttpClient> {
2417 let mut builder = HttpClient::builder()
2418 .gzip(true)
2419 .danger_accept_invalid_certs(config.accept_invalid_certificates)
2420 .timeout(Some(Duration::from_secs(DEFAULT_HTTP_TIMEOUT_SECONDS)));
2421
2422 if let Some(proxy) = config.proxy.clone() {
2423 builder = builder.proxy(Proxy::all(proxy).map_err(Error::BuildHttpClient)?);
2424 }
2425 builder.build().map_err(Error::BuildHttpClient)
2426}
2427
2428fn build_headers(config: &Config) -> Result<HeaderMap> {
2429 let mut headers = HeaderMap::new();
2430 headers.insert(
2431 header::AUTHORIZATION,
2432 HeaderValue::from_str(&format!("Bearer {}", &config.token.0)).map_err(|_| {
2433 Error::BadToken {
2434 token: config.token.0.clone(),
2435 }
2436 })?,
2437 );
2438 Ok(headers)
2439}
2440
/// Turns a sequence of ids into repeated `id=<value>` query-string pairs.
fn id_list_query<'a>(ids: impl Iterator<Item = &'a String>) -> Vec<(&'static str, &'a str)> {
    let mut pairs = Vec::new();
    for id in ids {
        pairs.push(("id", id.as_str()));
    }
    pairs
}
2447
/// Default API base URL used when the caller does not configure one.
pub static DEFAULT_ENDPOINT: Lazy<Url> =
    Lazy::new(|| Url::parse("https://reinfer.dev").expect("Default URL is well-formed"));
2450
#[cfg(test)]
mod tests {
    use super::*;

    /// Segments appended by `construct_endpoint` extend (not replace) any
    /// path already present on the base URL.
    #[test]
    fn test_construct_endpoint() {
        let url = construct_endpoint(
            &Url::parse("https://cloud.uipath.com/org/tenant/reinfer_").unwrap(),
            &["api", "v1", "sources", "project", "source", "sync"],
        )
        .unwrap();

        assert_eq!(
            url.to_string(),
            "https://cloud.uipath.com/org/tenant/reinfer_/api/v1/sources/project/source/sync"
        )
    }

    /// `id_list_query` emits one ("id", value) pair per input, in order.
    #[test]
    fn test_id_list_query() {
        assert_eq!(id_list_query(Vec::new().iter()), Vec::new());
        assert_eq!(
            id_list_query(["foo".to_owned()].iter()),
            vec![("id", "foo")]
        );
        assert_eq!(
            id_list_query(
                [
                    "Stream".to_owned(),
                    "River".to_owned(),
                    "Waterfall".to_owned()
                ]
                .iter()
            ),
            [("id", "Stream"), ("id", "River"), ("id", "Waterfall"),]
        );
    }
}
2488}