Skip to main content

PostgresStore

Struct PostgresStore 

Source
pub struct PostgresStore { /* private fields */ }
Expand description

PostgreSQL-backed implementation of the `Store` trait.

Implementations§

Source§

impl PostgresStore

Source

pub fn new(pool: PgPool) -> Self

Source

pub fn pool(&self) -> &PgPool

Source

pub async fn is_processed(&self, event_id: Uuid) -> Result<bool>

Check if an event has already been processed (for idempotency).

This is used by the Kafka backend to prevent duplicate processing when Kafka redelivers messages.

Trait Implementations§

Source§

impl Clone for PostgresStore

Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl InsightStore for PostgresStore

Source§

fn subscribe_events<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<Box<dyn Stream<Item = InsightEvent> + Send + Unpin>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Subscribe to live events for all workflows. Read more
Source§

fn get_workflow_tree<'life0, 'async_trait>( &'life0 self, correlation_id: Uuid, ) -> Pin<Box<dyn Future<Output = Result<WorkflowTree>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get the workflow tree for visualization. Read more
Source§

fn get_stats<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<InsightStats>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get aggregate statistics. Read more
Source§

fn get_recent_events<'life0, 'async_trait>( &'life0 self, cursor: Option<i64>, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<Vec<InsightEvent>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get recent events with cursor-based pagination. Read more
Source§

fn get_effect_logs<'life0, 'async_trait>( &'life0 self, correlation_id: Option<Uuid>, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<Vec<EffectExecutionLog>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get effect execution logs for operational debugging. Read more
Source§

fn get_dead_letters<'life0, 'async_trait>( &'life0 self, unresolved_only: bool, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<Vec<DeadLetterEntry>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get dead letter queue entries. Read more
Source§

fn get_failed_workflows<'life0, 'async_trait>( &'life0 self, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<Vec<FailedWorkflow>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get workflows that currently have failures or dead letters. Read more
Source§

impl Store for PostgresStore

Source§

fn publish<'life0, 'async_trait>( &'life0 self, event: QueuedEvent, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Publish an event to the queue. Read more
Source§

fn poll_next<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<Option<QueuedEvent>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Poll the next event (per-workflow FIFO with advisory locks). Read more
Source§

fn ack<'life0, 'async_trait>( &'life0 self, id: i64, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Acknowledge completion of event processing. Read more
Source§

fn nack<'life0, 'async_trait>( &'life0 self, id: i64, retry_after_secs: u64, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Negatively acknowledge (nack) an event: processing failed, retry later. Read more
Source§

fn commit_event_processing<'life0, 'async_trait>( &'life0 self, commit: EventProcessingCommit, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Atomically commit event processing side effects. Read more
Source§

fn insert_effect_intent<'life0, 'async_trait>( &'life0 self, event_id: Uuid, handler_id: String, correlation_id: Uuid, event_type: String, event_payload: Value, parent_event_id: Option<Uuid>, batch_id: Option<Uuid>, batch_index: Option<i32>, batch_size: Option<i32>, execute_at: DateTime<Utc>, timeout_seconds: i32, max_attempts: i32, priority: i32, join_window_timeout_seconds: Option<i32>, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Insert an effect execution intent. Read more
Source§

fn poll_next_effect<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<Option<QueuedHandlerExecution>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Poll the next ready effect (priority-based). Read more
Source§

fn complete_effect<'life0, 'async_trait>( &'life0 self, event_id: Uuid, handler_id: String, result: Value, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Mark an effect execution as completed.
Source§

fn complete_effect_with_events<'life0, 'async_trait>( &'life0 self, event_id: Uuid, handler_id: String, result: Value, emitted_events: Vec<EmittedEvent>, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Complete an effect and atomically publish its emitted events (crash-safe). Read more
Source§

fn fail_effect<'life0, 'async_trait>( &'life0 self, event_id: Uuid, handler_id: String, error: String, attempts: i32, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Mark an effect execution as failed and schedule a retry (store-defined backoff).
Source§

fn dlq_effect<'life0, 'async_trait>( &'life0 self, event_id: Uuid, handler_id: String, error: String, reason: String, attempts: i32, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Move an effect to the dead letter queue (DLQ) because it has permanently failed.
Source§

fn dlq_effect_with_events<'life0, 'async_trait>( &'life0 self, event_id: Uuid, handler_id: String, error: String, reason: String, attempts: i32, emitted_events: Vec<EmittedEvent>, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Move effect to DLQ and atomically publish mapped terminal events. Read more
Source§

fn subscribe_workflow_events<'life0, 'async_trait>( &'life0 self, correlation_id: Uuid, ) -> Pin<Box<dyn Future<Output = Result<Box<dyn Stream<Item = WorkflowEvent> + Send + Unpin>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Subscribe to events for a specific workflow via LISTEN/NOTIFY. Read more
Source§

fn get_workflow_status<'life0, 'async_trait>( &'life0 self, correlation_id: Uuid, ) -> Pin<Box<dyn Future<Output = Result<WorkflowStatus>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get the workflow status for a correlation ID. Read more
Source§

fn join_same_batch_append_and_maybe_claim<'life0, 'async_trait>( &'life0 self, join_handler_id: String, correlation_id: Uuid, source_event_id: Uuid, source_event_type: String, source_payload: Value, source_created_at: DateTime<Utc>, batch_id: Uuid, batch_index: i32, batch_size: i32, join_window_timeout_seconds: Option<i32>, ) -> Pin<Box<dyn Future<Output = Result<Option<Vec<JoinEntry>>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Append a terminal item into a same-batch join window and attempt to claim the window for processing when complete. Read more
Source§

fn join_same_batch_complete<'life0, 'async_trait>( &'life0 self, join_handler_id: String, correlation_id: Uuid, batch_id: Uuid, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Mark a claimed same-batch window as completed and clear durable join rows.
Source§

fn join_same_batch_release<'life0, 'async_trait>( &'life0 self, join_handler_id: String, correlation_id: Uuid, batch_id: Uuid, error: String, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Release a claimed same-batch window back to open status after a handler error so retries can claim it again.
Source§

fn expire_same_batch_windows<'life0, 'async_trait>( &'life0 self, now: DateTime<Utc>, ) -> Pin<Box<dyn Future<Output = Result<Vec<ExpiredJoinWindow>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Expire same-batch windows that have exceeded their configured timeout.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more