1use std::sync::Arc;
4use std::time::Duration;
5
6use chrono::{DateTime, Utc};
7use futures::stream::{self, StreamExt};
8use futures::Stream;
9use nonempty::NonEmpty;
10use tokio::sync::{watch, Mutex};
11use tokio::task::JoinHandle;
12
13use evidentsource_core::domain::{
14 AppendCondition, CommandRequest, DatabaseError, DatabaseName, ProspectiveEvent, Revision,
15 StateChangeError, StateChangeName, StateChangeVersion, Transaction, TransactionError,
16 TransactionSummary,
17};
18use evidentsource_core::{DatabaseConnection, DatabaseIdentity, DatabaseProvider};
19
20use crate::com::evidentsource as proto;
21use crate::conversions::{timestamp_to_datetime, ConversionError};
22use crate::database::DatabaseAtRevisionImpl;
23use crate::io::cloudevents::v1 as proto_ce;
24use crate::EvidentSourceClient;
25
26#[derive(Clone, Debug)]
28pub struct DatabaseMetadata {
29 pub name: DatabaseName,
31 pub created_at: DateTime<Utc>,
33 pub revision: u64,
35 pub revision_timestamp: DateTime<Utc>,
37}
38
39impl DatabaseMetadata {
40 fn from_proto(proto: proto::Database) -> Result<Self, ConversionError> {
42 let name = DatabaseName::new(&proto.name)?;
43 let created_at = proto
44 .created_at
45 .ok_or_else(|| ConversionError::missing_field("Database", "created_at"))
46 .and_then(timestamp_to_datetime)?;
47 let revision_timestamp = proto
48 .revision_timestamp
49 .ok_or_else(|| ConversionError::missing_field("Database", "revision_timestamp"))
50 .and_then(timestamp_to_datetime)?;
51
52 Ok(Self {
53 name,
54 created_at,
55 revision: proto.revision,
56 revision_timestamp,
57 })
58 }
59}
60
61struct ConnectionInner {
63 client: EvidentSourceClient,
65 database_name: DatabaseName,
67 metadata_tx: watch::Sender<DatabaseMetadata>,
69 subscription_handle: Mutex<Option<JoinHandle<()>>>,
71 shutdown_tx: watch::Sender<bool>,
73}
74
75impl Drop for ConnectionInner {
76 fn drop(&mut self) {
77 let _ = self.shutdown_tx.send(true);
79 }
80}
81
82#[derive(Clone)]
99pub struct Connection {
100 inner: Arc<ConnectionInner>,
102 metadata_rx: watch::Receiver<DatabaseMetadata>,
105 shutdown_rx: watch::Receiver<bool>,
107}
108
109impl Connection {
110 pub async fn new(
115 client: EvidentSourceClient,
116 database_name: DatabaseName,
117 ) -> Result<Self, DatabaseError> {
118 let mut client_clone = client.clone();
119
120 let initial_db = client_clone
122 .fetch_latest_database(database_name.to_string())
123 .await
124 .map_err(|_| DatabaseError::NotFound(database_name.to_string()))?;
125
126 let initial_metadata = DatabaseMetadata::from_proto(initial_db)
127 .map_err(|e| DatabaseError::ServerError(format!("failed to parse metadata: {}", e)))?;
128
129 let (metadata_tx, metadata_rx) = watch::channel(initial_metadata);
131
132 let (shutdown_tx, shutdown_rx) = watch::channel(false);
134
135 let inner = Arc::new(ConnectionInner {
136 client,
137 database_name: database_name.clone(),
138 metadata_tx,
139 subscription_handle: Mutex::new(None),
140 shutdown_tx,
141 });
142
143 let connection = Connection {
144 inner,
145 metadata_rx,
146 shutdown_rx,
147 };
148
149 connection.spawn_subscription_task().await;
151
152 Ok(connection)
153 }
154
155 async fn spawn_subscription_task(&self) {
157 let inner = self.inner.clone();
158 let db_name = self.inner.database_name.to_string();
159 let mut shutdown_rx = self.shutdown_rx.clone();
160
161 let handle = tokio::spawn(async move {
162 let mut backoff = Duration::from_millis(100);
163 let max_backoff = Duration::from_secs(30);
164
165 loop {
166 let stream_result = {
168 let mut client = inner.client.clone();
169 client.subscribe_database_updates(db_name.clone()).await
170 };
171
172 match stream_result {
173 Ok(mut stream) => {
174 backoff = Duration::from_millis(100); loop {
178 tokio::select! {
179 _ = shutdown_rx.changed() => {
181 if *shutdown_rx.borrow() {
182 tracing::debug!("Subscription task shutting down");
183 return;
184 }
185 }
186
187 update = stream.next() => {
189 match update {
190 Some(Ok(reply)) => {
191 if let Some(db) = reply.database {
192 match DatabaseMetadata::from_proto(db) {
193 Ok(metadata) => {
194 let _ = inner.metadata_tx.send(metadata);
196 }
197 Err(e) => {
198 tracing::warn!("Failed to parse metadata: {}", e);
199 }
200 }
201 }
202 }
203 Some(Err(status)) => {
204 tracing::warn!("Stream error: {}", status);
205 break; }
207 None => {
208 tracing::debug!("Stream ended");
209 break; }
211 }
212 }
213 }
214 }
215 }
216 Err(e) => {
217 tracing::warn!("Failed to subscribe: {}", e);
218 }
219 }
220
221 if *shutdown_rx.borrow() {
223 return;
224 }
225
226 tracing::debug!("Reconnecting in {:?}", backoff);
228 tokio::time::sleep(backoff).await;
229 backoff = (backoff * 2).min(max_backoff);
230 }
231 });
232
233 let mut guard = self.inner.subscription_handle.lock().await;
235 *guard = Some(handle);
236 }
237
238 pub async fn close(self) -> Result<(), DatabaseError> {
240 let _ = self.inner.shutdown_tx.send(true);
242
243 if let Some(handle) = self.inner.subscription_handle.lock().await.take() {
245 match tokio::time::timeout(Duration::from_secs(5), handle).await {
247 Ok(Ok(())) => Ok(()),
248 Ok(Err(e)) => Err(DatabaseError::ServerError(format!(
249 "subscription task panicked: {}",
250 e
251 ))),
252 Err(_) => Err(DatabaseError::Timeout),
253 }
254 } else {
255 Ok(())
256 }
257 }
258
259 fn current_metadata(&self) -> DatabaseMetadata {
261 self.metadata_rx.borrow().clone()
262 }
263
264 fn snapshot_from_metadata(&self, metadata: &DatabaseMetadata) -> DatabaseAtRevisionImpl {
266 DatabaseAtRevisionImpl::at_revision_with_metadata(
267 self.inner.client.clone(),
268 metadata.name.clone(),
269 metadata.created_at,
270 metadata.revision,
271 metadata.revision_timestamp,
272 )
273 }
274}
275
276impl std::fmt::Debug for Connection {
277 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278 f.debug_struct("Connection")
279 .field("database_name", &self.inner.database_name)
280 .field("current_revision", &self.metadata_rx.borrow().revision)
281 .finish()
282 }
283}
284
285impl DatabaseIdentity for Connection {
286 fn name(&self) -> &DatabaseName {
287 &self.inner.database_name
288 }
289
290 fn created_at(&self) -> DateTime<Utc> {
291 self.metadata_rx.borrow().created_at
292 }
293}
294
295impl DatabaseProvider for Connection {
296 type AtRevision = DatabaseAtRevisionImpl;
297
298 fn local_database(&self) -> Self::AtRevision {
299 let metadata = self.current_metadata();
300 self.snapshot_from_metadata(&metadata)
301 }
302
303 fn latest_database(
304 &self,
305 ) -> impl std::future::Future<Output = Result<Self::AtRevision, DatabaseError>> {
306 let mut client = self.inner.client.clone();
307 let database_name = self.inner.database_name.to_string();
308
309 async move {
310 let proto_db = client
311 .fetch_latest_database(database_name.clone())
312 .await
313 .map_err(|e| match e {
314 crate::Error::GrpcStatus(ref status) => {
315 crate::status_mapping::to_database_error(status, &database_name)
316 }
317 _ => DatabaseError::ServerError(e.to_string()),
318 })?;
319
320 DatabaseAtRevisionImpl::new(client, proto_db)
321 .map_err(|e| DatabaseError::ServerError(format!("failed to parse database: {}", e)))
322 }
323 }
324
325 fn database_at_revision(
326 &self,
327 revision: u64,
328 ) -> impl std::future::Future<Output = Result<Self::AtRevision, DatabaseError>> {
329 let mut client = self.inner.client.clone();
330 let database_name = self.inner.database_name.to_string();
331
332 async move {
333 let proto_db = client
334 .await_database(database_name.clone(), revision)
335 .await
336 .map_err(|e| match e {
337 crate::Error::GrpcStatus(ref status) => {
338 crate::status_mapping::to_database_error(status, &database_name)
339 }
340 _ => DatabaseError::ServerError(e.to_string()),
341 })?;
342
343 DatabaseAtRevisionImpl::new(client, proto_db)
344 .map_err(|e| DatabaseError::ServerError(format!("failed to parse database: {}", e)))
345 }
346 }
347
348 fn database_at_timestamp(
349 &self,
350 revision_timestamp: DateTime<Utc>,
351 ) -> impl std::future::Future<Output = Result<Self::AtRevision, DatabaseError>> {
352 use crate::conversions::datetime_to_timestamp;
353
354 let mut client = self.inner.client.clone();
355 let database_name = self.inner.database_name.to_string();
356 let ts = datetime_to_timestamp(revision_timestamp);
357
358 async move {
359 let proto_db = client
360 .database_effective_at_timestamp(database_name.clone(), ts)
361 .await
362 .map_err(|e| match e {
363 crate::Error::GrpcStatus(ref status) => {
364 crate::status_mapping::to_database_error(status, &database_name)
365 }
366 _ => DatabaseError::ServerError(e.to_string()),
367 })?;
368
369 DatabaseAtRevisionImpl::new(client, proto_db)
370 .map_err(|e| DatabaseError::ServerError(format!("failed to parse database: {}", e)))
371 }
372 }
373}
374
375impl DatabaseConnection for Connection {
376 fn transact(
377 &self,
378 events: NonEmpty<ProspectiveEvent>,
379 conditions: Vec<AppendCondition>,
380 ) -> impl std::future::Future<Output = Result<Self::AtRevision, DatabaseError>> {
381 let mut client = self.inner.client.clone();
382 let database_name = self.inner.database_name.to_string();
383
384 let proto_events: Vec<proto_ce::CloudEvent> =
386 events.into_iter().map(|pe| pe.into()).collect();
387 let proto_conditions: Vec<proto::AppendCondition> =
388 conditions.into_iter().map(|c| c.into()).collect();
389 let transaction_id = uuid::Uuid::new_v4().to_string();
390
391 async move {
392 let result = client
393 .transact(
394 transaction_id,
395 database_name.clone(),
396 proto_events,
397 proto_conditions,
398 )
399 .await
400 .map_err(|e| match e {
401 crate::Error::GrpcStatus(ref status) => {
402 crate::status_mapping::to_database_error(status, &database_name)
403 }
404 _ => DatabaseError::ServerError(e.to_string()),
405 })?;
406
407 let new_revision = result
409 .transaction_summary
410 .map(|s| s.revision)
411 .unwrap_or_default();
412
413 let proto_db = client
415 .await_database(database_name.clone(), new_revision)
416 .await
417 .map_err(|e| match e {
418 crate::Error::GrpcStatus(ref status) => {
419 crate::status_mapping::to_database_error(status, &database_name)
420 }
421 _ => DatabaseError::ServerError(e.to_string()),
422 })?;
423
424 DatabaseAtRevisionImpl::new(client, proto_db)
425 .map_err(|e| DatabaseError::ServerError(format!("failed to parse database: {}", e)))
426 }
427 }
428
429 fn execute_state_change(
430 &self,
431 name: &StateChangeName,
432 version: StateChangeVersion,
433 request: CommandRequest,
434 ) -> impl std::future::Future<Output = Result<Self::AtRevision, StateChangeError>> {
435 let mut client = self.inner.client.clone();
436 let database_name = self.inner.database_name.to_string();
437 let state_change_name = name.to_string();
438 let current_revision = self.current_metadata().revision;
439
440 let proto_request = proto::CommandRequest {
442 headers: request
443 .headers
444 .into_iter()
445 .map(|(k, v)| proto::Header { key: k, value: v })
446 .collect(),
447 body: request.body,
448 content_type: request.content_type,
449 content_schema: request.content_schema,
450 };
451
452 async move {
453 let result = client
454 .execute_state_change(
455 database_name.clone(),
456 state_change_name.clone(),
457 version,
458 Some(current_revision),
459 proto_request,
460 None,
461 )
462 .await
463 .map_err(|e| match e {
464 crate::Error::GrpcStatus(ref status) => {
465 crate::status_mapping::to_state_change_error(
466 status,
467 &state_change_name,
468 version,
469 )
470 }
471 _ => StateChangeError::ServerError(e.to_string()),
472 })?;
473
474 let new_revision = result
476 .transaction_summary
477 .map(|s| s.revision)
478 .unwrap_or_default();
479
480 let proto_db = client
482 .await_database(database_name.clone(), new_revision)
483 .await
484 .map_err(|e| match e {
485 crate::Error::GrpcStatus(ref status) => StateChangeError::Database(
486 crate::status_mapping::to_database_error(status, &database_name),
487 ),
488 _ => StateChangeError::Database(DatabaseError::ServerError(e.to_string())),
489 })?;
490
491 DatabaseAtRevisionImpl::new(client, proto_db).map_err(|e| {
492 StateChangeError::Database(DatabaseError::ServerError(format!(
493 "failed to parse database: {}",
494 e
495 )))
496 })
497 }
498 }
499
500 fn log(&self) -> impl Stream<Item = TransactionSummary> {
501 let mut client = self.inner.client.clone();
502 let database_name = self.inner.database_name.to_string();
503
504 stream::once(async move {
505 let result = client.scan_database_log(database_name, 0, false).await;
506
507 match result {
508 Ok(response_stream) => response_stream
509 .filter_map(|result| async move {
510 match result {
511 Ok(reply) => {
512 if let Some(proto::database_log_reply::Transaction::Summary(
513 summary,
514 )) = reply.transaction
515 {
516 Some(TransactionSummary::from(summary))
517 } else {
518 None
519 }
520 }
521 Err(_) => None,
522 }
523 })
524 .boxed(),
525 Err(_) => stream::empty().boxed(),
526 }
527 })
528 .flatten()
529 }
530
531 fn log_detail(&self) -> impl Stream<Item = Transaction> {
532 let mut client = self.inner.client.clone();
533 let database_name = self.inner.database_name.to_string();
534
535 stream::once(async move {
536 let result = client.scan_database_log(database_name, 0, true).await;
537
538 match result {
539 Ok(response_stream) => response_stream
540 .filter_map(|result| async move {
541 match result {
542 Ok(reply) => {
543 if let Some(proto::database_log_reply::Transaction::Detail(txn)) =
544 reply.transaction
545 {
546 Transaction::try_from(txn).ok()
547 } else {
548 None
549 }
550 }
551 Err(_) => None,
552 }
553 })
554 .boxed(),
555 Err(_) => stream::empty().boxed(),
556 }
557 })
558 .flatten()
559 }
560
561 fn transact_with_id(
562 &self,
563 transaction_id: &str,
564 events: NonEmpty<ProspectiveEvent>,
565 conditions: Vec<AppendCondition>,
566 ) -> impl std::future::Future<Output = Result<Self::AtRevision, DatabaseError>> {
567 let mut client = self.inner.client.clone();
568 let database_name = self.inner.database_name.to_string();
569 let transaction_id = transaction_id.to_string();
570
571 let proto_events: Vec<proto_ce::CloudEvent> =
573 events.into_iter().map(|pe| pe.into()).collect();
574 let proto_conditions: Vec<proto::AppendCondition> =
575 conditions.into_iter().map(|c| c.into()).collect();
576
577 async move {
578 let result = client
579 .transact(
580 transaction_id,
581 database_name.clone(),
582 proto_events,
583 proto_conditions,
584 )
585 .await
586 .map_err(|e| match e {
587 crate::Error::GrpcStatus(ref status) => {
588 crate::status_mapping::to_database_error(status, &database_name)
589 }
590 _ => DatabaseError::ServerError(e.to_string()),
591 })?;
592
593 let new_revision = result
595 .transaction_summary
596 .map(|s| s.revision)
597 .unwrap_or_default();
598
599 let proto_db = client
601 .await_database(database_name.clone(), new_revision)
602 .await
603 .map_err(|e| match e {
604 crate::Error::GrpcStatus(ref status) => {
605 crate::status_mapping::to_database_error(status, &database_name)
606 }
607 _ => DatabaseError::ServerError(e.to_string()),
608 })?;
609
610 DatabaseAtRevisionImpl::new(client, proto_db)
611 .map_err(|e| DatabaseError::ServerError(format!("failed to parse database: {}", e)))
612 }
613 }
614
615 fn log_from(&self, from_revision: Revision) -> impl Stream<Item = TransactionSummary> {
616 let mut client = self.inner.client.clone();
617 let database_name = self.inner.database_name.to_string();
618
619 stream::once(async move {
620 let result = client
621 .scan_database_log(database_name, from_revision, false)
622 .await;
623
624 match result {
625 Ok(response_stream) => response_stream
626 .filter_map(|result| async move {
627 match result {
628 Ok(reply) => {
629 if let Some(proto::database_log_reply::Transaction::Summary(
630 summary,
631 )) = reply.transaction
632 {
633 Some(TransactionSummary::from(summary))
634 } else {
635 None
636 }
637 }
638 Err(_) => None,
639 }
640 })
641 .boxed(),
642 Err(_) => stream::empty().boxed(),
643 }
644 })
645 .flatten()
646 }
647
648 fn log_detail_from(&self, from_revision: Revision) -> impl Stream<Item = Transaction> {
649 let mut client = self.inner.client.clone();
650 let database_name = self.inner.database_name.to_string();
651
652 stream::once(async move {
653 let result = client
654 .scan_database_log(database_name, from_revision, true)
655 .await;
656
657 match result {
658 Ok(response_stream) => response_stream
659 .filter_map(|result| async move {
660 match result {
661 Ok(reply) => {
662 if let Some(proto::database_log_reply::Transaction::Detail(txn)) =
663 reply.transaction
664 {
665 Transaction::try_from(txn).ok()
666 } else {
667 None
668 }
669 }
670 Err(_) => None,
671 }
672 })
673 .boxed(),
674 Err(_) => stream::empty().boxed(),
675 }
676 })
677 .flatten()
678 }
679}
680
681impl Connection {
686 pub fn list_state_changes(
699 &self,
700 ) -> impl Stream<Item = evidentsource_core::domain::StateChangeDefinitionSummary> {
701 use evidentsource_core::domain::{StateChangeDefinitionSummary, StateChangeName};
702
703 let mut client = self.inner.client.clone();
704 let database_name = self.inner.database_name.to_string();
705
706 stream::once(async move {
707 let result = client.list_state_changes(database_name).await;
708
709 match result {
710 Ok(response_stream) => response_stream
711 .filter_map(|result| async move {
712 match result {
713 Ok(reply) => StateChangeName::new(&reply.name).ok().map(|name| {
714 StateChangeDefinitionSummary {
715 name,
716 version: reply.version,
717 }
718 }),
719 Err(_) => None,
720 }
721 })
722 .boxed(),
723 Err(_) => stream::empty().boxed(),
724 }
725 })
726 .flatten()
727 }
728
729 pub async fn fetch_transaction_by_id(
738 &self,
739 transaction_id: &str,
740 ) -> Result<Transaction, DatabaseError> {
741 let mut client = self.inner.client.clone();
742 let database_name = self.inner.database_name.to_string();
743
744 let reply = client
745 .fetch_transaction_by_id(database_name.clone(), transaction_id.to_string())
746 .await
747 .map_err(|e| match e {
748 crate::Error::GrpcStatus(ref status) => {
749 crate::status_mapping::to_database_error(status, &database_name)
750 }
751 _ => DatabaseError::ServerError(e.to_string()),
752 })?;
753
754 reply
755 .transaction
756 .ok_or_else(|| {
757 DatabaseError::ServerError("missing transaction in response".to_string())
758 })
759 .and_then(|txn| {
760 Transaction::try_from(txn)
761 .map_err(|e| DatabaseError::ServerError(format!("invalid transaction: {}", e)))
762 })
763 }
764
765 pub fn client(&self) -> &EvidentSourceClient {
770 &self.inner.client
771 }
772
773 pub fn transaction(&self) -> TransactionBuilder {
786 TransactionBuilder::new(self.clone())
787 }
788
789 pub fn state_change(&self, name: &str, version: StateChangeVersion) -> StateChangeBuilder {
801 StateChangeBuilder::new(self.clone(), name.to_string(), version)
802 }
803}
804
/// Newtype around the correlation ID string returned by the asynchronous submission calls.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct CorrelationId(pub String);

impl std::fmt::Display for CorrelationId {
    /// Writes the wrapped string verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.0.as_str())
    }
}

impl From<String> for CorrelationId {
    /// Wraps an owned string without copying it.
    fn from(value: String) -> Self {
        CorrelationId(value)
    }
}
827
828pub trait DatabaseConnectionAsync {
833 fn transact_async(
837 &self,
838 events: NonEmpty<ProspectiveEvent>,
839 conditions: Vec<AppendCondition>,
840 ) -> impl std::future::Future<Output = Result<CorrelationId, DatabaseError>>;
841
842 fn transact_async_with_id(
844 &self,
845 transaction_id: &str,
846 events: NonEmpty<ProspectiveEvent>,
847 conditions: Vec<AppendCondition>,
848 ) -> impl std::future::Future<Output = Result<CorrelationId, DatabaseError>>;
849
850 fn execute_state_change_async(
854 &self,
855 name: &StateChangeName,
856 version: StateChangeVersion,
857 request: CommandRequest,
858 ) -> impl std::future::Future<Output = Result<CorrelationId, StateChangeError>>;
859}
860
861impl DatabaseConnectionAsync for Connection {
862 fn transact_async(
863 &self,
864 events: NonEmpty<ProspectiveEvent>,
865 conditions: Vec<AppendCondition>,
866 ) -> impl std::future::Future<Output = Result<CorrelationId, DatabaseError>> {
867 let mut client = self.inner.client.clone();
868 let database_name = self.inner.database_name.to_string();
869 let transaction_id = uuid::Uuid::new_v4().to_string();
870
871 let proto_events: Vec<proto_ce::CloudEvent> =
873 events.into_iter().map(|pe| pe.into()).collect();
874 let proto_conditions: Vec<proto::AppendCondition> =
875 conditions.into_iter().map(|c| c.into()).collect();
876
877 async move {
878 let response = client
879 .transact_async(
880 transaction_id,
881 database_name.clone(),
882 proto_events,
883 proto_conditions,
884 )
885 .await
886 .map_err(|e| match e {
887 crate::Error::GrpcStatus(ref status) => {
888 crate::status_mapping::to_database_error(status, &database_name)
889 }
890 _ => DatabaseError::ServerError(e.to_string()),
891 })?;
892
893 Ok(CorrelationId(response.correlation_id))
894 }
895 }
896
897 fn transact_async_with_id(
898 &self,
899 transaction_id: &str,
900 events: NonEmpty<ProspectiveEvent>,
901 conditions: Vec<AppendCondition>,
902 ) -> impl std::future::Future<Output = Result<CorrelationId, DatabaseError>> {
903 let mut client = self.inner.client.clone();
904 let database_name = self.inner.database_name.to_string();
905 let transaction_id = transaction_id.to_string();
906
907 let proto_events: Vec<proto_ce::CloudEvent> =
909 events.into_iter().map(|pe| pe.into()).collect();
910 let proto_conditions: Vec<proto::AppendCondition> =
911 conditions.into_iter().map(|c| c.into()).collect();
912
913 async move {
914 let response = client
915 .transact_async(
916 transaction_id,
917 database_name.clone(),
918 proto_events,
919 proto_conditions,
920 )
921 .await
922 .map_err(|e| match e {
923 crate::Error::GrpcStatus(ref status) => {
924 crate::status_mapping::to_database_error(status, &database_name)
925 }
926 _ => DatabaseError::ServerError(e.to_string()),
927 })?;
928
929 Ok(CorrelationId(response.correlation_id))
930 }
931 }
932
933 fn execute_state_change_async(
934 &self,
935 name: &StateChangeName,
936 version: StateChangeVersion,
937 request: CommandRequest,
938 ) -> impl std::future::Future<Output = Result<CorrelationId, StateChangeError>> {
939 let mut client = self.inner.client.clone();
940 let database_name = self.inner.database_name.to_string();
941 let state_change_name = name.to_string();
942 let current_revision = self.current_metadata().revision;
943
944 let proto_request = proto::CommandRequest {
946 headers: request
947 .headers
948 .into_iter()
949 .map(|(k, v)| proto::Header { key: k, value: v })
950 .collect(),
951 body: request.body,
952 content_type: request.content_type,
953 content_schema: request.content_schema,
954 };
955
956 async move {
957 let response = client
958 .execute_state_change_async(
959 database_name,
960 state_change_name.clone(),
961 version,
962 Some(current_revision),
963 proto_request,
964 None,
965 )
966 .await
967 .map_err(|e| match e {
968 crate::Error::GrpcStatus(ref status) => {
969 crate::status_mapping::to_state_change_error(
970 status,
971 &state_change_name,
972 version,
973 )
974 }
975 _ => StateChangeError::ServerError(e.to_string()),
976 })?;
977
978 Ok(CorrelationId(response.correlation_id))
979 }
980 }
981}
982
983pub struct TransactionBuilder {
1002 connection: Connection,
1003 events: Vec<ProspectiveEvent>,
1004 conditions: Vec<AppendCondition>,
1005 transaction_id: Option<String>,
1006 correlation_id: Option<String>,
1007 causation_id: Option<String>,
1008}
1009
1010impl TransactionBuilder {
1011 pub fn new(connection: Connection) -> Self {
1013 Self {
1014 connection,
1015 events: Vec::new(),
1016 conditions: Vec::new(),
1017 transaction_id: None,
1018 correlation_id: None,
1019 causation_id: None,
1020 }
1021 }
1022
1023 pub fn event(mut self, event: ProspectiveEvent) -> Self {
1025 self.events.push(event);
1026 self
1027 }
1028
1029 pub fn events(mut self, events: impl IntoIterator<Item = ProspectiveEvent>) -> Self {
1031 self.events.extend(events);
1032 self
1033 }
1034
1035 pub fn condition(mut self, condition: AppendCondition) -> Self {
1037 self.conditions.push(condition);
1038 self
1039 }
1040
1041 pub fn conditions(mut self, conditions: impl IntoIterator<Item = AppendCondition>) -> Self {
1043 self.conditions.extend(conditions);
1044 self
1045 }
1046
1047 pub fn require_not_exists(self, selector: evidentsource_core::domain::EventSelector) -> Self {
1049 self.condition(AppendCondition::must_not_exist(selector))
1050 }
1051
1052 pub fn require_exists(self, selector: evidentsource_core::domain::EventSelector) -> Self {
1054 self.condition(AppendCondition::must_exist(selector))
1055 }
1056
1057 pub fn with_transaction_id(mut self, transaction_id: impl Into<String>) -> Self {
1059 self.transaction_id = Some(transaction_id.into());
1060 self
1061 }
1062
1063 pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
1067 self.correlation_id = Some(correlation_id.into());
1068 self
1069 }
1070
1071 pub fn with_causation_id(mut self, causation_id: impl Into<String>) -> Self {
1075 self.causation_id = Some(causation_id.into());
1076 self
1077 }
1078
1079 pub async fn commit(self) -> Result<DatabaseAtRevisionImpl, DatabaseError> {
1083 let events = NonEmpty::from_vec(self.events)
1084 .ok_or(DatabaseError::Transaction(TransactionError::Empty))?;
1085
1086 let mut client = self.connection.inner.client.clone();
1087 let database_name = self.connection.inner.database_name.to_string();
1088 let transaction_id = self
1089 .transaction_id
1090 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1091
1092 let proto_events: Vec<proto_ce::CloudEvent> =
1094 events.into_iter().map(|pe| pe.into()).collect();
1095 let proto_conditions: Vec<proto::AppendCondition> =
1096 self.conditions.into_iter().map(|c| c.into()).collect();
1097
1098 let result = client
1099 .transact_with_options(
1100 transaction_id,
1101 database_name.clone(),
1102 proto_events,
1103 proto_conditions,
1104 self.correlation_id,
1105 self.causation_id,
1106 )
1107 .await
1108 .map_err(|e| match e {
1109 crate::Error::GrpcStatus(ref status) => {
1110 crate::status_mapping::to_database_error(status, &database_name)
1111 }
1112 _ => DatabaseError::ServerError(e.to_string()),
1113 })?;
1114
1115 let new_revision = result
1117 .transaction_summary
1118 .map(|s| s.revision)
1119 .unwrap_or_default();
1120
1121 let proto_db = client
1123 .await_database(database_name.clone(), new_revision)
1124 .await
1125 .map_err(|e| match e {
1126 crate::Error::GrpcStatus(ref status) => {
1127 crate::status_mapping::to_database_error(status, &database_name)
1128 }
1129 _ => DatabaseError::ServerError(e.to_string()),
1130 })?;
1131
1132 DatabaseAtRevisionImpl::new(client, proto_db)
1133 .map_err(|e| DatabaseError::ServerError(format!("failed to parse database: {}", e)))
1134 }
1135
1136 pub async fn commit_async(self) -> Result<CorrelationId, DatabaseError> {
1140 let events = NonEmpty::from_vec(self.events)
1141 .ok_or(DatabaseError::Transaction(TransactionError::Empty))?;
1142
1143 let mut client = self.connection.inner.client.clone();
1144 let database_name = self.connection.inner.database_name.to_string();
1145 let transaction_id = self
1146 .transaction_id
1147 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1148
1149 let proto_events: Vec<proto_ce::CloudEvent> =
1151 events.into_iter().map(|pe| pe.into()).collect();
1152 let proto_conditions: Vec<proto::AppendCondition> =
1153 self.conditions.into_iter().map(|c| c.into()).collect();
1154
1155 let response = client
1156 .transact_async_with_options(
1157 transaction_id,
1158 database_name.clone(),
1159 proto_events,
1160 proto_conditions,
1161 self.correlation_id,
1162 self.causation_id,
1163 )
1164 .await
1165 .map_err(|e| match e {
1166 crate::Error::GrpcStatus(ref status) => {
1167 crate::status_mapping::to_database_error(status, &database_name)
1168 }
1169 _ => DatabaseError::ServerError(e.to_string()),
1170 })?;
1171
1172 Ok(CorrelationId(response.correlation_id))
1173 }
1174}
1175
1176pub struct StateChangeBuilder {
1193 connection: Connection,
1194 name: String,
1195 version: StateChangeVersion,
1196 request: Option<CommandRequest>,
1197 transaction_id: Option<String>,
1198 correlation_id: Option<String>,
1199 causation_id: Option<String>,
1200}
1201
1202impl StateChangeBuilder {
1203 pub fn new(connection: Connection, name: String, version: StateChangeVersion) -> Self {
1205 Self {
1206 connection,
1207 name,
1208 version,
1209 request: None,
1210 transaction_id: None,
1211 correlation_id: None,
1212 causation_id: None,
1213 }
1214 }
1215
1216 pub fn json<T: ::serde::Serialize>(
1218 mut self,
1219 value: &T,
1220 ) -> Result<Self, evidentsource_core::domain::CommandRequestError> {
1221 self.request = Some(CommandRequest::json(value)?);
1222 Ok(self)
1223 }
1224
1225 pub fn request(mut self, request: CommandRequest) -> Self {
1227 self.request = Some(request);
1228 self
1229 }
1230
1231 pub fn with_transaction_id(mut self, transaction_id: impl Into<String>) -> Self {
1233 self.transaction_id = Some(transaction_id.into());
1234 self
1235 }
1236
1237 pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
1241 self.correlation_id = Some(correlation_id.into());
1242 self
1243 }
1244
1245 pub fn with_causation_id(mut self, causation_id: impl Into<String>) -> Self {
1249 self.causation_id = Some(causation_id.into());
1250 self
1251 }
1252
1253 pub fn content_type(mut self, content_type: impl Into<String>) -> Self {
1268 let request = self.request.take().unwrap_or_default();
1269 self.request = Some(request.content_type(content_type));
1270 self
1271 }
1272
1273 pub fn content_schema(mut self, schema_url: impl Into<String>) -> Self {
1288 let request = self.request.take().unwrap_or_default();
1289 self.request = Some(request.content_schema(schema_url));
1290 self
1291 }
1292
1293 pub async fn execute(self) -> Result<DatabaseAtRevisionImpl, StateChangeError> {
1295 let mut client = self.connection.inner.client.clone();
1296 let database_name = self.connection.inner.database_name.to_string();
1297 let state_change_name = self.name.clone();
1298 let current_revision = self.connection.current_metadata().revision;
1299 let request = self.request.unwrap_or_default();
1300
1301 let proto_request = proto::CommandRequest {
1303 headers: request
1304 .headers
1305 .into_iter()
1306 .map(|(k, v)| proto::Header { key: k, value: v })
1307 .collect(),
1308 body: request.body,
1309 content_type: request.content_type,
1310 content_schema: request.content_schema,
1311 };
1312
1313 let result = client
1314 .execute_state_change_with_options(
1315 database_name.clone(),
1316 state_change_name.clone(),
1317 self.version,
1318 Some(current_revision),
1319 proto_request,
1320 self.transaction_id,
1321 self.correlation_id,
1322 self.causation_id,
1323 )
1324 .await
1325 .map_err(|e| match e {
1326 crate::Error::GrpcStatus(ref status) => {
1327 crate::status_mapping::to_state_change_error(
1328 status,
1329 &state_change_name,
1330 self.version,
1331 )
1332 }
1333 _ => StateChangeError::ServerError(e.to_string()),
1334 })?;
1335
1336 let new_revision = result
1338 .transaction_summary
1339 .map(|s| s.revision)
1340 .unwrap_or_default();
1341
1342 let proto_db = client
1344 .await_database(database_name.clone(), new_revision)
1345 .await
1346 .map_err(|e| match e {
1347 crate::Error::GrpcStatus(ref status) => StateChangeError::Database(
1348 crate::status_mapping::to_database_error(status, &database_name),
1349 ),
1350 _ => StateChangeError::Database(DatabaseError::ServerError(e.to_string())),
1351 })?;
1352
1353 DatabaseAtRevisionImpl::new(client, proto_db).map_err(|e| {
1354 StateChangeError::Database(DatabaseError::ServerError(format!(
1355 "failed to parse database: {}",
1356 e
1357 )))
1358 })
1359 }
1360
1361 pub async fn execute_async(self) -> Result<CorrelationId, StateChangeError> {
1363 let mut client = self.connection.inner.client.clone();
1364 let database_name = self.connection.inner.database_name.to_string();
1365 let state_change_name = self.name.clone();
1366 let current_revision = self.connection.current_metadata().revision;
1367 let request = self.request.unwrap_or_default();
1368
1369 let proto_request = proto::CommandRequest {
1371 headers: request
1372 .headers
1373 .into_iter()
1374 .map(|(k, v)| proto::Header { key: k, value: v })
1375 .collect(),
1376 body: request.body,
1377 content_type: request.content_type,
1378 content_schema: request.content_schema,
1379 };
1380
1381 let response = client
1382 .execute_state_change_async_with_options(
1383 database_name,
1384 state_change_name.clone(),
1385 self.version,
1386 Some(current_revision),
1387 proto_request,
1388 self.transaction_id,
1389 self.correlation_id,
1390 self.causation_id,
1391 )
1392 .await
1393 .map_err(|e| match e {
1394 crate::Error::GrpcStatus(ref status) => {
1395 crate::status_mapping::to_state_change_error(
1396 status,
1397 &state_change_name,
1398 self.version,
1399 )
1400 }
1401 _ => StateChangeError::ServerError(e.to_string()),
1402 })?;
1403
1404 Ok(CorrelationId(response.correlation_id))
1405 }
1406}