#[doc(inline)]
pub use temporalio_macros::activities;
use futures_util::{FutureExt, future::BoxFuture};
use prost_types::{Duration, Timestamp};
use std::{
collections::HashMap,
fmt::Debug,
sync::Arc,
time::{Duration as StdDuration, SystemTime},
};
use temporalio_client::Priority;
use temporalio_common::{
ActivityDefinition,
data_converters::{
DataConverter, GenericPayloadConverter, SerializationContext, SerializationContextData,
},
error::{ApplicationFailure, FailurePayloads},
protos::{
coresdk::{ActivityHeartbeat, activity_task},
temporal::api::common::v1::{Payload, RetryPolicy, WorkflowExecution},
utilities::TryIntoOrNone,
},
};
use temporalio_sdk_core::Worker as CoreWorker;
use tokio_util::sync::CancellationToken;
/// Context handed to an activity invocation.
///
/// Bundles the worker handle, cancellation token, heartbeat details, headers, and
/// [`ActivityInfo`] belonging to one activity task.
#[derive(Clone)]
pub struct ActivityContext {
/// Core worker that heartbeats are recorded through.
worker: Arc<CoreWorker>,
/// Token observed by `cancelled` and `is_cancelled`.
cancellation_token: CancellationToken,
/// Heartbeat details taken from the start task (presumably those recorded by an earlier
/// attempt; confirm against core's behavior).
heartbeat_details: Vec<Payload>,
/// Header fields taken from the start task, keyed by header name.
header_fields: HashMap<String, Payload>,
/// Descriptive information about this activity execution.
info: ActivityInfo,
}
impl ActivityContext {
    /// Builds the context for one activity execution from an `activity_task::Start`.
    ///
    /// Returns the context together with the task's raw input payloads, which are split off
    /// so the caller can deserialize them separately.
    ///
    /// In the resulting [`ActivityInfo`], `deadline` is derived from the task's
    /// scheduled/started times and its two close timeouts, an empty `run_id` becomes `None`,
    /// and a missing priority falls back to `Priority::default()`.
    pub fn new(
        worker: Arc<CoreWorker>,
        cancellation_token: CancellationToken,
        task_queue: String,
        task_token: Vec<u8>,
        task: activity_task::Start,
    ) -> (Self, Vec<Payload>) {
        // Destructure without `..` so a newly added task field cannot be silently ignored.
        let activity_task::Start {
            workflow_namespace,
            workflow_type,
            workflow_execution,
            activity_id,
            activity_type,
            header_fields,
            input,
            heartbeat_details,
            scheduled_time,
            current_attempt_scheduled_time,
            started_time,
            attempt,
            schedule_to_close_timeout,
            start_to_close_timeout,
            heartbeat_timeout,
            retry_policy,
            is_local,
            priority,
            run_id,
        } = task;

        // The deadline needs the raw protobuf values, so compute it before they are consumed
        // by the conversions below.
        let deadline = calculate_deadline(
            scheduled_time.as_ref(),
            started_time.as_ref(),
            start_to_close_timeout.as_ref(),
            schedule_to_close_timeout.as_ref(),
        );

        let info = ActivityInfo {
            task_token,
            task_queue,
            workflow_type,
            workflow_namespace,
            workflow_execution,
            activity_id,
            activity_type,
            heartbeat_timeout: heartbeat_timeout.try_into_or_none(),
            scheduled_time: scheduled_time.try_into_or_none(),
            started_time: started_time.try_into_or_none(),
            deadline,
            attempt,
            current_attempt_scheduled_time: current_attempt_scheduled_time.try_into_or_none(),
            retry_policy,
            is_local,
            priority: priority.map(Into::into).unwrap_or_default(),
            // An empty string means "no run id"; expose that as `None`.
            run_id: (!run_id.is_empty()).then_some(run_id),
        };

        let context = ActivityContext {
            worker,
            cancellation_token,
            heartbeat_details,
            header_fields,
            info,
        };
        (context, input)
    }

    /// Resolves once the activity's cancellation token has been cancelled.
    pub async fn cancelled(&self) {
        // `CancellationToken::cancelled` only borrows the token, so no clone is required.
        self.cancellation_token.cancelled().await
    }

    /// Returns `true` if the activity's cancellation token has already been cancelled.
    pub fn is_cancelled(&self) -> bool {
        self.cancellation_token.is_cancelled()
    }

    /// Returns the heartbeat details that came with the start task.
    pub fn heartbeat_details(&self) -> &[Payload] {
        &self.heartbeat_details
    }

    /// Records a heartbeat carrying `details` through the core worker.
    ///
    /// Does nothing for local activities (`info().is_local`); `details` is dropped in that case.
    pub fn record_heartbeat(&self, details: Vec<Payload>) {
        if self.info.is_local {
            return;
        }
        self.worker.record_activity_heartbeat(ActivityHeartbeat {
            task_token: self.info.task_token.clone(),
            details,
        });
    }

    /// Returns descriptive information about this activity execution.
    pub fn info(&self) -> &ActivityInfo {
        &self.info
    }

    /// Returns the header fields that came with the start task.
    pub fn headers(&self) -> &HashMap<String, Payload> {
        &self.header_fields
    }
}
/// Descriptive information about one activity execution, assembled by [`ActivityContext::new`].
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ActivityInfo {
/// Task token of this activity task; sent along with every recorded heartbeat.
pub task_token: Vec<u8>,
/// Workflow type taken from the start task.
pub workflow_type: String,
/// Workflow namespace taken from the start task.
pub workflow_namespace: String,
/// Workflow execution taken from the start task, if it carried one.
pub workflow_execution: Option<WorkflowExecution>,
/// Activity id taken from the start task.
pub activity_id: String,
/// Activity type taken from the start task.
pub activity_type: String,
/// Task queue passed to [`ActivityContext::new`].
pub task_queue: String,
/// Heartbeat timeout from the start task, converted with `try_into_or_none`
/// (presumably `None` when absent or not representable; confirm against that helper).
pub heartbeat_timeout: Option<StdDuration>,
/// Scheduled time from the start task, converted with `try_into_or_none`.
pub scheduled_time: Option<SystemTime>,
/// Started time from the start task, converted with `try_into_or_none`.
pub started_time: Option<SystemTime>,
/// Started time plus the start-to-close timeout, or scheduled time plus the
/// schedule-to-close timeout when that timeout is non-zero and yields an earlier instant.
/// `None` unless all four of those task values are present and convertible.
pub deadline: Option<SystemTime>,
/// Attempt number taken from the start task.
pub attempt: u32,
/// Scheduled time of the current attempt, converted with `try_into_or_none`.
pub current_attempt_scheduled_time: Option<SystemTime>,
/// Retry policy taken from the start task, if it carried one.
pub retry_policy: Option<RetryPolicy>,
/// Whether the start task marked this as a local activity; heartbeats are not recorded
/// when this is `true`.
pub is_local: bool,
/// Priority converted from the start task, or the default priority when it carried none.
pub priority: Priority,
/// Run id taken from the start task; `None` when the task's value was empty.
pub run_id: Option<String>,
}
/// Error outcome of an activity invocation.
///
/// Any error convertible into `anyhow::Error` converts into the `Application` variant
/// through this type's `From` implementation.
#[derive(Debug)]
pub enum ActivityError {
/// The activity failed with an application failure (boxed, presumably to keep the enum
/// small).
Application(Box<ApplicationFailure>),
/// The activity was cancelled.
Cancelled {
/// Optional payloads attached to the cancellation.
details: Option<FailurePayloads>,
},
/// NOTE(review): presumably signals that the activity will be completed asynchronously by
/// some other means rather than by this invocation's return value; confirm against the
/// worker's handling of this variant.
WillCompleteAsync,
}
impl ActivityError {
    /// Creates an [`ActivityError::Cancelled`] that carries no details.
    pub fn cancelled() -> Self {
        let details = None;
        Self::Cancelled { details }
    }

    /// Creates an [`ActivityError::Cancelled`] carrying `details`, converted into
    /// `FailurePayloads`.
    pub fn cancelled_with_details<T: Into<FailurePayloads>>(details: T) -> Self {
        let details = Some(details.into());
        Self::Cancelled { details }
    }

    /// Wraps `err` in an [`ActivityError::Application`].
    pub fn application(err: ApplicationFailure) -> Self {
        Self::Application(Box::new(err))
    }
}
impl<E> From<E> for ActivityError
where
    E: Into<anyhow::Error>,
{
    /// Converts any error into [`ActivityError::Application`].
    ///
    /// An error that already is an `ApplicationFailure` is kept as-is, so none of its
    /// attributes are lost; every other error is wrapped in a new `ApplicationFailure`.
    fn from(source: E) -> Self {
        source.into().downcast::<ApplicationFailure>().map_or_else(
            |other| Self::Application(ApplicationFailure::new(other).into()),
            |failure| Self::Application(Box::new(failure)),
        )
    }
}
/// Computes the instant by which the activity must finish.
///
/// The result is `started_time + start_to_close_timeout`, unless
/// `schedule_to_close_timeout` is non-zero and `scheduled_time + schedule_to_close_timeout`
/// is earlier, in which case that earlier instant is returned.
///
/// Returns `None` when any of the four inputs is missing, cannot be converted, or when an
/// addition is not representable as a `SystemTime`.
fn calculate_deadline(
    scheduled_time: Option<&Timestamp>,
    started_time: Option<&Timestamp>,
    start_to_close_timeout: Option<&Duration>,
    schedule_to_close_timeout: Option<&Duration>,
) -> Option<SystemTime> {
    // All four values must be present before any conversion is attempted.
    let scheduled_ts = scheduled_time?;
    let started_ts = started_time?;
    let start_to_close_raw = start_to_close_timeout?;
    let schedule_to_close_raw = schedule_to_close_timeout?;

    let scheduled_at: SystemTime = maybe_convert_timestamp(scheduled_ts)?;
    let started_at: SystemTime = maybe_convert_timestamp(started_ts)?;
    let start_to_close: StdDuration = (*start_to_close_raw).try_into().ok()?;
    let schedule_to_close: StdDuration = (*schedule_to_close_raw).try_into().ok()?;

    let start_to_close_deadline = started_at.checked_add(start_to_close)?;

    // A zero schedule-to-close timeout means it imposes no limit of its own.
    if schedule_to_close.is_zero() {
        return Some(start_to_close_deadline);
    }

    let schedule_to_close_deadline = scheduled_at.checked_add(schedule_to_close)?;
    Some(schedule_to_close_deadline.min(start_to_close_deadline))
}
/// Converts a protobuf [`Timestamp`] into a [`SystemTime`].
///
/// Returns `None` when the instant cannot be represented as a `SystemTime`. This function
/// never panics, including for `seconds == i64::MIN`.
fn maybe_convert_timestamp(timestamp: &Timestamp) -> Option<SystemTime> {
    // Normalize a copy so the caller's value is left untouched.
    let mut timestamp = *timestamp;
    timestamp.normalize();

    // `unsigned_abs` gives the magnitude without overflowing for `i64::MIN`, which plain
    // negation (`-seconds`) cannot represent.
    let whole_seconds = StdDuration::from_secs(timestamp.seconds.unsigned_abs());
    let epoch_offset = if timestamp.seconds >= 0 {
        std::time::UNIX_EPOCH.checked_add(whole_seconds)
    } else {
        std::time::UNIX_EPOCH.checked_sub(whole_seconds)
    };
    let base = epoch_offset?;

    // `normalize` is expected to leave `nanos` non-negative; the checked conversion rejects a
    // negative value instead of sign-extending it into a huge offset.
    let nanos = u64::try_from(timestamp.nanos).ok()?;
    base.checked_add(StdDuration::from_nanos(nanos))
}
/// Type-erased, shareable activity entry point as stored in [`ActivityDefinitions`].
///
/// Called with the raw input payloads, a [`DataConverter`], and the [`ActivityContext`];
/// returns a boxed `'static` future resolving to the result [`Payload`] or an
/// [`ActivityError`].
pub(crate) type ActivityInvocation = Arc<
dyn Fn(
Vec<Payload>,
DataConverter,
ActivityContext,
) -> BoxFuture<'static, Result<Payload, ActivityError>>
+ Send
+ Sync,
>;
/// Implemented by types that can register all of their activities at once.
///
/// NOTE(review): hidden from docs; presumably implemented by the `activities` macro rather
/// than by hand. Confirm.
#[doc(hidden)]
pub trait ActivityImplementer {
/// Adds this implementer's activities to `defs`, receiving the instance as an `Arc`.
fn register_all(self: Arc<Self>, defs: &mut ActivityDefinitions);
}
/// An [`ActivityDefinition`] that can be executed against an implementer instance.
#[doc(hidden)]
pub trait ExecutableActivity: ActivityDefinition {
/// Type whose instance the activity runs against.
type Implementer: ActivityImplementer + Send + Sync + 'static;
/// Runs the activity with the given context and already-deserialized input.
///
/// `receiver` is the shared implementer instance; `ActivityDefinitions::register_activity`
/// always passes `Some`. NOTE(review): `None` is presumably for activities that need no
/// instance. Confirm.
fn execute(
receiver: Option<Arc<Self::Implementer>>,
ctx: ActivityContext,
input: Self::Input,
) -> BoxFuture<'static, Result<Self::Output, ActivityError>>;
}
/// Marker trait with no methods.
///
/// NOTE(review): presumably marks implementers whose activities never use an instance
/// receiver. Confirm against the `activities` macro.
#[doc(hidden)]
pub trait HasOnlyStaticMethods {}
/// Collection of registered activities, looked up by activity name.
#[derive(Default, Clone)]
pub struct ActivityDefinitions {
/// Invocation for each registered activity, keyed by the name it was registered under.
activities: HashMap<&'static str, ActivityInvocation>,
}
impl ActivityDefinitions {
    /// Registers every activity provided by `instance`.
    ///
    /// The instance is wrapped in an [`Arc`] and handed to `AI::register_all`, which adds its
    /// entries to this collection. Returns `self` so calls can be chained.
    pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
        AI::register_all(Arc::new(instance), self);
        self
    }

    /// Registers the single activity `AD`, to be executed against the shared `instance`.
    ///
    /// The stored invocation deserializes the input payloads into `AD::Input`, runs
    /// `AD::execute`, and serializes the output into a payload; a failure in any of those
    /// steps surfaces as an [`ActivityError`]. An entry already registered under `AD::name()`
    /// is replaced. Returns `self` so calls can be chained.
    pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
    where
        AD: ActivityDefinition + ExecutableActivity,
        AD::Output: Send + Sync,
    {
        self.activities.insert(
            AD::name(),
            Arc::new(move |payloads, dc, activity_ctx| {
                // The closure is `Fn`, so each call takes its own handle to the instance.
                let instance = Arc::clone(&instance);
                // `dc` is already owned by this call and moves straight into the future.
                async move {
                    let pc = dc.payload_converter();
                    let ser_ctx = SerializationContext {
                        data: &SerializationContextData::Activity,
                        converter: pc,
                    };
                    let input: AD::Input = pc
                        .from_payloads(&ser_ctx, payloads)
                        .map_err(ActivityError::from)?;
                    let output = AD::execute(Some(instance), activity_ctx, input).await?;
                    pc.to_payload(&ser_ctx, &output)
                        .map_err(ActivityError::from)
                }
                .boxed()
            }),
        );
        self
    }

    /// Returns `true` when no activity has been registered.
    pub(crate) fn is_empty(&self) -> bool {
        self.activities.is_empty()
    }

    /// Returns a shared handle to the invocation registered under `act_type`, if any.
    pub(crate) fn get(&self, act_type: &str) -> Option<ActivityInvocation> {
        self.activities.get(act_type).cloned()
    }
}
impl Debug for ActivityDefinitions {
    /// Formats the collection as a struct whose `activities` field lists only the registered
    /// names; the stored invocations are opaque closures and are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let registered_names = self.activities.keys();
        let mut builder = f.debug_struct("ActivityDefinitions");
        builder.field("activities", &registered_names);
        builder.finish()
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use rstest::rstest;

    /// Converting an `ApplicationFailure` into an `ActivityError` must keep the attributes
    /// that were set on the builder, for both retryable and non-retryable failures.
    #[rstest]
    #[case(true)]
    #[case(false)]
    fn activity_error_conversion_is_not_lossy(#[case] non_retryable: bool) {
        use temporalio_common::protos::temporal::api::enums::v1::ApplicationErrorCategory;

        let retry_delay = StdDuration::from_secs(3);
        let failure = ApplicationFailure::builder(anyhow::anyhow!("big boom"))
            .type_name("BigBoom".to_owned())
            .non_retryable(non_retryable)
            .next_retry_delay(retry_delay)
            .category(ApplicationErrorCategory::Benign)
            .details("details")
            .build();

        let actual = match ActivityError::from(failure) {
            ActivityError::Application(inner) => inner,
            _ => panic!("application failure should become app failure"),
        };

        assert_eq!(actual.type_name(), Some("BigBoom"));
        assert_eq!(actual.is_non_retryable(), non_retryable);
        assert_eq!(actual.next_retry_delay(), Some(retry_delay));
        assert_eq!(actual.category(), ApplicationErrorCategory::Benign);
        assert_eq!(actual.to_string(), "big boom");
    }

    /// An arbitrary `std::error::Error` converts into the `Application` variant and keeps
    /// its display message.
    #[test]
    fn activity_error_from_special_err_becomes_application() {
        #[derive(Debug, PartialEq)]
        struct MyError;

        impl std::fmt::Display for MyError {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                f.write_str("MyError")
            }
        }

        impl std::error::Error for MyError {}

        let actual = match ActivityError::from(MyError) {
            ActivityError::Application(inner) => inner,
            err => panic!("expected application failure, got {err:?}"),
        };

        assert_eq!(actual.to_string(), "MyError");
    }
}