1use crate::{
4 events::{EntityEvents, GenericEvent},
5 one_time_executor::IntoOneTimeExecutor,
6 operation::AtomicOperation,
7 traits::*,
8};
9
/// A mapped Postgres query whose rows are turned into entities of `Repo`.
///
/// Type parameters:
/// - `'q`: lifetime carried by the wrapped `sqlx` query.
/// - `Repo`: the repository type; the `impl` blocks in this file require `Repo: EsRepo`.
/// - `Flavor`: compile-time marker ([`EsQueryFlavorFlat`] or [`EsQueryFlavorNested`]) that
///   selects which set of public `fetch_*` methods is available.
/// - `F`: the row-mapping closure of the wrapped query.
/// - `A`: the argument set bound to the wrapped query.
pub struct EsQuery<'q, Repo, Flavor, F, A> {
    /// The underlying `sqlx` query together with its row mapper.
    inner: sqlx::query::Map<'q, sqlx::Postgres, F, A>,
    /// Ties the query to a repository type without storing a value of it.
    _repo: std::marker::PhantomData<Repo>,
    /// Ties the query to a flavor marker without storing a value of it.
    _flavor: std::marker::PhantomData<Flavor>,
}
/// Flavor marker for [`EsQuery`]: the `fetch_*` methods accept any `IntoOneTimeExecutor`
/// and return the entities exactly as loaded from the event rows.
pub struct EsQueryFlavorFlat;
/// Flavor marker for [`EsQuery`]: the `fetch_*` methods take `&mut` to an `AtomicOperation`
/// and, after loading, pass the entities to `EsRepo::load_all_nested_in_op`.
pub struct EsQueryFlavorNested;
17
18impl<'q, Repo, Flavor, F, A> EsQuery<'q, Repo, Flavor, F, A>
19where
20 Repo: EsRepo,
21 <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
22 F: FnMut(
23 sqlx::postgres::PgRow,
24 ) -> Result<
25 GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
26 sqlx::Error,
27 > + Send,
28 A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
29{
30 pub fn new(query: sqlx::query::Map<'q, sqlx::Postgres, F, A>) -> Self {
31 Self {
32 inner: query,
33 _repo: std::marker::PhantomData,
34 _flavor: std::marker::PhantomData,
35 }
36 }
37
38 async fn fetch_optional_inner(
39 self,
40 op: impl IntoOneTimeExecutor<'_>,
41 ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::Err> {
42 let executor = op.into_executor();
43 let rows = executor.fetch_all(self.inner).await?;
44 if rows.is_empty() {
45 return Ok(None);
46 }
47
48 Ok(Some(EntityEvents::load_first(rows.into_iter())?))
49 }
50
51 async fn fetch_one_inner(
52 self,
53 op: impl IntoOneTimeExecutor<'_>,
54 ) -> Result<<Repo as EsRepo>::Entity, <Repo as EsRepo>::Err> {
55 let executor = op.into_executor();
56 let rows = executor.fetch_all(self.inner).await?;
57 Ok(EntityEvents::load_first(rows.into_iter())?)
58 }
59
60 async fn fetch_n_inner(
61 self,
62 op: impl IntoOneTimeExecutor<'_>,
63 first: usize,
64 ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::Err> {
65 let executor = op.into_executor();
66 let rows = executor.fetch_all(self.inner).await?;
67 Ok(EntityEvents::load_n(rows.into_iter(), first)?)
68 }
69}
70
71impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorFlat, F, A>
72where
73 Repo: EsRepo,
74 <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
75 F: FnMut(
76 sqlx::postgres::PgRow,
77 ) -> Result<
78 GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
79 sqlx::Error,
80 > + Send,
81 A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
82{
83 pub async fn fetch_optional(
84 self,
85 op: impl IntoOneTimeExecutor<'_>,
86 ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::Err> {
87 self.fetch_optional_inner(op).await
88 }
89
90 pub async fn fetch_one(
91 self,
92 op: impl IntoOneTimeExecutor<'_>,
93 ) -> Result<<Repo as EsRepo>::Entity, <Repo as EsRepo>::Err> {
94 self.fetch_one_inner(op).await
95 }
96
97 pub async fn fetch_n(
98 self,
99 op: impl IntoOneTimeExecutor<'_>,
100 first: usize,
101 ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::Err> {
102 self.fetch_n_inner(op, first).await
103 }
104}
105
106impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorNested, F, A>
107where
108 Repo: EsRepo,
109 <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
110 F: FnMut(
111 sqlx::postgres::PgRow,
112 ) -> Result<
113 GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
114 sqlx::Error,
115 > + Send,
116 A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
117{
118 pub async fn fetch_optional<OP>(
119 self,
120 op: &mut OP,
121 ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::Err>
122 where
123 OP: AtomicOperation,
124 {
125 let Some(entity) = self.fetch_optional_inner(&mut *op).await? else {
126 return Ok(None);
127 };
128 let mut entities = [entity];
129 <Repo as EsRepo>::load_all_nested_in_op(op, &mut entities).await?;
130 let [entity] = entities;
131 Ok(Some(entity))
132 }
133
134 pub async fn fetch_one<OP>(
135 self,
136 op: &mut OP,
137 ) -> Result<<Repo as EsRepo>::Entity, <Repo as EsRepo>::Err>
138 where
139 OP: AtomicOperation,
140 {
141 let entity = self.fetch_one_inner(&mut *op).await?;
142 let mut entities = [entity];
143 <Repo as EsRepo>::load_all_nested_in_op(op, &mut entities).await?;
144 let [entity] = entities;
145 Ok(entity)
146 }
147
148 pub async fn fetch_n<OP>(
149 self,
150 op: &mut OP,
151 first: usize,
152 ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::Err>
153 where
154 OP: AtomicOperation,
155 {
156 let (mut entities, more) = self.fetch_n_inner(&mut *op, first).await?;
157 <Repo as EsRepo>::load_all_nested_in_op(op, &mut entities).await?;
158 Ok((entities, more))
159 }
160}