fluss/rpc/message/
header.rs1use crate::proto::ErrorResponse;
19use crate::rpc::api_key::ApiKey;
20use crate::rpc::api_version::ApiVersion;
21use crate::rpc::frame::{ReadError, WriteError};
22use crate::rpc::message::{ReadVersionedType, WriteVersionedType};
23use bytes::{Buf, BufMut};
24use prost::Message;
25
/// Size in bytes of a serialized request header. Matches the 2 + 2 + 4 bytes
/// (`i16` API key, `i16` API version, `i32` request id) that
/// `RequestHeader::write_versioned` emits.
// Nothing in the code visible here reads this constant, hence the allow.
#[allow(dead_code)]
const REQUEST_HEADER_LENGTH: i32 = 8;

/// Response-type byte for which `ResponseHeader::read_versioned` does not
/// decode an `ErrorResponse` payload.
const SUCCESS_RESPONSE: u8 = 0;

/// Presumably the response-type byte of a request-level error; TODO confirm
/// against the server protocol.
// Nothing in the code visible here reads this constant, hence the allow.
#[allow(dead_code)]
const ERROR_RESPONSE: u8 = 1;

/// Presumably the response-type byte of a server-side failure; TODO confirm
/// against the server protocol.
// Nothing in the code visible here reads this constant, hence the allow.
#[allow(dead_code)]
const SERVER_FAILURE: u8 = 2;
33
34#[derive(Debug, PartialEq, Eq)]
35pub struct RequestHeader {
36 pub request_api_key: ApiKey,
38
39 pub request_api_version: ApiVersion,
40
41 pub request_id: i32,
42
43 pub client_id: Option<String>,
44}
45
46impl<W> WriteVersionedType<W> for RequestHeader
47where
48 W: BufMut,
49{
50 fn write_versioned(&self, writer: &mut W, _version: ApiVersion) -> Result<(), WriteError> {
51 writer.put_i16(self.request_api_key.into());
52 writer.put_i16(self.request_api_version.0);
53 writer.put_i32(self.request_id);
54 Ok(())
55 }
56}
57
58#[derive(Debug, PartialEq)]
59pub struct ResponseHeader {
60 pub request_id: i32,
61 pub error_response: Option<ErrorResponse>,
62}
63
64impl<R> ReadVersionedType<R> for ResponseHeader
65where
66 R: Buf,
67{
68 fn read_versioned(reader: &mut R, _version: ApiVersion) -> Result<Self, ReadError> {
69 let resp_type = reader.get_u8();
70 let request_id = reader.get_i32();
71 if resp_type != SUCCESS_RESPONSE {
72 let error_response = ErrorResponse::decode(reader)?;
73 return Ok(ResponseHeader {
74 request_id,
75 error_response: Some(error_response),
76 });
77 }
78 Ok(ResponseHeader {
79 request_id,
80 error_response: None,
81 })
82 }
83}