use crate::protocol::{
Frame, FrameType, MessageEnvelope, MessageId, ProtocolError, ProtocolVersion, StreamPressure,
WorkerRegisterOutcome, WorkerRegistration,
frame::{
PUBLISH_IDEMPOTENCY_KEY_FLAG, WORKER_REGISTER_ACK_ACCEPTED, WORKER_REGISTER_ACK_REJECTED,
},
};
use super::payload::PayloadReader;
pub(super) fn decode_known_payload(
frame_type: FrameType,
flags: u8,
stream_id: u32,
payload: &[u8],
) -> Result<Frame, ProtocolError> {
let mut reader = PayloadReader::new(payload);
let frame = match frame_type {
FrameType::Connect
| FrameType::ConnectAck
| FrameType::ConnectError
| FrameType::Disconnect
| FrameType::Ping
| FrameType::Pong => decode_control_payload(frame_type, flags, &mut reader)?,
FrameType::Subscribe
| FrameType::SubscribeAck
| FrameType::SubscribeError
| FrameType::Unsubscribe => {
decode_subscription_payload(frame_type, flags, stream_id, &mut reader)?
}
FrameType::Publish | FrameType::PublishAck | FrameType::PublishError => {
decode_publish_payload(frame_type, flags, stream_id, &mut reader)?
}
FrameType::ConversationOpen
| FrameType::ConversationMessage
| FrameType::ConversationClose
| FrameType::ConversationError => {
decode_conversation_payload(frame_type, flags, stream_id, &mut reader)?
}
FrameType::Accept | FrameType::Defer | FrameType::Reject => {
decode_pressure_payload(frame_type, flags, stream_id, &mut reader)?
}
FrameType::Push | FrameType::PushReply => {
decode_push_payload(frame_type, flags, stream_id, &mut reader)?
}
FrameType::WorkerRegister => decode_worker_register_payload(flags, &mut reader)?,
FrameType::WorkerRegisterAck => decode_worker_register_ack_payload(flags, &mut reader)?,
FrameType::Unknown(type_id) => Frame::Unknown {
type_id,
flags,
stream_id,
payload: payload.to_vec(),
},
};
reader.finish()?;
Ok(frame)
}
/// Reads two consecutive `u16` values from `reader` and passes them, in read
/// order, to `ProtocolVersion::new`.
///
/// # Errors
///
/// Propagates the reader error if either `u16` cannot be read.
fn read_protocol_version(reader: &mut PayloadReader<'_>) -> Result<ProtocolVersion, ProtocolError> {
    let first = reader.read_u16()?;
    let second = reader.read_u16()?;
    Ok(ProtocolVersion::new(first, second))
}
/// Decodes the payload of a connection-level control frame.
///
/// Handles `Connect`, `ConnectAck`, `ConnectError`, `Disconnect`, `Ping` and
/// `Pong`. The last three read nothing from `reader`; the others read their
/// fields in the order the statements appear below.
///
/// # Errors
///
/// Propagates reader errors, and returns a codec error when `frame_type` is
/// not one of the control frame types.
fn decode_control_payload(
    frame_type: FrameType,
    flags: u8,
    reader: &mut PayloadReader<'_>,
) -> Result<Frame, ProtocolError> {
    let frame = match frame_type {
        FrameType::Connect => {
            let min_version = read_protocol_version(reader)?;
            let max_version = read_protocol_version(reader)?;
            let auth_token = reader.read_bytes_field()?;
            Frame::Connect {
                flags,
                min_version,
                max_version,
                auth_token,
            }
        }
        FrameType::ConnectAck => {
            let selected_version = read_protocol_version(reader)?;
            let capabilities = reader.read_u32()?;
            Frame::ConnectAck {
                flags,
                selected_version,
                capabilities,
            }
        }
        FrameType::ConnectError => {
            let reason_code = reader.read_u16()?;
            let message = reader.read_optional_string()?;
            Frame::ConnectError {
                flags,
                reason_code,
                message,
            }
        }
        // These frames carry no payload fields.
        FrameType::Disconnect => Frame::Disconnect { flags },
        FrameType::Ping => Frame::Ping { flags },
        FrameType::Pong => Frame::Pong { flags },
        _ => return Err(ProtocolError::codec("frame type was not a control frame")),
    };
    Ok(frame)
}
/// Decodes the payload of a subscription frame.
///
/// Handles `Subscribe`, `SubscribeAck`, `SubscribeError` and `Unsubscribe`,
/// reading each frame's fields from `reader` in the order written below.
///
/// # Errors
///
/// Propagates reader errors and any error from `StreamPressure::new`, and
/// returns a codec error when `frame_type` is not a subscription frame type.
fn decode_subscription_payload(
    frame_type: FrameType,
    flags: u8,
    stream_id: u32,
    reader: &mut PayloadReader<'_>,
) -> Result<Frame, ProtocolError> {
    let frame = match frame_type {
        FrameType::Subscribe => {
            let channel = reader.read_string_field()?;
            let accepted_schemas = reader.read_schema_ids_field()?;
            let max_in_flight = reader.read_u32()?;
            // Built only so that its error (if any) is surfaced; the frame
            // stores the raw `max_in_flight` value, not the constructed object.
            StreamPressure::new(max_in_flight)?;
            Frame::Subscribe {
                flags,
                stream_id,
                channel,
                accepted_schemas,
                max_in_flight,
            }
        }
        FrameType::SubscribeAck => {
            let subscription_id = reader.read_u64()?;
            let selected_schema = reader.read_schema_id()?;
            Frame::SubscribeAck {
                flags,
                stream_id,
                subscription_id,
                selected_schema,
            }
        }
        FrameType::SubscribeError => {
            let reason_code = reader.read_u16()?;
            let message = reader.read_optional_string()?;
            Frame::SubscribeError {
                flags,
                stream_id,
                reason_code,
                message,
            }
        }
        FrameType::Unsubscribe => {
            let subscription_id = reader.read_u64()?;
            Frame::Unsubscribe {
                flags,
                stream_id,
                subscription_id,
            }
        }
        _ => {
            return Err(ProtocolError::codec(
                "frame type was not a subscription frame",
            ));
        }
    };
    Ok(frame)
}
/// Decodes the payload of a publish frame.
///
/// Handles `Publish`, `PublishAck` and `PublishError`. For `Publish`, the
/// trailing idempotency key is read only when `PUBLISH_IDEMPOTENCY_KEY_FLAG`
/// is set in `flags`; otherwise the key is `None` and nothing more is read.
///
/// # Errors
///
/// Propagates reader errors and `MessageEnvelope::deserialize` errors, and
/// returns a codec error when `frame_type` is not a publish frame type.
fn decode_publish_payload(
    frame_type: FrameType,
    flags: u8,
    stream_id: u32,
    reader: &mut PayloadReader<'_>,
) -> Result<Frame, ProtocolError> {
    let frame = match frame_type {
        FrameType::Publish => {
            let channel = reader.read_string_field()?;
            let envelope_bytes = reader.read_bytes_field()?;
            let envelope = MessageEnvelope::deserialize(&envelope_bytes)?;
            let has_idempotency_key = flags & PUBLISH_IDEMPOTENCY_KEY_FLAG != 0;
            let idempotency_key = if has_idempotency_key {
                Some(reader.read_string_field()?)
            } else {
                None
            };
            Frame::Publish {
                flags,
                stream_id,
                channel,
                envelope,
                idempotency_key,
            }
        }
        FrameType::PublishAck => {
            let message_id = reader.read_u64()?;
            Frame::PublishAck {
                flags,
                stream_id,
                message_id,
            }
        }
        FrameType::PublishError => {
            let reason_code = reader.read_u16()?;
            let message = reader.read_optional_string()?;
            Frame::PublishError {
                flags,
                stream_id,
                reason_code,
                message,
            }
        }
        _ => return Err(ProtocolError::codec("frame type was not a publish frame")),
    };
    Ok(frame)
}
/// Decodes the payload of a conversation frame.
///
/// Handles `ConversationOpen`, `ConversationMessage`, `ConversationClose` and
/// `ConversationError`. Every variant starts with a `u64` conversation id,
/// which is read before the frame type is inspected; a payload too short for
/// that id is therefore reported ahead of an unexpected frame type.
///
/// # Errors
///
/// Propagates reader errors and `MessageEnvelope::deserialize` errors, and
/// returns a codec error when `frame_type` is not a conversation frame type.
fn decode_conversation_payload(
    frame_type: FrameType,
    flags: u8,
    stream_id: u32,
    reader: &mut PayloadReader<'_>,
) -> Result<Frame, ProtocolError> {
    let conversation_id = reader.read_u64()?;
    let frame = match frame_type {
        FrameType::ConversationOpen => {
            let subject = reader.read_string_field()?;
            Frame::ConversationOpen {
                flags,
                stream_id,
                conversation_id,
                subject,
            }
        }
        FrameType::ConversationMessage => {
            let envelope_bytes = reader.read_bytes_field()?;
            let envelope = MessageEnvelope::deserialize(&envelope_bytes)?;
            Frame::ConversationMessage {
                flags,
                stream_id,
                conversation_id,
                envelope,
            }
        }
        FrameType::ConversationClose => {
            let reason_code = reader.read_optional_u16()?;
            let message = reader.read_optional_string()?;
            Frame::ConversationClose {
                flags,
                stream_id,
                conversation_id,
                reason_code,
                message,
            }
        }
        FrameType::ConversationError => {
            let reason_code = reader.read_u16()?;
            let message = reader.read_optional_string()?;
            Frame::ConversationError {
                flags,
                stream_id,
                conversation_id,
                reason_code,
                message,
            }
        }
        _ => {
            return Err(ProtocolError::codec(
                "frame type was not a conversation frame",
            ));
        }
    };
    Ok(frame)
}
/// Decodes the payload of a `Push` or `PushReply` frame.
///
/// Both variants share one layout: a `u64` correlation id followed by a bytes
/// field. The two fields are read before the frame type is inspected, so a
/// reader error is reported ahead of an unexpected frame type.
///
/// # Errors
///
/// Propagates reader errors, and returns a codec error when `frame_type` is
/// neither `Push` nor `PushReply`.
fn decode_push_payload(
    frame_type: FrameType,
    flags: u8,
    stream_id: u32,
    reader: &mut PayloadReader<'_>,
) -> Result<Frame, ProtocolError> {
    let correlation_id = reader.read_u64()?;
    let payload = reader.read_bytes_field()?;
    let frame = match frame_type {
        FrameType::Push => Frame::Push {
            flags,
            stream_id,
            correlation_id,
            payload,
        },
        FrameType::PushReply => Frame::PushReply {
            flags,
            stream_id,
            correlation_id,
            payload,
        },
        _ => return Err(ProtocolError::codec("frame type was not a push frame")),
    };
    Ok(frame)
}
/// Decodes the payload of an `Accept`, `Defer` or `Reject` frame.
///
/// Each variant begins with a string field that is wrapped in `MessageId`;
/// `Defer` and `Reject` additionally read an optional reason string. Nothing
/// is read from `reader` when `frame_type` is not one of these three.
///
/// # Errors
///
/// Propagates reader errors, and returns a codec error when `frame_type` is
/// not a pressure frame type.
fn decode_pressure_payload(
    frame_type: FrameType,
    flags: u8,
    stream_id: u32,
    reader: &mut PayloadReader<'_>,
) -> Result<Frame, ProtocolError> {
    let frame = match frame_type {
        FrameType::Accept => {
            let referenced_message_id = MessageId::new(reader.read_string_field()?);
            Frame::Accept {
                flags,
                stream_id,
                referenced_message_id,
            }
        }
        FrameType::Defer => {
            let referenced_message_id = MessageId::new(reader.read_string_field()?);
            let reason = reader.read_optional_string()?;
            Frame::Defer {
                flags,
                stream_id,
                referenced_message_id,
                reason,
            }
        }
        FrameType::Reject => {
            let referenced_message_id = MessageId::new(reader.read_string_field()?);
            let reason = reader.read_optional_string()?;
            Frame::Reject {
                flags,
                stream_id,
                referenced_message_id,
                reason,
            }
        }
        _ => return Err(ProtocolError::codec("frame type was not a pressure frame")),
    };
    Ok(frame)
}
/// Decodes the payload of a `WorkerRegister` frame into a `WorkerRegistration`.
///
/// Fields are read from `reader` in this order: namespaces (string vector),
/// task queue (string), node (optional string), activity types (string
/// vector), identity (string).
///
/// # Errors
///
/// Propagates the reader error from the first field that fails to decode.
fn decode_worker_register_payload(
    flags: u8,
    reader: &mut PayloadReader<'_>,
) -> Result<Frame, ProtocolError> {
    // Struct field initializers run in the order written, which keeps the
    // reads in wire order.
    let registration = WorkerRegistration {
        namespaces: reader.read_string_vec_field()?,
        task_queue: reader.read_string_field()?,
        node: reader.read_optional_string()?,
        activity_types: reader.read_string_vec_field()?,
        identity: reader.read_string_field()?,
    };
    Ok(Frame::WorkerRegister {
        flags,
        registration,
    })
}
/// Decodes the payload of a `WorkerRegisterAck` frame.
///
/// The first byte selects the outcome: `WORKER_REGISTER_ACK_ACCEPTED` yields
/// `WorkerRegisterOutcome::Accepted` with no further reads, and
/// `WORKER_REGISTER_ACK_REJECTED` yields `WorkerRegisterOutcome::Rejected`
/// with a reason string read from `reader`.
///
/// # Errors
///
/// Propagates reader errors, and returns a codec error when the status byte
/// matches neither constant.
fn decode_worker_register_ack_payload(
    flags: u8,
    reader: &mut PayloadReader<'_>,
) -> Result<Frame, ProtocolError> {
    let outcome = match reader.read_u8()? {
        WORKER_REGISTER_ACK_ACCEPTED => WorkerRegisterOutcome::Accepted,
        WORKER_REGISTER_ACK_REJECTED => {
            let reason = reader.read_string_field()?;
            WorkerRegisterOutcome::Rejected { reason }
        }
        _ => {
            return Err(ProtocolError::codec(
                "worker register ack status byte was invalid",
            ));
        }
    };
    Ok(Frame::WorkerRegisterAck { flags, outcome })
}