squads_temporal_sdk/
activity_context.rs

1use crate::app_data::AppData;
2
3use prost_types::{Duration, Timestamp};
4use std::{
5    collections::HashMap,
6    sync::Arc,
7    time::{Duration as StdDuration, SystemTime},
8};
9use squads_temporal_client::Priority;
10use squads_temporal_sdk_core_api::Worker;
11use squads_temporal_sdk_core_protos::{
12    coresdk::{ActivityHeartbeat, activity_task},
13    temporal::api::common::v1::{Payload, RetryPolicy, WorkflowExecution},
14    utilities::TryIntoOrNone,
15};
16use tokio_util::sync::CancellationToken;
17
18/// Used within activities to get info, heartbeat management etc.
19#[derive(Clone)]
20pub struct ActContext {
21    worker: Arc<dyn Worker>,
22    app_data: Arc<AppData>,
23    cancellation_token: CancellationToken,
24    input: Vec<Payload>,
25    heartbeat_details: Vec<Payload>,
26    header_fields: HashMap<String, Payload>,
27    info: ActivityInfo,
28}
29
30#[derive(Clone)]
31pub struct ActivityInfo {
32    pub task_token: Vec<u8>,
33    pub workflow_type: String,
34    pub workflow_namespace: String,
35    pub workflow_execution: Option<WorkflowExecution>,
36    pub activity_id: String,
37    pub activity_type: String,
38    pub task_queue: String,
39    pub heartbeat_timeout: Option<StdDuration>,
40    /// Time activity was scheduled by a workflow
41    pub scheduled_time: Option<SystemTime>,
42    /// Time of activity start
43    pub started_time: Option<SystemTime>,
44    /// Time of activity timeout
45    pub deadline: Option<SystemTime>,
46    /// Attempt starts from 1, and increase by 1 for every retry, if retry policy is specified.
47    pub attempt: u32,
48    /// Time this attempt at the activity was scheduled
49    pub current_attempt_scheduled_time: Option<SystemTime>,
50    pub retry_policy: Option<RetryPolicy>,
51    pub is_local: bool,
52    /// Priority of this activity. If unset uses [Priority::default]
53    pub priority: Priority,
54}
55
56impl ActContext {
57    /// Construct new Activity Context, returning the context and the first argument to the activity
58    /// (which may be a default [Payload]).
59    pub(crate) fn new(
60        worker: Arc<dyn Worker>,
61        app_data: Arc<AppData>,
62        cancellation_token: CancellationToken,
63        task_queue: String,
64        task_token: Vec<u8>,
65        task: activity_task::Start,
66    ) -> (Self, Payload) {
67        let activity_task::Start {
68            workflow_namespace,
69            workflow_type,
70            workflow_execution,
71            activity_id,
72            activity_type,
73            header_fields,
74            mut input,
75            heartbeat_details,
76            scheduled_time,
77            current_attempt_scheduled_time,
78            started_time,
79            attempt,
80            schedule_to_close_timeout,
81            start_to_close_timeout,
82            heartbeat_timeout,
83            retry_policy,
84            is_local,
85            priority,
86        } = task;
87        let deadline = calculate_deadline(
88            scheduled_time.as_ref(),
89            started_time.as_ref(),
90            start_to_close_timeout.as_ref(),
91            schedule_to_close_timeout.as_ref(),
92        );
93        let first_arg = input.pop().unwrap_or_default();
94
95        (
96            ActContext {
97                worker,
98                app_data,
99                cancellation_token,
100                input,
101                heartbeat_details,
102                header_fields,
103                info: ActivityInfo {
104                    task_token,
105                    task_queue,
106                    workflow_type,
107                    workflow_namespace,
108                    workflow_execution,
109                    activity_id,
110                    activity_type,
111                    heartbeat_timeout: heartbeat_timeout.try_into_or_none(),
112                    scheduled_time: scheduled_time.try_into_or_none(),
113                    started_time: started_time.try_into_or_none(),
114                    deadline,
115                    attempt,
116                    current_attempt_scheduled_time: current_attempt_scheduled_time
117                        .try_into_or_none(),
118                    retry_policy,
119                    is_local,
120                    priority: priority.map(Into::into).unwrap_or_default(),
121                },
122            },
123            first_arg,
124        )
125    }
126
127    /// Returns a future the completes if and when the activity this was called inside has been
128    /// cancelled
129    pub async fn cancelled(&self) {
130        self.cancellation_token.clone().cancelled().await
131    }
132
133    /// Returns true if this activity has already been cancelled
134    pub fn is_cancelled(&self) -> bool {
135        self.cancellation_token.is_cancelled()
136    }
137
138    /// Retrieve extra parameters to the Activity. The first input is always popped and passed to
139    /// the Activity function for the currently executing activity. However, if more parameters are
140    /// passed, perhaps from another language's SDK, explicit access is available from extra_inputs
141    pub fn extra_inputs(&mut self) -> &mut [Payload] {
142        &mut self.input
143    }
144
145    /// Extract heartbeat details from last failed attempt. This is used in combination with retry policy.
146    pub fn get_heartbeat_details(&self) -> &[Payload] {
147        &self.heartbeat_details
148    }
149
150    /// RecordHeartbeat sends heartbeat for the currently executing activity
151    pub fn record_heartbeat(&self, details: Vec<Payload>) {
152        if !self.info.is_local {
153            self.worker.record_activity_heartbeat(ActivityHeartbeat {
154                task_token: self.info.task_token.clone(),
155                details,
156            })
157        }
158    }
159
160    /// Get activity info of the executing activity
161    pub fn get_info(&self) -> &ActivityInfo {
162        &self.info
163    }
164
165    /// Get headers attached to this activity
166    pub fn headers(&self) -> &HashMap<String, Payload> {
167        &self.header_fields
168    }
169
170    /// Get custom Application Data
171    pub fn app_data<T: Send + Sync + 'static>(&self) -> Option<&T> {
172        self.app_data.get::<T>()
173    }
174}
175
176/// Deadline calculation.  This is a port of
177/// https://github.com/temporalio/sdk-go/blob/8651550973088f27f678118f997839fb1bb9e62f/internal/activity.go#L225
178fn calculate_deadline(
179    scheduled_time: Option<&Timestamp>,
180    started_time: Option<&Timestamp>,
181    start_to_close_timeout: Option<&Duration>,
182    schedule_to_close_timeout: Option<&Duration>,
183) -> Option<SystemTime> {
184    match (
185        scheduled_time,
186        started_time,
187        start_to_close_timeout,
188        schedule_to_close_timeout,
189    ) {
190        (
191            Some(scheduled),
192            Some(started),
193            Some(start_to_close_timeout),
194            Some(schedule_to_close_timeout),
195        ) => {
196            let scheduled: SystemTime = maybe_convert_timestamp(scheduled)?;
197            let started: SystemTime = maybe_convert_timestamp(started)?;
198            let start_to_close_timeout: StdDuration = (*start_to_close_timeout).try_into().ok()?;
199            let schedule_to_close_timeout: StdDuration =
200                (*schedule_to_close_timeout).try_into().ok()?;
201
202            let start_to_close_deadline: SystemTime =
203                started.checked_add(start_to_close_timeout)?;
204            if schedule_to_close_timeout > StdDuration::ZERO {
205                let schedule_to_close_deadline =
206                    scheduled.checked_add(schedule_to_close_timeout)?;
207                // Minimum of the two deadlines.
208                if schedule_to_close_deadline < start_to_close_deadline {
209                    Some(schedule_to_close_deadline)
210                } else {
211                    Some(start_to_close_deadline)
212                }
213            } else {
214                Some(start_to_close_deadline)
215            }
216        }
217        _ => None,
218    }
219}
220
221/// Helper function lifted from prost_types::Timestamp implementation to prevent double cloning in
222/// error construction
223fn maybe_convert_timestamp(timestamp: &Timestamp) -> Option<SystemTime> {
224    let mut timestamp = *timestamp;
225    timestamp.normalize();
226
227    let system_time = if timestamp.seconds >= 0 {
228        std::time::UNIX_EPOCH.checked_add(StdDuration::from_secs(timestamp.seconds as u64))
229    } else {
230        std::time::UNIX_EPOCH.checked_sub(StdDuration::from_secs((-timestamp.seconds) as u64))
231    };
232
233    system_time.and_then(|system_time| {
234        system_time.checked_add(StdDuration::from_nanos(timestamp.nanos as u64))
235    })
236}