1#[doc(inline)]
48pub use temporalio_macros::activities;
49
50use futures_util::{FutureExt, future::BoxFuture};
51use prost_types::{Duration, Timestamp};
52use std::{
53 collections::HashMap,
54 fmt::Debug,
55 sync::Arc,
56 time::{Duration as StdDuration, SystemTime},
57};
58use temporalio_client::Priority;
59use temporalio_common::{
60 ActivityDefinition,
61 data_converters::{
62 DataConverter, GenericPayloadConverter, SerializationContext, SerializationContextData,
63 },
64 protos::{
65 coresdk::{ActivityHeartbeat, activity_task},
66 temporal::api::common::v1::{Payload, RetryPolicy, WorkflowExecution},
67 utilities::TryIntoOrNone,
68 },
69};
70use temporalio_sdk_core::Worker as CoreWorker;
71use tokio_util::sync::CancellationToken;
72
73#[derive(Clone)]
75pub struct ActivityContext {
76 worker: Arc<CoreWorker>,
77 cancellation_token: CancellationToken,
78 heartbeat_details: Vec<Payload>,
79 header_fields: HashMap<String, Payload>,
80 info: ActivityInfo,
81}
82
83impl ActivityContext {
84 pub fn new(
86 worker: Arc<CoreWorker>,
87 cancellation_token: CancellationToken,
88 task_queue: String,
89 task_token: Vec<u8>,
90 task: activity_task::Start,
91 ) -> (Self, Vec<Payload>) {
92 let activity_task::Start {
93 workflow_namespace,
94 workflow_type,
95 workflow_execution,
96 activity_id,
97 activity_type,
98 header_fields,
99 input,
100 heartbeat_details,
101 scheduled_time,
102 current_attempt_scheduled_time,
103 started_time,
104 attempt,
105 schedule_to_close_timeout,
106 start_to_close_timeout,
107 heartbeat_timeout,
108 retry_policy,
109 is_local,
110 priority,
111 } = task;
112 let deadline = calculate_deadline(
113 scheduled_time.as_ref(),
114 started_time.as_ref(),
115 start_to_close_timeout.as_ref(),
116 schedule_to_close_timeout.as_ref(),
117 );
118
119 (
120 ActivityContext {
121 worker,
122 cancellation_token,
123 heartbeat_details,
124 header_fields,
125 info: ActivityInfo {
126 task_token,
127 task_queue,
128 workflow_type,
129 workflow_namespace,
130 workflow_execution,
131 activity_id,
132 activity_type,
133 heartbeat_timeout: heartbeat_timeout.try_into_or_none(),
134 scheduled_time: scheduled_time.try_into_or_none(),
135 started_time: started_time.try_into_or_none(),
136 deadline,
137 attempt,
138 current_attempt_scheduled_time: current_attempt_scheduled_time
139 .try_into_or_none(),
140 retry_policy,
141 is_local,
142 priority: priority.map(Into::into).unwrap_or_default(),
143 },
144 },
145 input,
146 )
147 }
148
149 pub async fn cancelled(&self) {
152 self.cancellation_token.clone().cancelled().await
153 }
154
155 pub fn is_cancelled(&self) -> bool {
157 self.cancellation_token.is_cancelled()
158 }
159
160 pub fn heartbeat_details(&self) -> &[Payload] {
163 &self.heartbeat_details
164 }
165
166 pub fn record_heartbeat(&self, details: Vec<Payload>) {
168 if !self.info.is_local {
169 self.worker.record_activity_heartbeat(ActivityHeartbeat {
170 task_token: self.info.task_token.clone(),
171 details,
172 })
173 }
174 }
175
176 pub fn info(&self) -> &ActivityInfo {
178 &self.info
179 }
180
181 pub fn headers(&self) -> &HashMap<String, Payload> {
183 &self.header_fields
184 }
185}
186
187#[derive(Clone, Debug)]
189#[non_exhaustive]
190pub struct ActivityInfo {
191 pub task_token: Vec<u8>,
193 pub workflow_type: String,
195 pub workflow_namespace: String,
197 pub workflow_execution: Option<WorkflowExecution>,
199 pub activity_id: String,
201 pub activity_type: String,
203 pub task_queue: String,
205 pub heartbeat_timeout: Option<StdDuration>,
207 pub scheduled_time: Option<SystemTime>,
209 pub started_time: Option<SystemTime>,
211 pub deadline: Option<SystemTime>,
213 pub attempt: u32,
215 pub current_attempt_scheduled_time: Option<SystemTime>,
217 pub retry_policy: Option<RetryPolicy>,
219 pub is_local: bool,
221 pub priority: Priority,
223}
224
225#[derive(Debug)]
227pub enum ActivityError {
228 Retryable {
232 source: Box<dyn std::error::Error + Send + Sync + 'static>,
234 explicit_delay: Option<StdDuration>,
236 },
237 Cancelled {
239 details: Option<Payload>,
241 },
242 NonRetryable(Box<dyn std::error::Error + Send + Sync + 'static>),
244 WillCompleteAsync,
247}
248
249impl ActivityError {
250 pub fn cancelled() -> Self {
252 Self::Cancelled { details: None }
253 }
254}
255
256impl<E> From<E> for ActivityError
257where
258 E: Into<anyhow::Error>,
259{
260 fn from(source: E) -> Self {
261 Self::Retryable {
262 source: source.into().into_boxed_dyn_error(),
263 explicit_delay: None,
264 }
265 }
266}
267
268fn calculate_deadline(
271 scheduled_time: Option<&Timestamp>,
272 started_time: Option<&Timestamp>,
273 start_to_close_timeout: Option<&Duration>,
274 schedule_to_close_timeout: Option<&Duration>,
275) -> Option<SystemTime> {
276 match (
277 scheduled_time,
278 started_time,
279 start_to_close_timeout,
280 schedule_to_close_timeout,
281 ) {
282 (
283 Some(scheduled),
284 Some(started),
285 Some(start_to_close_timeout),
286 Some(schedule_to_close_timeout),
287 ) => {
288 let scheduled: SystemTime = maybe_convert_timestamp(scheduled)?;
289 let started: SystemTime = maybe_convert_timestamp(started)?;
290 let start_to_close_timeout: StdDuration = (*start_to_close_timeout).try_into().ok()?;
291 let schedule_to_close_timeout: StdDuration =
292 (*schedule_to_close_timeout).try_into().ok()?;
293
294 let start_to_close_deadline: SystemTime =
295 started.checked_add(start_to_close_timeout)?;
296 if schedule_to_close_timeout > StdDuration::ZERO {
297 let schedule_to_close_deadline =
298 scheduled.checked_add(schedule_to_close_timeout)?;
299 if schedule_to_close_deadline < start_to_close_deadline {
301 Some(schedule_to_close_deadline)
302 } else {
303 Some(start_to_close_deadline)
304 }
305 } else {
306 Some(start_to_close_deadline)
307 }
308 }
309 _ => None,
310 }
311}
312
313fn maybe_convert_timestamp(timestamp: &Timestamp) -> Option<SystemTime> {
316 let mut timestamp = *timestamp;
317 timestamp.normalize();
318
319 let system_time = if timestamp.seconds >= 0 {
320 std::time::UNIX_EPOCH.checked_add(StdDuration::from_secs(timestamp.seconds as u64))
321 } else {
322 std::time::UNIX_EPOCH.checked_sub(StdDuration::from_secs((-timestamp.seconds) as u64))
323 };
324
325 system_time.and_then(|system_time| {
326 system_time.checked_add(StdDuration::from_nanos(timestamp.nanos as u64))
327 })
328}
329
330pub(crate) type ActivityInvocation = Arc<
331 dyn Fn(
332 Vec<Payload>,
333 DataConverter,
334 ActivityContext,
335 ) -> BoxFuture<'static, Result<Payload, ActivityError>>
336 + Send
337 + Sync,
338>;
339
340#[doc(hidden)]
341pub trait ActivityImplementer {
342 fn register_all(self: Arc<Self>, defs: &mut ActivityDefinitions);
343}
344
345#[doc(hidden)]
346pub trait ExecutableActivity: ActivityDefinition {
347 type Implementer: ActivityImplementer + Send + Sync + 'static;
348 fn execute(
349 receiver: Option<Arc<Self::Implementer>>,
350 ctx: ActivityContext,
351 input: Self::Input,
352 ) -> BoxFuture<'static, Result<Self::Output, ActivityError>>;
353}
354
355#[doc(hidden)]
356pub trait HasOnlyStaticMethods {}
357
358#[derive(Default, Clone)]
360pub struct ActivityDefinitions {
361 activities: HashMap<&'static str, ActivityInvocation>,
362}
363
364impl ActivityDefinitions {
365 pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
367 let arcd = Arc::new(instance);
368 AI::register_all(arcd, self);
369 self
370 }
371 pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
373 where
374 AD: ActivityDefinition + ExecutableActivity,
375 AD::Output: Send + Sync,
376 {
377 self.activities.insert(
378 AD::name(),
379 Arc::new(move |payloads, dc, c| {
380 let instance = instance.clone();
381 let dc = dc.clone();
382 async move {
383 let pc = dc.payload_converter();
386 let ctx = SerializationContext {
387 data: &SerializationContextData::Activity,
388 converter: pc,
389 };
390 let deserialized: AD::Input = pc
391 .from_payloads(&ctx, payloads)
392 .map_err(ActivityError::from)?;
393 let result = AD::execute(Some(instance), c, deserialized).await?;
394 pc.to_payload(&ctx, &result).map_err(ActivityError::from)
395 }
396 .boxed()
397 }),
398 );
399 self
400 }
401
402 pub(crate) fn is_empty(&self) -> bool {
403 self.activities.is_empty()
404 }
405
406 pub(crate) fn get(&self, act_type: &str) -> Option<ActivityInvocation> {
407 self.activities.get(act_type).cloned()
408 }
409}
410
411impl Debug for ActivityDefinitions {
412 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
413 f.debug_struct("ActivityDefinitions")
414 .field("activities", &self.activities.keys())
415 .finish()
416 }
417}