rocketmq_client_rust/stat/
consumer_stats_manager.rs1use rocketmq_common::common::stats::stats_item_set::StatsItemSet;
16use rocketmq_remoting::protocol::body::consume_status::ConsumeStatus;
17
/// Name given to the stats set that tracks successfully consumed messages.
const TOPIC_AND_GROUP_CONSUME_OK_TPS: &str = "CONSUME_OK_TPS";
/// Name given to the stats set that tracks messages whose consumption failed.
const TOPIC_AND_GROUP_CONSUME_FAILED_TPS: &str = "CONSUME_FAILED_TPS";
/// Name given to the stats set that tracks consume response times.
const TOPIC_AND_GROUP_CONSUME_RT: &str = "CONSUME_RT";
/// Name given to the stats set that tracks pulled messages.
const TOPIC_AND_GROUP_PULL_TPS: &str = "PULL_TPS";
/// Name given to the stats set that tracks pull response times.
const TOPIC_AND_GROUP_PULL_RT: &str = "PULL_RT";
23
/// Consumer-side statistics, held as one [`StatsItemSet`] per metric.
///
/// Entries inside every set are keyed by the `topic@group` string produced by `stats_key`.
pub struct ConsumerStatsManager {
    /// Successfully consumed message counts (set name `CONSUME_OK_TPS`).
    topic_and_group_consume_ok_tps: StatsItemSet,
    /// Consume response times (set name `CONSUME_RT`).
    topic_and_group_consume_rt: StatsItemSet,
    /// Failed-consumption message counts (set name `CONSUME_FAILED_TPS`).
    topic_and_group_consume_failed_tps: StatsItemSet,
    /// Pulled message counts (set name `PULL_TPS`).
    topic_and_group_pull_tps: StatsItemSet,
    /// Pull response times (set name `PULL_RT`).
    topic_and_group_pull_rt: StatsItemSet,
}
36
37impl Default for ConsumerStatsManager {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl ConsumerStatsManager {
44 pub fn new() -> Self {
46 Self {
47 topic_and_group_consume_ok_tps: StatsItemSet::new(TOPIC_AND_GROUP_CONSUME_OK_TPS.to_string()),
48 topic_and_group_consume_rt: StatsItemSet::new(TOPIC_AND_GROUP_CONSUME_RT.to_string()),
49 topic_and_group_consume_failed_tps: StatsItemSet::new(TOPIC_AND_GROUP_CONSUME_FAILED_TPS.to_string()),
50 topic_and_group_pull_tps: StatsItemSet::new(TOPIC_AND_GROUP_PULL_TPS.to_string()),
51 topic_and_group_pull_rt: StatsItemSet::new(TOPIC_AND_GROUP_PULL_RT.to_string()),
52 }
53 }
54
55 pub fn start(&self) {
64 let sets_sec = [
66 self.topic_and_group_consume_ok_tps.clone(),
67 self.topic_and_group_consume_rt.clone(),
68 self.topic_and_group_consume_failed_tps.clone(),
69 self.topic_and_group_pull_tps.clone(),
70 self.topic_and_group_pull_rt.clone(),
71 ];
72 tokio::spawn(async move {
73 let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
74 loop {
75 interval.tick().await;
76 for set in &sets_sec {
77 set.sampling_in_seconds();
78 }
79 }
80 });
81
82 let sets_min = [
84 self.topic_and_group_consume_ok_tps.clone(),
85 self.topic_and_group_consume_rt.clone(),
86 self.topic_and_group_consume_failed_tps.clone(),
87 self.topic_and_group_pull_tps.clone(),
88 self.topic_and_group_pull_rt.clone(),
89 ];
90 tokio::spawn(async move {
91 let mut interval = tokio::time::interval(std::time::Duration::from_secs(10 * 60));
92 loop {
93 interval.tick().await;
94 for set in &sets_min {
95 set.sampling_in_minutes();
96 }
97 }
98 });
99
100 let sets_hour = [
102 self.topic_and_group_consume_ok_tps.clone(),
103 self.topic_and_group_consume_rt.clone(),
104 self.topic_and_group_consume_failed_tps.clone(),
105 self.topic_and_group_pull_tps.clone(),
106 self.topic_and_group_pull_rt.clone(),
107 ];
108 tokio::spawn(async move {
109 let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600));
110 loop {
111 interval.tick().await;
112 for set in &sets_hour {
113 set.sampling_in_hours();
114 }
115 }
116 });
117 }
118
119 pub fn shutdown(&self) {}
121
122 pub fn inc_pull_rt(&self, group: &str, topic: &str, rt: u64) {
124 self.topic_and_group_pull_rt
125 .add_rt_value(&stats_key(topic, group), rt as i64, 1);
126 }
127
128 pub fn inc_pull_tps(&self, group: &str, topic: &str, msgs: u64) {
130 self.topic_and_group_pull_tps
131 .add_value(&stats_key(topic, group), msgs as i64, 1);
132 }
133
134 pub fn inc_consume_rt(&self, group: &str, topic: &str, rt: u64) {
136 self.topic_and_group_consume_rt
137 .add_rt_value(&stats_key(topic, group), rt as i64, 1);
138 }
139
140 pub fn inc_consume_ok_tps(&self, group: &str, topic: &str, msgs: u64) {
142 self.topic_and_group_consume_ok_tps
143 .add_value(&stats_key(topic, group), msgs as i64, 1);
144 }
145
146 pub fn inc_consume_failed_tps(&self, group: &str, topic: &str, msgs: u64) {
148 self.topic_and_group_consume_failed_tps
149 .add_value(&stats_key(topic, group), msgs as i64, 1);
150 }
151
152 pub fn consume_status(&self, group: &str, topic: &str) -> ConsumeStatus {
159 let key = stats_key(topic, group);
160
161 let pull_rt = self.topic_and_group_pull_rt.get_stats_data_in_minute(&key).get_avgpt();
162
163 let pull_tps = self.topic_and_group_pull_tps.get_stats_data_in_minute(&key).get_tps();
164
165 let consume_rt = {
166 let minute = self.topic_and_group_consume_rt.get_stats_data_in_minute(&key);
167 if minute.get_sum() == 0 {
168 self.topic_and_group_consume_rt.get_stats_data_in_hour(&key).get_avgpt()
169 } else {
170 minute.get_avgpt()
171 }
172 };
173
174 let consume_ok_tps = self
175 .topic_and_group_consume_ok_tps
176 .get_stats_data_in_minute(&key)
177 .get_tps();
178
179 let consume_failed_tps = self
180 .topic_and_group_consume_failed_tps
181 .get_stats_data_in_minute(&key)
182 .get_tps();
183
184 let consume_failed_msgs = self
185 .topic_and_group_consume_failed_tps
186 .get_stats_data_in_hour(&key)
187 .get_sum() as i64;
188
189 ConsumeStatus {
190 pull_rt,
191 pull_tps,
192 consume_rt,
193 consume_ok_tps,
194 consume_failed_tps,
195 consume_failed_msgs,
196 }
197 }
198}
199
/// Builds the per-topic, per-group statistics key in the form `topic@group`.
#[inline]
fn stats_key(topic: &str, group: &str) -> String {
    [topic, group].join("@")
}
205
#[cfg(test)]
mod tests {
    use super::*;

    // Fixture names shared by most tests below.
    const GROUP: &str = "GroupA";
    const TOPIC: &str = "TopicA";

    /// The key is the topic, an `@`, then the group.
    #[test]
    fn stats_key_format() {
        assert_eq!(stats_key(TOPIC, GROUP), "TopicA@GroupA");
    }

    /// Recording consumed-ok messages on a fresh manager must not panic.
    #[test]
    fn smoke_inc_consume_ok_tps() {
        ConsumerStatsManager::new().inc_consume_ok_tps(GROUP, TOPIC, 5);
    }

    /// Recording failed messages on a fresh manager must not panic.
    #[test]
    fn smoke_inc_consume_failed_tps() {
        ConsumerStatsManager::new().inc_consume_failed_tps(GROUP, TOPIC, 3);
    }

    /// Recording a consume response time on a fresh manager must not panic.
    #[test]
    fn smoke_inc_consume_rt() {
        ConsumerStatsManager::new().inc_consume_rt(GROUP, TOPIC, 42);
    }

    /// Recording a pull response time on a fresh manager must not panic.
    #[test]
    fn smoke_inc_pull_rt() {
        ConsumerStatsManager::new().inc_pull_rt(GROUP, TOPIC, 10);
    }

    /// Recording pulled messages on a fresh manager must not panic.
    #[test]
    fn smoke_inc_pull_tps() {
        ConsumerStatsManager::new().inc_pull_tps(GROUP, TOPIC, 100);
    }

    /// With nothing recorded, every field of the status is zero.
    #[test]
    fn consume_status_returns_zero_for_empty_stats() {
        let stats = ConsumerStatsManager::new();
        let status = stats.consume_status(GROUP, TOPIC);

        assert_eq!(status.pull_rt, 0.0);
        assert_eq!(status.pull_tps, 0.0);
        assert_eq!(status.consume_rt, 0.0);
        assert_eq!(status.consume_ok_tps, 0.0);
        assert_eq!(status.consume_failed_tps, 0.0);
        assert_eq!(status.consume_failed_msgs, 0);
    }

    /// `start` can spawn its tasks inside a Tokio runtime, and `shutdown` can be called after.
    #[tokio::test]
    async fn start_launches_background_tasks() {
        let stats = ConsumerStatsManager::new();
        stats.start();
        stats.shutdown();
    }

    /// A manager built through `Default` can be queried without panicking.
    #[test]
    fn default_creates_valid_manager() {
        let stats = ConsumerStatsManager::default();
        let _ = stats.consume_status("G", "T");
    }
}