evidentsource-client 1.0.0-rc1

Rust client for the EvidentSource event sourcing platform
Documentation
//! EffectiveTimestampView implementation for bi-temporal queries.

use std::sync::Arc;

use chrono::{DateTime, Utc};
use futures::stream::{self, StreamExt};
use futures::Stream;
use nonempty::NonEmpty;

use evidentsource_core::domain::{
    DatabaseError, DatabaseName, Event, EventAttribute, EventSelector, ProspectiveEvent,
    QueryOptions, Revision, StateView, StateViewError, StateViewName, StateViewVersion,
};
use evidentsource_core::{
    DatabaseAtRevision, DatabaseAtRevisionAndEffectiveTimestamp, DatabaseIdentity,
};

use crate::com::evidentsource as proto;
use crate::conversions::datetime_to_timestamp;

use super::at_revision::DatabaseAtRevisionImpl;
use super::speculative::SpeculativeDatabaseImpl;

/// Inner state for effective timestamp views.
struct EffectiveTimestampViewInner {
    /// The base revision view.
    basis: DatabaseAtRevisionImpl,
    /// The effective timestamp scope.
    effective_timestamp: DateTime<Utc>,
}

/// A database view scoped to both a revision and an effective timestamp.
///
/// This enables bi-temporal queries where you want to see the database
/// state as it was understood at a particular point in effective time.
#[derive(Clone)]
pub struct EffectiveTimestampViewImpl {
    inner: Arc<EffectiveTimestampViewInner>,
}

impl EffectiveTimestampViewImpl {
    /// Create a new effective timestamp view.
    pub fn new(basis: DatabaseAtRevisionImpl, effective_timestamp: DateTime<Utc>) -> Self {
        Self {
            inner: Arc::new(EffectiveTimestampViewInner {
                basis,
                effective_timestamp,
            }),
        }
    }
}

impl std::fmt::Debug for EffectiveTimestampViewImpl {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EffectiveTimestampViewImpl")
            .field("basis", &self.inner.basis)
            .field("effective_timestamp", &self.inner.effective_timestamp)
            .finish()
    }
}

impl DatabaseIdentity for EffectiveTimestampViewImpl {
    fn name(&self) -> &DatabaseName {
        self.inner.basis.name()
    }

    fn created_at(&self) -> DateTime<Utc> {
        self.inner.basis.created_at()
    }
}

impl DatabaseAtRevision for EffectiveTimestampViewImpl {
    type EffectiveTimestampView = Self;
    type Speculative = SpeculativeDatabaseImpl;

    fn revision(&self) -> Revision {
        self.inner.basis.revision()
    }

    fn revision_timestamp(&self) -> DateTime<Utc> {
        self.inner.basis.revision_timestamp()
    }

    fn at_effective_timestamp(
        &self,
        effective_timestamp: DateTime<Utc>,
    ) -> Self::EffectiveTimestampView {
        Self::new(self.inner.basis.clone(), effective_timestamp)
    }

    fn speculate_with_transaction(
        &self,
        transaction: NonEmpty<ProspectiveEvent>,
    ) -> Self::Speculative {
        // Note: This creates a speculative view based on the basis, not this effective timestamp view
        SpeculativeDatabaseImpl::new(self.inner.basis.clone(), transaction)
    }

    fn at_revision(&self, revision: Revision) -> impl std::future::Future<Output = Self> {
        let effective_timestamp = self.inner.effective_timestamp;
        let database_name = self.inner.basis.name().to_string();
        let mut client = self.inner.basis.client();
        let created_at = self.inner.basis.created_at();
        let name = self.inner.basis.name().clone();

        async move {
            // Fetch the database at the requested revision
            let proto_db = client.await_database(database_name, revision).await;

            match proto_db {
                Ok(db) => {
                    let revision_timestamp = db
                        .revision_timestamp
                        .and_then(|ts| crate::conversions::timestamp_to_datetime(ts).ok())
                        .unwrap_or_else(chrono::Utc::now);

                    let new_basis = DatabaseAtRevisionImpl::at_revision_with_metadata(
                        client,
                        name,
                        created_at,
                        db.revision,
                        revision_timestamp,
                    );

                    Self::new(new_basis, effective_timestamp)
                }
                Err(_) => {
                    // On error, return self with updated revision (best effort)
                    let new_basis = DatabaseAtRevisionImpl::at_revision_with_metadata(
                        client,
                        name,
                        created_at,
                        revision,
                        chrono::Utc::now(),
                    );

                    Self::new(new_basis, effective_timestamp)
                }
            }
        }
    }

    fn query_events(&self, selector: &EventSelector) -> impl Stream<Item = Event> {
        let database_name = self.inner.basis.name().to_string();
        let revision = self.inner.basis.revision();
        let mut client = self.inner.basis.client();
        let proto_selector: proto::EventSelector = selector.clone().into();
        let effective_ts = self.inner.effective_timestamp;

        // Create a query with effective time range
        let effective_time_range = proto::query_range::EffectiveTimeRange {
            start_at: None,
            end_at: Some(datetime_to_timestamp(effective_ts)),
        };

        let query = proto::DatabaseQuery {
            selector: Some(proto_selector),
            range: Some(proto::QueryRange {
                range: Some(proto::query_range::Range::EffectiveTime(
                    effective_time_range,
                )),
            }),
            direction: proto::QueryDirection::Forward as i32,
            limit: None,
        };

        stream::once(async move {
            let result = client
                .query_events(database_name, revision, true, query)
                .await;

            match result {
                Ok(response_stream) => response_stream
                    .filter_map(|result| async move {
                        match result {
                            Ok(reply) => {
                                if let Some(proto::event_query_reply::Event::Detail(ce)) =
                                    reply.event
                                {
                                    Event::try_from(ce).ok()
                                } else {
                                    None
                                }
                            }
                            Err(_) => None,
                        }
                    })
                    .boxed(),
                Err(_) => stream::empty().boxed(),
            }
        })
        .flatten()
    }

    fn query_events_with_options(
        &self,
        selector: &EventSelector,
        options: QueryOptions,
    ) -> impl Stream<Item = Event> {
        use evidentsource_core::domain::QueryDirection;

        let database_name = self.inner.basis.name().to_string();
        let revision = self.inner.basis.revision();
        let mut client = self.inner.basis.client();
        let proto_selector: proto::EventSelector = selector.clone().into();
        let effective_ts = self.inner.effective_timestamp;

        let direction = match options.get_direction() {
            QueryDirection::Forward => proto::QueryDirection::Forward as i32,
            QueryDirection::Reverse => proto::QueryDirection::Reverse as i32,
        };

        // Create a query with effective time range
        let effective_time_range = proto::query_range::EffectiveTimeRange {
            start_at: None,
            end_at: Some(datetime_to_timestamp(effective_ts)),
        };

        let query = proto::DatabaseQuery {
            selector: Some(proto_selector),
            range: Some(proto::QueryRange {
                range: Some(proto::query_range::Range::EffectiveTime(
                    effective_time_range,
                )),
            }),
            direction,
            limit: options.get_limit().map(|l| l as u32),
        };

        stream::once(async move {
            let result = client
                .query_events(database_name, revision, true, query)
                .await;

            match result {
                Ok(response_stream) => response_stream
                    .filter_map(|result| async move {
                        match result {
                            Ok(reply) => {
                                if let Some(proto::event_query_reply::Event::Detail(ce)) =
                                    reply.event
                                {
                                    Event::try_from(ce).ok()
                                } else {
                                    None
                                }
                            }
                            Err(_) => None,
                        }
                    })
                    .boxed(),
                Err(_) => stream::empty().boxed(),
            }
        })
        .flatten()
    }

    fn view_state(
        &self,
        name: &StateViewName,
        version: StateViewVersion,
    ) -> impl std::future::Future<Output = Result<StateView, StateViewError>> {
        let state_view_name = name.to_string();
        let identity = proto::StateViewIdentity {
            database_name: self.inner.basis.name().to_string(),
            state_view_name: state_view_name.clone(),
            state_view_version: version,
        };
        let revision = self.inner.basis.revision();
        let effective_ts = datetime_to_timestamp(self.inner.effective_timestamp);
        let mut client = self.inner.basis.client();

        async move {
            let proto_view = client
                .fetch_state_view_at_revision(Some(identity), revision, None, Some(effective_ts))
                .await
                .map_err(|e| match e {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_state_view_error(
                            status,
                            &state_view_name,
                            version,
                        )
                    }
                    _ => StateViewError::ServerError(e.to_string()),
                })?;

            StateView::try_from(proto_view).map_err(|e| {
                StateViewError::ServerError(format!("failed to parse state view: {}", e))
            })
        }
    }

    fn view_state_with_params(
        &self,
        name: &StateViewName,
        version: StateViewVersion,
        params: &[(String, EventAttribute)],
    ) -> impl std::future::Future<Output = Result<StateView, StateViewError>> {
        let state_view_name = name.to_string();
        let identity = proto::StateViewIdentity {
            database_name: self.inner.basis.name().to_string(),
            state_view_name: state_view_name.clone(),
            state_view_version: version,
        };
        let revision = self.inner.basis.revision();
        let effective_ts = datetime_to_timestamp(self.inner.effective_timestamp);
        let mut client = self.inner.basis.client();

        // Convert params to proto ParameterBindings
        let param_bindings = if params.is_empty() {
            None
        } else {
            Some(proto::ParameterBindings {
                bindings: params
                    .iter()
                    .map(|(k, v)| (k.clone(), v.clone().into()))
                    .collect(),
            })
        };

        async move {
            let proto_view = client
                .fetch_state_view_at_revision(
                    Some(identity),
                    revision,
                    param_bindings,
                    Some(effective_ts),
                )
                .await
                .map_err(|e| match e {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_state_view_error(
                            status,
                            &state_view_name,
                            version,
                        )
                    }
                    _ => StateViewError::ServerError(e.to_string()),
                })?;

            StateView::try_from(proto_view).map_err(|e| {
                StateViewError::ServerError(format!("failed to parse state view: {}", e))
            })
        }
    }
}

impl DatabaseAtRevisionAndEffectiveTimestamp for EffectiveTimestampViewImpl {
    type Basis = DatabaseAtRevisionImpl;

    fn basis(&self) -> &Self::Basis {
        &self.inner.basis
    }

    fn effective_timestamp(&self) -> DateTime<Utc> {
        self.inner.effective_timestamp
    }

    fn at_revision_with_effective_timestamp(
        &self,
        revision: Revision,
    ) -> impl std::future::Future<Output = Result<Self, DatabaseError>> {
        let effective_timestamp = self.inner.effective_timestamp;
        let database_name = self.inner.basis.name().to_string();
        let mut client = self.inner.basis.client();
        let created_at = self.inner.basis.created_at();
        let name = self.inner.basis.name().clone();

        async move {
            // Fetch the database at the requested revision
            let proto_db = client
                .await_database(database_name.clone(), revision)
                .await
                .map_err(|e| match e {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_database_error(status, &database_name)
                    }
                    _ => DatabaseError::ServerError(e.to_string()),
                })?;

            // Create a new basis at the fetched revision
            let new_basis = DatabaseAtRevisionImpl::at_revision_with_metadata(
                client,
                name,
                created_at,
                proto_db.revision,
                crate::conversions::timestamp_to_datetime(proto_db.revision_timestamp.ok_or_else(
                    || DatabaseError::ServerError("missing revision_timestamp".to_string()),
                )?)
                .map_err(|e| {
                    DatabaseError::ServerError(format!("invalid revision_timestamp: {}", e))
                })?,
            );

            Ok(Self::new(new_basis, effective_timestamp))
        }
    }
}