#![allow(clippy::expect_used)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::clone_on_ref_ptr)]
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
pub mod client;
pub mod runtime;
pub use runtime::OrchestrationDescriptor;
pub mod providers;
#[cfg(feature = "provider-test")]
pub mod provider_validations;
#[cfg(feature = "provider-test")]
pub mod provider_validation;
#[cfg(feature = "provider-test")]
pub mod provider_stress_tests;
#[cfg(feature = "provider-test")]
pub mod provider_stress_test;
pub use client::{Client, ClientError};
pub use runtime::{
OrchestrationHandler, OrchestrationRegistry, OrchestrationRegistryBuilder, OrchestrationStatus, RuntimeOptions,
};
pub use providers::{
ExecutionInfo, InstanceInfo, ProviderAdmin, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
SystemMetrics, TagFilter,
};
pub use providers::{DispatcherCapabilityFilter, SemverRange, current_build_version};
pub use providers::{DeleteInstanceResult, InstanceFilter, InstanceTree, PruneOptions, PruneResult};
pub type ProviderRef = Arc<dyn providers::Provider>;
pub type OrchestrationHandlerRef = Arc<dyn runtime::OrchestrationHandler>;
#[derive(Debug, Clone)]
pub enum ScheduleKind {
Activity {
name: String,
},
Timer,
ExternalWait {
event_name: String,
},
QueueDequeue {
event_name: String,
},
SubOrchestration {
token: u64,
},
}
pub struct DurableFuture<T> {
token: u64,
kind: ScheduleKind,
ctx: OrchestrationContext,
completed: bool,
inner: std::pin::Pin<Box<dyn Future<Output = T> + Send>>,
}
impl<T: Send + 'static> DurableFuture<T> {
fn new(
token: u64,
kind: ScheduleKind,
ctx: OrchestrationContext,
inner: impl Future<Output = T> + Send + 'static,
) -> Self {
Self {
token,
kind,
ctx,
completed: false,
inner: Box::pin(inner),
}
}
pub fn map<U: Send + 'static>(self, f: impl FnOnce(T) -> U + Send + 'static) -> DurableFuture<U> {
let token = self.token;
let kind = self.kind.clone();
let ctx = self.ctx.clone();
let inner = unsafe {
let inner = std::ptr::read(&self.inner);
std::mem::forget(self);
inner
};
let mapped = async move {
let value = inner.await;
f(value)
};
DurableFuture::new(token, kind, ctx, mapped)
}
pub fn with_tag(self, tag: impl Into<String>) -> Self {
match &self.kind {
ScheduleKind::Activity { .. } => {}
other => panic!("with_tag() can only be called on activity futures, got {:?}", other),
}
let tag_value = tag.into();
let mut inner = self.ctx.inner.lock().expect("Mutex should not be poisoned");
let mut found = false;
for (token, action) in inner.emitted_actions.iter_mut() {
if *token == self.token {
match action {
Action::CallActivity { tag, .. } => {
*tag = Some(tag_value);
}
_ => panic!("Token matched but action is not CallActivity"),
}
found = true;
break;
}
}
assert!(
found,
"with_tag(): token {} not found in emitted_actions — actions were drained before with_tag() was called",
self.token
);
drop(inner);
self
}
}
impl<T> Future for DurableFuture<T> {
type Output = T;
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<T> {
match self.inner.as_mut().poll(cx) {
Poll::Ready(value) => {
self.completed = true;
Poll::Ready(value)
}
Poll::Pending => Poll::Pending,
}
}
}
impl<T> Drop for DurableFuture<T> {
fn drop(&mut self) {
if !self.completed {
self.ctx.mark_token_cancelled(self.token, &self.kind);
}
}
}
unsafe impl<T: Send> Send for DurableFuture<T> {}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Either2<A, B> {
First(A),
Second(B),
}
impl<A, B> Either2<A, B> {
pub fn is_first(&self) -> bool {
matches!(self, Either2::First(_))
}
pub fn is_second(&self) -> bool {
matches!(self, Either2::Second(_))
}
pub fn index(&self) -> usize {
match self {
Either2::First(_) => 0,
Either2::Second(_) => 1,
}
}
}
impl<T> Either2<T, T> {
pub fn into_tuple(self) -> (usize, T) {
match self {
Either2::First(v) => (0, v),
Either2::Second(v) => (1, v),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Either3<A, B, C> {
First(A),
Second(B),
Third(C),
}
impl<A, B, C> Either3<A, B, C> {
pub fn index(&self) -> usize {
match self {
Either3::First(_) => 0,
Either3::Second(_) => 1,
Either3::Third(_) => 2,
}
}
}
impl<T> Either3<T, T, T> {
pub fn into_tuple(self) -> (usize, T) {
match self {
Either3::First(v) => (0, v),
Either3::Second(v) => (1, v),
Either3::Third(v) => (2, v),
}
}
}
pub(crate) const SYSCALL_ACTIVITY_PREFIX: &str = "__duroxide_syscall:";
pub(crate) const SYSCALL_ACTIVITY_NEW_GUID: &str = "__duroxide_syscall:new_guid";
pub(crate) const SYSCALL_ACTIVITY_UTC_NOW_MS: &str = "__duroxide_syscall:utc_now_ms";
pub(crate) const SYSCALL_ACTIVITY_GET_KV_VALUE: &str = "__duroxide_syscall:get_kv_value";
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct SystemStats {
pub history_event_count: u64,
pub history_size_bytes: u64,
pub queue_pending_count: u64,
pub kv_user_key_count: u64,
pub kv_total_value_bytes: u64,
}
use crate::_typed_codec::Codec;
use serde::{Deserialize, Serialize};
use std::time::{Duration as StdDuration, SystemTime, UNIX_EPOCH};
mod _typed_codec {
use serde::{Serialize, de::DeserializeOwned};
use serde_json::Value;
pub trait Codec {
fn encode<T: Serialize>(v: &T) -> Result<String, String>;
fn decode<T: DeserializeOwned>(s: &str) -> Result<T, String>;
}
pub struct Json;
impl Codec for Json {
fn encode<T: Serialize>(v: &T) -> Result<String, String> {
match serde_json::to_value(v) {
Ok(Value::String(s)) => Ok(s),
Ok(val) => serde_json::to_string(&val).map_err(|e| e.to_string()),
Err(e) => Err(e.to_string()),
}
}
fn decode<T: DeserializeOwned>(s: &str) -> Result<T, String> {
match serde_json::from_str::<T>(s) {
Ok(v) => Ok(v),
Err(_) => {
let val = Value::String(s.to_string());
serde_json::from_value(val).map_err(|e| e.to_string())
}
}
}
}
}
pub const INITIAL_EXECUTION_ID: u64 = 1;
pub const INITIAL_EVENT_ID: u64 = 1;
pub const SUB_ORCH_AUTO_PREFIX: &str = "sub::";
pub(crate) const SUB_ORCH_PENDING_PREFIX: &str = "sub::pending_";
#[inline]
pub fn is_auto_generated_sub_orch_id(instance: &str) -> bool {
instance.starts_with(SUB_ORCH_AUTO_PREFIX)
}
#[inline]
pub fn build_child_instance_id(parent_instance: &str, child_instance: &str) -> String {
if is_auto_generated_sub_orch_id(child_instance) {
format!("{parent_instance}::{child_instance}")
} else {
child_instance.to_string()
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ErrorDetails {
Infrastructure {
operation: String,
message: String,
retryable: bool,
},
Configuration {
kind: ConfigErrorKind,
resource: String,
message: Option<String>,
},
Application {
kind: AppErrorKind,
message: String,
retryable: bool,
},
Poison {
attempt_count: u32,
max_attempts: u32,
message_type: PoisonMessageType,
message: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PoisonMessageType {
Orchestration { instance: String, execution_id: u64 },
Activity {
instance: String,
execution_id: u64,
activity_name: String,
activity_id: u64,
},
FailedDeserialization {
instance: String,
execution_id: u64,
error: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ConfigErrorKind {
Nondeterminism,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum AppErrorKind {
ActivityFailed,
OrchestrationFailed,
Panicked,
Cancelled { reason: String },
}
impl ErrorDetails {
pub fn category(&self) -> &'static str {
match self {
ErrorDetails::Infrastructure { .. } => "infrastructure",
ErrorDetails::Configuration { .. } => "configuration",
ErrorDetails::Application { .. } => "application",
ErrorDetails::Poison { .. } => "poison",
}
}
pub fn is_retryable(&self) -> bool {
match self {
ErrorDetails::Infrastructure { retryable, .. } => *retryable,
ErrorDetails::Application { retryable, .. } => *retryable,
ErrorDetails::Configuration { .. } => false,
ErrorDetails::Poison { .. } => false, }
}
pub fn display_message(&self) -> String {
match self {
ErrorDetails::Infrastructure { operation, message, .. } => {
format!("infrastructure:{operation}: {message}")
}
ErrorDetails::Configuration {
kind,
resource,
message,
} => match kind {
ConfigErrorKind::Nondeterminism => message
.as_ref()
.map(|m| format!("nondeterministic: {m}"))
.unwrap_or_else(|| format!("nondeterministic in {resource}")),
},
ErrorDetails::Application { kind, message, .. } => match kind {
AppErrorKind::Cancelled { reason } => format!("canceled: {reason}"),
AppErrorKind::Panicked => format!("orchestration panicked: {message}"),
_ => message.clone(),
},
ErrorDetails::Poison {
attempt_count,
max_attempts,
message_type,
..
} => match message_type {
PoisonMessageType::Orchestration { instance, .. } => {
format!("poison: orchestration {instance} exceeded {attempt_count} attempts (max {max_attempts})")
}
PoisonMessageType::Activity {
activity_name,
activity_id,
..
} => {
format!(
"poison: activity {activity_name}#{activity_id} exceeded {attempt_count} attempts (max {max_attempts})"
)
}
PoisonMessageType::FailedDeserialization { instance, error, .. } => {
format!(
"poison: orchestration {instance} history deserialization failed after {attempt_count} attempts (max {max_attempts}): {error}"
)
}
},
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Event {
pub event_id: u64,
pub source_event_id: Option<u64>,
pub instance_id: String,
pub execution_id: u64,
pub timestamp_ms: u64,
pub duroxide_version: String,
#[serde(flatten)]
pub kind: EventKind,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum EventKind {
#[serde(rename = "OrchestrationStarted")]
OrchestrationStarted {
name: String,
version: String,
input: String,
parent_instance: Option<String>,
parent_id: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
carry_forward_events: Option<Vec<(String, String)>>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
initial_custom_status: Option<String>,
},
#[serde(rename = "OrchestrationCompleted")]
OrchestrationCompleted { output: String },
#[serde(rename = "OrchestrationFailed")]
OrchestrationFailed { details: ErrorDetails },
#[serde(rename = "ActivityScheduled")]
ActivityScheduled {
name: String,
input: String,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
session_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
tag: Option<String>,
},
#[serde(rename = "ActivityCompleted")]
ActivityCompleted { result: String },
#[serde(rename = "ActivityFailed")]
ActivityFailed { details: ErrorDetails },
#[serde(rename = "ActivityCancelRequested")]
ActivityCancelRequested { reason: String },
#[serde(rename = "TimerCreated")]
TimerCreated { fire_at_ms: u64 },
#[serde(rename = "TimerFired")]
TimerFired { fire_at_ms: u64 },
#[serde(rename = "ExternalSubscribed")]
ExternalSubscribed { name: String },
#[serde(rename = "ExternalEvent")]
ExternalEvent { name: String, data: String },
#[serde(rename = "OrchestrationChained")]
OrchestrationChained {
name: String,
instance: String,
input: String,
},
#[serde(rename = "SubOrchestrationScheduled")]
SubOrchestrationScheduled {
name: String,
instance: String,
input: String,
},
#[serde(rename = "SubOrchestrationCompleted")]
SubOrchestrationCompleted { result: String },
#[serde(rename = "SubOrchestrationFailed")]
SubOrchestrationFailed { details: ErrorDetails },
#[serde(rename = "SubOrchestrationCancelRequested")]
SubOrchestrationCancelRequested { reason: String },
#[serde(rename = "OrchestrationContinuedAsNew")]
OrchestrationContinuedAsNew { input: String },
#[serde(rename = "OrchestrationCancelRequested")]
OrchestrationCancelRequested { reason: String },
#[serde(rename = "ExternalSubscribedCancelled")]
ExternalSubscribedCancelled { reason: String },
#[serde(rename = "ExternalSubscribedPersistent")]
QueueSubscribed { name: String },
#[serde(rename = "ExternalEventPersistent")]
QueueEventDelivered { name: String, data: String },
#[serde(rename = "ExternalSubscribedPersistentCancelled")]
QueueSubscriptionCancelled { reason: String },
#[serde(rename = "CustomStatusUpdated")]
CustomStatusUpdated { status: Option<String> },
#[cfg(feature = "replay-version-test")]
#[serde(rename = "ExternalSubscribed2")]
ExternalSubscribed2 { name: String, topic: String },
#[cfg(feature = "replay-version-test")]
#[serde(rename = "ExternalEvent2")]
ExternalEvent2 { name: String, topic: String, data: String },
#[serde(rename = "KeyValueSet")]
KeyValueSet {
key: String,
value: String,
#[serde(default)]
last_updated_at_ms: u64,
},
#[serde(rename = "KeyValueCleared")]
KeyValueCleared { key: String },
#[serde(rename = "KeyValuesCleared")]
KeyValuesCleared,
}
impl Event {
pub fn with_event_id(
event_id: u64,
instance_id: impl Into<String>,
execution_id: u64,
source_event_id: Option<u64>,
kind: EventKind,
) -> Self {
use std::time::{SystemTime, UNIX_EPOCH};
Event {
event_id,
source_event_id,
instance_id: instance_id.into(),
execution_id,
timestamp_ms: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0),
duroxide_version: env!("CARGO_PKG_VERSION").to_string(),
kind,
}
}
pub fn new(
instance_id: impl Into<String>,
execution_id: u64,
source_event_id: Option<u64>,
kind: EventKind,
) -> Self {
Self::with_event_id(0, instance_id, execution_id, source_event_id, kind)
}
#[inline]
pub fn event_id(&self) -> u64 {
self.event_id
}
#[inline]
pub(crate) fn set_event_id(&mut self, id: u64) {
self.event_id = id;
}
#[inline]
pub fn source_event_id(&self) -> Option<u64> {
self.source_event_id
}
pub fn is_terminal(&self) -> bool {
matches!(
self.kind,
EventKind::OrchestrationCompleted { .. }
| EventKind::OrchestrationFailed { .. }
| EventKind::OrchestrationContinuedAsNew { .. }
)
}
}
#[derive(Debug, Clone)]
pub enum LogLevel {
Info,
Warn,
Error,
}
#[derive(Debug, Clone)]
pub enum BackoffStrategy {
None,
Fixed {
delay: std::time::Duration,
},
Linear {
base: std::time::Duration,
max: std::time::Duration,
},
Exponential {
base: std::time::Duration,
multiplier: f64,
max: std::time::Duration,
},
}
impl Default for BackoffStrategy {
fn default() -> Self {
BackoffStrategy::Exponential {
base: std::time::Duration::from_millis(100),
multiplier: 2.0,
max: std::time::Duration::from_secs(30),
}
}
}
impl BackoffStrategy {
pub fn delay_for_attempt(&self, attempt: u32) -> std::time::Duration {
match self {
BackoffStrategy::None => std::time::Duration::ZERO,
BackoffStrategy::Fixed { delay } => *delay,
BackoffStrategy::Linear { base, max } => {
let delay = base.saturating_mul(attempt);
std::cmp::min(delay, *max)
}
BackoffStrategy::Exponential { base, multiplier, max } => {
let factor = multiplier.powi(attempt.saturating_sub(1) as i32);
let delay_nanos = (base.as_nanos() as f64 * factor) as u128;
let delay = std::time::Duration::from_nanos(delay_nanos.min(u64::MAX as u128) as u64);
std::cmp::min(delay, *max)
}
}
}
}
#[derive(Debug, Clone)]
pub struct RetryPolicy {
pub max_attempts: u32,
pub backoff: BackoffStrategy,
pub timeout: Option<std::time::Duration>,
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
max_attempts: 3,
backoff: BackoffStrategy::default(),
timeout: None,
}
}
}
impl RetryPolicy {
pub fn new(max_attempts: u32) -> Self {
assert!(max_attempts >= 1, "max_attempts must be at least 1");
Self {
max_attempts,
..Default::default()
}
}
pub fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
self.timeout = Some(timeout);
self
}
#[doc(hidden)]
pub fn with_total_timeout(mut self, timeout: std::time::Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn with_backoff(mut self, backoff: BackoffStrategy) -> Self {
self.backoff = backoff;
self
}
pub fn delay_for_attempt(&self, attempt: u32) -> std::time::Duration {
self.backoff.delay_for_attempt(attempt)
}
}
#[derive(Debug, Clone)]
pub enum Action {
CallActivity {
scheduling_event_id: u64,
name: String,
input: String,
session_id: Option<String>,
tag: Option<String>,
},
CreateTimer { scheduling_event_id: u64, fire_at_ms: u64 },
WaitExternal { scheduling_event_id: u64, name: String },
StartOrchestrationDetached {
scheduling_event_id: u64,
name: String,
version: Option<String>,
instance: String,
input: String,
},
StartSubOrchestration {
scheduling_event_id: u64,
name: String,
version: Option<String>,
instance: String,
input: String,
},
ContinueAsNew { input: String, version: Option<String> },
UpdateCustomStatus { status: Option<String> },
DequeueEvent { scheduling_event_id: u64, name: String },
#[cfg(feature = "replay-version-test")]
WaitExternal2 {
scheduling_event_id: u64,
name: String,
topic: String,
},
SetKeyValue {
key: String,
value: String,
last_updated_at_ms: u64,
},
ClearKeyValue { key: String },
ClearKeyValues,
}
#[doc(hidden)]
#[derive(Debug, Clone)]
#[allow(dead_code)] pub enum CompletionResult {
ActivityOk(String),
ActivityErr(String),
TimerFired,
SubOrchOk(String),
SubOrchErr(String),
ExternalData(String),
}
#[derive(Debug)]
struct CtxInner {
is_replaying: bool,
next_token: u64,
emitted_actions: Vec<(u64, Action)>,
completion_results: std::collections::HashMap<u64, CompletionResult>,
token_bindings: std::collections::HashMap<u64, u64>,
external_subscriptions: std::collections::HashMap<u64, (String, usize)>,
external_arrivals: std::collections::HashMap<String, Vec<String>>,
external_next_index: std::collections::HashMap<String, usize>,
external_cancelled_subscriptions: std::collections::HashSet<u64>,
queue_subscriptions: Vec<(u64, String)>,
queue_arrivals: std::collections::HashMap<String, Vec<String>>,
queue_cancelled_subscriptions: std::collections::HashSet<u64>,
queue_resolved_subscriptions: std::collections::HashSet<u64>,
#[cfg(feature = "replay-version-test")]
external2_subscriptions: std::collections::HashMap<u64, (String, String, usize)>,
#[cfg(feature = "replay-version-test")]
external2_arrivals: std::collections::HashMap<(String, String), Vec<String>>,
#[cfg(feature = "replay-version-test")]
external2_next_index: std::collections::HashMap<(String, String), usize>,
sub_orchestration_instances: std::collections::HashMap<u64, String>,
cancelled_tokens: std::collections::HashSet<u64>,
cancelled_token_kinds: std::collections::HashMap<u64, ScheduleKind>,
execution_id: u64,
instance_id: String,
orchestration_name: String,
orchestration_version: String,
logging_enabled_this_poll: bool,
accumulated_custom_status: Option<String>,
kv_state: std::collections::HashMap<String, String>,
kv_metadata: std::collections::HashMap<String, u64>,
}
impl CtxInner {
fn new(
_history: Vec<Event>, execution_id: u64,
instance_id: String,
orchestration_name: String,
orchestration_version: String,
_worker_id: Option<String>, ) -> Self {
Self {
is_replaying: true,
next_token: 0,
emitted_actions: Vec::new(),
completion_results: Default::default(),
token_bindings: Default::default(),
external_subscriptions: Default::default(),
external_arrivals: Default::default(),
external_next_index: Default::default(),
external_cancelled_subscriptions: Default::default(),
queue_subscriptions: Default::default(),
queue_arrivals: Default::default(),
queue_cancelled_subscriptions: Default::default(),
queue_resolved_subscriptions: Default::default(),
#[cfg(feature = "replay-version-test")]
external2_subscriptions: Default::default(),
#[cfg(feature = "replay-version-test")]
external2_arrivals: Default::default(),
#[cfg(feature = "replay-version-test")]
external2_next_index: Default::default(),
sub_orchestration_instances: Default::default(),
cancelled_tokens: Default::default(),
cancelled_token_kinds: Default::default(),
execution_id,
instance_id,
orchestration_name,
orchestration_version,
logging_enabled_this_poll: false,
accumulated_custom_status: None,
kv_state: std::collections::HashMap::new(),
kv_metadata: std::collections::HashMap::new(),
}
}
fn now_ms(&self) -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
fn emit_action(&mut self, action: Action) -> u64 {
self.next_token += 1;
let token = self.next_token;
self.emitted_actions.push((token, action));
token
}
fn drain_emitted_actions(&mut self) -> Vec<(u64, Action)> {
std::mem::take(&mut self.emitted_actions)
}
fn bind_token(&mut self, token: u64, schedule_id: u64) {
self.token_bindings.insert(token, schedule_id);
}
fn get_bound_schedule_id(&self, token: u64) -> Option<u64> {
self.token_bindings.get(&token).copied()
}
fn deliver_result(&mut self, schedule_id: u64, result: CompletionResult) {
for (&token, &sid) in &self.token_bindings {
if sid == schedule_id {
self.completion_results.insert(token, result);
return;
}
}
tracing::warn!(
schedule_id,
"dropping completion result with no binding (unsupported for now)"
);
}
fn get_result(&self, token: u64) -> Option<&CompletionResult> {
self.completion_results.get(&token)
}
fn bind_external_subscription(&mut self, schedule_id: u64, name: &str) {
let idx = self.external_next_index.entry(name.to_string()).or_insert(0);
let subscription_index = *idx;
*idx += 1;
self.external_subscriptions
.insert(schedule_id, (name.to_string(), subscription_index));
}
fn has_pending_subscription_slot(&self, name: &str) -> bool {
let active_subs = self
.external_subscriptions
.iter()
.filter(|(sid, (n, _))| n == name && !self.external_cancelled_subscriptions.contains(sid))
.count();
let delivered = self.external_arrivals.get(name).map_or(0, |v| v.len());
active_subs > delivered
}
fn deliver_external_event(&mut self, name: String, data: String) {
self.external_arrivals.entry(name).or_default().push(data);
}
fn get_external_event(&self, schedule_id: u64) -> Option<&String> {
let (name, subscription_index) = self.external_subscriptions.get(&schedule_id)?;
if self.external_cancelled_subscriptions.contains(&schedule_id) {
return None;
}
let cancelled_below = self
.external_subscriptions
.values()
.filter(|(n, idx)| n == name && *idx < *subscription_index)
.filter(|(_, idx)| {
self.external_subscriptions
.iter()
.any(|(sid, (n, i))| n == name && i == idx && self.external_cancelled_subscriptions.contains(sid))
})
.count();
let effective_index = subscription_index - cancelled_below;
let arrivals = self.external_arrivals.get(name)?;
arrivals.get(effective_index)
}
fn mark_external_subscription_cancelled(&mut self, schedule_id: u64) {
self.external_cancelled_subscriptions.insert(schedule_id);
}
fn bind_queue_subscription(&mut self, schedule_id: u64, name: &str) {
self.queue_subscriptions.push((schedule_id, name.to_string()));
}
fn deliver_queue_message(&mut self, name: String, data: String) {
self.queue_arrivals.entry(name).or_default().push(data);
}
fn mark_queue_subscription_cancelled(&mut self, schedule_id: u64) {
self.queue_cancelled_subscriptions.insert(schedule_id);
}
fn get_queue_message(&mut self, schedule_id: u64) -> Option<String> {
let name = self
.queue_subscriptions
.iter()
.find(|(sid, _)| *sid == schedule_id)
.map(|(_, n)| n.clone())?;
if self.queue_resolved_subscriptions.contains(&schedule_id) {
return None;
}
if self.queue_cancelled_subscriptions.contains(&schedule_id) {
return None;
}
let arrival_index: usize = self
.queue_subscriptions
.iter()
.take_while(|(sid, _)| *sid != schedule_id)
.filter(|(sid, n)| n == &name && !self.queue_cancelled_subscriptions.contains(sid))
.count();
let arrivals = self.queue_arrivals.get(&name)?;
if arrival_index < arrivals.len() {
self.queue_resolved_subscriptions.insert(schedule_id);
Some(arrivals[arrival_index].clone())
} else {
None
}
}
fn get_cancelled_queue_ids(&self) -> Vec<u64> {
let mut ids = Vec::new();
for &token in &self.cancelled_tokens {
if let Some(kind) = self.cancelled_token_kinds.get(&token)
&& matches!(kind, ScheduleKind::QueueDequeue { .. })
&& let Some(&schedule_id) = self.token_bindings.get(&token)
{
ids.push(schedule_id);
}
}
ids
}
#[cfg(feature = "replay-version-test")]
fn bind_external_subscription2(&mut self, schedule_id: u64, name: &str, topic: &str) {
let key = (name.to_string(), topic.to_string());
let idx = self.external2_next_index.entry(key.clone()).or_insert(0);
let subscription_index = *idx;
*idx += 1;
self.external2_subscriptions
.insert(schedule_id, (name.to_string(), topic.to_string(), subscription_index));
}
#[cfg(feature = "replay-version-test")]
fn deliver_external_event2(&mut self, name: String, topic: String, data: String) {
self.external2_arrivals.entry((name, topic)).or_default().push(data);
}
#[cfg(feature = "replay-version-test")]
fn get_external_event2(&self, schedule_id: u64) -> Option<&String> {
let (name, topic, subscription_index) = self.external2_subscriptions.get(&schedule_id)?;
let arrivals = self.external2_arrivals.get(&(name.clone(), topic.clone()))?;
arrivals.get(*subscription_index)
}
fn mark_token_cancelled(&mut self, token: u64, kind: ScheduleKind) {
self.cancelled_tokens.insert(token);
self.cancelled_token_kinds.insert(token, kind);
}
fn get_cancelled_activity_ids(&self) -> Vec<u64> {
let mut ids = Vec::new();
for &token in &self.cancelled_tokens {
if let Some(kind) = self.cancelled_token_kinds.get(&token)
&& matches!(kind, ScheduleKind::Activity { .. })
&& let Some(&schedule_id) = self.token_bindings.get(&token)
{
ids.push(schedule_id);
}
}
ids
}
fn get_cancelled_external_wait_ids(&self) -> Vec<u64> {
let mut ids = Vec::new();
for &token in &self.cancelled_tokens {
if let Some(kind) = self.cancelled_token_kinds.get(&token)
&& matches!(kind, ScheduleKind::ExternalWait { .. })
&& let Some(&schedule_id) = self.token_bindings.get(&token)
{
ids.push(schedule_id);
}
}
ids
}
fn get_cancelled_sub_orchestration_cancellations(&self) -> Vec<(u64, String)> {
let mut cancels = Vec::new();
for &token in &self.cancelled_tokens {
if let Some(ScheduleKind::SubOrchestration { token: sub_token }) = self.cancelled_token_kinds.get(&token)
&& let Some(&schedule_id) = self.token_bindings.get(&token)
{
if let Some(instance_id) = self.sub_orchestration_instances.get(sub_token) {
cancels.push((schedule_id, instance_id.clone()));
}
}
}
cancels
}
fn bind_sub_orchestration_instance(&mut self, token: u64, instance_id: String) {
self.sub_orchestration_instances.insert(token, instance_id);
}
fn clear_cancelled_tokens(&mut self) {
self.cancelled_tokens.clear();
self.cancelled_token_kinds.clear();
}
}
#[derive(Clone)]
pub struct ActivityContext {
instance_id: String,
execution_id: u64,
orchestration_name: String,
orchestration_version: String,
activity_name: String,
activity_id: u64,
worker_id: String,
session_id: Option<String>,
tag: Option<String>,
cancellation_token: tokio_util::sync::CancellationToken,
store: std::sync::Arc<dyn crate::providers::Provider>,
}
impl ActivityContext {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new_with_cancellation(
instance_id: String,
execution_id: u64,
orchestration_name: String,
orchestration_version: String,
activity_name: String,
activity_id: u64,
worker_id: String,
session_id: Option<String>,
tag: Option<String>,
cancellation_token: tokio_util::sync::CancellationToken,
store: std::sync::Arc<dyn crate::providers::Provider>,
) -> Self {
Self {
instance_id,
execution_id,
orchestration_name,
orchestration_version,
activity_name,
activity_id,
worker_id,
session_id,
tag,
cancellation_token,
store,
}
}
pub fn instance_id(&self) -> &str {
&self.instance_id
}
pub fn execution_id(&self) -> u64 {
self.execution_id
}
pub fn orchestration_name(&self) -> &str {
&self.orchestration_name
}
pub fn orchestration_version(&self) -> &str {
&self.orchestration_version
}
pub fn activity_name(&self) -> &str {
&self.activity_name
}
pub fn worker_id(&self) -> &str {
&self.worker_id
}
pub fn session_id(&self) -> Option<&str> {
self.session_id.as_deref()
}
pub fn tag(&self) -> Option<&str> {
self.tag.as_deref()
}
pub fn trace_info(&self, message: impl Into<String>) {
tracing::info!(
target: "duroxide::activity",
instance_id = %self.instance_id,
execution_id = %self.execution_id,
orchestration_name = %self.orchestration_name,
orchestration_version = %self.orchestration_version,
activity_name = %self.activity_name,
activity_id = %self.activity_id,
worker_id = %self.worker_id,
"{}",
message.into()
);
}
pub fn trace_warn(&self, message: impl Into<String>) {
tracing::warn!(
target: "duroxide::activity",
instance_id = %self.instance_id,
execution_id = %self.execution_id,
orchestration_name = %self.orchestration_name,
orchestration_version = %self.orchestration_version,
activity_name = %self.activity_name,
activity_id = %self.activity_id,
worker_id = %self.worker_id,
"{}",
message.into()
);
}
pub fn trace_error(&self, message: impl Into<String>) {
tracing::error!(
target: "duroxide::activity",
instance_id = %self.instance_id,
execution_id = %self.execution_id,
orchestration_name = %self.orchestration_name,
orchestration_version = %self.orchestration_version,
activity_name = %self.activity_name,
activity_id = %self.activity_id,
worker_id = %self.worker_id,
"{}",
message.into()
);
}
pub fn trace_debug(&self, message: impl Into<String>) {
tracing::debug!(
target: "duroxide::activity",
instance_id = %self.instance_id,
execution_id = %self.execution_id,
orchestration_name = %self.orchestration_name,
orchestration_version = %self.orchestration_version,
activity_name = %self.activity_name,
activity_id = %self.activity_id,
worker_id = %self.worker_id,
"{}",
message.into()
);
}
pub fn is_cancelled(&self) -> bool {
self.cancellation_token.is_cancelled()
}
pub async fn cancelled(&self) {
self.cancellation_token.cancelled().await
}
pub fn cancellation_token(&self) -> tokio_util::sync::CancellationToken {
self.cancellation_token.clone()
}
pub fn get_client(&self) -> crate::Client {
crate::Client::new(self.store.clone())
}
}
impl std::fmt::Debug for ActivityContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ActivityContext")
.field("instance_id", &self.instance_id)
.field("execution_id", &self.execution_id)
.field("orchestration_name", &self.orchestration_name)
.field("orchestration_version", &self.orchestration_version)
.field("activity_name", &self.activity_name)
.field("activity_id", &self.activity_id)
.field("worker_id", &self.worker_id)
.field("cancellation_token", &self.cancellation_token)
.field("store", &"<Provider>")
.finish()
}
}
#[derive(Clone)]
pub struct OrchestrationContext {
inner: Arc<Mutex<CtxInner>>,
}
struct ContinueAsNewFuture;
impl Future for ContinueAsNewFuture {
type Output = Result<String, String>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}
impl OrchestrationContext {
pub fn new(
history: Vec<Event>,
execution_id: u64,
instance_id: String,
orchestration_name: String,
orchestration_version: String,
worker_id: Option<String>,
) -> Self {
Self {
inner: Arc::new(Mutex::new(CtxInner::new(
history,
execution_id,
instance_id,
orchestration_name,
orchestration_version,
worker_id,
))),
}
}
pub fn is_replaying(&self) -> bool {
self.inner.lock().unwrap().is_replaying
}
#[doc(hidden)]
pub fn set_is_replaying(&self, is_replaying: bool) {
self.inner.lock().unwrap().is_replaying = is_replaying;
}
#[doc(hidden)]
pub fn bind_external_subscription(&self, schedule_id: u64, name: &str) {
self.inner.lock().unwrap().bind_external_subscription(schedule_id, name);
}
#[doc(hidden)]
pub fn deliver_external_event(&self, name: String, data: String) {
self.inner.lock().unwrap().deliver_external_event(name, data);
}
#[doc(hidden)]
pub fn bind_queue_subscription(&self, schedule_id: u64, name: &str) {
self.inner.lock().unwrap().bind_queue_subscription(schedule_id, name);
}
#[doc(hidden)]
pub fn deliver_queue_message(&self, name: String, data: String) {
self.inner.lock().unwrap().deliver_queue_message(name, data);
}
pub(crate) fn mark_queue_subscription_cancelled(&self, schedule_id: u64) {
self.inner
.lock()
.unwrap()
.mark_queue_subscription_cancelled(schedule_id);
}
pub(crate) fn mark_token_cancelled(&self, token: u64, kind: &ScheduleKind) {
self.inner.lock().unwrap().mark_token_cancelled(token, kind.clone());
}
pub(crate) fn get_cancelled_activity_ids(&self) -> Vec<u64> {
self.inner.lock().unwrap().get_cancelled_activity_ids()
}
pub(crate) fn get_cancelled_external_wait_ids(&self) -> Vec<u64> {
self.inner.lock().unwrap().get_cancelled_external_wait_ids()
}
pub(crate) fn mark_external_subscription_cancelled(&self, schedule_id: u64) {
self.inner
.lock()
.unwrap()
.mark_external_subscription_cancelled(schedule_id);
}
pub(crate) fn get_cancelled_queue_ids(&self) -> Vec<u64> {
self.inner.lock().unwrap().get_cancelled_queue_ids()
}
pub(crate) fn get_cancelled_sub_orchestration_cancellations(&self) -> Vec<(u64, String)> {
self.inner
.lock()
.unwrap()
.get_cancelled_sub_orchestration_cancellations()
}
pub(crate) fn clear_cancelled_tokens(&self) {
self.inner.lock().unwrap().clear_cancelled_tokens();
}
pub(crate) fn bind_sub_orchestration_instance(&self, token: u64, instance_id: String) {
self.inner
.lock()
.unwrap()
.bind_sub_orchestration_instance(token, instance_id);
}
pub fn trace_info(&self, message: impl Into<String>) {
self.trace("INFO", message);
}
pub fn trace_warn(&self, message: impl Into<String>) {
self.trace("WARN", message);
}
pub fn trace_error(&self, message: impl Into<String>) {
self.trace("ERROR", message);
}
pub fn trace_debug(&self, message: impl Into<String>) {
self.trace("DEBUG", message);
}
#[doc(hidden)]
pub fn drain_emitted_actions(&self) -> Vec<(u64, Action)> {
self.inner.lock().unwrap().drain_emitted_actions()
}
#[doc(hidden)]
pub fn get_emitted_actions(&self) -> Vec<(u64, Action)> {
self.inner.lock().unwrap().emitted_actions.clone()
}
#[doc(hidden)]
pub fn bind_token(&self, token: u64, schedule_id: u64) {
self.inner.lock().unwrap().bind_token(token, schedule_id);
}
#[doc(hidden)]
pub fn deliver_result(&self, schedule_id: u64, result: CompletionResult) {
self.inner.lock().unwrap().deliver_result(schedule_id, result);
}
pub fn instance_id(&self) -> String {
self.inner.lock().unwrap().instance_id.clone()
}
pub fn execution_id(&self) -> u64 {
self.inner.lock().unwrap().execution_id
}
pub fn orchestration_name(&self) -> String {
self.inner.lock().unwrap().orchestration_name.clone()
}
pub fn orchestration_version(&self) -> String {
self.inner.lock().unwrap().orchestration_version.clone()
}
pub fn is_logging_enabled(&self) -> bool {
self.inner.lock().unwrap().logging_enabled_this_poll
}
pub fn trace(&self, level: impl Into<String>, message: impl Into<String>) {
self.trace_internal(&level.into(), &message.into());
}
fn trace_internal(&self, level: &str, message: &str) {
let inner = self.inner.lock().unwrap();
if !inner.is_replaying {
match level.to_uppercase().as_str() {
"INFO" => tracing::info!(
target: "duroxide::orchestration",
instance_id = %inner.instance_id,
execution_id = %inner.execution_id,
orchestration_name = %inner.orchestration_name,
orchestration_version = %inner.orchestration_version,
"{}",
message
),
"WARN" => tracing::warn!(
target: "duroxide::orchestration",
instance_id = %inner.instance_id,
execution_id = %inner.execution_id,
orchestration_name = %inner.orchestration_name,
orchestration_version = %inner.orchestration_version,
"{}",
message
),
"ERROR" => tracing::error!(
target: "duroxide::orchestration",
instance_id = %inner.instance_id,
execution_id = %inner.execution_id,
orchestration_name = %inner.orchestration_name,
orchestration_version = %inner.orchestration_version,
"{}",
message
),
"DEBUG" => tracing::debug!(
target: "duroxide::orchestration",
instance_id = %inner.instance_id,
execution_id = %inner.execution_id,
orchestration_name = %inner.orchestration_name,
orchestration_version = %inner.orchestration_version,
"{}",
message
),
_ => tracing::trace!(
target: "duroxide::orchestration",
instance_id = %inner.instance_id,
execution_id = %inner.execution_id,
orchestration_name = %inner.orchestration_name,
orchestration_version = %inner.orchestration_version,
level = %level,
"{}",
message
),
}
}
}
pub fn new_guid(&self) -> impl Future<Output = Result<String, String>> {
self.schedule_activity(SYSCALL_ACTIVITY_NEW_GUID, "")
}
pub fn utc_now(&self) -> impl Future<Output = Result<SystemTime, String>> {
let fut = self.schedule_activity(SYSCALL_ACTIVITY_UTC_NOW_MS, "");
async move {
let s = fut.await?;
let ms = s.parse::<u64>().map_err(|e| e.to_string())?;
Ok(UNIX_EPOCH + StdDuration::from_millis(ms))
}
}
pub fn continue_as_new(&self, input: impl Into<String>) -> impl Future<Output = Result<String, String>> {
let mut inner = self.inner.lock().unwrap();
let input: String = input.into();
let action = Action::ContinueAsNew { input, version: None };
inner.emit_action(action);
ContinueAsNewFuture
}
pub fn continue_as_new_typed<In: serde::Serialize>(
&self,
input: &In,
) -> impl Future<Output = Result<String, String>> {
let payload =
crate::_typed_codec::Json::encode(input).expect("Serialization should never fail for valid input");
self.continue_as_new(payload)
}
pub fn continue_as_new_versioned(
&self,
version: impl Into<String>,
input: impl Into<String>,
) -> impl Future<Output = Result<String, String>> {
let mut inner = self.inner.lock().unwrap();
let action = Action::ContinueAsNew {
input: input.into(),
version: Some(version.into()),
};
inner.emit_action(action);
ContinueAsNewFuture
}
}
pub(crate) fn generate_guid() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0);
thread_local! {
static COUNTER: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
}
let counter = COUNTER.with(|c| {
let val = c.get();
c.set(val.wrapping_add(1));
val
});
format!(
"{:08x}-{:04x}-{:04x}-{:04x}-{:012x}",
(timestamp >> 96) as u32,
((timestamp >> 80) & 0xFFFF) as u16,
(counter & 0xFFFF) as u16,
((timestamp >> 64) & 0xFFFF) as u16,
(timestamp & 0xFFFFFFFFFFFF) as u64
)
}
impl OrchestrationContext {
pub async fn schedule_activity_with_retry(
&self,
name: impl Into<String>,
input: impl Into<String>,
policy: RetryPolicy,
) -> Result<String, String> {
let name = name.into();
let input = input.into();
let mut last_error = String::new();
for attempt in 1..=policy.max_attempts {
let activity_result = if let Some(timeout) = policy.timeout {
let deadline = async {
self.schedule_timer(timeout).await;
Err::<String, String>("timeout: activity timed out".to_string())
};
let activity = self.schedule_activity(&name, &input);
match self.select2(activity, deadline).await {
Either2::First(result) => result,
Either2::Second(Err(e)) => {
return Err(e);
}
Either2::Second(Ok(_)) => unreachable!(),
}
} else {
self.schedule_activity(&name, &input).await
};
match activity_result {
Ok(result) => return Ok(result),
Err(e) => {
last_error = e.clone();
if attempt < policy.max_attempts {
self.trace(
"warn",
format!(
"Activity '{}' attempt {}/{} failed: {}. Retrying...",
name, attempt, policy.max_attempts, e
),
);
let delay = policy.delay_for_attempt(attempt);
if !delay.is_zero() {
self.schedule_timer(delay).await;
}
}
}
}
}
Err(last_error)
}
pub async fn schedule_activity_with_retry_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned>(
&self,
name: impl Into<String>,
input: &In,
policy: RetryPolicy,
) -> Result<Out, String> {
let payload = crate::_typed_codec::Json::encode(input).expect("encode");
let result = self.schedule_activity_with_retry(name, payload, policy).await?;
crate::_typed_codec::Json::decode::<Out>(&result)
}
pub async fn schedule_activity_with_retry_on_session(
&self,
name: impl Into<String>,
input: impl Into<String>,
policy: RetryPolicy,
session_id: impl Into<String>,
) -> Result<String, String> {
let name = name.into();
let input = input.into();
let session_id = session_id.into();
let mut last_error = String::new();
for attempt in 1..=policy.max_attempts {
let activity_result = if let Some(timeout) = policy.timeout {
let deadline = async {
self.schedule_timer(timeout).await;
Err::<String, String>("timeout: activity timed out".to_string())
};
let activity = self.schedule_activity_on_session(&name, &input, &session_id);
match self.select2(activity, deadline).await {
Either2::First(result) => result,
Either2::Second(Err(e)) => return Err(e),
Either2::Second(Ok(_)) => unreachable!(),
}
} else {
self.schedule_activity_on_session(&name, &input, &session_id).await
};
match activity_result {
Ok(result) => return Ok(result),
Err(e) => {
last_error = e.clone();
if attempt < policy.max_attempts {
self.trace(
"warn",
format!(
"Activity '{}' (session={}) attempt {}/{} failed: {}. Retrying...",
name, session_id, attempt, policy.max_attempts, e
),
);
let delay = policy.delay_for_attempt(attempt);
if !delay.is_zero() {
self.schedule_timer(delay).await;
}
}
}
}
}
Err(last_error)
}
pub async fn schedule_activity_with_retry_on_session_typed<
In: serde::Serialize,
Out: serde::de::DeserializeOwned,
>(
&self,
name: impl Into<String>,
input: &In,
policy: RetryPolicy,
session_id: impl Into<String>,
) -> Result<Out, String> {
let payload = crate::_typed_codec::Json::encode(input).expect("encode");
let result = self
.schedule_activity_with_retry_on_session(name, payload, policy, session_id)
.await?;
crate::_typed_codec::Json::decode::<Out>(&result)
}
pub fn schedule_orchestration(
&self,
name: impl Into<String>,
instance: impl Into<String>,
input: impl Into<String>,
) {
let name: String = name.into();
let instance: String = instance.into();
let input: String = input.into();
let mut inner = self.inner.lock().unwrap();
let _ = inner.emit_action(Action::StartOrchestrationDetached {
scheduling_event_id: 0, name,
version: None,
instance,
input,
});
}
pub fn schedule_orchestration_typed<In: serde::Serialize>(
&self,
name: impl Into<String>,
instance: impl Into<String>,
input: &In,
) {
let payload = crate::_typed_codec::Json::encode(input).expect("encode");
self.schedule_orchestration(name, instance, payload)
}
pub fn schedule_orchestration_versioned(
&self,
name: impl Into<String>,
version: Option<String>,
instance: impl Into<String>,
input: impl Into<String>,
) {
let name: String = name.into();
let instance: String = instance.into();
let input: String = input.into();
let mut inner = self.inner.lock().unwrap();
let _ = inner.emit_action(Action::StartOrchestrationDetached {
scheduling_event_id: 0, name,
version,
instance,
input,
});
}
pub fn schedule_orchestration_versioned_typed<In: serde::Serialize>(
&self,
name: impl Into<String>,
version: Option<String>,
instance: impl Into<String>,
input: &In,
) {
let payload = crate::_typed_codec::Json::encode(input).expect("encode");
self.schedule_orchestration_versioned(name, version, instance, payload)
}
pub fn set_custom_status(&self, status: impl Into<String>) {
let status: String = status.into();
let mut inner = self.inner.lock().unwrap();
inner.accumulated_custom_status = Some(status.clone());
inner.emit_action(Action::UpdateCustomStatus { status: Some(status) });
}
pub fn reset_custom_status(&self) {
let mut inner = self.inner.lock().unwrap();
inner.accumulated_custom_status = None;
inner.emit_action(Action::UpdateCustomStatus { status: None });
}
pub fn get_custom_status(&self) -> Option<String> {
self.inner.lock().unwrap().accumulated_custom_status.clone()
}
pub fn set_kv_value(&self, key: impl Into<String>, value: impl Into<String>) {
let key: String = key.into();
let value: String = value.into();
let mut inner = self.inner.lock().unwrap();
let last_updated_at_ms = inner.now_ms();
inner.kv_state.insert(key.clone(), value.clone());
inner.kv_metadata.insert(key.clone(), last_updated_at_ms);
inner.emit_action(Action::SetKeyValue {
key,
value,
last_updated_at_ms,
});
}
pub fn set_kv_value_typed<T: serde::Serialize>(&self, key: impl Into<String>, value: &T) {
let serialized = serde_json::to_string(value).expect("KV value serialization should not fail");
self.set_kv_value(key, serialized);
}
pub fn get_kv_value(&self, key: &str) -> Option<String> {
self.inner.lock().unwrap().kv_state.get(key).cloned()
}
pub fn get_kv_value_typed<T: serde::de::DeserializeOwned>(&self, key: &str) -> Result<Option<T>, String> {
match self.get_kv_value(key) {
None => Ok(None),
Some(s) => serde_json::from_str(&s)
.map(Some)
.map_err(|e| format!("KV deserialization error for key '{}': {}", key, e)),
}
}
pub fn get_kv_all_values(&self) -> std::collections::HashMap<String, String> {
self.inner.lock().unwrap().kv_state.clone()
}
pub fn get_kv_all_keys(&self) -> Vec<String> {
self.inner.lock().unwrap().kv_state.keys().cloned().collect()
}
pub fn get_kv_length(&self) -> usize {
self.inner.lock().unwrap().kv_state.len()
}
pub fn get_system_stats(&self) -> Option<crate::SystemStats> {
None
}
pub fn clear_kv_value(&self, key: impl Into<String>) {
let key: String = key.into();
let mut inner = self.inner.lock().unwrap();
inner.kv_state.remove(&key);
inner.kv_metadata.remove(&key);
inner.emit_action(Action::ClearKeyValue { key });
}
pub fn clear_all_kv_values(&self) {
let mut inner = self.inner.lock().unwrap();
inner.kv_state.clear();
inner.kv_metadata.clear();
inner.emit_action(Action::ClearKeyValues);
}
pub fn prune_kv_values_updated_before(&self, updated_before_ms: u64) -> usize {
let keys_to_clear: Vec<String> = {
let inner = self.inner.lock().unwrap();
inner
.kv_metadata
.iter()
.filter(|(_, ts)| **ts < updated_before_ms)
.filter(|(key, _)| inner.kv_state.contains_key(*key))
.map(|(key, _)| key.clone())
.collect()
};
let count = keys_to_clear.len();
for key in keys_to_clear {
self.clear_kv_value(key);
}
count
}
pub fn get_kv_value_from_instance(
&self,
instance_id: impl Into<String>,
key: impl Into<String>,
) -> DurableFuture<Result<Option<String>, String>> {
let input = serde_json::json!({
"instance_id": instance_id.into(),
"key": key.into(),
})
.to_string();
self.schedule_activity(SYSCALL_ACTIVITY_GET_KV_VALUE, input)
.map(|result| match result {
Ok(json_str) => serde_json::from_str::<Option<String>>(&json_str)
.map_err(|e| format!("get_kv_value_from_instance deserialization error: {e}")),
Err(e) => Err(e),
})
}
pub fn get_kv_value_from_instance_typed<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
instance_id: impl Into<String>,
key: impl Into<String>,
) -> DurableFuture<Result<Option<T>, String>> {
self.get_kv_value_from_instance(instance_id, key)
.map(|result| match result {
Ok(None) => Ok(None),
Ok(Some(s)) => serde_json::from_str::<T>(&s)
.map(Some)
.map_err(|e| format!("get_kv_value_from_instance_typed deserialization error: {e}")),
Err(e) => Err(e),
})
}
}
impl OrchestrationContext {
pub fn schedule_activity(
&self,
name: impl Into<String>,
input: impl Into<String>,
) -> DurableFuture<Result<String, String>> {
self.schedule_activity_internal(name, input, None)
}
pub fn schedule_activity_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned + Send + 'static>(
&self,
name: impl Into<String>,
input: &In,
) -> DurableFuture<Result<Out, String>> {
let payload = crate::_typed_codec::Json::encode(input).expect("encode");
self.schedule_activity(name, payload)
.map(|r| r.and_then(|s| crate::_typed_codec::Json::decode::<Out>(&s)))
}
pub fn schedule_activity_on_session(
&self,
name: impl Into<String>,
input: impl Into<String>,
session_id: impl Into<String>,
) -> DurableFuture<Result<String, String>> {
self.schedule_activity_internal(name, input, Some(session_id.into()))
}
pub fn schedule_activity_on_session_typed<
In: serde::Serialize,
Out: serde::de::DeserializeOwned + Send + 'static,
>(
&self,
name: impl Into<String>,
input: &In,
session_id: impl Into<String>,
) -> DurableFuture<Result<Out, String>> {
let payload = crate::_typed_codec::Json::encode(input).expect("encode");
self.schedule_activity_on_session(name, payload, session_id)
.map(|r| r.and_then(|s| crate::_typed_codec::Json::decode::<Out>(&s)))
}
fn schedule_activity_internal(
&self,
name: impl Into<String>,
input: impl Into<String>,
session_id: Option<String>,
) -> DurableFuture<Result<String, String>> {
let name: String = name.into();
let input: String = input.into();
let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
let token = inner.emit_action(Action::CallActivity {
scheduling_event_id: 0, name: name.clone(),
input: input.clone(),
session_id,
tag: None,
});
drop(inner);
let ctx = self.clone();
let inner_future = std::future::poll_fn(move |_cx| {
let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
if let Some(result) = inner.get_result(token) {
match result {
CompletionResult::ActivityOk(s) => Poll::Ready(Ok(s.clone())),
CompletionResult::ActivityErr(e) => Poll::Ready(Err(e.clone())),
_ => Poll::Pending, }
} else {
Poll::Pending
}
});
DurableFuture::new(token, ScheduleKind::Activity { name }, self.clone(), inner_future)
}
pub fn schedule_timer(&self, delay: std::time::Duration) -> DurableFuture<()> {
let delay_ms = delay.as_millis() as u64;
let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
let now = inner.now_ms();
let fire_at_ms = now.saturating_add(delay_ms);
let token = inner.emit_action(Action::CreateTimer {
scheduling_event_id: 0,
fire_at_ms,
});
drop(inner);
let ctx = self.clone();
let inner_future = std::future::poll_fn(move |_cx| {
let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
if let Some(result) = inner.get_result(token) {
match result {
CompletionResult::TimerFired => Poll::Ready(()),
_ => Poll::Pending,
}
} else {
Poll::Pending
}
});
DurableFuture::new(token, ScheduleKind::Timer, self.clone(), inner_future)
}
pub fn schedule_wait(&self, name: impl Into<String>) -> DurableFuture<String> {
let name: String = name.into();
let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
let token = inner.emit_action(Action::WaitExternal {
scheduling_event_id: 0,
name: name.clone(),
});
drop(inner);
let ctx = self.clone();
let inner_future = std::future::poll_fn(move |_cx| {
let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
if let Some(bound_id) = inner.get_bound_schedule_id(token)
&& let Some(data) = inner.get_external_event(bound_id)
{
return Poll::Ready(data.clone());
}
Poll::Pending
});
DurableFuture::new(
token,
ScheduleKind::ExternalWait { event_name: name },
self.clone(),
inner_future,
)
}
pub fn schedule_wait_typed<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
name: impl Into<String>,
) -> DurableFuture<T> {
self.schedule_wait(name)
.map(|s| crate::_typed_codec::Json::decode::<T>(&s).expect("decode"))
}
pub fn dequeue_event(&self, queue: impl Into<String>) -> DurableFuture<String> {
let name: String = queue.into();
let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
let token = inner.emit_action(Action::DequeueEvent {
scheduling_event_id: 0,
name: name.clone(),
});
drop(inner);
let ctx = self.clone();
let inner_future = std::future::poll_fn(move |_cx| {
let mut inner = ctx.inner.lock().expect("Mutex should not be poisoned");
if let Some(bound_id) = inner.get_bound_schedule_id(token)
&& let Some(data) = inner.get_queue_message(bound_id)
{
return Poll::Ready(data);
}
Poll::Pending
});
DurableFuture::new(
token,
ScheduleKind::QueueDequeue { event_name: name },
self.clone(),
inner_future,
)
}
pub fn dequeue_event_typed<T: serde::de::DeserializeOwned>(
&self,
queue: impl Into<String>,
) -> impl Future<Output = T> {
let fut = self.dequeue_event(queue);
async move {
let s = fut.await;
crate::_typed_codec::Json::decode::<T>(&s).expect("decode")
}
}
#[deprecated(note = "Use dequeue_event() instead")]
pub fn schedule_wait_persistent(&self, name: impl Into<String>) -> DurableFuture<String> {
self.dequeue_event(name)
}
#[deprecated(note = "Use dequeue_event_typed() instead")]
pub fn schedule_wait_persistent_typed<T: serde::de::DeserializeOwned>(
&self,
name: impl Into<String>,
) -> impl Future<Output = T> {
self.dequeue_event_typed(name)
}
#[cfg(feature = "replay-version-test")]
pub fn schedule_wait2(&self, name: impl Into<String>, topic: impl Into<String>) -> DurableFuture<String> {
let name: String = name.into();
let topic: String = topic.into();
let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
let token = inner.emit_action(Action::WaitExternal2 {
scheduling_event_id: 0,
name: name.clone(),
topic: topic.clone(),
});
drop(inner);
let ctx = self.clone();
let inner_future = std::future::poll_fn(move |_cx| {
let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
if let Some(bound_id) = inner.get_bound_schedule_id(token)
&& let Some(data) = inner.get_external_event2(bound_id)
{
return Poll::Ready(data.clone());
}
Poll::Pending
});
DurableFuture::new(
token,
ScheduleKind::ExternalWait { event_name: name },
self.clone(),
inner_future,
)
}
pub fn schedule_sub_orchestration(
&self,
name: impl Into<String>,
input: impl Into<String>,
) -> DurableFuture<Result<String, String>> {
self.schedule_sub_orchestration_versioned_with_id_internal(name, None, None, input)
}
pub fn schedule_sub_orchestration_with_id(
&self,
name: impl Into<String>,
instance: impl Into<String>,
input: impl Into<String>,
) -> DurableFuture<Result<String, String>> {
self.schedule_sub_orchestration_versioned_with_id_internal(name, None, Some(instance.into()), input)
}
pub fn schedule_sub_orchestration_versioned(
&self,
name: impl Into<String>,
version: Option<String>,
input: impl Into<String>,
) -> DurableFuture<Result<String, String>> {
self.schedule_sub_orchestration_versioned_with_id_internal(name, version, None, input)
}
pub fn schedule_sub_orchestration_versioned_with_id(
&self,
name: impl Into<String>,
version: Option<String>,
instance: impl Into<String>,
input: impl Into<String>,
) -> DurableFuture<Result<String, String>> {
self.schedule_sub_orchestration_versioned_with_id_internal(name, version, Some(instance.into()), input)
}
fn schedule_sub_orchestration_versioned_with_id_internal(
&self,
name: impl Into<String>,
version: Option<String>,
instance: Option<String>,
input: impl Into<String>,
) -> DurableFuture<Result<String, String>> {
let name: String = name.into();
let input: String = input.into();
let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
let action_instance = match &instance {
Some(explicit_id) => explicit_id.clone(),
None => format!("{}{}", SUB_ORCH_PENDING_PREFIX, inner.next_token + 1),
};
let token = inner.emit_action(Action::StartSubOrchestration {
scheduling_event_id: 0,
name: name.clone(),
version,
instance: action_instance.clone(),
input: input.clone(),
});
drop(inner);
let ctx = self.clone();
let inner_future = std::future::poll_fn(move |_cx| {
let inner = ctx.inner.lock().expect("Mutex should not be poisoned");
if let Some(result) = inner.get_result(token) {
match result {
CompletionResult::SubOrchOk(s) => Poll::Ready(Ok(s.clone())),
CompletionResult::SubOrchErr(e) => Poll::Ready(Err(e.clone())),
_ => Poll::Pending,
}
} else {
Poll::Pending
}
});
DurableFuture::new(
token,
ScheduleKind::SubOrchestration { token },
self.clone(),
inner_future,
)
}
pub fn schedule_sub_orchestration_typed<In: serde::Serialize, Out: serde::de::DeserializeOwned + Send + 'static>(
&self,
name: impl Into<String>,
input: &In,
) -> DurableFuture<Result<Out, String>> {
let payload = crate::_typed_codec::Json::encode(input).expect("encode");
self.schedule_sub_orchestration(name, payload)
.map(|r| r.and_then(|s| crate::_typed_codec::Json::decode::<Out>(&s)))
}
pub fn schedule_sub_orchestration_with_id_typed<
In: serde::Serialize,
Out: serde::de::DeserializeOwned + Send + 'static,
>(
&self,
name: impl Into<String>,
instance: impl Into<String>,
input: &In,
) -> DurableFuture<Result<Out, String>> {
let payload = crate::_typed_codec::Json::encode(input).expect("encode");
self.schedule_sub_orchestration_with_id(name, instance, payload)
.map(|r| r.and_then(|s| crate::_typed_codec::Json::decode::<Out>(&s)))
}
pub async fn join<T, F>(&self, futures: Vec<F>) -> Vec<T>
where
F: Future<Output = T>,
{
::futures::future::join_all(futures).await
}
pub async fn join2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> (T1, T2)
where
F1: Future<Output = T1>,
F2: Future<Output = T2>,
{
::futures::future::join(f1, f2).await
}
pub async fn join3<T1, T2, T3, F1, F2, F3>(&self, f1: F1, f2: F2, f3: F3) -> (T1, T2, T3)
where
F1: Future<Output = T1>,
F2: Future<Output = T2>,
F3: Future<Output = T3>,
{
::futures::future::join3(f1, f2, f3).await
}
pub async fn select2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> Either2<T1, T2>
where
F1: Future<Output = T1>,
F2: Future<Output = T2>,
{
use ::futures::FutureExt;
let mut f1 = std::pin::pin!(f1.fuse());
let mut f2 = std::pin::pin!(f2.fuse());
::futures::select_biased! {
result = f1 => Either2::First(result),
result = f2 => Either2::Second(result),
}
}
pub async fn select3<T1, T2, T3, F1, F2, F3>(&self, f1: F1, f2: F2, f3: F3) -> Either3<T1, T2, T3>
where
F1: Future<Output = T1>,
F2: Future<Output = T2>,
F3: Future<Output = T3>,
{
use ::futures::FutureExt;
let mut f1 = std::pin::pin!(f1.fuse());
let mut f2 = std::pin::pin!(f2.fuse());
let mut f3 = std::pin::pin!(f3.fuse());
::futures::select_biased! {
result = f1 => Either3::First(result),
result = f2 => Either3::Second(result),
result = f3 => Either3::Third(result),
}
}
}
#[cfg(test)]
mod system_stats_tests {
use super::*;
#[test]
fn system_stats_serialization_roundtrip() {
let stats = SystemStats {
history_event_count: 847,
history_size_bytes: 52301,
queue_pending_count: 3,
kv_user_key_count: 42,
kv_total_value_bytes: 8192,
};
let json = serde_json::to_string(&stats).unwrap();
let deserialized: SystemStats = serde_json::from_str(&json).unwrap();
assert_eq!(stats, deserialized);
}
#[test]
fn system_stats_json_format() {
let stats = SystemStats {
history_event_count: 10,
history_size_bytes: 2000,
queue_pending_count: 0,
kv_user_key_count: 5,
kv_total_value_bytes: 1024,
};
let json = serde_json::to_string(&stats).unwrap();
assert!(json.contains("\"history_event_count\":10"));
assert!(json.contains("\"queue_pending_count\":0"));
}
}