rocketmq_client_v4/protocols/header/
pull_message_request_header.rs1use crate::protocols::mq_command::MqCommand;
2use crate::protocols::{request_code, SerializeDeserialize};
3use log::warn;
4use serde::{Deserialize, Serialize};
5use tokio::io::AsyncWriteExt;
6use tokio::net::TcpStream;
7
8const MAX_MSG_NUMS: i32 = 128;
9const SYS_FLAG: i32 = 2;
10
11const SUSPEND_TIMEOUT_MILLIS: i64 = 1 * 1000;
12
13#[derive(Debug, Serialize, Deserialize)]
14#[allow(non_snake_case)]
15pub struct PullMessageRequestHeader {
16 pub consumerGroup: String,
17 pub topic: String,
18 pub queueId: i32,
19 pub queueOffset: i64,
20 pub maxMsgNums: i32,
21 pub sysFlag: i32,
22 pub commitOffset: i64,
23 pub suspendTimeoutMillis: i64,
24 pub subscription: Option<String>,
25 pub subVersion: i64,
26 pub expressionType: Option<String>,
27}
28
29impl SerializeDeserialize for PullMessageRequestHeader {}
30impl PullMessageRequestHeader {
31 pub fn new(
32 consumer_group: String,
33 topic: String,
34 queue_id: i32,
35 queue_offset: i64,
36 commit_offset: i64,
37 ) -> Self {
38 PullMessageRequestHeader {
39 consumerGroup: consumer_group,
40 topic,
41 queueId: queue_id,
42 queueOffset: queue_offset,
43 maxMsgNums: MAX_MSG_NUMS,
44 sysFlag: SYS_FLAG,
45 commitOffset: commit_offset,
46 suspendTimeoutMillis: SUSPEND_TIMEOUT_MILLIS,
47 subscription: Some("*".to_string()),
48 subVersion: 0,
49 expressionType: Some("TAG".to_string()),
50 }
51 }
52
53 pub fn to_command(&self) -> MqCommand {
54 let header_body = self.to_bytes_1();
55 let req = MqCommand::new_with_body(request_code::PULL_MESSAGE, vec![], header_body, vec![]);
56 return req;
57 }
58 pub async fn send_request(&self, broker_stream: &mut TcpStream) -> Option<MqCommand> {
59 let req = self.to_command();
60 let opaque = req.opaque;
61 let req = req.to_bytes();
62
63 let req_result = broker_stream.write_all(&req).await;
64 if req_result.is_err() {
65 warn!("send request failed, error: {:?}", req_result)
66 }
67 let _ = broker_stream.flush().await;
68 let cmd = MqCommand::read_from_stream_with_opaque(broker_stream, opaque).await;
69
70 Some(cmd)
71 }
72}