#[doc(inline)]
pub use temporalio_macros::activities;
use futures_util::{FutureExt, future::BoxFuture};
use prost_types::{Duration, Timestamp};
use std::{
collections::HashMap,
fmt::Debug,
sync::Arc,
time::{Duration as StdDuration, SystemTime},
};
use temporalio_client::Priority;
use temporalio_common::{
ActivityDefinition,
data_converters::{
DataConverter, GenericPayloadConverter, SerializationContext, SerializationContextData,
},
protos::{
coresdk::{ActivityHeartbeat, activity_task},
temporal::api::common::v1::{Payload, RetryPolicy, WorkflowExecution},
utilities::TryIntoOrNone,
},
};
use temporalio_sdk_core::Worker as CoreWorker;
use tokio_util::sync::CancellationToken;
#[derive(Clone)]
pub struct ActivityContext {
worker: Arc<CoreWorker>,
cancellation_token: CancellationToken,
heartbeat_details: Vec<Payload>,
header_fields: HashMap<String, Payload>,
info: ActivityInfo,
}
impl ActivityContext {
pub fn new(
worker: Arc<CoreWorker>,
cancellation_token: CancellationToken,
task_queue: String,
task_token: Vec<u8>,
task: activity_task::Start,
) -> (Self, Vec<Payload>) {
let activity_task::Start {
workflow_namespace,
workflow_type,
workflow_execution,
activity_id,
activity_type,
header_fields,
input,
heartbeat_details,
scheduled_time,
current_attempt_scheduled_time,
started_time,
attempt,
schedule_to_close_timeout,
start_to_close_timeout,
heartbeat_timeout,
retry_policy,
is_local,
priority,
} = task;
let deadline = calculate_deadline(
scheduled_time.as_ref(),
started_time.as_ref(),
start_to_close_timeout.as_ref(),
schedule_to_close_timeout.as_ref(),
);
(
ActivityContext {
worker,
cancellation_token,
heartbeat_details,
header_fields,
info: ActivityInfo {
task_token,
task_queue,
workflow_type,
workflow_namespace,
workflow_execution,
activity_id,
activity_type,
heartbeat_timeout: heartbeat_timeout.try_into_or_none(),
scheduled_time: scheduled_time.try_into_or_none(),
started_time: started_time.try_into_or_none(),
deadline,
attempt,
current_attempt_scheduled_time: current_attempt_scheduled_time
.try_into_or_none(),
retry_policy,
is_local,
priority: priority.map(Into::into).unwrap_or_default(),
},
},
input,
)
}
pub async fn cancelled(&self) {
self.cancellation_token.clone().cancelled().await
}
pub fn is_cancelled(&self) -> bool {
self.cancellation_token.is_cancelled()
}
pub fn heartbeat_details(&self) -> &[Payload] {
&self.heartbeat_details
}
pub fn record_heartbeat(&self, details: Vec<Payload>) {
if !self.info.is_local {
self.worker.record_activity_heartbeat(ActivityHeartbeat {
task_token: self.info.task_token.clone(),
details,
})
}
}
pub fn info(&self) -> &ActivityInfo {
&self.info
}
pub fn headers(&self) -> &HashMap<String, Payload> {
&self.header_fields
}
}
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ActivityInfo {
pub task_token: Vec<u8>,
pub workflow_type: String,
pub workflow_namespace: String,
pub workflow_execution: Option<WorkflowExecution>,
pub activity_id: String,
pub activity_type: String,
pub task_queue: String,
pub heartbeat_timeout: Option<StdDuration>,
pub scheduled_time: Option<SystemTime>,
pub started_time: Option<SystemTime>,
pub deadline: Option<SystemTime>,
pub attempt: u32,
pub current_attempt_scheduled_time: Option<SystemTime>,
pub retry_policy: Option<RetryPolicy>,
pub is_local: bool,
pub priority: Priority,
}
#[derive(Debug)]
pub enum ActivityError {
Retryable {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
explicit_delay: Option<StdDuration>,
},
Cancelled {
details: Option<Payload>,
},
NonRetryable(Box<dyn std::error::Error + Send + Sync + 'static>),
WillCompleteAsync,
}
impl ActivityError {
pub fn cancelled() -> Self {
Self::Cancelled { details: None }
}
}
impl<E> From<E> for ActivityError
where
E: Into<anyhow::Error>,
{
fn from(source: E) -> Self {
Self::Retryable {
source: source.into().into_boxed_dyn_error(),
explicit_delay: None,
}
}
}
fn calculate_deadline(
scheduled_time: Option<&Timestamp>,
started_time: Option<&Timestamp>,
start_to_close_timeout: Option<&Duration>,
schedule_to_close_timeout: Option<&Duration>,
) -> Option<SystemTime> {
match (
scheduled_time,
started_time,
start_to_close_timeout,
schedule_to_close_timeout,
) {
(
Some(scheduled),
Some(started),
Some(start_to_close_timeout),
Some(schedule_to_close_timeout),
) => {
let scheduled: SystemTime = maybe_convert_timestamp(scheduled)?;
let started: SystemTime = maybe_convert_timestamp(started)?;
let start_to_close_timeout: StdDuration = (*start_to_close_timeout).try_into().ok()?;
let schedule_to_close_timeout: StdDuration =
(*schedule_to_close_timeout).try_into().ok()?;
let start_to_close_deadline: SystemTime =
started.checked_add(start_to_close_timeout)?;
if schedule_to_close_timeout > StdDuration::ZERO {
let schedule_to_close_deadline =
scheduled.checked_add(schedule_to_close_timeout)?;
if schedule_to_close_deadline < start_to_close_deadline {
Some(schedule_to_close_deadline)
} else {
Some(start_to_close_deadline)
}
} else {
Some(start_to_close_deadline)
}
}
_ => None,
}
}
fn maybe_convert_timestamp(timestamp: &Timestamp) -> Option<SystemTime> {
let mut timestamp = *timestamp;
timestamp.normalize();
let system_time = if timestamp.seconds >= 0 {
std::time::UNIX_EPOCH.checked_add(StdDuration::from_secs(timestamp.seconds as u64))
} else {
std::time::UNIX_EPOCH.checked_sub(StdDuration::from_secs((-timestamp.seconds) as u64))
};
system_time.and_then(|system_time| {
system_time.checked_add(StdDuration::from_nanos(timestamp.nanos as u64))
})
}
pub(crate) type ActivityInvocation = Arc<
dyn Fn(
Vec<Payload>,
DataConverter,
ActivityContext,
) -> BoxFuture<'static, Result<Payload, ActivityError>>
+ Send
+ Sync,
>;
#[doc(hidden)]
pub trait ActivityImplementer {
fn register_all(self: Arc<Self>, defs: &mut ActivityDefinitions);
}
#[doc(hidden)]
pub trait ExecutableActivity: ActivityDefinition {
type Implementer: ActivityImplementer + Send + Sync + 'static;
fn execute(
receiver: Option<Arc<Self::Implementer>>,
ctx: ActivityContext,
input: Self::Input,
) -> BoxFuture<'static, Result<Self::Output, ActivityError>>;
}
#[doc(hidden)]
pub trait HasOnlyStaticMethods {}
#[derive(Default, Clone)]
pub struct ActivityDefinitions {
activities: HashMap<&'static str, ActivityInvocation>,
}
impl ActivityDefinitions {
pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
let arcd = Arc::new(instance);
AI::register_all(arcd, self);
self
}
pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
where
AD: ActivityDefinition + ExecutableActivity,
AD::Output: Send + Sync,
{
self.activities.insert(
AD::name(),
Arc::new(move |payloads, dc, c| {
let instance = instance.clone();
let dc = dc.clone();
async move {
let pc = dc.payload_converter();
let ctx = SerializationContext {
data: &SerializationContextData::Activity,
converter: pc,
};
let deserialized: AD::Input = pc
.from_payloads(&ctx, payloads)
.map_err(ActivityError::from)?;
let result = AD::execute(Some(instance), c, deserialized).await?;
pc.to_payload(&ctx, &result).map_err(ActivityError::from)
}
.boxed()
}),
);
self
}
pub(crate) fn is_empty(&self) -> bool {
self.activities.is_empty()
}
pub(crate) fn get(&self, act_type: &str) -> Option<ActivityInvocation> {
self.activities.get(act_type).cloned()
}
}
impl Debug for ActivityDefinitions {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ActivityDefinitions")
.field("activities", &self.activities.keys())
.finish()
}
}