#![allow(clippy::expect_used)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::clone_on_ref_ptr)]
use crate::providers::{ExecutionMetadata, Provider, WorkItem};
use crate::{Event, EventKind, OrchestrationContext};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::warn;
/// Layers the built-in "syscall" activities on top of the user-supplied
/// registry: GUID generation, wall-clock UTC milliseconds, and a KV read.
/// The trailing `expect` fires only if builtin registration itself is
/// broken, which would be a programming bug rather than a runtime error.
fn inject_builtin_activities(user_registry: registry::ActivityRegistry) -> registry::ActivityRegistry {
    registry::ActivityRegistry::builder_from(&user_registry)
        // Fresh GUID, produced via the crate's generator.
        .register_builtin(
            crate::SYSCALL_ACTIVITY_NEW_GUID,
            |_ctx: crate::ActivityContext, _input: String| async move { Ok(crate::generate_guid()) },
        )
        // Current UTC time as milliseconds since the Unix epoch; a clock
        // before the epoch degrades to 0 rather than failing.
        .register_builtin(
            crate::SYSCALL_ACTIVITY_UTC_NOW_MS,
            |_ctx: crate::ActivityContext, _input: String| async move {
                let now_ms = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .map_or(0, |d| d.as_millis() as u64);
                Ok(now_ms.to_string())
            },
        )
        // KV lookup; input is a JSON object carrying "instance_id" and "key".
        .register_builtin(
            crate::SYSCALL_ACTIVITY_GET_KV_VALUE,
            |ctx: crate::ActivityContext, input: String| async move {
                let request: serde_json::Value =
                    serde_json::from_str(&input).map_err(|e| format!("get_kv_value: invalid input: {e}"))?;
                let instance = request["instance_id"]
                    .as_str()
                    .ok_or_else(|| "get_kv_value: missing instance_id".to_string())?;
                let kv_key = request["key"]
                    .as_str()
                    .ok_or_else(|| "get_kv_value: missing key".to_string())?;
                let value = ctx
                    .get_client()
                    .get_kv_value(instance, kv_key)
                    .await
                    .map_err(|e| format!("get_kv_value client error: {e}"))?;
                serde_json::to_string(&value).map_err(|e| format!("get_kv_value serialization error: {e}"))
            },
        )
        .build_result()
        .expect("builtin syscall activity registration should never fail")
}
/// Exponential backoff settings used when work arrives for a name that is
/// not registered: `base_delay * 2^(attempt-1)`, capped at `max_delay`.
#[derive(Debug, Clone)]
pub struct UnregisteredBackoffConfig {
    /// Delay before the first retry; doubles on each subsequent attempt.
    pub base_delay: Duration,
    /// Hard upper bound on any computed delay.
    pub max_delay: Duration,
}

impl UnregisteredBackoffConfig {
    /// Doubling stops after 2^6 = 64x the base delay.
    const MAX_BACKOFF_EXPONENT: u32 = 6;
    const DEFAULT_BASE_DELAY: Duration = Duration::from_secs(1);
    const DEFAULT_MAX_DELAY: Duration = Duration::from_secs(60);

    /// Delay before the given 1-based attempt. Attempt 0 and 1 both yield
    /// `base_delay`; growth is exponential and saturating, never panicking.
    pub fn delay(&self, attempt_count: u32) -> Duration {
        let shift = attempt_count.saturating_sub(1).min(Self::MAX_BACKOFF_EXPONENT);
        // shift <= 6, so 2^shift fits comfortably in a u32.
        let multiplier = 2u32.pow(shift);
        self.base_delay.saturating_mul(multiplier).min(self.max_delay)
    }
}

impl Default for UnregisteredBackoffConfig {
    /// 1 second base, 60 second cap.
    fn default() -> Self {
        UnregisteredBackoffConfig {
            base_delay: Self::DEFAULT_BASE_DELAY,
            max_delay: Self::DEFAULT_MAX_DELAY,
        }
    }
}
/// Tuning knobs for a [`Runtime`]. Build with `RuntimeOptions::default()`
/// and override individual fields as needed.
#[derive(Debug, Clone)]
pub struct RuntimeOptions {
    /// Minimum pause between dispatcher polls.
    pub dispatcher_min_poll_interval: Duration,
    /// Long-poll timeout passed to dispatcher fetches.
    pub dispatcher_long_poll_timeout: Duration,
    /// Maximum concurrently processed orchestration work items.
    pub orchestration_concurrency: usize,
    /// Maximum concurrently processed worker (activity) items.
    pub worker_concurrency: usize,
    /// Lease duration for orchestrator queue items.
    pub orchestrator_lock_timeout: Duration,
    /// Renewal happens this long before the orchestrator lock expires.
    pub orchestrator_lock_renewal_buffer: Duration,
    /// Lease duration for worker queue items.
    pub worker_lock_timeout: Duration,
    /// Renewal happens this long before the worker lock expires.
    pub worker_lock_renewal_buffer: Duration,
    /// Metrics/tracing configuration.
    pub observability: ObservabilityConfig,
    /// Backoff applied when a work item targets an unregistered name.
    pub unregistered_backoff: UnregisteredBackoffConfig,
    /// Maximum delivery attempts before an item is treated as poison.
    pub max_attempts: u32,
    /// Grace period granted to activities on cancellation.
    pub activity_cancellation_grace_period: Duration,
    /// Optional restriction on duroxide versions eligible for replay;
    /// `None` presumably means "accept any" — confirm against replay_engine.
    pub supported_replay_versions: Option<crate::providers::SemverRange>,
    /// Lease duration for session pinning.
    pub session_lock_timeout: Duration,
    /// Renewal happens this long before the session lock expires.
    pub session_lock_renewal_buffer: Duration,
    /// Idle time after which a session unpins. Must exceed the worker lock
    /// renewal interval (enforced with a panic in `start_with_options`).
    pub session_idle_timeout: Duration,
    /// Interval between session cleanup sweeps.
    pub session_cleanup_interval: Duration,
    /// Cap on sessions pinned to one runtime.
    pub max_sessions_per_runtime: usize,
    /// Optional stable identifier for this worker node.
    pub worker_node_id: Option<String>,
    /// Tag-based filter restricting which work this worker picks up.
    pub worker_tag_filter: crate::providers::TagFilter,
}
impl Default for RuntimeOptions {
fn default() -> Self {
Self {
dispatcher_min_poll_interval: Duration::from_millis(100),
dispatcher_long_poll_timeout: Duration::from_secs(30), orchestration_concurrency: 2,
worker_concurrency: 2,
orchestrator_lock_timeout: Duration::from_secs(5),
orchestrator_lock_renewal_buffer: Duration::from_secs(2),
worker_lock_timeout: Duration::from_secs(30),
worker_lock_renewal_buffer: Duration::from_secs(5),
observability: ObservabilityConfig::default(),
unregistered_backoff: UnregisteredBackoffConfig::default(),
max_attempts: 10,
activity_cancellation_grace_period: Duration::from_secs(10),
supported_replay_versions: None,
session_lock_timeout: Duration::from_secs(30),
session_lock_renewal_buffer: Duration::from_secs(5),
session_idle_timeout: Duration::from_secs(300), session_cleanup_interval: Duration::from_secs(300), max_sessions_per_runtime: 10,
worker_node_id: None,
worker_tag_filter: crate::providers::TagFilter::default(),
}
}
}
mod dispatchers;
pub mod limits;
pub mod observability;
pub mod registry;
mod state_helpers;
#[cfg(feature = "test-hooks")]
pub mod test_hooks;
use async_trait::async_trait;
pub use state_helpers::{HistoryManager, WorkItemReader};
pub mod execution;
pub mod replay_engine;
pub use observability::{LogFormat, ObservabilityConfig};
/// Externally observable status of an orchestration instance. All live
/// variants carry the latest custom status string and its version counter.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OrchestrationStatus {
    /// No state exists for the requested instance.
    NotFound,
    /// Instance has started and has not reached a terminal event.
    Running {
        custom_status: Option<String>,
        custom_status_version: u64,
    },
    /// Instance finished successfully with serialized `output`.
    Completed {
        output: String,
        custom_status: Option<String>,
        custom_status_version: u64,
    },
    /// Instance terminated with an error.
    Failed {
        details: crate::ErrorDetails,
        custom_status: Option<String>,
        custom_status_version: u64,
    },
}
/// Async handler for an orchestration: receives the orchestration context
/// and the raw input string, and returns serialized output or an error string.
#[async_trait]
pub trait OrchestrationHandler: Send + Sync {
    async fn invoke(&self, ctx: OrchestrationContext, input: String) -> Result<String, String>;
}
/// Adapter allowing a plain async closure/function to serve as an
/// [`OrchestrationHandler`] without a hand-written impl.
pub struct FnOrchestration<F, Fut>(pub F)
where
    F: Fn(OrchestrationContext, String) -> Fut + Send + Sync + 'static,
    Fut: std::future::Future<Output = Result<String, String>> + Send + 'static;
#[async_trait]
impl<F, Fut> OrchestrationHandler for FnOrchestration<F, Fut>
where
    F: Fn(OrchestrationContext, String) -> Fut + Send + Sync + 'static,
    Fut: std::future::Future<Output = Result<String, String>> + Send + 'static,
{
    /// Delegates straight to the wrapped closure.
    async fn invoke(&self, ctx: OrchestrationContext, input: String) -> Result<String, String> {
        let handler = &self.0;
        handler(ctx, input).await
    }
}
/// Async handler for an activity: receives the activity context and the raw
/// input string, and returns serialized output or an error string.
#[async_trait]
pub trait ActivityHandler: Send + Sync {
    async fn invoke(&self, ctx: crate::ActivityContext, input: String) -> Result<String, String>;
}
/// Adapter allowing a plain async closure/function to serve as an
/// [`ActivityHandler`] without a hand-written impl.
pub struct FnActivity<F, Fut>(pub F)
where
    F: Fn(crate::ActivityContext, String) -> Fut + Send + Sync + 'static,
    Fut: std::future::Future<Output = Result<String, String>> + Send + 'static;
#[async_trait]
impl<F, Fut> ActivityHandler for FnActivity<F, Fut>
where
    F: Fn(crate::ActivityContext, String) -> Fut + Send + Sync + 'static,
    Fut: std::future::Future<Output = Result<String, String>> + Send + 'static,
{
    /// Delegates straight to the wrapped closure.
    async fn invoke(&self, ctx: crate::ActivityContext, input: String) -> Result<String, String> {
        let activity_fn = &self.0;
        activity_fn(ctx, input).await
    }
}
pub use crate::runtime::registry::{OrchestrationRegistry, OrchestrationRegistryBuilder, VersionPolicy};
pub fn kind_of(msg: &WorkItem) -> &'static str {
match msg {
WorkItem::StartOrchestration { .. } => "StartOrchestration",
WorkItem::ActivityExecute { .. } => "ActivityExecute",
WorkItem::ActivityCompleted { .. } => "ActivityCompleted",
WorkItem::ActivityFailed { .. } => "ActivityFailed",
WorkItem::TimerFired { .. } => "TimerFired",
WorkItem::ExternalRaised { .. } => "ExternalRaised",
WorkItem::QueueMessage { .. } => "ExternalRaisedPersistent",
#[cfg(feature = "replay-version-test")]
WorkItem::ExternalRaised2 { .. } => "ExternalRaised2",
WorkItem::SubOrchCompleted { .. } => "SubOrchCompleted",
WorkItem::SubOrchFailed { .. } => "SubOrchFailed",
WorkItem::CancelInstance { .. } => "CancelInstance",
WorkItem::ContinueAsNew { .. } => "ContinueAsNew",
}
}
/// The duroxide runtime: owns the provider, the orchestration registry, and
/// the background dispatcher tasks driving orchestrations and activities.
pub struct Runtime {
    /// Join handles of spawned background tasks; aborted during shutdown.
    joins: Mutex<Vec<JoinHandle<()>>>,
    /// Backing provider for history, queues, and management operations.
    history_store: Arc<dyn Provider>,
    /// Registered orchestration handlers and versioning policy.
    orchestration_registry: OrchestrationRegistry,
    /// Cache of instance id -> most recently observed execution id.
    current_execution_ids: Mutex<HashMap<String, u64>>,
    /// Cooperative shutdown signal polled by background loops.
    shutdown_flag: Arc<AtomicBool>,
    /// Tuning knobs captured at startup.
    options: RuntimeOptions,
    /// Present only when observability initialization succeeded.
    observability_handle: Option<observability::ObservabilityHandle>,
    /// Short hex tag for this runtime in logs; not globally unique.
    runtime_id: String,
}
/// Identity of an orchestration instance as recorded in its
/// `OrchestrationStarted` history event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OrchestrationDescriptor {
    /// Registered orchestration name.
    pub name: String,
    /// Version string the instance was started with.
    pub version: String,
    /// Parent instance id when this is a sub-orchestration.
    pub parent_instance: Option<String>,
    /// Correlating id within the parent (presumably the parent's scheduling
    /// event id — confirm against the history event definitions).
    pub parent_id: Option<u64>,
}
impl Runtime {
    /// Borrows the metrics provider, or `None` when observability is disabled.
    #[inline]
    fn metrics_provider(&self) -> Option<&observability::MetricsProvider> {
        self.observability_handle
            .as_ref()
            .map(|h| h.metrics_provider().as_ref())
    }
    /// Records an orchestration start (no-op without observability).
    #[inline]
    fn record_orchestration_start(&self, orchestration_name: &str, version: &str, initiated_by: &str) {
        if let Some(provider) = self.metrics_provider() {
            provider.record_orchestration_start(orchestration_name, version, initiated_by);
        }
    }
    /// Records an orchestration completion with its full label set.
    #[inline]
    fn record_orchestration_completion_with_labels(
        &self,
        orchestration_name: &str,
        version: &str,
        status: &str,
        duration_seconds: f64,
        turn_count: u64,
        history_events: u64,
    ) {
        if let Some(provider) = self.metrics_provider() {
            provider.record_orchestration_completion(
                orchestration_name,
                version,
                status,
                duration_seconds,
                turn_count,
                history_events,
            );
        }
    }
    /// Records an orchestration failure with error classification labels.
    #[inline]
    fn record_orchestration_failure_with_labels(
        &self,
        orchestration_name: &str,
        version: &str,
        error_type: &str,
        error_category: &str,
    ) {
        if let Some(provider) = self.metrics_provider() {
            provider.record_orchestration_failure(orchestration_name, version, error_type, error_category);
        }
    }
    /// Records a continue-as-new transition.
    #[inline]
    fn record_continue_as_new(&self, orchestration_name: &str, execution_id: u64) {
        if let Some(provider) = self.metrics_provider() {
            provider.record_continue_as_new(orchestration_name, execution_id);
        }
    }
    /// Increments the active-orchestrations gauge.
    #[inline]
    fn increment_active_orchestrations(&self) {
        if let Some(provider) = self.metrics_provider() {
            provider.increment_active_orchestrations();
        }
    }
    /// Decrements the active-orchestrations gauge.
    #[inline]
    fn decrement_active_orchestrations(&self) {
        if let Some(provider) = self.metrics_provider() {
            provider.decrement_active_orchestrations();
        }
    }
    /// Records one activity execution with outcome and timing labels.
    #[inline]
    fn record_activity_execution(
        &self,
        activity_name: &str,
        outcome: &str,
        duration_seconds: f64,
        retry_attempt: u32,
        tag: Option<&str>,
    ) {
        if let Some(provider) = self.metrics_provider() {
            provider.record_activity_execution(activity_name, outcome, duration_seconds, retry_attempt, tag);
        }
    }
    /// Counts an orchestration error classified as application-level.
    #[inline]
    fn record_orchestration_application_error(&self) {
        if let Some(provider) = self.metrics_provider() {
            provider.record_orchestration_application_error();
        }
    }
    /// Counts an orchestration error classified as infrastructure-level.
    #[inline]
    fn record_orchestration_infrastructure_error(&self) {
        if let Some(provider) = self.metrics_provider() {
            provider.record_orchestration_infrastructure_error();
        }
    }
    /// Counts an orchestration error classified as configuration-level.
    #[inline]
    fn record_orchestration_configuration_error(&self) {
        if let Some(provider) = self.metrics_provider() {
            provider.record_orchestration_configuration_error();
        }
    }
    /// Counts a successful activity completion.
    #[inline]
    fn record_activity_success(&self) {
        if let Some(provider) = self.metrics_provider() {
            provider.record_activity_success();
        }
    }
    /// Counts an activity error classified as application-level.
    #[inline]
    fn record_activity_app_error(&self) {
        if let Some(provider) = self.metrics_provider() {
            provider.record_activity_app_error();
        }
    }
    /// Counts an activity error classified as infrastructure-level.
    #[inline]
    fn record_activity_infra_error(&self) {
        if let Some(provider) = self.metrics_provider() {
            provider.record_activity_infra_error();
        }
    }
    /// Counts an orchestration poison occurrence.
    #[inline]
    fn record_orchestration_poison(&self) {
        if let Some(provider) = self.metrics_provider() {
            provider.record_orchestration_poison();
        }
    }
    /// Counts an activity poison occurrence.
    #[inline]
    fn record_activity_poison(&self) {
        if let Some(provider) = self.metrics_provider() {
            provider.record_activity_poison();
        }
    }
    /// Snapshot of current metric values, or `None` without observability.
    pub fn metrics_snapshot(&self) -> Option<observability::MetricsSnapshot> {
        self.observability_handle
            .as_ref()
            .map(|handle| handle.metrics_snapshot())
    }
    /// The observability handle, when one was initialized at startup.
    pub fn observability_handle(&self) -> Option<&observability::ObservabilityHandle> {
        self.observability_handle.as_ref()
    }
    /// Best-effort seeding of gauges from the provider's management
    /// capability; skipped when the provider lacks one or observability is
    /// disabled, and fetch errors are silently ignored.
    async fn initialize_gauges(self: Arc<Self>) {
        if let Some(admin) = self.history_store.as_management_capability() {
            let system_metrics_future = admin.get_system_metrics();
            let queue_depths_future = admin.get_queue_depths();
            // Fetch both datasets concurrently.
            let (system_result, queue_result) = tokio::join!(system_metrics_future, queue_depths_future);
            if let Some(provider) = self.observability_handle.as_ref().map(|h| h.metrics_provider()) {
                if let Ok(metrics) = system_result {
                    let active_count = metrics.running_instances as i64;
                    provider.set_active_orchestrations(active_count);
                    tracing::debug!(
                        target: "duroxide::runtime",
                        active_count = %active_count,
                        "Initialized active orchestrations gauge"
                    );
                }
                if let Ok(depths) = queue_result {
                    provider.update_queue_depths(depths.orchestrator_queue as u64, depths.worker_queue as u64);
                    tracing::debug!(
                        target: "duroxide::runtime",
                        orch_queue = %depths.orchestrator_queue,
                        worker_queue = %depths.worker_queue,
                        "Initialized queue depth gauges"
                    );
                }
            }
        }
    }
    /// Spawns the periodic gauge-refresh task. The loop sleeps first, then
    /// checks the shutdown flag, so it exits at most one interval after
    /// shutdown is requested.
    fn start_gauge_poller(self: Arc<Self>) -> JoinHandle<()> {
        let interval = self.options.observability.gauge_poll_interval;
        let shutdown_flag = self.shutdown_flag.clone();
        tokio::spawn(async move {
            tracing::debug!(
                target: "duroxide::runtime",
                interval_secs = interval.as_secs(),
                "Gauge poller started"
            );
            loop {
                tokio::time::sleep(interval).await;
                if shutdown_flag.load(Ordering::Relaxed) {
                    break;
                }
                self.clone().refresh_gauges().await;
            }
        })
    }
    /// Re-reads system metrics and queue depths and updates the gauges.
    /// The `expect` is justified: the gauge poller is only spawned when an
    /// observability handle exists (see `start_with_options`).
    async fn refresh_gauges(self: Arc<Self>) {
        let provider = &self
            .observability_handle
            .as_ref()
            .expect("gauge poller only runs when observability is enabled")
            .metrics_provider();
        let admin = match self.history_store.as_management_capability() {
            Some(admin) => admin,
            None => return,
        };
        let (system_result, queue_result) = tokio::join!(admin.get_system_metrics(), admin.get_queue_depths());
        if let Ok(metrics) = system_result {
            provider.set_active_orchestrations(metrics.running_instances as i64);
        }
        if let Ok(depths) = queue_result {
            provider.update_queue_depths(depths.orchestrator_queue as u64, depths.worker_queue as u64);
        }
    }
    /// Derives execution metadata from a batch of new history events: the
    /// start event supplies name/version/parent/pinned-version; the first
    /// terminal event (completed/failed/continued-as-new) supplies status and
    /// output, after which scanning stops.
    fn compute_execution_metadata(
        history_delta: &[Event],
        _orchestrator_items: &[WorkItem],
        _current_execution_id: u64,
    ) -> ExecutionMetadata {
        let mut metadata = ExecutionMetadata::default();
        for event in history_delta {
            match &event.kind {
                EventKind::OrchestrationStarted {
                    name,
                    version,
                    parent_instance,
                    ..
                } => {
                    metadata.orchestration_name = Some(name.clone());
                    metadata.orchestration_version = Some(version.clone());
                    metadata.parent_instance_id = parent_instance.clone();
                    // An unparseable version leaves the pin unset rather than failing.
                    metadata.pinned_duroxide_version = semver::Version::parse(&event.duroxide_version).ok();
                }
                EventKind::OrchestrationCompleted { output } => {
                    metadata.status = Some("Completed".to_string());
                    metadata.output = Some(output.clone());
                    break;
                }
                EventKind::OrchestrationFailed { details } => {
                    metadata.status = Some("Failed".to_string());
                    metadata.output = Some(details.display_message());
                    break;
                }
                EventKind::OrchestrationContinuedAsNew { input } => {
                    metadata.status = Some("ContinuedAsNew".to_string());
                    metadata.output = Some(input.clone());
                    break;
                }
                _ => {}
            }
        }
        metadata
    }
    /// Reads the instance's history and returns the descriptor from the most
    /// recent `OrchestrationStarted` event (scanning newest-first), or `None`
    /// if the history is empty/unreadable or contains no start event.
    pub async fn get_orchestration_descriptor(
        &self,
        instance: &str,
    ) -> Option<crate::runtime::OrchestrationDescriptor> {
        let hist = self.history_store.read(instance).await.unwrap_or_default();
        for e in hist.iter().rev() {
            if let EventKind::OrchestrationStarted {
                name,
                version,
                parent_instance,
                parent_id,
                ..
            } = &e.kind
            {
                return Some(crate::runtime::OrchestrationDescriptor {
                    name: name.clone(),
                    version: version.clone(),
                    parent_instance: parent_instance.clone(),
                    parent_id: *parent_id,
                });
            }
        }
        None
    }
    /// Resolves the execution id for an instance: a provided id updates the
    /// cache and wins; otherwise the cached id is used; otherwise falls back
    /// to `INITIAL_EXECUTION_ID`.
    async fn get_execution_id_for_instance(&self, instance: &str, current_execution_id: Option<u64>) -> u64 {
        if let Some(exec_id) = current_execution_id {
            self.current_execution_ids
                .lock()
                .await
                .insert(instance.to_string(), exec_id);
            return exec_id;
        }
        if let Some(&exec_id) = self.current_execution_ids.lock().await.get(instance) {
            return exec_id;
        }
        crate::INITIAL_EXECUTION_ID
    }
    /// Convenience constructor backed by an in-memory SQLite provider.
    #[cfg(feature = "sqlite")]
    pub async fn start(
        activity_registry: registry::ActivityRegistry,
        orchestration_registry: OrchestrationRegistry,
    ) -> Arc<Self> {
        let history_store: Arc<dyn Provider> = Arc::new(
            crate::providers::sqlite::SqliteProvider::new_in_memory()
                .await
                .expect("in-memory SQLite provider creation should never fail"),
        );
        Self::start_with_store(history_store, activity_registry, orchestration_registry).await
    }
    /// Starts the runtime on the given provider with default options.
    pub async fn start_with_store(
        history_store: Arc<dyn Provider>,
        activity_registry: registry::ActivityRegistry,
        orchestration_registry: OrchestrationRegistry,
    ) -> Arc<Self> {
        Self::start_with_options(
            history_store,
            activity_registry,
            orchestration_registry,
            RuntimeOptions::default(),
        )
        .await
    }
    /// Starts the runtime: validates options, injects builtin activities,
    /// initializes observability (wrapping the provider with instrumentation
    /// when enabled), and spawns the gauge poller plus the orchestration and
    /// work dispatchers.
    ///
    /// # Panics
    /// If `session_idle_timeout` is not greater than the worker lock renewal
    /// interval (`worker_lock_timeout - worker_lock_renewal_buffer`).
    pub async fn start_with_options(
        history_store: Arc<dyn Provider>,
        activity_registry: registry::ActivityRegistry,
        orchestration_registry: OrchestrationRegistry,
        options: RuntimeOptions,
    ) -> Arc<Self> {
        // Renewal interval; falls back to 1s if the buffer exceeds the timeout.
        let worker_renewal_interval = options
            .worker_lock_timeout
            .checked_sub(options.worker_lock_renewal_buffer)
            .unwrap_or(Duration::from_secs(1));
        if options.session_idle_timeout <= worker_renewal_interval {
            panic!(
                "session_idle_timeout ({}s) must be greater than worker lock renewal interval ({}s). \
                Sessions would unpin during long-running activity execution. \
                Increase session_idle_timeout or decrease worker_lock_timeout.",
                options.session_idle_timeout.as_secs(),
                worker_renewal_interval.as_secs(),
            );
        }
        let activity_registry = inject_builtin_activities(activity_registry);
        let activity_registry = Arc::new(activity_registry);
        // Observability init failure is tolerated: the runtime runs unmetered.
        let observability_handle = observability::ObservabilityHandle::init(&options.observability).ok();
        tracing::info!(
            target: "duroxide::runtime",
            "duroxide runtime ({}) starting with provider {} ({})",
            env!("CARGO_PKG_VERSION"),
            history_store.name(),
            history_store.version()
        );
        // With observability on, wrap the provider so calls are instrumented.
        let history_store: Arc<dyn Provider> = if let Some(ref handle) = observability_handle {
            let metrics = handle.metrics_provider();
            Arc::new(crate::providers::instrumented::InstrumentedProvider::new(
                history_store,
                Some(metrics.clone()),
            ))
        } else {
            history_store
        };
        let joins: Vec<JoinHandle<()>> = Vec::new();
        use std::time::{SystemTime, UNIX_EPOCH};
        // 4 hex digits from the low 16 bits of the current nanosecond clock:
        // a log-correlation tag only, not unique — collisions are possible.
        let runtime_id = format!(
            "{:04x}",
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| (d.as_nanos() & 0xFFFF) as u16)
                .unwrap_or(0)
        );
        let runtime = Arc::new(Self {
            joins: Mutex::new(joins),
            history_store,
            orchestration_registry,
            current_execution_ids: Mutex::new(HashMap::new()),
            shutdown_flag: Arc::new(AtomicBool::new(false)),
            options,
            observability_handle,
            runtime_id,
        });
        runtime.clone().initialize_gauges().await;
        if runtime.observability_handle.is_some() {
            let gauge_handle = runtime.clone().start_gauge_poller();
            runtime.joins.lock().await.push(gauge_handle);
        }
        let handle = runtime.clone().start_orchestration_dispatcher();
        runtime.joins.lock().await.push(handle);
        let work_handle = runtime.clone().start_work_dispatcher(activity_registry);
        runtime.joins.lock().await.push(work_handle);
        runtime
    }
    /// Shuts the runtime down. `timeout_ms` defaults to 1000; with 0 all
    /// tasks are aborted immediately (note: the shutdown flag is NOT set on
    /// that path). Otherwise the flag is set, the runtime waits the timeout,
    /// then aborts whatever tasks remain.
    pub async fn shutdown(self: Arc<Self>, timeout_ms: Option<u64>) {
        let timeout_ms = timeout_ms.unwrap_or(1000);
        if timeout_ms == 0 {
            warn!("Immediate shutdown - aborting all tasks");
            let mut joins = self.joins.lock().await;
            for j in joins.drain(..) {
                j.abort();
            }
            return;
        }
        self.shutdown_flag.store(true, Ordering::Relaxed);
        // Grace period for loops to observe the flag and exit cleanly.
        tokio::time::sleep(std::time::Duration::from_millis(timeout_ms)).await;
        let mut joins = self.joins.lock().await;
        for j in joins.drain(..) {
            j.abort();
        }
    }
}