es_entity/
query.rs

//! Query execution infrastructure for event-sourced entities.
//!
//! This module provides the underlying query types used by the `es_query!` macro.
//! **These types are not intended to be used directly**; instead, use the `es_query!`
//! macro, which provides a simpler interface for querying index tables and automatically
//! hydrating entities from their events.
//!
//! # Example
//!
//! Instead of using these types directly, use the `es_query!` macro:
//!
//! ```rust,ignore
//! es_query!(
//!     "SELECT id FROM users WHERE name = $1",
//!     name
//! ).fetch_one(&pool).await
//! ```
//!
//! See the `es_query!` macro documentation for more details.
20
21use crate::{
22    events::{EntityEvents, GenericEvent},
23    one_time_executor::IntoOneTimeExecutor,
24    operation::AtomicOperation,
25    traits::*,
26};
27
28/// Query builder for event-sourced entities.
29///
30/// This type is generated by the `es_query!` macro and should not be constructed directly.
31/// It wraps a SQLx query and provides methods to fetch and hydrate entities from their events.
32pub struct EsQuery<'q, Repo, Flavor, F, A> {
33    inner: sqlx::query::Map<'q, sqlx::Postgres, F, A>,
34    _repo: std::marker::PhantomData<Repo>,
35    _flavor: std::marker::PhantomData<Flavor>,
36}
37
/// Marker type selecting the "flat" fetch methods on [`EsQuery`].
///
/// Flat queries hydrate entities from their events only; no nested-entity loading
/// step is run afterwards.
pub struct EsQueryFlavorFlat;
40
/// Marker type selecting the "nested" fetch methods on [`EsQuery`].
///
/// Nested queries hydrate entities from their events and then hand them to the
/// repository's `load_all_nested_in_op` so nested entities are loaded as well.
pub struct EsQueryFlavorNested;
43
44impl<'q, Repo, Flavor, F, A> EsQuery<'q, Repo, Flavor, F, A>
45where
46    Repo: EsRepo,
47    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
48    F: FnMut(
49            sqlx::postgres::PgRow,
50        ) -> Result<
51            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
52            sqlx::Error,
53        > + Send,
54    A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
55{
56    pub fn new(query: sqlx::query::Map<'q, sqlx::Postgres, F, A>) -> Self {
57        Self {
58            inner: query,
59            _repo: std::marker::PhantomData,
60            _flavor: std::marker::PhantomData,
61        }
62    }
63
64    async fn fetch_optional_inner(
65        self,
66        op: impl IntoOneTimeExecutor<'_>,
67    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::Err> {
68        let executor = op.into_executor();
69        let rows = executor.fetch_all(self.inner).await?;
70        if rows.is_empty() {
71            return Ok(None);
72        }
73
74        Ok(Some(EntityEvents::load_first(rows.into_iter())?))
75    }
76
77    async fn fetch_one_inner(
78        self,
79        op: impl IntoOneTimeExecutor<'_>,
80    ) -> Result<<Repo as EsRepo>::Entity, <Repo as EsRepo>::Err> {
81        let executor = op.into_executor();
82        let rows = executor.fetch_all(self.inner).await?;
83        Ok(EntityEvents::load_first(rows.into_iter())?)
84    }
85
86    async fn fetch_n_inner(
87        self,
88        op: impl IntoOneTimeExecutor<'_>,
89        first: usize,
90    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::Err> {
91        let executor = op.into_executor();
92        let rows = executor.fetch_all(self.inner).await?;
93        Ok(EntityEvents::load_n(rows.into_iter(), first)?)
94    }
95}
96
97impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorFlat, F, A>
98where
99    Repo: EsRepo,
100    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
101    F: FnMut(
102            sqlx::postgres::PgRow,
103        ) -> Result<
104            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
105            sqlx::Error,
106        > + Send,
107    A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
108{
109    /// Fetches at most one entity from the query results.
110    ///
111    /// Returns `Ok(None)` if no entities match the query, or `Ok(Some(entity))` if found.
112    pub async fn fetch_optional(
113        self,
114        op: impl IntoOneTimeExecutor<'_>,
115    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::Err> {
116        self.fetch_optional_inner(op).await
117    }
118
119    /// Fetches exactly one entity from the query results.
120    ///
121    /// Returns an error if no entities match or if the entity cannot be loaded.
122    pub async fn fetch_one(
123        self,
124        op: impl IntoOneTimeExecutor<'_>,
125    ) -> Result<<Repo as EsRepo>::Entity, <Repo as EsRepo>::Err> {
126        self.fetch_one_inner(op).await
127    }
128
129    /// Fetches up to `first` entities from the query results.
130    ///
131    /// Returns a tuple of (entities, has_more) where `has_more` indicates if there
132    /// were more entities available beyond the requested limit.
133    pub async fn fetch_n(
134        self,
135        op: impl IntoOneTimeExecutor<'_>,
136        first: usize,
137    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::Err> {
138        self.fetch_n_inner(op, first).await
139    }
140}
141
142impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorNested, F, A>
143where
144    Repo: EsRepo,
145    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
146    F: FnMut(
147            sqlx::postgres::PgRow,
148        ) -> Result<
149            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
150            sqlx::Error,
151        > + Send,
152    A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
153{
154    /// Fetches at most one entity and loads all nested relationships.
155    ///
156    /// Returns `Ok(None)` if no entities match, or `Ok(Some(entity))` with all
157    /// nested entities loaded if found.
158    pub async fn fetch_optional<OP>(
159        self,
160        op: &mut OP,
161    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::Err>
162    where
163        OP: AtomicOperation,
164    {
165        let Some(entity) = self.fetch_optional_inner(&mut *op).await? else {
166            return Ok(None);
167        };
168        let mut entities = [entity];
169        <Repo as EsRepo>::load_all_nested_in_op(op, &mut entities).await?;
170        let [entity] = entities;
171        Ok(Some(entity))
172    }
173
174    /// Fetches exactly one entity and loads all nested relationships.
175    ///
176    /// Returns an error if no entities match or if the entity/nested relationships
177    /// cannot be loaded.
178    pub async fn fetch_one<OP>(
179        self,
180        op: &mut OP,
181    ) -> Result<<Repo as EsRepo>::Entity, <Repo as EsRepo>::Err>
182    where
183        OP: AtomicOperation,
184    {
185        let entity = self.fetch_one_inner(&mut *op).await?;
186        let mut entities = [entity];
187        <Repo as EsRepo>::load_all_nested_in_op(op, &mut entities).await?;
188        let [entity] = entities;
189        Ok(entity)
190    }
191
192    /// Fetches up to `first` entities and loads all nested relationships.
193    ///
194    /// Returns a tuple of (entities, has_more) where all entities have their nested
195    /// relationships loaded, and `has_more` indicates if more entities were available.
196    pub async fn fetch_n<OP>(
197        self,
198        op: &mut OP,
199        first: usize,
200    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::Err>
201    where
202        OP: AtomicOperation,
203    {
204        let (mut entities, more) = self.fetch_n_inner(&mut *op, first).await?;
205        <Repo as EsRepo>::load_all_nested_in_op(op, &mut entities).await?;
206        Ok((entities, more))
207    }
208}