/// Generated gRPC/protobuf bindings for the `com.evidentsource` proto package.
// NOTE(review): the `large_enum_variant` allowance presumably exists because the generated
// enums are not hand-written and cannot be reshaped here — confirm.
#[allow(clippy::large_enum_variant)]
pub(crate) mod com {
pub mod evidentsource {
// Expands to the code generated at build time for the `com.evidentsource` package.
tonic::include_proto!("com.evidentsource");
}
}
/// Generated protobuf bindings for the `io.cloudevents.v1` proto package; the nested
/// modules mirror the package path.
pub(crate) mod io {
pub mod cloudevents {
pub mod v1 {
// Expands to the code generated at build time for the `io.cloudevents.v1` package.
tonic::include_proto!("io.cloudevents.v1");
}
}
}
mod auth;
pub(crate) mod connection;
pub(crate) mod conversions;
pub(crate) mod database;
pub(crate) mod evident_source;
pub(crate) mod status_mapping;
pub mod client;
pub mod grpc;
pub mod prelude;
pub use auth::{AuthInterceptor, Credentials, DevModeCredentials};
pub use client::{Connection, EvidentSource};
use futures::Stream;
use http::Uri;
use thiserror::Error;
use tonic::{
service::interceptor::InterceptedService,
transport::{Channel, ClientTlsConfig},
Request,
};
use com::evidentsource::{
evident_source_client::EvidentSourceClient as Client, AwaitDatabaseRequest, CatalogRequest,
CreateDatabaseRequest, DatabaseEffectiveAtTimestampRequest, DatabaseUpdatesSubscriptionRequest,
DeleteDatabaseRequest, EventByIdRequest, EventQueryRequest, EventsByRevisionsRequest,
ExecuteStateChangeRequest, FetchStateViewRequest, FetchTransactionByIdRequest,
IndexKeyScanRequest, LatestDatabaseRequest, ListStateChangesRequest,
ListStateViewDefinitionsRequest, LogScanRequest, TransactionRequest,
};
/// Errors returned by [`EvidentSourceClient`] operations.
#[derive(Error, Debug)]
pub enum Error {
/// The address string could not be parsed into a URI.
#[error("invalid URI: {0}")]
InvalidUri(#[from] http::uri::InvalidUri),
/// A tonic transport error (raised while configuring TLS or connecting the channel);
/// displayed transparently as the underlying error.
#[error(transparent)]
Transport(#[from] tonic::transport::Error),
/// A gRPC call completed with a non-OK status; displayed transparently as that status.
#[error(transparent)]
GrpcStatus(#[from] tonic::Status),
}
/// Thin wrapper around the generated gRPC client, exposing one async method per RPC.
#[derive(Clone, Debug)]
pub struct EvidentSourceClient {
// Generated client running over a tonic `Channel` whose requests pass through
// `AuthInterceptor`.
client: Client<InterceptedService<Channel, AuthInterceptor>>,
}
/// Shorthand for a `Result` whose error type is this module's [`Error`].
type ClientResult<T> = Result<T, Error>;
impl EvidentSourceClient {
/// Connects to `addr` using [`Credentials::None`].
///
/// Equivalent to calling [`Self::with_credentials`] with `Credentials::None`.
///
/// # Errors
///
/// Propagates any error from [`Self::with_credentials`].
pub async fn new(addr: &str) -> ClientResult<Self> {
    let no_credentials = Credentials::None;
    Self::with_credentials(addr, no_credentials).await
}
/// Connects to `addr` and wraps the channel with an [`AuthInterceptor`] built from
/// `credentials`.
///
/// # Errors
///
/// Returns an error when the channel for `addr` cannot be built or connected.
pub async fn with_credentials(addr: &str, credentials: Credentials) -> ClientResult<Self> {
    // Arguments are evaluated left to right, so the channel is established before the
    // interceptor is constructed.
    let client = Client::with_interceptor(
        Self::build_channel(addr).await?,
        AuthInterceptor::new(credentials),
    );
    Ok(Self { client })
}
/// Parses `addr` and opens a connected tonic [`Channel`] to it.
///
/// When the URI scheme is `https`, the channel is configured for TLS using the native
/// root certificates, with the TLS domain name set to the URI host.
///
/// # Errors
///
/// * [`Error::InvalidUri`] when `addr` is not a valid URI.
/// * [`Error::Transport`] when applying the TLS configuration or connecting fails.
async fn build_channel(addr: &str) -> ClientResult<Channel> {
    let uri = Uri::try_from(addr)?;
    let mut channel_builder = Channel::builder(uri.clone());
    if uri.scheme_str() == Some("https") {
        let mut tls_config = ClientTlsConfig::new().with_native_roots();
        // Set the domain only when the URI actually has a host. The address is caller
        // input, so a missing host must not panic here; it is left to tonic to handle
        // when the TLS configuration is applied.
        if let Some(host) = uri.host() {
            tls_config = tls_config.domain_name(host);
        }
        channel_builder = channel_builder.tls_config(tls_config)?;
    }
    Ok(channel_builder.connect().await?)
}
pub async fn create_database(
&mut self,
database_name: String,
) -> ClientResult<com::evidentsource::Database> {
let request = Request::new(CreateDatabaseRequest { database_name });
let response = self.client.create_database(request).await?;
Ok(response.into_inner().database.unwrap())
}
/// Submits a transaction without correlation or causation identifiers.
///
/// Delegates to [`Self::transact_with_options`], passing `None` for both
/// `correlation_id` and `causation_id`.
///
/// # Errors
///
/// Propagates any error from [`Self::transact_with_options`].
pub async fn transact(
    &mut self,
    transaction_id: String,
    database_name: String,
    events: Vec<io::cloudevents::v1::CloudEvent>,
    conditions: Vec<com::evidentsource::AppendCondition>,
) -> ClientResult<com::evidentsource::TransactionResult> {
    let (correlation_id, causation_id) = (None, None);
    let outcome = self.transact_with_options(
        transaction_id,
        database_name,
        events,
        conditions,
        correlation_id,
        causation_id,
    );
    outcome.await
}
pub async fn transact_with_options(
&mut self,
transaction_id: String,
database_name: String,
events: Vec<io::cloudevents::v1::CloudEvent>,
conditions: Vec<com::evidentsource::AppendCondition>,
correlation_id: Option<String>,
causation_id: Option<String>,
) -> ClientResult<com::evidentsource::TransactionResult> {
let request = Request::new(TransactionRequest {
transaction_id,
database_name,
events,
conditions,
last_read_revision: None,
principal_attributes: Default::default(),
commit_message: None,
correlation_id,
causation_id,
});
let response = self.client.transact(request).await?;
Ok(response.into_inner().result.unwrap())
}
pub async fn delete_database(
&mut self,
database_name: String,
) -> ClientResult<com::evidentsource::Database> {
let request = Request::new(DeleteDatabaseRequest { database_name });
let response = self.client.delete_database(request).await?;
Ok(response.into_inner().database.unwrap())
}
/// Issues an empty `CatalogRequest` and returns the server's stream of
/// `CatalogReply` messages.
///
/// # Errors
///
/// Returns [`Error::GrpcStatus`] when the call itself fails; failures after the stream
/// is open are yielded as `tonic::Status` stream items.
pub async fn fetch_catalog(
    &mut self,
) -> ClientResult<impl Stream<Item = Result<com::evidentsource::CatalogReply, tonic::Status>>>
{
    let catalog_stream = self
        .client
        .fetch_catalog(Request::new(CatalogRequest {}))
        .await?
        .into_inner();
    Ok(catalog_stream)
}
pub async fn fetch_latest_database(
&mut self,
database_name: String,
) -> ClientResult<com::evidentsource::Database> {
let request = Request::new(LatestDatabaseRequest { database_name });
let response = self.client.fetch_latest_database(request).await?;
Ok(response.into_inner().database.unwrap())
}
pub async fn await_database(
&mut self,
database_name: String,
at_revision: u64,
) -> ClientResult<com::evidentsource::Database> {
let request = Request::new(AwaitDatabaseRequest {
database_name,
at_revision,
});
let response = self.client.await_database(request).await?;
Ok(response.into_inner().database.unwrap())
}
pub async fn database_effective_at_timestamp(
&mut self,
database_name: String,
at_timestamp: prost_types::Timestamp,
) -> ClientResult<com::evidentsource::Database> {
let request = Request::new(DatabaseEffectiveAtTimestampRequest {
database_name,
at_timestamp: Some(at_timestamp),
});
let response = self.client.database_effective_at_timestamp(request).await?;
Ok(response.into_inner().database.unwrap())
}
/// Sends a `DatabaseUpdatesSubscriptionRequest` for `database_name` and returns the
/// server's stream of `DatabaseReply` messages.
///
/// # Errors
///
/// Returns [`Error::GrpcStatus`] when the call itself fails; failures after the stream
/// is open are yielded as `tonic::Status` stream items.
pub async fn subscribe_database_updates(
    &mut self,
    database_name: String,
) -> ClientResult<impl Stream<Item = Result<com::evidentsource::DatabaseReply, tonic::Status>>>
{
    let subscription = DatabaseUpdatesSubscriptionRequest { database_name };
    let updates = self
        .client
        .subscribe_database_updates(Request::new(subscription))
        .await?
        .into_inner();
    Ok(updates)
}
/// Sends a `LogScanRequest` built from the arguments and returns the server's stream
/// of `DatabaseLogReply` messages.
///
/// # Errors
///
/// Returns [`Error::GrpcStatus`] when the call itself fails; failures after the stream
/// is open are yielded as `tonic::Status` stream items.
pub async fn scan_database_log(
    &mut self,
    database_name: String,
    start_at_revision: u64,
    include_event_detail: bool,
) -> ClientResult<impl Stream<Item = Result<com::evidentsource::DatabaseLogReply, tonic::Status>>>
{
    let scan = LogScanRequest {
        database_name,
        start_at_revision,
        include_event_detail,
    };
    let log_entries = self
        .client
        .scan_database_log(Request::new(scan))
        .await?
        .into_inner();
    Ok(log_entries)
}
/// Sends an `IndexKeyScanRequest` for `database_name` at `revision` and returns the
/// server's stream of `IndexKeyScanReply` messages.
///
/// `index_key_type` is converted with `Into` to the representation the request field
/// expects.
///
/// # Errors
///
/// Returns [`Error::GrpcStatus`] when the call itself fails; failures after the stream
/// is open are yielded as `tonic::Status` stream items.
pub async fn scan_index_keys(
    &mut self,
    database_name: String,
    revision: u64,
    index_key_type: com::evidentsource::index_key_scan_request::IndexKeyType,
) -> ClientResult<
    impl Stream<Item = Result<com::evidentsource::IndexKeyScanReply, tonic::Status>>,
> {
    let scan = IndexKeyScanRequest {
        database_name,
        revision,
        index_key_type: index_key_type.into(),
    };
    let keys = self
        .client
        .scan_index_keys(Request::new(scan))
        .await?
        .into_inner();
    Ok(keys)
}
/// Sends an `EventQueryRequest` carrying `query` for `database_name` at `revision` and
/// returns the server's stream of `EventQueryReply` messages.
///
/// # Errors
///
/// Returns [`Error::GrpcStatus`] when the call itself fails; failures after the stream
/// is open are yielded as `tonic::Status` stream items.
pub async fn query_events(
    &mut self,
    database_name: String,
    revision: u64,
    include_event_detail: bool,
    query: com::evidentsource::DatabaseQuery,
) -> ClientResult<impl Stream<Item = Result<com::evidentsource::EventQueryReply, tonic::Status>>>
{
    let event_query = EventQueryRequest {
        database_name,
        revision,
        include_event_detail,
        query: Some(query),
    };
    let matches = self
        .client
        .query_events(Request::new(event_query))
        .await?
        .into_inner();
    Ok(matches)
}
/// Sends an `EventByIdRequest` built from the arguments and returns the reply message
/// unchanged.
///
/// # Errors
///
/// Returns [`Error::GrpcStatus`] when the RPC fails.
pub async fn event_by_id(
    &mut self,
    database_name: String,
    revision: u64,
    stream: String,
    event_id: String,
) -> ClientResult<com::evidentsource::EventQueryReply> {
    let lookup = EventByIdRequest {
        database_name,
        revision,
        stream,
        event_id,
    };
    let reply = self.client.event_by_id(Request::new(lookup)).await?;
    Ok(reply.into_inner())
}
/// Sends an `EventsByRevisionsRequest` for `event_revisions` in `database_name` and
/// returns the reply message unchanged.
///
/// # Errors
///
/// Returns [`Error::GrpcStatus`] when the RPC fails.
pub async fn fetch_events_by_revisions(
    &mut self,
    database_name: String,
    event_revisions: Vec<u64>,
) -> ClientResult<com::evidentsource::EventsReply> {
    let lookup = EventsByRevisionsRequest {
        database_name,
        event_revisions,
    };
    let reply = self
        .client
        .fetch_events_by_revisions(Request::new(lookup))
        .await?;
    Ok(reply.into_inner())
}
/// Sends a `ListStateViewDefinitionsRequest` for `database_name` and returns the
/// server's stream of `ListStateViewDefinitionsReply` messages.
///
/// When `status` is `Some`, it is converted with `Into` to the representation the
/// request field expects; `None` is sent as `None`.
///
/// # Errors
///
/// Returns [`Error::GrpcStatus`] when the call itself fails; failures after the stream
/// is open are yielded as `tonic::Status` stream items.
pub async fn list_state_view_definitions(
    &mut self,
    database_name: String,
    status: Option<com::evidentsource::StateViewStatus>,
) -> ClientResult<
    impl Stream<Item = Result<com::evidentsource::ListStateViewDefinitionsReply, tonic::Status>>,
> {
    let listing = ListStateViewDefinitionsRequest {
        database_name,
        status: status.map(Into::into),
    };
    let definitions = self
        .client
        .list_state_view_definitions(Request::new(listing))
        .await?
        .into_inner();
    Ok(definitions)
}
pub async fn fetch_state_view_at_revision(
&mut self,
state_view_identity: Option<com::evidentsource::StateViewIdentity>,
database_revision: u64,
parameters: Option<com::evidentsource::ParameterBindings>,
effective_time_end_at: Option<prost_types::Timestamp>,
) -> ClientResult<com::evidentsource::StateView> {
let request = Request::new(FetchStateViewRequest {
state_view_identity,
database_revision,
parameters,
effective_time_end_at,
});
let response = self.client.fetch_state_view_at_revision(request).await?;
Ok(response.into_inner().state_view.unwrap())
}
/// Executes a state change without correlation or causation identifiers.
///
/// Delegates to [`Self::execute_state_change_with_options`], passing `None` for both
/// `correlation_id` and `causation_id`.
///
/// # Errors
///
/// Propagates any error from [`Self::execute_state_change_with_options`].
pub async fn execute_state_change(
    &mut self,
    database_name: String,
    state_change_name: String,
    version: u64,
    last_seen_revision: Option<u64>,
    request: com::evidentsource::CommandRequest,
    transaction_id: Option<String>,
) -> ClientResult<com::evidentsource::TransactionResult> {
    let (correlation_id, causation_id) = (None, None);
    let outcome = self.execute_state_change_with_options(
        database_name,
        state_change_name,
        version,
        last_seen_revision,
        request,
        transaction_id,
        correlation_id,
        causation_id,
    );
    outcome.await
}
#[allow(clippy::too_many_arguments)]
pub async fn execute_state_change_with_options(
&mut self,
database_name: String,
state_change_name: String,
version: u64,
last_seen_revision: Option<u64>,
request: com::evidentsource::CommandRequest,
transaction_id: Option<String>,
correlation_id: Option<String>,
causation_id: Option<String>,
) -> ClientResult<com::evidentsource::TransactionResult> {
let request = Request::new(ExecuteStateChangeRequest {
database_name,
state_change_name,
version,
last_seen_revision,
request: Some(request),
transaction_id,
principal_attributes: Default::default(),
commit_message: None,
correlation_id,
causation_id,
});
let response = self.client.execute_state_change(request).await?;
Ok(response.into_inner().result.unwrap())
}
/// Submits a transaction through the async RPC without correlation or causation
/// identifiers.
///
/// Delegates to [`Self::transact_async_with_options`], passing `None` for both
/// `correlation_id` and `causation_id`.
///
/// # Errors
///
/// Propagates any error from [`Self::transact_async_with_options`].
pub async fn transact_async(
    &mut self,
    transaction_id: String,
    database_name: String,
    events: Vec<io::cloudevents::v1::CloudEvent>,
    conditions: Vec<com::evidentsource::AppendCondition>,
) -> ClientResult<com::evidentsource::AsyncCommandResponse> {
    let (correlation_id, causation_id) = (None, None);
    let outcome = self.transact_async_with_options(
        transaction_id,
        database_name,
        events,
        conditions,
        correlation_id,
        causation_id,
    );
    outcome.await
}
/// Sends a `TransactionRequest` through the `transact_async` RPC and returns the
/// `AsyncCommandResponse` unchanged.
///
/// `last_read_revision` and `commit_message` are always sent as `None`, and
/// `principal_attributes` as its default value.
///
/// # Errors
///
/// Returns [`Error::GrpcStatus`] when the RPC fails.
pub async fn transact_async_with_options(
    &mut self,
    transaction_id: String,
    database_name: String,
    events: Vec<io::cloudevents::v1::CloudEvent>,
    conditions: Vec<com::evidentsource::AppendCondition>,
    correlation_id: Option<String>,
    causation_id: Option<String>,
) -> ClientResult<com::evidentsource::AsyncCommandResponse> {
    let transaction = TransactionRequest {
        transaction_id,
        database_name,
        events,
        conditions,
        last_read_revision: None,
        principal_attributes: Default::default(),
        commit_message: None,
        correlation_id,
        causation_id,
    };
    let accepted = self
        .client
        .transact_async(Request::new(transaction))
        .await?;
    Ok(accepted.into_inner())
}
/// Executes a state change through the async RPC without correlation or causation
/// identifiers.
///
/// Delegates to [`Self::execute_state_change_async_with_options`], passing `None` for
/// both `correlation_id` and `causation_id`.
///
/// # Errors
///
/// Propagates any error from [`Self::execute_state_change_async_with_options`].
pub async fn execute_state_change_async(
    &mut self,
    database_name: String,
    state_change_name: String,
    version: u64,
    last_seen_revision: Option<u64>,
    request: com::evidentsource::CommandRequest,
    transaction_id: Option<String>,
) -> ClientResult<com::evidentsource::AsyncCommandResponse> {
    let (correlation_id, causation_id) = (None, None);
    let outcome = self.execute_state_change_async_with_options(
        database_name,
        state_change_name,
        version,
        last_seen_revision,
        request,
        transaction_id,
        correlation_id,
        causation_id,
    );
    outcome.await
}
/// Sends an `ExecuteStateChangeRequest` through the `execute_state_change_async` RPC
/// and returns the `AsyncCommandResponse` unchanged.
///
/// `commit_message` is always sent as `None` and `principal_attributes` as its
/// default value.
///
/// # Errors
///
/// Returns [`Error::GrpcStatus`] when the RPC fails.
#[allow(clippy::too_many_arguments)]
pub async fn execute_state_change_async_with_options(
    &mut self,
    database_name: String,
    state_change_name: String,
    version: u64,
    last_seen_revision: Option<u64>,
    request: com::evidentsource::CommandRequest,
    transaction_id: Option<String>,
    correlation_id: Option<String>,
    causation_id: Option<String>,
) -> ClientResult<com::evidentsource::AsyncCommandResponse> {
    let state_change = ExecuteStateChangeRequest {
        database_name,
        state_change_name,
        version,
        last_seen_revision,
        request: Some(request),
        transaction_id,
        principal_attributes: Default::default(),
        commit_message: None,
        correlation_id,
        causation_id,
    };
    let accepted = self
        .client
        .execute_state_change_async(Request::new(state_change))
        .await?;
    Ok(accepted.into_inner())
}
/// Sends a `ListStateChangesRequest` for `database_name` and returns the server's
/// stream of `ListStateChangesReply` messages.
///
/// # Errors
///
/// Returns [`Error::GrpcStatus`] when the call itself fails; failures after the stream
/// is open are yielded as `tonic::Status` stream items.
pub async fn list_state_changes(
    &mut self,
    database_name: String,
) -> ClientResult<
    impl Stream<Item = Result<com::evidentsource::ListStateChangesReply, tonic::Status>>,
> {
    let listing = ListStateChangesRequest { database_name };
    let state_changes = self
        .client
        .list_state_changes(Request::new(listing))
        .await?
        .into_inner();
    Ok(state_changes)
}
/// Sends a `FetchTransactionByIdRequest` for `transaction_id` in `database_name` and
/// returns the reply message unchanged.
///
/// # Errors
///
/// Returns [`Error::GrpcStatus`] when the RPC fails.
pub async fn fetch_transaction_by_id(
    &mut self,
    database_name: String,
    transaction_id: String,
) -> ClientResult<com::evidentsource::FetchTransactionReply> {
    let lookup = FetchTransactionByIdRequest {
        database_name,
        transaction_id,
    };
    let reply = self
        .client
        .fetch_transaction_by_id(Request::new(lookup))
        .await?;
    Ok(reply.into_inner())
}
}
#[cfg(test)]
mod tests {
// Placeholder test: the body is empty, so it passes without exercising any client code.
#[test]
fn it_works() {}
}