Skip to main content

duroxide/
lib.rs

1//! # Duroxide: Durable execution framework in Rust
2//!
//! Duroxide is a framework for building reliable, long-running, code-based workflows that can survive
4//! failures and restarts. For a deep dive into how durable execution works, see the
5//! [Durable Futures Internals](https://github.com/affandar/duroxide/blob/main/docs/durable-futures-internals.md) documentation.
6//!
7//! ## Quick Start
8//!
9//! ```rust,no_run
10//! use duroxide::providers::sqlite::SqliteProvider;
11//! use duroxide::runtime::registry::ActivityRegistry;
12//! use duroxide::runtime::{self};
13//! use duroxide::{ActivityContext, OrchestrationContext, OrchestrationRegistry, Client};
14//! use std::sync::Arc;
15//!
16//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
17//! // 1. Create a storage provider
18//! let store = Arc::new(SqliteProvider::new("sqlite:./data.db", None).await.unwrap());
19//!
20//! // 2. Register activities (your business logic)
21//! let activities = ActivityRegistry::builder()
22//!     .register("Greet", |_ctx: ActivityContext, name: String| async move {
23//!         Ok(format!("Hello, {}!", name))
24//!     })
25//!     .build();
26//!
27//! // 3. Define your orchestration
28//! let orchestration = |ctx: OrchestrationContext, name: String| async move {
29//!     let greeting = ctx.schedule_activity("Greet", name).await?;
30
31// Mutex poisoning indicates a panic in another thread - a critical error.
32// All expect()/unwrap() calls on mutex locks in this module are intentional:
33// poisoned mutexes should panic as they indicate corrupted state.
34#![allow(clippy::expect_used)]
35#![allow(clippy::unwrap_used)]
36// Arc::clone() vs .clone() is a style preference - we use .clone() for brevity
37#![allow(clippy::clone_on_ref_ptr)]
38//!     Ok(greeting)
39//! };
40//!
41//! // 4. Register and start the runtime
42//! let orchestrations = OrchestrationRegistry::builder()
43//!     .register("HelloWorld", orchestration)
44//!     .build();
45//!
46//! let rt = runtime::Runtime::start_with_store(
47//!     store.clone(), activities, orchestrations
48//! ).await;
49//!
50//! // 5. Create a client and start an orchestration instance
51//! let client = Client::new(store.clone());
52//! client.start_orchestration("inst-1", "HelloWorld", "World").await?;
53//! let result = client.wait_for_orchestration("inst-1", std::time::Duration::from_secs(5)).await
54//!     .map_err(|e| format!("Wait error: {:?}", e))?;
55//! # Ok(())
56//! # }
57//! ```
58//!
59//! ## Key Concepts
60//!
61//! - **Orchestrations**: Long-running workflows written as async functions (coordination logic)
62//! - **Activities**: Single-purpose work units (can do anything - DB, API, polling, etc.)
63//!   - Supports long-running activities via automatic lock renewal (minutes to hours)
//! - **Timers**: Use `ctx.schedule_timer(duration)` for orchestration-level delays and timeouts
65//! - **Deterministic Replay**: Orchestrations are replayed from history to ensure consistency
66//! - **Durable Futures**: Composable futures for activities, timers, and external events
67//! - **ContinueAsNew (Multi-Execution)**: An orchestration can end the current execution and
68//!   immediately start a new one with fresh input. Each execution has its own isolated history
69//!   that starts with `OrchestrationStarted { event_id: 1 }`.
70//!
71//! ## ⚠️ Important: Orchestrations vs Activities
72//!
73//! **Orchestrations = Coordination (control flow, business logic)**
74//! **Activities = Execution (single-purpose work units)**
75//!
76//! ```rust,no_run
77//! # use duroxide::OrchestrationContext;
78//! # use std::time::Duration;
79//! # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
80//! // ✅ CORRECT: Orchestration-level delay using timer
81//! ctx.schedule_timer(Duration::from_secs(5)).await;  // Wait 5 seconds
82//!
83//! // ✅ ALSO CORRECT: Activity can poll/sleep as part of its work
84//! // Example: Activity that provisions a VM and polls for readiness
85//! // activities.register("ProvisionVM", |config| async move {
86//! //     let vm = create_vm(config).await?;
87//! //     while !vm_ready(&vm).await {
88//! //         tokio::time::sleep(Duration::from_secs(5)).await;  // ✅ OK - part of provisioning
89//! //     }
90//! //     Ok(vm.id)
91//! // });
92//!
93//! // ❌ WRONG: Activity that ONLY sleeps (use timer instead)
94//! // ctx.schedule_activity("Sleep5Seconds", "").await;
95//! # Ok(())
96//! # }
97//! ```
98//!
99//! **Put in Activities (single-purpose execution units):**
100//! - Database operations
101//! - API calls (can include retries/polling)
102//! - Data transformations
103//! - File I/O
104//! - VM provisioning (with internal polling)
105//!
106//! **Put in Orchestrations (coordination and business logic):**
107//! - Control flow (if/else, match, loops)
108//! - Business decisions
109//! - Multi-step workflows
110//! - Error handling and compensation
111//! - Timeouts and deadlines (use timers)
112//! - Waiting for external events
113//!
114//! ## ContinueAsNew (Multi-Execution) Semantics
115//!
116//! ContinueAsNew (CAN) allows an orchestration to end its current execution and start a new
117//! one with fresh input (useful for loops, pagination, long-running workflows).
118//!
119//! - Orchestration calls `ctx.continue_as_new(new_input)`
120//! - Runtime stamps `OrchestrationContinuedAsNew` in the CURRENT execution's history
121//! - Runtime enqueues a `WorkItem::ContinueAsNew`
122//! - When processing that work item, the runtime starts a NEW execution with:
123//!   - `execution_id = previous_execution_id + 1`
124//!   - `existing_history = []` (fresh history)
125//!   - `OrchestrationStarted { event_id: 1, input = new_input }` is stamped automatically
126//! - Each execution's history is independent; `duroxide::Client::read_execution_history(instance, id)`
127//!   returns events for that execution only
128//!
129//! Provider responsibilities are strictly storage-level (see below). The runtime owns all
130//! orchestration semantics, including execution boundaries and starting the new execution.
131//!
132//! ## Provider Responsibilities (At a Glance)
133//!
134//! Providers are pure storage abstractions. The runtime computes orchestration semantics
135//! and passes explicit instructions to the provider.
136//!
137//! - `fetch_orchestration_item()`
138//!   - Return a locked batch of work for ONE instance
139//!   - Include full history for the CURRENT `execution_id`
140//!   - Do NOT create/synthesize new executions here (even for ContinueAsNew)
141//!
142//! - `ack_orchestration_item(lock_token, execution_id, history_delta, ..., metadata)`
143//!   - Atomic commit of one orchestration turn
144//!   - Idempotently `INSERT OR IGNORE` execution row for the explicit `execution_id`
145//!   - `UPDATE instances.current_execution_id = MAX(current_execution_id, execution_id)`
146//!   - Append `history_delta` to the specified execution
147//!   - Update `executions.status` and `executions.output` from `metadata` (no event inspection)
148//!
149//! - Worker/Timer queues
150//!   - Peek-lock semantics (dequeue with lock token; ack by deleting)
151//!   - Automatic lock renewal for long-running activities (no configuration needed)
152//!   - Orchestrator, Worker, Timer queues are independent but committed atomically with history
153//!
154//! See `docs/provider-implementation-guide.md` and `src/providers/sqlite.rs` for a complete,
155//! production-grade provider implementation.
156//!
157//! ## Simplified API
158//!
159//! All schedule methods return typed futures that can be awaited directly:
160//!
161//! ```rust,no_run
162//! # use duroxide::OrchestrationContext;
163//! # use std::time::Duration;
164//! # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
165//! // Activities return Result<String, String>
166//! let result = ctx.schedule_activity("Task", "input").await?;
167//!
168//! // Timers return ()
169//! ctx.schedule_timer(Duration::from_secs(5)).await;
170//!
171//! // External events return String
172//! let event = ctx.schedule_wait("Event").await;
173//!
174//! // Sub-orchestrations return Result<String, String>
175//! let sub_result = ctx.schedule_sub_orchestration("Sub", "input").await?;
176//! # Ok(())
177//! # }
178//! ```
179//!
180//! ## Common Patterns
181//!
182//! ### Function Chaining
183//! ```rust,no_run
184//! # use duroxide::OrchestrationContext;
185//! async fn chain_example(ctx: OrchestrationContext) -> Result<String, String> {
186//!     let step1 = ctx.schedule_activity("Step1", "input").await?;
187//!     let step2 = ctx.schedule_activity("Step2", &step1).await?;
188//!     Ok(step2)
189//! }
190//! ```
191//!
192//! ### Fan-Out/Fan-In
193//! ```rust,no_run
194//! # use duroxide::OrchestrationContext;
195//! async fn fanout_example(ctx: OrchestrationContext) -> Vec<String> {
196//!     let futures = vec![
197//!         ctx.schedule_activity("Process", "item1"),
198//!         ctx.schedule_activity("Process", "item2"),
199//!         ctx.schedule_activity("Process", "item3"),
200//!     ];
201//!     let results = ctx.join(futures).await;
202//!     results.into_iter().filter_map(|r| r.ok()).collect()
203//! }
204//! ```
205//!
206//! ### Human-in-the-Loop (Timeout Pattern)
207//! ```rust,no_run
208//! # use duroxide::{OrchestrationContext, Either2};
209//! # use std::time::Duration;
210//! async fn approval_example(ctx: OrchestrationContext) -> String {
211//!     let timeout = ctx.schedule_timer(Duration::from_secs(30));
212//!     let approval = ctx.schedule_wait("ApprovalEvent");
213//!     
214//!     match ctx.select2(approval, timeout).await {
215//!         Either2::First(data) => data,
216//!         Either2::Second(()) => "timeout".to_string(),
217//!     }
218//! }
219//! ```
220//!
221//! ### Delays and Timeouts
222//! ```rust,no_run
223//! # use duroxide::{OrchestrationContext, Either2};
224//! # use std::time::Duration;
225//! async fn delay_example(ctx: OrchestrationContext) -> Result<String, String> {
226//!     // Use timer for orchestration-level delays
227//!     ctx.schedule_timer(Duration::from_secs(5)).await;
228//!     
229//!     // Process after delay
230//!     let result = ctx.schedule_activity("ProcessData", "input").await?;
231//!     Ok(result)
232//! }
233//!
234//! async fn timeout_example(ctx: OrchestrationContext) -> Result<String, String> {
235//!     // Race work against timeout
236//!     let work = ctx.schedule_activity("SlowOperation", "input");
237//!     let timeout = ctx.schedule_timer(Duration::from_secs(30));
238//!     
239//!     match ctx.select2(work, timeout).await {
240//!         Either2::First(result) => result,
241//!         Either2::Second(()) => Err("Operation timed out".to_string()),
242//!     }
243//! }
244//! ```
245//!
246//! ### Fan-Out/Fan-In with Error Handling
247//! ```rust,no_run
248//! # use duroxide::OrchestrationContext;
249//! async fn fanout_with_errors(ctx: OrchestrationContext, items: Vec<String>) -> Result<Vec<String>, String> {
250//!     // Schedule all work in parallel
251//!     let futures: Vec<_> = items.iter()
252//!         .map(|item| ctx.schedule_activity("ProcessItem", item.clone()))
253//!         .collect();
254//!     
255//!     // Wait for all to complete (deterministic order preserved)
256//!     let results = ctx.join(futures).await;
257//!     
258//!     // Process results with error handling
259//!     let mut successes = Vec::new();
260//!     for result in results {
261//!         match result {
262//!             Ok(value) => successes.push(value),
263//!             Err(e) => {
264//!                 // Log error but continue processing other items
265//!                 ctx.trace_error(format!("Item processing failed: {e}"));
266//!             }
267//!         }
268//!     }
269//!     
270//!     Ok(successes)
271//! }
272//! ```
273//!
274//! ### Retry Pattern
275//! ```rust,no_run
276//! # use duroxide::{OrchestrationContext, RetryPolicy, BackoffStrategy};
277//! # use std::time::Duration;
278//! async fn retry_example(ctx: OrchestrationContext) -> Result<String, String> {
279//!     // Retry with linear backoff: 5 attempts, delay increases linearly (1s, 2s, 3s, 4s)
280//!     let result = ctx.schedule_activity_with_retry(
281//!         "UnreliableOperation",
282//!         "input",
283//!         RetryPolicy::new(5)
284//!             .with_backoff(BackoffStrategy::Linear {
285//!                 base: Duration::from_secs(1),
286//!                 max: Duration::from_secs(10),
287//!             }),
288//!     ).await?;
289//!     
290//!     Ok(result)
291//! }
292//! ```
293//!
294//! ## Examples
295//!
296//! See the `examples/` directory for complete, runnable examples:
297//! - `hello_world.rs` - Basic orchestration setup
298//! - `fan_out_fan_in.rs` - Parallel processing pattern with error handling
299//! - `timers_and_events.rs` - Human-in-the-loop workflows with timeouts
300//! - `delays_and_timeouts.rs` - Correct usage of timers for delays and timeouts
301//! - `with_observability.rs` - Using observability features (tracing, metrics)
302//! - `metrics_cli.rs` - Querying system metrics via CLI
303//!
304//! Run examples with: `cargo run --example <name>`
305//!
306//! ## Architecture
307//!
308//! This crate provides:
309//! - **Public data model**: `Event`, `Action` for history and decisions
310//! - **Orchestration driver**: `run_turn`, `run_turn_with`, and `Executor`
311//! - **OrchestrationContext**: Schedule activities, timers, and external events
312//! - **Deterministic futures**: `schedule_*()` return standard `Future`s that can be composed with `join`/`select`
313//! - **Runtime**: In-process execution engine with dispatchers and workers
314//! - **Providers**: Pluggable storage backends (filesystem, in-memory)
315//!
316//! ### End-to-End System Architecture
317//!
318//! ```text
319//! +-------------------------------------------------------------------------+
320//! |                           Application Layer                             |
321//! +-------------------------------------------------------------------------+
322//! |                                                                         |
323//! |  +--------------+         +------------------------------------+        |
324//! |  |    Client    |-------->|  start_orchestration()             |        |
325//! |  |              |         |  raise_event()                     |        |
326//! |  |              |         |  wait_for_orchestration()          |        |
327//! |  +--------------+         +------------------------------------+        |
328//! |                                                                         |
329//! +-------------------------------------------------------------------------+
330//!                                    |
331//!                                    v
332//! +-------------------------------------------------------------------------+
333//! |                            Runtime Layer                                |
334//! +-------------------------------------------------------------------------+
335//! |                                                                         |
336//! |  +-------------------------------------------------------------------+  |
337//! |  |                         Runtime                                   |  |
338//! |  |  +----------------------+         +----------------------+        |  |
339//! |  |  | Orchestration        |         | Work                 |        |  |
340//! |  |  | Dispatcher           |         | Dispatcher           |        |  |
341//! |  |  | (N concurrent)       |         | (N concurrent)       |        |  |
342//! |  |  +----------+-----------+         +----------+-----------+        |  |
343//! |  |             |                                |                    |  |
344//! |  |             | Processes turns                | Executes activities|  |
345//! |  |             |                                |                    |  |
346//! |  +-------------+--------------------------------+--------------------+  |
347//! |                |                                |                       |
348//! |  +-------------v--------------------------------v--------------------+  |
349//! |  |  OrchestrationRegistry: maps names -> orchestration handlers     |  |
350//! |  +-------------------------------------------------------------------+  |
351//! |                                                                         |
352//! |  +-------------------------------------------------------------------+  |
353//! |  |  ActivityRegistry: maps names -> activity handlers               |  |
354//! |  +-------------------------------------------------------------------+  |
355//! |                                                                         |
356//! +-------------------------------------------------------------------------+
357//!                |                                |
358//!                | Fetches work items             | Fetches work items
359//!                | (peek-lock)                    | (peek-lock)
360//!                v                                v
361//! +-------------------------------------------------------------------------+
362//! |                          Provider Layer                                 |
363//! +-------------------------------------------------------------------------+
364//! |                                                                         |
365//! |  +----------------------------+    +----------------------------+       |
366//! |  |  Orchestrator Queue        |    |  Worker Queue              |       |
367//! |  |  - StartOrchestration      |    |  - ActivityExecute         |       |
368//! |  |  - ActivityCompleted       |    |                            |       |
369//! |  |  - ActivityFailed          |    |                            |       |
370//! |  |  - TimerFired (delayed)    |    |                            |       |
371//! |  |  - ExternalRaised          |    |                            |       |
372//! |  |  - ContinueAsNew           |    |                            |       |
373//! |  +----------------------------+    +----------------------------+       |
374//! |                                                                         |
375//! |  +-------------------------------------------------------------------+  |
376//! |  |                     Provider (Storage)                            |  |
377//! |  |  - History (Events per instance/execution)                        |  |
378//! |  |  - Instance metadata                                              |  |
379//! |  |  - Execution metadata                                             |  |
380//! |  |  - Instance locks (peek-lock semantics)                           |  |
381//! |  |  - Queue management (enqueue/dequeue with visibility)             |  |
382//! |  +-------------------------------------------------------------------+  |
383//! |                                                                         |
384//! |  +-------------------------------------------------------------------+  |
385//! |  |                Storage Backend (SQLite, etc.)                     |  |
386//! |  +-------------------------------------------------------------------+  |
387//! |                                                                         |
388//! +-------------------------------------------------------------------------+
//! ```
//!
390//! ### Execution Flow
391//!
392//! 1. **Client** starts orchestration → enqueues `StartOrchestration` to orchestrator queue
393//! 2. **OrchestrationDispatcher** fetches work item (peek-lock), loads history from Provider
394//! 3. **Runtime** calls user's orchestration function with `OrchestrationContext`
395//! 4. **Orchestration** schedules activities/timers → Runtime appends `Event`s to history
396//! 5. **Runtime** enqueues `ActivityExecute` to worker queue, `TimerFired` (delayed) to orchestrator queue
397//! 6. **WorkDispatcher** fetches activity work item, executes via `ActivityRegistry`
398//! 7. **Activity** completes → enqueues `ActivityCompleted`/`ActivityFailed` to orchestrator queue
399//! 8. **OrchestrationDispatcher** processes completion → next orchestration turn
400//! 9. **Runtime** atomically commits history + queue changes via `ack_orchestration_item()`
401//!
402//! All operations are deterministic and replayable from history.
403use std::future::Future;
404use std::pin::Pin;
405use std::sync::{Arc, Mutex};
406use std::task::{Context, Poll};
407
408// Public orchestration primitives and executor
409
410pub mod client;
411pub mod runtime;
412// Re-export descriptor type for public API ergonomics
413pub use runtime::OrchestrationDescriptor;
414pub mod providers;
415
416#[cfg(feature = "provider-test")]
417pub mod provider_validations;
418
419#[cfg(feature = "provider-test")]
420pub mod provider_validation;
421
422#[cfg(feature = "provider-test")]
423pub mod provider_stress_tests;
424
425#[cfg(feature = "provider-test")]
426pub mod provider_stress_test;
427
428// Re-export key runtime types for convenience
429pub use client::{Client, ClientError};
430pub use runtime::{
431    OrchestrationHandler, OrchestrationRegistry, OrchestrationRegistryBuilder, OrchestrationStatus, RuntimeOptions,
432};
433
434// Re-export management types for convenience
435pub use providers::{
436    ExecutionInfo, InstanceInfo, ProviderAdmin, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
437    SystemMetrics, TagFilter,
438};
439
440// Re-export capability filtering types
441pub use providers::{DispatcherCapabilityFilter, SemverRange, current_build_version};
442
443// Re-export deletion/pruning types for Client API users
444pub use providers::{DeleteInstanceResult, InstanceFilter, InstanceTree, PruneOptions, PruneResult};
445
446// Type aliases for improved readability and maintainability
447/// Shared reference to a Provider implementation
448pub type ProviderRef = Arc<dyn providers::Provider>;
449
450/// Shared reference to an OrchestrationHandler
451pub type OrchestrationHandlerRef = Arc<dyn runtime::OrchestrationHandler>;
452
453// ============================================================================
454// Heterogeneous Select Result Types
455// ============================================================================
456
457// ============================================================================
458// Schedule Kind and DurableFuture (Cancellation Support)
459// ============================================================================
460
/// Identifies the kind of scheduled work for cancellation purposes.
///
/// When a `DurableFuture` is dropped without completing, the runtime uses
/// this discriminator to determine how to cancel the underlying work:
/// - **Activity**: Lock stealing via provider (DELETE from worker_queue)
/// - **Timer**: No-op (virtual construct, no external state)
/// - **ExternalWait**: No-op (virtual construct, no external state)
/// - **SubOrchestration**: Enqueue `CancelInstance` work item for child
///
/// The string payloads (`name`, `event_name`) are carried for debugging and
/// logging; `SubOrchestration` carries the schedule token instead.
// NOTE(review): the list above does not say how a dropped `QueueDequeue` is
// cancelled — confirm against the runtime's cancellation handling and document it.
#[derive(Debug, Clone)]
pub enum ScheduleKind {
    /// A scheduled activity execution
    Activity {
        /// Activity name for debugging/logging
        name: String,
    },
    /// A durable timer
    Timer,
    /// Waiting for an external event
    ExternalWait {
        /// Event name for debugging/logging
        event_name: String,
    },
    /// Waiting for a persistent external event (mailbox semantics)
    QueueDequeue {
        /// Event name for debugging/logging
        event_name: String,
    },
    /// A sub-orchestration
    SubOrchestration {
        /// Token for this schedule (used to look up resolved instance ID)
        token: u64,
    },
}
494
/// A wrapper around scheduled futures that supports cancellation on drop.
///
/// When a `DurableFuture` is dropped without completing (e.g., as a select loser,
/// or when going out of scope without being awaited), the underlying scheduled work
/// is cancelled:
///
/// - **Activities**: Lock stealing via provider (removes from worker queue)
/// - **Sub-orchestrations**: `CancelInstance` work item enqueued for child
/// - **Timers/External waits**: No-op (virtual constructs with no external state)
///
/// # Examples
///
/// ```rust,no_run
/// # use duroxide::OrchestrationContext;
/// # use std::time::Duration;
/// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
/// // Activity scheduled - if timer wins, activity gets cancelled
/// let activity = ctx.schedule_activity("SlowWork", "input");
/// let timeout = ctx.schedule_timer(Duration::from_secs(5));
///
/// match ctx.select2(activity, timeout).await {
///     duroxide::Either2::First(result) => result,
///     duroxide::Either2::Second(()) => Err("Timed out - activity cancelled".to_string()),
/// }
/// # }
/// ```
///
/// # Drop Semantics
///
/// Unlike regular Rust futures which are inert on drop, `DurableFuture` has
/// meaningful drop semantics similar to `File` (closes on drop) or `MutexGuard`
/// (releases lock on drop). This is intentional - we want unobserved scheduled
/// work to be cancelled rather than leaked.
///
/// Concretely, the `Drop` impl reports the token to the owning context via
/// `mark_token_cancelled` unless `poll` has already observed `Poll::Ready`.
///
/// **Note:** Using `std::mem::forget()` on a `DurableFuture` will bypass
/// cancellation, causing the scheduled work to run but its result to be lost.
pub struct DurableFuture<T> {
    /// Token assigned at creation (before schedule_id is known).
    /// This is the identifier handed to the context when the future is cancelled.
    token: u64,
    /// What kind of schedule this represents (passed along on cancellation)
    kind: ScheduleKind,
    /// Reference to context for cancellation registration
    ctx: OrchestrationContext,
    /// Whether the future has completed (to skip cancellation).
    /// Set to `true` by `poll` when the inner future yields `Poll::Ready`.
    completed: bool,
    /// The underlying future, boxed and type-erased; it must be `Send`
    inner: std::pin::Pin<Box<dyn Future<Output = T> + Send>>,
}
543
544impl<T: Send + 'static> DurableFuture<T> {
545    /// Create a new `DurableFuture` wrapping an inner future.
546    fn new(
547        token: u64,
548        kind: ScheduleKind,
549        ctx: OrchestrationContext,
550        inner: impl Future<Output = T> + Send + 'static,
551    ) -> Self {
552        Self {
553            token,
554            kind,
555            ctx,
556            completed: false,
557            inner: Box::pin(inner),
558        }
559    }
560
561    /// Transform the output of this `DurableFuture` while preserving its
562    /// identity for cancellation, `join`, and `select` composability.
563    ///
564    /// The mapping function runs synchronously after the underlying future
565    /// completes. Cancellation semantics are fully preserved: dropping the
566    /// returned `DurableFuture` cancels the original scheduled work.
567    ///
568    /// # Example
569    ///
570    /// ```rust,no_run
571    /// # use duroxide::OrchestrationContext;
572    /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
573    /// // Map an activity result to extract a field
574    /// let length = ctx.schedule_activity("Greet", "World")
575    ///     .map(|r| r.map(|s| s.len().to_string()));
576    /// let len_result = length.await?;
577    /// # Ok(len_result)
578    /// # }
579    /// ```
580    pub fn map<U: Send + 'static>(self, f: impl FnOnce(T) -> U + Send + 'static) -> DurableFuture<U> {
581        // Transfer ownership of cancellation guard to the new DurableFuture.
582        // We must defuse `self` so its Drop doesn't fire cancellation.
583        let token = self.token;
584        let kind = self.kind.clone();
585        let ctx = self.ctx.clone();
586
587        // Defuse: take the inner future out without running Drop.
588        let inner = unsafe {
589            let inner = std::ptr::read(&self.inner);
590            // Prevent Drop from running on the original (would cancel the token)
591            std::mem::forget(self);
592            inner
593        };
594
595        let mapped = async move {
596            let value = inner.await;
597            f(value)
598        };
599
600        DurableFuture::new(token, kind, ctx, mapped)
601    }
602
603    /// Set a routing tag on this scheduled activity.
604    ///
605    /// Tags direct activities to specialized workers. A worker configured with
606    /// [`crate::providers::TagFilter::tags`]`(["gpu"])` will only process activities
607    /// tagged `"gpu"`.
608    ///
609    /// This method uses **mutate-after-emit**: the action has already been emitted to
610    /// the context's action list, and `with_tag` reaches back to modify it in place.
611    /// This is safe because actions are not drained until the replay engine polls
612    /// the orchestration future (which hasn't happened yet — we're still inside the
613    /// user's `async` block).
614    ///
615    /// # Panics
616    ///
617    /// Panics if called on a non-activity `DurableFuture` (e.g., timer or sub-orchestration).
618    ///
619    /// # Example
620    ///
621    /// ```rust,no_run
622    /// # use duroxide::OrchestrationContext;
623    /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
624    /// let result = ctx.schedule_activity("CompileRelease", "repo-url")
625    ///     .with_tag("build-machine")
626    ///     .await?;
627    /// # Ok(result)
628    /// # }
629    /// ```
630    pub fn with_tag(self, tag: impl Into<String>) -> Self {
631        match &self.kind {
632            ScheduleKind::Activity { .. } => {}
633            other => panic!("with_tag() can only be called on activity futures, got {:?}", other),
634        }
635
636        let tag_value = tag.into();
637        let mut inner = self.ctx.inner.lock().expect("Mutex should not be poisoned");
638        // Find the action with our token and set its tag.
639        // The token must exist: it was just emitted by schedule_activity_internal
640        // and emitted_actions is only drained when the replay engine polls
641        // (which hasn't happened yet — we're still in the user's async block).
642        let mut found = false;
643        for (token, action) in inner.emitted_actions.iter_mut() {
644            if *token == self.token {
645                match action {
646                    Action::CallActivity { tag, .. } => {
647                        *tag = Some(tag_value);
648                    }
649                    _ => panic!("Token matched but action is not CallActivity"),
650                }
651                found = true;
652                break;
653            }
654        }
655        assert!(
656            found,
657            "with_tag(): token {} not found in emitted_actions — actions were drained before with_tag() was called",
658            self.token
659        );
660        drop(inner);
661        self
662    }
663}
664
665impl<T> Future for DurableFuture<T> {
666    type Output = T;
667
668    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<T> {
669        match self.inner.as_mut().poll(cx) {
670            Poll::Ready(value) => {
671                self.completed = true;
672                Poll::Ready(value)
673            }
674            Poll::Pending => Poll::Pending,
675        }
676    }
677}
678
679impl<T> Drop for DurableFuture<T> {
680    fn drop(&mut self) {
681        if !self.completed {
682            // Future dropped without completing - trigger cancellation.
683            // Note: During dehydration (TurnResult::Continue), the orchestration future
684            // is dropped after collect_cancelled_from_context() has already run, so these
685            // cancellations go into a context that's about to be dropped. This is safe
686            // because the next turn creates a fresh context.
687            self.ctx.mark_token_cancelled(self.token, &self.kind);
688        }
689    }
690}
691
// DurableFuture is Send if T is Send (inner is already Send-boxed)
//
// SAFETY: NOTE(review): this manual impl overrides the compiler's auto-trait
// analysis, so it is only sound if *every* field is safe to move across
// threads, not just `inner`. The impls in this file also touch `ctx`, `kind`,
// `token` and `completed`; the struct definition is outside this view, so
// confirm that `ctx` and `kind` are themselves `Send` (or otherwise justify
// why sending them is sound) and record that reasoning here.
unsafe impl<T: Send> Send for DurableFuture<T> {}
694
/// Outcome of `select2`: tells which of two raced futures finished first and
/// carries that future's output.
///
/// The two sides may have different output types:
/// ```rust,no_run
/// # use duroxide::{OrchestrationContext, Either2};
/// # use std::time::Duration;
/// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
/// let activity = ctx.schedule_activity("Work", "input");
/// let timeout = ctx.schedule_timer(Duration::from_secs(30));
///
/// match ctx.select2(activity, timeout).await {
///     Either2::First(result) => result,
///     Either2::Second(()) => Err("Timed out".to_string()),
/// }
/// # }
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Either2<A, B> {
    /// The first future won the race; holds its output.
    First(A),
    /// The second future won the race; holds its output.
    Second(B),
}

impl<A, B> Either2<A, B> {
    /// `true` when the first future won.
    pub fn is_first(&self) -> bool {
        match self {
            Either2::First(_) => true,
            Either2::Second(_) => false,
        }
    }

    /// `true` when the second future won.
    pub fn is_second(&self) -> bool {
        !self.is_first()
    }

    /// Zero-based position of the winner: 0 for `First`, 1 for `Second`.
    pub fn index(&self) -> usize {
        if self.is_first() { 0 } else { 1 }
    }
}

impl<T> Either2<T, T> {
    /// Collapses a homogeneous `Either2` (both sides share one type) into
    /// `(winner_index, value)`.
    ///
    /// Handy when migrating code written against the old `(usize, T)` return type.
    pub fn into_tuple(self) -> (usize, T) {
        let winner = self.index();
        match self {
            Either2::First(value) | Either2::Second(value) => (winner, value),
        }
    }
}
749
/// Outcome of `select3`: tells which of three raced futures finished first and
/// carries that future's output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Either3<A, B, C> {
    /// The first future won the race; holds its output.
    First(A),
    /// The second future won the race; holds its output.
    Second(B),
    /// The third future won the race; holds its output.
    Third(C),
}

impl<A, B, C> Either3<A, B, C> {
    /// Zero-based position of the winner: 0 for `First`, 1 for `Second`, 2 for `Third`.
    pub fn index(&self) -> usize {
        match self {
            Self::First(_) => 0,
            Self::Second(_) => 1,
            Self::Third(_) => 2,
        }
    }
}

impl<T> Either3<T, T, T> {
    /// Collapses a homogeneous `Either3` (all sides share one type) into
    /// `(winner_index, value)`.
    pub fn into_tuple(self) -> (usize, T) {
        let winner = self.index();
        match self {
            Self::First(value) | Self::Second(value) | Self::Third(value) => (winner, value),
        }
    }
}
782
// Reserved prefix for built-in system activities.
// User-registered activities cannot use names starting with this prefix.
pub(crate) const SYSCALL_ACTIVITY_PREFIX: &str = "__duroxide_syscall:";

// Built-in system activity names. Each literal below repeats the value of
// SYSCALL_ACTIVITY_PREFIX by hand rather than being derived from it, so all of
// them must be edited together if the prefix ever changes.
pub(crate) const SYSCALL_ACTIVITY_NEW_GUID: &str = "__duroxide_syscall:new_guid";
pub(crate) const SYSCALL_ACTIVITY_UTC_NOW_MS: &str = "__duroxide_syscall:utc_now_ms";
pub(crate) const SYSCALL_ACTIVITY_GET_KV_VALUE: &str = "__duroxide_syscall:get_kv_value";
791
792use crate::_typed_codec::Codec;
793// LogLevel is now defined locally in this file
794use serde::{Deserialize, Serialize};
795use std::time::{Duration as StdDuration, SystemTime, UNIX_EPOCH};
796
797// Internal codec utilities for typed I/O (kept private; public API remains ergonomic)
798mod _typed_codec {
799    use serde::{Serialize, de::DeserializeOwned};
800    use serde_json::Value;
801    pub trait Codec {
802        fn encode<T: Serialize>(v: &T) -> Result<String, String>;
803        fn decode<T: DeserializeOwned>(s: &str) -> Result<T, String>;
804    }
805    pub struct Json;
806    impl Codec for Json {
807        fn encode<T: Serialize>(v: &T) -> Result<String, String> {
808            // If the value is a JSON string, return raw content to preserve historic behavior
809            match serde_json::to_value(v) {
810                Ok(Value::String(s)) => Ok(s),
811                Ok(val) => serde_json::to_string(&val).map_err(|e| e.to_string()),
812                Err(e) => Err(e.to_string()),
813            }
814        }
815        fn decode<T: DeserializeOwned>(s: &str) -> Result<T, String> {
816            // Try parse as JSON first
817            match serde_json::from_str::<T>(s) {
818                Ok(v) => Ok(v),
819                Err(_) => {
820                    // Fallback: treat raw string as JSON string value
821                    let val = Value::String(s.to_string());
822                    serde_json::from_value(val).map_err(|e| e.to_string())
823                }
824            }
825        }
826    }
827}
828
/// Initial execution ID for new orchestration instances.
/// All orchestrations start with execution_id = 1.
pub const INITIAL_EXECUTION_ID: u64 = 1;

/// Initial event ID for new executions.
/// The first event (OrchestrationStarted) always has event_id = 1.
pub const INITIAL_EVENT_ID: u64 = 1;

// =============================================================================
// Sub-orchestration instance ID conventions
// =============================================================================

/// Prefix for auto-generated sub-orchestration instance IDs.
/// IDs starting with this prefix get the parent instance prepended by
/// `build_child_instance_id`, giving `{parent}::sub::N` (e.g. `parent-1::sub::5`).
pub const SUB_ORCH_AUTO_PREFIX: &str = "sub::";

/// Prefix for placeholder instance IDs before event ID assignment.
/// These are replaced with `sub::{event_id}` during action processing.
/// The placeholder itself begins with `SUB_ORCH_AUTO_PREFIX`, so
/// `is_auto_generated_sub_orch_id` also returns `true` for placeholder IDs.
pub(crate) const SUB_ORCH_PENDING_PREFIX: &str = "sub::pending_";
848
849/// Determine if a sub-orchestration instance ID is auto-generated (needs parent prefix).
850///
851/// Auto-generated IDs start with "sub::" and will have the parent instance prefixed
852/// to create a globally unique ID: `{parent_instance}::{child_instance}`.
853///
854/// Explicit IDs (those not starting with "sub::") are used exactly as provided.
855#[inline]
856pub fn is_auto_generated_sub_orch_id(instance: &str) -> bool {
857    instance.starts_with(SUB_ORCH_AUTO_PREFIX)
858}
859
860/// Build the full child instance ID, adding parent prefix only for auto-generated IDs.
861///
862/// - Auto-generated IDs (starting with "sub::"): `{parent}::{child}` (e.g., `parent-1::sub::5`)
863/// - Explicit IDs: used exactly as provided (e.g., `my-custom-child-id`)
864#[inline]
865pub fn build_child_instance_id(parent_instance: &str, child_instance: &str) -> String {
866    if is_auto_generated_sub_orch_id(child_instance) {
867        format!("{parent_instance}::{child_instance}")
868    } else {
869        child_instance.to_string()
870    }
871}
872
/// Structured error details for orchestration failures.
///
/// Errors are categorized into four types for proper metrics and logging:
/// - **Infrastructure**: Provider failures, data corruption (abort turn, never reach user code)
/// - **Configuration**: Deployment issues like unregistered activities, nondeterminism (abort turn)
/// - **Application**: Business logic failures (flow through normal orchestration code)
/// - **Poison**: A message that exceeded its maximum number of fetch attempts
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ErrorDetails {
    /// Infrastructure failure (provider errors, data corruption).
    /// These errors abort orchestration execution and never reach user code.
    Infrastructure {
        /// Name of the operation that failed; shown as `infrastructure:{operation}: ...`.
        operation: String,
        /// Human-readable failure description.
        message: String,
        /// Value reported by `is_retryable()` for this error.
        retryable: bool,
    },

    /// Configuration error (unregistered orchestrations/activities, nondeterminism).
    /// These errors abort orchestration execution and never reach user code.
    Configuration {
        /// Which kind of configuration problem occurred.
        kind: ConfigErrorKind,
        /// Resource the error refers to; used in the display message when `message` is `None`.
        resource: String,
        /// Optional detail text; when present it is used in the display message instead of `resource`.
        message: Option<String>,
    },

    /// Application error (business logic failures).
    /// These are the ONLY errors that orchestration code sees.
    Application {
        /// Which kind of application failure occurred.
        kind: AppErrorKind,
        /// Human-readable failure description.
        message: String,
        /// Value reported by `is_retryable()` for this error.
        retryable: bool,
    },

    /// Poison message error - message exceeded max fetch attempts.
    ///
    /// This indicates a message that repeatedly fails to process.
    /// Could be caused by:
    /// - Malformed message data causing deserialization failures
    /// - Message triggering bugs that crash the worker
    /// - Transient infrastructure issues that became permanent
    /// - Application code bugs triggered by specific input patterns
    Poison {
        /// Number of times the message was fetched
        attempt_count: u32,
        /// Maximum allowed attempts
        max_attempts: u32,
        /// Message type and identity
        message_type: PoisonMessageType,
        /// The poisoned message content (serialized JSON for debugging)
        message: String,
    },
}
924
/// Poison message type identification.
///
/// Carried by `ErrorDetails::Poison` to say what kind of message was poisoned
/// and which instance/execution it belongs to.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PoisonMessageType {
    /// Orchestration work item batch
    Orchestration { instance: String, execution_id: u64 },
    /// Activity execution
    Activity {
        /// Orchestration instance the activity belongs to.
        instance: String,
        /// Execution of that instance the activity belongs to.
        execution_id: u64,
        /// Activity name; displayed as `{activity_name}#{activity_id}`.
        activity_name: String,
        /// Activity identifier; displayed as `{activity_name}#{activity_id}`.
        activity_id: u64,
    },
    /// History deserialization failure (e.g., unknown event types from a newer duroxide version)
    FailedDeserialization {
        /// Orchestration instance whose history could not be read.
        instance: String,
        /// Execution whose history could not be read.
        execution_id: u64,
        /// Deserialization error text; appended to the display message.
        error: String,
    },
}
944
/// Configuration error kinds.
///
/// Used as the `kind` of `ErrorDetails::Configuration`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ConfigErrorKind {
    /// Nondeterminism was detected; rendered by `ErrorDetails::display_message`
    /// with a `nondeterministic` prefix.
    Nondeterminism,
}
950
/// Application error kinds.
///
/// Used as the `kind` of `ErrorDetails::Application`; it selects how
/// `ErrorDetails::display_message` renders the error.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum AppErrorKind {
    /// An activity failed; the error message is displayed verbatim.
    ActivityFailed,
    /// An orchestration failed; the error message is displayed verbatim.
    OrchestrationFailed,
    /// Displayed as `orchestration panicked: {message}`.
    Panicked,
    /// Displayed as `canceled: {reason}`.
    Cancelled { reason: String },
}
959
960impl ErrorDetails {
961    /// Get failure category for metrics/logging.
962    pub fn category(&self) -> &'static str {
963        match self {
964            ErrorDetails::Infrastructure { .. } => "infrastructure",
965            ErrorDetails::Configuration { .. } => "configuration",
966            ErrorDetails::Application { .. } => "application",
967            ErrorDetails::Poison { .. } => "poison",
968        }
969    }
970
971    /// Check if failure is retryable.
972    pub fn is_retryable(&self) -> bool {
973        match self {
974            ErrorDetails::Infrastructure { retryable, .. } => *retryable,
975            ErrorDetails::Application { retryable, .. } => *retryable,
976            ErrorDetails::Configuration { .. } => false,
977            ErrorDetails::Poison { .. } => false, // Never retryable
978        }
979    }
980
981    /// Get display message for logging/UI (backward compatible format).
982    pub fn display_message(&self) -> String {
983        match self {
984            ErrorDetails::Infrastructure { operation, message, .. } => {
985                format!("infrastructure:{operation}: {message}")
986            }
987            ErrorDetails::Configuration {
988                kind,
989                resource,
990                message,
991            } => match kind {
992                ConfigErrorKind::Nondeterminism => message
993                    .as_ref()
994                    .map(|m| format!("nondeterministic: {m}"))
995                    .unwrap_or_else(|| format!("nondeterministic in {resource}")),
996            },
997            ErrorDetails::Application { kind, message, .. } => match kind {
998                AppErrorKind::Cancelled { reason } => format!("canceled: {reason}"),
999                AppErrorKind::Panicked => format!("orchestration panicked: {message}"),
1000                _ => message.clone(),
1001            },
1002            ErrorDetails::Poison {
1003                attempt_count,
1004                max_attempts,
1005                message_type,
1006                ..
1007            } => match message_type {
1008                PoisonMessageType::Orchestration { instance, .. } => {
1009                    format!("poison: orchestration {instance} exceeded {attempt_count} attempts (max {max_attempts})")
1010                }
1011                PoisonMessageType::Activity {
1012                    activity_name,
1013                    activity_id,
1014                    ..
1015                } => {
1016                    format!(
1017                        "poison: activity {activity_name}#{activity_id} exceeded {attempt_count} attempts (max {max_attempts})"
1018                    )
1019                }
1020                PoisonMessageType::FailedDeserialization { instance, error, .. } => {
1021                    format!(
1022                        "poison: orchestration {instance} history deserialization failed after {attempt_count} attempts (max {max_attempts}): {error}"
1023                    )
1024                }
1025            },
1026        }
1027    }
1028}
1029
/// Unified event with common metadata and type-specific payload.
///
/// All events have common fields (event_id, source_event_id, instance_id, etc.)
/// plus type-specific data in the `kind` field.
///
/// Events are append-only history entries persisted by a provider and consumed during replay.
/// The `event_id` is a monotonically increasing position in history.
/// Scheduling and completion events are linked via `source_event_id`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Event {
    /// Sequential position in history (monotonically increasing per execution)
    pub event_id: u64,

    /// For completion events: references the scheduling event this completes.
    /// None for lifecycle events (OrchestrationStarted, etc.) and scheduling events.
    /// Some(id) for completion events (ActivityCompleted, TimerFired, etc.).
    pub source_event_id: Option<u64>,

    /// Instance this event belongs to.
    /// Denormalized from DB key for self-contained events.
    pub instance_id: String,

    /// Execution this event belongs to.
    /// Denormalized from DB key for self-contained events.
    pub execution_id: u64,

    /// Timestamp when event was created (milliseconds since Unix epoch).
    pub timestamp_ms: u64,

    /// Crate semver version that generated this event.
    /// Format: "0.1.0", "0.2.0", etc.
    pub duroxide_version: String,

    /// Event type and associated data.
    // Flattened: the payload's fields, including the `type` tag that
    // `EventKind` declares via `#[serde(tag = "type")]`, are serialized at the
    // same level as the common fields above rather than under a `kind` key.
    #[serde(flatten)]
    pub kind: EventKind,
}
1067
/// Event-specific payloads.
///
/// Serialized as an internally tagged enum: the variant is identified by a
/// `type` field whose value is the `#[serde(rename = ...)]` string on each
/// variant. Those strings are the persisted wire names and, for some variants
/// (the `Queue*` ones), differ from the Rust variant name.
///
/// Common fields have been extracted to the Event struct:
/// - event_id: moved to Event.event_id
/// - source_event_id: moved to Event.source_event_id (`Option<u64>`)
/// - execution_id: moved to Event.execution_id (was in 4 variants)
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum EventKind {
    /// Orchestration instance was created and started by name with input.
    /// Version is required; parent linkage is present when this is a child orchestration.
    #[serde(rename = "OrchestrationStarted")]
    OrchestrationStarted {
        name: String,
        version: String,
        input: String,
        parent_instance: Option<String>,
        // NOTE(review): presumably the event id of the parent's scheduling
        // event for this child; confirm against the code that populates it.
        parent_id: Option<u64>,
        /// Persistent events carried forward from the previous execution during continue-as-new.
        /// Present only on CAN-initiated executions for audit trail. Each tuple is (event_name, data).
        #[serde(skip_serializing_if = "Option::is_none")]
        #[serde(default)]
        carry_forward_events: Option<Vec<(String, String)>>,
        /// Custom status carried forward from the previous execution during continue-as-new.
        /// Initialized from the last `CustomStatusUpdated` event in the previous execution's history.
        #[serde(skip_serializing_if = "Option::is_none")]
        #[serde(default)]
        initial_custom_status: Option<String>,
    },

    /// Orchestration completed with a final result.
    #[serde(rename = "OrchestrationCompleted")]
    OrchestrationCompleted { output: String },

    /// Orchestration failed with a final error.
    #[serde(rename = "OrchestrationFailed")]
    OrchestrationFailed { details: ErrorDetails },

    /// Activity was scheduled.
    #[serde(rename = "ActivityScheduled")]
    ActivityScheduled {
        name: String,
        input: String,
        // Omitted from the serialized form when `None`; absent on input means `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        #[serde(default)]
        session_id: Option<String>,
        /// Routing tag for directing this activity to specific workers.
        /// `None` means default (untagged) queue.
        #[serde(skip_serializing_if = "Option::is_none")]
        #[serde(default)]
        tag: Option<String>,
    },

    /// Activity completed successfully with a result.
    #[serde(rename = "ActivityCompleted")]
    ActivityCompleted { result: String },

    /// Activity failed with error details.
    #[serde(rename = "ActivityFailed")]
    ActivityFailed { details: ErrorDetails },

    /// Cancellation was requested for an activity (best-effort; completion may still arrive).
    /// Correlates to the ActivityScheduled event via Event.source_event_id.
    #[serde(rename = "ActivityCancelRequested")]
    ActivityCancelRequested { reason: String },

    /// Timer was created and will logically fire at `fire_at_ms`.
    #[serde(rename = "TimerCreated")]
    TimerCreated { fire_at_ms: u64 },

    /// Timer fired at logical time `fire_at_ms`.
    #[serde(rename = "TimerFired")]
    TimerFired { fire_at_ms: u64 },

    /// Subscription to an external event by name was recorded.
    #[serde(rename = "ExternalSubscribed")]
    ExternalSubscribed { name: String },

    /// An external event was raised. Matched by name (no source_event_id).
    #[serde(rename = "ExternalEvent")]
    ExternalEvent { name: String, data: String },

    /// Fire-and-forget orchestration scheduling (detached).
    #[serde(rename = "OrchestrationChained")]
    OrchestrationChained {
        name: String,
        instance: String,
        input: String,
    },

    /// Sub-orchestration was scheduled with deterministic child instance id.
    #[serde(rename = "SubOrchestrationScheduled")]
    SubOrchestrationScheduled {
        name: String,
        instance: String,
        input: String,
    },

    /// Sub-orchestration completed and returned a result to the parent.
    #[serde(rename = "SubOrchestrationCompleted")]
    SubOrchestrationCompleted { result: String },

    /// Sub-orchestration failed and returned error details to the parent.
    #[serde(rename = "SubOrchestrationFailed")]
    SubOrchestrationFailed { details: ErrorDetails },

    /// Cancellation was requested for a sub-orchestration (best-effort; completion may still arrive).
    /// Correlates to the SubOrchestrationScheduled event via Event.source_event_id.
    #[serde(rename = "SubOrchestrationCancelRequested")]
    SubOrchestrationCancelRequested { reason: String },

    /// Orchestration continued as new with fresh input (terminal for this execution).
    #[serde(rename = "OrchestrationContinuedAsNew")]
    OrchestrationContinuedAsNew { input: String },

    /// Cancellation has been requested for the orchestration (terminal will follow deterministically).
    #[serde(rename = "OrchestrationCancelRequested")]
    OrchestrationCancelRequested { reason: String },

    /// An external event subscription was cancelled (e.g., lost a select2 race).
    /// Correlates to the ExternalSubscribed event via Event.source_event_id.
    /// This breadcrumb ensures deterministic replay — cancelled subscriptions
    /// are skipped during index-based matching.
    #[serde(rename = "ExternalSubscribedCancelled")]
    ExternalSubscribedCancelled { reason: String },

    /// Persistent subscription to an external event by name was recorded.
    /// Unlike positional ExternalSubscribed, persistent subscriptions use
    /// mailbox semantics: FIFO matching, no positional pairing.
    // Wire name is "ExternalSubscribedPersistent", not the Rust variant name.
    #[serde(rename = "ExternalSubscribedPersistent")]
    QueueSubscribed { name: String },

    /// A persistent external event was raised. Matched by name using FIFO
    /// mailbox semantics (not positional). Events stick around until consumed.
    // Wire name is "ExternalEventPersistent", not the Rust variant name.
    #[serde(rename = "ExternalEventPersistent")]
    QueueEventDelivered { name: String, data: String },

    /// A persistent external event subscription was cancelled (e.g., lost a select2 race).
    /// Correlates to the QueueSubscribed event via Event.source_event_id.
    /// This breadcrumb ensures correct CAN carry-forward — cancelled subscriptions
    /// are not counted as having consumed an arrival.
    // Wire name is "ExternalSubscribedPersistentCancelled", not the Rust variant name.
    #[serde(rename = "ExternalSubscribedPersistentCancelled")]
    QueueSubscriptionCancelled { reason: String },

    /// Custom status was updated via `ctx.set_custom_status()` or `ctx.reset_custom_status()`.
    /// `Some(s)` means set to `s`; `None` means cleared back to NULL.
    /// This event makes custom status changes durable and replayable.
    #[serde(rename = "CustomStatusUpdated")]
    CustomStatusUpdated { status: Option<String> },

    /// V2 subscription: includes a topic filter for pub/sub matching.
    /// Feature-gated for replay engine extensibility verification.
    #[cfg(feature = "replay-version-test")]
    #[serde(rename = "ExternalSubscribed2")]
    ExternalSubscribed2 { name: String, topic: String },

    /// V2 event: includes the actual topic it was published on.
    /// Feature-gated for replay engine extensibility verification.
    #[cfg(feature = "replay-version-test")]
    #[serde(rename = "ExternalEvent2")]
    ExternalEvent2 { name: String, topic: String, data: String },

    // === Key-Value Store Events ===
    /// A key-value pair was set via `ctx.set_kv_value()`.
    /// Fire-and-forget metadata action (like `CustomStatusUpdated`).
    #[serde(rename = "KeyValueSet")]
    KeyValueSet {
        key: String,
        value: String,
        /// Timestamp (ms since epoch) when this key was written.
        /// Set by the runtime, persisted by the provider. Defaults to 0 for pre-upgrade events.
        #[serde(default)]
        last_updated_at_ms: u64,
    },

    /// A single key was cleared via `ctx.clear_kv_value()`.
    #[serde(rename = "KeyValueCleared")]
    KeyValueCleared { key: String },

    /// All key-value pairs were cleared via `ctx.clear_all_kv_values()`.
    #[serde(rename = "KeyValuesCleared")]
    KeyValuesCleared,
}
1251
1252impl Event {
1253    /// Create a new event with common fields populated and a specific event_id.
1254    ///
1255    /// Use this when you know the event_id upfront (e.g., during replay or when
1256    /// creating events inline).
1257    pub fn with_event_id(
1258        event_id: u64,
1259        instance_id: impl Into<String>,
1260        execution_id: u64,
1261        source_event_id: Option<u64>,
1262        kind: EventKind,
1263    ) -> Self {
1264        use std::time::{SystemTime, UNIX_EPOCH};
1265        Event {
1266            event_id,
1267            source_event_id,
1268            instance_id: instance_id.into(),
1269            execution_id,
1270            timestamp_ms: SystemTime::now()
1271                .duration_since(UNIX_EPOCH)
1272                .map(|d| d.as_millis() as u64)
1273                .unwrap_or(0),
1274            duroxide_version: env!("CARGO_PKG_VERSION").to_string(),
1275            kind,
1276        }
1277    }
1278
1279    /// Create a new event with common fields populated.
1280    ///
1281    /// The event_id will be 0 and should be set by the history manager.
1282    pub fn new(
1283        instance_id: impl Into<String>,
1284        execution_id: u64,
1285        source_event_id: Option<u64>,
1286        kind: EventKind,
1287    ) -> Self {
1288        Self::with_event_id(0, instance_id, execution_id, source_event_id, kind)
1289    }
1290
1291    /// Get the event_id (position in history).
1292    #[inline]
1293    pub fn event_id(&self) -> u64 {
1294        self.event_id
1295    }
1296
1297    /// Set the event_id (used by runtime when adding events to history).
1298    #[inline]
1299    pub(crate) fn set_event_id(&mut self, id: u64) {
1300        self.event_id = id;
1301    }
1302
1303    /// Get the source_event_id if this is a completion event.
1304    /// Returns None for lifecycle and scheduling events.
1305    #[inline]
1306    pub fn source_event_id(&self) -> Option<u64> {
1307        self.source_event_id
1308    }
1309
1310    /// Check if this event is a terminal event (ends the orchestration).
1311    pub fn is_terminal(&self) -> bool {
1312        matches!(
1313            self.kind,
1314            EventKind::OrchestrationCompleted { .. }
1315                | EventKind::OrchestrationFailed { .. }
1316                | EventKind::OrchestrationContinuedAsNew { .. }
1317        )
1318    }
1319}
1320
/// Log levels for orchestration context logging.
///
/// Only `Debug` and `Clone` are derived, so values cannot be compared with
/// `==`; use `match` or `matches!` to branch on a level.
#[derive(Debug, Clone)]
pub enum LogLevel {
    /// Informational message.
    Info,
    /// Warning message.
    Warn,
    /// Error message.
    Error,
}
1328
/// Backoff strategy for computing delay between retry attempts.
///
/// Attempt numbers passed to [`BackoffStrategy::delay_for_attempt`] are
/// 1-indexed: attempt 1 is the delay after the first failure.
#[derive(Debug, Clone)]
pub enum BackoffStrategy {
    /// No delay between retries.
    None,
    /// Fixed delay between all retries.
    Fixed {
        /// Delay duration between each retry.
        delay: std::time::Duration,
    },
    /// Linear backoff: delay = base * attempt, capped at max.
    Linear {
        /// Base delay multiplied by attempt number.
        base: std::time::Duration,
        /// Maximum delay cap.
        max: std::time::Duration,
    },
    /// Exponential backoff: delay = base * multiplier^(attempt-1), capped at max.
    Exponential {
        /// Initial delay for first retry.
        base: std::time::Duration,
        /// Multiplier applied each attempt.
        multiplier: f64,
        /// Maximum delay cap.
        max: std::time::Duration,
    },
}

impl Default for BackoffStrategy {
    /// Exponential backoff starting at 100 ms, doubling each attempt, capped at 30 s.
    fn default() -> Self {
        BackoffStrategy::Exponential {
            base: std::time::Duration::from_millis(100),
            multiplier: 2.0,
            max: std::time::Duration::from_secs(30),
        }
    }
}

impl BackoffStrategy {
    /// Compute delay for given attempt (1-indexed).
    /// Attempt 1 is after first failure, so delay_for_attempt(1) is the first backoff.
    ///
    /// Never panics. For attempt 0, `Linear` yields a zero delay and
    /// `Exponential` yields the same delay as attempt 1. `Linear` and
    /// `Exponential` results never exceed their `max`.
    pub fn delay_for_attempt(&self, attempt: u32) -> std::time::Duration {
        match self {
            BackoffStrategy::None => std::time::Duration::ZERO,
            BackoffStrategy::Fixed { delay } => *delay,
            BackoffStrategy::Linear { base, max } => {
                // Saturating: an overflowing product is clamped, then capped by `max`.
                let delay = base.saturating_mul(attempt);
                std::cmp::min(delay, *max)
            }
            BackoffStrategy::Exponential { base, multiplier, max } => {
                // delay = base * multiplier^(attempt-1)
                //
                // Clamp the exponent before narrowing to i32: a bare `as i32`
                // wraps to a negative value once attempt - 1 exceeds i32::MAX,
                // which would shrink the delay instead of growing it.
                let exponent = attempt.saturating_sub(1).min(i32::MAX as u32) as i32;
                let factor = multiplier.powi(exponent);
                // Float-to-integer `as` casts saturate (infinity -> u128::MAX,
                // NaN or negative -> 0), so an overflowing factor cannot panic.
                let delay_nanos = (base.as_nanos() as f64 * factor) as u128;
                let delay = std::time::Duration::from_nanos(delay_nanos.min(u64::MAX as u128) as u64);
                std::cmp::min(delay, *max)
            }
        }
    }
}
1388
/// Retry policy for activities.
///
/// Configures automatic retry behavior including maximum attempts, backoff strategy,
/// and an optional per-attempt timeout (see the `timeout` field).
///
/// # Example
///
/// ```rust
/// use std::time::Duration;
/// use duroxide::{RetryPolicy, BackoffStrategy};
///
/// // Simple retry with defaults (3 attempts, exponential backoff)
/// let policy = RetryPolicy::new(3);
///
/// // Custom policy with timeout and fixed backoff
/// let policy = RetryPolicy::new(5)
///     .with_timeout(Duration::from_secs(30))
///     .with_backoff(BackoffStrategy::Fixed {
///         delay: Duration::from_secs(1),
///     });
/// ```
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    /// Maximum number of attempts (including initial). Must be >= 1.
    // `RetryPolicy::new` asserts this; the field is public, so values set
    // directly are not checked here.
    pub max_attempts: u32,
    /// Backoff strategy between retries.
    pub backoff: BackoffStrategy,
    /// Per-attempt timeout. If set, each activity attempt is raced against this
    /// timeout. If timeout fires, returns error immediately (no retry).
    /// Retries only occur for activity errors, not timeouts. None = no timeout.
    pub timeout: Option<std::time::Duration>,
}
1421
1422impl Default for RetryPolicy {
1423    fn default() -> Self {
1424        Self {
1425            max_attempts: 3,
1426            backoff: BackoffStrategy::default(),
1427            timeout: None,
1428        }
1429    }
1430}
1431
1432impl RetryPolicy {
1433    /// Create a new retry policy with specified max attempts and default backoff.
1434    ///
1435    /// # Panics
1436    /// Panics if `max_attempts` is 0.
1437    pub fn new(max_attempts: u32) -> Self {
1438        assert!(max_attempts >= 1, "max_attempts must be at least 1");
1439        Self {
1440            max_attempts,
1441            ..Default::default()
1442        }
1443    }
1444
1445    /// Set per-attempt timeout.
1446    ///
1447    /// Each activity attempt is raced against this timeout. If the timeout fires
1448    /// before the activity completes, returns an error immediately (no retry).
1449    /// Retries only occur for activity errors, not timeouts.
1450    pub fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
1451        self.timeout = Some(timeout);
1452        self
1453    }
1454
1455    /// Alias for `with_timeout` for backwards compatibility.
1456    #[doc(hidden)]
1457    pub fn with_total_timeout(mut self, timeout: std::time::Duration) -> Self {
1458        self.timeout = Some(timeout);
1459        self
1460    }
1461
1462    /// Set backoff strategy.
1463    pub fn with_backoff(mut self, backoff: BackoffStrategy) -> Self {
1464        self.backoff = backoff;
1465        self
1466    }
1467
1468    /// Compute delay for given attempt using the configured backoff strategy.
1469    pub fn delay_for_attempt(&self, attempt: u32) -> std::time::Duration {
1470        self.backoff.delay_for_attempt(attempt)
1471    }
1472}
1473
/// Declarative decisions produced by an orchestration turn. The host/provider
/// is responsible for materializing these into corresponding `Event`s.
///
/// Variants that schedule work carry a `scheduling_event_id`; the metadata
/// variants (`ContinueAsNew`, `UpdateCustomStatus`, and the key-value actions)
/// do not.
#[derive(Debug, Clone)]
pub enum Action {
    /// Schedule an activity invocation. `scheduling_event_id` is the event_id of the ActivityScheduled event.
    CallActivity {
        scheduling_event_id: u64,
        /// Name of the activity to invoke.
        name: String,
        /// Input payload handed to the activity.
        input: String,
        /// Optional session ID for worker affinity routing.
        /// When set, the activity is routed to the worker owning this session.
        session_id: Option<String>,
        /// Optional routing tag for directing this activity to specific workers.
        /// Set via `.with_tag()` on the returned `DurableFuture`.
        tag: Option<String>,
    },
    /// Create a timer that will fire at the specified absolute time.
    /// `scheduling_event_id` is the event_id of the TimerCreated event.
    /// `fire_at_ms` is the absolute timestamp (ms since epoch) when the timer should fire.
    CreateTimer { scheduling_event_id: u64, fire_at_ms: u64 },
    /// Subscribe to an external event by name. `scheduling_event_id` is the event_id of the ExternalSubscribed event.
    WaitExternal { scheduling_event_id: u64, name: String },
    /// Start a detached orchestration (no result routing back to parent).
    StartOrchestrationDetached {
        scheduling_event_id: u64,
        /// Name of the orchestration to start.
        name: String,
        /// Optional orchestration version; `None` leaves the choice to the host.
        /// NOTE(review): the resolution rule for `None` is not visible here — confirm.
        version: Option<String>,
        /// Instance id of the orchestration being started.
        instance: String,
        /// Input payload for the new orchestration.
        input: String,
    },
    /// Start a sub-orchestration by name and child instance id. `scheduling_event_id` is the event_id of the SubOrchestrationScheduled event.
    StartSubOrchestration {
        scheduling_event_id: u64,
        /// Name of the sub-orchestration to start.
        name: String,
        /// Optional sub-orchestration version.
        version: Option<String>,
        /// Child instance id.
        instance: String,
        /// Input payload for the sub-orchestration.
        input: String,
    },

    /// Continue the current orchestration as a new execution with new input (terminal for current execution).
    /// Optional version string selects the target orchestration version for the new execution.
    ContinueAsNew { input: String, version: Option<String> },

    /// Update the custom status of the orchestration.
    /// `Some(s)` means set to `s`; `None` means cleared back to NULL.
    UpdateCustomStatus { status: Option<String> },

    /// Subscribe to a persistent external event by name (mailbox semantics).
    /// Unlike `WaitExternal`, persistent events use FIFO matching and survive cancellation.
    DequeueEvent { scheduling_event_id: u64, name: String },

    /// V2: Subscribe to an external event with topic-based pub/sub matching.
    /// Feature-gated for replay engine extensibility verification.
    #[cfg(feature = "replay-version-test")]
    WaitExternal2 {
        scheduling_event_id: u64,
        name: String,
        topic: String,
    },

    // === Key-Value Store Actions ===
    /// Set a key-value pair on this orchestration instance.
    /// Fire-and-forget metadata action (like `UpdateCustomStatus`).
    SetKeyValue {
        key: String,
        value: String,
        /// Timestamp recorded for this write.
        /// NOTE(review): presumably milliseconds since the Unix epoch, like
        /// `fire_at_ms` — confirm against the code that emits this action.
        last_updated_at_ms: u64,
    },

    /// Clear a single key from the KV store.
    ClearKeyValue { key: String },

    /// Clear all keys from the KV store.
    ClearKeyValues,
}
1549
/// Result delivered to a durable future upon completion.
///
/// This enum represents the completion states for various durable operations.
/// Every payload-carrying variant holds its data as a `String`.
#[doc(hidden)]
#[derive(Debug, Clone)]
#[allow(dead_code)] // ExternalData is part of the design but delivered via get_external_event
pub enum CompletionResult {
    /// Activity completed successfully; carries the activity's output.
    ActivityOk(String),
    /// Activity failed; carries the error text.
    ActivityErr(String),
    /// Timer fired (no payload).
    TimerFired,
    /// Sub-orchestration completed successfully; carries its output.
    SubOrchOk(String),
    /// Sub-orchestration failed; carries the error text.
    SubOrchErr(String),
    /// External event data (NOTE: external events are delivered via `get_external_event`, not `CompletionResult`).
    ExternalData(String),
}
1570
/// Mutable state backing an orchestration context.
///
/// Groups the replay-engine bookkeeping (token allocation, emitted actions,
/// completion results, positional and queued external-event matching),
/// cancellation tracking, execution metadata, the accumulated custom status,
/// and the instance's key-value snapshot.
#[derive(Debug)]
struct CtxInner {
    /// Whether we're currently replaying history (true) or processing new events (false).
    /// True while processing baseline_history events, false after.
    /// Users can check this via `ctx.is_replaying()` to skip side effects during replay.
    is_replaying: bool,

    // === Replay Engine State ===
    /// Token counter (each schedule_*() call gets a unique token)
    next_token: u64,
    /// Emitted actions as `(token, Action)` pairs, in emission order.
    /// Token is used to correlate with schedule events during replay
    emitted_actions: Vec<(u64, Action)>,
    /// Results map: token -> completion result (populated by replay engine)
    completion_results: std::collections::HashMap<u64, CompletionResult>,
    /// Token -> schedule_id binding (set when replay engine matches action to history)
    token_bindings: std::collections::HashMap<u64, u64>,
    /// External subscriptions: schedule_id -> (name, subscription_index)
    external_subscriptions: std::collections::HashMap<u64, (String, usize)>,
    /// External arrivals: name -> list of payloads in arrival order
    external_arrivals: std::collections::HashMap<String, Vec<String>>,
    /// Next subscription index per external event name
    external_next_index: std::collections::HashMap<String, usize>,
    /// Cancelled external subscription schedule_ids (from ExternalSubscribedCancelled breadcrumbs)
    external_cancelled_subscriptions: std::collections::HashSet<u64>,

    // === Persistent External Event State (mailbox semantics) ===
    /// Persistent subscriptions as `(schedule_id, name)` pairs in binding order.
    /// Each subscription is matched FIFO with persistent arrivals.
    queue_subscriptions: Vec<(u64, String)>,
    /// Persistent arrivals: name -> list of payloads in arrival order
    queue_arrivals: std::collections::HashMap<String, Vec<String>>,
    /// Persistent subscriptions that have been cancelled (dropped without completing)
    queue_cancelled_subscriptions: std::collections::HashSet<u64>,
    /// Persistent subscriptions that have already been resolved (consumed an arrival)
    queue_resolved_subscriptions: std::collections::HashSet<u64>,

    // === V2 External Event State (feature-gated) ===
    /// V2 external subscriptions: schedule_id -> (name, topic, subscription_index)
    #[cfg(feature = "replay-version-test")]
    external2_subscriptions: std::collections::HashMap<u64, (String, String, usize)>,
    /// V2 external arrivals: (name, topic) -> list of payloads in arrival order
    #[cfg(feature = "replay-version-test")]
    external2_arrivals: std::collections::HashMap<(String, String), Vec<String>>,
    /// Next subscription index per (name, topic) pair
    #[cfg(feature = "replay-version-test")]
    external2_next_index: std::collections::HashMap<(String, String), usize>,

    /// Sub-orchestration token -> resolved instance ID mapping
    sub_orchestration_instances: std::collections::HashMap<u64, String>,

    // === Cancellation Tracking ===
    /// Tokens that have been cancelled (dropped without completing)
    cancelled_tokens: std::collections::HashSet<u64>,
    /// Cancelled token -> ScheduleKind mapping (for determining cancellation action)
    cancelled_token_kinds: std::collections::HashMap<u64, ScheduleKind>,

    // Execution metadata
    /// Execution id this context was created for.
    execution_id: u64,
    /// Orchestration instance id this context was created for.
    instance_id: String,
    /// Name of the orchestration being executed.
    orchestration_name: String,
    /// Version of the orchestration being executed.
    orchestration_version: String,
    /// Initialised to `false` by the constructor.
    /// NOTE(review): presumably toggled per poll to gate logging — the writer
    /// is outside this section; confirm.
    logging_enabled_this_poll: bool,

    /// Accumulated custom status from history events.
    /// Updated by `CustomStatusUpdated` events during replay and by `set_custom_status()` / `reset_custom_status()` calls.
    /// `None` means no custom status has been set (either never set, or cleared via `reset_custom_status()`).
    accumulated_custom_status: Option<String>,

    /// Instance-scoped key-value store (values only).
    /// Seeded from the provider's materialized `kv_store` table at fetch time,
    /// then kept current by `set_kv_value`/`clear_kv_value`/`clear_all_kv_values` calls.
    kv_state: std::collections::HashMap<String, String>,

    /// Per-key `last_updated_at_ms` timestamps, seeded from the provider snapshot.
    /// Used by `prune_kv_values_updated_before()` for deterministic pruning decisions.
    /// Keys written during the current turn are marked with `u64::MAX` (never prunable
    /// until the next turn, when their real timestamp will appear in the snapshot).
    kv_metadata: std::collections::HashMap<String, u64>,
}
1651
1652impl CtxInner {
1653    fn new(
1654        _history: Vec<Event>, // Kept for API compatibility, no longer used
1655        execution_id: u64,
1656        instance_id: String,
1657        orchestration_name: String,
1658        orchestration_version: String,
1659        _worker_id: Option<String>, // Kept for API compatibility, no longer used
1660    ) -> Self {
1661        Self {
1662            // Start in replaying state - will be set to false when we move past baseline history
1663            is_replaying: true,
1664
1665            // Replay engine state
1666            next_token: 0,
1667            emitted_actions: Vec::new(),
1668            completion_results: Default::default(),
1669            token_bindings: Default::default(),
1670            external_subscriptions: Default::default(),
1671            external_arrivals: Default::default(),
1672            external_next_index: Default::default(),
1673            external_cancelled_subscriptions: Default::default(),
1674            queue_subscriptions: Default::default(),
1675            queue_arrivals: Default::default(),
1676            queue_cancelled_subscriptions: Default::default(),
1677            queue_resolved_subscriptions: Default::default(),
1678            #[cfg(feature = "replay-version-test")]
1679            external2_subscriptions: Default::default(),
1680            #[cfg(feature = "replay-version-test")]
1681            external2_arrivals: Default::default(),
1682            #[cfg(feature = "replay-version-test")]
1683            external2_next_index: Default::default(),
1684            sub_orchestration_instances: Default::default(),
1685
1686            // Cancellation tracking
1687            cancelled_tokens: Default::default(),
1688            cancelled_token_kinds: Default::default(),
1689
1690            // Execution metadata
1691            execution_id,
1692            instance_id,
1693            orchestration_name,
1694            orchestration_version,
1695            logging_enabled_this_poll: false,
1696
1697            accumulated_custom_status: None,
1698
1699            kv_state: std::collections::HashMap::new(),
1700            kv_metadata: std::collections::HashMap::new(),
1701        }
1702    }
1703
1704    fn now_ms(&self) -> u64 {
1705        SystemTime::now()
1706            .duration_since(UNIX_EPOCH)
1707            .map(|d| d.as_millis() as u64)
1708            .unwrap_or(0)
1709    }
1710
1711    // === Replay Engine Helpers ===
1712
1713    /// Emit an action and return a token for correlation.
1714    fn emit_action(&mut self, action: Action) -> u64 {
1715        self.next_token += 1;
1716        let token = self.next_token;
1717        self.emitted_actions.push((token, action));
1718        token
1719    }
1720
1721    /// Drain all emitted actions (called by replay engine after polling).
1722    fn drain_emitted_actions(&mut self) -> Vec<(u64, Action)> {
1723        std::mem::take(&mut self.emitted_actions)
1724    }
1725
1726    /// Bind a token to a schedule_id (called by replay engine when matching action to history).
1727    fn bind_token(&mut self, token: u64, schedule_id: u64) {
1728        self.token_bindings.insert(token, schedule_id);
1729    }
1730
1731    /// Get the schedule_id bound to a token (returns None if not yet bound).
1732    fn get_bound_schedule_id(&self, token: u64) -> Option<u64> {
1733        self.token_bindings.get(&token).copied()
1734    }
1735
1736    /// Deliver a completion result for a schedule_id.
1737    fn deliver_result(&mut self, schedule_id: u64, result: CompletionResult) {
1738        // Find the token that was bound to this schedule_id
1739        for (&token, &sid) in &self.token_bindings {
1740            if sid == schedule_id {
1741                self.completion_results.insert(token, result);
1742                return;
1743            }
1744        }
1745        tracing::warn!(
1746            schedule_id,
1747            "dropping completion result with no binding (unsupported for now)"
1748        );
1749    }
1750
1751    /// Check if a result is available for a token.
1752    fn get_result(&self, token: u64) -> Option<&CompletionResult> {
1753        self.completion_results.get(&token)
1754    }
1755
1756    /// Bind an external subscription to a deterministic index.
1757    fn bind_external_subscription(&mut self, schedule_id: u64, name: &str) {
1758        let idx = self.external_next_index.entry(name.to_string()).or_insert(0);
1759        let subscription_index = *idx;
1760        *idx += 1;
1761        self.external_subscriptions
1762            .insert(schedule_id, (name.to_string(), subscription_index));
1763    }
1764
1765    /// Check if there is an active (non-cancelled) subscription slot available
1766    /// for a positional external event.
1767    ///
1768    /// Returns `true` if there is at least one active subscription for this name
1769    /// that hasn't been matched to an arrival yet. Cancelled subscriptions are
1770    /// excluded — they don't occupy arrival slots (compression skips them).
1771    fn has_pending_subscription_slot(&self, name: &str) -> bool {
1772        let active_subs = self
1773            .external_subscriptions
1774            .iter()
1775            .filter(|(sid, (n, _))| n == name && !self.external_cancelled_subscriptions.contains(sid))
1776            .count();
1777        let delivered = self.external_arrivals.get(name).map_or(0, |v| v.len());
1778        active_subs > delivered
1779    }
1780
1781    /// Deliver an external event (appends to arrival list for the name).
1782    fn deliver_external_event(&mut self, name: String, data: String) {
1783        self.external_arrivals.entry(name).or_default().push(data);
1784    }
1785
1786    /// Get external event data for a subscription (by schedule_id).
1787    ///
1788    /// Uses compressed (effective) indices that skip cancelled subscriptions:
1789    /// effective_index = raw_index - count_of_cancelled_subscriptions_with_lower_raw_index
1790    ///
1791    /// Stale events are prevented from entering the arrival list by the causal
1792    /// check in the replay loop (`has_pending_subscription_slot`), so the
1793    /// arrival array only contains events that have a matching active slot.
1794    fn get_external_event(&self, schedule_id: u64) -> Option<&String> {
1795        let (name, subscription_index) = self.external_subscriptions.get(&schedule_id)?;
1796
1797        // If this subscription is cancelled, it never resolves
1798        if self.external_cancelled_subscriptions.contains(&schedule_id) {
1799            return None;
1800        }
1801
1802        // Count cancelled subscriptions for this name with a lower raw index
1803        let cancelled_below = self
1804            .external_subscriptions
1805            .values()
1806            .filter(|(n, idx)| n == name && *idx < *subscription_index)
1807            .filter(|(_, idx)| {
1808                // Check if this subscription's schedule_id is in the cancelled set
1809                self.external_subscriptions
1810                    .iter()
1811                    .any(|(sid, (n, i))| n == name && i == idx && self.external_cancelled_subscriptions.contains(sid))
1812            })
1813            .count();
1814
1815        let effective_index = subscription_index - cancelled_below;
1816        let arrivals = self.external_arrivals.get(name)?;
1817        arrivals.get(effective_index)
1818    }
1819
1820    /// Mark an external subscription as cancelled.
1821    fn mark_external_subscription_cancelled(&mut self, schedule_id: u64) {
1822        self.external_cancelled_subscriptions.insert(schedule_id);
1823    }
1824
1825    // === Persistent External Event Methods (mailbox semantics) ===
1826
1827    /// Bind a persistent subscription.
1828    fn bind_queue_subscription(&mut self, schedule_id: u64, name: &str) {
1829        self.queue_subscriptions.push((schedule_id, name.to_string()));
1830    }
1831
1832    /// Deliver a persistent external event (appends to arrival list for the name).
1833    fn deliver_queue_message(&mut self, name: String, data: String) {
1834        self.queue_arrivals.entry(name).or_default().push(data);
1835    }
1836
1837    /// Mark a persistent subscription as cancelled (dropped without completing).
1838    fn mark_queue_subscription_cancelled(&mut self, schedule_id: u64) {
1839        self.queue_cancelled_subscriptions.insert(schedule_id);
1840    }
1841
1842    /// Get persistent event data for a subscription (by schedule_id).
1843    ///
1844    /// Uses FIFO matching: the first active (non-cancelled, non-resolved) persistent
1845    /// subscription gets the first unmatched persistent arrival for that name.
1846    fn get_queue_message(&mut self, schedule_id: u64) -> Option<String> {
1847        // Find which name this subscription is for
1848        let name = self
1849            .queue_subscriptions
1850            .iter()
1851            .find(|(sid, _)| *sid == schedule_id)
1852            .map(|(_, n)| n.clone())?;
1853
1854        // Already resolved?
1855        if self.queue_resolved_subscriptions.contains(&schedule_id) {
1856            return None;
1857        }
1858
1859        // Already cancelled?
1860        if self.queue_cancelled_subscriptions.contains(&schedule_id) {
1861            return None;
1862        }
1863
1864        // Our arrival index is exactly the number of active (non-cancelled)
1865        // subscriptions for this name that were created before us.
1866        // This guarantees strict FIFO matching regardless of poll order.
1867        let arrival_index: usize = self
1868            .queue_subscriptions
1869            .iter()
1870            .take_while(|(sid, _)| *sid != schedule_id)
1871            .filter(|(sid, n)| n == &name && !self.queue_cancelled_subscriptions.contains(sid))
1872            .count();
1873
1874        // Get arrivals for this name
1875        let arrivals = self.queue_arrivals.get(&name)?;
1876
1877        if arrival_index < arrivals.len() {
1878            // Mark as resolved
1879            self.queue_resolved_subscriptions.insert(schedule_id);
1880            Some(arrivals[arrival_index].clone())
1881        } else {
1882            None
1883        }
1884    }
1885
1886    /// Get cancelled persistent wait schedule_ids (tokens that were bound and then dropped).
1887    fn get_cancelled_queue_ids(&self) -> Vec<u64> {
1888        let mut ids = Vec::new();
1889        for &token in &self.cancelled_tokens {
1890            if let Some(kind) = self.cancelled_token_kinds.get(&token)
1891                && matches!(kind, ScheduleKind::QueueDequeue { .. })
1892                && let Some(&schedule_id) = self.token_bindings.get(&token)
1893            {
1894                ids.push(schedule_id);
1895            }
1896        }
1897        ids
1898    }
1899
1900    /// V2: Bind an external subscription with topic to a deterministic index.
1901    #[cfg(feature = "replay-version-test")]
1902    fn bind_external_subscription2(&mut self, schedule_id: u64, name: &str, topic: &str) {
1903        let key = (name.to_string(), topic.to_string());
1904        let idx = self.external2_next_index.entry(key.clone()).or_insert(0);
1905        let subscription_index = *idx;
1906        *idx += 1;
1907        self.external2_subscriptions
1908            .insert(schedule_id, (name.to_string(), topic.to_string(), subscription_index));
1909    }
1910
1911    /// V2: Deliver an external event with topic (appends to arrival list for (name, topic)).
1912    #[cfg(feature = "replay-version-test")]
1913    fn deliver_external_event2(&mut self, name: String, topic: String, data: String) {
1914        self.external2_arrivals.entry((name, topic)).or_default().push(data);
1915    }
1916
1917    /// V2: Get external event data for a topic-based subscription (by schedule_id).
1918    #[cfg(feature = "replay-version-test")]
1919    fn get_external_event2(&self, schedule_id: u64) -> Option<&String> {
1920        let (name, topic, subscription_index) = self.external2_subscriptions.get(&schedule_id)?;
1921        let arrivals = self.external2_arrivals.get(&(name.clone(), topic.clone()))?;
1922        arrivals.get(*subscription_index)
1923    }
1924
1925    // === Cancellation Helpers ===
1926
1927    /// Mark a token as cancelled (called by DurableFuture::drop).
1928    fn mark_token_cancelled(&mut self, token: u64, kind: ScheduleKind) {
1929        self.cancelled_tokens.insert(token);
1930        self.cancelled_token_kinds.insert(token, kind);
1931    }
1932
1933    /// Get cancelled activity schedule_ids (tokens that were bound and then dropped).
1934    fn get_cancelled_activity_ids(&self) -> Vec<u64> {
1935        let mut ids = Vec::new();
1936        for &token in &self.cancelled_tokens {
1937            if let Some(kind) = self.cancelled_token_kinds.get(&token)
1938                && matches!(kind, ScheduleKind::Activity { .. })
1939                && let Some(&schedule_id) = self.token_bindings.get(&token)
1940            {
1941                ids.push(schedule_id);
1942            }
1943        }
1944        ids
1945    }
1946
1947    /// Get cancelled external wait schedule_ids (tokens that were bound and then dropped).
1948    fn get_cancelled_external_wait_ids(&self) -> Vec<u64> {
1949        let mut ids = Vec::new();
1950        for &token in &self.cancelled_tokens {
1951            if let Some(kind) = self.cancelled_token_kinds.get(&token)
1952                && matches!(kind, ScheduleKind::ExternalWait { .. })
1953                && let Some(&schedule_id) = self.token_bindings.get(&token)
1954            {
1955                ids.push(schedule_id);
1956            }
1957        }
1958        ids
1959    }
1960
1961    /// Get cancelled sub-orchestration cancellations.
1962    ///
1963    /// Returns `(scheduling_event_id, child_instance_id)` for sub-orchestration futures that were
1964    /// bound (schedule_id assigned) and then dropped.
1965    fn get_cancelled_sub_orchestration_cancellations(&self) -> Vec<(u64, String)> {
1966        let mut cancels = Vec::new();
1967        for &token in &self.cancelled_tokens {
1968            if let Some(ScheduleKind::SubOrchestration { token: sub_token }) = self.cancelled_token_kinds.get(&token)
1969                && let Some(&schedule_id) = self.token_bindings.get(&token)
1970            {
1971                // Look up the resolved instance ID from our mapping
1972                if let Some(instance_id) = self.sub_orchestration_instances.get(sub_token) {
1973                    cancels.push((schedule_id, instance_id.clone()));
1974                }
1975                // If not in mapping, the action wasn't bound yet - nothing to cancel
1976            }
1977        }
1978        cancels
1979    }
1980
1981    /// Bind a sub-orchestration token to its resolved instance ID.
1982    fn bind_sub_orchestration_instance(&mut self, token: u64, instance_id: String) {
1983        self.sub_orchestration_instances.insert(token, instance_id);
1984    }
1985
1986    /// Clear cancelled tokens (called after turn completion to avoid re-processing).
1987    fn clear_cancelled_tokens(&mut self) {
1988        self.cancelled_tokens.clear();
1989        self.cancelled_token_kinds.clear();
1990    }
1991
1992    // Note: deterministic GUID generation was removed from public API.
1993}
1994
/// Context provided to activities for logging and metadata access.
///
/// Unlike [`OrchestrationContext`], activities are leaf nodes that cannot schedule new work,
/// but they often need to emit structured logs and inspect orchestration metadata. The
/// `ActivityContext` exposes the parent orchestration information and trace helpers that log
/// with full correlation fields.
///
/// # Examples
///
/// ```rust,no_run
/// # use duroxide::ActivityContext;
/// # use duroxide::runtime::registry::ActivityRegistry;
/// let activities = ActivityRegistry::builder()
///     .register("ProvisionVM", |ctx: ActivityContext, config: String| async move {
///         ctx.trace_info(format!("Provisioning VM with config: {}", config));
///
///         // Do actual work (can use sleep, HTTP, etc.)
///         let vm_id = provision_vm_internal(config).await?;
///
///         ctx.trace_info(format!("VM provisioned: {}", vm_id));
///         Ok(vm_id)
///     })
///     .build();
/// # async fn provision_vm_internal(config: String) -> Result<String, String> { Ok("vm-123".to_string()) }
/// ```
///
/// # Metadata Access
///
/// Activity context provides access to orchestration correlation metadata:
/// - `instance_id()` - Orchestration instance identifier
/// - `execution_id()` - Execution number (for ContinueAsNew scenarios)
/// - `orchestration_name()` - Parent orchestration name
/// - `orchestration_version()` - Parent orchestration version
/// - `activity_name()` - Current activity name
///
/// # Cancellation Support
///
/// Activities can respond to cancellation when their parent orchestration reaches a terminal state:
/// - `is_cancelled()` - Check if cancellation has been requested
/// - `cancelled()` - Future that completes when cancellation is requested (for use with `tokio::select!`)
/// - `cancellation_token()` - Get a clone of the token for spawned tasks
///
/// # Determinism
///
/// Activity trace helpers (`trace_info`, `trace_warn`, etc.) do **not** participate in
/// deterministic replay. They emit logs directly using [`tracing`] and should only be used for
/// diagnostic purposes.
#[derive(Clone)]
pub struct ActivityContext {
    /// Orchestration instance identifier.
    instance_id: String,
    /// Execution id within the orchestration instance.
    execution_id: u64,
    /// Parent orchestration name.
    orchestration_name: String,
    /// Parent orchestration version.
    orchestration_version: String,
    /// Name of the activity being executed.
    activity_name: String,
    /// Identifier of this activity; emitted as the `activity_id` field by the trace helpers.
    activity_id: u64,
    /// ID of the worker dispatcher processing this activity.
    worker_id: String,
    /// Optional session ID when scheduled via `schedule_activity_on_session`.
    session_id: Option<String>,
    /// Optional routing tag when scheduled via `.with_tag()`.
    tag: Option<String>,
    /// Cancellation token for cooperative cancellation.
    /// Triggered when the parent orchestration reaches a terminal state.
    cancellation_token: tokio_util::sync::CancellationToken,
    /// Provider store for accessing the Client API.
    store: std::sync::Arc<dyn crate::providers::Provider>,
}
2062
2063impl ActivityContext {
    /// Create a new activity context with a specific cancellation token.
    ///
    /// This constructor is intended for internal runtime use when the worker
    /// dispatcher needs to provide a cancellation token that can be triggered
    /// during activity execution.
    ///
    /// Every argument is stored unchanged in the field of the same name; no
    /// validation or other work is performed.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new_with_cancellation(
        instance_id: String,
        execution_id: u64,
        orchestration_name: String,
        orchestration_version: String,
        activity_name: String,
        activity_id: u64,
        worker_id: String,
        session_id: Option<String>,
        tag: Option<String>,
        cancellation_token: tokio_util::sync::CancellationToken,
        store: std::sync::Arc<dyn crate::providers::Provider>,
    ) -> Self {
        Self {
            instance_id,
            execution_id,
            orchestration_name,
            orchestration_version,
            activity_name,
            activity_id,
            worker_id,
            session_id,
            tag,
            cancellation_token,
            store,
        }
    }
2097
2098    /// Returns the orchestration instance identifier.
2099    pub fn instance_id(&self) -> &str {
2100        &self.instance_id
2101    }
2102
2103    /// Returns the execution id within the orchestration instance.
2104    pub fn execution_id(&self) -> u64 {
2105        self.execution_id
2106    }
2107
2108    /// Returns the parent orchestration name.
2109    pub fn orchestration_name(&self) -> &str {
2110        &self.orchestration_name
2111    }
2112
2113    /// Returns the parent orchestration version.
2114    pub fn orchestration_version(&self) -> &str {
2115        &self.orchestration_version
2116    }
2117
2118    /// Returns the activity name being executed.
2119    pub fn activity_name(&self) -> &str {
2120        &self.activity_name
2121    }
2122
2123    /// Returns the worker dispatcher ID processing this activity.
2124    pub fn worker_id(&self) -> &str {
2125        &self.worker_id
2126    }
2127
2128    /// Returns the session ID if this activity was scheduled via `schedule_activity_on_session`.
2129    ///
2130    /// Returns `None` for regular activities scheduled via `schedule_activity`.
2131    pub fn session_id(&self) -> Option<&str> {
2132        self.session_id.as_deref()
2133    }
2134
2135    /// Returns the routing tag if this activity was scheduled via `.with_tag()`.
2136    ///
2137    /// Returns `None` for activities scheduled without a tag.
2138    pub fn tag(&self) -> Option<&str> {
2139        self.tag.as_deref()
2140    }
2141
    /// Emit an INFO level trace entry associated with this activity.
    ///
    /// The event is logged under the `duroxide::activity` target and carries
    /// the instance id, execution id, orchestration name and version,
    /// activity name and id, and worker id as structured fields. `message` is
    /// converted to a `String` and used as the event message.
    pub fn trace_info(&self, message: impl Into<String>) {
        tracing::info!(
            target: "duroxide::activity",
            instance_id = %self.instance_id,
            execution_id = %self.execution_id,
            orchestration_name = %self.orchestration_name,
            orchestration_version = %self.orchestration_version,
            activity_name = %self.activity_name,
            activity_id = %self.activity_id,
            worker_id = %self.worker_id,
            "{}",
            message.into()
        );
    }
2157
    /// Emit a WARN level trace entry associated with this activity.
    ///
    /// The event is logged under the `duroxide::activity` target and carries
    /// the instance id, execution id, orchestration name and version,
    /// activity name and id, and worker id as structured fields. `message` is
    /// converted to a `String` and used as the event message.
    pub fn trace_warn(&self, message: impl Into<String>) {
        tracing::warn!(
            target: "duroxide::activity",
            instance_id = %self.instance_id,
            execution_id = %self.execution_id,
            orchestration_name = %self.orchestration_name,
            orchestration_version = %self.orchestration_version,
            activity_name = %self.activity_name,
            activity_id = %self.activity_id,
            worker_id = %self.worker_id,
            "{}",
            message.into()
        );
    }
2173
    /// Emit an ERROR level trace entry associated with this activity.
    ///
    /// The event is logged under the `duroxide::activity` target and carries
    /// the instance id, execution id, orchestration name and version,
    /// activity name and id, and worker id as structured fields. `message` is
    /// converted to a `String` and used as the event message.
    pub fn trace_error(&self, message: impl Into<String>) {
        tracing::error!(
            target: "duroxide::activity",
            instance_id = %self.instance_id,
            execution_id = %self.execution_id,
            orchestration_name = %self.orchestration_name,
            orchestration_version = %self.orchestration_version,
            activity_name = %self.activity_name,
            activity_id = %self.activity_id,
            worker_id = %self.worker_id,
            "{}",
            message.into()
        );
    }
2189
2190    /// Emit a DEBUG level trace entry associated with this activity.
2191    pub fn trace_debug(&self, message: impl Into<String>) {
2192        tracing::debug!(
2193            target: "duroxide::activity",
2194            instance_id = %self.instance_id,
2195            execution_id = %self.execution_id,
2196            orchestration_name = %self.orchestration_name,
2197            orchestration_version = %self.orchestration_version,
2198            activity_name = %self.activity_name,
2199            activity_id = %self.activity_id,
2200            worker_id = %self.worker_id,
2201            "{}",
2202            message.into()
2203        );
2204    }
2205
2206    // ===== Cancellation Support =====
2207
2208    /// Check if cancellation has been requested.
2209    ///
2210    /// Returns `true` if the parent orchestration has completed, failed,
2211    /// or been cancelled. Activities can use this for cooperative cancellation.
2212    ///
2213    /// # Example
2214    ///
2215    /// ```ignore
2216    /// for item in items {
2217    ///     if ctx.is_cancelled() {
2218    ///         return Err("Activity cancelled".into());
2219    ///     }
2220    ///     process(item).await;
2221    /// }
2222    /// ```
2223    pub fn is_cancelled(&self) -> bool {
2224        self.cancellation_token.is_cancelled()
2225    }
2226
2227    /// Returns a future that completes when cancellation is requested.
2228    ///
2229    /// Use with `tokio::select!` for interruptible activities. This allows
2230    /// activities to respond promptly to cancellation without polling.
2231    ///
2232    /// # Example
2233    ///
2234    /// ```ignore
2235    /// tokio::select! {
2236    ///     result = do_work() => return result,
2237    ///     _ = ctx.cancelled() => return Err("Cancelled".into()),
2238    /// }
2239    /// ```
2240    pub async fn cancelled(&self) {
2241        self.cancellation_token.cancelled().await
2242    }
2243
2244    /// Get a clone of the cancellation token for use in spawned tasks.
2245    ///
2246    /// If your activity spawns child tasks with `tokio::spawn()`, you should
2247    /// pass them this token so they can also respond to cancellation.
2248    ///
2249    /// **Important:** If you spawn additional tasks/threads and do not pass them
2250    /// the cancellation token, they may outlive the activity's cancellation/abort.
2251    /// This is user error - the runtime provides the signal but cannot guarantee
2252    /// termination of arbitrary spawned work.
2253    ///
2254    /// # Example
2255    ///
2256    /// ```ignore
2257    /// let token = ctx.cancellation_token();
2258    /// let handle = tokio::spawn(async move {
2259    ///     loop {
2260    ///         tokio::select! {
2261    ///             _ = do_work() => {}
2262    ///             _ = token.cancelled() => break,
2263    ///         }
2264    ///     }
2265    /// });
2266    /// ```
2267    pub fn cancellation_token(&self) -> tokio_util::sync::CancellationToken {
2268        self.cancellation_token.clone()
2269    }
2270
2271    /// Get a Client for management operations.
2272    ///
2273    /// This allows activities to perform management operations such as
2274    /// pruning old executions, deleting instances, or querying instance status.
2275    ///
2276    /// # Example: Self-Pruning Eternal Orchestration
2277    ///
2278    /// ```ignore
2279    /// // Activity that prunes old executions
2280    /// async fn prune_self(ctx: ActivityContext, _input: String) -> Result<String, String> {
2281    ///     let client = ctx.get_client();
2282    ///     let instance_id = ctx.instance_id();
2283    ///
2284    ///     let result = client.prune_executions(instance_id, PruneOptions {
2285    ///         keep_last: Some(1), // Keep only current execution
2286    ///         ..Default::default()
2287    ///     }).await.map_err(|e| e.to_string())?;
2288    ///
2289    ///     Ok(format!("Pruned {} executions", result.executions_deleted))
2290    /// }
2291    /// ```
2292    pub fn get_client(&self) -> crate::Client {
2293        crate::Client::new(self.store.clone())
2294    }
2295}
2296
2297impl std::fmt::Debug for ActivityContext {
2298    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2299        f.debug_struct("ActivityContext")
2300            .field("instance_id", &self.instance_id)
2301            .field("execution_id", &self.execution_id)
2302            .field("orchestration_name", &self.orchestration_name)
2303            .field("orchestration_version", &self.orchestration_version)
2304            .field("activity_name", &self.activity_name)
2305            .field("activity_id", &self.activity_id)
2306            .field("worker_id", &self.worker_id)
2307            .field("cancellation_token", &self.cancellation_token)
2308            .field("store", &"<Provider>")
2309            .finish()
2310    }
2311}
2312
/// Handle through which orchestration code and the runtime reach the state of
/// one orchestration execution.
///
/// All state lives in a `CtxInner` behind `Arc<Mutex<..>>`, so `clone()`
/// produces another handle to the same state rather than a copy of it.
#[derive(Clone)]
pub struct OrchestrationContext {
    // Shared, lock-protected state; methods lock it to read or update it.
    inner: Arc<Mutex<CtxInner>>,
}
2317
/// Future handed back by the `continue_as_new*` methods; it never completes.
///
/// Every poll yields `Poll::Pending`, so whatever follows
/// `ctx.continue_as_new(..).await` in an orchestration is never executed. The
/// runtime extracts the recorded actions before it looks at the future's state,
/// which is how the `ContinueAsNew` action still gets processed.
struct ContinueAsNewFuture;

impl Future for ContinueAsNewFuture {
    // Same shape as an orchestration's return type so the await type-checks;
    // no value of this type is ever produced.
    type Output = Result<String, String>;

    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<String, String>> {
        // Unconditionally pending: no waker is registered and no output exists.
        Poll::Pending
    }
}
2334
2335impl OrchestrationContext {
2336    /// Construct a new context from an existing history vector.
2337    ///
2338    /// # Parameters
2339    ///
2340    /// * `orchestration_name` - The name of the orchestration being executed.
2341    /// * `orchestration_version` - The semantic version string of the orchestration.
2342    /// * `worker_id` - Optional dispatcher worker ID for logging correlation.
2343    ///   - `Some(id)`: Used by runtime dispatchers to include worker_id in traces
2344    ///   - `None`: Used by standalone/test execution without runtime context
2345    pub fn new(
2346        history: Vec<Event>,
2347        execution_id: u64,
2348        instance_id: String,
2349        orchestration_name: String,
2350        orchestration_version: String,
2351        worker_id: Option<String>,
2352    ) -> Self {
2353        Self {
2354            inner: Arc::new(Mutex::new(CtxInner::new(
2355                history,
2356                execution_id,
2357                instance_id,
2358                orchestration_name,
2359                orchestration_version,
2360                worker_id,
2361            ))),
2362        }
2363    }
2364
2365    /// Check if the orchestration is currently replaying history.
2366    ///
2367    /// Returns `true` when processing events from persisted history (replay),
2368    /// and `false` when executing new logic beyond the stored history.
2369    ///
2370    /// This is useful for skipping side effects during replay, such as:
2371    /// - Logging/tracing that should only happen on first execution
2372    /// - Metrics that shouldn't be double-counted
2373    /// - External notifications that shouldn't be re-sent
2374    ///
2375    /// # Example
2376    ///
2377    /// ```rust,no_run
2378    /// # use duroxide::OrchestrationContext;
2379    /// # async fn example(ctx: OrchestrationContext) {
2380    /// if !ctx.is_replaying() {
2381    ///     // Only log on first execution, not during replay
2382    ///     println!("Starting workflow for the first time");
2383    /// }
2384    /// # }
2385    /// ```
2386    pub fn is_replaying(&self) -> bool {
2387        self.inner.lock().unwrap().is_replaying
2388    }
2389
2390    /// Set the replaying state (used by replay engine and test harness).
2391    #[doc(hidden)]
2392    pub fn set_is_replaying(&self, is_replaying: bool) {
2393        self.inner.lock().unwrap().is_replaying = is_replaying;
2394    }
2395
2396    /// Bind an external subscription to a schedule_id (used by replay engine and test harness).
2397    #[doc(hidden)]
2398    pub fn bind_external_subscription(&self, schedule_id: u64, name: &str) {
2399        self.inner.lock().unwrap().bind_external_subscription(schedule_id, name);
2400    }
2401
2402    /// Deliver an external event (used by replay engine and test harness).
2403    #[doc(hidden)]
2404    pub fn deliver_external_event(&self, name: String, data: String) {
2405        self.inner.lock().unwrap().deliver_external_event(name, data);
2406    }
2407
2408    /// Bind a persistent subscription to a schedule_id (used by replay engine).
2409    #[doc(hidden)]
2410    pub fn bind_queue_subscription(&self, schedule_id: u64, name: &str) {
2411        self.inner.lock().unwrap().bind_queue_subscription(schedule_id, name);
2412    }
2413
2414    /// Deliver a persistent external event (used by replay engine).
2415    #[doc(hidden)]
2416    pub fn deliver_queue_message(&self, name: String, data: String) {
2417        self.inner.lock().unwrap().deliver_queue_message(name, data);
2418    }
2419
2420    /// Mark a persistent subscription as cancelled (used by replay engine).
2421    pub(crate) fn mark_queue_subscription_cancelled(&self, schedule_id: u64) {
2422        self.inner
2423            .lock()
2424            .unwrap()
2425            .mark_queue_subscription_cancelled(schedule_id);
2426    }
2427
2428    // =========================================================================
2429    // Cancellation Support (DurableFuture integration)
2430    // =========================================================================
2431
2432    /// Mark a token as cancelled (called by DurableFuture::drop).
2433    pub(crate) fn mark_token_cancelled(&self, token: u64, kind: &ScheduleKind) {
2434        self.inner.lock().unwrap().mark_token_cancelled(token, kind.clone());
2435    }
2436
2437    /// Get cancelled activity schedule_ids for this turn.
2438    pub(crate) fn get_cancelled_activity_ids(&self) -> Vec<u64> {
2439        self.inner.lock().unwrap().get_cancelled_activity_ids()
2440    }
2441
2442    /// Get cancelled external wait schedule_ids for this turn.
2443    pub(crate) fn get_cancelled_external_wait_ids(&self) -> Vec<u64> {
2444        self.inner.lock().unwrap().get_cancelled_external_wait_ids()
2445    }
2446
2447    /// Mark an external subscription as cancelled (called by replay engine).
2448    pub(crate) fn mark_external_subscription_cancelled(&self, schedule_id: u64) {
2449        self.inner
2450            .lock()
2451            .unwrap()
2452            .mark_external_subscription_cancelled(schedule_id);
2453    }
2454
2455    /// Get cancelled persistent wait schedule_ids for this turn.
2456    pub(crate) fn get_cancelled_queue_ids(&self) -> Vec<u64> {
2457        self.inner.lock().unwrap().get_cancelled_queue_ids()
2458    }
2459
2460    /// Get cancelled sub-orchestration cancellations for this turn.
2461    pub(crate) fn get_cancelled_sub_orchestration_cancellations(&self) -> Vec<(u64, String)> {
2462        self.inner
2463            .lock()
2464            .unwrap()
2465            .get_cancelled_sub_orchestration_cancellations()
2466    }
2467
2468    /// Clear cancelled tokens after processing (called by replay engine).
2469    pub(crate) fn clear_cancelled_tokens(&self) {
2470        self.inner.lock().unwrap().clear_cancelled_tokens();
2471    }
2472
2473    /// Bind a sub-orchestration token to its resolved instance ID.
2474    pub(crate) fn bind_sub_orchestration_instance(&self, token: u64, instance_id: String) {
2475        self.inner
2476            .lock()
2477            .unwrap()
2478            .bind_sub_orchestration_instance(token, instance_id);
2479    }
2480
2481    // =========================================================================
2482    // Simplified Mode Tracing (Replay-Guarded)
2483    // =========================================================================
2484    //
2485    // These trace methods use `is_replaying()` as a guard, which means:
2486    // - No history events are created for traces
2487    // - Traces only emit on first execution, not during replay
2488    // - Much simpler and more efficient than system-call-based tracing
2489
2490    /// Convenience wrapper for INFO level tracing.
2491    ///
2492    /// Logs with INFO level and includes instance context automatically.
2493    /// Only emits on first execution, not during replay.
2494    ///
2495    /// # Example
2496    ///
2497    /// ```rust,no_run
2498    /// # use duroxide::OrchestrationContext;
2499    /// # async fn example(ctx: OrchestrationContext) {
2500    /// ctx.trace_info("Processing order");
2501    /// ctx.trace_info(format!("Processing {} items", 42));
2502    /// # }
2503    /// ```
2504    pub fn trace_info(&self, message: impl Into<String>) {
2505        self.trace("INFO", message);
2506    }
2507
2508    /// Convenience wrapper for WARN level tracing.
2509    ///
2510    /// Logs with WARN level and includes instance context automatically.
2511    /// Only emits on first execution, not during replay.
2512    ///
2513    /// # Example
2514    ///
2515    /// ```rust,no_run
2516    /// # use duroxide::OrchestrationContext;
2517    /// # async fn example(ctx: OrchestrationContext) {
2518    /// ctx.trace_warn("Retrying failed operation");
2519    /// # }
2520    /// ```
2521    pub fn trace_warn(&self, message: impl Into<String>) {
2522        self.trace("WARN", message);
2523    }
2524
2525    /// Convenience wrapper for ERROR level tracing.
2526    ///
2527    /// Logs with ERROR level and includes instance context automatically.
2528    /// Only emits on first execution, not during replay.
2529    ///
2530    /// # Example
2531    ///
2532    /// ```rust,no_run
2533    /// # use duroxide::OrchestrationContext;
2534    /// # async fn example(ctx: OrchestrationContext) {
2535    /// ctx.trace_error("Payment processing failed");
2536    /// # }
2537    /// ```
2538    pub fn trace_error(&self, message: impl Into<String>) {
2539        self.trace("ERROR", message);
2540    }
2541
2542    /// Convenience wrapper for DEBUG level tracing.
2543    ///
2544    /// Logs with DEBUG level and includes instance context automatically.
2545    /// Only emits on first execution, not during replay.
2546    ///
2547    /// # Example
2548    ///
2549    /// ```rust,no_run
2550    /// # use duroxide::OrchestrationContext;
2551    /// # async fn example(ctx: OrchestrationContext) {
2552    /// ctx.trace_debug("Detailed state information");
2553    /// # }
2554    /// ```
2555    pub fn trace_debug(&self, message: impl Into<String>) {
2556        self.trace("DEBUG", message);
2557    }
2558
2559    /// Drain emitted actions.
2560    /// Returns a list of (token, Action) pairs.
2561    #[doc(hidden)]
2562    pub fn drain_emitted_actions(&self) -> Vec<(u64, Action)> {
2563        self.inner.lock().unwrap().drain_emitted_actions()
2564    }
2565
2566    /// Get a snapshot of emitted actions without draining.
2567    /// Returns a list of (token, Action) pairs.
2568    #[doc(hidden)]
2569    pub fn get_emitted_actions(&self) -> Vec<(u64, Action)> {
2570        self.inner.lock().unwrap().emitted_actions.clone()
2571    }
2572
2573    /// Bind a token to a schedule_id.
2574    #[doc(hidden)]
2575    pub fn bind_token(&self, token: u64, schedule_id: u64) {
2576        self.inner.lock().unwrap().bind_token(token, schedule_id);
2577    }
2578
2579    /// Deliver a result for a token.
2580    #[doc(hidden)]
2581    pub fn deliver_result(&self, schedule_id: u64, result: CompletionResult) {
2582        self.inner.lock().unwrap().deliver_result(schedule_id, result);
2583    }
2584
2585    /// Returns the orchestration instance identifier.
2586    ///
2587    /// This is the unique identifier for this orchestration instance, typically
2588    /// provided when starting the orchestration.
2589    ///
2590    /// # Example
2591    ///
2592    /// ```rust,no_run
2593    /// # use duroxide::OrchestrationContext;
2594    /// # async fn example(ctx: OrchestrationContext) {
2595    /// let id = ctx.instance_id();
2596    /// ctx.trace_info(format!("Processing instance: {}", id));
2597    /// # }
2598    /// ```
2599    pub fn instance_id(&self) -> String {
2600        self.inner.lock().unwrap().instance_id.clone()
2601    }
2602
2603    /// Returns the current execution ID within this orchestration instance.
2604    ///
2605    /// The execution ID increments each time `continue_as_new()` is called.
2606    /// Execution 1 is the initial execution.
2607    ///
2608    /// # Example
2609    ///
2610    /// ```rust,no_run
2611    /// # use duroxide::OrchestrationContext;
2612    /// # async fn example(ctx: OrchestrationContext) {
2613    /// let exec_id = ctx.execution_id();
2614    /// ctx.trace_info(format!("Execution #{}", exec_id));
2615    /// # }
2616    /// ```
2617    pub fn execution_id(&self) -> u64 {
2618        self.inner.lock().unwrap().execution_id
2619    }
2620
2621    /// Returns the orchestration name.
2622    ///
2623    /// This is the name registered with the orchestration registry.
2624    ///
2625    /// # Example
2626    ///
2627    /// ```rust,no_run
2628    /// # use duroxide::OrchestrationContext;
2629    /// # async fn example(ctx: OrchestrationContext) {
2630    /// let name = ctx.orchestration_name();
2631    /// ctx.trace_info(format!("Running orchestration: {}", name));
2632    /// # }
2633    /// ```
2634    pub fn orchestration_name(&self) -> String {
2635        self.inner.lock().unwrap().orchestration_name.clone()
2636    }
2637
2638    /// Returns the orchestration version.
2639    ///
2640    /// This is the semantic version string associated with the orchestration.
2641    ///
2642    /// # Example
2643    ///
2644    /// ```rust,no_run
2645    /// # use duroxide::OrchestrationContext;
2646    /// # async fn example(ctx: OrchestrationContext) {
2647    /// let version = ctx.orchestration_version();
2648    /// ctx.trace_info(format!("Version: {}", version));
2649    /// # }
2650    /// ```
2651    pub fn orchestration_version(&self) -> String {
2652        self.inner.lock().unwrap().orchestration_version.clone()
2653    }
2654
2655    // Replay-safe logging control
2656    /// Indicates whether logging is enabled for the current poll. This is
2657    /// flipped on when a decision is recorded to minimize log noise.
2658    pub fn is_logging_enabled(&self) -> bool {
2659        self.inner.lock().unwrap().logging_enabled_this_poll
2660    }
2661    // log_buffer removed - not used
2662
2663    /// Emit a structured trace entry with automatic context correlation.
2664    ///
2665    /// Creates a system call event for deterministic replay and logs to tracing.
2666    /// The log entry automatically includes correlation fields:
2667    /// - `instance_id` - The orchestration instance identifier
2668    /// - `execution_id` - The current execution number
2669    /// - `orchestration_name` - Name of the orchestration
2670    /// - `orchestration_version` - Semantic version
2671    ///
2672    /// # Determinism
2673    ///
2674    /// This method is replay-safe: logs are only emitted on first execution,
2675    /// not during replay.
2676    ///
2677    /// # Example
2678    ///
2679    /// ```rust,no_run
2680    /// # use duroxide::OrchestrationContext;
2681    /// # async fn example(ctx: OrchestrationContext) {
2682    /// ctx.trace("INFO", "Processing started");
2683    /// ctx.trace("WARN", format!("Retry attempt: {}", 3));
2684    /// ctx.trace("ERROR", "Payment validation failed");
2685    /// # }
2686    /// ```
2687    ///
2688    /// # Output
2689    ///
2690    /// ```text
2691    /// 2024-10-30T10:15:23.456Z INFO duroxide::orchestration [order-123] Processing started
2692    /// ```
2693    ///
2694    /// All logs include instance_id, execution_id, orchestration_name for correlation.
2695    pub fn trace(&self, level: impl Into<String>, message: impl Into<String>) {
2696        self.trace_internal(&level.into(), &message.into());
2697    }
2698
2699    /// Internal implementation of trace (guarded by is_replaying)
2700    fn trace_internal(&self, level: &str, message: &str) {
2701        let inner = self.inner.lock().unwrap();
2702
2703        // Only trace if not replaying
2704        if !inner.is_replaying {
2705            match level.to_uppercase().as_str() {
2706                "INFO" => tracing::info!(
2707                    target: "duroxide::orchestration",
2708                    instance_id = %inner.instance_id,
2709                    execution_id = %inner.execution_id,
2710                    orchestration_name = %inner.orchestration_name,
2711                    orchestration_version = %inner.orchestration_version,
2712                    "{}",
2713                    message
2714                ),
2715                "WARN" => tracing::warn!(
2716                    target: "duroxide::orchestration",
2717                    instance_id = %inner.instance_id,
2718                    execution_id = %inner.execution_id,
2719                    orchestration_name = %inner.orchestration_name,
2720                    orchestration_version = %inner.orchestration_version,
2721                    "{}",
2722                    message
2723                ),
2724                "ERROR" => tracing::error!(
2725                    target: "duroxide::orchestration",
2726                    instance_id = %inner.instance_id,
2727                    execution_id = %inner.execution_id,
2728                    orchestration_name = %inner.orchestration_name,
2729                    orchestration_version = %inner.orchestration_version,
2730                    "{}",
2731                    message
2732                ),
2733                "DEBUG" => tracing::debug!(
2734                    target: "duroxide::orchestration",
2735                    instance_id = %inner.instance_id,
2736                    execution_id = %inner.execution_id,
2737                    orchestration_name = %inner.orchestration_name,
2738                    orchestration_version = %inner.orchestration_version,
2739                    "{}",
2740                    message
2741                ),
2742                _ => tracing::trace!(
2743                    target: "duroxide::orchestration",
2744                    instance_id = %inner.instance_id,
2745                    execution_id = %inner.execution_id,
2746                    orchestration_name = %inner.orchestration_name,
2747                    orchestration_version = %inner.orchestration_version,
2748                    level = %level,
2749                    "{}",
2750                    message
2751                ),
2752            }
2753        }
2754    }
2755
2756    /// Generate a new deterministic GUID.
2757    ///
2758    /// This schedules a built-in activity that generates a unique identifier.
2759    /// The GUID is deterministic across replays (the same value is returned
2760    /// when the orchestration replays).
2761    ///
2762    /// # Example
2763    ///
2764    /// ```rust,no_run
2765    /// # use duroxide::OrchestrationContext;
2766    /// # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
2767    /// let guid = ctx.new_guid().await?;
2768    /// println!("Generated GUID: {}", guid);
2769    /// # Ok(())
2770    /// # }
2771    /// ```
2772    pub fn new_guid(&self) -> impl Future<Output = Result<String, String>> {
2773        self.schedule_activity(SYSCALL_ACTIVITY_NEW_GUID, "")
2774    }
2775
2776    /// Get the current UTC time.
2777    ///
2778    /// This schedules a built-in activity that returns the current time.
2779    /// The time is deterministic across replays (the same value is returned
2780    /// when the orchestration replays).
2781    ///
2782    /// # Errors
2783    ///
2784    /// Returns an error if the activity fails or if the time value cannot be parsed.
2785    ///
2786    /// # Example
2787    ///
2788    /// ```rust,no_run
2789    /// # use duroxide::OrchestrationContext;
2790    /// # use std::time::{SystemTime, Duration};
2791    /// # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
2792    /// let now = ctx.utc_now().await?;
2793    /// let deadline = now + Duration::from_secs(3600); // 1 hour from now
2794    /// # Ok(())
2795    /// # }
2796    /// ```
2797    pub fn utc_now(&self) -> impl Future<Output = Result<SystemTime, String>> {
2798        let fut = self.schedule_activity(SYSCALL_ACTIVITY_UTC_NOW_MS, "");
2799        async move {
2800            let s = fut.await?;
2801            let ms = s.parse::<u64>().map_err(|e| e.to_string())?;
2802            Ok(UNIX_EPOCH + StdDuration::from_millis(ms))
2803        }
2804    }
2805
2806    /// Continue the current execution as a new execution with fresh input.
2807    ///
2808    /// This terminates the current execution and starts a new execution with the provided input.
2809    /// Returns a future that never resolves, ensuring code after `await` is unreachable.
2810    ///
2811    /// # Example
2812    /// ```rust,no_run
2813    /// # use duroxide::OrchestrationContext;
2814    /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
2815    /// let n: u32 = 0;
2816    /// if n < 2 {
2817    ///     return ctx.continue_as_new("next_input").await; // Execution terminates here
2818    ///     // This code is unreachable - compiler will warn
2819    /// }
2820    /// Ok("completed".to_string())
2821    /// # }
2822    /// ```
2823    pub fn continue_as_new(&self, input: impl Into<String>) -> impl Future<Output = Result<String, String>> {
2824        let mut inner = self.inner.lock().unwrap();
2825        let input: String = input.into();
2826        let action = Action::ContinueAsNew { input, version: None };
2827
2828        inner.emit_action(action);
2829        ContinueAsNewFuture
2830    }
2831
2832    pub fn continue_as_new_typed<In: serde::Serialize>(
2833        &self,
2834        input: &In,
2835    ) -> impl Future<Output = Result<String, String>> {
2836        // Serialization should never fail for valid input types - if it does, it's a programming error
2837        let payload =
2838            crate::_typed_codec::Json::encode(input).expect("Serialization should never fail for valid input");
2839        self.continue_as_new(payload)
2840    }
2841
2842    /// ContinueAsNew to a specific target version (string is parsed as semver later).
2843    pub fn continue_as_new_versioned(
2844        &self,
2845        version: impl Into<String>,
2846        input: impl Into<String>,
2847    ) -> impl Future<Output = Result<String, String>> {
2848        let mut inner = self.inner.lock().unwrap();
2849        let action = Action::ContinueAsNew {
2850            input: input.into(),
2851            version: Some(version.into()),
2852        };
2853        inner.emit_action(action);
2854        ContinueAsNewFuture
2855    }
2856}
2857
/// Generate a UUID-shaped identifier (`8-4-4-4-12` lowercase hex digits).
///
/// The value combines the wall clock, a per-thread call counter and a
/// per-thread random salt. It is therefore NOT reproducible by calling this
/// function again; callers that need the same value on replay must persist the
/// result (NOTE(review): presumably what the built-in GUID activity does -
/// confirm against its implementation).
///
/// Bit layout, most significant group first:
///
/// | group | bits | content                                            |
/// |-------|------|----------------------------------------------------|
/// | 1     | 32   | timestamp bits 32..64 (nanoseconds since epoch)    |
/// | 2     | 16   | timestamp bits 16..32                              |
/// | 3     | 16   | counter bits 0..16                                 |
/// | 4     | 16   | timestamp bits 0..16                               |
/// | 5     | 48   | counter bits 16..32, then the 32-bit thread salt   |
///
/// The output is not an RFC 4122 UUID (no version/variant bits are set).
pub(crate) fn generate_guid() -> String {
    use std::cell::Cell;
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Thread-local counter for uniqueness within the same timestamp.
    thread_local! {
        static COUNTER: Cell<u32> = const { Cell::new(0) };
    }
    // Per-thread random salt so that threads (and processes) whose counters
    // line up at the same timestamp still produce different identifiers.
    thread_local! {
        static SALT: u32 = RandomState::new().build_hasher().finish() as u32;
    }

    // Nanoseconds since the epoch fit in 64 bits for several centuries, so all
    // of the timestamp's information is in the low 64 bits; the cast keeps
    // exactly those. A clock set before the epoch yields 0.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos() as u64)
        .unwrap_or(0);

    let counter = COUNTER.with(|cell| {
        let current = cell.get();
        cell.set(current.wrapping_add(1));
        current
    });
    let salt = SALT.with(|value| *value);

    // The `as u16` casts below deliberately keep only the low 16 bits.
    format!(
        "{:08x}-{:04x}-{:04x}-{:04x}-{:012x}",
        (nanos >> 32) as u32,
        (nanos >> 16) as u16,
        counter as u16,
        nanos as u16,
        (u64::from(counter >> 16) << 32) | u64::from(salt)
    )
}
2889
2890impl OrchestrationContext {
2891    /// Schedule activity with automatic retry on failure.
2892    ///
2893    /// **Retry behavior:**
2894    /// - Retries on activity **errors** up to `policy.max_attempts`
2895    /// - **Timeouts are NOT retried** - if any attempt times out, returns error immediately
2896    /// - Only application errors trigger retry logic
2897    ///
2898    /// **Timeout behavior (if `policy.total_timeout` is set):**
2899    /// - Each activity attempt is raced against the timeout
2900    /// - If the timeout fires before the activity completes → returns timeout error (no retry)
2901    /// - If the activity fails with an error before timeout → retry according to policy
2902    ///
2903    /// # Example
2904    ///
2905    /// ```rust,no_run
2906    /// # use duroxide::{OrchestrationContext, RetryPolicy, BackoffStrategy};
2907    /// # use std::time::Duration;
2908    /// # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
2909    /// // Simple retry with defaults (no timeout)
2910    /// let result = ctx.schedule_activity_with_retry(
2911    ///     "CallAPI",
2912    ///     "request",
2913    ///     RetryPolicy::new(3),
2914    /// ).await?;
2915    ///
2916    /// // Retry with per-attempt timeout and custom backoff
2917    /// let result = ctx.schedule_activity_with_retry(
2918    ///     "CallAPI",
2919    ///     "request",
2920    ///     RetryPolicy::new(5)
2921    ///         .with_timeout(Duration::from_secs(30)) // 30s per attempt
2922    ///         .with_backoff(BackoffStrategy::Fixed { delay: Duration::from_secs(1) }),
2923    /// ).await?;
2924    /// # Ok(())
2925    /// # }
2926    /// ```
2927    ///
2928    /// # Errors
2929    ///
2930    /// Returns an error if all retry attempts fail or if a timeout occurs (timeouts are not retried).
2931    pub async fn schedule_activity_with_retry(
2932        &self,
2933        name: impl Into<String>,
2934        input: impl Into<String>,
2935        policy: RetryPolicy,
2936    ) -> Result<String, String> {
2937        let name = name.into();
2938        let input = input.into();
2939        let mut last_error = String::new();
2940
2941        for attempt in 1..=policy.max_attempts {
2942            // Each attempt: optionally race against per-attempt timeout
2943            let activity_result = if let Some(timeout) = policy.timeout {
2944                // Race activity vs per-attempt timeout
2945                let deadline = async {
2946                    self.schedule_timer(timeout).await;
2947                    Err::<String, String>("timeout: activity timed out".to_string())
2948                };
2949                let activity = self.schedule_activity(&name, &input);
2950
2951                match self.select2(activity, deadline).await {
2952                    Either2::First(result) => result,
2953                    Either2::Second(Err(e)) => {
2954                        // Timeout fired - exit immediately, no retry for timeouts
2955                        return Err(e);
2956                    }
2957                    Either2::Second(Ok(_)) => unreachable!(),
2958                }
2959            } else {
2960                // No timeout - just await the activity
2961                self.schedule_activity(&name, &input).await
2962            };
2963
2964            match activity_result {
2965                Ok(result) => return Ok(result),
2966                Err(e) => {
2967                    // Activity failed with error - apply retry policy
2968                    last_error = e.clone();
2969                    if attempt < policy.max_attempts {
2970                        self.trace(
2971                            "warn",
2972                            format!(
2973                                "Activity '{}' attempt {}/{} failed: {}. Retrying...",
2974                                name, attempt, policy.max_attempts, e
2975                            ),
2976                        );
2977                        let delay = policy.delay_for_attempt(attempt);
2978                        if !delay.is_zero() {
2979                            self.schedule_timer(delay).await;
2980                        }
2981                    }
2982                }
2983            }
2984        }
2985        Err(last_error)
2986    }
2987
2988    /// Typed variant of `schedule_activity_with_retry`.
2989    ///
2990    /// Serializes input once and deserializes the successful result.
2991    ///
2992    /// # Errors
2993    ///
2994    /// Returns an error if all retry attempts fail, if a timeout occurs, if input serialization fails, or if result deserialization fails.
2995    pub async fn schedule_activity_with_retry_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned>(
2996        &self,
2997        name: impl Into<String>,
2998        input: &In,
2999        policy: RetryPolicy,
3000    ) -> Result<Out, String> {
3001        let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3002        let result = self.schedule_activity_with_retry(name, payload, policy).await?;
3003        crate::_typed_codec::Json::decode::<Out>(&result)
3004    }
3005
3006    /// Schedule an activity with automatic retry on a specific session.
3007    ///
3008    /// Combines retry semantics from [`Self::schedule_activity_with_retry`] with
3009    /// session affinity from [`Self::schedule_activity_on_session`]. All retry
3010    /// attempts are pinned to the same `session_id`, ensuring they execute on
3011    /// the same worker.
3012    ///
3013    /// # Example
3014    ///
3015    /// ```rust,no_run
3016    /// # use duroxide::{OrchestrationContext, RetryPolicy, BackoffStrategy};
3017    /// # use std::time::Duration;
3018    /// # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
3019    /// let session = ctx.new_guid().await?;
3020    /// let result = ctx.schedule_activity_with_retry_on_session(
3021    ///     "RunQuery",
3022    ///     "SELECT 1",
3023    ///     RetryPolicy::new(3)
3024    ///         .with_backoff(BackoffStrategy::Fixed { delay: Duration::from_secs(1) }),
3025    ///     &session,
3026    /// ).await?;
3027    /// # Ok(())
3028    /// # }
3029    /// ```
3030    ///
3031    /// # Errors
3032    ///
3033    /// Returns an error if all retry attempts fail or if a timeout occurs (timeouts are not retried).
3034    pub async fn schedule_activity_with_retry_on_session(
3035        &self,
3036        name: impl Into<String>,
3037        input: impl Into<String>,
3038        policy: RetryPolicy,
3039        session_id: impl Into<String>,
3040    ) -> Result<String, String> {
3041        let name = name.into();
3042        let input = input.into();
3043        let session_id = session_id.into();
3044        let mut last_error = String::new();
3045
3046        for attempt in 1..=policy.max_attempts {
3047            let activity_result = if let Some(timeout) = policy.timeout {
3048                let deadline = async {
3049                    self.schedule_timer(timeout).await;
3050                    Err::<String, String>("timeout: activity timed out".to_string())
3051                };
3052                let activity = self.schedule_activity_on_session(&name, &input, &session_id);
3053
3054                match self.select2(activity, deadline).await {
3055                    Either2::First(result) => result,
3056                    Either2::Second(Err(e)) => return Err(e),
3057                    Either2::Second(Ok(_)) => unreachable!(),
3058                }
3059            } else {
3060                self.schedule_activity_on_session(&name, &input, &session_id).await
3061            };
3062
3063            match activity_result {
3064                Ok(result) => return Ok(result),
3065                Err(e) => {
3066                    last_error = e.clone();
3067                    if attempt < policy.max_attempts {
3068                        self.trace(
3069                            "warn",
3070                            format!(
3071                                "Activity '{}' (session={}) attempt {}/{} failed: {}. Retrying...",
3072                                name, session_id, attempt, policy.max_attempts, e
3073                            ),
3074                        );
3075                        let delay = policy.delay_for_attempt(attempt);
3076                        if !delay.is_zero() {
3077                            self.schedule_timer(delay).await;
3078                        }
3079                    }
3080                }
3081            }
3082        }
3083        Err(last_error)
3084    }
3085
3086    /// Typed variant of [`Self::schedule_activity_with_retry_on_session`].
3087    ///
3088    /// Serializes input once and deserializes the successful result. All retry
3089    /// attempts are pinned to the same session.
3090    ///
3091    /// # Errors
3092    ///
3093    /// Returns an error if all retry attempts fail, if a timeout occurs, if input
3094    /// serialization fails, or if result deserialization fails.
3095    pub async fn schedule_activity_with_retry_on_session_typed<
3096        In: serde::Serialize,
3097        Out: serde::de::DeserializeOwned,
3098    >(
3099        &self,
3100        name: impl Into<String>,
3101        input: &In,
3102        policy: RetryPolicy,
3103        session_id: impl Into<String>,
3104    ) -> Result<Out, String> {
3105        let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3106        let result = self
3107            .schedule_activity_with_retry_on_session(name, payload, policy, session_id)
3108            .await?;
3109        crate::_typed_codec::Json::decode::<Out>(&result)
3110    }
3111
3112    /// Schedule a detached orchestration with an explicit instance id.
3113    /// The runtime will prefix this with the parent instance to ensure global uniqueness.
3114    pub fn schedule_orchestration(
3115        &self,
3116        name: impl Into<String>,
3117        instance: impl Into<String>,
3118        input: impl Into<String>,
3119    ) {
3120        let name: String = name.into();
3121        let instance: String = instance.into();
3122        let input: String = input.into();
3123        let mut inner = self.inner.lock().unwrap();
3124
3125        let _ = inner.emit_action(Action::StartOrchestrationDetached {
3126            scheduling_event_id: 0, // Will be assigned by replay engine
3127            name,
3128            version: None,
3129            instance,
3130            input,
3131        });
3132    }
3133
3134    pub fn schedule_orchestration_typed<In: serde::Serialize>(
3135        &self,
3136        name: impl Into<String>,
3137        instance: impl Into<String>,
3138        input: &In,
3139    ) {
3140        let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3141        self.schedule_orchestration(name, instance, payload)
3142    }
3143
3144    /// Versioned detached orchestration start (string I/O). If `version` is None, registry policy is used for the child.
3145    pub fn schedule_orchestration_versioned(
3146        &self,
3147        name: impl Into<String>,
3148        version: Option<String>,
3149        instance: impl Into<String>,
3150        input: impl Into<String>,
3151    ) {
3152        let name: String = name.into();
3153        let instance: String = instance.into();
3154        let input: String = input.into();
3155        let mut inner = self.inner.lock().unwrap();
3156
3157        let _ = inner.emit_action(Action::StartOrchestrationDetached {
3158            scheduling_event_id: 0, // Will be assigned by replay engine
3159            name,
3160            version,
3161            instance,
3162            input,
3163        });
3164    }
3165
3166    pub fn schedule_orchestration_versioned_typed<In: serde::Serialize>(
3167        &self,
3168        name: impl Into<String>,
3169        version: Option<String>,
3170        instance: impl Into<String>,
3171        input: &In,
3172    ) {
3173        let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3174        self.schedule_orchestration_versioned(name, version, instance, payload)
3175    }
3176
3177    /// Set a user-defined custom status for progress reporting.
3178    ///
3179    /// This is **not** a history event or an action — it's pure metadata that gets
3180    /// plumbed into `ExecutionMetadata` at ack time. No impact on determinism,
3181    /// no replay implications.
3182    ///
3183    /// - Call it whenever, as many times as you want within a turn
3184    /// - Last write wins: if called twice in the same turn, only the last value is sent
3185    /// - Persistent across turns: if you don't call it on a later turn, the provider
3186    ///   keeps the previous value
3187    ///
3188    /// # Example
3189    ///
3190    /// ```rust,no_run
3191    /// # use duroxide::OrchestrationContext;
3192    /// # async fn example(ctx: OrchestrationContext) {
3193    /// ctx.set_custom_status("Processing item 3 of 10");
3194    /// let result = ctx.schedule_activity("ProcessItem", "item-3").await;
3195    /// ctx.set_custom_status("Processing item 4 of 10");
3196    /// # }
3197    /// ```
3198    pub fn set_custom_status(&self, status: impl Into<String>) {
3199        let status: String = status.into();
3200        let mut inner = self.inner.lock().unwrap();
3201        inner.accumulated_custom_status = Some(status.clone());
3202        inner.emit_action(Action::UpdateCustomStatus { status: Some(status) });
3203    }
3204
3205    /// Clear the custom status back to `None`. The provider will set the column to NULL
3206    /// and increment `custom_status_version`.
3207    ///
3208    /// ```rust,no_run
3209    /// # use duroxide::OrchestrationContext;
3210    /// # async fn example(ctx: OrchestrationContext) {
3211    /// ctx.set_custom_status("Processing batch");
3212    /// // ... work ...
3213    /// ctx.reset_custom_status(); // done, clear the progress
3214    /// # }
3215    /// ```
3216    pub fn reset_custom_status(&self) {
3217        let mut inner = self.inner.lock().unwrap();
3218        inner.accumulated_custom_status = None;
3219        inner.emit_action(Action::UpdateCustomStatus { status: None });
3220    }
3221
3222    /// Returns the current custom status value, if any.
3223    ///
3224    /// This reflects all `set_custom_status` / `reset_custom_status` calls made so far
3225    /// in this and previous turns. In a CAN'd execution, it includes the value carried
3226    /// from the previous execution.
3227    pub fn get_custom_status(&self) -> Option<String> {
3228        self.inner.lock().unwrap().accumulated_custom_status.clone()
3229    }
3230
3231    // =========================================================================
3232    // Key-Value Store
3233    // =========================================================================
3234
3235    /// Set a key-value pair scoped to this orchestration instance.
3236    ///
3237    /// Emits a `KeyValueSet` history event. The provider materializes this
3238    /// into the `kv_store` table during ack. The `last_updated_at_ms` timestamp
3239    /// is stamped from the current wall clock and persisted in the event.
3240    pub fn set_kv_value(&self, key: impl Into<String>, value: impl Into<String>) {
3241        let key: String = key.into();
3242        let value: String = value.into();
3243        let mut inner = self.inner.lock().unwrap();
3244        let last_updated_at_ms = inner.now_ms();
3245        inner.kv_state.insert(key.clone(), value.clone());
3246        inner.kv_metadata.insert(key.clone(), last_updated_at_ms);
3247        inner.emit_action(Action::SetKeyValue {
3248            key,
3249            value,
3250            last_updated_at_ms,
3251        });
3252    }
3253
3254    /// Set a typed value (serialized as JSON) scoped to this orchestration instance.
3255    pub fn set_kv_value_typed<T: serde::Serialize>(&self, key: impl Into<String>, value: &T) {
3256        let serialized = serde_json::to_string(value).expect("KV value serialization should not fail");
3257        self.set_kv_value(key, serialized);
3258    }
3259
3260    /// Read a KV entry from in-memory state.
3261    ///
3262    /// Pure read — no provider call, no event emitted, fully deterministic.
3263    /// Returns `None` if the key has never been set or was cleared.
3264    pub fn get_kv_value(&self, key: &str) -> Option<String> {
3265        self.inner.lock().unwrap().kv_state.get(key).cloned()
3266    }
3267
3268    /// Read a typed KV entry. Returns `None` if the key doesn't exist.
3269    ///
3270    /// # Errors
3271    ///
3272    /// Returns `Err` if the key exists but deserialization fails.
3273    pub fn get_kv_value_typed<T: serde::de::DeserializeOwned>(&self, key: &str) -> Result<Option<T>, String> {
3274        match self.get_kv_value(key) {
3275            None => Ok(None),
3276            Some(s) => serde_json::from_str(&s)
3277                .map(Some)
3278                .map_err(|e| format!("KV deserialization error for key '{}': {}", key, e)),
3279        }
3280    }
3281
3282    /// Return a snapshot of all KV entries as a `HashMap`.
3283    ///
3284    /// Pure read from in-memory state — no provider call, no event emitted.
3285    pub fn get_kv_all_values(&self) -> std::collections::HashMap<String, String> {
3286        self.inner.lock().unwrap().kv_state.clone()
3287    }
3288
3289    /// Return a list of all KV keys.
3290    ///
3291    /// Pure read from in-memory state — no provider call, no event emitted.
3292    pub fn get_kv_all_keys(&self) -> Vec<String> {
3293        self.inner.lock().unwrap().kv_state.keys().cloned().collect()
3294    }
3295
3296    /// Return the number of KV entries.
3297    ///
3298    /// Pure read from in-memory state — no provider call, no event emitted.
3299    pub fn get_kv_length(&self) -> usize {
3300        self.inner.lock().unwrap().kv_state.len()
3301    }
3302
3303    /// Clear a single key from the KV store.
3304    ///
3305    /// Emits a `KeyValueCleared` history event. After this call,
3306    /// `get_kv_value(key)` returns `None`.
3307    pub fn clear_kv_value(&self, key: impl Into<String>) {
3308        let key: String = key.into();
3309        let mut inner = self.inner.lock().unwrap();
3310        inner.kv_state.remove(&key);
3311        inner.kv_metadata.remove(&key);
3312        inner.emit_action(Action::ClearKeyValue { key });
3313    }
3314
3315    /// Clear all keys from the KV store.
3316    ///
3317    /// Emits a `KeyValuesCleared` history event. After this call,
3318    /// `get_kv_value(key)` returns `None` for all keys.
3319    pub fn clear_all_kv_values(&self) {
3320        let mut inner = self.inner.lock().unwrap();
3321        inner.kv_state.clear();
3322        inner.kv_metadata.clear();
3323        inner.emit_action(Action::ClearKeyValues);
3324    }
3325
3326    /// Remove KV entries whose persisted `last_updated_at_ms` is older than `updated_before_ms`.
3327    ///
3328    /// This helper scans the snapshot metadata loaded from the provider and emits
3329    /// `clear_kv_value()` for each qualifying key. Keys written during the current
3330    /// turn are never prunable (they haven't been acked yet).
3331    ///
3332    /// Returns the number of keys cleared.
3333    pub fn prune_kv_values_updated_before(&self, updated_before_ms: u64) -> usize {
3334        let keys_to_clear: Vec<String> = {
3335            let inner = self.inner.lock().unwrap();
3336            inner
3337                .kv_metadata
3338                .iter()
3339                .filter(|(_, ts)| **ts < updated_before_ms)
3340                .filter(|(key, _)| inner.kv_state.contains_key(*key))
3341                .map(|(key, _)| key.clone())
3342                .collect()
3343        };
3344        let count = keys_to_clear.len();
3345        for key in keys_to_clear {
3346            self.clear_kv_value(key);
3347        }
3348        count
3349    }
3350
3351    /// Read a KV entry from another orchestration instance.
3352    ///
3353    /// This is modeled as a system activity — under the covers it schedules
3354    /// `__duroxide_syscall:get_kv_value` which calls `client.get_kv_value()`.
3355    /// Returns the value at the time the activity executes (not replay-cached on first run).
3356    /// On replay, the recorded result is returned — no provider call.
3357    pub fn get_kv_value_from_instance(
3358        &self,
3359        instance_id: impl Into<String>,
3360        key: impl Into<String>,
3361    ) -> DurableFuture<Result<Option<String>, String>> {
3362        let input = serde_json::json!({
3363            "instance_id": instance_id.into(),
3364            "key": key.into(),
3365        })
3366        .to_string();
3367        self.schedule_activity(SYSCALL_ACTIVITY_GET_KV_VALUE, input)
3368            .map(|result| match result {
3369                Ok(json_str) => serde_json::from_str::<Option<String>>(&json_str)
3370                    .map_err(|e| format!("get_kv_value_from_instance deserialization error: {e}")),
3371                Err(e) => Err(e),
3372            })
3373    }
3374
3375    /// Typed variant of `get_kv_value_from_instance`.
3376    ///
3377    /// Deserializes the returned JSON string into `T`.
3378    /// Returns `Ok(None)` if the key doesn't exist in the remote instance.
3379    pub fn get_kv_value_from_instance_typed<T: serde::de::DeserializeOwned + Send + 'static>(
3380        &self,
3381        instance_id: impl Into<String>,
3382        key: impl Into<String>,
3383    ) -> DurableFuture<Result<Option<T>, String>> {
3384        self.get_kv_value_from_instance(instance_id, key)
3385            .map(|result| match result {
3386                Ok(None) => Ok(None),
3387                Ok(Some(s)) => serde_json::from_str::<T>(&s)
3388                    .map(Some)
3389                    .map_err(|e| format!("get_kv_value_from_instance_typed deserialization error: {e}")),
3390                Err(e) => Err(e),
3391            })
3392    }
3393}
3394
3395// Aggregate future machinery lives below (OrchestrationContext helpers)
3396
3397impl OrchestrationContext {
3398    // =========================================================================
3399    // Core scheduling methods - return DurableFuture with cancellation support
3400    // =========================================================================
3401
3402    /// Schedule an activity and return a cancellation-aware future.
3403    ///
3404    /// Returns a [`DurableFuture`] that supports cancellation on drop. If the future
3405    /// is dropped without completing (e.g., as a select loser), the activity will be
3406    /// cancelled via lock stealing.
3407    ///
3408    /// # Example
3409    ///
3410    /// ```rust,no_run
3411    /// # use duroxide::OrchestrationContext;
3412    /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
3413    /// // Fan-out to multiple activities
3414    /// let f1 = ctx.schedule_activity("Process", "A");
3415    /// let f2 = ctx.schedule_activity("Process", "B");
3416    /// let results = ctx.join(vec![f1, f2]).await;
3417    /// # Ok("done".to_string())
3418    /// # }
3419    /// ```
3420    pub fn schedule_activity(
3421        &self,
3422        name: impl Into<String>,
3423        input: impl Into<String>,
3424    ) -> DurableFuture<Result<String, String>> {
3425        self.schedule_activity_internal(name, input, None)
3426    }
3427
3428    /// Typed version of schedule_activity that serializes input and deserializes output.
3429    ///
3430    /// # Errors
3431    ///
3432    /// Returns an error if the activity fails or if the output cannot be deserialized.
3433    pub fn schedule_activity_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned + Send + 'static>(
3434        &self,
3435        name: impl Into<String>,
3436        input: &In,
3437    ) -> DurableFuture<Result<Out, String>> {
3438        let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3439        self.schedule_activity(name, payload)
3440            .map(|r| r.and_then(|s| crate::_typed_codec::Json::decode::<Out>(&s)))
3441    }
3442
3443    /// Schedule an activity routed to the worker owning the given session.
3444    ///
3445    /// If no worker owns the session, any worker can claim it on first fetch.
3446    /// Once claimed, all subsequent activities with the same `session_id` route
3447    /// to the claiming worker until the session unpins (idle timeout or worker death).
3448    ///
3449    /// # Example
3450    ///
3451    /// ```rust,no_run
3452    /// # use duroxide::OrchestrationContext;
3453    /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
3454    /// let session_id = ctx.new_guid().await?;
3455    /// let result = ctx.schedule_activity_on_session("run_turn", "input", &session_id).await?;
3456    /// # Ok(result)
3457    /// # }
3458    /// ```
3459    pub fn schedule_activity_on_session(
3460        &self,
3461        name: impl Into<String>,
3462        input: impl Into<String>,
3463        session_id: impl Into<String>,
3464    ) -> DurableFuture<Result<String, String>> {
3465        self.schedule_activity_internal(name, input, Some(session_id.into()))
3466    }
3467
3468    /// Typed version of schedule_activity_on_session that serializes input and deserializes output.
3469    ///
3470    /// # Errors
3471    ///
3472    /// Returns an error if the activity fails or if the output cannot be deserialized.
3473    pub fn schedule_activity_on_session_typed<
3474        In: serde::Serialize,
3475        Out: serde::de::DeserializeOwned + Send + 'static,
3476    >(
3477        &self,
3478        name: impl Into<String>,
3479        input: &In,
3480        session_id: impl Into<String>,
3481    ) -> DurableFuture<Result<Out, String>> {
3482        let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3483        self.schedule_activity_on_session(name, payload, session_id)
3484            .map(|r| r.and_then(|s| crate::_typed_codec::Json::decode::<Out>(&s)))
3485    }
3486
3487    /// Internal implementation for activity scheduling.
3488    fn schedule_activity_internal(
3489        &self,
3490        name: impl Into<String>,
3491        input: impl Into<String>,
3492        session_id: Option<String>,
3493    ) -> DurableFuture<Result<String, String>> {
3494        let name: String = name.into();
3495        let input: String = input.into();
3496
3497        let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
3498
3499        let token = inner.emit_action(Action::CallActivity {
3500            scheduling_event_id: 0, // Will be assigned by replay engine
3501            name: name.clone(),
3502            input: input.clone(),
3503            session_id,
3504            tag: None,
3505        });
3506        drop(inner);
3507
3508        let ctx = self.clone();
3509        let inner_future = std::future::poll_fn(move |_cx| {
3510            let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
3511            if let Some(result) = inner.get_result(token) {
3512                match result {
3513                    CompletionResult::ActivityOk(s) => Poll::Ready(Ok(s.clone())),
3514                    CompletionResult::ActivityErr(e) => Poll::Ready(Err(e.clone())),
3515                    _ => Poll::Pending, // Wrong result type, keep waiting
3516                }
3517            } else {
3518                Poll::Pending
3519            }
3520        });
3521
3522        DurableFuture::new(token, ScheduleKind::Activity { name }, self.clone(), inner_future)
3523    }
3524
3525    /// Schedule a timer and return a cancellation-aware future.
3526    ///
3527    /// Timers are virtual constructs - dropping the future is a no-op since there's
3528    /// no external state to cancel. However, wrapping in `DurableFuture` maintains
3529    /// API consistency.
3530    pub fn schedule_timer(&self, delay: std::time::Duration) -> DurableFuture<()> {
3531        let delay_ms = delay.as_millis() as u64;
3532
3533        let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
3534
3535        let now = inner.now_ms();
3536        let fire_at_ms = now.saturating_add(delay_ms);
3537        let token = inner.emit_action(Action::CreateTimer {
3538            scheduling_event_id: 0,
3539            fire_at_ms,
3540        });
3541        drop(inner);
3542
3543        let ctx = self.clone();
3544        let inner_future = std::future::poll_fn(move |_cx| {
3545            let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
3546            if let Some(result) = inner.get_result(token) {
3547                match result {
3548                    CompletionResult::TimerFired => Poll::Ready(()),
3549                    _ => Poll::Pending,
3550                }
3551            } else {
3552                Poll::Pending
3553            }
3554        });
3555
3556        DurableFuture::new(token, ScheduleKind::Timer, self.clone(), inner_future)
3557    }
3558
3559    /// Subscribe to an external event and return a cancellation-aware future.
3560    ///
3561    /// External waits are virtual constructs - dropping the future is a no-op since
3562    /// there's no external state to cancel. However, wrapping in `DurableFuture`
3563    /// maintains API consistency.
3564    pub fn schedule_wait(&self, name: impl Into<String>) -> DurableFuture<String> {
3565        let name: String = name.into();
3566
3567        let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
3568
3569        let token = inner.emit_action(Action::WaitExternal {
3570            scheduling_event_id: 0,
3571            name: name.clone(),
3572        });
3573        drop(inner);
3574
3575        let ctx = self.clone();
3576        let inner_future = std::future::poll_fn(move |_cx| {
3577            let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
3578            // Only resolve once the token has been bound to a persisted schedule_id.
3579            // External events arriving before subscription binding are currently unsupported.
3580            if let Some(bound_id) = inner.get_bound_schedule_id(token)
3581                && let Some(data) = inner.get_external_event(bound_id)
3582            {
3583                return Poll::Ready(data.clone());
3584            }
3585            Poll::Pending
3586        });
3587
3588        DurableFuture::new(
3589            token,
3590            ScheduleKind::ExternalWait { event_name: name },
3591            self.clone(),
3592            inner_future,
3593        )
3594    }
3595
3596    /// Typed version of schedule_wait.
3597    pub fn schedule_wait_typed<T: serde::de::DeserializeOwned + Send + 'static>(
3598        &self,
3599        name: impl Into<String>,
3600    ) -> DurableFuture<T> {
3601        self.schedule_wait(name)
3602            .map(|s| crate::_typed_codec::Json::decode::<T>(&s).expect("decode"))
3603    }
3604
3605    /// Dequeue the next message from a named queue (FIFO mailbox semantics).
3606    ///
3607    /// Unlike `schedule_wait`, queued events use FIFO matching:
3608    /// - No positional pairing — any unresolved subscription gets the first unmatched arrival
3609    /// - Cancelled subscriptions are skipped (don't consume arrivals)
3610    /// - Events that arrive before a subscription are buffered until consumed
3611    /// - Events survive `continue_as_new` boundaries (carried forward)
3612    ///
3613    /// The caller enqueues messages with [`Client::enqueue_event`].
3614    pub fn dequeue_event(&self, queue: impl Into<String>) -> DurableFuture<String> {
3615        let name: String = queue.into();
3616
3617        let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
3618
3619        let token = inner.emit_action(Action::DequeueEvent {
3620            scheduling_event_id: 0,
3621            name: name.clone(),
3622        });
3623        drop(inner);
3624
3625        let ctx = self.clone();
3626        let inner_future = std::future::poll_fn(move |_cx| {
3627            let mut inner = ctx.inner.lock().expect("Mutex should not be poisoned");
3628            if let Some(bound_id) = inner.get_bound_schedule_id(token)
3629                && let Some(data) = inner.get_queue_message(bound_id)
3630            {
3631                return Poll::Ready(data);
3632            }
3633            Poll::Pending
3634        });
3635
3636        DurableFuture::new(
3637            token,
3638            ScheduleKind::QueueDequeue { event_name: name },
3639            self.clone(),
3640            inner_future,
3641        )
3642    }
3643
3644    /// Typed version of [`Self::dequeue_event`]. Deserializes the message payload as `T`.
3645    pub fn dequeue_event_typed<T: serde::de::DeserializeOwned>(
3646        &self,
3647        queue: impl Into<String>,
3648    ) -> impl Future<Output = T> {
3649        let fut = self.dequeue_event(queue);
3650        async move {
3651            let s = fut.await;
3652            crate::_typed_codec::Json::decode::<T>(&s).expect("decode")
3653        }
3654    }
3655
3656    /// Subscribe to a persistent external event (mailbox semantics).
3657    ///
3658    /// Prefer [`Self::dequeue_event`] — this is a deprecated alias.
3659    #[deprecated(note = "Use dequeue_event() instead")]
3660    pub fn schedule_wait_persistent(&self, name: impl Into<String>) -> DurableFuture<String> {
3661        self.dequeue_event(name)
3662    }
3663
3664    /// Typed version of schedule_wait_persistent.
3665    ///
3666    /// Prefer [`Self::dequeue_event_typed`] — this is a deprecated alias.
3667    #[deprecated(note = "Use dequeue_event_typed() instead")]
3668    pub fn schedule_wait_persistent_typed<T: serde::de::DeserializeOwned>(
3669        &self,
3670        name: impl Into<String>,
3671    ) -> impl Future<Output = T> {
3672        self.dequeue_event_typed(name)
3673    }
3674
3675    /// V2: Subscribe to an external event with topic-based pub/sub matching.
3676    ///
3677    /// Same semantics as `schedule_wait`, but matches on both `name` AND `topic`.
3678    /// Feature-gated for replay engine extensibility verification.
3679    #[cfg(feature = "replay-version-test")]
3680    pub fn schedule_wait2(&self, name: impl Into<String>, topic: impl Into<String>) -> DurableFuture<String> {
3681        let name: String = name.into();
3682        let topic: String = topic.into();
3683
3684        let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
3685
3686        let token = inner.emit_action(Action::WaitExternal2 {
3687            scheduling_event_id: 0,
3688            name: name.clone(),
3689            topic: topic.clone(),
3690        });
3691        drop(inner);
3692
3693        let ctx = self.clone();
3694        let inner_future = std::future::poll_fn(move |_cx| {
3695            let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
3696            if let Some(bound_id) = inner.get_bound_schedule_id(token)
3697                && let Some(data) = inner.get_external_event2(bound_id)
3698            {
3699                return Poll::Ready(data.clone());
3700            }
3701            Poll::Pending
3702        });
3703
3704        DurableFuture::new(
3705            token,
3706            ScheduleKind::ExternalWait { event_name: name },
3707            self.clone(),
3708            inner_future,
3709        )
3710    }
3711
3712    /// Schedule a sub-orchestration and return a cancellation-aware future.
3713    ///
3714    /// The child instance ID is auto-generated from the event ID with a parent prefix.
3715    ///
3716    /// Returns a [`DurableFuture`] that supports cancellation on drop. If the future
3717    /// is dropped without completing, a `CancelInstance` work item will be enqueued
3718    /// for the child orchestration.
3719    pub fn schedule_sub_orchestration(
3720        &self,
3721        name: impl Into<String>,
3722        input: impl Into<String>,
3723    ) -> DurableFuture<Result<String, String>> {
3724        self.schedule_sub_orchestration_versioned_with_id_internal(name, None, None, input)
3725    }
3726
3727    /// Schedule a sub-orchestration with an explicit instance ID.
3728    ///
3729    /// The provided `instance` value is used exactly as the child instance ID,
3730    /// without any parent prefix. Use this when you need to control the exact
3731    /// instance ID for the sub-orchestration.
3732    ///
3733    /// For auto-generated instance IDs, use [`schedule_sub_orchestration`] instead.
3734    pub fn schedule_sub_orchestration_with_id(
3735        &self,
3736        name: impl Into<String>,
3737        instance: impl Into<String>,
3738        input: impl Into<String>,
3739    ) -> DurableFuture<Result<String, String>> {
3740        self.schedule_sub_orchestration_versioned_with_id_internal(name, None, Some(instance.into()), input)
3741    }
3742
3743    /// Schedule a versioned sub-orchestration.
3744    ///
3745    /// If `version` is `Some`, that specific version is used.
3746    /// If `version` is `None`, the registry's policy (e.g., Latest) is used.
3747    pub fn schedule_sub_orchestration_versioned(
3748        &self,
3749        name: impl Into<String>,
3750        version: Option<String>,
3751        input: impl Into<String>,
3752    ) -> DurableFuture<Result<String, String>> {
3753        self.schedule_sub_orchestration_versioned_with_id_internal(name, version, None, input)
3754    }
3755
3756    /// Schedule a versioned sub-orchestration with an explicit instance ID.
3757    ///
3758    /// The provided `instance` value is used exactly as the child instance ID,
3759    /// without any parent prefix.
3760    ///
3761    /// Returns a [`DurableFuture`] that supports cancellation on drop. If the future
3762    /// is dropped without completing, a `CancelInstance` work item will be enqueued
3763    /// for the child orchestration.
3764    pub fn schedule_sub_orchestration_versioned_with_id(
3765        &self,
3766        name: impl Into<String>,
3767        version: Option<String>,
3768        instance: impl Into<String>,
3769        input: impl Into<String>,
3770    ) -> DurableFuture<Result<String, String>> {
3771        self.schedule_sub_orchestration_versioned_with_id_internal(name, version, Some(instance.into()), input)
3772    }
3773
3774    /// Internal implementation for sub-orchestration scheduling.
3775    ///
3776    /// If `instance` is `Some`, it's an explicit ID (no parent prefix).
3777    /// If `instance` is `None`, auto-generate from event ID (with parent prefix).
3778    fn schedule_sub_orchestration_versioned_with_id_internal(
3779        &self,
3780        name: impl Into<String>,
3781        version: Option<String>,
3782        instance: Option<String>,
3783        input: impl Into<String>,
3784    ) -> DurableFuture<Result<String, String>> {
3785        let name: String = name.into();
3786        let input: String = input.into();
3787
3788        let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
3789
3790        // For explicit instance IDs, use them as-is (no parent prefix will be added).
3791        // For auto-generated, use placeholder that will be replaced with SUB_ORCH_AUTO_PREFIX + event_id
3792        // and parent prefix will be added.
3793        let action_instance = match &instance {
3794            Some(explicit_id) => explicit_id.clone(),
3795            None => format!("{}{}", SUB_ORCH_PENDING_PREFIX, inner.next_token + 1),
3796        };
3797        let token = inner.emit_action(Action::StartSubOrchestration {
3798            scheduling_event_id: 0,
3799            name: name.clone(),
3800            version,
3801            instance: action_instance.clone(),
3802            input: input.clone(),
3803        });
3804        drop(inner);
3805
3806        let ctx = self.clone();
3807        let inner_future = std::future::poll_fn(move |_cx| {
3808            let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
3809            if let Some(result) = inner.get_result(token) {
3810                match result {
3811                    CompletionResult::SubOrchOk(s) => Poll::Ready(Ok(s.clone())),
3812                    CompletionResult::SubOrchErr(e) => Poll::Ready(Err(e.clone())),
3813                    _ => Poll::Pending,
3814                }
3815            } else {
3816                Poll::Pending
3817            }
3818        });
3819
3820        // For cancellation, we store the token. The consumption path will look up
3821        // the resolved instance ID from the sub_orchestration_instances mapping.
3822        DurableFuture::new(
3823            token,
3824            ScheduleKind::SubOrchestration { token },
3825            self.clone(),
3826            inner_future,
3827        )
3828    }
3829
3830    /// Typed version of schedule_sub_orchestration.
3831    ///
3832    /// # Errors
3833    ///
3834    /// Returns an error if the sub-orchestration fails or if the output cannot be deserialized.
3835    pub fn schedule_sub_orchestration_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned + Send + 'static>(
3836        &self,
3837        name: impl Into<String>,
3838        input: &In,
3839    ) -> DurableFuture<Result<Out, String>> {
3840        let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3841        self.schedule_sub_orchestration(name, payload)
3842            .map(|r| r.and_then(|s| crate::_typed_codec::Json::decode::<Out>(&s)))
3843    }
3844
3845    /// Typed version of schedule_sub_orchestration_with_id.
3846    ///
3847    /// # Errors
3848    ///
3849    /// Returns an error if the sub-orchestration fails or if the output cannot be deserialized.
3850    pub fn schedule_sub_orchestration_with_id_typed<
3851        In: serde::Serialize,
3852        Out: serde::de::DeserializeOwned + Send + 'static,
3853    >(
3854        &self,
3855        name: impl Into<String>,
3856        instance: impl Into<String>,
3857        input: &In,
3858    ) -> DurableFuture<Result<Out, String>> {
3859        let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3860        self.schedule_sub_orchestration_with_id(name, instance, payload)
3861            .map(|r| r.and_then(|s| crate::_typed_codec::Json::decode::<Out>(&s)))
3862    }
3863
3864    /// Await all futures concurrently using `futures::future::join_all`.
3865    /// Works with any `Future` type.
3866    pub async fn join<T, F>(&self, futures: Vec<F>) -> Vec<T>
3867    where
3868        F: Future<Output = T>,
3869    {
3870        ::futures::future::join_all(futures).await
3871    }
3872
3873    /// Simplified join for exactly 2 futures (convenience method).
3874    pub async fn join2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> (T1, T2)
3875    where
3876        F1: Future<Output = T1>,
3877        F2: Future<Output = T2>,
3878    {
3879        ::futures::future::join(f1, f2).await
3880    }
3881
3882    /// Simplified join for exactly 3 futures (convenience method).
3883    pub async fn join3<T1, T2, T3, F1, F2, F3>(&self, f1: F1, f2: F2, f3: F3) -> (T1, T2, T3)
3884    where
3885        F1: Future<Output = T1>,
3886        F2: Future<Output = T2>,
3887        F3: Future<Output = T3>,
3888    {
3889        ::futures::future::join3(f1, f2, f3).await
3890    }
3891
3892    /// Simplified select over 2 futures: returns the result of whichever completes first.
3893    /// Select over 2 futures with potentially different output types.
3894    ///
3895    /// Returns `Either2::First(result)` if first future wins, `Either2::Second(result)` if second wins.
3896    /// Uses futures::select_biased! for determinism (first branch polled first).
3897    ///
3898    /// # Example: Activity with timeout
3899    /// ```rust,no_run
3900    /// # use duroxide::{OrchestrationContext, Either2};
3901    /// # use std::time::Duration;
3902    /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
3903    /// let work = ctx.schedule_activity("SlowWork", "input");
3904    /// let timeout = ctx.schedule_timer(Duration::from_secs(30));
3905    ///
3906    /// match ctx.select2(work, timeout).await {
3907    ///     Either2::First(result) => result,
3908    ///     Either2::Second(()) => Err("Operation timed out".to_string()),
3909    /// }
3910    /// # }
3911    /// ```
3912    pub async fn select2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> Either2<T1, T2>
3913    where
3914        F1: Future<Output = T1>,
3915        F2: Future<Output = T2>,
3916    {
3917        use ::futures::FutureExt;
3918        let mut f1 = std::pin::pin!(f1.fuse());
3919        let mut f2 = std::pin::pin!(f2.fuse());
3920        ::futures::select_biased! {
3921            result = f1 => Either2::First(result),
3922            result = f2 => Either2::Second(result),
3923        }
3924    }
3925
3926    /// Select over 3 futures with potentially different output types.
3927    ///
3928    /// Returns `Either3::First/Second/Third(result)` depending on which future completes first.
3929    /// Uses futures::select_biased! for determinism (earlier branches polled first).
3930    pub async fn select3<T1, T2, T3, F1, F2, F3>(&self, f1: F1, f2: F2, f3: F3) -> Either3<T1, T2, T3>
3931    where
3932        F1: Future<Output = T1>,
3933        F2: Future<Output = T2>,
3934        F3: Future<Output = T3>,
3935    {
3936        use ::futures::FutureExt;
3937        let mut f1 = std::pin::pin!(f1.fuse());
3938        let mut f2 = std::pin::pin!(f2.fuse());
3939        let mut f3 = std::pin::pin!(f3.fuse());
3940        ::futures::select_biased! {
3941            result = f1 => Either3::First(result),
3942            result = f2 => Either3::Second(result),
3943            result = f3 => Either3::Third(result),
3944        }
3945    }
3946}