Skip to main content

duroxide/
lib.rs

1//! # Duroxide: Durable execution framework in Rust
2//!
//! Duroxide is a framework for building reliable, long-running, code-based workflows that can survive
4//! failures and restarts. For a deep dive into how durable execution works, see the
5//! [Durable Futures Internals](https://github.com/affandar/duroxide/blob/main/docs/durable-futures-internals.md) documentation.
6//!
7//! ## Quick Start
8//!
9//! ```rust,no_run
10//! use duroxide::providers::sqlite::SqliteProvider;
11//! use duroxide::runtime::registry::ActivityRegistry;
12//! use duroxide::runtime::{self};
13//! use duroxide::{ActivityContext, OrchestrationContext, OrchestrationRegistry, Client};
14//! use std::sync::Arc;
15//!
16//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
17//! // 1. Create a storage provider
18//! let store = Arc::new(SqliteProvider::new("sqlite:./data.db", None).await.unwrap());
19//!
20//! // 2. Register activities (your business logic)
21//! let activities = ActivityRegistry::builder()
22//!     .register("Greet", |_ctx: ActivityContext, name: String| async move {
23//!         Ok(format!("Hello, {}!", name))
24//!     })
25//!     .build();
26//!
27//! // 3. Define your orchestration
28//! let orchestration = |ctx: OrchestrationContext, name: String| async move {
29//!     let greeting = ctx.schedule_activity("Greet", name).await?;
30
31// Mutex poisoning indicates a panic in another thread - a critical error.
32// All expect()/unwrap() calls on mutex locks in this module are intentional:
33// poisoned mutexes should panic as they indicate corrupted state.
34#![allow(clippy::expect_used)]
35#![allow(clippy::unwrap_used)]
36// Arc::clone() vs .clone() is a style preference - we use .clone() for brevity
37#![allow(clippy::clone_on_ref_ptr)]
38//!     Ok(greeting)
39//! };
40//!
41//! // 4. Register and start the runtime
42//! let orchestrations = OrchestrationRegistry::builder()
43//!     .register("HelloWorld", orchestration)
44//!     .build();
45//!
46//! let rt = runtime::Runtime::start_with_store(
47//!     store.clone(), activities, orchestrations
48//! ).await;
49//!
50//! // 5. Create a client and start an orchestration instance
51//! let client = Client::new(store.clone());
52//! client.start_orchestration("inst-1", "HelloWorld", "World").await?;
53//! let result = client.wait_for_orchestration("inst-1", std::time::Duration::from_secs(5)).await
54//!     .map_err(|e| format!("Wait error: {:?}", e))?;
55//! # Ok(())
56//! # }
57//! ```
58//!
59//! ## Key Concepts
60//!
61//! - **Orchestrations**: Long-running workflows written as async functions (coordination logic)
62//! - **Activities**: Single-purpose work units (can do anything - DB, API, polling, etc.)
63//!   - Supports long-running activities via automatic lock renewal (minutes to hours)
//! - **Timers**: Use `ctx.schedule_timer(duration)` (takes a `std::time::Duration`) for orchestration-level delays and timeouts
65//! - **Deterministic Replay**: Orchestrations are replayed from history to ensure consistency
66//! - **Durable Futures**: Composable futures for activities, timers, and external events
67//! - **ContinueAsNew (Multi-Execution)**: An orchestration can end the current execution and
68//!   immediately start a new one with fresh input. Each execution has its own isolated history
69//!   that starts with `OrchestrationStarted { event_id: 1 }`.
70//!
71//! ## ⚠️ Important: Orchestrations vs Activities
72//!
73//! **Orchestrations = Coordination (control flow, business logic)**
74//! **Activities = Execution (single-purpose work units)**
75//!
76//! ```rust,no_run
77//! # use duroxide::OrchestrationContext;
78//! # use std::time::Duration;
79//! # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
80//! // ✅ CORRECT: Orchestration-level delay using timer
81//! ctx.schedule_timer(Duration::from_secs(5)).await;  // Wait 5 seconds
82//!
83//! // ✅ ALSO CORRECT: Activity can poll/sleep as part of its work
84//! // Example: Activity that provisions a VM and polls for readiness
85//! // activities.register("ProvisionVM", |config| async move {
86//! //     let vm = create_vm(config).await?;
87//! //     while !vm_ready(&vm).await {
88//! //         tokio::time::sleep(Duration::from_secs(5)).await;  // ✅ OK - part of provisioning
89//! //     }
90//! //     Ok(vm.id)
91//! // });
92//!
93//! // ❌ WRONG: Activity that ONLY sleeps (use timer instead)
94//! // ctx.schedule_activity("Sleep5Seconds", "").await;
95//! # Ok(())
96//! # }
97//! ```
98//!
99//! **Put in Activities (single-purpose execution units):**
100//! - Database operations
101//! - API calls (can include retries/polling)
102//! - Data transformations
103//! - File I/O
104//! - VM provisioning (with internal polling)
105//!
106//! **Put in Orchestrations (coordination and business logic):**
107//! - Control flow (if/else, match, loops)
108//! - Business decisions
109//! - Multi-step workflows
110//! - Error handling and compensation
111//! - Timeouts and deadlines (use timers)
112//! - Waiting for external events
113//!
114//! ## ContinueAsNew (Multi-Execution) Semantics
115//!
116//! ContinueAsNew (CAN) allows an orchestration to end its current execution and start a new
117//! one with fresh input (useful for loops, pagination, long-running workflows).
118//!
119//! - Orchestration calls `ctx.continue_as_new(new_input)`
120//! - Runtime stamps `OrchestrationContinuedAsNew` in the CURRENT execution's history
121//! - Runtime enqueues a `WorkItem::ContinueAsNew`
122//! - When processing that work item, the runtime starts a NEW execution with:
123//!   - `execution_id = previous_execution_id + 1`
124//!   - `existing_history = []` (fresh history)
125//!   - `OrchestrationStarted { event_id: 1, input = new_input }` is stamped automatically
126//! - Each execution's history is independent; `duroxide::Client::read_execution_history(instance, id)`
127//!   returns events for that execution only
128//!
129//! Provider responsibilities are strictly storage-level (see below). The runtime owns all
130//! orchestration semantics, including execution boundaries and starting the new execution.
131//!
132//! ## Provider Responsibilities (At a Glance)
133//!
134//! Providers are pure storage abstractions. The runtime computes orchestration semantics
135//! and passes explicit instructions to the provider.
136//!
137//! - `fetch_orchestration_item()`
138//!   - Return a locked batch of work for ONE instance
139//!   - Include full history for the CURRENT `execution_id`
140//!   - Do NOT create/synthesize new executions here (even for ContinueAsNew)
141//!
142//! - `ack_orchestration_item(lock_token, execution_id, history_delta, ..., metadata)`
143//!   - Atomic commit of one orchestration turn
144//!   - Idempotently `INSERT OR IGNORE` execution row for the explicit `execution_id`
145//!   - `UPDATE instances.current_execution_id = MAX(current_execution_id, execution_id)`
146//!   - Append `history_delta` to the specified execution
147//!   - Update `executions.status` and `executions.output` from `metadata` (no event inspection)
148//!
149//! - Worker/Timer queues
150//!   - Peek-lock semantics (dequeue with lock token; ack by deleting)
151//!   - Automatic lock renewal for long-running activities (no configuration needed)
152//!   - Orchestrator, Worker, Timer queues are independent but committed atomically with history
153//!
154//! See `docs/provider-implementation-guide.md` and `src/providers/sqlite.rs` for a complete,
155//! production-grade provider implementation.
156//!
157//! ## Simplified API
158//!
159//! All schedule methods return typed futures that can be awaited directly:
160//!
161//! ```rust,no_run
162//! # use duroxide::OrchestrationContext;
163//! # use std::time::Duration;
164//! # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
165//! // Activities return Result<String, String>
166//! let result = ctx.schedule_activity("Task", "input").await?;
167//!
168//! // Timers return ()
169//! ctx.schedule_timer(Duration::from_secs(5)).await;
170//!
171//! // External events return String
172//! let event = ctx.schedule_wait("Event").await;
173//!
174//! // Sub-orchestrations return Result<String, String>
175//! let sub_result = ctx.schedule_sub_orchestration("Sub", "input").await?;
176//! # Ok(())
177//! # }
178//! ```
179//!
180//! ## Common Patterns
181//!
182//! ### Function Chaining
183//! ```rust,no_run
184//! # use duroxide::OrchestrationContext;
185//! async fn chain_example(ctx: OrchestrationContext) -> Result<String, String> {
186//!     let step1 = ctx.schedule_activity("Step1", "input").await?;
187//!     let step2 = ctx.schedule_activity("Step2", &step1).await?;
188//!     Ok(step2)
189//! }
190//! ```
191//!
192//! ### Fan-Out/Fan-In
193//! ```rust,no_run
194//! # use duroxide::OrchestrationContext;
195//! async fn fanout_example(ctx: OrchestrationContext) -> Vec<String> {
196//!     let futures = vec![
197//!         ctx.schedule_activity("Process", "item1"),
198//!         ctx.schedule_activity("Process", "item2"),
199//!         ctx.schedule_activity("Process", "item3"),
200//!     ];
201//!     let results = ctx.join(futures).await;
202//!     results.into_iter().filter_map(|r| r.ok()).collect()
203//! }
204//! ```
205//!
206//! ### Human-in-the-Loop (Timeout Pattern)
207//! ```rust,no_run
208//! # use duroxide::{OrchestrationContext, Either2};
209//! # use std::time::Duration;
210//! async fn approval_example(ctx: OrchestrationContext) -> String {
211//!     let timeout = ctx.schedule_timer(Duration::from_secs(30));
212//!     let approval = ctx.schedule_wait("ApprovalEvent");
213//!     
214//!     match ctx.select2(approval, timeout).await {
215//!         Either2::First(data) => data,
216//!         Either2::Second(()) => "timeout".to_string(),
217//!     }
218//! }
219//! ```
220//!
221//! ### Delays and Timeouts
222//! ```rust,no_run
223//! # use duroxide::{OrchestrationContext, Either2};
224//! # use std::time::Duration;
225//! async fn delay_example(ctx: OrchestrationContext) -> Result<String, String> {
226//!     // Use timer for orchestration-level delays
227//!     ctx.schedule_timer(Duration::from_secs(5)).await;
228//!     
229//!     // Process after delay
230//!     let result = ctx.schedule_activity("ProcessData", "input").await?;
231//!     Ok(result)
232//! }
233//!
234//! async fn timeout_example(ctx: OrchestrationContext) -> Result<String, String> {
235//!     // Race work against timeout
236//!     let work = ctx.schedule_activity("SlowOperation", "input");
237//!     let timeout = ctx.schedule_timer(Duration::from_secs(30));
238//!     
239//!     match ctx.select2(work, timeout).await {
240//!         Either2::First(result) => result,
241//!         Either2::Second(()) => Err("Operation timed out".to_string()),
242//!     }
243//! }
244//! ```
245//!
246//! ### Fan-Out/Fan-In with Error Handling
247//! ```rust,no_run
248//! # use duroxide::OrchestrationContext;
249//! async fn fanout_with_errors(ctx: OrchestrationContext, items: Vec<String>) -> Result<Vec<String>, String> {
250//!     // Schedule all work in parallel
251//!     let futures: Vec<_> = items.iter()
252//!         .map(|item| ctx.schedule_activity("ProcessItem", item.clone()))
253//!         .collect();
254//!     
255//!     // Wait for all to complete (deterministic order preserved)
256//!     let results = ctx.join(futures).await;
257//!     
258//!     // Process results with error handling
259//!     let mut successes = Vec::new();
260//!     for result in results {
261//!         match result {
262//!             Ok(value) => successes.push(value),
263//!             Err(e) => {
264//!                 // Log error but continue processing other items
265//!                 ctx.trace_error(format!("Item processing failed: {e}"));
266//!             }
267//!         }
268//!     }
269//!     
270//!     Ok(successes)
271//! }
272//! ```
273//!
274//! ### Retry Pattern
275//! ```rust,no_run
276//! # use duroxide::{OrchestrationContext, RetryPolicy, BackoffStrategy};
277//! # use std::time::Duration;
278//! async fn retry_example(ctx: OrchestrationContext) -> Result<String, String> {
279//!     // Retry with linear backoff: 5 attempts, delay increases linearly (1s, 2s, 3s, 4s)
280//!     let result = ctx.schedule_activity_with_retry(
281//!         "UnreliableOperation",
282//!         "input",
283//!         RetryPolicy::new(5)
284//!             .with_backoff(BackoffStrategy::Linear {
285//!                 base: Duration::from_secs(1),
286//!                 max: Duration::from_secs(10),
287//!             }),
288//!     ).await?;
289//!     
290//!     Ok(result)
291//! }
292//! ```
293//!
294//! ## Examples
295//!
296//! See the `examples/` directory for complete, runnable examples:
297//! - `hello_world.rs` - Basic orchestration setup
298//! - `fan_out_fan_in.rs` - Parallel processing pattern with error handling
299//! - `timers_and_events.rs` - Human-in-the-loop workflows with timeouts
300//! - `delays_and_timeouts.rs` - Correct usage of timers for delays and timeouts
301//! - `with_observability.rs` - Using observability features (tracing, metrics)
302//! - `metrics_cli.rs` - Querying system metrics via CLI
303//!
304//! Run examples with: `cargo run --example <name>`
305//!
306//! ## Architecture
307//!
308//! This crate provides:
309//! - **Public data model**: `Event`, `Action` for history and decisions
310//! - **Orchestration driver**: `run_turn`, `run_turn_with`, and `Executor`
311//! - **OrchestrationContext**: Schedule activities, timers, and external events
312//! - **Deterministic futures**: `schedule_*()` return standard `Future`s that can be composed with `join`/`select`
313//! - **Runtime**: In-process execution engine with dispatchers and workers
314//! - **Providers**: Pluggable storage backends (filesystem, in-memory)
315//!
316//! ### End-to-End System Architecture
317//!
318//! ```text
319//! +-------------------------------------------------------------------------+
320//! |                           Application Layer                             |
321//! +-------------------------------------------------------------------------+
322//! |                                                                         |
323//! |  +--------------+         +------------------------------------+        |
324//! |  |    Client    |-------->|  start_orchestration()             |        |
325//! |  |              |         |  raise_event()                     |        |
326//! |  |              |         |  wait_for_orchestration()          |        |
327//! |  +--------------+         +------------------------------------+        |
328//! |                                                                         |
329//! +-------------------------------------------------------------------------+
330//!                                    |
331//!                                    v
332//! +-------------------------------------------------------------------------+
333//! |                            Runtime Layer                                |
334//! +-------------------------------------------------------------------------+
335//! |                                                                         |
336//! |  +-------------------------------------------------------------------+  |
337//! |  |                         Runtime                                   |  |
338//! |  |  +----------------------+         +----------------------+        |  |
339//! |  |  | Orchestration        |         | Work                 |        |  |
340//! |  |  | Dispatcher           |         | Dispatcher           |        |  |
341//! |  |  | (N concurrent)       |         | (N concurrent)       |        |  |
342//! |  |  +----------+-----------+         +----------+-----------+        |  |
343//! |  |             |                                |                    |  |
344//! |  |             | Processes turns                | Executes activities|  |
345//! |  |             |                                |                    |  |
346//! |  +-------------+--------------------------------+--------------------+  |
347//! |                |                                |                       |
348//! |  +-------------v--------------------------------v--------------------+  |
349//! |  |  OrchestrationRegistry: maps names -> orchestration handlers     |  |
350//! |  +-------------------------------------------------------------------+  |
351//! |                                                                         |
352//! |  +-------------------------------------------------------------------+  |
353//! |  |  ActivityRegistry: maps names -> activity handlers               |  |
354//! |  +-------------------------------------------------------------------+  |
355//! |                                                                         |
356//! +-------------------------------------------------------------------------+
357//!                |                                |
358//!                | Fetches work items             | Fetches work items
359//!                | (peek-lock)                    | (peek-lock)
360//!                v                                v
361//! +-------------------------------------------------------------------------+
362//! |                          Provider Layer                                 |
363//! +-------------------------------------------------------------------------+
364//! |                                                                         |
365//! |  +----------------------------+    +----------------------------+       |
366//! |  |  Orchestrator Queue        |    |  Worker Queue              |       |
367//! |  |  - StartOrchestration      |    |  - ActivityExecute         |       |
368//! |  |  - ActivityCompleted       |    |                            |       |
369//! |  |  - ActivityFailed          |    |                            |       |
370//! |  |  - TimerFired (delayed)    |    |                            |       |
371//! |  |  - ExternalRaised          |    |                            |       |
372//! |  |  - ContinueAsNew           |    |                            |       |
373//! |  +----------------------------+    +----------------------------+       |
374//! |                                                                         |
375//! |  +-------------------------------------------------------------------+  |
376//! |  |                     Provider (Storage)                            |  |
377//! |  |  - History (Events per instance/execution)                        |  |
378//! |  |  - Instance metadata                                              |  |
379//! |  |  - Execution metadata                                             |  |
380//! |  |  - Instance locks (peek-lock semantics)                           |  |
381//! |  |  - Queue management (enqueue/dequeue with visibility)             |  |
382//! |  +-------------------------------------------------------------------+  |
383//! |                                                                         |
384//! |  +-------------------------------------------------------------------+  |
385//! |  |                Storage Backend (SQLite, etc.)                     |  |
386//! |  +-------------------------------------------------------------------+  |
387//! |                                                                         |
388//! +-------------------------------------------------------------------------+
//! ```
//!
390//! ### Execution Flow
391//!
392//! 1. **Client** starts orchestration → enqueues `StartOrchestration` to orchestrator queue
393//! 2. **OrchestrationDispatcher** fetches work item (peek-lock), loads history from Provider
394//! 3. **Runtime** calls user's orchestration function with `OrchestrationContext`
395//! 4. **Orchestration** schedules activities/timers → Runtime appends `Event`s to history
396//! 5. **Runtime** enqueues `ActivityExecute` to worker queue, `TimerFired` (delayed) to orchestrator queue
397//! 6. **WorkDispatcher** fetches activity work item, executes via `ActivityRegistry`
398//! 7. **Activity** completes → enqueues `ActivityCompleted`/`ActivityFailed` to orchestrator queue
399//! 8. **OrchestrationDispatcher** processes completion → next orchestration turn
400//! 9. **Runtime** atomically commits history + queue changes via `ack_orchestration_item()`
401//!
402//! All operations are deterministic and replayable from history.
403use std::future::Future;
404use std::pin::Pin;
405use std::sync::{Arc, Mutex};
406use std::task::{Context, Poll};
407
408// Public orchestration primitives and executor
409
410pub mod client;
411pub mod runtime;
412// Re-export descriptor type for public API ergonomics
413pub use runtime::OrchestrationDescriptor;
414pub mod providers;
415
416#[cfg(feature = "provider-test")]
417pub mod provider_validations;
418
419#[cfg(feature = "provider-test")]
420pub mod provider_validation;
421
422#[cfg(feature = "provider-test")]
423pub mod provider_stress_tests;
424
425#[cfg(feature = "provider-test")]
426pub mod provider_stress_test;
427
428// Re-export key runtime types for convenience
429pub use client::{Client, ClientError};
430pub use runtime::{
431    OrchestrationHandler, OrchestrationRegistry, OrchestrationRegistryBuilder, OrchestrationStatus, RuntimeOptions,
432};
433
434// Re-export management types for convenience
435pub use providers::{
436    ExecutionInfo, InstanceInfo, ProviderAdmin, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
437    SystemMetrics,
438};
439
440// Re-export capability filtering types
441pub use providers::{DispatcherCapabilityFilter, SemverRange, current_build_version};
442
443// Re-export deletion/pruning types for Client API users
444pub use providers::{DeleteInstanceResult, InstanceFilter, InstanceTree, PruneOptions, PruneResult};
445
446// Type aliases for improved readability and maintainability
447/// Shared reference to a Provider implementation
448pub type ProviderRef = Arc<dyn providers::Provider>;
449
450/// Shared reference to an OrchestrationHandler
451pub type OrchestrationHandlerRef = Arc<dyn runtime::OrchestrationHandler>;
452
453// ============================================================================
454// Heterogeneous Select Result Types
455// ============================================================================
456
457// ============================================================================
458// Schedule Kind and DurableFuture (Cancellation Support)
459// ============================================================================
460
/// The kind of work a scheduled future stands for, used when cancelling it.
///
/// If a `DurableFuture` is dropped before it completes, the runtime uses this
/// value to decide how the underlying work is cancelled:
///
/// - **Activity**: lock stealing via the provider (DELETE from `worker_queue`)
/// - **Timer**: nothing to do (virtual construct, no external state)
/// - **ExternalWait**: nothing to do (virtual construct, no external state)
/// - **SubOrchestration**: a `CancelInstance` work item is enqueued for the child
#[derive(Clone, Debug)]
pub enum ScheduleKind {
    /// Work performed by a scheduled activity.
    Activity {
        /// Name of the activity; carried for debugging and logging.
        name: String,
    },
    /// A durable timer.
    Timer,
    /// A wait for an externally raised event.
    ExternalWait {
        /// Name of the awaited event; carried for debugging and logging.
        event_name: String,
    },
    /// A sub-orchestration.
    SubOrchestration {
        /// Schedule token, used to look up the resolved instance ID.
        token: u64,
    },
}
489
490/// A wrapper around scheduled futures that supports cancellation on drop.
491///
492/// When a `DurableFuture` is dropped without completing (e.g., as a select loser,
493/// or when going out of scope without being awaited), the underlying scheduled work
494/// is cancelled:
495///
496/// - **Activities**: Lock stealing via provider (removes from worker queue)
497/// - **Sub-orchestrations**: `CancelInstance` work item enqueued for child
498/// - **Timers/External waits**: No-op (virtual constructs with no external state)
499///
500/// # Examples
501///
502/// ```rust,no_run
503/// # use duroxide::OrchestrationContext;
504/// # use std::time::Duration;
505/// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
506/// // Activity scheduled - if timer wins, activity gets cancelled
507/// let activity = ctx.schedule_activity("SlowWork", "input");
508/// let timeout = ctx.schedule_timer(Duration::from_secs(5));
509///
510/// match ctx.select2(activity, timeout).await {
511///     duroxide::Either2::First(result) => result,
512///     duroxide::Either2::Second(()) => Err("Timed out - activity cancelled".to_string()),
513/// }
514/// # }
515/// ```
516///
517/// # Drop Semantics
518///
519/// Unlike regular Rust futures which are inert on drop, `DurableFuture` has
520/// meaningful drop semantics similar to `File` (closes on drop) or `MutexGuard`
521/// (releases lock on drop). This is intentional - we want unobserved scheduled
522/// work to be cancelled rather than leaked.
523///
524/// **Note:** Using `std::mem::forget()` on a `DurableFuture` will bypass
525/// cancellation, causing the scheduled work to run but its result to be lost.
526pub struct DurableFuture<T> {
527    /// Token assigned at creation (before schedule_id is known)
528    token: u64,
529    /// What kind of schedule this represents
530    kind: ScheduleKind,
531    /// Reference to context for cancellation registration
532    ctx: OrchestrationContext,
533    /// Whether the future has completed (to skip cancellation)
534    completed: bool,
535    /// The underlying future
536    inner: std::pin::Pin<Box<dyn Future<Output = T> + Send>>,
537}
538
539impl<T> DurableFuture<T> {
540    /// Create a new `DurableFuture` wrapping an inner future.
541    fn new(
542        token: u64,
543        kind: ScheduleKind,
544        ctx: OrchestrationContext,
545        inner: impl Future<Output = T> + Send + 'static,
546    ) -> Self {
547        Self {
548            token,
549            kind,
550            ctx,
551            completed: false,
552            inner: Box::pin(inner),
553        }
554    }
555}
556
557impl<T> Future for DurableFuture<T> {
558    type Output = T;
559
560    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<T> {
561        match self.inner.as_mut().poll(cx) {
562            Poll::Ready(value) => {
563                self.completed = true;
564                Poll::Ready(value)
565            }
566            Poll::Pending => Poll::Pending,
567        }
568    }
569}
570
571impl<T> Drop for DurableFuture<T> {
572    fn drop(&mut self) {
573        if !self.completed {
574            // Future dropped without completing - trigger cancellation.
575            // Note: During dehydration (TurnResult::Continue), the orchestration future
576            // is dropped after collect_cancelled_from_context() has already run, so these
577            // cancellations go into a context that's about to be dropped. This is safe
578            // because the next turn creates a fresh context.
579            self.ctx.mark_token_cancelled(self.token, &self.kind);
580        }
581    }
582}
583
// DurableFuture is Send if T is Send (inner is already Send-boxed).
//
// SAFETY: NOTE(review) - the justification above only covers `inner`, which is
// boxed as `dyn Future + Send`. `token` (u64), `completed` (bool) and `kind`
// (`ScheduleKind`: `String`/`u64` fields) are plainly `Send`, but this impl also
// vouches for `ctx`. `OrchestrationContext` is defined outside this part of the
// file: confirm it is `Send`. If it is, the compiler would already derive `Send`
// for this struct and the manual impl only narrows it to `T: Send`; if it is
// not, this impl is unsound.
unsafe impl<T: Send> Send for DurableFuture<T> {}
586
/// Outcome of `select2`: records which of the two raced futures finished first.
///
/// The two sides may have different output types:
/// ```rust,no_run
/// # use duroxide::{OrchestrationContext, Either2};
/// # use std::time::Duration;
/// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
/// let activity = ctx.schedule_activity("Work", "input");
/// let timeout = ctx.schedule_timer(Duration::from_secs(30));
///
/// match ctx.select2(activity, timeout).await {
///     Either2::First(result) => result,
///     Either2::Second(()) => Err("Timed out".to_string()),
/// }
/// # }
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Either2<A, B> {
    /// The first future won; holds its output.
    First(A),
    /// The second future won; holds its output.
    Second(B),
}

impl<A, B> Either2<A, B> {
    /// Whether the first future was the winner.
    pub fn is_first(&self) -> bool {
        match self {
            Self::First(_) => true,
            Self::Second(_) => false,
        }
    }

    /// Whether the second future was the winner.
    pub fn is_second(&self) -> bool {
        !self.is_first()
    }

    /// Position of the winner: `0` for `First`, `1` for `Second`.
    pub fn index(&self) -> usize {
        if self.is_first() { 0 } else { 1 }
    }
}

impl<T> Either2<T, T> {
    /// Collapse a homogeneous `Either2` (both sides share one type) into
    /// `(index, value)`.
    ///
    /// Handy when migrating from the old `(usize, T)` return type.
    pub fn into_tuple(self) -> (usize, T) {
        let position = self.index();
        match self {
            Self::First(value) | Self::Second(value) => (position, value),
        }
    }
}
641
/// Result type for `select3` - represents which of three futures completed first.
///
/// Mirrors [`Either2`], with one extra variant for the third future.
///
/// ```rust
/// use duroxide::Either3;
///
/// let winner: Either3<&str, u32, ()> = Either3::Second(7);
/// assert!(winner.is_second());
/// assert_eq!(winner.index(), 1);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Either3<A, B, C> {
    /// First future completed first
    First(A),
    /// Second future completed first
    Second(B),
    /// Third future completed first
    Third(C),
}

impl<A, B, C> Either3<A, B, C> {
    /// Returns true if this is the First variant
    pub fn is_first(&self) -> bool {
        matches!(self, Either3::First(_))
    }

    /// Returns true if this is the Second variant
    pub fn is_second(&self) -> bool {
        matches!(self, Either3::Second(_))
    }

    /// Returns true if this is the Third variant
    pub fn is_third(&self) -> bool {
        matches!(self, Either3::Third(_))
    }

    /// Returns the index of the winner (0 for First, 1 for Second, 2 for Third)
    pub fn index(&self) -> usize {
        match self {
            Either3::First(_) => 0,
            Either3::Second(_) => 1,
            Either3::Third(_) => 2,
        }
    }
}

impl<T> Either3<T, T, T> {
    /// For homogeneous Either3 (all types are the same), extract as (index, value).
    ///
    /// The index is the same value [`Either3::index`] reports for the variant.
    pub fn into_tuple(self) -> (usize, T) {
        match self {
            Either3::First(v) => (0, v),
            Either3::Second(v) => (1, v),
            Either3::Third(v) => (2, v),
        }
    }
}
674
// Reserved prefix for built-in system activities.
// User-registered activities cannot use names starting with this prefix.
pub(crate) const SYSCALL_ACTIVITY_PREFIX: &str = "__duroxide_syscall:";

// Built-in system activity names. Each literal repeats the value of
// SYSCALL_ACTIVITY_PREFIX by hand rather than being derived from the constant,
// so the literals below must be updated together with the prefix.
pub(crate) const SYSCALL_ACTIVITY_NEW_GUID: &str = "__duroxide_syscall:new_guid";
pub(crate) const SYSCALL_ACTIVITY_UTC_NOW_MS: &str = "__duroxide_syscall:utc_now_ms";
682
683use crate::_typed_codec::Codec;
684// LogLevel is now defined locally in this file
685use serde::{Deserialize, Serialize};
686use std::time::{Duration as StdDuration, SystemTime, UNIX_EPOCH};
687
688// Internal codec utilities for typed I/O (kept private; public API remains ergonomic)
689mod _typed_codec {
690    use serde::{Serialize, de::DeserializeOwned};
691    use serde_json::Value;
692    pub trait Codec {
693        fn encode<T: Serialize>(v: &T) -> Result<String, String>;
694        fn decode<T: DeserializeOwned>(s: &str) -> Result<T, String>;
695    }
696    pub struct Json;
697    impl Codec for Json {
698        fn encode<T: Serialize>(v: &T) -> Result<String, String> {
699            // If the value is a JSON string, return raw content to preserve historic behavior
700            match serde_json::to_value(v) {
701                Ok(Value::String(s)) => Ok(s),
702                Ok(val) => serde_json::to_string(&val).map_err(|e| e.to_string()),
703                Err(e) => Err(e.to_string()),
704            }
705        }
706        fn decode<T: DeserializeOwned>(s: &str) -> Result<T, String> {
707            // Try parse as JSON first
708            match serde_json::from_str::<T>(s) {
709                Ok(v) => Ok(v),
710                Err(_) => {
711                    // Fallback: treat raw string as JSON string value
712                    let val = Value::String(s.to_string());
713                    serde_json::from_value(val).map_err(|e| e.to_string())
714                }
715            }
716        }
717    }
718}
719
/// Execution ID given to the first execution of every orchestration instance.
///
/// All orchestrations start with `execution_id = 1`.
pub const INITIAL_EXECUTION_ID: u64 = 1;

/// Event ID of the first event in a new execution.
///
/// That first event (`OrchestrationStarted`) always has `event_id = 1`.
pub const INITIAL_EVENT_ID: u64 = 1;
727
728// =============================================================================
729// Sub-orchestration instance ID conventions
730// =============================================================================
731
/// Marker that identifies an auto-generated sub-orchestration instance ID.
///
/// An ID beginning with this prefix gets the parent instance prepended,
/// yielding `{parent}::sub::N`.
pub const SUB_ORCH_AUTO_PREFIX: &str = "sub::";

/// Marker for a placeholder instance ID used before an event ID has been assigned.
///
/// Placeholders are replaced with `sub::{event_id}` during action processing.
pub(crate) const SUB_ORCH_PENDING_PREFIX: &str = "sub::pending_";
739
740/// Determine if a sub-orchestration instance ID is auto-generated (needs parent prefix).
741///
742/// Auto-generated IDs start with "sub::" and will have the parent instance prefixed
743/// to create a globally unique ID: `{parent_instance}::{child_instance}`.
744///
745/// Explicit IDs (those not starting with "sub::") are used exactly as provided.
746#[inline]
747pub fn is_auto_generated_sub_orch_id(instance: &str) -> bool {
748    instance.starts_with(SUB_ORCH_AUTO_PREFIX)
749}
750
751/// Build the full child instance ID, adding parent prefix only for auto-generated IDs.
752///
753/// - Auto-generated IDs (starting with "sub::"): `{parent}::{child}` (e.g., `parent-1::sub::5`)
754/// - Explicit IDs: used exactly as provided (e.g., `my-custom-child-id`)
755#[inline]
756pub fn build_child_instance_id(parent_instance: &str, child_instance: &str) -> String {
757    if is_auto_generated_sub_orch_id(child_instance) {
758        format!("{parent_instance}::{child_instance}")
759    } else {
760        child_instance.to_string()
761    }
762}
763
764/// Structured error details for orchestration failures.
765///
766/// Errors are categorized into three types for proper metrics and logging:
767/// - **Infrastructure**: Provider failures, data corruption (abort turn, never reach user code)
768/// - **Configuration**: Deployment issues like unregistered activities, nondeterminism (abort turn)
769/// - **Application**: Business logic failures (flow through normal orchestration code)
770#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
771pub enum ErrorDetails {
772    /// Infrastructure failure (provider errors, data corruption).
773    /// These errors abort orchestration execution and never reach user code.
774    Infrastructure {
775        operation: String,
776        message: String,
777        retryable: bool,
778    },
779
780    /// Configuration error (unregistered orchestrations/activities, nondeterminism).
781    /// These errors abort orchestration execution and never reach user code.
782    Configuration {
783        kind: ConfigErrorKind,
784        resource: String,
785        message: Option<String>,
786    },
787
788    /// Application error (business logic failures).
789    /// These are the ONLY errors that orchestration code sees.
790    Application {
791        kind: AppErrorKind,
792        message: String,
793        retryable: bool,
794    },
795
796    /// Poison message error - message exceeded max fetch attempts.
797    ///
798    /// This indicates a message that repeatedly fails to process.
799    /// Could be caused by:
800    /// - Malformed message data causing deserialization failures
801    /// - Message triggering bugs that crash the worker
802    /// - Transient infrastructure issues that became permanent
803    /// - Application code bugs triggered by specific input patterns
804    Poison {
805        /// Number of times the message was fetched
806        attempt_count: u32,
807        /// Maximum allowed attempts
808        max_attempts: u32,
809        /// Message type and identity
810        message_type: PoisonMessageType,
811        /// The poisoned message content (serialized JSON for debugging)
812        message: String,
813    },
814}
815
816/// Poison message type identification.
817#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
818pub enum PoisonMessageType {
819    /// Orchestration work item batch
820    Orchestration { instance: String, execution_id: u64 },
821    /// Activity execution
822    Activity {
823        instance: String,
824        execution_id: u64,
825        activity_name: String,
826        activity_id: u64,
827    },
828    /// History deserialization failure (e.g., unknown event types from a newer duroxide version)
829    FailedDeserialization {
830        instance: String,
831        execution_id: u64,
832        error: String,
833    },
834}
835
836/// Configuration error kinds.
837#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
838pub enum ConfigErrorKind {
839    Nondeterminism,
840}
841
842/// Application error kinds.
843#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
844pub enum AppErrorKind {
845    ActivityFailed,
846    OrchestrationFailed,
847    Panicked,
848    Cancelled { reason: String },
849}
850
851impl ErrorDetails {
852    /// Get failure category for metrics/logging.
853    pub fn category(&self) -> &'static str {
854        match self {
855            ErrorDetails::Infrastructure { .. } => "infrastructure",
856            ErrorDetails::Configuration { .. } => "configuration",
857            ErrorDetails::Application { .. } => "application",
858            ErrorDetails::Poison { .. } => "poison",
859        }
860    }
861
862    /// Check if failure is retryable.
863    pub fn is_retryable(&self) -> bool {
864        match self {
865            ErrorDetails::Infrastructure { retryable, .. } => *retryable,
866            ErrorDetails::Application { retryable, .. } => *retryable,
867            ErrorDetails::Configuration { .. } => false,
868            ErrorDetails::Poison { .. } => false, // Never retryable
869        }
870    }
871
872    /// Get display message for logging/UI (backward compatible format).
873    pub fn display_message(&self) -> String {
874        match self {
875            ErrorDetails::Infrastructure { operation, message, .. } => {
876                format!("infrastructure:{operation}: {message}")
877            }
878            ErrorDetails::Configuration {
879                kind,
880                resource,
881                message,
882            } => match kind {
883                ConfigErrorKind::Nondeterminism => message
884                    .as_ref()
885                    .map(|m| format!("nondeterministic: {m}"))
886                    .unwrap_or_else(|| format!("nondeterministic in {resource}")),
887            },
888            ErrorDetails::Application { kind, message, .. } => match kind {
889                AppErrorKind::Cancelled { reason } => format!("canceled: {reason}"),
890                AppErrorKind::Panicked => format!("orchestration panicked: {message}"),
891                _ => message.clone(),
892            },
893            ErrorDetails::Poison {
894                attempt_count,
895                max_attempts,
896                message_type,
897                ..
898            } => match message_type {
899                PoisonMessageType::Orchestration { instance, .. } => {
900                    format!("poison: orchestration {instance} exceeded {attempt_count} attempts (max {max_attempts})")
901                }
902                PoisonMessageType::Activity {
903                    activity_name,
904                    activity_id,
905                    ..
906                } => {
907                    format!(
908                        "poison: activity {activity_name}#{activity_id} exceeded {attempt_count} attempts (max {max_attempts})"
909                    )
910                }
911                PoisonMessageType::FailedDeserialization { instance, error, .. } => {
912                    format!(
913                        "poison: orchestration {instance} history deserialization failed after {attempt_count} attempts (max {max_attempts}): {error}"
914                    )
915                }
916            },
917        }
918    }
919}
920
921/// Unified event with common metadata and type-specific payload.
922///
923/// All events have common fields (event_id, source_event_id, instance_id, etc.)
924/// plus type-specific data in the `kind` field.
925///
926/// Events are append-only history entries persisted by a provider and consumed during replay.
927/// The `event_id` is a monotonically increasing position in history.
928/// Scheduling and completion events are linked via `source_event_id`.
929#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
930pub struct Event {
931    /// Sequential position in history (monotonically increasing per execution)
932    pub event_id: u64,
933
934    /// For completion events: references the scheduling event this completes.
935    /// None for lifecycle events (OrchestrationStarted, etc.) and scheduling events.
936    /// Some(id) for completion events (ActivityCompleted, TimerFired, etc.).
937    pub source_event_id: Option<u64>,
938
939    /// Instance this event belongs to.
940    /// Denormalized from DB key for self-contained events.
941    pub instance_id: String,
942
943    /// Execution this event belongs to.
944    /// Denormalized from DB key for self-contained events.
945    pub execution_id: u64,
946
947    /// Timestamp when event was created (milliseconds since Unix epoch).
948    pub timestamp_ms: u64,
949
950    /// Crate semver version that generated this event.
951    /// Format: "0.1.0", "0.2.0", etc.
952    pub duroxide_version: String,
953
954    /// Event type and associated data.
955    #[serde(flatten)]
956    pub kind: EventKind,
957}
958
959/// Event-specific payloads.
960///
961/// Common fields have been extracted to the Event struct:
962/// - event_id: moved to Event.event_id
963/// - source_event_id: moved to Event.source_event_id (`Option<u64>`)
964/// - execution_id: moved to Event.execution_id (was in 4 variants)
965#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
966#[serde(tag = "type")]
967pub enum EventKind {
968    /// Orchestration instance was created and started by name with input.
969    /// Version is required; parent linkage is present when this is a child orchestration.
970    #[serde(rename = "OrchestrationStarted")]
971    OrchestrationStarted {
972        name: String,
973        version: String,
974        input: String,
975        parent_instance: Option<String>,
976        parent_id: Option<u64>,
977    },
978
979    /// Orchestration completed with a final result.
980    #[serde(rename = "OrchestrationCompleted")]
981    OrchestrationCompleted { output: String },
982
983    /// Orchestration failed with a final error.
984    #[serde(rename = "OrchestrationFailed")]
985    OrchestrationFailed { details: ErrorDetails },
986
987    /// Activity was scheduled.
988    #[serde(rename = "ActivityScheduled")]
989    ActivityScheduled {
990        name: String,
991        input: String,
992        #[serde(skip_serializing_if = "Option::is_none")]
993        #[serde(default)]
994        session_id: Option<String>,
995    },
996
997    /// Activity completed successfully with a result.
998    #[serde(rename = "ActivityCompleted")]
999    ActivityCompleted { result: String },
1000
1001    /// Activity failed with error details.
1002    #[serde(rename = "ActivityFailed")]
1003    ActivityFailed { details: ErrorDetails },
1004
1005    /// Cancellation was requested for an activity (best-effort; completion may still arrive).
1006    /// Correlates to the ActivityScheduled event via Event.source_event_id.
1007    #[serde(rename = "ActivityCancelRequested")]
1008    ActivityCancelRequested { reason: String },
1009
1010    /// Timer was created and will logically fire at `fire_at_ms`.
1011    #[serde(rename = "TimerCreated")]
1012    TimerCreated { fire_at_ms: u64 },
1013
1014    /// Timer fired at logical time `fire_at_ms`.
1015    #[serde(rename = "TimerFired")]
1016    TimerFired { fire_at_ms: u64 },
1017
1018    /// Subscription to an external event by name was recorded.
1019    #[serde(rename = "ExternalSubscribed")]
1020    ExternalSubscribed { name: String },
1021
1022    /// An external event was raised. Matched by name (no source_event_id).
1023    #[serde(rename = "ExternalEvent")]
1024    ExternalEvent { name: String, data: String },
1025
1026    /// Fire-and-forget orchestration scheduling (detached).
1027    #[serde(rename = "OrchestrationChained")]
1028    OrchestrationChained {
1029        name: String,
1030        instance: String,
1031        input: String,
1032    },
1033
1034    /// Sub-orchestration was scheduled with deterministic child instance id.
1035    #[serde(rename = "SubOrchestrationScheduled")]
1036    SubOrchestrationScheduled {
1037        name: String,
1038        instance: String,
1039        input: String,
1040    },
1041
1042    /// Sub-orchestration completed and returned a result to the parent.
1043    #[serde(rename = "SubOrchestrationCompleted")]
1044    SubOrchestrationCompleted { result: String },
1045
1046    /// Sub-orchestration failed and returned error details to the parent.
1047    #[serde(rename = "SubOrchestrationFailed")]
1048    SubOrchestrationFailed { details: ErrorDetails },
1049
1050    /// Cancellation was requested for a sub-orchestration (best-effort; completion may still arrive).
1051    /// Correlates to the SubOrchestrationScheduled event via Event.source_event_id.
1052    #[serde(rename = "SubOrchestrationCancelRequested")]
1053    SubOrchestrationCancelRequested { reason: String },
1054
1055    /// Orchestration continued as new with fresh input (terminal for this execution).
1056    #[serde(rename = "OrchestrationContinuedAsNew")]
1057    OrchestrationContinuedAsNew { input: String },
1058
1059    /// Cancellation has been requested for the orchestration (terminal will follow deterministically).
1060    #[serde(rename = "OrchestrationCancelRequested")]
1061    OrchestrationCancelRequested { reason: String },
1062
1063    /// V2 subscription: includes a topic filter for pub/sub matching.
1064    /// Feature-gated for replay engine extensibility verification.
1065    #[cfg(feature = "replay-version-test")]
1066    #[serde(rename = "ExternalSubscribed2")]
1067    ExternalSubscribed2 { name: String, topic: String },
1068
1069    /// V2 event: includes the actual topic it was published on.
1070    /// Feature-gated for replay engine extensibility verification.
1071    #[cfg(feature = "replay-version-test")]
1072    #[serde(rename = "ExternalEvent2")]
1073    ExternalEvent2 { name: String, topic: String, data: String },
1074}
1075
1076impl Event {
1077    /// Create a new event with common fields populated and a specific event_id.
1078    ///
1079    /// Use this when you know the event_id upfront (e.g., during replay or when
1080    /// creating events inline).
1081    pub fn with_event_id(
1082        event_id: u64,
1083        instance_id: impl Into<String>,
1084        execution_id: u64,
1085        source_event_id: Option<u64>,
1086        kind: EventKind,
1087    ) -> Self {
1088        use std::time::{SystemTime, UNIX_EPOCH};
1089        Event {
1090            event_id,
1091            source_event_id,
1092            instance_id: instance_id.into(),
1093            execution_id,
1094            timestamp_ms: SystemTime::now()
1095                .duration_since(UNIX_EPOCH)
1096                .map(|d| d.as_millis() as u64)
1097                .unwrap_or(0),
1098            duroxide_version: env!("CARGO_PKG_VERSION").to_string(),
1099            kind,
1100        }
1101    }
1102
1103    /// Create a new event with common fields populated.
1104    ///
1105    /// The event_id will be 0 and should be set by the history manager.
1106    pub fn new(
1107        instance_id: impl Into<String>,
1108        execution_id: u64,
1109        source_event_id: Option<u64>,
1110        kind: EventKind,
1111    ) -> Self {
1112        Self::with_event_id(0, instance_id, execution_id, source_event_id, kind)
1113    }
1114
1115    /// Get the event_id (position in history).
1116    #[inline]
1117    pub fn event_id(&self) -> u64 {
1118        self.event_id
1119    }
1120
1121    /// Set the event_id (used by runtime when adding events to history).
1122    #[inline]
1123    pub(crate) fn set_event_id(&mut self, id: u64) {
1124        self.event_id = id;
1125    }
1126
1127    /// Get the source_event_id if this is a completion event.
1128    /// Returns None for lifecycle and scheduling events.
1129    #[inline]
1130    pub fn source_event_id(&self) -> Option<u64> {
1131        self.source_event_id
1132    }
1133
1134    /// Check if this event is a terminal event (ends the orchestration).
1135    pub fn is_terminal(&self) -> bool {
1136        matches!(
1137            self.kind,
1138            EventKind::OrchestrationCompleted { .. }
1139                | EventKind::OrchestrationFailed { .. }
1140                | EventKind::OrchestrationContinuedAsNew { .. }
1141        )
1142    }
1143}
1144
/// Severity levels available for orchestration context logging.
#[derive(Debug, Clone)]
pub enum LogLevel {
    /// Informational message.
    Info,
    /// Warning.
    Warn,
    /// Error.
    Error,
}
1152
/// Backoff strategy for computing delay between retry attempts.
#[derive(Debug, Clone)]
pub enum BackoffStrategy {
    /// No delay between retries.
    None,
    /// Fixed delay between all retries.
    Fixed {
        /// Delay duration between each retry.
        delay: std::time::Duration,
    },
    /// Linear backoff: delay = base * attempt, capped at max.
    Linear {
        /// Base delay multiplied by attempt number.
        base: std::time::Duration,
        /// Maximum delay cap.
        max: std::time::Duration,
    },
    /// Exponential backoff: delay = base * multiplier^(attempt-1), capped at max.
    Exponential {
        /// Initial delay for first retry.
        base: std::time::Duration,
        /// Multiplier applied each attempt.
        multiplier: f64,
        /// Maximum delay cap.
        max: std::time::Duration,
    },
}

impl Default for BackoffStrategy {
    /// Exponential backoff starting at 100 ms, doubling each attempt, capped at 30 s.
    fn default() -> Self {
        BackoffStrategy::Exponential {
            base: std::time::Duration::from_millis(100),
            multiplier: 2.0,
            max: std::time::Duration::from_secs(30),
        }
    }
}

impl BackoffStrategy {
    /// Compute delay for given attempt (1-indexed).
    ///
    /// Attempt 1 is after first failure, so `delay_for_attempt(1)` is the first backoff.
    ///
    /// Behavior at the edges:
    /// - `attempt == 0` yields a zero delay for `Linear` and the plain `base` delay for
    ///   `Exponential` (the exponent is clamped at 0).
    /// - Arbitrarily large attempt numbers are safe: the linear product saturates and the
    ///   exponential exponent is clamped, so a growing strategy settles at `max`.
    /// - For `Exponential`, a computed delay that is negative or NaN becomes zero.
    pub fn delay_for_attempt(&self, attempt: u32) -> std::time::Duration {
        match self {
            BackoffStrategy::None => std::time::Duration::ZERO,
            BackoffStrategy::Fixed { delay } => *delay,
            BackoffStrategy::Linear { base, max } => {
                let delay = base.saturating_mul(attempt);
                std::cmp::min(delay, *max)
            }
            BackoffStrategy::Exponential { base, multiplier, max } => {
                // delay = base * multiplier^(attempt-1)
                //
                // Clamp the exponent before narrowing it: a plain `u32 as i32` cast wraps to
                // a negative number above i32::MAX, which would turn a very late attempt
                // into a tiny delay instead of the capped maximum.
                let exponent = attempt.saturating_sub(1).min(i32::MAX as u32) as i32;
                let factor = multiplier.powi(exponent);
                // Float-to-integer `as` casts saturate (NaN and negative values become 0),
                // so an infinite product simply lands on the largest representable value.
                let delay_nanos = (base.as_nanos() as f64 * factor) as u128;
                let delay = std::time::Duration::from_nanos(delay_nanos.min(u64::MAX as u128) as u64);
                std::cmp::min(delay, *max)
            }
        }
    }
}
1212
1213/// Retry policy for activities.
1214///
1215/// Configures automatic retry behavior including maximum attempts, backoff strategy,
1216/// and optional total timeout spanning all attempts.
1217///
1218/// # Example
1219///
1220/// ```rust
1221/// use std::time::Duration;
1222/// use duroxide::{RetryPolicy, BackoffStrategy};
1223///
1224/// // Simple retry with defaults (3 attempts, exponential backoff)
1225/// let policy = RetryPolicy::new(3);
1226///
1227/// // Custom policy with timeout and fixed backoff
1228/// let policy = RetryPolicy::new(5)
1229///     .with_timeout(Duration::from_secs(30))
1230///     .with_backoff(BackoffStrategy::Fixed {
1231///         delay: Duration::from_secs(1),
1232///     });
1233/// ```
1234#[derive(Debug, Clone)]
1235pub struct RetryPolicy {
1236    /// Maximum number of attempts (including initial). Must be >= 1.
1237    pub max_attempts: u32,
1238    /// Backoff strategy between retries.
1239    pub backoff: BackoffStrategy,
1240    /// Per-attempt timeout. If set, each activity attempt is raced against this
1241    /// timeout. If timeout fires, returns error immediately (no retry).
1242    /// Retries only occur for activity errors, not timeouts. None = no timeout.
1243    pub timeout: Option<std::time::Duration>,
1244}
1245
1246impl Default for RetryPolicy {
1247    fn default() -> Self {
1248        Self {
1249            max_attempts: 3,
1250            backoff: BackoffStrategy::default(),
1251            timeout: None,
1252        }
1253    }
1254}
1255
1256impl RetryPolicy {
1257    /// Create a new retry policy with specified max attempts and default backoff.
1258    ///
1259    /// # Panics
1260    /// Panics if `max_attempts` is 0.
1261    pub fn new(max_attempts: u32) -> Self {
1262        assert!(max_attempts >= 1, "max_attempts must be at least 1");
1263        Self {
1264            max_attempts,
1265            ..Default::default()
1266        }
1267    }
1268
1269    /// Set per-attempt timeout.
1270    ///
1271    /// Each activity attempt is raced against this timeout. If the timeout fires
1272    /// before the activity completes, returns an error immediately (no retry).
1273    /// Retries only occur for activity errors, not timeouts.
1274    pub fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
1275        self.timeout = Some(timeout);
1276        self
1277    }
1278
1279    /// Alias for `with_timeout` for backwards compatibility.
1280    #[doc(hidden)]
1281    pub fn with_total_timeout(mut self, timeout: std::time::Duration) -> Self {
1282        self.timeout = Some(timeout);
1283        self
1284    }
1285
1286    /// Set backoff strategy.
1287    pub fn with_backoff(mut self, backoff: BackoffStrategy) -> Self {
1288        self.backoff = backoff;
1289        self
1290    }
1291
1292    /// Compute delay for given attempt using the configured backoff strategy.
1293    pub fn delay_for_attempt(&self, attempt: u32) -> std::time::Duration {
1294        self.backoff.delay_for_attempt(attempt)
1295    }
1296}
1297
/// A declarative decision produced by an orchestration turn.
///
/// The host/provider is responsible for materializing each action into the
/// corresponding `Event`.
#[derive(Debug, Clone)]
pub enum Action {
    /// Schedule an activity invocation.
    CallActivity {
        /// The event_id of the ActivityScheduled event.
        scheduling_event_id: u64,
        name: String,
        input: String,
        /// Optional session ID for worker affinity routing.
        /// When set, the activity is routed to the worker owning this session.
        session_id: Option<String>,
    },
    /// Create a timer that will fire at the specified absolute time.
    CreateTimer {
        /// The event_id of the TimerCreated event.
        scheduling_event_id: u64,
        /// Absolute timestamp (ms since epoch) at which the timer should fire.
        fire_at_ms: u64,
    },
    /// Subscribe to an external event by name.
    WaitExternal {
        /// The event_id of the ExternalSubscribed event.
        scheduling_event_id: u64,
        name: String,
    },
    /// Start a detached orchestration (no result is routed back to the parent).
    StartOrchestrationDetached {
        scheduling_event_id: u64,
        name: String,
        version: Option<String>,
        instance: String,
        input: String,
    },
    /// Start a sub-orchestration by name and child instance id.
    StartSubOrchestration {
        /// The event_id of the SubOrchestrationScheduled event.
        scheduling_event_id: u64,
        name: String,
        version: Option<String>,
        instance: String,
        input: String,
    },

    /// Continue the current orchestration as a new execution with new input
    /// (terminal for the current execution).
    ContinueAsNew {
        input: String,
        /// Optional version string selecting the target orchestration version for the
        /// new execution.
        version: Option<String>,
    },

    /// V2: Subscribe to an external event with topic-based pub/sub matching.
    ///
    /// Feature-gated; exists for replay engine extensibility verification.
    #[cfg(feature = "replay-version-test")]
    WaitExternal2 {
        scheduling_event_id: u64,
        name: String,
        topic: String,
    },
}
1347
/// Outcome handed to a durable future when its operation completes.
///
/// Each variant is one completion state of a durable operation.
#[doc(hidden)]
#[derive(Debug, Clone)]
#[allow(dead_code)] // ExternalData is part of the design but delivered via get_external_event
pub enum CompletionResult {
    /// The activity completed successfully.
    ActivityOk(String),
    /// The activity failed with an error.
    ActivityErr(String),
    /// The timer fired.
    TimerFired,
    /// The sub-orchestration completed successfully.
    SubOrchOk(String),
    /// The sub-orchestration failed with an error.
    SubOrchErr(String),
    /// External event data (NOTE: external events are delivered via get_external_event,
    /// not through CompletionResult).
    ExternalData(String),
}
1368
1369#[derive(Debug)]
1370struct CtxInner {
1371    /// Whether we're currently replaying history (true) or processing new events (false).
1372    /// True while processing baseline_history events, false after.
1373    /// Users can check this via `ctx.is_replaying()` to skip side effects during replay.
1374    is_replaying: bool,
1375
1376    // === Replay Engine State ===
1377    /// Token counter (each schedule_*() call gets a unique token)
1378    next_token: u64,
1379    /// Emitted actions (token -> Action kind info)
1380    /// Token is used to correlate with schedule events during replay
1381    emitted_actions: Vec<(u64, Action)>,
1382    /// Results map: token -> completion result (populated by replay engine)
1383    completion_results: std::collections::HashMap<u64, CompletionResult>,
1384    /// Token -> schedule_id binding (set when replay engine matches action to history)
1385    token_bindings: std::collections::HashMap<u64, u64>,
1386    /// External subscriptions: schedule_id -> (name, subscription_index)
1387    external_subscriptions: std::collections::HashMap<u64, (String, usize)>,
1388    /// External arrivals: name -> list of payloads in arrival order
1389    external_arrivals: std::collections::HashMap<String, Vec<String>>,
1390    /// Next subscription index per external event name
1391    external_next_index: std::collections::HashMap<String, usize>,
1392
1393    // === V2 External Event State (feature-gated) ===
1394    /// V2 external subscriptions: schedule_id -> (name, topic, subscription_index)
1395    #[cfg(feature = "replay-version-test")]
1396    external2_subscriptions: std::collections::HashMap<u64, (String, String, usize)>,
1397    /// V2 external arrivals: (name, topic) -> list of payloads in arrival order
1398    #[cfg(feature = "replay-version-test")]
1399    external2_arrivals: std::collections::HashMap<(String, String), Vec<String>>,
1400    /// Next subscription index per (name, topic) pair
1401    #[cfg(feature = "replay-version-test")]
1402    external2_next_index: std::collections::HashMap<(String, String), usize>,
1403
1404    /// Sub-orchestration token -> resolved instance ID mapping
1405    sub_orchestration_instances: std::collections::HashMap<u64, String>,
1406
1407    // === Cancellation Tracking ===
1408    /// Tokens that have been cancelled (dropped without completing)
1409    cancelled_tokens: std::collections::HashSet<u64>,
1410    /// Cancelled token -> ScheduleKind mapping (for determining cancellation action)
1411    cancelled_token_kinds: std::collections::HashMap<u64, ScheduleKind>,
1412
1413    // Execution metadata
1414    execution_id: u64,
1415    instance_id: String,
1416    orchestration_name: String,
1417    orchestration_version: String,
1418    logging_enabled_this_poll: bool,
1419}
1420
1421impl CtxInner {
1422    fn new(
1423        _history: Vec<Event>, // Kept for API compatibility, no longer used
1424        execution_id: u64,
1425        instance_id: String,
1426        orchestration_name: String,
1427        orchestration_version: String,
1428        _worker_id: Option<String>, // Kept for API compatibility, no longer used
1429    ) -> Self {
1430        Self {
1431            // Start in replaying state - will be set to false when we move past baseline history
1432            is_replaying: true,
1433
1434            // Replay engine state
1435            next_token: 0,
1436            emitted_actions: Vec::new(),
1437            completion_results: Default::default(),
1438            token_bindings: Default::default(),
1439            external_subscriptions: Default::default(),
1440            external_arrivals: Default::default(),
1441            external_next_index: Default::default(),
1442            #[cfg(feature = "replay-version-test")]
1443            external2_subscriptions: Default::default(),
1444            #[cfg(feature = "replay-version-test")]
1445            external2_arrivals: Default::default(),
1446            #[cfg(feature = "replay-version-test")]
1447            external2_next_index: Default::default(),
1448            sub_orchestration_instances: Default::default(),
1449
1450            // Cancellation tracking
1451            cancelled_tokens: Default::default(),
1452            cancelled_token_kinds: Default::default(),
1453
1454            // Execution metadata
1455            execution_id,
1456            instance_id,
1457            orchestration_name,
1458            orchestration_version,
1459            logging_enabled_this_poll: false,
1460        }
1461    }
1462
1463    fn now_ms(&self) -> u64 {
1464        SystemTime::now()
1465            .duration_since(UNIX_EPOCH)
1466            .map(|d| d.as_millis() as u64)
1467            .unwrap_or(0)
1468    }
1469
1470    // === Replay Engine Helpers ===
1471
1472    /// Emit an action and return a token for correlation.
1473    fn emit_action(&mut self, action: Action) -> u64 {
1474        self.next_token += 1;
1475        let token = self.next_token;
1476        self.emitted_actions.push((token, action));
1477        token
1478    }
1479
1480    /// Drain all emitted actions (called by replay engine after polling).
1481    fn drain_emitted_actions(&mut self) -> Vec<(u64, Action)> {
1482        std::mem::take(&mut self.emitted_actions)
1483    }
1484
1485    /// Bind a token to a schedule_id (called by replay engine when matching action to history).
1486    fn bind_token(&mut self, token: u64, schedule_id: u64) {
1487        self.token_bindings.insert(token, schedule_id);
1488    }
1489
1490    /// Get the schedule_id bound to a token (returns None if not yet bound).
1491    fn get_bound_schedule_id(&self, token: u64) -> Option<u64> {
1492        self.token_bindings.get(&token).copied()
1493    }
1494
1495    /// Deliver a completion result for a schedule_id.
1496    fn deliver_result(&mut self, schedule_id: u64, result: CompletionResult) {
1497        // Find the token that was bound to this schedule_id
1498        for (&token, &sid) in &self.token_bindings {
1499            if sid == schedule_id {
1500                self.completion_results.insert(token, result);
1501                return;
1502            }
1503        }
1504        tracing::warn!(
1505            schedule_id,
1506            "dropping completion result with no binding (unsupported for now)"
1507        );
1508    }
1509
1510    /// Check if a result is available for a token.
1511    fn get_result(&self, token: u64) -> Option<&CompletionResult> {
1512        self.completion_results.get(&token)
1513    }
1514
1515    /// Bind an external subscription to a deterministic index.
1516    fn bind_external_subscription(&mut self, schedule_id: u64, name: &str) {
1517        let idx = self.external_next_index.entry(name.to_string()).or_insert(0);
1518        let subscription_index = *idx;
1519        *idx += 1;
1520        self.external_subscriptions
1521            .insert(schedule_id, (name.to_string(), subscription_index));
1522    }
1523
1524    /// Deliver an external event (appends to arrival list for the name).
1525    fn deliver_external_event(&mut self, name: String, data: String) {
1526        self.external_arrivals.entry(name).or_default().push(data);
1527    }
1528
1529    /// Get external event data for a subscription (by schedule_id).
1530    fn get_external_event(&self, schedule_id: u64) -> Option<&String> {
1531        let (name, subscription_index) = self.external_subscriptions.get(&schedule_id)?;
1532        let arrivals = self.external_arrivals.get(name)?;
1533        arrivals.get(*subscription_index)
1534    }
1535
1536    /// V2: Bind an external subscription with topic to a deterministic index.
1537    #[cfg(feature = "replay-version-test")]
1538    fn bind_external_subscription2(&mut self, schedule_id: u64, name: &str, topic: &str) {
1539        let key = (name.to_string(), topic.to_string());
1540        let idx = self.external2_next_index.entry(key.clone()).or_insert(0);
1541        let subscription_index = *idx;
1542        *idx += 1;
1543        self.external2_subscriptions
1544            .insert(schedule_id, (name.to_string(), topic.to_string(), subscription_index));
1545    }
1546
1547    /// V2: Deliver an external event with topic (appends to arrival list for (name, topic)).
1548    #[cfg(feature = "replay-version-test")]
1549    fn deliver_external_event2(&mut self, name: String, topic: String, data: String) {
1550        self.external2_arrivals.entry((name, topic)).or_default().push(data);
1551    }
1552
1553    /// V2: Get external event data for a topic-based subscription (by schedule_id).
1554    #[cfg(feature = "replay-version-test")]
1555    fn get_external_event2(&self, schedule_id: u64) -> Option<&String> {
1556        let (name, topic, subscription_index) = self.external2_subscriptions.get(&schedule_id)?;
1557        let arrivals = self.external2_arrivals.get(&(name.clone(), topic.clone()))?;
1558        arrivals.get(*subscription_index)
1559    }
1560
1561    // === Cancellation Helpers ===
1562
1563    /// Mark a token as cancelled (called by DurableFuture::drop).
1564    fn mark_token_cancelled(&mut self, token: u64, kind: ScheduleKind) {
1565        self.cancelled_tokens.insert(token);
1566        self.cancelled_token_kinds.insert(token, kind);
1567    }
1568
1569    /// Get cancelled activity schedule_ids (tokens that were bound and then dropped).
1570    fn get_cancelled_activity_ids(&self) -> Vec<u64> {
1571        let mut ids = Vec::new();
1572        for &token in &self.cancelled_tokens {
1573            if let Some(kind) = self.cancelled_token_kinds.get(&token)
1574                && matches!(kind, ScheduleKind::Activity { .. })
1575                && let Some(&schedule_id) = self.token_bindings.get(&token)
1576            {
1577                ids.push(schedule_id);
1578            }
1579        }
1580        ids
1581    }
1582
1583    /// Get cancelled sub-orchestration cancellations.
1584    ///
1585    /// Returns `(scheduling_event_id, child_instance_id)` for sub-orchestration futures that were
1586    /// bound (schedule_id assigned) and then dropped.
1587    fn get_cancelled_sub_orchestration_cancellations(&self) -> Vec<(u64, String)> {
1588        let mut cancels = Vec::new();
1589        for &token in &self.cancelled_tokens {
1590            if let Some(ScheduleKind::SubOrchestration { token: sub_token }) = self.cancelled_token_kinds.get(&token)
1591                && let Some(&schedule_id) = self.token_bindings.get(&token)
1592            {
1593                // Look up the resolved instance ID from our mapping
1594                if let Some(instance_id) = self.sub_orchestration_instances.get(sub_token) {
1595                    cancels.push((schedule_id, instance_id.clone()));
1596                }
1597                // If not in mapping, the action wasn't bound yet - nothing to cancel
1598            }
1599        }
1600        cancels
1601    }
1602
1603    /// Bind a sub-orchestration token to its resolved instance ID.
1604    fn bind_sub_orchestration_instance(&mut self, token: u64, instance_id: String) {
1605        self.sub_orchestration_instances.insert(token, instance_id);
1606    }
1607
1608    /// Clear cancelled tokens (called after turn completion to avoid re-processing).
1609    fn clear_cancelled_tokens(&mut self) {
1610        self.cancelled_tokens.clear();
1611        self.cancelled_token_kinds.clear();
1612    }
1613
1614    // Note: deterministic GUID generation was removed from public API.
1615}
1616
/// Context provided to activities for logging and metadata access.
1619///
1620/// Unlike [`OrchestrationContext`], activities are leaf nodes that cannot schedule new work,
1621/// but they often need to emit structured logs and inspect orchestration metadata. The
1622/// `ActivityContext` exposes the parent orchestration information and trace helpers that log
1623/// with full correlation fields.
1624///
1625/// # Examples
1626///
1627/// ```rust,no_run
1628/// # use duroxide::ActivityContext;
1629/// # use duroxide::runtime::registry::ActivityRegistry;
1630/// let activities = ActivityRegistry::builder()
1631///     .register("ProvisionVM", |ctx: ActivityContext, config: String| async move {
1632///         ctx.trace_info(format!("Provisioning VM with config: {}", config));
1633///         
1634///         // Do actual work (can use sleep, HTTP, etc.)
1635///         let vm_id = provision_vm_internal(config).await?;
1636///         
1637///         ctx.trace_info(format!("VM provisioned: {}", vm_id));
1638///         Ok(vm_id)
1639///     })
1640///     .build();
1641/// # async fn provision_vm_internal(config: String) -> Result<String, String> { Ok("vm-123".to_string()) }
1642/// ```
1643///
1644/// # Metadata Access
1645///
1646/// Activity context provides access to orchestration correlation metadata:
1647/// - `instance_id()` - Orchestration instance identifier
1648/// - `execution_id()` - Execution number (for ContinueAsNew scenarios)
1649/// - `orchestration_name()` - Parent orchestration name
1650/// - `orchestration_version()` - Parent orchestration version
1651/// - `activity_name()` - Current activity name
1652///
1653/// # Cancellation Support
1654///
1655/// Activities can respond to cancellation when their parent orchestration reaches a terminal state:
1656/// - `is_cancelled()` - Check if cancellation has been requested
1657/// - `cancelled()` - Future that completes when cancellation is requested (for use with `tokio::select!`)
1658/// - `cancellation_token()` - Get a clone of the token for spawned tasks
1659///
1660/// # Determinism
1661///
1662/// Activity trace helpers (`trace_info`, `trace_warn`, etc.) do **not** participate in
1663/// deterministic replay. They emit logs directly using [`tracing`] and should only be used for
1664/// diagnostic purposes.
#[derive(Clone)]
pub struct ActivityContext {
    /// Orchestration instance identifier (exposed via `instance_id()`).
    instance_id: String,
    /// Execution id within the orchestration instance (exposed via `execution_id()`).
    execution_id: u64,
    /// Parent orchestration name.
    orchestration_name: String,
    /// Parent orchestration version.
    orchestration_version: String,
    /// Name of the activity being executed.
    activity_name: String,
    /// Activity id; emitted as the `activity_id` field by the trace helpers.
    activity_id: u64,
    /// ID of the worker dispatcher processing this activity.
    worker_id: String,
    /// Optional session ID when scheduled via `schedule_activity_on_session`.
    session_id: Option<String>,
    /// Cancellation token for cooperative cancellation.
    /// Triggered when the parent orchestration reaches a terminal state.
    cancellation_token: tokio_util::sync::CancellationToken,
    /// Provider store for accessing the Client API.
    store: std::sync::Arc<dyn crate::providers::Provider>,
}
1682
1683impl ActivityContext {
1684    /// Create a new activity context. This constructor is intended for internal runtime use.
1685    ///
1686    /// Creates context with a new (non-cancelled) cancellation token.
1687    /// Note: The runtime now uses `new_with_cancellation` to provide a shared token.
1688    #[allow(dead_code)]
1689    #[allow(clippy::too_many_arguments)]
1690    pub(crate) fn new(
1691        instance_id: String,
1692        execution_id: u64,
1693        orchestration_name: String,
1694        orchestration_version: String,
1695        activity_name: String,
1696        activity_id: u64,
1697        worker_id: String,
1698        store: std::sync::Arc<dyn crate::providers::Provider>,
1699    ) -> Self {
1700        Self {
1701            instance_id,
1702            execution_id,
1703            orchestration_name,
1704            orchestration_version,
1705            activity_name,
1706            activity_id,
1707            worker_id,
1708            session_id: None,
1709            cancellation_token: tokio_util::sync::CancellationToken::new(),
1710            store,
1711        }
1712    }
1713
1714    /// Create a new activity context with a specific cancellation token.
1715    ///
1716    /// This constructor is intended for internal runtime use when the worker
1717    /// dispatcher needs to provide a cancellation token that can be triggered
1718    /// during activity execution.
1719    #[allow(clippy::too_many_arguments)]
1720    pub(crate) fn new_with_cancellation(
1721        instance_id: String,
1722        execution_id: u64,
1723        orchestration_name: String,
1724        orchestration_version: String,
1725        activity_name: String,
1726        activity_id: u64,
1727        worker_id: String,
1728        session_id: Option<String>,
1729        cancellation_token: tokio_util::sync::CancellationToken,
1730        store: std::sync::Arc<dyn crate::providers::Provider>,
1731    ) -> Self {
1732        Self {
1733            instance_id,
1734            execution_id,
1735            orchestration_name,
1736            orchestration_version,
1737            activity_name,
1738            activity_id,
1739            worker_id,
1740            session_id,
1741            cancellation_token,
1742            store,
1743        }
1744    }
1745
1746    /// Returns the orchestration instance identifier.
1747    pub fn instance_id(&self) -> &str {
1748        &self.instance_id
1749    }
1750
1751    /// Returns the execution id within the orchestration instance.
1752    pub fn execution_id(&self) -> u64 {
1753        self.execution_id
1754    }
1755
1756    /// Returns the parent orchestration name.
1757    pub fn orchestration_name(&self) -> &str {
1758        &self.orchestration_name
1759    }
1760
1761    /// Returns the parent orchestration version.
1762    pub fn orchestration_version(&self) -> &str {
1763        &self.orchestration_version
1764    }
1765
1766    /// Returns the activity name being executed.
1767    pub fn activity_name(&self) -> &str {
1768        &self.activity_name
1769    }
1770
1771    /// Returns the worker dispatcher ID processing this activity.
1772    pub fn worker_id(&self) -> &str {
1773        &self.worker_id
1774    }
1775
1776    /// Returns the session ID if this activity was scheduled via `schedule_activity_on_session`.
1777    ///
1778    /// Returns `None` for regular activities scheduled via `schedule_activity`.
1779    pub fn session_id(&self) -> Option<&str> {
1780        self.session_id.as_deref()
1781    }
1782
1783    /// Emit an INFO level trace entry associated with this activity.
1784    pub fn trace_info(&self, message: impl Into<String>) {
1785        tracing::info!(
1786            target: "duroxide::activity",
1787            instance_id = %self.instance_id,
1788            execution_id = %self.execution_id,
1789            orchestration_name = %self.orchestration_name,
1790            orchestration_version = %self.orchestration_version,
1791            activity_name = %self.activity_name,
1792            activity_id = %self.activity_id,
1793            worker_id = %self.worker_id,
1794            "{}",
1795            message.into()
1796        );
1797    }
1798
1799    /// Emit a WARN level trace entry associated with this activity.
1800    pub fn trace_warn(&self, message: impl Into<String>) {
1801        tracing::warn!(
1802            target: "duroxide::activity",
1803            instance_id = %self.instance_id,
1804            execution_id = %self.execution_id,
1805            orchestration_name = %self.orchestration_name,
1806            orchestration_version = %self.orchestration_version,
1807            activity_name = %self.activity_name,
1808            activity_id = %self.activity_id,
1809            worker_id = %self.worker_id,
1810            "{}",
1811            message.into()
1812        );
1813    }
1814
1815    /// Emit an ERROR level trace entry associated with this activity.
1816    pub fn trace_error(&self, message: impl Into<String>) {
1817        tracing::error!(
1818            target: "duroxide::activity",
1819            instance_id = %self.instance_id,
1820            execution_id = %self.execution_id,
1821            orchestration_name = %self.orchestration_name,
1822            orchestration_version = %self.orchestration_version,
1823            activity_name = %self.activity_name,
1824            activity_id = %self.activity_id,
1825            worker_id = %self.worker_id,
1826            "{}",
1827            message.into()
1828        );
1829    }
1830
1831    /// Emit a DEBUG level trace entry associated with this activity.
1832    pub fn trace_debug(&self, message: impl Into<String>) {
1833        tracing::debug!(
1834            target: "duroxide::activity",
1835            instance_id = %self.instance_id,
1836            execution_id = %self.execution_id,
1837            orchestration_name = %self.orchestration_name,
1838            orchestration_version = %self.orchestration_version,
1839            activity_name = %self.activity_name,
1840            activity_id = %self.activity_id,
1841            worker_id = %self.worker_id,
1842            "{}",
1843            message.into()
1844        );
1845    }
1846
1847    // ===== Cancellation Support =====
1848
1849    /// Check if cancellation has been requested.
1850    ///
1851    /// Returns `true` if the parent orchestration has completed, failed,
1852    /// or been cancelled. Activities can use this for cooperative cancellation.
1853    ///
1854    /// # Example
1855    ///
1856    /// ```ignore
1857    /// for item in items {
1858    ///     if ctx.is_cancelled() {
1859    ///         return Err("Activity cancelled".into());
1860    ///     }
1861    ///     process(item).await;
1862    /// }
1863    /// ```
1864    pub fn is_cancelled(&self) -> bool {
1865        self.cancellation_token.is_cancelled()
1866    }
1867
1868    /// Returns a future that completes when cancellation is requested.
1869    ///
1870    /// Use with `tokio::select!` for interruptible activities. This allows
1871    /// activities to respond promptly to cancellation without polling.
1872    ///
1873    /// # Example
1874    ///
1875    /// ```ignore
1876    /// tokio::select! {
1877    ///     result = do_work() => return result,
1878    ///     _ = ctx.cancelled() => return Err("Cancelled".into()),
1879    /// }
1880    /// ```
1881    pub async fn cancelled(&self) {
1882        self.cancellation_token.cancelled().await
1883    }
1884
1885    /// Get a clone of the cancellation token for use in spawned tasks.
1886    ///
1887    /// If your activity spawns child tasks with `tokio::spawn()`, you should
1888    /// pass them this token so they can also respond to cancellation.
1889    ///
1890    /// **Important:** If you spawn additional tasks/threads and do not pass them
1891    /// the cancellation token, they may outlive the activity's cancellation/abort.
1892    /// This is user error - the runtime provides the signal but cannot guarantee
1893    /// termination of arbitrary spawned work.
1894    ///
1895    /// # Example
1896    ///
1897    /// ```ignore
1898    /// let token = ctx.cancellation_token();
1899    /// let handle = tokio::spawn(async move {
1900    ///     loop {
1901    ///         tokio::select! {
1902    ///             _ = do_work() => {}
1903    ///             _ = token.cancelled() => break,
1904    ///         }
1905    ///     }
1906    /// });
1907    /// ```
1908    pub fn cancellation_token(&self) -> tokio_util::sync::CancellationToken {
1909        self.cancellation_token.clone()
1910    }
1911
1912    /// Get a Client for management operations.
1913    ///
1914    /// This allows activities to perform management operations such as
1915    /// pruning old executions, deleting instances, or querying instance status.
1916    ///
1917    /// # Example: Self-Pruning Eternal Orchestration
1918    ///
1919    /// ```ignore
1920    /// // Activity that prunes old executions
1921    /// async fn prune_self(ctx: ActivityContext, _input: String) -> Result<String, String> {
1922    ///     let client = ctx.get_client();
1923    ///     let instance_id = ctx.instance_id();
1924    ///
1925    ///     let result = client.prune_executions(instance_id, PruneOptions {
1926    ///         keep_last: Some(1), // Keep only current execution
1927    ///         ..Default::default()
1928    ///     }).await.map_err(|e| e.to_string())?;
1929    ///
1930    ///     Ok(format!("Pruned {} executions", result.executions_deleted))
1931    /// }
1932    /// ```
1933    pub fn get_client(&self) -> crate::Client {
1934        crate::Client::new(self.store.clone())
1935    }
1936}
1937
1938impl std::fmt::Debug for ActivityContext {
1939    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1940        f.debug_struct("ActivityContext")
1941            .field("instance_id", &self.instance_id)
1942            .field("execution_id", &self.execution_id)
1943            .field("orchestration_name", &self.orchestration_name)
1944            .field("orchestration_version", &self.orchestration_version)
1945            .field("activity_name", &self.activity_name)
1946            .field("activity_id", &self.activity_id)
1947            .field("worker_id", &self.worker_id)
1948            .field("cancellation_token", &self.cancellation_token)
1949            .field("store", &"<Provider>")
1950            .finish()
1951    }
1952}
1953
/// User-facing orchestration context for scheduling and replay-safe helpers.
///
/// The context is a cheap-to-clone handle: its state lives in a `CtxInner` behind
/// `Arc<Mutex<_>>`, so every clone refers to the same underlying state.
#[derive(Clone)]
pub struct OrchestrationContext {
    /// Shared, mutex-guarded orchestration state.
    inner: Arc<Mutex<CtxInner>>,
}
1958
/// Future used by `continue_as_new()` that stays pending forever.
///
/// Every poll yields `Poll::Pending`, so code placed after `await ctx.continue_as_new()`
/// is never reached. The runtime extracts actions before it looks at the future's state,
/// which is how the `ContinueAsNew` action still gets recorded and processed.
struct ContinueAsNewFuture;

impl Future for ContinueAsNewFuture {
    /// Same shape as an orchestration's return type; no value of it is ever produced.
    type Output = Result<String, String>;

    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
        // Never ready: the runtime checks pending actions before using any output,
        // so there is no output to provide here.
        Poll::Pending
    }
}
1975
1976impl OrchestrationContext {
1977    /// Construct a new context from an existing history vector.
1978    ///
1979    /// # Parameters
1980    ///
1981    /// * `orchestration_name` - The name of the orchestration being executed.
1982    /// * `orchestration_version` - The semantic version string of the orchestration.
1983    /// * `worker_id` - Optional dispatcher worker ID for logging correlation.
1984    ///   - `Some(id)`: Used by runtime dispatchers to include worker_id in traces
1985    ///   - `None`: Used by standalone/test execution without runtime context
1986    pub fn new(
1987        history: Vec<Event>,
1988        execution_id: u64,
1989        instance_id: String,
1990        orchestration_name: String,
1991        orchestration_version: String,
1992        worker_id: Option<String>,
1993    ) -> Self {
1994        Self {
1995            inner: Arc::new(Mutex::new(CtxInner::new(
1996                history,
1997                execution_id,
1998                instance_id,
1999                orchestration_name,
2000                orchestration_version,
2001                worker_id,
2002            ))),
2003        }
2004    }
2005
2006    /// Check if the orchestration is currently replaying history.
2007    ///
2008    /// Returns `true` when processing events from persisted history (replay),
2009    /// and `false` when executing new logic beyond the stored history.
2010    ///
2011    /// This is useful for skipping side effects during replay, such as:
2012    /// - Logging/tracing that should only happen on first execution
2013    /// - Metrics that shouldn't be double-counted
2014    /// - External notifications that shouldn't be re-sent
2015    ///
2016    /// # Example
2017    ///
2018    /// ```rust,no_run
2019    /// # use duroxide::OrchestrationContext;
2020    /// # async fn example(ctx: OrchestrationContext) {
2021    /// if !ctx.is_replaying() {
2022    ///     // Only log on first execution, not during replay
2023    ///     println!("Starting workflow for the first time");
2024    /// }
2025    /// # }
2026    /// ```
2027    pub fn is_replaying(&self) -> bool {
2028        self.inner.lock().unwrap().is_replaying
2029    }
2030
2031    /// Set the replaying state (used by replay engine and test harness).
2032    #[doc(hidden)]
2033    pub fn set_is_replaying(&self, is_replaying: bool) {
2034        self.inner.lock().unwrap().is_replaying = is_replaying;
2035    }
2036
2037    /// Bind an external subscription to a schedule_id (used by replay engine and test harness).
2038    #[doc(hidden)]
2039    pub fn bind_external_subscription(&self, schedule_id: u64, name: &str) {
2040        self.inner.lock().unwrap().bind_external_subscription(schedule_id, name);
2041    }
2042
2043    /// Deliver an external event (used by replay engine and test harness).
2044    #[doc(hidden)]
2045    pub fn deliver_external_event(&self, name: String, data: String) {
2046        self.inner.lock().unwrap().deliver_external_event(name, data);
2047    }
2048
2049    // =========================================================================
2050    // Cancellation Support (DurableFuture integration)
2051    // =========================================================================
2052
2053    /// Mark a token as cancelled (called by DurableFuture::drop).
2054    pub(crate) fn mark_token_cancelled(&self, token: u64, kind: &ScheduleKind) {
2055        self.inner.lock().unwrap().mark_token_cancelled(token, kind.clone());
2056    }
2057
2058    /// Get cancelled activity schedule_ids for this turn.
2059    pub(crate) fn get_cancelled_activity_ids(&self) -> Vec<u64> {
2060        self.inner.lock().unwrap().get_cancelled_activity_ids()
2061    }
2062
2063    /// Get cancelled sub-orchestration cancellations for this turn.
2064    pub(crate) fn get_cancelled_sub_orchestration_cancellations(&self) -> Vec<(u64, String)> {
2065        self.inner
2066            .lock()
2067            .unwrap()
2068            .get_cancelled_sub_orchestration_cancellations()
2069    }
2070
2071    /// Clear cancelled tokens after processing (called by replay engine).
2072    pub(crate) fn clear_cancelled_tokens(&self) {
2073        self.inner.lock().unwrap().clear_cancelled_tokens();
2074    }
2075
2076    /// Bind a sub-orchestration token to its resolved instance ID.
2077    pub(crate) fn bind_sub_orchestration_instance(&self, token: u64, instance_id: String) {
2078        self.inner
2079            .lock()
2080            .unwrap()
2081            .bind_sub_orchestration_instance(token, instance_id);
2082    }
2083
2084    // =========================================================================
2085    // Simplified Mode Tracing (Replay-Guarded)
2086    // =========================================================================
2087    //
2088    // These trace methods use `is_replaying()` as a guard, which means:
2089    // - No history events are created for traces
2090    // - Traces only emit on first execution, not during replay
2091    // - Much simpler and more efficient than system-call-based tracing
2092
2093    /// Convenience wrapper for INFO level tracing.
2094    ///
2095    /// Logs with INFO level and includes instance context automatically.
2096    /// Only emits on first execution, not during replay.
2097    ///
2098    /// # Example
2099    ///
2100    /// ```rust,no_run
2101    /// # use duroxide::OrchestrationContext;
2102    /// # async fn example(ctx: OrchestrationContext) {
2103    /// ctx.trace_info("Processing order");
2104    /// ctx.trace_info(format!("Processing {} items", 42));
2105    /// # }
2106    /// ```
2107    pub fn trace_info(&self, message: impl Into<String>) {
2108        self.trace("INFO", message);
2109    }
2110
2111    /// Convenience wrapper for WARN level tracing.
2112    ///
2113    /// Logs with WARN level and includes instance context automatically.
2114    /// Only emits on first execution, not during replay.
2115    ///
2116    /// # Example
2117    ///
2118    /// ```rust,no_run
2119    /// # use duroxide::OrchestrationContext;
2120    /// # async fn example(ctx: OrchestrationContext) {
2121    /// ctx.trace_warn("Retrying failed operation");
2122    /// # }
2123    /// ```
2124    pub fn trace_warn(&self, message: impl Into<String>) {
2125        self.trace("WARN", message);
2126    }
2127
2128    /// Convenience wrapper for ERROR level tracing.
2129    ///
2130    /// Logs with ERROR level and includes instance context automatically.
2131    /// Only emits on first execution, not during replay.
2132    ///
2133    /// # Example
2134    ///
2135    /// ```rust,no_run
2136    /// # use duroxide::OrchestrationContext;
2137    /// # async fn example(ctx: OrchestrationContext) {
2138    /// ctx.trace_error("Payment processing failed");
2139    /// # }
2140    /// ```
2141    pub fn trace_error(&self, message: impl Into<String>) {
2142        self.trace("ERROR", message);
2143    }
2144
2145    /// Convenience wrapper for DEBUG level tracing.
2146    ///
2147    /// Logs with DEBUG level and includes instance context automatically.
2148    /// Only emits on first execution, not during replay.
2149    ///
2150    /// # Example
2151    ///
2152    /// ```rust,no_run
2153    /// # use duroxide::OrchestrationContext;
2154    /// # async fn example(ctx: OrchestrationContext) {
2155    /// ctx.trace_debug("Detailed state information");
2156    /// # }
2157    /// ```
2158    pub fn trace_debug(&self, message: impl Into<String>) {
2159        self.trace("DEBUG", message);
2160    }
2161
2162    /// Drain emitted actions.
2163    /// Returns a list of (token, Action) pairs.
2164    #[doc(hidden)]
2165    pub fn drain_emitted_actions(&self) -> Vec<(u64, Action)> {
2166        self.inner.lock().unwrap().drain_emitted_actions()
2167    }
2168
2169    /// Get a snapshot of emitted actions without draining.
2170    /// Returns a list of (token, Action) pairs.
2171    #[doc(hidden)]
2172    pub fn get_emitted_actions(&self) -> Vec<(u64, Action)> {
2173        self.inner.lock().unwrap().emitted_actions.clone()
2174    }
2175
2176    /// Bind a token to a schedule_id.
2177    #[doc(hidden)]
2178    pub fn bind_token(&self, token: u64, schedule_id: u64) {
2179        self.inner.lock().unwrap().bind_token(token, schedule_id);
2180    }
2181
2182    /// Deliver a result for a token.
2183    #[doc(hidden)]
2184    pub fn deliver_result(&self, schedule_id: u64, result: CompletionResult) {
2185        self.inner.lock().unwrap().deliver_result(schedule_id, result);
2186    }
2187
2188    /// Returns the orchestration instance identifier.
2189    ///
2190    /// This is the unique identifier for this orchestration instance, typically
2191    /// provided when starting the orchestration.
2192    ///
2193    /// # Example
2194    ///
2195    /// ```rust,no_run
2196    /// # use duroxide::OrchestrationContext;
2197    /// # async fn example(ctx: OrchestrationContext) {
2198    /// let id = ctx.instance_id();
2199    /// ctx.trace_info(format!("Processing instance: {}", id));
2200    /// # }
2201    /// ```
2202    pub fn instance_id(&self) -> String {
2203        self.inner.lock().unwrap().instance_id.clone()
2204    }
2205
2206    /// Returns the current execution ID within this orchestration instance.
2207    ///
2208    /// The execution ID increments each time `continue_as_new()` is called.
2209    /// Execution 1 is the initial execution.
2210    ///
2211    /// # Example
2212    ///
2213    /// ```rust,no_run
2214    /// # use duroxide::OrchestrationContext;
2215    /// # async fn example(ctx: OrchestrationContext) {
2216    /// let exec_id = ctx.execution_id();
2217    /// ctx.trace_info(format!("Execution #{}", exec_id));
2218    /// # }
2219    /// ```
2220    pub fn execution_id(&self) -> u64 {
2221        self.inner.lock().unwrap().execution_id
2222    }
2223
2224    /// Returns the orchestration name.
2225    ///
2226    /// This is the name registered with the orchestration registry.
2227    ///
2228    /// # Example
2229    ///
2230    /// ```rust,no_run
2231    /// # use duroxide::OrchestrationContext;
2232    /// # async fn example(ctx: OrchestrationContext) {
2233    /// let name = ctx.orchestration_name();
2234    /// ctx.trace_info(format!("Running orchestration: {}", name));
2235    /// # }
2236    /// ```
2237    pub fn orchestration_name(&self) -> String {
2238        self.inner.lock().unwrap().orchestration_name.clone()
2239    }
2240
2241    /// Returns the orchestration version.
2242    ///
2243    /// This is the semantic version string associated with the orchestration.
2244    ///
2245    /// # Example
2246    ///
2247    /// ```rust,no_run
2248    /// # use duroxide::OrchestrationContext;
2249    /// # async fn example(ctx: OrchestrationContext) {
2250    /// let version = ctx.orchestration_version();
2251    /// ctx.trace_info(format!("Version: {}", version));
2252    /// # }
2253    /// ```
2254    pub fn orchestration_version(&self) -> String {
2255        self.inner.lock().unwrap().orchestration_version.clone()
2256    }
2257
2258    // Replay-safe logging control
2259    /// Indicates whether logging is enabled for the current poll. This is
2260    /// flipped on when a decision is recorded to minimize log noise.
2261    pub fn is_logging_enabled(&self) -> bool {
2262        self.inner.lock().unwrap().logging_enabled_this_poll
2263    }
2264    // log_buffer removed - not used
2265
2266    /// Emit a structured trace entry with automatic context correlation.
2267    ///
2268    /// Creates a system call event for deterministic replay and logs to tracing.
2269    /// The log entry automatically includes correlation fields:
2270    /// - `instance_id` - The orchestration instance identifier
2271    /// - `execution_id` - The current execution number
2272    /// - `orchestration_name` - Name of the orchestration
2273    /// - `orchestration_version` - Semantic version
2274    ///
2275    /// # Determinism
2276    ///
2277    /// This method is replay-safe: logs are only emitted on first execution,
2278    /// not during replay.
2279    ///
2280    /// # Example
2281    ///
2282    /// ```rust,no_run
2283    /// # use duroxide::OrchestrationContext;
2284    /// # async fn example(ctx: OrchestrationContext) {
2285    /// ctx.trace("INFO", "Processing started");
2286    /// ctx.trace("WARN", format!("Retry attempt: {}", 3));
2287    /// ctx.trace("ERROR", "Payment validation failed");
2288    /// # }
2289    /// ```
2290    ///
2291    /// # Output
2292    ///
2293    /// ```text
2294    /// 2024-10-30T10:15:23.456Z INFO duroxide::orchestration [order-123] Processing started
2295    /// ```
2296    ///
2297    /// All logs include instance_id, execution_id, orchestration_name for correlation.
2298    pub fn trace(&self, level: impl Into<String>, message: impl Into<String>) {
2299        self.trace_internal(&level.into(), &message.into());
2300    }
2301
2302    /// Internal implementation of trace (guarded by is_replaying)
2303    fn trace_internal(&self, level: &str, message: &str) {
2304        let inner = self.inner.lock().unwrap();
2305
2306        // Only trace if not replaying
2307        if !inner.is_replaying {
2308            match level.to_uppercase().as_str() {
2309                "INFO" => tracing::info!(
2310                    target: "duroxide::orchestration",
2311                    instance_id = %inner.instance_id,
2312                    execution_id = %inner.execution_id,
2313                    orchestration_name = %inner.orchestration_name,
2314                    orchestration_version = %inner.orchestration_version,
2315                    "{}",
2316                    message
2317                ),
2318                "WARN" => tracing::warn!(
2319                    target: "duroxide::orchestration",
2320                    instance_id = %inner.instance_id,
2321                    execution_id = %inner.execution_id,
2322                    orchestration_name = %inner.orchestration_name,
2323                    orchestration_version = %inner.orchestration_version,
2324                    "{}",
2325                    message
2326                ),
2327                "ERROR" => tracing::error!(
2328                    target: "duroxide::orchestration",
2329                    instance_id = %inner.instance_id,
2330                    execution_id = %inner.execution_id,
2331                    orchestration_name = %inner.orchestration_name,
2332                    orchestration_version = %inner.orchestration_version,
2333                    "{}",
2334                    message
2335                ),
2336                "DEBUG" => tracing::debug!(
2337                    target: "duroxide::orchestration",
2338                    instance_id = %inner.instance_id,
2339                    execution_id = %inner.execution_id,
2340                    orchestration_name = %inner.orchestration_name,
2341                    orchestration_version = %inner.orchestration_version,
2342                    "{}",
2343                    message
2344                ),
2345                _ => tracing::trace!(
2346                    target: "duroxide::orchestration",
2347                    instance_id = %inner.instance_id,
2348                    execution_id = %inner.execution_id,
2349                    orchestration_name = %inner.orchestration_name,
2350                    orchestration_version = %inner.orchestration_version,
2351                    level = %level,
2352                    "{}",
2353                    message
2354                ),
2355            }
2356        }
2357    }
2358
2359    /// Generate a new deterministic GUID.
2360    ///
2361    /// This schedules a built-in activity that generates a unique identifier.
2362    /// The GUID is deterministic across replays (the same value is returned
2363    /// when the orchestration replays).
2364    ///
2365    /// # Example
2366    ///
2367    /// ```rust,no_run
2368    /// # use duroxide::OrchestrationContext;
2369    /// # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
2370    /// let guid = ctx.new_guid().await?;
2371    /// println!("Generated GUID: {}", guid);
2372    /// # Ok(())
2373    /// # }
2374    /// ```
2375    pub fn new_guid(&self) -> impl Future<Output = Result<String, String>> {
2376        self.schedule_activity(SYSCALL_ACTIVITY_NEW_GUID, "")
2377    }
2378
2379    /// Get the current UTC time.
2380    ///
2381    /// This schedules a built-in activity that returns the current time.
2382    /// The time is deterministic across replays (the same value is returned
2383    /// when the orchestration replays).
2384    ///
2385    /// # Errors
2386    ///
2387    /// Returns an error if the activity fails or if the time value cannot be parsed.
2388    ///
2389    /// # Example
2390    ///
2391    /// ```rust,no_run
2392    /// # use duroxide::OrchestrationContext;
2393    /// # use std::time::{SystemTime, Duration};
2394    /// # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
2395    /// let now = ctx.utc_now().await?;
2396    /// let deadline = now + Duration::from_secs(3600); // 1 hour from now
2397    /// # Ok(())
2398    /// # }
2399    /// ```
2400    pub fn utc_now(&self) -> impl Future<Output = Result<SystemTime, String>> {
2401        let fut = self.schedule_activity(SYSCALL_ACTIVITY_UTC_NOW_MS, "");
2402        async move {
2403            let s = fut.await?;
2404            let ms = s.parse::<u64>().map_err(|e| e.to_string())?;
2405            Ok(UNIX_EPOCH + StdDuration::from_millis(ms))
2406        }
2407    }
2408
2409    /// Continue the current execution as a new execution with fresh input.
2410    ///
2411    /// This terminates the current execution and starts a new execution with the provided input.
2412    /// Returns a future that never resolves, ensuring code after `await` is unreachable.
2413    ///
2414    /// # Example
2415    /// ```rust,no_run
2416    /// # use duroxide::OrchestrationContext;
2417    /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
2418    /// let n: u32 = 0;
2419    /// if n < 2 {
2420    ///     return ctx.continue_as_new("next_input").await; // Execution terminates here
2421    ///     // This code is unreachable - compiler will warn
2422    /// }
2423    /// Ok("completed".to_string())
2424    /// # }
2425    /// ```
2426    pub fn continue_as_new(&self, input: impl Into<String>) -> impl Future<Output = Result<String, String>> {
2427        let mut inner = self.inner.lock().unwrap();
2428        let input: String = input.into();
2429        let action = Action::ContinueAsNew { input, version: None };
2430
2431        inner.emit_action(action);
2432        ContinueAsNewFuture
2433    }
2434
2435    pub fn continue_as_new_typed<In: serde::Serialize>(
2436        &self,
2437        input: &In,
2438    ) -> impl Future<Output = Result<String, String>> {
2439        // Serialization should never fail for valid input types - if it does, it's a programming error
2440        let payload =
2441            crate::_typed_codec::Json::encode(input).expect("Serialization should never fail for valid input");
2442        self.continue_as_new(payload)
2443    }
2444
2445    /// ContinueAsNew to a specific target version (string is parsed as semver later).
2446    pub fn continue_as_new_versioned(
2447        &self,
2448        version: impl Into<String>,
2449        input: impl Into<String>,
2450    ) -> impl Future<Output = Result<String, String>> {
2451        let mut inner = self.inner.lock().unwrap();
2452        let action = Action::ContinueAsNew {
2453            input: input.into(),
2454            version: Some(version.into()),
2455        };
2456        inner.emit_action(action);
2457        ContinueAsNewFuture
2458    }
2459}
2460
/// Generate a UUID-shaped unique identifier for use in orchestrations.
///
/// The value is built from the wall clock, so this function is not itself
/// reproducible; a caller that needs the same value again must record the
/// returned string.
///
/// The 128 bits are laid out as `8-4-4-4-12` lowercase hex digits:
///
/// | bits | content                                                |
/// |------|--------------------------------------------------------|
/// | 64   | nanoseconds since the Unix epoch (first three groups)  |
/// | 32   | process id (fourth group + top 16 bits of the last)    |
/// | 32   | process-wide sequence number (low 32 bits of the last) |
///
/// If the system clock reports a time before the Unix epoch, the timestamp
/// part falls back to `0`; the sequence number still makes values distinct.
pub(crate) fn generate_guid() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // One counter for the whole process: per-thread counters all start at 0, so
    // two threads reading the same nanosecond would produce identical GUIDs.
    static SEQUENCE: AtomicU32 = AtomicU32::new(0);

    let nanos: u128 = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);

    // Nanoseconds since 1970 fit in 64 bits until the year 2554, so every bit
    // above that is zero. Keep exactly the low 64 bits (truncation intended).
    let timestamp = nanos as u64;
    let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let pid = std::process::id();

    // Format as UUID-like string
    format!(
        "{:08x}-{:04x}-{:04x}-{:04x}-{:012x}",
        (timestamp >> 32) as u32,
        (timestamp >> 16) as u16,
        timestamp as u16,
        (pid >> 16) as u16,
        (u64::from(pid & 0xFFFF) << 32) | u64::from(sequence)
    )
}
2492
2493impl OrchestrationContext {
2494    /// Schedule activity with automatic retry on failure.
2495    ///
2496    /// **Retry behavior:**
2497    /// - Retries on activity **errors** up to `policy.max_attempts`
2498    /// - **Timeouts are NOT retried** - if any attempt times out, returns error immediately
2499    /// - Only application errors trigger retry logic
2500    ///
2501    /// **Timeout behavior (if `policy.total_timeout` is set):**
2502    /// - Each activity attempt is raced against the timeout
2503    /// - If the timeout fires before the activity completes → returns timeout error (no retry)
2504    /// - If the activity fails with an error before timeout → retry according to policy
2505    ///
2506    /// # Example
2507    ///
2508    /// ```rust,no_run
2509    /// # use duroxide::{OrchestrationContext, RetryPolicy, BackoffStrategy};
2510    /// # use std::time::Duration;
2511    /// # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
2512    /// // Simple retry with defaults (no timeout)
2513    /// let result = ctx.schedule_activity_with_retry(
2514    ///     "CallAPI",
2515    ///     "request",
2516    ///     RetryPolicy::new(3),
2517    /// ).await?;
2518    ///
2519    /// // Retry with per-attempt timeout and custom backoff
2520    /// let result = ctx.schedule_activity_with_retry(
2521    ///     "CallAPI",
2522    ///     "request",
2523    ///     RetryPolicy::new(5)
2524    ///         .with_timeout(Duration::from_secs(30)) // 30s per attempt
2525    ///         .with_backoff(BackoffStrategy::Fixed { delay: Duration::from_secs(1) }),
2526    /// ).await?;
2527    /// # Ok(())
2528    /// # }
2529    /// ```
2530    ///
2531    /// # Errors
2532    ///
2533    /// Returns an error if all retry attempts fail or if a timeout occurs (timeouts are not retried).
2534    pub async fn schedule_activity_with_retry(
2535        &self,
2536        name: impl Into<String>,
2537        input: impl Into<String>,
2538        policy: RetryPolicy,
2539    ) -> Result<String, String> {
2540        let name = name.into();
2541        let input = input.into();
2542        let mut last_error = String::new();
2543
2544        for attempt in 1..=policy.max_attempts {
2545            // Each attempt: optionally race against per-attempt timeout
2546            let activity_result = if let Some(timeout) = policy.timeout {
2547                // Race activity vs per-attempt timeout
2548                let deadline = async {
2549                    self.schedule_timer(timeout).await;
2550                    Err::<String, String>("timeout: activity timed out".to_string())
2551                };
2552                let activity = self.schedule_activity(&name, &input);
2553
2554                match self.select2(activity, deadline).await {
2555                    Either2::First(result) => result,
2556                    Either2::Second(Err(e)) => {
2557                        // Timeout fired - exit immediately, no retry for timeouts
2558                        return Err(e);
2559                    }
2560                    Either2::Second(Ok(_)) => unreachable!(),
2561                }
2562            } else {
2563                // No timeout - just await the activity
2564                self.schedule_activity(&name, &input).await
2565            };
2566
2567            match activity_result {
2568                Ok(result) => return Ok(result),
2569                Err(e) => {
2570                    // Activity failed with error - apply retry policy
2571                    last_error = e.clone();
2572                    if attempt < policy.max_attempts {
2573                        self.trace(
2574                            "warn",
2575                            format!(
2576                                "Activity '{}' attempt {}/{} failed: {}. Retrying...",
2577                                name, attempt, policy.max_attempts, e
2578                            ),
2579                        );
2580                        let delay = policy.delay_for_attempt(attempt);
2581                        if !delay.is_zero() {
2582                            self.schedule_timer(delay).await;
2583                        }
2584                    }
2585                }
2586            }
2587        }
2588        Err(last_error)
2589    }
2590
2591    /// Typed variant of `schedule_activity_with_retry`.
2592    ///
2593    /// Serializes input once and deserializes the successful result.
2594    ///
2595    /// # Errors
2596    ///
2597    /// Returns an error if all retry attempts fail, if a timeout occurs, if input serialization fails, or if result deserialization fails.
2598    pub async fn schedule_activity_with_retry_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned>(
2599        &self,
2600        name: impl Into<String>,
2601        input: &In,
2602        policy: RetryPolicy,
2603    ) -> Result<Out, String> {
2604        let payload = crate::_typed_codec::Json::encode(input).expect("encode");
2605        let result = self.schedule_activity_with_retry(name, payload, policy).await?;
2606        crate::_typed_codec::Json::decode::<Out>(&result)
2607    }
2608
2609    /// Schedule a detached orchestration with an explicit instance id.
2610    /// The runtime will prefix this with the parent instance to ensure global uniqueness.
2611    pub fn schedule_orchestration(
2612        &self,
2613        name: impl Into<String>,
2614        instance: impl Into<String>,
2615        input: impl Into<String>,
2616    ) {
2617        let name: String = name.into();
2618        let instance: String = instance.into();
2619        let input: String = input.into();
2620        let mut inner = self.inner.lock().unwrap();
2621
2622        let _ = inner.emit_action(Action::StartOrchestrationDetached {
2623            scheduling_event_id: 0, // Will be assigned by replay engine
2624            name,
2625            version: None,
2626            instance,
2627            input,
2628        });
2629    }
2630
2631    pub fn schedule_orchestration_typed<In: serde::Serialize>(
2632        &self,
2633        name: impl Into<String>,
2634        instance: impl Into<String>,
2635        input: &In,
2636    ) {
2637        let payload = crate::_typed_codec::Json::encode(input).expect("encode");
2638        self.schedule_orchestration(name, instance, payload)
2639    }
2640
2641    /// Versioned detached orchestration start (string I/O). If `version` is None, registry policy is used for the child.
2642    pub fn schedule_orchestration_versioned(
2643        &self,
2644        name: impl Into<String>,
2645        version: Option<String>,
2646        instance: impl Into<String>,
2647        input: impl Into<String>,
2648    ) {
2649        let name: String = name.into();
2650        let instance: String = instance.into();
2651        let input: String = input.into();
2652        let mut inner = self.inner.lock().unwrap();
2653
2654        let _ = inner.emit_action(Action::StartOrchestrationDetached {
2655            scheduling_event_id: 0, // Will be assigned by replay engine
2656            name,
2657            version,
2658            instance,
2659            input,
2660        });
2661    }
2662
2663    pub fn schedule_orchestration_versioned_typed<In: serde::Serialize>(
2664        &self,
2665        name: impl Into<String>,
2666        version: Option<String>,
2667        instance: impl Into<String>,
2668        input: &In,
2669    ) {
2670        let payload = crate::_typed_codec::Json::encode(input).expect("encode");
2671        self.schedule_orchestration_versioned(name, version, instance, payload)
2672    }
2673}
2674
2675// Aggregate future machinery lives below (OrchestrationContext helpers)
2676
2677impl OrchestrationContext {
2678    // =========================================================================
2679    // Core scheduling methods - return DurableFuture with cancellation support
2680    // =========================================================================
2681
2682    /// Schedule an activity and return a cancellation-aware future.
2683    ///
2684    /// Returns a [`DurableFuture`] that supports cancellation on drop. If the future
2685    /// is dropped without completing (e.g., as a select loser), the activity will be
2686    /// cancelled via lock stealing.
2687    ///
2688    /// # Example
2689    ///
2690    /// ```rust,no_run
2691    /// # use duroxide::OrchestrationContext;
2692    /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
2693    /// // Fan-out to multiple activities
2694    /// let f1 = ctx.schedule_activity("Process", "A");
2695    /// let f2 = ctx.schedule_activity("Process", "B");
2696    /// let results = ctx.join(vec![f1, f2]).await;
2697    /// # Ok("done".to_string())
2698    /// # }
2699    /// ```
2700    pub fn schedule_activity(
2701        &self,
2702        name: impl Into<String>,
2703        input: impl Into<String>,
2704    ) -> DurableFuture<Result<String, String>> {
2705        self.schedule_activity_internal(name, input, None)
2706    }
2707
2708    /// Typed version of schedule_activity that serializes input and deserializes output.
2709    ///
2710    /// # Errors
2711    ///
2712    /// Returns an error if the activity fails or if the output cannot be deserialized.
2713    pub fn schedule_activity_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned>(
2714        &self,
2715        name: impl Into<String>,
2716        input: &In,
2717    ) -> impl Future<Output = Result<Out, String>> {
2718        let payload = crate::_typed_codec::Json::encode(input).expect("encode");
2719        let fut = self.schedule_activity(name, payload);
2720        async move {
2721            let s = fut.await?;
2722            crate::_typed_codec::Json::decode::<Out>(&s)
2723        }
2724    }
2725
2726    /// Schedule an activity routed to the worker owning the given session.
2727    ///
2728    /// If no worker owns the session, any worker can claim it on first fetch.
2729    /// Once claimed, all subsequent activities with the same `session_id` route
2730    /// to the claiming worker until the session unpins (idle timeout or worker death).
2731    ///
2732    /// # Example
2733    ///
2734    /// ```rust,no_run
2735    /// # use duroxide::OrchestrationContext;
2736    /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
2737    /// let session_id = ctx.new_guid().await?;
2738    /// let result = ctx.schedule_activity_on_session("run_turn", "input", &session_id).await?;
2739    /// # Ok(result)
2740    /// # }
2741    /// ```
2742    pub fn schedule_activity_on_session(
2743        &self,
2744        name: impl Into<String>,
2745        input: impl Into<String>,
2746        session_id: impl Into<String>,
2747    ) -> DurableFuture<Result<String, String>> {
2748        self.schedule_activity_internal(name, input, Some(session_id.into()))
2749    }
2750
2751    /// Typed version of schedule_activity_on_session that serializes input and deserializes output.
2752    ///
2753    /// # Errors
2754    ///
2755    /// Returns an error if the activity fails or if the output cannot be deserialized.
2756    pub fn schedule_activity_on_session_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned>(
2757        &self,
2758        name: impl Into<String>,
2759        input: &In,
2760        session_id: impl Into<String>,
2761    ) -> impl Future<Output = Result<Out, String>> {
2762        let payload = crate::_typed_codec::Json::encode(input).expect("encode");
2763        let fut = self.schedule_activity_on_session(name, payload, session_id);
2764        async move {
2765            let s = fut.await?;
2766            crate::_typed_codec::Json::decode::<Out>(&s)
2767        }
2768    }
2769
2770    /// Internal implementation for activity scheduling.
2771    fn schedule_activity_internal(
2772        &self,
2773        name: impl Into<String>,
2774        input: impl Into<String>,
2775        session_id: Option<String>,
2776    ) -> DurableFuture<Result<String, String>> {
2777        let name: String = name.into();
2778        let input: String = input.into();
2779
2780        let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
2781
2782        let token = inner.emit_action(Action::CallActivity {
2783            scheduling_event_id: 0, // Will be assigned by replay engine
2784            name: name.clone(),
2785            input: input.clone(),
2786            session_id,
2787        });
2788        drop(inner);
2789
2790        let ctx = self.clone();
2791        let inner_future = std::future::poll_fn(move |_cx| {
2792            let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
2793            if let Some(result) = inner.get_result(token) {
2794                match result {
2795                    CompletionResult::ActivityOk(s) => Poll::Ready(Ok(s.clone())),
2796                    CompletionResult::ActivityErr(e) => Poll::Ready(Err(e.clone())),
2797                    _ => Poll::Pending, // Wrong result type, keep waiting
2798                }
2799            } else {
2800                Poll::Pending
2801            }
2802        });
2803
2804        DurableFuture::new(token, ScheduleKind::Activity { name }, self.clone(), inner_future)
2805    }
2806
2807    /// Schedule a timer and return a cancellation-aware future.
2808    ///
2809    /// Timers are virtual constructs - dropping the future is a no-op since there's
2810    /// no external state to cancel. However, wrapping in `DurableFuture` maintains
2811    /// API consistency.
2812    pub fn schedule_timer(&self, delay: std::time::Duration) -> DurableFuture<()> {
2813        let delay_ms = delay.as_millis() as u64;
2814
2815        let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
2816
2817        let now = inner.now_ms();
2818        let fire_at_ms = now.saturating_add(delay_ms);
2819        let token = inner.emit_action(Action::CreateTimer {
2820            scheduling_event_id: 0,
2821            fire_at_ms,
2822        });
2823        drop(inner);
2824
2825        let ctx = self.clone();
2826        let inner_future = std::future::poll_fn(move |_cx| {
2827            let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
2828            if let Some(result) = inner.get_result(token) {
2829                match result {
2830                    CompletionResult::TimerFired => Poll::Ready(()),
2831                    _ => Poll::Pending,
2832                }
2833            } else {
2834                Poll::Pending
2835            }
2836        });
2837
2838        DurableFuture::new(token, ScheduleKind::Timer, self.clone(), inner_future)
2839    }
2840
2841    /// Subscribe to an external event and return a cancellation-aware future.
2842    ///
2843    /// External waits are virtual constructs - dropping the future is a no-op since
2844    /// there's no external state to cancel. However, wrapping in `DurableFuture`
2845    /// maintains API consistency.
2846    pub fn schedule_wait(&self, name: impl Into<String>) -> DurableFuture<String> {
2847        let name: String = name.into();
2848
2849        let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
2850
2851        let token = inner.emit_action(Action::WaitExternal {
2852            scheduling_event_id: 0,
2853            name: name.clone(),
2854        });
2855        drop(inner);
2856
2857        let ctx = self.clone();
2858        let inner_future = std::future::poll_fn(move |_cx| {
2859            let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
2860            // Only resolve once the token has been bound to a persisted schedule_id.
2861            // External events arriving before subscription binding are currently unsupported.
2862            if let Some(bound_id) = inner.get_bound_schedule_id(token)
2863                && let Some(data) = inner.get_external_event(bound_id)
2864            {
2865                return Poll::Ready(data.clone());
2866            }
2867            Poll::Pending
2868        });
2869
2870        DurableFuture::new(
2871            token,
2872            ScheduleKind::ExternalWait { event_name: name },
2873            self.clone(),
2874            inner_future,
2875        )
2876    }
2877
2878    /// Typed version of schedule_wait.
2879    pub fn schedule_wait_typed<T: serde::de::DeserializeOwned>(
2880        &self,
2881        name: impl Into<String>,
2882    ) -> impl Future<Output = T> {
2883        let fut = self.schedule_wait(name);
2884        async move {
2885            let s = fut.await;
2886            crate::_typed_codec::Json::decode::<T>(&s).expect("decode")
2887        }
2888    }
2889
2890    /// V2: Subscribe to an external event with topic-based pub/sub matching.
2891    ///
2892    /// Same semantics as `schedule_wait`, but matches on both `name` AND `topic`.
2893    /// Feature-gated for replay engine extensibility verification.
2894    #[cfg(feature = "replay-version-test")]
2895    pub fn schedule_wait2(&self, name: impl Into<String>, topic: impl Into<String>) -> DurableFuture<String> {
2896        let name: String = name.into();
2897        let topic: String = topic.into();
2898
2899        let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
2900
2901        let token = inner.emit_action(Action::WaitExternal2 {
2902            scheduling_event_id: 0,
2903            name: name.clone(),
2904            topic: topic.clone(),
2905        });
2906        drop(inner);
2907
2908        let ctx = self.clone();
2909        let inner_future = std::future::poll_fn(move |_cx| {
2910            let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
2911            if let Some(bound_id) = inner.get_bound_schedule_id(token)
2912                && let Some(data) = inner.get_external_event2(bound_id)
2913            {
2914                return Poll::Ready(data.clone());
2915            }
2916            Poll::Pending
2917        });
2918
2919        DurableFuture::new(
2920            token,
2921            ScheduleKind::ExternalWait { event_name: name },
2922            self.clone(),
2923            inner_future,
2924        )
2925    }
2926
2927    /// Schedule a sub-orchestration and return a cancellation-aware future.
2928    ///
2929    /// The child instance ID is auto-generated from the event ID with a parent prefix.
2930    ///
2931    /// Returns a [`DurableFuture`] that supports cancellation on drop. If the future
2932    /// is dropped without completing, a `CancelInstance` work item will be enqueued
2933    /// for the child orchestration.
2934    pub fn schedule_sub_orchestration(
2935        &self,
2936        name: impl Into<String>,
2937        input: impl Into<String>,
2938    ) -> DurableFuture<Result<String, String>> {
2939        self.schedule_sub_orchestration_versioned_with_id_internal(name, None, None, input)
2940    }
2941
2942    /// Schedule a sub-orchestration with an explicit instance ID.
2943    ///
2944    /// The provided `instance` value is used exactly as the child instance ID,
2945    /// without any parent prefix. Use this when you need to control the exact
2946    /// instance ID for the sub-orchestration.
2947    ///
2948    /// For auto-generated instance IDs, use [`schedule_sub_orchestration`] instead.
2949    pub fn schedule_sub_orchestration_with_id(
2950        &self,
2951        name: impl Into<String>,
2952        instance: impl Into<String>,
2953        input: impl Into<String>,
2954    ) -> DurableFuture<Result<String, String>> {
2955        self.schedule_sub_orchestration_versioned_with_id_internal(name, None, Some(instance.into()), input)
2956    }
2957
2958    /// Schedule a versioned sub-orchestration.
2959    ///
2960    /// If `version` is `Some`, that specific version is used.
2961    /// If `version` is `None`, the registry's policy (e.g., Latest) is used.
2962    pub fn schedule_sub_orchestration_versioned(
2963        &self,
2964        name: impl Into<String>,
2965        version: Option<String>,
2966        input: impl Into<String>,
2967    ) -> DurableFuture<Result<String, String>> {
2968        self.schedule_sub_orchestration_versioned_with_id_internal(name, version, None, input)
2969    }
2970
2971    /// Schedule a versioned sub-orchestration with an explicit instance ID.
2972    ///
2973    /// The provided `instance` value is used exactly as the child instance ID,
2974    /// without any parent prefix.
2975    ///
2976    /// Returns a [`DurableFuture`] that supports cancellation on drop. If the future
2977    /// is dropped without completing, a `CancelInstance` work item will be enqueued
2978    /// for the child orchestration.
2979    pub fn schedule_sub_orchestration_versioned_with_id(
2980        &self,
2981        name: impl Into<String>,
2982        version: Option<String>,
2983        instance: impl Into<String>,
2984        input: impl Into<String>,
2985    ) -> DurableFuture<Result<String, String>> {
2986        self.schedule_sub_orchestration_versioned_with_id_internal(name, version, Some(instance.into()), input)
2987    }
2988
2989    /// Internal implementation for sub-orchestration scheduling.
2990    ///
2991    /// If `instance` is `Some`, it's an explicit ID (no parent prefix).
2992    /// If `instance` is `None`, auto-generate from event ID (with parent prefix).
2993    fn schedule_sub_orchestration_versioned_with_id_internal(
2994        &self,
2995        name: impl Into<String>,
2996        version: Option<String>,
2997        instance: Option<String>,
2998        input: impl Into<String>,
2999    ) -> DurableFuture<Result<String, String>> {
3000        let name: String = name.into();
3001        let input: String = input.into();
3002
3003        let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
3004
3005        // For explicit instance IDs, use them as-is (no parent prefix will be added).
3006        // For auto-generated, use placeholder that will be replaced with SUB_ORCH_AUTO_PREFIX + event_id
3007        // and parent prefix will be added.
3008        let action_instance = match &instance {
3009            Some(explicit_id) => explicit_id.clone(),
3010            None => format!("{}{}", SUB_ORCH_PENDING_PREFIX, inner.next_token + 1),
3011        };
3012        let token = inner.emit_action(Action::StartSubOrchestration {
3013            scheduling_event_id: 0,
3014            name: name.clone(),
3015            version,
3016            instance: action_instance.clone(),
3017            input: input.clone(),
3018        });
3019        drop(inner);
3020
3021        let ctx = self.clone();
3022        let inner_future = std::future::poll_fn(move |_cx| {
3023            let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
3024            if let Some(result) = inner.get_result(token) {
3025                match result {
3026                    CompletionResult::SubOrchOk(s) => Poll::Ready(Ok(s.clone())),
3027                    CompletionResult::SubOrchErr(e) => Poll::Ready(Err(e.clone())),
3028                    _ => Poll::Pending,
3029                }
3030            } else {
3031                Poll::Pending
3032            }
3033        });
3034
3035        // For cancellation, we store the token. The consumption path will look up
3036        // the resolved instance ID from the sub_orchestration_instances mapping.
3037        DurableFuture::new(
3038            token,
3039            ScheduleKind::SubOrchestration { token },
3040            self.clone(),
3041            inner_future,
3042        )
3043    }
3044
3045    /// Typed version of schedule_sub_orchestration.
3046    ///
3047    /// # Errors
3048    ///
3049    /// Returns an error if the sub-orchestration fails or if the output cannot be deserialized.
3050    pub fn schedule_sub_orchestration_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned>(
3051        &self,
3052        name: impl Into<String>,
3053        input: &In,
3054    ) -> impl Future<Output = Result<Out, String>> {
3055        let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3056        let fut = self.schedule_sub_orchestration(name, payload);
3057        async move {
3058            let s = fut.await?;
3059            crate::_typed_codec::Json::decode::<Out>(&s)
3060        }
3061    }
3062
3063    /// Typed version of schedule_sub_orchestration_with_id.
3064    ///
3065    /// # Errors
3066    ///
3067    /// Returns an error if the sub-orchestration fails or if the output cannot be deserialized.
3068    pub fn schedule_sub_orchestration_with_id_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned>(
3069        &self,
3070        name: impl Into<String>,
3071        instance: impl Into<String>,
3072        input: &In,
3073    ) -> impl Future<Output = Result<Out, String>> {
3074        let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3075        let fut = self.schedule_sub_orchestration_with_id(name, instance, payload);
3076        async move {
3077            let s = fut.await?;
3078            crate::_typed_codec::Json::decode::<Out>(&s)
3079        }
3080    }
3081
3082    /// Await all futures concurrently using `futures::future::join_all`.
3083    /// Works with any `Future` type.
3084    pub async fn join<T, F>(&self, futures: Vec<F>) -> Vec<T>
3085    where
3086        F: Future<Output = T>,
3087    {
3088        ::futures::future::join_all(futures).await
3089    }
3090
3091    /// Simplified join for exactly 2 futures (convenience method).
3092    pub async fn join2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> (T1, T2)
3093    where
3094        F1: Future<Output = T1>,
3095        F2: Future<Output = T2>,
3096    {
3097        ::futures::future::join(f1, f2).await
3098    }
3099
3100    /// Simplified join for exactly 3 futures (convenience method).
3101    pub async fn join3<T1, T2, T3, F1, F2, F3>(&self, f1: F1, f2: F2, f3: F3) -> (T1, T2, T3)
3102    where
3103        F1: Future<Output = T1>,
3104        F2: Future<Output = T2>,
3105        F3: Future<Output = T3>,
3106    {
3107        ::futures::future::join3(f1, f2, f3).await
3108    }
3109
3110    /// Simplified select over 2 futures: returns the result of whichever completes first.
3111    /// Select over 2 futures with potentially different output types.
3112    ///
3113    /// Returns `Either2::First(result)` if first future wins, `Either2::Second(result)` if second wins.
3114    /// Uses futures::select_biased! for determinism (first branch polled first).
3115    ///
3116    /// # Example: Activity with timeout
3117    /// ```rust,no_run
3118    /// # use duroxide::{OrchestrationContext, Either2};
3119    /// # use std::time::Duration;
3120    /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
3121    /// let work = ctx.schedule_activity("SlowWork", "input");
3122    /// let timeout = ctx.schedule_timer(Duration::from_secs(30));
3123    ///
3124    /// match ctx.select2(work, timeout).await {
3125    ///     Either2::First(result) => result,
3126    ///     Either2::Second(()) => Err("Operation timed out".to_string()),
3127    /// }
3128    /// # }
3129    /// ```
3130    pub async fn select2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> Either2<T1, T2>
3131    where
3132        F1: Future<Output = T1>,
3133        F2: Future<Output = T2>,
3134    {
3135        use ::futures::FutureExt;
3136        let mut f1 = std::pin::pin!(f1.fuse());
3137        let mut f2 = std::pin::pin!(f2.fuse());
3138        ::futures::select_biased! {
3139            result = f1 => Either2::First(result),
3140            result = f2 => Either2::Second(result),
3141        }
3142    }
3143
3144    /// Select over 3 futures with potentially different output types.
3145    ///
3146    /// Returns `Either3::First/Second/Third(result)` depending on which future completes first.
3147    /// Uses futures::select_biased! for determinism (earlier branches polled first).
3148    pub async fn select3<T1, T2, T3, F1, F2, F3>(&self, f1: F1, f2: F2, f3: F3) -> Either3<T1, T2, T3>
3149    where
3150        F1: Future<Output = T1>,
3151        F2: Future<Output = T2>,
3152        F3: Future<Output = T3>,
3153    {
3154        use ::futures::FutureExt;
3155        let mut f1 = std::pin::pin!(f1.fuse());
3156        let mut f2 = std::pin::pin!(f2.fuse());
3157        let mut f3 = std::pin::pin!(f3.fuse());
3158        ::futures::select_biased! {
3159            result = f1 => Either3::First(result),
3160            result = f2 => Either3::Second(result),
3161            result = f3 => Either3::Third(result),
3162        }
3163    }
3164}