use std::i32;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use tokio_util::codec::{Decoder, Encoder};
use crate::{
error::{RemotingError, RemotingError::RemotingCommandDecoderError},
protocol::remoting_command::RemotingCommand,
};
#[derive(Debug, Clone)]
pub struct RemotingCommandCodec;
impl Default for RemotingCommandCodec {
fn default() -> Self {
Self::new()
}
}
impl RemotingCommandCodec {
pub fn new() -> Self {
Self {}
}
}
impl Decoder for RemotingCommandCodec {
type Item = RemotingCommand;
type Error = RemotingError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let read_to = src.len();
if read_to < 4 {
return Ok(None);
}
let total_size = i32::from_be_bytes([src[0], src[1], src[2], src[3]]) as usize;
if read_to < total_size + 4 {
return Ok(None);
}
let mut cmd_data = src.split_to(total_size + 4);
let _ = cmd_data.get_i32();
let header_length = cmd_data.get_i32() as usize;
let header_data = cmd_data.split_to(header_length);
let cmd = serde_json::from_slice::<RemotingCommand>(&header_data).map_err(|error| {
RemotingCommandDecoderError(format!("Deserialization error: {}", error))
})?;
let body_length = total_size - 4 - header_length;
Ok(Some(if body_length > 0 {
let body_data = cmd_data.split_to(body_length).to_vec();
cmd.set_body(Some(Bytes::from(body_data)))
} else {
cmd
}))
}
}
impl Encoder<RemotingCommand> for RemotingCommandCodec {
type Error = RemotingError;
fn encode(&mut self, item: RemotingCommand, dst: &mut BytesMut) -> Result<(), Self::Error> {
let mut total_length = 4i32;
let header = item.fast_header_encode();
let mut header_length = 0i32;
if let Some(header) = &header {
header_length = header.len() as i32;
total_length += header_length;
}
let body = item.get_body();
if let Some(body) = &body {
total_length += body.len() as i32;
}
dst.reserve(total_length as usize);
dst.put_i32(total_length);
let serialize_type =
RemotingCommand::mark_serialize_type(header_length, item.get_serialize_type());
dst.put_i32(serialize_type);
if let Some(header_inner) = header {
dst.put(header_inner);
}
if let Some(body_inner) = body {
dst.put(body_inner);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::protocol::{header::client_request_header::GetRouteInfoRequestHeader, LanguageCode};
#[test]
fn test_encode() {
let mut dst = BytesMut::new();
let command = RemotingCommand::create_remoting_command(1)
.set_code(1)
.set_language(LanguageCode::JAVA)
.set_opaque(1)
.set_flag(1)
.set_body(Some(Bytes::from("body")))
.set_command_custom_header(Some(Box::new(GetRouteInfoRequestHeader::new(
"1111",
Some(true),
))))
.set_remark(Some("remark".to_string()));
println!("{}", serde_json::to_string(&command).unwrap());
let mut encoder = RemotingCommandCodec::new();
let _ = encoder.encode(command, &mut dst);
let _expected_length = 8 + "header".len() as i32 + "body".len() as i32;
let result = encoder.decode(&mut dst);
println!("{:?}", result.unwrap().unwrap().get_serialize_type());
}
#[test]
fn tsts() {
let mut bytes1 = bytes::BytesMut::from("122222");
let _bytes2 = bytes1.split_to(1);
println!("{}", bytes1.len());
bytes1.reserve(1);
let _bytes2 = bytes1.split_to(1);
println!("{}", bytes1.len());
}
}