pub struct PostgresEventStore<E> { /* private fields */ }
Expand description
PostgreSQL event store implementation
TODO: The serialization_format configuration is currently stored but not used.
The PostgreSQL adapter still uses hardcoded JSON serialization. Implementing
pluggable serialization formats requires changes to the database schema and
query logic to handle binary data instead of JSON columns.
Implementations§
Source§impl<E> PostgresEventStore<E>
impl<E> PostgresEventStore<E>
Sourcepub async fn read_streams_paginated_impl(
&self,
stream_ids: &[StreamId],
options: &ReadOptions,
continuation_token: Option<EventId>,
) -> Result<(Vec<StoredEvent<E>>, Option<EventId>), EventStoreError>
pub async fn read_streams_paginated_impl( &self, stream_ids: &[StreamId], options: &ReadOptions, continuation_token: Option<EventId>, ) -> Result<(Vec<StoredEvent<E>>, Option<EventId>), EventStoreError>
Read streams with pagination support for large result sets. Returns events and a continuation token for the next page.
Source§impl<E> PostgresEventStore<E>
impl<E> PostgresEventStore<E>
Sourcepub async fn new(config: PostgresConfig) -> Result<Self, PostgresError>
pub async fn new(config: PostgresConfig) -> Result<Self, PostgresError>
Create a new PostgreSQL event store with the given configuration
Sourcepub async fn new_with_retry_strategy(
config: PostgresConfig,
retry_strategy: RetryStrategy,
) -> Result<Self, PostgresError>
pub async fn new_with_retry_strategy( config: PostgresConfig, retry_strategy: RetryStrategy, ) -> Result<Self, PostgresError>
Create a new PostgreSQL event store with custom retry strategy
Sourcepub const fn config(&self) -> &PostgresConfig
pub const fn config(&self) -> &PostgresConfig
Get the configuration
Sourcepub async fn migrate(&self) -> Result<(), PostgresError>
pub async fn migrate(&self) -> Result<(), PostgresError>
Run database migrations
Sourcepub async fn initialize(&self) -> Result<(), PostgresError>
pub async fn initialize(&self) -> Result<(), PostgresError>
Initialize the database schema
This method creates the necessary tables and indexes for the event store.
It is idempotent and can be called multiple times safely.
Uses PostgreSQL advisory locks to prevent concurrent initialization conflicts.
Sourcepub async fn health_check(&self) -> Result<HealthStatus, PostgresError>
pub async fn health_check(&self) -> Result<HealthStatus, PostgresError>
Check database connectivity with comprehensive health checks
Sourcepub async fn recover_connection(&self) -> Result<(), PostgresError>
pub async fn recover_connection(&self) -> Result<(), PostgresError>
Attempt to recover from connection issues
Sourcepub fn get_pool_metrics(&self) -> PoolMetrics
pub fn get_pool_metrics(&self) -> PoolMetrics
Get current pool metrics
Sourcepub fn monitor(&self) -> Arc<PoolMonitor>
pub fn monitor(&self) -> Arc<PoolMonitor>
Get pool monitor for advanced monitoring setup
Sourcepub fn start_pool_monitoring(&self) -> (JoinHandle<()>, Sender<bool>)
pub fn start_pool_monitoring(&self) -> (JoinHandle<()>, Sender<bool>)
Start background pool monitoring task
Sourcepub async fn read_paginated(
&self,
stream_ids: &[StreamId],
options: &ReadOptions,
continuation_token: Option<EventId>,
) -> Result<(Vec<StoredEvent<E>>, Option<EventId>), EventStoreError>
pub async fn read_paginated( &self, stream_ids: &[StreamId], options: &ReadOptions, continuation_token: Option<EventId>, ) -> Result<(Vec<StoredEvent<E>>, Option<EventId>), EventStoreError>
Read events from multiple streams with pagination support.
This method is designed for efficiently processing large result sets without loading all events into memory at once.
§Arguments
- stream_ids - The streams to read from
- options - Read options (version filters, max events)
- continuation_token - Optional continuation token from the previous page
§Returns
A tuple containing:
- Vector of events for this page
- Optional continuation token for the next page (None if no more results)
§Example
let stream_ids = vec![StreamId::try_new("stream-1")?, StreamId::try_new("stream-2")?];
let options = ReadOptions::default();
let mut continuation = None;
loop {
let (events, next_token) = store.read_paginated(&stream_ids, &options, continuation).await?;
// Process events for this page
for event in events {
println!("Processing event: {:?}", event.event_id);
}
// Check if there are more pages
match next_token {
Some(token) => continuation = Some(token),
None => break, // No more pages
}
}
Trait Implementations§
Source§impl<E> Clone for PostgresEventStore<E>
impl<E> Clone for PostgresEventStore<E>
Source§impl<E> EventStore for PostgresEventStore<E>
impl<E> EventStore for PostgresEventStore<E>
Source§fn read_streams<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
stream_ids: &'life1 [StreamId],
options: &'life2 ReadOptions,
) -> Pin<Box<dyn Future<Output = Result<StreamData<Self::Event>, EventStoreError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn read_streams<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
stream_ids: &'life1 [StreamId],
options: &'life2 ReadOptions,
) -> Pin<Box<dyn Future<Output = Result<StreamData<Self::Event>, EventStoreError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Source§fn write_events_multi<'life0, 'async_trait>(
&'life0 self,
stream_events: Vec<StreamEvents<Self::Event>>,
) -> Pin<Box<dyn Future<Output = Result<HashMap<StreamId, EventVersion>, EventStoreError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn write_events_multi<'life0, 'async_trait>(
&'life0 self,
stream_events: Vec<StreamEvents<Self::Event>>,
) -> Pin<Box<dyn Future<Output = Result<HashMap<StreamId, EventVersion>, EventStoreError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn stream_exists<'life0, 'life1, 'async_trait>(
&'life0 self,
stream_id: &'life1 StreamId,
) -> Pin<Box<dyn Future<Output = Result<bool, EventStoreError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn stream_exists<'life0, 'life1, 'async_trait>(
&'life0 self,
stream_id: &'life1 StreamId,
) -> Pin<Box<dyn Future<Output = Result<bool, EventStoreError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Source§fn get_stream_version<'life0, 'life1, 'async_trait>(
&'life0 self,
stream_id: &'life1 StreamId,
) -> Pin<Box<dyn Future<Output = Result<Option<EventVersion>, EventStoreError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_stream_version<'life0, 'life1, 'async_trait>(
&'life0 self,
stream_id: &'life1 StreamId,
) -> Pin<Box<dyn Future<Output = Result<Option<EventVersion>, EventStoreError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Source§fn subscribe<'life0, 'async_trait>(
&'life0 self,
options: SubscriptionOptions,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Subscription<Event = Self::Event>>, EventStoreError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn subscribe<'life0, 'async_trait>(
&'life0 self,
options: SubscriptionOptions,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Subscription<Event = Self::Event>>, EventStoreError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Auto Trait Implementations§
impl<E> Freeze for PostgresEventStore<E>
impl<E> !RefUnwindSafe for PostgresEventStore<E>
impl<E> Send for PostgresEventStore<E>
impl<E> Sync for PostgresEventStore<E>
impl<E> Unpin for PostgresEventStore<E>where
E: Unpin,
impl<E> !UnwindSafe for PostgresEventStore<E>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<ES> EventStoreResourceExt<ES> for ES
impl<ES> EventStoreResourceExt<ES> for ES
Source§fn into_resource(self) -> Resource<ES, Acquired>
fn into_resource(self) -> Resource<ES, Acquired>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§impl<D> OwoColorize for D
impl<D> OwoColorize for D
Source§fn fg<C>(&self) -> FgColorDisplay<'_, C, Self>where
C: Color,
fn fg<C>(&self) -> FgColorDisplay<'_, C, Self>where
C: Color,
Source§fn bg<C>(&self) -> BgColorDisplay<'_, C, Self>where
C: Color,
fn bg<C>(&self) -> BgColorDisplay<'_, C, Self>where
C: Color,
Source§fn black(&self) -> FgColorDisplay<'_, Black, Self>
fn black(&self) -> FgColorDisplay<'_, Black, Self>
Source§fn on_black(&self) -> BgColorDisplay<'_, Black, Self>
fn on_black(&self) -> BgColorDisplay<'_, Black, Self>
Source§fn red(&self) -> FgColorDisplay<'_, Red, Self>
fn red(&self) -> FgColorDisplay<'_, Red, Self>
Source§fn on_red(&self) -> BgColorDisplay<'_, Red, Self>
fn on_red(&self) -> BgColorDisplay<'_, Red, Self>
Source§fn green(&self) -> FgColorDisplay<'_, Green, Self>
fn green(&self) -> FgColorDisplay<'_, Green, Self>
Source§fn on_green(&self) -> BgColorDisplay<'_, Green, Self>
fn on_green(&self) -> BgColorDisplay<'_, Green, Self>
Source§fn yellow(&self) -> FgColorDisplay<'_, Yellow, Self>
fn yellow(&self) -> FgColorDisplay<'_, Yellow, Self>
Source§fn on_yellow(&self) -> BgColorDisplay<'_, Yellow, Self>
fn on_yellow(&self) -> BgColorDisplay<'_, Yellow, Self>
Source§fn blue(&self) -> FgColorDisplay<'_, Blue, Self>
fn blue(&self) -> FgColorDisplay<'_, Blue, Self>
Source§fn on_blue(&self) -> BgColorDisplay<'_, Blue, Self>
fn on_blue(&self) -> BgColorDisplay<'_, Blue, Self>
Source§fn magenta(&self) -> FgColorDisplay<'_, Magenta, Self>
fn magenta(&self) -> FgColorDisplay<'_, Magenta, Self>
Source§fn on_magenta(&self) -> BgColorDisplay<'_, Magenta, Self>
fn on_magenta(&self) -> BgColorDisplay<'_, Magenta, Self>
Source§fn purple(&self) -> FgColorDisplay<'_, Magenta, Self>
fn purple(&self) -> FgColorDisplay<'_, Magenta, Self>
Source§fn on_purple(&self) -> BgColorDisplay<'_, Magenta, Self>
fn on_purple(&self) -> BgColorDisplay<'_, Magenta, Self>
Source§fn cyan(&self) -> FgColorDisplay<'_, Cyan, Self>
fn cyan(&self) -> FgColorDisplay<'_, Cyan, Self>
Source§fn on_cyan(&self) -> BgColorDisplay<'_, Cyan, Self>
fn on_cyan(&self) -> BgColorDisplay<'_, Cyan, Self>
Source§fn white(&self) -> FgColorDisplay<'_, White, Self>
fn white(&self) -> FgColorDisplay<'_, White, Self>
Source§fn on_white(&self) -> BgColorDisplay<'_, White, Self>
fn on_white(&self) -> BgColorDisplay<'_, White, Self>
Source§fn default_color(&self) -> FgColorDisplay<'_, Default, Self>
fn default_color(&self) -> FgColorDisplay<'_, Default, Self>
Source§fn on_default_color(&self) -> BgColorDisplay<'_, Default, Self>
fn on_default_color(&self) -> BgColorDisplay<'_, Default, Self>
Source§fn bright_black(&self) -> FgColorDisplay<'_, BrightBlack, Self>
fn bright_black(&self) -> FgColorDisplay<'_, BrightBlack, Self>
Source§fn on_bright_black(&self) -> BgColorDisplay<'_, BrightBlack, Self>
fn on_bright_black(&self) -> BgColorDisplay<'_, BrightBlack, Self>
Source§fn bright_red(&self) -> FgColorDisplay<'_, BrightRed, Self>
fn bright_red(&self) -> FgColorDisplay<'_, BrightRed, Self>
Source§fn on_bright_red(&self) -> BgColorDisplay<'_, BrightRed, Self>
fn on_bright_red(&self) -> BgColorDisplay<'_, BrightRed, Self>
Source§fn bright_green(&self) -> FgColorDisplay<'_, BrightGreen, Self>
fn bright_green(&self) -> FgColorDisplay<'_, BrightGreen, Self>
Source§fn on_bright_green(&self) -> BgColorDisplay<'_, BrightGreen, Self>
fn on_bright_green(&self) -> BgColorDisplay<'_, BrightGreen, Self>
Source§fn bright_yellow(&self) -> FgColorDisplay<'_, BrightYellow, Self>
fn bright_yellow(&self) -> FgColorDisplay<'_, BrightYellow, Self>
Source§fn on_bright_yellow(&self) -> BgColorDisplay<'_, BrightYellow, Self>
fn on_bright_yellow(&self) -> BgColorDisplay<'_, BrightYellow, Self>
Source§fn bright_blue(&self) -> FgColorDisplay<'_, BrightBlue, Self>
fn bright_blue(&self) -> FgColorDisplay<'_, BrightBlue, Self>
Source§fn on_bright_blue(&self) -> BgColorDisplay<'_, BrightBlue, Self>
fn on_bright_blue(&self) -> BgColorDisplay<'_, BrightBlue, Self>
Source§fn bright_magenta(&self) -> FgColorDisplay<'_, BrightMagenta, Self>
fn bright_magenta(&self) -> FgColorDisplay<'_, BrightMagenta, Self>
Source§fn on_bright_magenta(&self) -> BgColorDisplay<'_, BrightMagenta, Self>
fn on_bright_magenta(&self) -> BgColorDisplay<'_, BrightMagenta, Self>
Source§fn bright_purple(&self) -> FgColorDisplay<'_, BrightMagenta, Self>
fn bright_purple(&self) -> FgColorDisplay<'_, BrightMagenta, Self>
Source§fn on_bright_purple(&self) -> BgColorDisplay<'_, BrightMagenta, Self>
fn on_bright_purple(&self) -> BgColorDisplay<'_, BrightMagenta, Self>
Source§fn bright_cyan(&self) -> FgColorDisplay<'_, BrightCyan, Self>
fn bright_cyan(&self) -> FgColorDisplay<'_, BrightCyan, Self>
Source§fn on_bright_cyan(&self) -> BgColorDisplay<'_, BrightCyan, Self>
fn on_bright_cyan(&self) -> BgColorDisplay<'_, BrightCyan, Self>
Source§fn bright_white(&self) -> FgColorDisplay<'_, BrightWhite, Self>
fn bright_white(&self) -> FgColorDisplay<'_, BrightWhite, Self>
Source§fn on_bright_white(&self) -> BgColorDisplay<'_, BrightWhite, Self>
fn on_bright_white(&self) -> BgColorDisplay<'_, BrightWhite, Self>
Source§fn bold(&self) -> BoldDisplay<'_, Self>
fn bold(&self) -> BoldDisplay<'_, Self>
Source§fn dimmed(&self) -> DimDisplay<'_, Self>
fn dimmed(&self) -> DimDisplay<'_, Self>
Source§fn italic(&self) -> ItalicDisplay<'_, Self>
fn italic(&self) -> ItalicDisplay<'_, Self>
Source§fn underline(&self) -> UnderlineDisplay<'_, Self>
fn underline(&self) -> UnderlineDisplay<'_, Self>
Source§fn blink(&self) -> BlinkDisplay<'_, Self>
fn blink(&self) -> BlinkDisplay<'_, Self>
Source§fn blink_fast(&self) -> BlinkFastDisplay<'_, Self>
fn blink_fast(&self) -> BlinkFastDisplay<'_, Self>
Source§fn reversed(&self) -> ReversedDisplay<'_, Self>
fn reversed(&self) -> ReversedDisplay<'_, Self>
Source§fn strikethrough(&self) -> StrikeThroughDisplay<'_, Self>
fn strikethrough(&self) -> StrikeThroughDisplay<'_, Self>
Source§fn color<Color>(&self, color: Color) -> FgDynColorDisplay<'_, Color, Self>where
Color: DynColor,
fn color<Color>(&self, color: Color) -> FgDynColorDisplay<'_, Color, Self>where
Color: DynColor,
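Set the foreground color at runtime. Only use if you do not know which color will be used at
compile-time. If the color is constant, use either OwoColorize::fg or
a color-specific method, such as OwoColorize::green. Read more
Source§fn on_color<Color>(&self, color: Color) -> BgDynColorDisplay<'_, Color, Self> where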
Color: DynColor,
fn on_color<Color>(&self, color: Color) -> BgDynColorDisplay<'_, Color, Self>where
Color: DynColor,
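Set the background color at runtime. Only use if you do not know what color to use at
compile-time. If the color is constant, use either OwoColorize::bg or
a color-specific method, such as OwoColorize::on_yellow. Read more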