1use crate::{
22 error::EntityHydrationError,
23 events::{EntityEvents, GenericEvent},
24 one_time_executor::IntoOneTimeExecutor,
25 operation::AtomicOperation,
26 traits::*,
27};
28
29pub struct EsQuery<'q, Repo, Flavor, F, A> {
34 inner: sqlx::query::Map<'q, sqlx::Postgres, F, A>,
35 _repo: std::marker::PhantomData<Repo>,
36 _flavor: std::marker::PhantomData<Flavor>,
37}
38
/// Marker type selecting the "flat" [`EsQuery`] fetch methods.
///
/// With this flavor the fetch methods accept any one-time executor and
/// return the entities built from the queried rows as-is; no follow-up
/// nested loading step is performed.
///
/// The type carries no data. The common traits are derived so that types
/// embedding the marker can derive them as well.
#[derive(Debug, Clone, Copy, Default)]
pub struct EsQueryFlavorFlat;
41
/// Marker type selecting the "nested" [`EsQuery`] fetch methods.
///
/// With this flavor the fetch methods require an atomic operation and, after
/// building the entities from the queried rows, pass them to the
/// repository's `load_all_nested_in_op` within that same operation.
///
/// The type carries no data. The common traits are derived so that types
/// embedding the marker can derive them as well.
#[derive(Debug, Clone, Copy, Default)]
pub struct EsQueryFlavorNested;
44
45impl<'q, Repo, Flavor, F, A> EsQuery<'q, Repo, Flavor, F, A>
46where
47 Repo: EsRepo,
48 <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
49 F: FnMut(
50 sqlx::postgres::PgRow,
51 ) -> Result<
52 GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
53 sqlx::Error,
54 > + Send,
55 A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
56{
57 pub fn new(query: sqlx::query::Map<'q, sqlx::Postgres, F, A>) -> Self {
58 Self {
59 inner: query,
60 _repo: std::marker::PhantomData,
61 _flavor: std::marker::PhantomData,
62 }
63 }
64
65 async fn fetch_optional_inner<E: From<sqlx::Error> + From<EntityHydrationError>>(
66 self,
67 op: impl IntoOneTimeExecutor<'_>,
68 ) -> Result<Option<<Repo as EsRepo>::Entity>, E> {
69 let executor = op.into_executor();
70 let rows = executor.fetch_all(self.inner).await?;
71 if rows.is_empty() {
72 return Ok(None);
73 }
74
75 Ok(EntityEvents::load_first(rows.into_iter())?)
76 }
77
78 async fn fetch_n_inner<E: From<sqlx::Error> + From<EntityHydrationError>>(
79 self,
80 op: impl IntoOneTimeExecutor<'_>,
81 first: usize,
82 ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), E> {
83 let executor = op.into_executor();
84 let rows = executor.fetch_all(self.inner).await?;
85 Ok(EntityEvents::load_n(rows.into_iter(), first)?)
86 }
87}
88
89impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorFlat, F, A>
90where
91 Repo: EsRepo,
92 <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
93 F: FnMut(
94 sqlx::postgres::PgRow,
95 ) -> Result<
96 GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
97 sqlx::Error,
98 > + Send,
99 A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
100{
101 pub async fn fetch_optional(
105 self,
106 op: impl IntoOneTimeExecutor<'_>,
107 ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::QueryError> {
108 self.fetch_optional_inner(op).await
109 }
110
111 pub async fn fetch_n(
116 self,
117 op: impl IntoOneTimeExecutor<'_>,
118 first: usize,
119 ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::QueryError> {
120 self.fetch_n_inner(op, first).await
121 }
122}
123
124impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorNested, F, A>
125where
126 Repo: EsRepo,
127 <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
128 F: FnMut(
129 sqlx::postgres::PgRow,
130 ) -> Result<
131 GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
132 sqlx::Error,
133 > + Send,
134 A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
135{
136 pub async fn fetch_optional<OP>(
141 self,
142 op: &mut OP,
143 ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::QueryError>
144 where
145 OP: AtomicOperation,
146 {
147 let Some(entity) = self
148 .fetch_optional_inner::<<Repo as EsRepo>::QueryError>(&mut *op)
149 .await?
150 else {
151 return Ok(None);
152 };
153 let mut entities = [entity];
154 <Repo as EsRepo>::load_all_nested_in_op::<_, <Repo as EsRepo>::QueryError>(
155 op,
156 &mut entities,
157 )
158 .await?;
159 let [entity] = entities;
160 Ok(Some(entity))
161 }
162
163 pub async fn fetch_n<OP>(
168 self,
169 op: &mut OP,
170 first: usize,
171 ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::QueryError>
172 where
173 OP: AtomicOperation,
174 {
175 let (mut entities, more) = self
176 .fetch_n_inner::<<Repo as EsRepo>::QueryError>(&mut *op, first)
177 .await?;
178 <Repo as EsRepo>::load_all_nested_in_op::<_, <Repo as EsRepo>::QueryError>(
179 op,
180 &mut entities,
181 )
182 .await?;
183 Ok((entities, more))
184 }
185}