1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
//! # PostgreSQL State Store
//!
//! This module provides an implementation of a StateStore for a PostgreSQL database.
//! It allows storing and retrieving business concepts, known as states, which are hydrated from event store events.
use crate::Error;
use crate::PgEventStore;
use async_trait::async_trait;
use disintegrate::state::{Hydrated, State, StateStore};
use disintegrate::Event;
use disintegrate::EventStore;
use disintegrate_serde::Serde;
use futures::StreamExt;
/// Implementation of the `StateStore` trait for a PostgreSQL event store.
///
/// This struct allows hydrating and saving states using a PostgreSQL event store.
/// It requires the event type `E` to implement the `Event` traits.
/// Additionally, it requires the serializer type `Srd` to implement Serde traits.
#[async_trait]
impl<E, Srd> StateStore<E> for PgEventStore<E, Srd>
where
E: Event + Clone + Send + Sync,
Srd: Serde<E> + Send + Sync,
{
type Error = Error;
/// Hydrates the given state using the events stored in the PostgreSQL event store.
///
/// It retrieves events from the event store and applies them to the default state,
/// resulting in a hydrated state. The function returns a `Hydrated` object containing
/// the hydrated state and the version of the state.
///
/// # Arguments
///
/// * `default` - The default state to be hydrated.
///
/// # Returns
///
/// A `Result` containing the hydrated state wrapped in a `Hydrated` object and the version
/// of the state if successful, or an `Error` if an error occurs during the hydration process.
async fn hydrate<QE, S>(&self, default: S) -> Result<Hydrated<S>, Self::Error>
where
S: State<Event = QE>,
QE: TryFrom<E> + Event + Clone + Send + Sync,
<QE as TryFrom<E>>::Error: std::fmt::Debug + Send,
{
let (state, version) = self
.stream(&default.query())
.unwrap()
.fold((default, 0), |(mut state, _), e| async move {
let applied_event_id = e.id();
state.mutate(e.unwrap());
(state, applied_event_id)
})
.await;
Ok(Hydrated::new(state, version))
}
/// Saves the changes from the provided hydrated state into the PostgreSQL event store.
///
/// It appends the changes in the hydrated state as events to the event store,
/// updating the state's query and version.
///
/// # Arguments
///
/// * `state` - The hydrated state containing the changes to be saved.
///
/// # Returns
///
/// A `Result` indicating success or failure of the save operation. Returns `Ok(())` if
/// the changes are successfully saved, or an `Error` if an error occurs during the save process.
async fn save<QE, S>(&self, state: &mut Hydrated<S>) -> Result<(), Self::Error>
where
S: State<Event = QE>,
QE: Into<E> + Event + Clone + Send + Sync,
{
self.append(
state
.changes()
.into_iter()
.map(|e| e.into())
.collect::<Vec<E>>(),
state.query(),
state.version(),
)
.await?;
Ok(())
}
}