Skip to main content

PostgresProvider

Struct PostgresProvider 

Source
pub struct PostgresProvider { /* private fields */ }
Expand description

PostgreSQL-based provider for Duroxide durable orchestrations.

Implements the Provider and ProviderAdmin traits from Duroxide, storing orchestration state, history, and work queues in PostgreSQL.

§Example

use duroxide_pg_opt::PostgresProvider;

// Connect using an explicit connection string (for example, the value of DATABASE_URL)
let provider = PostgresProvider::new("postgres://localhost/mydb").await?;

// Or use a custom schema for isolation
let provider = PostgresProvider::new_with_schema(
    "postgres://localhost/mydb",
    Some("my_app"),
).await?;

Implementations§

Source§

impl PostgresProvider

Source

pub async fn new(database_url: &str) -> Result<Self>

Create a new provider with default settings (long-poll enabled).

Source

pub async fn new_with_schema( database_url: &str, schema_name: Option<&str>, ) -> Result<Self>

Create a new provider with a custom schema.

Source

pub async fn new_with_options( database_url: &str, schema_name: Option<&str>, config: LongPollConfig, ) -> Result<Self>

Create a new provider with full configuration options.

Source

pub async fn new_with_fault_injection( database_url: &str, schema_name: Option<&str>, config: LongPollConfig, fault_injector: Arc<FaultInjector>, ) -> Result<Self>

Create a new provider with fault injection for testing.

This constructor allows injecting faults to test resilience scenarios. The FaultInjector can be used to disable the notifier thread.

Source

pub async fn initialize_schema(&self) -> Result<()>

Source

pub fn pool(&self) -> &PgPool

Get a reference to the database pool (intended for testing).

Source

pub fn schema_name(&self) -> &str

Get the schema name (intended for testing).

Source

pub async fn cleanup_schema(&self) -> Result<()>

Clean up the schema after tests: drops all tables and, for a custom schema only, the schema itself.

SAFETY: Never drops the “public” schema itself, only tables within it. Only drops the schema if it’s a custom schema (not “public”).

Trait Implementations§

Source§

impl Drop for PostgresProvider

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more
Source§

impl Provider for PostgresProvider

Source§

fn name(&self) -> &str

Returns the name of this provider implementation. Read more
Source§

fn version(&self) -> &str

Returns the version of this provider implementation. Read more
Source§

fn fetch_orchestration_item<'life0, 'life1, 'async_trait>( &'life0 self, lock_timeout: Duration, poll_timeout: Duration, filter: Option<&'life1 DispatcherCapabilityFilter>, ) -> Pin<Box<dyn Future<Output = Result<Option<(OrchestrationItem, String, u32)>, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Fetch the next orchestration work item atomically. Read more
Source§

fn ack_orchestration_item<'life0, 'life1, 'async_trait>( &'life0 self, lock_token: &'life1 str, execution_id: u64, history_delta: Vec<Event>, worker_items: Vec<WorkItem>, orchestrator_items: Vec<WorkItem>, metadata: ExecutionMetadata, cancelled_activities: Vec<ScheduledActivityIdentifier>, ) -> Pin<Box<dyn Future<Output = Result<(), ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Acknowledge successful orchestration processing atomically. Read more
Source§

fn abandon_orchestration_item<'life0, 'life1, 'async_trait>( &'life0 self, lock_token: &'life1 str, delay: Option<Duration>, ignore_attempt: bool, ) -> Pin<Box<dyn Future<Output = Result<(), ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Abandon orchestration processing (used for errors/retries). Read more
Source§

fn read<'life0, 'life1, 'async_trait>( &'life0 self, instance: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Vec<Event>, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Read the full history for the latest execution of an instance. Read more
Source§

fn append_with_execution<'life0, 'life1, 'async_trait>( &'life0 self, instance: &'life1 str, execution_id: u64, new_events: Vec<Event>, ) -> Pin<Box<dyn Future<Output = Result<(), ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Append events to a specific execution. Read more
Source§

fn enqueue_for_worker<'life0, 'async_trait>( &'life0 self, item: WorkItem, ) -> Pin<Box<dyn Future<Output = Result<(), ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Enqueue an activity execution request. Read more
Source§

fn fetch_work_item<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, lock_timeout: Duration, poll_timeout: Duration, session: Option<&'life1 SessionFetchConfig>, tag_filter: &'life2 TagFilter, ) -> Pin<Box<dyn Future<Output = Result<Option<(WorkItem, String, u32)>, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Dequeue a single work item with peek-lock semantics. Read more
Source§

fn ack_work_item<'life0, 'life1, 'async_trait>( &'life0 self, token: &'life1 str, completion: Option<WorkItem>, ) -> Pin<Box<dyn Future<Output = Result<(), ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Acknowledge successful processing of a work item. Read more
Source§

fn renew_work_item_lock<'life0, 'life1, 'async_trait>( &'life0 self, token: &'life1 str, extend_for: Duration, ) -> Pin<Box<dyn Future<Output = Result<(), ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Renew the lock on a worker queue item. Read more
Source§

fn abandon_work_item<'life0, 'life1, 'async_trait>( &'life0 self, token: &'life1 str, delay: Option<Duration>, ignore_attempt: bool, ) -> Pin<Box<dyn Future<Output = Result<(), ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Abandon work item processing (release lock without completing). Read more
Source§

fn renew_orchestration_item_lock<'life0, 'life1, 'async_trait>( &'life0 self, token: &'life1 str, extend_for: Duration, ) -> Pin<Box<dyn Future<Output = Result<(), ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Renew the lock on an orchestration item. Read more
Source§

fn enqueue_for_orchestrator<'life0, 'async_trait>( &'life0 self, item: WorkItem, delay: Option<Duration>, ) -> Pin<Box<dyn Future<Output = Result<(), ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Enqueue a work item to the orchestrator queue. Read more
Source§

fn read_with_execution<'life0, 'life1, 'async_trait>( &'life0 self, instance: &'life1 str, execution_id: u64, ) -> Pin<Box<dyn Future<Output = Result<Vec<Event>, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Read the full event history for a specific execution within an instance. Read more
Source§

fn renew_session_lock<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, owner_ids: &'life1 [&'life2 str], extend_for: Duration, idle_timeout: Duration, ) -> Pin<Box<dyn Future<Output = Result<usize, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Heartbeat all non-idle sessions owned by the given workers. Read more
Source§

fn cleanup_orphaned_sessions<'life0, 'async_trait>( &'life0 self, _idle_timeout: Duration, ) -> Pin<Box<dyn Future<Output = Result<usize, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Sweep orphaned session entries. Read more
Source§

fn as_management_capability(&self) -> Option<&dyn ProviderAdmin>

Check if this provider implements management capabilities. Read more
Source§

fn get_custom_status<'life0, 'life1, 'async_trait>( &'life0 self, instance: &'life1 str, last_seen_version: u64, ) -> Pin<Box<dyn Future<Output = Result<Option<(Option<String>, u64)>, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Lightweight check for custom status changes. Read more
Source§

fn get_kv_value<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, instance_id: &'life1 str, key: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<Option<String>, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Read a single KV entry for the given instance. Read more
Source§

impl ProviderAdmin for PostgresProvider

Source§

fn list_instances<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<Vec<String>, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

List all known instance IDs. Read more
Source§

fn list_instances_by_status<'life0, 'life1, 'async_trait>( &'life0 self, status: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Vec<String>, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

List instances matching a status filter. Read more
Source§

fn list_executions<'life0, 'life1, 'async_trait>( &'life0 self, instance: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Vec<u64>, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

List all execution IDs for an instance. Read more
Source§

fn read_history_with_execution_id<'life0, 'life1, 'async_trait>( &'life0 self, instance: &'life1 str, execution_id: u64, ) -> Pin<Box<dyn Future<Output = Result<Vec<Event>, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Read the full event history for a specific execution within an instance. Read more
Source§

fn read_history<'life0, 'life1, 'async_trait>( &'life0 self, instance: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Vec<Event>, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Read the full event history for the latest execution of an instance. Read more
Source§

fn latest_execution_id<'life0, 'life1, 'async_trait>( &'life0 self, instance: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<u64, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get the latest (current) execution ID for an instance. Read more
Source§

fn get_instance_info<'life0, 'life1, 'async_trait>( &'life0 self, instance: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<InstanceInfo, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get comprehensive information about an instance. Read more
Source§

fn get_execution_info<'life0, 'life1, 'async_trait>( &'life0 self, instance: &'life1 str, execution_id: u64, ) -> Pin<Box<dyn Future<Output = Result<ExecutionInfo, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get detailed information about a specific execution. Read more
Source§

fn get_system_metrics<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<SystemMetrics, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get system-wide metrics for the orchestration engine. Read more
Source§

fn get_queue_depths<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<QueueDepths, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get the current depths of the internal work queues. Read more
Source§

fn list_children<'life0, 'life1, 'async_trait>( &'life0 self, instance_id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Vec<String>, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

List direct children of an instance. Read more
Source§

fn get_parent_id<'life0, 'life1, 'async_trait>( &'life0 self, instance_id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Option<String>, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get the parent instance ID. Read more
Source§

fn delete_instances_atomic<'life0, 'life1, 'async_trait>( &'life0 self, ids: &'life1 [String], force: bool, ) -> Pin<Box<dyn Future<Output = Result<DeleteInstanceResult, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Atomically delete a batch of instances. Read more
Source§

fn delete_instance_bulk<'life0, 'async_trait>( &'life0 self, filter: InstanceFilter, ) -> Pin<Box<dyn Future<Output = Result<DeleteInstanceResult, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Delete multiple orchestration instances matching the filter criteria. Read more
Source§

fn prune_executions<'life0, 'life1, 'async_trait>( &'life0 self, instance_id: &'life1 str, options: PruneOptions, ) -> Pin<Box<dyn Future<Output = Result<PruneResult, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Prune old executions from a single instance. Read more
Source§

fn prune_executions_bulk<'life0, 'async_trait>( &'life0 self, filter: InstanceFilter, options: PruneOptions, ) -> Pin<Box<dyn Future<Output = Result<PruneResult, ProviderError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Prune old executions from multiple instances matching the filter. Read more
Source§

fn get_instance_tree<'life0, 'life1, 'async_trait>( &'life0 self, instance_id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<InstanceTree, ProviderError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Get the full instance tree rooted at the given instance. Read more
Source§

fn delete_instance<'life0, 'life1, 'async_trait>( &'life0 self, instance_id: &'life1 str, force: bool, ) -> Pin<Box<dyn Future<Output = Result<DeleteInstanceResult, ProviderError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Delete a single orchestration instance and all its associated data. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more