cognite/api/
resource.rs

1use std::collections::VecDeque;
2use std::future::Future;
3use std::{marker::PhantomData, sync::Arc};
4
5use futures::future::try_join_all;
6use futures::stream::{try_unfold, SelectAll};
7use futures::{StreamExt, TryStream};
8use serde::{de::DeserializeOwned, Serialize};
9
10use crate::dto::items::*;
11use crate::{
12    ApiClient, EqIdentity, Filter, Identity, IntoParams, IntoPatch, Partition, Patch, Result,
13    Search, SetCursor, UpsertOptions, WithPartition,
14};
15
16use super::utils::{get_duplicates_from_result, get_missing_from_result};
17
18/// A resource instance contains methods for accessing a single
19/// CDF resource type.
20pub struct Resource<T> {
21    /// A reference to the shared API Client.
22    pub api_client: Arc<ApiClient>,
23    marker: PhantomData<T>,
24}
25
26impl<T> Resource<T> {
27    /// Create a new resource with given API client.
28    ///
29    /// # Arguments
30    ///
31    /// * `api_client` - API client reference.
32    pub fn new(api_client: Arc<ApiClient>) -> Self {
33        Resource {
34            api_client,
35            marker: PhantomData,
36        }
37    }
38}
39
40impl<T> Clone for Resource<T> {
41    fn clone(&self) -> Self {
42        Self {
43            api_client: self.api_client.clone(),
44            marker: PhantomData,
45        }
46    }
47}
48
49impl<T> WithApiClient for Resource<T> {
50    fn get_client(&self) -> &ApiClient {
51        &self.api_client
52    }
53}
54
55/// Trait for a type that contains an API client.
56pub trait WithApiClient {
57    /// Get the API client for this type.
58    fn get_client(&self) -> &ApiClient;
59}
60
/// Implemented by types that are tied to a fixed base path.
pub trait WithBasePath {
    /// Base path for this resource type. The request helpers in this module use it
    /// directly, or with a suffix such as `/list`, `/byids`, `/update` or `/delete`.
    const BASE_PATH: &'static str;
}
66
67/// Trait for simple GET / endpoints.
68pub trait List<TParams, TResponse>
69where
70    TParams: IntoParams + Send + Sync + 'static,
71    TResponse: Serialize + DeserializeOwned + Send + Sync,
72    Self: WithApiClient + WithBasePath + Sync,
73{
74    /// Query a resource with optional query parameters.
75    ///
76    /// # Arguments
77    ///
78    /// * `params` - Query parameters.
79    fn list(
80        &self,
81        params: Option<TParams>,
82    ) -> impl Future<Output = Result<ItemsVec<TResponse, Cursor>>> + Send {
83        async move {
84            self.get_client()
85                .get_with_params(Self::BASE_PATH, params)
86                .await
87        }
88    }
89
90    /// Query a resource with query parameters, continuing until the cursor is exhausted.
91    ///
92    /// # Arguments
93    ///
94    /// * `params` - Initial query parameters. This can contain a cursor which is the starting point.
95    fn list_all(&self, mut params: TParams) -> impl Future<Output = Result<Vec<TResponse>>> + Send
96    where
97        TParams: SetCursor + Clone,
98        TResponse: Send,
99    {
100        async move {
101            let mut result = vec![];
102            loop {
103                let lparams = params.clone();
104                let response: ItemsVec<TResponse, Cursor> = self
105                    .get_client()
106                    .get_with_params(Self::BASE_PATH, Some(lparams))
107                    .await?;
108                for it in response.items {
109                    result.push(it);
110                }
111                match response.extra_fields.next_cursor {
112                    Some(cursor) => params.set_cursor(Some(cursor)),
113                    None => return Ok(result),
114                }
115            }
116        }
117    }
118
119    /// List resources, following cursors. This returns a stream, you can abort the stream whenever you
120    /// want and only resources retrieved up to that point will be returned.
121    ///
122    /// Each item in the stream will be a result, after the first error is returned the
123    /// stream will end.
124    ///
125    /// # Arguments
126    ///
127    /// * `params` - Initial query parameters. This can contain a cursors used as starting point.
128    fn list_all_stream(
129        &self,
130        params: TParams,
131    ) -> impl TryStream<Ok = TResponse, Error = crate::Error, Item = Result<TResponse>> + Send
132    where
133        TParams: SetCursor + Clone,
134        TResponse: Send + 'static,
135    {
136        let state = CursorStreamState {
137            req: params,
138            responses: VecDeque::new(),
139            next_cursor: CursorState::Initial,
140        };
141
142        try_unfold(state, move |mut state| async move {
143            if let Some(next) = state.responses.pop_front() {
144                Ok(Some((next, state)))
145            } else {
146                let cursor = match std::mem::take(&mut state.next_cursor) {
147                    CursorState::Initial => None,
148                    CursorState::Some(x) => Some(x),
149                    CursorState::End => {
150                        return Ok(None);
151                    }
152                };
153                state.req.set_cursor(cursor);
154                let response: ItemsVec<TResponse, Cursor> = self
155                    .get_client()
156                    .get_with_params(Self::BASE_PATH, Some(state.req.clone()))
157                    .await?;
158
159                state.responses.extend(response.items);
160                state.next_cursor = match response.extra_fields.next_cursor {
161                    Some(x) => CursorState::Some(x),
162                    None => CursorState::End,
163                };
164                if let Some(next) = state.responses.pop_front() {
165                    Ok(Some((next, state)))
166                } else {
167                    Ok(None)
168                }
169            }
170        })
171    }
172}
173
174/// Trait for creating resources with POST / requests.
175pub trait Create<TCreate, TResponse>
176where
177    TCreate: Serialize + Sync + Send,
178    TResponse: Serialize + DeserializeOwned + Send,
179    Self: WithApiClient + WithBasePath + Sync,
180{
181    /// Create a list of resources.
182    ///
183    /// # Arguments
184    ///
185    /// `creates` - List of resources to create.
186    fn create(&self, creates: &[TCreate]) -> impl Future<Output = Result<Vec<TResponse>>> + Send {
187        async move {
188            let items = Items::new(creates);
189            let response: ItemsVec<TResponse> =
190                self.get_client().post(Self::BASE_PATH, &items).await?;
191            Ok(response.items)
192        }
193    }
194
195    /// Create a list of resources, converting from a different type.
196    ///
197    /// # Arguments
198    ///
199    /// * `creates` - List of resources to create.
200    fn create_from(
201        &self,
202        creates: &[impl Into<TCreate> + Sync + Clone],
203    ) -> impl Future<Output = Result<Vec<TResponse>>> + Send {
204        async move {
205            let to_add: Vec<TCreate> = creates.iter().map(|i| i.clone().into()).collect();
206            self.create(&to_add).await
207        }
208    }
209
210    /// Create a list of resources, ignoring any that fail with general "conflict" errors.
211    ///
212    /// # Arguments
213    ///
214    /// * `creates` - List of resources to create.
215    fn create_ignore_duplicates(
216        &self,
217        creates: &[TCreate],
218    ) -> impl Future<Output = Result<Vec<TResponse>>> + Send
219    where
220        TCreate: EqIdentity,
221    {
222        async move {
223            let resp = self.create(creates).await;
224
225            let duplicates: Option<Vec<Identity>> = get_duplicates_from_result(&resp);
226
227            if let Some(duplicates) = duplicates {
228                let next: Vec<&TCreate> = creates
229                    .iter()
230                    .filter(|c| !duplicates.iter().any(|i| c.eq(i)))
231                    .collect();
232
233                if next.is_empty() {
234                    if duplicates.len() == creates.len() {
235                        return Ok(vec![]);
236                    }
237                    return resp;
238                }
239
240                let items = Items::new(next);
241                let response: ItemsVec<TResponse> =
242                    self.get_client().post(Self::BASE_PATH, &items).await?;
243                Ok(response.items)
244            } else {
245                resp
246            }
247        }
248    }
249
250    /// Create a list of resources, converting from a different type, and ignoring any that fail
251    /// with general "conflict" errors.
252    ///
253    /// # Arguments
254    ///
255    /// * `creates` - List of resources to create.
256    fn create_from_ignore_duplicates<'a, T: 'a>(
257        &self,
258        creates: &'a [impl Into<TCreate> + Sync + Clone],
259    ) -> impl Future<Output = Result<Vec<TResponse>>> + Send
260    where
261        TCreate: EqIdentity,
262    {
263        async move {
264            let to_add: Vec<TCreate> = creates.iter().map(|i| i.clone().into()).collect();
265            self.create_ignore_duplicates(&to_add).await
266        }
267    }
268}
269
270/// Trait for upserts of resources that support both Create and Update.
271pub trait Upsert<'a, TCreate, TUpdate, TResponse>
272where
273    TCreate: Serialize + Sync + Send + EqIdentity + 'a + Clone + IntoPatch<TUpdate>,
274    TUpdate: Serialize + Sync + Send + Default,
275    TResponse: Serialize + DeserializeOwned + Sync + Send,
276    Self: WithApiClient + WithBasePath + Sync,
277{
278    /// Upsert a list resources, meaning that they will first be attempted created,
279    /// and if that fails with a conflict, update any that already existed, and create
280    /// the remainder.
281    ///
282    /// # Arguments
283    ///
284    /// * `upserts` - Resources to insert or update.
285    /// * `options` - Configuration for upserts, which fields are kept and which are overwritten.
286    fn upsert(
287        &'a self,
288        upserts: &'a [TCreate],
289        options: &UpsertOptions,
290    ) -> impl Future<Output = Result<Vec<TResponse>>> + Send {
291        async move {
292            let items = Items::new(upserts);
293            let resp: Result<ItemsVec<TResponse>> =
294                self.get_client().post(Self::BASE_PATH, &items).await;
295
296            let duplicates: Option<Vec<Identity>> = get_duplicates_from_result(&resp);
297
298            if let Some(duplicates) = duplicates {
299                let mut to_create = Vec::with_capacity(upserts.len() - duplicates.len());
300                let mut to_update = Vec::with_capacity(duplicates.len());
301                for it in upserts {
302                    let idt = duplicates.iter().find(|i| it.eq(i));
303                    if let Some(idt) = idt {
304                        to_update.push(Patch::<TUpdate> {
305                            id: idt.clone(),
306                            update: it.clone().patch(options),
307                        });
308                    } else {
309                        to_create.push(it);
310                    }
311                }
312
313                let mut result = Vec::with_capacity(to_create.len() + to_update.len());
314                if !to_create.is_empty() {
315                    let mut create_response: ItemsVec<TResponse> = self
316                        .get_client()
317                        .post(Self::BASE_PATH, &Items::new(to_create))
318                        .await?;
319                    result.append(&mut create_response.items);
320                }
321                if !to_update.is_empty() {
322                    let mut update_response: ItemsVec<TResponse> = self
323                        .get_client()
324                        .post(
325                            &format!("{}/update", Self::BASE_PATH),
326                            &Items::new(&to_update),
327                        )
328                        .await?;
329                    result.append(&mut update_response.items);
330                }
331
332                Ok(result)
333            } else {
334                resp.map(|i| i.items)
335            }
336        }
337    }
338}
339
340impl<'a, T, TCreate, TUpdate, TResponse> Upsert<'a, TCreate, TUpdate, TResponse> for T
341where
342    T: Create<TCreate, TResponse> + Update<Patch<TUpdate>, TResponse> + Sync,
343    TCreate: Serialize + Sync + Send + EqIdentity + 'a + Clone + IntoPatch<TUpdate>,
344    TUpdate: Serialize + Sync + Send + Default,
345    TResponse: Serialize + DeserializeOwned + Sync + Send,
346{
347}
348
349/// Trait for resource types that support upserts directly.
350pub trait UpsertCollection<TUpsert, TResponse> {
351    /// Upsert a list of resources.
352    ///
353    /// # Arguments
354    ///
355    /// * `collection` - Items to insert or update.
356    fn upsert(&self, collection: &TUpsert) -> impl Future<Output = Result<Vec<TResponse>>> + Send
357    where
358        TUpsert: Serialize + Sync + Send,
359        TResponse: Serialize + DeserializeOwned + Sync + Send,
360        Self: WithApiClient + WithBasePath + Sync,
361    {
362        async move {
363            let response: ItemsVec<TResponse> =
364                self.get_client().post(Self::BASE_PATH, &collection).await?;
365            Ok(response.items)
366        }
367    }
368}
369
370/// Trait for resource types that can be deleted with a list of `TIdt`.
371pub trait Delete<TIdt>
372where
373    TIdt: Serialize + Sync + Send,
374    Self: WithApiClient + WithBasePath + Sync,
375{
376    /// Delete a list of resources by ID.
377    ///
378    /// # Arguments
379    ///
380    /// * `deletes` - IDs of items to delete.
381    fn delete(&self, deletes: &[TIdt]) -> impl Future<Output = Result<()>> + Send {
382        async move {
383            let items = Items::new(deletes);
384            self.get_client()
385                .post::<::serde_json::Value, Items<&[TIdt]>>(
386                    &format!("{}/delete", Self::BASE_PATH),
387                    &items,
388                )
389                .await?;
390            Ok(())
391        }
392    }
393}
394
395/// Trait for resource types that can be deleted with a more complex request.
396pub trait DeleteWithRequest<TReq>
397where
398    TReq: Serialize + Sync + Send,
399    Self: WithApiClient + WithBasePath + Sync,
400{
401    /// Delete resources using `req`.
402    ///
403    /// # Arguments
404    ///
405    /// * `req` - Request describing items to delete.
406    fn delete(&self, req: &TReq) -> impl Future<Output = Result<()>> + Send {
407        async move {
408            self.get_client()
409                .post::<::serde_json::Value, TReq>(&format!("{}/delete", Self::BASE_PATH), req)
410                .await?;
411            Ok(())
412        }
413    }
414}
415
416/// Trait for resource types that can be deleted with a list of identities and
417/// a boolean option to ignore unknown ids.
418pub trait DeleteWithIgnoreUnknownIds<TIdt>
419where
420    TIdt: Serialize + Sync + Send,
421    Self: WithApiClient + WithBasePath + Sync,
422{
423    /// Delete a list of resources, optionally ignore unknown ids.
424    ///
425    /// # Arguments
426    ///
427    /// * `deletes` - IDs of items to delete.
428    /// * `ignore_unknown_ids` - If `true`, missing IDs will be ignored, and not
429    ///   cause the request to fail.
430    fn delete(
431        &self,
432        deletes: impl Into<TIdt> + Send,
433        ignore_unknown_ids: bool,
434    ) -> impl Future<Output = Result<()>> + Send
435    where
436        Self: Sync,
437    {
438        async move {
439            let req = Items::new_with_extra_fields(
440                deletes.into(),
441                IgnoreUnknownIds { ignore_unknown_ids },
442            );
443            self.get_client()
444                .post::<::serde_json::Value, _>(&format!("{}/delete", Self::BASE_PATH), &req)
445                .await?;
446            Ok(())
447        }
448    }
449}
450
451/// Trait for resource types that can be deleted, and where the delete request
452/// has a non-empty response.
453pub trait DeleteWithResponse<TIdt, TResponse>
454where
455    TIdt: Serialize + Sync + Send,
456    TResponse: Serialize + DeserializeOwned + Sync + Send,
457    Self: WithApiClient + WithBasePath + Sync,
458{
459    /// Delete a list of resources.
460    ///
461    /// # Arguments
462    ///
463    /// * `deletes` - IDs of items to delete.
464    fn delete(&self, deletes: &[TIdt]) -> impl Future<Output = Result<ItemsVec<TResponse>>> + Send {
465        async move {
466            let items = Items::new(deletes);
467            let response: ItemsVec<TResponse> = self
468                .get_client()
469                .post(&format!("{}/delete", Self::BASE_PATH), &items)
470                .await?;
471            Ok(response)
472        }
473    }
474}
475
476/// Trait for resource types that can be patch updated.
477pub trait Update<TUpdate, TResponse>
478where
479    TUpdate: Serialize + Sync + Send,
480    TResponse: Serialize + DeserializeOwned,
481    Self: WithApiClient + WithBasePath + Sync,
482{
483    /// Update a list of resources.
484    ///
485    /// # Arguments
486    ///
487    /// * `updates` - Items to update.
488    fn update(&self, updates: &[TUpdate]) -> impl Future<Output = Result<Vec<TResponse>>> + Send {
489        async move {
490            let items = Items::new(updates);
491            let response: ItemsVec<TResponse> = self
492                .get_client()
493                .post(&format!("{}/update", Self::BASE_PATH), &items)
494                .await?;
495            Ok(response.items)
496        }
497    }
498
499    /// Update a list of resources by converting to the update from a different type.
500    ///
501    /// # Arguments
502    ///
503    /// * `updates` - Items to update.
504    fn update_from<'a, T>(
505        &self,
506        updates: &'a [T],
507    ) -> impl Future<Output = Result<Vec<TResponse>>> + Send
508    where
509        T: std::marker::Sync + Clone + 'a,
510        TUpdate: From<T>,
511    {
512        async move {
513            let to_update: Vec<TUpdate> =
514                updates.iter().map(|i| TUpdate::from(i.clone())).collect();
515            self.update(&to_update).await
516        }
517    }
518
519    /// Update a list of resources, ignoring any that fail due to items missing in CDF.
520    ///
521    /// # Arguments
522    ///
523    /// * `updates` - Items to update.
524    fn update_ignore_unknown_ids(
525        &self,
526        updates: &[TUpdate],
527    ) -> impl Future<Output = Result<Vec<TResponse>>> + Send
528    where
529        TUpdate: EqIdentity,
530        TResponse: Send,
531    {
532        async move {
533            let response = self.update(updates).await;
534            let missing: Option<Vec<Identity>> = get_missing_from_result(&response);
535
536            if let Some(missing) = missing {
537                let next: Vec<&TUpdate> = updates
538                    .iter()
539                    .filter(|c| !missing.iter().any(|i| c.eq(i)))
540                    .collect();
541
542                if next.is_empty() {
543                    if missing.len() == updates.len() {
544                        return Ok(vec![]);
545                    }
546                    return response;
547                }
548
549                let items = Items::new(next);
550                let response: ItemsVec<TResponse> = self
551                    .get_client()
552                    .post(&format!("{}/update", Self::BASE_PATH), &items)
553                    .await?;
554                Ok(response.items)
555            } else {
556                response
557            }
558        }
559    }
560
561    /// Update a list of resources by converting from a different type, ignoring any that fail
562    /// due items missing in CDF.
563    ///
564    /// # Arguments
565    ///
566    /// * `updates` - Items to update.
567    fn update_from_ignore_unknown_ids<'a, T>(
568        &self,
569        updates: &'a [T],
570    ) -> impl Future<Output = Result<Vec<TResponse>>> + Send
571    where
572        T: Sync + Clone + 'a,
573        TUpdate: From<T> + EqIdentity,
574        TResponse: Send,
575    {
576        async move {
577            let to_update: Vec<TUpdate> =
578                updates.iter().map(|i| TUpdate::from(i.clone())).collect();
579            self.update_ignore_unknown_ids(&to_update).await
580        }
581    }
582}
583
584/// Trait for retrieving items from CDF by id.
585pub trait Retrieve<TIdt, TResponse>
586where
587    TIdt: Serialize + Sync + Send,
588    TResponse: Serialize + DeserializeOwned,
589    Self: WithApiClient + WithBasePath + Sync,
590{
591    /// Retrieve a list of items from CDF by id.
592    ///
593    /// # Arguments
594    ///
595    /// * `ids` - IDs of items to retrieve.
596    fn retrieve(&self, ids: &[TIdt]) -> impl Future<Output = Result<Vec<TResponse>>> + Send {
597        async move {
598            let items = Items::new(ids);
599            let response: ItemsVec<TResponse> = self
600                .get_client()
601                .post(&format!("{}/byids", Self::BASE_PATH), &items)
602                .await?;
603            Ok(response.items)
604        }
605    }
606}
607
608/// Trait for retrieving items from CDF with a more complex request type.
609pub trait RetrieveWithRequest<TRequest, TResponse>
610where
611    TRequest: Serialize + Sync + Send,
612    TResponse: Serialize + DeserializeOwned,
613    Self: WithApiClient + WithBasePath + Sync,
614{
615    /// Retrieve items from CDF with a more complex request.
616    ///
617    /// # Arguments
618    ///
619    /// * `req` - Request describing items to retrieve.
620    fn retrieve(&self, req: &TRequest) -> impl Future<Output = Result<TResponse>> + Send {
621        async move {
622            let response: TResponse = self
623                .get_client()
624                .post(&format!("{}/byids", Self::BASE_PATH), req)
625                .await?;
626            Ok(response)
627        }
628    }
629}
630
631/// Trait for retrieving items from CDF with an option to ignore unknown IDs.
632pub trait RetrieveWithIgnoreUnknownIds<TIdt, TResponse>
633where
634    TIdt: Serialize + Sync + Send,
635    TResponse: Serialize + DeserializeOwned,
636    Self: WithApiClient + WithBasePath + Sync,
637{
638    /// Retrieve a list of items from CDF. If ignore_unknown_ids is false,
639    /// this will fail if any items are missing from CDF.
640    ///
641    /// # Arguments
642    ///
643    /// * `ids` - IDs of items to retrieve.
644    /// * `ignore_unknown_ids` - If `true`, items missing from CDF will be ignored, and not
645    ///   cause the request to fail.
646    fn retrieve(
647        &self,
648        ids: impl Into<TIdt> + Send,
649        ignore_unknown_ids: bool,
650    ) -> impl Future<Output = Result<Vec<TResponse>>> + Send {
651        async move {
652            let items =
653                Items::new_with_extra_fields(ids.into(), IgnoreUnknownIds { ignore_unknown_ids });
654            let response: ItemsVec<TResponse> = self
655                .get_client()
656                .post(&format!("{}/byids", Self::BASE_PATH), &items)
657                .await?;
658            Ok(response.items)
659        }
660    }
661}
662
663/// Trait for resource types that allow filtering with a simple filter.
664pub trait FilterItems<TFilter, TResponse>
665where
666    TFilter: Serialize + Sync + Send + 'static,
667    TResponse: Serialize + DeserializeOwned,
668    Self: WithApiClient + WithBasePath + Sync,
669{
670    /// Filter resources using a simple filter.
671    /// The response may contain a cursor that can be used to paginate results.
672    ///
673    /// # Arguments
674    ///
675    /// * `filter` - Filter which items to retrieve.
676    /// * `cursor` - Optional cursor for pagination.
677    /// * `limit` - Maximum number of result items.
678    fn filter_items(
679        &self,
680        filter: TFilter,
681        cursor: Option<String>,
682        limit: Option<u32>,
683    ) -> impl Future<Output = Result<ItemsVec<TResponse, Cursor>>> + Send {
684        async move {
685            let filter = Filter::<TFilter>::new(filter, cursor, limit);
686            let response: ItemsVec<TResponse, Cursor> = self
687                .get_client()
688                .post(&format!("{}/list", Self::BASE_PATH), &filter)
689                .await?;
690            Ok(response)
691        }
692    }
693}
694
695impl<TFilter, TResponse, T> FilterWithRequest<Filter<TFilter>, TResponse> for T
696where
697    TFilter: Serialize + Sync + Send + 'static,
698    TResponse: Serialize + DeserializeOwned,
699    T: FilterItems<TFilter, TResponse>,
700    Self: WithApiClient + WithBasePath,
701{
702}
703
/// Pagination progress tracked by the cursor-following streams.
#[derive(Debug, Default)]
pub(crate) enum CursorState {
    /// No request has been sent yet.
    Initial,
    /// Cursor returned by the latest response, to be used for the next request.
    Some(String),
    /// The latest response carried no cursor. As the default variant, this is also
    /// what `std::mem::take` leaves behind.
    #[default]
    End,
}
711
712pub(crate) struct CursorStreamState<TFilter, TResponse> {
713    pub(crate) req: TFilter,
714    pub(crate) responses: VecDeque<TResponse>,
715    pub(crate) next_cursor: CursorState,
716}
717
718/// Trait for resource types that allow filtering with a more complex request.
719pub trait FilterWithRequest<TFilter, TResponse>
720where
721    TFilter: Serialize + Sync + Send + 'static,
722    TResponse: Serialize + DeserializeOwned,
723    Self: WithApiClient + WithBasePath + Sync,
724{
725    /// Filter resources.
726    ///
727    /// # Arguments
728    ///
729    /// * `filter` - Filter which items to retrieve.
730    fn filter(
731        &self,
732        filter: TFilter,
733    ) -> impl Future<Output = Result<ItemsVec<TResponse, Cursor>>> + Send {
734        async move {
735            let response: ItemsVec<TResponse, Cursor> = self
736                .get_client()
737                .post(&format!("{}/list", Self::BASE_PATH), &filter)
738                .await?;
739            Ok(response)
740        }
741    }
742
743    /// Filter resources, following cursors until they are exhausted.
744    ///
745    /// # Arguments
746    ///
747    /// * `filter` - Filter which items to retrieve.
748    fn filter_all(&self, mut filter: TFilter) -> impl Future<Output = Result<Vec<TResponse>>> + Send
749    where
750        TFilter: SetCursor,
751        TResponse: Send,
752    {
753        async move {
754            let mut result = vec![];
755            loop {
756                let response: ItemsVec<TResponse, Cursor> = self
757                    .get_client()
758                    .post(&format!("{}/list", Self::BASE_PATH), &filter)
759                    .await?;
760                for it in response.items {
761                    result.push(it);
762                }
763                match response.extra_fields.next_cursor {
764                    Some(cursor) => filter.set_cursor(Some(cursor)),
765                    None => return Ok(result),
766                }
767            }
768        }
769    }
770
771    /// Filter resources, following cursors. This returns a stream, you can abort the stream whenever you
772    /// want and only resources retrieved up to that point will be returned.
773    ///
774    /// Each item in the stream will be a result, after the first error is returned the
775    /// stream will end.
776    ///
777    /// # Arguments
778    ///
779    /// * `filter` - Filter which items to retrieve.
780    fn filter_all_stream(
781        &self,
782        filter: TFilter,
783    ) -> impl TryStream<Ok = TResponse, Error = crate::Error, Item = Result<TResponse>> + Send
784    where
785        TFilter: SetCursor,
786        TResponse: Send + 'static,
787    {
788        let state = CursorStreamState {
789            req: filter,
790            responses: VecDeque::new(),
791            next_cursor: CursorState::Initial,
792        };
793
794        try_unfold(state, move |mut state| async move {
795            if let Some(next) = state.responses.pop_front() {
796                Ok(Some((next, state)))
797            } else {
798                let cursor = match std::mem::take(&mut state.next_cursor) {
799                    CursorState::Initial => None,
800                    CursorState::Some(x) => Some(x),
801                    CursorState::End => {
802                        return Ok(None);
803                    }
804                };
805                state.req.set_cursor(cursor);
806                let response: ItemsVec<TResponse, Cursor> = self
807                    .get_client()
808                    .post(&format!("{}/list", Self::BASE_PATH), &state.req)
809                    .await?;
810
811                state.responses.extend(response.items);
812                state.next_cursor = match response.extra_fields.next_cursor {
813                    Some(x) => CursorState::Some(x),
814                    None => CursorState::End,
815                };
816                if let Some(next) = state.responses.pop_front() {
817                    Ok(Some((next, state)))
818                } else {
819                    Ok(None)
820                }
821            }
822        })
823    }
824
825    /// Filter resources using partitioned reads, following cursors until all partitions are
826    /// exhausted.
827    ///
828    /// # Arguments
829    ///
830    /// * `filter` - Filter which items to retrieve.
831    /// * `num_partitions` - Number of partitions.
832    fn filter_all_partitioned(
833        &self,
834        filter: TFilter,
835        num_partitions: u32,
836    ) -> impl Future<Output = Result<Vec<TResponse>>> + Send
837    where
838        TFilter: SetCursor + WithPartition,
839        TResponse: Send,
840    {
841        async move {
842            let mut futures = Vec::with_capacity(num_partitions as usize);
843            for partition in 0..num_partitions {
844                let part_filter =
845                    filter.with_partition(Partition::new(partition + 1, num_partitions));
846                futures.push(self.filter_all(part_filter));
847            }
848            let results = try_join_all(futures).await?;
849            let mut response_items = Vec::with_capacity(results.iter().map(|i| i.len()).sum());
850            for chunk in results.into_iter() {
851                response_items.extend(chunk);
852            }
853            Ok(response_items)
854        }
855    }
856
857    /// Filter resources using partitioned reads, following cursors until all partitions
858    /// are exhausted. This returns a stream.
859    ///
860    /// Note that the returned stream is simply a combinator of streams returned by
861    /// `filter_all_stream` for different partitions.
862    ///
863    /// The order of the returned values is not guaranteed to be in any way consistent.
864    ///
865    /// # Arguments
866    ///
867    /// * `filter` - Filter which items to retrieve.
868    /// * `num_partitions` - Number of partitions.
869    fn filter_all_partitioned_stream(
870        &self,
871        filter: TFilter,
872        num_partitions: u32,
873    ) -> impl TryStream<Ok = TResponse, Error = crate::Error, Item = Result<TResponse>> + Send
874    where
875        TFilter: SetCursor + WithPartition,
876        TResponse: Send + 'static,
877    {
878        let mut streams = SelectAll::new();
879        for partition in 0..num_partitions {
880            let part_filter = filter.with_partition(Partition::new(partition + 1, num_partitions));
881            streams.push(self.filter_all_stream(part_filter).boxed());
882        }
883
884        streams
885    }
886}
887
888/// Trait for resource types that allow filtering with fuzzy search.
889pub trait SearchItems<'a, TFilter, TSearch, TResponse>
890where
891    TFilter: Serialize + Sync + Send + 'a,
892    TSearch: Serialize + Sync + Send + 'a,
893    TResponse: Serialize + DeserializeOwned,
894    Self: WithApiClient + WithBasePath + Sync,
895{
896    /// Fuzzy search resources.
897    ///
898    /// # Arguments
899    ///
900    /// * `filter` - Simple filter applied to items.
901    /// * `search` - Fuzzy search.
902    /// * `limit` - Maximum number of items to retrieve.
903    fn search(
904        &'a self,
905        filter: TFilter,
906        search: TSearch,
907        limit: Option<u32>,
908    ) -> impl Future<Output = Result<Vec<TResponse>>> + Send {
909        async move {
910            let req = Search::<TFilter, TSearch>::new(filter, search, limit);
911            let response: ItemsVec<TResponse> = self
912                .get_client()
913                .post(&format!("{}/search", Self::BASE_PATH), &req)
914                .await?;
915            Ok(response.items)
916        }
917    }
918}