Skip to main content

PostgresSagaStore

Struct PostgresSagaStore 

Source
pub struct PostgresSagaStore { /* private fields */ }
Expand description

PostgreSQL-backed Saga Store

Manages persistent storage of sagas and their execution state using PostgreSQL. Provides crash recovery and distributed coordination across federation instances.

Implementations§

Source§

impl PostgresSagaStore

Source

pub async fn new(_connection_string: &str) -> Result<Self>

Create a new PostgreSQL saga store with default configuration.

Connects to PostgreSQL and verifies connectivity.

§Arguments
  • _connection_string - PostgreSQL connection string (currently unused, uses default config)
§Errors

Returns SagaStoreError::Database if connection fails.

§Example
let store = PostgresSagaStore::new("postgresql://localhost/fraiseql").await?;
Source

pub async fn migrate_schema(&self) -> Result<()>

Create database schema and indices if they don’t exist

Uses the trinity pattern with proper table naming:

  • pk_ (BIGINT PRIMARY KEY): Surrogate key for efficient internal joins
  • id (UUID NOT NULL UNIQUE): Natural key for distributed systems
  • tb_ prefix: Table naming convention for trinity pattern
  • Foreign keys use surrogate keys for better performance
§Errors

Returns SagaStoreError::Database if schema creation fails.

Source

pub async fn health_check(&self) -> Result<()>

Health check - verifies database connectivity

§Errors

Returns SagaStoreError::Database if connection fails.

Source

pub async fn save_saga(&self, saga: &Saga) -> Result<()>

Save or update a saga

Uses upsert semantics - inserts if new, updates if exists. Trinity pattern: surrogate pk_ auto-generated, natural key id (UUID) maintained.

§Errors

Returns SagaStoreError::Database if the operation fails.

Source

pub async fn load_saga(&self, saga_id: Uuid) -> Result<Option<Saga>>

Load a saga by ID

§Errors

Returns SagaStoreError::Database if the query fails.

Source

pub async fn load_all_sagas(&self) -> Result<Vec<Saga>>

Load all sagas ordered by creation time (newest first)

§Errors

Returns SagaStoreError::Database if the query fails.

Source

pub async fn load_sagas_by_state(&self, state: &SagaState) -> Result<Vec<Saga>>

Load sagas filtered by state

§Errors

Returns SagaStoreError::Database if the query fails.

Source

pub async fn update_saga_state( &self, saga_id: Uuid, state: &SagaState, ) -> Result<()>

Update saga state and automatically set completion time for terminal states

Terminal states (Completed, Compensated) automatically receive completed_at timestamp.

§Errors

Returns SagaStoreError::Database if the update fails.

Source

pub async fn load_saga_step(&self, step_id: Uuid) -> Result<Option<SagaStep>>

Load a saga step by ID

§Errors

Returns SagaStoreError::Database if the query fails.

Source

pub async fn load_saga_steps(&self, saga_id: Uuid) -> Result<Vec<SagaStep>>

Load all saga steps for a saga, ordered by step number (Trinity pattern with JOIN)

§Errors

Returns SagaStoreError::Database if the query fails.

Source

pub async fn update_saga_step_state( &self, step_id: Uuid, state: &StepState, ) -> Result<()>

Update saga step state and automatically set completion time for terminal states

Terminal states (Completed, Failed) automatically receive completed_at timestamp.

§Errors

Returns SagaStoreError::Database if the update fails.

Source

pub async fn save_saga_step(&self, step: &SagaStep) -> Result<()>

Save or update a saga step

Uses upsert semantics - inserts if new, updates if exists. Trinity pattern: subquery converts saga natural key (UUID) to surrogate key (BIGINT).

§Errors

Returns SagaStoreError::Database if the operation fails.

Source

pub async fn update_saga_step_result( &self, step_id: Uuid, result: &Value, ) -> Result<()>

Update the result of a completed saga step

§Errors

Returns SagaStoreError::Database if the update fails.

Source

pub async fn mark_saga_for_recovery( &self, saga_id: Uuid, reason: &str, ) -> Result<()>

Mark a saga for recovery

Creates a recovery record tracking an attempt to recover a failed saga. Trinity pattern: subquery converts saga natural key (UUID) to surrogate key (BIGINT).

§Errors

Returns SagaStoreError::Database if the operation fails.

Source

pub async fn find_pending_sagas(&self) -> Result<Vec<Saga>>

Find all pending sagas (not yet started)

§Errors

Returns SagaStoreError::Database if the query fails.

Source

pub async fn clear_recovery_record(&self, saga_id: Uuid) -> Result<()>

Clear recovery record for a saga

Trinity pattern: uses subquery to convert saga natural key to surrogate key.

§Errors

Returns SagaStoreError::Database if the operation fails.

Source

pub async fn delete_saga(&self, saga_id: Uuid) -> Result<()>

Delete a saga and all associated steps and recovery records

CASCADE constraints ensure related records are deleted. Uses natural key (UUID) for deletion.

§Errors

Returns SagaStoreError::Database if the operation fails.

Source

pub async fn delete_completed_sagas(&self) -> Result<u64>

Delete all completed and compensated sagas

§Errors

Returns SagaStoreError::Database if the operation fails.

§Returns

Number of sagas deleted.

Source

pub async fn cleanup_stale_sagas(&self, hours_threshold: i64) -> Result<u64>

Delete sagas older than the specified threshold that are in a terminal state

§Arguments
  • hours_threshold - Delete sagas created more than this many hours ago
§Errors

Returns SagaStoreError::Database if the operation fails.

§Returns

Number of sagas deleted.

Source

pub async fn get_recovery_attempts(&self, saga_id: Uuid) -> Result<i32>

Get the maximum recovery attempt count for a saga

Trinity pattern: uses subquery to convert saga natural key to surrogate key.

§Errors

Returns SagaStoreError::Database if the query fails.

Source

pub async fn save_recovery_record(&self, recovery: &SagaRecovery) -> Result<()>

Save a saga recovery record

Trinity pattern: subquery converts saga natural key (UUID) to surrogate key (BIGINT).

§Errors

Returns SagaStoreError::Database if the operation fails.

Source

pub async fn cleanup_all(&self) -> Result<()>

Delete all sagas, steps, and recovery records (for testing)

§Errors

Returns SagaStoreError::Database if the operation fails.

Source

pub async fn saga_count(&self) -> Result<i64>

Get total number of sagas in the database

§Errors

Returns SagaStoreError::Database if the query fails.

Source

pub async fn step_count(&self) -> Result<i64>

Get total number of saga steps in the database

§Errors

Returns SagaStoreError::Database if the query fails.

Source

pub async fn recovery_count(&self) -> Result<i64>

Get total number of saga recovery records in the database

§Errors

Returns SagaStoreError::Database if the query fails.

Source

pub async fn find_stuck_sagas(&self) -> Result<Vec<Saga>>

Find all stuck sagas (in executing state that may have crashed)

§Errors

Returns SagaStoreError::Database if the query fails.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more