reinfer_client/
lib.rs

1#![deny(clippy::all)]
2mod error;
3pub mod resources;
4pub mod retry;
5
6use chrono::{DateTime, Utc};
7use http::{header::ACCEPT, Method};
8use log::debug;
9use once_cell::sync::Lazy;
10use reqwest::{
11    blocking::{
12        multipart::{Form, Part},
13        Client as HttpClient, Response as HttpResponse,
14    },
15    header::{self, HeaderMap, HeaderValue},
16    IntoUrl, Proxy, Result as ReqwestResult,
17};
18use resources::{
19    attachments::UploadAttachmentResponse,
20    auth::{RefreshUserPermissionsRequest, RefreshUserPermissionsResponse},
21    bucket::{
22        GetKeyedSyncStateIdsRequest, GetKeyedSyncStateIdsResponse, GetKeyedSyncStatesResponse,
23        KeyedSyncState, KeyedSyncStateId,
24    },
25    bucket_statistics::GetBucketStatisticsResponse,
26    comment::{AttachmentReference, CommentTimestampFilter},
27    dataset::{
28        CreateIxpDatasetRequest, CreateIxpDatasetResponse, GetAllModelsInDatasetRequest,
29        GetAllModelsInDatasetRespone, IxpDatasetNew, QueryRequestParams, QueryResponse,
30        StatisticsRequestParams as DatasetStatisticsRequestParams, SummaryRequestParams,
31        SummaryResponse, UploadIxpDocumentResponse, UserModelMetadata,
32    },
33    documents::{Document, SyncRawEmailsRequest, SyncRawEmailsResponse},
34    email::{Email, GetEmailResponse},
35    integration::{
36        GetIntegrationResponse, GetIntegrationsResponse, Integration, NewIntegration,
37        PostIntegrationRequest, PostIntegrationResponse, PutIntegrationRequest,
38        PutIntegrationResponse,
39    },
40    label_def::{CreateOrUpdateLabelDefsBulkRequest, CreateOrUpdateLabelDefsBulkResponse},
41    project::ForceDeleteProject,
42    quota::{GetQuotasResponse, Quota},
43    source::StatisticsRequestParams as SourceStatisticsRequestParams,
44    stream::{GetStreamResponse, NewStream, PutStreamRequest, PutStreamResponse},
45    validation::{
46        LabelValidation, LabelValidationRequest, LabelValidationResponse, ValidationResponse,
47    },
48};
49use serde::{Deserialize, Serialize};
50use serde_json::json;
51use std::{
52    cell::Cell,
53    fmt::{Debug, Display},
54    io::Read,
55    path::{Path, PathBuf},
56    time::Duration,
57};
58use url::Url;
59
60use crate::resources::{
61    audit::{AuditQueryFilter, AuditQueryRequest, AuditQueryResponse},
62    bucket::{
63        CreateRequest as CreateBucketRequest, CreateResponse as CreateBucketResponse,
64        GetAvailableResponse as GetAvailableBucketsResponse, GetResponse as GetBucketResponse,
65    },
66    bucket_statistics::Statistics as BucketStatistics,
67    comment::{
68        GetAnnotationsResponse, GetCommentResponse, GetLabellingsAfter, GetPredictionsResponse,
69        GetRecentRequest, PutCommentsRequest, PutCommentsResponse, RecentCommentsPage,
70        SyncCommentsRequest, UpdateAnnotationsRequest,
71    },
72    dataset::{
73        CreateRequest as CreateDatasetRequest, CreateResponse as CreateDatasetResponse,
74        GetAvailableResponse as GetAvailableDatasetsResponse, GetResponse as GetDatasetResponse,
75        UpdateRequest as UpdateDatasetRequest, UpdateResponse as UpdateDatasetResponse,
76    },
77    email::{PutEmailsRequest, PutEmailsResponse},
78    project::{
79        CreateProjectRequest, CreateProjectResponse, GetProjectResponse, GetProjectsResponse,
80        UpdateProjectRequest, UpdateProjectResponse,
81    },
82    quota::{CreateQuota, TenantQuotaKind},
83    source::{
84        CreateRequest as CreateSourceRequest, CreateResponse as CreateSourceResponse,
85        GetAvailableResponse as GetAvailableSourcesResponse, GetResponse as GetSourceResponse,
86        UpdateRequest as UpdateSourceRequest, UpdateResponse as UpdateSourceResponse,
87    },
88    statistics::GetResponse as GetStatisticsResponse,
89    stream::{
90        AdvanceRequest as StreamAdvanceRequest, FetchRequest as StreamFetchRequest,
91        GetStreamsResponse, ResetRequest as StreamResetRequest,
92        TagExceptionsRequest as TagStreamExceptionsRequest,
93    },
94    tenant_id::TenantId,
95    user::GetResponse as GetUserResponse,
96    user::{
97        CreateRequest as CreateUserRequest, CreateResponse as CreateUserResponse,
98        GetAvailableResponse as GetAvailableUsersResponse,
99        GetCurrentResponse as GetCurrentUserResponse, PostUserRequest, PostUserResponse,
100        WelcomeEmailResponse,
101    },
102    EmptySuccess, Response,
103};
104
105use crate::retry::{Retrier, RetryConfig};
106
107pub use crate::{
108    error::{Error, Result},
109    resources::{
110        bucket::{
111            Bucket, BucketType, FullName as BucketFullName, Id as BucketId,
112            Identifier as BucketIdentifier, Name as BucketName, NewBucket,
113        },
114        comment::{
115            AnnotatedComment, Comment, CommentFilter, CommentPredictionsThreshold,
116            CommentsIterPage, Continuation, EitherLabelling, Entities, Entity,
117            GetCommentPredictionsRequest, HasAnnotations, Id as CommentId, Label, Labelling,
118            Message, MessageBody, MessageSignature, MessageSubject, NewAnnotatedComment,
119            NewComment, NewEntities, NewLabelling, NewMoonForm, PredictedLabel, Prediction,
120            PropertyMap, PropertyValue, Sentiment, SyncCommentsResponse, TriggerLabelThreshold,
121            Uid as CommentUid,
122        },
123        dataset::{
124            Dataset, FullName as DatasetFullName, Id as DatasetId, Identifier as DatasetIdentifier,
125            ModelVersion, Name as DatasetName, NewDataset, UpdateDataset,
126        },
127        email::{
128            Continuation as EmailContinuation, EmailsIterPage, Id as EmailId, Mailbox, MimeContent,
129            NewEmail,
130        },
131        entity_def::{EntityDef, Id as EntityDefId, Name as EntityName, NewEntityDef},
132        integration::FullName as IntegrationFullName,
133        label_def::{
134            LabelDef, LabelDefPretrained, MoonFormFieldDef, Name as LabelName, NewLabelDef,
135            NewLabelDefPretrained, PretrainedId as LabelDefPretrainedId,
136        },
137        label_group::{
138            LabelGroup, Name as LabelGroupName, NewLabelGroup, DEFAULT_LABEL_GROUP_NAME,
139        },
140        project::{NewProject, Project, ProjectName, UpdateProject},
141        source::{
142            FullName as SourceFullName, Id as SourceId, Identifier as SourceIdentifier,
143            Name as SourceName, NewSource, Source, SourceKind, TransformTag, UpdateSource,
144        },
145        statistics::Statistics as CommentStatistics,
146        stream::{
147            Batch as StreamBatch, FullName as StreamFullName, SequenceId as StreamSequenceId,
148            Stream, StreamException, StreamExceptionMetadata,
149        },
150        user::{
151            Email as UserEmail, GlobalPermission, Id as UserId, Identifier as UserIdentifier,
152            ModifiedPermissions, NewUser, ProjectPermission, UpdateUser, User, Username,
153        },
154    },
155};
156
157#[derive(Clone, Debug, PartialEq, Eq)]
158pub struct Token(pub String);
159
160pub trait SplittableRequest {
161    fn split(self) -> impl Iterator<Item = Self>
162    where
163        Self: Sized;
164
165    fn count(&self) -> usize;
166}
167
168pub struct SplitableRequestResponse<ResponseT>
169where
170    for<'de> ResponseT: Deserialize<'de> + ReducibleResponse,
171{
172    pub response: ResponseT,
173    pub num_failed: usize,
174}
175
176pub trait ReducibleResponse {
177    fn merge(self, _b: Self) -> Self
178    where
179        Self: std::default::Default,
180    {
181        Default::default()
182    }
183
184    fn empty() -> Self
185    where
186        Self: std::default::Default,
187    {
188        Default::default()
189    }
190}
191
192pub struct Config {
193    pub endpoint: Url,
194    pub token: Token,
195    pub accept_invalid_certificates: bool,
196    pub proxy: Option<Url>,
197    /// Retry settings to use, if any. This will apply to all requests except for POST requests
198    /// which are not idempotent (as they cannot be naively retried).
199    pub retry_config: Option<RetryConfig>,
200}
201
202impl Default for Config {
203    fn default() -> Self {
204        Config {
205            endpoint: DEFAULT_ENDPOINT.clone(),
206            token: Token("".to_owned()),
207            accept_invalid_certificates: false,
208            proxy: None,
209            retry_config: None,
210        }
211    }
212}
213
214#[derive(Debug)]
215pub struct Client {
216    endpoints: Endpoints,
217    http_client: HttpClient,
218    headers: HeaderMap,
219    retrier: Option<Retrier>,
220}
221
222#[derive(Serialize)]
223pub struct GetLabellingsInBulk<'a> {
224    pub source_id: &'a SourceId,
225    pub return_predictions: &'a bool,
226
227    #[serde(skip_serializing_if = "Option::is_none")]
228    pub after: &'a Option<GetLabellingsAfter>,
229
230    #[serde(skip_serializing_if = "Option::is_none")]
231    pub limit: &'a Option<usize>,
232}
233
234#[derive(Serialize)]
235pub struct GetCommentsIterPageQuery<'a> {
236    #[serde(skip_serializing_if = "Option::is_none")]
237    pub from_timestamp: Option<DateTime<Utc>>,
238    #[serde(skip_serializing_if = "Option::is_none")]
239    pub to_timestamp: Option<DateTime<Utc>>,
240    #[serde(skip_serializing_if = "Option::is_none")]
241    pub after: Option<&'a Continuation>,
242    pub limit: usize,
243    pub include_markup: bool,
244}
245
246#[derive(Serialize)]
247pub struct GetEmailsIterPageQuery<'a> {
248    #[serde(skip_serializing_if = "Option::is_none")]
249    pub continuation: Option<&'a EmailContinuation>,
250    pub limit: usize,
251}
252
253#[derive(Serialize)]
254pub struct GetCommentQuery {
255    pub include_markup: bool,
256}
257
258#[derive(Serialize)]
259pub struct GetEmailQuery {
260    pub id: String,
261}
262
263impl Client {
264    /// Create a new API client.
265    pub fn new(config: Config) -> Result<Client> {
266        let http_client = build_http_client(&config)?;
267        let headers = build_headers(&config)?;
268        let endpoints = Endpoints::new(config.endpoint)?;
269        let retrier = config.retry_config.map(Retrier::new);
270        Ok(Client {
271            endpoints,
272            http_client,
273            headers,
274            retrier,
275        })
276    }
277
278    /// Get the base url for the client
279    pub fn base_url(&self) -> &Url {
280        &self.endpoints.base
281    }
282
283    /// List all visible sources.
284    pub fn get_sources(&self) -> Result<Vec<Source>> {
285        Ok(self
286            .get::<_, GetAvailableSourcesResponse>(self.endpoints.sources.clone())?
287            .sources)
288    }
289
290    /// Get a source by either id or name.
291    pub fn get_user(&self, user: impl Into<UserIdentifier>) -> Result<User> {
292        Ok(match user.into() {
293            UserIdentifier::Id(user_id) => {
294                self.get::<_, GetUserResponse>(self.endpoints.user_by_id(&user_id)?)?
295                    .user
296            }
297        })
298    }
299
300    /// Get a source by either id or name.
301    pub fn get_source(&self, source: impl Into<SourceIdentifier>) -> Result<Source> {
302        Ok(match source.into() {
303            SourceIdentifier::Id(source_id) => {
304                self.get::<_, GetSourceResponse>(self.endpoints.source_by_id(&source_id)?)?
305                    .source
306            }
307            SourceIdentifier::FullName(source_name) => {
308                self.get::<_, GetSourceResponse>(self.endpoints.source_by_name(&source_name)?)?
309                    .source
310            }
311        })
312    }
313
314    pub fn create_label_defs_bulk(
315        &self,
316        dataset_name: &DatasetFullName,
317        label_group: LabelGroupName,
318        label_defs: Vec<NewLabelDef>,
319    ) -> Result<()> {
320        self.put::<_, _, CreateOrUpdateLabelDefsBulkResponse>(
321            self.endpoints.label_group(dataset_name, label_group)?,
322            CreateOrUpdateLabelDefsBulkRequest { label_defs },
323        )?;
324        Ok(())
325    }
326
327    /// Create a new source.
328    pub fn create_source(
329        &self,
330        source_name: &SourceFullName,
331        options: NewSource<'_>,
332    ) -> Result<Source> {
333        Ok(self
334            .put::<_, _, CreateSourceResponse>(
335                self.endpoints.source_by_name(source_name)?,
336                CreateSourceRequest { source: options },
337            )?
338            .source)
339    }
340
341    /// Update a source.
342    pub fn update_source(
343        &self,
344        source_name: &SourceFullName,
345        options: UpdateSource<'_>,
346    ) -> Result<Source> {
347        Ok(self
348            .post::<_, _, UpdateSourceResponse>(
349                self.endpoints.source_by_name(source_name)?,
350                UpdateSourceRequest { source: options },
351                Retry::Yes,
352            )?
353            .source)
354    }
355
356    /// Delete a source.
357    pub fn delete_source(&self, source: impl Into<SourceIdentifier>) -> Result<()> {
358        let source_id = match source.into() {
359            SourceIdentifier::Id(source_id) => source_id,
360            source @ SourceIdentifier::FullName(_) => self.get_source(source)?.id,
361        };
362        self.delete(self.endpoints.source_by_id(&source_id)?)
363    }
364
365    /// Set a quota
366    pub fn create_quota(
367        &self,
368        target_tenant_id: &TenantId,
369        tenant_quota_kind: TenantQuotaKind,
370        options: CreateQuota,
371    ) -> Result<()> {
372        self.post(
373            self.endpoints.quota(target_tenant_id, tenant_quota_kind)?,
374            options,
375            Retry::Yes,
376        )
377    }
378
379    /// Get quotas for current tenant
380    pub fn get_quotas(&self) -> Result<Vec<Quota>> {
381        Ok(self
382            .get::<_, GetQuotasResponse>(self.endpoints.quotas()?)?
383            .quotas)
384    }
385
386    /// Delete a user.
387    pub fn delete_user(&self, user: impl Into<UserIdentifier>) -> Result<()> {
388        let UserIdentifier::Id(user_id) = user.into();
389        self.delete(self.endpoints.user_by_id(&user_id)?)
390    }
391
392    /// Delete comments by id in a source.
393    pub fn delete_comments(
394        &self,
395        source: impl Into<SourceIdentifier>,
396        comments: &[CommentId],
397    ) -> Result<()> {
398        let source_full_name = match source.into() {
399            source @ SourceIdentifier::Id(_) => self.get_source(source)?.full_name(),
400            SourceIdentifier::FullName(source_full_name) => source_full_name,
401        };
402        self.delete_query(
403            self.endpoints.comments_v1(&source_full_name)?,
404            Some(&id_list_query(comments.iter().map(|uid| &uid.0))),
405        )
406    }
407
408    /// Get a page of comments from a source.
409    pub fn get_comments_iter_page(
410        &self,
411        source_name: &SourceFullName,
412        continuation: Option<&ContinuationKind>,
413        to_timestamp: Option<DateTime<Utc>>,
414        limit: usize,
415    ) -> Result<CommentsIterPage> {
416        // Comments are returned from the API in increasing order of their
417        // `timestamp` field.
418        let (from_timestamp, after) = match continuation {
419            // If we have a timestamp, then this is a request for the first page of
420            // a series of comments with timestamps starting from the given time.
421            Some(ContinuationKind::Timestamp(from_timestamp)) => (Some(*from_timestamp), None),
422            // If we have a continuation, then this is a request for page n+1 of
423            // a series of comments, where the continuation came from page n.
424            Some(ContinuationKind::Continuation(after)) => (None, Some(after)),
425            // Otherwise, this is a request for the first page of a series of comments
426            // with timestamps starting from the beginning of time.
427            None => (None, None),
428        };
429        let query_params = GetCommentsIterPageQuery {
430            from_timestamp,
431            to_timestamp,
432            after,
433            limit,
434            include_markup: true,
435        };
436        self.get_query(self.endpoints.comments(source_name)?, Some(&query_params))
437    }
438
439    /// Iterate through all comments for a given dataset query.
440    pub fn get_dataset_query_iter<'a>(
441        &'a self,
442        dataset_name: &'a DatasetFullName,
443        params: &'a mut QueryRequestParams,
444    ) -> DatasetQueryIter<'a> {
445        DatasetQueryIter::new(self, dataset_name, params)
446    }
447
448    /// Iterate through all comments in a source.
449    pub fn get_comments_iter<'a>(
450        &'a self,
451        source_name: &'a SourceFullName,
452        page_size: Option<usize>,
453        timerange: CommentsIterTimerange,
454    ) -> CommentsIter<'a> {
455        CommentsIter::new(self, source_name, page_size, timerange)
456    }
457
458    pub fn get_keyed_sync_state_ids(
459        &self,
460        bucket_id: &BucketId,
461        request: &GetKeyedSyncStateIdsRequest,
462    ) -> Result<Vec<KeyedSyncStateId>> {
463        Ok(self
464            .post::<_, _, GetKeyedSyncStateIdsResponse>(
465                self.endpoints.query_keyed_sync_state_ids(bucket_id)?,
466                Some(&request),
467                Retry::Yes,
468            )?
469            .keyed_sync_state_ids)
470    }
471
472    pub fn delete_keyed_sync_state(
473        &self,
474        bucket_id: &BucketId,
475        id: &KeyedSyncStateId,
476    ) -> Result<()> {
477        self.delete(self.endpoints.keyed_sync_state(bucket_id, id)?)
478    }
479
480    pub fn get_keyed_sync_states(&self, bucket_id: &BucketId) -> Result<Vec<KeyedSyncState>> {
481        Ok(self
482            .get::<_, GetKeyedSyncStatesResponse>(self.endpoints.keyed_sync_states(bucket_id)?)?
483            .keyed_sync_states)
484    }
485
486    /// Get a single of email from a bucket.
487    pub fn get_email(&self, bucket_name: &BucketFullName, id: EmailId) -> Result<Vec<Email>> {
488        let query_params = GetEmailQuery { id: id.0 };
489        Ok(self
490            .get_query::<_, _, GetEmailResponse>(
491                self.endpoints.get_emails(bucket_name)?,
492                Some(&query_params),
493            )?
494            .emails)
495    }
496
497    /// Get a page of emails from a bucket.
498    pub fn get_emails_iter_page(
499        &self,
500        bucket_name: &BucketFullName,
501        continuation: Option<&EmailContinuation>,
502        limit: usize,
503    ) -> Result<EmailsIterPage> {
504        let query_params = GetEmailsIterPageQuery {
505            continuation,
506            limit,
507        };
508        self.post(
509            self.endpoints.get_emails(bucket_name)?,
510            Some(&query_params),
511            Retry::Yes,
512        )
513    }
514
515    /// Iterate through all comments in a source.
516    pub fn get_emails_iter<'a>(
517        &'a self,
518        bucket_name: &'a BucketFullName,
519        page_size: Option<usize>,
520    ) -> EmailsIter<'a> {
521        EmailsIter::new(self, bucket_name, page_size)
522    }
523
524    /// Get a single comment by id.
525    pub fn get_comment<'a>(
526        &'a self,
527        source_name: &'a SourceFullName,
528        comment_id: &'a CommentId,
529    ) -> Result<Comment> {
530        let query_params = GetCommentQuery {
531            include_markup: true,
532        };
533        Ok(self
534            .get_query::<_, _, GetCommentResponse>(
535                self.endpoints.comment_by_id(source_name, comment_id)?,
536                Some(&query_params),
537            )?
538            .comment)
539    }
540    pub fn post_integration(
541        &self,
542        name: &IntegrationFullName,
543        integration: &NewIntegration,
544    ) -> Result<PostIntegrationResponse> {
545        self.request(
546            &Method::POST,
547            &self.endpoints.integration(name)?,
548            &Some(PostIntegrationRequest {
549                integration: integration.clone(),
550            }),
551            &None::<()>,
552            &Retry::No,
553        )
554    }
555
556    pub fn put_integration(
557        &self,
558        name: &IntegrationFullName,
559        integration: &NewIntegration,
560    ) -> Result<PutIntegrationResponse> {
561        self.request(
562            &Method::PUT,
563            &self.endpoints.integration(name)?,
564            &Some(PutIntegrationRequest {
565                integration: integration.clone(),
566            }),
567            &None::<()>,
568            &Retry::No,
569        )
570    }
571
572    pub fn put_comments_split_on_failure(
573        &self,
574        source_name: &SourceFullName,
575        comments: Vec<NewComment>,
576        no_charge: bool,
577    ) -> Result<SplitableRequestResponse<PutCommentsResponse>> {
578        // Retrying here despite the potential for 409's in order to increase reliability when
579        // working with poor connection
580
581        self.splitable_request(
582            Method::PUT,
583            self.endpoints.put_comments(source_name)?,
584            PutCommentsRequest { comments },
585            Some(NoChargeQuery { no_charge }),
586            Retry::Yes,
587        )
588    }
589
590    pub fn put_comments(
591        &self,
592        source_name: &SourceFullName,
593        comments: Vec<NewComment>,
594        no_charge: bool,
595    ) -> Result<PutCommentsResponse> {
596        // Retrying here despite the potential for 409's in order to increase reliability when
597        // working with poor connection
598        self.request(
599            &Method::PUT,
600            &self.endpoints.put_comments(source_name)?,
601            &Some(PutCommentsRequest { comments }),
602            &Some(NoChargeQuery { no_charge }),
603            &Retry::Yes,
604        )
605    }
606
607    pub fn put_stream(
608        &self,
609        dataset_name: &DatasetFullName,
610        stream: &NewStream,
611    ) -> Result<PutStreamResponse> {
612        self.put(
613            self.endpoints.streams(dataset_name)?,
614            Some(PutStreamRequest { stream }),
615        )
616    }
617
618    pub fn get_audit_events(
619        &self,
620        minimum_timestamp: Option<DateTime<Utc>>,
621        maximum_timestamp: Option<DateTime<Utc>>,
622        continuation: Option<Continuation>,
623    ) -> Result<AuditQueryResponse> {
624        self.post::<_, _, AuditQueryResponse>(
625            self.endpoints.audit_events_query()?,
626            AuditQueryRequest {
627                continuation,
628                filter: AuditQueryFilter {
629                    timestamp: CommentTimestampFilter {
630                        minimum: minimum_timestamp,
631                        maximum: maximum_timestamp,
632                    },
633                },
634            },
635            Retry::Yes,
636        )
637    }
638    pub fn get_latest_validation(
639        &self,
640        dataset_name: &DatasetFullName,
641    ) -> Result<ValidationResponse> {
642        self.get::<_, ValidationResponse>(self.endpoints.latest_validation(dataset_name)?)
643    }
644
645    pub fn get_validation(
646        &self,
647        dataset_name: &DatasetFullName,
648        model_version: &ModelVersion,
649    ) -> Result<ValidationResponse> {
650        self.get::<_, ValidationResponse>(self.endpoints.validation(dataset_name, model_version)?)
651    }
652
653    pub fn get_labellers(&self, dataset_name: &DatasetFullName) -> Result<Vec<UserModelMetadata>> {
654        Ok(self
655            .post::<_, _, GetAllModelsInDatasetRespone>(
656                self.endpoints.labellers(dataset_name)?,
657                GetAllModelsInDatasetRequest {},
658                Retry::Yes,
659            )?
660            .labellers)
661    }
662
663    pub fn get_label_validation(
664        &self,
665        label: &LabelName,
666        dataset_name: &DatasetFullName,
667        model_version: &ModelVersion,
668    ) -> Result<LabelValidation> {
669        Ok(self
670            .post::<_, _, LabelValidationResponse>(
671                self.endpoints
672                    .label_validation(dataset_name, model_version)?,
673                LabelValidationRequest {
674                    label: label.clone(),
675                },
676                Retry::Yes,
677            )?
678            .label_validation)
679    }
680
681    pub fn sync_comments(
682        &self,
683        source_name: &SourceFullName,
684        comments: Vec<NewComment>,
685        no_charge: bool,
686    ) -> Result<SyncCommentsResponse> {
687        self.request(
688            &Method::POST,
689            &self.endpoints.sync_comments(source_name)?,
690            &Some(SyncCommentsRequest { comments }),
691            &Some(NoChargeQuery { no_charge }),
692            &Retry::Yes,
693        )
694    }
695
696    pub fn sync_comments_split_on_failure(
697        &self,
698        source_name: &SourceFullName,
699        comments: Vec<NewComment>,
700        no_charge: bool,
701    ) -> Result<SplitableRequestResponse<SyncCommentsResponse>> {
702        self.splitable_request(
703            Method::POST,
704            self.endpoints.sync_comments(source_name)?,
705            SyncCommentsRequest { comments },
706            Some(NoChargeQuery { no_charge }),
707            Retry::Yes,
708        )
709    }
710
711    pub fn sync_raw_emails(
712        &self,
713        source_name: &SourceFullName,
714        documents: &[Document],
715        transform_tag: &TransformTag,
716        include_comments: bool,
717        no_charge: bool,
718    ) -> Result<SyncRawEmailsResponse> {
719        self.request(
720            &Method::POST,
721            &self.endpoints.sync_comments_raw_emails(source_name)?,
722            &Some(SyncRawEmailsRequest {
723                documents,
724                transform_tag,
725                include_comments,
726            }),
727            &Some(NoChargeQuery { no_charge }),
728            &Retry::Yes,
729        )
730    }
731
732    pub fn put_emails_split_on_failure(
733        &self,
734        bucket_name: &BucketFullName,
735        emails: Vec<NewEmail>,
736        no_charge: bool,
737    ) -> Result<SplitableRequestResponse<PutEmailsResponse>> {
738        self.splitable_request(
739            Method::PUT,
740            self.endpoints.put_emails(bucket_name)?,
741            PutEmailsRequest { emails },
742            Some(NoChargeQuery { no_charge }),
743            Retry::Yes,
744        )
745    }
746
747    pub fn put_emails(
748        &self,
749        bucket_name: &BucketFullName,
750        emails: Vec<NewEmail>,
751        no_charge: bool,
752    ) -> Result<PutEmailsResponse> {
753        self.request(
754            &Method::PUT,
755            &self.endpoints.put_emails(bucket_name)?,
756            &Some(PutEmailsRequest { emails }),
757            &Some(NoChargeQuery { no_charge }),
758            &Retry::Yes,
759        )
760    }
761
762    pub fn post_user(&self, user_id: &UserId, user: UpdateUser) -> Result<PostUserResponse> {
763        self.post(
764            self.endpoints.post_user(user_id)?,
765            PostUserRequest { user: &user },
766            Retry::Yes,
767        )
768    }
769
770    pub fn put_comment_audio(
771        &self,
772        source_id: &SourceId,
773        comment_id: &CommentId,
774        audio_path: impl AsRef<Path>,
775    ) -> Result<()> {
776        let form = Form::new()
777            .file("file", audio_path)
778            .map_err(|source| Error::Unknown {
779                message: "PUT comment audio operation failed".to_owned(),
780                source: source.into(),
781            })?;
782        let http_response = self
783            .http_client
784            .put(self.endpoints.comment_audio(source_id, comment_id)?)
785            .headers(self.headers.clone())
786            .multipart(form)
787            .send()
788            .map_err(|source| Error::ReqwestError {
789                message: "PUT comment audio operation failed".to_owned(),
790                source,
791            })?;
792        let status = http_response.status();
793        http_response
794            .json::<Response<EmptySuccess>>()
795            .map_err(Error::BadJsonResponse)?
796            .into_result(status)?;
797        Ok(())
798    }
799
800    pub fn upload_ixp_document(
801        &self,
802        source_id: &SourceId,
803        filename: String,
804        bytes: Vec<u8>,
805    ) -> Result<CommentId> {
806        let endpoint = self.endpoints.ixp_documents(source_id)?;
807
808        let do_request = || {
809            let form = Form::new().part(
810                "file",
811                Part::bytes(bytes.clone()).file_name(filename.clone()),
812            );
813            let request = self
814                .http_client
815                .request(Method::PUT, endpoint.clone())
816                .multipart(form)
817                .headers(self.headers.clone());
818
819            request.send()
820        };
821
822        let result = self.with_retries(do_request);
823
824        let http_response = result.map_err(|source| Error::ReqwestError {
825            source,
826            message: "Operation failed.".to_string(),
827        })?;
828
829        let status = http_response.status();
830
831        Ok(http_response
832            .json::<Response<UploadIxpDocumentResponse>>()
833            .map_err(Error::BadJsonResponse)?
834            .into_result(status)?
835            .comment_id)
836    }
837
838    pub fn upload_comment_attachment(
839        &self,
840        source_id: &SourceId,
841        comment_id: &CommentId,
842        attachment_index: usize,
843        attachment: &PathBuf,
844    ) -> Result<UploadAttachmentResponse> {
845        let url = self
846            .endpoints
847            .attachment_upload(source_id, comment_id, attachment_index)?;
848
849        if !attachment.is_file() || !attachment.exists() {
850            return Err(Error::FileDoesNotExist {
851                path: attachment.clone(),
852            });
853        }
854
855        let do_request = || {
856            let form = Form::new()
857                .file("file", attachment)
858                .map_err(|source| Error::Unknown {
859                    message: "PUT comment attachment operation failed".to_owned(),
860                    source: source.into(),
861                })
862                .unwrap();
863            let request = self
864                .http_client
865                .request(Method::PUT, url.clone())
866                .multipart(form)
867                .headers(self.headers.clone());
868
869            request.send()
870        };
871
872        let result = self.with_retries(do_request);
873
874        let http_response = result.map_err(|source| Error::ReqwestError {
875            source,
876            message: "Operation failed.".to_string(),
877        })?;
878
879        let status = http_response.status();
880
881        http_response
882            .json::<Response<UploadAttachmentResponse>>()
883            .map_err(Error::BadJsonResponse)?
884            .into_result(status)
885    }
886
887    pub fn get_ixp_document(
888        &self,
889        source_id: &SourceId,
890        comment_id: &CommentId,
891    ) -> Result<Vec<u8>> {
892        self.get_octet_stream(&self.endpoints.ixp_document(source_id, comment_id)?)
893    }
894
895    fn get_octet_stream(&self, endpoint: &Url) -> Result<Vec<u8>> {
896        let mut response = self.raw_request(
897            &Method::GET,
898            endpoint,
899            &None::<()>,
900            &None::<()>,
901            &Retry::Yes,
902            None,
903        )?;
904
905        let mut buffer = Vec::new();
906
907        response
908            .read_to_end(&mut buffer)
909            .map_err(|source| Error::Unknown {
910                message: "Failed to read buffer".to_string(),
911                source: Box::new(source),
912            })?;
913        Ok(buffer)
914    }
915
916    pub fn get_attachment(&self, reference: &AttachmentReference) -> Result<Vec<u8>> {
917        self.get_octet_stream(&self.endpoints.attachment_reference(reference)?)
918    }
919
920    pub fn get_integrations(&self) -> Result<Vec<Integration>> {
921        Ok(self
922            .get::<_, GetIntegrationsResponse>(self.endpoints.integrations()?)?
923            .integrations)
924    }
925
926    pub fn get_integration(&self, name: &IntegrationFullName) -> Result<Integration> {
927        Ok(self
928            .get::<_, GetIntegrationResponse>(self.endpoints.integration(name)?)?
929            .integration)
930    }
931
932    pub fn get_datasets(&self) -> Result<Vec<Dataset>> {
933        Ok(self
934            .get::<_, GetAvailableDatasetsResponse>(self.endpoints.datasets.clone())?
935            .datasets)
936    }
937
938    pub fn get_dataset<IdentifierT>(&self, dataset: IdentifierT) -> Result<Dataset>
939    where
940        IdentifierT: Into<DatasetIdentifier>,
941    {
942        Ok(match dataset.into() {
943            DatasetIdentifier::Id(dataset_id) => {
944                self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_id(&dataset_id)?)?
945                    .dataset
946            }
947            DatasetIdentifier::FullName(dataset_name) => {
948                self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_name(&dataset_name)?)?
949                    .dataset
950            }
951        })
952    }
953
954    /// Create a ixp dataset.
955    pub fn create_ixp_dataset(&self, dataset: IxpDatasetNew) -> Result<Dataset> {
956        Ok(self
957            .put::<_, _, CreateIxpDatasetResponse>(
958                self.endpoints.ixp_datasets()?,
959                CreateIxpDatasetRequest { dataset },
960            )?
961            .dataset)
962    }
963
964    /// Create a dataset.
965    pub fn create_dataset(
966        &self,
967        dataset_name: &DatasetFullName,
968        options: NewDataset<'_>,
969    ) -> Result<Dataset> {
970        Ok(self
971            .put::<_, _, CreateDatasetResponse>(
972                self.endpoints.dataset_by_name(dataset_name)?,
973                CreateDatasetRequest { dataset: options },
974            )?
975            .dataset)
976    }
977
978    /// Update a dataset.
979    pub fn update_dataset(
980        &self,
981        dataset_name: &DatasetFullName,
982        options: UpdateDataset<'_>,
983    ) -> Result<Dataset> {
984        Ok(self
985            .post::<_, _, UpdateDatasetResponse>(
986                self.endpoints.dataset_by_name(dataset_name)?,
987                UpdateDatasetRequest { dataset: options },
988                Retry::Yes,
989            )?
990            .dataset)
991    }
992
993    pub fn delete_dataset<IdentifierT>(&self, dataset: IdentifierT) -> Result<()>
994    where
995        IdentifierT: Into<DatasetIdentifier>,
996    {
997        let dataset_id = match dataset.into() {
998            DatasetIdentifier::Id(dataset_id) => dataset_id,
999            dataset @ DatasetIdentifier::FullName(_) => self.get_dataset(dataset)?.id,
1000        };
1001        self.delete(self.endpoints.dataset_by_id(&dataset_id)?)
1002    }
1003
1004    /// Get labellings for a given a dataset and a list of comment UIDs.
1005    pub fn get_labellings<'a>(
1006        &self,
1007        dataset_name: &DatasetFullName,
1008        comment_uids: impl Iterator<Item = &'a CommentUid>,
1009    ) -> Result<Vec<AnnotatedComment>> {
1010        Ok(self
1011            .get_query::<_, _, GetAnnotationsResponse>(
1012                self.endpoints.get_labellings(dataset_name)?,
1013                Some(&id_list_query(comment_uids.into_iter().map(|id| &id.0))),
1014            )?
1015            .results)
1016    }
1017
1018    /// Iterate through all reviewed comments in a source.
1019    pub fn get_labellings_iter<'a>(
1020        &'a self,
1021        dataset_name: &'a DatasetFullName,
1022        source_id: &'a SourceId,
1023        return_predictions: bool,
1024        limit: Option<usize>,
1025    ) -> LabellingsIter<'a> {
1026        LabellingsIter::new(self, dataset_name, source_id, return_predictions, limit)
1027    }
1028
1029    /// Get reviewed comments in bulk
1030    pub fn get_labellings_in_bulk(
1031        &self,
1032        dataset_name: &DatasetFullName,
1033        query_parameters: GetLabellingsInBulk<'_>,
1034    ) -> Result<GetAnnotationsResponse> {
1035        self.get_query::<_, _, GetAnnotationsResponse>(
1036            self.endpoints.get_labellings(dataset_name)?,
1037            Some(&query_parameters),
1038        )
1039    }
1040
1041    /// Update labellings for a given a dataset and comment UID.
1042    pub fn update_labelling(
1043        &self,
1044        dataset_name: &DatasetFullName,
1045        comment_uid: &CommentUid,
1046        labelling: Option<&[NewLabelling]>,
1047        entities: Option<&NewEntities>,
1048        moon_forms: Option<&[NewMoonForm]>,
1049    ) -> Result<AnnotatedComment> {
1050        self.post::<_, _, AnnotatedComment>(
1051            self.endpoints.post_labelling(dataset_name, comment_uid)?,
1052            UpdateAnnotationsRequest {
1053                labelling,
1054                entities,
1055                moon_forms,
1056            },
1057            Retry::Yes,
1058        )
1059    }
1060
1061    /// Get predictions for a given a dataset, a model version, and a list of comment UIDs.
1062    pub fn get_comment_predictions<'a>(
1063        &self,
1064        dataset_name: &DatasetFullName,
1065        model_version: &ModelVersion,
1066        comment_uids: impl Iterator<Item = &'a CommentUid>,
1067        threshold: Option<CommentPredictionsThreshold>,
1068        labels: Option<Vec<TriggerLabelThreshold>>,
1069    ) -> Result<Vec<Prediction>> {
1070        Ok(self
1071            .post::<_, _, GetPredictionsResponse>(
1072                self.endpoints
1073                    .get_comment_predictions(dataset_name, model_version)?,
1074                GetCommentPredictionsRequest {
1075                    uids: comment_uids
1076                        .into_iter()
1077                        .map(|id| id.0.clone())
1078                        .collect::<Vec<_>>(),
1079
1080                    threshold,
1081                    labels,
1082                },
1083                Retry::Yes,
1084            )?
1085            .predictions)
1086    }
1087
1088    pub fn get_streams(&self, dataset_name: &DatasetFullName) -> Result<Vec<Stream>> {
1089        Ok(self
1090            .get::<_, GetStreamsResponse>(self.endpoints.streams(dataset_name)?)?
1091            .streams)
1092    }
1093
1094    pub fn get_recent_comments(
1095        &self,
1096        dataset_name: &DatasetFullName,
1097        filter: &CommentFilter,
1098        limit: usize,
1099        continuation: Option<&Continuation>,
1100    ) -> Result<RecentCommentsPage> {
1101        self.post::<_, _, RecentCommentsPage>(
1102            self.endpoints.recent_comments(dataset_name)?,
1103            GetRecentRequest {
1104                limit,
1105                filter,
1106                continuation,
1107            },
1108            Retry::No,
1109        )
1110    }
1111
1112    pub fn refresh_user_permissions(&self) -> Result<RefreshUserPermissionsResponse> {
1113        self.post::<_, _, RefreshUserPermissionsResponse>(
1114            self.endpoints.refresh_user_permissions()?,
1115            RefreshUserPermissionsRequest {},
1116            Retry::Yes,
1117        )
1118    }
1119
1120    pub fn get_current_user(&self) -> Result<User> {
1121        Ok(self
1122            .get::<_, GetCurrentUserResponse>(self.endpoints.current_user.clone())?
1123            .user)
1124    }
1125
1126    pub fn get_users(&self) -> Result<Vec<User>> {
1127        Ok(self
1128            .get::<_, GetAvailableUsersResponse>(self.endpoints.users.clone())?
1129            .users)
1130    }
1131
1132    pub fn create_user(&self, user: NewUser<'_>) -> Result<User> {
1133        Ok(self
1134            .put::<_, _, CreateUserResponse>(
1135                self.endpoints.users.clone(),
1136                CreateUserRequest { user },
1137            )?
1138            .user)
1139    }
1140
1141    pub fn dataset_summary(
1142        &self,
1143        dataset_name: &DatasetFullName,
1144        params: &SummaryRequestParams,
1145    ) -> Result<SummaryResponse> {
1146        self.post::<_, _, SummaryResponse>(
1147            self.endpoints.dataset_summary(dataset_name)?,
1148            serde_json::to_value(params).expect("summary params serialization error"),
1149            Retry::Yes,
1150        )
1151    }
1152
1153    pub fn query_dataset_csv(
1154        &self,
1155        dataset_name: &DatasetFullName,
1156        params: &QueryRequestParams,
1157    ) -> Result<String> {
1158        let response = self
1159            .raw_request(
1160                &Method::POST,
1161                &self.endpoints.query_dataset(dataset_name)?,
1162                &Some(serde_json::to_value(params).expect("query params serialization error")),
1163                &None::<()>,
1164                &Retry::Yes,
1165                Some(HeaderValue::from_str("text/csv").expect("Could not parse csv header")),
1166            )?
1167            .text()
1168            .expect("Could not get csv text");
1169
1170        Ok(response)
1171    }
1172
1173    pub fn query_dataset(
1174        &self,
1175        dataset_name: &DatasetFullName,
1176        params: &QueryRequestParams,
1177    ) -> Result<QueryResponse> {
1178        self.post::<_, _, QueryResponse>(
1179            self.endpoints.query_dataset(dataset_name)?,
1180            serde_json::to_value(params).expect("query params serialization error"),
1181            Retry::Yes,
1182        )
1183    }
1184
1185    pub fn send_welcome_email(&self, user_id: UserId) -> Result<()> {
1186        self.post::<_, _, WelcomeEmailResponse>(
1187            self.endpoints.welcome_email(&user_id)?,
1188            json!({}),
1189            Retry::No,
1190        )?;
1191        Ok(())
1192    }
1193
1194    pub fn get_bucket_statistics(&self, bucket_name: &BucketFullName) -> Result<BucketStatistics> {
1195        Ok(self
1196            .get::<_, GetBucketStatisticsResponse>(self.endpoints.bucket_statistics(bucket_name)?)?
1197            .statistics)
1198    }
1199
1200    pub fn get_dataset_statistics(
1201        &self,
1202        dataset_name: &DatasetFullName,
1203        params: &DatasetStatisticsRequestParams,
1204    ) -> Result<CommentStatistics> {
1205        Ok(self
1206            .post::<_, _, GetStatisticsResponse>(
1207                self.endpoints.dataset_statistics(dataset_name)?,
1208                serde_json::to_value(params)
1209                    .expect("dataset statistics params serialization error"),
1210                Retry::No,
1211            )?
1212            .statistics)
1213    }
1214
1215    pub fn get_source_statistics(
1216        &self,
1217        source_name: &SourceFullName,
1218        params: &SourceStatisticsRequestParams,
1219    ) -> Result<CommentStatistics> {
1220        Ok(self
1221            .post::<_, _, GetStatisticsResponse>(
1222                self.endpoints.source_statistics(source_name)?,
1223                serde_json::to_value(params).expect("source statistics params serialization error"),
1224                Retry::No,
1225            )?
1226            .statistics)
1227    }
1228
1229    /// Create a new bucket.
1230    pub fn create_bucket(
1231        &self,
1232        bucket_name: &BucketFullName,
1233        options: NewBucket<'_>,
1234    ) -> Result<Bucket> {
1235        Ok(self
1236            .put::<_, _, CreateBucketResponse>(
1237                self.endpoints.bucket_by_name(bucket_name)?,
1238                CreateBucketRequest { bucket: options },
1239            )?
1240            .bucket)
1241    }
1242
1243    pub fn get_buckets(&self) -> Result<Vec<Bucket>> {
1244        Ok(self
1245            .get::<_, GetAvailableBucketsResponse>(self.endpoints.buckets.clone())?
1246            .buckets)
1247    }
1248
1249    pub fn get_bucket<IdentifierT>(&self, bucket: IdentifierT) -> Result<Bucket>
1250    where
1251        IdentifierT: Into<BucketIdentifier>,
1252    {
1253        Ok(match bucket.into() {
1254            BucketIdentifier::Id(bucket_id) => {
1255                self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_id(&bucket_id)?)?
1256                    .bucket
1257            }
1258            BucketIdentifier::FullName(bucket_name) => {
1259                self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_name(&bucket_name)?)?
1260                    .bucket
1261            }
1262        })
1263    }
1264
1265    pub fn delete_bucket<IdentifierT>(&self, bucket: IdentifierT) -> Result<()>
1266    where
1267        IdentifierT: Into<BucketIdentifier>,
1268    {
1269        let bucket_id = match bucket.into() {
1270            BucketIdentifier::Id(bucket_id) => bucket_id,
1271            bucket @ BucketIdentifier::FullName(_) => self.get_bucket(bucket)?.id,
1272        };
1273        self.delete(self.endpoints.bucket_by_id(&bucket_id)?)
1274    }
1275
1276    pub fn fetch_stream_comments(
1277        &self,
1278        stream_name: &StreamFullName,
1279        size: u32,
1280    ) -> Result<StreamBatch> {
1281        self.post(
1282            self.endpoints.stream_fetch(stream_name)?,
1283            StreamFetchRequest { size },
1284            Retry::No,
1285        )
1286    }
1287
1288    pub fn get_stream(&self, stream_name: &StreamFullName) -> Result<Stream> {
1289        Ok(self
1290            .get::<_, GetStreamResponse>(self.endpoints.stream(stream_name)?)?
1291            .stream)
1292    }
1293
1294    pub fn advance_stream(
1295        &self,
1296        stream_name: &StreamFullName,
1297        sequence_id: StreamSequenceId,
1298    ) -> Result<()> {
1299        self.post::<_, _, serde::de::IgnoredAny>(
1300            self.endpoints.stream_advance(stream_name)?,
1301            StreamAdvanceRequest { sequence_id },
1302            Retry::No,
1303        )?;
1304        Ok(())
1305    }
1306
1307    pub fn reset_stream(
1308        &self,
1309        stream_name: &StreamFullName,
1310        to_comment_created_at: DateTime<Utc>,
1311    ) -> Result<()> {
1312        self.post::<_, _, serde::de::IgnoredAny>(
1313            self.endpoints.stream_reset(stream_name)?,
1314            StreamResetRequest {
1315                to_comment_created_at,
1316            },
1317            Retry::No,
1318        )?;
1319        Ok(())
1320    }
1321
1322    pub fn tag_stream_exceptions(
1323        &self,
1324        stream_name: &StreamFullName,
1325        exceptions: &[StreamException],
1326    ) -> Result<()> {
1327        self.put::<_, _, serde::de::IgnoredAny>(
1328            self.endpoints.stream_exceptions(stream_name)?,
1329            TagStreamExceptionsRequest { exceptions },
1330        )?;
1331        Ok(())
1332    }
1333
1334    /// Gets a project.
1335    pub fn get_project(&self, project_name: &ProjectName) -> Result<Project> {
1336        let response =
1337            self.get::<_, GetProjectResponse>(self.endpoints.project_by_name(project_name)?)?;
1338        Ok(response.project)
1339    }
1340
1341    /// Gets all projects.
1342    pub fn get_projects(&self) -> Result<Vec<Project>> {
1343        let response = self.get::<_, GetProjectsResponse>(self.endpoints.projects.clone())?;
1344        Ok(response.projects)
1345    }
1346
1347    /// Creates a new project.
1348    pub fn create_project(
1349        &self,
1350        project_name: &ProjectName,
1351        options: NewProject,
1352        user_ids: &[UserId],
1353    ) -> Result<Project> {
1354        Ok(self
1355            .put::<_, _, CreateProjectResponse>(
1356                self.endpoints.project_by_name(project_name)?,
1357                CreateProjectRequest {
1358                    project: options,
1359                    user_ids,
1360                },
1361            )?
1362            .project)
1363    }
1364
1365    /// Updates an existing project.
1366    pub fn update_project(
1367        &self,
1368        project_name: &ProjectName,
1369        options: UpdateProject,
1370    ) -> Result<Project> {
1371        Ok(self
1372            .post::<_, _, UpdateProjectResponse>(
1373                self.endpoints.project_by_name(project_name)?,
1374                UpdateProjectRequest { project: options },
1375                Retry::Yes,
1376            )?
1377            .project)
1378    }
1379
1380    /// Deletes an existing project.
1381    pub fn delete_project(
1382        &self,
1383        project_name: &ProjectName,
1384        force_delete: ForceDeleteProject,
1385    ) -> Result<()> {
1386        let endpoint = self.endpoints.project_by_name(project_name)?;
1387        match force_delete {
1388            ForceDeleteProject::No => self.delete(endpoint)?,
1389            ForceDeleteProject::Yes => {
1390                self.delete_query(endpoint, Some(&json!({ "force": true })))?
1391            }
1392        };
1393        Ok(())
1394    }
1395
1396    fn get<LocationT, SuccessT>(&self, url: LocationT) -> Result<SuccessT>
1397    where
1398        LocationT: IntoUrl + Display + Clone,
1399        for<'de> SuccessT: Deserialize<'de>,
1400    {
1401        self.request(&Method::GET, &url, &None::<()>, &None::<()>, &Retry::Yes)
1402    }
1403
1404    fn get_query<LocationT, QueryT, SuccessT>(
1405        &self,
1406        url: LocationT,
1407        query: Option<&QueryT>,
1408    ) -> Result<SuccessT>
1409    where
1410        LocationT: IntoUrl + Display + Clone,
1411        QueryT: Serialize,
1412        for<'de> SuccessT: Deserialize<'de>,
1413    {
1414        self.request(&Method::GET, &url, &None::<()>, &Some(query), &Retry::Yes)
1415    }
1416
1417    fn delete<LocationT>(&self, url: LocationT) -> Result<()>
1418    where
1419        LocationT: IntoUrl + Display + Clone,
1420    {
1421        self.delete_query::<LocationT, ()>(url, None)
1422    }
1423
1424    fn delete_query<LocationT, QueryT>(&self, url: LocationT, query: Option<&QueryT>) -> Result<()>
1425    where
1426        LocationT: IntoUrl + Display + Clone,
1427        QueryT: Serialize,
1428    {
1429        debug!("Attempting DELETE `{}`", url);
1430
1431        let attempts = Cell::new(0);
1432        let http_response = self
1433            .with_retries(|| {
1434                attempts.set(attempts.get() + 1);
1435
1436                let mut request = self
1437                    .http_client
1438                    .delete(url.clone())
1439                    .headers(self.headers.clone());
1440                if let Some(query) = query {
1441                    request = request.query(query);
1442                }
1443                request.send()
1444            })
1445            .map_err(|source| Error::ReqwestError {
1446                source,
1447                message: "DELETE operation failed.".to_owned(),
1448            })?;
1449        let status = http_response.status();
1450        http_response
1451            .json::<Response<EmptySuccess>>()
1452            .map_err(Error::BadJsonResponse)?
1453            .into_result(status)
1454            .map_or_else(
1455                // Ignore 404 not found if the request had to be re-tried - assume the target
1456                // object was deleted on a previous incomplete request.
1457                |error| {
1458                    if attempts.get() > 1 && status == reqwest::StatusCode::NOT_FOUND {
1459                        Ok(())
1460                    } else {
1461                        Err(error)
1462                    }
1463                },
1464                |_| Ok(()),
1465            )
1466    }
1467
1468    fn post<LocationT, RequestT, SuccessT>(
1469        &self,
1470        url: LocationT,
1471        request: RequestT,
1472        retry: Retry,
1473    ) -> Result<SuccessT>
1474    where
1475        LocationT: IntoUrl + Display + Clone,
1476        RequestT: Serialize,
1477        for<'de> SuccessT: Deserialize<'de>,
1478    {
1479        self.request(&Method::POST, &url, &Some(request), &None::<()>, &retry)
1480    }
1481
1482    fn put<LocationT, RequestT, SuccessT>(
1483        &self,
1484        url: LocationT,
1485        request: RequestT,
1486    ) -> Result<SuccessT>
1487    where
1488        LocationT: IntoUrl + Display + Clone,
1489        RequestT: Serialize,
1490        for<'de> SuccessT: Deserialize<'de>,
1491    {
1492        self.request(&Method::PUT, &url, &Some(request), &None::<()>, &Retry::Yes)
1493    }
1494
1495    fn raw_request<LocationT, RequestT, QueryT>(
1496        &self,
1497        method: &Method,
1498        url: &LocationT,
1499        body: &Option<RequestT>,
1500        query: &Option<QueryT>,
1501        retry: &Retry,
1502        accept_header: Option<HeaderValue>,
1503    ) -> Result<reqwest::blocking::Response>
1504    where
1505        LocationT: IntoUrl + Display + Clone,
1506        RequestT: Serialize,
1507        QueryT: Serialize,
1508    {
1509        let mut headers = self.headers.clone();
1510
1511        if let Some(accept_header) = accept_header {
1512            headers.insert(ACCEPT, accept_header);
1513        }
1514
1515        let do_request = || {
1516            let request = self
1517                .http_client
1518                .request(method.clone(), url.clone())
1519                .headers(headers.clone());
1520
1521            let request = match &query {
1522                Some(query) => request.query(query),
1523                None => request,
1524            };
1525            let request = match &body {
1526                Some(body) => request.json(body),
1527                None => request,
1528            };
1529            request.send()
1530        };
1531
1532        let result = match retry {
1533            Retry::Yes => self.with_retries(do_request),
1534            Retry::No => do_request(),
1535        };
1536        let http_response = result.map_err(|source| Error::ReqwestError {
1537            source,
1538            message: format!("{method} operation failed."),
1539        })?;
1540
1541        Ok(http_response)
1542    }
1543
1544    fn splitable_request<LocationT, RequestT, SuccessT, QueryT>(
1545        &self,
1546        method: Method,
1547        url: LocationT,
1548        body: RequestT,
1549        query: Option<QueryT>,
1550        retry: Retry,
1551    ) -> Result<SplitableRequestResponse<SuccessT>>
1552    where
1553        LocationT: IntoUrl + Display + Clone,
1554        RequestT: Serialize + SplittableRequest + Clone,
1555        QueryT: Serialize + Clone,
1556        for<'de> SuccessT: Deserialize<'de> + ReducibleResponse + Clone + Default,
1557    {
1558        debug!("Attempting {method} `{url}`");
1559        let result: Result<SuccessT> =
1560            self.request(&method, &url, &Some(body.clone()), &query, &retry);
1561
1562        fn should_split(error: &Error) -> bool {
1563            if let Error::Api { status_code, .. } = error {
1564                *status_code == reqwest::StatusCode::UNPROCESSABLE_ENTITY
1565                    || *status_code == reqwest::StatusCode::BAD_REQUEST
1566            } else if let Error::BadJsonResponse(_) = error {
1567                // This is for the case where some sort of network config (e.g. cloudflare) blocks
1568                // the request and returns invalid content
1569                true
1570            } else {
1571                false
1572            }
1573        }
1574
1575        match result {
1576            Ok(response) => Ok(SplitableRequestResponse {
1577                response,
1578                num_failed: 0,
1579            }),
1580            Err(error) if should_split(&error) => {
1581                let mut num_failed = 0;
1582                let response = body
1583                    .split()
1584                    .filter_map(|request| {
1585                        match self.request(&method, &url, &Some(request), &query, &retry) {
1586                            Ok(response) => Some(response),
1587                            Err(_) => {
1588                                num_failed += 1;
1589                                None
1590                            }
1591                        }
1592                    })
1593                    .fold(SuccessT::empty(), |merged, next: SuccessT| {
1594                        merged.merge(next)
1595                    });
1596
1597                Ok(SplitableRequestResponse {
1598                    num_failed,
1599                    response,
1600                })
1601            }
1602            Err(error) => Err(error),
1603        }
1604    }
1605
1606    fn request<LocationT, RequestT, SuccessT, QueryT>(
1607        &self,
1608        method: &Method,
1609        url: &LocationT,
1610        body: &Option<RequestT>,
1611        query: &Option<QueryT>,
1612        retry: &Retry,
1613    ) -> Result<SuccessT>
1614    where
1615        LocationT: IntoUrl + Display + Clone,
1616        RequestT: Serialize,
1617        QueryT: Serialize + Clone,
1618        for<'de> SuccessT: Deserialize<'de>,
1619    {
1620        debug!("Attempting {} `{}`", method, url);
1621        let http_response = self.raw_request(method, url, body, query, retry, None)?;
1622
1623        let status = http_response.status();
1624        http_response
1625            .json::<Response<SuccessT>>()
1626            .map_err(Error::BadJsonResponse)?
1627            .into_result(status)
1628    }
1629
1630    fn with_retries(
1631        &self,
1632        send_request: impl Fn() -> ReqwestResult<HttpResponse>,
1633    ) -> ReqwestResult<HttpResponse> {
1634        match &self.retrier {
1635            Some(retrier) => retrier.with_retries(send_request),
1636            None => send_request(),
1637        }
1638    }
1639}
1640
1641#[derive(Copy, Clone)]
1642enum Retry {
1643    Yes,
1644    No,
1645}
1646
1647pub struct DatasetQueryIter<'a> {
1648    client: &'a Client,
1649    dataset_name: &'a DatasetFullName,
1650    done: bool,
1651    params: &'a mut QueryRequestParams,
1652}
1653
1654impl<'a> DatasetQueryIter<'a> {
1655    fn new(
1656        client: &'a Client,
1657        dataset_name: &'a DatasetFullName,
1658        params: &'a mut QueryRequestParams,
1659    ) -> Self {
1660        Self {
1661            client,
1662            dataset_name,
1663            done: false,
1664            params,
1665        }
1666    }
1667}
1668
1669impl Iterator for DatasetQueryIter<'_> {
1670    type Item = Result<Vec<AnnotatedComment>>;
1671
1672    fn next(&mut self) -> Option<Self::Item> {
1673        if self.done {
1674            return None;
1675        }
1676
1677        let response = self.client.query_dataset(self.dataset_name, self.params);
1678        Some(response.map(|page| {
1679            self.params.continuation = page.continuation;
1680            self.done = self.params.continuation.is_none();
1681            page.results
1682        }))
1683    }
1684}
1685
1686pub enum ContinuationKind {
1687    Timestamp(DateTime<Utc>),
1688    Continuation(Continuation),
1689}
1690
1691pub struct EmailsIter<'a> {
1692    client: &'a Client,
1693    bucket_name: &'a BucketFullName,
1694    continuation: Option<EmailContinuation>,
1695    done: bool,
1696    page_size: usize,
1697}
1698
1699impl<'a> EmailsIter<'a> {
1700    // Default number of emails per page to request from API.
1701    pub const DEFAULT_PAGE_SIZE: usize = 64;
1702    // Maximum number of emails per page which can be requested from the API.
1703    pub const MAX_PAGE_SIZE: usize = 256;
1704
1705    fn new(client: &'a Client, bucket_name: &'a BucketFullName, page_size: Option<usize>) -> Self {
1706        Self {
1707            client,
1708            bucket_name,
1709            continuation: None,
1710            done: false,
1711            page_size: page_size.unwrap_or(Self::DEFAULT_PAGE_SIZE),
1712        }
1713    }
1714}
1715
1716impl Iterator for EmailsIter<'_> {
1717    type Item = Result<Vec<Email>>;
1718
1719    fn next(&mut self) -> Option<Self::Item> {
1720        if self.done {
1721            return None;
1722        }
1723        let response = self.client.get_emails_iter_page(
1724            self.bucket_name,
1725            self.continuation.as_ref(),
1726            self.page_size,
1727        );
1728        Some(response.map(|page| {
1729            self.continuation = page.continuation;
1730            self.done = self.continuation.is_none();
1731            page.emails
1732        }))
1733    }
1734}
1735
1736pub struct CommentsIter<'a> {
1737    client: &'a Client,
1738    source_name: &'a SourceFullName,
1739    continuation: Option<ContinuationKind>,
1740    done: bool,
1741    page_size: usize,
1742    to_timestamp: Option<DateTime<Utc>>,
1743}
1744
1745#[derive(Debug, Default)]
1746pub struct CommentsIterTimerange {
1747    pub from: Option<DateTime<Utc>>,
1748    pub to: Option<DateTime<Utc>>,
1749}
1750impl<'a> CommentsIter<'a> {
1751    // Default number of comments per page to request from API.
1752    pub const DEFAULT_PAGE_SIZE: usize = 64;
1753    // Maximum number of comments per page which can be requested from the API.
1754    pub const MAX_PAGE_SIZE: usize = 256;
1755
1756    fn new(
1757        client: &'a Client,
1758        source_name: &'a SourceFullName,
1759        page_size: Option<usize>,
1760        timerange: CommentsIterTimerange,
1761    ) -> Self {
1762        let (from_timestamp, to_timestamp) = (timerange.from, timerange.to);
1763        Self {
1764            client,
1765            source_name,
1766            to_timestamp,
1767            continuation: from_timestamp.map(ContinuationKind::Timestamp),
1768            done: false,
1769            page_size: page_size.unwrap_or(Self::DEFAULT_PAGE_SIZE),
1770        }
1771    }
1772}
1773
1774impl Iterator for CommentsIter<'_> {
1775    type Item = Result<Vec<Comment>>;
1776
1777    fn next(&mut self) -> Option<Self::Item> {
1778        if self.done {
1779            return None;
1780        }
1781        let response = self.client.get_comments_iter_page(
1782            self.source_name,
1783            self.continuation.as_ref(),
1784            self.to_timestamp,
1785            self.page_size,
1786        );
1787        Some(response.map(|page| {
1788            self.continuation = page.continuation.map(ContinuationKind::Continuation);
1789            self.done = self.continuation.is_none();
1790            page.comments
1791        }))
1792    }
1793}
1794
1795pub struct LabellingsIter<'a> {
1796    client: &'a Client,
1797    dataset_name: &'a DatasetFullName,
1798    source_id: &'a SourceId,
1799    return_predictions: bool,
1800    after: Option<GetLabellingsAfter>,
1801    limit: Option<usize>,
1802    done: bool,
1803}
1804
1805impl<'a> LabellingsIter<'a> {
1806    fn new(
1807        client: &'a Client,
1808        dataset_name: &'a DatasetFullName,
1809        source_id: &'a SourceId,
1810        return_predictions: bool,
1811        limit: Option<usize>,
1812    ) -> Self {
1813        Self {
1814            client,
1815            dataset_name,
1816            source_id,
1817            return_predictions,
1818            after: None,
1819            limit,
1820            done: false,
1821        }
1822    }
1823}
1824
1825impl Iterator for LabellingsIter<'_> {
1826    type Item = Result<Vec<AnnotatedComment>>;
1827
1828    fn next(&mut self) -> Option<Self::Item> {
1829        if self.done {
1830            return None;
1831        }
1832        let response = self.client.get_labellings_in_bulk(
1833            self.dataset_name,
1834            GetLabellingsInBulk {
1835                source_id: self.source_id,
1836                return_predictions: &self.return_predictions,
1837                after: &self.after,
1838                limit: &self.limit,
1839            },
1840        );
1841        Some(response.map(|page| {
1842            if self.after == page.after && !page.results.is_empty() {
1843                panic!("Labellings API did not increment pagination continuation");
1844            }
1845            self.after = page.after;
1846            if page.results.is_empty() {
1847                self.done = true;
1848            }
1849            page.results
1850        }))
1851    }
1852}
1853
1854#[derive(Debug)]
1855struct Endpoints {
1856    base: Url,
1857    datasets: Url,
1858    sources: Url,
1859    buckets: Url,
1860    users: Url,
1861    current_user: Url,
1862    projects: Url,
1863}
1864
1865#[derive(Debug, Serialize, Clone, Copy)]
1866struct NoChargeQuery {
1867    no_charge: bool,
1868}
1869
1870fn construct_endpoint(base: &Url, segments: &[&str]) -> Result<Url> {
1871    let mut endpoint = base.clone();
1872
1873    let mut endpoint_segments = endpoint
1874        .path_segments_mut()
1875        .map_err(|_| Error::BadEndpoint {
1876            endpoint: base.clone(),
1877        })?;
1878
1879    for segment in segments {
1880        endpoint_segments.push(segment);
1881    }
1882
1883    drop(endpoint_segments);
1884
1885    Ok(endpoint)
1886}
1887
1888impl Endpoints {
1889    pub fn new(base: Url) -> Result<Self> {
1890        let datasets = construct_endpoint(&base, &["api", "v1", "datasets"])?;
1891        let sources = construct_endpoint(&base, &["api", "v1", "sources"])?;
1892        let buckets = construct_endpoint(&base, &["api", "_private", "buckets"])?;
1893        let users = construct_endpoint(&base, &["api", "_private", "users"])?;
1894        let current_user = construct_endpoint(&base, &["auth", "user"])?;
1895        let projects = construct_endpoint(&base, &["api", "_private", "projects"])?;
1896
1897        Ok(Endpoints {
1898            base,
1899            datasets,
1900            sources,
1901            buckets,
1902            users,
1903            current_user,
1904            projects,
1905        })
1906    }
1907
1908    fn refresh_user_permissions(&self) -> Result<Url> {
1909        construct_endpoint(&self.base, &["auth", "refresh-user-permissions"])
1910    }
1911
1912    fn label_group(
1913        &self,
1914        dataset_name: &DatasetFullName,
1915        label_group: LabelGroupName,
1916    ) -> Result<Url> {
1917        construct_endpoint(
1918            &self.base,
1919            &[
1920                "api",
1921                "_private",
1922                "datasets",
1923                &dataset_name.0,
1924                "labels",
1925                &label_group.0,
1926            ],
1927        )
1928    }
1929
1930    fn ixp_datasets(&self) -> Result<Url> {
1931        construct_endpoint(&self.base, &["api", "_private", "ixp", "datasets"])
1932    }
1933
1934    fn ixp_documents(&self, source_id: &SourceId) -> Result<Url> {
1935        construct_endpoint(
1936            &self.base,
1937            &[
1938                "api",
1939                "_private",
1940                "sources",
1941                &format!("id:{0}", source_id.0),
1942                "documents",
1943            ],
1944        )
1945    }
1946
1947    fn ixp_document(&self, source_id: &SourceId, comment_id: &CommentId) -> Result<Url> {
1948        construct_endpoint(
1949            &self.base,
1950            &[
1951                "api",
1952                "_private",
1953                "sources",
1954                &format!("id:{0}", source_id.0),
1955                "documents",
1956                &comment_id.0,
1957            ],
1958        )
1959    }
1960
1961    fn keyed_sync_states(&self, bucket_id: &BucketId) -> Result<Url> {
1962        construct_endpoint(
1963            &self.base,
1964            &[
1965                "api",
1966                "_private",
1967                "buckets",
1968                &format!("id:{}", bucket_id.0),
1969                "keyed-sync-states/",
1970            ],
1971        )
1972    }
1973
1974    fn keyed_sync_state(&self, bucket_id: &BucketId, id: &KeyedSyncStateId) -> Result<Url> {
1975        construct_endpoint(
1976            &self.base,
1977            &[
1978                "api",
1979                "_private",
1980                "buckets",
1981                &format!("id:{}", bucket_id.0),
1982                "keyed-sync-state",
1983                &id.0,
1984            ],
1985        )
1986    }
1987
1988    fn query_keyed_sync_state_ids(&self, bucket_id: &BucketId) -> Result<Url> {
1989        construct_endpoint(
1990            &self.base,
1991            &[
1992                "api",
1993                "_private",
1994                "buckets",
1995                &format!("id:{}", bucket_id.0),
1996                "keyed-sync-state-ids",
1997            ],
1998        )
1999    }
2000
2001    fn audit_events_query(&self) -> Result<Url> {
2002        construct_endpoint(&self.base, &["api", "v1", "audit_events", "query"])
2003    }
2004
2005    fn integrations(&self) -> Result<Url> {
2006        construct_endpoint(&self.base, &["api", "_private", "integrations"])
2007    }
2008
2009    fn integration(&self, name: &IntegrationFullName) -> Result<Url> {
2010        construct_endpoint(&self.base, &["api", "_private", "integrations", &name.0])
2011    }
2012
2013    fn attachment_reference(&self, reference: &AttachmentReference) -> Result<Url> {
2014        construct_endpoint(&self.base, &["api", "v1", "attachments", &reference.0])
2015    }
2016
2017    fn attachment_upload(
2018        &self,
2019        source_id: &SourceId,
2020        comment_id: &CommentId,
2021        attachment_index: usize,
2022    ) -> Result<Url> {
2023        construct_endpoint(
2024            &self.base,
2025            &[
2026                "api",
2027                "_private",
2028                "sources",
2029                &format!("id:{}", source_id.0),
2030                "comments",
2031                &comment_id.0,
2032                "attachments",
2033                &attachment_index.to_string(),
2034            ],
2035        )
2036    }
2037    fn latest_validation(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2038        construct_endpoint(
2039            &self.base,
2040            &[
2041                "api",
2042                "_private",
2043                "datasets",
2044                &dataset_name.0,
2045                "labellers",
2046                "latest",
2047                "validation",
2048            ],
2049        )
2050    }
2051
2052    fn validation(
2053        &self,
2054        dataset_name: &DatasetFullName,
2055        model_version: &ModelVersion,
2056    ) -> Result<Url> {
2057        construct_endpoint(
2058            &self.base,
2059            &[
2060                "api",
2061                "_private",
2062                "datasets",
2063                &dataset_name.0,
2064                "labellers",
2065                &model_version.0.to_string(),
2066                "validation",
2067            ],
2068        )
2069    }
2070
2071    fn label_validation(
2072        &self,
2073        dataset_name: &DatasetFullName,
2074        model_version: &ModelVersion,
2075    ) -> Result<Url> {
2076        construct_endpoint(
2077            &self.base,
2078            &[
2079                "api",
2080                "_private",
2081                "datasets",
2082                &dataset_name.0,
2083                "labellers",
2084                &model_version.0.to_string(),
2085                "label-validation",
2086            ],
2087        )
2088    }
2089    fn bucket_statistics(&self, bucket_name: &BucketFullName) -> Result<Url> {
2090        construct_endpoint(
2091            &self.base,
2092            &["api", "_private", "buckets", &bucket_name.0, "statistics"],
2093        )
2094    }
2095
2096    fn dataset_summary(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2097        construct_endpoint(
2098            &self.base,
2099            &["api", "_private", "datasets", &dataset_name.0, "summary"],
2100        )
2101    }
2102
2103    fn query_dataset(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2104        construct_endpoint(
2105            &self.base,
2106            &["api", "_private", "datasets", &dataset_name.0, "query"],
2107        )
2108    }
2109
2110    fn streams(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2111        construct_endpoint(
2112            &self.base,
2113            &["api", "v1", "datasets", &dataset_name.0, "streams"],
2114        )
2115    }
2116
2117    fn stream(&self, stream_name: &StreamFullName) -> Result<Url> {
2118        construct_endpoint(
2119            &self.base,
2120            &[
2121                "api",
2122                "v1",
2123                "datasets",
2124                &stream_name.dataset.0,
2125                "streams",
2126                &stream_name.stream.0,
2127            ],
2128        )
2129    }
2130
2131    fn stream_fetch(&self, stream_name: &StreamFullName) -> Result<Url> {
2132        construct_endpoint(
2133            &self.base,
2134            &[
2135                "api",
2136                "v1",
2137                "datasets",
2138                &stream_name.dataset.0,
2139                "streams",
2140                &stream_name.stream.0,
2141                "fetch",
2142            ],
2143        )
2144    }
2145
2146    fn stream_advance(&self, stream_name: &StreamFullName) -> Result<Url> {
2147        construct_endpoint(
2148            &self.base,
2149            &[
2150                "api",
2151                "v1",
2152                "datasets",
2153                &stream_name.dataset.0,
2154                "streams",
2155                &stream_name.stream.0,
2156                "advance",
2157            ],
2158        )
2159    }
2160
2161    fn stream_reset(&self, stream_name: &StreamFullName) -> Result<Url> {
2162        construct_endpoint(
2163            &self.base,
2164            &[
2165                "api",
2166                "v1",
2167                "datasets",
2168                &stream_name.dataset.0,
2169                "streams",
2170                &stream_name.stream.0,
2171                "reset",
2172            ],
2173        )
2174    }
2175
2176    fn stream_exceptions(&self, stream_name: &StreamFullName) -> Result<Url> {
2177        construct_endpoint(
2178            &self.base,
2179            &[
2180                "api",
2181                "v1",
2182                "datasets",
2183                &stream_name.dataset.0,
2184                "streams",
2185                &stream_name.stream.0,
2186                "exceptions",
2187            ],
2188        )
2189    }
2190
2191    fn recent_comments(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2192        construct_endpoint(
2193            &self.base,
2194            &["api", "_private", "datasets", &dataset_name.0, "recent"],
2195        )
2196    }
2197
2198    fn dataset_statistics(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2199        construct_endpoint(
2200            &self.base,
2201            &["api", "_private", "datasets", &dataset_name.0, "statistics"],
2202        )
2203    }
2204
2205    fn source_statistics(&self, source_name: &SourceFullName) -> Result<Url> {
2206        construct_endpoint(
2207            &self.base,
2208            &["api", "v1", "sources", &source_name.0, "statistics"],
2209        )
2210    }
2211
2212    fn user_by_id(&self, user_id: &UserId) -> Result<Url> {
2213        construct_endpoint(&self.base, &["api", "_private", "users", &user_id.0])
2214    }
2215
2216    fn source_by_id(&self, source_id: &SourceId) -> Result<Url> {
2217        construct_endpoint(
2218            &self.base,
2219            &["api", "v1", "sources", &format!("id:{}", source_id.0)],
2220        )
2221    }
2222
2223    fn source_by_name(&self, source_name: &SourceFullName) -> Result<Url> {
2224        construct_endpoint(&self.base, &["api", "v1", "sources", &source_name.0])
2225    }
2226
2227    fn quotas(&self) -> Result<Url> {
2228        construct_endpoint(&self.base, &["api", "_private", "quotas"])
2229    }
2230
2231    fn quota(&self, tenant_id: &TenantId, tenant_quota_kind: TenantQuotaKind) -> Result<Url> {
2232        construct_endpoint(
2233            &self.base,
2234            &[
2235                "api",
2236                "_private",
2237                "quotas",
2238                &tenant_id.to_string(),
2239                &tenant_quota_kind.to_string(),
2240            ],
2241        )
2242    }
2243
2244    fn put_comments(&self, source_name: &SourceFullName) -> Result<Url> {
2245        construct_endpoint(
2246            &self.base,
2247            &["api", "_private", "sources", &source_name.0, "comments"],
2248        )
2249    }
2250
2251    fn comments(&self, source_name: &SourceFullName) -> Result<Url> {
2252        construct_endpoint(
2253            &self.base,
2254            &["api", "_private", "sources", &source_name.0, "comments"],
2255        )
2256    }
2257
2258    fn comment_by_id(&self, source_name: &SourceFullName, comment_id: &CommentId) -> Result<Url> {
2259        construct_endpoint(
2260            &self.base,
2261            &[
2262                "api",
2263                "v1",
2264                "sources",
2265                &source_name.0,
2266                "comments",
2267                &comment_id.0,
2268            ],
2269        )
2270    }
2271
2272    fn comments_v1(&self, source_name: &SourceFullName) -> Result<Url> {
2273        construct_endpoint(
2274            &self.base,
2275            &["api", "v1", "sources", &source_name.0, "comments"],
2276        )
2277    }
2278
2279    fn sync_comments(&self, source_name: &SourceFullName) -> Result<Url> {
2280        construct_endpoint(
2281            &self.base,
2282            &["api", "v1", "sources", &source_name.0, "sync"],
2283        )
2284    }
2285
2286    fn sync_comments_raw_emails(&self, source_name: &SourceFullName) -> Result<Url> {
2287        construct_endpoint(
2288            &self.base,
2289            &["api", "v1", "sources", &source_name.0, "sync-raw-emails"],
2290        )
2291    }
2292
2293    fn comment_audio(&self, source_id: &SourceId, comment_id: &CommentId) -> Result<Url> {
2294        construct_endpoint(
2295            &self.base,
2296            &[
2297                "api",
2298                "_private",
2299                "sources",
2300                &format!("id:{}", source_id.0),
2301                "comments",
2302                &comment_id.0,
2303                "audio",
2304            ],
2305        )
2306    }
2307
2308    fn get_emails(&self, bucket_name: &BucketFullName) -> Result<Url> {
2309        construct_endpoint(
2310            &self.base,
2311            &["api", "_private", "buckets", &bucket_name.0, "emails"],
2312        )
2313    }
2314
2315    fn put_emails(&self, bucket_name: &BucketFullName) -> Result<Url> {
2316        construct_endpoint(
2317            &self.base,
2318            &["api", "_private", "buckets", &bucket_name.0, "emails"],
2319        )
2320    }
2321
2322    fn post_user(&self, user_id: &UserId) -> Result<Url> {
2323        construct_endpoint(&self.base, &["api", "_private", "users", &user_id.0])
2324    }
2325
2326    fn dataset_by_id(&self, dataset_id: &DatasetId) -> Result<Url> {
2327        construct_endpoint(
2328            &self.base,
2329            &["api", "v1", "datasets", &format!("id:{}", dataset_id.0)],
2330        )
2331    }
2332
2333    fn dataset_by_name(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2334        construct_endpoint(&self.base, &["api", "v1", "datasets", &dataset_name.0])
2335    }
2336
2337    fn get_labellings(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2338        construct_endpoint(
2339            &self.base,
2340            &["api", "_private", "datasets", &dataset_name.0, "labellings"],
2341        )
2342    }
2343
2344    fn labellers(&self, dataset_name: &DatasetFullName) -> Result<Url> {
2345        construct_endpoint(
2346            &self.base,
2347            &["api", "_private", "datasets", &dataset_name.0, "labellers"],
2348        )
2349    }
2350
2351    fn get_comment_predictions(
2352        &self,
2353        dataset_name: &DatasetFullName,
2354        model_version: &ModelVersion,
2355    ) -> Result<Url> {
2356        construct_endpoint(
2357            &self.base,
2358            &[
2359                "api",
2360                "v1",
2361                "datasets",
2362                &dataset_name.0,
2363                "labellers",
2364                &model_version.0.to_string(),
2365                "predict-comments",
2366            ],
2367        )
2368    }
2369
2370    fn post_labelling(
2371        &self,
2372        dataset_name: &DatasetFullName,
2373        comment_uid: &CommentUid,
2374    ) -> Result<Url> {
2375        construct_endpoint(
2376            &self.base,
2377            &[
2378                "api",
2379                "_private",
2380                "datasets",
2381                &dataset_name.0,
2382                "labellings",
2383                &comment_uid.0,
2384            ],
2385        )
2386    }
2387
2388    fn bucket_by_id(&self, bucket_id: &BucketId) -> Result<Url> {
2389        construct_endpoint(
2390            &self.base,
2391            &["api", "_private", "buckets", &format!("id:{}", bucket_id.0)],
2392        )
2393    }
2394
2395    fn bucket_by_name(&self, bucket_name: &BucketFullName) -> Result<Url> {
2396        construct_endpoint(&self.base, &["api", "_private", "buckets", &bucket_name.0])
2397    }
2398
2399    fn project_by_name(&self, project_name: &ProjectName) -> Result<Url> {
2400        construct_endpoint(
2401            &self.base,
2402            &["api", "_private", "projects", &project_name.0],
2403        )
2404    }
2405
2406    fn welcome_email(&self, user_id: &UserId) -> Result<Url> {
2407        construct_endpoint(
2408            &self.base,
2409            &["api", "_private", "users", &user_id.0, "welcome-email"],
2410        )
2411    }
2412}
2413
2414const DEFAULT_HTTP_TIMEOUT_SECONDS: u64 = 240;
2415
2416fn build_http_client(config: &Config) -> Result<HttpClient> {
2417    let mut builder = HttpClient::builder()
2418        .gzip(true)
2419        .danger_accept_invalid_certs(config.accept_invalid_certificates)
2420        .timeout(Some(Duration::from_secs(DEFAULT_HTTP_TIMEOUT_SECONDS)));
2421
2422    if let Some(proxy) = config.proxy.clone() {
2423        builder = builder.proxy(Proxy::all(proxy).map_err(Error::BuildHttpClient)?);
2424    }
2425    builder.build().map_err(Error::BuildHttpClient)
2426}
2427
2428fn build_headers(config: &Config) -> Result<HeaderMap> {
2429    let mut headers = HeaderMap::new();
2430    headers.insert(
2431        header::AUTHORIZATION,
2432        HeaderValue::from_str(&format!("Bearer {}", &config.token.0)).map_err(|_| {
2433            Error::BadToken {
2434                token: config.token.0.clone(),
2435            }
2436        })?,
2437    );
2438    Ok(headers)
2439}
2440
2441fn id_list_query<'a>(ids: impl Iterator<Item = &'a String>) -> Vec<(&'static str, &'a str)> {
2442    // Return a list of pairs ("id", "a"), ("id", "b"), ...
2443    // The http client will turn this into a query string of
2444    // the form "id=a&id=b&..."
2445    ids.map(|id| ("id", id.as_str())).collect()
2446}
2447
2448pub static DEFAULT_ENDPOINT: Lazy<Url> =
2449    Lazy::new(|| Url::parse("https://reinfer.dev").expect("Default URL is well-formed"));
2450
2451#[cfg(test)]
2452mod tests {
2453    use super::*;
2454
2455    #[test]
2456    fn test_construct_endpoint() {
2457        let url = construct_endpoint(
2458            &Url::parse("https://cloud.uipath.com/org/tenant/reinfer_").unwrap(),
2459            &["api", "v1", "sources", "project", "source", "sync"],
2460        )
2461        .unwrap();
2462
2463        assert_eq!(
2464            url.to_string(),
2465            "https://cloud.uipath.com/org/tenant/reinfer_/api/v1/sources/project/source/sync"
2466        )
2467    }
2468
2469    #[test]
2470    fn test_id_list_query() {
2471        assert_eq!(id_list_query(Vec::new().iter()), Vec::new());
2472        assert_eq!(
2473            id_list_query(["foo".to_owned()].iter()),
2474            vec![("id", "foo")]
2475        );
2476        assert_eq!(
2477            id_list_query(
2478                [
2479                    "Stream".to_owned(),
2480                    "River".to_owned(),
2481                    "Waterfall".to_owned()
2482                ]
2483                .iter()
2484            ),
2485            [("id", "Stream"), ("id", "River"), ("id", "Waterfall"),]
2486        );
2487    }
2488}