evidentsource_client/database/
at_revision.rs

1//! DatabaseAtRevision implementation backed by gRPC.
2
3use std::sync::Arc;
4
5use chrono::{DateTime, Utc};
6use futures::stream::{self, StreamExt};
7use futures::Stream;
8use nonempty::NonEmpty;
9
10use evidentsource_core::domain::{
11    DatabaseName, Event, EventAttribute, EventSelector, ProspectiveEvent, QueryOptions, Revision,
12    StateView, StateViewError, StateViewName, StateViewVersion,
13};
14use evidentsource_core::{DatabaseAtRevision, DatabaseIdentity};
15
16use crate::com::evidentsource as proto;
17use crate::conversions::ConversionError;
18use crate::EvidentSourceClient;
19
20use super::effective_timestamp::EffectiveTimestampViewImpl;
21use super::speculative::SpeculativeDatabaseImpl;
22
23/// Inner state shared via Arc for Clone-ability.
24struct DatabaseAtRevisionInner {
25    /// Reference to the gRPC client for making queries.
26    client: EvidentSourceClient,
27    /// Database name.
28    name: DatabaseName,
29    /// When the database was created.
30    created_at: DateTime<Utc>,
31    /// This view's revision number.
32    revision: Revision,
33    /// When this revision was committed.
34    revision_timestamp: DateTime<Utc>,
35}
36
37/// A database view at a specific revision, backed by gRPC calls.
38///
39/// This implements `DatabaseAtRevision` trait from the core crate, allowing
40/// queries for events and state views at a specific point-in-time revision.
41#[derive(Clone)]
42pub struct DatabaseAtRevisionImpl {
43    inner: Arc<DatabaseAtRevisionInner>,
44}
45
46impl DatabaseAtRevisionImpl {
47    /// Create a new database view from a proto Database message.
48    pub fn new(
49        client: EvidentSourceClient,
50        proto_db: proto::Database,
51    ) -> Result<Self, ConversionError> {
52        use crate::conversions::timestamp_to_datetime;
53
54        let name = DatabaseName::new(&proto_db.name)?;
55        let created_at = proto_db
56            .created_at
57            .ok_or_else(|| ConversionError::missing_field("Database", "created_at"))
58            .and_then(timestamp_to_datetime)?;
59        let revision_timestamp = proto_db
60            .revision_timestamp
61            .ok_or_else(|| ConversionError::missing_field("Database", "revision_timestamp"))
62            .and_then(timestamp_to_datetime)?;
63
64        Ok(Self {
65            inner: Arc::new(DatabaseAtRevisionInner {
66                client,
67                name,
68                created_at,
69                revision: proto_db.revision,
70                revision_timestamp,
71            }),
72        })
73    }
74
75    /// Create a database view at a specific revision using existing metadata.
76    pub(crate) fn at_revision_with_metadata(
77        client: EvidentSourceClient,
78        name: DatabaseName,
79        created_at: DateTime<Utc>,
80        revision: Revision,
81        revision_timestamp: DateTime<Utc>,
82    ) -> Self {
83        Self {
84            inner: Arc::new(DatabaseAtRevisionInner {
85                client,
86                name,
87                created_at,
88                revision,
89                revision_timestamp,
90            }),
91        }
92    }
93
94    /// Get a clone of the underlying client.
95    pub(crate) fn client(&self) -> EvidentSourceClient {
96        self.inner.client.clone()
97    }
98}
99
100impl std::fmt::Debug for DatabaseAtRevisionImpl {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        f.debug_struct("DatabaseAtRevisionImpl")
103            .field("name", &self.inner.name)
104            .field("revision", &self.inner.revision)
105            .finish()
106    }
107}
108
109impl DatabaseIdentity for DatabaseAtRevisionImpl {
110    fn name(&self) -> &DatabaseName {
111        &self.inner.name
112    }
113
114    fn created_at(&self) -> DateTime<Utc> {
115        self.inner.created_at
116    }
117}
118
119impl DatabaseAtRevision for DatabaseAtRevisionImpl {
120    type EffectiveTimestampView = EffectiveTimestampViewImpl;
121    type Speculative = SpeculativeDatabaseImpl;
122
123    fn revision(&self) -> Revision {
124        self.inner.revision
125    }
126
127    fn revision_timestamp(&self) -> DateTime<Utc> {
128        self.inner.revision_timestamp
129    }
130
131    fn at_effective_timestamp(
132        &self,
133        effective_timestamp: DateTime<Utc>,
134    ) -> Self::EffectiveTimestampView {
135        EffectiveTimestampViewImpl::new(self.clone(), effective_timestamp)
136    }
137
138    fn speculate_with_transaction(
139        &self,
140        transaction: NonEmpty<ProspectiveEvent>,
141    ) -> Self::Speculative {
142        SpeculativeDatabaseImpl::new(self.clone(), transaction)
143    }
144
145    fn at_revision(&self, revision: Revision) -> impl std::future::Future<Output = Self> {
146        let mut client = self.inner.client.clone();
147        let database_name = self.inner.name.to_string();
148        let name = self.inner.name.clone();
149        let created_at = self.inner.created_at;
150
151        async move {
152            // Fetch the database at the requested revision
153            let proto_db = client.await_database(database_name, revision).await;
154
155            match proto_db {
156                Ok(db) => {
157                    use crate::conversions::timestamp_to_datetime;
158                    let revision_timestamp = db
159                        .revision_timestamp
160                        .and_then(|ts| timestamp_to_datetime(ts).ok())
161                        .unwrap_or_else(chrono::Utc::now);
162
163                    Self::at_revision_with_metadata(
164                        client,
165                        name,
166                        created_at,
167                        db.revision,
168                        revision_timestamp,
169                    )
170                }
171                Err(_) => {
172                    // On error, return self (best effort)
173                    Self::at_revision_with_metadata(
174                        client,
175                        name,
176                        created_at,
177                        revision,
178                        chrono::Utc::now(),
179                    )
180                }
181            }
182        }
183    }
184
185    fn query_events(&self, selector: &EventSelector) -> impl Stream<Item = Event> {
186        let database_name = self.inner.name.to_string();
187        let revision = self.inner.revision;
188        let mut client = self.inner.client.clone();
189        let proto_selector: proto::EventSelector = selector.clone().into();
190
191        let query = proto::DatabaseQuery {
192            selector: Some(proto_selector),
193            range: Some(proto::QueryRange {
194                range: Some(proto::query_range::Range::Revision(
195                    proto::query_range::RevisionRange { start_at: Some(0) },
196                )),
197            }),
198            direction: proto::QueryDirection::Forward as i32,
199            limit: None,
200        };
201
202        stream::once(async move {
203            let result = client
204                .query_events(database_name, revision, true, query)
205                .await;
206
207            match result {
208                Ok(response_stream) => response_stream
209                    .filter_map(|result| async move {
210                        match result {
211                            Ok(reply) => {
212                                if let Some(proto::event_query_reply::Event::Detail(ce)) =
213                                    reply.event
214                                {
215                                    Event::try_from(ce).ok()
216                                } else {
217                                    None
218                                }
219                            }
220                            Err(_) => None,
221                        }
222                    })
223                    .boxed(),
224                Err(_) => stream::empty().boxed(),
225            }
226        })
227        .flatten()
228    }
229
230    fn query_events_with_options(
231        &self,
232        selector: &EventSelector,
233        options: QueryOptions,
234    ) -> impl Stream<Item = Event> {
235        use evidentsource_core::domain::QueryDirection;
236
237        let database_name = self.inner.name.to_string();
238        let revision = self.inner.revision;
239        let mut client = self.inner.client.clone();
240        let proto_selector: proto::EventSelector = selector.clone().into();
241
242        let direction = match options.get_direction() {
243            QueryDirection::Forward => proto::QueryDirection::Forward as i32,
244            QueryDirection::Reverse => proto::QueryDirection::Reverse as i32,
245        };
246
247        let query = proto::DatabaseQuery {
248            selector: Some(proto_selector),
249            range: Some(proto::QueryRange {
250                range: Some(proto::query_range::Range::Revision(
251                    proto::query_range::RevisionRange { start_at: Some(0) },
252                )),
253            }),
254            direction,
255            limit: options.get_limit().map(|l| l as u32),
256        };
257
258        stream::once(async move {
259            let result = client
260                .query_events(database_name, revision, true, query)
261                .await;
262
263            match result {
264                Ok(response_stream) => response_stream
265                    .filter_map(|result| async move {
266                        match result {
267                            Ok(reply) => {
268                                if let Some(proto::event_query_reply::Event::Detail(ce)) =
269                                    reply.event
270                                {
271                                    Event::try_from(ce).ok()
272                                } else {
273                                    None
274                                }
275                            }
276                            Err(_) => None,
277                        }
278                    })
279                    .boxed(),
280                Err(_) => stream::empty().boxed(),
281            }
282        })
283        .flatten()
284    }
285
286    fn view_state(
287        &self,
288        name: &StateViewName,
289        version: StateViewVersion,
290    ) -> impl std::future::Future<Output = Result<StateView, StateViewError>> {
291        let state_view_name = name.to_string();
292        let identity = proto::StateViewIdentity {
293            database_name: self.inner.name.to_string(),
294            state_view_name: state_view_name.clone(),
295            state_view_version: version,
296        };
297        let revision = self.inner.revision;
298        let mut client = self.inner.client.clone();
299
300        async move {
301            let proto_view = client
302                .fetch_state_view_at_revision(Some(identity), revision, None, None)
303                .await
304                .map_err(|e| match e {
305                    crate::Error::GrpcStatus(ref status) => {
306                        crate::status_mapping::to_state_view_error(
307                            status,
308                            &state_view_name,
309                            version,
310                        )
311                    }
312                    _ => StateViewError::ServerError(e.to_string()),
313                })?;
314
315            StateView::try_from(proto_view).map_err(|e| {
316                StateViewError::ServerError(format!("failed to parse state view: {}", e))
317            })
318        }
319    }
320
321    fn view_state_with_params(
322        &self,
323        name: &StateViewName,
324        version: StateViewVersion,
325        params: &[(String, EventAttribute)],
326    ) -> impl std::future::Future<Output = Result<StateView, StateViewError>> {
327        let state_view_name = name.to_string();
328        let identity = proto::StateViewIdentity {
329            database_name: self.inner.name.to_string(),
330            state_view_name: state_view_name.clone(),
331            state_view_version: version,
332        };
333        let revision = self.inner.revision;
334        let mut client = self.inner.client.clone();
335
336        // Convert params to proto ParameterBindings
337        let param_bindings = if params.is_empty() {
338            None
339        } else {
340            Some(proto::ParameterBindings {
341                bindings: params
342                    .iter()
343                    .map(|(k, v)| (k.clone(), v.clone().into()))
344                    .collect(),
345            })
346        };
347
348        async move {
349            let proto_view = client
350                .fetch_state_view_at_revision(Some(identity), revision, param_bindings, None)
351                .await
352                .map_err(|e| match e {
353                    crate::Error::GrpcStatus(ref status) => {
354                        crate::status_mapping::to_state_view_error(
355                            status,
356                            &state_view_name,
357                            version,
358                        )
359                    }
360                    _ => StateViewError::ServerError(e.to_string()),
361                })?;
362
363            StateView::try_from(proto_view).map_err(|e| {
364                StateViewError::ServerError(format!("failed to parse state view: {}", e))
365            })
366        }
367    }
368}
369
370// =============================================================================
371// DatabaseAtRevision Extensions
372// =============================================================================
373
374impl DatabaseAtRevisionImpl {
375    /// Create an event query builder for ergonomic event queries.
376    ///
377    /// # Example
378    ///
379    /// ```ignore
380    /// use futures::StreamExt;
381    ///
382    /// // Simple query
383    /// let events: Vec<Event> = db.events(EventSelector::all()).collect().await;
384    ///
385    /// // With options
386    /// let events: Vec<Event> = db
387    ///     .events(EventSelector::stream_equals("my-stream"))
388    ///     .reverse()
389    ///     .limit(10)
390    ///     .collect()
391    ///     .await;
392    /// ```
393    pub fn events(&self, selector: EventSelector) -> EventQueryBuilder {
394        EventQueryBuilder::new(self.clone(), selector)
395    }
396
397    /// List all stream names in the database at this revision.
398    ///
399    /// # Example
400    ///
401    /// ```ignore
402    /// use futures::StreamExt;
403    ///
404    /// let streams: Vec<StreamName> = db.list_streams().collect().await;
405    /// for stream in streams {
406    ///     println!("Stream: {}", stream);
407    /// }
408    /// ```
409    pub fn list_streams(&self) -> impl Stream<Item = evidentsource_core::domain::StreamName> {
410        use crate::com::evidentsource::index_key_scan_request::IndexKeyType;
411        use evidentsource_core::domain::StreamName;
412
413        let mut client = self.inner.client.clone();
414        let database_name = self.inner.name.to_string();
415        let revision = self.inner.revision;
416
417        stream::once(async move {
418            let result = client
419                .scan_index_keys(database_name, revision, IndexKeyType::Stream)
420                .await;
421
422            match result {
423                Ok(response_stream) => response_stream
424                    .filter_map(|result| async move {
425                        match result {
426                            Ok(reply) => StreamName::new(&reply.index_key).ok(),
427                            Err(_) => None,
428                        }
429                    })
430                    .boxed(),
431                Err(_) => stream::empty().boxed(),
432            }
433        })
434        .flatten()
435    }
436
437    /// List all subjects in the database at this revision.
438    ///
439    /// # Example
440    ///
441    /// ```ignore
442    /// use futures::StreamExt;
443    ///
444    /// let subjects: Vec<EventSubject> = db.list_subjects().collect().await;
445    /// ```
446    pub fn list_subjects(&self) -> impl Stream<Item = evidentsource_core::domain::EventSubject> {
447        use crate::com::evidentsource::index_key_scan_request::IndexKeyType;
448        use evidentsource_core::domain::EventSubject;
449
450        let mut client = self.inner.client.clone();
451        let database_name = self.inner.name.to_string();
452        let revision = self.inner.revision;
453
454        stream::once(async move {
455            let result = client
456                .scan_index_keys(database_name, revision, IndexKeyType::Subject)
457                .await;
458
459            match result {
460                Ok(response_stream) => response_stream
461                    .filter_map(|result| async move {
462                        match result {
463                            Ok(reply) => EventSubject::new(&reply.index_key).ok(),
464                            Err(_) => None,
465                        }
466                    })
467                    .boxed(),
468                Err(_) => stream::empty().boxed(),
469            }
470        })
471        .flatten()
472    }
473
474    /// List all event types in the database at this revision.
475    ///
476    /// # Example
477    ///
478    /// ```ignore
479    /// use futures::StreamExt;
480    ///
481    /// let types: Vec<EventType> = db.list_event_types().collect().await;
482    /// ```
483    pub fn list_event_types(&self) -> impl Stream<Item = evidentsource_core::domain::EventType> {
484        use crate::com::evidentsource::index_key_scan_request::IndexKeyType;
485        use evidentsource_core::domain::EventType;
486
487        let mut client = self.inner.client.clone();
488        let database_name = self.inner.name.to_string();
489        let revision = self.inner.revision;
490
491        stream::once(async move {
492            let result = client
493                .scan_index_keys(database_name, revision, IndexKeyType::EventType)
494                .await;
495
496            match result {
497                Ok(response_stream) => response_stream
498                    .filter_map(|result| async move {
499                        match result {
500                            Ok(reply) => EventType::new(&reply.index_key).ok(),
501                            Err(_) => None,
502                        }
503                    })
504                    .boxed(),
505                Err(_) => stream::empty().boxed(),
506            }
507        })
508        .flatten()
509    }
510}
511
512// =============================================================================
513// Event Query Builder
514// =============================================================================
515
516/// Builder for constructing event queries with options.
517///
518/// # Example
519///
520/// ```ignore
521/// // Get all events as a stream
522/// let events = db.events(EventSelector::all()).stream();
523///
524/// // Get first 10 events in reverse order, collected to a Vec
525/// let events: Vec<Event> = db
526///     .events(EventSelector::stream_equals("my-stream"))
527///     .reverse()
528///     .limit(10)
529///     .collect()
530///     .await;
531///
532/// // Get just the first event
533/// let first = db.events(EventSelector::all()).first().await;
534/// ```
535pub struct EventQueryBuilder {
536    database: DatabaseAtRevisionImpl,
537    selector: EventSelector,
538    options: QueryOptions,
539}
540
541impl EventQueryBuilder {
542    /// Create a new event query builder.
543    pub fn new(database: DatabaseAtRevisionImpl, selector: EventSelector) -> Self {
544        Self {
545            database,
546            selector,
547            options: QueryOptions::default(),
548        }
549    }
550
551    /// Set the query direction.
552    pub fn direction(mut self, direction: evidentsource_core::domain::QueryDirection) -> Self {
553        self.options = self.options.direction(direction);
554        self
555    }
556
557    /// Set the query to reverse order (newest first).
558    pub fn reverse(mut self) -> Self {
559        self.options = self.options.reverse();
560        self
561    }
562
563    /// Set the query to forward order (oldest first).
564    pub fn forward(mut self) -> Self {
565        self.options = self.options.forward();
566        self
567    }
568
569    /// Set a limit on the number of events returned.
570    pub fn limit(mut self, limit: u64) -> Self {
571        self.options = self.options.limit(limit);
572        self
573    }
574
575    /// Execute the query and collect all events into a Vec.
576    pub async fn collect(self) -> Vec<Event> {
577        use futures::StreamExt as _;
578        self.database
579            .query_events_with_options(&self.selector, self.options)
580            .collect()
581            .await
582    }
583
584    /// Execute the query and return just the first event, if any.
585    pub async fn first(mut self) -> Option<Event> {
586        use futures::StreamExt as _;
587        self.options = self.options.limit(1);
588        self.database
589            .query_events_with_options(&self.selector, self.options)
590            .boxed()
591            .next()
592            .await
593    }
594
595    /// Create a typed event query that filters and converts events.
596    ///
597    /// The type `T` must implement `TryFrom<&Event>`. Events that fail to
598    /// convert are silently skipped.
599    ///
600    /// # Example
601    ///
602    /// ```ignore
603    /// #[derive(Debug)]
604    /// struct AccountCreated { name: String }
605    ///
606    /// impl TryFrom<&Event> for AccountCreated {
607    ///     type Error = Box<dyn std::error::Error>;
608    ///     fn try_from(event: &Event) -> Result<Self, Self::Error> {
609    ///         // Parse the event data...
610    ///     }
611    /// }
612    ///
613    /// let accounts: Vec<AccountCreated> = db
614    ///     .events(EventSelector::type_equals("AccountCreated"))
615    ///     .typed::<AccountCreated>()
616    ///     .collect()
617    ///     .await;
618    /// ```
619    pub fn typed<T>(self) -> TypedEventQuery<T>
620    where
621        T: for<'a> TryFrom<&'a Event> + 'static,
622    {
623        TypedEventQuery::new(self)
624    }
625}
626
627/// A typed event query that converts events to a specific type.
628pub struct TypedEventQuery<T> {
629    builder: EventQueryBuilder,
630    _phantom: std::marker::PhantomData<T>,
631}
632
633impl<T> TypedEventQuery<T>
634where
635    T: for<'a> TryFrom<&'a Event> + 'static,
636{
637    fn new(builder: EventQueryBuilder) -> Self {
638        Self {
639            builder,
640            _phantom: std::marker::PhantomData,
641        }
642    }
643
644    /// Execute the query and collect all typed events into a Vec.
645    ///
646    /// Events that fail to convert are silently skipped.
647    pub async fn collect(self) -> Vec<T> {
648        let events: Vec<Event> = self.builder.collect().await;
649        events
650            .iter()
651            .filter_map(|event| T::try_from(event).ok())
652            .collect()
653    }
654
655    /// Execute the query and return just the first typed event, if any.
656    pub async fn first(self) -> Option<T> {
657        let event = self.builder.first().await?;
658        T::try_from(&event).ok()
659    }
660}
661
662// =============================================================================
663// Typed State View Extension Trait
664// =============================================================================
665
666/// Extension trait for typed state view access.
667///
668/// This trait provides convenience methods for fetching state views and
669/// automatically deserializing them to typed Rust structs.
670pub trait DatabaseAtRevisionTyped: DatabaseAtRevision {
671    /// Fetch a state view and deserialize it to the specified type.
672    ///
673    /// # Example
674    ///
675    /// ```ignore
676    /// #[derive(Deserialize)]
677    /// struct AccountSummary {
678    ///     balance: i64,
679    ///     transaction_count: u32,
680    /// }
681    ///
682    /// let summary: AccountSummary = db
683    ///     .view_state_as::<AccountSummary>(&StateViewName::new("account-summary")?, 1)
684    ///     .await?;
685    /// ```
686    fn view_state_as<T>(
687        &self,
688        name: &StateViewName,
689        version: StateViewVersion,
690    ) -> impl std::future::Future<Output = Result<T, StateViewError>>
691    where
692        T: serde::de::DeserializeOwned;
693
694    /// Fetch a state view and deserialize it, returning None if not found.
695    ///
696    /// This is useful when the state view might not exist yet (e.g., before
697    /// any events have been recorded for an entity).
698    fn view_state_opt<T>(
699        &self,
700        name: &StateViewName,
701        version: StateViewVersion,
702    ) -> impl std::future::Future<Output = Result<Option<T>, StateViewError>>
703    where
704        T: serde::de::DeserializeOwned;
705}
706
707impl DatabaseAtRevisionTyped for DatabaseAtRevisionImpl {
708    fn view_state_as<T>(
709        &self,
710        name: &StateViewName,
711        version: StateViewVersion,
712    ) -> impl std::future::Future<Output = Result<T, StateViewError>>
713    where
714        T: serde::de::DeserializeOwned,
715    {
716        let view_future = self.view_state(name, version);
717
718        async move {
719            let state_view = view_future.await?;
720
721            // Get the content bytes from the state view
722            let content = state_view.content_bytes().ok_or_else(|| {
723                StateViewError::ServerError("state view has no content".to_string())
724            })?;
725
726            // Deserialize from JSON
727            serde_json::from_slice(content).map_err(|e| {
728                StateViewError::ServerError(format!("failed to deserialize state view: {}", e))
729            })
730        }
731    }
732
733    fn view_state_opt<T>(
734        &self,
735        name: &StateViewName,
736        version: StateViewVersion,
737    ) -> impl std::future::Future<Output = Result<Option<T>, StateViewError>>
738    where
739        T: serde::de::DeserializeOwned,
740    {
741        let view_future = self.view_state(name, version);
742
743        async move {
744            match view_future.await {
745                Ok(state_view) => {
746                    // Get the content bytes from the state view
747                    match state_view.content_bytes() {
748                        Some(content) => {
749                            let value = serde_json::from_slice(content).map_err(|e| {
750                                StateViewError::ServerError(format!(
751                                    "failed to deserialize state view: {}",
752                                    e
753                                ))
754                            })?;
755                            Ok(Some(value))
756                        }
757                        None => Ok(None),
758                    }
759                }
760                Err(StateViewError::NotFound { .. }) => Ok(None),
761                Err(e) => Err(e),
762            }
763        }
764    }
765}