use crate::{NamespacedClient, errors::AsyncActivityError, grpc::WorkflowService};
use temporalio_common::protos::{
TaskToken,
temporal::api::{
common::v1::Payloads,
failure::v1::Failure,
workflowservice::v1::{
RecordActivityTaskHeartbeatByIdRequest, RecordActivityTaskHeartbeatByIdResponse,
RecordActivityTaskHeartbeatRequest, RecordActivityTaskHeartbeatResponse,
RespondActivityTaskCanceledByIdRequest, RespondActivityTaskCanceledRequest,
RespondActivityTaskCompletedByIdRequest, RespondActivityTaskCompletedRequest,
RespondActivityTaskFailedByIdRequest, RespondActivityTaskFailedRequest,
},
},
};
use tonic::IntoRequest;
#[derive(Debug, Clone)]
pub enum ActivityIdentifier {
TaskToken(TaskToken),
ById {
workflow_id: String,
run_id: String,
activity_id: String,
},
}
impl ActivityIdentifier {
pub fn from_task_token(token: TaskToken) -> Self {
Self::TaskToken(token)
}
pub fn by_id(
workflow_id: impl Into<String>,
run_id: impl Into<String>,
activity_id: impl Into<String>,
) -> Self {
Self::ById {
workflow_id: workflow_id.into(),
run_id: run_id.into(),
activity_id: activity_id.into(),
}
}
}
pub struct AsyncActivityHandle<CT> {
client: CT,
identifier: ActivityIdentifier,
}
impl<CT> AsyncActivityHandle<CT> {
pub fn new(client: CT, identifier: ActivityIdentifier) -> Self {
Self { client, identifier }
}
pub fn identifier(&self) -> &ActivityIdentifier {
&self.identifier
}
pub fn client(&self) -> &CT {
&self.client
}
}
impl<CT: WorkflowService + NamespacedClient + Clone> AsyncActivityHandle<CT> {
pub async fn complete(&self, result: Option<Payloads>) -> Result<(), AsyncActivityError> {
match &self.identifier {
ActivityIdentifier::TaskToken(token) => {
WorkflowService::respond_activity_task_completed(
&mut self.client.clone(),
RespondActivityTaskCompletedRequest {
task_token: token.0.clone(),
result,
identity: self.client.identity(),
namespace: self.client.namespace(),
..Default::default()
}
.into_request(),
)
.await
.map_err(AsyncActivityError::from_status)?;
}
ActivityIdentifier::ById {
workflow_id,
run_id,
activity_id,
} => {
WorkflowService::respond_activity_task_completed_by_id(
&mut self.client.clone(),
RespondActivityTaskCompletedByIdRequest {
namespace: self.client.namespace(),
workflow_id: workflow_id.clone(),
run_id: run_id.clone(),
activity_id: activity_id.clone(),
result,
identity: self.client.identity(),
resource_id: Default::default(),
}
.into_request(),
)
.await
.map_err(AsyncActivityError::from_status)?;
}
}
Ok(())
}
pub async fn fail(
&self,
failure: Failure,
last_heartbeat_details: Option<Payloads>,
) -> Result<(), AsyncActivityError> {
match &self.identifier {
ActivityIdentifier::TaskToken(token) => {
WorkflowService::respond_activity_task_failed(
&mut self.client.clone(),
RespondActivityTaskFailedRequest {
task_token: token.0.clone(),
failure: Some(failure),
identity: self.client.identity(),
namespace: self.client.namespace(),
last_heartbeat_details,
..Default::default()
}
.into_request(),
)
.await
.map_err(AsyncActivityError::from_status)?;
}
ActivityIdentifier::ById {
workflow_id,
run_id,
activity_id,
} => {
WorkflowService::respond_activity_task_failed_by_id(
&mut self.client.clone(),
RespondActivityTaskFailedByIdRequest {
namespace: self.client.namespace(),
workflow_id: workflow_id.clone(),
run_id: run_id.clone(),
activity_id: activity_id.clone(),
failure: Some(failure),
identity: self.client.identity(),
last_heartbeat_details,
resource_id: Default::default(),
}
.into_request(),
)
.await
.map_err(AsyncActivityError::from_status)?;
}
}
Ok(())
}
pub async fn report_cancelation(
&self,
details: Option<Payloads>,
) -> Result<(), AsyncActivityError> {
match &self.identifier {
ActivityIdentifier::TaskToken(token) => {
WorkflowService::respond_activity_task_canceled(
&mut self.client.clone(),
RespondActivityTaskCanceledRequest {
task_token: token.0.clone(),
details,
identity: self.client.identity(),
namespace: self.client.namespace(),
..Default::default()
}
.into_request(),
)
.await
.map_err(AsyncActivityError::from_status)?;
}
ActivityIdentifier::ById {
workflow_id,
run_id,
activity_id,
} => {
WorkflowService::respond_activity_task_canceled_by_id(
&mut self.client.clone(),
RespondActivityTaskCanceledByIdRequest {
namespace: self.client.namespace(),
workflow_id: workflow_id.clone(),
run_id: run_id.clone(),
activity_id: activity_id.clone(),
details,
identity: self.client.identity(),
..Default::default()
}
.into_request(),
)
.await
.map_err(AsyncActivityError::from_status)?;
}
}
Ok(())
}
pub async fn heartbeat(
&self,
details: Option<Payloads>,
) -> Result<ActivityHeartbeatResponse, AsyncActivityError> {
match &self.identifier {
ActivityIdentifier::TaskToken(token) => {
let resp = WorkflowService::record_activity_task_heartbeat(
&mut self.client.clone(),
RecordActivityTaskHeartbeatRequest {
task_token: token.0.clone(),
details,
identity: self.client.identity(),
namespace: self.client.namespace(),
resource_id: Default::default(),
}
.into_request(),
)
.await
.map_err(AsyncActivityError::from_status)?
.into_inner();
Ok(ActivityHeartbeatResponse::from(resp))
}
ActivityIdentifier::ById {
workflow_id,
run_id,
activity_id,
} => {
let resp = WorkflowService::record_activity_task_heartbeat_by_id(
&mut self.client.clone(),
RecordActivityTaskHeartbeatByIdRequest {
namespace: self.client.namespace(),
workflow_id: workflow_id.clone(),
run_id: run_id.clone(),
activity_id: activity_id.clone(),
details,
identity: self.client.identity(),
resource_id: Default::default(),
}
.into_request(),
)
.await
.map_err(AsyncActivityError::from_status)?
.into_inner();
Ok(ActivityHeartbeatResponse::from(resp))
}
}
}
}
#[derive(Debug, Clone)]
pub struct ActivityHeartbeatResponse {
pub cancel_requested: bool,
pub activity_paused: bool,
pub activity_reset: bool,
}
impl From<RecordActivityTaskHeartbeatResponse> for ActivityHeartbeatResponse {
fn from(resp: RecordActivityTaskHeartbeatResponse) -> Self {
Self {
cancel_requested: resp.cancel_requested,
activity_paused: resp.activity_paused,
activity_reset: resp.activity_reset,
}
}
}
impl From<RecordActivityTaskHeartbeatByIdResponse> for ActivityHeartbeatResponse {
fn from(resp: RecordActivityTaskHeartbeatByIdResponse) -> Self {
Self {
cancel_requested: resp.cancel_requested,
activity_paused: resp.activity_paused,
activity_reset: resp.activity_reset,
}
}
}