Skip to main content

duroxide/runtime/
mod.rs

1// Runtime module: Mutex poisoning indicates a panic - all lock().unwrap()/expect() are intentional.
2#![allow(clippy::expect_used)]
3#![allow(clippy::unwrap_used)]
4#![allow(clippy::clone_on_ref_ptr)]
5
6//
7use crate::providers::{ExecutionMetadata, Provider, WorkItem};
8use crate::{Event, EventKind, OrchestrationContext};
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::time::Duration;
13use tokio::sync::Mutex;
14use tokio::task::JoinHandle;
15use tracing::warn;
16
17// ============================================================================
18// Built-in System Activities
19// ============================================================================
20
21/// Inject built-in system activities into the activity registry.
22/// This adds the new_guid, utc_now_ms, and get_kv_value activities that are used by
23/// `OrchestrationContext::new_guid()`, `OrchestrationContext::utc_now()`,
24/// and `OrchestrationContext::get_value_from_instance()`.
25fn inject_builtin_activities(user_registry: registry::ActivityRegistry) -> registry::ActivityRegistry {
26    registry::ActivityRegistry::builder_from(&user_registry)
27        .register_builtin(
28            crate::SYSCALL_ACTIVITY_NEW_GUID,
29            |_ctx: crate::ActivityContext, _input: String| async move { Ok(crate::generate_guid()) },
30        )
31        .register_builtin(
32            crate::SYSCALL_ACTIVITY_UTC_NOW_MS,
33            |_ctx: crate::ActivityContext, _input: String| async move {
34                use std::time::{SystemTime, UNIX_EPOCH};
35                let ms = SystemTime::now()
36                    .duration_since(UNIX_EPOCH)
37                    .map(|d| d.as_millis() as u64)
38                    .unwrap_or(0);
39                Ok(ms.to_string())
40            },
41        )
42        .register_builtin(
43            crate::SYSCALL_ACTIVITY_GET_KV_VALUE,
44            |ctx: crate::ActivityContext, input: String| async move {
45                let parsed: serde_json::Value =
46                    serde_json::from_str(&input).map_err(|e| format!("get_kv_value: invalid input: {e}"))?;
47                let instance_id = parsed["instance_id"]
48                    .as_str()
49                    .ok_or_else(|| "get_kv_value: missing instance_id".to_string())?;
50                let key = parsed["key"]
51                    .as_str()
52                    .ok_or_else(|| "get_kv_value: missing key".to_string())?;
53                let client = ctx.get_client();
54                let value = client
55                    .get_kv_value(instance_id, key)
56                    .await
57                    .map_err(|e| format!("get_kv_value client error: {e}"))?;
58                serde_json::to_string(&value).map_err(|e| format!("get_kv_value serialization error: {e}"))
59            },
60        )
61        .build_result()
62        .expect("builtin syscall activity registration should never fail")
63}
64
/// Exponential-backoff settings applied when a work item targets an unregistered
/// orchestration or activity.
///
/// During rolling deployments, work items for unregistered handlers are abandoned
/// with exponential backoff instead of immediately failing. This allows the runtime
/// to wait for the handler to be registered on upgraded nodes.
///
/// # Backoff Calculation
///
/// For a work item with `attempt_count` (1-based):
/// - Delay = `base_delay * 2^(attempt_count - 1)`
/// - The exponent is limited to 6, so the multiplier never exceeds 64
/// - The result is then capped at `max_delay`
///
/// # Example with Default Configuration
///
/// With default settings (`base_delay: 1s`, `max_delay: 60s`):
///
/// | Attempt | Delay        |
/// |---------|--------------|
/// | 1       | 1s           |
/// | 2       | 2s           |
/// | 3       | 4s           |
/// | 4       | 8s           |
/// | 5       | 16s          |
/// | 6       | 32s          |
/// | 7+      | 60s (capped) |
#[derive(Debug, Clone)]
pub struct UnregisteredBackoffConfig {
    /// Delay used for the first backoff attempt; doubled for each later attempt.
    /// Default: 1 second
    pub base_delay: Duration,

    /// Upper bound applied to every computed delay.
    /// Default: 60 seconds
    pub max_delay: Duration,
}

impl UnregisteredBackoffConfig {
    /// Largest exponent used in the backoff calculation (multiplier tops out at 64x `base_delay`).
    const MAX_BACKOFF_EXPONENT: u32 = 6;
    /// `base_delay` used by `Default`.
    const DEFAULT_BASE_DELAY: Duration = Duration::from_secs(1);
    /// `max_delay` used by `Default`.
    const DEFAULT_MAX_DELAY: Duration = Duration::from_secs(60);

    /// Computes the backoff delay for the given fetch attempt.
    ///
    /// # Arguments
    ///
    /// * `attempt_count` - The fetch attempt number (1-based from provider). A value of
    ///   0 is treated like attempt 1.
    ///
    /// # Returns
    ///
    /// `base_delay` multiplied by `2^(attempt_count - 1)` (multiplier at most 64,
    /// multiplication saturating), never more than `max_delay`.
    pub fn delay(&self, attempt_count: u32) -> Duration {
        // Attempts are 1-based: the first attempt doubles zero times.
        let doublings = std::cmp::min(attempt_count.saturating_sub(1), Self::MAX_BACKOFF_EXPONENT);
        let multiplier = 2u32.pow(doublings);
        let uncapped = self.base_delay.saturating_mul(multiplier);
        std::cmp::min(uncapped, self.max_delay)
    }
}

impl Default for UnregisteredBackoffConfig {
    /// Returns the default configuration: 1 second base delay, 60 second maximum delay.
    fn default() -> Self {
        let (base_delay, max_delay) = (Self::DEFAULT_BASE_DELAY, Self::DEFAULT_MAX_DELAY);
        Self { base_delay, max_delay }
    }
}
130
/// Configuration options for the Runtime.
///
/// # Example
///
/// ```rust,no_run
/// # use duroxide::runtime::{RuntimeOptions, ObservabilityConfig, LogFormat};
/// # use std::time::Duration;
/// let options = RuntimeOptions {
///     orchestration_concurrency: 4,
///     worker_concurrency: 8,
///     dispatcher_min_poll_interval: Duration::from_millis(25), // Polling backoff when queues idle
///     dispatcher_long_poll_timeout: Duration::from_secs(30),   // Long polling timeout
///     orchestrator_lock_timeout: Duration::from_secs(10),      // Orchestration turns retry after 10s
///     worker_lock_timeout: Duration::from_secs(300),           // Activities retry after 5 minutes
///     worker_lock_renewal_buffer: Duration::from_secs(30),     // Renew worker locks 30s early
///     observability: ObservabilityConfig {
///         log_format: LogFormat::Compact,
///         log_level: "info".to_string(),
///         ..Default::default()
///     },
///     ..Default::default()
/// };
/// ```
#[derive(Debug, Clone)]
pub struct RuntimeOptions {
    /// Minimum polling cycle duration when idle.
    ///
    /// If a provider returns 'None' (no work) faster than this duration,
    /// the dispatcher will sleep for the remainder of the time.
    /// This prevents hot loops for providers that do not support long polling
    /// or return early.
    ///
    /// Default: 100ms (10 Hz)
    pub dispatcher_min_poll_interval: Duration,

    /// Maximum time to wait for work inside the provider (Long Polling).
    ///
    /// Only used if the provider supports long polling.
    ///
    /// Default: 30 seconds
    pub dispatcher_long_poll_timeout: Duration,

    /// Number of concurrent orchestration workers.
    /// Each worker can process one orchestration turn at a time.
    /// Higher values = more parallel orchestration execution.
    /// Default: 2
    pub orchestration_concurrency: usize,

    /// Number of concurrent worker dispatchers.
    /// Each worker can execute one activity at a time.
    /// Higher values = more parallel activity execution.
    /// Default: 2
    pub worker_concurrency: usize,

    /// Lock timeout for orchestrator queue items.
    /// When an orchestration message is dequeued, it's locked for this duration.
    /// Orchestration turns are typically fast (milliseconds), so a shorter timeout is appropriate.
    /// If processing doesn't complete within this time, the lock expires and the message is retried.
    /// Default: 5 seconds
    pub orchestrator_lock_timeout: Duration,

    /// Buffer time before orchestration lock expiration to trigger renewal.
    ///
    /// Lock renewal strategy:
    /// - If `orchestrator_lock_timeout` ≥ 15s: renew at (`timeout - orchestrator_lock_renewal_buffer`)
    /// - If `orchestrator_lock_timeout` < 15s: renew at 0.5 × timeout (buffer ignored)
    ///
    /// Default: 2 seconds
    pub orchestrator_lock_renewal_buffer: Duration,

    /// Lock timeout for worker queue items (activities).
    /// When an activity is dequeued, it's locked for this duration.
    /// Activities can be long-running (minutes), so a longer timeout is appropriate.
    /// If processing doesn't complete within this time, the lock expires and the activity is retried.
    /// Higher values = more tolerance for long-running activities.
    /// Lower values = faster retry on failures, but may timeout legitimate work.
    /// Default: 30 seconds
    pub worker_lock_timeout: Duration,

    /// Buffer time before lock expiration to trigger renewal.
    ///
    /// Lock renewal strategy:
    /// - If `worker_lock_timeout` ≥ 15s: renew at (`timeout - worker_lock_renewal_buffer`)
    /// - If `worker_lock_timeout` < 15s: renew at 0.5 × timeout (buffer ignored)
    ///
    /// Example with default values (timeout=30s, buffer=5s):
    /// - Initial lock: expires at T+30s
    /// - First renewal: at T+25s (30-5), extends to T+55s
    /// - Second renewal: at T+50s (55-5), extends to T+80s
    ///
    /// Example with short timeout (timeout=10s, buffer ignored):
    /// - Initial lock: expires at T+10s
    /// - First renewal: at T+5s (10*0.5), extends to T+15s
    /// - Second renewal: at T+10s (5 + 10*0.5), extends to T+20s
    ///
    /// Default: 5 seconds
    pub worker_lock_renewal_buffer: Duration,

    /// Observability configuration for metrics and logging.
    /// Requires the `observability` feature flag for full functionality.
    /// Default: Disabled with basic logging
    pub observability: ObservabilityConfig,

    /// Configuration for backoff when encountering unregistered orchestrations/activities.
    ///
    /// During rolling deployments, work items for unregistered handlers are abandoned
    /// with exponential backoff instead of immediately failing. This allows the runtime
    /// to wait for the handler to be registered on upgraded nodes.
    ///
    /// Default: 1s base delay, 60s max delay
    pub unregistered_backoff: UnregisteredBackoffConfig,

    /// Maximum fetch attempts before a message is considered poison.
    ///
    /// After this many fetch attempts, the runtime will immediately fail
    /// the orchestration/activity with a Poison error instead of processing.
    ///
    /// Default: 10
    pub max_attempts: u32,

    /// Grace period for activity cancellation.
    ///
    /// When an orchestration reaches a terminal state, in-flight activities
    /// are notified via their cancellation token. This setting controls how
    /// long to wait for activities to complete gracefully before aborting
    /// the activity task to free worker capacity.
    ///
    /// After this grace period, if the activity has not completed:
    /// - The activity task is aborted (`JoinHandle::abort()`)
    /// - The worker queue message is dropped without notifying the orchestrator
    /// - A warning is logged
    ///
    /// Note: Child tasks/threads spawned by the activity that do not observe
    /// the cancellation token may outlive the abort (user responsibility).
    ///
    /// Default: 10 seconds
    pub activity_cancellation_grace_period: Duration,

    /// Override the replay-engine version range used for capability filtering.
    ///
    /// By default, the runtime uses `>=0.0.0, <=CURRENT_BUILD_VERSION`, meaning it
    /// can replay any execution pinned at or below its own semver. This is correct for
    /// most deployments since replay engines are backward-compatible.
    ///
    /// Set this to change the range for advanced scenarios:
    /// - **Narrowing:** Restrict a node to only process a specific version band
    ///   (e.g., `>=1.0.0, <=1.9.999` in a mixed-version cluster).
    /// - **Widening to drain stuck items:** Set a wide range like `>=0.0.0, <=99.0.0`
    ///   to fetch orchestrations pinned at any version. Items with unknown event types
    ///   will fail at provider-level deserialization (never reaching the replay engine)
    ///   and remain in the queue with escalating `attempt_count`.
    ///
    /// Default: `None` (uses `>=0.0.0, <=CURRENT_BUILD_VERSION`)
    pub supported_replay_versions: Option<crate::providers::SemverRange>,

    /// Lock timeout for session heartbeat lease.
    /// Controls crash recovery speed — if a worker dies, its sessions become
    /// claimable after this duration.
    /// Default: 30 seconds
    pub session_lock_timeout: Duration,

    /// Buffer time before session lock expiration to trigger renewal.
    /// Uses the same formula as `worker_lock_renewal_buffer`.
    /// Default: 5 seconds
    pub session_lock_renewal_buffer: Duration,

    /// How long a session stays pinned after the last activity is
    /// fetched, renewed, or completed. The session renewal thread
    /// stops heartbeating idle sessions, so their locks naturally expire.
    /// Default: 5 minutes
    pub session_idle_timeout: Duration,

    /// How often orphaned session rows are swept from the sessions table.
    /// Runs on the same background thread as session lock renewal.
    /// Default: 5 minutes
    pub session_cleanup_interval: Duration,

    /// Maximum number of distinct sessions this runtime will own concurrently,
    /// spanning **all** `worker_concurrency` slots.
    ///
    /// A single `SessionTracker` is shared across every worker slot in this
    /// runtime. When `distinct_count()` reaches this limit, **all** slots stop
    /// claiming new sessions (fetch switches to non-session mode) until an
    /// in-flight session activity completes and frees a session slot.
    ///
    /// Session activities and non-session activities share the same
    /// `worker_concurrency` slots.
    /// Default: 10
    pub max_sessions_per_runtime: usize,

    /// Stable worker identity for session ownership.
    /// If set, used directly as the session `worker_id` for session claims —
    /// all `worker_concurrency` slots share this single identity, so any idle
    /// slot can serve any session owned by this runtime (no head-of-line blocking).
    /// Also allows a restarted worker to reclaim its sessions without waiting
    /// for lock expiry.
    /// Example: Kubernetes StatefulSet pod name.
    /// If `None`, uses ephemeral per-slot identity (`work-{idx}-{runtime_id}`);
    /// sessions are pinned per-slot and cannot survive restarts.
    /// Note: Logging/tracing always includes the per-slot `work-{idx}-{node_id}`
    /// format regardless of this setting.
    /// Default: None
    pub worker_node_id: Option<String>,

    /// Tag filter for worker activity routing.
    ///
    /// Controls which activities this runtime's worker slots will process:
    /// - `DefaultOnly`: Only untagged activities (default)
    /// - `Tags(["gpu"])`: Only activities tagged `"gpu"`
    /// - `DefaultAnd(["gpu"])`: Both untagged and `"gpu"` activities
    /// - `None`: Disable worker (orchestrator-only mode)
    ///
    /// Default: `TagFilter::DefaultOnly` (untagged activities only)
    pub worker_tag_filter: crate::providers::TagFilter,
}
346
347impl Default for RuntimeOptions {
348    fn default() -> Self {
349        Self {
350            dispatcher_min_poll_interval: Duration::from_millis(100),
351            dispatcher_long_poll_timeout: Duration::from_secs(30), // 30 seconds
352            orchestration_concurrency: 2,
353            worker_concurrency: 2,
354            orchestrator_lock_timeout: Duration::from_secs(5),
355            orchestrator_lock_renewal_buffer: Duration::from_secs(2),
356            worker_lock_timeout: Duration::from_secs(30),
357            worker_lock_renewal_buffer: Duration::from_secs(5),
358            observability: ObservabilityConfig::default(),
359            unregistered_backoff: UnregisteredBackoffConfig::default(),
360            max_attempts: 10,
361            activity_cancellation_grace_period: Duration::from_secs(10),
362            supported_replay_versions: None,
363            session_lock_timeout: Duration::from_secs(30),
364            session_lock_renewal_buffer: Duration::from_secs(5),
365            session_idle_timeout: Duration::from_secs(300), // 5 minutes
366            session_cleanup_interval: Duration::from_secs(300), // 5 minutes
367            max_sessions_per_runtime: 10,
368            worker_node_id: None,
369            worker_tag_filter: crate::providers::TagFilter::default(),
370        }
371    }
372}
373
374mod dispatchers;
375pub mod limits;
376pub mod observability;
377pub mod registry;
378mod state_helpers;
379
380#[cfg(feature = "test-hooks")]
381pub mod test_hooks;
382
383use async_trait::async_trait;
384pub use state_helpers::{HistoryManager, WorkItemReader};
385
386pub mod execution;
387pub mod replay_engine;
388
389pub use observability::{LogFormat, ObservabilityConfig};
390
/// High-level orchestration status derived from history.
///
/// Every variant other than `NotFound` carries the custom status string and
/// its version counter alongside the variant-specific payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OrchestrationStatus {
    /// Instance does not exist
    NotFound,
    /// Instance is currently executing
    Running {
        /// User-defined progress string set via `ctx.set_custom_status()`
        custom_status: Option<String>,
        /// Monotonically increasing version counter for change detection
        custom_status_version: u64,
    },
    /// Instance completed successfully with output
    Completed {
        /// Output the instance completed with
        output: String,
        /// Last custom status set before completion
        custom_status: Option<String>,
        /// Version at completion time
        custom_status_version: u64,
    },
    /// Instance failed with structured error details.
    /// Use `details.category()` to distinguish infrastructure/configuration/application errors.
    Failed {
        /// Structured description of the failure
        details: crate::ErrorDetails,
        /// Last custom status set before failure
        custom_status: Option<String>,
        /// Version at failure time
        custom_status_version: u64,
    },
}
421
/// Trait implemented by orchestration handlers that can be invoked by the runtime.
///
/// Implementors must be `Send + Sync`. [`FnOrchestration`] adapts a plain async
/// function or closure to this trait.
#[async_trait]
pub trait OrchestrationHandler: Send + Sync {
    /// Runs the orchestration with the given context and input string.
    ///
    /// Returns `Ok` with the orchestration's output string, or `Err` with an error string.
    async fn invoke(&self, ctx: OrchestrationContext, input: String) -> Result<String, String>;
}
427
428/// Function wrapper that implements `OrchestrationHandler`.
429pub struct FnOrchestration<F, Fut>(pub F)
430where
431    F: Fn(OrchestrationContext, String) -> Fut + Send + Sync + 'static,
432    Fut: std::future::Future<Output = Result<String, String>> + Send + 'static;
433
434#[async_trait]
435impl<F, Fut> OrchestrationHandler for FnOrchestration<F, Fut>
436where
437    F: Fn(OrchestrationContext, String) -> Fut + Send + Sync + 'static,
438    Fut: std::future::Future<Output = Result<String, String>> + Send + 'static,
439{
440    async fn invoke(&self, ctx: OrchestrationContext, input: String) -> Result<String, String> {
441        (self.0)(ctx, input).await
442    }
443}
444
/// Trait implemented by activity handlers that can be invoked by the runtime.
///
/// Implementors must be `Send + Sync`. [`FnActivity`] adapts a plain async
/// function or closure to this trait.
#[async_trait]
pub trait ActivityHandler: Send + Sync {
    /// Runs the activity with the given context and input string.
    ///
    /// Returns `Ok` with the activity's output string, or `Err` with an error string.
    async fn invoke(&self, ctx: crate::ActivityContext, input: String) -> Result<String, String>;
}
450
451/// Function wrapper that implements `ActivityHandler`.
452pub struct FnActivity<F, Fut>(pub F)
453where
454    F: Fn(crate::ActivityContext, String) -> Fut + Send + Sync + 'static,
455    Fut: std::future::Future<Output = Result<String, String>> + Send + 'static;
456
457#[async_trait]
458impl<F, Fut> ActivityHandler for FnActivity<F, Fut>
459where
460    F: Fn(crate::ActivityContext, String) -> Fut + Send + Sync + 'static,
461    Fut: std::future::Future<Output = Result<String, String>> + Send + 'static,
462{
463    async fn invoke(&self, ctx: crate::ActivityContext, input: String) -> Result<String, String> {
464        (self.0)(ctx, input).await
465    }
466}
467
468/// Immutable registry mapping orchestration names to versioned handlers.
469pub use crate::runtime::registry::{OrchestrationRegistry, OrchestrationRegistryBuilder, VersionPolicy};
470
471pub fn kind_of(msg: &WorkItem) -> &'static str {
472    match msg {
473        WorkItem::StartOrchestration { .. } => "StartOrchestration",
474        WorkItem::ActivityExecute { .. } => "ActivityExecute",
475        WorkItem::ActivityCompleted { .. } => "ActivityCompleted",
476        WorkItem::ActivityFailed { .. } => "ActivityFailed",
477        WorkItem::TimerFired { .. } => "TimerFired",
478        WorkItem::ExternalRaised { .. } => "ExternalRaised",
479        WorkItem::QueueMessage { .. } => "ExternalRaisedPersistent",
480        #[cfg(feature = "replay-version-test")]
481        WorkItem::ExternalRaised2 { .. } => "ExternalRaised2",
482        WorkItem::SubOrchCompleted { .. } => "SubOrchCompleted",
483        WorkItem::SubOrchFailed { .. } => "SubOrchFailed",
484        WorkItem::CancelInstance { .. } => "CancelInstance",
485        WorkItem::ContinueAsNew { .. } => "ContinueAsNew",
486    }
487}
488
/// In-process runtime that executes activities and timers and persists
/// history via a `Provider`.
///
/// Both `Mutex` fields use `tokio::sync::Mutex` (see the imports at the top of this file).
pub struct Runtime {
    /// Join handles of background tasks (presumably the dispatchers and pollers
    /// spawned by this runtime — confirm against the start/shutdown code)
    joins: Mutex<Vec<JoinHandle<()>>>,
    /// Provider used to persist history; also queried for its optional management capability
    history_store: Arc<dyn Provider>,
    /// Registry of orchestration handlers
    orchestration_registry: OrchestrationRegistry,
    /// Track the current execution ID for each active instance
    current_execution_ids: Mutex<HashMap<String, u64>>,
    /// Shutdown flag checked by dispatchers
    shutdown_flag: Arc<AtomicBool>,
    /// Runtime configuration options
    options: RuntimeOptions,
    /// Observability handle for metrics and logging
    observability_handle: Option<observability::ObservabilityHandle>,
    /// Unique runtime instance ID (4-char hex, generated on start)
    runtime_id: String,
}
506
/// Introspection: descriptor of an orchestration derived from history.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OrchestrationDescriptor {
    /// Orchestration name
    pub name: String,
    /// Orchestration version string
    pub version: String,
    /// Parent instance, if any (presumably set only for sub-orchestrations — TODO confirm)
    pub parent_instance: Option<String>,
    /// Identifier associated with the parent, if any
    /// (NOTE(review): exact meaning is not visible here — confirm against the producer)
    pub parent_id: Option<u64>,
}
515
516impl Runtime {
517    /// Helper to get the metrics provider if available.
518    #[inline]
519    fn metrics_provider(&self) -> Option<&observability::MetricsProvider> {
520        self.observability_handle
521            .as_ref()
522            .map(|h| h.metrics_provider().as_ref())
523    }
524
525    // New label-aware metric recording methods
526    #[inline]
527    fn record_orchestration_start(&self, orchestration_name: &str, version: &str, initiated_by: &str) {
528        if let Some(provider) = self.metrics_provider() {
529            provider.record_orchestration_start(orchestration_name, version, initiated_by);
530        }
531    }
532
533    #[inline]
534    fn record_orchestration_completion_with_labels(
535        &self,
536        orchestration_name: &str,
537        version: &str,
538        status: &str,
539        duration_seconds: f64,
540        turn_count: u64,
541        history_events: u64,
542    ) {
543        if let Some(provider) = self.metrics_provider() {
544            provider.record_orchestration_completion(
545                orchestration_name,
546                version,
547                status,
548                duration_seconds,
549                turn_count,
550                history_events,
551            );
552        }
553    }
554
555    #[inline]
556    fn record_orchestration_failure_with_labels(
557        &self,
558        orchestration_name: &str,
559        version: &str,
560        error_type: &str,
561        error_category: &str,
562    ) {
563        if let Some(provider) = self.metrics_provider() {
564            provider.record_orchestration_failure(orchestration_name, version, error_type, error_category);
565        }
566    }
567
568    #[inline]
569    fn record_continue_as_new(&self, orchestration_name: &str, execution_id: u64) {
570        if let Some(provider) = self.metrics_provider() {
571            provider.record_continue_as_new(orchestration_name, execution_id);
572        }
573    }
574
575    #[inline]
576    fn increment_active_orchestrations(&self) {
577        if let Some(provider) = self.metrics_provider() {
578            provider.increment_active_orchestrations();
579        }
580    }
581
582    #[inline]
583    fn decrement_active_orchestrations(&self) {
584        if let Some(provider) = self.metrics_provider() {
585            provider.decrement_active_orchestrations();
586        }
587    }
588
589    #[inline]
590    fn record_activity_execution(
591        &self,
592        activity_name: &str,
593        outcome: &str,
594        duration_seconds: f64,
595        retry_attempt: u32,
596        tag: Option<&str>,
597    ) {
598        if let Some(provider) = self.metrics_provider() {
599            provider.record_activity_execution(activity_name, outcome, duration_seconds, retry_attempt, tag);
600        }
601    }
602
603    // Simple metric recording methods (used by execution.rs and worker.rs)
604    // These call MetricsProvider methods which emit both counter!() and atomic increments
605    #[inline]
606    fn record_orchestration_application_error(&self) {
607        if let Some(provider) = self.metrics_provider() {
608            provider.record_orchestration_application_error();
609        }
610    }
611
612    #[inline]
613    fn record_orchestration_infrastructure_error(&self) {
614        if let Some(provider) = self.metrics_provider() {
615            provider.record_orchestration_infrastructure_error();
616        }
617    }
618
619    #[inline]
620    fn record_orchestration_configuration_error(&self) {
621        if let Some(provider) = self.metrics_provider() {
622            provider.record_orchestration_configuration_error();
623        }
624    }
625
626    #[inline]
627    fn record_activity_success(&self) {
628        if let Some(provider) = self.metrics_provider() {
629            provider.record_activity_success();
630        }
631    }
632
633    #[inline]
634    fn record_activity_app_error(&self) {
635        if let Some(provider) = self.metrics_provider() {
636            provider.record_activity_app_error();
637        }
638    }
639
640    #[inline]
641    fn record_activity_infra_error(&self) {
642        if let Some(provider) = self.metrics_provider() {
643            provider.record_activity_infra_error();
644        }
645    }
646
647    #[inline]
648    fn record_orchestration_poison(&self) {
649        if let Some(provider) = self.metrics_provider() {
650            provider.record_orchestration_poison();
651        }
652    }
653
654    #[inline]
655    fn record_activity_poison(&self) {
656        if let Some(provider) = self.metrics_provider() {
657            provider.record_activity_poison();
658        }
659    }
660
661    pub fn metrics_snapshot(&self) -> Option<observability::MetricsSnapshot> {
662        self.observability_handle
663            .as_ref()
664            .map(|handle| handle.metrics_snapshot())
665    }
666
667    /// Returns a reference to the observability handle, if observability is enabled.
668    pub fn observability_handle(&self) -> Option<&observability::ObservabilityHandle> {
669        self.observability_handle.as_ref()
670    }
671
672    /// Initialize all gauges that need to sync with persistent state on startup.
673    ///
674    /// Gauges (unlike counters) represent current state and must be initialized
675    /// from the provider to reflect reality after a restart.
676    ///
677    /// This initializes:
678    /// - `duroxide_active_orchestrations` - Current running orchestrations
679    /// - `duroxide_orchestrator_queue_depth` - Current orchestrator queue backlog
680    /// - `duroxide_worker_queue_depth` - Current worker queue backlog
681    async fn initialize_gauges(self: Arc<Self>) {
682        if let Some(admin) = self.history_store.as_management_capability() {
683            // Query provider for current state (parallel for efficiency)
684            let system_metrics_future = admin.get_system_metrics();
685            let queue_depths_future = admin.get_queue_depths();
686
687            let (system_result, queue_result) = tokio::join!(system_metrics_future, queue_depths_future);
688
689            if let Some(provider) = self.observability_handle.as_ref().map(|h| h.metrics_provider()) {
690                // Initialize active orchestrations gauge
691                if let Ok(metrics) = system_result {
692                    let active_count = metrics.running_instances as i64;
693                    provider.set_active_orchestrations(active_count);
694                    tracing::debug!(
695                        target: "duroxide::runtime",
696                        active_count = %active_count,
697                        "Initialized active orchestrations gauge"
698                    );
699                }
700
701                // Initialize queue depth gauges
702                if let Ok(depths) = queue_result {
703                    provider.update_queue_depths(depths.orchestrator_queue as u64, depths.worker_queue as u64);
704                    tracing::debug!(
705                        target: "duroxide::runtime",
706                        orch_queue = %depths.orchestrator_queue,
707                        worker_queue = %depths.worker_queue,
708                        "Initialized queue depth gauges"
709                    );
710                }
711            }
712        }
713    }
714
715    /// Spawn a background task that periodically polls the provider for gauge values.
716    ///
717    /// Updates `duroxide_active_orchestrations`, `duroxide_orchestrator_queue_depth`,
718    /// and `duroxide_worker_queue_depth` gauges from the database at the configured interval.
719    fn start_gauge_poller(self: Arc<Self>) -> JoinHandle<()> {
720        let interval = self.options.observability.gauge_poll_interval;
721        let shutdown_flag = self.shutdown_flag.clone();
722
723        tokio::spawn(async move {
724            tracing::debug!(
725                target: "duroxide::runtime",
726                interval_secs = interval.as_secs(),
727                "Gauge poller started"
728            );
729
730            loop {
731                tokio::time::sleep(interval).await;
732
733                if shutdown_flag.load(Ordering::Relaxed) {
734                    break;
735                }
736
737                self.clone().refresh_gauges().await;
738            }
739        })
740    }
741
742    /// Refresh all gauge metrics from the provider.
743    async fn refresh_gauges(self: Arc<Self>) {
744        let provider = &self
745            .observability_handle
746            .as_ref()
747            .expect("gauge poller only runs when observability is enabled")
748            .metrics_provider();
749        let admin = match self.history_store.as_management_capability() {
750            Some(admin) => admin,
751            None => return,
752        };
753
754        let (system_result, queue_result) = tokio::join!(admin.get_system_metrics(), admin.get_queue_depths());
755
756        if let Ok(metrics) = system_result {
757            provider.set_active_orchestrations(metrics.running_instances as i64);
758        }
759        if let Ok(depths) = queue_result {
760            provider.update_queue_depths(depths.orchestrator_queue as u64, depths.worker_queue as u64);
761        }
762    }
763
764    /// Compute execution metadata from history delta without inspecting event contents.
765    /// This allows the runtime to extract semantic information and pass it to the provider
766    /// as pre-computed metadata, preventing the provider from needing orchestration knowledge.
767    fn compute_execution_metadata(
768        history_delta: &[Event],
769        _orchestrator_items: &[WorkItem],
770        _current_execution_id: u64,
771    ) -> ExecutionMetadata {
772        let mut metadata = ExecutionMetadata::default();
773
774        // Scan history_delta for OrchestrationStarted (first event) and terminal events
775        for event in history_delta {
776            match &event.kind {
777                EventKind::OrchestrationStarted {
778                    name,
779                    version,
780                    parent_instance,
781                    ..
782                } => {
783                    // Capture orchestration metadata from start event
784                    metadata.orchestration_name = Some(name.clone());
785                    metadata.orchestration_version = Some(version.clone());
786                    // Capture parent instance for sub-orchestration tracking (cascading delete)
787                    metadata.parent_instance_id = parent_instance.clone();
788                    // Extract pinned duroxide version from the event's duroxide_version field.
789                    // This is the version of the runtime that created this execution.
790                    // The provider stores it for capability-filtered fetching.
791                    metadata.pinned_duroxide_version = semver::Version::parse(&event.duroxide_version).ok();
792                }
793                EventKind::OrchestrationCompleted { output } => {
794                    metadata.status = Some("Completed".to_string());
795                    metadata.output = Some(output.clone());
796                    break;
797                }
798                EventKind::OrchestrationFailed { details } => {
799                    metadata.status = Some("Failed".to_string());
800                    metadata.output = Some(details.display_message());
801                    break;
802                }
803                EventKind::OrchestrationContinuedAsNew { input } => {
804                    metadata.status = Some("ContinuedAsNew".to_string());
805                    metadata.output = Some(input.clone());
806                    // Don't set create_next_execution - the new execution will be started
807                    // by WorkItem::ContinueAsNew being processed like StartOrchestration
808                    break;
809                }
810                _ => {}
811            }
812        }
813
814        metadata
815    }
816
817    // Execution engine: consumes provider queues and persists history atomically.
818    /// Return the most recent descriptor `{ name, version, parent_instance?, parent_id? }` for an instance.
819    /// Returns `None` if the instance/history does not exist or no OrchestrationStarted is present.
820    pub async fn get_orchestration_descriptor(
821        &self,
822        instance: &str,
823    ) -> Option<crate::runtime::OrchestrationDescriptor> {
824        let hist = self.history_store.read(instance).await.unwrap_or_default();
825        for e in hist.iter().rev() {
826            if let EventKind::OrchestrationStarted {
827                name,
828                version,
829                parent_instance,
830                parent_id,
831                ..
832            } = &e.kind
833            {
834                return Some(crate::runtime::OrchestrationDescriptor {
835                    name: name.clone(),
836                    version: version.clone(),
837                    parent_instance: parent_instance.clone(),
838                    parent_id: *parent_id,
839                });
840            }
841        }
842        None
843    }
844
845    /// Get the current execution ID for an instance, or fetch from store if not tracked
846    ///
847    /// If `current_execution_id` is provided and the instance matches, use it directly.
848    /// Otherwise, check in-memory tracking, then fall back to INITIAL_EXECUTION_ID.
849    async fn get_execution_id_for_instance(&self, instance: &str, current_execution_id: Option<u64>) -> u64 {
850        // If this is the current instance being processed, use the provided execution_id
851        if let Some(exec_id) = current_execution_id {
852            // Update in-memory tracking for future calls
853            self.current_execution_ids
854                .lock()
855                .await
856                .insert(instance.to_string(), exec_id);
857            return exec_id;
858        }
859
860        // First check in-memory tracking
861        if let Some(&exec_id) = self.current_execution_ids.lock().await.get(instance) {
862            return exec_id;
863        }
864
865        // Fall back to INITIAL_EXECUTION_ID (no longer querying Provider::latest_execution_id)
866        crate::INITIAL_EXECUTION_ID
867    }
868
869    /// Start a new runtime using the in-memory SQLite provider.
870    ///
871    /// Requires the `sqlite` feature.
872    #[cfg(feature = "sqlite")]
873    pub async fn start(
874        activity_registry: registry::ActivityRegistry,
875        orchestration_registry: OrchestrationRegistry,
876    ) -> Arc<Self> {
877        let history_store: Arc<dyn Provider> = Arc::new(
878            crate::providers::sqlite::SqliteProvider::new_in_memory()
879                .await
880                .expect("in-memory SQLite provider creation should never fail"),
881        );
882        Self::start_with_store(history_store, activity_registry, orchestration_registry).await
883    }
884
885    /// Start a new runtime with a custom `Provider` implementation.
886    pub async fn start_with_store(
887        history_store: Arc<dyn Provider>,
888        activity_registry: registry::ActivityRegistry,
889        orchestration_registry: OrchestrationRegistry,
890    ) -> Arc<Self> {
891        Self::start_with_options(
892            history_store,
893            activity_registry,
894            orchestration_registry,
895            RuntimeOptions::default(),
896        )
897        .await
898    }
899
900    /// Start a new runtime with custom options.
901    ///
902    /// # Panics
903    ///
904    /// Panics if `session_idle_timeout` is not greater than the worker lock renewal interval
905    /// (`worker_lock_timeout - worker_lock_renewal_buffer`). This prevents sessions from being
906    /// unpinned during long-running activity execution.
907    pub async fn start_with_options(
908        history_store: Arc<dyn Provider>,
909        activity_registry: registry::ActivityRegistry,
910        orchestration_registry: OrchestrationRegistry,
911        options: RuntimeOptions,
912    ) -> Arc<Self> {
913        // Validate session timeout invariant
914        let worker_renewal_interval = options
915            .worker_lock_timeout
916            .checked_sub(options.worker_lock_renewal_buffer)
917            .unwrap_or(Duration::from_secs(1));
918        if options.session_idle_timeout <= worker_renewal_interval {
919            panic!(
920                "session_idle_timeout ({}s) must be greater than worker lock renewal interval ({}s). \
921                 Sessions would unpin during long-running activity execution. \
922                 Increase session_idle_timeout or decrease worker_lock_timeout.",
923                options.session_idle_timeout.as_secs(),
924                worker_renewal_interval.as_secs(),
925            );
926        }
927
928        // Inject built-in system activities (new_guid, utc_now_ms, get_kv_value)
929        let activity_registry = inject_builtin_activities(activity_registry);
930
931        // Wrap activity registry in Arc for internal sharing across worker threads
932        let activity_registry = Arc::new(activity_registry);
933
934        // Initialize observability (metrics + structured logging)
935        let observability_handle = observability::ObservabilityHandle::init(&options.observability).ok(); // Gracefully degrade if observability fails to initialize
936
937        // Print version on startup
938        tracing::info!(
939            target: "duroxide::runtime",
940            "duroxide runtime ({}) starting with provider {} ({})",
941            env!("CARGO_PKG_VERSION"),
942            history_store.name(),
943            history_store.version()
944        );
945
946        // Wrap provider with metrics instrumentation if metrics are enabled
947        let history_store: Arc<dyn Provider> = if let Some(ref handle) = observability_handle {
948            let metrics = handle.metrics_provider();
949            Arc::new(crate::providers::instrumented::InstrumentedProvider::new(
950                history_store,
951                Some(metrics.clone()),
952            ))
953        } else {
954            history_store
955        };
956
957        let joins: Vec<JoinHandle<()>> = Vec::new();
958
959        // Generate unique runtime instance ID (4-char hex)
960        use std::time::{SystemTime, UNIX_EPOCH};
961        let runtime_id = format!(
962            "{:04x}",
963            SystemTime::now()
964                .duration_since(UNIX_EPOCH)
965                .map(|d| (d.as_nanos() & 0xFFFF) as u16)
966                .unwrap_or(0)
967        );
968
969        // start request queue + worker
970        let runtime = Arc::new(Self {
971            joins: Mutex::new(joins),
972            history_store,
973            orchestration_registry,
974            current_execution_ids: Mutex::new(HashMap::new()),
975            shutdown_flag: Arc::new(AtomicBool::new(false)),
976
977            options,
978            observability_handle,
979            runtime_id,
980        });
981
982        // Initialize gauges from provider (if supported)
983        runtime.clone().initialize_gauges().await;
984
985        // Start periodic gauge polling if observability is enabled
986        if runtime.observability_handle.is_some() {
987            let gauge_handle = runtime.clone().start_gauge_poller();
988            runtime.joins.lock().await.push(gauge_handle);
989        }
990
991        // background orchestrator dispatcher (extracted from inline poller)
992        let handle = runtime.clone().start_orchestration_dispatcher();
993        runtime.joins.lock().await.push(handle);
994
995        // background work dispatcher (executes activities)
996        let work_handle = runtime.clone().start_work_dispatcher(activity_registry);
997        runtime.joins.lock().await.push(work_handle);
998
999        runtime
1000    }
1001
1002    /// Shutdown the runtime.
1003    ///
1004    /// # Parameters
1005    ///
1006    /// * `timeout_ms` - How long to wait for graceful shutdown:
1007    ///   - `None`: Default 1000ms
1008    ///   - `Some(Duration::ZERO)`: Immediate abort
1009    ///   - `Some(ms)`: Wait specified milliseconds
1010    pub async fn shutdown(self: Arc<Self>, timeout_ms: Option<u64>) {
1011        let timeout_ms = timeout_ms.unwrap_or(1000);
1012
1013        if timeout_ms == 0 {
1014            warn!("Immediate shutdown - aborting all tasks");
1015            let mut joins = self.joins.lock().await;
1016            for j in joins.drain(..) {
1017                j.abort();
1018            }
1019            return;
1020        }
1021
1022        // debug!("Graceful shutdown (timeout: {}ms)", timeout_ms);
1023
1024        // Set shutdown flag - workers check this between iterations
1025        self.shutdown_flag.store(true, Ordering::Relaxed);
1026
1027        // Give workers time to notice and exit gracefully
1028        tokio::time::sleep(std::time::Duration::from_millis(timeout_ms)).await;
1029
1030        // Check if any tasks are still running (need to be aborted)
1031        let mut joins = self.joins.lock().await;
1032
1033        // Abort any remaining tasks
1034        for j in joins.drain(..) {
1035            j.abort();
1036        }
1037
1038        // debug!("Runtime shut down");
1039
1040        // Shutdown observability last (after all workers stopped)
1041        // Note: We can't move out of Arc here, so observability shutdown happens when Runtime is dropped
1042        // or if we could restructure to take ownership in shutdown
1043    }
1044}