use super::{
causal::MessageId,
envelope::{MessageEnvelope, SchemaId},
error::ProtocolError,
version::ProtocolVersion,
};
pub const HEADER_LEN: usize = 10;
pub const CONVERSATION_REPLY_REQUESTED_FLAG: u8 = 0x01;
pub const PUBLISH_IDEMPOTENCY_KEY_FLAG: u8 = 0x02;
pub const PUBLISH_DELIVERED_FLAG: u8 = 0x01;
pub(crate) const WORKER_REGISTER_ACK_ACCEPTED: u8 = 0x00;
pub(crate) const WORKER_REGISTER_ACK_REJECTED: u8 = 0x01;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FrameType {
Connect,
ConnectAck,
ConnectError,
Disconnect,
Subscribe,
SubscribeAck,
SubscribeError,
Unsubscribe,
Publish,
PublishAck,
PublishError,
ConversationOpen,
ConversationMessage,
ConversationClose,
ConversationError,
Accept,
Defer,
Reject,
Ping,
Pong,
Push,
PushReply,
WorkerRegister,
WorkerRegisterAck,
Unknown(u8),
}
impl FrameType {
#[must_use]
pub const fn is_control(self) -> bool {
matches!(
self,
Self::Connect
| Self::ConnectAck
| Self::ConnectError
| Self::Disconnect
| Self::Ping
| Self::Pong
| Self::WorkerRegister
| Self::WorkerRegisterAck
)
}
}
impl From<u8> for FrameType {
fn from(value: u8) -> Self {
match value {
0x01 => Self::Connect,
0x02 => Self::ConnectAck,
0x03 => Self::ConnectError,
0x04 => Self::Disconnect,
0x05 => Self::Subscribe,
0x06 => Self::SubscribeAck,
0x07 => Self::SubscribeError,
0x08 => Self::Unsubscribe,
0x09 => Self::Publish,
0x0A => Self::PublishAck,
0x0B => Self::PublishError,
0x0C => Self::ConversationOpen,
0x0D => Self::ConversationMessage,
0x0E => Self::ConversationClose,
0x0F => Self::ConversationError,
0x10 => Self::Accept,
0x11 => Self::Defer,
0x12 => Self::Reject,
0x13 => Self::Ping,
0x14 => Self::Pong,
0x15 => Self::Push,
0x16 => Self::PushReply,
0x17 => Self::WorkerRegister,
0x18 => Self::WorkerRegisterAck,
unknown => Self::Unknown(unknown),
}
}
}
impl From<FrameType> for u8 {
fn from(value: FrameType) -> Self {
match value {
FrameType::Connect => 0x01,
FrameType::ConnectAck => 0x02,
FrameType::ConnectError => 0x03,
FrameType::Disconnect => 0x04,
FrameType::Subscribe => 0x05,
FrameType::SubscribeAck => 0x06,
FrameType::SubscribeError => 0x07,
FrameType::Unsubscribe => 0x08,
FrameType::Publish => 0x09,
FrameType::PublishAck => 0x0A,
FrameType::PublishError => 0x0B,
FrameType::ConversationOpen => 0x0C,
FrameType::ConversationMessage => 0x0D,
FrameType::ConversationClose => 0x0E,
FrameType::ConversationError => 0x0F,
FrameType::Accept => 0x10,
FrameType::Defer => 0x11,
FrameType::Reject => 0x12,
FrameType::Ping => 0x13,
FrameType::Pong => 0x14,
FrameType::Push => 0x15,
FrameType::PushReply => 0x16,
FrameType::WorkerRegister => 0x17,
FrameType::WorkerRegisterAck => 0x18,
FrameType::Unknown(type_id) => type_id,
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct FrameHeader {
pub frame_type: FrameType,
pub flags: u8,
pub stream_id: u32,
pub payload_length: u32,
}
impl FrameHeader {
pub const WIRE_LEN: usize = HEADER_LEN;
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WorkerRegistration {
pub namespaces: Vec<String>,
pub task_queue: String,
pub node: Option<String>,
pub activity_types: Vec<String>,
pub identity: String,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WorkerRegisterOutcome {
Accepted,
Rejected {
reason: String,
},
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Frame {
Connect {
flags: u8,
min_version: ProtocolVersion,
max_version: ProtocolVersion,
auth_token: Vec<u8>,
},
ConnectAck {
flags: u8,
selected_version: ProtocolVersion,
capabilities: u32,
},
ConnectError {
flags: u8,
reason_code: u16,
message: Option<String>,
},
Disconnect { flags: u8 },
Subscribe {
flags: u8,
stream_id: u32,
channel: String,
accepted_schemas: Vec<SchemaId>,
max_in_flight: u32,
},
SubscribeAck {
flags: u8,
stream_id: u32,
subscription_id: u64,
selected_schema: SchemaId,
},
SubscribeError {
flags: u8,
stream_id: u32,
reason_code: u16,
message: Option<String>,
},
Unsubscribe {
flags: u8,
stream_id: u32,
subscription_id: u64,
},
Publish {
flags: u8,
stream_id: u32,
channel: String,
envelope: MessageEnvelope,
idempotency_key: Option<String>,
},
PublishAck {
flags: u8,
stream_id: u32,
message_id: u64,
},
PublishError {
flags: u8,
stream_id: u32,
reason_code: u16,
message: Option<String>,
},
ConversationOpen {
flags: u8,
stream_id: u32,
conversation_id: u64,
subject: String,
},
ConversationMessage {
flags: u8,
stream_id: u32,
conversation_id: u64,
envelope: MessageEnvelope,
},
ConversationClose {
flags: u8,
stream_id: u32,
conversation_id: u64,
reason_code: Option<u16>,
message: Option<String>,
},
ConversationError {
flags: u8,
stream_id: u32,
conversation_id: u64,
reason_code: u16,
message: Option<String>,
},
Accept {
flags: u8,
stream_id: u32,
referenced_message_id: MessageId,
},
Defer {
flags: u8,
stream_id: u32,
referenced_message_id: MessageId,
reason: Option<String>,
},
Reject {
flags: u8,
stream_id: u32,
referenced_message_id: MessageId,
reason: Option<String>,
},
Ping { flags: u8 },
Pong { flags: u8 },
Push {
flags: u8,
stream_id: u32,
correlation_id: u64,
payload: Vec<u8>,
},
PushReply {
flags: u8,
stream_id: u32,
correlation_id: u64,
payload: Vec<u8>,
},
WorkerRegister {
flags: u8,
registration: WorkerRegistration,
},
WorkerRegisterAck {
flags: u8,
outcome: WorkerRegisterOutcome,
},
Unknown {
type_id: u8,
flags: u8,
stream_id: u32,
payload: Vec<u8>,
},
}
impl Frame {
pub fn new_ping(stream_id: u32) -> Result<Self, ProtocolError> {
validate_stream(FrameType::Ping, stream_id)?;
Ok(Self::Ping { flags: 0 })
}
pub fn new_publish(
stream_id: u32,
channel: impl Into<String>,
envelope: MessageEnvelope,
) -> Result<Self, ProtocolError> {
validate_stream(FrameType::Publish, stream_id)?;
Ok(Self::Publish {
flags: 0,
stream_id,
channel: channel.into(),
envelope,
idempotency_key: None,
})
}
pub fn new_publish_with_idempotency_key(
stream_id: u32,
channel: impl Into<String>,
envelope: MessageEnvelope,
idempotency_key: impl Into<String>,
) -> Result<Self, ProtocolError> {
validate_stream(FrameType::Publish, stream_id)?;
Ok(Self::Publish {
flags: PUBLISH_IDEMPOTENCY_KEY_FLAG,
stream_id,
channel: channel.into(),
envelope,
idempotency_key: Some(idempotency_key.into()),
})
}
pub fn new_push(
stream_id: u32,
correlation_id: u64,
payload: Vec<u8>,
) -> Result<Self, ProtocolError> {
validate_stream(FrameType::Push, stream_id)?;
Ok(Self::Push {
flags: 0,
stream_id,
correlation_id,
payload,
})
}
pub fn new_push_reply(
stream_id: u32,
correlation_id: u64,
payload: Vec<u8>,
) -> Result<Self, ProtocolError> {
validate_stream(FrameType::PushReply, stream_id)?;
Ok(Self::PushReply {
flags: 0,
stream_id,
correlation_id,
payload,
})
}
#[must_use]
pub const fn frame_type(&self) -> FrameType {
match self {
Self::Connect { .. } => FrameType::Connect,
Self::ConnectAck { .. } => FrameType::ConnectAck,
Self::ConnectError { .. } => FrameType::ConnectError,
Self::Disconnect { .. } => FrameType::Disconnect,
Self::Subscribe { .. } => FrameType::Subscribe,
Self::SubscribeAck { .. } => FrameType::SubscribeAck,
Self::SubscribeError { .. } => FrameType::SubscribeError,
Self::Unsubscribe { .. } => FrameType::Unsubscribe,
Self::Publish { .. } => FrameType::Publish,
Self::PublishAck { .. } => FrameType::PublishAck,
Self::PublishError { .. } => FrameType::PublishError,
Self::ConversationOpen { .. } => FrameType::ConversationOpen,
Self::ConversationMessage { .. } => FrameType::ConversationMessage,
Self::ConversationClose { .. } => FrameType::ConversationClose,
Self::ConversationError { .. } => FrameType::ConversationError,
Self::Accept { .. } => FrameType::Accept,
Self::Defer { .. } => FrameType::Defer,
Self::Reject { .. } => FrameType::Reject,
Self::Ping { .. } => FrameType::Ping,
Self::Pong { .. } => FrameType::Pong,
Self::Push { .. } => FrameType::Push,
Self::PushReply { .. } => FrameType::PushReply,
Self::WorkerRegister { .. } => FrameType::WorkerRegister,
Self::WorkerRegisterAck { .. } => FrameType::WorkerRegisterAck,
Self::Unknown { type_id, .. } => FrameType::Unknown(*type_id),
}
}
#[must_use]
pub const fn flags(&self) -> u8 {
match self {
Self::Connect { flags, .. }
| Self::ConnectAck { flags, .. }
| Self::ConnectError { flags, .. }
| Self::Disconnect { flags, .. }
| Self::Subscribe { flags, .. }
| Self::SubscribeAck { flags, .. }
| Self::SubscribeError { flags, .. }
| Self::Unsubscribe { flags, .. }
| Self::Publish { flags, .. }
| Self::PublishAck { flags, .. }
| Self::PublishError { flags, .. }
| Self::ConversationOpen { flags, .. }
| Self::ConversationMessage { flags, .. }
| Self::ConversationClose { flags, .. }
| Self::ConversationError { flags, .. }
| Self::Accept { flags, .. }
| Self::Defer { flags, .. }
| Self::Reject { flags, .. }
| Self::Ping { flags }
| Self::Pong { flags }
| Self::Push { flags, .. }
| Self::PushReply { flags, .. }
| Self::WorkerRegister { flags, .. }
| Self::WorkerRegisterAck { flags, .. }
| Self::Unknown { flags, .. } => *flags,
}
}
#[must_use]
pub const fn stream_id(&self) -> u32 {
match self {
Self::Connect { .. }
| Self::ConnectAck { .. }
| Self::ConnectError { .. }
| Self::Disconnect { .. }
| Self::Ping { .. }
| Self::Pong { .. }
| Self::WorkerRegister { .. }
| Self::WorkerRegisterAck { .. } => 0,
Self::Subscribe { stream_id, .. }
| Self::SubscribeAck { stream_id, .. }
| Self::SubscribeError { stream_id, .. }
| Self::Unsubscribe { stream_id, .. }
| Self::Publish { stream_id, .. }
| Self::PublishAck { stream_id, .. }
| Self::PublishError { stream_id, .. }
| Self::ConversationOpen { stream_id, .. }
| Self::ConversationMessage { stream_id, .. }
| Self::ConversationClose { stream_id, .. }
| Self::ConversationError { stream_id, .. }
| Self::Accept { stream_id, .. }
| Self::Defer { stream_id, .. }
| Self::Reject { stream_id, .. }
| Self::Push { stream_id, .. }
| Self::PushReply { stream_id, .. }
| Self::Unknown { stream_id, .. } => *stream_id,
}
}
pub(crate) fn validate(&self) -> Result<(), ProtocolError> {
validate_stream(self.frame_type(), self.stream_id())?;
if let Self::Subscribe { max_in_flight, .. } = self {
if *max_in_flight == 0 {
return Err(ProtocolError::codec(
"max_in_flight must be greater than zero",
));
}
}
Ok(())
}
}
pub fn validate_stream(frame_type: FrameType, stream_id: u32) -> Result<(), ProtocolError> {
if matches!(frame_type, FrameType::Unknown(_)) {
return Ok(());
}
let valid = if frame_type.is_control() {
stream_id == 0
} else {
stream_id >= 1
};
if valid {
Ok(())
} else {
Err(ProtocolError::invalid_stream(frame_type, stream_id))
}
}
#[cfg(test)]
mod tests;