1use crate::app_data::AppData;
2
3use prost_types::{Duration, Timestamp};
4use std::{
5 collections::HashMap,
6 sync::Arc,
7 time::{Duration as StdDuration, SystemTime},
8};
9use squads_temporal_client::Priority;
10use squads_temporal_sdk_core_api::Worker;
11use squads_temporal_sdk_core_protos::{
12 coresdk::{ActivityHeartbeat, activity_task},
13 temporal::api::common::v1::{Payload, RetryPolicy, WorkflowExecution},
14 utilities::TryIntoOrNone,
15};
16use tokio_util::sync::CancellationToken;
17
18#[derive(Clone)]
20pub struct ActContext {
21 worker: Arc<dyn Worker>,
22 app_data: Arc<AppData>,
23 cancellation_token: CancellationToken,
24 input: Vec<Payload>,
25 heartbeat_details: Vec<Payload>,
26 header_fields: HashMap<String, Payload>,
27 info: ActivityInfo,
28}
29
30#[derive(Clone)]
31pub struct ActivityInfo {
32 pub task_token: Vec<u8>,
33 pub workflow_type: String,
34 pub workflow_namespace: String,
35 pub workflow_execution: Option<WorkflowExecution>,
36 pub activity_id: String,
37 pub activity_type: String,
38 pub task_queue: String,
39 pub heartbeat_timeout: Option<StdDuration>,
40 pub scheduled_time: Option<SystemTime>,
42 pub started_time: Option<SystemTime>,
44 pub deadline: Option<SystemTime>,
46 pub attempt: u32,
48 pub current_attempt_scheduled_time: Option<SystemTime>,
50 pub retry_policy: Option<RetryPolicy>,
51 pub is_local: bool,
52 pub priority: Priority,
54}
55
56impl ActContext {
57 pub(crate) fn new(
60 worker: Arc<dyn Worker>,
61 app_data: Arc<AppData>,
62 cancellation_token: CancellationToken,
63 task_queue: String,
64 task_token: Vec<u8>,
65 task: activity_task::Start,
66 ) -> (Self, Payload) {
67 let activity_task::Start {
68 workflow_namespace,
69 workflow_type,
70 workflow_execution,
71 activity_id,
72 activity_type,
73 header_fields,
74 mut input,
75 heartbeat_details,
76 scheduled_time,
77 current_attempt_scheduled_time,
78 started_time,
79 attempt,
80 schedule_to_close_timeout,
81 start_to_close_timeout,
82 heartbeat_timeout,
83 retry_policy,
84 is_local,
85 priority,
86 } = task;
87 let deadline = calculate_deadline(
88 scheduled_time.as_ref(),
89 started_time.as_ref(),
90 start_to_close_timeout.as_ref(),
91 schedule_to_close_timeout.as_ref(),
92 );
93 let first_arg = input.pop().unwrap_or_default();
94
95 (
96 ActContext {
97 worker,
98 app_data,
99 cancellation_token,
100 input,
101 heartbeat_details,
102 header_fields,
103 info: ActivityInfo {
104 task_token,
105 task_queue,
106 workflow_type,
107 workflow_namespace,
108 workflow_execution,
109 activity_id,
110 activity_type,
111 heartbeat_timeout: heartbeat_timeout.try_into_or_none(),
112 scheduled_time: scheduled_time.try_into_or_none(),
113 started_time: started_time.try_into_or_none(),
114 deadline,
115 attempt,
116 current_attempt_scheduled_time: current_attempt_scheduled_time
117 .try_into_or_none(),
118 retry_policy,
119 is_local,
120 priority: priority.map(Into::into).unwrap_or_default(),
121 },
122 },
123 first_arg,
124 )
125 }
126
127 pub async fn cancelled(&self) {
130 self.cancellation_token.clone().cancelled().await
131 }
132
133 pub fn is_cancelled(&self) -> bool {
135 self.cancellation_token.is_cancelled()
136 }
137
138 pub fn extra_inputs(&mut self) -> &mut [Payload] {
142 &mut self.input
143 }
144
145 pub fn get_heartbeat_details(&self) -> &[Payload] {
147 &self.heartbeat_details
148 }
149
150 pub fn record_heartbeat(&self, details: Vec<Payload>) {
152 if !self.info.is_local {
153 self.worker.record_activity_heartbeat(ActivityHeartbeat {
154 task_token: self.info.task_token.clone(),
155 details,
156 })
157 }
158 }
159
160 pub fn get_info(&self) -> &ActivityInfo {
162 &self.info
163 }
164
165 pub fn headers(&self) -> &HashMap<String, Payload> {
167 &self.header_fields
168 }
169
170 pub fn app_data<T: Send + Sync + 'static>(&self) -> Option<&T> {
172 self.app_data.get::<T>()
173 }
174}
175
176fn calculate_deadline(
179 scheduled_time: Option<&Timestamp>,
180 started_time: Option<&Timestamp>,
181 start_to_close_timeout: Option<&Duration>,
182 schedule_to_close_timeout: Option<&Duration>,
183) -> Option<SystemTime> {
184 match (
185 scheduled_time,
186 started_time,
187 start_to_close_timeout,
188 schedule_to_close_timeout,
189 ) {
190 (
191 Some(scheduled),
192 Some(started),
193 Some(start_to_close_timeout),
194 Some(schedule_to_close_timeout),
195 ) => {
196 let scheduled: SystemTime = maybe_convert_timestamp(scheduled)?;
197 let started: SystemTime = maybe_convert_timestamp(started)?;
198 let start_to_close_timeout: StdDuration = (*start_to_close_timeout).try_into().ok()?;
199 let schedule_to_close_timeout: StdDuration =
200 (*schedule_to_close_timeout).try_into().ok()?;
201
202 let start_to_close_deadline: SystemTime =
203 started.checked_add(start_to_close_timeout)?;
204 if schedule_to_close_timeout > StdDuration::ZERO {
205 let schedule_to_close_deadline =
206 scheduled.checked_add(schedule_to_close_timeout)?;
207 if schedule_to_close_deadline < start_to_close_deadline {
209 Some(schedule_to_close_deadline)
210 } else {
211 Some(start_to_close_deadline)
212 }
213 } else {
214 Some(start_to_close_deadline)
215 }
216 }
217 _ => None,
218 }
219}
220
221fn maybe_convert_timestamp(timestamp: &Timestamp) -> Option<SystemTime> {
224 let mut timestamp = *timestamp;
225 timestamp.normalize();
226
227 let system_time = if timestamp.seconds >= 0 {
228 std::time::UNIX_EPOCH.checked_add(StdDuration::from_secs(timestamp.seconds as u64))
229 } else {
230 std::time::UNIX_EPOCH.checked_sub(StdDuration::from_secs((-timestamp.seconds) as u64))
231 };
232
233 system_time.and_then(|system_time| {
234 system_time.checked_add(StdDuration::from_nanos(timestamp.nanos as u64))
235 })
236}