1use std::collections::VecDeque;
2use std::future::Future;
3use std::{marker::PhantomData, sync::Arc};
4
5use futures::future::try_join_all;
6use futures::stream::{try_unfold, SelectAll};
7use futures::{StreamExt, TryStream};
8use serde::{de::DeserializeOwned, Serialize};
9
10use crate::dto::items::*;
11use crate::{
12 ApiClient, EqIdentity, Filter, Identity, IntoParams, IntoPatch, Partition, Patch, Result,
13 Search, SetCursor, UpsertOptions, WithPartition,
14};
15
16use super::utils::{get_duplicates_from_result, get_missing_from_result};
17
/// Generic handle for one kind of API resource.
///
/// `T` is only a type-level marker; no value of `T` is stored in the handle.
pub struct Resource<T> {
    /// Shared client that requests for this resource are sent through.
    pub api_client: Arc<ApiClient>,
    /// Zero-sized marker tying the handle to `T`.
    marker: PhantomData<T>,
}
25
26impl<T> Resource<T> {
27 pub fn new(api_client: Arc<ApiClient>) -> Self {
33 Resource {
34 api_client,
35 marker: PhantomData,
36 }
37 }
38}
39
40impl<T> Clone for Resource<T> {
41 fn clone(&self) -> Self {
42 Self {
43 api_client: self.api_client.clone(),
44 marker: PhantomData,
45 }
46 }
47}
48
49impl<T> WithApiClient for Resource<T> {
50 fn get_client(&self) -> &ApiClient {
51 &self.api_client
52 }
53}
54
/// Gives access to the API client that a resource sends its requests through.
pub trait WithApiClient {
    /// Returns the client used for requests.
    fn get_client(&self) -> &ApiClient;
}
60
/// Associates a resource with the path its requests are built from.
pub trait WithBasePath {
    /// Path handed to the client for this resource. Sub-operations append a suffix
    /// such as `/update`, `/delete`, `/byids`, `/list` or `/search`.
    const BASE_PATH: &'static str;
}
66
67pub trait List<TParams, TResponse>
69where
70 TParams: IntoParams + Send + Sync + 'static,
71 TResponse: Serialize + DeserializeOwned + Send + Sync,
72 Self: WithApiClient + WithBasePath + Sync,
73{
74 fn list(
80 &self,
81 params: Option<TParams>,
82 ) -> impl Future<Output = Result<ItemsVec<TResponse, Cursor>>> + Send {
83 async move {
84 self.get_client()
85 .get_with_params(Self::BASE_PATH, params)
86 .await
87 }
88 }
89
90 fn list_all(&self, mut params: TParams) -> impl Future<Output = Result<Vec<TResponse>>> + Send
96 where
97 TParams: SetCursor + Clone,
98 TResponse: Send,
99 {
100 async move {
101 let mut result = vec![];
102 loop {
103 let lparams = params.clone();
104 let response: ItemsVec<TResponse, Cursor> = self
105 .get_client()
106 .get_with_params(Self::BASE_PATH, Some(lparams))
107 .await?;
108 for it in response.items {
109 result.push(it);
110 }
111 match response.extra_fields.next_cursor {
112 Some(cursor) => params.set_cursor(Some(cursor)),
113 None => return Ok(result),
114 }
115 }
116 }
117 }
118
119 fn list_all_stream(
129 &self,
130 params: TParams,
131 ) -> impl TryStream<Ok = TResponse, Error = crate::Error, Item = Result<TResponse>> + Send
132 where
133 TParams: SetCursor + Clone,
134 TResponse: Send + 'static,
135 {
136 let state = CursorStreamState {
137 req: params,
138 responses: VecDeque::new(),
139 next_cursor: CursorState::Initial,
140 };
141
142 try_unfold(state, move |mut state| async move {
143 if let Some(next) = state.responses.pop_front() {
144 Ok(Some((next, state)))
145 } else {
146 let cursor = match std::mem::take(&mut state.next_cursor) {
147 CursorState::Initial => None,
148 CursorState::Some(x) => Some(x),
149 CursorState::End => {
150 return Ok(None);
151 }
152 };
153 state.req.set_cursor(cursor);
154 let response: ItemsVec<TResponse, Cursor> = self
155 .get_client()
156 .get_with_params(Self::BASE_PATH, Some(state.req.clone()))
157 .await?;
158
159 state.responses.extend(response.items);
160 state.next_cursor = match response.extra_fields.next_cursor {
161 Some(x) => CursorState::Some(x),
162 None => CursorState::End,
163 };
164 if let Some(next) = state.responses.pop_front() {
165 Ok(Some((next, state)))
166 } else {
167 Ok(None)
168 }
169 }
170 })
171 }
172}
173
174pub trait Create<TCreate, TResponse>
176where
177 TCreate: Serialize + Sync + Send,
178 TResponse: Serialize + DeserializeOwned + Send,
179 Self: WithApiClient + WithBasePath + Sync,
180{
181 fn create(&self, creates: &[TCreate]) -> impl Future<Output = Result<Vec<TResponse>>> + Send {
187 async move {
188 let items = Items::new(creates);
189 let response: ItemsVec<TResponse> =
190 self.get_client().post(Self::BASE_PATH, &items).await?;
191 Ok(response.items)
192 }
193 }
194
195 fn create_from(
201 &self,
202 creates: &[impl Into<TCreate> + Sync + Clone],
203 ) -> impl Future<Output = Result<Vec<TResponse>>> + Send {
204 async move {
205 let to_add: Vec<TCreate> = creates.iter().map(|i| i.clone().into()).collect();
206 self.create(&to_add).await
207 }
208 }
209
210 fn create_ignore_duplicates(
216 &self,
217 creates: &[TCreate],
218 ) -> impl Future<Output = Result<Vec<TResponse>>> + Send
219 where
220 TCreate: EqIdentity,
221 {
222 async move {
223 let resp = self.create(creates).await;
224
225 let duplicates: Option<Vec<Identity>> = get_duplicates_from_result(&resp);
226
227 if let Some(duplicates) = duplicates {
228 let next: Vec<&TCreate> = creates
229 .iter()
230 .filter(|c| !duplicates.iter().any(|i| c.eq(i)))
231 .collect();
232
233 if next.is_empty() {
234 if duplicates.len() == creates.len() {
235 return Ok(vec![]);
236 }
237 return resp;
238 }
239
240 let items = Items::new(next);
241 let response: ItemsVec<TResponse> =
242 self.get_client().post(Self::BASE_PATH, &items).await?;
243 Ok(response.items)
244 } else {
245 resp
246 }
247 }
248 }
249
250 fn create_from_ignore_duplicates<'a, T: 'a>(
257 &self,
258 creates: &'a [impl Into<TCreate> + Sync + Clone],
259 ) -> impl Future<Output = Result<Vec<TResponse>>> + Send
260 where
261 TCreate: EqIdentity,
262 {
263 async move {
264 let to_add: Vec<TCreate> = creates.iter().map(|i| i.clone().into()).collect();
265 self.create_ignore_duplicates(&to_add).await
266 }
267 }
268}
269
270pub trait Upsert<'a, TCreate, TUpdate, TResponse>
272where
273 TCreate: Serialize + Sync + Send + EqIdentity + 'a + Clone + IntoPatch<TUpdate>,
274 TUpdate: Serialize + Sync + Send + Default,
275 TResponse: Serialize + DeserializeOwned + Sync + Send,
276 Self: WithApiClient + WithBasePath + Sync,
277{
278 fn upsert(
287 &'a self,
288 upserts: &'a [TCreate],
289 options: &UpsertOptions,
290 ) -> impl Future<Output = Result<Vec<TResponse>>> + Send {
291 async move {
292 let items = Items::new(upserts);
293 let resp: Result<ItemsVec<TResponse>> =
294 self.get_client().post(Self::BASE_PATH, &items).await;
295
296 let duplicates: Option<Vec<Identity>> = get_duplicates_from_result(&resp);
297
298 if let Some(duplicates) = duplicates {
299 let mut to_create = Vec::with_capacity(upserts.len() - duplicates.len());
300 let mut to_update = Vec::with_capacity(duplicates.len());
301 for it in upserts {
302 let idt = duplicates.iter().find(|i| it.eq(i));
303 if let Some(idt) = idt {
304 to_update.push(Patch::<TUpdate> {
305 id: idt.clone(),
306 update: it.clone().patch(options),
307 });
308 } else {
309 to_create.push(it);
310 }
311 }
312
313 let mut result = Vec::with_capacity(to_create.len() + to_update.len());
314 if !to_create.is_empty() {
315 let mut create_response: ItemsVec<TResponse> = self
316 .get_client()
317 .post(Self::BASE_PATH, &Items::new(to_create))
318 .await?;
319 result.append(&mut create_response.items);
320 }
321 if !to_update.is_empty() {
322 let mut update_response: ItemsVec<TResponse> = self
323 .get_client()
324 .post(
325 &format!("{}/update", Self::BASE_PATH),
326 &Items::new(&to_update),
327 )
328 .await?;
329 result.append(&mut update_response.items);
330 }
331
332 Ok(result)
333 } else {
334 resp.map(|i| i.items)
335 }
336 }
337 }
338}
339
/// Blanket implementation: every type that supports `Create<TCreate, TResponse>` and
/// `Update<Patch<TUpdate>, TResponse>` gets `Upsert` with its default `upsert` method.
impl<'a, T, TCreate, TUpdate, TResponse> Upsert<'a, TCreate, TUpdate, TResponse> for T
where
    T: Create<TCreate, TResponse> + Update<Patch<TUpdate>, TResponse> + Sync,
    TCreate: Serialize + Sync + Send + EqIdentity + 'a + Clone + IntoPatch<TUpdate>,
    TUpdate: Serialize + Sync + Send + Default,
    TResponse: Serialize + DeserializeOwned + Sync + Send,
{
}
348
349pub trait UpsertCollection<TUpsert, TResponse> {
351 fn upsert(&self, collection: &TUpsert) -> impl Future<Output = Result<Vec<TResponse>>> + Send
357 where
358 TUpsert: Serialize + Sync + Send,
359 TResponse: Serialize + DeserializeOwned + Sync + Send,
360 Self: WithApiClient + WithBasePath + Sync,
361 {
362 async move {
363 let response: ItemsVec<TResponse> =
364 self.get_client().post(Self::BASE_PATH, &collection).await?;
365 Ok(response.items)
366 }
367 }
368}
369
370pub trait Delete<TIdt>
372where
373 TIdt: Serialize + Sync + Send,
374 Self: WithApiClient + WithBasePath + Sync,
375{
376 fn delete(&self, deletes: &[TIdt]) -> impl Future<Output = Result<()>> + Send {
382 async move {
383 let items = Items::new(deletes);
384 self.get_client()
385 .post::<::serde_json::Value, Items<&[TIdt]>>(
386 &format!("{}/delete", Self::BASE_PATH),
387 &items,
388 )
389 .await?;
390 Ok(())
391 }
392 }
393}
394
395pub trait DeleteWithRequest<TReq>
397where
398 TReq: Serialize + Sync + Send,
399 Self: WithApiClient + WithBasePath + Sync,
400{
401 fn delete(&self, req: &TReq) -> impl Future<Output = Result<()>> + Send {
407 async move {
408 self.get_client()
409 .post::<::serde_json::Value, TReq>(&format!("{}/delete", Self::BASE_PATH), req)
410 .await?;
411 Ok(())
412 }
413 }
414}
415
416pub trait DeleteWithIgnoreUnknownIds<TIdt>
419where
420 TIdt: Serialize + Sync + Send,
421 Self: WithApiClient + WithBasePath + Sync,
422{
423 fn delete(
431 &self,
432 deletes: impl Into<TIdt> + Send,
433 ignore_unknown_ids: bool,
434 ) -> impl Future<Output = Result<()>> + Send
435 where
436 Self: Sync,
437 {
438 async move {
439 let req = Items::new_with_extra_fields(
440 deletes.into(),
441 IgnoreUnknownIds { ignore_unknown_ids },
442 );
443 self.get_client()
444 .post::<::serde_json::Value, _>(&format!("{}/delete", Self::BASE_PATH), &req)
445 .await?;
446 Ok(())
447 }
448 }
449}
450
451pub trait DeleteWithResponse<TIdt, TResponse>
454where
455 TIdt: Serialize + Sync + Send,
456 TResponse: Serialize + DeserializeOwned + Sync + Send,
457 Self: WithApiClient + WithBasePath + Sync,
458{
459 fn delete(&self, deletes: &[TIdt]) -> impl Future<Output = Result<ItemsVec<TResponse>>> + Send {
465 async move {
466 let items = Items::new(deletes);
467 let response: ItemsVec<TResponse> = self
468 .get_client()
469 .post(&format!("{}/delete", Self::BASE_PATH), &items)
470 .await?;
471 Ok(response)
472 }
473 }
474}
475
476pub trait Update<TUpdate, TResponse>
478where
479 TUpdate: Serialize + Sync + Send,
480 TResponse: Serialize + DeserializeOwned,
481 Self: WithApiClient + WithBasePath + Sync,
482{
483 fn update(&self, updates: &[TUpdate]) -> impl Future<Output = Result<Vec<TResponse>>> + Send {
489 async move {
490 let items = Items::new(updates);
491 let response: ItemsVec<TResponse> = self
492 .get_client()
493 .post(&format!("{}/update", Self::BASE_PATH), &items)
494 .await?;
495 Ok(response.items)
496 }
497 }
498
499 fn update_from<'a, T>(
505 &self,
506 updates: &'a [T],
507 ) -> impl Future<Output = Result<Vec<TResponse>>> + Send
508 where
509 T: std::marker::Sync + Clone + 'a,
510 TUpdate: From<T>,
511 {
512 async move {
513 let to_update: Vec<TUpdate> =
514 updates.iter().map(|i| TUpdate::from(i.clone())).collect();
515 self.update(&to_update).await
516 }
517 }
518
519 fn update_ignore_unknown_ids(
525 &self,
526 updates: &[TUpdate],
527 ) -> impl Future<Output = Result<Vec<TResponse>>> + Send
528 where
529 TUpdate: EqIdentity,
530 TResponse: Send,
531 {
532 async move {
533 let response = self.update(updates).await;
534 let missing: Option<Vec<Identity>> = get_missing_from_result(&response);
535
536 if let Some(missing) = missing {
537 let next: Vec<&TUpdate> = updates
538 .iter()
539 .filter(|c| !missing.iter().any(|i| c.eq(i)))
540 .collect();
541
542 if next.is_empty() {
543 if missing.len() == updates.len() {
544 return Ok(vec![]);
545 }
546 return response;
547 }
548
549 let items = Items::new(next);
550 let response: ItemsVec<TResponse> = self
551 .get_client()
552 .post(&format!("{}/update", Self::BASE_PATH), &items)
553 .await?;
554 Ok(response.items)
555 } else {
556 response
557 }
558 }
559 }
560
561 fn update_from_ignore_unknown_ids<'a, T>(
568 &self,
569 updates: &'a [T],
570 ) -> impl Future<Output = Result<Vec<TResponse>>> + Send
571 where
572 T: Sync + Clone + 'a,
573 TUpdate: From<T> + EqIdentity,
574 TResponse: Send,
575 {
576 async move {
577 let to_update: Vec<TUpdate> =
578 updates.iter().map(|i| TUpdate::from(i.clone())).collect();
579 self.update_ignore_unknown_ids(&to_update).await
580 }
581 }
582}
583
584pub trait Retrieve<TIdt, TResponse>
586where
587 TIdt: Serialize + Sync + Send,
588 TResponse: Serialize + DeserializeOwned,
589 Self: WithApiClient + WithBasePath + Sync,
590{
591 fn retrieve(&self, ids: &[TIdt]) -> impl Future<Output = Result<Vec<TResponse>>> + Send {
597 async move {
598 let items = Items::new(ids);
599 let response: ItemsVec<TResponse> = self
600 .get_client()
601 .post(&format!("{}/byids", Self::BASE_PATH), &items)
602 .await?;
603 Ok(response.items)
604 }
605 }
606}
607
608pub trait RetrieveWithRequest<TRequest, TResponse>
610where
611 TRequest: Serialize + Sync + Send,
612 TResponse: Serialize + DeserializeOwned,
613 Self: WithApiClient + WithBasePath + Sync,
614{
615 fn retrieve(&self, req: &TRequest) -> impl Future<Output = Result<TResponse>> + Send {
621 async move {
622 let response: TResponse = self
623 .get_client()
624 .post(&format!("{}/byids", Self::BASE_PATH), req)
625 .await?;
626 Ok(response)
627 }
628 }
629}
630
631pub trait RetrieveWithIgnoreUnknownIds<TIdt, TResponse>
633where
634 TIdt: Serialize + Sync + Send,
635 TResponse: Serialize + DeserializeOwned,
636 Self: WithApiClient + WithBasePath + Sync,
637{
638 fn retrieve(
647 &self,
648 ids: impl Into<TIdt> + Send,
649 ignore_unknown_ids: bool,
650 ) -> impl Future<Output = Result<Vec<TResponse>>> + Send {
651 async move {
652 let items =
653 Items::new_with_extra_fields(ids.into(), IgnoreUnknownIds { ignore_unknown_ids });
654 let response: ItemsVec<TResponse> = self
655 .get_client()
656 .post(&format!("{}/byids", Self::BASE_PATH), &items)
657 .await?;
658 Ok(response.items)
659 }
660 }
661}
662
663pub trait FilterItems<TFilter, TResponse>
665where
666 TFilter: Serialize + Sync + Send + 'static,
667 TResponse: Serialize + DeserializeOwned,
668 Self: WithApiClient + WithBasePath + Sync,
669{
670 fn filter_items(
679 &self,
680 filter: TFilter,
681 cursor: Option<String>,
682 limit: Option<u32>,
683 ) -> impl Future<Output = Result<ItemsVec<TResponse, Cursor>>> + Send {
684 async move {
685 let filter = Filter::<TFilter>::new(filter, cursor, limit);
686 let response: ItemsVec<TResponse, Cursor> = self
687 .get_client()
688 .post(&format!("{}/list", Self::BASE_PATH), &filter)
689 .await?;
690 Ok(response)
691 }
692 }
693}
694
/// Blanket implementation: any type implementing `FilterItems<TFilter, TResponse>` also
/// gets the default methods of `FilterWithRequest`, with `Filter<TFilter>` as the request.
impl<TFilter, TResponse, T> FilterWithRequest<Filter<TFilter>, TResponse> for T
where
    TFilter: Serialize + Sync + Send + 'static,
    TResponse: Serialize + DeserializeOwned,
    T: FilterItems<TFilter, TResponse>,
    Self: WithApiClient + WithBasePath,
{
}
703
/// Pagination progress of a cursor-driven stream.
#[derive(Debug, Default)]
pub(crate) enum CursorState {
    /// No request has been sent yet; the first request uses no cursor.
    Initial,
    /// Cursor to send with the next request.
    Some(String),
    /// No further pages. This is the default, so `std::mem::take` leaves `End` behind.
    #[default]
    End,
}
711
/// State threaded through `try_unfold` by the cursor-following stream methods.
pub(crate) struct CursorStreamState<TFilter, TResponse> {
    /// Request object reused for every page; its cursor is overwritten before each request.
    pub(crate) req: TFilter,
    /// Items of the most recent page that have not been yielded yet.
    pub(crate) responses: VecDeque<TResponse>,
    /// Where pagination currently stands.
    pub(crate) next_cursor: CursorState,
}
717
718pub trait FilterWithRequest<TFilter, TResponse>
720where
721 TFilter: Serialize + Sync + Send + 'static,
722 TResponse: Serialize + DeserializeOwned,
723 Self: WithApiClient + WithBasePath + Sync,
724{
725 fn filter(
731 &self,
732 filter: TFilter,
733 ) -> impl Future<Output = Result<ItemsVec<TResponse, Cursor>>> + Send {
734 async move {
735 let response: ItemsVec<TResponse, Cursor> = self
736 .get_client()
737 .post(&format!("{}/list", Self::BASE_PATH), &filter)
738 .await?;
739 Ok(response)
740 }
741 }
742
743 fn filter_all(&self, mut filter: TFilter) -> impl Future<Output = Result<Vec<TResponse>>> + Send
749 where
750 TFilter: SetCursor,
751 TResponse: Send,
752 {
753 async move {
754 let mut result = vec![];
755 loop {
756 let response: ItemsVec<TResponse, Cursor> = self
757 .get_client()
758 .post(&format!("{}/list", Self::BASE_PATH), &filter)
759 .await?;
760 for it in response.items {
761 result.push(it);
762 }
763 match response.extra_fields.next_cursor {
764 Some(cursor) => filter.set_cursor(Some(cursor)),
765 None => return Ok(result),
766 }
767 }
768 }
769 }
770
771 fn filter_all_stream(
781 &self,
782 filter: TFilter,
783 ) -> impl TryStream<Ok = TResponse, Error = crate::Error, Item = Result<TResponse>> + Send
784 where
785 TFilter: SetCursor,
786 TResponse: Send + 'static,
787 {
788 let state = CursorStreamState {
789 req: filter,
790 responses: VecDeque::new(),
791 next_cursor: CursorState::Initial,
792 };
793
794 try_unfold(state, move |mut state| async move {
795 if let Some(next) = state.responses.pop_front() {
796 Ok(Some((next, state)))
797 } else {
798 let cursor = match std::mem::take(&mut state.next_cursor) {
799 CursorState::Initial => None,
800 CursorState::Some(x) => Some(x),
801 CursorState::End => {
802 return Ok(None);
803 }
804 };
805 state.req.set_cursor(cursor);
806 let response: ItemsVec<TResponse, Cursor> = self
807 .get_client()
808 .post(&format!("{}/list", Self::BASE_PATH), &state.req)
809 .await?;
810
811 state.responses.extend(response.items);
812 state.next_cursor = match response.extra_fields.next_cursor {
813 Some(x) => CursorState::Some(x),
814 None => CursorState::End,
815 };
816 if let Some(next) = state.responses.pop_front() {
817 Ok(Some((next, state)))
818 } else {
819 Ok(None)
820 }
821 }
822 })
823 }
824
825 fn filter_all_partitioned(
833 &self,
834 filter: TFilter,
835 num_partitions: u32,
836 ) -> impl Future<Output = Result<Vec<TResponse>>> + Send
837 where
838 TFilter: SetCursor + WithPartition,
839 TResponse: Send,
840 {
841 async move {
842 let mut futures = Vec::with_capacity(num_partitions as usize);
843 for partition in 0..num_partitions {
844 let part_filter =
845 filter.with_partition(Partition::new(partition + 1, num_partitions));
846 futures.push(self.filter_all(part_filter));
847 }
848 let results = try_join_all(futures).await?;
849 let mut response_items = Vec::with_capacity(results.iter().map(|i| i.len()).sum());
850 for chunk in results.into_iter() {
851 response_items.extend(chunk);
852 }
853 Ok(response_items)
854 }
855 }
856
857 fn filter_all_partitioned_stream(
870 &self,
871 filter: TFilter,
872 num_partitions: u32,
873 ) -> impl TryStream<Ok = TResponse, Error = crate::Error, Item = Result<TResponse>> + Send
874 where
875 TFilter: SetCursor + WithPartition,
876 TResponse: Send + 'static,
877 {
878 let mut streams = SelectAll::new();
879 for partition in 0..num_partitions {
880 let part_filter = filter.with_partition(Partition::new(partition + 1, num_partitions));
881 streams.push(self.filter_all_stream(part_filter).boxed());
882 }
883
884 streams
885 }
886}
887
888pub trait SearchItems<'a, TFilter, TSearch, TResponse>
890where
891 TFilter: Serialize + Sync + Send + 'a,
892 TSearch: Serialize + Sync + Send + 'a,
893 TResponse: Serialize + DeserializeOwned,
894 Self: WithApiClient + WithBasePath + Sync,
895{
896 fn search(
904 &'a self,
905 filter: TFilter,
906 search: TSearch,
907 limit: Option<u32>,
908 ) -> impl Future<Output = Result<Vec<TResponse>>> + Send {
909 async move {
910 let req = Search::<TFilter, TSearch>::new(filter, search, limit);
911 let response: ItemsVec<TResponse> = self
912 .get_client()
913 .post(&format!("{}/search", Self::BASE_PATH), &req)
914 .await?;
915 Ok(response.items)
916 }
917 }
918}