es-entity 0.10.19

Event Sourcing Entity Framework
Documentation
//! Query execution infrastructure for event-sourced entities.
//!
//! This module provides the underlying query types used by the `es_query!` macro.
//! **These types are not intended to be used directly** - instead, use the `es_query!`
//! macro which provides a simpler interface for querying index tables and automatically
//! hydrating entities from their events.
//!
//! # Example
//!
//! Instead of using these types directly, use the `es_query!` macro:
//!
//! ```rust,ignore
//! es_query!(
//!     "SELECT id FROM users WHERE name = $1",
//!     name
//! ).fetch_one(&pool).await
//! ```
//!
//! See the `es_query!` macro documentation for more details.

use crate::{
    events::{EntityEvents, GenericEvent},
    one_time_executor::IntoOneTimeExecutor,
    operation::AtomicOperation,
    traits::*,
};

/// Query builder for event-sourced entities.
///
/// This type is generated by the `es_query!` macro and should not be constructed directly.
/// It wraps a SQLx query and provides methods to fetch and hydrate entities from their events.
///
/// Type parameters:
/// - `Repo`: the repository whose entity and error types the fetch methods return.
/// - `Flavor`: a marker ([`EsQueryFlavorFlat`] or [`EsQueryFlavorNested`]) that selects
///   which set of public fetch methods is available.
/// - `F`: the row-mapping closure of the wrapped SQLx query.
/// - `A`: the bound arguments of the wrapped SQLx query.
pub struct EsQuery<'q, Repo, Flavor, F, A> {
    /// The wrapped SQLx query; it is consumed when one of the fetch methods runs.
    inner: sqlx::query::Map<'q, sqlx::Postgres, F, A>,
    /// Zero-sized marker tying the query to `Repo` without storing a value of it.
    _repo: std::marker::PhantomData<Repo>,
    /// Zero-sized marker recording the query flavor at the type level.
    _flavor: std::marker::PhantomData<Flavor>,
}

/// Query flavor for flat entities without nested relationships.
///
/// Used purely as a type-level marker: an `EsQuery` carrying this flavor exposes fetch
/// methods that accept any `IntoOneTimeExecutor` and return the hydrated entities as-is.
pub struct EsQueryFlavorFlat;

/// Query flavor for entities with nested relationships that need to be loaded recursively.
///
/// Used purely as a type-level marker: an `EsQuery` carrying this flavor exposes fetch
/// methods that take a `&mut` reference to an `AtomicOperation` and pass the hydrated
/// entities to the repository's `load_all_nested_in_op` before returning them.
pub struct EsQueryFlavorNested;

impl<'q, Repo, Flavor, F, A> EsQuery<'q, Repo, Flavor, F, A>
where
    Repo: EsRepo,
    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
    F: FnMut(
            sqlx::postgres::PgRow,
        ) -> Result<
            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
            sqlx::Error,
        > + Send,
    A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
{
    pub fn new(query: sqlx::query::Map<'q, sqlx::Postgres, F, A>) -> Self {
        Self {
            inner: query,
            _repo: std::marker::PhantomData,
            _flavor: std::marker::PhantomData,
        }
    }

    async fn fetch_optional_inner(
        self,
        op: impl IntoOneTimeExecutor<'_>,
    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::Err> {
        let executor = op.into_executor();
        let rows = executor.fetch_all(self.inner).await?;
        if rows.is_empty() {
            return Ok(None);
        }

        Ok(Some(EntityEvents::load_first(rows.into_iter())?))
    }

    async fn fetch_one_inner(
        self,
        op: impl IntoOneTimeExecutor<'_>,
    ) -> Result<<Repo as EsRepo>::Entity, <Repo as EsRepo>::Err> {
        let executor = op.into_executor();
        let rows = executor.fetch_all(self.inner).await?;
        Ok(EntityEvents::load_first(rows.into_iter())?)
    }

    async fn fetch_n_inner(
        self,
        op: impl IntoOneTimeExecutor<'_>,
        first: usize,
    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::Err> {
        let executor = op.into_executor();
        let rows = executor.fetch_all(self.inner).await?;
        Ok(EntityEvents::load_n(rows.into_iter(), first)?)
    }
}

impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorFlat, F, A>
where
    Repo: EsRepo,
    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
    F: FnMut(
            sqlx::postgres::PgRow,
        ) -> Result<
            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
            sqlx::Error,
        > + Send,
    A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
{
    /// Looks up a single entity, tolerating its absence.
    ///
    /// Yields `Ok(Some(entity))` when the query produced rows and `Ok(None)` when it
    /// produced none.
    pub async fn fetch_optional(
        self,
        op: impl IntoOneTimeExecutor<'_>,
    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::Err> {
        // Flat entities need no post-processing, so this is a straight delegation.
        Self::fetch_optional_inner(self, op).await
    }

    /// Looks up a single entity that is required to exist.
    ///
    /// Fails if nothing matches the query or if the entity cannot be loaded.
    pub async fn fetch_one(
        self,
        op: impl IntoOneTimeExecutor<'_>,
    ) -> Result<<Repo as EsRepo>::Entity, <Repo as EsRepo>::Err> {
        Self::fetch_one_inner(self, op).await
    }

    /// Loads at most `first` entities from the query results.
    ///
    /// The result is `(entities, has_more)`; `has_more` reports whether further
    /// entities were available past the requested limit.
    pub async fn fetch_n(
        self,
        op: impl IntoOneTimeExecutor<'_>,
        first: usize,
    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::Err> {
        Self::fetch_n_inner(self, op, first).await
    }
}

impl<'q, Repo, F, A> EsQuery<'q, Repo, EsQueryFlavorNested, F, A>
where
    Repo: EsRepo,
    <<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId: Unpin,
    F: FnMut(
            sqlx::postgres::PgRow,
        ) -> Result<
            GenericEvent<<<<Repo as EsRepo>::Entity as EsEntity>::Event as EsEvent>::EntityId>,
            sqlx::Error,
        > + Send,
    A: 'q + Send + sqlx::IntoArguments<'q, sqlx::Postgres>,
{
    /// Looks up a single entity, tolerating its absence, and loads its nested relationships.
    ///
    /// Yields `Ok(None)` when the query produced no rows. Otherwise the entity is passed
    /// through the repository's `load_all_nested_in_op` on the same operation before
    /// being returned as `Ok(Some(entity))`.
    pub async fn fetch_optional<OP>(
        self,
        op: &mut OP,
    ) -> Result<Option<<Repo as EsRepo>::Entity>, <Repo as EsRepo>::Err>
    where
        OP: AtomicOperation,
    {
        // Reborrow so `op` stays usable for the nested load below.
        let found = self.fetch_optional_inner(&mut *op).await?;
        match found {
            None => Ok(None),
            Some(mut root) => {
                // The nested loader works on a slice; view the lone entity as one.
                <Repo as EsRepo>::load_all_nested_in_op(op, std::slice::from_mut(&mut root))
                    .await?;
                Ok(Some(root))
            }
        }
    }

    /// Looks up a single required entity and loads its nested relationships.
    ///
    /// Fails if nothing matches the query, or if the entity or its nested
    /// relationships cannot be loaded.
    pub async fn fetch_one<OP>(
        self,
        op: &mut OP,
    ) -> Result<<Repo as EsRepo>::Entity, <Repo as EsRepo>::Err>
    where
        OP: AtomicOperation,
    {
        // Reborrow so `op` stays usable for the nested load below.
        let mut root = self.fetch_one_inner(&mut *op).await?;
        // The nested loader works on a slice; view the lone entity as one.
        <Repo as EsRepo>::load_all_nested_in_op(op, std::slice::from_mut(&mut root)).await?;
        Ok(root)
    }

    /// Loads at most `first` entities together with their nested relationships.
    ///
    /// The result is `(entities, has_more)`: every returned entity has been passed
    /// through `load_all_nested_in_op`, and `has_more` reports whether further entities
    /// were available past the requested limit.
    pub async fn fetch_n<OP>(
        self,
        op: &mut OP,
        first: usize,
    ) -> Result<(Vec<<Repo as EsRepo>::Entity>, bool), <Repo as EsRepo>::Err>
    where
        OP: AtomicOperation,
    {
        // Reborrow so `op` stays usable for the nested load below.
        let (mut hydrated, has_more) = self.fetch_n_inner(&mut *op, first).await?;
        <Repo as EsRepo>::load_all_nested_in_op(op, hydrated.as_mut_slice()).await?;
        Ok((hydrated, has_more))
    }
}