1use std::sync::Arc;
4
5use chrono::{DateTime, Utc};
6use futures::stream::{self, StreamExt};
7use futures::Stream;
8use nonempty::NonEmpty;
9
10use evidentsource_core::domain::{
11 DatabaseName, Event, EventAttribute, EventSelector, ProspectiveEvent, QueryOptions, Revision,
12 StateView, StateViewError, StateViewName, StateViewVersion,
13};
14use evidentsource_core::{DatabaseAtRevision, DatabaseIdentity};
15
16use crate::com::evidentsource as proto;
17use crate::conversions::ConversionError;
18use crate::EvidentSourceClient;
19
20use super::effective_timestamp::EffectiveTimestampViewImpl;
21use super::speculative::SpeculativeDatabaseImpl;
22
23struct DatabaseAtRevisionInner {
25 client: EvidentSourceClient,
27 name: DatabaseName,
29 created_at: DateTime<Utc>,
31 revision: Revision,
33 revision_timestamp: DateTime<Utc>,
35}
36
37#[derive(Clone)]
42pub struct DatabaseAtRevisionImpl {
43 inner: Arc<DatabaseAtRevisionInner>,
44}
45
46impl DatabaseAtRevisionImpl {
47 pub fn new(
49 client: EvidentSourceClient,
50 proto_db: proto::Database,
51 ) -> Result<Self, ConversionError> {
52 use crate::conversions::timestamp_to_datetime;
53
54 let name = DatabaseName::new(&proto_db.name)?;
55 let created_at = proto_db
56 .created_at
57 .ok_or_else(|| ConversionError::missing_field("Database", "created_at"))
58 .and_then(timestamp_to_datetime)?;
59 let revision_timestamp = proto_db
60 .revision_timestamp
61 .ok_or_else(|| ConversionError::missing_field("Database", "revision_timestamp"))
62 .and_then(timestamp_to_datetime)?;
63
64 Ok(Self {
65 inner: Arc::new(DatabaseAtRevisionInner {
66 client,
67 name,
68 created_at,
69 revision: proto_db.revision,
70 revision_timestamp,
71 }),
72 })
73 }
74
75 pub(crate) fn at_revision_with_metadata(
77 client: EvidentSourceClient,
78 name: DatabaseName,
79 created_at: DateTime<Utc>,
80 revision: Revision,
81 revision_timestamp: DateTime<Utc>,
82 ) -> Self {
83 Self {
84 inner: Arc::new(DatabaseAtRevisionInner {
85 client,
86 name,
87 created_at,
88 revision,
89 revision_timestamp,
90 }),
91 }
92 }
93
94 pub(crate) fn client(&self) -> EvidentSourceClient {
96 self.inner.client.clone()
97 }
98}
99
100impl std::fmt::Debug for DatabaseAtRevisionImpl {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 f.debug_struct("DatabaseAtRevisionImpl")
103 .field("name", &self.inner.name)
104 .field("revision", &self.inner.revision)
105 .finish()
106 }
107}
108
109impl DatabaseIdentity for DatabaseAtRevisionImpl {
110 fn name(&self) -> &DatabaseName {
111 &self.inner.name
112 }
113
114 fn created_at(&self) -> DateTime<Utc> {
115 self.inner.created_at
116 }
117}
118
119impl DatabaseAtRevision for DatabaseAtRevisionImpl {
120 type EffectiveTimestampView = EffectiveTimestampViewImpl;
121 type Speculative = SpeculativeDatabaseImpl;
122
123 fn revision(&self) -> Revision {
124 self.inner.revision
125 }
126
127 fn revision_timestamp(&self) -> DateTime<Utc> {
128 self.inner.revision_timestamp
129 }
130
131 fn at_effective_timestamp(
132 &self,
133 effective_timestamp: DateTime<Utc>,
134 ) -> Self::EffectiveTimestampView {
135 EffectiveTimestampViewImpl::new(self.clone(), effective_timestamp)
136 }
137
138 fn speculate_with_transaction(
139 &self,
140 transaction: NonEmpty<ProspectiveEvent>,
141 ) -> Self::Speculative {
142 SpeculativeDatabaseImpl::new(self.clone(), transaction)
143 }
144
145 fn at_revision(&self, revision: Revision) -> impl std::future::Future<Output = Self> {
146 let mut client = self.inner.client.clone();
147 let database_name = self.inner.name.to_string();
148 let name = self.inner.name.clone();
149 let created_at = self.inner.created_at;
150
151 async move {
152 let proto_db = client.await_database(database_name, revision).await;
154
155 match proto_db {
156 Ok(db) => {
157 use crate::conversions::timestamp_to_datetime;
158 let revision_timestamp = db
159 .revision_timestamp
160 .and_then(|ts| timestamp_to_datetime(ts).ok())
161 .unwrap_or_else(chrono::Utc::now);
162
163 Self::at_revision_with_metadata(
164 client,
165 name,
166 created_at,
167 db.revision,
168 revision_timestamp,
169 )
170 }
171 Err(_) => {
172 Self::at_revision_with_metadata(
174 client,
175 name,
176 created_at,
177 revision,
178 chrono::Utc::now(),
179 )
180 }
181 }
182 }
183 }
184
185 fn query_events(&self, selector: &EventSelector) -> impl Stream<Item = Event> {
186 let database_name = self.inner.name.to_string();
187 let revision = self.inner.revision;
188 let mut client = self.inner.client.clone();
189 let proto_selector: proto::EventSelector = selector.clone().into();
190
191 let query = proto::DatabaseQuery {
192 selector: Some(proto_selector),
193 range: Some(proto::QueryRange {
194 range: Some(proto::query_range::Range::Revision(
195 proto::query_range::RevisionRange { start_at: Some(0) },
196 )),
197 }),
198 direction: proto::QueryDirection::Forward as i32,
199 limit: None,
200 };
201
202 stream::once(async move {
203 let result = client
204 .query_events(database_name, revision, true, query)
205 .await;
206
207 match result {
208 Ok(response_stream) => response_stream
209 .filter_map(|result| async move {
210 match result {
211 Ok(reply) => {
212 if let Some(proto::event_query_reply::Event::Detail(ce)) =
213 reply.event
214 {
215 Event::try_from(ce).ok()
216 } else {
217 None
218 }
219 }
220 Err(_) => None,
221 }
222 })
223 .boxed(),
224 Err(_) => stream::empty().boxed(),
225 }
226 })
227 .flatten()
228 }
229
230 fn query_events_with_options(
231 &self,
232 selector: &EventSelector,
233 options: QueryOptions,
234 ) -> impl Stream<Item = Event> {
235 use evidentsource_core::domain::QueryDirection;
236
237 let database_name = self.inner.name.to_string();
238 let revision = self.inner.revision;
239 let mut client = self.inner.client.clone();
240 let proto_selector: proto::EventSelector = selector.clone().into();
241
242 let direction = match options.get_direction() {
243 QueryDirection::Forward => proto::QueryDirection::Forward as i32,
244 QueryDirection::Reverse => proto::QueryDirection::Reverse as i32,
245 };
246
247 let query = proto::DatabaseQuery {
248 selector: Some(proto_selector),
249 range: Some(proto::QueryRange {
250 range: Some(proto::query_range::Range::Revision(
251 proto::query_range::RevisionRange { start_at: Some(0) },
252 )),
253 }),
254 direction,
255 limit: options.get_limit().map(|l| l as u32),
256 };
257
258 stream::once(async move {
259 let result = client
260 .query_events(database_name, revision, true, query)
261 .await;
262
263 match result {
264 Ok(response_stream) => response_stream
265 .filter_map(|result| async move {
266 match result {
267 Ok(reply) => {
268 if let Some(proto::event_query_reply::Event::Detail(ce)) =
269 reply.event
270 {
271 Event::try_from(ce).ok()
272 } else {
273 None
274 }
275 }
276 Err(_) => None,
277 }
278 })
279 .boxed(),
280 Err(_) => stream::empty().boxed(),
281 }
282 })
283 .flatten()
284 }
285
286 fn view_state(
287 &self,
288 name: &StateViewName,
289 version: StateViewVersion,
290 ) -> impl std::future::Future<Output = Result<StateView, StateViewError>> {
291 let state_view_name = name.to_string();
292 let identity = proto::StateViewIdentity {
293 database_name: self.inner.name.to_string(),
294 state_view_name: state_view_name.clone(),
295 state_view_version: version,
296 };
297 let revision = self.inner.revision;
298 let mut client = self.inner.client.clone();
299
300 async move {
301 let proto_view = client
302 .fetch_state_view_at_revision(Some(identity), revision, None, None)
303 .await
304 .map_err(|e| match e {
305 crate::Error::GrpcStatus(ref status) => {
306 crate::status_mapping::to_state_view_error(
307 status,
308 &state_view_name,
309 version,
310 )
311 }
312 _ => StateViewError::ServerError(e.to_string()),
313 })?;
314
315 StateView::try_from(proto_view).map_err(|e| {
316 StateViewError::ServerError(format!("failed to parse state view: {}", e))
317 })
318 }
319 }
320
321 fn view_state_with_params(
322 &self,
323 name: &StateViewName,
324 version: StateViewVersion,
325 params: &[(String, EventAttribute)],
326 ) -> impl std::future::Future<Output = Result<StateView, StateViewError>> {
327 let state_view_name = name.to_string();
328 let identity = proto::StateViewIdentity {
329 database_name: self.inner.name.to_string(),
330 state_view_name: state_view_name.clone(),
331 state_view_version: version,
332 };
333 let revision = self.inner.revision;
334 let mut client = self.inner.client.clone();
335
336 let param_bindings = if params.is_empty() {
338 None
339 } else {
340 Some(proto::ParameterBindings {
341 bindings: params
342 .iter()
343 .map(|(k, v)| (k.clone(), v.clone().into()))
344 .collect(),
345 })
346 };
347
348 async move {
349 let proto_view = client
350 .fetch_state_view_at_revision(Some(identity), revision, param_bindings, None)
351 .await
352 .map_err(|e| match e {
353 crate::Error::GrpcStatus(ref status) => {
354 crate::status_mapping::to_state_view_error(
355 status,
356 &state_view_name,
357 version,
358 )
359 }
360 _ => StateViewError::ServerError(e.to_string()),
361 })?;
362
363 StateView::try_from(proto_view).map_err(|e| {
364 StateViewError::ServerError(format!("failed to parse state view: {}", e))
365 })
366 }
367 }
368}
369
370impl DatabaseAtRevisionImpl {
375 pub fn events(&self, selector: EventSelector) -> EventQueryBuilder {
394 EventQueryBuilder::new(self.clone(), selector)
395 }
396
397 pub fn list_streams(&self) -> impl Stream<Item = evidentsource_core::domain::StreamName> {
410 use crate::com::evidentsource::index_key_scan_request::IndexKeyType;
411 use evidentsource_core::domain::StreamName;
412
413 let mut client = self.inner.client.clone();
414 let database_name = self.inner.name.to_string();
415 let revision = self.inner.revision;
416
417 stream::once(async move {
418 let result = client
419 .scan_index_keys(database_name, revision, IndexKeyType::Stream)
420 .await;
421
422 match result {
423 Ok(response_stream) => response_stream
424 .filter_map(|result| async move {
425 match result {
426 Ok(reply) => StreamName::new(&reply.index_key).ok(),
427 Err(_) => None,
428 }
429 })
430 .boxed(),
431 Err(_) => stream::empty().boxed(),
432 }
433 })
434 .flatten()
435 }
436
437 pub fn list_subjects(&self) -> impl Stream<Item = evidentsource_core::domain::EventSubject> {
447 use crate::com::evidentsource::index_key_scan_request::IndexKeyType;
448 use evidentsource_core::domain::EventSubject;
449
450 let mut client = self.inner.client.clone();
451 let database_name = self.inner.name.to_string();
452 let revision = self.inner.revision;
453
454 stream::once(async move {
455 let result = client
456 .scan_index_keys(database_name, revision, IndexKeyType::Subject)
457 .await;
458
459 match result {
460 Ok(response_stream) => response_stream
461 .filter_map(|result| async move {
462 match result {
463 Ok(reply) => EventSubject::new(&reply.index_key).ok(),
464 Err(_) => None,
465 }
466 })
467 .boxed(),
468 Err(_) => stream::empty().boxed(),
469 }
470 })
471 .flatten()
472 }
473
474 pub fn list_event_types(&self) -> impl Stream<Item = evidentsource_core::domain::EventType> {
484 use crate::com::evidentsource::index_key_scan_request::IndexKeyType;
485 use evidentsource_core::domain::EventType;
486
487 let mut client = self.inner.client.clone();
488 let database_name = self.inner.name.to_string();
489 let revision = self.inner.revision;
490
491 stream::once(async move {
492 let result = client
493 .scan_index_keys(database_name, revision, IndexKeyType::EventType)
494 .await;
495
496 match result {
497 Ok(response_stream) => response_stream
498 .filter_map(|result| async move {
499 match result {
500 Ok(reply) => EventType::new(&reply.index_key).ok(),
501 Err(_) => None,
502 }
503 })
504 .boxed(),
505 Err(_) => stream::empty().boxed(),
506 }
507 })
508 .flatten()
509 }
510}
511
512pub struct EventQueryBuilder {
536 database: DatabaseAtRevisionImpl,
537 selector: EventSelector,
538 options: QueryOptions,
539}
540
541impl EventQueryBuilder {
542 pub fn new(database: DatabaseAtRevisionImpl, selector: EventSelector) -> Self {
544 Self {
545 database,
546 selector,
547 options: QueryOptions::default(),
548 }
549 }
550
551 pub fn direction(mut self, direction: evidentsource_core::domain::QueryDirection) -> Self {
553 self.options = self.options.direction(direction);
554 self
555 }
556
557 pub fn reverse(mut self) -> Self {
559 self.options = self.options.reverse();
560 self
561 }
562
563 pub fn forward(mut self) -> Self {
565 self.options = self.options.forward();
566 self
567 }
568
569 pub fn limit(mut self, limit: u64) -> Self {
571 self.options = self.options.limit(limit);
572 self
573 }
574
575 pub async fn collect(self) -> Vec<Event> {
577 use futures::StreamExt as _;
578 self.database
579 .query_events_with_options(&self.selector, self.options)
580 .collect()
581 .await
582 }
583
584 pub async fn first(mut self) -> Option<Event> {
586 use futures::StreamExt as _;
587 self.options = self.options.limit(1);
588 self.database
589 .query_events_with_options(&self.selector, self.options)
590 .boxed()
591 .next()
592 .await
593 }
594
595 pub fn typed<T>(self) -> TypedEventQuery<T>
620 where
621 T: for<'a> TryFrom<&'a Event> + 'static,
622 {
623 TypedEventQuery::new(self)
624 }
625}
626
627pub struct TypedEventQuery<T> {
629 builder: EventQueryBuilder,
630 _phantom: std::marker::PhantomData<T>,
631}
632
633impl<T> TypedEventQuery<T>
634where
635 T: for<'a> TryFrom<&'a Event> + 'static,
636{
637 fn new(builder: EventQueryBuilder) -> Self {
638 Self {
639 builder,
640 _phantom: std::marker::PhantomData,
641 }
642 }
643
644 pub async fn collect(self) -> Vec<T> {
648 let events: Vec<Event> = self.builder.collect().await;
649 events
650 .iter()
651 .filter_map(|event| T::try_from(event).ok())
652 .collect()
653 }
654
655 pub async fn first(self) -> Option<T> {
657 let event = self.builder.first().await?;
658 T::try_from(&event).ok()
659 }
660}
661
662pub trait DatabaseAtRevisionTyped: DatabaseAtRevision {
671 fn view_state_as<T>(
687 &self,
688 name: &StateViewName,
689 version: StateViewVersion,
690 ) -> impl std::future::Future<Output = Result<T, StateViewError>>
691 where
692 T: serde::de::DeserializeOwned;
693
694 fn view_state_opt<T>(
699 &self,
700 name: &StateViewName,
701 version: StateViewVersion,
702 ) -> impl std::future::Future<Output = Result<Option<T>, StateViewError>>
703 where
704 T: serde::de::DeserializeOwned;
705}
706
707impl DatabaseAtRevisionTyped for DatabaseAtRevisionImpl {
708 fn view_state_as<T>(
709 &self,
710 name: &StateViewName,
711 version: StateViewVersion,
712 ) -> impl std::future::Future<Output = Result<T, StateViewError>>
713 where
714 T: serde::de::DeserializeOwned,
715 {
716 let view_future = self.view_state(name, version);
717
718 async move {
719 let state_view = view_future.await?;
720
721 let content = state_view.content_bytes().ok_or_else(|| {
723 StateViewError::ServerError("state view has no content".to_string())
724 })?;
725
726 serde_json::from_slice(content).map_err(|e| {
728 StateViewError::ServerError(format!("failed to deserialize state view: {}", e))
729 })
730 }
731 }
732
733 fn view_state_opt<T>(
734 &self,
735 name: &StateViewName,
736 version: StateViewVersion,
737 ) -> impl std::future::Future<Output = Result<Option<T>, StateViewError>>
738 where
739 T: serde::de::DeserializeOwned,
740 {
741 let view_future = self.view_state(name, version);
742
743 async move {
744 match view_future.await {
745 Ok(state_view) => {
746 match state_view.content_bytes() {
748 Some(content) => {
749 let value = serde_json::from_slice(content).map_err(|e| {
750 StateViewError::ServerError(format!(
751 "failed to deserialize state view: {}",
752 e
753 ))
754 })?;
755 Ok(Some(value))
756 }
757 None => Ok(None),
758 }
759 }
760 Err(StateViewError::NotFound { .. }) => Ok(None),
761 Err(e) => Err(e),
762 }
763 }
764 }
765}