evidentsource_client/
lib.rs

1//! # EvidentSource Client
2//!
3//! A Rust client for connecting to EvidentSource event sourcing servers.
4//!
5//! ## API Layers
6//!
7//! This crate provides two API layers:
8//!
9//! ### High-level API (recommended)
10//!
11//! The [`client`] module provides an ergonomic, type-safe API using domain types:
12//!
13//! ```rust,ignore
14//! use evidentsource_client::client::{EvidentSource, Connection};
15//! use evidentsource_client::client::{DatabaseName, ProspectiveEvent};
16//!
17//! let es = EvidentSource::connect_to_server("http://localhost:50051").await?;
18//! let conn = es.connect(&DatabaseName::new("my-db")?).await?;
19//! let db = conn.latest_database().await?;
20//! ```
21//!
22//! ### Low-level gRPC API (advanced)
23//!
24//! The [`grpc`] module provides direct access to protocol buffer types:
25//!
26//! ```rust,ignore
27//! use evidentsource_client::grpc::{EvidentSourceClient, proto};
28//!
29//! let mut client = EvidentSourceClient::new("http://localhost:50051").await?;
30//! let db = client.fetch_latest_database("my-db".into()).await?;
31//! ```
32
33// Internal proto modules (used by connection, evident_source, etc.)
34#[allow(clippy::large_enum_variant)]
35pub(crate) mod com {
36    pub mod evidentsource {
37        tonic::include_proto!("com.evidentsource");
38    }
39}
40
41pub(crate) mod io {
42    pub mod cloudevents {
43        pub mod v1 {
44            tonic::include_proto!("io.cloudevents.v1");
45        }
46    }
47}
48
49// Internal modules
50mod auth;
51pub(crate) mod connection;
52pub(crate) mod conversions;
53pub(crate) mod database;
54pub(crate) mod evident_source;
55pub(crate) mod status_mapping;
56
57// Public API modules
58pub mod client;
59pub mod grpc;
60pub mod prelude;
61
62// Re-export high-level types at crate root for convenience
63pub use auth::{AuthInterceptor, Credentials, DevModeCredentials};
64pub use client::{Connection, EvidentSource};
65
66use futures::Stream;
67use http::Uri;
68use thiserror::Error;
69use tonic::{
70    service::interceptor::InterceptedService,
71    transport::{Channel, ClientTlsConfig},
72    Request,
73};
74
75use com::evidentsource::{
76    evident_source_client::EvidentSourceClient as Client, AwaitDatabaseRequest, CatalogRequest,
77    CreateDatabaseRequest, DatabaseEffectiveAtTimestampRequest, DatabaseUpdatesSubscriptionRequest,
78    DeleteDatabaseRequest, EventByIdRequest, EventQueryRequest, EventsByRevisionsRequest,
79    ExecuteStateChangeRequest, FetchStateViewRequest, FetchTransactionByIdRequest,
80    IndexKeyScanRequest, LatestDatabaseRequest, ListStateChangesRequest,
81    ListStateViewDefinitionsRequest, LogScanRequest, TransactionRequest,
82};
83
84#[derive(Error, Debug)]
85pub enum Error {
86    #[error("invalid URI: {0}")]
87    InvalidUri(#[from] http::uri::InvalidUri),
88    #[error(transparent)]
89    Transport(#[from] tonic::transport::Error),
90    #[error(transparent)]
91    GrpcStatus(#[from] tonic::Status),
92}
93
94#[derive(Clone, Debug)]
95pub struct EvidentSourceClient {
96    client: Client<InterceptedService<Channel, AuthInterceptor>>,
97}
98
99type ClientResult<T> = Result<T, Error>;
100
101impl EvidentSourceClient {
102    /// Create a new client without authentication.
103    ///
104    /// This only works if the server has `allow_anonymous=true`.
105    pub async fn new(addr: &str) -> ClientResult<Self> {
106        Self::with_credentials(addr, Credentials::None).await
107    }
108
109    /// Create a new client with authentication credentials.
110    ///
111    /// # Examples
112    ///
113    /// ```rust,ignore
114    /// use evidentsource_client::{EvidentSourceClient, Credentials, DevModeCredentials};
115    ///
116    /// // With bearer token (requires TLS)
117    /// let client = EvidentSourceClient::with_credentials(
118    ///     "https://api.example.com:50051",
119    ///     Credentials::BearerToken(my_jwt_token),
120    /// ).await?;
121    ///
122    /// // With DevMode credentials
123    /// let client = EvidentSourceClient::with_credentials(
124    ///     "http://localhost:50051",
125    ///     Credentials::DevMode(DevModeCredentials::new("dev-user")),
126    /// ).await?;
127    /// ```
128    pub async fn with_credentials(addr: &str, credentials: Credentials) -> ClientResult<Self> {
129        let channel = Self::build_channel(addr).await?;
130        let interceptor = AuthInterceptor::new(credentials);
131        let client = Client::with_interceptor(channel, interceptor);
132        Ok(Self { client })
133    }
134
135    async fn build_channel(addr: &str) -> ClientResult<Channel> {
136        let uri = Uri::try_from(addr)?;
137        let mut channel_builder = Channel::builder(uri.clone());
138
139        if let Some("https") = uri.scheme_str() {
140            let tls_config = ClientTlsConfig::new()
141                .with_native_roots()
142                .domain_name(uri.host().unwrap());
143            channel_builder = channel_builder.tls_config(tls_config)?;
144        }
145
146        Ok(channel_builder.connect().await?)
147    }
148
149    // Command API Methods
150
151    pub async fn create_database(
152        &mut self,
153        database_name: String,
154    ) -> ClientResult<com::evidentsource::Database> {
155        let request = Request::new(CreateDatabaseRequest { database_name });
156        let response = self.client.create_database(request).await?;
157        Ok(response.into_inner().database.unwrap())
158    }
159
160    pub async fn transact(
161        &mut self,
162        transaction_id: String,
163        database_name: String,
164        events: Vec<io::cloudevents::v1::CloudEvent>,
165        conditions: Vec<com::evidentsource::AppendCondition>,
166    ) -> ClientResult<com::evidentsource::TransactionResult> {
167        self.transact_with_options(
168            transaction_id,
169            database_name,
170            events,
171            conditions,
172            None,
173            None,
174        )
175        .await
176    }
177
178    /// Transact with optional correlation metadata.
179    ///
180    /// # Arguments
181    ///
182    /// * `correlation_id` - Groups related events across a business flow
183    ///   (CloudEvents correlation extension)
184    /// * `causation_id` - Tracks direct parent-child event relationships
185    ///   (CloudEvents correlation extension)
186    ///
187    /// See: <https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/correlation.md>
188    pub async fn transact_with_options(
189        &mut self,
190        transaction_id: String,
191        database_name: String,
192        events: Vec<io::cloudevents::v1::CloudEvent>,
193        conditions: Vec<com::evidentsource::AppendCondition>,
194        correlation_id: Option<String>,
195        causation_id: Option<String>,
196    ) -> ClientResult<com::evidentsource::TransactionResult> {
197        let request = Request::new(TransactionRequest {
198            transaction_id,
199            database_name,
200            events,
201            conditions,
202            last_read_revision: None,
203            principal_attributes: Default::default(),
204            commit_message: None,
205            correlation_id,
206            causation_id,
207        });
208        let response = self.client.transact(request).await?;
209        Ok(response.into_inner().result.unwrap())
210    }
211
212    pub async fn delete_database(
213        &mut self,
214        database_name: String,
215    ) -> ClientResult<com::evidentsource::Database> {
216        let request = Request::new(DeleteDatabaseRequest { database_name });
217        let response = self.client.delete_database(request).await?;
218        Ok(response.into_inner().database.unwrap())
219    }
220
221    // Query API Methods
222
223    pub async fn fetch_catalog(
224        &mut self,
225    ) -> ClientResult<impl Stream<Item = Result<com::evidentsource::CatalogReply, tonic::Status>>>
226    {
227        let request = Request::new(CatalogRequest {});
228        let response = self.client.fetch_catalog(request).await?;
229        Ok(response.into_inner())
230    }
231
232    pub async fn fetch_latest_database(
233        &mut self,
234        database_name: String,
235    ) -> ClientResult<com::evidentsource::Database> {
236        let request = Request::new(LatestDatabaseRequest { database_name });
237        let response = self.client.fetch_latest_database(request).await?;
238        Ok(response.into_inner().database.unwrap())
239    }
240
241    pub async fn await_database(
242        &mut self,
243        database_name: String,
244        at_revision: u64,
245    ) -> ClientResult<com::evidentsource::Database> {
246        let request = Request::new(AwaitDatabaseRequest {
247            database_name,
248            at_revision,
249        });
250        let response = self.client.await_database(request).await?;
251        Ok(response.into_inner().database.unwrap())
252    }
253
254    pub async fn database_effective_at_timestamp(
255        &mut self,
256        database_name: String,
257        at_timestamp: prost_types::Timestamp,
258    ) -> ClientResult<com::evidentsource::Database> {
259        let request = Request::new(DatabaseEffectiveAtTimestampRequest {
260            database_name,
261            at_timestamp: Some(at_timestamp),
262        });
263        let response = self.client.database_effective_at_timestamp(request).await?;
264        Ok(response.into_inner().database.unwrap())
265    }
266
267    pub async fn subscribe_database_updates(
268        &mut self,
269        database_name: String,
270    ) -> ClientResult<impl Stream<Item = Result<com::evidentsource::DatabaseReply, tonic::Status>>>
271    {
272        let request = Request::new(DatabaseUpdatesSubscriptionRequest { database_name });
273        let response = self.client.subscribe_database_updates(request).await?;
274        Ok(response.into_inner())
275    }
276
277    pub async fn scan_database_log(
278        &mut self,
279        database_name: String,
280        start_at_revision: u64,
281        include_event_detail: bool,
282    ) -> ClientResult<impl Stream<Item = Result<com::evidentsource::DatabaseLogReply, tonic::Status>>>
283    {
284        let request = Request::new(LogScanRequest {
285            database_name,
286            start_at_revision,
287            include_event_detail,
288        });
289        let response = self.client.scan_database_log(request).await?;
290        Ok(response.into_inner())
291    }
292
293    pub async fn scan_index_keys(
294        &mut self,
295        database_name: String,
296        revision: u64,
297        index_key_type: com::evidentsource::index_key_scan_request::IndexKeyType,
298    ) -> ClientResult<
299        impl Stream<Item = Result<com::evidentsource::IndexKeyScanReply, tonic::Status>>,
300    > {
301        let request = Request::new(IndexKeyScanRequest {
302            database_name,
303            revision,
304            index_key_type: index_key_type.into(),
305        });
306        let response = self.client.scan_index_keys(request).await?;
307        Ok(response.into_inner())
308    }
309
310    // Updated method: query_events (previously query_event_index)
311    pub async fn query_events(
312        &mut self,
313        database_name: String,
314        revision: u64,
315        include_event_detail: bool,
316        query: com::evidentsource::DatabaseQuery,
317    ) -> ClientResult<impl Stream<Item = Result<com::evidentsource::EventQueryReply, tonic::Status>>>
318    {
319        let request = Request::new(EventQueryRequest {
320            database_name,
321            revision,
322            include_event_detail,
323            query: Some(query),
324        });
325        let response = self.client.query_events(request).await?;
326        Ok(response.into_inner())
327    }
328
329    pub async fn event_by_id(
330        &mut self,
331        database_name: String,
332        revision: u64,
333        stream: String,
334        event_id: String,
335    ) -> ClientResult<com::evidentsource::EventQueryReply> {
336        let request = Request::new(EventByIdRequest {
337            database_name,
338            revision,
339            stream,
340            event_id,
341        });
342        let response = self.client.event_by_id(request).await?;
343        Ok(response.into_inner())
344    }
345
346    pub async fn fetch_events_by_revisions(
347        &mut self,
348        database_name: String,
349        event_revisions: Vec<u64>,
350    ) -> ClientResult<com::evidentsource::EventsReply> {
351        let request = Request::new(EventsByRevisionsRequest {
352            database_name,
353            event_revisions,
354        });
355        let response = self.client.fetch_events_by_revisions(request).await?;
356        Ok(response.into_inner())
357    }
358
359    pub async fn list_state_view_definitions(
360        &mut self,
361        database_name: String,
362        status: Option<com::evidentsource::StateViewStatus>,
363    ) -> ClientResult<
364        impl Stream<Item = Result<com::evidentsource::ListStateViewDefinitionsReply, tonic::Status>>,
365    > {
366        let request = Request::new(ListStateViewDefinitionsRequest {
367            database_name,
368            status: status.map(|s| s.into()),
369        });
370        let response = self.client.list_state_view_definitions(request).await?;
371        Ok(response.into_inner())
372    }
373
374    pub async fn fetch_state_view_at_revision(
375        &mut self,
376        state_view_identity: Option<com::evidentsource::StateViewIdentity>,
377        database_revision: u64,
378        parameters: Option<com::evidentsource::ParameterBindings>,
379        effective_time_end_at: Option<prost_types::Timestamp>,
380    ) -> ClientResult<com::evidentsource::StateView> {
381        let request = Request::new(FetchStateViewRequest {
382            state_view_identity,
383            database_revision,
384            parameters,
385            effective_time_end_at,
386        });
387        let response = self.client.fetch_state_view_at_revision(request).await?;
388        Ok(response.into_inner().state_view.unwrap())
389    }
390
391    pub async fn execute_state_change(
392        &mut self,
393        database_name: String,
394        state_change_name: String,
395        version: u64,
396        last_seen_revision: Option<u64>,
397        request: com::evidentsource::CommandRequest,
398        transaction_id: Option<String>,
399    ) -> ClientResult<com::evidentsource::TransactionResult> {
400        self.execute_state_change_with_options(
401            database_name,
402            state_change_name,
403            version,
404            last_seen_revision,
405            request,
406            transaction_id,
407            None,
408            None,
409        )
410        .await
411    }
412
413    /// Execute a state change with optional correlation metadata.
414    ///
415    /// # Arguments
416    ///
417    /// * `correlation_id` - Groups related events across a business flow
418    ///   (CloudEvents correlation extension)
419    /// * `causation_id` - Tracks direct parent-child event relationships
420    ///   (CloudEvents correlation extension)
421    ///
422    /// See: <https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/correlation.md>
423    #[allow(clippy::too_many_arguments)]
424    pub async fn execute_state_change_with_options(
425        &mut self,
426        database_name: String,
427        state_change_name: String,
428        version: u64,
429        last_seen_revision: Option<u64>,
430        request: com::evidentsource::CommandRequest,
431        transaction_id: Option<String>,
432        correlation_id: Option<String>,
433        causation_id: Option<String>,
434    ) -> ClientResult<com::evidentsource::TransactionResult> {
435        let request = Request::new(ExecuteStateChangeRequest {
436            database_name,
437            state_change_name,
438            version,
439            last_seen_revision,
440            request: Some(request),
441            transaction_id,
442            principal_attributes: Default::default(),
443            commit_message: None,
444            correlation_id,
445            causation_id,
446        });
447        let response = self.client.execute_state_change(request).await?;
448        Ok(response.into_inner().result.unwrap())
449    }
450
451    // Async Command API Methods
452
453    /// Transact asynchronously, returning a correlation ID.
454    ///
455    /// The correlation ID can be used to track the result via Kafka.
456    pub async fn transact_async(
457        &mut self,
458        transaction_id: String,
459        database_name: String,
460        events: Vec<io::cloudevents::v1::CloudEvent>,
461        conditions: Vec<com::evidentsource::AppendCondition>,
462    ) -> ClientResult<com::evidentsource::AsyncCommandResponse> {
463        self.transact_async_with_options(
464            transaction_id,
465            database_name,
466            events,
467            conditions,
468            None,
469            None,
470        )
471        .await
472    }
473
474    /// Transact asynchronously with optional correlation metadata.
475    ///
476    /// See: <https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/correlation.md>
477    pub async fn transact_async_with_options(
478        &mut self,
479        transaction_id: String,
480        database_name: String,
481        events: Vec<io::cloudevents::v1::CloudEvent>,
482        conditions: Vec<com::evidentsource::AppendCondition>,
483        correlation_id: Option<String>,
484        causation_id: Option<String>,
485    ) -> ClientResult<com::evidentsource::AsyncCommandResponse> {
486        let request = Request::new(TransactionRequest {
487            transaction_id,
488            database_name,
489            events,
490            conditions,
491            last_read_revision: None,
492            principal_attributes: Default::default(),
493            commit_message: None,
494            correlation_id,
495            causation_id,
496        });
497        let response = self.client.transact_async(request).await?;
498        Ok(response.into_inner())
499    }
500
501    /// Execute a state change asynchronously, returning a correlation ID.
502    ///
503    /// The correlation ID can be used to track the result via Kafka.
504    pub async fn execute_state_change_async(
505        &mut self,
506        database_name: String,
507        state_change_name: String,
508        version: u64,
509        last_seen_revision: Option<u64>,
510        request: com::evidentsource::CommandRequest,
511        transaction_id: Option<String>,
512    ) -> ClientResult<com::evidentsource::AsyncCommandResponse> {
513        self.execute_state_change_async_with_options(
514            database_name,
515            state_change_name,
516            version,
517            last_seen_revision,
518            request,
519            transaction_id,
520            None,
521            None,
522        )
523        .await
524    }
525
526    /// Execute a state change asynchronously with optional correlation metadata.
527    ///
528    /// See: <https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/correlation.md>
529    #[allow(clippy::too_many_arguments)]
530    pub async fn execute_state_change_async_with_options(
531        &mut self,
532        database_name: String,
533        state_change_name: String,
534        version: u64,
535        last_seen_revision: Option<u64>,
536        request: com::evidentsource::CommandRequest,
537        transaction_id: Option<String>,
538        correlation_id: Option<String>,
539        causation_id: Option<String>,
540    ) -> ClientResult<com::evidentsource::AsyncCommandResponse> {
541        let request = Request::new(ExecuteStateChangeRequest {
542            database_name,
543            state_change_name,
544            version,
545            last_seen_revision,
546            request: Some(request),
547            transaction_id,
548            principal_attributes: Default::default(),
549            commit_message: None,
550            correlation_id,
551            causation_id,
552        });
553        let response = self.client.execute_state_change_async(request).await?;
554        Ok(response.into_inner())
555    }
556
557    // Additional Query API Methods
558
559    /// List state change definitions registered with the database.
560    pub async fn list_state_changes(
561        &mut self,
562        database_name: String,
563    ) -> ClientResult<
564        impl Stream<Item = Result<com::evidentsource::ListStateChangesReply, tonic::Status>>,
565    > {
566        let request = Request::new(ListStateChangesRequest { database_name });
567        let response = self.client.list_state_changes(request).await?;
568        Ok(response.into_inner())
569    }
570
571    /// Fetch a transaction by its ID.
572    pub async fn fetch_transaction_by_id(
573        &mut self,
574        database_name: String,
575        transaction_id: String,
576    ) -> ClientResult<com::evidentsource::FetchTransactionReply> {
577        let request = Request::new(FetchTransactionByIdRequest {
578            database_name,
579            transaction_id,
580        });
581        let response = self.client.fetch_transaction_by_id(request).await?;
582        Ok(response.into_inner())
583    }
584}
585
586#[cfg(test)]
587mod tests {
588    #[test]
589    fn it_works() {}
590}