Skip to main content

temporalio_sdk/
activities.rs

1//! Functionality related to defining and interacting with activities
2//!
3//!
4//! An example of defining an activity:
5//! ```
6//! use std::sync::{
7//!     Arc,
8//!     atomic::{AtomicUsize, Ordering},
9//! };
10//! use temporalio_macros::activities;
11//! use temporalio_sdk::activities::{ActivityContext, ActivityError};
12//!
13//! struct MyActivities {
14//!     counter: AtomicUsize,
15//! }
16//!
17//! #[activities]
18//! impl MyActivities {
19//!     #[activity]
20//!     async fn echo(_ctx: ActivityContext, e: String) -> Result<String, ActivityError> {
21//!         Ok(e)
22//!     }
23//!
24//!     #[activity]
25//!     async fn uses_self(self: Arc<Self>, _ctx: ActivityContext) -> Result<(), ActivityError> {
26//!         self.counter.fetch_add(1, Ordering::Relaxed);
27//!         Ok(())
28//!     }
29//! }
30//!
31//! // If you need to refer to an activity that is defined externally, in a different codebase or
//! // possibly a different language, you can simply leave the function body unimplemented like so:
33//!
34//! struct ExternalActivities;
35//! #[activities]
36//! impl ExternalActivities {
37//!     #[activity(name = "foo")]
38//!     async fn foo(_ctx: ActivityContext, _: String) -> Result<String, ActivityError> {
39//!         unimplemented!()
40//!     }
41//! }
42//! ```
43//!
//! This still allows you to call the activity from workflow code, but the actual function
//! will never be invoked, since you won't have registered it with the worker.
46
47#[doc(inline)]
48pub use temporalio_macros::activities;
49
50use futures_util::{FutureExt, future::BoxFuture};
51use prost_types::{Duration, Timestamp};
52use std::{
53    collections::HashMap,
54    fmt::Debug,
55    sync::Arc,
56    time::{Duration as StdDuration, SystemTime},
57};
58use temporalio_client::Priority;
59use temporalio_common::{
60    ActivityDefinition,
61    data_converters::{
62        DataConverter, GenericPayloadConverter, SerializationContext, SerializationContextData,
63    },
64    protos::{
65        coresdk::{ActivityHeartbeat, activity_task},
66        temporal::api::common::v1::{Payload, RetryPolicy, WorkflowExecution},
67        utilities::TryIntoOrNone,
68    },
69};
70use temporalio_sdk_core::Worker as CoreWorker;
71use tokio_util::sync::CancellationToken;
72
/// Used within activities to get info, heartbeat management etc.
///
/// Instances are built by [`ActivityContext::new`] from an `activity_task::Start`.
#[derive(Clone)]
pub struct ActivityContext {
    // Core worker handle; `record_heartbeat` forwards heartbeats through it.
    worker: Arc<CoreWorker>,
    // Token observed by `cancelled` and `is_cancelled`.
    cancellation_token: CancellationToken,
    // Heartbeat details carried by the start task; exposed via `heartbeat_details`.
    heartbeat_details: Vec<Payload>,
    // Header fields carried by the start task; exposed via `headers`.
    header_fields: HashMap<String, Payload>,
    // Metadata describing this activity attempt; exposed via `info`.
    info: ActivityInfo,
}
82
83impl ActivityContext {
84    /// Construct new Activity Context, returning the context and all arguments to the activity.
85    pub fn new(
86        worker: Arc<CoreWorker>,
87        cancellation_token: CancellationToken,
88        task_queue: String,
89        task_token: Vec<u8>,
90        task: activity_task::Start,
91    ) -> (Self, Vec<Payload>) {
92        let activity_task::Start {
93            workflow_namespace,
94            workflow_type,
95            workflow_execution,
96            activity_id,
97            activity_type,
98            header_fields,
99            input,
100            heartbeat_details,
101            scheduled_time,
102            current_attempt_scheduled_time,
103            started_time,
104            attempt,
105            schedule_to_close_timeout,
106            start_to_close_timeout,
107            heartbeat_timeout,
108            retry_policy,
109            is_local,
110            priority,
111        } = task;
112        let deadline = calculate_deadline(
113            scheduled_time.as_ref(),
114            started_time.as_ref(),
115            start_to_close_timeout.as_ref(),
116            schedule_to_close_timeout.as_ref(),
117        );
118
119        (
120            ActivityContext {
121                worker,
122                cancellation_token,
123                heartbeat_details,
124                header_fields,
125                info: ActivityInfo {
126                    task_token,
127                    task_queue,
128                    workflow_type,
129                    workflow_namespace,
130                    workflow_execution,
131                    activity_id,
132                    activity_type,
133                    heartbeat_timeout: heartbeat_timeout.try_into_or_none(),
134                    scheduled_time: scheduled_time.try_into_or_none(),
135                    started_time: started_time.try_into_or_none(),
136                    deadline,
137                    attempt,
138                    current_attempt_scheduled_time: current_attempt_scheduled_time
139                        .try_into_or_none(),
140                    retry_policy,
141                    is_local,
142                    priority: priority.map(Into::into).unwrap_or_default(),
143                },
144            },
145            input,
146        )
147    }
148
149    /// Returns a future the completes if and when the activity this was called inside has been
150    /// cancelled
151    pub async fn cancelled(&self) {
152        self.cancellation_token.clone().cancelled().await
153    }
154
155    /// Returns true if this activity has already been cancelled
156    pub fn is_cancelled(&self) -> bool {
157        self.cancellation_token.is_cancelled()
158    }
159
160    /// Extract heartbeat details from last failed attempt. This is used in combination with retry
161    /// policy.
162    pub fn heartbeat_details(&self) -> &[Payload] {
163        &self.heartbeat_details
164    }
165
166    /// RecordHeartbeat sends heartbeat for the currently executing activity
167    pub fn record_heartbeat(&self, details: Vec<Payload>) {
168        if !self.info.is_local {
169            self.worker.record_activity_heartbeat(ActivityHeartbeat {
170                task_token: self.info.task_token.clone(),
171                details,
172            })
173        }
174    }
175
176    /// Returns activity info of the executing activity
177    pub fn info(&self) -> &ActivityInfo {
178        &self.info
179    }
180
181    /// Get headers attached to this activity
182    pub fn headers(&self) -> &HashMap<String, Payload> {
183        &self.header_fields
184    }
185}
186
/// Various information about a specific activity attempt.
///
/// Populated by [`ActivityContext::new`] from the activity start task and read through
/// [`ActivityContext::info`].
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ActivityInfo {
    /// An opaque token representing a specific Activity task.
    pub task_token: Vec<u8>,
    /// The type of the workflow that invoked this activity.
    pub workflow_type: String,
    /// The namespace of the workflow that invoked this activity.
    pub workflow_namespace: String,
    /// The execution of the workflow that invoked this activity.
    pub workflow_execution: Option<WorkflowExecution>,
    /// The ID of this activity.
    pub activity_id: String,
    /// The type of this activity.
    pub activity_type: String,
    /// The task queue of this activity.
    pub task_queue: String,
    /// The interval within which this activity must heartbeat or be timed out.
    pub heartbeat_timeout: Option<StdDuration>,
    /// Time activity was scheduled by a workflow.
    pub scheduled_time: Option<SystemTime>,
    /// Time of activity start.
    pub started_time: Option<SystemTime>,
    /// Time of activity timeout.
    ///
    /// This is the start time plus the start-to-close timeout, or the scheduled time plus the
    /// schedule-to-close timeout when that timeout is non-zero and yields an earlier instant.
    /// It is `None` when any of those four inputs is missing or cannot be converted.
    pub deadline: Option<SystemTime>,
    /// Attempt starts from 1, and increase by 1 for every retry, if retry policy is specified.
    pub attempt: u32,
    /// Time this attempt at the activity was scheduled.
    pub current_attempt_scheduled_time: Option<SystemTime>,
    /// The retry policy for this activity.
    pub retry_policy: Option<RetryPolicy>,
    /// Whether or not this is a local activity.
    pub is_local: bool,
    /// Priority of this activity. If unset uses [Priority::default].
    pub priority: Priority,
}
224
/// Returned as errors from activity functions.
///
/// Any `E: Into<anyhow::Error>` converts into [`ActivityError::Retryable`] via `From`, so `?` can
/// be used directly on most error types inside an activity.
#[derive(Debug)]
pub enum ActivityError {
    /// This error can be returned from activities to allow the explicit configuration of certain
    /// error properties. It's also the default error type that arbitrary errors will be converted
    /// into.
    Retryable {
        /// The underlying error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
        /// If specified, the next retry (if there is one) will occur after this delay.
        /// The `From` conversion always leaves this as `None`.
        explicit_delay: Option<StdDuration>,
    },
    /// Return this error to indicate your activity is cancelling
    Cancelled {
        /// Some data to save as the cancellation reason
        details: Option<Payload>,
    },
    /// Return this error to indicate that the activity should not be retried.
    NonRetryable(Box<dyn std::error::Error + Send + Sync + 'static>),
    /// Return this error to indicate that the activity will be completed outside of this activity
    /// definition, by an external client.
    WillCompleteAsync,
}
248
249impl ActivityError {
250    /// Construct a cancelled error without details
251    pub fn cancelled() -> Self {
252        Self::Cancelled { details: None }
253    }
254}
255
256impl<E> From<E> for ActivityError
257where
258    E: Into<anyhow::Error>,
259{
260    fn from(source: E) -> Self {
261        Self::Retryable {
262            source: source.into().into_boxed_dyn_error(),
263            explicit_delay: None,
264        }
265    }
266}
267
268/// Deadline calculation.  This is a port of
269/// https://github.com/temporalio/sdk-go/blob/8651550973088f27f678118f997839fb1bb9e62f/internal/activity.go#L225
270fn calculate_deadline(
271    scheduled_time: Option<&Timestamp>,
272    started_time: Option<&Timestamp>,
273    start_to_close_timeout: Option<&Duration>,
274    schedule_to_close_timeout: Option<&Duration>,
275) -> Option<SystemTime> {
276    match (
277        scheduled_time,
278        started_time,
279        start_to_close_timeout,
280        schedule_to_close_timeout,
281    ) {
282        (
283            Some(scheduled),
284            Some(started),
285            Some(start_to_close_timeout),
286            Some(schedule_to_close_timeout),
287        ) => {
288            let scheduled: SystemTime = maybe_convert_timestamp(scheduled)?;
289            let started: SystemTime = maybe_convert_timestamp(started)?;
290            let start_to_close_timeout: StdDuration = (*start_to_close_timeout).try_into().ok()?;
291            let schedule_to_close_timeout: StdDuration =
292                (*schedule_to_close_timeout).try_into().ok()?;
293
294            let start_to_close_deadline: SystemTime =
295                started.checked_add(start_to_close_timeout)?;
296            if schedule_to_close_timeout > StdDuration::ZERO {
297                let schedule_to_close_deadline =
298                    scheduled.checked_add(schedule_to_close_timeout)?;
299                // Minimum of the two deadlines.
300                if schedule_to_close_deadline < start_to_close_deadline {
301                    Some(schedule_to_close_deadline)
302                } else {
303                    Some(start_to_close_deadline)
304                }
305            } else {
306                Some(start_to_close_deadline)
307            }
308        }
309        _ => None,
310    }
311}
312
313/// Helper function lifted from prost_types::Timestamp implementation to prevent double cloning in
314/// error construction
315fn maybe_convert_timestamp(timestamp: &Timestamp) -> Option<SystemTime> {
316    let mut timestamp = *timestamp;
317    timestamp.normalize();
318
319    let system_time = if timestamp.seconds >= 0 {
320        std::time::UNIX_EPOCH.checked_add(StdDuration::from_secs(timestamp.seconds as u64))
321    } else {
322        std::time::UNIX_EPOCH.checked_sub(StdDuration::from_secs((-timestamp.seconds) as u64))
323    };
324
325    system_time.and_then(|system_time| {
326        system_time.checked_add(StdDuration::from_nanos(timestamp.nanos as u64))
327    })
328}
329
/// Type-erased, shareable entry point for one registered activity.
///
/// Called with the raw input payloads, a data converter and the activity's context; returns a
/// boxed future resolving to the serialized result payload or an [`ActivityError`].
/// [`ActivityDefinitions`] stores one of these per registered activity name.
pub(crate) type ActivityInvocation = Arc<
    dyn Fn(
            Vec<Payload>,
            DataConverter,
            ActivityContext,
        ) -> BoxFuture<'static, Result<Payload, ActivityError>>
        + Send
        + Sync,
>;
339
/// Implemented by types whose activities can be registered in bulk.
///
/// [`ActivityDefinitions::register_activities`] wraps the instance in an `Arc` and passes it to
/// `register_all`.
// NOTE(review): presumably implemented by the `#[activities]` macro rather than by hand — confirm.
#[doc(hidden)]
pub trait ActivityImplementer {
    /// Registers this implementer's activities into `defs`.
    fn register_all(self: Arc<Self>, defs: &mut ActivityDefinitions);
}
344
/// An [`ActivityDefinition`] that also knows how to run itself.
#[doc(hidden)]
pub trait ExecutableActivity: ActivityDefinition {
    /// The type that owns the activity's implementation; handed to `execute` as `receiver`.
    type Implementer: ActivityImplementer + Send + Sync + 'static;
    /// Runs the activity with its already-deserialized `input`.
    ///
    /// [`ActivityDefinitions::register_activity`] always passes `Some(instance)` as `receiver`.
    // NOTE(review): `None` is presumably meant for activities without a `self` receiver — confirm.
    fn execute(
        receiver: Option<Arc<Self::Implementer>>,
        ctx: ActivityContext,
        input: Self::Input,
    ) -> BoxFuture<'static, Result<Self::Output, ActivityError>>;
}
354
// Marker trait with no methods; it is not referenced by the code visible in this file.
// NOTE(review): presumably implemented by the `#[activities]` macro for impl blocks whose
// activities take no `self` receiver — confirm against the macro crate.
#[doc(hidden)]
pub trait HasOnlyStaticMethods {}
357
/// Contains activity registrations in a form ready for execution by workers.
///
/// Starts out empty via `Default`; entries are added with `register_activities` or
/// `register_activity`.
#[derive(Default, Clone)]
pub struct ActivityDefinitions {
    // Keyed by the activity name returned from `ActivityDefinition::name()`.
    activities: HashMap<&'static str, ActivityInvocation>,
}
363
364impl ActivityDefinitions {
365    /// Registers all activities on an activity implementer.
366    pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
367        let arcd = Arc::new(instance);
368        AI::register_all(arcd, self);
369        self
370    }
371    /// Registers a specific activitiy.
372    pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
373    where
374        AD: ActivityDefinition + ExecutableActivity,
375        AD::Output: Send + Sync,
376    {
377        self.activities.insert(
378            AD::name(),
379            Arc::new(move |payloads, dc, c| {
380                let instance = instance.clone();
381                let dc = dc.clone();
382                async move {
383                    // Use PayloadConverter (not DataConverter) since the codec is applied
384                    // at the SDK/Core boundary by the visitor, not here.
385                    let pc = dc.payload_converter();
386                    let ctx = SerializationContext {
387                        data: &SerializationContextData::Activity,
388                        converter: pc,
389                    };
390                    let deserialized: AD::Input = pc
391                        .from_payloads(&ctx, payloads)
392                        .map_err(ActivityError::from)?;
393                    let result = AD::execute(Some(instance), c, deserialized).await?;
394                    pc.to_payload(&ctx, &result).map_err(ActivityError::from)
395                }
396                .boxed()
397            }),
398        );
399        self
400    }
401
402    pub(crate) fn is_empty(&self) -> bool {
403        self.activities.is_empty()
404    }
405
406    pub(crate) fn get(&self, act_type: &str) -> Option<ActivityInvocation> {
407        self.activities.get(act_type).cloned()
408    }
409}
410
411impl Debug for ActivityDefinitions {
412    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
413        f.debug_struct("ActivityDefinitions")
414            .field("activities", &self.activities.keys())
415            .finish()
416    }
417}