duroxide/lib.rs
1//! # Duroxide: Durable execution framework in Rust
2//!
//! Duroxide is a framework for building reliable, long-running, code-based workflows that can survive
4//! failures and restarts. For a deep dive into how durable execution works, see the
5//! [Durable Futures Internals](https://github.com/affandar/duroxide/blob/main/docs/durable-futures-internals.md) documentation.
6//!
7//! ## Quick Start
8//!
9//! ```rust,no_run
10//! use duroxide::providers::sqlite::SqliteProvider;
11//! use duroxide::runtime::registry::ActivityRegistry;
12//! use duroxide::runtime::{self};
13//! use duroxide::{ActivityContext, OrchestrationContext, OrchestrationRegistry, Client};
14//! use std::sync::Arc;
15//!
16//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
17//! // 1. Create a storage provider
18//! let store = Arc::new(SqliteProvider::new("sqlite:./data.db", None).await.unwrap());
19//!
20//! // 2. Register activities (your business logic)
21//! let activities = ActivityRegistry::builder()
22//! .register("Greet", |_ctx: ActivityContext, name: String| async move {
23//! Ok(format!("Hello, {}!", name))
24//! })
25//! .build();
26//!
27//! // 3. Define your orchestration
28//! let orchestration = |ctx: OrchestrationContext, name: String| async move {
29//! let greeting = ctx.schedule_activity("Greet", name).await?;
30
// Crate-wide lint configuration. These are inner attributes in the crate root,
// so they apply to every module of the crate, not just to this file.
//
// Mutex poisoning indicates a panic in another thread - a critical error.
// expect()/unwrap() on mutex locks is intentional: poisoned mutexes should
// panic as they indicate corrupted state. Note that these allows are not
// restricted to mutex locks; they silence the lints at every call site.
#![allow(clippy::expect_used)]
#![allow(clippy::unwrap_used)]
// Arc::clone() vs .clone() is a style preference - we use .clone() for brevity
#![allow(clippy::clone_on_ref_ptr)]
38//! Ok(greeting)
39//! };
40//!
41//! // 4. Register and start the runtime
42//! let orchestrations = OrchestrationRegistry::builder()
43//! .register("HelloWorld", orchestration)
44//! .build();
45//!
46//! let rt = runtime::Runtime::start_with_store(
47//! store.clone(), activities, orchestrations
48//! ).await;
49//!
50//! // 5. Create a client and start an orchestration instance
51//! let client = Client::new(store.clone());
52//! client.start_orchestration("inst-1", "HelloWorld", "World").await?;
53//! let result = client.wait_for_orchestration("inst-1", std::time::Duration::from_secs(5)).await
54//! .map_err(|e| format!("Wait error: {:?}", e))?;
55//! # Ok(())
56//! # }
57//! ```
58//!
59//! ## Key Concepts
60//!
61//! - **Orchestrations**: Long-running workflows written as async functions (coordination logic)
62//! - **Activities**: Single-purpose work units (can do anything - DB, API, polling, etc.)
63//! - Supports long-running activities via automatic lock renewal (minutes to hours)
//! - **Timers**: Use `ctx.schedule_timer(duration)` (a `std::time::Duration`) for orchestration-level delays and timeouts
65//! - **Deterministic Replay**: Orchestrations are replayed from history to ensure consistency
66//! - **Durable Futures**: Composable futures for activities, timers, and external events
67//! - **ContinueAsNew (Multi-Execution)**: An orchestration can end the current execution and
68//! immediately start a new one with fresh input. Each execution has its own isolated history
69//! that starts with `OrchestrationStarted { event_id: 1 }`.
70//!
71//! ## ⚠️ Important: Orchestrations vs Activities
72//!
73//! **Orchestrations = Coordination (control flow, business logic)**
74//! **Activities = Execution (single-purpose work units)**
75//!
76//! ```rust,no_run
77//! # use duroxide::OrchestrationContext;
78//! # use std::time::Duration;
79//! # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
80//! // ✅ CORRECT: Orchestration-level delay using timer
81//! ctx.schedule_timer(Duration::from_secs(5)).await; // Wait 5 seconds
82//!
83//! // ✅ ALSO CORRECT: Activity can poll/sleep as part of its work
84//! // Example: Activity that provisions a VM and polls for readiness
85//! // activities.register("ProvisionVM", |config| async move {
86//! // let vm = create_vm(config).await?;
87//! // while !vm_ready(&vm).await {
88//! // tokio::time::sleep(Duration::from_secs(5)).await; // ✅ OK - part of provisioning
89//! // }
90//! // Ok(vm.id)
91//! // });
92//!
93//! // ❌ WRONG: Activity that ONLY sleeps (use timer instead)
94//! // ctx.schedule_activity("Sleep5Seconds", "").await;
95//! # Ok(())
96//! # }
97//! ```
98//!
99//! **Put in Activities (single-purpose execution units):**
100//! - Database operations
101//! - API calls (can include retries/polling)
102//! - Data transformations
103//! - File I/O
104//! - VM provisioning (with internal polling)
105//!
106//! **Put in Orchestrations (coordination and business logic):**
107//! - Control flow (if/else, match, loops)
108//! - Business decisions
109//! - Multi-step workflows
110//! - Error handling and compensation
111//! - Timeouts and deadlines (use timers)
112//! - Waiting for external events
113//!
114//! ## ContinueAsNew (Multi-Execution) Semantics
115//!
116//! ContinueAsNew (CAN) allows an orchestration to end its current execution and start a new
117//! one with fresh input (useful for loops, pagination, long-running workflows).
118//!
119//! - Orchestration calls `ctx.continue_as_new(new_input)`
120//! - Runtime stamps `OrchestrationContinuedAsNew` in the CURRENT execution's history
121//! - Runtime enqueues a `WorkItem::ContinueAsNew`
122//! - When processing that work item, the runtime starts a NEW execution with:
123//! - `execution_id = previous_execution_id + 1`
124//! - `existing_history = []` (fresh history)
125//! - `OrchestrationStarted { event_id: 1, input = new_input }` is stamped automatically
126//! - Each execution's history is independent; `duroxide::Client::read_execution_history(instance, id)`
127//! returns events for that execution only
128//!
129//! Provider responsibilities are strictly storage-level (see below). The runtime owns all
130//! orchestration semantics, including execution boundaries and starting the new execution.
131//!
132//! ## Provider Responsibilities (At a Glance)
133//!
134//! Providers are pure storage abstractions. The runtime computes orchestration semantics
135//! and passes explicit instructions to the provider.
136//!
137//! - `fetch_orchestration_item()`
138//! - Return a locked batch of work for ONE instance
139//! - Include full history for the CURRENT `execution_id`
140//! - Do NOT create/synthesize new executions here (even for ContinueAsNew)
141//!
142//! - `ack_orchestration_item(lock_token, execution_id, history_delta, ..., metadata)`
143//! - Atomic commit of one orchestration turn
144//! - Idempotently `INSERT OR IGNORE` execution row for the explicit `execution_id`
145//! - `UPDATE instances.current_execution_id = MAX(current_execution_id, execution_id)`
146//! - Append `history_delta` to the specified execution
147//! - Update `executions.status` and `executions.output` from `metadata` (no event inspection)
148//!
149//! - Worker/Timer queues
150//! - Peek-lock semantics (dequeue with lock token; ack by deleting)
151//! - Automatic lock renewal for long-running activities (no configuration needed)
152//! - Orchestrator, Worker, Timer queues are independent but committed atomically with history
153//!
154//! See `docs/provider-implementation-guide.md` and `src/providers/sqlite.rs` for a complete,
155//! production-grade provider implementation.
156//!
157//! ## Simplified API
158//!
159//! All schedule methods return typed futures that can be awaited directly:
160//!
161//! ```rust,no_run
162//! # use duroxide::OrchestrationContext;
163//! # use std::time::Duration;
164//! # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
165//! // Activities return Result<String, String>
166//! let result = ctx.schedule_activity("Task", "input").await?;
167//!
168//! // Timers return ()
169//! ctx.schedule_timer(Duration::from_secs(5)).await;
170//!
171//! // External events return String
172//! let event = ctx.schedule_wait("Event").await;
173//!
174//! // Sub-orchestrations return Result<String, String>
175//! let sub_result = ctx.schedule_sub_orchestration("Sub", "input").await?;
176//! # Ok(())
177//! # }
178//! ```
179//!
180//! ## Common Patterns
181//!
182//! ### Function Chaining
183//! ```rust,no_run
184//! # use duroxide::OrchestrationContext;
185//! async fn chain_example(ctx: OrchestrationContext) -> Result<String, String> {
186//! let step1 = ctx.schedule_activity("Step1", "input").await?;
187//! let step2 = ctx.schedule_activity("Step2", &step1).await?;
188//! Ok(step2)
189//! }
190//! ```
191//!
192//! ### Fan-Out/Fan-In
193//! ```rust,no_run
194//! # use duroxide::OrchestrationContext;
195//! async fn fanout_example(ctx: OrchestrationContext) -> Vec<String> {
196//! let futures = vec![
197//! ctx.schedule_activity("Process", "item1"),
198//! ctx.schedule_activity("Process", "item2"),
199//! ctx.schedule_activity("Process", "item3"),
200//! ];
201//! let results = ctx.join(futures).await;
202//! results.into_iter().filter_map(|r| r.ok()).collect()
203//! }
204//! ```
205//!
206//! ### Human-in-the-Loop (Timeout Pattern)
207//! ```rust,no_run
208//! # use duroxide::{OrchestrationContext, Either2};
209//! # use std::time::Duration;
210//! async fn approval_example(ctx: OrchestrationContext) -> String {
211//! let timeout = ctx.schedule_timer(Duration::from_secs(30));
212//! let approval = ctx.schedule_wait("ApprovalEvent");
213//!
214//! match ctx.select2(approval, timeout).await {
215//! Either2::First(data) => data,
216//! Either2::Second(()) => "timeout".to_string(),
217//! }
218//! }
219//! ```
220//!
221//! ### Delays and Timeouts
222//! ```rust,no_run
223//! # use duroxide::{OrchestrationContext, Either2};
224//! # use std::time::Duration;
225//! async fn delay_example(ctx: OrchestrationContext) -> Result<String, String> {
226//! // Use timer for orchestration-level delays
227//! ctx.schedule_timer(Duration::from_secs(5)).await;
228//!
229//! // Process after delay
230//! let result = ctx.schedule_activity("ProcessData", "input").await?;
231//! Ok(result)
232//! }
233//!
234//! async fn timeout_example(ctx: OrchestrationContext) -> Result<String, String> {
235//! // Race work against timeout
236//! let work = ctx.schedule_activity("SlowOperation", "input");
237//! let timeout = ctx.schedule_timer(Duration::from_secs(30));
238//!
239//! match ctx.select2(work, timeout).await {
240//! Either2::First(result) => result,
241//! Either2::Second(()) => Err("Operation timed out".to_string()),
242//! }
243//! }
244//! ```
245//!
246//! ### Fan-Out/Fan-In with Error Handling
247//! ```rust,no_run
248//! # use duroxide::OrchestrationContext;
249//! async fn fanout_with_errors(ctx: OrchestrationContext, items: Vec<String>) -> Result<Vec<String>, String> {
250//! // Schedule all work in parallel
251//! let futures: Vec<_> = items.iter()
252//! .map(|item| ctx.schedule_activity("ProcessItem", item.clone()))
253//! .collect();
254//!
255//! // Wait for all to complete (deterministic order preserved)
256//! let results = ctx.join(futures).await;
257//!
258//! // Process results with error handling
259//! let mut successes = Vec::new();
260//! for result in results {
261//! match result {
262//! Ok(value) => successes.push(value),
263//! Err(e) => {
264//! // Log error but continue processing other items
265//! ctx.trace_error(format!("Item processing failed: {e}"));
266//! }
267//! }
268//! }
269//!
270//! Ok(successes)
271//! }
272//! ```
273//!
274//! ### Retry Pattern
275//! ```rust,no_run
276//! # use duroxide::{OrchestrationContext, RetryPolicy, BackoffStrategy};
277//! # use std::time::Duration;
278//! async fn retry_example(ctx: OrchestrationContext) -> Result<String, String> {
279//! // Retry with linear backoff: 5 attempts, delay increases linearly (1s, 2s, 3s, 4s)
280//! let result = ctx.schedule_activity_with_retry(
281//! "UnreliableOperation",
282//! "input",
283//! RetryPolicy::new(5)
284//! .with_backoff(BackoffStrategy::Linear {
285//! base: Duration::from_secs(1),
286//! max: Duration::from_secs(10),
287//! }),
288//! ).await?;
289//!
290//! Ok(result)
291//! }
292//! ```
293//!
294//! ## Examples
295//!
296//! See the `examples/` directory for complete, runnable examples:
297//! - `hello_world.rs` - Basic orchestration setup
298//! - `fan_out_fan_in.rs` - Parallel processing pattern with error handling
299//! - `timers_and_events.rs` - Human-in-the-loop workflows with timeouts
300//! - `delays_and_timeouts.rs` - Correct usage of timers for delays and timeouts
301//! - `with_observability.rs` - Using observability features (tracing, metrics)
302//! - `metrics_cli.rs` - Querying system metrics via CLI
303//!
304//! Run examples with: `cargo run --example <name>`
305//!
306//! ## Architecture
307//!
308//! This crate provides:
309//! - **Public data model**: `Event`, `Action` for history and decisions
310//! - **Orchestration driver**: `run_turn`, `run_turn_with`, and `Executor`
311//! - **OrchestrationContext**: Schedule activities, timers, and external events
312//! - **Deterministic futures**: `schedule_*()` return standard `Future`s that can be composed with `join`/`select`
313//! - **Runtime**: In-process execution engine with dispatchers and workers
314//! - **Providers**: Pluggable storage backends (filesystem, in-memory)
315//!
316//! ### End-to-End System Architecture
317//!
318//! ```text
319//! +-------------------------------------------------------------------------+
320//! | Application Layer |
321//! +-------------------------------------------------------------------------+
322//! | |
323//! | +--------------+ +------------------------------------+ |
324//! | | Client |-------->| start_orchestration() | |
325//! | | | | raise_event() | |
326//! | | | | wait_for_orchestration() | |
327//! | +--------------+ +------------------------------------+ |
328//! | |
329//! +-------------------------------------------------------------------------+
330//! |
331//! v
332//! +-------------------------------------------------------------------------+
333//! | Runtime Layer |
334//! +-------------------------------------------------------------------------+
335//! | |
336//! | +-------------------------------------------------------------------+ |
337//! | | Runtime | |
338//! | | +----------------------+ +----------------------+ | |
339//! | | | Orchestration | | Work | | |
340//! | | | Dispatcher | | Dispatcher | | |
341//! | | | (N concurrent) | | (N concurrent) | | |
342//! | | +----------+-----------+ +----------+-----------+ | |
343//! | | | | | |
344//! | | | Processes turns | Executes activities| |
345//! | | | | | |
346//! | +-------------+--------------------------------+--------------------+ |
347//! | | | |
348//! | +-------------v--------------------------------v--------------------+ |
349//! | | OrchestrationRegistry: maps names -> orchestration handlers | |
350//! | +-------------------------------------------------------------------+ |
351//! | |
352//! | +-------------------------------------------------------------------+ |
353//! | | ActivityRegistry: maps names -> activity handlers | |
354//! | +-------------------------------------------------------------------+ |
355//! | |
356//! +-------------------------------------------------------------------------+
357//! | |
358//! | Fetches work items | Fetches work items
359//! | (peek-lock) | (peek-lock)
360//! v v
361//! +-------------------------------------------------------------------------+
362//! | Provider Layer |
363//! +-------------------------------------------------------------------------+
364//! | |
365//! | +----------------------------+ +----------------------------+ |
366//! | | Orchestrator Queue | | Worker Queue | |
367//! | | - StartOrchestration | | - ActivityExecute | |
368//! | | - ActivityCompleted | | | |
369//! | | - ActivityFailed | | | |
370//! | | - TimerFired (delayed) | | | |
371//! | | - ExternalRaised | | | |
372//! | | - ContinueAsNew | | | |
373//! | +----------------------------+ +----------------------------+ |
374//! | |
375//! | +-------------------------------------------------------------------+ |
376//! | | Provider (Storage) | |
377//! | | - History (Events per instance/execution) | |
378//! | | - Instance metadata | |
379//! | | - Execution metadata | |
380//! | | - Instance locks (peek-lock semantics) | |
381//! | | - Queue management (enqueue/dequeue with visibility) | |
382//! | +-------------------------------------------------------------------+ |
383//! | |
384//! | +-------------------------------------------------------------------+ |
385//! | | Storage Backend (SQLite, etc.) | |
386//! | +-------------------------------------------------------------------+ |
387//! | |
388//! +-------------------------------------------------------------------------+
389//!
390//! ### Execution Flow
391//!
392//! 1. **Client** starts orchestration → enqueues `StartOrchestration` to orchestrator queue
393//! 2. **OrchestrationDispatcher** fetches work item (peek-lock), loads history from Provider
394//! 3. **Runtime** calls user's orchestration function with `OrchestrationContext`
395//! 4. **Orchestration** schedules activities/timers → Runtime appends `Event`s to history
396//! 5. **Runtime** enqueues `ActivityExecute` to worker queue, `TimerFired` (delayed) to orchestrator queue
397//! 6. **WorkDispatcher** fetches activity work item, executes via `ActivityRegistry`
398//! 7. **Activity** completes → enqueues `ActivityCompleted`/`ActivityFailed` to orchestrator queue
399//! 8. **OrchestrationDispatcher** processes completion → next orchestration turn
400//! 9. **Runtime** atomically commits history + queue changes via `ack_orchestration_item()`
401//!
402//! All operations are deterministic and replayable from history.
403use std::future::Future;
404use std::pin::Pin;
405use std::sync::{Arc, Mutex};
406use std::task::{Context, Poll};
407
408// Public orchestration primitives and executor
409
410pub mod client;
411pub mod runtime;
412// Re-export descriptor type for public API ergonomics
413pub use runtime::OrchestrationDescriptor;
414pub mod providers;
415
416#[cfg(feature = "provider-test")]
417pub mod provider_validations;
418
419#[cfg(feature = "provider-test")]
420pub mod provider_validation;
421
422#[cfg(feature = "provider-test")]
423pub mod provider_stress_tests;
424
425#[cfg(feature = "provider-test")]
426pub mod provider_stress_test;
427
428// Re-export key runtime types for convenience
429pub use client::{Client, ClientError};
430pub use runtime::{
431 OrchestrationHandler, OrchestrationRegistry, OrchestrationRegistryBuilder, OrchestrationStatus, RuntimeOptions,
432};
433
434// Re-export management types for convenience
435pub use providers::{
436 ExecutionInfo, InstanceInfo, ProviderAdmin, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
437 SystemMetrics, TagFilter,
438};
439
440// Re-export capability filtering types
441pub use providers::{DispatcherCapabilityFilter, SemverRange, current_build_version};
442
443// Re-export deletion/pruning types for Client API users
444pub use providers::{DeleteInstanceResult, InstanceFilter, InstanceTree, PruneOptions, PruneResult};
445
446// Type aliases for improved readability and maintainability
447/// Shared reference to a Provider implementation
448pub type ProviderRef = Arc<dyn providers::Provider>;
449
450/// Shared reference to an OrchestrationHandler
451pub type OrchestrationHandlerRef = Arc<dyn runtime::OrchestrationHandler>;
452
453// ============================================================================
454// Heterogeneous Select Result Types
455// ============================================================================
456
457// ============================================================================
458// Schedule Kind and DurableFuture (Cancellation Support)
459// ============================================================================
460
461/// Identifies the kind of scheduled work for cancellation purposes.
462///
463/// When a `DurableFuture` is dropped without completing, the runtime uses
464/// this discriminator to determine how to cancel the underlying work:
465/// - **Activity**: Lock stealing via provider (DELETE from worker_queue)
466/// - **Timer**: No-op (virtual construct, no external state)
467/// - **ExternalWait**: No-op (virtual construct, no external state)
468/// - **SubOrchestration**: Enqueue `CancelInstance` work item for child
469#[derive(Debug, Clone)]
470pub enum ScheduleKind {
471 /// A scheduled activity execution
472 Activity {
473 /// Activity name for debugging/logging
474 name: String,
475 },
476 /// A durable timer
477 Timer,
478 /// Waiting for an external event
479 ExternalWait {
480 /// Event name for debugging/logging
481 event_name: String,
482 },
483 /// Waiting for a persistent external event (mailbox semantics)
484 QueueDequeue {
485 /// Event name for debugging/logging
486 event_name: String,
487 },
488 /// A sub-orchestration
489 SubOrchestration {
490 /// Token for this schedule (used to look up resolved instance ID)
491 token: u64,
492 },
493}
494
495/// A wrapper around scheduled futures that supports cancellation on drop.
496///
497/// When a `DurableFuture` is dropped without completing (e.g., as a select loser,
498/// or when going out of scope without being awaited), the underlying scheduled work
499/// is cancelled:
500///
501/// - **Activities**: Lock stealing via provider (removes from worker queue)
502/// - **Sub-orchestrations**: `CancelInstance` work item enqueued for child
503/// - **Timers/External waits**: No-op (virtual constructs with no external state)
504///
505/// # Examples
506///
507/// ```rust,no_run
508/// # use duroxide::OrchestrationContext;
509/// # use std::time::Duration;
510/// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
511/// // Activity scheduled - if timer wins, activity gets cancelled
512/// let activity = ctx.schedule_activity("SlowWork", "input");
513/// let timeout = ctx.schedule_timer(Duration::from_secs(5));
514///
515/// match ctx.select2(activity, timeout).await {
516/// duroxide::Either2::First(result) => result,
517/// duroxide::Either2::Second(()) => Err("Timed out - activity cancelled".to_string()),
518/// }
519/// # }
520/// ```
521///
522/// # Drop Semantics
523///
524/// Unlike regular Rust futures which are inert on drop, `DurableFuture` has
525/// meaningful drop semantics similar to `File` (closes on drop) or `MutexGuard`
526/// (releases lock on drop). This is intentional - we want unobserved scheduled
527/// work to be cancelled rather than leaked.
528///
529/// **Note:** Using `std::mem::forget()` on a `DurableFuture` will bypass
530/// cancellation, causing the scheduled work to run but its result to be lost.
531pub struct DurableFuture<T> {
532 /// Token assigned at creation (before schedule_id is known)
533 token: u64,
534 /// What kind of schedule this represents
535 kind: ScheduleKind,
536 /// Reference to context for cancellation registration
537 ctx: OrchestrationContext,
538 /// Whether the future has completed (to skip cancellation)
539 completed: bool,
540 /// The underlying future
541 inner: std::pin::Pin<Box<dyn Future<Output = T> + Send>>,
542}
543
544impl<T: Send + 'static> DurableFuture<T> {
545 /// Create a new `DurableFuture` wrapping an inner future.
546 fn new(
547 token: u64,
548 kind: ScheduleKind,
549 ctx: OrchestrationContext,
550 inner: impl Future<Output = T> + Send + 'static,
551 ) -> Self {
552 Self {
553 token,
554 kind,
555 ctx,
556 completed: false,
557 inner: Box::pin(inner),
558 }
559 }
560
561 /// Transform the output of this `DurableFuture` while preserving its
562 /// identity for cancellation, `join`, and `select` composability.
563 ///
564 /// The mapping function runs synchronously after the underlying future
565 /// completes. Cancellation semantics are fully preserved: dropping the
566 /// returned `DurableFuture` cancels the original scheduled work.
567 ///
568 /// # Example
569 ///
570 /// ```rust,no_run
571 /// # use duroxide::OrchestrationContext;
572 /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
573 /// // Map an activity result to extract a field
574 /// let length = ctx.schedule_activity("Greet", "World")
575 /// .map(|r| r.map(|s| s.len().to_string()));
576 /// let len_result = length.await?;
577 /// # Ok(len_result)
578 /// # }
579 /// ```
580 pub fn map<U: Send + 'static>(self, f: impl FnOnce(T) -> U + Send + 'static) -> DurableFuture<U> {
581 // Transfer ownership of cancellation guard to the new DurableFuture.
582 // We must defuse `self` so its Drop doesn't fire cancellation.
583 let token = self.token;
584 let kind = self.kind.clone();
585 let ctx = self.ctx.clone();
586
587 // Defuse: take the inner future out without running Drop.
588 let inner = unsafe {
589 let inner = std::ptr::read(&self.inner);
590 // Prevent Drop from running on the original (would cancel the token)
591 std::mem::forget(self);
592 inner
593 };
594
595 let mapped = async move {
596 let value = inner.await;
597 f(value)
598 };
599
600 DurableFuture::new(token, kind, ctx, mapped)
601 }
602
603 /// Set a routing tag on this scheduled activity.
604 ///
605 /// Tags direct activities to specialized workers. A worker configured with
606 /// [`crate::providers::TagFilter::tags`]`(["gpu"])` will only process activities
607 /// tagged `"gpu"`.
608 ///
609 /// This method uses **mutate-after-emit**: the action has already been emitted to
610 /// the context's action list, and `with_tag` reaches back to modify it in place.
611 /// This is safe because actions are not drained until the replay engine polls
612 /// the orchestration future (which hasn't happened yet — we're still inside the
613 /// user's `async` block).
614 ///
615 /// # Panics
616 ///
617 /// Panics if called on a non-activity `DurableFuture` (e.g., timer or sub-orchestration).
618 ///
619 /// # Example
620 ///
621 /// ```rust,no_run
622 /// # use duroxide::OrchestrationContext;
623 /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
624 /// let result = ctx.schedule_activity("CompileRelease", "repo-url")
625 /// .with_tag("build-machine")
626 /// .await?;
627 /// # Ok(result)
628 /// # }
629 /// ```
630 pub fn with_tag(self, tag: impl Into<String>) -> Self {
631 match &self.kind {
632 ScheduleKind::Activity { .. } => {}
633 other => panic!("with_tag() can only be called on activity futures, got {:?}", other),
634 }
635
636 let tag_value = tag.into();
637 let mut inner = self.ctx.inner.lock().expect("Mutex should not be poisoned");
638 // Find the action with our token and set its tag.
639 // The token must exist: it was just emitted by schedule_activity_internal
640 // and emitted_actions is only drained when the replay engine polls
641 // (which hasn't happened yet — we're still in the user's async block).
642 let mut found = false;
643 for (token, action) in inner.emitted_actions.iter_mut() {
644 if *token == self.token {
645 match action {
646 Action::CallActivity { tag, .. } => {
647 *tag = Some(tag_value);
648 }
649 _ => panic!("Token matched but action is not CallActivity"),
650 }
651 found = true;
652 break;
653 }
654 }
655 assert!(
656 found,
657 "with_tag(): token {} not found in emitted_actions — actions were drained before with_tag() was called",
658 self.token
659 );
660 drop(inner);
661 self
662 }
663}
664
665impl<T> Future for DurableFuture<T> {
666 type Output = T;
667
668 fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<T> {
669 match self.inner.as_mut().poll(cx) {
670 Poll::Ready(value) => {
671 self.completed = true;
672 Poll::Ready(value)
673 }
674 Poll::Pending => Poll::Pending,
675 }
676 }
677}
678
679impl<T> Drop for DurableFuture<T> {
680 fn drop(&mut self) {
681 if !self.completed {
682 // Future dropped without completing - trigger cancellation.
683 // Note: During dehydration (TurnResult::Continue), the orchestration future
684 // is dropped after collect_cancelled_from_context() has already run, so these
685 // cancellations go into a context that's about to be dropped. This is safe
686 // because the next turn creates a fresh context.
687 self.ctx.mark_token_cancelled(self.token, &self.kind);
688 }
689 }
690}
691
692// DurableFuture is Send if T is Send (inner is already Send-boxed)
693unsafe impl<T: Send> Send for DurableFuture<T> {}
694
695/// Result type for `select2` - represents which of two futures completed first.
696///
697/// Use this when racing two futures with different output types:
698/// ```rust,no_run
699/// # use duroxide::{OrchestrationContext, Either2};
700/// # use std::time::Duration;
701/// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
702/// let activity = ctx.schedule_activity("Work", "input");
703/// let timeout = ctx.schedule_timer(Duration::from_secs(30));
704///
705/// match ctx.select2(activity, timeout).await {
706/// Either2::First(result) => result,
707/// Either2::Second(()) => Err("Timed out".to_string()),
708/// }
709/// # }
710/// ```
711#[derive(Debug, Clone, PartialEq, Eq)]
712pub enum Either2<A, B> {
713 /// First future completed first
714 First(A),
715 /// Second future completed first
716 Second(B),
717}
718
719impl<A, B> Either2<A, B> {
720 /// Returns true if this is the First variant
721 pub fn is_first(&self) -> bool {
722 matches!(self, Either2::First(_))
723 }
724
725 /// Returns true if this is the Second variant
726 pub fn is_second(&self) -> bool {
727 matches!(self, Either2::Second(_))
728 }
729
730 /// Returns the index of the winner (0 for First, 1 for Second)
731 pub fn index(&self) -> usize {
732 match self {
733 Either2::First(_) => 0,
734 Either2::Second(_) => 1,
735 }
736 }
737}
738
739impl<T> Either2<T, T> {
740 /// For homogeneous Either2 (both types are the same), extract as (index, value).
741 /// This is useful for migration from the old `(usize, T)` return type.
742 pub fn into_tuple(self) -> (usize, T) {
743 match self {
744 Either2::First(v) => (0, v),
745 Either2::Second(v) => (1, v),
746 }
747 }
748}
749
750/// Result type for `select3` - represents which of three futures completed first.
751#[derive(Debug, Clone, PartialEq, Eq)]
752pub enum Either3<A, B, C> {
753 /// First future completed first
754 First(A),
755 /// Second future completed first
756 Second(B),
757 /// Third future completed first
758 Third(C),
759}
760
761impl<A, B, C> Either3<A, B, C> {
762 /// Returns the index of the winner (0 for First, 1 for Second, 2 for Third)
763 pub fn index(&self) -> usize {
764 match self {
765 Either3::First(_) => 0,
766 Either3::Second(_) => 1,
767 Either3::Third(_) => 2,
768 }
769 }
770}
771
772impl<T> Either3<T, T, T> {
773 /// For homogeneous Either3 (all types are the same), extract as (index, value).
774 pub fn into_tuple(self) -> (usize, T) {
775 match self {
776 Either3::First(v) => (0, v),
777 Either3::Second(v) => (1, v),
778 Either3::Third(v) => (2, v),
779 }
780 }
781}
782
783// Reserved prefix for built-in system activities.
784// User-registered activities cannot use names starting with this prefix.
785pub(crate) const SYSCALL_ACTIVITY_PREFIX: &str = "__duroxide_syscall:";
786
787// Built-in system activity names (constructed from prefix)
788pub(crate) const SYSCALL_ACTIVITY_NEW_GUID: &str = "__duroxide_syscall:new_guid";
789pub(crate) const SYSCALL_ACTIVITY_UTC_NOW_MS: &str = "__duroxide_syscall:utc_now_ms";
790pub(crate) const SYSCALL_ACTIVITY_GET_KV_VALUE: &str = "__duroxide_syscall:get_kv_value";
791
792use crate::_typed_codec::Codec;
793// LogLevel is now defined locally in this file
794use serde::{Deserialize, Serialize};
795use std::time::{Duration as StdDuration, SystemTime, UNIX_EPOCH};
796
797// Internal codec utilities for typed I/O (kept private; public API remains ergonomic)
798mod _typed_codec {
799 use serde::{Serialize, de::DeserializeOwned};
800 use serde_json::Value;
801 pub trait Codec {
802 fn encode<T: Serialize>(v: &T) -> Result<String, String>;
803 fn decode<T: DeserializeOwned>(s: &str) -> Result<T, String>;
804 }
805 pub struct Json;
806 impl Codec for Json {
807 fn encode<T: Serialize>(v: &T) -> Result<String, String> {
808 // If the value is a JSON string, return raw content to preserve historic behavior
809 match serde_json::to_value(v) {
810 Ok(Value::String(s)) => Ok(s),
811 Ok(val) => serde_json::to_string(&val).map_err(|e| e.to_string()),
812 Err(e) => Err(e.to_string()),
813 }
814 }
815 fn decode<T: DeserializeOwned>(s: &str) -> Result<T, String> {
816 // Try parse as JSON first
817 match serde_json::from_str::<T>(s) {
818 Ok(v) => Ok(v),
819 Err(_) => {
820 // Fallback: treat raw string as JSON string value
821 let val = Value::String(s.to_string());
822 serde_json::from_value(val).map_err(|e| e.to_string())
823 }
824 }
825 }
826 }
827}
828
829/// Initial execution ID for new orchestration instances.
830/// All orchestrations start with execution_id = 1.
831pub const INITIAL_EXECUTION_ID: u64 = 1;
832
833/// Initial event ID for new executions.
834/// The first event (OrchestrationStarted) always has event_id = 1.
835pub const INITIAL_EVENT_ID: u64 = 1;
836
837// =============================================================================
838// Sub-orchestration instance ID conventions
839// =============================================================================
840
841/// Prefix for auto-generated sub-orchestration instance IDs.
842/// IDs starting with this prefix will have parent prefix added: `{parent}::{sub::N}`
843pub const SUB_ORCH_AUTO_PREFIX: &str = "sub::";
844
845/// Prefix for placeholder instance IDs before event ID assignment.
846/// These are replaced with `sub::{event_id}` during action processing.
847pub(crate) const SUB_ORCH_PENDING_PREFIX: &str = "sub::pending_";
848
849/// Determine if a sub-orchestration instance ID is auto-generated (needs parent prefix).
850///
851/// Auto-generated IDs start with "sub::" and will have the parent instance prefixed
852/// to create a globally unique ID: `{parent_instance}::{child_instance}`.
853///
854/// Explicit IDs (those not starting with "sub::") are used exactly as provided.
855#[inline]
856pub fn is_auto_generated_sub_orch_id(instance: &str) -> bool {
857 instance.starts_with(SUB_ORCH_AUTO_PREFIX)
858}
859
860/// Build the full child instance ID, adding parent prefix only for auto-generated IDs.
861///
862/// - Auto-generated IDs (starting with "sub::"): `{parent}::{child}` (e.g., `parent-1::sub::5`)
863/// - Explicit IDs: used exactly as provided (e.g., `my-custom-child-id`)
864#[inline]
865pub fn build_child_instance_id(parent_instance: &str, child_instance: &str) -> String {
866 if is_auto_generated_sub_orch_id(child_instance) {
867 format!("{parent_instance}::{child_instance}")
868 } else {
869 child_instance.to_string()
870 }
871}
872
873/// Structured error details for orchestration failures.
874///
875/// Errors are categorized into three types for proper metrics and logging:
876/// - **Infrastructure**: Provider failures, data corruption (abort turn, never reach user code)
877/// - **Configuration**: Deployment issues like unregistered activities, nondeterminism (abort turn)
878/// - **Application**: Business logic failures (flow through normal orchestration code)
879#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
880pub enum ErrorDetails {
881 /// Infrastructure failure (provider errors, data corruption).
882 /// These errors abort orchestration execution and never reach user code.
883 Infrastructure {
884 operation: String,
885 message: String,
886 retryable: bool,
887 },
888
889 /// Configuration error (unregistered orchestrations/activities, nondeterminism).
890 /// These errors abort orchestration execution and never reach user code.
891 Configuration {
892 kind: ConfigErrorKind,
893 resource: String,
894 message: Option<String>,
895 },
896
897 /// Application error (business logic failures).
898 /// These are the ONLY errors that orchestration code sees.
899 Application {
900 kind: AppErrorKind,
901 message: String,
902 retryable: bool,
903 },
904
905 /// Poison message error - message exceeded max fetch attempts.
906 ///
907 /// This indicates a message that repeatedly fails to process.
908 /// Could be caused by:
909 /// - Malformed message data causing deserialization failures
910 /// - Message triggering bugs that crash the worker
911 /// - Transient infrastructure issues that became permanent
912 /// - Application code bugs triggered by specific input patterns
913 Poison {
914 /// Number of times the message was fetched
915 attempt_count: u32,
916 /// Maximum allowed attempts
917 max_attempts: u32,
918 /// Message type and identity
919 message_type: PoisonMessageType,
920 /// The poisoned message content (serialized JSON for debugging)
921 message: String,
922 },
923}
924
925/// Poison message type identification.
926#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
927pub enum PoisonMessageType {
928 /// Orchestration work item batch
929 Orchestration { instance: String, execution_id: u64 },
930 /// Activity execution
931 Activity {
932 instance: String,
933 execution_id: u64,
934 activity_name: String,
935 activity_id: u64,
936 },
937 /// History deserialization failure (e.g., unknown event types from a newer duroxide version)
938 FailedDeserialization {
939 instance: String,
940 execution_id: u64,
941 error: String,
942 },
943}
944
945/// Configuration error kinds.
946#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
947pub enum ConfigErrorKind {
948 Nondeterminism,
949}
950
951/// Application error kinds.
952#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
953pub enum AppErrorKind {
954 ActivityFailed,
955 OrchestrationFailed,
956 Panicked,
957 Cancelled { reason: String },
958}
959
960impl ErrorDetails {
961 /// Get failure category for metrics/logging.
962 pub fn category(&self) -> &'static str {
963 match self {
964 ErrorDetails::Infrastructure { .. } => "infrastructure",
965 ErrorDetails::Configuration { .. } => "configuration",
966 ErrorDetails::Application { .. } => "application",
967 ErrorDetails::Poison { .. } => "poison",
968 }
969 }
970
971 /// Check if failure is retryable.
972 pub fn is_retryable(&self) -> bool {
973 match self {
974 ErrorDetails::Infrastructure { retryable, .. } => *retryable,
975 ErrorDetails::Application { retryable, .. } => *retryable,
976 ErrorDetails::Configuration { .. } => false,
977 ErrorDetails::Poison { .. } => false, // Never retryable
978 }
979 }
980
981 /// Get display message for logging/UI (backward compatible format).
982 pub fn display_message(&self) -> String {
983 match self {
984 ErrorDetails::Infrastructure { operation, message, .. } => {
985 format!("infrastructure:{operation}: {message}")
986 }
987 ErrorDetails::Configuration {
988 kind,
989 resource,
990 message,
991 } => match kind {
992 ConfigErrorKind::Nondeterminism => message
993 .as_ref()
994 .map(|m| format!("nondeterministic: {m}"))
995 .unwrap_or_else(|| format!("nondeterministic in {resource}")),
996 },
997 ErrorDetails::Application { kind, message, .. } => match kind {
998 AppErrorKind::Cancelled { reason } => format!("canceled: {reason}"),
999 AppErrorKind::Panicked => format!("orchestration panicked: {message}"),
1000 _ => message.clone(),
1001 },
1002 ErrorDetails::Poison {
1003 attempt_count,
1004 max_attempts,
1005 message_type,
1006 ..
1007 } => match message_type {
1008 PoisonMessageType::Orchestration { instance, .. } => {
1009 format!("poison: orchestration {instance} exceeded {attempt_count} attempts (max {max_attempts})")
1010 }
1011 PoisonMessageType::Activity {
1012 activity_name,
1013 activity_id,
1014 ..
1015 } => {
1016 format!(
1017 "poison: activity {activity_name}#{activity_id} exceeded {attempt_count} attempts (max {max_attempts})"
1018 )
1019 }
1020 PoisonMessageType::FailedDeserialization { instance, error, .. } => {
1021 format!(
1022 "poison: orchestration {instance} history deserialization failed after {attempt_count} attempts (max {max_attempts}): {error}"
1023 )
1024 }
1025 },
1026 }
1027 }
1028}
1029
1030/// Unified event with common metadata and type-specific payload.
1031///
1032/// All events have common fields (event_id, source_event_id, instance_id, etc.)
1033/// plus type-specific data in the `kind` field.
1034///
1035/// Events are append-only history entries persisted by a provider and consumed during replay.
1036/// The `event_id` is a monotonically increasing position in history.
1037/// Scheduling and completion events are linked via `source_event_id`.
1038#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1039pub struct Event {
1040 /// Sequential position in history (monotonically increasing per execution)
1041 pub event_id: u64,
1042
1043 /// For completion events: references the scheduling event this completes.
1044 /// None for lifecycle events (OrchestrationStarted, etc.) and scheduling events.
1045 /// Some(id) for completion events (ActivityCompleted, TimerFired, etc.).
1046 pub source_event_id: Option<u64>,
1047
1048 /// Instance this event belongs to.
1049 /// Denormalized from DB key for self-contained events.
1050 pub instance_id: String,
1051
1052 /// Execution this event belongs to.
1053 /// Denormalized from DB key for self-contained events.
1054 pub execution_id: u64,
1055
1056 /// Timestamp when event was created (milliseconds since Unix epoch).
1057 pub timestamp_ms: u64,
1058
1059 /// Crate semver version that generated this event.
1060 /// Format: "0.1.0", "0.2.0", etc.
1061 pub duroxide_version: String,
1062
1063 /// Event type and associated data.
1064 #[serde(flatten)]
1065 pub kind: EventKind,
1066}
1067
1068/// Event-specific payloads.
1069///
1070/// Common fields have been extracted to the Event struct:
1071/// - event_id: moved to Event.event_id
1072/// - source_event_id: moved to Event.source_event_id (`Option<u64>`)
1073/// - execution_id: moved to Event.execution_id (was in 4 variants)
1074#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1075#[serde(tag = "type")]
1076pub enum EventKind {
1077 /// Orchestration instance was created and started by name with input.
1078 /// Version is required; parent linkage is present when this is a child orchestration.
1079 #[serde(rename = "OrchestrationStarted")]
1080 OrchestrationStarted {
1081 name: String,
1082 version: String,
1083 input: String,
1084 parent_instance: Option<String>,
1085 parent_id: Option<u64>,
1086 /// Persistent events carried forward from the previous execution during continue-as-new.
1087 /// Present only on CAN-initiated executions for audit trail. Each tuple is (event_name, data).
1088 #[serde(skip_serializing_if = "Option::is_none")]
1089 #[serde(default)]
1090 carry_forward_events: Option<Vec<(String, String)>>,
1091 /// Custom status carried forward from the previous execution during continue-as-new.
1092 /// Initialized from the last `CustomStatusUpdated` event in the previous execution's history.
1093 #[serde(skip_serializing_if = "Option::is_none")]
1094 #[serde(default)]
1095 initial_custom_status: Option<String>,
1096 },
1097
1098 /// Orchestration completed with a final result.
1099 #[serde(rename = "OrchestrationCompleted")]
1100 OrchestrationCompleted { output: String },
1101
1102 /// Orchestration failed with a final error.
1103 #[serde(rename = "OrchestrationFailed")]
1104 OrchestrationFailed { details: ErrorDetails },
1105
1106 /// Activity was scheduled.
1107 #[serde(rename = "ActivityScheduled")]
1108 ActivityScheduled {
1109 name: String,
1110 input: String,
1111 #[serde(skip_serializing_if = "Option::is_none")]
1112 #[serde(default)]
1113 session_id: Option<String>,
1114 /// Routing tag for directing this activity to specific workers.
1115 /// `None` means default (untagged) queue.
1116 #[serde(skip_serializing_if = "Option::is_none")]
1117 #[serde(default)]
1118 tag: Option<String>,
1119 },
1120
1121 /// Activity completed successfully with a result.
1122 #[serde(rename = "ActivityCompleted")]
1123 ActivityCompleted { result: String },
1124
1125 /// Activity failed with error details.
1126 #[serde(rename = "ActivityFailed")]
1127 ActivityFailed { details: ErrorDetails },
1128
1129 /// Cancellation was requested for an activity (best-effort; completion may still arrive).
1130 /// Correlates to the ActivityScheduled event via Event.source_event_id.
1131 #[serde(rename = "ActivityCancelRequested")]
1132 ActivityCancelRequested { reason: String },
1133
1134 /// Timer was created and will logically fire at `fire_at_ms`.
1135 #[serde(rename = "TimerCreated")]
1136 TimerCreated { fire_at_ms: u64 },
1137
1138 /// Timer fired at logical time `fire_at_ms`.
1139 #[serde(rename = "TimerFired")]
1140 TimerFired { fire_at_ms: u64 },
1141
1142 /// Subscription to an external event by name was recorded.
1143 #[serde(rename = "ExternalSubscribed")]
1144 ExternalSubscribed { name: String },
1145
1146 /// An external event was raised. Matched by name (no source_event_id).
1147 #[serde(rename = "ExternalEvent")]
1148 ExternalEvent { name: String, data: String },
1149
1150 /// Fire-and-forget orchestration scheduling (detached).
1151 #[serde(rename = "OrchestrationChained")]
1152 OrchestrationChained {
1153 name: String,
1154 instance: String,
1155 input: String,
1156 },
1157
1158 /// Sub-orchestration was scheduled with deterministic child instance id.
1159 #[serde(rename = "SubOrchestrationScheduled")]
1160 SubOrchestrationScheduled {
1161 name: String,
1162 instance: String,
1163 input: String,
1164 },
1165
1166 /// Sub-orchestration completed and returned a result to the parent.
1167 #[serde(rename = "SubOrchestrationCompleted")]
1168 SubOrchestrationCompleted { result: String },
1169
1170 /// Sub-orchestration failed and returned error details to the parent.
1171 #[serde(rename = "SubOrchestrationFailed")]
1172 SubOrchestrationFailed { details: ErrorDetails },
1173
1174 /// Cancellation was requested for a sub-orchestration (best-effort; completion may still arrive).
1175 /// Correlates to the SubOrchestrationScheduled event via Event.source_event_id.
1176 #[serde(rename = "SubOrchestrationCancelRequested")]
1177 SubOrchestrationCancelRequested { reason: String },
1178
1179 /// Orchestration continued as new with fresh input (terminal for this execution).
1180 #[serde(rename = "OrchestrationContinuedAsNew")]
1181 OrchestrationContinuedAsNew { input: String },
1182
1183 /// Cancellation has been requested for the orchestration (terminal will follow deterministically).
1184 #[serde(rename = "OrchestrationCancelRequested")]
1185 OrchestrationCancelRequested { reason: String },
1186
1187 /// An external event subscription was cancelled (e.g., lost a select2 race).
1188 /// Correlates to the ExternalSubscribed event via Event.source_event_id.
1189 /// This breadcrumb ensures deterministic replay — cancelled subscriptions
1190 /// are skipped during index-based matching.
1191 #[serde(rename = "ExternalSubscribedCancelled")]
1192 ExternalSubscribedCancelled { reason: String },
1193
1194 /// Persistent subscription to an external event by name was recorded.
1195 /// Unlike positional ExternalSubscribed, persistent subscriptions use
1196 /// mailbox semantics: FIFO matching, no positional pairing.
1197 #[serde(rename = "ExternalSubscribedPersistent")]
1198 QueueSubscribed { name: String },
1199
1200 /// A persistent external event was raised. Matched by name using FIFO
1201 /// mailbox semantics (not positional). Events stick around until consumed.
1202 #[serde(rename = "ExternalEventPersistent")]
1203 QueueEventDelivered { name: String, data: String },
1204
1205 /// A persistent external event subscription was cancelled (e.g., lost a select2 race).
1206 /// Correlates to the QueueSubscribed event via Event.source_event_id.
1207 /// This breadcrumb ensures correct CAN carry-forward — cancelled subscriptions
1208 /// are not counted as having consumed an arrival.
1209 #[serde(rename = "ExternalSubscribedPersistentCancelled")]
1210 QueueSubscriptionCancelled { reason: String },
1211
1212 /// Custom status was updated via `ctx.set_custom_status()` or `ctx.reset_custom_status()`.
1213 /// `Some(s)` means set to `s`; `None` means cleared back to NULL.
1214 /// This event makes custom status changes durable and replayable.
1215 #[serde(rename = "CustomStatusUpdated")]
1216 CustomStatusUpdated { status: Option<String> },
1217
1218 /// V2 subscription: includes a topic filter for pub/sub matching.
1219 /// Feature-gated for replay engine extensibility verification.
1220 #[cfg(feature = "replay-version-test")]
1221 #[serde(rename = "ExternalSubscribed2")]
1222 ExternalSubscribed2 { name: String, topic: String },
1223
1224 /// V2 event: includes the actual topic it was published on.
1225 /// Feature-gated for replay engine extensibility verification.
1226 #[cfg(feature = "replay-version-test")]
1227 #[serde(rename = "ExternalEvent2")]
1228 ExternalEvent2 { name: String, topic: String, data: String },
1229
1230 // === Key-Value Store Events ===
1231 /// A key-value pair was set via `ctx.set_kv_value()`.
1232 /// Fire-and-forget metadata action (like `CustomStatusUpdated`).
1233 #[serde(rename = "KeyValueSet")]
1234 KeyValueSet {
1235 key: String,
1236 value: String,
1237 /// Timestamp (ms since epoch) when this key was written.
1238 /// Set by the runtime, persisted by the provider. Defaults to 0 for pre-upgrade events.
1239 #[serde(default)]
1240 last_updated_at_ms: u64,
1241 },
1242
1243 /// A single key was cleared via `ctx.clear_kv_value()`.
1244 #[serde(rename = "KeyValueCleared")]
1245 KeyValueCleared { key: String },
1246
1247 /// All key-value pairs were cleared via `ctx.clear_all_kv_values()`.
1248 #[serde(rename = "KeyValuesCleared")]
1249 KeyValuesCleared,
1250}
1251
1252impl Event {
1253 /// Create a new event with common fields populated and a specific event_id.
1254 ///
1255 /// Use this when you know the event_id upfront (e.g., during replay or when
1256 /// creating events inline).
1257 pub fn with_event_id(
1258 event_id: u64,
1259 instance_id: impl Into<String>,
1260 execution_id: u64,
1261 source_event_id: Option<u64>,
1262 kind: EventKind,
1263 ) -> Self {
1264 use std::time::{SystemTime, UNIX_EPOCH};
1265 Event {
1266 event_id,
1267 source_event_id,
1268 instance_id: instance_id.into(),
1269 execution_id,
1270 timestamp_ms: SystemTime::now()
1271 .duration_since(UNIX_EPOCH)
1272 .map(|d| d.as_millis() as u64)
1273 .unwrap_or(0),
1274 duroxide_version: env!("CARGO_PKG_VERSION").to_string(),
1275 kind,
1276 }
1277 }
1278
1279 /// Create a new event with common fields populated.
1280 ///
1281 /// The event_id will be 0 and should be set by the history manager.
1282 pub fn new(
1283 instance_id: impl Into<String>,
1284 execution_id: u64,
1285 source_event_id: Option<u64>,
1286 kind: EventKind,
1287 ) -> Self {
1288 Self::with_event_id(0, instance_id, execution_id, source_event_id, kind)
1289 }
1290
1291 /// Get the event_id (position in history).
1292 #[inline]
1293 pub fn event_id(&self) -> u64 {
1294 self.event_id
1295 }
1296
1297 /// Set the event_id (used by runtime when adding events to history).
1298 #[inline]
1299 pub(crate) fn set_event_id(&mut self, id: u64) {
1300 self.event_id = id;
1301 }
1302
1303 /// Get the source_event_id if this is a completion event.
1304 /// Returns None for lifecycle and scheduling events.
1305 #[inline]
1306 pub fn source_event_id(&self) -> Option<u64> {
1307 self.source_event_id
1308 }
1309
1310 /// Check if this event is a terminal event (ends the orchestration).
1311 pub fn is_terminal(&self) -> bool {
1312 matches!(
1313 self.kind,
1314 EventKind::OrchestrationCompleted { .. }
1315 | EventKind::OrchestrationFailed { .. }
1316 | EventKind::OrchestrationContinuedAsNew { .. }
1317 )
1318 }
1319}
1320
1321/// Log levels for orchestration context logging.
1322#[derive(Debug, Clone)]
1323pub enum LogLevel {
1324 Info,
1325 Warn,
1326 Error,
1327}
1328
1329/// Backoff strategy for computing delay between retry attempts.
1330#[derive(Debug, Clone)]
1331pub enum BackoffStrategy {
1332 /// No delay between retries.
1333 None,
1334 /// Fixed delay between all retries.
1335 Fixed {
1336 /// Delay duration between each retry.
1337 delay: std::time::Duration,
1338 },
1339 /// Linear backoff: delay = base * attempt, capped at max.
1340 Linear {
1341 /// Base delay multiplied by attempt number.
1342 base: std::time::Duration,
1343 /// Maximum delay cap.
1344 max: std::time::Duration,
1345 },
1346 /// Exponential backoff: delay = base * multiplier^(attempt-1), capped at max.
1347 Exponential {
1348 /// Initial delay for first retry.
1349 base: std::time::Duration,
1350 /// Multiplier applied each attempt.
1351 multiplier: f64,
1352 /// Maximum delay cap.
1353 max: std::time::Duration,
1354 },
1355}
1356
1357impl Default for BackoffStrategy {
1358 fn default() -> Self {
1359 BackoffStrategy::Exponential {
1360 base: std::time::Duration::from_millis(100),
1361 multiplier: 2.0,
1362 max: std::time::Duration::from_secs(30),
1363 }
1364 }
1365}
1366
1367impl BackoffStrategy {
1368 /// Compute delay for given attempt (1-indexed).
1369 /// Attempt 1 is after first failure, so delay_for_attempt(1) is the first backoff.
1370 pub fn delay_for_attempt(&self, attempt: u32) -> std::time::Duration {
1371 match self {
1372 BackoffStrategy::None => std::time::Duration::ZERO,
1373 BackoffStrategy::Fixed { delay } => *delay,
1374 BackoffStrategy::Linear { base, max } => {
1375 let delay = base.saturating_mul(attempt);
1376 std::cmp::min(delay, *max)
1377 }
1378 BackoffStrategy::Exponential { base, multiplier, max } => {
1379 // delay = base * multiplier^(attempt-1)
1380 let factor = multiplier.powi(attempt.saturating_sub(1) as i32);
1381 let delay_nanos = (base.as_nanos() as f64 * factor) as u128;
1382 let delay = std::time::Duration::from_nanos(delay_nanos.min(u64::MAX as u128) as u64);
1383 std::cmp::min(delay, *max)
1384 }
1385 }
1386 }
1387}
1388
1389/// Retry policy for activities.
1390///
1391/// Configures automatic retry behavior including maximum attempts, backoff strategy,
1392/// and optional total timeout spanning all attempts.
1393///
1394/// # Example
1395///
1396/// ```rust
1397/// use std::time::Duration;
1398/// use duroxide::{RetryPolicy, BackoffStrategy};
1399///
1400/// // Simple retry with defaults (3 attempts, exponential backoff)
1401/// let policy = RetryPolicy::new(3);
1402///
1403/// // Custom policy with timeout and fixed backoff
1404/// let policy = RetryPolicy::new(5)
1405/// .with_timeout(Duration::from_secs(30))
1406/// .with_backoff(BackoffStrategy::Fixed {
1407/// delay: Duration::from_secs(1),
1408/// });
1409/// ```
1410#[derive(Debug, Clone)]
1411pub struct RetryPolicy {
1412 /// Maximum number of attempts (including initial). Must be >= 1.
1413 pub max_attempts: u32,
1414 /// Backoff strategy between retries.
1415 pub backoff: BackoffStrategy,
1416 /// Per-attempt timeout. If set, each activity attempt is raced against this
1417 /// timeout. If timeout fires, returns error immediately (no retry).
1418 /// Retries only occur for activity errors, not timeouts. None = no timeout.
1419 pub timeout: Option<std::time::Duration>,
1420}
1421
1422impl Default for RetryPolicy {
1423 fn default() -> Self {
1424 Self {
1425 max_attempts: 3,
1426 backoff: BackoffStrategy::default(),
1427 timeout: None,
1428 }
1429 }
1430}
1431
1432impl RetryPolicy {
1433 /// Create a new retry policy with specified max attempts and default backoff.
1434 ///
1435 /// # Panics
1436 /// Panics if `max_attempts` is 0.
1437 pub fn new(max_attempts: u32) -> Self {
1438 assert!(max_attempts >= 1, "max_attempts must be at least 1");
1439 Self {
1440 max_attempts,
1441 ..Default::default()
1442 }
1443 }
1444
1445 /// Set per-attempt timeout.
1446 ///
1447 /// Each activity attempt is raced against this timeout. If the timeout fires
1448 /// before the activity completes, returns an error immediately (no retry).
1449 /// Retries only occur for activity errors, not timeouts.
1450 pub fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
1451 self.timeout = Some(timeout);
1452 self
1453 }
1454
1455 /// Alias for `with_timeout` for backwards compatibility.
1456 #[doc(hidden)]
1457 pub fn with_total_timeout(mut self, timeout: std::time::Duration) -> Self {
1458 self.timeout = Some(timeout);
1459 self
1460 }
1461
1462 /// Set backoff strategy.
1463 pub fn with_backoff(mut self, backoff: BackoffStrategy) -> Self {
1464 self.backoff = backoff;
1465 self
1466 }
1467
1468 /// Compute delay for given attempt using the configured backoff strategy.
1469 pub fn delay_for_attempt(&self, attempt: u32) -> std::time::Duration {
1470 self.backoff.delay_for_attempt(attempt)
1471 }
1472}
1473
1474/// Declarative decisions produced by an orchestration turn. The host/provider
1475/// is responsible for materializing these into corresponding `Event`s.
1476#[derive(Debug, Clone)]
1477pub enum Action {
1478 /// Schedule an activity invocation. scheduling_event_id is the event_id of the ActivityScheduled event.
1479 CallActivity {
1480 scheduling_event_id: u64,
1481 name: String,
1482 input: String,
1483 /// Optional session ID for worker affinity routing.
1484 /// When set, the activity is routed to the worker owning this session.
1485 session_id: Option<String>,
1486 /// Optional routing tag for directing this activity to specific workers.
1487 /// Set via `.with_tag()` on the returned `DurableFuture`.
1488 tag: Option<String>,
1489 },
1490 /// Create a timer that will fire at the specified absolute time.
1491 /// scheduling_event_id is the event_id of the TimerCreated event.
1492 /// fire_at_ms is the absolute timestamp (ms since epoch) when the timer should fire.
1493 CreateTimer { scheduling_event_id: u64, fire_at_ms: u64 },
1494 /// Subscribe to an external event by name. scheduling_event_id is the event_id of the ExternalSubscribed event.
1495 WaitExternal { scheduling_event_id: u64, name: String },
1496 /// Start a detached orchestration (no result routing back to parent).
1497 StartOrchestrationDetached {
1498 scheduling_event_id: u64,
1499 name: String,
1500 version: Option<String>,
1501 instance: String,
1502 input: String,
1503 },
1504 /// Start a sub-orchestration by name and child instance id. scheduling_event_id is the event_id of the SubOrchestrationScheduled event.
1505 StartSubOrchestration {
1506 scheduling_event_id: u64,
1507 name: String,
1508 version: Option<String>,
1509 instance: String,
1510 input: String,
1511 },
1512
1513 /// Continue the current orchestration as a new execution with new input (terminal for current execution).
1514 /// Optional version string selects the target orchestration version for the new execution.
1515 ContinueAsNew { input: String, version: Option<String> },
1516
1517 /// Update the custom status of the orchestration.
1518 /// `Some(s)` means set to `s`; `None` means cleared back to NULL.
1519 UpdateCustomStatus { status: Option<String> },
1520
1521 /// Subscribe to a persistent external event by name (mailbox semantics).
1522 /// Unlike WaitExternal, persistent events use FIFO matching and survive cancellation.
1523 DequeueEvent { scheduling_event_id: u64, name: String },
1524
1525 /// V2: Subscribe to an external event with topic-based pub/sub matching.
1526 /// Feature-gated for replay engine extensibility verification.
1527 #[cfg(feature = "replay-version-test")]
1528 WaitExternal2 {
1529 scheduling_event_id: u64,
1530 name: String,
1531 topic: String,
1532 },
1533
1534 // === Key-Value Store Actions ===
1535 /// Set a key-value pair on this orchestration instance.
1536 /// Fire-and-forget metadata action (like `UpdateCustomStatus`).
1537 SetKeyValue {
1538 key: String,
1539 value: String,
1540 last_updated_at_ms: u64,
1541 },
1542
1543 /// Clear a single key from the KV store.
1544 ClearKeyValue { key: String },
1545
1546 /// Clear all keys from the KV store.
1547 ClearKeyValues,
1548}
1549
1550/// Result delivered to a durable future upon completion.
1551///
1552/// This enum represents the completion states for various durable operations.
1553#[doc(hidden)]
1554#[derive(Debug, Clone)]
1555#[allow(dead_code)] // ExternalData is part of the design but delivered via get_external_event
1556pub enum CompletionResult {
1557 /// Activity completed successfully
1558 ActivityOk(String),
1559 /// Activity failed with error
1560 ActivityErr(String),
1561 /// Timer fired
1562 TimerFired,
1563 /// Sub-orchestration completed successfully
1564 SubOrchOk(String),
1565 /// Sub-orchestration failed with error
1566 SubOrchErr(String),
1567 /// External event data (NOTE: External events delivered via get_external_event, not CompletionResult)
1568 ExternalData(String),
1569}
1570
1571#[derive(Debug)]
1572struct CtxInner {
1573 /// Whether we're currently replaying history (true) or processing new events (false).
1574 /// True while processing baseline_history events, false after.
1575 /// Users can check this via `ctx.is_replaying()` to skip side effects during replay.
1576 is_replaying: bool,
1577
1578 // === Replay Engine State ===
1579 /// Token counter (each schedule_*() call gets a unique token)
1580 next_token: u64,
1581 /// Emitted actions (token -> Action kind info)
1582 /// Token is used to correlate with schedule events during replay
1583 emitted_actions: Vec<(u64, Action)>,
1584 /// Results map: token -> completion result (populated by replay engine)
1585 completion_results: std::collections::HashMap<u64, CompletionResult>,
1586 /// Token -> schedule_id binding (set when replay engine matches action to history)
1587 token_bindings: std::collections::HashMap<u64, u64>,
1588 /// External subscriptions: schedule_id -> (name, subscription_index)
1589 external_subscriptions: std::collections::HashMap<u64, (String, usize)>,
1590 /// External arrivals: name -> list of payloads in arrival order
1591 external_arrivals: std::collections::HashMap<String, Vec<String>>,
1592 /// Next subscription index per external event name
1593 external_next_index: std::collections::HashMap<String, usize>,
1594 /// Cancelled external subscription schedule_ids (from ExternalSubscribedCancelled breadcrumbs)
1595 external_cancelled_subscriptions: std::collections::HashSet<u64>,
1596
1597 // === Persistent External Event State (mailbox semantics) ===
1598 /// Persistent subscriptions: schedule_id -> name
1599 /// Each subscription is matched FIFO with persistent arrivals.
1600 queue_subscriptions: Vec<(u64, String)>,
1601 /// Persistent arrivals: name -> list of payloads in arrival order
1602 queue_arrivals: std::collections::HashMap<String, Vec<String>>,
1603 /// Persistent subscriptions that have been cancelled (dropped without completing)
1604 queue_cancelled_subscriptions: std::collections::HashSet<u64>,
1605 /// Persistent subscriptions that have already been resolved (consumed an arrival)
1606 queue_resolved_subscriptions: std::collections::HashSet<u64>,
1607
1608 // === V2 External Event State (feature-gated) ===
1609 /// V2 external subscriptions: schedule_id -> (name, topic, subscription_index)
1610 #[cfg(feature = "replay-version-test")]
1611 external2_subscriptions: std::collections::HashMap<u64, (String, String, usize)>,
1612 /// V2 external arrivals: (name, topic) -> list of payloads in arrival order
1613 #[cfg(feature = "replay-version-test")]
1614 external2_arrivals: std::collections::HashMap<(String, String), Vec<String>>,
1615 /// Next subscription index per (name, topic) pair
1616 #[cfg(feature = "replay-version-test")]
1617 external2_next_index: std::collections::HashMap<(String, String), usize>,
1618
1619 /// Sub-orchestration token -> resolved instance ID mapping
1620 sub_orchestration_instances: std::collections::HashMap<u64, String>,
1621
1622 // === Cancellation Tracking ===
1623 /// Tokens that have been cancelled (dropped without completing)
1624 cancelled_tokens: std::collections::HashSet<u64>,
1625 /// Cancelled token -> ScheduleKind mapping (for determining cancellation action)
1626 cancelled_token_kinds: std::collections::HashMap<u64, ScheduleKind>,
1627
1628 // Execution metadata
1629 execution_id: u64,
1630 instance_id: String,
1631 orchestration_name: String,
1632 orchestration_version: String,
1633 logging_enabled_this_poll: bool,
1634
1635 /// Accumulated custom status from history events.
1636 /// Updated by `CustomStatusUpdated` events during replay and by `set_custom_status()` / `reset_custom_status()` calls.
1637 /// `None` means no custom status has been set (either never set, or cleared via `reset_custom_status()`).
1638 accumulated_custom_status: Option<String>,
1639
1640 /// Instance-scoped key-value store (values only).
1641 /// Seeded from the provider's materialized `kv_store` table at fetch time,
1642 /// then kept current by `set_kv_value`/`clear_kv_value`/`clear_all_kv_values` calls.
1643 kv_state: std::collections::HashMap<String, String>,
1644
1645 /// Per-key `last_updated_at_ms` timestamps, seeded from the provider snapshot.
1646 /// Used by `prune_kv_values_updated_before()` for deterministic pruning decisions.
1647 /// Keys written during the current turn are marked with `u64::MAX` (never prunable
1648 /// until the next turn, when their real timestamp will appear in the snapshot).
1649 kv_metadata: std::collections::HashMap<String, u64>,
1650}
1651
1652impl CtxInner {
1653 fn new(
1654 _history: Vec<Event>, // Kept for API compatibility, no longer used
1655 execution_id: u64,
1656 instance_id: String,
1657 orchestration_name: String,
1658 orchestration_version: String,
1659 _worker_id: Option<String>, // Kept for API compatibility, no longer used
1660 ) -> Self {
1661 Self {
1662 // Start in replaying state - will be set to false when we move past baseline history
1663 is_replaying: true,
1664
1665 // Replay engine state
1666 next_token: 0,
1667 emitted_actions: Vec::new(),
1668 completion_results: Default::default(),
1669 token_bindings: Default::default(),
1670 external_subscriptions: Default::default(),
1671 external_arrivals: Default::default(),
1672 external_next_index: Default::default(),
1673 external_cancelled_subscriptions: Default::default(),
1674 queue_subscriptions: Default::default(),
1675 queue_arrivals: Default::default(),
1676 queue_cancelled_subscriptions: Default::default(),
1677 queue_resolved_subscriptions: Default::default(),
1678 #[cfg(feature = "replay-version-test")]
1679 external2_subscriptions: Default::default(),
1680 #[cfg(feature = "replay-version-test")]
1681 external2_arrivals: Default::default(),
1682 #[cfg(feature = "replay-version-test")]
1683 external2_next_index: Default::default(),
1684 sub_orchestration_instances: Default::default(),
1685
1686 // Cancellation tracking
1687 cancelled_tokens: Default::default(),
1688 cancelled_token_kinds: Default::default(),
1689
1690 // Execution metadata
1691 execution_id,
1692 instance_id,
1693 orchestration_name,
1694 orchestration_version,
1695 logging_enabled_this_poll: false,
1696
1697 accumulated_custom_status: None,
1698
1699 kv_state: std::collections::HashMap::new(),
1700 kv_metadata: std::collections::HashMap::new(),
1701 }
1702 }
1703
1704 fn now_ms(&self) -> u64 {
1705 SystemTime::now()
1706 .duration_since(UNIX_EPOCH)
1707 .map(|d| d.as_millis() as u64)
1708 .unwrap_or(0)
1709 }
1710
1711 // === Replay Engine Helpers ===
1712
1713 /// Emit an action and return a token for correlation.
1714 fn emit_action(&mut self, action: Action) -> u64 {
1715 self.next_token += 1;
1716 let token = self.next_token;
1717 self.emitted_actions.push((token, action));
1718 token
1719 }
1720
1721 /// Drain all emitted actions (called by replay engine after polling).
1722 fn drain_emitted_actions(&mut self) -> Vec<(u64, Action)> {
1723 std::mem::take(&mut self.emitted_actions)
1724 }
1725
1726 /// Bind a token to a schedule_id (called by replay engine when matching action to history).
1727 fn bind_token(&mut self, token: u64, schedule_id: u64) {
1728 self.token_bindings.insert(token, schedule_id);
1729 }
1730
1731 /// Get the schedule_id bound to a token (returns None if not yet bound).
1732 fn get_bound_schedule_id(&self, token: u64) -> Option<u64> {
1733 self.token_bindings.get(&token).copied()
1734 }
1735
1736 /// Deliver a completion result for a schedule_id.
1737 fn deliver_result(&mut self, schedule_id: u64, result: CompletionResult) {
1738 // Find the token that was bound to this schedule_id
1739 for (&token, &sid) in &self.token_bindings {
1740 if sid == schedule_id {
1741 self.completion_results.insert(token, result);
1742 return;
1743 }
1744 }
1745 tracing::warn!(
1746 schedule_id,
1747 "dropping completion result with no binding (unsupported for now)"
1748 );
1749 }
1750
1751 /// Check if a result is available for a token.
1752 fn get_result(&self, token: u64) -> Option<&CompletionResult> {
1753 self.completion_results.get(&token)
1754 }
1755
1756 /// Bind an external subscription to a deterministic index.
1757 fn bind_external_subscription(&mut self, schedule_id: u64, name: &str) {
1758 let idx = self.external_next_index.entry(name.to_string()).or_insert(0);
1759 let subscription_index = *idx;
1760 *idx += 1;
1761 self.external_subscriptions
1762 .insert(schedule_id, (name.to_string(), subscription_index));
1763 }
1764
1765 /// Check if there is an active (non-cancelled) subscription slot available
1766 /// for a positional external event.
1767 ///
1768 /// Returns `true` if there is at least one active subscription for this name
1769 /// that hasn't been matched to an arrival yet. Cancelled subscriptions are
1770 /// excluded — they don't occupy arrival slots (compression skips them).
1771 fn has_pending_subscription_slot(&self, name: &str) -> bool {
1772 let active_subs = self
1773 .external_subscriptions
1774 .iter()
1775 .filter(|(sid, (n, _))| n == name && !self.external_cancelled_subscriptions.contains(sid))
1776 .count();
1777 let delivered = self.external_arrivals.get(name).map_or(0, |v| v.len());
1778 active_subs > delivered
1779 }
1780
1781 /// Deliver an external event (appends to arrival list for the name).
1782 fn deliver_external_event(&mut self, name: String, data: String) {
1783 self.external_arrivals.entry(name).or_default().push(data);
1784 }
1785
1786 /// Get external event data for a subscription (by schedule_id).
1787 ///
1788 /// Uses compressed (effective) indices that skip cancelled subscriptions:
1789 /// effective_index = raw_index - count_of_cancelled_subscriptions_with_lower_raw_index
1790 ///
1791 /// Stale events are prevented from entering the arrival list by the causal
1792 /// check in the replay loop (`has_pending_subscription_slot`), so the
1793 /// arrival array only contains events that have a matching active slot.
1794 fn get_external_event(&self, schedule_id: u64) -> Option<&String> {
1795 let (name, subscription_index) = self.external_subscriptions.get(&schedule_id)?;
1796
1797 // If this subscription is cancelled, it never resolves
1798 if self.external_cancelled_subscriptions.contains(&schedule_id) {
1799 return None;
1800 }
1801
1802 // Count cancelled subscriptions for this name with a lower raw index
1803 let cancelled_below = self
1804 .external_subscriptions
1805 .values()
1806 .filter(|(n, idx)| n == name && *idx < *subscription_index)
1807 .filter(|(_, idx)| {
1808 // Check if this subscription's schedule_id is in the cancelled set
1809 self.external_subscriptions
1810 .iter()
1811 .any(|(sid, (n, i))| n == name && i == idx && self.external_cancelled_subscriptions.contains(sid))
1812 })
1813 .count();
1814
1815 let effective_index = subscription_index - cancelled_below;
1816 let arrivals = self.external_arrivals.get(name)?;
1817 arrivals.get(effective_index)
1818 }
1819
1820 /// Mark an external subscription as cancelled.
1821 fn mark_external_subscription_cancelled(&mut self, schedule_id: u64) {
1822 self.external_cancelled_subscriptions.insert(schedule_id);
1823 }
1824
1825 // === Persistent External Event Methods (mailbox semantics) ===
1826
1827 /// Bind a persistent subscription.
1828 fn bind_queue_subscription(&mut self, schedule_id: u64, name: &str) {
1829 self.queue_subscriptions.push((schedule_id, name.to_string()));
1830 }
1831
1832 /// Deliver a persistent external event (appends to arrival list for the name).
1833 fn deliver_queue_message(&mut self, name: String, data: String) {
1834 self.queue_arrivals.entry(name).or_default().push(data);
1835 }
1836
1837 /// Mark a persistent subscription as cancelled (dropped without completing).
1838 fn mark_queue_subscription_cancelled(&mut self, schedule_id: u64) {
1839 self.queue_cancelled_subscriptions.insert(schedule_id);
1840 }
1841
1842 /// Get persistent event data for a subscription (by schedule_id).
1843 ///
1844 /// Uses FIFO matching: the first active (non-cancelled, non-resolved) persistent
1845 /// subscription gets the first unmatched persistent arrival for that name.
1846 fn get_queue_message(&mut self, schedule_id: u64) -> Option<String> {
1847 // Find which name this subscription is for
1848 let name = self
1849 .queue_subscriptions
1850 .iter()
1851 .find(|(sid, _)| *sid == schedule_id)
1852 .map(|(_, n)| n.clone())?;
1853
1854 // Already resolved?
1855 if self.queue_resolved_subscriptions.contains(&schedule_id) {
1856 return None;
1857 }
1858
1859 // Already cancelled?
1860 if self.queue_cancelled_subscriptions.contains(&schedule_id) {
1861 return None;
1862 }
1863
1864 // Our arrival index is exactly the number of active (non-cancelled)
1865 // subscriptions for this name that were created before us.
1866 // This guarantees strict FIFO matching regardless of poll order.
1867 let arrival_index: usize = self
1868 .queue_subscriptions
1869 .iter()
1870 .take_while(|(sid, _)| *sid != schedule_id)
1871 .filter(|(sid, n)| n == &name && !self.queue_cancelled_subscriptions.contains(sid))
1872 .count();
1873
1874 // Get arrivals for this name
1875 let arrivals = self.queue_arrivals.get(&name)?;
1876
1877 if arrival_index < arrivals.len() {
1878 // Mark as resolved
1879 self.queue_resolved_subscriptions.insert(schedule_id);
1880 Some(arrivals[arrival_index].clone())
1881 } else {
1882 None
1883 }
1884 }
1885
1886 /// Get cancelled persistent wait schedule_ids (tokens that were bound and then dropped).
1887 fn get_cancelled_queue_ids(&self) -> Vec<u64> {
1888 let mut ids = Vec::new();
1889 for &token in &self.cancelled_tokens {
1890 if let Some(kind) = self.cancelled_token_kinds.get(&token)
1891 && matches!(kind, ScheduleKind::QueueDequeue { .. })
1892 && let Some(&schedule_id) = self.token_bindings.get(&token)
1893 {
1894 ids.push(schedule_id);
1895 }
1896 }
1897 ids
1898 }
1899
1900 /// V2: Bind an external subscription with topic to a deterministic index.
1901 #[cfg(feature = "replay-version-test")]
1902 fn bind_external_subscription2(&mut self, schedule_id: u64, name: &str, topic: &str) {
1903 let key = (name.to_string(), topic.to_string());
1904 let idx = self.external2_next_index.entry(key.clone()).or_insert(0);
1905 let subscription_index = *idx;
1906 *idx += 1;
1907 self.external2_subscriptions
1908 .insert(schedule_id, (name.to_string(), topic.to_string(), subscription_index));
1909 }
1910
1911 /// V2: Deliver an external event with topic (appends to arrival list for (name, topic)).
1912 #[cfg(feature = "replay-version-test")]
1913 fn deliver_external_event2(&mut self, name: String, topic: String, data: String) {
1914 self.external2_arrivals.entry((name, topic)).or_default().push(data);
1915 }
1916
1917 /// V2: Get external event data for a topic-based subscription (by schedule_id).
1918 #[cfg(feature = "replay-version-test")]
1919 fn get_external_event2(&self, schedule_id: u64) -> Option<&String> {
1920 let (name, topic, subscription_index) = self.external2_subscriptions.get(&schedule_id)?;
1921 let arrivals = self.external2_arrivals.get(&(name.clone(), topic.clone()))?;
1922 arrivals.get(*subscription_index)
1923 }
1924
1925 // === Cancellation Helpers ===
1926
1927 /// Mark a token as cancelled (called by DurableFuture::drop).
1928 fn mark_token_cancelled(&mut self, token: u64, kind: ScheduleKind) {
1929 self.cancelled_tokens.insert(token);
1930 self.cancelled_token_kinds.insert(token, kind);
1931 }
1932
1933 /// Get cancelled activity schedule_ids (tokens that were bound and then dropped).
1934 fn get_cancelled_activity_ids(&self) -> Vec<u64> {
1935 let mut ids = Vec::new();
1936 for &token in &self.cancelled_tokens {
1937 if let Some(kind) = self.cancelled_token_kinds.get(&token)
1938 && matches!(kind, ScheduleKind::Activity { .. })
1939 && let Some(&schedule_id) = self.token_bindings.get(&token)
1940 {
1941 ids.push(schedule_id);
1942 }
1943 }
1944 ids
1945 }
1946
1947 /// Get cancelled external wait schedule_ids (tokens that were bound and then dropped).
1948 fn get_cancelled_external_wait_ids(&self) -> Vec<u64> {
1949 let mut ids = Vec::new();
1950 for &token in &self.cancelled_tokens {
1951 if let Some(kind) = self.cancelled_token_kinds.get(&token)
1952 && matches!(kind, ScheduleKind::ExternalWait { .. })
1953 && let Some(&schedule_id) = self.token_bindings.get(&token)
1954 {
1955 ids.push(schedule_id);
1956 }
1957 }
1958 ids
1959 }
1960
1961 /// Get cancelled sub-orchestration cancellations.
1962 ///
1963 /// Returns `(scheduling_event_id, child_instance_id)` for sub-orchestration futures that were
1964 /// bound (schedule_id assigned) and then dropped.
1965 fn get_cancelled_sub_orchestration_cancellations(&self) -> Vec<(u64, String)> {
1966 let mut cancels = Vec::new();
1967 for &token in &self.cancelled_tokens {
1968 if let Some(ScheduleKind::SubOrchestration { token: sub_token }) = self.cancelled_token_kinds.get(&token)
1969 && let Some(&schedule_id) = self.token_bindings.get(&token)
1970 {
1971 // Look up the resolved instance ID from our mapping
1972 if let Some(instance_id) = self.sub_orchestration_instances.get(sub_token) {
1973 cancels.push((schedule_id, instance_id.clone()));
1974 }
1975 // If not in mapping, the action wasn't bound yet - nothing to cancel
1976 }
1977 }
1978 cancels
1979 }
1980
1981 /// Bind a sub-orchestration token to its resolved instance ID.
1982 fn bind_sub_orchestration_instance(&mut self, token: u64, instance_id: String) {
1983 self.sub_orchestration_instances.insert(token, instance_id);
1984 }
1985
1986 /// Clear cancelled tokens (called after turn completion to avoid re-processing).
1987 fn clear_cancelled_tokens(&mut self) {
1988 self.cancelled_tokens.clear();
1989 self.cancelled_token_kinds.clear();
1990 }
1991
1992 // Note: deterministic GUID generation was removed from public API.
1993}
1994
1995/// User-facing orchestration context for scheduling and replay-safe helpers.
1996/// Context provided to activities for logging and metadata access.
1997///
1998/// Unlike [`OrchestrationContext`], activities are leaf nodes that cannot schedule new work,
1999/// but they often need to emit structured logs and inspect orchestration metadata. The
2000/// `ActivityContext` exposes the parent orchestration information and trace helpers that log
2001/// with full correlation fields.
2002///
2003/// # Examples
2004///
2005/// ```rust,no_run
2006/// # use duroxide::ActivityContext;
2007/// # use duroxide::runtime::registry::ActivityRegistry;
2008/// let activities = ActivityRegistry::builder()
2009/// .register("ProvisionVM", |ctx: ActivityContext, config: String| async move {
2010/// ctx.trace_info(format!("Provisioning VM with config: {}", config));
2011///
2012/// // Do actual work (can use sleep, HTTP, etc.)
2013/// let vm_id = provision_vm_internal(config).await?;
2014///
2015/// ctx.trace_info(format!("VM provisioned: {}", vm_id));
2016/// Ok(vm_id)
2017/// })
2018/// .build();
2019/// # async fn provision_vm_internal(config: String) -> Result<String, String> { Ok("vm-123".to_string()) }
2020/// ```
2021///
2022/// # Metadata Access
2023///
2024/// Activity context provides access to orchestration correlation metadata:
2025/// - `instance_id()` - Orchestration instance identifier
2026/// - `execution_id()` - Execution number (for ContinueAsNew scenarios)
2027/// - `orchestration_name()` - Parent orchestration name
2028/// - `orchestration_version()` - Parent orchestration version
2029/// - `activity_name()` - Current activity name
2030///
2031/// # Cancellation Support
2032///
2033/// Activities can respond to cancellation when their parent orchestration reaches a terminal state:
2034/// - `is_cancelled()` - Check if cancellation has been requested
2035/// - `cancelled()` - Future that completes when cancellation is requested (for use with `tokio::select!`)
2036/// - `cancellation_token()` - Get a clone of the token for spawned tasks
2037///
2038/// # Determinism
2039///
2040/// Activity trace helpers (`trace_info`, `trace_warn`, etc.) do **not** participate in
2041/// deterministic replay. They emit logs directly using [`tracing`] and should only be used for
2042/// diagnostic purposes.
2043#[derive(Clone)]
2044pub struct ActivityContext {
2045 instance_id: String,
2046 execution_id: u64,
2047 orchestration_name: String,
2048 orchestration_version: String,
2049 activity_name: String,
2050 activity_id: u64,
2051 worker_id: String,
2052 /// Optional session ID when scheduled via `schedule_activity_on_session`.
2053 session_id: Option<String>,
2054 /// Optional routing tag when scheduled via `.with_tag()`.
2055 tag: Option<String>,
2056 /// Cancellation token for cooperative cancellation.
2057 /// Triggered when the parent orchestration reaches a terminal state.
2058 cancellation_token: tokio_util::sync::CancellationToken,
2059 /// Provider store for accessing the Client API.
2060 store: std::sync::Arc<dyn crate::providers::Provider>,
2061}
2062
2063impl ActivityContext {
2064 /// Create a new activity context with a specific cancellation token.
2065 ///
2066 /// This constructor is intended for internal runtime use when the worker
2067 /// dispatcher needs to provide a cancellation token that can be triggered
2068 /// during activity execution.
2069 #[allow(clippy::too_many_arguments)]
2070 pub(crate) fn new_with_cancellation(
2071 instance_id: String,
2072 execution_id: u64,
2073 orchestration_name: String,
2074 orchestration_version: String,
2075 activity_name: String,
2076 activity_id: u64,
2077 worker_id: String,
2078 session_id: Option<String>,
2079 tag: Option<String>,
2080 cancellation_token: tokio_util::sync::CancellationToken,
2081 store: std::sync::Arc<dyn crate::providers::Provider>,
2082 ) -> Self {
2083 Self {
2084 instance_id,
2085 execution_id,
2086 orchestration_name,
2087 orchestration_version,
2088 activity_name,
2089 activity_id,
2090 worker_id,
2091 session_id,
2092 tag,
2093 cancellation_token,
2094 store,
2095 }
2096 }
2097
2098 /// Returns the orchestration instance identifier.
2099 pub fn instance_id(&self) -> &str {
2100 &self.instance_id
2101 }
2102
2103 /// Returns the execution id within the orchestration instance.
2104 pub fn execution_id(&self) -> u64 {
2105 self.execution_id
2106 }
2107
2108 /// Returns the parent orchestration name.
2109 pub fn orchestration_name(&self) -> &str {
2110 &self.orchestration_name
2111 }
2112
2113 /// Returns the parent orchestration version.
2114 pub fn orchestration_version(&self) -> &str {
2115 &self.orchestration_version
2116 }
2117
2118 /// Returns the activity name being executed.
2119 pub fn activity_name(&self) -> &str {
2120 &self.activity_name
2121 }
2122
2123 /// Returns the worker dispatcher ID processing this activity.
2124 pub fn worker_id(&self) -> &str {
2125 &self.worker_id
2126 }
2127
2128 /// Returns the session ID if this activity was scheduled via `schedule_activity_on_session`.
2129 ///
2130 /// Returns `None` for regular activities scheduled via `schedule_activity`.
2131 pub fn session_id(&self) -> Option<&str> {
2132 self.session_id.as_deref()
2133 }
2134
2135 /// Returns the routing tag if this activity was scheduled via `.with_tag()`.
2136 ///
2137 /// Returns `None` for activities scheduled without a tag.
2138 pub fn tag(&self) -> Option<&str> {
2139 self.tag.as_deref()
2140 }
2141
2142 /// Emit an INFO level trace entry associated with this activity.
2143 pub fn trace_info(&self, message: impl Into<String>) {
2144 tracing::info!(
2145 target: "duroxide::activity",
2146 instance_id = %self.instance_id,
2147 execution_id = %self.execution_id,
2148 orchestration_name = %self.orchestration_name,
2149 orchestration_version = %self.orchestration_version,
2150 activity_name = %self.activity_name,
2151 activity_id = %self.activity_id,
2152 worker_id = %self.worker_id,
2153 "{}",
2154 message.into()
2155 );
2156 }
2157
2158 /// Emit a WARN level trace entry associated with this activity.
2159 pub fn trace_warn(&self, message: impl Into<String>) {
2160 tracing::warn!(
2161 target: "duroxide::activity",
2162 instance_id = %self.instance_id,
2163 execution_id = %self.execution_id,
2164 orchestration_name = %self.orchestration_name,
2165 orchestration_version = %self.orchestration_version,
2166 activity_name = %self.activity_name,
2167 activity_id = %self.activity_id,
2168 worker_id = %self.worker_id,
2169 "{}",
2170 message.into()
2171 );
2172 }
2173
2174 /// Emit an ERROR level trace entry associated with this activity.
2175 pub fn trace_error(&self, message: impl Into<String>) {
2176 tracing::error!(
2177 target: "duroxide::activity",
2178 instance_id = %self.instance_id,
2179 execution_id = %self.execution_id,
2180 orchestration_name = %self.orchestration_name,
2181 orchestration_version = %self.orchestration_version,
2182 activity_name = %self.activity_name,
2183 activity_id = %self.activity_id,
2184 worker_id = %self.worker_id,
2185 "{}",
2186 message.into()
2187 );
2188 }
2189
2190 /// Emit a DEBUG level trace entry associated with this activity.
2191 pub fn trace_debug(&self, message: impl Into<String>) {
2192 tracing::debug!(
2193 target: "duroxide::activity",
2194 instance_id = %self.instance_id,
2195 execution_id = %self.execution_id,
2196 orchestration_name = %self.orchestration_name,
2197 orchestration_version = %self.orchestration_version,
2198 activity_name = %self.activity_name,
2199 activity_id = %self.activity_id,
2200 worker_id = %self.worker_id,
2201 "{}",
2202 message.into()
2203 );
2204 }
2205
2206 // ===== Cancellation Support =====
2207
2208 /// Check if cancellation has been requested.
2209 ///
2210 /// Returns `true` if the parent orchestration has completed, failed,
2211 /// or been cancelled. Activities can use this for cooperative cancellation.
2212 ///
2213 /// # Example
2214 ///
2215 /// ```ignore
2216 /// for item in items {
2217 /// if ctx.is_cancelled() {
2218 /// return Err("Activity cancelled".into());
2219 /// }
2220 /// process(item).await;
2221 /// }
2222 /// ```
2223 pub fn is_cancelled(&self) -> bool {
2224 self.cancellation_token.is_cancelled()
2225 }
2226
2227 /// Returns a future that completes when cancellation is requested.
2228 ///
2229 /// Use with `tokio::select!` for interruptible activities. This allows
2230 /// activities to respond promptly to cancellation without polling.
2231 ///
2232 /// # Example
2233 ///
2234 /// ```ignore
2235 /// tokio::select! {
2236 /// result = do_work() => return result,
2237 /// _ = ctx.cancelled() => return Err("Cancelled".into()),
2238 /// }
2239 /// ```
2240 pub async fn cancelled(&self) {
2241 self.cancellation_token.cancelled().await
2242 }
2243
2244 /// Get a clone of the cancellation token for use in spawned tasks.
2245 ///
2246 /// If your activity spawns child tasks with `tokio::spawn()`, you should
2247 /// pass them this token so they can also respond to cancellation.
2248 ///
2249 /// **Important:** If you spawn additional tasks/threads and do not pass them
2250 /// the cancellation token, they may outlive the activity's cancellation/abort.
2251 /// This is user error - the runtime provides the signal but cannot guarantee
2252 /// termination of arbitrary spawned work.
2253 ///
2254 /// # Example
2255 ///
2256 /// ```ignore
2257 /// let token = ctx.cancellation_token();
2258 /// let handle = tokio::spawn(async move {
2259 /// loop {
2260 /// tokio::select! {
2261 /// _ = do_work() => {}
2262 /// _ = token.cancelled() => break,
2263 /// }
2264 /// }
2265 /// });
2266 /// ```
2267 pub fn cancellation_token(&self) -> tokio_util::sync::CancellationToken {
2268 self.cancellation_token.clone()
2269 }
2270
2271 /// Get a Client for management operations.
2272 ///
2273 /// This allows activities to perform management operations such as
2274 /// pruning old executions, deleting instances, or querying instance status.
2275 ///
2276 /// # Example: Self-Pruning Eternal Orchestration
2277 ///
2278 /// ```ignore
2279 /// // Activity that prunes old executions
2280 /// async fn prune_self(ctx: ActivityContext, _input: String) -> Result<String, String> {
2281 /// let client = ctx.get_client();
2282 /// let instance_id = ctx.instance_id();
2283 ///
2284 /// let result = client.prune_executions(instance_id, PruneOptions {
2285 /// keep_last: Some(1), // Keep only current execution
2286 /// ..Default::default()
2287 /// }).await.map_err(|e| e.to_string())?;
2288 ///
2289 /// Ok(format!("Pruned {} executions", result.executions_deleted))
2290 /// }
2291 /// ```
2292 pub fn get_client(&self) -> crate::Client {
2293 crate::Client::new(self.store.clone())
2294 }
2295}
2296
2297impl std::fmt::Debug for ActivityContext {
2298 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2299 f.debug_struct("ActivityContext")
2300 .field("instance_id", &self.instance_id)
2301 .field("execution_id", &self.execution_id)
2302 .field("orchestration_name", &self.orchestration_name)
2303 .field("orchestration_version", &self.orchestration_version)
2304 .field("activity_name", &self.activity_name)
2305 .field("activity_id", &self.activity_id)
2306 .field("worker_id", &self.worker_id)
2307 .field("cancellation_token", &self.cancellation_token)
2308 .field("store", &"<Provider>")
2309 .finish()
2310 }
2311}
2312
2313#[derive(Clone)]
2314pub struct OrchestrationContext {
2315 inner: Arc<Mutex<CtxInner>>,
2316}
2317
/// Permanently pending future handed back by `continue_as_new()`.
///
/// Every poll reports `Poll::Pending`, so statements written after
/// `ctx.continue_as_new().await` are never reached. The runtime collects the
/// recorded `ContinueAsNew` action before it looks at the future's state, which
/// is why the action is still processed even though this future never finishes.
struct ContinueAsNewFuture;

impl Future for ContinueAsNewFuture {
    // Same shape as an orchestration's result; no value is ever produced.
    type Output = Result<String, String>;

    fn poll(self: Pin<&mut Self>, _waker_cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Intentionally never ready (and no wake-up is registered): the runtime
        // acts on the pending ContinueAsNew action instead of this output.
        Poll::Pending
    }
}
2334
2335impl OrchestrationContext {
2336 /// Construct a new context from an existing history vector.
2337 ///
2338 /// # Parameters
2339 ///
2340 /// * `orchestration_name` - The name of the orchestration being executed.
2341 /// * `orchestration_version` - The semantic version string of the orchestration.
2342 /// * `worker_id` - Optional dispatcher worker ID for logging correlation.
2343 /// - `Some(id)`: Used by runtime dispatchers to include worker_id in traces
2344 /// - `None`: Used by standalone/test execution without runtime context
2345 pub fn new(
2346 history: Vec<Event>,
2347 execution_id: u64,
2348 instance_id: String,
2349 orchestration_name: String,
2350 orchestration_version: String,
2351 worker_id: Option<String>,
2352 ) -> Self {
2353 Self {
2354 inner: Arc::new(Mutex::new(CtxInner::new(
2355 history,
2356 execution_id,
2357 instance_id,
2358 orchestration_name,
2359 orchestration_version,
2360 worker_id,
2361 ))),
2362 }
2363 }
2364
2365 /// Check if the orchestration is currently replaying history.
2366 ///
2367 /// Returns `true` when processing events from persisted history (replay),
2368 /// and `false` when executing new logic beyond the stored history.
2369 ///
2370 /// This is useful for skipping side effects during replay, such as:
2371 /// - Logging/tracing that should only happen on first execution
2372 /// - Metrics that shouldn't be double-counted
2373 /// - External notifications that shouldn't be re-sent
2374 ///
2375 /// # Example
2376 ///
2377 /// ```rust,no_run
2378 /// # use duroxide::OrchestrationContext;
2379 /// # async fn example(ctx: OrchestrationContext) {
2380 /// if !ctx.is_replaying() {
2381 /// // Only log on first execution, not during replay
2382 /// println!("Starting workflow for the first time");
2383 /// }
2384 /// # }
2385 /// ```
2386 pub fn is_replaying(&self) -> bool {
2387 self.inner.lock().unwrap().is_replaying
2388 }
2389
2390 /// Set the replaying state (used by replay engine and test harness).
2391 #[doc(hidden)]
2392 pub fn set_is_replaying(&self, is_replaying: bool) {
2393 self.inner.lock().unwrap().is_replaying = is_replaying;
2394 }
2395
2396 /// Bind an external subscription to a schedule_id (used by replay engine and test harness).
2397 #[doc(hidden)]
2398 pub fn bind_external_subscription(&self, schedule_id: u64, name: &str) {
2399 self.inner.lock().unwrap().bind_external_subscription(schedule_id, name);
2400 }
2401
2402 /// Deliver an external event (used by replay engine and test harness).
2403 #[doc(hidden)]
2404 pub fn deliver_external_event(&self, name: String, data: String) {
2405 self.inner.lock().unwrap().deliver_external_event(name, data);
2406 }
2407
2408 /// Bind a persistent subscription to a schedule_id (used by replay engine).
2409 #[doc(hidden)]
2410 pub fn bind_queue_subscription(&self, schedule_id: u64, name: &str) {
2411 self.inner.lock().unwrap().bind_queue_subscription(schedule_id, name);
2412 }
2413
2414 /// Deliver a persistent external event (used by replay engine).
2415 #[doc(hidden)]
2416 pub fn deliver_queue_message(&self, name: String, data: String) {
2417 self.inner.lock().unwrap().deliver_queue_message(name, data);
2418 }
2419
2420 /// Mark a persistent subscription as cancelled (used by replay engine).
2421 pub(crate) fn mark_queue_subscription_cancelled(&self, schedule_id: u64) {
2422 self.inner
2423 .lock()
2424 .unwrap()
2425 .mark_queue_subscription_cancelled(schedule_id);
2426 }
2427
2428 // =========================================================================
2429 // Cancellation Support (DurableFuture integration)
2430 // =========================================================================
2431
2432 /// Mark a token as cancelled (called by DurableFuture::drop).
2433 pub(crate) fn mark_token_cancelled(&self, token: u64, kind: &ScheduleKind) {
2434 self.inner.lock().unwrap().mark_token_cancelled(token, kind.clone());
2435 }
2436
2437 /// Get cancelled activity schedule_ids for this turn.
2438 pub(crate) fn get_cancelled_activity_ids(&self) -> Vec<u64> {
2439 self.inner.lock().unwrap().get_cancelled_activity_ids()
2440 }
2441
2442 /// Get cancelled external wait schedule_ids for this turn.
2443 pub(crate) fn get_cancelled_external_wait_ids(&self) -> Vec<u64> {
2444 self.inner.lock().unwrap().get_cancelled_external_wait_ids()
2445 }
2446
2447 /// Mark an external subscription as cancelled (called by replay engine).
2448 pub(crate) fn mark_external_subscription_cancelled(&self, schedule_id: u64) {
2449 self.inner
2450 .lock()
2451 .unwrap()
2452 .mark_external_subscription_cancelled(schedule_id);
2453 }
2454
2455 /// Get cancelled persistent wait schedule_ids for this turn.
2456 pub(crate) fn get_cancelled_queue_ids(&self) -> Vec<u64> {
2457 self.inner.lock().unwrap().get_cancelled_queue_ids()
2458 }
2459
2460 /// Get cancelled sub-orchestration cancellations for this turn.
2461 pub(crate) fn get_cancelled_sub_orchestration_cancellations(&self) -> Vec<(u64, String)> {
2462 self.inner
2463 .lock()
2464 .unwrap()
2465 .get_cancelled_sub_orchestration_cancellations()
2466 }
2467
2468 /// Clear cancelled tokens after processing (called by replay engine).
2469 pub(crate) fn clear_cancelled_tokens(&self) {
2470 self.inner.lock().unwrap().clear_cancelled_tokens();
2471 }
2472
2473 /// Bind a sub-orchestration token to its resolved instance ID.
2474 pub(crate) fn bind_sub_orchestration_instance(&self, token: u64, instance_id: String) {
2475 self.inner
2476 .lock()
2477 .unwrap()
2478 .bind_sub_orchestration_instance(token, instance_id);
2479 }
2480
2481 // =========================================================================
2482 // Simplified Mode Tracing (Replay-Guarded)
2483 // =========================================================================
2484 //
2485 // These trace methods use `is_replaying()` as a guard, which means:
2486 // - No history events are created for traces
2487 // - Traces only emit on first execution, not during replay
2488 // - Much simpler and more efficient than system-call-based tracing
2489
2490 /// Convenience wrapper for INFO level tracing.
2491 ///
2492 /// Logs with INFO level and includes instance context automatically.
2493 /// Only emits on first execution, not during replay.
2494 ///
2495 /// # Example
2496 ///
2497 /// ```rust,no_run
2498 /// # use duroxide::OrchestrationContext;
2499 /// # async fn example(ctx: OrchestrationContext) {
2500 /// ctx.trace_info("Processing order");
2501 /// ctx.trace_info(format!("Processing {} items", 42));
2502 /// # }
2503 /// ```
2504 pub fn trace_info(&self, message: impl Into<String>) {
2505 self.trace("INFO", message);
2506 }
2507
2508 /// Convenience wrapper for WARN level tracing.
2509 ///
2510 /// Logs with WARN level and includes instance context automatically.
2511 /// Only emits on first execution, not during replay.
2512 ///
2513 /// # Example
2514 ///
2515 /// ```rust,no_run
2516 /// # use duroxide::OrchestrationContext;
2517 /// # async fn example(ctx: OrchestrationContext) {
2518 /// ctx.trace_warn("Retrying failed operation");
2519 /// # }
2520 /// ```
2521 pub fn trace_warn(&self, message: impl Into<String>) {
2522 self.trace("WARN", message);
2523 }
2524
2525 /// Convenience wrapper for ERROR level tracing.
2526 ///
2527 /// Logs with ERROR level and includes instance context automatically.
2528 /// Only emits on first execution, not during replay.
2529 ///
2530 /// # Example
2531 ///
2532 /// ```rust,no_run
2533 /// # use duroxide::OrchestrationContext;
2534 /// # async fn example(ctx: OrchestrationContext) {
2535 /// ctx.trace_error("Payment processing failed");
2536 /// # }
2537 /// ```
2538 pub fn trace_error(&self, message: impl Into<String>) {
2539 self.trace("ERROR", message);
2540 }
2541
2542 /// Convenience wrapper for DEBUG level tracing.
2543 ///
2544 /// Logs with DEBUG level and includes instance context automatically.
2545 /// Only emits on first execution, not during replay.
2546 ///
2547 /// # Example
2548 ///
2549 /// ```rust,no_run
2550 /// # use duroxide::OrchestrationContext;
2551 /// # async fn example(ctx: OrchestrationContext) {
2552 /// ctx.trace_debug("Detailed state information");
2553 /// # }
2554 /// ```
2555 pub fn trace_debug(&self, message: impl Into<String>) {
2556 self.trace("DEBUG", message);
2557 }
2558
2559 /// Drain emitted actions.
2560 /// Returns a list of (token, Action) pairs.
2561 #[doc(hidden)]
2562 pub fn drain_emitted_actions(&self) -> Vec<(u64, Action)> {
2563 self.inner.lock().unwrap().drain_emitted_actions()
2564 }
2565
2566 /// Get a snapshot of emitted actions without draining.
2567 /// Returns a list of (token, Action) pairs.
2568 #[doc(hidden)]
2569 pub fn get_emitted_actions(&self) -> Vec<(u64, Action)> {
2570 self.inner.lock().unwrap().emitted_actions.clone()
2571 }
2572
2573 /// Bind a token to a schedule_id.
2574 #[doc(hidden)]
2575 pub fn bind_token(&self, token: u64, schedule_id: u64) {
2576 self.inner.lock().unwrap().bind_token(token, schedule_id);
2577 }
2578
2579 /// Deliver a result for a token.
2580 #[doc(hidden)]
2581 pub fn deliver_result(&self, schedule_id: u64, result: CompletionResult) {
2582 self.inner.lock().unwrap().deliver_result(schedule_id, result);
2583 }
2584
2585 /// Returns the orchestration instance identifier.
2586 ///
2587 /// This is the unique identifier for this orchestration instance, typically
2588 /// provided when starting the orchestration.
2589 ///
2590 /// # Example
2591 ///
2592 /// ```rust,no_run
2593 /// # use duroxide::OrchestrationContext;
2594 /// # async fn example(ctx: OrchestrationContext) {
2595 /// let id = ctx.instance_id();
2596 /// ctx.trace_info(format!("Processing instance: {}", id));
2597 /// # }
2598 /// ```
2599 pub fn instance_id(&self) -> String {
2600 self.inner.lock().unwrap().instance_id.clone()
2601 }
2602
2603 /// Returns the current execution ID within this orchestration instance.
2604 ///
2605 /// The execution ID increments each time `continue_as_new()` is called.
2606 /// Execution 1 is the initial execution.
2607 ///
2608 /// # Example
2609 ///
2610 /// ```rust,no_run
2611 /// # use duroxide::OrchestrationContext;
2612 /// # async fn example(ctx: OrchestrationContext) {
2613 /// let exec_id = ctx.execution_id();
2614 /// ctx.trace_info(format!("Execution #{}", exec_id));
2615 /// # }
2616 /// ```
2617 pub fn execution_id(&self) -> u64 {
2618 self.inner.lock().unwrap().execution_id
2619 }
2620
2621 /// Returns the orchestration name.
2622 ///
2623 /// This is the name registered with the orchestration registry.
2624 ///
2625 /// # Example
2626 ///
2627 /// ```rust,no_run
2628 /// # use duroxide::OrchestrationContext;
2629 /// # async fn example(ctx: OrchestrationContext) {
2630 /// let name = ctx.orchestration_name();
2631 /// ctx.trace_info(format!("Running orchestration: {}", name));
2632 /// # }
2633 /// ```
2634 pub fn orchestration_name(&self) -> String {
2635 self.inner.lock().unwrap().orchestration_name.clone()
2636 }
2637
2638 /// Returns the orchestration version.
2639 ///
2640 /// This is the semantic version string associated with the orchestration.
2641 ///
2642 /// # Example
2643 ///
2644 /// ```rust,no_run
2645 /// # use duroxide::OrchestrationContext;
2646 /// # async fn example(ctx: OrchestrationContext) {
2647 /// let version = ctx.orchestration_version();
2648 /// ctx.trace_info(format!("Version: {}", version));
2649 /// # }
2650 /// ```
2651 pub fn orchestration_version(&self) -> String {
2652 self.inner.lock().unwrap().orchestration_version.clone()
2653 }
2654
2655 // Replay-safe logging control
2656 /// Indicates whether logging is enabled for the current poll. This is
2657 /// flipped on when a decision is recorded to minimize log noise.
2658 pub fn is_logging_enabled(&self) -> bool {
2659 self.inner.lock().unwrap().logging_enabled_this_poll
2660 }
2661 // log_buffer removed - not used
2662
2663 /// Emit a structured trace entry with automatic context correlation.
2664 ///
2665 /// Creates a system call event for deterministic replay and logs to tracing.
2666 /// The log entry automatically includes correlation fields:
2667 /// - `instance_id` - The orchestration instance identifier
2668 /// - `execution_id` - The current execution number
2669 /// - `orchestration_name` - Name of the orchestration
2670 /// - `orchestration_version` - Semantic version
2671 ///
2672 /// # Determinism
2673 ///
2674 /// This method is replay-safe: logs are only emitted on first execution,
2675 /// not during replay.
2676 ///
2677 /// # Example
2678 ///
2679 /// ```rust,no_run
2680 /// # use duroxide::OrchestrationContext;
2681 /// # async fn example(ctx: OrchestrationContext) {
2682 /// ctx.trace("INFO", "Processing started");
2683 /// ctx.trace("WARN", format!("Retry attempt: {}", 3));
2684 /// ctx.trace("ERROR", "Payment validation failed");
2685 /// # }
2686 /// ```
2687 ///
2688 /// # Output
2689 ///
2690 /// ```text
2691 /// 2024-10-30T10:15:23.456Z INFO duroxide::orchestration [order-123] Processing started
2692 /// ```
2693 ///
2694 /// All logs include instance_id, execution_id, orchestration_name for correlation.
2695 pub fn trace(&self, level: impl Into<String>, message: impl Into<String>) {
2696 self.trace_internal(&level.into(), &message.into());
2697 }
2698
2699 /// Internal implementation of trace (guarded by is_replaying)
2700 fn trace_internal(&self, level: &str, message: &str) {
2701 let inner = self.inner.lock().unwrap();
2702
2703 // Only trace if not replaying
2704 if !inner.is_replaying {
2705 match level.to_uppercase().as_str() {
2706 "INFO" => tracing::info!(
2707 target: "duroxide::orchestration",
2708 instance_id = %inner.instance_id,
2709 execution_id = %inner.execution_id,
2710 orchestration_name = %inner.orchestration_name,
2711 orchestration_version = %inner.orchestration_version,
2712 "{}",
2713 message
2714 ),
2715 "WARN" => tracing::warn!(
2716 target: "duroxide::orchestration",
2717 instance_id = %inner.instance_id,
2718 execution_id = %inner.execution_id,
2719 orchestration_name = %inner.orchestration_name,
2720 orchestration_version = %inner.orchestration_version,
2721 "{}",
2722 message
2723 ),
2724 "ERROR" => tracing::error!(
2725 target: "duroxide::orchestration",
2726 instance_id = %inner.instance_id,
2727 execution_id = %inner.execution_id,
2728 orchestration_name = %inner.orchestration_name,
2729 orchestration_version = %inner.orchestration_version,
2730 "{}",
2731 message
2732 ),
2733 "DEBUG" => tracing::debug!(
2734 target: "duroxide::orchestration",
2735 instance_id = %inner.instance_id,
2736 execution_id = %inner.execution_id,
2737 orchestration_name = %inner.orchestration_name,
2738 orchestration_version = %inner.orchestration_version,
2739 "{}",
2740 message
2741 ),
2742 _ => tracing::trace!(
2743 target: "duroxide::orchestration",
2744 instance_id = %inner.instance_id,
2745 execution_id = %inner.execution_id,
2746 orchestration_name = %inner.orchestration_name,
2747 orchestration_version = %inner.orchestration_version,
2748 level = %level,
2749 "{}",
2750 message
2751 ),
2752 }
2753 }
2754 }
2755
2756 /// Generate a new deterministic GUID.
2757 ///
2758 /// This schedules a built-in activity that generates a unique identifier.
2759 /// The GUID is deterministic across replays (the same value is returned
2760 /// when the orchestration replays).
2761 ///
2762 /// # Example
2763 ///
2764 /// ```rust,no_run
2765 /// # use duroxide::OrchestrationContext;
2766 /// # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
2767 /// let guid = ctx.new_guid().await?;
2768 /// println!("Generated GUID: {}", guid);
2769 /// # Ok(())
2770 /// # }
2771 /// ```
2772 pub fn new_guid(&self) -> impl Future<Output = Result<String, String>> {
2773 self.schedule_activity(SYSCALL_ACTIVITY_NEW_GUID, "")
2774 }
2775
2776 /// Get the current UTC time.
2777 ///
2778 /// This schedules a built-in activity that returns the current time.
2779 /// The time is deterministic across replays (the same value is returned
2780 /// when the orchestration replays).
2781 ///
2782 /// # Errors
2783 ///
2784 /// Returns an error if the activity fails or if the time value cannot be parsed.
2785 ///
2786 /// # Example
2787 ///
2788 /// ```rust,no_run
2789 /// # use duroxide::OrchestrationContext;
2790 /// # use std::time::{SystemTime, Duration};
2791 /// # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
2792 /// let now = ctx.utc_now().await?;
2793 /// let deadline = now + Duration::from_secs(3600); // 1 hour from now
2794 /// # Ok(())
2795 /// # }
2796 /// ```
2797 pub fn utc_now(&self) -> impl Future<Output = Result<SystemTime, String>> {
2798 let fut = self.schedule_activity(SYSCALL_ACTIVITY_UTC_NOW_MS, "");
2799 async move {
2800 let s = fut.await?;
2801 let ms = s.parse::<u64>().map_err(|e| e.to_string())?;
2802 Ok(UNIX_EPOCH + StdDuration::from_millis(ms))
2803 }
2804 }
2805
2806 /// Continue the current execution as a new execution with fresh input.
2807 ///
2808 /// This terminates the current execution and starts a new execution with the provided input.
2809 /// Returns a future that never resolves, ensuring code after `await` is unreachable.
2810 ///
2811 /// # Example
2812 /// ```rust,no_run
2813 /// # use duroxide::OrchestrationContext;
2814 /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
2815 /// let n: u32 = 0;
2816 /// if n < 2 {
2817 /// return ctx.continue_as_new("next_input").await; // Execution terminates here
2818 /// // This code is unreachable - compiler will warn
2819 /// }
2820 /// Ok("completed".to_string())
2821 /// # }
2822 /// ```
2823 pub fn continue_as_new(&self, input: impl Into<String>) -> impl Future<Output = Result<String, String>> {
2824 let mut inner = self.inner.lock().unwrap();
2825 let input: String = input.into();
2826 let action = Action::ContinueAsNew { input, version: None };
2827
2828 inner.emit_action(action);
2829 ContinueAsNewFuture
2830 }
2831
2832 pub fn continue_as_new_typed<In: serde::Serialize>(
2833 &self,
2834 input: &In,
2835 ) -> impl Future<Output = Result<String, String>> {
2836 // Serialization should never fail for valid input types - if it does, it's a programming error
2837 let payload =
2838 crate::_typed_codec::Json::encode(input).expect("Serialization should never fail for valid input");
2839 self.continue_as_new(payload)
2840 }
2841
2842 /// ContinueAsNew to a specific target version (string is parsed as semver later).
2843 pub fn continue_as_new_versioned(
2844 &self,
2845 version: impl Into<String>,
2846 input: impl Into<String>,
2847 ) -> impl Future<Output = Result<String, String>> {
2848 let mut inner = self.inner.lock().unwrap();
2849 let action = Action::ContinueAsNew {
2850 input: input.into(),
2851 version: Some(version.into()),
2852 };
2853 inner.emit_action(action);
2854 ContinueAsNewFuture
2855 }
2856}
2857
/// Generate a unique, UUID-formatted identifier (`8-4-4-4-12` lowercase hex).
///
/// This function is **not** deterministic. It reads the wall clock and
/// per-thread state, so calling it directly from orchestration code would
/// produce a different value on replay. Orchestration code should use
/// [`OrchestrationContext::new_guid`], whose value is stable across replays.
///
/// Layout of the five hex groups:
/// - group 1 (32 bits): bits 32..64 of the nanoseconds since the Unix epoch
/// - group 2 (16 bits): bits 16..32 of that nanosecond timestamp
/// - group 3 (16 bits): low 16 bits of a thread-local call counter
/// - group 4 (16 bits): bits 0..16 of the nanosecond timestamp
/// - group 5 (48 bits): a per-thread discriminator, i.e. a randomly keyed hash
///   of the thread id and process id
pub(crate) fn generate_guid() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    thread_local! {
        // Separates GUIDs generated within the same clock tick on one thread.
        static COUNTER: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
        // Separates threads/processes that share a timestamp and counter value.
        static DISCRIMINATOR: u64 = thread_discriminator();
    }

    // Hash of this thread's identity. `RandomState` is keyed randomly, so the
    // value also differs between processes.
    fn thread_discriminator() -> u64 {
        use std::collections::hash_map::RandomState;
        use std::hash::{BuildHasher, Hash, Hasher};

        let mut hasher = RandomState::new().build_hasher();
        std::thread::current().id().hash(&mut hasher);
        std::process::id().hash(&mut hasher);
        hasher.finish()
    }

    // Nanoseconds since the epoch fit in 64 bits until the year 2554, so this
    // cast keeps every bit that actually varies. The previous code sliced bits
    // 64..128 of the u128 value, which are always zero for present-day dates.
    // A clock set before the epoch degrades to 0; the counter and discriminator
    // still vary.
    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos() as u64)
        .unwrap_or(0);

    let counter = COUNTER.with(|c| {
        let val = c.get();
        c.set(val.wrapping_add(1));
        val
    });
    let discriminator = DISCRIMINATOR.with(|d| *d);

    format!(
        "{:08x}-{:04x}-{:04x}-{:04x}-{:012x}",
        (timestamp >> 32) as u32,
        ((timestamp >> 16) & 0xFFFF) as u16,
        (counter & 0xFFFF) as u16,
        (timestamp & 0xFFFF) as u16,
        discriminator & 0xFFFF_FFFF_FFFF
    )
}
2889
2890impl OrchestrationContext {
2891 /// Schedule activity with automatic retry on failure.
2892 ///
2893 /// **Retry behavior:**
2894 /// - Retries on activity **errors** up to `policy.max_attempts`
2895 /// - **Timeouts are NOT retried** - if any attempt times out, returns error immediately
2896 /// - Only application errors trigger retry logic
2897 ///
2898 /// **Timeout behavior (if `policy.total_timeout` is set):**
2899 /// - Each activity attempt is raced against the timeout
2900 /// - If the timeout fires before the activity completes → returns timeout error (no retry)
2901 /// - If the activity fails with an error before timeout → retry according to policy
2902 ///
2903 /// # Example
2904 ///
2905 /// ```rust,no_run
2906 /// # use duroxide::{OrchestrationContext, RetryPolicy, BackoffStrategy};
2907 /// # use std::time::Duration;
2908 /// # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
2909 /// // Simple retry with defaults (no timeout)
2910 /// let result = ctx.schedule_activity_with_retry(
2911 /// "CallAPI",
2912 /// "request",
2913 /// RetryPolicy::new(3),
2914 /// ).await?;
2915 ///
2916 /// // Retry with per-attempt timeout and custom backoff
2917 /// let result = ctx.schedule_activity_with_retry(
2918 /// "CallAPI",
2919 /// "request",
2920 /// RetryPolicy::new(5)
2921 /// .with_timeout(Duration::from_secs(30)) // 30s per attempt
2922 /// .with_backoff(BackoffStrategy::Fixed { delay: Duration::from_secs(1) }),
2923 /// ).await?;
2924 /// # Ok(())
2925 /// # }
2926 /// ```
2927 ///
2928 /// # Errors
2929 ///
2930 /// Returns an error if all retry attempts fail or if a timeout occurs (timeouts are not retried).
2931 pub async fn schedule_activity_with_retry(
2932 &self,
2933 name: impl Into<String>,
2934 input: impl Into<String>,
2935 policy: RetryPolicy,
2936 ) -> Result<String, String> {
2937 let name = name.into();
2938 let input = input.into();
2939 let mut last_error = String::new();
2940
2941 for attempt in 1..=policy.max_attempts {
2942 // Each attempt: optionally race against per-attempt timeout
2943 let activity_result = if let Some(timeout) = policy.timeout {
2944 // Race activity vs per-attempt timeout
2945 let deadline = async {
2946 self.schedule_timer(timeout).await;
2947 Err::<String, String>("timeout: activity timed out".to_string())
2948 };
2949 let activity = self.schedule_activity(&name, &input);
2950
2951 match self.select2(activity, deadline).await {
2952 Either2::First(result) => result,
2953 Either2::Second(Err(e)) => {
2954 // Timeout fired - exit immediately, no retry for timeouts
2955 return Err(e);
2956 }
2957 Either2::Second(Ok(_)) => unreachable!(),
2958 }
2959 } else {
2960 // No timeout - just await the activity
2961 self.schedule_activity(&name, &input).await
2962 };
2963
2964 match activity_result {
2965 Ok(result) => return Ok(result),
2966 Err(e) => {
2967 // Activity failed with error - apply retry policy
2968 last_error = e.clone();
2969 if attempt < policy.max_attempts {
2970 self.trace(
2971 "warn",
2972 format!(
2973 "Activity '{}' attempt {}/{} failed: {}. Retrying...",
2974 name, attempt, policy.max_attempts, e
2975 ),
2976 );
2977 let delay = policy.delay_for_attempt(attempt);
2978 if !delay.is_zero() {
2979 self.schedule_timer(delay).await;
2980 }
2981 }
2982 }
2983 }
2984 }
2985 Err(last_error)
2986 }
2987
2988 /// Typed variant of `schedule_activity_with_retry`.
2989 ///
2990 /// Serializes input once and deserializes the successful result.
2991 ///
2992 /// # Errors
2993 ///
2994 /// Returns an error if all retry attempts fail, if a timeout occurs, if input serialization fails, or if result deserialization fails.
2995 pub async fn schedule_activity_with_retry_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned>(
2996 &self,
2997 name: impl Into<String>,
2998 input: &In,
2999 policy: RetryPolicy,
3000 ) -> Result<Out, String> {
3001 let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3002 let result = self.schedule_activity_with_retry(name, payload, policy).await?;
3003 crate::_typed_codec::Json::decode::<Out>(&result)
3004 }
3005
3006 /// Schedule an activity with automatic retry on a specific session.
3007 ///
3008 /// Combines retry semantics from [`Self::schedule_activity_with_retry`] with
3009 /// session affinity from [`Self::schedule_activity_on_session`]. All retry
3010 /// attempts are pinned to the same `session_id`, ensuring they execute on
3011 /// the same worker.
3012 ///
3013 /// # Example
3014 ///
3015 /// ```rust,no_run
3016 /// # use duroxide::{OrchestrationContext, RetryPolicy, BackoffStrategy};
3017 /// # use std::time::Duration;
3018 /// # async fn example(ctx: OrchestrationContext) -> Result<(), String> {
3019 /// let session = ctx.new_guid().await?;
3020 /// let result = ctx.schedule_activity_with_retry_on_session(
3021 /// "RunQuery",
3022 /// "SELECT 1",
3023 /// RetryPolicy::new(3)
3024 /// .with_backoff(BackoffStrategy::Fixed { delay: Duration::from_secs(1) }),
3025 /// &session,
3026 /// ).await?;
3027 /// # Ok(())
3028 /// # }
3029 /// ```
3030 ///
3031 /// # Errors
3032 ///
3033 /// Returns an error if all retry attempts fail or if a timeout occurs (timeouts are not retried).
3034 pub async fn schedule_activity_with_retry_on_session(
3035 &self,
3036 name: impl Into<String>,
3037 input: impl Into<String>,
3038 policy: RetryPolicy,
3039 session_id: impl Into<String>,
3040 ) -> Result<String, String> {
3041 let name = name.into();
3042 let input = input.into();
3043 let session_id = session_id.into();
3044 let mut last_error = String::new();
3045
3046 for attempt in 1..=policy.max_attempts {
3047 let activity_result = if let Some(timeout) = policy.timeout {
3048 let deadline = async {
3049 self.schedule_timer(timeout).await;
3050 Err::<String, String>("timeout: activity timed out".to_string())
3051 };
3052 let activity = self.schedule_activity_on_session(&name, &input, &session_id);
3053
3054 match self.select2(activity, deadline).await {
3055 Either2::First(result) => result,
3056 Either2::Second(Err(e)) => return Err(e),
3057 Either2::Second(Ok(_)) => unreachable!(),
3058 }
3059 } else {
3060 self.schedule_activity_on_session(&name, &input, &session_id).await
3061 };
3062
3063 match activity_result {
3064 Ok(result) => return Ok(result),
3065 Err(e) => {
3066 last_error = e.clone();
3067 if attempt < policy.max_attempts {
3068 self.trace(
3069 "warn",
3070 format!(
3071 "Activity '{}' (session={}) attempt {}/{} failed: {}. Retrying...",
3072 name, session_id, attempt, policy.max_attempts, e
3073 ),
3074 );
3075 let delay = policy.delay_for_attempt(attempt);
3076 if !delay.is_zero() {
3077 self.schedule_timer(delay).await;
3078 }
3079 }
3080 }
3081 }
3082 }
3083 Err(last_error)
3084 }
3085
3086 /// Typed variant of [`Self::schedule_activity_with_retry_on_session`].
3087 ///
3088 /// Serializes input once and deserializes the successful result. All retry
3089 /// attempts are pinned to the same session.
3090 ///
3091 /// # Errors
3092 ///
3093 /// Returns an error if all retry attempts fail, if a timeout occurs, if input
3094 /// serialization fails, or if result deserialization fails.
3095 pub async fn schedule_activity_with_retry_on_session_typed<
3096 In: serde::Serialize,
3097 Out: serde::de::DeserializeOwned,
3098 >(
3099 &self,
3100 name: impl Into<String>,
3101 input: &In,
3102 policy: RetryPolicy,
3103 session_id: impl Into<String>,
3104 ) -> Result<Out, String> {
3105 let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3106 let result = self
3107 .schedule_activity_with_retry_on_session(name, payload, policy, session_id)
3108 .await?;
3109 crate::_typed_codec::Json::decode::<Out>(&result)
3110 }
3111
3112 /// Schedule a detached orchestration with an explicit instance id.
3113 /// The runtime will prefix this with the parent instance to ensure global uniqueness.
3114 pub fn schedule_orchestration(
3115 &self,
3116 name: impl Into<String>,
3117 instance: impl Into<String>,
3118 input: impl Into<String>,
3119 ) {
3120 let name: String = name.into();
3121 let instance: String = instance.into();
3122 let input: String = input.into();
3123 let mut inner = self.inner.lock().unwrap();
3124
3125 let _ = inner.emit_action(Action::StartOrchestrationDetached {
3126 scheduling_event_id: 0, // Will be assigned by replay engine
3127 name,
3128 version: None,
3129 instance,
3130 input,
3131 });
3132 }
3133
3134 pub fn schedule_orchestration_typed<In: serde::Serialize>(
3135 &self,
3136 name: impl Into<String>,
3137 instance: impl Into<String>,
3138 input: &In,
3139 ) {
3140 let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3141 self.schedule_orchestration(name, instance, payload)
3142 }
3143
3144 /// Versioned detached orchestration start (string I/O). If `version` is None, registry policy is used for the child.
3145 pub fn schedule_orchestration_versioned(
3146 &self,
3147 name: impl Into<String>,
3148 version: Option<String>,
3149 instance: impl Into<String>,
3150 input: impl Into<String>,
3151 ) {
3152 let name: String = name.into();
3153 let instance: String = instance.into();
3154 let input: String = input.into();
3155 let mut inner = self.inner.lock().unwrap();
3156
3157 let _ = inner.emit_action(Action::StartOrchestrationDetached {
3158 scheduling_event_id: 0, // Will be assigned by replay engine
3159 name,
3160 version,
3161 instance,
3162 input,
3163 });
3164 }
3165
3166 pub fn schedule_orchestration_versioned_typed<In: serde::Serialize>(
3167 &self,
3168 name: impl Into<String>,
3169 version: Option<String>,
3170 instance: impl Into<String>,
3171 input: &In,
3172 ) {
3173 let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3174 self.schedule_orchestration_versioned(name, version, instance, payload)
3175 }
3176
3177 /// Set a user-defined custom status for progress reporting.
3178 ///
3179 /// This is **not** a history event or an action — it's pure metadata that gets
3180 /// plumbed into `ExecutionMetadata` at ack time. No impact on determinism,
3181 /// no replay implications.
3182 ///
3183 /// - Call it whenever, as many times as you want within a turn
3184 /// - Last write wins: if called twice in the same turn, only the last value is sent
3185 /// - Persistent across turns: if you don't call it on a later turn, the provider
3186 /// keeps the previous value
3187 ///
3188 /// # Example
3189 ///
3190 /// ```rust,no_run
3191 /// # use duroxide::OrchestrationContext;
3192 /// # async fn example(ctx: OrchestrationContext) {
3193 /// ctx.set_custom_status("Processing item 3 of 10");
3194 /// let result = ctx.schedule_activity("ProcessItem", "item-3").await;
3195 /// ctx.set_custom_status("Processing item 4 of 10");
3196 /// # }
3197 /// ```
3198 pub fn set_custom_status(&self, status: impl Into<String>) {
3199 let status: String = status.into();
3200 let mut inner = self.inner.lock().unwrap();
3201 inner.accumulated_custom_status = Some(status.clone());
3202 inner.emit_action(Action::UpdateCustomStatus { status: Some(status) });
3203 }
3204
3205 /// Clear the custom status back to `None`. The provider will set the column to NULL
3206 /// and increment `custom_status_version`.
3207 ///
3208 /// ```rust,no_run
3209 /// # use duroxide::OrchestrationContext;
3210 /// # async fn example(ctx: OrchestrationContext) {
3211 /// ctx.set_custom_status("Processing batch");
3212 /// // ... work ...
3213 /// ctx.reset_custom_status(); // done, clear the progress
3214 /// # }
3215 /// ```
3216 pub fn reset_custom_status(&self) {
3217 let mut inner = self.inner.lock().unwrap();
3218 inner.accumulated_custom_status = None;
3219 inner.emit_action(Action::UpdateCustomStatus { status: None });
3220 }
3221
3222 /// Returns the current custom status value, if any.
3223 ///
3224 /// This reflects all `set_custom_status` / `reset_custom_status` calls made so far
3225 /// in this and previous turns. In a CAN'd execution, it includes the value carried
3226 /// from the previous execution.
3227 pub fn get_custom_status(&self) -> Option<String> {
3228 self.inner.lock().unwrap().accumulated_custom_status.clone()
3229 }
3230
3231 // =========================================================================
3232 // Key-Value Store
3233 // =========================================================================
3234
3235 /// Set a key-value pair scoped to this orchestration instance.
3236 ///
3237 /// Emits a `KeyValueSet` history event. The provider materializes this
3238 /// into the `kv_store` table during ack. The `last_updated_at_ms` timestamp
3239 /// is stamped from the current wall clock and persisted in the event.
3240 pub fn set_kv_value(&self, key: impl Into<String>, value: impl Into<String>) {
3241 let key: String = key.into();
3242 let value: String = value.into();
3243 let mut inner = self.inner.lock().unwrap();
3244 let last_updated_at_ms = inner.now_ms();
3245 inner.kv_state.insert(key.clone(), value.clone());
3246 inner.kv_metadata.insert(key.clone(), last_updated_at_ms);
3247 inner.emit_action(Action::SetKeyValue {
3248 key,
3249 value,
3250 last_updated_at_ms,
3251 });
3252 }
3253
3254 /// Set a typed value (serialized as JSON) scoped to this orchestration instance.
3255 pub fn set_kv_value_typed<T: serde::Serialize>(&self, key: impl Into<String>, value: &T) {
3256 let serialized = serde_json::to_string(value).expect("KV value serialization should not fail");
3257 self.set_kv_value(key, serialized);
3258 }
3259
3260 /// Read a KV entry from in-memory state.
3261 ///
3262 /// Pure read — no provider call, no event emitted, fully deterministic.
3263 /// Returns `None` if the key has never been set or was cleared.
3264 pub fn get_kv_value(&self, key: &str) -> Option<String> {
3265 self.inner.lock().unwrap().kv_state.get(key).cloned()
3266 }
3267
3268 /// Read a typed KV entry. Returns `None` if the key doesn't exist.
3269 ///
3270 /// # Errors
3271 ///
3272 /// Returns `Err` if the key exists but deserialization fails.
3273 pub fn get_kv_value_typed<T: serde::de::DeserializeOwned>(&self, key: &str) -> Result<Option<T>, String> {
3274 match self.get_kv_value(key) {
3275 None => Ok(None),
3276 Some(s) => serde_json::from_str(&s)
3277 .map(Some)
3278 .map_err(|e| format!("KV deserialization error for key '{}': {}", key, e)),
3279 }
3280 }
3281
3282 /// Return a snapshot of all KV entries as a `HashMap`.
3283 ///
3284 /// Pure read from in-memory state — no provider call, no event emitted.
3285 pub fn get_kv_all_values(&self) -> std::collections::HashMap<String, String> {
3286 self.inner.lock().unwrap().kv_state.clone()
3287 }
3288
3289 /// Return a list of all KV keys.
3290 ///
3291 /// Pure read from in-memory state — no provider call, no event emitted.
3292 pub fn get_kv_all_keys(&self) -> Vec<String> {
3293 self.inner.lock().unwrap().kv_state.keys().cloned().collect()
3294 }
3295
3296 /// Return the number of KV entries.
3297 ///
3298 /// Pure read from in-memory state — no provider call, no event emitted.
3299 pub fn get_kv_length(&self) -> usize {
3300 self.inner.lock().unwrap().kv_state.len()
3301 }
3302
3303 /// Clear a single key from the KV store.
3304 ///
3305 /// Emits a `KeyValueCleared` history event. After this call,
3306 /// `get_kv_value(key)` returns `None`.
3307 pub fn clear_kv_value(&self, key: impl Into<String>) {
3308 let key: String = key.into();
3309 let mut inner = self.inner.lock().unwrap();
3310 inner.kv_state.remove(&key);
3311 inner.kv_metadata.remove(&key);
3312 inner.emit_action(Action::ClearKeyValue { key });
3313 }
3314
3315 /// Clear all keys from the KV store.
3316 ///
3317 /// Emits a `KeyValuesCleared` history event. After this call,
3318 /// `get_kv_value(key)` returns `None` for all keys.
3319 pub fn clear_all_kv_values(&self) {
3320 let mut inner = self.inner.lock().unwrap();
3321 inner.kv_state.clear();
3322 inner.kv_metadata.clear();
3323 inner.emit_action(Action::ClearKeyValues);
3324 }
3325
3326 /// Remove KV entries whose persisted `last_updated_at_ms` is older than `updated_before_ms`.
3327 ///
3328 /// This helper scans the snapshot metadata loaded from the provider and emits
3329 /// `clear_kv_value()` for each qualifying key. Keys written during the current
3330 /// turn are never prunable (they haven't been acked yet).
3331 ///
3332 /// Returns the number of keys cleared.
3333 pub fn prune_kv_values_updated_before(&self, updated_before_ms: u64) -> usize {
3334 let keys_to_clear: Vec<String> = {
3335 let inner = self.inner.lock().unwrap();
3336 inner
3337 .kv_metadata
3338 .iter()
3339 .filter(|(_, ts)| **ts < updated_before_ms)
3340 .filter(|(key, _)| inner.kv_state.contains_key(*key))
3341 .map(|(key, _)| key.clone())
3342 .collect()
3343 };
3344 let count = keys_to_clear.len();
3345 for key in keys_to_clear {
3346 self.clear_kv_value(key);
3347 }
3348 count
3349 }
3350
3351 /// Read a KV entry from another orchestration instance.
3352 ///
3353 /// This is modeled as a system activity — under the covers it schedules
3354 /// `__duroxide_syscall:get_kv_value` which calls `client.get_kv_value()`.
3355 /// Returns the value at the time the activity executes (not replay-cached on first run).
3356 /// On replay, the recorded result is returned — no provider call.
3357 pub fn get_kv_value_from_instance(
3358 &self,
3359 instance_id: impl Into<String>,
3360 key: impl Into<String>,
3361 ) -> DurableFuture<Result<Option<String>, String>> {
3362 let input = serde_json::json!({
3363 "instance_id": instance_id.into(),
3364 "key": key.into(),
3365 })
3366 .to_string();
3367 self.schedule_activity(SYSCALL_ACTIVITY_GET_KV_VALUE, input)
3368 .map(|result| match result {
3369 Ok(json_str) => serde_json::from_str::<Option<String>>(&json_str)
3370 .map_err(|e| format!("get_kv_value_from_instance deserialization error: {e}")),
3371 Err(e) => Err(e),
3372 })
3373 }
3374
3375 /// Typed variant of `get_kv_value_from_instance`.
3376 ///
3377 /// Deserializes the returned JSON string into `T`.
3378 /// Returns `Ok(None)` if the key doesn't exist in the remote instance.
3379 pub fn get_kv_value_from_instance_typed<T: serde::de::DeserializeOwned + Send + 'static>(
3380 &self,
3381 instance_id: impl Into<String>,
3382 key: impl Into<String>,
3383 ) -> DurableFuture<Result<Option<T>, String>> {
3384 self.get_kv_value_from_instance(instance_id, key)
3385 .map(|result| match result {
3386 Ok(None) => Ok(None),
3387 Ok(Some(s)) => serde_json::from_str::<T>(&s)
3388 .map(Some)
3389 .map_err(|e| format!("get_kv_value_from_instance_typed deserialization error: {e}")),
3390 Err(e) => Err(e),
3391 })
3392 }
3393}
3394
3395// Aggregate future machinery lives below (OrchestrationContext helpers)
3396
3397impl OrchestrationContext {
3398 // =========================================================================
3399 // Core scheduling methods - return DurableFuture with cancellation support
3400 // =========================================================================
3401
3402 /// Schedule an activity and return a cancellation-aware future.
3403 ///
3404 /// Returns a [`DurableFuture`] that supports cancellation on drop. If the future
3405 /// is dropped without completing (e.g., as a select loser), the activity will be
3406 /// cancelled via lock stealing.
3407 ///
3408 /// # Example
3409 ///
3410 /// ```rust,no_run
3411 /// # use duroxide::OrchestrationContext;
3412 /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
3413 /// // Fan-out to multiple activities
3414 /// let f1 = ctx.schedule_activity("Process", "A");
3415 /// let f2 = ctx.schedule_activity("Process", "B");
3416 /// let results = ctx.join(vec![f1, f2]).await;
3417 /// # Ok("done".to_string())
3418 /// # }
3419 /// ```
3420 pub fn schedule_activity(
3421 &self,
3422 name: impl Into<String>,
3423 input: impl Into<String>,
3424 ) -> DurableFuture<Result<String, String>> {
3425 self.schedule_activity_internal(name, input, None)
3426 }
3427
3428 /// Typed version of schedule_activity that serializes input and deserializes output.
3429 ///
3430 /// # Errors
3431 ///
3432 /// Returns an error if the activity fails or if the output cannot be deserialized.
3433 pub fn schedule_activity_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned + Send + 'static>(
3434 &self,
3435 name: impl Into<String>,
3436 input: &In,
3437 ) -> DurableFuture<Result<Out, String>> {
3438 let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3439 self.schedule_activity(name, payload)
3440 .map(|r| r.and_then(|s| crate::_typed_codec::Json::decode::<Out>(&s)))
3441 }
3442
3443 /// Schedule an activity routed to the worker owning the given session.
3444 ///
3445 /// If no worker owns the session, any worker can claim it on first fetch.
3446 /// Once claimed, all subsequent activities with the same `session_id` route
3447 /// to the claiming worker until the session unpins (idle timeout or worker death).
3448 ///
3449 /// # Example
3450 ///
3451 /// ```rust,no_run
3452 /// # use duroxide::OrchestrationContext;
3453 /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
3454 /// let session_id = ctx.new_guid().await?;
3455 /// let result = ctx.schedule_activity_on_session("run_turn", "input", &session_id).await?;
3456 /// # Ok(result)
3457 /// # }
3458 /// ```
3459 pub fn schedule_activity_on_session(
3460 &self,
3461 name: impl Into<String>,
3462 input: impl Into<String>,
3463 session_id: impl Into<String>,
3464 ) -> DurableFuture<Result<String, String>> {
3465 self.schedule_activity_internal(name, input, Some(session_id.into()))
3466 }
3467
3468 /// Typed version of schedule_activity_on_session that serializes input and deserializes output.
3469 ///
3470 /// # Errors
3471 ///
3472 /// Returns an error if the activity fails or if the output cannot be deserialized.
3473 pub fn schedule_activity_on_session_typed<
3474 In: serde::Serialize,
3475 Out: serde::de::DeserializeOwned + Send + 'static,
3476 >(
3477 &self,
3478 name: impl Into<String>,
3479 input: &In,
3480 session_id: impl Into<String>,
3481 ) -> DurableFuture<Result<Out, String>> {
3482 let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3483 self.schedule_activity_on_session(name, payload, session_id)
3484 .map(|r| r.and_then(|s| crate::_typed_codec::Json::decode::<Out>(&s)))
3485 }
3486
3487 /// Internal implementation for activity scheduling.
3488 fn schedule_activity_internal(
3489 &self,
3490 name: impl Into<String>,
3491 input: impl Into<String>,
3492 session_id: Option<String>,
3493 ) -> DurableFuture<Result<String, String>> {
3494 let name: String = name.into();
3495 let input: String = input.into();
3496
3497 let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
3498
3499 let token = inner.emit_action(Action::CallActivity {
3500 scheduling_event_id: 0, // Will be assigned by replay engine
3501 name: name.clone(),
3502 input: input.clone(),
3503 session_id,
3504 tag: None,
3505 });
3506 drop(inner);
3507
3508 let ctx = self.clone();
3509 let inner_future = std::future::poll_fn(move |_cx| {
3510 let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
3511 if let Some(result) = inner.get_result(token) {
3512 match result {
3513 CompletionResult::ActivityOk(s) => Poll::Ready(Ok(s.clone())),
3514 CompletionResult::ActivityErr(e) => Poll::Ready(Err(e.clone())),
3515 _ => Poll::Pending, // Wrong result type, keep waiting
3516 }
3517 } else {
3518 Poll::Pending
3519 }
3520 });
3521
3522 DurableFuture::new(token, ScheduleKind::Activity { name }, self.clone(), inner_future)
3523 }
3524
3525 /// Schedule a timer and return a cancellation-aware future.
3526 ///
3527 /// Timers are virtual constructs - dropping the future is a no-op since there's
3528 /// no external state to cancel. However, wrapping in `DurableFuture` maintains
3529 /// API consistency.
3530 pub fn schedule_timer(&self, delay: std::time::Duration) -> DurableFuture<()> {
3531 let delay_ms = delay.as_millis() as u64;
3532
3533 let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
3534
3535 let now = inner.now_ms();
3536 let fire_at_ms = now.saturating_add(delay_ms);
3537 let token = inner.emit_action(Action::CreateTimer {
3538 scheduling_event_id: 0,
3539 fire_at_ms,
3540 });
3541 drop(inner);
3542
3543 let ctx = self.clone();
3544 let inner_future = std::future::poll_fn(move |_cx| {
3545 let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
3546 if let Some(result) = inner.get_result(token) {
3547 match result {
3548 CompletionResult::TimerFired => Poll::Ready(()),
3549 _ => Poll::Pending,
3550 }
3551 } else {
3552 Poll::Pending
3553 }
3554 });
3555
3556 DurableFuture::new(token, ScheduleKind::Timer, self.clone(), inner_future)
3557 }
3558
3559 /// Subscribe to an external event and return a cancellation-aware future.
3560 ///
3561 /// External waits are virtual constructs - dropping the future is a no-op since
3562 /// there's no external state to cancel. However, wrapping in `DurableFuture`
3563 /// maintains API consistency.
3564 pub fn schedule_wait(&self, name: impl Into<String>) -> DurableFuture<String> {
3565 let name: String = name.into();
3566
3567 let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
3568
3569 let token = inner.emit_action(Action::WaitExternal {
3570 scheduling_event_id: 0,
3571 name: name.clone(),
3572 });
3573 drop(inner);
3574
3575 let ctx = self.clone();
3576 let inner_future = std::future::poll_fn(move |_cx| {
3577 let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
3578 // Only resolve once the token has been bound to a persisted schedule_id.
3579 // External events arriving before subscription binding are currently unsupported.
3580 if let Some(bound_id) = inner.get_bound_schedule_id(token)
3581 && let Some(data) = inner.get_external_event(bound_id)
3582 {
3583 return Poll::Ready(data.clone());
3584 }
3585 Poll::Pending
3586 });
3587
3588 DurableFuture::new(
3589 token,
3590 ScheduleKind::ExternalWait { event_name: name },
3591 self.clone(),
3592 inner_future,
3593 )
3594 }
3595
3596 /// Typed version of schedule_wait.
3597 pub fn schedule_wait_typed<T: serde::de::DeserializeOwned + Send + 'static>(
3598 &self,
3599 name: impl Into<String>,
3600 ) -> DurableFuture<T> {
3601 self.schedule_wait(name)
3602 .map(|s| crate::_typed_codec::Json::decode::<T>(&s).expect("decode"))
3603 }
3604
3605 /// Dequeue the next message from a named queue (FIFO mailbox semantics).
3606 ///
3607 /// Unlike `schedule_wait`, queued events use FIFO matching:
3608 /// - No positional pairing — any unresolved subscription gets the first unmatched arrival
3609 /// - Cancelled subscriptions are skipped (don't consume arrivals)
3610 /// - Events that arrive before a subscription are buffered until consumed
3611 /// - Events survive `continue_as_new` boundaries (carried forward)
3612 ///
3613 /// The caller enqueues messages with [`Client::enqueue_event`].
3614 pub fn dequeue_event(&self, queue: impl Into<String>) -> DurableFuture<String> {
3615 let name: String = queue.into();
3616
3617 let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
3618
3619 let token = inner.emit_action(Action::DequeueEvent {
3620 scheduling_event_id: 0,
3621 name: name.clone(),
3622 });
3623 drop(inner);
3624
3625 let ctx = self.clone();
3626 let inner_future = std::future::poll_fn(move |_cx| {
3627 let mut inner = ctx.inner.lock().expect("Mutex should not be poisoned");
3628 if let Some(bound_id) = inner.get_bound_schedule_id(token)
3629 && let Some(data) = inner.get_queue_message(bound_id)
3630 {
3631 return Poll::Ready(data);
3632 }
3633 Poll::Pending
3634 });
3635
3636 DurableFuture::new(
3637 token,
3638 ScheduleKind::QueueDequeue { event_name: name },
3639 self.clone(),
3640 inner_future,
3641 )
3642 }
3643
3644 /// Typed version of [`Self::dequeue_event`]. Deserializes the message payload as `T`.
3645 pub fn dequeue_event_typed<T: serde::de::DeserializeOwned>(
3646 &self,
3647 queue: impl Into<String>,
3648 ) -> impl Future<Output = T> {
3649 let fut = self.dequeue_event(queue);
3650 async move {
3651 let s = fut.await;
3652 crate::_typed_codec::Json::decode::<T>(&s).expect("decode")
3653 }
3654 }
3655
3656 /// Subscribe to a persistent external event (mailbox semantics).
3657 ///
3658 /// Prefer [`Self::dequeue_event`] — this is a deprecated alias.
3659 #[deprecated(note = "Use dequeue_event() instead")]
3660 pub fn schedule_wait_persistent(&self, name: impl Into<String>) -> DurableFuture<String> {
3661 self.dequeue_event(name)
3662 }
3663
3664 /// Typed version of schedule_wait_persistent.
3665 ///
3666 /// Prefer [`Self::dequeue_event_typed`] — this is a deprecated alias.
3667 #[deprecated(note = "Use dequeue_event_typed() instead")]
3668 pub fn schedule_wait_persistent_typed<T: serde::de::DeserializeOwned>(
3669 &self,
3670 name: impl Into<String>,
3671 ) -> impl Future<Output = T> {
3672 self.dequeue_event_typed(name)
3673 }
3674
3675 /// V2: Subscribe to an external event with topic-based pub/sub matching.
3676 ///
3677 /// Same semantics as `schedule_wait`, but matches on both `name` AND `topic`.
3678 /// Feature-gated for replay engine extensibility verification.
3679 #[cfg(feature = "replay-version-test")]
3680 pub fn schedule_wait2(&self, name: impl Into<String>, topic: impl Into<String>) -> DurableFuture<String> {
3681 let name: String = name.into();
3682 let topic: String = topic.into();
3683
3684 let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
3685
3686 let token = inner.emit_action(Action::WaitExternal2 {
3687 scheduling_event_id: 0,
3688 name: name.clone(),
3689 topic: topic.clone(),
3690 });
3691 drop(inner);
3692
3693 let ctx = self.clone();
3694 let inner_future = std::future::poll_fn(move |_cx| {
3695 let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
3696 if let Some(bound_id) = inner.get_bound_schedule_id(token)
3697 && let Some(data) = inner.get_external_event2(bound_id)
3698 {
3699 return Poll::Ready(data.clone());
3700 }
3701 Poll::Pending
3702 });
3703
3704 DurableFuture::new(
3705 token,
3706 ScheduleKind::ExternalWait { event_name: name },
3707 self.clone(),
3708 inner_future,
3709 )
3710 }
3711
3712 /// Schedule a sub-orchestration and return a cancellation-aware future.
3713 ///
3714 /// The child instance ID is auto-generated from the event ID with a parent prefix.
3715 ///
3716 /// Returns a [`DurableFuture`] that supports cancellation on drop. If the future
3717 /// is dropped without completing, a `CancelInstance` work item will be enqueued
3718 /// for the child orchestration.
3719 pub fn schedule_sub_orchestration(
3720 &self,
3721 name: impl Into<String>,
3722 input: impl Into<String>,
3723 ) -> DurableFuture<Result<String, String>> {
3724 self.schedule_sub_orchestration_versioned_with_id_internal(name, None, None, input)
3725 }
3726
3727 /// Schedule a sub-orchestration with an explicit instance ID.
3728 ///
3729 /// The provided `instance` value is used exactly as the child instance ID,
3730 /// without any parent prefix. Use this when you need to control the exact
3731 /// instance ID for the sub-orchestration.
3732 ///
3733 /// For auto-generated instance IDs, use [`schedule_sub_orchestration`] instead.
3734 pub fn schedule_sub_orchestration_with_id(
3735 &self,
3736 name: impl Into<String>,
3737 instance: impl Into<String>,
3738 input: impl Into<String>,
3739 ) -> DurableFuture<Result<String, String>> {
3740 self.schedule_sub_orchestration_versioned_with_id_internal(name, None, Some(instance.into()), input)
3741 }
3742
3743 /// Schedule a versioned sub-orchestration.
3744 ///
3745 /// If `version` is `Some`, that specific version is used.
3746 /// If `version` is `None`, the registry's policy (e.g., Latest) is used.
3747 pub fn schedule_sub_orchestration_versioned(
3748 &self,
3749 name: impl Into<String>,
3750 version: Option<String>,
3751 input: impl Into<String>,
3752 ) -> DurableFuture<Result<String, String>> {
3753 self.schedule_sub_orchestration_versioned_with_id_internal(name, version, None, input)
3754 }
3755
3756 /// Schedule a versioned sub-orchestration with an explicit instance ID.
3757 ///
3758 /// The provided `instance` value is used exactly as the child instance ID,
3759 /// without any parent prefix.
3760 ///
3761 /// Returns a [`DurableFuture`] that supports cancellation on drop. If the future
3762 /// is dropped without completing, a `CancelInstance` work item will be enqueued
3763 /// for the child orchestration.
3764 pub fn schedule_sub_orchestration_versioned_with_id(
3765 &self,
3766 name: impl Into<String>,
3767 version: Option<String>,
3768 instance: impl Into<String>,
3769 input: impl Into<String>,
3770 ) -> DurableFuture<Result<String, String>> {
3771 self.schedule_sub_orchestration_versioned_with_id_internal(name, version, Some(instance.into()), input)
3772 }
3773
3774 /// Internal implementation for sub-orchestration scheduling.
3775 ///
3776 /// If `instance` is `Some`, it's an explicit ID (no parent prefix).
3777 /// If `instance` is `None`, auto-generate from event ID (with parent prefix).
3778 fn schedule_sub_orchestration_versioned_with_id_internal(
3779 &self,
3780 name: impl Into<String>,
3781 version: Option<String>,
3782 instance: Option<String>,
3783 input: impl Into<String>,
3784 ) -> DurableFuture<Result<String, String>> {
3785 let name: String = name.into();
3786 let input: String = input.into();
3787
3788 let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
3789
3790 // For explicit instance IDs, use them as-is (no parent prefix will be added).
3791 // For auto-generated, use placeholder that will be replaced with SUB_ORCH_AUTO_PREFIX + event_id
3792 // and parent prefix will be added.
3793 let action_instance = match &instance {
3794 Some(explicit_id) => explicit_id.clone(),
3795 None => format!("{}{}", SUB_ORCH_PENDING_PREFIX, inner.next_token + 1),
3796 };
3797 let token = inner.emit_action(Action::StartSubOrchestration {
3798 scheduling_event_id: 0,
3799 name: name.clone(),
3800 version,
3801 instance: action_instance.clone(),
3802 input: input.clone(),
3803 });
3804 drop(inner);
3805
3806 let ctx = self.clone();
3807 let inner_future = std::future::poll_fn(move |_cx| {
3808 let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
3809 if let Some(result) = inner.get_result(token) {
3810 match result {
3811 CompletionResult::SubOrchOk(s) => Poll::Ready(Ok(s.clone())),
3812 CompletionResult::SubOrchErr(e) => Poll::Ready(Err(e.clone())),
3813 _ => Poll::Pending,
3814 }
3815 } else {
3816 Poll::Pending
3817 }
3818 });
3819
3820 // For cancellation, we store the token. The consumption path will look up
3821 // the resolved instance ID from the sub_orchestration_instances mapping.
3822 DurableFuture::new(
3823 token,
3824 ScheduleKind::SubOrchestration { token },
3825 self.clone(),
3826 inner_future,
3827 )
3828 }
3829
3830 /// Typed version of schedule_sub_orchestration.
3831 ///
3832 /// # Errors
3833 ///
3834 /// Returns an error if the sub-orchestration fails or if the output cannot be deserialized.
3835 pub fn schedule_sub_orchestration_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned + Send + 'static>(
3836 &self,
3837 name: impl Into<String>,
3838 input: &In,
3839 ) -> DurableFuture<Result<Out, String>> {
3840 let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3841 self.schedule_sub_orchestration(name, payload)
3842 .map(|r| r.and_then(|s| crate::_typed_codec::Json::decode::<Out>(&s)))
3843 }
3844
3845 /// Typed version of schedule_sub_orchestration_with_id.
3846 ///
3847 /// # Errors
3848 ///
3849 /// Returns an error if the sub-orchestration fails or if the output cannot be deserialized.
3850 pub fn schedule_sub_orchestration_with_id_typed<
3851 In: serde::Serialize,
3852 Out: serde::de::DeserializeOwned + Send + 'static,
3853 >(
3854 &self,
3855 name: impl Into<String>,
3856 instance: impl Into<String>,
3857 input: &In,
3858 ) -> DurableFuture<Result<Out, String>> {
3859 let payload = crate::_typed_codec::Json::encode(input).expect("encode");
3860 self.schedule_sub_orchestration_with_id(name, instance, payload)
3861 .map(|r| r.and_then(|s| crate::_typed_codec::Json::decode::<Out>(&s)))
3862 }
3863
3864 /// Await all futures concurrently using `futures::future::join_all`.
3865 /// Works with any `Future` type.
3866 pub async fn join<T, F>(&self, futures: Vec<F>) -> Vec<T>
3867 where
3868 F: Future<Output = T>,
3869 {
3870 ::futures::future::join_all(futures).await
3871 }
3872
3873 /// Simplified join for exactly 2 futures (convenience method).
3874 pub async fn join2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> (T1, T2)
3875 where
3876 F1: Future<Output = T1>,
3877 F2: Future<Output = T2>,
3878 {
3879 ::futures::future::join(f1, f2).await
3880 }
3881
3882 /// Simplified join for exactly 3 futures (convenience method).
3883 pub async fn join3<T1, T2, T3, F1, F2, F3>(&self, f1: F1, f2: F2, f3: F3) -> (T1, T2, T3)
3884 where
3885 F1: Future<Output = T1>,
3886 F2: Future<Output = T2>,
3887 F3: Future<Output = T3>,
3888 {
3889 ::futures::future::join3(f1, f2, f3).await
3890 }
3891
3892 /// Simplified select over 2 futures: returns the result of whichever completes first.
3893 /// Select over 2 futures with potentially different output types.
3894 ///
3895 /// Returns `Either2::First(result)` if first future wins, `Either2::Second(result)` if second wins.
3896 /// Uses futures::select_biased! for determinism (first branch polled first).
3897 ///
3898 /// # Example: Activity with timeout
3899 /// ```rust,no_run
3900 /// # use duroxide::{OrchestrationContext, Either2};
3901 /// # use std::time::Duration;
3902 /// # async fn example(ctx: OrchestrationContext) -> Result<String, String> {
3903 /// let work = ctx.schedule_activity("SlowWork", "input");
3904 /// let timeout = ctx.schedule_timer(Duration::from_secs(30));
3905 ///
3906 /// match ctx.select2(work, timeout).await {
3907 /// Either2::First(result) => result,
3908 /// Either2::Second(()) => Err("Operation timed out".to_string()),
3909 /// }
3910 /// # }
3911 /// ```
3912 pub async fn select2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> Either2<T1, T2>
3913 where
3914 F1: Future<Output = T1>,
3915 F2: Future<Output = T2>,
3916 {
3917 use ::futures::FutureExt;
3918 let mut f1 = std::pin::pin!(f1.fuse());
3919 let mut f2 = std::pin::pin!(f2.fuse());
3920 ::futures::select_biased! {
3921 result = f1 => Either2::First(result),
3922 result = f2 => Either2::Second(result),
3923 }
3924 }
3925
3926 /// Select over 3 futures with potentially different output types.
3927 ///
3928 /// Returns `Either3::First/Second/Third(result)` depending on which future completes first.
3929 /// Uses futures::select_biased! for determinism (earlier branches polled first).
3930 pub async fn select3<T1, T2, T3, F1, F2, F3>(&self, f1: F1, f2: F2, f3: F3) -> Either3<T1, T2, T3>
3931 where
3932 F1: Future<Output = T1>,
3933 F2: Future<Output = T2>,
3934 F3: Future<Output = T3>,
3935 {
3936 use ::futures::FutureExt;
3937 let mut f1 = std::pin::pin!(f1.fuse());
3938 let mut f2 = std::pin::pin!(f2.fuse());
3939 let mut f3 = std::pin::pin!(f3.fuse());
3940 ::futures::select_biased! {
3941 result = f1 => Either3::First(result),
3942 result = f2 => Either3::Second(result),
3943 result = f3 => Either3::Third(result),
3944 }
3945 }
3946}