#![allow(unreachable_pub)]
use std::time::Duration;
use temporalio_common::protos::coresdk::AsJsonPayloadExt;
use temporalio_macros::{activities, workflow, workflow_methods};
use temporalio_sdk::{
ActivityOptions, WorkflowContext, WorkflowResult,
activities::{ActivityContext, ActivityError},
};
pub struct HeartbeatingActivities;
#[activities]
impl HeartbeatingActivities {
#[activity]
pub async fn long_running_activity(
ctx: ActivityContext,
total_steps: u32,
) -> Result<String, ActivityError> {
let start_step: u32 = ctx
.heartbeat_details()
.first()
.and_then(|p| serde_json::from_slice(&p.data).ok())
.unwrap_or(0);
for step in start_step..total_steps {
if ctx.is_cancelled() {
return Err(ActivityError::cancelled());
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
ctx.record_heartbeat(vec![step.as_json_payload().unwrap()]);
}
Ok(format!("Completed {total_steps} steps"))
}
}
#[workflow]
#[derive(Default)]
pub struct HeartbeatingWorkflow;
#[workflow_methods]
impl HeartbeatingWorkflow {
#[run]
pub async fn run(ctx: &mut WorkflowContext<Self>, total_steps: u32) -> WorkflowResult<String> {
let result = ctx
.start_activity(
HeartbeatingActivities::long_running_activity,
total_steps,
ActivityOptions::with_start_to_close_timeout(Duration::from_secs(30))
.heartbeat_timeout(Duration::from_secs(5))
.build(),
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
Ok(result)
}
}