Struct PostgresEventStore

Source
pub struct PostgresEventStore<E>
where E: Send + Sync,
{ /* private fields */ }
Expand description

PostgreSQL event store implementation

TODO: The serialization_format configuration is currently stored but not used. The PostgreSQL adapter still uses hardcoded JSON serialization. Implementing pluggable serialization formats requires changes to the database schema and query logic to handle binary data instead of JSON columns.

Implementations§

Source§

impl<E> PostgresEventStore<E>
where E: Serialize + for<'de> Deserialize<'de> + Send + Sync + Debug + Clone + PartialEq + Eq + 'static,

Source

pub async fn read_streams_paginated_impl( &self, stream_ids: &[StreamId], options: &ReadOptions, continuation_token: Option<EventId>, ) -> Result<(Vec<StoredEvent<E>>, Option<EventId>), EventStoreError>

Read streams with pagination support for large result sets. Returns events and a continuation token for the next page.

Source§

impl<E> PostgresEventStore<E>
where E: Serialize + for<'de> Deserialize<'de> + Send + Sync + Debug + Clone + PartialEq + Eq + 'static,

Source

pub async fn new(config: PostgresConfig) -> Result<Self, PostgresError>

Create a new PostgreSQL event store with the given configuration

Source

pub async fn new_with_retry_strategy( config: PostgresConfig, retry_strategy: RetryStrategy, ) -> Result<Self, PostgresError>

Create a new PostgreSQL event store with a custom retry strategy

Source

pub fn pool(&self) -> &PgPool

Get a reference to the connection pool

Source

pub const fn config(&self) -> &PostgresConfig

Get the configuration

Source

pub async fn migrate(&self) -> Result<(), PostgresError>

Run database migrations

Source

pub async fn initialize(&self) -> Result<(), PostgresError>

Initialize the database schema

This method creates the necessary tables and indexes for the event store. It is idempotent and can be called multiple times safely. Uses PostgreSQL advisory locks to prevent concurrent initialization conflicts.

Source

pub async fn health_check(&self) -> Result<HealthStatus, PostgresError>

Check database connectivity with comprehensive health checks

Source

pub async fn recover_connection(&self) -> Result<(), PostgresError>

Attempt to recover from connection issues

Source

pub fn get_pool_metrics(&self) -> PoolMetrics

Get current pool metrics

Source

pub fn monitor(&self) -> Arc<PoolMonitor>

Get pool monitor for advanced monitoring setup

Source

pub fn start_pool_monitoring(&self) -> (JoinHandle<()>, Sender<bool>)

Start background pool monitoring task

Source

pub async fn read_paginated( &self, stream_ids: &[StreamId], options: &ReadOptions, continuation_token: Option<EventId>, ) -> Result<(Vec<StoredEvent<E>>, Option<EventId>), EventStoreError>
where E: Serialize + for<'de> Deserialize<'de> + Send + Sync + Clone + Debug + PartialEq + Eq + 'static,

Read events from multiple streams with pagination support.

This method is designed for efficiently processing large result sets without loading all events into memory at once.

§Arguments
  • stream_ids - The streams to read from
  • options - Read options (version filters, max events)
  • continuation_token - Optional continuation token from previous page
§Returns

A tuple containing:

  • Vector of events for this page
  • Optional continuation token for the next page (None if no more results)
§Example
let stream_ids = vec![StreamId::try_new("stream-1")?, StreamId::try_new("stream-2")?];
let options = ReadOptions::default();

let mut continuation = None;
loop {
    let (events, next_token) = store.read_paginated(&stream_ids, &options, continuation).await?;
     
    // Process events for this page
    for event in events {
        println!("Processing event: {:?}", event.event_id);
    }
     
    // Check if there are more pages
    match next_token {
        Some(token) => continuation = Some(token),
        None => break, // No more pages
    }
}

Trait Implementations§

Source§

impl<E> Clone for PostgresEventStore<E>
where E: Serialize + for<'de> Deserialize<'de> + Send + Sync + Debug + Clone + PartialEq + Eq + 'static,

Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<E> EventStore for PostgresEventStore<E>
where E: Serialize + for<'de> Deserialize<'de> + Send + Sync + Debug + Clone + PartialEq + Eq + 'static,

Source§

type Event = E

The event type this store handles.
Source§

fn read_streams<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, stream_ids: &'life1 [StreamId], options: &'life2 ReadOptions, ) -> Pin<Box<dyn Future<Output = Result<StreamData<Self::Event>, EventStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Reads events from multiple streams. Read more
Source§

fn write_events_multi<'life0, 'async_trait>( &'life0 self, stream_events: Vec<StreamEvents<Self::Event>>, ) -> Pin<Box<dyn Future<Output = Result<HashMap<StreamId, EventVersion>, EventStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Writes events to multiple streams atomically. Read more
Source§

fn stream_exists<'life0, 'life1, 'async_trait>( &'life0 self, stream_id: &'life1 StreamId, ) -> Pin<Box<dyn Future<Output = Result<bool, EventStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Checks if a stream exists. Read more
Source§

fn get_stream_version<'life0, 'life1, 'async_trait>( &'life0 self, stream_id: &'life1 StreamId, ) -> Pin<Box<dyn Future<Output = Result<Option<EventVersion>, EventStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Gets the current version of a stream. Read more
Source§

fn subscribe<'life0, 'async_trait>( &'life0 self, options: SubscriptionOptions, ) -> Pin<Box<dyn Future<Output = Result<Box<dyn Subscription<Event = Self::Event>>, EventStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Creates a subscription to events. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<ES> EventStoreResourceExt<ES> for ES

Source§

fn into_resource(self) -> Resource<ES, Acquired>

Create an event store resource
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<D> OwoColorize for D

Source§

fn fg<C>(&self) -> FgColorDisplay<'_, C, Self>
where C: Color,

Set the foreground color generically. Read more
Source§

fn bg<C>(&self) -> BgColorDisplay<'_, C, Self>
where C: Color,

Set the background color generically. Read more
Source§

fn black(&self) -> FgColorDisplay<'_, Black, Self>

Change the foreground color to black
Source§

fn on_black(&self) -> BgColorDisplay<'_, Black, Self>

Change the background color to black
Source§

fn red(&self) -> FgColorDisplay<'_, Red, Self>

Change the foreground color to red
Source§

fn on_red(&self) -> BgColorDisplay<'_, Red, Self>

Change the background color to red
Source§

fn green(&self) -> FgColorDisplay<'_, Green, Self>

Change the foreground color to green
Source§

fn on_green(&self) -> BgColorDisplay<'_, Green, Self>

Change the background color to green
Source§

fn yellow(&self) -> FgColorDisplay<'_, Yellow, Self>

Change the foreground color to yellow
Source§

fn on_yellow(&self) -> BgColorDisplay<'_, Yellow, Self>

Change the background color to yellow
Source§

fn blue(&self) -> FgColorDisplay<'_, Blue, Self>

Change the foreground color to blue
Source§

fn on_blue(&self) -> BgColorDisplay<'_, Blue, Self>

Change the background color to blue
Source§

fn magenta(&self) -> FgColorDisplay<'_, Magenta, Self>

Change the foreground color to magenta
Source§

fn on_magenta(&self) -> BgColorDisplay<'_, Magenta, Self>

Change the background color to magenta
Source§

fn purple(&self) -> FgColorDisplay<'_, Magenta, Self>

Change the foreground color to purple
Source§

fn on_purple(&self) -> BgColorDisplay<'_, Magenta, Self>

Change the background color to purple
Source§

fn cyan(&self) -> FgColorDisplay<'_, Cyan, Self>

Change the foreground color to cyan
Source§

fn on_cyan(&self) -> BgColorDisplay<'_, Cyan, Self>

Change the background color to cyan
Source§

fn white(&self) -> FgColorDisplay<'_, White, Self>

Change the foreground color to white
Source§

fn on_white(&self) -> BgColorDisplay<'_, White, Self>

Change the background color to white
Source§

fn default_color(&self) -> FgColorDisplay<'_, Default, Self>

Change the foreground color to the terminal default
Source§

fn on_default_color(&self) -> BgColorDisplay<'_, Default, Self>

Change the background color to the terminal default
Source§

fn bright_black(&self) -> FgColorDisplay<'_, BrightBlack, Self>

Change the foreground color to bright black
Source§

fn on_bright_black(&self) -> BgColorDisplay<'_, BrightBlack, Self>

Change the background color to bright black
Source§

fn bright_red(&self) -> FgColorDisplay<'_, BrightRed, Self>

Change the foreground color to bright red
Source§

fn on_bright_red(&self) -> BgColorDisplay<'_, BrightRed, Self>

Change the background color to bright red
Source§

fn bright_green(&self) -> FgColorDisplay<'_, BrightGreen, Self>

Change the foreground color to bright green
Source§

fn on_bright_green(&self) -> BgColorDisplay<'_, BrightGreen, Self>

Change the background color to bright green
Source§

fn bright_yellow(&self) -> FgColorDisplay<'_, BrightYellow, Self>

Change the foreground color to bright yellow
Source§

fn on_bright_yellow(&self) -> BgColorDisplay<'_, BrightYellow, Self>

Change the background color to bright yellow
Source§

fn bright_blue(&self) -> FgColorDisplay<'_, BrightBlue, Self>

Change the foreground color to bright blue
Source§

fn on_bright_blue(&self) -> BgColorDisplay<'_, BrightBlue, Self>

Change the background color to bright blue
Source§

fn bright_magenta(&self) -> FgColorDisplay<'_, BrightMagenta, Self>

Change the foreground color to bright magenta
Source§

fn on_bright_magenta(&self) -> BgColorDisplay<'_, BrightMagenta, Self>

Change the background color to bright magenta
Source§

fn bright_purple(&self) -> FgColorDisplay<'_, BrightMagenta, Self>

Change the foreground color to bright purple
Source§

fn on_bright_purple(&self) -> BgColorDisplay<'_, BrightMagenta, Self>

Change the background color to bright purple
Source§

fn bright_cyan(&self) -> FgColorDisplay<'_, BrightCyan, Self>

Change the foreground color to bright cyan
Source§

fn on_bright_cyan(&self) -> BgColorDisplay<'_, BrightCyan, Self>

Change the background color to bright cyan
Source§

fn bright_white(&self) -> FgColorDisplay<'_, BrightWhite, Self>

Change the foreground color to bright white
Source§

fn on_bright_white(&self) -> BgColorDisplay<'_, BrightWhite, Self>

Change the background color to bright white
Source§

fn bold(&self) -> BoldDisplay<'_, Self>

Make the text bold
Source§

fn dimmed(&self) -> DimDisplay<'_, Self>

Make the text dim
Source§

fn italic(&self) -> ItalicDisplay<'_, Self>

Make the text italicized
Source§

fn underline(&self) -> UnderlineDisplay<'_, Self>

Make the text underlined
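Source§

fn blink(&self) -> BlinkDisplay<'_, Self>

Make the text blink
Source§

fn blink_fast(&self) -> BlinkFastDisplay<'_, Self>

Make the text blink (but fast!)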
Source§

fn reversed(&self) -> ReversedDisplay<'_, Self>

Swap the foreground and background colors
Source§

fn hidden(&self) -> HiddenDisplay<'_, Self>

Hide the text
Source§

fn strikethrough(&self) -> StrikeThroughDisplay<'_, Self>

Cross out the text
Source§

fn color<Color>(&self, color: Color) -> FgDynColorDisplay<'_, Color, Self>
where Color: DynColor,

Set the foreground color at runtime. Only use if you do not know which color will be used at compile-time. If the color is constant, use either OwoColorize::fg or a color-specific method, such as OwoColorize::green. Read more
Source§

fn on_color<Color>(&self, color: Color) -> BgDynColorDisplay<'_, Color, Self>
where Color: DynColor,

Set the background color at runtime. Only use if you do not know what color to use at compile-time. If the color is constant, use either OwoColorize::bg or a color-specific method, such as OwoColorize::on_yellow. Read more
Source§

fn fg_rgb<const R: u8, const G: u8, const B: u8>( &self, ) -> FgColorDisplay<'_, CustomColor<R, G, B>, Self>

Set the foreground color to a specific RGB value.
Source§

fn bg_rgb<const R: u8, const G: u8, const B: u8>( &self, ) -> BgColorDisplay<'_, CustomColor<R, G, B>, Self>

Set the background color to a specific RGB value.
Source§

fn truecolor(&self, r: u8, g: u8, b: u8) -> FgDynColorDisplay<'_, Rgb, Self>

Sets the foreground color to an RGB value.
Source§

fn on_truecolor(&self, r: u8, g: u8, b: u8) -> BgDynColorDisplay<'_, Rgb, Self>

Sets the background color to an RGB value.
Source§

fn style(&self, style: Style) -> Styled<&Self>

Apply a runtime-determined style
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<S> SubscriptionResourceExt<S> for S

Source§

fn into_resource(self) -> Resource<S, Acquired>

Create a subscription resource
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,