// duroxide/providers/mod.rs

1use crate::Event;
2use std::any::Any;
3use std::collections::HashSet;
4use std::time::Duration;
5
6pub mod error;
7pub use error::ProviderError;
8
9/// Maximum number of tags a worker can subscribe to.
10pub use crate::runtime::limits::MAX_WORKER_TAGS;
11
/// Filter for which activity tags a worker will process.
///
/// Activity tags route work items to specialized workers. A tag is an
/// opaque string label set at schedule time via `.with_tag()`. Workers
/// declare which tags they accept via [`crate::RuntimeOptions`]`.worker_tag_filter`.
///
/// # Variants
///
/// - `DefaultOnly` (default) — process only activities with no tag.
/// - `Tags(["gpu"])` — process only activities tagged `"gpu"`.
/// - `DefaultAnd(["gpu"])` — process activities with no tag OR tagged `"gpu"`.
/// - `Any` — process all activities regardless of tag.
/// - `None` — don't process any activities (orchestrator-only mode).
///
/// # Construction
///
/// The tag-carrying variants are public, so they can be built directly from a
/// `HashSet<String>`. Doing so skips the checks performed by [`TagFilter::tags`]
/// and [`TagFilter::default_and`] (non-empty, at most [`MAX_WORKER_TAGS`] tags);
/// prefer those constructors.
///
/// Note that the `None` variant shares its name with `Option::None`. Spell both
/// out in full (`TagFilter::None`, `Option::None`) wherever they appear together.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum TagFilter {
    /// Process only activities with no tag (default).
    #[default]
    DefaultOnly,

    /// Process only activities with the specified tags (NOT default).
    /// Limited to [`MAX_WORKER_TAGS`] (5) tags when built via [`TagFilter::tags`].
    Tags(HashSet<String>),

    /// Process activities with no tag as well as activities carrying one of the
    /// specified tags.
    /// Limited to [`MAX_WORKER_TAGS`] (5) tags when built via [`TagFilter::default_and`].
    DefaultAnd(HashSet<String>),

    /// Process all activities regardless of tag (generalist worker).
    Any,

    /// Don't process any activities (orchestrator-only mode).
    None,
}
45
46impl TagFilter {
47    /// Create a filter for specific tags only.
48    ///
49    /// # Panics
50    /// Panics if more than [`MAX_WORKER_TAGS`] (5) tags are provided.
51    pub fn tags<I, S>(tags: I) -> Self
52    where
53        I: IntoIterator<Item = S>,
54        S: Into<String>,
55    {
56        let set: HashSet<String> = tags.into_iter().map(Into::into).collect();
57        assert!(
58            !set.is_empty(),
59            "TagFilter::tags() requires at least one tag; use TagFilter::None for no activities"
60        );
61        assert!(
62            set.len() <= MAX_WORKER_TAGS,
63            "Worker can subscribe to at most {} tags, got {}",
64            MAX_WORKER_TAGS,
65            set.len()
66        );
67        TagFilter::Tags(set)
68    }
69
70    /// Create a filter for default plus specific tags.
71    ///
72    /// # Panics
73    /// Panics if more than [`MAX_WORKER_TAGS`] (5) tags are provided.
74    pub fn default_and<I, S>(tags: I) -> Self
75    where
76        I: IntoIterator<Item = S>,
77        S: Into<String>,
78    {
79        let set: HashSet<String> = tags.into_iter().map(Into::into).collect();
80        assert!(
81            !set.is_empty(),
82            "TagFilter::default_and() requires at least one tag; use TagFilter::DefaultOnly for untagged only"
83        );
84        assert!(
85            set.len() <= MAX_WORKER_TAGS,
86            "Worker can subscribe to at most {} tags, got {}",
87            MAX_WORKER_TAGS,
88            set.len()
89        );
90        TagFilter::DefaultAnd(set)
91    }
92
93    /// Create a filter for only untagged activities.
94    pub fn default_only() -> Self {
95        TagFilter::DefaultOnly
96    }
97
98    /// Create a filter that processes no activities.
99    pub fn none() -> Self {
100        TagFilter::None
101    }
102
103    /// Create a filter that processes all activities regardless of tag.
104    pub fn any() -> Self {
105        TagFilter::Any
106    }
107
108    /// Check if an activity with the given tag matches this filter.
109    pub fn matches(&self, tag: Option<&str>) -> bool {
110        match (self, tag) {
111            (TagFilter::Any, _) => true,
112            (TagFilter::None, _) => false,
113            (TagFilter::DefaultOnly, Option::None) => true,
114            (TagFilter::DefaultOnly, Some(_)) => false,
115            (TagFilter::Tags(_), Option::None) => false,
116            (TagFilter::Tags(set), Some(t)) => set.contains(t),
117            (TagFilter::DefaultAnd(_), Option::None) => true,
118            (TagFilter::DefaultAnd(set), Some(t)) => set.contains(t),
119        }
120    }
121}
122
123/// Returns the current build version of the duroxide crate.
124pub fn current_build_version() -> semver::Version {
125    semver::Version::parse(env!("CARGO_PKG_VERSION")).expect("CARGO_PKG_VERSION must be valid semver")
126}
127
128/// An inclusive version range: [min, max].
129///
130/// Both bounds are inclusive. For example, `SemverRange { min: (0,0,0), max: (1,5,0) }`
131/// matches any version `v` where `0.0.0 <= v <= 1.5.0`.
132#[derive(Debug, Clone, PartialEq, Eq)]
133pub struct SemverRange {
134    pub min: semver::Version,
135    pub max: semver::Version,
136}
137
138impl SemverRange {
139    pub fn new(min: semver::Version, max: semver::Version) -> Self {
140        Self { min, max }
141    }
142
143    /// Check if a version falls within this range (inclusive on both ends).
144    pub fn contains(&self, version: &semver::Version) -> bool {
145        version >= &self.min && version <= &self.max
146    }
147
148    /// Default range: `>=0.0.0, <=CURRENT_BUILD_VERSION`.
149    ///
150    /// This means the runtime can replay any execution pinned at or below its own
151    /// build version. Replay engines are backward-compatible.
152    pub fn default_for_current_build() -> Self {
153        Self {
154            min: semver::Version::new(0, 0, 0),
155            max: current_build_version(),
156        }
157    }
158}
159
160/// Capability filter passed by the orchestration dispatcher to the provider.
161///
162/// The provider uses this to return only orchestration items whose pinned
163/// `duroxide_version` falls within one of the supported ranges.
164///
165/// If no filter is supplied (`None`), the provider returns any available item
166/// (legacy behavior, useful for admin tooling).
167#[derive(Debug, Clone)]
168pub struct DispatcherCapabilityFilter {
169    /// Supported duroxide version ranges. An execution is compatible if its
170    /// pinned duroxide version falls within ANY of these ranges.
171    ///
172    /// Phase 1 typically has a single range: `[>=0.0.0, <=CURRENT_BUILD_VERSION]`.
173    pub supported_duroxide_versions: Vec<SemverRange>,
174}
175
176impl DispatcherCapabilityFilter {
177    /// Check if a pinned version is compatible with this filter.
178    pub fn is_compatible(&self, version: &semver::Version) -> bool {
179        self.supported_duroxide_versions.iter().any(|r| r.contains(version))
180    }
181
182    /// Build the default filter for the current build version.
183    pub fn default_for_current_build() -> Self {
184        Self {
185            supported_duroxide_versions: vec![SemverRange::default_for_current_build()],
186        }
187    }
188}
189
/// Identity of an activity for cancellation purposes.
///
/// Used by the runtime to specify which activities should be cancelled
/// during an orchestration turn (e.g., select losers, orchestration termination).
///
/// Two identifiers are equal only when all three fields are equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduledActivityIdentifier {
    /// Instance ID of the orchestration
    pub instance: String,
    /// Execution ID within the instance
    pub execution_id: u64,
    /// Activity ID (the event_id of the ActivityScheduled event)
    pub activity_id: u64,
}
203
/// Configuration for session-aware work item fetching.
///
/// When passed to [`Provider::fetch_work_item`], enables session routing:
/// - Non-session items (`session_id IS NULL`) are always eligible
/// - Items for sessions already owned by `owner_id` are eligible
/// - Items for unowned (claimable) sessions are eligible
///
/// When `fetch_work_item` is called with `session: None`, only non-session
/// items are returned.
#[derive(Debug, Clone)]
pub struct SessionFetchConfig {
    /// Identity tag for session ownership. All worker slots sharing this
    /// value are treated as a single owner for session affinity purposes.
    /// Typically set to `RuntimeOptions::worker_node_id` (process-level)
    /// or a per-slot UUID (ephemeral).
    pub owner_id: String,
    /// How long to hold the session lock when claiming a new session.
    pub lock_timeout: Duration,
}
223
224/// Orchestration item containing all data needed to process an instance atomically.
225///
226/// This represents a locked batch of work for a single orchestration instance.
227/// The provider must guarantee that no other process can modify this instance
228/// until `ack_orchestration_item()` or `abandon_orchestration_item()` is called.
229///
230/// # Fields
231///
232/// * `instance` - Unique identifier for the orchestration instance (e.g., "order-123")
233/// * `orchestration_name` - Name of the orchestration being executed (e.g., "ProcessOrder")
234/// * `execution_id` - Current execution ID (starts at 1, increments with ContinueAsNew)
235/// * `version` - Orchestration version string (e.g., "1.0.0")
236/// * `history` - Complete event history for the current execution (ordered by event_id)
237/// * `messages` - Batch of WorkItems to process (may include Start, completions, external events)
238/// * `lock_token` - Unique token that must be used to ack or abandon this batch
239/// * `history_error` - If set, history deserialization failed; `history` may be empty or partial
240///
241/// # Implementation Notes
242///
243/// - The provider must ensure `history` contains ALL events for `execution_id` in order
244/// - For multi-execution instances (ContinueAsNew), only return the LATEST execution's history
245/// - The `lock_token` must be unique and prevent concurrent processing of the same instance
246/// - All messages in the batch should belong to the same instance
247/// - The lock should expire after a timeout (e.g., 30s) to handle worker crashes
248/// - If history deserialization fails, set `history_error` with the error message and return
249///   `Ok(Some(...))` instead of `Err(...)`. This allows the runtime to reach the poison check
250///   and terminate the orchestration after `max_attempts`.
251///
252/// # Example from SQLite Provider
253///
254/// ```text
255/// // 1. Find available instance (check instance_locks for active locks)
256/// SELECT q.instance_id FROM orchestrator_queue q
257/// LEFT JOIN instance_locks il ON q.instance_id = il.instance_id
258/// WHERE q.visible_at <= now()
259///   AND (il.instance_id IS NULL OR il.locked_until <= now())
260/// ORDER BY q.id LIMIT 1
261///
262/// // 2. Atomically acquire instance-level lock
263/// INSERT INTO instance_locks (instance_id, lock_token, locked_until, locked_at)
264/// VALUES (?, ?, ?, ?)
265/// ON CONFLICT(instance_id) DO UPDATE
266/// SET lock_token = excluded.lock_token, locked_until = excluded.locked_until
267/// WHERE locked_until <= excluded.locked_at
268///
269/// // 3. Lock all visible messages for that instance
270/// UPDATE orchestrator_queue SET lock_token = ?, locked_until = ?
271/// WHERE instance_id = ? AND visible_at <= now()
272///
273/// // 4. Load instance metadata
274/// SELECT orchestration_name, orchestration_version, current_execution_id
275/// FROM instances WHERE instance_id = ?
276///
277/// // 5. Load history for current execution
278/// SELECT event_data FROM history
279/// WHERE instance_id = ? AND execution_id = ?
280/// ORDER BY event_id
281///
282/// // 6. Return OrchestrationItem with lock_token
283/// ```
284#[derive(Debug, Clone)]
285pub struct OrchestrationItem {
286    pub instance: String,
287    pub orchestration_name: String,
288    pub execution_id: u64,
289    pub version: String,
290    pub history: Vec<Event>,
291    pub messages: Vec<WorkItem>,
292    /// If set, history deserialization failed. `history` may be empty or partial.
293    ///
294    /// The runtime treats this as a terminal error: it abandons the item with backoff,
295    /// and once `attempt_count > max_attempts`, poisons the orchestration.
296    ///
297    /// Providers SHOULD return `Ok(Some(...))` with this field set instead of
298    /// `Err(ProviderError::permanent(...))` when deserialization fails after acquiring
299    /// a lock. This ensures the lock lifecycle stays clean (Ok = lock held, Err = no lock).
300    pub history_error: Option<String>,
301}
302
303/// Execution metadata computed by the runtime to be persisted by the provider.
304///
305/// The runtime inspects the `history_delta` and `orchestrator_items` to compute this metadata.
306/// **Providers must NOT inspect event contents themselves** - they should blindly store this metadata.
307///
308/// This design ensures the provider remains a pure storage abstraction without orchestration knowledge.
309///
310/// # Fields
311///
312/// * `status` - New execution status: `Some("Completed")`, `Some("Failed")`, `Some("ContinuedAsNew")`, or `None`
313///   - `None` means the execution is still running (no status update needed)
314///   - Provider should update the stored execution status when `Some(...)`
315///
316/// * `output` - The terminal value to store (depends on status):
317///   - `Completed`: The orchestration's successful result
318///   - `Failed`: The error message
319///   - `ContinuedAsNew`: The input that was passed to continue_as_new()
320///   - `None`: No output (execution still running)
321///
322/// # Example Usage in Provider
323///
324/// ```text
325/// async fn ack_orchestration_item(..., metadata: ExecutionMetadata) {
326///     // Store metadata without understanding what it means
327///     if let Some(status) = &metadata.status {
328///         UPDATE executions SET status = ?, output = ? WHERE instance_id = ? AND execution_id = ?
329///     }
330/// }
331/// ```
332///
333/// # ContinueAsNew Handling
334///
335/// ContinueAsNew is handled entirely by the runtime. Providers must NOT try to
336/// synthesize new executions in `fetch_orchestration_item`.
337///
338/// Runtime behavior:
339/// - When an orchestration calls `continue_as_new(input)`, the runtime stamps
340///   `OrchestrationContinuedAsNew` into the current execution's history and enqueues
341///   a `WorkItem::ContinueAsNew`.
342/// - When processing that work item, the runtime starts a fresh execution with
343///   `execution_id = current + 1`, passes an empty `existing_history`, and stamps an
344///   `OrchestrationStarted { event_id: 1, .. }` event for the new execution.
345/// - The runtime then calls `ack_orchestration_item(lock_token, execution_id, ...)` with
346///   the explicit execution id to persist history and queue operations.
347///
348/// Provider responsibilities:
349/// - Use the explicit `execution_id` given to `ack_orchestration_item`.
350/// - Idempotently create the execution record/entry if it doesn't already exist.
351/// - Update the instance's “current execution” pointer to be at least `execution_id`.
352/// - Append all `history_delta` events to the specified `execution_id`.
353/// - Update `executions.status, executions.output` from `ExecutionMetadata` when provided.
354#[derive(Debug, Clone, Default)]
355pub struct ExecutionMetadata {
356    /// New status for the execution ('Completed', 'Failed', 'ContinuedAsNew', or None to keep current)
357    pub status: Option<String>,
358    /// Output/error/input to store (for Completed/Failed/ContinuedAsNew)
359    pub output: Option<String>,
360    /// Orchestration name (for new instances or updates)
361    pub orchestration_name: Option<String>,
362    /// Orchestration version (for new instances or updates)
363    pub orchestration_version: Option<String>,
364    /// Parent instance ID (for sub-orchestrations, used for cascading delete)
365    pub parent_instance_id: Option<String>,
366    /// Pinned duroxide version for this execution (set from OrchestrationStarted event).
367    ///
368    /// The provider stores this alongside the execution record for efficient
369    /// capability filtering.
370    ///
371    /// - `Some(v)`: Store `v` as the execution's pinned version. The provider should
372    ///   update the stored version unconditionally when provided.
373    /// - `None`: No version update requested. The provider should not modify the
374    ///   existing stored value.
375    ///
376    /// The **runtime** guarantees this is only set on the first turn of a new execution
377    /// (when `OrchestrationStarted` is in the history delta), enforced by a `debug_assert`
378    /// in the orchestration dispatcher. The provider does not need to enforce write-once
379    /// semantics — it simply stores what it's told.
380    pub pinned_duroxide_version: Option<semver::Version>,
381}
382
383/// Provider-backed work queue items the runtime consumes continually.
384///
385/// WorkItems represent messages that flow through provider-managed queues.
386/// They are serialized/deserialized using serde_json for storage.
387///
388/// # Queue Routing
389///
390/// Different WorkItem types go to different queues:
391/// - `StartOrchestration, ContinueAsNew, ActivityCompleted/Failed, TimerFired, ExternalRaised, SubOrchCompleted/Failed, CancelInstance` → **Orchestrator queue**
392///   - TimerFired items use `visible_at = fire_at_ms` for delayed visibility
393/// - `ActivityExecute` → **Worker queue**
394///
395/// # Instance ID Extraction
396///
397/// Most WorkItems have an `instance` field. Sub-orchestration completions use `parent_instance`.
398/// Providers need to extract the instance ID for routing. SQLite example:
399///
400/// ```ignore
401/// let instance = match &item {
402///     WorkItem::StartOrchestration { instance, .. } |
403///     WorkItem::ActivityCompleted { instance, .. } |
404///     WorkItem::CancelInstance { instance, .. } => instance,
405///     WorkItem::SubOrchCompleted { parent_instance, .. } => parent_instance,
406///     _ => return Err("unexpected item type"),
407/// };
408/// ```
409///
410/// # Execution ID Tracking
411///
412/// WorkItems for activities, timers, and sub-orchestrations include `execution_id`.
413/// This allows providers to route completions to the correct execution when ContinueAsNew creates multiple executions.
414///
415/// **Critical:** Completions with mismatched execution_id should still be enqueued (the runtime filters them).
416#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
417pub enum WorkItem {
418    /// Start a new orchestration instance
419    /// - Instance metadata is created by runtime via ack_orchestration_item metadata (not on enqueue)
420    /// - `execution_id`: The execution ID for this start (usually INITIAL_EXECUTION_ID=1)
421    /// - `version`: None means runtime will resolve from registry
422    StartOrchestration {
423        instance: String,
424        orchestration: String,
425        input: String,
426        version: Option<String>,
427        parent_instance: Option<String>,
428        parent_id: Option<u64>,
429        execution_id: u64,
430    },
431
432    /// Execute an activity (goes to worker queue)
433    /// - `id`: event_id from ActivityScheduled (for correlation)
434    /// - Worker will enqueue ActivityCompleted or ActivityFailed
435    ActivityExecute {
436        instance: String,
437        execution_id: u64,
438        id: u64, // scheduling_event_id from ActivityScheduled
439        name: String,
440        input: String,
441        #[serde(skip_serializing_if = "Option::is_none")]
442        #[serde(default)]
443        session_id: Option<String>,
444        /// Routing tag for directing this activity to specific workers.
445        /// `None` means default (untagged) queue.
446        #[serde(skip_serializing_if = "Option::is_none")]
447        #[serde(default)]
448        tag: Option<String>,
449    },
450
451    /// Activity completed successfully (goes to orchestrator queue)
452    /// - `id`: source_event_id referencing the ActivityScheduled event
453    /// - Triggers next orchestration turn
454    ActivityCompleted {
455        instance: String,
456        execution_id: u64,
457        id: u64, // source_event_id referencing ActivityScheduled
458        result: String,
459    },
460
461    /// Activity failed with error (goes to orchestrator queue)
462    /// - `id`: source_event_id referencing the ActivityScheduled event
463    /// - Triggers next orchestration turn
464    ActivityFailed {
465        instance: String,
466        execution_id: u64,
467        id: u64, // source_event_id referencing ActivityScheduled
468        details: crate::ErrorDetails,
469    },
470
471    /// Timer fired (goes to orchestrator queue with delayed visibility)
472    /// - Created directly by runtime when timer is scheduled
473    /// - Enqueued to orchestrator queue with `visible_at = fire_at_ms`
474    /// - Orchestrator dispatcher processes when `visible_at <= now()`
475    TimerFired {
476        instance: String,
477        execution_id: u64,
478        id: u64, // source_event_id referencing TimerCreated
479        fire_at_ms: u64,
480    },
481
482    /// External event raised (goes to orchestrator queue)
483    /// - Matched by `name` to ExternalSubscribed events
484    /// - `data`: JSON payload from external system
485    ExternalRaised {
486        instance: String,
487        name: String,
488        data: String,
489    },
490
491    /// Sub-orchestration completed (goes to parent's orchestrator queue)
492    /// - Routes to `parent_instance`, not the child
493    /// - `parent_id`: event_id from parent's SubOrchestrationScheduled event
494    SubOrchCompleted {
495        parent_instance: String,
496        parent_execution_id: u64,
497        parent_id: u64, // source_event_id referencing SubOrchestrationScheduled
498        result: String,
499    },
500
501    /// Sub-orchestration failed (goes to parent's orchestration queue)
502    /// - Routes to `parent_instance`, not the child
503    /// - `parent_id`: event_id from parent's SubOrchestrationScheduled event
504    SubOrchFailed {
505        parent_instance: String,
506        parent_execution_id: u64,
507        parent_id: u64, // source_event_id referencing SubOrchestrationScheduled
508        details: crate::ErrorDetails,
509    },
510
511    /// Request orchestration cancellation (goes to orchestrator queue)
512    /// - Runtime will append OrchestrationCancelRequested event
513    /// - Eventually results in OrchestrationFailed with "canceled: {reason}"
514    CancelInstance { instance: String, reason: String },
515
516    /// Continue orchestration as new execution (goes to orchestrator queue)
517    /// - Signals the end of current execution and start of next
518    /// - Runtime will create Event::OrchestrationStarted for next execution
519    /// - Provider should create new execution (see ExecutionMetadata.create_next_execution)
520    ContinueAsNew {
521        instance: String,
522        orchestration: String,
523        input: String,
524        version: Option<String>,
525        /// Persistent events carried forward from the previous execution.
526        /// These are seeded into the new execution's history before any new
527        /// externally-raised events, preserving FIFO order across CAN boundaries.
528        #[serde(default)]
529        carry_forward_events: Vec<(String, String)>,
530        /// Custom status accumulated from the previous execution, if any.
531        /// Carried forward so `get_custom_status()` works immediately in the new execution.
532        #[serde(skip_serializing_if = "Option::is_none")]
533        #[serde(default)]
534        initial_custom_status: Option<String>,
535    },
536
537    /// Persistent external event raised (goes to orchestrator queue).
538    /// Matched by `name` using FIFO mailbox semantics — events stick around until consumed.
539    QueueMessage {
540        instance: String,
541        name: String,
542        data: String,
543    },
544
545    /// V2: External event with topic-based pub/sub matching (goes to orchestrator queue).
546    ///
547    /// Matched by `name` AND `topic` to ExternalSubscribed2 events.
548    /// Feature-gated for replay engine extensibility verification.
549    #[cfg(feature = "replay-version-test")]
550    ExternalRaised2 {
551        instance: String,
552        name: String,
553        topic: String,
554        data: String,
555    },
556}
557
558/// Provider abstraction for durable orchestration execution (persistence + queues).
559///
560/// # Overview
561///
562/// A Provider is responsible for:
563/// 1. **Persistence**: Storing orchestration history (append-only event log)
564/// 2. **Queueing**: Managing two work queues (orchestrator, worker)
565/// 3. **Locking**: Implementing peek-lock semantics to prevent concurrent processing
566/// 4. **Atomicity**: Ensuring transactional consistency across operations
567///
568/// # Architecture: Two-Queue Model
569///
570/// ```text
571/// ┌─────────────────────────────────────────────────────────────┐
572/// │                     RUNTIME (2 Dispatchers)                 │
573/// ├─────────────────────────────────────────────────────────────┤
574/// │                                                             │
575/// │  [Orchestration Dispatcher]  ←─── fetch_orchestration_item │
576/// │           ↓                                                 │
577/// │      Process Turn                                           │
578/// │           ↓                                                 │
579/// │  ack_orchestration_item ────┬──► Orchestrator Queue        │
580/// │                             │   (TimerFired with delayed   │
581/// │                             │    visibility for timers)   │
582/// │                             └──► Worker Queue              │
583/// │                                                             │
584/// │  [Worker Dispatcher]  ←────────── fetch_work_item │
585/// │       Execute Activity                                      │
586/// │           ↓                                                 │
587/// │  Completion ────────────────────► Orchestrator Queue        │
588/// │                                                             │
589/// └─────────────────────────────────────────────────────────────┘
590///                           ↕
591///              ┌────────────────────────┐
592///              │   PROVIDER (Storage)   │
593///              │  ┌──────────────────┐  │
594///              │  │ History (Events) │  │
595///              │  ├──────────────────┤  │
596///              │  │ Orch Queue       │  │
597///              │  │ Worker Queue     │  │
598///              │  └──────────────────┘  │
599///              └────────────────────────┘
600/// ```
601///
602/// # Design Principles
603///
604/// **Providers should be storage abstractions, NOT orchestration engines:**
605/// - ✅ Store and retrieve events as opaque data (don't inspect Event contents)
606/// - ✅ Manage queues and locks (generic queue operations)
607/// - ✅ Provide ACID guarantees where possible
608/// - ❌ DON'T interpret orchestration semantics (use ExecutionMetadata from runtime)
609/// - ❌ DON'T create events (runtime creates all events)
610/// - ❌ DON'T make orchestration decisions (runtime decides control flow)
611///
612/// # Multi-Execution Support (ContinueAsNew)
613///
614/// Orchestration instances can have multiple executions (execution_id 1, 2, 3, ...):
615/// - Each execution has its own event history
616/// - `read()` should return the LATEST execution's history
617/// - Provider tracks current_execution_id to know which execution is active
/// - When `ack_orchestration_item` is called with an `execution_id` that has no record yet,
///   idempotently create the new execution record/entry
619///
620/// **Example:**
621/// ```text
622/// Instance "order-123":
623///   Execution 1: [OrchestrationStarted, ActivityScheduled, OrchestrationContinuedAsNew]
624///   Execution 2: [OrchestrationStarted, ActivityScheduled, OrchestrationCompleted]
625///   
626///   read("order-123") → Returns Execution 2's events (latest)
627///   read_with_execution("order-123", 1) → Returns Execution 1's events
628///   latest_execution_id("order-123") → Returns Some(2)
629/// ```
630///
631/// # Concurrency Model
632///
/// The runtime runs 2 background dispatchers polling your queues:
/// 1. **Orchestration Dispatcher**: Polls fetch_orchestration_item() continuously
/// 2. **Work Dispatcher**: Polls fetch_work_item() continuously
///
/// There is no separate timer dispatcher: timers arrive as `TimerFired` items on the
/// orchestrator queue once their `visible_at` time has passed.
637///
638/// **Your implementation must be thread-safe** and support concurrent access from multiple dispatchers.
639///
640/// # Peek-Lock Pattern (Critical)
641///
642/// All dequeue operations use peek-lock semantics:
643/// 1. **Peek**: Select and lock a message (message stays in queue)
644/// 2. **Process**: Runtime processes the locked message
645/// 3. **Ack**: Delete message from queue (success) OR
646/// 4. **Abandon**: Release lock for retry (failure)
647///
648/// Benefits:
649/// - At-least-once delivery (messages survive crashes)
650/// - Automatic retry on worker failure (lock expires)
651/// - Prevents duplicate processing (locked messages invisible to others)
652///
653/// # Transactional Guarantees
654///
655/// **`ack_orchestration_item()` is the atomic boundary:**
656/// - ALL operations in ack must succeed or fail together
657/// - History append + queue enqueues + lock release = atomic
658/// - If commit fails, entire turn is retried
659/// - This ensures exactly-once semantics for orchestration turns
660///
661/// # Queue Message Flow
662///
663/// **Orchestrator Queue:**
664/// - Inputs: StartOrchestration, ActivityCompleted/Failed, TimerFired, ExternalRaised, SubOrchCompleted/Failed, CancelInstance, ContinueAsNew
665/// - Output: Processed by orchestration dispatcher → ack_orchestration_item
666/// - Batching: All messages for an instance processed together
667/// - Timers: TimerFired items are enqueued with `visible_at = fire_at_ms` for delayed visibility
668///
669/// **Worker Queue:**
670/// - Inputs: ActivityExecute (from ack_orchestration_item)
671/// - Output: Processed by work dispatcher → ActivityCompleted/Failed to orch queue
672/// - Batching: One message at a time (activities executed independently)
673///
674/// **Note:** There is no separate timer queue. Timers are handled by enqueuing TimerFired items
675/// directly to the orchestrator queue with delayed visibility (`visible_at` set to `fire_at_ms`).
676/// The orchestrator dispatcher processes them when `visible_at <= now()`.
677///
678/// # Instance Metadata Management
679///
680/// Providers typically maintain metadata about instances:
681/// - instance_id (primary key)
682/// - orchestration_name, orchestration_version
683/// - current_execution_id (for multi-execution support)
684/// - status, output (optional, for quick queries)
685/// - created_at, updated_at timestamps
686///
687/// This metadata is updated via:
688/// - `enqueue_for_orchestrator()` with StartOrchestration
689/// - `ack_orchestration_item()` with history changes
690/// - `ExecutionMetadata` for status/output updates
691///
692/// # Required vs Optional Methods
693///
694/// **REQUIRED** (must implement):
695/// - fetch_orchestration_item, ack_orchestration_item, abandon_orchestration_item
696/// - read, append_with_execution
697/// - enqueue_for_worker, fetch_work_item, ack_work_item
698/// - enqueue_for_orchestrator
699///
700/// **OPTIONAL** (has defaults):
701/// - latest_execution_id, read_with_execution
702/// - list_instances, list_executions
703///
704/// # Recommended Database Schema (SQL Example)
705///
706/// ```sql
707/// -- Instance metadata
708/// CREATE TABLE instances (
709///     instance_id TEXT PRIMARY KEY,
710///     orchestration_name TEXT NOT NULL,
711///     orchestration_version TEXT,  -- NULLable, set by runtime via metadata
712///     current_execution_id INTEGER DEFAULT 1,
713///     created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
714/// );
715///
716/// -- Execution tracking
717/// CREATE TABLE executions (
718///     instance_id TEXT NOT NULL,
719///     execution_id INTEGER NOT NULL,
720///     status TEXT DEFAULT 'Running',
721///     output TEXT,
722///     started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
723///     completed_at TIMESTAMP,
724///     PRIMARY KEY (instance_id, execution_id)
725/// );
726///
727/// -- Event history (append-only)
728/// CREATE TABLE history (
729///     instance_id TEXT NOT NULL,
730///     execution_id INTEGER NOT NULL,
731///     event_id INTEGER NOT NULL,
732///     event_type TEXT NOT NULL,
733///     event_data TEXT NOT NULL,  -- JSON serialized Event
734///     created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
735///     PRIMARY KEY (instance_id, execution_id, event_id)
736/// );
737///
738/// -- Orchestrator queue (peek-lock)
739/// CREATE TABLE orchestrator_queue (
740///     id INTEGER PRIMARY KEY AUTOINCREMENT,
741///     instance_id TEXT NOT NULL,
742///     work_item TEXT NOT NULL,  -- JSON serialized WorkItem
743///     visible_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
744///     lock_token TEXT,
745///     locked_until TIMESTAMP,
746///     created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
747/// );
748///
749/// -- Worker queue (peek-lock with visibility control)
750/// CREATE TABLE worker_queue (
751///     id INTEGER PRIMARY KEY AUTOINCREMENT,
752///     work_item TEXT NOT NULL,
753///     visible_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
754///     lock_token TEXT,
755///     locked_until TIMESTAMP,
756///     attempt_count INTEGER NOT NULL DEFAULT 0,
757///     created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
758/// );
759///
760/// -- Instance-level locks (CRITICAL: Prevents concurrent processing)
761/// CREATE TABLE instance_locks (
762///     instance_id TEXT PRIMARY KEY,
763///     lock_token TEXT NOT NULL,
764///     locked_until INTEGER NOT NULL,  -- Unix timestamp (milliseconds)
765///     locked_at INTEGER NOT NULL      -- Unix timestamp (milliseconds)
766/// );
767/// ```
768///
769/// # Implementing a New Provider: Checklist
770///
771/// 1. ✅ **Storage Layer**: Choose backing store (PostgreSQL, Redis, DynamoDB, etc.)
772/// 2. ✅ **Serialization**: Use serde_json for Event and WorkItem (or compatible format)
773/// 3. ✅ **Instance Locking**: Implement instance-level locks to prevent concurrent processing
774/// 4. ✅ **Locking**: Implement peek-lock with unique tokens and expiration
775/// 5. ✅ **Transactions**: Ensure `ack_orchestration_item` is atomic
776/// 6. ✅ **Indexes**: Add indexes on `instance_id`, `lock_token`, `visible_at` for orchestrator queue
777/// 7. ✅ **Delayed Visibility**: Support `visible_at` timestamps for TimerFired items in orchestrator queue
778/// 8. ✅ **Testing**: Use `tests/sqlite_provider_validations.rs` as a template, or see `docs/provider-implementation-guide.md`
779/// 9. ✅ **Multi-execution**: Support execution_id partitioning for ContinueAsNew
780/// 10. ⚠️ **DO NOT**: Inspect event contents (use ExecutionMetadata)
781/// 11. ⚠️ **DO NOT**: Create events (runtime owns event creation)
782/// 12. ⚠️ **DO NOT**: Make orchestration decisions (runtime owns logic)
783///
784/// # Example: Minimal Redis Provider Sketch
785///
786/// ```ignore
787/// struct RedisProvider {
788///     client: redis::Client,
789///     lock_timeout: Duration,
790/// }
791///
792/// impl Provider for RedisProvider {
793///     async fn fetch_orchestration_item(&self) -> Option<OrchestrationItem> {
794///         // 1. RPOPLPUSH from "orch_queue" to "orch_processing"
795///         let instance = self.client.rpoplpush("orch_queue", "orch_processing")?;
796///         
797///         // 2. Load history from "history:{instance}:{exec_id}" (sorted set)
798///         let exec_id = self.client.get(&format!("instance:{instance}:exec_id")).unwrap_or(1);
799///         let history = self.client.zrange(&format!("history:{instance}:{exec_id}"), 0, -1);
800///         
801///         // 3. Generate lock token and set expiration
802///         let lock_token = uuid::Uuid::new_v4().to_string();
803///         self.client.setex(&format!("lock:{lock_token}"), self.lock_timeout.as_secs() as usize, &instance);
804///         
805///         Some(OrchestrationItem { instance, history, lock_token, ... })
806///     }
807///     
808///     async fn ack_orchestration_item(..., metadata: ExecutionMetadata) -> Result<(), String> {
809///         let mut pipe = redis::pipe();
810///         pipe.atomic(); // Use Redis transaction
811///         
812///         // Append history (ZADD to sorted set with event_id as score)
813///         for event in history_delta {
814///             pipe.zadd(&format!("history:{instance}:{exec_id}"), event_json, event.event_id());
815///         }
816///         
817///         // Update metadata (HSET)
818///         if let Some(status) = &metadata.status {
819///             pipe.hset(&format!("exec:{instance}:{exec_id}"), "status", status);
820///             pipe.hset(&format!("exec:{instance}:{exec_id}"), "output", &metadata.output);
821///         }
822///         
823///         // Create next execution if needed
824///         if metadata.create_next_execution {
825///             pipe.incr(&format!("instance:{instance}:exec_id"), 1);
826///         }
827///         
828///         // Enqueue worker/orchestrator items
829///         for item in worker_items { pipe.lpush("worker_queue", serialize(item)); }
830///         for item in orch_items {
831///             let visible_at = match &item {
832///                 WorkItem::TimerFired { fire_at_ms, .. } => *fire_at_ms,
833///                 _ => now()
834///             };
835///             pipe.zadd("orch_queue", serialize(item), visible_at);
836///         }
837///         
838///         // Release lock
839///         pipe.del(&format!("lock:{lock_token}"));
840///         pipe.lrem("orch_processing", 1, instance);
841///         
842///         pipe.execute()?;
843///         Ok(())
844///     }
845/// }
846/// ```
847///
848/// # Design Principles
849///
850/// **Providers should be storage abstractions, NOT orchestration engines:**
851/// - ✅ Store and retrieve events as opaque data (don't inspect Event contents)
852/// - ✅ Manage queues and locks (generic queue operations)
853/// - ✅ Provide ACID guarantees where possible
854/// - ❌ DON'T interpret orchestration semantics (use ExecutionMetadata from runtime)
855/// - ❌ DON'T create events (runtime creates all events)
856/// - ❌ DON'T make orchestration decisions (runtime decides control flow)
857///
858/// # Multi-Execution Support (ContinueAsNew)
859///
860/// Orchestration instances can have multiple executions (execution_id 1, 2, 3, ...):
861/// - Each execution has its own event history
862/// - `read()` should return the LATEST execution's history
863/// - Provider tracks current_execution_id to know which execution is active
864/// - When metadata.create_next_execution=true, create a new execution record/entry
865///
866/// **Example:**
867/// ```text
868/// Instance "order-123":
869///   Execution 1: [OrchestrationStarted, ActivityScheduled, OrchestrationContinuedAsNew]
870///   Execution 2: [OrchestrationStarted, ActivityScheduled, OrchestrationCompleted]
871///   
872///   read("order-123") → Returns Execution 2's events (latest)
873///   read_with_execution("order-123", 1) → Returns Execution 1's events
874///   latest_execution_id("order-123") → Returns Some(2)
///   list_executions("order-123") → Returns vec![1, 2]
876/// ```
877///
878/// # Concurrency Model
879///
/// The runtime runs 2 background dispatchers polling your queues:
/// 1. **Orchestration Dispatcher**: Polls fetch_orchestration_item() continuously
/// 2. **Work Dispatcher**: Polls fetch_work_item() continuously
///
/// There is no separate timer dispatcher: TimerFired items are delivered through the
/// orchestrator queue with delayed visibility (`visible_at` set to `fire_at_ms`).
884///
885/// **Your implementation must be thread-safe** and support concurrent access from multiple dispatchers.
886///
887/// # Peek-Lock Pattern (Critical)
888///
889/// All dequeue operations use peek-lock semantics:
890/// 1. **Peek**: Select and lock a message (message stays in queue)
891/// 2. **Process**: Runtime processes the locked message
892/// 3. **Ack**: Delete message from queue (success) OR
893/// 4. **Abandon**: Release lock for retry (failure)
894///
895/// Benefits:
896/// - At-least-once delivery (messages survive crashes)
897/// - Automatic retry on worker failure (lock expires)
898/// - Prevents duplicate processing (locked messages invisible to others)
899///
900/// # Transactional Guarantees
901///
902/// **`ack_orchestration_item()` is the atomic boundary:**
903/// - ALL operations in ack must succeed or fail together
904/// - History append + queue enqueues + lock release = atomic
905/// - If commit fails, entire turn is retried
906/// - This ensures exactly-once semantics for orchestration turns
907///
908/// # Queue Message Flow
909///
910/// **Orchestrator Queue:**
911/// - Inputs: StartOrchestration, ActivityCompleted/Failed, TimerFired, ExternalRaised, SubOrchCompleted/Failed, CancelInstance, ContinueAsNew
912/// - Output: Processed by orchestration dispatcher → ack_orchestration_item
913/// - Batching: All messages for an instance processed together
914/// - Ordering: FIFO per instance preferred
915/// - Timers: TimerFired items are enqueued with `visible_at = fire_at_ms` for delayed visibility
916///
917/// **Worker Queue:**
918/// - Inputs: ActivityExecute (from ack_orchestration_item)
919/// - Output: Processed by work dispatcher → ActivityCompleted/Failed to orch queue
920/// - Batching: One message at a time (activities executed independently)
921/// - Ordering: FIFO preferred but not required
922///
923/// **Note:** There is no separate timer queue. Timers are handled by enqueuing TimerFired items
924/// directly to the orchestrator queue with delayed visibility (`visible_at` set to `fire_at_ms`).
925/// The orchestrator dispatcher processes them when `visible_at <= now()`.
926///
927/// # Instance Metadata Management
928///
929/// Providers typically maintain metadata about instances:
930/// - instance_id (primary key)
931/// - orchestration_name, orchestration_version (from StartOrchestration)
932/// - current_execution_id (for multi-execution support)
933/// - status, output (optional, from ExecutionMetadata)
934/// - created_at, updated_at timestamps
935///
936/// This metadata is updated via:
937/// - `ack_orchestration_item()` with ExecutionMetadata (creates instance and updates status/output)
938/// - `ack_orchestration_item()` with metadata.create_next_execution (increments current_execution_id)
939///
940/// **Note:** Instances are NOT created in `enqueue_for_orchestrator()`. Instance creation happens
941/// when the runtime acknowledges the first turn via `ack_orchestration_item()` with metadata.
942///
943/// # Error Handling Philosophy
944///
945/// - **Transient errors** (busy, timeout): Return error, let runtime retry
946/// - **Invalid tokens**: Return Ok (idempotent - already processed)
947/// - **Corruption** (missing instance, invalid data): Return error
948/// - **Concurrency conflicts**: Handle via locks, return error if deadlock
949///
950/// # Required vs Optional Methods
951///
952/// **REQUIRED** (must implement):
953/// - fetch_orchestration_item, ack_orchestration_item, abandon_orchestration_item
954/// - read, append_with_execution
955/// - enqueue_for_worker, fetch_work_item, ack_work_item
956/// - enqueue_for_orchestrator
957///
958/// **OPTIONAL** (has defaults):
959/// - latest_execution_id, read_with_execution
960/// - list_instances, list_executions
961///
962/// # Testing Your Provider
963///
964/// See `tests/sqlite_provider_validations.rs` for comprehensive provider validation tests:
965/// - Basic enqueue/dequeue operations
966/// - Transactional atomicity
967/// - Instance locking correctness (including multi-threaded tests)
968/// - Lock expiration and redelivery
969/// - Multi-execution support
970/// - Execution status and output persistence
971///
972/// All tests use the Provider trait directly (not runtime), so they're portable to new providers.
973/// See `docs/provider-implementation-guide.md` for detailed implementation guidance including instance locks.
974///
975/// Core provider trait for runtime orchestration operations.
976///
977/// This trait defines the essential methods required for durable orchestration execution.
978/// It focuses on runtime-critical operations: fetching work items, processing history,
979/// and managing queues. Management and observability features are provided through
980/// optional capability traits.
981///
982/// # Capability Discovery
983///
984/// Providers can implement additional capability traits (like `ProviderAdmin`)
985/// to expose richer features. The `Client` automatically discovers these capabilities
986/// through the `as_management_capability()` method.
987///
988/// # Implementation Guide for LLMs
989///
990/// When implementing a new provider, focus on these core methods:
991///
992/// ## Required Methods (9 total)
993///
994/// 1. **Orchestration Processing (3 methods)**
995///    - `fetch_orchestration_item()` - Atomic batch processing
996///    - `ack_orchestration_item()` - Commit processing results
997///    - `abandon_orchestration_item()` - Release locks and retry
998///
999/// 2. **History Access (1 method)**
1000///    - `read()` - Get event history for status checks
1001///
1002/// 3. **Worker Queue (2 methods)**
1003///    - `fetch_work_item()` - Get activity work items
1004///    - `ack_work_item()` - Acknowledge activity completion
1005///
1006/// 4. **Orchestrator Queue (1 method)**
1007///    - `enqueue_for_orchestrator()` - Enqueue control messages (including TimerFired with delayed visibility)
1008///
1009/// ## Optional Management Methods (2 methods)
1010///
1011/// These are included for backward compatibility but will be moved to
1012/// `ProviderAdmin` in future versions:
1013///
1014/// - `list_instances()` - List all instance IDs
1015/// - `list_executions()` - List execution IDs for an instance
1016///
1017/// # Capability Detection
1018///
1019/// Implement `as_management_capability()` to expose management features:
1020///
1021/// ```ignore
1022/// impl Provider for MyProvider {
1023///     // ... implement required methods
1024///     
1025///     fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1026///         Some(self as &dyn ProviderAdmin)
1027///     }
1028/// }
1029///
1030/// impl ProviderAdmin for MyProvider {
1031///     // ... implement management methods
1032/// }
1033/// ```
1034///
1035/// # Testing Your Provider
1036///
1037/// See `tests/sqlite_provider_validations.rs` for comprehensive provider validation tests:
1038/// - Basic enqueue/dequeue operations
1039/// - Transactional atomicity
1040/// - Instance locking correctness (including multi-threaded tests)
1041/// - Lock expiration and redelivery
1042/// - Multi-execution support
1043/// - Execution status and output persistence
1044///
1045/// All tests use the Provider trait directly (not runtime), so they're portable to new providers.
1046/// See `docs/provider-implementation-guide.md` for detailed implementation guidance including instance locks.
1047///
1048#[async_trait::async_trait]
1049#[allow(clippy::too_many_arguments)]
1050pub trait Provider: Any + Send + Sync {
1051    // ===== Provider Identity =====
1052
1053    /// Returns the name of this provider implementation.
1054    ///
1055    /// Used for logging and diagnostics. Override to provide a meaningful name.
1056    ///
1057    /// Default: "unknown"
1058    fn name(&self) -> &str {
1059        "unknown"
1060    }
1061
1062    /// Returns the version of this provider implementation.
1063    ///
1064    /// Used for logging and diagnostics. Override to provide the version.
1065    ///
1066    /// Default: "0.0.0"
1067    fn version(&self) -> &str {
1068        "0.0.0"
1069    }
1070
1071    // ===== Core Atomic Orchestration Methods (REQUIRED) =====
1072    // These three methods form the heart of reliable orchestration execution.
1073    //
1074    // ⚠️ CRITICAL ID GENERATION CONTRACT:
1075    //
1076    // The provider MUST NOT generate execution_id or event_id values.
1077    // All IDs are generated by the runtime and passed to the provider:
1078    //   - execution_id: Passed to ack_orchestration_item()
1079    //   - event_id: Set in each Event in the history_delta
1080    //
1081    // The provider's role is to STORE these IDs, not generate them.
1082
    /// Fetch the next orchestration work item atomically.
    ///
    /// # What This Does
    ///
    /// 1. **Select an instance to process**: Find the next available message in orchestrator queue that is not locked
    /// 2. **Acquire instance-level lock**: Atomically claim the instance lock to prevent concurrent processing
    /// 3. **Lock ALL messages** for that instance (batch processing)
    /// 4. **Load history**: Get complete event history for the current execution
    /// 5. **Load metadata**: Get orchestration_name, version, execution_id
    /// 6. **Return locked batch**: Provider must prevent other processes from touching this instance
    ///
    /// # Parameters
    ///
    /// * `lock_timeout` - Presumably how long the acquired lock stays valid before it expires and
    ///   the messages become available again (see "Peek-Lock Semantics" below).
    ///   NOTE(review): confirm against `docs/provider-implementation-guide.md`.
    /// * `poll_timeout` - Not described in this file. Presumably an upper bound on how long the
    ///   provider may wait for work before returning `Ok(None)` - TODO confirm.
    /// * `filter` - Optional `DispatcherCapabilityFilter`; its semantics are defined on that type.
    ///   Presumably `None` means "no filtering" - TODO confirm.
    ///
    /// # ⚠️ CRITICAL: Instance-Level Locking
    ///
    /// **You MUST acquire an instance-level lock BEFORE fetching messages.** This prevents concurrent
    /// dispatchers from processing the same instance simultaneously, which would cause race conditions
    /// and data corruption.
    ///
    /// **Required Implementation Pattern:**
    /// 1. Find available instance (check no other lock is held for it)
    /// 2. Atomically acquire instance lock (with conflict handling)
    /// 3. Verify lock acquisition succeeded
    /// 4. Only then proceed to lock and fetch messages
    ///
    /// See `docs/provider-implementation-guide.md` for detailed implementation guidance.
    ///
    /// # Peek-Lock Semantics
    ///
    /// - Messages remain in queue until `ack_orchestration_item()` is called
    /// - Generate a unique `lock_token` (e.g., UUID)
    /// - Set `locked_until` timestamp (presumably `now + lock_timeout`; e.g., now + 30 seconds)
    /// - If lock expires before ack, messages become available again (automatic retry)
    ///
    /// # Visibility Filtering
    ///
    /// Only return messages where `visible_at <= now()`:
    /// - Normal messages: visible immediately
    /// - Delayed messages: `visible_at = now + delay` (used for timer backpressure)
    ///
    /// # Instance Batching
    ///
    /// **CRITICAL:** All messages in the batch must belong to the SAME instance.
    ///
    /// Implementation steps:
    /// 1. Find the first visible, unlocked instance in the orchestrator queue
    /// 2. Atomically acquire the instance lock (prevent concurrent processing)
    /// 3. Lock ALL queued messages for that instance
    /// 4. Fetch all locked messages for processing
    ///
    /// # History Loading
    ///
    /// - For new instances (no history yet): return empty Vec
    /// - For existing instances: return ALL events for current execution_id, ordered by event_id
    /// - For multi-execution instances: return ONLY the LATEST execution's history
    ///
    /// SQLite example:
    /// ```text
    /// // Get current execution ID
    /// let exec_id = SELECT current_execution_id FROM instances WHERE instance_id = ?
    ///
    /// // Load history for that execution
    /// SELECT event_data FROM history
    /// WHERE instance_id = ? AND execution_id = ?
    /// ORDER BY event_id
    /// ```
    ///
    /// # Error Handling
    ///
    /// Don't panic on transient errors - return `Err(ProviderError)` (or `Ok(None)` when there is
    /// simply no work) and let the dispatcher retry. Corrupted data or schema should also be
    /// reported as an error, consistent with the trait-level "Error Handling Philosophy".
    ///
    /// # Concurrency
    ///
    /// This method is called continuously by the orchestration dispatcher.
    /// Must be thread-safe and handle concurrent calls gracefully.
    ///
    /// # Example Implementation Pattern
    ///
    /// ```ignore
    /// async fn fetch_orchestration_item(
    ///     &self,
    ///     lock_timeout: Duration,
    ///     poll_timeout: Duration,
    ///     filter: Option<&DispatcherCapabilityFilter>,
    /// ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
    ///     let tx = begin_transaction()?;
    ///
    ///     // Step 1: Find next available instance (check instance_locks)
    ///     let instance_id = SELECT q.instance_id FROM orch_queue q
    ///         LEFT JOIN instance_locks il ON q.instance_id = il.instance_id
    ///         WHERE q.visible_at <= now()
    ///           AND (il.instance_id IS NULL OR il.locked_until <= now())
    ///         ORDER BY q.id LIMIT 1;
    ///
    ///     if instance_id.is_none() { return Ok(None); }
    ///
    ///     // Step 2: Atomically acquire instance lock
    ///     let lock_token = generate_uuid();
    ///     let lock_result = INSERT INTO instance_locks (instance_id, lock_token, locked_until, locked_at)
    ///         VALUES (?, ?, ?, ?)
    ///         ON CONFLICT(instance_id) DO UPDATE
    ///         SET lock_token = excluded.lock_token, locked_until = excluded.locked_until
    ///         WHERE locked_until <= excluded.locked_at;
    ///
    ///     if lock_result.rows_affected == 0 {
    ///         // Lock acquisition failed - another dispatcher has the lock
    ///         return Ok(None);
    ///     }
    ///
    ///     // Step 3: Lock all messages for this instance
    ///     // (lock duration presumably comes from `lock_timeout`; the original sketch used 30s)
    ///     UPDATE orch_queue SET lock_token = ?, locked_until = now() + lock_timeout
    ///         WHERE instance_id = ? AND visible_at <= now();
    ///
    ///     // Step 4: Fetch locked messages
    ///     let messages = SELECT work_item FROM orch_queue WHERE lock_token = ?;
    ///
    ///     // Step 5: Load instance metadata
    ///     let (name, version, exec_id) = SELECT ... FROM instances WHERE instance_id = ?;
    ///
    ///     // Step 6: Load history for current execution
    ///     let history = SELECT event_data FROM history
    ///         WHERE instance_id = ? AND execution_id = ?
    ///         ORDER BY event_id;
    ///
    ///     commit_transaction();
    ///     // NOTE: how `attempt_count` is tracked is not shown in this sketch.
    ///     Ok(Some((OrchestrationItem { instance, orchestration_name, execution_id, version, history, messages }, lock_token, attempt_count)))
    /// }
    /// ```
    ///
    /// # Return Value
    ///
    /// * `Ok(Some((item, lock_token, attempt_count)))` - Work is available and locked
    ///   - `item`: The orchestration item to process
    ///   - `lock_token`: Unique token for ack/abandon operations
    ///   - `attempt_count`: Number of times this item has been fetched (for poison detection)
    /// * `Ok(None)` - No work available (dispatcher will sleep and retry)
    /// * `Err(ProviderError)` - Storage error
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        poll_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError>;
1225
1226    /// Acknowledge successful orchestration processing atomically.
1227    ///
1228    /// This is the most critical method - it commits all changes from an orchestration turn atomically.
1229    ///
1230    /// # What This Does (ALL must be atomic)
1231    ///
1232    /// 1. **Validate lock token**: Verify instance lock is still valid and matches lock_token
1233    /// 2. **Remove instance lock**: Release the instance lock (processing complete)
1234    /// 3. **Append new events** to history for current execution
1235    /// 4. **Update execution metadata** (status, output) using pre-computed metadata
1236    /// 5. **Create new execution** if metadata.create_next_execution=true (ContinueAsNew)
1237    /// 6. **Enqueue worker_items** to worker queue (activity executions)
1238    /// 7. **Enqueue orchestrator_items** to orchestrator queue (completions, new instances, TimerFired with delayed visibility)
1239    /// 8. **Remove acknowledged messages** from orchestrator queue (release lock)
1240    ///
1241    /// # Atomicity Requirements
1242    ///
1243    /// **CRITICAL:** All 8 operations above must succeed or fail together.
1244    ///
1245    /// - **If ANY operation fails**: Roll back all changes
1246    /// - **If all operations succeed**: All changes are durable and visible
1247    ///
1248    /// This prevents:
1249    /// - Duplicate activity execution (history saved but worker item lost)
1250    /// - Lost completions (worker item enqueued but history not saved)
1251    /// - Orphaned locks (messages deleted but history append failed)
1252    /// - Stale instance locks (instance lock not removed)
1253    ///
1254    /// # Parameters
1255    ///
1256    /// * `lock_token` - Token from fetch_orchestration_item() - identifies locked messages
1257    /// * `history_delta` - New events to append (runtime assigns event_ids, provider stores as-is)
1258    /// * `worker_items` - Activity executions to enqueue (WorkItem::ActivityExecute)
1259    /// * `orchestrator_items` - Orchestrator messages to enqueue (completions, starts, TimerFired with delayed visibility)
1260    /// * `metadata` - Pre-computed execution state (DO NOT inspect events yourself!)
1261    ///
1262    /// # Event Storage (history_delta)
1263    ///
1264    /// **Important:** Store events exactly as provided. DO NOT:
1265    /// - Modify event_id (runtime assigns these)
1266    /// - Inspect event contents to make decisions
1267    /// - Filter or reorder events
1268    ///
1269    /// SQLite example:
1270    /// ```text
1271    /// for event in &history_delta {
1272    ///     let event_json = serde_json::to_string(&event)?;
1273    ///     let event_type = extract_discriminant_name(&event); // For indexing only
1274    ///     INSERT INTO history (instance_id, execution_id, event_id, event_data, event_type)
1275    ///     VALUES (?, ?, event.event_id(), event_json, event_type)
1276    /// }
1277    /// ```
1278    ///
1279    /// # Metadata Storage (ExecutionMetadata)
1280    ///
1281    /// **The runtime has ALREADY inspected events and computed metadata.**
1282    /// Provider just stores it:
1283    ///
1284    /// ```text
1285    /// // Update execution status/output from metadata
1286    /// if let Some(status) = &metadata.status {
1287    ///     UPDATE executions
1288    ///     SET status = ?, output = ?, completed_at = CURRENT_TIMESTAMP
1289    ///     WHERE instance_id = ? AND execution_id = ?
1290    /// }
1291    ///
1292    /// // Create next execution if requested
1293    /// if metadata.create_next_execution {
1294    ///     if let Some(next_id) = metadata.next_execution_id {
1295    ///         INSERT INTO executions (instance_id, execution_id, status)
1296    ///         VALUES (?, next_id, 'Running')
1297    ///         
1298    ///         UPDATE instances SET current_execution_id = next_id
1299    ///     }
1300    /// }
1301    /// ```
1302    ///
1303    /// # Queue Item Enqueuing
1304    ///
1305    /// Worker items and orchestrator items must be enqueued within the same transaction:
1306    ///
1307    /// ```text
1308    /// // Worker queue (no special handling)
1309    /// for item in worker_items {
1310    ///     INSERT INTO worker_queue (work_item) VALUES (serde_json::to_string(&item))
1311    /// }
1312    ///
1313    /// // Orchestrator queue (may have delayed visibility for TimerFired)
1314    /// for item in orchestrator_items {
1315    ///     let visible_at = match &item {
1316    ///         WorkItem::TimerFired { fire_at_ms, .. } => *fire_at_ms,  // Delayed visibility
1317    ///         _ => now()  // Immediate visibility
1318    ///     };
1319    ///     
1320    ///     INSERT INTO orchestrator_queue (instance_id, work_item, visible_at)
1321    ///     VALUES (extract_instance(&item), serde_json::to_string(&item), visible_at)
1322    /// }
1323    /// ```
1324    ///
1325    /// # Lock Release
1326    ///
1327    /// Remove all acknowledged messages associated with this lock_token.
1328    ///
1329    /// # Error Handling
1330    ///
1331    /// - Return `Ok(())` if all operations succeeded
1332    /// - Return `Err(msg)` if any operation failed (all changes rolled back)
1333    /// - On error, runtime will call `abandon_orchestration_item()` to release lock
1334    ///
1335    /// # Special Cases
1336    ///
1337    /// **StartOrchestration in orchestrator_items:**
1338    /// - May need to create instance metadata record/entry (idempotent create-if-not-exists)
1339    /// - Should create execution record/entry with ID=1 if new instance
1340    ///
1341    /// **Empty history_delta:**
1342    /// - Valid case (e.g., terminal instance being acked with no changes)
1343    /// - Still process queues and release lock
1344    ///
1345    /// # Parameters
1346    ///
1347    /// * `lock_token` - Token from `fetch_orchestration_item` identifying the batch
1348    /// * `execution_id` - **The execution ID this history belongs to** (runtime decides this)
1349    /// * `history_delta` - Events to append to the specified execution
1350    /// * `worker_items` - Activity work items to enqueue
1351    /// * `orchestrator_items` - Orchestration work items to enqueue (StartOrchestration, ContinueAsNew, TimerFired, etc.)
1352    ///   - TimerFired items should be enqueued with `visible_at = fire_at_ms` for delayed visibility
1353    /// * `metadata` - Pre-computed execution metadata (status, output)
1354    ///
1355    /// # TimerFired Items
1356    ///
1357    /// When `TimerFired` items are present in `orchestrator_items`, they should be enqueued with
1358    /// delayed visibility using the `fire_at_ms` field for the `visible_at` timestamp.
1359    /// This allows timers to fire at the correct logical time.
1360    ///
1361    /// # execution_id Parameter
1362    ///
1363    /// The `execution_id` parameter tells the provider **which execution this history belongs to**.
1364    /// The runtime is responsible for:
1365    /// - Deciding when to create new executions (e.g., for ContinueAsNew)
1366    /// - Managing execution ID sequencing
1367    /// - Ensuring each execution has its own isolated history
1368    ///
1369    /// The provider should:
1370    /// - **Create the execution record if it doesn't exist** (idempotent create-if-not-exists)
1371    /// - Append `history_delta` to the specified `execution_id`
1372    /// - Update the instance's current execution pointer if this execution_id is newer
1373    /// - NOT inspect WorkItems to decide execution IDs
1374    ///
1375    /// # SQLite Implementation Pattern
1376    ///
1377    /// ```text
1378    /// async fn ack_orchestration_item(execution_id, ...) -> Result<(), String> {
1379    ///     let tx = begin_transaction()?;
1380    ///     
1381    ///     // Step 1: Validate lock token and get instance_id from instance_locks
1382    ///     let instance_id = SELECT instance_id FROM instance_locks
1383    ///         WHERE lock_token = ? AND locked_until > now();
1384    ///     if instance_id.is_none() {
1385    ///         return Err("Invalid or expired lock token");
1386    ///     }
1387    ///     
1388    ///     // Step 2: Remove instance lock (processing complete)
1389    ///     DELETE FROM instance_locks WHERE instance_id = ? AND lock_token = ?;
1390    ///     
1391    ///     // Step 3: Create execution record if it doesn't exist (idempotent)
1392    ///     INSERT OR IGNORE INTO executions (instance_id, execution_id, status)
1393    ///     VALUES (?, ?, 'Running');
1394    ///     
1395    ///     // Step 4: Append history to the SPECIFIED execution_id
1396    ///     for event in history_delta {
1397    ///         INSERT INTO history (...) VALUES (instance_id, execution_id, event.event_id(), ...)
1398    ///     }
1399    ///     
1400    ///     // Step 5: Update execution metadata (no event inspection!)
1401    ///     if let Some(status) = &metadata.status {
1402    ///         UPDATE executions SET status = ?, output = ?, completed_at = NOW()
1403    ///         WHERE instance_id = ? AND execution_id = ?
1404    ///     }
1405    ///     
1406    ///     // Step 6: Update current_execution_id if this is a newer execution
1407    ///     UPDATE instances SET current_execution_id = MAX(current_execution_id, ?)  -- SQLite scalar MAX (no GREATEST)
1408    ///     WHERE instance_id = ?;
1409    ///     
1410    ///     // Step 7: Enqueue worker/orchestrator items
1411    ///     // Worker items go to worker queue (populate identity columns)
1412    ///     for item in worker_items {
1413    ///         if let WorkItem::ActivityExecute { instance, execution_id, id, .. } = &item {
1414    ///             INSERT INTO worker_queue (work_item, instance_id, execution_id, activity_id)
1415    ///             VALUES (serialize(item), instance, execution_id, id)
1416    ///         }
1417    ///     }
1418    ///     
1419    ///     // Orchestrator items go to orchestrator queue (TimerFired uses fire_at_ms for visible_at)
1420    ///     for item in orchestrator_items {
1421    ///         let visible_at = match &item {
1422    ///             WorkItem::TimerFired { fire_at_ms, .. } => *fire_at_ms,
1423    ///             _ => now()
1424    ///         };
1425    ///         INSERT INTO orchestrator_queue (instance_id, work_item, visible_at)
1426    ///         VALUES (extract_instance(&item), serialize(item), visible_at)
1427    ///     }
1428    ///     
1429    ///     // Step 8: Delete cancelled activities from worker queue
1430    ///     DELETE FROM worker_queue
1431    ///     WHERE (instance_id = ? AND execution_id = ? AND activity_id = ?)
1432    ///        OR (instance_id = ? AND execution_id = ? AND activity_id = ?)
1433    ///        ...
1434    ///     
1435    ///     // Step 9: Delete the acknowledged (locked) messages: DELETE FROM orchestrator_queue WHERE lock_token = ?;
1436    ///     
1437    ///     commit_transaction()?;
1438    ///     Ok(())
1439    /// }
1440    /// ```
1441    async fn ack_orchestration_item(
1442        &self,
1443        _lock_token: &str,
1444        _execution_id: u64,
1445        _history_delta: Vec<Event>,
1446        _worker_items: Vec<WorkItem>,
1447        _orchestrator_items: Vec<WorkItem>,
1448        _metadata: ExecutionMetadata,
1449        _cancelled_activities: Vec<ScheduledActivityIdentifier>,
1450    ) -> Result<(), ProviderError>;
1451
1452    /// Abandon orchestration processing (used for errors/retries).
1453    ///
1454    /// Called when orchestration processing fails (e.g., storage contention, runtime crash).
1455    /// The messages must be made available for reprocessing.
1456    ///
1457    /// # What This Does
1458    ///
1459    /// 1. **Clear lock_token** from messages (make available again)
1460    /// 2. **Remove instance lock** (release instance-level lock)
1461    /// 3. **Optionally delay** retry by setting visibility timestamp
1462    /// 4. **Preserve message order** (don't reorder or modify messages)
1463    ///
1464    /// # Parameters
1465    ///
1466    /// * `lock_token` - Token from `fetch_orchestration_item()`
1467    /// * `delay` - Optional delay before messages become visible again
1468    ///   - `None`: immediate retry (visible_at = now)
1469    ///   - `Some(duration)`: delayed retry (visible_at = now + duration)
1470    ///
1471    /// # Implementation Pattern
1472    ///
1473    /// ```ignore
1474    /// async fn abandon_orchestration_item(&self, lock_token: &str, delay: Option<Duration>, ignore_attempt: bool) -> Result<(), ProviderError> {
1475    ///     let visible_at = if let Some(delay) = delay {
1476    ///         now() + delay
1477    ///     } else {
1478    ///         now()
1479    ///     };
1480    ///
1481    ///     BEGIN TRANSACTION
1482    ///         // Get instance_id from instance_locks
1483    ///         let instance_id = SELECT instance_id FROM instance_locks WHERE lock_token = ?;
1484    ///
1485    ///         IF instance_id IS NULL:
1486    ///             // Invalid lock token - return error
1487    ///             ROLLBACK
1488    ///             RETURN Err(ProviderError::permanent("Invalid lock token"))
1489    ///
1490    ///         // Clear lock_token from messages (if ignore_attempt, also decrement their attempt count, never below 0)
1491    ///         UPDATE orchestrator_queue
1492    ///         SET lock_token = NULL, locked_until = NULL, visible_at = ?
1493    ///         WHERE lock_token = ?;
1494    ///
1495    ///         // Remove instance lock
1496    ///         DELETE FROM instance_locks WHERE lock_token = ?;
1497    ///     COMMIT
1498    ///
1499    ///     Ok(())
1500    /// }
1501    /// ```
1502    ///
1503    /// # Use Cases
1504    ///
1505    /// - Storage contention → delay = Some(Duration::from_millis(50)) for backoff
1506    /// - Orchestration turn failed → delay = None for immediate retry
1507    /// - Runtime shutdown during processing → messages auto-recover when lock expires
1508    ///
1509    /// # Error Handling
1510    ///
1511    /// **Invalid lock tokens MUST return an error.** Unlike `ack_orchestration_item()`, this method
1512    /// should not be idempotent for invalid tokens. Invalid tokens indicate a programming error or
1513    /// state corruption and should be surfaced as errors.
1514    ///
1515    /// Return a [`ProviderError`] (e.g. "Invalid lock token") if the lock token is not recognized or has expired.
1516    ///
1517    /// # Attempt Counting
1518    ///
1519    /// * `ignore_attempt` - If true, decrement the attempt count (never below 0).
1520    ///   The `lock_token` and `delay` parameters are described under
1521    ///   "Parameters" above.
1522    async fn abandon_orchestration_item(
1523        &self,
1524        _lock_token: &str,
1525        _delay: Option<Duration>,
1526        _ignore_attempt: bool,
1527    ) -> Result<(), ProviderError>;
1528
1529    /// Read the full history for the latest execution of an instance.
1530    ///
1531    /// # What This Does
1532    ///
1533    /// Returns ALL events for the LATEST execution of an instance, ordered by `event_id`.
1534    ///
1535    /// # Multi-Execution Behavior
1536    ///
1537    /// For instances with multiple executions (from ContinueAsNew):
1538    /// - Find the latest execution, i.e. `MAX(execution_id)`
1539    /// - Return events for ONLY that execution
1540    /// - DO NOT return events from earlier executions
1541    ///
1542    /// # Return Value
1543    ///
1544    /// - `Ok(Vec<Event>)` - Events ordered by `event_id` ascending (event_id 1, 2, 3, ...)
1545    /// - `Err(ProviderError)` - Storage error
1546    ///
1547    /// # Usage
1548    ///
1549    /// **NOT called in runtime hot path.** Used by:
1550    /// - `Client.get_orchestration_status()` - to determine current state
1551    /// - `Runtime.get_orchestration_descriptor()` - to get orchestration metadata
1552    /// - Testing and validation suites
1553    ///
1554    /// The runtime hot path uses `fetch_orchestration_item()` which loads history internally.
1555    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError>;
1556
1557    // ===== History Reading and Testing Methods (NOT used by runtime hot path) =====
1558    // These methods are NOT called by the main runtime during orchestration execution.
1559    // They are used by testing, validation suites, client APIs, and debugging tools.
1560
1561    /// Read the full event history for a specific execution within an instance.
1562    ///
1563    /// **NOT called in runtime hot path.** Used by testing, validation suites, and debugging tools.
1564    ///
1565    /// # Parameters
1566    ///
1567    /// * `instance` - The ID of the orchestration instance.
1568    /// * `execution_id` - The specific execution ID to read history for.
1569    ///
1570    /// # Returns
1571    ///
1572    /// Vector of events ordered by `event_id` ascending, i.e. chronological order (oldest first).
1573    ///
1574    /// # Implementation Example
1575    ///
1576    /// ```ignore
1577    /// async fn read_with_execution(&self, instance: &str, execution_id: u64) -> Result<Vec<Event>, ProviderError> {
1578    ///     SELECT event_data FROM history
1579    ///     WHERE instance_id = ? AND execution_id = ?
1580    ///     ORDER BY event_id
1581    /// }
1582    /// ```
1583    async fn read_with_execution(&self, _instance: &str, _execution_id: u64) -> Result<Vec<Event>, ProviderError>;
1584
1585    /// Append events to a specific execution.
1586    ///
1587    /// **NOT called in runtime hot path.** Used by testing and validation suites for fault injection
1588    /// and test setup. The runtime uses `ack_orchestration_item()` to append history atomically.
1589    ///
1590    /// # What This Does
1591    ///
1592    /// Add new events to the history log for a specific execution.
1593    ///
1594    /// # CRITICAL: Event ID Assignment
1595    ///
1596    /// **The runtime assigns event_ids BEFORE calling this method.**
1597    /// - DO NOT modify `event.event_id()`
1598    /// - DO NOT renumber events
1599    /// - Store events exactly as provided
1600    ///
1601    /// # Duplicate Detection
1602    ///
1603    /// Events with a duplicate `(instance_id, execution_id, event_id)` key should:
1604    /// - Either: Be rejected with an error (let the runtime handle it)
1605    /// - Or: Be IGNORED (idempotent append)
1606    /// - NEVER: Overwrite the existing event (corrupts history)
1607    ///
1608    /// # Parameters
1609    ///
1610    /// * `instance` - The ID of the orchestration instance.
1611    /// * `execution_id` - The specific execution ID to append events to.
1612    /// * `new_events` - Vector of events to append (event_ids already assigned).
1613    ///
1614    /// # Returns
1615    ///
1616    /// `Ok(())` on success, `Err(ProviderError)` on failure.
1617    ///
1618    /// # Implementation Example
1619    ///
1620    /// ```ignore
1621    /// async fn append_with_execution(
1622    ///     &self,
1623    ///     instance: &str,
1624    ///     execution_id: u64,
1625    ///     new_events: Vec<Event>,
1626    /// ) -> Result<(), ProviderError> {
1627    ///     for event in &new_events {
1628    ///         INSERT INTO history (instance_id, execution_id, event_id, event_data)
1629    ///         VALUES (?, ?, event.event_id(), serialize(event))
1630    ///     }
1631    ///     Ok(())
1632    /// }
1633    /// ```
1634    async fn append_with_execution(
1635        &self,
1636        _instance: &str,
1637        _execution_id: u64,
1638        _new_events: Vec<Event>,
1639    ) -> Result<(), ProviderError>;
1640
1641    // ===== Worker Queue Operations (REQUIRED) =====
1642    // Worker queue processes activity executions.
1643
1644    /// Enqueue an activity execution request.
1645    ///
1646    /// # What This Does
1647    ///
1648    /// Add a `WorkItem::ActivityExecute` to the worker queue for background processing.
1649    ///
1650    /// # Implementation
1651    ///
1652    /// ```ignore
1653    /// async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
1654    ///     INSERT INTO worker_queue (work_item)
1655    ///     VALUES (serde_json::to_string(&item)?)
1656    /// }
1657    /// ```
1658    ///
1659    /// # Locking
1660    ///
1661    /// New messages should have `lock_token = NULL` (available for dequeue).
1662    ///
1663    /// # Ordering
1664    ///
1665    /// FIFO order preferred but not strictly required.
1666    async fn enqueue_for_worker(&self, _item: WorkItem) -> Result<(), ProviderError>;
1667
1668    /// Dequeue a single work item with peek-lock semantics.
1669    ///
1670    /// # What This Does
1671    ///
1672    /// Fetch next available worker task.
1673    ///
1674    /// 1. Find an available task in the worker queue (visible_at <= now, not locked)
1675    /// 2. Lock it with a unique token
1676    /// 3. Return item + token (item stays in queue until ack)
1677    ///
1678    /// # Parameters
1679    ///
1680    /// - `lock_timeout`: Duration to lock the item for processing.
1681    /// - `poll_timeout`: Maximum time to wait for work. Provider MAY wait up to this
1682    ///   duration if it supports long polling, or return immediately if it doesn't.
1683    /// - `session`: Session routing configuration.
1684    ///   - `Some(config)`: Session-aware fetch — returns non-session + owned + claimable session items.
1685    ///   - `None`: Non-session only — returns only items with `session_id IS NULL`.
1686    ///
1687    /// # Return Value
1688    ///
1689    /// - `Ok(Some((WorkItem, String, u32)))` - Item is locked and ready to process
1690    ///   - WorkItem: The work item to process
1691    ///   - String: Lock token for ack/abandon
1692    ///   - u32: Attempt count (number of times this item has been fetched)
1693    /// - `Ok(None)` - No work available
1694    /// - `Err(ProviderError)` - Storage error (allows runtime to distinguish empty queue from failures)
1695    ///
1696    /// # Concurrency
1697    ///
1698    /// Called continuously by work dispatcher. Must prevent double-dequeue.
1699    ///
1700    /// # Session Routing
1701    ///
1702    /// When `session` is `Some(config)`, the provider must filter eligible items:
1703    /// - Non-session items (`session_id IS NULL`): always eligible
1704    /// - Owned-session items: items whose `session_id` has an active session owned by `config.owner_id`
1705    /// - Claimable-session items: items whose `session_id` has no active session (unowned)
1706    ///
1707    /// When fetching a claimable session-bound item, atomically create or update the session
1708    /// record with `config.owner_id` and `config.lock_timeout`.
1709    ///
1710    /// When `session` is `None`, only non-session items (`session_id IS NULL`) are returned.
1711    /// Session-bound items are skipped entirely.
1712    ///
1713    /// # Tag Filtering
1714    /// The `tag_filter` parameter ([`TagFilter`]) controls which activities this worker processes:
1715    /// - `DefaultOnly`: Only items with `tag IS NULL`
1716    /// - `Tags(set)`: Only items whose `tag` is in `set`
1717    /// - `DefaultAnd(set)`: Items with `tag IS NULL` OR `tag` in `set`
1718    /// - `Any`: All items regardless of tag
1719    /// - `None`: No items (orchestrator-only mode)
1720    async fn fetch_work_item(
1721        &self,
1722        lock_timeout: Duration,
1723        poll_timeout: Duration,
1724        session: Option<&SessionFetchConfig>,
1725        tag_filter: &TagFilter,
1726    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError>;
1727
1728    /// Acknowledge successful processing of a work item.
1729    ///
1730    /// # What This Does
1731    ///
1732    /// Atomically acknowledge worker item and optionally enqueue completion to orchestrator queue.
1733    ///
1734    /// # Purpose
1735    ///
1736    /// Ensures completion delivery and worker ack happen atomically.
1737    /// Prevents duplicate work if enqueue succeeds but ack fails (the item would be re-fetched).
1738    /// Prevents lost completions if ack succeeds but enqueue fails (the result would never be delivered).
1739    ///
1740    /// # Parameters
1741    ///
1742    /// * `token` - Lock token from `fetch_work_item`
1743    /// * `completion` - Optional completion work item:
1744    ///   - `Some(WorkItem)`: Remove worker item AND enqueue completion to orchestrator queue
1745    ///   - `None`: Remove worker item WITHOUT enqueueing anything (used when activity was cancelled)
1746    ///
1747    /// # Error on Missing Entry
1748    ///
1749    /// **CRITICAL**: This method MUST return a non-retryable error if the locked worker work-item
1750    /// entry does not exist anymore (was already removed) or cannot be acknowledged as expected.
1751    /// This can happen when:
1752    /// - The activity was cancelled (the backing entry was removed/invalidated by orchestration)
1753    /// - The lock was stolen by another worker (lock expired and work-item was re-fetched/completed)
1754    /// - The lock expired (worker took too long without renewing)
1755    ///
1756    /// The worker dispatcher uses this error to detect cancellation races and log appropriately.
1757    ///
1758    /// # Implementation
1759    ///
1760    /// ```ignore
1761    /// async fn ack_work_item(&self, token: &str, completion: Option<WorkItem>) -> Result<(), ProviderError> {
1762    ///     BEGIN TRANSACTION
1763    ///         // Relational/SQL example:
1764    ///         let result = DELETE FROM worker_queue WHERE lock_token = ?token AND locked_until > now();
1765    ///
1766    ///         // CRITICAL: Check if the locked entry existed and lock was still valid
1767    ///         if result.rows_affected == 0 {
1768    ///             ROLLBACK
1769    ///             return Err(ProviderError::permanent("Work item not found - cancelled, stolen, or lock expired"))
1770    ///         }
1771    ///
1772    ///         if let Some(completion) = completion {
1773    ///             INSERT INTO orchestrator_queue (instance_id, work_item, visible_at)
1774    ///             VALUES (completion.instance, serialize(completion), now())
1775    ///         }
1776    ///     COMMIT
1777    /// }
1778    /// ```
1779    async fn ack_work_item(&self, token: &str, completion: Option<WorkItem>) -> Result<(), ProviderError>;
1780
1781    /// Renew the lock on a worker queue item.
1782    ///
1783    /// # What This Does
1784    ///
1785    /// Extends the lock timeout for an in-progress activity execution, preventing
1786    /// the lock from expiring while the activity is still being processed.
1787    ///
1788    /// # Purpose
1789    ///
1790    /// Enables long-running activities to complete without lock timeout:
1791    /// - Worker fetches activity with initial lock timeout (e.g., 30s)
1792    /// - Background renewal task periodically extends the lock
1793    /// - If worker crashes, renewal stops and lock expires naturally
1794    /// - Another worker can then pick up the abandoned activity
1795    ///
1796    /// # Activity Cancellation
1797    ///
1798    /// This method also serves as the cancellation detection mechanism. When the
1799    /// orchestration runtime decides an activity should be cancelled, it removes or
1800    /// invalidates the backing entry for that locked work-item (e.g., by deleting the
1801    /// corresponding worker-queue row). The next renewal attempt will fail, signaling to the
1802    /// worker that the activity should be cancelled.
1803    ///
1804    /// # Parameters
1805    ///
1806    /// * `token` - Lock token from `fetch_work_item`
1807    /// * `extend_for` - [`Duration`] to extend lock from now (typically matches `RuntimeOptions::worker_lock_timeout`)
1808    ///
1809    /// # Returns
1810    ///
1811    /// * `Ok(())` - Lock renewed successfully
1812    /// * `Err(ProviderError)` - Lock renewal failed:
1813    ///   - Permanent error: Token invalid, expired, entry removed (cancelled), or already acked
1814    ///   - Retryable error: Storage connection issues
1815    ///
1816    /// # Implementation Pattern
1817    ///
1818    /// ```ignore
1819    /// async fn renew_work_item_lock(&self, token: &str, extend_for: Duration) -> Result<(), ProviderError> {
1820    ///     // Relational/SQL example:
1821    ///     let result = UPDATE worker_queue
1822    ///                  SET locked_until = now() + extend_for
1823    ///                  WHERE lock_token = ?token AND locked_until > now();
1824    ///
1825    ///     if result.rows_affected == 0:
1826    ///         // Entry doesn't exist (cancelled), lock expired, or invalid token
1827    ///         return Err(ProviderError::permanent("Lock token invalid, expired, or entry removed"))
1828    ///
1829    ///     Ok(())
1830    /// }
1831    /// ```
1832    ///
1833    /// # Concurrency
1834    ///
1835    /// - Safe to call while activity is executing
1836    /// - Idempotent: Multiple renewals with same token are safe
1837    /// - Renewal fails gracefully after `ack_work_item` (token deleted)
1838    ///
1839    /// # Usage in Runtime
1840    ///
1841    /// Called by worker dispatcher's activity manager task:
1842    /// - Renewal interval calculated based on lock timeout
1843    /// - On error: triggers cancellation token (entry may have been removed/invalidated for cancellation)
1844    /// - Stops automatically when activity completes or worker crashes
1845    async fn renew_work_item_lock(&self, token: &str, extend_for: Duration) -> Result<(), ProviderError>;
1846
1847    /// Heartbeat all non-idle sessions owned by the given workers.
1848    ///
1849    /// Extends `locked_until` (by `extend_for`) for sessions where:
1850    /// - `owner_id` matches any of the provided `owner_ids`
1851    /// - `locked_until > now` (still owned)
1852    /// - `last_activity_at + idle_timeout > now` (not idle)
1853    ///
1854    /// Sessions that are idle (no recent activity flow) are NOT renewed,
1855    /// causing their locks to naturally expire.
1856    ///
1857    /// Accepts a slice of owner IDs so the provider can batch the operation
1858    /// into a single storage call.
1859    ///
1860    /// # Returns
1861    ///
1862    /// `Ok(count)` with the total number of sessions renewed across all owners.
1863    async fn renew_session_lock(
1864        &self,
1865        owner_ids: &[&str],
1866        extend_for: Duration,
1867        idle_timeout: Duration,
1868    ) -> Result<usize, ProviderError>;
1869
1870    /// Sweep orphaned session entries.
1871    ///
1872    /// This is a convenience hook so providers don't need background threads for
1873    /// a common cleanup scenario. The runtime calls it periodically on behalf of
1874    /// all workers.
1875    ///
1876    /// Removes sessions where:
1877    /// - `locked_until < now` (lock expired)
1878    /// - No pending work items reference this session
1879    ///
1880    /// Any worker can sweep any worker's orphans. (NOTE(review): the role of `idle_timeout` is undocumented — confirm.)
1881    ///
1882    /// Providers that already clean up expired/ownerless sessions internally
1883    /// (e.g., via TTL, background sweeps, or eager eviction on fetch) may
1884    /// return `Ok(0)` here.
1885    ///
1886    /// # Returns
1887    ///
1888    /// Count of sessions removed (0 is valid if no orphans exist or the
1889    /// provider handles cleanup through other means).
1890    async fn cleanup_orphaned_sessions(&self, idle_timeout: Duration) -> Result<usize, ProviderError>;
1891
1892    /// Abandon work item processing (release lock without completing).
1893    ///
1894    /// Called when activity processing fails with a retryable error (e.g., storage contention)
1895    /// and the work item should be made available for another worker to pick up.
1896    ///
1897    /// # What This Does
1898    ///
1899    /// 1. **Clear lock** - Remove lock_token and reset locked_until
1900    /// 2. **Optionally delay** - Set visible_at to defer retry
1901    /// 3. **Preserve message** - Don't delete or modify the work item content
1902    ///
1903    /// # Parameters
1904    ///
1905    /// * `token` - Lock token from `fetch_work_item`
1906    /// * `delay` - Optional delay before message becomes visible again
1907    ///   - `None`: Immediate visibility (visible_at = now)
1908    ///   - `Some(duration)`: Delayed visibility (visible_at = now + duration)
1909    ///
1910    /// # Returns
1911    ///
1912    /// * `Ok(())` - Work item abandoned, lock released
1913    /// * `Err(ProviderError)` - Abandon failed (invalid token, already acked, etc.)
1914    ///
1915    /// # Implementation Pattern
1916    ///
1917    /// ```ignore
1918    /// async fn abandon_work_item(&self, token: &str, delay: Option<Duration>, ignore_attempt: bool) -> Result<(), ProviderError> {
1919    ///     let visible_at = delay.map(|d| now() + d).unwrap_or(now());
1920    ///     // If ignore_attempt is true, the UPDATE below should also decrement the attempt count (never below 0).
1921    ///     let result = UPDATE worker_queue
1922    ///                  SET lock_token = NULL, locked_until = NULL, visible_at = ?visible_at
1923    ///                  WHERE lock_token = ?token;
1924    ///
1925    ///     if result.rows_affected == 0:
1926    ///         return Err(ProviderError::permanent("Invalid lock token"))
1927    ///
1928    ///     Ok(())
1929    /// }
1930    /// ```
1931    ///
1932    /// # Attempt Counting
1933    ///
1934    /// * `ignore_attempt` - If true, decrement the attempt count (never below 0).
1935    ///   The `token` and `delay` parameters are described under
1936    ///   "Parameters" above.
1937    async fn abandon_work_item(
1938        &self,
1939        token: &str,
1940        delay: Option<Duration>,
1941        ignore_attempt: bool,
1942    ) -> Result<(), ProviderError>;
1943
1944    /// Renew the lock on an orchestration item.
1945    ///
1946    /// Extends the instance lock timeout, preventing lock expiration during
1947    /// long-running orchestration turns.
1948    ///
1949    /// # Parameters
1950    ///
1951    /// * `token` - Lock token from `fetch_orchestration_item`
1952    /// * `extend_for` - [`Duration`] to extend the lock by, measured from now
1953    ///
1954    /// # Returns
1955    ///
1956    /// * `Ok(())` - Lock renewed successfully
1957    /// * `Err(ProviderError)` - Lock renewal failed (invalid token, expired, etc.)
1958    ///
1959    /// # Implementation Pattern
1960    ///
1961    /// ```ignore
1962    /// async fn renew_orchestration_item_lock(&self, token: &str, extend_for: Duration) -> Result<(), ProviderError> {
1963    ///     let locked_until = now() + extend_for;
1964    ///
1965    ///     let result = UPDATE instance_locks
1966    ///                  SET locked_until = ?locked_until
1967    ///                  WHERE lock_token = ?token
1968    ///                    AND locked_until > now();
1969    ///
1970    ///     if result.rows_affected == 0:
1971    ///         return Err(ProviderError::permanent("Lock token invalid or expired"))
1972    ///
1973    ///     Ok(())
1974    /// }
1975    /// ```
1976    async fn renew_orchestration_item_lock(&self, token: &str, extend_for: Duration) -> Result<(), ProviderError>;
1977
1978    // ===== Optional Management APIs =====
1979    // Primarily used for testing/debugging. Note: `enqueue_for_orchestrator` below has no default implementation (required).
1980
1981    /// Enqueue a work item to the orchestrator queue.
1982    ///
1983    /// # Purpose
1984    ///
1985    /// Used by runtime for:
1986    /// - External events: `Client.raise_event()` → enqueues `WorkItem::ExternalRaised`
1987    /// - Cancellation: `Client.cancel_instance()` → enqueues `WorkItem::CancelInstance`
1988    /// - Testing: Direct queue manipulation
1989    ///
1990    /// **Note:** In normal operation, orchestrator items are enqueued via `ack_orchestration_item`.
1991    ///
1992    /// # Parameters
1993    ///
1994    /// * `item` - WorkItem to enqueue (usually ExternalRaised or CancelInstance)
1995    /// * `delay` - Optional visibility delay
1996    ///   - `None`: Immediate visibility (common case)
1997    ///   - `Some(duration)`: Delay visibility by the specified duration
1998    ///   - Providers without delayed_visibility support can ignore this (treat as None)
1999    ///
2000    /// # Implementation Pattern
2001    ///
2002    /// ```ignore
2003    /// async fn enqueue_for_orchestrator(&self, item: WorkItem, delay: Option<Duration>) -> Result<(), ProviderError> {
2004    ///     let instance = extract_instance(&item);  // See WorkItem docs for extraction
2005    ///     let work_json = serde_json::to_string(&item)?;
2006    ///
2007    ///     let visible_at = if let Some(delay) = delay {
2008    ///         now() + delay
2009    ///     } else {
2010    ///         now()
2011    ///     };
2012    ///
2013    ///     // ⚠️ DO NOT create instance here - runtime will create it via ack_orchestration_item metadata
2014    ///     // Instance creation happens when runtime acknowledges the first turn with ExecutionMetadata
2015    ///
2016    ///     INSERT INTO orchestrator_queue (instance_id, work_item, visible_at, lock_token, locked_until)
2017    ///     VALUES (instance, work_json, visible_at, NULL, NULL);
2018    ///
2019    ///     Ok(())
2020    /// }
2021    /// ```
2022    ///
2023    /// # Special Cases
2024    ///
2025    /// **StartOrchestration:**
2026    /// - ⚠️ DO NOT create instance here - instance creation happens via `ack_orchestration_item()` metadata
2027    /// - Just enqueue the work item; runtime will create instance when acknowledging the first turn
2028    ///
2029    /// **ExternalRaised:**
2030    /// - Can arrive before instance is started (race condition)
2031    /// - Should still enqueue - runtime will handle gracefully
2032    ///
2033    /// # Error Handling
2034    ///
2035    /// Return `Err(ProviderError)` if storage fails. Return `Ok(())` if the item was enqueued successfully.
2036    async fn enqueue_for_orchestrator(&self, _item: WorkItem, _delay: Option<Duration>) -> Result<(), ProviderError>;
2037
2038    // Design notes (NOTE(review): appear partly superseded — `fetch_work_item` now takes timeouts and `renew_*_lock` methods exist):
2039    // - Add timeout parameter to fetch_work_item and dequeue_timer_peek_lock
2040    // - Add refresh_worker_lock(token, extend_ms) and refresh_timer_lock(token, extend_ms)
2041    // - Provider should auto-abandon messages if lock expires without ack; this would enable graceful handling of worker crashes and long-running activities
2042
2043    // ===== Capability Discovery =====
2044
2045    /// Expose this provider's management capabilities ([`ProviderAdmin`]), if it has any.
2046    ///
2047    /// # Purpose
2048    ///
2049    /// This method enables automatic capability discovery by the `Client`.
2050    /// When a provider implements `ProviderAdmin`, it should return
2051    /// `Some(self as &dyn ProviderAdmin)` to expose management features.
2052    ///
2053    /// # Default Implementation
2054    ///
2055    /// Returns `None` (no management capabilities). Override to expose capabilities:
2056    ///
2057    /// ```ignore
2058    /// impl Provider for MyProvider {
2059    ///     fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
2060    ///         Some(self as &dyn ProviderAdmin)
2061    ///     }
2062    /// }
2063    /// ```
2064    ///
2065    /// # Usage
2066    ///
2067    /// The `Client` automatically discovers capabilities:
2068    ///
2069    /// ```ignore
2070    /// let client = Client::new(provider);
2071    /// if client.has_management_capability() {
2072    ///     let instances = client.list_all_instances().await?;
2073    /// }
2074    /// ```
2075    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
2076        None
2077    }
2078
2079    /// Lightweight check for custom status changes.
2080    ///
2081    /// Returns `Some((custom_status, version))` if `custom_status_version > last_seen_version`,
2082    /// `None` if unchanged. Used by `Client::wait_for_status_change()` polling loop.
2083    ///
2084    /// # Parameters
2085    ///
2086    /// * `instance` - Instance ID to check
2087    /// * `last_seen_version` - The version the caller last observed
2088    ///
2089    /// # Returns
2090    ///
2091    /// * `Ok(Some((custom_status, version)))` - Status has changed (`custom_status` is an `Option<String>`)
2092    /// * `Ok(None)` - No change since `last_seen_version`
2093    /// * `Err(ProviderError)` - Storage error
2094    async fn get_custom_status(
2095        &self,
2096        _instance: &str,
2097        _last_seen_version: u64,
2098    ) -> Result<Option<(Option<String>, u64)>, ProviderError>;
2099}
2100
2101/// Management and observability provider interface.
2102pub mod management;
2103
2104pub mod instrumented;
2105
2106/// SQLite-backed provider with full transactional support.
2107///
2108/// Enable with the `sqlite` feature:
2109/// ```toml
2110/// duroxide = { version = "0.1", features = ["sqlite"] }
2111/// ```
2112#[cfg(feature = "sqlite")]
2113pub mod sqlite;
2114
2115// Re-export management types for convenience
2116pub use management::{
2117    DeleteInstanceResult, ExecutionInfo, InstanceFilter, InstanceInfo, InstanceTree, ManagementProvider, PruneOptions,
2118    PruneResult, QueueDepths, SystemMetrics,
2119};
2120
2121/// Administrative capability trait for observability and management operations.
2122///
2123/// This trait provides rich management and observability features that extend
2124/// the core `Provider` functionality. Providers can implement this trait to
2125/// expose administrative capabilities to the `Client`.
2126///
2127/// # Automatic Discovery
2128///
2129/// The `Client` automatically discovers this capability through the
2130/// `Provider::as_management_capability()` method. When available, management
2131/// methods become accessible through the client.
2132///
2133/// # Implementation Guide for LLMs
2134///
2135/// When implementing a new provider, you can optionally implement this trait
2136/// to expose management features:
2137///
2138/// ```ignore
2139/// impl Provider for MyProvider {
2140///     // ... implement required Provider methods
2141///     
2142///     fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
2143///         Some(self as &dyn ProviderAdmin)
2144///     }
2145/// }
2146///
2147/// impl ProviderAdmin for MyProvider {
2148///     async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
2149///         // Query your storage for all instance IDs
2150///         Ok(vec!["instance-1".to_string(), "instance-2".to_string()])
2151///     }
2152///     
2153///     async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
2154///         // Query instance metadata from your storage
2155///         Ok(InstanceInfo {
2156///             instance_id: instance.to_string(),
2157///             orchestration_name: "ProcessOrder".to_string(),
2158///             orchestration_version: "1.0.0".to_string(),
2159///             current_execution_id: 1,
2160///             status: "Running".to_string(),
2161///             output: None,
2162///             created_at: 1234567890,
2163///             updated_at: 1234567890,
2164///         })
2165///     }
2166///     
2167///     // ... implement other management methods
2168/// }
2169/// ```
2170///
2171/// # Required Methods (16 total)
2172///
2173/// 1. **Instance Discovery (2 methods)**
2174///    - `list_instances()` - List all instance IDs
2175///    - `list_instances_by_status()` - Filter instances by status
2176/// 2. **Execution Inspection (4 methods)**
2177///    - `list_executions()` - List execution IDs for an instance
2178///    - `read_history_with_execution_id()` - Read history for a specific execution
2179///    - `read_history()` - Read history for the latest execution
2180///    - `latest_execution_id()` - Get the latest execution ID
2181/// 3. **Metadata Access (2 methods)**
2182///    - `get_instance_info()` - Get comprehensive instance information
2183///    - `get_execution_info()` - Get detailed execution information
2184/// 4. **System Metrics (2 methods)**
2185///    - `get_system_metrics()` - Get system-wide metrics
2186///    - `get_queue_depths()` - Get current queue depths
2187/// 5. **Hierarchy, Deletion and Pruning (6 methods)** - `list_children()`, `get_parent_id()`, `delete_instances_atomic()`,
2188///    `delete_instance_bulk()`, `prune_executions()`, `prune_executions_bulk()`
2189///
2190/// # Usage
2191///
2192/// ```ignore
2193/// let client = Client::new(provider);
2194///
2195/// // Check if management features are available
2196/// if client.has_management_capability() {
2197///     let instances = client.list_all_instances().await?;
2198///     let metrics = client.get_system_metrics().await?;
2199///     println!("System has {} instances", metrics.total_instances);
2200/// } else {
2201///     println!("Management features not available");
2202/// }
2203/// ```
2204#[async_trait::async_trait]
2205pub trait ProviderAdmin: Any + Send + Sync {
2206    // ===== Instance Discovery =====
2207
2208    /// List all known instance IDs.
2209    ///
2210    /// # Returns
2211    ///
2212    /// Vector of instance IDs, typically sorted by creation time (newest first).
2213    ///
2214    /// # Use Cases
2215    ///
2216    /// - Admin dashboards showing all workflows
2217    /// - Bulk operations across instances
2218    /// - Testing (verify instance creation)
2219    ///
2220    /// # Implementation Example (SQL sketch)
2221    ///
2222    /// ```ignore
2223    /// async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
2224    ///     SELECT instance_id FROM instances ORDER BY created_at DESC
2225    /// }
2226    /// ```
2227    async fn list_instances(&self) -> Result<Vec<String>, ProviderError>;
2228
2229    /// List instances matching a status filter.
2230    ///
2231    /// # Parameters
2232    ///
2233    /// * `status` - Filter by execution status: "Running", "Completed", "Failed", "ContinuedAsNew"
2234    ///
2235    /// # Returns
2236    ///
2237    /// Vector of instance IDs with the specified status.
2238    ///
2239    /// # Implementation Example (SQL sketch)
2240    ///
2241    /// ```ignore
2242    /// async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
2243    ///     SELECT i.instance_id FROM instances i
2244    ///     JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
2245    ///     WHERE e.status = ?
2246    ///     ORDER BY i.created_at DESC
2247    /// }
2248    /// ```
2249    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError>;
2250
2251    // ===== Execution Inspection =====
2252
2253    /// List all execution IDs for an instance.
2254    ///
2255    /// # Returns
2256    ///
2257    /// Vector of execution IDs in ascending order: `[1]`, `[1, 2]`, `[1, 2, 3]`, etc.
2258    ///
2259    /// # Multi-Execution Context
2260    ///
2261    /// When an orchestration uses ContinueAsNew, multiple executions exist:
2262    /// - Execution 1: Original execution
2263    /// - Execution 2: First ContinueAsNew
2264    /// - Execution 3: Second ContinueAsNew
2265    /// - etc.
2266    ///
2267    /// # Implementation Example (SQL sketch)
2268    ///
2269    /// ```ignore
2270    /// async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
2271    ///     SELECT execution_id FROM executions
2272    ///     WHERE instance_id = ?
2273    ///     ORDER BY execution_id
2274    /// }
2275    /// ```
2276    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError>;
2277
2278    /// Read the full event history for a specific execution within an instance.
2279    ///
2280    /// # Parameters
2281    ///
2282    /// * `instance` - The ID of the orchestration instance.
2283    /// * `execution_id` - The specific execution ID to read history for.
2284    ///
2285    /// # Returns
2286    ///
2287    /// Vector of events in chronological order (oldest first).
2288    ///
2289    /// # Implementation Example (SQL sketch)
2290    ///
2291    /// ```ignore
2292    /// async fn read_history_with_execution_id(&self, instance: &str, execution_id: u64) -> Result<Vec<Event>, ProviderError> {
2293    ///     SELECT event_data FROM history
2294    ///     WHERE instance_id = ? AND execution_id = ?
2295    ///     ORDER BY event_id
2296    /// }
2297    /// ```
2298    async fn read_history_with_execution_id(
2299        &self,
2300        instance: &str,
2301        execution_id: u64,
2302    ) -> Result<Vec<Event>, ProviderError>;
2303
2304    /// Read the full event history for the latest execution of an instance.
2305    ///
2306    /// # Parameters
2307    ///
2308    /// * `instance` - The ID of the orchestration instance.
2309    ///
2310    /// # Returns
2311    ///
2312    /// Vector of events in chronological order (oldest first) for the latest execution.
2313    ///
2314    /// # Implementation Example
2315    ///
2316    /// No default body is provided; a provider can delegate to `read_history_with_execution_id`:
2317    ///
2318    /// ```ignore
2319    /// async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
2320    ///     let execution_id = self.latest_execution_id(instance).await?;
2321    ///     self.read_history_with_execution_id(instance, execution_id).await
2322    /// }
2323    /// ```
2324    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError>;
2325
2326    /// Get the latest (current) execution ID for an instance.
2327    ///
2328    /// # Parameters
2329    ///
2330    /// * `instance` - The ID of the orchestration instance.
2331    ///
2332    /// # Implementation Pattern (SQL sketch)
2333    ///
2334    /// ```ignore
2335    /// async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
2336    ///     SELECT COALESCE(MAX(execution_id), 1) FROM executions WHERE instance_id = ?
2337    /// }
2338    /// ```
2339    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError>;
2340
2341    // ===== Instance Metadata =====
2342
2343    /// Get comprehensive information about an instance.
2344    ///
2345    /// # Parameters
2346    ///
2347    /// * `instance` - The ID of the orchestration instance.
2348    ///
2349    /// # Returns
2350    ///
2351    /// Detailed instance information including status, output, and metadata.
2352    ///
2353    /// # Implementation Example (SQL sketch)
2354    ///
2355    /// ```ignore
2356    /// async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
2357    ///     SELECT i.*, e.status, e.output
2358    ///     FROM instances i
2359    ///     JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
2360    ///     WHERE i.instance_id = ?
2361    /// }
2362    /// ```
2363    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError>;
2364
2365    /// Get detailed information about a specific execution.
2366    ///
2367    /// # Parameters
2368    ///
2369    /// * `instance` - The ID of the orchestration instance.
2370    /// * `execution_id` - The specific execution ID.
2371    ///
2372    /// # Returns
2373    ///
2374    /// Detailed execution information including status, output, and event count.
2375    ///
2376    /// # Implementation Example (SQL sketch)
2377    ///
2378    /// ```ignore
2379    /// async fn get_execution_info(&self, instance: &str, execution_id: u64) -> Result<ExecutionInfo, ProviderError> {
2380    ///     SELECT e.*, COUNT(h.event_id) as event_count
2381    ///     FROM executions e
2382    ///     LEFT JOIN history h ON e.instance_id = h.instance_id AND e.execution_id = h.execution_id
2383    ///     WHERE e.instance_id = ? AND e.execution_id = ?
2384    ///     GROUP BY e.execution_id
2385    /// }
2386    /// ```
2387    async fn get_execution_info(&self, instance: &str, execution_id: u64) -> Result<ExecutionInfo, ProviderError>;
2388
2389    // ===== System Metrics =====
2390
2391    /// Get system-wide metrics for the orchestration engine.
2392    ///
2393    /// # Returns
2394    ///
2395    /// System metrics including instance counts, execution counts, and status breakdown.
2396    ///
2397    /// # Implementation Example (SQL sketch)
2398    ///
2399    /// ```text
2400    /// async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
2401    ///     SELECT
2402    ///         COUNT(*) as total_instances,
2403    ///         SUM(CASE WHEN e.status = 'Running' THEN 1 ELSE 0 END) as running_instances,
2404    ///         SUM(CASE WHEN e.status = 'Completed' THEN 1 ELSE 0 END) as completed_instances,
2405    ///         SUM(CASE WHEN e.status = 'Failed' THEN 1 ELSE 0 END) as failed_instances
2406    ///     FROM instances i
2407    ///     JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
2408    /// }
2409    /// ```
2410    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError>;
2411
2412    /// Get the current depths of the internal work queues.
2413    ///
2414    /// # Returns
2415    ///
2416    /// Queue depths for orchestrator and worker queues.
2417    ///
2418    /// **Note:** Timer queue depth is not applicable since timers are handled via
2419    /// delayed visibility in the orchestrator queue.
2420    ///
2421    /// # Implementation Example (SQL sketch)
2422    ///
2423    /// ```ignore
2424    /// async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
2425    ///     SELECT
2426    ///         (SELECT COUNT(*) FROM orchestrator_queue WHERE lock_token IS NULL) as orchestrator_queue,
2427    ///         (SELECT COUNT(*) FROM worker_queue WHERE lock_token IS NULL) as worker_queue
2428    /// }
2429    /// ```
2430    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError>;
2431
2432    // ===== Hierarchy Primitive Operations =====
2433    // These are simple database operations that providers MUST implement.
2434    // Composite operations like get_instance_tree and delete_instance have
2435    // default implementations that use these primitives.
2436
2437    /// List direct children of an instance.
2438    ///
2439    /// Returns instance IDs that have `parent_instance_id = instance_id`, or an empty vec if the instance
2440    /// has no children or doesn't exist. The default `get_instance_tree` walks the hierarchy through this method.
2441    ///
2442    /// # Implementation Example (SQL sketch)
2443    ///
2444    /// ```ignore
2445    /// async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
2446    ///     SELECT instance_id FROM instances WHERE parent_instance_id = ?
2447    /// }
2448    /// ```
2449    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError>;
2450
2451    /// Get the parent instance ID.
2452    ///
2453    /// Returns `Ok(Some(parent_id))` for sub-orchestrations, `Ok(None)` for root orchestrations, and `Err` if the
2454    /// instance doesn't exist. The default `delete_instance` uses this to reject sub-orchestrations.
2455    ///
2456    /// # Implementation Example (SQL sketch)
2457    ///
2458    /// ```ignore
2459    /// async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
2460    ///     SELECT parent_instance_id FROM instances WHERE instance_id = ?
2461    /// }
2462    /// ```
2463    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError>;
2464
2465    /// Atomically delete a batch of instances.
2466    ///
2467    /// # Parameters
2468    ///
2469    /// * `ids` - Instance IDs to delete.
2470    /// * `force` - If true, delete regardless of status. If false, all instances must be terminal.
2471    ///
2472    /// # Provider Contract (MUST implement)
2473    ///
2474    /// 1. **Atomicity**: All deletions MUST be atomic (all-or-nothing).
2475    ///    If any instance fails validation, the entire batch MUST be rolled back.
2476    ///
2477    /// 2. **Force semantics**:
2478    ///    - `force=false`: Return error if ANY instance has `status = 'Running'`
2479    ///    - `force=true`: Delete regardless of status (for stuck/abandoned instances)
2480    ///
2481    /// 3. **Orphan detection**: Atomically check for children not in `ids`:
2482    ///    ```sql
2483    ///    SELECT instance_id FROM instances
2484    ///    WHERE parent_instance_id IN (?) AND instance_id NOT IN (?)
2485    ///    ```
2486    ///    If any exist, return error (race condition: new child spawned after `get_instance_tree`).
2487    ///
2488    /// 4. **Complete cleanup**: Remove ALL related data in a single atomic operation:
2489    ///    - event history
2490    ///    - executions
2491    ///    - orchestrator queue entries
2492    ///    - worker queue entries
2493    ///    - instance locks
2494    ///    - instance metadata
2495    ///
2496    /// 5. **Prevent ack recreation**: When force-deleting Running instances, the removal
2497    ///    of the instance lock prevents in-flight `ack_orchestration_item` from recreating state.
2498    ///
2499    /// # Race Condition Protection
2500    ///
2501    /// This method MUST validate within the transaction that no orphans would be created:
2502    /// - If any instance has `parent_instance_id` pointing to an instance in `ids`,
2503    ///   but that child is NOT also in `ids`, return an error.
2504    /// - This prevents orphans from children spawned between `get_instance_tree()` and
2505    ///   this call.
2506    ///
2507    /// # Transaction Semantics
2508    ///
2509    /// All instances must be deleted atomically (all-or-nothing).
2510    /// The orphan check must be done within the transaction to prevent TOCTOU races.
2511    ///
2512    /// # Implementation Notes
2513    ///
2514    /// - Delete all related data: event history, executions, orchestrator queue, worker queue, instance locks, instance metadata
2515    /// - Order within transaction doesn't matter for correctness (single atomic transaction)
2516    /// - Count deletions and aggregate them into the returned `DeleteInstanceResult`
2517    async fn delete_instances_atomic(&self, ids: &[String], force: bool)
2518    -> Result<DeleteInstanceResult, ProviderError>;
2519
2520    // ===== Hierarchy Composite Operations =====
2521    // These have default implementations using the primitives above.
2522    // Providers can override for better performance if needed.
2523
2524    /// Get the full instance tree rooted at the given instance.
2525    ///
2526    /// Returns all instances in the tree: the root, all children, grandchildren, etc.
2527    /// The root is always the first entry of `all_ids`, and every instance ID is listed once.
2528    /// Default implementation walks the hierarchy iteratively (depth-first) via `list_children`.
2529    async fn get_instance_tree(&self, instance_id: &str) -> Result<InstanceTree, ProviderError> {
2530        let mut all_ids = vec![];
2531        let mut seen = HashSet::new();
2532        // Depth-first walk (`pop` is LIFO); `seen` keeps cyclic parent links from looping forever.
2533        let mut to_process = vec![instance_id.to_string()];
2534        while let Some(parent_id) = to_process.pop() {
2535            if seen.insert(parent_id.clone()) {
2536                to_process.extend(self.list_children(&parent_id).await?);
2537                all_ids.push(parent_id);
2538            }
2539        }
2540        Ok(InstanceTree {
2541            root_id: instance_id.to_string(),
2542            all_ids,
2543        })
2544    }
2545
2546    // ===== Deletion/Pruning Operations =====
2547
2548    /// Delete one root orchestration instance together with everything stored for it.
2549    ///
2550    /// The instance and its whole sub-orchestration tree are handed to a single
2551    /// `delete_instances_atomic` call, which removes executions, history events and queue messages.
2552    ///
2553    /// # Default Implementation
2554    ///
2555    /// Built from the primitives `get_parent_id`, `get_instance_tree` and `delete_instances_atomic`.
2556    /// Providers may override it when a faster native path is available.
2557    ///
2558    /// # Parameters
2559    ///
2560    /// * `instance_id` - The ID of the instance to delete.
2561    /// * `force` - If true, delete even if the instance is in Running state.
2562    ///   WARNING: Force delete only removes stored state; it does NOT cancel
2563    ///   in-flight tokio tasks. Use `cancel_instance` first for graceful termination.
2564    ///
2565    /// # Returns
2566    ///
2567    /// * `Ok(DeleteInstanceResult)` - Details of what was deleted.
2568    /// * `Err(ProviderError)` with `is_retryable() = false` for:
2569    ///   - Instance is running and force=false
2570    ///   - Instance is a sub-orchestration (must delete root instead)
2571    ///   - Instance not found
2572    ///
2573    /// # Safety
2574    ///
2575    /// - Deleting a running instance with force=true may cause in-flight operations
2576    ///   to fail when they try to persist state.
2577    /// - Sub-orchestrations cannot be deleted directly; delete the root to cascade.
2578    /// - The instance lock entry is removed to prevent zombie recreation.
2579    async fn delete_instance(&self, instance_id: &str, force: bool) -> Result<DeleteInstanceResult, ProviderError> {
2580        // A parent link marks a sub-orchestration; only root instances may be deleted directly.
2581        match self.get_parent_id(instance_id).await? {
2582            Some(_) => Err(ProviderError::permanent(
2583                "delete_instance",
2584                format!("Cannot delete sub-orchestration {instance_id} directly. Delete root instance instead."),
2585            )),
2586            None => {
2587                // Gather the root plus every descendant so the whole tree goes in one batch.
2588                let subtree = self.get_instance_tree(instance_id).await?;
2589
2590                // The provider's atomic batch delete removes the entire tree or nothing.
2591                self.delete_instances_atomic(&subtree.all_ids, force).await
2592            }
2593        }
2594    }
2595
2596    /// Delete multiple orchestration instances matching the filter criteria.
2597    ///
2598    /// Only instances in terminal states (Completed, Failed) are eligible.
2599    /// Running instances are silently skipped (not an error).
2600    ///
2601    /// # Parameters
2602    ///
2603    /// * `filter` - Criteria for selecting instances to delete. All criteria are ANDed.
2604    ///
2605    /// # Filter Behavior
2606    ///
2607    /// - `instance_ids`: Allowlist of specific IDs to consider
2608    /// - `completed_before`: Only delete instances completed before this timestamp (ms)
2609    /// - `limit`: Maximum number of instances to delete (applied last)
2610    ///
2611    /// # Returns
2612    ///
2613    /// A `DeleteInstanceResult` aggregating the counts of deleted data across all deleted instances.
2614    ///
2615    /// # Safety
2616    ///
2617    /// - Running instances are NEVER deleted (silently skipped)
2618    /// - Use `limit` to avoid long-running transactions
2619    /// - Sub-orchestrations are skipped (only roots are deleted with cascade)
2620    async fn delete_instance_bulk(&self, filter: InstanceFilter) -> Result<DeleteInstanceResult, ProviderError>;
2621
2622    /// Prune old executions from a single instance.
2623    ///
2624    /// Use this for `ContinueAsNew` workflows that accumulate many executions.
2625    ///
2626    /// # Parameters
2627    ///
2628    /// * `instance_id` - The instance to prune executions from.
2629    /// * `options` - Criteria for selecting executions to delete. All criteria are ANDed.
2630    ///
2631    /// # Provider Contract (MUST implement)
2632    ///
2633    /// 1. **Current execution protection**: The current execution of the instance
2634    ///    MUST NEVER be pruned, regardless of options.
2635    ///
2636    /// 2. **Running execution protection**: Executions with status 'Running'
2637    ///    MUST NEVER be pruned.
2638    ///
2639    /// 3. **Atomicity**: All deletions for a single prune call MUST be atomic
2640    ///    (all-or-nothing).
2641    ///
2642    /// # `keep_last` Semantics
2643    ///
2644    /// Since current_execution_id is always the highest execution_id:
2645    /// - `keep_last: None` → prune all except current
2646    /// - `keep_last: Some(0)` → same as `None` (current is protected)
2647    /// - `keep_last: Some(1)` → same as `None` (top 1 = current)
2648    /// - `keep_last: Some(N)` → keep current + (N-1) most recent
2649    ///
2650    /// **All three (`None`, `Some(0)`, `Some(1)`) are equivalent** because the
2651    /// current execution is always protected regardless of this setting.
2652    ///
2653    /// # Implementation Example
2654    ///
2655    /// ```sql
2656    /// DELETE FROM executions
2657    /// WHERE instance_id = ?
2658    ///   AND execution_id != ?  -- current_execution_id (CRITICAL)
2659    ///   AND status != 'Running'
2660    ///   AND execution_id NOT IN (
2661    ///     SELECT execution_id FROM executions
2662    ///     WHERE instance_id = ?
2663    ///     ORDER BY execution_id DESC
2664    ///     LIMIT ?  -- keep_last
2665    ///   )
2666    /// ```
2667    async fn prune_executions(&self, instance_id: &str, options: PruneOptions) -> Result<PruneResult, ProviderError>;
2668
2669    /// Prune old executions from multiple instances matching the filter.
2670    ///
2671    /// Applies the same prune options to all matching instances.
2672    ///
2673    /// # Parameters
2674    ///
2675    /// * `filter` - Criteria for selecting instances to process.
2676    /// * `options` - Criteria for selecting executions to delete within each instance.
2677    ///
2678    /// # Returns
2679    ///
2680    /// A `PruneResult` with aggregated counts, including how many instances were processed.
2681    async fn prune_executions_bulk(
2682        &self,
2683        filter: InstanceFilter,
2684        options: PruneOptions,
2685    ) -> Result<PruneResult, ProviderError>;
2686}
2687
2688#[cfg(test)]
2689mod tests {
2690    use super::*;
2691
2692    #[test]
2693    fn default_only_matches_none_rejects_tag() {
2694        let filter = TagFilter::DefaultOnly;
2695        assert!(filter.matches(None)); // untagged activity is accepted
2696        assert!(!filter.matches(Some("gpu"))); // tagged activity is refused
2697    }
2698
2699    #[test]
2700    fn tags_matches_member_rejects_others() {
2701        let gpu_only = TagFilter::tags(["gpu"]);
2702        assert!(gpu_only.matches(Some("gpu")));
2703        assert!(!gpu_only.matches(Some("cpu")));
2704        assert!(!gpu_only.matches(None)); // `Tags` excludes untagged activities
2705    }
2706
2707    #[test]
2708    fn default_and_matches_both() {
2709        let gpu_or_untagged = TagFilter::default_and(["gpu"]);
2710        assert!(gpu_or_untagged.matches(None));
2711        assert!(gpu_or_untagged.matches(Some("gpu")));
2712        assert!(!gpu_or_untagged.matches(Some("cpu")));
2713    }
2714
2715    #[test]
2716    fn none_matches_nothing() {
2717        let reject_all = TagFilter::None;
2718        assert!(!reject_all.matches(None));
2719        assert!(!reject_all.matches(Some("x")));
2720    }
2721
2722    #[test]
2723    fn any_matches_everything() {
2724        let accept_all = TagFilter::Any;
2725        // Untagged and arbitrarily tagged activities must all pass.
2726        for tag in [None, Some("gpu"), Some("cpu"), Some("anything")] {
2727            assert!(accept_all.matches(tag));
2728        }
2729    }
2730
2731    #[test]
2732    fn default_impl_is_default_only() {
2733        assert_eq!(TagFilter::DefaultOnly, TagFilter::default());
2734    }
2735
2736    #[test]
2737    #[should_panic(expected = "at most 5 tags")]
2738    fn panics_over_max_tags() {
2739        let _ = TagFilter::tags(["a", "b", "c", "d", "e", "f"]); // six tags: one past the limit
2740    }
2741
2742    #[test]
2743    #[should_panic(expected = "requires at least one tag")]
2744    fn panics_on_empty_tags() {
2745        let _ = TagFilter::tags(Vec::<String>::new());
2746    }
2747
2748    #[test]
2749    #[should_panic(expected = "requires at least one tag")]
2750    fn panics_on_empty_default_and() {
2751        let _ = TagFilter::default_and(Vec::<String>::new());
2752    }
2753}