use std::sync::Arc;
use chrono::{DateTime, Utc};
use futures::stream::{self, StreamExt};
use futures::Stream;
use nonempty::NonEmpty;
use evidentsource_core::domain::{
DatabaseName, Event, EventAttribute, EventSelector, ProspectiveEvent, QueryOptions, Revision,
StateView, StateViewError, StateViewName, StateViewVersion,
};
use evidentsource_core::{DatabaseAtRevision, DatabaseIdentity};
use crate::com::evidentsource as proto;
use crate::conversions::ConversionError;
use crate::EvidentSourceClient;
use super::effective_timestamp::EffectiveTimestampViewImpl;
use super::speculative::SpeculativeDatabaseImpl;
/// Shared state behind a [`DatabaseAtRevisionImpl`] handle.
///
/// Held inside an `Arc` by the handle, so cloning the handle does not copy these fields.
struct DatabaseAtRevisionInner {
    /// Client used for the remote calls issued through the handle.
    client: EvidentSourceClient,
    /// Name of the database.
    name: DatabaseName,
    /// Creation time of the database.
    created_at: DateTime<Utc>,
    /// Revision the handle is pinned to.
    revision: Revision,
    /// Timestamp recorded for `revision`.
    revision_timestamp: DateTime<Utc>,
}
/// Handle to a database pinned at one revision.
///
/// All state lives behind a shared `Arc`, so `clone` only bumps a reference count.
#[derive(Clone)]
pub struct DatabaseAtRevisionImpl {
    inner: Arc<DatabaseAtRevisionInner>,
}
impl DatabaseAtRevisionImpl {
    /// Builds a handle from a protobuf `Database` message.
    ///
    /// # Errors
    ///
    /// Fails when [`DatabaseName::new`] rejects the message's name, when `created_at` or
    /// `revision_timestamp` is absent from the message, or when either timestamp cannot be
    /// converted. Checks run in that order (name, then `created_at`, then
    /// `revision_timestamp`).
    pub fn new(
        client: EvidentSourceClient,
        proto_db: proto::Database,
    ) -> Result<Self, ConversionError> {
        use crate::conversions::timestamp_to_datetime;

        let name = DatabaseName::new(&proto_db.name)?;

        let raw_created_at = proto_db
            .created_at
            .ok_or_else(|| ConversionError::missing_field("Database", "created_at"))?;
        let created_at = timestamp_to_datetime(raw_created_at)?;

        let raw_revision_timestamp = proto_db
            .revision_timestamp
            .ok_or_else(|| ConversionError::missing_field("Database", "revision_timestamp"))?;
        let revision_timestamp = timestamp_to_datetime(raw_revision_timestamp)?;

        Ok(Self::at_revision_with_metadata(
            client,
            name,
            created_at,
            proto_db.revision,
            revision_timestamp,
        ))
    }

    /// Assembles a handle from already-validated parts; performs no remote call.
    pub(crate) fn at_revision_with_metadata(
        client: EvidentSourceClient,
        name: DatabaseName,
        created_at: DateTime<Utc>,
        revision: Revision,
        revision_timestamp: DateTime<Utc>,
    ) -> Self {
        let inner = DatabaseAtRevisionInner {
            client,
            name,
            created_at,
            revision,
            revision_timestamp,
        };
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Returns a clone of the client this handle talks through.
    pub(crate) fn client(&self) -> EvidentSourceClient {
        let shared = &self.inner.client;
        shared.clone()
    }
}
impl std::fmt::Debug for DatabaseAtRevisionImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DatabaseAtRevisionImpl")
.field("name", &self.inner.name)
.field("revision", &self.inner.revision)
.finish()
}
}
impl DatabaseIdentity for DatabaseAtRevisionImpl {
fn name(&self) -> &DatabaseName {
&self.inner.name
}
fn created_at(&self) -> DateTime<Utc> {
self.inner.created_at
}
}
impl DatabaseAtRevision for DatabaseAtRevisionImpl {
type EffectiveTimestampView = EffectiveTimestampViewImpl;
type Speculative = SpeculativeDatabaseImpl;
fn revision(&self) -> Revision {
self.inner.revision
}
fn revision_timestamp(&self) -> DateTime<Utc> {
self.inner.revision_timestamp
}
fn at_effective_timestamp(
&self,
effective_timestamp: DateTime<Utc>,
) -> Self::EffectiveTimestampView {
EffectiveTimestampViewImpl::new(self.clone(), effective_timestamp)
}
fn speculate_with_transaction(
&self,
transaction: NonEmpty<ProspectiveEvent>,
) -> Self::Speculative {
SpeculativeDatabaseImpl::new(self.clone(), transaction)
}
fn at_revision(&self, revision: Revision) -> impl std::future::Future<Output = Self> {
let mut client = self.inner.client.clone();
let database_name = self.inner.name.to_string();
let name = self.inner.name.clone();
let created_at = self.inner.created_at;
async move {
let proto_db = client.await_database(database_name, revision).await;
match proto_db {
Ok(db) => {
use crate::conversions::timestamp_to_datetime;
let revision_timestamp = db
.revision_timestamp
.and_then(|ts| timestamp_to_datetime(ts).ok())
.unwrap_or_else(chrono::Utc::now);
Self::at_revision_with_metadata(
client,
name,
created_at,
db.revision,
revision_timestamp,
)
}
Err(_) => {
Self::at_revision_with_metadata(
client,
name,
created_at,
revision,
chrono::Utc::now(),
)
}
}
}
}
fn query_events(&self, selector: &EventSelector) -> impl Stream<Item = Event> {
let database_name = self.inner.name.to_string();
let revision = self.inner.revision;
let mut client = self.inner.client.clone();
let proto_selector: proto::EventSelector = selector.clone().into();
let query = proto::DatabaseQuery {
selector: Some(proto_selector),
range: Some(proto::QueryRange {
range: Some(proto::query_range::Range::Revision(
proto::query_range::RevisionRange { start_at: Some(0) },
)),
}),
direction: proto::QueryDirection::Forward as i32,
limit: None,
};
stream::once(async move {
let result = client
.query_events(database_name, revision, true, query)
.await;
match result {
Ok(response_stream) => response_stream
.filter_map(|result| async move {
match result {
Ok(reply) => {
if let Some(proto::event_query_reply::Event::Detail(ce)) =
reply.event
{
Event::try_from(ce).ok()
} else {
None
}
}
Err(_) => None,
}
})
.boxed(),
Err(_) => stream::empty().boxed(),
}
})
.flatten()
}
fn query_events_with_options(
&self,
selector: &EventSelector,
options: QueryOptions,
) -> impl Stream<Item = Event> {
use evidentsource_core::domain::QueryDirection;
let database_name = self.inner.name.to_string();
let revision = self.inner.revision;
let mut client = self.inner.client.clone();
let proto_selector: proto::EventSelector = selector.clone().into();
let direction = match options.get_direction() {
QueryDirection::Forward => proto::QueryDirection::Forward as i32,
QueryDirection::Reverse => proto::QueryDirection::Reverse as i32,
};
let query = proto::DatabaseQuery {
selector: Some(proto_selector),
range: Some(proto::QueryRange {
range: Some(proto::query_range::Range::Revision(
proto::query_range::RevisionRange { start_at: Some(0) },
)),
}),
direction,
limit: options.get_limit().map(|l| l as u32),
};
stream::once(async move {
let result = client
.query_events(database_name, revision, true, query)
.await;
match result {
Ok(response_stream) => response_stream
.filter_map(|result| async move {
match result {
Ok(reply) => {
if let Some(proto::event_query_reply::Event::Detail(ce)) =
reply.event
{
Event::try_from(ce).ok()
} else {
None
}
}
Err(_) => None,
}
})
.boxed(),
Err(_) => stream::empty().boxed(),
}
})
.flatten()
}
fn view_state(
&self,
name: &StateViewName,
version: StateViewVersion,
) -> impl std::future::Future<Output = Result<StateView, StateViewError>> {
let state_view_name = name.to_string();
let identity = proto::StateViewIdentity {
database_name: self.inner.name.to_string(),
state_view_name: state_view_name.clone(),
state_view_version: version,
};
let revision = self.inner.revision;
let mut client = self.inner.client.clone();
async move {
let proto_view = client
.fetch_state_view_at_revision(Some(identity), revision, None, None)
.await
.map_err(|e| match e {
crate::Error::GrpcStatus(ref status) => {
crate::status_mapping::to_state_view_error(
status,
&state_view_name,
version,
)
}
_ => StateViewError::ServerError(e.to_string()),
})?;
StateView::try_from(proto_view).map_err(|e| {
StateViewError::ServerError(format!("failed to parse state view: {}", e))
})
}
}
fn view_state_with_params(
&self,
name: &StateViewName,
version: StateViewVersion,
params: &[(String, EventAttribute)],
) -> impl std::future::Future<Output = Result<StateView, StateViewError>> {
let state_view_name = name.to_string();
let identity = proto::StateViewIdentity {
database_name: self.inner.name.to_string(),
state_view_name: state_view_name.clone(),
state_view_version: version,
};
let revision = self.inner.revision;
let mut client = self.inner.client.clone();
let param_bindings = if params.is_empty() {
None
} else {
Some(proto::ParameterBindings {
bindings: params
.iter()
.map(|(k, v)| (k.clone(), v.clone().into()))
.collect(),
})
};
async move {
let proto_view = client
.fetch_state_view_at_revision(Some(identity), revision, param_bindings, None)
.await
.map_err(|e| match e {
crate::Error::GrpcStatus(ref status) => {
crate::status_mapping::to_state_view_error(
status,
&state_view_name,
version,
)
}
_ => StateViewError::ServerError(e.to_string()),
})?;
StateView::try_from(proto_view).map_err(|e| {
StateViewError::ServerError(format!("failed to parse state view: {}", e))
})
}
}
}
impl DatabaseAtRevisionImpl {
    /// Starts an [`EventQueryBuilder`] for `selector` against a clone of this handle.
    pub fn events(&self, selector: EventSelector) -> EventQueryBuilder {
        let database = self.clone();
        EventQueryBuilder::new(database, selector)
    }

    /// Streams the stream-name index keys at this revision.
    ///
    /// Keys rejected by `StreamName::new` are skipped; request and reply errors are
    /// swallowed (a failed request yields an empty stream).
    pub fn list_streams(&self) -> impl Stream<Item = evidentsource_core::domain::StreamName> {
        use crate::com::evidentsource::index_key_scan_request::IndexKeyType;
        use evidentsource_core::domain::StreamName;

        self.index_keys(IndexKeyType::Stream)
            .filter_map(|key| async move { StreamName::new(&key).ok() })
    }

    /// Streams the subject index keys at this revision.
    ///
    /// Keys rejected by `EventSubject::new` are skipped; request and reply errors are
    /// swallowed (a failed request yields an empty stream).
    pub fn list_subjects(&self) -> impl Stream<Item = evidentsource_core::domain::EventSubject> {
        use crate::com::evidentsource::index_key_scan_request::IndexKeyType;
        use evidentsource_core::domain::EventSubject;

        self.index_keys(IndexKeyType::Subject)
            .filter_map(|key| async move { EventSubject::new(&key).ok() })
    }

    /// Streams the event-type index keys at this revision.
    ///
    /// Keys rejected by `EventType::new` are skipped; request and reply errors are
    /// swallowed (a failed request yields an empty stream).
    pub fn list_event_types(&self) -> impl Stream<Item = evidentsource_core::domain::EventType> {
        use crate::com::evidentsource::index_key_scan_request::IndexKeyType;
        use evidentsource_core::domain::EventType;

        self.index_keys(IndexKeyType::EventType)
            .filter_map(|key| async move { EventType::new(&key).ok() })
    }

    /// Scans the index of `key_type` at this revision and yields each raw index key.
    ///
    /// The scan request is issued when the stream is first polled. A failed request
    /// yields an empty stream and error replies are dropped.
    fn index_keys(
        &self,
        key_type: crate::com::evidentsource::index_key_scan_request::IndexKeyType,
    ) -> impl Stream<Item = String> {
        let mut client = self.inner.client.clone();
        let database_name = self.inner.name.to_string();
        let revision = self.inner.revision;

        stream::once(async move {
            match client
                .scan_index_keys(database_name, revision, key_type)
                .await
            {
                Ok(replies) => replies
                    .filter_map(|item| async move { item.ok().map(|reply| reply.index_key) })
                    .boxed(),
                Err(_) => stream::empty().boxed(),
            }
        })
        .flatten()
    }
}
/// Builder for an event query against a [`DatabaseAtRevisionImpl`].
pub struct EventQueryBuilder {
    /// Handle the query runs against.
    database: DatabaseAtRevisionImpl,
    /// Selector choosing which events to return.
    selector: EventSelector,
    /// Direction and limit accumulated by the builder methods.
    options: QueryOptions,
}
impl EventQueryBuilder {
    /// Creates a builder for `selector` on `database` with `QueryOptions::default()`.
    pub fn new(database: DatabaseAtRevisionImpl, selector: EventSelector) -> Self {
        let options = QueryOptions::default();
        Self {
            database,
            selector,
            options,
        }
    }

    /// Sets the query direction.
    pub fn direction(self, direction: evidentsource_core::domain::QueryDirection) -> Self {
        Self {
            options: self.options.direction(direction),
            ..self
        }
    }

    /// Applies `QueryOptions::reverse` to the accumulated options.
    pub fn reverse(self) -> Self {
        Self {
            options: self.options.reverse(),
            ..self
        }
    }

    /// Applies `QueryOptions::forward` to the accumulated options.
    pub fn forward(self) -> Self {
        Self {
            options: self.options.forward(),
            ..self
        }
    }

    /// Sets the maximum number of events to request.
    pub fn limit(self, limit: u64) -> Self {
        Self {
            options: self.options.limit(limit),
            ..self
        }
    }

    /// Runs the query and gathers every yielded event into a `Vec`.
    pub async fn collect(self) -> Vec<Event> {
        use futures::StreamExt as _;

        let Self {
            database,
            selector,
            options,
        } = self;
        let events = database.query_events_with_options(&selector, options);
        events.collect().await
    }

    /// Runs the query with the limit forced to 1 and returns the first yielded event.
    ///
    /// Any limit configured earlier on the builder is replaced.
    pub async fn first(self) -> Option<Event> {
        use futures::StreamExt as _;

        let Self {
            database,
            selector,
            options,
        } = self;
        let mut events = database
            .query_events_with_options(&selector, options.limit(1))
            .boxed();
        events.next().await
    }

    /// Switches to a query whose results are converted to `T` via `TryFrom<&Event>`.
    pub fn typed<T>(self) -> TypedEventQuery<T>
    where
        T: for<'a> TryFrom<&'a Event> + 'static,
    {
        TypedEventQuery::<T>::new(self)
    }
}
/// Event query whose results are converted into `T`.
pub struct TypedEventQuery<T> {
    /// Underlying untyped query.
    builder: EventQueryBuilder,
    /// Records the target type; no `T` value is stored.
    _phantom: std::marker::PhantomData<T>,
}
impl<T> TypedEventQuery<T>
where
    T: for<'a> TryFrom<&'a Event> + 'static,
{
    /// Wraps `builder`; only reachable through `EventQueryBuilder::typed`.
    fn new(builder: EventQueryBuilder) -> Self {
        let _phantom = std::marker::PhantomData;
        Self { builder, _phantom }
    }

    /// Runs the query and converts each event to `T`.
    ///
    /// Events whose conversion fails are left out of the result.
    pub async fn collect(self) -> Vec<T> {
        let events: Vec<Event> = self.builder.collect().await;
        let mut converted = Vec::new();
        for event in &events {
            if let Ok(value) = T::try_from(event) {
                converted.push(value);
            }
        }
        converted
    }

    /// Returns the first event of the query converted to `T`.
    ///
    /// Yields `None` both when the query produces no event and when the first event
    /// fails to convert; later events are not tried.
    pub async fn first(self) -> Option<T> {
        self.builder
            .first()
            .await
            .and_then(|event| T::try_from(&event).ok())
    }
}
/// Extension of [`DatabaseAtRevision`] that returns state views deserialized into a
/// caller-chosen type.
pub trait DatabaseAtRevisionTyped: DatabaseAtRevision {
    /// Views the state view `name`/`version` and returns it deserialized as `T`.
    fn view_state_as<T>(
        &self,
        name: &StateViewName,
        version: StateViewVersion,
    ) -> impl std::future::Future<Output = Result<T, StateViewError>>
    where
        T: serde::de::DeserializeOwned;

    /// Like [`Self::view_state_as`], but with an `Option` result so an implementation can
    /// report an absent view as `Ok(None)`.
    fn view_state_opt<T>(
        &self,
        name: &StateViewName,
        version: StateViewVersion,
    ) -> impl std::future::Future<Output = Result<Option<T>, StateViewError>>
    where
        T: serde::de::DeserializeOwned;
}
impl DatabaseAtRevisionTyped for DatabaseAtRevisionImpl {
    /// Fetches the state view and decodes its content bytes as JSON into `T`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `view_state`. A view without content, or content that
    /// does not deserialize into `T`, is reported as [`StateViewError::ServerError`].
    fn view_state_as<T>(
        &self,
        name: &StateViewName,
        version: StateViewVersion,
    ) -> impl std::future::Future<Output = Result<T, StateViewError>>
    where
        T: serde::de::DeserializeOwned,
    {
        let pending = self.view_state(name, version);
        async move {
            let state_view = pending.await?;
            let Some(bytes) = state_view.content_bytes() else {
                return Err(StateViewError::ServerError(
                    "state view has no content".to_string(),
                ));
            };
            serde_json::from_slice(bytes).map_err(|err| {
                StateViewError::ServerError(format!("failed to deserialize state view: {}", err))
            })
        }
    }

    /// Fetches the state view and decodes its content bytes as JSON into `T`, treating
    /// absence as `Ok(None)`.
    ///
    /// `Ok(None)` is returned when `view_state` fails with `StateViewError::NotFound` and
    /// when the view has no content.
    ///
    /// # Errors
    ///
    /// Any other `view_state` error is propagated. Content that does not deserialize into
    /// `T` is reported as [`StateViewError::ServerError`].
    fn view_state_opt<T>(
        &self,
        name: &StateViewName,
        version: StateViewVersion,
    ) -> impl std::future::Future<Output = Result<Option<T>, StateViewError>>
    where
        T: serde::de::DeserializeOwned,
    {
        let pending = self.view_state(name, version);
        async move {
            let state_view = match pending.await {
                Ok(view) => view,
                Err(StateViewError::NotFound { .. }) => return Ok(None),
                Err(other) => return Err(other),
            };
            let Some(bytes) = state_view.content_bytes() else {
                return Ok(None);
            };
            serde_json::from_slice(bytes).map(Some).map_err(|err| {
                StateViewError::ServerError(format!("failed to deserialize state view: {}", err))
            })
        }
    }
}