use anyhow::anyhow;
use std::collections::HashMap;
use temporalio_common::protos::temporal::api::{
common::v1::Payload,
history::v1::{HistoryEvent, history_event},
protocol::v1::{Message, message::SequencingId},
update,
};
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct IncomingProtocolMessage {
pub(crate) id: String,
pub(crate) protocol_instance_id: String,
pub(crate) sequencing_id: Option<SequencingId>,
pub(crate) body: IncomingProtocolMessageBody,
}
impl IncomingProtocolMessage {
pub(crate) fn processable_after_event_id(&self) -> Option<i64> {
match self.sequencing_id {
None => Some(0),
Some(SequencingId::EventId(id)) => Some(id),
Some(SequencingId::CommandIndex(_)) => None,
}
}
}
impl TryFrom<Message> for IncomingProtocolMessage {
type Error = anyhow::Error;
fn try_from(m: Message) -> Result<Self, Self::Error> {
let body = m.body.try_into()?;
Ok(Self {
id: m.id,
protocol_instance_id: m.protocol_instance_id,
sequencing_id: m.sequencing_id,
body,
})
}
}
impl TryFrom<&HistoryEvent> for IncomingProtocolMessage {
type Error = anyhow::Error;
fn try_from(event: &HistoryEvent) -> Result<Self, Self::Error> {
match event.attributes {
Some(history_event::Attributes::WorkflowExecutionUpdateAdmittedEventAttributes(
ref atts,
)) => {
let request = atts.request.as_ref().ok_or_else(|| {
anyhow!("Update admitted event must contain request".to_string())
})?;
let protocol_instance_id = request
.meta
.as_ref()
.ok_or_else(|| {
anyhow!("Update request's `meta` field must be populated".to_string())
})?
.update_id
.clone();
Ok(IncomingProtocolMessage {
id: format!("{protocol_instance_id}/request"),
protocol_instance_id,
sequencing_id: Some(SequencingId::EventId(event.event_id)),
body: IncomingProtocolMessageBody::UpdateRequest(request.clone().try_into()?),
})
}
Some(history_event::Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(
ref atts,
)) => Ok(IncomingProtocolMessage {
id: atts.accepted_request_message_id.clone(),
protocol_instance_id: atts.protocol_instance_id.clone(),
sequencing_id: Some(SequencingId::EventId(
atts.accepted_request_sequencing_event_id,
)),
body: IncomingProtocolMessageBody::UpdateRequest(
atts.accepted_request
.clone()
.ok_or_else(|| {
anyhow!("Update accepted event must contain accepted request")
})?
.try_into()?,
),
}),
_ => Err(anyhow!(
"Cannot convert event of type {} into protocol message. This is an sdk-core bug.",
event.event_type().as_str_name()
)),
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum IncomingProtocolMessageBody {
UpdateRequest(UpdateRequest),
}
impl TryFrom<Option<prost_types::Any>> for IncomingProtocolMessageBody {
type Error = anyhow::Error;
fn try_from(v: Option<prost_types::Any>) -> Result<Self, Self::Error> {
let v = v.ok_or_else(|| anyhow!("Protocol message body must be populated"))?;
Ok(IncomingProtocolMessageBody::UpdateRequest(
v.to_msg::<update::v1::Request>()?.try_into()?,
))
}
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct UpdateRequest {
pub(crate) original: update::v1::Request,
}
impl UpdateRequest {
pub(crate) fn name(&self) -> &str {
&self
.original
.input
.as_ref()
.expect("Update request's `input` field must be populated")
.name
}
pub(crate) fn headers(&self) -> HashMap<String, Payload> {
self.original
.input
.as_ref()
.expect("Update request's `input` field must be populated")
.header
.clone()
.map(Into::into)
.unwrap_or_default()
}
pub(crate) fn input(&self) -> Vec<Payload> {
self.original
.input
.as_ref()
.expect("Update request's `input` field must be populated")
.args
.clone()
.map(|ps| ps.payloads)
.unwrap_or_default()
}
pub(crate) fn meta(&self) -> &update::v1::Meta {
self.original
.meta
.as_ref()
.expect("Update request's `meta` field must be populated")
}
}
impl TryFrom<update::v1::Request> for UpdateRequest {
type Error = anyhow::Error;
fn try_from(r: update::v1::Request) -> Result<Self, Self::Error> {
if r.input.is_none() {
return Err(anyhow!("Update request's `input` field must be populated"));
}
if r.meta.is_none() {
return Err(anyhow!("Update request's `meta` field must be populated"));
}
Ok(UpdateRequest { original: r })
}
}