Skip to main content

es_entity/
query.rs

1//! Query execution infrastructure for event-sourced entities.
2//!
3//! This module provides the underlying query types used by the `es_query!` macro.
4//! **These types are not intended to be used directly** - instead, use the `es_query!`
5//! macro which provides a simpler interface for querying index tables and automatically
6//! hydrating entities from their events.
7//!
8//! # Example
9//!
10//! Instead of using these types directly, use the `es_query!` macro:
11//!
12//! ```rust,ignore
13//! es_query!(
14//!     "SELECT id FROM users WHERE name = $1",
15//!     name
16//! ).fetch_optional(&pool).await
17//! ```
18//!
19//! See the `es_query!` macro documentation for more details.
20
21use crate::{
22    db,
23    error::EntityHydrationError,
24    events::{EntityEvents, GenericEvent},
25    one_time_executor::IntoOneTimeExecutor,
26    operation::AtomicOperation,
27    traits::*,
28};
29
30/// Query builder for event-sourced entities.
31///
32/// This type is generated by the `es_query!` macro and should not be constructed directly.
33/// It wraps a SQLx query and provides methods to fetch and hydrate entities from their events.
34pub struct EsQuery<'q, Repo, Flavor, F, A> {
35    inner: sqlx::query::Map<'q, db::Db, F, A>,
36    _repo: std::marker::PhantomData<Repo>,
37    _flavor: std::marker::PhantomData<Flavor>,
38}
39
/// Marker type selecting the "flat" query flavor.
///
/// An `EsQuery` tagged with this flavor hydrates entities from their events and
/// returns them as-is, without loading nested relationships.
///
/// The type carries no data; it only exists to be named as a type parameter. The
/// common traits are derived so that it never gets in the way of deriving or
/// debugging on types that mention it.
#[derive(Debug, Clone, Copy)]
pub struct EsQueryFlavorFlat;
42
/// Marker type selecting the "nested" query flavor.
///
/// An `EsQuery` tagged with this flavor hydrates entities from their events and then
/// loads their nested relationships before returning them.
///
/// The type carries no data; it only exists to be named as a type parameter. The
/// common traits are derived so that it never gets in the way of deriving or
/// debugging on types that mention it.
#[derive(Debug, Clone, Copy)]
pub struct EsQueryFlavorNested;
45
46impl<'q, Repo, Flavor, F, A> EsQuery<'q, Repo, Flavor, F, A>
47where
48    Repo: EsRepo,
49    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
50    F: FnMut(
51            db::Row,
52        ) -> Result<
53            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
54            sqlx::Error,
55        > + Send,
56    A: 'q + Send + sqlx::IntoArguments<'q, db::Db>,
57{
58    pub fn new(query: sqlx::query::Map<'q, db::Db, F, A>) -> Self {
59        Self {
60            inner: query,
61            _repo: std::marker::PhantomData,
62            _flavor: std::marker::PhantomData,
63        }
64    }
65
66    async fn fetch_optional_inner<E: From<sqlx::Error> + From<EntityHydrationError>>(
67        self,
68        op: impl IntoOneTimeExecutor<'_>,
69    ) -> Result<Option<<Repo as EsRepo>::Entity>, E> {
70        let executor = op.into_executor();
71        let rows = executor.fetch_all(self.inner).await?;
72        if rows.is_empty() {
73            return Ok(None);
74        }
75
76        Ok(EntityEvents::load_first(rows.into_iter())?)
77    }
78
79    async fn fetch_n_inner<E: From<sqlx::Error> + From<EntityHydrationError>>(
80        self,
81        op: impl IntoOneTimeExecutor<'_>,
82        first: usize,
83    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), E> {
84        let executor = op.into_executor();
85        let rows = executor.fetch_all(self.inner).await?;
86        Ok(EntityEvents::load_n(rows.into_iter(), first)?)
87    }
88}
89
90impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorFlat, F, A>
91where
92    Repo: EsRepo,
93    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
94    F: FnMut(
95            db::Row,
96        ) -> Result<
97            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
98            sqlx::Error,
99        > + Send,
100    A: 'q + Send + sqlx::IntoArguments<'q, db::Db>,
101{
102    /// Fetches at most one entity from the query results.
103    ///
104    /// Returns `Ok(None)` if no entities match the query, or `Ok(Some(entity))` if found.
105    pub async fn fetch_optional(
106        self,
107        op: impl IntoOneTimeExecutor<'_>,
108    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::QueryError> {
109        self.fetch_optional_inner(op).await
110    }
111
112    /// Fetches up to `first` entities from the query results.
113    ///
114    /// Returns a tuple of (entities, has_more) where `has_more` indicates if there
115    /// were more entities available beyond the requested limit.
116    pub async fn fetch_n(
117        self,
118        op: impl IntoOneTimeExecutor<'_>,
119        first: usize,
120    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::QueryError> {
121        self.fetch_n_inner(op, first).await
122    }
123}
124
125impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorNested, F, A>
126where
127    Repo: EsRepo,
128    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
129    F: FnMut(
130            db::Row,
131        ) -> Result<
132            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
133            sqlx::Error,
134        > + Send,
135    A: 'q + Send + sqlx::IntoArguments<'q, db::Db>,
136{
137    /// Fetches at most one entity and loads all nested relationships.
138    ///
139    /// Returns `Ok(None)` if no entities match, or `Ok(Some(entity))` with all
140    /// nested entities loaded if found.
141    pub async fn fetch_optional<OP>(
142        self,
143        op: &mut OP,
144    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::QueryError>
145    where
146        OP: AtomicOperation,
147    {
148        let Some(entity) = self
149            .fetch_optional_inner::<<Repo as EsRepo>::QueryError>(&mut *op)
150            .await?
151        else {
152            return Ok(None);
153        };
154        let mut entities = [entity];
155        <Repo as EsRepo>::load_all_nested_in_op::<_, <Repo as EsRepo>::QueryError>(
156            op,
157            &mut entities,
158        )
159        .await?;
160        let [entity] = entities;
161        Ok(Some(entity))
162    }
163
164    /// Fetches up to `first` entities and loads all nested relationships.
165    ///
166    /// Returns a tuple of (entities, has_more) where all entities have their nested
167    /// relationships loaded, and `has_more` indicates if more entities were available.
168    pub async fn fetch_n<OP>(
169        self,
170        op: &mut OP,
171        first: usize,
172    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::QueryError>
173    where
174        OP: AtomicOperation,
175    {
176        let (mut entities, more) = self
177            .fetch_n_inner::<<Repo as EsRepo>::QueryError>(&mut *op, first)
178            .await?;
179        <Repo as EsRepo>::load_all_nested_in_op::<_, <Repo as EsRepo>::QueryError>(
180            op,
181            &mut entities,
182        )
183        .await?;
184        Ok((entities, more))
185    }
186}