use serde::{Deserialize, Serialize};
use crate::effects::RoleId;
use crate::identifiers::RoleName;
use crate::testing::clock::WallClock;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProtocolEnvelope {
pub protocol: String,
pub from_role: RoleName,
pub from_index: Option<u32>,
pub to_role: RoleName,
pub to_index: Option<u32>,
pub message_type: String,
pub sequence: u64,
pub timestamp_ns: u64,
pub correlation_id: Option<String>,
pub payload: Vec<u8>,
}
impl ProtocolEnvelope {
#[must_use]
pub fn builder() -> EnvelopeBuilder {
EnvelopeBuilder::default()
}
#[must_use]
pub fn payload_size(&self) -> usize {
self.payload.len()
}
#[must_use]
pub fn is_protocol(&self, name: &str) -> bool {
self.protocol == name
}
#[must_use]
pub fn is_from(&self, role: &RoleName) -> bool {
&self.from_role == role
}
#[must_use]
pub fn is_to(&self, role: &RoleName) -> bool {
&self.to_role == role
}
#[must_use]
pub fn routing_key(&self) -> String {
match (&self.from_index, &self.to_index) {
(Some(fi), Some(ti)) => {
format!(
"{}.{}[{}].{}[{}]",
self.protocol, self.from_role, fi, self.to_role, ti
)
}
(Some(fi), None) => {
format!(
"{}.{}[{}].{}",
self.protocol, self.from_role, fi, self.to_role
)
}
(None, Some(ti)) => {
format!(
"{}.{}.{}[{}]",
self.protocol, self.from_role, self.to_role, ti
)
}
(None, None) => {
format!("{}.{}.{}", self.protocol, self.from_role, self.to_role)
}
}
}
pub fn to_bytes(&self) -> Result<Vec<u8>, EnvelopeError> {
bincode::serialize(self).map_err(|e| EnvelopeError::Serialization(e.to_string()))
}
pub fn from_bytes(bytes: &[u8]) -> Result<Self, EnvelopeError> {
bincode::deserialize(bytes).map_err(|e| EnvelopeError::Deserialization(e.to_string()))
}
}
#[derive(Debug, Default)]
pub struct EnvelopeBuilder {
protocol: Option<String>,
from_role: Option<RoleName>,
from_index: Option<u32>,
to_role: Option<RoleName>,
to_index: Option<u32>,
message_type: Option<String>,
sequence: u64,
timestamp_ns: Option<u64>,
correlation_id: Option<String>,
payload: Vec<u8>,
}
impl EnvelopeBuilder {
#[must_use]
pub fn protocol(mut self, protocol: impl Into<String>) -> Self {
self.protocol = Some(protocol.into());
self
}
#[must_use]
pub fn sender(mut self, role: RoleName) -> Self {
self.from_role = Some(role);
self
}
#[must_use]
pub fn sender_role<R: RoleId>(mut self, role: R) -> Self {
self.from_role = Some(role.role_name());
self.from_index = role.role_index();
self
}
#[must_use]
pub fn sender_index(mut self, index: u32) -> Self {
self.from_index = Some(index);
self
}
#[must_use]
pub fn recipient(mut self, role: RoleName) -> Self {
self.to_role = Some(role);
self
}
#[must_use]
pub fn recipient_role<R: RoleId>(mut self, role: R) -> Self {
self.to_role = Some(role.role_name());
self.to_index = role.role_index();
self
}
#[must_use]
pub fn recipient_index(mut self, index: u32) -> Self {
self.to_index = Some(index);
self
}
#[must_use]
pub fn message_type(mut self, msg_type: impl Into<String>) -> Self {
self.message_type = Some(msg_type.into());
self
}
#[must_use]
pub fn sequence(mut self, seq: u64) -> Self {
self.sequence = seq;
self
}
#[must_use]
pub fn timestamp(mut self, timestamp_ns: u64) -> Self {
self.timestamp_ns = Some(timestamp_ns);
self
}
#[must_use]
pub fn timestamp_from<C: WallClock>(mut self, clock: &C) -> Self {
self.timestamp_ns = Some(clock.now_unix_ns());
self
}
#[must_use]
pub fn correlation_id(mut self, id: impl Into<String>) -> Self {
self.correlation_id = Some(id.into());
self
}
#[must_use]
pub fn payload(mut self, payload: Vec<u8>) -> Self {
self.payload = payload;
self
}
pub fn payload_from<T: Serialize>(mut self, msg: &T) -> Result<Self, EnvelopeError> {
self.payload =
bincode::serialize(msg).map_err(|e| EnvelopeError::Serialization(e.to_string()))?;
Ok(self)
}
pub fn build(self) -> Result<ProtocolEnvelope, EnvelopeError> {
let protocol = self
.protocol
.ok_or(EnvelopeError::MissingField(EnvelopeField::Protocol))?;
let from_role = self
.from_role
.ok_or(EnvelopeError::MissingField(EnvelopeField::FromRole))?;
let to_role = self
.to_role
.ok_or(EnvelopeError::MissingField(EnvelopeField::ToRole))?;
let message_type = self
.message_type
.ok_or(EnvelopeError::MissingField(EnvelopeField::MessageType))?;
let timestamp_ns = self.timestamp_ns.unwrap_or(0);
Ok(ProtocolEnvelope {
protocol,
from_role,
from_index: self.from_index,
to_role,
to_index: self.to_index,
message_type,
sequence: self.sequence,
timestamp_ns,
correlation_id: self.correlation_id,
payload: self.payload,
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EnvelopeField {
Protocol,
FromRole,
ToRole,
MessageType,
}
impl std::fmt::Display for EnvelopeField {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
EnvelopeField::Protocol => write!(f, "protocol"),
EnvelopeField::FromRole => write!(f, "from_role"),
EnvelopeField::ToRole => write!(f, "to_role"),
EnvelopeField::MessageType => write!(f, "message_type"),
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum EnvelopeError {
#[error("Missing required field: {0}")]
MissingField(EnvelopeField),
#[error("Serialization error: {0}")]
Serialization(String),
#[error("Deserialization error: {0}")]
Deserialization(String),
}
#[cfg(test)]
mod tests {
use super::*;
use crate::identifiers::RoleName;
#[test]
fn test_envelope_builder() {
let envelope = ProtocolEnvelope::builder()
.protocol("TestProtocol")
.sender(RoleName::from_static("Client"))
.recipient(RoleName::from_static("Server"))
.message_type("Request")
.sequence(1)
.payload(vec![1, 2, 3])
.build()
.unwrap();
assert_eq!(envelope.protocol, "TestProtocol");
assert_eq!(envelope.from_role.as_str(), "Client");
assert_eq!(envelope.to_role.as_str(), "Server");
assert_eq!(envelope.message_type, "Request");
assert_eq!(envelope.sequence, 1);
assert_eq!(envelope.payload, vec![1, 2, 3]);
}
#[test]
fn test_routing_key() {
let envelope = ProtocolEnvelope::builder()
.protocol("Proto")
.sender(RoleName::from_static("A"))
.recipient(RoleName::from_static("B"))
.message_type("Msg")
.build()
.unwrap();
assert_eq!(envelope.routing_key(), "Proto.A.B");
let indexed = ProtocolEnvelope::builder()
.protocol("Proto")
.sender(RoleName::from_static("Worker"))
.sender_index(0)
.recipient(RoleName::from_static("Manager"))
.message_type("Msg")
.build()
.unwrap();
assert_eq!(indexed.routing_key(), "Proto.Worker[0].Manager");
}
#[test]
fn test_envelope_roundtrip() {
let original = ProtocolEnvelope::builder()
.protocol("Test")
.sender(RoleName::from_static("A"))
.recipient(RoleName::from_static("B"))
.message_type("Msg")
.payload(vec![1, 2, 3, 4, 5])
.build()
.unwrap();
let bytes = original.to_bytes().unwrap();
let restored = ProtocolEnvelope::from_bytes(&bytes).unwrap();
assert_eq!(original.protocol, restored.protocol);
assert_eq!(original.payload, restored.payload);
}
}