Struct WorkflowRuntime 

pub struct WorkflowRuntime<S> { /* private fields */ }

Effect execution runtime.

The runtime coordinates effect and timer workers to process effects from the outbox. It routes effects to the appropriate workflow handler based on the workflow_type.
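The routing step can be pictured as a lookup in a registry keyed by workflow type. The following is a minimal sketch of that idea only, not the crate's implementation: `Effect`, `Handler`, `Registry`, and `route` are hypothetical names invented for illustration, and only `workflow_type` and `workflow_count` echo names from this page.

```rust
use std::collections::HashMap;

/// Hypothetical effect record; only `workflow_type` mirrors a name from the docs.
struct Effect {
    workflow_type: String,
    payload: String,
}

/// Hypothetical handler signature, chosen only to keep the sketch small.
type Handler = Box<dyn Fn(&Effect) -> String>;

/// Hypothetical registry mapping a workflow type to its handler.
struct Registry {
    handlers: HashMap<String, Handler>,
}

impl Registry {
    fn new() -> Self {
        Registry { handlers: HashMap::new() }
    }

    /// Register (or replace) the handler for a workflow type.
    fn register(&mut self, workflow_type: &str, handler: Handler) {
        self.handlers.insert(workflow_type.to_string(), handler);
    }

    /// Number of registered workflow types.
    fn workflow_count(&self) -> usize {
        self.handlers.len()
    }

    /// Look up the handler by the effect's workflow type and invoke it.
    /// Returns None when no handler is registered for that type.
    fn route(&self, effect: &Effect) -> Option<String> {
        self.handlers.get(&effect.workflow_type).map(|h| h(effect))
    }
}
```

In this sketch an effect whose workflow type has no registered handler yields `None`; how the real runtime treats that case is not stated on this page.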

§Lifecycle

  1. Create with WorkflowRuntime::builder(store, WorkflowServiceConfig::default())
  2. Register workflows with WorkflowBuilder::register()
  3. Configure with WorkflowBuilder::config()
  4. Build with WorkflowBuilder::build_runtime()
  5. Run with WorkflowRuntime::run() (not yet implemented)

§Example

let runtime = WorkflowRuntime::builder(store, WorkflowServiceConfig::default())
    .register::<OrderWorkflow>(order_handler)
    .build_runtime()?;

// Run until shutdown signal
runtime.run(shutdown_signal).await?;

Implementations§

impl<S> WorkflowRuntime<S>

pub fn builder(store: S, service_config: WorkflowServiceConfig) -> WorkflowBuilder<S>

Create a new runtime builder.

pub fn config(&self) -> &RuntimeConfig

Returns the runtime configuration.

pub fn worker_id(&self) -> &str

Returns the worker identifier.

pub fn workflow_count(&self) -> usize

Returns the number of registered workflows.

impl<S> WorkflowRuntime<S>

pub async fn run<F>(self, shutdown: F) -> Result<()>
where F: Future<Output = ()> + Send,

Runs the effect and timer workers until the shutdown signal fires.

This method starts workers that poll the outbox:

  • Effect workers: process immediate effects via handlers
  • Timer workers: process due timers by routing embedded inputs

The number of workers is controlled by effect_workers and timer_workers in RuntimeConfig. Workers coordinate through the SQL row-locking clause FOR UPDATE SKIP LOCKED, so that no two workers claim the same effect at the same time.
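The claiming behavior can be modeled in memory to show why two workers never pick up the same row. This is a sketch under stated assumptions, not the crate's query: `OutboxRow`, `locked_by`, and `claim_next` are hypothetical, and the SQL in the comment uses an invented table and column layout.

```rust
// A query of roughly this shape is what SKIP LOCKED is typically used for
// (hypothetical table and columns, shown for orientation only):
//
//   SELECT id FROM outbox
//   ORDER BY id
//   LIMIT 1
//   FOR UPDATE SKIP LOCKED;

/// Hypothetical outbox row; `locked_by` stands in for a row lock held by a worker.
struct OutboxRow {
    id: u32,
    locked_by: Option<usize>,
}

/// Claim the first row that no other worker holds. Locked rows are skipped
/// rather than waited on, which is the behavior SKIP LOCKED provides.
fn claim_next(rows: &mut [OutboxRow], worker: usize) -> Option<u32> {
    let row = rows.iter_mut().find(|r| r.locked_by.is_none())?;
    row.locked_by = Some(worker);
    Some(row.id)
}
```

With two unlocked rows, two consecutive claims return different ids and a third claim returns `None`, so a busy row never blocks another worker.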

§Shutdown Behavior

When the shutdown future completes:

  1. All workers stop claiming new work.
  2. The runtime waits for current work (if any) to complete.
  3. The method returns cleanly after the timeout.
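The first two steps can be sketched with plain threads and a shared flag. This is an illustration of the pattern only, assuming a hypothetical `run_workers` helper; the real runtime is async and takes a shutdown future, and the timeout in step 3 is not modeled here.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Spawn `workers` threads that keep processing until `shutdown` is set.
/// Each worker checks the flag only between items, so an item that is
/// already in flight always runs to completion.
fn run_workers(
    workers: usize,
    shutdown: Arc<AtomicBool>,
    processed: Arc<AtomicUsize>,
) -> Vec<thread::JoinHandle<()>> {
    (0..workers)
        .map(|_| {
            let shutdown = Arc::clone(&shutdown);
            let processed = Arc::clone(&processed);
            thread::spawn(move || {
                // Step 1: stop claiming new work once shutdown is requested.
                while !shutdown.load(Ordering::SeqCst) {
                    // Simulated unit of work; it finishes even if the flag
                    // is set while it is running (step 2).
                    thread::sleep(Duration::from_millis(5));
                    processed.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect()
}
```

A caller sets the flag and then joins every handle; once the joins return, no further items are processed.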

§Example

use tokio::signal;

let runtime = WorkflowRuntime::builder(pg_store, WorkflowServiceConfig::default())
    .register(order_handler)
    .config(RuntimeConfig {
        effect_workers: 4,  // 4 parallel effect workers
        ..Default::default()
    })
    .build_runtime()?;

// Run until Ctrl+C
runtime.run(async { signal::ctrl_c().await.ok(); }).await?;

pub async fn fetch_dead_letters(&self, query: DeadLetterQuery) -> Result<Vec<DeadLetter>>

Fetch dead-lettered effects.

Returns effects that have exceeded the configured max_attempts and are no longer being retried.

§Example

use ironflow::runtime::outbox::DeadLetterQuery;

// Fetch all dead letters
let dead_letters = runtime.fetch_dead_letters(DeadLetterQuery::new()).await?;

// Fetch dead letters for a specific workflow type
let order_dead_letters = runtime
    .fetch_dead_letters(DeadLetterQuery::new().workflow_type("order"))
    .await?;

pub async fn count_dead_letters(&self, query: DeadLetterQuery) -> Result<u64>

Count dead-lettered effects.

Useful for monitoring and alerting on dead letter queue size.

§Example

let count = runtime.count_dead_letters(DeadLetterQuery::new()).await?;
println!("Dead letters: {}", count);

pub async fn retry_dead_letter(&self, effect_id: Uuid) -> Result<bool>

Retry a dead-lettered effect.

Resets the effect’s attempt count to 0, making it available for processing again by the effect worker.

Returns Ok(true) if the effect was found and reset, Ok(false) if the effect was not found or already processed.
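The return-value contract can be illustrated with a small in-memory model. This is a sketch, not the crate's storage code: `StoredEffect`, `Outbox`, and `is_dead_letter` are hypothetical, ids are plain `u64` values instead of `Uuid` to keep the example dependency-free, and treating an effect as dead-lettered once `attempts >= max_attempts` is an assumption about the exact boundary.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a stored effect.
struct StoredEffect {
    attempts: u32,
    processed: bool,
}

/// Hypothetical store; `max_attempts` mirrors the configured limit named above.
struct Outbox {
    max_attempts: u32,
    effects: HashMap<u64, StoredEffect>,
}

impl Outbox {
    /// Assumption: an unprocessed effect counts as dead-lettered once it has
    /// used up its attempts.
    fn is_dead_letter(&self, id: u64) -> bool {
        self.effects
            .get(&id)
            .map_or(false, |e| !e.processed && e.attempts >= self.max_attempts)
    }

    /// Reset the attempt count to 0 so workers can pick the effect up again.
    /// Returns true if the effect was found and reset, false if it was not
    /// found or was already processed.
    fn retry_dead_letter(&mut self, id: u64) -> bool {
        match self.effects.get_mut(&id) {
            Some(e) if !e.processed => {
                e.attempts = 0;
                true
            }
            _ => false,
        }
    }
}
```

After a successful retry the effect no longer counts as dead-lettered in this model, because its attempt count is back below the limit.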

§Example

let dead_letters = runtime.fetch_dead_letters(DeadLetterQuery::new()).await?;
for dl in dead_letters {
    runtime.retry_dead_letter(dl.id).await?;
}

pub async fn fetch_timer_dead_letters(&self, query: DeadLetterQuery) -> Result<Vec<DeadLetter>>

Fetch dead-lettered timers. This is the timer counterpart of fetch_dead_letters.

pub async fn count_timer_dead_letters(&self, query: DeadLetterQuery) -> Result<u64>

Count dead-lettered timers. This is the timer counterpart of count_dead_letters.

pub async fn retry_timer_dead_letter(&self, timer_id: Uuid) -> Result<bool>

Retry a dead-lettered timer. This is the timer counterpart of retry_dead_letter.

Trait Implementations§

impl<S> Clone for WorkflowRuntime<S>

fn clone(&self) -> WorkflowRuntime<S>

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.
Auto Trait Implementations§

impl<S> Freeze for WorkflowRuntime<S>
where S: Freeze,

impl<S> !RefUnwindSafe for WorkflowRuntime<S>

impl<S> Send for WorkflowRuntime<S>

impl<S> Sync for WorkflowRuntime<S>

impl<S> Unpin for WorkflowRuntime<S>
where S: Unpin,

impl<S> !UnwindSafe for WorkflowRuntime<S>
Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬 This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> Same for T

type Output = T

Should always be Self

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.