es_entity/
query.rs

1//! Handle query generation with helper methods for execution.
2
3use crate::{
4    events::{EntityEvents, GenericEvent},
5    one_time_executor::IntoOneTimeExecutor,
6    operation::AtomicOperation,
7    traits::*,
8};
9
10pub struct EsQuery<'q, Repo, Flavor, F, A> {
11    inner: sqlx::query::Map<'q, sqlx::Postgres, F, A>,
12    _repo: std::marker::PhantomData<Repo>,
13    _flavor: std::marker::PhantomData<Flavor>,
14}
/// Flavor marker for [`EsQuery`]: with this flavor the public `fetch_*`
/// methods accept any `IntoOneTimeExecutor` and return the entities exactly
/// as rebuilt from the fetched rows.
pub struct EsQueryFlavorFlat;
/// Flavor marker for [`EsQuery`]: with this flavor the public `fetch_*`
/// methods require a `&mut` `AtomicOperation` and, after rebuilding the
/// entities, pass them to `EsRepo::load_all_nested_in_op` on that same
/// operation.
pub struct EsQueryFlavorNested;
17
18impl<'q, Repo, Flavor, F, A> EsQuery<'q, Repo, Flavor, F, A>
19where
20    Repo: EsRepo,
21    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
22    F: FnMut(
23            sqlx::postgres::PgRow,
24        ) -> Result<
25            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
26            sqlx::Error,
27        > + Send,
28    A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
29{
30    pub fn new(query: sqlx::query::Map<'q, sqlx::Postgres, F, A>) -> Self {
31        Self {
32            inner: query,
33            _repo: std::marker::PhantomData,
34            _flavor: std::marker::PhantomData,
35        }
36    }
37
38    async fn fetch_optional_inner(
39        self,
40        op: impl IntoOneTimeExecutor<'_>,
41    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::Err> {
42        let executor = op.into_executor();
43        let rows = executor.fetch_all(self.inner).await?;
44        if rows.is_empty() {
45            return Ok(None);
46        }
47
48        Ok(Some(EntityEvents::load_first(rows.into_iter())?))
49    }
50
51    async fn fetch_one_inner(
52        self,
53        op: impl IntoOneTimeExecutor<'_>,
54    ) -> Result<<Repo as EsRepo>::Entity, <Repo as EsRepo>::Err> {
55        let executor = op.into_executor();
56        let rows = executor.fetch_all(self.inner).await?;
57        Ok(EntityEvents::load_first(rows.into_iter())?)
58    }
59
60    async fn fetch_n_inner(
61        self,
62        op: impl IntoOneTimeExecutor<'_>,
63        first: usize,
64    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::Err> {
65        let executor = op.into_executor();
66        let rows = executor.fetch_all(self.inner).await?;
67        Ok(EntityEvents::load_n(rows.into_iter(), first)?)
68    }
69}
70
71impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorFlat, F, A>
72where
73    Repo: EsRepo,
74    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
75    F: FnMut(
76            sqlx::postgres::PgRow,
77        ) -> Result<
78            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
79            sqlx::Error,
80        > + Send,
81    A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
82{
83    pub async fn fetch_optional(
84        self,
85        op: impl IntoOneTimeExecutor<'_>,
86    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::Err> {
87        self.fetch_optional_inner(op).await
88    }
89
90    pub async fn fetch_one(
91        self,
92        op: impl IntoOneTimeExecutor<'_>,
93    ) -> Result<<Repo as EsRepo>::Entity, <Repo as EsRepo>::Err> {
94        self.fetch_one_inner(op).await
95    }
96
97    pub async fn fetch_n(
98        self,
99        op: impl IntoOneTimeExecutor<'_>,
100        first: usize,
101    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::Err> {
102        self.fetch_n_inner(op, first).await
103    }
104}
105
106impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorNested, F, A>
107where
108    Repo: EsRepo,
109    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
110    F: FnMut(
111            sqlx::postgres::PgRow,
112        ) -> Result<
113            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
114            sqlx::Error,
115        > + Send,
116    A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
117{
118    pub async fn fetch_optional<OP>(
119        self,
120        op: &mut OP,
121    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::Err>
122    where
123        OP: AtomicOperation,
124    {
125        let Some(entity) = self.fetch_optional_inner(&mut *op).await? else {
126            return Ok(None);
127        };
128        let mut entities = [entity];
129        <Repo as EsRepo>::load_all_nested_in_op(op, &mut entities).await?;
130        let [entity] = entities;
131        Ok(Some(entity))
132    }
133
134    pub async fn fetch_one<OP>(
135        self,
136        op: &mut OP,
137    ) -> Result<<Repo as EsRepo>::Entity, <Repo as EsRepo>::Err>
138    where
139        OP: AtomicOperation,
140    {
141        let entity = self.fetch_one_inner(&mut *op).await?;
142        let mut entities = [entity];
143        <Repo as EsRepo>::load_all_nested_in_op(op, &mut entities).await?;
144        let [entity] = entities;
145        Ok(entity)
146    }
147
148    pub async fn fetch_n<OP>(
149        self,
150        op: &mut OP,
151        first: usize,
152    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::Err>
153    where
154        OP: AtomicOperation,
155    {
156        let (mut entities, more) = self.fetch_n_inner(&mut *op, first).await?;
157        <Repo as EsRepo>::load_all_nested_in_op(op, &mut entities).await?;
158        Ok((entities, more))
159    }
160}