pub struct PostgresSagaStore { /* private fields */ }
PostgreSQL-backed Saga Store
Manages persistent storage of sagas and their execution state using PostgreSQL. Provides crash recovery and distributed coordination across federation instances.
Implementations
impl PostgresSagaStore
pub async fn new(_connection_string: &str) -> Result<Self>
Create a new PostgreSQL saga store with default configuration.
Connects to PostgreSQL and verifies connectivity.
§Arguments
_connection_string - PostgreSQL connection string (currently unused; the store uses its default configuration)
§Errors
Returns SagaStoreError::Database if connection fails.
§Example
let store = PostgresSagaStore::new("postgresql://localhost/fraiseql").await?;
pub async fn migrate_schema(&self) -> Result<()>
Create database schema and indices if they don’t exist
Uses the trinity pattern with proper table naming:
- pk_ (BIGINT PRIMARY KEY): Surrogate key for efficient internal joins
- id (UUID NOT NULL UNIQUE): Natural key for distributed systems
- tb_ prefix: Table naming convention for trinity pattern
- Foreign keys use surrogate keys for better performance
§Errors
Returns SagaStoreError::Database if schema creation fails.
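To make the trinity pattern used by migrate_schema concrete, here is a minimal sketch of what such a schema might look like. Only the pk_ and id columns and the tb_ prefix come from the description above. The table names tb_saga and tb_saga_step, the remaining columns, and the helper schema_statements are hypothetical and are not taken from the crate's actual migration.

```rust
// Hypothetical DDL illustrating the trinity pattern. Apart from the pk_/id/tb_
// conventions, every name below is an assumption, not the crate's real schema.
pub const CREATE_SAGA_TABLE: &str = "
CREATE TABLE IF NOT EXISTS tb_saga (
    pk_saga    BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- surrogate key
    id         UUID NOT NULL UNIQUE,                            -- natural key
    state      TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
)";

pub const CREATE_SAGA_STEP_TABLE: &str = "
CREATE TABLE IF NOT EXISTS tb_saga_step (
    pk_saga_step BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- surrogate key
    id           UUID NOT NULL UNIQUE,                            -- natural key
    fk_saga      BIGINT NOT NULL REFERENCES tb_saga (pk_saga) ON DELETE CASCADE,
    step_number  INTEGER NOT NULL
)";

/// Returns the statements in dependency order (parent table first), so the
/// foreign key in the step table can reference an existing saga table.
pub fn schema_statements() -> [&'static str; 2] {
    [CREATE_SAGA_TABLE, CREATE_SAGA_STEP_TABLE]
}
```

The foreign key points at the BIGINT surrogate key rather than the UUID, which is the "foreign keys use surrogate keys" point from the list above. IF NOT EXISTS keeps the statements safe to run repeatedly.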
pub async fn health_check(&self) -> Result<()>
Health check - verifies database connectivity
§Errors
Returns SagaStoreError::Database if connection fails.
pub async fn save_saga(&self, saga: &Saga) -> Result<()>
Save or update a saga
Uses upsert semantics - inserts if new, updates if exists. Trinity pattern: surrogate pk_ auto-generated, natural key id (UUID) maintained.
§Errors
Returns SagaStoreError::Database if the operation fails.
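The upsert semantics of save_saga can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the PostgreSQL implementation: InMemorySagaTable and SagaRow are hypothetical types, a u128 stands in for the UUID natural key, and the state is reduced to a string.

```rust
use std::collections::HashMap;

/// Stand-in for a stored saga row; `u128` substitutes for a UUID here.
#[derive(Debug, Clone, PartialEq)]
pub struct SagaRow {
    pub pk: i64,       // surrogate key, assigned by the store on first insert
    pub id: u128,      // natural key, supplied by the caller
    pub state: String, // simplified saga state
}

/// Hypothetical in-memory model of the saga table, keyed by natural key.
#[derive(Default)]
pub struct InMemorySagaTable {
    next_pk: i64,
    rows: HashMap<u128, SagaRow>,
}

impl InMemorySagaTable {
    /// Inserts a new row, or updates the existing row with the same `id`.
    /// Returns the surrogate key of the affected row.
    pub fn save_saga(&mut self, id: u128, state: &str) -> i64 {
        if let Some(row) = self.rows.get_mut(&id) {
            // Update path: the surrogate key is left untouched.
            row.state = state.to_string();
            return row.pk;
        }
        // Insert path: generate a fresh surrogate key.
        self.next_pk += 1;
        let pk = self.next_pk;
        self.rows.insert(id, SagaRow { pk, id, state: state.to_string() });
        pk
    }

    /// Number of stored rows.
    pub fn row_count(&self) -> usize {
        self.rows.len()
    }

    /// Looks up a row by its natural key.
    pub fn get(&self, id: u128) -> Option<&SagaRow> {
        self.rows.get(&id)
    }
}
```

Saving the same id twice leaves one row whose surrogate key is unchanged and whose state reflects the second call, which is the behavior the description above attributes to the real store.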
pub async fn load_all_sagas(&self) -> Result<Vec<Saga>>
Load all sagas ordered by creation time (newest first)
§Errors
Returns SagaStoreError::Database if the query fails.
pub async fn update_saga_state(&self, saga_id: Uuid, state: &SagaState) -> Result<()>
Update saga state and automatically set completion time for terminal states
Terminal states (Completed, Compensated) automatically receive completed_at timestamp.
§Errors
Returns SagaStoreError::Database if the update fails.
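The terminal-state rule for update_saga_state can be sketched as a pure function. The enum below is a hypothetical stand-in: only Completed and Compensated are named as terminal in the description above, while Pending and Executing are assumed variants added for illustration. The real SagaState may have other variants.

```rust
/// Hypothetical subset of saga states. Only `Completed` and `Compensated`
/// are documented as terminal; the other variants are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SagaStateSketch {
    Pending,
    Executing,
    Completed,
    Compensated,
}

impl SagaStateSketch {
    /// True for states after which the saga makes no further progress.
    pub fn is_terminal(self) -> bool {
        matches!(self, SagaStateSketch::Completed | SagaStateSketch::Compensated)
    }
}

/// Returns the `completed_at` value to record for a state change:
/// `Some(now)` for terminal states, `None` when no timestamp is set.
pub fn completed_at_for(state: SagaStateSketch, now_unix_secs: u64) -> Option<u64> {
    state.is_terminal().then_some(now_unix_secs)
}
```

Passing the clock value in as a parameter keeps the rule deterministic and easy to test. In the actual store the timestamp is presumably produced on the database side.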
pub async fn load_saga_steps(&self, saga_id: Uuid) -> Result<Vec<SagaStep>>
Load all saga steps for a saga, ordered by step number (Trinity pattern with JOIN)
§Errors
Returns SagaStoreError::Database if the query fails.
pub async fn update_saga_step_state(&self, step_id: Uuid, state: &StepState) -> Result<()>
Update saga step state and automatically set completion time for terminal states
Terminal states (Completed, Failed) automatically receive completed_at timestamp.
§Errors
Returns SagaStoreError::Database if the update fails.
pub async fn save_saga_step(&self, step: &SagaStep) -> Result<()>
Save or update a saga step
Uses upsert semantics - inserts if new, updates if exists. Trinity pattern: subquery converts saga natural key (UUID) to surrogate key (BIGINT).
§Errors
Returns SagaStoreError::Database if the operation fails.
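The key conversion described for save_saga_step (natural UUID key in, surrogate BIGINT key stored) can be modeled in memory as a lookup that happens before the write. This is only a sketch: StepTables and its methods are hypothetical, a u128 stands in for a UUID, and returning an error for an unknown saga is an assumption about behavior the documentation does not specify.

```rust
use std::collections::HashMap;

/// In-memory stand-in for two tables linked by a surrogate foreign key.
#[derive(Default)]
pub struct StepTables {
    saga_pk_by_id: HashMap<u128, i64>, // saga natural key -> saga surrogate key
    step_fk_by_id: HashMap<u128, i64>, // step natural key -> parent saga surrogate key
}

impl StepTables {
    /// Registers a saga with a known surrogate key.
    pub fn add_saga(&mut self, saga_id: u128, pk: i64) {
        self.saga_pk_by_id.insert(saga_id, pk);
    }

    /// Plays the role of the subquery: resolve the parent's surrogate key
    /// from its natural key, then upsert the step with that foreign key.
    pub fn save_step(&mut self, step_id: u128, saga_id: u128) -> Result<i64, String> {
        let fk = *self
            .saga_pk_by_id
            .get(&saga_id)
            .ok_or_else(|| format!("unknown saga {saga_id}"))?;
        // `insert` overwrites an existing entry, giving upsert behavior.
        self.step_fk_by_id.insert(step_id, fk);
        Ok(fk)
    }

    /// Returns the stored foreign key for a step, if the step exists.
    pub fn step_parent_pk(&self, step_id: u128) -> Option<i64> {
        self.step_fk_by_id.get(&step_id).copied()
    }
}
```

Callers only ever handle the natural keys. The surrogate key stays internal to the storage layer, which is the separation the trinity pattern is meant to provide.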
pub async fn update_saga_step_result(&self, step_id: Uuid, result: &Value) -> Result<()>
Update the result of a completed saga step
§Errors
Returns SagaStoreError::Database if the update fails.
pub async fn mark_saga_for_recovery(&self, saga_id: Uuid, reason: &str) -> Result<()>
Mark a saga for recovery
Creates a recovery record tracking an attempt to recover a failed saga. Trinity pattern: subquery converts saga natural key (UUID) to surrogate key (BIGINT).
§Errors
Returns SagaStoreError::Database if the operation fails.
pub async fn find_pending_sagas(&self) -> Result<Vec<Saga>>
Find all pending sagas (not yet started)
§Errors
Returns SagaStoreError::Database if the query fails.
pub async fn clear_recovery_record(&self, saga_id: Uuid) -> Result<()>
Clear recovery record for a saga
Trinity pattern: uses subquery to convert saga natural key to surrogate key.
§Errors
Returns SagaStoreError::Database if the operation fails.
pub async fn delete_saga(&self, saga_id: Uuid) -> Result<()>
Delete a saga and all associated steps and recovery records
CASCADE constraints ensure related records are deleted. Uses natural key (UUID) for deletion.
§Errors
Returns SagaStoreError::Database if the operation fails.
pub async fn delete_completed_sagas(&self) -> Result<u64>
pub async fn cleanup_stale_sagas(&self, hours_threshold: i64) -> Result<u64>
pub async fn get_recovery_attempts(&self, saga_id: Uuid) -> Result<i32>
Get the maximum recovery attempt count for a saga
Trinity pattern: uses subquery to convert saga natural key to surrogate key.
§Errors
Returns SagaStoreError::Database if the query fails.
pub async fn save_recovery_record(&self, recovery: &SagaRecovery) -> Result<()>
Save a saga recovery record
Trinity pattern: subquery converts saga natural key (UUID) to surrogate key (BIGINT).
§Errors
Returns SagaStoreError::Database if the operation fails.
pub async fn cleanup_all(&self) -> Result<()>
Delete all sagas, steps, and recovery records (for testing)
§Errors
Returns SagaStoreError::Database if the operation fails.
pub async fn saga_count(&self) -> Result<i64>
Get total number of sagas in the database
§Errors
Returns SagaStoreError::Database if the query fails.
pub async fn step_count(&self) -> Result<i64>
Get total number of saga steps in the database
§Errors
Returns SagaStoreError::Database if the query fails.
pub async fn recovery_count(&self) -> Result<i64>
Get total number of saga recovery records in the database
§Errors
Returns SagaStoreError::Database if the query fails.