Skip to main content

temporalio_sdk/
activities.rs

1//! Functionality related to defining and interacting with activities
2//!
3//!
4//! An example of defining an activity:
5//! ```
6//! use std::sync::{
7//!     Arc,
8//!     atomic::{AtomicUsize, Ordering},
9//! };
10//! use temporalio_macros::{activities, activity_definitions};
11//! use temporalio_sdk::activities::{ActivityContext, ActivityError};
12//!
13//! struct MyActivities {
14//!     counter: AtomicUsize,
15//! }
16//!
17//! #[activities]
18//! impl MyActivities {
19//!     #[activity]
20//!     async fn echo(_ctx: ActivityContext, e: String) -> Result<String, ActivityError> {
21//!         Ok(e)
22//!     }
23//!
24//!     #[activity]
25//!     async fn uses_self(self: Arc<Self>, _ctx: ActivityContext) -> Result<(), ActivityError> {
26//!         self.counter.fetch_add(1, Ordering::Relaxed);
27//!         Ok(())
28//!     }
29//! }
30//!
31//! // If you need to refer to an activity that is defined externally, in a different codebase or
32//! // possibly a different language, use `#[activity_definitions]`. Methods must omit the
33//! // `ActivityContext` parameter and have a body of `unimplemented!()`. Workflows can then call
34//! // these definitions just like real activities.
35//!
36//! struct ExternalActivities;
37//! #[activity_definitions]
38//! impl ExternalActivities {
39//!     #[activity(name = "foo")]
40//!     fn foo(_: String) -> Result<String, ActivityError> {
41//!         unimplemented!()
42//!     }
43//! }
44//! ```
45//!
//! Using `#[activity_definitions]` allows you to call the activity from workflow code, but the
//! placeholder function body will never be invoked, since it is not registered with the worker.
48
49#[doc(inline)]
50pub use temporalio_macros::activities;
51
52use crate::{
53    OutgoingActivityError, OutgoingError,
54    interceptors::{
55        ActivityExecutionValue, ActivityInboundInterceptor, ExecuteActivityInput,
56        ExecuteActivityOutput, Next,
57    },
58    panic_formatter,
59};
60use futures_util::{
61    FutureExt,
62    future::{BoxFuture, ready},
63};
64use prost_types::{Duration, Timestamp};
65use std::{
66    collections::HashMap,
67    fmt::Debug,
68    panic::AssertUnwindSafe,
69    sync::Arc,
70    time::{Duration as StdDuration, SystemTime},
71};
72use temporalio_client::{Client, ClientOptions, Priority, WorkflowExecutionInfo, WorkflowHandle};
73pub use temporalio_common::ActivityError;
74use temporalio_common::{
75    ActivityDefinition, HasWorkflowDefinition,
76    data_converters::{
77        DataConverter, GenericPayloadConverter, SerializationContext, SerializationContextData,
78    },
79    error::ApplicationFailure,
80    protos::{
81        coresdk::{ActivityHeartbeat, activity_result::ActivityExecutionResult, activity_task},
82        temporal::api::common::v1::{Payload, RetryPolicy, WorkflowExecution},
83        utilities::TryIntoOrNone,
84    },
85};
86use temporalio_sdk_core::Worker as CoreWorker;
87use tokio_util::sync::CancellationToken;
88
/// Used within activities to get info, heartbeat management etc.
///
/// One context is built per activity task by [`ActivityContext::new`] and handed to the activity
/// function.
#[derive(Clone)]
pub struct ActivityContext {
    /// Core worker; used to record heartbeats and to obtain a client connection.
    worker: Arc<CoreWorker>,
    /// Options used when building a [`Client`] on top of the worker's connection.
    client_options: ClientOptions,
    /// Token queried by `cancelled` / `is_cancelled`.
    cancellation_token: CancellationToken,
    /// Heartbeat details delivered with the task start, exposed through `heartbeat_details`.
    heartbeat_details: Vec<Payload>,
    /// Headers attached to this activity.
    header_fields: HashMap<String, Payload>,
    /// Information about this specific activity attempt.
    info: ActivityInfo,
}
99
100impl ActivityContext {
101    /// Construct new Activity Context, returning the context and all arguments to the activity.
102    pub fn new(
103        worker: Arc<CoreWorker>,
104        client_options: ClientOptions,
105        cancellation_token: CancellationToken,
106        task_queue: String,
107        task_token: Vec<u8>,
108        task: activity_task::Start,
109    ) -> (Self, Vec<Payload>) {
110        let activity_task::Start {
111            workflow_namespace,
112            workflow_type,
113            workflow_execution,
114            activity_id,
115            activity_type,
116            header_fields,
117            input,
118            heartbeat_details,
119            scheduled_time,
120            current_attempt_scheduled_time,
121            started_time,
122            attempt,
123            schedule_to_close_timeout,
124            start_to_close_timeout,
125            heartbeat_timeout,
126            retry_policy,
127            is_local,
128            priority,
129            run_id,
130        } = task;
131        let deadline = calculate_deadline(
132            scheduled_time.as_ref(),
133            started_time.as_ref(),
134            start_to_close_timeout.as_ref(),
135            schedule_to_close_timeout.as_ref(),
136        );
137
138        (
139            ActivityContext {
140                worker,
141                client_options,
142                cancellation_token,
143                heartbeat_details,
144                header_fields,
145                info: ActivityInfo {
146                    task_token,
147                    task_queue,
148                    workflow_type,
149                    workflow_namespace,
150                    workflow_execution,
151                    activity_id,
152                    activity_type,
153                    heartbeat_timeout: heartbeat_timeout.try_into_or_none(),
154                    scheduled_time: scheduled_time.try_into_or_none(),
155                    started_time: started_time.try_into_or_none(),
156                    deadline,
157                    attempt,
158                    current_attempt_scheduled_time: current_attempt_scheduled_time
159                        .try_into_or_none(),
160                    retry_policy,
161                    is_local,
162                    priority: priority.map(Into::into).unwrap_or_default(),
163                    run_id: (!run_id.is_empty()).then_some(run_id),
164                },
165            },
166            input,
167        )
168    }
169
170    /// Returns a future the completes if and when the activity this was called inside has been
171    /// cancelled
172    pub async fn cancelled(&self) {
173        self.cancellation_token.clone().cancelled().await
174    }
175
176    /// Returns true if this activity has already been cancelled
177    pub fn is_cancelled(&self) -> bool {
178        self.cancellation_token.is_cancelled()
179    }
180
181    /// Extract heartbeat details from last failed attempt. This is used in combination with retry
182    /// policy.
183    pub fn heartbeat_details(&self) -> &[Payload] {
184        &self.heartbeat_details
185    }
186
187    /// RecordHeartbeat sends heartbeat for the currently executing activity
188    pub fn record_heartbeat(&self, details: Vec<Payload>) {
189        if !self.info.is_local {
190            self.worker.record_activity_heartbeat(ActivityHeartbeat {
191                task_token: self.info.task_token.clone(),
192                details,
193            })
194        }
195    }
196
197    /// Returns activity info of the executing activity
198    pub fn info(&self) -> &ActivityInfo {
199        &self.info
200    }
201
202    /// Return a client targeting the same Temporal service and namespace as this activity's worker.
203    pub fn client(&self) -> Client {
204        let connection = self.worker.get_client_connection().expect(
205            "activity context client is unavailable because the worker was not created from a \
206             Temporal client",
207        );
208        Client::new(connection, self.client_options.clone())
209            .expect("client construction from a worker connection should be infallible")
210    }
211
212    /// Return a workflow handle for the workflow execution that started this activity, if any.
213    pub fn workflow_handle<W: HasWorkflowDefinition>(&self) -> Option<WorkflowHandle<Client, W>> {
214        let workflow_execution = self.info.workflow_execution.as_ref()?;
215        let run_id =
216            (!workflow_execution.run_id.is_empty()).then_some(workflow_execution.run_id.clone());
217        Some(WorkflowHandle::new(
218            self.client(),
219            WorkflowExecutionInfo {
220                namespace: self.client_options.namespace.clone(),
221                workflow_id: workflow_execution.workflow_id.clone(),
222                run_id: run_id.clone(),
223                first_execution_run_id: run_id,
224            },
225        ))
226    }
227
228    /// Get headers attached to this activity
229    pub fn headers(&self) -> &HashMap<String, Payload> {
230        &self.header_fields
231    }
232
233    pub(crate) fn headers_mut(&mut self) -> &mut HashMap<String, Payload> {
234        &mut self.header_fields
235    }
236}
237
/// Various information about a specific activity attempt.
///
/// Populated by [`ActivityContext::new`] from the activity task start message and exposed to
/// activity code through [`ActivityContext::info`].
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ActivityInfo {
    /// An opaque token representing a specific Activity task.
    pub task_token: Vec<u8>,
    /// The type of the workflow that invoked this activity.
    pub workflow_type: String,
    /// The namespace of the workflow that invoked this activity.
    pub workflow_namespace: String,
    /// The execution of the workflow that invoked this activity.
    pub workflow_execution: Option<WorkflowExecution>,
    /// The ID of this activity.
    pub activity_id: String,
    /// The type of this activity.
    pub activity_type: String,
    /// The task queue of this activity.
    pub task_queue: String,
    /// The interval within which this activity must heartbeat or be timed out.
    pub heartbeat_timeout: Option<StdDuration>,
    /// Time activity was scheduled by a workflow.
    pub scheduled_time: Option<SystemTime>,
    /// Time of activity start.
    pub started_time: Option<SystemTime>,
    /// Time of activity timeout, as computed by `calculate_deadline`. `None` unless the
    /// scheduled time, started time and both close timeouts were all available.
    pub deadline: Option<SystemTime>,
    /// Attempt starts from 1, and increases by 1 for every retry, if retry policy is specified.
    pub attempt: u32,
    /// Time this attempt at the activity was scheduled.
    pub current_attempt_scheduled_time: Option<SystemTime>,
    /// The retry policy for this activity.
    pub retry_policy: Option<RetryPolicy>,
    /// Whether or not this is a local activity.
    pub is_local: bool,
    /// Priority of this activity. If unset uses [Priority::default].
    pub priority: Priority,
    /// Run ID of this activity execution. Only set for standalone activities; an empty run ID
    /// on the task is reported as `None`.
    pub run_id: Option<String>,
}
277
278/// Deadline calculation.  This is a port of
279/// https://github.com/temporalio/sdk-go/blob/8651550973088f27f678118f997839fb1bb9e62f/internal/activity.go#L225
280fn calculate_deadline(
281    scheduled_time: Option<&Timestamp>,
282    started_time: Option<&Timestamp>,
283    start_to_close_timeout: Option<&Duration>,
284    schedule_to_close_timeout: Option<&Duration>,
285) -> Option<SystemTime> {
286    match (
287        scheduled_time,
288        started_time,
289        start_to_close_timeout,
290        schedule_to_close_timeout,
291    ) {
292        (
293            Some(scheduled),
294            Some(started),
295            Some(start_to_close_timeout),
296            Some(schedule_to_close_timeout),
297        ) => {
298            let scheduled: SystemTime = maybe_convert_timestamp(scheduled)?;
299            let started: SystemTime = maybe_convert_timestamp(started)?;
300            let start_to_close_timeout: StdDuration = (*start_to_close_timeout).try_into().ok()?;
301            let schedule_to_close_timeout: StdDuration =
302                (*schedule_to_close_timeout).try_into().ok()?;
303
304            let start_to_close_deadline: SystemTime =
305                started.checked_add(start_to_close_timeout)?;
306            if schedule_to_close_timeout > StdDuration::ZERO {
307                let schedule_to_close_deadline =
308                    scheduled.checked_add(schedule_to_close_timeout)?;
309                // Minimum of the two deadlines.
310                if schedule_to_close_deadline < start_to_close_deadline {
311                    Some(schedule_to_close_deadline)
312                } else {
313                    Some(start_to_close_deadline)
314                }
315            } else {
316                Some(start_to_close_deadline)
317            }
318        }
319        _ => None,
320    }
321}
322
323/// Helper function lifted from prost_types::Timestamp implementation to prevent double cloning in
324/// error construction
325fn maybe_convert_timestamp(timestamp: &Timestamp) -> Option<SystemTime> {
326    let mut timestamp = *timestamp;
327    timestamp.normalize();
328
329    let system_time = if timestamp.seconds >= 0 {
330        std::time::UNIX_EPOCH.checked_add(StdDuration::from_secs(timestamp.seconds as u64))
331    } else {
332        std::time::UNIX_EPOCH.checked_sub(StdDuration::from_secs((-timestamp.seconds) as u64))
333    };
334
335    system_time.and_then(|system_time| {
336        system_time.checked_add(StdDuration::from_nanos(timestamp.nanos as u64))
337    })
338}
339
/// Shareable, type-erased entry point for one registered activity.
///
/// As built by `ActivityDefinitions::register_activity`, the arguments are, in order: the raw
/// input payloads, the data converter used to decode them, the activity's context, and the
/// inbound interceptors to run before the activity itself. The returned future yields the
/// activity's result.
pub(crate) type ActivityInvocation = Arc<
    dyn Fn(
            Vec<Payload>,
            DataConverter,
            ActivityContext,
            Vec<Arc<dyn ActivityInboundInterceptor>>,
        ) -> ExecuteActivityOutput<'static>
        + Send
        + Sync,
>;
350
351fn call_execute_activity<'a>(
352    interceptors: &'a [Arc<dyn ActivityInboundInterceptor>],
353    input: ExecuteActivityInput,
354    next: Next<'a, ExecuteActivityInput, ExecuteActivityOutput<'a>>,
355) -> ExecuteActivityOutput<'a> {
356    if let Some((first, rest)) = interceptors.split_first() {
357        first.execute_activity(
358            input,
359            Next::new(move |input| call_execute_activity(rest, input, next)),
360        )
361    } else {
362        next.run(input)
363    }
364}
365
/// Implemented by types whose activities can be registered in bulk via
/// `ActivityDefinitions::register_activities`.
///
/// NOTE(review): presumably implemented by the `#[activities]` macro rather than by hand;
/// confirm against the macro crate.
#[doc(hidden)]
pub trait ActivityImplementer {
    /// Adds every activity of this implementer to `defs`.
    fn register_all(self: Arc<Self>, defs: &mut ActivityDefinitions);
}
370
/// An activity definition that can actually be run by a worker.
#[doc(hidden)]
pub trait ExecutableActivity: ActivityDefinition {
    /// The type that hosts this activity's implementation.
    type Implementer: ActivityImplementer + Send + Sync + 'static;
    /// Runs the activity with the given context and already-deserialized input.
    ///
    /// `receiver` is the shared implementer instance; the registration path in this file always
    /// passes `Some`. NOTE(review): `None` is presumably for activities that take no `self`;
    /// confirm against the macro-generated implementations.
    fn execute(
        receiver: Option<Arc<Self::Implementer>>,
        ctx: ActivityContext,
        input: Self::Input,
    ) -> BoxFuture<'static, Result<Self::Output, ActivityError>>;
}
380
/// Marker trait with no methods.
///
/// NOTE(review): judging by the name, this tags implementers whose activities never take
/// `self`; its users are not visible in this file, so confirm before relying on that.
#[doc(hidden)]
pub trait HasOnlyStaticMethods {}
383
/// Contains activity registrations in a form ready for execution by workers.
#[derive(Default, Clone)]
pub struct ActivityDefinitions {
    /// Registered activities, keyed by activity name (`ActivityDefinition::name`).
    activities: HashMap<&'static str, ActivityInvocation>,
}
389
390impl ActivityDefinitions {
391    /// Registers all activities on an activity implementer.
392    pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
393        let arcd = Arc::new(instance);
394        AI::register_all(arcd, self);
395        self
396    }
397    /// Registers a specific activitiy.
398    pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
399    where
400        AD: ActivityDefinition + ExecutableActivity,
401        AD::Input: Send + Sync,
402        AD::Output: Send + Sync,
403    {
404        self.activities.insert(
405            AD::name(),
406            Arc::new(move |payloads, dc, c, activity_inbound_interceptors| {
407                let instance = instance.clone();
408                async move {
409                    // Codec application happens at the SDK/Core boundary, so activity
410                    // implementations work with the payload converter directly.
411                    let pc = dc.payload_converter();
412                    let ctx = SerializationContext {
413                        data: &SerializationContextData::Activity,
414                        converter: pc,
415                    };
416                    let input: AD::Input = pc.from_payloads(&ctx, payloads)?;
417                    let input = ExecuteActivityInput::new(c, Box::new(input));
418                    let leaf = activity_inbound_base::<AD>(instance);
419                    let activity_execution =
420                        call_execute_activity(&activity_inbound_interceptors, input, leaf);
421                    match AssertUnwindSafe(activity_execution).catch_unwind().await {
422                        Ok(output) => output,
423                        Err(panic) => Err(ApplicationFailure::new(anyhow::anyhow!(
424                            "Activity function panicked: {}",
425                            panic_formatter(panic)
426                        ))
427                        .into()),
428                    }
429                }
430                .boxed()
431            }),
432        );
433        self
434    }
435
436    pub(crate) fn is_empty(&self) -> bool {
437        self.activities.is_empty()
438    }
439
440    pub(crate) fn get(&self, act_type: &str) -> Option<ActivityInvocation> {
441        self.activities.get(act_type).cloned()
442    }
443
444    pub(crate) fn names(&self) -> Vec<&'static str> {
445        let mut names: Vec<_> = self.activities.keys().copied().collect();
446        names.sort_unstable();
447        names
448    }
449}
450
451fn activity_inbound_base<'a, AD>(
452    instance: Arc<AD::Implementer>,
453) -> Next<'a, ExecuteActivityInput, ExecuteActivityOutput<'a>>
454where
455    AD: ActivityDefinition + ExecutableActivity,
456    AD::Input: Send + Sync,
457    AD::Output: Send + Sync,
458{
459    Next::new(
460        move |input: ExecuteActivityInput| -> ExecuteActivityOutput<'a> {
461            let (activity_context, args) = input.into_parts();
462            let args = match args.downcast::<AD::Input>() {
463                Ok(args) => args,
464                Err(_) => {
465                    return ready(Err(ApplicationFailure::new(anyhow::anyhow!(
466                    "Activity inbound interceptor returned arguments with wrong concrete type for activity {}",
467                    AD::name()
468                ))
469                .into()))
470                .boxed();
471                }
472            };
473
474            async move {
475                match AssertUnwindSafe(AD::execute(Some(instance), activity_context, *args))
476                    .catch_unwind()
477                    .await
478                {
479                    Ok(result) => {
480                        result.map(|output| Box::new(output) as Box<dyn ActivityExecutionValue>)
481                    }
482                    Err(panic) => Err(ApplicationFailure::new(anyhow::anyhow!(
483                        "Activity function panicked: {}",
484                        panic_formatter(panic)
485                    ))
486                    .into()),
487                }
488            }
489            .boxed()
490        },
491    )
492}
493
494pub(crate) fn activity_error_to_core_result(
495    dc: &DataConverter,
496    err: ActivityError,
497) -> ActivityExecutionResult {
498    match err {
499        ActivityError::Application(app) => ActivityExecutionResult::fail(dc.to_failure(
500            &SerializationContextData::Activity,
501            OutgoingError::Activity(OutgoingActivityError::Application(app)),
502        )),
503        ActivityError::Cancelled { details } => ActivityExecutionResult::cancel(dc.to_failure(
504            &SerializationContextData::Activity,
505            OutgoingError::Activity(OutgoingActivityError::Cancelled { details }),
506        )),
507        ActivityError::WillCompleteAsync => ActivityExecutionResult::will_complete_async(),
508    }
509}
510
511impl Debug for ActivityDefinitions {
512    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
513        f.debug_struct("ActivityDefinitions")
514            .field("activities", &self.activities.keys())
515            .finish()
516    }
517}
518
#[cfg(test)]
mod test {
    use super::*;
    use rstest::rstest;
    use temporalio_common::error::{ApplicationErrorCategory, ApplicationFailure};

    /// Converting an `ApplicationFailure` into an `ActivityError` must keep its type name,
    /// retryability, retry delay, category and message intact.
    #[rstest]
    #[case(true)]
    #[case(false)]
    fn activity_error_conversion_is_not_lossy(#[case] non_retryable: bool) {
        let retry_delay = StdDuration::from_secs(3);
        let original = ApplicationFailure::builder(anyhow::anyhow!("big boom"))
            .type_name("BigBoom".to_owned())
            .non_retryable(non_retryable)
            .next_retry_delay(retry_delay)
            .category(ApplicationErrorCategory::Benign)
            .details("details")
            .build();

        let actual = match ActivityError::from(original) {
            ActivityError::Application(actual) => actual,
            _ => panic!("application failure should become app failure"),
        };

        assert_eq!(actual.type_name(), Some("BigBoom"));
        assert_eq!(actual.is_non_retryable(), non_retryable);
        assert_eq!(actual.next_retry_delay(), Some(StdDuration::from_secs(3)));
        assert_eq!(actual.category(), ApplicationErrorCategory::Benign);
        assert_eq!(actual.to_string(), "big boom");
    }

    /// Any ordinary `std::error::Error` converts into the `Application` variant and keeps its
    /// display text.
    #[test]
    fn activity_error_from_special_err_becomes_application() {
        #[derive(Debug, PartialEq)]
        struct MyError;

        impl std::fmt::Display for MyError {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                f.write_str("MyError")
            }
        }
        impl std::error::Error for MyError {}

        let actual = match ActivityError::from(MyError) {
            ActivityError::Application(actual) => actual,
            err => panic!("expected application failure, got {err:?}"),
        };
        assert_eq!(actual.to_string(), "MyError");
    }
}