rocketmq_client_v4/protocols/header/
pull_message_request_header.rs

1use crate::protocols::mq_command::MqCommand;
2use crate::protocols::{request_code, SerializeDeserialize};
3use log::warn;
4use serde::{Deserialize, Serialize};
5use tokio::io::AsyncWriteExt;
6use tokio::net::TcpStream;
7
8const MAX_MSG_NUMS: i32 = 128;
9const SYS_FLAG: i32 = 2;
10
11const SUSPEND_TIMEOUT_MILLIS: i64 = 1 * 1000;
12
13#[derive(Debug, Serialize, Deserialize)]
14#[allow(non_snake_case)]
15pub struct PullMessageRequestHeader {
16    pub consumerGroup: String,
17    pub topic: String,
18    pub queueId: i32,
19    pub queueOffset: i64,
20    pub maxMsgNums: i32,
21    pub sysFlag: i32,
22    pub commitOffset: i64,
23    pub suspendTimeoutMillis: i64,
24    pub subscription: Option<String>,
25    pub subVersion: i64,
26    pub expressionType: Option<String>,
27}
28
29impl SerializeDeserialize for PullMessageRequestHeader {}
30impl PullMessageRequestHeader {
31    pub fn new(
32        consumer_group: String,
33        topic: String,
34        queue_id: i32,
35        queue_offset: i64,
36        commit_offset: i64,
37    ) -> Self {
38        PullMessageRequestHeader {
39            consumerGroup: consumer_group,
40            topic,
41            queueId: queue_id,
42            queueOffset: queue_offset,
43            maxMsgNums: MAX_MSG_NUMS,
44            sysFlag: SYS_FLAG,
45            commitOffset: commit_offset,
46            suspendTimeoutMillis: SUSPEND_TIMEOUT_MILLIS,
47            subscription: Some("*".to_string()),
48            subVersion: 0,
49            expressionType: Some("TAG".to_string()),
50        }
51    }
52
53    pub fn to_command(&self) -> MqCommand {
54        let header_body = self.to_bytes_1();
55        let req = MqCommand::new_with_body(request_code::PULL_MESSAGE, vec![], header_body, vec![]);
56        return req;
57    }
58    pub async fn send_request(&self, broker_stream: &mut TcpStream) -> Option<MqCommand> {
59        let req = self.to_command();
60        let opaque = req.opaque;
61        let req = req.to_bytes();
62
63        let req_result = broker_stream.write_all(&req).await;
64        if req_result.is_err() {
65            warn!("send request failed, error: {:?}", req_result)
66        }
67        let _ = broker_stream.flush().await;
68        let cmd = MqCommand::read_from_stream_with_opaque(broker_stream, opaque).await;
69
70        Some(cmd)
71    }
72}