es-entity 0.10.35

Event Sourcing Entity Framework
Documentation
//! Query execution infrastructure for event-sourced entities.
//!
//! This module provides the underlying query types used by the `es_query!` macro.
//! **These types are not intended to be used directly** - instead, use the `es_query!`
//! macro which provides a simpler interface for querying index tables and automatically
//! hydrating entities from their events.
//!
//! # Example
//!
//! Instead of using these types directly, use the `es_query!` macro:
//!
//! ```rust,ignore
//! es_query!(
//!     "SELECT id FROM users WHERE name = $1",
//!     name
//! ).fetch_optional(&pool).await
//! ```
//!
//! See the `es_query!` macro documentation for more details.

use crate::{
    db,
    error::EntityHydrationError,
    events::{EntityEvents, GenericEvent},
    one_time_executor::IntoOneTimeExecutor,
    operation::AtomicOperation,
    traits::*,
};

/// Query builder for event-sourced entities.
///
/// This type is generated by the `es_query!` macro and should not be constructed directly.
/// It wraps a SQLx query and provides methods to fetch and hydrate entities from their events.
///
/// # Type parameters
///
/// - `'q`: lifetime attached to the wrapped SQLx query and its arguments.
/// - `Repo`: the `EsRepo` implementation whose entity type the fetch methods return.
/// - `Flavor`: marker type selecting which fetch API is available, either
///   [`EsQueryFlavorFlat`] or [`EsQueryFlavorNested`].
/// - `F`: row-mapping closure; the fetch methods require it to turn a `db::Row`
///   into a `GenericEvent`.
/// - `A`: the query arguments; the fetch methods require `sqlx::IntoArguments`.
pub struct EsQuery<'q, Repo, Flavor, F, A> {
    // The mapped SQLx query that the fetch methods execute.
    inner: sqlx::query::Map<'q, db::Db, F, A>,
    // Zero-sized marker recording the repository type; no `Repo` value is stored.
    _repo: std::marker::PhantomData<Repo>,
    // Zero-sized marker recording the flavor; no `Flavor` value is stored.
    _flavor: std::marker::PhantomData<Flavor>,
}

/// Query flavor for flat entities without nested relationships.
///
/// Used as the `Flavor` parameter of [`EsQuery`]. With this flavor the fetch methods
/// accept any `IntoOneTimeExecutor` and return the hydrated entities without loading
/// nested entities afterwards.
pub struct EsQueryFlavorFlat;

/// Query flavor for entities with nested relationships that need to be loaded recursively.
///
/// Used as the `Flavor` parameter of [`EsQuery`]. With this flavor the fetch methods
/// take a `&mut` reference to an `AtomicOperation`, hydrate the entities and then ask
/// the repository to load their nested entities within that same operation.
pub struct EsQueryFlavorNested;

impl<'q, Repo, Flavor, F, A> EsQuery<'q, Repo, Flavor, F, A>
where
    Repo: EsRepo,
    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
    F: FnMut(
            db::Row,
        ) -> Result<
            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
            sqlx::Error,
        > + Send,
    A: 'q + Send + sqlx::IntoArguments<'q, db::Db>,
{
    pub fn new(query: sqlx::query::Map<'q, db::Db, F, A>) -> Self {
        Self {
            inner: query,
            _repo: std::marker::PhantomData,
            _flavor: std::marker::PhantomData,
        }
    }

    async fn fetch_optional_inner<E: From<sqlx::Error> + From<EntityHydrationError>>(
        self,
        op: impl IntoOneTimeExecutor<'_>,
    ) -> Result<Option<<Repo as EsRepo>::Entity>, E> {
        let executor = op.into_executor();
        let rows = executor.fetch_all(self.inner).await?;
        if rows.is_empty() {
            return Ok(None);
        }

        Ok(EntityEvents::load_first(rows.into_iter())?)
    }

    async fn fetch_n_inner<E: From<sqlx::Error> + From<EntityHydrationError>>(
        self,
        op: impl IntoOneTimeExecutor<'_>,
        first: usize,
    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), E> {
        let executor = op.into_executor();
        let rows = executor.fetch_all(self.inner).await?;
        Ok(EntityEvents::load_n(rows.into_iter(), first)?)
    }
}

impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorFlat, F, A>
where
    Repo: EsRepo,
    <<Repo::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
    F: FnMut(
            db::Row,
        ) -> Result<
            GenericEvent<<<Repo::Entity as EsEntity>::Event as EsEvent>::EntityId>,
            sqlx::Error,
        > + Send,
    A: 'q + Send + sqlx::IntoArguments<'q, db::Db>,
{
    /// Runs the query and returns the first matching entity, if any.
    ///
    /// Yields `Ok(Some(entity))` when an entity was hydrated and `Ok(None)` when the
    /// query matched nothing. Failures are reported as the repository's `QueryError`.
    pub async fn fetch_optional(
        self,
        op: impl IntoOneTimeExecutor<'_>,
    ) -> Result<Option<Repo::Entity>, Repo::QueryError> {
        self.fetch_optional_inner::<Repo::QueryError>(op).await
    }

    /// Runs the query and returns up to `first` entities.
    ///
    /// The result is an `(entities, has_more)` pair; `has_more` tells whether more
    /// entities were available beyond the requested limit. Failures are reported as
    /// the repository's `QueryError`.
    pub async fn fetch_n(
        self,
        op: impl IntoOneTimeExecutor<'_>,
        first: usize,
    ) -> Result<(Vec<Repo::Entity>, bool), Repo::QueryError> {
        self.fetch_n_inner::<Repo::QueryError>(op, first).await
    }
}

impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorNested, F, A>
where
    Repo: EsRepo,
    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
    F: FnMut(
            db::Row,
        ) -> Result<
            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
            sqlx::Error,
        > + Send,
    A: 'q + Send + sqlx::IntoArguments<'q, db::Db>,
{
    /// Fetches at most one entity and loads all nested relationships.
    ///
    /// Returns `Ok(None)` if no entities match, or `Ok(Some(entity))` with all
    /// nested entities loaded if found.
    pub async fn fetch_optional<OP>(
        self,
        op: &mut OP,
    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::QueryError>
    where
        OP: AtomicOperation,
    {
        let Some(entity) = self
            .fetch_optional_inner::<<Repo as EsRepo>::QueryError>(&mut *op)
            .await?
        else {
            return Ok(None);
        };
        let mut entities = [entity];
        <Repo as EsRepo>::load_all_nested_in_op::<_, <Repo as EsRepo>::QueryError>(
            op,
            &mut entities,
        )
        .await?;
        let [entity] = entities;
        Ok(Some(entity))
    }

    /// Fetches up to `first` entities and loads all nested relationships.
    ///
    /// Returns a tuple of (entities, has_more) where all entities have their nested
    /// relationships loaded, and `has_more` indicates if more entities were available.
    pub async fn fetch_n<OP>(
        self,
        op: &mut OP,
        first: usize,
    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::QueryError>
    where
        OP: AtomicOperation,
    {
        let (mut entities, more) = self
            .fetch_n_inner::<<Repo as EsRepo>::QueryError>(&mut *op, first)
            .await?;
        <Repo as EsRepo>::load_all_nested_in_op::<_, <Repo as EsRepo>::QueryError>(
            op,
            &mut entities,
        )
        .await?;
        Ok((entities, more))
    }

    /// Like [`fetch_optional`](EsQuery::fetch_optional) but transitively includes
    /// soft-deleted nested entities.
    pub async fn fetch_optional_include_deleted<OP>(
        self,
        op: &mut OP,
    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::QueryError>
    where
        OP: AtomicOperation,
    {
        let Some(entity) = self
            .fetch_optional_inner::<<Repo as EsRepo>::QueryError>(&mut *op)
            .await?
        else {
            return Ok(None);
        };
        let mut entities = [entity];
        <Repo as EsRepo>::load_all_nested_in_op_include_deleted::<_, <Repo as EsRepo>::QueryError>(
            op,
            &mut entities,
        )
        .await?;
        let [entity] = entities;
        Ok(Some(entity))
    }

    /// Like [`fetch_n`](EsQuery::fetch_n) but transitively includes soft-deleted
    /// nested entities.
    pub async fn fetch_n_include_deleted<OP>(
        self,
        op: &mut OP,
        first: usize,
    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::QueryError>
    where
        OP: AtomicOperation,
    {
        let (mut entities, more) = self
            .fetch_n_inner::<<Repo as EsRepo>::QueryError>(&mut *op, first)
            .await?;
        <Repo as EsRepo>::load_all_nested_in_op_include_deleted::<_, <Repo as EsRepo>::QueryError>(
            op,
            &mut entities,
        )
        .await?;
        Ok((entities, more))
    }
}