// rocketmq_client_v4/protocols/header/pull_message_response_header.rs

1use crate::protocols::mq_command::HEADER_SERIALIZE_METHOD_JSON;
2use crate::protocols::SerializeDeserialize;
3use log::warn;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
7#[derive(Debug, Serialize, Deserialize)]
8#[allow(non_snake_case)]
9pub struct PullMessageResponseHeader {
10    pub suggestWhichBrokerId: Option<i64>,
11    pub nextBeginOffset: Option<i64>,
12    pub minOffset: Option<i64>,
13    pub maxOffset: Option<i64>,
14}
15impl PullMessageResponseHeader {
16    pub fn convert_from_cmd() -> Self {
17        return Self {
18            suggestWhichBrokerId: None,
19            nextBeginOffset: None,
20            minOffset: None,
21            maxOffset: None,
22        };
23    }
24    pub fn bytes_to_header(serialize_method: u8, bytes: Vec<u8>) -> Option<Box<Self>> {
25        // debug!("PullMessageResponseHeader, method:{}, data:{:?}",serialize_method, String::from_utf8(bytes.clone()));
26        if serialize_method == HEADER_SERIALIZE_METHOD_JSON {
27            Self::bates_json_to_header(bytes)
28        } else {
29            Self::bytes_1_to_header(bytes)
30        }
31    }
32
33    fn bates_json_to_header(bytes: Vec<u8>) -> Option<Box<Self>> {
34        let json: Value = serde_json::from_slice(&bytes).unwrap();
35        let swbid: Option<i64> = match json.get("suggestWhichBrokerId") {
36            None => None,
37            Some(v) => v.as_str().unwrap().parse().ok(),
38        };
39
40        let next_begin_offset: Option<i64> = match json.get("nextBeginOffset") {
41            None => None,
42            Some(v) => v.as_str().unwrap().parse().ok(),
43        };
44
45        let min_offset: Option<i64> = match json.get("minOffset") {
46            None => None,
47            Some(v) => v.as_str().unwrap().parse().ok(),
48        };
49
50        let max_offset: Option<i64> = match json.get("maxOffset") {
51            None => None,
52            Some(v) => v.as_str().unwrap().parse().ok(),
53        };
54
55        Some(Box::new(Self {
56            suggestWhichBrokerId: swbid,
57            nextBeginOffset: next_begin_offset,
58            minOffset: min_offset,
59            maxOffset: max_offset,
60        }))
61    }
62}
63impl SerializeDeserialize for PullMessageResponseHeader {
64    fn bytes_1_to_header(bytes: Vec<u8>) -> Option<Box<Self>> {
65        // debug!(
66        //     "pull message response header:{:?}",
67        //     String::from_utf8(bytes.clone())
68        // );
69        if bytes.len() <= 0 {
70            warn!("header is empty");
71            return None;
72        }
73
74        let value = Self::bytes_1_to_map(bytes);
75        let suggest_which_broker_id: Option<i64> = match value.get("suggestWhichBrokerId") {
76            None => None,
77            Some(va) => Some(va.parse().unwrap()),
78        };
79
80        let next_begin_offset: Option<i64> = match value.get("nextBeginOffset") {
81            None => None,
82            Some(va) => Some(va.parse().unwrap()),
83        };
84
85        let min_offset: Option<i64> = match value.get("minOffset") {
86            None => None,
87            Some(va) => Some(va.parse().unwrap()),
88        };
89
90        let max_offset: Option<i64> = match value.get("maxOffset") {
91            None => None,
92            Some(va) => Some(va.parse().unwrap()),
93        };
94
95        Some(Box::new(Self {
96            suggestWhichBrokerId: suggest_which_broker_id,
97            nextBeginOffset: next_begin_offset,
98            minOffset: min_offset,
99            maxOffset: max_offset,
100        }))
101    }
102}