Skip to main content

rocketmq_client_rust/stat/
consumer_stats_manager.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use rocketmq_common::common::stats::stats_item_set::StatsItemSet;
16use rocketmq_remoting::protocol::body::consume_status::ConsumeStatus;
17
/// Name given to the stats set counting successfully consumed messages.
const TOPIC_AND_GROUP_CONSUME_OK_TPS: &str = "CONSUME_OK_TPS";
/// Name given to the stats set counting messages whose consumption failed.
const TOPIC_AND_GROUP_CONSUME_FAILED_TPS: &str = "CONSUME_FAILED_TPS";
/// Name given to the stats set recording consume response times.
const TOPIC_AND_GROUP_CONSUME_RT: &str = "CONSUME_RT";
/// Name given to the stats set counting pulled messages.
const TOPIC_AND_GROUP_PULL_TPS: &str = "PULL_TPS";
/// Name given to the stats set recording pull response times.
const TOPIC_AND_GROUP_PULL_RT: &str = "PULL_RT";
23
24/// Tracks consumer-side statistics for each topic/group pair.
25///
26/// Maintains five time-windowed metric sets: consume OK TPS, consume failed
27/// TPS, consume RT, pull TPS, and pull RT. All `inc_*` methods are
28/// thread-safe and intended to be called on the hot consumption path.
29pub struct ConsumerStatsManager {
30    topic_and_group_consume_ok_tps: StatsItemSet,
31    topic_and_group_consume_rt: StatsItemSet,
32    topic_and_group_consume_failed_tps: StatsItemSet,
33    topic_and_group_pull_tps: StatsItemSet,
34    topic_and_group_pull_rt: StatsItemSet,
35}
36
37impl Default for ConsumerStatsManager {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl ConsumerStatsManager {
44    /// Creates a new `ConsumerStatsManager` with all metric sets initialised.
45    pub fn new() -> Self {
46        Self {
47            topic_and_group_consume_ok_tps: StatsItemSet::new(TOPIC_AND_GROUP_CONSUME_OK_TPS.to_string()),
48            topic_and_group_consume_rt: StatsItemSet::new(TOPIC_AND_GROUP_CONSUME_RT.to_string()),
49            topic_and_group_consume_failed_tps: StatsItemSet::new(TOPIC_AND_GROUP_CONSUME_FAILED_TPS.to_string()),
50            topic_and_group_pull_tps: StatsItemSet::new(TOPIC_AND_GROUP_PULL_TPS.to_string()),
51            topic_and_group_pull_rt: StatsItemSet::new(TOPIC_AND_GROUP_PULL_RT.to_string()),
52        }
53    }
54
55    /// Starts background sampling tasks.
56    ///
57    /// Spawns three Tokio tasks that advance the sliding-window snapshots used
58    /// by [`consume_status`](Self::consume_status):
59    ///
60    /// - every 10 s  → `cs_list_minute` (drives per-minute stats)
61    /// - every 10 min → `cs_list_hour`   (drives per-hour stats)
62    /// - every 1 h   → `cs_list_day`    (drives per-day stats)
63    pub fn start(&self) {
64        // 10-second tick — drives cs_list_minute on each StatsItem.
65        let sets_sec = [
66            self.topic_and_group_consume_ok_tps.clone(),
67            self.topic_and_group_consume_rt.clone(),
68            self.topic_and_group_consume_failed_tps.clone(),
69            self.topic_and_group_pull_tps.clone(),
70            self.topic_and_group_pull_rt.clone(),
71        ];
72        tokio::spawn(async move {
73            let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
74            loop {
75                interval.tick().await;
76                for set in &sets_sec {
77                    set.sampling_in_seconds();
78                }
79            }
80        });
81
82        // 10-minute tick — drives cs_list_hour on each StatsItem.
83        let sets_min = [
84            self.topic_and_group_consume_ok_tps.clone(),
85            self.topic_and_group_consume_rt.clone(),
86            self.topic_and_group_consume_failed_tps.clone(),
87            self.topic_and_group_pull_tps.clone(),
88            self.topic_and_group_pull_rt.clone(),
89        ];
90        tokio::spawn(async move {
91            let mut interval = tokio::time::interval(std::time::Duration::from_secs(10 * 60));
92            loop {
93                interval.tick().await;
94                for set in &sets_min {
95                    set.sampling_in_minutes();
96                }
97            }
98        });
99
100        // 1-hour tick — drives cs_list_day on each StatsItem.
101        let sets_hour = [
102            self.topic_and_group_consume_ok_tps.clone(),
103            self.topic_and_group_consume_rt.clone(),
104            self.topic_and_group_consume_failed_tps.clone(),
105            self.topic_and_group_pull_tps.clone(),
106            self.topic_and_group_pull_rt.clone(),
107        ];
108        tokio::spawn(async move {
109            let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600));
110            loop {
111                interval.tick().await;
112                for set in &sets_hour {
113                    set.sampling_in_hours();
114                }
115            }
116        });
117    }
118
119    /// Shuts down the stats manager.
120    pub fn shutdown(&self) {}
121
122    /// Records a single pull response-time observation in milliseconds.
123    pub fn inc_pull_rt(&self, group: &str, topic: &str, rt: u64) {
124        self.topic_and_group_pull_rt
125            .add_rt_value(&stats_key(topic, group), rt as i64, 1);
126    }
127
128    /// Records `msgs` messages successfully pulled in one batch.
129    pub fn inc_pull_tps(&self, group: &str, topic: &str, msgs: u64) {
130        self.topic_and_group_pull_tps
131            .add_value(&stats_key(topic, group), msgs as i64, 1);
132    }
133
134    /// Records a single consume response-time observation in milliseconds.
135    pub fn inc_consume_rt(&self, group: &str, topic: &str, rt: u64) {
136        self.topic_and_group_consume_rt
137            .add_rt_value(&stats_key(topic, group), rt as i64, 1);
138    }
139
140    /// Records `msgs` messages consumed successfully in one batch.
141    pub fn inc_consume_ok_tps(&self, group: &str, topic: &str, msgs: u64) {
142        self.topic_and_group_consume_ok_tps
143            .add_value(&stats_key(topic, group), msgs as i64, 1);
144    }
145
146    /// Records `msgs` messages that failed consumption in one batch.
147    pub fn inc_consume_failed_tps(&self, group: &str, topic: &str, msgs: u64) {
148        self.topic_and_group_consume_failed_tps
149            .add_value(&stats_key(topic, group), msgs as i64, 1);
150    }
151
152    /// Returns a point-in-time [`ConsumeStatus`] snapshot for the given
153    /// consumer group and topic.
154    ///
155    /// - Pull / consume RT uses the per-minute average; consume RT falls back to the per-hour
156    ///   average when the per-minute window is empty.
157    /// - `consume_failed_msgs` accumulates the per-hour sum of failed messages.
158    pub fn consume_status(&self, group: &str, topic: &str) -> ConsumeStatus {
159        let key = stats_key(topic, group);
160
161        let pull_rt = self.topic_and_group_pull_rt.get_stats_data_in_minute(&key).get_avgpt();
162
163        let pull_tps = self.topic_and_group_pull_tps.get_stats_data_in_minute(&key).get_tps();
164
165        let consume_rt = {
166            let minute = self.topic_and_group_consume_rt.get_stats_data_in_minute(&key);
167            if minute.get_sum() == 0 {
168                self.topic_and_group_consume_rt.get_stats_data_in_hour(&key).get_avgpt()
169            } else {
170                minute.get_avgpt()
171            }
172        };
173
174        let consume_ok_tps = self
175            .topic_and_group_consume_ok_tps
176            .get_stats_data_in_minute(&key)
177            .get_tps();
178
179        let consume_failed_tps = self
180            .topic_and_group_consume_failed_tps
181            .get_stats_data_in_minute(&key)
182            .get_tps();
183
184        let consume_failed_msgs = self
185            .topic_and_group_consume_failed_tps
186            .get_stats_data_in_hour(&key)
187            .get_sum() as i64;
188
189        ConsumeStatus {
190            pull_rt,
191            pull_tps,
192            consume_rt,
193            consume_ok_tps,
194            consume_failed_tps,
195            consume_failed_msgs,
196        }
197    }
198}
199
/// Builds the canonical stats key `"topic@group"` used by all metric sets.
///
/// Note the argument order: `topic` comes first, then `group`. The key is
/// assembled in a single allocation of exactly the required size.
#[inline]
fn stats_key(topic: &str, group: &str) -> String {
    // +1 for the '@' separator.
    let mut key = String::with_capacity(topic.len() + 1 + group.len());
    key.push_str(topic);
    key.push('@');
    key.push_str(group);
    key
}
205
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh manager with empty metric sets.
    fn make_manager() -> ConsumerStatsManager {
        ConsumerStatsManager::new()
    }

    #[test]
    fn stats_key_format() {
        assert_eq!(stats_key("TopicA", "GroupA"), "TopicA@GroupA");
    }

    // The `smoke_*` tests only check that recording a value does not panic.

    #[test]
    fn smoke_inc_consume_ok_tps() {
        let mgr = make_manager();
        mgr.inc_consume_ok_tps("GroupA", "TopicA", 5);
    }

    #[test]
    fn smoke_inc_consume_failed_tps() {
        let mgr = make_manager();
        mgr.inc_consume_failed_tps("GroupA", "TopicA", 3);
    }

    #[test]
    fn smoke_inc_consume_rt() {
        let mgr = make_manager();
        mgr.inc_consume_rt("GroupA", "TopicA", 42);
    }

    #[test]
    fn smoke_inc_pull_rt() {
        let mgr = make_manager();
        mgr.inc_pull_rt("GroupA", "TopicA", 10);
    }

    #[test]
    fn smoke_inc_pull_tps() {
        let mgr = make_manager();
        mgr.inc_pull_tps("GroupA", "TopicA", 100);
    }

    #[test]
    fn consume_status_returns_zero_for_empty_stats() {
        let mgr = make_manager();
        let status = mgr.consume_status("GroupA", "TopicA");
        assert_eq!(status.pull_rt, 0.0);
        assert_eq!(status.pull_tps, 0.0);
        assert_eq!(status.consume_rt, 0.0);
        assert_eq!(status.consume_ok_tps, 0.0);
        assert_eq!(status.consume_failed_tps, 0.0);
        assert_eq!(status.consume_failed_msgs, 0);
    }

    #[tokio::test]
    async fn start_launches_background_tasks() {
        let mgr = make_manager();
        // Verifies start() does not panic in a Tokio runtime context.
        mgr.start();
        mgr.shutdown();
    }

    #[test]
    fn default_creates_valid_manager() {
        let mgr = ConsumerStatsManager::default();
        // Should not panic when querying a key that was never recorded.
        let _ = mgr.consume_status("G", "T");
    }
}