mod options;
pub use options::{
ActivityCloseTimeouts, ActivityOptions, ChildWorkflowOptions, ContinueAsNewOptions,
LocalActivityOptions, NexusOperationOptions, Signal, SignalData, TimerOptions,
};
pub use temporalio_common::protos::coresdk::child_workflow::StartChildWorkflowExecutionFailedCause;
use crate::{
CancelExternalWfResult, CancellableID, CancellableIDWithReason, CommandCreateRequest,
CommandSubscribeChildWorkflowCompletion, NexusStartResult, RustWfCmd, SignalExternalWfResult,
SupportsCancelReason, TimerResult, UnblockEvent, Unblockable, WorkflowTermination,
workflow_context::options::IntoWorkflowCommand, workflow_executor::SdkWakeGuard,
};
use futures_util::{
FutureExt,
future::{FusedFuture, Shared},
task::Context,
};
use std::{
cell::{Cell, Ref, RefCell},
collections::HashMap,
future::{self, Future},
marker::PhantomData,
ops::{Deref, DerefMut},
pin::Pin,
rc::Rc,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{Receiver, Sender},
},
task::{Poll, Waker},
time::{Duration, SystemTime},
};
use temporalio_common::{
ActivityDefinition, SignalDefinition, WorkflowDefinition,
data_converters::{
ActivityExecutionDecodeHint, ChildWorkflowExecutionDecodeHint,
ChildWorkflowSignalDecodeHint, ChildWorkflowStartDecodeHint, DataConverter,
GenericPayloadConverter, PayloadConversionError, PayloadConverter, SerializationContext,
SerializationContextData, TemporalDeserializable,
},
error::{
ActivityExecutionError, ChildWorkflowExecutionError, ChildWorkflowSignalError,
ChildWorkflowStartError,
},
protos::{
coresdk::{
activity_result::{ActivityResolution, Cancellation, activity_resolution},
child_workflow::ChildWorkflowResult,
common::NamespacedWorkflowExecution,
nexus::NexusOperationResult,
workflow_activation::{
InitializeWorkflow,
resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
},
workflow_commands::{
CancelChildWorkflowExecution, ModifyWorkflowProperties,
RequestCancelExternalWorkflowExecution, SetPatchMarker,
SignalExternalWorkflowExecution, StartTimer, UpsertWorkflowSearchAttributes,
WorkflowCommand, signal_external_workflow_execution as sig_we, workflow_command,
},
},
temporal::api::{
common::v1::{Memo, Payload, SearchAttributes},
failure::v1::{CanceledFailureInfo, Failure, failure::FailureInfo},
sdk::v1::UserMetadata,
},
},
worker::WorkerDeploymentVersion,
};
use tokio::sync::{oneshot, watch};
/// Cheaply-cloneable handle (an `Rc`) to the state shared by every context
/// flavor for a single workflow run: the command channel, cancellation
/// signal, sequence counters, and data converter.
#[derive(Clone)]
pub struct BaseWorkflowContext {
    inner: Rc<WorkflowContextInner>,
}
impl BaseWorkflowContext {
    /// Mutably borrows the per-run shared data (replay flag, search
    /// attributes, etc.). Panics if the `RefCell` is already borrowed.
    pub(crate) fn shared_mut(&self) -> impl DerefMut<Target = WorkflowContextSharedData> {
        self.inner.shared.borrow_mut()
    }
    /// Builds a read-only metadata snapshot from the stored initialization
    /// job plus the worker-level identifiers.
    pub(crate) fn view(&self) -> WorkflowContextView {
        WorkflowContextView::new(
            self.inner.namespace.clone(),
            self.inner.task_queue.clone(),
            self.inner.run_id.clone(),
            &self.inner.inital_information,
        )
    }
}
/// State shared (via `Rc`) among all clones of a workflow run's contexts.
struct WorkflowContextInner {
    namespace: String,
    task_queue: String,
    run_id: String,
    // NOTE(review): field name has a typo ("inital" -> "initial"); renaming
    // would touch every accessor in this file, so it is left as-is here.
    inital_information: InitializeWorkflow,
    /// Commands issued by workflow code; drained by the workflow executor.
    chan: Sender<RustWfCmd>,
    /// Becomes `Some(reason)` once cancellation of this workflow is requested.
    am_cancelled: watch::Receiver<Option<String>>,
    /// Mutable per-run data updated by the executor between activations.
    shared: RefCell<WorkflowContextSharedData>,
    /// Monotonic sequence counters, one per command kind.
    seq_nums: RefCell<WfCtxProtectedDat>,
    data_converter: DataConverter,
    /// Set by `state_mut` so the executor knows workflow state may have changed.
    state_mutated: Cell<bool>,
}
/// Workflow context without direct access to the workflow state cell;
/// carries per-invocation headers. Presumably the flavor handed to handler
/// invocations (see `WorkflowContext::with_headers`) — confirm with callers.
pub struct SyncWorkflowContext<W> {
    base: BaseWorkflowContext,
    /// Headers attached to the current invocation (empty by default).
    headers: Rc<HashMap<String, Payload>>,
    _phantom: PhantomData<W>,
}
impl<W> Clone for SyncWorkflowContext<W> {
fn clone(&self) -> Self {
Self {
base: self.base.clone(),
headers: self.headers.clone(),
_phantom: PhantomData,
}
}
}
/// Primary context handed to workflow code; adds access to the user-defined
/// workflow state `W` and `wait_condition` support on top of the
/// capabilities of [`SyncWorkflowContext`].
pub struct WorkflowContext<W> {
    sync: SyncWorkflowContext<W>,
    /// The user's workflow state, shared across clones of this context.
    workflow_state: Rc<RefCell<W>>,
    /// Wakers registered by pending `wait_condition` futures; drained and
    /// woken whenever `state_mut` runs.
    condition_wakers: Rc<RefCell<Vec<Waker>>>,
}
impl<W> Clone for WorkflowContext<W> {
fn clone(&self) -> Self {
Self {
sync: self.sync.clone(),
workflow_state: self.workflow_state.clone(),
condition_wakers: self.condition_wakers.clone(),
}
}
}
/// Read-only snapshot of a workflow execution's metadata, captured from the
/// initialization job and worker configuration.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct WorkflowContextView {
    pub workflow_id: String,
    pub run_id: String,
    pub workflow_type: String,
    pub task_queue: String,
    pub namespace: String,
    /// Attempt number of this execution (taken from the init job).
    pub attempt: u32,
    /// Run id of the first run in this execution chain.
    pub first_execution_run_id: String,
    /// Previous run id when this run was continued-as-new; `None` otherwise.
    pub continued_from_run_id: Option<String>,
    pub start_time: Option<SystemTime>,
    /// Overall workflow-execution timeout, if one was configured.
    pub execution_timeout: Option<Duration>,
    pub run_timeout: Option<Duration>,
    pub task_timeout: Option<Duration>,
    /// Present when this workflow was started as a child of another workflow.
    pub parent: Option<ParentWorkflowInfo>,
    pub root: Option<RootWorkflowInfo>,
    pub retry_policy: Option<temporalio_common::protos::temporal::api::common::v1::RetryPolicy>,
    /// Cron expression, when the workflow runs on a cron schedule.
    pub cron_schedule: Option<String>,
    pub memo: Option<Memo>,
    pub search_attributes: Option<SearchAttributes>,
}
/// Identity of the parent workflow, when this workflow is a child.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ParentWorkflowInfo {
    pub workflow_id: String,
    pub run_id: String,
    pub namespace: String,
}
/// Identity of the root workflow of this execution's parent/child chain.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct RootWorkflowInfo {
    pub workflow_id: String,
    pub run_id: String,
}
impl WorkflowContextView {
    /// Builds an immutable metadata snapshot from the initialization job plus
    /// the worker-level identifiers.
    pub(crate) fn new(
        namespace: String,
        task_queue: String,
        run_id: String,
        init: &InitializeWorkflow,
    ) -> Self {
        // Proto strings use "" to mean "not set"; surface that as None.
        let non_empty = |s: &String| (!s.is_empty()).then(|| s.clone());
        Self {
            workflow_id: init.workflow_id.clone(),
            run_id,
            workflow_type: init.workflow_type.clone(),
            task_queue,
            namespace,
            attempt: init.attempt as u32,
            first_execution_run_id: init.first_execution_run_id.clone(),
            continued_from_run_id: non_empty(&init.continued_from_execution_run_id),
            // Proto timestamps/durations that don't convert cleanly are
            // dropped rather than surfaced as errors.
            start_time: init.start_time.and_then(|t| t.try_into().ok()),
            execution_timeout: init
                .workflow_execution_timeout
                .and_then(|d| d.try_into().ok()),
            run_timeout: init.workflow_run_timeout.and_then(|d| d.try_into().ok()),
            task_timeout: init.workflow_task_timeout.and_then(|d| d.try_into().ok()),
            parent: init
                .parent_workflow_info
                .as_ref()
                .map(|p| ParentWorkflowInfo {
                    workflow_id: p.workflow_id.clone(),
                    run_id: p.run_id.clone(),
                    namespace: p.namespace.clone(),
                }),
            root: init.root_workflow.as_ref().map(|r| RootWorkflowInfo {
                workflow_id: r.workflow_id.clone(),
                run_id: r.run_id.clone(),
            }),
            retry_policy: init.retry_policy.clone(),
            cron_schedule: non_empty(&init.cron_schedule),
            memo: init.memo.clone(),
            search_attributes: init.search_attributes.clone(),
        }
    }
}
impl BaseWorkflowContext {
    /// Creates the base context for one workflow run, along with the
    /// receiving end of the command channel that the workflow executor
    /// drains to collect commands issued by workflow code.
    pub(crate) fn new(
        namespace: String,
        task_queue: String,
        run_id: String,
        init_workflow_job: InitializeWorkflow,
        am_cancelled: watch::Receiver<Option<String>>,
        data_converter: DataConverter,
    ) -> (Self, Receiver<RustWfCmd>) {
        let (chan, rx) = std::sync::mpsc::channel();
        (
            Self {
                inner: Rc::new(WorkflowContextInner {
                    namespace,
                    task_queue,
                    run_id,
                    shared: RefCell::new(WorkflowContextSharedData {
                        // Seed from the init job; the executor keeps the rest
                        // of the shared data current as activations arrive.
                        random_seed: init_workflow_job.randomness_seed,
                        search_attributes: init_workflow_job
                            .search_attributes
                            .clone()
                            .unwrap_or_default(),
                        ..Default::default()
                    }),
                    inital_information: init_workflow_job,
                    chan,
                    am_cancelled,
                    // Every command-kind sequence counter starts at 1.
                    seq_nums: RefCell::new(WfCtxProtectedDat {
                        next_timer_sequence_number: 1,
                        next_activity_sequence_number: 1,
                        next_child_workflow_sequence_number: 1,
                        next_cancel_external_wf_sequence_number: 1,
                        next_signal_external_wf_sequence_number: 1,
                        next_nexus_op_sequence_number: 1,
                    }),
                    data_converter,
                    state_mutated: Cell::new(false),
                }),
            },
            rx,
        )
    }
    /// Sends a command to the workflow executor.
    ///
    /// Panics if the executor has dropped the receiving half, which would
    /// mean workflow code outlived its run — an SDK invariant violation.
    pub(crate) fn send(&self, c: RustWfCmd) {
        self.inner.chan.send(c).expect("command channel intact");
    }
    /// Returns whether workflow state was mutated since the last call and
    /// resets the flag.
    pub(crate) fn take_state_mutated(&self) -> bool {
        self.inner.state_mutated.replace(false)
    }
    /// Flags that workflow state was mutated (consumed by
    /// [`Self::take_state_mutated`]).
    pub(crate) fn set_state_mutated(&self) {
        self.inner.state_mutated.set(true);
    }
    /// The current user-set details string for this workflow.
    pub(crate) fn current_details(&self) -> String {
        self.inner.shared.borrow().current_details.clone()
    }
    /// Issues a cancel command for the given cancellable resource.
    fn cancel(&self, cancellable_id: CancellableID) {
        self.send(RustWfCmd::Cancel(cancellable_id));
    }
    /// Starts a timer; the returned future resolves when the timer fires or
    /// is cancelled.
    pub fn timer<T: Into<TimerOptions>>(
        &self,
        opts: T,
    ) -> impl CancellableFuture<TimerResult> + use<T> {
        let opts: TimerOptions = opts.into();
        let seq = self.inner.seq_nums.borrow_mut().next_timer_seq();
        let (cmd, unblocker) =
            CancellableWFCommandFut::new(CancellableID::Timer(seq), self.clone());
        // NOTE(review): the summary metadata is encoded with the *default*
        // payload converter rather than this worker's configured one —
        // presumably deliberate since metadata is not user data; confirm.
        let payload_converter = PayloadConverter::default();
        let context = SerializationContext {
            data: &SerializationContextData::Workflow,
            converter: &payload_converter,
        };
        self.send(
            CommandCreateRequest {
                cmd: WorkflowCommand {
                    variant: Some(
                        StartTimer {
                            seq,
                            start_to_fire_timeout: Some(
                                opts.duration
                                    .try_into()
                                    .expect("Durations must fit into 64 bits"),
                            ),
                        }
                        .into(),
                    ),
                    user_metadata: Some(UserMetadata {
                        summary: opts.summary.map(|summary| {
                            payload_converter
                                .to_payload(&context, &summary)
                                .expect("String-to-JSON payload serialization is infallible")
                        }),
                        details: None,
                    }),
                },
                unblocker,
            }
            .into(),
        );
        cmd
    }
    /// Schedules an activity, serializing `input` with the configured
    /// converter. A serialization failure resolves the returned future
    /// immediately with an error instead of scheduling anything.
    pub fn start_activity<AD: ActivityDefinition>(
        &self,
        _activity: AD,
        input: impl Into<AD::Input>,
        mut opts: ActivityOptions,
    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
    where
        AD::Output: TemporalDeserializable,
    {
        let input = input.into();
        let payload_converter = self.inner.data_converter.payload_converter();
        let ctx = SerializationContext {
            data: &SerializationContextData::Workflow,
            converter: payload_converter,
        };
        let payloads = match payload_converter.to_payloads(&ctx, &input) {
            Ok(p) => p,
            Err(e) => {
                return ActivityFut::eager(e.into());
            }
        };
        let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
        let (cmd, unblocker) =
            CancellableWFCommandFut::new(CancellableID::Activity(seq), self.clone());
        // Default to this worker's task queue when none was specified.
        if opts.task_queue.is_none() {
            opts.task_queue = Some(self.inner.task_queue.clone());
        }
        self.send(
            CommandCreateRequest {
                cmd: opts.into_command(AD::name().to_string(), payloads, seq),
                unblocker,
            }
            .into(),
        );
        ActivityFut::running(cmd, self.inner.data_converter.clone())
    }
    /// Schedules a local activity. Long retry backoffs are bridged across
    /// workflow tasks with timers by `LATimerBackoffFut`.
    pub fn start_local_activity<AD: ActivityDefinition>(
        &self,
        _activity: AD,
        input: impl Into<AD::Input>,
        opts: LocalActivityOptions,
    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
    where
        AD::Output: TemporalDeserializable,
    {
        let input = input.into();
        let payload_converter = self.inner.data_converter.payload_converter();
        let ctx = SerializationContext {
            data: &SerializationContextData::Workflow,
            converter: payload_converter,
        };
        let payloads = match payload_converter.to_payloads(&ctx, &input) {
            Ok(p) => p,
            Err(e) => {
                return ActivityFut::eager(e.into());
            }
        };
        ActivityFut::running(
            LATimerBackoffFut::new(AD::name().to_string(), payloads, opts, self.clone()),
            self.inner.data_converter.clone(),
        )
    }
    /// Starts a child workflow. The returned future resolves once the start
    /// is accepted or rejected; the child's completion is observed through
    /// the `StartedChildWorkflow` it yields.
    fn child_workflow<WD: WorkflowDefinition>(
        &self,
        workflow: WD,
        input: impl Into<WD::Input>,
        opts: ChildWorkflowOptions,
    ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
    where
        WD::Output: TemporalDeserializable,
    {
        let input = input.into();
        let payload_converter = self.inner.data_converter.payload_converter();
        let ctx = SerializationContext {
            data: &SerializationContextData::Workflow,
            converter: payload_converter,
        };
        let payloads = match payload_converter.to_payloads(&ctx, &input) {
            Ok(p) => p,
            Err(e) => {
                return ChildWorkflowStartFut::eager(e.into());
            }
        };
        let workflow_type = workflow.name().to_string();
        let child_seq = self.inner.seq_nums.borrow_mut().next_child_workflow_seq();
        // Subscribe to the child's *completion* before issuing the start
        // command so the result future exists even if completion races in.
        let (result_cmd, unblocker) = CancellableWFCommandFut::new(
            CancellableIDWithReason::ChildWorkflow { seqnum: child_seq },
            self.clone(),
        );
        self.send(
            CommandSubscribeChildWorkflowCompletion {
                seq: child_seq,
                unblocker,
            }
            .into(),
        );
        let common = ChildWfCommon {
            workflow_id: opts.workflow_id.clone(),
            child_seq,
            result_future: result_cmd,
            base_ctx: self.clone(),
            data_converter: self.inner.data_converter.clone(),
        };
        let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
            CancellableIDWithReason::ChildWorkflow { seqnum: child_seq },
            common,
            self.clone(),
        );
        self.send(
            CommandCreateRequest {
                cmd: opts.into_command(workflow_type, payloads, child_seq),
                unblocker,
            }
            .into(),
        );
        ChildWorkflowStartFut::Running(cmd)
    }
    /// Schedules a single local-activity attempt with no SDK-side timer
    /// backoff; cross-task backoff is driven by `LATimerBackoffFut`.
    fn local_activity_no_timer_retry(
        self,
        activity_type: String,
        arguments: Vec<Payload>,
        opts: LocalActivityOptions,
    ) -> impl CancellableFuture<ActivityResolution> {
        let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
        let (cmd, unblocker) =
            CancellableWFCommandFut::new(CancellableID::LocalActivity(seq), self.clone());
        // Use the common `send` helper (previously a direct channel send with
        // a duplicated expect) for consistency with every other command path.
        self.send(
            CommandCreateRequest {
                cmd: opts.into_command(activity_type, arguments, seq),
                unblocker,
            }
            .into(),
        );
        cmd
    }
    /// Signals a workflow identified by `target` (external or child).
    fn send_signal_wf(
        self,
        target: sig_we::Target,
        signal: Signal,
    ) -> impl CancellableFuture<SignalExternalWfResult> {
        let seq = self
            .inner
            .seq_nums
            .borrow_mut()
            .next_signal_external_wf_seq();
        let (cmd, unblocker) =
            CancellableWFCommandFut::new(CancellableID::SignalExternalWorkflow(seq), self.clone());
        self.send(
            CommandCreateRequest {
                cmd: WorkflowCommand {
                    variant: Some(
                        SignalExternalWorkflowExecution {
                            seq,
                            signal_name: signal.signal_name,
                            args: signal.data.input,
                            target: Some(target),
                            headers: signal.data.headers,
                        }
                        .into(),
                    ),
                    user_metadata: None,
                },
                unblocker,
            }
            .into(),
        );
        cmd
    }
}
impl<W> SyncWorkflowContext<W> {
    /// The workflow id from the initialization job.
    pub fn workflow_id(&self) -> &str {
        &self.base.inner.inital_information.workflow_id
    }
    /// The run id of this workflow execution.
    pub fn run_id(&self) -> &str {
        &self.base.inner.run_id
    }
    /// The namespace this workflow runs in.
    pub fn namespace(&self) -> &str {
        &self.base.inner.namespace
    }
    /// The task queue this worker polls.
    pub fn task_queue(&self) -> &str {
        &self.base.inner.task_queue
    }
    /// Deterministic "current time" as of the latest workflow task.
    pub fn workflow_time(&self) -> Option<SystemTime> {
        self.base.inner.shared.borrow().wf_time
    }
    /// Current length of the workflow's event history.
    pub fn history_length(&self) -> u32 {
        self.base.inner.shared.borrow().history_length
    }
    /// Deployment version of the worker that processed the latest task, if any.
    pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
        self.base
            .inner
            .shared
            .borrow()
            .current_deployment_version
            .clone()
    }
    /// Borrows the current search attributes. The returned guard must be
    /// dropped before anything mutably borrows the shared context data.
    pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
        Ref::map(self.base.inner.shared.borrow(), |s| &s.search_attributes)
    }
    /// The deterministic random seed for the current task.
    pub fn random_seed(&self) -> u64 {
        self.base.inner.shared.borrow().random_seed
    }
    /// True while replaying already-recorded history.
    pub fn is_replaying(&self) -> bool {
        self.base.inner.shared.borrow().is_replaying
    }
    /// True when the server suggests continuing-as-new.
    pub fn continue_as_new_suggested(&self) -> bool {
        self.base.inner.shared.borrow().continue_as_new_suggested
    }
    /// Headers attached to the current invocation.
    pub fn headers(&self) -> &HashMap<String, Payload> {
        &self.headers
    }
    /// The payload converter configured for this worker.
    pub fn payload_converter(&self) -> &PayloadConverter {
        self.base.inner.data_converter.payload_converter()
    }
    /// The raw workflow initialization job.
    pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
        &self.base.inner.inital_information
    }
    /// Resolves with the cancellation reason once this workflow is requested
    /// to cancel; also fires immediately if cancellation already happened.
    pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
        let am_cancelled = self.base.inner.am_cancelled.clone();
        async move {
            if let Some(s) = am_cancelled.borrow().as_ref() {
                return s.clone();
            }
            // Clone again so `changed()` waits for the *next* update relative
            // to the check just above.
            am_cancelled
                .clone()
                .changed()
                .await
                .expect("Cancelled send half not dropped");
            am_cancelled.borrow().as_ref().cloned().unwrap_or_default()
        }
        .fuse()
    }
    /// Terminates this run and continues the workflow as a new run with the
    /// given input; always returns `Err` (the termination) to be propagated.
    ///
    /// # Errors
    /// Returns a serialization-based termination if `input` cannot be
    /// converted to payloads.
    pub fn continue_as_new(
        &self,
        input: &<W::Run as WorkflowDefinition>::Input,
        opts: ContinueAsNewOptions,
    ) -> Result<std::convert::Infallible, WorkflowTermination>
    where
        W: crate::workflows::WorkflowImplementation,
    {
        let pc = self.base.inner.data_converter.payload_converter();
        let ctx = SerializationContext {
            data: &SerializationContextData::Workflow,
            converter: pc,
        };
        let arguments = pc
            .to_payloads(&ctx, input)
            .map_err(WorkflowTermination::from)?;
        let workflow_type = self.workflow_initial_info().workflow_type.clone();
        let proto = opts.into_proto(workflow_type, arguments);
        Err(WorkflowTermination::continue_as_new(proto))
    }
    /// Starts a timer; see [`BaseWorkflowContext::timer`].
    pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
        self.base.timer(opts)
    }
    /// Schedules an activity; see [`BaseWorkflowContext::start_activity`].
    pub fn start_activity<AD: ActivityDefinition>(
        &self,
        activity: AD,
        input: impl Into<AD::Input>,
        opts: ActivityOptions,
    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
    where
        AD::Output: TemporalDeserializable,
    {
        self.base.start_activity(activity, input, opts)
    }
    /// Schedules a local activity; see
    /// [`BaseWorkflowContext::start_local_activity`].
    pub fn start_local_activity<AD: ActivityDefinition>(
        &self,
        activity: AD,
        input: impl Into<AD::Input>,
        opts: LocalActivityOptions,
    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
    where
        AD::Output: TemporalDeserializable,
    {
        self.base.start_local_activity(activity, input, opts)
    }
    /// Starts a child workflow; see [`BaseWorkflowContext::child_workflow`].
    pub fn child_workflow<WD: WorkflowDefinition>(
        &self,
        workflow: WD,
        input: impl Into<WD::Input>,
        opts: ChildWorkflowOptions,
    ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
    where
        WD::Output: TemporalDeserializable,
    {
        self.base.child_workflow(workflow, input, opts)
    }
    /// Records a patch marker and returns whether the patched branch should
    /// be taken.
    pub fn patched(&self, patch_id: &str) -> bool {
        self.patch_impl(patch_id, false)
    }
    /// Like [`Self::patched`], but marks the patch as deprecated.
    pub fn deprecate_patch(&self, patch_id: &str) -> bool {
        self.patch_impl(patch_id, true)
    }
    /// Shared implementation for `patched`/`deprecate_patch`: emits the
    /// marker command and memoizes the per-patch-id decision.
    fn patch_impl(&self, patch_id: &str, deprecated: bool) -> bool {
        self.base.send(
            workflow_command::Variant::SetPatchMarker(SetPatchMarker {
                patch_id: patch_id.to_string(),
                deprecated,
            })
            .into(),
        );
        // Memoize per patch id so repeated calls in one run agree. On first
        // sight the patch is considered present unless we're replaying
        // history recorded without it. Single mutable borrow via the entry
        // API (the original did a read borrow + lookup, then a second write
        // borrow + a second lookup).
        let mut shared = self.base.shared_mut();
        let not_replaying = !shared.is_replaying;
        *shared
            .changes
            .entry(patch_id.to_string())
            .or_insert(not_replaying)
    }
    /// Handle for signalling/cancelling a workflow this worker didn't start.
    pub fn external_workflow(
        &self,
        workflow_id: impl Into<String>,
        run_id: Option<String>,
    ) -> ExternalWorkflowHandle {
        ExternalWorkflowHandle {
            workflow_id: workflow_id.into(),
            run_id,
            namespace: self.base.inner.namespace.clone(),
            base_ctx: self.base.clone(),
        }
    }
    /// Upserts search attributes for this workflow (non-blocking command).
    pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
        self.base.send(RustWfCmd::NewNonblockingCmd(
            workflow_command::Variant::UpsertWorkflowSearchAttributes(
                UpsertWorkflowSearchAttributes {
                    search_attributes: Some(SearchAttributes {
                        indexed_fields: HashMap::from_iter(attr_iter),
                    }),
                },
            ),
        ))
    }
    /// Upserts memo fields for this workflow (non-blocking command).
    pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
        self.base.send(RustWfCmd::NewNonblockingCmd(
            workflow_command::Variant::ModifyWorkflowProperties(ModifyWorkflowProperties {
                upserted_memo: Some(Memo {
                    fields: HashMap::from_iter(attr_iter),
                }),
            }),
        ))
    }
    /// Sets the user-visible "current details" string for this workflow.
    pub fn set_current_details(&self, details: impl Into<String>) {
        self.base.inner.shared.borrow_mut().current_details = details.into();
    }
    /// Forces the current workflow task to fail with the given error.
    pub fn force_task_fail(&self, with: anyhow::Error) {
        self.base.send(with.into());
    }
    /// Starts a Nexus operation; the returned future resolves once the start
    /// is accepted or rejected.
    pub fn start_nexus_operation(
        &self,
        opts: NexusOperationOptions,
    ) -> impl CancellableFuture<NexusStartResult> {
        let seq = self.base.inner.seq_nums.borrow_mut().next_nexus_op_seq();
        let (result_future, unblocker) = WFCommandFut::new();
        // Subscribe to operation completion before issuing the start command.
        self.base
            .send(RustWfCmd::SubscribeNexusOperationCompletion { seq, unblocker });
        let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
            CancellableID::NexusOp(seq),
            NexusUnblockData {
                result_future: result_future.shared(),
                schedule_seq: seq,
                base_ctx: self.base.clone(),
            },
            self.base.clone(),
        );
        self.base.send(
            CommandCreateRequest {
                cmd: opts.into_command(seq),
                unblocker,
            }
            .into(),
        );
        cmd
    }
    /// Read-only metadata snapshot; see [`BaseWorkflowContext::view`].
    pub(crate) fn view(&self) -> WorkflowContextView {
        self.base.view()
    }
}
impl<W> WorkflowContext<W> {
    /// Builds the primary context around `base` and the shared workflow
    /// state cell; starts with empty headers and no condition wakers.
    pub(crate) fn from_base(base: BaseWorkflowContext, workflow_state: Rc<RefCell<W>>) -> Self {
        Self {
            sync: SyncWorkflowContext {
                base,
                headers: Rc::new(HashMap::new()),
                _phantom: PhantomData,
            },
            workflow_state,
            condition_wakers: Rc::new(RefCell::new(Vec::new())),
        }
    }
    /// Clone of this context carrying the given invocation headers.
    pub(crate) fn with_headers(&self, headers: HashMap<String, Payload>) -> Self {
        Self {
            sync: SyncWorkflowContext {
                base: self.sync.base.clone(),
                headers: Rc::new(headers),
                _phantom: PhantomData,
            },
            workflow_state: self.workflow_state.clone(),
            condition_wakers: self.condition_wakers.clone(),
        }
    }
    /// The state-less flavor of this context.
    pub(crate) fn sync_context(&self) -> SyncWorkflowContext<W> {
        self.sync.clone()
    }
    // Everything from here to `view` delegates 1:1 to `SyncWorkflowContext`;
    // see that impl for per-method documentation.
    pub fn workflow_id(&self) -> &str {
        self.sync.workflow_id()
    }
    pub fn run_id(&self) -> &str {
        self.sync.run_id()
    }
    pub fn namespace(&self) -> &str {
        self.sync.namespace()
    }
    pub fn task_queue(&self) -> &str {
        self.sync.task_queue()
    }
    pub fn workflow_time(&self) -> Option<SystemTime> {
        self.sync.workflow_time()
    }
    pub fn history_length(&self) -> u32 {
        self.sync.history_length()
    }
    pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
        self.sync.current_deployment_version()
    }
    pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
        self.sync.search_attributes()
    }
    pub fn random_seed(&self) -> u64 {
        self.sync.random_seed()
    }
    pub fn is_replaying(&self) -> bool {
        self.sync.is_replaying()
    }
    pub fn continue_as_new_suggested(&self) -> bool {
        self.sync.continue_as_new_suggested()
    }
    pub fn headers(&self) -> &HashMap<String, Payload> {
        self.sync.headers()
    }
    pub fn payload_converter(&self) -> &PayloadConverter {
        self.sync.payload_converter()
    }
    pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
        self.sync.workflow_initial_info()
    }
    pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
        self.sync.cancelled()
    }
    pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
        self.sync.timer(opts)
    }
    pub fn start_activity<AD: ActivityDefinition>(
        &self,
        activity: AD,
        input: impl Into<AD::Input>,
        opts: ActivityOptions,
    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
    where
        AD::Output: TemporalDeserializable,
    {
        self.sync.start_activity(activity, input, opts)
    }
    pub fn start_local_activity<AD: ActivityDefinition>(
        &self,
        activity: AD,
        input: impl Into<AD::Input>,
        opts: LocalActivityOptions,
    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
    where
        AD::Output: TemporalDeserializable,
    {
        self.sync.start_local_activity(activity, input, opts)
    }
    pub fn child_workflow<WD: WorkflowDefinition>(
        &self,
        workflow: WD,
        input: impl Into<WD::Input>,
        opts: ChildWorkflowOptions,
    ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
    where
        WD::Output: TemporalDeserializable,
    {
        self.sync.child_workflow(workflow, input, opts)
    }
    pub fn patched(&self, patch_id: &str) -> bool {
        self.sync.patched(patch_id)
    }
    pub fn deprecate_patch(&self, patch_id: &str) -> bool {
        self.sync.deprecate_patch(patch_id)
    }
    pub fn external_workflow(
        &self,
        workflow_id: impl Into<String>,
        run_id: Option<String>,
    ) -> ExternalWorkflowHandle {
        self.sync.external_workflow(workflow_id, run_id)
    }
    pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
        self.sync.upsert_search_attributes(attr_iter)
    }
    pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
        self.sync.upsert_memo(attr_iter)
    }
    pub fn set_current_details(&self, details: impl Into<String>) {
        self.sync.set_current_details(details)
    }
    pub fn force_task_fail(&self, with: anyhow::Error) {
        self.sync.force_task_fail(with)
    }
    pub fn start_nexus_operation(
        &self,
        opts: NexusOperationOptions,
    ) -> impl CancellableFuture<NexusStartResult> {
        self.sync.start_nexus_operation(opts)
    }
    pub(crate) fn view(&self) -> WorkflowContextView {
        self.sync.view()
    }
    /// Runs `f` with shared read access to the workflow state.
    pub fn state<R>(&self, f: impl FnOnce(&W) -> R) -> R {
        f(&*self.workflow_state.borrow())
    }
    /// Runs `f` with exclusive access to the workflow state, then wakes every
    /// pending `wait_condition` future and flags the mutation to the SDK.
    pub fn state_mut<R>(&self, f: impl FnOnce(&mut W) -> R) -> R {
        let result = f(&mut *self.workflow_state.borrow_mut());
        // NOTE(review): guard presumably marks the wake-ups below as
        // SDK-initiated for the executor — confirm against SdkWakeGuard.
        let _guard = SdkWakeGuard::new();
        for waker in self.condition_wakers.borrow_mut().drain(..) {
            waker.wake();
        }
        self.sync.base.set_state_mutated();
        result
    }
    /// Terminates this run and continues as new; see
    /// `SyncWorkflowContext::continue_as_new`.
    pub fn continue_as_new(
        &self,
        input: &<W::Run as WorkflowDefinition>::Input,
        opts: ContinueAsNewOptions,
    ) -> Result<std::convert::Infallible, WorkflowTermination>
    where
        W: crate::workflows::WorkflowImplementation,
    {
        self.sync.continue_as_new(input, opts)
    }
    /// Resolves once `condition` holds for the workflow state. Each pending
    /// poll registers its waker in `condition_wakers`, so the condition is
    /// re-checked after every `state_mut` call.
    pub fn wait_condition<'a>(
        &'a self,
        mut condition: impl FnMut(&W) -> bool + 'a,
    ) -> impl FusedFuture<Output = ()> + 'a {
        future::poll_fn(move |cx: &mut Context<'_>| {
            if condition(&*self.workflow_state.borrow()) {
                Poll::Ready(())
            } else {
                self.condition_wakers.borrow_mut().push(cx.waker().clone());
                Poll::Pending
            }
        })
        .fuse()
    }
}
/// Monotonically-increasing sequence counters, one per command kind, used to
/// correlate issued commands with their later resolutions.
struct WfCtxProtectedDat {
    next_timer_sequence_number: u32,
    next_activity_sequence_number: u32,
    next_child_workflow_sequence_number: u32,
    next_cancel_external_wf_sequence_number: u32,
    next_signal_external_wf_sequence_number: u32,
    next_nexus_op_sequence_number: u32,
}
impl WfCtxProtectedDat {
    /// Returns the current value of `counter` and then advances it (a
    /// post-increment). Shared by all `next_*_seq` accessors below, which
    /// previously each duplicated this two-line dance.
    fn post_increment(counter: &mut u32) -> u32 {
        let seq = *counter;
        *counter += 1;
        seq
    }
    fn next_timer_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_timer_sequence_number)
    }
    fn next_activity_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_activity_sequence_number)
    }
    fn next_child_workflow_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_child_workflow_sequence_number)
    }
    fn next_cancel_external_wf_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_cancel_external_wf_sequence_number)
    }
    fn next_signal_external_wf_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_signal_external_wf_sequence_number)
    }
    fn next_nexus_op_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_nexus_op_sequence_number)
    }
}
/// Mutable per-run data shared across context clones; updated by the
/// workflow executor as activations arrive.
#[derive(Clone, Debug, Default)]
pub(crate) struct WorkflowContextSharedData {
    /// Patch markers seen so far: patch id -> "patched branch taken".
    pub(crate) changes: HashMap<String, bool>,
    pub(crate) is_replaying: bool,
    /// Deterministic workflow time as of the latest task.
    pub(crate) wf_time: Option<SystemTime>,
    pub(crate) history_length: u32,
    pub(crate) continue_as_new_suggested: bool,
    pub(crate) current_deployment_version: Option<WorkerDeploymentVersion>,
    pub(crate) search_attributes: SearchAttributes,
    pub(crate) random_seed: u64,
    /// User-set "current details" string for this workflow.
    pub(crate) current_details: String,
}
/// A fused [`Future`] for an SDK command that may be cancelled before it
/// resolves.
pub trait CancellableFuture<T>: Future<Output = T> + FusedFuture {
    /// Requests cancellation of the underlying command. The future must still
    /// be polled to observe the resulting resolution.
    fn cancel(&self);
}
/// A [`CancellableFuture`] whose cancellation can carry a human-readable
/// reason.
pub trait CancellableFutureWithReason<T>: CancellableFuture<T> {
    /// Requests cancellation, attaching `reason`.
    fn cancel_with_reason(&self, reason: String);
}
/// Future that resolves when the executor sends back the [`UnblockEvent`]
/// for the command this future represents. `D` is extra data threaded
/// through to [`Unblockable::unblock`] when interpreting the event.
struct WFCommandFut<T, D> {
    _unused: PhantomData<T>,
    result_rx: oneshot::Receiver<UnblockEvent>,
    /// Consumed on resolution; `None` doubles as the "terminated" flag.
    other_dat: Option<D>,
}
impl<T> WFCommandFut<T, ()> {
    /// Creates a future with no extra data, returning the unblock sender.
    fn new() -> (Self, oneshot::Sender<UnblockEvent>) {
        Self::new_with_dat(())
    }
}
impl<T, D> WFCommandFut<T, D> {
    /// Creates a future carrying `other_dat`, returning the sender the
    /// executor uses to resolve it.
    fn new_with_dat(other_dat: D) -> (Self, oneshot::Sender<UnblockEvent>) {
        let (tx, rx) = oneshot::channel();
        (
            Self {
                _unused: PhantomData,
                result_rx: rx,
                other_dat: Some(other_dat),
            },
            tx,
        )
    }
}
impl<T, D> Unpin for WFCommandFut<T, D> where T: Unblockable<OtherDat = D> {}
impl<T, D> Future for WFCommandFut<T, D>
where
    T: Unblockable<OtherDat = D>,
{
    type Output = T;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.result_rx.poll_unpin(cx).map(|x| {
            let od = self
                .other_dat
                .take()
                .expect("Other data must exist when resolving command future");
            // An Err here means the executor dropped the unblock sender
            // without resolving the command — an SDK invariant violation, so
            // fail loudly with context rather than a bare unwrap.
            let event = x.expect("Unblock sender dropped before command was resolved");
            Unblockable::unblock(event, od)
        })
    }
}
impl<T, D> FusedFuture for WFCommandFut<T, D>
where
    T: Unblockable<OtherDat = D>,
{
    fn is_terminated(&self) -> bool {
        self.other_dat.is_none()
    }
}
/// A [`WFCommandFut`] paired with an id (`ID`) that identifies the pending
/// command to the cancellation machinery, plus the context used to issue
/// the cancel.
struct CancellableWFCommandFut<T, D, ID = CancellableID> {
    cmd_fut: WFCommandFut<T, D>,
    cancellable_id: ID,
    base_ctx: BaseWorkflowContext,
}
impl<T, ID> CancellableWFCommandFut<T, (), ID> {
    /// Constructs a cancellable command future with no extra data.
    fn new(
        cancellable_id: ID,
        base_ctx: BaseWorkflowContext,
    ) -> (Self, oneshot::Sender<UnblockEvent>) {
        Self::new_with_dat(cancellable_id, (), base_ctx)
    }
}
impl<T, D, ID> CancellableWFCommandFut<T, D, ID> {
    /// Constructs a cancellable command future carrying `other_dat`; the
    /// returned sender resolves the future.
    fn new_with_dat(
        cancellable_id: ID,
        other_dat: D,
        base_ctx: BaseWorkflowContext,
    ) -> (Self, oneshot::Sender<UnblockEvent>) {
        let (cmd_fut, sender) = WFCommandFut::new_with_dat(other_dat);
        (
            Self {
                cmd_fut,
                cancellable_id,
                base_ctx,
            },
            sender,
        )
    }
}
impl<T, D, ID> Unpin for CancellableWFCommandFut<T, D, ID> where T: Unblockable<OtherDat = D> {}
impl<T, D, ID> Future for CancellableWFCommandFut<T, D, ID>
where
    T: Unblockable<OtherDat = D>,
{
    type Output = T;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Polling delegates entirely to the inner command future.
        self.cmd_fut.poll_unpin(cx)
    }
}
impl<T, D, ID> FusedFuture for CancellableWFCommandFut<T, D, ID>
where
    T: Unblockable<OtherDat = D>,
{
    fn is_terminated(&self) -> bool {
        self.cmd_fut.is_terminated()
    }
}
impl<T, D, ID> CancellableFuture<T> for CancellableWFCommandFut<T, D, ID>
where
    T: Unblockable<OtherDat = D>,
    ID: Clone + Into<CancellableID>,
{
    fn cancel(&self) {
        // Issue a cancel command for whatever resource this future tracks.
        self.base_ctx.cancel(self.cancellable_id.clone().into());
    }
}
impl<T, D> CancellableFutureWithReason<T> for CancellableWFCommandFut<T, D, CancellableIDWithReason>
where
    T: Unblockable<OtherDat = D>,
{
    fn cancel_with_reason(&self, reason: String) {
        // Attach the user-supplied reason before issuing the cancel.
        let new_id = self.cancellable_id.clone().with_reason(reason);
        self.base_ctx.cancel(new_id);
    }
}
/// Drives a local activity whose server-directed retry backoff is long
/// enough to be implemented as a workflow timer: on a `Backoff` resolution
/// it waits out the period with a timer, then schedules the next attempt.
struct LATimerBackoffFut {
    la_opts: LocalActivityOptions,
    activity_type: String,
    arguments: Vec<Payload>,
    /// The currently executing local-activity attempt.
    current_fut: Pin<Box<dyn CancellableFuture<ActivityResolution> + Unpin>>,
    /// Present while waiting out a backoff period between attempts.
    timer_fut: Option<Pin<Box<dyn CancellableFuture<TimerResult> + Unpin>>>,
    base_ctx: BaseWorkflowContext,
    next_attempt: u32,
    next_sched_time: Option<prost_types::Timestamp>,
    /// Records a cancel request so it can be honored on the next poll.
    did_cancel: AtomicBool,
    terminated: bool,
}
impl LATimerBackoffFut {
    pub(crate) fn new(
        activity_type: String,
        arguments: Vec<Payload>,
        opts: LocalActivityOptions,
        base_ctx: BaseWorkflowContext,
    ) -> Self {
        // Kick off the first attempt immediately.
        let current_fut = Box::pin(base_ctx.clone().local_activity_no_timer_retry(
            activity_type.clone(),
            arguments.clone(),
            opts.clone(),
        ));
        Self {
            la_opts: opts,
            activity_type,
            arguments,
            current_fut,
            timer_fut: None,
            base_ctx,
            next_attempt: 1,
            next_sched_time: None,
            did_cancel: AtomicBool::new(false),
            terminated: false,
        }
    }
    /// Resolution reported when the activity is cancelled during (or right
    /// before) a backoff timer. Extracted because it was duplicated verbatim
    /// in both cancellation paths of `poll`.
    fn cancelled_resolution() -> ActivityResolution {
        ActivityResolution {
            status: Some(activity_resolution::Status::Cancelled(Cancellation {
                failure: Some(Failure {
                    message: "Activity cancelled".to_owned(),
                    failure_info: Some(FailureInfo::CanceledFailureInfo(
                        CanceledFailureInfo::default(),
                    )),
                    ..Default::default()
                }),
            })),
        }
    }
}
impl Unpin for LATimerBackoffFut {}
impl Future for LATimerBackoffFut {
    type Output = ActivityResolution;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Phase 1: if we're in a backoff period, drive the timer first.
        if let Some(tf) = self.timer_fut.as_mut() {
            return match tf.poll_unpin(cx) {
                Poll::Ready(tr) => {
                    self.timer_fut = None;
                    if let TimerResult::Fired = tr {
                        // Backoff elapsed: schedule the next attempt, carrying
                        // forward attempt count and original schedule time.
                        let mut opts = self.la_opts.clone();
                        opts.attempt = Some(self.next_attempt);
                        opts.original_schedule_time
                            .clone_from(&self.next_sched_time);
                        self.current_fut =
                            Box::pin(self.base_ctx.clone().local_activity_no_timer_retry(
                                self.activity_type.clone(),
                                self.arguments.clone(),
                                opts,
                            ));
                        Poll::Pending
                    } else {
                        // Timer was cancelled -> report the activity cancelled.
                        self.terminated = true;
                        Poll::Ready(Self::cancelled_resolution())
                    }
                }
                Poll::Pending => Poll::Pending,
            };
        }
        // Phase 2: drive the in-flight attempt.
        let poll_res = self.current_fut.poll_unpin(cx);
        if let Poll::Ready(ref r) = poll_res
            && let Some(activity_resolution::Status::Backoff(b)) = r.status.as_ref()
        {
            // Cancel requested while the attempt resolved to a backoff:
            // report cancellation instead of starting a timer.
            if self.did_cancel.load(Ordering::Acquire) {
                self.terminated = true;
                return Poll::Ready(Self::cancelled_resolution());
            }
            let timer_f = self.base_ctx.timer::<Duration>(
                b.backoff_duration
                    .expect("Duration is set")
                    .try_into()
                    .expect("duration converts ok"),
            );
            self.timer_fut = Some(Box::pin(timer_f));
            self.next_attempt = b.attempt;
            self.next_sched_time.clone_from(&b.original_schedule_time);
            return Poll::Pending;
        }
        if poll_res.is_ready() {
            self.terminated = true;
        }
        poll_res
    }
}
impl FusedFuture for LATimerBackoffFut {
    fn is_terminated(&self) -> bool {
        self.terminated
    }
}
impl CancellableFuture<ActivityResolution> for LATimerBackoffFut {
    /// Cancels whichever stage is active (backoff timer and/or attempt) and
    /// records the request so the next poll can honor it.
    fn cancel(&self) {
        self.did_cancel.store(true, Ordering::Release);
        if let Some(tf) = self.timer_fut.as_ref() {
            tf.cancel();
        }
        self.current_fut.cancel();
    }
}
enum ActivityFut<F, Output> {
Errored {
error: Option<Box<ActivityExecutionError>>,
_phantom: PhantomData<Output>,
},
Running {
inner: F,
data_converter: DataConverter,
_phantom: PhantomData<Output>,
},
Terminated,
}
impl<F, Output> ActivityFut<F, Output> {
fn eager(err: ActivityExecutionError) -> Self {
Self::Errored {
error: Some(Box::new(err)),
_phantom: PhantomData,
}
}
fn running(inner: F, data_converter: DataConverter) -> Self {
Self::Running {
inner,
data_converter,
_phantom: PhantomData,
}
}
}
impl<F, Output> Unpin for ActivityFut<F, Output> where F: Unpin {}
impl<F, Output> Future for ActivityFut<F, Output>
where
    F: Future<Output = ActivityResolution> + Unpin,
    Output: TemporalDeserializable + 'static,
{
    type Output = Result<Output, ActivityExecutionError>;
    /// Drives the underlying resolution future and maps the raw
    /// `ActivityResolution` protobuf into the user-facing result type.
    ///
    /// Every `Ready` outcome — including error-conversion failures — flows
    /// through the single `poll` binding so the state machine always advances
    /// to `Terminated` once a value has been produced. (The previous version
    /// used `?`, which returned early and left the state `Running`, so
    /// `is_terminated` stayed false and a subsequent poll could re-poll the
    /// already-finished inner future.)
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let poll = match this {
            ActivityFut::Errored { error, .. } => {
                Poll::Ready(Err(*error.take().expect("polled after completion")))
            }
            ActivityFut::Running {
                inner,
                data_converter,
                ..
            } => match Pin::new(inner).poll(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(resolution) => Poll::Ready(match resolution.status {
                    // A resolution with no status is a protocol violation;
                    // surface it as a synthetic failure.
                    None => Err(data_converter
                        .to_error(
                            &SerializationContextData::Workflow,
                            Failure {
                                message: "Activity completed without a status".to_string(),
                                ..Default::default()
                            },
                            ActivityExecutionDecodeHint { cancelled: false },
                        )
                        .expect("synthetic activity failure should decode")),
                    Some(activity_resolution::Status::Completed(success)) => {
                        let payload = success.result.unwrap_or_default();
                        let ctx = SerializationContext {
                            data: &SerializationContextData::Workflow,
                            converter: data_converter.payload_converter(),
                        };
                        data_converter
                            .payload_converter()
                            .from_payload::<Output>(&ctx, payload)
                            .map_err(ActivityExecutionError::Serialization)
                    }
                    Some(activity_resolution::Status::Failed(f)) => Err(data_converter
                        .to_error(
                            &SerializationContextData::Workflow,
                            f.failure.unwrap_or_default(),
                            ActivityExecutionDecodeHint { cancelled: false },
                        )
                        // A failed decode still resolves the activity as
                        // errored (same conversion `?` previously performed).
                        .unwrap_or_else(Into::into)),
                    Some(activity_resolution::Status::Cancelled(c)) => Err(data_converter
                        .to_error(
                            &SerializationContextData::Workflow,
                            c.failure.unwrap_or_default(),
                            ActivityExecutionDecodeHint { cancelled: true },
                        )
                        .unwrap_or_else(Into::into)),
                    Some(activity_resolution::Status::Backoff(_)) => {
                        panic!("DoBackoff should be handled by LATimerBackoffFut")
                    }
                }),
            },
            ActivityFut::Terminated => panic!("polled after termination"),
        };
        if poll.is_ready() {
            *this = ActivityFut::Terminated;
        }
        poll
    }
}
impl<F, Output> FusedFuture for ActivityFut<F, Output>
where
    F: Future<Output = ActivityResolution> + Unpin,
    Output: TemporalDeserializable + 'static,
{
    /// Done once a result (or eager error) has been yielded.
    fn is_terminated(&self) -> bool {
        match self {
            ActivityFut::Terminated => true,
            ActivityFut::Errored { .. } | ActivityFut::Running { .. } => false,
        }
    }
}
impl<F, Output> CancellableFuture<Result<Output, ActivityExecutionError>> for ActivityFut<F, Output>
where
    F: CancellableFuture<ActivityResolution> + Unpin,
    Output: TemporalDeserializable + 'static,
{
    /// Forwards cancellation to the in-flight resolution future; cancelling
    /// an eagerly-errored or already-terminated future is a no-op.
    fn cancel(&self) {
        match self {
            ActivityFut::Running { inner, .. } => inner.cancel(),
            ActivityFut::Errored { .. } | ActivityFut::Terminated => {}
        }
    }
}
/// State shared between a child workflow handle and its result future.
pub(crate) struct ChildWfCommon {
    // Child's workflow id; used as the target when signalling.
    workflow_id: String,
    // Sequence number of the start command; used to build cancel commands.
    child_seq: u32,
    result_future: CancellableWFCommandFut<ChildWorkflowResult, (), CancellableIDWithReason>,
    base_ctx: BaseWorkflowContext,
    data_converter: DataConverter,
}
/// A child workflow whose start attempt has been resolved but not yet
/// inspected: `status` says whether the start succeeded, failed, or was
/// cancelled.
#[derive(derive_more::Debug)]
pub(crate) struct PendingChildWorkflow<WD: WorkflowDefinition> {
    pub(crate) status: ChildWorkflowStartStatus,
    #[debug(skip)]
    pub(crate) common: ChildWfCommon,
    pub(crate) _phantom: PhantomData<WD>,
}
/// Handle to a successfully-started child workflow. Exposes `result`,
/// `cancel`, and `signal` (see the inherent impl below this file section).
#[derive(derive_more::Debug)]
pub struct StartedChildWorkflow<WD: WorkflowDefinition> {
    /// Run id reported by the successful start.
    pub run_id: String,
    #[debug(skip)]
    common: ChildWfCommon,
    _phantom: PhantomData<WD>,
}
/// Future for a started child workflow's final result: decodes the raw
/// `ChildWorkflowResult` into the caller's `Output` type.
enum ChildWorkflowFut<F, Output> {
    /// Result still pending; `data_converter` decodes the payloads.
    Running {
        inner: F,
        data_converter: DataConverter,
        _phantom: PhantomData<Output>,
    },
    /// Already yielded `Ready`; polling again panics.
    Terminated,
}
// Plain field storage only — `Unpin` whenever the inner future is.
impl<F, Output> Unpin for ChildWorkflowFut<F, Output> where F: Unpin {}
impl<F, Output> Future for ChildWorkflowFut<F, Output>
where
    F: Future<Output = ChildWorkflowResult> + Unpin,
    Output: TemporalDeserializable + 'static,
{
    type Output = Result<Output, ChildWorkflowExecutionError>;
    /// Polls the raw result future and decodes the `ChildWorkflowResult`
    /// protobuf into the caller's `Output` type.
    ///
    /// Every `Ready` outcome — including error-conversion failures — flows
    /// through the single `poll` binding so the state always advances to
    /// `Terminated` after completion. (The previous version used `?` on
    /// `to_error`, which returned early and left the state `Running`, so
    /// `is_terminated` stayed false and a re-poll could poll the finished
    /// inner future again.)
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let poll = match this {
            ChildWorkflowFut::Running {
                inner,
                data_converter,
                ..
            } => match Pin::new(inner).poll(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(result) => {
                    use temporalio_common::protos::coresdk::child_workflow::child_workflow_result;
                    Poll::Ready(match result.status {
                        // Missing status is a protocol violation; report it
                        // as a synthetic failure.
                        None => Err(data_converter
                            .to_error(
                                &SerializationContextData::Workflow,
                                Failure {
                                    message: "Child workflow completed without a status"
                                        .to_string(),
                                    ..Default::default()
                                },
                                ChildWorkflowExecutionDecodeHint,
                            )
                            .expect("synthetic child workflow failure should decode")),
                        Some(child_workflow_result::Status::Completed(success)) => {
                            let payloads = success.result.into_iter().collect();
                            let ctx = SerializationContext {
                                data: &SerializationContextData::Workflow,
                                converter: data_converter.payload_converter(),
                            };
                            data_converter
                                .payload_converter()
                                .from_payloads::<Output>(&ctx, payloads)
                                .map_err(ChildWorkflowExecutionError::Serialization)
                        }
                        Some(child_workflow_result::Status::Failed(f)) => Err(data_converter
                            .to_error(
                                &SerializationContextData::Workflow,
                                f.failure.unwrap_or_default(),
                                ChildWorkflowExecutionDecodeHint,
                            )
                            // A failed decode still resolves the child as
                            // errored (same conversion `?` performed before).
                            .unwrap_or_else(Into::into)),
                        Some(child_workflow_result::Status::Cancelled(c)) => Err(data_converter
                            .to_error(
                                &SerializationContextData::Workflow,
                                c.failure.unwrap_or_default(),
                                ChildWorkflowExecutionDecodeHint,
                            )
                            .unwrap_or_else(Into::into)),
                    })
                }
            },
            ChildWorkflowFut::Terminated => panic!("polled after termination"),
        };
        if poll.is_ready() {
            *this = ChildWorkflowFut::Terminated;
        }
        poll
    }
}
impl<F, Output> FusedFuture for ChildWorkflowFut<F, Output>
where
    F: Future<Output = ChildWorkflowResult> + Unpin,
    Output: TemporalDeserializable + 'static,
{
    /// Done once the decoded result has been yielded.
    fn is_terminated(&self) -> bool {
        match self {
            ChildWorkflowFut::Terminated => true,
            ChildWorkflowFut::Running { .. } => false,
        }
    }
}
impl<F, Output> CancellableFutureWithReason<Result<Output, ChildWorkflowExecutionError>>
    for ChildWorkflowFut<F, Output>
where
    F: CancellableFutureWithReason<ChildWorkflowResult> + Unpin,
    Output: TemporalDeserializable + 'static,
{
    /// Forwards a reasoned cancellation to the in-flight result future;
    /// no-op once terminated.
    fn cancel_with_reason(&self, reason: String) {
        match self {
            ChildWorkflowFut::Running { inner, .. } => inner.cancel_with_reason(reason),
            ChildWorkflowFut::Terminated => {}
        }
    }
}
impl<F, Output> CancellableFuture<Result<Output, ChildWorkflowExecutionError>>
    for ChildWorkflowFut<F, Output>
where
    F: CancellableFutureWithReason<ChildWorkflowResult> + Unpin,
    Output: TemporalDeserializable + 'static,
{
    /// Forwards a reasonless cancellation; no-op once terminated.
    fn cancel(&self) {
        match self {
            ChildWorkflowFut::Running { inner, .. } => inner.cancel(),
            ChildWorkflowFut::Terminated => {}
        }
    }
}
/// Future for the *start* step of a child workflow; resolves to a
/// `StartedChildWorkflow` handle or a `ChildWorkflowStartError`.
enum ChildWorkflowStartFut<F, WD: WorkflowDefinition> {
    /// An error determined before polling (see `eager`); `error` is `Some`
    /// until the first poll takes it.
    Errored {
        error: Option<Box<ChildWorkflowStartError>>,
        _phantom: PhantomData<WD>,
    },
    /// Start attempt still pending resolution.
    Running(F),
    /// Already yielded `Ready`; polling again panics.
    Terminated,
}
impl<F, WD: WorkflowDefinition> ChildWorkflowStartFut<F, WD> {
    /// Builds a start future that resolves to `err` on its first poll,
    /// without driving any inner future.
    fn eager(err: ChildWorkflowStartError) -> Self {
        let error = Some(Box::new(err));
        Self::Errored {
            error,
            _phantom: PhantomData,
        }
    }
}
// Plain field storage only — `Unpin` whenever the inner future is.
impl<F, WD: WorkflowDefinition> Unpin for ChildWorkflowStartFut<F, WD> where F: Unpin {}
impl<F, WD> Future for ChildWorkflowStartFut<F, WD>
where
    F: Future<Output = PendingChildWorkflow<WD>> + Unpin,
    WD: WorkflowDefinition,
{
    type Output = Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>;
    /// Resolves the start of a child workflow into a `StartedChildWorkflow`
    /// handle, or the appropriate start error.
    ///
    /// Every `Ready` outcome — including a failed error decode in the
    /// `Cancelled` arm — flows through the `poll` binding so the state always
    /// moves to `Terminated` after completion. (Previously the `?` there
    /// returned early and left the state `Running`, so a re-poll could poll
    /// the finished inner future again.)
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let poll = match this {
            ChildWorkflowStartFut::Errored { error, .. } => {
                Poll::Ready(Err(*error.take().expect("polled after completion")))
            }
            ChildWorkflowStartFut::Running(inner) => match Pin::new(inner).poll(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(pending) => Poll::Ready(match pending.status {
                    ChildWorkflowStartStatus::Succeeded(s) => Ok(StartedChildWorkflow {
                        run_id: s.run_id,
                        common: pending.common,
                        _phantom: PhantomData,
                    }),
                    ChildWorkflowStartStatus::Failed(f) => {
                        Err(ChildWorkflowStartError::StartFailed {
                            workflow_id: f.workflow_id,
                            workflow_type: f.workflow_type,
                            // Unknown enum values from newer servers map to
                            // `Unspecified` rather than erroring.
                            cause: StartChildWorkflowExecutionFailedCause::try_from(f.cause)
                                .unwrap_or(StartChildWorkflowExecutionFailedCause::Unspecified),
                        })
                    }
                    ChildWorkflowStartStatus::Cancelled(c) => Err(pending
                        .common
                        .data_converter
                        .to_error(
                            &SerializationContextData::Workflow,
                            c.failure.unwrap_or_default(),
                            ChildWorkflowStartDecodeHint,
                        )
                        // A failed decode still resolves the start as errored
                        // (same conversion `?` previously performed).
                        .unwrap_or_else(Into::into)),
                }),
            },
            ChildWorkflowStartFut::Terminated => panic!("polled after termination"),
        };
        if poll.is_ready() {
            *this = ChildWorkflowStartFut::Terminated;
        }
        poll
    }
}
impl<F, WD> FusedFuture for ChildWorkflowStartFut<F, WD>
where
    F: Future<Output = PendingChildWorkflow<WD>> + Unpin,
    WD: WorkflowDefinition,
{
    /// Done once the start outcome has been yielded.
    fn is_terminated(&self) -> bool {
        match self {
            ChildWorkflowStartFut::Terminated => true,
            ChildWorkflowStartFut::Errored { .. } | ChildWorkflowStartFut::Running(_) => false,
        }
    }
}
impl<F, WD> CancellableFuture<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
    for ChildWorkflowStartFut<F, WD>
where
    F: CancellableFutureWithReason<PendingChildWorkflow<WD>> + Unpin,
    WD: WorkflowDefinition,
{
    /// Cancels the pending start; no-op for eager errors or after completion.
    fn cancel(&self) {
        match self {
            ChildWorkflowStartFut::Running(inner) => inner.cancel(),
            ChildWorkflowStartFut::Errored { .. } | ChildWorkflowStartFut::Terminated => {}
        }
    }
}
impl<F, WD> CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
    for ChildWorkflowStartFut<F, WD>
where
    F: CancellableFutureWithReason<PendingChildWorkflow<WD>> + Unpin,
    WD: WorkflowDefinition,
{
    /// Cancels the pending start with a reason; no-op for eager errors or
    /// after completion.
    fn cancel_with_reason(&self, reason: String) {
        match self {
            ChildWorkflowStartFut::Running(inner) => inner.cancel_with_reason(reason),
            ChildWorkflowStartFut::Errored { .. } | ChildWorkflowStartFut::Terminated => {}
        }
    }
}
/// Future for signalling a child workflow; resolves to `Ok(())` or a
/// `ChildWorkflowSignalError`.
enum SignalChildFut<F> {
    /// Signal input failed to serialize (see `eager`); resolved on first poll.
    Errored {
        error: Option<ChildWorkflowSignalError>,
    },
    /// Signal request in flight; `data_converter` decodes any failure.
    Running {
        inner: F,
        data_converter: DataConverter,
    },
    /// Already yielded `Ready`; polling again panics.
    Terminated,
}
impl<F> SignalChildFut<F> {
    /// Builds a signal future that resolves to `err` on its first poll.
    fn eager(err: ChildWorkflowSignalError) -> Self {
        Self::Errored { error: Some(err) }
    }
}
// Plain field storage only — `Unpin` whenever the inner future is.
impl<F> Unpin for SignalChildFut<F> where F: Unpin {}
impl<F> Future for SignalChildFut<F>
where
    F: Future<Output = SignalExternalWfResult> + Unpin,
{
    type Output = Result<(), ChildWorkflowSignalError>;
    /// Resolves once the signal outcome is known: `Ok(())` on success, a
    /// decoded error on failure.
    ///
    /// Every `Ready` outcome — including a failed error decode — flows
    /// through the `poll` binding so the state always moves to `Terminated`
    /// after completion. (Previously the `?` on `to_error` returned early
    /// and left the state `Running`, so a re-poll could poll the finished
    /// inner future again.)
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let poll = match this {
            SignalChildFut::Errored { error } => {
                Poll::Ready(Err(error.take().expect("polled after completion")))
            }
            SignalChildFut::Running {
                inner,
                data_converter,
            } => match Pin::new(inner).poll(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
                Poll::Ready(Err(failure)) => Poll::Ready(Err(data_converter
                    .to_error(
                        &SerializationContextData::Workflow,
                        failure,
                        ChildWorkflowSignalDecodeHint,
                    )
                    // A failed decode still resolves the signal as errored
                    // (same conversion `?` previously performed).
                    .unwrap_or_else(Into::into))),
            },
            SignalChildFut::Terminated => panic!("polled after termination"),
        };
        if poll.is_ready() {
            *this = SignalChildFut::Terminated;
        }
        poll
    }
}
impl<F> FusedFuture for SignalChildFut<F>
where
    F: Future<Output = SignalExternalWfResult> + Unpin,
{
    /// Done once the signal outcome has been yielded.
    fn is_terminated(&self) -> bool {
        match self {
            SignalChildFut::Terminated => true,
            SignalChildFut::Errored { .. } | SignalChildFut::Running { .. } => false,
        }
    }
}
impl<F> CancellableFuture<Result<(), ChildWorkflowSignalError>> for SignalChildFut<F>
where
    F: CancellableFuture<SignalExternalWfResult> + Unpin,
{
    /// Cancels the in-flight signal; no-op for eager errors or after
    /// completion.
    fn cancel(&self) {
        match self {
            SignalChildFut::Running { inner, .. } => inner.cancel(),
            SignalChildFut::Errored { .. } | SignalChildFut::Terminated => {}
        }
    }
}
impl<WD: WorkflowDefinition> StartedChildWorkflow<WD>
where
    WD::Output: TemporalDeserializable + 'static,
{
    /// Consumes the handle and returns a cancellable future for the child's
    /// final result, decoded to the workflow definition's output type.
    pub fn result(
        self,
    ) -> impl CancellableFutureWithReason<Result<WD::Output, ChildWorkflowExecutionError>> {
        ChildWorkflowFut::Running {
            inner: self.common.result_future,
            data_converter: self.common.data_converter,
            _phantom: PhantomData,
        }
    }
    /// Requests cancellation of the child workflow. Fire-and-forget: the
    /// command is sent immediately and nothing is awaited.
    pub fn cancel(&self, reason: String) {
        self.common.base_ctx.send(RustWfCmd::NewNonblockingCmd(
            CancelChildWorkflowExecution {
                child_workflow_seq: self.common.child_seq,
                reason,
            }
            .into(),
        ));
    }
    /// Sends `signal` with `input` to the child workflow.
    ///
    /// Input serialization happens up front; on failure the returned future
    /// resolves immediately with the conversion error instead of sending
    /// anything.
    pub fn signal<S: SignalDefinition<Workflow = WD>>(
        &self,
        signal: S,
        input: S::Input,
    ) -> impl CancellableFuture<Result<(), ChildWorkflowSignalError>> + 'static {
        let payload_converter = self.common.data_converter.payload_converter();
        let ctx = SerializationContext {
            data: &SerializationContextData::Workflow,
            converter: payload_converter,
        };
        let payloads = match payload_converter.to_payloads(&ctx, &input) {
            Ok(p) => p,
            Err(e) => {
                return SignalChildFut::eager(e.into());
            }
        };
        let signal = Signal::new(S::name(&signal), payloads);
        // The child is addressed by its workflow id.
        let target = sig_we::Target::ChildWorkflowId(self.common.workflow_id.clone());
        SignalChildFut::Running {
            inner: self.common.base_ctx.clone().send_signal_wf(target, signal),
            data_converter: self.common.data_converter.clone(),
        }
    }
}
/// Handle for signalling or cancelling a workflow execution outside this
/// workflow, addressed by namespace / workflow id / optional run id.
#[derive(derive_more::Debug)]
pub struct ExternalWorkflowHandle {
    workflow_id: String,
    // When `None`, an empty run id is sent with requests.
    run_id: Option<String>,
    namespace: String,
    #[debug(skip)]
    base_ctx: BaseWorkflowContext,
}
impl ExternalWorkflowHandle {
    /// Id of the workflow this handle targets.
    pub fn workflow_id(&self) -> &str {
        &self.workflow_id
    }
    /// Specific run targeted, if one was provided.
    pub fn run_id(&self) -> Option<&str> {
        self.run_id.as_deref()
    }
    /// Sends `signal` with `input` to the external workflow.
    ///
    /// Input serialization happens up front; on failure the returned future
    /// resolves to an `Err(Failure)` instead of sending anything.
    pub fn signal<S: SignalDefinition>(
        &self,
        signal: S,
        input: S::Input,
    ) -> impl CancellableFuture<SignalExternalWfResult> + 'static {
        let payload_converter = self.base_ctx.inner.data_converter.payload_converter();
        let ctx = SerializationContext {
            data: &SerializationContextData::Workflow,
            converter: payload_converter,
        };
        let payloads = match payload_converter.to_payloads(&ctx, &input) {
            Ok(p) => p,
            Err(e) => {
                return SignalExternalFut::SerializationError(Some(e));
            }
        };
        let signal = Signal::new(S::name(&signal), payloads);
        // An absent run id is sent as the empty string.
        let target = sig_we::Target::WorkflowExecution(NamespacedWorkflowExecution {
            namespace: self.namespace.clone(),
            workflow_id: self.workflow_id.clone(),
            run_id: self.run_id.clone().unwrap_or_default(),
        });
        SignalExternalFut::Running(self.base_ctx.clone().send_signal_wf(target, signal))
    }
    /// Issues a cancellation request for the external workflow and returns a
    /// future that resolves with the request's outcome.
    pub fn cancel(
        &self,
        reason: Option<String>,
    ) -> impl FusedFuture<Output = CancelExternalWfResult> {
        // Allocate a fresh sequence number for this cancel-external command.
        let seq = self
            .base_ctx
            .inner
            .seq_nums
            .borrow_mut()
            .next_cancel_external_wf_seq();
        let (cmd, unblocker) = WFCommandFut::new();
        self.base_ctx.send(
            CommandCreateRequest {
                cmd: WorkflowCommand {
                    variant: Some(
                        RequestCancelExternalWorkflowExecution {
                            seq,
                            workflow_execution: Some(NamespacedWorkflowExecution {
                                namespace: self.namespace.clone(),
                                workflow_id: self.workflow_id.clone(),
                                run_id: self.run_id.clone().unwrap_or_default(),
                            }),
                            reason: reason.unwrap_or_default(),
                        }
                        .into(),
                    ),
                    user_metadata: None,
                },
                unblocker,
            }
            .into(),
        );
        cmd
    }
}
/// Future for signalling an external workflow. Unlike `SignalChildFut`, a
/// serialization failure is surfaced as an `Err(Failure)` result rather
/// than a typed error.
enum SignalExternalFut<F> {
    /// Signal request in flight.
    Running(F),
    /// Input failed to serialize; turned into an `Err(Failure)` on first poll.
    SerializationError(Option<PayloadConversionError>),
    /// Already yielded `Ready`; polling again panics.
    Done,
}
impl<F: Unpin> Unpin for SignalExternalFut<F> {}
impl<F> Future for SignalExternalFut<F>
where
    F: Future<Output = SignalExternalWfResult> + Unpin,
{
    type Output = SignalExternalWfResult;
    /// Resolves to the signal result, or to a synthetic `Failure` when the
    /// signal input could not be serialized in the first place. Always moves
    /// to `Done` after yielding.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let output = match this {
            SignalExternalFut::Running(inner) => match Pin::new(inner).poll(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(result) => result,
            },
            SignalExternalFut::SerializationError(e) => {
                let err = e.take().expect("polled after completion");
                Err(Failure {
                    message: format!("Failed to serialize signal input: {err}"),
                    ..Default::default()
                })
            }
            SignalExternalFut::Done => panic!("polled after completion"),
        };
        *this = SignalExternalFut::Done;
        Poll::Ready(output)
    }
}
impl<F> FusedFuture for SignalExternalFut<F>
where
    F: Future<Output = SignalExternalWfResult> + Unpin,
{
    /// Done once the result (or synthetic failure) has been yielded.
    fn is_terminated(&self) -> bool {
        match self {
            SignalExternalFut::Done => true,
            SignalExternalFut::Running(_) | SignalExternalFut::SerializationError(_) => false,
        }
    }
}
impl<F> CancellableFuture<SignalExternalWfResult> for SignalExternalFut<F>
where
    F: CancellableFuture<SignalExternalWfResult> + Unpin,
{
    /// Cancels the in-flight signal; no-op for serialization errors or after
    /// completion.
    fn cancel(&self) {
        match self {
            SignalExternalFut::Running(inner) => inner.cancel(),
            SignalExternalFut::SerializationError(_) | SignalExternalFut::Done => {}
        }
    }
}
/// Handle to a started Nexus operation.
#[derive(derive_more::Debug)]
#[debug("StartedNexusOperation{{ operation_token: {operation_token:?} }}")]
pub struct StartedNexusOperation {
    /// Token identifying the operation, when the server provided one.
    pub operation_token: Option<String>,
    pub(crate) unblock_dat: NexusUnblockData,
}
/// Shared state used to await or cancel a started Nexus operation.
pub(crate) struct NexusUnblockData {
    // `Shared` so the result can be awaited more than once through `&self`.
    result_future: Shared<WFCommandFut<NexusOperationResult, ()>>,
    // Sequence number of the schedule command; used to build the cancel id.
    schedule_seq: u32,
    base_ctx: BaseWorkflowContext,
}
impl StartedNexusOperation {
    /// Awaits the operation's raw result. May be called more than once — the
    /// underlying future is shared.
    pub async fn result(&self) -> NexusOperationResult {
        self.unblock_dat.result_future.clone().await
    }
    /// Requests cancellation of the operation, identified by its schedule
    /// command's sequence number.
    pub fn cancel(&self) {
        self.unblock_dat
            .base_ctx
            .cancel(CancellableID::NexusOp(self.unblock_dat.schedule_seq));
    }
}
// Unit tests exercising continue-as-new command construction on the
// workflow context.
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use temporalio_common::{
        data_converters::{TemporalDeserializable, TemporalSerializable},
        protos::{
            coresdk::{AsJsonPayloadExt, common::VersioningIntent},
            temporal::api::common::v1::{Payload, RetryPolicy},
        },
    };
    use temporalio_macros::{workflow, workflow_methods};
    // Minimal workflow definition; its run method is never polled in these
    // tests, which only inspect the termination returned by continue_as_new.
    #[workflow]
    #[derive(Default)]
    struct TestWorkflow;
    #[workflow_methods]
    impl TestWorkflow {
        #[run]
        async fn run(_ctx: &mut WorkflowContext<Self>, _input: u8) -> crate::WorkflowResult<()> {
            unreachable!("test workflow run should not be polled")
        }
    }
    /// Builds a `WorkflowContext` for `TestWorkflow` over a fresh base
    /// context using the default data converter. The command receiver is
    /// dropped — these tests read the returned termination directly.
    fn test_context() -> WorkflowContext<TestWorkflow> {
        let init = InitializeWorkflow {
            workflow_type: TestWorkflow.name().to_string(),
            ..Default::default()
        };
        let (_, cancelled_rx) = watch::channel(None);
        let (base, _cmd_rx) = BaseWorkflowContext::new(
            "default".to_string(),
            "orig-task-queue".to_string(),
            "run-id".to_string(),
            init,
            cancelled_rx,
            DataConverter::default(),
        );
        WorkflowContext::from_base(base, Rc::new(RefCell::new(TestWorkflow)))
    }
    // With default options the command should carry only the serialized
    // input, the current workflow type, and an unspecified versioning intent.
    #[test]
    fn workflow_context_continue_as_new_serializes_input_and_defaults() {
        let ctx = test_context();
        let termination = ctx
            .continue_as_new(&7, ContinueAsNewOptions::default())
            .expect_err("continue_as_new should terminate the workflow");
        assert!(
            matches!(termination, WorkflowTermination::ContinueAsNew(_)),
            "expected continue-as-new termination, got {termination:?}"
        );
        let WorkflowTermination::ContinueAsNew(cmd) = termination else {
            unreachable!()
        };
        assert_eq!(
            *cmd,
            temporalio_common::protos::coresdk::workflow_commands::ContinueAsNewWorkflowExecution {
                workflow_type: TestWorkflow.name().to_string(),
                arguments: vec![7u8.as_json_payload().unwrap()],
                versioning_intent: VersioningIntent::Unspecified as i32,
                ..Default::default()
            }
        );
    }
    // Every option provided should be mapped onto the corresponding field of
    // the continue-as-new proto command.
    #[test]
    fn sync_workflow_context_continue_as_new_applies_options() {
        let ctx = test_context();
        let sync = ctx.sync_context();
        let mut memo = HashMap::new();
        memo.insert(
            "memo-key".to_string(),
            Payload::from(b"memo-value".as_slice()),
        );
        let mut headers = HashMap::new();
        headers.insert(
            "header-key".to_string(),
            Payload::from(b"header-value".as_slice()),
        );
        let mut search_attributes = SearchAttributes::default();
        search_attributes.indexed_fields.insert(
            "CustomKeywordField".to_string(),
            Payload::from(b"value".as_slice()),
        );
        let termination = sync
            .continue_as_new(
                &11,
                ContinueAsNewOptions {
                    workflow_type: Some("next-workflow".to_string()),
                    task_queue: Some("next-task-queue".to_string()),
                    run_timeout: Some(Duration::from_secs(10)),
                    task_timeout: Some(Duration::from_secs(3)),
                    memo: Some(memo.clone()),
                    headers: Some(headers.clone()),
                    search_attributes: Some(search_attributes.clone()),
                    retry_policy: Some(RetryPolicy {
                        maximum_attempts: 5,
                        ..Default::default()
                    }),
                    versioning_intent: Some(VersioningIntent::Compatible),
                },
            )
            .expect_err("continue_as_new should terminate the workflow");
        assert!(
            matches!(termination, WorkflowTermination::ContinueAsNew(_)),
            "expected continue-as-new termination, got {termination:?}"
        );
        let WorkflowTermination::ContinueAsNew(cmd) = termination else {
            unreachable!()
        };
        assert_eq!(
            *cmd,
            temporalio_common::protos::coresdk::workflow_commands::ContinueAsNewWorkflowExecution {
                workflow_type: "next-workflow".to_string(),
                task_queue: "next-task-queue".to_string(),
                arguments: vec![11u8.as_json_payload().unwrap()],
                workflow_run_timeout: Some(Duration::from_secs(10).try_into().unwrap()),
                workflow_task_timeout: Some(Duration::from_secs(3).try_into().unwrap()),
                memo,
                headers,
                search_attributes: Some(search_attributes),
                retry_policy: Some(RetryPolicy {
                    maximum_attempts: 5,
                    ..Default::default()
                }),
                versioning_intent: VersioningIntent::Compatible as i32,
                ..Default::default()
            }
        );
    }
    // Input serialization failures must surface as a `Failed` termination
    // carrying the conversion error, not a continue-as-new command.
    #[test]
    fn continue_as_new_reports_serialization_errors() {
        // Input type whose serializer always fails, to drive the error path.
        #[derive(Debug)]
        struct FailingInput;
        impl TemporalSerializable for FailingInput {
            fn to_payload(
                &self,
                _ctx: &temporalio_common::data_converters::SerializationContext<'_>,
            ) -> Result<Payload, temporalio_common::data_converters::PayloadConversionError>
            {
                Err(
                    temporalio_common::data_converters::PayloadConversionError::EncodingError(
                        std::io::Error::other("serialization failure").into(),
                    ),
                )
            }
        }
        impl TemporalDeserializable for FailingInput {
            fn from_payload(
                _ctx: &temporalio_common::data_converters::SerializationContext<'_>,
                _payload: Payload,
            ) -> Result<Self, temporalio_common::data_converters::PayloadConversionError>
            {
                unreachable!("test input is only serialized")
            }
        }
        #[workflow]
        #[derive(Default)]
        struct FailingWorkflow;
        #[workflow_methods]
        impl FailingWorkflow {
            #[run]
            async fn run(
                _ctx: &mut WorkflowContext<Self>,
                _input: FailingInput,
            ) -> crate::WorkflowResult<()> {
                unreachable!("test workflow run should not be polled")
            }
        }
        let init = InitializeWorkflow {
            workflow_type: "failing-workflow".to_string(),
            ..Default::default()
        };
        let (_, cancelled_rx) = watch::channel(None);
        let (base, _cmd_rx) = BaseWorkflowContext::new(
            "default".to_string(),
            "orig-task-queue".to_string(),
            "run-id".to_string(),
            init,
            cancelled_rx,
            DataConverter::default(),
        );
        let ctx = WorkflowContext::from_base(base, Rc::new(RefCell::new(FailingWorkflow)));
        let err = ctx
            .continue_as_new(&FailingInput, ContinueAsNewOptions::default())
            .expect_err("serialization errors should be surfaced");
        let WorkflowTermination::Failed(err) = err else {
            panic!("expected failed termination, got {err:?}");
        };
        assert_eq!(err.to_string(), "Encoding error: serialization failure");
    }
}