pub struct OrchestrationContext { /* private fields */ }Implementations§
Source§impl OrchestrationContext
impl OrchestrationContext
Sourcepub fn new(
history: Vec<Event>,
execution_id: u64,
instance_id: String,
orchestration_name: String,
orchestration_version: String,
worker_id: Option<String>,
) -> Self
pub fn new( history: Vec<Event>, execution_id: u64, instance_id: String, orchestration_name: String, orchestration_version: String, worker_id: Option<String>, ) -> Self
Construct a new context from an existing history vector.
§Parameters
orchestration_name- The name of the orchestration being executed.orchestration_version- The semantic version string of the orchestration.worker_id- Optional dispatcher worker ID for logging correlation.Some(id): Used by runtime dispatchers to include worker_id in tracesNone: Used by standalone/test execution without runtime context
Sourcepub fn is_replaying(&self) -> bool
pub fn is_replaying(&self) -> bool
Check if the orchestration is currently replaying history.
Returns true when processing events from persisted history (replay),
and false when executing new logic beyond the stored history.
This is useful for skipping side effects during replay, such as:
- Logging/tracing that should only happen on first execution
- Metrics that shouldn’t be double-counted
- External notifications that shouldn’t be re-sent
§Example
if !ctx.is_replaying() {
// Only log on first execution, not during replay
println!("Starting workflow for the first time");
}Sourcepub fn trace_info(&self, message: impl Into<String>)
pub fn trace_info(&self, message: impl Into<String>)
Convenience wrapper for INFO level tracing.
Logs with INFO level and includes instance context automatically. Only emits on first execution, not during replay.
§Example
ctx.trace_info("Processing order");
ctx.trace_info(format!("Processing {} items", 42));Sourcepub fn trace_warn(&self, message: impl Into<String>)
pub fn trace_warn(&self, message: impl Into<String>)
Convenience wrapper for WARN level tracing.
Logs with WARN level and includes instance context automatically. Only emits on first execution, not during replay.
§Example
ctx.trace_warn("Retrying failed operation");Sourcepub fn trace_error(&self, message: impl Into<String>)
pub fn trace_error(&self, message: impl Into<String>)
Convenience wrapper for ERROR level tracing.
Logs with ERROR level and includes instance context automatically. Only emits on first execution, not during replay.
§Example
ctx.trace_error("Payment processing failed");Sourcepub fn trace_debug(&self, message: impl Into<String>)
pub fn trace_debug(&self, message: impl Into<String>)
Convenience wrapper for DEBUG level tracing.
Logs with DEBUG level and includes instance context automatically. Only emits on first execution, not during replay.
§Example
ctx.trace_debug("Detailed state information");Sourcepub fn instance_id(&self) -> String
pub fn instance_id(&self) -> String
Returns the orchestration instance identifier.
This is the unique identifier for this orchestration instance, typically provided when starting the orchestration.
§Example
let id = ctx.instance_id();
ctx.trace_info(format!("Processing instance: {}", id));Sourcepub fn execution_id(&self) -> u64
pub fn execution_id(&self) -> u64
Returns the current execution ID within this orchestration instance.
The execution ID increments each time continue_as_new() is called.
Execution 1 is the initial execution.
§Example
let exec_id = ctx.execution_id();
ctx.trace_info(format!("Execution #{}", exec_id));Sourcepub fn orchestration_name(&self) -> String
pub fn orchestration_name(&self) -> String
Returns the orchestration name.
This is the name registered with the orchestration registry.
§Example
let name = ctx.orchestration_name();
ctx.trace_info(format!("Running orchestration: {}", name));Sourcepub fn orchestration_version(&self) -> String
pub fn orchestration_version(&self) -> String
Returns the orchestration version.
This is the semantic version string associated with the orchestration.
§Example
let version = ctx.orchestration_version();
ctx.trace_info(format!("Version: {}", version));Sourcepub fn is_logging_enabled(&self) -> bool
pub fn is_logging_enabled(&self) -> bool
Indicates whether logging is enabled for the current poll. This is flipped on when a decision is recorded to minimize log noise.
Sourcepub fn trace(&self, level: impl Into<String>, message: impl Into<String>)
pub fn trace(&self, level: impl Into<String>, message: impl Into<String>)
Emit a structured trace entry with automatic context correlation.
Creates a system call event for deterministic replay and logs to tracing. The log entry automatically includes correlation fields:
instance_id- The orchestration instance identifierexecution_id- The current execution numberorchestration_name- Name of the orchestrationorchestration_version- Semantic version
§Determinism
This method is replay-safe: logs are only emitted on first execution, not during replay.
§Example
ctx.trace("INFO", "Processing started");
ctx.trace("WARN", format!("Retry attempt: {}", 3));
ctx.trace("ERROR", "Payment validation failed");§Output
2024-10-30T10:15:23.456Z INFO duroxide::orchestration [order-123] Processing startedAll logs include instance_id, execution_id, orchestration_name for correlation.
Sourcepub fn new_guid(&self) -> impl Future<Output = Result<String, String>>
pub fn new_guid(&self) -> impl Future<Output = Result<String, String>>
Generate a new deterministic GUID.
This schedules a built-in activity that generates a unique identifier. The GUID is deterministic across replays (the same value is returned when the orchestration replays).
§Example
let guid = ctx.new_guid().await?;
println!("Generated GUID: {}", guid);Sourcepub fn utc_now(&self) -> impl Future<Output = Result<SystemTime, String>>
pub fn utc_now(&self) -> impl Future<Output = Result<SystemTime, String>>
Get the current UTC time.
This schedules a built-in activity that returns the current time. The time is deterministic across replays (the same value is returned when the orchestration replays).
§Errors
Returns an error if the activity fails or if the time value cannot be parsed.
§Example
let now = ctx.utc_now().await?;
let deadline = now + Duration::from_secs(3600); // 1 hour from nowSourcepub fn continue_as_new(
&self,
input: impl Into<String>,
) -> impl Future<Output = Result<String, String>>
pub fn continue_as_new( &self, input: impl Into<String>, ) -> impl Future<Output = Result<String, String>>
Continue the current execution as a new execution with fresh input.
This terminates the current execution and starts a new execution with the provided input.
Returns a future that never resolves, ensuring code after await is unreachable.
§Example
let n: u32 = 0;
if n < 2 {
return ctx.continue_as_new("next_input").await; // Execution terminates here
// This code is unreachable - compiler will warn
}
Ok("completed".to_string())pub fn continue_as_new_typed<In: Serialize>( &self, input: &In, ) -> impl Future<Output = Result<String, String>>
Source§impl OrchestrationContext
impl OrchestrationContext
Sourcepub async fn schedule_activity_with_retry(
&self,
name: impl Into<String>,
input: impl Into<String>,
policy: RetryPolicy,
) -> Result<String, String>
pub async fn schedule_activity_with_retry( &self, name: impl Into<String>, input: impl Into<String>, policy: RetryPolicy, ) -> Result<String, String>
Schedule activity with automatic retry on failure.
Retry behavior:
- Retries on activity errors up to
policy.max_attempts - Timeouts are NOT retried - if any attempt times out, returns error immediately
- Only application errors trigger retry logic
Timeout behavior (if policy.total_timeout is set):
- Each activity attempt is raced against the timeout
- If the timeout fires before the activity completes → returns timeout error (no retry)
- If the activity fails with an error before timeout → retry according to policy
§Example
// Simple retry with defaults (no timeout)
let result = ctx.schedule_activity_with_retry(
"CallAPI",
"request",
RetryPolicy::new(3),
).await?;
// Retry with per-attempt timeout and custom backoff
let result = ctx.schedule_activity_with_retry(
"CallAPI",
"request",
RetryPolicy::new(5)
.with_timeout(Duration::from_secs(30)) // 30s per attempt
.with_backoff(BackoffStrategy::Fixed { delay: Duration::from_secs(1) }),
).await?;§Errors
Returns an error if all retry attempts fail or if a timeout occurs (timeouts are not retried).
Sourcepub async fn schedule_activity_with_retry_typed<In: Serialize, Out: DeserializeOwned>(
&self,
name: impl Into<String>,
input: &In,
policy: RetryPolicy,
) -> Result<Out, String>
pub async fn schedule_activity_with_retry_typed<In: Serialize, Out: DeserializeOwned>( &self, name: impl Into<String>, input: &In, policy: RetryPolicy, ) -> Result<Out, String>
Typed variant of schedule_activity_with_retry.
Serializes input once and deserializes the successful result.
§Errors
Returns an error if all retry attempts fail, if a timeout occurs, if input serialization fails, or if result deserialization fails.
Sourcepub async fn schedule_activity_with_retry_on_session(
&self,
name: impl Into<String>,
input: impl Into<String>,
policy: RetryPolicy,
session_id: impl Into<String>,
) -> Result<String, String>
pub async fn schedule_activity_with_retry_on_session( &self, name: impl Into<String>, input: impl Into<String>, policy: RetryPolicy, session_id: impl Into<String>, ) -> Result<String, String>
Schedule an activity with automatic retry on a specific session.
Combines retry semantics from Self::schedule_activity_with_retry with
session affinity from Self::schedule_activity_on_session. All retry
attempts are pinned to the same session_id, ensuring they execute on
the same worker.
§Example
let session = ctx.new_guid().await?;
let result = ctx.schedule_activity_with_retry_on_session(
"RunQuery",
"SELECT 1",
RetryPolicy::new(3)
.with_backoff(BackoffStrategy::Fixed { delay: Duration::from_secs(1) }),
&session,
).await?;§Errors
Returns an error if all retry attempts fail or if a timeout occurs (timeouts are not retried).
Sourcepub async fn schedule_activity_with_retry_on_session_typed<In: Serialize, Out: DeserializeOwned>(
&self,
name: impl Into<String>,
input: &In,
policy: RetryPolicy,
session_id: impl Into<String>,
) -> Result<Out, String>
pub async fn schedule_activity_with_retry_on_session_typed<In: Serialize, Out: DeserializeOwned>( &self, name: impl Into<String>, input: &In, policy: RetryPolicy, session_id: impl Into<String>, ) -> Result<Out, String>
Typed variant of Self::schedule_activity_with_retry_on_session.
Serializes input once and deserializes the successful result. All retry attempts are pinned to the same session.
§Errors
Returns an error if all retry attempts fail, if a timeout occurs, if input serialization fails, or if result deserialization fails.
Sourcepub fn schedule_orchestration(
&self,
name: impl Into<String>,
instance: impl Into<String>,
input: impl Into<String>,
)
pub fn schedule_orchestration( &self, name: impl Into<String>, instance: impl Into<String>, input: impl Into<String>, )
Schedule a detached orchestration with an explicit instance id. The runtime will prefix this with the parent instance to ensure global uniqueness.
pub fn schedule_orchestration_typed<In: Serialize>( &self, name: impl Into<String>, instance: impl Into<String>, input: &In, )
Sourcepub fn schedule_orchestration_versioned(
&self,
name: impl Into<String>,
version: Option<String>,
instance: impl Into<String>,
input: impl Into<String>,
)
pub fn schedule_orchestration_versioned( &self, name: impl Into<String>, version: Option<String>, instance: impl Into<String>, input: impl Into<String>, )
Versioned detached orchestration start (string I/O). If version is None, registry policy is used for the child.
pub fn schedule_orchestration_versioned_typed<In: Serialize>( &self, name: impl Into<String>, version: Option<String>, instance: impl Into<String>, input: &In, )
Sourcepub fn set_custom_status(&self, status: impl Into<String>)
pub fn set_custom_status(&self, status: impl Into<String>)
Set a user-defined custom status for progress reporting.
This is not a history event or an action — it’s pure metadata that gets
plumbed into ExecutionMetadata at ack time. No impact on determinism,
no replay implications.
- Call it whenever, as many times as you want within a turn
- Last write wins: if called twice in the same turn, only the last value is sent
- Persistent across turns: if you don’t call it on a later turn, the provider keeps the previous value
§Example
ctx.set_custom_status("Processing item 3 of 10");
let result = ctx.schedule_activity("ProcessItem", "item-3").await;
ctx.set_custom_status("Processing item 4 of 10");Sourcepub fn reset_custom_status(&self)
pub fn reset_custom_status(&self)
Clear the custom status back to None. The provider will set the column to NULL
and increment custom_status_version.
ctx.set_custom_status("Processing batch");
// ... work ...
ctx.reset_custom_status(); // done, clear the progressSourcepub fn get_custom_status(&self) -> Option<String>
pub fn get_custom_status(&self) -> Option<String>
Returns the current custom status value, if any.
This reflects all set_custom_status / reset_custom_status calls made so far
in this and previous turns. In a CAN’d execution, it includes the value carried
from the previous execution.
Source§impl OrchestrationContext
impl OrchestrationContext
Sourcepub fn schedule_activity(
&self,
name: impl Into<String>,
input: impl Into<String>,
) -> DurableFuture<Result<String, String>> ⓘ
pub fn schedule_activity( &self, name: impl Into<String>, input: impl Into<String>, ) -> DurableFuture<Result<String, String>> ⓘ
Schedule an activity and return a cancellation-aware future.
Returns a DurableFuture that supports cancellation on drop. If the future
is dropped without completing (e.g., as a select loser), the activity will be
cancelled via lock stealing.
§Example
// Fan-out to multiple activities
let f1 = ctx.schedule_activity("Process", "A");
let f2 = ctx.schedule_activity("Process", "B");
let results = ctx.join(vec![f1, f2]).await;Sourcepub fn schedule_activity_typed<In: Serialize, Out: DeserializeOwned + Send + 'static>(
&self,
name: impl Into<String>,
input: &In,
) -> DurableFuture<Result<Out, String>> ⓘ
pub fn schedule_activity_typed<In: Serialize, Out: DeserializeOwned + Send + 'static>( &self, name: impl Into<String>, input: &In, ) -> DurableFuture<Result<Out, String>> ⓘ
Typed version of schedule_activity that serializes input and deserializes output.
§Errors
Returns an error if the activity fails or if the output cannot be deserialized.
Sourcepub fn schedule_activity_on_session(
&self,
name: impl Into<String>,
input: impl Into<String>,
session_id: impl Into<String>,
) -> DurableFuture<Result<String, String>> ⓘ
pub fn schedule_activity_on_session( &self, name: impl Into<String>, input: impl Into<String>, session_id: impl Into<String>, ) -> DurableFuture<Result<String, String>> ⓘ
Schedule an activity routed to the worker owning the given session.
If no worker owns the session, any worker can claim it on first fetch.
Once claimed, all subsequent activities with the same session_id route
to the claiming worker until the session unpins (idle timeout or worker death).
§Example
let session_id = ctx.new_guid().await?;
let result = ctx.schedule_activity_on_session("run_turn", "input", &session_id).await?;Sourcepub fn schedule_activity_on_session_typed<In: Serialize, Out: DeserializeOwned + Send + 'static>(
&self,
name: impl Into<String>,
input: &In,
session_id: impl Into<String>,
) -> DurableFuture<Result<Out, String>> ⓘ
pub fn schedule_activity_on_session_typed<In: Serialize, Out: DeserializeOwned + Send + 'static>( &self, name: impl Into<String>, input: &In, session_id: impl Into<String>, ) -> DurableFuture<Result<Out, String>> ⓘ
Typed version of schedule_activity_on_session that serializes input and deserializes output.
§Errors
Returns an error if the activity fails or if the output cannot be deserialized.
Sourcepub fn schedule_timer(&self, delay: Duration) -> DurableFuture<()> ⓘ
pub fn schedule_timer(&self, delay: Duration) -> DurableFuture<()> ⓘ
Schedule a timer and return a cancellation-aware future.
Timers are virtual constructs - dropping the future is a no-op since there’s
no external state to cancel. However, wrapping in DurableFuture maintains
API consistency.
Sourcepub fn schedule_wait(&self, name: impl Into<String>) -> DurableFuture<String> ⓘ
pub fn schedule_wait(&self, name: impl Into<String>) -> DurableFuture<String> ⓘ
Subscribe to an external event and return a cancellation-aware future.
External waits are virtual constructs - dropping the future is a no-op since
there’s no external state to cancel. However, wrapping in DurableFuture
maintains API consistency.
Sourcepub fn schedule_wait_typed<T: DeserializeOwned + Send + 'static>(
&self,
name: impl Into<String>,
) -> DurableFuture<T> ⓘ
pub fn schedule_wait_typed<T: DeserializeOwned + Send + 'static>( &self, name: impl Into<String>, ) -> DurableFuture<T> ⓘ
Typed version of schedule_wait.
Sourcepub fn dequeue_event(&self, queue: impl Into<String>) -> DurableFuture<String> ⓘ
pub fn dequeue_event(&self, queue: impl Into<String>) -> DurableFuture<String> ⓘ
Dequeue the next message from a named queue (FIFO mailbox semantics).
Unlike schedule_wait, queued events use FIFO matching:
- No positional pairing — any unresolved subscription gets the first unmatched arrival
- Cancelled subscriptions are skipped (don’t consume arrivals)
- Events that arrive before a subscription are buffered until consumed
- Events survive
continue_as_newboundaries (carried forward)
The caller enqueues messages with Client::enqueue_event.
Sourcepub fn dequeue_event_typed<T: DeserializeOwned>(
&self,
queue: impl Into<String>,
) -> impl Future<Output = T>
pub fn dequeue_event_typed<T: DeserializeOwned>( &self, queue: impl Into<String>, ) -> impl Future<Output = T>
Typed version of Self::dequeue_event. Deserializes the message payload as T.
Sourcepub fn schedule_wait_persistent(
&self,
name: impl Into<String>,
) -> DurableFuture<String> ⓘ
👎Deprecated: Use dequeue_event() instead
pub fn schedule_wait_persistent( &self, name: impl Into<String>, ) -> DurableFuture<String> ⓘ
Subscribe to a persistent external event (mailbox semantics).
Prefer Self::dequeue_event — this is a deprecated alias.
Sourcepub fn schedule_wait_persistent_typed<T: DeserializeOwned>(
&self,
name: impl Into<String>,
) -> impl Future<Output = T>
👎Deprecated: Use dequeue_event_typed() instead
pub fn schedule_wait_persistent_typed<T: DeserializeOwned>( &self, name: impl Into<String>, ) -> impl Future<Output = T>
Typed version of schedule_wait_persistent.
Prefer Self::dequeue_event_typed — this is a deprecated alias.
Sourcepub fn schedule_sub_orchestration(
&self,
name: impl Into<String>,
input: impl Into<String>,
) -> DurableFuture<Result<String, String>> ⓘ
pub fn schedule_sub_orchestration( &self, name: impl Into<String>, input: impl Into<String>, ) -> DurableFuture<Result<String, String>> ⓘ
Schedule a sub-orchestration and return a cancellation-aware future.
The child instance ID is auto-generated from the event ID with a parent prefix.
Returns a DurableFuture that supports cancellation on drop. If the future
is dropped without completing, a CancelInstance work item will be enqueued
for the child orchestration.
Sourcepub fn schedule_sub_orchestration_with_id(
&self,
name: impl Into<String>,
instance: impl Into<String>,
input: impl Into<String>,
) -> DurableFuture<Result<String, String>> ⓘ
pub fn schedule_sub_orchestration_with_id( &self, name: impl Into<String>, instance: impl Into<String>, input: impl Into<String>, ) -> DurableFuture<Result<String, String>> ⓘ
Schedule a sub-orchestration with an explicit instance ID.
The provided instance value is used exactly as the child instance ID,
without any parent prefix. Use this when you need to control the exact
instance ID for the sub-orchestration.
For auto-generated instance IDs, use [schedule_sub_orchestration] instead.
Sourcepub fn schedule_sub_orchestration_versioned(
&self,
name: impl Into<String>,
version: Option<String>,
input: impl Into<String>,
) -> DurableFuture<Result<String, String>> ⓘ
pub fn schedule_sub_orchestration_versioned( &self, name: impl Into<String>, version: Option<String>, input: impl Into<String>, ) -> DurableFuture<Result<String, String>> ⓘ
Schedule a versioned sub-orchestration.
If version is Some, that specific version is used.
If version is None, the registry’s policy (e.g., Latest) is used.
Sourcepub fn schedule_sub_orchestration_versioned_with_id(
&self,
name: impl Into<String>,
version: Option<String>,
instance: impl Into<String>,
input: impl Into<String>,
) -> DurableFuture<Result<String, String>> ⓘ
pub fn schedule_sub_orchestration_versioned_with_id( &self, name: impl Into<String>, version: Option<String>, instance: impl Into<String>, input: impl Into<String>, ) -> DurableFuture<Result<String, String>> ⓘ
Schedule a versioned sub-orchestration with an explicit instance ID.
The provided instance value is used exactly as the child instance ID,
without any parent prefix.
Returns a DurableFuture that supports cancellation on drop. If the future
is dropped without completing, a CancelInstance work item will be enqueued
for the child orchestration.
Sourcepub fn schedule_sub_orchestration_typed<In: Serialize, Out: DeserializeOwned + Send + 'static>(
&self,
name: impl Into<String>,
input: &In,
) -> DurableFuture<Result<Out, String>> ⓘ
pub fn schedule_sub_orchestration_typed<In: Serialize, Out: DeserializeOwned + Send + 'static>( &self, name: impl Into<String>, input: &In, ) -> DurableFuture<Result<Out, String>> ⓘ
Typed version of schedule_sub_orchestration.
§Errors
Returns an error if the sub-orchestration fails or if the output cannot be deserialized.
Sourcepub fn schedule_sub_orchestration_with_id_typed<In: Serialize, Out: DeserializeOwned + Send + 'static>(
&self,
name: impl Into<String>,
instance: impl Into<String>,
input: &In,
) -> DurableFuture<Result<Out, String>> ⓘ
pub fn schedule_sub_orchestration_with_id_typed<In: Serialize, Out: DeserializeOwned + Send + 'static>( &self, name: impl Into<String>, instance: impl Into<String>, input: &In, ) -> DurableFuture<Result<Out, String>> ⓘ
Typed version of schedule_sub_orchestration_with_id.
§Errors
Returns an error if the sub-orchestration fails or if the output cannot be deserialized.
Sourcepub async fn join<T, F>(&self, futures: Vec<F>) -> Vec<T>where
F: Future<Output = T>,
pub async fn join<T, F>(&self, futures: Vec<F>) -> Vec<T>where
F: Future<Output = T>,
Await all futures concurrently using futures::future::join_all.
Works with any Future type.
Sourcepub async fn join2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> (T1, T2)
pub async fn join2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> (T1, T2)
Simplified join for exactly 2 futures (convenience method).
Sourcepub async fn join3<T1, T2, T3, F1, F2, F3>(
&self,
f1: F1,
f2: F2,
f3: F3,
) -> (T1, T2, T3)
pub async fn join3<T1, T2, T3, F1, F2, F3>( &self, f1: F1, f2: F2, f3: F3, ) -> (T1, T2, T3)
Simplified join for exactly 3 futures (convenience method).
Sourcepub async fn select2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> Either2<T1, T2>
pub async fn select2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> Either2<T1, T2>
Simplified select over 2 futures: returns the result of whichever completes first. Select over 2 futures with potentially different output types.
Returns Either2::First(result) if first future wins, Either2::Second(result) if second wins.
Uses futures::select_biased! for determinism (first branch polled first).
§Example: Activity with timeout
let work = ctx.schedule_activity("SlowWork", "input");
let timeout = ctx.schedule_timer(Duration::from_secs(30));
match ctx.select2(work, timeout).await {
Either2::First(result) => result,
Either2::Second(()) => Err("Operation timed out".to_string()),
}Sourcepub async fn select3<T1, T2, T3, F1, F2, F3>(
&self,
f1: F1,
f2: F2,
f3: F3,
) -> Either3<T1, T2, T3>
pub async fn select3<T1, T2, T3, F1, F2, F3>( &self, f1: F1, f2: F2, f3: F3, ) -> Either3<T1, T2, T3>
Select over 3 futures with potentially different output types.
Returns Either3::First/Second/Third(result) depending on which future completes first.
Uses futures::select_biased! for determinism (earlier branches polled first).
Trait Implementations§
Source§impl Clone for OrchestrationContext
impl Clone for OrchestrationContext
Source§fn clone(&self) -> OrchestrationContext
fn clone(&self) -> OrchestrationContext
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read more