use crate::{
db,
error::EntityHydrationError,
events::{EntityEvents, GenericEvent},
one_time_executor::IntoOneTimeExecutor,
operation::AtomicOperation,
traits::*,
};
/// A mapped `sqlx` query whose rows are turned into entities of a repository.
///
/// `Repo` and `Flavor` are compile-time markers only (no values of those types
/// are stored). The impl blocks in this file require `Repo: EsRepo`, and
/// `Flavor` (`EsQueryFlavorFlat` or `EsQueryFlavorNested`) selects which set
/// of public fetch methods is available. `F` is the row-mapping closure and
/// `A` the query arguments of the wrapped `sqlx::query::Map`.
pub struct EsQuery<'q, Repo, Flavor, F, A> {
/// The wrapped mapped query; the fetch methods take `self` by value and
/// hand this field to the executor.
inner: sqlx::query::Map<'q, db::Db, F, A>,
/// Zero-sized marker carrying the repository type parameter.
_repo: std::marker::PhantomData<Repo>,
/// Zero-sized marker carrying the flavor type parameter.
_flavor: std::marker::PhantomData<Flavor>,
}
/// Marker for the `Flavor` parameter of `EsQuery`.
///
/// Selects the fetch methods that accept any `IntoOneTimeExecutor` and return
/// the entities as built from the query rows, without a nested-loading step.
pub struct EsQueryFlavorFlat;
/// Marker for the `Flavor` parameter of `EsQuery`.
///
/// Selects the fetch methods that take `&mut OP` with `OP: AtomicOperation`
/// and, after building the entities from the query rows, call the
/// repository's `load_all_nested_in_op` (or its `_include_deleted` variant)
/// on them.
pub struct EsQueryFlavorNested;
impl<'q, Repo, Flavor, F, A> EsQuery<'q, Repo, Flavor, F, A>
where
Repo: EsRepo,
<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
F: FnMut(
db::Row,
) -> Result<
GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
sqlx::Error,
> + Send,
A: 'q + Send + sqlx::IntoArguments<'q, db::Db>,
{
pub fn new(query: sqlx::query::Map<'q, db::Db, F, A>) -> Self {
Self {
inner: query,
_repo: std::marker::PhantomData,
_flavor: std::marker::PhantomData,
}
}
async fn fetch_optional_inner<E: From<sqlx::Error> + From<EntityHydrationError>>(
self,
op: impl IntoOneTimeExecutor<'_>,
) -> Result<Option<<Repo as EsRepo>::Entity>, E> {
let executor = op.into_executor();
let rows = executor.fetch_all(self.inner).await?;
if rows.is_empty() {
return Ok(None);
}
Ok(EntityEvents::load_first(rows.into_iter())?)
}
async fn fetch_n_inner<E: From<sqlx::Error> + From<EntityHydrationError>>(
self,
op: impl IntoOneTimeExecutor<'_>,
first: usize,
) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), E> {
let executor = op.into_executor();
let rows = executor.fetch_all(self.inner).await?;
Ok(EntityEvents::load_n(rows.into_iter(), first)?)
}
}
/// Public fetch methods for the flat flavor.
///
/// Both methods forward straight to the shared helpers with the repository's
/// `QueryError` as the error type; no nested-loading step runs afterwards.
impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorFlat, F, A>
where
    Repo: EsRepo,
    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
    A: 'q + Send + sqlx::IntoArguments<'q, db::Db>,
    F: FnMut(
            db::Row,
        ) -> Result<
            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
            sqlx::Error,
        > + Send,
{
    /// Runs the query on `op` and returns the entity it describes, or
    /// `Ok(None)` when the query produces no rows.
    ///
    /// # Errors
    ///
    /// Whatever `fetch_optional_inner` reports, as `Repo::QueryError`.
    pub async fn fetch_optional(
        self,
        op: impl IntoOneTimeExecutor<'_>,
    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::QueryError> {
        self.fetch_optional_inner::<<Repo as EsRepo>::QueryError>(op)
            .await
    }

    /// Runs the query on `op` and returns the entities built from its rows
    /// together with the flag produced by `fetch_n_inner`; `first` is
    /// forwarded unchanged.
    ///
    /// # Errors
    ///
    /// Whatever `fetch_n_inner` reports, as `Repo::QueryError`.
    pub async fn fetch_n(
        self,
        op: impl IntoOneTimeExecutor<'_>,
        first: usize,
    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::QueryError> {
        self.fetch_n_inner::<<Repo as EsRepo>::QueryError>(op, first)
            .await
    }
}
/// Public fetch methods for the nested flavor.
///
/// Every method works in two steps on the same operation: it reborrows `op`
/// to run the root query through the shared helpers, then passes `op` to one
/// of the repository's nested loaders for the entities that were found. The
/// `*_include_deleted` variants differ from the plain ones only in which
/// nested loader they call; the root query is the one given to `new` in both
/// cases.
impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorNested, F, A>
where
    Repo: EsRepo,
    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
    A: 'q + Send + sqlx::IntoArguments<'q, db::Db>,
    F: FnMut(
            db::Row,
        ) -> Result<
            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
            sqlx::Error,
        > + Send,
{
    /// Fetches at most one entity and then runs `Repo::load_all_nested_in_op`
    /// on it.
    ///
    /// Returns `Ok(None)` without calling the nested loader when the root
    /// query finds nothing.
    ///
    /// # Errors
    ///
    /// Any failure of the root fetch or of the nested loader, as
    /// `Repo::QueryError`.
    pub async fn fetch_optional<OP: AtomicOperation>(
        self,
        op: &mut OP,
    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::QueryError> {
        let found = self
            .fetch_optional_inner::<<Repo as EsRepo>::QueryError>(&mut *op)
            .await?;
        match found {
            None => Ok(None),
            Some(mut root) => {
                // The nested loader works on a slice; view the single entity as one.
                <Repo as EsRepo>::load_all_nested_in_op::<_, <Repo as EsRepo>::QueryError>(
                    op,
                    std::slice::from_mut(&mut root),
                )
                .await?;
                Ok(Some(root))
            }
        }
    }

    /// Fetches entities via `fetch_n_inner` (forwarding `first`) and then runs
    /// `Repo::load_all_nested_in_op` over all of them.
    ///
    /// The flag in the returned tuple is passed through from `fetch_n_inner`
    /// untouched.
    ///
    /// # Errors
    ///
    /// Any failure of the root fetch or of the nested loader, as
    /// `Repo::QueryError`.
    pub async fn fetch_n<OP: AtomicOperation>(
        self,
        op: &mut OP,
        first: usize,
    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::QueryError> {
        let (mut page, has_more) = self
            .fetch_n_inner::<<Repo as EsRepo>::QueryError>(&mut *op, first)
            .await?;
        <Repo as EsRepo>::load_all_nested_in_op::<_, <Repo as EsRepo>::QueryError>(
            op,
            page.as_mut_slice(),
        )
        .await?;
        Ok((page, has_more))
    }

    /// Fetches at most one entity and then runs
    /// `Repo::load_all_nested_in_op_include_deleted` on it.
    ///
    /// Returns `Ok(None)` without calling the nested loader when the root
    /// query finds nothing.
    ///
    /// # Errors
    ///
    /// Any failure of the root fetch or of the nested loader, as
    /// `Repo::QueryError`.
    pub async fn fetch_optional_include_deleted<OP: AtomicOperation>(
        self,
        op: &mut OP,
    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::QueryError> {
        let found = self
            .fetch_optional_inner::<<Repo as EsRepo>::QueryError>(&mut *op)
            .await?;
        match found {
            None => Ok(None),
            Some(mut root) => {
                // The nested loader works on a slice; view the single entity as one.
                <Repo as EsRepo>::load_all_nested_in_op_include_deleted::<
                    _,
                    <Repo as EsRepo>::QueryError,
                >(op, std::slice::from_mut(&mut root))
                .await?;
                Ok(Some(root))
            }
        }
    }

    /// Fetches entities via `fetch_n_inner` (forwarding `first`) and then runs
    /// `Repo::load_all_nested_in_op_include_deleted` over all of them.
    ///
    /// The flag in the returned tuple is passed through from `fetch_n_inner`
    /// untouched.
    ///
    /// # Errors
    ///
    /// Any failure of the root fetch or of the nested loader, as
    /// `Repo::QueryError`.
    pub async fn fetch_n_include_deleted<OP: AtomicOperation>(
        self,
        op: &mut OP,
        first: usize,
    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::QueryError> {
        let (mut page, has_more) = self
            .fetch_n_inner::<<Repo as EsRepo>::QueryError>(&mut *op, first)
            .await?;
        <Repo as EsRepo>::load_all_nested_in_op_include_deleted::<
            _,
            <Repo as EsRepo>::QueryError,
        >(op, page.as_mut_slice())
        .await?;
        Ok((page, has_more))
    }
}