rocketmq_common/common/statistics/
statistics_manager.rs1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20use std::time::SystemTime;
21
22use parking_lot::RwLock;
23use tokio::task;
24use tokio::time::interval;
25
26use crate::common::statistics::statistics_item::StatisticsItem;
27use crate::common::statistics::statistics_item_state_getter::StatisticsItemStateGetter;
28use crate::common::statistics::statistics_kind_meta::StatisticsKindMeta;
29use crate::TimeUtils::get_current_millis;
30
/// Shared two-level statistics table: the outer map is keyed by statistics kind, the inner map
/// by item key, and every value is a reference-counted [`StatisticsItem`].
type StatsTable = Arc<RwLock<HashMap<String, HashMap<String, Arc<StatisticsItem>>>>>;
32
33pub struct StatisticsManager {
34 kind_meta_map: Arc<RwLock<HashMap<String, Arc<StatisticsKindMeta>>>>,
35 brief_metas: Option<Vec<(String, Vec<Vec<i64>>)>>,
36 stats_table: StatsTable,
37 statistics_item_state_getter: Option<Arc<dyn StatisticsItemStateGetter + Send + Sync>>,
38}
39
40impl Default for StatisticsManager {
41 fn default() -> Self {
42 Self::new()
43 }
44}
45
46impl StatisticsManager {
47 const MAX_IDLE_TIME: u64 = 10 * 60 * 1000;
48
49 pub fn new() -> Self {
50 let manager = Self {
51 kind_meta_map: Arc::new(RwLock::new(HashMap::new())),
52 brief_metas: None,
53 stats_table: Arc::new(RwLock::new(HashMap::new())),
54 statistics_item_state_getter: None,
55 };
56 manager.start();
57 manager
58 }
59
60 pub fn with_kind_meta(kind_meta: HashMap<String, Arc<StatisticsKindMeta>>) -> Self {
61 let manager = Self {
62 kind_meta_map: Arc::new(RwLock::new(kind_meta)),
63 brief_metas: None,
64 stats_table: Arc::new(RwLock::new(HashMap::new())),
65 statistics_item_state_getter: None,
66 };
67 manager.start();
68 manager
69 }
70
71 pub fn add_statistics_kind_meta(&self, kind_meta: Arc<StatisticsKindMeta>) {
72 let mut kind_meta_map = self.kind_meta_map.write();
73 kind_meta_map.insert(kind_meta.get_name().to_string(), kind_meta.clone());
74 let mut stats_table = self.stats_table.write();
75 stats_table
76 .entry(kind_meta.get_name().to_string())
77 .or_default();
78 }
79
80 pub fn set_brief_meta(&mut self, brief_metas: Vec<(String, Vec<Vec<i64>>)>) {
81 self.brief_metas = Some(brief_metas);
82 }
83
84 fn start(&self) {
85 let stats_table = self.stats_table.clone();
86 let kind_meta_map = self.kind_meta_map.clone();
87 let statistics_item_state_getter = self.statistics_item_state_getter.clone();
88
89 task::spawn(async move {
90 let mut interval = interval(Duration::from_millis(Self::MAX_IDLE_TIME / 3));
91 let stats_table_clone = stats_table.clone();
92 loop {
93 interval.tick().await;
94
95 let stats_table = stats_table.read();
96 for (_kind, item_map) in stats_table.iter() {
97 let tmp_item_map: HashMap<_, _> = item_map.clone().into_iter().collect();
98
99 for item in tmp_item_map.values() {
100 let last_time_stamp = item.last_timestamp();
101 if get_current_millis() - last_time_stamp > Self::MAX_IDLE_TIME
102 && (statistics_item_state_getter.is_none()
103 || !statistics_item_state_getter.as_ref().unwrap().online(item))
104 {
105 remove(item, &stats_table_clone, &kind_meta_map);
107 }
108 }
109 }
110 }
111 });
112 }
113
114 pub async fn inc(&self, kind: &str, key: &str, item_accumulates: Vec<i64>) -> bool {
115 if let Some(item_map) = self.stats_table.write().get_mut(kind) {
116 if let Some(item) = item_map.get(key) {
117 item.inc_items(item_accumulates.clone());
118 return true;
119 } else {
120 let kind_meta_map = self.kind_meta_map.read();
121 if let Some(kind_meta) = kind_meta_map.get(kind) {
122 let new_item = Arc::new(StatisticsItem::new(
123 kind,
124 key,
125 kind_meta
126 .get_item_names()
127 .iter()
128 .map(|item| item.as_str())
129 .collect(),
130 ));
131 item_map.insert(key.to_string(), new_item.clone());
132 new_item.inc_items(item_accumulates);
133 self.schedule_statistics_item(new_item);
134 return true;
135 }
136 }
137 }
138 false
139 }
140
141 fn schedule_statistics_item(&self, item: Arc<StatisticsItem>) {
142 let kind_meta_map = self.kind_meta_map.read();
143 if let Some(kind_meta) = kind_meta_map.get(item.stat_kind()) {
144 kind_meta.get_scheduled_printer().schedule(item.as_ref());
145 }
146 }
147
148 pub fn set_statistics_item_state_getter(
149 &mut self,
150 getter: Arc<dyn StatisticsItemStateGetter + Send + Sync>,
151 ) {
152 self.statistics_item_state_getter = Some(getter);
153 }
154}
155
156pub fn remove(
157 item: &StatisticsItem,
158 stats_table: &StatsTable,
159 kind_meta_map: &Arc<RwLock<HashMap<String, Arc<StatisticsKindMeta>>>>,
160) {
161 let stat_kind = item.stat_kind();
162 let stat_object = item.stat_object();
163 if let Some(item_map) = stats_table.write().get_mut(stat_kind) {
164 item_map.remove(stat_object);
165 }
166
167 let kind_meta_map = kind_meta_map.write();
169 if let Some(kind_meta) = kind_meta_map.get(stat_kind) {
170 kind_meta.get_scheduled_printer().remove(item);
171 }
172}