Skip to main content

reinfer_client/
lib.rs

1#![deny(clippy::all)]
2mod error;
3pub mod resources;
4pub mod retry;
5
6use chrono::{DateTime, Utc};
7use http::{header::ACCEPT, Method};
8use log::debug;
9use once_cell::sync::Lazy;
10use reqwest::{
11    blocking::{
12        multipart::{Form, Part},
13        Client as HttpClient, Response as HttpResponse,
14    },
15    header::{self, HeaderMap, HeaderValue},
16    IntoUrl, Proxy, Result as ReqwestResult,
17};
18use resources::{
19    attachments::UploadAttachmentResponse,
20    auth::{RefreshUserPermissionsRequest, RefreshUserPermissionsResponse},
21    bucket::{
22        GetKeyedSyncStateIdsRequest, GetKeyedSyncStateIdsResponse, GetKeyedSyncStatesResponse,
23        KeyedSyncState, KeyedSyncStateId,
24    },
25    bucket_statistics::GetBucketStatisticsResponse,
26    comment::{AttachmentReference, CommentTimestampFilter},
27    dataset::{
28        CreateIxpDatasetRequest, CreateIxpDatasetResponse, GetAllModelsInDatasetRequest,
29        GetAllModelsInDatasetRespone, IxpDatasetNew, QueryRequestParams, QueryResponse,
30        StatisticsRequestParams as DatasetStatisticsRequestParams, SummaryRequestParams,
31        SummaryResponse, UploadIxpDocumentResponse, UserModelMetadata,
32    },
33    documents::{Document, SyncRawEmailsRequest, SyncRawEmailsResponse},
34    email::{Email, GetEmailResponse},
35    integration::{
36        GetIntegrationResponse, GetIntegrationsResponse, Integration, NewIntegration,
37        PostIntegrationRequest, PostIntegrationResponse, PutIntegrationRequest,
38        PutIntegrationResponse,
39    },
40    label_def::{CreateOrUpdateLabelDefsBulkRequest, CreateOrUpdateLabelDefsBulkResponse},
41    project::ForceDeleteProject,
42    quota::{GetQuotasResponse, Quota},
43    source::StatisticsRequestParams as SourceStatisticsRequestParams,
44    stream::{GetStreamResponse, NewStream, PutStreamRequest, PutStreamResponse},
45    tenant_id::UiPathTenantId,
46    validation::{
47        LabelValidation, LabelValidationRequest, LabelValidationResponse, ValidationResponse,
48    },
49};
50use serde::{Deserialize, Serialize};
51use serde_json::json;
52use std::{
53    cell::Cell,
54    fmt::{Debug, Display},
55    io::Read,
56    path::{Path, PathBuf},
57    time::Duration,
58};
59use url::Url;
60
61use crate::resources::{
62    audit::{AuditQueryFilter, AuditQueryRequest, AuditQueryResponse},
63    bucket::{
64        CreateRequest as CreateBucketRequest, CreateResponse as CreateBucketResponse,
65        GetAvailableResponse as GetAvailableBucketsResponse, GetResponse as GetBucketResponse,
66    },
67    bucket_statistics::Statistics as BucketStatistics,
68    comment::{
69        GetAnnotationsResponse, GetCommentResponse, GetLabellingsAfter, GetPredictionsResponse,
70        GetRecentRequest, PutCommentsRequest, PutCommentsResponse, RecentCommentsPage,
71        SyncCommentsRequest, UpdateAnnotationsRequest,
72    },
73    dataset::{
74        CreateRequest as CreateDatasetRequest, CreateResponse as CreateDatasetResponse,
75        GetAvailableResponse as GetAvailableDatasetsResponse, GetResponse as GetDatasetResponse,
76        UpdateRequest as UpdateDatasetRequest, UpdateResponse as UpdateDatasetResponse,
77    },
78    email::{PutEmailsRequest, PutEmailsResponse},
79    project::{
80        CreateProjectRequest, CreateProjectResponse, GetProjectResponse, GetProjectsResponse,
81        UpdateProjectRequest, UpdateProjectResponse,
82    },
83    quota::{CreateQuota, TenantQuotaKind},
84    source::{
85        CreateRequest as CreateSourceRequest, CreateResponse as CreateSourceResponse,
86        GetAvailableResponse as GetAvailableSourcesResponse, GetResponse as GetSourceResponse,
87        UpdateRequest as UpdateSourceRequest, UpdateResponse as UpdateSourceResponse,
88    },
89    statistics::GetResponse as GetStatisticsResponse,
90    stream::{
91        AdvanceRequest as StreamAdvanceRequest, FetchRequest as StreamFetchRequest,
92        GetStreamsResponse, ResetRequest as StreamResetRequest,
93        TagExceptionsRequest as TagStreamExceptionsRequest,
94    },
95    tenant_id::TenantId,
96    user::{
97        CreateRequest as CreateUserRequest, CreateResponse as CreateUserResponse,
98        GetAvailableResponse as GetAvailableUsersResponse,
99        GetCurrentResponse as GetCurrentUserResponse, GetResponse as GetUserResponse,
100        PostUserRequest, PostUserResponse, WelcomeEmailResponse,
101    },
102    EmptySuccess, Response,
103};
104
105use crate::retry::{Retrier, RetryConfig};
106
107pub use crate::{
108    error::{Error, Result},
109    resources::{
110        bucket::{
111            Bucket, BucketType, FullName as BucketFullName, Id as BucketId,
112            Identifier as BucketIdentifier, Name as BucketName, NewBucket,
113        },
114        comment::{
115            AnnotatedComment, Comment, CommentFilter, CommentPredictionsThreshold,
116            CommentsIterPage, Continuation, EitherLabelling, Entities, Entity,
117            GetCommentPredictionsRequest, HasAnnotations, Id as CommentId, Label, Labelling,
118            Message, MessageBody, MessageSignature, MessageSubject, NewAnnotatedComment,
119            NewComment, NewEntities, NewLabelling, NewMoonForm, PredictedLabel, Prediction,
120            PropertyMap, PropertyValue, Sentiment, SyncCommentsResponse, TriggerLabelThreshold,
121            Uid as CommentUid,
122        },
123        dataset::{
124            Dataset, FullName as DatasetFullName, Id as DatasetId, Identifier as DatasetIdentifier,
125            ModelVersion, Name as DatasetName, NewDataset, UpdateDataset,
126        },
127        email::{
128            Continuation as EmailContinuation, EmailsIterPage, Id as EmailId, Mailbox, MimeContent,
129            NewEmail,
130        },
131        entity_def::{EntityDef, Id as EntityDefId, Name as EntityName, NewEntityDef},
132        integration::FullName as IntegrationFullName,
133        label_def::{
134            LabelDef, LabelDefPretrained, MoonFormFieldDef, Name as LabelName, NewLabelDef,
135            NewLabelDefPretrained, PretrainedId as LabelDefPretrainedId,
136        },
137        label_group::{
138            LabelGroup, Name as LabelGroupName, NewLabelGroup, DEFAULT_LABEL_GROUP_NAME,
139        },
140        project::{NewProject, Project, ProjectName, UpdateProject},
141        source::{
142            FullName as SourceFullName, Id as SourceId, Identifier as SourceIdentifier,
143            Name as SourceName, NewSource, Source, SourceKind, TransformTag, UpdateSource,
144        },
145        statistics::Statistics as CommentStatistics,
146        stream::{
147            Batch as StreamBatch, FullName as StreamFullName, SequenceId as StreamSequenceId,
148            Stream, StreamException, StreamExceptionMetadata,
149        },
150        user::{
151            Email as UserEmail, GlobalPermission, Id as UserId, Identifier as UserIdentifier,
152            ModifiedPermissions, NewUser, ProjectPermission, UpdateUser, User, Username,
153        },
154    },
155};
156
157#[derive(Clone, Debug, PartialEq, Eq)]
158pub struct Token(pub String);
159
160pub trait SplittableRequest {
161    fn split(self) -> impl Iterator<Item = Self>
162    where
163        Self: Sized;
164
165    fn count(&self) -> usize;
166}
167
168pub struct SplitableRequestResponse<ResponseT>
169where
170    for<'de> ResponseT: Deserialize<'de> + ReducibleResponse,
171{
172    pub response: ResponseT,
173    pub num_failed: usize,
174}
175
176pub trait ReducibleResponse {
177    fn merge(self, _b: Self) -> Self
178    where
179        Self: std::default::Default,
180    {
181        Default::default()
182    }
183
184    fn empty() -> Self
185    where
186        Self: std::default::Default,
187    {
188        Default::default()
189    }
190}
191
192pub struct Config {
193    pub endpoint: Url,
194    pub token: Token,
195    pub accept_invalid_certificates: bool,
196    pub proxy: Option<Url>,
197    /// Retry settings to use, if any. This will apply to all requests except for POST requests
198    /// which are not idempotent (as they cannot be naively retried).
199    pub retry_config: Option<RetryConfig>,
200}
201
202impl Default for Config {
203    fn default() -> Self {
204        Config {
205            endpoint: DEFAULT_ENDPOINT.clone(),
206            token: Token("".to_owned()),
207            accept_invalid_certificates: false,
208            proxy: None,
209            retry_config: None,
210        }
211    }
212}
213
214#[derive(Debug)]
215pub struct Client {
216    endpoints: Endpoints,
217    http_client: HttpClient,
218    headers: HeaderMap,
219    retrier: Option<Retrier>,
220}
221
222#[derive(Serialize)]
223pub struct GetLabellingsInBulk<'a> {
224    pub source_id: &'a SourceId,
225    pub return_predictions: &'a bool,
226
227    #[serde(skip_serializing_if = "Option::is_none")]
228    pub after: &'a Option<GetLabellingsAfter>,
229
230    #[serde(skip_serializing_if = "Option::is_none")]
231    pub limit: &'a Option<usize>,
232}
233
234#[derive(Serialize)]
235pub struct GetCommentsIterPageQuery<'a> {
236    #[serde(skip_serializing_if = "Option::is_none")]
237    pub from_timestamp: Option<DateTime<Utc>>,
238    #[serde(skip_serializing_if = "Option::is_none")]
239    pub to_timestamp: Option<DateTime<Utc>>,
240    #[serde(skip_serializing_if = "Option::is_none")]
241    pub after: Option<&'a Continuation>,
242    pub limit: usize,
243    pub include_markup: bool,
244}
245
246#[derive(Serialize)]
247pub struct GetEmailsIterPageQuery<'a> {
248    #[serde(skip_serializing_if = "Option::is_none")]
249    pub continuation: Option<&'a EmailContinuation>,
250    pub limit: usize,
251}
252
253#[derive(Serialize)]
254pub struct GetCommentQuery {
255    pub include_markup: bool,
256}
257
258#[derive(Serialize)]
259pub struct GetEmailQuery {
260    pub id: String,
261}
262
263impl Client {
264    /// Create a new API client.
265    pub fn new(config: Config) -> Result<Client> {
266        let http_client = build_http_client(&config)?;
267        let headers = build_headers(&config)?;
268        let endpoints = Endpoints::new(config.endpoint)?;
269        let retrier = config.retry_config.map(Retrier::new);
270        Ok(Client {
271            endpoints,
272            http_client,
273            headers,
274            retrier,
275        })
276    }
277
278    /// Get the base url for the client
279    pub fn base_url(&self) -> &Url {
280        &self.endpoints.base
281    }
282
283    /// List all visible sources.
284    pub fn get_sources(&self) -> Result<Vec<Source>> {
285        Ok(self
286            .get::<_, GetAvailableSourcesResponse>(self.endpoints.sources.clone())?
287            .sources)
288    }
289
290    /// Get a source by either id or name.
291    pub fn get_user(&self, user: impl Into<UserIdentifier>) -> Result<User> {
292        Ok(match user.into() {
293            UserIdentifier::Id(user_id) => {
294                self.get::<_, GetUserResponse>(self.endpoints.user_by_id(&user_id)?)?
295                    .user
296            }
297        })
298    }
299
300    /// Get a source by either id or name.
301    pub fn get_source(&self, source: impl Into<SourceIdentifier>) -> Result<Source> {
302        Ok(match source.into() {
303            SourceIdentifier::Id(source_id) => {
304                self.get::<_, GetSourceResponse>(self.endpoints.source_by_id(&source_id)?)?
305                    .source
306            }
307            SourceIdentifier::FullName(source_name) => {
308                self.get::<_, GetSourceResponse>(self.endpoints.source_by_name(&source_name)?)?
309                    .source
310            }
311        })
312    }
313
314    pub fn create_label_defs_bulk(
315        &self,
316        dataset_name: &DatasetFullName,
317        label_group: LabelGroupName,
318        label_defs: Vec<NewLabelDef>,
319    ) -> Result<()> {
320        self.put::<_, _, CreateOrUpdateLabelDefsBulkResponse>(
321            self.endpoints.label_group(dataset_name, label_group)?,
322            CreateOrUpdateLabelDefsBulkRequest { label_defs },
323        )?;
324        Ok(())
325    }
326
327    /// Create a new source.
328    pub fn create_source(
329        &self,
330        source_name: &SourceFullName,
331        options: NewSource<'_>,
332    ) -> Result<Source> {
333        Ok(self
334            .put::<_, _, CreateSourceResponse>(
335                self.endpoints.source_by_name(source_name)?,
336                CreateSourceRequest { source: options },
337            )?
338            .source)
339    }
340
341    /// Update a source.
342    pub fn update_source(
343        &self,
344        source_name: &SourceFullName,
345        options: UpdateSource<'_>,
346    ) -> Result<Source> {
347        Ok(self
348            .post::<_, _, UpdateSourceResponse>(
349                self.endpoints.source_by_name(source_name)?,
350                UpdateSourceRequest { source: options },
351                Retry::Yes,
352            )?
353            .source)
354    }
355
356    /// Delete a source.
357    pub fn delete_source(&self, source: impl Into<SourceIdentifier>) -> Result<()> {
358        let source_id = match source.into() {
359            SourceIdentifier::Id(source_id) => source_id,
360            source @ SourceIdentifier::FullName(_) => self.get_source(source)?.id,
361        };
362        self.delete(self.endpoints.source_by_id(&source_id)?)
363    }
364
365    /// Set a quota
366    pub fn create_quota(
367        &self,
368        target_tenant_id: &TenantId,
369        tenant_quota_kind: TenantQuotaKind,
370        options: CreateQuota,
371    ) -> Result<()> {
372        self.post(
373            self.endpoints.quota(target_tenant_id, tenant_quota_kind)?,
374            options,
375            Retry::Yes,
376        )
377    }
378
379    /// Get quotas for current tenant
380    pub fn get_quotas(&self, tenant_id: &Option<UiPathTenantId>) -> Result<Vec<Quota>> {
381        Ok(self
382            .get::<_, GetQuotasResponse>(self.endpoints.quotas(tenant_id)?)?
383            .quotas)
384    }
385
386    /// Delete a user.
387    pub fn delete_user(&self, user: impl Into<UserIdentifier>) -> Result<()> {
388        let UserIdentifier::Id(user_id) = user.into();
389        self.delete(self.endpoints.user_by_id(&user_id)?)
390    }
391
392    /// Delete comments by id in a source.
393    pub fn delete_comments(
394        &self,
395        source: impl Into<SourceIdentifier>,
396        comments: &[CommentId],
397    ) -> Result<()> {
398        let source_full_name = match source.into() {
399            source @ SourceIdentifier::Id(_) => self.get_source(source)?.full_name(),
400            SourceIdentifier::FullName(source_full_name) => source_full_name,
401        };
402        self.delete_query(
403            self.endpoints.comments_v1(&source_full_name)?,
404            Some(&id_list_query(comments.iter().map(|uid| &uid.0))),
405        )
406    }
407
408    /// Get a page of comments from a source.
409    pub fn get_comments_iter_page(
410        &self,
411        source_name: &SourceFullName,
412        continuation: Option<&ContinuationKind>,
413        to_timestamp: Option<DateTime<Utc>>,
414        limit: usize,
415    ) -> Result<CommentsIterPage> {
416        // Comments are returned from the API in increasing order of their
417        // `timestamp` field.
418        let (from_timestamp, after) = match continuation {
419            // If we have a timestamp, then this is a request for the first page of
420            // a series of comments with timestamps starting from the given time.
421            Some(ContinuationKind::Timestamp(from_timestamp)) => (Some(*from_timestamp), None),
422            // If we have a continuation, then this is a request for page n+1 of
423            // a series of comments, where the continuation came from page n.
424            Some(ContinuationKind::Continuation(after)) => (None, Some(after)),
425            // Otherwise, this is a request for the first page of a series of comments
426            // with timestamps starting from the beginning of time.
427            None => (None, None),
428        };
429        let query_params = GetCommentsIterPageQuery {
430            from_timestamp,
431            to_timestamp,
432            after,
433            limit,
434            include_markup: true,
435        };
436        self.get_query(self.endpoints.comments(source_name)?, Some(&query_params))
437    }
438
439    /// Iterate through all comments for a given dataset query.
440    pub fn get_dataset_query_iter<'a>(
441        &'a self,
442        dataset_name: &'a DatasetFullName,
443        params: &'a mut QueryRequestParams,
444    ) -> DatasetQueryIter<'a> {
445        DatasetQueryIter::new(self, dataset_name, params)
446    }
447
448    /// Iterate through all comments in a source.
449    pub fn get_comments_iter<'a>(
450        &'a self,
451        source_name: &'a SourceFullName,
452        page_size: Option<usize>,
453        timerange: CommentsIterTimerange,
454    ) -> CommentsIter<'a> {
455        CommentsIter::new(self, source_name, page_size, timerange)
456    }
457
458    pub fn get_keyed_sync_state_ids(
459        &self,
460        bucket_id: &BucketId,
461        request: &GetKeyedSyncStateIdsRequest,
462    ) -> Result<Vec<KeyedSyncStateId>> {
463        Ok(self
464            .post::<_, _, GetKeyedSyncStateIdsResponse>(
465                self.endpoints.query_keyed_sync_state_ids(bucket_id)?,
466                Some(&request),
467                Retry::Yes,
468            )?
469            .keyed_sync_state_ids)
470    }
471
472    pub fn delete_keyed_sync_state(
473        &self,
474        bucket_id: &BucketId,
475        id: &KeyedSyncStateId,
476    ) -> Result<()> {
477        self.delete(self.endpoints.keyed_sync_state(bucket_id, id)?)
478    }
479
480    pub fn get_keyed_sync_states(&self, bucket_id: &BucketId) -> Result<Vec<KeyedSyncState>> {
481        Ok(self
482            .get::<_, GetKeyedSyncStatesResponse>(self.endpoints.keyed_sync_states(bucket_id)?)?
483            .keyed_sync_states)
484    }
485
486    /// Get a single of email from a bucket.
487    pub fn get_email(&self, bucket_name: &BucketFullName, id: EmailId) -> Result<Vec<Email>> {
488        let query_params = GetEmailQuery { id: id.0 };
489        Ok(self
490            .get_query::<_, _, GetEmailResponse>(
491                self.endpoints.get_emails(bucket_name)?,
492                Some(&query_params),
493            )?
494            .emails)
495    }
496
497    /// Get a page of emails from a bucket.
498    pub fn get_emails_iter_page(
499        &self,
500        bucket_name: &BucketFullName,
501        continuation: Option<&EmailContinuation>,
502        limit: usize,
503    ) -> Result<EmailsIterPage> {
504        let query_params = GetEmailsIterPageQuery {
505            continuation,
506            limit,
507        };
508        self.post(
509            self.endpoints.get_emails(bucket_name)?,
510            Some(&query_params),
511            Retry::Yes,
512        )
513    }
514
515    /// Iterate through all comments in a source.
516    pub fn get_emails_iter<'a>(
517        &'a self,
518        bucket_name: &'a BucketFullName,
519        page_size: Option<usize>,
520    ) -> EmailsIter<'a> {
521        EmailsIter::new(self, bucket_name, page_size)
522    }
523
524    /// Get a single comment by id.
525    pub fn get_comment<'a>(
526        &'a self,
527        source_name: &'a SourceFullName,
528        comment_id: &'a CommentId,
529    ) -> Result<Comment> {
530        let query_params = GetCommentQuery {
531            include_markup: true,
532        };
533        Ok(self
534            .get_query::<_, _, GetCommentResponse>(
535                self.endpoints.comment_by_id(source_name, comment_id)?,
536                Some(&query_params),
537            )?
538            .comment)
539    }
540    pub fn post_integration(
541        &self,
542        name: &IntegrationFullName,
543        integration: &NewIntegration,
544    ) -> Result<PostIntegrationResponse> {
545        self.request(
546            &Method::POST,
547            &self.endpoints.integration(name)?,
548            &Some(PostIntegrationRequest {
549                integration: integration.clone(),
550            }),
551            &None::<()>,
552            &Retry::No,
553        )
554    }
555
556    pub fn put_integration(
557        &self,
558        name: &IntegrationFullName,
559        integration: &NewIntegration,
560    ) -> Result<PutIntegrationResponse> {
561        self.request(
562            &Method::PUT,
563            &self.endpoints.integration(name)?,
564            &Some(PutIntegrationRequest {
565                integration: integration.clone(),
566            }),
567            &None::<()>,
568            &Retry::No,
569        )
570    }
571
572    pub fn put_comments_split_on_failure(
573        &self,
574        source_name: &SourceFullName,
575        comments: Vec<NewComment>,
576        no_charge: bool,
577    ) -> SplitableRequestResponse<PutCommentsResponse> {
578        // Retrying here despite the potential for 409's in order to increase reliability when
579        // working with poor connection
580
581        self.splitable_request(
582            Method::PUT,
583            self.endpoints
584                .put_comments(source_name)
585                .expect("Could not get put_comments endpoint"),
586            PutCommentsRequest { comments },
587            Some(NoChargeQuery { no_charge }),
588            Retry::Yes,
589        )
590    }
591
592    pub fn put_comments(
593        &self,
594        source_name: &SourceFullName,
595        comments: Vec<NewComment>,
596        no_charge: bool,
597    ) -> Result<PutCommentsResponse> {
598        // Retrying here despite the potential for 409's in order to increase reliability when
599        // working with poor connection
600        self.request(
601            &Method::PUT,
602            &self.endpoints.put_comments(source_name)?,
603            &Some(PutCommentsRequest { comments }),
604            &Some(NoChargeQuery { no_charge }),
605            &Retry::Yes,
606        )
607    }
608
609    pub fn put_stream(
610        &self,
611        dataset_name: &DatasetFullName,
612        stream: &NewStream,
613    ) -> Result<PutStreamResponse> {
614        self.put(
615            self.endpoints.streams(dataset_name)?,
616            Some(PutStreamRequest { stream }),
617        )
618    }
619
620    pub fn get_audit_events(
621        &self,
622        minimum_timestamp: Option<DateTime<Utc>>,
623        maximum_timestamp: Option<DateTime<Utc>>,
624        continuation: Option<Continuation>,
625    ) -> Result<AuditQueryResponse> {
626        self.post::<_, _, AuditQueryResponse>(
627            self.endpoints.audit_events_query()?,
628            AuditQueryRequest {
629                continuation,
630                filter: AuditQueryFilter {
631                    timestamp: CommentTimestampFilter {
632                        minimum: minimum_timestamp,
633                        maximum: maximum_timestamp,
634                    },
635                },
636            },
637            Retry::Yes,
638        )
639    }
640    pub fn get_latest_validation(
641        &self,
642        dataset_name: &DatasetFullName,
643    ) -> Result<ValidationResponse> {
644        self.get::<_, ValidationResponse>(self.endpoints.latest_validation(dataset_name)?)
645    }
646
647    pub fn get_validation(
648        &self,
649        dataset_name: &DatasetFullName,
650        model_version: &ModelVersion,
651    ) -> Result<ValidationResponse> {
652        self.get::<_, ValidationResponse>(self.endpoints.validation(dataset_name, model_version)?)
653    }
654
655    pub fn get_labellers(&self, dataset_name: &DatasetFullName) -> Result<Vec<UserModelMetadata>> {
656        Ok(self
657            .post::<_, _, GetAllModelsInDatasetRespone>(
658                self.endpoints.labellers(dataset_name)?,
659                GetAllModelsInDatasetRequest {},
660                Retry::Yes,
661            )?
662            .labellers)
663    }
664
665    pub fn get_label_validation(
666        &self,
667        label: &LabelName,
668        dataset_name: &DatasetFullName,
669        model_version: &ModelVersion,
670    ) -> Result<LabelValidation> {
671        Ok(self
672            .post::<_, _, LabelValidationResponse>(
673                self.endpoints
674                    .label_validation(dataset_name, model_version)?,
675                LabelValidationRequest {
676                    label: label.clone(),
677                },
678                Retry::Yes,
679            )?
680            .label_validation)
681    }
682
683    pub fn sync_comments(
684        &self,
685        source_name: &SourceFullName,
686        comments: Vec<NewComment>,
687        no_charge: bool,
688    ) -> Result<SyncCommentsResponse> {
689        self.request(
690            &Method::POST,
691            &self.endpoints.sync_comments(source_name)?,
692            &Some(SyncCommentsRequest { comments }),
693            &Some(NoChargeQuery { no_charge }),
694            &Retry::Yes,
695        )
696    }
697
698    pub fn sync_comments_split_on_failure(
699        &self,
700        source_name: &SourceFullName,
701        comments: Vec<NewComment>,
702        no_charge: bool,
703    ) -> SplitableRequestResponse<SyncCommentsResponse> {
704        self.splitable_request(
705            Method::POST,
706            self.endpoints
707                .sync_comments(source_name)
708                .expect("Could not get sync_comments endpoint"),
709            SyncCommentsRequest { comments },
710            Some(NoChargeQuery { no_charge }),
711            Retry::Yes,
712        )
713    }
714
715    pub fn sync_raw_emails(
716        &self,
717        source_name: &SourceFullName,
718        documents: &[Document],
719        transform_tag: &TransformTag,
720        include_comments: bool,
721        no_charge: bool,
722    ) -> Result<SyncRawEmailsResponse> {
723        self.request(
724            &Method::POST,
725            &self.endpoints.sync_comments_raw_emails(source_name)?,
726            &Some(SyncRawEmailsRequest {
727                documents,
728                transform_tag,
729                include_comments,
730            }),
731            &Some(NoChargeQuery { no_charge }),
732            &Retry::Yes,
733        )
734    }
735
736    pub fn put_emails_split_on_failure(
737        &self,
738        bucket_name: &BucketFullName,
739        emails: Vec<NewEmail>,
740        no_charge: bool,
741    ) -> SplitableRequestResponse<PutEmailsResponse> {
742        self.splitable_request(
743            Method::PUT,
744            self.endpoints
745                .put_emails(bucket_name)
746                .expect("Could not get put_emails endpoint"),
747            PutEmailsRequest { emails },
748            Some(NoChargeQuery { no_charge }),
749            Retry::Yes,
750        )
751    }
752
753    pub fn put_emails(
754        &self,
755        bucket_name: &BucketFullName,
756        emails: Vec<NewEmail>,
757        no_charge: bool,
758    ) -> Result<PutEmailsResponse> {
759        self.request(
760            &Method::PUT,
761            &self.endpoints.put_emails(bucket_name)?,
762            &Some(PutEmailsRequest { emails }),
763            &Some(NoChargeQuery { no_charge }),
764            &Retry::Yes,
765        )
766    }
767
768    pub fn post_user(&self, user_id: &UserId, user: UpdateUser) -> Result<PostUserResponse> {
769        self.post(
770            self.endpoints.post_user(user_id)?,
771            PostUserRequest { user: &user },
772            Retry::Yes,
773        )
774    }
775
776    pub fn put_comment_audio(
777        &self,
778        source_id: &SourceId,
779        comment_id: &CommentId,
780        audio_path: impl AsRef<Path>,
781    ) -> Result<()> {
782        let form = Form::new()
783            .file("file", audio_path)
784            .map_err(|source| Error::Unknown {
785                message: "PUT comment audio operation failed".to_owned(),
786                source: source.into(),
787            })?;
788        let http_response = self
789            .http_client
790            .put(self.endpoints.comment_audio(source_id, comment_id)?)
791            .headers(self.headers.clone())
792            .multipart(form)
793            .send()
794            .map_err(|source| Error::ReqwestError {
795                message: "PUT comment audio operation failed".to_owned(),
796                source,
797            })?;
798        let status = http_response.status();
799        http_response
800            .json::<Response<EmptySuccess>>()
801            .map_err(Error::BadJsonResponse)?
802            .into_result(status)?;
803        Ok(())
804    }
805
806    pub fn upload_ixp_document(
807        &self,
808        source_id: &SourceId,
809        filename: String,
810        bytes: Vec<u8>,
811    ) -> Result<CommentId> {
812        let endpoint = self.endpoints.ixp_documents(source_id)?;
813
814        let do_request = || {
815            let form = Form::new().part(
816                "file",
817                Part::bytes(bytes.clone()).file_name(filename.clone()),
818            );
819            let request = self
820                .http_client
821                .request(Method::PUT, endpoint.clone())
822                .multipart(form)
823                .headers(self.headers.clone());
824
825            request.send()
826        };
827
828        let result = self.with_retries(do_request);
829
830        let http_response = result.map_err(|source| Error::ReqwestError {
831            source,
832            message: "Operation failed.".to_string(),
833        })?;
834
835        let status = http_response.status();
836
837        Ok(http_response
838            .json::<Response<UploadIxpDocumentResponse>>()
839            .map_err(Error::BadJsonResponse)?
840            .into_result(status)?
841            .comment_id)
842    }
843
844    pub fn upload_comment_attachment(
845        &self,
846        source_id: &SourceId,
847        comment_id: &CommentId,
848        attachment_index: usize,
849        attachment: &PathBuf,
850    ) -> Result<UploadAttachmentResponse> {
851        let url = self
852            .endpoints
853            .attachment_upload(source_id, comment_id, attachment_index)?;
854
855        if !attachment.is_file() || !attachment.exists() {
856            return Err(Error::FileDoesNotExist {
857                path: attachment.clone(),
858            });
859        }
860
861        let do_request = || {
862            let form = Form::new()
863                .file("file", attachment)
864                .map_err(|source| Error::Unknown {
865                    message: "PUT comment attachment operation failed".to_owned(),
866                    source: source.into(),
867                })
868                .unwrap();
869            let request = self
870                .http_client
871                .request(Method::PUT, url.clone())
872                .multipart(form)
873                .headers(self.headers.clone());
874
875            request.send()
876        };
877
878        let result = self.with_retries(do_request);
879
880        let http_response = result.map_err(|source| Error::ReqwestError {
881            source,
882            message: "Operation failed.".to_string(),
883        })?;
884
885        let status = http_response.status();
886
887        http_response
888            .json::<Response<UploadAttachmentResponse>>()
889            .map_err(Error::BadJsonResponse)?
890            .into_result(status)
891    }
892
893    pub fn get_ixp_document(
894        &self,
895        source_id: &SourceId,
896        comment_id: &CommentId,
897    ) -> Result<Vec<u8>> {
898        self.get_octet_stream(&self.endpoints.ixp_document(source_id, comment_id)?)
899    }
900
901    fn get_octet_stream(&self, endpoint: &Url) -> Result<Vec<u8>> {
902        let mut response = self.raw_request(
903            &Method::GET,
904            endpoint,
905            &None::<()>,
906            &None::<()>,
907            &Retry::Yes,
908            None,
909        )?;
910
911        let status = response.status();
912
913        if !status.is_success() {
914            return Err(Error::Api {
915                status_code: status,
916                message: "Bad status code when getting octet stream".to_string(),
917            });
918        }
919
920        let mut buffer = Vec::new();
921
922        response
923            .read_to_end(&mut buffer)
924            .map_err(|source| Error::Unknown {
925                message: "Failed to read buffer".to_string(),
926                source: Box::new(source),
927            })?;
928        Ok(buffer)
929    }
930
931    pub fn get_attachment(&self, reference: &AttachmentReference) -> Result<Vec<u8>> {
932        self.get_octet_stream(&self.endpoints.attachment_reference(reference)?)
933    }
934
935    pub fn get_integrations(&self) -> Result<Vec<Integration>> {
936        Ok(self
937            .get::<_, GetIntegrationsResponse>(self.endpoints.integrations()?)?
938            .integrations)
939    }
940
941    pub fn get_integration(&self, name: &IntegrationFullName) -> Result<Integration> {
942        Ok(self
943            .get::<_, GetIntegrationResponse>(self.endpoints.integration(name)?)?
944            .integration)
945    }
946
947    pub fn get_datasets(&self) -> Result<Vec<Dataset>> {
948        Ok(self
949            .get::<_, GetAvailableDatasetsResponse>(self.endpoints.datasets.clone())?
950            .datasets)
951    }
952
953    pub fn get_dataset<IdentifierT>(&self, dataset: IdentifierT) -> Result<Dataset>
954    where
955        IdentifierT: Into<DatasetIdentifier>,
956    {
957        Ok(match dataset.into() {
958            DatasetIdentifier::Id(dataset_id) => {
959                self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_id(&dataset_id)?)?
960                    .dataset
961            }
962            DatasetIdentifier::FullName(dataset_name) => {
963                self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_name(&dataset_name)?)?
964                    .dataset
965            }
966        })
967    }
968
969    /// Create a ixp dataset.
970    pub fn create_ixp_dataset(&self, dataset: IxpDatasetNew) -> Result<Dataset> {
971        Ok(self
972            .put::<_, _, CreateIxpDatasetResponse>(
973                self.endpoints.ixp_datasets()?,
974                CreateIxpDatasetRequest { dataset },
975            )?
976            .dataset)
977    }
978
979    /// Create a dataset.
980    pub fn create_dataset(
981        &self,
982        dataset_name: &DatasetFullName,
983        options: NewDataset<'_>,
984    ) -> Result<Dataset> {
985        Ok(self
986            .put::<_, _, CreateDatasetResponse>(
987                self.endpoints.dataset_by_name(dataset_name)?,
988                CreateDatasetRequest { dataset: options },
989            )?
990            .dataset)
991    }
992
993    /// Update a dataset.
994    pub fn update_dataset(
995        &self,
996        dataset_name: &DatasetFullName,
997        options: UpdateDataset<'_>,
998    ) -> Result<Dataset> {
999        Ok(self
1000            .post::<_, _, UpdateDatasetResponse>(
1001                self.endpoints.dataset_by_name(dataset_name)?,
1002                UpdateDatasetRequest { dataset: options },
1003                Retry::Yes,
1004            )?
1005            .dataset)
1006    }
1007
1008    pub fn delete_dataset<IdentifierT>(&self, dataset: IdentifierT) -> Result<()>
1009    where
1010        IdentifierT: Into<DatasetIdentifier>,
1011    {
1012        let dataset_id = match dataset.into() {
1013            DatasetIdentifier::Id(dataset_id) => dataset_id,
1014            dataset @ DatasetIdentifier::FullName(_) => self.get_dataset(dataset)?.id,
1015        };
1016        self.delete(self.endpoints.dataset_by_id(&dataset_id)?)
1017    }
1018
1019    /// Get labellings for a given a dataset and a list of comment UIDs.
1020    pub fn get_labellings<'a>(
1021        &self,
1022        dataset_name: &DatasetFullName,
1023        comment_uids: impl Iterator<Item = &'a CommentUid>,
1024    ) -> Result<Vec<AnnotatedComment>> {
1025        Ok(self
1026            .get_query::<_, _, GetAnnotationsResponse>(
1027                self.endpoints.get_labellings(dataset_name)?,
1028                Some(&id_list_query(comment_uids.into_iter().map(|id| &id.0))),
1029            )?
1030            .results)
1031    }
1032
1033    /// Iterate through all reviewed comments in a source.
1034    pub fn get_labellings_iter<'a>(
1035        &'a self,
1036        dataset_name: &'a DatasetFullName,
1037        source_id: &'a SourceId,
1038        return_predictions: bool,
1039        limit: Option<usize>,
1040    ) -> LabellingsIter<'a> {
1041        LabellingsIter::new(self, dataset_name, source_id, return_predictions, limit)
1042    }
1043
1044    /// Get reviewed comments in bulk
1045    pub fn get_labellings_in_bulk(
1046        &self,
1047        dataset_name: &DatasetFullName,
1048        query_parameters: GetLabellingsInBulk<'_>,
1049    ) -> Result<GetAnnotationsResponse> {
1050        self.get_query::<_, _, GetAnnotationsResponse>(
1051            self.endpoints.get_labellings(dataset_name)?,
1052            Some(&query_parameters),
1053        )
1054    }
1055
1056    /// Update labellings for a given a dataset and comment UID.
1057    pub fn update_labelling(
1058        &self,
1059        dataset_name: &DatasetFullName,
1060        comment_uid: &CommentUid,
1061        labelling: Option<&[NewLabelling]>,
1062        entities: Option<&NewEntities>,
1063        moon_forms: Option<&[NewMoonForm]>,
1064    ) -> Result<AnnotatedComment> {
1065        self.post::<_, _, AnnotatedComment>(
1066            self.endpoints.post_labelling(dataset_name, comment_uid)?,
1067            UpdateAnnotationsRequest {
1068                labelling,
1069                entities,
1070                moon_forms,
1071            },
1072            Retry::Yes,
1073        )
1074    }
1075
1076    /// Get predictions for a given a dataset, a model version, and a list of comment UIDs.
1077    pub fn get_comment_predictions<'a>(
1078        &self,
1079        dataset_name: &DatasetFullName,
1080        model_version: &ModelVersion,
1081        comment_uids: impl Iterator<Item = &'a CommentUid>,
1082        threshold: Option<CommentPredictionsThreshold>,
1083        labels: Option<Vec<TriggerLabelThreshold>>,
1084    ) -> Result<Vec<Prediction>> {
1085        Ok(self
1086            .post::<_, _, GetPredictionsResponse>(
1087                self.endpoints
1088                    .get_comment_predictions(dataset_name, model_version)?,
1089                GetCommentPredictionsRequest {
1090                    uids: comment_uids
1091                        .into_iter()
1092                        .map(|id| id.0.clone())
1093                        .collect::<Vec<_>>(),
1094
1095                    threshold,
1096                    labels,
1097                },
1098                Retry::Yes,
1099            )?
1100            .predictions)
1101    }
1102
1103    pub fn get_streams(&self, dataset_name: &DatasetFullName) -> Result<Vec<Stream>> {
1104        Ok(self
1105            .get::<_, GetStreamsResponse>(self.endpoints.streams(dataset_name)?)?
1106            .streams)
1107    }
1108
1109    pub fn get_recent_comments(
1110        &self,
1111        dataset_name: &DatasetFullName,
1112        filter: &CommentFilter,
1113        limit: usize,
1114        continuation: Option<&Continuation>,
1115    ) -> Result<RecentCommentsPage> {
1116        self.post::<_, _, RecentCommentsPage>(
1117            self.endpoints.recent_comments(dataset_name)?,
1118            GetRecentRequest {
1119                limit,
1120                filter,
1121                continuation,
1122            },
1123            Retry::No,
1124        )
1125    }
1126
1127    pub fn refresh_user_permissions(&self) -> Result<RefreshUserPermissionsResponse> {
1128        self.post::<_, _, RefreshUserPermissionsResponse>(
1129            self.endpoints.refresh_user_permissions()?,
1130            RefreshUserPermissionsRequest {},
1131            Retry::Yes,
1132        )
1133    }
1134
1135    pub fn get_current_user(&self) -> Result<User> {
1136        Ok(self
1137            .get::<_, GetCurrentUserResponse>(self.endpoints.current_user.clone())?
1138            .user)
1139    }
1140
1141    pub fn get_users(&self) -> Result<Vec<User>> {
1142        Ok(self
1143            .get::<_, GetAvailableUsersResponse>(self.endpoints.users.clone())?
1144            .users)
1145    }
1146
1147    pub fn create_user(&self, user: NewUser<'_>) -> Result<User> {
1148        Ok(self
1149            .put::<_, _, CreateUserResponse>(
1150                self.endpoints.users.clone(),
1151                CreateUserRequest { user },
1152            )?
1153            .user)
1154    }
1155
1156    pub fn dataset_summary(
1157        &self,
1158        dataset_name: &DatasetFullName,
1159        params: &SummaryRequestParams,
1160    ) -> Result<SummaryResponse> {
1161        self.post::<_, _, SummaryResponse>(
1162            self.endpoints.dataset_summary(dataset_name)?,
1163            serde_json::to_value(params).expect("summary params serialization error"),
1164            Retry::Yes,
1165        )
1166    }
1167
1168    pub fn query_dataset_csv(
1169        &self,
1170        dataset_name: &DatasetFullName,
1171        params: &QueryRequestParams,
1172    ) -> Result<String> {
1173        let response = self
1174            .raw_request(
1175                &Method::POST,
1176                &self.endpoints.query_dataset(dataset_name)?,
1177                &Some(serde_json::to_value(params).expect("query params serialization error")),
1178                &None::<()>,
1179                &Retry::Yes,
1180                Some(HeaderValue::from_str("text/csv").expect("Could not parse csv header")),
1181            )?
1182            .text()
1183            .expect("Could not get csv text");
1184
1185        Ok(response)
1186    }
1187
1188    pub fn query_dataset(
1189        &self,
1190        dataset_name: &DatasetFullName,
1191        params: &QueryRequestParams,
1192    ) -> Result<QueryResponse> {
1193        self.post::<_, _, QueryResponse>(
1194            self.endpoints.query_dataset(dataset_name)?,
1195            serde_json::to_value(params).expect("query params serialization error"),
1196            Retry::Yes,
1197        )
1198    }
1199
1200    pub fn send_welcome_email(&self, user_id: UserId) -> Result<()> {
1201        self.post::<_, _, WelcomeEmailResponse>(
1202            self.endpoints.welcome_email(&user_id)?,
1203            json!({}),
1204            Retry::No,
1205        )?;
1206        Ok(())
1207    }
1208
1209    pub fn get_bucket_statistics(&self, bucket_name: &BucketFullName) -> Result<BucketStatistics> {
1210        Ok(self
1211            .get::<_, GetBucketStatisticsResponse>(self.endpoints.bucket_statistics(bucket_name)?)?
1212            .statistics)
1213    }
1214
1215    pub fn get_dataset_statistics(
1216        &self,
1217        dataset_name: &DatasetFullName,
1218        params: &DatasetStatisticsRequestParams,
1219    ) -> Result<CommentStatistics> {
1220        Ok(self
1221            .post::<_, _, GetStatisticsResponse>(
1222                self.endpoints.dataset_statistics(dataset_name)?,
1223                serde_json::to_value(params)
1224                    .expect("dataset statistics params serialization error"),
1225                Retry::No,
1226            )?
1227            .statistics)
1228    }
1229
1230    pub fn get_source_statistics(
1231        &self,
1232        source_name: &SourceFullName,
1233        params: &SourceStatisticsRequestParams,
1234    ) -> Result<CommentStatistics> {
1235        Ok(self
1236            .post::<_, _, GetStatisticsResponse>(
1237                self.endpoints.source_statistics(source_name)?,
1238                serde_json::to_value(params).expect("source statistics params serialization error"),
1239                Retry::No,
1240            )?
1241            .statistics)
1242    }
1243
1244    /// Create a new bucket.
1245    pub fn create_bucket(
1246        &self,
1247        bucket_name: &BucketFullName,
1248        options: NewBucket<'_>,
1249    ) -> Result<Bucket> {
1250        Ok(self
1251            .put::<_, _, CreateBucketResponse>(
1252                self.endpoints.bucket_by_name(bucket_name)?,
1253                CreateBucketRequest { bucket: options },
1254            )?
1255            .bucket)
1256    }
1257
1258    pub fn get_buckets(&self) -> Result<Vec<Bucket>> {
1259        Ok(self
1260            .get::<_, GetAvailableBucketsResponse>(self.endpoints.buckets.clone())?
1261            .buckets)
1262    }
1263
1264    pub fn get_bucket<IdentifierT>(&self, bucket: IdentifierT) -> Result<Bucket>
1265    where
1266        IdentifierT: Into<BucketIdentifier>,
1267    {
1268        Ok(match bucket.into() {
1269            BucketIdentifier::Id(bucket_id) => {
1270                self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_id(&bucket_id)?)?
1271                    .bucket
1272            }
1273            BucketIdentifier::FullName(bucket_name) => {
1274                self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_name(&bucket_name)?)?
1275                    .bucket
1276            }
1277        })
1278    }
1279
1280    pub fn delete_bucket<IdentifierT>(&self, bucket: IdentifierT) -> Result<()>
1281    where
1282        IdentifierT: Into<BucketIdentifier>,
1283    {
1284        let bucket_id = match bucket.into() {
1285            BucketIdentifier::Id(bucket_id) => bucket_id,
1286            bucket @ BucketIdentifier::FullName(_) => self.get_bucket(bucket)?.id,
1287        };
1288        self.delete(self.endpoints.bucket_by_id(&bucket_id)?)
1289    }
1290
1291    pub fn fetch_stream_comments(
1292        &self,
1293        stream_name: &StreamFullName,
1294        size: u32,
1295    ) -> Result<StreamBatch> {
1296        self.post(
1297            self.endpoints.stream_fetch(stream_name)?,
1298            StreamFetchRequest { size },
1299            Retry::No,
1300        )
1301    }
1302
1303    pub fn get_stream(&self, stream_name: &StreamFullName) -> Result<Stream> {
1304        Ok(self
1305            .get::<_, GetStreamResponse>(self.endpoints.stream(stream_name)?)?
1306            .stream)
1307    }
1308
1309    pub fn advance_stream(
1310        &self,
1311        stream_name: &StreamFullName,
1312        sequence_id: StreamSequenceId,
1313    ) -> Result<()> {
1314        self.post::<_, _, serde::de::IgnoredAny>(
1315            self.endpoints.stream_advance(stream_name)?,
1316            StreamAdvanceRequest { sequence_id },
1317            Retry::No,
1318        )?;
1319        Ok(())
1320    }
1321
1322    pub fn reset_stream(
1323        &self,
1324        stream_name: &StreamFullName,
1325        to_comment_created_at: DateTime<Utc>,
1326    ) -> Result<()> {
1327        self.post::<_, _, serde::de::IgnoredAny>(
1328            self.endpoints.stream_reset(stream_name)?,
1329            StreamResetRequest {
1330                to_comment_created_at,
1331            },
1332            Retry::No,
1333        )?;
1334        Ok(())
1335    }
1336
1337    pub fn tag_stream_exceptions(
1338        &self,
1339        stream_name: &StreamFullName,
1340        exceptions: &[StreamException],
1341    ) -> Result<()> {
1342        self.put::<_, _, serde::de::IgnoredAny>(
1343            self.endpoints.stream_exceptions(stream_name)?,
1344            TagStreamExceptionsRequest { exceptions },
1345        )?;
1346        Ok(())
1347    }
1348
1349    /// Gets a project.
1350    pub fn get_project(&self, project_name: &ProjectName) -> Result<Project> {
1351        let response =
1352            self.get::<_, GetProjectResponse>(self.endpoints.project_by_name(project_name)?)?;
1353        Ok(response.project)
1354    }
1355
1356    /// Gets all projects.
1357    pub fn get_projects(&self) -> Result<Vec<Project>> {
1358        let response = self.get::<_, GetProjectsResponse>(self.endpoints.projects.clone())?;
1359        Ok(response.projects)
1360    }
1361
1362    /// Creates a new project.
1363    pub fn create_project(
1364        &self,
1365        project_name: &ProjectName,
1366        options: NewProject,
1367        user_ids: &[UserId],
1368    ) -> Result<Project> {
1369        Ok(self
1370            .put::<_, _, CreateProjectResponse>(
1371                self.endpoints.project_by_name(project_name)?,
1372                CreateProjectRequest {
1373                    project: options,
1374                    user_ids,
1375                },
1376            )?
1377            .project)
1378    }
1379
1380    /// Updates an existing project.
1381    pub fn update_project(
1382        &self,
1383        project_name: &ProjectName,
1384        options: UpdateProject,
1385    ) -> Result<Project> {
1386        Ok(self
1387            .post::<_, _, UpdateProjectResponse>(
1388                self.endpoints.project_by_name(project_name)?,
1389                UpdateProjectRequest { project: options },
1390                Retry::Yes,
1391            )?
1392            .project)
1393    }
1394
1395    /// Deletes an existing project.
1396    pub fn delete_project(
1397        &self,
1398        project_name: &ProjectName,
1399        force_delete: ForceDeleteProject,
1400    ) -> Result<()> {
1401        let endpoint = self.endpoints.project_by_name(project_name)?;
1402        match force_delete {
1403            ForceDeleteProject::No => self.delete(endpoint)?,
1404            ForceDeleteProject::Yes => {
1405                self.delete_query(endpoint, Some(&json!({ "force": true })))?
1406            }
1407        };
1408        Ok(())
1409    }
1410
1411    fn get<LocationT, SuccessT>(&self, url: LocationT) -> Result<SuccessT>
1412    where
1413        LocationT: IntoUrl + Display + Clone,
1414        for<'de> SuccessT: Deserialize<'de>,
1415    {
1416        self.request(&Method::GET, &url, &None::<()>, &None::<()>, &Retry::Yes)
1417    }
1418
1419    fn get_query<LocationT, QueryT, SuccessT>(
1420        &self,
1421        url: LocationT,
1422        query: Option<&QueryT>,
1423    ) -> Result<SuccessT>
1424    where
1425        LocationT: IntoUrl + Display + Clone,
1426        QueryT: Serialize,
1427        for<'de> SuccessT: Deserialize<'de>,
1428    {
1429        self.request(&Method::GET, &url, &None::<()>, &Some(query), &Retry::Yes)
1430    }
1431
1432    fn delete<LocationT>(&self, url: LocationT) -> Result<()>
1433    where
1434        LocationT: IntoUrl + Display + Clone,
1435    {
1436        self.delete_query::<LocationT, ()>(url, None)
1437    }
1438
1439    fn delete_query<LocationT, QueryT>(&self, url: LocationT, query: Option<&QueryT>) -> Result<()>
1440    where
1441        LocationT: IntoUrl + Display + Clone,
1442        QueryT: Serialize,
1443    {
1444        debug!("Attempting DELETE `{url}`");
1445
1446        let attempts = Cell::new(0);
1447        let http_response = self
1448            .with_retries(|| {
1449                attempts.set(attempts.get() + 1);
1450
1451                let mut request = self
1452                    .http_client
1453                    .delete(url.clone())
1454                    .headers(self.headers.clone());
1455                if let Some(query) = query {
1456                    request = request.query(query);
1457                }
1458                request.send()
1459            })
1460            .map_err(|source| Error::ReqwestError {
1461                source,
1462                message: "DELETE operation failed.".to_owned(),
1463            })?;
1464        let status = http_response.status();
1465        http_response
1466            .json::<Response<EmptySuccess>>()
1467            .map_err(Error::BadJsonResponse)?
1468            .into_result(status)
1469            .map_or_else(
1470                // Ignore 404 not found if the request had to be re-tried - assume the target
1471                // object was deleted on a previous incomplete request.
1472                |error| {
1473                    if attempts.get() > 1 && status == reqwest::StatusCode::NOT_FOUND {
1474                        Ok(())
1475                    } else {
1476                        Err(error)
1477                    }
1478                },
1479                |_| Ok(()),
1480            )
1481    }
1482
1483    fn post<LocationT, RequestT, SuccessT>(
1484        &self,
1485        url: LocationT,
1486        request: RequestT,
1487        retry: Retry,
1488    ) -> Result<SuccessT>
1489    where
1490        LocationT: IntoUrl + Display + Clone,
1491        RequestT: Serialize,
1492        for<'de> SuccessT: Deserialize<'de>,
1493    {
1494        self.request(&Method::POST, &url, &Some(request), &None::<()>, &retry)
1495    }
1496
1497    fn put<LocationT, RequestT, SuccessT>(
1498        &self,
1499        url: LocationT,
1500        request: RequestT,
1501    ) -> Result<SuccessT>
1502    where
1503        LocationT: IntoUrl + Display + Clone,
1504        RequestT: Serialize,
1505        for<'de> SuccessT: Deserialize<'de>,
1506    {
1507        self.request(&Method::PUT, &url, &Some(request), &None::<()>, &Retry::Yes)
1508    }
1509
1510    fn raw_request<LocationT, RequestT, QueryT>(
1511        &self,
1512        method: &Method,
1513        url: &LocationT,
1514        body: &Option<RequestT>,
1515        query: &Option<QueryT>,
1516        retry: &Retry,
1517        accept_header: Option<HeaderValue>,
1518    ) -> Result<reqwest::blocking::Response>
1519    where
1520        LocationT: IntoUrl + Display + Clone,
1521        RequestT: Serialize,
1522        QueryT: Serialize,
1523    {
1524        let mut headers = self.headers.clone();
1525
1526        if let Some(accept_header) = accept_header {
1527            headers.insert(ACCEPT, accept_header);
1528        }
1529
1530        let do_request = || {
1531            let request = self
1532                .http_client
1533                .request(method.clone(), url.clone())
1534                .headers(headers.clone());
1535
1536            let request = match &query {
1537                Some(query) => request.query(query),
1538                None => request,
1539            };
1540            let request = match &body {
1541                Some(body) => request.json(body),
1542                None => request,
1543            };
1544            request.send()
1545        };
1546
1547        let result = match retry {
1548            Retry::Yes => self.with_retries(do_request),
1549            Retry::No => do_request(),
1550        };
1551        let http_response = result.map_err(|source| Error::ReqwestError {
1552            source,
1553            message: format!("{method} operation failed."),
1554        })?;
1555
1556        Ok(http_response)
1557    }
1558
1559    fn splitable_request<LocationT, RequestT, SuccessT, QueryT>(
1560        &self,
1561        method: Method,
1562        url: LocationT,
1563        body: RequestT,
1564        query: Option<QueryT>,
1565        retry: Retry,
1566    ) -> SplitableRequestResponse<SuccessT>
1567    where
1568        LocationT: IntoUrl + Display + Clone,
1569        RequestT: Serialize + SplittableRequest + Clone,
1570        QueryT: Serialize + Clone,
1571        for<'de> SuccessT: Deserialize<'de> + ReducibleResponse + Clone + Default,
1572    {
1573        debug!("Attempting {method} `{url}`");
1574        let result: Result<SuccessT> =
1575            self.request(&method, &url, &Some(body.clone()), &query, &retry);
1576
1577        match result {
1578            Ok(response) => SplitableRequestResponse {
1579                response,
1580                num_failed: 0,
1581            },
1582            Err(_) => {
1583                let mut num_failed = 0;
1584                let response = body
1585                    .split()
1586                    .filter_map(|request| {
1587                        match self.request(&method, &url, &Some(request), &query, &retry) {
1588                            Ok(response) => Some(response),
1589                            Err(err) => {
1590                                debug!("{err}");
1591                                num_failed += 1;
1592                                None
1593                            }
1594                        }
1595                    })
1596                    .fold(SuccessT::empty(), |merged, next: SuccessT| {
1597                        merged.merge(next)
1598                    });
1599
1600                SplitableRequestResponse {
1601                    num_failed,
1602                    response,
1603                }
1604            }
1605        }
1606    }
1607
1608    fn request<LocationT, RequestT, SuccessT, QueryT>(
1609        &self,
1610        method: &Method,
1611        url: &LocationT,
1612        body: &Option<RequestT>,
1613        query: &Option<QueryT>,
1614        retry: &Retry,
1615    ) -> Result<SuccessT>
1616    where
1617        LocationT: IntoUrl + Display + Clone,
1618        RequestT: Serialize,
1619        QueryT: Serialize + Clone,
1620        for<'de> SuccessT: Deserialize<'de>,
1621    {
1622        debug!("Attempting {method} `{url}`");
1623        let http_response = self.raw_request(method, url, body, query, retry, None)?;
1624
1625        let status = http_response.status();
1626
1627        let response_text = http_response.text().map_err(|source| Error::ReqwestError {
1628            message: "Could not get request text".to_string(),
1629            source,
1630        })?;
1631
1632        let mut deserializer = serde_json::Deserializer::from_str(&response_text);
1633        deserializer.disable_recursion_limit();
1634
1635        Response::<SuccessT>::deserialize(&mut deserializer)
1636            .map_err(Error::BadSerdeJsonResponse)?
1637            .into_result(status)
1638    }
1639
1640    fn with_retries(
1641        &self,
1642        send_request: impl Fn() -> ReqwestResult<HttpResponse>,
1643    ) -> ReqwestResult<HttpResponse> {
1644        match &self.retrier {
1645            Some(retrier) => retrier.with_retries(send_request),
1646            None => send_request(),
1647        }
1648    }
1649}
1650
1651#[derive(Copy, Clone)]
1652enum Retry {
1653    Yes,
1654    No,
1655}
1656
1657pub struct DatasetQueryIter<'a> {
1658    client: &'a Client,
1659    dataset_name: &'a DatasetFullName,
1660    done: bool,
1661    params: &'a mut QueryRequestParams,
1662}
1663
1664impl<'a> DatasetQueryIter<'a> {
1665    fn new(
1666        client: &'a Client,
1667        dataset_name: &'a DatasetFullName,
1668        params: &'a mut QueryRequestParams,
1669    ) -> Self {
1670        Self {
1671            client,
1672            dataset_name,
1673            done: false,
1674            params,
1675        }
1676    }
1677}
1678
1679impl Iterator for DatasetQueryIter<'_> {
1680    type Item = Result<Vec<AnnotatedComment>>;
1681
1682    fn next(&mut self) -> Option<Self::Item> {
1683        if self.done {
1684            return None;
1685        }
1686
1687        let response = self.client.query_dataset(self.dataset_name, self.params);
1688        Some(response.map(|page| {
1689            self.params.continuation = page.continuation;
1690            self.done = self.params.continuation.is_none();
1691            page.results
1692        }))
1693    }
1694}
1695
1696pub enum ContinuationKind {
1697    Timestamp(DateTime<Utc>),
1698    Continuation(Continuation),
1699}
1700
1701pub struct EmailsIter<'a> {
1702    client: &'a Client,
1703    bucket_name: &'a BucketFullName,
1704    continuation: Option<EmailContinuation>,
1705    done: bool,
1706    page_size: usize,
1707}
1708
1709impl<'a> EmailsIter<'a> {
1710    // Default number of emails per page to request from API.
1711    pub const DEFAULT_PAGE_SIZE: usize = 64;
1712    // Maximum number of emails per page which can be requested from the API.
1713    pub const MAX_PAGE_SIZE: usize = 256;
1714
1715    fn new(client: &'a Client, bucket_name: &'a BucketFullName, page_size: Option<usize>) -> Self {
1716        Self {
1717            client,
1718            bucket_name,
1719            continuation: None,
1720            done: false,
1721            page_size: page_size.unwrap_or(Self::DEFAULT_PAGE_SIZE),
1722        }
1723    }
1724}
1725
1726impl Iterator for EmailsIter<'_> {
1727    type Item = Result<Vec<Email>>;
1728
1729    fn next(&mut self) -> Option<Self::Item> {
1730        if self.done {
1731            return None;
1732        }
1733        let response = self.client.get_emails_iter_page(
1734            self.bucket_name,
1735            self.continuation.as_ref(),
1736            self.page_size,
1737        );
1738        Some(response.map(|page| {
1739            self.continuation = page.continuation;
1740            self.done = self.continuation.is_none();
1741            page.emails
1742        }))
1743    }
1744}
1745
1746pub struct CommentsIter<'a> {
1747    client: &'a Client,
1748    source_name: &'a SourceFullName,
1749    continuation: Option<ContinuationKind>,
1750    done: bool,
1751    page_size: usize,
1752    to_timestamp: Option<DateTime<Utc>>,
1753}
1754
1755#[derive(Debug, Default)]
1756pub struct CommentsIterTimerange {
1757    pub from: Option<DateTime<Utc>>,
1758    pub to: Option<DateTime<Utc>>,
1759}
1760impl<'a> CommentsIter<'a> {
1761    // Default number of comments per page to request from API.
1762    pub const DEFAULT_PAGE_SIZE: usize = 64;
1763    // Maximum number of comments per page which can be requested from the API.
1764    pub const MAX_PAGE_SIZE: usize = 256;
1765
1766    fn new(
1767        client: &'a Client,
1768        source_name: &'a SourceFullName,
1769        page_size: Option<usize>,
1770        timerange: CommentsIterTimerange,
1771    ) -> Self {
1772        let (from_timestamp, to_timestamp) = (timerange.from, timerange.to);
1773        Self {
1774            client,
1775            source_name,
1776            to_timestamp,
1777            continuation: from_timestamp.map(ContinuationKind::Timestamp),
1778            done: false,
1779            page_size: page_size.unwrap_or(Self::DEFAULT_PAGE_SIZE),
1780        }
1781    }
1782}
1783
1784impl Iterator for CommentsIter<'_> {
1785    type Item = Result<Vec<Comment>>;
1786
1787    fn next(&mut self) -> Option<Self::Item> {
1788        if self.done {
1789            return None;
1790        }
1791        let response = self.client.get_comments_iter_page(
1792            self.source_name,
1793            self.continuation.as_ref(),
1794            self.to_timestamp,
1795            self.page_size,
1796        );
1797        Some(response.map(|page| {
1798            self.continuation = page.continuation.map(ContinuationKind::Continuation);
1799            self.done = self.continuation.is_none();
1800            page.comments
1801        }))
1802    }
1803}
1804
1805pub struct LabellingsIter<'a> {
1806    client: &'a Client,
1807    dataset_name: &'a DatasetFullName,
1808    source_id: &'a SourceId,
1809    return_predictions: bool,
1810    after: Option<GetLabellingsAfter>,
1811    limit: Option<usize>,
1812    done: bool,
1813}
1814
1815impl<'a> LabellingsIter<'a> {
1816    fn new(
1817        client: &'a Client,
1818        dataset_name: &'a DatasetFullName,
1819        source_id: &'a SourceId,
1820        return_predictions: bool,
1821        limit: Option<usize>,
1822    ) -> Self {
1823        Self {
1824            client,
1825            dataset_name,
1826            source_id,
1827            return_predictions,
1828            after: None,
1829            limit,
1830            done: false,
1831        }
1832    }
1833}
1834
1835impl Iterator for LabellingsIter<'_> {
1836    type Item = Result<Vec<AnnotatedComment>>;
1837
1838    fn next(&mut self) -> Option<Self::Item> {
1839        if self.done {
1840            return None;
1841        }
1842        let response = self.client.get_labellings_in_bulk(
1843            self.dataset_name,
1844            GetLabellingsInBulk {
1845                source_id: self.source_id,
1846                return_predictions: &self.return_predictions,
1847                after: &self.after,
1848                limit: &self.limit,
1849            },
1850        );
1851        Some(response.map(|page| {
1852            if self.after == page.after && !page.results.is_empty() {
1853                panic!("Labellings API did not increment pagination continuation");
1854            }
1855            self.after = page.after;
1856            if page.results.is_empty() {
1857                self.done = true;
1858            }
1859            page.results
1860        }))
1861    }
1862}
1863
1864#[derive(Debug)]
1865struct Endpoints {
1866    base: Url,
1867    datasets: Url,
1868    sources: Url,
1869    buckets: Url,
1870    users: Url,
1871    current_user: Url,
1872    projects: Url,
1873}
1874
1875#[derive(Debug, Serialize, Clone, Copy)]
1876struct NoChargeQuery {
1877    no_charge: bool,
1878}
1879
1880fn construct_endpoint(base: &Url, segments: &[&str]) -> Result<Url> {
1881    let mut endpoint = base.clone();
1882
1883    let mut endpoint_segments = endpoint
1884        .path_segments_mut()
1885        .map_err(|_| Error::BadEndpoint {
1886            endpoint: base.clone(),
1887        })?;
1888
1889    for segment in segments {
1890        endpoint_segments.push(segment);
1891    }
1892
1893    drop(endpoint_segments);
1894
1895    Ok(endpoint)
1896}
1897
1898impl Endpoints {
1899    pub fn new(base: Url) -> Result<Self> {
1900        let datasets = construct_endpoint(&base, &["api", "v1", "datasets"])?;
1901        let sources = construct_endpoint(&base, &["api", "v1", "sources"])?;
1902        let buckets = construct_endpoint(&base, &["api", "_private", "buckets"])?;
1903        let users = construct_endpoint(&base, &["api", "_private", "users"])?;
1904        let current_user = construct_endpoint(&base, &["auth", "user"])?;
1905        let projects = construct_endpoint(&base, &["api", "_private", "projects"])?;
1906
1907        Ok(Endpoints {
1908            base,
1909            datasets,
1910            sources,
1911            buckets,
1912            users,
1913            current_user,
1914            projects,
1915        })
1916    }
1917
1918    fn refresh_user_permissions(&self) -> Result<Url> {
1919        construct_endpoint(&self.base, &["auth", "refresh-user-permissions"])
1920    }
1921
1922    fn label_group(
1923        &self,
1924        dataset_name: &DatasetFullName,
1925        label_group: LabelGroupName,
1926    ) -> Result<Url> {
1927        construct_endpoint(
1928            &self.base,
1929            &[
1930                "api",
1931                "_private",
1932                "datasets",
1933                &dataset_name.0,
1934                "labels",
1935                &label_group.0,
1936            ],
1937        )
1938    }
1939
1940    fn ixp_datasets(&self) -> Result<Url> {
1941        construct_endpoint(&self.base, &["api", "_private", "ixp", "datasets"])
1942    }
1943
1944    fn ixp_documents(&self, source_id: &SourceId) -> Result<Url> {
1945        construct_endpoint(
1946            &self.base,
1947            &[
1948                "api",
1949                "_private",
1950                "sources",
1951                &format!("id:{0}", source_id.0),
1952                "documents",
1953            ],
1954        )
1955    }
1956
1957    fn ixp_document(&self, source_id: &SourceId, comment_id: &CommentId) -> Result<Url> {
1958        construct_endpoint(
1959            &self.base,
1960            &[
1961                "api",
1962                "_private",
1963                "sources",
1964                &format!("id:{0}", source_id.0),
1965                "documents",
1966                &comment_id.0,
1967            ],
1968        )
1969    }
1970
1971    fn keyed_sync_states(&self, bucket_id: &BucketId) -> Result<Url> {
1972        construct_endpoint(
1973            &self.base,
1974            &[
1975                "api",
1976                "_private",
1977                "buckets",
1978                &format!("id:{}", bucket_id.0),
1979                "keyed-sync-states/",
1980            ],
1981        )
1982    }
1983
1984    fn keyed_sync_state(&self, bucket_id: &BucketId, id: &KeyedSyncStateId) -> Result<Url> {
1985        construct_endpoint(
1986            &self.base,
1987            &[
1988                "api",
1989                "_private",
1990                "buckets",
1991                &format!("id:{}", bucket_id.0),
1992                "keyed-sync-state",
1993                &id.0,
1994            ],
1995        )
1996    }
1997
1998    fn query_keyed_sync_state_ids(&self, bucket_id: &BucketId) -> Result<Url> {
1999        construct_endpoint(
2000            &self.base,
2001            &[
2002                "api",
2003                "_private",
2004                "buckets",
2005                &format!("id:{}", bucket_id.0),
2006                "keyed-sync-state-ids",
2007            ],
2008        )
2009    }
2010
2011    fn audit_events_query(&self) -> Result<Url> {
2012        construct_endpoint(&self.base, &["api", "v1", "audit_events", "query"])
2013    }
2014
2015    fn integrations(&self) -> Result<Url> {
2016        construct_endpoint(&self.base, &["api", "_private", "integrations"])
2017    }
2018
2019    fn integration(&self, name: &IntegrationFullName) -> Result<Url> {
2020        construct_endpoint(&self.base, &["api", "_private", "integrations", &name.0])
2021    }
2022
2023    fn attachment_reference(&self, reference: &AttachmentReference) -> Result<Url> {
2024        construct_endpoint(&self.base, &["api", "v1", "attachments", &reference.0])
2025    }
2026
2027    fn attachment_upload(
2028        &self,
2029        source_id: &SourceId,
2030        comment_id: &CommentId,
2031        attachment_index: usize,
2032    ) -> Result<Url> {
2033        construct_endpoint(
2034            &self.base,
2035            &[
2036                "api",
2037                "_private",
2038                "sources",
2039                &format!("id:{}", source_id.0),
2040                "comments",
2041                &comment_id.0,
2042                "attachments",
2043                &attachment_index.to_string(),
2044            ],
2045        )
2046    }
2047    fn latest_validation(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2048        construct_endpoint(
2049            &self.base,
2050            &[
2051                "api",
2052                "_private",
2053                "datasets",
2054                &dataset_name.0,
2055                "labellers",
2056                "latest",
2057                "validation",
2058            ],
2059        )
2060    }
2061
2062    fn validation(
2063        &self,
2064        dataset_name: &DatasetFullName,
2065        model_version: &ModelVersion,
2066    ) -> Result<Url> {
2067        construct_endpoint(
2068            &self.base,
2069            &[
2070                "api",
2071                "_private",
2072                "datasets",
2073                &dataset_name.0,
2074                "labellers",
2075                &model_version.0.to_string(),
2076                "validation",
2077            ],
2078        )
2079    }
2080
2081    fn label_validation(
2082        &self,
2083        dataset_name: &DatasetFullName,
2084        model_version: &ModelVersion,
2085    ) -> Result<Url> {
2086        construct_endpoint(
2087            &self.base,
2088            &[
2089                "api",
2090                "_private",
2091                "datasets",
2092                &dataset_name.0,
2093                "labellers",
2094                &model_version.0.to_string(),
2095                "label-validation",
2096            ],
2097        )
2098    }
2099    fn bucket_statistics(&self, bucket_name: &BucketFullName) -> Result<Url> {
2100        construct_endpoint(
2101            &self.base,
2102            &["api", "_private", "buckets", &bucket_name.0, "statistics"],
2103        )
2104    }
2105
2106    fn dataset_summary(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2107        construct_endpoint(
2108            &self.base,
2109            &["api", "_private", "datasets", &dataset_name.0, "summary"],
2110        )
2111    }
2112
2113    fn query_dataset(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2114        construct_endpoint(
2115            &self.base,
2116            &["api", "_private", "datasets", &dataset_name.0, "query"],
2117        )
2118    }
2119
2120    fn streams(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2121        construct_endpoint(
2122            &self.base,
2123            &["api", "v1", "datasets", &dataset_name.0, "streams"],
2124        )
2125    }
2126
2127    fn stream(&self, stream_name: &StreamFullName) -> Result<Url> {
2128        construct_endpoint(
2129            &self.base,
2130            &[
2131                "api",
2132                "v1",
2133                "datasets",
2134                &stream_name.dataset.0,
2135                "streams",
2136                &stream_name.stream.0,
2137            ],
2138        )
2139    }
2140
2141    fn stream_fetch(&self, stream_name: &StreamFullName) -> Result<Url> {
2142        construct_endpoint(
2143            &self.base,
2144            &[
2145                "api",
2146                "v1",
2147                "datasets",
2148                &stream_name.dataset.0,
2149                "streams",
2150                &stream_name.stream.0,
2151                "fetch",
2152            ],
2153        )
2154    }
2155
2156    fn stream_advance(&self, stream_name: &StreamFullName) -> Result<Url> {
2157        construct_endpoint(
2158            &self.base,
2159            &[
2160                "api",
2161                "v1",
2162                "datasets",
2163                &stream_name.dataset.0,
2164                "streams",
2165                &stream_name.stream.0,
2166                "advance",
2167            ],
2168        )
2169    }
2170
2171    fn stream_reset(&self, stream_name: &StreamFullName) -> Result<Url> {
2172        construct_endpoint(
2173            &self.base,
2174            &[
2175                "api",
2176                "v1",
2177                "datasets",
2178                &stream_name.dataset.0,
2179                "streams",
2180                &stream_name.stream.0,
2181                "reset",
2182            ],
2183        )
2184    }
2185
2186    fn stream_exceptions(&self, stream_name: &StreamFullName) -> Result<Url> {
2187        construct_endpoint(
2188            &self.base,
2189            &[
2190                "api",
2191                "v1",
2192                "datasets",
2193                &stream_name.dataset.0,
2194                "streams",
2195                &stream_name.stream.0,
2196                "exceptions",
2197            ],
2198        )
2199    }
2200
2201    fn recent_comments(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2202        construct_endpoint(
2203            &self.base,
2204            &["api", "_private", "datasets", &dataset_name.0, "recent"],
2205        )
2206    }
2207
2208    fn dataset_statistics(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2209        construct_endpoint(
2210            &self.base,
2211            &["api", "_private", "datasets", &dataset_name.0, "statistics"],
2212        )
2213    }
2214
2215    fn source_statistics(&self, source_name: &SourceFullName) -> Result<Url> {
2216        construct_endpoint(
2217            &self.base,
2218            &["api", "v1", "sources", &source_name.0, "statistics"],
2219        )
2220    }
2221
2222    fn user_by_id(&self, user_id: &UserId) -> Result<Url> {
2223        construct_endpoint(&self.base, &["api", "_private", "users", &user_id.0])
2224    }
2225
2226    fn source_by_id(&self, source_id: &SourceId) -> Result<Url> {
2227        construct_endpoint(
2228            &self.base,
2229            &["api", "v1", "sources", &format!("id:{}", source_id.0)],
2230        )
2231    }
2232
2233    fn source_by_name(&self, source_name: &SourceFullName) -> Result<Url> {
2234        construct_endpoint(&self.base, &["api", "v1", "sources", &source_name.0])
2235    }
2236
2237    fn quotas(&self, tenant_id: &Option<UiPathTenantId>) -> Result<Url> {
2238        if let Some(tenant_id) = tenant_id {
2239            construct_endpoint(&self.base, &["api", "_private", "quotas", &tenant_id.0])
2240        } else {
2241            construct_endpoint(&self.base, &["api", "_private", "quotas"])
2242        }
2243    }
2244
2245    fn quota(&self, tenant_id: &TenantId, tenant_quota_kind: TenantQuotaKind) -> Result<Url> {
2246        construct_endpoint(
2247            &self.base,
2248            &[
2249                "api",
2250                "_private",
2251                "quotas",
2252                &tenant_id.to_string(),
2253                &tenant_quota_kind.to_string(),
2254            ],
2255        )
2256    }
2257
2258    fn put_comments(&self, source_name: &SourceFullName) -> Result<Url> {
2259        construct_endpoint(
2260            &self.base,
2261            &["api", "_private", "sources", &source_name.0, "comments"],
2262        )
2263    }
2264
2265    fn comments(&self, source_name: &SourceFullName) -> Result<Url> {
2266        construct_endpoint(
2267            &self.base,
2268            &["api", "_private", "sources", &source_name.0, "comments"],
2269        )
2270    }
2271
2272    fn comment_by_id(&self, source_name: &SourceFullName, comment_id: &CommentId) -> Result<Url> {
2273        construct_endpoint(
2274            &self.base,
2275            &[
2276                "api",
2277                "v1",
2278                "sources",
2279                &source_name.0,
2280                "comments",
2281                &comment_id.0,
2282            ],
2283        )
2284    }
2285
2286    fn comments_v1(&self, source_name: &SourceFullName) -> Result<Url> {
2287        construct_endpoint(
2288            &self.base,
2289            &["api", "v1", "sources", &source_name.0, "comments"],
2290        )
2291    }
2292
2293    fn sync_comments(&self, source_name: &SourceFullName) -> Result<Url> {
2294        construct_endpoint(
2295            &self.base,
2296            &["api", "v1", "sources", &source_name.0, "sync"],
2297        )
2298    }
2299
2300    fn sync_comments_raw_emails(&self, source_name: &SourceFullName) -> Result<Url> {
2301        construct_endpoint(
2302            &self.base,
2303            &["api", "v1", "sources", &source_name.0, "sync-raw-emails"],
2304        )
2305    }
2306
2307    fn comment_audio(&self, source_id: &SourceId, comment_id: &CommentId) -> Result<Url> {
2308        construct_endpoint(
2309            &self.base,
2310            &[
2311                "api",
2312                "_private",
2313                "sources",
2314                &format!("id:{}", source_id.0),
2315                "comments",
2316                &comment_id.0,
2317                "audio",
2318            ],
2319        )
2320    }
2321
2322    fn get_emails(&self, bucket_name: &BucketFullName) -> Result<Url> {
2323        construct_endpoint(
2324            &self.base,
2325            &["api", "_private", "buckets", &bucket_name.0, "emails"],
2326        )
2327    }
2328
2329    fn put_emails(&self, bucket_name: &BucketFullName) -> Result<Url> {
2330        construct_endpoint(
2331            &self.base,
2332            &["api", "_private", "buckets", &bucket_name.0, "emails"],
2333        )
2334    }
2335
2336    fn post_user(&self, user_id: &UserId) -> Result<Url> {
2337        construct_endpoint(&self.base, &["api", "_private", "users", &user_id.0])
2338    }
2339
2340    fn dataset_by_id(&self, dataset_id: &DatasetId) -> Result<Url> {
2341        construct_endpoint(
2342            &self.base,
2343            &["api", "v1", "datasets", &format!("id:{}", dataset_id.0)],
2344        )
2345    }
2346
2347    fn dataset_by_name(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2348        construct_endpoint(&self.base, &["api", "v1", "datasets", &dataset_name.0])
2349    }
2350
2351    fn get_labellings(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2352        construct_endpoint(
2353            &self.base,
2354            &["api", "_private", "datasets", &dataset_name.0, "labellings"],
2355        )
2356    }
2357
2358    fn labellers(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2359        construct_endpoint(
2360            &self.base,
2361            &["api", "_private", "datasets", &dataset_name.0, "labellers"],
2362        )
2363    }
2364
2365    fn get_comment_predictions(
2366        &self,
2367        dataset_name: &DatasetFullName,
2368        model_version: &ModelVersion,
2369    ) -> Result<Url> {
2370        construct_endpoint(
2371            &self.base,
2372            &[
2373                "api",
2374                "v1",
2375                "datasets",
2376                &dataset_name.0,
2377                "labellers",
2378                &model_version.0.to_string(),
2379                "predict-comments",
2380            ],
2381        )
2382    }
2383
2384    fn post_labelling(
2385        &self,
2386        dataset_name: &DatasetFullName,
2387        comment_uid: &CommentUid,
2388    ) -> Result<Url> {
2389        construct_endpoint(
2390            &self.base,
2391            &[
2392                "api",
2393                "_private",
2394                "datasets",
2395                &dataset_name.0,
2396                "labellings",
2397                &comment_uid.0,
2398            ],
2399        )
2400    }
2401
2402    fn bucket_by_id(&self, bucket_id: &BucketId) -> Result<Url> {
2403        construct_endpoint(
2404            &self.base,
2405            &["api", "_private", "buckets", &format!("id:{}", bucket_id.0)],
2406        )
2407    }
2408
2409    fn bucket_by_name(&self, bucket_name: &BucketFullName) -> Result<Url> {
2410        construct_endpoint(&self.base, &["api", "_private", "buckets", &bucket_name.0])
2411    }
2412
2413    fn project_by_name(&self, project_name: &ProjectName) -> Result<Url> {
2414        construct_endpoint(
2415            &self.base,
2416            &["api", "_private", "projects", &project_name.0],
2417        )
2418    }
2419
2420    fn welcome_email(&self, user_id: &UserId) -> Result<Url> {
2421        construct_endpoint(
2422            &self.base,
2423            &["api", "_private", "users", &user_id.0, "welcome-email"],
2424        )
2425    }
2426}
2427
2428const DEFAULT_HTTP_TIMEOUT_SECONDS: u64 = 240;
2429
2430fn build_http_client(config: &Config) -> Result<HttpClient> {
2431    let mut builder = HttpClient::builder()
2432        .gzip(true)
2433        .danger_accept_invalid_certs(config.accept_invalid_certificates)
2434        .timeout(Some(Duration::from_secs(DEFAULT_HTTP_TIMEOUT_SECONDS)));
2435
2436    if let Some(proxy) = config.proxy.clone() {
2437        builder = builder.proxy(Proxy::all(proxy).map_err(Error::BuildHttpClient)?);
2438    }
2439    builder.build().map_err(Error::BuildHttpClient)
2440}
2441
2442fn build_headers(config: &Config) -> Result<HeaderMap> {
2443    let mut headers = HeaderMap::new();
2444    headers.insert(
2445        header::AUTHORIZATION,
2446        HeaderValue::from_str(&format!("Bearer {}", &config.token.0)).map_err(|_| {
2447            Error::BadToken {
2448                token: config.token.0.clone(),
2449            }
2450        })?,
2451    );
2452    Ok(headers)
2453}
2454
2455fn id_list_query<'a>(ids: impl Iterator<Item = &'a String>) -> Vec<(&'static str, &'a str)> {
2456    // Return a list of pairs ("id", "a"), ("id", "b"), ...
2457    // The http client will turn this into a query string of
2458    // the form "id=a&id=b&..."
2459    ids.map(|id| ("id", id.as_str())).collect()
2460}
2461
2462pub static DEFAULT_ENDPOINT: Lazy<Url> =
2463    Lazy::new(|| Url::parse("https://reinfer.dev").expect("Default URL is well-formed"));
2464
2465#[cfg(test)]
2466mod tests {
2467    use super::*;
2468
2469    #[test]
2470    fn test_construct_endpoint() {
2471        let url = construct_endpoint(
2472            &Url::parse("https://cloud.uipath.com/org/tenant/reinfer_").unwrap(),
2473            &["api", "v1", "sources", "project", "source", "sync"],
2474        )
2475        .unwrap();
2476
2477        assert_eq!(
2478            url.to_string(),
2479            "https://cloud.uipath.com/org/tenant/reinfer_/api/v1/sources/project/source/sync"
2480        )
2481    }
2482
2483    #[test]
2484    fn test_id_list_query() {
2485        assert_eq!(id_list_query(Vec::new().iter()), Vec::new());
2486        assert_eq!(
2487            id_list_query(["foo".to_owned()].iter()),
2488            vec![("id", "foo")]
2489        );
2490        assert_eq!(
2491            id_list_query(
2492                [
2493                    "Stream".to_owned(),
2494                    "River".to_owned(),
2495                    "Waterfall".to_owned()
2496                ]
2497                .iter()
2498            ),
2499            [("id", "Stream"), ("id", "River"), ("id", "Waterfall"),]
2500        );
2501    }
2502}