Skip to main content

es_entity/
query.rs

1//! Query execution infrastructure for event-sourced entities.
2//!
3//! This module provides the underlying query types used by the `es_query!` macro.
4//! **These types are not intended to be used directly** - instead, use the `es_query!`
5//! macro which provides a simpler interface for querying index tables and automatically
6//! hydrating entities from their events.
7//!
8//! # Example
9//!
10//! Instead of using these types directly, use the `es_query!` macro:
11//!
12//! ```rust,ignore
13//! es_query!(
14//!     "SELECT id FROM users WHERE name = $1",
15//!     name
16//! ).fetch_optional(&pool).await
17//! ```
18//!
19//! See the `es_query!` macro documentation for more details.
20
21use crate::{
22    error::EntityHydrationError,
23    events::{EntityEvents, GenericEvent},
24    one_time_executor::IntoOneTimeExecutor,
25    operation::AtomicOperation,
26    traits::*,
27};
28
29/// Query builder for event-sourced entities.
30///
31/// This type is generated by the `es_query!` macro and should not be constructed directly.
32/// It wraps a SQLx query and provides methods to fetch and hydrate entities from their events.
33pub struct EsQuery<'q, Repo, Flavor, F, A> {
34    inner: sqlx::query::Map<'q, sqlx::Postgres, F, A>,
35    _repo: std::marker::PhantomData<Repo>,
36    _flavor: std::marker::PhantomData<Flavor>,
37}
38
39/// Query flavor for flat entities without nested relationships.
40pub struct EsQueryFlavorFlat;
41
42/// Query flavor for entities with nested relationships that need to be loaded recursively.
43pub struct EsQueryFlavorNested;
44
45impl<'q, Repo, Flavor, F, A> EsQuery<'q, Repo, Flavor, F, A>
46where
47    Repo: EsRepo,
48    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
49    F: FnMut(
50            sqlx::postgres::PgRow,
51        ) -> Result<
52            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
53            sqlx::Error,
54        > + Send,
55    A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
56{
57    pub fn new(query: sqlx::query::Map<'q, sqlx::Postgres, F, A>) -> Self {
58        Self {
59            inner: query,
60            _repo: std::marker::PhantomData,
61            _flavor: std::marker::PhantomData,
62        }
63    }
64
65    async fn fetch_optional_inner<E: From<sqlx::Error> + From<EntityHydrationError>>(
66        self,
67        op: impl IntoOneTimeExecutor<'_>,
68    ) -> Result<Option<<Repo as EsRepo>::Entity>, E> {
69        let executor = op.into_executor();
70        let rows = executor.fetch_all(self.inner).await?;
71        if rows.is_empty() {
72            return Ok(None);
73        }
74
75        Ok(EntityEvents::load_first(rows.into_iter())?)
76    }
77
78    async fn fetch_n_inner<E: From<sqlx::Error> + From<EntityHydrationError>>(
79        self,
80        op: impl IntoOneTimeExecutor<'_>,
81        first: usize,
82    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), E> {
83        let executor = op.into_executor();
84        let rows = executor.fetch_all(self.inner).await?;
85        Ok(EntityEvents::load_n(rows.into_iter(), first)?)
86    }
87}
88
89impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorFlat, F, A>
90where
91    Repo: EsRepo,
92    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
93    F: FnMut(
94            sqlx::postgres::PgRow,
95        ) -> Result<
96            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
97            sqlx::Error,
98        > + Send,
99    A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
100{
101    /// Fetches at most one entity from the query results.
102    ///
103    /// Returns `Ok(None)` if no entities match the query, or `Ok(Some(entity))` if found.
104    pub async fn fetch_optional(
105        self,
106        op: impl IntoOneTimeExecutor<'_>,
107    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::QueryError> {
108        self.fetch_optional_inner(op).await
109    }
110
111    /// Fetches up to `first` entities from the query results.
112    ///
113    /// Returns a tuple of (entities, has_more) where `has_more` indicates if there
114    /// were more entities available beyond the requested limit.
115    pub async fn fetch_n(
116        self,
117        op: impl IntoOneTimeExecutor<'_>,
118        first: usize,
119    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::QueryError> {
120        self.fetch_n_inner(op, first).await
121    }
122}
123
124impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorNested, F, A>
125where
126    Repo: EsRepo,
127    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
128    F: FnMut(
129            sqlx::postgres::PgRow,
130        ) -> Result<
131            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
132            sqlx::Error,
133        > + Send,
134    A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
135{
136    /// Fetches at most one entity and loads all nested relationships.
137    ///
138    /// Returns `Ok(None)` if no entities match, or `Ok(Some(entity))` with all
139    /// nested entities loaded if found.
140    pub async fn fetch_optional<OP>(
141        self,
142        op: &mut OP,
143    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::QueryError>
144    where
145        OP: AtomicOperation,
146    {
147        let Some(entity) = self
148            .fetch_optional_inner::<<Repo as EsRepo>::QueryError>(&mut *op)
149            .await?
150        else {
151            return Ok(None);
152        };
153        let mut entities = [entity];
154        <Repo as EsRepo>::load_all_nested_in_op::<_, <Repo as EsRepo>::QueryError>(
155            op,
156            &mut entities,
157        )
158        .await?;
159        let [entity] = entities;
160        Ok(Some(entity))
161    }
162
163    /// Fetches up to `first` entities and loads all nested relationships.
164    ///
165    /// Returns a tuple of (entities, has_more) where all entities have their nested
166    /// relationships loaded, and `has_more` indicates if more entities were available.
167    pub async fn fetch_n<OP>(
168        self,
169        op: &mut OP,
170        first: usize,
171    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::QueryError>
172    where
173        OP: AtomicOperation,
174    {
175        let (mut entities, more) = self
176            .fetch_n_inner::<<Repo as EsRepo>::QueryError>(&mut *op, first)
177            .await?;
178        <Repo as EsRepo>::load_all_nested_in_op::<_, <Repo as EsRepo>::QueryError>(
179            op,
180            &mut entities,
181        )
182        .await?;
183        Ok((entities, more))
184    }
185}