use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use rocketmq_error::RocketmqError;
use tokio_util::codec::BytesCodec;
use tokio_util::codec::Decoder;
use tokio_util::codec::Encoder;
use crate::protocol::remoting_command::RemotingCommand;
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct RemotingCommandCodec(());
impl Default for RemotingCommandCodec {
fn default() -> Self {
Self::new()
}
}
impl RemotingCommandCodec {
pub fn new() -> Self {
RemotingCommandCodec(())
}
}
impl Decoder for RemotingCommandCodec {
type Error = rocketmq_error::RocketMQError;
type Item = RemotingCommand;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, rocketmq_error::RocketMQError> {
RemotingCommand::decode(src)
}
}
impl Encoder<RemotingCommand> for RemotingCommandCodec {
type Error = rocketmq_error::RocketMQError;
fn encode(&mut self, item: RemotingCommand, dst: &mut BytesMut) -> Result<(), Self::Error> {
let mut item = item;
item.fast_header_encode(dst);
if let Some(body_inner) = item.take_body() {
dst.put(body_inner);
}
Ok(())
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Default)]
pub struct CompositeCodec {
bytes_codec: BytesCodec,
remoting_command_codec: RemotingCommandCodec,
}
impl CompositeCodec {
pub fn new() -> Self {
Self {
bytes_codec: BytesCodec::new(),
remoting_command_codec: RemotingCommandCodec::new(),
}
}
}
impl Decoder for CompositeCodec {
type Error = rocketmq_error::RocketMQError;
type Item = RemotingCommand;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, rocketmq_error::RocketMQError> {
self.remoting_command_codec.decode(src)
}
}
impl Encoder<Bytes> for CompositeCodec {
type Error = rocketmq_error::RocketMQError;
fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
self.bytes_codec.encode(item, dst).map_err(|error| {
RocketmqError::RemotingCommandEncoderError(format!("Error encoding bytes: {error}")).into()
})
}
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use super::*;
use crate::protocol::header::client_request_header::GetRouteInfoRequestHeader;
use crate::protocol::LanguageCode;
#[tokio::test]
async fn decode_handles_insufficient_data() {
let mut decoder = RemotingCommandCodec::new();
let mut src = BytesMut::from(&[0, 0, 0, 1][..]);
assert!(matches!(decoder.decode(&mut src), Ok(None)));
}
#[tokio::test]
async fn decode_handles_invalid_total_size() {
let mut decoder = RemotingCommandCodec::new();
let mut src = BytesMut::from(&[0, 0, 0, 1, 0, 0, 0, 0][..]);
assert!(decoder.decode(&mut src).is_err());
}
#[tokio::test]
async fn encode_handles_empty_body() {
let mut encoder = RemotingCommandCodec::new();
let mut dst = BytesMut::new();
let command = RemotingCommand::create_remoting_command(1)
.set_code(1)
.set_language(LanguageCode::JAVA)
.set_opaque(1)
.set_flag(1)
.set_command_custom_header(GetRouteInfoRequestHeader::new("1111", Some(true)))
.set_remark_option(Some("remark".to_string()));
assert!(encoder.encode(command, &mut dst).is_ok());
}
#[tokio::test]
async fn encode_handles_non_empty_body() {
let mut encoder = RemotingCommandCodec::new();
let mut dst = BytesMut::new();
let command = RemotingCommand::create_remoting_command(1)
.set_code(1)
.set_language(LanguageCode::JAVA)
.set_opaque(1)
.set_flag(1)
.set_body(Bytes::from("body"))
.set_command_custom_header(GetRouteInfoRequestHeader::new("1111", Some(true)))
.set_remark_option(Some("remark".to_string()));
assert!(encoder.encode(command, &mut dst).is_ok());
}
}