rocketmq_common/common/statistics/
statistics_manager.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20use std::time::SystemTime;
21
22use parking_lot::RwLock;
23use tokio::task;
24use tokio::time::interval;
25
26use crate::common::statistics::statistics_item::StatisticsItem;
27use crate::common::statistics::statistics_item_state_getter::StatisticsItemStateGetter;
28use crate::common::statistics::statistics_kind_meta::StatisticsKindMeta;
29use crate::TimeUtils::get_current_millis;
30
31type StatsTable = Arc<RwLock<HashMap<String, HashMap<String, Arc<StatisticsItem>>>>>;
32
33pub struct StatisticsManager {
34    kind_meta_map: Arc<RwLock<HashMap<String, Arc<StatisticsKindMeta>>>>,
35    brief_metas: Option<Vec<(String, Vec<Vec<i64>>)>>,
36    stats_table: StatsTable,
37    statistics_item_state_getter: Option<Arc<dyn StatisticsItemStateGetter + Send + Sync>>,
38}
39
40impl Default for StatisticsManager {
41    fn default() -> Self {
42        Self::new()
43    }
44}
45
46impl StatisticsManager {
47    const MAX_IDLE_TIME: u64 = 10 * 60 * 1000;
48
49    pub fn new() -> Self {
50        let manager = Self {
51            kind_meta_map: Arc::new(RwLock::new(HashMap::new())),
52            brief_metas: None,
53            stats_table: Arc::new(RwLock::new(HashMap::new())),
54            statistics_item_state_getter: None,
55        };
56        manager.start();
57        manager
58    }
59
60    pub fn with_kind_meta(kind_meta: HashMap<String, Arc<StatisticsKindMeta>>) -> Self {
61        let manager = Self {
62            kind_meta_map: Arc::new(RwLock::new(kind_meta)),
63            brief_metas: None,
64            stats_table: Arc::new(RwLock::new(HashMap::new())),
65            statistics_item_state_getter: None,
66        };
67        manager.start();
68        manager
69    }
70
71    pub fn add_statistics_kind_meta(&self, kind_meta: Arc<StatisticsKindMeta>) {
72        let mut kind_meta_map = self.kind_meta_map.write();
73        kind_meta_map.insert(kind_meta.get_name().to_string(), kind_meta.clone());
74        let mut stats_table = self.stats_table.write();
75        stats_table
76            .entry(kind_meta.get_name().to_string())
77            .or_default();
78    }
79
80    pub fn set_brief_meta(&mut self, brief_metas: Vec<(String, Vec<Vec<i64>>)>) {
81        self.brief_metas = Some(brief_metas);
82    }
83
84    fn start(&self) {
85        let stats_table = self.stats_table.clone();
86        let kind_meta_map = self.kind_meta_map.clone();
87        let statistics_item_state_getter = self.statistics_item_state_getter.clone();
88
89        task::spawn(async move {
90            let mut interval = interval(Duration::from_millis(Self::MAX_IDLE_TIME / 3));
91            let stats_table_clone = stats_table.clone();
92            loop {
93                interval.tick().await;
94
95                let stats_table = stats_table.read();
96                for (_kind, item_map) in stats_table.iter() {
97                    let tmp_item_map: HashMap<_, _> = item_map.clone().into_iter().collect();
98
99                    for item in tmp_item_map.values() {
100                        let last_time_stamp = item.last_timestamp();
101                        if get_current_millis() - last_time_stamp > Self::MAX_IDLE_TIME
102                            && (statistics_item_state_getter.is_none()
103                                || !statistics_item_state_getter.as_ref().unwrap().online(item))
104                        {
105                            // Remove expired item
106                            remove(item, &stats_table_clone, &kind_meta_map);
107                        }
108                    }
109                }
110            }
111        });
112    }
113
114    pub async fn inc(&self, kind: &str, key: &str, item_accumulates: Vec<i64>) -> bool {
115        if let Some(item_map) = self.stats_table.write().get_mut(kind) {
116            if let Some(item) = item_map.get(key) {
117                item.inc_items(item_accumulates.clone());
118                return true;
119            } else {
120                let kind_meta_map = self.kind_meta_map.read();
121                if let Some(kind_meta) = kind_meta_map.get(kind) {
122                    let new_item = Arc::new(StatisticsItem::new(
123                        kind,
124                        key,
125                        kind_meta
126                            .get_item_names()
127                            .iter()
128                            .map(|item| item.as_str())
129                            .collect(),
130                    ));
131                    item_map.insert(key.to_string(), new_item.clone());
132                    new_item.inc_items(item_accumulates);
133                    self.schedule_statistics_item(new_item);
134                    return true;
135                }
136            }
137        }
138        false
139    }
140
141    fn schedule_statistics_item(&self, item: Arc<StatisticsItem>) {
142        let kind_meta_map = self.kind_meta_map.read();
143        if let Some(kind_meta) = kind_meta_map.get(item.stat_kind()) {
144            kind_meta.get_scheduled_printer().schedule(item.as_ref());
145        }
146    }
147
148    pub fn set_statistics_item_state_getter(
149        &mut self,
150        getter: Arc<dyn StatisticsItemStateGetter + Send + Sync>,
151    ) {
152        self.statistics_item_state_getter = Some(getter);
153    }
154}
155
156pub fn remove(
157    item: &StatisticsItem,
158    stats_table: &StatsTable,
159    kind_meta_map: &Arc<RwLock<HashMap<String, Arc<StatisticsKindMeta>>>>,
160) {
161    let stat_kind = item.stat_kind();
162    let stat_object = item.stat_object();
163    if let Some(item_map) = stats_table.write().get_mut(stat_kind) {
164        item_map.remove(stat_object);
165    }
166
167    // Remove from scheduled printer
168    let kind_meta_map = kind_meta_map.write();
169    if let Some(kind_meta) = kind_meta_map.get(stat_kind) {
170        kind_meta.get_scheduled_printer().remove(item);
171    }
172}