use std::sync::Arc;
use chrono::{DateTime, Utc};
use futures::stream::{self, StreamExt};
use futures::Stream;
use nonempty::NonEmpty;
use evidentsource_core::domain::{
DatabaseError, DatabaseName, Event, EventAttribute, EventSelector, ProspectiveEvent,
QueryOptions, Revision, StateView, StateViewError, StateViewName, StateViewVersion,
};
use evidentsource_core::{
DatabaseAtRevision, DatabaseAtRevisionAndEffectiveTimestamp, DatabaseIdentity,
};
use crate::com::evidentsource as proto;
use crate::conversions::datetime_to_timestamp;
use super::at_revision::DatabaseAtRevisionImpl;
use super::speculative::SpeculativeDatabaseImpl;
/// Shared state behind [`EffectiveTimestampViewImpl`]: the revision-pinned basis it
/// wraps together with the effective timestamp it carries.
struct EffectiveTimestampViewInner {
// Revision-pinned database that identity, revision and client calls are delegated to.
basis: DatabaseAtRevisionImpl,
// Effective timestamp sent to the server with event queries and state-view fetches.
effective_timestamp: DateTime<Utc>,
}
/// A database basis paired with an effective timestamp.
///
/// The state lives behind an [`Arc`], so the derived `Clone` only clones that pointer.
#[derive(Clone)]
pub struct EffectiveTimestampViewImpl {
// Shared basis + effective timestamp.
inner: Arc<EffectiveTimestampViewInner>,
}
impl EffectiveTimestampViewImpl {
    /// Creates a view that pairs `basis` with `effective_timestamp`.
    ///
    /// Both values are stored behind a freshly allocated [`Arc`].
    pub fn new(basis: DatabaseAtRevisionImpl, effective_timestamp: DateTime<Utc>) -> Self {
        let state = EffectiveTimestampViewInner {
            basis,
            effective_timestamp,
        };
        Self {
            inner: Arc::new(state),
        }
    }
}
impl std::fmt::Debug for EffectiveTimestampViewImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EffectiveTimestampViewImpl")
.field("basis", &self.inner.basis)
.field("effective_timestamp", &self.inner.effective_timestamp)
.finish()
}
}
impl DatabaseIdentity for EffectiveTimestampViewImpl {
    /// Database name, as reported by the basis.
    fn name(&self) -> &DatabaseName {
        let state = &*self.inner;
        state.basis.name()
    }

    /// Database creation time, as reported by the basis.
    fn created_at(&self) -> DateTime<Utc> {
        let state = &*self.inner;
        state.basis.created_at()
    }
}
impl DatabaseAtRevision for EffectiveTimestampViewImpl {
    type EffectiveTimestampView = Self;
    type Speculative = SpeculativeDatabaseImpl;

    /// Revision of the underlying basis.
    fn revision(&self) -> Revision {
        self.inner.basis.revision()
    }

    /// Revision timestamp of the underlying basis.
    fn revision_timestamp(&self) -> DateTime<Utc> {
        self.inner.basis.revision_timestamp()
    }

    /// Returns a view over the same basis with `effective_timestamp` replacing the
    /// one currently carried by this view.
    fn at_effective_timestamp(
        &self,
        effective_timestamp: DateTime<Utc>,
    ) -> Self::EffectiveTimestampView {
        Self::new(self.inner.basis.clone(), effective_timestamp)
    }

    /// Builds a speculative database from the basis and `transaction`.
    ///
    /// Only the basis is handed over; this view's effective timestamp is not passed
    /// to the speculative database.
    fn speculate_with_transaction(
        &self,
        transaction: NonEmpty<ProspectiveEvent>,
    ) -> Self::Speculative {
        SpeculativeDatabaseImpl::new(self.inner.basis.clone(), transaction)
    }

    /// Re-targets this view at `revision`, keeping the current effective timestamp.
    ///
    /// The return type cannot carry an error, so this method is best-effort:
    /// - if the server reply has no revision timestamp, or it cannot be converted,
    ///   the current time is substituted;
    /// - if the `await_database` call fails, the returned view is pinned to the
    ///   requested `revision` with the current time as its revision timestamp.
    ///
    /// `at_revision_with_effective_timestamp` performs the same lookup but reports
    /// these failures instead of papering over them.
    fn at_revision(&self, revision: Revision) -> impl std::future::Future<Output = Self> {
        let effective_timestamp = self.inner.effective_timestamp;
        let database_name = self.inner.basis.name().to_string();
        let mut client = self.inner.basis.client();
        let created_at = self.inner.basis.created_at();
        let name = self.inner.basis.name().clone();
        async move {
            let proto_db = client.await_database(database_name, revision).await;
            let new_basis = match proto_db {
                Ok(db) => {
                    // Missing or unconvertible timestamp: fall back to "now".
                    let revision_timestamp = db
                        .revision_timestamp
                        .and_then(|ts| crate::conversions::timestamp_to_datetime(ts).ok())
                        .unwrap_or_else(chrono::Utc::now);
                    DatabaseAtRevisionImpl::at_revision_with_metadata(
                        client,
                        name,
                        created_at,
                        db.revision,
                        revision_timestamp,
                    )
                }
                // The call failed: keep the requested revision and stamp it with "now".
                Err(_) => DatabaseAtRevisionImpl::at_revision_with_metadata(
                    client,
                    name,
                    created_at,
                    revision,
                    chrono::Utc::now(),
                ),
            };
            Self::new(new_basis, effective_timestamp)
        }
    }

    /// Streams the events matching `selector`, in forward direction and without a
    /// limit, using an effective-time range that ends at this view's effective
    /// timestamp.
    ///
    /// The stream item type has no error channel, so failures are dropped silently:
    /// a failed request yields an empty stream, and individual replies that are
    /// errors, are not `Detail` events, or fail conversion to [`Event`] are skipped.
    fn query_events(&self, selector: &EventSelector) -> impl Stream<Item = Event> {
        let database_name = self.inner.basis.name().to_string();
        let revision = self.inner.basis.revision();
        let mut client = self.inner.basis.client();
        let query =
            self.effective_time_query(selector, proto::QueryDirection::Forward as i32, None);
        stream::once(async move {
            let result = client
                .query_events(database_name, revision, true, query)
                .await;
            match result {
                Ok(response_stream) => response_stream
                    .filter_map(|item| async move {
                        match item {
                            Ok(reply) => match reply.event {
                                Some(proto::event_query_reply::Event::Detail(ce)) => {
                                    Event::try_from(ce).ok()
                                }
                                _ => None,
                            },
                            Err(_) => None,
                        }
                    })
                    .boxed(),
                Err(_) => stream::empty().boxed(),
            }
        })
        .flatten()
    }

    /// Like `query_events`, but takes the direction and the optional limit from
    /// `options`.
    ///
    /// The wire format carries the limit as a `u32`; a larger limit is clamped to
    /// `u32::MAX` rather than being wrapped by a truncating cast (which could turn a
    /// huge limit into a tiny one, or zero).
    ///
    /// Failures are dropped silently exactly as in `query_events`.
    fn query_events_with_options(
        &self,
        selector: &EventSelector,
        options: QueryOptions,
    ) -> impl Stream<Item = Event> {
        use evidentsource_core::domain::QueryDirection;
        let database_name = self.inner.basis.name().to_string();
        let revision = self.inner.basis.revision();
        let mut client = self.inner.basis.client();
        let direction = match options.get_direction() {
            QueryDirection::Forward => proto::QueryDirection::Forward as i32,
            QueryDirection::Reverse => proto::QueryDirection::Reverse as i32,
        };
        // Saturate instead of `as u32`, which would silently wrap out-of-range limits.
        let limit = options
            .get_limit()
            .map(|l| u32::try_from(l).unwrap_or(u32::MAX));
        let query = self.effective_time_query(selector, direction, limit);
        stream::once(async move {
            let result = client
                .query_events(database_name, revision, true, query)
                .await;
            match result {
                Ok(response_stream) => response_stream
                    .filter_map(|item| async move {
                        match item {
                            Ok(reply) => match reply.event {
                                Some(proto::event_query_reply::Event::Detail(ce)) => {
                                    Event::try_from(ce).ok()
                                }
                                _ => None,
                            },
                            Err(_) => None,
                        }
                    })
                    .boxed(),
                Err(_) => stream::empty().boxed(),
            }
        })
        .flatten()
    }

    /// Fetches state view `name` at `version` for the basis revision and this view's
    /// effective timestamp, without parameter bindings.
    ///
    /// # Errors
    ///
    /// A gRPC status is translated by `status_mapping::to_state_view_error`; any other
    /// client error, and a reply that cannot be converted into a [`StateView`], is
    /// reported as [`StateViewError::ServerError`].
    fn view_state(
        &self,
        name: &StateViewName,
        version: StateViewVersion,
    ) -> impl std::future::Future<Output = Result<StateView, StateViewError>> {
        let state_view_name = name.to_string();
        let identity = proto::StateViewIdentity {
            database_name: self.inner.basis.name().to_string(),
            state_view_name: state_view_name.clone(),
            state_view_version: version,
        };
        let revision = self.inner.basis.revision();
        let effective_ts = datetime_to_timestamp(self.inner.effective_timestamp);
        let mut client = self.inner.basis.client();
        async move {
            let proto_view = client
                .fetch_state_view_at_revision(Some(identity), revision, None, Some(effective_ts))
                .await
                .map_err(|e| match e {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_state_view_error(
                            status,
                            &state_view_name,
                            version,
                        )
                    }
                    _ => StateViewError::ServerError(e.to_string()),
                })?;
            StateView::try_from(proto_view).map_err(|e| {
                StateViewError::ServerError(format!("failed to parse state view: {}", e))
            })
        }
    }

    /// Same as `view_state`, additionally sending `params` as parameter bindings.
    ///
    /// An empty `params` slice sends no bindings at all (`None`), which makes the
    /// request identical to the one issued by `view_state`.
    ///
    /// # Errors
    ///
    /// Same mapping as `view_state`.
    fn view_state_with_params(
        &self,
        name: &StateViewName,
        version: StateViewVersion,
        params: &[(String, EventAttribute)],
    ) -> impl std::future::Future<Output = Result<StateView, StateViewError>> {
        let state_view_name = name.to_string();
        let identity = proto::StateViewIdentity {
            database_name: self.inner.basis.name().to_string(),
            state_view_name: state_view_name.clone(),
            state_view_version: version,
        };
        let revision = self.inner.basis.revision();
        let effective_ts = datetime_to_timestamp(self.inner.effective_timestamp);
        let mut client = self.inner.basis.client();
        let param_bindings = if params.is_empty() {
            None
        } else {
            Some(proto::ParameterBindings {
                bindings: params
                    .iter()
                    .map(|(k, v)| (k.clone(), v.clone().into()))
                    .collect(),
            })
        };
        async move {
            let proto_view = client
                .fetch_state_view_at_revision(
                    Some(identity),
                    revision,
                    param_bindings,
                    Some(effective_ts),
                )
                .await
                .map_err(|e| match e {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_state_view_error(
                            status,
                            &state_view_name,
                            version,
                        )
                    }
                    _ => StateViewError::ServerError(e.to_string()),
                })?;
            StateView::try_from(proto_view).map_err(|e| {
                StateViewError::ServerError(format!("failed to parse state view: {}", e))
            })
        }
    }
}

impl EffectiveTimestampViewImpl {
    /// Builds the `DatabaseQuery` shared by the event-query methods: `selector`
    /// converted to its proto form, plus an effective-time range with no start and an
    /// end at this view's effective timestamp, with the given `direction` and `limit`.
    fn effective_time_query(
        &self,
        selector: &EventSelector,
        direction: i32,
        limit: Option<u32>,
    ) -> proto::DatabaseQuery {
        let proto_selector: proto::EventSelector = selector.clone().into();
        let effective_time_range = proto::query_range::EffectiveTimeRange {
            start_at: None,
            end_at: Some(datetime_to_timestamp(self.inner.effective_timestamp)),
        };
        proto::DatabaseQuery {
            selector: Some(proto_selector),
            range: Some(proto::QueryRange {
                range: Some(proto::query_range::Range::EffectiveTime(
                    effective_time_range,
                )),
            }),
            direction,
            limit,
        }
    }
}
impl DatabaseAtRevisionAndEffectiveTimestamp for EffectiveTimestampViewImpl {
    type Basis = DatabaseAtRevisionImpl;

    /// The revision-pinned database this view wraps.
    fn basis(&self) -> &Self::Basis {
        let state = &*self.inner;
        &state.basis
    }

    /// The effective timestamp carried by this view.
    fn effective_timestamp(&self) -> DateTime<Utc> {
        let state = &*self.inner;
        state.effective_timestamp
    }

    /// Re-targets this view at `revision`, keeping the current effective timestamp.
    ///
    /// # Errors
    ///
    /// - A gRPC status from `await_database` is translated by
    ///   `status_mapping::to_database_error`; any other client error becomes
    ///   [`DatabaseError::ServerError`].
    /// - A reply without a revision timestamp, or with one that cannot be converted,
    ///   is reported as [`DatabaseError::ServerError`].
    fn at_revision_with_effective_timestamp(
        &self,
        revision: Revision,
    ) -> impl std::future::Future<Output = Result<Self, DatabaseError>> {
        let state = &*self.inner;
        let effective_timestamp = state.effective_timestamp;
        let database_name = state.basis.name().to_string();
        let mut client = state.basis.client();
        let created_at = state.basis.created_at();
        let name = state.basis.name().clone();
        async move {
            // Step 1: wait for the database at the requested revision.
            let awaited = client
                .await_database(database_name.clone(), revision)
                .await;
            let proto_db = match awaited {
                Ok(db) => db,
                Err(e) => {
                    let mapped = match e {
                        crate::Error::GrpcStatus(ref status) => {
                            crate::status_mapping::to_database_error(status, &database_name)
                        }
                        _ => DatabaseError::ServerError(e.to_string()),
                    };
                    return Err(mapped);
                }
            };

            // Step 2: the reply must carry a convertible revision timestamp.
            let Some(raw_timestamp) = proto_db.revision_timestamp else {
                return Err(DatabaseError::ServerError(
                    "missing revision_timestamp".to_string(),
                ));
            };
            let revision_timestamp =
                match crate::conversions::timestamp_to_datetime(raw_timestamp) {
                    Ok(converted) => converted,
                    Err(e) => {
                        return Err(DatabaseError::ServerError(format!(
                            "invalid revision_timestamp: {}",
                            e
                        )));
                    }
                };

            // Step 3: rebuild the basis from the reply and wrap it again.
            let new_basis = DatabaseAtRevisionImpl::at_revision_with_metadata(
                client,
                name,
                created_at,
                proto_db.revision,
                revision_timestamp,
            );
            Ok(Self::new(new_basis, effective_timestamp))
        }
    }
}