use std::sync::Arc;
use chrono::{DateTime, Utc};
use futures::stream;
use futures::Stream;
use nonempty::NonEmpty;
use evidentsource_core::domain::{
DatabaseName, Event, EventAttribute, EventSelector, ProspectiveEvent, QueryDirection,
QueryOptions, Revision, StateView, StateViewError, StateViewName, StateViewVersion,
};
use evidentsource_core::{DatabaseAtRevision, DatabaseIdentity, SpeculativeDatabase};
use super::at_revision::DatabaseAtRevisionImpl;
use super::effective_timestamp::EffectiveTimestampViewImpl;
struct SpeculativeDatabaseInner {
basis: DatabaseAtRevisionImpl,
speculated_transactions: NonEmpty<NonEmpty<ProspectiveEvent>>,
}
#[derive(Clone)]
pub struct SpeculativeDatabaseImpl {
inner: Arc<SpeculativeDatabaseInner>,
}
impl SpeculativeDatabaseImpl {
pub fn new(basis: DatabaseAtRevisionImpl, transaction: NonEmpty<ProspectiveEvent>) -> Self {
Self {
inner: Arc::new(SpeculativeDatabaseInner {
basis,
speculated_transactions: NonEmpty::singleton(transaction),
}),
}
}
fn with_additional_transaction(self, transaction: NonEmpty<ProspectiveEvent>) -> Self {
let mut transactions = self.inner.speculated_transactions.clone();
transactions.push(transaction);
Self {
inner: Arc::new(SpeculativeDatabaseInner {
basis: self.inner.basis.clone(),
speculated_transactions: transactions,
}),
}
}
fn speculated_events(&self) -> impl Iterator<Item = &ProspectiveEvent> {
self.inner
.speculated_transactions
.iter()
.flat_map(|b| b.iter())
}
}
impl std::fmt::Debug for SpeculativeDatabaseImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SpeculativeDatabaseImpl")
.field("basis", &self.inner.basis)
.field(
"speculated_transaction_count",
&self.inner.speculated_transactions.len(),
)
.finish()
}
}
impl DatabaseIdentity for SpeculativeDatabaseImpl {
fn name(&self) -> &DatabaseName {
self.inner.basis.name()
}
fn created_at(&self) -> DateTime<Utc> {
self.inner.basis.created_at()
}
}
impl DatabaseAtRevision for SpeculativeDatabaseImpl {
type EffectiveTimestampView = EffectiveTimestampViewImpl;
type Speculative = Self;
fn revision(&self) -> Revision {
let total_events: u64 = self
.inner
.speculated_transactions
.iter()
.map(|transaction| transaction.len() as u64)
.sum();
self.inner.basis.revision() + total_events
}
fn revision_timestamp(&self) -> DateTime<Utc> {
Utc::now()
}
fn at_effective_timestamp(
&self,
effective_timestamp: DateTime<Utc>,
) -> Self::EffectiveTimestampView {
EffectiveTimestampViewImpl::new(self.inner.basis.clone(), effective_timestamp)
}
fn speculate_with_transaction(
&self,
transaction: NonEmpty<ProspectiveEvent>,
) -> Self::Speculative {
self.clone().with_additional_transaction(transaction)
}
fn at_revision(&self, _revision: Revision) -> impl std::future::Future<Output = Self> {
let this = self.clone();
async move { this }
}
fn query_events(&self, selector: &EventSelector) -> impl Stream<Item = Event> {
let speculated: Vec<Event> = self
.speculated_events()
.filter(|pe| selector.matches_prospective(pe))
.map(|pe| {
Event {
id: pe.id.clone(),
source: format!("speculative://{}", pe.stream),
event_type: pe.event_type.clone(),
subject: pe.subject.clone(),
data: pe.data.clone(),
time: pe.time,
datacontenttype: pe.datacontenttype.clone(),
dataschema: pe.dataschema.clone(),
extensions: pe.extensions.clone(),
}
})
.collect();
stream::iter(speculated)
}
fn query_events_with_options(
&self,
selector: &EventSelector,
options: QueryOptions,
) -> impl Stream<Item = Event> {
let mut speculated: Vec<Event> = self
.speculated_events()
.filter(|pe| selector.matches_prospective(pe))
.map(|pe| Event {
id: pe.id.clone(),
source: format!("speculative://{}", pe.stream),
event_type: pe.event_type.clone(),
subject: pe.subject.clone(),
data: pe.data.clone(),
time: pe.time,
datacontenttype: pe.datacontenttype.clone(),
dataschema: pe.dataschema.clone(),
extensions: pe.extensions.clone(),
})
.collect();
if options.get_direction() == QueryDirection::Reverse {
speculated.reverse();
}
if let Some(limit) = options.get_limit() {
speculated.truncate(limit as usize);
}
stream::iter(speculated)
}
async fn view_state(
&self,
_name: &StateViewName,
_version: StateViewVersion,
) -> Result<StateView, StateViewError> {
Err(StateViewError::EvolveError(
"Cannot compute state views speculatively without local WASM runtime".to_string(),
))
}
async fn view_state_with_params(
&self,
_name: &StateViewName,
_version: StateViewVersion,
_params: &[(String, EventAttribute)],
) -> Result<StateView, StateViewError> {
Err(StateViewError::EvolveError(
"Cannot compute state views speculatively without local WASM runtime".to_string(),
))
}
}
impl SpeculativeDatabase for SpeculativeDatabaseImpl {
type Basis = DatabaseAtRevisionImpl;
fn basis(&self) -> &Self::Basis {
&self.inner.basis
}
fn speculated_transactions(&self) -> &NonEmpty<NonEmpty<ProspectiveEvent>> {
&self.inner.speculated_transactions
}
}