mod options;
pub use options::{
ActivityCloseTimeouts, ActivityOptions, ChildWorkflowOptions, ContinueAsNewOptions,
ContinueAsNewVersioningBehavior, LocalActivityOptions, NexusOperationOptions, Signal,
SignalData, TimerOptions,
};
pub use temporalio_common_wasm::protos::coresdk::child_workflow::StartChildWorkflowExecutionFailedCause;
use crate::runtime::{
SdkGuardedFuture, SdkWakeGuard,
entry::WorkflowImplementation,
host::WorkflowHost,
model::{
CancelExternalWfResult, CancellableID, NexusStartResult, SignalExternalWfResult,
TimerResult, UnblockEvent, Unblockable, WorkflowTermination,
},
};
use futures_channel::oneshot;
use futures_util::{
FutureExt,
future::{FusedFuture, Shared},
task::Context,
};
use std::{
cell::{Cell, Ref, RefCell},
collections::HashMap,
future::{self, Future},
marker::PhantomData,
ops::Deref,
pin::Pin,
rc::Rc,
sync::atomic::{AtomicBool, Ordering},
task::{Poll, Waker},
time::{Duration, SystemTime},
};
use temporalio_common_wasm::{
ActivityDefinition, SignalDefinition, WorkflowDefinition,
data_converters::{
ActivityExecutionDecodeHint, ChildWorkflowExecutionDecodeHint,
ChildWorkflowStartDecodeHint, DataConverter, GenericPayloadConverter, PayloadConverter,
SerializationContext, SerializationContextData, TemporalDeserializable,
WorkflowSignalDecodeHint,
},
error::{
ActivityExecutionError, ChildWorkflowExecutionError, ChildWorkflowStartError,
WorkflowSignalError,
},
protos::{
coresdk::{
activity_result::{ActivityResolution, Cancellation, activity_resolution},
child_workflow::{ChildWorkflowResult, child_workflow_result},
common::NamespacedWorkflowExecution,
nexus::NexusOperationResult,
workflow_activation::{
InitializeWorkflow, WorkflowActivation as CoreWorkflowActivation,
resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
workflow_activation_job::Variant as ActivationVariant,
},
workflow_commands::{
CancelChildWorkflowExecution, CancelSignalWorkflow, CancelTimer,
ModifyWorkflowProperties, RequestCancelActivity,
RequestCancelExternalWorkflowExecution, RequestCancelLocalActivity,
RequestCancelNexusOperation, SetPatchMarker, SignalExternalWorkflowExecution,
UpsertWorkflowSearchAttributes, signal_external_workflow_execution,
workflow_command,
},
},
temporal::api::{
common::v1::{Memo, Payload, SearchAttributes},
failure::v1::{CanceledFailureInfo, Failure, failure::FailureInfo},
},
utilities::TryIntoOrNone,
},
worker::WorkerDeploymentVersion,
};
#[derive(Clone)]
pub struct BaseWorkflowContext {
inner: Rc<WorkflowContextInner>,
}
impl BaseWorkflowContext {
pub(crate) fn apply_activation_context(&self, activation: &CoreWorkflowActivation) {
let mut shared = self.inner.shared.borrow_mut();
shared.activation = activation.clone();
if let Some(seed) = activation.jobs.iter().find_map(|job| match &job.variant {
Some(ActivationVariant::UpdateRandomSeed(attrs)) => Some(attrs.randomness_seed),
_ => None,
}) {
shared.random_seed = seed;
}
}
pub fn data_converter(&self) -> &DataConverter {
&self.inner.data_converter
}
pub(crate) fn record_patch(&self, patch_id: String, present: bool) {
self.inner
.shared
.borrow_mut()
.changes
.insert(patch_id, present);
}
pub(crate) fn view(&self) -> WorkflowContextView {
WorkflowContextView::new(
self.inner.namespace.clone(),
self.inner.task_queue.clone(),
self.inner.run_id.clone(),
&self.inner.inital_information,
)
}
}
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
enum PendingCommandId {
Timer(u32),
Activity(u32),
ChildWorkflowStart(u32),
ChildWorkflowComplete(u32),
SignalExternal(u32),
CancelExternal(u32),
NexusOpStart(u32),
NexusOpComplete(u32),
}
impl PendingCommandId {
fn from_unblock_event(event: &UnblockEvent) -> Self {
match event {
UnblockEvent::Timer(seq, _) => Self::Timer(*seq),
UnblockEvent::Activity(seq, _) => Self::Activity(*seq),
UnblockEvent::WorkflowStart(seq, _) => Self::ChildWorkflowStart(*seq),
UnblockEvent::WorkflowComplete(seq, _) => Self::ChildWorkflowComplete(*seq),
UnblockEvent::SignalExternal(seq, _) => Self::SignalExternal(*seq),
UnblockEvent::CancelExternal(seq, _) => Self::CancelExternal(*seq),
UnblockEvent::NexusOperationStart(seq, _) => Self::NexusOpStart(*seq),
UnblockEvent::NexusOperationComplete(seq, _) => Self::NexusOpComplete(*seq),
}
}
}
struct WorkflowRuntimeState {
host: Rc<dyn WorkflowHost>,
pending_unblocks: RefCell<HashMap<PendingCommandId, oneshot::Sender<UnblockEvent>>>,
forced_wft_failure: RefCell<Option<Box<dyn std::error::Error + Send + Sync>>>,
progress_made: Cell<bool>,
}
impl WorkflowRuntimeState {
fn new(host: Rc<dyn WorkflowHost>) -> Self {
Self {
host,
pending_unblocks: RefCell::new(HashMap::new()),
forced_wft_failure: RefCell::new(None),
progress_made: Cell::new(false),
}
}
fn register_unblocker(&self, id: PendingCommandId, unblocker: oneshot::Sender<UnblockEvent>) {
self.pending_unblocks.borrow_mut().insert(id, unblocker);
}
fn unblock(&self, event: UnblockEvent) -> Result<(), anyhow::Error> {
let id = PendingCommandId::from_unblock_event(&event);
let unblocker = self
.pending_unblocks
.borrow_mut()
.remove(&id)
.ok_or_else(|| anyhow::anyhow!("Command {id:?} not found to unblock"))?;
self.progress_made.set(true);
let _guard = SdkWakeGuard::new();
let _ = unblocker.send(event);
Ok(())
}
fn maybe_unblock(&self, event: UnblockEvent) -> bool {
let id = PendingCommandId::from_unblock_event(&event);
let Some(unblocker) = self.pending_unblocks.borrow_mut().remove(&id) else {
return false;
};
self.progress_made.set(true);
let _guard = SdkWakeGuard::new();
let _ = unblocker.send(event);
true
}
fn set_forced_wft_failure(&self, err: Box<dyn std::error::Error + Send + Sync>) {
*self.forced_wft_failure.borrow_mut() = Some(err);
self.progress_made.set(true);
}
fn take_forced_wft_failure(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> {
self.forced_wft_failure.borrow_mut().take()
}
fn mark_progress(&self) {
self.progress_made.set(true);
}
fn take_progress(&self) -> bool {
self.progress_made.replace(false)
}
}
struct WorkflowContextInner {
namespace: String,
task_queue: String,
run_id: String,
inital_information: InitializeWorkflow,
runtime: WorkflowRuntimeState,
cancelled_reason: RefCell<Option<String>>,
cancel_wakers: RefCell<Vec<Waker>>,
shared: RefCell<WorkflowContextSharedData>,
seq_nums: RefCell<WfCtxProtectedDat>,
data_converter: DataConverter,
state_mutated: Cell<bool>,
}
pub struct SyncWorkflowContext<W> {
base: BaseWorkflowContext,
headers: Rc<HashMap<String, Payload>>,
_phantom: PhantomData<W>,
}
impl<W> Clone for SyncWorkflowContext<W> {
fn clone(&self) -> Self {
Self {
base: self.base.clone(),
headers: self.headers.clone(),
_phantom: PhantomData,
}
}
}
pub struct WorkflowContext<W> {
sync: SyncWorkflowContext<W>,
workflow_state: Rc<RefCell<W>>,
condition_wakers: Rc<RefCell<Vec<Waker>>>,
}
impl<W> Clone for WorkflowContext<W> {
fn clone(&self) -> Self {
Self {
sync: self.sync.clone(),
workflow_state: self.workflow_state.clone(),
condition_wakers: self.condition_wakers.clone(),
}
}
}
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct WorkflowContextView {
pub workflow_id: String,
pub run_id: String,
pub workflow_type: String,
pub task_queue: String,
pub namespace: String,
pub attempt: u32,
pub first_execution_run_id: String,
pub continued_from_run_id: Option<String>,
pub start_time: Option<SystemTime>,
pub execution_timeout: Option<Duration>,
pub run_timeout: Option<Duration>,
pub task_timeout: Option<Duration>,
pub parent: Option<ParentWorkflowInfo>,
pub root: Option<RootWorkflowInfo>,
pub retry_policy:
Option<temporalio_common_wasm::protos::temporal::api::common::v1::RetryPolicy>,
pub cron_schedule: Option<String>,
pub memo: Option<Memo>,
pub search_attributes: Option<SearchAttributes>,
}
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ParentWorkflowInfo {
pub workflow_id: String,
pub run_id: String,
pub namespace: String,
}
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct RootWorkflowInfo {
pub workflow_id: String,
pub run_id: String,
}
impl WorkflowContextView {
pub(crate) fn new(
namespace: String,
task_queue: String,
run_id: String,
init: &InitializeWorkflow,
) -> Self {
let parent = init
.parent_workflow_info
.as_ref()
.map(|p| ParentWorkflowInfo {
workflow_id: p.workflow_id.clone(),
run_id: p.run_id.clone(),
namespace: p.namespace.clone(),
});
let root = init.root_workflow.as_ref().map(|r| RootWorkflowInfo {
workflow_id: r.workflow_id.clone(),
run_id: r.run_id.clone(),
});
let continued_from_run_id = if init.continued_from_execution_run_id.is_empty() {
None
} else {
Some(init.continued_from_execution_run_id.clone())
};
let cron_schedule = if init.cron_schedule.is_empty() {
None
} else {
Some(init.cron_schedule.clone())
};
Self {
workflow_id: init.workflow_id.clone(),
run_id,
workflow_type: init.workflow_type.clone(),
task_queue,
namespace,
attempt: init.attempt as u32,
first_execution_run_id: init.first_execution_run_id.clone(),
continued_from_run_id,
start_time: init.start_time.and_then(|t| t.try_into().ok()),
execution_timeout: init
.workflow_execution_timeout
.and_then(|d| d.try_into().ok()),
run_timeout: init.workflow_run_timeout.and_then(|d| d.try_into().ok()),
task_timeout: init.workflow_task_timeout.and_then(|d| d.try_into().ok()),
parent,
root,
retry_policy: init.retry_policy.clone(),
cron_schedule,
memo: init.memo.clone(),
search_attributes: init.search_attributes.clone(),
}
}
}
impl BaseWorkflowContext {
#[doc(hidden)]
pub fn new(
namespace: String,
task_queue: String,
run_id: String,
init_workflow_job: InitializeWorkflow,
data_converter: DataConverter,
host: Rc<dyn WorkflowHost>,
) -> Self {
Self {
inner: Rc::new(WorkflowContextInner {
namespace,
task_queue,
run_id,
shared: RefCell::new(WorkflowContextSharedData {
random_seed: init_workflow_job.randomness_seed,
search_attributes: init_workflow_job
.search_attributes
.clone()
.unwrap_or_default(),
..Default::default()
}),
inital_information: init_workflow_job,
runtime: WorkflowRuntimeState::new(host),
cancelled_reason: RefCell::new(None),
cancel_wakers: RefCell::new(Vec::new()),
seq_nums: RefCell::new(WfCtxProtectedDat {
next_timer_sequence_number: 1,
next_activity_sequence_number: 1,
next_child_workflow_sequence_number: 1,
next_cancel_external_wf_sequence_number: 1,
next_signal_external_wf_sequence_number: 1,
next_nexus_op_sequence_number: 1,
}),
data_converter,
state_mutated: Cell::new(false),
}),
}
}
pub(crate) fn take_state_mutated(&self) -> bool {
self.inner.state_mutated.replace(false)
}
pub(crate) fn set_state_mutated(&self) {
self.inner.state_mutated.set(true);
}
pub(crate) fn take_runtime_progress(&self) -> bool {
self.inner.runtime.take_progress()
}
pub(crate) fn take_forced_wft_failure(
&self,
) -> Option<Box<dyn std::error::Error + Send + Sync>> {
self.inner.runtime.take_forced_wft_failure()
}
pub(crate) fn notify_cancel(&self, reason: String) {
let _guard = SdkWakeGuard::new();
*self.inner.cancelled_reason.borrow_mut() = Some(reason);
for waker in self.inner.cancel_wakers.borrow_mut().drain(..) {
waker.wake();
}
self.inner.runtime.mark_progress();
}
pub(crate) fn unblock(&self, event: UnblockEvent) -> Result<(), anyhow::Error> {
self.inner.runtime.unblock(event)
}
fn cancel(&self, cancellable_id: CancellableID) {
match cancellable_id {
CancellableID::Timer(seq) => {
if self
.inner
.runtime
.maybe_unblock(UnblockEvent::Timer(seq, TimerResult::Cancelled))
{
self.inner.runtime.host.push_command(
workflow_command::Variant::CancelTimer(CancelTimer { seq }).into(),
);
}
}
CancellableID::Activity(seq) => {
self.inner.runtime.host.push_command(
workflow_command::Variant::RequestCancelActivity(RequestCancelActivity { seq })
.into(),
);
}
CancellableID::LocalActivity(seq) => {
self.inner.runtime.host.push_command(
workflow_command::Variant::RequestCancelLocalActivity(
RequestCancelLocalActivity { seq },
)
.into(),
);
}
CancellableID::ChildWorkflow { seqnum, reason } => {
self.inner.runtime.host.push_command(
workflow_command::Variant::CancelChildWorkflowExecution(
CancelChildWorkflowExecution {
child_workflow_seq: seqnum,
reason,
},
)
.into(),
);
}
CancellableID::SignalExternalWorkflow(seq) => {
self.inner.runtime.host.push_command(
workflow_command::Variant::CancelSignalWorkflow(CancelSignalWorkflow { seq })
.into(),
);
}
CancellableID::NexusOp(seq) => {
self.inner.runtime.host.push_command(
workflow_command::Variant::RequestCancelNexusOperation(
RequestCancelNexusOperation { seq },
)
.into(),
);
}
}
}
pub fn current_details(&self) -> String {
self.inner.shared.borrow().current_details.clone()
}
pub fn timer<T: Into<TimerOptions>>(
&self,
opts: T,
) -> impl CancellableFuture<TimerResult> + use<T> {
let opts: TimerOptions = opts.into();
let seq = self.inner.seq_nums.borrow_mut().next_timer_seq();
let (cmd, unblocker) =
CancellableWFCommandFut::new(CancellableID::Timer(seq), self.clone());
self.inner
.runtime
.register_unblocker(PendingCommandId::Timer(seq), unblocker);
self.inner.runtime.host.push_command(opts.into_command(seq));
cmd
}
pub fn start_activity<AD: ActivityDefinition>(
&self,
_activity: AD,
input: impl Into<AD::Input>,
mut opts: ActivityOptions,
) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
where
AD::Output: TemporalDeserializable,
{
let input = input.into();
let payload_converter = self.inner.data_converter.payload_converter();
let ctx = SerializationContext {
data: &SerializationContextData::Workflow,
converter: payload_converter,
};
let payloads = match payload_converter.to_payloads(&ctx, &input) {
Ok(p) => p,
Err(e) => {
return ActivityFut::eager(e.into());
}
};
let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
let (cmd, unblocker) =
CancellableWFCommandFut::new(CancellableID::Activity(seq), self.clone());
self.inner
.runtime
.register_unblocker(PendingCommandId::Activity(seq), unblocker);
if opts.task_queue.is_none() {
opts.task_queue = Some(self.inner.task_queue.clone());
}
self.inner.runtime.host.push_command(opts.into_command(
seq,
AD::name().to_string(),
payloads,
));
ActivityFut::running(cmd, self.inner.data_converter.clone())
}
pub fn start_local_activity<AD: ActivityDefinition>(
&self,
_activity: AD,
input: impl Into<AD::Input>,
opts: LocalActivityOptions,
) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
where
AD::Output: TemporalDeserializable,
{
let input = input.into();
let payload_converter = self.inner.data_converter.payload_converter();
let ctx = SerializationContext {
data: &SerializationContextData::Workflow,
converter: payload_converter,
};
let payloads = match payload_converter.to_payloads(&ctx, &input) {
Ok(p) => p,
Err(e) => {
return ActivityFut::eager(e.into());
}
};
ActivityFut::running(
LATimerBackoffFut::new(AD::name().to_string(), payloads, opts, self.clone()),
self.inner.data_converter.clone(),
)
}
fn start_child_workflow<WD: WorkflowDefinition>(
&self,
workflow: WD,
input: impl Into<WD::Input>,
opts: ChildWorkflowOptions,
) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
where
WD::Output: TemporalDeserializable,
{
let input = input.into();
let payload_converter = self.inner.data_converter.payload_converter();
let ctx = SerializationContext {
data: &SerializationContextData::Workflow,
converter: payload_converter,
};
let payloads = match payload_converter.to_payloads(&ctx, &input) {
Ok(p) => p,
Err(e) => {
return ChildWorkflowStartFut::eager(e.into());
}
};
let workflow_type = workflow.name().to_string();
let child_seq = self.inner.seq_nums.borrow_mut().next_child_workflow_seq();
let (result_cmd, unblocker) = CancellableWFCommandFut::new(
CancellableID::ChildWorkflow {
seqnum: child_seq,
reason: String::new(),
},
self.clone(),
);
self.inner.runtime.register_unblocker(
PendingCommandId::ChildWorkflowComplete(child_seq),
unblocker,
);
let common = ChildWfCommon {
workflow_id: opts.workflow_id.clone(),
child_seq,
result_future: result_cmd,
base_ctx: self.clone(),
data_converter: self.inner.data_converter.clone(),
};
let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
CancellableID::ChildWorkflow {
seqnum: child_seq,
reason: String::new(),
},
common,
self.clone(),
);
self.inner
.runtime
.register_unblocker(PendingCommandId::ChildWorkflowStart(child_seq), unblocker);
self.inner
.runtime
.host
.push_command(opts.into_command(child_seq, workflow_type, payloads));
ChildWorkflowStartFut::Running(cmd)
}
fn local_activity_no_timer_retry(
self,
activity_type: String,
arguments: Vec<Payload>,
opts: LocalActivityOptions,
) -> impl CancellableFuture<ActivityResolution> {
let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
let (cmd, unblocker) =
CancellableWFCommandFut::new(CancellableID::LocalActivity(seq), self.clone());
self.inner
.runtime
.register_unblocker(PendingCommandId::Activity(seq), unblocker);
self.inner
.runtime
.host
.push_command(opts.into_command(seq, activity_type, arguments));
cmd
}
fn send_signal_wf(
self,
target: signal_external_workflow_execution::Target,
signal: Signal,
) -> impl CancellableFuture<SignalExternalWfResult> {
let seq = self
.inner
.seq_nums
.borrow_mut()
.next_signal_external_wf_seq();
let (cmd, unblocker) =
CancellableWFCommandFut::new(CancellableID::SignalExternalWorkflow(seq), self.clone());
self.inner
.runtime
.register_unblocker(PendingCommandId::SignalExternal(seq), unblocker);
let signal = signal.into_invocation();
self.inner.runtime.host.push_command(
workflow_command::Variant::SignalExternalWorkflowExecution(
SignalExternalWorkflowExecution {
seq,
signal_name: signal.signal_name,
args: signal.input,
target: Some(target),
headers: signal.headers,
},
)
.into(),
);
cmd
}
}
impl<W> SyncWorkflowContext<W> {
pub fn workflow_id(&self) -> &str {
&self.base.inner.inital_information.workflow_id
}
pub fn run_id(&self) -> &str {
&self.base.inner.run_id
}
pub fn namespace(&self) -> &str {
&self.base.inner.namespace
}
pub fn task_queue(&self) -> &str {
&self.base.inner.task_queue
}
pub fn workflow_time(&self) -> Option<SystemTime> {
self.base
.inner
.shared
.borrow()
.activation
.timestamp
.try_into_or_none()
}
pub fn history_length(&self) -> u32 {
self.base.inner.shared.borrow().activation.history_length
}
pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
self.base
.inner
.shared
.borrow()
.activation
.clone()
.deployment_version_for_current_task
.map(Into::into)
}
pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
Ref::map(self.base.inner.shared.borrow(), |s| &s.search_attributes)
}
pub fn random_seed(&self) -> u64 {
self.base.inner.shared.borrow().random_seed
}
pub fn is_replaying(&self) -> bool {
self.base.inner.shared.borrow().activation.is_replaying
}
pub fn continue_as_new_suggested(&self) -> bool {
self.base
.inner
.shared
.borrow()
.activation
.continue_as_new_suggested
}
pub fn target_worker_deployment_version_changed(&self) -> bool {
self.base
.inner
.shared
.borrow()
.activation
.target_worker_deployment_version_changed
}
pub fn headers(&self) -> &HashMap<String, Payload> {
&self.headers
}
pub fn payload_converter(&self) -> &PayloadConverter {
self.base.inner.data_converter.payload_converter()
}
pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
&self.base.inner.inital_information
}
pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
let inner = self.base.inner.clone();
future::poll_fn(move |cx| {
if let Some(reason) = inner.cancelled_reason.borrow().as_ref() {
Poll::Ready(reason.clone())
} else {
inner.cancel_wakers.borrow_mut().push(cx.waker().clone());
Poll::Pending
}
})
.fuse()
}
pub fn continue_as_new(
&self,
input: &<W::Run as WorkflowDefinition>::Input,
opts: ContinueAsNewOptions,
) -> Result<std::convert::Infallible, WorkflowTermination>
where
W: WorkflowImplementation,
{
let pc = self.base.inner.data_converter.payload_converter();
let ctx = SerializationContext {
data: &SerializationContextData::Workflow,
converter: pc,
};
let arguments = pc
.to_payloads(&ctx, input)
.map_err(WorkflowTermination::from)?;
let workflow_type = self.workflow_initial_info().workflow_type.clone();
let request = opts.into_request(workflow_type, arguments);
Err(WorkflowTermination::continue_as_new(request))
}
pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
self.base.timer(opts)
}
pub fn start_activity<AD: ActivityDefinition>(
&self,
activity: AD,
input: impl Into<AD::Input>,
opts: ActivityOptions,
) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
where
AD::Output: TemporalDeserializable,
{
self.base.start_activity(activity, input, opts)
}
pub fn start_local_activity<AD: ActivityDefinition>(
&self,
activity: AD,
input: impl Into<AD::Input>,
opts: LocalActivityOptions,
) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
where
AD::Output: TemporalDeserializable,
{
self.base.start_local_activity(activity, input, opts)
}
pub fn start_child_workflow<WD: WorkflowDefinition>(
&self,
workflow: WD,
input: impl Into<WD::Input>,
opts: ChildWorkflowOptions,
) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
where
WD::Output: TemporalDeserializable,
{
self.base.start_child_workflow(workflow, input, opts)
}
#[deprecated(note = "use `start_child_workflow` instead")]
pub fn child_workflow<WD: WorkflowDefinition>(
&self,
workflow: WD,
input: impl Into<WD::Input>,
opts: ChildWorkflowOptions,
) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
where
WD::Output: TemporalDeserializable,
{
self.start_child_workflow(workflow, input, opts)
}
pub fn patched(&self, patch_id: &str) -> bool {
self.patch_impl(patch_id, false)
}
pub fn deprecate_patch(&self, patch_id: &str) -> bool {
self.patch_impl(patch_id, true)
}
fn patch_impl(&self, patch_id: &str, deprecated: bool) -> bool {
self.base.inner.runtime.host.push_command(
workflow_command::Variant::SetPatchMarker(SetPatchMarker {
patch_id: patch_id.to_string(),
deprecated,
})
.into(),
);
if let Some(present) = self.base.inner.shared.borrow().changes.get(patch_id) {
return *present;
}
let res = !self.base.inner.shared.borrow().activation.is_replaying;
self.base
.inner
.shared
.borrow_mut()
.changes
.insert(patch_id.to_string(), res);
res
}
pub fn external_workflow(
&self,
workflow_id: impl Into<String>,
run_id: Option<String>,
) -> ExternalWorkflowHandle {
ExternalWorkflowHandle {
workflow_id: workflow_id.into(),
run_id,
namespace: self.base.inner.namespace.clone(),
base_ctx: self.base.clone(),
}
}
pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
self.base.inner.runtime.host.push_command(
workflow_command::Variant::UpsertWorkflowSearchAttributes(
UpsertWorkflowSearchAttributes {
search_attributes: Some(SearchAttributes {
indexed_fields: attr_iter.into_iter().collect(),
}),
},
)
.into(),
);
}
pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
self.base.inner.runtime.host.push_command(
workflow_command::Variant::ModifyWorkflowProperties(ModifyWorkflowProperties {
upserted_memo: Some(Memo {
fields: attr_iter.into_iter().collect(),
}),
})
.into(),
);
}
pub fn set_current_details(&self, details: impl Into<String>) {
let details = details.into();
self.base.inner.shared.borrow_mut().current_details = details.clone();
self.base.inner.runtime.host.set_current_details(details);
}
pub fn force_task_fail(&self, with: impl Into<Box<dyn std::error::Error + Send + Sync>>) {
self.base.inner.runtime.set_forced_wft_failure(with.into());
}
pub fn start_nexus_operation(
&self,
opts: NexusOperationOptions,
) -> impl CancellableFuture<NexusStartResult> {
let seq = self.base.inner.seq_nums.borrow_mut().next_nexus_op_seq();
let (result_future, unblocker) = WFCommandFut::new();
self.base
.inner
.runtime
.register_unblocker(PendingCommandId::NexusOpComplete(seq), unblocker);
let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
CancellableID::NexusOp(seq),
NexusUnblockData {
result_future: result_future.shared(),
schedule_seq: seq,
base_ctx: self.base.clone(),
},
self.base.clone(),
);
self.base
.inner
.runtime
.register_unblocker(PendingCommandId::NexusOpStart(seq), unblocker);
self.base
.inner
.runtime
.host
.push_command(opts.into_command(seq));
cmd
}
pub(crate) fn view(&self) -> WorkflowContextView {
self.base.view()
}
}
impl<W> WorkflowContext<W> {
pub(crate) fn from_base(base: BaseWorkflowContext, workflow_state: Rc<RefCell<W>>) -> Self {
Self {
sync: SyncWorkflowContext {
base,
headers: Rc::new(HashMap::new()),
_phantom: PhantomData,
},
workflow_state,
condition_wakers: Rc::new(RefCell::new(Vec::new())),
}
}
pub(crate) fn with_headers(&self, headers: HashMap<String, Payload>) -> Self {
Self {
sync: SyncWorkflowContext {
base: self.sync.base.clone(),
headers: Rc::new(headers),
_phantom: PhantomData,
},
workflow_state: self.workflow_state.clone(),
condition_wakers: self.condition_wakers.clone(),
}
}
pub(crate) fn sync_context(&self) -> SyncWorkflowContext<W> {
self.sync.clone()
}
pub(crate) fn view(&self) -> WorkflowContextView {
self.sync.view()
}
pub fn workflow_id(&self) -> &str {
self.sync.workflow_id()
}
pub fn run_id(&self) -> &str {
self.sync.run_id()
}
pub fn namespace(&self) -> &str {
self.sync.namespace()
}
pub fn task_queue(&self) -> &str {
self.sync.task_queue()
}
pub fn workflow_time(&self) -> Option<SystemTime> {
self.sync.workflow_time()
}
pub fn history_length(&self) -> u32 {
self.sync.history_length()
}
pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
self.sync.current_deployment_version()
}
pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
self.sync.search_attributes()
}
pub fn random_seed(&self) -> u64 {
self.sync.random_seed()
}
pub fn is_replaying(&self) -> bool {
self.sync.is_replaying()
}
pub fn continue_as_new_suggested(&self) -> bool {
self.sync.continue_as_new_suggested()
}
pub fn target_worker_deployment_version_changed(&self) -> bool {
self.sync.target_worker_deployment_version_changed()
}
pub fn headers(&self) -> &HashMap<String, Payload> {
self.sync.headers()
}
pub fn payload_converter(&self) -> &PayloadConverter {
self.sync.payload_converter()
}
pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
self.sync.workflow_initial_info()
}
pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
self.sync.cancelled()
}
pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
self.sync.timer(opts)
}
pub fn start_activity<AD: ActivityDefinition>(
&self,
activity: AD,
input: impl Into<AD::Input>,
opts: ActivityOptions,
) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
where
AD::Output: TemporalDeserializable,
{
self.sync.start_activity(activity, input, opts)
}
pub fn start_local_activity<AD: ActivityDefinition>(
&self,
activity: AD,
input: impl Into<AD::Input>,
opts: LocalActivityOptions,
) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
where
AD::Output: TemporalDeserializable,
{
self.sync.start_local_activity(activity, input, opts)
}
pub fn start_child_workflow<WD: WorkflowDefinition>(
&self,
workflow: WD,
input: impl Into<WD::Input>,
opts: ChildWorkflowOptions,
) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
where
WD::Output: TemporalDeserializable,
{
self.sync.start_child_workflow(workflow, input, opts)
}
#[deprecated(note = "use `start_child_workflow` instead")]
pub fn child_workflow<WD: WorkflowDefinition>(
&self,
workflow: WD,
input: impl Into<WD::Input>,
opts: ChildWorkflowOptions,
) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
where
WD::Output: TemporalDeserializable,
{
self.start_child_workflow(workflow, input, opts)
}
pub fn patched(&self, patch_id: &str) -> bool {
self.sync.patched(patch_id)
}
pub fn deprecate_patch(&self, patch_id: &str) -> bool {
self.sync.deprecate_patch(patch_id)
}
pub fn external_workflow(
&self,
workflow_id: impl Into<String>,
run_id: Option<String>,
) -> ExternalWorkflowHandle {
self.sync.external_workflow(workflow_id, run_id)
}
pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
self.sync.upsert_search_attributes(attr_iter)
}
pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
self.sync.upsert_memo(attr_iter)
}
pub fn set_current_details(&self, details: impl Into<String>) {
self.sync.set_current_details(details)
}
pub fn force_task_fail(&self, with: impl Into<Box<dyn std::error::Error + Send + Sync>>) {
self.sync.force_task_fail(with)
}
pub fn start_nexus_operation(
&self,
opts: NexusOperationOptions,
) -> impl CancellableFuture<NexusStartResult> {
self.sync.start_nexus_operation(opts)
}
pub fn state<R>(&self, f: impl FnOnce(&W) -> R) -> R {
f(&*self.workflow_state.borrow())
}
pub fn state_mut<R>(&self, f: impl FnOnce(&mut W) -> R) -> R {
let result = f(&mut *self.workflow_state.borrow_mut());
let _guard = SdkWakeGuard::new();
for waker in self.condition_wakers.borrow_mut().drain(..) {
waker.wake();
}
self.sync.base.set_state_mutated();
result
}
pub fn continue_as_new(
&self,
input: &<W::Run as WorkflowDefinition>::Input,
opts: ContinueAsNewOptions,
) -> Result<std::convert::Infallible, WorkflowTermination>
where
W: WorkflowImplementation,
{
self.sync.continue_as_new(input, opts)
}
pub fn wait_condition<'a>(
&'a self,
mut condition: impl FnMut(&W) -> bool + 'a,
) -> impl FusedFuture<Output = ()> + 'a {
future::poll_fn(move |cx: &mut Context<'_>| {
if condition(&*self.workflow_state.borrow()) {
Poll::Ready(())
} else {
self.condition_wakers.borrow_mut().push(cx.waker().clone());
Poll::Pending
}
})
.fuse()
}
}
struct WfCtxProtectedDat {
next_timer_sequence_number: u32,
next_activity_sequence_number: u32,
next_child_workflow_sequence_number: u32,
next_cancel_external_wf_sequence_number: u32,
next_signal_external_wf_sequence_number: u32,
next_nexus_op_sequence_number: u32,
}
impl WfCtxProtectedDat {
fn next_timer_seq(&mut self) -> u32 {
let seq = self.next_timer_sequence_number;
self.next_timer_sequence_number += 1;
seq
}
fn next_activity_seq(&mut self) -> u32 {
let seq = self.next_activity_sequence_number;
self.next_activity_sequence_number += 1;
seq
}
fn next_child_workflow_seq(&mut self) -> u32 {
let seq = self.next_child_workflow_sequence_number;
self.next_child_workflow_sequence_number += 1;
seq
}
fn next_cancel_external_wf_seq(&mut self) -> u32 {
let seq = self.next_cancel_external_wf_sequence_number;
self.next_cancel_external_wf_sequence_number += 1;
seq
}
fn next_signal_external_wf_seq(&mut self) -> u32 {
let seq = self.next_signal_external_wf_sequence_number;
self.next_signal_external_wf_sequence_number += 1;
seq
}
fn next_nexus_op_seq(&mut self) -> u32 {
let seq = self.next_nexus_op_sequence_number;
self.next_nexus_op_sequence_number += 1;
seq
}
}
#[derive(Clone, Debug, Default)]
struct WorkflowContextSharedData {
changes: HashMap<String, bool>,
activation: CoreWorkflowActivation,
search_attributes: SearchAttributes,
random_seed: u64,
current_details: String,
}
pub trait CancellableFuture<T>: Future<Output = T> + FusedFuture {
fn cancel(&self);
}
pub trait CancellableFutureWithReason<T>: CancellableFuture<T> {
fn cancel_with_reason(&self, reason: String);
}
struct WFCommandFut<T, D> {
_unused: PhantomData<T>,
result_rx: oneshot::Receiver<UnblockEvent>,
other_dat: Option<D>,
}
impl<T> WFCommandFut<T, ()> {
fn new() -> (Self, oneshot::Sender<UnblockEvent>) {
Self::new_with_dat(())
}
}
impl<T, D> WFCommandFut<T, D> {
fn new_with_dat(other_dat: D) -> (Self, oneshot::Sender<UnblockEvent>) {
let (tx, rx) = oneshot::channel();
(
Self {
_unused: PhantomData,
result_rx: rx,
other_dat: Some(other_dat),
},
tx,
)
}
}
impl<T, D> Unpin for WFCommandFut<T, D> where T: Unblockable<OtherDat = D> {}
impl<T, D> Future for WFCommandFut<T, D>
where
T: Unblockable<OtherDat = D>,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.result_rx.poll_unpin(cx).map(|x| {
let od = self
.other_dat
.take()
.expect("Other data must exist when resolving command future");
Unblockable::unblock(x.unwrap(), od)
})
}
}
impl<T, D> FusedFuture for WFCommandFut<T, D>
where
T: Unblockable<OtherDat = D>,
{
fn is_terminated(&self) -> bool {
self.other_dat.is_none()
}
}
struct CancellableWFCommandFut<T, D> {
cmd_fut: WFCommandFut<T, D>,
cancellable_id: CancellableID,
base_ctx: BaseWorkflowContext,
}
impl<T> CancellableWFCommandFut<T, ()> {
fn new(
cancellable_id: CancellableID,
base_ctx: BaseWorkflowContext,
) -> (Self, oneshot::Sender<UnblockEvent>) {
Self::new_with_dat(cancellable_id, (), base_ctx)
}
}
impl<T, D> CancellableWFCommandFut<T, D> {
fn new_with_dat(
cancellable_id: CancellableID,
other_dat: D,
base_ctx: BaseWorkflowContext,
) -> (Self, oneshot::Sender<UnblockEvent>) {
let (cmd_fut, sender) = WFCommandFut::new_with_dat(other_dat);
(
Self {
cmd_fut,
cancellable_id,
base_ctx,
},
sender,
)
}
}
impl<T, D> Unpin for CancellableWFCommandFut<T, D> where T: Unblockable<OtherDat = D> {}
impl<T, D> Future for CancellableWFCommandFut<T, D>
where
T: Unblockable<OtherDat = D>,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.cmd_fut.poll_unpin(cx)
}
}
impl<T, D> FusedFuture for CancellableWFCommandFut<T, D>
where
T: Unblockable<OtherDat = D>,
{
fn is_terminated(&self) -> bool {
self.cmd_fut.is_terminated()
}
}
impl<T, D> CancellableFuture<T> for CancellableWFCommandFut<T, D>
where
T: Unblockable<OtherDat = D>,
{
fn cancel(&self) {
self.base_ctx.cancel(self.cancellable_id.clone());
}
}
impl<T, D> CancellableFutureWithReason<T> for CancellableWFCommandFut<T, D>
where
T: Unblockable<OtherDat = D>,
{
fn cancel_with_reason(&self, reason: String) {
self.base_ctx
.cancel(self.cancellable_id.clone().with_reason(reason));
}
}
struct LATimerBackoffFut {
la_opts: LocalActivityOptions,
activity_type: String,
arguments: Vec<Payload>,
current_fut: Pin<Box<dyn CancellableFuture<ActivityResolution> + Unpin>>,
timer_fut: Option<Pin<Box<dyn CancellableFuture<TimerResult> + Unpin>>>,
base_ctx: BaseWorkflowContext,
next_attempt: u32,
next_sched_time: Option<prost_types::Timestamp>,
did_cancel: AtomicBool,
terminated: bool,
}
impl LATimerBackoffFut {
fn new(
activity_type: String,
arguments: Vec<Payload>,
opts: LocalActivityOptions,
base_ctx: BaseWorkflowContext,
) -> Self {
let current_fut = Box::pin(base_ctx.clone().local_activity_no_timer_retry(
activity_type.clone(),
arguments.clone(),
opts.clone(),
));
Self {
la_opts: opts,
activity_type,
arguments,
current_fut,
timer_fut: None,
base_ctx,
next_attempt: 1,
next_sched_time: None,
did_cancel: AtomicBool::new(false),
terminated: false,
}
}
}
impl Unpin for LATimerBackoffFut {}
impl Future for LATimerBackoffFut {
type Output = ActivityResolution;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(tf) = self.timer_fut.as_mut() {
return match tf.poll_unpin(cx) {
Poll::Ready(tr) => {
self.timer_fut = None;
if let TimerResult::Fired = tr {
let mut opts = self.la_opts.clone();
opts.attempt = Some(self.next_attempt);
opts.original_schedule_time
.clone_from(&self.next_sched_time);
self.current_fut =
Box::pin(self.base_ctx.clone().local_activity_no_timer_retry(
self.activity_type.clone(),
self.arguments.clone(),
opts,
));
Poll::Pending
} else {
self.terminated = true;
Poll::Ready(ActivityResolution {
status: Some(activity_resolution::Status::Cancelled(Cancellation {
failure: Some(Failure {
message: "Activity cancelled".to_owned(),
failure_info: Some(FailureInfo::CanceledFailureInfo(
CanceledFailureInfo::default(),
)),
..Default::default()
}),
})),
})
}
}
Poll::Pending => Poll::Pending,
};
}
let poll_res = self.current_fut.poll_unpin(cx);
if let Poll::Ready(ref r) = poll_res
&& let Some(activity_resolution::Status::Backoff(b)) = r.status.as_ref()
{
if self.did_cancel.load(Ordering::Acquire) {
self.terminated = true;
return Poll::Ready(ActivityResolution {
status: Some(activity_resolution::Status::Cancelled(Cancellation {
failure: Some(Failure {
message: "Activity cancelled".to_owned(),
failure_info: Some(FailureInfo::CanceledFailureInfo(
CanceledFailureInfo::default(),
)),
..Default::default()
}),
})),
});
}
let timer_f = self.base_ctx.timer::<Duration>(
b.backoff_duration
.expect("Duration is set")
.try_into()
.expect("duration converts ok"),
);
self.timer_fut = Some(Box::pin(timer_f));
self.next_attempt = b.attempt;
self.next_sched_time.clone_from(&b.original_schedule_time);
return Poll::Pending;
}
if poll_res.is_ready() {
self.terminated = true;
}
poll_res
}
}
impl FusedFuture for LATimerBackoffFut {
fn is_terminated(&self) -> bool {
self.terminated
}
}
impl CancellableFuture<ActivityResolution> for LATimerBackoffFut {
fn cancel(&self) {
self.did_cancel.store(true, Ordering::Release);
if let Some(tf) = self.timer_fut.as_ref() {
tf.cancel();
}
self.current_fut.cancel();
}
}
enum ActivityFut<F, Output> {
Errored {
error: Option<Box<ActivityExecutionError>>,
_phantom: PhantomData<Output>,
},
Running {
inner: F,
data_converter: DataConverter,
_phantom: PhantomData<Output>,
},
Terminated,
}
impl<F, Output> ActivityFut<F, Output> {
fn eager(err: ActivityExecutionError) -> Self {
Self::Errored {
error: Some(Box::new(err)),
_phantom: PhantomData,
}
}
fn running(inner: F, data_converter: DataConverter) -> Self {
Self::Running {
inner,
data_converter,
_phantom: PhantomData,
}
}
}
impl<F, Output> Unpin for ActivityFut<F, Output> where F: Unpin {}
impl<F, Output> Future for ActivityFut<F, Output>
where
F: Future<Output = ActivityResolution> + Unpin,
Output: TemporalDeserializable + 'static,
{
type Output = Result<Output, ActivityExecutionError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let poll = match this {
ActivityFut::Errored { error, .. } => {
Poll::Ready(Err(*error.take().expect("polled after completion")))
}
ActivityFut::Running {
inner,
data_converter,
..
} => match Pin::new(inner).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(resolution) => Poll::Ready({
let status = resolution.status.ok_or_else(|| {
data_converter
.to_error(
&SerializationContextData::Workflow,
Failure {
message: "Activity completed without a status".to_string(),
..Default::default()
},
ActivityExecutionDecodeHint { cancelled: false },
)
.expect("synthetic activity failure should decode")
})?;
match status {
activity_resolution::Status::Completed(success) => {
let payload = success.result.unwrap_or_default();
let ctx = SerializationContext {
data: &SerializationContextData::Workflow,
converter: data_converter.payload_converter(),
};
data_converter
.payload_converter()
.from_payload::<Output>(&ctx, payload)
.map_err(ActivityExecutionError::Serialization)
}
activity_resolution::Status::Failed(f) => Err(data_converter.to_error(
&SerializationContextData::Workflow,
f.failure.unwrap_or_default(),
ActivityExecutionDecodeHint { cancelled: false },
)?),
activity_resolution::Status::Cancelled(c) => Err(data_converter.to_error(
&SerializationContextData::Workflow,
c.failure.unwrap_or_default(),
ActivityExecutionDecodeHint { cancelled: true },
)?),
activity_resolution::Status::Backoff(_) => {
panic!("DoBackoff should be handled by LATimerBackoffFut")
}
}
}),
},
ActivityFut::Terminated => panic!("polled after termination"),
};
if poll.is_ready() {
*this = ActivityFut::Terminated;
}
poll
}
}
impl<F, Output> FusedFuture for ActivityFut<F, Output>
where
F: Future<Output = ActivityResolution> + Unpin,
Output: TemporalDeserializable + 'static,
{
fn is_terminated(&self) -> bool {
matches!(self, ActivityFut::Terminated)
}
}
impl<F, Output> CancellableFuture<Result<Output, ActivityExecutionError>> for ActivityFut<F, Output>
where
F: CancellableFuture<ActivityResolution> + Unpin,
Output: TemporalDeserializable + 'static,
{
fn cancel(&self) {
if let ActivityFut::Running { inner, .. } = self {
inner.cancel()
}
}
}
pub(crate) struct ChildWfCommon {
workflow_id: String,
child_seq: u32,
result_future: CancellableWFCommandFut<ChildWorkflowResult, ()>,
base_ctx: BaseWorkflowContext,
data_converter: DataConverter,
}
#[derive(derive_more::Debug)]
pub(crate) struct PendingChildWorkflow<WD: WorkflowDefinition> {
pub(crate) status: ChildWorkflowStartStatus,
#[debug(skip)]
pub(crate) common: ChildWfCommon,
pub(crate) _phantom: PhantomData<WD>,
}
#[derive(derive_more::Debug)]
pub struct StartedChildWorkflow<WD: WorkflowDefinition> {
pub run_id: String,
#[debug(skip)]
common: ChildWfCommon,
_phantom: PhantomData<WD>,
}
enum ChildWorkflowFut<F, Output> {
Running {
inner: F,
data_converter: DataConverter,
_phantom: PhantomData<Output>,
},
Terminated,
}
impl<F, Output> Unpin for ChildWorkflowFut<F, Output> where F: Unpin {}
impl<F, Output> Future for ChildWorkflowFut<F, Output>
where
F: Future<Output = ChildWorkflowResult> + Unpin,
Output: TemporalDeserializable + 'static,
{
type Output = Result<Output, ChildWorkflowExecutionError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let poll = match this {
ChildWorkflowFut::Running {
inner,
data_converter,
..
} => match Pin::new(inner).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => Poll::Ready({
let status = result.status.ok_or_else(|| {
data_converter
.to_error(
&SerializationContextData::Workflow,
Failure {
message: "Child workflow completed without a status"
.to_string(),
..Default::default()
},
ChildWorkflowExecutionDecodeHint,
)
.expect("synthetic child workflow failure should decode")
})?;
match status {
child_workflow_result::Status::Completed(success) => {
let payloads = success.result.into_iter().collect();
let ctx = SerializationContext {
data: &SerializationContextData::Workflow,
converter: data_converter.payload_converter(),
};
data_converter
.payload_converter()
.from_payloads::<Output>(&ctx, payloads)
.map_err(ChildWorkflowExecutionError::Serialization)
}
child_workflow_result::Status::Failed(f) => Err(data_converter.to_error(
&SerializationContextData::Workflow,
f.failure.unwrap_or_default(),
ChildWorkflowExecutionDecodeHint,
)?),
child_workflow_result::Status::Cancelled(c) => Err(data_converter
.to_error(
&SerializationContextData::Workflow,
c.failure.unwrap_or_default(),
ChildWorkflowExecutionDecodeHint,
)?),
}
}),
},
ChildWorkflowFut::Terminated => panic!("polled after termination"),
};
if poll.is_ready() {
*this = ChildWorkflowFut::Terminated;
}
poll
}
}
impl<F, Output> FusedFuture for ChildWorkflowFut<F, Output>
where
F: Future<Output = ChildWorkflowResult> + Unpin,
Output: TemporalDeserializable + 'static,
{
fn is_terminated(&self) -> bool {
matches!(self, ChildWorkflowFut::Terminated)
}
}
impl<F, Output> CancellableFutureWithReason<Result<Output, ChildWorkflowExecutionError>>
for ChildWorkflowFut<F, Output>
where
F: CancellableFutureWithReason<ChildWorkflowResult> + Unpin,
Output: TemporalDeserializable + 'static,
{
fn cancel_with_reason(&self, reason: String) {
if let ChildWorkflowFut::Running { inner, .. } = self {
inner.cancel_with_reason(reason)
}
}
}
impl<F, Output> CancellableFuture<Result<Output, ChildWorkflowExecutionError>>
for ChildWorkflowFut<F, Output>
where
F: CancellableFutureWithReason<ChildWorkflowResult> + Unpin,
Output: TemporalDeserializable + 'static,
{
fn cancel(&self) {
if let ChildWorkflowFut::Running { inner, .. } = self {
inner.cancel()
}
}
}
enum ChildWorkflowStartFut<F, WD: WorkflowDefinition> {
Errored {
error: Option<Box<ChildWorkflowStartError>>,
_phantom: PhantomData<WD>,
},
Running(F),
Terminated,
}
impl<F, WD: WorkflowDefinition> ChildWorkflowStartFut<F, WD> {
fn eager(err: ChildWorkflowStartError) -> Self {
Self::Errored {
error: Some(Box::new(err)),
_phantom: PhantomData,
}
}
}
impl<F, WD: WorkflowDefinition> Unpin for ChildWorkflowStartFut<F, WD> where F: Unpin {}
impl<F, WD> Future for ChildWorkflowStartFut<F, WD>
where
F: Future<Output = PendingChildWorkflow<WD>> + Unpin,
WD: WorkflowDefinition,
{
type Output = Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let poll = match this {
ChildWorkflowStartFut::Errored { error, .. } => {
Poll::Ready(Err(*error.take().expect("polled after completion")))
}
ChildWorkflowStartFut::Running(inner) => match Pin::new(inner).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(pending) => Poll::Ready(match pending.status {
ChildWorkflowStartStatus::Succeeded(s) => Ok(StartedChildWorkflow {
run_id: s.run_id,
common: pending.common,
_phantom: PhantomData,
}),
ChildWorkflowStartStatus::Failed(f) => {
Err(ChildWorkflowStartError::StartFailed {
workflow_id: f.workflow_id,
workflow_type: f.workflow_type,
cause: StartChildWorkflowExecutionFailedCause::try_from(f.cause)
.unwrap_or(StartChildWorkflowExecutionFailedCause::Unspecified),
})
}
ChildWorkflowStartStatus::Cancelled(c) => {
Err(pending.common.data_converter.to_error(
&SerializationContextData::Workflow,
c.failure.unwrap_or_default(),
ChildWorkflowStartDecodeHint,
)?)
}
}),
},
ChildWorkflowStartFut::Terminated => panic!("polled after termination"),
};
if poll.is_ready() {
*this = ChildWorkflowStartFut::Terminated;
}
poll
}
}
impl<F, WD> FusedFuture for ChildWorkflowStartFut<F, WD>
where
F: Future<Output = PendingChildWorkflow<WD>> + Unpin,
WD: WorkflowDefinition,
{
fn is_terminated(&self) -> bool {
matches!(self, ChildWorkflowStartFut::Terminated)
}
}
impl<F, WD> CancellableFuture<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
for ChildWorkflowStartFut<F, WD>
where
F: CancellableFutureWithReason<PendingChildWorkflow<WD>> + Unpin,
WD: WorkflowDefinition,
{
fn cancel(&self) {
if let ChildWorkflowStartFut::Running(inner) = self {
inner.cancel()
}
}
}
impl<F, WD> CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
for ChildWorkflowStartFut<F, WD>
where
F: CancellableFutureWithReason<PendingChildWorkflow<WD>> + Unpin,
WD: WorkflowDefinition,
{
fn cancel_with_reason(&self, reason: String) {
if let ChildWorkflowStartFut::Running(inner) = self {
inner.cancel_with_reason(reason)
}
}
}
enum SignalChildFut<F> {
Errored {
error: Option<WorkflowSignalError>,
},
Running {
inner: F,
data_converter: DataConverter,
},
Terminated,
}
impl<F> SignalChildFut<F> {
fn eager(err: WorkflowSignalError) -> Self {
Self::Errored { error: Some(err) }
}
}
impl<F> Unpin for SignalChildFut<F> where F: Unpin {}
impl<F> Future for SignalChildFut<F>
where
F: Future<Output = SignalExternalWfResult> + Unpin,
{
type Output = Result<(), WorkflowSignalError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let poll = match this {
SignalChildFut::Errored { error } => {
Poll::Ready(Err(error.take().expect("polled after completion")))
}
SignalChildFut::Running {
inner,
data_converter,
} => match Pin::new(inner).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
Poll::Ready(Err(failure)) => Poll::Ready(Err(data_converter.to_error(
&SerializationContextData::Workflow,
failure,
WorkflowSignalDecodeHint,
)?)),
},
SignalChildFut::Terminated => panic!("polled after termination"),
};
if poll.is_ready() {
*this = SignalChildFut::Terminated;
}
poll
}
}
impl<F> FusedFuture for SignalChildFut<F>
where
F: Future<Output = SignalExternalWfResult> + Unpin,
{
fn is_terminated(&self) -> bool {
matches!(self, SignalChildFut::Terminated)
}
}
impl<F> CancellableFuture<Result<(), WorkflowSignalError>> for SignalChildFut<F>
where
F: CancellableFuture<SignalExternalWfResult> + Unpin,
{
fn cancel(&self) {
if let SignalChildFut::Running { inner, .. } = self {
inner.cancel()
}
}
}
impl<WD: WorkflowDefinition> StartedChildWorkflow<WD>
where
WD::Output: TemporalDeserializable + 'static,
{
pub fn result(
self,
) -> impl CancellableFutureWithReason<Result<WD::Output, ChildWorkflowExecutionError>> {
ChildWorkflowFut::Running {
inner: self.common.result_future,
data_converter: self.common.data_converter,
_phantom: PhantomData,
}
}
pub fn cancel(&self, reason: String) {
self.common.base_ctx.inner.runtime.host.push_command(
workflow_command::Variant::CancelChildWorkflowExecution(CancelChildWorkflowExecution {
child_workflow_seq: self.common.child_seq,
reason,
})
.into(),
);
}
pub fn signal<S: SignalDefinition<Workflow = WD>>(
&self,
signal: S,
input: S::Input,
) -> impl CancellableFuture<Result<(), WorkflowSignalError>> + 'static {
let payload_converter = self.common.data_converter.payload_converter();
let ctx = SerializationContext {
data: &SerializationContextData::Workflow,
converter: payload_converter,
};
let payloads = match payload_converter.to_payloads(&ctx, &input) {
Ok(p) => p,
Err(e) => {
return SignalChildFut::eager(e.into());
}
};
let signal = Signal::new(S::name(&signal), payloads);
let target = signal_external_workflow_execution::Target::ChildWorkflowId(
self.common.workflow_id.clone(),
);
SignalChildFut::Running {
inner: self.common.base_ctx.clone().send_signal_wf(target, signal),
data_converter: self.common.data_converter.clone(),
}
}
}
#[derive(derive_more::Debug)]
pub struct ExternalWorkflowHandle {
workflow_id: String,
run_id: Option<String>,
namespace: String,
#[debug(skip)]
base_ctx: BaseWorkflowContext,
}
impl ExternalWorkflowHandle {
pub fn workflow_id(&self) -> &str {
&self.workflow_id
}
pub fn run_id(&self) -> Option<&str> {
self.run_id.as_deref()
}
pub fn signal<S: SignalDefinition>(
&self,
signal: S,
input: S::Input,
) -> impl CancellableFuture<Result<(), WorkflowSignalError>> + 'static {
let payload_converter = self.base_ctx.data_converter().payload_converter();
let ctx = SerializationContext {
data: &SerializationContextData::Workflow,
converter: payload_converter,
};
let payloads = match payload_converter.to_payloads(&ctx, &input) {
Ok(p) => p,
Err(e) => {
return SignalChildFut::eager(e.into());
}
};
let signal = Signal::new(S::name(&signal), payloads);
let target = signal_external_workflow_execution::Target::WorkflowExecution(
NamespacedWorkflowExecution {
namespace: self.namespace.clone(),
workflow_id: self.workflow_id.clone(),
run_id: self.run_id.clone().unwrap_or_default(),
},
);
SignalChildFut::Running {
inner: self.base_ctx.clone().send_signal_wf(target, signal),
data_converter: self.base_ctx.data_converter().clone(),
}
}
pub fn cancel(
&self,
reason: Option<String>,
) -> impl FusedFuture<Output = CancelExternalWfResult> {
let seq = self
.base_ctx
.inner
.seq_nums
.borrow_mut()
.next_cancel_external_wf_seq();
let (cmd, unblocker) = WFCommandFut::new();
self.base_ctx
.inner
.runtime
.register_unblocker(PendingCommandId::CancelExternal(seq), unblocker);
self.base_ctx.inner.runtime.host.push_command(
workflow_command::Variant::RequestCancelExternalWorkflowExecution(
RequestCancelExternalWorkflowExecution {
seq,
workflow_execution: Some(NamespacedWorkflowExecution {
namespace: self.namespace.clone(),
workflow_id: self.workflow_id.clone(),
run_id: self.run_id.clone().unwrap_or_default(),
}),
reason: reason.unwrap_or_default(),
},
)
.into(),
);
cmd
}
}
#[derive(derive_more::Debug)]
#[debug("StartedNexusOperation{{ operation_token: {operation_token:?} }}")]
pub struct StartedNexusOperation {
pub operation_token: Option<String>,
pub(crate) unblock_dat: NexusUnblockData,
}
pub(crate) struct NexusUnblockData {
result_future: Shared<WFCommandFut<NexusOperationResult, ()>>,
schedule_seq: u32,
base_ctx: BaseWorkflowContext,
}
impl StartedNexusOperation {
pub async fn result(&self) -> NexusOperationResult {
SdkGuardedFuture(self.unblock_dat.result_future.clone()).await
}
pub fn cancel(&self) {
self.unblock_dat
.base_ctx
.cancel(CancellableID::NexusOp(self.unblock_dat.schedule_seq));
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use temporalio_common_wasm::{
data_converters::{TemporalDeserializable, TemporalSerializable},
protos::{
coresdk::{
AsJsonPayloadExt, common::VersioningIntent as ProtoVersioningIntent,
workflow_commands::WorkflowCommand,
},
temporal::api::{
common::v1::{Payload, RetryPolicy},
enums::v1::ContinueAsNewVersioningBehavior as ProtoContinueAsNewVersioningBehavior,
},
},
};
use temporalio_macros::{workflow, workflow_methods};
#[derive(Default)]
struct NoopHost;
impl WorkflowHost for NoopHost {
fn set_current_details(&self, _details: String) {}
fn push_command(&self, _command: WorkflowCommand) {}
}
#[workflow]
#[derive(Default)]
struct TestWorkflow;
#[workflow_methods]
impl TestWorkflow {
#[run]
async fn run(_ctx: &mut WorkflowContext<Self>, _input: u8) -> crate::WorkflowResult<()> {
unreachable!("test workflow run should not be polled")
}
}
fn test_context() -> WorkflowContext<TestWorkflow> {
let init = InitializeWorkflow {
workflow_type: TestWorkflow.name().to_string(),
..Default::default()
};
let base = BaseWorkflowContext::new(
"default".to_string(),
"orig-task-queue".to_string(),
"run-id".to_string(),
init,
DataConverter::default(),
Rc::new(NoopHost),
);
WorkflowContext::from_base(base, Rc::new(RefCell::new(TestWorkflow)))
}
#[test]
fn workflow_context_continue_as_new_serializes_input_and_defaults() {
let ctx = test_context();
let termination = ctx
.continue_as_new(&7, ContinueAsNewOptions::default())
.expect_err("continue_as_new should terminate the workflow");
assert!(
matches!(termination, WorkflowTermination::ContinueAsNew(_)),
"expected continue-as-new termination, got {termination:?}"
);
let WorkflowTermination::ContinueAsNew(cmd) = termination else {
unreachable!()
};
assert_eq!(
*cmd,
crate::runtime::types::ContinueAsNewRequest {
workflow_type: TestWorkflow.name().to_string(),
task_queue: String::new(),
arguments: vec![7u8.as_json_payload().unwrap()],
workflow_run_timeout: None,
workflow_task_timeout: None,
backoff_start_interval: None,
memo: HashMap::new(),
headers: HashMap::new(),
search_attributes: None,
retry_policy: None,
versioning_intent: ProtoVersioningIntent::Unspecified.into(),
initial_versioning_behavior: ProtoContinueAsNewVersioningBehavior::Unspecified
.into(),
}
);
}
#[test]
fn sync_workflow_context_continue_as_new_applies_options() {
let ctx = test_context();
let sync = ctx.sync_context();
let mut memo = HashMap::new();
memo.insert(
"memo-key".to_string(),
Payload::from(b"memo-value".as_slice()),
);
let mut headers = HashMap::new();
headers.insert(
"header-key".to_string(),
Payload::from(b"header-value".as_slice()),
);
let mut search_attributes = SearchAttributes::default();
search_attributes.indexed_fields.insert(
"CustomKeywordField".to_string(),
Payload::from(b"value".as_slice()),
);
let termination = sync
.continue_as_new(
&11,
ContinueAsNewOptions {
workflow_type: Some("next-workflow".to_string()),
task_queue: Some("next-task-queue".to_string()),
run_timeout: Some(Duration::from_secs(10)),
task_timeout: Some(Duration::from_secs(3)),
backoff_start_interval: Some(Duration::from_secs(4)),
memo: Some(memo.clone()),
headers: Some(headers.clone()),
search_attributes: Some(search_attributes.clone()),
retry_policy: Some(RetryPolicy {
maximum_attempts: 5,
..Default::default()
}),
versioning_intent: Some(ProtoVersioningIntent::Compatible),
initial_versioning_behavior: Some(
ContinueAsNewVersioningBehavior::UseRampingVersion,
),
},
)
.expect_err("continue_as_new should terminate the workflow");
assert!(
matches!(termination, WorkflowTermination::ContinueAsNew(_)),
"expected continue-as-new termination, got {termination:?}"
);
let WorkflowTermination::ContinueAsNew(cmd) = termination else {
unreachable!()
};
assert_eq!(
*cmd,
crate::runtime::types::ContinueAsNewRequest {
workflow_type: "next-workflow".to_string(),
task_queue: "next-task-queue".to_string(),
arguments: vec![11u8.as_json_payload().unwrap()],
workflow_run_timeout: Some(Duration::from_secs(10).try_into().unwrap()),
workflow_task_timeout: Some(Duration::from_secs(3).try_into().unwrap()),
backoff_start_interval: Some(Duration::from_secs(4).try_into().unwrap()),
memo,
headers,
search_attributes: Some(search_attributes),
retry_policy: Some(RetryPolicy {
maximum_attempts: 5,
..Default::default()
}),
versioning_intent: ProtoVersioningIntent::Compatible.into(),
initial_versioning_behavior: ProtoContinueAsNewVersioningBehavior::UseRampingVersion
as i32,
}
);
}
#[test]
fn continue_as_new_preserves_explicit_empty_search_attributes() {
let ctx = test_context();
let sync = ctx.sync_context();
let termination = sync
.continue_as_new(
&11,
ContinueAsNewOptions {
search_attributes: Some(SearchAttributes::default()),
..Default::default()
},
)
.expect_err("continue_as_new should terminate the workflow");
let WorkflowTermination::ContinueAsNew(cmd) = termination else {
unreachable!()
};
assert_eq!(cmd.search_attributes, Some(SearchAttributes::default()));
}
#[test]
fn workflow_context_continue_as_new_applies_auto_upgrade_versioning_behavior() {
let ctx = test_context();
let termination = ctx
.continue_as_new(
&13,
ContinueAsNewOptions {
initial_versioning_behavior: Some(ContinueAsNewVersioningBehavior::AutoUpgrade),
..Default::default()
},
)
.expect_err("continue_as_new should terminate the workflow");
let WorkflowTermination::ContinueAsNew(cmd) = termination else {
unreachable!()
};
assert_eq!(
cmd.initial_versioning_behavior,
ProtoContinueAsNewVersioningBehavior::AutoUpgrade as i32
);
}
#[test]
fn continue_as_new_reports_serialization_errors() {
#[derive(Debug)]
struct FailingInput;
impl TemporalSerializable for FailingInput {
fn to_payload(
&self,
_ctx: &temporalio_common_wasm::data_converters::SerializationContext<'_>,
) -> Result<Payload, temporalio_common_wasm::data_converters::PayloadConversionError>
{
Err(
temporalio_common_wasm::data_converters::PayloadConversionError::EncodingError(
std::io::Error::other("serialization failure").into(),
),
)
}
}
impl TemporalDeserializable for FailingInput {
fn from_payload(
_ctx: &temporalio_common_wasm::data_converters::SerializationContext<'_>,
_payload: Payload,
) -> Result<Self, temporalio_common_wasm::data_converters::PayloadConversionError>
{
unreachable!("test input is only serialized")
}
}
#[workflow]
#[derive(Default)]
struct FailingWorkflow;
#[workflow_methods]
impl FailingWorkflow {
#[run]
async fn run(
_ctx: &mut WorkflowContext<Self>,
_input: FailingInput,
) -> crate::WorkflowResult<()> {
unreachable!("test workflow run should not be polled")
}
}
let init = InitializeWorkflow {
workflow_type: "failing-workflow".to_string(),
..Default::default()
};
let base = BaseWorkflowContext::new(
"default".to_string(),
"orig-task-queue".to_string(),
"run-id".to_string(),
init,
DataConverter::default(),
Rc::new(NoopHost),
);
let ctx = WorkflowContext::from_base(base, Rc::new(RefCell::new(FailingWorkflow)));
let err = ctx
.continue_as_new(&FailingInput, ContinueAsNewOptions::default())
.expect_err("serialization errors should be surfaced");
let WorkflowTermination::Failed(err) = err else {
panic!("expected failed termination, got {err:?}");
};
assert_eq!(err.to_string(), "Encoding error: serialization failure");
}
}