pub struct WorkflowContext {
pub run_id: Uuid,
pub workflow_name: String,
pub version: u32,
pub started_at: DateTime<Utc>,
pub auth: AuthContext,
/* private fields */
}
Context available to workflow handlers.
Fields§
run_id: Uuid — Workflow run ID.
workflow_name: String — Workflow name.
version: u32 — Workflow version.
started_at: DateTime<Utc> — When the workflow started.
auth: AuthContext — Authentication context.
Implementations§
Source§impl WorkflowContext
impl WorkflowContext
Sourcepub fn new(
run_id: Uuid,
workflow_name: String,
version: u32,
db_pool: PgPool,
http_client: Client,
) -> Self
pub fn new( run_id: Uuid, workflow_name: String, version: u32, db_pool: PgPool, http_client: Client, ) -> Self
Create a new workflow context.
Sourcepub fn resumed(
run_id: Uuid,
workflow_name: String,
version: u32,
started_at: DateTime<Utc>,
db_pool: PgPool,
http_client: Client,
) -> Self
pub fn resumed( run_id: Uuid, workflow_name: String, version: u32, started_at: DateTime<Utc>, db_pool: PgPool, http_client: Client, ) -> Self
Create a resumed workflow context.
Sourcepub fn with_env_provider(self, provider: Arc<dyn EnvProvider>) -> Self
pub fn with_env_provider(self, provider: Arc<dyn EnvProvider>) -> Self
Set environment provider.
Sourcepub fn with_resumed_from_sleep(self) -> Self
pub fn with_resumed_from_sleep(self) -> Self
Mark that this context resumed from a sleep (timer expired).
Sourcepub fn with_suspend_channel(self, tx: Sender<SuspendReason>) -> Self
pub fn with_suspend_channel(self, tx: Sender<SuspendReason>) -> Self
Set the suspend channel.
Sourcepub fn with_tenant(self, tenant_id: Uuid) -> Self
pub fn with_tenant(self, tenant_id: Uuid) -> Self
Set the tenant ID.
pub fn tenant_id(&self) -> Option<Uuid>
pub fn is_resumed(&self) -> bool
pub fn workflow_time(&self) -> DateTime<Utc>
pub fn db(&self) -> &PgPool
pub fn http(&self) -> &Client
Sourcepub fn with_auth(self, auth: AuthContext) -> Self
pub fn with_auth(self, auth: AuthContext) -> Self
Set authentication context.
Sourcepub fn with_step_states(self, states: HashMap<String, StepState>) -> Self
pub fn with_step_states(self, states: HashMap<String, StepState>) -> Self
Restore step states from persisted data.
pub fn get_step_state(&self, name: &str) -> Option<StepState>
pub fn is_step_completed(&self, name: &str) -> bool
Sourcepub fn is_step_started(&self, name: &str) -> bool
pub fn is_step_started(&self, name: &str) -> bool
Check if a step has been started (running, completed, or failed).
Use this to guard steps that should only execute once, even across workflow suspension and resumption.
pub fn get_step_result<T: DeserializeOwned>(&self, name: &str) -> Option<T>
Sourcepub fn record_step_start(&self, name: &str)
pub fn record_step_start(&self, name: &str)
Record step start.
If the step is already running or beyond (completed/failed), this is a no-op. This prevents race conditions when resuming workflows.
Sourcepub fn record_step_complete(&self, name: &str, result: Value)
pub fn record_step_complete(&self, name: &str, result: Value)
Record step completion (fire-and-forget database update).
Use record_step_complete_async if you need to ensure persistence before continuing.
Sourcepub async fn record_step_complete_async(&self, name: &str, result: Value)
pub async fn record_step_complete_async(&self, name: &str, result: Value)
Record step completion and wait for database persistence.
Sourcepub fn record_step_failure(&self, name: &str, error: impl Into<String>)
pub fn record_step_failure(&self, name: &str, error: impl Into<String>)
Record step failure.
Sourcepub fn record_step_compensated(&self, name: &str)
pub fn record_step_compensated(&self, name: &str)
Record step compensation.
pub fn completed_steps_reversed(&self) -> Vec<String>
pub fn all_step_states(&self) -> HashMap<String, StepState>
pub fn elapsed(&self) -> Duration
Sourcepub fn register_compensation(
&self,
step_name: &str,
handler: CompensationHandler,
)
pub fn register_compensation( &self, step_name: &str, handler: CompensationHandler, )
Register a compensation handler for a step.
pub fn get_compensation_handler( &self, step_name: &str, ) -> Option<CompensationHandler>
pub fn has_compensation(&self, step_name: &str) -> bool
Sourcepub async fn run_compensation(&self) -> Vec<(String, bool)>
pub async fn run_compensation(&self) -> Vec<(String, bool)>
Run compensation for all completed steps in reverse order. Returns a list of (step_name, success) tuples.
pub fn compensation_handlers(&self) -> HashMap<String, CompensationHandler>
Sourcepub async fn wait_for_event<T: DeserializeOwned>(
&self,
event_name: &str,
timeout: Option<Duration>,
) -> Result<T>
pub async fn wait_for_event<T: DeserializeOwned>( &self, event_name: &str, timeout: Option<Duration>, ) -> Result<T>
Wait for an external event with optional timeout.
The workflow suspends until the event arrives or the timeout expires. Events are correlated by the workflow run ID.
§Example
let payment: PaymentConfirmation = ctx.wait_for_event(
"payment_confirmed",
Some(Duration::from_secs(7 * 24 * 60 * 60)), // 7 days
).await?;
Sourcepub fn parallel(&self) -> ParallelBuilder<'_>
pub fn parallel(&self) -> ParallelBuilder<'_>
Create a parallel builder for executing steps concurrently.
§Example
let results = ctx.parallel()
.step("fetch_user", || async { get_user(id).await })
.step("fetch_orders", || async { get_orders(id).await })
.step_with_compensate("charge_card",
|| async { charge_card(amount).await },
|charge| async move { refund(charge.id).await })
.run().await?;
let user: User = results.get("fetch_user")?;
let orders: Vec<Order> = results.get("fetch_orders")?;
Sourcepub fn step<T, F, Fut>(
&self,
name: impl Into<String>,
f: F,
) -> StepRunner<'_, T>
pub fn step<T, F, Fut>( &self, name: impl Into<String>, f: F, ) -> StepRunner<'_, T>
Create a step runner for executing a workflow step.
This provides a fluent API for defining steps with retry, compensation, timeout, and optional behavior.
§Examples
use std::time::Duration;
// Simple step
let data = ctx.step("fetch_data", || async {
Ok(fetch_from_api().await?)
}).run().await?;
// Step with retry (3 attempts, 2 second delay)
ctx.step("send_email", || async {
send_verification_email(&user.email).await
})
.retry(3, Duration::from_secs(2))
.run()
.await?;
// Step with compensation (rollback on later failure)
let charge = ctx.step("charge_card", || async {
charge_credit_card(&card).await
})
.compensate(|charge_result| async move {
refund_charge(&charge_result.charge_id).await
})
.run()
.await?;
// Optional step (failure won't trigger compensation)
ctx.step("notify_slack", || async {
post_to_slack("User signed up!").await
})
.optional()
.run()
.await?;
// Step with timeout
ctx.step("slow_operation", || async {
process_large_file().await
})
.timeout(Duration::from_secs(60))
.run()
.await?;
Trait Implementations§
Source§impl EnvAccess for WorkflowContext
impl EnvAccess for WorkflowContext
Source§fn env_provider(&self) -> &dyn EnvProvider
fn env_provider(&self) -> &dyn EnvProvider
Source§fn env_or(&self, key: &str, default: &str) -> String
fn env_or(&self, key: &str, default: &str) -> String
Source§fn env_require(&self, key: &str) -> Result<String>
fn env_require(&self, key: &str) -> Result<String>
Source§fn env_parse<T: FromStr>(&self, key: &str) -> Result<T>
fn env_parse<T: FromStr>(&self, key: &str) -> Result<T>
Source§fn env_parse_or<T: FromStr>(&self, key: &str, default: T) -> Result<T>
fn env_parse_or<T: FromStr>(&self, key: &str, default: T) -> Result<T>
Source§fn env_contains(&self, key: &str) -> bool
fn env_contains(&self, key: &str) -> bool
Auto Trait Implementations§
impl Freeze for WorkflowContext
impl !RefUnwindSafe for WorkflowContext
impl Send for WorkflowContext
impl Sync for WorkflowContext
impl Unpin for WorkflowContext
impl !UnwindSafe for WorkflowContext
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more