1#[allow(clippy::large_enum_variant)]
35pub(crate) mod com {
36 pub mod evidentsource {
37 tonic::include_proto!("com.evidentsource");
38 }
39}
40
41pub(crate) mod io {
42 pub mod cloudevents {
43 pub mod v1 {
44 tonic::include_proto!("io.cloudevents.v1");
45 }
46 }
47}
48
49mod auth;
51pub(crate) mod connection;
52pub(crate) mod conversions;
53pub(crate) mod database;
54pub(crate) mod evident_source;
55pub(crate) mod status_mapping;
56
57pub mod client;
59pub mod grpc;
60pub mod prelude;
61
62pub use auth::{AuthInterceptor, Credentials, DevModeCredentials};
64pub use client::{Connection, EvidentSource};
65
66use futures::Stream;
67use http::Uri;
68use thiserror::Error;
69use tonic::{
70 service::interceptor::InterceptedService,
71 transport::{Channel, ClientTlsConfig},
72 Request,
73};
74
75use com::evidentsource::{
76 evident_source_client::EvidentSourceClient as Client, AwaitDatabaseRequest, CatalogRequest,
77 CreateDatabaseRequest, DatabaseEffectiveAtTimestampRequest, DatabaseUpdatesSubscriptionRequest,
78 DeleteDatabaseRequest, EventByIdRequest, EventQueryRequest, EventsByRevisionsRequest,
79 ExecuteStateChangeRequest, FetchStateViewRequest, FetchTransactionByIdRequest,
80 IndexKeyScanRequest, LatestDatabaseRequest, ListStateChangesRequest,
81 ListStateViewDefinitionsRequest, LogScanRequest, TransactionRequest,
82};
83
84#[derive(Error, Debug)]
85pub enum Error {
86 #[error("invalid URI: {0}")]
87 InvalidUri(#[from] http::uri::InvalidUri),
88 #[error(transparent)]
89 Transport(#[from] tonic::transport::Error),
90 #[error(transparent)]
91 GrpcStatus(#[from] tonic::Status),
92}
93
94#[derive(Clone, Debug)]
95pub struct EvidentSourceClient {
96 client: Client<InterceptedService<Channel, AuthInterceptor>>,
97}
98
99type ClientResult<T> = Result<T, Error>;
100
101impl EvidentSourceClient {
102 pub async fn new(addr: &str) -> ClientResult<Self> {
106 Self::with_credentials(addr, Credentials::None).await
107 }
108
109 pub async fn with_credentials(addr: &str, credentials: Credentials) -> ClientResult<Self> {
129 let channel = Self::build_channel(addr).await?;
130 let interceptor = AuthInterceptor::new(credentials);
131 let client = Client::with_interceptor(channel, interceptor);
132 Ok(Self { client })
133 }
134
135 async fn build_channel(addr: &str) -> ClientResult<Channel> {
136 let uri = Uri::try_from(addr)?;
137 let mut channel_builder = Channel::builder(uri.clone());
138
139 if let Some("https") = uri.scheme_str() {
140 let tls_config = ClientTlsConfig::new()
141 .with_native_roots()
142 .domain_name(uri.host().unwrap());
143 channel_builder = channel_builder.tls_config(tls_config)?;
144 }
145
146 Ok(channel_builder.connect().await?)
147 }
148
149 pub async fn create_database(
152 &mut self,
153 database_name: String,
154 ) -> ClientResult<com::evidentsource::Database> {
155 let request = Request::new(CreateDatabaseRequest { database_name });
156 let response = self.client.create_database(request).await?;
157 Ok(response.into_inner().database.unwrap())
158 }
159
160 pub async fn transact(
161 &mut self,
162 transaction_id: String,
163 database_name: String,
164 events: Vec<io::cloudevents::v1::CloudEvent>,
165 conditions: Vec<com::evidentsource::AppendCondition>,
166 ) -> ClientResult<com::evidentsource::TransactionResult> {
167 self.transact_with_options(
168 transaction_id,
169 database_name,
170 events,
171 conditions,
172 None,
173 None,
174 )
175 .await
176 }
177
178 pub async fn transact_with_options(
189 &mut self,
190 transaction_id: String,
191 database_name: String,
192 events: Vec<io::cloudevents::v1::CloudEvent>,
193 conditions: Vec<com::evidentsource::AppendCondition>,
194 correlation_id: Option<String>,
195 causation_id: Option<String>,
196 ) -> ClientResult<com::evidentsource::TransactionResult> {
197 let request = Request::new(TransactionRequest {
198 transaction_id,
199 database_name,
200 events,
201 conditions,
202 last_read_revision: None,
203 principal_attributes: Default::default(),
204 commit_message: None,
205 correlation_id,
206 causation_id,
207 });
208 let response = self.client.transact(request).await?;
209 Ok(response.into_inner().result.unwrap())
210 }
211
212 pub async fn delete_database(
213 &mut self,
214 database_name: String,
215 ) -> ClientResult<com::evidentsource::Database> {
216 let request = Request::new(DeleteDatabaseRequest { database_name });
217 let response = self.client.delete_database(request).await?;
218 Ok(response.into_inner().database.unwrap())
219 }
220
221 pub async fn fetch_catalog(
224 &mut self,
225 ) -> ClientResult<impl Stream<Item = Result<com::evidentsource::CatalogReply, tonic::Status>>>
226 {
227 let request = Request::new(CatalogRequest {});
228 let response = self.client.fetch_catalog(request).await?;
229 Ok(response.into_inner())
230 }
231
232 pub async fn fetch_latest_database(
233 &mut self,
234 database_name: String,
235 ) -> ClientResult<com::evidentsource::Database> {
236 let request = Request::new(LatestDatabaseRequest { database_name });
237 let response = self.client.fetch_latest_database(request).await?;
238 Ok(response.into_inner().database.unwrap())
239 }
240
241 pub async fn await_database(
242 &mut self,
243 database_name: String,
244 at_revision: u64,
245 ) -> ClientResult<com::evidentsource::Database> {
246 let request = Request::new(AwaitDatabaseRequest {
247 database_name,
248 at_revision,
249 });
250 let response = self.client.await_database(request).await?;
251 Ok(response.into_inner().database.unwrap())
252 }
253
254 pub async fn database_effective_at_timestamp(
255 &mut self,
256 database_name: String,
257 at_timestamp: prost_types::Timestamp,
258 ) -> ClientResult<com::evidentsource::Database> {
259 let request = Request::new(DatabaseEffectiveAtTimestampRequest {
260 database_name,
261 at_timestamp: Some(at_timestamp),
262 });
263 let response = self.client.database_effective_at_timestamp(request).await?;
264 Ok(response.into_inner().database.unwrap())
265 }
266
267 pub async fn subscribe_database_updates(
268 &mut self,
269 database_name: String,
270 ) -> ClientResult<impl Stream<Item = Result<com::evidentsource::DatabaseReply, tonic::Status>>>
271 {
272 let request = Request::new(DatabaseUpdatesSubscriptionRequest { database_name });
273 let response = self.client.subscribe_database_updates(request).await?;
274 Ok(response.into_inner())
275 }
276
277 pub async fn scan_database_log(
278 &mut self,
279 database_name: String,
280 start_at_revision: u64,
281 include_event_detail: bool,
282 ) -> ClientResult<impl Stream<Item = Result<com::evidentsource::DatabaseLogReply, tonic::Status>>>
283 {
284 let request = Request::new(LogScanRequest {
285 database_name,
286 start_at_revision,
287 include_event_detail,
288 });
289 let response = self.client.scan_database_log(request).await?;
290 Ok(response.into_inner())
291 }
292
293 pub async fn scan_index_keys(
294 &mut self,
295 database_name: String,
296 revision: u64,
297 index_key_type: com::evidentsource::index_key_scan_request::IndexKeyType,
298 ) -> ClientResult<
299 impl Stream<Item = Result<com::evidentsource::IndexKeyScanReply, tonic::Status>>,
300 > {
301 let request = Request::new(IndexKeyScanRequest {
302 database_name,
303 revision,
304 index_key_type: index_key_type.into(),
305 });
306 let response = self.client.scan_index_keys(request).await?;
307 Ok(response.into_inner())
308 }
309
310 pub async fn query_events(
312 &mut self,
313 database_name: String,
314 revision: u64,
315 include_event_detail: bool,
316 query: com::evidentsource::DatabaseQuery,
317 ) -> ClientResult<impl Stream<Item = Result<com::evidentsource::EventQueryReply, tonic::Status>>>
318 {
319 let request = Request::new(EventQueryRequest {
320 database_name,
321 revision,
322 include_event_detail,
323 query: Some(query),
324 });
325 let response = self.client.query_events(request).await?;
326 Ok(response.into_inner())
327 }
328
329 pub async fn event_by_id(
330 &mut self,
331 database_name: String,
332 revision: u64,
333 stream: String,
334 event_id: String,
335 ) -> ClientResult<com::evidentsource::EventQueryReply> {
336 let request = Request::new(EventByIdRequest {
337 database_name,
338 revision,
339 stream,
340 event_id,
341 });
342 let response = self.client.event_by_id(request).await?;
343 Ok(response.into_inner())
344 }
345
346 pub async fn fetch_events_by_revisions(
347 &mut self,
348 database_name: String,
349 event_revisions: Vec<u64>,
350 ) -> ClientResult<com::evidentsource::EventsReply> {
351 let request = Request::new(EventsByRevisionsRequest {
352 database_name,
353 event_revisions,
354 });
355 let response = self.client.fetch_events_by_revisions(request).await?;
356 Ok(response.into_inner())
357 }
358
359 pub async fn list_state_view_definitions(
360 &mut self,
361 database_name: String,
362 status: Option<com::evidentsource::StateViewStatus>,
363 ) -> ClientResult<
364 impl Stream<Item = Result<com::evidentsource::ListStateViewDefinitionsReply, tonic::Status>>,
365 > {
366 let request = Request::new(ListStateViewDefinitionsRequest {
367 database_name,
368 status: status.map(|s| s.into()),
369 });
370 let response = self.client.list_state_view_definitions(request).await?;
371 Ok(response.into_inner())
372 }
373
374 pub async fn fetch_state_view_at_revision(
375 &mut self,
376 state_view_identity: Option<com::evidentsource::StateViewIdentity>,
377 database_revision: u64,
378 parameters: Option<com::evidentsource::ParameterBindings>,
379 effective_time_end_at: Option<prost_types::Timestamp>,
380 ) -> ClientResult<com::evidentsource::StateView> {
381 let request = Request::new(FetchStateViewRequest {
382 state_view_identity,
383 database_revision,
384 parameters,
385 effective_time_end_at,
386 });
387 let response = self.client.fetch_state_view_at_revision(request).await?;
388 Ok(response.into_inner().state_view.unwrap())
389 }
390
391 pub async fn execute_state_change(
392 &mut self,
393 database_name: String,
394 state_change_name: String,
395 version: u64,
396 last_seen_revision: Option<u64>,
397 request: com::evidentsource::CommandRequest,
398 transaction_id: Option<String>,
399 ) -> ClientResult<com::evidentsource::TransactionResult> {
400 self.execute_state_change_with_options(
401 database_name,
402 state_change_name,
403 version,
404 last_seen_revision,
405 request,
406 transaction_id,
407 None,
408 None,
409 )
410 .await
411 }
412
413 #[allow(clippy::too_many_arguments)]
424 pub async fn execute_state_change_with_options(
425 &mut self,
426 database_name: String,
427 state_change_name: String,
428 version: u64,
429 last_seen_revision: Option<u64>,
430 request: com::evidentsource::CommandRequest,
431 transaction_id: Option<String>,
432 correlation_id: Option<String>,
433 causation_id: Option<String>,
434 ) -> ClientResult<com::evidentsource::TransactionResult> {
435 let request = Request::new(ExecuteStateChangeRequest {
436 database_name,
437 state_change_name,
438 version,
439 last_seen_revision,
440 request: Some(request),
441 transaction_id,
442 principal_attributes: Default::default(),
443 commit_message: None,
444 correlation_id,
445 causation_id,
446 });
447 let response = self.client.execute_state_change(request).await?;
448 Ok(response.into_inner().result.unwrap())
449 }
450
451 pub async fn transact_async(
457 &mut self,
458 transaction_id: String,
459 database_name: String,
460 events: Vec<io::cloudevents::v1::CloudEvent>,
461 conditions: Vec<com::evidentsource::AppendCondition>,
462 ) -> ClientResult<com::evidentsource::AsyncCommandResponse> {
463 self.transact_async_with_options(
464 transaction_id,
465 database_name,
466 events,
467 conditions,
468 None,
469 None,
470 )
471 .await
472 }
473
474 pub async fn transact_async_with_options(
478 &mut self,
479 transaction_id: String,
480 database_name: String,
481 events: Vec<io::cloudevents::v1::CloudEvent>,
482 conditions: Vec<com::evidentsource::AppendCondition>,
483 correlation_id: Option<String>,
484 causation_id: Option<String>,
485 ) -> ClientResult<com::evidentsource::AsyncCommandResponse> {
486 let request = Request::new(TransactionRequest {
487 transaction_id,
488 database_name,
489 events,
490 conditions,
491 last_read_revision: None,
492 principal_attributes: Default::default(),
493 commit_message: None,
494 correlation_id,
495 causation_id,
496 });
497 let response = self.client.transact_async(request).await?;
498 Ok(response.into_inner())
499 }
500
501 pub async fn execute_state_change_async(
505 &mut self,
506 database_name: String,
507 state_change_name: String,
508 version: u64,
509 last_seen_revision: Option<u64>,
510 request: com::evidentsource::CommandRequest,
511 transaction_id: Option<String>,
512 ) -> ClientResult<com::evidentsource::AsyncCommandResponse> {
513 self.execute_state_change_async_with_options(
514 database_name,
515 state_change_name,
516 version,
517 last_seen_revision,
518 request,
519 transaction_id,
520 None,
521 None,
522 )
523 .await
524 }
525
526 #[allow(clippy::too_many_arguments)]
530 pub async fn execute_state_change_async_with_options(
531 &mut self,
532 database_name: String,
533 state_change_name: String,
534 version: u64,
535 last_seen_revision: Option<u64>,
536 request: com::evidentsource::CommandRequest,
537 transaction_id: Option<String>,
538 correlation_id: Option<String>,
539 causation_id: Option<String>,
540 ) -> ClientResult<com::evidentsource::AsyncCommandResponse> {
541 let request = Request::new(ExecuteStateChangeRequest {
542 database_name,
543 state_change_name,
544 version,
545 last_seen_revision,
546 request: Some(request),
547 transaction_id,
548 principal_attributes: Default::default(),
549 commit_message: None,
550 correlation_id,
551 causation_id,
552 });
553 let response = self.client.execute_state_change_async(request).await?;
554 Ok(response.into_inner())
555 }
556
557 pub async fn list_state_changes(
561 &mut self,
562 database_name: String,
563 ) -> ClientResult<
564 impl Stream<Item = Result<com::evidentsource::ListStateChangesReply, tonic::Status>>,
565 > {
566 let request = Request::new(ListStateChangesRequest { database_name });
567 let response = self.client.list_state_changes(request).await?;
568 Ok(response.into_inner())
569 }
570
571 pub async fn fetch_transaction_by_id(
573 &mut self,
574 database_name: String,
575 transaction_id: String,
576 ) -> ClientResult<com::evidentsource::FetchTransactionReply> {
577 let request = Request::new(FetchTransactionByIdRequest {
578 database_name,
579 transaction_id,
580 });
581 let response = self.client.fetch_transaction_by_id(request).await?;
582 Ok(response.into_inner())
583 }
584}
585
586#[cfg(test)]
587mod tests {
588 #[test]
589 fn it_works() {}
590}