1use crate::{
22 events::{EntityEvents, GenericEvent},
23 one_time_executor::IntoOneTimeExecutor,
24 operation::AtomicOperation,
25 traits::*,
26};
27
28pub struct EsQuery<'q, Repo, Flavor, F, A> {
33 inner: sqlx::query::Map<'q, sqlx::Postgres, F, A>,
34 _repo: std::marker::PhantomData<Repo>,
35 _flavor: std::marker::PhantomData<Flavor>,
36}
37
/// Flavor marker for [`EsQuery`].
///
/// Queries tagged with this type expose `fetch_*` methods that accept any
/// `IntoOneTimeExecutor` and return the loaded entities as-is, without a
/// follow-up nested-load step.
pub struct EsQueryFlavorFlat;
40
/// Flavor marker for [`EsQuery`].
///
/// Queries tagged with this type expose `fetch_*` methods that take a
/// `&mut OP` where `OP: AtomicOperation`, and that call
/// `EsRepo::load_all_nested_in_op` on the loaded entities before returning.
pub struct EsQueryFlavorNested;
43
44impl<'q, Repo, Flavor, F, A> EsQuery<'q, Repo, Flavor, F, A>
45where
46 Repo: EsRepo,
47 <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
48 F: FnMut(
49 sqlx::postgres::PgRow,
50 ) -> Result<
51 GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
52 sqlx::Error,
53 > + Send,
54 A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
55{
56 pub fn new(query: sqlx::query::Map<'q, sqlx::Postgres, F, A>) -> Self {
57 Self {
58 inner: query,
59 _repo: std::marker::PhantomData,
60 _flavor: std::marker::PhantomData,
61 }
62 }
63
64 async fn fetch_optional_inner(
65 self,
66 op: impl IntoOneTimeExecutor<'_>,
67 ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::Err> {
68 let executor = op.into_executor();
69 let rows = executor.fetch_all(self.inner).await?;
70 if rows.is_empty() {
71 return Ok(None);
72 }
73
74 Ok(Some(EntityEvents::load_first(rows.into_iter())?))
75 }
76
77 async fn fetch_one_inner(
78 self,
79 op: impl IntoOneTimeExecutor<'_>,
80 ) -> Result<<Repo as EsRepo>::Entity, <Repo as EsRepo>::Err> {
81 let executor = op.into_executor();
82 let rows = executor.fetch_all(self.inner).await?;
83 Ok(EntityEvents::load_first(rows.into_iter())?)
84 }
85
86 async fn fetch_n_inner(
87 self,
88 op: impl IntoOneTimeExecutor<'_>,
89 first: usize,
90 ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::Err> {
91 let executor = op.into_executor();
92 let rows = executor.fetch_all(self.inner).await?;
93 Ok(EntityEvents::load_n(rows.into_iter(), first)?)
94 }
95}
96
97impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorFlat, F, A>
98where
99 Repo: EsRepo,
100 <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
101 F: FnMut(
102 sqlx::postgres::PgRow,
103 ) -> Result<
104 GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
105 sqlx::Error,
106 > + Send,
107 A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
108{
109 pub async fn fetch_optional(
113 self,
114 op: impl IntoOneTimeExecutor<'_>,
115 ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::Err> {
116 self.fetch_optional_inner(op).await
117 }
118
119 pub async fn fetch_one(
123 self,
124 op: impl IntoOneTimeExecutor<'_>,
125 ) -> Result<<Repo as EsRepo>::Entity, <Repo as EsRepo>::Err> {
126 self.fetch_one_inner(op).await
127 }
128
129 pub async fn fetch_n(
134 self,
135 op: impl IntoOneTimeExecutor<'_>,
136 first: usize,
137 ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::Err> {
138 self.fetch_n_inner(op, first).await
139 }
140}
141
142impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorNested, F, A>
143where
144 Repo: EsRepo,
145 <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
146 F: FnMut(
147 sqlx::postgres::PgRow,
148 ) -> Result<
149 GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
150 sqlx::Error,
151 > + Send,
152 A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
153{
154 pub async fn fetch_optional<OP>(
159 self,
160 op: &mut OP,
161 ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::Err>
162 where
163 OP: AtomicOperation,
164 {
165 let Some(entity) = self.fetch_optional_inner(&mut *op).await? else {
166 return Ok(None);
167 };
168 let mut entities = [entity];
169 <Repo as EsRepo>::load_all_nested_in_op(op, &mut entities).await?;
170 let [entity] = entities;
171 Ok(Some(entity))
172 }
173
174 pub async fn fetch_one<OP>(
179 self,
180 op: &mut OP,
181 ) -> Result<<Repo as EsRepo>::Entity, <Repo as EsRepo>::Err>
182 where
183 OP: AtomicOperation,
184 {
185 let entity = self.fetch_one_inner(&mut *op).await?;
186 let mut entities = [entity];
187 <Repo as EsRepo>::load_all_nested_in_op(op, &mut entities).await?;
188 let [entity] = entities;
189 Ok(entity)
190 }
191
192 pub async fn fetch_n<OP>(
197 self,
198 op: &mut OP,
199 first: usize,
200 ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::Err>
201 where
202 OP: AtomicOperation,
203 {
204 let (mut entities, more) = self.fetch_n_inner(&mut *op, first).await?;
205 <Repo as EsRepo>::load_all_nested_in_op(op, &mut entities).await?;
206 Ok((entities, more))
207 }
208}