use crate::{
NamespacedClient, WorkflowCancelOptions, WorkflowDescribeOptions, WorkflowExecuteUpdateOptions,
WorkflowFetchHistoryOptions, WorkflowGetResultOptions, WorkflowQueryOptions,
WorkflowSignalOptions, WorkflowStartUpdateOptions, WorkflowTerminateOptions,
WorkflowUpdateWaitStage,
errors::{
WorkflowGetResultError, WorkflowInteractionError, WorkflowQueryError, WorkflowUpdateError,
},
grpc::WorkflowService,
};
use std::{fmt::Debug, marker::PhantomData};
pub use temporalio_common::UntypedWorkflow;
use temporalio_common::{
HasWorkflowDefinition, QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
data_converters::{
DataConverter, GenericPayloadConverter, PayloadConversionError, PayloadConverter, RawValue,
SerializationContext, SerializationContextData,
},
payload_visitor::decode_payloads,
protos::{
coresdk::FromPayloadsExt,
proto_ts_to_system_time,
temporal::api::{
common::v1::{Payload, Payloads, WorkflowExecution as ProtoWorkflowExecution},
enums::v1::{HistoryEventFilterType, UpdateWorkflowExecutionLifecycleStage},
failure::v1::Failure,
history::{
self,
v1::{HistoryEvent, history_event::Attributes},
},
query::v1::WorkflowQuery,
sdk::v1::UserMetadata,
update::{self, v1::WaitPolicy},
workflow::v1 as workflow,
workflowservice::v1::{
DescribeWorkflowExecutionRequest, DescribeWorkflowExecutionResponse,
GetWorkflowExecutionHistoryRequest, PollWorkflowExecutionUpdateRequest,
QueryWorkflowRequest, RequestCancelWorkflowExecutionRequest,
SignalWorkflowExecutionRequest, TerminateWorkflowExecutionRequest,
UpdateWorkflowExecutionRequest,
},
},
},
};
use tonic::IntoRequest;
use uuid::Uuid;
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct DecodedUserMetadata {
summary: Option<String>,
details: Option<String>,
}
fn decode_user_metadata(
context: &SerializationContextData,
user_metadata: Option<UserMetadata>,
) -> Result<DecodedUserMetadata, PayloadConversionError> {
let payload_converter = PayloadConverter::default();
let context = SerializationContext {
data: context,
converter: &payload_converter,
};
let (summary, details) = user_metadata
.map(|metadata| (metadata.summary, metadata.details))
.unwrap_or_default();
Ok(DecodedUserMetadata {
summary: match summary {
Some(payload) => Some(payload_converter.from_payload(&context, payload)?),
None => None,
},
details: match details {
Some(payload) => Some(payload_converter.from_payload(&context, payload)?),
None => None,
},
})
}
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum WorkflowExecutionResult<T> {
Succeeded(T),
Failed(Failure),
Cancelled {
details: Vec<Payload>,
},
Terminated {
details: Vec<Payload>,
},
TimedOut,
ContinuedAsNew,
}
#[derive(Debug, Clone)]
pub struct WorkflowExecutionDescription {
pub raw_description: DescribeWorkflowExecutionResponse,
history_length: usize,
static_summary: Option<String>,
static_details: Option<String>,
}
impl WorkflowExecutionDescription {
async fn new(
mut raw_description: DescribeWorkflowExecutionResponse,
data_converter: &DataConverter,
) -> Result<Self, PayloadConversionError> {
let raw_user_metadata = raw_description
.execution_config
.as_ref()
.and_then(|cfg| cfg.user_metadata.clone());
decode_payloads(
&mut raw_description,
data_converter.codec(),
&SerializationContextData::Workflow,
)
.await;
let decoded_metadata =
decode_user_metadata(&SerializationContextData::Workflow, raw_user_metadata)?;
let history_length_raw = raw_description
.workflow_execution_info
.as_ref()
.map(|info| info.history_length)
.unwrap_or(0);
let history_length = history_length_raw.try_into().map_err(|_| {
PayloadConversionError::EncodingError(
format!("workflow history_length must be non-negative, got {history_length_raw}")
.into(),
)
})?;
Ok(Self {
raw_description,
history_length,
static_summary: decoded_metadata.summary,
static_details: decoded_metadata.details,
})
}
pub fn id(&self) -> &str {
self.execution().workflow_id.as_str()
}
pub fn run_id(&self) -> &str {
self.execution().run_id.as_str()
}
pub fn workflow_type(&self) -> &str {
self.workflow_type_info().name.as_str()
}
pub fn status(
&self,
) -> temporalio_common::protos::temporal::api::enums::v1::WorkflowExecutionStatus {
self.workflow_info().status()
}
pub fn start_time(&self) -> Option<std::time::SystemTime> {
self.workflow_info()
.start_time
.as_ref()
.and_then(proto_ts_to_system_time)
}
pub fn execution_time(&self) -> Option<std::time::SystemTime> {
self.workflow_info()
.execution_time
.as_ref()
.and_then(proto_ts_to_system_time)
}
pub fn close_time(&self) -> Option<std::time::SystemTime> {
self.workflow_info()
.close_time
.as_ref()
.and_then(proto_ts_to_system_time)
}
pub fn task_queue(&self) -> &str {
self.workflow_info().task_queue.as_str()
}
pub fn history_length(&self) -> usize {
self.history_length
}
pub fn memo(&self) -> Option<&temporalio_common::protos::temporal::api::common::v1::Memo> {
self.workflow_info().memo.as_ref()
}
pub fn parent_id(&self) -> Option<&str> {
self.workflow_info()
.parent_execution
.as_ref()
.map(|e| e.workflow_id.as_str())
}
pub fn parent_run_id(&self) -> Option<&str> {
self.workflow_info()
.parent_execution
.as_ref()
.map(|e| e.run_id.as_str())
}
pub fn search_attributes(
&self,
) -> Option<&temporalio_common::protos::temporal::api::common::v1::SearchAttributes> {
self.workflow_info().search_attributes.as_ref()
}
pub fn static_summary(&self) -> Option<&str> {
self.static_summary.as_deref()
}
pub fn static_details(&self) -> Option<&str> {
self.static_details.as_deref()
}
pub fn raw(&self) -> &DescribeWorkflowExecutionResponse {
&self.raw_description
}
pub fn into_raw(self) -> DescribeWorkflowExecutionResponse {
self.raw_description
}
fn workflow_info(&self) -> &workflow::WorkflowExecutionInfo {
self.raw_description
.workflow_execution_info
.as_ref()
.expect("describe response missing workflow_execution_info")
}
fn execution(&self) -> &ProtoWorkflowExecution {
self.workflow_info()
.execution
.as_ref()
.expect("describe response missing workflow_execution_info.execution")
}
fn workflow_type_info(
&self,
) -> &temporalio_common::protos::temporal::api::common::v1::WorkflowType {
self.workflow_info()
.r#type
.as_ref()
.expect("describe response missing workflow_execution_info.type")
}
}
#[derive(Debug, Clone)]
pub struct WorkflowHistory {
events: Vec<HistoryEvent>,
}
impl From<WorkflowHistory> for history::v1::History {
fn from(h: WorkflowHistory) -> Self {
Self { events: h.events }
}
}
impl WorkflowHistory {
fn new(events: Vec<HistoryEvent>) -> Self {
Self { events }
}
pub fn events(&self) -> &[HistoryEvent] {
&self.events
}
pub fn into_events(self) -> Vec<HistoryEvent> {
self.events
}
}
#[derive(Clone)]
pub struct WorkflowHandle<ClientT, W> {
client: ClientT,
info: WorkflowExecutionInfo,
_wf_type: PhantomData<W>,
}
impl<CT, W> WorkflowHandle<CT, W> {
pub fn run_id(&self) -> Option<&str> {
self.info.run_id.as_deref()
}
}
#[derive(Debug, Clone)]
pub struct WorkflowExecutionInfo {
pub namespace: String,
pub workflow_id: String,
pub run_id: Option<String>,
pub first_execution_run_id: Option<String>,
}
impl WorkflowExecutionInfo {
pub fn bind_untyped<CT>(self, client: CT) -> UntypedWorkflowHandle<CT>
where
CT: WorkflowService + Clone,
{
UntypedWorkflowHandle::new(client, self)
}
}
pub type UntypedWorkflowHandle<CT> = WorkflowHandle<CT, UntypedWorkflow>;
pub struct UntypedSignal<W> {
name: String,
_wf: PhantomData<W>,
}
impl<W> UntypedSignal<W> {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
_wf: PhantomData,
}
}
}
impl<W: WorkflowDefinition> SignalDefinition for UntypedSignal<W> {
type Workflow = W;
type Input = RawValue;
fn name(&self) -> &str {
&self.name
}
}
pub struct UntypedQuery<W> {
name: String,
_wf: PhantomData<W>,
}
impl<W> UntypedQuery<W> {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
_wf: PhantomData,
}
}
}
impl<W: WorkflowDefinition> QueryDefinition for UntypedQuery<W> {
type Workflow = W;
type Input = RawValue;
type Output = RawValue;
fn name(&self) -> &str {
&self.name
}
}
pub struct UntypedUpdate<W> {
name: String,
_wf: PhantomData<W>,
}
impl<W> UntypedUpdate<W> {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
_wf: PhantomData,
}
}
}
impl<W: WorkflowDefinition> UpdateDefinition for UntypedUpdate<W> {
type Workflow = W;
type Input = RawValue;
type Output = RawValue;
fn name(&self) -> &str {
&self.name
}
}
impl<CT, W> WorkflowHandle<CT, W>
where
CT: WorkflowService + Clone,
W: HasWorkflowDefinition,
{
pub fn new(client: CT, info: WorkflowExecutionInfo) -> Self {
Self {
client,
info,
_wf_type: PhantomData::<W>,
}
}
pub fn info(&self) -> &WorkflowExecutionInfo {
&self.info
}
pub fn client(&self) -> &CT {
&self.client
}
pub async fn get_result(
&self,
opts: WorkflowGetResultOptions,
) -> Result<W::Output, WorkflowGetResultError>
where
CT: WorkflowService + NamespacedClient + Clone,
{
let raw = self.get_result_raw(opts).await?;
match raw {
WorkflowExecutionResult::Succeeded(v) => Ok(v),
WorkflowExecutionResult::Failed(f) => Err(WorkflowGetResultError::Failed(Box::new(f))),
WorkflowExecutionResult::Cancelled { details } => {
Err(WorkflowGetResultError::Cancelled { details })
}
WorkflowExecutionResult::Terminated { details } => {
Err(WorkflowGetResultError::Terminated { details })
}
WorkflowExecutionResult::TimedOut => Err(WorkflowGetResultError::TimedOut),
WorkflowExecutionResult::ContinuedAsNew => Err(WorkflowGetResultError::ContinuedAsNew),
}
}
async fn get_result_raw(
&self,
opts: WorkflowGetResultOptions,
) -> Result<WorkflowExecutionResult<W::Output>, WorkflowInteractionError>
where
CT: WorkflowService + NamespacedClient + Clone,
{
let mut run_id = self.info.run_id.clone().unwrap_or_default();
let fetch_opts = WorkflowFetchHistoryOptions::builder()
.skip_archival(true)
.wait_new_event(true)
.event_filter_type(HistoryEventFilterType::CloseEvent)
.build();
loop {
let history = self.fetch_history_for_run(&run_id, &fetch_opts).await?;
let mut events = history.into_events();
if events.is_empty() {
continue;
}
let event_attrs = events.pop().and_then(|ev| ev.attributes);
macro_rules! follow {
($attrs:ident) => {
if opts.follow_runs && $attrs.new_execution_run_id != "" {
run_id = $attrs.new_execution_run_id;
continue;
}
};
}
let dc = self.client.data_converter();
break match event_attrs {
Some(Attributes::WorkflowExecutionCompletedEventAttributes(attrs)) => {
follow!(attrs);
let payload = attrs
.result
.and_then(|p| p.payloads.into_iter().next())
.unwrap_or_default();
let result: W::Output = dc
.from_payload(&SerializationContextData::Workflow, payload)
.await?;
Ok(WorkflowExecutionResult::Succeeded(result))
}
Some(Attributes::WorkflowExecutionFailedEventAttributes(attrs)) => {
follow!(attrs);
Ok(WorkflowExecutionResult::Failed(
attrs.failure.unwrap_or_default(),
))
}
Some(Attributes::WorkflowExecutionCanceledEventAttributes(attrs)) => {
Ok(WorkflowExecutionResult::Cancelled {
details: Vec::from_payloads(attrs.details),
})
}
Some(Attributes::WorkflowExecutionTimedOutEventAttributes(attrs)) => {
follow!(attrs);
Ok(WorkflowExecutionResult::TimedOut)
}
Some(Attributes::WorkflowExecutionTerminatedEventAttributes(attrs)) => {
Ok(WorkflowExecutionResult::Terminated {
details: Vec::from_payloads(attrs.details),
})
}
Some(Attributes::WorkflowExecutionContinuedAsNewEventAttributes(attrs)) => {
if opts.follow_runs {
if !attrs.new_execution_run_id.is_empty() {
run_id = attrs.new_execution_run_id;
continue;
} else {
return Err(WorkflowInteractionError::Other(
"New execution run id was empty in continue as new event!".into(),
));
}
} else {
Ok(WorkflowExecutionResult::ContinuedAsNew)
}
}
o => Err(WorkflowInteractionError::Other(
format!(
"Server returned an event that didn't match the CloseEvent filter. \
This is either a server bug or a new event the SDK does not understand. \
Event details: {o:?}"
)
.into(),
)),
};
}
}
pub async fn signal<S>(
&self,
signal: S,
input: S::Input,
opts: WorkflowSignalOptions,
) -> Result<(), WorkflowInteractionError>
where
CT: WorkflowService + NamespacedClient + Clone,
S: SignalDefinition<Workflow = W::Run>,
S::Input: Send,
{
let payloads = self
.client
.data_converter()
.to_payloads(&SerializationContextData::Workflow, &input)
.await?;
WorkflowService::signal_workflow_execution(
&mut self.client.clone(),
SignalWorkflowExecutionRequest {
namespace: self.client.namespace(),
workflow_execution: Some(ProtoWorkflowExecution {
workflow_id: self.info.workflow_id.clone(),
run_id: self.info.run_id.clone().unwrap_or_default(),
}),
signal_name: signal.name().to_string(),
input: Some(Payloads { payloads }),
identity: self.client.identity(),
request_id: opts
.request_id
.unwrap_or_else(|| Uuid::new_v4().to_string()),
header: opts.header,
..Default::default()
}
.into_request(),
)
.await
.map_err(WorkflowInteractionError::from_status)?;
Ok(())
}
pub async fn query<Q>(
&self,
query: Q,
input: Q::Input,
opts: WorkflowQueryOptions,
) -> Result<Q::Output, WorkflowQueryError>
where
CT: WorkflowService + NamespacedClient + Clone,
Q: QueryDefinition<Workflow = W::Run>,
Q::Input: Send,
{
let dc = self.client.data_converter();
let payloads = dc
.to_payloads(&SerializationContextData::Workflow, &input)
.await?;
let response = self
.client
.clone()
.query_workflow(
QueryWorkflowRequest {
namespace: self.client.namespace(),
execution: Some(ProtoWorkflowExecution {
workflow_id: self.info.workflow_id.clone(),
run_id: self.info.run_id.clone().unwrap_or_default(),
}),
query: Some(WorkflowQuery {
query_type: query.name().to_string(),
query_args: Some(Payloads { payloads }),
header: opts.header,
}),
query_reject_condition: opts.reject_condition.map(|c| c as i32).unwrap_or(1),
}
.into_request(),
)
.await
.map_err(WorkflowQueryError::from_status)?
.into_inner();
if let Some(rejected) = response.query_rejected {
return Err(WorkflowQueryError::Rejected(rejected));
}
let result_payloads = response
.query_result
.map(|p| p.payloads)
.unwrap_or_default();
dc.from_payloads(&SerializationContextData::Workflow, result_payloads)
.await
.map_err(WorkflowQueryError::from)
}
pub async fn execute_update<U>(
&self,
update: U,
input: U::Input,
options: WorkflowExecuteUpdateOptions,
) -> Result<U::Output, WorkflowUpdateError>
where
CT: WorkflowService + NamespacedClient + Clone,
U: UpdateDefinition<Workflow = W::Run>,
U::Input: Send,
U::Output: 'static,
{
let handle = self
.start_update(
update,
input,
WorkflowStartUpdateOptions::builder()
.maybe_update_id(options.update_id)
.maybe_header(options.header)
.wait_for_stage(WorkflowUpdateWaitStage::Completed)
.build(),
)
.await?;
handle.get_result().await
}
pub async fn start_update<U>(
&self,
update: U,
input: U::Input,
options: WorkflowStartUpdateOptions,
) -> Result<WorkflowUpdateHandle<CT, U::Output>, WorkflowUpdateError>
where
CT: WorkflowService + NamespacedClient + Clone,
U: UpdateDefinition<Workflow = W::Run>,
U::Input: Send,
{
let dc = self.client.data_converter();
let payloads = dc
.to_payloads(&SerializationContextData::Workflow, &input)
.await?;
let lifecycle_stage = match options.wait_for_stage {
WorkflowUpdateWaitStage::Admitted => UpdateWorkflowExecutionLifecycleStage::Admitted,
WorkflowUpdateWaitStage::Accepted => UpdateWorkflowExecutionLifecycleStage::Accepted,
WorkflowUpdateWaitStage::Completed => UpdateWorkflowExecutionLifecycleStage::Completed,
};
let update_id = options
.update_id
.unwrap_or_else(|| Uuid::new_v4().to_string());
let response = WorkflowService::update_workflow_execution(
&mut self.client.clone(),
UpdateWorkflowExecutionRequest {
namespace: self.client.namespace(),
workflow_execution: Some(ProtoWorkflowExecution {
workflow_id: self.info().workflow_id.clone(),
run_id: self.info().run_id.clone().unwrap_or_default(),
}),
wait_policy: Some(WaitPolicy {
lifecycle_stage: lifecycle_stage.into(),
}),
request: Some(update::v1::Request {
meta: Some(update::v1::Meta {
update_id: update_id.clone(),
identity: self.client.identity(),
}),
input: Some(update::v1::Input {
header: options.header,
name: update.name().to_string(),
args: Some(Payloads { payloads }),
}),
}),
..Default::default()
}
.into_request(),
)
.await
.map_err(WorkflowUpdateError::from_status)?
.into_inner();
let run_id = response
.update_ref
.as_ref()
.and_then(|r| r.workflow_execution.as_ref())
.map(|e| e.run_id.clone())
.filter(|s| !s.is_empty())
.or_else(|| self.info().run_id.clone());
Ok(WorkflowUpdateHandle {
client: self.client.clone(),
update_id,
workflow_id: self.info().workflow_id.clone(),
run_id,
known_outcome: response.outcome,
_output: PhantomData,
})
}
pub async fn cancel(&self, opts: WorkflowCancelOptions) -> Result<(), WorkflowInteractionError>
where
CT: NamespacedClient,
{
WorkflowService::request_cancel_workflow_execution(
&mut self.client.clone(),
RequestCancelWorkflowExecutionRequest {
namespace: self.client.namespace(),
workflow_execution: Some(ProtoWorkflowExecution {
workflow_id: self.info.workflow_id.clone(),
run_id: self.info.run_id.clone().unwrap_or_default(),
}),
identity: self.client.identity(),
request_id: opts
.request_id
.unwrap_or_else(|| Uuid::new_v4().to_string()),
first_execution_run_id: self
.info
.first_execution_run_id
.clone()
.unwrap_or_default(),
reason: opts.reason,
links: vec![],
}
.into_request(),
)
.await
.map_err(WorkflowInteractionError::from_status)?;
Ok(())
}
pub async fn terminate(
&self,
opts: WorkflowTerminateOptions,
) -> Result<(), WorkflowInteractionError>
where
CT: NamespacedClient,
{
WorkflowService::terminate_workflow_execution(
&mut self.client.clone(),
TerminateWorkflowExecutionRequest {
namespace: self.client.namespace(),
workflow_execution: Some(ProtoWorkflowExecution {
workflow_id: self.info.workflow_id.clone(),
run_id: self.info.run_id.clone().unwrap_or_default(),
}),
reason: opts.reason,
details: opts.details,
identity: self.client.identity(),
first_execution_run_id: self
.info
.first_execution_run_id
.clone()
.unwrap_or_default(),
links: vec![],
}
.into_request(),
)
.await
.map_err(WorkflowInteractionError::from_status)?;
Ok(())
}
pub async fn describe(
&self,
_opts: WorkflowDescribeOptions,
) -> Result<WorkflowExecutionDescription, WorkflowInteractionError>
where
CT: NamespacedClient,
{
let response = WorkflowService::describe_workflow_execution(
&mut self.client.clone(),
DescribeWorkflowExecutionRequest {
namespace: self.client.namespace(),
execution: Some(ProtoWorkflowExecution {
workflow_id: self.info.workflow_id.clone(),
run_id: self.info.run_id.clone().unwrap_or_default(),
}),
}
.into_request(),
)
.await
.map_err(WorkflowInteractionError::from_status)?
.into_inner();
WorkflowExecutionDescription::new(response, self.client.data_converter())
.await
.map_err(WorkflowInteractionError::from)
}
pub async fn fetch_history(
&self,
opts: WorkflowFetchHistoryOptions,
) -> Result<WorkflowHistory, WorkflowInteractionError>
where
CT: NamespacedClient,
{
let run_id = self.info.run_id.clone().unwrap_or_default();
self.fetch_history_for_run(&run_id, &opts).await
}
async fn fetch_history_for_run(
&self,
run_id: &str,
opts: &WorkflowFetchHistoryOptions,
) -> Result<WorkflowHistory, WorkflowInteractionError>
where
CT: NamespacedClient,
{
let mut all_events = Vec::new();
let mut next_page_token = vec![];
loop {
let response = WorkflowService::get_workflow_execution_history(
&mut self.client.clone(),
GetWorkflowExecutionHistoryRequest {
namespace: self.client.namespace(),
execution: Some(ProtoWorkflowExecution {
workflow_id: self.info.workflow_id.clone(),
run_id: run_id.to_string(),
}),
next_page_token: next_page_token.clone(),
skip_archival: opts.skip_archival,
wait_new_event: opts.wait_new_event,
history_event_filter_type: opts.event_filter_type as i32,
..Default::default()
}
.into_request(),
)
.await
.map_err(WorkflowInteractionError::from_status)?
.into_inner();
if let Some(history) = response.history {
all_events.extend(history.events);
}
if response.next_page_token.is_empty() {
break;
}
next_page_token = response.next_page_token;
}
Ok(WorkflowHistory::new(all_events))
}
}
pub struct WorkflowUpdateHandle<CT, T> {
client: CT,
update_id: String,
workflow_id: String,
run_id: Option<String>,
known_outcome: Option<update::v1::Outcome>,
_output: PhantomData<T>,
}
impl<CT, T> WorkflowUpdateHandle<CT, T> {
pub fn id(&self) -> &str {
&self.update_id
}
pub fn workflow_id(&self) -> &str {
&self.workflow_id
}
pub fn workflow_run_id(&self) -> Option<&str> {
self.run_id.as_deref()
}
}
impl<CT, T: 'static> WorkflowUpdateHandle<CT, T>
where
CT: WorkflowService + NamespacedClient + Clone,
{
pub async fn get_result(&self) -> Result<T, WorkflowUpdateError>
where
T: temporalio_common::data_converters::TemporalDeserializable,
{
let outcome = if let Some(known) = &self.known_outcome {
known.clone()
} else {
loop {
let response = WorkflowService::poll_workflow_execution_update(
&mut self.client.clone(),
PollWorkflowExecutionUpdateRequest {
namespace: self.client.namespace(),
update_ref: Some(update::v1::UpdateRef {
workflow_execution: Some(ProtoWorkflowExecution {
workflow_id: self.workflow_id.clone(),
run_id: self.run_id.clone().unwrap_or_default(),
}),
update_id: self.update_id.clone(),
}),
identity: self.client.identity(),
wait_policy: Some(WaitPolicy {
lifecycle_stage: UpdateWorkflowExecutionLifecycleStage::Completed
.into(),
}),
}
.into_request(),
)
.await
.map_err(WorkflowUpdateError::from_status)?
.into_inner();
if let Some(outcome) = response.outcome {
break outcome;
}
}
};
match outcome.value {
Some(update::v1::outcome::Value::Success(success)) => self
.client
.data_converter()
.from_payloads(&SerializationContextData::Workflow, success.payloads)
.await
.map_err(WorkflowUpdateError::from),
Some(update::v1::outcome::Value::Failure(failure)) => {
Err(WorkflowUpdateError::Failed(Box::new(failure)))
}
None => Err(WorkflowUpdateError::Other(
"Update returned no outcome value".into(),
)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use temporalio_common::protos::temporal::api::{
common::v1::{Memo, SearchAttributes},
enums::v1::WorkflowExecutionStatus,
sdk::v1::UserMetadata,
workflow::v1::WorkflowExecutionConfig,
};
#[tokio::test]
async fn workflow_description_accessors_expose_decoded_fields() {
let converter = DataConverter::default();
let memo_payload = converter
.to_payload(&SerializationContextData::Workflow, &"memo-value")
.await
.unwrap();
let search_attr_payload = converter
.to_payload(&SerializationContextData::Workflow, &"search-value")
.await
.unwrap();
let summary_payload = converter
.to_payload(&SerializationContextData::Workflow, &"workflow summary")
.await
.unwrap();
let details_payload = converter
.to_payload(&SerializationContextData::Workflow, &"workflow details")
.await
.unwrap();
let description = WorkflowExecutionDescription::new(
DescribeWorkflowExecutionResponse {
workflow_execution_info: Some(workflow::WorkflowExecutionInfo {
execution: Some(ProtoWorkflowExecution {
workflow_id: "wf-id".to_string(),
run_id: "run-id".to_string(),
}),
r#type: Some(
temporalio_common::protos::temporal::api::common::v1::WorkflowType {
name: "wf-type".to_string(),
},
),
status: WorkflowExecutionStatus::Completed as i32,
task_queue: "task-queue".to_string(),
history_length: 42,
memo: Some(Memo {
fields: HashMap::from([("memo-key".to_string(), memo_payload.clone())]),
}),
parent_execution: Some(ProtoWorkflowExecution {
workflow_id: "parent-id".to_string(),
run_id: "parent-run-id".to_string(),
}),
search_attributes: Some(SearchAttributes {
indexed_fields: HashMap::from([(
"CustomKeywordField".to_string(),
search_attr_payload.clone(),
)]),
}),
..Default::default()
}),
execution_config: Some(WorkflowExecutionConfig {
user_metadata: Some(UserMetadata {
summary: Some(summary_payload),
details: Some(details_payload),
}),
..Default::default()
}),
..Default::default()
},
&converter,
)
.await
.unwrap();
assert_eq!(description.id(), "wf-id");
assert_eq!(description.run_id(), "run-id");
assert_eq!(description.workflow_type(), "wf-type");
assert_eq!(description.status(), WorkflowExecutionStatus::Completed);
assert_eq!(description.task_queue(), "task-queue");
assert_eq!(description.history_length(), 42);
assert_eq!(description.parent_id(), Some("parent-id"));
assert_eq!(description.parent_run_id(), Some("parent-run-id"));
assert_eq!(description.memo().unwrap().fields["memo-key"], memo_payload);
assert_eq!(
description.search_attributes().unwrap().indexed_fields["CustomKeywordField"],
search_attr_payload
);
assert_eq!(description.static_summary(), Some("workflow summary"));
assert_eq!(description.static_details(), Some("workflow details"));
}
#[tokio::test]
async fn workflow_description_rejects_negative_history_length() {
let err = WorkflowExecutionDescription::new(
DescribeWorkflowExecutionResponse {
workflow_execution_info: Some(workflow::WorkflowExecutionInfo {
history_length: -1,
..Default::default()
}),
..Default::default()
},
&DataConverter::default(),
)
.await
.unwrap_err();
assert_eq!(
err.to_string(),
"Encoding error: workflow history_length must be non-negative, got -1"
);
}
}