use crate::{ProtoActivityId, ProtoPayload, ProtoWorkflowId, WireError};
/// Classification of an activity failure as carried on the wire.
///
/// The discriminants are the protobuf enumeration values. `Unspecified` (0)
/// has no counterpart in `aion_core::ActivityErrorKind`: the `TryFrom`
/// conversion in this file rejects it with "activity error kind is missing".
#[derive(
Clone,
Copy,
Debug,
PartialEq,
Eq,
Hash,
serde::Serialize,
serde::Deserialize,
prost::Enumeration,
)]
#[repr(i32)]
pub enum ProtoActivityErrorKind {
/// Value 0; rejected when converting to `aion_core::ActivityErrorKind`.
Unspecified = 0,
/// Value 1; maps to and from `aion_core::ActivityErrorKind::Retryable`.
Retryable = 1,
/// Value 2; maps to and from `aion_core::ActivityErrorKind::Terminal`.
Terminal = 2,
}
/// Wire form of `aion_core::ActivityError`.
///
/// Built from the core type with `From` and converted back with `TryFrom`.
/// The reverse conversion fails when `kind` is not a known, specified
/// `ProtoActivityErrorKind` or when `details` cannot be turned into an
/// `aion_core::Payload`.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoActivityError {
/// Raw `ProtoActivityErrorKind` value, stored as `i32` (field tag 1).
#[prost(enumeration = "ProtoActivityErrorKind", tag = "1")]
pub kind: i32,
/// Copied to and from `aion_core::ActivityError::message` (field tag 2).
#[prost(string, tag = "2")]
pub message: String,
/// Optional payload mapped to and from `aion_core::ActivityError::details`
/// (field tag 3).
#[prost(message, optional, tag = "3")]
pub details: Option<ProtoPayload>,
}
/// Worker registration message: a namespace plus a list of activity type names.
///
/// NOTE(review): presumably sent by a worker to announce which activity types
/// it can execute — confirm against the protocol definition.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoRegisterWorker {
/// Namespace string (field tag 1).
#[prost(string, tag = "1")]
pub namespace: String,
/// Repeated activity type names (field tag 2).
#[prost(string, repeated, tag = "2")]
pub activity_types: Vec<String>,
}
/// Wire message describing one activity task: workflow and activity
/// identifiers, the activity type name, an optional input payload and an
/// attempt counter.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoActivityTask {
/// Optional workflow identifier (field tag 1).
#[prost(message, optional, tag = "1")]
pub workflow_id: Option<ProtoWorkflowId>,
/// Optional activity identifier (field tag 2).
#[prost(message, optional, tag = "2")]
pub activity_id: Option<ProtoActivityId>,
/// Activity type name (field tag 3).
#[prost(string, tag = "3")]
pub activity_type: String,
/// Optional input payload (field tag 4).
#[prost(message, optional, tag = "4")]
pub input: Option<ProtoPayload>,
/// Attempt counter, `uint32` on the wire (field tag 5).
/// NOTE(review): whether counting starts at 0 or 1 is not visible here — confirm.
#[prost(uint32, tag = "5")]
pub attempt: u32,
}
/// Wire message with no fields.
///
/// NOTE(review): the name suggests it asks a worker to drain — confirm against
/// the protocol definition.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoDrainRequest {}
/// Acknowledgement message carrying a worker id, a namespace and a heartbeat
/// window.
///
/// NOTE(review): presumably the reply to `ProtoRegisterWorker` — confirm.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoRegisterAck {
/// Worker identifier, `uint64` on the wire (field tag 1).
#[prost(uint64, tag = "1")]
pub worker_id: u64,
/// Namespace string (field tag 2).
#[prost(string, tag = "2")]
pub namespace: String,
/// Heartbeat window, `uint64` on the wire (field tag 3).
/// NOTE(review): the `_ms` suffix suggests milliseconds — confirm.
#[prost(uint64, tag = "3")]
pub heartbeat_window_ms: u64,
}
/// Acknowledgement message identifying a workflow/activity pair.
///
/// NOTE(review): presumably acknowledges a `ProtoActivityResult` — confirm.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoResultAck {
/// Optional workflow identifier (field tag 1).
#[prost(message, optional, tag = "1")]
pub workflow_id: Option<ProtoWorkflowId>,
/// Optional activity identifier (field tag 2).
#[prost(message, optional, tag = "2")]
pub activity_id: Option<ProtoActivityId>,
}
/// Wire message carrying the outcome of an activity: identifiers plus at most
/// one of a result payload or an activity error.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoActivityResult {
/// Optional workflow identifier (field tag 1).
#[prost(message, optional, tag = "1")]
pub workflow_id: Option<ProtoWorkflowId>,
/// Optional activity identifier (field tag 2).
#[prost(message, optional, tag = "2")]
pub activity_id: Option<ProtoActivityId>,
/// Oneof occupying field tags 3 and 4; `None` when neither arm is set.
/// The arms are defined in `proto_activity_result::Outcome`.
#[prost(oneof = "proto_activity_result::Outcome", tags = "3, 4")]
pub outcome: Option<proto_activity_result::Outcome>,
}
/// Nested types for `ProtoActivityResult`.
pub mod proto_activity_result {
use super::{ProtoActivityError, ProtoPayload};
/// The `outcome` oneof of `ProtoActivityResult`.
///
/// The tags here must stay in step with the `tags = "3, 4"` list on the
/// parent field.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Oneof)]
pub enum Outcome {
/// Result payload arm (field tag 3).
#[prost(message, tag = "3")]
Result(ProtoPayload),
/// Activity error arm (field tag 4).
#[prost(message, tag = "4")]
Error(ProtoActivityError),
}
}
/// Heartbeat message identifying a workflow/activity pair with an optional
/// progress payload.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoHeartbeat {
/// Optional workflow identifier (field tag 1).
#[prost(message, optional, tag = "1")]
pub workflow_id: Option<ProtoWorkflowId>,
/// Optional activity identifier (field tag 2).
#[prost(message, optional, tag = "2")]
pub activity_id: Option<ProtoActivityId>,
/// Optional progress payload (field tag 3).
#[prost(message, optional, tag = "3")]
pub progress: Option<ProtoPayload>,
}
impl From<aion_core::ActivityErrorKind> for ProtoActivityErrorKind {
fn from(value: aion_core::ActivityErrorKind) -> Self {
match value {
aion_core::ActivityErrorKind::Retryable => Self::Retryable,
aion_core::ActivityErrorKind::Terminal => Self::Terminal,
}
}
}
impl TryFrom<ProtoActivityErrorKind> for aion_core::ActivityErrorKind {
type Error = WireError;
fn try_from(value: ProtoActivityErrorKind) -> Result<Self, Self::Error> {
match value {
ProtoActivityErrorKind::Unspecified => {
Err(WireError::backend("activity error kind is missing"))
}
ProtoActivityErrorKind::Retryable => Ok(Self::Retryable),
ProtoActivityErrorKind::Terminal => Ok(Self::Terminal),
}
}
}
/// Encodes a core activity error as its wire message.
///
/// The classification is stored as the raw `i32` enumeration value, the
/// message is moved across unchanged, and `details` is converted to a
/// `ProtoPayload` when present.
impl From<aion_core::ActivityError> for ProtoActivityError {
    fn from(value: aion_core::ActivityError) -> Self {
        let aion_core::ActivityError {
            kind,
            message,
            details,
        } = value;
        let wire_kind = ProtoActivityErrorKind::from(kind);

        Self {
            kind: i32::from(wire_kind),
            message,
            details: details.map(ProtoPayload::from),
        }
    }
}
impl TryFrom<ProtoActivityError> for aion_core::ActivityError {
type Error = WireError;
fn try_from(value: ProtoActivityError) -> Result<Self, Self::Error> {
let kind = ProtoActivityErrorKind::try_from(value.kind)
.map_err(|_| WireError::backend("activity error kind is unknown"))?;
Ok(Self {
kind: aion_core::ActivityErrorKind::try_from(kind)?,
message: value.message,
details: value
.details
.map(aion_core::Payload::try_from)
.transpose()?,
})
}
}
#[cfg(test)]
mod tests {
    use prost::Message;
    use serde_json::json;

    use super::{
        ProtoActivityError, ProtoActivityErrorKind, ProtoActivityResult, ProtoActivityTask,
        ProtoDrainRequest, ProtoHeartbeat, ProtoRegisterAck, ProtoRegisterWorker, ProtoResultAck,
        proto_activity_result,
    };
    use crate::{ProtoActivityId, ProtoPayload, ProtoWorkflowId, WireError};

    /// Fixed workflow id (built from the nil UUID) shared by the tests below.
    fn workflow_id() -> aion_core::WorkflowId {
        aion_core::WorkflowId::new(uuid::Uuid::nil())
    }

    /// Core -> wire -> core keeps the whole error intact, and the
    /// retryable/terminal classification survives the trip.
    #[test]
    fn activity_error_round_trips_preserving_classification() -> Result<(), WireError> {
        let core = aion_core::ActivityError {
            kind: aion_core::ActivityErrorKind::Retryable,
            message: String::from("connection reset"),
            details: Some(
                aion_core::Payload::from_json(&json!({"retry_after_ms": 500}))
                    .map_err(|_| WireError::backend("test payload could not be created"))?,
            ),
        };
        let proto = ProtoActivityError::from(core.clone());
        assert_eq!(aion_core::ActivityError::try_from(proto.clone())?, core);
        assert!(aion_core::ActivityError::try_from(proto)?.is_retryable());

        let terminal = ProtoActivityError {
            kind: ProtoActivityErrorKind::Terminal as i32,
            message: String::from("invalid request"),
            details: None,
        };
        assert!(!aion_core::ActivityError::try_from(terminal)?.is_retryable());
        Ok(())
    }

    /// `ProtoRegisterWorker` survives both the JSON and the protobuf round trip.
    #[test]
    fn worker_registration_round_trips_through_serde_and_proto()
    -> Result<(), Box<dyn std::error::Error>> {
        let registration = ProtoRegisterWorker {
            namespace: String::from("tenant-a"),
            activity_types: vec![String::from("charge-card"), String::from("send-email")],
        };
        // The helper takes its argument by reference.
        assert_json_and_proto_round_trip(&registration)
    }

    /// `ProtoActivityTask` with every field populated survives both round trips.
    #[test]
    fn activity_task_round_trips_through_serde_and_proto() -> Result<(), Box<dyn std::error::Error>>
    {
        let task = ProtoActivityTask {
            workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
            activity_id: Some(ProtoActivityId::from(
                aion_core::ActivityId::from_sequence_position(7),
            )),
            activity_type: String::from("charge-card"),
            input: Some(ProtoPayload::from(aion_core::Payload::from_json(
                &json!({"amount": 42}),
            )?)),
            attempt: 3,
        };
        assert_json_and_proto_round_trip(&task)
    }

    /// The field-less `ProtoDrainRequest` survives both round trips.
    #[test]
    fn drain_request_round_trips_through_serde_and_proto() -> Result<(), Box<dyn std::error::Error>>
    {
        assert_json_and_proto_round_trip(&ProtoDrainRequest {})
    }

    /// `ProtoRegisterAck` survives both round trips.
    #[test]
    fn register_ack_round_trips_through_serde_and_proto() -> Result<(), Box<dyn std::error::Error>>
    {
        let ack = ProtoRegisterAck {
            worker_id: 7,
            namespace: String::from("tenant-a"),
            heartbeat_window_ms: 30_000,
        };
        assert_json_and_proto_round_trip(&ack)
    }

    /// `ProtoResultAck` survives both round trips.
    #[test]
    fn result_ack_round_trips_through_serde_and_proto() -> Result<(), Box<dyn std::error::Error>> {
        let ack = ProtoResultAck {
            workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
            activity_id: Some(ProtoActivityId::from(
                aion_core::ActivityId::from_sequence_position(11),
            )),
        };
        assert_json_and_proto_round_trip(&ack)
    }

    /// Pins the generated `ServerToWorker` oneof arms to their field numbers by
    /// checking the first encoded byte, which is `(field_number << 3) | wire_type`.
    #[cfg(feature = "generated")]
    #[test]
    fn server_to_worker_ack_arms_pin_oneof_tags_three_and_four()
    -> Result<(), Box<dyn std::error::Error>> {
        let register_ack = crate::generated::ServerToWorker {
            message: Some(crate::generated::server_to_worker::Message::RegisterAck(
                crate::generated::RegisterAck {
                    worker_id: 1,
                    namespace: String::from("tenant-a"),
                    heartbeat_window_ms: 1_000,
                },
            )),
        };
        let mut bytes = Vec::new();
        register_ack.encode(&mut bytes)?;
        // 0x1A == (3 << 3) | 2: field 3, length-delimited.
        assert_eq!(bytes.first(), Some(&0x1A));
        assert_eq!(
            crate::generated::ServerToWorker::decode(bytes.as_slice())?,
            register_ack
        );

        let result_ack = crate::generated::ServerToWorker {
            message: Some(crate::generated::server_to_worker::Message::ResultAck(
                crate::generated::ResultAck {
                    workflow_id: None,
                    activity_id: None,
                },
            )),
        };
        let mut bytes = Vec::new();
        result_ack.encode(&mut bytes)?;
        // 0x22 == (4 << 3) | 2: field 4, length-delimited.
        assert_eq!(bytes.first(), Some(&0x22));
        assert_eq!(
            crate::generated::ServerToWorker::decode(bytes.as_slice())?,
            result_ack
        );
        Ok(())
    }

    /// Pins `ProtoActivityTask::attempt` to field number 5: with every other
    /// field left at its default, the encoding is just the key byte and the value.
    #[test]
    fn activity_task_attempt_uses_wire_tag_five() -> Result<(), Box<dyn std::error::Error>> {
        let task = ProtoActivityTask {
            workflow_id: None,
            activity_id: None,
            activity_type: String::new(),
            input: None,
            attempt: 9,
        };
        let mut bytes = Vec::new();
        task.encode(&mut bytes)?;
        // 0x28 == (5 << 3) | 0: field 5, varint.
        assert_eq!(bytes, vec![0x28, 9]);
        Ok(())
    }

    /// `ProtoActivityResult` carrying the `Result` arm survives both round trips.
    #[test]
    fn activity_success_result_round_trips_through_serde_and_proto()
    -> Result<(), Box<dyn std::error::Error>> {
        let result = ProtoActivityResult {
            workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
            activity_id: Some(ProtoActivityId::from(
                aion_core::ActivityId::from_sequence_position(8),
            )),
            outcome: Some(proto_activity_result::Outcome::Result(ProtoPayload::from(
                aion_core::Payload::from_json(&json!({"authorization": "ok"}))?,
            ))),
        };
        assert_json_and_proto_round_trip(&result)
    }

    /// `ProtoActivityResult` carrying the `Error` arm survives both round trips.
    #[test]
    fn activity_error_result_round_trips_through_serde_and_proto()
    -> Result<(), Box<dyn std::error::Error>> {
        let result = ProtoActivityResult {
            workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
            activity_id: Some(ProtoActivityId::from(
                aion_core::ActivityId::from_sequence_position(9),
            )),
            outcome: Some(proto_activity_result::Outcome::Error(
                ProtoActivityError::from(aion_core::ActivityError {
                    kind: aion_core::ActivityErrorKind::Terminal,
                    message: String::from("card declined"),
                    details: Some(aion_core::Payload::from_json(&json!({"code": "declined"}))?),
                }),
            )),
        };
        assert_json_and_proto_round_trip(&result)
    }

    /// `ProtoHeartbeat` with a progress payload survives both round trips.
    #[test]
    fn heartbeat_round_trips_through_serde_and_proto() -> Result<(), Box<dyn std::error::Error>> {
        let heartbeat = ProtoHeartbeat {
            workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
            activity_id: Some(ProtoActivityId::from(
                aion_core::ActivityId::from_sequence_position(10),
            )),
            progress: Some(ProtoPayload::from(aion_core::Payload::from_json(
                &json!({"percent": 50}),
            )?)),
        };
        assert_json_and_proto_round_trip(&heartbeat)
    }

    /// Asserts that `value` is unchanged after a JSON (serde) round trip and
    /// after a protobuf (prost) round trip.
    ///
    /// Serialization and decoding failures are returned as errors; a value
    /// mismatch panics through `assert_eq!`.
    fn assert_json_and_proto_round_trip<T>(value: &T) -> Result<(), Box<dyn std::error::Error>>
    where
        T: Message
            + Default
            + serde::Serialize
            + serde::de::DeserializeOwned
            + PartialEq
            + std::fmt::Debug,
    {
        assert_eq!(
            serde_json::from_str::<T>(&serde_json::to_string(value)?)?,
            *value
        );
        assert_eq!(prost_round_trip(value)?, *value);
        Ok(())
    }

    /// Encodes `value` to protobuf bytes and decodes a fresh `T` from them.
    fn prost_round_trip<T>(value: &T) -> Result<T, Box<dyn std::error::Error>>
    where
        T: Message + Default,
    {
        let mut bytes = Vec::new();
        value.encode(&mut bytes)?;
        Ok(T::decode(bytes.as_slice())?)
    }
}