Skip to main content

OrchestrationContext

Struct OrchestrationContext 

Source
pub struct OrchestrationContext { /* private fields */ }

Implementations§

Source§

impl OrchestrationContext

Source

pub fn new( history: Vec<Event>, execution_id: u64, instance_id: String, orchestration_name: String, orchestration_version: String, worker_id: Option<String>, ) -> Self

Construct a new context from an existing history vector.

§Parameters
  • orchestration_name - The name of the orchestration being executed.
  • orchestration_version - The semantic version string of the orchestration.
  • worker_id - Optional dispatcher worker ID for logging correlation.
    • Some(id): Used by runtime dispatchers to include worker_id in traces
    • None: Used by standalone/test execution without runtime context
Source

pub fn is_replaying(&self) -> bool

Check if the orchestration is currently replaying history.

Returns true when processing events from persisted history (replay), and false when executing new logic beyond the stored history.

This is useful for skipping side effects during replay, such as:

  • Logging/tracing that should only happen on first execution
  • Metrics that shouldn’t be double-counted
  • External notifications that shouldn’t be re-sent
§Example
if !ctx.is_replaying() {
    // Only log on first execution, not during replay
    println!("Starting workflow for the first time");
}
Source

pub fn trace_info(&self, message: impl Into<String>)

Convenience wrapper for INFO level tracing.

Logs with INFO level and includes instance context automatically. Only emits on first execution, not during replay.

§Example
ctx.trace_info("Processing order");
ctx.trace_info(format!("Processing {} items", 42));
Source

pub fn trace_warn(&self, message: impl Into<String>)

Convenience wrapper for WARN level tracing.

Logs with WARN level and includes instance context automatically. Only emits on first execution, not during replay.

§Example
ctx.trace_warn("Retrying failed operation");
Source

pub fn trace_error(&self, message: impl Into<String>)

Convenience wrapper for ERROR level tracing.

Logs with ERROR level and includes instance context automatically. Only emits on first execution, not during replay.

§Example
ctx.trace_error("Payment processing failed");
Source

pub fn trace_debug(&self, message: impl Into<String>)

Convenience wrapper for DEBUG level tracing.

Logs with DEBUG level and includes instance context automatically. Only emits on first execution, not during replay.

§Example
ctx.trace_debug("Detailed state information");
Source

pub fn instance_id(&self) -> String

Returns the orchestration instance identifier.

This is the unique identifier for this orchestration instance, typically provided when starting the orchestration.

§Example
let id = ctx.instance_id();
ctx.trace_info(format!("Processing instance: {}", id));
Source

pub fn execution_id(&self) -> u64

Returns the current execution ID within this orchestration instance.

The execution ID increments each time continue_as_new() is called. Execution 1 is the initial execution.

§Example
let exec_id = ctx.execution_id();
ctx.trace_info(format!("Execution #{}", exec_id));
Source

pub fn orchestration_name(&self) -> String

Returns the orchestration name.

This is the name registered with the orchestration registry.

§Example
let name = ctx.orchestration_name();
ctx.trace_info(format!("Running orchestration: {}", name));
Source

pub fn orchestration_version(&self) -> String

Returns the orchestration version.

This is the semantic version string associated with the orchestration.

§Example
let version = ctx.orchestration_version();
ctx.trace_info(format!("Version: {}", version));
Source

pub fn is_logging_enabled(&self) -> bool

Indicates whether logging is enabled for the current poll. This is flipped on when a decision is recorded to minimize log noise.

Source

pub fn trace(&self, level: impl Into<String>, message: impl Into<String>)

Emit a structured trace entry with automatic context correlation.

Creates a system call event for deterministic replay and logs to tracing. The log entry automatically includes correlation fields:

  • instance_id - The orchestration instance identifier
  • execution_id - The current execution number
  • orchestration_name - Name of the orchestration
  • orchestration_version - Semantic version
§Determinism

This method is replay-safe: logs are only emitted on first execution, not during replay.

§Example
ctx.trace("INFO", "Processing started");
ctx.trace("WARN", format!("Retry attempt: {}", 3));
ctx.trace("ERROR", "Payment validation failed");
§Output
2024-10-30T10:15:23.456Z INFO duroxide::orchestration [order-123] Processing started

All logs include instance_id, execution_id, orchestration_name for correlation.

Source

pub fn new_guid(&self) -> impl Future<Output = Result<String, String>>

Generate a new deterministic GUID.

This schedules a built-in activity that generates a unique identifier. The GUID is deterministic across replays (the same value is returned when the orchestration replays).

§Example
let guid = ctx.new_guid().await?;
println!("Generated GUID: {}", guid);
Source

pub fn utc_now(&self) -> impl Future<Output = Result<SystemTime, String>>

Get the current UTC time.

This schedules a built-in activity that returns the current time. The time is deterministic across replays (the same value is returned when the orchestration replays).

§Errors

Returns an error if the activity fails or if the time value cannot be parsed.

§Example
let now = ctx.utc_now().await?;
let deadline = now + Duration::from_secs(3600); // 1 hour from now
Source

pub fn continue_as_new( &self, input: impl Into<String>, ) -> impl Future<Output = Result<String, String>>

Continue the current execution as a new execution with fresh input.

This terminates the current execution and starts a new execution with the provided input. Returns a future that never resolves, ensuring code after await is unreachable.

§Example
let n: u32 = 0;
if n < 2 {
    return ctx.continue_as_new("next_input").await; // Execution terminates here
    // This code is unreachable - compiler will warn
}
Ok("completed".to_string())
Source

pub fn continue_as_new_typed<In: Serialize>( &self, input: &In, ) -> impl Future<Output = Result<String, String>>

Source

pub fn continue_as_new_versioned( &self, version: impl Into<String>, input: impl Into<String>, ) -> impl Future<Output = Result<String, String>>

ContinueAsNew to a specific target version (string is parsed as semver later).

Source§

impl OrchestrationContext

Source

pub async fn schedule_activity_with_retry( &self, name: impl Into<String>, input: impl Into<String>, policy: RetryPolicy, ) -> Result<String, String>

Schedule activity with automatic retry on failure.

Retry behavior:

  • Retries on activity errors up to policy.max_attempts
  • Timeouts are NOT retried - if any attempt times out, returns error immediately
  • Only application errors trigger retry logic

Timeout behavior (if policy.total_timeout is set):

  • Each activity attempt is raced against the timeout
  • If the timeout fires before the activity completes → returns timeout error (no retry)
  • If the activity fails with an error before timeout → retry according to policy
§Example
// Simple retry with defaults (no timeout)
let result = ctx.schedule_activity_with_retry(
    "CallAPI",
    "request",
    RetryPolicy::new(3),
).await?;

// Retry with per-attempt timeout and custom backoff
let result = ctx.schedule_activity_with_retry(
    "CallAPI",
    "request",
    RetryPolicy::new(5)
        .with_timeout(Duration::from_secs(30)) // 30s per attempt
        .with_backoff(BackoffStrategy::Fixed { delay: Duration::from_secs(1) }),
).await?;
§Errors

Returns an error if all retry attempts fail or if a timeout occurs (timeouts are not retried).

Source

pub async fn schedule_activity_with_retry_typed<In: Serialize, Out: DeserializeOwned>( &self, name: impl Into<String>, input: &In, policy: RetryPolicy, ) -> Result<Out, String>

Typed variant of schedule_activity_with_retry.

Serializes input once and deserializes the successful result.

§Errors

Returns an error if all retry attempts fail, if a timeout occurs, if input serialization fails, or if result deserialization fails.

Source

pub async fn schedule_activity_with_retry_on_session( &self, name: impl Into<String>, input: impl Into<String>, policy: RetryPolicy, session_id: impl Into<String>, ) -> Result<String, String>

Schedule an activity with automatic retry on a specific session.

Combines retry semantics from Self::schedule_activity_with_retry with session affinity from Self::schedule_activity_on_session. All retry attempts are pinned to the same session_id, ensuring they execute on the same worker.

§Example
let session = ctx.new_guid().await?;
let result = ctx.schedule_activity_with_retry_on_session(
    "RunQuery",
    "SELECT 1",
    RetryPolicy::new(3)
        .with_backoff(BackoffStrategy::Fixed { delay: Duration::from_secs(1) }),
    &session,
).await?;
§Errors

Returns an error if all retry attempts fail or if a timeout occurs (timeouts are not retried).

Source

pub async fn schedule_activity_with_retry_on_session_typed<In: Serialize, Out: DeserializeOwned>( &self, name: impl Into<String>, input: &In, policy: RetryPolicy, session_id: impl Into<String>, ) -> Result<Out, String>

Typed variant of Self::schedule_activity_with_retry_on_session.

Serializes input once and deserializes the successful result. All retry attempts are pinned to the same session.

§Errors

Returns an error if all retry attempts fail, if a timeout occurs, if input serialization fails, or if result deserialization fails.

Source

pub fn schedule_orchestration( &self, name: impl Into<String>, instance: impl Into<String>, input: impl Into<String>, )

Schedule a detached orchestration with an explicit instance id. The runtime will prefix this with the parent instance to ensure global uniqueness.

Source

pub fn schedule_orchestration_typed<In: Serialize>( &self, name: impl Into<String>, instance: impl Into<String>, input: &In, )

Source

pub fn schedule_orchestration_versioned( &self, name: impl Into<String>, version: Option<String>, instance: impl Into<String>, input: impl Into<String>, )

Versioned detached orchestration start (string I/O). If version is None, registry policy is used for the child.

Source

pub fn schedule_orchestration_versioned_typed<In: Serialize>( &self, name: impl Into<String>, version: Option<String>, instance: impl Into<String>, input: &In, )

Source

pub fn set_custom_status(&self, status: impl Into<String>)

Set a user-defined custom status for progress reporting.

This is not a history event or an action — it’s pure metadata that gets plumbed into ExecutionMetadata at ack time. No impact on determinism, no replay implications.

  • Call it whenever, as many times as you want within a turn
  • Last write wins: if called twice in the same turn, only the last value is sent
  • Persistent across turns: if you don’t call it on a later turn, the provider keeps the previous value
§Example
ctx.set_custom_status("Processing item 3 of 10");
let result = ctx.schedule_activity("ProcessItem", "item-3").await;
ctx.set_custom_status("Processing item 4 of 10");
Source

pub fn reset_custom_status(&self)

Clear the custom status back to None. The provider will set the column to NULL and increment custom_status_version.

ctx.set_custom_status("Processing batch");
// ... work ...
ctx.reset_custom_status(); // done, clear the progress
Source

pub fn get_custom_status(&self) -> Option<String>

Returns the current custom status value, if any.

This reflects all set_custom_status / reset_custom_status calls made so far in this and previous turns. In a CAN’d execution, it includes the value carried from the previous execution.

Source§

impl OrchestrationContext

Source

pub fn schedule_activity( &self, name: impl Into<String>, input: impl Into<String>, ) -> DurableFuture<Result<String, String>>

Schedule an activity and return a cancellation-aware future.

Returns a DurableFuture that supports cancellation on drop. If the future is dropped without completing (e.g., as a select loser), the activity will be cancelled via lock stealing.

§Example
// Fan-out to multiple activities
let f1 = ctx.schedule_activity("Process", "A");
let f2 = ctx.schedule_activity("Process", "B");
let results = ctx.join(vec![f1, f2]).await;
Source

pub fn schedule_activity_typed<In: Serialize, Out: DeserializeOwned + Send + 'static>( &self, name: impl Into<String>, input: &In, ) -> DurableFuture<Result<Out, String>>

Typed version of schedule_activity that serializes input and deserializes output.

§Errors

Returns an error if the activity fails or if the output cannot be deserialized.

Source

pub fn schedule_activity_on_session( &self, name: impl Into<String>, input: impl Into<String>, session_id: impl Into<String>, ) -> DurableFuture<Result<String, String>>

Schedule an activity routed to the worker owning the given session.

If no worker owns the session, any worker can claim it on first fetch. Once claimed, all subsequent activities with the same session_id route to the claiming worker until the session unpins (idle timeout or worker death).

§Example
let session_id = ctx.new_guid().await?;
let result = ctx.schedule_activity_on_session("run_turn", "input", &session_id).await?;
Source

pub fn schedule_activity_on_session_typed<In: Serialize, Out: DeserializeOwned + Send + 'static>( &self, name: impl Into<String>, input: &In, session_id: impl Into<String>, ) -> DurableFuture<Result<Out, String>>

Typed version of schedule_activity_on_session that serializes input and deserializes output.

§Errors

Returns an error if the activity fails or if the output cannot be deserialized.

Source

pub fn schedule_timer(&self, delay: Duration) -> DurableFuture<()>

Schedule a timer and return a cancellation-aware future.

Timers are virtual constructs - dropping the future is a no-op since there’s no external state to cancel. However, wrapping in DurableFuture maintains API consistency.

Source

pub fn schedule_wait(&self, name: impl Into<String>) -> DurableFuture<String>

Subscribe to an external event and return a cancellation-aware future.

External waits are virtual constructs - dropping the future is a no-op since there’s no external state to cancel. However, wrapping in DurableFuture maintains API consistency.

Source

pub fn schedule_wait_typed<T: DeserializeOwned + Send + 'static>( &self, name: impl Into<String>, ) -> DurableFuture<T>

Typed version of schedule_wait.

Source

pub fn dequeue_event(&self, queue: impl Into<String>) -> DurableFuture<String>

Dequeue the next message from a named queue (FIFO mailbox semantics).

Unlike schedule_wait, queued events use FIFO matching:

  • No positional pairing — any unresolved subscription gets the first unmatched arrival
  • Cancelled subscriptions are skipped (don’t consume arrivals)
  • Events that arrive before a subscription are buffered until consumed
  • Events survive continue_as_new boundaries (carried forward)

The caller enqueues messages with Client::enqueue_event.

Source

pub fn dequeue_event_typed<T: DeserializeOwned>( &self, queue: impl Into<String>, ) -> impl Future<Output = T>

Typed version of Self::dequeue_event. Deserializes the message payload as T.

Source

pub fn schedule_wait_persistent( &self, name: impl Into<String>, ) -> DurableFuture<String>

👎Deprecated: Use dequeue_event() instead

Subscribe to a persistent external event (mailbox semantics).

Prefer Self::dequeue_event — this is a deprecated alias.

Source

pub fn schedule_wait_persistent_typed<T: DeserializeOwned>( &self, name: impl Into<String>, ) -> impl Future<Output = T>

👎Deprecated: Use dequeue_event_typed() instead

Typed version of schedule_wait_persistent.

Prefer Self::dequeue_event_typed — this is a deprecated alias.

Source

pub fn schedule_sub_orchestration( &self, name: impl Into<String>, input: impl Into<String>, ) -> DurableFuture<Result<String, String>>

Schedule a sub-orchestration and return a cancellation-aware future.

The child instance ID is auto-generated from the event ID with a parent prefix.

Returns a DurableFuture that supports cancellation on drop. If the future is dropped without completing, a CancelInstance work item will be enqueued for the child orchestration.

Source

pub fn schedule_sub_orchestration_with_id( &self, name: impl Into<String>, instance: impl Into<String>, input: impl Into<String>, ) -> DurableFuture<Result<String, String>>

Schedule a sub-orchestration with an explicit instance ID.

The provided instance value is used exactly as the child instance ID, without any parent prefix. Use this when you need to control the exact instance ID for the sub-orchestration.

For auto-generated instance IDs, use [schedule_sub_orchestration] instead.

Source

pub fn schedule_sub_orchestration_versioned( &self, name: impl Into<String>, version: Option<String>, input: impl Into<String>, ) -> DurableFuture<Result<String, String>>

Schedule a versioned sub-orchestration.

If version is Some, that specific version is used. If version is None, the registry’s policy (e.g., Latest) is used.

Source

pub fn schedule_sub_orchestration_versioned_with_id( &self, name: impl Into<String>, version: Option<String>, instance: impl Into<String>, input: impl Into<String>, ) -> DurableFuture<Result<String, String>>

Schedule a versioned sub-orchestration with an explicit instance ID.

The provided instance value is used exactly as the child instance ID, without any parent prefix.

Returns a DurableFuture that supports cancellation on drop. If the future is dropped without completing, a CancelInstance work item will be enqueued for the child orchestration.

Source

pub fn schedule_sub_orchestration_typed<In: Serialize, Out: DeserializeOwned + Send + 'static>( &self, name: impl Into<String>, input: &In, ) -> DurableFuture<Result<Out, String>>

Typed version of schedule_sub_orchestration.

§Errors

Returns an error if the sub-orchestration fails or if the output cannot be deserialized.

Source

pub fn schedule_sub_orchestration_with_id_typed<In: Serialize, Out: DeserializeOwned + Send + 'static>( &self, name: impl Into<String>, instance: impl Into<String>, input: &In, ) -> DurableFuture<Result<Out, String>>

Typed version of schedule_sub_orchestration_with_id.

§Errors

Returns an error if the sub-orchestration fails or if the output cannot be deserialized.

Source

pub async fn join<T, F>(&self, futures: Vec<F>) -> Vec<T>
where F: Future<Output = T>,

Await all futures concurrently using futures::future::join_all. Works with any Future type.

Source

pub async fn join2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> (T1, T2)
where F1: Future<Output = T1>, F2: Future<Output = T2>,

Simplified join for exactly 2 futures (convenience method).

Source

pub async fn join3<T1, T2, T3, F1, F2, F3>( &self, f1: F1, f2: F2, f3: F3, ) -> (T1, T2, T3)
where F1: Future<Output = T1>, F2: Future<Output = T2>, F3: Future<Output = T3>,

Simplified join for exactly 3 futures (convenience method).

Source

pub async fn select2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> Either2<T1, T2>
where F1: Future<Output = T1>, F2: Future<Output = T2>,

Simplified select over 2 futures: returns the result of whichever completes first. Select over 2 futures with potentially different output types.

Returns Either2::First(result) if first future wins, Either2::Second(result) if second wins. Uses futures::select_biased! for determinism (first branch polled first).

§Example: Activity with timeout
let work = ctx.schedule_activity("SlowWork", "input");
let timeout = ctx.schedule_timer(Duration::from_secs(30));

match ctx.select2(work, timeout).await {
    Either2::First(result) => result,
    Either2::Second(()) => Err("Operation timed out".to_string()),
}
Source

pub async fn select3<T1, T2, T3, F1, F2, F3>( &self, f1: F1, f2: F2, f3: F3, ) -> Either3<T1, T2, T3>
where F1: Future<Output = T1>, F2: Future<Output = T2>, F3: Future<Output = T3>,

Select over 3 futures with potentially different output types.

Returns Either3::First/Second/Third(result) depending on which future completes first. Uses futures::select_biased! for determinism (earlier branches polled first).

Trait Implementations§

Source§

impl Clone for OrchestrationContext

Source§

fn clone(&self) -> OrchestrationContext

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more