evidentsource_client/
connection.rs

1//! Connection to an EvidentSource database with live metadata updates.
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use chrono::{DateTime, Utc};
7use futures::stream::{self, StreamExt};
8use futures::Stream;
9use nonempty::NonEmpty;
10use tokio::sync::{watch, Mutex};
11use tokio::task::JoinHandle;
12
13use evidentsource_core::domain::{
14    AppendCondition, CommandRequest, DatabaseError, DatabaseName, ProspectiveEvent, Revision,
15    StateChangeError, StateChangeName, StateChangeVersion, Transaction, TransactionError,
16    TransactionSummary,
17};
18use evidentsource_core::{DatabaseConnection, DatabaseIdentity, DatabaseProvider};
19
20use crate::com::evidentsource as proto;
21use crate::conversions::{timestamp_to_datetime, ConversionError};
22use crate::database::DatabaseAtRevisionImpl;
23use crate::io::cloudevents::v1 as proto_ce;
24use crate::EvidentSourceClient;
25
/// Metadata about the database state, updated via subscription.
///
/// A value of this type is what travels through the connection's watch
/// channel; it is built from a `proto::Database` message.
#[derive(Clone, Debug)]
pub struct DatabaseMetadata {
    /// Database name.
    pub name: DatabaseName,
    /// When the database was created.
    pub created_at: DateTime<Utc>,
    /// Revision number this metadata describes.
    pub revision: u64,
    /// When that revision was committed.
    pub revision_timestamp: DateTime<Utc>,
}
38
39impl DatabaseMetadata {
40    /// Create metadata from a proto Database message.
41    fn from_proto(proto: proto::Database) -> Result<Self, ConversionError> {
42        let name = DatabaseName::new(&proto.name)?;
43        let created_at = proto
44            .created_at
45            .ok_or_else(|| ConversionError::missing_field("Database", "created_at"))
46            .and_then(timestamp_to_datetime)?;
47        let revision_timestamp = proto
48            .revision_timestamp
49            .ok_or_else(|| ConversionError::missing_field("Database", "revision_timestamp"))
50            .and_then(timestamp_to_datetime)?;
51
52        Ok(Self {
53            name,
54            created_at,
55            revision: proto.revision,
56            revision_timestamp,
57        })
58    }
59}
60
/// Shared state for the connection, held behind Arc for cloning.
///
/// Every clone of a `Connection` points at the same instance of this struct.
struct ConnectionInner {
    /// The low-level gRPC client.
    client: EvidentSourceClient,
    /// Database name (stored explicitly for guaranteed availability).
    database_name: DatabaseName,
    /// Sender side of the metadata watch channel (used by subscription task).
    metadata_tx: watch::Sender<DatabaseMetadata>,
    /// Handle to the subscription task for lifecycle management.
    /// `None` until a task has been spawned, and again after `close` took it.
    subscription_handle: Mutex<Option<JoinHandle<()>>>,
    /// Shutdown signal sender; `true` asks the subscription task to stop.
    shutdown_tx: watch::Sender<bool>,
}
74
75impl Drop for ConnectionInner {
76    fn drop(&mut self) {
77        // Signal shutdown to subscription task
78        let _ = self.shutdown_tx.send(true);
79    }
80}
81
/// A connection to an EvidentSource database with live metadata updates.
///
/// The connection maintains a background subscription to receive database
/// updates whenever transactions are committed. This enables efficient access
/// to the latest database state without polling.
///
/// Cloning is cheap: clones share the same inner state (client, channels and
/// subscription task) and only duplicate the channel receivers.
///
/// # Example
///
/// ```ignore
/// let es = EvidentSource::connect_to_server("http://localhost:50051").await?;
/// let conn = es.connect(&DatabaseName::new("my-db")?).await?;
///
/// // Get latest database state
/// let db = conn.latest_database().await?;
/// println!("Revision: {}", db.revision());
/// ```
#[derive(Clone)]
pub struct Connection {
    /// Shared inner state (clones share this).
    inner: Arc<ConnectionInner>,
    /// Receiver side of metadata watch channel.
    /// Each clone gets its own receiver but they all see the same updates.
    metadata_rx: watch::Receiver<DatabaseMetadata>,
    /// Receiver for shutdown signal; cloned into the subscription task.
    shutdown_rx: watch::Receiver<bool>,
}
108
109impl Connection {
110    /// Create a new connection to a database.
111    ///
112    /// This fetches the initial database state and starts a background
113    /// subscription to receive updates.
114    pub async fn new(
115        client: EvidentSourceClient,
116        database_name: DatabaseName,
117    ) -> Result<Self, DatabaseError> {
118        let mut client_clone = client.clone();
119
120        // Fetch initial database state
121        let initial_db = client_clone
122            .fetch_latest_database(database_name.to_string())
123            .await
124            .map_err(|_| DatabaseError::NotFound(database_name.to_string()))?;
125
126        let initial_metadata = DatabaseMetadata::from_proto(initial_db)
127            .map_err(|e| DatabaseError::ServerError(format!("failed to parse metadata: {}", e)))?;
128
129        // Create watch channel with initial state
130        let (metadata_tx, metadata_rx) = watch::channel(initial_metadata);
131
132        // Create shutdown signal
133        let (shutdown_tx, shutdown_rx) = watch::channel(false);
134
135        let inner = Arc::new(ConnectionInner {
136            client,
137            database_name: database_name.clone(),
138            metadata_tx,
139            subscription_handle: Mutex::new(None),
140            shutdown_tx,
141        });
142
143        let connection = Connection {
144            inner,
145            metadata_rx,
146            shutdown_rx,
147        };
148
149        // Spawn the subscription task
150        connection.spawn_subscription_task().await;
151
152        Ok(connection)
153    }
154
155    /// Spawn (or respawn) the subscription task.
156    async fn spawn_subscription_task(&self) {
157        let inner = self.inner.clone();
158        let db_name = self.inner.database_name.to_string();
159        let mut shutdown_rx = self.shutdown_rx.clone();
160
161        let handle = tokio::spawn(async move {
162            let mut backoff = Duration::from_millis(100);
163            let max_backoff = Duration::from_secs(30);
164
165            loop {
166                // Try to establish subscription
167                let stream_result = {
168                    let mut client = inner.client.clone();
169                    client.subscribe_database_updates(db_name.clone()).await
170                };
171
172                match stream_result {
173                    Ok(mut stream) => {
174                        backoff = Duration::from_millis(100); // Reset backoff on success
175
176                        // Process updates until error or shutdown
177                        loop {
178                            tokio::select! {
179                                // Check for shutdown signal
180                                _ = shutdown_rx.changed() => {
181                                    if *shutdown_rx.borrow() {
182                                        tracing::debug!("Subscription task shutting down");
183                                        return;
184                                    }
185                                }
186
187                                // Process next update
188                                update = stream.next() => {
189                                    match update {
190                                        Some(Ok(reply)) => {
191                                            if let Some(db) = reply.database {
192                                                match DatabaseMetadata::from_proto(db) {
193                                                    Ok(metadata) => {
194                                                        // Send update - ignore error if all receivers dropped
195                                                        let _ = inner.metadata_tx.send(metadata);
196                                                    }
197                                                    Err(e) => {
198                                                        tracing::warn!("Failed to parse metadata: {}", e);
199                                                    }
200                                                }
201                                            }
202                                        }
203                                        Some(Err(status)) => {
204                                            tracing::warn!("Stream error: {}", status);
205                                            break; // Will reconnect
206                                        }
207                                        None => {
208                                            tracing::debug!("Stream ended");
209                                            break; // Will reconnect
210                                        }
211                                    }
212                                }
213                            }
214                        }
215                    }
216                    Err(e) => {
217                        tracing::warn!("Failed to subscribe: {}", e);
218                    }
219                }
220
221                // Check shutdown before reconnecting
222                if *shutdown_rx.borrow() {
223                    return;
224                }
225
226                // Exponential backoff before retry
227                tracing::debug!("Reconnecting in {:?}", backoff);
228                tokio::time::sleep(backoff).await;
229                backoff = (backoff * 2).min(max_backoff);
230            }
231        });
232
233        // Store the handle
234        let mut guard = self.inner.subscription_handle.lock().await;
235        *guard = Some(handle);
236    }
237
238    /// Explicitly close the connection and wait for cleanup.
239    pub async fn close(self) -> Result<(), DatabaseError> {
240        // Signal shutdown
241        let _ = self.inner.shutdown_tx.send(true);
242
243        // Wait for subscription task to complete
244        if let Some(handle) = self.inner.subscription_handle.lock().await.take() {
245            // Give it a reasonable timeout
246            match tokio::time::timeout(Duration::from_secs(5), handle).await {
247                Ok(Ok(())) => Ok(()),
248                Ok(Err(e)) => Err(DatabaseError::ServerError(format!(
249                    "subscription task panicked: {}",
250                    e
251                ))),
252                Err(_) => Err(DatabaseError::Timeout),
253            }
254        } else {
255            Ok(())
256        }
257    }
258
259    /// Get the current cached metadata.
260    fn current_metadata(&self) -> DatabaseMetadata {
261        self.metadata_rx.borrow().clone()
262    }
263
264    /// Create a database snapshot from the current metadata.
265    fn snapshot_from_metadata(&self, metadata: &DatabaseMetadata) -> DatabaseAtRevisionImpl {
266        DatabaseAtRevisionImpl::at_revision_with_metadata(
267            self.inner.client.clone(),
268            metadata.name.clone(),
269            metadata.created_at,
270            metadata.revision,
271            metadata.revision_timestamp,
272        )
273    }
274}
275
276impl std::fmt::Debug for Connection {
277    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278        f.debug_struct("Connection")
279            .field("database_name", &self.inner.database_name)
280            .field("current_revision", &self.metadata_rx.borrow().revision)
281            .finish()
282    }
283}
284
285impl DatabaseIdentity for Connection {
286    fn name(&self) -> &DatabaseName {
287        &self.inner.database_name
288    }
289
290    fn created_at(&self) -> DateTime<Utc> {
291        self.metadata_rx.borrow().created_at
292    }
293}
294
295impl DatabaseProvider for Connection {
296    type AtRevision = DatabaseAtRevisionImpl;
297
298    fn local_database(&self) -> Self::AtRevision {
299        let metadata = self.current_metadata();
300        self.snapshot_from_metadata(&metadata)
301    }
302
303    fn latest_database(
304        &self,
305    ) -> impl std::future::Future<Output = Result<Self::AtRevision, DatabaseError>> {
306        let mut client = self.inner.client.clone();
307        let database_name = self.inner.database_name.to_string();
308
309        async move {
310            let proto_db = client
311                .fetch_latest_database(database_name.clone())
312                .await
313                .map_err(|e| match e {
314                    crate::Error::GrpcStatus(ref status) => {
315                        crate::status_mapping::to_database_error(status, &database_name)
316                    }
317                    _ => DatabaseError::ServerError(e.to_string()),
318                })?;
319
320            DatabaseAtRevisionImpl::new(client, proto_db)
321                .map_err(|e| DatabaseError::ServerError(format!("failed to parse database: {}", e)))
322        }
323    }
324
325    fn database_at_revision(
326        &self,
327        revision: u64,
328    ) -> impl std::future::Future<Output = Result<Self::AtRevision, DatabaseError>> {
329        let mut client = self.inner.client.clone();
330        let database_name = self.inner.database_name.to_string();
331
332        async move {
333            let proto_db = client
334                .await_database(database_name.clone(), revision)
335                .await
336                .map_err(|e| match e {
337                    crate::Error::GrpcStatus(ref status) => {
338                        crate::status_mapping::to_database_error(status, &database_name)
339                    }
340                    _ => DatabaseError::ServerError(e.to_string()),
341                })?;
342
343            DatabaseAtRevisionImpl::new(client, proto_db)
344                .map_err(|e| DatabaseError::ServerError(format!("failed to parse database: {}", e)))
345        }
346    }
347
348    fn database_at_timestamp(
349        &self,
350        revision_timestamp: DateTime<Utc>,
351    ) -> impl std::future::Future<Output = Result<Self::AtRevision, DatabaseError>> {
352        use crate::conversions::datetime_to_timestamp;
353
354        let mut client = self.inner.client.clone();
355        let database_name = self.inner.database_name.to_string();
356        let ts = datetime_to_timestamp(revision_timestamp);
357
358        async move {
359            let proto_db = client
360                .database_effective_at_timestamp(database_name.clone(), ts)
361                .await
362                .map_err(|e| match e {
363                    crate::Error::GrpcStatus(ref status) => {
364                        crate::status_mapping::to_database_error(status, &database_name)
365                    }
366                    _ => DatabaseError::ServerError(e.to_string()),
367                })?;
368
369            DatabaseAtRevisionImpl::new(client, proto_db)
370                .map_err(|e| DatabaseError::ServerError(format!("failed to parse database: {}", e)))
371        }
372    }
373}
374
375impl DatabaseConnection for Connection {
376    fn transact(
377        &self,
378        events: NonEmpty<ProspectiveEvent>,
379        conditions: Vec<AppendCondition>,
380    ) -> impl std::future::Future<Output = Result<Self::AtRevision, DatabaseError>> {
381        let mut client = self.inner.client.clone();
382        let database_name = self.inner.database_name.to_string();
383
384        // Convert domain types to proto
385        let proto_events: Vec<proto_ce::CloudEvent> =
386            events.into_iter().map(|pe| pe.into()).collect();
387        let proto_conditions: Vec<proto::AppendCondition> =
388            conditions.into_iter().map(|c| c.into()).collect();
389        let transaction_id = uuid::Uuid::new_v4().to_string();
390
391        async move {
392            let result = client
393                .transact(
394                    transaction_id,
395                    database_name.clone(),
396                    proto_events,
397                    proto_conditions,
398                )
399                .await
400                .map_err(|e| match e {
401                    crate::Error::GrpcStatus(ref status) => {
402                        crate::status_mapping::to_database_error(status, &database_name)
403                    }
404                    _ => DatabaseError::ServerError(e.to_string()),
405                })?;
406
407            // Get the new revision from the result
408            let new_revision = result
409                .transaction_summary
410                .map(|s| s.revision)
411                .unwrap_or_default();
412
413            // Await the database at the new revision
414            let proto_db = client
415                .await_database(database_name.clone(), new_revision)
416                .await
417                .map_err(|e| match e {
418                    crate::Error::GrpcStatus(ref status) => {
419                        crate::status_mapping::to_database_error(status, &database_name)
420                    }
421                    _ => DatabaseError::ServerError(e.to_string()),
422                })?;
423
424            DatabaseAtRevisionImpl::new(client, proto_db)
425                .map_err(|e| DatabaseError::ServerError(format!("failed to parse database: {}", e)))
426        }
427    }
428
429    fn execute_state_change(
430        &self,
431        name: &StateChangeName,
432        version: StateChangeVersion,
433        request: CommandRequest,
434    ) -> impl std::future::Future<Output = Result<Self::AtRevision, StateChangeError>> {
435        let mut client = self.inner.client.clone();
436        let database_name = self.inner.database_name.to_string();
437        let state_change_name = name.to_string();
438        let current_revision = self.current_metadata().revision;
439
440        // Convert CommandRequest to proto
441        let proto_request = proto::CommandRequest {
442            headers: request
443                .headers
444                .into_iter()
445                .map(|(k, v)| proto::Header { key: k, value: v })
446                .collect(),
447            body: request.body,
448            content_type: request.content_type,
449            content_schema: request.content_schema,
450        };
451
452        async move {
453            let result = client
454                .execute_state_change(
455                    database_name.clone(),
456                    state_change_name.clone(),
457                    version,
458                    Some(current_revision),
459                    proto_request,
460                    None,
461                )
462                .await
463                .map_err(|e| match e {
464                    crate::Error::GrpcStatus(ref status) => {
465                        crate::status_mapping::to_state_change_error(
466                            status,
467                            &state_change_name,
468                            version,
469                        )
470                    }
471                    _ => StateChangeError::ServerError(e.to_string()),
472                })?;
473
474            // Get the new revision from the result
475            let new_revision = result
476                .transaction_summary
477                .map(|s| s.revision)
478                .unwrap_or_default();
479
480            // Await the database at the new revision
481            let proto_db = client
482                .await_database(database_name.clone(), new_revision)
483                .await
484                .map_err(|e| match e {
485                    crate::Error::GrpcStatus(ref status) => StateChangeError::Database(
486                        crate::status_mapping::to_database_error(status, &database_name),
487                    ),
488                    _ => StateChangeError::Database(DatabaseError::ServerError(e.to_string())),
489                })?;
490
491            DatabaseAtRevisionImpl::new(client, proto_db).map_err(|e| {
492                StateChangeError::Database(DatabaseError::ServerError(format!(
493                    "failed to parse database: {}",
494                    e
495                )))
496            })
497        }
498    }
499
500    fn log(&self) -> impl Stream<Item = TransactionSummary> {
501        let mut client = self.inner.client.clone();
502        let database_name = self.inner.database_name.to_string();
503
504        stream::once(async move {
505            let result = client.scan_database_log(database_name, 0, false).await;
506
507            match result {
508                Ok(response_stream) => response_stream
509                    .filter_map(|result| async move {
510                        match result {
511                            Ok(reply) => {
512                                if let Some(proto::database_log_reply::Transaction::Summary(
513                                    summary,
514                                )) = reply.transaction
515                                {
516                                    Some(TransactionSummary::from(summary))
517                                } else {
518                                    None
519                                }
520                            }
521                            Err(_) => None,
522                        }
523                    })
524                    .boxed(),
525                Err(_) => stream::empty().boxed(),
526            }
527        })
528        .flatten()
529    }
530
531    fn log_detail(&self) -> impl Stream<Item = Transaction> {
532        let mut client = self.inner.client.clone();
533        let database_name = self.inner.database_name.to_string();
534
535        stream::once(async move {
536            let result = client.scan_database_log(database_name, 0, true).await;
537
538            match result {
539                Ok(response_stream) => response_stream
540                    .filter_map(|result| async move {
541                        match result {
542                            Ok(reply) => {
543                                if let Some(proto::database_log_reply::Transaction::Detail(txn)) =
544                                    reply.transaction
545                                {
546                                    Transaction::try_from(txn).ok()
547                                } else {
548                                    None
549                                }
550                            }
551                            Err(_) => None,
552                        }
553                    })
554                    .boxed(),
555                Err(_) => stream::empty().boxed(),
556            }
557        })
558        .flatten()
559    }
560
561    fn transact_with_id(
562        &self,
563        transaction_id: &str,
564        events: NonEmpty<ProspectiveEvent>,
565        conditions: Vec<AppendCondition>,
566    ) -> impl std::future::Future<Output = Result<Self::AtRevision, DatabaseError>> {
567        let mut client = self.inner.client.clone();
568        let database_name = self.inner.database_name.to_string();
569        let transaction_id = transaction_id.to_string();
570
571        // Convert domain types to proto
572        let proto_events: Vec<proto_ce::CloudEvent> =
573            events.into_iter().map(|pe| pe.into()).collect();
574        let proto_conditions: Vec<proto::AppendCondition> =
575            conditions.into_iter().map(|c| c.into()).collect();
576
577        async move {
578            let result = client
579                .transact(
580                    transaction_id,
581                    database_name.clone(),
582                    proto_events,
583                    proto_conditions,
584                )
585                .await
586                .map_err(|e| match e {
587                    crate::Error::GrpcStatus(ref status) => {
588                        crate::status_mapping::to_database_error(status, &database_name)
589                    }
590                    _ => DatabaseError::ServerError(e.to_string()),
591                })?;
592
593            // Get the new revision from the result
594            let new_revision = result
595                .transaction_summary
596                .map(|s| s.revision)
597                .unwrap_or_default();
598
599            // Await the database at the new revision
600            let proto_db = client
601                .await_database(database_name.clone(), new_revision)
602                .await
603                .map_err(|e| match e {
604                    crate::Error::GrpcStatus(ref status) => {
605                        crate::status_mapping::to_database_error(status, &database_name)
606                    }
607                    _ => DatabaseError::ServerError(e.to_string()),
608                })?;
609
610            DatabaseAtRevisionImpl::new(client, proto_db)
611                .map_err(|e| DatabaseError::ServerError(format!("failed to parse database: {}", e)))
612        }
613    }
614
615    fn log_from(&self, from_revision: Revision) -> impl Stream<Item = TransactionSummary> {
616        let mut client = self.inner.client.clone();
617        let database_name = self.inner.database_name.to_string();
618
619        stream::once(async move {
620            let result = client
621                .scan_database_log(database_name, from_revision, false)
622                .await;
623
624            match result {
625                Ok(response_stream) => response_stream
626                    .filter_map(|result| async move {
627                        match result {
628                            Ok(reply) => {
629                                if let Some(proto::database_log_reply::Transaction::Summary(
630                                    summary,
631                                )) = reply.transaction
632                                {
633                                    Some(TransactionSummary::from(summary))
634                                } else {
635                                    None
636                                }
637                            }
638                            Err(_) => None,
639                        }
640                    })
641                    .boxed(),
642                Err(_) => stream::empty().boxed(),
643            }
644        })
645        .flatten()
646    }
647
648    fn log_detail_from(&self, from_revision: Revision) -> impl Stream<Item = Transaction> {
649        let mut client = self.inner.client.clone();
650        let database_name = self.inner.database_name.to_string();
651
652        stream::once(async move {
653            let result = client
654                .scan_database_log(database_name, from_revision, true)
655                .await;
656
657            match result {
658                Ok(response_stream) => response_stream
659                    .filter_map(|result| async move {
660                        match result {
661                            Ok(reply) => {
662                                if let Some(proto::database_log_reply::Transaction::Detail(txn)) =
663                                    reply.transaction
664                                {
665                                    Transaction::try_from(txn).ok()
666                                } else {
667                                    None
668                                }
669                            }
670                            Err(_) => None,
671                        }
672                    })
673                    .boxed(),
674                Err(_) => stream::empty().boxed(),
675            }
676        })
677        .flatten()
678    }
679}
680
681// =============================================================================
682// Connection Additional Methods
683// =============================================================================
684
685impl Connection {
686    /// List all state change definitions registered with this database.
687    ///
688    /// # Example
689    ///
690    /// ```ignore
691    /// use futures::StreamExt;
692    ///
693    /// let mut state_changes = conn.list_state_changes();
694    /// while let Some(sc) = state_changes.next().await {
695    ///     println!("State change: {} v{}", sc.name, sc.version);
696    /// }
697    /// ```
698    pub fn list_state_changes(
699        &self,
700    ) -> impl Stream<Item = evidentsource_core::domain::StateChangeDefinitionSummary> {
701        use evidentsource_core::domain::{StateChangeDefinitionSummary, StateChangeName};
702
703        let mut client = self.inner.client.clone();
704        let database_name = self.inner.database_name.to_string();
705
706        stream::once(async move {
707            let result = client.list_state_changes(database_name).await;
708
709            match result {
710                Ok(response_stream) => response_stream
711                    .filter_map(|result| async move {
712                        match result {
713                            Ok(reply) => StateChangeName::new(&reply.name).ok().map(|name| {
714                                StateChangeDefinitionSummary {
715                                    name,
716                                    version: reply.version,
717                                }
718                            }),
719                            Err(_) => None,
720                        }
721                    })
722                    .boxed(),
723                Err(_) => stream::empty().boxed(),
724            }
725        })
726        .flatten()
727    }
728
729    /// Fetch a transaction by its ID.
730    ///
731    /// # Example
732    ///
733    /// ```ignore
734    /// let txn = conn.fetch_transaction_by_id("my-txn-id").await?;
735    /// println!("Transaction revision: {}", txn.summary.revision);
736    /// ```
737    pub async fn fetch_transaction_by_id(
738        &self,
739        transaction_id: &str,
740    ) -> Result<Transaction, DatabaseError> {
741        let mut client = self.inner.client.clone();
742        let database_name = self.inner.database_name.to_string();
743
744        let reply = client
745            .fetch_transaction_by_id(database_name.clone(), transaction_id.to_string())
746            .await
747            .map_err(|e| match e {
748                crate::Error::GrpcStatus(ref status) => {
749                    crate::status_mapping::to_database_error(status, &database_name)
750                }
751                _ => DatabaseError::ServerError(e.to_string()),
752            })?;
753
754        reply
755            .transaction
756            .ok_or_else(|| {
757                DatabaseError::ServerError("missing transaction in response".to_string())
758            })
759            .and_then(|txn| {
760                Transaction::try_from(txn)
761                    .map_err(|e| DatabaseError::ServerError(format!("invalid transaction: {}", e)))
762            })
763    }
764
765    /// Get a reference to the underlying gRPC client.
766    ///
767    /// This provides access to low-level operations not exposed through
768    /// the high-level API.
769    pub fn client(&self) -> &EvidentSourceClient {
770        &self.inner.client
771    }
772
773    /// Create a transaction builder for constructing transactions.
774    ///
775    /// # Example
776    ///
777    /// ```ignore
778    /// let db = conn
779    ///     .transaction()
780    ///     .event(my_event)
781    ///     .require_not_exists(EventSelector::stream_equals("my-stream"))
782    ///     .commit()
783    ///     .await?;
784    /// ```
785    pub fn transaction(&self) -> TransactionBuilder {
786        TransactionBuilder::new(self.clone())
787    }
788
789    /// Create a state change builder for executing state changes.
790    ///
791    /// # Example
792    ///
793    /// ```ignore
794    /// let db = conn
795    ///     .state_change("create-account", 1)
796    ///     .json(&CreateAccountRequest { name: "Alice" })?
797    ///     .execute()
798    ///     .await?;
799    /// ```
800    pub fn state_change(&self, name: &str, version: StateChangeVersion) -> StateChangeBuilder {
801        StateChangeBuilder::new(self.clone(), name.to_string(), version)
802    }
803}
804
805// =============================================================================
806// Async Command Extension Trait
807// =============================================================================
808
/// A correlation ID returned from async commands.
///
/// This can be used to track the result of an async operation via Kafka
/// or other message brokers. It is a thin newtype around the string handed
/// back by the server; the inner value is public.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CorrelationId(pub String);

impl std::fmt::Display for CorrelationId {
    /// Formats the ID exactly like the wrapped string.
    ///
    /// Delegating to the inner `String` (rather than going through `write!`)
    /// keeps the caller's formatting flags, so width, fill, alignment and
    /// precision such as `{:>40}` are honored.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}

impl From<String> for CorrelationId {
    /// Wraps an owned string as a correlation ID without copying it.
    fn from(s: String) -> Self {
        Self(s)
    }
}
827
/// Extension trait for async database operations.
///
/// Async operations return immediately with a correlation ID that can be used
/// to track the result via Kafka or other message brokers. Unlike the
/// synchronous builders' `commit`/`execute`, none of these methods hand back
/// a database at the new revision; the only success value is a
/// [`CorrelationId`].
///
/// The methods are declared as returning `impl Future` rather than as
/// `async fn`.
pub trait DatabaseConnectionAsync {
    /// Transact asynchronously.
    ///
    /// Submits `events` (at least one, enforced by `NonEmpty`) together with
    /// the given append `conditions`. The caller does not supply a
    /// transaction ID.
    ///
    /// Returns a correlation ID that can be used to track the result, or a
    /// `DatabaseError` if the submission fails.
    fn transact_async(
        &self,
        events: NonEmpty<ProspectiveEvent>,
        conditions: Vec<AppendCondition>,
    ) -> impl std::future::Future<Output = Result<CorrelationId, DatabaseError>>;

    /// Transact asynchronously with a specific transaction ID.
    ///
    /// Same as [`transact_async`](Self::transact_async), except that the
    /// caller chooses `transaction_id`.
    ///
    /// Returns a correlation ID that can be used to track the result, or a
    /// `DatabaseError` if the submission fails.
    fn transact_async_with_id(
        &self,
        transaction_id: &str,
        events: NonEmpty<ProspectiveEvent>,
        conditions: Vec<AppendCondition>,
    ) -> impl std::future::Future<Output = Result<CorrelationId, DatabaseError>>;

    /// Execute a state change asynchronously.
    ///
    /// Runs the state change identified by `name` and `version` with
    /// `request` as its command payload.
    ///
    /// Returns a correlation ID that can be used to track the result, or a
    /// `StateChangeError` if the submission fails.
    fn execute_state_change_async(
        &self,
        name: &StateChangeName,
        version: StateChangeVersion,
        request: CommandRequest,
    ) -> impl std::future::Future<Output = Result<CorrelationId, StateChangeError>>;
}
860
861impl DatabaseConnectionAsync for Connection {
862    fn transact_async(
863        &self,
864        events: NonEmpty<ProspectiveEvent>,
865        conditions: Vec<AppendCondition>,
866    ) -> impl std::future::Future<Output = Result<CorrelationId, DatabaseError>> {
867        let mut client = self.inner.client.clone();
868        let database_name = self.inner.database_name.to_string();
869        let transaction_id = uuid::Uuid::new_v4().to_string();
870
871        // Convert domain types to proto
872        let proto_events: Vec<proto_ce::CloudEvent> =
873            events.into_iter().map(|pe| pe.into()).collect();
874        let proto_conditions: Vec<proto::AppendCondition> =
875            conditions.into_iter().map(|c| c.into()).collect();
876
877        async move {
878            let response = client
879                .transact_async(
880                    transaction_id,
881                    database_name.clone(),
882                    proto_events,
883                    proto_conditions,
884                )
885                .await
886                .map_err(|e| match e {
887                    crate::Error::GrpcStatus(ref status) => {
888                        crate::status_mapping::to_database_error(status, &database_name)
889                    }
890                    _ => DatabaseError::ServerError(e.to_string()),
891                })?;
892
893            Ok(CorrelationId(response.correlation_id))
894        }
895    }
896
897    fn transact_async_with_id(
898        &self,
899        transaction_id: &str,
900        events: NonEmpty<ProspectiveEvent>,
901        conditions: Vec<AppendCondition>,
902    ) -> impl std::future::Future<Output = Result<CorrelationId, DatabaseError>> {
903        let mut client = self.inner.client.clone();
904        let database_name = self.inner.database_name.to_string();
905        let transaction_id = transaction_id.to_string();
906
907        // Convert domain types to proto
908        let proto_events: Vec<proto_ce::CloudEvent> =
909            events.into_iter().map(|pe| pe.into()).collect();
910        let proto_conditions: Vec<proto::AppendCondition> =
911            conditions.into_iter().map(|c| c.into()).collect();
912
913        async move {
914            let response = client
915                .transact_async(
916                    transaction_id,
917                    database_name.clone(),
918                    proto_events,
919                    proto_conditions,
920                )
921                .await
922                .map_err(|e| match e {
923                    crate::Error::GrpcStatus(ref status) => {
924                        crate::status_mapping::to_database_error(status, &database_name)
925                    }
926                    _ => DatabaseError::ServerError(e.to_string()),
927                })?;
928
929            Ok(CorrelationId(response.correlation_id))
930        }
931    }
932
933    fn execute_state_change_async(
934        &self,
935        name: &StateChangeName,
936        version: StateChangeVersion,
937        request: CommandRequest,
938    ) -> impl std::future::Future<Output = Result<CorrelationId, StateChangeError>> {
939        let mut client = self.inner.client.clone();
940        let database_name = self.inner.database_name.to_string();
941        let state_change_name = name.to_string();
942        let current_revision = self.current_metadata().revision;
943
944        // Convert CommandRequest to proto
945        let proto_request = proto::CommandRequest {
946            headers: request
947                .headers
948                .into_iter()
949                .map(|(k, v)| proto::Header { key: k, value: v })
950                .collect(),
951            body: request.body,
952            content_type: request.content_type,
953            content_schema: request.content_schema,
954        };
955
956        async move {
957            let response = client
958                .execute_state_change_async(
959                    database_name,
960                    state_change_name.clone(),
961                    version,
962                    Some(current_revision),
963                    proto_request,
964                    None,
965                )
966                .await
967                .map_err(|e| match e {
968                    crate::Error::GrpcStatus(ref status) => {
969                        crate::status_mapping::to_state_change_error(
970                            status,
971                            &state_change_name,
972                            version,
973                        )
974                    }
975                    _ => StateChangeError::ServerError(e.to_string()),
976                })?;
977
978            Ok(CorrelationId(response.correlation_id))
979        }
980    }
981}
982
983// =============================================================================
984// Transaction Builder
985// =============================================================================
986
987/// Builder for constructing transactions.
988///
989/// # Example
990///
991/// ```ignore
992/// let db = conn
993///     .transaction()
994///     .event(event1)
995///     .event(event2)
996///     .condition(AppendCondition::must_not_exist(selector))
997///     .with_correlation_id("order-12345")
998///     .commit()
999///     .await?;
1000/// ```
1001pub struct TransactionBuilder {
1002    connection: Connection,
1003    events: Vec<ProspectiveEvent>,
1004    conditions: Vec<AppendCondition>,
1005    transaction_id: Option<String>,
1006    correlation_id: Option<String>,
1007    causation_id: Option<String>,
1008}
1009
1010impl TransactionBuilder {
1011    /// Create a new transaction builder.
1012    pub fn new(connection: Connection) -> Self {
1013        Self {
1014            connection,
1015            events: Vec::new(),
1016            conditions: Vec::new(),
1017            transaction_id: None,
1018            correlation_id: None,
1019            causation_id: None,
1020        }
1021    }
1022
1023    /// Add an event to the transaction.
1024    pub fn event(mut self, event: ProspectiveEvent) -> Self {
1025        self.events.push(event);
1026        self
1027    }
1028
1029    /// Add multiple events to the transaction.
1030    pub fn events(mut self, events: impl IntoIterator<Item = ProspectiveEvent>) -> Self {
1031        self.events.extend(events);
1032        self
1033    }
1034
1035    /// Add a condition to the transaction.
1036    pub fn condition(mut self, condition: AppendCondition) -> Self {
1037        self.conditions.push(condition);
1038        self
1039    }
1040
1041    /// Add multiple conditions to the transaction.
1042    pub fn conditions(mut self, conditions: impl IntoIterator<Item = AppendCondition>) -> Self {
1043        self.conditions.extend(conditions);
1044        self
1045    }
1046
1047    /// Require that no events matching the selector exist.
1048    pub fn require_not_exists(self, selector: evidentsource_core::domain::EventSelector) -> Self {
1049        self.condition(AppendCondition::must_not_exist(selector))
1050    }
1051
1052    /// Require that at least one event matching the selector exists.
1053    pub fn require_exists(self, selector: evidentsource_core::domain::EventSelector) -> Self {
1054        self.condition(AppendCondition::must_exist(selector))
1055    }
1056
1057    /// Set a specific transaction ID for idempotency.
1058    pub fn with_transaction_id(mut self, transaction_id: impl Into<String>) -> Self {
1059        self.transaction_id = Some(transaction_id.into());
1060        self
1061    }
1062
1063    /// Set the correlation ID for grouping related events in a business flow.
1064    ///
1065    /// See: <https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/correlation.md>
1066    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
1067        self.correlation_id = Some(correlation_id.into());
1068        self
1069    }
1070
1071    /// Set the causation ID for parent-child event relationships.
1072    ///
1073    /// See: <https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/correlation.md>
1074    pub fn with_causation_id(mut self, causation_id: impl Into<String>) -> Self {
1075        self.causation_id = Some(causation_id.into());
1076        self
1077    }
1078
1079    /// Commit the transaction synchronously.
1080    ///
1081    /// Returns the database at the new revision.
1082    pub async fn commit(self) -> Result<DatabaseAtRevisionImpl, DatabaseError> {
1083        let events = NonEmpty::from_vec(self.events)
1084            .ok_or(DatabaseError::Transaction(TransactionError::Empty))?;
1085
1086        let mut client = self.connection.inner.client.clone();
1087        let database_name = self.connection.inner.database_name.to_string();
1088        let transaction_id = self
1089            .transaction_id
1090            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1091
1092        // Convert domain types to proto
1093        let proto_events: Vec<proto_ce::CloudEvent> =
1094            events.into_iter().map(|pe| pe.into()).collect();
1095        let proto_conditions: Vec<proto::AppendCondition> =
1096            self.conditions.into_iter().map(|c| c.into()).collect();
1097
1098        let result = client
1099            .transact_with_options(
1100                transaction_id,
1101                database_name.clone(),
1102                proto_events,
1103                proto_conditions,
1104                self.correlation_id,
1105                self.causation_id,
1106            )
1107            .await
1108            .map_err(|e| match e {
1109                crate::Error::GrpcStatus(ref status) => {
1110                    crate::status_mapping::to_database_error(status, &database_name)
1111                }
1112                _ => DatabaseError::ServerError(e.to_string()),
1113            })?;
1114
1115        // Get the new revision from the result
1116        let new_revision = result
1117            .transaction_summary
1118            .map(|s| s.revision)
1119            .unwrap_or_default();
1120
1121        // Await the database at the new revision
1122        let proto_db = client
1123            .await_database(database_name.clone(), new_revision)
1124            .await
1125            .map_err(|e| match e {
1126                crate::Error::GrpcStatus(ref status) => {
1127                    crate::status_mapping::to_database_error(status, &database_name)
1128                }
1129                _ => DatabaseError::ServerError(e.to_string()),
1130            })?;
1131
1132        DatabaseAtRevisionImpl::new(client, proto_db)
1133            .map_err(|e| DatabaseError::ServerError(format!("failed to parse database: {}", e)))
1134    }
1135
1136    /// Commit the transaction asynchronously.
1137    ///
1138    /// Returns a correlation ID for tracking.
1139    pub async fn commit_async(self) -> Result<CorrelationId, DatabaseError> {
1140        let events = NonEmpty::from_vec(self.events)
1141            .ok_or(DatabaseError::Transaction(TransactionError::Empty))?;
1142
1143        let mut client = self.connection.inner.client.clone();
1144        let database_name = self.connection.inner.database_name.to_string();
1145        let transaction_id = self
1146            .transaction_id
1147            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1148
1149        // Convert domain types to proto
1150        let proto_events: Vec<proto_ce::CloudEvent> =
1151            events.into_iter().map(|pe| pe.into()).collect();
1152        let proto_conditions: Vec<proto::AppendCondition> =
1153            self.conditions.into_iter().map(|c| c.into()).collect();
1154
1155        let response = client
1156            .transact_async_with_options(
1157                transaction_id,
1158                database_name.clone(),
1159                proto_events,
1160                proto_conditions,
1161                self.correlation_id,
1162                self.causation_id,
1163            )
1164            .await
1165            .map_err(|e| match e {
1166                crate::Error::GrpcStatus(ref status) => {
1167                    crate::status_mapping::to_database_error(status, &database_name)
1168                }
1169                _ => DatabaseError::ServerError(e.to_string()),
1170            })?;
1171
1172        Ok(CorrelationId(response.correlation_id))
1173    }
1174}
1175
1176// =============================================================================
1177// State Change Builder
1178// =============================================================================
1179
1180/// Builder for executing state changes.
1181///
1182/// # Example
1183///
1184/// ```ignore
1185/// let db = conn
1186///     .state_change("create-account", 1)
1187///     .json(&CreateAccountRequest { name: "Alice" })?
1188///     .with_correlation_id("order-12345")
1189///     .execute()
1190///     .await?;
1191/// ```
1192pub struct StateChangeBuilder {
1193    connection: Connection,
1194    name: String,
1195    version: StateChangeVersion,
1196    request: Option<CommandRequest>,
1197    transaction_id: Option<String>,
1198    correlation_id: Option<String>,
1199    causation_id: Option<String>,
1200}
1201
1202impl StateChangeBuilder {
1203    /// Create a new state change builder.
1204    pub fn new(connection: Connection, name: String, version: StateChangeVersion) -> Self {
1205        Self {
1206            connection,
1207            name,
1208            version,
1209            request: None,
1210            transaction_id: None,
1211            correlation_id: None,
1212            causation_id: None,
1213        }
1214    }
1215
1216    /// Set the request body as JSON.
1217    pub fn json<T: ::serde::Serialize>(
1218        mut self,
1219        value: &T,
1220    ) -> Result<Self, evidentsource_core::domain::CommandRequestError> {
1221        self.request = Some(CommandRequest::json(value)?);
1222        Ok(self)
1223    }
1224
1225    /// Set the raw command request.
1226    pub fn request(mut self, request: CommandRequest) -> Self {
1227        self.request = Some(request);
1228        self
1229    }
1230
1231    /// Set a specific transaction ID for idempotency.
1232    pub fn with_transaction_id(mut self, transaction_id: impl Into<String>) -> Self {
1233        self.transaction_id = Some(transaction_id.into());
1234        self
1235    }
1236
1237    /// Set the correlation ID for grouping related events in a business flow.
1238    ///
1239    /// See: <https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/correlation.md>
1240    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
1241        self.correlation_id = Some(correlation_id.into());
1242        self
1243    }
1244
1245    /// Set the causation ID for parent-child event relationships.
1246    ///
1247    /// See: <https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/correlation.md>
1248    pub fn with_causation_id(mut self, causation_id: impl Into<String>) -> Self {
1249        self.causation_id = Some(causation_id.into());
1250        self
1251    }
1252
1253    /// Set the content type for the request.
1254    ///
1255    /// If no request exists yet, creates a default request.
1256    ///
1257    /// # Example
1258    ///
1259    /// ```ignore
1260    /// let db = conn
1261    ///     .state_change("process-xml", 1)
1262    ///     .request(CommandRequest::with_body(xml_bytes))
1263    ///     .content_type("application/xml")
1264    ///     .execute()
1265    ///     .await?;
1266    /// ```
1267    pub fn content_type(mut self, content_type: impl Into<String>) -> Self {
1268        let request = self.request.take().unwrap_or_default();
1269        self.request = Some(request.content_type(content_type));
1270        self
1271    }
1272
1273    /// Set the content schema URL for the request.
1274    ///
1275    /// If no request exists yet, creates a default request.
1276    ///
1277    /// # Example
1278    ///
1279    /// ```ignore
1280    /// let db = conn
1281    ///     .state_change("create-account", 1)
1282    ///     .json(&request)?
1283    ///     .content_schema("https://example.com/schemas/create-account.json")
1284    ///     .execute()
1285    ///     .await?;
1286    /// ```
1287    pub fn content_schema(mut self, schema_url: impl Into<String>) -> Self {
1288        let request = self.request.take().unwrap_or_default();
1289        self.request = Some(request.content_schema(schema_url));
1290        self
1291    }
1292
1293    /// Execute the state change synchronously.
1294    pub async fn execute(self) -> Result<DatabaseAtRevisionImpl, StateChangeError> {
1295        let mut client = self.connection.inner.client.clone();
1296        let database_name = self.connection.inner.database_name.to_string();
1297        let state_change_name = self.name.clone();
1298        let current_revision = self.connection.current_metadata().revision;
1299        let request = self.request.unwrap_or_default();
1300
1301        // Convert CommandRequest to proto
1302        let proto_request = proto::CommandRequest {
1303            headers: request
1304                .headers
1305                .into_iter()
1306                .map(|(k, v)| proto::Header { key: k, value: v })
1307                .collect(),
1308            body: request.body,
1309            content_type: request.content_type,
1310            content_schema: request.content_schema,
1311        };
1312
1313        let result = client
1314            .execute_state_change_with_options(
1315                database_name.clone(),
1316                state_change_name.clone(),
1317                self.version,
1318                Some(current_revision),
1319                proto_request,
1320                self.transaction_id,
1321                self.correlation_id,
1322                self.causation_id,
1323            )
1324            .await
1325            .map_err(|e| match e {
1326                crate::Error::GrpcStatus(ref status) => {
1327                    crate::status_mapping::to_state_change_error(
1328                        status,
1329                        &state_change_name,
1330                        self.version,
1331                    )
1332                }
1333                _ => StateChangeError::ServerError(e.to_string()),
1334            })?;
1335
1336        // Get the new revision from the result
1337        let new_revision = result
1338            .transaction_summary
1339            .map(|s| s.revision)
1340            .unwrap_or_default();
1341
1342        // Await the database at the new revision
1343        let proto_db = client
1344            .await_database(database_name.clone(), new_revision)
1345            .await
1346            .map_err(|e| match e {
1347                crate::Error::GrpcStatus(ref status) => StateChangeError::Database(
1348                    crate::status_mapping::to_database_error(status, &database_name),
1349                ),
1350                _ => StateChangeError::Database(DatabaseError::ServerError(e.to_string())),
1351            })?;
1352
1353        DatabaseAtRevisionImpl::new(client, proto_db).map_err(|e| {
1354            StateChangeError::Database(DatabaseError::ServerError(format!(
1355                "failed to parse database: {}",
1356                e
1357            )))
1358        })
1359    }
1360
1361    /// Execute the state change asynchronously.
1362    pub async fn execute_async(self) -> Result<CorrelationId, StateChangeError> {
1363        let mut client = self.connection.inner.client.clone();
1364        let database_name = self.connection.inner.database_name.to_string();
1365        let state_change_name = self.name.clone();
1366        let current_revision = self.connection.current_metadata().revision;
1367        let request = self.request.unwrap_or_default();
1368
1369        // Convert CommandRequest to proto
1370        let proto_request = proto::CommandRequest {
1371            headers: request
1372                .headers
1373                .into_iter()
1374                .map(|(k, v)| proto::Header { key: k, value: v })
1375                .collect(),
1376            body: request.body,
1377            content_type: request.content_type,
1378            content_schema: request.content_schema,
1379        };
1380
1381        let response = client
1382            .execute_state_change_async_with_options(
1383                database_name,
1384                state_change_name.clone(),
1385                self.version,
1386                Some(current_revision),
1387                proto_request,
1388                self.transaction_id,
1389                self.correlation_id,
1390                self.causation_id,
1391            )
1392            .await
1393            .map_err(|e| match e {
1394                crate::Error::GrpcStatus(ref status) => {
1395                    crate::status_mapping::to_state_change_error(
1396                        status,
1397                        &state_change_name,
1398                        self.version,
1399                    )
1400                }
1401                _ => StateChangeError::ServerError(e.to_string()),
1402            })?;
1403
1404        Ok(CorrelationId(response.correlation_id))
1405    }
1406}