use std::sync::Arc;
use std::time::Duration;
use chrono::{DateTime, Utc};
use futures::stream::{self, StreamExt};
use futures::Stream;
use nonempty::NonEmpty;
use tokio::sync::{watch, Mutex};
use tokio::task::JoinHandle;
use evidentsource_core::domain::{
AppendCondition, CommandRequest, DatabaseError, DatabaseName, ProspectiveEvent, Revision,
StateChangeError, StateChangeName, StateChangeVersion, Transaction, TransactionError,
TransactionSummary,
};
use evidentsource_core::{DatabaseConnection, DatabaseIdentity, DatabaseProvider};
use crate::com::evidentsource as proto;
use crate::conversions::{timestamp_to_datetime, ConversionError};
use crate::database::DatabaseAtRevisionImpl;
use crate::io::cloudevents::v1 as proto_ce;
use crate::EvidentSourceClient;
/// Identifying and revision metadata for a single database.
///
/// Values of this type are built from a `proto::Database` message by
/// `DatabaseMetadata::from_proto` and published through the connection's
/// metadata watch channel.
#[derive(Clone, Debug)]
pub struct DatabaseMetadata {
/// Validated database name.
pub name: DatabaseName,
/// Creation time, converted from the proto `created_at` field.
pub created_at: DateTime<Utc>,
/// Revision number, copied verbatim from the proto `revision` field.
pub revision: u64,
/// Time converted from the proto `revision_timestamp` field.
pub revision_timestamp: DateTime<Utc>,
}
impl DatabaseMetadata {
fn from_proto(proto: proto::Database) -> Result<Self, ConversionError> {
let name = DatabaseName::new(&proto.name)?;
let created_at = proto
.created_at
.ok_or_else(|| ConversionError::missing_field("Database", "created_at"))
.and_then(timestamp_to_datetime)?;
let revision_timestamp = proto
.revision_timestamp
.ok_or_else(|| ConversionError::missing_field("Database", "revision_timestamp"))
.and_then(timestamp_to_datetime)?;
Ok(Self {
name,
created_at,
revision: proto.revision,
revision_timestamp,
})
}
}
/// State shared (behind an `Arc`) by every clone of a `Connection`.
struct ConnectionInner {
/// Client that is cloned for each outgoing request.
client: EvidentSourceClient,
/// Name of the database this connection targets.
database_name: DatabaseName,
/// Sending half of the metadata watch channel; the subscription task
/// publishes parsed updates here.
metadata_tx: watch::Sender<DatabaseMetadata>,
/// Join handle of the background subscription task; stored by
/// `spawn_subscription_task` and taken by `Connection::close`.
subscription_handle: Mutex<Option<JoinHandle<()>>>,
/// Shutdown flag channel; `true` is sent by `Connection::close` and by
/// this type's `Drop` implementation.
shutdown_tx: watch::Sender<bool>,
}
impl Drop for ConnectionInner {
    /// Signals shutdown on the shutdown channel when the shared state is
    /// dropped.
    fn drop(&mut self) {
        // A send error only means no receiver is left; nothing to do then.
        self.shutdown_tx.send(true).ok();
    }
}
/// Cloneable handle to a single database.
///
/// All clones share one `ConnectionInner`; each clone carries its own
/// receivers for the metadata and shutdown watch channels.
#[derive(Clone)]
pub struct Connection {
/// Shared client, database name, channel senders and task handle.
inner: Arc<ConnectionInner>,
/// Receiver for the most recently published `DatabaseMetadata`.
metadata_rx: watch::Receiver<DatabaseMetadata>,
/// Receiver for the shutdown flag; cloned into the subscription task.
shutdown_rx: watch::Receiver<bool>,
}
impl Connection {
pub async fn new(
client: EvidentSourceClient,
database_name: DatabaseName,
) -> Result<Self, DatabaseError> {
let mut client_clone = client.clone();
let initial_db = client_clone
.fetch_latest_database(database_name.to_string())
.await
.map_err(|_| DatabaseError::NotFound(database_name.to_string()))?;
let initial_metadata = DatabaseMetadata::from_proto(initial_db)
.map_err(|e| DatabaseError::ServerError(format!("failed to parse metadata: {}", e)))?;
let (metadata_tx, metadata_rx) = watch::channel(initial_metadata);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let inner = Arc::new(ConnectionInner {
client,
database_name: database_name.clone(),
metadata_tx,
subscription_handle: Mutex::new(None),
shutdown_tx,
});
let connection = Connection {
inner,
metadata_rx,
shutdown_rx,
};
connection.spawn_subscription_task().await;
Ok(connection)
}
async fn spawn_subscription_task(&self) {
let inner = self.inner.clone();
let db_name = self.inner.database_name.to_string();
let mut shutdown_rx = self.shutdown_rx.clone();
let handle = tokio::spawn(async move {
let mut backoff = Duration::from_millis(100);
let max_backoff = Duration::from_secs(30);
loop {
let stream_result = {
let mut client = inner.client.clone();
client.subscribe_database_updates(db_name.clone()).await
};
match stream_result {
Ok(mut stream) => {
backoff = Duration::from_millis(100);
loop {
tokio::select! {
_ = shutdown_rx.changed() => {
if *shutdown_rx.borrow() {
tracing::debug!("Subscription task shutting down");
return;
}
}
update = stream.next() => {
match update {
Some(Ok(reply)) => {
if let Some(db) = reply.database {
match DatabaseMetadata::from_proto(db) {
Ok(metadata) => {
let _ = inner.metadata_tx.send(metadata);
}
Err(e) => {
tracing::warn!("Failed to parse metadata: {}", e);
}
}
}
}
Some(Err(status)) => {
tracing::warn!("Stream error: {}", status);
break; }
None => {
tracing::debug!("Stream ended");
break; }
}
}
}
}
}
Err(e) => {
tracing::warn!("Failed to subscribe: {}", e);
}
}
if *shutdown_rx.borrow() {
return;
}
tracing::debug!("Reconnecting in {:?}", backoff);
tokio::time::sleep(backoff).await;
backoff = (backoff * 2).min(max_backoff);
}
});
let mut guard = self.inner.subscription_handle.lock().await;
*guard = Some(handle);
}
/// Signals the subscription task to stop and waits up to five seconds for
/// it to finish.
///
/// Returns `Ok(())` when the task ended cleanly or when no task handle was
/// stored (for example because another clone already closed it).
///
/// # Errors
///
/// * [`DatabaseError::ServerError`] when joining the task fails.
/// * [`DatabaseError::Timeout`] when the task does not finish within five
///   seconds; the task is aborted in that case so it cannot outlive the
///   call as a detached task.
pub async fn close(self) -> Result<(), DatabaseError> {
    let _ = self.inner.shutdown_tx.send(true);

    // Take the handle in its own statement so the mutex guard is released
    // before waiting on the task.
    let taken = self.inner.subscription_handle.lock().await.take();
    let mut handle = match taken {
        Some(handle) => handle,
        None => return Ok(()),
    };

    // Await by mutable reference so the handle is still available for
    // `abort` if the timeout elapses.
    match tokio::time::timeout(Duration::from_secs(5), &mut handle).await {
        Ok(Ok(())) => Ok(()),
        Ok(Err(e)) => Err(DatabaseError::ServerError(format!(
            "subscription task panicked: {}",
            e
        ))),
        Err(_) => {
            handle.abort();
            Err(DatabaseError::Timeout)
        }
    }
}
fn current_metadata(&self) -> DatabaseMetadata {
self.metadata_rx.borrow().clone()
}
/// Builds a [`DatabaseAtRevisionImpl`] from `metadata` without contacting
/// the server, using a clone of this connection's client.
fn snapshot_from_metadata(&self, metadata: &DatabaseMetadata) -> DatabaseAtRevisionImpl {
    let client = self.inner.client.clone();
    let name = metadata.name.clone();
    let created_at = metadata.created_at;
    let revision = metadata.revision;
    let revision_timestamp = metadata.revision_timestamp;
    DatabaseAtRevisionImpl::at_revision_with_metadata(
        client,
        name,
        created_at,
        revision,
        revision_timestamp,
    )
}
}
impl std::fmt::Debug for Connection {
    /// Formats the connection as its database name plus the revision
    /// currently held in the metadata channel.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let current_revision = self.metadata_rx.borrow().revision;
        let mut out = f.debug_struct("Connection");
        out.field("database_name", &self.inner.database_name);
        out.field("current_revision", &current_revision);
        out.finish()
    }
}
impl DatabaseIdentity for Connection {
    /// Name of the database this connection was opened for.
    fn name(&self) -> &DatabaseName {
        let shared = &self.inner;
        &shared.database_name
    }

    /// Creation time taken from the metadata currently in the watch channel.
    fn created_at(&self) -> DateTime<Utc> {
        let latest = self.metadata_rx.borrow();
        latest.created_at
    }
}
impl DatabaseProvider for Connection {
type AtRevision = DatabaseAtRevisionImpl;
/// Returns a snapshot built from the locally held metadata; no request is
/// sent by this method itself.
fn local_database(&self) -> Self::AtRevision {
    self.snapshot_from_metadata(&self.current_metadata())
}
/// Fetches the latest database record from the server and wraps it in a
/// snapshot.
///
/// # Errors
///
/// gRPC status failures are mapped with `status_mapping::to_database_error`;
/// other client errors and snapshot construction failures become
/// [`DatabaseError::ServerError`].
fn latest_database(
    &self,
) -> impl std::future::Future<Output = Result<Self::AtRevision, DatabaseError>> {
    let db_name = self.inner.database_name.to_string();
    let mut grpc = self.inner.client.clone();
    async move {
        let fetched = match grpc.fetch_latest_database(db_name.clone()).await {
            Ok(db) => db,
            Err(err) => {
                return Err(match err {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_database_error(status, &db_name)
                    }
                    _ => DatabaseError::ServerError(err.to_string()),
                });
            }
        };
        match DatabaseAtRevisionImpl::new(grpc, fetched) {
            Ok(snapshot) => Ok(snapshot),
            Err(e) => Err(DatabaseError::ServerError(format!(
                "failed to parse database: {}",
                e
            ))),
        }
    }
}
/// Requests the database for `revision` through the client's
/// `await_database` call and wraps the reply in a snapshot.
///
/// # Errors
///
/// gRPC status failures are mapped with `status_mapping::to_database_error`;
/// other client errors and snapshot construction failures become
/// [`DatabaseError::ServerError`].
fn database_at_revision(
    &self,
    revision: u64,
) -> impl std::future::Future<Output = Result<Self::AtRevision, DatabaseError>> {
    let db_name = self.inner.database_name.to_string();
    let mut grpc = self.inner.client.clone();
    async move {
        let fetched = match grpc.await_database(db_name.clone(), revision).await {
            Ok(db) => db,
            Err(err) => {
                return Err(match err {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_database_error(status, &db_name)
                    }
                    _ => DatabaseError::ServerError(err.to_string()),
                });
            }
        };
        match DatabaseAtRevisionImpl::new(grpc, fetched) {
            Ok(snapshot) => Ok(snapshot),
            Err(e) => Err(DatabaseError::ServerError(format!(
                "failed to parse database: {}",
                e
            ))),
        }
    }
}
fn database_at_timestamp(
&self,
revision_timestamp: DateTime<Utc>,
) -> impl std::future::Future<Output = Result<Self::AtRevision, DatabaseError>> {
use crate::conversions::datetime_to_timestamp;
let mut client = self.inner.client.clone();
let database_name = self.inner.database_name.to_string();
let ts = datetime_to_timestamp(revision_timestamp);
async move {
let proto_db = client
.database_effective_at_timestamp(database_name.clone(), ts)
.await
.map_err(|e| match e {
crate::Error::GrpcStatus(ref status) => {
crate::status_mapping::to_database_error(status, &database_name)
}
_ => DatabaseError::ServerError(e.to_string()),
})?;
DatabaseAtRevisionImpl::new(client, proto_db)
.map_err(|e| DatabaseError::ServerError(format!("failed to parse database: {}", e)))
}
}
}
impl DatabaseConnection for Connection {
/// Submits `events` under `conditions` using a freshly generated v4 UUID
/// as the transaction id, then fetches the database at the resulting
/// revision.
///
/// When the reply carries no transaction summary, the revision falls back
/// to the revision type's default value.
///
/// # Errors
///
/// gRPC status failures from either call are mapped with
/// `status_mapping::to_database_error`; other client errors and snapshot
/// construction failures become [`DatabaseError::ServerError`].
fn transact(
    &self,
    events: NonEmpty<ProspectiveEvent>,
    conditions: Vec<AppendCondition>,
) -> impl std::future::Future<Output = Result<Self::AtRevision, DatabaseError>> {
    let mut grpc = self.inner.client.clone();
    let db_name = self.inner.database_name.to_string();
    // Conversion happens eagerly, before the future is first polled.
    let wire_events: Vec<proto_ce::CloudEvent> = events.into_iter().map(Into::into).collect();
    let wire_conditions: Vec<proto::AppendCondition> =
        conditions.into_iter().map(Into::into).collect();
    let txn_id = uuid::Uuid::new_v4().to_string();
    async move {
        let submitted = grpc
            .transact(txn_id, db_name.clone(), wire_events, wire_conditions)
            .await;
        let outcome = match submitted {
            Ok(outcome) => outcome,
            Err(err) => {
                return Err(match err {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_database_error(status, &db_name)
                    }
                    _ => DatabaseError::ServerError(err.to_string()),
                });
            }
        };
        let committed_revision = match outcome.transaction_summary {
            Some(summary) => summary.revision,
            None => Default::default(),
        };
        let fetched = match grpc
            .await_database(db_name.clone(), committed_revision)
            .await
        {
            Ok(db) => db,
            Err(err) => {
                return Err(match err {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_database_error(status, &db_name)
                    }
                    _ => DatabaseError::ServerError(err.to_string()),
                });
            }
        };
        match DatabaseAtRevisionImpl::new(grpc, fetched) {
            Ok(snapshot) => Ok(snapshot),
            Err(e) => Err(DatabaseError::ServerError(format!(
                "failed to parse database: {}",
                e
            ))),
        }
    }
}
/// Executes the state change `name` at `version` with `request`, then
/// fetches the database at the resulting revision.
///
/// The revision currently held in the local metadata is passed along with
/// the call. When the reply carries no transaction summary, the revision
/// falls back to the revision type's default value.
///
/// # Errors
///
/// * Failures of the state-change call are mapped with
///   `status_mapping::to_state_change_error` (gRPC statuses) or become
///   [`StateChangeError::ServerError`].
/// * Failures while fetching or wrapping the resulting database are
///   returned as [`StateChangeError::Database`].
fn execute_state_change(
    &self,
    name: &StateChangeName,
    version: StateChangeVersion,
    request: CommandRequest,
) -> impl std::future::Future<Output = Result<Self::AtRevision, StateChangeError>> {
    let mut grpc = self.inner.client.clone();
    let db_name = self.inner.database_name.to_string();
    let change_name = name.to_string();
    let known_revision = self.metadata_rx.borrow().revision;
    let headers = request
        .headers
        .into_iter()
        .map(|(key, value)| proto::Header { key, value })
        .collect();
    let wire_request = proto::CommandRequest {
        headers,
        body: request.body,
        content_type: request.content_type,
        content_schema: request.content_schema,
    };
    async move {
        let executed = grpc
            .execute_state_change(
                db_name.clone(),
                change_name.clone(),
                version,
                Some(known_revision),
                wire_request,
                None,
            )
            .await;
        let outcome = match executed {
            Ok(outcome) => outcome,
            Err(err) => {
                return Err(match err {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_state_change_error(status, &change_name, version)
                    }
                    _ => StateChangeError::ServerError(err.to_string()),
                });
            }
        };
        let committed_revision = match outcome.transaction_summary {
            Some(summary) => summary.revision,
            None => Default::default(),
        };
        let fetched = match grpc
            .await_database(db_name.clone(), committed_revision)
            .await
        {
            Ok(db) => db,
            Err(err) => {
                return Err(match err {
                    crate::Error::GrpcStatus(ref status) => StateChangeError::Database(
                        crate::status_mapping::to_database_error(status, &db_name),
                    ),
                    _ => StateChangeError::Database(DatabaseError::ServerError(err.to_string())),
                });
            }
        };
        match DatabaseAtRevisionImpl::new(grpc, fetched) {
            Ok(snapshot) => Ok(snapshot),
            Err(e) => Err(StateChangeError::Database(DatabaseError::ServerError(
                format!("failed to parse database: {}", e),
            ))),
        }
    }
}
fn log(&self) -> impl Stream<Item = TransactionSummary> {
let mut client = self.inner.client.clone();
let database_name = self.inner.database_name.to_string();
stream::once(async move {
let result = client.scan_database_log(database_name, 0, false).await;
match result {
Ok(response_stream) => response_stream
.filter_map(|result| async move {
match result {
Ok(reply) => {
if let Some(proto::database_log_reply::Transaction::Summary(
summary,
)) = reply.transaction
{
Some(TransactionSummary::from(summary))
} else {
None
}
}
Err(_) => None,
}
})
.boxed(),
Err(_) => stream::empty().boxed(),
}
})
.flatten()
}
fn log_detail(&self) -> impl Stream<Item = Transaction> {
let mut client = self.inner.client.clone();
let database_name = self.inner.database_name.to_string();
stream::once(async move {
let result = client.scan_database_log(database_name, 0, true).await;
match result {
Ok(response_stream) => response_stream
.filter_map(|result| async move {
match result {
Ok(reply) => {
if let Some(proto::database_log_reply::Transaction::Detail(txn)) =
reply.transaction
{
Transaction::try_from(txn).ok()
} else {
None
}
}
Err(_) => None,
}
})
.boxed(),
Err(_) => stream::empty().boxed(),
}
})
.flatten()
}
/// Submits `events` under `conditions` using the caller-supplied
/// `transaction_id`, then fetches the database at the resulting revision.
///
/// When the reply carries no transaction summary, the revision falls back
/// to the revision type's default value.
///
/// # Errors
///
/// gRPC status failures from either call are mapped with
/// `status_mapping::to_database_error`; other client errors and snapshot
/// construction failures become [`DatabaseError::ServerError`].
fn transact_with_id(
    &self,
    transaction_id: &str,
    events: NonEmpty<ProspectiveEvent>,
    conditions: Vec<AppendCondition>,
) -> impl std::future::Future<Output = Result<Self::AtRevision, DatabaseError>> {
    let mut grpc = self.inner.client.clone();
    let db_name = self.inner.database_name.to_string();
    let txn_id = transaction_id.to_string();
    // Conversion happens eagerly, before the future is first polled.
    let wire_events: Vec<proto_ce::CloudEvent> = events.into_iter().map(Into::into).collect();
    let wire_conditions: Vec<proto::AppendCondition> =
        conditions.into_iter().map(Into::into).collect();
    async move {
        let submitted = grpc
            .transact(txn_id, db_name.clone(), wire_events, wire_conditions)
            .await;
        let outcome = match submitted {
            Ok(outcome) => outcome,
            Err(err) => {
                return Err(match err {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_database_error(status, &db_name)
                    }
                    _ => DatabaseError::ServerError(err.to_string()),
                });
            }
        };
        let committed_revision = match outcome.transaction_summary {
            Some(summary) => summary.revision,
            None => Default::default(),
        };
        let fetched = match grpc
            .await_database(db_name.clone(), committed_revision)
            .await
        {
            Ok(db) => db,
            Err(err) => {
                return Err(match err {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_database_error(status, &db_name)
                    }
                    _ => DatabaseError::ServerError(err.to_string()),
                });
            }
        };
        match DatabaseAtRevisionImpl::new(grpc, fetched) {
            Ok(snapshot) => Ok(snapshot),
            Err(e) => Err(DatabaseError::ServerError(format!(
                "failed to parse database: {}",
                e
            ))),
        }
    }
}
fn log_from(&self, from_revision: Revision) -> impl Stream<Item = TransactionSummary> {
let mut client = self.inner.client.clone();
let database_name = self.inner.database_name.to_string();
stream::once(async move {
let result = client
.scan_database_log(database_name, from_revision, false)
.await;
match result {
Ok(response_stream) => response_stream
.filter_map(|result| async move {
match result {
Ok(reply) => {
if let Some(proto::database_log_reply::Transaction::Summary(
summary,
)) = reply.transaction
{
Some(TransactionSummary::from(summary))
} else {
None
}
}
Err(_) => None,
}
})
.boxed(),
Err(_) => stream::empty().boxed(),
}
})
.flatten()
}
fn log_detail_from(&self, from_revision: Revision) -> impl Stream<Item = Transaction> {
let mut client = self.inner.client.clone();
let database_name = self.inner.database_name.to_string();
stream::once(async move {
let result = client
.scan_database_log(database_name, from_revision, true)
.await;
match result {
Ok(response_stream) => response_stream
.filter_map(|result| async move {
match result {
Ok(reply) => {
if let Some(proto::database_log_reply::Transaction::Detail(txn)) =
reply.transaction
{
Transaction::try_from(txn).ok()
} else {
None
}
}
Err(_) => None,
}
})
.boxed(),
Err(_) => stream::empty().boxed(),
}
})
.flatten()
}
}
impl Connection {
pub fn list_state_changes(
&self,
) -> impl Stream<Item = evidentsource_core::domain::StateChangeDefinitionSummary> {
use evidentsource_core::domain::{StateChangeDefinitionSummary, StateChangeName};
let mut client = self.inner.client.clone();
let database_name = self.inner.database_name.to_string();
stream::once(async move {
let result = client.list_state_changes(database_name).await;
match result {
Ok(response_stream) => response_stream
.filter_map(|result| async move {
match result {
Ok(reply) => StateChangeName::new(&reply.name).ok().map(|name| {
StateChangeDefinitionSummary {
name,
version: reply.version,
}
}),
Err(_) => None,
}
})
.boxed(),
Err(_) => stream::empty().boxed(),
}
})
.flatten()
}
/// Fetches the transaction identified by `transaction_id` from this
/// database.
///
/// # Errors
///
/// * gRPC status failures are mapped with
///   `status_mapping::to_database_error`; other client errors become
///   [`DatabaseError::ServerError`].
/// * [`DatabaseError::ServerError`] when the reply contains no transaction
///   or the transaction cannot be converted.
pub async fn fetch_transaction_by_id(
    &self,
    transaction_id: &str,
) -> Result<Transaction, DatabaseError> {
    let db_name = self.inner.database_name.to_string();
    let mut grpc = self.inner.client.clone();

    let reply = match grpc
        .fetch_transaction_by_id(db_name.clone(), transaction_id.to_string())
        .await
    {
        Ok(reply) => reply,
        Err(err) => {
            return Err(match err {
                crate::Error::GrpcStatus(ref status) => {
                    crate::status_mapping::to_database_error(status, &db_name)
                }
                _ => DatabaseError::ServerError(err.to_string()),
            });
        }
    };

    let raw = match reply.transaction {
        Some(raw) => raw,
        None => {
            return Err(DatabaseError::ServerError(
                "missing transaction in response".to_string(),
            ))
        }
    };

    match Transaction::try_from(raw) {
        Ok(transaction) => Ok(transaction),
        Err(e) => Err(DatabaseError::ServerError(format!(
            "invalid transaction: {}",
            e
        ))),
    }
}
/// Borrows the underlying client used by this connection.
pub fn client(&self) -> &EvidentSourceClient {
    let shared = &self.inner;
    &shared.client
}
pub fn transaction(&self) -> TransactionBuilder {
TransactionBuilder::new(self.clone())
}
pub fn state_change(&self, name: &str, version: StateChangeVersion) -> StateChangeBuilder {
StateChangeBuilder::new(self.clone(), name.to_string(), version)
}
}
/// Identifier returned by the asynchronous submission APIs, wrapping the
/// `correlation_id` string of the server response.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CorrelationId(pub String);

impl std::fmt::Display for CorrelationId {
    /// Writes the wrapped string verbatim; width and alignment flags of the
    /// surrounding formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let CorrelationId(raw) = self;
        f.write_str(raw)
    }
}

impl From<String> for CorrelationId {
    /// Wraps an owned string without copying it.
    fn from(s: String) -> Self {
        CorrelationId(s)
    }
}
/// Fire-and-forget counterparts of the transactional operations: each
/// method resolves to a [`CorrelationId`] instead of a database snapshot.
pub trait DatabaseConnectionAsync {
/// Submits `events` under `conditions` and resolves to the correlation id
/// of the submission.
fn transact_async(
&self,
events: NonEmpty<ProspectiveEvent>,
conditions: Vec<AppendCondition>,
) -> impl std::future::Future<Output = Result<CorrelationId, DatabaseError>>;
/// Same as `transact_async`, but with a caller-supplied `transaction_id`.
fn transact_async_with_id(
&self,
transaction_id: &str,
events: NonEmpty<ProspectiveEvent>,
conditions: Vec<AppendCondition>,
) -> impl std::future::Future<Output = Result<CorrelationId, DatabaseError>>;
/// Submits the state change `name` at `version` with `request` and
/// resolves to the correlation id of the submission.
fn execute_state_change_async(
&self,
name: &StateChangeName,
version: StateChangeVersion,
request: CommandRequest,
) -> impl std::future::Future<Output = Result<CorrelationId, StateChangeError>>;
}
impl DatabaseConnectionAsync for Connection {
    /// Submits `events` under `conditions` through the client's
    /// `transact_async` call, using a freshly generated v4 UUID as the
    /// transaction id, and returns the correlation id from the response.
    ///
    /// gRPC status failures are mapped with
    /// `status_mapping::to_database_error`; other client errors become
    /// [`DatabaseError::ServerError`].
    fn transact_async(
        &self,
        events: NonEmpty<ProspectiveEvent>,
        conditions: Vec<AppendCondition>,
    ) -> impl std::future::Future<Output = Result<CorrelationId, DatabaseError>> {
        let mut grpc = self.inner.client.clone();
        let db_name = self.inner.database_name.to_string();
        let txn_id = uuid::Uuid::new_v4().to_string();
        let wire_events: Vec<proto_ce::CloudEvent> =
            events.into_iter().map(Into::into).collect();
        let wire_conditions: Vec<proto::AppendCondition> =
            conditions.into_iter().map(Into::into).collect();
        async move {
            let submitted = grpc
                .transact_async(txn_id, db_name.clone(), wire_events, wire_conditions)
                .await;
            match submitted {
                Ok(response) => Ok(CorrelationId(response.correlation_id)),
                Err(err) => Err(match err {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_database_error(status, &db_name)
                    }
                    _ => DatabaseError::ServerError(err.to_string()),
                }),
            }
        }
    }

    /// Same as `transact_async`, but submits under the caller-supplied
    /// `transaction_id`.
    fn transact_async_with_id(
        &self,
        transaction_id: &str,
        events: NonEmpty<ProspectiveEvent>,
        conditions: Vec<AppendCondition>,
    ) -> impl std::future::Future<Output = Result<CorrelationId, DatabaseError>> {
        let mut grpc = self.inner.client.clone();
        let db_name = self.inner.database_name.to_string();
        let txn_id = transaction_id.to_string();
        let wire_events: Vec<proto_ce::CloudEvent> =
            events.into_iter().map(Into::into).collect();
        let wire_conditions: Vec<proto::AppendCondition> =
            conditions.into_iter().map(Into::into).collect();
        async move {
            let submitted = grpc
                .transact_async(txn_id, db_name.clone(), wire_events, wire_conditions)
                .await;
            match submitted {
                Ok(response) => Ok(CorrelationId(response.correlation_id)),
                Err(err) => Err(match err {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_database_error(status, &db_name)
                    }
                    _ => DatabaseError::ServerError(err.to_string()),
                }),
            }
        }
    }

    /// Submits the state change `name` at `version` with `request` through
    /// the client's `execute_state_change_async` call and returns the
    /// correlation id from the response.
    ///
    /// The revision currently held in the local metadata is passed along
    /// with the call. gRPC status failures are mapped with
    /// `status_mapping::to_state_change_error`; other client errors become
    /// [`StateChangeError::ServerError`].
    fn execute_state_change_async(
        &self,
        name: &StateChangeName,
        version: StateChangeVersion,
        request: CommandRequest,
    ) -> impl std::future::Future<Output = Result<CorrelationId, StateChangeError>> {
        let mut grpc = self.inner.client.clone();
        let db_name = self.inner.database_name.to_string();
        let change_name = name.to_string();
        let known_revision = self.metadata_rx.borrow().revision;
        let headers = request
            .headers
            .into_iter()
            .map(|(key, value)| proto::Header { key, value })
            .collect();
        let wire_request = proto::CommandRequest {
            headers,
            body: request.body,
            content_type: request.content_type,
            content_schema: request.content_schema,
        };
        async move {
            let submitted = grpc
                .execute_state_change_async(
                    db_name,
                    change_name.clone(),
                    version,
                    Some(known_revision),
                    wire_request,
                    None,
                )
                .await;
            match submitted {
                Ok(response) => Ok(CorrelationId(response.correlation_id)),
                Err(err) => Err(match err {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_state_change_error(status, &change_name, version)
                    }
                    _ => StateChangeError::ServerError(err.to_string()),
                }),
            }
        }
    }
}
/// Fluent builder that accumulates events, append conditions and optional
/// ids before submitting them with `commit` or `commit_async`.
pub struct TransactionBuilder {
/// Connection the transaction is submitted through.
connection: Connection,
/// Events collected so far; `commit`/`commit_async` reject an empty list.
events: Vec<ProspectiveEvent>,
/// Append conditions collected so far.
conditions: Vec<AppendCondition>,
/// Explicit transaction id; a v4 UUID is generated at commit time if unset.
transaction_id: Option<String>,
/// Optional correlation id forwarded to the client call.
correlation_id: Option<String>,
/// Optional causation id forwarded to the client call.
causation_id: Option<String>,
}
impl TransactionBuilder {
pub fn new(connection: Connection) -> Self {
Self {
connection,
events: Vec::new(),
conditions: Vec::new(),
transaction_id: None,
correlation_id: None,
causation_id: None,
}
}
pub fn event(mut self, event: ProspectiveEvent) -> Self {
self.events.push(event);
self
}
pub fn events(mut self, events: impl IntoIterator<Item = ProspectiveEvent>) -> Self {
self.events.extend(events);
self
}
pub fn condition(mut self, condition: AppendCondition) -> Self {
self.conditions.push(condition);
self
}
pub fn conditions(mut self, conditions: impl IntoIterator<Item = AppendCondition>) -> Self {
self.conditions.extend(conditions);
self
}
pub fn require_not_exists(self, selector: evidentsource_core::domain::EventSelector) -> Self {
self.condition(AppendCondition::must_not_exist(selector))
}
pub fn require_exists(self, selector: evidentsource_core::domain::EventSelector) -> Self {
self.condition(AppendCondition::must_exist(selector))
}
pub fn with_transaction_id(mut self, transaction_id: impl Into<String>) -> Self {
self.transaction_id = Some(transaction_id.into());
self
}
pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
self.correlation_id = Some(correlation_id.into());
self
}
pub fn with_causation_id(mut self, causation_id: impl Into<String>) -> Self {
self.causation_id = Some(causation_id.into());
self
}
pub async fn commit(self) -> Result<DatabaseAtRevisionImpl, DatabaseError> {
let events = NonEmpty::from_vec(self.events)
.ok_or(DatabaseError::Transaction(TransactionError::Empty))?;
let mut client = self.connection.inner.client.clone();
let database_name = self.connection.inner.database_name.to_string();
let transaction_id = self
.transaction_id
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let proto_events: Vec<proto_ce::CloudEvent> =
events.into_iter().map(|pe| pe.into()).collect();
let proto_conditions: Vec<proto::AppendCondition> =
self.conditions.into_iter().map(|c| c.into()).collect();
let result = client
.transact_with_options(
transaction_id,
database_name.clone(),
proto_events,
proto_conditions,
self.correlation_id,
self.causation_id,
)
.await
.map_err(|e| match e {
crate::Error::GrpcStatus(ref status) => {
crate::status_mapping::to_database_error(status, &database_name)
}
_ => DatabaseError::ServerError(e.to_string()),
})?;
let new_revision = result
.transaction_summary
.map(|s| s.revision)
.unwrap_or_default();
let proto_db = client
.await_database(database_name.clone(), new_revision)
.await
.map_err(|e| match e {
crate::Error::GrpcStatus(ref status) => {
crate::status_mapping::to_database_error(status, &database_name)
}
_ => DatabaseError::ServerError(e.to_string()),
})?;
DatabaseAtRevisionImpl::new(client, proto_db)
.map_err(|e| DatabaseError::ServerError(format!("failed to parse database: {}", e)))
}
pub async fn commit_async(self) -> Result<CorrelationId, DatabaseError> {
let events = NonEmpty::from_vec(self.events)
.ok_or(DatabaseError::Transaction(TransactionError::Empty))?;
let mut client = self.connection.inner.client.clone();
let database_name = self.connection.inner.database_name.to_string();
let transaction_id = self
.transaction_id
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let proto_events: Vec<proto_ce::CloudEvent> =
events.into_iter().map(|pe| pe.into()).collect();
let proto_conditions: Vec<proto::AppendCondition> =
self.conditions.into_iter().map(|c| c.into()).collect();
let response = client
.transact_async_with_options(
transaction_id,
database_name.clone(),
proto_events,
proto_conditions,
self.correlation_id,
self.causation_id,
)
.await
.map_err(|e| match e {
crate::Error::GrpcStatus(ref status) => {
crate::status_mapping::to_database_error(status, &database_name)
}
_ => DatabaseError::ServerError(e.to_string()),
})?;
Ok(CorrelationId(response.correlation_id))
}
}
/// Fluent builder that assembles a state-change invocation before
/// submitting it with `execute` or `execute_async`.
pub struct StateChangeBuilder {
/// Connection the state change is submitted through.
connection: Connection,
/// Name of the state change, kept as a plain string.
name: String,
/// Version of the state change to invoke.
version: StateChangeVersion,
/// Command request; `CommandRequest::default()` is used if never set.
request: Option<CommandRequest>,
/// Optional transaction id forwarded to the client call.
transaction_id: Option<String>,
/// Optional correlation id forwarded to the client call.
correlation_id: Option<String>,
/// Optional causation id forwarded to the client call.
causation_id: Option<String>,
}
impl StateChangeBuilder {
    /// Creates a builder for the state change `name` at `version`, with no
    /// request and no ids set.
    pub fn new(connection: Connection, name: String, version: StateChangeVersion) -> Self {
        Self {
            connection,
            name,
            version,
            request: None,
            transaction_id: None,
            correlation_id: None,
            causation_id: None,
        }
    }

    /// Replaces the request with one built by `CommandRequest::json(value)`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `CommandRequest::json`.
    pub fn json<T: ::serde::Serialize>(
        self,
        value: &T,
    ) -> Result<Self, evidentsource_core::domain::CommandRequestError> {
        let built = CommandRequest::json(value)?;
        Ok(Self {
            request: Some(built),
            ..self
        })
    }

    /// Replaces the request with `request`.
    pub fn request(self, request: CommandRequest) -> Self {
        Self {
            request: Some(request),
            ..self
        }
    }

    /// Sets the transaction id forwarded with the state change.
    pub fn with_transaction_id(self, transaction_id: impl Into<String>) -> Self {
        Self {
            transaction_id: Some(transaction_id.into()),
            ..self
        }
    }

    /// Sets the correlation id forwarded with the state change.
    pub fn with_correlation_id(self, correlation_id: impl Into<String>) -> Self {
        Self {
            correlation_id: Some(correlation_id.into()),
            ..self
        }
    }

    /// Sets the causation id forwarded with the state change.
    pub fn with_causation_id(self, causation_id: impl Into<String>) -> Self {
        Self {
            causation_id: Some(causation_id.into()),
            ..self
        }
    }

    /// Applies `content_type` to the current request, starting from
    /// `CommandRequest::default()` when none was set.
    pub fn content_type(mut self, content_type: impl Into<String>) -> Self {
        let updated = self
            .request
            .take()
            .unwrap_or_default()
            .content_type(content_type);
        self.request = Some(updated);
        self
    }

    /// Applies `schema_url` as the content schema of the current request,
    /// starting from `CommandRequest::default()` when none was set.
    pub fn content_schema(mut self, schema_url: impl Into<String>) -> Self {
        let updated = self
            .request
            .take()
            .unwrap_or_default()
            .content_schema(schema_url);
        self.request = Some(updated);
        self
    }

    /// Executes the state change and fetches the database at the resulting
    /// revision.
    ///
    /// The revision currently held in the connection's local metadata is
    /// passed along with the call. When the reply carries no transaction
    /// summary, the revision falls back to the revision type's default
    /// value.
    ///
    /// # Errors
    ///
    /// * Failures of the state-change call are mapped with
    ///   `status_mapping::to_state_change_error` (gRPC statuses) or become
    ///   [`StateChangeError::ServerError`].
    /// * Failures while fetching or wrapping the resulting database are
    ///   returned as [`StateChangeError::Database`].
    pub async fn execute(self) -> Result<DatabaseAtRevisionImpl, StateChangeError> {
        let Self {
            connection,
            name: change_name,
            version,
            request,
            transaction_id,
            correlation_id,
            causation_id,
        } = self;

        let mut grpc = connection.inner.client.clone();
        let db_name = connection.inner.database_name.to_string();
        let known_revision = connection.metadata_rx.borrow().revision;
        let request = request.unwrap_or_default();
        let headers = request
            .headers
            .into_iter()
            .map(|(key, value)| proto::Header { key, value })
            .collect();
        let wire_request = proto::CommandRequest {
            headers,
            body: request.body,
            content_type: request.content_type,
            content_schema: request.content_schema,
        };

        let executed = grpc
            .execute_state_change_with_options(
                db_name.clone(),
                change_name.clone(),
                version,
                Some(known_revision),
                wire_request,
                transaction_id,
                correlation_id,
                causation_id,
            )
            .await;
        let outcome = match executed {
            Ok(outcome) => outcome,
            Err(err) => {
                return Err(match err {
                    crate::Error::GrpcStatus(ref status) => {
                        crate::status_mapping::to_state_change_error(status, &change_name, version)
                    }
                    _ => StateChangeError::ServerError(err.to_string()),
                });
            }
        };

        let committed_revision = match outcome.transaction_summary {
            Some(summary) => summary.revision,
            None => Default::default(),
        };
        let fetched = match grpc
            .await_database(db_name.clone(), committed_revision)
            .await
        {
            Ok(db) => db,
            Err(err) => {
                return Err(match err {
                    crate::Error::GrpcStatus(ref status) => StateChangeError::Database(
                        crate::status_mapping::to_database_error(status, &db_name),
                    ),
                    _ => StateChangeError::Database(DatabaseError::ServerError(err.to_string())),
                });
            }
        };

        match DatabaseAtRevisionImpl::new(grpc, fetched) {
            Ok(snapshot) => Ok(snapshot),
            Err(e) => Err(StateChangeError::Database(DatabaseError::ServerError(
                format!("failed to parse database: {}", e),
            ))),
        }
    }

    /// Submits the state change through the client's
    /// `execute_state_change_async_with_options` call and returns the
    /// correlation id from the response.
    ///
    /// The revision currently held in the connection's local metadata is
    /// passed along with the call.
    ///
    /// # Errors
    ///
    /// gRPC status failures are mapped with
    /// `status_mapping::to_state_change_error`; other client errors become
    /// [`StateChangeError::ServerError`].
    pub async fn execute_async(self) -> Result<CorrelationId, StateChangeError> {
        let Self {
            connection,
            name: change_name,
            version,
            request,
            transaction_id,
            correlation_id,
            causation_id,
        } = self;

        let mut grpc = connection.inner.client.clone();
        let db_name = connection.inner.database_name.to_string();
        let known_revision = connection.metadata_rx.borrow().revision;
        let request = request.unwrap_or_default();
        let headers = request
            .headers
            .into_iter()
            .map(|(key, value)| proto::Header { key, value })
            .collect();
        let wire_request = proto::CommandRequest {
            headers,
            body: request.body,
            content_type: request.content_type,
            content_schema: request.content_schema,
        };

        let submitted = grpc
            .execute_state_change_async_with_options(
                db_name,
                change_name.clone(),
                version,
                Some(known_revision),
                wire_request,
                transaction_id,
                correlation_id,
                causation_id,
            )
            .await;
        match submitted {
            Ok(response) => Ok(CorrelationId(response.correlation_id)),
            Err(err) => Err(match err {
                crate::Error::GrpcStatus(ref status) => {
                    crate::status_mapping::to_state_change_error(status, &change_name, version)
                }
                _ => StateChangeError::ServerError(err.to_string()),
            }),
        }
    }
}