reinfer_client/
lib.rs

1#![deny(clippy::all)]
2mod error;
3pub mod resources;
4pub mod retry;
5
6use chrono::{DateTime, Utc};
7use http::{header::ACCEPT, Method};
8use log::debug;
9use once_cell::sync::Lazy;
10use reqwest::{
11    blocking::{
12        multipart::{Form, Part},
13        Client as HttpClient, Response as HttpResponse,
14    },
15    header::{self, HeaderMap, HeaderValue},
16    IntoUrl, Proxy, Result as ReqwestResult,
17};
18use resources::{
19    attachments::UploadAttachmentResponse,
20    auth::{RefreshUserPermissionsRequest, RefreshUserPermissionsResponse},
21    bucket::{
22        GetKeyedSyncStateIdsRequest, GetKeyedSyncStateIdsResponse, GetKeyedSyncStatesResponse,
23        KeyedSyncState, KeyedSyncStateId,
24    },
25    bucket_statistics::GetBucketStatisticsResponse,
26    comment::{AttachmentReference, CommentTimestampFilter},
27    dataset::{
28        CreateIxpDatasetRequest, CreateIxpDatasetResponse, GetAllModelsInDatasetRequest,
29        GetAllModelsInDatasetRespone, IxpDatasetNew, QueryRequestParams, QueryResponse,
30        StatisticsRequestParams as DatasetStatisticsRequestParams, SummaryRequestParams,
31        SummaryResponse, UploadIxpDocumentResponse, UserModelMetadata,
32    },
33    documents::{Document, SyncRawEmailsRequest, SyncRawEmailsResponse},
34    email::{Email, GetEmailResponse},
35    integration::{
36        GetIntegrationResponse, GetIntegrationsResponse, Integration, NewIntegration,
37        PostIntegrationRequest, PostIntegrationResponse, PutIntegrationRequest,
38        PutIntegrationResponse,
39    },
40    label_def::{CreateOrUpdateLabelDefsBulkRequest, CreateOrUpdateLabelDefsBulkResponse},
41    project::ForceDeleteProject,
42    quota::{GetQuotasResponse, Quota},
43    source::StatisticsRequestParams as SourceStatisticsRequestParams,
44    stream::{GetStreamResponse, NewStream, PutStreamRequest, PutStreamResponse},
45    tenant_id::UiPathTenantId,
46    validation::{
47        LabelValidation, LabelValidationRequest, LabelValidationResponse, ValidationResponse,
48    },
49};
50use serde::{Deserialize, Serialize};
51use serde_json::json;
52use std::{
53    cell::Cell,
54    fmt::{Debug, Display},
55    io::Read,
56    path::{Path, PathBuf},
57    time::Duration,
58};
59use url::Url;
60
61use crate::resources::{
62    audit::{AuditQueryFilter, AuditQueryRequest, AuditQueryResponse},
63    bucket::{
64        CreateRequest as CreateBucketRequest, CreateResponse as CreateBucketResponse,
65        GetAvailableResponse as GetAvailableBucketsResponse, GetResponse as GetBucketResponse,
66    },
67    bucket_statistics::Statistics as BucketStatistics,
68    comment::{
69        GetAnnotationsResponse, GetCommentResponse, GetLabellingsAfter, GetPredictionsResponse,
70        GetRecentRequest, PutCommentsRequest, PutCommentsResponse, RecentCommentsPage,
71        SyncCommentsRequest, UpdateAnnotationsRequest,
72    },
73    dataset::{
74        CreateRequest as CreateDatasetRequest, CreateResponse as CreateDatasetResponse,
75        GetAvailableResponse as GetAvailableDatasetsResponse, GetResponse as GetDatasetResponse,
76        UpdateRequest as UpdateDatasetRequest, UpdateResponse as UpdateDatasetResponse,
77    },
78    email::{PutEmailsRequest, PutEmailsResponse},
79    project::{
80        CreateProjectRequest, CreateProjectResponse, GetProjectResponse, GetProjectsResponse,
81        UpdateProjectRequest, UpdateProjectResponse,
82    },
83    quota::{CreateQuota, TenantQuotaKind},
84    source::{
85        CreateRequest as CreateSourceRequest, CreateResponse as CreateSourceResponse,
86        GetAvailableResponse as GetAvailableSourcesResponse, GetResponse as GetSourceResponse,
87        UpdateRequest as UpdateSourceRequest, UpdateResponse as UpdateSourceResponse,
88    },
89    statistics::GetResponse as GetStatisticsResponse,
90    stream::{
91        AdvanceRequest as StreamAdvanceRequest, FetchRequest as StreamFetchRequest,
92        GetStreamsResponse, ResetRequest as StreamResetRequest,
93        TagExceptionsRequest as TagStreamExceptionsRequest,
94    },
95    tenant_id::TenantId,
96    user::{
97        CreateRequest as CreateUserRequest, CreateResponse as CreateUserResponse,
98        GetAvailableResponse as GetAvailableUsersResponse,
99        GetCurrentResponse as GetCurrentUserResponse, GetResponse as GetUserResponse,
100        PostUserRequest, PostUserResponse, WelcomeEmailResponse,
101    },
102    EmptySuccess, Response,
103};
104
105use crate::retry::{Retrier, RetryConfig};
106
107pub use crate::{
108    error::{Error, Result},
109    resources::{
110        bucket::{
111            Bucket, BucketType, FullName as BucketFullName, Id as BucketId,
112            Identifier as BucketIdentifier, Name as BucketName, NewBucket,
113        },
114        comment::{
115            AnnotatedComment, Comment, CommentFilter, CommentPredictionsThreshold,
116            CommentsIterPage, Continuation, EitherLabelling, Entities, Entity,
117            GetCommentPredictionsRequest, HasAnnotations, Id as CommentId, Label, Labelling,
118            Message, MessageBody, MessageSignature, MessageSubject, NewAnnotatedComment,
119            NewComment, NewEntities, NewLabelling, NewMoonForm, PredictedLabel, Prediction,
120            PropertyMap, PropertyValue, Sentiment, SyncCommentsResponse, TriggerLabelThreshold,
121            Uid as CommentUid,
122        },
123        dataset::{
124            Dataset, FullName as DatasetFullName, Id as DatasetId, Identifier as DatasetIdentifier,
125            ModelVersion, Name as DatasetName, NewDataset, UpdateDataset,
126        },
127        email::{
128            Continuation as EmailContinuation, EmailsIterPage, Id as EmailId, Mailbox, MimeContent,
129            NewEmail,
130        },
131        entity_def::{EntityDef, Id as EntityDefId, Name as EntityName, NewEntityDef},
132        integration::FullName as IntegrationFullName,
133        label_def::{
134            LabelDef, LabelDefPretrained, MoonFormFieldDef, Name as LabelName, NewLabelDef,
135            NewLabelDefPretrained, PretrainedId as LabelDefPretrainedId,
136        },
137        label_group::{
138            LabelGroup, Name as LabelGroupName, NewLabelGroup, DEFAULT_LABEL_GROUP_NAME,
139        },
140        project::{NewProject, Project, ProjectName, UpdateProject},
141        source::{
142            FullName as SourceFullName, Id as SourceId, Identifier as SourceIdentifier,
143            Name as SourceName, NewSource, Source, SourceKind, TransformTag, UpdateSource,
144        },
145        statistics::Statistics as CommentStatistics,
146        stream::{
147            Batch as StreamBatch, FullName as StreamFullName, SequenceId as StreamSequenceId,
148            Stream, StreamException, StreamExceptionMetadata,
149        },
150        user::{
151            Email as UserEmail, GlobalPermission, Id as UserId, Identifier as UserIdentifier,
152            ModifiedPermissions, NewUser, ProjectPermission, UpdateUser, User, Username,
153        },
154    },
155};
156
/// Newtype around the token string carried in [`Config::token`].
// NOTE(review): `Debug` is derived, so formatting a `Token` with `{:?}` prints the inner
// string — confirm that is acceptable wherever a `Token` may end up in logs.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Token(pub String);
159
/// A request that can be split into several requests of the same type.
pub trait SplittableRequest {
    /// Consumes the request and returns an iterator over the requests it was split into.
    fn split(self) -> impl Iterator<Item = Self>
    where
        Self: Sized;

    /// Returns a count for this request.
    // NOTE(review): presumably the number of items carried by the request — confirm against
    // the implementors.
    fn count(&self) -> usize;
}
167
/// Outcome of a splittable request: a response value together with a failure count.
pub struct SplitableRequestResponse<ResponseT>
where
    for<'de> ResponseT: Deserialize<'de> + ReducibleResponse,
{
    /// The response value.
    // NOTE(review): presumably the merged response of all parts that succeeded — confirm
    // against `splitable_request`.
    pub response: ResponseT,
    /// Number of failures.
    // NOTE(review): presumably counts the items that could not be submitted — confirm against
    // `splitable_request`.
    pub num_failed: usize,
}
175
/// A response type that can be combined with another response of the same type.
///
/// Both methods have default implementations that return `Self::default()`; note that the
/// default `merge` therefore discards both of its inputs. Implementors override these to
/// provide real merging behaviour.
pub trait ReducibleResponse {
    /// Combines `self` with `_b`. The default implementation ignores both values and returns
    /// the type's default.
    fn merge(self, _b: Self) -> Self
    where
        Self: Default,
    {
        Self::default()
    }

    /// Returns the empty response. The default implementation returns the type's default.
    fn empty() -> Self
    where
        Self: Default,
    {
        Self::default()
    }
}
191
/// Settings consumed by [`Client::new`].
pub struct Config {
    /// Endpoint URL handed to `Endpoints::new` when the client is built.
    pub endpoint: Url,
    /// API token.
    pub token: Token,
    /// NOTE(review): presumably disables TLS certificate validation when `true` — confirm
    /// against `build_http_client`.
    pub accept_invalid_certificates: bool,
    /// Optional proxy URL.
    pub proxy: Option<Url>,
    /// Retry settings to use, if any. This will apply to all requests except for POST requests
    /// which are not idempotent (as they cannot be naively retried).
    pub retry_config: Option<RetryConfig>,
}
201
202impl Default for Config {
203    fn default() -> Self {
204        Config {
205            endpoint: DEFAULT_ENDPOINT.clone(),
206            token: Token("".to_owned()),
207            accept_invalid_certificates: false,
208            proxy: None,
209            retry_config: None,
210        }
211    }
212}
213
/// Blocking API client. Construct it with [`Client::new`].
#[derive(Debug)]
pub struct Client {
    /// URLs derived from the configured endpoint.
    endpoints: Endpoints,
    /// Underlying blocking `reqwest` client.
    http_client: HttpClient,
    /// Headers attached to outgoing requests.
    headers: HeaderMap,
    /// Retry helper; `None` when the configuration carried no retry settings.
    retrier: Option<Retrier>,
}
221
/// Serializable parameters for fetching labellings in bulk.
#[derive(Serialize)]
pub struct GetLabellingsInBulk<'a> {
    pub source_id: &'a SourceId,
    pub return_predictions: &'a bool,

    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub after: &'a Option<GetLabellingsAfter>,

    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub limit: &'a Option<usize>,
}
233
/// Query parameters sent by [`Client::get_comments_iter_page`].
///
/// Every `Option` field is omitted from the serialized output when it is `None`.
#[derive(Serialize)]
pub struct GetCommentsIterPageQuery<'a> {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub from_timestamp: Option<DateTime<Utc>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub to_timestamp: Option<DateTime<Utc>>,
    /// Continuation taken from a previous page, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub after: Option<&'a Continuation>,
    pub limit: usize,
    pub include_markup: bool,
}
245
/// Parameters sent by [`Client::get_emails_iter_page`].
#[derive(Serialize)]
pub struct GetEmailsIterPageQuery<'a> {
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub continuation: Option<&'a EmailContinuation>,
    pub limit: usize,
}
252
/// Query parameters sent by [`Client::get_comment`].
#[derive(Serialize)]
pub struct GetCommentQuery {
    pub include_markup: bool,
}
257
/// Query parameters sent by [`Client::get_email`].
#[derive(Serialize)]
pub struct GetEmailQuery {
    /// The email id as a plain string.
    pub id: String,
}
262
263impl Client {
264    /// Create a new API client.
265    pub fn new(config: Config) -> Result<Client> {
266        let http_client = build_http_client(&config)?;
267        let headers = build_headers(&config)?;
268        let endpoints = Endpoints::new(config.endpoint)?;
269        let retrier = config.retry_config.map(Retrier::new);
270        Ok(Client {
271            endpoints,
272            http_client,
273            headers,
274            retrier,
275        })
276    }
277
278    /// Get the base url for the client
279    pub fn base_url(&self) -> &Url {
280        &self.endpoints.base
281    }
282
283    /// List all visible sources.
284    pub fn get_sources(&self) -> Result<Vec<Source>> {
285        Ok(self
286            .get::<_, GetAvailableSourcesResponse>(self.endpoints.sources.clone())?
287            .sources)
288    }
289
290    /// Get a source by either id or name.
291    pub fn get_user(&self, user: impl Into<UserIdentifier>) -> Result<User> {
292        Ok(match user.into() {
293            UserIdentifier::Id(user_id) => {
294                self.get::<_, GetUserResponse>(self.endpoints.user_by_id(&user_id)?)?
295                    .user
296            }
297        })
298    }
299
300    /// Get a source by either id or name.
301    pub fn get_source(&self, source: impl Into<SourceIdentifier>) -> Result<Source> {
302        Ok(match source.into() {
303            SourceIdentifier::Id(source_id) => {
304                self.get::<_, GetSourceResponse>(self.endpoints.source_by_id(&source_id)?)?
305                    .source
306            }
307            SourceIdentifier::FullName(source_name) => {
308                self.get::<_, GetSourceResponse>(self.endpoints.source_by_name(&source_name)?)?
309                    .source
310            }
311        })
312    }
313
314    pub fn create_label_defs_bulk(
315        &self,
316        dataset_name: &DatasetFullName,
317        label_group: LabelGroupName,
318        label_defs: Vec<NewLabelDef>,
319    ) -> Result<()> {
320        self.put::<_, _, CreateOrUpdateLabelDefsBulkResponse>(
321            self.endpoints.label_group(dataset_name, label_group)?,
322            CreateOrUpdateLabelDefsBulkRequest { label_defs },
323        )?;
324        Ok(())
325    }
326
327    /// Create a new source.
328    pub fn create_source(
329        &self,
330        source_name: &SourceFullName,
331        options: NewSource<'_>,
332    ) -> Result<Source> {
333        Ok(self
334            .put::<_, _, CreateSourceResponse>(
335                self.endpoints.source_by_name(source_name)?,
336                CreateSourceRequest { source: options },
337            )?
338            .source)
339    }
340
341    /// Update a source.
342    pub fn update_source(
343        &self,
344        source_name: &SourceFullName,
345        options: UpdateSource<'_>,
346    ) -> Result<Source> {
347        Ok(self
348            .post::<_, _, UpdateSourceResponse>(
349                self.endpoints.source_by_name(source_name)?,
350                UpdateSourceRequest { source: options },
351                Retry::Yes,
352            )?
353            .source)
354    }
355
356    /// Delete a source.
357    pub fn delete_source(&self, source: impl Into<SourceIdentifier>) -> Result<()> {
358        let source_id = match source.into() {
359            SourceIdentifier::Id(source_id) => source_id,
360            source @ SourceIdentifier::FullName(_) => self.get_source(source)?.id,
361        };
362        self.delete(self.endpoints.source_by_id(&source_id)?)
363    }
364
365    /// Set a quota
366    pub fn create_quota(
367        &self,
368        target_tenant_id: &TenantId,
369        tenant_quota_kind: TenantQuotaKind,
370        options: CreateQuota,
371    ) -> Result<()> {
372        self.post(
373            self.endpoints.quota(target_tenant_id, tenant_quota_kind)?,
374            options,
375            Retry::Yes,
376        )
377    }
378
379    /// Get quotas for current tenant
380    pub fn get_quotas(&self, tenant_id: &Option<UiPathTenantId>) -> Result<Vec<Quota>> {
381        Ok(self
382            .get::<_, GetQuotasResponse>(self.endpoints.quotas(tenant_id)?)?
383            .quotas)
384    }
385
386    /// Delete a user.
387    pub fn delete_user(&self, user: impl Into<UserIdentifier>) -> Result<()> {
388        let UserIdentifier::Id(user_id) = user.into();
389        self.delete(self.endpoints.user_by_id(&user_id)?)
390    }
391
392    /// Delete comments by id in a source.
393    pub fn delete_comments(
394        &self,
395        source: impl Into<SourceIdentifier>,
396        comments: &[CommentId],
397    ) -> Result<()> {
398        let source_full_name = match source.into() {
399            source @ SourceIdentifier::Id(_) => self.get_source(source)?.full_name(),
400            SourceIdentifier::FullName(source_full_name) => source_full_name,
401        };
402        self.delete_query(
403            self.endpoints.comments_v1(&source_full_name)?,
404            Some(&id_list_query(comments.iter().map(|uid| &uid.0))),
405        )
406    }
407
408    /// Get a page of comments from a source.
409    pub fn get_comments_iter_page(
410        &self,
411        source_name: &SourceFullName,
412        continuation: Option<&ContinuationKind>,
413        to_timestamp: Option<DateTime<Utc>>,
414        limit: usize,
415    ) -> Result<CommentsIterPage> {
416        // Comments are returned from the API in increasing order of their
417        // `timestamp` field.
418        let (from_timestamp, after) = match continuation {
419            // If we have a timestamp, then this is a request for the first page of
420            // a series of comments with timestamps starting from the given time.
421            Some(ContinuationKind::Timestamp(from_timestamp)) => (Some(*from_timestamp), None),
422            // If we have a continuation, then this is a request for page n+1 of
423            // a series of comments, where the continuation came from page n.
424            Some(ContinuationKind::Continuation(after)) => (None, Some(after)),
425            // Otherwise, this is a request for the first page of a series of comments
426            // with timestamps starting from the beginning of time.
427            None => (None, None),
428        };
429        let query_params = GetCommentsIterPageQuery {
430            from_timestamp,
431            to_timestamp,
432            after,
433            limit,
434            include_markup: true,
435        };
436        self.get_query(self.endpoints.comments(source_name)?, Some(&query_params))
437    }
438
    /// Iterate through all comments for a given dataset query.
    ///
    /// Returns a [`DatasetQueryIter`] built from this client, the dataset name and the query
    /// parameters. The iterator keeps the mutable borrow of `params` for its whole lifetime.
    pub fn get_dataset_query_iter<'a>(
        &'a self,
        dataset_name: &'a DatasetFullName,
        params: &'a mut QueryRequestParams,
    ) -> DatasetQueryIter<'a> {
        DatasetQueryIter::new(self, dataset_name, params)
    }
447
    /// Iterate through all comments in a source.
    ///
    /// Returns a [`CommentsIter`] built from this client, the source name, the optional page
    /// size and the time range.
    pub fn get_comments_iter<'a>(
        &'a self,
        source_name: &'a SourceFullName,
        page_size: Option<usize>,
        timerange: CommentsIterTimerange,
    ) -> CommentsIter<'a> {
        CommentsIter::new(self, source_name, page_size, timerange)
    }
457
458    pub fn get_keyed_sync_state_ids(
459        &self,
460        bucket_id: &BucketId,
461        request: &GetKeyedSyncStateIdsRequest,
462    ) -> Result<Vec<KeyedSyncStateId>> {
463        Ok(self
464            .post::<_, _, GetKeyedSyncStateIdsResponse>(
465                self.endpoints.query_keyed_sync_state_ids(bucket_id)?,
466                Some(&request),
467                Retry::Yes,
468            )?
469            .keyed_sync_state_ids)
470    }
471
472    pub fn delete_keyed_sync_state(
473        &self,
474        bucket_id: &BucketId,
475        id: &KeyedSyncStateId,
476    ) -> Result<()> {
477        self.delete(self.endpoints.keyed_sync_state(bucket_id, id)?)
478    }
479
480    pub fn get_keyed_sync_states(&self, bucket_id: &BucketId) -> Result<Vec<KeyedSyncState>> {
481        Ok(self
482            .get::<_, GetKeyedSyncStatesResponse>(self.endpoints.keyed_sync_states(bucket_id)?)?
483            .keyed_sync_states)
484    }
485
486    /// Get a single of email from a bucket.
487    pub fn get_email(&self, bucket_name: &BucketFullName, id: EmailId) -> Result<Vec<Email>> {
488        let query_params = GetEmailQuery { id: id.0 };
489        Ok(self
490            .get_query::<_, _, GetEmailResponse>(
491                self.endpoints.get_emails(bucket_name)?,
492                Some(&query_params),
493            )?
494            .emails)
495    }
496
497    /// Get a page of emails from a bucket.
498    pub fn get_emails_iter_page(
499        &self,
500        bucket_name: &BucketFullName,
501        continuation: Option<&EmailContinuation>,
502        limit: usize,
503    ) -> Result<EmailsIterPage> {
504        let query_params = GetEmailsIterPageQuery {
505            continuation,
506            limit,
507        };
508        self.post(
509            self.endpoints.get_emails(bucket_name)?,
510            Some(&query_params),
511            Retry::Yes,
512        )
513    }
514
515    /// Iterate through all comments in a source.
516    pub fn get_emails_iter<'a>(
517        &'a self,
518        bucket_name: &'a BucketFullName,
519        page_size: Option<usize>,
520    ) -> EmailsIter<'a> {
521        EmailsIter::new(self, bucket_name, page_size)
522    }
523
524    /// Get a single comment by id.
525    pub fn get_comment<'a>(
526        &'a self,
527        source_name: &'a SourceFullName,
528        comment_id: &'a CommentId,
529    ) -> Result<Comment> {
530        let query_params = GetCommentQuery {
531            include_markup: true,
532        };
533        Ok(self
534            .get_query::<_, _, GetCommentResponse>(
535                self.endpoints.comment_by_id(source_name, comment_id)?,
536                Some(&query_params),
537            )?
538            .comment)
539    }
540    pub fn post_integration(
541        &self,
542        name: &IntegrationFullName,
543        integration: &NewIntegration,
544    ) -> Result<PostIntegrationResponse> {
545        self.request(
546            &Method::POST,
547            &self.endpoints.integration(name)?,
548            &Some(PostIntegrationRequest {
549                integration: integration.clone(),
550            }),
551            &None::<()>,
552            &Retry::No,
553        )
554    }
555
556    pub fn put_integration(
557        &self,
558        name: &IntegrationFullName,
559        integration: &NewIntegration,
560    ) -> Result<PutIntegrationResponse> {
561        self.request(
562            &Method::PUT,
563            &self.endpoints.integration(name)?,
564            &Some(PutIntegrationRequest {
565                integration: integration.clone(),
566            }),
567            &None::<()>,
568            &Retry::No,
569        )
570    }
571
572    pub fn put_comments_split_on_failure(
573        &self,
574        source_name: &SourceFullName,
575        comments: Vec<NewComment>,
576        no_charge: bool,
577    ) -> Result<SplitableRequestResponse<PutCommentsResponse>> {
578        // Retrying here despite the potential for 409's in order to increase reliability when
579        // working with poor connection
580
581        self.splitable_request(
582            Method::PUT,
583            self.endpoints.put_comments(source_name)?,
584            PutCommentsRequest { comments },
585            Some(NoChargeQuery { no_charge }),
586            Retry::Yes,
587        )
588    }
589
590    pub fn put_comments(
591        &self,
592        source_name: &SourceFullName,
593        comments: Vec<NewComment>,
594        no_charge: bool,
595    ) -> Result<PutCommentsResponse> {
596        // Retrying here despite the potential for 409's in order to increase reliability when
597        // working with poor connection
598        self.request(
599            &Method::PUT,
600            &self.endpoints.put_comments(source_name)?,
601            &Some(PutCommentsRequest { comments }),
602            &Some(NoChargeQuery { no_charge }),
603            &Retry::Yes,
604        )
605    }
606
607    pub fn put_stream(
608        &self,
609        dataset_name: &DatasetFullName,
610        stream: &NewStream,
611    ) -> Result<PutStreamResponse> {
612        self.put(
613            self.endpoints.streams(dataset_name)?,
614            Some(PutStreamRequest { stream }),
615        )
616    }
617
618    pub fn get_audit_events(
619        &self,
620        minimum_timestamp: Option<DateTime<Utc>>,
621        maximum_timestamp: Option<DateTime<Utc>>,
622        continuation: Option<Continuation>,
623    ) -> Result<AuditQueryResponse> {
624        self.post::<_, _, AuditQueryResponse>(
625            self.endpoints.audit_events_query()?,
626            AuditQueryRequest {
627                continuation,
628                filter: AuditQueryFilter {
629                    timestamp: CommentTimestampFilter {
630                        minimum: minimum_timestamp,
631                        maximum: maximum_timestamp,
632                    },
633                },
634            },
635            Retry::Yes,
636        )
637    }
638    pub fn get_latest_validation(
639        &self,
640        dataset_name: &DatasetFullName,
641    ) -> Result<ValidationResponse> {
642        self.get::<_, ValidationResponse>(self.endpoints.latest_validation(dataset_name)?)
643    }
644
645    pub fn get_validation(
646        &self,
647        dataset_name: &DatasetFullName,
648        model_version: &ModelVersion,
649    ) -> Result<ValidationResponse> {
650        self.get::<_, ValidationResponse>(self.endpoints.validation(dataset_name, model_version)?)
651    }
652
653    pub fn get_labellers(&self, dataset_name: &DatasetFullName) -> Result<Vec<UserModelMetadata>> {
654        Ok(self
655            .post::<_, _, GetAllModelsInDatasetRespone>(
656                self.endpoints.labellers(dataset_name)?,
657                GetAllModelsInDatasetRequest {},
658                Retry::Yes,
659            )?
660            .labellers)
661    }
662
663    pub fn get_label_validation(
664        &self,
665        label: &LabelName,
666        dataset_name: &DatasetFullName,
667        model_version: &ModelVersion,
668    ) -> Result<LabelValidation> {
669        Ok(self
670            .post::<_, _, LabelValidationResponse>(
671                self.endpoints
672                    .label_validation(dataset_name, model_version)?,
673                LabelValidationRequest {
674                    label: label.clone(),
675                },
676                Retry::Yes,
677            )?
678            .label_validation)
679    }
680
681    pub fn sync_comments(
682        &self,
683        source_name: &SourceFullName,
684        comments: Vec<NewComment>,
685        no_charge: bool,
686    ) -> Result<SyncCommentsResponse> {
687        self.request(
688            &Method::POST,
689            &self.endpoints.sync_comments(source_name)?,
690            &Some(SyncCommentsRequest { comments }),
691            &Some(NoChargeQuery { no_charge }),
692            &Retry::Yes,
693        )
694    }
695
696    pub fn sync_comments_split_on_failure(
697        &self,
698        source_name: &SourceFullName,
699        comments: Vec<NewComment>,
700        no_charge: bool,
701    ) -> Result<SplitableRequestResponse<SyncCommentsResponse>> {
702        self.splitable_request(
703            Method::POST,
704            self.endpoints.sync_comments(source_name)?,
705            SyncCommentsRequest { comments },
706            Some(NoChargeQuery { no_charge }),
707            Retry::Yes,
708        )
709    }
710
711    pub fn sync_raw_emails(
712        &self,
713        source_name: &SourceFullName,
714        documents: &[Document],
715        transform_tag: &TransformTag,
716        include_comments: bool,
717        no_charge: bool,
718    ) -> Result<SyncRawEmailsResponse> {
719        self.request(
720            &Method::POST,
721            &self.endpoints.sync_comments_raw_emails(source_name)?,
722            &Some(SyncRawEmailsRequest {
723                documents,
724                transform_tag,
725                include_comments,
726            }),
727            &Some(NoChargeQuery { no_charge }),
728            &Retry::Yes,
729        )
730    }
731
732    pub fn put_emails_split_on_failure(
733        &self,
734        bucket_name: &BucketFullName,
735        emails: Vec<NewEmail>,
736        no_charge: bool,
737    ) -> Result<SplitableRequestResponse<PutEmailsResponse>> {
738        self.splitable_request(
739            Method::PUT,
740            self.endpoints.put_emails(bucket_name)?,
741            PutEmailsRequest { emails },
742            Some(NoChargeQuery { no_charge }),
743            Retry::Yes,
744        )
745    }
746
747    pub fn put_emails(
748        &self,
749        bucket_name: &BucketFullName,
750        emails: Vec<NewEmail>,
751        no_charge: bool,
752    ) -> Result<PutEmailsResponse> {
753        self.request(
754            &Method::PUT,
755            &self.endpoints.put_emails(bucket_name)?,
756            &Some(PutEmailsRequest { emails }),
757            &Some(NoChargeQuery { no_charge }),
758            &Retry::Yes,
759        )
760    }
761
762    pub fn post_user(&self, user_id: &UserId, user: UpdateUser) -> Result<PostUserResponse> {
763        self.post(
764            self.endpoints.post_user(user_id)?,
765            PostUserRequest { user: &user },
766            Retry::Yes,
767        )
768    }
769
770    pub fn put_comment_audio(
771        &self,
772        source_id: &SourceId,
773        comment_id: &CommentId,
774        audio_path: impl AsRef<Path>,
775    ) -> Result<()> {
776        let form = Form::new()
777            .file("file", audio_path)
778            .map_err(|source| Error::Unknown {
779                message: "PUT comment audio operation failed".to_owned(),
780                source: source.into(),
781            })?;
782        let http_response = self
783            .http_client
784            .put(self.endpoints.comment_audio(source_id, comment_id)?)
785            .headers(self.headers.clone())
786            .multipart(form)
787            .send()
788            .map_err(|source| Error::ReqwestError {
789                message: "PUT comment audio operation failed".to_owned(),
790                source,
791            })?;
792        let status = http_response.status();
793        http_response
794            .json::<Response<EmptySuccess>>()
795            .map_err(Error::BadJsonResponse)?
796            .into_result(status)?;
797        Ok(())
798    }
799
800    pub fn upload_ixp_document(
801        &self,
802        source_id: &SourceId,
803        filename: String,
804        bytes: Vec<u8>,
805    ) -> Result<CommentId> {
806        let endpoint = self.endpoints.ixp_documents(source_id)?;
807
808        let do_request = || {
809            let form = Form::new().part(
810                "file",
811                Part::bytes(bytes.clone()).file_name(filename.clone()),
812            );
813            let request = self
814                .http_client
815                .request(Method::PUT, endpoint.clone())
816                .multipart(form)
817                .headers(self.headers.clone());
818
819            request.send()
820        };
821
822        let result = self.with_retries(do_request);
823
824        let http_response = result.map_err(|source| Error::ReqwestError {
825            source,
826            message: "Operation failed.".to_string(),
827        })?;
828
829        let status = http_response.status();
830
831        Ok(http_response
832            .json::<Response<UploadIxpDocumentResponse>>()
833            .map_err(Error::BadJsonResponse)?
834            .into_result(status)?
835            .comment_id)
836    }
837
838    pub fn upload_comment_attachment(
839        &self,
840        source_id: &SourceId,
841        comment_id: &CommentId,
842        attachment_index: usize,
843        attachment: &PathBuf,
844    ) -> Result<UploadAttachmentResponse> {
845        let url = self
846            .endpoints
847            .attachment_upload(source_id, comment_id, attachment_index)?;
848
849        if !attachment.is_file() || !attachment.exists() {
850            return Err(Error::FileDoesNotExist {
851                path: attachment.clone(),
852            });
853        }
854
855        let do_request = || {
856            let form = Form::new()
857                .file("file", attachment)
858                .map_err(|source| Error::Unknown {
859                    message: "PUT comment attachment operation failed".to_owned(),
860                    source: source.into(),
861                })
862                .unwrap();
863            let request = self
864                .http_client
865                .request(Method::PUT, url.clone())
866                .multipart(form)
867                .headers(self.headers.clone());
868
869            request.send()
870        };
871
872        let result = self.with_retries(do_request);
873
874        let http_response = result.map_err(|source| Error::ReqwestError {
875            source,
876            message: "Operation failed.".to_string(),
877        })?;
878
879        let status = http_response.status();
880
881        http_response
882            .json::<Response<UploadAttachmentResponse>>()
883            .map_err(Error::BadJsonResponse)?
884            .into_result(status)
885    }
886
887    pub fn get_ixp_document(
888        &self,
889        source_id: &SourceId,
890        comment_id: &CommentId,
891    ) -> Result<Vec<u8>> {
892        self.get_octet_stream(&self.endpoints.ixp_document(source_id, comment_id)?)
893    }
894
895    fn get_octet_stream(&self, endpoint: &Url) -> Result<Vec<u8>> {
896        let mut response = self.raw_request(
897            &Method::GET,
898            endpoint,
899            &None::<()>,
900            &None::<()>,
901            &Retry::Yes,
902            None,
903        )?;
904
905        let status = response.status();
906
907        if !status.is_success() {
908            return Err(Error::Api {
909                status_code: status,
910                message: "Bad status code when getting octet stream".to_string(),
911            });
912        }
913
914        let mut buffer = Vec::new();
915
916        response
917            .read_to_end(&mut buffer)
918            .map_err(|source| Error::Unknown {
919                message: "Failed to read buffer".to_string(),
920                source: Box::new(source),
921            })?;
922        Ok(buffer)
923    }
924
925    pub fn get_attachment(&self, reference: &AttachmentReference) -> Result<Vec<u8>> {
926        self.get_octet_stream(&self.endpoints.attachment_reference(reference)?)
927    }
928
929    pub fn get_integrations(&self) -> Result<Vec<Integration>> {
930        Ok(self
931            .get::<_, GetIntegrationsResponse>(self.endpoints.integrations()?)?
932            .integrations)
933    }
934
935    pub fn get_integration(&self, name: &IntegrationFullName) -> Result<Integration> {
936        Ok(self
937            .get::<_, GetIntegrationResponse>(self.endpoints.integration(name)?)?
938            .integration)
939    }
940
941    pub fn get_datasets(&self) -> Result<Vec<Dataset>> {
942        Ok(self
943            .get::<_, GetAvailableDatasetsResponse>(self.endpoints.datasets.clone())?
944            .datasets)
945    }
946
947    pub fn get_dataset<IdentifierT>(&self, dataset: IdentifierT) -> Result<Dataset>
948    where
949        IdentifierT: Into<DatasetIdentifier>,
950    {
951        Ok(match dataset.into() {
952            DatasetIdentifier::Id(dataset_id) => {
953                self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_id(&dataset_id)?)?
954                    .dataset
955            }
956            DatasetIdentifier::FullName(dataset_name) => {
957                self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_name(&dataset_name)?)?
958                    .dataset
959            }
960        })
961    }
962
963    /// Create a ixp dataset.
964    pub fn create_ixp_dataset(&self, dataset: IxpDatasetNew) -> Result<Dataset> {
965        Ok(self
966            .put::<_, _, CreateIxpDatasetResponse>(
967                self.endpoints.ixp_datasets()?,
968                CreateIxpDatasetRequest { dataset },
969            )?
970            .dataset)
971    }
972
973    /// Create a dataset.
974    pub fn create_dataset(
975        &self,
976        dataset_name: &DatasetFullName,
977        options: NewDataset<'_>,
978    ) -> Result<Dataset> {
979        Ok(self
980            .put::<_, _, CreateDatasetResponse>(
981                self.endpoints.dataset_by_name(dataset_name)?,
982                CreateDatasetRequest { dataset: options },
983            )?
984            .dataset)
985    }
986
987    /// Update a dataset.
988    pub fn update_dataset(
989        &self,
990        dataset_name: &DatasetFullName,
991        options: UpdateDataset<'_>,
992    ) -> Result<Dataset> {
993        Ok(self
994            .post::<_, _, UpdateDatasetResponse>(
995                self.endpoints.dataset_by_name(dataset_name)?,
996                UpdateDatasetRequest { dataset: options },
997                Retry::Yes,
998            )?
999            .dataset)
1000    }
1001
1002    pub fn delete_dataset<IdentifierT>(&self, dataset: IdentifierT) -> Result<()>
1003    where
1004        IdentifierT: Into<DatasetIdentifier>,
1005    {
1006        let dataset_id = match dataset.into() {
1007            DatasetIdentifier::Id(dataset_id) => dataset_id,
1008            dataset @ DatasetIdentifier::FullName(_) => self.get_dataset(dataset)?.id,
1009        };
1010        self.delete(self.endpoints.dataset_by_id(&dataset_id)?)
1011    }
1012
1013    /// Get labellings for a given a dataset and a list of comment UIDs.
1014    pub fn get_labellings<'a>(
1015        &self,
1016        dataset_name: &DatasetFullName,
1017        comment_uids: impl Iterator<Item = &'a CommentUid>,
1018    ) -> Result<Vec<AnnotatedComment>> {
1019        Ok(self
1020            .get_query::<_, _, GetAnnotationsResponse>(
1021                self.endpoints.get_labellings(dataset_name)?,
1022                Some(&id_list_query(comment_uids.into_iter().map(|id| &id.0))),
1023            )?
1024            .results)
1025    }
1026
1027    /// Iterate through all reviewed comments in a source.
1028    pub fn get_labellings_iter<'a>(
1029        &'a self,
1030        dataset_name: &'a DatasetFullName,
1031        source_id: &'a SourceId,
1032        return_predictions: bool,
1033        limit: Option<usize>,
1034    ) -> LabellingsIter<'a> {
1035        LabellingsIter::new(self, dataset_name, source_id, return_predictions, limit)
1036    }
1037
1038    /// Get reviewed comments in bulk
1039    pub fn get_labellings_in_bulk(
1040        &self,
1041        dataset_name: &DatasetFullName,
1042        query_parameters: GetLabellingsInBulk<'_>,
1043    ) -> Result<GetAnnotationsResponse> {
1044        self.get_query::<_, _, GetAnnotationsResponse>(
1045            self.endpoints.get_labellings(dataset_name)?,
1046            Some(&query_parameters),
1047        )
1048    }
1049
1050    /// Update labellings for a given a dataset and comment UID.
1051    pub fn update_labelling(
1052        &self,
1053        dataset_name: &DatasetFullName,
1054        comment_uid: &CommentUid,
1055        labelling: Option<&[NewLabelling]>,
1056        entities: Option<&NewEntities>,
1057        moon_forms: Option<&[NewMoonForm]>,
1058    ) -> Result<AnnotatedComment> {
1059        self.post::<_, _, AnnotatedComment>(
1060            self.endpoints.post_labelling(dataset_name, comment_uid)?,
1061            UpdateAnnotationsRequest {
1062                labelling,
1063                entities,
1064                moon_forms,
1065            },
1066            Retry::Yes,
1067        )
1068    }
1069
1070    /// Get predictions for a given a dataset, a model version, and a list of comment UIDs.
1071    pub fn get_comment_predictions<'a>(
1072        &self,
1073        dataset_name: &DatasetFullName,
1074        model_version: &ModelVersion,
1075        comment_uids: impl Iterator<Item = &'a CommentUid>,
1076        threshold: Option<CommentPredictionsThreshold>,
1077        labels: Option<Vec<TriggerLabelThreshold>>,
1078    ) -> Result<Vec<Prediction>> {
1079        Ok(self
1080            .post::<_, _, GetPredictionsResponse>(
1081                self.endpoints
1082                    .get_comment_predictions(dataset_name, model_version)?,
1083                GetCommentPredictionsRequest {
1084                    uids: comment_uids
1085                        .into_iter()
1086                        .map(|id| id.0.clone())
1087                        .collect::<Vec<_>>(),
1088
1089                    threshold,
1090                    labels,
1091                },
1092                Retry::Yes,
1093            )?
1094            .predictions)
1095    }
1096
1097    pub fn get_streams(&self, dataset_name: &DatasetFullName) -> Result<Vec<Stream>> {
1098        Ok(self
1099            .get::<_, GetStreamsResponse>(self.endpoints.streams(dataset_name)?)?
1100            .streams)
1101    }
1102
1103    pub fn get_recent_comments(
1104        &self,
1105        dataset_name: &DatasetFullName,
1106        filter: &CommentFilter,
1107        limit: usize,
1108        continuation: Option<&Continuation>,
1109    ) -> Result<RecentCommentsPage> {
1110        self.post::<_, _, RecentCommentsPage>(
1111            self.endpoints.recent_comments(dataset_name)?,
1112            GetRecentRequest {
1113                limit,
1114                filter,
1115                continuation,
1116            },
1117            Retry::No,
1118        )
1119    }
1120
1121    pub fn refresh_user_permissions(&self) -> Result<RefreshUserPermissionsResponse> {
1122        self.post::<_, _, RefreshUserPermissionsResponse>(
1123            self.endpoints.refresh_user_permissions()?,
1124            RefreshUserPermissionsRequest {},
1125            Retry::Yes,
1126        )
1127    }
1128
1129    pub fn get_current_user(&self) -> Result<User> {
1130        Ok(self
1131            .get::<_, GetCurrentUserResponse>(self.endpoints.current_user.clone())?
1132            .user)
1133    }
1134
1135    pub fn get_users(&self) -> Result<Vec<User>> {
1136        Ok(self
1137            .get::<_, GetAvailableUsersResponse>(self.endpoints.users.clone())?
1138            .users)
1139    }
1140
1141    pub fn create_user(&self, user: NewUser<'_>) -> Result<User> {
1142        Ok(self
1143            .put::<_, _, CreateUserResponse>(
1144                self.endpoints.users.clone(),
1145                CreateUserRequest { user },
1146            )?
1147            .user)
1148    }
1149
1150    pub fn dataset_summary(
1151        &self,
1152        dataset_name: &DatasetFullName,
1153        params: &SummaryRequestParams,
1154    ) -> Result<SummaryResponse> {
1155        self.post::<_, _, SummaryResponse>(
1156            self.endpoints.dataset_summary(dataset_name)?,
1157            serde_json::to_value(params).expect("summary params serialization error"),
1158            Retry::Yes,
1159        )
1160    }
1161
1162    pub fn query_dataset_csv(
1163        &self,
1164        dataset_name: &DatasetFullName,
1165        params: &QueryRequestParams,
1166    ) -> Result<String> {
1167        let response = self
1168            .raw_request(
1169                &Method::POST,
1170                &self.endpoints.query_dataset(dataset_name)?,
1171                &Some(serde_json::to_value(params).expect("query params serialization error")),
1172                &None::<()>,
1173                &Retry::Yes,
1174                Some(HeaderValue::from_str("text/csv").expect("Could not parse csv header")),
1175            )?
1176            .text()
1177            .expect("Could not get csv text");
1178
1179        Ok(response)
1180    }
1181
1182    pub fn query_dataset(
1183        &self,
1184        dataset_name: &DatasetFullName,
1185        params: &QueryRequestParams,
1186    ) -> Result<QueryResponse> {
1187        self.post::<_, _, QueryResponse>(
1188            self.endpoints.query_dataset(dataset_name)?,
1189            serde_json::to_value(params).expect("query params serialization error"),
1190            Retry::Yes,
1191        )
1192    }
1193
1194    pub fn send_welcome_email(&self, user_id: UserId) -> Result<()> {
1195        self.post::<_, _, WelcomeEmailResponse>(
1196            self.endpoints.welcome_email(&user_id)?,
1197            json!({}),
1198            Retry::No,
1199        )?;
1200        Ok(())
1201    }
1202
1203    pub fn get_bucket_statistics(&self, bucket_name: &BucketFullName) -> Result<BucketStatistics> {
1204        Ok(self
1205            .get::<_, GetBucketStatisticsResponse>(self.endpoints.bucket_statistics(bucket_name)?)?
1206            .statistics)
1207    }
1208
1209    pub fn get_dataset_statistics(
1210        &self,
1211        dataset_name: &DatasetFullName,
1212        params: &DatasetStatisticsRequestParams,
1213    ) -> Result<CommentStatistics> {
1214        Ok(self
1215            .post::<_, _, GetStatisticsResponse>(
1216                self.endpoints.dataset_statistics(dataset_name)?,
1217                serde_json::to_value(params)
1218                    .expect("dataset statistics params serialization error"),
1219                Retry::No,
1220            )?
1221            .statistics)
1222    }
1223
1224    pub fn get_source_statistics(
1225        &self,
1226        source_name: &SourceFullName,
1227        params: &SourceStatisticsRequestParams,
1228    ) -> Result<CommentStatistics> {
1229        Ok(self
1230            .post::<_, _, GetStatisticsResponse>(
1231                self.endpoints.source_statistics(source_name)?,
1232                serde_json::to_value(params).expect("source statistics params serialization error"),
1233                Retry::No,
1234            )?
1235            .statistics)
1236    }
1237
1238    /// Create a new bucket.
1239    pub fn create_bucket(
1240        &self,
1241        bucket_name: &BucketFullName,
1242        options: NewBucket<'_>,
1243    ) -> Result<Bucket> {
1244        Ok(self
1245            .put::<_, _, CreateBucketResponse>(
1246                self.endpoints.bucket_by_name(bucket_name)?,
1247                CreateBucketRequest { bucket: options },
1248            )?
1249            .bucket)
1250    }
1251
1252    pub fn get_buckets(&self) -> Result<Vec<Bucket>> {
1253        Ok(self
1254            .get::<_, GetAvailableBucketsResponse>(self.endpoints.buckets.clone())?
1255            .buckets)
1256    }
1257
1258    pub fn get_bucket<IdentifierT>(&self, bucket: IdentifierT) -> Result<Bucket>
1259    where
1260        IdentifierT: Into<BucketIdentifier>,
1261    {
1262        Ok(match bucket.into() {
1263            BucketIdentifier::Id(bucket_id) => {
1264                self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_id(&bucket_id)?)?
1265                    .bucket
1266            }
1267            BucketIdentifier::FullName(bucket_name) => {
1268                self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_name(&bucket_name)?)?
1269                    .bucket
1270            }
1271        })
1272    }
1273
1274    pub fn delete_bucket<IdentifierT>(&self, bucket: IdentifierT) -> Result<()>
1275    where
1276        IdentifierT: Into<BucketIdentifier>,
1277    {
1278        let bucket_id = match bucket.into() {
1279            BucketIdentifier::Id(bucket_id) => bucket_id,
1280            bucket @ BucketIdentifier::FullName(_) => self.get_bucket(bucket)?.id,
1281        };
1282        self.delete(self.endpoints.bucket_by_id(&bucket_id)?)
1283    }
1284
1285    pub fn fetch_stream_comments(
1286        &self,
1287        stream_name: &StreamFullName,
1288        size: u32,
1289    ) -> Result<StreamBatch> {
1290        self.post(
1291            self.endpoints.stream_fetch(stream_name)?,
1292            StreamFetchRequest { size },
1293            Retry::No,
1294        )
1295    }
1296
1297    pub fn get_stream(&self, stream_name: &StreamFullName) -> Result<Stream> {
1298        Ok(self
1299            .get::<_, GetStreamResponse>(self.endpoints.stream(stream_name)?)?
1300            .stream)
1301    }
1302
1303    pub fn advance_stream(
1304        &self,
1305        stream_name: &StreamFullName,
1306        sequence_id: StreamSequenceId,
1307    ) -> Result<()> {
1308        self.post::<_, _, serde::de::IgnoredAny>(
1309            self.endpoints.stream_advance(stream_name)?,
1310            StreamAdvanceRequest { sequence_id },
1311            Retry::No,
1312        )?;
1313        Ok(())
1314    }
1315
1316    pub fn reset_stream(
1317        &self,
1318        stream_name: &StreamFullName,
1319        to_comment_created_at: DateTime<Utc>,
1320    ) -> Result<()> {
1321        self.post::<_, _, serde::de::IgnoredAny>(
1322            self.endpoints.stream_reset(stream_name)?,
1323            StreamResetRequest {
1324                to_comment_created_at,
1325            },
1326            Retry::No,
1327        )?;
1328        Ok(())
1329    }
1330
1331    pub fn tag_stream_exceptions(
1332        &self,
1333        stream_name: &StreamFullName,
1334        exceptions: &[StreamException],
1335    ) -> Result<()> {
1336        self.put::<_, _, serde::de::IgnoredAny>(
1337            self.endpoints.stream_exceptions(stream_name)?,
1338            TagStreamExceptionsRequest { exceptions },
1339        )?;
1340        Ok(())
1341    }
1342
1343    /// Gets a project.
1344    pub fn get_project(&self, project_name: &ProjectName) -> Result<Project> {
1345        let response =
1346            self.get::<_, GetProjectResponse>(self.endpoints.project_by_name(project_name)?)?;
1347        Ok(response.project)
1348    }
1349
1350    /// Gets all projects.
1351    pub fn get_projects(&self) -> Result<Vec<Project>> {
1352        let response = self.get::<_, GetProjectsResponse>(self.endpoints.projects.clone())?;
1353        Ok(response.projects)
1354    }
1355
1356    /// Creates a new project.
1357    pub fn create_project(
1358        &self,
1359        project_name: &ProjectName,
1360        options: NewProject,
1361        user_ids: &[UserId],
1362    ) -> Result<Project> {
1363        Ok(self
1364            .put::<_, _, CreateProjectResponse>(
1365                self.endpoints.project_by_name(project_name)?,
1366                CreateProjectRequest {
1367                    project: options,
1368                    user_ids,
1369                },
1370            )?
1371            .project)
1372    }
1373
1374    /// Updates an existing project.
1375    pub fn update_project(
1376        &self,
1377        project_name: &ProjectName,
1378        options: UpdateProject,
1379    ) -> Result<Project> {
1380        Ok(self
1381            .post::<_, _, UpdateProjectResponse>(
1382                self.endpoints.project_by_name(project_name)?,
1383                UpdateProjectRequest { project: options },
1384                Retry::Yes,
1385            )?
1386            .project)
1387    }
1388
1389    /// Deletes an existing project.
1390    pub fn delete_project(
1391        &self,
1392        project_name: &ProjectName,
1393        force_delete: ForceDeleteProject,
1394    ) -> Result<()> {
1395        let endpoint = self.endpoints.project_by_name(project_name)?;
1396        match force_delete {
1397            ForceDeleteProject::No => self.delete(endpoint)?,
1398            ForceDeleteProject::Yes => {
1399                self.delete_query(endpoint, Some(&json!({ "force": true })))?
1400            }
1401        };
1402        Ok(())
1403    }
1404
1405    fn get<LocationT, SuccessT>(&self, url: LocationT) -> Result<SuccessT>
1406    where
1407        LocationT: IntoUrl + Display + Clone,
1408        for<'de> SuccessT: Deserialize<'de>,
1409    {
1410        self.request(&Method::GET, &url, &None::<()>, &None::<()>, &Retry::Yes)
1411    }
1412
1413    fn get_query<LocationT, QueryT, SuccessT>(
1414        &self,
1415        url: LocationT,
1416        query: Option<&QueryT>,
1417    ) -> Result<SuccessT>
1418    where
1419        LocationT: IntoUrl + Display + Clone,
1420        QueryT: Serialize,
1421        for<'de> SuccessT: Deserialize<'de>,
1422    {
1423        self.request(&Method::GET, &url, &None::<()>, &Some(query), &Retry::Yes)
1424    }
1425
1426    fn delete<LocationT>(&self, url: LocationT) -> Result<()>
1427    where
1428        LocationT: IntoUrl + Display + Clone,
1429    {
1430        self.delete_query::<LocationT, ()>(url, None)
1431    }
1432
1433    fn delete_query<LocationT, QueryT>(&self, url: LocationT, query: Option<&QueryT>) -> Result<()>
1434    where
1435        LocationT: IntoUrl + Display + Clone,
1436        QueryT: Serialize,
1437    {
1438        debug!("Attempting DELETE `{url}`");
1439
1440        let attempts = Cell::new(0);
1441        let http_response = self
1442            .with_retries(|| {
1443                attempts.set(attempts.get() + 1);
1444
1445                let mut request = self
1446                    .http_client
1447                    .delete(url.clone())
1448                    .headers(self.headers.clone());
1449                if let Some(query) = query {
1450                    request = request.query(query);
1451                }
1452                request.send()
1453            })
1454            .map_err(|source| Error::ReqwestError {
1455                source,
1456                message: "DELETE operation failed.".to_owned(),
1457            })?;
1458        let status = http_response.status();
1459        http_response
1460            .json::<Response<EmptySuccess>>()
1461            .map_err(Error::BadJsonResponse)?
1462            .into_result(status)
1463            .map_or_else(
1464                // Ignore 404 not found if the request had to be re-tried - assume the target
1465                // object was deleted on a previous incomplete request.
1466                |error| {
1467                    if attempts.get() > 1 && status == reqwest::StatusCode::NOT_FOUND {
1468                        Ok(())
1469                    } else {
1470                        Err(error)
1471                    }
1472                },
1473                |_| Ok(()),
1474            )
1475    }
1476
1477    fn post<LocationT, RequestT, SuccessT>(
1478        &self,
1479        url: LocationT,
1480        request: RequestT,
1481        retry: Retry,
1482    ) -> Result<SuccessT>
1483    where
1484        LocationT: IntoUrl + Display + Clone,
1485        RequestT: Serialize,
1486        for<'de> SuccessT: Deserialize<'de>,
1487    {
1488        self.request(&Method::POST, &url, &Some(request), &None::<()>, &retry)
1489    }
1490
1491    fn put<LocationT, RequestT, SuccessT>(
1492        &self,
1493        url: LocationT,
1494        request: RequestT,
1495    ) -> Result<SuccessT>
1496    where
1497        LocationT: IntoUrl + Display + Clone,
1498        RequestT: Serialize,
1499        for<'de> SuccessT: Deserialize<'de>,
1500    {
1501        self.request(&Method::PUT, &url, &Some(request), &None::<()>, &Retry::Yes)
1502    }
1503
1504    fn raw_request<LocationT, RequestT, QueryT>(
1505        &self,
1506        method: &Method,
1507        url: &LocationT,
1508        body: &Option<RequestT>,
1509        query: &Option<QueryT>,
1510        retry: &Retry,
1511        accept_header: Option<HeaderValue>,
1512    ) -> Result<reqwest::blocking::Response>
1513    where
1514        LocationT: IntoUrl + Display + Clone,
1515        RequestT: Serialize,
1516        QueryT: Serialize,
1517    {
1518        let mut headers = self.headers.clone();
1519
1520        if let Some(accept_header) = accept_header {
1521            headers.insert(ACCEPT, accept_header);
1522        }
1523
1524        let do_request = || {
1525            let request = self
1526                .http_client
1527                .request(method.clone(), url.clone())
1528                .headers(headers.clone());
1529
1530            let request = match &query {
1531                Some(query) => request.query(query),
1532                None => request,
1533            };
1534            let request = match &body {
1535                Some(body) => request.json(body),
1536                None => request,
1537            };
1538            request.send()
1539        };
1540
1541        let result = match retry {
1542            Retry::Yes => self.with_retries(do_request),
1543            Retry::No => do_request(),
1544        };
1545        let http_response = result.map_err(|source| Error::ReqwestError {
1546            source,
1547            message: format!("{method} operation failed."),
1548        })?;
1549
1550        Ok(http_response)
1551    }
1552
1553    fn splitable_request<LocationT, RequestT, SuccessT, QueryT>(
1554        &self,
1555        method: Method,
1556        url: LocationT,
1557        body: RequestT,
1558        query: Option<QueryT>,
1559        retry: Retry,
1560    ) -> Result<SplitableRequestResponse<SuccessT>>
1561    where
1562        LocationT: IntoUrl + Display + Clone,
1563        RequestT: Serialize + SplittableRequest + Clone,
1564        QueryT: Serialize + Clone,
1565        for<'de> SuccessT: Deserialize<'de> + ReducibleResponse + Clone + Default,
1566    {
1567        debug!("Attempting {method} `{url}`");
1568        let result: Result<SuccessT> =
1569            self.request(&method, &url, &Some(body.clone()), &query, &retry);
1570
1571        fn should_split(error: &Error) -> bool {
1572            if let Error::Api { status_code, .. } = error {
1573                *status_code == reqwest::StatusCode::UNPROCESSABLE_ENTITY
1574                    || *status_code == reqwest::StatusCode::BAD_REQUEST
1575            } else if let Error::BadJsonResponse(_) = error {
1576                // This is for the case where some sort of network config (e.g. cloudflare) blocks
1577                // the request and returns invalid content
1578                true
1579            } else if let Error::ReqwestError { source, .. } = error {
1580                // Should split timeouts
1581                source.is_timeout()
1582            } else {
1583                false
1584            }
1585        }
1586
1587        match result {
1588            Ok(response) => Ok(SplitableRequestResponse {
1589                response,
1590                num_failed: 0,
1591            }),
1592            Err(error) if should_split(&error) => {
1593                let mut num_failed = 0;
1594                let response = body
1595                    .split()
1596                    .filter_map(|request| {
1597                        match self.request(&method, &url, &Some(request), &query, &retry) {
1598                            Ok(response) => Some(response),
1599                            Err(_) => {
1600                                num_failed += 1;
1601                                None
1602                            }
1603                        }
1604                    })
1605                    .fold(SuccessT::empty(), |merged, next: SuccessT| {
1606                        merged.merge(next)
1607                    });
1608
1609                Ok(SplitableRequestResponse {
1610                    num_failed,
1611                    response,
1612                })
1613            }
1614            Err(error) => Err(error),
1615        }
1616    }
1617
1618    fn request<LocationT, RequestT, SuccessT, QueryT>(
1619        &self,
1620        method: &Method,
1621        url: &LocationT,
1622        body: &Option<RequestT>,
1623        query: &Option<QueryT>,
1624        retry: &Retry,
1625    ) -> Result<SuccessT>
1626    where
1627        LocationT: IntoUrl + Display + Clone,
1628        RequestT: Serialize,
1629        QueryT: Serialize + Clone,
1630        for<'de> SuccessT: Deserialize<'de>,
1631    {
1632        debug!("Attempting {method} `{url}`");
1633        let http_response = self.raw_request(method, url, body, query, retry, None)?;
1634
1635        let status = http_response.status();
1636        http_response
1637            .json::<Response<SuccessT>>()
1638            .map_err(Error::BadJsonResponse)?
1639            .into_result(status)
1640    }
1641
1642    fn with_retries(
1643        &self,
1644        send_request: impl Fn() -> ReqwestResult<HttpResponse>,
1645    ) -> ReqwestResult<HttpResponse> {
1646        match &self.retrier {
1647            Some(retrier) => retrier.with_retries(send_request),
1648            None => send_request(),
1649        }
1650    }
1651}
1652
/// Whether a request is sent through `Client::with_retries` or exactly once.
#[derive(Copy, Clone)]
enum Retry {
    /// Send through `Client::with_retries` (which uses the client's retrier, if any).
    Yes,
    /// Send the request a single time.
    No,
}
1658
1659pub struct DatasetQueryIter<'a> {
1660    client: &'a Client,
1661    dataset_name: &'a DatasetFullName,
1662    done: bool,
1663    params: &'a mut QueryRequestParams,
1664}
1665
1666impl<'a> DatasetQueryIter<'a> {
1667    fn new(
1668        client: &'a Client,
1669        dataset_name: &'a DatasetFullName,
1670        params: &'a mut QueryRequestParams,
1671    ) -> Self {
1672        Self {
1673            client,
1674            dataset_name,
1675            done: false,
1676            params,
1677        }
1678    }
1679}
1680
1681impl Iterator for DatasetQueryIter<'_> {
1682    type Item = Result<Vec<AnnotatedComment>>;
1683
1684    fn next(&mut self) -> Option<Self::Item> {
1685        if self.done {
1686            return None;
1687        }
1688
1689        let response = self.client.query_dataset(self.dataset_name, self.params);
1690        Some(response.map(|page| {
1691            self.params.continuation = page.continuation;
1692            self.done = self.params.continuation.is_none();
1693            page.results
1694        }))
1695    }
1696}
1697
/// Position from which `CommentsIter` requests its next page.
pub enum ContinuationKind {
    /// A timestamp; `CommentsIter::new` uses this for the `from` bound of its time range.
    Timestamp(DateTime<Utc>),
    /// A continuation returned with a previously fetched page.
    Continuation(Continuation),
}
1702
1703pub struct EmailsIter<'a> {
1704    client: &'a Client,
1705    bucket_name: &'a BucketFullName,
1706    continuation: Option<EmailContinuation>,
1707    done: bool,
1708    page_size: usize,
1709}
1710
1711impl<'a> EmailsIter<'a> {
1712    // Default number of emails per page to request from API.
1713    pub const DEFAULT_PAGE_SIZE: usize = 64;
1714    // Maximum number of emails per page which can be requested from the API.
1715    pub const MAX_PAGE_SIZE: usize = 256;
1716
1717    fn new(client: &'a Client, bucket_name: &'a BucketFullName, page_size: Option<usize>) -> Self {
1718        Self {
1719            client,
1720            bucket_name,
1721            continuation: None,
1722            done: false,
1723            page_size: page_size.unwrap_or(Self::DEFAULT_PAGE_SIZE),
1724        }
1725    }
1726}
1727
1728impl Iterator for EmailsIter<'_> {
1729    type Item = Result<Vec<Email>>;
1730
1731    fn next(&mut self) -> Option<Self::Item> {
1732        if self.done {
1733            return None;
1734        }
1735        let response = self.client.get_emails_iter_page(
1736            self.bucket_name,
1737            self.continuation.as_ref(),
1738            self.page_size,
1739        );
1740        Some(response.map(|page| {
1741            self.continuation = page.continuation;
1742            self.done = self.continuation.is_none();
1743            page.emails
1744        }))
1745    }
1746}
1747
1748pub struct CommentsIter<'a> {
1749    client: &'a Client,
1750    source_name: &'a SourceFullName,
1751    continuation: Option<ContinuationKind>,
1752    done: bool,
1753    page_size: usize,
1754    to_timestamp: Option<DateTime<Utc>>,
1755}
1756
1757#[derive(Debug, Default)]
1758pub struct CommentsIterTimerange {
1759    pub from: Option<DateTime<Utc>>,
1760    pub to: Option<DateTime<Utc>>,
1761}
1762impl<'a> CommentsIter<'a> {
1763    // Default number of comments per page to request from API.
1764    pub const DEFAULT_PAGE_SIZE: usize = 64;
1765    // Maximum number of comments per page which can be requested from the API.
1766    pub const MAX_PAGE_SIZE: usize = 256;
1767
1768    fn new(
1769        client: &'a Client,
1770        source_name: &'a SourceFullName,
1771        page_size: Option<usize>,
1772        timerange: CommentsIterTimerange,
1773    ) -> Self {
1774        let (from_timestamp, to_timestamp) = (timerange.from, timerange.to);
1775        Self {
1776            client,
1777            source_name,
1778            to_timestamp,
1779            continuation: from_timestamp.map(ContinuationKind::Timestamp),
1780            done: false,
1781            page_size: page_size.unwrap_or(Self::DEFAULT_PAGE_SIZE),
1782        }
1783    }
1784}
1785
1786impl Iterator for CommentsIter<'_> {
1787    type Item = Result<Vec<Comment>>;
1788
1789    fn next(&mut self) -> Option<Self::Item> {
1790        if self.done {
1791            return None;
1792        }
1793        let response = self.client.get_comments_iter_page(
1794            self.source_name,
1795            self.continuation.as_ref(),
1796            self.to_timestamp,
1797            self.page_size,
1798        );
1799        Some(response.map(|page| {
1800            self.continuation = page.continuation.map(ContinuationKind::Continuation);
1801            self.done = self.continuation.is_none();
1802            page.comments
1803        }))
1804    }
1805}
1806
1807pub struct LabellingsIter<'a> {
1808    client: &'a Client,
1809    dataset_name: &'a DatasetFullName,
1810    source_id: &'a SourceId,
1811    return_predictions: bool,
1812    after: Option<GetLabellingsAfter>,
1813    limit: Option<usize>,
1814    done: bool,
1815}
1816
1817impl<'a> LabellingsIter<'a> {
1818    fn new(
1819        client: &'a Client,
1820        dataset_name: &'a DatasetFullName,
1821        source_id: &'a SourceId,
1822        return_predictions: bool,
1823        limit: Option<usize>,
1824    ) -> Self {
1825        Self {
1826            client,
1827            dataset_name,
1828            source_id,
1829            return_predictions,
1830            after: None,
1831            limit,
1832            done: false,
1833        }
1834    }
1835}
1836
1837impl Iterator for LabellingsIter<'_> {
1838    type Item = Result<Vec<AnnotatedComment>>;
1839
1840    fn next(&mut self) -> Option<Self::Item> {
1841        if self.done {
1842            return None;
1843        }
1844        let response = self.client.get_labellings_in_bulk(
1845            self.dataset_name,
1846            GetLabellingsInBulk {
1847                source_id: self.source_id,
1848                return_predictions: &self.return_predictions,
1849                after: &self.after,
1850                limit: &self.limit,
1851            },
1852        );
1853        Some(response.map(|page| {
1854            if self.after == page.after && !page.results.is_empty() {
1855                panic!("Labellings API did not increment pagination continuation");
1856            }
1857            self.after = page.after;
1858            if page.results.is_empty() {
1859                self.done = true;
1860            }
1861            page.results
1862        }))
1863    }
1864}
1865
1866#[derive(Debug)]
1867struct Endpoints {
1868    base: Url,
1869    datasets: Url,
1870    sources: Url,
1871    buckets: Url,
1872    users: Url,
1873    current_user: Url,
1874    projects: Url,
1875}
1876
1877#[derive(Debug, Serialize, Clone, Copy)]
1878struct NoChargeQuery {
1879    no_charge: bool,
1880}
1881
1882fn construct_endpoint(base: &Url, segments: &[&str]) -> Result<Url> {
1883    let mut endpoint = base.clone();
1884
1885    let mut endpoint_segments = endpoint
1886        .path_segments_mut()
1887        .map_err(|_| Error::BadEndpoint {
1888            endpoint: base.clone(),
1889        })?;
1890
1891    for segment in segments {
1892        endpoint_segments.push(segment);
1893    }
1894
1895    drop(endpoint_segments);
1896
1897    Ok(endpoint)
1898}
1899
1900impl Endpoints {
1901    pub fn new(base: Url) -> Result<Self> {
1902        let datasets = construct_endpoint(&base, &["api", "v1", "datasets"])?;
1903        let sources = construct_endpoint(&base, &["api", "v1", "sources"])?;
1904        let buckets = construct_endpoint(&base, &["api", "_private", "buckets"])?;
1905        let users = construct_endpoint(&base, &["api", "_private", "users"])?;
1906        let current_user = construct_endpoint(&base, &["auth", "user"])?;
1907        let projects = construct_endpoint(&base, &["api", "_private", "projects"])?;
1908
1909        Ok(Endpoints {
1910            base,
1911            datasets,
1912            sources,
1913            buckets,
1914            users,
1915            current_user,
1916            projects,
1917        })
1918    }
1919
1920    fn refresh_user_permissions(&self) -> Result<Url> {
1921        construct_endpoint(&self.base, &["auth", "refresh-user-permissions"])
1922    }
1923
1924    fn label_group(
1925        &self,
1926        dataset_name: &DatasetFullName,
1927        label_group: LabelGroupName,
1928    ) -> Result<Url> {
1929        construct_endpoint(
1930            &self.base,
1931            &[
1932                "api",
1933                "_private",
1934                "datasets",
1935                &dataset_name.0,
1936                "labels",
1937                &label_group.0,
1938            ],
1939        )
1940    }
1941
1942    fn ixp_datasets(&self) -> Result<Url> {
1943        construct_endpoint(&self.base, &["api", "_private", "ixp", "datasets"])
1944    }
1945
1946    fn ixp_documents(&self, source_id: &SourceId) -> Result<Url> {
1947        construct_endpoint(
1948            &self.base,
1949            &[
1950                "api",
1951                "_private",
1952                "sources",
1953                &format!("id:{0}", source_id.0),
1954                "documents",
1955            ],
1956        )
1957    }
1958
1959    fn ixp_document(&self, source_id: &SourceId, comment_id: &CommentId) -> Result<Url> {
1960        construct_endpoint(
1961            &self.base,
1962            &[
1963                "api",
1964                "_private",
1965                "sources",
1966                &format!("id:{0}", source_id.0),
1967                "documents",
1968                &comment_id.0,
1969            ],
1970        )
1971    }
1972
1973    fn keyed_sync_states(&self, bucket_id: &BucketId) -> Result<Url> {
1974        construct_endpoint(
1975            &self.base,
1976            &[
1977                "api",
1978                "_private",
1979                "buckets",
1980                &format!("id:{}", bucket_id.0),
1981                "keyed-sync-states/",
1982            ],
1983        )
1984    }
1985
1986    fn keyed_sync_state(&self, bucket_id: &BucketId, id: &KeyedSyncStateId) -> Result<Url> {
1987        construct_endpoint(
1988            &self.base,
1989            &[
1990                "api",
1991                "_private",
1992                "buckets",
1993                &format!("id:{}", bucket_id.0),
1994                "keyed-sync-state",
1995                &id.0,
1996            ],
1997        )
1998    }
1999
2000    fn query_keyed_sync_state_ids(&self, bucket_id: &BucketId) -> Result<Url> {
2001        construct_endpoint(
2002            &self.base,
2003            &[
2004                "api",
2005                "_private",
2006                "buckets",
2007                &format!("id:{}", bucket_id.0),
2008                "keyed-sync-state-ids",
2009            ],
2010        )
2011    }
2012
2013    fn audit_events_query(&self) -> Result<Url> {
2014        construct_endpoint(&self.base, &["api", "v1", "audit_events", "query"])
2015    }
2016
2017    fn integrations(&self) -> Result<Url> {
2018        construct_endpoint(&self.base, &["api", "_private", "integrations"])
2019    }
2020
2021    fn integration(&self, name: &IntegrationFullName) -> Result<Url> {
2022        construct_endpoint(&self.base, &["api", "_private", "integrations", &name.0])
2023    }
2024
2025    fn attachment_reference(&self, reference: &AttachmentReference) -> Result<Url> {
2026        construct_endpoint(&self.base, &["api", "v1", "attachments", &reference.0])
2027    }
2028
2029    fn attachment_upload(
2030        &self,
2031        source_id: &SourceId,
2032        comment_id: &CommentId,
2033        attachment_index: usize,
2034    ) -> Result<Url> {
2035        construct_endpoint(
2036            &self.base,
2037            &[
2038                "api",
2039                "_private",
2040                "sources",
2041                &format!("id:{}", source_id.0),
2042                "comments",
2043                &comment_id.0,
2044                "attachments",
2045                &attachment_index.to_string(),
2046            ],
2047        )
2048    }
2049    fn latest_validation(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2050        construct_endpoint(
2051            &self.base,
2052            &[
2053                "api",
2054                "_private",
2055                "datasets",
2056                &dataset_name.0,
2057                "labellers",
2058                "latest",
2059                "validation",
2060            ],
2061        )
2062    }
2063
2064    fn validation(
2065        &self,
2066        dataset_name: &DatasetFullName,
2067        model_version: &ModelVersion,
2068    ) -> Result<Url> {
2069        construct_endpoint(
2070            &self.base,
2071            &[
2072                "api",
2073                "_private",
2074                "datasets",
2075                &dataset_name.0,
2076                "labellers",
2077                &model_version.0.to_string(),
2078                "validation",
2079            ],
2080        )
2081    }
2082
2083    fn label_validation(
2084        &self,
2085        dataset_name: &DatasetFullName,
2086        model_version: &ModelVersion,
2087    ) -> Result<Url> {
2088        construct_endpoint(
2089            &self.base,
2090            &[
2091                "api",
2092                "_private",
2093                "datasets",
2094                &dataset_name.0,
2095                "labellers",
2096                &model_version.0.to_string(),
2097                "label-validation",
2098            ],
2099        )
2100    }
2101    fn bucket_statistics(&self, bucket_name: &BucketFullName) -> Result<Url> {
2102        construct_endpoint(
2103            &self.base,
2104            &["api", "_private", "buckets", &bucket_name.0, "statistics"],
2105        )
2106    }
2107
2108    fn dataset_summary(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2109        construct_endpoint(
2110            &self.base,
2111            &["api", "_private", "datasets", &dataset_name.0, "summary"],
2112        )
2113    }
2114
2115    fn query_dataset(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2116        construct_endpoint(
2117            &self.base,
2118            &["api", "_private", "datasets", &dataset_name.0, "query"],
2119        )
2120    }
2121
2122    fn streams(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2123        construct_endpoint(
2124            &self.base,
2125            &["api", "v1", "datasets", &dataset_name.0, "streams"],
2126        )
2127    }
2128
2129    fn stream(&self, stream_name: &StreamFullName) -> Result<Url> {
2130        construct_endpoint(
2131            &self.base,
2132            &[
2133                "api",
2134                "v1",
2135                "datasets",
2136                &stream_name.dataset.0,
2137                "streams",
2138                &stream_name.stream.0,
2139            ],
2140        )
2141    }
2142
2143    fn stream_fetch(&self, stream_name: &StreamFullName) -> Result<Url> {
2144        construct_endpoint(
2145            &self.base,
2146            &[
2147                "api",
2148                "v1",
2149                "datasets",
2150                &stream_name.dataset.0,
2151                "streams",
2152                &stream_name.stream.0,
2153                "fetch",
2154            ],
2155        )
2156    }
2157
2158    fn stream_advance(&self, stream_name: &StreamFullName) -> Result<Url> {
2159        construct_endpoint(
2160            &self.base,
2161            &[
2162                "api",
2163                "v1",
2164                "datasets",
2165                &stream_name.dataset.0,
2166                "streams",
2167                &stream_name.stream.0,
2168                "advance",
2169            ],
2170        )
2171    }
2172
2173    fn stream_reset(&self, stream_name: &StreamFullName) -> Result<Url> {
2174        construct_endpoint(
2175            &self.base,
2176            &[
2177                "api",
2178                "v1",
2179                "datasets",
2180                &stream_name.dataset.0,
2181                "streams",
2182                &stream_name.stream.0,
2183                "reset",
2184            ],
2185        )
2186    }
2187
2188    fn stream_exceptions(&self, stream_name: &StreamFullName) -> Result<Url> {
2189        construct_endpoint(
2190            &self.base,
2191            &[
2192                "api",
2193                "v1",
2194                "datasets",
2195                &stream_name.dataset.0,
2196                "streams",
2197                &stream_name.stream.0,
2198                "exceptions",
2199            ],
2200        )
2201    }
2202
2203    fn recent_comments(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2204        construct_endpoint(
2205            &self.base,
2206            &["api", "_private", "datasets", &dataset_name.0, "recent"],
2207        )
2208    }
2209
2210    fn dataset_statistics(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2211        construct_endpoint(
2212            &self.base,
2213            &["api", "_private", "datasets", &dataset_name.0, "statistics"],
2214        )
2215    }
2216
2217    fn source_statistics(&self, source_name: &SourceFullName) -> Result<Url> {
2218        construct_endpoint(
2219            &self.base,
2220            &["api", "v1", "sources", &source_name.0, "statistics"],
2221        )
2222    }
2223
2224    fn user_by_id(&self, user_id: &UserId) -> Result<Url> {
2225        construct_endpoint(&self.base, &["api", "_private", "users", &user_id.0])
2226    }
2227
2228    fn source_by_id(&self, source_id: &SourceId) -> Result<Url> {
2229        construct_endpoint(
2230            &self.base,
2231            &["api", "v1", "sources", &format!("id:{}", source_id.0)],
2232        )
2233    }
2234
2235    fn source_by_name(&self, source_name: &SourceFullName) -> Result<Url> {
2236        construct_endpoint(&self.base, &["api", "v1", "sources", &source_name.0])
2237    }
2238
2239    fn quotas(&self, tenant_id: &Option<UiPathTenantId>) -> Result<Url> {
2240        if let Some(tenant_id) = tenant_id {
2241            construct_endpoint(&self.base, &["api", "_private", "quotas", &tenant_id.0])
2242        } else {
2243            construct_endpoint(&self.base, &["api", "_private", "quotas"])
2244        }
2245    }
2246
2247    fn quota(&self, tenant_id: &TenantId, tenant_quota_kind: TenantQuotaKind) -> Result<Url> {
2248        construct_endpoint(
2249            &self.base,
2250            &[
2251                "api",
2252                "_private",
2253                "quotas",
2254                &tenant_id.to_string(),
2255                &tenant_quota_kind.to_string(),
2256            ],
2257        )
2258    }
2259
2260    fn put_comments(&self, source_name: &SourceFullName) -> Result<Url> {
2261        construct_endpoint(
2262            &self.base,
2263            &["api", "_private", "sources", &source_name.0, "comments"],
2264        )
2265    }
2266
2267    fn comments(&self, source_name: &SourceFullName) -> Result<Url> {
2268        construct_endpoint(
2269            &self.base,
2270            &["api", "_private", "sources", &source_name.0, "comments"],
2271        )
2272    }
2273
2274    fn comment_by_id(&self, source_name: &SourceFullName, comment_id: &CommentId) -> Result<Url> {
2275        construct_endpoint(
2276            &self.base,
2277            &[
2278                "api",
2279                "v1",
2280                "sources",
2281                &source_name.0,
2282                "comments",
2283                &comment_id.0,
2284            ],
2285        )
2286    }
2287
2288    fn comments_v1(&self, source_name: &SourceFullName) -> Result<Url> {
2289        construct_endpoint(
2290            &self.base,
2291            &["api", "v1", "sources", &source_name.0, "comments"],
2292        )
2293    }
2294
2295    fn sync_comments(&self, source_name: &SourceFullName) -> Result<Url> {
2296        construct_endpoint(
2297            &self.base,
2298            &["api", "v1", "sources", &source_name.0, "sync"],
2299        )
2300    }
2301
2302    fn sync_comments_raw_emails(&self, source_name: &SourceFullName) -> Result<Url> {
2303        construct_endpoint(
2304            &self.base,
2305            &["api", "v1", "sources", &source_name.0, "sync-raw-emails"],
2306        )
2307    }
2308
2309    fn comment_audio(&self, source_id: &SourceId, comment_id: &CommentId) -> Result<Url> {
2310        construct_endpoint(
2311            &self.base,
2312            &[
2313                "api",
2314                "_private",
2315                "sources",
2316                &format!("id:{}", source_id.0),
2317                "comments",
2318                &comment_id.0,
2319                "audio",
2320            ],
2321        )
2322    }
2323
2324    fn get_emails(&self, bucket_name: &BucketFullName) -> Result<Url> {
2325        construct_endpoint(
2326            &self.base,
2327            &["api", "_private", "buckets", &bucket_name.0, "emails"],
2328        )
2329    }
2330
2331    fn put_emails(&self, bucket_name: &BucketFullName) -> Result<Url> {
2332        construct_endpoint(
2333            &self.base,
2334            &["api", "_private", "buckets", &bucket_name.0, "emails"],
2335        )
2336    }
2337
2338    fn post_user(&self, user_id: &UserId) -> Result<Url> {
2339        construct_endpoint(&self.base, &["api", "_private", "users", &user_id.0])
2340    }
2341
2342    fn dataset_by_id(&self, dataset_id: &DatasetId) -> Result<Url> {
2343        construct_endpoint(
2344            &self.base,
2345            &["api", "v1", "datasets", &format!("id:{}", dataset_id.0)],
2346        )
2347    }
2348
2349    fn dataset_by_name(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2350        construct_endpoint(&self.base, &["api", "v1", "datasets", &dataset_name.0])
2351    }
2352
2353    fn get_labellings(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2354        construct_endpoint(
2355            &self.base,
2356            &["api", "_private", "datasets", &dataset_name.0, "labellings"],
2357        )
2358    }
2359
2360    fn labellers(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2361        construct_endpoint(
2362            &self.base,
2363            &["api", "_private", "datasets", &dataset_name.0, "labellers"],
2364        )
2365    }
2366
2367    fn get_comment_predictions(
2368        &self,
2369        dataset_name: &DatasetFullName,
2370        model_version: &ModelVersion,
2371    ) -> Result<Url> {
2372        construct_endpoint(
2373            &self.base,
2374            &[
2375                "api",
2376                "v1",
2377                "datasets",
2378                &dataset_name.0,
2379                "labellers",
2380                &model_version.0.to_string(),
2381                "predict-comments",
2382            ],
2383        )
2384    }
2385
2386    fn post_labelling(
2387        &self,
2388        dataset_name: &DatasetFullName,
2389        comment_uid: &CommentUid,
2390    ) -> Result<Url> {
2391        construct_endpoint(
2392            &self.base,
2393            &[
2394                "api",
2395                "_private",
2396                "datasets",
2397                &dataset_name.0,
2398                "labellings",
2399                &comment_uid.0,
2400            ],
2401        )
2402    }
2403
2404    fn bucket_by_id(&self, bucket_id: &BucketId) -> Result<Url> {
2405        construct_endpoint(
2406            &self.base,
2407            &["api", "_private", "buckets", &format!("id:{}", bucket_id.0)],
2408        )
2409    }
2410
2411    fn bucket_by_name(&self, bucket_name: &BucketFullName) -> Result<Url> {
2412        construct_endpoint(&self.base, &["api", "_private", "buckets", &bucket_name.0])
2413    }
2414
2415    fn project_by_name(&self, project_name: &ProjectName) -> Result<Url> {
2416        construct_endpoint(
2417            &self.base,
2418            &["api", "_private", "projects", &project_name.0],
2419        )
2420    }
2421
2422    fn welcome_email(&self, user_id: &UserId) -> Result<Url> {
2423        construct_endpoint(
2424            &self.base,
2425            &["api", "_private", "users", &user_id.0, "welcome-email"],
2426        )
2427    }
2428}
2429
2430const DEFAULT_HTTP_TIMEOUT_SECONDS: u64 = 240;
2431
2432fn build_http_client(config: &Config) -> Result<HttpClient> {
2433    let mut builder = HttpClient::builder()
2434        .gzip(true)
2435        .danger_accept_invalid_certs(config.accept_invalid_certificates)
2436        .timeout(Some(Duration::from_secs(DEFAULT_HTTP_TIMEOUT_SECONDS)));
2437
2438    if let Some(proxy) = config.proxy.clone() {
2439        builder = builder.proxy(Proxy::all(proxy).map_err(Error::BuildHttpClient)?);
2440    }
2441    builder.build().map_err(Error::BuildHttpClient)
2442}
2443
2444fn build_headers(config: &Config) -> Result<HeaderMap> {
2445    let mut headers = HeaderMap::new();
2446    headers.insert(
2447        header::AUTHORIZATION,
2448        HeaderValue::from_str(&format!("Bearer {}", &config.token.0)).map_err(|_| {
2449            Error::BadToken {
2450                token: config.token.0.clone(),
2451            }
2452        })?,
2453    );
2454    Ok(headers)
2455}
2456
/// Pairs every id with the query key `"id"`, preserving input order.
///
/// The result has the shape `[("id", "a"), ("id", "b"), ...]`, which the HTTP
/// client serializes into a query string of the form `id=a&id=b&...`.
fn id_list_query<'a>(ids: impl Iterator<Item = &'a String>) -> Vec<(&'static str, &'a str)> {
    // `repeat` is endless, so the zip length is dictated solely by `ids`.
    std::iter::repeat("id")
        .zip(ids.map(String::as_str))
        .collect()
}
2463
2464pub static DEFAULT_ENDPOINT: Lazy<Url> =
2465    Lazy::new(|| Url::parse("https://reinfer.dev").expect("Default URL is well-formed"));
2466
#[cfg(test)]
mod tests {
    use super::*;

    /// Segments are appended after the path already present on the base URL.
    #[test]
    fn test_construct_endpoint() {
        let base = Url::parse("https://cloud.uipath.com/org/tenant/reinfer_").unwrap();
        let segments = ["api", "v1", "sources", "project", "source", "sync"];

        let url = construct_endpoint(&base, &segments).unwrap();

        assert_eq!(
            url.to_string(),
            "https://cloud.uipath.com/org/tenant/reinfer_/api/v1/sources/project/source/sync"
        );
    }

    /// Covers zero, one and several ids; order must be preserved.
    #[test]
    fn test_id_list_query() {
        assert_eq!(id_list_query(Vec::new().iter()), Vec::new());

        let single = ["foo".to_owned()];
        assert_eq!(id_list_query(single.iter()), vec![("id", "foo")]);

        let several = [
            "Stream".to_owned(),
            "River".to_owned(),
            "Waterfall".to_owned(),
        ];
        assert_eq!(
            id_list_query(several.iter()),
            [("id", "Stream"), ("id", "River"), ("id", "Waterfall"),]
        );
    }
}