Skip to main content

rocketmq_client_rust/consumer/consumer_impl/
pull_request.rs

// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::hash::Hash;
16use std::hash::Hasher;
17use std::sync::Arc;
18
19use cheetah_string::CheetahString;
20use rocketmq_common::common::message::message_enum::MessageRequestMode;
21use rocketmq_common::common::message::message_queue::MessageQueue;
22
23use crate::consumer::consumer_impl::message_request::MessageRequest;
24use crate::consumer::consumer_impl::process_queue::ProcessQueue;
25
26#[derive(Clone)]
27pub struct PullRequest {
28    pub consumer_group: CheetahString,
29    pub message_queue: MessageQueue,
30    pub process_queue: Arc<ProcessQueue>,
31    pub next_offset: i64,
32    pub previously_locked: bool,
33}
34
35impl PullRequest {
36    pub fn new(
37        consumer_group: CheetahString,
38        message_queue: MessageQueue,
39        process_queue: Arc<ProcessQueue>,
40        next_offset: i64,
41    ) -> Self {
42        PullRequest {
43            consumer_group,
44            message_queue,
45            process_queue,
46            next_offset,
47            previously_locked: false,
48        }
49    }
50
51    pub fn is_previously_locked(&self) -> bool {
52        self.previously_locked
53    }
54
55    pub fn set_previously_locked(&mut self, previously_locked: bool) {
56        self.previously_locked = previously_locked;
57    }
58
59    pub fn get_consumer_group(&self) -> &str {
60        &self.consumer_group
61    }
62
63    pub fn set_consumer_group(&mut self, consumer_group: CheetahString) {
64        self.consumer_group = consumer_group;
65    }
66
67    pub fn get_message_queue(&self) -> &MessageQueue {
68        &self.message_queue
69    }
70
71    pub fn set_message_queue(&mut self, message_queue: MessageQueue) {
72        self.message_queue = message_queue;
73    }
74
75    pub fn get_next_offset(&self) -> i64 {
76        self.next_offset
77    }
78
79    pub fn set_next_offset(&mut self, next_offset: i64) {
80        self.next_offset = next_offset;
81    }
82
83    pub fn get_process_queue(&self) -> &Arc<ProcessQueue> {
84        &self.process_queue
85    }
86
87    pub fn set_process_queue(&mut self, process_queue: Arc<ProcessQueue>) {
88        self.process_queue = process_queue;
89    }
90}
91
92impl MessageRequest for PullRequest {
93    fn get_message_request_mode(&self) -> MessageRequestMode {
94        MessageRequestMode::Pull
95    }
96}
97
98impl Hash for PullRequest {
99    fn hash<H: Hasher>(&self, state: &mut H) {
100        self.consumer_group.hash(state);
101        self.message_queue.hash(state);
102    }
103}
104
105impl PartialEq for PullRequest {
106    fn eq(&self, other: &Self) -> bool {
107        self.consumer_group == other.consumer_group && self.message_queue == other.message_queue
108    }
109}
110
111impl Eq for PullRequest {}
112
113impl std::fmt::Display for PullRequest {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        write!(
116            f,
117            "PullRequest [consumer_group={}, message_queue={:?}, next_offset={}]",
118            self.consumer_group, self.message_queue, self.next_offset
119        )
120    }
121}