1use crate::{
22 db,
23 error::EntityHydrationError,
24 events::{EntityEvents, GenericEvent},
25 one_time_executor::IntoOneTimeExecutor,
26 operation::AtomicOperation,
27 traits::*,
28};
29
30pub struct EsQuery<'q, Repo, Flavor, F, A> {
35 inner: sqlx::query::Map<'q, db::Db, F, A>,
36 _repo: std::marker::PhantomData<Repo>,
37 _flavor: std::marker::PhantomData<Flavor>,
38}
39
/// Marker type for the `Flavor` parameter of `EsQuery`.
///
/// Selects the `impl` whose `fetch_optional` / `fetch_n` return the hydrated
/// entities directly, without a follow-up step that loads nested entities.
pub struct EsQueryFlavorFlat;
42
/// Marker type for the `Flavor` parameter of `EsQuery`.
///
/// Selects the `impl` whose `fetch_optional` / `fetch_n` additionally call
/// `EsRepo::load_all_nested_in_op` on the hydrated entities before returning.
pub struct EsQueryFlavorNested;
45
46impl<'q, Repo, Flavor, F, A> EsQuery<'q, Repo, Flavor, F, A>
47where
48 Repo: EsRepo,
49 <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
50 F: FnMut(
51 db::Row,
52 ) -> Result<
53 GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
54 sqlx::Error,
55 > + Send,
56 A: 'q + Send + sqlx::IntoArguments<'q, db::Db>,
57{
58 pub fn new(query: sqlx::query::Map<'q, db::Db, F, A>) -> Self {
59 Self {
60 inner: query,
61 _repo: std::marker::PhantomData,
62 _flavor: std::marker::PhantomData,
63 }
64 }
65
66 async fn fetch_optional_inner<E: From<sqlx::Error> + From<EntityHydrationError>>(
67 self,
68 op: impl IntoOneTimeExecutor<'_>,
69 ) -> Result<Option<<Repo as EsRepo>::Entity>, E> {
70 let executor = op.into_executor();
71 let rows = executor.fetch_all(self.inner).await?;
72 if rows.is_empty() {
73 return Ok(None);
74 }
75
76 Ok(EntityEvents::load_first(rows.into_iter())?)
77 }
78
79 async fn fetch_n_inner<E: From<sqlx::Error> + From<EntityHydrationError>>(
80 self,
81 op: impl IntoOneTimeExecutor<'_>,
82 first: usize,
83 ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), E> {
84 let executor = op.into_executor();
85 let rows = executor.fetch_all(self.inner).await?;
86 Ok(EntityEvents::load_n(rows.into_iter(), first)?)
87 }
88}
89
90impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorFlat, F, A>
91where
92 Repo: EsRepo,
93 <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
94 F: FnMut(
95 db::Row,
96 ) -> Result<
97 GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
98 sqlx::Error,
99 > + Send,
100 A: 'q + Send + sqlx::IntoArguments<'q, db::Db>,
101{
102 pub async fn fetch_optional(
106 self,
107 op: impl IntoOneTimeExecutor<'_>,
108 ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::QueryError> {
109 self.fetch_optional_inner(op).await
110 }
111
112 pub async fn fetch_n(
117 self,
118 op: impl IntoOneTimeExecutor<'_>,
119 first: usize,
120 ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::QueryError> {
121 self.fetch_n_inner(op, first).await
122 }
123}
124
125impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorNested, F, A>
126where
127 Repo: EsRepo,
128 <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
129 F: FnMut(
130 db::Row,
131 ) -> Result<
132 GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
133 sqlx::Error,
134 > + Send,
135 A: 'q + Send + sqlx::IntoArguments<'q, db::Db>,
136{
137 pub async fn fetch_optional<OP>(
142 self,
143 op: &mut OP,
144 ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::QueryError>
145 where
146 OP: AtomicOperation,
147 {
148 let Some(entity) = self
149 .fetch_optional_inner::<<Repo as EsRepo>::QueryError>(&mut *op)
150 .await?
151 else {
152 return Ok(None);
153 };
154 let mut entities = [entity];
155 <Repo as EsRepo>::load_all_nested_in_op::<_, <Repo as EsRepo>::QueryError>(
156 op,
157 &mut entities,
158 )
159 .await?;
160 let [entity] = entities;
161 Ok(Some(entity))
162 }
163
164 pub async fn fetch_n<OP>(
169 self,
170 op: &mut OP,
171 first: usize,
172 ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::QueryError>
173 where
174 OP: AtomicOperation,
175 {
176 let (mut entities, more) = self
177 .fetch_n_inner::<<Repo as EsRepo>::QueryError>(&mut *op, first)
178 .await?;
179 <Repo as EsRepo>::load_all_nested_in_op::<_, <Repo as EsRepo>::QueryError>(
180 op,
181 &mut entities,
182 )
183 .await?;
184 Ok((entities, more))
185 }
186}