duroxide/lib.rs
1//! # Duroxide: Durable execution framework in Rust
2//!
//! Duroxide is a framework for building reliable, long-running, code-based workflows that can survive
4//! failures and restarts. For a deep dive into how durable execution works, see the
5//! [Durable Futures Internals](https://github.com/affandar/duroxide/blob/main/docs/durable-futures-internals.md) documentation.
6//!
7//! ## Quick Start
8//!
9//! ```rust,no_run
10//! use duroxide::providers::sqlite::SqliteProvider;
11//! use duroxide::runtime::registry::ActivityRegistry;
12//! use duroxide::runtime::{self};
13//! use duroxide::{ActivityContext, OrchestrationContext, OrchestrationRegistry, Client};
14//! use std::sync::Arc;
15//!
16//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
17//! // 1. Create a storage provider
18//! let store = Arc::new(SqliteProvider::new("sqlite:./data.db", None).await.unwrap());
19//!
20//! // 2. Register activities (your business logic)
21//! let activities = ActivityRegistry::builder()
22//! .register("Greet", |_ctx: ActivityContext, name: String| async move {
23//! Ok(format!("Hello, {}!", name))
24//! })
25//! .build();
26//!
27//! // 3. Define your orchestration
28//! let orchestration = |ctx: OrchestrationContext, name: String| async move {
29//! let greeting = ctx.schedule_activity("Greet", name).await?;
30
// Mutex poisoning indicates a panic in another thread - a critical error.
// All expect()/unwrap() calls on mutex locks in this module are intentional:
// poisoned mutexes should panic as they indicate corrupted state.
//
// NOTE(review): because these are inner attributes at the crate root, the
// allowances below apply to every module in the crate, not only this file.
// They also currently sit between two halves of the Quick Start `//!` example;
// moving this group below the module docs would keep the crate docs contiguous.
#![allow(clippy::expect_used)]
#![allow(clippy::unwrap_used)]
// Arc::clone() vs .clone() is a style preference - we use .clone() for brevity
#![allow(clippy::clone_on_ref_ptr)]
38//! Ok(greeting)
39//! };
40//!
41//! // 4. Register and start the runtime
42//! let orchestrations = OrchestrationRegistry::builder()
43//! .register("HelloWorld", orchestration)
44//! .build();
45//!
46//! let rt = runtime::Runtime::start_with_store(
47//! store.clone(), activities, orchestrations
48//! ).await;
49//!
50//! // 5. Create a client and start an orchestration instance
51//! let client = Client::new(store.clone());
52//! client.start_orchestration("inst-1", "HelloWorld", "World").await?;
53//! let result = client.wait_for_orchestration("inst-1", std::time::Duration::from_secs(5)).await
54//! .map_err(|e| format!("Wait error: {:?}", e))?;
55//! # Ok(())
56//! # }
57//! ```
58//!
59//! ## Key Concepts
60//!
61//! - **Orchestrations**: Long-running workflows written as async functions (coordination logic)
62//! - **Activities**: Single-purpose work units (can do anything - DB, API, polling, etc.)
63//! - Supports long-running activities via automatic lock renewal (minutes to hours)
//! - **Timers**: Use `ctx.schedule_timer(duration)` (a `std::time::Duration`) for orchestration-level delays and timeouts
65//! - **Deterministic Replay**: Orchestrations are replayed from history to ensure consistency
66//! - **Durable Futures**: Composable futures for activities, timers, and external events
67//! - **ContinueAsNew (Multi-Execution)**: An orchestration can end the current execution and
68//! immediately start a new one with fresh input. Each execution has its own isolated history
69//! that starts with `OrchestrationStarted { event_id: 1 }`.
70//!
71//! ## ⚠️ Important: Orchestrations vs Activities
72//!
73//! **Orchestrations = Coordination (control flow, business logic)**
74//! **Activities = Execution (single-purpose work units)**
75//!
76//! ```rust,no_run
77//! # use duroxide::OrchestrationContext;
78//! # use std::time::Duration;
79//! # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
80//! // ✅ CORRECT: Orchestration-level delay using timer
81//! ctx.schedule_timer(Duration::from_secs(5)).await; // Wait 5 seconds
82//!
83//! // ✅ ALSO CORRECT: Activity can poll/sleep as part of its work
84//! // Example: Activity that provisions a VM and polls for readiness
85//! // activities.register("ProvisionVM", |config| async move {
86//! // let vm = create_vm(config).await?;
87//! // while !vm_ready(&vm).await {
88//! // tokio::time::sleep(Duration::from_secs(5)).await; // ✅ OK - part of provisioning
89//! // }
90//! // Ok(vm.id)
91//! // });
92//!
93//! // ❌ WRONG: Activity that ONLY sleeps (use timer instead)
94//! // ctx.schedule_activity("Sleep5Seconds", "").await;
95//! # Ok(())
96//! # }
97//! ```
98//!
99//! **Put in Activities (single-purpose execution units):**
100//! - Database operations
101//! - API calls (can include retries/polling)
102//! - Data transformations
103//! - File I/O
104//! - VM provisioning (with internal polling)
105//!
106//! **Put in Orchestrations (coordination and business logic):**
107//! - Control flow (if/else, match, loops)
108//! - Business decisions
109//! - Multi-step workflows
110//! - Error handling and compensation
111//! - Timeouts and deadlines (use timers)
112//! - Waiting for external events
113//!
114//! ## ContinueAsNew (Multi-Execution) Semantics
115//!
116//! ContinueAsNew (CAN) allows an orchestration to end its current execution and start a new
117//! one with fresh input (useful for loops, pagination, long-running workflows).
118//!
119//! - Orchestration calls `ctx.continue_as_new(new_input)`
120//! - Runtime stamps `OrchestrationContinuedAsNew` in the CURRENT execution's history
121//! - Runtime enqueues a `WorkItem::ContinueAsNew`
122//! - When processing that work item, the runtime starts a NEW execution with:
123//! - `execution_id = previous_execution_id + 1`
124//! - `existing_history = []` (fresh history)
//! - `OrchestrationStarted { event_id: 1, input: new_input }` is stamped automatically
126//! - Each execution's history is independent; `duroxide::Client::read_execution_history(instance, id)`
127//! returns events for that execution only
128//!
129//! Provider responsibilities are strictly storage-level (see below). The runtime owns all
130//! orchestration semantics, including execution boundaries and starting the new execution.
131//!
132//! ## Provider Responsibilities (At a Glance)
133//!
134//! Providers are pure storage abstractions. The runtime computes orchestration semantics
135//! and passes explicit instructions to the provider.
136//!
137//! - `fetch_orchestration_item()`
138//! - Return a locked batch of work for ONE instance
139//! - Include full history for the CURRENT `execution_id`
140//! - Do NOT create/synthesize new executions here (even for ContinueAsNew)
141//!
142//! - `ack_orchestration_item(lock_token, execution_id, history_delta, ..., metadata)`
143//! - Atomic commit of one orchestration turn
144//! - Idempotently `INSERT OR IGNORE` execution row for the explicit `execution_id`
145//! - `UPDATE instances.current_execution_id = MAX(current_execution_id, execution_id)`
146//! - Append `history_delta` to the specified execution
147//! - Update `executions.status` and `executions.output` from `metadata` (no event inspection)
148//!
149//! - Worker/Timer queues
150//! - Peek-lock semantics (dequeue with lock token; ack by deleting)
151//! - Automatic lock renewal for long-running activities (no configuration needed)
152//! - Orchestrator, Worker, Timer queues are independent but committed atomically with history
153//!
154//! See `docs/provider-implementation-guide.md` and `src/providers/sqlite.rs` for a complete,
155//! production-grade provider implementation.
156//!
157//! ## Simplified API
158//!
159//! All schedule methods return typed futures that can be awaited directly:
160//!
161//! ```rust,no_run
162//! # use duroxide::OrchestrationContext;
163//! # use std::time::Duration;
164//! # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
165//! // Activities return Result<String, String>
166//! let result = ctx.schedule_activity("Task", "input").await?;
167//!
168//! // Timers return ()
169//! ctx.schedule_timer(Duration::from_secs(5)).await;
170//!
171//! // External events return String
172//! let event = ctx.schedule_wait("Event").await;
173//!
174//! // Sub-orchestrations return Result<String, String>
175//! let sub_result = ctx.schedule_sub_orchestration("Sub", "input").await?;
176//! # Ok(())
177//! # }
178//! ```
179//!
180//! ## Common Patterns
181//!
182//! ### Function Chaining
183//! ```rust,no_run
184//! # use duroxide::OrchestrationContext;
185//! async fn chain_example(ctx: OrchestrationContext) -> Result<String, String> {
186//! let step1 = ctx.schedule_activity("Step1", "input").await?;
187//! let step2 = ctx.schedule_activity("Step2", &step1).await?;
188//! Ok(step2)
189//! }
190//! ```
191//!
192//! ### Fan-Out/Fan-In
193//! ```rust,no_run
194//! # use duroxide::OrchestrationContext;
195//! async fn fanout_example(ctx: OrchestrationContext) -> Vec<String> {
196//! let futures = vec![
197//! ctx.schedule_activity("Process", "item1"),
198//! ctx.schedule_activity("Process", "item2"),
199//! ctx.schedule_activity("Process", "item3"),
200//! ];
201//! let results = ctx.join(futures).await;
202//! results.into_iter().filter_map(|r| r.ok()).collect()
203//! }
204//! ```
205//!
206//! ### Human-in-the-Loop (Timeout Pattern)
207//! ```rust,no_run
208//! # use duroxide::{OrchestrationContext, Either2};
209//! # use std::time::Duration;
210//! async fn approval_example(ctx: OrchestrationContext) -> String {
211//! let timeout = ctx.schedule_timer(Duration::from_secs(30));
212//! let approval = ctx.schedule_wait("ApprovalEvent");
213//!
214//! match ctx.select2(approval, timeout).await {
215//! Either2::First(data) => data,
216//! Either2::Second(()) => "timeout".to_string(),
217//! }
218//! }
219//! ```
220//!
221//! ### Delays and Timeouts
222//! ```rust,no_run
223//! # use duroxide::{OrchestrationContext, Either2};
224//! # use std::time::Duration;
225//! async fn delay_example(ctx: OrchestrationContext) -> Result<String, String> {
226//! // Use timer for orchestration-level delays
227//! ctx.schedule_timer(Duration::from_secs(5)).await;
228//!
229//! // Process after delay
230//! let result = ctx.schedule_activity("ProcessData", "input").await?;
231//! Ok(result)
232//! }
233//!
234//! async fn timeout_example(ctx: OrchestrationContext) -> Result<String, String> {
235//! // Race work against timeout
236//! let work = ctx.schedule_activity("SlowOperation", "input");
237//! let timeout = ctx.schedule_timer(Duration::from_secs(30));
238//!
239//! match ctx.select2(work, timeout).await {
240//! Either2::First(result) => result,
241//! Either2::Second(()) => Err("Operation timed out".to_string()),
242//! }
243//! }
244//! ```
245//!
246//! ### Fan-Out/Fan-In with Error Handling
247//! ```rust,no_run
248//! # use duroxide::OrchestrationContext;
249//! async fn fanout_with_errors(ctx: OrchestrationContext, items: Vec<String>) -> Result<Vec<String>, String> {
250//! // Schedule all work in parallel
251//! let futures: Vec<_> = items.iter()
252//! .map(|item| ctx.schedule_activity("ProcessItem", item.clone()))
253//! .collect();
254//!
255//! // Wait for all to complete (deterministic order preserved)
256//! let results = ctx.join(futures).await;
257//!
258//! // Process results with error handling
259//! let mut successes = Vec::new();
260//! for result in results {
261//! match result {
262//! Ok(value) => successes.push(value),
263//! Err(e) => {
264//! // Log error but continue processing other items
265//! ctx.trace_error(format!("Item processing failed: {e}"));
266//! }
267//! }
268//! }
269//!
270//! Ok(successes)
271//! }
272//! ```
273//!
274//! ### Retry Pattern
275//! ```rust,no_run
276//! # use duroxide::{OrchestrationContext, RetryPolicy, BackoffStrategy};
277//! # use std::time::Duration;
278//! async fn retry_example(ctx: OrchestrationContext) -> Result<String, String> {
279//! // Retry with linear backoff: 5 attempts, delay increases linearly (1s, 2s, 3s, 4s)
280//! let result = ctx.schedule_activity_with_retry(
281//! "UnreliableOperation",
282//! "input",
283//! RetryPolicy::new(5)
284//! .with_backoff(BackoffStrategy::Linear {
285//! base: Duration::from_secs(1),
286//! max: Duration::from_secs(10),
287//! }),
288//! ).await?;
289//!
290//! Ok(result)
291//! }
292//! ```
293//!
294//! ## Examples
295//!
296//! See the `examples/` directory for complete, runnable examples:
297//! - `hello_world.rs` - Basic orchestration setup
298//! - `fan_out_fan_in.rs` - Parallel processing pattern with error handling
299//! - `timers_and_events.rs` - Human-in-the-loop workflows with timeouts
300//! - `delays_and_timeouts.rs` - Correct usage of timers for delays and timeouts
301//! - `with_observability.rs` - Using observability features (tracing, metrics)
302//! - `metrics_cli.rs` - Querying system metrics via CLI
303//!
304//! Run examples with: `cargo run --example <name>`
305//!
306//! ## Architecture
307//!
308//! This crate provides:
309//! - **Public data model**: `Event`, `Action` for history and decisions
310//! - **Orchestration driver**: `run_turn`, `run_turn_with`, and `Executor`
311//! - **OrchestrationContext**: Schedule activities, timers, and external events
312//! - **Deterministic futures**: `schedule_*()` return standard `Future`s that can be composed with `join`/`select`
313//! - **Runtime**: In-process execution engine with dispatchers and workers
314//! - **Providers**: Pluggable storage backends (filesystem, in-memory)
315//!
316//! ### End-to-End System Architecture
317//!
318//! ```text
319//! +-------------------------------------------------------------------------+
320//! | Application Layer |
321//! +-------------------------------------------------------------------------+
322//! | |
323//! | +--------------+ +------------------------------------+ |
324//! | | Client |-------->| start_orchestration() | |
325//! | | | | raise_event() | |
326//! | | | | wait_for_orchestration() | |
327//! | +--------------+ +------------------------------------+ |
328//! | |
329//! +-------------------------------------------------------------------------+
330//! |
331//! v
332//! +-------------------------------------------------------------------------+
333//! | Runtime Layer |
334//! +-------------------------------------------------------------------------+
335//! | |
336//! | +-------------------------------------------------------------------+ |
337//! | | Runtime | |
338//! | | +----------------------+ +----------------------+ | |
339//! | | | Orchestration | | Work | | |
340//! | | | Dispatcher | | Dispatcher | | |
341//! | | | (N concurrent) | | (N concurrent) | | |
342//! | | +----------+-----------+ +----------+-----------+ | |
343//! | | | | | |
344//! | | | Processes turns | Executes activities| |
345//! | | | | | |
346//! | +-------------+--------------------------------+--------------------+ |
347//! | | | |
348//! | +-------------v--------------------------------v--------------------+ |
349//! | | OrchestrationRegistry: maps names -> orchestration handlers | |
350//! | +-------------------------------------------------------------------+ |
351//! | |
352//! | +-------------------------------------------------------------------+ |
353//! | | ActivityRegistry: maps names -> activity handlers | |
354//! | +-------------------------------------------------------------------+ |
355//! | |
356//! +-------------------------------------------------------------------------+
357//! | |
358//! | Fetches work items | Fetches work items
359//! | (peek-lock) | (peek-lock)
360//! v v
361//! +-------------------------------------------------------------------------+
362//! | Provider Layer |
363//! +-------------------------------------------------------------------------+
364//! | |
365//! | +----------------------------+ +----------------------------+ |
366//! | | Orchestrator Queue | | Worker Queue | |
367//! | | - StartOrchestration | | - ActivityExecute | |
368//! | | - ActivityCompleted | | | |
369//! | | - ActivityFailed | | | |
370//! | | - TimerFired (delayed) | | | |
371//! | | - ExternalRaised | | | |
372//! | | - ContinueAsNew | | | |
373//! | +----------------------------+ +----------------------------+ |
374//! | |
375//! | +-------------------------------------------------------------------+ |
376//! | | Provider (Storage) | |
377//! | | - History (Events per instance/execution) | |
378//! | | - Instance metadata | |
379//! | | - Execution metadata | |
380//! | | - Instance locks (peek-lock semantics) | |
381//! | | - Queue management (enqueue/dequeue with visibility) | |
382//! | +-------------------------------------------------------------------+ |
383//! | |
384//! | +-------------------------------------------------------------------+ |
385//! | | Storage Backend (SQLite, etc.) | |
386//! | +-------------------------------------------------------------------+ |
387//! | |
388//! +-------------------------------------------------------------------------+
//! ```
//!
390//! ### Execution Flow
391//!
392//! 1. **Client** starts orchestration → enqueues `StartOrchestration` to orchestrator queue
393//! 2. **OrchestrationDispatcher** fetches work item (peek-lock), loads history from Provider
394//! 3. **Runtime** calls user's orchestration function with `OrchestrationContext`
395//! 4. **Orchestration** schedules activities/timers → Runtime appends `Event`s to history
396//! 5. **Runtime** enqueues `ActivityExecute` to worker queue, `TimerFired` (delayed) to orchestrator queue
397//! 6. **WorkDispatcher** fetches activity work item, executes via `ActivityRegistry`
398//! 7. **Activity** completes → enqueues `ActivityCompleted`/`ActivityFailed` to orchestrator queue
399//! 8. **OrchestrationDispatcher** processes completion → next orchestration turn
400//! 9. **Runtime** atomically commits history + queue changes via `ack_orchestration_item()`
401//!
402//! All operations are deterministic and replayable from history.
403use std::future::Future;
404use std::pin::Pin;
405use std::sync::{Arc, Mutex};
406use std::task::{Context, Poll};
407
408// Public orchestration primitives and executor
409
410pub mod client;
411pub mod runtime;
412// Re-export descriptor type for public API ergonomics
413pub use runtime::OrchestrationDescriptor;
414pub mod providers;
415
416#[cfg(feature = "provider-test")]
417pub mod provider_validations;
418
419#[cfg(feature = "provider-test")]
420pub mod provider_validation;
421
422#[cfg(feature = "provider-test")]
423pub mod provider_stress_tests;
424
425#[cfg(feature = "provider-test")]
426pub mod provider_stress_test;
427
428// Re-export key runtime types for convenience
429pub use client::{Client, ClientError};
430pub use runtime::{
431 OrchestrationHandler, OrchestrationRegistry, OrchestrationRegistryBuilder, OrchestrationStatus, RuntimeOptions,
432};
433
434// Re-export management types for convenience
435pub use providers::{
436 ExecutionInfo, InstanceInfo, ProviderAdmin, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
437 SystemMetrics,
438};
439
440// Re-export capability filtering types
441pub use providers::{DispatcherCapabilityFilter, SemverRange, current_build_version};
442
443// Re-export deletion/pruning types for Client API users
444pub use providers::{DeleteInstanceResult, InstanceFilter, InstanceTree, PruneOptions, PruneResult};
445
// Type aliases for improved readability and maintainability

/// Shared, reference-counted handle to a storage provider
/// (`Arc<dyn providers::Provider>`).
///
/// Cloning a `ProviderRef` only clones the `Arc`; every clone refers to the same
/// underlying provider object.
pub type ProviderRef = Arc<dyn providers::Provider>;

/// Shared, reference-counted handle to an orchestration handler trait object
/// (`Arc<dyn runtime::OrchestrationHandler>`).
pub type OrchestrationHandlerRef = Arc<dyn runtime::OrchestrationHandler>;
452
453// ============================================================================
454// Heterogeneous Select Result Types
455// ============================================================================
456
457// ============================================================================
458// Schedule Kind and DurableFuture (Cancellation Support)
459// ============================================================================
460
/// Tells the runtime what sort of work a `DurableFuture` scheduled, so it can
/// pick the right way to cancel that work.
///
/// Consulted when a `DurableFuture` is dropped before it completes:
/// - **Activity**: Lock stealing via provider (DELETE from worker_queue)
/// - **Timer**: No-op (virtual construct, no external state)
/// - **ExternalWait**: No-op (virtual construct, no external state)
/// - **SubOrchestration**: Enqueue `CancelInstance` work item for child
#[derive(Debug, Clone)]
pub enum ScheduleKind {
    /// An activity execution that was scheduled.
    Activity {
        /// Name of the activity; kept only for debugging/logging.
        name: String,
    },
    /// A durable timer.
    Timer,
    /// A wait on an external event.
    ExternalWait {
        /// Name of the awaited event; kept only for debugging/logging.
        event_name: String,
    },
    /// A child orchestration.
    SubOrchestration {
        /// Schedule token, used to look up the resolved child instance ID.
        token: u64,
    },
}
489
/// A wrapper around scheduled futures that supports cancellation on drop.
///
/// When a `DurableFuture` is dropped without completing (e.g., as a select loser,
/// or when going out of scope without being awaited), the underlying scheduled work
/// is cancelled:
///
/// - **Activities**: Lock stealing via provider (removes from worker queue)
/// - **Sub-orchestrations**: `CancelInstance` work item enqueued for child
/// - **Timers/External waits**: No-op (virtual constructs with no external state)
///
/// Mechanically, the `Drop` impl reports this future's `token` and `kind` to its
/// `OrchestrationContext` via `mark_token_cancelled`. The kind-specific actions
/// listed above are then carried out by the runtime from that record.
///
/// # Examples
///
/// ```rust,no_run
/// # use duroxide::OrchestrationContext;
/// # use std::time::Duration;
/// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
/// // Activity scheduled - if timer wins, activity gets cancelled
/// let activity = ctx.schedule_activity("SlowWork", "input");
/// let timeout = ctx.schedule_timer(Duration::from_secs(5));
///
/// match ctx.select2(activity, timeout).await {
///     duroxide::Either2::First(result) => result,
///     duroxide::Either2::Second(()) => Err("Timed out - activity cancelled".to_string()),
/// }
/// # }
/// ```
///
/// # Drop Semantics
///
/// Unlike regular Rust futures which are inert on drop, `DurableFuture` has
/// meaningful drop semantics similar to `File` (closes on drop) or `MutexGuard`
/// (releases lock on drop). This is intentional - we want unobserved scheduled
/// work to be cancelled rather than leaked.
///
/// **Note:** Using `std::mem::forget()` on a `DurableFuture` will bypass
/// cancellation, causing the scheduled work to run but its result to be lost.
pub struct DurableFuture<T> {
    // NOTE(review): after `Drop::drop` runs, the fields themselves are dropped in
    // declaration order (token, kind, ctx, completed, inner), so `ctx` is released
    // before `inner`. Keep this order unless it is confirmed not to matter.
    /// Token assigned at creation (before schedule_id is known)
    token: u64,
    /// What kind of schedule this represents
    kind: ScheduleKind,
    /// Reference to context for cancellation registration
    ctx: OrchestrationContext,
    /// Whether the future has completed (to skip cancellation).
    ///
    /// Set to `true` by `poll` once `inner` yields `Poll::Ready`; `Drop` only
    /// reports a cancellation while this is still `false`.
    completed: bool,
    /// The underlying future (boxed and pinned, required to be `Send`)
    inner: std::pin::Pin<Box<dyn Future<Output = T> + Send>>,
}
538
539impl<T> DurableFuture<T> {
540 /// Create a new `DurableFuture` wrapping an inner future.
541 fn new(
542 token: u64,
543 kind: ScheduleKind,
544 ctx: OrchestrationContext,
545 inner: impl Future<Output = T> + Send + 'static,
546 ) -> Self {
547 Self {
548 token,
549 kind,
550 ctx,
551 completed: false,
552 inner: Box::pin(inner),
553 }
554 }
555}
556
557impl<T> Future for DurableFuture<T> {
558 type Output = T;
559
560 fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<T> {
561 match self.inner.as_mut().poll(cx) {
562 Poll::Ready(value) => {
563 self.completed = true;
564 Poll::Ready(value)
565 }
566 Poll::Pending => Poll::Pending,
567 }
568 }
569}
570
571impl<T> Drop for DurableFuture<T> {
572 fn drop(&mut self) {
573 if !self.completed {
574 // Future dropped without completing - trigger cancellation.
575 // Note: During dehydration (TurnResult::Continue), the orchestration future
576 // is dropped after collect_cancelled_from_context() has already run, so these
577 // cancellations go into a context that's about to be dropped. This is safe
578 // because the next turn creates a fresh context.
579 self.ctx.mark_token_cancelled(self.token, &self.kind);
580 }
581 }
582}
583
// DurableFuture is Send if T is Send (inner is already Send-boxed)
//
// SAFETY: NOTE(review) — this impl currently has no written justification.
// - `inner` is `Pin<Box<dyn Future<Output = T> + Send>>`, which is `Send` for every `T`.
// - `token`, `kind` (String/u64 payloads) and `completed` are plain owned data.
// So the only field whose thread-safety this impl actually vouches for is
// `ctx: OrchestrationContext`.
// - If `OrchestrationContext` is `Send`, the auto-trait impl would already apply,
//   and without the `T: Send` bound.
// - If it is not `Send`, this impl is unsound.
// Confirm which case holds. Then either remove this impl or document why moving
// `ctx` across threads is sound.
unsafe impl<T: Send> Send for DurableFuture<T> {}
586
/// Outcome of `select2`: says which of two racing futures finished first and
/// carries that future's output.
///
/// Use this when racing two futures with different output types:
/// ```rust,no_run
/// # use duroxide::{OrchestrationContext, Either2};
/// # use std::time::Duration;
/// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
/// let activity = ctx.schedule_activity("Work", "input");
/// let timeout = ctx.schedule_timer(Duration::from_secs(30));
///
/// match ctx.select2(activity, timeout).await {
///     Either2::First(result) => result,
///     Either2::Second(()) => Err("Timed out".to_string()),
/// }
/// # }
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Either2<A, B> {
    /// The first future won the race
    First(A),
    /// The second future won the race
    Second(B),
}

impl<A, B> Either2<A, B> {
    /// `true` when the first future won.
    pub fn is_first(&self) -> bool {
        self.index() == 0
    }

    /// `true` when the second future won.
    pub fn is_second(&self) -> bool {
        // Only two variants exist, so "not first" means "second".
        !self.is_first()
    }

    /// Zero-based position of the winner: `0` for `First`, `1` for `Second`.
    pub fn index(&self) -> usize {
        match self {
            Self::First(_) => 0,
            Self::Second(_) => 1,
        }
    }
}

impl<T> Either2<T, T> {
    /// Splits a homogeneous `Either2` into `(winner_index, value)`.
    ///
    /// Handy when migrating code written against the old `(usize, T)` return type.
    pub fn into_tuple(self) -> (usize, T) {
        let winner = self.index();
        let value = match self {
            Self::First(v) | Self::Second(v) => v,
        };
        (winner, value)
    }
}
641
/// Result type for `select3` - represents which of three futures completed first.
///
/// This is the three-way counterpart of `Either2`. The variant identifies the
/// winning future and carries its output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Either3<A, B, C> {
    /// First future completed first
    First(A),
    /// Second future completed first
    Second(B),
    /// Third future completed first
    Third(C),
}

impl<A, B, C> Either3<A, B, C> {
    /// Returns true if this is the First variant
    pub fn is_first(&self) -> bool {
        matches!(self, Either3::First(_))
    }

    /// Returns true if this is the Second variant
    pub fn is_second(&self) -> bool {
        matches!(self, Either3::Second(_))
    }

    /// Returns true if this is the Third variant
    pub fn is_third(&self) -> bool {
        matches!(self, Either3::Third(_))
    }

    /// Returns the index of the winner (0 for First, 1 for Second, 2 for Third)
    pub fn index(&self) -> usize {
        match self {
            Either3::First(_) => 0,
            Either3::Second(_) => 1,
            Either3::Third(_) => 2,
        }
    }
}

impl<T> Either3<T, T, T> {
    /// For homogeneous Either3 (all types are the same), extract as (index, value).
    pub fn into_tuple(self) -> (usize, T) {
        match self {
            Either3::First(v) => (0, v),
            Either3::Second(v) => (1, v),
            Either3::Third(v) => (2, v),
        }
    }
}
674
// Reserved prefix for built-in system activities.
// User-registered activities cannot use names starting with this prefix.
//
// The prefix literal is spelled exactly once, in this macro. The built-in names
// below are derived from it with `concat!`, so they cannot drift out of sync with
// `SYSCALL_ACTIVITY_PREFIX`. (A `const` cannot be passed to `concat!`; a
// literal-producing macro can.)
macro_rules! syscall_activity_prefix {
    () => {
        "__duroxide_syscall:"
    };
}

pub(crate) const SYSCALL_ACTIVITY_PREFIX: &str = syscall_activity_prefix!();

// Built-in system activity names (constructed from prefix)
pub(crate) const SYSCALL_ACTIVITY_NEW_GUID: &str = concat!(syscall_activity_prefix!(), "new_guid");
pub(crate) const SYSCALL_ACTIVITY_UTC_NOW_MS: &str = concat!(syscall_activity_prefix!(), "utc_now_ms");
682
683use crate::_typed_codec::Codec;
684// LogLevel is now defined locally in this file
685use serde::{Deserialize, Serialize};
686use std::time::{Duration as StdDuration, SystemTime, UNIX_EPOCH};
687
688// Internal codec utilities for typed I/O (kept private; public API remains ergonomic)
689mod _typed_codec {
690 use serde::{Serialize, de::DeserializeOwned};
691 use serde_json::Value;
692 pub trait Codec {
693 fn encode<T: Serialize>(v: &T) -> Result<String, String>;
694 fn decode<T: DeserializeOwned>(s: &str) -> Result<T, String>;
695 }
696 pub struct Json;
697 impl Codec for Json {
698 fn encode<T: Serialize>(v: &T) -> Result<String, String> {
699 // If the value is a JSON string, return raw content to preserve historic behavior
700 match serde_json::to_value(v) {
701 Ok(Value::String(s)) => Ok(s),
702 Ok(val) => serde_json::to_string(&val).map_err(|e| e.to_string()),
703 Err(e) => Err(e.to_string()),
704 }
705 }
706 fn decode<T: DeserializeOwned>(s: &str) -> Result<T, String> {
707 // Try parse as JSON first
708 match serde_json::from_str::<T>(s) {
709 Ok(v) => Ok(v),
710 Err(_) => {
711 // Fallback: treat raw string as JSON string value
712 let val = Value::String(s.to_string());
713 serde_json::from_value(val).map_err(|e| e.to_string())
714 }
715 }
716 }
717 }
718}
719
/// Execution ID given to the very first execution of every orchestration instance.
///
/// Each ContinueAsNew starts a new execution numbered one higher than the previous.
pub const INITIAL_EXECUTION_ID: u64 = 1_u64;

/// Event ID carried by the first event of every execution.
///
/// That first event is always `OrchestrationStarted`, so every fresh history
/// begins at `event_id = 1`.
pub const INITIAL_EVENT_ID: u64 = 1_u64;
727
// =============================================================================
// Sub-orchestration instance ID conventions
// =============================================================================

/// Marker prefix of sub-orchestration instance IDs generated by the runtime.
///
/// IDs carrying this prefix get the parent instance prepended: `{parent}::{sub::N}`.
pub const SUB_ORCH_AUTO_PREFIX: &str = "sub::";

/// Placeholder prefix used before an event ID has been assigned.
///
/// During action processing these placeholders are rewritten to `sub::{event_id}`.
/// Because this prefix begins with `sub::`, a placeholder also counts as
/// auto-generated for `is_auto_generated_sub_orch_id`.
pub(crate) const SUB_ORCH_PENDING_PREFIX: &str = "sub::pending_";

/// Reports whether a sub-orchestration instance ID was auto-generated.
///
/// Auto-generated IDs begin with `sub::` and must be prefixed with the parent
/// instance (`{parent_instance}::{child_instance}`) to become globally unique.
/// Any other ID is explicit and is used verbatim.
#[inline]
pub fn is_auto_generated_sub_orch_id(instance: &str) -> bool {
    instance.starts_with(SUB_ORCH_AUTO_PREFIX)
}

/// Produces the full child instance ID.
///
/// - Explicit IDs are returned unchanged (e.g., `my-custom-child-id`).
/// - Auto-generated IDs (starting with `sub::`) become `{parent}::{child}`
///   (e.g., `parent-1::sub::5`).
#[inline]
pub fn build_child_instance_id(parent_instance: &str, child_instance: &str) -> String {
    if !is_auto_generated_sub_orch_id(child_instance) {
        return child_instance.to_owned();
    }
    format!("{parent_instance}::{child_instance}")
}
763
/// Structured error details for orchestration failures.
///
/// Errors are categorized into three types for proper metrics and logging:
/// - **Infrastructure**: Provider failures, data corruption (abort turn, never reach user code)
/// - **Configuration**: Deployment issues like unregistered activities, nondeterminism (abort turn)
/// - **Application**: Business logic failures (flow through normal orchestration code)
///
/// A fourth variant, **Poison**, reports a message that exceeded its maximum
/// fetch attempts.
///
/// This enum derives `Serialize`/`Deserialize`, so variant and field names are part
/// of its serialized form. Renaming or restructuring them changes that format.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ErrorDetails {
    /// Infrastructure failure (provider errors, data corruption).
    /// These errors abort orchestration execution and never reach user code.
    Infrastructure {
        /// The operation that failed (free-form text).
        operation: String,
        /// Human-readable error message.
        message: String,
        /// Whether the failure is flagged as retryable.
        retryable: bool,
    },

    /// Configuration error (unregistered orchestrations/activities, nondeterminism).
    /// These errors abort orchestration execution and never reach user code.
    ///
    /// NOTE(review): `ConfigErrorKind` currently only has `Nondeterminism`.
    Configuration {
        /// Which configuration problem occurred.
        kind: ConfigErrorKind,
        /// The resource the error refers to (presumably an orchestration/activity
        /// name — TODO confirm).
        resource: String,
        /// Optional additional detail.
        message: Option<String>,
    },

    /// Application error (business logic failures).
    /// These are the ONLY errors that orchestration code sees.
    Application {
        /// Category of the application failure.
        kind: AppErrorKind,
        /// Human-readable error message.
        message: String,
        /// Whether the failure is flagged as retryable.
        retryable: bool,
    },

    /// Poison message error - message exceeded max fetch attempts.
    ///
    /// This indicates a message that repeatedly fails to process.
    /// Could be caused by:
    /// - Malformed message data causing deserialization failures
    /// - Message triggering bugs that crash the worker
    /// - Transient infrastructure issues that became permanent
    /// - Application code bugs triggered by specific input patterns
    Poison {
        /// Number of times the message was fetched
        attempt_count: u32,
        /// Maximum allowed attempts
        max_attempts: u32,
        /// Message type and identity
        message_type: PoisonMessageType,
        /// The poisoned message content (serialized JSON for debugging)
        message: String,
    },
}
815
/// Poison message type identification.
///
/// Identifies which message was poisoned. Carried by `ErrorDetails::Poison` and
/// rendered by `ErrorDetails::display_message`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PoisonMessageType {
    /// Orchestration work item batch for the given `instance` and `execution_id`.
    Orchestration { instance: String, execution_id: u64 },
    /// Activity execution
    Activity {
        /// Orchestration instance that owns the activity.
        instance: String,
        /// Execution of that instance.
        execution_id: u64,
        /// Activity name.
        activity_name: String,
        /// Activity identifier (displayed as `{activity_name}#{activity_id}`).
        activity_id: u64,
    },
    /// History deserialization failure (e.g., unknown event types from a newer duroxide version)
    FailedDeserialization {
        /// Instance whose history failed to deserialize.
        instance: String,
        /// Execution whose history failed to deserialize.
        execution_id: u64,
        /// Deserialization error message.
        error: String,
    },
}
835
/// Configuration error kinds.
///
/// Carried by `ErrorDetails::Configuration`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ConfigErrorKind {
    /// Nondeterminism was detected; displayed as `nondeterministic: {message}` or
    /// `nondeterministic in {resource}`.
    Nondeterminism,
}
841
/// Application error kinds.
///
/// Carried by `ErrorDetails::Application`; the kind controls how the error is rendered
/// by `ErrorDetails::display_message`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum AppErrorKind {
    /// An activity failed; the error message is displayed verbatim.
    ActivityFailed,
    /// An orchestration failed; the error message is displayed verbatim.
    OrchestrationFailed,
    /// Code panicked; displayed as `orchestration panicked: {message}`.
    Panicked,
    /// Work was cancelled; displayed as `canceled: {reason}` (the `message` field is not shown).
    Cancelled { reason: String },
}
850
851impl ErrorDetails {
852 /// Get failure category for metrics/logging.
853 pub fn category(&self) -> &'static str {
854 match self {
855 ErrorDetails::Infrastructure { .. } => "infrastructure",
856 ErrorDetails::Configuration { .. } => "configuration",
857 ErrorDetails::Application { .. } => "application",
858 ErrorDetails::Poison { .. } => "poison",
859 }
860 }
861
862 /// Check if failure is retryable.
863 pub fn is_retryable(&self) -> bool {
864 match self {
865 ErrorDetails::Infrastructure { retryable, .. } => *retryable,
866 ErrorDetails::Application { retryable, .. } => *retryable,
867 ErrorDetails::Configuration { .. } => false,
868 ErrorDetails::Poison { .. } => false, // Never retryable
869 }
870 }
871
872 /// Get display message for logging/UI (backward compatible format).
873 pub fn display_message(&self) -> String {
874 match self {
875 ErrorDetails::Infrastructure { operation, message, .. } => {
876 format!("infrastructure:{operation}: {message}")
877 }
878 ErrorDetails::Configuration {
879 kind,
880 resource,
881 message,
882 } => match kind {
883 ConfigErrorKind::Nondeterminism => message
884 .as_ref()
885 .map(|m| format!("nondeterministic: {m}"))
886 .unwrap_or_else(|| format!("nondeterministic in {resource}")),
887 },
888 ErrorDetails::Application { kind, message, .. } => match kind {
889 AppErrorKind::Cancelled { reason } => format!("canceled: {reason}"),
890 AppErrorKind::Panicked => format!("orchestration panicked: {message}"),
891 _ => message.clone(),
892 },
893 ErrorDetails::Poison {
894 attempt_count,
895 max_attempts,
896 message_type,
897 ..
898 } => match message_type {
899 PoisonMessageType::Orchestration { instance, .. } => {
900 format!("poison: orchestration {instance} exceeded {attempt_count} attempts (max {max_attempts})")
901 }
902 PoisonMessageType::Activity {
903 activity_name,
904 activity_id,
905 ..
906 } => {
907 format!(
908 "poison: activity {activity_name}#{activity_id} exceeded {attempt_count} attempts (max {max_attempts})"
909 )
910 }
911 PoisonMessageType::FailedDeserialization { instance, error, .. } => {
912 format!(
913 "poison: orchestration {instance} history deserialization failed after {attempt_count} attempts (max {max_attempts}): {error}"
914 )
915 }
916 },
917 }
918 }
919}
920
/// Unified event with common metadata and type-specific payload.
///
/// All events have common fields (event_id, source_event_id, instance_id, etc.)
/// plus type-specific data in the `kind` field.
///
/// Events are append-only history entries persisted by a provider and consumed during replay.
/// The `event_id` is a monotonically increasing position in history.
/// Scheduling and completion events are linked via `source_event_id`.
///
/// When serialized, `kind` is flattened: its `type` tag and payload fields appear at the
/// same level as the common fields above.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Event {
    /// Sequential position in history (monotonically increasing per execution)
    /// Events built with `Event::new` carry a placeholder `0` until the runtime assigns it.
    pub event_id: u64,

    /// For completion events: references the scheduling event this completes.
    /// None for lifecycle events (OrchestrationStarted, etc.) and scheduling events.
    /// Some(id) for completion events (ActivityCompleted, TimerFired, etc.).
    pub source_event_id: Option<u64>,

    /// Instance this event belongs to.
    /// Denormalized from DB key for self-contained events.
    pub instance_id: String,

    /// Execution this event belongs to.
    /// Denormalized from DB key for self-contained events.
    pub execution_id: u64,

    /// Timestamp when event was created (milliseconds since Unix epoch).
    /// Taken from the system clock at construction; 0 if the clock is before the epoch.
    pub timestamp_ms: u64,

    /// Crate semver version that generated this event.
    /// Format: "0.1.0", "0.2.0", etc.
    pub duroxide_version: String,

    /// Event type and associated data.
    /// Flattened into this struct's serialized form (see `EventKind`'s `type` tag).
    #[serde(flatten)]
    pub kind: EventKind,
}
958
/// Event-specific payloads.
///
/// Common fields have been extracted to the Event struct:
/// - event_id: moved to Event.event_id
/// - source_event_id: moved to Event.source_event_id (`Option<u64>`)
/// - execution_id: moved to Event.execution_id (was in 4 variants)
///
/// Serialized as an internally tagged enum: a `type` field holds the variant's
/// `rename` value and the variant's own fields sit next to it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum EventKind {
    /// Orchestration instance was created and started by name with input.
    /// Version is required; parent linkage is present when this is a child orchestration.
    #[serde(rename = "OrchestrationStarted")]
    OrchestrationStarted {
        /// Orchestration name.
        name: String,
        /// Orchestration version (always present).
        version: String,
        /// Orchestration input payload.
        input: String,
        /// Parent instance ID; `Some` only for child orchestrations.
        parent_instance: Option<String>,
        /// Parent-side correlation id for a child orchestration.
        /// NOTE(review): presumably the parent's SubOrchestrationScheduled event_id — confirm.
        parent_id: Option<u64>,
    },

    /// Orchestration completed with a final result (`output`).
    #[serde(rename = "OrchestrationCompleted")]
    OrchestrationCompleted { output: String },

    /// Orchestration failed with a final error.
    #[serde(rename = "OrchestrationFailed")]
    OrchestrationFailed { details: ErrorDetails },

    /// Activity was scheduled.
    #[serde(rename = "ActivityScheduled")]
    ActivityScheduled {
        /// Activity name.
        name: String,
        /// Activity input payload.
        input: String,
        /// Optional session ID for worker affinity routing.
        /// Omitted from the serialized form when `None`; defaults to `None` when the
        /// field is missing during deserialization.
        #[serde(skip_serializing_if = "Option::is_none")]
        #[serde(default)]
        session_id: Option<String>,
    },

    /// Activity completed successfully with a result.
    #[serde(rename = "ActivityCompleted")]
    ActivityCompleted { result: String },

    /// Activity failed with error details.
    #[serde(rename = "ActivityFailed")]
    ActivityFailed { details: ErrorDetails },

    /// Cancellation was requested for an activity (best-effort; completion may still arrive).
    /// Correlates to the ActivityScheduled event via Event.source_event_id.
    #[serde(rename = "ActivityCancelRequested")]
    ActivityCancelRequested { reason: String },

    /// Timer was created and will logically fire at `fire_at_ms`.
    #[serde(rename = "TimerCreated")]
    TimerCreated { fire_at_ms: u64 },

    /// Timer fired at logical time `fire_at_ms`.
    #[serde(rename = "TimerFired")]
    TimerFired { fire_at_ms: u64 },

    /// Subscription to an external event by name was recorded.
    #[serde(rename = "ExternalSubscribed")]
    ExternalSubscribed { name: String },

    /// An external event was raised. Matched by name (no source_event_id).
    #[serde(rename = "ExternalEvent")]
    ExternalEvent { name: String, data: String },

    /// Fire-and-forget orchestration scheduling (detached).
    #[serde(rename = "OrchestrationChained")]
    OrchestrationChained {
        /// Name of the orchestration to start.
        name: String,
        /// Instance ID for the detached orchestration.
        instance: String,
        /// Input payload for the detached orchestration.
        input: String,
    },

    /// Sub-orchestration was scheduled with deterministic child instance id.
    #[serde(rename = "SubOrchestrationScheduled")]
    SubOrchestrationScheduled {
        /// Name of the child orchestration.
        name: String,
        /// Child instance ID.
        instance: String,
        /// Input payload for the child orchestration.
        input: String,
    },

    /// Sub-orchestration completed and returned a result to the parent.
    #[serde(rename = "SubOrchestrationCompleted")]
    SubOrchestrationCompleted { result: String },

    /// Sub-orchestration failed and returned error details to the parent.
    #[serde(rename = "SubOrchestrationFailed")]
    SubOrchestrationFailed { details: ErrorDetails },

    /// Cancellation was requested for a sub-orchestration (best-effort; completion may still arrive).
    /// Correlates to the SubOrchestrationScheduled event via Event.source_event_id.
    #[serde(rename = "SubOrchestrationCancelRequested")]
    SubOrchestrationCancelRequested { reason: String },

    /// Orchestration continued as new with fresh input (terminal for this execution).
    #[serde(rename = "OrchestrationContinuedAsNew")]
    OrchestrationContinuedAsNew { input: String },

    /// Cancellation has been requested for the orchestration (terminal will follow deterministically).
    #[serde(rename = "OrchestrationCancelRequested")]
    OrchestrationCancelRequested { reason: String },

    /// V2 subscription: includes a topic filter for pub/sub matching.
    /// Feature-gated for replay engine extensibility verification.
    #[cfg(feature = "replay-version-test")]
    #[serde(rename = "ExternalSubscribed2")]
    ExternalSubscribed2 { name: String, topic: String },

    /// V2 event: includes the actual topic it was published on.
    /// Feature-gated for replay engine extensibility verification.
    #[cfg(feature = "replay-version-test")]
    #[serde(rename = "ExternalEvent2")]
    ExternalEvent2 { name: String, topic: String, data: String },
}
1075
1076impl Event {
1077 /// Create a new event with common fields populated and a specific event_id.
1078 ///
1079 /// Use this when you know the event_id upfront (e.g., during replay or when
1080 /// creating events inline).
1081 pub fn with_event_id(
1082 event_id: u64,
1083 instance_id: impl Into<String>,
1084 execution_id: u64,
1085 source_event_id: Option<u64>,
1086 kind: EventKind,
1087 ) -> Self {
1088 use std::time::{SystemTime, UNIX_EPOCH};
1089 Event {
1090 event_id,
1091 source_event_id,
1092 instance_id: instance_id.into(),
1093 execution_id,
1094 timestamp_ms: SystemTime::now()
1095 .duration_since(UNIX_EPOCH)
1096 .map(|d| d.as_millis() as u64)
1097 .unwrap_or(0),
1098 duroxide_version: env!("CARGO_PKG_VERSION").to_string(),
1099 kind,
1100 }
1101 }
1102
1103 /// Create a new event with common fields populated.
1104 ///
1105 /// The event_id will be 0 and should be set by the history manager.
1106 pub fn new(
1107 instance_id: impl Into<String>,
1108 execution_id: u64,
1109 source_event_id: Option<u64>,
1110 kind: EventKind,
1111 ) -> Self {
1112 Self::with_event_id(0, instance_id, execution_id, source_event_id, kind)
1113 }
1114
1115 /// Get the event_id (position in history).
1116 #[inline]
1117 pub fn event_id(&self) -> u64 {
1118 self.event_id
1119 }
1120
1121 /// Set the event_id (used by runtime when adding events to history).
1122 #[inline]
1123 pub(crate) fn set_event_id(&mut self, id: u64) {
1124 self.event_id = id;
1125 }
1126
1127 /// Get the source_event_id if this is a completion event.
1128 /// Returns None for lifecycle and scheduling events.
1129 #[inline]
1130 pub fn source_event_id(&self) -> Option<u64> {
1131 self.source_event_id
1132 }
1133
1134 /// Check if this event is a terminal event (ends the orchestration).
1135 pub fn is_terminal(&self) -> bool {
1136 matches!(
1137 self.kind,
1138 EventKind::OrchestrationCompleted { .. }
1139 | EventKind::OrchestrationFailed { .. }
1140 | EventKind::OrchestrationContinuedAsNew { .. }
1141 )
1142 }
1143}
1144
/// Log levels for orchestration context logging.
#[derive(Debug, Clone)]
pub enum LogLevel {
    /// Informational message.
    Info,
    /// Warning-level message.
    Warn,
    /// Error-level message.
    Error,
}
1152
/// Backoff strategy for computing delay between retry attempts.
///
/// See [`BackoffStrategy::delay_for_attempt`] for how each variant maps an attempt
/// number to a delay. The default is exponential backoff starting at 100ms, doubling
/// each attempt, capped at 30s.
#[derive(Debug, Clone)]
pub enum BackoffStrategy {
    /// No delay between retries.
    None,
    /// Fixed delay between all retries.
    Fixed {
        /// Delay duration between each retry.
        delay: std::time::Duration,
    },
    /// Linear backoff: delay = base * attempt, capped at max.
    Linear {
        /// Base delay multiplied by attempt number.
        base: std::time::Duration,
        /// Maximum delay cap.
        max: std::time::Duration,
    },
    /// Exponential backoff: delay = base * multiplier^(attempt-1), capped at max.
    Exponential {
        /// Initial delay for first retry.
        base: std::time::Duration,
        /// Multiplier applied each attempt.
        multiplier: f64,
        /// Maximum delay cap.
        max: std::time::Duration,
    },
}

impl Default for BackoffStrategy {
    /// Exponential backoff: 100ms initial delay, doubling each attempt, capped at 30s.
    fn default() -> Self {
        BackoffStrategy::Exponential {
            base: std::time::Duration::from_millis(100),
            multiplier: 2.0,
            max: std::time::Duration::from_secs(30),
        }
    }
}

impl BackoffStrategy {
    /// Compute delay for given attempt (1-indexed).
    ///
    /// Attempt 1 is after the first failure, so `delay_for_attempt(1)` is the first backoff.
    /// `Linear` and `Exponential` delays never exceed their `max`. Attempt 0 yields
    /// `Duration::ZERO` for `Linear` and `base` for `Exponential`.
    ///
    /// Very large attempt numbers saturate the exponent at `i32::MAX` instead of wrapping
    /// to a negative value, so for `multiplier >= 1` the exponential delay never shrinks
    /// as attempts grow.
    pub fn delay_for_attempt(&self, attempt: u32) -> std::time::Duration {
        match self {
            BackoffStrategy::None => std::time::Duration::ZERO,
            BackoffStrategy::Fixed { delay } => *delay,
            BackoffStrategy::Linear { base, max } => {
                let delay = base.saturating_mul(attempt);
                std::cmp::min(delay, *max)
            }
            BackoffStrategy::Exponential { base, multiplier, max } => {
                // delay = base * multiplier^(attempt-1)
                // `powi` takes an i32; an `as i32` cast would wrap exponents above
                // i32::MAX into negative values and shrink the delay, so saturate instead.
                let exponent = i32::try_from(attempt.saturating_sub(1)).unwrap_or(i32::MAX);
                let factor = multiplier.powi(exponent);
                // Float-to-int `as` casts saturate (NaN -> 0, +inf -> MAX), so an
                // overflowing product clamps instead of wrapping.
                let delay_nanos = (base.as_nanos() as f64 * factor) as u128;
                let delay_nanos = u64::try_from(delay_nanos).unwrap_or(u64::MAX);
                let delay = std::time::Duration::from_nanos(delay_nanos);
                std::cmp::min(delay, *max)
            }
        }
    }
}
1212
1213/// Retry policy for activities.
1214///
1215/// Configures automatic retry behavior including maximum attempts, backoff strategy,
1216/// and optional total timeout spanning all attempts.
1217///
1218/// # Example
1219///
1220/// ```rust
1221/// use std::time::Duration;
1222/// use duroxide::{RetryPolicy, BackoffStrategy};
1223///
1224/// // Simple retry with defaults (3 attempts, exponential backoff)
1225/// let policy = RetryPolicy::new(3);
1226///
1227/// // Custom policy with timeout and fixed backoff
1228/// let policy = RetryPolicy::new(5)
1229/// .with_timeout(Duration::from_secs(30))
1230/// .with_backoff(BackoffStrategy::Fixed {
1231/// delay: Duration::from_secs(1),
1232/// });
1233/// ```
1234#[derive(Debug, Clone)]
1235pub struct RetryPolicy {
1236 /// Maximum number of attempts (including initial). Must be >= 1.
1237 pub max_attempts: u32,
1238 /// Backoff strategy between retries.
1239 pub backoff: BackoffStrategy,
1240 /// Per-attempt timeout. If set, each activity attempt is raced against this
1241 /// timeout. If timeout fires, returns error immediately (no retry).
1242 /// Retries only occur for activity errors, not timeouts. None = no timeout.
1243 pub timeout: Option<std::time::Duration>,
1244}
1245
1246impl Default for RetryPolicy {
1247 fn default() -> Self {
1248 Self {
1249 max_attempts: 3,
1250 backoff: BackoffStrategy::default(),
1251 timeout: None,
1252 }
1253 }
1254}
1255
1256impl RetryPolicy {
1257 /// Create a new retry policy with specified max attempts and default backoff.
1258 ///
1259 /// # Panics
1260 /// Panics if `max_attempts` is 0.
1261 pub fn new(max_attempts: u32) -> Self {
1262 assert!(max_attempts >= 1, "max_attempts must be at least 1");
1263 Self {
1264 max_attempts,
1265 ..Default::default()
1266 }
1267 }
1268
1269 /// Set per-attempt timeout.
1270 ///
1271 /// Each activity attempt is raced against this timeout. If the timeout fires
1272 /// before the activity completes, returns an error immediately (no retry).
1273 /// Retries only occur for activity errors, not timeouts.
1274 pub fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
1275 self.timeout = Some(timeout);
1276 self
1277 }
1278
1279 /// Alias for `with_timeout` for backwards compatibility.
1280 #[doc(hidden)]
1281 pub fn with_total_timeout(mut self, timeout: std::time::Duration) -> Self {
1282 self.timeout = Some(timeout);
1283 self
1284 }
1285
1286 /// Set backoff strategy.
1287 pub fn with_backoff(mut self, backoff: BackoffStrategy) -> Self {
1288 self.backoff = backoff;
1289 self
1290 }
1291
1292 /// Compute delay for given attempt using the configured backoff strategy.
1293 pub fn delay_for_attempt(&self, attempt: u32) -> std::time::Duration {
1294 self.backoff.delay_for_attempt(attempt)
1295 }
1296}
1297
/// Declarative decisions produced by an orchestration turn. The host/provider
/// is responsible for materializing these into corresponding `Event`s.
#[derive(Debug, Clone)]
pub enum Action {
    /// Schedule an activity invocation. scheduling_event_id is the event_id of the ActivityScheduled event.
    CallActivity {
        scheduling_event_id: u64,
        /// Activity name.
        name: String,
        /// Activity input payload.
        input: String,
        /// Optional session ID for worker affinity routing.
        /// When set, the activity is routed to the worker owning this session.
        session_id: Option<String>,
    },
    /// Create a timer that will fire at the specified absolute time.
    /// scheduling_event_id is the event_id of the TimerCreated event.
    /// fire_at_ms is the absolute timestamp (ms since epoch) when the timer should fire.
    CreateTimer { scheduling_event_id: u64, fire_at_ms: u64 },
    /// Subscribe to an external event by name. scheduling_event_id is the event_id of the ExternalSubscribed event.
    WaitExternal { scheduling_event_id: u64, name: String },
    /// Start a detached orchestration (no result routing back to parent).
    StartOrchestrationDetached {
        scheduling_event_id: u64,
        /// Name of the orchestration to start.
        name: String,
        /// Optional target version.
        /// NOTE(review): presumably `None` selects the default registered version — confirm.
        version: Option<String>,
        /// Instance ID for the detached orchestration.
        instance: String,
        /// Input payload.
        input: String,
    },
    /// Start a sub-orchestration by name and child instance id. scheduling_event_id is the event_id of the SubOrchestrationScheduled event.
    StartSubOrchestration {
        scheduling_event_id: u64,
        /// Name of the child orchestration.
        name: String,
        /// Optional target version.
        /// NOTE(review): presumably `None` selects the default registered version — confirm.
        version: Option<String>,
        /// Child instance ID.
        instance: String,
        /// Input payload.
        input: String,
    },

    /// Continue the current orchestration as a new execution with new input (terminal for current execution).
    /// Optional version string selects the target orchestration version for the new execution.
    ContinueAsNew { input: String, version: Option<String> },

    /// V2: Subscribe to an external event with topic-based pub/sub matching.
    /// Feature-gated for replay engine extensibility verification.
    #[cfg(feature = "replay-version-test")]
    WaitExternal2 {
        scheduling_event_id: u64,
        /// External event name.
        name: String,
        /// Topic filter.
        topic: String,
    },
}
1347
/// Result delivered to a durable future upon completion.
///
/// This enum represents the completion states for various durable operations.
/// The replay engine stores one of these per token once the token's schedule_id
/// receives a completion.
#[doc(hidden)]
#[derive(Debug, Clone)]
#[allow(dead_code)] // ExternalData is part of the design but delivered via get_external_event
pub enum CompletionResult {
    /// Activity completed successfully
    ActivityOk(String),
    /// Activity failed with error
    ActivityErr(String),
    /// Timer fired
    TimerFired,
    /// Sub-orchestration completed successfully
    SubOrchOk(String),
    /// Sub-orchestration failed with error
    SubOrchErr(String),
    /// External event data (NOTE: External events delivered via get_external_event, not CompletionResult)
    ExternalData(String),
}
1368
/// Internal mutable state for one orchestration execution.
///
/// Groups the replay-engine bookkeeping (emitted actions, token bindings, completion
/// results, external-event subscriptions and arrivals), cancellation tracking, and
/// execution metadata.
#[derive(Debug)]
struct CtxInner {
    /// Whether we're currently replaying history (true) or processing new events (false).
    /// True while processing baseline_history events, false after.
    /// Users can check this via `ctx.is_replaying()` to skip side effects during replay.
    is_replaying: bool,

    // === Replay Engine State ===
    /// Token counter (each schedule_*() call gets a unique token)
    next_token: u64,
    /// Emitted actions (token -> Action kind info)
    /// Token is used to correlate with schedule events during replay
    emitted_actions: Vec<(u64, Action)>,
    /// Results map: token -> completion result (populated by replay engine)
    completion_results: std::collections::HashMap<u64, CompletionResult>,
    /// Token -> schedule_id binding (set when replay engine matches action to history)
    token_bindings: std::collections::HashMap<u64, u64>,
    /// External subscriptions: schedule_id -> (name, subscription_index)
    external_subscriptions: std::collections::HashMap<u64, (String, usize)>,
    /// External arrivals: name -> list of payloads in arrival order
    external_arrivals: std::collections::HashMap<String, Vec<String>>,
    /// Next subscription index per external event name
    external_next_index: std::collections::HashMap<String, usize>,

    // === V2 External Event State (feature-gated) ===
    /// V2 external subscriptions: schedule_id -> (name, topic, subscription_index)
    #[cfg(feature = "replay-version-test")]
    external2_subscriptions: std::collections::HashMap<u64, (String, String, usize)>,
    /// V2 external arrivals: (name, topic) -> list of payloads in arrival order
    #[cfg(feature = "replay-version-test")]
    external2_arrivals: std::collections::HashMap<(String, String), Vec<String>>,
    /// Next subscription index per (name, topic) pair
    #[cfg(feature = "replay-version-test")]
    external2_next_index: std::collections::HashMap<(String, String), usize>,

    /// Sub-orchestration token -> resolved instance ID mapping
    sub_orchestration_instances: std::collections::HashMap<u64, String>,

    // === Cancellation Tracking ===
    /// Tokens that have been cancelled (dropped without completing)
    cancelled_tokens: std::collections::HashSet<u64>,
    /// Cancelled token -> ScheduleKind mapping (for determining cancellation action)
    cancelled_token_kinds: std::collections::HashMap<u64, ScheduleKind>,

    // Execution metadata
    /// Execution number of this instance.
    execution_id: u64,
    /// Orchestration instance ID.
    instance_id: String,
    /// Name of the running orchestration.
    orchestration_name: String,
    /// Version of the running orchestration.
    orchestration_version: String,
    /// Starts `false`. NOTE(review): presumably gates context logging for the current
    /// poll — confirm against the code that sets it.
    logging_enabled_this_poll: bool,
}
1420
1421impl CtxInner {
1422 fn new(
1423 _history: Vec<Event>, // Kept for API compatibility, no longer used
1424 execution_id: u64,
1425 instance_id: String,
1426 orchestration_name: String,
1427 orchestration_version: String,
1428 _worker_id: Option<String>, // Kept for API compatibility, no longer used
1429 ) -> Self {
1430 Self {
1431 // Start in replaying state - will be set to false when we move past baseline history
1432 is_replaying: true,
1433
1434 // Replay engine state
1435 next_token: 0,
1436 emitted_actions: Vec::new(),
1437 completion_results: Default::default(),
1438 token_bindings: Default::default(),
1439 external_subscriptions: Default::default(),
1440 external_arrivals: Default::default(),
1441 external_next_index: Default::default(),
1442 #[cfg(feature = "replay-version-test")]
1443 external2_subscriptions: Default::default(),
1444 #[cfg(feature = "replay-version-test")]
1445 external2_arrivals: Default::default(),
1446 #[cfg(feature = "replay-version-test")]
1447 external2_next_index: Default::default(),
1448 sub_orchestration_instances: Default::default(),
1449
1450 // Cancellation tracking
1451 cancelled_tokens: Default::default(),
1452 cancelled_token_kinds: Default::default(),
1453
1454 // Execution metadata
1455 execution_id,
1456 instance_id,
1457 orchestration_name,
1458 orchestration_version,
1459 logging_enabled_this_poll: false,
1460 }
1461 }
1462
1463 fn now_ms(&self) -> u64 {
1464 SystemTime::now()
1465 .duration_since(UNIX_EPOCH)
1466 .map(|d| d.as_millis() as u64)
1467 .unwrap_or(0)
1468 }
1469
1470 // === Replay Engine Helpers ===
1471
1472 /// Emit an action and return a token for correlation.
1473 fn emit_action(&mut self, action: Action) -> u64 {
1474 self.next_token += 1;
1475 let token = self.next_token;
1476 self.emitted_actions.push((token, action));
1477 token
1478 }
1479
1480 /// Drain all emitted actions (called by replay engine after polling).
1481 fn drain_emitted_actions(&mut self) -> Vec<(u64, Action)> {
1482 std::mem::take(&mut self.emitted_actions)
1483 }
1484
1485 /// Bind a token to a schedule_id (called by replay engine when matching action to history).
1486 fn bind_token(&mut self, token: u64, schedule_id: u64) {
1487 self.token_bindings.insert(token, schedule_id);
1488 }
1489
1490 /// Get the schedule_id bound to a token (returns None if not yet bound).
1491 fn get_bound_schedule_id(&self, token: u64) -> Option<u64> {
1492 self.token_bindings.get(&token).copied()
1493 }
1494
1495 /// Deliver a completion result for a schedule_id.
1496 fn deliver_result(&mut self, schedule_id: u64, result: CompletionResult) {
1497 // Find the token that was bound to this schedule_id
1498 for (&token, &sid) in &self.token_bindings {
1499 if sid == schedule_id {
1500 self.completion_results.insert(token, result);
1501 return;
1502 }
1503 }
1504 tracing::warn!(
1505 schedule_id,
1506 "dropping completion result with no binding (unsupported for now)"
1507 );
1508 }
1509
1510 /// Check if a result is available for a token.
1511 fn get_result(&self, token: u64) -> Option<&CompletionResult> {
1512 self.completion_results.get(&token)
1513 }
1514
1515 /// Bind an external subscription to a deterministic index.
1516 fn bind_external_subscription(&mut self, schedule_id: u64, name: &str) {
1517 let idx = self.external_next_index.entry(name.to_string()).or_insert(0);
1518 let subscription_index = *idx;
1519 *idx += 1;
1520 self.external_subscriptions
1521 .insert(schedule_id, (name.to_string(), subscription_index));
1522 }
1523
1524 /// Deliver an external event (appends to arrival list for the name).
1525 fn deliver_external_event(&mut self, name: String, data: String) {
1526 self.external_arrivals.entry(name).or_default().push(data);
1527 }
1528
1529 /// Get external event data for a subscription (by schedule_id).
1530 fn get_external_event(&self, schedule_id: u64) -> Option<&String> {
1531 let (name, subscription_index) = self.external_subscriptions.get(&schedule_id)?;
1532 let arrivals = self.external_arrivals.get(name)?;
1533 arrivals.get(*subscription_index)
1534 }
1535
1536 /// V2: Bind an external subscription with topic to a deterministic index.
1537 #[cfg(feature = "replay-version-test")]
1538 fn bind_external_subscription2(&mut self, schedule_id: u64, name: &str, topic: &str) {
1539 let key = (name.to_string(), topic.to_string());
1540 let idx = self.external2_next_index.entry(key.clone()).or_insert(0);
1541 let subscription_index = *idx;
1542 *idx += 1;
1543 self.external2_subscriptions
1544 .insert(schedule_id, (name.to_string(), topic.to_string(), subscription_index));
1545 }
1546
1547 /// V2: Deliver an external event with topic (appends to arrival list for (name, topic)).
1548 #[cfg(feature = "replay-version-test")]
1549 fn deliver_external_event2(&mut self, name: String, topic: String, data: String) {
1550 self.external2_arrivals.entry((name, topic)).or_default().push(data);
1551 }
1552
1553 /// V2: Get external event data for a topic-based subscription (by schedule_id).
1554 #[cfg(feature = "replay-version-test")]
1555 fn get_external_event2(&self, schedule_id: u64) -> Option<&String> {
1556 let (name, topic, subscription_index) = self.external2_subscriptions.get(&schedule_id)?;
1557 let arrivals = self.external2_arrivals.get(&(name.clone(), topic.clone()))?;
1558 arrivals.get(*subscription_index)
1559 }
1560
1561 // === Cancellation Helpers ===
1562
1563 /// Mark a token as cancelled (called by DurableFuture::drop).
1564 fn mark_token_cancelled(&mut self, token: u64, kind: ScheduleKind) {
1565 self.cancelled_tokens.insert(token);
1566 self.cancelled_token_kinds.insert(token, kind);
1567 }
1568
1569 /// Get cancelled activity schedule_ids (tokens that were bound and then dropped).
1570 fn get_cancelled_activity_ids(&self) -> Vec<u64> {
1571 let mut ids = Vec::new();
1572 for &token in &self.cancelled_tokens {
1573 if let Some(kind) = self.cancelled_token_kinds.get(&token)
1574 && matches!(kind, ScheduleKind::Activity { .. })
1575 && let Some(&schedule_id) = self.token_bindings.get(&token)
1576 {
1577 ids.push(schedule_id);
1578 }
1579 }
1580 ids
1581 }
1582
1583 /// Get cancelled sub-orchestration cancellations.
1584 ///
1585 /// Returns `(scheduling_event_id, child_instance_id)` for sub-orchestration futures that were
1586 /// bound (schedule_id assigned) and then dropped.
1587 fn get_cancelled_sub_orchestration_cancellations(&self) -> Vec<(u64, String)> {
1588 let mut cancels = Vec::new();
1589 for &token in &self.cancelled_tokens {
1590 if let Some(ScheduleKind::SubOrchestration { token: sub_token }) = self.cancelled_token_kinds.get(&token)
1591 && let Some(&schedule_id) = self.token_bindings.get(&token)
1592 {
1593 // Look up the resolved instance ID from our mapping
1594 if let Some(instance_id) = self.sub_orchestration_instances.get(sub_token) {
1595 cancels.push((schedule_id, instance_id.clone()));
1596 }
1597 // If not in mapping, the action wasn't bound yet - nothing to cancel
1598 }
1599 }
1600 cancels
1601 }
1602
1603 /// Bind a sub-orchestration token to its resolved instance ID.
1604 fn bind_sub_orchestration_instance(&mut self, token: u64, instance_id: String) {
1605 self.sub_orchestration_instances.insert(token, instance_id);
1606 }
1607
1608 /// Clear cancelled tokens (called after turn completion to avoid re-processing).
1609 fn clear_cancelled_tokens(&mut self) {
1610 self.cancelled_tokens.clear();
1611 self.cancelled_token_kinds.clear();
1612 }
1613
1614 // Note: deterministic GUID generation was removed from public API.
1615}
1616
/// Context provided to activities for logging and metadata access.
///
/// Unlike [`OrchestrationContext`], activities are leaf nodes that cannot schedule new work,
/// but they often need to emit structured logs and inspect orchestration metadata. The
/// `ActivityContext` exposes the parent orchestration information and trace helpers that log
/// with full correlation fields.
///
/// # Examples
///
/// ```rust,no_run
/// # use duroxide::ActivityContext;
/// # use duroxide::runtime::registry::ActivityRegistry;
/// let activities = ActivityRegistry::builder()
///     .register("ProvisionVM", |ctx: ActivityContext, config: String| async move {
///         ctx.trace_info(format!("Provisioning VM with config: {}", config));
///
///         // Do actual work (can use sleep, HTTP, etc.)
///         let vm_id = provision_vm_internal(config).await?;
///
///         ctx.trace_info(format!("VM provisioned: {}", vm_id));
///         Ok(vm_id)
///     })
///     .build();
/// # async fn provision_vm_internal(config: String) -> Result<String, String> { Ok("vm-123".to_string()) }
/// ```
///
/// # Metadata Access
///
/// Activity context provides access to orchestration correlation metadata:
/// - `instance_id()` - Orchestration instance identifier
/// - `execution_id()` - Execution number (for ContinueAsNew scenarios)
/// - `orchestration_name()` - Parent orchestration name
/// - `orchestration_version()` - Parent orchestration version
/// - `activity_name()` - Current activity name
///
/// # Cancellation Support
///
/// Activities can respond to cancellation when their parent orchestration reaches a terminal state:
/// - `is_cancelled()` - Check if cancellation has been requested
/// - `cancelled()` - Future that completes when cancellation is requested (for use with `tokio::select!`)
/// - `cancellation_token()` - Get a clone of the token for spawned tasks
///
/// # Determinism
///
/// Activity trace helpers (`trace_info`, `trace_warn`, etc.) do **not** participate in
/// deterministic replay. They emit logs directly using [`tracing`] and should only be used for
/// diagnostic purposes.
#[derive(Clone)]
pub struct ActivityContext {
    /// Parent orchestration instance ID.
    instance_id: String,
    /// Execution number of the parent orchestration.
    execution_id: u64,
    /// Parent orchestration name.
    orchestration_name: String,
    /// Parent orchestration version.
    orchestration_version: String,
    /// Name of the activity being executed.
    activity_name: String,
    /// Identifier of this activity invocation.
    /// NOTE(review): presumably the ActivityScheduled event_id — confirm.
    activity_id: u64,
    /// NOTE(review): presumably the ID of the worker running this activity — confirm.
    worker_id: String,
    /// Optional session ID when scheduled via `schedule_activity_on_session`.
    session_id: Option<String>,
    /// Cancellation token for cooperative cancellation.
    /// Triggered when the parent orchestration reaches a terminal state.
    cancellation_token: tokio_util::sync::CancellationToken,
    /// Provider store for accessing the Client API.
    store: std::sync::Arc<dyn crate::providers::Provider>,
}
1682
1683impl ActivityContext {
1684 /// Create a new activity context. This constructor is intended for internal runtime use.
1685 ///
1686 /// Creates context with a new (non-cancelled) cancellation token.
1687 /// Note: The runtime now uses `new_with_cancellation` to provide a shared token.
1688 #[allow(dead_code)]
1689 #[allow(clippy::too_many_arguments)]
1690 pub(crate) fn new(
1691 instance_id: String,
1692 execution_id: u64,
1693 orchestration_name: String,
1694 orchestration_version: String,
1695 activity_name: String,
1696 activity_id: u64,
1697 worker_id: String,
1698 store: std::sync::Arc<dyn crate::providers::Provider>,
1699 ) -> Self {
1700 Self {
1701 instance_id,
1702 execution_id,
1703 orchestration_name,
1704 orchestration_version,
1705 activity_name,
1706 activity_id,
1707 worker_id,
1708 session_id: None,
1709 cancellation_token: tokio_util::sync::CancellationToken::new(),
1710 store,
1711 }
1712 }
1713
1714 /// Create a new activity context with a specific cancellation token.
1715 ///
1716 /// This constructor is intended for internal runtime use when the worker
1717 /// dispatcher needs to provide a cancellation token that can be triggered
1718 /// during activity execution.
1719 #[allow(clippy::too_many_arguments)]
1720 pub(crate) fn new_with_cancellation(
1721 instance_id: String,
1722 execution_id: u64,
1723 orchestration_name: String,
1724 orchestration_version: String,
1725 activity_name: String,
1726 activity_id: u64,
1727 worker_id: String,
1728 session_id: Option<String>,
1729 cancellation_token: tokio_util::sync::CancellationToken,
1730 store: std::sync::Arc<dyn crate::providers::Provider>,
1731 ) -> Self {
1732 Self {
1733 instance_id,
1734 execution_id,
1735 orchestration_name,
1736 orchestration_version,
1737 activity_name,
1738 activity_id,
1739 worker_id,
1740 session_id,
1741 cancellation_token,
1742 store,
1743 }
1744 }
1745
1746 /// Returns the orchestration instance identifier.
1747 pub fn instance_id(&self) -> &str {
1748 &self.instance_id
1749 }
1750
1751 /// Returns the execution id within the orchestration instance.
1752 pub fn execution_id(&self) -> u64 {
1753 self.execution_id
1754 }
1755
1756 /// Returns the parent orchestration name.
1757 pub fn orchestration_name(&self) -> &str {
1758 &self.orchestration_name
1759 }
1760
1761 /// Returns the parent orchestration version.
1762 pub fn orchestration_version(&self) -> &str {
1763 &self.orchestration_version
1764 }
1765
1766 /// Returns the activity name being executed.
1767 pub fn activity_name(&self) -> &str {
1768 &self.activity_name
1769 }
1770
1771 /// Returns the worker dispatcher ID processing this activity.
1772 pub fn worker_id(&self) -> &str {
1773 &self.worker_id
1774 }
1775
1776 /// Returns the session ID if this activity was scheduled via `schedule_activity_on_session`.
1777 ///
1778 /// Returns `None` for regular activities scheduled via `schedule_activity`.
1779 pub fn session_id(&self) -> Option<&str> {
1780 self.session_id.as_deref()
1781 }
1782
1783 /// Emit an INFO level trace entry associated with this activity.
1784 pub fn trace_info(&self, message: impl Into<String>) {
1785 tracing::info!(
1786 target: "duroxide::activity",
1787 instance_id = %self.instance_id,
1788 execution_id = %self.execution_id,
1789 orchestration_name = %self.orchestration_name,
1790 orchestration_version = %self.orchestration_version,
1791 activity_name = %self.activity_name,
1792 activity_id = %self.activity_id,
1793 worker_id = %self.worker_id,
1794 "{}",
1795 message.into()
1796 );
1797 }
1798
1799 /// Emit a WARN level trace entry associated with this activity.
1800 pub fn trace_warn(&self, message: impl Into<String>) {
1801 tracing::warn!(
1802 target: "duroxide::activity",
1803 instance_id = %self.instance_id,
1804 execution_id = %self.execution_id,
1805 orchestration_name = %self.orchestration_name,
1806 orchestration_version = %self.orchestration_version,
1807 activity_name = %self.activity_name,
1808 activity_id = %self.activity_id,
1809 worker_id = %self.worker_id,
1810 "{}",
1811 message.into()
1812 );
1813 }
1814
1815 /// Emit an ERROR level trace entry associated with this activity.
1816 pub fn trace_error(&self, message: impl Into<String>) {
1817 tracing::error!(
1818 target: "duroxide::activity",
1819 instance_id = %self.instance_id,
1820 execution_id = %self.execution_id,
1821 orchestration_name = %self.orchestration_name,
1822 orchestration_version = %self.orchestration_version,
1823 activity_name = %self.activity_name,
1824 activity_id = %self.activity_id,
1825 worker_id = %self.worker_id,
1826 "{}",
1827 message.into()
1828 );
1829 }
1830
1831 /// Emit a DEBUG level trace entry associated with this activity.
1832 pub fn trace_debug(&self, message: impl Into<String>) {
1833 tracing::debug!(
1834 target: "duroxide::activity",
1835 instance_id = %self.instance_id,
1836 execution_id = %self.execution_id,
1837 orchestration_name = %self.orchestration_name,
1838 orchestration_version = %self.orchestration_version,
1839 activity_name = %self.activity_name,
1840 activity_id = %self.activity_id,
1841 worker_id = %self.worker_id,
1842 "{}",
1843 message.into()
1844 );
1845 }
1846
1847 // ===== Cancellation Support =====
1848
1849 /// Check if cancellation has been requested.
1850 ///
1851 /// Returns `true` if the parent orchestration has completed, failed,
1852 /// or been cancelled. Activities can use this for cooperative cancellation.
1853 ///
1854 /// # Example
1855 ///
1856 /// ```ignore
1857 /// for item in items {
1858 /// if ctx.is_cancelled() {
1859 /// return Err("Activity cancelled".into());
1860 /// }
1861 /// process(item).await;
1862 /// }
1863 /// ```
1864 pub fn is_cancelled(&self) -> bool {
1865 self.cancellation_token.is_cancelled()
1866 }
1867
1868 /// Returns a future that completes when cancellation is requested.
1869 ///
1870 /// Use with `tokio::select!` for interruptible activities. This allows
1871 /// activities to respond promptly to cancellation without polling.
1872 ///
1873 /// # Example
1874 ///
1875 /// ```ignore
1876 /// tokio::select! {
1877 /// result = do_work() => return result,
1878 /// _ = ctx.cancelled() => return Err("Cancelled".into()),
1879 /// }
1880 /// ```
1881 pub async fn cancelled(&self) {
1882 self.cancellation_token.cancelled().await
1883 }
1884
1885 /// Get a clone of the cancellation token for use in spawned tasks.
1886 ///
1887 /// If your activity spawns child tasks with `tokio::spawn()`, you should
1888 /// pass them this token so they can also respond to cancellation.
1889 ///
1890 /// **Important:** If you spawn additional tasks/threads and do not pass them
1891 /// the cancellation token, they may outlive the activity's cancellation/abort.
1892 /// This is user error - the runtime provides the signal but cannot guarantee
1893 /// termination of arbitrary spawned work.
1894 ///
1895 /// # Example
1896 ///
1897 /// ```ignore
1898 /// let token = ctx.cancellation_token();
1899 /// let handle = tokio::spawn(async move {
1900 /// loop {
1901 /// tokio::select! {
1902 /// _ = do_work() => {}
1903 /// _ = token.cancelled() => break,
1904 /// }
1905 /// }
1906 /// });
1907 /// ```
1908 pub fn cancellation_token(&self) -> tokio_util::sync::CancellationToken {
1909 self.cancellation_token.clone()
1910 }
1911
1912 /// Get a Client for management operations.
1913 ///
1914 /// This allows activities to perform management operations such as
1915 /// pruning old executions, deleting instances, or querying instance status.
1916 ///
1917 /// # Example: Self-Pruning Eternal Orchestration
1918 ///
1919 /// ```ignore
1920 /// // Activity that prunes old executions
1921 /// async fn prune_self(ctx: ActivityContext, _input: String) -> Result<String, String> {
1922 /// let client = ctx.get_client();
1923 /// let instance_id = ctx.instance_id();
1924 ///
1925 /// let result = client.prune_executions(instance_id, PruneOptions {
1926 /// keep_last: Some(1), // Keep only current execution
1927 /// ..Default::default()
1928 /// }).await.map_err(|e| e.to_string())?;
1929 ///
1930 /// Ok(format!("Pruned {} executions", result.executions_deleted))
1931 /// }
1932 /// ```
1933 pub fn get_client(&self) -> crate::Client {
1934 crate::Client::new(self.store.clone())
1935 }
1936}
1937
1938impl std::fmt::Debug for ActivityContext {
1939 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1940 f.debug_struct("ActivityContext")
1941 .field("instance_id", &self.instance_id)
1942 .field("execution_id", &self.execution_id)
1943 .field("orchestration_name", &self.orchestration_name)
1944 .field("orchestration_version", &self.orchestration_version)
1945 .field("activity_name", &self.activity_name)
1946 .field("activity_id", &self.activity_id)
1947 .field("worker_id", &self.worker_id)
1948 .field("cancellation_token", &self.cancellation_token)
1949 .field("store", &"<Provider>")
1950 .finish()
1951 }
1952}
1953
1954#[derive(Clone)]
1955pub struct OrchestrationContext {
1956 inner: Arc<Mutex<CtxInner>>,
1957}
1958
/// Future handed back by `continue_as_new()` and its variants; it never completes.
///
/// Every poll answers `Poll::Pending`, so no statement after
/// `ctx.continue_as_new(..).await` can ever run. The `ContinueAsNew` action is
/// emitted before this future is created, and the runtime collects emitted
/// actions before it would inspect the future's result, so the continuation is
/// still recorded and processed.
struct ContinueAsNewFuture;

impl Future for ContinueAsNewFuture {
    /// Mirrors an orchestration's return type so the future can be returned
    /// straight out of an orchestration body. No value is ever produced.
    type Output = Result<String, String>;

    fn poll(self: Pin<&mut Self>, _waker_cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Deliberately inert: the waker is never stored and readiness never arrives.
        Poll::Pending
    }
}
1975
1976impl OrchestrationContext {
1977 /// Construct a new context from an existing history vector.
1978 ///
1979 /// # Parameters
1980 ///
1981 /// * `orchestration_name` - The name of the orchestration being executed.
1982 /// * `orchestration_version` - The semantic version string of the orchestration.
1983 /// * `worker_id` - Optional dispatcher worker ID for logging correlation.
1984 /// - `Some(id)`: Used by runtime dispatchers to include worker_id in traces
1985 /// - `None`: Used by standalone/test execution without runtime context
1986 pub fn new(
1987 history: Vec<Event>,
1988 execution_id: u64,
1989 instance_id: String,
1990 orchestration_name: String,
1991 orchestration_version: String,
1992 worker_id: Option<String>,
1993 ) -> Self {
1994 Self {
1995 inner: Arc::new(Mutex::new(CtxInner::new(
1996 history,
1997 execution_id,
1998 instance_id,
1999 orchestration_name,
2000 orchestration_version,
2001 worker_id,
2002 ))),
2003 }
2004 }
2005
2006 /// Check if the orchestration is currently replaying history.
2007 ///
2008 /// Returns `true` when processing events from persisted history (replay),
2009 /// and `false` when executing new logic beyond the stored history.
2010 ///
2011 /// This is useful for skipping side effects during replay, such as:
2012 /// - Logging/tracing that should only happen on first execution
2013 /// - Metrics that shouldn't be double-counted
2014 /// - External notifications that shouldn't be re-sent
2015 ///
2016 /// # Example
2017 ///
2018 /// ```rust,no_run
2019 /// # use duroxide::OrchestrationContext;
2020 /// # async fn example(ctx: OrchestrationContext) {
2021 /// if !ctx.is_replaying() {
2022 /// // Only log on first execution, not during replay
2023 /// println!("Starting workflow for the first time");
2024 /// }
2025 /// # }
2026 /// ```
2027 pub fn is_replaying(&self) -> bool {
2028 self.inner.lock().unwrap().is_replaying
2029 }
2030
2031 /// Set the replaying state (used by replay engine and test harness).
2032 #[doc(hidden)]
2033 pub fn set_is_replaying(&self, is_replaying: bool) {
2034 self.inner.lock().unwrap().is_replaying = is_replaying;
2035 }
2036
2037 /// Bind an external subscription to a schedule_id (used by replay engine and test harness).
2038 #[doc(hidden)]
2039 pub fn bind_external_subscription(&self, schedule_id: u64, name: &str) {
2040 self.inner.lock().unwrap().bind_external_subscription(schedule_id, name);
2041 }
2042
2043 /// Deliver an external event (used by replay engine and test harness).
2044 #[doc(hidden)]
2045 pub fn deliver_external_event(&self, name: String, data: String) {
2046 self.inner.lock().unwrap().deliver_external_event(name, data);
2047 }
2048
2049 // =========================================================================
2050 // Cancellation Support (DurableFuture integration)
2051 // =========================================================================
2052
2053 /// Mark a token as cancelled (called by DurableFuture::drop).
2054 pub(crate) fn mark_token_cancelled(&self, token: u64, kind: &ScheduleKind) {
2055 self.inner.lock().unwrap().mark_token_cancelled(token, kind.clone());
2056 }
2057
2058 /// Get cancelled activity schedule_ids for this turn.
2059 pub(crate) fn get_cancelled_activity_ids(&self) -> Vec<u64> {
2060 self.inner.lock().unwrap().get_cancelled_activity_ids()
2061 }
2062
2063 /// Get cancelled sub-orchestration cancellations for this turn.
2064 pub(crate) fn get_cancelled_sub_orchestration_cancellations(&self) -> Vec<(u64, String)> {
2065 self.inner
2066 .lock()
2067 .unwrap()
2068 .get_cancelled_sub_orchestration_cancellations()
2069 }
2070
2071 /// Clear cancelled tokens after processing (called by replay engine).
2072 pub(crate) fn clear_cancelled_tokens(&self) {
2073 self.inner.lock().unwrap().clear_cancelled_tokens();
2074 }
2075
2076 /// Bind a sub-orchestration token to its resolved instance ID.
2077 pub(crate) fn bind_sub_orchestration_instance(&self, token: u64, instance_id: String) {
2078 self.inner
2079 .lock()
2080 .unwrap()
2081 .bind_sub_orchestration_instance(token, instance_id);
2082 }
2083
2084 // =========================================================================
2085 // Simplified Mode Tracing (Replay-Guarded)
2086 // =========================================================================
2087 //
2088 // These trace methods use `is_replaying()` as a guard, which means:
2089 // - No history events are created for traces
2090 // - Traces only emit on first execution, not during replay
2091 // - Much simpler and more efficient than system-call-based tracing
2092
2093 /// Convenience wrapper for INFO level tracing.
2094 ///
2095 /// Logs with INFO level and includes instance context automatically.
2096 /// Only emits on first execution, not during replay.
2097 ///
2098 /// # Example
2099 ///
2100 /// ```rust,no_run
2101 /// # use duroxide::OrchestrationContext;
2102 /// # async fn example(ctx: OrchestrationContext) {
2103 /// ctx.trace_info("Processing order");
2104 /// ctx.trace_info(format!("Processing {} items", 42));
2105 /// # }
2106 /// ```
2107 pub fn trace_info(&self, message: impl Into<String>) {
2108 self.trace("INFO", message);
2109 }
2110
2111 /// Convenience wrapper for WARN level tracing.
2112 ///
2113 /// Logs with WARN level and includes instance context automatically.
2114 /// Only emits on first execution, not during replay.
2115 ///
2116 /// # Example
2117 ///
2118 /// ```rust,no_run
2119 /// # use duroxide::OrchestrationContext;
2120 /// # async fn example(ctx: OrchestrationContext) {
2121 /// ctx.trace_warn("Retrying failed operation");
2122 /// # }
2123 /// ```
2124 pub fn trace_warn(&self, message: impl Into<String>) {
2125 self.trace("WARN", message);
2126 }
2127
2128 /// Convenience wrapper for ERROR level tracing.
2129 ///
2130 /// Logs with ERROR level and includes instance context automatically.
2131 /// Only emits on first execution, not during replay.
2132 ///
2133 /// # Example
2134 ///
2135 /// ```rust,no_run
2136 /// # use duroxide::OrchestrationContext;
2137 /// # async fn example(ctx: OrchestrationContext) {
2138 /// ctx.trace_error("Payment processing failed");
2139 /// # }
2140 /// ```
2141 pub fn trace_error(&self, message: impl Into<String>) {
2142 self.trace("ERROR", message);
2143 }
2144
2145 /// Convenience wrapper for DEBUG level tracing.
2146 ///
2147 /// Logs with DEBUG level and includes instance context automatically.
2148 /// Only emits on first execution, not during replay.
2149 ///
2150 /// # Example
2151 ///
2152 /// ```rust,no_run
2153 /// # use duroxide::OrchestrationContext;
2154 /// # async fn example(ctx: OrchestrationContext) {
2155 /// ctx.trace_debug("Detailed state information");
2156 /// # }
2157 /// ```
2158 pub fn trace_debug(&self, message: impl Into<String>) {
2159 self.trace("DEBUG", message);
2160 }
2161
2162 /// Drain emitted actions.
2163 /// Returns a list of (token, Action) pairs.
2164 #[doc(hidden)]
2165 pub fn drain_emitted_actions(&self) -> Vec<(u64, Action)> {
2166 self.inner.lock().unwrap().drain_emitted_actions()
2167 }
2168
2169 /// Get a snapshot of emitted actions without draining.
2170 /// Returns a list of (token, Action) pairs.
2171 #[doc(hidden)]
2172 pub fn get_emitted_actions(&self) -> Vec<(u64, Action)> {
2173 self.inner.lock().unwrap().emitted_actions.clone()
2174 }
2175
2176 /// Bind a token to a schedule_id.
2177 #[doc(hidden)]
2178 pub fn bind_token(&self, token: u64, schedule_id: u64) {
2179 self.inner.lock().unwrap().bind_token(token, schedule_id);
2180 }
2181
2182 /// Deliver a result for a token.
2183 #[doc(hidden)]
2184 pub fn deliver_result(&self, schedule_id: u64, result: CompletionResult) {
2185 self.inner.lock().unwrap().deliver_result(schedule_id, result);
2186 }
2187
2188 /// Returns the orchestration instance identifier.
2189 ///
2190 /// This is the unique identifier for this orchestration instance, typically
2191 /// provided when starting the orchestration.
2192 ///
2193 /// # Example
2194 ///
2195 /// ```rust,no_run
2196 /// # use duroxide::OrchestrationContext;
2197 /// # async fn example(ctx: OrchestrationContext) {
2198 /// let id = ctx.instance_id();
2199 /// ctx.trace_info(format!("Processing instance: {}", id));
2200 /// # }
2201 /// ```
2202 pub fn instance_id(&self) -> String {
2203 self.inner.lock().unwrap().instance_id.clone()
2204 }
2205
2206 /// Returns the current execution ID within this orchestration instance.
2207 ///
2208 /// The execution ID increments each time `continue_as_new()` is called.
2209 /// Execution 1 is the initial execution.
2210 ///
2211 /// # Example
2212 ///
2213 /// ```rust,no_run
2214 /// # use duroxide::OrchestrationContext;
2215 /// # async fn example(ctx: OrchestrationContext) {
2216 /// let exec_id = ctx.execution_id();
2217 /// ctx.trace_info(format!("Execution #{}", exec_id));
2218 /// # }
2219 /// ```
2220 pub fn execution_id(&self) -> u64 {
2221 self.inner.lock().unwrap().execution_id
2222 }
2223
2224 /// Returns the orchestration name.
2225 ///
2226 /// This is the name registered with the orchestration registry.
2227 ///
2228 /// # Example
2229 ///
2230 /// ```rust,no_run
2231 /// # use duroxide::OrchestrationContext;
2232 /// # async fn example(ctx: OrchestrationContext) {
2233 /// let name = ctx.orchestration_name();
2234 /// ctx.trace_info(format!("Running orchestration: {}", name));
2235 /// # }
2236 /// ```
2237 pub fn orchestration_name(&self) -> String {
2238 self.inner.lock().unwrap().orchestration_name.clone()
2239 }
2240
2241 /// Returns the orchestration version.
2242 ///
2243 /// This is the semantic version string associated with the orchestration.
2244 ///
2245 /// # Example
2246 ///
2247 /// ```rust,no_run
2248 /// # use duroxide::OrchestrationContext;
2249 /// # async fn example(ctx: OrchestrationContext) {
2250 /// let version = ctx.orchestration_version();
2251 /// ctx.trace_info(format!("Version: {}", version));
2252 /// # }
2253 /// ```
2254 pub fn orchestration_version(&self) -> String {
2255 self.inner.lock().unwrap().orchestration_version.clone()
2256 }
2257
2258 // Replay-safe logging control
2259 /// Indicates whether logging is enabled for the current poll. This is
2260 /// flipped on when a decision is recorded to minimize log noise.
2261 pub fn is_logging_enabled(&self) -> bool {
2262 self.inner.lock().unwrap().logging_enabled_this_poll
2263 }
2264 // log_buffer removed - not used
2265
2266 /// Emit a structured trace entry with automatic context correlation.
2267 ///
2268 /// Creates a system call event for deterministic replay and logs to tracing.
2269 /// The log entry automatically includes correlation fields:
2270 /// - `instance_id` - The orchestration instance identifier
2271 /// - `execution_id` - The current execution number
2272 /// - `orchestration_name` - Name of the orchestration
2273 /// - `orchestration_version` - Semantic version
2274 ///
2275 /// # Determinism
2276 ///
2277 /// This method is replay-safe: logs are only emitted on first execution,
2278 /// not during replay.
2279 ///
2280 /// # Example
2281 ///
2282 /// ```rust,no_run
2283 /// # use duroxide::OrchestrationContext;
2284 /// # async fn example(ctx: OrchestrationContext) {
2285 /// ctx.trace("INFO", "Processing started");
2286 /// ctx.trace("WARN", format!("Retry attempt: {}", 3));
2287 /// ctx.trace("ERROR", "Payment validation failed");
2288 /// # }
2289 /// ```
2290 ///
2291 /// # Output
2292 ///
2293 /// ```text
2294 /// 2024-10-30T10:15:23.456Z INFO duroxide::orchestration [order-123] Processing started
2295 /// ```
2296 ///
2297 /// All logs include instance_id, execution_id, orchestration_name for correlation.
2298 pub fn trace(&self, level: impl Into<String>, message: impl Into<String>) {
2299 self.trace_internal(&level.into(), &message.into());
2300 }
2301
2302 /// Internal implementation of trace (guarded by is_replaying)
2303 fn trace_internal(&self, level: &str, message: &str) {
2304 let inner = self.inner.lock().unwrap();
2305
2306 // Only trace if not replaying
2307 if !inner.is_replaying {
2308 match level.to_uppercase().as_str() {
2309 "INFO" => tracing::info!(
2310 target: "duroxide::orchestration",
2311 instance_id = %inner.instance_id,
2312 execution_id = %inner.execution_id,
2313 orchestration_name = %inner.orchestration_name,
2314 orchestration_version = %inner.orchestration_version,
2315 "{}",
2316 message
2317 ),
2318 "WARN" => tracing::warn!(
2319 target: "duroxide::orchestration",
2320 instance_id = %inner.instance_id,
2321 execution_id = %inner.execution_id,
2322 orchestration_name = %inner.orchestration_name,
2323 orchestration_version = %inner.orchestration_version,
2324 "{}",
2325 message
2326 ),
2327 "ERROR" => tracing::error!(
2328 target: "duroxide::orchestration",
2329 instance_id = %inner.instance_id,
2330 execution_id = %inner.execution_id,
2331 orchestration_name = %inner.orchestration_name,
2332 orchestration_version = %inner.orchestration_version,
2333 "{}",
2334 message
2335 ),
2336 "DEBUG" => tracing::debug!(
2337 target: "duroxide::orchestration",
2338 instance_id = %inner.instance_id,
2339 execution_id = %inner.execution_id,
2340 orchestration_name = %inner.orchestration_name,
2341 orchestration_version = %inner.orchestration_version,
2342 "{}",
2343 message
2344 ),
2345 _ => tracing::trace!(
2346 target: "duroxide::orchestration",
2347 instance_id = %inner.instance_id,
2348 execution_id = %inner.execution_id,
2349 orchestration_name = %inner.orchestration_name,
2350 orchestration_version = %inner.orchestration_version,
2351 level = %level,
2352 "{}",
2353 message
2354 ),
2355 }
2356 }
2357 }
2358
2359 /// Generate a new deterministic GUID.
2360 ///
2361 /// This schedules a built-in activity that generates a unique identifier.
2362 /// The GUID is deterministic across replays (the same value is returned
2363 /// when the orchestration replays).
2364 ///
2365 /// # Example
2366 ///
2367 /// ```rust,no_run
2368 /// # use duroxide::OrchestrationContext;
2369 /// # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
2370 /// let guid = ctx.new_guid().await?;
2371 /// println!("Generated GUID: {}", guid);
2372 /// # Ok(())
2373 /// # }
2374 /// ```
2375 pub fn new_guid(&self) -> impl Future<Output = Result<String, String>> {
2376 self.schedule_activity(SYSCALL_ACTIVITY_NEW_GUID, "")
2377 }
2378
2379 /// Get the current UTC time.
2380 ///
2381 /// This schedules a built-in activity that returns the current time.
2382 /// The time is deterministic across replays (the same value is returned
2383 /// when the orchestration replays).
2384 ///
2385 /// # Errors
2386 ///
2387 /// Returns an error if the activity fails or if the time value cannot be parsed.
2388 ///
2389 /// # Example
2390 ///
2391 /// ```rust,no_run
2392 /// # use duroxide::OrchestrationContext;
2393 /// # use std::time::{SystemTime, Duration};
2394 /// # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
2395 /// let now = ctx.utc_now().await?;
2396 /// let deadline = now + Duration::from_secs(3600); // 1 hour from now
2397 /// # Ok(())
2398 /// # }
2399 /// ```
2400 pub fn utc_now(&self) -> impl Future<Output = Result<SystemTime, String>> {
2401 let fut = self.schedule_activity(SYSCALL_ACTIVITY_UTC_NOW_MS, "");
2402 async move {
2403 let s = fut.await?;
2404 let ms = s.parse::<u64>().map_err(|e| e.to_string())?;
2405 Ok(UNIX_EPOCH + StdDuration::from_millis(ms))
2406 }
2407 }
2408
2409 /// Continue the current execution as a new execution with fresh input.
2410 ///
2411 /// This terminates the current execution and starts a new execution with the provided input.
2412 /// Returns a future that never resolves, ensuring code after `await` is unreachable.
2413 ///
2414 /// # Example
2415 /// ```rust,no_run
2416 /// # use duroxide::OrchestrationContext;
2417 /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
2418 /// let n: u32 = 0;
2419 /// if n < 2 {
2420 /// return ctx.continue_as_new("next_input").await; // Execution terminates here
2421 /// // This code is unreachable - compiler will warn
2422 /// }
2423 /// Ok("completed".to_string())
2424 /// # }
2425 /// ```
2426 pub fn continue_as_new(&self, input: impl Into<String>) -> impl Future<Output = Result<String, String>> {
2427 let mut inner = self.inner.lock().unwrap();
2428 let input: String = input.into();
2429 let action = Action::ContinueAsNew { input, version: None };
2430
2431 inner.emit_action(action);
2432 ContinueAsNewFuture
2433 }
2434
2435 pub fn continue_as_new_typed<In: serde::Serialize>(
2436 &self,
2437 input: &In,
2438 ) -> impl Future<Output = Result<String, String>> {
2439 // Serialization should never fail for valid input types - if it does, it's a programming error
2440 let payload =
2441 crate::_typed_codec::Json::encode(input).expect("Serialization should never fail for valid input");
2442 self.continue_as_new(payload)
2443 }
2444
2445 /// ContinueAsNew to a specific target version (string is parsed as semver later).
2446 pub fn continue_as_new_versioned(
2447 &self,
2448 version: impl Into<String>,
2449 input: impl Into<String>,
2450 ) -> impl Future<Output = Result<String, String>> {
2451 let mut inner = self.inner.lock().unwrap();
2452 let action = Action::ContinueAsNew {
2453 input: input.into(),
2454 version: Some(version.into()),
2455 };
2456 inner.emit_action(action);
2457 ContinueAsNewFuture
2458 }
2459}
2460
/// Generate a unique, UUID-formatted identifier from the wall clock.
///
/// **Not deterministic:** every call reads `SystemTime::now()` and advances a
/// per-thread counter, so repeated calls (including the same call made again
/// during a replay) return different values. Orchestration code that needs a
/// replay-stable GUID should use `OrchestrationContext::new_guid`, which is
/// deterministic across replays.
///
/// Layout (`xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`, always 36 characters):
/// - groups 1-3: the 64-bit nanosecond timestamp (high 32, middle 16, low 16 bits)
/// - group 4: low 16 bits of a per-thread counter, separating calls made within
///   the same clock tick
/// - group 5: 48 bits of a per-thread discriminator derived from a randomly keyed
///   `RandomState` hasher, separating threads and processes that observe the same
///   timestamp and counter value
///
/// The result is not an RFC 4122 UUID; no version or variant bits are set.
pub(crate) fn generate_guid() -> String {
    use std::cell::Cell;
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hash, Hasher};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Nanoseconds since the epoch fit in 64 bits until the year 2554. Saturate
    // instead of silently truncating; a clock set before the epoch yields 0.
    let timestamp: u64 = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| u64::try_from(d.as_nanos()).unwrap_or(u64::MAX))
        .unwrap_or(0);

    thread_local! {
        // Per-thread call counter; distinguishes GUIDs generated within one clock tick.
        static COUNTER: Cell<u32> = const { Cell::new(0) };
        // Computed once per thread. `RandomState` instances are randomly keyed, so the
        // value differs across threads and processes with overwhelming probability;
        // the process and thread IDs are mixed in as well.
        static DISCRIMINATOR: u64 = {
            let mut hasher = RandomState::new().build_hasher();
            std::process::id().hash(&mut hasher);
            std::thread::current().id().hash(&mut hasher);
            hasher.finish()
        };
    }

    let counter = COUNTER.with(|c| {
        let val = c.get();
        c.set(val.wrapping_add(1));
        val
    });
    let discriminator = DISCRIMINATOR.with(|d| *d);

    // Every value is masked to its group width, so each group has a fixed hex length.
    format!(
        "{:08x}-{:04x}-{:04x}-{:04x}-{:012x}",
        timestamp >> 32,
        (timestamp >> 16) & 0xFFFF,
        timestamp & 0xFFFF,
        counter & 0xFFFF,
        discriminator & 0xFFFF_FFFF_FFFF
    )
}
2492
2493impl OrchestrationContext {
2494 /// Schedule activity with automatic retry on failure.
2495 ///
2496 /// **Retry behavior:**
2497 /// - Retries on activity **errors** up to `policy.max_attempts`
2498 /// - **Timeouts are NOT retried** - if any attempt times out, returns error immediately
2499 /// - Only application errors trigger retry logic
2500 ///
2501 /// **Timeout behavior (if `policy.total_timeout` is set):**
2502 /// - Each activity attempt is raced against the timeout
2503 /// - If the timeout fires before the activity completes → returns timeout error (no retry)
2504 /// - If the activity fails with an error before timeout → retry according to policy
2505 ///
2506 /// # Example
2507 ///
2508 /// ```rust,no_run
2509 /// # use duroxide::{OrchestrationContext, RetryPolicy, BackoffStrategy};
2510 /// # use std::time::Duration;
2511 /// # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
2512 /// // Simple retry with defaults (no timeout)
2513 /// let result = ctx.schedule_activity_with_retry(
2514 /// "CallAPI",
2515 /// "request",
2516 /// RetryPolicy::new(3),
2517 /// ).await?;
2518 ///
2519 /// // Retry with per-attempt timeout and custom backoff
2520 /// let result = ctx.schedule_activity_with_retry(
2521 /// "CallAPI",
2522 /// "request",
2523 /// RetryPolicy::new(5)
2524 /// .with_timeout(Duration::from_secs(30)) // 30s per attempt
2525 /// .with_backoff(BackoffStrategy::Fixed { delay: Duration::from_secs(1) }),
2526 /// ).await?;
2527 /// # Ok(())
2528 /// # }
2529 /// ```
2530 ///
2531 /// # Errors
2532 ///
2533 /// Returns an error if all retry attempts fail or if a timeout occurs (timeouts are not retried).
2534 pub async fn schedule_activity_with_retry(
2535 &self,
2536 name: impl Into<String>,
2537 input: impl Into<String>,
2538 policy: RetryPolicy,
2539 ) -> Result<String, String> {
2540 let name = name.into();
2541 let input = input.into();
2542 let mut last_error = String::new();
2543
2544 for attempt in 1..=policy.max_attempts {
2545 // Each attempt: optionally race against per-attempt timeout
2546 let activity_result = if let Some(timeout) = policy.timeout {
2547 // Race activity vs per-attempt timeout
2548 let deadline = async {
2549 self.schedule_timer(timeout).await;
2550 Err::<String, String>("timeout: activity timed out".to_string())
2551 };
2552 let activity = self.schedule_activity(&name, &input);
2553
2554 match self.select2(activity, deadline).await {
2555 Either2::First(result) => result,
2556 Either2::Second(Err(e)) => {
2557 // Timeout fired - exit immediately, no retry for timeouts
2558 return Err(e);
2559 }
2560 Either2::Second(Ok(_)) => unreachable!(),
2561 }
2562 } else {
2563 // No timeout - just await the activity
2564 self.schedule_activity(&name, &input).await
2565 };
2566
2567 match activity_result {
2568 Ok(result) => return Ok(result),
2569 Err(e) => {
2570 // Activity failed with error - apply retry policy
2571 last_error = e.clone();
2572 if attempt < policy.max_attempts {
2573 self.trace(
2574 "warn",
2575 format!(
2576 "Activity '{}' attempt {}/{} failed: {}. Retrying...",
2577 name, attempt, policy.max_attempts, e
2578 ),
2579 );
2580 let delay = policy.delay_for_attempt(attempt);
2581 if !delay.is_zero() {
2582 self.schedule_timer(delay).await;
2583 }
2584 }
2585 }
2586 }
2587 }
2588 Err(last_error)
2589 }
2590
2591 /// Typed variant of `schedule_activity_with_retry`.
2592 ///
2593 /// Serializes input once and deserializes the successful result.
2594 ///
2595 /// # Errors
2596 ///
2597 /// Returns an error if all retry attempts fail, if a timeout occurs, if input serialization fails, or if result deserialization fails.
2598 pub async fn schedule_activity_with_retry_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned>(
2599 &self,
2600 name: impl Into<String>,
2601 input: &In,
2602 policy: RetryPolicy,
2603 ) -> Result<Out, String> {
2604 let payload = crate::_typed_codec::Json::encode(input).expect("encode");
2605 let result = self.schedule_activity_with_retry(name, payload, policy).await?;
2606 crate::_typed_codec::Json::decode::<Out>(&result)
2607 }
2608
2609 /// Schedule a detached orchestration with an explicit instance id.
2610 /// The runtime will prefix this with the parent instance to ensure global uniqueness.
2611 pub fn schedule_orchestration(
2612 &self,
2613 name: impl Into<String>,
2614 instance: impl Into<String>,
2615 input: impl Into<String>,
2616 ) {
2617 let name: String = name.into();
2618 let instance: String = instance.into();
2619 let input: String = input.into();
2620 let mut inner = self.inner.lock().unwrap();
2621
2622 let _ = inner.emit_action(Action::StartOrchestrationDetached {
2623 scheduling_event_id: 0, // Will be assigned by replay engine
2624 name,
2625 version: None,
2626 instance,
2627 input,
2628 });
2629 }
2630
2631 pub fn schedule_orchestration_typed<In: serde::Serialize>(
2632 &self,
2633 name: impl Into<String>,
2634 instance: impl Into<String>,
2635 input: &In,
2636 ) {
2637 let payload = crate::_typed_codec::Json::encode(input).expect("encode");
2638 self.schedule_orchestration(name, instance, payload)
2639 }
2640
2641 /// Versioned detached orchestration start (string I/O). If `version` is None, registry policy is used for the child.
2642 pub fn schedule_orchestration_versioned(
2643 &self,
2644 name: impl Into<String>,
2645 version: Option<String>,
2646 instance: impl Into<String>,
2647 input: impl Into<String>,
2648 ) {
2649 let name: String = name.into();
2650 let instance: String = instance.into();
2651 let input: String = input.into();
2652 let mut inner = self.inner.lock().unwrap();
2653
2654 let _ = inner.emit_action(Action::StartOrchestrationDetached {
2655 scheduling_event_id: 0, // Will be assigned by replay engine
2656 name,
2657 version,
2658 instance,
2659 input,
2660 });
2661 }
2662
2663 pub fn schedule_orchestration_versioned_typed<In: serde::Serialize>(
2664 &self,
2665 name: impl Into<String>,
2666 version: Option<String>,
2667 instance: impl Into<String>,
2668 input: &In,
2669 ) {
2670 let payload = crate::_typed_codec::Json::encode(input).expect("encode");
2671 self.schedule_orchestration_versioned(name, version, instance, payload)
2672 }
2673}
2674
// Core scheduling primitives (activities, timers, external waits, sub-orchestrations)
// and the join/select combinators for OrchestrationContext live below.
2676
2677impl OrchestrationContext {
2678 // =========================================================================
2679 // Core scheduling methods - return DurableFuture with cancellation support
2680 // =========================================================================
2681
2682 /// Schedule an activity and return a cancellation-aware future.
2683 ///
2684 /// Returns a [`DurableFuture`] that supports cancellation on drop. If the future
2685 /// is dropped without completing (e.g., as a select loser), the activity will be
2686 /// cancelled via lock stealing.
2687 ///
2688 /// # Example
2689 ///
2690 /// ```rust,no_run
2691 /// # use duroxide::OrchestrationContext;
2692 /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
2693 /// // Fan-out to multiple activities
2694 /// let f1 = ctx.schedule_activity("Process", "A");
2695 /// let f2 = ctx.schedule_activity("Process", "B");
2696 /// let results = ctx.join(vec![f1, f2]).await;
2697 /// # Ok("done".to_string())
2698 /// # }
2699 /// ```
2700 pub fn schedule_activity(
2701 &self,
2702 name: impl Into<String>,
2703 input: impl Into<String>,
2704 ) -> DurableFuture<Result<String, String>> {
2705 self.schedule_activity_internal(name, input, None)
2706 }
2707
2708 /// Typed version of schedule_activity that serializes input and deserializes output.
2709 ///
2710 /// # Errors
2711 ///
2712 /// Returns an error if the activity fails or if the output cannot be deserialized.
2713 pub fn schedule_activity_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned>(
2714 &self,
2715 name: impl Into<String>,
2716 input: &In,
2717 ) -> impl Future<Output = Result<Out, String>> {
2718 let payload = crate::_typed_codec::Json::encode(input).expect("encode");
2719 let fut = self.schedule_activity(name, payload);
2720 async move {
2721 let s = fut.await?;
2722 crate::_typed_codec::Json::decode::<Out>(&s)
2723 }
2724 }
2725
2726 /// Schedule an activity routed to the worker owning the given session.
2727 ///
2728 /// If no worker owns the session, any worker can claim it on first fetch.
2729 /// Once claimed, all subsequent activities with the same `session_id` route
2730 /// to the claiming worker until the session unpins (idle timeout or worker death).
2731 ///
2732 /// # Example
2733 ///
2734 /// ```rust,no_run
2735 /// # use duroxide::OrchestrationContext;
2736 /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
2737 /// let session_id = ctx.new_guid().await?;
2738 /// let result = ctx.schedule_activity_on_session("run_turn", "input", &session_id).await?;
2739 /// # Ok(result)
2740 /// # }
2741 /// ```
2742 pub fn schedule_activity_on_session(
2743 &self,
2744 name: impl Into<String>,
2745 input: impl Into<String>,
2746 session_id: impl Into<String>,
2747 ) -> DurableFuture<Result<String, String>> {
2748 self.schedule_activity_internal(name, input, Some(session_id.into()))
2749 }
2750
2751 /// Typed version of schedule_activity_on_session that serializes input and deserializes output.
2752 ///
2753 /// # Errors
2754 ///
2755 /// Returns an error if the activity fails or if the output cannot be deserialized.
2756 pub fn schedule_activity_on_session_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned>(
2757 &self,
2758 name: impl Into<String>,
2759 input: &In,
2760 session_id: impl Into<String>,
2761 ) -> impl Future<Output = Result<Out, String>> {
2762 let payload = crate::_typed_codec::Json::encode(input).expect("encode");
2763 let fut = self.schedule_activity_on_session(name, payload, session_id);
2764 async move {
2765 let s = fut.await?;
2766 crate::_typed_codec::Json::decode::<Out>(&s)
2767 }
2768 }
2769
2770 /// Internal implementation for activity scheduling.
2771 fn schedule_activity_internal(
2772 &self,
2773 name: impl Into<String>,
2774 input: impl Into<String>,
2775 session_id: Option<String>,
2776 ) -> DurableFuture<Result<String, String>> {
2777 let name: String = name.into();
2778 let input: String = input.into();
2779
2780 let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
2781
2782 let token = inner.emit_action(Action::CallActivity {
2783 scheduling_event_id: 0, // Will be assigned by replay engine
2784 name: name.clone(),
2785 input: input.clone(),
2786 session_id,
2787 });
2788 drop(inner);
2789
2790 let ctx = self.clone();
2791 let inner_future = std::future::poll_fn(move |_cx| {
2792 let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
2793 if let Some(result) = inner.get_result(token) {
2794 match result {
2795 CompletionResult::ActivityOk(s) => Poll::Ready(Ok(s.clone())),
2796 CompletionResult::ActivityErr(e) => Poll::Ready(Err(e.clone())),
2797 _ => Poll::Pending, // Wrong result type, keep waiting
2798 }
2799 } else {
2800 Poll::Pending
2801 }
2802 });
2803
2804 DurableFuture::new(token, ScheduleKind::Activity { name }, self.clone(), inner_future)
2805 }
2806
2807 /// Schedule a timer and return a cancellation-aware future.
2808 ///
2809 /// Timers are virtual constructs - dropping the future is a no-op since there's
2810 /// no external state to cancel. However, wrapping in `DurableFuture` maintains
2811 /// API consistency.
2812 pub fn schedule_timer(&self, delay: std::time::Duration) -> DurableFuture<()> {
2813 let delay_ms = delay.as_millis() as u64;
2814
2815 let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
2816
2817 let now = inner.now_ms();
2818 let fire_at_ms = now.saturating_add(delay_ms);
2819 let token = inner.emit_action(Action::CreateTimer {
2820 scheduling_event_id: 0,
2821 fire_at_ms,
2822 });
2823 drop(inner);
2824
2825 let ctx = self.clone();
2826 let inner_future = std::future::poll_fn(move |_cx| {
2827 let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
2828 if let Some(result) = inner.get_result(token) {
2829 match result {
2830 CompletionResult::TimerFired => Poll::Ready(()),
2831 _ => Poll::Pending,
2832 }
2833 } else {
2834 Poll::Pending
2835 }
2836 });
2837
2838 DurableFuture::new(token, ScheduleKind::Timer, self.clone(), inner_future)
2839 }
2840
2841 /// Subscribe to an external event and return a cancellation-aware future.
2842 ///
2843 /// External waits are virtual constructs - dropping the future is a no-op since
2844 /// there's no external state to cancel. However, wrapping in `DurableFuture`
2845 /// maintains API consistency.
2846 pub fn schedule_wait(&self, name: impl Into<String>) -> DurableFuture<String> {
2847 let name: String = name.into();
2848
2849 let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
2850
2851 let token = inner.emit_action(Action::WaitExternal {
2852 scheduling_event_id: 0,
2853 name: name.clone(),
2854 });
2855 drop(inner);
2856
2857 let ctx = self.clone();
2858 let inner_future = std::future::poll_fn(move |_cx| {
2859 let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
2860 // Only resolve once the token has been bound to a persisted schedule_id.
2861 // External events arriving before subscription binding are currently unsupported.
2862 if let Some(bound_id) = inner.get_bound_schedule_id(token)
2863 && let Some(data) = inner.get_external_event(bound_id)
2864 {
2865 return Poll::Ready(data.clone());
2866 }
2867 Poll::Pending
2868 });
2869
2870 DurableFuture::new(
2871 token,
2872 ScheduleKind::ExternalWait { event_name: name },
2873 self.clone(),
2874 inner_future,
2875 )
2876 }
2877
2878 /// Typed version of schedule_wait.
2879 pub fn schedule_wait_typed<T: serde::de::DeserializeOwned>(
2880 &self,
2881 name: impl Into<String>,
2882 ) -> impl Future<Output = T> {
2883 let fut = self.schedule_wait(name);
2884 async move {
2885 let s = fut.await;
2886 crate::_typed_codec::Json::decode::<T>(&s).expect("decode")
2887 }
2888 }
2889
2890 /// V2: Subscribe to an external event with topic-based pub/sub matching.
2891 ///
2892 /// Same semantics as `schedule_wait`, but matches on both `name` AND `topic`.
2893 /// Feature-gated for replay engine extensibility verification.
2894 #[cfg(feature = "replay-version-test")]
2895 pub fn schedule_wait2(&self, name: impl Into<String>, topic: impl Into<String>) -> DurableFuture<String> {
2896 let name: String = name.into();
2897 let topic: String = topic.into();
2898
2899 let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
2900
2901 let token = inner.emit_action(Action::WaitExternal2 {
2902 scheduling_event_id: 0,
2903 name: name.clone(),
2904 topic: topic.clone(),
2905 });
2906 drop(inner);
2907
2908 let ctx = self.clone();
2909 let inner_future = std::future::poll_fn(move |_cx| {
2910 let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
2911 if let Some(bound_id) = inner.get_bound_schedule_id(token)
2912 && let Some(data) = inner.get_external_event2(bound_id)
2913 {
2914 return Poll::Ready(data.clone());
2915 }
2916 Poll::Pending
2917 });
2918
2919 DurableFuture::new(
2920 token,
2921 ScheduleKind::ExternalWait { event_name: name },
2922 self.clone(),
2923 inner_future,
2924 )
2925 }
2926
2927 /// Schedule a sub-orchestration and return a cancellation-aware future.
2928 ///
2929 /// The child instance ID is auto-generated from the event ID with a parent prefix.
2930 ///
2931 /// Returns a [`DurableFuture`] that supports cancellation on drop. If the future
2932 /// is dropped without completing, a `CancelInstance` work item will be enqueued
2933 /// for the child orchestration.
2934 pub fn schedule_sub_orchestration(
2935 &self,
2936 name: impl Into<String>,
2937 input: impl Into<String>,
2938 ) -> DurableFuture<Result<String, String>> {
2939 self.schedule_sub_orchestration_versioned_with_id_internal(name, None, None, input)
2940 }
2941
2942 /// Schedule a sub-orchestration with an explicit instance ID.
2943 ///
2944 /// The provided `instance` value is used exactly as the child instance ID,
2945 /// without any parent prefix. Use this when you need to control the exact
2946 /// instance ID for the sub-orchestration.
2947 ///
2948 /// For auto-generated instance IDs, use [`schedule_sub_orchestration`] instead.
2949 pub fn schedule_sub_orchestration_with_id(
2950 &self,
2951 name: impl Into<String>,
2952 instance: impl Into<String>,
2953 input: impl Into<String>,
2954 ) -> DurableFuture<Result<String, String>> {
2955 self.schedule_sub_orchestration_versioned_with_id_internal(name, None, Some(instance.into()), input)
2956 }
2957
2958 /// Schedule a versioned sub-orchestration.
2959 ///
2960 /// If `version` is `Some`, that specific version is used.
2961 /// If `version` is `None`, the registry's policy (e.g., Latest) is used.
2962 pub fn schedule_sub_orchestration_versioned(
2963 &self,
2964 name: impl Into<String>,
2965 version: Option<String>,
2966 input: impl Into<String>,
2967 ) -> DurableFuture<Result<String, String>> {
2968 self.schedule_sub_orchestration_versioned_with_id_internal(name, version, None, input)
2969 }
2970
2971 /// Schedule a versioned sub-orchestration with an explicit instance ID.
2972 ///
2973 /// The provided `instance` value is used exactly as the child instance ID,
2974 /// without any parent prefix.
2975 ///
2976 /// Returns a [`DurableFuture`] that supports cancellation on drop. If the future
2977 /// is dropped without completing, a `CancelInstance` work item will be enqueued
2978 /// for the child orchestration.
2979 pub fn schedule_sub_orchestration_versioned_with_id(
2980 &self,
2981 name: impl Into<String>,
2982 version: Option<String>,
2983 instance: impl Into<String>,
2984 input: impl Into<String>,
2985 ) -> DurableFuture<Result<String, String>> {
2986 self.schedule_sub_orchestration_versioned_with_id_internal(name, version, Some(instance.into()), input)
2987 }
2988
2989 /// Internal implementation for sub-orchestration scheduling.
2990 ///
2991 /// If `instance` is `Some`, it's an explicit ID (no parent prefix).
2992 /// If `instance` is `None`, auto-generate from event ID (with parent prefix).
2993 fn schedule_sub_orchestration_versioned_with_id_internal(
2994 &self,
2995 name: impl Into<String>,
2996 version: Option<String>,
2997 instance: Option<String>,
2998 input: impl Into<String>,
2999 ) -> DurableFuture<Result<String, String>> {
3000 let name: String = name.into();
3001 let input: String = input.into();
3002
3003 let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
3004
3005 // For explicit instance IDs, use them as-is (no parent prefix will be added).
3006 // For auto-generated, use placeholder that will be replaced with SUB_ORCH_AUTO_PREFIX + event_id
3007 // and parent prefix will be added.
3008 let action_instance = match &instance {
3009 Some(explicit_id) => explicit_id.clone(),
3010 None => format!("{}{}", SUB_ORCH_PENDING_PREFIX, inner.next_token + 1),
3011 };
3012 let token = inner.emit_action(Action::StartSubOrchestration {
3013 scheduling_event_id: 0,
3014 name: name.clone(),
3015 version,
3016 instance: action_instance.clone(),
3017 input: input.clone(),
3018 });
3019 drop(inner);
3020
3021 let ctx = self.clone();
3022 let inner_future = std::future::poll_fn(move |_cx| {
3023 let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
3024 if let Some(result) = inner.get_result(token) {
3025 match result {
3026 CompletionResult::SubOrchOk(s) => Poll::Ready(Ok(s.clone())),
3027 CompletionResult::SubOrchErr(e) => Poll::Ready(Err(e.clone())),
3028 _ => Poll::Pending,
3029 }
3030 } else {
3031 Poll::Pending
3032 }
3033 });
3034
3035 // For cancellation, we store the token. The consumption path will look up
3036 // the resolved instance ID from the sub_orchestration_instances mapping.
3037 DurableFuture::new(
3038 token,
3039 ScheduleKind::SubOrchestration { token },
3040 self.clone(),
3041 inner_future,
3042 )
3043 }
3044
3045 /// Typed version of schedule_sub_orchestration.
3046 ///
3047 /// # Errors
3048 ///
3049 /// Returns an error if the sub-orchestration fails or if the output cannot be deserialized.
3050 pub fn schedule_sub_orchestration_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned>(
3051 &self,
3052 name: impl Into<String>,
3053 input: &In,
3054 ) -> impl Future<Output = Result<Out, String>> {
3055 let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3056 let fut = self.schedule_sub_orchestration(name, payload);
3057 async move {
3058 let s = fut.await?;
3059 crate::_typed_codec::Json::decode::<Out>(&s)
3060 }
3061 }
3062
3063 /// Typed version of schedule_sub_orchestration_with_id.
3064 ///
3065 /// # Errors
3066 ///
3067 /// Returns an error if the sub-orchestration fails or if the output cannot be deserialized.
3068 pub fn schedule_sub_orchestration_with_id_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned>(
3069 &self,
3070 name: impl Into<String>,
3071 instance: impl Into<String>,
3072 input: &In,
3073 ) -> impl Future<Output = Result<Out, String>> {
3074 let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3075 let fut = self.schedule_sub_orchestration_with_id(name, instance, payload);
3076 async move {
3077 let s = fut.await?;
3078 crate::_typed_codec::Json::decode::<Out>(&s)
3079 }
3080 }
3081
3082 /// Await all futures concurrently using `futures::future::join_all`.
3083 /// Works with any `Future` type.
3084 pub async fn join<T, F>(&self, futures: Vec<F>) -> Vec<T>
3085 where
3086 F: Future<Output = T>,
3087 {
3088 ::futures::future::join_all(futures).await
3089 }
3090
3091 /// Simplified join for exactly 2 futures (convenience method).
3092 pub async fn join2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> (T1, T2)
3093 where
3094 F1: Future<Output = T1>,
3095 F2: Future<Output = T2>,
3096 {
3097 ::futures::future::join(f1, f2).await
3098 }
3099
3100 /// Simplified join for exactly 3 futures (convenience method).
3101 pub async fn join3<T1, T2, T3, F1, F2, F3>(&self, f1: F1, f2: F2, f3: F3) -> (T1, T2, T3)
3102 where
3103 F1: Future<Output = T1>,
3104 F2: Future<Output = T2>,
3105 F3: Future<Output = T3>,
3106 {
3107 ::futures::future::join3(f1, f2, f3).await
3108 }
3109
3110 /// Simplified select over 2 futures: returns the result of whichever completes first.
3111 /// Select over 2 futures with potentially different output types.
3112 ///
3113 /// Returns `Either2::First(result)` if first future wins, `Either2::Second(result)` if second wins.
3114 /// Uses futures::select_biased! for determinism (first branch polled first).
3115 ///
3116 /// # Example: Activity with timeout
3117 /// ```rust,no_run
3118 /// # use duroxide::{OrchestrationContext, Either2};
3119 /// # use std::time::Duration;
3120 /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
3121 /// let work = ctx.schedule_activity("SlowWork", "input");
3122 /// let timeout = ctx.schedule_timer(Duration::from_secs(30));
3123 ///
3124 /// match ctx.select2(work, timeout).await {
3125 /// Either2::First(result) => result,
3126 /// Either2::Second(()) => Err("Operation timed out".to_string()),
3127 /// }
3128 /// # }
3129 /// ```
3130 pub async fn select2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> Either2<T1, T2>
3131 where
3132 F1: Future<Output = T1>,
3133 F2: Future<Output = T2>,
3134 {
3135 use ::futures::FutureExt;
3136 let mut f1 = std::pin::pin!(f1.fuse());
3137 let mut f2 = std::pin::pin!(f2.fuse());
3138 ::futures::select_biased! {
3139 result = f1 => Either2::First(result),
3140 result = f2 => Either2::Second(result),
3141 }
3142 }
3143
3144 /// Select over 3 futures with potentially different output types.
3145 ///
3146 /// Returns `Either3::First/Second/Third(result)` depending on which future completes first.
3147 /// Uses futures::select_biased! for determinism (earlier branches polled first).
3148 pub async fn select3<T1, T2, T3, F1, F2, F3>(&self, f1: F1, f2: F2, f3: F3) -> Either3<T1, T2, T3>
3149 where
3150 F1: Future<Output = T1>,
3151 F2: Future<Output = T2>,
3152 F3: Future<Output = T3>,
3153 {
3154 use ::futures::FutureExt;
3155 let mut f1 = std::pin::pin!(f1.fuse());
3156 let mut f2 = std::pin::pin!(f2.fuse());
3157 let mut f3 = std::pin::pin!(f3.fuse());
3158 ::futures::select_biased! {
3159 result = f1 => Either3::First(result),
3160 result = f2 => Either3::Second(result),
3161 result = f3 => Either3::Third(result),
3162 }
3163 }
3164}