duroxide/providers/mod.rs
1use crate::Event;
2use std::any::Any;
3use std::collections::HashSet;
4use std::time::Duration;
5
6pub mod error;
7pub use error::ProviderError;
8
9/// Maximum number of tags a worker can subscribe to.
10pub use crate::runtime::limits::MAX_WORKER_TAGS;
11
/// Filter for which activity tags a worker will process.
///
/// Activity tags route work items to specialized workers. A tag is an
/// opaque string label set at schedule time via `.with_tag()`. Workers
/// declare which tags they accept via [`crate::RuntimeOptions`]`.worker_tag_filter`.
///
/// # Variants
///
/// - `DefaultOnly` (default) — process only activities with no tag.
/// - `Tags(["gpu"])` — process only activities tagged `"gpu"`.
/// - `DefaultAnd(["gpu"])` — process activities with no tag OR tagged `"gpu"`.
/// - `Any` — process all activities regardless of tag.
/// - `None` — don't process any activities (orchestrator-only mode).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum TagFilter {
    /// Process only activities with no tag (default).
    #[default]
    DefaultOnly,

    /// Process only activities with the specified tags (NOT default).
    /// Limited to [`MAX_WORKER_TAGS`] (5) tags.
    Tags(HashSet<String>),

    /// Process activities with no tag OR with one of the specified tags
    /// (i.e. the union of the `DefaultOnly` and `Tags` workloads).
    /// Limited to [`MAX_WORKER_TAGS`] (5) tags.
    DefaultAnd(HashSet<String>),

    /// Process all activities regardless of tag (generalist worker).
    Any,

    /// Don't process any activities (orchestrator-only mode).
    None,
}
45
46impl TagFilter {
47 /// Create a filter for specific tags only.
48 ///
49 /// # Panics
50 /// Panics if more than [`MAX_WORKER_TAGS`] (5) tags are provided.
51 pub fn tags<I, S>(tags: I) -> Self
52 where
53 I: IntoIterator<Item = S>,
54 S: Into<String>,
55 {
56 let set: HashSet<String> = tags.into_iter().map(Into::into).collect();
57 assert!(
58 !set.is_empty(),
59 "TagFilter::tags() requires at least one tag; use TagFilter::None for no activities"
60 );
61 assert!(
62 set.len() <= MAX_WORKER_TAGS,
63 "Worker can subscribe to at most {} tags, got {}",
64 MAX_WORKER_TAGS,
65 set.len()
66 );
67 TagFilter::Tags(set)
68 }
69
70 /// Create a filter for default plus specific tags.
71 ///
72 /// # Panics
73 /// Panics if more than [`MAX_WORKER_TAGS`] (5) tags are provided.
74 pub fn default_and<I, S>(tags: I) -> Self
75 where
76 I: IntoIterator<Item = S>,
77 S: Into<String>,
78 {
79 let set: HashSet<String> = tags.into_iter().map(Into::into).collect();
80 assert!(
81 !set.is_empty(),
82 "TagFilter::default_and() requires at least one tag; use TagFilter::DefaultOnly for untagged only"
83 );
84 assert!(
85 set.len() <= MAX_WORKER_TAGS,
86 "Worker can subscribe to at most {} tags, got {}",
87 MAX_WORKER_TAGS,
88 set.len()
89 );
90 TagFilter::DefaultAnd(set)
91 }
92
93 /// Create a filter for only untagged activities.
94 pub fn default_only() -> Self {
95 TagFilter::DefaultOnly
96 }
97
98 /// Create a filter that processes no activities.
99 pub fn none() -> Self {
100 TagFilter::None
101 }
102
103 /// Create a filter that processes all activities regardless of tag.
104 pub fn any() -> Self {
105 TagFilter::Any
106 }
107
108 /// Check if an activity with the given tag matches this filter.
109 pub fn matches(&self, tag: Option<&str>) -> bool {
110 match (self, tag) {
111 (TagFilter::Any, _) => true,
112 (TagFilter::None, _) => false,
113 (TagFilter::DefaultOnly, Option::None) => true,
114 (TagFilter::DefaultOnly, Some(_)) => false,
115 (TagFilter::Tags(_), Option::None) => false,
116 (TagFilter::Tags(set), Some(t)) => set.contains(t),
117 (TagFilter::DefaultAnd(_), Option::None) => true,
118 (TagFilter::DefaultAnd(set), Some(t)) => set.contains(t),
119 }
120 }
121}
122
123/// Returns the current build version of the duroxide crate.
124pub fn current_build_version() -> semver::Version {
125 semver::Version::parse(env!("CARGO_PKG_VERSION")).expect("CARGO_PKG_VERSION must be valid semver")
126}
127
/// An inclusive version range: [min, max].
///
/// Both bounds are inclusive. For example, `SemverRange { min: (0,0,0), max: (1,5,0) }`
/// matches any version `v` where `0.0.0 <= v <= 1.5.0`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SemverRange {
    /// Lower bound (inclusive).
    pub min: semver::Version,
    /// Upper bound (inclusive).
    pub max: semver::Version,
}
137
138impl SemverRange {
139 pub fn new(min: semver::Version, max: semver::Version) -> Self {
140 Self { min, max }
141 }
142
143 /// Check if a version falls within this range (inclusive on both ends).
144 pub fn contains(&self, version: &semver::Version) -> bool {
145 version >= &self.min && version <= &self.max
146 }
147
148 /// Default range: `>=0.0.0, <=CURRENT_BUILD_VERSION`.
149 ///
150 /// This means the runtime can replay any execution pinned at or below its own
151 /// build version. Replay engines are backward-compatible.
152 pub fn default_for_current_build() -> Self {
153 Self {
154 min: semver::Version::new(0, 0, 0),
155 max: current_build_version(),
156 }
157 }
158}
159
/// Capability filter passed by the orchestration dispatcher to the provider.
///
/// The provider uses this to return only orchestration items whose pinned
/// `duroxide_version` falls within one of the supported ranges.
///
/// If no filter is supplied (`None`), the provider returns any available item
/// (legacy behavior, useful for admin tooling).
#[derive(Debug, Clone)]
pub struct DispatcherCapabilityFilter {
    /// Supported duroxide version ranges. An execution is compatible if its
    /// pinned duroxide version falls within ANY of these ranges.
    /// An empty list is compatible with no versions at all.
    ///
    /// Phase 1 typically has a single range: `[>=0.0.0, <=CURRENT_BUILD_VERSION]`.
    pub supported_duroxide_versions: Vec<SemverRange>,
}
175
176impl DispatcherCapabilityFilter {
177 /// Check if a pinned version is compatible with this filter.
178 pub fn is_compatible(&self, version: &semver::Version) -> bool {
179 self.supported_duroxide_versions.iter().any(|r| r.contains(version))
180 }
181
182 /// Build the default filter for the current build version.
183 pub fn default_for_current_build() -> Self {
184 Self {
185 supported_duroxide_versions: vec![SemverRange::default_for_current_build()],
186 }
187 }
188}
189
/// Identity of an activity for cancellation purposes.
///
/// The (`instance`, `execution_id`, `activity_id`) triple uniquely identifies
/// one scheduled activity. Used by the runtime to specify which activities
/// should be cancelled during an orchestration turn (e.g., select losers,
/// orchestration termination).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduledActivityIdentifier {
    /// Instance ID of the orchestration
    pub instance: String,
    /// Execution ID within the instance
    pub execution_id: u64,
    /// Activity ID (the event_id of the ActivityScheduled event)
    pub activity_id: u64,
}
203
/// Configuration for session-aware work item fetching.
///
/// When passed to [`Provider::fetch_work_item`], enables session routing:
/// - Non-session items (`session_id IS NULL`) are always eligible
/// - Items for sessions already owned by `owner_id` are eligible
/// - Items for unowned (claimable) sessions are eligible
///
/// When `fetch_work_item` is called with `session: None`, only non-session
/// items are returned.
#[derive(Debug, Clone)]
pub struct SessionFetchConfig {
    /// Identity tag for session ownership. All worker slots sharing this
    /// value are treated as a single owner for session affinity purposes.
    /// Typically set to `RuntimeOptions::worker_node_id` (process-level)
    /// or a per-slot UUID (ephemeral).
    pub owner_id: String,
    /// How long to hold the session lock when claiming a new session.
    /// NOTE(review): presumably the session becomes claimable by other
    /// owners once the lock expires — confirm renewal behavior against
    /// the provider implementations.
    pub lock_timeout: Duration,
}
223
224/// Orchestration item containing all data needed to process an instance atomically.
225///
226/// This represents a locked batch of work for a single orchestration instance.
227/// The provider must guarantee that no other process can modify this instance
228/// until `ack_orchestration_item()` or `abandon_orchestration_item()` is called.
229///
230/// # Fields
231///
232/// * `instance` - Unique identifier for the orchestration instance (e.g., "order-123")
233/// * `orchestration_name` - Name of the orchestration being executed (e.g., "ProcessOrder")
234/// * `execution_id` - Current execution ID (starts at 1, increments with ContinueAsNew)
235/// * `version` - Orchestration version string (e.g., "1.0.0")
236/// * `history` - Complete event history for the current execution (ordered by event_id)
237/// * `messages` - Batch of WorkItems to process (may include Start, completions, external events)
/// * lock token — a unique token must be used to ack or abandon this batch
///   (NOTE(review): `lock_token` is not a field of this struct; it appears to be
///   conveyed separately by the provider — confirm against `fetch_orchestration_item`)
239/// * `history_error` - If set, history deserialization failed; `history` may be empty or partial
240///
241/// # Implementation Notes
242///
243/// - The provider must ensure `history` contains ALL events for `execution_id` in order
244/// - For multi-execution instances (ContinueAsNew), only return the LATEST execution's history
245/// - The `lock_token` must be unique and prevent concurrent processing of the same instance
246/// - All messages in the batch should belong to the same instance
247/// - The lock should expire after a timeout (e.g., 30s) to handle worker crashes
248/// - If history deserialization fails, set `history_error` with the error message and return
249/// `Ok(Some(...))` instead of `Err(...)`. This allows the runtime to reach the poison check
250/// and terminate the orchestration after `max_attempts`.
251///
252/// # Example from SQLite Provider
253///
254/// ```text
255/// // 1. Find available instance (check instance_locks for active locks)
256/// SELECT q.instance_id FROM orchestrator_queue q
257/// LEFT JOIN instance_locks il ON q.instance_id = il.instance_id
258/// WHERE q.visible_at <= now()
259/// AND (il.instance_id IS NULL OR il.locked_until <= now())
260/// ORDER BY q.id LIMIT 1
261///
262/// // 2. Atomically acquire instance-level lock
263/// INSERT INTO instance_locks (instance_id, lock_token, locked_until, locked_at)
264/// VALUES (?, ?, ?, ?)
265/// ON CONFLICT(instance_id) DO UPDATE
266/// SET lock_token = excluded.lock_token, locked_until = excluded.locked_until
267/// WHERE locked_until <= excluded.locked_at
268///
269/// // 3. Lock all visible messages for that instance
270/// UPDATE orchestrator_queue SET lock_token = ?, locked_until = ?
271/// WHERE instance_id = ? AND visible_at <= now()
272///
273/// // 4. Load instance metadata
274/// SELECT orchestration_name, orchestration_version, current_execution_id
275/// FROM instances WHERE instance_id = ?
276///
277/// // 5. Load history for current execution
278/// SELECT event_data FROM history
279/// WHERE instance_id = ? AND execution_id = ?
280/// ORDER BY event_id
281///
282/// // 6. Return OrchestrationItem with lock_token
283/// ```
#[derive(Debug, Clone)]
pub struct OrchestrationItem {
    /// Unique identifier for the orchestration instance (e.g., "order-123").
    pub instance: String,
    /// Name of the orchestration being executed (e.g., "ProcessOrder").
    pub orchestration_name: String,
    /// Current execution ID (starts at 1, increments with ContinueAsNew).
    pub execution_id: u64,
    /// Orchestration version string (e.g., "1.0.0").
    pub version: String,
    /// Complete event history for the current execution (ordered by event_id).
    pub history: Vec<Event>,
    /// Batch of WorkItems to process (may include Start, completions, external events).
    pub messages: Vec<WorkItem>,
    /// If set, history deserialization failed. `history` may be empty or partial.
    ///
    /// The runtime treats this as a terminal error: it abandons the item with backoff,
    /// and once `attempt_count > max_attempts`, poisons the orchestration.
    ///
    /// Providers SHOULD return `Ok(Some(...))` with this field set instead of
    /// `Err(ProviderError::permanent(...))` when deserialization fails after acquiring
    /// a lock. This ensures the lock lifecycle stays clean (Ok = lock held, Err = no lock).
    pub history_error: Option<String>,
}
302
303/// Execution metadata computed by the runtime to be persisted by the provider.
304///
305/// The runtime inspects the `history_delta` and `orchestrator_items` to compute this metadata.
306/// **Providers must NOT inspect event contents themselves** - they should blindly store this metadata.
307///
308/// This design ensures the provider remains a pure storage abstraction without orchestration knowledge.
309///
310/// # Fields
311///
312/// * `status` - New execution status: `Some("Completed")`, `Some("Failed")`, `Some("ContinuedAsNew")`, or `None`
313/// - `None` means the execution is still running (no status update needed)
314/// - Provider should update the stored execution status when `Some(...)`
315///
316/// * `output` - The terminal value to store (depends on status):
317/// - `Completed`: The orchestration's successful result
318/// - `Failed`: The error message
319/// - `ContinuedAsNew`: The input that was passed to continue_as_new()
320/// - `None`: No output (execution still running)
321///
322/// # Example Usage in Provider
323///
324/// ```text
325/// async fn ack_orchestration_item(..., metadata: ExecutionMetadata) {
326/// // Store metadata without understanding what it means
327/// if let Some(status) = &metadata.status {
328/// UPDATE executions SET status = ?, output = ? WHERE instance_id = ? AND execution_id = ?
329/// }
330/// }
331/// ```
332///
333/// # ContinueAsNew Handling
334///
335/// ContinueAsNew is handled entirely by the runtime. Providers must NOT try to
336/// synthesize new executions in `fetch_orchestration_item`.
337///
338/// Runtime behavior:
339/// - When an orchestration calls `continue_as_new(input)`, the runtime stamps
340/// `OrchestrationContinuedAsNew` into the current execution's history and enqueues
341/// a `WorkItem::ContinueAsNew`.
342/// - When processing that work item, the runtime starts a fresh execution with
343/// `execution_id = current + 1`, passes an empty `existing_history`, and stamps an
344/// `OrchestrationStarted { event_id: 1, .. }` event for the new execution.
345/// - The runtime then calls `ack_orchestration_item(lock_token, execution_id, ...)` with
346/// the explicit execution id to persist history and queue operations.
347///
348/// Provider responsibilities:
349/// - Use the explicit `execution_id` given to `ack_orchestration_item`.
350/// - Idempotently create the execution record/entry if it doesn't already exist.
351/// - Update the instance's “current execution” pointer to be at least `execution_id`.
352/// - Append all `history_delta` events to the specified `execution_id`.
353/// - Update `executions.status, executions.output` from `ExecutionMetadata` when provided.
#[derive(Debug, Clone, Default)]
pub struct ExecutionMetadata {
    /// New status for the execution ('Completed', 'Failed', 'ContinuedAsNew', or None to keep current)
    pub status: Option<String>,
    /// Output/error/input to store (for Completed/Failed/ContinuedAsNew).
    /// Interpretation depends on `status`; `None` means nothing to store.
    pub output: Option<String>,
    /// Orchestration name (for new instances or updates)
    pub orchestration_name: Option<String>,
    /// Orchestration version (for new instances or updates)
    pub orchestration_version: Option<String>,
    /// Parent instance ID (for sub-orchestrations, used for cascading delete)
    pub parent_instance_id: Option<String>,
    /// Pinned duroxide version for this execution (set from OrchestrationStarted event).
    ///
    /// The provider stores this alongside the execution record for efficient
    /// capability filtering.
    ///
    /// - `Some(v)`: Store `v` as the execution's pinned version. The provider should
    ///   update the stored version unconditionally when provided.
    /// - `None`: No version update requested. The provider should not modify the
    ///   existing stored value.
    ///
    /// The **runtime** guarantees this is only set on the first turn of a new execution
    /// (when `OrchestrationStarted` is in the history delta), enforced by a `debug_assert`
    /// in the orchestration dispatcher. The provider does not need to enforce write-once
    /// semantics — it simply stores what it's told.
    pub pinned_duroxide_version: Option<semver::Version>,
}
382
383/// Provider-backed work queue items the runtime consumes continually.
384///
385/// WorkItems represent messages that flow through provider-managed queues.
386/// They are serialized/deserialized using serde_json for storage.
387///
388/// # Queue Routing
389///
390/// Different WorkItem types go to different queues:
391/// - `StartOrchestration, ContinueAsNew, ActivityCompleted/Failed, TimerFired, ExternalRaised, SubOrchCompleted/Failed, CancelInstance` → **Orchestrator queue**
392/// - TimerFired items use `visible_at = fire_at_ms` for delayed visibility
393/// - `ActivityExecute` → **Worker queue**
394///
395/// # Instance ID Extraction
396///
397/// Most WorkItems have an `instance` field. Sub-orchestration completions use `parent_instance`.
398/// Providers need to extract the instance ID for routing. SQLite example:
399///
400/// ```ignore
401/// let instance = match &item {
402/// WorkItem::StartOrchestration { instance, .. } |
403/// WorkItem::ActivityCompleted { instance, .. } |
404/// WorkItem::CancelInstance { instance, .. } => instance,
405/// WorkItem::SubOrchCompleted { parent_instance, .. } => parent_instance,
406/// _ => return Err("unexpected item type"),
407/// };
408/// ```
409///
410/// # Execution ID Tracking
411///
412/// WorkItems for activities, timers, and sub-orchestrations include `execution_id`.
413/// This allows providers to route completions to the correct execution when ContinueAsNew creates multiple executions.
414///
415/// **Critical:** Completions with mismatched execution_id should still be enqueued (the runtime filters them).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum WorkItem {
    /// Start a new orchestration instance
    /// - Instance metadata is created by runtime via ack_orchestration_item metadata (not on enqueue)
    /// - `execution_id`: The execution ID for this start (usually INITIAL_EXECUTION_ID=1)
    /// - `version`: None means runtime will resolve from registry
    /// - `parent_instance`/`parent_id`: present when started as a sub-orchestration
    ///   (NOTE(review): presumably used to route the child's completion back — confirm)
    StartOrchestration {
        instance: String,
        orchestration: String,
        input: String,
        version: Option<String>,
        parent_instance: Option<String>,
        parent_id: Option<u64>,
        execution_id: u64,
    },

    /// Execute an activity (goes to worker queue)
    /// - `id`: event_id from ActivityScheduled (for correlation)
    /// - Worker will enqueue ActivityCompleted or ActivityFailed
    ActivityExecute {
        instance: String,
        execution_id: u64,
        id: u64, // scheduling_event_id from ActivityScheduled
        name: String,
        input: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        #[serde(default)]
        session_id: Option<String>,
        /// Routing tag for directing this activity to specific workers.
        /// `None` means default (untagged) queue.
        #[serde(skip_serializing_if = "Option::is_none")]
        #[serde(default)]
        tag: Option<String>,
    },

    /// Activity completed successfully (goes to orchestrator queue)
    /// - `id`: source_event_id referencing the ActivityScheduled event
    /// - Triggers next orchestration turn
    ActivityCompleted {
        instance: String,
        execution_id: u64,
        id: u64, // source_event_id referencing ActivityScheduled
        result: String,
    },

    /// Activity failed with error (goes to orchestrator queue)
    /// - `id`: source_event_id referencing the ActivityScheduled event
    /// - Triggers next orchestration turn
    ActivityFailed {
        instance: String,
        execution_id: u64,
        id: u64, // source_event_id referencing ActivityScheduled
        details: crate::ErrorDetails,
    },

    /// Timer fired (goes to orchestrator queue with delayed visibility)
    /// - Created directly by runtime when timer is scheduled
    /// - Enqueued to orchestrator queue with `visible_at = fire_at_ms`
    /// - Orchestrator dispatcher processes when `visible_at <= now()`
    TimerFired {
        instance: String,
        execution_id: u64,
        id: u64, // source_event_id referencing TimerCreated
        fire_at_ms: u64,
    },

    /// External event raised (goes to orchestrator queue)
    /// - Matched by `name` to ExternalSubscribed events
    /// - `data`: JSON payload from external system
    ExternalRaised {
        instance: String,
        name: String,
        data: String,
    },

    /// Sub-orchestration completed (goes to parent's orchestrator queue)
    /// - Routes to `parent_instance`, not the child
    /// - `parent_id`: event_id from parent's SubOrchestrationScheduled event
    SubOrchCompleted {
        parent_instance: String,
        parent_execution_id: u64,
        parent_id: u64, // source_event_id referencing SubOrchestrationScheduled
        result: String,
    },

    /// Sub-orchestration failed (goes to parent's orchestrator queue)
    /// - Routes to `parent_instance`, not the child
    /// - `parent_id`: event_id from parent's SubOrchestrationScheduled event
    SubOrchFailed {
        parent_instance: String,
        parent_execution_id: u64,
        parent_id: u64, // source_event_id referencing SubOrchestrationScheduled
        details: crate::ErrorDetails,
    },

    /// Request orchestration cancellation (goes to orchestrator queue)
    /// - Runtime will append OrchestrationCancelRequested event
    /// - Eventually results in OrchestrationFailed with "canceled: {reason}"
    CancelInstance { instance: String, reason: String },

    /// Continue orchestration as new execution (goes to orchestrator queue)
    /// - Signals the end of current execution and start of next
    /// - Runtime will create Event::OrchestrationStarted for next execution
    /// - Provider creates the new execution record idempotently when the runtime
    ///   acks with the new execution_id (see the ExecutionMetadata docs)
    ContinueAsNew {
        instance: String,
        orchestration: String,
        input: String,
        version: Option<String>,
        /// Persistent events carried forward from the previous execution.
        /// These are seeded into the new execution's history before any new
        /// externally-raised events, preserving FIFO order across CAN boundaries.
        #[serde(default)]
        carry_forward_events: Vec<(String, String)>,
        /// Custom status accumulated from the previous execution, if any.
        /// Carried forward so `get_custom_status()` works immediately in the new execution.
        #[serde(skip_serializing_if = "Option::is_none")]
        #[serde(default)]
        initial_custom_status: Option<String>,
    },

    /// Persistent external event raised (goes to orchestrator queue).
    /// Matched by `name` using FIFO mailbox semantics — events stick around until consumed.
    QueueMessage {
        instance: String,
        name: String,
        data: String,
    },

    /// V2: External event with topic-based pub/sub matching (goes to orchestrator queue).
    ///
    /// Matched by `name` AND `topic` to ExternalSubscribed2 events.
    /// Feature-gated for replay engine extensibility verification.
    #[cfg(feature = "replay-version-test")]
    ExternalRaised2 {
        instance: String,
        name: String,
        topic: String,
        data: String,
    },
}
557
558/// Provider abstraction for durable orchestration execution (persistence + queues).
559///
560/// # Overview
561///
562/// A Provider is responsible for:
563/// 1. **Persistence**: Storing orchestration history (append-only event log)
564/// 2. **Queueing**: Managing two work queues (orchestrator, worker)
565/// 3. **Locking**: Implementing peek-lock semantics to prevent concurrent processing
566/// 4. **Atomicity**: Ensuring transactional consistency across operations
567///
568/// # Architecture: Two-Queue Model
569///
570/// ```text
571/// ┌─────────────────────────────────────────────────────────────┐
572/// │ RUNTIME (2 Dispatchers) │
573/// ├─────────────────────────────────────────────────────────────┤
574/// │ │
575/// │ [Orchestration Dispatcher] ←─── fetch_orchestration_item │
576/// │ ↓ │
577/// │ Process Turn │
578/// │ ↓ │
579/// │ ack_orchestration_item ────┬──► Orchestrator Queue │
580/// │ │ (TimerFired with delayed │
581/// │ │ visibility for timers) │
582/// │ └──► Worker Queue │
583/// │ │
584/// │ [Worker Dispatcher] ←────────── fetch_work_item │
585/// │ Execute Activity │
586/// │ ↓ │
587/// │ Completion ────────────────────► Orchestrator Queue │
588/// │ │
589/// └─────────────────────────────────────────────────────────────┘
590/// ↕
591/// ┌────────────────────────┐
592/// │ PROVIDER (Storage) │
593/// │ ┌──────────────────┐ │
594/// │ │ History (Events) │ │
595/// │ ├──────────────────┤ │
596/// │ │ Orch Queue │ │
597/// │ │ Worker Queue │ │
598/// │ └──────────────────┘ │
599/// └────────────────────────┘
600/// ```
601///
602/// # Design Principles
603///
604/// **Providers should be storage abstractions, NOT orchestration engines:**
605/// - ✅ Store and retrieve events as opaque data (don't inspect Event contents)
606/// - ✅ Manage queues and locks (generic queue operations)
607/// - ✅ Provide ACID guarantees where possible
608/// - ❌ DON'T interpret orchestration semantics (use ExecutionMetadata from runtime)
609/// - ❌ DON'T create events (runtime creates all events)
610/// - ❌ DON'T make orchestration decisions (runtime decides control flow)
611///
612/// # Multi-Execution Support (ContinueAsNew)
613///
614/// Orchestration instances can have multiple executions (execution_id 1, 2, 3, ...):
615/// - Each execution has its own event history
616/// - `read()` should return the LATEST execution's history
617/// - Provider tracks current_execution_id to know which execution is active
/// - After ContinueAsNew, the runtime calls ack_orchestration_item with the next
///   execution_id; create the new execution record/entry idempotently (see ExecutionMetadata docs)
619///
620/// **Example:**
621/// ```text
622/// Instance "order-123":
623/// Execution 1: [OrchestrationStarted, ActivityScheduled, OrchestrationContinuedAsNew]
624/// Execution 2: [OrchestrationStarted, ActivityScheduled, OrchestrationCompleted]
625///
626/// read("order-123") → Returns Execution 2's events (latest)
627/// read_with_execution("order-123", 1) → Returns Execution 1's events
628/// latest_execution_id("order-123") → Returns Some(2)
629/// ```
630///
631/// # Concurrency Model
632///
/// The runtime runs 2 background dispatchers polling your queues:
/// 1. **Orchestration Dispatcher**: Polls fetch_orchestration_item() continuously
/// 2. **Work Dispatcher**: Polls fetch_work_item() continuously
///
/// (There is no timer dispatcher: TimerFired items wait in the orchestrator
/// queue with delayed visibility until `visible_at <= now()`.)
637///
638/// **Your implementation must be thread-safe** and support concurrent access from multiple dispatchers.
639///
640/// # Peek-Lock Pattern (Critical)
641///
642/// All dequeue operations use peek-lock semantics:
643/// 1. **Peek**: Select and lock a message (message stays in queue)
644/// 2. **Process**: Runtime processes the locked message
645/// 3. **Ack**: Delete message from queue (success) OR
646/// 4. **Abandon**: Release lock for retry (failure)
647///
648/// Benefits:
649/// - At-least-once delivery (messages survive crashes)
650/// - Automatic retry on worker failure (lock expires)
651/// - Prevents duplicate processing (locked messages invisible to others)
652///
653/// # Transactional Guarantees
654///
655/// **`ack_orchestration_item()` is the atomic boundary:**
656/// - ALL operations in ack must succeed or fail together
657/// - History append + queue enqueues + lock release = atomic
658/// - If commit fails, entire turn is retried
659/// - This ensures exactly-once semantics for orchestration turns
660///
661/// # Queue Message Flow
662///
663/// **Orchestrator Queue:**
664/// - Inputs: StartOrchestration, ActivityCompleted/Failed, TimerFired, ExternalRaised, SubOrchCompleted/Failed, CancelInstance, ContinueAsNew
665/// - Output: Processed by orchestration dispatcher → ack_orchestration_item
666/// - Batching: All messages for an instance processed together
667/// - Timers: TimerFired items are enqueued with `visible_at = fire_at_ms` for delayed visibility
668///
669/// **Worker Queue:**
670/// - Inputs: ActivityExecute (from ack_orchestration_item)
671/// - Output: Processed by work dispatcher → ActivityCompleted/Failed to orch queue
672/// - Batching: One message at a time (activities executed independently)
673///
674/// **Note:** There is no separate timer queue. Timers are handled by enqueuing TimerFired items
675/// directly to the orchestrator queue with delayed visibility (`visible_at` set to `fire_at_ms`).
676/// The orchestrator dispatcher processes them when `visible_at <= now()`.
677///
678/// # Instance Metadata Management
679///
680/// Providers typically maintain metadata about instances:
681/// - instance_id (primary key)
682/// - orchestration_name, orchestration_version
683/// - current_execution_id (for multi-execution support)
684/// - status, output (optional, for quick queries)
685/// - created_at, updated_at timestamps
686///
687/// This metadata is updated via:
688/// - `enqueue_for_orchestrator()` with StartOrchestration
689/// - `ack_orchestration_item()` with history changes
690/// - `ExecutionMetadata` for status/output updates
691///
692/// # Required vs Optional Methods
693///
694/// **REQUIRED** (must implement):
695/// - fetch_orchestration_item, ack_orchestration_item, abandon_orchestration_item
696/// - read, append_with_execution
697/// - enqueue_for_worker, fetch_work_item, ack_work_item
698/// - enqueue_for_orchestrator
699///
700/// **OPTIONAL** (has defaults):
701/// - latest_execution_id, read_with_execution
702/// - list_instances, list_executions
703///
704/// # Recommended Database Schema (SQL Example)
705///
706/// ```sql
707/// -- Instance metadata
708/// CREATE TABLE instances (
709/// instance_id TEXT PRIMARY KEY,
710/// orchestration_name TEXT NOT NULL,
711/// orchestration_version TEXT, -- NULLable, set by runtime via metadata
712/// current_execution_id INTEGER DEFAULT 1,
713/// created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
714/// );
715///
716/// -- Execution tracking
717/// CREATE TABLE executions (
718/// instance_id TEXT NOT NULL,
719/// execution_id INTEGER NOT NULL,
720/// status TEXT DEFAULT 'Running',
721/// output TEXT,
722/// started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
723/// completed_at TIMESTAMP,
724/// PRIMARY KEY (instance_id, execution_id)
725/// );
726///
727/// -- Event history (append-only)
728/// CREATE TABLE history (
729/// instance_id TEXT NOT NULL,
730/// execution_id INTEGER NOT NULL,
731/// event_id INTEGER NOT NULL,
732/// event_type TEXT NOT NULL,
733/// event_data TEXT NOT NULL, -- JSON serialized Event
734/// created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
735/// PRIMARY KEY (instance_id, execution_id, event_id)
736/// );
737///
738/// -- Orchestrator queue (peek-lock)
739/// CREATE TABLE orchestrator_queue (
740/// id INTEGER PRIMARY KEY AUTOINCREMENT,
741/// instance_id TEXT NOT NULL,
742/// work_item TEXT NOT NULL, -- JSON serialized WorkItem
743/// visible_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
744/// lock_token TEXT,
745/// locked_until TIMESTAMP,
746/// created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
747/// );
748///
749/// -- Worker queue (peek-lock with visibility control)
750/// CREATE TABLE worker_queue (
751/// id INTEGER PRIMARY KEY AUTOINCREMENT,
752/// work_item TEXT NOT NULL,
753/// visible_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
754/// lock_token TEXT,
755/// locked_until TIMESTAMP,
756/// attempt_count INTEGER NOT NULL DEFAULT 0,
757/// created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
758/// );
759///
760/// -- Instance-level locks (CRITICAL: Prevents concurrent processing)
761/// CREATE TABLE instance_locks (
762/// instance_id TEXT PRIMARY KEY,
763/// lock_token TEXT NOT NULL,
764/// locked_until INTEGER NOT NULL, -- Unix timestamp (milliseconds)
765/// locked_at INTEGER NOT NULL -- Unix timestamp (milliseconds)
766/// );
767/// ```
768///
769/// # Implementing a New Provider: Checklist
770///
771/// 1. ✅ **Storage Layer**: Choose backing store (PostgreSQL, Redis, DynamoDB, etc.)
772/// 2. ✅ **Serialization**: Use serde_json for Event and WorkItem (or compatible format)
773/// 3. ✅ **Instance Locking**: Implement instance-level locks to prevent concurrent processing
774/// 4. ✅ **Locking**: Implement peek-lock with unique tokens and expiration
775/// 5. ✅ **Transactions**: Ensure `ack_orchestration_item` is atomic
776/// 6. ✅ **Indexes**: Add indexes on `instance_id`, `lock_token`, `visible_at` for orchestrator queue
777/// 7. ✅ **Delayed Visibility**: Support `visible_at` timestamps for TimerFired items in orchestrator queue
778/// 8. ✅ **Testing**: Use `tests/sqlite_provider_validations.rs` as a template, or see `docs/provider-implementation-guide.md`
779/// 9. ✅ **Multi-execution**: Support execution_id partitioning for ContinueAsNew
780/// 10. ⚠️ **DO NOT**: Inspect event contents (use ExecutionMetadata)
781/// 11. ⚠️ **DO NOT**: Create events (runtime owns event creation)
782/// 12. ⚠️ **DO NOT**: Make orchestration decisions (runtime owns logic)
783///
784/// # Example: Minimal Redis Provider Sketch
785///
786/// ```ignore
787/// struct RedisProvider {
788/// client: redis::Client,
789/// lock_timeout: Duration,
790/// }
791///
792/// impl Provider for RedisProvider {
793/// async fn fetch_orchestration_item(&self) -> Option<OrchestrationItem> {
794/// // 1. RPOPLPUSH from "orch_queue" to "orch_processing"
795/// let instance = self.client.rpoplpush("orch_queue", "orch_processing")?;
796///
797/// // 2. Load history from "history:{instance}:{exec_id}" (sorted set)
798/// let exec_id = self.client.get(&format!("instance:{instance}:exec_id")).unwrap_or(1);
799/// let history = self.client.zrange(&format!("history:{instance}:{exec_id}"), 0, -1);
800///
801/// // 3. Generate lock token and set expiration
802/// let lock_token = uuid::Uuid::new_v4().to_string();
803/// self.client.setex(&format!("lock:{lock_token}"), self.lock_timeout.as_secs() as usize, &instance);
804///
805/// Some(OrchestrationItem { instance, history, lock_token, ... })
806/// }
807///
808/// async fn ack_orchestration_item(..., metadata: ExecutionMetadata) -> Result<(), String> {
809/// let mut pipe = redis::pipe();
810/// pipe.atomic(); // Use Redis transaction
811///
812/// // Append history (ZADD to sorted set with event_id as score)
813/// for event in history_delta {
814/// pipe.zadd(&format!("history:{instance}:{exec_id}"), event_json, event.event_id());
815/// }
816///
817/// // Update metadata (HSET)
818/// if let Some(status) = &metadata.status {
819/// pipe.hset(&format!("exec:{instance}:{exec_id}"), "status", status);
820/// pipe.hset(&format!("exec:{instance}:{exec_id}"), "output", &metadata.output);
821/// }
822///
823/// // Create next execution if needed
824/// if metadata.create_next_execution {
825/// pipe.incr(&format!("instance:{instance}:exec_id"), 1);
826/// }
827///
828/// // Enqueue worker/orchestrator items
829/// for item in worker_items { pipe.lpush("worker_queue", serialize(item)); }
830/// for item in orch_items {
831/// let visible_at = match &item {
832/// WorkItem::TimerFired { fire_at_ms, .. } => *fire_at_ms,
833/// _ => now()
834/// };
835/// pipe.zadd("orch_queue", serialize(item), visible_at);
836/// }
837///
838/// // Release lock
839/// pipe.del(&format!("lock:{lock_token}"));
840/// pipe.lrem("orch_processing", 1, instance);
841///
842/// pipe.execute()?;
843/// Ok(())
844/// }
845/// }
846/// ```
847///
848/// # Design Principles
849///
850/// **Providers should be storage abstractions, NOT orchestration engines:**
851/// - ✅ Store and retrieve events as opaque data (don't inspect Event contents)
852/// - ✅ Manage queues and locks (generic queue operations)
853/// - ✅ Provide ACID guarantees where possible
854/// - ❌ DON'T interpret orchestration semantics (use ExecutionMetadata from runtime)
855/// - ❌ DON'T create events (runtime creates all events)
856/// - ❌ DON'T make orchestration decisions (runtime decides control flow)
857///
858/// # Multi-Execution Support (ContinueAsNew)
859///
860/// Orchestration instances can have multiple executions (execution_id 1, 2, 3, ...):
861/// - Each execution has its own event history
862/// - `read()` should return the LATEST execution's history
863/// - Provider tracks current_execution_id to know which execution is active
864/// - When metadata.create_next_execution=true, create a new execution record/entry
865///
866/// **Example:**
867/// ```text
868/// Instance "order-123":
869/// Execution 1: [OrchestrationStarted, ActivityScheduled, OrchestrationContinuedAsNew]
870/// Execution 2: [OrchestrationStarted, ActivityScheduled, OrchestrationCompleted]
871///
872/// read("order-123") → Returns Execution 2's events (latest)
873/// read_with_execution("order-123", 1) → Returns Execution 1's events
874/// latest_execution_id("order-123") → Returns Some(2)
/// list_executions("order-123") → Returns vec![1, 2]
876/// ```
877///
878/// # Concurrency Model
879///
880/// The runtime runs 3 background dispatchers polling your queues:
881/// 1. **Orchestration Dispatcher**: Polls fetch_orchestration_item() continuously
882/// 2. **Work Dispatcher**: Polls fetch_work_item() continuously
883/// 3. **Timer Dispatcher**: Polls dequeue_timer_peek_lock() continuously
884///
885/// **Your implementation must be thread-safe** and support concurrent access from multiple dispatchers.
886///
887/// # Peek-Lock Pattern (Critical)
888///
889/// All dequeue operations use peek-lock semantics:
890/// 1. **Peek**: Select and lock a message (message stays in queue)
891/// 2. **Process**: Runtime processes the locked message
892/// 3. **Ack**: Delete message from queue (success) OR
893/// 4. **Abandon**: Release lock for retry (failure)
894///
895/// Benefits:
896/// - At-least-once delivery (messages survive crashes)
897/// - Automatic retry on worker failure (lock expires)
898/// - Prevents duplicate processing (locked messages invisible to others)
899///
900/// # Transactional Guarantees
901///
902/// **`ack_orchestration_item()` is the atomic boundary:**
903/// - ALL operations in ack must succeed or fail together
904/// - History append + queue enqueues + lock release = atomic
905/// - If commit fails, entire turn is retried
906/// - This ensures exactly-once semantics for orchestration turns
907///
908/// # Queue Message Flow
909///
910/// **Orchestrator Queue:**
911/// - Inputs: StartOrchestration, ActivityCompleted/Failed, TimerFired, ExternalRaised, SubOrchCompleted/Failed, CancelInstance, ContinueAsNew
912/// - Output: Processed by orchestration dispatcher → ack_orchestration_item
913/// - Batching: All messages for an instance processed together
914/// - Ordering: FIFO per instance preferred
915/// - Timers: TimerFired items are enqueued with `visible_at = fire_at_ms` for delayed visibility
916///
917/// **Worker Queue:**
918/// - Inputs: ActivityExecute (from ack_orchestration_item)
919/// - Output: Processed by work dispatcher → ActivityCompleted/Failed to orch queue
920/// - Batching: One message at a time (activities executed independently)
921/// - Ordering: FIFO preferred but not required
922///
923/// **Note:** There is no separate timer queue. Timers are handled by enqueuing TimerFired items
924/// directly to the orchestrator queue with delayed visibility (`visible_at` set to `fire_at_ms`).
925/// The orchestrator dispatcher processes them when `visible_at <= now()`.
926///
927/// # Instance Metadata Management
928///
929/// Providers typically maintain metadata about instances:
930/// - instance_id (primary key)
931/// - orchestration_name, orchestration_version (from StartOrchestration)
932/// - current_execution_id (for multi-execution support)
933/// - status, output (optional, from ExecutionMetadata)
934/// - created_at, updated_at timestamps
935///
936/// This metadata is updated via:
937/// - `ack_orchestration_item()` with ExecutionMetadata (creates instance and updates status/output)
938/// - `ack_orchestration_item()` with metadata.create_next_execution (increments current_execution_id)
939///
940/// **Note:** Instances are NOT created in `enqueue_for_orchestrator()`. Instance creation happens
941/// when the runtime acknowledges the first turn via `ack_orchestration_item()` with metadata.
942///
943/// # Error Handling Philosophy
944///
945/// - **Transient errors** (busy, timeout): Return error, let runtime retry
946/// - **Invalid tokens**: Return Ok (idempotent - already processed)
947/// - **Corruption** (missing instance, invalid data): Return error
948/// - **Concurrency conflicts**: Handle via locks, return error if deadlock
949///
950/// # Required vs Optional Methods
951///
952/// **REQUIRED** (must implement):
953/// - fetch_orchestration_item, ack_orchestration_item, abandon_orchestration_item
954/// - read, append_with_execution
955/// - enqueue_for_worker, fetch_work_item, ack_work_item
956/// - enqueue_for_orchestrator
957///
958/// **OPTIONAL** (has defaults):
959/// - latest_execution_id, read_with_execution
960/// - list_instances, list_executions
961///
962/// # Testing Your Provider
963///
964/// See `tests/sqlite_provider_validations.rs` for comprehensive provider validation tests:
965/// - Basic enqueue/dequeue operations
966/// - Transactional atomicity
967/// - Instance locking correctness (including multi-threaded tests)
968/// - Lock expiration and redelivery
969/// - Multi-execution support
970/// - Execution status and output persistence
971///
972/// All tests use the Provider trait directly (not runtime), so they're portable to new providers.
973/// See `docs/provider-implementation-guide.md` for detailed implementation guidance including instance locks.
974///
975/// Core provider trait for runtime orchestration operations.
976///
977/// This trait defines the essential methods required for durable orchestration execution.
978/// It focuses on runtime-critical operations: fetching work items, processing history,
979/// and managing queues. Management and observability features are provided through
980/// optional capability traits.
981///
982/// # Capability Discovery
983///
984/// Providers can implement additional capability traits (like `ProviderAdmin`)
985/// to expose richer features. The `Client` automatically discovers these capabilities
986/// through the `as_management_capability()` method.
987///
988/// # Implementation Guide for LLMs
989///
990/// When implementing a new provider, focus on these core methods:
991///
992/// ## Required Methods (9 total)
993///
994/// 1. **Orchestration Processing (3 methods)**
995/// - `fetch_orchestration_item()` - Atomic batch processing
996/// - `ack_orchestration_item()` - Commit processing results
997/// - `abandon_orchestration_item()` - Release locks and retry
998///
/// 2. **History Access (2 methods)**
///    - `read()` - Get event history for status checks
///    - `append_with_execution()` - Append events to a specific execution
///
/// 3. **Worker Queue (3 methods)**
///    - `enqueue_for_worker()` - Enqueue activity work items
///    - `fetch_work_item()` - Get activity work items
///    - `ack_work_item()` - Acknowledge activity completion
///
/// 4. **Orchestrator Queue (1 method)**
///    - `enqueue_for_orchestrator()` - Enqueue control messages (including TimerFired with delayed visibility)
1008///
1009/// ## Optional Management Methods (2 methods)
1010///
1011/// These are included for backward compatibility but will be moved to
1012/// `ProviderAdmin` in future versions:
1013///
1014/// - `list_instances()` - List all instance IDs
1015/// - `list_executions()` - List execution IDs for an instance
1016///
1017/// # Capability Detection
1018///
1019/// Implement `as_management_capability()` to expose management features:
1020///
1021/// ```ignore
1022/// impl Provider for MyProvider {
1023/// // ... implement required methods
1024///
1025/// fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1026/// Some(self as &dyn ProviderAdmin)
1027/// }
1028/// }
1029///
1030/// impl ProviderAdmin for MyProvider {
1031/// // ... implement management methods
1032/// }
1033/// ```
1034///
/// # Testing Your Provider
///
/// See the "Testing Your Provider" section earlier in this documentation for the
/// full list of covered scenarios. In short: `tests/sqlite_provider_validations.rs`
/// contains comprehensive validation tests that use the Provider trait directly
/// (not the runtime), so they are portable to new providers, and
/// `docs/provider-implementation-guide.md` gives detailed implementation guidance
/// including instance locks.
1047///
1048#[async_trait::async_trait]
1049#[allow(clippy::too_many_arguments)]
1050pub trait Provider: Any + Send + Sync {
1051 // ===== Provider Identity =====
1052
    /// Returns the name of this provider implementation.
    ///
    /// Used for logging and diagnostics only. Override to provide a meaningful
    /// name (e.g., `"sqlite"`).
    ///
    /// Default: `"unknown"`
    fn name(&self) -> &str {
        "unknown"
    }
1061
    /// Returns the version of this provider implementation.
    ///
    /// Used for logging and diagnostics only. Override to report the real
    /// version; a semver string such as `env!("CARGO_PKG_VERSION")` is
    /// conventional.
    ///
    /// Default: `"0.0.0"`
    fn version(&self) -> &str {
        "0.0.0"
    }
1070
1071 // ===== Core Atomic Orchestration Methods (REQUIRED) =====
1072 // These three methods form the heart of reliable orchestration execution.
1073 //
1074 // ⚠️ CRITICAL ID GENERATION CONTRACT:
1075 //
1076 // The provider MUST NOT generate execution_id or event_id values.
1077 // All IDs are generated by the runtime and passed to the provider:
1078 // - execution_id: Passed to ack_orchestration_item()
1079 // - event_id: Set in each Event in the history_delta
1080 //
1081 // The provider's role is to STORE these IDs, not generate them.
1082
1083 /// Fetch the next orchestration work item atomically.
1084 ///
1085 /// # What This Does
1086 ///
1087 /// 1. **Select an instance to process**: Find the next available message in orchestrator queue that is not locked
1088 /// 2. **Acquire instance-level lock**: Atomically claim the instance lock to prevent concurrent processing
1089 /// 3. **Lock ALL messages** for that instance (batch processing)
1090 /// 4. **Load history**: Get complete event history for the current execution
1091 /// 5. **Load metadata**: Get orchestration_name, version, execution_id
1092 /// 6. **Return locked batch**: Provider must prevent other processes from touching this instance
1093 ///
1094 /// # ⚠️ CRITICAL: Instance-Level Locking
1095 ///
1096 /// **You MUST acquire an instance-level lock BEFORE fetching messages.** This prevents concurrent
1097 /// dispatchers from processing the same instance simultaneously, which would cause race conditions
1098 /// and data corruption.
1099 ///
1100 /// **Required Implementation Pattern:**
1101 /// 1. Find available instance (check no other lock is held for it)
1102 /// 2. Atomically acquire instance lock (with conflict handling)
1103 /// 3. Verify lock acquisition succeeded
1104 /// 4. Only then proceed to lock and fetch messages
1105 ///
1106 /// See `docs/provider-implementation-guide.md` for detailed implementation guidance.
1107 ///
1108 /// # Peek-Lock Semantics
1109 ///
1110 /// - Messages remain in queue until `ack_orchestration_item()` is called
1111 /// - Generate a unique `lock_token` (e.g., UUID)
1112 /// - Set `locked_until` timestamp (e.g., now + 30 seconds)
1113 /// - If lock expires before ack, messages become available again (automatic retry)
1114 ///
1115 /// # Visibility Filtering
1116 ///
1117 /// Only return messages where `visible_at <= now()`:
1118 /// - Normal messages: visible immediately
1119 /// - Delayed messages: `visible_at = now + delay` (used for timer backpressure)
1120 ///
1121 /// # Instance Batching
1122 ///
1123 /// **CRITICAL:** All messages in the batch must belong to the SAME instance.
1124 ///
1125 /// Implementation steps:
1126 /// 1. Find the first visible, unlocked instance in the orchestrator queue
1127 /// 2. Atomically acquire the instance lock (prevent concurrent processing)
1128 /// 3. Lock ALL queued messages for that instance
1129 /// 4. Fetch all locked messages for processing
1130 ///
1131 /// # History Loading
1132 ///
1133 /// - For new instances (no history yet): return empty Vec
1134 /// - For existing instances: return ALL events for current execution_id, ordered by event_id
1135 /// - For multi-execution instances: return ONLY the LATEST execution's history
1136 ///
1137 /// SQLite example:
1138 /// ```text
1139 /// // Get current execution ID
1140 /// let exec_id = SELECT current_execution_id FROM instances WHERE instance_id = ?
1141 ///
1142 /// // Load history for that execution
1143 /// SELECT event_data FROM history
1144 /// WHERE instance_id = ? AND execution_id = ?
1145 /// ORDER BY event_id
1146 /// ```
1147 ///
    /// # Error Handling
    ///
    /// Don't panic on transient errors - return `Ok(None)` (or `Err(ProviderError)`
    /// for storage failures) and let the dispatcher retry.
    /// Only panic on unrecoverable errors (e.g., corrupted data or schema).
    ///
    /// # Concurrency
    ///
    /// This method is called continuously by the orchestration dispatcher.
    /// Must be thread-safe and handle concurrent calls gracefully.
    ///
    /// # Example Implementation Pattern
    ///
    /// ```ignore
    /// async fn fetch_orchestration_item(&self, ...) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
    ///     let tx = begin_transaction()?;
    ///
    ///     // Step 1: Find next available instance (check instance_locks)
    ///     let instance_id = SELECT q.instance_id FROM orch_queue q
    ///         LEFT JOIN instance_locks il ON q.instance_id = il.instance_id
    ///         WHERE q.visible_at <= now()
    ///           AND (il.instance_id IS NULL OR il.locked_until <= now())
    ///         ORDER BY q.id LIMIT 1;
    ///
    ///     if instance_id.is_none() { return Ok(None); }
    ///
    ///     // Step 2: Atomically acquire instance lock
    ///     let lock_token = generate_uuid();
    ///     let lock_result = INSERT INTO instance_locks (instance_id, lock_token, locked_until, locked_at)
    ///         VALUES (?, ?, ?, ?)
    ///         ON CONFLICT(instance_id) DO UPDATE
    ///         SET lock_token = excluded.lock_token, locked_until = excluded.locked_until
    ///         WHERE locked_until <= excluded.locked_at;
    ///
    ///     if lock_result.rows_affected == 0 {
    ///         // Lock acquisition failed - another dispatcher has the lock
    ///         return Ok(None);
    ///     }
    ///
    ///     // Step 3: Lock all messages for this instance
    ///     UPDATE orch_queue SET lock_token = ?, locked_until = now() + 30s
    ///     WHERE instance_id = ? AND visible_at <= now();
    ///
    ///     // Step 4: Fetch locked messages
    ///     let messages = SELECT work_item FROM orch_queue WHERE lock_token = ?;
    ///
    ///     // Step 5: Load instance metadata
    ///     let (name, version, exec_id) = SELECT ... FROM instances WHERE instance_id = ?;
    ///
    ///     // Step 6: Load history for current execution
    ///     let history = SELECT event_data FROM history
    ///         WHERE instance_id = ? AND execution_id = ?
    ///         ORDER BY event_id;
    ///
    ///     commit_transaction();
    ///     Ok(Some((OrchestrationItem { instance, orchestration_name, execution_id, version, history, messages }, lock_token, attempt_count)))
    /// }
    /// ```
    ///
    /// # Return Value
    ///
    /// * `Ok(Some((item, lock_token, attempt_count)))` - Orchestration item with lock token and attempt count
    ///   - `item`: The orchestration item to process
    ///   - `lock_token`: Unique token for ack/abandon operations
    ///   - `attempt_count`: Number of times this item has been fetched (for poison detection)
    /// * `Ok(None)` - No work available (dispatcher will sleep and retry)
    /// * `Err(ProviderError)` - Storage error
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        poll_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError>;
1225
1226 /// Acknowledge successful orchestration processing atomically.
1227 ///
1228 /// This is the most critical method - it commits all changes from an orchestration turn atomically.
1229 ///
1230 /// # What This Does (ALL must be atomic)
1231 ///
1232 /// 1. **Validate lock token**: Verify instance lock is still valid and matches lock_token
1233 /// 2. **Remove instance lock**: Release the instance lock (processing complete)
1234 /// 3. **Append new events** to history for current execution
1235 /// 4. **Update execution metadata** (status, output) using pre-computed metadata
1236 /// 5. **Create new execution** if metadata.create_next_execution=true (ContinueAsNew)
1237 /// 6. **Enqueue worker_items** to worker queue (activity executions)
1238 /// 7. **Enqueue orchestrator_items** to orchestrator queue (completions, new instances, TimerFired with delayed visibility)
1239 /// 8. **Remove acknowledged messages** from orchestrator queue (release lock)
1240 ///
1241 /// # Atomicity Requirements
1242 ///
1243 /// **CRITICAL:** All 8 operations above must succeed or fail together.
1244 ///
1245 /// - **If ANY operation fails**: Roll back all changes
1246 /// - **If all operations succeed**: All changes are durable and visible
1247 ///
1248 /// This prevents:
1249 /// - Duplicate activity execution (history saved but worker item lost)
1250 /// - Lost completions (worker item enqueued but history not saved)
1251 /// - Orphaned locks (messages deleted but history append failed)
1252 /// - Stale instance locks (instance lock not removed)
1253 ///
1254 /// # Parameters
1255 ///
1256 /// * `lock_token` - Token from fetch_orchestration_item() - identifies locked messages
1257 /// * `history_delta` - New events to append (runtime assigns event_ids, provider stores as-is)
1258 /// * `worker_items` - Activity executions to enqueue (WorkItem::ActivityExecute)
1259 /// * `orchestrator_items` - Orchestrator messages to enqueue (completions, starts, TimerFired with delayed visibility)
1260 /// * `metadata` - Pre-computed execution state (DO NOT inspect events yourself!)
1261 ///
1262 /// # Event Storage (history_delta)
1263 ///
1264 /// **Important:** Store events exactly as provided. DO NOT:
1265 /// - Modify event_id (runtime assigns these)
1266 /// - Inspect event contents to make decisions
1267 /// - Filter or reorder events
1268 ///
1269 /// SQLite example:
1270 /// ```text
1271 /// for event in &history_delta {
1272 /// let event_json = serde_json::to_string(&event)?;
1273 /// let event_type = extract_discriminant_name(&event); // For indexing only
1274 /// INSERT INTO history (instance_id, execution_id, event_id, event_data, event_type)
1275 /// VALUES (?, ?, event.event_id(), event_json, event_type)
1276 /// }
1277 /// ```
1278 ///
1279 /// # Metadata Storage (ExecutionMetadata)
1280 ///
1281 /// **The runtime has ALREADY inspected events and computed metadata.**
1282 /// Provider just stores it:
1283 ///
1284 /// ```text
1285 /// // Update execution status/output from metadata
1286 /// if let Some(status) = &metadata.status {
1287 /// UPDATE executions
1288 /// SET status = ?, output = ?, completed_at = CURRENT_TIMESTAMP
1289 /// WHERE instance_id = ? AND execution_id = ?
1290 /// }
1291 ///
1292 /// // Create next execution if requested
1293 /// if metadata.create_next_execution {
1294 /// if let Some(next_id) = metadata.next_execution_id {
1295 /// INSERT INTO executions (instance_id, execution_id, status)
1296 /// VALUES (?, next_id, 'Running')
1297 ///
1298 /// UPDATE instances SET current_execution_id = next_id
1299 /// }
1300 /// }
1301 /// ```
1302 ///
1303 /// # Queue Item Enqueuing
1304 ///
1305 /// Worker items and orchestrator items must be enqueued within the same transaction:
1306 ///
1307 /// ```text
1308 /// // Worker queue (no special handling)
1309 /// for item in worker_items {
1310 /// INSERT INTO worker_queue (work_item) VALUES (serde_json::to_string(&item))
1311 /// }
1312 ///
1313 /// // Orchestrator queue (may have delayed visibility for TimerFired)
1314 /// for item in orchestrator_items {
1315 /// let visible_at = match &item {
1316 /// WorkItem::TimerFired { fire_at_ms, .. } => *fire_at_ms, // Delayed visibility
1317 /// _ => now() // Immediate visibility
1318 /// };
1319 ///
1320 /// INSERT INTO orchestrator_queue (instance_id, work_item, visible_at)
1321 /// VALUES (extract_instance(&item), serde_json::to_string(&item), visible_at)
1322 /// }
1323 /// ```
1324 ///
1325 /// # Lock Release
1326 ///
1327 /// Remove all acknowledged messages associated with this lock_token.
1328 ///
1329 /// # Error Handling
1330 ///
1331 /// - Return `Ok(())` if all operations succeeded
1332 /// - Return `Err(msg)` if any operation failed (all changes rolled back)
1333 /// - On error, runtime will call `abandon_orchestration_item()` to release lock
1334 ///
1335 /// # Special Cases
1336 ///
1337 /// **StartOrchestration in orchestrator_items:**
1338 /// - May need to create instance metadata record/entry (idempotent create-if-not-exists)
1339 /// - Should create execution record/entry with ID=1 if new instance
1340 ///
1341 /// **Empty history_delta:**
1342 /// - Valid case (e.g., terminal instance being acked with no changes)
1343 /// - Still process queues and release lock
1344 ///
1345 /// # Parameters
1346 ///
1347 /// * `lock_token` - Token from `fetch_orchestration_item` identifying the batch
1348 /// * `execution_id` - **The execution ID this history belongs to** (runtime decides this)
1349 /// * `history_delta` - Events to append to the specified execution
1350 /// * `worker_items` - Activity work items to enqueue
1351 /// * `orchestrator_items` - Orchestration work items to enqueue (StartOrchestration, ContinueAsNew, TimerFired, etc.)
1352 /// - TimerFired items should be enqueued with `visible_at = fire_at_ms` for delayed visibility
1353 /// * `metadata` - Pre-computed execution metadata (status, output)
1354 ///
1355 /// # TimerFired Items
1356 ///
1357 /// When `TimerFired` items are present in `orchestrator_items`, they should be enqueued with
1358 /// delayed visibility using the `fire_at_ms` field for the `visible_at` timestamp.
1359 /// This allows timers to fire at the correct logical time.
1360 ///
1361 /// # execution_id Parameter
1362 ///
1363 /// The `execution_id` parameter tells the provider **which execution this history belongs to**.
1364 /// The runtime is responsible for:
1365 /// - Deciding when to create new executions (e.g., for ContinueAsNew)
1366 /// - Managing execution ID sequencing
1367 /// - Ensuring each execution has its own isolated history
1368 ///
1369 /// The provider should:
1370 /// - **Create the execution record if it doesn't exist** (idempotent create-if-not-exists)
1371 /// - Append `history_delta` to the specified `execution_id`
1372 /// - Update the instance's current execution pointer if this execution_id is newer
1373 /// - NOT inspect WorkItems to decide execution IDs
1374 ///
1375 /// # SQLite Implementation Pattern
1376 ///
1377 /// ```text
1378 /// async fn ack_orchestration_item(execution_id, ...) -> Result<(), String> {
1379 /// let tx = begin_transaction()?;
1380 ///
1381 /// // Step 1: Validate lock token and get instance_id from instance_locks
1382 /// let instance_id = SELECT instance_id FROM instance_locks
1383 /// WHERE lock_token = ? AND locked_until > now();
1384 /// if instance_id.is_none() {
1385 /// return Err("Invalid or expired lock token");
1386 /// }
1387 ///
1388 /// // Step 2: Remove instance lock (processing complete)
1389 /// DELETE FROM instance_locks WHERE instance_id = ? AND lock_token = ?;
1390 ///
1391 /// // Step 3: Create execution record if it doesn't exist (idempotent)
1392 /// INSERT OR IGNORE INTO executions (instance_id, execution_id, status)
1393 /// VALUES (?, ?, 'Running');
1394 ///
1395 /// // Step 4: Append history to the SPECIFIED execution_id
1396 /// for event in history_delta {
1397 /// INSERT INTO history (...) VALUES (instance_id, execution_id, event.event_id(), ...)
1398 /// }
1399 ///
1400 /// // Step 5: Update execution metadata (no event inspection!)
1401 /// if let Some(status) = &metadata.status {
1402 /// UPDATE executions SET status = ?, output = ?, completed_at = NOW()
1403 /// WHERE instance_id = ? AND execution_id = ?
1404 /// }
1405 ///
1406 /// // Step 6: Update current_execution_id if this is a newer execution
1407 /// UPDATE instances SET current_execution_id = GREATEST(current_execution_id, ?)
1408 /// WHERE instance_id = ?;
1409 ///
1410 /// // Step 7: Enqueue worker/orchestrator items
1411 /// // Worker items go to worker queue (populate identity columns)
1412 /// for item in worker_items {
1413 /// if let WorkItem::ActivityExecute { instance, execution_id, id, .. } = &item {
1414 /// INSERT INTO worker_queue (work_item, instance_id, execution_id, activity_id)
1415 /// VALUES (serialize(item), instance, execution_id, id)
1416 /// }
1417 /// }
1418 ///
1419 /// // Orchestrator items go to orchestrator queue (TimerFired uses fire_at_ms for visible_at)
1420 /// for item in orchestrator_items {
1421 /// let visible_at = match &item {
1422 /// WorkItem::TimerFired { fire_at_ms, .. } => *fire_at_ms,
1423 /// _ => now()
1424 /// };
1425 /// INSERT INTO orchestrator_queue (instance_id, work_item, visible_at)
1426 /// VALUES (extract_instance(&item), serialize(item), visible_at)
1427 /// }
1428 ///
1429 /// // Step 8: Delete cancelled activities from worker queue
1430 /// DELETE FROM worker_queue
1431 /// WHERE (instance_id = ? AND execution_id = ? AND activity_id = ?)
1432 /// OR (instance_id = ? AND execution_id = ? AND activity_id = ?)
1433 /// ...
1434 ///
1435 /// // Step 9: Release lock: DELETE FROM orch_queue WHERE lock_token = ?;
1436 ///
1437 /// commit_transaction()?;
1438 /// Ok(())
1439 /// }
1440 /// ```
1441 async fn ack_orchestration_item(
1442 &self,
1443 _lock_token: &str,
1444 _execution_id: u64,
1445 _history_delta: Vec<Event>,
1446 _worker_items: Vec<WorkItem>,
1447 _orchestrator_items: Vec<WorkItem>,
1448 _metadata: ExecutionMetadata,
1449 _cancelled_activities: Vec<ScheduledActivityIdentifier>,
1450 ) -> Result<(), ProviderError>;
1451
    /// Abandon orchestration processing (used for errors/retries).
    ///
    /// Called when orchestration processing fails (e.g., storage contention, runtime crash).
    /// The messages must be made available for reprocessing.
    ///
    /// # What This Does
    ///
    /// 1. **Clear lock_token** from messages (make available again)
    /// 2. **Remove instance lock** (release instance-level lock)
    /// 3. **Optionally delay** retry by setting visibility timestamp
    /// 4. **Preserve message order** (don't reorder or modify messages)
    ///
    /// # Parameters
    ///
    /// * `lock_token` - Token from `fetch_orchestration_item()`
    /// * `delay` - Optional delay before messages become visible again
    ///   - `None`: immediate retry (visible_at = now)
    ///   - `Some(duration)`: delayed retry (visible_at = now + duration)
    /// * `ignore_attempt` - If true, decrement the attempt count (never below 0)
    ///
    /// # Implementation Pattern
    ///
    /// ```ignore
    /// async fn abandon_orchestration_item(
    ///     lock_token: &str,
    ///     delay: Option<Duration>,
    ///     ignore_attempt: bool,
    /// ) -> Result<(), ProviderError> {
    ///     let visible_at = if let Some(delay) = delay {
    ///         now() + delay
    ///     } else {
    ///         now()
    ///     };
    ///
    ///     BEGIN TRANSACTION
    ///     // Get instance_id from instance_locks
    ///     let instance_id = SELECT instance_id FROM instance_locks WHERE lock_token = ?;
    ///
    ///     IF instance_id IS NULL:
    ///         // Invalid lock token - return error
    ///         ROLLBACK
    ///         RETURN Err(ProviderError::permanent("Invalid lock token"))
    ///
    ///     // Clear lock_token from messages
    ///     // (when ignore_attempt is true, also decrement the attempt count, floored at 0)
    ///     UPDATE orchestrator_queue
    ///     SET lock_token = NULL, locked_until = NULL, visible_at = ?
    ///     WHERE lock_token = ?;
    ///
    ///     // Remove instance lock
    ///     DELETE FROM instance_locks WHERE lock_token = ?;
    ///     COMMIT
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// # Use Cases
    ///
    /// - Storage contention → delay = Some(Duration::from_millis(50)) for backoff
    /// - Orchestration turn failed → delay = None for immediate retry
    /// - Runtime shutdown during processing → messages auto-recover when lock expires
    ///
    /// # Error Handling
    ///
    /// **Invalid lock tokens MUST return an error.** Unlike `ack_orchestration_item()`, this method
    /// should not be idempotent for invalid tokens. Invalid tokens indicate a programming error or
    /// state corruption and should be surfaced as errors.
    ///
    /// Return an error (e.g., "Invalid lock token") if the lock token is not recognized or has expired.
    async fn abandon_orchestration_item(
        &self,
        _lock_token: &str,
        _delay: Option<Duration>,
        _ignore_attempt: bool,
    ) -> Result<(), ProviderError>;
1528
    /// Read the full history for the latest execution of an instance.
    ///
    /// # What This Does
    ///
    /// Returns ALL events for the LATEST execution of an instance, ordered by event_id.
    ///
    /// # Multi-Execution Behavior
    ///
    /// For instances with multiple executions (from ContinueAsNew):
    /// - Find the latest execution_id (MAX(execution_id))
    /// - Return events for ONLY that execution
    /// - DO NOT return events from earlier executions
    ///
    /// # Return Value
    ///
    /// - `Ok(Vec<Event>)` - Events ordered by event_id ascending (event_id 1, 2, 3, ...)
    /// - `Err(ProviderError)` - Storage error
    ///
    /// NOTE(review): behavior for an unknown instance (empty vec vs. error) is not
    /// specified here — confirm against the provider implementation in use.
    ///
    /// # Usage
    ///
    /// **NOT called in runtime hot path.** Used by:
    /// - Client.get_orchestration_status() - to determine current state
    /// - Runtime.get_orchestration_descriptor() - to get orchestration metadata
    /// - Testing and validation suites
    ///
    /// The runtime hot path uses `fetch_orchestration_item()` which loads history internally.
    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError>;
1556
1557 // ===== History Reading and Testing Methods (NOT used by runtime hot path) =====
1558 // These methods are NOT called by the main runtime during orchestration execution.
1559 // They are used by testing, validation suites, client APIs, and debugging tools.
1560
    /// Read the full event history for a specific execution within an instance.
    ///
    /// **NOT called in runtime hot path.** Used by testing, validation suites, and debugging tools.
    ///
    /// # Parameters
    ///
    /// * `instance` - The ID of the orchestration instance.
    /// * `execution_id` - The specific execution ID to read history for.
    ///
    /// # Returns
    ///
    /// Vector of events in chronological order (oldest first).
    ///
    /// NOTE(review): behavior for an unknown `execution_id` (empty vec vs. error) is
    /// provider-defined — confirm before relying on it.
    ///
    /// # Implementation Example
    ///
    /// ```ignore
    /// async fn read_with_execution(&self, instance: &str, execution_id: u64) -> Result<Vec<Event>, ProviderError> {
    ///     SELECT event_data FROM history
    ///     WHERE instance_id = ? AND execution_id = ?
    ///     ORDER BY event_id
    /// }
    /// ```
    async fn read_with_execution(&self, _instance: &str, _execution_id: u64) -> Result<Vec<Event>, ProviderError>;
1584
    /// Append events to a specific execution.
    ///
    /// **NOT called in runtime hot path.** Used by testing and validation suites for fault injection
    /// and test setup. The runtime uses `ack_orchestration_item()` to append history atomically.
    ///
    /// # What This Does
    ///
    /// Add new events to the history log for a specific execution.
    ///
    /// # CRITICAL: Event ID Assignment
    ///
    /// **The runtime assigns event_ids BEFORE calling this method.**
    /// - DO NOT modify event.event_id()
    /// - DO NOT renumber events
    /// - Store events exactly as provided
    ///
    /// # Duplicate Detection
    ///
    /// Events with duplicate (instance_id, execution_id, event_id) should:
    /// - Either: Reject with error (let runtime handle)
    /// - Or: IGNORE (idempotent append)
    /// - NEVER: Overwrite existing event (corrupts history)
    ///
    /// # Parameters
    ///
    /// * `instance` - The ID of the orchestration instance.
    /// * `execution_id` - The specific execution ID to append events to.
    /// * `new_events` - Vector of events to append (event_ids already assigned).
    ///
    /// # Returns
    ///
    /// `Ok(())` on success, `Err(ProviderError)` on failure.
    ///
    /// # Implementation Example
    ///
    /// ```ignore
    /// async fn append_with_execution(
    ///     &self,
    ///     instance: &str,
    ///     execution_id: u64,
    ///     new_events: Vec<Event>,
    /// ) -> Result<(), ProviderError> {
    ///     for event in &new_events {
    ///         INSERT INTO history (instance_id, execution_id, event_id, event_data)
    ///         VALUES (?, ?, event.event_id(), serialize(event))
    ///     }
    ///     Ok(())
    /// }
    /// ```
    async fn append_with_execution(
        &self,
        _instance: &str,
        _execution_id: u64,
        _new_events: Vec<Event>,
    ) -> Result<(), ProviderError>;
1640
1641 // ===== Worker Queue Operations (REQUIRED) =====
1642 // Worker queue processes activity executions.
1643
    /// Enqueue an activity execution request.
    ///
    /// # What This Does
    ///
    /// Add a WorkItem::ActivityExecute to the worker queue for background processing.
    ///
    /// # Implementation
    ///
    /// ```ignore
    /// async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
    ///     INSERT INTO worker_queue (work_item)
    ///     VALUES (serde_json::to_string(&item)?)
    /// }
    /// ```
    ///
    /// # Locking
    ///
    /// New messages should have lock_token = NULL (available for dequeue).
    ///
    /// # Ordering
    ///
    /// FIFO order preferred but not strictly required.
    async fn enqueue_for_worker(&self, _item: WorkItem) -> Result<(), ProviderError>;
1667
    /// Dequeue a single work item with peek-lock semantics.
    ///
    /// # What This Does
    ///
    /// Fetch next available worker task.
    ///
    /// 1. Find an available task in the worker queue (visible_at <= now, not locked)
    /// 2. Lock it with a unique token
    /// 3. Return item + token (item stays in queue until ack)
    ///
    /// # Parameters
    ///
    /// - `lock_timeout`: Duration to lock the item for processing.
    /// - `poll_timeout`: Maximum time to wait for work. Provider MAY wait up to this
    ///   duration if it supports long polling, or return immediately if it doesn't.
    /// - `session`: Session routing configuration.
    ///   - `Some(config)`: Session-aware fetch — returns non-session + owned + claimable session items.
    ///   - `None`: Non-session only — returns only items with `session_id IS NULL`.
    /// - `tag_filter`: Which activity tags this worker processes (see "Tag Filtering" below).
    ///
    /// # Return Value
    ///
    /// - `Ok(Some((WorkItem, String, u32)))` - Item is locked and ready to process
    ///   - WorkItem: The work item to process
    ///   - String: Lock token for ack/abandon
    ///   - u32: Attempt count (number of times this item has been fetched)
    /// - `Ok(None)` - No work available
    /// - `Err(ProviderError)` - Storage error (allows runtime to distinguish empty queue from failures)
    ///
    /// # Concurrency
    ///
    /// Called continuously by work dispatcher. Must prevent double-dequeue.
    ///
    /// # Session Routing
    ///
    /// When `session` is `Some(config)`, the provider must filter eligible items:
    /// - Non-session items (`session_id IS NULL`): always eligible
    /// - Owned-session items: items whose `session_id` has an active session owned by `config.owner_id`
    /// - Claimable-session items: items whose `session_id` has no active session (unowned)
    ///
    /// When fetching a claimable session-bound item, atomically create or update the session
    /// record with `config.owner_id` and `config.lock_timeout`.
    ///
    /// When `session` is `None`, only non-session items (`session_id IS NULL`) are returned.
    /// Session-bound items are skipped entirely.
    ///
    /// # Tag Filtering
    ///
    /// The `tag_filter` parameter controls which activities this worker processes:
    /// - `DefaultOnly`: Only items with `tag IS NULL`
    /// - `Tags(set)`: Only items whose `tag` is in `set`
    /// - `DefaultAnd(set)`: Items with `tag IS NULL` OR `tag` in `set`
    /// - `Any`: All items regardless of tag
    /// - `None`: No items (orchestrator-only mode)
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        poll_timeout: Duration,
        session: Option<&SessionFetchConfig>,
        tag_filter: &TagFilter,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError>;
1727
    /// Acknowledge successful processing of a work item.
    ///
    /// # What This Does
    ///
    /// Atomically acknowledge worker item and optionally enqueue completion to orchestrator queue.
    ///
    /// # Purpose
    ///
    /// Ensures completion delivery and worker ack happen atomically.
    /// Prevents lost completions if enqueue succeeds but ack fails.
    /// Prevents duplicate work if ack succeeds but enqueue fails.
    ///
    /// # Parameters
    ///
    /// * `token` - Lock token from fetch_work_item
    /// * `completion` - Optional completion work item:
    ///   - `Some(WorkItem)`: Remove worker item AND enqueue completion to orchestrator queue
    ///   - `None`: Remove worker item WITHOUT enqueueing anything (used when activity was cancelled)
    ///
    /// # Error on Missing Entry
    ///
    /// **CRITICAL**: This method MUST return a non-retryable error if the locked worker work-item
    /// entry does not exist anymore (was already removed) or cannot be acknowledged as expected.
    /// This can happen when:
    /// - The activity was cancelled (the backing entry was removed/invalidated by orchestration)
    /// - The lock was stolen by another worker (lock expired and work-item was re-fetched/completed)
    /// - The lock expired (worker took too long without renewing)
    ///
    /// The worker dispatcher uses this error to detect cancellation races and log appropriately.
    ///
    /// # Implementation
    ///
    /// ```ignore
    /// async fn ack_work_item(&self, token: &str, completion: Option<WorkItem>) -> Result<(), ProviderError> {
    ///     BEGIN TRANSACTION
    ///     // Relational/SQL example:
    ///     let result = DELETE FROM worker_queue WHERE lock_token = ?token AND locked_until > now();
    ///
    ///     // CRITICAL: Check if the locked entry existed and lock was still valid
    ///     if result.rows_affected == 0 {
    ///         ROLLBACK
    ///         return Err(ProviderError::permanent("Work item not found - cancelled, stolen, or lock expired"))
    ///     }
    ///
    ///     if let Some(completion) = completion {
    ///         INSERT INTO orchestrator_queue (instance_id, work_item, visible_at)
    ///         VALUES (completion.instance, serialize(completion), now)
    ///     }
    ///     COMMIT
    /// }
    /// ```
    async fn ack_work_item(&self, token: &str, completion: Option<WorkItem>) -> Result<(), ProviderError>;
1780
    /// Renew the lock on a worker queue item.
    ///
    /// # What This Does
    ///
    /// Extends the lock timeout for an in-progress activity execution, preventing
    /// the lock from expiring while the activity is still being processed.
    ///
    /// # Purpose
    ///
    /// Enables long-running activities to complete without lock timeout:
    /// - Worker fetches activity with initial lock timeout (e.g., 30s)
    /// - Background renewal task periodically extends the lock
    /// - If worker crashes, renewal stops and lock expires naturally
    /// - Another worker can then pick up the abandoned activity
    ///
    /// # Activity Cancellation
    ///
    /// This method also serves as the cancellation detection mechanism. When the
    /// orchestration runtime decides an activity should be cancelled, it removes or invalidates
    /// the backing entry for that locked work-item (e.g., by
    /// removing the corresponding entry). The next renewal attempt will fail, signaling to the
    /// worker that the activity should be cancelled.
    ///
    /// # Parameters
    ///
    /// * `token` - Lock token from fetch_work_item
    /// * `extend_for` - [`Duration`] to extend lock from now (typically matches `RuntimeOptions::worker_lock_timeout`)
    ///
    /// # Returns
    ///
    /// * `Ok(())` - Lock renewed successfully
    /// * `Err(ProviderError)` - Lock renewal failed:
    ///   - Permanent error: Token invalid, expired, entry removed (cancelled), or already acked
    ///   - Retryable error: Storage connection issues
    ///
    /// # Implementation Pattern
    ///
    /// ```ignore
    /// async fn renew_work_item_lock(&self, token: &str, extend_for: Duration) -> Result<(), ProviderError> {
    ///     // Relational/SQL example:
    ///     let result = UPDATE worker_queue
    ///                  SET locked_until = now() + extend_for
    ///                  WHERE lock_token = ?token AND locked_until > now();
    ///
    ///     if result.rows_affected == 0:
    ///         // Entry doesn't exist (cancelled), lock expired, or invalid token
    ///         return Err(ProviderError::permanent("Lock token invalid, expired, or entry removed"))
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// # Concurrency
    ///
    /// - Safe to call while activity is executing
    /// - Idempotent: Multiple renewals with same token are safe
    /// - Renewal fails gracefully after ack_work_item (token deleted)
    ///
    /// # Usage in Runtime
    ///
    /// Called by worker dispatcher's activity manager task:
    /// - Renewal interval calculated based on lock timeout
    /// - On error: triggers cancellation token (entry may have been removed/invalidated for cancellation)
    /// - Stops automatically when activity completes or worker crashes
    async fn renew_work_item_lock(&self, token: &str, extend_for: Duration) -> Result<(), ProviderError>;
1846
    /// Heartbeat all non-idle sessions owned by the given workers.
    ///
    /// Extends `locked_until` for sessions where:
    /// - `owner_id` matches any of the provided `owner_ids`
    /// - `locked_until > now` (still owned)
    /// - `last_activity_at + idle_timeout > now` (not idle)
    ///
    /// Sessions that are idle (no recent activity) are NOT renewed,
    /// causing their locks to naturally expire.
    ///
    /// Accepts a slice of owner IDs so the provider can batch the operation
    /// into a single storage call.
    ///
    /// # Parameters
    ///
    /// * `owner_ids` - Worker owner IDs whose sessions should be renewed
    /// * `extend_for` - Duration to extend `locked_until` from now
    /// * `idle_timeout` - Sessions whose `last_activity_at` is older than this are skipped
    ///
    /// # Returns
    ///
    /// Total count of sessions renewed across all owners.
    async fn renew_session_lock(
        &self,
        owner_ids: &[&str],
        extend_for: Duration,
        idle_timeout: Duration,
    ) -> Result<usize, ProviderError>;
1869
    /// Sweep orphaned session entries.
    ///
    /// This is a convenience hook so providers don't need background threads for
    /// a common cleanup scenario. The runtime calls it periodically on behalf of
    /// all workers.
    ///
    /// Removes sessions where:
    /// - `locked_until < now` (lock expired)
    /// - No pending work items reference this session
    ///
    /// Any worker can sweep any worker's orphans.
    ///
    /// Providers that already clean up expired/ownerless sessions internally
    /// (e.g., via TTL, background sweeps, or eager eviction on fetch) may
    /// return `Ok(0)` here.
    ///
    /// # Parameters
    ///
    /// * `idle_timeout` - NOTE(review): role not specified here; presumably factors
    ///   into deciding whether a session counts as orphaned — confirm against
    ///   provider implementations.
    ///
    /// # Returns
    ///
    /// Count of sessions removed (0 is valid if no orphans exist or the
    /// provider handles cleanup through other means).
    async fn cleanup_orphaned_sessions(&self, idle_timeout: Duration) -> Result<usize, ProviderError>;
1891
    /// Abandon work item processing (release lock without completing).
    ///
    /// Called when activity processing fails with a retryable error (e.g., storage contention)
    /// and the work item should be made available for another worker to pick up.
    ///
    /// # What This Does
    ///
    /// 1. **Clear lock** - Remove lock_token and reset locked_until
    /// 2. **Optionally delay** - Set visible_at to defer retry
    /// 3. **Preserve message** - Don't delete or modify the work item content
    ///
    /// # Parameters
    ///
    /// * `token` - Lock token from fetch_work_item
    /// * `delay` - Optional delay before the work item becomes visible again
    ///   - `None`: Immediate visibility (visible_at = now)
    ///   - `Some(duration)`: Delayed visibility (visible_at = now + duration)
    /// * `ignore_attempt` - If true, decrement the attempt count (never below 0)
    ///
    /// # Returns
    ///
    /// * `Ok(())` - Work item abandoned, lock released
    /// * `Err(ProviderError)` - Abandon failed (invalid token, already acked, etc.)
    ///
    /// # Implementation Pattern
    ///
    /// ```ignore
    /// async fn abandon_work_item(&self, token: &str, delay: Option<Duration>) -> Result<(), ProviderError> {
    ///     let visible_at = delay.map(|d| now() + d).unwrap_or(now());
    ///
    ///     let result = UPDATE worker_queue
    ///                  SET lock_token = NULL, locked_until = NULL, visible_at = ?visible_at
    ///                  WHERE lock_token = ?token;
    ///
    ///     if result.rows_affected == 0:
    ///         return Err(ProviderError::permanent("Invalid lock token"))
    ///
    ///     Ok(())
    /// }
    /// ```
    async fn abandon_work_item(
        &self,
        token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError>;
1943
    /// Renew the lock on an orchestration item.
    ///
    /// Extends the instance lock timeout, preventing lock expiration during
    /// long-running orchestration turns.
    ///
    /// # Parameters
    ///
    /// * `token` - Lock token from fetch_orchestration_item
    /// * `extend_for` - Duration to extend lock from now
    ///
    /// # Returns
    ///
    /// * `Ok(())` - Lock renewed successfully
    /// * `Err(ProviderError)` - Lock renewal failed (invalid token, expired, etc.)
    ///
    /// # Implementation Pattern
    ///
    /// ```ignore
    /// async fn renew_orchestration_item_lock(&self, token: &str, extend_for: Duration) -> Result<(), ProviderError> {
    ///     let locked_until = now() + extend_for;
    ///
    ///     let result = UPDATE instance_locks
    ///                  SET locked_until = ?locked_until
    ///                  WHERE lock_token = ?token
    ///                    AND locked_until > now();
    ///
    ///     if result.rows_affected == 0:
    ///         return Err(ProviderError::permanent("Lock token invalid or expired"))
    ///
    ///     Ok(())
    /// }
    /// ```
    async fn renew_orchestration_item_lock(&self, token: &str, extend_for: Duration) -> Result<(), ProviderError>;
1977
    // ===== Optional Management APIs =====
    // Some of these have default implementations; they are primarily used by the
    // client, for testing, and for debugging rather than the orchestration hot path.
1980
    /// Enqueue a work item to the orchestrator queue.
    ///
    /// # Purpose
    ///
    /// Used by runtime for:
    /// - External events: `Client.raise_event()` → enqueues WorkItem::ExternalRaised
    /// - Cancellation: `Client.cancel_instance()` → enqueues WorkItem::CancelInstance
    /// - Testing: Direct queue manipulation
    ///
    /// **Note:** In normal operation, orchestrator items are enqueued via `ack_orchestration_item`.
    ///
    /// # Parameters
    ///
    /// * `item` - WorkItem to enqueue (usually ExternalRaised or CancelInstance)
    /// * `delay` - Optional visibility delay
    ///   - `None`: Immediate visibility (common case)
    ///   - `Some(duration)`: Delay visibility by the specified duration
    ///   - Providers without delayed_visibility support can ignore this (treat as None)
    ///
    /// # Implementation Pattern
    ///
    /// ```ignore
    /// async fn enqueue_for_orchestrator(&self, item: WorkItem, delay: Option<Duration>) -> Result<(), ProviderError> {
    ///     let instance = extract_instance(&item); // See WorkItem docs for extraction
    ///     let work_json = serde_json::to_string(&item)?;
    ///
    ///     let visible_at = if let Some(delay) = delay {
    ///         now() + delay
    ///     } else {
    ///         now()
    ///     };
    ///
    ///     // ⚠️ DO NOT create instance here - runtime will create it via ack_orchestration_item metadata
    ///     // Instance creation happens when runtime acknowledges the first turn with ExecutionMetadata
    ///
    ///     INSERT INTO orchestrator_queue (instance_id, work_item, visible_at, lock_token, locked_until)
    ///     VALUES (instance, work_json, visible_at, NULL, NULL);
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// # Special Cases
    ///
    /// **StartOrchestration:**
    /// - ⚠️ DO NOT create instance here - instance creation happens via `ack_orchestration_item()` metadata
    /// - Just enqueue the work item; runtime will create instance when acknowledging the first turn
    ///
    /// **ExternalRaised:**
    /// - Can arrive before instance is started (race condition)
    /// - Should still enqueue - runtime will handle gracefully
    ///
    /// # Error Handling
    ///
    /// Return Err if storage fails. Return Ok if item was enqueued successfully.
    async fn enqueue_for_orchestrator(&self, _item: WorkItem, _delay: Option<Duration>) -> Result<(), ProviderError>;
2037
    // NOTE(review): historical design notes — most of this is now implemented above:
    // fetch_work_item takes a poll_timeout parameter, and renew_work_item_lock /
    // renew_orchestration_item_lock extend locks. Messages whose lock expires
    // without ack become visible again, which handles worker crashes and
    // long-running activities gracefully. (dequeue_timer_peek_lock and
    // refresh_timer_lock no longer appear in this trait.)
2042
2043 // ===== Capability Discovery =====
2044
    /// Check if this provider implements management capabilities.
    ///
    /// # Purpose
    ///
    /// This method enables automatic capability discovery by the `Client`.
    /// When a provider implements `ProviderAdmin`, it should return
    /// `Some(self as &dyn ProviderAdmin)` to expose management features.
    ///
    /// # Default Implementation
    ///
    /// Returns `None` (no management capabilities). Override to expose capabilities:
    ///
    /// ```ignore
    /// impl Provider for MyProvider {
    ///     fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
    ///         Some(self as &dyn ProviderAdmin)
    ///     }
    /// }
    /// ```
    ///
    /// # Usage
    ///
    /// The `Client` automatically discovers capabilities:
    ///
    /// ```ignore
    /// let client = Client::new(provider);
    /// if client.has_management_capability() {
    ///     let instances = client.list_all_instances().await?;
    /// }
    /// ```
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        // Conservative default: providers opt in to management features explicitly.
        None
    }
2078
    /// Lightweight check for custom status changes.
    ///
    /// Returns `Some((custom_status, version))` if `custom_status_version > last_seen_version`,
    /// `None` if unchanged. Used by `Client::wait_for_status_change()` polling loop.
    ///
    /// # Parameters
    ///
    /// * `instance` - Instance ID to check
    /// * `last_seen_version` - The version the caller last observed
    ///
    /// # Returns
    ///
    /// * `Ok(Some((custom_status, version)))` - Status has changed; the inner
    ///   `Option<String>` is the custom status payload (may be `None`/unset)
    /// * `Ok(None)` - No change since `last_seen_version`
    /// * `Err(ProviderError)` - Storage error
    async fn get_custom_status(
        &self,
        _instance: &str,
        _last_seen_version: u64,
    ) -> Result<Option<(Option<String>, u64)>, ProviderError>;
2099}
2100
2101/// Management and observability provider interface.
2102pub mod management;
2103
2104pub mod instrumented;
2105
2106/// SQLite-backed provider with full transactional support.
2107///
2108/// Enable with the `sqlite` feature:
2109/// ```toml
2110/// duroxide = { version = "0.1", features = ["sqlite"] }
2111/// ```
2112#[cfg(feature = "sqlite")]
2113pub mod sqlite;
2114
2115// Re-export management types for convenience
2116pub use management::{
2117 DeleteInstanceResult, ExecutionInfo, InstanceFilter, InstanceInfo, InstanceTree, ManagementProvider, PruneOptions,
2118 PruneResult, QueueDepths, SystemMetrics,
2119};
2120
2121/// Administrative capability trait for observability and management operations.
2122///
2123/// This trait provides rich management and observability features that extend
2124/// the core `Provider` functionality. Providers can implement this trait to
2125/// expose administrative capabilities to the `Client`.
2126///
2127/// # Automatic Discovery
2128///
2129/// The `Client` automatically discovers this capability through the
2130/// `Provider::as_management_capability()` method. When available, management
2131/// methods become accessible through the client.
2132///
2133/// # Implementation Guide for LLMs
2134///
2135/// When implementing a new provider, you can optionally implement this trait
2136/// to expose management features:
2137///
2138/// ```ignore
2139/// impl Provider for MyProvider {
2140/// // ... implement required Provider methods
2141///
2142/// fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
2143/// Some(self as &dyn ProviderAdmin)
2144/// }
2145/// }
2146///
/// impl ProviderAdmin for MyProvider {
///     async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
///         // Query your storage for all instance IDs
///         Ok(vec!["instance-1".to_string(), "instance-2".to_string()])
///     }
///
///     async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
///         // Query instance metadata from your storage
///         Ok(InstanceInfo {
///             instance_id: instance.to_string(),
///             orchestration_name: "ProcessOrder".to_string(),
///             orchestration_version: "1.0.0".to_string(),
///             current_execution_id: 1,
///             status: "Running".to_string(),
///             output: None,
///             created_at: 1234567890,
///             updated_at: 1234567890,
///         })
///     }
///
///     // ... implement other management methods
/// }
/// ```
2169/// ```
2170///
/// # Required Methods
///
/// 1. **Instance Discovery**
///    - `list_instances()` - List all instance IDs
///    - `list_instances_by_status()` - Filter instances by status
///
/// 2. **Execution Inspection**
///    - `list_executions()` - List execution IDs for an instance
///    - `read_history_with_execution_id()` - Read history for a specific execution
///    - `read_history()` - Read history for the latest execution
///    - `latest_execution_id()` - Get the latest execution ID
///
/// 3. **Metadata Access**
///    - `get_instance_info()` - Get comprehensive instance information
///    - `get_execution_info()` - Get detailed execution information
///
/// 4. **System Metrics**
///    - `get_system_metrics()` - Get system-wide metrics
///    - `get_queue_depths()` - Get current queue depths
2189///
2190/// # Usage
2191///
2192/// ```ignore
2193/// let client = Client::new(provider);
2194///
2195/// // Check if management features are available
2196/// if client.has_management_capability() {
2197/// let instances = client.list_all_instances().await?;
2198/// let metrics = client.get_system_metrics().await?;
2199/// println!("System has {} instances", metrics.total_instances);
2200/// } else {
2201/// println!("Management features not available");
2202/// }
2203/// ```
2204#[async_trait::async_trait]
2205pub trait ProviderAdmin: Any + Send + Sync {
2206 // ===== Instance Discovery =====
2207
    /// List all known instance IDs.
    ///
    /// # Returns
    ///
    /// Vector of instance IDs, typically sorted by creation time (newest first).
    ///
    /// # Use Cases
    ///
    /// - Admin dashboards showing all workflows
    /// - Bulk operations across instances
    /// - Testing (verify instance creation)
    ///
    /// # Implementation Example
    ///
    /// ```ignore
    /// async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
    ///     SELECT instance_id FROM instances ORDER BY created_at DESC
    /// }
    /// ```
    async fn list_instances(&self) -> Result<Vec<String>, ProviderError>;
2228
    /// List instances matching a status filter.
    ///
    /// # Parameters
    ///
    /// * `status` - Filter by execution status: "Running", "Completed", "Failed", "ContinuedAsNew"
    ///
    /// # Returns
    ///
    /// Vector of instance IDs with the specified status.
    ///
    /// # Implementation Example
    ///
    /// ```ignore
    /// async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
    ///     SELECT i.instance_id FROM instances i
    ///     JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
    ///     WHERE e.status = ?
    ///     ORDER BY i.created_at DESC
    /// }
    /// ```
    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError>;
2250
2251 // ===== Execution Inspection =====
2252
    /// List all execution IDs for an instance.
    ///
    /// # Returns
    ///
    /// Vector of execution IDs in ascending order: `[1]`, `[1, 2]`, `[1, 2, 3]`, etc.
    ///
    /// # Multi-Execution Context
    ///
    /// When an orchestration uses ContinueAsNew, multiple executions exist:
    /// - Execution 1: Original execution
    /// - Execution 2: First ContinueAsNew
    /// - Execution 3: Second ContinueAsNew
    /// - etc.
    ///
    /// # Implementation Example
    ///
    /// ```ignore
    /// async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
    ///     SELECT execution_id FROM executions
    ///     WHERE instance_id = ?
    ///     ORDER BY execution_id
    /// }
    /// ```
    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError>;
2277
    /// Read the full event history for a specific execution within an instance.
    ///
    /// # Parameters
    ///
    /// * `instance` - The ID of the orchestration instance.
    /// * `execution_id` - The specific execution ID to read history for.
    ///
    /// # Returns
    ///
    /// Vector of events in chronological order (oldest first).
    ///
    /// # Implementation Example
    ///
    /// ```ignore
    /// async fn read_history_with_execution_id(
    ///     &self,
    ///     instance: &str,
    ///     execution_id: u64,
    /// ) -> Result<Vec<Event>, ProviderError> {
    ///     SELECT event_data FROM history
    ///     WHERE instance_id = ? AND execution_id = ?
    ///     ORDER BY event_id
    /// }
    /// ```
    async fn read_history_with_execution_id(
        &self,
        instance: &str,
        execution_id: u64,
    ) -> Result<Vec<Event>, ProviderError>;
2303
    /// Read the full event history for the latest execution of an instance.
    ///
    /// # Parameters
    ///
    /// * `instance` - The ID of the orchestration instance.
    ///
    /// # Returns
    ///
    /// Vector of events in chronological order (oldest first) for the latest execution.
    ///
    /// # Implementation
    ///
    /// This method gets the latest execution ID and delegates to `read_history_with_execution_id`.
    ///
    /// ```ignore
    /// async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
    ///     let execution_id = self.latest_execution_id(instance).await?;
    ///     self.read_history_with_execution_id(instance, execution_id).await
    /// }
    /// ```
    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError>;
2325
    /// Get the latest (current) execution ID for an instance.
    ///
    /// # Parameters
    ///
    /// * `instance` - The ID of the orchestration instance.
    ///
    /// # Implementation Pattern
    ///
    /// The example below returns 1 when no executions are recorded.
    /// NOTE(review): whether an unknown instance should yield 1 or an error is not
    /// specified here — confirm against provider implementations.
    ///
    /// ```ignore
    /// async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
    ///     SELECT COALESCE(MAX(execution_id), 1) FROM executions WHERE instance_id = ?
    /// }
    /// ```
    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError>;
2340
2341 // ===== Instance Metadata =====
2342
    /// Get comprehensive information about an instance.
    ///
    /// # Parameters
    ///
    /// * `instance` - The ID of the orchestration instance.
    ///
    /// # Returns
    ///
    /// Detailed instance information including status, output, and metadata
    /// for the instance's current execution.
    ///
    /// # Implementation Example
    ///
    /// ```ignore
    /// async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
    ///     SELECT i.*, e.status, e.output
    ///     FROM instances i
    ///     JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
    ///     WHERE i.instance_id = ?
    /// }
    /// ```
    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError>;
2364
    /// Get detailed information about a specific execution.
    ///
    /// # Parameters
    ///
    /// * `instance` - The ID of the orchestration instance.
    /// * `execution_id` - The specific execution ID.
    ///
    /// # Returns
    ///
    /// Detailed execution information including status, output, and event count.
    ///
    /// # Implementation Example
    ///
    /// ```ignore
    /// async fn get_execution_info(&self, instance: &str, execution_id: u64) -> Result<ExecutionInfo, ProviderError> {
    ///     SELECT e.*, COUNT(h.event_id) as event_count
    ///     FROM executions e
    ///     LEFT JOIN history h ON e.instance_id = h.instance_id AND e.execution_id = h.execution_id
    ///     WHERE e.instance_id = ? AND e.execution_id = ?
    ///     GROUP BY e.execution_id
    /// }
    /// ```
    async fn get_execution_info(&self, instance: &str, execution_id: u64) -> Result<ExecutionInfo, ProviderError>;
2388
2389 // ===== System Metrics =====
2390
    /// Get system-wide metrics for the orchestration engine.
    ///
    /// # Returns
    ///
    /// System metrics including instance counts, execution counts, and status breakdown.
    ///
    /// # Implementation Example
    ///
    /// ```ignore
    /// async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
    ///     SELECT
    ///         COUNT(*) as total_instances,
    ///         SUM(CASE WHEN e.status = 'Running' THEN 1 ELSE 0 END) as running_instances,
    ///         SUM(CASE WHEN e.status = 'Completed' THEN 1 ELSE 0 END) as completed_instances,
    ///         SUM(CASE WHEN e.status = 'Failed' THEN 1 ELSE 0 END) as failed_instances
    ///     FROM instances i
    ///     JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
    /// }
    /// ```
    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError>;
2411
    /// Get the current depths of the internal work queues.
    ///
    /// # Returns
    ///
    /// Queue depths for orchestrator and worker queues.
    ///
    /// **Note:** Timer queue depth is not applicable since timers are handled via
    /// delayed visibility in the orchestrator queue.
    ///
    /// # Implementation Example
    ///
    /// ```ignore
    /// async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
    ///     SELECT
    ///         (SELECT COUNT(*) FROM orchestrator_queue WHERE lock_token IS NULL) as orchestrator_queue,
    ///         (SELECT COUNT(*) FROM worker_queue WHERE lock_token IS NULL) as worker_queue
    /// }
    /// ```
    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError>;
2431
2432 // ===== Hierarchy Primitive Operations =====
2433 // These are simple database operations that providers MUST implement.
2434 // Composite operations like get_instance_tree and delete_instance have
2435 // default implementations that use these primitives.
2436
    /// List direct children of an instance.
    ///
    /// Returns instance IDs that have `parent_instance_id = instance_id`.
    /// Returns empty vec if instance has no children or doesn't exist.
    ///
    /// Only direct children are returned; use `get_instance_tree` (which calls
    /// this recursively) to collect the full transitive set of descendants.
    ///
    /// # Implementation Example
    ///
    /// ```ignore
    /// async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
    ///     SELECT instance_id FROM instances WHERE parent_instance_id = ?
    /// }
    /// ```
    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError>;
2450
    /// Get the parent instance ID.
    ///
    /// Returns `Some(parent_id)` for sub-orchestrations, `None` for root orchestrations.
    /// Returns `Err` if instance doesn't exist.
    ///
    /// Used by the default `delete_instance` implementation to reject direct
    /// deletion of sub-orchestrations (they must be deleted via their root).
    ///
    /// # Implementation Example
    ///
    /// ```ignore
    /// async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
    ///     SELECT parent_instance_id FROM instances WHERE instance_id = ?
    /// }
    /// ```
    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError>;
2464
    /// Atomically delete a batch of instances.
    ///
    /// # Parameters
    ///
    /// * `ids` - Instance IDs to delete. Typically the `all_ids` of an
    ///   `InstanceTree` returned by `get_instance_tree` (a root plus all of
    ///   its descendants).
    /// * `force` - If true, delete regardless of status. If false, all instances must be terminal.
    ///
    /// # Provider Contract (MUST implement)
    ///
    /// 1. **Atomicity**: All deletions MUST be atomic (all-or-nothing).
    ///    If any instance fails validation, the entire batch MUST be rolled back.
    ///
    /// 2. **Force semantics**:
    ///    - `force=false`: Return error if ANY instance has `status = 'Running'`
    ///    - `force=true`: Delete regardless of status (for stuck/abandoned instances)
    ///
    /// 3. **Orphan detection**: Atomically check for children not in `ids`:
    ///    ```sql
    ///    SELECT instance_id FROM instances
    ///    WHERE parent_instance_id IN (?) AND instance_id NOT IN (?)
    ///    ```
    ///    If any exist, return error (race condition: new child spawned after get_instance_tree).
    ///
    /// 4. **Complete cleanup**: Remove ALL related data in a single atomic operation:
    ///    - event history
    ///    - executions
    ///    - orchestrator queue entries
    ///    - worker queue entries
    ///    - instance locks
    ///    - instance metadata
    ///
    /// 5. **Prevent ack recreation**: When force-deleting Running instances, the removal
    ///    of the instance lock prevents in-flight `ack_orchestration_item` from recreating state.
    ///
    /// # Race Condition Protection
    ///
    /// This method MUST validate within the transaction that no orphans would be created:
    /// - If any instance has `parent_instance_id` pointing to an instance in `ids`,
    ///   but that child is NOT also in `ids`, return an error.
    /// - This prevents orphans from children spawned between `get_instance_tree()` and
    ///   this call.
    ///
    /// # Transaction Semantics
    ///
    /// All instances must be deleted atomically (all-or-nothing).
    /// The orphan check must be done within the transaction to prevent TOCTOU races.
    ///
    /// # Implementation Notes
    ///
    /// - Delete all related data: event history, executions, orchestrator queue, worker queue, instance locks, instance metadata
    /// - Order within transaction doesn't matter for correctness (single atomic transaction)
    /// - Count deletions and aggregate into result
    async fn delete_instances_atomic(&self, ids: &[String], force: bool)
        -> Result<DeleteInstanceResult, ProviderError>;
2519
2520 // ===== Hierarchy Composite Operations =====
2521 // These have default implementations using the primitives above.
2522 // Providers can override for better performance if needed.
2523
2524 /// Get the full instance tree rooted at the given instance.
2525 ///
2526 /// Returns all instances in the tree: the root, all children, grandchildren, etc.
2527 ///
2528 /// Default implementation uses `list_children` recursively.
2529 async fn get_instance_tree(&self, instance_id: &str) -> Result<InstanceTree, ProviderError> {
2530 let mut all_ids = vec![];
2531
2532 // BFS to collect all descendants
2533 let mut to_process = vec![instance_id.to_string()];
2534 while let Some(parent_id) = to_process.pop() {
2535 all_ids.push(parent_id.clone());
2536 let children = self.list_children(&parent_id).await?;
2537 to_process.extend(children);
2538 }
2539
2540 Ok(InstanceTree {
2541 root_id: instance_id.to_string(),
2542 all_ids,
2543 })
2544 }
2545
2546 // ===== Deletion/Pruning Operations =====
2547
2548 /// Delete a single orchestration instance and all its associated data.
2549 ///
2550 /// This removes the instance, all executions, all history events, and any
2551 /// pending queue messages (orchestrator, worker, timer).
2552 ///
2553 /// # Default Implementation
2554 ///
2555 /// Uses primitives: `get_parent_id`, `get_instance_tree`, `delete_instances_atomic`.
2556 /// Providers can override for better performance if needed.
2557 ///
2558 /// # Parameters
2559 ///
2560 /// * `instance_id` - The ID of the instance to delete.
2561 /// * `force` - If true, delete even if the instance is in Running state.
2562 /// WARNING: Force delete only removes stored state; it does NOT cancel
2563 /// in-flight tokio tasks. Use `cancel_instance` first for graceful termination.
2564 ///
2565 /// # Returns
2566 ///
2567 /// * `Ok(DeleteResult)` - Details of what was deleted.
2568 /// * `Err(ProviderError)` with `is_retryable() = false` for:
2569 /// - Instance is running and force=false
2570 /// - Instance is a sub-orchestration (must delete root instead)
2571 /// - Instance not found
2572 ///
2573 /// # Safety
2574 ///
2575 /// - Deleting a running instance with force=true may cause in-flight operations
2576 /// to fail when they try to persist state.
2577 /// - Sub-orchestrations cannot be deleted directly; delete the root to cascade.
2578 /// - The instance lock entry is removed to prevent zombie recreation.
2579 async fn delete_instance(&self, instance_id: &str, force: bool) -> Result<DeleteInstanceResult, ProviderError> {
2580 // Step 1: Check if this is a sub-orchestration
2581 let parent = self.get_parent_id(instance_id).await?;
2582 if parent.is_some() {
2583 return Err(ProviderError::permanent(
2584 "delete_instance",
2585 format!("Cannot delete sub-orchestration {instance_id} directly. Delete root instance instead."),
2586 ));
2587 }
2588
2589 // Step 2: Get full tree (includes all descendants)
2590 let tree = self.get_instance_tree(instance_id).await?;
2591
2592 // Step 3: Atomic delete of entire tree
2593 self.delete_instances_atomic(&tree.all_ids, force).await
2594 }
2595
    /// Delete multiple orchestration instances matching the filter criteria.
    ///
    /// Only instances in terminal states (Completed, Failed) are eligible.
    /// Running instances are silently skipped (not an error).
    ///
    /// See `delete_instance` for deleting a single instance (with cascade).
    ///
    /// # Parameters
    ///
    /// * `filter` - Criteria for selecting instances to delete. All criteria are ANDed.
    ///
    /// # Filter Behavior
    ///
    /// - `instance_ids`: Allowlist of specific IDs to consider
    /// - `completed_before`: Only delete instances completed before this timestamp (ms)
    /// - `limit`: Maximum number of instances to delete (applied last)
    ///
    /// # Returns
    ///
    /// Aggregated counts of all deleted data across all deleted instances.
    ///
    /// # Safety
    ///
    /// - Running instances are NEVER deleted (silently skipped)
    /// - Use `limit` to avoid long-running transactions
    /// - Sub-orchestrations are skipped (only roots are deleted with cascade)
    async fn delete_instance_bulk(&self, filter: InstanceFilter) -> Result<DeleteInstanceResult, ProviderError>;
2621
    /// Prune old executions from a single instance.
    ///
    /// Use this for `ContinueAsNew` workflows that accumulate many executions.
    ///
    /// # Parameters
    ///
    /// * `instance_id` - The instance to prune executions from.
    /// * `options` - Criteria for selecting executions to delete. All criteria are ANDed.
    ///
    /// # Provider Contract (MUST implement)
    ///
    /// 1. **Current execution protection**: The current execution of the instance
    ///    MUST NEVER be pruned, regardless of options.
    ///
    /// 2. **Running execution protection**: Executions with status 'Running'
    ///    MUST NEVER be pruned.
    ///
    /// 3. **Atomicity**: All deletions for a single prune call MUST be atomic
    ///    (all-or-nothing).
    ///
    /// # `keep_last` Semantics
    ///
    /// Since current_execution_id is always the highest execution_id:
    /// - `keep_last: None` → prune all except current
    /// - `keep_last: Some(0)` → same as None (current is protected)
    /// - `keep_last: Some(1)` → same as above (top 1 = current)
    /// - `keep_last: Some(N)` → keep current + (N-1) most recent
    ///
    /// **All three (`None`, `Some(0)`, `Some(1)`) are equivalent** because the
    /// current execution is always protected regardless of this setting.
    ///
    /// # Implementation Example
    ///
    /// The `NOT IN` subquery below implements `keep_last` (the N highest
    /// execution IDs survive); the `execution_id != current` guard enforces
    /// contract point 1 even when `keep_last` is absent or zero.
    ///
    /// ```sql
    /// DELETE FROM executions
    /// WHERE instance_id = ?
    ///   AND execution_id != ?  -- current_execution_id (CRITICAL)
    ///   AND status != 'Running'
    ///   AND execution_id NOT IN (
    ///     SELECT execution_id FROM executions
    ///     WHERE instance_id = ?
    ///     ORDER BY execution_id DESC
    ///     LIMIT ?  -- keep_last
    ///   )
    /// ```
    async fn prune_executions(&self, instance_id: &str, options: PruneOptions) -> Result<PruneResult, ProviderError>;
2668
    /// Prune old executions from multiple instances matching the filter.
    ///
    /// Applies the same prune options to all matching instances; each instance
    /// is subject to the same contract as `prune_executions` (current and
    /// Running executions are never pruned).
    ///
    /// # Parameters
    ///
    /// * `filter` - Criteria for selecting instances to process.
    /// * `options` - Criteria for selecting executions to delete within each instance.
    ///
    /// # Returns
    ///
    /// Aggregated counts including how many instances were processed.
    async fn prune_executions_bulk(
        &self,
        filter: InstanceFilter,
        options: PruneOptions,
    ) -> Result<PruneResult, ProviderError>;
2686}
2687
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_only_matches_none_rejects_tag() {
        // Default workers handle only untagged activities.
        let filter = TagFilter::DefaultOnly;
        assert!(filter.matches(None));
        assert!(!filter.matches(Some("gpu")));
    }

    #[test]
    fn tags_matches_member_rejects_others() {
        // Tag-only workers accept listed tags and nothing else — not even untagged work.
        let filter = TagFilter::tags(["gpu"]);
        assert!(filter.matches(Some("gpu")));
        assert!(!filter.matches(Some("cpu")));
        assert!(!filter.matches(None));
    }

    #[test]
    fn default_and_matches_both() {
        // Hybrid workers accept untagged work plus the listed tags.
        let filter = TagFilter::default_and(["gpu"]);
        assert!(filter.matches(None));
        assert!(filter.matches(Some("gpu")));
        assert!(!filter.matches(Some("cpu")));
    }

    #[test]
    fn none_matches_nothing() {
        // Orchestrator-only mode: no activity is ever accepted.
        let filter = TagFilter::None;
        for tag in [None, Some("x")] {
            assert!(!filter.matches(tag));
        }
    }

    #[test]
    fn any_matches_everything() {
        // Generalist workers accept every activity regardless of tag.
        let filter = TagFilter::Any;
        for tag in [None, Some("gpu"), Some("cpu"), Some("anything")] {
            assert!(filter.matches(tag));
        }
    }

    #[test]
    fn default_impl_is_default_only() {
        assert_eq!(TagFilter::default(), TagFilter::DefaultOnly);
    }

    #[test]
    #[should_panic(expected = "at most 5 tags")]
    fn panics_over_max_tags() {
        // Six distinct tags exceeds MAX_WORKER_TAGS (5).
        TagFilter::tags(["a", "b", "c", "d", "e", "f"]);
    }

    #[test]
    #[should_panic(expected = "requires at least one tag")]
    fn panics_on_empty_tags() {
        TagFilter::tags(Vec::<String>::new());
    }

    #[test]
    #[should_panic(expected = "requires at least one tag")]
    fn panics_on_empty_default_and() {
        TagFilter::default_and(Vec::<String>::new());
    }
}
2753}