evidentsource-client 1.0.0-rc1

Rust client for the EvidentSource event sourcing platform
Documentation
//! SpeculativeDatabase implementation for uncommitted event scenarios.

use std::sync::Arc;

use chrono::{DateTime, Utc};
use futures::stream;
use futures::Stream;
use nonempty::NonEmpty;

use evidentsource_core::domain::{
    DatabaseName, Event, EventAttribute, EventSelector, ProspectiveEvent, QueryDirection,
    QueryOptions, Revision, StateView, StateViewError, StateViewName, StateViewVersion,
};
use evidentsource_core::{DatabaseAtRevision, DatabaseIdentity, SpeculativeDatabase};

use super::at_revision::DatabaseAtRevisionImpl;
use super::effective_timestamp::EffectiveTimestampViewImpl;

/// Inner state for speculative database views.
struct SpeculativeDatabaseInner {
    /// The committed basis.
    basis: DatabaseAtRevisionImpl,
    /// Accumulated speculative transactions.
    speculated_transactions: NonEmpty<NonEmpty<ProspectiveEvent>>,
}

/// A speculative database view with uncommitted events overlaid.
///
/// This allows querying the database as if certain events had been committed,
/// useful for validation and preview scenarios.
#[derive(Clone)]
pub struct SpeculativeDatabaseImpl {
    inner: Arc<SpeculativeDatabaseInner>,
}

impl SpeculativeDatabaseImpl {
    /// Create a new speculative view with a single transaction.
    pub fn new(basis: DatabaseAtRevisionImpl, transaction: NonEmpty<ProspectiveEvent>) -> Self {
        Self {
            inner: Arc::new(SpeculativeDatabaseInner {
                basis,
                speculated_transactions: NonEmpty::singleton(transaction),
            }),
        }
    }

    /// Add another speculative transaction to this view.
    fn with_additional_transaction(self, transaction: NonEmpty<ProspectiveEvent>) -> Self {
        let mut transactions = self.inner.speculated_transactions.clone();
        transactions.push(transaction);
        Self {
            inner: Arc::new(SpeculativeDatabaseInner {
                basis: self.inner.basis.clone(),
                speculated_transactions: transactions,
            }),
        }
    }

    /// Get all speculated events as a flat iterator.
    fn speculated_events(&self) -> impl Iterator<Item = &ProspectiveEvent> {
        self.inner
            .speculated_transactions
            .iter()
            .flat_map(|b| b.iter())
    }
}

impl std::fmt::Debug for SpeculativeDatabaseImpl {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SpeculativeDatabaseImpl")
            .field("basis", &self.inner.basis)
            .field(
                "speculated_transaction_count",
                &self.inner.speculated_transactions.len(),
            )
            .finish()
    }
}

impl DatabaseIdentity for SpeculativeDatabaseImpl {
    fn name(&self) -> &DatabaseName {
        self.inner.basis.name()
    }

    fn created_at(&self) -> DateTime<Utc> {
        self.inner.basis.created_at()
    }
}

impl DatabaseAtRevision for SpeculativeDatabaseImpl {
    type EffectiveTimestampView = EffectiveTimestampViewImpl;
    type Speculative = Self;

    fn revision(&self) -> Revision {
        // Speculative revision is basis + total number of events across all transactions
        let total_events: u64 = self
            .inner
            .speculated_transactions
            .iter()
            .map(|transaction| transaction.len() as u64)
            .sum();
        self.inner.basis.revision() + total_events
    }

    fn revision_timestamp(&self) -> DateTime<Utc> {
        // Use current time for speculative timestamp
        Utc::now()
    }

    fn at_effective_timestamp(
        &self,
        effective_timestamp: DateTime<Utc>,
    ) -> Self::EffectiveTimestampView {
        // Note: Returns an effective timestamp view based on the committed basis
        EffectiveTimestampViewImpl::new(self.inner.basis.clone(), effective_timestamp)
    }

    fn speculate_with_transaction(
        &self,
        transaction: NonEmpty<ProspectiveEvent>,
    ) -> Self::Speculative {
        self.clone().with_additional_transaction(transaction)
    }

    fn at_revision(&self, _revision: Revision) -> impl std::future::Future<Output = Self> {
        // Speculative databases cannot navigate to other revisions
        // We return self unchanged (the speculative view stays the same)
        let this = self.clone();
        async move { this }
    }

    fn query_events(&self, selector: &EventSelector) -> impl Stream<Item = Event> {
        // Filter speculated events that match the selector
        // Note: For speculated events, we use matches_prospective since they are ProspectiveEvents
        let speculated: Vec<Event> = self
            .speculated_events()
            .filter(|pe| selector.matches_prospective(pe))
            .map(|pe| {
                // Convert ProspectiveEvent to Event for the query result
                // Use a synthetic source URI for speculated events
                Event {
                    id: pe.id.clone(),
                    source: format!("speculative://{}", pe.stream),
                    event_type: pe.event_type.clone(),
                    subject: pe.subject.clone(),
                    data: pe.data.clone(),
                    time: pe.time,
                    datacontenttype: pe.datacontenttype.clone(),
                    dataschema: pe.dataschema.clone(),
                    extensions: pe.extensions.clone(),
                }
            })
            .collect();

        // Note: We only return speculated events here. To get a complete view,
        // users should chain this with basis.query_events() in their application.
        // This is because we cannot easily merge async streams from basis with
        // sync speculated events in a generic way.
        //
        // Example usage:
        // ```
        // let basis_events = spec_db.basis().query_events(&selector);
        // let speculated_events = spec_db.query_events(&selector);
        // // basis_events will be followed by speculated_events
        // ```
        stream::iter(speculated)
    }

    fn query_events_with_options(
        &self,
        selector: &EventSelector,
        options: QueryOptions,
    ) -> impl Stream<Item = Event> {
        // Filter speculated events that match the selector
        let mut speculated: Vec<Event> = self
            .speculated_events()
            .filter(|pe| selector.matches_prospective(pe))
            .map(|pe| Event {
                id: pe.id.clone(),
                source: format!("speculative://{}", pe.stream),
                event_type: pe.event_type.clone(),
                subject: pe.subject.clone(),
                data: pe.data.clone(),
                time: pe.time,
                datacontenttype: pe.datacontenttype.clone(),
                dataschema: pe.dataschema.clone(),
                extensions: pe.extensions.clone(),
            })
            .collect();

        // Apply direction
        if options.get_direction() == QueryDirection::Reverse {
            speculated.reverse();
        }

        // Apply limit
        if let Some(limit) = options.get_limit() {
            speculated.truncate(limit as usize);
        }

        stream::iter(speculated)
    }

    async fn view_state(
        &self,
        _name: &StateViewName,
        _version: StateViewVersion,
    ) -> Result<StateView, StateViewError> {
        // Cannot compute state views speculatively without a local WASM runtime
        Err(StateViewError::EvolveError(
            "Cannot compute state views speculatively without local WASM runtime".to_string(),
        ))
    }

    async fn view_state_with_params(
        &self,
        _name: &StateViewName,
        _version: StateViewVersion,
        _params: &[(String, EventAttribute)],
    ) -> Result<StateView, StateViewError> {
        // Cannot compute state views speculatively without a local WASM runtime
        Err(StateViewError::EvolveError(
            "Cannot compute state views speculatively without local WASM runtime".to_string(),
        ))
    }
}

impl SpeculativeDatabase for SpeculativeDatabaseImpl {
    type Basis = DatabaseAtRevisionImpl;

    fn basis(&self) -> &Self::Basis {
        &self.inner.basis
    }

    fn speculated_transactions(&self) -> &NonEmpty<NonEmpty<ProspectiveEvent>> {
        &self.inner.speculated_transactions
    }
}