duroxide/runtime/mod.rs
1// Runtime module: Mutex poisoning indicates a panic - all lock().unwrap()/expect() are intentional.
2#![allow(clippy::expect_used)]
3#![allow(clippy::unwrap_used)]
4#![allow(clippy::clone_on_ref_ptr)]
5
6//
7use crate::providers::{ExecutionMetadata, Provider, WorkItem};
8use crate::{Event, EventKind, OrchestrationContext};
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::time::Duration;
13use tokio::sync::Mutex;
14use tokio::task::JoinHandle;
15use tracing::warn;
16
17// ============================================================================
18// Built-in System Activities
19// ============================================================================
20
21/// Inject built-in system activities into the activity registry.
22/// This adds the new_guid, utc_now_ms, and get_kv_value activities that are used by
23/// `OrchestrationContext::new_guid()`, `OrchestrationContext::utc_now()`,
24/// and `OrchestrationContext::get_value_from_instance()`.
25fn inject_builtin_activities(user_registry: registry::ActivityRegistry) -> registry::ActivityRegistry {
26 registry::ActivityRegistry::builder_from(&user_registry)
27 .register_builtin(
28 crate::SYSCALL_ACTIVITY_NEW_GUID,
29 |_ctx: crate::ActivityContext, _input: String| async move { Ok(crate::generate_guid()) },
30 )
31 .register_builtin(
32 crate::SYSCALL_ACTIVITY_UTC_NOW_MS,
33 |_ctx: crate::ActivityContext, _input: String| async move {
34 use std::time::{SystemTime, UNIX_EPOCH};
35 let ms = SystemTime::now()
36 .duration_since(UNIX_EPOCH)
37 .map(|d| d.as_millis() as u64)
38 .unwrap_or(0);
39 Ok(ms.to_string())
40 },
41 )
42 .register_builtin(
43 crate::SYSCALL_ACTIVITY_GET_KV_VALUE,
44 |ctx: crate::ActivityContext, input: String| async move {
45 let parsed: serde_json::Value =
46 serde_json::from_str(&input).map_err(|e| format!("get_kv_value: invalid input: {e}"))?;
47 let instance_id = parsed["instance_id"]
48 .as_str()
49 .ok_or_else(|| "get_kv_value: missing instance_id".to_string())?;
50 let key = parsed["key"]
51 .as_str()
52 .ok_or_else(|| "get_kv_value: missing key".to_string())?;
53 let client = ctx.get_client();
54 let value = client
55 .get_kv_value(instance_id, key)
56 .await
57 .map_err(|e| format!("get_kv_value client error: {e}"))?;
58 serde_json::to_string(&value).map_err(|e| format!("get_kv_value serialization error: {e}"))
59 },
60 )
61 .build_result()
62 .expect("builtin syscall activity registration should never fail")
63}
64
/// Exponential-backoff settings used when a work item refers to an
/// orchestration or activity that is not registered.
///
/// During rolling deployments, work items for unregistered handlers are abandoned
/// with exponential backoff instead of immediately failing. This allows the runtime
/// to wait for the handler to be registered on upgraded nodes.
///
/// # Backoff Calculation
///
/// For a work item with a 1-based `attempt_count`:
/// - Delay = `base_delay * 2^(attempt_count - 1)`, where the exponent is limited to 6
///   (so the multiplier never exceeds 64), and the result is capped at `max_delay`.
///
/// # Example with Default Configuration
///
/// With default settings (`base_delay: 1s`, `max_delay: 60s`):
/// - Attempt 1: 1s delay
/// - Attempt 2: 2s delay
/// - Attempt 3: 4s delay
/// - Attempt 4: 8s delay
/// - Attempt 5: 16s delay
/// - Attempt 6: 32s delay
/// - Attempt 7+: 60s delay (capped)
#[derive(Debug, Clone)]
pub struct UnregisteredBackoffConfig {
    /// Delay used for the first attempt; doubled for each later attempt.
    /// Default: 1 second
    pub base_delay: Duration,

    /// Upper bound applied to every computed delay.
    /// Default: 60 seconds
    pub max_delay: Duration,
}

impl UnregisteredBackoffConfig {
    /// Largest exponent used in the doubling (limits the multiplier to 64x `base_delay`).
    const MAX_BACKOFF_EXPONENT: u32 = 6;
    /// Value of `base_delay` produced by `Default`.
    const DEFAULT_BASE_DELAY: Duration = Duration::from_secs(1);
    /// Value of `max_delay` produced by `Default`.
    const DEFAULT_MAX_DELAY: Duration = Duration::from_secs(60);

    /// Compute the backoff delay for the given fetch attempt.
    ///
    /// # Arguments
    ///
    /// * `attempt_count` - The fetch attempt number (1-based). `0` is treated like `1`.
    ///
    /// # Returns
    ///
    /// `base_delay` multiplied by `2^(attempt_count - 1)` (exponent limited to
    /// `MAX_BACKOFF_EXPONENT`, multiplication saturating), never more than `max_delay`.
    pub fn delay(&self, attempt_count: u32) -> Duration {
        // The first attempt performs no doubling, hence the `- 1`.
        let doublings = attempt_count.saturating_sub(1).min(Self::MAX_BACKOFF_EXPONENT);
        let multiplier = 2u32.pow(doublings);
        let uncapped = self.base_delay.saturating_mul(multiplier);
        if uncapped > self.max_delay {
            self.max_delay
        } else {
            uncapped
        }
    }
}

impl Default for UnregisteredBackoffConfig {
    /// 1 second base delay with a 60 second cap.
    fn default() -> Self {
        let base_delay = Self::DEFAULT_BASE_DELAY;
        let max_delay = Self::DEFAULT_MAX_DELAY;
        Self { base_delay, max_delay }
    }
}
130
/// Configuration options for the Runtime.
///
/// Use struct-update syntax (`..Default::default()`) to override only the
/// fields you care about, as in the example below.
///
/// # Example
///
/// ```rust,no_run
/// # use duroxide::runtime::{RuntimeOptions, ObservabilityConfig, LogFormat};
/// # use std::time::Duration;
/// let options = RuntimeOptions {
///     orchestration_concurrency: 4,
///     worker_concurrency: 8,
///     dispatcher_min_poll_interval: Duration::from_millis(25), // Polling backoff when queues idle
///     dispatcher_long_poll_timeout: Duration::from_secs(30),   // Long polling timeout
///     orchestrator_lock_timeout: Duration::from_secs(10),      // Orchestration turns retry after 10s
///     worker_lock_timeout: Duration::from_secs(300),           // Activities retry after 5 minutes
///     worker_lock_renewal_buffer: Duration::from_secs(30),     // Renew worker locks 30s early
///     observability: ObservabilityConfig {
///         log_format: LogFormat::Compact,
///         log_level: "info".to_string(),
///         ..Default::default()
///     },
///     ..Default::default()
/// };
/// ```
#[derive(Debug, Clone)]
pub struct RuntimeOptions {
    /// Minimum polling cycle duration when idle.
    ///
    /// If a provider returns `None` (no work) faster than this duration,
    /// the dispatcher will sleep for the remainder of the time.
    /// This prevents hot loops for providers that do not support long polling
    /// or return early.
    ///
    /// Default: 100ms (10 Hz)
    pub dispatcher_min_poll_interval: Duration,

    /// Maximum time to wait for work inside the provider (Long Polling).
    ///
    /// Only used if the provider supports long polling.
    ///
    /// Default: 30 seconds
    pub dispatcher_long_poll_timeout: Duration,

    /// Number of concurrent orchestration workers.
    /// Each worker can process one orchestration turn at a time.
    /// Higher values = more parallel orchestration execution.
    /// Default: 2
    pub orchestration_concurrency: usize,

    /// Number of concurrent worker dispatchers.
    /// Each worker can execute one activity at a time.
    /// Higher values = more parallel activity execution.
    /// Default: 2
    pub worker_concurrency: usize,

    /// Lock timeout for orchestrator queue items.
    /// When an orchestration message is dequeued, it's locked for this duration.
    /// Orchestration turns are typically fast (milliseconds), so a shorter timeout is appropriate.
    /// If processing doesn't complete within this time, the lock expires and the message is retried.
    /// Default: 5 seconds
    pub orchestrator_lock_timeout: Duration,

    /// Buffer time before orchestration lock expiration to trigger renewal.
    ///
    /// Lock renewal strategy:
    /// - If `orchestrator_lock_timeout` ≥ 15s: renew at (`timeout - orchestrator_lock_renewal_buffer`)
    /// - If `orchestrator_lock_timeout` < 15s: renew at 0.5 × timeout (buffer ignored)
    ///
    /// Default: 2 seconds
    pub orchestrator_lock_renewal_buffer: Duration,

    /// Lock timeout for worker queue items (activities).
    /// When an activity is dequeued, it's locked for this duration.
    /// Activities can be long-running (minutes), so a longer timeout is appropriate.
    /// If processing doesn't complete within this time, the lock expires and the activity is retried.
    /// Higher values = more tolerance for long-running activities.
    /// Lower values = faster retry on failures, but may timeout legitimate work.
    /// Default: 30 seconds
    pub worker_lock_timeout: Duration,

    /// Buffer time before lock expiration to trigger renewal.
    ///
    /// Lock renewal strategy:
    /// - If `worker_lock_timeout` ≥ 15s: renew at (`timeout - worker_lock_renewal_buffer`)
    /// - If `worker_lock_timeout` < 15s: renew at 0.5 × timeout (buffer ignored)
    ///
    /// Example with default values (timeout=30s, buffer=5s):
    /// - Initial lock: expires at T+30s
    /// - First renewal: at T+25s (30-5), extends to T+55s
    /// - Second renewal: at T+50s (55-5), extends to T+80s
    ///
    /// Example with short timeout (timeout=10s, buffer ignored):
    /// - Initial lock: expires at T+10s
    /// - First renewal: at T+5s (10*0.5), extends to T+15s
    /// - Second renewal: at T+10s (T+5s + 10*0.5), extends to T+20s
    ///
    /// Default: 5 seconds
    pub worker_lock_renewal_buffer: Duration,

    /// Observability configuration for metrics and logging.
    /// Requires the `observability` feature flag for full functionality.
    /// Default: Disabled with basic logging
    pub observability: ObservabilityConfig,

    /// Configuration for backoff when encountering unregistered orchestrations/activities.
    ///
    /// During rolling deployments, work items for unregistered handlers are abandoned
    /// with exponential backoff instead of immediately failing. This allows the runtime
    /// to wait for the handler to be registered on upgraded nodes.
    ///
    /// Default: 1s base delay, 60s max delay
    pub unregistered_backoff: UnregisteredBackoffConfig,

    /// Maximum fetch attempts before a message is considered poison.
    ///
    /// After this many fetch attempts, the runtime will immediately fail
    /// the orchestration/activity with a Poison error instead of processing.
    ///
    /// Default: 10
    pub max_attempts: u32,

    /// Grace period for activity cancellation.
    ///
    /// When an orchestration reaches a terminal state, in-flight activities
    /// are notified via their cancellation token. This setting controls how
    /// long to wait for activities to complete gracefully before aborting
    /// the activity task to free worker capacity.
    ///
    /// After this grace period, if the activity has not completed:
    /// - The activity task is aborted (`JoinHandle::abort()`)
    /// - The worker queue message is dropped without notifying the orchestrator
    /// - A warning is logged
    ///
    /// Note: Child tasks/threads spawned by the activity that do not observe
    /// the cancellation token may outlive the abort (user responsibility).
    ///
    /// Default: 10 seconds
    pub activity_cancellation_grace_period: Duration,

    /// Override the replay-engine version range used for capability filtering.
    ///
    /// By default, the runtime uses `>=0.0.0, <=CURRENT_BUILD_VERSION`, meaning it
    /// can replay any execution pinned at or below its own semver. This is correct for
    /// most deployments since replay engines are backward-compatible.
    ///
    /// Set this to change the range for advanced scenarios:
    /// - **Narrowing:** Restrict a node to only process a specific version band
    ///   (e.g., `>=1.0.0, <=1.9.999` in a mixed-version cluster).
    /// - **Widening to drain stuck items:** Set a wide range like `>=0.0.0, <=99.0.0`
    ///   to fetch orchestrations pinned at any version. Items with unknown event types
    ///   will fail at provider-level deserialization (never reaching the replay engine)
    ///   and remain in the queue with escalating `attempt_count`.
    ///
    /// Default: `None` (uses `>=0.0.0, <=CURRENT_BUILD_VERSION`)
    pub supported_replay_versions: Option<crate::providers::SemverRange>,

    /// Lock timeout for session heartbeat lease.
    /// Controls crash recovery speed — if a worker dies, its sessions become
    /// claimable after this duration.
    /// Default: 30 seconds
    pub session_lock_timeout: Duration,

    /// Buffer time before session lock expiration to trigger renewal.
    /// Uses the same formula as `worker_lock_renewal_buffer`.
    /// Default: 5 seconds
    pub session_lock_renewal_buffer: Duration,

    /// How long a session stays pinned after the last activity is
    /// fetched, renewed, or completed. The session renewal thread
    /// stops heartbeating idle sessions, so their locks naturally expire.
    /// Default: 5 minutes
    pub session_idle_timeout: Duration,

    /// How often orphaned session rows are swept from the sessions table.
    /// Runs on the same background thread as session lock renewal.
    /// Default: 5 minutes
    pub session_cleanup_interval: Duration,

    /// Maximum number of distinct sessions this runtime will own concurrently,
    /// spanning **all** `worker_concurrency` slots.
    ///
    /// A single `SessionTracker` is shared across every worker slot in this
    /// runtime. When `distinct_count()` reaches this limit, **all** slots stop
    /// claiming new sessions (fetch switches to non-session mode) until an
    /// in-flight session activity completes and frees a session slot.
    ///
    /// Session activities and non-session activities share the same
    /// `worker_concurrency` slots.
    /// Default: 10
    pub max_sessions_per_runtime: usize,

    /// Stable worker identity for session ownership.
    /// If set, used directly as the session `worker_id` for session claims —
    /// all `worker_concurrency` slots share this single identity, so any idle
    /// slot can serve any session owned by this runtime (no head-of-line blocking).
    /// Also allows a restarted worker to reclaim its sessions without waiting
    /// for lock expiry.
    /// Example: Kubernetes StatefulSet pod name.
    /// If `None`, uses ephemeral per-slot identity (`work-{idx}-{runtime_id}`);
    /// sessions are pinned per-slot and cannot survive restarts.
    /// Note: Logging/tracing always includes the per-slot `work-{idx}-{node_id}`
    /// format regardless of this setting.
    /// Default: None
    pub worker_node_id: Option<String>,

    /// Tag filter for worker activity routing.
    ///
    /// Controls which activities this runtime's worker slots will process:
    /// - `DefaultOnly`: Only untagged activities (default)
    /// - `Tags(["gpu"])`: Only activities tagged `"gpu"`
    /// - `DefaultAnd(["gpu"])`: Both untagged and `"gpu"` activities
    /// - `None`: Disable worker (orchestrator-only mode)
    ///
    /// Default: `TagFilter::DefaultOnly` (untagged activities only)
    pub worker_tag_filter: crate::providers::TagFilter,
}
346
347impl Default for RuntimeOptions {
348 fn default() -> Self {
349 Self {
350 dispatcher_min_poll_interval: Duration::from_millis(100),
351 dispatcher_long_poll_timeout: Duration::from_secs(30), // 30 seconds
352 orchestration_concurrency: 2,
353 worker_concurrency: 2,
354 orchestrator_lock_timeout: Duration::from_secs(5),
355 orchestrator_lock_renewal_buffer: Duration::from_secs(2),
356 worker_lock_timeout: Duration::from_secs(30),
357 worker_lock_renewal_buffer: Duration::from_secs(5),
358 observability: ObservabilityConfig::default(),
359 unregistered_backoff: UnregisteredBackoffConfig::default(),
360 max_attempts: 10,
361 activity_cancellation_grace_period: Duration::from_secs(10),
362 supported_replay_versions: None,
363 session_lock_timeout: Duration::from_secs(30),
364 session_lock_renewal_buffer: Duration::from_secs(5),
365 session_idle_timeout: Duration::from_secs(300), // 5 minutes
366 session_cleanup_interval: Duration::from_secs(300), // 5 minutes
367 max_sessions_per_runtime: 10,
368 worker_node_id: None,
369 worker_tag_filter: crate::providers::TagFilter::default(),
370 }
371 }
372}
373
374mod dispatchers;
375pub mod limits;
376pub mod observability;
377pub mod registry;
378mod state_helpers;
379
380#[cfg(feature = "test-hooks")]
381pub mod test_hooks;
382
383use async_trait::async_trait;
384pub use state_helpers::{HistoryManager, WorkItemReader};
385
386pub mod execution;
387pub mod replay_engine;
388
389pub use observability::{LogFormat, ObservabilityConfig};
390
/// High-level orchestration status derived from history.
///
/// Every variant other than `NotFound` carries the custom status string and its
/// version counter alongside the variant-specific payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OrchestrationStatus {
    /// Instance does not exist
    NotFound,
    /// Instance is currently executing
    Running {
        /// User-defined progress string set via `ctx.set_custom_status()`
        custom_status: Option<String>,
        /// Monotonically increasing version counter for change detection
        custom_status_version: u64,
    },
    /// Instance completed successfully with output
    Completed {
        /// Output string produced by the orchestration
        output: String,
        /// Last custom status set before completion
        custom_status: Option<String>,
        /// Version at completion time
        custom_status_version: u64,
    },
    /// Instance failed with structured error details.
    /// Use `details.category()` to distinguish infrastructure/configuration/application errors.
    Failed {
        /// Structured description of the failure
        details: crate::ErrorDetails,
        /// Last custom status set before failure
        custom_status: Option<String>,
        /// Version at failure time
        custom_status_version: u64,
    },
}
421
/// Trait implemented by orchestration handlers that can be invoked by the runtime.
///
/// Implementors must be `Send + Sync`. `FnOrchestration` adapts a plain
/// function or closure to this trait.
#[async_trait]
pub trait OrchestrationHandler: Send + Sync {
    /// Run the orchestration with the given context and input string.
    ///
    /// Returns the output string on success or an error string on failure.
    async fn invoke(&self, ctx: OrchestrationContext, input: String) -> Result<String, String>;
}
427
428/// Function wrapper that implements `OrchestrationHandler`.
429pub struct FnOrchestration<F, Fut>(pub F)
430where
431 F: Fn(OrchestrationContext, String) -> Fut + Send + Sync + 'static,
432 Fut: std::future::Future<Output = Result<String, String>> + Send + 'static;
433
434#[async_trait]
435impl<F, Fut> OrchestrationHandler for FnOrchestration<F, Fut>
436where
437 F: Fn(OrchestrationContext, String) -> Fut + Send + Sync + 'static,
438 Fut: std::future::Future<Output = Result<String, String>> + Send + 'static,
439{
440 async fn invoke(&self, ctx: OrchestrationContext, input: String) -> Result<String, String> {
441 (self.0)(ctx, input).await
442 }
443}
444
/// Trait implemented by activity handlers that can be invoked by the runtime.
///
/// Implementors must be `Send + Sync`. `FnActivity` adapts a plain function
/// or closure to this trait.
#[async_trait]
pub trait ActivityHandler: Send + Sync {
    /// Run the activity with the given context and input string.
    ///
    /// Returns the output string on success or an error string on failure.
    async fn invoke(&self, ctx: crate::ActivityContext, input: String) -> Result<String, String>;
}
450
451/// Function wrapper that implements `ActivityHandler`.
452pub struct FnActivity<F, Fut>(pub F)
453where
454 F: Fn(crate::ActivityContext, String) -> Fut + Send + Sync + 'static,
455 Fut: std::future::Future<Output = Result<String, String>> + Send + 'static;
456
457#[async_trait]
458impl<F, Fut> ActivityHandler for FnActivity<F, Fut>
459where
460 F: Fn(crate::ActivityContext, String) -> Fut + Send + Sync + 'static,
461 Fut: std::future::Future<Output = Result<String, String>> + Send + 'static,
462{
463 async fn invoke(&self, ctx: crate::ActivityContext, input: String) -> Result<String, String> {
464 (self.0)(ctx, input).await
465 }
466}
467
468/// Immutable registry mapping orchestration names to versioned handlers.
469pub use crate::runtime::registry::{OrchestrationRegistry, OrchestrationRegistryBuilder, VersionPolicy};
470
471pub fn kind_of(msg: &WorkItem) -> &'static str {
472 match msg {
473 WorkItem::StartOrchestration { .. } => "StartOrchestration",
474 WorkItem::ActivityExecute { .. } => "ActivityExecute",
475 WorkItem::ActivityCompleted { .. } => "ActivityCompleted",
476 WorkItem::ActivityFailed { .. } => "ActivityFailed",
477 WorkItem::TimerFired { .. } => "TimerFired",
478 WorkItem::ExternalRaised { .. } => "ExternalRaised",
479 WorkItem::QueueMessage { .. } => "ExternalRaisedPersistent",
480 #[cfg(feature = "replay-version-test")]
481 WorkItem::ExternalRaised2 { .. } => "ExternalRaised2",
482 WorkItem::SubOrchCompleted { .. } => "SubOrchCompleted",
483 WorkItem::SubOrchFailed { .. } => "SubOrchFailed",
484 WorkItem::CancelInstance { .. } => "CancelInstance",
485 WorkItem::ContinueAsNew { .. } => "ContinueAsNew",
486 }
487}
488
/// In-process runtime that executes activities and timers and persists
/// history via a `Provider`.
pub struct Runtime {
    /// Join handles of tasks owned by this runtime, behind an async mutex.
    /// NOTE(review): populated outside this view — presumably the dispatcher and poller tasks.
    joins: Mutex<Vec<JoinHandle<()>>>,
    /// Provider used to read history and, when it exposes a management
    /// capability, to query system metrics and queue depths.
    history_store: Arc<dyn Provider>,
    /// Registry of orchestration handlers supplied when the runtime is started.
    orchestration_registry: OrchestrationRegistry,
    /// Track the current execution ID for each active instance
    current_execution_ids: Mutex<HashMap<String, u64>>,
    /// Shutdown flag checked by dispatchers
    shutdown_flag: Arc<AtomicBool>,
    /// Runtime configuration options
    options: RuntimeOptions,
    /// Observability handle for metrics and logging
    observability_handle: Option<observability::ObservabilityHandle>,
    /// Unique runtime instance ID (4-char hex, generated on start)
    runtime_id: String,
}
506
/// Introspection record describing an orchestration, as read from the
/// `OrchestrationStarted` event in its history.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OrchestrationDescriptor {
    /// Orchestration name recorded in the start event.
    pub name: String,
    /// Orchestration version recorded in the start event.
    pub version: String,
    /// Parent instance recorded in the start event, if any.
    pub parent_instance: Option<String>,
    /// Parent id recorded in the start event, if any.
    pub parent_id: Option<u64>,
}
515
516impl Runtime {
517 /// Helper to get the metrics provider if available.
518 #[inline]
519 fn metrics_provider(&self) -> Option<&observability::MetricsProvider> {
520 self.observability_handle
521 .as_ref()
522 .map(|h| h.metrics_provider().as_ref())
523 }
524
525 // New label-aware metric recording methods
526 #[inline]
527 fn record_orchestration_start(&self, orchestration_name: &str, version: &str, initiated_by: &str) {
528 if let Some(provider) = self.metrics_provider() {
529 provider.record_orchestration_start(orchestration_name, version, initiated_by);
530 }
531 }
532
533 #[inline]
534 fn record_orchestration_completion_with_labels(
535 &self,
536 orchestration_name: &str,
537 version: &str,
538 status: &str,
539 duration_seconds: f64,
540 turn_count: u64,
541 history_events: u64,
542 ) {
543 if let Some(provider) = self.metrics_provider() {
544 provider.record_orchestration_completion(
545 orchestration_name,
546 version,
547 status,
548 duration_seconds,
549 turn_count,
550 history_events,
551 );
552 }
553 }
554
555 #[inline]
556 fn record_orchestration_failure_with_labels(
557 &self,
558 orchestration_name: &str,
559 version: &str,
560 error_type: &str,
561 error_category: &str,
562 ) {
563 if let Some(provider) = self.metrics_provider() {
564 provider.record_orchestration_failure(orchestration_name, version, error_type, error_category);
565 }
566 }
567
568 #[inline]
569 fn record_continue_as_new(&self, orchestration_name: &str, execution_id: u64) {
570 if let Some(provider) = self.metrics_provider() {
571 provider.record_continue_as_new(orchestration_name, execution_id);
572 }
573 }
574
575 #[inline]
576 fn increment_active_orchestrations(&self) {
577 if let Some(provider) = self.metrics_provider() {
578 provider.increment_active_orchestrations();
579 }
580 }
581
582 #[inline]
583 fn decrement_active_orchestrations(&self) {
584 if let Some(provider) = self.metrics_provider() {
585 provider.decrement_active_orchestrations();
586 }
587 }
588
589 #[inline]
590 fn record_activity_execution(
591 &self,
592 activity_name: &str,
593 outcome: &str,
594 duration_seconds: f64,
595 retry_attempt: u32,
596 tag: Option<&str>,
597 ) {
598 if let Some(provider) = self.metrics_provider() {
599 provider.record_activity_execution(activity_name, outcome, duration_seconds, retry_attempt, tag);
600 }
601 }
602
603 // Simple metric recording methods (used by execution.rs and worker.rs)
604 // These call MetricsProvider methods which emit both counter!() and atomic increments
605 #[inline]
606 fn record_orchestration_application_error(&self) {
607 if let Some(provider) = self.metrics_provider() {
608 provider.record_orchestration_application_error();
609 }
610 }
611
612 #[inline]
613 fn record_orchestration_infrastructure_error(&self) {
614 if let Some(provider) = self.metrics_provider() {
615 provider.record_orchestration_infrastructure_error();
616 }
617 }
618
619 #[inline]
620 fn record_orchestration_configuration_error(&self) {
621 if let Some(provider) = self.metrics_provider() {
622 provider.record_orchestration_configuration_error();
623 }
624 }
625
626 #[inline]
627 fn record_activity_success(&self) {
628 if let Some(provider) = self.metrics_provider() {
629 provider.record_activity_success();
630 }
631 }
632
633 #[inline]
634 fn record_activity_app_error(&self) {
635 if let Some(provider) = self.metrics_provider() {
636 provider.record_activity_app_error();
637 }
638 }
639
640 #[inline]
641 fn record_activity_infra_error(&self) {
642 if let Some(provider) = self.metrics_provider() {
643 provider.record_activity_infra_error();
644 }
645 }
646
647 #[inline]
648 fn record_orchestration_poison(&self) {
649 if let Some(provider) = self.metrics_provider() {
650 provider.record_orchestration_poison();
651 }
652 }
653
654 #[inline]
655 fn record_activity_poison(&self) {
656 if let Some(provider) = self.metrics_provider() {
657 provider.record_activity_poison();
658 }
659 }
660
661 pub fn metrics_snapshot(&self) -> Option<observability::MetricsSnapshot> {
662 self.observability_handle
663 .as_ref()
664 .map(|handle| handle.metrics_snapshot())
665 }
666
667 /// Returns a reference to the observability handle, if observability is enabled.
668 pub fn observability_handle(&self) -> Option<&observability::ObservabilityHandle> {
669 self.observability_handle.as_ref()
670 }
671
672 /// Initialize all gauges that need to sync with persistent state on startup.
673 ///
674 /// Gauges (unlike counters) represent current state and must be initialized
675 /// from the provider to reflect reality after a restart.
676 ///
677 /// This initializes:
678 /// - `duroxide_active_orchestrations` - Current running orchestrations
679 /// - `duroxide_orchestrator_queue_depth` - Current orchestrator queue backlog
680 /// - `duroxide_worker_queue_depth` - Current worker queue backlog
681 async fn initialize_gauges(self: Arc<Self>) {
682 if let Some(admin) = self.history_store.as_management_capability() {
683 // Query provider for current state (parallel for efficiency)
684 let system_metrics_future = admin.get_system_metrics();
685 let queue_depths_future = admin.get_queue_depths();
686
687 let (system_result, queue_result) = tokio::join!(system_metrics_future, queue_depths_future);
688
689 if let Some(provider) = self.observability_handle.as_ref().map(|h| h.metrics_provider()) {
690 // Initialize active orchestrations gauge
691 if let Ok(metrics) = system_result {
692 let active_count = metrics.running_instances as i64;
693 provider.set_active_orchestrations(active_count);
694 tracing::debug!(
695 target: "duroxide::runtime",
696 active_count = %active_count,
697 "Initialized active orchestrations gauge"
698 );
699 }
700
701 // Initialize queue depth gauges
702 if let Ok(depths) = queue_result {
703 provider.update_queue_depths(depths.orchestrator_queue as u64, depths.worker_queue as u64);
704 tracing::debug!(
705 target: "duroxide::runtime",
706 orch_queue = %depths.orchestrator_queue,
707 worker_queue = %depths.worker_queue,
708 "Initialized queue depth gauges"
709 );
710 }
711 }
712 }
713 }
714
715 /// Spawn a background task that periodically polls the provider for gauge values.
716 ///
717 /// Updates `duroxide_active_orchestrations`, `duroxide_orchestrator_queue_depth`,
718 /// and `duroxide_worker_queue_depth` gauges from the database at the configured interval.
719 fn start_gauge_poller(self: Arc<Self>) -> JoinHandle<()> {
720 let interval = self.options.observability.gauge_poll_interval;
721 let shutdown_flag = self.shutdown_flag.clone();
722
723 tokio::spawn(async move {
724 tracing::debug!(
725 target: "duroxide::runtime",
726 interval_secs = interval.as_secs(),
727 "Gauge poller started"
728 );
729
730 loop {
731 tokio::time::sleep(interval).await;
732
733 if shutdown_flag.load(Ordering::Relaxed) {
734 break;
735 }
736
737 self.clone().refresh_gauges().await;
738 }
739 })
740 }
741
742 /// Refresh all gauge metrics from the provider.
743 async fn refresh_gauges(self: Arc<Self>) {
744 let provider = &self
745 .observability_handle
746 .as_ref()
747 .expect("gauge poller only runs when observability is enabled")
748 .metrics_provider();
749 let admin = match self.history_store.as_management_capability() {
750 Some(admin) => admin,
751 None => return,
752 };
753
754 let (system_result, queue_result) = tokio::join!(admin.get_system_metrics(), admin.get_queue_depths());
755
756 if let Ok(metrics) = system_result {
757 provider.set_active_orchestrations(metrics.running_instances as i64);
758 }
759 if let Ok(depths) = queue_result {
760 provider.update_queue_depths(depths.orchestrator_queue as u64, depths.worker_queue as u64);
761 }
762 }
763
764 /// Compute execution metadata from history delta without inspecting event contents.
765 /// This allows the runtime to extract semantic information and pass it to the provider
766 /// as pre-computed metadata, preventing the provider from needing orchestration knowledge.
767 fn compute_execution_metadata(
768 history_delta: &[Event],
769 _orchestrator_items: &[WorkItem],
770 _current_execution_id: u64,
771 ) -> ExecutionMetadata {
772 let mut metadata = ExecutionMetadata::default();
773
774 // Scan history_delta for OrchestrationStarted (first event) and terminal events
775 for event in history_delta {
776 match &event.kind {
777 EventKind::OrchestrationStarted {
778 name,
779 version,
780 parent_instance,
781 ..
782 } => {
783 // Capture orchestration metadata from start event
784 metadata.orchestration_name = Some(name.clone());
785 metadata.orchestration_version = Some(version.clone());
786 // Capture parent instance for sub-orchestration tracking (cascading delete)
787 metadata.parent_instance_id = parent_instance.clone();
788 // Extract pinned duroxide version from the event's duroxide_version field.
789 // This is the version of the runtime that created this execution.
790 // The provider stores it for capability-filtered fetching.
791 metadata.pinned_duroxide_version = semver::Version::parse(&event.duroxide_version).ok();
792 }
793 EventKind::OrchestrationCompleted { output } => {
794 metadata.status = Some("Completed".to_string());
795 metadata.output = Some(output.clone());
796 break;
797 }
798 EventKind::OrchestrationFailed { details } => {
799 metadata.status = Some("Failed".to_string());
800 metadata.output = Some(details.display_message());
801 break;
802 }
803 EventKind::OrchestrationContinuedAsNew { input } => {
804 metadata.status = Some("ContinuedAsNew".to_string());
805 metadata.output = Some(input.clone());
806 // Don't set create_next_execution - the new execution will be started
807 // by WorkItem::ContinueAsNew being processed like StartOrchestration
808 break;
809 }
810 _ => {}
811 }
812 }
813
814 metadata
815 }
816
817 // Execution engine: consumes provider queues and persists history atomically.
818 /// Return the most recent descriptor `{ name, version, parent_instance?, parent_id? }` for an instance.
819 /// Returns `None` if the instance/history does not exist or no OrchestrationStarted is present.
820 pub async fn get_orchestration_descriptor(
821 &self,
822 instance: &str,
823 ) -> Option<crate::runtime::OrchestrationDescriptor> {
824 let hist = self.history_store.read(instance).await.unwrap_or_default();
825 for e in hist.iter().rev() {
826 if let EventKind::OrchestrationStarted {
827 name,
828 version,
829 parent_instance,
830 parent_id,
831 ..
832 } = &e.kind
833 {
834 return Some(crate::runtime::OrchestrationDescriptor {
835 name: name.clone(),
836 version: version.clone(),
837 parent_instance: parent_instance.clone(),
838 parent_id: *parent_id,
839 });
840 }
841 }
842 None
843 }
844
845 /// Get the current execution ID for an instance, or fetch from store if not tracked
846 ///
847 /// If `current_execution_id` is provided and the instance matches, use it directly.
848 /// Otherwise, check in-memory tracking, then fall back to INITIAL_EXECUTION_ID.
849 async fn get_execution_id_for_instance(&self, instance: &str, current_execution_id: Option<u64>) -> u64 {
850 // If this is the current instance being processed, use the provided execution_id
851 if let Some(exec_id) = current_execution_id {
852 // Update in-memory tracking for future calls
853 self.current_execution_ids
854 .lock()
855 .await
856 .insert(instance.to_string(), exec_id);
857 return exec_id;
858 }
859
860 // First check in-memory tracking
861 if let Some(&exec_id) = self.current_execution_ids.lock().await.get(instance) {
862 return exec_id;
863 }
864
865 // Fall back to INITIAL_EXECUTION_ID (no longer querying Provider::latest_execution_id)
866 crate::INITIAL_EXECUTION_ID
867 }
868
/// Start a new runtime backed by an in-memory SQLite provider.
///
/// The provider is created, wrapped as a shared `Provider` trait object, and handed to
/// `start_with_store` together with the given registries.
///
/// Requires the `sqlite` feature.
///
/// # Panics
///
/// Panics if the in-memory SQLite provider cannot be created.
#[cfg(feature = "sqlite")]
pub async fn start(
    activity_registry: registry::ActivityRegistry,
    orchestration_registry: OrchestrationRegistry,
) -> Arc<Self> {
    let sqlite_provider = crate::providers::sqlite::SqliteProvider::new_in_memory()
        .await
        .expect("in-memory SQLite provider creation should never fail");
    let history_store: Arc<dyn Provider> = Arc::new(sqlite_provider);

    Self::start_with_store(history_store, activity_registry, orchestration_registry).await
}
884
885 /// Start a new runtime with a custom `Provider` implementation.
886 pub async fn start_with_store(
887 history_store: Arc<dyn Provider>,
888 activity_registry: registry::ActivityRegistry,
889 orchestration_registry: OrchestrationRegistry,
890 ) -> Arc<Self> {
891 Self::start_with_options(
892 history_store,
893 activity_registry,
894 orchestration_registry,
895 RuntimeOptions::default(),
896 )
897 .await
898 }
899
900 /// Start a new runtime with custom options.
901 ///
902 /// # Panics
903 ///
904 /// Panics if `session_idle_timeout` is not greater than the worker lock renewal interval
905 /// (`worker_lock_timeout - worker_lock_renewal_buffer`). This prevents sessions from being
906 /// unpinned during long-running activity execution.
907 pub async fn start_with_options(
908 history_store: Arc<dyn Provider>,
909 activity_registry: registry::ActivityRegistry,
910 orchestration_registry: OrchestrationRegistry,
911 options: RuntimeOptions,
912 ) -> Arc<Self> {
913 // Validate session timeout invariant
914 let worker_renewal_interval = options
915 .worker_lock_timeout
916 .checked_sub(options.worker_lock_renewal_buffer)
917 .unwrap_or(Duration::from_secs(1));
918 if options.session_idle_timeout <= worker_renewal_interval {
919 panic!(
920 "session_idle_timeout ({}s) must be greater than worker lock renewal interval ({}s). \
921 Sessions would unpin during long-running activity execution. \
922 Increase session_idle_timeout or decrease worker_lock_timeout.",
923 options.session_idle_timeout.as_secs(),
924 worker_renewal_interval.as_secs(),
925 );
926 }
927
928 // Inject built-in system activities (new_guid, utc_now_ms, get_kv_value)
929 let activity_registry = inject_builtin_activities(activity_registry);
930
931 // Wrap activity registry in Arc for internal sharing across worker threads
932 let activity_registry = Arc::new(activity_registry);
933
934 // Initialize observability (metrics + structured logging)
935 let observability_handle = observability::ObservabilityHandle::init(&options.observability).ok(); // Gracefully degrade if observability fails to initialize
936
937 // Print version on startup
938 tracing::info!(
939 target: "duroxide::runtime",
940 "duroxide runtime ({}) starting with provider {} ({})",
941 env!("CARGO_PKG_VERSION"),
942 history_store.name(),
943 history_store.version()
944 );
945
946 // Wrap provider with metrics instrumentation if metrics are enabled
947 let history_store: Arc<dyn Provider> = if let Some(ref handle) = observability_handle {
948 let metrics = handle.metrics_provider();
949 Arc::new(crate::providers::instrumented::InstrumentedProvider::new(
950 history_store,
951 Some(metrics.clone()),
952 ))
953 } else {
954 history_store
955 };
956
957 let joins: Vec<JoinHandle<()>> = Vec::new();
958
959 // Generate unique runtime instance ID (4-char hex)
960 use std::time::{SystemTime, UNIX_EPOCH};
961 let runtime_id = format!(
962 "{:04x}",
963 SystemTime::now()
964 .duration_since(UNIX_EPOCH)
965 .map(|d| (d.as_nanos() & 0xFFFF) as u16)
966 .unwrap_or(0)
967 );
968
969 // start request queue + worker
970 let runtime = Arc::new(Self {
971 joins: Mutex::new(joins),
972 history_store,
973 orchestration_registry,
974 current_execution_ids: Mutex::new(HashMap::new()),
975 shutdown_flag: Arc::new(AtomicBool::new(false)),
976
977 options,
978 observability_handle,
979 runtime_id,
980 });
981
982 // Initialize gauges from provider (if supported)
983 runtime.clone().initialize_gauges().await;
984
985 // Start periodic gauge polling if observability is enabled
986 if runtime.observability_handle.is_some() {
987 let gauge_handle = runtime.clone().start_gauge_poller();
988 runtime.joins.lock().await.push(gauge_handle);
989 }
990
991 // background orchestrator dispatcher (extracted from inline poller)
992 let handle = runtime.clone().start_orchestration_dispatcher();
993 runtime.joins.lock().await.push(handle);
994
995 // background work dispatcher (executes activities)
996 let work_handle = runtime.clone().start_work_dispatcher(activity_registry);
997 runtime.joins.lock().await.push(work_handle);
998
999 runtime
1000 }
1001
1002 /// Shutdown the runtime.
1003 ///
1004 /// # Parameters
1005 ///
1006 /// * `timeout_ms` - How long to wait for graceful shutdown:
1007 /// - `None`: Default 1000ms
1008 /// - `Some(Duration::ZERO)`: Immediate abort
1009 /// - `Some(ms)`: Wait specified milliseconds
1010 pub async fn shutdown(self: Arc<Self>, timeout_ms: Option<u64>) {
1011 let timeout_ms = timeout_ms.unwrap_or(1000);
1012
1013 if timeout_ms == 0 {
1014 warn!("Immediate shutdown - aborting all tasks");
1015 let mut joins = self.joins.lock().await;
1016 for j in joins.drain(..) {
1017 j.abort();
1018 }
1019 return;
1020 }
1021
1022 // debug!("Graceful shutdown (timeout: {}ms)", timeout_ms);
1023
1024 // Set shutdown flag - workers check this between iterations
1025 self.shutdown_flag.store(true, Ordering::Relaxed);
1026
1027 // Give workers time to notice and exit gracefully
1028 tokio::time::sleep(std::time::Duration::from_millis(timeout_ms)).await;
1029
1030 // Check if any tasks are still running (need to be aborted)
1031 let mut joins = self.joins.lock().await;
1032
1033 // Abort any remaining tasks
1034 for j in joins.drain(..) {
1035 j.abort();
1036 }
1037
1038 // debug!("Runtime shut down");
1039
1040 // Shutdown observability last (after all workers stopped)
1041 // Note: We can't move out of Arc here, so observability shutdown happens when Runtime is dropped
1042 // or if we could restructure to take ownership in shutdown
1043 }
1044}