1#[doc(inline)]
50pub use temporalio_macros::activities;
51
52use crate::{
53 OutgoingActivityError, OutgoingError,
54 interceptors::{
55 ActivityExecutionValue, ActivityInboundInterceptor, ExecuteActivityInput,
56 ExecuteActivityOutput, Next,
57 },
58 panic_formatter,
59};
60use futures_util::{
61 FutureExt,
62 future::{BoxFuture, ready},
63};
64use prost_types::{Duration, Timestamp};
65use std::{
66 collections::HashMap,
67 fmt::Debug,
68 panic::AssertUnwindSafe,
69 sync::Arc,
70 time::{Duration as StdDuration, SystemTime},
71};
72use temporalio_client::{Client, ClientOptions, Priority, WorkflowExecutionInfo, WorkflowHandle};
73pub use temporalio_common::ActivityError;
74use temporalio_common::{
75 ActivityDefinition, HasWorkflowDefinition,
76 data_converters::{
77 DataConverter, GenericPayloadConverter, SerializationContext, SerializationContextData,
78 },
79 error::ApplicationFailure,
80 protos::{
81 coresdk::{ActivityHeartbeat, activity_result::ActivityExecutionResult, activity_task},
82 temporal::api::common::v1::{Payload, RetryPolicy, WorkflowExecution},
83 utilities::TryIntoOrNone,
84 },
85};
86use temporalio_sdk_core::Worker as CoreWorker;
87use tokio_util::sync::CancellationToken;
88
89#[derive(Clone)]
91pub struct ActivityContext {
92 worker: Arc<CoreWorker>,
93 client_options: ClientOptions,
94 cancellation_token: CancellationToken,
95 heartbeat_details: Vec<Payload>,
96 header_fields: HashMap<String, Payload>,
97 info: ActivityInfo,
98}
99
100impl ActivityContext {
101 pub fn new(
103 worker: Arc<CoreWorker>,
104 client_options: ClientOptions,
105 cancellation_token: CancellationToken,
106 task_queue: String,
107 task_token: Vec<u8>,
108 task: activity_task::Start,
109 ) -> (Self, Vec<Payload>) {
110 let activity_task::Start {
111 workflow_namespace,
112 workflow_type,
113 workflow_execution,
114 activity_id,
115 activity_type,
116 header_fields,
117 input,
118 heartbeat_details,
119 scheduled_time,
120 current_attempt_scheduled_time,
121 started_time,
122 attempt,
123 schedule_to_close_timeout,
124 start_to_close_timeout,
125 heartbeat_timeout,
126 retry_policy,
127 is_local,
128 priority,
129 run_id,
130 } = task;
131 let deadline = calculate_deadline(
132 scheduled_time.as_ref(),
133 started_time.as_ref(),
134 start_to_close_timeout.as_ref(),
135 schedule_to_close_timeout.as_ref(),
136 );
137
138 (
139 ActivityContext {
140 worker,
141 client_options,
142 cancellation_token,
143 heartbeat_details,
144 header_fields,
145 info: ActivityInfo {
146 task_token,
147 task_queue,
148 workflow_type,
149 workflow_namespace,
150 workflow_execution,
151 activity_id,
152 activity_type,
153 heartbeat_timeout: heartbeat_timeout.try_into_or_none(),
154 scheduled_time: scheduled_time.try_into_or_none(),
155 started_time: started_time.try_into_or_none(),
156 deadline,
157 attempt,
158 current_attempt_scheduled_time: current_attempt_scheduled_time
159 .try_into_or_none(),
160 retry_policy,
161 is_local,
162 priority: priority.map(Into::into).unwrap_or_default(),
163 run_id: (!run_id.is_empty()).then_some(run_id),
164 },
165 },
166 input,
167 )
168 }
169
170 pub async fn cancelled(&self) {
173 self.cancellation_token.clone().cancelled().await
174 }
175
176 pub fn is_cancelled(&self) -> bool {
178 self.cancellation_token.is_cancelled()
179 }
180
181 pub fn heartbeat_details(&self) -> &[Payload] {
184 &self.heartbeat_details
185 }
186
187 pub fn record_heartbeat(&self, details: Vec<Payload>) {
189 if !self.info.is_local {
190 self.worker.record_activity_heartbeat(ActivityHeartbeat {
191 task_token: self.info.task_token.clone(),
192 details,
193 })
194 }
195 }
196
197 pub fn info(&self) -> &ActivityInfo {
199 &self.info
200 }
201
202 pub fn client(&self) -> Client {
204 let connection = self.worker.get_client_connection().expect(
205 "activity context client is unavailable because the worker was not created from a \
206 Temporal client",
207 );
208 Client::new(connection, self.client_options.clone())
209 .expect("client construction from a worker connection should be infallible")
210 }
211
212 pub fn workflow_handle<W: HasWorkflowDefinition>(&self) -> Option<WorkflowHandle<Client, W>> {
214 let workflow_execution = self.info.workflow_execution.as_ref()?;
215 let run_id =
216 (!workflow_execution.run_id.is_empty()).then_some(workflow_execution.run_id.clone());
217 Some(WorkflowHandle::new(
218 self.client(),
219 WorkflowExecutionInfo {
220 namespace: self.client_options.namespace.clone(),
221 workflow_id: workflow_execution.workflow_id.clone(),
222 run_id: run_id.clone(),
223 first_execution_run_id: run_id,
224 },
225 ))
226 }
227
228 pub fn headers(&self) -> &HashMap<String, Payload> {
230 &self.header_fields
231 }
232
233 pub(crate) fn headers_mut(&mut self) -> &mut HashMap<String, Payload> {
234 &mut self.header_fields
235 }
236}
237
238#[derive(Clone, Debug)]
240#[non_exhaustive]
241pub struct ActivityInfo {
242 pub task_token: Vec<u8>,
244 pub workflow_type: String,
246 pub workflow_namespace: String,
248 pub workflow_execution: Option<WorkflowExecution>,
250 pub activity_id: String,
252 pub activity_type: String,
254 pub task_queue: String,
256 pub heartbeat_timeout: Option<StdDuration>,
258 pub scheduled_time: Option<SystemTime>,
260 pub started_time: Option<SystemTime>,
262 pub deadline: Option<SystemTime>,
264 pub attempt: u32,
266 pub current_attempt_scheduled_time: Option<SystemTime>,
268 pub retry_policy: Option<RetryPolicy>,
270 pub is_local: bool,
272 pub priority: Priority,
274 pub run_id: Option<String>,
276}
277
278fn calculate_deadline(
281 scheduled_time: Option<&Timestamp>,
282 started_time: Option<&Timestamp>,
283 start_to_close_timeout: Option<&Duration>,
284 schedule_to_close_timeout: Option<&Duration>,
285) -> Option<SystemTime> {
286 match (
287 scheduled_time,
288 started_time,
289 start_to_close_timeout,
290 schedule_to_close_timeout,
291 ) {
292 (
293 Some(scheduled),
294 Some(started),
295 Some(start_to_close_timeout),
296 Some(schedule_to_close_timeout),
297 ) => {
298 let scheduled: SystemTime = maybe_convert_timestamp(scheduled)?;
299 let started: SystemTime = maybe_convert_timestamp(started)?;
300 let start_to_close_timeout: StdDuration = (*start_to_close_timeout).try_into().ok()?;
301 let schedule_to_close_timeout: StdDuration =
302 (*schedule_to_close_timeout).try_into().ok()?;
303
304 let start_to_close_deadline: SystemTime =
305 started.checked_add(start_to_close_timeout)?;
306 if schedule_to_close_timeout > StdDuration::ZERO {
307 let schedule_to_close_deadline =
308 scheduled.checked_add(schedule_to_close_timeout)?;
309 if schedule_to_close_deadline < start_to_close_deadline {
311 Some(schedule_to_close_deadline)
312 } else {
313 Some(start_to_close_deadline)
314 }
315 } else {
316 Some(start_to_close_deadline)
317 }
318 }
319 _ => None,
320 }
321}
322
323fn maybe_convert_timestamp(timestamp: &Timestamp) -> Option<SystemTime> {
326 let mut timestamp = *timestamp;
327 timestamp.normalize();
328
329 let system_time = if timestamp.seconds >= 0 {
330 std::time::UNIX_EPOCH.checked_add(StdDuration::from_secs(timestamp.seconds as u64))
331 } else {
332 std::time::UNIX_EPOCH.checked_sub(StdDuration::from_secs((-timestamp.seconds) as u64))
333 };
334
335 system_time.and_then(|system_time| {
336 system_time.checked_add(StdDuration::from_nanos(timestamp.nanos as u64))
337 })
338}
339
340pub(crate) type ActivityInvocation = Arc<
341 dyn Fn(
342 Vec<Payload>,
343 DataConverter,
344 ActivityContext,
345 Vec<Arc<dyn ActivityInboundInterceptor>>,
346 ) -> ExecuteActivityOutput<'static>
347 + Send
348 + Sync,
349>;
350
351fn call_execute_activity<'a>(
352 interceptors: &'a [Arc<dyn ActivityInboundInterceptor>],
353 input: ExecuteActivityInput,
354 next: Next<'a, ExecuteActivityInput, ExecuteActivityOutput<'a>>,
355) -> ExecuteActivityOutput<'a> {
356 if let Some((first, rest)) = interceptors.split_first() {
357 first.execute_activity(
358 input,
359 Next::new(move |input| call_execute_activity(rest, input, next)),
360 )
361 } else {
362 next.run(input)
363 }
364}
365
366#[doc(hidden)]
367pub trait ActivityImplementer {
368 fn register_all(self: Arc<Self>, defs: &mut ActivityDefinitions);
369}
370
371#[doc(hidden)]
372pub trait ExecutableActivity: ActivityDefinition {
373 type Implementer: ActivityImplementer + Send + Sync + 'static;
374 fn execute(
375 receiver: Option<Arc<Self::Implementer>>,
376 ctx: ActivityContext,
377 input: Self::Input,
378 ) -> BoxFuture<'static, Result<Self::Output, ActivityError>>;
379}
380
381#[doc(hidden)]
382pub trait HasOnlyStaticMethods {}
383
384#[derive(Default, Clone)]
386pub struct ActivityDefinitions {
387 activities: HashMap<&'static str, ActivityInvocation>,
388}
389
390impl ActivityDefinitions {
391 pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
393 let arcd = Arc::new(instance);
394 AI::register_all(arcd, self);
395 self
396 }
397 pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
399 where
400 AD: ActivityDefinition + ExecutableActivity,
401 AD::Input: Send + Sync,
402 AD::Output: Send + Sync,
403 {
404 self.activities.insert(
405 AD::name(),
406 Arc::new(move |payloads, dc, c, activity_inbound_interceptors| {
407 let instance = instance.clone();
408 async move {
409 let pc = dc.payload_converter();
412 let ctx = SerializationContext {
413 data: &SerializationContextData::Activity,
414 converter: pc,
415 };
416 let input: AD::Input = pc.from_payloads(&ctx, payloads)?;
417 let input = ExecuteActivityInput::new(c, Box::new(input));
418 let leaf = activity_inbound_base::<AD>(instance);
419 let activity_execution =
420 call_execute_activity(&activity_inbound_interceptors, input, leaf);
421 match AssertUnwindSafe(activity_execution).catch_unwind().await {
422 Ok(output) => output,
423 Err(panic) => Err(ApplicationFailure::new(anyhow::anyhow!(
424 "Activity function panicked: {}",
425 panic_formatter(panic)
426 ))
427 .into()),
428 }
429 }
430 .boxed()
431 }),
432 );
433 self
434 }
435
436 pub(crate) fn is_empty(&self) -> bool {
437 self.activities.is_empty()
438 }
439
440 pub(crate) fn get(&self, act_type: &str) -> Option<ActivityInvocation> {
441 self.activities.get(act_type).cloned()
442 }
443
444 pub(crate) fn names(&self) -> Vec<&'static str> {
445 let mut names: Vec<_> = self.activities.keys().copied().collect();
446 names.sort_unstable();
447 names
448 }
449}
450
451fn activity_inbound_base<'a, AD>(
452 instance: Arc<AD::Implementer>,
453) -> Next<'a, ExecuteActivityInput, ExecuteActivityOutput<'a>>
454where
455 AD: ActivityDefinition + ExecutableActivity,
456 AD::Input: Send + Sync,
457 AD::Output: Send + Sync,
458{
459 Next::new(
460 move |input: ExecuteActivityInput| -> ExecuteActivityOutput<'a> {
461 let (activity_context, args) = input.into_parts();
462 let args = match args.downcast::<AD::Input>() {
463 Ok(args) => args,
464 Err(_) => {
465 return ready(Err(ApplicationFailure::new(anyhow::anyhow!(
466 "Activity inbound interceptor returned arguments with wrong concrete type for activity {}",
467 AD::name()
468 ))
469 .into()))
470 .boxed();
471 }
472 };
473
474 async move {
475 match AssertUnwindSafe(AD::execute(Some(instance), activity_context, *args))
476 .catch_unwind()
477 .await
478 {
479 Ok(result) => {
480 result.map(|output| Box::new(output) as Box<dyn ActivityExecutionValue>)
481 }
482 Err(panic) => Err(ApplicationFailure::new(anyhow::anyhow!(
483 "Activity function panicked: {}",
484 panic_formatter(panic)
485 ))
486 .into()),
487 }
488 }
489 .boxed()
490 },
491 )
492}
493
494pub(crate) fn activity_error_to_core_result(
495 dc: &DataConverter,
496 err: ActivityError,
497) -> ActivityExecutionResult {
498 match err {
499 ActivityError::Application(app) => ActivityExecutionResult::fail(dc.to_failure(
500 &SerializationContextData::Activity,
501 OutgoingError::Activity(OutgoingActivityError::Application(app)),
502 )),
503 ActivityError::Cancelled { details } => ActivityExecutionResult::cancel(dc.to_failure(
504 &SerializationContextData::Activity,
505 OutgoingError::Activity(OutgoingActivityError::Cancelled { details }),
506 )),
507 ActivityError::WillCompleteAsync => ActivityExecutionResult::will_complete_async(),
508 }
509}
510
511impl Debug for ActivityDefinitions {
512 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
513 f.debug_struct("ActivityDefinitions")
514 .field("activities", &self.activities.keys())
515 .finish()
516 }
517}
518
519#[cfg(test)]
520mod test {
521 use super::*;
522 use rstest::rstest;
523 use temporalio_common::error::{ApplicationErrorCategory, ApplicationFailure};
524
525 #[rstest]
526 #[case(true)]
527 #[case(false)]
528 fn activity_error_conversion_is_not_lossy(#[case] non_retryable: bool) {
529 let original = ApplicationFailure::builder(anyhow::anyhow!("big boom"))
530 .type_name("BigBoom".to_owned())
531 .non_retryable(non_retryable)
532 .next_retry_delay(StdDuration::from_secs(3))
533 .category(ApplicationErrorCategory::Benign)
534 .details("details")
535 .build();
536 let err = ActivityError::from(original);
537 let ActivityError::Application(actual) = err else {
538 panic!("application failure should become app failure")
539 };
540 assert_eq!(actual.type_name(), Some("BigBoom"));
541 assert_eq!(actual.is_non_retryable(), non_retryable);
542 assert_eq!(actual.next_retry_delay(), Some(StdDuration::from_secs(3)));
543 assert_eq!(actual.category(), ApplicationErrorCategory::Benign);
544 assert_eq!(actual.to_string(), "big boom");
545 }
546
547 #[test]
548 fn activity_error_from_special_err_becomes_application() {
549 #[derive(Debug, PartialEq)]
550 struct MyError;
551
552 impl std::error::Error for MyError {}
553 impl std::fmt::Display for MyError {
554 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
555 f.write_str("MyError")
556 }
557 }
558
559 let err = ActivityError::from(MyError);
560 let ActivityError::Application(actual) = err else {
561 panic!("expected application failure, got {err:?}")
562 };
563 assert_eq!(actual.to_string(), "MyError");
564 }
565}